Merge pull request #2154 from dmcgowan/shared-content-ingests

content: shared content across namespaces
This commit is contained in:
Stephen Day
2018-03-12 16:11:32 -07:00
committed by GitHub
11 changed files with 577 additions and 88 deletions

View File

@@ -52,7 +52,7 @@ var (
bucketKeyObjectSnapshots = []byte("snapshots") // stores snapshot references
bucketKeyObjectContent = []byte("content") // stores content references
bucketKeyObjectBlob = []byte("blob") // stores content links
bucketKeyObjectIngest = []byte("ingest") // stores ingest links
bucketKeyObjectIngests = []byte("ingests") // stores ingest objects
bucketKeyObjectLeases = []byte("leases") // stores leases
bucketKeyDigest = []byte("digest")
@@ -70,6 +70,10 @@ var (
bucketKeyTarget = []byte("target")
bucketKeyExtensions = []byte("extensions")
bucketKeyCreatedAt = []byte("createdat")
bucketKeyExpected = []byte("expected")
bucketKeyRef = []byte("ref")
deprecatedBucketKeyObjectIngest = []byte("ingest") // stores ingest links, deprecated in v1.2
)
func getBucket(tx *bolt.Tx, keys ...[]byte) *bolt.Bucket {
@@ -178,14 +182,18 @@ func getBlobBucket(tx *bolt.Tx, namespace string, dgst digest.Digest) *bolt.Buck
return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectBlob, []byte(dgst.String()))
}
func createIngestBucket(tx *bolt.Tx, namespace string) (*bolt.Bucket, error) {
bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngest)
func getIngestsBucket(tx *bolt.Tx, namespace string) *bolt.Bucket {
return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngests)
}
func createIngestBucket(tx *bolt.Tx, namespace, ref string) (*bolt.Bucket, error) {
bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngests, []byte(ref))
if err != nil {
return nil, err
}
return bkt, nil
}
func getIngestBucket(tx *bolt.Tx, namespace string) *bolt.Bucket {
return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngest)
func getIngestBucket(tx *bolt.Tx, namespace, ref string) *bolt.Bucket {
return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngests, []byte(ref))
}

View File

@@ -226,14 +226,16 @@ func (cs *contentStore) ListStatuses(ctx context.Context, fs ...string) ([]conte
brefs := map[string]string{}
if err := view(ctx, cs.db, func(tx *bolt.Tx) error {
bkt := getIngestBucket(tx, ns)
bkt := getIngestsBucket(tx, ns)
if bkt == nil {
return nil
}
return bkt.ForEach(func(k, v []byte) error {
// TODO(dmcgowan): match name and potentially labels here
brefs[string(k)] = string(v)
if v == nil {
// TODO(dmcgowan): match name and potentially labels here
brefs[string(k)] = string(bkt.Bucket(k).Get(bucketKeyRef))
}
return nil
})
}); err != nil {
@@ -261,11 +263,11 @@ func (cs *contentStore) ListStatuses(ctx context.Context, fs ...string) ([]conte
}
func getRef(tx *bolt.Tx, ns, ref string) string {
bkt := getIngestBucket(tx, ns)
bkt := getIngestBucket(tx, ns, ref)
if bkt == nil {
return ""
}
v := bkt.Get([]byte(ref))
v := bkt.Get(bucketKeyRef)
if len(v) == 0 {
return ""
}
@@ -308,19 +310,29 @@ func (cs *contentStore) Abort(ctx context.Context, ref string) error {
defer cs.l.RUnlock()
return update(ctx, cs.db, func(tx *bolt.Tx) error {
bkt := getIngestBucket(tx, ns)
ibkt := getIngestsBucket(tx, ns)
if ibkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref)
}
bkt := ibkt.Bucket([]byte(ref))
if bkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref)
}
bref := string(bkt.Get([]byte(ref)))
bref := string(bkt.Get(bucketKeyRef))
if bref == "" {
return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref)
}
if err := bkt.Delete([]byte(ref)); err != nil {
expected := string(bkt.Get(bucketKeyExpected))
if err := ibkt.DeleteBucket([]byte(ref)); err != nil {
return err
}
return cs.Store.Abort(ctx, bref)
// if not shared content, delete active ingest on backend
if expected == "" {
return cs.Store.Abort(ctx, bref)
}
return nil
})
}
@@ -337,8 +349,10 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
var (
w content.Writer
exists bool
bref string
)
if err := update(ctx, cs.db, func(tx *bolt.Tx) error {
var shared bool
if expected != "" {
cbkt := getBlobBucket(tx, ns, expected)
if cbkt != nil {
@@ -352,18 +366,24 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
exists = true
return nil
}
if st, err := cs.Store.Info(ctx, expected); err == nil {
// Ensure the expected size is the same, it is likely
// an error if the size is mismatched but the caller
// must resolve this on commit
if size == 0 || size == st.Size {
shared = true
size = st.Size
}
}
}
bkt, err := createIngestBucket(tx, ns)
bkt, err := createIngestBucket(tx, ns, ref)
if err != nil {
return err
}
var (
bref string
brefb = bkt.Get([]byte(ref))
)
brefb := bkt.Get(bucketKeyRef)
if brefb == nil {
sid, err := bkt.NextSequence()
if err != nil {
@@ -371,21 +391,24 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
}
bref = createKey(sid, ns, ref)
if err := bkt.Put([]byte(ref), []byte(bref)); err != nil {
if err := bkt.Put(bucketKeyRef, []byte(bref)); err != nil {
return err
}
} else {
bref = string(brefb)
}
// Do not use the passed in expected value here since it was
// already checked against the user metadata. If the content
// store has the content, it must still be written before
// linked into the given namespace. It is possible in the future
// to allow content which exists in content store but not
// namespace to be linked here and returned an exist error, but
// this would require more configuration to make secure.
w, err = cs.Store.Writer(ctx, bref, size, "")
if shared {
if err := bkt.Put(bucketKeyExpected, []byte(expected)); err != nil {
return err
}
} else {
// Do not use the passed in expected value here since it was
// already checked against the user metadata. The content must
// be committed in the namespace before it will be seen as
// available in the current namespace.
w, err = cs.Store.Writer(ctx, bref, size, "")
}
return err
}); err != nil {
return nil, err
@@ -394,23 +417,99 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected)
}
// TODO: keep the expected in the writer to use on commit
// when no expected is provided there.
return &namespacedWriter{
Writer: w,
ctx: ctx,
ref: ref,
namespace: ns,
db: cs.db,
provider: cs.Store,
l: &cs.l,
w: w,
bref: bref,
started: time.Now(),
expected: expected,
size: size,
}, nil
}
type namespacedWriter struct {
content.Writer
ctx context.Context
ref string
namespace string
db transactor
l *sync.RWMutex
provider interface {
content.Provider
content.Ingester
}
l *sync.RWMutex
w content.Writer
bref string
started time.Time
expected digest.Digest
size int64
}
func (nw *namespacedWriter) Close() error {
if nw.w != nil {
return nw.w.Close()
}
return nil
}
func (nw *namespacedWriter) Write(p []byte) (int, error) {
// if no writer, first copy and unshare before performing write
if nw.w == nil {
if len(p) == 0 {
return 0, nil
}
if err := nw.createAndCopy(nw.ctx, nw.size); err != nil {
return 0, err
}
}
return nw.w.Write(p)
}
func (nw *namespacedWriter) Digest() digest.Digest {
if nw.w != nil {
return nw.w.Digest()
}
return nw.expected
}
func (nw *namespacedWriter) Truncate(size int64) error {
if nw.w != nil {
return nw.w.Truncate(size)
}
return nw.createAndCopy(nw.ctx, size)
}
func (nw *namespacedWriter) createAndCopy(ctx context.Context, size int64) error {
w, err := nw.provider.Writer(ctx, nw.bref, nw.size, "")
if err != nil {
return err
}
if size > 0 {
ra, err := nw.provider.ReaderAt(ctx, nw.expected)
if err != nil {
return err
}
defer ra.Close()
if err := content.CopyReaderAt(w, ra, size); err != nil {
nw.w.Close()
nw.w = nil
return err
}
}
nw.w = w
return nil
}
func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
@@ -418,9 +517,9 @@ func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected dig
defer nw.l.RUnlock()
return update(ctx, nw.db, func(tx *bolt.Tx) error {
bkt := getIngestBucket(tx, nw.namespace)
bkt := getIngestsBucket(tx, nw.namespace)
if bkt != nil {
if err := bkt.Delete([]byte(nw.ref)); err != nil {
if err := bkt.DeleteBucket([]byte(nw.ref)); err != nil && err != bolt.ErrBucketNotFound {
return err
}
}
@@ -443,24 +542,38 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64,
return "", err
}
status, err := nw.Writer.Status()
if err != nil {
return "", err
}
if size != 0 && size != status.Offset {
return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, status.Offset, size)
}
size = status.Offset
actual := nw.Writer.Digest()
if err := nw.Writer.Commit(ctx, size, expected); err != nil {
if !errdefs.IsAlreadyExists(err) {
return "", err
var actual digest.Digest
if nw.w == nil {
if size != 0 && size != nw.size {
return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, nw.size, size)
}
if expected != "" && expected != nw.expected {
return "", errors.Errorf("%q unexpected digest", nw.ref)
}
size = nw.size
actual = nw.expected
if getBlobBucket(tx, nw.namespace, actual) != nil {
return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual)
}
} else {
status, err := nw.w.Status()
if err != nil {
return "", err
}
if size != 0 && size != status.Offset {
return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, status.Offset, size)
}
size = status.Offset
actual = nw.w.Digest()
if err := nw.w.Commit(ctx, size, expected); err != nil {
if !errdefs.IsAlreadyExists(err) {
return "", err
}
if getBlobBucket(tx, nw.namespace, actual) != nil {
return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual)
}
}
}
bkt, err := createBlobBucket(tx, nw.namespace, actual)
@@ -484,12 +597,20 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64,
return actual, bkt.Put(bucketKeySize, sizeEncoded)
}
func (nw *namespacedWriter) Status() (content.Status, error) {
st, err := nw.Writer.Status()
func (nw *namespacedWriter) Status() (st content.Status, err error) {
if nw.w != nil {
st, err = nw.w.Status()
} else {
st.Offset = nw.size
st.Total = nw.size
st.StartedAt = nw.started
st.UpdatedAt = nw.started
st.Expected = nw.expected
}
if err == nil {
st.Ref = nw.ref
}
return st, err
return
}
func (cs *contentStore) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) {
@@ -590,13 +711,30 @@ func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, er
continue
}
bbkt := cbkt.Bucket(bucketKeyObjectBlob)
if err := bbkt.ForEach(func(ck, cv []byte) error {
if cv == nil {
seen[string(ck)] = struct{}{}
if bbkt != nil {
if err := bbkt.ForEach(func(ck, cv []byte) error {
if cv == nil {
seen[string(ck)] = struct{}{}
}
return nil
}); err != nil {
return err
}
}
ibkt := cbkt.Bucket(bucketKeyObjectIngests)
if ibkt != nil {
if err := ibkt.ForEach(func(ref, v []byte) error {
if v == nil {
expected := ibkt.Bucket(ref).Get(bucketKeyExpected)
if len(expected) > 0 {
seen[string(expected)] = struct{}{}
}
}
return nil
}); err != nil {
return err
}
return nil
}); err != nil {
return err
}
}

View File

@@ -19,7 +19,9 @@ package metadata
import (
"bytes"
"context"
"fmt"
"path/filepath"
"sync/atomic"
"testing"
"github.com/boltdb/bolt"
@@ -45,6 +47,16 @@ func createContentStore(ctx context.Context, root string) (context.Context, cont
return nil, nil, nil, err
}
var (
count uint64
name = testsuite.Name(ctx)
)
wrap := func(ctx context.Context) (context.Context, func() error, error) {
n := atomic.AddUint64(&count, 1)
return namespaces.WithNamespace(ctx, fmt.Sprintf("%s-n%d", name, n)), func() error { return nil }, nil
}
ctx = testsuite.SetContextWrapper(ctx, wrap)
return ctx, NewDB(db, cs, nil).ContentStore(), func() error {
return db.Close()
}, nil

View File

@@ -43,7 +43,7 @@ const (
// dbVersion represents updates to the schema
// version which are additions and compatible with
// prior version of the same schema.
dbVersion = 1
dbVersion = 2
)
// DB represents a metadata database backed by a bolt

View File

@@ -102,6 +102,27 @@ func TestInit(t *testing.T) {
}
func TestMigrations(t *testing.T) {
testRefs := []struct {
ref string
bref string
}{
{
ref: "k1",
bref: "bk1",
},
{
ref: strings.Repeat("longerkey", 30), // 270 characters
bref: "short",
},
{
ref: "short",
bref: strings.Repeat("longerkey", 30), // 270 characters
},
{
ref: "emptykey",
bref: "",
},
}
migrationTests := []struct {
name string
init func(*bolt.Tx) error
@@ -199,6 +220,48 @@ func TestMigrations(t *testing.T) {
}
}
return nil
},
},
{
name: "IngestUpdate",
init: func(tx *bolt.Tx) error {
bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte("testing"), bucketKeyObjectContent, deprecatedBucketKeyObjectIngest)
if err != nil {
return err
}
for _, s := range testRefs {
if err := bkt.Put([]byte(s.ref), []byte(s.bref)); err != nil {
return err
}
}
return nil
},
check: func(tx *bolt.Tx) error {
bkt := getIngestsBucket(tx, "testing")
if bkt == nil {
return errors.Wrap(errdefs.ErrNotFound, "ingests bucket not found")
}
for _, s := range testRefs {
sbkt := bkt.Bucket([]byte(s.ref))
if sbkt == nil {
return errors.Wrap(errdefs.ErrNotFound, "ref does not exist")
}
bref := string(sbkt.Get(bucketKeyRef))
if bref != s.bref {
return errors.Errorf("unexpected reference key %q, expected %q", bref, s.bref)
}
}
dbkt := getBucket(tx, bucketKeyVersion, []byte("testing"), bucketKeyObjectContent, deprecatedBucketKeyObjectIngest)
if dbkt != nil {
return errors.New("deprecated ingest bucket still exists")
}
return nil
},
},

View File

@@ -40,6 +40,11 @@ var migrations = []migration{
version: 1,
migrate: addChildLinks,
},
{
schema: "v1",
version: 2,
migrate: migrateIngests,
},
}
// addChildLinks Adds children key to the snapshotters to enforce snapshot
@@ -99,3 +104,53 @@ func addChildLinks(tx *bolt.Tx) error {
return nil
}
// migrateIngests moves ingests from the key/value ingest bucket
// to a structured ingest bucket for storing additional state about
// an ingest.
func migrateIngests(tx *bolt.Tx) error {
v1bkt := tx.Bucket(bucketKeyVersion)
if v1bkt == nil {
return nil
}
// iterate through each namespace
v1c := v1bkt.Cursor()
for k, v := v1c.First(); k != nil; k, v = v1c.Next() {
if v != nil {
continue
}
bkt := v1bkt.Bucket(k).Bucket(bucketKeyObjectContent)
if bkt == nil {
continue
}
dbkt := bkt.Bucket(deprecatedBucketKeyObjectIngest)
if dbkt == nil {
continue
}
// Create new ingests bucket
nbkt, err := bkt.CreateBucketIfNotExists(bucketKeyObjectIngests)
if err != nil {
return err
}
if err := dbkt.ForEach(func(ref, bref []byte) error {
ibkt, err := nbkt.CreateBucketIfNotExists(ref)
if err != nil {
return err
}
return ibkt.Put(bucketKeyRef, bref)
}); err != nil {
return err
}
if err := bkt.DeleteBucket(deprecatedBucketKeyObjectIngest); err != nil {
return err
}
}
return nil
}