From 9613acb2edf54e21396bd852cda0e96487c7509a Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Wed, 6 Sep 2017 10:19:12 -0700 Subject: [PATCH] Add context to content commit Content commit is updated to take in a context, allowing content to be committed within the same context the writer was in. This is useful when commit may be able to use more context to complete the action rather than creating its own. An example of this being useful is for the metadata implementation of content, having a context allows tests to fully create content in one database transaction by making use of the context. Signed-off-by: Derek McGowan --- cmd/ctr/content.go | 2 +- cmd/ctr/pushobject.go | 2 +- content/content.go | 2 +- content/helpers.go | 6 +++--- content/local/store_test.go | 4 ++-- content/local/writer.go | 3 ++- content/testsuite/testsuite.go | 6 +++--- differ/differ.go | 2 +- metadata/content.go | 10 +++++----- remotes/docker/pusher.go | 2 +- remotes/docker/schema1/converter.go | 2 +- remotes/handlers.go | 4 ++-- services/content/service.go | 2 +- services/content/writer.go | 3 ++- services/tasks/service.go | 2 +- task.go | 2 +- 16 files changed, 28 insertions(+), 26 deletions(-) diff --git a/cmd/ctr/content.go b/cmd/ctr/content.go index 6be04e942..4e9872ac6 100644 --- a/cmd/ctr/content.go +++ b/cmd/ctr/content.go @@ -332,7 +332,7 @@ var ( return err } - if err := wr.Commit(0, wr.Digest()); err != nil { + if err := wr.Commit(ctx, 0, wr.Digest()); err != nil { return err } diff --git a/cmd/ctr/pushobject.go b/cmd/ctr/pushobject.go index 00baf1894..be05e7681 100644 --- a/cmd/ctr/pushobject.go +++ b/cmd/ctr/pushobject.go @@ -70,7 +70,7 @@ var pushObjectCommand = cli.Command{ } // TODO: Progress reader - if err := content.Copy(cw, content.NewReader(ra), desc.Size, desc.Digest); err != nil { + if err := content.Copy(ctx, cw, content.NewReader(ra), desc.Size, desc.Digest); err != nil { return err } diff --git a/content/content.go b/content/content.go index d7a51a826..6dce8b2e7 100644 --- a/content/content.go +++ b/content/content.go @@ -90,7 +90,7 @@ type Writer interface { // Commit commits the blob (but no roll-back is guaranteed on an error). // size and expected can be zero-value when unknown. - Commit(size int64, expected digest.Digest, opts ...Opt) error + Commit(ctx context.Context, size int64, expected digest.Digest, opts ...Opt) error // Status returns the current state of write Status() (Status, error) diff --git a/content/helpers.go b/content/helpers.go index cb3ca7507..b6d21ccbd 100644 --- a/content/helpers.go +++ b/content/helpers.go @@ -58,7 +58,7 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i } defer cw.Close() - return Copy(cw, r, size, expected) + return Copy(ctx, cw, r, size, expected) } // Copy copies data with the expected digest from the reader into the @@ -68,7 +68,7 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i // the size or digest is unknown, these values may be empty. // // Copy is buffered, so no need to wrap reader in buffered io. -func Copy(cw Writer, r io.Reader, size int64, expected digest.Digest) error { +func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected digest.Digest) error { ws, err := cw.Status() if err != nil { return err @@ -95,7 +95,7 @@ func Copy(cw Writer, r io.Reader, size int64, expected digest.Digest) error { return err } - if err := cw.Commit(size, expected); err != nil { + if err := cw.Commit(ctx, size, expected); err != nil { if !errdefs.IsAlreadyExists(err) { return errors.Wrapf(err, "failed commit on ref %q", ws.Ref) } diff --git a/content/local/store_test.go b/content/local/store_test.go index 4d7afe070..02e65f5c4 100644 --- a/content/local/store_test.go +++ b/content/local/store_test.go @@ -94,7 +94,7 @@ func TestContentWriter(t *testing.T) { checkCopy(t, int64(len(p)), cw, bufio.NewReader(ioutil.NopCloser(bytes.NewReader(p)))) - if err := cw.Commit(int64(len(p)), expected); err != nil { + if err := cw.Commit(ctx, int64(len(p)), expected); err != nil { t.Fatal(err) } @@ -109,7 +109,7 @@ func TestContentWriter(t *testing.T) { // now, attempt to write the same data again checkCopy(t, int64(len(p)), cw, bufio.NewReader(ioutil.NopCloser(bytes.NewReader(p)))) - if err := cw.Commit(int64(len(p)), expected); err != nil { + if err := cw.Commit(ctx, int64(len(p)), expected); err != nil { t.Fatal(err) } diff --git a/content/local/writer.go b/content/local/writer.go index b67aa63f8..362c0e209 100644 --- a/content/local/writer.go +++ b/content/local/writer.go @@ -1,6 +1,7 @@ package local import ( + "context" "os" "path/filepath" "runtime" @@ -54,7 +55,7 @@ func (w *writer) Write(p []byte) (n int, err error) { return n, err } -func (w *writer) Commit(size int64, expected digest.Digest, opts ...content.Opt) error { +func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { if w.fp == nil { return errors.Wrap(errdefs.ErrFailedPrecondition, "cannot commit on closed writer") } diff --git a/content/testsuite/testsuite.go b/content/testsuite/testsuite.go index ed62c4249..5c4bc371c 100644 --- a/content/testsuite/testsuite.go +++ b/content/testsuite/testsuite.go @@ -121,7 +121,7 @@ func checkContentStoreWriter(ctx context.Context, t *testing.T, cs content.Store } preCommit := time.Now() - if err := s.writer.Commit(0, ""); err != nil { + if err := s.writer.Commit(ctx, 0, ""); err != nil { t.Fatal(err) } postCommit := time.Now() @@ -201,7 +201,7 @@ func checkUploadStatus(ctx context.Context, t *testing.T, cs content.Store) { } preCommit := time.Now() - if err := w1.Commit(0, ""); err != nil { + if err := w1.Commit(ctx, 0, ""); err != nil { t.Fatalf("Commit failed: %+v", err) } postCommit := time.Now() @@ -235,7 +235,7 @@ func checkLabels(ctx context.Context, t *testing.T, cs content.Store) { } preCommit := time.Now() - if err := w1.Commit(0, "", content.WithLabels(labels)); err != nil { + if err := w1.Commit(ctx, 0, "", content.WithLabels(labels)); err != nil { t.Fatalf("Commit failed: %+v", err) } postCommit := time.Now() diff --git a/differ/differ.go b/differ/differ.go index c2a4672cb..60e5d3eea 100644 --- a/differ/differ.go +++ b/differ/differ.go @@ -179,7 +179,7 @@ func (s *walkingDiff) DiffMounts(ctx context.Context, lower, upper []mount.Mount } dgst := cw.Digest() - if err := cw.Commit(0, dgst, opts...); err != nil { + if err := cw.Commit(ctx, 0, dgst, opts...); err != nil { return emptyDesc, errors.Wrap(err, "failed to commit") } diff --git a/metadata/content.go b/metadata/content.go index 91dd33c22..744cf2538 100644 --- a/metadata/content.go +++ b/metadata/content.go @@ -352,19 +352,19 @@ type namespacedWriter struct { db *bolt.DB } -func (nw *namespacedWriter) Commit(size int64, expected digest.Digest, opts ...content.Opt) error { - return nw.db.Update(func(tx *bolt.Tx) error { +func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { + return update(ctx, nw.db, func(tx *bolt.Tx) error { bkt := getIngestBucket(tx, nw.namespace) if bkt != nil { if err := bkt.Delete([]byte(nw.ref)); err != nil { return err } } - return nw.commit(tx, size, expected, opts...) + return nw.commit(ctx, tx, size, expected, opts...) }) } -func (nw *namespacedWriter) commit(tx *bolt.Tx, size int64, expected digest.Digest, opts ...content.Opt) error { +func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, expected digest.Digest, opts ...content.Opt) error { var base content.Info for _, opt := range opts { if err := opt(&base); err != nil { @@ -382,7 +382,7 @@ func (nw *namespacedWriter) commit(tx *bolt.Tx, size int64, expected digest.Dige actual := nw.Writer.Digest() - if err := nw.Writer.Commit(size, expected); err != nil { + if err := nw.Writer.Commit(ctx, size, expected); err != nil { if !errdefs.IsAlreadyExists(err) { return err } diff --git a/remotes/docker/pusher.go b/remotes/docker/pusher.go index f90eace51..384490565 100644 --- a/remotes/docker/pusher.go +++ b/remotes/docker/pusher.go @@ -215,7 +215,7 @@ func (pw *pushWriter) Digest() digest.Digest { return pw.expected } -func (pw *pushWriter) Commit(size int64, expected digest.Digest, opts ...content.Opt) error { +func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { // Check whether read has already thrown an error if _, err := pw.pipe.Write([]byte{}); err != nil && err != io.ErrClosedPipe { return errors.Wrap(err, "pipe error before commit") diff --git a/remotes/docker/schema1/converter.go b/remotes/docker/schema1/converter.go index 1003ea774..8e451a214 100644 --- a/remotes/docker/schema1/converter.go +++ b/remotes/docker/schema1/converter.go @@ -252,7 +252,7 @@ func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) erro eg.Go(func() error { defer pw.Close() - return content.Copy(cw, io.TeeReader(rc, pw), desc.Size, desc.Digest) + return content.Copy(ctx, cw, io.TeeReader(rc, pw), desc.Size, desc.Digest) }) if err := eg.Wait(); err != nil { diff --git a/remotes/handlers.go b/remotes/handlers.go index d21274b32..3c756ecd3 100644 --- a/remotes/handlers.go +++ b/remotes/handlers.go @@ -101,7 +101,7 @@ func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc } defer rc.Close() - return content.Copy(cw, rc, desc.Size, desc.Digest) + return content.Copy(ctx, cw, rc, desc.Size, desc.Digest) } // PushHandler returns a handler that will push all content from the provider @@ -139,5 +139,5 @@ func push(ctx context.Context, provider content.Provider, pusher Pusher, desc oc defer ra.Close() rd := io.NewSectionReader(ra, 0, desc.Size) - return content.Copy(cw, rd, desc.Size, desc.Digest) + return content.Copy(ctx, cw, rd, desc.Size, desc.Digest) } diff --git a/services/content/service.go b/services/content/service.go index 2ff4b908a..7a5e6d6d0 100644 --- a/services/content/service.go +++ b/services/content/service.go @@ -424,7 +424,7 @@ func (s *Service) Write(session api.Content_WriteServer) (err error) { if req.Labels != nil { opts = append(opts, content.WithLabels(req.Labels)) } - if err := wr.Commit(total, expected, opts...); err != nil { + if err := wr.Commit(ctx, total, expected, opts...); err != nil { return err } } diff --git a/services/content/writer.go b/services/content/writer.go index ed94e4167..cb45957f0 100644 --- a/services/content/writer.go +++ b/services/content/writer.go @@ -1,6 +1,7 @@ package content import ( + "context" "io" contentapi "github.com/containerd/containerd/api/services/content/v1" @@ -80,7 +81,7 @@ func (rw *remoteWriter) Write(p []byte) (n int, err error) { return } -func (rw *remoteWriter) Commit(size int64, expected digest.Digest, opts ...content.Opt) error { +func (rw *remoteWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { var base content.Info for _, opt := range opts { if err := opt(&base); err != nil { diff --git a/services/tasks/service.go b/services/tasks/service.go index fde164d4b..deabdcf64 100644 --- a/services/tasks/service.go +++ b/services/tasks/service.go @@ -465,7 +465,7 @@ func (s *Service) writeContent(ctx context.Context, mediaType, ref string, r io. if err != nil { return nil, err } - if err := writer.Commit(0, ""); err != nil { + if err := writer.Commit(ctx, 0, ""); err != nil { return nil, err } return &types.Descriptor{ diff --git a/task.go b/task.go index 787db09fc..aff7d9d2c 100644 --- a/task.go +++ b/task.go @@ -528,7 +528,7 @@ func writeContent(ctx context.Context, store content.Store, mediaType, ref strin if err != nil { return d, err } - if err := writer.Commit(size, ""); err != nil { + if err := writer.Commit(ctx, size, ""); err != nil { return d, err } return v1.Descriptor{