diff --git a/client_test.go b/client_test.go index 06fc1da4f..4b8d09bc8 100644 --- a/client_test.go +++ b/client_test.go @@ -214,7 +214,7 @@ func TestImagePullAllPlatforms(t *testing.T) { } // check if childless data type has blob in content store for _, desc := range children { - ra, err := cs.ReaderAt(ctx, desc.Digest) + ra, err := cs.ReaderAt(ctx, desc) if err != nil { t.Fatal(err) } @@ -275,7 +275,7 @@ func TestImagePullSomePlatforms(t *testing.T) { // check if childless data type has blob in content store for _, desc := range children { - ra, err := cs.ReaderAt(ctx, desc.Digest) + ra, err := cs.ReaderAt(ctx, desc) if err != nil { t.Fatal(err) } diff --git a/cmd/ctr/commands/content/content.go b/cmd/ctr/commands/content/content.go index a5cc90f3b..2eee81507 100644 --- a/cmd/ctr/commands/content/content.go +++ b/cmd/ctr/commands/content/content.go @@ -72,7 +72,7 @@ var ( } defer cancel() cs := client.ContentStore() - ra, err := cs.ReaderAt(ctx, dgst) + ra, err := cs.ReaderAt(ctx, ocispec.Descriptor{Digest: dgst}) if err != nil { return err } @@ -121,7 +121,7 @@ var ( // TODO(stevvooe): Allow ingest to be reentrant. Currently, we expect // all data to be written in a single invocation. Allow multiple writes // to the same transaction key followed by a commit. - return content.WriteBlob(ctx, cs, ref, os.Stdin, expectedSize, expectedDigest) + return content.WriteBlob(ctx, cs, ref, os.Stdin, ocispec.Descriptor{Size: expectedSize, Digest: expectedDigest}) }, } @@ -314,7 +314,7 @@ var ( } defer cancel() cs := client.ContentStore() - ra, err := cs.ReaderAt(ctx, dgst) + ra, err := cs.ReaderAt(ctx, ocispec.Descriptor{Digest: dgst}) if err != nil { return err } @@ -326,7 +326,7 @@ var ( } defer nrc.Close() - wr, err := cs.Writer(ctx, "edit-"+object, 0, "") // TODO(stevvooe): Choose a better key? + wr, err := cs.Writer(ctx, content.WithRef("edit-"+object)) // TODO(stevvooe): Choose a better key? if err != nil { return err } @@ -482,7 +482,7 @@ var ( Size: info.Size, } - ra, err := cs.ReaderAt(ctx, dgst) + ra, err := cs.ReaderAt(ctx, desc) if err != nil { return err } diff --git a/cmd/ctr/commands/snapshots/snapshots.go b/cmd/ctr/commands/snapshots/snapshots.go index b720104eb..f8b16bce5 100644 --- a/cmd/ctr/commands/snapshots/snapshots.go +++ b/cmd/ctr/commands/snapshots/snapshots.go @@ -167,7 +167,7 @@ var diffCommand = cli.Command{ } } - ra, err := client.ContentStore().ReaderAt(ctx, desc.Digest) + ra, err := client.ContentStore().ReaderAt(ctx, desc) if err != nil { return err } diff --git a/container_opts_unix.go b/container_opts_unix.go index 743688895..a4935b2b4 100644 --- a/container_opts_unix.go +++ b/container_opts_unix.go @@ -36,9 +36,9 @@ import ( "github.com/containerd/containerd/runtime/linux/runctypes" "github.com/gogo/protobuf/proto" protobuf "github.com/gogo/protobuf/types" - digest "github.com/opencontainers/go-digest" "github.com/opencontainers/image-spec/identity" "github.com/opencontainers/image-spec/specs-go/v1" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" ) @@ -50,10 +50,9 @@ func WithCheckpoint(im Image, snapshotKey string) NewContainerOpts { return func(ctx context.Context, client *Client, c *containers.Container) error { var ( desc = im.Target() - id = desc.Digest store = client.ContentStore() ) - index, err := decodeIndex(ctx, store, id) + index, err := decodeIndex(ctx, store, desc) if err != nil { return err } @@ -80,7 +79,7 @@ func WithCheckpoint(im Image, snapshotKey string) NewContainerOpts { } c.Image = index.Annotations["image.name"] case images.MediaTypeContainerd1CheckpointConfig: - data, err := content.ReadBlob(ctx, store, m.Digest) + data, err := content.ReadBlob(ctx, store, m) if err != nil { return errors.Wrap(err, "unable to read checkpoint config") } @@ -113,7 +112,7 @@ func WithTaskCheckpoint(im Image) NewTaskOpts { return func(ctx context.Context, c *Client, info *TaskInfo) error { desc := im.Target() id := desc.Digest - index, err := decodeIndex(ctx, c.ContentStore(), id) + index, err := decodeIndex(ctx, c.ContentStore(), desc) if err != nil { return err } @@ -131,9 +130,9 @@ func WithTaskCheckpoint(im Image) NewTaskOpts { } } -func decodeIndex(ctx context.Context, store content.Provider, id digest.Digest) (*v1.Index, error) { +func decodeIndex(ctx context.Context, store content.Provider, desc ocispec.Descriptor) (*v1.Index, error) { var index v1.Index - p, err := content.ReadBlob(ctx, store, id) + p, err := content.ReadBlob(ctx, store, desc) if err != nil { return nil, err } diff --git a/content/content.go b/content/content.go index de9dd48f5..aabf4c8f3 100644 --- a/content/content.go +++ b/content/content.go @@ -22,6 +22,7 @@ import ( "time" "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) // ReaderAt extends the standard io.ReaderAt interface with reporting of Size and io.Closer @@ -33,12 +34,16 @@ type ReaderAt interface { // Provider provides a reader interface for specific content type Provider interface { - ReaderAt(ctx context.Context, dgst digest.Digest) (ReaderAt, error) + // ReaderAt only requires desc.Digest to be set. + // Other fields in the descriptor may be used internally for resolving + // the location of the actual data. + ReaderAt(ctx context.Context, dec ocispec.Descriptor) (ReaderAt, error) } // Ingester writes content type Ingester interface { - Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (Writer, error) + // Some implementations require WithRef to be included in opts. + Writer(ctx context.Context, opts ...WriterOpt) (Writer, error) } // Info holds content specific information @@ -142,3 +147,33 @@ func WithLabels(labels map[string]string) Opt { return nil } } + +// WriterOpts is internally used by WriterOpt. +type WriterOpts struct { + Ref string + Desc ocispec.Descriptor +} + +// WriterOpt is used for passing options to Ingester.Writer. +type WriterOpt func(*WriterOpts) error + +// WithDescriptor specifies an OCI descriptor. +// Writer may optionally use the descriptor internally for resolving +// the location of the actual data. +// Write does not require any field of desc to be set. +// If the data size is unknown, desc.Size should be set to 0. +// Some implementations may also accept negative values as "unknown". +func WithDescriptor(desc ocispec.Descriptor) WriterOpt { + return func(opts *WriterOpts) error { + opts.Desc = desc + return nil + } +} + +// WithRef specifies a ref string. +func WithRef(ref string) WriterOpt { + return func(opts *WriterOpts) error { + opts.Ref = ref + return nil + } +} diff --git a/content/helpers.go b/content/helpers.go index 3b0de7ac1..819b7ea1e 100644 --- a/content/helpers.go +++ b/content/helpers.go @@ -26,6 +26,7 @@ import ( "github.com/containerd/containerd/errdefs" "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" ) @@ -45,8 +46,8 @@ func NewReader(ra ReaderAt) io.Reader { // ReadBlob retrieves the entire contents of the blob from the provider. // // Avoid using this for large blobs, such as layers. -func ReadBlob(ctx context.Context, provider Provider, dgst digest.Digest) ([]byte, error) { - ra, err := provider.ReaderAt(ctx, dgst) +func ReadBlob(ctx context.Context, provider Provider, desc ocispec.Descriptor) ([]byte, error) { + ra, err := provider.ReaderAt(ctx, desc) if err != nil { return nil, err } @@ -65,8 +66,8 @@ func ReadBlob(ctx context.Context, provider Provider, dgst digest.Digest) ([]byt // This is useful when the digest and size are known beforehand. // // Copy is buffered, so no need to wrap reader in buffered io. -func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size int64, expected digest.Digest, opts ...Opt) error { - cw, err := OpenWriter(ctx, cs, ref, size, expected) +func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, desc ocispec.Descriptor, opts ...Opt) error { + cw, err := OpenWriter(ctx, cs, WithRef(ref), WithDescriptor(desc)) if err != nil { if !errdefs.IsAlreadyExists(err) { return err @@ -76,19 +77,19 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i } defer cw.Close() - return Copy(ctx, cw, r, size, expected, opts...) + return Copy(ctx, cw, r, desc.Size, desc.Digest, opts...) } // OpenWriter opens a new writer for the given reference, retrying if the writer // is locked until the reference is available or returns an error. -func OpenWriter(ctx context.Context, cs Ingester, ref string, size int64, expected digest.Digest) (Writer, error) { +func OpenWriter(ctx context.Context, cs Ingester, opts ...WriterOpt) (Writer, error) { var ( cw Writer err error retry = 16 ) for { - cw, err = cs.Writer(ctx, ref, size, expected) + cw, err = cs.Writer(ctx, opts...) if err != nil { if !errdefs.IsUnavailable(err) { return nil, err diff --git a/content/local/store.go b/content/local/store.go index 69437dfd1..11dfd61a3 100644 --- a/content/local/store.go +++ b/content/local/store.go @@ -34,6 +34,7 @@ import ( "github.com/containerd/containerd/filters" "github.com/containerd/containerd/log" digest "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" ) @@ -119,15 +120,15 @@ func (s *store) info(dgst digest.Digest, fi os.FileInfo, labels map[string]strin } // ReaderAt returns an io.ReaderAt for the blob. -func (s *store) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) { - p := s.blobPath(dgst) +func (s *store) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) { + p := s.blobPath(desc.Digest) fi, err := os.Stat(p) if err != nil { if !os.IsNotExist(err) { return nil, err } - return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", dgst, p) + return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", desc.Digest, p) } fp, err := os.Open(p) @@ -136,7 +137,7 @@ func (s *store) ReaderAt(ctx context.Context, dgst digest.Digest) (content.Reade return nil, err } - return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", dgst, p) + return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", desc.Digest, p) } return sizeReaderAt{size: fi.Size(), fp: fp}, nil @@ -400,11 +401,22 @@ func (s *store) total(ingestPath string) int64 { // ref at a time. // // The argument `ref` is used to uniquely identify a long-lived writer transaction. -func (s *store) Writer(ctx context.Context, ref string, total int64, expected digest.Digest) (content.Writer, error) { +func (s *store) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) { + var wOpts content.WriterOpts + for _, opt := range opts { + if err := opt(&wOpts); err != nil { + return nil, err + } + } + // TODO(AkihiroSuda): we could create a random string or one calculated based on the context + // https://github.com/containerd/containerd/issues/2129#issuecomment-380255019 + if wOpts.Ref == "" { + return nil, errors.Wrap(errdefs.ErrInvalidArgument, "ref must not be empty") + } var lockErr error for count := uint64(0); count < 10; count++ { time.Sleep(time.Millisecond * time.Duration(rand.Intn(1< 0 { - ra, err := nw.provider.ReaderAt(ctx, nw.expected) + if desc.Size > 0 { + ra, err := nw.provider.ReaderAt(ctx, nw.desc) if err != nil { return err } defer ra.Close() - if err := content.CopyReaderAt(w, ra, size); err != nil { + if err := content.CopyReaderAt(w, ra, desc.Size); err != nil { nw.w.Close() nw.w = nil return err @@ -544,14 +560,14 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, var actual digest.Digest if nw.w == nil { - if size != 0 && size != nw.size { - return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, nw.size, size) + if size != 0 && size != nw.desc.Size { + return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, nw.desc.Size, size) } - if expected != "" && expected != nw.expected { + if expected != "" && expected != nw.desc.Digest { return "", errors.Errorf("%q unexpected digest", nw.ref) } - size = nw.size - actual = nw.expected + size = nw.desc.Size + actual = nw.desc.Digest if getBlobBucket(tx, nw.namespace, actual) != nil { return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual) } @@ -601,11 +617,11 @@ func (nw *namespacedWriter) Status() (st content.Status, err error) { if nw.w != nil { st, err = nw.w.Status() } else { - st.Offset = nw.size - st.Total = nw.size + st.Offset = nw.desc.Size + st.Total = nw.desc.Size st.StartedAt = nw.started st.UpdatedAt = nw.started - st.Expected = nw.expected + st.Expected = nw.desc.Digest } if err == nil { st.Ref = nw.ref @@ -613,11 +629,11 @@ func (nw *namespacedWriter) Status() (st content.Status, err error) { return } -func (cs *contentStore) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) { - if err := cs.checkAccess(ctx, dgst); err != nil { +func (cs *contentStore) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) { + if err := cs.checkAccess(ctx, desc.Digest); err != nil { return nil, err } - return cs.Store.ReaderAt(ctx, dgst) + return cs.Store.ReaderAt(ctx, desc) } func (cs *contentStore) checkAccess(ctx context.Context, dgst digest.Digest) error { diff --git a/metadata/content_test.go b/metadata/content_test.go index 272f08040..50c5f8002 100644 --- a/metadata/content_test.go +++ b/metadata/content_test.go @@ -32,6 +32,7 @@ import ( "github.com/containerd/containerd/leases" "github.com/containerd/containerd/namespaces" digest "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" ) @@ -81,7 +82,8 @@ func TestContentLeased(t *testing.T) { if err != nil { t.Fatal(err) } - if err := content.WriteBlob(lctx, cs, "test-1", bytes.NewReader(blob), int64(len(blob)), expected); err != nil { + if err := content.WriteBlob(lctx, cs, "test-1", bytes.NewReader(blob), + ocispec.Descriptor{Size: int64(len(blob)), Digest: expected}); err != nil { t.Fatal(err) } if err := checkContentLeased(lctx, db, expected); err != nil { @@ -93,7 +95,9 @@ func TestContentLeased(t *testing.T) { t.Fatal(err) } - if _, err := cs.Writer(lctx, "test-2", int64(len(blob)), expected); err == nil { + if _, err := cs.Writer(lctx, + content.WithRef("test-2"), + content.WithDescriptor(ocispec.Descriptor{Size: int64(len(blob)), Digest: expected})); err == nil { t.Fatal("expected already exist error") } else if !errdefs.IsAlreadyExists(err) { t.Fatal(err) diff --git a/metadata/db_test.go b/metadata/db_test.go index c1889c089..9cb34eb16 100644 --- a/metadata/db_test.go +++ b/metadata/db_test.go @@ -506,7 +506,9 @@ func create(obj object, tx *bolt.Tx, is images.Store, cs content.Store, sn snaps case testContent: ctx := WithTransactionContext(ctx, tx) expected := digest.FromBytes(v.data) - w, err := cs.Writer(ctx, "test-ref", int64(len(v.data)), expected) + w, err := cs.Writer(ctx, + content.WithRef("test-ref"), + content.WithDescriptor(ocispec.Descriptor{Size: int64(len(v.data)), Digest: expected})) if err != nil { return nil, errors.Wrap(err, "failed to create writer") } diff --git a/oci/spec_opts_unix.go b/oci/spec_opts_unix.go index 04b18e2f4..27be93a20 100644 --- a/oci/spec_opts_unix.go +++ b/oci/spec_opts_unix.go @@ -117,7 +117,7 @@ func WithImageConfig(image Image) SpecOpts { ) switch ic.MediaType { case v1.MediaTypeImageConfig, images.MediaTypeDockerSchema2Config: - p, err := content.ReadBlob(ctx, image.ContentStore(), ic.Digest) + p, err := content.ReadBlob(ctx, image.ContentStore(), ic) if err != nil { return err } diff --git a/oci/spec_opts_windows.go b/oci/spec_opts_windows.go index 7fe76ea5b..3688a582d 100644 --- a/oci/spec_opts_windows.go +++ b/oci/spec_opts_windows.go @@ -44,7 +44,7 @@ func WithImageConfig(image Image) SpecOpts { ) switch ic.MediaType { case v1.MediaTypeImageConfig, images.MediaTypeDockerSchema2Config: - p, err := content.ReadBlob(ctx, image.ContentStore(), ic.Digest) + p, err := content.ReadBlob(ctx, image.ContentStore(), ic) if err != nil { return err } diff --git a/remotes/docker/schema1/converter.go b/remotes/docker/schema1/converter.go index 19a0d9856..c6261c5df 100644 --- a/remotes/docker/schema1/converter.go +++ b/remotes/docker/schema1/converter.go @@ -211,12 +211,12 @@ func (c *Converter) Convert(ctx context.Context, opts ...ConvertOpt) (ocispec.De } ref := remotes.MakeRefKey(ctx, desc) - if err := content.WriteBlob(ctx, c.contentStore, ref, bytes.NewReader(mb), desc.Size, desc.Digest, content.WithLabels(labels)); err != nil { + if err := content.WriteBlob(ctx, c.contentStore, ref, bytes.NewReader(mb), desc, content.WithLabels(labels)); err != nil { return ocispec.Descriptor{}, errors.Wrap(err, "failed to write config") } ref = remotes.MakeRefKey(ctx, config) - if err := content.WriteBlob(ctx, c.contentStore, ref, bytes.NewReader(b), config.Size, config.Digest); err != nil { + if err := content.WriteBlob(ctx, c.contentStore, ref, bytes.NewReader(b), config); err != nil { return ocispec.Descriptor{}, errors.Wrap(err, "failed to write config") } @@ -265,7 +265,7 @@ func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) erro size = 0 } - cw, err := content.OpenWriter(ctx, c.contentStore, ref, size, desc.Digest) + cw, err := content.OpenWriter(ctx, c.contentStore, content.WithRef(ref), content.WithDescriptor(desc)) if err != nil { if !errdefs.IsAlreadyExists(err) { return err @@ -274,7 +274,7 @@ func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) erro // TODO: Check if blob -> diff id mapping already exists // TODO: Check if blob empty label exists - ra, err := c.contentStore.ReaderAt(ctx, desc.Digest) + ra, err := c.contentStore.ReaderAt(ctx, desc) if err != nil { return err } diff --git a/remotes/handlers.go b/remotes/handlers.go index f0334d516..5c2d84ce4 100644 --- a/remotes/handlers.go +++ b/remotes/handlers.go @@ -81,7 +81,7 @@ func FetchHandler(ingester content.Ingester, fetcher Fetcher) images.HandlerFunc func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc ocispec.Descriptor) error { log.G(ctx).Debug("fetch") - cw, err := content.OpenWriter(ctx, ingester, MakeRefKey(ctx, desc), desc.Size, desc.Digest) + cw, err := content.OpenWriter(ctx, ingester, content.WithRef(MakeRefKey(ctx, desc)), content.WithDescriptor(desc)) if err != nil { if errdefs.IsAlreadyExists(err) { return nil @@ -141,7 +141,7 @@ func push(ctx context.Context, provider content.Provider, pusher Pusher, desc oc } defer cw.Close() - ra, err := provider.ReaderAt(ctx, desc.Digest) + ra, err := provider.ReaderAt(ctx, desc) if err != nil { return err } diff --git a/services/content/service.go b/services/content/service.go index b615e3aa0..42926890e 100644 --- a/services/content/service.go +++ b/services/content/service.go @@ -29,6 +29,7 @@ import ( "github.com/containerd/containerd/services" ptypes "github.com/gogo/protobuf/types" digest "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" "google.golang.org/grpc" @@ -179,7 +180,7 @@ func (s *service) Read(req *api.ReadContentRequest, session api.Content_ReadServ return errdefs.ToGRPC(err) } - ra, err := s.store.ReaderAt(session.Context(), req.Digest) + ra, err := s.store.ReaderAt(session.Context(), ocispec.Descriptor{Digest: req.Digest}) if err != nil { return errdefs.ToGRPC(err) } @@ -334,7 +335,9 @@ func (s *service) Write(session api.Content_WriteServer) (err error) { log.G(ctx).Debug("(*service).Write started") // this action locks the writer for the session. - wr, err := s.store.Writer(ctx, ref, total, expected) + wr, err := s.store.Writer(ctx, + content.WithRef(ref), + content.WithDescriptor(ocispec.Descriptor{Size: total, Digest: expected})) if err != nil { return errdefs.ToGRPC(err) } diff --git a/services/tasks/local.go b/services/tasks/local.go index 1de7a4aa2..d974445e7 100644 --- a/services/tasks/local.go +++ b/services/tasks/local.go @@ -45,6 +45,7 @@ import ( "github.com/containerd/containerd/services" "github.com/containerd/typeurl" ptypes "github.com/gogo/protobuf/types" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -121,7 +122,11 @@ func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc. if r.Checkpoint.MediaType != images.MediaTypeContainerd1Checkpoint { return nil, fmt.Errorf("unsupported checkpoint type %q", r.Checkpoint.MediaType) } - reader, err := l.store.ReaderAt(ctx, r.Checkpoint.Digest) + reader, err := l.store.ReaderAt(ctx, ocispec.Descriptor{ + MediaType: r.Checkpoint.MediaType, + Digest: r.Checkpoint.Digest, + Size: r.Checkpoint.Size_, + }) if err != nil { return nil, err } @@ -573,7 +578,7 @@ func getTasksMetrics(ctx context.Context, filter filters.Filter, tasks []runtime } func (l *local) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) { - writer, err := l.store.Writer(ctx, ref, 0, "") + writer, err := l.store.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{MediaType: mediaType})) if err != nil { return nil, err } diff --git a/task.go b/task.go index a8f0e1f73..2ea53ed02 100644 --- a/task.go +++ b/task.go @@ -597,7 +597,7 @@ func (t *task) writeIndex(ctx context.Context, index *v1.Index) (d v1.Descriptor } func writeContent(ctx context.Context, store content.Ingester, mediaType, ref string, r io.Reader, opts ...content.Opt) (d v1.Descriptor, err error) { - writer, err := store.Writer(ctx, ref, 0, "") + writer, err := store.Writer(ctx, content.WithRef(ref)) if err != nil { return d, err } diff --git a/vendor.conf b/vendor.conf index 7c052d163..bbee67b14 100644 --- a/vendor.conf +++ b/vendor.conf @@ -43,8 +43,8 @@ github.com/syndtr/gocapability db04d3cc01c8b54962a58ec7e491717d06cfcc16 github.com/gotestyourself/gotestyourself 44dbf532bbf5767611f6f2a61bded572e337010a github.com/google/go-cmp v0.1.0 -# cri dependencies -github.com/containerd/cri b68fb075d49aa1c2885f45f2467142666c244f4a +# #2135: cri is temporarily forked because of circular dependency. will be fixed immediately in a follow-up PR. +github.com/containerd/cri 6e975823be192ad19f2ce7afcf6c57b88a991c30 https://github.com/AkihiroSuda/cri-containerd.git github.com/containerd/go-cni f2d7272f12d045b16ed924f50e91f9f9cecc55a7 github.com/blang/semver v3.1.0 github.com/containernetworking/cni v0.6.0 diff --git a/vendor/github.com/containerd/cri/pkg/containerd/importer/importer.go b/vendor/github.com/containerd/cri/pkg/containerd/importer/importer.go index 2191e5c12..5b25bf9bd 100644 --- a/vendor/github.com/containerd/cri/pkg/containerd/importer/importer.go +++ b/vendor/github.com/containerd/cri/pkg/containerd/importer/importer.go @@ -254,23 +254,24 @@ func writeDockerSchema2Manifest(ctx context.Context, cs content.Ingester, manife for i, ch := range manifest.Layers { labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i+1)] = ch.Digest.String() } - if err := content.WriteBlob(ctx, cs, "manifest-"+manifestDigest.String(), manifestBytesR, - int64(len(manifestBytes)), manifestDigest, content.WithLabels(labels)); err != nil { - return nil, err - } - desc := &ocispec.Descriptor{ + desc := ocispec.Descriptor{ MediaType: images.MediaTypeDockerSchema2Manifest, Digest: manifestDigest, Size: int64(len(manifestBytes)), } + if err := content.WriteBlob(ctx, cs, "manifest-"+manifestDigest.String(), manifestBytesR, + desc, content.WithLabels(labels)); err != nil { + return nil, err + } + if arch != "" || os != "" { desc.Platform = &ocispec.Platform{ Architecture: arch, OS: os, } } - return desc, nil + return &desc, nil } func onUntarManifestJSON(r io.Reader) ([]manifestDotJSON, error) { @@ -290,7 +291,7 @@ func onUntarLayerTar(ctx context.Context, r io.Reader, cs content.Ingester, name // name is like "foobar/layer.tar" ( guaranteed by isLayerTar() ) split := strings.Split(name, "/") // note: split[0] is not expected digest here - cw, err := cs.Writer(ctx, "layer-"+split[0], size, "") + cw, err := cs.Writer(ctx, content.WithRef("layer-"+split[0]), content.WithDescriptor(ocispec.Descriptor{Size: size})) if err != nil { return nil, err } @@ -311,7 +312,7 @@ func onUntarDotJSON(ctx context.Context, r io.Reader, cs content.Ingester, name config.desc.Size = size // name is like "foobar.json" ( guaranteed by is DotJSON() ) split := strings.Split(name, ".") - cw, err := cs.Writer(ctx, "config-"+split[0], size, "") + cw, err := cs.Writer(ctx, content.WithRef("config-"+split[0]), content.WithDescriptor(ocispec.Descriptor{Size: size})) if err != nil { return nil, err } diff --git a/vendor/github.com/containerd/cri/pkg/server/events.go b/vendor/github.com/containerd/cri/pkg/server/events.go index 6837acf0a..9bf11a791 100644 --- a/vendor/github.com/containerd/cri/pkg/server/events.go +++ b/vendor/github.com/containerd/cri/pkg/server/events.go @@ -116,15 +116,16 @@ func convertEvent(e *gogotypes.Any) (string, interface{}, error) { } // start starts the event monitor which monitors and handles all container events. It returns -// a channel for the caller to wait for the event monitor to stop. start must be called after -// subscribe. -func (em *eventMonitor) start() (<-chan struct{}, error) { +// an error channel for the caller to wait for stop errors from the event monitor. +// start must be called after subscribe. +func (em *eventMonitor) start() <-chan error { + errCh := make(chan error) if em.ch == nil || em.errCh == nil { - return nil, errors.New("event channel is nil") + panic("event channel is nil") } - closeCh := make(chan struct{}) backOffCheckCh := em.backOff.start() go func() { + defer close(errCh) for { select { case e := <-em.ch: @@ -144,8 +145,11 @@ func (em *eventMonitor) start() (<-chan struct{}, error) { em.backOff.enBackOff(cID, evt) } case err := <-em.errCh: - logrus.WithError(err).Error("Failed to handle event stream") - close(closeCh) + // Close errCh in defer directly if there is no error. + if err != nil { + logrus.WithError(err).Errorf("Failed to handle event stream") + errCh <- err + } return case <-backOffCheckCh: cIDs := em.backOff.getExpiredContainers() @@ -162,7 +166,7 @@ func (em *eventMonitor) start() (<-chan struct{}, error) { } } }() - return closeCh, nil + return errCh } // stop stops the event monitor. It will close the event channel. diff --git a/vendor/github.com/containerd/cri/pkg/server/helpers.go b/vendor/github.com/containerd/cri/pkg/server/helpers.go index d5a314446..5bfdb8205 100644 --- a/vendor/github.com/containerd/cri/pkg/server/helpers.go +++ b/vendor/github.com/containerd/cri/pkg/server/helpers.go @@ -345,7 +345,7 @@ func getImageInfo(ctx context.Context, image containerd.Image) (*imageInfo, erro } id := desc.Digest.String() - rb, err := content.ReadBlob(ctx, image.ContentStore(), desc.Digest) + rb, err := content.ReadBlob(ctx, image.ContentStore(), desc) if err != nil { return nil, errors.Wrap(err, "failed to read image config from content store") } diff --git a/vendor/github.com/containerd/cri/pkg/server/service.go b/vendor/github.com/containerd/cri/pkg/server/service.go index 7fdeb61de..86ec5bb73 100644 --- a/vendor/github.com/containerd/cri/pkg/server/service.go +++ b/vendor/github.com/containerd/cri/pkg/server/service.go @@ -19,6 +19,7 @@ package server import ( "fmt" "io" + "net/http" "path/filepath" "time" @@ -179,10 +180,7 @@ func (c *criService) Run() error { // Start event handler. logrus.Info("Start event monitor") - eventMonitorCloseCh, err := c.eventMonitor.start() - if err != nil { - return errors.Wrap(err, "failed to start event monitor") - } + eventMonitorErrCh := c.eventMonitor.start() // Start snapshot stats syncer, it doesn't need to be stopped. logrus.Info("Start snapshots syncer") @@ -195,27 +193,32 @@ func (c *criService) Run() error { // Start streaming server. logrus.Info("Start streaming server") - streamServerCloseCh := make(chan struct{}) + streamServerErrCh := make(chan error) go func() { - if err := c.streamServer.Start(true); err != nil { + defer close(streamServerErrCh) + if err := c.streamServer.Start(true); err != nil && err != http.ErrServerClosed { logrus.WithError(err).Error("Failed to start streaming server") + streamServerErrCh <- err } - close(streamServerCloseCh) }() // Set the server as initialized. GRPC services could start serving traffic. c.initialized.Set() + var eventMonitorErr, streamServerErr error // Stop the whole CRI service if any of the critical service exits. select { - case <-eventMonitorCloseCh: - case <-streamServerCloseCh: + case eventMonitorErr = <-eventMonitorErrCh: + case streamServerErr = <-streamServerErrCh: } if err := c.Close(); err != nil { return errors.Wrap(err, "failed to stop cri service") } - - <-eventMonitorCloseCh + // If the error is set above, err from channel must be nil here, because + // the channel is supposed to be closed. Or else, we wait and set it. + if err := <-eventMonitorErrCh; err != nil { + eventMonitorErr = err + } logrus.Info("Event monitor stopped") // There is a race condition with http.Server.Serve. // When `Close` is called at the same time with `Serve`, `Close` @@ -227,18 +230,27 @@ func (c *criService) Run() error { // is fixed. const streamServerStopTimeout = 2 * time.Second select { - case <-streamServerCloseCh: + case err := <-streamServerErrCh: + if err != nil { + streamServerErr = err + } logrus.Info("Stream server stopped") case <-time.After(streamServerStopTimeout): logrus.Errorf("Stream server is not stopped in %q", streamServerStopTimeout) } + if eventMonitorErr != nil { + return errors.Wrap(eventMonitorErr, "event monitor error") + } + if streamServerErr != nil { + return errors.Wrap(streamServerErr, "stream server error") + } return nil } -// Stop stops the CRI service. +// Close stops the CRI service. +// TODO(random-liu): Make close synchronous. func (c *criService) Close() error { logrus.Info("Stop CRI service") - // TODO(random-liu): Make event monitor stop synchronous. c.eventMonitor.stop() if err := c.streamServer.Stop(); err != nil { return errors.Wrap(err, "failed to stop stream server") diff --git a/vendor/github.com/containerd/cri/vendor.conf b/vendor/github.com/containerd/cri/vendor.conf index a485d715a..c20069d51 100644 --- a/vendor/github.com/containerd/cri/vendor.conf +++ b/vendor/github.com/containerd/cri/vendor.conf @@ -4,7 +4,7 @@ github.com/boltdb/bolt e9cf4fae01b5a8ff89d0ec6b32f0d9c9f79aefdd github.com/BurntSushi/toml a368813c5e648fee92e5f6c30e3944ff9d5e8895 github.com/containerd/cgroups fe281dd265766145e943a034aa41086474ea6130 github.com/containerd/console cb7008ab3d8359b78c5f464cb7cf160107ad5925 -github.com/containerd/containerd 1e8b09cfc6825f7e6349884b5f76e86c1f04a5d4 +github.com/containerd/containerd 1f8c612a6c7ef2fc8328c953fb660adce2bf0a80 https://github.com/AkihiroSuda/containerd.git github.com/containerd/continuity 2d3749b4da569ac97ca63dccba5eee4f5ee2beab github.com/containerd/fifo 3d5202aec260678c48179c56f40e6f38a095738c github.com/containerd/go-cni f2d7272f12d045b16ed924f50e91f9f9cecc55a7