Merge pull request #10867 from namespacelabs/main
Lock store writer per digest and writer.
This commit is contained in:
commit
8c98e18a91
@ -18,7 +18,6 @@ package local
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/containerd/errdefs"
|
"github.com/containerd/errdefs"
|
||||||
@ -30,17 +29,11 @@ type lock struct {
|
|||||||
since time.Time
|
since time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
func (s *store) tryLock(ref string) error {
|
||||||
// locks lets us lock in process
|
s.locksMu.Lock()
|
||||||
locks = make(map[string]*lock)
|
defer s.locksMu.Unlock()
|
||||||
locksMu sync.Mutex
|
|
||||||
)
|
|
||||||
|
|
||||||
func tryLock(ref string) error {
|
if v, ok := s.locks[ref]; ok {
|
||||||
locksMu.Lock()
|
|
||||||
defer locksMu.Unlock()
|
|
||||||
|
|
||||||
if v, ok := locks[ref]; ok {
|
|
||||||
// Returning the duration may help developers distinguish dead locks (long duration) from
|
// Returning the duration may help developers distinguish dead locks (long duration) from
|
||||||
// lock contentions (short duration).
|
// lock contentions (short duration).
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
@ -50,13 +43,13 @@ func tryLock(ref string) error {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
locks[ref] = &lock{time.Now()}
|
s.locks[ref] = &lock{time.Now()}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func unlock(ref string) {
|
func (s *store) unlock(ref string) {
|
||||||
locksMu.Lock()
|
s.locksMu.Lock()
|
||||||
defer locksMu.Unlock()
|
defer s.locksMu.Unlock()
|
||||||
|
|
||||||
delete(locks, ref)
|
delete(s.locks, ref)
|
||||||
}
|
}
|
||||||
|
@ -24,11 +24,13 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestTryLock(t *testing.T) {
|
func TestTryLock(t *testing.T) {
|
||||||
err := tryLock("testref")
|
s := &store{locks: map[string]*lock{}}
|
||||||
assert.NoError(t, err)
|
|
||||||
defer unlock("testref")
|
|
||||||
|
|
||||||
err = tryLock("testref")
|
err := s.tryLock("testref")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
defer s.unlock("testref")
|
||||||
|
|
||||||
|
err = s.tryLock("testref")
|
||||||
require.NotNil(t, err)
|
require.NotNil(t, err)
|
||||||
assert.Contains(t, err.Error(), "ref testref locked for ")
|
assert.Contains(t, err.Error(), "ref testref locked for ")
|
||||||
}
|
}
|
||||||
|
@ -67,6 +67,9 @@ type store struct {
|
|||||||
root string
|
root string
|
||||||
ls LabelStore
|
ls LabelStore
|
||||||
integritySupported bool
|
integritySupported bool
|
||||||
|
|
||||||
|
locksMu sync.Mutex
|
||||||
|
locks map[string]*lock
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewStore returns a local content store
|
// NewStore returns a local content store
|
||||||
@ -90,6 +93,7 @@ func NewLabeledStore(root string, ls LabelStore) (content.Store, error) {
|
|||||||
root: root,
|
root: root,
|
||||||
ls: ls,
|
ls: ls,
|
||||||
integritySupported: supported,
|
integritySupported: supported,
|
||||||
|
locks: map[string]*lock{},
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -464,13 +468,13 @@ func (s *store) Writer(ctx context.Context, opts ...content.WriterOpt) (content.
|
|||||||
return nil, fmt.Errorf("ref must not be empty: %w", errdefs.ErrInvalidArgument)
|
return nil, fmt.Errorf("ref must not be empty: %w", errdefs.ErrInvalidArgument)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := tryLock(wOpts.Ref); err != nil {
|
if err := s.tryLock(wOpts.Ref); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
w, err := s.writer(ctx, wOpts.Ref, wOpts.Desc.Size, wOpts.Desc.Digest)
|
w, err := s.writer(ctx, wOpts.Ref, wOpts.Desc.Size, wOpts.Desc.Digest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
unlock(wOpts.Ref)
|
s.unlock(wOpts.Ref)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,7 +78,7 @@ func (w *writer) Write(p []byte) (n int, err error) {
|
|||||||
|
|
||||||
func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
|
func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
|
||||||
// Ensure even on error the writer is fully closed
|
// Ensure even on error the writer is fully closed
|
||||||
defer unlock(w.ref)
|
defer w.s.unlock(w.ref)
|
||||||
|
|
||||||
var base content.Info
|
var base content.Info
|
||||||
for _, opt := range opts {
|
for _, opt := range opts {
|
||||||
@ -198,7 +198,7 @@ func (w *writer) Close() (err error) {
|
|||||||
err = w.fp.Close()
|
err = w.fp.Close()
|
||||||
writeTimestampFile(filepath.Join(w.path, "updatedat"), w.updatedAt)
|
writeTimestampFile(filepath.Join(w.path, "updatedat"), w.updatedAt)
|
||||||
w.fp = nil
|
w.fp = nil
|
||||||
unlock(w.ref)
|
w.s.unlock(w.ref)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user