diff --git a/content/local/store_test.go b/content/local/store_test.go index 54e90c7cf..86e284a77 100644 --- a/content/local/store_test.go +++ b/content/local/store_test.go @@ -35,6 +35,7 @@ import ( "github.com/containerd/containerd/content" "github.com/containerd/containerd/content/testsuite" + "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/pkg/testutil" "github.com/opencontainers/go-digest" @@ -174,7 +175,9 @@ func TestContentWriter(t *testing.T) { // now, attempt to write the same data again checkCopy(t, int64(len(p)), cw, bufio.NewReader(ioutil.NopCloser(bytes.NewReader(p)))) - if err := cw.Commit(ctx, int64(len(p)), expected); err != nil { + if err := cw.Commit(ctx, int64(len(p)), expected); err == nil { + t.Fatal("expected already exists error") + } else if !errdefs.IsAlreadyExists(err) { t.Fatal(err) } diff --git a/content/local/writer.go b/content/local/writer.go index a6579a9d2..10df4a4c5 100644 --- a/content/local/writer.go +++ b/content/local/writer.go @@ -132,11 +132,11 @@ func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest, // clean up!! defer os.RemoveAll(w.path) + if _, err := os.Stat(target); err == nil { + // collision with the target file! + return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", dgst) + } if err := os.Rename(ingest, target); err != nil { - if os.IsExist(err) { - // collision with the target file! - return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", dgst) - } return err } commitTime := time.Now() diff --git a/content/testsuite/testsuite.go b/content/testsuite/testsuite.go index 366926c16..908facbdf 100644 --- a/content/testsuite/testsuite.go +++ b/content/testsuite/testsuite.go @@ -30,6 +30,7 @@ import ( "time" "github.com/containerd/containerd/content" + "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/pkg/testutil" digest "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -41,6 +42,7 @@ import ( func ContentSuite(t *testing.T, name string, storeFn func(ctx context.Context, root string) (context.Context, content.Store, func() error, error)) { t.Run("Writer", makeTest(t, name, storeFn, checkContentStoreWriter)) t.Run("UpdateStatus", makeTest(t, name, storeFn, checkUpdateStatus)) + t.Run("CommitExists", makeTest(t, name, storeFn, checkCommitExists)) t.Run("Resume", makeTest(t, name, storeFn, checkResumeWriter)) t.Run("ResumeTruncate", makeTest(t, name, storeFn, checkResume(resumeTruncate))) t.Run("ResumeDiscard", makeTest(t, name, storeFn, checkResume(resumeDiscard))) @@ -281,6 +283,39 @@ func checkResumeWriter(ctx context.Context, t *testing.T, cs content.Store) { } } +func checkCommitExists(ctx context.Context, t *testing.T, cs content.Store) { + c1, d1 := createContent(256) + if err := content.WriteBlob(ctx, cs, "c1", bytes.NewReader(c1), ocispec.Descriptor{Digest: d1}); err != nil { + t.Fatal(err) + } + + for i, tc := range []struct { + expected digest.Digest + }{ + { + expected: d1, + }, + {}, + } { + w, err := cs.Writer(ctx, content.WithRef(fmt.Sprintf("c1-commitexists-%d", i))) + if err != nil { + t.Fatal(err) + } + if _, err := w.Write(c1); err != nil { + w.Close() + t.Fatal(err) + } + err = w.Commit(ctx, int64(len(c1)), tc.expected) + w.Close() + if err == nil { + t.Errorf("(%d) Expected already exists error", i) + } else if !errdefs.IsAlreadyExists(err) { + t.Fatalf("(%d) Unexpected error: %+v", i, err) + } + + } +} + func checkUpdateStatus(ctx context.Context, t *testing.T, cs content.Store) { c1, d1 := createContent(256) @@ -353,7 +388,7 @@ func checkUpdateStatus(ctx context.Context, t *testing.T, cs content.Store) { func checkLabels(ctx context.Context, t *testing.T, cs content.Store) { c1, d1 := createContent(256) - w1, err := cs.Writer(ctx, content.WithRef("c1"), content.WithDescriptor(ocispec.Descriptor{Size: 256, Digest: d1})) + w1, err := cs.Writer(ctx, content.WithRef("c1-checklabels"), content.WithDescriptor(ocispec.Descriptor{Size: 256, Digest: d1})) if err != nil { t.Fatal(err) } diff --git a/diff/walking/differ.go b/diff/walking/differ.go index 462d70856..f06aa016f 100644 --- a/diff/walking/differ.go +++ b/diff/walking/differ.go @@ -139,7 +139,9 @@ func (s *walkingDiff) Compare(ctx context.Context, lower, upper []mount.Mount, o dgst := cw.Digest() if err := cw.Commit(ctx, 0, dgst, commitopts...); err != nil { - return errors.Wrap(err, "failed to commit") + if !errdefs.IsAlreadyExists(err) { + return errors.Wrap(err, "failed to commit") + } } info, err := s.store.Info(ctx, dgst) diff --git a/metadata/buckets.go b/metadata/buckets.go index a7af0176c..51d40a7f3 100644 --- a/metadata/buckets.go +++ b/metadata/buckets.go @@ -164,11 +164,11 @@ func getSnapshotterBucket(tx *bolt.Tx, namespace, snapshotter string) *bolt.Buck } func createBlobBucket(tx *bolt.Tx, namespace string, dgst digest.Digest) (*bolt.Bucket, error) { - bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectBlob, []byte(dgst.String())) + bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectBlob) if err != nil { return nil, err } - return bkt, nil + return bkt.CreateBucket([]byte(dgst.String())) } func getBlobsBucket(tx *bolt.Tx, namespace string) *bolt.Bucket { diff --git a/metadata/content.go b/metadata/content.go index 088f4ba27..ac4440060 100644 --- a/metadata/content.go +++ b/metadata/content.go @@ -592,9 +592,6 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, } size = nw.desc.Size actual = nw.desc.Digest - if getBlobBucket(tx, nw.namespace, actual) != nil { - return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual) - } } else { status, err := nw.w.Status() if err != nil { @@ -606,18 +603,16 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, size = status.Offset actual = nw.w.Digest() - if err := nw.w.Commit(ctx, size, expected); err != nil { - if !errdefs.IsAlreadyExists(err) { - return "", err - } - if getBlobBucket(tx, nw.namespace, actual) != nil { - return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual) - } + if err := nw.w.Commit(ctx, size, expected); err != nil && !errdefs.IsAlreadyExists(err) { + return "", err } } bkt, err := createBlobBucket(tx, nw.namespace, actual) if err != nil { + if err == bolt.ErrBucketExists { + return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual) + } return "", err } diff --git a/services/content/service.go b/services/content/service.go index a27e8ee98..68d3920ff 100644 --- a/services/content/service.go +++ b/services/content/service.go @@ -213,7 +213,7 @@ func (s *service) Read(req *api.ReadContentRequest, session api.Content_ReadServ _, err = io.CopyBuffer( &readResponseWriter{session: session}, io.NewSectionReader(ra, offset, size), *p) - return err + return errdefs.ToGRPC(err) } // readResponseWriter is a writer that places the output into ReadContentRequest messages. @@ -420,7 +420,7 @@ func (s *service) Write(session api.Content_WriteServer) (err error) { // maintain the offset as append only, we just issue the write. n, err := wr.Write(req.Data) if err != nil { - return err + return errdefs.ToGRPC(err) } if n != len(req.Data) { @@ -438,7 +438,7 @@ func (s *service) Write(session api.Content_WriteServer) (err error) { opts = append(opts, content.WithLabels(req.Labels)) } if err := wr.Commit(ctx, total, expected, opts...); err != nil { - return err + return errdefs.ToGRPC(err) } } diff --git a/task.go b/task.go index 750075f8d..6806e1162 100644 --- a/task.go +++ b/task.go @@ -607,8 +607,11 @@ func writeContent(ctx context.Context, store content.Ingester, mediaType, ref st if err != nil { return d, err } + if err := writer.Commit(ctx, size, "", opts...); err != nil { - return d, err + if !errdefs.IsAlreadyExists(err) { + return d, err + } } return v1.Descriptor{ MediaType: mediaType,