From 02d737f9674792da1cc1c531ae0165e076e8645e Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Thu, 25 Jan 2018 17:11:46 -0800 Subject: [PATCH 1/2] Add resume content test cases Signed-off-by: Derek McGowan --- content/testsuite/testsuite.go | 115 +++++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) diff --git a/content/testsuite/testsuite.go b/content/testsuite/testsuite.go index 14392da7c..8d6f35de5 100644 --- a/content/testsuite/testsuite.go +++ b/content/testsuite/testsuite.go @@ -3,6 +3,7 @@ package testsuite import ( "bytes" "context" + "fmt" "io" "io/ioutil" "math/rand" @@ -24,6 +25,11 @@ func ContentSuite(t *testing.T, name string, storeFn func(ctx context.Context, r t.Run("Writer", makeTest(t, name, storeFn, checkContentStoreWriter)) t.Run("UploadStatus", makeTest(t, name, storeFn, checkUploadStatus)) t.Run("Resume", makeTest(t, name, storeFn, checkResumeWriter)) + t.Run("ResumeTruncate", makeTest(t, name, storeFn, checkResume(resumeTruncate))) + t.Run("ResumeDiscard", makeTest(t, name, storeFn, checkResume(resumeDiscard))) + t.Run("ResumeCopy", makeTest(t, name, storeFn, checkResume(resumeCopy))) + t.Run("ResumeCopySeeker", makeTest(t, name, storeFn, checkResume(resumeCopySeeker))) + t.Run("ResumeCopyReaderAt", makeTest(t, name, storeFn, checkResume(resumeCopyReaderAt))) t.Run("Labels", makeTest(t, name, storeFn, checkLabels)) } @@ -352,6 +358,115 @@ func checkLabels(ctx context.Context, t *testing.T, cs content.Store) { } +func checkResume(rf func(context.Context, content.Writer, []byte, int64, int64, digest.Digest) error) func(ctx context.Context, t *testing.T, cs content.Store) { + return func(ctx context.Context, t *testing.T, cs content.Store) { + sizes := []int64{500, 5000, 50000} + truncations := []float64{0.0, 0.1, 0.5, 0.9, 1.0} + + for i, size := range sizes { + for j, tp := range truncations { + b, d := createContent(size, int64(i*len(truncations)+j)) + limit := int64(float64(size) * tp) + ref := fmt.Sprintf("ref-%d-%d", i, j) + + w, err := cs.Writer(ctx, ref, size, d) + if err != nil { + t.Fatal(err) + } + + if _, err := w.Write(b[:limit]); err != nil { + w.Close() + t.Fatal(err) + } + + if err := w.Close(); err != nil { + t.Fatal(err) + } + + w, err = cs.Writer(ctx, ref, size, d) + if err != nil { + t.Fatal(err) + } + + st, err := w.Status() + if err != nil { + w.Close() + t.Fatal(err) + } + + if st.Offset != limit { + w.Close() + t.Fatalf("Unexpected offset %d, expected %d", st.Offset, limit) + } + + preCommit := time.Now() + if err := rf(ctx, w, b, limit, size, d); err != nil { + t.Fatalf("Resume failed: %+v", err) + } + + postCommit := time.Now() + + if err := w.Close(); err != nil { + t.Fatal(err) + } + + info := content.Info{ + Digest: d, + Size: size, + } + + if err := checkInfo(ctx, cs, d, info, preCommit, postCommit, preCommit, postCommit); err != nil { + t.Fatalf("Check info failed: %+v", err) + } + } + } + } +} + +func resumeTruncate(ctx context.Context, w content.Writer, b []byte, written, size int64, dgst digest.Digest) error { + if err := w.Truncate(0); err != nil { + return errors.Wrap(err, "truncate failed") + } + + if _, err := io.CopyBuffer(w, bytes.NewReader(b), make([]byte, 1024)); err != nil { + return errors.Wrap(err, "write failed") + } + + return errors.Wrap(w.Commit(ctx, size, dgst), "commit failed") +} + +func resumeDiscard(ctx context.Context, w content.Writer, b []byte, written, size int64, dgst digest.Digest) error { + if _, err := io.CopyBuffer(w, bytes.NewReader(b[written:]), make([]byte, 1024)); err != nil { + return errors.Wrap(err, "write failed") + } + return errors.Wrap(w.Commit(ctx, size, dgst), "commit failed") +} + +func resumeCopy(ctx context.Context, w content.Writer, b []byte, _, size int64, dgst digest.Digest) error { + r := struct { + io.Reader + }{bytes.NewReader(b)} + return errors.Wrap(content.Copy(ctx, w, r, size, dgst), "copy failed") +} + +func resumeCopySeeker(ctx context.Context, w content.Writer, b []byte, _, size int64, dgst digest.Digest) error { + r := struct { + io.ReadSeeker + }{bytes.NewReader(b)} + return errors.Wrap(content.Copy(ctx, w, r, size, dgst), "copy failed") +} + +func resumeCopyReaderAt(ctx context.Context, w content.Writer, b []byte, _, size int64, dgst digest.Digest) error { + type readerAt interface { + io.Reader + io.ReaderAt + } + r := struct { + readerAt + }{bytes.NewReader(b)} + return errors.Wrap(content.Copy(ctx, w, r, size, dgst), "copy failed") +} + func checkStatus(t *testing.T, w content.Writer, expected content.Status, d digest.Digest, preStart, postStart, preUpdate, postUpdate time.Time) { t.Helper() st, err := w.Status() From a12f493bd316adbf48aacdb453c64f365fe89cb0 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Thu, 25 Jan 2018 17:15:32 -0800 Subject: [PATCH 2/2] Update copy to discard over truncate Prevents the copy method from calling discard on the writer when the reader is not seekable. Instead, the copy method will discard up to the offset. Truncate is a more expensive operation since any bytes that are truncated already have their hash calculated and are stored on disk in the backend. Re-writing bytes which were truncated requires transfering the data over GRPC again and re-computing the hash up to the point of truncation. Signed-off-by: Derek McGowan --- content/helpers.go | 33 +++++++++++++++++---------------- content/helpers_test.go | 6 +++--- content/testsuite/testsuite.go | 1 - 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/content/helpers.go b/content/helpers.go index 83c31d917..c125e2a2b 100644 --- a/content/helpers.go +++ b/content/helpers.go @@ -3,6 +3,7 @@ package content import ( "context" "io" + "io/ioutil" "sync" "github.com/containerd/containerd/errdefs" @@ -76,14 +77,7 @@ func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected dige if ws.Offset > 0 { r, err = seekReader(r, ws.Offset, size) if err != nil { - if !isUnseekable(err) { - return errors.Wrapf(err, "unable to resume write to %v", ws.Ref) - } - - // reader is unseekable, try to move the writer back to the start. - if err := cw.Truncate(0); err != nil { - return errors.Wrapf(err, "content writer truncate failed") - } + return errors.Wrapf(err, "unable to resume write to %v", ws.Ref) } } @@ -103,14 +97,9 @@ func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected dige return nil } -var errUnseekable = errors.New("seek not supported") - -func isUnseekable(err error) bool { - return errors.Cause(err) == errUnseekable -} - // seekReader attempts to seek the reader to the given offset, either by -// resolving `io.Seeker` or by detecting `io.ReaderAt`. +// resolving `io.Seeker`, by detecting `io.ReaderAt`, or discarding +// up to the given offset. func seekReader(r io.Reader, offset, size int64) (io.Reader, error) { // attempt to resolve r as a seeker and setup the offset. seeker, ok := r.(io.Seeker) @@ -134,5 +123,17 @@ func seekReader(r io.Reader, offset, size int64) (io.Reader, error) { return sr, nil } - return r, errors.Wrapf(errUnseekable, "seek to offset %v failed", offset) + // well then, let's just discard up to the offset + buf := bufPool.Get().(*[]byte) + defer bufPool.Put(buf) + + n, err := io.CopyBuffer(ioutil.Discard, io.LimitReader(r, offset), *buf) + if err != nil { + return nil, errors.Wrap(err, "failed to discard to offset") + } + if n != offset { + return nil, errors.Errorf("unable to discard to offset") + } + + return r, nil } diff --git a/content/helpers_test.go b/content/helpers_test.go index b9de4dcb2..4daccfd8f 100644 --- a/content/helpers_test.go +++ b/content/helpers_test.go @@ -42,9 +42,9 @@ func TestCopy(t *testing.T) { }, { name: "copy with offset from unseekable source", - source: copySource{reader: bytes.NewBufferString("foo"), size: 3}, - writer: fakeWriter{status: Status{Offset: 8}}, - expected: "foo", + source: copySource{reader: bytes.NewBufferString("foobar"), size: 6}, + writer: fakeWriter{status: Status{Offset: 3}}, + expected: "bar", }, { name: "commit already exists", diff --git a/content/testsuite/testsuite.go b/content/testsuite/testsuite.go index 8d6f35de5..7292d48ac 100644 --- a/content/testsuite/testsuite.go +++ b/content/testsuite/testsuite.go @@ -403,7 +403,6 @@ func checkResume(rf func(context.Context, content.Writer, []byte, int64, int64, if err := rf(ctx, w, b, limit, size, d); err != nil { t.Fatalf("Resume failed: %+v", err) } - postCommit := time.Now() if err := w.Close(); err != nil {