From 2c9004d431df0e02eabbb293d8c2be819869ad48 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Fri, 23 Jun 2017 13:04:06 -0700 Subject: [PATCH 1/4] Add namespace content store in metadata Add a metadata store for content which enforces content is only visible inside a given namespace. Signed-off-by: Derek McGowan --- differ/differ.go | 9 +- metadata/buckets.go | 18 +++ metadata/content.go | 281 ++++++++++++++++++++++++++++++++++++ metadata/images.go | 24 ++- services/content/service.go | 10 +- 5 files changed, 332 insertions(+), 10 deletions(-) create mode 100644 metadata/content.go diff --git a/differ/differ.go b/differ/differ.go index a39cb0375..76fedcced 100644 --- a/differ/differ.go +++ b/differ/differ.go @@ -5,9 +5,11 @@ import ( "io/ioutil" "os" + "github.com/boltdb/bolt" "github.com/containerd/containerd/archive" "github.com/containerd/containerd/archive/compression" "github.com/containerd/containerd/content" + "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/plugin" digest "github.com/opencontainers/go-digest" @@ -22,13 +24,18 @@ func init() { ID: "base-diff", Requires: []plugin.PluginType{ plugin.ContentPlugin, + plugin.MetadataPlugin, }, Init: func(ic *plugin.InitContext) (interface{}, error) { c, err := ic.Get(plugin.ContentPlugin) if err != nil { return nil, err } - return NewBaseDiff(c.(content.Store)) + md, err := ic.Get(plugin.MetadataPlugin) + if err != nil { + return nil, err + } + return NewBaseDiff(metadata.NewContentStore(md.(*bolt.DB), c.(content.Store))) }, }) } diff --git a/metadata/buckets.go b/metadata/buckets.go index ca555f395..d4be495c1 100644 --- a/metadata/buckets.go +++ b/metadata/buckets.go @@ -2,6 +2,7 @@ package metadata import ( "github.com/boltdb/bolt" + digest "github.com/opencontainers/go-digest" ) // The layout where a "/" delineates a bucket is desribed in the following @@ -33,6 +34,7 @@ var ( bucketKeyObjectImages = []byte("images") // stores image objects bucketKeyObjectContainers = []byte("containers") // stores container objects bucketKeyObjectSnapshots = []byte("snapshots") // stores snapshot references + bucketKeyObjectContent = []byte("content") // stores content links bucketKeyDigest = []byte("digest") bucketKeyMediaType = []byte("mediatype") @@ -139,3 +141,19 @@ func createSnapshotterBucket(tx *bolt.Tx, namespace, snapshotter string) (*bolt. func getSnapshotterBucket(tx *bolt.Tx, namespace, snapshotter string) *bolt.Bucket { return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectSnapshots, []byte(snapshotter)) } + +func createContentBucket(tx *bolt.Tx, namespace string, dgst digest.Digest) (*bolt.Bucket, error) { + bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, []byte(dgst.String())) + if err != nil { + return nil, err + } + return bkt, nil +} + +func getAllContentBucket(tx *bolt.Tx, namespace string) *bolt.Bucket { + return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent) +} + +func getContentBucket(tx *bolt.Tx, namespace string, dgst digest.Digest) *bolt.Bucket { + return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, []byte(dgst.String())) +} diff --git a/metadata/content.go b/metadata/content.go new file mode 100644 index 000000000..e53e01457 --- /dev/null +++ b/metadata/content.go @@ -0,0 +1,281 @@ +package metadata + +import ( + "context" + "encoding/binary" + "io" + + "github.com/boltdb/bolt" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/namespaces" + digest "github.com/opencontainers/go-digest" +) + +type contentStore struct { + content.Store + db *bolt.DB +} + +// NewContentStore returns a namespaced content store using an existing +// content store interface. +func NewContentStore(db *bolt.DB, cs content.Store) content.Store { + return &contentStore{ + Store: cs, + db: db, + } +} + +func (cs *contentStore) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return content.Info{}, err + } + + var info content.Info + if err := view(ctx, cs.db, func(tx *bolt.Tx) error { + bkt := getContentBucket(tx, ns, dgst) + if bkt == nil { + return content.ErrNotFound("") + } + + info.Digest = dgst + return readInfo(&info, bkt) + }); err != nil { + return content.Info{}, err + } + + return info, nil +} + +func (cs *contentStore) Walk(ctx context.Context, fn content.WalkFunc) error { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + + // TODO: Batch results to keep from reading all info into memory + var infos []content.Info + if err := view(ctx, cs.db, func(tx *bolt.Tx) error { + bkt := getAllContentBucket(tx, ns) + if bkt == nil { + return nil + } + + return bkt.ForEach(func(k, v []byte) error { + dgst, err := digest.Parse(string(k)) + if err != nil { + return nil + } + info := content.Info{ + Digest: dgst, + } + if err := readInfo(&info, bkt.Bucket(k)); err != nil { + return err + } + infos = append(infos, info) + return nil + }) + }); err != nil { + return err + } + + for _, info := range infos { + if err := fn(info); err != nil { + return err + } + } + + return nil +} + +func (cs *contentStore) Delete(ctx context.Context, dgst digest.Digest) error { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + + return update(ctx, cs.db, func(tx *bolt.Tx) error { + bkt := getContentBucket(tx, ns, dgst) + if bkt == nil { + return content.ErrNotFound("") + } + + // Just remove local reference, garbage collector is responsible for + // cleaning up on disk content + return getAllContentBucket(tx, ns).Delete([]byte(dgst.String())) + }) +} + +func (cs *contentStore) Status(ctx context.Context, re string) ([]content.Status, error) { + _, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + + // TODO: Read status keys and match + + return cs.Store.Status(ctx, re) +} + +func (cs *contentStore) Abort(ctx context.Context, ref string) error { + _, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + + // TODO: Read status key and delete + + return cs.Store.Abort(ctx, ref) +} + +func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + + // TODO: Create ref key + + if expected != "" { + if err := view(ctx, cs.db, func(tx *bolt.Tx) error { + bkt := getContentBucket(tx, ns, expected) + if bkt != nil { + return content.ErrExists("") + } + return nil + }); err != nil { + return nil, err + } + } + + // Do not use the passed in expected value here since it was + // already checked against the user metadata. If the content + // store has the content, it must still be written before + // linked into the given namespace. It is possible in the future + // to allow content which exists in content store but not + // namespace to be linked here and returned an exist error, but + // this would require more configuration to make secure. + w, err := cs.Store.Writer(ctx, ref, size, "") + if err != nil { + return nil, err + } + + // TODO: keep the expected in the writer to use on commit + // when no expected is provided there. + return &namespacedWriter{ + Writer: w, + namespace: ns, + db: cs.db, + }, nil +} + +type namespacedWriter struct { + content.Writer + namespace string + db *bolt.DB +} + +func (nw *namespacedWriter) Commit(size int64, expected digest.Digest) error { + tx, err := nw.db.Begin(true) + if err != nil { + return err + } + + if err := nw.commit(tx, size, expected); err != nil { + tx.Rollback() + return err + } + + return tx.Commit() +} + +func (nw *namespacedWriter) commit(tx *bolt.Tx, size int64, expected digest.Digest) error { + status, err := nw.Writer.Status() + if err != nil { + return err + } + actual := nw.Writer.Digest() + + // TODO: Handle already exists + if err := nw.Writer.Commit(size, expected); err != nil { + if !content.IsExists(err) { + return err + } + if getContentBucket(tx, nw.namespace, actual) != nil { + return content.ErrExists("") + } + // Link into this namespace + } + + size = status.Total + + bkt, err := createContentBucket(tx, nw.namespace, actual) + if err != nil { + return err + } + + sizeEncoded, err := encodeSize(size) + if err != nil { + return err + } + + timeEncoded, err := status.UpdatedAt.MarshalBinary() + if err != nil { + return err + } + + for _, v := range [][2][]byte{ + {bucketKeyCreatedAt, timeEncoded}, + {bucketKeySize, sizeEncoded}, + } { + if err := bkt.Put(v[0], v[1]); err != nil { + return err + } + } + + return nil +} + +func (cs *contentStore) Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) { + if err := cs.checkAccess(ctx, dgst); err != nil { + return nil, err + } + return cs.Store.Reader(ctx, dgst) +} + +func (cs *contentStore) ReaderAt(ctx context.Context, dgst digest.Digest) (io.ReaderAt, error) { + if err := cs.checkAccess(ctx, dgst); err != nil { + return nil, err + } + return cs.Store.ReaderAt(ctx, dgst) +} + +func (cs *contentStore) checkAccess(ctx context.Context, dgst digest.Digest) error { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + + return view(ctx, cs.db, func(tx *bolt.Tx) error { + bkt := getContentBucket(tx, ns, dgst) + if bkt == nil { + return content.ErrNotFound("") + } + return nil + }) +} + +func readInfo(info *content.Info, bkt *bolt.Bucket) error { + return bkt.ForEach(func(k, v []byte) error { + switch string(k) { + case string(bucketKeyCreatedAt): + if err := info.CommittedAt.UnmarshalBinary(v); err != nil { + return err + } + case string(bucketKeySize): + info.Size, _ = binary.Varint(v) + } + // TODO: Read labels + return nil + }) +} diff --git a/metadata/images.go b/metadata/images.go index 47fc4c167..adb4eac59 100644 --- a/metadata/images.go +++ b/metadata/images.go @@ -243,14 +243,9 @@ func writeImage(bkt *bolt.Bucket, image *images.Image) error { return err } - var ( - buf [binary.MaxVarintLen64]byte - sizeEncoded []byte = buf[:] - ) - sizeEncoded = sizeEncoded[:binary.PutVarint(sizeEncoded, image.Target.Size)] - - if len(sizeEncoded) == 0 { - return fmt.Errorf("failed encoding size = %v", image.Target.Size) + sizeEncoded, err := encodeSize(image.Target.Size) + if err != nil { + return err } for _, v := range [][2][]byte{ @@ -265,3 +260,16 @@ func writeImage(bkt *bolt.Bucket, image *images.Image) error { return nil } + +func encodeSize(size int64) ([]byte, error) { + var ( + buf [binary.MaxVarintLen64]byte + sizeEncoded []byte = buf[:] + ) + sizeEncoded = sizeEncoded[:binary.PutVarint(sizeEncoded, size)] + + if len(sizeEncoded) == 0 { + return nil, fmt.Errorf("failed encoding size = %v", size) + } + return sizeEncoded, nil +} diff --git a/services/content/service.go b/services/content/service.go index 322d956da..62f1997e7 100644 --- a/services/content/service.go +++ b/services/content/service.go @@ -5,12 +5,14 @@ import ( "sync" "github.com/Sirupsen/logrus" + "github.com/boltdb/bolt" api "github.com/containerd/containerd/api/services/content/v1" eventsapi "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/content" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events" "github.com/containerd/containerd/log" + "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/plugin" "github.com/golang/protobuf/ptypes/empty" digest "github.com/opencontainers/go-digest" @@ -39,6 +41,7 @@ func init() { ID: "content", Requires: []plugin.PluginType{ plugin.ContentPlugin, + plugin.MetadataPlugin, }, Init: NewService, }) @@ -49,8 +52,13 @@ func NewService(ic *plugin.InitContext) (interface{}, error) { if err != nil { return nil, err } + m, err := ic.Get(plugin.MetadataPlugin) + if err != nil { + return nil, err + } + cs := metadata.NewContentStore(m.(*bolt.DB), c.(content.Store)) return &Service{ - store: c.(content.Store), + store: cs, emitter: events.GetPoster(ic.Context), }, nil } From b6d58f63a8a3533c45f8a0c58e32395b48a61988 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Thu, 29 Jun 2017 15:29:41 -0700 Subject: [PATCH 2/4] Support for ingest namespacing Move content status to list statuses and add single status to interface. Updates API to support list statuses and status Updates snapshot key creation to be generic Signed-off-by: Derek McGowan --- api/services/content/v1/content.pb.go | 475 +++++++++++++++++++++----- api/services/content/v1/content.proto | 19 +- cmd/dist/active.go | 2 +- cmd/dist/fetch.go | 2 +- content/content.go | 19 +- content/content_test.go | 2 +- content/store.go | 6 +- metadata/buckets.go | 28 +- metadata/content.go | 204 ++++++++--- metadata/snapshot.go | 16 +- services/content/service.go | 23 +- services/content/store.go | 21 +- 12 files changed, 649 insertions(+), 168 deletions(-) diff --git a/api/services/content/v1/content.pb.go b/api/services/content/v1/content.pb.go index 063bb9214..9856277f6 100644 --- a/api/services/content/v1/content.pb.go +++ b/api/services/content/v1/content.pb.go @@ -20,6 +20,8 @@ Status StatusRequest StatusResponse + ListStatusesRequest + ListStatusesResponse WriteContentRequest WriteContentResponse AbortRequest @@ -195,7 +197,7 @@ func (*Status) ProtoMessage() {} func (*Status) Descriptor() ([]byte, []int) { return fileDescriptorContent, []int{8} } type StatusRequest struct { - Filter string `protobuf:"bytes,1,opt,name=filter,proto3" json:"filter,omitempty"` + Ref string `protobuf:"bytes,1,opt,name=ref,proto3" json:"ref,omitempty"` } func (m *StatusRequest) Reset() { *m = StatusRequest{} } @@ -203,13 +205,29 @@ func (*StatusRequest) ProtoMessage() {} func (*StatusRequest) Descriptor() ([]byte, []int) { return fileDescriptorContent, []int{9} } type StatusResponse struct { - Statuses []Status `protobuf:"bytes,1,rep,name=statuses" json:"statuses"` + Status *Status `protobuf:"bytes,1,opt,name=status" json:"status,omitempty"` } func (m *StatusResponse) Reset() { *m = StatusResponse{} } func (*StatusResponse) ProtoMessage() {} func (*StatusResponse) Descriptor() ([]byte, []int) { return fileDescriptorContent, []int{10} } +type ListStatusesRequest struct { + Filter string `protobuf:"bytes,1,opt,name=filter,proto3" json:"filter,omitempty"` +} + +func (m *ListStatusesRequest) Reset() { *m = ListStatusesRequest{} } +func (*ListStatusesRequest) ProtoMessage() {} +func (*ListStatusesRequest) Descriptor() ([]byte, []int) { return fileDescriptorContent, []int{11} } + +type ListStatusesResponse struct { + Statuses []Status `protobuf:"bytes,1,rep,name=statuses" json:"statuses"` +} + +func (m *ListStatusesResponse) Reset() { *m = ListStatusesResponse{} } +func (*ListStatusesResponse) ProtoMessage() {} +func (*ListStatusesResponse) Descriptor() ([]byte, []int) { return fileDescriptorContent, []int{12} } + // WriteContentRequest writes data to the request ref at offset. type WriteContentRequest struct { // Action sets the behavior of the write. @@ -265,7 +283,7 @@ type WriteContentRequest struct { func (m *WriteContentRequest) Reset() { *m = WriteContentRequest{} } func (*WriteContentRequest) ProtoMessage() {} -func (*WriteContentRequest) Descriptor() ([]byte, []int) { return fileDescriptorContent, []int{11} } +func (*WriteContentRequest) Descriptor() ([]byte, []int) { return fileDescriptorContent, []int{13} } // WriteContentResponse is returned on the culmination of a write call. type WriteContentResponse struct { @@ -299,7 +317,7 @@ type WriteContentResponse struct { func (m *WriteContentResponse) Reset() { *m = WriteContentResponse{} } func (*WriteContentResponse) ProtoMessage() {} -func (*WriteContentResponse) Descriptor() ([]byte, []int) { return fileDescriptorContent, []int{12} } +func (*WriteContentResponse) Descriptor() ([]byte, []int) { return fileDescriptorContent, []int{14} } type AbortRequest struct { Ref string `protobuf:"bytes,1,opt,name=ref,proto3" json:"ref,omitempty"` @@ -307,7 +325,7 @@ type AbortRequest struct { func (m *AbortRequest) Reset() { *m = AbortRequest{} } func (*AbortRequest) ProtoMessage() {} -func (*AbortRequest) Descriptor() ([]byte, []int) { return fileDescriptorContent, []int{13} } +func (*AbortRequest) Descriptor() ([]byte, []int) { return fileDescriptorContent, []int{15} } func init() { proto.RegisterType((*Info)(nil), "containerd.services.content.v1.Info") @@ -321,6 +339,8 @@ func init() { proto.RegisterType((*Status)(nil), "containerd.services.content.v1.Status") proto.RegisterType((*StatusRequest)(nil), "containerd.services.content.v1.StatusRequest") proto.RegisterType((*StatusResponse)(nil), "containerd.services.content.v1.StatusResponse") + proto.RegisterType((*ListStatusesRequest)(nil), "containerd.services.content.v1.ListStatusesRequest") + proto.RegisterType((*ListStatusesResponse)(nil), "containerd.services.content.v1.ListStatusesResponse") proto.RegisterType((*WriteContentRequest)(nil), "containerd.services.content.v1.WriteContentRequest") proto.RegisterType((*WriteContentResponse)(nil), "containerd.services.content.v1.WriteContentResponse") proto.RegisterType((*AbortRequest)(nil), "containerd.services.content.v1.AbortRequest") @@ -356,13 +376,15 @@ type ContentClient interface { // // The requested data may be returned in one or more messages. Read(ctx context.Context, in *ReadContentRequest, opts ...grpc.CallOption) (Content_ReadClient, error) - // Status returns the status of ongoing object ingestions, started via + // Status returns the status for a single reference. + Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) + // ListStatuses returns the status of ongoing object ingestions, started via // Write. // // Only those matching the regular expression will be provided in the // response. If the provided regular expression is empty, all ingestions // will be provided. - Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) + ListStatuses(ctx context.Context, in *ListStatusesRequest, opts ...grpc.CallOption) (*ListStatusesResponse, error) // Write begins or resumes writes to a resource identified by a unique ref. // Only one active stream may exist at a time for each ref. // @@ -484,6 +506,15 @@ func (c *contentClient) Status(ctx context.Context, in *StatusRequest, opts ...g return out, nil } +func (c *contentClient) ListStatuses(ctx context.Context, in *ListStatusesRequest, opts ...grpc.CallOption) (*ListStatusesResponse, error) { + out := new(ListStatusesResponse) + err := grpc.Invoke(ctx, "/containerd.services.content.v1.Content/ListStatuses", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *contentClient) Write(ctx context.Context, opts ...grpc.CallOption) (Content_WriteClient, error) { stream, err := grpc.NewClientStream(ctx, &_Content_serviceDesc.Streams[2], c.cc, "/containerd.services.content.v1.Content/Write", opts...) if err != nil { @@ -545,13 +576,15 @@ type ContentServer interface { // // The requested data may be returned in one or more messages. Read(*ReadContentRequest, Content_ReadServer) error - // Status returns the status of ongoing object ingestions, started via + // Status returns the status for a single reference. + Status(context.Context, *StatusRequest) (*StatusResponse, error) + // ListStatuses returns the status of ongoing object ingestions, started via // Write. // // Only those matching the regular expression will be provided in the // response. If the provided regular expression is empty, all ingestions // will be provided. - Status(context.Context, *StatusRequest) (*StatusResponse, error) + ListStatuses(context.Context, *ListStatusesRequest) (*ListStatusesResponse, error) // Write begins or resumes writes to a resource identified by a unique ref. // Only one active stream may exist at a time for each ref. // @@ -674,6 +707,24 @@ func _Content_Status_Handler(srv interface{}, ctx context.Context, dec func(inte return interceptor(ctx, in, info, handler) } +func _Content_ListStatuses_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ListStatusesRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ContentServer).ListStatuses(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/containerd.services.content.v1.Content/ListStatuses", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ContentServer).ListStatuses(ctx, req.(*ListStatusesRequest)) + } + return interceptor(ctx, in, info, handler) +} + func _Content_Write_Handler(srv interface{}, stream grpc.ServerStream) error { return srv.(ContentServer).Write(&contentWriteServer{stream}) } @@ -734,6 +785,10 @@ var _Content_serviceDesc = grpc.ServiceDesc{ MethodName: "Status", Handler: _Content_Status_Handler, }, + { + MethodName: "ListStatuses", + Handler: _Content_ListStatuses_Handler, + }, { MethodName: "Abort", Handler: _Content_Abort_Handler, @@ -1053,11 +1108,11 @@ func (m *StatusRequest) MarshalTo(dAtA []byte) (int, error) { _ = i var l int _ = l - if len(m.Filter) > 0 { + if len(m.Ref) > 0 { dAtA[i] = 0xa i++ - i = encodeVarintContent(dAtA, i, uint64(len(m.Filter))) - i += copy(dAtA[i:], m.Filter) + i = encodeVarintContent(dAtA, i, uint64(len(m.Ref))) + i += copy(dAtA[i:], m.Ref) } return i, nil } @@ -1073,6 +1128,58 @@ func (m *StatusResponse) Marshal() (dAtA []byte, err error) { } func (m *StatusResponse) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Status != nil { + dAtA[i] = 0xa + i++ + i = encodeVarintContent(dAtA, i, uint64(m.Status.Size())) + n5, err := m.Status.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n5 + } + return i, nil +} + +func (m *ListStatusesRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ListStatusesRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Filter) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintContent(dAtA, i, uint64(len(m.Filter))) + i += copy(dAtA[i:], m.Filter) + } + return i, nil +} + +func (m *ListStatusesResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ListStatusesResponse) MarshalTo(dAtA []byte) (int, error) { var i int _ = i var l int @@ -1166,19 +1273,19 @@ func (m *WriteContentResponse) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x12 i++ i = encodeVarintContent(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.StartedAt))) - n5, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.StartedAt, dAtA[i:]) - if err != nil { - return 0, err - } - i += n5 - dAtA[i] = 0x1a - i++ - i = encodeVarintContent(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.UpdatedAt))) - n6, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.UpdatedAt, dAtA[i:]) + n6, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.StartedAt, dAtA[i:]) if err != nil { return 0, err } i += n6 + dAtA[i] = 0x1a + i++ + i = encodeVarintContent(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.UpdatedAt))) + n7, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.UpdatedAt, dAtA[i:]) + if err != nil { + return 0, err + } + i += n7 if m.Offset != 0 { dAtA[i] = 0x20 i++ @@ -1366,7 +1473,7 @@ func (m *Status) Size() (n int) { func (m *StatusRequest) Size() (n int) { var l int _ = l - l = len(m.Filter) + l = len(m.Ref) if l > 0 { n += 1 + l + sovContent(uint64(l)) } @@ -1374,6 +1481,26 @@ func (m *StatusRequest) Size() (n int) { } func (m *StatusResponse) Size() (n int) { + var l int + _ = l + if m.Status != nil { + l = m.Status.Size() + n += 1 + l + sovContent(uint64(l)) + } + return n +} + +func (m *ListStatusesRequest) Size() (n int) { + var l int + _ = l + l = len(m.Filter) + if l > 0 { + n += 1 + l + sovContent(uint64(l)) + } + return n +} + +func (m *ListStatusesResponse) Size() (n int) { var l int _ = l if len(m.Statuses) > 0 { @@ -1562,7 +1689,7 @@ func (this *StatusRequest) String() string { return "nil" } s := strings.Join([]string{`&StatusRequest{`, - `Filter:` + fmt.Sprintf("%v", this.Filter) + `,`, + `Ref:` + fmt.Sprintf("%v", this.Ref) + `,`, `}`, }, "") return s @@ -1572,6 +1699,26 @@ func (this *StatusResponse) String() string { return "nil" } s := strings.Join([]string{`&StatusResponse{`, + `Status:` + strings.Replace(fmt.Sprintf("%v", this.Status), "Status", "Status", 1) + `,`, + `}`, + }, "") + return s +} +func (this *ListStatusesRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&ListStatusesRequest{`, + `Filter:` + fmt.Sprintf("%v", this.Filter) + `,`, + `}`, + }, "") + return s +} +func (this *ListStatusesResponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&ListStatusesResponse{`, `Statuses:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Statuses), "Status", "Status", 1), `&`, ``, 1) + `,`, `}`, }, "") @@ -2576,7 +2723,7 @@ func (m *StatusRequest) Unmarshal(dAtA []byte) error { switch fieldNum { case 1: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Filter", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field Ref", wireType) } var stringLen uint64 for shift := uint(0); ; shift += 7 { @@ -2601,7 +2748,7 @@ func (m *StatusRequest) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Filter = string(dAtA[iNdEx:postIndex]) + m.Ref = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex default: iNdEx = preIndex @@ -2653,6 +2800,168 @@ func (m *StatusResponse) Unmarshal(dAtA []byte) error { return fmt.Errorf("proto: StatusResponse: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Status", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowContent + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthContent + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Status == nil { + m.Status = &Status{} + } + if err := m.Status.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipContent(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthContent + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ListStatusesRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowContent + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ListStatusesRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ListStatusesRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Filter", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowContent + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthContent + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Filter = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipContent(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthContent + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ListStatusesResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowContent + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ListStatusesResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ListStatusesResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { case 1: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Statuses", wireType) @@ -3286,61 +3595,63 @@ func init() { } var fileDescriptorContent = []byte{ - // 881 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0x4f, 0x6f, 0xe3, 0x44, - 0x14, 0xcf, 0x24, 0x8e, 0x69, 0x5f, 0xc2, 0x12, 0x26, 0xa1, 0x8a, 0x8c, 0x70, 0x82, 0x85, 0x20, - 0xda, 0x65, 0x9d, 0x6e, 0xba, 0x37, 0x24, 0x44, 0x92, 0x5d, 0xa0, 0x88, 0x2e, 0x92, 0x37, 0xb0, - 0xa2, 0x17, 0xe4, 0x24, 0x13, 0x63, 0x29, 0xf1, 0x78, 0xed, 0x49, 0x04, 0x9c, 0xb8, 0x20, 0xa1, - 0x8a, 0x03, 0x5f, 0xa0, 0x17, 0xe0, 0xce, 0x9d, 0x4f, 0xd0, 0x23, 0x47, 0xb4, 0x87, 0x96, 0xe6, - 0x83, 0x20, 0xe4, 0xf1, 0xd8, 0x71, 0x9a, 0x96, 0xfc, 0x69, 0x38, 0xe5, 0x79, 0xfc, 0x7e, 0xbf, - 0xf7, 0xef, 0x97, 0x79, 0x86, 0x0f, 0x2d, 0x9b, 0x7d, 0x3d, 0xee, 0xea, 0x3d, 0x3a, 0xaa, 0xf7, - 0xa8, 0xc3, 0x4c, 0xdb, 0x21, 0x5e, 0x3f, 0x69, 0x9a, 0xae, 0x5d, 0xf7, 0x89, 0x37, 0xb1, 0x7b, - 0xc4, 0xe7, 0xe7, 0xc4, 0x61, 0xf5, 0xc9, 0x83, 0xc8, 0xd4, 0x5d, 0x8f, 0x32, 0x8a, 0xd5, 0x19, - 0x42, 0x8f, 0xbc, 0xf5, 0xc8, 0x65, 0xf2, 0x40, 0x29, 0x59, 0xd4, 0xa2, 0xdc, 0xb5, 0x1e, 0x58, - 0x21, 0x4a, 0xa9, 0x58, 0x94, 0x5a, 0x43, 0x52, 0xe7, 0x4f, 0xdd, 0xf1, 0xa0, 0xce, 0xec, 0x11, - 0xf1, 0x99, 0x39, 0x72, 0x85, 0xc3, 0xeb, 0x57, 0x1d, 0xc8, 0xc8, 0x65, 0xdf, 0x86, 0x2f, 0xb5, - 0xdf, 0x11, 0x48, 0x87, 0xce, 0x80, 0xe2, 0x4f, 0x40, 0xee, 0xdb, 0x16, 0xf1, 0x59, 0x19, 0x55, - 0x51, 0x6d, 0xb7, 0xd5, 0x38, 0x3b, 0xaf, 0xa4, 0x5e, 0x9c, 0x57, 0xee, 0x26, 0x8a, 0xa3, 0x2e, - 0x71, 0xe2, 0x1c, 0xfd, 0xba, 0x45, 0xef, 0x87, 0x10, 0xfd, 0x11, 0xff, 0x31, 0x04, 0x03, 0xc6, - 0x20, 0xf9, 0xf6, 0x77, 0xa4, 0x9c, 0xae, 0xa2, 0x5a, 0xc6, 0xe0, 0x36, 0xfe, 0x08, 0xf2, 0x3d, - 0x3a, 0x1a, 0xd9, 0x8c, 0x91, 0xfe, 0x57, 0x26, 0x2b, 0x67, 0xaa, 0xa8, 0x96, 0x6b, 0x28, 0x7a, - 0x98, 0x9c, 0x1e, 0x25, 0xa7, 0x77, 0xa2, 0xec, 0x5b, 0x3b, 0x41, 0x06, 0x3f, 0x5f, 0x54, 0x90, - 0x91, 0x8b, 0x91, 0x4d, 0xa6, 0x7d, 0x09, 0xb9, 0x20, 0x61, 0x83, 0x3c, 0x1f, 0x07, 0xb1, 0xb6, - 0x98, 0xb7, 0xf6, 0x04, 0xf2, 0x21, 0xb5, 0xef, 0x52, 0xc7, 0x27, 0xf8, 0x7d, 0x90, 0x6c, 0x67, - 0x40, 0x39, 0x73, 0xae, 0xf1, 0x96, 0xfe, 0xdf, 0xf3, 0xd1, 0x03, 0x6c, 0x4b, 0x0a, 0xe2, 0x1b, - 0x1c, 0xa7, 0x95, 0x00, 0x7f, 0x6a, 0xfb, 0xac, 0x1d, 0xba, 0x88, 0x8c, 0xb5, 0xcf, 0xa1, 0x38, - 0x77, 0xba, 0x10, 0x2c, 0xb3, 0x51, 0xb0, 0x2e, 0x94, 0x1e, 0x91, 0x21, 0x61, 0x64, 0x3e, 0xdc, - 0x56, 0x1b, 0xf4, 0x13, 0x02, 0x6c, 0x10, 0xb3, 0xff, 0xff, 0x85, 0xc0, 0x7b, 0x20, 0xd3, 0xc1, - 0xc0, 0x27, 0x4c, 0xa8, 0x47, 0x3c, 0xc5, 0x9a, 0xca, 0xcc, 0x34, 0xa5, 0x35, 0xa1, 0x38, 0x97, - 0x8d, 0xe8, 0xe4, 0x8c, 0x02, 0x5d, 0xa5, 0xe8, 0x9b, 0xcc, 0xe4, 0xc4, 0x79, 0x83, 0xdb, 0xda, - 0x2f, 0x69, 0x90, 0x9f, 0x32, 0x93, 0x8d, 0x7d, 0xdc, 0x06, 0xf0, 0x99, 0xe9, 0x09, 0x7d, 0xa2, - 0x35, 0xf4, 0xb9, 0x2b, 0x70, 0x4d, 0x16, 0x90, 0x8c, 0xdd, 0xbe, 0x29, 0x48, 0xd2, 0xeb, 0x90, - 0x08, 0x5c, 0x93, 0xe1, 0x02, 0x64, 0x3c, 0x32, 0xe0, 0xa5, 0xee, 0x1a, 0x81, 0x99, 0x28, 0x49, - 0x9a, 0x2b, 0xa9, 0x04, 0x59, 0x46, 0x99, 0x39, 0x2c, 0x67, 0xf9, 0x71, 0xf8, 0x80, 0x9f, 0xc0, - 0x0e, 0xf9, 0xc6, 0x25, 0x3d, 0x46, 0xfa, 0x65, 0x79, 0xe3, 0x89, 0xc4, 0x1c, 0xda, 0x3b, 0xf0, - 0x72, 0xd8, 0xa3, 0x68, 0xe0, 0x7b, 0x20, 0x0f, 0xec, 0x21, 0x23, 0x5e, 0x38, 0x70, 0x43, 0x3c, - 0x69, 0xc7, 0x70, 0x27, 0x72, 0x14, 0xb3, 0xf8, 0x18, 0x76, 0x7c, 0x7e, 0x42, 0x7c, 0xa1, 0xec, - 0xb7, 0x97, 0x29, 0x3b, 0x64, 0x10, 0xda, 0x8e, 0xd1, 0xda, 0x3f, 0x08, 0x8a, 0xcf, 0x3c, 0x7b, - 0x41, 0xdf, 0x6d, 0x90, 0xcd, 0x1e, 0xb3, 0xa9, 0xc3, 0x73, 0xb9, 0xd3, 0xb8, 0xb7, 0x8c, 0x9f, - 0x93, 0x34, 0x39, 0xc4, 0x10, 0xd0, 0xa8, 0xe3, 0xe9, 0x59, 0xc7, 0xe3, 0xce, 0x66, 0x6e, 0xea, - 0xac, 0x74, 0xfb, 0xce, 0x26, 0xe6, 0x9a, 0xbd, 0x56, 0xaa, 0x72, 0x42, 0xaa, 0x17, 0x69, 0x28, - 0xcd, 0x37, 0x40, 0xf4, 0x78, 0x2b, 0x1d, 0x98, 0x57, 0x7f, 0x7a, 0x1b, 0xea, 0xcf, 0x6c, 0xa6, - 0xfe, 0xf5, 0xb4, 0x3e, 0xbb, 0x7b, 0xe4, 0x5b, 0x5f, 0x6f, 0x55, 0xc8, 0x37, 0xbb, 0xd4, 0x8b, - 0xa5, 0x25, 0x54, 0x81, 0x62, 0x55, 0xdc, 0xfd, 0x01, 0x41, 0x2e, 0xd1, 0x3d, 0xfc, 0x06, 0x48, - 0x4f, 0x3b, 0xcd, 0x4e, 0x21, 0xa5, 0x14, 0x4f, 0x4e, 0xab, 0xaf, 0x24, 0x5e, 0x05, 0x2a, 0xc6, - 0x15, 0xc8, 0x3e, 0x33, 0x0e, 0x3b, 0x8f, 0x0b, 0x48, 0x29, 0x9d, 0x9c, 0x56, 0x0b, 0x89, 0xf7, - 0xdc, 0xc4, 0x6f, 0x82, 0xdc, 0xfe, 0xec, 0xe8, 0xe8, 0xb0, 0x53, 0x48, 0x2b, 0xaf, 0x9d, 0x9c, - 0x56, 0x5f, 0x4d, 0x78, 0xb4, 0xf9, 0xd2, 0x53, 0x8a, 0x3f, 0xfe, 0xaa, 0xa6, 0xfe, 0xf8, 0x4d, - 0x4d, 0xc6, 0x6d, 0xbc, 0xc8, 0xc2, 0x4b, 0x42, 0x06, 0xd8, 0x14, 0x1b, 0xfc, 0xde, 0x2a, 0x2b, - 0x43, 0x94, 0xa6, 0xbc, 0xbb, 0x9a, 0xb3, 0x50, 0xd8, 0x73, 0x90, 0x82, 0x95, 0x85, 0x1b, 0xcb, - 0x50, 0x8b, 0xeb, 0x4e, 0x39, 0x58, 0x0b, 0x13, 0x06, 0xdc, 0x47, 0xf8, 0x0b, 0x90, 0xc3, 0x75, - 0x86, 0x1f, 0x2e, 0x23, 0xb8, 0x6e, 0xed, 0x29, 0x7b, 0x0b, 0xb2, 0x7b, 0x1c, 0x7c, 0xf6, 0x04, - 0xa5, 0x04, 0x3b, 0x63, 0x79, 0x29, 0x8b, 0x7b, 0x6e, 0x79, 0x29, 0xd7, 0x6c, 0xa3, 0x7d, 0x84, - 0xad, 0x78, 0xc5, 0xdc, 0x5f, 0xed, 0xee, 0x8b, 0xe2, 0xe9, 0xab, 0xba, 0x8b, 0x31, 0x4d, 0x20, - 0x1b, 0xca, 0xea, 0x60, 0xa5, 0x1b, 0xe0, 0x4a, 0x75, 0x0f, 0xd7, 0x03, 0x85, 0x31, 0x6b, 0x68, - 0x1f, 0xe1, 0x23, 0xc8, 0xf2, 0xff, 0x0d, 0x5e, 0xaa, 0xaa, 0xe4, 0xdf, 0xeb, 0xa6, 0x11, 0xb5, - 0x8e, 0xcf, 0x2e, 0xd5, 0xd4, 0x5f, 0x97, 0x6a, 0xea, 0xfb, 0xa9, 0x8a, 0xce, 0xa6, 0x2a, 0xfa, - 0x73, 0xaa, 0xa2, 0xbf, 0xa7, 0x2a, 0x3a, 0xfe, 0x60, 0xd3, 0x2f, 0xed, 0xf7, 0x84, 0xd9, 0x95, - 0x79, 0xac, 0x83, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0x26, 0x49, 0x54, 0x8e, 0xb4, 0x0b, 0x00, - 0x00, + // 925 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x57, 0x4f, 0x6f, 0xe3, 0x44, + 0x14, 0xcf, 0x24, 0x8e, 0x69, 0x5f, 0xc2, 0x12, 0x26, 0xa1, 0x8a, 0x8c, 0x70, 0xb2, 0x16, 0x42, + 0xd5, 0x2e, 0x75, 0xba, 0x69, 0x6f, 0x48, 0x88, 0x24, 0xbb, 0x40, 0x11, 0x5d, 0x90, 0x37, 0xb0, + 0x62, 0x2f, 0xe0, 0x24, 0x13, 0x63, 0x29, 0xf1, 0x78, 0xed, 0x49, 0xc4, 0x72, 0xe2, 0x82, 0x84, + 0x2a, 0x0e, 0x7c, 0x81, 0x5e, 0x80, 0x3b, 0x77, 0x3e, 0x41, 0x8f, 0x5c, 0x90, 0x10, 0x87, 0x5d, + 0x36, 0x1f, 0x04, 0x21, 0x8f, 0xc7, 0x8e, 0x93, 0xb4, 0xe4, 0x4f, 0xb3, 0xa7, 0xbe, 0x19, 0xbf, + 0xdf, 0xfb, 0xfb, 0x7b, 0xf3, 0x52, 0x78, 0xdf, 0xb2, 0xd9, 0xd7, 0xa3, 0x8e, 0xde, 0xa5, 0xc3, + 0x5a, 0x97, 0x3a, 0xcc, 0xb4, 0x1d, 0xe2, 0xf5, 0x92, 0xa2, 0xe9, 0xda, 0x35, 0x9f, 0x78, 0x63, + 0xbb, 0x4b, 0x7c, 0x7e, 0x4f, 0x1c, 0x56, 0x1b, 0xdf, 0x89, 0x44, 0xdd, 0xf5, 0x28, 0xa3, 0x58, + 0x9d, 0x22, 0xf4, 0x48, 0x5b, 0x8f, 0x54, 0xc6, 0x77, 0x94, 0x92, 0x45, 0x2d, 0xca, 0x55, 0x6b, + 0x81, 0x14, 0xa2, 0x94, 0x8a, 0x45, 0xa9, 0x35, 0x20, 0x35, 0x7e, 0xea, 0x8c, 0xfa, 0x35, 0x66, + 0x0f, 0x89, 0xcf, 0xcc, 0xa1, 0x2b, 0x14, 0x5e, 0x9f, 0x57, 0x20, 0x43, 0x97, 0x3d, 0x09, 0x3f, + 0x6a, 0xbf, 0x21, 0x90, 0x4e, 0x9c, 0x3e, 0xc5, 0x1f, 0x81, 0xdc, 0xb3, 0x2d, 0xe2, 0xb3, 0x32, + 0xaa, 0xa2, 0xfd, 0xdd, 0x66, 0xfd, 0xe2, 0x69, 0x25, 0xf5, 0xf7, 0xd3, 0xca, 0xad, 0x44, 0x72, + 0xd4, 0x25, 0x4e, 0x1c, 0xa3, 0x5f, 0xb3, 0xe8, 0x41, 0x08, 0xd1, 0xef, 0xf2, 0x3f, 0x86, 0xb0, + 0x80, 0x31, 0x48, 0xbe, 0xfd, 0x2d, 0x29, 0xa7, 0xab, 0x68, 0x3f, 0x63, 0x70, 0x19, 0x7f, 0x00, + 0xf9, 0x2e, 0x1d, 0x0e, 0x6d, 0xc6, 0x48, 0xef, 0x4b, 0x93, 0x95, 0x33, 0x55, 0xb4, 0x9f, 0xab, + 0x2b, 0x7a, 0x18, 0x9c, 0x1e, 0x05, 0xa7, 0xb7, 0xa3, 0xe8, 0x9b, 0x3b, 0x41, 0x04, 0x3f, 0x3d, + 0xab, 0x20, 0x23, 0x17, 0x23, 0x1b, 0x4c, 0xfb, 0x02, 0x72, 0x41, 0xc0, 0x06, 0x79, 0x3c, 0x0a, + 0x7c, 0x6d, 0x31, 0x6e, 0xed, 0x3e, 0xe4, 0x43, 0xd3, 0xbe, 0x4b, 0x1d, 0x9f, 0xe0, 0x77, 0x41, + 0xb2, 0x9d, 0x3e, 0xe5, 0x96, 0x73, 0xf5, 0x37, 0xf5, 0xff, 0xef, 0x8f, 0x1e, 0x60, 0x9b, 0x52, + 0xe0, 0xdf, 0xe0, 0x38, 0xad, 0x04, 0xf8, 0x63, 0xdb, 0x67, 0xad, 0x50, 0x45, 0x44, 0xac, 0x7d, + 0x06, 0xc5, 0x99, 0xdb, 0x05, 0x67, 0x99, 0x8d, 0x9c, 0x75, 0xa0, 0x74, 0x97, 0x0c, 0x08, 0x23, + 0xb3, 0xee, 0xb6, 0x5a, 0xa0, 0x1f, 0x11, 0x60, 0x83, 0x98, 0xbd, 0x17, 0xe7, 0x02, 0xef, 0x81, + 0x4c, 0xfb, 0x7d, 0x9f, 0x30, 0xc1, 0x1e, 0x71, 0x8a, 0x39, 0x95, 0x99, 0x72, 0x4a, 0x6b, 0x40, + 0x71, 0x26, 0x1a, 0x51, 0xc9, 0xa9, 0x09, 0x34, 0x6f, 0xa2, 0x67, 0x32, 0x93, 0x1b, 0xce, 0x1b, + 0x5c, 0xd6, 0x7e, 0x4e, 0x83, 0xfc, 0x80, 0x99, 0x6c, 0xe4, 0xe3, 0x16, 0x80, 0xcf, 0x4c, 0x4f, + 0xf0, 0x13, 0xad, 0xc1, 0xcf, 0x5d, 0x81, 0x6b, 0xb0, 0xc0, 0xc8, 0xc8, 0xed, 0x99, 0xc2, 0x48, + 0x7a, 0x1d, 0x23, 0x02, 0xd7, 0x60, 0xb8, 0x00, 0x19, 0x8f, 0xf4, 0x79, 0xaa, 0xbb, 0x46, 0x20, + 0x26, 0x52, 0x92, 0x66, 0x52, 0x2a, 0x41, 0x96, 0x51, 0x66, 0x0e, 0xca, 0x59, 0x7e, 0x1d, 0x1e, + 0xf0, 0x7d, 0xd8, 0x21, 0xdf, 0xb8, 0xa4, 0xcb, 0x48, 0xaf, 0x2c, 0x6f, 0xdc, 0x91, 0xd8, 0x86, + 0x76, 0x13, 0x5e, 0x0e, 0x6b, 0x14, 0x35, 0x5c, 0x04, 0x88, 0xe2, 0x00, 0xb5, 0x4f, 0xe1, 0x46, + 0xa4, 0x12, 0xf3, 0x59, 0xf6, 0xf9, 0x8d, 0x28, 0xe5, 0x5b, 0xcb, 0x18, 0x2d, 0xf0, 0x02, 0xa5, + 0x1d, 0x84, 0x63, 0x12, 0xde, 0x92, 0xd8, 0xf5, 0x1e, 0xc8, 0x7d, 0x7b, 0xc0, 0x88, 0x27, 0xbc, + 0x8b, 0x93, 0xf6, 0x15, 0x94, 0x66, 0xd5, 0x45, 0x18, 0x1f, 0xc2, 0x8e, 0x2f, 0xee, 0xc4, 0x68, + 0xad, 0x18, 0x88, 0x18, 0xae, 0x18, 0xad, 0xfd, 0x8b, 0xa0, 0xf8, 0xd0, 0xb3, 0x17, 0x06, 0xac, + 0x05, 0xb2, 0xd9, 0x65, 0x36, 0x75, 0x78, 0x44, 0x37, 0xea, 0xb7, 0x97, 0xd9, 0xe7, 0x46, 0x1a, + 0x1c, 0x62, 0x08, 0x68, 0x54, 0xd1, 0xf4, 0xb4, 0xe5, 0x71, 0x6b, 0x33, 0x57, 0xb5, 0x56, 0xba, + 0x7e, 0x6b, 0x13, 0xc4, 0xca, 0x5e, 0x3a, 0x2b, 0x72, 0x62, 0x56, 0x9e, 0xa5, 0xa1, 0x34, 0x5b, + 0x00, 0x51, 0xe3, 0xad, 0x54, 0x60, 0x76, 0xfc, 0xd2, 0xdb, 0x18, 0xbf, 0xcc, 0x66, 0xe3, 0xb7, + 0xde, 0xb0, 0x4d, 0x1f, 0x3f, 0xf9, 0xda, 0xef, 0x6b, 0x15, 0xf2, 0x8d, 0x0e, 0xf5, 0xd8, 0x95, + 0x73, 0x76, 0xeb, 0x7b, 0x04, 0xb9, 0x44, 0xf5, 0xf0, 0x1b, 0x20, 0x3d, 0x68, 0x37, 0xda, 0x85, + 0x94, 0x52, 0x3c, 0x3b, 0xaf, 0xbe, 0x92, 0xf8, 0x14, 0xb0, 0x18, 0x57, 0x20, 0xfb, 0xd0, 0x38, + 0x69, 0xdf, 0x2b, 0x20, 0xa5, 0x74, 0x76, 0x5e, 0x2d, 0x24, 0xbe, 0x73, 0x11, 0xdf, 0x04, 0xb9, + 0xf5, 0xc9, 0xe9, 0xe9, 0x49, 0xbb, 0x90, 0x56, 0x5e, 0x3b, 0x3b, 0xaf, 0xbe, 0x9a, 0xd0, 0x68, + 0xf1, 0xad, 0xab, 0x14, 0x7f, 0xf8, 0x45, 0x4d, 0xfd, 0xfe, 0xab, 0x9a, 0xf4, 0x5b, 0xff, 0x53, + 0x86, 0x97, 0x04, 0x0d, 0xb0, 0x29, 0x7e, 0x42, 0xdc, 0x5e, 0x65, 0x67, 0x89, 0xd4, 0x94, 0xb7, + 0x57, 0x53, 0x16, 0x0c, 0x7b, 0x0c, 0x52, 0x30, 0xdd, 0xb8, 0xbe, 0x0c, 0xb5, 0xb8, 0x6f, 0x95, + 0xa3, 0xb5, 0x30, 0xa1, 0xc3, 0x43, 0x84, 0x3f, 0x07, 0x39, 0xdc, 0xa7, 0xf8, 0x78, 0x99, 0x81, + 0xcb, 0xf6, 0xae, 0xb2, 0xb7, 0x40, 0xbb, 0x7b, 0xc1, 0xef, 0xae, 0x20, 0x95, 0x60, 0x69, 0x2d, + 0x4f, 0x65, 0x71, 0xd1, 0x2e, 0x4f, 0xe5, 0x92, 0x75, 0x78, 0x88, 0xb0, 0x15, 0xef, 0xb8, 0x83, + 0x15, 0x1f, 0x61, 0xe1, 0x4f, 0x5f, 0x55, 0x5d, 0xb4, 0xe9, 0x09, 0xe4, 0x93, 0x8f, 0x30, 0x5e, + 0xa9, 0xf4, 0x73, 0x2f, 0xbc, 0x72, 0xbc, 0x1e, 0x48, 0xb8, 0x1e, 0x43, 0x36, 0x64, 0xf4, 0xd1, + 0x4a, 0x8f, 0xcf, 0x5c, 0x61, 0x8f, 0xd7, 0x03, 0x85, 0x3e, 0xf7, 0xd1, 0x21, 0xc2, 0xa7, 0x90, + 0xe5, 0x23, 0x8b, 0x97, 0x12, 0x3a, 0x39, 0xd9, 0x57, 0xb1, 0xa3, 0xf9, 0xe8, 0xe2, 0xb9, 0x9a, + 0xfa, 0xeb, 0xb9, 0x9a, 0xfa, 0x6e, 0xa2, 0xa2, 0x8b, 0x89, 0x8a, 0xfe, 0x98, 0xa8, 0xe8, 0x9f, + 0x89, 0x8a, 0x1e, 0xbd, 0xb7, 0xe9, 0x7f, 0x19, 0xef, 0x08, 0xb1, 0x23, 0x73, 0x5f, 0x47, 0xff, + 0x05, 0x00, 0x00, 0xff, 0xff, 0x07, 0xb9, 0xbc, 0xaf, 0xb0, 0x0c, 0x00, 0x00, } diff --git a/api/services/content/v1/content.proto b/api/services/content/v1/content.proto index cb90edd9d..bafca9066 100644 --- a/api/services/content/v1/content.proto +++ b/api/services/content/v1/content.proto @@ -32,13 +32,16 @@ service Content { // The requested data may be returned in one or more messages. rpc Read(ReadContentRequest) returns (stream ReadContentResponse); - // Status returns the status of ongoing object ingestions, started via + // Status returns the status for a single reference. + rpc Status(StatusRequest) returns (StatusResponse); + + // ListStatuses returns the status of ongoing object ingestions, started via // Write. // // Only those matching the regular expression will be provided in the // response. If the provided regular expression is empty, all ingestions // will be provided. - rpc Status(StatusRequest) returns (StatusResponse); + rpc ListStatuses(ListStatusesRequest) returns (ListStatusesResponse); // Write begins or resumes writes to a resource identified by a unique ref. // Only one active stream may exist at a time for each ref. @@ -124,11 +127,21 @@ message Status { string expected = 6 [(gogoproto.customtype) = "github.com/opencontainers/go-digest.Digest", (gogoproto.nullable) = false]; } + message StatusRequest { - string filter = 1; + string ref = 1; } message StatusResponse { + Status status = 1; +} + + +message ListStatusesRequest { + string filter = 1; +} + +message ListStatusesResponse { repeated Status statuses = 1 [(gogoproto.nullable) = false]; } diff --git a/cmd/dist/active.go b/cmd/dist/active.go index a5935c5d2..ede21f2c4 100644 --- a/cmd/dist/active.go +++ b/cmd/dist/active.go @@ -40,7 +40,7 @@ var activeCommand = cli.Command{ return err } - active, err := cs.Status(ctx, match) + active, err := cs.ListStatuses(ctx, match) if err != nil { return err } diff --git a/cmd/dist/fetch.go b/cmd/dist/fetch.go index d8f2f6848..5258da8b8 100644 --- a/cmd/dist/fetch.go +++ b/cmd/dist/fetch.go @@ -123,7 +123,7 @@ outer: activeSeen := map[string]struct{}{} if !done { - active, err := cs.Status(ctx, "") + active, err := cs.ListStatuses(ctx, "") if err != nil { log.G(ctx).WithError(err).Error("active check failed") continue diff --git a/content/content.go b/content/content.go index ca236d66c..b9abcf6ca 100644 --- a/content/content.go +++ b/content/content.go @@ -58,19 +58,19 @@ type Manager interface { // Delete removes the content from the store. Delete(ctx context.Context, dgst digest.Digest) error +} - // Status returns the status of any active ingestions whose ref match the +// IngestManager provides methods for managing ingests. +type IngestManager interface { + // Status returns the status of the provided ref. + Status(ctx context.Context, ref string) (Status, error) + + // ListStatuses returns the status of any active ingestions whose ref match the // provided regular expression. If empty, all active ingestions will be // returned. - // - // TODO(stevvooe): Status may be slighly out of place here. If this remains - // here, we should remove Manager and just define these on store. - Status(ctx context.Context, re string) ([]Status, error) + ListStatuses(ctx context.Context, ref string) ([]Status, error) // Abort completely cancels the ingest operation targeted by ref. - // - // TODO(stevvooe): Same consideration as above. This should really be - // restricted to an ingest management interface. Abort(ctx context.Context, ref string) error } @@ -86,6 +86,7 @@ type Writer interface { // are commonly provided by complete implementations. type Store interface { Manager - Ingester Provider + IngestManager + Ingester } diff --git a/content/content_test.go b/content/content_test.go index 8942cbf03..b73cd61e4 100644 --- a/content/content_test.go +++ b/content/content_test.go @@ -52,7 +52,7 @@ func TestContentWriter(t *testing.T) { } // we should also see this as an active ingestion - ingestions, err := cs.Status(ctx, "") + ingestions, err := cs.ListStatuses(ctx, "") if err != nil { t.Fatal(err) } diff --git a/content/store.go b/content/store.go index a1092f791..643a21c8d 100644 --- a/content/store.go +++ b/content/store.go @@ -136,7 +136,11 @@ func (cs *store) Walk(ctx context.Context, fn WalkFunc) error { }) } -func (s *store) Status(ctx context.Context, re string) ([]Status, error) { +func (s *store) Status(ctx context.Context, ref string) (Status, error) { + return s.status(s.ingestRoot(ref)) +} + +func (s *store) ListStatuses(ctx context.Context, re string) ([]Status, error) { fp, err := os.Open(filepath.Join(s.root, "ingest")) if err != nil { return nil, err diff --git a/metadata/buckets.go b/metadata/buckets.go index d4be495c1..51daceb2a 100644 --- a/metadata/buckets.go +++ b/metadata/buckets.go @@ -34,7 +34,9 @@ var ( bucketKeyObjectImages = []byte("images") // stores image objects bucketKeyObjectContainers = []byte("containers") // stores container objects bucketKeyObjectSnapshots = []byte("snapshots") // stores snapshot references - bucketKeyObjectContent = []byte("content") // stores content links + bucketKeyObjectContent = []byte("content") // stores content references + bucketKeyObjectBlob = []byte("blob") // stores content links + bucketKeyObjectIngest = []byte("ingest") // stores content links bucketKeyDigest = []byte("digest") bucketKeyMediaType = []byte("mediatype") @@ -142,18 +144,30 @@ func getSnapshotterBucket(tx *bolt.Tx, namespace, snapshotter string) *bolt.Buck return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectSnapshots, []byte(snapshotter)) } -func createContentBucket(tx *bolt.Tx, namespace string, dgst digest.Digest) (*bolt.Bucket, error) { - bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, []byte(dgst.String())) +func createBlobBucket(tx *bolt.Tx, namespace string, dgst digest.Digest) (*bolt.Bucket, error) { + bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectBlob, []byte(dgst.String())) if err != nil { return nil, err } return bkt, nil } -func getAllContentBucket(tx *bolt.Tx, namespace string) *bolt.Bucket { - return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent) +func getBlobsBucket(tx *bolt.Tx, namespace string) *bolt.Bucket { + return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectBlob) } -func getContentBucket(tx *bolt.Tx, namespace string, dgst digest.Digest) *bolt.Bucket { - return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, []byte(dgst.String())) +func getBlobBucket(tx *bolt.Tx, namespace string, dgst digest.Digest) *bolt.Bucket { + return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectBlob, []byte(dgst.String())) +} + +func createIngestBucket(tx *bolt.Tx, namespace string) (*bolt.Bucket, error) { + bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngest) + if err != nil { + return nil, err + } + return bkt, nil +} + +func getIngestBucket(tx *bolt.Tx, namespace string) *bolt.Bucket { + return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngest) } diff --git a/metadata/content.go b/metadata/content.go index e53e01457..d7b2912df 100644 --- a/metadata/content.go +++ b/metadata/content.go @@ -4,11 +4,14 @@ import ( "context" "encoding/binary" "io" + "regexp" "github.com/boltdb/bolt" "github.com/containerd/containerd/content" + "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/namespaces" digest "github.com/opencontainers/go-digest" + "github.com/pkg/errors" ) type contentStore struct { @@ -33,9 +36,9 @@ func (cs *contentStore) Info(ctx context.Context, dgst digest.Digest) (content.I var info content.Info if err := view(ctx, cs.db, func(tx *bolt.Tx) error { - bkt := getContentBucket(tx, ns, dgst) + bkt := getBlobBucket(tx, ns, dgst) if bkt == nil { - return content.ErrNotFound("") + return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", dgst) } info.Digest = dgst @@ -56,7 +59,7 @@ func (cs *contentStore) Walk(ctx context.Context, fn content.WalkFunc) error { // TODO: Batch results to keep from reading all info into memory var infos []content.Info if err := view(ctx, cs.db, func(tx *bolt.Tx) error { - bkt := getAllContentBucket(tx, ns) + bkt := getBlobsBucket(tx, ns) if bkt == nil { return nil } @@ -95,37 +98,121 @@ func (cs *contentStore) Delete(ctx context.Context, dgst digest.Digest) error { } return update(ctx, cs.db, func(tx *bolt.Tx) error { - bkt := getContentBucket(tx, ns, dgst) + bkt := getBlobBucket(tx, ns, dgst) if bkt == nil { - return content.ErrNotFound("") + return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", dgst) } // Just remove local reference, garbage collector is responsible for // cleaning up on disk content - return getAllContentBucket(tx, ns).Delete([]byte(dgst.String())) + return getBlobsBucket(tx, ns).Delete([]byte(dgst.String())) }) } -func (cs *contentStore) Status(ctx context.Context, re string) ([]content.Status, error) { - _, err := namespaces.NamespaceRequired(ctx) +func (cs *contentStore) ListStatuses(ctx context.Context, re string) ([]content.Status, error) { + ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return nil, err } - // TODO: Read status keys and match + var rec *regexp.Regexp + if re != "" { + rec, err = regexp.Compile(re) + if err != nil { + return nil, err + } + } - return cs.Store.Status(ctx, re) + var brefs []string + if err := view(ctx, cs.db, func(tx *bolt.Tx) error { + bkt := getIngestBucket(tx, ns) + if bkt == nil { + return nil + } + + return bkt.ForEach(func(k, v []byte) error { + if rec == nil || rec.Match(k) { + brefs = append(brefs, string(v)) + } + return nil + }) + }); err != nil { + return nil, err + } + + statuses := make([]content.Status, 0, len(brefs)) + for _, bref := range brefs { + status, err := cs.Store.Status(ctx, bref) + if err != nil { + if errdefs.IsNotFound(err) { + continue + } + return nil, err + } + status.Ref = trimKey(status.Ref) + + statuses = append(statuses, status) + } + + return statuses, nil + +} + +func getRef(tx *bolt.Tx, ns, ref string) string { + bkt := getIngestBucket(tx, ns) + if bkt == nil { + return "" + } + v := bkt.Get([]byte(ref)) + if len(v) == 0 { + return "" + } + return string(v) +} + +func (cs *contentStore) Status(ctx context.Context, ref string) (content.Status, error) { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return content.Status{}, err + } + + var bref string + if err := view(ctx, cs.db, func(tx *bolt.Tx) error { + bref = getRef(tx, ns, ref) + if bref == "" { + return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref) + } + + return nil + }); err != nil { + return content.Status{}, err + } + + return cs.Store.Status(ctx, bref) } func (cs *contentStore) Abort(ctx context.Context, ref string) error { - _, err := namespaces.NamespaceRequired(ctx) + ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return err } - // TODO: Read status key and delete + return update(ctx, cs.db, func(tx *bolt.Tx) error { + bkt := getIngestBucket(tx, ns) + if bkt == nil { + return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref) + } + bref := string(bkt.Get([]byte(ref))) + if bref == "" { + return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref) + } + if err := bkt.Delete([]byte(ref)); err != nil { + return err + } + + return cs.Store.Abort(ctx, bref) + }) - return cs.Store.Abort(ctx, ref) } func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) { @@ -134,29 +221,44 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe return nil, err } - // TODO: Create ref key - - if expected != "" { - if err := view(ctx, cs.db, func(tx *bolt.Tx) error { - bkt := getContentBucket(tx, ns, expected) - if bkt != nil { - return content.ErrExists("") + var w content.Writer + if err := update(ctx, cs.db, func(tx *bolt.Tx) error { + if expected != "" { + cbkt := getBlobBucket(tx, ns, expected) + if cbkt != nil { + return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected) } - return nil - }); err != nil { - return nil, err } - } - // Do not use the passed in expected value here since it was - // already checked against the user metadata. If the content - // store has the content, it must still be written before - // linked into the given namespace. It is possible in the future - // to allow content which exists in content store but not - // namespace to be linked here and returned an exist error, but - // this would require more configuration to make secure. - w, err := cs.Store.Writer(ctx, ref, size, "") - if err != nil { + bkt, err := createIngestBucket(tx, ns) + if err != nil { + return err + } + + if len(bkt.Get([]byte(ref))) > 0 { + return errors.Wrapf(errdefs.ErrUnavailable, "ref %v is currently in use", ref) + } + + sid, err := bkt.NextSequence() + if err != nil { + return err + } + + bref := createKey(sid, ns, ref) + if err := bkt.Put([]byte(ref), []byte(bref)); err != nil { + return err + } + + // Do not use the passed in expected value here since it was + // already checked against the user metadata. If the content + // store has the content, it must still be written before + // linked into the given namespace. It is possible in the future + // to allow content which exists in content store but not + // namespace to be linked here and returned an exist error, but + // this would require more configuration to make secure. + w, err = cs.Store.Writer(ctx, bref, size, "") + return err + }); err != nil { return nil, err } @@ -164,6 +266,7 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe // when no expected is provided there. return &namespacedWriter{ Writer: w, + ref: ref, namespace: ns, db: cs.db, }, nil @@ -171,22 +274,21 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe type namespacedWriter struct { content.Writer + ref string namespace string db *bolt.DB } func (nw *namespacedWriter) Commit(size int64, expected digest.Digest) error { - tx, err := nw.db.Begin(true) - if err != nil { - return err - } - - if err := nw.commit(tx, size, expected); err != nil { - tx.Rollback() - return err - } - - return tx.Commit() + return nw.db.Update(func(tx *bolt.Tx) error { + bkt := getIngestBucket(tx, nw.namespace) + if bkt != nil { + if err := bkt.Delete([]byte(nw.ref)); err != nil { + return err + } + } + return nw.commit(tx, size, expected) + }) } func (nw *namespacedWriter) commit(tx *bolt.Tx, size int64, expected digest.Digest) error { @@ -196,20 +298,18 @@ func (nw *namespacedWriter) commit(tx *bolt.Tx, size int64, expected digest.Dige } actual := nw.Writer.Digest() - // TODO: Handle already exists if err := nw.Writer.Commit(size, expected); err != nil { - if !content.IsExists(err) { + if !errdefs.IsAlreadyExists(err) { return err } - if getContentBucket(tx, nw.namespace, actual) != nil { - return content.ErrExists("") + if getBlobBucket(tx, nw.namespace, actual) != nil { + return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual) } - // Link into this namespace } size = status.Total - bkt, err := createContentBucket(tx, nw.namespace, actual) + bkt, err := createBlobBucket(tx, nw.namespace, actual) if err != nil { return err } @@ -257,9 +357,9 @@ func (cs *contentStore) checkAccess(ctx context.Context, dgst digest.Digest) err } return view(ctx, cs.db, func(tx *bolt.Tx) error { - bkt := getContentBucket(tx, ns, dgst) + bkt := getBlobBucket(tx, ns, dgst) if bkt == nil { - return content.ErrNotFound("") + return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", dgst) } return nil }) diff --git a/metadata/snapshot.go b/metadata/snapshot.go index 0ac3133ef..f2267ec53 100644 --- a/metadata/snapshot.go +++ b/metadata/snapshot.go @@ -29,11 +29,11 @@ func NewSnapshotter(db *bolt.DB, name string, sn snapshot.Snapshotter) snapshot. } } -func snapshotKey(id uint64, namespace, key string) string { +func createKey(id uint64, namespace, key string) string { return fmt.Sprintf("%s/%d/%s", namespace, id, key) } -func trimName(key string) string { +func trimKey(key string) string { parts := strings.SplitN(key, "/", 3) if len(parts) < 3 { return "" @@ -82,9 +82,9 @@ func (s *snapshotter) Stat(ctx context.Context, key string) (snapshot.Info, erro if err != nil { return snapshot.Info{}, err } - info.Name = trimName(info.Name) + info.Name = trimKey(info.Name) if info.Parent != "" { - info.Parent = trimName(info.Parent) + info.Parent = trimKey(info.Parent) } return info, nil @@ -143,7 +143,7 @@ func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, re if err != nil { return err } - bkey = snapshotKey(sid, ns, key) + bkey = createKey(sid, ns, key) if err := bkt.Put([]byte(key), []byte(bkey)); err != nil { return err } @@ -188,7 +188,7 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string) error { if err != nil { return err } - nameKey = snapshotKey(sid, ns, name) + nameKey = createKey(sid, ns, name) if err := bkt.Put([]byte(name), []byte(nameKey)); err != nil { return err } @@ -259,9 +259,9 @@ func (s *snapshotter) Walk(ctx context.Context, fn func(context.Context, snapsho return err } - info.Name = trimName(info.Name) + info.Name = trimKey(info.Name) if info.Parent != "" { - info.Parent = trimName(info.Parent) + info.Parent = trimKey(info.Parent) } if err := fn(ctx, info); err != nil { return err diff --git a/services/content/service.go b/services/content/service.go index 62f1997e7..d84cc529b 100644 --- a/services/content/service.go +++ b/services/content/service.go @@ -224,12 +224,31 @@ func (rw *readResponseWriter) Write(p []byte) (n int, err error) { } func (s *Service) Status(ctx context.Context, req *api.StatusRequest) (*api.StatusResponse, error) { - statuses, err := s.store.Status(ctx, req.Filter) + status, err := s.store.Status(ctx, req.Ref) + if err != nil { + return nil, errdefs.ToGRPCf(err, "could not get status for ref %q", req.Ref) + } + + var resp api.StatusResponse + resp.Status = &api.Status{ + StartedAt: status.StartedAt, + UpdatedAt: status.UpdatedAt, + Ref: status.Ref, + Offset: status.Offset, + Total: status.Total, + Expected: status.Expected, + } + + return &resp, nil +} + +func (s *Service) ListStatuses(ctx context.Context, req *api.ListStatusesRequest) (*api.ListStatusesResponse, error) { + statuses, err := s.store.ListStatuses(ctx, req.Filter) if err != nil { return nil, errdefs.ToGRPCf(err, "could not get status for filter %q", req.Filter) } - var resp api.StatusResponse + var resp api.ListStatusesResponse for _, status := range statuses { resp.Statuses = append(resp.Statuses, api.Status{ StartedAt: status.StartedAt, diff --git a/services/content/store.go b/services/content/store.go index 2153e3d6e..58172d884 100644 --- a/services/content/store.go +++ b/services/content/store.go @@ -94,8 +94,27 @@ func (rs *remoteStore) ReaderAt(ctx context.Context, dgst digest.Digest) (io.Rea }, nil } -func (rs *remoteStore) Status(ctx context.Context, filter string) ([]content.Status, error) { +func (rs *remoteStore) Status(ctx context.Context, ref string) (content.Status, error) { resp, err := rs.client.Status(ctx, &contentapi.StatusRequest{ + Ref: ref, + }) + if err != nil { + return content.Status{}, errdefs.FromGRPC(err) + } + + status := resp.Status + return content.Status{ + Ref: status.Ref, + StartedAt: status.StartedAt, + UpdatedAt: status.UpdatedAt, + Offset: status.Offset, + Total: status.Total, + Expected: status.Expected, + }, nil +} + +func (rs *remoteStore) ListStatuses(ctx context.Context, filter string) ([]content.Status, error) { + resp, err := rs.client.ListStatuses(ctx, &contentapi.ListStatusesRequest{ Filter: filter, }) if err != nil { From 4322664b882351583b2f451f44e16cb1438c2eb2 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Thu, 6 Jul 2017 15:37:25 -0700 Subject: [PATCH 3/4] Update task service to use metadata content store Address feedback and fix issues Signed-off-by: Derek McGowan --- api/services/content/v1/content.proto | 1 - metadata/buckets.go | 2 +- metadata/content.go | 7 +++++-- services/diff/service.go | 5 +++-- services/tasks/service.go | 3 ++- task.go | 2 +- 6 files changed, 12 insertions(+), 8 deletions(-) diff --git a/api/services/content/v1/content.proto b/api/services/content/v1/content.proto index bafca9066..0c48a37ad 100644 --- a/api/services/content/v1/content.proto +++ b/api/services/content/v1/content.proto @@ -136,7 +136,6 @@ message StatusResponse { Status status = 1; } - message ListStatusesRequest { string filter = 1; } diff --git a/metadata/buckets.go b/metadata/buckets.go index 51daceb2a..0ba3126bb 100644 --- a/metadata/buckets.go +++ b/metadata/buckets.go @@ -36,7 +36,7 @@ var ( bucketKeyObjectSnapshots = []byte("snapshots") // stores snapshot references bucketKeyObjectContent = []byte("content") // stores content references bucketKeyObjectBlob = []byte("blob") // stores content links - bucketKeyObjectIngest = []byte("ingest") // stores content links + bucketKeyObjectIngest = []byte("ingest") // stores ingest links bucketKeyDigest = []byte("digest") bucketKeyMediaType = []byte("mediatype") diff --git a/metadata/content.go b/metadata/content.go index d7b2912df..74e6add54 100644 --- a/metadata/content.go +++ b/metadata/content.go @@ -296,6 +296,11 @@ func (nw *namespacedWriter) commit(tx *bolt.Tx, size int64, expected digest.Dige if err != nil { return err } + if size != 0 && size != status.Offset { + return errors.Errorf("%q failed size validation: %v != %v", nw.ref, status.Offset, size) + } + size = status.Offset + actual := nw.Writer.Digest() if err := nw.Writer.Commit(size, expected); err != nil { @@ -307,8 +312,6 @@ func (nw *namespacedWriter) commit(tx *bolt.Tx, size int64, expected digest.Dige } } - size = status.Total - bkt, err := createBlobBucket(tx, nw.namespace, actual) if err != nil { return err diff --git a/services/diff/service.go b/services/diff/service.go index b97908ace..b4d1a150b 100644 --- a/services/diff/service.go +++ b/services/diff/service.go @@ -3,6 +3,7 @@ package diff import ( diffapi "github.com/containerd/containerd/api/services/diff/v1" "github.com/containerd/containerd/api/types" + "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/plugin" "golang.org/x/net/context" @@ -45,7 +46,7 @@ func (s *service) Apply(ctx context.Context, er *diffapi.ApplyRequest) (*diffapi ocidesc, err := s.diff.Apply(ctx, desc, mounts) if err != nil { - return nil, err + return nil, errdefs.ToGRPC(err) } return &diffapi.ApplyResponse{ @@ -60,7 +61,7 @@ func (s *service) Diff(ctx context.Context, dr *diffapi.DiffRequest) (*diffapi.D ocidesc, err := s.diff.DiffMounts(ctx, aMounts, bMounts, dr.MediaType, dr.Ref) if err != nil { - return nil, err + return nil, errdefs.ToGRPC(err) } return &diffapi.DiffResponse{ diff --git a/services/tasks/service.go b/services/tasks/service.go index 63a273872..81bd61910 100644 --- a/services/tasks/service.go +++ b/services/tasks/service.go @@ -62,6 +62,7 @@ func New(ic *plugin.InitContext) (interface{}, error) { if err != nil { return nil, err } + cs := metadata.NewContentStore(m.(*bolt.DB), ct.(content.Store)) runtimes := make(map[string]runtime.Runtime) for _, rr := range rt { r := rr.(runtime.Runtime) @@ -71,7 +72,7 @@ func New(ic *plugin.InitContext) (interface{}, error) { return &Service{ runtimes: runtimes, db: m.(*bolt.DB), - store: ct.(content.Store), + store: cs, emitter: e, }, nil } diff --git a/task.go b/task.go index bfbaa6453..60f894686 100644 --- a/task.go +++ b/task.go @@ -346,7 +346,7 @@ func writeContent(ctx context.Context, store content.Store, mediaType, ref strin if err != nil { return d, err } - if err := writer.Commit(0, ""); err != nil { + if err := writer.Commit(size, ""); err != nil { return d, err } return v1.Descriptor{ From 46deddf460a5489dc9f418bd1354ad8de6c77e7a Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Tue, 11 Jul 2017 09:28:43 -0700 Subject: [PATCH 4/4] Update list statuses to use filters Signed-off-by: Derek McGowan --- api/services/content/v1/content.pb.go | 153 ++++++++++++++------------ api/services/content/v1/content.proto | 2 +- content/content.go | 2 +- content/store.go | 26 +++-- metadata/adaptors.go | 15 +++ metadata/content.go | 28 +++-- services/content/service.go | 4 +- services/content/store.go | 4 +- 8 files changed, 135 insertions(+), 99 deletions(-) diff --git a/api/services/content/v1/content.pb.go b/api/services/content/v1/content.pb.go index 9856277f6..e9a01ee49 100644 --- a/api/services/content/v1/content.pb.go +++ b/api/services/content/v1/content.pb.go @@ -213,7 +213,7 @@ func (*StatusResponse) ProtoMessage() {} func (*StatusResponse) Descriptor() ([]byte, []int) { return fileDescriptorContent, []int{10} } type ListStatusesRequest struct { - Filter string `protobuf:"bytes,1,opt,name=filter,proto3" json:"filter,omitempty"` + Filters []string `protobuf:"bytes,1,rep,name=filters" json:"filters,omitempty"` } func (m *ListStatusesRequest) Reset() { *m = ListStatusesRequest{} } @@ -1160,11 +1160,20 @@ func (m *ListStatusesRequest) MarshalTo(dAtA []byte) (int, error) { _ = i var l int _ = l - if len(m.Filter) > 0 { - dAtA[i] = 0xa - i++ - i = encodeVarintContent(dAtA, i, uint64(len(m.Filter))) - i += copy(dAtA[i:], m.Filter) + if len(m.Filters) > 0 { + for _, s := range m.Filters { + dAtA[i] = 0xa + i++ + l = len(s) + for l >= 1<<7 { + dAtA[i] = uint8(uint64(l)&0x7f | 0x80) + l >>= 7 + i++ + } + dAtA[i] = uint8(l) + i++ + i += copy(dAtA[i:], s) + } } return i, nil } @@ -1493,9 +1502,11 @@ func (m *StatusResponse) Size() (n int) { func (m *ListStatusesRequest) Size() (n int) { var l int _ = l - l = len(m.Filter) - if l > 0 { - n += 1 + l + sovContent(uint64(l)) + if len(m.Filters) > 0 { + for _, s := range m.Filters { + l = len(s) + n += 1 + l + sovContent(uint64(l)) + } } return n } @@ -1709,7 +1720,7 @@ func (this *ListStatusesRequest) String() string { return "nil" } s := strings.Join([]string{`&ListStatusesRequest{`, - `Filter:` + fmt.Sprintf("%v", this.Filter) + `,`, + `Filters:` + fmt.Sprintf("%v", this.Filters) + `,`, `}`, }, "") return s @@ -2885,7 +2896,7 @@ func (m *ListStatusesRequest) Unmarshal(dAtA []byte) error { switch fieldNum { case 1: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Filter", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field Filters", wireType) } var stringLen uint64 for shift := uint(0); ; shift += 7 { @@ -2910,7 +2921,7 @@ func (m *ListStatusesRequest) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Filter = string(dAtA[iNdEx:postIndex]) + m.Filters = append(m.Filters, string(dAtA[iNdEx:postIndex])) iNdEx = postIndex default: iNdEx = preIndex @@ -3595,63 +3606,63 @@ func init() { } var fileDescriptorContent = []byte{ - // 925 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x57, 0x4f, 0x6f, 0xe3, 0x44, - 0x14, 0xcf, 0x24, 0x8e, 0x69, 0x5f, 0xc2, 0x12, 0x26, 0xa1, 0x8a, 0x8c, 0x70, 0xb2, 0x16, 0x42, - 0xd5, 0x2e, 0x75, 0xba, 0x69, 0x6f, 0x48, 0x88, 0x24, 0xbb, 0x40, 0x11, 0x5d, 0x90, 0x37, 0xb0, - 0x62, 0x2f, 0xe0, 0x24, 0x13, 0x63, 0x29, 0xf1, 0x78, 0xed, 0x49, 0xc4, 0x72, 0xe2, 0x82, 0x84, - 0x2a, 0x0e, 0x7c, 0x81, 0x5e, 0x80, 0x3b, 0x77, 0x3e, 0x41, 0x8f, 0x5c, 0x90, 0x10, 0x87, 0x5d, - 0x36, 0x1f, 0x04, 0x21, 0x8f, 0xc7, 0x8e, 0x93, 0xb4, 0xe4, 0x4f, 0xb3, 0xa7, 0xbe, 0x19, 0xbf, - 0xdf, 0xfb, 0xfb, 0x7b, 0xf3, 0x52, 0x78, 0xdf, 0xb2, 0xd9, 0xd7, 0xa3, 0x8e, 0xde, 0xa5, 0xc3, - 0x5a, 0x97, 0x3a, 0xcc, 0xb4, 0x1d, 0xe2, 0xf5, 0x92, 0xa2, 0xe9, 0xda, 0x35, 0x9f, 0x78, 0x63, - 0xbb, 0x4b, 0x7c, 0x7e, 0x4f, 0x1c, 0x56, 0x1b, 0xdf, 0x89, 0x44, 0xdd, 0xf5, 0x28, 0xa3, 0x58, - 0x9d, 0x22, 0xf4, 0x48, 0x5b, 0x8f, 0x54, 0xc6, 0x77, 0x94, 0x92, 0x45, 0x2d, 0xca, 0x55, 0x6b, - 0x81, 0x14, 0xa2, 0x94, 0x8a, 0x45, 0xa9, 0x35, 0x20, 0x35, 0x7e, 0xea, 0x8c, 0xfa, 0x35, 0x66, - 0x0f, 0x89, 0xcf, 0xcc, 0xa1, 0x2b, 0x14, 0x5e, 0x9f, 0x57, 0x20, 0x43, 0x97, 0x3d, 0x09, 0x3f, - 0x6a, 0xbf, 0x21, 0x90, 0x4e, 0x9c, 0x3e, 0xc5, 0x1f, 0x81, 0xdc, 0xb3, 0x2d, 0xe2, 0xb3, 0x32, - 0xaa, 0xa2, 0xfd, 0xdd, 0x66, 0xfd, 0xe2, 0x69, 0x25, 0xf5, 0xf7, 0xd3, 0xca, 0xad, 0x44, 0x72, - 0xd4, 0x25, 0x4e, 0x1c, 0xa3, 0x5f, 0xb3, 0xe8, 0x41, 0x08, 0xd1, 0xef, 0xf2, 0x3f, 0x86, 0xb0, - 0x80, 0x31, 0x48, 0xbe, 0xfd, 0x2d, 0x29, 0xa7, 0xab, 0x68, 0x3f, 0x63, 0x70, 0x19, 0x7f, 0x00, - 0xf9, 0x2e, 0x1d, 0x0e, 0x6d, 0xc6, 0x48, 0xef, 0x4b, 0x93, 0x95, 0x33, 0x55, 0xb4, 0x9f, 0xab, - 0x2b, 0x7a, 0x18, 0x9c, 0x1e, 0x05, 0xa7, 0xb7, 0xa3, 0xe8, 0x9b, 0x3b, 0x41, 0x04, 0x3f, 0x3d, - 0xab, 0x20, 0x23, 0x17, 0x23, 0x1b, 0x4c, 0xfb, 0x02, 0x72, 0x41, 0xc0, 0x06, 0x79, 0x3c, 0x0a, - 0x7c, 0x6d, 0x31, 0x6e, 0xed, 0x3e, 0xe4, 0x43, 0xd3, 0xbe, 0x4b, 0x1d, 0x9f, 0xe0, 0x77, 0x41, - 0xb2, 0x9d, 0x3e, 0xe5, 0x96, 0x73, 0xf5, 0x37, 0xf5, 0xff, 0xef, 0x8f, 0x1e, 0x60, 0x9b, 0x52, - 0xe0, 0xdf, 0xe0, 0x38, 0xad, 0x04, 0xf8, 0x63, 0xdb, 0x67, 0xad, 0x50, 0x45, 0x44, 0xac, 0x7d, - 0x06, 0xc5, 0x99, 0xdb, 0x05, 0x67, 0x99, 0x8d, 0x9c, 0x75, 0xa0, 0x74, 0x97, 0x0c, 0x08, 0x23, - 0xb3, 0xee, 0xb6, 0x5a, 0xa0, 0x1f, 0x11, 0x60, 0x83, 0x98, 0xbd, 0x17, 0xe7, 0x02, 0xef, 0x81, - 0x4c, 0xfb, 0x7d, 0x9f, 0x30, 0xc1, 0x1e, 0x71, 0x8a, 0x39, 0x95, 0x99, 0x72, 0x4a, 0x6b, 0x40, - 0x71, 0x26, 0x1a, 0x51, 0xc9, 0xa9, 0x09, 0x34, 0x6f, 0xa2, 0x67, 0x32, 0x93, 0x1b, 0xce, 0x1b, - 0x5c, 0xd6, 0x7e, 0x4e, 0x83, 0xfc, 0x80, 0x99, 0x6c, 0xe4, 0xe3, 0x16, 0x80, 0xcf, 0x4c, 0x4f, - 0xf0, 0x13, 0xad, 0xc1, 0xcf, 0x5d, 0x81, 0x6b, 0xb0, 0xc0, 0xc8, 0xc8, 0xed, 0x99, 0xc2, 0x48, - 0x7a, 0x1d, 0x23, 0x02, 0xd7, 0x60, 0xb8, 0x00, 0x19, 0x8f, 0xf4, 0x79, 0xaa, 0xbb, 0x46, 0x20, - 0x26, 0x52, 0x92, 0x66, 0x52, 0x2a, 0x41, 0x96, 0x51, 0x66, 0x0e, 0xca, 0x59, 0x7e, 0x1d, 0x1e, - 0xf0, 0x7d, 0xd8, 0x21, 0xdf, 0xb8, 0xa4, 0xcb, 0x48, 0xaf, 0x2c, 0x6f, 0xdc, 0x91, 0xd8, 0x86, - 0x76, 0x13, 0x5e, 0x0e, 0x6b, 0x14, 0x35, 0x5c, 0x04, 0x88, 0xe2, 0x00, 0xb5, 0x4f, 0xe1, 0x46, - 0xa4, 0x12, 0xf3, 0x59, 0xf6, 0xf9, 0x8d, 0x28, 0xe5, 0x5b, 0xcb, 0x18, 0x2d, 0xf0, 0x02, 0xa5, - 0x1d, 0x84, 0x63, 0x12, 0xde, 0x92, 0xd8, 0xf5, 0x1e, 0xc8, 0x7d, 0x7b, 0xc0, 0x88, 0x27, 0xbc, - 0x8b, 0x93, 0xf6, 0x15, 0x94, 0x66, 0xd5, 0x45, 0x18, 0x1f, 0xc2, 0x8e, 0x2f, 0xee, 0xc4, 0x68, - 0xad, 0x18, 0x88, 0x18, 0xae, 0x18, 0xad, 0xfd, 0x8b, 0xa0, 0xf8, 0xd0, 0xb3, 0x17, 0x06, 0xac, - 0x05, 0xb2, 0xd9, 0x65, 0x36, 0x75, 0x78, 0x44, 0x37, 0xea, 0xb7, 0x97, 0xd9, 0xe7, 0x46, 0x1a, - 0x1c, 0x62, 0x08, 0x68, 0x54, 0xd1, 0xf4, 0xb4, 0xe5, 0x71, 0x6b, 0x33, 0x57, 0xb5, 0x56, 0xba, - 0x7e, 0x6b, 0x13, 0xc4, 0xca, 0x5e, 0x3a, 0x2b, 0x72, 0x62, 0x56, 0x9e, 0xa5, 0xa1, 0x34, 0x5b, - 0x00, 0x51, 0xe3, 0xad, 0x54, 0x60, 0x76, 0xfc, 0xd2, 0xdb, 0x18, 0xbf, 0xcc, 0x66, 0xe3, 0xb7, - 0xde, 0xb0, 0x4d, 0x1f, 0x3f, 0xf9, 0xda, 0xef, 0x6b, 0x15, 0xf2, 0x8d, 0x0e, 0xf5, 0xd8, 0x95, - 0x73, 0x76, 0xeb, 0x7b, 0x04, 0xb9, 0x44, 0xf5, 0xf0, 0x1b, 0x20, 0x3d, 0x68, 0x37, 0xda, 0x85, - 0x94, 0x52, 0x3c, 0x3b, 0xaf, 0xbe, 0x92, 0xf8, 0x14, 0xb0, 0x18, 0x57, 0x20, 0xfb, 0xd0, 0x38, - 0x69, 0xdf, 0x2b, 0x20, 0xa5, 0x74, 0x76, 0x5e, 0x2d, 0x24, 0xbe, 0x73, 0x11, 0xdf, 0x04, 0xb9, - 0xf5, 0xc9, 0xe9, 0xe9, 0x49, 0xbb, 0x90, 0x56, 0x5e, 0x3b, 0x3b, 0xaf, 0xbe, 0x9a, 0xd0, 0x68, - 0xf1, 0xad, 0xab, 0x14, 0x7f, 0xf8, 0x45, 0x4d, 0xfd, 0xfe, 0xab, 0x9a, 0xf4, 0x5b, 0xff, 0x53, - 0x86, 0x97, 0x04, 0x0d, 0xb0, 0x29, 0x7e, 0x42, 0xdc, 0x5e, 0x65, 0x67, 0x89, 0xd4, 0x94, 0xb7, - 0x57, 0x53, 0x16, 0x0c, 0x7b, 0x0c, 0x52, 0x30, 0xdd, 0xb8, 0xbe, 0x0c, 0xb5, 0xb8, 0x6f, 0x95, - 0xa3, 0xb5, 0x30, 0xa1, 0xc3, 0x43, 0x84, 0x3f, 0x07, 0x39, 0xdc, 0xa7, 0xf8, 0x78, 0x99, 0x81, - 0xcb, 0xf6, 0xae, 0xb2, 0xb7, 0x40, 0xbb, 0x7b, 0xc1, 0xef, 0xae, 0x20, 0x95, 0x60, 0x69, 0x2d, - 0x4f, 0x65, 0x71, 0xd1, 0x2e, 0x4f, 0xe5, 0x92, 0x75, 0x78, 0x88, 0xb0, 0x15, 0xef, 0xb8, 0x83, - 0x15, 0x1f, 0x61, 0xe1, 0x4f, 0x5f, 0x55, 0x5d, 0xb4, 0xe9, 0x09, 0xe4, 0x93, 0x8f, 0x30, 0x5e, - 0xa9, 0xf4, 0x73, 0x2f, 0xbc, 0x72, 0xbc, 0x1e, 0x48, 0xb8, 0x1e, 0x43, 0x36, 0x64, 0xf4, 0xd1, - 0x4a, 0x8f, 0xcf, 0x5c, 0x61, 0x8f, 0xd7, 0x03, 0x85, 0x3e, 0xf7, 0xd1, 0x21, 0xc2, 0xa7, 0x90, - 0xe5, 0x23, 0x8b, 0x97, 0x12, 0x3a, 0x39, 0xd9, 0x57, 0xb1, 0xa3, 0xf9, 0xe8, 0xe2, 0xb9, 0x9a, - 0xfa, 0xeb, 0xb9, 0x9a, 0xfa, 0x6e, 0xa2, 0xa2, 0x8b, 0x89, 0x8a, 0xfe, 0x98, 0xa8, 0xe8, 0x9f, - 0x89, 0x8a, 0x1e, 0xbd, 0xb7, 0xe9, 0x7f, 0x19, 0xef, 0x08, 0xb1, 0x23, 0x73, 0x5f, 0x47, 0xff, - 0x05, 0x00, 0x00, 0xff, 0xff, 0x07, 0xb9, 0xbc, 0xaf, 0xb0, 0x0c, 0x00, 0x00, + // 927 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x57, 0x4f, 0x8f, 0xdb, 0x44, + 0x14, 0xcf, 0x24, 0x8e, 0xbb, 0xfb, 0x12, 0x4a, 0x98, 0x84, 0x2a, 0x32, 0xc2, 0x49, 0x2d, 0x84, + 0x56, 0x2d, 0xb5, 0xb7, 0xd9, 0xbd, 0x21, 0x21, 0xb2, 0x69, 0x81, 0x45, 0x6c, 0x41, 0x6e, 0xa0, + 0xa2, 0x17, 0x70, 0x92, 0x89, 0xb1, 0x94, 0x78, 0x5c, 0x7b, 0x12, 0x51, 0x4e, 0x5c, 0x90, 0xd0, + 0x8a, 0x03, 0x5f, 0x60, 0x2f, 0xc0, 0x9d, 0x3b, 0x9f, 0x60, 0x8f, 0x5c, 0x90, 0x10, 0x87, 0x96, + 0xe6, 0x83, 0x20, 0xe4, 0xf1, 0xd8, 0x71, 0x92, 0x5d, 0xf2, 0x67, 0xc3, 0x69, 0xdf, 0x8c, 0xdf, + 0xef, 0xfd, 0xff, 0xcd, 0xcb, 0xc2, 0x7b, 0xb6, 0xc3, 0xbe, 0x1a, 0x75, 0xf4, 0x2e, 0x1d, 0x1a, + 0x5d, 0xea, 0x32, 0xcb, 0x71, 0x89, 0xdf, 0x4b, 0x8b, 0x96, 0xe7, 0x18, 0x01, 0xf1, 0xc7, 0x4e, + 0x97, 0x04, 0xfc, 0x9e, 0xb8, 0xcc, 0x18, 0xdf, 0x8d, 0x45, 0xdd, 0xf3, 0x29, 0xa3, 0x58, 0x9d, + 0x22, 0xf4, 0x58, 0x5b, 0x8f, 0x55, 0xc6, 0x77, 0x95, 0x8a, 0x4d, 0x6d, 0xca, 0x55, 0x8d, 0x50, + 0x8a, 0x50, 0x4a, 0xcd, 0xa6, 0xd4, 0x1e, 0x10, 0x83, 0x9f, 0x3a, 0xa3, 0xbe, 0xc1, 0x9c, 0x21, + 0x09, 0x98, 0x35, 0xf4, 0x84, 0xc2, 0x6b, 0xf3, 0x0a, 0x64, 0xe8, 0xb1, 0xa7, 0xd1, 0x47, 0xed, + 0x57, 0x04, 0xd2, 0xb1, 0xdb, 0xa7, 0xf8, 0x43, 0x90, 0x7b, 0x8e, 0x4d, 0x02, 0x56, 0x45, 0x75, + 0xb4, 0xb7, 0x7b, 0xd4, 0x38, 0x7f, 0x56, 0xcb, 0xfc, 0xf5, 0xac, 0x76, 0x2b, 0x95, 0x1c, 0xf5, + 0x88, 0x9b, 0xc4, 0x18, 0x18, 0x36, 0xbd, 0x13, 0x41, 0xf4, 0x7b, 0xfc, 0x8f, 0x29, 0x2c, 0x60, + 0x0c, 0x52, 0xe0, 0x7c, 0x43, 0xaa, 0xd9, 0x3a, 0xda, 0xcb, 0x99, 0x5c, 0xc6, 0xef, 0x43, 0xb1, + 0x4b, 0x87, 0x43, 0x87, 0x31, 0xd2, 0xfb, 0xc2, 0x62, 0xd5, 0x5c, 0x1d, 0xed, 0x15, 0x1a, 0x8a, + 0x1e, 0x05, 0xa7, 0xc7, 0xc1, 0xe9, 0xed, 0x38, 0xfa, 0xa3, 0x9d, 0x30, 0x82, 0x1f, 0x9f, 0xd7, + 0x90, 0x59, 0x48, 0x90, 0x4d, 0xa6, 0x7d, 0x0e, 0x85, 0x30, 0x60, 0x93, 0x3c, 0x19, 0x85, 0xbe, + 0xb6, 0x18, 0xb7, 0xf6, 0x00, 0x8a, 0x91, 0xe9, 0xc0, 0xa3, 0x6e, 0x40, 0xf0, 0x3b, 0x20, 0x39, + 0x6e, 0x9f, 0x72, 0xcb, 0x85, 0xc6, 0x1b, 0xfa, 0x7f, 0xf7, 0x47, 0x0f, 0xb1, 0x47, 0x52, 0xe8, + 0xdf, 0xe4, 0x38, 0xad, 0x02, 0xf8, 0x23, 0x27, 0x60, 0xad, 0x48, 0x45, 0x44, 0xac, 0x7d, 0x0a, + 0xe5, 0x99, 0xdb, 0x05, 0x67, 0xb9, 0x8d, 0x9c, 0x75, 0xa0, 0x72, 0x8f, 0x0c, 0x08, 0x23, 0xb3, + 0xee, 0xb6, 0x5a, 0xa0, 0x1f, 0x10, 0x60, 0x93, 0x58, 0xbd, 0xff, 0xcf, 0x05, 0xbe, 0x01, 0x32, + 0xed, 0xf7, 0x03, 0xc2, 0xc4, 0xf4, 0x88, 0x53, 0x32, 0x53, 0xb9, 0xe9, 0x4c, 0x69, 0x4d, 0x28, + 0xcf, 0x44, 0x23, 0x2a, 0x39, 0x35, 0x81, 0xe6, 0x4d, 0xf4, 0x2c, 0x66, 0x71, 0xc3, 0x45, 0x93, + 0xcb, 0xda, 0x4f, 0x59, 0x90, 0x1f, 0x32, 0x8b, 0x8d, 0x02, 0xdc, 0x02, 0x08, 0x98, 0xe5, 0x8b, + 0xf9, 0x44, 0x6b, 0xcc, 0xe7, 0xae, 0xc0, 0x35, 0x59, 0x68, 0x64, 0xe4, 0xf5, 0x2c, 0x61, 0x24, + 0xbb, 0x8e, 0x11, 0x81, 0x6b, 0x32, 0x5c, 0x82, 0x9c, 0x4f, 0xfa, 0x3c, 0xd5, 0x5d, 0x33, 0x14, + 0x53, 0x29, 0x49, 0x33, 0x29, 0x55, 0x20, 0xcf, 0x28, 0xb3, 0x06, 0xd5, 0x3c, 0xbf, 0x8e, 0x0e, + 0xf8, 0x01, 0xec, 0x90, 0xaf, 0x3d, 0xd2, 0x65, 0xa4, 0x57, 0x95, 0x37, 0xee, 0x48, 0x62, 0x43, + 0xbb, 0x09, 0x2f, 0x45, 0x35, 0x8a, 0x1b, 0x2e, 0x02, 0x44, 0x49, 0x80, 0xda, 0x27, 0x70, 0x3d, + 0x56, 0x49, 0xe6, 0x59, 0x0e, 0xf8, 0x8d, 0x28, 0xe5, 0x9b, 0xcb, 0x26, 0x5a, 0xe0, 0x05, 0x4a, + 0x33, 0x22, 0x9a, 0x44, 0xb7, 0x24, 0x71, 0x5d, 0x85, 0x6b, 0x7d, 0x67, 0xc0, 0x88, 0x1f, 0x70, + 0xa6, 0xec, 0x9a, 0xf1, 0x51, 0xfb, 0x12, 0x2a, 0xb3, 0x00, 0x11, 0xc8, 0x07, 0xb0, 0x13, 0x88, + 0x3b, 0x41, 0xae, 0x15, 0x43, 0x11, 0xf4, 0x4a, 0xd0, 0xda, 0x3f, 0x08, 0xca, 0x8f, 0x7c, 0x67, + 0x81, 0x62, 0x2d, 0x90, 0xad, 0x2e, 0x73, 0xa8, 0xcb, 0x53, 0xbd, 0xde, 0xb8, 0xbd, 0xcc, 0x3e, + 0x37, 0xd2, 0xe4, 0x10, 0x53, 0x40, 0xe3, 0x9a, 0x66, 0xa7, 0x4d, 0x4f, 0x9a, 0x9b, 0xbb, 0xac, + 0xb9, 0xd2, 0xd5, 0x9b, 0x9b, 0x1a, 0xad, 0xfc, 0x85, 0x6c, 0x91, 0x53, 0x6c, 0x79, 0x9e, 0x85, + 0xca, 0x6c, 0x01, 0x44, 0x8d, 0xb7, 0x52, 0x81, 0x59, 0x02, 0x66, 0xb7, 0x41, 0xc0, 0xdc, 0x66, + 0x04, 0x5c, 0x8f, 0x6e, 0xd3, 0xe7, 0x4f, 0xbe, 0xf2, 0x0b, 0x5b, 0x87, 0x62, 0xb3, 0x43, 0x7d, + 0x76, 0x29, 0xd3, 0x6e, 0x7d, 0x87, 0xa0, 0x90, 0xaa, 0x1e, 0x7e, 0x1d, 0xa4, 0x87, 0xed, 0x66, + 0xbb, 0x94, 0x51, 0xca, 0xa7, 0x67, 0xf5, 0x97, 0x53, 0x9f, 0xc2, 0x29, 0xc6, 0x35, 0xc8, 0x3f, + 0x32, 0x8f, 0xdb, 0xf7, 0x4b, 0x48, 0xa9, 0x9c, 0x9e, 0xd5, 0x4b, 0xa9, 0xef, 0x5c, 0xc4, 0x37, + 0x41, 0x6e, 0x7d, 0x7c, 0x72, 0x72, 0xdc, 0x2e, 0x65, 0x95, 0x57, 0x4f, 0xcf, 0xea, 0xaf, 0xa4, + 0x34, 0x5a, 0x7c, 0xef, 0x2a, 0xe5, 0xef, 0x7f, 0x56, 0x33, 0xbf, 0xfd, 0xa2, 0xa6, 0xfd, 0x36, + 0xfe, 0x90, 0xe1, 0x9a, 0x18, 0x03, 0x6c, 0x89, 0x1f, 0x11, 0xb7, 0x57, 0xd9, 0x5a, 0x22, 0x35, + 0xe5, 0xad, 0xd5, 0x94, 0xc5, 0x84, 0x3d, 0x01, 0x29, 0x64, 0x37, 0x6e, 0x2c, 0x43, 0x2d, 0x6e, + 0x5c, 0xe5, 0x60, 0x2d, 0x4c, 0xe4, 0x70, 0x1f, 0xe1, 0xcf, 0x40, 0x8e, 0x36, 0x2a, 0x3e, 0x5c, + 0x66, 0xe0, 0xa2, 0xcd, 0xab, 0xdc, 0x58, 0x18, 0xbb, 0xfb, 0xe1, 0x2f, 0xaf, 0x30, 0x95, 0x70, + 0x6d, 0x2d, 0x4f, 0x65, 0x71, 0xd5, 0x2e, 0x4f, 0xe5, 0x82, 0x85, 0xb8, 0x8f, 0xb0, 0x9d, 0x6c, + 0xb9, 0x3b, 0x2b, 0x3e, 0xc3, 0xc2, 0x9f, 0xbe, 0xaa, 0xba, 0x68, 0xd3, 0x53, 0x28, 0xa6, 0x1f, + 0x61, 0xbc, 0x52, 0xe9, 0xe7, 0xde, 0x78, 0xe5, 0x70, 0x3d, 0x90, 0x70, 0x3d, 0x86, 0x7c, 0x34, + 0xd1, 0x07, 0x2b, 0x3d, 0x3e, 0x73, 0x85, 0x3d, 0x5c, 0x0f, 0x14, 0xf9, 0xdc, 0x43, 0xfb, 0x08, + 0x9f, 0x40, 0x9e, 0x53, 0x16, 0x2f, 0x1d, 0xe8, 0x34, 0xb3, 0x2f, 0x9b, 0x8e, 0xa3, 0xc7, 0xe7, + 0x2f, 0xd4, 0xcc, 0x9f, 0x2f, 0xd4, 0xcc, 0xb7, 0x13, 0x15, 0x9d, 0x4f, 0x54, 0xf4, 0xfb, 0x44, + 0x45, 0x7f, 0x4f, 0x54, 0xf4, 0xf8, 0xdd, 0x4d, 0xff, 0xcf, 0x78, 0x5b, 0x88, 0x1d, 0x99, 0xfb, + 0x3a, 0xf8, 0x37, 0x00, 0x00, 0xff, 0xff, 0xc6, 0xb1, 0x1a, 0xcd, 0xb2, 0x0c, 0x00, 0x00, } diff --git a/api/services/content/v1/content.proto b/api/services/content/v1/content.proto index 0c48a37ad..5bb554980 100644 --- a/api/services/content/v1/content.proto +++ b/api/services/content/v1/content.proto @@ -137,7 +137,7 @@ message StatusResponse { } message ListStatusesRequest { - string filter = 1; + repeated string filters = 1; } message ListStatusesResponse { diff --git a/content/content.go b/content/content.go index b9abcf6ca..2c91b99d5 100644 --- a/content/content.go +++ b/content/content.go @@ -68,7 +68,7 @@ type IngestManager interface { // ListStatuses returns the status of any active ingestions whose ref match the // provided regular expression. If empty, all active ingestions will be // returned. - ListStatuses(ctx context.Context, ref string) ([]Status, error) + ListStatuses(ctx context.Context, filters ...string) ([]Status, error) // Abort completely cancels the ingest operation targeted by ref. Abort(ctx context.Context, ref string) error diff --git a/content/store.go b/content/store.go index 643a21c8d..ba95199d8 100644 --- a/content/store.go +++ b/content/store.go @@ -7,11 +7,11 @@ import ( "io/ioutil" "os" "path/filepath" - "regexp" "strconv" "time" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/filters" "github.com/containerd/containerd/log" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" @@ -140,7 +140,7 @@ func (s *store) Status(ctx context.Context, ref string) (Status, error) { return s.status(s.ingestRoot(ref)) } -func (s *store) ListStatuses(ctx context.Context, re string) ([]Status, error) { +func (s *store) ListStatuses(ctx context.Context, fs ...string) ([]Status, error) { fp, err := os.Open(filepath.Join(s.root, "ingest")) if err != nil { return nil, err @@ -153,7 +153,7 @@ func (s *store) ListStatuses(ctx context.Context, re string) ([]Status, error) { return nil, err } - rec, err := regexp.Compile(re) + filter, err := filters.ParseAll(fs...) if err != nil { return nil, err } @@ -178,11 +178,9 @@ func (s *store) ListStatuses(ctx context.Context, re string) ([]Status, error) { continue } - if !rec.MatchString(stat.Ref) { - continue + if filter.Match(adaptStatus(stat)) { + active = append(active, stat) } - - active = append(active, stat) } return active, nil @@ -210,6 +208,20 @@ func (s *store) status(ingestPath string) (Status, error) { }, nil } +func adaptStatus(status Status) filters.Adaptor { + return filters.AdapterFunc(func(fieldpath []string) (string, bool) { + if len(fieldpath) == 0 { + return "", false + } + switch fieldpath[0] { + case "ref": + return status.Ref, true + } + + return "", false + }) +} + // total attempts to resolve the total expected size for the write. func (s *store) total(ingestPath string) int64 { totalS, err := readFileString(filepath.Join(ingestPath, "total")) diff --git a/metadata/adaptors.go b/metadata/adaptors.go index 25aab0323..e3df7df80 100644 --- a/metadata/adaptors.go +++ b/metadata/adaptors.go @@ -4,6 +4,7 @@ import ( "strings" "github.com/containerd/containerd/containers" + "github.com/containerd/containerd/content" "github.com/containerd/containerd/filters" "github.com/containerd/containerd/images" ) @@ -69,6 +70,20 @@ func adaptContainer(o interface{}) filters.Adaptor { }) } +func adaptContentStatus(status content.Status) filters.Adaptor { + return filters.AdapterFunc(func(fieldpath []string) (string, bool) { + if len(fieldpath) == 0 { + return "", false + } + switch fieldpath[0] { + case "ref": + return status.Ref, true + } + + return "", false + }) +} + func checkMap(fieldpath []string, m map[string]string) (string, bool) { if len(m) == 0 { return "", false diff --git a/metadata/content.go b/metadata/content.go index 74e6add54..16aa0b3da 100644 --- a/metadata/content.go +++ b/metadata/content.go @@ -4,11 +4,11 @@ import ( "context" "encoding/binary" "io" - "regexp" "github.com/boltdb/bolt" "github.com/containerd/containerd/content" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/filters" "github.com/containerd/containerd/namespaces" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" @@ -109,21 +109,18 @@ func (cs *contentStore) Delete(ctx context.Context, dgst digest.Digest) error { }) } -func (cs *contentStore) ListStatuses(ctx context.Context, re string) ([]content.Status, error) { +func (cs *contentStore) ListStatuses(ctx context.Context, fs ...string) ([]content.Status, error) { ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return nil, err } - var rec *regexp.Regexp - if re != "" { - rec, err = regexp.Compile(re) - if err != nil { - return nil, err - } + filter, err := filters.ParseAll(fs...) + if err != nil { + return nil, err } - var brefs []string + brefs := map[string]string{} if err := view(ctx, cs.db, func(tx *bolt.Tx) error { bkt := getIngestBucket(tx, ns) if bkt == nil { @@ -131,9 +128,8 @@ func (cs *contentStore) ListStatuses(ctx context.Context, re string) ([]content. } return bkt.ForEach(func(k, v []byte) error { - if rec == nil || rec.Match(k) { - brefs = append(brefs, string(v)) - } + // TODO(dmcgowan): match name and potentially labels here + brefs[string(k)] = string(v) return nil }) }); err != nil { @@ -141,7 +137,7 @@ func (cs *contentStore) ListStatuses(ctx context.Context, re string) ([]content. } statuses := make([]content.Status, 0, len(brefs)) - for _, bref := range brefs { + for k, bref := range brefs { status, err := cs.Store.Status(ctx, bref) if err != nil { if errdefs.IsNotFound(err) { @@ -149,9 +145,11 @@ func (cs *contentStore) ListStatuses(ctx context.Context, re string) ([]content. } return nil, err } - status.Ref = trimKey(status.Ref) + status.Ref = k - statuses = append(statuses, status) + if filter.Match(adaptContentStatus(status)) { + statuses = append(statuses, status) + } } return statuses, nil diff --git a/services/content/service.go b/services/content/service.go index d84cc529b..affdc4386 100644 --- a/services/content/service.go +++ b/services/content/service.go @@ -243,9 +243,9 @@ func (s *Service) Status(ctx context.Context, req *api.StatusRequest) (*api.Stat } func (s *Service) ListStatuses(ctx context.Context, req *api.ListStatusesRequest) (*api.ListStatusesResponse, error) { - statuses, err := s.store.ListStatuses(ctx, req.Filter) + statuses, err := s.store.ListStatuses(ctx, req.Filters...) if err != nil { - return nil, errdefs.ToGRPCf(err, "could not get status for filter %q", req.Filter) + return nil, errdefs.ToGRPC(err) } var resp api.ListStatusesResponse diff --git a/services/content/store.go b/services/content/store.go index 58172d884..f03d429f9 100644 --- a/services/content/store.go +++ b/services/content/store.go @@ -113,9 +113,9 @@ func (rs *remoteStore) Status(ctx context.Context, ref string) (content.Status, }, nil } -func (rs *remoteStore) ListStatuses(ctx context.Context, filter string) ([]content.Status, error) { +func (rs *remoteStore) ListStatuses(ctx context.Context, filters ...string) ([]content.Status, error) { resp, err := rs.client.ListStatuses(ctx, &contentapi.ListStatusesRequest{ - Filter: filter, + Filters: filters, }) if err != nil { return nil, errdefs.FromGRPC(err)