metadata: add content lease on existing content
When a writer is requested for an object that already exists, add that object to the provided any lease to prevent other operations from affecting the current lease's use of that content. Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
parent
ab7150f0ff
commit
0d8b093361
@ -318,12 +318,23 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
|
||||
cs.l.RLock()
|
||||
defer cs.l.RUnlock()
|
||||
|
||||
var w content.Writer
|
||||
var (
|
||||
w content.Writer
|
||||
exists bool
|
||||
)
|
||||
if err := update(ctx, cs.db, func(tx *bolt.Tx) error {
|
||||
if expected != "" {
|
||||
cbkt := getBlobBucket(tx, ns, expected)
|
||||
if cbkt != nil {
|
||||
return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected)
|
||||
// Add content to lease to prevent other reference removals
|
||||
// from effecting this object during a provided lease
|
||||
if err := addContentLease(ctx, tx, expected); err != nil {
|
||||
return errors.Wrap(err, "unable to lease content")
|
||||
}
|
||||
// Return error outside of transaction to ensure
|
||||
// commit succeeds with the lease.
|
||||
exists = true
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@ -363,6 +374,9 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if exists {
|
||||
return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected)
|
||||
}
|
||||
|
||||
// TODO: keep the expected in the writer to use on commit
|
||||
// when no expected is provided there.
|
||||
|
@ -1,6 +1,7 @@
|
||||
package metadata
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
@ -9,6 +10,11 @@ import (
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/content/local"
|
||||
"github.com/containerd/containerd/content/testsuite"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/leases"
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func createContentStore(ctx context.Context, root string) (content.Store, func() error, error) {
|
||||
@ -31,3 +37,76 @@ func createContentStore(ctx context.Context, root string) (content.Store, func()
|
||||
func TestContent(t *testing.T) {
|
||||
testsuite.ContentSuite(t, "metadata", createContentStore)
|
||||
}
|
||||
|
||||
func TestContentLeased(t *testing.T) {
|
||||
ctx, db, cancel := testDB(t)
|
||||
defer cancel()
|
||||
|
||||
cs := db.ContentStore()
|
||||
|
||||
blob := []byte("any content")
|
||||
expected := digest.FromBytes(blob)
|
||||
|
||||
lctx, _, err := createLease(ctx, db, "lease-1")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := content.WriteBlob(lctx, cs, "test-1", bytes.NewReader(blob), int64(len(blob)), expected); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := checkContentLeased(lctx, db, expected); err != nil {
|
||||
t.Fatal("lease checked failed:", err)
|
||||
}
|
||||
|
||||
lctx, _, err = createLease(ctx, db, "lease-2")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if _, err := cs.Writer(lctx, "test-2", int64(len(blob)), expected); err == nil {
|
||||
t.Fatal("expected already exist error")
|
||||
} else if !errdefs.IsAlreadyExists(err) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := checkContentLeased(lctx, db, expected); err != nil {
|
||||
t.Fatal("lease checked failed:", err)
|
||||
}
|
||||
}
|
||||
|
||||
func createLease(ctx context.Context, db *DB, name string) (context.Context, func() error, error) {
|
||||
if err := db.Update(func(tx *bolt.Tx) error {
|
||||
_, err := NewLeaseManager(tx).Create(ctx, name, nil)
|
||||
return err
|
||||
}); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return leases.WithLease(ctx, name), func() error {
|
||||
return db.Update(func(tx *bolt.Tx) error {
|
||||
return NewLeaseManager(tx).Delete(ctx, name)
|
||||
})
|
||||
}, nil
|
||||
}
|
||||
|
||||
func checkContentLeased(ctx context.Context, db *DB, dgst digest.Digest) error {
|
||||
ns, ok := namespaces.Namespace(ctx)
|
||||
if !ok {
|
||||
return errors.New("no namespace in context")
|
||||
}
|
||||
lease, ok := leases.Lease(ctx)
|
||||
if !ok {
|
||||
return errors.New("no lease in context")
|
||||
}
|
||||
|
||||
return db.View(func(tx *bolt.Tx) error {
|
||||
bkt := getBucket(tx, bucketKeyVersion, []byte(ns), bucketKeyObjectLeases, []byte(lease), bucketKeyObjectContent)
|
||||
if bkt == nil {
|
||||
return errors.Wrapf(errdefs.ErrNotFound, "bucket not found", lease)
|
||||
}
|
||||
v := bkt.Get([]byte(dgst.String()))
|
||||
if v == nil {
|
||||
return errors.Wrap(errdefs.ErrNotFound, "object not leased")
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime/pprof"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -29,6 +30,44 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func testDB(t *testing.T) (context.Context, *DB, func()) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
ctx = namespaces.WithNamespace(ctx, "testing")
|
||||
|
||||
dirname, err := ioutil.TempDir("", strings.Replace(t.Name(), "/", "_", -1)+"-")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
snapshotter, err := naive.NewSnapshotter(filepath.Join(dirname, "naive"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cs, err := local.NewStore(filepath.Join(dirname, "content"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
bdb, err := bolt.Open(filepath.Join(dirname, "metadata.db"), 0644, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
db := NewDB(bdb, cs, map[string]snapshots.Snapshotter{"naive": snapshotter})
|
||||
if err := db.Init(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
return ctx, db, func() {
|
||||
bdb.Close()
|
||||
if err := os.RemoveAll(dirname); err != nil {
|
||||
t.Log("failed removing temp dir", err)
|
||||
}
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
|
||||
func TestInit(t *testing.T) {
|
||||
ctx, db, cancel := testEnv(t)
|
||||
defer cancel()
|
||||
|
Loading…
Reference in New Issue
Block a user