diff --git a/content/local/store.go b/content/local/store.go index 14a98881d..e4e90b39f 100644 --- a/content/local/store.go +++ b/content/local/store.go @@ -324,12 +324,25 @@ func (s *store) status(ingestPath string) (content.Status, error) { return content.Status{}, err } + startedAtP, err := ioutil.ReadFile(filepath.Join(ingestPath, "startedat")) + if err != nil { + if os.IsNotExist(err) { + err = errors.Wrap(errdefs.ErrNotFound, err.Error()) + } + return content.Status{}, err + } + + var startedAt time.Time + if err := startedAt.UnmarshalText(startedAtP); err != nil { + return content.Status{}, errors.Wrapf(err, "could not parse startedat file") + } + return content.Status{ Ref: ref, Offset: fi.Size(), Total: s.total(ingestPath), UpdatedAt: fi.ModTime(), - StartedAt: getStartTime(fi), + StartedAt: startedAt, }, nil } @@ -412,7 +425,7 @@ func (s *store) Writer(ctx context.Context, ref string, total int64, expected di return nil, errors.Errorf("provided total differs from status: %v != %v", total, status.Total) } - // slow slow slow!!, send to goroutine or use resumable hashes + // TODO(stevvooe): slow slow slow!!, send to goroutine or use resumable hashes fp, err := os.Open(data) if err != nil { return nil, err @@ -431,20 +444,29 @@ func (s *store) Writer(ctx context.Context, ref string, total int64, expected di startedAt = status.StartedAt total = status.Total } else { + startedAt = time.Now() + updatedAt = startedAt + // the ingest is new, we need to setup the target location. // write the ref to a file for later use if err := ioutil.WriteFile(refp, []byte(ref), 0666); err != nil { return nil, err } + startedAtP, err := startedAt.MarshalText() + if err != nil { + return nil, err + } + + if err := ioutil.WriteFile(filepath.Join(path, "startedat"), startedAtP, 0666); err != nil { + return nil, err + } + if total > 0 { if err := ioutil.WriteFile(filepath.Join(path, "total"), []byte(fmt.Sprint(total)), 0666); err != nil { return nil, err } } - - startedAt = time.Now() - updatedAt = startedAt } fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE, 0666) @@ -452,6 +474,10 @@ func (s *store) Writer(ctx context.Context, ref string, total int64, expected di return nil, errors.Wrap(err, "failed to open data file") } + if _, err := fp.Seek(offset, io.SeekStart); err != nil { + return nil, errors.Wrap(err, "could not seek to current write offset") + } + return &writer{ s: s, fp: fp, diff --git a/content/testsuite/testsuite.go b/content/testsuite/testsuite.go index 6a4fd2043..03ff76f03 100644 --- a/content/testsuite/testsuite.go +++ b/content/testsuite/testsuite.go @@ -16,12 +16,14 @@ import ( "github.com/containerd/containerd/testutil" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" + "github.com/stretchr/testify/require" ) // ContentSuite runs a test suite on the content store given a factory function. func ContentSuite(t *testing.T, name string, storeFn func(ctx context.Context, root string) (content.Store, func() error, error)) { t.Run("Writer", makeTest(t, name, storeFn, checkContentStoreWriter)) t.Run("UploadStatus", makeTest(t, name, storeFn, checkUploadStatus)) + t.Run("Resume", makeTest(t, name, storeFn, checkResumeWriter)) t.Run("Labels", makeTest(t, name, storeFn, checkLabels)) } @@ -135,6 +137,79 @@ func checkContentStoreWriter(ctx context.Context, t *testing.T, cs content.Store } } +func checkResumeWriter(ctx context.Context, t *testing.T, cs content.Store) { + checkWrite := func(t *testing.T, w io.Writer, p []byte) { + t.Helper() + n, err := w.Write(p) + if err != nil { + t.Fatal(err) + } + + if n != len(p) { + t.Fatal("short write to content store") + } + } + + var ( + cb, dgst = createContent(256, 1) + first, second = cb[:128], cb[128:] + ) + + preStart := time.Now() + w1, err := cs.Writer(ctx, "cb", 256, dgst) + if err != nil { + t.Fatal(err) + } + postStart := time.Now() + preUpdate := time.Now() + + checkWrite(t, w1, first) + postUpdate := time.Now() + + dgstFirst := digest.FromBytes(first) + expected := content.Status{ + Ref: "cb", + Offset: int64(len(first)), + Total: int64(len(cb)), + Expected: dgstFirst, + } + + checkStatus(t, w1, expected, dgstFirst, preStart, postStart, preUpdate, postUpdate) + require.NoError(t, w1.Close(), "close first writer") + + w2, err := cs.Writer(ctx, "cb", 256, dgst) + if err != nil { + t.Fatal(err) + } + + // status should be consistent with version before close. + checkStatus(t, w1, expected, dgstFirst, preStart, postStart, preUpdate, postUpdate) + + preUpdate = time.Now() + checkWrite(t, w2, second) + postUpdate = time.Now() + + expected.Offset = expected.Total + expected.Expected = dgst + checkStatus(t, w2, expected, dgst, preStart, postStart, preUpdate, postUpdate) + + preCommit := time.Now() + if err := w2.Commit(ctx, 0, ""); err != nil { + t.Fatalf("commit failed: %+v", err) + } + postCommit := time.Now() + + require.NoError(t, w2.Close(), "close second writer") + info := content.Info{ + Digest: dgst, + Size: 256, + } + + if err := checkInfo(ctx, cs, dgst, info, preCommit, postCommit, preCommit, postCommit); err != nil { + t.Fatalf("Check info failed: %+v", err) + } +} + func checkUploadStatus(ctx context.Context, t *testing.T, cs content.Store) { c1, d1 := createContent(256, 1) @@ -156,9 +231,7 @@ func checkUploadStatus(ctx context.Context, t *testing.T, cs content.Store) { preUpdate := preStart postUpdate := postStart - if err := checkStatus(w1, expected, d, preStart, postStart, preUpdate, postUpdate); err != nil { - t.Fatalf("Status check failed: %+v", err) - } + checkStatus(t, w1, expected, d, preStart, postStart, preUpdate, postUpdate) // Write first 64 bytes preUpdate = time.Now() @@ -168,9 +241,7 @@ func checkUploadStatus(ctx context.Context, t *testing.T, cs content.Store) { postUpdate = time.Now() expected.Offset = 64 d = digest.FromBytes(c1[:64]) - if err := checkStatus(w1, expected, d, preStart, postStart, preUpdate, postUpdate); err != nil { - t.Fatalf("Status check failed: %+v", err) - } + checkStatus(t, w1, expected, d, preStart, postStart, preUpdate, postUpdate) // Write next 128 bytes preUpdate = time.Now() @@ -180,9 +251,7 @@ func checkUploadStatus(ctx context.Context, t *testing.T, cs content.Store) { postUpdate = time.Now() expected.Offset = 192 d = digest.FromBytes(c1[:192]) - if err := checkStatus(w1, expected, d, preStart, postStart, preUpdate, postUpdate); err != nil { - t.Fatalf("Status check failed: %+v", err) - } + checkStatus(t, w1, expected, d, preStart, postStart, preUpdate, postUpdate) // Write last 64 bytes preUpdate = time.Now() @@ -191,9 +260,7 @@ func checkUploadStatus(ctx context.Context, t *testing.T, cs content.Store) { } postUpdate = time.Now() expected.Offset = 256 - if err := checkStatus(w1, expected, d1, preStart, postStart, preUpdate, postUpdate); err != nil { - t.Fatalf("Status check failed: %+v", err) - } + checkStatus(t, w1, expected, d1, preStart, postStart, preUpdate, postUpdate) preCommit := time.Now() if err := w1.Commit(ctx, 0, ""); err != nil { @@ -275,42 +342,41 @@ func checkLabels(ctx context.Context, t *testing.T, cs content.Store) { } -func checkStatus(w content.Writer, expected content.Status, d digest.Digest, preStart, postStart, preUpdate, postUpdate time.Time) error { +func checkStatus(t *testing.T, w content.Writer, expected content.Status, d digest.Digest, preStart, postStart, preUpdate, postUpdate time.Time) { + t.Helper() st, err := w.Status() if err != nil { - return errors.Wrap(err, "failed to get status") + t.Fatalf("failed to get status: %v", err) } wd := w.Digest() if wd != d { - return errors.Errorf("unexpected digest %v, expected %v", wd, d) + t.Fatalf("unexpected digest %v, expected %v", wd, d) } if st.Ref != expected.Ref { - return errors.Errorf("unexpected ref %q, expected %q", st.Ref, expected.Ref) + t.Fatalf("unexpected ref %q, expected %q", st.Ref, expected.Ref) } if st.Offset != expected.Offset { - return errors.Errorf("unexpected offset %d, expected %d", st.Offset, expected.Offset) + t.Fatalf("unexpected offset %d, expected %d", st.Offset, expected.Offset) } if st.Total != expected.Total { - return errors.Errorf("unexpected total %d, expected %d", st.Total, expected.Total) + t.Fatalf("unexpected total %d, expected %d", st.Total, expected.Total) } // TODO: Add this test once all implementations guarantee this value is held //if st.Expected != expected.Expected { - // return errors.Errorf("unexpected \"expected digest\" %q, expected %q", st.Expected, expected.Expected) + // t.Fatalf("unexpected \"expected digest\" %q, expected %q", st.Expected, expected.Expected) //} if st.StartedAt.After(postStart) || st.StartedAt.Before(preStart) { - return errors.Errorf("unexpected started at time %s, expected between %s and %s", st.StartedAt, preStart, postStart) + t.Fatalf("unexpected started at time %s, expected between %s and %s", st.StartedAt, preStart, postStart) } if st.UpdatedAt.After(postUpdate) || st.UpdatedAt.Before(preUpdate) { - return errors.Errorf("unexpected updated at time %s, expected between %s and %s", st.UpdatedAt, preUpdate, postUpdate) + t.Fatalf("unexpected updated at time %s, expected between %s and %s", st.UpdatedAt, preUpdate, postUpdate) } - - return nil } func checkInfo(ctx context.Context, cs content.Store, d digest.Digest, expected content.Info, c1, c2, u1, u2 time.Time) error {