Replace lockfile with reference lock
Updates content service to handle lock errors and return them to the client. The client remote handler has been updated to retry when a resource is locked until the resource is unlocked or the expected resource exists. Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
@@ -21,6 +21,12 @@ var (
|
||||
// Use IsExists(err) to detect this condition.
|
||||
ErrExists = errors.New("content: exists")
|
||||
|
||||
// ErrLocked is returned when content is actively being uploaded, this
|
||||
// indicates that another process is attempting to upload the same content.
|
||||
//
|
||||
// Use IsLocked(err) to detect this condition.
|
||||
ErrLocked = errors.New("content: locked")
|
||||
|
||||
bufPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
return make([]byte, 1<<20)
|
||||
@@ -107,3 +113,7 @@ func IsNotFound(err error) bool {
|
||||
func IsExists(err error) bool {
|
||||
return errors.Cause(err) == ErrExists
|
||||
}
|
||||
|
||||
func IsLocked(err error) bool {
|
||||
return errors.Cause(err) == ErrLocked
|
||||
}
|
||||
|
||||
@@ -3,54 +3,35 @@ package content
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/nightlyone/lockfile"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// In addition to providing inter-process locks for content ingest, we also
|
||||
// define a global in process lock to prevent two goroutines writing to the
|
||||
// same file.
|
||||
//
|
||||
// This is pretty unsophisticated for now. In the future, we'd probably like to
|
||||
// have more information about who is holding which locks, as well as better
|
||||
// error reporting.
|
||||
// Handles locking references
|
||||
// TODO: use boltdb for lock status
|
||||
|
||||
var (
|
||||
errLocked = errors.New("key is locked")
|
||||
|
||||
// locks lets us lock in process, as well as output of process.
|
||||
locks = map[lockfile.Lockfile]struct{}{}
|
||||
// locks lets us lock in process
|
||||
locks = map[string]struct{}{}
|
||||
locksMu sync.Mutex
|
||||
)
|
||||
|
||||
func tryLock(lock lockfile.Lockfile) error {
|
||||
func tryLock(ref string) error {
|
||||
locksMu.Lock()
|
||||
defer locksMu.Unlock()
|
||||
|
||||
if _, ok := locks[lock]; ok {
|
||||
return errLocked
|
||||
if _, ok := locks[ref]; ok {
|
||||
return errors.Wrapf(ErrLocked, "key %s is locked", ref)
|
||||
}
|
||||
|
||||
if err := lock.TryLock(); err != nil {
|
||||
if errors.Cause(err) == lockfile.ErrBusy {
|
||||
return errLocked
|
||||
}
|
||||
|
||||
return errors.Wrapf(err, "lock.TryLock() encountered an error")
|
||||
}
|
||||
|
||||
locks[lock] = struct{}{}
|
||||
locks[ref] = struct{}{}
|
||||
return nil
|
||||
}
|
||||
|
||||
func unlock(lock lockfile.Lockfile) error {
|
||||
func unlock(ref string) {
|
||||
locksMu.Lock()
|
||||
defer locksMu.Unlock()
|
||||
|
||||
if _, ok := locks[lock]; !ok {
|
||||
return nil
|
||||
if _, ok := locks[ref]; ok {
|
||||
delete(locks, ref)
|
||||
}
|
||||
|
||||
delete(locks, lock)
|
||||
return lock.Unlock()
|
||||
}
|
||||
|
||||
@@ -12,7 +12,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/nightlyone/lockfile"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
@@ -230,17 +229,10 @@ func (s *store) Writer(ctx context.Context, ref string, total int64, expected di
|
||||
// TODO(stevvooe): Need to actually store and handle expected here. We have
|
||||
// code in the service that shouldn't be dealing with this.
|
||||
|
||||
path, refp, data, lock, err := s.ingestPaths(ref)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
path, refp, data := s.ingestPaths(ref)
|
||||
|
||||
if err := tryLock(lock); err != nil {
|
||||
if !os.IsNotExist(errors.Cause(err)) {
|
||||
return nil, errors.Wrapf(err, "locking %v failed", ref)
|
||||
}
|
||||
|
||||
// if it doesn't exist, we'll make it so below!
|
||||
if err := tryLock(ref); err != nil {
|
||||
return nil, errors.Wrapf(err, "locking %v failed", ref)
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -314,7 +306,6 @@ func (s *store) Writer(ctx context.Context, ref string, total int64, expected di
|
||||
return &writer{
|
||||
s: s,
|
||||
fp: fp,
|
||||
lock: lock,
|
||||
ref: ref,
|
||||
path: path,
|
||||
offset: offset,
|
||||
@@ -349,25 +340,18 @@ func (s *store) ingestRoot(ref string) string {
|
||||
return filepath.Join(s.root, "ingest", dgst.Hex())
|
||||
}
|
||||
|
||||
// ingestPaths are returned, including the lockfile. The paths are the following:
|
||||
// ingestPaths are returned. The paths are the following:
|
||||
//
|
||||
// - root: entire ingest directory
|
||||
// - ref: name of the starting ref, must be unique
|
||||
// - data: file where data is written
|
||||
// - lock: lock file location
|
||||
//
|
||||
func (s *store) ingestPaths(ref string) (string, string, string, lockfile.Lockfile, error) {
|
||||
func (s *store) ingestPaths(ref string) (string, string, string) {
|
||||
var (
|
||||
fp = s.ingestRoot(ref)
|
||||
rp = filepath.Join(fp, "ref")
|
||||
lp = filepath.Join(fp, "lock")
|
||||
dp = filepath.Join(fp, "data")
|
||||
)
|
||||
|
||||
lock, err := lockfile.New(lp)
|
||||
if err != nil {
|
||||
return "", "", "", "", errors.Wrapf(err, "error creating lockfile %v", lp)
|
||||
}
|
||||
|
||||
return fp, rp, dp, lock, nil
|
||||
return fp, rp, dp
|
||||
}
|
||||
|
||||
@@ -5,8 +5,6 @@ import (
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/nightlyone/lockfile"
|
||||
"github.com/opencontainers/go-digest"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
@@ -15,9 +13,8 @@ import (
|
||||
type writer struct {
|
||||
s *store
|
||||
fp *os.File // opened data file
|
||||
lock lockfile.Lockfile
|
||||
path string // path to writer dir
|
||||
ref string // ref key
|
||||
path string // path to writer dir
|
||||
ref string // ref key
|
||||
offset int64
|
||||
total int64
|
||||
digester digest.Digester
|
||||
@@ -107,8 +104,9 @@ func (w *writer) Commit(size int64, expected digest.Digest) error {
|
||||
return err
|
||||
}
|
||||
|
||||
unlock(w.lock)
|
||||
unlock(w.ref)
|
||||
w.fp = nil
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -122,9 +120,7 @@ func (w *writer) Commit(size int64, expected digest.Digest) error {
|
||||
// To abandon a transaction completely, first call close then `Store.Remove` to
|
||||
// clean up the associated resources.
|
||||
func (cw *writer) Close() (err error) {
|
||||
if err := unlock(cw.lock); err != nil {
|
||||
log.L.Debug("unlock failed: %v", err)
|
||||
}
|
||||
unlock(cw.ref)
|
||||
|
||||
if cw.fp != nil {
|
||||
cw.fp.Sync()
|
||||
|
||||
Reference in New Issue
Block a user