Scope writer locks to each writer.

Signed-off-by: Niklas Gehlen <niklas@namespacelabs.com>
This commit is contained in:
Niklas Gehlen 2024-10-23 20:39:51 +02:00
parent b291eb802b
commit 2535b187a6
No known key found for this signature in database
4 changed files with 23 additions and 24 deletions

View File

@ -18,7 +18,6 @@ package local
import (
"fmt"
"sync"
"time"
"github.com/containerd/errdefs"
@ -30,17 +29,11 @@ type lock struct {
since time.Time
}
var (
// locks lets us lock in process
locks = make(map[string]*lock)
locksMu sync.Mutex
)
func (s *store) tryLock(ref string) error {
s.locksMu.Lock()
defer s.locksMu.Unlock()
func tryLock(ref string) error {
locksMu.Lock()
defer locksMu.Unlock()
if v, ok := locks[ref]; ok {
if v, ok := s.locks[ref]; ok {
// Returning the duration may help developers distinguish dead locks (long duration) from
// lock contentions (short duration).
now := time.Now()
@ -50,13 +43,13 @@ func tryLock(ref string) error {
)
}
locks[ref] = &lock{time.Now()}
s.locks[ref] = &lock{time.Now()}
return nil
}
func unlock(ref string) {
locksMu.Lock()
defer locksMu.Unlock()
func (s *store) unlock(ref string) {
s.locksMu.Lock()
defer s.locksMu.Unlock()
delete(locks, ref)
delete(s.locks, ref)
}

View File

@ -24,11 +24,13 @@ import (
)
func TestTryLock(t *testing.T) {
err := tryLock("testref")
assert.NoError(t, err)
defer unlock("testref")
s := &store{locks: map[string]*lock{}}
err = tryLock("testref")
err := s.tryLock("testref")
assert.NoError(t, err)
defer s.unlock("testref")
err = s.tryLock("testref")
require.NotNil(t, err)
assert.Contains(t, err.Error(), "ref testref locked for ")
}

View File

@ -67,6 +67,9 @@ type store struct {
root string
ls LabelStore
integritySupported bool
locksMu sync.Mutex
locks map[string]*lock
}
// NewStore returns a local content store
@ -90,6 +93,7 @@ func NewLabeledStore(root string, ls LabelStore) (content.Store, error) {
root: root,
ls: ls,
integritySupported: supported,
locks: map[string]*lock{},
}, nil
}
@ -464,13 +468,13 @@ func (s *store) Writer(ctx context.Context, opts ...content.WriterOpt) (content.
return nil, fmt.Errorf("ref must not be empty: %w", errdefs.ErrInvalidArgument)
}
if err := tryLock(wOpts.Ref); err != nil {
if err := s.tryLock(wOpts.Ref); err != nil {
return nil, err
}
w, err := s.writer(ctx, wOpts.Ref, wOpts.Desc.Size, wOpts.Desc.Digest)
if err != nil {
unlock(wOpts.Ref)
s.unlock(wOpts.Ref)
return nil, err
}

View File

@ -78,7 +78,7 @@ func (w *writer) Write(p []byte) (n int, err error) {
func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
// Ensure even on error the writer is fully closed
defer unlock(w.ref)
defer w.s.unlock(w.ref)
var base content.Info
for _, opt := range opts {
@ -198,7 +198,7 @@ func (w *writer) Close() (err error) {
err = w.fp.Close()
writeTimestampFile(filepath.Join(w.path, "updatedat"), w.updatedAt)
w.fp = nil
unlock(w.ref)
w.s.unlock(w.ref)
return
}