diff --git a/content/content.go b/content/content.go index 7c02ded53..a61e2aa1f 100644 --- a/content/content.go +++ b/content/content.go @@ -21,6 +21,12 @@ var ( // Use IsExists(err) to detect this condition. ErrExists = errors.New("content: exists") + // ErrLocked is returned when content is actively being uploaded, this + // indicates that another process is attempting to upload the same content. + // + // Use IsLocked(err) to detect this condition. + ErrLocked = errors.New("content: locked") + bufPool = sync.Pool{ New: func() interface{} { return make([]byte, 1<<20) @@ -107,3 +113,7 @@ func IsNotFound(err error) bool { func IsExists(err error) bool { return errors.Cause(err) == ErrExists } + +func IsLocked(err error) bool { + return errors.Cause(err) == ErrLocked +} diff --git a/content/locks.go b/content/locks.go index 400793d4a..44ed16d3d 100644 --- a/content/locks.go +++ b/content/locks.go @@ -3,54 +3,35 @@ package content import ( "sync" - "github.com/nightlyone/lockfile" "github.com/pkg/errors" ) -// In addition to providing inter-process locks for content ingest, we also -// define a global in process lock to prevent two goroutines writing to the -// same file. -// -// This is pretty unsophisticated for now. In the future, we'd probably like to -// have more information about who is holding which locks, as well as better -// error reporting. +// Handles locking references +// TODO: use boltdb for lock status var ( - errLocked = errors.New("key is locked") - - // locks lets us lock in process, as well as output of process. - locks = map[lockfile.Lockfile]struct{}{} + // locks lets us lock in process + locks = map[string]struct{}{} locksMu sync.Mutex ) -func tryLock(lock lockfile.Lockfile) error { +func tryLock(ref string) error { locksMu.Lock() defer locksMu.Unlock() - if _, ok := locks[lock]; ok { - return errLocked + if _, ok := locks[ref]; ok { + return errors.Wrapf(ErrLocked, "key %s is locked", ref) } - if err := lock.TryLock(); err != nil { - if errors.Cause(err) == lockfile.ErrBusy { - return errLocked - } - - return errors.Wrapf(err, "lock.TryLock() encountered an error") - } - - locks[lock] = struct{}{} + locks[ref] = struct{}{} return nil } -func unlock(lock lockfile.Lockfile) error { +func unlock(ref string) { locksMu.Lock() defer locksMu.Unlock() - if _, ok := locks[lock]; !ok { - return nil + if _, ok := locks[ref]; ok { + delete(locks, ref) } - - delete(locks, lock) - return lock.Unlock() } diff --git a/content/store.go b/content/store.go index f1d59caf3..dd2092de7 100644 --- a/content/store.go +++ b/content/store.go @@ -12,7 +12,6 @@ import ( "time" "github.com/containerd/containerd/log" - "github.com/nightlyone/lockfile" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" ) @@ -230,17 +229,10 @@ func (s *store) Writer(ctx context.Context, ref string, total int64, expected di // TODO(stevvooe): Need to actually store and handle expected here. We have // code in the service that shouldn't be dealing with this. - path, refp, data, lock, err := s.ingestPaths(ref) - if err != nil { - return nil, err - } + path, refp, data := s.ingestPaths(ref) - if err := tryLock(lock); err != nil { - if !os.IsNotExist(errors.Cause(err)) { - return nil, errors.Wrapf(err, "locking %v failed", ref) - } - - // if it doesn't exist, we'll make it so below! + if err := tryLock(ref); err != nil { + return nil, errors.Wrapf(err, "locking %v failed", ref) } var ( @@ -314,7 +306,6 @@ func (s *store) Writer(ctx context.Context, ref string, total int64, expected di return &writer{ s: s, fp: fp, - lock: lock, ref: ref, path: path, offset: offset, @@ -349,25 +340,18 @@ func (s *store) ingestRoot(ref string) string { return filepath.Join(s.root, "ingest", dgst.Hex()) } -// ingestPaths are returned, including the lockfile. The paths are the following: +// ingestPaths are returned. The paths are the following: // // - root: entire ingest directory // - ref: name of the starting ref, must be unique // - data: file where data is written -// - lock: lock file location // -func (s *store) ingestPaths(ref string) (string, string, string, lockfile.Lockfile, error) { +func (s *store) ingestPaths(ref string) (string, string, string) { var ( fp = s.ingestRoot(ref) rp = filepath.Join(fp, "ref") - lp = filepath.Join(fp, "lock") dp = filepath.Join(fp, "data") ) - lock, err := lockfile.New(lp) - if err != nil { - return "", "", "", "", errors.Wrapf(err, "error creating lockfile %v", lp) - } - - return fp, rp, dp, lock, nil + return fp, rp, dp } diff --git a/content/writer.go b/content/writer.go index 2a6f506a7..c50636960 100644 --- a/content/writer.go +++ b/content/writer.go @@ -5,8 +5,6 @@ import ( "path/filepath" "time" - "github.com/containerd/containerd/log" - "github.com/nightlyone/lockfile" "github.com/opencontainers/go-digest" "github.com/pkg/errors" ) @@ -15,9 +13,8 @@ import ( type writer struct { s *store fp *os.File // opened data file - lock lockfile.Lockfile - path string // path to writer dir - ref string // ref key + path string // path to writer dir + ref string // ref key offset int64 total int64 digester digest.Digester @@ -107,8 +104,9 @@ func (w *writer) Commit(size int64, expected digest.Digest) error { return err } - unlock(w.lock) + unlock(w.ref) w.fp = nil + return nil } @@ -122,9 +120,7 @@ func (w *writer) Commit(size int64, expected digest.Digest) error { // To abandon a transaction completely, first call close then `Store.Remove` to // clean up the associated resources. func (cw *writer) Close() (err error) { - if err := unlock(cw.lock); err != nil { - log.L.Debug("unlock failed: %v", err) - } + unlock(cw.ref) if cw.fp != nil { cw.fp.Sync() diff --git a/remotes/handlers.go b/remotes/handlers.go index 06d75a5ca..ad3343637 100644 --- a/remotes/handlers.go +++ b/remotes/handlers.go @@ -3,6 +3,7 @@ package remotes import ( "context" "fmt" + "time" "github.com/Sirupsen/logrus" "github.com/containerd/containerd/content" @@ -55,17 +56,39 @@ func FetchHandler(ingester content.Ingester, fetcher Fetcher) images.HandlerFunc func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc ocispec.Descriptor) error { log.G(ctx).Debug("fetch") - ref := MakeRefKey(ctx, desc) - cw, err := ingester.Writer(ctx, ref, desc.Size, desc.Digest) - if err != nil { - if !content.IsExists(err) { - return err + var ( + ref = MakeRefKey(ctx, desc) + cw content.Writer + err error + retry = 16 + ) + for { + cw, err = ingester.Writer(ctx, ref, desc.Size, desc.Digest) + if err != nil { + if content.IsExists(err) { + return nil + } else if !content.IsLocked(err) { + return err + } + + // TODO: On first time locked is encountered, get status + // of writer and abort if not updated recently. + + select { + case <-time.After(time.Millisecond * time.Duration(retry)): + if retry < 2048 { + retry = retry << 1 + } + continue + case <-ctx.Done(): + // Propagate lock error + return err + } } - - return nil + defer cw.Close() + break } - defer cw.Close() rc, err := fetcher.Fetch(ctx, desc) if err != nil { diff --git a/services/content/helpers.go b/services/content/helpers.go index a107bcb98..d518909c5 100644 --- a/services/content/helpers.go +++ b/services/content/helpers.go @@ -13,6 +13,8 @@ func rewriteGRPCError(err error) error { return content.ErrExists case codes.NotFound: return content.ErrNotFound + case codes.Unavailable: + return content.ErrLocked } return err @@ -24,6 +26,8 @@ func serverErrorToGRPC(err error, id string) error { return grpc.Errorf(codes.NotFound, "%v: not found", id) case content.IsExists(err): return grpc.Errorf(codes.AlreadyExists, "%v: exists", id) + case content.IsLocked(err): + return grpc.Errorf(codes.Unavailable, "%v: locked", id) } return err diff --git a/services/content/service.go b/services/content/service.go index 662d41d65..34b981bdd 100644 --- a/services/content/service.go +++ b/services/content/service.go @@ -275,7 +275,7 @@ func (s *Service) Write(session api.Content_WriteServer) (err error) { // this action locks the writer for the session. wr, err := s.store.Writer(ctx, ref, total, expected) if err != nil { - return err + return serverErrorToGRPC(err, ref) } defer wr.Close() @@ -283,7 +283,7 @@ func (s *Service) Write(session api.Content_WriteServer) (err error) { msg.Action = req.Action ws, err := wr.Status() if err != nil { - return err + return serverErrorToGRPC(err, ref) } msg.Offset = ws.Offset // always set the offset. diff --git a/vendor.conf b/vendor.conf index 76ebbacdf..1bd280d2a 100644 --- a/vendor.conf +++ b/vendor.conf @@ -25,7 +25,6 @@ github.com/urfave/cli 8ba6f23b6e36d03666a14bd9421f5e3efcb59aca golang.org/x/net 7dcfb8076726a3fdd9353b6b8a1f1b6be6811bd6 google.golang.org/grpc v1.3.0 github.com/pkg/errors v0.8.0 -github.com/nightlyone/lockfile 1d49c987357a327b5b03aa84cbddd582c328615d github.com/opencontainers/go-digest 21dfd564fd89c944783d00d069f33e3e7123c448 golang.org/x/sys f3918c30c5c2cb527c0b071a27c35120a6c0719a github.com/opencontainers/image-spec v1.0.0-rc6 diff --git a/vendor/github.com/nightlyone/lockfile/LICENSE b/vendor/github.com/nightlyone/lockfile/LICENSE deleted file mode 100644 index eb5b80468..000000000 --- a/vendor/github.com/nightlyone/lockfile/LICENSE +++ /dev/null @@ -1,19 +0,0 @@ -Copyright (c) 2012 Ingo Oeser - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. diff --git a/vendor/github.com/nightlyone/lockfile/README.md b/vendor/github.com/nightlyone/lockfile/README.md deleted file mode 100644 index 54ee19cc4..000000000 --- a/vendor/github.com/nightlyone/lockfile/README.md +++ /dev/null @@ -1,52 +0,0 @@ -lockfile -========= -Handle locking via pid files. - -[![Build Status Unix][1]][2] -[![Build status Windows][3]][4] - -[1]: https://secure.travis-ci.org/nightlyone/lockfile.png -[2]: https://travis-ci.org/nightlyone/lockfile -[3]: https://ci.appveyor.com/api/projects/status/7mojkmauj81uvp8u/branch/master?svg=true -[4]: https://ci.appveyor.com/project/nightlyone/lockfile/branch/master - - - -install -------- -Install [Go 1][5], either [from source][6] or [with a prepackaged binary][7]. -For Windows suport, Go 1.4 or newer is required. - -Then run - - go get github.com/nightlyone/lockfile - -[5]: http://golang.org -[6]: http://golang.org/doc/install/source -[7]: http://golang.org/doc/install - -LICENSE -------- -BSD - -documentation -------------- -[package documentation at godoc.org](http://godoc.org/github.com/nightlyone/lockfile) - -install -------------------- - go get github.com/nightlyone/lockfile - - -contributing -============ - -Contributions are welcome. Please open an issue or send me a pull request for a dedicated branch. -Make sure the git commit hooks show it works. - -git commit hooks ------------------------ -enable commit hooks via - - cd .git ; rm -rf hooks; ln -s ../git-hooks hooks ; cd .. - diff --git a/vendor/github.com/nightlyone/lockfile/lockfile.go b/vendor/github.com/nightlyone/lockfile/lockfile.go deleted file mode 100644 index da00bec78..000000000 --- a/vendor/github.com/nightlyone/lockfile/lockfile.go +++ /dev/null @@ -1,201 +0,0 @@ -// Package lockfile handles pid file based locking. -// While a sync.Mutex helps against concurrency issues within a single process, -// this package is designed to help against concurrency issues between cooperating processes -// or serializing multiple invocations of the same process. You can also combine sync.Mutex -// with Lockfile in order to serialize an action between different goroutines in a single program -// and also multiple invocations of this program. -package lockfile - -import ( - "errors" - "fmt" - "io" - "io/ioutil" - "os" - "path/filepath" -) - -// Lockfile is a pid file which can be locked -type Lockfile string - -// TemporaryError is a type of error where a retry after a random amount of sleep should help to mitigate it. -type TemporaryError string - -func (t TemporaryError) Error() string { return string(t) } - -// Temporary returns always true. -// It exists, so you can detect it via -// if te, ok := err.(interface{ Temporary() bool }); ok { -// fmt.Println("I am a temporay error situation, so wait and retry") -// } -func (t TemporaryError) Temporary() bool { return true } - -// Various errors returned by this package -var ( - ErrBusy = TemporaryError("Locked by other process") // If you get this, retry after a short sleep might help - ErrNotExist = TemporaryError("Lockfile created, but doesn't exist") // If you get this, retry after a short sleep might help - ErrNeedAbsPath = errors.New("Lockfiles must be given as absolute path names") - ErrInvalidPid = errors.New("Lockfile contains invalid pid for system") - ErrDeadOwner = errors.New("Lockfile contains pid of process not existent on this system anymore") - ErrRogueDeletion = errors.New("Lockfile owned by me has been removed unexpectedly") -) - -// New describes a new filename located at the given absolute path. -func New(path string) (Lockfile, error) { - if !filepath.IsAbs(path) { - return Lockfile(""), ErrNeedAbsPath - } - return Lockfile(path), nil -} - -// GetOwner returns who owns the lockfile. -func (l Lockfile) GetOwner() (*os.Process, error) { - name := string(l) - - // Ok, see, if we have a stale lockfile here - content, err := ioutil.ReadFile(name) - if err != nil { - return nil, err - } - - // try hard for pids. If no pid, the lockfile is junk anyway and we delete it. - pid, err := scanPidLine(content) - if err != nil { - return nil, err - } - running, err := isRunning(pid) - if err != nil { - return nil, err - } - - if running { - proc, err := os.FindProcess(pid) - if err != nil { - return nil, err - } - return proc, nil - } - return nil, ErrDeadOwner - -} - -// TryLock tries to own the lock. -// It Returns nil, if successful and and error describing the reason, it didn't work out. -// Please note, that existing lockfiles containing pids of dead processes -// and lockfiles containing no pid at all are simply deleted. -func (l Lockfile) TryLock() error { - name := string(l) - - // This has been checked by New already. If we trigger here, - // the caller didn't use New and re-implemented it's functionality badly. - // So panic, that he might find this easily during testing. - if !filepath.IsAbs(name) { - panic(ErrNeedAbsPath) - } - - tmplock, err := ioutil.TempFile(filepath.Dir(name), "") - if err != nil { - return err - } - - cleanup := func() { - _ = tmplock.Close() - _ = os.Remove(tmplock.Name()) - } - defer cleanup() - - if err := writePidLine(tmplock, os.Getpid()); err != nil { - return err - } - - // return value intentionally ignored, as ignoring it is part of the algorithm - _ = os.Link(tmplock.Name(), name) - - fiTmp, err := os.Lstat(tmplock.Name()) - if err != nil { - return err - } - fiLock, err := os.Lstat(name) - if err != nil { - // tell user that a retry would be a good idea - if os.IsNotExist(err) { - return ErrNotExist - } - return err - } - - // Success - if os.SameFile(fiTmp, fiLock) { - return nil - } - - proc, err := l.GetOwner() - switch err { - default: - // Other errors -> defensively fail and let caller handle this - return err - case nil: - if proc.Pid != os.Getpid() { - return ErrBusy - } - case ErrDeadOwner, ErrInvalidPid: - // cases we can fix below - } - - // clean stale/invalid lockfile - err = os.Remove(name) - if err != nil { - // If it doesn't exist, then it doesn't matter who removed it. - if !os.IsNotExist(err) { - return err - } - } - - // now that the stale lockfile is gone, let's recurse - return l.TryLock() -} - -// Unlock a lock again, if we owned it. Returns any error that happend during release of lock. -func (l Lockfile) Unlock() error { - proc, err := l.GetOwner() - switch err { - case ErrInvalidPid, ErrDeadOwner: - return ErrRogueDeletion - case nil: - if proc.Pid == os.Getpid() { - // we really own it, so let's remove it. - return os.Remove(string(l)) - } - // Not owned by me, so don't delete it. - return ErrRogueDeletion - default: - // This is an application error or system error. - // So give a better error for logging here. - if os.IsNotExist(err) { - return ErrRogueDeletion - } - // Other errors -> defensively fail and let caller handle this - return err - } -} - -func writePidLine(w io.Writer, pid int) error { - _, err := io.WriteString(w, fmt.Sprintf("%d\n", pid)) - return err -} - -func scanPidLine(content []byte) (int, error) { - if len(content) == 0 { - return 0, ErrInvalidPid - } - - var pid int - if _, err := fmt.Sscanln(string(content), &pid); err != nil { - return 0, ErrInvalidPid - } - - if pid <= 0 { - return 0, ErrInvalidPid - } - return pid, nil -} diff --git a/vendor/github.com/nightlyone/lockfile/lockfile_unix.go b/vendor/github.com/nightlyone/lockfile/lockfile_unix.go deleted file mode 100644 index 742b041fb..000000000 --- a/vendor/github.com/nightlyone/lockfile/lockfile_unix.go +++ /dev/null @@ -1,20 +0,0 @@ -// +build darwin dragonfly freebsd linux nacl netbsd openbsd solaris - -package lockfile - -import ( - "os" - "syscall" -) - -func isRunning(pid int) (bool, error) { - proc, err := os.FindProcess(pid) - if err != nil { - return false, err - } - - if err := proc.Signal(syscall.Signal(0)); err != nil { - return false, nil - } - return true, nil -} diff --git a/vendor/github.com/nightlyone/lockfile/lockfile_windows.go b/vendor/github.com/nightlyone/lockfile/lockfile_windows.go deleted file mode 100644 index 482bd91d7..000000000 --- a/vendor/github.com/nightlyone/lockfile/lockfile_windows.go +++ /dev/null @@ -1,30 +0,0 @@ -package lockfile - -import ( - "syscall" -) - -//For some reason these consts don't exist in syscall. -const ( - error_invalid_parameter = 87 - code_still_active = 259 -) - -func isRunning(pid int) (bool, error) { - procHnd, err := syscall.OpenProcess(syscall.PROCESS_QUERY_INFORMATION, true, uint32(pid)) - if err != nil { - if scerr, ok := err.(syscall.Errno); ok { - if uintptr(scerr) == error_invalid_parameter { - return false, nil - } - } - } - - var code uint32 - err = syscall.GetExitCodeProcess(procHnd, &code) - if err != nil { - return false, err - } - - return code == code_still_active, nil -}