From 0d8b0933610a9a3b76ca68c0745526a93bc68820 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Fri, 5 Jan 2018 10:56:31 -0800 Subject: [PATCH] metadata: add content lease on existing content When a writer is requested for an object that already exists, add that object to the provided any lease to prevent other operations from affecting the current lease's use of that content. Signed-off-by: Derek McGowan --- metadata/content.go | 18 ++++++++- metadata/content_test.go | 79 ++++++++++++++++++++++++++++++++++++++++ metadata/db_test.go | 39 ++++++++++++++++++++ 3 files changed, 134 insertions(+), 2 deletions(-) diff --git a/metadata/content.go b/metadata/content.go index c13f7867e..57eabf307 100644 --- a/metadata/content.go +++ b/metadata/content.go @@ -318,12 +318,23 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe cs.l.RLock() defer cs.l.RUnlock() - var w content.Writer + var ( + w content.Writer + exists bool + ) if err := update(ctx, cs.db, func(tx *bolt.Tx) error { if expected != "" { cbkt := getBlobBucket(tx, ns, expected) if cbkt != nil { - return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected) + // Add content to lease to prevent other reference removals + // from effecting this object during a provided lease + if err := addContentLease(ctx, tx, expected); err != nil { + return errors.Wrap(err, "unable to lease content") + } + // Return error outside of transaction to ensure + // commit succeeds with the lease. + exists = true + return nil } } @@ -363,6 +374,9 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe }); err != nil { return nil, err } + if exists { + return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected) + } // TODO: keep the expected in the writer to use on commit // when no expected is provided there. diff --git a/metadata/content_test.go b/metadata/content_test.go index d51a2dbb5..ad212c28a 100644 --- a/metadata/content_test.go +++ b/metadata/content_test.go @@ -1,6 +1,7 @@ package metadata import ( + "bytes" "context" "path/filepath" "testing" @@ -9,6 +10,11 @@ import ( "github.com/containerd/containerd/content" "github.com/containerd/containerd/content/local" "github.com/containerd/containerd/content/testsuite" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/leases" + "github.com/containerd/containerd/namespaces" + digest "github.com/opencontainers/go-digest" + "github.com/pkg/errors" ) func createContentStore(ctx context.Context, root string) (content.Store, func() error, error) { @@ -31,3 +37,76 @@ func createContentStore(ctx context.Context, root string) (content.Store, func() func TestContent(t *testing.T) { testsuite.ContentSuite(t, "metadata", createContentStore) } + +func TestContentLeased(t *testing.T) { + ctx, db, cancel := testDB(t) + defer cancel() + + cs := db.ContentStore() + + blob := []byte("any content") + expected := digest.FromBytes(blob) + + lctx, _, err := createLease(ctx, db, "lease-1") + if err != nil { + t.Fatal(err) + } + if err := content.WriteBlob(lctx, cs, "test-1", bytes.NewReader(blob), int64(len(blob)), expected); err != nil { + t.Fatal(err) + } + if err := checkContentLeased(lctx, db, expected); err != nil { + t.Fatal("lease checked failed:", err) + } + + lctx, _, err = createLease(ctx, db, "lease-2") + if err != nil { + t.Fatal(err) + } + + if _, err := cs.Writer(lctx, "test-2", int64(len(blob)), expected); err == nil { + t.Fatal("expected already exist error") + } else if !errdefs.IsAlreadyExists(err) { + t.Fatal(err) + } + if err := checkContentLeased(lctx, db, expected); err != nil { + t.Fatal("lease checked failed:", err) + } +} + +func createLease(ctx context.Context, db *DB, name string) (context.Context, func() error, error) { + if err := db.Update(func(tx *bolt.Tx) error { + _, err := NewLeaseManager(tx).Create(ctx, name, nil) + return err + }); err != nil { + return nil, nil, err + } + return leases.WithLease(ctx, name), func() error { + return db.Update(func(tx *bolt.Tx) error { + return NewLeaseManager(tx).Delete(ctx, name) + }) + }, nil +} + +func checkContentLeased(ctx context.Context, db *DB, dgst digest.Digest) error { + ns, ok := namespaces.Namespace(ctx) + if !ok { + return errors.New("no namespace in context") + } + lease, ok := leases.Lease(ctx) + if !ok { + return errors.New("no lease in context") + } + + return db.View(func(tx *bolt.Tx) error { + bkt := getBucket(tx, bucketKeyVersion, []byte(ns), bucketKeyObjectLeases, []byte(lease), bucketKeyObjectContent) + if bkt == nil { + return errors.Wrapf(errdefs.ErrNotFound, "bucket not found", lease) + } + v := bkt.Get([]byte(dgst.String())) + if v == nil { + return errors.Wrap(errdefs.ErrNotFound, "object not leased") + } + + return nil + }) +} diff --git a/metadata/db_test.go b/metadata/db_test.go index bc97e6244..209bc6c5b 100644 --- a/metadata/db_test.go +++ b/metadata/db_test.go @@ -10,6 +10,7 @@ import ( "os" "path/filepath" "runtime/pprof" + "strings" "testing" "time" @@ -29,6 +30,44 @@ import ( "github.com/pkg/errors" ) +func testDB(t *testing.T) (context.Context, *DB, func()) { + ctx, cancel := context.WithCancel(context.Background()) + ctx = namespaces.WithNamespace(ctx, "testing") + + dirname, err := ioutil.TempDir("", strings.Replace(t.Name(), "/", "_", -1)+"-") + if err != nil { + t.Fatal(err) + } + + snapshotter, err := naive.NewSnapshotter(filepath.Join(dirname, "naive")) + if err != nil { + t.Fatal(err) + } + + cs, err := local.NewStore(filepath.Join(dirname, "content")) + if err != nil { + t.Fatal(err) + } + + bdb, err := bolt.Open(filepath.Join(dirname, "metadata.db"), 0644, nil) + if err != nil { + t.Fatal(err) + } + + db := NewDB(bdb, cs, map[string]snapshots.Snapshotter{"naive": snapshotter}) + if err := db.Init(ctx); err != nil { + t.Fatal(err) + } + + return ctx, db, func() { + bdb.Close() + if err := os.RemoveAll(dirname); err != nil { + t.Log("failed removing temp dir", err) + } + cancel() + } +} + func TestInit(t *testing.T) { ctx, db, cancel := testEnv(t) defer cancel()