Merge pull request #2065 from dmcgowan/content-discard-over-truncate

content: update copy to discard instead of truncate
Phil Estes 2018-01-29 11:20:17 -05:00 committed by GitHub
commit c6a7d10568
3 changed files with 134 additions and 19 deletions
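
What the change means for callers, sketched roughly (not part of the patch): content.Copy already asks the writer for its status and resumes from Status().Offset; after this commit the source no longer has to be seekable for that resume to work, because Copy falls back to discarding the already-written prefix instead of truncating the writer back to zero. The helper name resumeFromStream below is hypothetical; content.Copy, content.Writer, and digest.Digest are the containerd APIs touched by the diff.

package example

import (
    "context"
    "io"

    "github.com/containerd/containerd/content"
    "github.com/opencontainers/go-digest"
)

// resumeFromStream is a hypothetical caller: src may be a plain io.Reader,
// such as an HTTP response body, with no Seek or ReadAt support.
func resumeFromStream(ctx context.Context, cw content.Writer, src io.Reader, size int64, dgst digest.Digest) error {
    // Copy reads cw.Status().Offset; with an unseekable src it now discards
    // that many bytes from src instead of calling cw.Truncate(0).
    return content.Copy(ctx, cw, src, size, dgst)
}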

content/helpers.go

@@ -3,6 +3,7 @@ package content
 import (
     "context"
     "io"
+    "io/ioutil"
     "sync"
 
     "github.com/containerd/containerd/errdefs"
@@ -76,14 +77,7 @@ func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected dige
     if ws.Offset > 0 {
         r, err = seekReader(r, ws.Offset, size)
         if err != nil {
-            if !isUnseekable(err) {
-                return errors.Wrapf(err, "unable to resume write to %v", ws.Ref)
-            }
-
-            // reader is unseekable, try to move the writer back to the start.
-            if err := cw.Truncate(0); err != nil {
-                return errors.Wrapf(err, "content writer truncate failed")
-            }
+            return errors.Wrapf(err, "unable to resume write to %v", ws.Ref)
         }
     }
 
@@ -103,14 +97,9 @@ func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected dige
     return nil
 }
 
-var errUnseekable = errors.New("seek not supported")
-
-func isUnseekable(err error) bool {
-    return errors.Cause(err) == errUnseekable
-}
-
 // seekReader attempts to seek the reader to the given offset, either by
-// resolving `io.Seeker` or by detecting `io.ReaderAt`.
+// resolving `io.Seeker`, by detecting `io.ReaderAt`, or discarding
+// up to the given offset.
 func seekReader(r io.Reader, offset, size int64) (io.Reader, error) {
     // attempt to resolve r as a seeker and setup the offset.
     seeker, ok := r.(io.Seeker)
@@ -134,5 +123,17 @@ func seekReader(r io.Reader, offset, size int64) (io.Reader, error) {
         return sr, nil
     }
 
-    return r, errors.Wrapf(errUnseekable, "seek to offset %v failed", offset)
+    // well then, let's just discard up to the offset
+    buf := bufPool.Get().(*[]byte)
+    defer bufPool.Put(buf)
+
+    n, err := io.CopyBuffer(ioutil.Discard, io.LimitReader(r, offset), *buf)
+    if err != nil {
+        return nil, errors.Wrap(err, "failed to discard to offset")
+    }
+    if n != offset {
+        return nil, errors.Errorf("unable to discard to offset")
+    }
+
+    return r, nil
 }
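
For reference, here is the discard fallback in isolation. This is a minimal standalone sketch rather than the code above: it uses a locally allocated buffer where helpers.go draws one from the package bufPool, and the function name discardToOffset is made up for the example.

package main

import (
    "errors"
    "fmt"
    "io"
    "io/ioutil"
    "strings"
)

// discardToOffset consumes and throws away the first offset bytes of r so
// that the remaining stream lines up with a writer's resume point.
func discardToOffset(r io.Reader, offset int64) (io.Reader, error) {
    buf := make([]byte, 32*1024)
    n, err := io.CopyBuffer(ioutil.Discard, io.LimitReader(r, offset), buf)
    if err != nil {
        return nil, err
    }
    if n != offset {
        return nil, errors.New("reader ended before reaching offset")
    }
    return r, nil
}

func main() {
    r, err := discardToOffset(strings.NewReader("foobar"), 3)
    if err != nil {
        panic(err)
    }
    rest, _ := ioutil.ReadAll(r)
    fmt.Println(string(rest)) // prints "bar"
}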

content/helpers_test.go

@@ -42,9 +42,9 @@ func TestCopy(t *testing.T) {
     },
     {
         name:     "copy with offset from unseekable source",
-        source:   copySource{reader: bytes.NewBufferString("foo"), size: 3},
-        writer:   fakeWriter{status: Status{Offset: 8}},
-        expected: "foo",
+        source:   copySource{reader: bytes.NewBufferString("foobar"), size: 6},
+        writer:   fakeWriter{status: Status{Offset: 3}},
+        expected: "bar",
     },
     {
         name: "commit already exists",
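
The reworked fixture only makes sense if the source really is unseekable: *bytes.Buffer implements io.Reader but neither io.Seeker nor io.ReaderAt, which is why this case reaches the new discard path. A quick standalone check (standard library only, not from the patch):

package main

import (
    "bytes"
    "fmt"
    "io"
)

func main() {
    var r io.Reader = bytes.NewBufferString("foobar")
    _, seekable := r.(io.Seeker)
    _, readerAt := r.(io.ReaderAt)
    // Both prints are false, so Copy must discard the first Offset bytes.
    fmt.Println(seekable, readerAt)
}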

content/testsuite/testsuite.go

@@ -3,6 +3,7 @@ package testsuite
 import (
     "bytes"
     "context"
+    "fmt"
     "io"
     "io/ioutil"
     "math/rand"
@@ -24,6 +25,11 @@ func ContentSuite(t *testing.T, name string, storeFn func(ctx context.Context, r
     t.Run("Writer", makeTest(t, name, storeFn, checkContentStoreWriter))
     t.Run("UploadStatus", makeTest(t, name, storeFn, checkUploadStatus))
     t.Run("Resume", makeTest(t, name, storeFn, checkResumeWriter))
+    t.Run("ResumeTruncate", makeTest(t, name, storeFn, checkResume(resumeTruncate)))
+    t.Run("ResumeDiscard", makeTest(t, name, storeFn, checkResume(resumeDiscard)))
+    t.Run("ResumeCopy", makeTest(t, name, storeFn, checkResume(resumeCopy)))
+    t.Run("ResumeCopySeeker", makeTest(t, name, storeFn, checkResume(resumeCopySeeker)))
+    t.Run("ResumeCopyReaderAt", makeTest(t, name, storeFn, checkResume(resumeCopyReaderAt)))
     t.Run("Labels", makeTest(t, name, storeFn, checkLabels))
 }
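
The five new subtests drive the same resume scenario through different strategies; the resumeCopy* helpers in the next hunk control which strategy content.Copy can take by embedding only the interface they want to expose in an anonymous struct. A minimal illustration of that embedding trick (standard library only, not from the patch):

package main

import (
    "bytes"
    "fmt"
    "io"
)

func main() {
    br := bytes.NewReader([]byte("data")) // implements Reader, Seeker, and ReaderAt

    // Embedding only io.Reader hides Seek and ReadAt from type assertions.
    var r io.Reader = struct{ io.Reader }{br}
    _, canSeek := r.(io.Seeker)
    fmt.Println(canSeek) // false: Copy would fall back to discarding

    // Embedding io.ReadSeeker exposes Seek again.
    r = struct{ io.ReadSeeker }{br}
    _, canSeek = r.(io.Seeker)
    fmt.Println(canSeek) // true: Copy can seek to the resume offset
}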
@@ -352,6 +358,114 @@ func checkLabels(ctx context.Context, t *testing.T, cs content.Store) {
 }
 
+func checkResume(rf func(context.Context, content.Writer, []byte, int64, int64, digest.Digest) error) func(ctx context.Context, t *testing.T, cs content.Store) {
+    return func(ctx context.Context, t *testing.T, cs content.Store) {
+        sizes := []int64{500, 5000, 50000}
+        truncations := []float64{0.0, 0.1, 0.5, 0.9, 1.0}
+
+        for i, size := range sizes {
+            for j, tp := range truncations {
+                b, d := createContent(size, int64(i*len(truncations)+j))
+                limit := int64(float64(size) * tp)
+                ref := fmt.Sprintf("ref-%d-%d", i, j)
+
+                w, err := cs.Writer(ctx, ref, size, d)
+                if err != nil {
+                    t.Fatal(err)
+                }
+
+                if _, err := w.Write(b[:limit]); err != nil {
+                    w.Close()
+                    t.Fatal(err)
+                }
+
+                if err := w.Close(); err != nil {
+                    t.Fatal(err)
+                }
+
+                w, err = cs.Writer(ctx, ref, size, d)
+                if err != nil {
+                    t.Fatal(err)
+                }
+
+                st, err := w.Status()
+                if err != nil {
+                    w.Close()
+                    t.Fatal(err)
+                }
+
+                if st.Offset != limit {
+                    w.Close()
+                    t.Fatalf("Unexpected offset %d, expected %d", st.Offset, limit)
+                }
+
+                preCommit := time.Now()
+                if err := rf(ctx, w, b, limit, size, d); err != nil {
+                    t.Fatalf("Resume failed: %+v", err)
+                }
+                postCommit := time.Now()
+
+                if err := w.Close(); err != nil {
+                    t.Fatal(err)
+                }
+
+                info := content.Info{
+                    Digest: d,
+                    Size:   size,
+                }
+                if err := checkInfo(ctx, cs, d, info, preCommit, postCommit, preCommit, postCommit); err != nil {
+                    t.Fatalf("Check info failed: %+v", err)
+                }
+            }
+        }
+    }
+}
+
+func resumeTruncate(ctx context.Context, w content.Writer, b []byte, written, size int64, dgst digest.Digest) error {
+    if err := w.Truncate(0); err != nil {
+        return errors.Wrap(err, "truncate failed")
+    }
+
+    if _, err := io.CopyBuffer(w, bytes.NewReader(b), make([]byte, 1024)); err != nil {
+        return errors.Wrap(err, "write failed")
+    }
+    return errors.Wrap(w.Commit(ctx, size, dgst), "commit failed")
+}
+
+func resumeDiscard(ctx context.Context, w content.Writer, b []byte, written, size int64, dgst digest.Digest) error {
+    if _, err := io.CopyBuffer(w, bytes.NewReader(b[written:]), make([]byte, 1024)); err != nil {
+        return errors.Wrap(err, "write failed")
+    }
+    return errors.Wrap(w.Commit(ctx, size, dgst), "commit failed")
+}
+
+func resumeCopy(ctx context.Context, w content.Writer, b []byte, _, size int64, dgst digest.Digest) error {
+    r := struct {
+        io.Reader
+    }{bytes.NewReader(b)}
+    return errors.Wrap(content.Copy(ctx, w, r, size, dgst), "copy failed")
+}
+
+func resumeCopySeeker(ctx context.Context, w content.Writer, b []byte, _, size int64, dgst digest.Digest) error {
+    r := struct {
+        io.ReadSeeker
+    }{bytes.NewReader(b)}
+    return errors.Wrap(content.Copy(ctx, w, r, size, dgst), "copy failed")
+}
+
+func resumeCopyReaderAt(ctx context.Context, w content.Writer, b []byte, _, size int64, dgst digest.Digest) error {
+    type readerAt interface {
+        io.Reader
+        io.ReaderAt
+    }
+    r := struct {
+        readerAt
+    }{bytes.NewReader(b)}
+    return errors.Wrap(content.Copy(ctx, w, r, size, dgst), "copy failed")
+}
+
 func checkStatus(t *testing.T, w content.Writer, expected content.Status, d digest.Digest, preStart, postStart, preUpdate, postUpdate time.Time) {
     t.Helper()
 
     st, err := w.Status()