Always check exists on commit error
Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
parent
c0cb2f2568
commit
6875d3df3a
@ -132,11 +132,11 @@ func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest,
|
||||
// clean up!!
|
||||
defer os.RemoveAll(w.path)
|
||||
|
||||
if _, err := os.Stat(target); err == nil {
|
||||
// collision with the target file!
|
||||
return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", dgst)
|
||||
}
|
||||
if err := os.Rename(ingest, target); err != nil {
|
||||
if os.IsExist(err) {
|
||||
// collision with the target file!
|
||||
return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", dgst)
|
||||
}
|
||||
return err
|
||||
}
|
||||
commitTime := time.Now()
|
||||
|
@ -139,7 +139,9 @@ func (s *walkingDiff) Compare(ctx context.Context, lower, upper []mount.Mount, o
|
||||
|
||||
dgst := cw.Digest()
|
||||
if err := cw.Commit(ctx, 0, dgst, commitopts...); err != nil {
|
||||
return errors.Wrap(err, "failed to commit")
|
||||
if !errdefs.IsAlreadyExists(err) {
|
||||
return errors.Wrap(err, "failed to commit")
|
||||
}
|
||||
}
|
||||
|
||||
info, err := s.store.Info(ctx, dgst)
|
||||
|
@ -164,11 +164,11 @@ func getSnapshotterBucket(tx *bolt.Tx, namespace, snapshotter string) *bolt.Buck
|
||||
}
|
||||
|
||||
func createBlobBucket(tx *bolt.Tx, namespace string, dgst digest.Digest) (*bolt.Bucket, error) {
|
||||
bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectBlob, []byte(dgst.String()))
|
||||
bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectBlob)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return bkt, nil
|
||||
return bkt.CreateBucket([]byte(dgst.String()))
|
||||
}
|
||||
|
||||
func getBlobsBucket(tx *bolt.Tx, namespace string) *bolt.Bucket {
|
||||
|
@ -592,9 +592,6 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64,
|
||||
}
|
||||
size = nw.desc.Size
|
||||
actual = nw.desc.Digest
|
||||
if getBlobBucket(tx, nw.namespace, actual) != nil {
|
||||
return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual)
|
||||
}
|
||||
} else {
|
||||
status, err := nw.w.Status()
|
||||
if err != nil {
|
||||
@ -606,18 +603,16 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64,
|
||||
size = status.Offset
|
||||
actual = nw.w.Digest()
|
||||
|
||||
if err := nw.w.Commit(ctx, size, expected); err != nil {
|
||||
if !errdefs.IsAlreadyExists(err) {
|
||||
return "", err
|
||||
}
|
||||
if getBlobBucket(tx, nw.namespace, actual) != nil {
|
||||
return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual)
|
||||
}
|
||||
if err := nw.w.Commit(ctx, size, expected); err != nil && !errdefs.IsAlreadyExists(err) {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
|
||||
bkt, err := createBlobBucket(tx, nw.namespace, actual)
|
||||
if err != nil {
|
||||
if err == bolt.ErrBucketExists {
|
||||
return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual)
|
||||
}
|
||||
return "", err
|
||||
}
|
||||
|
||||
|
@ -213,7 +213,7 @@ func (s *service) Read(req *api.ReadContentRequest, session api.Content_ReadServ
|
||||
_, err = io.CopyBuffer(
|
||||
&readResponseWriter{session: session},
|
||||
io.NewSectionReader(ra, offset, size), *p)
|
||||
return err
|
||||
return errdefs.ToGRPC(err)
|
||||
}
|
||||
|
||||
// readResponseWriter is a writer that places the output into ReadContentRequest messages.
|
||||
@ -420,7 +420,7 @@ func (s *service) Write(session api.Content_WriteServer) (err error) {
|
||||
// maintain the offset as append only, we just issue the write.
|
||||
n, err := wr.Write(req.Data)
|
||||
if err != nil {
|
||||
return err
|
||||
return errdefs.ToGRPC(err)
|
||||
}
|
||||
|
||||
if n != len(req.Data) {
|
||||
@ -438,7 +438,7 @@ func (s *service) Write(session api.Content_WriteServer) (err error) {
|
||||
opts = append(opts, content.WithLabels(req.Labels))
|
||||
}
|
||||
if err := wr.Commit(ctx, total, expected, opts...); err != nil {
|
||||
return err
|
||||
return errdefs.ToGRPC(err)
|
||||
}
|
||||
}
|
||||
|
||||
|
5
task.go
5
task.go
@ -607,8 +607,11 @@ func writeContent(ctx context.Context, store content.Ingester, mediaType, ref st
|
||||
if err != nil {
|
||||
return d, err
|
||||
}
|
||||
|
||||
if err := writer.Commit(ctx, size, "", opts...); err != nil {
|
||||
return d, err
|
||||
if !errdefs.IsAlreadyExists(err) {
|
||||
return d, err
|
||||
}
|
||||
}
|
||||
return v1.Descriptor{
|
||||
MediaType: mediaType,
|
||||
|
Loading…
Reference in New Issue
Block a user