From a9308e174d7bc20427be52dcd4c02433a6b1f8ac Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Wed, 1 Nov 2017 14:21:10 -0700 Subject: [PATCH] content/local: ensure that resumption is properly supported While early PoCs had download resumption working, we didn't have tests and had not verified the behavior. With this test suite, we now are able to show that download resumption is properly supported in the content store. In particular, there was a bug where resuming a download would not issue the writes to the correct offset in the file. A Seek was added to ensure we are writing from the current ingest offset. In this investigation, it was also discovered that using the OS/Disk created time on files was skewed from the monotonic clock in Go's runtime. The startedat values are now taken from the Go runtime and written to a separate file. Signed-off-by: Stephen J Day --- content/local/store.go | 36 +++++++++-- content/testsuite/testsuite.go | 112 ++++++++++++++++++++++++++------- 2 files changed, 120 insertions(+), 28 deletions(-) diff --git a/content/local/store.go b/content/local/store.go index 14a98881d..e4e90b39f 100644 --- a/content/local/store.go +++ b/content/local/store.go @@ -324,12 +324,25 @@ func (s *store) status(ingestPath string) (content.Status, error) { return content.Status{}, err } + startedAtP, err := ioutil.ReadFile(filepath.Join(ingestPath, "startedat")) + if err != nil { + if os.IsNotExist(err) { + err = errors.Wrap(errdefs.ErrNotFound, err.Error()) + } + return content.Status{}, err + } + + var startedAt time.Time + if err := startedAt.UnmarshalText(startedAtP); err != nil { + return content.Status{}, errors.Wrapf(err, "could not parse startedat file") + } + return content.Status{ Ref: ref, Offset: fi.Size(), Total: s.total(ingestPath), UpdatedAt: fi.ModTime(), - StartedAt: getStartTime(fi), + StartedAt: startedAt, }, nil } @@ -412,7 +425,7 @@ func (s *store) Writer(ctx context.Context, ref string, total int64, expected di return nil, errors.Errorf("provided total differs from status: %v != %v", total, status.Total) } - // slow slow slow!!, send to goroutine or use resumable hashes + // TODO(stevvooe): slow slow slow!!, send to goroutine or use resumable hashes fp, err := os.Open(data) if err != nil { return nil, err @@ -431,20 +444,29 @@ func (s *store) Writer(ctx context.Context, ref string, total int64, expected di startedAt = status.StartedAt total = status.Total } else { + startedAt = time.Now() + updatedAt = startedAt + // the ingest is new, we need to setup the target location. // write the ref to a file for later use if err := ioutil.WriteFile(refp, []byte(ref), 0666); err != nil { return nil, err } + startedAtP, err := startedAt.MarshalText() + if err != nil { + return nil, err + } + + if err := ioutil.WriteFile(filepath.Join(path, "startedat"), startedAtP, 0666); err != nil { + return nil, err + } + if total > 0 { if err := ioutil.WriteFile(filepath.Join(path, "total"), []byte(fmt.Sprint(total)), 0666); err != nil { return nil, err } } - - startedAt = time.Now() - updatedAt = startedAt } fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE, 0666) @@ -452,6 +474,10 @@ func (s *store) Writer(ctx context.Context, ref string, total int64, expected di return nil, errors.Wrap(err, "failed to open data file") } + if _, err := fp.Seek(offset, io.SeekStart); err != nil { + return nil, errors.Wrap(err, "could not seek to current write offset") + } + return &writer{ s: s, fp: fp, diff --git a/content/testsuite/testsuite.go b/content/testsuite/testsuite.go index 6a4fd2043..03ff76f03 100644 --- a/content/testsuite/testsuite.go +++ b/content/testsuite/testsuite.go @@ -16,12 +16,14 @@ import ( "github.com/containerd/containerd/testutil" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" + "github.com/stretchr/testify/require" ) // ContentSuite runs a test suite on the content store given a factory function. func ContentSuite(t *testing.T, name string, storeFn func(ctx context.Context, root string) (content.Store, func() error, error)) { t.Run("Writer", makeTest(t, name, storeFn, checkContentStoreWriter)) t.Run("UploadStatus", makeTest(t, name, storeFn, checkUploadStatus)) + t.Run("Resume", makeTest(t, name, storeFn, checkResumeWriter)) t.Run("Labels", makeTest(t, name, storeFn, checkLabels)) } @@ -135,6 +137,79 @@ func checkContentStoreWriter(ctx context.Context, t *testing.T, cs content.Store } } +func checkResumeWriter(ctx context.Context, t *testing.T, cs content.Store) { + checkWrite := func(t *testing.T, w io.Writer, p []byte) { + t.Helper() + n, err := w.Write(p) + if err != nil { + t.Fatal(err) + } + + if n != len(p) { + t.Fatal("short write to content store") + } + } + + var ( + cb, dgst = createContent(256, 1) + first, second = cb[:128], cb[128:] + ) + + preStart := time.Now() + w1, err := cs.Writer(ctx, "cb", 256, dgst) + if err != nil { + t.Fatal(err) + } + postStart := time.Now() + preUpdate := time.Now() + + checkWrite(t, w1, first) + postUpdate := time.Now() + + dgstFirst := digest.FromBytes(first) + expected := content.Status{ + Ref: "cb", + Offset: int64(len(first)), + Total: int64(len(cb)), + Expected: dgstFirst, + } + + checkStatus(t, w1, expected, dgstFirst, preStart, postStart, preUpdate, postUpdate) + require.NoError(t, w1.Close(), "close first writer") + + w2, err := cs.Writer(ctx, "cb", 256, dgst) + if err != nil { + t.Fatal(err) + } + + // status should be consistent with version before close. + checkStatus(t, w1, expected, dgstFirst, preStart, postStart, preUpdate, postUpdate) + + preUpdate = time.Now() + checkWrite(t, w2, second) + postUpdate = time.Now() + + expected.Offset = expected.Total + expected.Expected = dgst + checkStatus(t, w2, expected, dgst, preStart, postStart, preUpdate, postUpdate) + + preCommit := time.Now() + if err := w2.Commit(ctx, 0, ""); err != nil { + t.Fatalf("commit failed: %+v", err) + } + postCommit := time.Now() + + require.NoError(t, w2.Close(), "close second writer") + info := content.Info{ + Digest: dgst, + Size: 256, + } + + if err := checkInfo(ctx, cs, dgst, info, preCommit, postCommit, preCommit, postCommit); err != nil { + t.Fatalf("Check info failed: %+v", err) + } +} + func checkUploadStatus(ctx context.Context, t *testing.T, cs content.Store) { c1, d1 := createContent(256, 1) @@ -156,9 +231,7 @@ func checkUploadStatus(ctx context.Context, t *testing.T, cs content.Store) { preUpdate := preStart postUpdate := postStart - if err := checkStatus(w1, expected, d, preStart, postStart, preUpdate, postUpdate); err != nil { - t.Fatalf("Status check failed: %+v", err) - } + checkStatus(t, w1, expected, d, preStart, postStart, preUpdate, postUpdate) // Write first 64 bytes preUpdate = time.Now() @@ -168,9 +241,7 @@ func checkUploadStatus(ctx context.Context, t *testing.T, cs content.Store) { postUpdate = time.Now() expected.Offset = 64 d = digest.FromBytes(c1[:64]) - if err := checkStatus(w1, expected, d, preStart, postStart, preUpdate, postUpdate); err != nil { - t.Fatalf("Status check failed: %+v", err) - } + checkStatus(t, w1, expected, d, preStart, postStart, preUpdate, postUpdate) // Write next 128 bytes preUpdate = time.Now() @@ -180,9 +251,7 @@ func checkUploadStatus(ctx context.Context, t *testing.T, cs content.Store) { postUpdate = time.Now() expected.Offset = 192 d = digest.FromBytes(c1[:192]) - if err := checkStatus(w1, expected, d, preStart, postStart, preUpdate, postUpdate); err != nil { - t.Fatalf("Status check failed: %+v", err) - } + checkStatus(t, w1, expected, d, preStart, postStart, preUpdate, postUpdate) // Write last 64 bytes preUpdate = time.Now() @@ -191,9 +260,7 @@ func checkUploadStatus(ctx context.Context, t *testing.T, cs content.Store) { } postUpdate = time.Now() expected.Offset = 256 - if err := checkStatus(w1, expected, d1, preStart, postStart, preUpdate, postUpdate); err != nil { - t.Fatalf("Status check failed: %+v", err) - } + checkStatus(t, w1, expected, d1, preStart, postStart, preUpdate, postUpdate) preCommit := time.Now() if err := w1.Commit(ctx, 0, ""); err != nil { @@ -275,42 +342,41 @@ func checkLabels(ctx context.Context, t *testing.T, cs content.Store) { } -func checkStatus(w content.Writer, expected content.Status, d digest.Digest, preStart, postStart, preUpdate, postUpdate time.Time) error { +func checkStatus(t *testing.T, w content.Writer, expected content.Status, d digest.Digest, preStart, postStart, preUpdate, postUpdate time.Time) { + t.Helper() st, err := w.Status() if err != nil { - return errors.Wrap(err, "failed to get status") + t.Fatalf("failed to get status: %v", err) } wd := w.Digest() if wd != d { - return errors.Errorf("unexpected digest %v, expected %v", wd, d) + t.Fatalf("unexpected digest %v, expected %v", wd, d) } if st.Ref != expected.Ref { - return errors.Errorf("unexpected ref %q, expected %q", st.Ref, expected.Ref) + t.Fatalf("unexpected ref %q, expected %q", st.Ref, expected.Ref) } if st.Offset != expected.Offset { - return errors.Errorf("unexpected offset %d, expected %d", st.Offset, expected.Offset) + t.Fatalf("unexpected offset %d, expected %d", st.Offset, expected.Offset) } if st.Total != expected.Total { - return errors.Errorf("unexpected total %d, expected %d", st.Total, expected.Total) + t.Fatalf("unexpected total %d, expected %d", st.Total, expected.Total) } // TODO: Add this test once all implementations guarantee this value is held //if st.Expected != expected.Expected { - // return errors.Errorf("unexpected \"expected digest\" %q, expected %q", st.Expected, expected.Expected) + // t.Fatalf("unexpected \"expected digest\" %q, expected %q", st.Expected, expected.Expected) //} if st.StartedAt.After(postStart) || st.StartedAt.Before(preStart) { - return errors.Errorf("unexpected started at time %s, expected between %s and %s", st.StartedAt, preStart, postStart) + t.Fatalf("unexpected started at time %s, expected between %s and %s", st.StartedAt, preStart, postStart) } if st.UpdatedAt.After(postUpdate) || st.UpdatedAt.Before(preUpdate) { - return errors.Errorf("unexpected updated at time %s, expected between %s and %s", st.UpdatedAt, preUpdate, postUpdate) + t.Fatalf("unexpected updated at time %s, expected between %s and %s", st.UpdatedAt, preUpdate, postUpdate) } - - return nil } func checkInfo(ctx context.Context, cs content.Store, d digest.Digest, expected content.Info, c1, c2, u1, u2 time.Time) error {