diff --git a/content/content.go b/content/content.go index aabf4c8f3..d8141a68b 100644 --- a/content/content.go +++ b/content/content.go @@ -110,8 +110,9 @@ type IngestManager interface { // Writer handles the write of content into a content store type Writer interface { - // Close is expected to be called after Commit() when commission is needed. - // Closing a writer without commit allows resuming or aborting. + // Close closes the writer, if the writer has not been + // committed this allows resuming or aborting. + // Calling Close on a closed writer will not error. io.WriteCloser // Digest may return empty digest or panics until committed. @@ -119,6 +120,8 @@ type Writer interface { // Commit commits the blob (but no roll-back is guaranteed on an error). // size and expected can be zero-value when unknown. + // Commit always closes the writer, even on error. + // ErrAlreadyExists aborts the writer. Commit(ctx context.Context, size int64, expected digest.Digest, opts ...Opt) error // Status returns the current state of write diff --git a/content/local/store.go b/content/local/store.go index 7fa9bb736..996724057 100644 --- a/content/local/store.go +++ b/content/local/store.go @@ -524,12 +524,11 @@ func (s *store) writer(ctx context.Context, ref string, total int64, expected di if err != nil { return nil, err } - defer fp.Close() p := bufPool.Get().(*[]byte) - defer bufPool.Put(p) - offset, err = io.CopyBuffer(digester.Hash(), fp, *p) + bufPool.Put(p) + fp.Close() if err != nil { return nil, err } diff --git a/content/local/writer.go b/content/local/writer.go index 10df4a4c5..223b14544 100644 --- a/content/local/writer.go +++ b/content/local/writer.go @@ -26,6 +26,7 @@ import ( "github.com/containerd/containerd/content" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/log" "github.com/opencontainers/go-digest" "github.com/pkg/errors" ) @@ -80,43 +81,36 @@ func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest, } } - if w.fp == nil { + // Ensure even on error the writer is fully closed + defer unlock(w.ref) + fp := w.fp + w.fp = nil + + if fp == nil { return errors.Wrap(errdefs.ErrFailedPrecondition, "cannot commit on closed writer") } - if err := w.fp.Sync(); err != nil { + if err := fp.Sync(); err != nil { + fp.Close() return errors.Wrap(err, "sync failed") } - fi, err := w.fp.Stat() + fi, err := fp.Stat() + closeErr := fp.Close() if err != nil { return errors.Wrap(err, "stat on ingest file failed") } - - // change to readonly, more important for read, but provides _some_ - // protection from this point on. We use the existing perms with a mask - // only allowing reads honoring the umask on creation. - // - // This removes write and exec, only allowing read per the creation umask. - // - // NOTE: Windows does not support this operation - if runtime.GOOS != "windows" { - if err := w.fp.Chmod((fi.Mode() & os.ModePerm) &^ 0333); err != nil { - return errors.Wrap(err, "failed to change ingest file permissions") - } + if closeErr != nil { + return errors.Wrap(err, "failed to close ingest file") } if size > 0 && size != fi.Size() { - return errors.Errorf("unexpected commit size %d, expected %d", fi.Size(), size) - } - - if err := w.fp.Close(); err != nil { - return errors.Wrap(err, "failed closing ingest") + return errors.Wrapf(errdefs.ErrFailedPrecondition, "unexpected commit size %d, expected %d", fi.Size(), size) } dgst := w.digester.Digest() if expected != "" && expected != dgst { - return errors.Errorf("unexpected commit digest %s, expected %s", dgst, expected) + return errors.Wrapf(errdefs.ErrFailedPrecondition, "unexpected commit digest %s, expected %s", dgst, expected) } var ( @@ -129,27 +123,48 @@ func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest, return err } - // clean up!! - defer os.RemoveAll(w.path) - if _, err := os.Stat(target); err == nil { // collision with the target file! + if err := os.RemoveAll(w.path); err != nil { + log.G(ctx).WithField("ref", w.ref).WithField("path", w.path).Errorf("failed to remove ingest directory") + } return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", dgst) } + if err := os.Rename(ingest, target); err != nil { return err } + + // Ingest has now been made available in the content store, attempt to complete + // setting metadata but errors should only be logged and not returned since + // the content store cannot be cleanly rolled back. + commitTime := time.Now() if err := os.Chtimes(target, commitTime, commitTime); err != nil { - return err + log.G(ctx).WithField("digest", dgst).Errorf("failed to change file time to commit time") } - w.fp = nil - unlock(w.ref) + // clean up!! + if err := os.RemoveAll(w.path); err != nil { + log.G(ctx).WithField("ref", w.ref).WithField("path", w.path).Errorf("failed to remove ingest directory") + } if w.s.ls != nil && base.Labels != nil { if err := w.s.ls.Set(dgst, base.Labels); err != nil { - return err + log.G(ctx).WithField("digest", dgst).Errorf("failed to set labels") + } + } + + // change to readonly, more important for read, but provides _some_ + // protection from this point on. We use the existing perms with a mask + // only allowing reads honoring the umask on creation. + // + // This removes write and exec, only allowing read per the creation umask. + // + // NOTE: Windows does not support this operation + if runtime.GOOS != "windows" { + if err := os.Chmod(target, (fi.Mode()&os.ModePerm)&^0333); err != nil { + log.G(ctx).WithField("ref", w.ref).Errorf("failed to make readonly") } } diff --git a/content/testsuite/testsuite.go b/content/testsuite/testsuite.go index 908facbdf..13392f6d1 100644 --- a/content/testsuite/testsuite.go +++ b/content/testsuite/testsuite.go @@ -54,6 +54,8 @@ func ContentSuite(t *testing.T, name string, storeFn func(ctx context.Context, r t.Run("CrossNamespaceAppend", makeTest(t, name, storeFn, checkCrossNSAppend)) t.Run("CrossNamespaceShare", makeTest(t, name, storeFn, checkCrossNSShare)) + + t.Run("CommitErrorState", makeTest(t, name, storeFn, checkCommitErrorState)) } // ContextWrapper is used to decorate new context used inside the test @@ -312,7 +314,175 @@ func checkCommitExists(ctx context.Context, t *testing.T, cs content.Store) { } else if !errdefs.IsAlreadyExists(err) { t.Fatalf("(%d) Unexpected error: %+v", i, err) } + } +} +func checkRefNotAvailable(ctx context.Context, t *testing.T, cs content.Store, ref string) { + t.Helper() + + w, err := cs.Writer(ctx, content.WithRef(ref)) + if err == nil { + w.Close() + t.Fatal("writer created with ref, expected to be in use") + } + if !errdefs.IsUnavailable(err) { + t.Fatalf("Expected unavailable error, got %+v", err) + } +} + +func checkCommitErrorState(ctx context.Context, t *testing.T, cs content.Store) { + c1, d1 := createContent(256) + _, d2 := createContent(256) + if err := content.WriteBlob(ctx, cs, "c1", bytes.NewReader(c1), ocispec.Descriptor{Digest: d1}); err != nil { + t.Fatal(err) + } + + ref := "c1-commiterror-state" + w, err := cs.Writer(ctx, content.WithRef(ref)) + if err != nil { + t.Fatal(err) + } + if _, err := w.Write(c1); err != nil { + if err := w.Close(); err != nil { + t.Errorf("Close error: %+v", err) + } + t.Fatal(err) + } + + checkRefNotAvailable(ctx, t, cs, ref) + + // Check exists + err = w.Commit(ctx, int64(len(c1)), d1) + if err == nil { + t.Fatalf("Expected already exists error") + } else if !errdefs.IsAlreadyExists(err) { + if err := w.Close(); err != nil { + t.Errorf("Close error: %+v", err) + } + t.Fatalf("Unexpected error: %+v", err) + } + + w, err = cs.Writer(ctx, content.WithRef(ref)) + if err != nil { + t.Fatal(err) + } + + checkRefNotAvailable(ctx, t, cs, ref) + + if _, err := w.Write(c1); err != nil { + if err := w.Close(); err != nil { + t.Errorf("close error: %+v", err) + } + t.Fatal(err) + } + + // Check exists without providing digest + err = w.Commit(ctx, int64(len(c1)), "") + if err == nil { + t.Fatalf("Expected already exists error") + } else if !errdefs.IsAlreadyExists(err) { + if err := w.Close(); err != nil { + t.Errorf("Close error: %+v", err) + } + t.Fatalf("Unexpected error: %+v", err) + } + + w, err = cs.Writer(ctx, content.WithRef(ref)) + if err != nil { + t.Fatal(err) + } + + checkRefNotAvailable(ctx, t, cs, ref) + + if _, err := w.Write(append(c1, []byte("more")...)); err != nil { + if err := w.Close(); err != nil { + t.Errorf("close error: %+v", err) + } + t.Fatal(err) + } + + // Commit with the wrong digest should produce an error + err = w.Commit(ctx, int64(len(c1))+4, d2) + if err == nil { + t.Fatalf("Expected error from wrong digest") + } else if !errdefs.IsFailedPrecondition(err) { + t.Errorf("Unexpected error: %+v", err) + } + + w, err = cs.Writer(ctx, content.WithRef(ref)) + if err != nil { + t.Fatal(err) + } + + checkRefNotAvailable(ctx, t, cs, ref) + + // Commit with wrong size should also produce an error + err = w.Commit(ctx, int64(len(c1)), "") + if err == nil { + t.Fatalf("Expected error from wrong size") + } else if !errdefs.IsFailedPrecondition(err) { + t.Errorf("Unexpected error: %+v", err) + } + + w, err = cs.Writer(ctx, content.WithRef(ref)) + if err != nil { + t.Fatal(err) + } + + checkRefNotAvailable(ctx, t, cs, ref) + + // Now expect commit to succeed + if err := w.Commit(ctx, int64(len(c1))+4, ""); err != nil { + if err := w.Close(); err != nil { + t.Errorf("close error: %+v", err) + } + t.Fatalf("Failed to commit: %+v", err) + } + + // Create another writer with same reference + w, err = cs.Writer(ctx, content.WithRef(ref)) + if err != nil { + t.Fatalf("Failed to open writer: %+v", err) + } + + if _, err := w.Write(c1); err != nil { + if err := w.Close(); err != nil { + t.Errorf("close error: %+v", err) + } + t.Fatal(err) + } + + checkRefNotAvailable(ctx, t, cs, ref) + + // Commit should fail due to already exists + err = w.Commit(ctx, int64(len(c1)), d1) + if err == nil { + t.Fatalf("Expected already exists error") + } else if !errdefs.IsAlreadyExists(err) { + if err := w.Close(); err != nil { + t.Errorf("close error: %+v", err) + } + t.Fatalf("Unexpected error: %+v", err) + } + + w, err = cs.Writer(ctx, content.WithRef(ref)) + if err != nil { + t.Fatal(err) + } + + checkRefNotAvailable(ctx, t, cs, ref) + + if err := w.Close(); err != nil { + t.Fatalf("Close failed: %+v", err) + } + + // Create another writer with same reference to check available + w, err = cs.Writer(ctx, content.WithRef(ref)) + if err != nil { + t.Fatalf("Failed to open writer: %+v", err) + } + if err := w.Close(); err != nil { + t.Fatalf("Close failed: %+v", err) } } diff --git a/metadata/content.go b/metadata/content.go index f51b0aadd..8ee0f2e20 100644 --- a/metadata/content.go +++ b/metadata/content.go @@ -556,12 +556,6 @@ func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected dig var innerErr error if err := update(ctx, nw.db, func(tx *bolt.Tx) error { - bkt := getIngestsBucket(tx, nw.namespace) - if bkt != nil { - if err := bkt.DeleteBucket([]byte(nw.ref)); err != nil && err != bolt.ErrBucketNotFound { - return err - } - } dgst, err := nw.commit(ctx, tx, size, expected, opts...) if err != nil { if !errdefs.IsAlreadyExists(err) { @@ -569,6 +563,12 @@ func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected dig } innerErr = err } + bkt := getIngestsBucket(tx, nw.namespace) + if bkt != nil { + if err := bkt.DeleteBucket([]byte(nw.ref)); err != nil && err != bolt.ErrBucketNotFound { + return err + } + } if err := removeIngestLease(ctx, tx, nw.ref); err != nil { return err } @@ -584,30 +584,38 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, var base content.Info for _, opt := range opts { if err := opt(&base); err != nil { + if nw.w != nil { + nw.w.Close() + } return "", err } } if err := validateInfo(&base); err != nil { + if nw.w != nil { + nw.w.Close() + } return "", err } var actual digest.Digest if nw.w == nil { if size != 0 && size != nw.desc.Size { - return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, nw.desc.Size, size) + return "", errors.Wrapf(errdefs.ErrFailedPrecondition, "%q failed size validation: %v != %v", nw.ref, nw.desc.Size, size) } if expected != "" && expected != nw.desc.Digest { - return "", errors.Errorf("%q unexpected digest", nw.ref) + return "", errors.Wrapf(errdefs.ErrFailedPrecondition, "%q unexpected digest", nw.ref) } size = nw.desc.Size actual = nw.desc.Digest } else { status, err := nw.w.Status() if err != nil { + nw.w.Close() return "", err } if size != 0 && size != status.Offset { - return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, status.Offset, size) + nw.w.Close() + return "", errors.Wrapf(errdefs.ErrFailedPrecondition, "%q failed size validation: %v != %v", nw.ref, status.Offset, size) } size = status.Offset actual = nw.w.Digest() diff --git a/services/content/service.go b/services/content/service.go index 68d3920ff..d7e666053 100644 --- a/services/content/service.go +++ b/services/content/service.go @@ -376,6 +376,9 @@ func (s *service) Write(session api.Content_WriteServer) (err error) { expected = req.Expected if _, err := s.store.Info(session.Context(), req.Expected); err == nil { + if err := wr.Close(); err != nil { + log.G(ctx).WithError(err).Error("failed to close writer") + } if err := s.store.Abort(session.Context(), ref); err != nil { log.G(ctx).WithError(err).Error("failed to abort write") }