content/local: ensure that resumption is properly supported

While early PoCs had download resumption working, we didn't have tests
and had not verified the behavior. With this test suite, we are now
able to show that download resumption is properly supported in the
content store. In particular, there was a bug where resuming a download
would not issue writes at the correct offset in the file. A Seek was
added to ensure that we write from the current ingest offset.
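
(Illustrative-only sketch, not the store's actual code: the gist of the
fix is to reopen the partially written data file and Seek to the bytes
already on disk before accepting more writes; otherwise the resumed
write clobbers the file from offset 0. The path and function name below
are made up for the example.)

package main

import (
	"io"
	"log"
	"os"
)

// resumeWrite appends chunk starting at the size already on disk,
// mirroring the Seek the content store writer now performs.
func resumeWrite(path string, chunk []byte) error {
	fp, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0666)
	if err != nil {
		return err
	}
	defer fp.Close()

	fi, err := fp.Stat()
	if err != nil {
		return err
	}

	// Without this Seek, writes land at offset 0 and overwrite the
	// partially ingested data instead of continuing from it.
	if _, err := fp.Seek(fi.Size(), io.SeekStart); err != nil {
		return err
	}

	_, err = fp.Write(chunk)
	return err
}

func main() {
	if err := resumeWrite("/tmp/ingest-data", []byte("more bytes")); err != nil {
		log.Fatal(err)
	}
}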

During this investigation, it was also discovered that the OS/disk
creation time on files was skewed relative to the monotonic clock in
Go's runtime. The startedat values are now taken from the Go runtime
and written to a separate file.
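
(Illustrative-only sketch, not the store's actual code: the idea is to
capture the start time from Go's own clock, persist it as text in a
dedicated "startedat" file, and parse it back when reporting status,
rather than deriving it from file creation/modification times. The
path used below is made up for the example.)

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"time"
)

func main() {
	startedAt := time.Now()

	// Persist the start time as RFC 3339 text in its own file.
	p, err := startedAt.MarshalText()
	if err != nil {
		log.Fatal(err)
	}
	if err := ioutil.WriteFile("/tmp/startedat", p, 0666); err != nil {
		log.Fatal(err)
	}

	// Later, read it back and parse it for the status report.
	raw, err := ioutil.ReadFile("/tmp/startedat")
	if err != nil {
		log.Fatal(err)
	}
	var parsed time.Time
	if err := parsed.UnmarshalText(raw); err != nil {
		log.Fatal(err)
	}
	fmt.Println("started at:", parsed)
}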

Signed-off-by: Stephen J Day <stephen.day@docker.com>
Stephen J Day authored 2017-11-01 14:21:10 -07:00, committed by Michael Crosby
parent 368dc17a4c
commit a9308e174d
2 changed files with 120 additions and 28 deletions


@@ -324,12 +324,25 @@ func (s *store) status(ingestPath string) (content.Status, error) {
 		return content.Status{}, err
 	}
 
+	startedAtP, err := ioutil.ReadFile(filepath.Join(ingestPath, "startedat"))
+	if err != nil {
+		if os.IsNotExist(err) {
+			err = errors.Wrap(errdefs.ErrNotFound, err.Error())
+		}
+		return content.Status{}, err
+	}
+
+	var startedAt time.Time
+	if err := startedAt.UnmarshalText(startedAtP); err != nil {
+		return content.Status{}, errors.Wrapf(err, "could not parse startedat file")
+	}
+
 	return content.Status{
 		Ref:       ref,
 		Offset:    fi.Size(),
 		Total:     s.total(ingestPath),
 		UpdatedAt: fi.ModTime(),
-		StartedAt: getStartTime(fi),
+		StartedAt: startedAt,
 	}, nil
 }
@@ -412,7 +425,7 @@ func (s *store) Writer(ctx context.Context, ref string, total int64, expected di
 		return nil, errors.Errorf("provided total differs from status: %v != %v", total, status.Total)
 	}
 
-	// slow slow slow!!, send to goroutine or use resumable hashes
+	// TODO(stevvooe): slow slow slow!!, send to goroutine or use resumable hashes
 	fp, err := os.Open(data)
 	if err != nil {
 		return nil, err
@@ -431,20 +444,29 @@ func (s *store) Writer(ctx context.Context, ref string, total int64, expected di
 		startedAt = status.StartedAt
 		total = status.Total
 	} else {
+		startedAt = time.Now()
+		updatedAt = startedAt
+
 		// the ingest is new, we need to setup the target location.
 		// write the ref to a file for later use
 		if err := ioutil.WriteFile(refp, []byte(ref), 0666); err != nil {
 			return nil, err
 		}
 
+		startedAtP, err := startedAt.MarshalText()
+		if err != nil {
+			return nil, err
+		}
+
+		if err := ioutil.WriteFile(filepath.Join(path, "startedat"), startedAtP, 0666); err != nil {
+			return nil, err
+		}
+
 		if total > 0 {
 			if err := ioutil.WriteFile(filepath.Join(path, "total"), []byte(fmt.Sprint(total)), 0666); err != nil {
 				return nil, err
 			}
 		}
-
-		startedAt = time.Now()
-		updatedAt = startedAt
 	}
 
 	fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE, 0666)
@@ -452,6 +474,10 @@ func (s *store) Writer(ctx context.Context, ref string, total int64, expected di
 		return nil, errors.Wrap(err, "failed to open data file")
 	}
 
+	if _, err := fp.Seek(offset, io.SeekStart); err != nil {
+		return nil, errors.Wrap(err, "could not seek to current write offset")
+	}
+
 	return &writer{
 		s:  s,
 		fp: fp,


@@ -16,12 +16,14 @@ import (
 	"github.com/containerd/containerd/testutil"
 	digest "github.com/opencontainers/go-digest"
 	"github.com/pkg/errors"
+	"github.com/stretchr/testify/require"
 )
 
 // ContentSuite runs a test suite on the content store given a factory function.
 func ContentSuite(t *testing.T, name string, storeFn func(ctx context.Context, root string) (content.Store, func() error, error)) {
 	t.Run("Writer", makeTest(t, name, storeFn, checkContentStoreWriter))
 	t.Run("UploadStatus", makeTest(t, name, storeFn, checkUploadStatus))
+	t.Run("Resume", makeTest(t, name, storeFn, checkResumeWriter))
 	t.Run("Labels", makeTest(t, name, storeFn, checkLabels))
 }
@@ -135,6 +137,79 @@ func checkContentStoreWriter(ctx context.Context, t *testing.T, cs content.Store
 	}
 }
 
+func checkResumeWriter(ctx context.Context, t *testing.T, cs content.Store) {
+	checkWrite := func(t *testing.T, w io.Writer, p []byte) {
+		t.Helper()
+		n, err := w.Write(p)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		if n != len(p) {
+			t.Fatal("short write to content store")
+		}
+	}
+
+	var (
+		cb, dgst      = createContent(256, 1)
+		first, second = cb[:128], cb[128:]
+	)
+
+	preStart := time.Now()
+	w1, err := cs.Writer(ctx, "cb", 256, dgst)
+	if err != nil {
+		t.Fatal(err)
+	}
+	postStart := time.Now()
+
+	preUpdate := time.Now()
+	checkWrite(t, w1, first)
+	postUpdate := time.Now()
+
+	dgstFirst := digest.FromBytes(first)
+	expected := content.Status{
+		Ref:      "cb",
+		Offset:   int64(len(first)),
+		Total:    int64(len(cb)),
+		Expected: dgstFirst,
+	}
+
+	checkStatus(t, w1, expected, dgstFirst, preStart, postStart, preUpdate, postUpdate)
+	require.NoError(t, w1.Close(), "close first writer")
+
+	w2, err := cs.Writer(ctx, "cb", 256, dgst)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// status should be consistent with version before close.
+	checkStatus(t, w1, expected, dgstFirst, preStart, postStart, preUpdate, postUpdate)
+
+	preUpdate = time.Now()
+	checkWrite(t, w2, second)
+	postUpdate = time.Now()
+
+	expected.Offset = expected.Total
+	expected.Expected = dgst
+	checkStatus(t, w2, expected, dgst, preStart, postStart, preUpdate, postUpdate)
+
+	preCommit := time.Now()
+	if err := w2.Commit(ctx, 0, ""); err != nil {
+		t.Fatalf("commit failed: %+v", err)
+	}
+	postCommit := time.Now()
+	require.NoError(t, w2.Close(), "close second writer")
+
+	info := content.Info{
+		Digest: dgst,
+		Size:   256,
+	}
+
+	if err := checkInfo(ctx, cs, dgst, info, preCommit, postCommit, preCommit, postCommit); err != nil {
+		t.Fatalf("Check info failed: %+v", err)
+	}
+}
+
 func checkUploadStatus(ctx context.Context, t *testing.T, cs content.Store) {
 	c1, d1 := createContent(256, 1)
@@ -156,9 +231,7 @@ func checkUploadStatus(ctx context.Context, t *testing.T, cs content.Store) {
 	preUpdate := preStart
 	postUpdate := postStart
 
-	if err := checkStatus(w1, expected, d, preStart, postStart, preUpdate, postUpdate); err != nil {
-		t.Fatalf("Status check failed: %+v", err)
-	}
+	checkStatus(t, w1, expected, d, preStart, postStart, preUpdate, postUpdate)
 
 	// Write first 64 bytes
 	preUpdate = time.Now()
@@ -168,9 +241,7 @@ func checkUploadStatus(ctx context.Context, t *testing.T, cs content.Store) {
 	postUpdate = time.Now()
 	expected.Offset = 64
 	d = digest.FromBytes(c1[:64])
-	if err := checkStatus(w1, expected, d, preStart, postStart, preUpdate, postUpdate); err != nil {
-		t.Fatalf("Status check failed: %+v", err)
-	}
+	checkStatus(t, w1, expected, d, preStart, postStart, preUpdate, postUpdate)
 
 	// Write next 128 bytes
 	preUpdate = time.Now()
@@ -180,9 +251,7 @@ func checkUploadStatus(ctx context.Context, t *testing.T, cs content.Store) {
 	postUpdate = time.Now()
 	expected.Offset = 192
 	d = digest.FromBytes(c1[:192])
-	if err := checkStatus(w1, expected, d, preStart, postStart, preUpdate, postUpdate); err != nil {
-		t.Fatalf("Status check failed: %+v", err)
-	}
+	checkStatus(t, w1, expected, d, preStart, postStart, preUpdate, postUpdate)
 
 	// Write last 64 bytes
 	preUpdate = time.Now()
@@ -191,9 +260,7 @@ func checkUploadStatus(ctx context.Context, t *testing.T, cs content.Store) {
 	}
 	postUpdate = time.Now()
 	expected.Offset = 256
-	if err := checkStatus(w1, expected, d1, preStart, postStart, preUpdate, postUpdate); err != nil {
-		t.Fatalf("Status check failed: %+v", err)
-	}
+	checkStatus(t, w1, expected, d1, preStart, postStart, preUpdate, postUpdate)
 
 	preCommit := time.Now()
 	if err := w1.Commit(ctx, 0, ""); err != nil {
@@ -275,42 +342,41 @@ func checkLabels(ctx context.Context, t *testing.T, cs content.Store) {
 	}
 }
 
-func checkStatus(w content.Writer, expected content.Status, d digest.Digest, preStart, postStart, preUpdate, postUpdate time.Time) error {
+func checkStatus(t *testing.T, w content.Writer, expected content.Status, d digest.Digest, preStart, postStart, preUpdate, postUpdate time.Time) {
+	t.Helper()
 	st, err := w.Status()
 	if err != nil {
-		return errors.Wrap(err, "failed to get status")
+		t.Fatalf("failed to get status: %v", err)
 	}
 
 	wd := w.Digest()
 	if wd != d {
-		return errors.Errorf("unexpected digest %v, expected %v", wd, d)
+		t.Fatalf("unexpected digest %v, expected %v", wd, d)
 	}
 
 	if st.Ref != expected.Ref {
-		return errors.Errorf("unexpected ref %q, expected %q", st.Ref, expected.Ref)
+		t.Fatalf("unexpected ref %q, expected %q", st.Ref, expected.Ref)
	}
 
 	if st.Offset != expected.Offset {
-		return errors.Errorf("unexpected offset %d, expected %d", st.Offset, expected.Offset)
+		t.Fatalf("unexpected offset %d, expected %d", st.Offset, expected.Offset)
 	}
 
 	if st.Total != expected.Total {
-		return errors.Errorf("unexpected total %d, expected %d", st.Total, expected.Total)
+		t.Fatalf("unexpected total %d, expected %d", st.Total, expected.Total)
 	}
 
 	// TODO: Add this test once all implementations guarantee this value is held
 	//if st.Expected != expected.Expected {
-	//	return errors.Errorf("unexpected \"expected digest\" %q, expected %q", st.Expected, expected.Expected)
+	//	t.Fatalf("unexpected \"expected digest\" %q, expected %q", st.Expected, expected.Expected)
 	//}
 
 	if st.StartedAt.After(postStart) || st.StartedAt.Before(preStart) {
-		return errors.Errorf("unexpected started at time %s, expected between %s and %s", st.StartedAt, preStart, postStart)
+		t.Fatalf("unexpected started at time %s, expected between %s and %s", st.StartedAt, preStart, postStart)
 	}
 
 	if st.UpdatedAt.After(postUpdate) || st.UpdatedAt.Before(preUpdate) {
-		return errors.Errorf("unexpected updated at time %s, expected between %s and %s", st.UpdatedAt, preUpdate, postUpdate)
+		t.Fatalf("unexpected updated at time %s, expected between %s and %s", st.UpdatedAt, preUpdate, postUpdate)
 	}
-
-	return nil
 }
 
 func checkInfo(ctx context.Context, cs content.Store, d digest.Digest, expected content.Info, c1, c2, u1, u2 time.Time) error {