Merge pull request #2642 from dmcgowan/fix-commit-already-exists
Fix content store bug when already exists
This commit is contained in:
		| @@ -35,6 +35,7 @@ import ( | ||||
|  | ||||
| 	"github.com/containerd/containerd/content" | ||||
| 	"github.com/containerd/containerd/content/testsuite" | ||||
| 	"github.com/containerd/containerd/errdefs" | ||||
| 	"github.com/containerd/containerd/pkg/testutil" | ||||
|  | ||||
| 	"github.com/opencontainers/go-digest" | ||||
| @@ -174,7 +175,9 @@ func TestContentWriter(t *testing.T) { | ||||
|  | ||||
| 	// now, attempt to write the same data again | ||||
| 	checkCopy(t, int64(len(p)), cw, bufio.NewReader(ioutil.NopCloser(bytes.NewReader(p)))) | ||||
| 	if err := cw.Commit(ctx, int64(len(p)), expected); err != nil { | ||||
| 	if err := cw.Commit(ctx, int64(len(p)), expected); err == nil { | ||||
| 		t.Fatal("expected already exists error") | ||||
| 	} else if !errdefs.IsAlreadyExists(err) { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
|  | ||||
|   | ||||
| @@ -132,11 +132,11 @@ func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest, | ||||
| 	// clean up!! | ||||
| 	defer os.RemoveAll(w.path) | ||||
|  | ||||
| 	if err := os.Rename(ingest, target); err != nil { | ||||
| 		if os.IsExist(err) { | ||||
| 	if _, err := os.Stat(target); err == nil { | ||||
| 		// collision with the target file! | ||||
| 		return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", dgst) | ||||
| 	} | ||||
| 	if err := os.Rename(ingest, target); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	commitTime := time.Now() | ||||
|   | ||||
| @@ -30,6 +30,7 @@ import ( | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/containerd/containerd/content" | ||||
| 	"github.com/containerd/containerd/errdefs" | ||||
| 	"github.com/containerd/containerd/pkg/testutil" | ||||
| 	digest "github.com/opencontainers/go-digest" | ||||
| 	ocispec "github.com/opencontainers/image-spec/specs-go/v1" | ||||
| @@ -41,6 +42,7 @@ import ( | ||||
| func ContentSuite(t *testing.T, name string, storeFn func(ctx context.Context, root string) (context.Context, content.Store, func() error, error)) { | ||||
| 	t.Run("Writer", makeTest(t, name, storeFn, checkContentStoreWriter)) | ||||
| 	t.Run("UpdateStatus", makeTest(t, name, storeFn, checkUpdateStatus)) | ||||
| 	t.Run("CommitExists", makeTest(t, name, storeFn, checkCommitExists)) | ||||
| 	t.Run("Resume", makeTest(t, name, storeFn, checkResumeWriter)) | ||||
| 	t.Run("ResumeTruncate", makeTest(t, name, storeFn, checkResume(resumeTruncate))) | ||||
| 	t.Run("ResumeDiscard", makeTest(t, name, storeFn, checkResume(resumeDiscard))) | ||||
| @@ -281,6 +283,39 @@ func checkResumeWriter(ctx context.Context, t *testing.T, cs content.Store) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func checkCommitExists(ctx context.Context, t *testing.T, cs content.Store) { | ||||
| 	c1, d1 := createContent(256) | ||||
| 	if err := content.WriteBlob(ctx, cs, "c1", bytes.NewReader(c1), ocispec.Descriptor{Digest: d1}); err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
|  | ||||
| 	for i, tc := range []struct { | ||||
| 		expected digest.Digest | ||||
| 	}{ | ||||
| 		{ | ||||
| 			expected: d1, | ||||
| 		}, | ||||
| 		{}, | ||||
| 	} { | ||||
| 		w, err := cs.Writer(ctx, content.WithRef(fmt.Sprintf("c1-commitexists-%d", i))) | ||||
| 		if err != nil { | ||||
| 			t.Fatal(err) | ||||
| 		} | ||||
| 		if _, err := w.Write(c1); err != nil { | ||||
| 			w.Close() | ||||
| 			t.Fatal(err) | ||||
| 		} | ||||
| 		err = w.Commit(ctx, int64(len(c1)), tc.expected) | ||||
| 		w.Close() | ||||
| 		if err == nil { | ||||
| 			t.Errorf("(%d) Expected already exists error", i) | ||||
| 		} else if !errdefs.IsAlreadyExists(err) { | ||||
| 			t.Fatalf("(%d) Unexpected error: %+v", i, err) | ||||
| 		} | ||||
|  | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func checkUpdateStatus(ctx context.Context, t *testing.T, cs content.Store) { | ||||
| 	c1, d1 := createContent(256) | ||||
|  | ||||
| @@ -353,7 +388,7 @@ func checkUpdateStatus(ctx context.Context, t *testing.T, cs content.Store) { | ||||
| func checkLabels(ctx context.Context, t *testing.T, cs content.Store) { | ||||
| 	c1, d1 := createContent(256) | ||||
|  | ||||
| 	w1, err := cs.Writer(ctx, content.WithRef("c1"), content.WithDescriptor(ocispec.Descriptor{Size: 256, Digest: d1})) | ||||
| 	w1, err := cs.Writer(ctx, content.WithRef("c1-checklabels"), content.WithDescriptor(ocispec.Descriptor{Size: 256, Digest: d1})) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
|   | ||||
| @@ -139,8 +139,10 @@ func (s *walkingDiff) Compare(ctx context.Context, lower, upper []mount.Mount, o | ||||
|  | ||||
| 			dgst := cw.Digest() | ||||
| 			if err := cw.Commit(ctx, 0, dgst, commitopts...); err != nil { | ||||
| 				if !errdefs.IsAlreadyExists(err) { | ||||
| 					return errors.Wrap(err, "failed to commit") | ||||
| 				} | ||||
| 			} | ||||
|  | ||||
| 			info, err := s.store.Info(ctx, dgst) | ||||
| 			if err != nil { | ||||
|   | ||||
| @@ -164,11 +164,11 @@ func getSnapshotterBucket(tx *bolt.Tx, namespace, snapshotter string) *bolt.Buck | ||||
| } | ||||
|  | ||||
| func createBlobBucket(tx *bolt.Tx, namespace string, dgst digest.Digest) (*bolt.Bucket, error) { | ||||
| 	bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectBlob, []byte(dgst.String())) | ||||
| 	bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectBlob) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return bkt, nil | ||||
| 	return bkt.CreateBucket([]byte(dgst.String())) | ||||
| } | ||||
|  | ||||
| func getBlobsBucket(tx *bolt.Tx, namespace string) *bolt.Bucket { | ||||
|   | ||||
| @@ -592,9 +592,6 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, | ||||
| 		} | ||||
| 		size = nw.desc.Size | ||||
| 		actual = nw.desc.Digest | ||||
| 		if getBlobBucket(tx, nw.namespace, actual) != nil { | ||||
| 			return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual) | ||||
| 		} | ||||
| 	} else { | ||||
| 		status, err := nw.w.Status() | ||||
| 		if err != nil { | ||||
| @@ -606,18 +603,16 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, | ||||
| 		size = status.Offset | ||||
| 		actual = nw.w.Digest() | ||||
|  | ||||
| 		if err := nw.w.Commit(ctx, size, expected); err != nil { | ||||
| 			if !errdefs.IsAlreadyExists(err) { | ||||
| 		if err := nw.w.Commit(ctx, size, expected); err != nil && !errdefs.IsAlreadyExists(err) { | ||||
| 			return "", err | ||||
| 		} | ||||
| 			if getBlobBucket(tx, nw.namespace, actual) != nil { | ||||
| 				return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	bkt, err := createBlobBucket(tx, nw.namespace, actual) | ||||
| 	if err != nil { | ||||
| 		if err == bolt.ErrBucketExists { | ||||
| 			return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual) | ||||
| 		} | ||||
| 		return "", err | ||||
| 	} | ||||
|  | ||||
|   | ||||
| @@ -213,7 +213,7 @@ func (s *service) Read(req *api.ReadContentRequest, session api.Content_ReadServ | ||||
| 	_, err = io.CopyBuffer( | ||||
| 		&readResponseWriter{session: session}, | ||||
| 		io.NewSectionReader(ra, offset, size), *p) | ||||
| 	return err | ||||
| 	return errdefs.ToGRPC(err) | ||||
| } | ||||
|  | ||||
| // readResponseWriter is a writer that places the output into ReadContentRequest messages. | ||||
| @@ -420,7 +420,7 @@ func (s *service) Write(session api.Content_WriteServer) (err error) { | ||||
| 				// maintain the offset as append only, we just issue the write. | ||||
| 				n, err := wr.Write(req.Data) | ||||
| 				if err != nil { | ||||
| 					return err | ||||
| 					return errdefs.ToGRPC(err) | ||||
| 				} | ||||
|  | ||||
| 				if n != len(req.Data) { | ||||
| @@ -438,7 +438,7 @@ func (s *service) Write(session api.Content_WriteServer) (err error) { | ||||
| 					opts = append(opts, content.WithLabels(req.Labels)) | ||||
| 				} | ||||
| 				if err := wr.Commit(ctx, total, expected, opts...); err != nil { | ||||
| 					return err | ||||
| 					return errdefs.ToGRPC(err) | ||||
| 				} | ||||
| 			} | ||||
|  | ||||
|   | ||||
							
								
								
									
										3
									
								
								task.go
									
									
									
									
									
								
							
							
						
						
									
										3
									
								
								task.go
									
									
									
									
									
								
							| @@ -607,9 +607,12 @@ func writeContent(ctx context.Context, store content.Ingester, mediaType, ref st | ||||
| 	if err != nil { | ||||
| 		return d, err | ||||
| 	} | ||||
|  | ||||
| 	if err := writer.Commit(ctx, size, "", opts...); err != nil { | ||||
| 		if !errdefs.IsAlreadyExists(err) { | ||||
| 			return d, err | ||||
| 		} | ||||
| 	} | ||||
| 	return v1.Descriptor{ | ||||
| 		MediaType: mediaType, | ||||
| 		Digest:    writer.Digest(), | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Phil Estes
					Phil Estes