Shared content across namespaces

Update content ingests to use content from another namespace.
Ingests must be committed to make content available and the
client will see the sharing as an ingest which has already
been fully written to, but not completed.

Updated the database version to change the ingest record in
the database from a link key to an object with a link and
expected value. This expected value is used to indicate that
the content already exists and an underlying writer may
not yet exist.

Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
Derek McGowan 2018-02-02 15:26:35 -08:00
parent b3aeba7062
commit a1a67899f8
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
8 changed files with 485 additions and 63 deletions

View File

@ -62,7 +62,7 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i
}
// Copy copies data with the expected digest from the reader into the
// provided content store writer.
// provided content store writer. This copy commits the writer.
//
// This is useful when the digest and size are known beforehand. When
// the size or digest is unknown, these values may be empty.
@ -97,6 +97,22 @@ func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected dige
return nil
}
// CopyReaderAt copies to a writer from a given reader at for the given
// number of bytes. This copy does not commit the writer.
func CopyReaderAt(cw Writer, ra ReaderAt, n int64) error {
ws, err := cw.Status()
if err != nil {
return err
}
buf := bufPool.Get().(*[]byte)
defer bufPool.Put(buf)
_, err = io.CopyBuffer(cw, io.NewSectionReader(ra, ws.Offset, n), *buf)
return err
}
// seekReader attempts to seek the reader to the given offset, either by
// resolving `io.Seeker`, by detecting `io.ReaderAt`, or discarding
// up to the given offset.

View File

@ -31,6 +31,9 @@ func ContentSuite(t *testing.T, name string, storeFn func(ctx context.Context, r
t.Run("ResumeCopySeeker", makeTest(t, name, storeFn, checkResume(resumeCopySeeker)))
t.Run("ResumeCopyReaderAt", makeTest(t, name, storeFn, checkResume(resumeCopyReaderAt)))
t.Run("Labels", makeTest(t, name, storeFn, checkLabels))
t.Run("CrossNamespaceAppend", makeTest(t, name, storeFn, checkCrossNSAppend))
t.Run("CrossNamespaceShare", makeTest(t, name, storeFn, checkCrossNSShare))
}
// ContextWrapper is used to decorate new context used inside the test
@ -504,6 +507,124 @@ func resumeCopyReaderAt(ctx context.Context, w content.Writer, b []byte, _, size
return errors.Wrap(content.Copy(ctx, w, r, size, dgst), "copy failed")
}
func checkCrossNSShare(ctx context.Context, t *testing.T, cs content.Store) {
wrap, ok := ctx.Value(wrapperKey{}).(ContextWrapper)
if !ok {
t.Skip("multiple contexts not supported")
}
var size int64 = 1000
b, d := createContent(size)
ref := fmt.Sprintf("ref-%d", size)
t1 := time.Now()
if err := content.WriteBlob(ctx, cs, ref, bytes.NewReader(b), size, d); err != nil {
t.Fatal(err)
}
ctx2, done, err := wrap(context.Background())
if err != nil {
t.Fatal(err)
}
defer done()
w, err := cs.Writer(ctx2, ref, size, d)
if err != nil {
t.Fatal(err)
}
t2 := time.Now()
checkStatus(t, w, content.Status{
Ref: ref,
Offset: size,
Total: size,
}, d, t1, t2, t1, t2)
if err := w.Commit(ctx2, size, d); err != nil {
t.Fatal(err)
}
t3 := time.Now()
info := content.Info{
Digest: d,
Size: size,
}
if err := checkContent(ctx, cs, d, info, t1, t3, t1, t3); err != nil {
t.Fatal(err)
}
if err := checkContent(ctx2, cs, d, info, t1, t3, t1, t3); err != nil {
t.Fatal(err)
}
}
func checkCrossNSAppend(ctx context.Context, t *testing.T, cs content.Store) {
wrap, ok := ctx.Value(wrapperKey{}).(ContextWrapper)
if !ok {
t.Skip("multiple contexts not supported")
}
var size int64 = 1000
b, d := createContent(size)
ref := fmt.Sprintf("ref-%d", size)
t1 := time.Now()
if err := content.WriteBlob(ctx, cs, ref, bytes.NewReader(b), size, d); err != nil {
t.Fatal(err)
}
ctx2, done, err := wrap(context.Background())
if err != nil {
t.Fatal(err)
}
defer done()
extra := []byte("appended bytes")
size2 := size + int64(len(extra))
b2 := make([]byte, size2)
copy(b2[:size], b)
copy(b2[size:], extra)
d2 := digest.FromBytes(b2)
w, err := cs.Writer(ctx2, ref, size, d)
if err != nil {
t.Fatal(err)
}
t2 := time.Now()
checkStatus(t, w, content.Status{
Ref: ref,
Offset: size,
Total: size,
}, d, t1, t2, t1, t2)
if _, err := w.Write(extra); err != nil {
t.Fatal(err)
}
if err := w.Commit(ctx2, size2, d2); err != nil {
t.Fatal(err)
}
t3 := time.Now()
info := content.Info{
Digest: d,
Size: size,
}
if err := checkContent(ctx, cs, d, info, t1, t3, t1, t3); err != nil {
t.Fatal(err)
}
info2 := content.Info{
Digest: d2,
Size: size2,
}
if err := checkContent(ctx2, cs, d2, info2, t1, t3, t1, t3); err != nil {
t.Fatal(err)
}
}
func checkStatus(t *testing.T, w content.Writer, expected content.Status, d digest.Digest, preStart, postStart, preUpdate, postUpdate time.Time) {
t.Helper()
st, err := w.Status()
@ -585,6 +706,27 @@ func checkInfo(ctx context.Context, cs content.Store, d digest.Digest, expected
return nil
}
func checkContent(ctx context.Context, cs content.Store, d digest.Digest, expected content.Info, c1, c2, u1, u2 time.Time) error {
if err := checkInfo(ctx, cs, d, expected, c1, c2, u1, u2); err != nil {
return err
}
b, err := content.ReadBlob(ctx, cs, d)
if err != nil {
return errors.Wrap(err, "failed to read blob")
}
if int64(len(b)) != expected.Size {
return errors.Errorf("wrong blob size %d, expected %d", len(b), expected.Size)
}
actual := digest.FromBytes(b)
if actual != d {
return errors.Errorf("wrong digest %s, expected %s", actual, d)
}
return nil
}
var contentSeed int64

View File

@ -36,7 +36,7 @@ var (
bucketKeyObjectSnapshots = []byte("snapshots") // stores snapshot references
bucketKeyObjectContent = []byte("content") // stores content references
bucketKeyObjectBlob = []byte("blob") // stores content links
bucketKeyObjectIngest = []byte("ingest") // stores ingest links
bucketKeyObjectIngests = []byte("ingests") // stores ingest objects
bucketKeyObjectLeases = []byte("leases") // stores leases
bucketKeyDigest = []byte("digest")
@ -54,6 +54,10 @@ var (
bucketKeyTarget = []byte("target")
bucketKeyExtensions = []byte("extensions")
bucketKeyCreatedAt = []byte("createdat")
bucketKeyExpected = []byte("expected")
bucketKeyRef = []byte("ref")
deprecatedBucketKeyObjectIngest = []byte("ingest") // stores ingest links, deprecated in v1.2
)
func getBucket(tx *bolt.Tx, keys ...[]byte) *bolt.Bucket {
@ -162,14 +166,18 @@ func getBlobBucket(tx *bolt.Tx, namespace string, dgst digest.Digest) *bolt.Buck
return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectBlob, []byte(dgst.String()))
}
func createIngestBucket(tx *bolt.Tx, namespace string) (*bolt.Bucket, error) {
bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngest)
func getIngestsBucket(tx *bolt.Tx, namespace string) *bolt.Bucket {
return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngests)
}
func createIngestBucket(tx *bolt.Tx, namespace, ref string) (*bolt.Bucket, error) {
bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngests, []byte(ref))
if err != nil {
return nil, err
}
return bkt, nil
}
func getIngestBucket(tx *bolt.Tx, namespace string) *bolt.Bucket {
return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngest)
func getIngestBucket(tx *bolt.Tx, namespace, ref string) *bolt.Bucket {
return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngests, []byte(ref))
}

View File

@ -210,14 +210,16 @@ func (cs *contentStore) ListStatuses(ctx context.Context, fs ...string) ([]conte
brefs := map[string]string{}
if err := view(ctx, cs.db, func(tx *bolt.Tx) error {
bkt := getIngestBucket(tx, ns)
bkt := getIngestsBucket(tx, ns)
if bkt == nil {
return nil
}
return bkt.ForEach(func(k, v []byte) error {
// TODO(dmcgowan): match name and potentially labels here
brefs[string(k)] = string(v)
if v == nil {
// TODO(dmcgowan): match name and potentially labels here
brefs[string(k)] = string(bkt.Bucket(k).Get(bucketKeyRef))
}
return nil
})
}); err != nil {
@ -245,11 +247,11 @@ func (cs *contentStore) ListStatuses(ctx context.Context, fs ...string) ([]conte
}
func getRef(tx *bolt.Tx, ns, ref string) string {
bkt := getIngestBucket(tx, ns)
bkt := getIngestBucket(tx, ns, ref)
if bkt == nil {
return ""
}
v := bkt.Get([]byte(ref))
v := bkt.Get(bucketKeyRef)
if len(v) == 0 {
return ""
}
@ -292,19 +294,29 @@ func (cs *contentStore) Abort(ctx context.Context, ref string) error {
defer cs.l.RUnlock()
return update(ctx, cs.db, func(tx *bolt.Tx) error {
bkt := getIngestBucket(tx, ns)
ibkt := getIngestsBucket(tx, ns)
if ibkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref)
}
bkt := ibkt.Bucket([]byte(ref))
if bkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref)
}
bref := string(bkt.Get([]byte(ref)))
bref := string(bkt.Get(bucketKeyRef))
if bref == "" {
return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref)
}
if err := bkt.Delete([]byte(ref)); err != nil {
expected := string(bkt.Get(bucketKeyExpected))
if err := ibkt.DeleteBucket([]byte(ref)); err != nil {
return err
}
return cs.Store.Abort(ctx, bref)
// if not shared content, delete active ingest on backend
if expected == "" {
return cs.Store.Abort(ctx, bref)
}
return nil
})
}
@ -321,8 +333,10 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
var (
w content.Writer
exists bool
bref string
)
if err := update(ctx, cs.db, func(tx *bolt.Tx) error {
var shared bool
if expected != "" {
cbkt := getBlobBucket(tx, ns, expected)
if cbkt != nil {
@ -336,18 +350,24 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
exists = true
return nil
}
if st, err := cs.Store.Info(ctx, expected); err == nil {
// Ensure the expected size is the same, it is likely
// an error if the size is mismatched but the caller
// must resolve this on commit
if size == 0 || size == st.Size {
shared = true
size = st.Size
}
}
}
bkt, err := createIngestBucket(tx, ns)
bkt, err := createIngestBucket(tx, ns, ref)
if err != nil {
return err
}
var (
bref string
brefb = bkt.Get([]byte(ref))
)
brefb := bkt.Get(bucketKeyRef)
if brefb == nil {
sid, err := bkt.NextSequence()
if err != nil {
@ -355,21 +375,24 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
}
bref = createKey(sid, ns, ref)
if err := bkt.Put([]byte(ref), []byte(bref)); err != nil {
if err := bkt.Put(bucketKeyRef, []byte(bref)); err != nil {
return err
}
} else {
bref = string(brefb)
}
// Do not use the passed in expected value here since it was
// already checked against the user metadata. If the content
// store has the content, it must still be written before
// linked into the given namespace. It is possible in the future
// to allow content which exists in content store but not
// namespace to be linked here and returned an exist error, but
// this would require more configuration to make secure.
w, err = cs.Store.Writer(ctx, bref, size, "")
if shared {
if err := bkt.Put(bucketKeyExpected, []byte(expected)); err != nil {
return err
}
} else {
// Do not use the passed in expected value here since it was
// already checked against the user metadata. The content must
// be committed in the namespace before it will be seen as
// available in the current namespace.
w, err = cs.Store.Writer(ctx, bref, size, "")
}
return err
}); err != nil {
return nil, err
@ -378,23 +401,99 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected)
}
// TODO: keep the expected in the writer to use on commit
// when no expected is provided there.
return &namespacedWriter{
Writer: w,
ctx: ctx,
ref: ref,
namespace: ns,
db: cs.db,
provider: cs.Store,
l: &cs.l,
w: w,
bref: bref,
started: time.Now(),
expected: expected,
size: size,
}, nil
}
type namespacedWriter struct {
content.Writer
ctx context.Context
ref string
namespace string
db transactor
l *sync.RWMutex
provider interface {
content.Provider
content.Ingester
}
l *sync.RWMutex
w content.Writer
bref string
started time.Time
expected digest.Digest
size int64
}
func (nw *namespacedWriter) Close() error {
if nw.w != nil {
return nw.w.Close()
}
return nil
}
func (nw *namespacedWriter) Write(p []byte) (int, error) {
// if no writer, first copy and unshare before performing write
if nw.w == nil {
if len(p) == 0 {
return 0, nil
}
if err := nw.createAndCopy(nw.ctx, nw.size); err != nil {
return 0, err
}
}
return nw.w.Write(p)
}
func (nw *namespacedWriter) Digest() digest.Digest {
if nw.w != nil {
return nw.w.Digest()
}
return nw.expected
}
func (nw *namespacedWriter) Truncate(size int64) error {
if nw.w != nil {
return nw.w.Truncate(size)
}
return nw.createAndCopy(nw.ctx, size)
}
func (nw *namespacedWriter) createAndCopy(ctx context.Context, size int64) error {
w, err := nw.provider.Writer(ctx, nw.bref, nw.size, "")
if err != nil {
return err
}
if size > 0 {
ra, err := nw.provider.ReaderAt(ctx, nw.expected)
if err != nil {
return err
}
defer ra.Close()
if err := content.CopyReaderAt(w, ra, size); err != nil {
nw.w.Close()
nw.w = nil
return err
}
}
nw.w = w
return nil
}
func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
@ -402,9 +501,9 @@ func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected dig
defer nw.l.RUnlock()
return update(ctx, nw.db, func(tx *bolt.Tx) error {
bkt := getIngestBucket(tx, nw.namespace)
bkt := getIngestsBucket(tx, nw.namespace)
if bkt != nil {
if err := bkt.Delete([]byte(nw.ref)); err != nil {
if err := bkt.DeleteBucket([]byte(nw.ref)); err != nil && err != bolt.ErrBucketNotFound {
return err
}
}
@ -427,24 +526,38 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64,
return "", err
}
status, err := nw.Writer.Status()
if err != nil {
return "", err
}
if size != 0 && size != status.Offset {
return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, status.Offset, size)
}
size = status.Offset
actual := nw.Writer.Digest()
if err := nw.Writer.Commit(ctx, size, expected); err != nil {
if !errdefs.IsAlreadyExists(err) {
return "", err
var actual digest.Digest
if nw.w == nil {
if size != 0 && size != nw.size {
return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, nw.size, size)
}
if expected != "" && expected != nw.expected {
return "", errors.Errorf("%q unexpected digest", nw.ref)
}
size = nw.size
actual = nw.expected
if getBlobBucket(tx, nw.namespace, actual) != nil {
return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual)
}
} else {
status, err := nw.w.Status()
if err != nil {
return "", err
}
if size != 0 && size != status.Offset {
return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, status.Offset, size)
}
size = status.Offset
actual = nw.w.Digest()
if err := nw.w.Commit(ctx, size, expected); err != nil {
if !errdefs.IsAlreadyExists(err) {
return "", err
}
if getBlobBucket(tx, nw.namespace, actual) != nil {
return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual)
}
}
}
bkt, err := createBlobBucket(tx, nw.namespace, actual)
@ -468,12 +581,20 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64,
return actual, bkt.Put(bucketKeySize, sizeEncoded)
}
func (nw *namespacedWriter) Status() (content.Status, error) {
st, err := nw.Writer.Status()
func (nw *namespacedWriter) Status() (st content.Status, err error) {
if nw.w != nil {
st, err = nw.w.Status()
} else {
st.Offset = nw.size
st.Total = nw.size
st.StartedAt = nw.started
st.UpdatedAt = nw.started
st.Expected = nw.expected
}
if err == nil {
st.Ref = nw.ref
}
return st, err
return
}
func (cs *contentStore) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) {
@ -574,13 +695,30 @@ func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, er
continue
}
bbkt := cbkt.Bucket(bucketKeyObjectBlob)
if err := bbkt.ForEach(func(ck, cv []byte) error {
if cv == nil {
seen[string(ck)] = struct{}{}
if bbkt != nil {
if err := bbkt.ForEach(func(ck, cv []byte) error {
if cv == nil {
seen[string(ck)] = struct{}{}
}
return nil
}); err != nil {
return err
}
}
ibkt := cbkt.Bucket(bucketKeyObjectIngests)
if ibkt != nil {
if err := ibkt.ForEach(func(ref, v []byte) error {
if v == nil {
expected := ibkt.Bucket(ref).Get(bucketKeyExpected)
if len(expected) > 0 {
seen[string(expected)] = struct{}{}
}
}
return nil
}); err != nil {
return err
}
return nil
}); err != nil {
return err
}
}

View File

@ -27,7 +27,7 @@ const (
// dbVersion represents updates to the schema
// version which are additions and compatible with
// prior version of the same schema.
dbVersion = 1
dbVersion = 2
)
// DB represents a metadata database backed by a bolt

View File

@ -86,6 +86,27 @@ func TestInit(t *testing.T) {
}
func TestMigrations(t *testing.T) {
testRefs := []struct {
ref string
bref string
}{
{
ref: "k1",
bref: "bk1",
},
{
ref: strings.Repeat("longerkey", 30), // 270 characters
bref: "short",
},
{
ref: "short",
bref: strings.Repeat("longerkey", 30), // 270 characters
},
{
ref: "emptykey",
bref: "",
},
}
migrationTests := []struct {
name string
init func(*bolt.Tx) error
@ -183,6 +204,48 @@ func TestMigrations(t *testing.T) {
}
}
return nil
},
},
{
name: "IngestUpdate",
init: func(tx *bolt.Tx) error {
bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte("testing"), bucketKeyObjectContent, deprecatedBucketKeyObjectIngest)
if err != nil {
return err
}
for _, s := range testRefs {
if err := bkt.Put([]byte(s.ref), []byte(s.bref)); err != nil {
return err
}
}
return nil
},
check: func(tx *bolt.Tx) error {
bkt := getIngestsBucket(tx, "testing")
if bkt == nil {
return errors.Wrap(errdefs.ErrNotFound, "ingests bucket not found")
}
for _, s := range testRefs {
sbkt := bkt.Bucket([]byte(s.ref))
if sbkt == nil {
return errors.Wrap(errdefs.ErrNotFound, "ref does not exist")
}
bref := string(sbkt.Get(bucketKeyRef))
if bref != s.bref {
return errors.Errorf("unexpected reference key %q, expected %q", bref, s.bref)
}
}
dbkt := getBucket(tx, bucketKeyVersion, []byte("testing"), bucketKeyObjectContent, deprecatedBucketKeyObjectIngest)
if dbkt != nil {
return errors.New("deprecated ingest bucket still exists")
}
return nil
},
},

View File

@ -24,6 +24,11 @@ var migrations = []migration{
version: 1,
migrate: addChildLinks,
},
{
schema: "v1",
version: 2,
migrate: migrateIngests,
},
}
// addChildLinks Adds children key to the snapshotters to enforce snapshot
@ -83,3 +88,53 @@ func addChildLinks(tx *bolt.Tx) error {
return nil
}
// migrateIngests moves ingests from the key/value ingest bucket
// to a structured ingest bucket for storing additional state about
// an ingest.
func migrateIngests(tx *bolt.Tx) error {
v1bkt := tx.Bucket(bucketKeyVersion)
if v1bkt == nil {
return nil
}
// iterate through each namespace
v1c := v1bkt.Cursor()
for k, v := v1c.First(); k != nil; k, v = v1c.Next() {
if v != nil {
continue
}
bkt := v1bkt.Bucket(k).Bucket(bucketKeyObjectContent)
if bkt == nil {
continue
}
dbkt := bkt.Bucket(deprecatedBucketKeyObjectIngest)
if dbkt == nil {
continue
}
// Create new ingests bucket
nbkt, err := bkt.CreateBucketIfNotExists(bucketKeyObjectIngests)
if err != nil {
return err
}
if err := dbkt.ForEach(func(ref, bref []byte) error {
ibkt, err := nbkt.CreateBucketIfNotExists(ref)
if err != nil {
return err
}
return ibkt.Put(bucketKeyRef, bref)
}); err != nil {
return err
}
if err := bkt.DeleteBucket(deprecatedBucketKeyObjectIngest); err != nil {
return err
}
}
return nil
}

View File

@ -356,7 +356,7 @@ func (s *service) Write(session api.Content_WriteServer) (err error) {
// users use the same writer style for each with a minimum of overhead.
if req.Expected != "" {
if expected != "" && expected != req.Expected {
return status.Errorf(codes.InvalidArgument, "inconsistent digest provided: %v != %v", req.Expected, expected)
log.G(ctx).Debugf("commit digest differs from writer digest: %v != %v", req.Expected, expected)
}
expected = req.Expected
@ -373,7 +373,7 @@ func (s *service) Write(session api.Content_WriteServer) (err error) {
// Update the expected total. Typically, this could be seen at
// negotiation time or on a commit message.
if total > 0 && req.Total != total {
return status.Errorf(codes.InvalidArgument, "inconsistent total provided: %v != %v", req.Total, total)
log.G(ctx).Debugf("commit size differs from writer size: %v != %v", req.Total, total)
}
total = req.Total
}