diff --git a/api/services/content/v1/content.pb.go b/api/services/content/v1/content.pb.go index 063bb9214..e9a01ee49 100644 --- a/api/services/content/v1/content.pb.go +++ b/api/services/content/v1/content.pb.go @@ -20,6 +20,8 @@ Status StatusRequest StatusResponse + ListStatusesRequest + ListStatusesResponse WriteContentRequest WriteContentResponse AbortRequest @@ -195,7 +197,7 @@ func (*Status) ProtoMessage() {} func (*Status) Descriptor() ([]byte, []int) { return fileDescriptorContent, []int{8} } type StatusRequest struct { - Filter string `protobuf:"bytes,1,opt,name=filter,proto3" json:"filter,omitempty"` + Ref string `protobuf:"bytes,1,opt,name=ref,proto3" json:"ref,omitempty"` } func (m *StatusRequest) Reset() { *m = StatusRequest{} } @@ -203,13 +205,29 @@ func (*StatusRequest) ProtoMessage() {} func (*StatusRequest) Descriptor() ([]byte, []int) { return fileDescriptorContent, []int{9} } type StatusResponse struct { - Statuses []Status `protobuf:"bytes,1,rep,name=statuses" json:"statuses"` + Status *Status `protobuf:"bytes,1,opt,name=status" json:"status,omitempty"` } func (m *StatusResponse) Reset() { *m = StatusResponse{} } func (*StatusResponse) ProtoMessage() {} func (*StatusResponse) Descriptor() ([]byte, []int) { return fileDescriptorContent, []int{10} } +type ListStatusesRequest struct { + Filters []string `protobuf:"bytes,1,rep,name=filters" json:"filters,omitempty"` +} + +func (m *ListStatusesRequest) Reset() { *m = ListStatusesRequest{} } +func (*ListStatusesRequest) ProtoMessage() {} +func (*ListStatusesRequest) Descriptor() ([]byte, []int) { return fileDescriptorContent, []int{11} } + +type ListStatusesResponse struct { + Statuses []Status `protobuf:"bytes,1,rep,name=statuses" json:"statuses"` +} + +func (m *ListStatusesResponse) Reset() { *m = ListStatusesResponse{} } +func (*ListStatusesResponse) ProtoMessage() {} +func (*ListStatusesResponse) Descriptor() ([]byte, []int) { return fileDescriptorContent, []int{12} } + // WriteContentRequest writes data to the request ref at offset. type WriteContentRequest struct { // Action sets the behavior of the write. @@ -265,7 +283,7 @@ type WriteContentRequest struct { func (m *WriteContentRequest) Reset() { *m = WriteContentRequest{} } func (*WriteContentRequest) ProtoMessage() {} -func (*WriteContentRequest) Descriptor() ([]byte, []int) { return fileDescriptorContent, []int{11} } +func (*WriteContentRequest) Descriptor() ([]byte, []int) { return fileDescriptorContent, []int{13} } // WriteContentResponse is returned on the culmination of a write call. type WriteContentResponse struct { @@ -299,7 +317,7 @@ type WriteContentResponse struct { func (m *WriteContentResponse) Reset() { *m = WriteContentResponse{} } func (*WriteContentResponse) ProtoMessage() {} -func (*WriteContentResponse) Descriptor() ([]byte, []int) { return fileDescriptorContent, []int{12} } +func (*WriteContentResponse) Descriptor() ([]byte, []int) { return fileDescriptorContent, []int{14} } type AbortRequest struct { Ref string `protobuf:"bytes,1,opt,name=ref,proto3" json:"ref,omitempty"` @@ -307,7 +325,7 @@ type AbortRequest struct { func (m *AbortRequest) Reset() { *m = AbortRequest{} } func (*AbortRequest) ProtoMessage() {} -func (*AbortRequest) Descriptor() ([]byte, []int) { return fileDescriptorContent, []int{13} } +func (*AbortRequest) Descriptor() ([]byte, []int) { return fileDescriptorContent, []int{15} } func init() { proto.RegisterType((*Info)(nil), "containerd.services.content.v1.Info") @@ -321,6 +339,8 @@ func init() { proto.RegisterType((*Status)(nil), "containerd.services.content.v1.Status") proto.RegisterType((*StatusRequest)(nil), "containerd.services.content.v1.StatusRequest") proto.RegisterType((*StatusResponse)(nil), "containerd.services.content.v1.StatusResponse") + proto.RegisterType((*ListStatusesRequest)(nil), "containerd.services.content.v1.ListStatusesRequest") + proto.RegisterType((*ListStatusesResponse)(nil), "containerd.services.content.v1.ListStatusesResponse") proto.RegisterType((*WriteContentRequest)(nil), "containerd.services.content.v1.WriteContentRequest") proto.RegisterType((*WriteContentResponse)(nil), "containerd.services.content.v1.WriteContentResponse") proto.RegisterType((*AbortRequest)(nil), "containerd.services.content.v1.AbortRequest") @@ -356,13 +376,15 @@ type ContentClient interface { // // The requested data may be returned in one or more messages. Read(ctx context.Context, in *ReadContentRequest, opts ...grpc.CallOption) (Content_ReadClient, error) - // Status returns the status of ongoing object ingestions, started via + // Status returns the status for a single reference. + Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) + // ListStatuses returns the status of ongoing object ingestions, started via // Write. // // Only those matching the regular expression will be provided in the // response. If the provided regular expression is empty, all ingestions // will be provided. - Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) + ListStatuses(ctx context.Context, in *ListStatusesRequest, opts ...grpc.CallOption) (*ListStatusesResponse, error) // Write begins or resumes writes to a resource identified by a unique ref. // Only one active stream may exist at a time for each ref. // @@ -484,6 +506,15 @@ func (c *contentClient) Status(ctx context.Context, in *StatusRequest, opts ...g return out, nil } +func (c *contentClient) ListStatuses(ctx context.Context, in *ListStatusesRequest, opts ...grpc.CallOption) (*ListStatusesResponse, error) { + out := new(ListStatusesResponse) + err := grpc.Invoke(ctx, "/containerd.services.content.v1.Content/ListStatuses", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *contentClient) Write(ctx context.Context, opts ...grpc.CallOption) (Content_WriteClient, error) { stream, err := grpc.NewClientStream(ctx, &_Content_serviceDesc.Streams[2], c.cc, "/containerd.services.content.v1.Content/Write", opts...) if err != nil { @@ -545,13 +576,15 @@ type ContentServer interface { // // The requested data may be returned in one or more messages. Read(*ReadContentRequest, Content_ReadServer) error - // Status returns the status of ongoing object ingestions, started via + // Status returns the status for a single reference. + Status(context.Context, *StatusRequest) (*StatusResponse, error) + // ListStatuses returns the status of ongoing object ingestions, started via // Write. // // Only those matching the regular expression will be provided in the // response. If the provided regular expression is empty, all ingestions // will be provided. - Status(context.Context, *StatusRequest) (*StatusResponse, error) + ListStatuses(context.Context, *ListStatusesRequest) (*ListStatusesResponse, error) // Write begins or resumes writes to a resource identified by a unique ref. // Only one active stream may exist at a time for each ref. // @@ -674,6 +707,24 @@ func _Content_Status_Handler(srv interface{}, ctx context.Context, dec func(inte return interceptor(ctx, in, info, handler) } +func _Content_ListStatuses_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ListStatusesRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ContentServer).ListStatuses(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/containerd.services.content.v1.Content/ListStatuses", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ContentServer).ListStatuses(ctx, req.(*ListStatusesRequest)) + } + return interceptor(ctx, in, info, handler) +} + func _Content_Write_Handler(srv interface{}, stream grpc.ServerStream) error { return srv.(ContentServer).Write(&contentWriteServer{stream}) } @@ -734,6 +785,10 @@ var _Content_serviceDesc = grpc.ServiceDesc{ MethodName: "Status", Handler: _Content_Status_Handler, }, + { + MethodName: "ListStatuses", + Handler: _Content_ListStatuses_Handler, + }, { MethodName: "Abort", Handler: _Content_Abort_Handler, @@ -1053,11 +1108,11 @@ func (m *StatusRequest) MarshalTo(dAtA []byte) (int, error) { _ = i var l int _ = l - if len(m.Filter) > 0 { + if len(m.Ref) > 0 { dAtA[i] = 0xa i++ - i = encodeVarintContent(dAtA, i, uint64(len(m.Filter))) - i += copy(dAtA[i:], m.Filter) + i = encodeVarintContent(dAtA, i, uint64(len(m.Ref))) + i += copy(dAtA[i:], m.Ref) } return i, nil } @@ -1073,6 +1128,67 @@ func (m *StatusResponse) Marshal() (dAtA []byte, err error) { } func (m *StatusResponse) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Status != nil { + dAtA[i] = 0xa + i++ + i = encodeVarintContent(dAtA, i, uint64(m.Status.Size())) + n5, err := m.Status.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n5 + } + return i, nil +} + +func (m *ListStatusesRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ListStatusesRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Filters) > 0 { + for _, s := range m.Filters { + dAtA[i] = 0xa + i++ + l = len(s) + for l >= 1<<7 { + dAtA[i] = uint8(uint64(l)&0x7f | 0x80) + l >>= 7 + i++ + } + dAtA[i] = uint8(l) + i++ + i += copy(dAtA[i:], s) + } + } + return i, nil +} + +func (m *ListStatusesResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ListStatusesResponse) MarshalTo(dAtA []byte) (int, error) { var i int _ = i var l int @@ -1166,19 +1282,19 @@ func (m *WriteContentResponse) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x12 i++ i = encodeVarintContent(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.StartedAt))) - n5, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.StartedAt, dAtA[i:]) - if err != nil { - return 0, err - } - i += n5 - dAtA[i] = 0x1a - i++ - i = encodeVarintContent(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.UpdatedAt))) - n6, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.UpdatedAt, dAtA[i:]) + n6, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.StartedAt, dAtA[i:]) if err != nil { return 0, err } i += n6 + dAtA[i] = 0x1a + i++ + i = encodeVarintContent(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.UpdatedAt))) + n7, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.UpdatedAt, dAtA[i:]) + if err != nil { + return 0, err + } + i += n7 if m.Offset != 0 { dAtA[i] = 0x20 i++ @@ -1366,7 +1482,7 @@ func (m *Status) Size() (n int) { func (m *StatusRequest) Size() (n int) { var l int _ = l - l = len(m.Filter) + l = len(m.Ref) if l > 0 { n += 1 + l + sovContent(uint64(l)) } @@ -1374,6 +1490,28 @@ func (m *StatusRequest) Size() (n int) { } func (m *StatusResponse) Size() (n int) { + var l int + _ = l + if m.Status != nil { + l = m.Status.Size() + n += 1 + l + sovContent(uint64(l)) + } + return n +} + +func (m *ListStatusesRequest) Size() (n int) { + var l int + _ = l + if len(m.Filters) > 0 { + for _, s := range m.Filters { + l = len(s) + n += 1 + l + sovContent(uint64(l)) + } + } + return n +} + +func (m *ListStatusesResponse) Size() (n int) { var l int _ = l if len(m.Statuses) > 0 { @@ -1562,7 +1700,7 @@ func (this *StatusRequest) String() string { return "nil" } s := strings.Join([]string{`&StatusRequest{`, - `Filter:` + fmt.Sprintf("%v", this.Filter) + `,`, + `Ref:` + fmt.Sprintf("%v", this.Ref) + `,`, `}`, }, "") return s @@ -1572,6 +1710,26 @@ func (this *StatusResponse) String() string { return "nil" } s := strings.Join([]string{`&StatusResponse{`, + `Status:` + strings.Replace(fmt.Sprintf("%v", this.Status), "Status", "Status", 1) + `,`, + `}`, + }, "") + return s +} +func (this *ListStatusesRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&ListStatusesRequest{`, + `Filters:` + fmt.Sprintf("%v", this.Filters) + `,`, + `}`, + }, "") + return s +} +func (this *ListStatusesResponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&ListStatusesResponse{`, `Statuses:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Statuses), "Status", "Status", 1), `&`, ``, 1) + `,`, `}`, }, "") @@ -2576,7 +2734,7 @@ func (m *StatusRequest) Unmarshal(dAtA []byte) error { switch fieldNum { case 1: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Filter", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field Ref", wireType) } var stringLen uint64 for shift := uint(0); ; shift += 7 { @@ -2601,7 +2759,7 @@ func (m *StatusRequest) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Filter = string(dAtA[iNdEx:postIndex]) + m.Ref = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex default: iNdEx = preIndex @@ -2653,6 +2811,168 @@ func (m *StatusResponse) Unmarshal(dAtA []byte) error { return fmt.Errorf("proto: StatusResponse: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Status", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowContent + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthContent + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Status == nil { + m.Status = &Status{} + } + if err := m.Status.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipContent(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthContent + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ListStatusesRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowContent + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ListStatusesRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ListStatusesRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Filters", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowContent + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthContent + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Filters = append(m.Filters, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipContent(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthContent + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ListStatusesResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowContent + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ListStatusesResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ListStatusesResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { case 1: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Statuses", wireType) @@ -3286,61 +3606,63 @@ func init() { } var fileDescriptorContent = []byte{ - // 881 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0x4f, 0x6f, 0xe3, 0x44, - 0x14, 0xcf, 0x24, 0x8e, 0x69, 0x5f, 0xc2, 0x12, 0x26, 0xa1, 0x8a, 0x8c, 0x70, 0x82, 0x85, 0x20, - 0xda, 0x65, 0x9d, 0x6e, 0xba, 0x37, 0x24, 0x44, 0x92, 0x5d, 0xa0, 0x88, 0x2e, 0x92, 0x37, 0xb0, - 0xa2, 0x17, 0xe4, 0x24, 0x13, 0x63, 0x29, 0xf1, 0x78, 0xed, 0x49, 0x04, 0x9c, 0xb8, 0x20, 0xa1, - 0x8a, 0x03, 0x5f, 0xa0, 0x17, 0xe0, 0xce, 0x9d, 0x4f, 0xd0, 0x23, 0x47, 0xb4, 0x87, 0x96, 0xe6, - 0x83, 0x20, 0xe4, 0xf1, 0xd8, 0x71, 0x9a, 0x96, 0xfc, 0x69, 0x38, 0xe5, 0x79, 0xfc, 0x7e, 0xbf, - 0xf7, 0xef, 0x97, 0x79, 0x86, 0x0f, 0x2d, 0x9b, 0x7d, 0x3d, 0xee, 0xea, 0x3d, 0x3a, 0xaa, 0xf7, - 0xa8, 0xc3, 0x4c, 0xdb, 0x21, 0x5e, 0x3f, 0x69, 0x9a, 0xae, 0x5d, 0xf7, 0x89, 0x37, 0xb1, 0x7b, - 0xc4, 0xe7, 0xe7, 0xc4, 0x61, 0xf5, 0xc9, 0x83, 0xc8, 0xd4, 0x5d, 0x8f, 0x32, 0x8a, 0xd5, 0x19, - 0x42, 0x8f, 0xbc, 0xf5, 0xc8, 0x65, 0xf2, 0x40, 0x29, 0x59, 0xd4, 0xa2, 0xdc, 0xb5, 0x1e, 0x58, - 0x21, 0x4a, 0xa9, 0x58, 0x94, 0x5a, 0x43, 0x52, 0xe7, 0x4f, 0xdd, 0xf1, 0xa0, 0xce, 0xec, 0x11, - 0xf1, 0x99, 0x39, 0x72, 0x85, 0xc3, 0xeb, 0x57, 0x1d, 0xc8, 0xc8, 0x65, 0xdf, 0x86, 0x2f, 0xb5, - 0xdf, 0x11, 0x48, 0x87, 0xce, 0x80, 0xe2, 0x4f, 0x40, 0xee, 0xdb, 0x16, 0xf1, 0x59, 0x19, 0x55, - 0x51, 0x6d, 0xb7, 0xd5, 0x38, 0x3b, 0xaf, 0xa4, 0x5e, 0x9c, 0x57, 0xee, 0x26, 0x8a, 0xa3, 0x2e, - 0x71, 0xe2, 0x1c, 0xfd, 0xba, 0x45, 0xef, 0x87, 0x10, 0xfd, 0x11, 0xff, 0x31, 0x04, 0x03, 0xc6, - 0x20, 0xf9, 0xf6, 0x77, 0xa4, 0x9c, 0xae, 0xa2, 0x5a, 0xc6, 0xe0, 0x36, 0xfe, 0x08, 0xf2, 0x3d, - 0x3a, 0x1a, 0xd9, 0x8c, 0x91, 0xfe, 0x57, 0x26, 0x2b, 0x67, 0xaa, 0xa8, 0x96, 0x6b, 0x28, 0x7a, - 0x98, 0x9c, 0x1e, 0x25, 0xa7, 0x77, 0xa2, 0xec, 0x5b, 0x3b, 0x41, 0x06, 0x3f, 0x5f, 0x54, 0x90, - 0x91, 0x8b, 0x91, 0x4d, 0xa6, 0x7d, 0x09, 0xb9, 0x20, 0x61, 0x83, 0x3c, 0x1f, 0x07, 0xb1, 0xb6, - 0x98, 0xb7, 0xf6, 0x04, 0xf2, 0x21, 0xb5, 0xef, 0x52, 0xc7, 0x27, 0xf8, 0x7d, 0x90, 0x6c, 0x67, - 0x40, 0x39, 0x73, 0xae, 0xf1, 0x96, 0xfe, 0xdf, 0xf3, 0xd1, 0x03, 0x6c, 0x4b, 0x0a, 0xe2, 0x1b, - 0x1c, 0xa7, 0x95, 0x00, 0x7f, 0x6a, 0xfb, 0xac, 0x1d, 0xba, 0x88, 0x8c, 0xb5, 0xcf, 0xa1, 0x38, - 0x77, 0xba, 0x10, 0x2c, 0xb3, 0x51, 0xb0, 0x2e, 0x94, 0x1e, 0x91, 0x21, 0x61, 0x64, 0x3e, 0xdc, - 0x56, 0x1b, 0xf4, 0x13, 0x02, 0x6c, 0x10, 0xb3, 0xff, 0xff, 0x85, 0xc0, 0x7b, 0x20, 0xd3, 0xc1, - 0xc0, 0x27, 0x4c, 0xa8, 0x47, 0x3c, 0xc5, 0x9a, 0xca, 0xcc, 0x34, 0xa5, 0x35, 0xa1, 0x38, 0x97, - 0x8d, 0xe8, 0xe4, 0x8c, 0x02, 0x5d, 0xa5, 0xe8, 0x9b, 0xcc, 0xe4, 0xc4, 0x79, 0x83, 0xdb, 0xda, - 0x2f, 0x69, 0x90, 0x9f, 0x32, 0x93, 0x8d, 0x7d, 0xdc, 0x06, 0xf0, 0x99, 0xe9, 0x09, 0x7d, 0xa2, - 0x35, 0xf4, 0xb9, 0x2b, 0x70, 0x4d, 0x16, 0x90, 0x8c, 0xdd, 0xbe, 0x29, 0x48, 0xd2, 0xeb, 0x90, - 0x08, 0x5c, 0x93, 0xe1, 0x02, 0x64, 0x3c, 0x32, 0xe0, 0xa5, 0xee, 0x1a, 0x81, 0x99, 0x28, 0x49, - 0x9a, 0x2b, 0xa9, 0x04, 0x59, 0x46, 0x99, 0x39, 0x2c, 0x67, 0xf9, 0x71, 0xf8, 0x80, 0x9f, 0xc0, - 0x0e, 0xf9, 0xc6, 0x25, 0x3d, 0x46, 0xfa, 0x65, 0x79, 0xe3, 0x89, 0xc4, 0x1c, 0xda, 0x3b, 0xf0, - 0x72, 0xd8, 0xa3, 0x68, 0xe0, 0x7b, 0x20, 0x0f, 0xec, 0x21, 0x23, 0x5e, 0x38, 0x70, 0x43, 0x3c, - 0x69, 0xc7, 0x70, 0x27, 0x72, 0x14, 0xb3, 0xf8, 0x18, 0x76, 0x7c, 0x7e, 0x42, 0x7c, 0xa1, 0xec, - 0xb7, 0x97, 0x29, 0x3b, 0x64, 0x10, 0xda, 0x8e, 0xd1, 0xda, 0x3f, 0x08, 0x8a, 0xcf, 0x3c, 0x7b, - 0x41, 0xdf, 0x6d, 0x90, 0xcd, 0x1e, 0xb3, 0xa9, 0xc3, 0x73, 0xb9, 0xd3, 0xb8, 0xb7, 0x8c, 0x9f, - 0x93, 0x34, 0x39, 0xc4, 0x10, 0xd0, 0xa8, 0xe3, 0xe9, 0x59, 0xc7, 0xe3, 0xce, 0x66, 0x6e, 0xea, - 0xac, 0x74, 0xfb, 0xce, 0x26, 0xe6, 0x9a, 0xbd, 0x56, 0xaa, 0x72, 0x42, 0xaa, 0x17, 0x69, 0x28, - 0xcd, 0x37, 0x40, 0xf4, 0x78, 0x2b, 0x1d, 0x98, 0x57, 0x7f, 0x7a, 0x1b, 0xea, 0xcf, 0x6c, 0xa6, - 0xfe, 0xf5, 0xb4, 0x3e, 0xbb, 0x7b, 0xe4, 0x5b, 0x5f, 0x6f, 0x55, 0xc8, 0x37, 0xbb, 0xd4, 0x8b, - 0xa5, 0x25, 0x54, 0x81, 0x62, 0x55, 0xdc, 0xfd, 0x01, 0x41, 0x2e, 0xd1, 0x3d, 0xfc, 0x06, 0x48, - 0x4f, 0x3b, 0xcd, 0x4e, 0x21, 0xa5, 0x14, 0x4f, 0x4e, 0xab, 0xaf, 0x24, 0x5e, 0x05, 0x2a, 0xc6, - 0x15, 0xc8, 0x3e, 0x33, 0x0e, 0x3b, 0x8f, 0x0b, 0x48, 0x29, 0x9d, 0x9c, 0x56, 0x0b, 0x89, 0xf7, - 0xdc, 0xc4, 0x6f, 0x82, 0xdc, 0xfe, 0xec, 0xe8, 0xe8, 0xb0, 0x53, 0x48, 0x2b, 0xaf, 0x9d, 0x9c, - 0x56, 0x5f, 0x4d, 0x78, 0xb4, 0xf9, 0xd2, 0x53, 0x8a, 0x3f, 0xfe, 0xaa, 0xa6, 0xfe, 0xf8, 0x4d, - 0x4d, 0xc6, 0x6d, 0xbc, 0xc8, 0xc2, 0x4b, 0x42, 0x06, 0xd8, 0x14, 0x1b, 0xfc, 0xde, 0x2a, 0x2b, - 0x43, 0x94, 0xa6, 0xbc, 0xbb, 0x9a, 0xb3, 0x50, 0xd8, 0x73, 0x90, 0x82, 0x95, 0x85, 0x1b, 0xcb, - 0x50, 0x8b, 0xeb, 0x4e, 0x39, 0x58, 0x0b, 0x13, 0x06, 0xdc, 0x47, 0xf8, 0x0b, 0x90, 0xc3, 0x75, - 0x86, 0x1f, 0x2e, 0x23, 0xb8, 0x6e, 0xed, 0x29, 0x7b, 0x0b, 0xb2, 0x7b, 0x1c, 0x7c, 0xf6, 0x04, - 0xa5, 0x04, 0x3b, 0x63, 0x79, 0x29, 0x8b, 0x7b, 0x6e, 0x79, 0x29, 0xd7, 0x6c, 0xa3, 0x7d, 0x84, - 0xad, 0x78, 0xc5, 0xdc, 0x5f, 0xed, 0xee, 0x8b, 0xe2, 0xe9, 0xab, 0xba, 0x8b, 0x31, 0x4d, 0x20, - 0x1b, 0xca, 0xea, 0x60, 0xa5, 0x1b, 0xe0, 0x4a, 0x75, 0x0f, 0xd7, 0x03, 0x85, 0x31, 0x6b, 0x68, - 0x1f, 0xe1, 0x23, 0xc8, 0xf2, 0xff, 0x0d, 0x5e, 0xaa, 0xaa, 0xe4, 0xdf, 0xeb, 0xa6, 0x11, 0xb5, - 0x8e, 0xcf, 0x2e, 0xd5, 0xd4, 0x5f, 0x97, 0x6a, 0xea, 0xfb, 0xa9, 0x8a, 0xce, 0xa6, 0x2a, 0xfa, - 0x73, 0xaa, 0xa2, 0xbf, 0xa7, 0x2a, 0x3a, 0xfe, 0x60, 0xd3, 0x2f, 0xed, 0xf7, 0x84, 0xd9, 0x95, - 0x79, 0xac, 0x83, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0x26, 0x49, 0x54, 0x8e, 0xb4, 0x0b, 0x00, - 0x00, + // 927 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x57, 0x4f, 0x8f, 0xdb, 0x44, + 0x14, 0xcf, 0x24, 0x8e, 0xbb, 0xfb, 0x12, 0x4a, 0x98, 0x84, 0x2a, 0x32, 0xc2, 0x49, 0x2d, 0x84, + 0x56, 0x2d, 0xb5, 0xb7, 0xd9, 0xbd, 0x21, 0x21, 0xb2, 0x69, 0x81, 0x45, 0x6c, 0x41, 0x6e, 0xa0, + 0xa2, 0x17, 0x70, 0x92, 0x89, 0xb1, 0x94, 0x78, 0x5c, 0x7b, 0x12, 0x51, 0x4e, 0x5c, 0x90, 0xd0, + 0x8a, 0x03, 0x5f, 0x60, 0x2f, 0xc0, 0x9d, 0x3b, 0x9f, 0x60, 0x8f, 0x5c, 0x90, 0x10, 0x87, 0x96, + 0xe6, 0x83, 0x20, 0xe4, 0xf1, 0xd8, 0x71, 0x92, 0x5d, 0xf2, 0x67, 0xc3, 0x69, 0xdf, 0x8c, 0xdf, + 0xef, 0xfd, 0xff, 0xcd, 0xcb, 0xc2, 0x7b, 0xb6, 0xc3, 0xbe, 0x1a, 0x75, 0xf4, 0x2e, 0x1d, 0x1a, + 0x5d, 0xea, 0x32, 0xcb, 0x71, 0x89, 0xdf, 0x4b, 0x8b, 0x96, 0xe7, 0x18, 0x01, 0xf1, 0xc7, 0x4e, + 0x97, 0x04, 0xfc, 0x9e, 0xb8, 0xcc, 0x18, 0xdf, 0x8d, 0x45, 0xdd, 0xf3, 0x29, 0xa3, 0x58, 0x9d, + 0x22, 0xf4, 0x58, 0x5b, 0x8f, 0x55, 0xc6, 0x77, 0x95, 0x8a, 0x4d, 0x6d, 0xca, 0x55, 0x8d, 0x50, + 0x8a, 0x50, 0x4a, 0xcd, 0xa6, 0xd4, 0x1e, 0x10, 0x83, 0x9f, 0x3a, 0xa3, 0xbe, 0xc1, 0x9c, 0x21, + 0x09, 0x98, 0x35, 0xf4, 0x84, 0xc2, 0x6b, 0xf3, 0x0a, 0x64, 0xe8, 0xb1, 0xa7, 0xd1, 0x47, 0xed, + 0x57, 0x04, 0xd2, 0xb1, 0xdb, 0xa7, 0xf8, 0x43, 0x90, 0x7b, 0x8e, 0x4d, 0x02, 0x56, 0x45, 0x75, + 0xb4, 0xb7, 0x7b, 0xd4, 0x38, 0x7f, 0x56, 0xcb, 0xfc, 0xf5, 0xac, 0x76, 0x2b, 0x95, 0x1c, 0xf5, + 0x88, 0x9b, 0xc4, 0x18, 0x18, 0x36, 0xbd, 0x13, 0x41, 0xf4, 0x7b, 0xfc, 0x8f, 0x29, 0x2c, 0x60, + 0x0c, 0x52, 0xe0, 0x7c, 0x43, 0xaa, 0xd9, 0x3a, 0xda, 0xcb, 0x99, 0x5c, 0xc6, 0xef, 0x43, 0xb1, + 0x4b, 0x87, 0x43, 0x87, 0x31, 0xd2, 0xfb, 0xc2, 0x62, 0xd5, 0x5c, 0x1d, 0xed, 0x15, 0x1a, 0x8a, + 0x1e, 0x05, 0xa7, 0xc7, 0xc1, 0xe9, 0xed, 0x38, 0xfa, 0xa3, 0x9d, 0x30, 0x82, 0x1f, 0x9f, 0xd7, + 0x90, 0x59, 0x48, 0x90, 0x4d, 0xa6, 0x7d, 0x0e, 0x85, 0x30, 0x60, 0x93, 0x3c, 0x19, 0x85, 0xbe, + 0xb6, 0x18, 0xb7, 0xf6, 0x00, 0x8a, 0x91, 0xe9, 0xc0, 0xa3, 0x6e, 0x40, 0xf0, 0x3b, 0x20, 0x39, + 0x6e, 0x9f, 0x72, 0xcb, 0x85, 0xc6, 0x1b, 0xfa, 0x7f, 0xf7, 0x47, 0x0f, 0xb1, 0x47, 0x52, 0xe8, + 0xdf, 0xe4, 0x38, 0xad, 0x02, 0xf8, 0x23, 0x27, 0x60, 0xad, 0x48, 0x45, 0x44, 0xac, 0x7d, 0x0a, + 0xe5, 0x99, 0xdb, 0x05, 0x67, 0xb9, 0x8d, 0x9c, 0x75, 0xa0, 0x72, 0x8f, 0x0c, 0x08, 0x23, 0xb3, + 0xee, 0xb6, 0x5a, 0xa0, 0x1f, 0x10, 0x60, 0x93, 0x58, 0xbd, 0xff, 0xcf, 0x05, 0xbe, 0x01, 0x32, + 0xed, 0xf7, 0x03, 0xc2, 0xc4, 0xf4, 0x88, 0x53, 0x32, 0x53, 0xb9, 0xe9, 0x4c, 0x69, 0x4d, 0x28, + 0xcf, 0x44, 0x23, 0x2a, 0x39, 0x35, 0x81, 0xe6, 0x4d, 0xf4, 0x2c, 0x66, 0x71, 0xc3, 0x45, 0x93, + 0xcb, 0xda, 0x4f, 0x59, 0x90, 0x1f, 0x32, 0x8b, 0x8d, 0x02, 0xdc, 0x02, 0x08, 0x98, 0xe5, 0x8b, + 0xf9, 0x44, 0x6b, 0xcc, 0xe7, 0xae, 0xc0, 0x35, 0x59, 0x68, 0x64, 0xe4, 0xf5, 0x2c, 0x61, 0x24, + 0xbb, 0x8e, 0x11, 0x81, 0x6b, 0x32, 0x5c, 0x82, 0x9c, 0x4f, 0xfa, 0x3c, 0xd5, 0x5d, 0x33, 0x14, + 0x53, 0x29, 0x49, 0x33, 0x29, 0x55, 0x20, 0xcf, 0x28, 0xb3, 0x06, 0xd5, 0x3c, 0xbf, 0x8e, 0x0e, + 0xf8, 0x01, 0xec, 0x90, 0xaf, 0x3d, 0xd2, 0x65, 0xa4, 0x57, 0x95, 0x37, 0xee, 0x48, 0x62, 0x43, + 0xbb, 0x09, 0x2f, 0x45, 0x35, 0x8a, 0x1b, 0x2e, 0x02, 0x44, 0x49, 0x80, 0xda, 0x27, 0x70, 0x3d, + 0x56, 0x49, 0xe6, 0x59, 0x0e, 0xf8, 0x8d, 0x28, 0xe5, 0x9b, 0xcb, 0x26, 0x5a, 0xe0, 0x05, 0x4a, + 0x33, 0x22, 0x9a, 0x44, 0xb7, 0x24, 0x71, 0x5d, 0x85, 0x6b, 0x7d, 0x67, 0xc0, 0x88, 0x1f, 0x70, + 0xa6, 0xec, 0x9a, 0xf1, 0x51, 0xfb, 0x12, 0x2a, 0xb3, 0x00, 0x11, 0xc8, 0x07, 0xb0, 0x13, 0x88, + 0x3b, 0x41, 0xae, 0x15, 0x43, 0x11, 0xf4, 0x4a, 0xd0, 0xda, 0x3f, 0x08, 0xca, 0x8f, 0x7c, 0x67, + 0x81, 0x62, 0x2d, 0x90, 0xad, 0x2e, 0x73, 0xa8, 0xcb, 0x53, 0xbd, 0xde, 0xb8, 0xbd, 0xcc, 0x3e, + 0x37, 0xd2, 0xe4, 0x10, 0x53, 0x40, 0xe3, 0x9a, 0x66, 0xa7, 0x4d, 0x4f, 0x9a, 0x9b, 0xbb, 0xac, + 0xb9, 0xd2, 0xd5, 0x9b, 0x9b, 0x1a, 0xad, 0xfc, 0x85, 0x6c, 0x91, 0x53, 0x6c, 0x79, 0x9e, 0x85, + 0xca, 0x6c, 0x01, 0x44, 0x8d, 0xb7, 0x52, 0x81, 0x59, 0x02, 0x66, 0xb7, 0x41, 0xc0, 0xdc, 0x66, + 0x04, 0x5c, 0x8f, 0x6e, 0xd3, 0xe7, 0x4f, 0xbe, 0xf2, 0x0b, 0x5b, 0x87, 0x62, 0xb3, 0x43, 0x7d, + 0x76, 0x29, 0xd3, 0x6e, 0x7d, 0x87, 0xa0, 0x90, 0xaa, 0x1e, 0x7e, 0x1d, 0xa4, 0x87, 0xed, 0x66, + 0xbb, 0x94, 0x51, 0xca, 0xa7, 0x67, 0xf5, 0x97, 0x53, 0x9f, 0xc2, 0x29, 0xc6, 0x35, 0xc8, 0x3f, + 0x32, 0x8f, 0xdb, 0xf7, 0x4b, 0x48, 0xa9, 0x9c, 0x9e, 0xd5, 0x4b, 0xa9, 0xef, 0x5c, 0xc4, 0x37, + 0x41, 0x6e, 0x7d, 0x7c, 0x72, 0x72, 0xdc, 0x2e, 0x65, 0x95, 0x57, 0x4f, 0xcf, 0xea, 0xaf, 0xa4, + 0x34, 0x5a, 0x7c, 0xef, 0x2a, 0xe5, 0xef, 0x7f, 0x56, 0x33, 0xbf, 0xfd, 0xa2, 0xa6, 0xfd, 0x36, + 0xfe, 0x90, 0xe1, 0x9a, 0x18, 0x03, 0x6c, 0x89, 0x1f, 0x11, 0xb7, 0x57, 0xd9, 0x5a, 0x22, 0x35, + 0xe5, 0xad, 0xd5, 0x94, 0xc5, 0x84, 0x3d, 0x01, 0x29, 0x64, 0x37, 0x6e, 0x2c, 0x43, 0x2d, 0x6e, + 0x5c, 0xe5, 0x60, 0x2d, 0x4c, 0xe4, 0x70, 0x1f, 0xe1, 0xcf, 0x40, 0x8e, 0x36, 0x2a, 0x3e, 0x5c, + 0x66, 0xe0, 0xa2, 0xcd, 0xab, 0xdc, 0x58, 0x18, 0xbb, 0xfb, 0xe1, 0x2f, 0xaf, 0x30, 0x95, 0x70, + 0x6d, 0x2d, 0x4f, 0x65, 0x71, 0xd5, 0x2e, 0x4f, 0xe5, 0x82, 0x85, 0xb8, 0x8f, 0xb0, 0x9d, 0x6c, + 0xb9, 0x3b, 0x2b, 0x3e, 0xc3, 0xc2, 0x9f, 0xbe, 0xaa, 0xba, 0x68, 0xd3, 0x53, 0x28, 0xa6, 0x1f, + 0x61, 0xbc, 0x52, 0xe9, 0xe7, 0xde, 0x78, 0xe5, 0x70, 0x3d, 0x90, 0x70, 0x3d, 0x86, 0x7c, 0x34, + 0xd1, 0x07, 0x2b, 0x3d, 0x3e, 0x73, 0x85, 0x3d, 0x5c, 0x0f, 0x14, 0xf9, 0xdc, 0x43, 0xfb, 0x08, + 0x9f, 0x40, 0x9e, 0x53, 0x16, 0x2f, 0x1d, 0xe8, 0x34, 0xb3, 0x2f, 0x9b, 0x8e, 0xa3, 0xc7, 0xe7, + 0x2f, 0xd4, 0xcc, 0x9f, 0x2f, 0xd4, 0xcc, 0xb7, 0x13, 0x15, 0x9d, 0x4f, 0x54, 0xf4, 0xfb, 0x44, + 0x45, 0x7f, 0x4f, 0x54, 0xf4, 0xf8, 0xdd, 0x4d, 0xff, 0xcf, 0x78, 0x5b, 0x88, 0x1d, 0x99, 0xfb, + 0x3a, 0xf8, 0x37, 0x00, 0x00, 0xff, 0xff, 0xc6, 0xb1, 0x1a, 0xcd, 0xb2, 0x0c, 0x00, 0x00, } diff --git a/api/services/content/v1/content.proto b/api/services/content/v1/content.proto index cb90edd9d..5bb554980 100644 --- a/api/services/content/v1/content.proto +++ b/api/services/content/v1/content.proto @@ -32,13 +32,16 @@ service Content { // The requested data may be returned in one or more messages. rpc Read(ReadContentRequest) returns (stream ReadContentResponse); - // Status returns the status of ongoing object ingestions, started via + // Status returns the status for a single reference. + rpc Status(StatusRequest) returns (StatusResponse); + + // ListStatuses returns the status of ongoing object ingestions, started via // Write. // // Only those matching the regular expression will be provided in the // response. If the provided regular expression is empty, all ingestions // will be provided. - rpc Status(StatusRequest) returns (StatusResponse); + rpc ListStatuses(ListStatusesRequest) returns (ListStatusesResponse); // Write begins or resumes writes to a resource identified by a unique ref. // Only one active stream may exist at a time for each ref. @@ -124,11 +127,20 @@ message Status { string expected = 6 [(gogoproto.customtype) = "github.com/opencontainers/go-digest.Digest", (gogoproto.nullable) = false]; } + message StatusRequest { - string filter = 1; + string ref = 1; } message StatusResponse { + Status status = 1; +} + +message ListStatusesRequest { + repeated string filters = 1; +} + +message ListStatusesResponse { repeated Status statuses = 1 [(gogoproto.nullable) = false]; } diff --git a/cmd/dist/active.go b/cmd/dist/active.go index a5935c5d2..ede21f2c4 100644 --- a/cmd/dist/active.go +++ b/cmd/dist/active.go @@ -40,7 +40,7 @@ var activeCommand = cli.Command{ return err } - active, err := cs.Status(ctx, match) + active, err := cs.ListStatuses(ctx, match) if err != nil { return err } diff --git a/cmd/dist/fetch.go b/cmd/dist/fetch.go index d8f2f6848..5258da8b8 100644 --- a/cmd/dist/fetch.go +++ b/cmd/dist/fetch.go @@ -123,7 +123,7 @@ outer: activeSeen := map[string]struct{}{} if !done { - active, err := cs.Status(ctx, "") + active, err := cs.ListStatuses(ctx, "") if err != nil { log.G(ctx).WithError(err).Error("active check failed") continue diff --git a/content/content.go b/content/content.go index ca236d66c..2c91b99d5 100644 --- a/content/content.go +++ b/content/content.go @@ -58,19 +58,19 @@ type Manager interface { // Delete removes the content from the store. Delete(ctx context.Context, dgst digest.Digest) error +} - // Status returns the status of any active ingestions whose ref match the +// IngestManager provides methods for managing ingests. +type IngestManager interface { + // Status returns the status of the provided ref. + Status(ctx context.Context, ref string) (Status, error) + + // ListStatuses returns the status of any active ingestions whose ref match the // provided regular expression. If empty, all active ingestions will be // returned. - // - // TODO(stevvooe): Status may be slighly out of place here. If this remains - // here, we should remove Manager and just define these on store. - Status(ctx context.Context, re string) ([]Status, error) + ListStatuses(ctx context.Context, filters ...string) ([]Status, error) // Abort completely cancels the ingest operation targeted by ref. - // - // TODO(stevvooe): Same consideration as above. This should really be - // restricted to an ingest management interface. Abort(ctx context.Context, ref string) error } @@ -86,6 +86,7 @@ type Writer interface { // are commonly provided by complete implementations. type Store interface { Manager - Ingester Provider + IngestManager + Ingester } diff --git a/content/content_test.go b/content/content_test.go index 8942cbf03..b73cd61e4 100644 --- a/content/content_test.go +++ b/content/content_test.go @@ -52,7 +52,7 @@ func TestContentWriter(t *testing.T) { } // we should also see this as an active ingestion - ingestions, err := cs.Status(ctx, "") + ingestions, err := cs.ListStatuses(ctx, "") if err != nil { t.Fatal(err) } diff --git a/content/store.go b/content/store.go index a1092f791..ba95199d8 100644 --- a/content/store.go +++ b/content/store.go @@ -7,11 +7,11 @@ import ( "io/ioutil" "os" "path/filepath" - "regexp" "strconv" "time" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/filters" "github.com/containerd/containerd/log" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" @@ -136,7 +136,11 @@ func (cs *store) Walk(ctx context.Context, fn WalkFunc) error { }) } -func (s *store) Status(ctx context.Context, re string) ([]Status, error) { +func (s *store) Status(ctx context.Context, ref string) (Status, error) { + return s.status(s.ingestRoot(ref)) +} + +func (s *store) ListStatuses(ctx context.Context, fs ...string) ([]Status, error) { fp, err := os.Open(filepath.Join(s.root, "ingest")) if err != nil { return nil, err @@ -149,7 +153,7 @@ func (s *store) Status(ctx context.Context, re string) ([]Status, error) { return nil, err } - rec, err := regexp.Compile(re) + filter, err := filters.ParseAll(fs...) if err != nil { return nil, err } @@ -174,11 +178,9 @@ func (s *store) Status(ctx context.Context, re string) ([]Status, error) { continue } - if !rec.MatchString(stat.Ref) { - continue + if filter.Match(adaptStatus(stat)) { + active = append(active, stat) } - - active = append(active, stat) } return active, nil @@ -206,6 +208,20 @@ func (s *store) status(ingestPath string) (Status, error) { }, nil } +func adaptStatus(status Status) filters.Adaptor { + return filters.AdapterFunc(func(fieldpath []string) (string, bool) { + if len(fieldpath) == 0 { + return "", false + } + switch fieldpath[0] { + case "ref": + return status.Ref, true + } + + return "", false + }) +} + // total attempts to resolve the total expected size for the write. func (s *store) total(ingestPath string) int64 { totalS, err := readFileString(filepath.Join(ingestPath, "total")) diff --git a/differ/differ.go b/differ/differ.go index a39cb0375..76fedcced 100644 --- a/differ/differ.go +++ b/differ/differ.go @@ -5,9 +5,11 @@ import ( "io/ioutil" "os" + "github.com/boltdb/bolt" "github.com/containerd/containerd/archive" "github.com/containerd/containerd/archive/compression" "github.com/containerd/containerd/content" + "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/plugin" digest "github.com/opencontainers/go-digest" @@ -22,13 +24,18 @@ func init() { ID: "base-diff", Requires: []plugin.PluginType{ plugin.ContentPlugin, + plugin.MetadataPlugin, }, Init: func(ic *plugin.InitContext) (interface{}, error) { c, err := ic.Get(plugin.ContentPlugin) if err != nil { return nil, err } - return NewBaseDiff(c.(content.Store)) + md, err := ic.Get(plugin.MetadataPlugin) + if err != nil { + return nil, err + } + return NewBaseDiff(metadata.NewContentStore(md.(*bolt.DB), c.(content.Store))) }, }) } diff --git a/metadata/adaptors.go b/metadata/adaptors.go index 25aab0323..e3df7df80 100644 --- a/metadata/adaptors.go +++ b/metadata/adaptors.go @@ -4,6 +4,7 @@ import ( "strings" "github.com/containerd/containerd/containers" + "github.com/containerd/containerd/content" "github.com/containerd/containerd/filters" "github.com/containerd/containerd/images" ) @@ -69,6 +70,20 @@ func adaptContainer(o interface{}) filters.Adaptor { }) } +func adaptContentStatus(status content.Status) filters.Adaptor { + return filters.AdapterFunc(func(fieldpath []string) (string, bool) { + if len(fieldpath) == 0 { + return "", false + } + switch fieldpath[0] { + case "ref": + return status.Ref, true + } + + return "", false + }) +} + func checkMap(fieldpath []string, m map[string]string) (string, bool) { if len(m) == 0 { return "", false diff --git a/metadata/buckets.go b/metadata/buckets.go index ca555f395..0ba3126bb 100644 --- a/metadata/buckets.go +++ b/metadata/buckets.go @@ -2,6 +2,7 @@ package metadata import ( "github.com/boltdb/bolt" + digest "github.com/opencontainers/go-digest" ) // The layout where a "/" delineates a bucket is desribed in the following @@ -33,6 +34,9 @@ var ( bucketKeyObjectImages = []byte("images") // stores image objects bucketKeyObjectContainers = []byte("containers") // stores container objects bucketKeyObjectSnapshots = []byte("snapshots") // stores snapshot references + bucketKeyObjectContent = []byte("content") // stores content references + bucketKeyObjectBlob = []byte("blob") // stores content links + bucketKeyObjectIngest = []byte("ingest") // stores ingest links bucketKeyDigest = []byte("digest") bucketKeyMediaType = []byte("mediatype") @@ -139,3 +143,31 @@ func createSnapshotterBucket(tx *bolt.Tx, namespace, snapshotter string) (*bolt. func getSnapshotterBucket(tx *bolt.Tx, namespace, snapshotter string) *bolt.Bucket { return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectSnapshots, []byte(snapshotter)) } + +func createBlobBucket(tx *bolt.Tx, namespace string, dgst digest.Digest) (*bolt.Bucket, error) { + bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectBlob, []byte(dgst.String())) + if err != nil { + return nil, err + } + return bkt, nil +} + +func getBlobsBucket(tx *bolt.Tx, namespace string) *bolt.Bucket { + return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectBlob) +} + +func getBlobBucket(tx *bolt.Tx, namespace string, dgst digest.Digest) *bolt.Bucket { + return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectBlob, []byte(dgst.String())) +} + +func createIngestBucket(tx *bolt.Tx, namespace string) (*bolt.Bucket, error) { + bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngest) + if err != nil { + return nil, err + } + return bkt, nil +} + +func getIngestBucket(tx *bolt.Tx, namespace string) *bolt.Bucket { + return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngest) +} diff --git a/metadata/content.go b/metadata/content.go new file mode 100644 index 000000000..16aa0b3da --- /dev/null +++ b/metadata/content.go @@ -0,0 +1,382 @@ +package metadata + +import ( + "context" + "encoding/binary" + "io" + + "github.com/boltdb/bolt" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/filters" + "github.com/containerd/containerd/namespaces" + digest "github.com/opencontainers/go-digest" + "github.com/pkg/errors" +) + +type contentStore struct { + content.Store + db *bolt.DB +} + +// NewContentStore returns a namespaced content store using an existing +// content store interface. +func NewContentStore(db *bolt.DB, cs content.Store) content.Store { + return &contentStore{ + Store: cs, + db: db, + } +} + +func (cs *contentStore) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return content.Info{}, err + } + + var info content.Info + if err := view(ctx, cs.db, func(tx *bolt.Tx) error { + bkt := getBlobBucket(tx, ns, dgst) + if bkt == nil { + return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", dgst) + } + + info.Digest = dgst + return readInfo(&info, bkt) + }); err != nil { + return content.Info{}, err + } + + return info, nil +} + +func (cs *contentStore) Walk(ctx context.Context, fn content.WalkFunc) error { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + + // TODO: Batch results to keep from reading all info into memory + var infos []content.Info + if err := view(ctx, cs.db, func(tx *bolt.Tx) error { + bkt := getBlobsBucket(tx, ns) + if bkt == nil { + return nil + } + + return bkt.ForEach(func(k, v []byte) error { + dgst, err := digest.Parse(string(k)) + if err != nil { + return nil + } + info := content.Info{ + Digest: dgst, + } + if err := readInfo(&info, bkt.Bucket(k)); err != nil { + return err + } + infos = append(infos, info) + return nil + }) + }); err != nil { + return err + } + + for _, info := range infos { + if err := fn(info); err != nil { + return err + } + } + + return nil +} + +func (cs *contentStore) Delete(ctx context.Context, dgst digest.Digest) error { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + + return update(ctx, cs.db, func(tx *bolt.Tx) error { + bkt := getBlobBucket(tx, ns, dgst) + if bkt == nil { + return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", dgst) + } + + // Just remove local reference, garbage collector is responsible for + // cleaning up on disk content + return getBlobsBucket(tx, ns).Delete([]byte(dgst.String())) + }) +} + +func (cs *contentStore) ListStatuses(ctx context.Context, fs ...string) ([]content.Status, error) { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + + filter, err := filters.ParseAll(fs...) + if err != nil { + return nil, err + } + + brefs := map[string]string{} + if err := view(ctx, cs.db, func(tx *bolt.Tx) error { + bkt := getIngestBucket(tx, ns) + if bkt == nil { + return nil + } + + return bkt.ForEach(func(k, v []byte) error { + // TODO(dmcgowan): match name and potentially labels here + brefs[string(k)] = string(v) + return nil + }) + }); err != nil { + return nil, err + } + + statuses := make([]content.Status, 0, len(brefs)) + for k, bref := range brefs { + status, err := cs.Store.Status(ctx, bref) + if err != nil { + if errdefs.IsNotFound(err) { + continue + } + return nil, err + } + status.Ref = k + + if filter.Match(adaptContentStatus(status)) { + statuses = append(statuses, status) + } + } + + return statuses, nil + +} + +func getRef(tx *bolt.Tx, ns, ref string) string { + bkt := getIngestBucket(tx, ns) + if bkt == nil { + return "" + } + v := bkt.Get([]byte(ref)) + if len(v) == 0 { + return "" + } + return string(v) +} + +func (cs *contentStore) Status(ctx context.Context, ref string) (content.Status, error) { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return content.Status{}, err + } + + var bref string + if err := view(ctx, cs.db, func(tx *bolt.Tx) error { + bref = getRef(tx, ns, ref) + if bref == "" { + return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref) + } + + return nil + }); err != nil { + return content.Status{}, err + } + + return cs.Store.Status(ctx, bref) +} + +func (cs *contentStore) Abort(ctx context.Context, ref string) error { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + + return update(ctx, cs.db, func(tx *bolt.Tx) error { + bkt := getIngestBucket(tx, ns) + if bkt == nil { + return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref) + } + bref := string(bkt.Get([]byte(ref))) + if bref == "" { + return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref) + } + if err := bkt.Delete([]byte(ref)); err != nil { + return err + } + + return cs.Store.Abort(ctx, bref) + }) + +} + +func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + + var w content.Writer + if err := update(ctx, cs.db, func(tx *bolt.Tx) error { + if expected != "" { + cbkt := getBlobBucket(tx, ns, expected) + if cbkt != nil { + return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected) + } + } + + bkt, err := createIngestBucket(tx, ns) + if err != nil { + return err + } + + if len(bkt.Get([]byte(ref))) > 0 { + return errors.Wrapf(errdefs.ErrUnavailable, "ref %v is currently in use", ref) + } + + sid, err := bkt.NextSequence() + if err != nil { + return err + } + + bref := createKey(sid, ns, ref) + if err := bkt.Put([]byte(ref), []byte(bref)); err != nil { + return err + } + + // Do not use the passed in expected value here since it was + // already checked against the user metadata. If the content + // store has the content, it must still be written before + // linked into the given namespace. It is possible in the future + // to allow content which exists in content store but not + // namespace to be linked here and returned an exist error, but + // this would require more configuration to make secure. + w, err = cs.Store.Writer(ctx, bref, size, "") + return err + }); err != nil { + return nil, err + } + + // TODO: keep the expected in the writer to use on commit + // when no expected is provided there. + return &namespacedWriter{ + Writer: w, + ref: ref, + namespace: ns, + db: cs.db, + }, nil +} + +type namespacedWriter struct { + content.Writer + ref string + namespace string + db *bolt.DB +} + +func (nw *namespacedWriter) Commit(size int64, expected digest.Digest) error { + return nw.db.Update(func(tx *bolt.Tx) error { + bkt := getIngestBucket(tx, nw.namespace) + if bkt != nil { + if err := bkt.Delete([]byte(nw.ref)); err != nil { + return err + } + } + return nw.commit(tx, size, expected) + }) +} + +func (nw *namespacedWriter) commit(tx *bolt.Tx, size int64, expected digest.Digest) error { + status, err := nw.Writer.Status() + if err != nil { + return err + } + if size != 0 && size != status.Offset { + return errors.Errorf("%q failed size validation: %v != %v", nw.ref, status.Offset, size) + } + size = status.Offset + + actual := nw.Writer.Digest() + + if err := nw.Writer.Commit(size, expected); err != nil { + if !errdefs.IsAlreadyExists(err) { + return err + } + if getBlobBucket(tx, nw.namespace, actual) != nil { + return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual) + } + } + + bkt, err := createBlobBucket(tx, nw.namespace, actual) + if err != nil { + return err + } + + sizeEncoded, err := encodeSize(size) + if err != nil { + return err + } + + timeEncoded, err := status.UpdatedAt.MarshalBinary() + if err != nil { + return err + } + + for _, v := range [][2][]byte{ + {bucketKeyCreatedAt, timeEncoded}, + {bucketKeySize, sizeEncoded}, + } { + if err := bkt.Put(v[0], v[1]); err != nil { + return err + } + } + + return nil +} + +func (cs *contentStore) Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) { + if err := cs.checkAccess(ctx, dgst); err != nil { + return nil, err + } + return cs.Store.Reader(ctx, dgst) +} + +func (cs *contentStore) ReaderAt(ctx context.Context, dgst digest.Digest) (io.ReaderAt, error) { + if err := cs.checkAccess(ctx, dgst); err != nil { + return nil, err + } + return cs.Store.ReaderAt(ctx, dgst) +} + +func (cs *contentStore) checkAccess(ctx context.Context, dgst digest.Digest) error { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + + return view(ctx, cs.db, func(tx *bolt.Tx) error { + bkt := getBlobBucket(tx, ns, dgst) + if bkt == nil { + return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", dgst) + } + return nil + }) +} + +func readInfo(info *content.Info, bkt *bolt.Bucket) error { + return bkt.ForEach(func(k, v []byte) error { + switch string(k) { + case string(bucketKeyCreatedAt): + if err := info.CommittedAt.UnmarshalBinary(v); err != nil { + return err + } + case string(bucketKeySize): + info.Size, _ = binary.Varint(v) + } + // TODO: Read labels + return nil + }) +} diff --git a/metadata/images.go b/metadata/images.go index 97e6cc2af..b09529eab 100644 --- a/metadata/images.go +++ b/metadata/images.go @@ -244,14 +244,9 @@ func writeImage(bkt *bolt.Bucket, image *images.Image) error { return err } - var ( - buf [binary.MaxVarintLen64]byte - sizeEncoded []byte = buf[:] - ) - sizeEncoded = sizeEncoded[:binary.PutVarint(sizeEncoded, image.Target.Size)] - - if len(sizeEncoded) == 0 { - return fmt.Errorf("failed encoding size = %v", image.Target.Size) + sizeEncoded, err := encodeSize(image.Target.Size) + if err != nil { + return err } for _, v := range [][2][]byte{ @@ -266,3 +261,16 @@ func writeImage(bkt *bolt.Bucket, image *images.Image) error { return nil } + +func encodeSize(size int64) ([]byte, error) { + var ( + buf [binary.MaxVarintLen64]byte + sizeEncoded []byte = buf[:] + ) + sizeEncoded = sizeEncoded[:binary.PutVarint(sizeEncoded, size)] + + if len(sizeEncoded) == 0 { + return nil, fmt.Errorf("failed encoding size = %v", size) + } + return sizeEncoded, nil +} diff --git a/metadata/snapshot.go b/metadata/snapshot.go index 0ac3133ef..f2267ec53 100644 --- a/metadata/snapshot.go +++ b/metadata/snapshot.go @@ -29,11 +29,11 @@ func NewSnapshotter(db *bolt.DB, name string, sn snapshot.Snapshotter) snapshot. } } -func snapshotKey(id uint64, namespace, key string) string { +func createKey(id uint64, namespace, key string) string { return fmt.Sprintf("%s/%d/%s", namespace, id, key) } -func trimName(key string) string { +func trimKey(key string) string { parts := strings.SplitN(key, "/", 3) if len(parts) < 3 { return "" @@ -82,9 +82,9 @@ func (s *snapshotter) Stat(ctx context.Context, key string) (snapshot.Info, erro if err != nil { return snapshot.Info{}, err } - info.Name = trimName(info.Name) + info.Name = trimKey(info.Name) if info.Parent != "" { - info.Parent = trimName(info.Parent) + info.Parent = trimKey(info.Parent) } return info, nil @@ -143,7 +143,7 @@ func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, re if err != nil { return err } - bkey = snapshotKey(sid, ns, key) + bkey = createKey(sid, ns, key) if err := bkt.Put([]byte(key), []byte(bkey)); err != nil { return err } @@ -188,7 +188,7 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string) error { if err != nil { return err } - nameKey = snapshotKey(sid, ns, name) + nameKey = createKey(sid, ns, name) if err := bkt.Put([]byte(name), []byte(nameKey)); err != nil { return err } @@ -259,9 +259,9 @@ func (s *snapshotter) Walk(ctx context.Context, fn func(context.Context, snapsho return err } - info.Name = trimName(info.Name) + info.Name = trimKey(info.Name) if info.Parent != "" { - info.Parent = trimName(info.Parent) + info.Parent = trimKey(info.Parent) } if err := fn(ctx, info); err != nil { return err diff --git a/services/content/service.go b/services/content/service.go index 322d956da..affdc4386 100644 --- a/services/content/service.go +++ b/services/content/service.go @@ -5,12 +5,14 @@ import ( "sync" "github.com/Sirupsen/logrus" + "github.com/boltdb/bolt" api "github.com/containerd/containerd/api/services/content/v1" eventsapi "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/content" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events" "github.com/containerd/containerd/log" + "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/plugin" "github.com/golang/protobuf/ptypes/empty" digest "github.com/opencontainers/go-digest" @@ -39,6 +41,7 @@ func init() { ID: "content", Requires: []plugin.PluginType{ plugin.ContentPlugin, + plugin.MetadataPlugin, }, Init: NewService, }) @@ -49,8 +52,13 @@ func NewService(ic *plugin.InitContext) (interface{}, error) { if err != nil { return nil, err } + m, err := ic.Get(plugin.MetadataPlugin) + if err != nil { + return nil, err + } + cs := metadata.NewContentStore(m.(*bolt.DB), c.(content.Store)) return &Service{ - store: c.(content.Store), + store: cs, emitter: events.GetPoster(ic.Context), }, nil } @@ -216,12 +224,31 @@ func (rw *readResponseWriter) Write(p []byte) (n int, err error) { } func (s *Service) Status(ctx context.Context, req *api.StatusRequest) (*api.StatusResponse, error) { - statuses, err := s.store.Status(ctx, req.Filter) + status, err := s.store.Status(ctx, req.Ref) if err != nil { - return nil, errdefs.ToGRPCf(err, "could not get status for filter %q", req.Filter) + return nil, errdefs.ToGRPCf(err, "could not get status for ref %q", req.Ref) } var resp api.StatusResponse + resp.Status = &api.Status{ + StartedAt: status.StartedAt, + UpdatedAt: status.UpdatedAt, + Ref: status.Ref, + Offset: status.Offset, + Total: status.Total, + Expected: status.Expected, + } + + return &resp, nil +} + +func (s *Service) ListStatuses(ctx context.Context, req *api.ListStatusesRequest) (*api.ListStatusesResponse, error) { + statuses, err := s.store.ListStatuses(ctx, req.Filters...) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + + var resp api.ListStatusesResponse for _, status := range statuses { resp.Statuses = append(resp.Statuses, api.Status{ StartedAt: status.StartedAt, diff --git a/services/content/store.go b/services/content/store.go index 2153e3d6e..f03d429f9 100644 --- a/services/content/store.go +++ b/services/content/store.go @@ -94,9 +94,28 @@ func (rs *remoteStore) ReaderAt(ctx context.Context, dgst digest.Digest) (io.Rea }, nil } -func (rs *remoteStore) Status(ctx context.Context, filter string) ([]content.Status, error) { +func (rs *remoteStore) Status(ctx context.Context, ref string) (content.Status, error) { resp, err := rs.client.Status(ctx, &contentapi.StatusRequest{ - Filter: filter, + Ref: ref, + }) + if err != nil { + return content.Status{}, errdefs.FromGRPC(err) + } + + status := resp.Status + return content.Status{ + Ref: status.Ref, + StartedAt: status.StartedAt, + UpdatedAt: status.UpdatedAt, + Offset: status.Offset, + Total: status.Total, + Expected: status.Expected, + }, nil +} + +func (rs *remoteStore) ListStatuses(ctx context.Context, filters ...string) ([]content.Status, error) { + resp, err := rs.client.ListStatuses(ctx, &contentapi.ListStatusesRequest{ + Filters: filters, }) if err != nil { return nil, errdefs.FromGRPC(err) diff --git a/services/diff/service.go b/services/diff/service.go index b97908ace..b4d1a150b 100644 --- a/services/diff/service.go +++ b/services/diff/service.go @@ -3,6 +3,7 @@ package diff import ( diffapi "github.com/containerd/containerd/api/services/diff/v1" "github.com/containerd/containerd/api/types" + "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/plugin" "golang.org/x/net/context" @@ -45,7 +46,7 @@ func (s *service) Apply(ctx context.Context, er *diffapi.ApplyRequest) (*diffapi ocidesc, err := s.diff.Apply(ctx, desc, mounts) if err != nil { - return nil, err + return nil, errdefs.ToGRPC(err) } return &diffapi.ApplyResponse{ @@ -60,7 +61,7 @@ func (s *service) Diff(ctx context.Context, dr *diffapi.DiffRequest) (*diffapi.D ocidesc, err := s.diff.DiffMounts(ctx, aMounts, bMounts, dr.MediaType, dr.Ref) if err != nil { - return nil, err + return nil, errdefs.ToGRPC(err) } return &diffapi.DiffResponse{ diff --git a/services/tasks/service.go b/services/tasks/service.go index 63a273872..81bd61910 100644 --- a/services/tasks/service.go +++ b/services/tasks/service.go @@ -62,6 +62,7 @@ func New(ic *plugin.InitContext) (interface{}, error) { if err != nil { return nil, err } + cs := metadata.NewContentStore(m.(*bolt.DB), ct.(content.Store)) runtimes := make(map[string]runtime.Runtime) for _, rr := range rt { r := rr.(runtime.Runtime) @@ -71,7 +72,7 @@ func New(ic *plugin.InitContext) (interface{}, error) { return &Service{ runtimes: runtimes, db: m.(*bolt.DB), - store: ct.(content.Store), + store: cs, emitter: e, }, nil } diff --git a/task.go b/task.go index bfbaa6453..60f894686 100644 --- a/task.go +++ b/task.go @@ -346,7 +346,7 @@ func writeContent(ctx context.Context, store content.Store, mediaType, ref strin if err != nil { return d, err } - if err := writer.Commit(0, ""); err != nil { + if err := writer.Commit(size, ""); err != nil { return d, err } return v1.Descriptor{