diff --git a/content/helpers.go b/content/helpers.go index c125e2a2b..974ff9f38 100644 --- a/content/helpers.go +++ b/content/helpers.go @@ -62,7 +62,7 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i } // Copy copies data with the expected digest from the reader into the -// provided content store writer. +// provided content store writer. This copy commits the writer. // // This is useful when the digest and size are known beforehand. When // the size or digest is unknown, these values may be empty. @@ -97,6 +97,22 @@ func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected dige return nil } +// CopyReaderAt copies to a writer from a given reader at for the given +// number of bytes. This copy does not commit the writer. +func CopyReaderAt(cw Writer, ra ReaderAt, n int64) error { + ws, err := cw.Status() + if err != nil { + return err + } + + buf := bufPool.Get().(*[]byte) + defer bufPool.Put(buf) + + _, err = io.CopyBuffer(cw, io.NewSectionReader(ra, ws.Offset, n), *buf) + + return err +} + // seekReader attempts to seek the reader to the given offset, either by // resolving `io.Seeker`, by detecting `io.ReaderAt`, or discarding // up to the given offset. diff --git a/content/testsuite/testsuite.go b/content/testsuite/testsuite.go index a1ad98a3a..cf57309a1 100644 --- a/content/testsuite/testsuite.go +++ b/content/testsuite/testsuite.go @@ -31,6 +31,9 @@ func ContentSuite(t *testing.T, name string, storeFn func(ctx context.Context, r t.Run("ResumeCopySeeker", makeTest(t, name, storeFn, checkResume(resumeCopySeeker))) t.Run("ResumeCopyReaderAt", makeTest(t, name, storeFn, checkResume(resumeCopyReaderAt))) t.Run("Labels", makeTest(t, name, storeFn, checkLabels)) + + t.Run("CrossNamespaceAppend", makeTest(t, name, storeFn, checkCrossNSAppend)) + t.Run("CrossNamespaceShare", makeTest(t, name, storeFn, checkCrossNSShare)) } // ContextWrapper is used to decorate new context used inside the test @@ -504,6 +507,124 @@ func resumeCopyReaderAt(ctx context.Context, w content.Writer, b []byte, _, size return errors.Wrap(content.Copy(ctx, w, r, size, dgst), "copy failed") } +func checkCrossNSShare(ctx context.Context, t *testing.T, cs content.Store) { + wrap, ok := ctx.Value(wrapperKey{}).(ContextWrapper) + if !ok { + t.Skip("multiple contexts not supported") + } + + var size int64 = 1000 + b, d := createContent(size) + ref := fmt.Sprintf("ref-%d", size) + t1 := time.Now() + + if err := content.WriteBlob(ctx, cs, ref, bytes.NewReader(b), size, d); err != nil { + t.Fatal(err) + } + + ctx2, done, err := wrap(context.Background()) + if err != nil { + t.Fatal(err) + } + defer done() + + w, err := cs.Writer(ctx2, ref, size, d) + if err != nil { + t.Fatal(err) + } + t2 := time.Now() + + checkStatus(t, w, content.Status{ + Ref: ref, + Offset: size, + Total: size, + }, d, t1, t2, t1, t2) + + if err := w.Commit(ctx2, size, d); err != nil { + t.Fatal(err) + } + t3 := time.Now() + + info := content.Info{ + Digest: d, + Size: size, + } + if err := checkContent(ctx, cs, d, info, t1, t3, t1, t3); err != nil { + t.Fatal(err) + } + + if err := checkContent(ctx2, cs, d, info, t1, t3, t1, t3); err != nil { + t.Fatal(err) + } +} + +func checkCrossNSAppend(ctx context.Context, t *testing.T, cs content.Store) { + wrap, ok := ctx.Value(wrapperKey{}).(ContextWrapper) + if !ok { + t.Skip("multiple contexts not supported") + } + + var size int64 = 1000 + b, d := createContent(size) + ref := fmt.Sprintf("ref-%d", size) + t1 := time.Now() + + if err := content.WriteBlob(ctx, cs, ref, bytes.NewReader(b), size, d); err != nil { + t.Fatal(err) + } + + ctx2, done, err := wrap(context.Background()) + if err != nil { + t.Fatal(err) + } + defer done() + + extra := []byte("appended bytes") + size2 := size + int64(len(extra)) + b2 := make([]byte, size2) + copy(b2[:size], b) + copy(b2[size:], extra) + d2 := digest.FromBytes(b2) + + w, err := cs.Writer(ctx2, ref, size, d) + if err != nil { + t.Fatal(err) + } + t2 := time.Now() + + checkStatus(t, w, content.Status{ + Ref: ref, + Offset: size, + Total: size, + }, d, t1, t2, t1, t2) + + if _, err := w.Write(extra); err != nil { + t.Fatal(err) + } + + if err := w.Commit(ctx2, size2, d2); err != nil { + t.Fatal(err) + } + t3 := time.Now() + + info := content.Info{ + Digest: d, + Size: size, + } + if err := checkContent(ctx, cs, d, info, t1, t3, t1, t3); err != nil { + t.Fatal(err) + } + + info2 := content.Info{ + Digest: d2, + Size: size2, + } + if err := checkContent(ctx2, cs, d2, info2, t1, t3, t1, t3); err != nil { + t.Fatal(err) + } + +} + func checkStatus(t *testing.T, w content.Writer, expected content.Status, d digest.Digest, preStart, postStart, preUpdate, postUpdate time.Time) { t.Helper() st, err := w.Status() @@ -585,6 +706,27 @@ func checkInfo(ctx context.Context, cs content.Store, d digest.Digest, expected return nil } +func checkContent(ctx context.Context, cs content.Store, d digest.Digest, expected content.Info, c1, c2, u1, u2 time.Time) error { + if err := checkInfo(ctx, cs, d, expected, c1, c2, u1, u2); err != nil { + return err + } + + b, err := content.ReadBlob(ctx, cs, d) + if err != nil { + return errors.Wrap(err, "failed to read blob") + } + + if int64(len(b)) != expected.Size { + return errors.Errorf("wrong blob size %d, expected %d", len(b), expected.Size) + } + + actual := digest.FromBytes(b) + if actual != d { + return errors.Errorf("wrong digest %s, expected %s", actual, d) + } + + return nil +} var contentSeed int64 diff --git a/metadata/buckets.go b/metadata/buckets.go index 45309724f..80a8010e1 100644 --- a/metadata/buckets.go +++ b/metadata/buckets.go @@ -36,7 +36,7 @@ var ( bucketKeyObjectSnapshots = []byte("snapshots") // stores snapshot references bucketKeyObjectContent = []byte("content") // stores content references bucketKeyObjectBlob = []byte("blob") // stores content links - bucketKeyObjectIngest = []byte("ingest") // stores ingest links + bucketKeyObjectIngests = []byte("ingests") // stores ingest objects bucketKeyObjectLeases = []byte("leases") // stores leases bucketKeyDigest = []byte("digest") @@ -54,6 +54,10 @@ var ( bucketKeyTarget = []byte("target") bucketKeyExtensions = []byte("extensions") bucketKeyCreatedAt = []byte("createdat") + bucketKeyExpected = []byte("expected") + bucketKeyRef = []byte("ref") + + deprecatedBucketKeyObjectIngest = []byte("ingest") // stores ingest links, deprecated in v1.2 ) func getBucket(tx *bolt.Tx, keys ...[]byte) *bolt.Bucket { @@ -162,14 +166,18 @@ func getBlobBucket(tx *bolt.Tx, namespace string, dgst digest.Digest) *bolt.Buck return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectBlob, []byte(dgst.String())) } -func createIngestBucket(tx *bolt.Tx, namespace string) (*bolt.Bucket, error) { - bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngest) +func getIngestsBucket(tx *bolt.Tx, namespace string) *bolt.Bucket { + return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngests) +} + +func createIngestBucket(tx *bolt.Tx, namespace, ref string) (*bolt.Bucket, error) { + bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngests, []byte(ref)) if err != nil { return nil, err } return bkt, nil } -func getIngestBucket(tx *bolt.Tx, namespace string) *bolt.Bucket { - return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngest) +func getIngestBucket(tx *bolt.Tx, namespace, ref string) *bolt.Bucket { + return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngests, []byte(ref)) } diff --git a/metadata/content.go b/metadata/content.go index 57eabf307..8b88a6c81 100644 --- a/metadata/content.go +++ b/metadata/content.go @@ -210,14 +210,16 @@ func (cs *contentStore) ListStatuses(ctx context.Context, fs ...string) ([]conte brefs := map[string]string{} if err := view(ctx, cs.db, func(tx *bolt.Tx) error { - bkt := getIngestBucket(tx, ns) + bkt := getIngestsBucket(tx, ns) if bkt == nil { return nil } return bkt.ForEach(func(k, v []byte) error { - // TODO(dmcgowan): match name and potentially labels here - brefs[string(k)] = string(v) + if v == nil { + // TODO(dmcgowan): match name and potentially labels here + brefs[string(k)] = string(bkt.Bucket(k).Get(bucketKeyRef)) + } return nil }) }); err != nil { @@ -245,11 +247,11 @@ func (cs *contentStore) ListStatuses(ctx context.Context, fs ...string) ([]conte } func getRef(tx *bolt.Tx, ns, ref string) string { - bkt := getIngestBucket(tx, ns) + bkt := getIngestBucket(tx, ns, ref) if bkt == nil { return "" } - v := bkt.Get([]byte(ref)) + v := bkt.Get(bucketKeyRef) if len(v) == 0 { return "" } @@ -292,19 +294,29 @@ func (cs *contentStore) Abort(ctx context.Context, ref string) error { defer cs.l.RUnlock() return update(ctx, cs.db, func(tx *bolt.Tx) error { - bkt := getIngestBucket(tx, ns) + ibkt := getIngestsBucket(tx, ns) + if ibkt == nil { + return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref) + } + bkt := ibkt.Bucket([]byte(ref)) if bkt == nil { return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref) } - bref := string(bkt.Get([]byte(ref))) + bref := string(bkt.Get(bucketKeyRef)) if bref == "" { return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref) } - if err := bkt.Delete([]byte(ref)); err != nil { + expected := string(bkt.Get(bucketKeyExpected)) + if err := ibkt.DeleteBucket([]byte(ref)); err != nil { return err } - return cs.Store.Abort(ctx, bref) + // if not shared content, delete active ingest on backend + if expected == "" { + return cs.Store.Abort(ctx, bref) + } + + return nil }) } @@ -321,8 +333,10 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe var ( w content.Writer exists bool + bref string ) if err := update(ctx, cs.db, func(tx *bolt.Tx) error { + var shared bool if expected != "" { cbkt := getBlobBucket(tx, ns, expected) if cbkt != nil { @@ -336,18 +350,24 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe exists = true return nil } + + if st, err := cs.Store.Info(ctx, expected); err == nil { + // Ensure the expected size is the same, it is likely + // an error if the size is mismatched but the caller + // must resolve this on commit + if size == 0 || size == st.Size { + shared = true + size = st.Size + } + } } - bkt, err := createIngestBucket(tx, ns) + bkt, err := createIngestBucket(tx, ns, ref) if err != nil { return err } - var ( - bref string - brefb = bkt.Get([]byte(ref)) - ) - + brefb := bkt.Get(bucketKeyRef) if brefb == nil { sid, err := bkt.NextSequence() if err != nil { @@ -355,21 +375,24 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe } bref = createKey(sid, ns, ref) - if err := bkt.Put([]byte(ref), []byte(bref)); err != nil { + if err := bkt.Put(bucketKeyRef, []byte(bref)); err != nil { return err } } else { bref = string(brefb) } - // Do not use the passed in expected value here since it was - // already checked against the user metadata. If the content - // store has the content, it must still be written before - // linked into the given namespace. It is possible in the future - // to allow content which exists in content store but not - // namespace to be linked here and returned an exist error, but - // this would require more configuration to make secure. - w, err = cs.Store.Writer(ctx, bref, size, "") + if shared { + if err := bkt.Put(bucketKeyExpected, []byte(expected)); err != nil { + return err + } + } else { + // Do not use the passed in expected value here since it was + // already checked against the user metadata. The content must + // be committed in the namespace before it will be seen as + // available in the current namespace. + w, err = cs.Store.Writer(ctx, bref, size, "") + } return err }); err != nil { return nil, err @@ -378,23 +401,99 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected) } - // TODO: keep the expected in the writer to use on commit - // when no expected is provided there. return &namespacedWriter{ - Writer: w, + ctx: ctx, ref: ref, namespace: ns, db: cs.db, + provider: cs.Store, l: &cs.l, + w: w, + bref: bref, + started: time.Now(), + expected: expected, + size: size, }, nil } type namespacedWriter struct { - content.Writer + ctx context.Context ref string namespace string db transactor - l *sync.RWMutex + provider interface { + content.Provider + content.Ingester + } + l *sync.RWMutex + + w content.Writer + + bref string + started time.Time + expected digest.Digest + size int64 +} + +func (nw *namespacedWriter) Close() error { + if nw.w != nil { + return nw.w.Close() + } + return nil +} + +func (nw *namespacedWriter) Write(p []byte) (int, error) { + // if no writer, first copy and unshare before performing write + if nw.w == nil { + if len(p) == 0 { + return 0, nil + } + + if err := nw.createAndCopy(nw.ctx, nw.size); err != nil { + return 0, err + } + } + + return nw.w.Write(p) +} + +func (nw *namespacedWriter) Digest() digest.Digest { + if nw.w != nil { + return nw.w.Digest() + } + return nw.expected +} + +func (nw *namespacedWriter) Truncate(size int64) error { + if nw.w != nil { + return nw.w.Truncate(size) + } + + return nw.createAndCopy(nw.ctx, size) +} + +func (nw *namespacedWriter) createAndCopy(ctx context.Context, size int64) error { + w, err := nw.provider.Writer(ctx, nw.bref, nw.size, "") + if err != nil { + return err + } + + if size > 0 { + ra, err := nw.provider.ReaderAt(ctx, nw.expected) + if err != nil { + return err + } + defer ra.Close() + + if err := content.CopyReaderAt(w, ra, size); err != nil { + nw.w.Close() + nw.w = nil + return err + } + } + nw.w = w + + return nil } func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { @@ -402,9 +501,9 @@ func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected dig defer nw.l.RUnlock() return update(ctx, nw.db, func(tx *bolt.Tx) error { - bkt := getIngestBucket(tx, nw.namespace) + bkt := getIngestsBucket(tx, nw.namespace) if bkt != nil { - if err := bkt.Delete([]byte(nw.ref)); err != nil { + if err := bkt.DeleteBucket([]byte(nw.ref)); err != nil && err != bolt.ErrBucketNotFound { return err } } @@ -427,24 +526,38 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, return "", err } - status, err := nw.Writer.Status() - if err != nil { - return "", err - } - if size != 0 && size != status.Offset { - return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, status.Offset, size) - } - size = status.Offset - - actual := nw.Writer.Digest() - - if err := nw.Writer.Commit(ctx, size, expected); err != nil { - if !errdefs.IsAlreadyExists(err) { - return "", err + var actual digest.Digest + if nw.w == nil { + if size != 0 && size != nw.size { + return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, nw.size, size) } + if expected != "" && expected != nw.expected { + return "", errors.Errorf("%q unexpected digest", nw.ref) + } + size = nw.size + actual = nw.expected if getBlobBucket(tx, nw.namespace, actual) != nil { return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual) } + } else { + status, err := nw.w.Status() + if err != nil { + return "", err + } + if size != 0 && size != status.Offset { + return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, status.Offset, size) + } + size = status.Offset + actual = nw.w.Digest() + + if err := nw.w.Commit(ctx, size, expected); err != nil { + if !errdefs.IsAlreadyExists(err) { + return "", err + } + if getBlobBucket(tx, nw.namespace, actual) != nil { + return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual) + } + } } bkt, err := createBlobBucket(tx, nw.namespace, actual) @@ -468,12 +581,20 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, return actual, bkt.Put(bucketKeySize, sizeEncoded) } -func (nw *namespacedWriter) Status() (content.Status, error) { - st, err := nw.Writer.Status() +func (nw *namespacedWriter) Status() (st content.Status, err error) { + if nw.w != nil { + st, err = nw.w.Status() + } else { + st.Offset = nw.size + st.Total = nw.size + st.StartedAt = nw.started + st.UpdatedAt = nw.started + st.Expected = nw.expected + } if err == nil { st.Ref = nw.ref } - return st, err + return } func (cs *contentStore) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) { @@ -574,13 +695,30 @@ func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, er continue } bbkt := cbkt.Bucket(bucketKeyObjectBlob) - if err := bbkt.ForEach(func(ck, cv []byte) error { - if cv == nil { - seen[string(ck)] = struct{}{} + if bbkt != nil { + if err := bbkt.ForEach(func(ck, cv []byte) error { + if cv == nil { + seen[string(ck)] = struct{}{} + } + return nil + }); err != nil { + return err + } + } + + ibkt := cbkt.Bucket(bucketKeyObjectIngests) + if ibkt != nil { + if err := ibkt.ForEach(func(ref, v []byte) error { + if v == nil { + expected := ibkt.Bucket(ref).Get(bucketKeyExpected) + if len(expected) > 0 { + seen[string(expected)] = struct{}{} + } + } + return nil + }); err != nil { + return err } - return nil - }); err != nil { - return err } } diff --git a/metadata/db.go b/metadata/db.go index 5e830ad82..5601b919e 100644 --- a/metadata/db.go +++ b/metadata/db.go @@ -27,7 +27,7 @@ const ( // dbVersion represents updates to the schema // version which are additions and compatible with // prior version of the same schema. - dbVersion = 1 + dbVersion = 2 ) // DB represents a metadata database backed by a bolt diff --git a/metadata/db_test.go b/metadata/db_test.go index 54519436e..879139ff8 100644 --- a/metadata/db_test.go +++ b/metadata/db_test.go @@ -86,6 +86,27 @@ func TestInit(t *testing.T) { } func TestMigrations(t *testing.T) { + testRefs := []struct { + ref string + bref string + }{ + { + ref: "k1", + bref: "bk1", + }, + { + ref: strings.Repeat("longerkey", 30), // 270 characters + bref: "short", + }, + { + ref: "short", + bref: strings.Repeat("longerkey", 30), // 270 characters + }, + { + ref: "emptykey", + bref: "", + }, + } migrationTests := []struct { name string init func(*bolt.Tx) error @@ -183,6 +204,48 @@ func TestMigrations(t *testing.T) { } } + return nil + }, + }, + { + name: "IngestUpdate", + init: func(tx *bolt.Tx) error { + bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte("testing"), bucketKeyObjectContent, deprecatedBucketKeyObjectIngest) + if err != nil { + return err + } + + for _, s := range testRefs { + if err := bkt.Put([]byte(s.ref), []byte(s.bref)); err != nil { + return err + } + } + + return nil + }, + check: func(tx *bolt.Tx) error { + bkt := getIngestsBucket(tx, "testing") + if bkt == nil { + return errors.Wrap(errdefs.ErrNotFound, "ingests bucket not found") + } + + for _, s := range testRefs { + sbkt := bkt.Bucket([]byte(s.ref)) + if sbkt == nil { + return errors.Wrap(errdefs.ErrNotFound, "ref does not exist") + } + + bref := string(sbkt.Get(bucketKeyRef)) + if bref != s.bref { + return errors.Errorf("unexpected reference key %q, expected %q", bref, s.bref) + } + } + + dbkt := getBucket(tx, bucketKeyVersion, []byte("testing"), bucketKeyObjectContent, deprecatedBucketKeyObjectIngest) + if dbkt != nil { + return errors.New("deprecated ingest bucket still exists") + } + return nil }, }, diff --git a/metadata/migrations.go b/metadata/migrations.go index 997aee298..61e5b3185 100644 --- a/metadata/migrations.go +++ b/metadata/migrations.go @@ -24,6 +24,11 @@ var migrations = []migration{ version: 1, migrate: addChildLinks, }, + { + schema: "v1", + version: 2, + migrate: migrateIngests, + }, } // addChildLinks Adds children key to the snapshotters to enforce snapshot @@ -83,3 +88,53 @@ func addChildLinks(tx *bolt.Tx) error { return nil } + +// migrateIngests moves ingests from the key/value ingest bucket +// to a structured ingest bucket for storing additional state about +// an ingest. +func migrateIngests(tx *bolt.Tx) error { + v1bkt := tx.Bucket(bucketKeyVersion) + if v1bkt == nil { + return nil + } + + // iterate through each namespace + v1c := v1bkt.Cursor() + + for k, v := v1c.First(); k != nil; k, v = v1c.Next() { + if v != nil { + continue + } + bkt := v1bkt.Bucket(k).Bucket(bucketKeyObjectContent) + if bkt == nil { + continue + } + + dbkt := bkt.Bucket(deprecatedBucketKeyObjectIngest) + if dbkt == nil { + continue + } + + // Create new ingests bucket + nbkt, err := bkt.CreateBucketIfNotExists(bucketKeyObjectIngests) + if err != nil { + return err + } + + if err := dbkt.ForEach(func(ref, bref []byte) error { + ibkt, err := nbkt.CreateBucketIfNotExists(ref) + if err != nil { + return err + } + return ibkt.Put(bucketKeyRef, bref) + }); err != nil { + return err + } + + if err := bkt.DeleteBucket(deprecatedBucketKeyObjectIngest); err != nil { + return err + } + } + + return nil +} diff --git a/services/content/service.go b/services/content/service.go index 8a55ba2f3..036db82db 100644 --- a/services/content/service.go +++ b/services/content/service.go @@ -356,7 +356,7 @@ func (s *service) Write(session api.Content_WriteServer) (err error) { // users use the same writer style for each with a minimum of overhead. if req.Expected != "" { if expected != "" && expected != req.Expected { - return status.Errorf(codes.InvalidArgument, "inconsistent digest provided: %v != %v", req.Expected, expected) + log.G(ctx).Debugf("commit digest differs from writer digest: %v != %v", req.Expected, expected) } expected = req.Expected @@ -373,7 +373,7 @@ func (s *service) Write(session api.Content_WriteServer) (err error) { // Update the expected total. Typically, this could be seen at // negotiation time or on a commit message. if total > 0 && req.Total != total { - return status.Errorf(codes.InvalidArgument, "inconsistent total provided: %v != %v", req.Total, total) + log.G(ctx).Debugf("commit size differs from writer size: %v != %v", req.Total, total) } total = req.Total }