Merge pull request #1970 from dmcgowan/lease-add-on-writer
metadata: add content lease on existing content
This commit is contained in:
commit
787e1a2b69
@ -318,12 +318,23 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
|
||||
cs.l.RLock()
|
||||
defer cs.l.RUnlock()
|
||||
|
||||
var w content.Writer
|
||||
var (
|
||||
w content.Writer
|
||||
exists bool
|
||||
)
|
||||
if err := update(ctx, cs.db, func(tx *bolt.Tx) error {
|
||||
if expected != "" {
|
||||
cbkt := getBlobBucket(tx, ns, expected)
|
||||
if cbkt != nil {
|
||||
return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected)
|
||||
// Add content to lease to prevent other reference removals
|
||||
// from effecting this object during a provided lease
|
||||
if err := addContentLease(ctx, tx, expected); err != nil {
|
||||
return errors.Wrap(err, "unable to lease content")
|
||||
}
|
||||
// Return error outside of transaction to ensure
|
||||
// commit succeeds with the lease.
|
||||
exists = true
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@ -363,6 +374,9 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if exists {
|
||||
return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected)
|
||||
}
|
||||
|
||||
// TODO: keep the expected in the writer to use on commit
|
||||
// when no expected is provided there.
|
||||
|
@ -1,6 +1,7 @@
|
||||
package metadata
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
@ -9,6 +10,11 @@ import (
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/content/local"
|
||||
"github.com/containerd/containerd/content/testsuite"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/leases"
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func createContentStore(ctx context.Context, root string) (content.Store, func() error, error) {
|
||||
@ -31,3 +37,76 @@ func createContentStore(ctx context.Context, root string) (content.Store, func()
|
||||
func TestContent(t *testing.T) {
|
||||
testsuite.ContentSuite(t, "metadata", createContentStore)
|
||||
}
|
||||
|
||||
func TestContentLeased(t *testing.T) {
|
||||
ctx, db, cancel := testDB(t)
|
||||
defer cancel()
|
||||
|
||||
cs := db.ContentStore()
|
||||
|
||||
blob := []byte("any content")
|
||||
expected := digest.FromBytes(blob)
|
||||
|
||||
lctx, _, err := createLease(ctx, db, "lease-1")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := content.WriteBlob(lctx, cs, "test-1", bytes.NewReader(blob), int64(len(blob)), expected); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := checkContentLeased(lctx, db, expected); err != nil {
|
||||
t.Fatal("lease checked failed:", err)
|
||||
}
|
||||
|
||||
lctx, _, err = createLease(ctx, db, "lease-2")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if _, err := cs.Writer(lctx, "test-2", int64(len(blob)), expected); err == nil {
|
||||
t.Fatal("expected already exist error")
|
||||
} else if !errdefs.IsAlreadyExists(err) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := checkContentLeased(lctx, db, expected); err != nil {
|
||||
t.Fatal("lease checked failed:", err)
|
||||
}
|
||||
}
|
||||
|
||||
func createLease(ctx context.Context, db *DB, name string) (context.Context, func() error, error) {
|
||||
if err := db.Update(func(tx *bolt.Tx) error {
|
||||
_, err := NewLeaseManager(tx).Create(ctx, name, nil)
|
||||
return err
|
||||
}); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return leases.WithLease(ctx, name), func() error {
|
||||
return db.Update(func(tx *bolt.Tx) error {
|
||||
return NewLeaseManager(tx).Delete(ctx, name)
|
||||
})
|
||||
}, nil
|
||||
}
|
||||
|
||||
func checkContentLeased(ctx context.Context, db *DB, dgst digest.Digest) error {
|
||||
ns, ok := namespaces.Namespace(ctx)
|
||||
if !ok {
|
||||
return errors.New("no namespace in context")
|
||||
}
|
||||
lease, ok := leases.Lease(ctx)
|
||||
if !ok {
|
||||
return errors.New("no lease in context")
|
||||
}
|
||||
|
||||
return db.View(func(tx *bolt.Tx) error {
|
||||
bkt := getBucket(tx, bucketKeyVersion, []byte(ns), bucketKeyObjectLeases, []byte(lease), bucketKeyObjectContent)
|
||||
if bkt == nil {
|
||||
return errors.Wrapf(errdefs.ErrNotFound, "bucket not found", lease)
|
||||
}
|
||||
v := bkt.Get([]byte(dgst.String()))
|
||||
if v == nil {
|
||||
return errors.Wrap(errdefs.ErrNotFound, "object not leased")
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime/pprof"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -29,6 +30,44 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func testDB(t *testing.T) (context.Context, *DB, func()) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
ctx = namespaces.WithNamespace(ctx, "testing")
|
||||
|
||||
dirname, err := ioutil.TempDir("", strings.Replace(t.Name(), "/", "_", -1)+"-")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
snapshotter, err := naive.NewSnapshotter(filepath.Join(dirname, "naive"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cs, err := local.NewStore(filepath.Join(dirname, "content"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
bdb, err := bolt.Open(filepath.Join(dirname, "metadata.db"), 0644, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
db := NewDB(bdb, cs, map[string]snapshots.Snapshotter{"naive": snapshotter})
|
||||
if err := db.Init(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
return ctx, db, func() {
|
||||
bdb.Close()
|
||||
if err := os.RemoveAll(dirname); err != nil {
|
||||
t.Log("failed removing temp dir", err)
|
||||
}
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
|
||||
func TestInit(t *testing.T) {
|
||||
ctx, db, cancel := testEnv(t)
|
||||
defer cancel()
|
||||
|
Loading…
Reference in New Issue
Block a user