diff --git a/content/helpers.go b/content/helpers.go index 83c31d917..c125e2a2b 100644 --- a/content/helpers.go +++ b/content/helpers.go @@ -3,6 +3,7 @@ package content import ( "context" "io" + "io/ioutil" "sync" "github.com/containerd/containerd/errdefs" @@ -76,14 +77,7 @@ func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected dige if ws.Offset > 0 { r, err = seekReader(r, ws.Offset, size) if err != nil { - if !isUnseekable(err) { - return errors.Wrapf(err, "unable to resume write to %v", ws.Ref) - } - - // reader is unseekable, try to move the writer back to the start. - if err := cw.Truncate(0); err != nil { - return errors.Wrapf(err, "content writer truncate failed") - } + return errors.Wrapf(err, "unable to resume write to %v", ws.Ref) } } @@ -103,14 +97,9 @@ func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected dige return nil } -var errUnseekable = errors.New("seek not supported") - -func isUnseekable(err error) bool { - return errors.Cause(err) == errUnseekable -} - // seekReader attempts to seek the reader to the given offset, either by -// resolving `io.Seeker` or by detecting `io.ReaderAt`. +// resolving `io.Seeker`, by detecting `io.ReaderAt`, or discarding +// up to the given offset. func seekReader(r io.Reader, offset, size int64) (io.Reader, error) { // attempt to resolve r as a seeker and setup the offset. seeker, ok := r.(io.Seeker) @@ -134,5 +123,17 @@ func seekReader(r io.Reader, offset, size int64) (io.Reader, error) { return sr, nil } - return r, errors.Wrapf(errUnseekable, "seek to offset %v failed", offset) + // well then, let's just discard up to the offset + buf := bufPool.Get().(*[]byte) + defer bufPool.Put(buf) + + n, err := io.CopyBuffer(ioutil.Discard, io.LimitReader(r, offset), *buf) + if err != nil { + return nil, errors.Wrap(err, "failed to discard to offset") + } + if n != offset { + return nil, errors.Errorf("unable to discard to offset") + } + + return r, nil } diff --git a/content/helpers_test.go b/content/helpers_test.go index b9de4dcb2..4daccfd8f 100644 --- a/content/helpers_test.go +++ b/content/helpers_test.go @@ -42,9 +42,9 @@ func TestCopy(t *testing.T) { }, { name: "copy with offset from unseekable source", - source: copySource{reader: bytes.NewBufferString("foo"), size: 3}, - writer: fakeWriter{status: Status{Offset: 8}}, - expected: "foo", + source: copySource{reader: bytes.NewBufferString("foobar"), size: 6}, + writer: fakeWriter{status: Status{Offset: 3}}, + expected: "bar", }, { name: "commit already exists", diff --git a/content/testsuite/testsuite.go b/content/testsuite/testsuite.go index 14392da7c..7292d48ac 100644 --- a/content/testsuite/testsuite.go +++ b/content/testsuite/testsuite.go @@ -3,6 +3,7 @@ package testsuite import ( "bytes" "context" + "fmt" "io" "io/ioutil" "math/rand" @@ -24,6 +25,11 @@ func ContentSuite(t *testing.T, name string, storeFn func(ctx context.Context, r t.Run("Writer", makeTest(t, name, storeFn, checkContentStoreWriter)) t.Run("UploadStatus", makeTest(t, name, storeFn, checkUploadStatus)) t.Run("Resume", makeTest(t, name, storeFn, checkResumeWriter)) + t.Run("ResumeTruncate", makeTest(t, name, storeFn, checkResume(resumeTruncate))) + t.Run("ResumeDiscard", makeTest(t, name, storeFn, checkResume(resumeDiscard))) + t.Run("ResumeCopy", makeTest(t, name, storeFn, checkResume(resumeCopy))) + t.Run("ResumeCopySeeker", makeTest(t, name, storeFn, checkResume(resumeCopySeeker))) + t.Run("ResumeCopyReaderAt", makeTest(t, name, storeFn, checkResume(resumeCopyReaderAt))) t.Run("Labels", makeTest(t, name, storeFn, checkLabels)) } @@ -352,6 +358,114 @@ func checkLabels(ctx context.Context, t *testing.T, cs content.Store) { } +func checkResume(rf func(context.Context, content.Writer, []byte, int64, int64, digest.Digest) error) func(ctx context.Context, t *testing.T, cs content.Store) { + return func(ctx context.Context, t *testing.T, cs content.Store) { + sizes := []int64{500, 5000, 50000} + truncations := []float64{0.0, 0.1, 0.5, 0.9, 1.0} + + for i, size := range sizes { + for j, tp := range truncations { + b, d := createContent(size, int64(i*len(truncations)+j)) + limit := int64(float64(size) * tp) + ref := fmt.Sprintf("ref-%d-%d", i, j) + + w, err := cs.Writer(ctx, ref, size, d) + if err != nil { + t.Fatal(err) + } + + if _, err := w.Write(b[:limit]); err != nil { + w.Close() + t.Fatal(err) + } + + if err := w.Close(); err != nil { + t.Fatal(err) + } + + w, err = cs.Writer(ctx, ref, size, d) + if err != nil { + t.Fatal(err) + } + + st, err := w.Status() + if err != nil { + w.Close() + t.Fatal(err) + } + + if st.Offset != limit { + w.Close() + t.Fatalf("Unexpected offset %d, expected %d", st.Offset, limit) + } + + preCommit := time.Now() + if err := rf(ctx, w, b, limit, size, d); err != nil { + t.Fatalf("Resume failed: %+v", err) + } + postCommit := time.Now() + + if err := w.Close(); err != nil { + t.Fatal(err) + } + + info := content.Info{ + Digest: d, + Size: size, + } + + if err := checkInfo(ctx, cs, d, info, preCommit, postCommit, preCommit, postCommit); err != nil { + t.Fatalf("Check info failed: %+v", err) + } + } + } + } +} + +func resumeTruncate(ctx context.Context, w content.Writer, b []byte, written, size int64, dgst digest.Digest) error { + if err := w.Truncate(0); err != nil { + return errors.Wrap(err, "truncate failed") + } + + if _, err := io.CopyBuffer(w, bytes.NewReader(b), make([]byte, 1024)); err != nil { + return errors.Wrap(err, "write failed") + } + + return errors.Wrap(w.Commit(ctx, size, dgst), "commit failed") +} + +func resumeDiscard(ctx context.Context, w content.Writer, b []byte, written, size int64, dgst digest.Digest) error { + if _, err := io.CopyBuffer(w, bytes.NewReader(b[written:]), make([]byte, 1024)); err != nil { + return errors.Wrap(err, "write failed") + } + return errors.Wrap(w.Commit(ctx, size, dgst), "commit failed") +} + +func resumeCopy(ctx context.Context, w content.Writer, b []byte, _, size int64, dgst digest.Digest) error { + r := struct { + io.Reader + }{bytes.NewReader(b)} + return errors.Wrap(content.Copy(ctx, w, r, size, dgst), "copy failed") +} + +func resumeCopySeeker(ctx context.Context, w content.Writer, b []byte, _, size int64, dgst digest.Digest) error { + r := struct { + io.ReadSeeker + }{bytes.NewReader(b)} + return errors.Wrap(content.Copy(ctx, w, r, size, dgst), "copy failed") +} + +func resumeCopyReaderAt(ctx context.Context, w content.Writer, b []byte, _, size int64, dgst digest.Digest) error { + type readerAt interface { + io.Reader + io.ReaderAt + } + r := struct { + readerAt + }{bytes.NewReader(b)} + return errors.Wrap(content.Copy(ctx, w, r, size, dgst), "copy failed") +} + func checkStatus(t *testing.T, w content.Writer, expected content.Status, d digest.Digest, preStart, postStart, preUpdate, postUpdate time.Time) { t.Helper() st, err := w.Status()