content/local: consistent handling of data and locks
The locks now retry on the backend side to prevent clients from having to round trip on locks that might be momentarily held. This exposed some timing errors in the updated_at fields for content ingest, so we've had to move that to a separate file to export the monotonic go runtime timestamps. Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
parent
f4fdb940ed
commit
3e5e2ecc0a
@ -8,7 +8,6 @@ import (
|
||||
)
|
||||
|
||||
// Handles locking references
|
||||
// TODO: use boltdb for lock status
|
||||
|
||||
var (
|
||||
// locks lets us lock in process
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
@ -219,7 +220,7 @@ func (s *store) Walk(ctx context.Context, fn content.WalkFunc, filters ...string
|
||||
|
||||
// TODO(stevvooe): There are few more cases with subdirs that should be
|
||||
// handled in case the layout gets corrupted. This isn't strict enough
|
||||
// an may spew bad data.
|
||||
// and may spew bad data.
|
||||
|
||||
if path == root {
|
||||
return nil
|
||||
@ -324,24 +325,27 @@ func (s *store) status(ingestPath string) (content.Status, error) {
|
||||
return content.Status{}, err
|
||||
}
|
||||
|
||||
startedAtP, err := ioutil.ReadFile(filepath.Join(ingestPath, "startedat"))
|
||||
startedAt, err := readFileTimestamp(filepath.Join(ingestPath, "startedat"))
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
err = errors.Wrap(errdefs.ErrNotFound, err.Error())
|
||||
}
|
||||
return content.Status{}, err
|
||||
return content.Status{}, errors.Wrapf(err, "could not read startedat")
|
||||
}
|
||||
|
||||
var startedAt time.Time
|
||||
if err := startedAt.UnmarshalText(startedAtP); err != nil {
|
||||
return content.Status{}, errors.Wrapf(err, "could not parse startedat file")
|
||||
updatedAt, err := readFileTimestamp(filepath.Join(ingestPath, "updatedat"))
|
||||
if err != nil {
|
||||
return content.Status{}, errors.Wrapf(err, "could not read updatedat")
|
||||
}
|
||||
|
||||
// because we don't write updatedat on every write, the mod time may
|
||||
// actually be more up to date.
|
||||
if fi.ModTime().After(updatedAt) {
|
||||
updatedAt = fi.ModTime()
|
||||
}
|
||||
|
||||
return content.Status{
|
||||
Ref: ref,
|
||||
Offset: fi.Size(),
|
||||
Total: s.total(ingestPath),
|
||||
UpdatedAt: fi.ModTime(),
|
||||
UpdatedAt: updatedAt,
|
||||
StartedAt: startedAt,
|
||||
}, nil
|
||||
}
|
||||
@ -382,6 +386,37 @@ func (s *store) total(ingestPath string) int64 {
|
||||
//
|
||||
// The argument `ref` is used to uniquely identify a long-lived writer transaction.
|
||||
func (s *store) Writer(ctx context.Context, ref string, total int64, expected digest.Digest) (content.Writer, error) {
|
||||
var lockErr error
|
||||
for count := uint64(0); count < 10; count++ {
|
||||
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1<<count)))
|
||||
if err := tryLock(ref); err != nil {
|
||||
if !errdefs.IsUnavailable(err) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
lockErr = err
|
||||
} else {
|
||||
lockErr = nil
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if lockErr != nil {
|
||||
return nil, lockErr
|
||||
}
|
||||
|
||||
w, err := s.writer(ctx, ref, total, expected)
|
||||
if err != nil {
|
||||
unlock(ref)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return w, nil // lock is now held by w.
|
||||
}
|
||||
|
||||
// writer provides the main implementation of the Writer method. The caller
|
||||
// must hold the lock correctly and release on error if there is a problem.
|
||||
func (s *store) writer(ctx context.Context, ref string, total int64, expected digest.Digest) (content.Writer, error) {
|
||||
// TODO(stevvooe): Need to actually store expected here. We have
|
||||
// code in the service that shouldn't be dealing with this.
|
||||
if expected != "" {
|
||||
@ -393,10 +428,6 @@ func (s *store) Writer(ctx context.Context, ref string, total int64, expected di
|
||||
|
||||
path, refp, data := s.ingestPaths(ref)
|
||||
|
||||
if err := tryLock(ref); err != nil {
|
||||
return nil, errors.Wrapf(err, "locking ref %v failed", ref)
|
||||
}
|
||||
|
||||
var (
|
||||
digester = digest.Canonical.Digester()
|
||||
offset int64
|
||||
@ -453,12 +484,11 @@ func (s *store) Writer(ctx context.Context, ref string, total int64, expected di
|
||||
return nil, err
|
||||
}
|
||||
|
||||
startedAtP, err := startedAt.MarshalText()
|
||||
if err != nil {
|
||||
if writeTimestampFile(filepath.Join(path, "startedat"), startedAt); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := ioutil.WriteFile(filepath.Join(path, "startedat"), startedAtP, 0666); err != nil {
|
||||
if writeTimestampFile(filepath.Join(path, "updatedat"), startedAt); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -469,7 +499,7 @@ func (s *store) Writer(ctx context.Context, ref string, total int64, expected di
|
||||
}
|
||||
}
|
||||
|
||||
fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE, 0666)
|
||||
fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE|os.O_SYNC, 0666)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to open data file")
|
||||
}
|
||||
@ -535,3 +565,30 @@ func readFileString(path string) (string, error) {
|
||||
p, err := ioutil.ReadFile(path)
|
||||
return string(p), err
|
||||
}
|
||||
|
||||
// readFileTimestamp reads a file with just a timestamp present.
|
||||
func readFileTimestamp(p string) (time.Time, error) {
|
||||
b, err := ioutil.ReadFile(p)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
err = errors.Wrap(errdefs.ErrNotFound, err.Error())
|
||||
}
|
||||
return time.Time{}, err
|
||||
}
|
||||
|
||||
var t time.Time
|
||||
if err := t.UnmarshalText(b); err != nil {
|
||||
return time.Time{}, errors.Wrapf(err, "could not parse timestamp file %v", p)
|
||||
}
|
||||
|
||||
return t, nil
|
||||
}
|
||||
|
||||
func writeTimestampFile(p string, t time.Time) error {
|
||||
b, err := t.MarshalText()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return ioutil.WriteFile(p, b, 0666)
|
||||
}
|
||||
|
@ -152,6 +152,7 @@ func (w *writer) Close() (err error) {
|
||||
if w.fp != nil {
|
||||
w.fp.Sync()
|
||||
err = w.fp.Close()
|
||||
writeTimestampFile(filepath.Join(w.path, "updatedat"), w.updatedAt)
|
||||
w.fp = nil
|
||||
unlock(w.ref)
|
||||
return
|
||||
|
@ -151,24 +151,25 @@ func checkResumeWriter(ctx context.Context, t *testing.T, cs content.Store) {
|
||||
}
|
||||
|
||||
var (
|
||||
cb, dgst = createContent(256, 1)
|
||||
ref = "cb"
|
||||
cb, dgst = createContent(256, 10)
|
||||
first, second = cb[:128], cb[128:]
|
||||
)
|
||||
|
||||
preStart := time.Now()
|
||||
w1, err := cs.Writer(ctx, "cb", 256, dgst)
|
||||
w1, err := cs.Writer(ctx, ref, 256, dgst)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
postStart := time.Now()
|
||||
preUpdate := time.Now()
|
||||
preUpdate := postStart
|
||||
|
||||
checkWrite(t, w1, first)
|
||||
postUpdate := time.Now()
|
||||
|
||||
dgstFirst := digest.FromBytes(first)
|
||||
expected := content.Status{
|
||||
Ref: "cb",
|
||||
Ref: ref,
|
||||
Offset: int64(len(first)),
|
||||
Total: int64(len(cb)),
|
||||
Expected: dgstFirst,
|
||||
@ -177,13 +178,13 @@ func checkResumeWriter(ctx context.Context, t *testing.T, cs content.Store) {
|
||||
checkStatus(t, w1, expected, dgstFirst, preStart, postStart, preUpdate, postUpdate)
|
||||
require.NoError(t, w1.Close(), "close first writer")
|
||||
|
||||
w2, err := cs.Writer(ctx, "cb", 256, dgst)
|
||||
w2, err := cs.Writer(ctx, ref, 256, dgst)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// status should be consistent with version before close.
|
||||
checkStatus(t, w1, expected, dgstFirst, preStart, postStart, preUpdate, postUpdate)
|
||||
checkStatus(t, w2, expected, dgstFirst, preStart, postStart, preUpdate, postUpdate)
|
||||
|
||||
preUpdate = time.Now()
|
||||
checkWrite(t, w2, second)
|
||||
@ -211,7 +212,7 @@ func checkResumeWriter(ctx context.Context, t *testing.T, cs content.Store) {
|
||||
}
|
||||
|
||||
func checkUploadStatus(ctx context.Context, t *testing.T, cs content.Store) {
|
||||
c1, d1 := createContent(256, 1)
|
||||
c1, d1 := createContent(256, 17)
|
||||
|
||||
preStart := time.Now()
|
||||
w1, err := cs.Writer(ctx, "c1", 256, d1)
|
||||
@ -279,7 +280,7 @@ func checkUploadStatus(ctx context.Context, t *testing.T, cs content.Store) {
|
||||
}
|
||||
|
||||
func checkLabels(ctx context.Context, t *testing.T, cs content.Store) {
|
||||
c1, d1 := createContent(256, 1)
|
||||
c1, d1 := createContent(256, 19)
|
||||
|
||||
w1, err := cs.Writer(ctx, "c1", 256, d1)
|
||||
if err != nil {
|
||||
@ -374,6 +375,8 @@ func checkStatus(t *testing.T, w content.Writer, expected content.Status, d dige
|
||||
if st.StartedAt.After(postStart) || st.StartedAt.Before(preStart) {
|
||||
t.Fatalf("unexpected started at time %s, expected between %s and %s", st.StartedAt, preStart, postStart)
|
||||
}
|
||||
|
||||
t.Logf("compare update %v against (%v, %v)", st.UpdatedAt, preUpdate, postUpdate)
|
||||
if st.UpdatedAt.After(postUpdate) || st.UpdatedAt.Before(preUpdate) {
|
||||
t.Fatalf("unexpected updated at time %s, expected between %s and %s", st.UpdatedAt, preUpdate, postUpdate)
|
||||
}
|
||||
|
@ -41,7 +41,7 @@ func (rw *remoteWriter) Status() (content.Status, error) {
|
||||
Action: contentapi.WriteActionStat,
|
||||
})
|
||||
if err != nil {
|
||||
return content.Status{}, err
|
||||
return content.Status{}, errors.Wrap(err, "error getting writer status")
|
||||
}
|
||||
|
||||
return content.Status{
|
||||
|
Loading…
Reference in New Issue
Block a user