From d88de4a34f5d73256f2bb33456c16f89abe54cf2 Mon Sep 17 00:00:00 2001 From: Akihiro Suda Date: Mon, 21 May 2018 16:31:26 +0900 Subject: [PATCH] content: change Writer/ReaderAt to take OCI This change allows implementations to resolve the location of the actual data using OCI descriptor fields such as MediaType. No OCI descriptor field is written to the store. No change on gRPC API. Signed-off-by: Akihiro Suda --- client_test.go | 4 +- cmd/ctr/commands/content/content.go | 10 +- cmd/ctr/commands/snapshots/snapshots.go | 2 +- container_opts_unix.go | 13 ++- content/content.go | 39 +++++++- content/helpers.go | 15 +-- content/local/store.go | 28 ++++-- content/local/store_test.go | 24 ++--- content/proxy/content_store.go | 21 +++-- content/testsuite/testsuite.go | 35 +++---- diff/apply/apply.go | 2 +- diff/walking/differ.go | 6 +- diff/windows/windows.go | 2 +- images/image.go | 16 ++-- images/oci/exporter.go | 2 +- images/oci/importer.go | 4 +- metadata/content.go | 94 +++++++++++-------- metadata/content_test.go | 8 +- metadata/db_test.go | 4 +- oci/spec_opts_unix.go | 2 +- oci/spec_opts_windows.go | 2 +- remotes/docker/schema1/converter.go | 8 +- remotes/handlers.go | 4 +- services/content/service.go | 7 +- services/tasks/local.go | 9 +- task.go | 2 +- vendor.conf | 4 +- .../cri/pkg/containerd/importer/importer.go | 17 ++-- .../containerd/cri/pkg/server/events.go | 20 ++-- .../containerd/cri/pkg/server/helpers.go | 2 +- .../containerd/cri/pkg/server/service.go | 40 +++++--- vendor/github.com/containerd/cri/vendor.conf | 2 +- 32 files changed, 279 insertions(+), 169 deletions(-) diff --git a/client_test.go b/client_test.go index 06fc1da4f..4b8d09bc8 100644 --- a/client_test.go +++ b/client_test.go @@ -214,7 +214,7 @@ func TestImagePullAllPlatforms(t *testing.T) { } // check if childless data type has blob in content store for _, desc := range children { - ra, err := cs.ReaderAt(ctx, desc.Digest) + ra, err := cs.ReaderAt(ctx, desc) if err != nil { t.Fatal(err) } @@ -275,7 +275,7 @@ func TestImagePullSomePlatforms(t *testing.T) { // check if childless data type has blob in content store for _, desc := range children { - ra, err := cs.ReaderAt(ctx, desc.Digest) + ra, err := cs.ReaderAt(ctx, desc) if err != nil { t.Fatal(err) } diff --git a/cmd/ctr/commands/content/content.go b/cmd/ctr/commands/content/content.go index a5cc90f3b..2eee81507 100644 --- a/cmd/ctr/commands/content/content.go +++ b/cmd/ctr/commands/content/content.go @@ -72,7 +72,7 @@ var ( } defer cancel() cs := client.ContentStore() - ra, err := cs.ReaderAt(ctx, dgst) + ra, err := cs.ReaderAt(ctx, ocispec.Descriptor{Digest: dgst}) if err != nil { return err } @@ -121,7 +121,7 @@ var ( // TODO(stevvooe): Allow ingest to be reentrant. Currently, we expect // all data to be written in a single invocation. Allow multiple writes // to the same transaction key followed by a commit. - return content.WriteBlob(ctx, cs, ref, os.Stdin, expectedSize, expectedDigest) + return content.WriteBlob(ctx, cs, ref, os.Stdin, ocispec.Descriptor{Size: expectedSize, Digest: expectedDigest}) }, } @@ -314,7 +314,7 @@ var ( } defer cancel() cs := client.ContentStore() - ra, err := cs.ReaderAt(ctx, dgst) + ra, err := cs.ReaderAt(ctx, ocispec.Descriptor{Digest: dgst}) if err != nil { return err } @@ -326,7 +326,7 @@ var ( } defer nrc.Close() - wr, err := cs.Writer(ctx, "edit-"+object, 0, "") // TODO(stevvooe): Choose a better key? + wr, err := cs.Writer(ctx, content.WithRef("edit-"+object)) // TODO(stevvooe): Choose a better key? if err != nil { return err } @@ -482,7 +482,7 @@ var ( Size: info.Size, } - ra, err := cs.ReaderAt(ctx, dgst) + ra, err := cs.ReaderAt(ctx, desc) if err != nil { return err } diff --git a/cmd/ctr/commands/snapshots/snapshots.go b/cmd/ctr/commands/snapshots/snapshots.go index b720104eb..f8b16bce5 100644 --- a/cmd/ctr/commands/snapshots/snapshots.go +++ b/cmd/ctr/commands/snapshots/snapshots.go @@ -167,7 +167,7 @@ var diffCommand = cli.Command{ } } - ra, err := client.ContentStore().ReaderAt(ctx, desc.Digest) + ra, err := client.ContentStore().ReaderAt(ctx, desc) if err != nil { return err } diff --git a/container_opts_unix.go b/container_opts_unix.go index 743688895..a4935b2b4 100644 --- a/container_opts_unix.go +++ b/container_opts_unix.go @@ -36,9 +36,9 @@ import ( "github.com/containerd/containerd/runtime/linux/runctypes" "github.com/gogo/protobuf/proto" protobuf "github.com/gogo/protobuf/types" - digest "github.com/opencontainers/go-digest" "github.com/opencontainers/image-spec/identity" "github.com/opencontainers/image-spec/specs-go/v1" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" ) @@ -50,10 +50,9 @@ func WithCheckpoint(im Image, snapshotKey string) NewContainerOpts { return func(ctx context.Context, client *Client, c *containers.Container) error { var ( desc = im.Target() - id = desc.Digest store = client.ContentStore() ) - index, err := decodeIndex(ctx, store, id) + index, err := decodeIndex(ctx, store, desc) if err != nil { return err } @@ -80,7 +79,7 @@ func WithCheckpoint(im Image, snapshotKey string) NewContainerOpts { } c.Image = index.Annotations["image.name"] case images.MediaTypeContainerd1CheckpointConfig: - data, err := content.ReadBlob(ctx, store, m.Digest) + data, err := content.ReadBlob(ctx, store, m) if err != nil { return errors.Wrap(err, "unable to read checkpoint config") } @@ -113,7 +112,7 @@ func WithTaskCheckpoint(im Image) NewTaskOpts { return func(ctx context.Context, c *Client, info *TaskInfo) error { desc := im.Target() id := desc.Digest - index, err := decodeIndex(ctx, c.ContentStore(), id) + index, err := decodeIndex(ctx, c.ContentStore(), desc) if err != nil { return err } @@ -131,9 +130,9 @@ func WithTaskCheckpoint(im Image) NewTaskOpts { } } -func decodeIndex(ctx context.Context, store content.Provider, id digest.Digest) (*v1.Index, error) { +func decodeIndex(ctx context.Context, store content.Provider, desc ocispec.Descriptor) (*v1.Index, error) { var index v1.Index - p, err := content.ReadBlob(ctx, store, id) + p, err := content.ReadBlob(ctx, store, desc) if err != nil { return nil, err } diff --git a/content/content.go b/content/content.go index de9dd48f5..aabf4c8f3 100644 --- a/content/content.go +++ b/content/content.go @@ -22,6 +22,7 @@ import ( "time" "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) // ReaderAt extends the standard io.ReaderAt interface with reporting of Size and io.Closer @@ -33,12 +34,16 @@ type ReaderAt interface { // Provider provides a reader interface for specific content type Provider interface { - ReaderAt(ctx context.Context, dgst digest.Digest) (ReaderAt, error) + // ReaderAt only requires desc.Digest to be set. + // Other fields in the descriptor may be used internally for resolving + // the location of the actual data. + ReaderAt(ctx context.Context, dec ocispec.Descriptor) (ReaderAt, error) } // Ingester writes content type Ingester interface { - Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (Writer, error) + // Some implementations require WithRef to be included in opts. + Writer(ctx context.Context, opts ...WriterOpt) (Writer, error) } // Info holds content specific information @@ -142,3 +147,33 @@ func WithLabels(labels map[string]string) Opt { return nil } } + +// WriterOpts is internally used by WriterOpt. +type WriterOpts struct { + Ref string + Desc ocispec.Descriptor +} + +// WriterOpt is used for passing options to Ingester.Writer. +type WriterOpt func(*WriterOpts) error + +// WithDescriptor specifies an OCI descriptor. +// Writer may optionally use the descriptor internally for resolving +// the location of the actual data. +// Write does not require any field of desc to be set. +// If the data size is unknown, desc.Size should be set to 0. +// Some implementations may also accept negative values as "unknown". +func WithDescriptor(desc ocispec.Descriptor) WriterOpt { + return func(opts *WriterOpts) error { + opts.Desc = desc + return nil + } +} + +// WithRef specifies a ref string. +func WithRef(ref string) WriterOpt { + return func(opts *WriterOpts) error { + opts.Ref = ref + return nil + } +} diff --git a/content/helpers.go b/content/helpers.go index 3b0de7ac1..819b7ea1e 100644 --- a/content/helpers.go +++ b/content/helpers.go @@ -26,6 +26,7 @@ import ( "github.com/containerd/containerd/errdefs" "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" ) @@ -45,8 +46,8 @@ func NewReader(ra ReaderAt) io.Reader { // ReadBlob retrieves the entire contents of the blob from the provider. // // Avoid using this for large blobs, such as layers. -func ReadBlob(ctx context.Context, provider Provider, dgst digest.Digest) ([]byte, error) { - ra, err := provider.ReaderAt(ctx, dgst) +func ReadBlob(ctx context.Context, provider Provider, desc ocispec.Descriptor) ([]byte, error) { + ra, err := provider.ReaderAt(ctx, desc) if err != nil { return nil, err } @@ -65,8 +66,8 @@ func ReadBlob(ctx context.Context, provider Provider, dgst digest.Digest) ([]byt // This is useful when the digest and size are known beforehand. // // Copy is buffered, so no need to wrap reader in buffered io. -func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size int64, expected digest.Digest, opts ...Opt) error { - cw, err := OpenWriter(ctx, cs, ref, size, expected) +func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, desc ocispec.Descriptor, opts ...Opt) error { + cw, err := OpenWriter(ctx, cs, WithRef(ref), WithDescriptor(desc)) if err != nil { if !errdefs.IsAlreadyExists(err) { return err @@ -76,19 +77,19 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i } defer cw.Close() - return Copy(ctx, cw, r, size, expected, opts...) + return Copy(ctx, cw, r, desc.Size, desc.Digest, opts...) } // OpenWriter opens a new writer for the given reference, retrying if the writer // is locked until the reference is available or returns an error. -func OpenWriter(ctx context.Context, cs Ingester, ref string, size int64, expected digest.Digest) (Writer, error) { +func OpenWriter(ctx context.Context, cs Ingester, opts ...WriterOpt) (Writer, error) { var ( cw Writer err error retry = 16 ) for { - cw, err = cs.Writer(ctx, ref, size, expected) + cw, err = cs.Writer(ctx, opts...) if err != nil { if !errdefs.IsUnavailable(err) { return nil, err diff --git a/content/local/store.go b/content/local/store.go index 69437dfd1..11dfd61a3 100644 --- a/content/local/store.go +++ b/content/local/store.go @@ -34,6 +34,7 @@ import ( "github.com/containerd/containerd/filters" "github.com/containerd/containerd/log" digest "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" ) @@ -119,15 +120,15 @@ func (s *store) info(dgst digest.Digest, fi os.FileInfo, labels map[string]strin } // ReaderAt returns an io.ReaderAt for the blob. -func (s *store) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) { - p := s.blobPath(dgst) +func (s *store) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) { + p := s.blobPath(desc.Digest) fi, err := os.Stat(p) if err != nil { if !os.IsNotExist(err) { return nil, err } - return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", dgst, p) + return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", desc.Digest, p) } fp, err := os.Open(p) @@ -136,7 +137,7 @@ func (s *store) ReaderAt(ctx context.Context, dgst digest.Digest) (content.Reade return nil, err } - return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", dgst, p) + return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", desc.Digest, p) } return sizeReaderAt{size: fi.Size(), fp: fp}, nil @@ -400,11 +401,22 @@ func (s *store) total(ingestPath string) int64 { // ref at a time. // // The argument `ref` is used to uniquely identify a long-lived writer transaction. -func (s *store) Writer(ctx context.Context, ref string, total int64, expected digest.Digest) (content.Writer, error) { +func (s *store) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) { + var wOpts content.WriterOpts + for _, opt := range opts { + if err := opt(&wOpts); err != nil { + return nil, err + } + } + // TODO(AkihiroSuda): we could create a random string or one calculated based on the context + // https://github.com/containerd/containerd/issues/2129#issuecomment-380255019 + if wOpts.Ref == "" { + return nil, errors.Wrap(errdefs.ErrInvalidArgument, "ref must not be empty") + } var lockErr error for count := uint64(0); count < 10; count++ { time.Sleep(time.Millisecond * time.Duration(rand.Intn(1< 0 { - ra, err := nw.provider.ReaderAt(ctx, nw.expected) + if desc.Size > 0 { + ra, err := nw.provider.ReaderAt(ctx, nw.desc) if err != nil { return err } defer ra.Close() - if err := content.CopyReaderAt(w, ra, size); err != nil { + if err := content.CopyReaderAt(w, ra, desc.Size); err != nil { nw.w.Close() nw.w = nil return err @@ -544,14 +560,14 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, var actual digest.Digest if nw.w == nil { - if size != 0 && size != nw.size { - return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, nw.size, size) + if size != 0 && size != nw.desc.Size { + return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, nw.desc.Size, size) } - if expected != "" && expected != nw.expected { + if expected != "" && expected != nw.desc.Digest { return "", errors.Errorf("%q unexpected digest", nw.ref) } - size = nw.size - actual = nw.expected + size = nw.desc.Size + actual = nw.desc.Digest if getBlobBucket(tx, nw.namespace, actual) != nil { return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual) } @@ -601,11 +617,11 @@ func (nw *namespacedWriter) Status() (st content.Status, err error) { if nw.w != nil { st, err = nw.w.Status() } else { - st.Offset = nw.size - st.Total = nw.size + st.Offset = nw.desc.Size + st.Total = nw.desc.Size st.StartedAt = nw.started st.UpdatedAt = nw.started - st.Expected = nw.expected + st.Expected = nw.desc.Digest } if err == nil { st.Ref = nw.ref @@ -613,11 +629,11 @@ func (nw *namespacedWriter) Status() (st content.Status, err error) { return } -func (cs *contentStore) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) { - if err := cs.checkAccess(ctx, dgst); err != nil { +func (cs *contentStore) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) { + if err := cs.checkAccess(ctx, desc.Digest); err != nil { return nil, err } - return cs.Store.ReaderAt(ctx, dgst) + return cs.Store.ReaderAt(ctx, desc) } func (cs *contentStore) checkAccess(ctx context.Context, dgst digest.Digest) error { diff --git a/metadata/content_test.go b/metadata/content_test.go index 272f08040..50c5f8002 100644 --- a/metadata/content_test.go +++ b/metadata/content_test.go @@ -32,6 +32,7 @@ import ( "github.com/containerd/containerd/leases" "github.com/containerd/containerd/namespaces" digest "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" ) @@ -81,7 +82,8 @@ func TestContentLeased(t *testing.T) { if err != nil { t.Fatal(err) } - if err := content.WriteBlob(lctx, cs, "test-1", bytes.NewReader(blob), int64(len(blob)), expected); err != nil { + if err := content.WriteBlob(lctx, cs, "test-1", bytes.NewReader(blob), + ocispec.Descriptor{Size: int64(len(blob)), Digest: expected}); err != nil { t.Fatal(err) } if err := checkContentLeased(lctx, db, expected); err != nil { @@ -93,7 +95,9 @@ func TestContentLeased(t *testing.T) { t.Fatal(err) } - if _, err := cs.Writer(lctx, "test-2", int64(len(blob)), expected); err == nil { + if _, err := cs.Writer(lctx, + content.WithRef("test-2"), + content.WithDescriptor(ocispec.Descriptor{Size: int64(len(blob)), Digest: expected})); err == nil { t.Fatal("expected already exist error") } else if !errdefs.IsAlreadyExists(err) { t.Fatal(err) diff --git a/metadata/db_test.go b/metadata/db_test.go index c1889c089..9cb34eb16 100644 --- a/metadata/db_test.go +++ b/metadata/db_test.go @@ -506,7 +506,9 @@ func create(obj object, tx *bolt.Tx, is images.Store, cs content.Store, sn snaps case testContent: ctx := WithTransactionContext(ctx, tx) expected := digest.FromBytes(v.data) - w, err := cs.Writer(ctx, "test-ref", int64(len(v.data)), expected) + w, err := cs.Writer(ctx, + content.WithRef("test-ref"), + content.WithDescriptor(ocispec.Descriptor{Size: int64(len(v.data)), Digest: expected})) if err != nil { return nil, errors.Wrap(err, "failed to create writer") } diff --git a/oci/spec_opts_unix.go b/oci/spec_opts_unix.go index 04b18e2f4..27be93a20 100644 --- a/oci/spec_opts_unix.go +++ b/oci/spec_opts_unix.go @@ -117,7 +117,7 @@ func WithImageConfig(image Image) SpecOpts { ) switch ic.MediaType { case v1.MediaTypeImageConfig, images.MediaTypeDockerSchema2Config: - p, err := content.ReadBlob(ctx, image.ContentStore(), ic.Digest) + p, err := content.ReadBlob(ctx, image.ContentStore(), ic) if err != nil { return err } diff --git a/oci/spec_opts_windows.go b/oci/spec_opts_windows.go index 7fe76ea5b..3688a582d 100644 --- a/oci/spec_opts_windows.go +++ b/oci/spec_opts_windows.go @@ -44,7 +44,7 @@ func WithImageConfig(image Image) SpecOpts { ) switch ic.MediaType { case v1.MediaTypeImageConfig, images.MediaTypeDockerSchema2Config: - p, err := content.ReadBlob(ctx, image.ContentStore(), ic.Digest) + p, err := content.ReadBlob(ctx, image.ContentStore(), ic) if err != nil { return err } diff --git a/remotes/docker/schema1/converter.go b/remotes/docker/schema1/converter.go index 19a0d9856..c6261c5df 100644 --- a/remotes/docker/schema1/converter.go +++ b/remotes/docker/schema1/converter.go @@ -211,12 +211,12 @@ func (c *Converter) Convert(ctx context.Context, opts ...ConvertOpt) (ocispec.De } ref := remotes.MakeRefKey(ctx, desc) - if err := content.WriteBlob(ctx, c.contentStore, ref, bytes.NewReader(mb), desc.Size, desc.Digest, content.WithLabels(labels)); err != nil { + if err := content.WriteBlob(ctx, c.contentStore, ref, bytes.NewReader(mb), desc, content.WithLabels(labels)); err != nil { return ocispec.Descriptor{}, errors.Wrap(err, "failed to write config") } ref = remotes.MakeRefKey(ctx, config) - if err := content.WriteBlob(ctx, c.contentStore, ref, bytes.NewReader(b), config.Size, config.Digest); err != nil { + if err := content.WriteBlob(ctx, c.contentStore, ref, bytes.NewReader(b), config); err != nil { return ocispec.Descriptor{}, errors.Wrap(err, "failed to write config") } @@ -265,7 +265,7 @@ func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) erro size = 0 } - cw, err := content.OpenWriter(ctx, c.contentStore, ref, size, desc.Digest) + cw, err := content.OpenWriter(ctx, c.contentStore, content.WithRef(ref), content.WithDescriptor(desc)) if err != nil { if !errdefs.IsAlreadyExists(err) { return err @@ -274,7 +274,7 @@ func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) erro // TODO: Check if blob -> diff id mapping already exists // TODO: Check if blob empty label exists - ra, err := c.contentStore.ReaderAt(ctx, desc.Digest) + ra, err := c.contentStore.ReaderAt(ctx, desc) if err != nil { return err } diff --git a/remotes/handlers.go b/remotes/handlers.go index f0334d516..5c2d84ce4 100644 --- a/remotes/handlers.go +++ b/remotes/handlers.go @@ -81,7 +81,7 @@ func FetchHandler(ingester content.Ingester, fetcher Fetcher) images.HandlerFunc func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc ocispec.Descriptor) error { log.G(ctx).Debug("fetch") - cw, err := content.OpenWriter(ctx, ingester, MakeRefKey(ctx, desc), desc.Size, desc.Digest) + cw, err := content.OpenWriter(ctx, ingester, content.WithRef(MakeRefKey(ctx, desc)), content.WithDescriptor(desc)) if err != nil { if errdefs.IsAlreadyExists(err) { return nil @@ -141,7 +141,7 @@ func push(ctx context.Context, provider content.Provider, pusher Pusher, desc oc } defer cw.Close() - ra, err := provider.ReaderAt(ctx, desc.Digest) + ra, err := provider.ReaderAt(ctx, desc) if err != nil { return err } diff --git a/services/content/service.go b/services/content/service.go index b615e3aa0..42926890e 100644 --- a/services/content/service.go +++ b/services/content/service.go @@ -29,6 +29,7 @@ import ( "github.com/containerd/containerd/services" ptypes "github.com/gogo/protobuf/types" digest "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" "google.golang.org/grpc" @@ -179,7 +180,7 @@ func (s *service) Read(req *api.ReadContentRequest, session api.Content_ReadServ return errdefs.ToGRPC(err) } - ra, err := s.store.ReaderAt(session.Context(), req.Digest) + ra, err := s.store.ReaderAt(session.Context(), ocispec.Descriptor{Digest: req.Digest}) if err != nil { return errdefs.ToGRPC(err) } @@ -334,7 +335,9 @@ func (s *service) Write(session api.Content_WriteServer) (err error) { log.G(ctx).Debug("(*service).Write started") // this action locks the writer for the session. - wr, err := s.store.Writer(ctx, ref, total, expected) + wr, err := s.store.Writer(ctx, + content.WithRef(ref), + content.WithDescriptor(ocispec.Descriptor{Size: total, Digest: expected})) if err != nil { return errdefs.ToGRPC(err) } diff --git a/services/tasks/local.go b/services/tasks/local.go index 1de7a4aa2..d974445e7 100644 --- a/services/tasks/local.go +++ b/services/tasks/local.go @@ -45,6 +45,7 @@ import ( "github.com/containerd/containerd/services" "github.com/containerd/typeurl" ptypes "github.com/gogo/protobuf/types" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -121,7 +122,11 @@ func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc. if r.Checkpoint.MediaType != images.MediaTypeContainerd1Checkpoint { return nil, fmt.Errorf("unsupported checkpoint type %q", r.Checkpoint.MediaType) } - reader, err := l.store.ReaderAt(ctx, r.Checkpoint.Digest) + reader, err := l.store.ReaderAt(ctx, ocispec.Descriptor{ + MediaType: r.Checkpoint.MediaType, + Digest: r.Checkpoint.Digest, + Size: r.Checkpoint.Size_, + }) if err != nil { return nil, err } @@ -573,7 +578,7 @@ func getTasksMetrics(ctx context.Context, filter filters.Filter, tasks []runtime } func (l *local) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) { - writer, err := l.store.Writer(ctx, ref, 0, "") + writer, err := l.store.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{MediaType: mediaType})) if err != nil { return nil, err } diff --git a/task.go b/task.go index a8f0e1f73..2ea53ed02 100644 --- a/task.go +++ b/task.go @@ -597,7 +597,7 @@ func (t *task) writeIndex(ctx context.Context, index *v1.Index) (d v1.Descriptor } func writeContent(ctx context.Context, store content.Ingester, mediaType, ref string, r io.Reader, opts ...content.Opt) (d v1.Descriptor, err error) { - writer, err := store.Writer(ctx, ref, 0, "") + writer, err := store.Writer(ctx, content.WithRef(ref)) if err != nil { return d, err } diff --git a/vendor.conf b/vendor.conf index 7c052d163..bbee67b14 100644 --- a/vendor.conf +++ b/vendor.conf @@ -43,8 +43,8 @@ github.com/syndtr/gocapability db04d3cc01c8b54962a58ec7e491717d06cfcc16 github.com/gotestyourself/gotestyourself 44dbf532bbf5767611f6f2a61bded572e337010a github.com/google/go-cmp v0.1.0 -# cri dependencies -github.com/containerd/cri b68fb075d49aa1c2885f45f2467142666c244f4a +# #2135: cri is temporarily forked because of circular dependency. will be fixed immediately in a follow-up PR. +github.com/containerd/cri 6e975823be192ad19f2ce7afcf6c57b88a991c30 https://github.com/AkihiroSuda/cri-containerd.git github.com/containerd/go-cni f2d7272f12d045b16ed924f50e91f9f9cecc55a7 github.com/blang/semver v3.1.0 github.com/containernetworking/cni v0.6.0 diff --git a/vendor/github.com/containerd/cri/pkg/containerd/importer/importer.go b/vendor/github.com/containerd/cri/pkg/containerd/importer/importer.go index 2191e5c12..5b25bf9bd 100644 --- a/vendor/github.com/containerd/cri/pkg/containerd/importer/importer.go +++ b/vendor/github.com/containerd/cri/pkg/containerd/importer/importer.go @@ -254,23 +254,24 @@ func writeDockerSchema2Manifest(ctx context.Context, cs content.Ingester, manife for i, ch := range manifest.Layers { labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i+1)] = ch.Digest.String() } - if err := content.WriteBlob(ctx, cs, "manifest-"+manifestDigest.String(), manifestBytesR, - int64(len(manifestBytes)), manifestDigest, content.WithLabels(labels)); err != nil { - return nil, err - } - desc := &ocispec.Descriptor{ + desc := ocispec.Descriptor{ MediaType: images.MediaTypeDockerSchema2Manifest, Digest: manifestDigest, Size: int64(len(manifestBytes)), } + if err := content.WriteBlob(ctx, cs, "manifest-"+manifestDigest.String(), manifestBytesR, + desc, content.WithLabels(labels)); err != nil { + return nil, err + } + if arch != "" || os != "" { desc.Platform = &ocispec.Platform{ Architecture: arch, OS: os, } } - return desc, nil + return &desc, nil } func onUntarManifestJSON(r io.Reader) ([]manifestDotJSON, error) { @@ -290,7 +291,7 @@ func onUntarLayerTar(ctx context.Context, r io.Reader, cs content.Ingester, name // name is like "foobar/layer.tar" ( guaranteed by isLayerTar() ) split := strings.Split(name, "/") // note: split[0] is not expected digest here - cw, err := cs.Writer(ctx, "layer-"+split[0], size, "") + cw, err := cs.Writer(ctx, content.WithRef("layer-"+split[0]), content.WithDescriptor(ocispec.Descriptor{Size: size})) if err != nil { return nil, err } @@ -311,7 +312,7 @@ func onUntarDotJSON(ctx context.Context, r io.Reader, cs content.Ingester, name config.desc.Size = size // name is like "foobar.json" ( guaranteed by is DotJSON() ) split := strings.Split(name, ".") - cw, err := cs.Writer(ctx, "config-"+split[0], size, "") + cw, err := cs.Writer(ctx, content.WithRef("config-"+split[0]), content.WithDescriptor(ocispec.Descriptor{Size: size})) if err != nil { return nil, err } diff --git a/vendor/github.com/containerd/cri/pkg/server/events.go b/vendor/github.com/containerd/cri/pkg/server/events.go index 6837acf0a..9bf11a791 100644 --- a/vendor/github.com/containerd/cri/pkg/server/events.go +++ b/vendor/github.com/containerd/cri/pkg/server/events.go @@ -116,15 +116,16 @@ func convertEvent(e *gogotypes.Any) (string, interface{}, error) { } // start starts the event monitor which monitors and handles all container events. It returns -// a channel for the caller to wait for the event monitor to stop. start must be called after -// subscribe. -func (em *eventMonitor) start() (<-chan struct{}, error) { +// an error channel for the caller to wait for stop errors from the event monitor. +// start must be called after subscribe. +func (em *eventMonitor) start() <-chan error { + errCh := make(chan error) if em.ch == nil || em.errCh == nil { - return nil, errors.New("event channel is nil") + panic("event channel is nil") } - closeCh := make(chan struct{}) backOffCheckCh := em.backOff.start() go func() { + defer close(errCh) for { select { case e := <-em.ch: @@ -144,8 +145,11 @@ func (em *eventMonitor) start() (<-chan struct{}, error) { em.backOff.enBackOff(cID, evt) } case err := <-em.errCh: - logrus.WithError(err).Error("Failed to handle event stream") - close(closeCh) + // Close errCh in defer directly if there is no error. + if err != nil { + logrus.WithError(err).Errorf("Failed to handle event stream") + errCh <- err + } return case <-backOffCheckCh: cIDs := em.backOff.getExpiredContainers() @@ -162,7 +166,7 @@ func (em *eventMonitor) start() (<-chan struct{}, error) { } } }() - return closeCh, nil + return errCh } // stop stops the event monitor. It will close the event channel. diff --git a/vendor/github.com/containerd/cri/pkg/server/helpers.go b/vendor/github.com/containerd/cri/pkg/server/helpers.go index d5a314446..5bfdb8205 100644 --- a/vendor/github.com/containerd/cri/pkg/server/helpers.go +++ b/vendor/github.com/containerd/cri/pkg/server/helpers.go @@ -345,7 +345,7 @@ func getImageInfo(ctx context.Context, image containerd.Image) (*imageInfo, erro } id := desc.Digest.String() - rb, err := content.ReadBlob(ctx, image.ContentStore(), desc.Digest) + rb, err := content.ReadBlob(ctx, image.ContentStore(), desc) if err != nil { return nil, errors.Wrap(err, "failed to read image config from content store") } diff --git a/vendor/github.com/containerd/cri/pkg/server/service.go b/vendor/github.com/containerd/cri/pkg/server/service.go index 7fdeb61de..86ec5bb73 100644 --- a/vendor/github.com/containerd/cri/pkg/server/service.go +++ b/vendor/github.com/containerd/cri/pkg/server/service.go @@ -19,6 +19,7 @@ package server import ( "fmt" "io" + "net/http" "path/filepath" "time" @@ -179,10 +180,7 @@ func (c *criService) Run() error { // Start event handler. logrus.Info("Start event monitor") - eventMonitorCloseCh, err := c.eventMonitor.start() - if err != nil { - return errors.Wrap(err, "failed to start event monitor") - } + eventMonitorErrCh := c.eventMonitor.start() // Start snapshot stats syncer, it doesn't need to be stopped. logrus.Info("Start snapshots syncer") @@ -195,27 +193,32 @@ func (c *criService) Run() error { // Start streaming server. logrus.Info("Start streaming server") - streamServerCloseCh := make(chan struct{}) + streamServerErrCh := make(chan error) go func() { - if err := c.streamServer.Start(true); err != nil { + defer close(streamServerErrCh) + if err := c.streamServer.Start(true); err != nil && err != http.ErrServerClosed { logrus.WithError(err).Error("Failed to start streaming server") + streamServerErrCh <- err } - close(streamServerCloseCh) }() // Set the server as initialized. GRPC services could start serving traffic. c.initialized.Set() + var eventMonitorErr, streamServerErr error // Stop the whole CRI service if any of the critical service exits. select { - case <-eventMonitorCloseCh: - case <-streamServerCloseCh: + case eventMonitorErr = <-eventMonitorErrCh: + case streamServerErr = <-streamServerErrCh: } if err := c.Close(); err != nil { return errors.Wrap(err, "failed to stop cri service") } - - <-eventMonitorCloseCh + // If the error is set above, err from channel must be nil here, because + // the channel is supposed to be closed. Or else, we wait and set it. + if err := <-eventMonitorErrCh; err != nil { + eventMonitorErr = err + } logrus.Info("Event monitor stopped") // There is a race condition with http.Server.Serve. // When `Close` is called at the same time with `Serve`, `Close` @@ -227,18 +230,27 @@ func (c *criService) Run() error { // is fixed. const streamServerStopTimeout = 2 * time.Second select { - case <-streamServerCloseCh: + case err := <-streamServerErrCh: + if err != nil { + streamServerErr = err + } logrus.Info("Stream server stopped") case <-time.After(streamServerStopTimeout): logrus.Errorf("Stream server is not stopped in %q", streamServerStopTimeout) } + if eventMonitorErr != nil { + return errors.Wrap(eventMonitorErr, "event monitor error") + } + if streamServerErr != nil { + return errors.Wrap(streamServerErr, "stream server error") + } return nil } -// Stop stops the CRI service. +// Close stops the CRI service. +// TODO(random-liu): Make close synchronous. func (c *criService) Close() error { logrus.Info("Stop CRI service") - // TODO(random-liu): Make event monitor stop synchronous. c.eventMonitor.stop() if err := c.streamServer.Stop(); err != nil { return errors.Wrap(err, "failed to stop stream server") diff --git a/vendor/github.com/containerd/cri/vendor.conf b/vendor/github.com/containerd/cri/vendor.conf index a485d715a..c20069d51 100644 --- a/vendor/github.com/containerd/cri/vendor.conf +++ b/vendor/github.com/containerd/cri/vendor.conf @@ -4,7 +4,7 @@ github.com/boltdb/bolt e9cf4fae01b5a8ff89d0ec6b32f0d9c9f79aefdd github.com/BurntSushi/toml a368813c5e648fee92e5f6c30e3944ff9d5e8895 github.com/containerd/cgroups fe281dd265766145e943a034aa41086474ea6130 github.com/containerd/console cb7008ab3d8359b78c5f464cb7cf160107ad5925 -github.com/containerd/containerd 1e8b09cfc6825f7e6349884b5f76e86c1f04a5d4 +github.com/containerd/containerd 1f8c612a6c7ef2fc8328c953fb660adce2bf0a80 https://github.com/AkihiroSuda/containerd.git github.com/containerd/continuity 2d3749b4da569ac97ca63dccba5eee4f5ee2beab github.com/containerd/fifo 3d5202aec260678c48179c56f40e6f38a095738c github.com/containerd/go-cni f2d7272f12d045b16ed924f50e91f9f9cecc55a7