diff --git a/metadata/content.go b/metadata/content.go index c13f7867e..57eabf307 100644 --- a/metadata/content.go +++ b/metadata/content.go @@ -318,12 +318,23 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe cs.l.RLock() defer cs.l.RUnlock() - var w content.Writer + var ( + w content.Writer + exists bool + ) if err := update(ctx, cs.db, func(tx *bolt.Tx) error { if expected != "" { cbkt := getBlobBucket(tx, ns, expected) if cbkt != nil { - return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected) + // Add content to lease to prevent other reference removals + // from effecting this object during a provided lease + if err := addContentLease(ctx, tx, expected); err != nil { + return errors.Wrap(err, "unable to lease content") + } + // Return error outside of transaction to ensure + // commit succeeds with the lease. + exists = true + return nil } } @@ -363,6 +374,9 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe }); err != nil { return nil, err } + if exists { + return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected) + } // TODO: keep the expected in the writer to use on commit // when no expected is provided there. diff --git a/metadata/content_test.go b/metadata/content_test.go index d51a2dbb5..ad212c28a 100644 --- a/metadata/content_test.go +++ b/metadata/content_test.go @@ -1,6 +1,7 @@ package metadata import ( + "bytes" "context" "path/filepath" "testing" @@ -9,6 +10,11 @@ import ( "github.com/containerd/containerd/content" "github.com/containerd/containerd/content/local" "github.com/containerd/containerd/content/testsuite" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/leases" + "github.com/containerd/containerd/namespaces" + digest "github.com/opencontainers/go-digest" + "github.com/pkg/errors" ) func createContentStore(ctx context.Context, root string) (content.Store, func() error, error) { @@ -31,3 +37,76 @@ func createContentStore(ctx context.Context, root string) (content.Store, func() func TestContent(t *testing.T) { testsuite.ContentSuite(t, "metadata", createContentStore) } + +func TestContentLeased(t *testing.T) { + ctx, db, cancel := testDB(t) + defer cancel() + + cs := db.ContentStore() + + blob := []byte("any content") + expected := digest.FromBytes(blob) + + lctx, _, err := createLease(ctx, db, "lease-1") + if err != nil { + t.Fatal(err) + } + if err := content.WriteBlob(lctx, cs, "test-1", bytes.NewReader(blob), int64(len(blob)), expected); err != nil { + t.Fatal(err) + } + if err := checkContentLeased(lctx, db, expected); err != nil { + t.Fatal("lease checked failed:", err) + } + + lctx, _, err = createLease(ctx, db, "lease-2") + if err != nil { + t.Fatal(err) + } + + if _, err := cs.Writer(lctx, "test-2", int64(len(blob)), expected); err == nil { + t.Fatal("expected already exist error") + } else if !errdefs.IsAlreadyExists(err) { + t.Fatal(err) + } + if err := checkContentLeased(lctx, db, expected); err != nil { + t.Fatal("lease checked failed:", err) + } +} + +func createLease(ctx context.Context, db *DB, name string) (context.Context, func() error, error) { + if err := db.Update(func(tx *bolt.Tx) error { + _, err := NewLeaseManager(tx).Create(ctx, name, nil) + return err + }); err != nil { + return nil, nil, err + } + return leases.WithLease(ctx, name), func() error { + return db.Update(func(tx *bolt.Tx) error { + return NewLeaseManager(tx).Delete(ctx, name) + }) + }, nil +} + +func checkContentLeased(ctx context.Context, db *DB, dgst digest.Digest) error { + ns, ok := namespaces.Namespace(ctx) + if !ok { + return errors.New("no namespace in context") + } + lease, ok := leases.Lease(ctx) + if !ok { + return errors.New("no lease in context") + } + + return db.View(func(tx *bolt.Tx) error { + bkt := getBucket(tx, bucketKeyVersion, []byte(ns), bucketKeyObjectLeases, []byte(lease), bucketKeyObjectContent) + if bkt == nil { + return errors.Wrapf(errdefs.ErrNotFound, "bucket not found", lease) + } + v := bkt.Get([]byte(dgst.String())) + if v == nil { + return errors.Wrap(errdefs.ErrNotFound, "object not leased") + } + + return nil + }) +} diff --git a/metadata/db_test.go b/metadata/db_test.go index bc97e6244..209bc6c5b 100644 --- a/metadata/db_test.go +++ b/metadata/db_test.go @@ -10,6 +10,7 @@ import ( "os" "path/filepath" "runtime/pprof" + "strings" "testing" "time" @@ -29,6 +30,44 @@ import ( "github.com/pkg/errors" ) +func testDB(t *testing.T) (context.Context, *DB, func()) { + ctx, cancel := context.WithCancel(context.Background()) + ctx = namespaces.WithNamespace(ctx, "testing") + + dirname, err := ioutil.TempDir("", strings.Replace(t.Name(), "/", "_", -1)+"-") + if err != nil { + t.Fatal(err) + } + + snapshotter, err := naive.NewSnapshotter(filepath.Join(dirname, "naive")) + if err != nil { + t.Fatal(err) + } + + cs, err := local.NewStore(filepath.Join(dirname, "content")) + if err != nil { + t.Fatal(err) + } + + bdb, err := bolt.Open(filepath.Join(dirname, "metadata.db"), 0644, nil) + if err != nil { + t.Fatal(err) + } + + db := NewDB(bdb, cs, map[string]snapshots.Snapshotter{"naive": snapshotter}) + if err := db.Init(ctx); err != nil { + t.Fatal(err) + } + + return ctx, db, func() { + bdb.Close() + if err := os.RemoveAll(dirname); err != nil { + t.Log("failed removing temp dir", err) + } + cancel() + } +} + func TestInit(t *testing.T) { ctx, db, cancel := testEnv(t) defer cancel()