diff --git a/content/helpers.go b/content/helpers.go index ac0b5a3db..a093c210b 100644 --- a/content/helpers.go +++ b/content/helpers.go @@ -78,7 +78,7 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i } // Copy copies data with the expected digest from the reader into the -// provided content store writer. +// provided content store writer. This copy commits the writer. // // This is useful when the digest and size are known beforehand. When // the size or digest is unknown, these values may be empty. @@ -113,6 +113,22 @@ func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected dige return nil } +// CopyReaderAt copies to a writer from a given reader at for the given +// number of bytes. This copy does not commit the writer. +func CopyReaderAt(cw Writer, ra ReaderAt, n int64) error { + ws, err := cw.Status() + if err != nil { + return err + } + + buf := bufPool.Get().(*[]byte) + defer bufPool.Put(buf) + + _, err = io.CopyBuffer(cw, io.NewSectionReader(ra, ws.Offset, n), *buf) + + return err +} + // seekReader attempts to seek the reader to the given offset, either by // resolving `io.Seeker`, by detecting `io.ReaderAt`, or discarding // up to the given offset. diff --git a/content/testsuite/testsuite.go b/content/testsuite/testsuite.go index 38bdbd9e5..0f77f59b2 100644 --- a/content/testsuite/testsuite.go +++ b/content/testsuite/testsuite.go @@ -30,7 +30,6 @@ import ( "time" "github.com/containerd/containerd/content" - "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/testutil" "github.com/gotestyourself/gotestyourself/assert" digest "github.com/opencontainers/go-digest" @@ -48,11 +47,38 @@ func ContentSuite(t *testing.T, name string, storeFn func(ctx context.Context, r t.Run("ResumeCopySeeker", makeTest(t, name, storeFn, checkResume(resumeCopySeeker))) t.Run("ResumeCopyReaderAt", makeTest(t, name, storeFn, checkResume(resumeCopyReaderAt))) t.Run("Labels", makeTest(t, name, storeFn, checkLabels)) + + t.Run("CrossNamespaceAppend", makeTest(t, name, storeFn, checkCrossNSAppend)) + t.Run("CrossNamespaceShare", makeTest(t, name, storeFn, checkCrossNSShare)) +} + +// ContextWrapper is used to decorate new context used inside the test +// before using the context on the content store. +// This can be used to support leasing and multiple namespaces tests. +type ContextWrapper func(ctx context.Context) (context.Context, func() error, error) + +type wrapperKey struct{} + +// SetContextWrapper sets the wrapper on the context for deriving +// new test contexts from the context. +func SetContextWrapper(ctx context.Context, w ContextWrapper) context.Context { + return context.WithValue(ctx, wrapperKey{}, w) +} + +type nameKey struct{} + +// Name gets the test name from the context +func Name(ctx context.Context) string { + name, ok := ctx.Value(nameKey{}).(string) + if !ok { + return "" + } + return name } func makeTest(t *testing.T, name string, storeFn func(ctx context.Context, root string) (context.Context, content.Store, func() error, error), fn func(ctx context.Context, t *testing.T, cs content.Store)) func(t *testing.T) { return func(t *testing.T) { - ctx := namespaces.WithNamespace(context.Background(), name) + ctx := context.WithValue(context.Background(), nameKey{}, name) tmpDir, err := ioutil.TempDir("", "content-suite-"+name+"-") if err != nil { @@ -70,6 +96,20 @@ func makeTest(t *testing.T, name string, storeFn func(ctx context.Context, root } }() + w, ok := ctx.Value(wrapperKey{}).(ContextWrapper) + if ok { + var done func() error + ctx, done, err = w(ctx) + if err != nil { + t.Fatalf("Error wrapping context: %+v", err) + } + defer func() { + if err := done(); err != nil && !t.Failed() { + t.Fatalf("Wrapper release failed: %+v", err) + } + }() + } + defer testutil.DumpDirOnFailure(t, tmpDir) fn(ctx, t, cs) } @@ -483,6 +523,124 @@ func resumeCopyReaderAt(ctx context.Context, w content.Writer, b []byte, _, size return errors.Wrap(content.Copy(ctx, w, r, size, dgst), "copy failed") } +func checkCrossNSShare(ctx context.Context, t *testing.T, cs content.Store) { + wrap, ok := ctx.Value(wrapperKey{}).(ContextWrapper) + if !ok { + t.Skip("multiple contexts not supported") + } + + var size int64 = 1000 + b, d := createContent(size) + ref := fmt.Sprintf("ref-%d", size) + t1 := time.Now() + + if err := content.WriteBlob(ctx, cs, ref, bytes.NewReader(b), size, d); err != nil { + t.Fatal(err) + } + + ctx2, done, err := wrap(context.Background()) + if err != nil { + t.Fatal(err) + } + defer done() + + w, err := cs.Writer(ctx2, ref, size, d) + if err != nil { + t.Fatal(err) + } + t2 := time.Now() + + checkStatus(t, w, content.Status{ + Ref: ref, + Offset: size, + Total: size, + }, d, t1, t2, t1, t2) + + if err := w.Commit(ctx2, size, d); err != nil { + t.Fatal(err) + } + t3 := time.Now() + + info := content.Info{ + Digest: d, + Size: size, + } + if err := checkContent(ctx, cs, d, info, t1, t3, t1, t3); err != nil { + t.Fatal(err) + } + + if err := checkContent(ctx2, cs, d, info, t1, t3, t1, t3); err != nil { + t.Fatal(err) + } +} + +func checkCrossNSAppend(ctx context.Context, t *testing.T, cs content.Store) { + wrap, ok := ctx.Value(wrapperKey{}).(ContextWrapper) + if !ok { + t.Skip("multiple contexts not supported") + } + + var size int64 = 1000 + b, d := createContent(size) + ref := fmt.Sprintf("ref-%d", size) + t1 := time.Now() + + if err := content.WriteBlob(ctx, cs, ref, bytes.NewReader(b), size, d); err != nil { + t.Fatal(err) + } + + ctx2, done, err := wrap(context.Background()) + if err != nil { + t.Fatal(err) + } + defer done() + + extra := []byte("appended bytes") + size2 := size + int64(len(extra)) + b2 := make([]byte, size2) + copy(b2[:size], b) + copy(b2[size:], extra) + d2 := digest.FromBytes(b2) + + w, err := cs.Writer(ctx2, ref, size, d) + if err != nil { + t.Fatal(err) + } + t2 := time.Now() + + checkStatus(t, w, content.Status{ + Ref: ref, + Offset: size, + Total: size, + }, d, t1, t2, t1, t2) + + if _, err := w.Write(extra); err != nil { + t.Fatal(err) + } + + if err := w.Commit(ctx2, size2, d2); err != nil { + t.Fatal(err) + } + t3 := time.Now() + + info := content.Info{ + Digest: d, + Size: size, + } + if err := checkContent(ctx, cs, d, info, t1, t3, t1, t3); err != nil { + t.Fatal(err) + } + + info2 := content.Info{ + Digest: d2, + Size: size2, + } + if err := checkContent(ctx2, cs, d2, info2, t1, t3, t1, t3); err != nil { + t.Fatal(err) + } + +} + func checkStatus(t *testing.T, w content.Writer, expected content.Status, d digest.Digest, preStart, postStart, preUpdate, postUpdate time.Time) { t.Helper() st, err := w.Status() @@ -564,6 +722,27 @@ func checkInfo(ctx context.Context, cs content.Store, d digest.Digest, expected return nil } +func checkContent(ctx context.Context, cs content.Store, d digest.Digest, expected content.Info, c1, c2, u1, u2 time.Time) error { + if err := checkInfo(ctx, cs, d, expected, c1, c2, u1, u2); err != nil { + return err + } + + b, err := content.ReadBlob(ctx, cs, d) + if err != nil { + return errors.Wrap(err, "failed to read blob") + } + + if int64(len(b)) != expected.Size { + return errors.Errorf("wrong blob size %d, expected %d", len(b), expected.Size) + } + + actual := digest.FromBytes(b) + if actual != d { + return errors.Errorf("wrong digest %s, expected %s", actual, d) + } + + return nil +} var contentSeed int64 diff --git a/content_test.go b/content_test.go index 87f8dcae6..c7112e683 100644 --- a/content_test.go +++ b/content_test.go @@ -18,11 +18,14 @@ package containerd import ( "context" + "fmt" + "sync/atomic" "testing" "github.com/containerd/containerd/content" "github.com/containerd/containerd/content/testsuite" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/namespaces" "github.com/pkg/errors" ) @@ -31,33 +34,48 @@ func newContentStore(ctx context.Context, root string) (context.Context, content if err != nil { return nil, nil, nil, err } - ctx, releaselease, err := client.WithLease(ctx) - if err != nil { - return nil, nil, nil, err + + var ( + count uint64 + cs = client.ContentStore() + name = testsuite.Name(ctx) + ) + + wrap := func(ctx context.Context) (context.Context, func() error, error) { + n := atomic.AddUint64(&count, 1) + ctx = namespaces.WithNamespace(ctx, fmt.Sprintf("%s-n%d", name, n)) + return client.WithLease(ctx) } - cs := client.ContentStore() + + ctx = testsuite.SetContextWrapper(ctx, wrap) return ctx, cs, func() error { - statuses, err := cs.ListStatuses(ctx) - if err != nil { - return err - } - for _, st := range statuses { - if err := cs.Abort(ctx, st.Ref); err != nil { - return errors.Wrapf(err, "failed to abort %s", st.Ref) - } - } - releaselease() - return cs.Walk(ctx, func(info content.Info) error { - if err := cs.Delete(ctx, info.Digest); err != nil { - if errdefs.IsNotFound(err) { - return nil - } - + for i := uint64(1); i <= count; i++ { + ctx = namespaces.WithNamespace(ctx, fmt.Sprintf("%s-n%d", name, i)) + statuses, err := cs.ListStatuses(ctx) + if err != nil { return err } - return nil - }) + for _, st := range statuses { + if err := cs.Abort(ctx, st.Ref); err != nil { + return errors.Wrapf(err, "failed to abort %s", st.Ref) + } + } + err = cs.Walk(ctx, func(info content.Info) error { + if err := cs.Delete(ctx, info.Digest); err != nil { + if errdefs.IsNotFound(err) { + return nil + } + + return err + } + return nil + }) + if err != nil { + return err + } + } + return nil }, nil } diff --git a/metadata/buckets.go b/metadata/buckets.go index cade7eea3..873626f2b 100644 --- a/metadata/buckets.go +++ b/metadata/buckets.go @@ -52,7 +52,7 @@ var ( bucketKeyObjectSnapshots = []byte("snapshots") // stores snapshot references bucketKeyObjectContent = []byte("content") // stores content references bucketKeyObjectBlob = []byte("blob") // stores content links - bucketKeyObjectIngest = []byte("ingest") // stores ingest links + bucketKeyObjectIngests = []byte("ingests") // stores ingest objects bucketKeyObjectLeases = []byte("leases") // stores leases bucketKeyDigest = []byte("digest") @@ -70,6 +70,10 @@ var ( bucketKeyTarget = []byte("target") bucketKeyExtensions = []byte("extensions") bucketKeyCreatedAt = []byte("createdat") + bucketKeyExpected = []byte("expected") + bucketKeyRef = []byte("ref") + + deprecatedBucketKeyObjectIngest = []byte("ingest") // stores ingest links, deprecated in v1.2 ) func getBucket(tx *bolt.Tx, keys ...[]byte) *bolt.Bucket { @@ -178,14 +182,18 @@ func getBlobBucket(tx *bolt.Tx, namespace string, dgst digest.Digest) *bolt.Buck return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectBlob, []byte(dgst.String())) } -func createIngestBucket(tx *bolt.Tx, namespace string) (*bolt.Bucket, error) { - bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngest) +func getIngestsBucket(tx *bolt.Tx, namespace string) *bolt.Bucket { + return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngests) +} + +func createIngestBucket(tx *bolt.Tx, namespace, ref string) (*bolt.Bucket, error) { + bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngests, []byte(ref)) if err != nil { return nil, err } return bkt, nil } -func getIngestBucket(tx *bolt.Tx, namespace string) *bolt.Bucket { - return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngest) +func getIngestBucket(tx *bolt.Tx, namespace, ref string) *bolt.Bucket { + return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngests, []byte(ref)) } diff --git a/metadata/content.go b/metadata/content.go index 3a746c016..ecb74ba66 100644 --- a/metadata/content.go +++ b/metadata/content.go @@ -226,14 +226,16 @@ func (cs *contentStore) ListStatuses(ctx context.Context, fs ...string) ([]conte brefs := map[string]string{} if err := view(ctx, cs.db, func(tx *bolt.Tx) error { - bkt := getIngestBucket(tx, ns) + bkt := getIngestsBucket(tx, ns) if bkt == nil { return nil } return bkt.ForEach(func(k, v []byte) error { - // TODO(dmcgowan): match name and potentially labels here - brefs[string(k)] = string(v) + if v == nil { + // TODO(dmcgowan): match name and potentially labels here + brefs[string(k)] = string(bkt.Bucket(k).Get(bucketKeyRef)) + } return nil }) }); err != nil { @@ -261,11 +263,11 @@ func (cs *contentStore) ListStatuses(ctx context.Context, fs ...string) ([]conte } func getRef(tx *bolt.Tx, ns, ref string) string { - bkt := getIngestBucket(tx, ns) + bkt := getIngestBucket(tx, ns, ref) if bkt == nil { return "" } - v := bkt.Get([]byte(ref)) + v := bkt.Get(bucketKeyRef) if len(v) == 0 { return "" } @@ -308,19 +310,29 @@ func (cs *contentStore) Abort(ctx context.Context, ref string) error { defer cs.l.RUnlock() return update(ctx, cs.db, func(tx *bolt.Tx) error { - bkt := getIngestBucket(tx, ns) + ibkt := getIngestsBucket(tx, ns) + if ibkt == nil { + return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref) + } + bkt := ibkt.Bucket([]byte(ref)) if bkt == nil { return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref) } - bref := string(bkt.Get([]byte(ref))) + bref := string(bkt.Get(bucketKeyRef)) if bref == "" { return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref) } - if err := bkt.Delete([]byte(ref)); err != nil { + expected := string(bkt.Get(bucketKeyExpected)) + if err := ibkt.DeleteBucket([]byte(ref)); err != nil { return err } - return cs.Store.Abort(ctx, bref) + // if not shared content, delete active ingest on backend + if expected == "" { + return cs.Store.Abort(ctx, bref) + } + + return nil }) } @@ -337,8 +349,10 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe var ( w content.Writer exists bool + bref string ) if err := update(ctx, cs.db, func(tx *bolt.Tx) error { + var shared bool if expected != "" { cbkt := getBlobBucket(tx, ns, expected) if cbkt != nil { @@ -352,18 +366,24 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe exists = true return nil } + + if st, err := cs.Store.Info(ctx, expected); err == nil { + // Ensure the expected size is the same, it is likely + // an error if the size is mismatched but the caller + // must resolve this on commit + if size == 0 || size == st.Size { + shared = true + size = st.Size + } + } } - bkt, err := createIngestBucket(tx, ns) + bkt, err := createIngestBucket(tx, ns, ref) if err != nil { return err } - var ( - bref string - brefb = bkt.Get([]byte(ref)) - ) - + brefb := bkt.Get(bucketKeyRef) if brefb == nil { sid, err := bkt.NextSequence() if err != nil { @@ -371,21 +391,24 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe } bref = createKey(sid, ns, ref) - if err := bkt.Put([]byte(ref), []byte(bref)); err != nil { + if err := bkt.Put(bucketKeyRef, []byte(bref)); err != nil { return err } } else { bref = string(brefb) } - // Do not use the passed in expected value here since it was - // already checked against the user metadata. If the content - // store has the content, it must still be written before - // linked into the given namespace. It is possible in the future - // to allow content which exists in content store but not - // namespace to be linked here and returned an exist error, but - // this would require more configuration to make secure. - w, err = cs.Store.Writer(ctx, bref, size, "") + if shared { + if err := bkt.Put(bucketKeyExpected, []byte(expected)); err != nil { + return err + } + } else { + // Do not use the passed in expected value here since it was + // already checked against the user metadata. The content must + // be committed in the namespace before it will be seen as + // available in the current namespace. + w, err = cs.Store.Writer(ctx, bref, size, "") + } return err }); err != nil { return nil, err @@ -394,23 +417,99 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected) } - // TODO: keep the expected in the writer to use on commit - // when no expected is provided there. return &namespacedWriter{ - Writer: w, + ctx: ctx, ref: ref, namespace: ns, db: cs.db, + provider: cs.Store, l: &cs.l, + w: w, + bref: bref, + started: time.Now(), + expected: expected, + size: size, }, nil } type namespacedWriter struct { - content.Writer + ctx context.Context ref string namespace string db transactor - l *sync.RWMutex + provider interface { + content.Provider + content.Ingester + } + l *sync.RWMutex + + w content.Writer + + bref string + started time.Time + expected digest.Digest + size int64 +} + +func (nw *namespacedWriter) Close() error { + if nw.w != nil { + return nw.w.Close() + } + return nil +} + +func (nw *namespacedWriter) Write(p []byte) (int, error) { + // if no writer, first copy and unshare before performing write + if nw.w == nil { + if len(p) == 0 { + return 0, nil + } + + if err := nw.createAndCopy(nw.ctx, nw.size); err != nil { + return 0, err + } + } + + return nw.w.Write(p) +} + +func (nw *namespacedWriter) Digest() digest.Digest { + if nw.w != nil { + return nw.w.Digest() + } + return nw.expected +} + +func (nw *namespacedWriter) Truncate(size int64) error { + if nw.w != nil { + return nw.w.Truncate(size) + } + + return nw.createAndCopy(nw.ctx, size) +} + +func (nw *namespacedWriter) createAndCopy(ctx context.Context, size int64) error { + w, err := nw.provider.Writer(ctx, nw.bref, nw.size, "") + if err != nil { + return err + } + + if size > 0 { + ra, err := nw.provider.ReaderAt(ctx, nw.expected) + if err != nil { + return err + } + defer ra.Close() + + if err := content.CopyReaderAt(w, ra, size); err != nil { + nw.w.Close() + nw.w = nil + return err + } + } + nw.w = w + + return nil } func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { @@ -418,9 +517,9 @@ func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected dig defer nw.l.RUnlock() return update(ctx, nw.db, func(tx *bolt.Tx) error { - bkt := getIngestBucket(tx, nw.namespace) + bkt := getIngestsBucket(tx, nw.namespace) if bkt != nil { - if err := bkt.Delete([]byte(nw.ref)); err != nil { + if err := bkt.DeleteBucket([]byte(nw.ref)); err != nil && err != bolt.ErrBucketNotFound { return err } } @@ -443,24 +542,38 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, return "", err } - status, err := nw.Writer.Status() - if err != nil { - return "", err - } - if size != 0 && size != status.Offset { - return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, status.Offset, size) - } - size = status.Offset - - actual := nw.Writer.Digest() - - if err := nw.Writer.Commit(ctx, size, expected); err != nil { - if !errdefs.IsAlreadyExists(err) { - return "", err + var actual digest.Digest + if nw.w == nil { + if size != 0 && size != nw.size { + return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, nw.size, size) } + if expected != "" && expected != nw.expected { + return "", errors.Errorf("%q unexpected digest", nw.ref) + } + size = nw.size + actual = nw.expected if getBlobBucket(tx, nw.namespace, actual) != nil { return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual) } + } else { + status, err := nw.w.Status() + if err != nil { + return "", err + } + if size != 0 && size != status.Offset { + return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, status.Offset, size) + } + size = status.Offset + actual = nw.w.Digest() + + if err := nw.w.Commit(ctx, size, expected); err != nil { + if !errdefs.IsAlreadyExists(err) { + return "", err + } + if getBlobBucket(tx, nw.namespace, actual) != nil { + return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual) + } + } } bkt, err := createBlobBucket(tx, nw.namespace, actual) @@ -484,12 +597,20 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, return actual, bkt.Put(bucketKeySize, sizeEncoded) } -func (nw *namespacedWriter) Status() (content.Status, error) { - st, err := nw.Writer.Status() +func (nw *namespacedWriter) Status() (st content.Status, err error) { + if nw.w != nil { + st, err = nw.w.Status() + } else { + st.Offset = nw.size + st.Total = nw.size + st.StartedAt = nw.started + st.UpdatedAt = nw.started + st.Expected = nw.expected + } if err == nil { st.Ref = nw.ref } - return st, err + return } func (cs *contentStore) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) { @@ -590,13 +711,30 @@ func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, er continue } bbkt := cbkt.Bucket(bucketKeyObjectBlob) - if err := bbkt.ForEach(func(ck, cv []byte) error { - if cv == nil { - seen[string(ck)] = struct{}{} + if bbkt != nil { + if err := bbkt.ForEach(func(ck, cv []byte) error { + if cv == nil { + seen[string(ck)] = struct{}{} + } + return nil + }); err != nil { + return err + } + } + + ibkt := cbkt.Bucket(bucketKeyObjectIngests) + if ibkt != nil { + if err := ibkt.ForEach(func(ref, v []byte) error { + if v == nil { + expected := ibkt.Bucket(ref).Get(bucketKeyExpected) + if len(expected) > 0 { + seen[string(expected)] = struct{}{} + } + } + return nil + }); err != nil { + return err } - return nil - }); err != nil { - return err } } diff --git a/metadata/content_test.go b/metadata/content_test.go index 6f05bd1a2..1ba4a027d 100644 --- a/metadata/content_test.go +++ b/metadata/content_test.go @@ -19,7 +19,9 @@ package metadata import ( "bytes" "context" + "fmt" "path/filepath" + "sync/atomic" "testing" "github.com/boltdb/bolt" @@ -45,6 +47,16 @@ func createContentStore(ctx context.Context, root string) (context.Context, cont return nil, nil, nil, err } + var ( + count uint64 + name = testsuite.Name(ctx) + ) + wrap := func(ctx context.Context) (context.Context, func() error, error) { + n := atomic.AddUint64(&count, 1) + return namespaces.WithNamespace(ctx, fmt.Sprintf("%s-n%d", name, n)), func() error { return nil }, nil + } + ctx = testsuite.SetContextWrapper(ctx, wrap) + return ctx, NewDB(db, cs, nil).ContentStore(), func() error { return db.Close() }, nil diff --git a/metadata/db.go b/metadata/db.go index 7296d8caa..4398fc20f 100644 --- a/metadata/db.go +++ b/metadata/db.go @@ -43,7 +43,7 @@ const ( // dbVersion represents updates to the schema // version which are additions and compatible with // prior version of the same schema. - dbVersion = 1 + dbVersion = 2 ) // DB represents a metadata database backed by a bolt diff --git a/metadata/db_test.go b/metadata/db_test.go index 1c583d794..dedc1dd3f 100644 --- a/metadata/db_test.go +++ b/metadata/db_test.go @@ -102,6 +102,27 @@ func TestInit(t *testing.T) { } func TestMigrations(t *testing.T) { + testRefs := []struct { + ref string + bref string + }{ + { + ref: "k1", + bref: "bk1", + }, + { + ref: strings.Repeat("longerkey", 30), // 270 characters + bref: "short", + }, + { + ref: "short", + bref: strings.Repeat("longerkey", 30), // 270 characters + }, + { + ref: "emptykey", + bref: "", + }, + } migrationTests := []struct { name string init func(*bolt.Tx) error @@ -199,6 +220,48 @@ func TestMigrations(t *testing.T) { } } + return nil + }, + }, + { + name: "IngestUpdate", + init: func(tx *bolt.Tx) error { + bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte("testing"), bucketKeyObjectContent, deprecatedBucketKeyObjectIngest) + if err != nil { + return err + } + + for _, s := range testRefs { + if err := bkt.Put([]byte(s.ref), []byte(s.bref)); err != nil { + return err + } + } + + return nil + }, + check: func(tx *bolt.Tx) error { + bkt := getIngestsBucket(tx, "testing") + if bkt == nil { + return errors.Wrap(errdefs.ErrNotFound, "ingests bucket not found") + } + + for _, s := range testRefs { + sbkt := bkt.Bucket([]byte(s.ref)) + if sbkt == nil { + return errors.Wrap(errdefs.ErrNotFound, "ref does not exist") + } + + bref := string(sbkt.Get(bucketKeyRef)) + if bref != s.bref { + return errors.Errorf("unexpected reference key %q, expected %q", bref, s.bref) + } + } + + dbkt := getBucket(tx, bucketKeyVersion, []byte("testing"), bucketKeyObjectContent, deprecatedBucketKeyObjectIngest) + if dbkt != nil { + return errors.New("deprecated ingest bucket still exists") + } + return nil }, }, diff --git a/metadata/migrations.go b/metadata/migrations.go index 5be5f8301..3ffda6b3b 100644 --- a/metadata/migrations.go +++ b/metadata/migrations.go @@ -40,6 +40,11 @@ var migrations = []migration{ version: 1, migrate: addChildLinks, }, + { + schema: "v1", + version: 2, + migrate: migrateIngests, + }, } // addChildLinks Adds children key to the snapshotters to enforce snapshot @@ -99,3 +104,53 @@ func addChildLinks(tx *bolt.Tx) error { return nil } + +// migrateIngests moves ingests from the key/value ingest bucket +// to a structured ingest bucket for storing additional state about +// an ingest. +func migrateIngests(tx *bolt.Tx) error { + v1bkt := tx.Bucket(bucketKeyVersion) + if v1bkt == nil { + return nil + } + + // iterate through each namespace + v1c := v1bkt.Cursor() + + for k, v := v1c.First(); k != nil; k, v = v1c.Next() { + if v != nil { + continue + } + bkt := v1bkt.Bucket(k).Bucket(bucketKeyObjectContent) + if bkt == nil { + continue + } + + dbkt := bkt.Bucket(deprecatedBucketKeyObjectIngest) + if dbkt == nil { + continue + } + + // Create new ingests bucket + nbkt, err := bkt.CreateBucketIfNotExists(bucketKeyObjectIngests) + if err != nil { + return err + } + + if err := dbkt.ForEach(func(ref, bref []byte) error { + ibkt, err := nbkt.CreateBucketIfNotExists(ref) + if err != nil { + return err + } + return ibkt.Put(bucketKeyRef, bref) + }); err != nil { + return err + } + + if err := bkt.DeleteBucket(deprecatedBucketKeyObjectIngest); err != nil { + return err + } + } + + return nil +} diff --git a/namespaces/context.go b/namespaces/context.go index afcd9d1b0..cc5621a68 100644 --- a/namespaces/context.go +++ b/namespaces/context.go @@ -17,11 +17,11 @@ package namespaces import ( + "context" "os" "github.com/containerd/containerd/errdefs" "github.com/pkg/errors" - "golang.org/x/net/context" ) const ( diff --git a/services/content/service.go b/services/content/service.go index a2c87a4d5..d809dab1e 100644 --- a/services/content/service.go +++ b/services/content/service.go @@ -366,7 +366,7 @@ func (s *service) Write(session api.Content_WriteServer) (err error) { // users use the same writer style for each with a minimum of overhead. if req.Expected != "" { if expected != "" && expected != req.Expected { - return status.Errorf(codes.InvalidArgument, "inconsistent digest provided: %v != %v", req.Expected, expected) + log.G(ctx).Debugf("commit digest differs from writer digest: %v != %v", req.Expected, expected) } expected = req.Expected @@ -383,7 +383,7 @@ func (s *service) Write(session api.Content_WriteServer) (err error) { // Update the expected total. Typically, this could be seen at // negotiation time or on a commit message. if total > 0 && req.Total != total { - return status.Errorf(codes.InvalidArgument, "inconsistent total provided: %v != %v", req.Total, total) + log.G(ctx).Debugf("commit size differs from writer size: %v != %v", req.Total, total) } total = req.Total }