diff --git a/cmd/ctr/content.go b/cmd/ctr/content.go index 6be04e942..4e9872ac6 100644 --- a/cmd/ctr/content.go +++ b/cmd/ctr/content.go @@ -332,7 +332,7 @@ var ( return err } - if err := wr.Commit(0, wr.Digest()); err != nil { + if err := wr.Commit(ctx, 0, wr.Digest()); err != nil { return err } diff --git a/cmd/ctr/pushobject.go b/cmd/ctr/pushobject.go index 00baf1894..be05e7681 100644 --- a/cmd/ctr/pushobject.go +++ b/cmd/ctr/pushobject.go @@ -70,7 +70,7 @@ var pushObjectCommand = cli.Command{ } // TODO: Progress reader - if err := content.Copy(cw, content.NewReader(ra), desc.Size, desc.Digest); err != nil { + if err := content.Copy(ctx, cw, content.NewReader(ra), desc.Size, desc.Digest); err != nil { return err } diff --git a/content/content.go b/content/content.go index d7a51a826..6dce8b2e7 100644 --- a/content/content.go +++ b/content/content.go @@ -90,7 +90,7 @@ type Writer interface { // Commit commits the blob (but no roll-back is guaranteed on an error). // size and expected can be zero-value when unknown. - Commit(size int64, expected digest.Digest, opts ...Opt) error + Commit(ctx context.Context, size int64, expected digest.Digest, opts ...Opt) error // Status returns the current state of write Status() (Status, error) diff --git a/content/helpers.go b/content/helpers.go index cb3ca7507..b6d21ccbd 100644 --- a/content/helpers.go +++ b/content/helpers.go @@ -58,7 +58,7 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i } defer cw.Close() - return Copy(cw, r, size, expected) + return Copy(ctx, cw, r, size, expected) } // Copy copies data with the expected digest from the reader into the @@ -68,7 +68,7 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i // the size or digest is unknown, these values may be empty. // // Copy is buffered, so no need to wrap reader in buffered io. -func Copy(cw Writer, r io.Reader, size int64, expected digest.Digest) error { +func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected digest.Digest) error { ws, err := cw.Status() if err != nil { return err @@ -95,7 +95,7 @@ func Copy(cw Writer, r io.Reader, size int64, expected digest.Digest) error { return err } - if err := cw.Commit(size, expected); err != nil { + if err := cw.Commit(ctx, size, expected); err != nil { if !errdefs.IsAlreadyExists(err) { return errors.Wrapf(err, "failed commit on ref %q", ws.Ref) } diff --git a/content/local/store_test.go b/content/local/store_test.go index 4d7afe070..02e65f5c4 100644 --- a/content/local/store_test.go +++ b/content/local/store_test.go @@ -94,7 +94,7 @@ func TestContentWriter(t *testing.T) { checkCopy(t, int64(len(p)), cw, bufio.NewReader(ioutil.NopCloser(bytes.NewReader(p)))) - if err := cw.Commit(int64(len(p)), expected); err != nil { + if err := cw.Commit(ctx, int64(len(p)), expected); err != nil { t.Fatal(err) } @@ -109,7 +109,7 @@ func TestContentWriter(t *testing.T) { // now, attempt to write the same data again checkCopy(t, int64(len(p)), cw, bufio.NewReader(ioutil.NopCloser(bytes.NewReader(p)))) - if err := cw.Commit(int64(len(p)), expected); err != nil { + if err := cw.Commit(ctx, int64(len(p)), expected); err != nil { t.Fatal(err) } diff --git a/content/local/writer.go b/content/local/writer.go index b67aa63f8..362c0e209 100644 --- a/content/local/writer.go +++ b/content/local/writer.go @@ -1,6 +1,7 @@ package local import ( + "context" "os" "path/filepath" "runtime" @@ -54,7 +55,7 @@ func (w *writer) Write(p []byte) (n int, err error) { return n, err } -func (w *writer) Commit(size int64, expected digest.Digest, opts ...content.Opt) error { +func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { if w.fp == nil { return errors.Wrap(errdefs.ErrFailedPrecondition, "cannot commit on closed writer") } diff --git a/content/testsuite/testsuite.go b/content/testsuite/testsuite.go index ed62c4249..5c4bc371c 100644 --- a/content/testsuite/testsuite.go +++ b/content/testsuite/testsuite.go @@ -121,7 +121,7 @@ func checkContentStoreWriter(ctx context.Context, t *testing.T, cs content.Store } preCommit := time.Now() - if err := s.writer.Commit(0, ""); err != nil { + if err := s.writer.Commit(ctx, 0, ""); err != nil { t.Fatal(err) } postCommit := time.Now() @@ -201,7 +201,7 @@ func checkUploadStatus(ctx context.Context, t *testing.T, cs content.Store) { } preCommit := time.Now() - if err := w1.Commit(0, ""); err != nil { + if err := w1.Commit(ctx, 0, ""); err != nil { t.Fatalf("Commit failed: %+v", err) } postCommit := time.Now() @@ -235,7 +235,7 @@ func checkLabels(ctx context.Context, t *testing.T, cs content.Store) { } preCommit := time.Now() - if err := w1.Commit(0, "", content.WithLabels(labels)); err != nil { + if err := w1.Commit(ctx, 0, "", content.WithLabels(labels)); err != nil { t.Fatalf("Commit failed: %+v", err) } postCommit := time.Now() diff --git a/differ/differ.go b/differ/differ.go index c2a4672cb..60e5d3eea 100644 --- a/differ/differ.go +++ b/differ/differ.go @@ -179,7 +179,7 @@ func (s *walkingDiff) DiffMounts(ctx context.Context, lower, upper []mount.Mount } dgst := cw.Digest() - if err := cw.Commit(0, dgst, opts...); err != nil { + if err := cw.Commit(ctx, 0, dgst, opts...); err != nil { return emptyDesc, errors.Wrap(err, "failed to commit") } diff --git a/metadata/content.go b/metadata/content.go index 91dd33c22..744cf2538 100644 --- a/metadata/content.go +++ b/metadata/content.go @@ -352,19 +352,19 @@ type namespacedWriter struct { db *bolt.DB } -func (nw *namespacedWriter) Commit(size int64, expected digest.Digest, opts ...content.Opt) error { - return nw.db.Update(func(tx *bolt.Tx) error { +func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { + return update(ctx, nw.db, func(tx *bolt.Tx) error { bkt := getIngestBucket(tx, nw.namespace) if bkt != nil { if err := bkt.Delete([]byte(nw.ref)); err != nil { return err } } - return nw.commit(tx, size, expected, opts...) + return nw.commit(ctx, tx, size, expected, opts...) }) } -func (nw *namespacedWriter) commit(tx *bolt.Tx, size int64, expected digest.Digest, opts ...content.Opt) error { +func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, expected digest.Digest, opts ...content.Opt) error { var base content.Info for _, opt := range opts { if err := opt(&base); err != nil { @@ -382,7 +382,7 @@ func (nw *namespacedWriter) commit(tx *bolt.Tx, size int64, expected digest.Dige actual := nw.Writer.Digest() - if err := nw.Writer.Commit(size, expected); err != nil { + if err := nw.Writer.Commit(ctx, size, expected); err != nil { if !errdefs.IsAlreadyExists(err) { return err } diff --git a/remotes/docker/pusher.go b/remotes/docker/pusher.go index bd080b0cc..abaaac386 100644 --- a/remotes/docker/pusher.go +++ b/remotes/docker/pusher.go @@ -221,7 +221,7 @@ func (pw *pushWriter) Digest() digest.Digest { return pw.expected } -func (pw *pushWriter) Commit(size int64, expected digest.Digest, opts ...content.Opt) error { +func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { // Check whether read has already thrown an error if _, err := pw.pipe.Write([]byte{}); err != nil && err != io.ErrClosedPipe { return errors.Wrap(err, "pipe error before commit") diff --git a/remotes/docker/schema1/converter.go b/remotes/docker/schema1/converter.go index 1003ea774..8e451a214 100644 --- a/remotes/docker/schema1/converter.go +++ b/remotes/docker/schema1/converter.go @@ -252,7 +252,7 @@ func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) erro eg.Go(func() error { defer pw.Close() - return content.Copy(cw, io.TeeReader(rc, pw), desc.Size, desc.Digest) + return content.Copy(ctx, cw, io.TeeReader(rc, pw), desc.Size, desc.Digest) }) if err := eg.Wait(); err != nil { diff --git a/remotes/handlers.go b/remotes/handlers.go index d21274b32..3c756ecd3 100644 --- a/remotes/handlers.go +++ b/remotes/handlers.go @@ -101,7 +101,7 @@ func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc } defer rc.Close() - return content.Copy(cw, rc, desc.Size, desc.Digest) + return content.Copy(ctx, cw, rc, desc.Size, desc.Digest) } // PushHandler returns a handler that will push all content from the provider @@ -139,5 +139,5 @@ func push(ctx context.Context, provider content.Provider, pusher Pusher, desc oc defer ra.Close() rd := io.NewSectionReader(ra, 0, desc.Size) - return content.Copy(cw, rd, desc.Size, desc.Digest) + return content.Copy(ctx, cw, rd, desc.Size, desc.Digest) } diff --git a/services/content/service.go b/services/content/service.go index 2ff4b908a..7a5e6d6d0 100644 --- a/services/content/service.go +++ b/services/content/service.go @@ -424,7 +424,7 @@ func (s *Service) Write(session api.Content_WriteServer) (err error) { if req.Labels != nil { opts = append(opts, content.WithLabels(req.Labels)) } - if err := wr.Commit(total, expected, opts...); err != nil { + if err := wr.Commit(ctx, total, expected, opts...); err != nil { return err } } diff --git a/services/content/writer.go b/services/content/writer.go index ed94e4167..cb45957f0 100644 --- a/services/content/writer.go +++ b/services/content/writer.go @@ -1,6 +1,7 @@ package content import ( + "context" "io" contentapi "github.com/containerd/containerd/api/services/content/v1" @@ -80,7 +81,7 @@ func (rw *remoteWriter) Write(p []byte) (n int, err error) { return } -func (rw *remoteWriter) Commit(size int64, expected digest.Digest, opts ...content.Opt) error { +func (rw *remoteWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { var base content.Info for _, opt := range opts { if err := opt(&base); err != nil { diff --git a/services/tasks/service.go b/services/tasks/service.go index 0d51cd5b6..e6e5dc45f 100644 --- a/services/tasks/service.go +++ b/services/tasks/service.go @@ -464,7 +464,7 @@ func (s *Service) writeContent(ctx context.Context, mediaType, ref string, r io. if err != nil { return nil, err } - if err := writer.Commit(0, ""); err != nil { + if err := writer.Commit(ctx, 0, ""); err != nil { return nil, err } return &types.Descriptor{ diff --git a/task.go b/task.go index 93a57e545..2f9b0b06d 100644 --- a/task.go +++ b/task.go @@ -535,7 +535,7 @@ func writeContent(ctx context.Context, store content.Store, mediaType, ref strin if err != nil { return d, err } - if err := writer.Commit(size, ""); err != nil { + if err := writer.Commit(ctx, size, ""); err != nil { return d, err } return v1.Descriptor{