From a1a67899f82c47c0f645019b4d6edbbd220fa5db Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Fri, 2 Feb 2018 15:26:35 -0800 Subject: [PATCH] Shared content across namespaces Update content ingests to use content from another namespace. Ingests must be committed to make content available and the client will see the sharing as an ingest which has already been fully written to, but not completed. Updated the database version to change the ingest record in the database from a link key to an object with a link and expected value. This expected value is used to indicate that the content already exists and an underlying writer may not yet exist. Signed-off-by: Derek McGowan --- content/helpers.go | 18 ++- content/testsuite/testsuite.go | 142 +++++++++++++++++++ metadata/buckets.go | 18 ++- metadata/content.go | 246 +++++++++++++++++++++++++-------- metadata/db.go | 2 +- metadata/db_test.go | 63 +++++++++ metadata/migrations.go | 55 ++++++++ services/content/service.go | 4 +- 8 files changed, 485 insertions(+), 63 deletions(-) diff --git a/content/helpers.go b/content/helpers.go index c125e2a2b..974ff9f38 100644 --- a/content/helpers.go +++ b/content/helpers.go @@ -62,7 +62,7 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i } // Copy copies data with the expected digest from the reader into the -// provided content store writer. +// provided content store writer. This copy commits the writer. // // This is useful when the digest and size are known beforehand. When // the size or digest is unknown, these values may be empty. @@ -97,6 +97,22 @@ func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected dige return nil } +// CopyReaderAt copies to a writer from a given reader at for the given +// number of bytes. This copy does not commit the writer. +func CopyReaderAt(cw Writer, ra ReaderAt, n int64) error { + ws, err := cw.Status() + if err != nil { + return err + } + + buf := bufPool.Get().(*[]byte) + defer bufPool.Put(buf) + + _, err = io.CopyBuffer(cw, io.NewSectionReader(ra, ws.Offset, n), *buf) + + return err +} + // seekReader attempts to seek the reader to the given offset, either by // resolving `io.Seeker`, by detecting `io.ReaderAt`, or discarding // up to the given offset. diff --git a/content/testsuite/testsuite.go b/content/testsuite/testsuite.go index a1ad98a3a..cf57309a1 100644 --- a/content/testsuite/testsuite.go +++ b/content/testsuite/testsuite.go @@ -31,6 +31,9 @@ func ContentSuite(t *testing.T, name string, storeFn func(ctx context.Context, r t.Run("ResumeCopySeeker", makeTest(t, name, storeFn, checkResume(resumeCopySeeker))) t.Run("ResumeCopyReaderAt", makeTest(t, name, storeFn, checkResume(resumeCopyReaderAt))) t.Run("Labels", makeTest(t, name, storeFn, checkLabels)) + + t.Run("CrossNamespaceAppend", makeTest(t, name, storeFn, checkCrossNSAppend)) + t.Run("CrossNamespaceShare", makeTest(t, name, storeFn, checkCrossNSShare)) } // ContextWrapper is used to decorate new context used inside the test @@ -504,6 +507,124 @@ func resumeCopyReaderAt(ctx context.Context, w content.Writer, b []byte, _, size return errors.Wrap(content.Copy(ctx, w, r, size, dgst), "copy failed") } +func checkCrossNSShare(ctx context.Context, t *testing.T, cs content.Store) { + wrap, ok := ctx.Value(wrapperKey{}).(ContextWrapper) + if !ok { + t.Skip("multiple contexts not supported") + } + + var size int64 = 1000 + b, d := createContent(size) + ref := fmt.Sprintf("ref-%d", size) + t1 := time.Now() + + if err := content.WriteBlob(ctx, cs, ref, bytes.NewReader(b), size, d); err != nil { + t.Fatal(err) + } + + ctx2, done, err := wrap(context.Background()) + if err != nil { + t.Fatal(err) + } + defer done() + + w, err := cs.Writer(ctx2, ref, size, d) + if err != nil { + t.Fatal(err) + } + t2 := time.Now() + + checkStatus(t, w, content.Status{ + Ref: ref, + Offset: size, + Total: size, + }, d, t1, t2, t1, t2) + + if err := w.Commit(ctx2, size, d); err != nil { + t.Fatal(err) + } + t3 := time.Now() + + info := content.Info{ + Digest: d, + Size: size, + } + if err := checkContent(ctx, cs, d, info, t1, t3, t1, t3); err != nil { + t.Fatal(err) + } + + if err := checkContent(ctx2, cs, d, info, t1, t3, t1, t3); err != nil { + t.Fatal(err) + } +} + +func checkCrossNSAppend(ctx context.Context, t *testing.T, cs content.Store) { + wrap, ok := ctx.Value(wrapperKey{}).(ContextWrapper) + if !ok { + t.Skip("multiple contexts not supported") + } + + var size int64 = 1000 + b, d := createContent(size) + ref := fmt.Sprintf("ref-%d", size) + t1 := time.Now() + + if err := content.WriteBlob(ctx, cs, ref, bytes.NewReader(b), size, d); err != nil { + t.Fatal(err) + } + + ctx2, done, err := wrap(context.Background()) + if err != nil { + t.Fatal(err) + } + defer done() + + extra := []byte("appended bytes") + size2 := size + int64(len(extra)) + b2 := make([]byte, size2) + copy(b2[:size], b) + copy(b2[size:], extra) + d2 := digest.FromBytes(b2) + + w, err := cs.Writer(ctx2, ref, size, d) + if err != nil { + t.Fatal(err) + } + t2 := time.Now() + + checkStatus(t, w, content.Status{ + Ref: ref, + Offset: size, + Total: size, + }, d, t1, t2, t1, t2) + + if _, err := w.Write(extra); err != nil { + t.Fatal(err) + } + + if err := w.Commit(ctx2, size2, d2); err != nil { + t.Fatal(err) + } + t3 := time.Now() + + info := content.Info{ + Digest: d, + Size: size, + } + if err := checkContent(ctx, cs, d, info, t1, t3, t1, t3); err != nil { + t.Fatal(err) + } + + info2 := content.Info{ + Digest: d2, + Size: size2, + } + if err := checkContent(ctx2, cs, d2, info2, t1, t3, t1, t3); err != nil { + t.Fatal(err) + } + +} + func checkStatus(t *testing.T, w content.Writer, expected content.Status, d digest.Digest, preStart, postStart, preUpdate, postUpdate time.Time) { t.Helper() st, err := w.Status() @@ -585,6 +706,27 @@ func checkInfo(ctx context.Context, cs content.Store, d digest.Digest, expected return nil } +func checkContent(ctx context.Context, cs content.Store, d digest.Digest, expected content.Info, c1, c2, u1, u2 time.Time) error { + if err := checkInfo(ctx, cs, d, expected, c1, c2, u1, u2); err != nil { + return err + } + + b, err := content.ReadBlob(ctx, cs, d) + if err != nil { + return errors.Wrap(err, "failed to read blob") + } + + if int64(len(b)) != expected.Size { + return errors.Errorf("wrong blob size %d, expected %d", len(b), expected.Size) + } + + actual := digest.FromBytes(b) + if actual != d { + return errors.Errorf("wrong digest %s, expected %s", actual, d) + } + + return nil +} var contentSeed int64 diff --git a/metadata/buckets.go b/metadata/buckets.go index 45309724f..80a8010e1 100644 --- a/metadata/buckets.go +++ b/metadata/buckets.go @@ -36,7 +36,7 @@ var ( bucketKeyObjectSnapshots = []byte("snapshots") // stores snapshot references bucketKeyObjectContent = []byte("content") // stores content references bucketKeyObjectBlob = []byte("blob") // stores content links - bucketKeyObjectIngest = []byte("ingest") // stores ingest links + bucketKeyObjectIngests = []byte("ingests") // stores ingest objects bucketKeyObjectLeases = []byte("leases") // stores leases bucketKeyDigest = []byte("digest") @@ -54,6 +54,10 @@ var ( bucketKeyTarget = []byte("target") bucketKeyExtensions = []byte("extensions") bucketKeyCreatedAt = []byte("createdat") + bucketKeyExpected = []byte("expected") + bucketKeyRef = []byte("ref") + + deprecatedBucketKeyObjectIngest = []byte("ingest") // stores ingest links, deprecated in v1.2 ) func getBucket(tx *bolt.Tx, keys ...[]byte) *bolt.Bucket { @@ -162,14 +166,18 @@ func getBlobBucket(tx *bolt.Tx, namespace string, dgst digest.Digest) *bolt.Buck return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectBlob, []byte(dgst.String())) } -func createIngestBucket(tx *bolt.Tx, namespace string) (*bolt.Bucket, error) { - bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngest) +func getIngestsBucket(tx *bolt.Tx, namespace string) *bolt.Bucket { + return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngests) +} + +func createIngestBucket(tx *bolt.Tx, namespace, ref string) (*bolt.Bucket, error) { + bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngests, []byte(ref)) if err != nil { return nil, err } return bkt, nil } -func getIngestBucket(tx *bolt.Tx, namespace string) *bolt.Bucket { - return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngest) +func getIngestBucket(tx *bolt.Tx, namespace, ref string) *bolt.Bucket { + return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngests, []byte(ref)) } diff --git a/metadata/content.go b/metadata/content.go index 57eabf307..8b88a6c81 100644 --- a/metadata/content.go +++ b/metadata/content.go @@ -210,14 +210,16 @@ func (cs *contentStore) ListStatuses(ctx context.Context, fs ...string) ([]conte brefs := map[string]string{} if err := view(ctx, cs.db, func(tx *bolt.Tx) error { - bkt := getIngestBucket(tx, ns) + bkt := getIngestsBucket(tx, ns) if bkt == nil { return nil } return bkt.ForEach(func(k, v []byte) error { - // TODO(dmcgowan): match name and potentially labels here - brefs[string(k)] = string(v) + if v == nil { + // TODO(dmcgowan): match name and potentially labels here + brefs[string(k)] = string(bkt.Bucket(k).Get(bucketKeyRef)) + } return nil }) }); err != nil { @@ -245,11 +247,11 @@ func (cs *contentStore) ListStatuses(ctx context.Context, fs ...string) ([]conte } func getRef(tx *bolt.Tx, ns, ref string) string { - bkt := getIngestBucket(tx, ns) + bkt := getIngestBucket(tx, ns, ref) if bkt == nil { return "" } - v := bkt.Get([]byte(ref)) + v := bkt.Get(bucketKeyRef) if len(v) == 0 { return "" } @@ -292,19 +294,29 @@ func (cs *contentStore) Abort(ctx context.Context, ref string) error { defer cs.l.RUnlock() return update(ctx, cs.db, func(tx *bolt.Tx) error { - bkt := getIngestBucket(tx, ns) + ibkt := getIngestsBucket(tx, ns) + if ibkt == nil { + return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref) + } + bkt := ibkt.Bucket([]byte(ref)) if bkt == nil { return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref) } - bref := string(bkt.Get([]byte(ref))) + bref := string(bkt.Get(bucketKeyRef)) if bref == "" { return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref) } - if err := bkt.Delete([]byte(ref)); err != nil { + expected := string(bkt.Get(bucketKeyExpected)) + if err := ibkt.DeleteBucket([]byte(ref)); err != nil { return err } - return cs.Store.Abort(ctx, bref) + // if not shared content, delete active ingest on backend + if expected == "" { + return cs.Store.Abort(ctx, bref) + } + + return nil }) } @@ -321,8 +333,10 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe var ( w content.Writer exists bool + bref string ) if err := update(ctx, cs.db, func(tx *bolt.Tx) error { + var shared bool if expected != "" { cbkt := getBlobBucket(tx, ns, expected) if cbkt != nil { @@ -336,18 +350,24 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe exists = true return nil } + + if st, err := cs.Store.Info(ctx, expected); err == nil { + // Ensure the expected size is the same, it is likely + // an error if the size is mismatched but the caller + // must resolve this on commit + if size == 0 || size == st.Size { + shared = true + size = st.Size + } + } } - bkt, err := createIngestBucket(tx, ns) + bkt, err := createIngestBucket(tx, ns, ref) if err != nil { return err } - var ( - bref string - brefb = bkt.Get([]byte(ref)) - ) - + brefb := bkt.Get(bucketKeyRef) if brefb == nil { sid, err := bkt.NextSequence() if err != nil { @@ -355,21 +375,24 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe } bref = createKey(sid, ns, ref) - if err := bkt.Put([]byte(ref), []byte(bref)); err != nil { + if err := bkt.Put(bucketKeyRef, []byte(bref)); err != nil { return err } } else { bref = string(brefb) } - // Do not use the passed in expected value here since it was - // already checked against the user metadata. If the content - // store has the content, it must still be written before - // linked into the given namespace. It is possible in the future - // to allow content which exists in content store but not - // namespace to be linked here and returned an exist error, but - // this would require more configuration to make secure. - w, err = cs.Store.Writer(ctx, bref, size, "") + if shared { + if err := bkt.Put(bucketKeyExpected, []byte(expected)); err != nil { + return err + } + } else { + // Do not use the passed in expected value here since it was + // already checked against the user metadata. The content must + // be committed in the namespace before it will be seen as + // available in the current namespace. + w, err = cs.Store.Writer(ctx, bref, size, "") + } return err }); err != nil { return nil, err @@ -378,23 +401,99 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected) } - // TODO: keep the expected in the writer to use on commit - // when no expected is provided there. return &namespacedWriter{ - Writer: w, + ctx: ctx, ref: ref, namespace: ns, db: cs.db, + provider: cs.Store, l: &cs.l, + w: w, + bref: bref, + started: time.Now(), + expected: expected, + size: size, }, nil } type namespacedWriter struct { - content.Writer + ctx context.Context ref string namespace string db transactor - l *sync.RWMutex + provider interface { + content.Provider + content.Ingester + } + l *sync.RWMutex + + w content.Writer + + bref string + started time.Time + expected digest.Digest + size int64 +} + +func (nw *namespacedWriter) Close() error { + if nw.w != nil { + return nw.w.Close() + } + return nil +} + +func (nw *namespacedWriter) Write(p []byte) (int, error) { + // if no writer, first copy and unshare before performing write + if nw.w == nil { + if len(p) == 0 { + return 0, nil + } + + if err := nw.createAndCopy(nw.ctx, nw.size); err != nil { + return 0, err + } + } + + return nw.w.Write(p) +} + +func (nw *namespacedWriter) Digest() digest.Digest { + if nw.w != nil { + return nw.w.Digest() + } + return nw.expected +} + +func (nw *namespacedWriter) Truncate(size int64) error { + if nw.w != nil { + return nw.w.Truncate(size) + } + + return nw.createAndCopy(nw.ctx, size) +} + +func (nw *namespacedWriter) createAndCopy(ctx context.Context, size int64) error { + w, err := nw.provider.Writer(ctx, nw.bref, nw.size, "") + if err != nil { + return err + } + + if size > 0 { + ra, err := nw.provider.ReaderAt(ctx, nw.expected) + if err != nil { + return err + } + defer ra.Close() + + if err := content.CopyReaderAt(w, ra, size); err != nil { + nw.w.Close() + nw.w = nil + return err + } + } + nw.w = w + + return nil } func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { @@ -402,9 +501,9 @@ func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected dig defer nw.l.RUnlock() return update(ctx, nw.db, func(tx *bolt.Tx) error { - bkt := getIngestBucket(tx, nw.namespace) + bkt := getIngestsBucket(tx, nw.namespace) if bkt != nil { - if err := bkt.Delete([]byte(nw.ref)); err != nil { + if err := bkt.DeleteBucket([]byte(nw.ref)); err != nil && err != bolt.ErrBucketNotFound { return err } } @@ -427,24 +526,38 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, return "", err } - status, err := nw.Writer.Status() - if err != nil { - return "", err - } - if size != 0 && size != status.Offset { - return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, status.Offset, size) - } - size = status.Offset - - actual := nw.Writer.Digest() - - if err := nw.Writer.Commit(ctx, size, expected); err != nil { - if !errdefs.IsAlreadyExists(err) { - return "", err + var actual digest.Digest + if nw.w == nil { + if size != 0 && size != nw.size { + return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, nw.size, size) } + if expected != "" && expected != nw.expected { + return "", errors.Errorf("%q unexpected digest", nw.ref) + } + size = nw.size + actual = nw.expected if getBlobBucket(tx, nw.namespace, actual) != nil { return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual) } + } else { + status, err := nw.w.Status() + if err != nil { + return "", err + } + if size != 0 && size != status.Offset { + return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, status.Offset, size) + } + size = status.Offset + actual = nw.w.Digest() + + if err := nw.w.Commit(ctx, size, expected); err != nil { + if !errdefs.IsAlreadyExists(err) { + return "", err + } + if getBlobBucket(tx, nw.namespace, actual) != nil { + return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual) + } + } } bkt, err := createBlobBucket(tx, nw.namespace, actual) @@ -468,12 +581,20 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, return actual, bkt.Put(bucketKeySize, sizeEncoded) } -func (nw *namespacedWriter) Status() (content.Status, error) { - st, err := nw.Writer.Status() +func (nw *namespacedWriter) Status() (st content.Status, err error) { + if nw.w != nil { + st, err = nw.w.Status() + } else { + st.Offset = nw.size + st.Total = nw.size + st.StartedAt = nw.started + st.UpdatedAt = nw.started + st.Expected = nw.expected + } if err == nil { st.Ref = nw.ref } - return st, err + return } func (cs *contentStore) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) { @@ -574,13 +695,30 @@ func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, er continue } bbkt := cbkt.Bucket(bucketKeyObjectBlob) - if err := bbkt.ForEach(func(ck, cv []byte) error { - if cv == nil { - seen[string(ck)] = struct{}{} + if bbkt != nil { + if err := bbkt.ForEach(func(ck, cv []byte) error { + if cv == nil { + seen[string(ck)] = struct{}{} + } + return nil + }); err != nil { + return err + } + } + + ibkt := cbkt.Bucket(bucketKeyObjectIngests) + if ibkt != nil { + if err := ibkt.ForEach(func(ref, v []byte) error { + if v == nil { + expected := ibkt.Bucket(ref).Get(bucketKeyExpected) + if len(expected) > 0 { + seen[string(expected)] = struct{}{} + } + } + return nil + }); err != nil { + return err } - return nil - }); err != nil { - return err } } diff --git a/metadata/db.go b/metadata/db.go index 5e830ad82..5601b919e 100644 --- a/metadata/db.go +++ b/metadata/db.go @@ -27,7 +27,7 @@ const ( // dbVersion represents updates to the schema // version which are additions and compatible with // prior version of the same schema. - dbVersion = 1 + dbVersion = 2 ) // DB represents a metadata database backed by a bolt diff --git a/metadata/db_test.go b/metadata/db_test.go index 54519436e..879139ff8 100644 --- a/metadata/db_test.go +++ b/metadata/db_test.go @@ -86,6 +86,27 @@ func TestInit(t *testing.T) { } func TestMigrations(t *testing.T) { + testRefs := []struct { + ref string + bref string + }{ + { + ref: "k1", + bref: "bk1", + }, + { + ref: strings.Repeat("longerkey", 30), // 270 characters + bref: "short", + }, + { + ref: "short", + bref: strings.Repeat("longerkey", 30), // 270 characters + }, + { + ref: "emptykey", + bref: "", + }, + } migrationTests := []struct { name string init func(*bolt.Tx) error @@ -183,6 +204,48 @@ func TestMigrations(t *testing.T) { } } + return nil + }, + }, + { + name: "IngestUpdate", + init: func(tx *bolt.Tx) error { + bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte("testing"), bucketKeyObjectContent, deprecatedBucketKeyObjectIngest) + if err != nil { + return err + } + + for _, s := range testRefs { + if err := bkt.Put([]byte(s.ref), []byte(s.bref)); err != nil { + return err + } + } + + return nil + }, + check: func(tx *bolt.Tx) error { + bkt := getIngestsBucket(tx, "testing") + if bkt == nil { + return errors.Wrap(errdefs.ErrNotFound, "ingests bucket not found") + } + + for _, s := range testRefs { + sbkt := bkt.Bucket([]byte(s.ref)) + if sbkt == nil { + return errors.Wrap(errdefs.ErrNotFound, "ref does not exist") + } + + bref := string(sbkt.Get(bucketKeyRef)) + if bref != s.bref { + return errors.Errorf("unexpected reference key %q, expected %q", bref, s.bref) + } + } + + dbkt := getBucket(tx, bucketKeyVersion, []byte("testing"), bucketKeyObjectContent, deprecatedBucketKeyObjectIngest) + if dbkt != nil { + return errors.New("deprecated ingest bucket still exists") + } + return nil }, }, diff --git a/metadata/migrations.go b/metadata/migrations.go index 997aee298..61e5b3185 100644 --- a/metadata/migrations.go +++ b/metadata/migrations.go @@ -24,6 +24,11 @@ var migrations = []migration{ version: 1, migrate: addChildLinks, }, + { + schema: "v1", + version: 2, + migrate: migrateIngests, + }, } // addChildLinks Adds children key to the snapshotters to enforce snapshot @@ -83,3 +88,53 @@ func addChildLinks(tx *bolt.Tx) error { return nil } + +// migrateIngests moves ingests from the key/value ingest bucket +// to a structured ingest bucket for storing additional state about +// an ingest. +func migrateIngests(tx *bolt.Tx) error { + v1bkt := tx.Bucket(bucketKeyVersion) + if v1bkt == nil { + return nil + } + + // iterate through each namespace + v1c := v1bkt.Cursor() + + for k, v := v1c.First(); k != nil; k, v = v1c.Next() { + if v != nil { + continue + } + bkt := v1bkt.Bucket(k).Bucket(bucketKeyObjectContent) + if bkt == nil { + continue + } + + dbkt := bkt.Bucket(deprecatedBucketKeyObjectIngest) + if dbkt == nil { + continue + } + + // Create new ingests bucket + nbkt, err := bkt.CreateBucketIfNotExists(bucketKeyObjectIngests) + if err != nil { + return err + } + + if err := dbkt.ForEach(func(ref, bref []byte) error { + ibkt, err := nbkt.CreateBucketIfNotExists(ref) + if err != nil { + return err + } + return ibkt.Put(bucketKeyRef, bref) + }); err != nil { + return err + } + + if err := bkt.DeleteBucket(deprecatedBucketKeyObjectIngest); err != nil { + return err + } + } + + return nil +} diff --git a/services/content/service.go b/services/content/service.go index 8a55ba2f3..036db82db 100644 --- a/services/content/service.go +++ b/services/content/service.go @@ -356,7 +356,7 @@ func (s *service) Write(session api.Content_WriteServer) (err error) { // users use the same writer style for each with a minimum of overhead. if req.Expected != "" { if expected != "" && expected != req.Expected { - return status.Errorf(codes.InvalidArgument, "inconsistent digest provided: %v != %v", req.Expected, expected) + log.G(ctx).Debugf("commit digest differs from writer digest: %v != %v", req.Expected, expected) } expected = req.Expected @@ -373,7 +373,7 @@ func (s *service) Write(session api.Content_WriteServer) (err error) { // Update the expected total. Typically, this could be seen at // negotiation time or on a commit message. if total > 0 && req.Total != total { - return status.Errorf(codes.InvalidArgument, "inconsistent total provided: %v != %v", req.Total, total) + log.G(ctx).Debugf("commit size differs from writer size: %v != %v", req.Total, total) } total = req.Total }