From 3e5e2ecc0a8c2bf6e89f6d4126bea813a6667d31 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Fri, 3 Nov 2017 16:42:55 -0700 Subject: [PATCH] content/local: consistent handling of data and locks The locks now retry on the backend side to prevent clients from having to round trip on locks that might be momentarily held. This exposed some timing errors in the updated_at fields for content ingest, so we've had to move that to a separate file to export the monotonic go runtime timestamps. Signed-off-by: Stephen J Day --- content/local/locks.go | 1 - content/local/store.go | 93 +++++++++++++++++++++++++++------- content/local/writer.go | 1 + content/testsuite/testsuite.go | 19 ++++--- services/content/writer.go | 2 +- 5 files changed, 88 insertions(+), 28 deletions(-) diff --git a/content/local/locks.go b/content/local/locks.go index cf5d0c59f..9a6c62fd8 100644 --- a/content/local/locks.go +++ b/content/local/locks.go @@ -8,7 +8,6 @@ import ( ) // Handles locking references -// TODO: use boltdb for lock status var ( // locks lets us lock in process diff --git a/content/local/store.go b/content/local/store.go index e4e90b39f..1c8156862 100644 --- a/content/local/store.go +++ b/content/local/store.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "io/ioutil" + "math/rand" "os" "path/filepath" "strconv" @@ -219,7 +220,7 @@ func (s *store) Walk(ctx context.Context, fn content.WalkFunc, filters ...string // TODO(stevvooe): There are few more cases with subdirs that should be // handled in case the layout gets corrupted. This isn't strict enough - // an may spew bad data. + // and may spew bad data. if path == root { return nil @@ -324,24 +325,27 @@ func (s *store) status(ingestPath string) (content.Status, error) { return content.Status{}, err } - startedAtP, err := ioutil.ReadFile(filepath.Join(ingestPath, "startedat")) + startedAt, err := readFileTimestamp(filepath.Join(ingestPath, "startedat")) if err != nil { - if os.IsNotExist(err) { - err = errors.Wrap(errdefs.ErrNotFound, err.Error()) - } - return content.Status{}, err + return content.Status{}, errors.Wrapf(err, "could not read startedat") } - var startedAt time.Time - if err := startedAt.UnmarshalText(startedAtP); err != nil { - return content.Status{}, errors.Wrapf(err, "could not parse startedat file") + updatedAt, err := readFileTimestamp(filepath.Join(ingestPath, "updatedat")) + if err != nil { + return content.Status{}, errors.Wrapf(err, "could not read updatedat") + } + + // because we don't write updatedat on every write, the mod time may + // actually be more up to date. + if fi.ModTime().After(updatedAt) { + updatedAt = fi.ModTime() } return content.Status{ Ref: ref, Offset: fi.Size(), Total: s.total(ingestPath), - UpdatedAt: fi.ModTime(), + UpdatedAt: updatedAt, StartedAt: startedAt, }, nil } @@ -382,6 +386,37 @@ func (s *store) total(ingestPath string) int64 { // // The argument `ref` is used to uniquely identify a long-lived writer transaction. func (s *store) Writer(ctx context.Context, ref string, total int64, expected digest.Digest) (content.Writer, error) { + var lockErr error + for count := uint64(0); count < 10; count++ { + time.Sleep(time.Millisecond * time.Duration(rand.Intn(1<