Fix writer deadlock in local store

The local store could end up in a state where the writer is
closed but the reference is locked after a commit on an
existing object.
Cleans up Commit logic to always close the writer even after
an error occurs, guaranteeing the reference is unlocked after commit.
Adds a test to the content test suite to verify this behavior.
Updates the content store interface definitions to clarify the behavior.

Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
Derek McGowan
2018-10-10 17:00:07 -07:00
parent 15f19d7a67
commit 0f756495a9
6 changed files with 240 additions and 42 deletions

View File

@@ -556,12 +556,6 @@ func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected dig
var innerErr error
if err := update(ctx, nw.db, func(tx *bolt.Tx) error {
bkt := getIngestsBucket(tx, nw.namespace)
if bkt != nil {
if err := bkt.DeleteBucket([]byte(nw.ref)); err != nil && err != bolt.ErrBucketNotFound {
return err
}
}
dgst, err := nw.commit(ctx, tx, size, expected, opts...)
if err != nil {
if !errdefs.IsAlreadyExists(err) {
@@ -569,6 +563,12 @@ func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected dig
}
innerErr = err
}
bkt := getIngestsBucket(tx, nw.namespace)
if bkt != nil {
if err := bkt.DeleteBucket([]byte(nw.ref)); err != nil && err != bolt.ErrBucketNotFound {
return err
}
}
if err := removeIngestLease(ctx, tx, nw.ref); err != nil {
return err
}
@@ -584,30 +584,38 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64,
var base content.Info
for _, opt := range opts {
if err := opt(&base); err != nil {
if nw.w != nil {
nw.w.Close()
}
return "", err
}
}
if err := validateInfo(&base); err != nil {
if nw.w != nil {
nw.w.Close()
}
return "", err
}
var actual digest.Digest
if nw.w == nil {
if size != 0 && size != nw.desc.Size {
return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, nw.desc.Size, size)
return "", errors.Wrapf(errdefs.ErrFailedPrecondition, "%q failed size validation: %v != %v", nw.ref, nw.desc.Size, size)
}
if expected != "" && expected != nw.desc.Digest {
return "", errors.Errorf("%q unexpected digest", nw.ref)
return "", errors.Wrapf(errdefs.ErrFailedPrecondition, "%q unexpected digest", nw.ref)
}
size = nw.desc.Size
actual = nw.desc.Digest
} else {
status, err := nw.w.Status()
if err != nil {
nw.w.Close()
return "", err
}
if size != 0 && size != status.Offset {
return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, status.Offset, size)
nw.w.Close()
return "", errors.Wrapf(errdefs.ErrFailedPrecondition, "%q failed size validation: %v != %v", nw.ref, status.Offset, size)
}
size = status.Offset
actual = nw.w.Digest()