content: reduce the contention between ref lock and boltdb lock

tryLock only once to reduce the amount of time the lock is held

Signed-off-by: jerryzhuang <zhuangqhc@gmail.com>
This commit is contained in:
jerryzhuang 2023-07-10 13:10:06 +08:00
parent 5c37d3827b
commit a4bdbf7844
2 changed files with 25 additions and 40 deletions

View File

@ -31,7 +31,6 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/filters"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/pkg/randutil"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
@ -458,23 +457,9 @@ func (s *store) Writer(ctx context.Context, opts ...content.WriterOpt) (content.
if wOpts.Ref == "" {
return nil, fmt.Errorf("ref must not be empty: %w", errdefs.ErrInvalidArgument)
}
var lockErr error
for count := uint64(0); count < 10; count++ {
if err := tryLock(wOpts.Ref); err != nil {
if !errdefs.IsUnavailable(err) {
return nil, err
}
lockErr = err
} else {
lockErr = nil
break
}
time.Sleep(time.Millisecond * time.Duration(randutil.Intn(1<<count)))
}
if lockErr != nil {
return nil, lockErr
if err := tryLock(wOpts.Ref); err != nil {
return nil, err
}
w, err := s.writer(ctx, wOpts.Ref, wOpts.Desc.Size, wOpts.Desc.Digest)

View File

@ -148,28 +148,28 @@ var labels = map[string]string{
func checkContentStoreWriter(ctx context.Context, t *testing.T, cs content.Store) {
c1, d1 := createContent(256)
w1, err := cs.Writer(ctx, content.WithRef("c1"))
w1, err := content.OpenWriter(ctx, cs, content.WithRef("c1"))
if err != nil {
t.Fatal(err)
}
defer w1.Close()
c2, d2 := createContent(256)
w2, err := cs.Writer(ctx, content.WithRef("c2"), content.WithDescriptor(ocispec.Descriptor{Size: int64(len(c2))}))
w2, err := content.OpenWriter(ctx, cs, content.WithRef("c2"), content.WithDescriptor(ocispec.Descriptor{Size: int64(len(c2))}))
if err != nil {
t.Fatal(err)
}
defer w2.Close()
c3, d3 := createContent(256)
w3, err := cs.Writer(ctx, content.WithRef("c3"), content.WithDescriptor(ocispec.Descriptor{Digest: d3}))
w3, err := content.OpenWriter(ctx, cs, content.WithRef("c3"), content.WithDescriptor(ocispec.Descriptor{Digest: d3}))
if err != nil {
t.Fatal(err)
}
defer w3.Close()
c4, d4 := createContent(256)
w4, err := cs.Writer(ctx, content.WithRef("c4"), content.WithDescriptor(ocispec.Descriptor{Size: int64(len(c4)), Digest: d4}))
w4, err := content.OpenWriter(ctx, cs, content.WithRef("c4"), content.WithDescriptor(ocispec.Descriptor{Size: int64(len(c4)), Digest: d4}))
if err != nil {
t.Fatal(err)
}
@ -252,7 +252,7 @@ func checkResumeWriter(ctx context.Context, t *testing.T, cs content.Store) {
)
preStart := time.Now()
w1, err := cs.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: 256, Digest: dgst}))
w1, err := content.OpenWriter(ctx, cs, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: 256, Digest: dgst}))
if err != nil {
t.Fatal(err)
}
@ -273,7 +273,7 @@ func checkResumeWriter(ctx context.Context, t *testing.T, cs content.Store) {
checkStatus(t, w1, expected, dgstFirst, preStart, postStart, preUpdate, postUpdate)
assert.Nil(t, w1.Close(), "close first writer")
w2, err := cs.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: 256, Digest: dgst}))
w2, err := content.OpenWriter(ctx, cs, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: 256, Digest: dgst}))
if err != nil {
t.Fatal(err)
}
@ -320,7 +320,7 @@ func checkCommitExists(ctx context.Context, t *testing.T, cs content.Store) {
},
{},
} {
w, err := cs.Writer(ctx, content.WithRef(fmt.Sprintf("c1-commitexists-%d", i)))
w, err := content.OpenWriter(ctx, cs, content.WithRef(fmt.Sprintf("c1-commitexists-%d", i)))
if err != nil {
t.Fatal(err)
}
@ -359,7 +359,7 @@ func checkCommitErrorState(ctx context.Context, t *testing.T, cs content.Store)
}
ref := "c1-commiterror-state"
w, err := cs.Writer(ctx, content.WithRef(ref))
w, err := content.OpenWriter(ctx, cs, content.WithRef(ref))
if err != nil {
t.Fatal(err)
}
@ -383,7 +383,7 @@ func checkCommitErrorState(ctx context.Context, t *testing.T, cs content.Store)
t.Fatalf("Unexpected error: %+v", err)
}
w, err = cs.Writer(ctx, content.WithRef(ref))
w, err = content.OpenWriter(ctx, cs, content.WithRef(ref))
if err != nil {
t.Fatal(err)
}
@ -409,7 +409,7 @@ func checkCommitErrorState(ctx context.Context, t *testing.T, cs content.Store)
}
w.Close()
w, err = cs.Writer(ctx, content.WithRef(ref))
w, err = content.OpenWriter(ctx, cs, content.WithRef(ref))
if err != nil {
t.Fatal(err)
}
@ -432,7 +432,7 @@ func checkCommitErrorState(ctx context.Context, t *testing.T, cs content.Store)
}
w.Close()
w, err = cs.Writer(ctx, content.WithRef(ref))
w, err = content.OpenWriter(ctx, cs, content.WithRef(ref))
if err != nil {
t.Fatal(err)
}
@ -448,7 +448,7 @@ func checkCommitErrorState(ctx context.Context, t *testing.T, cs content.Store)
}
w.Close()
w, err = cs.Writer(ctx, content.WithRef(ref))
w, err = content.OpenWriter(ctx, cs, content.WithRef(ref))
if err != nil {
t.Fatal(err)
}
@ -465,7 +465,7 @@ func checkCommitErrorState(ctx context.Context, t *testing.T, cs content.Store)
w.Close()
// Create another writer with same reference
w, err = cs.Writer(ctx, content.WithRef(ref))
w, err = content.OpenWriter(ctx, cs, content.WithRef(ref))
if err != nil {
t.Fatalf("Failed to open writer: %+v", err)
}
@ -491,7 +491,7 @@ func checkCommitErrorState(ctx context.Context, t *testing.T, cs content.Store)
}
w.Close()
w, err = cs.Writer(ctx, content.WithRef(ref))
w, err = content.OpenWriter(ctx, cs, content.WithRef(ref))
if err != nil {
t.Fatal(err)
}
@ -503,7 +503,7 @@ func checkCommitErrorState(ctx context.Context, t *testing.T, cs content.Store)
}
// Create another writer with same reference to check available
w, err = cs.Writer(ctx, content.WithRef(ref))
w, err = content.OpenWriter(ctx, cs, content.WithRef(ref))
if err != nil {
t.Fatalf("Failed to open writer: %+v", err)
}
@ -516,7 +516,7 @@ func checkUpdateStatus(ctx context.Context, t *testing.T, cs content.Store) {
c1, d1 := createContent(256)
preStart := time.Now()
w1, err := cs.Writer(ctx, content.WithRef("c1"), content.WithDescriptor(ocispec.Descriptor{Size: 256, Digest: d1}))
w1, err := content.OpenWriter(ctx, cs, content.WithRef("c1"), content.WithDescriptor(ocispec.Descriptor{Size: 256, Digest: d1}))
if err != nil {
t.Fatal(err)
}
@ -584,7 +584,7 @@ func checkUpdateStatus(ctx context.Context, t *testing.T, cs content.Store) {
func checkLabels(ctx context.Context, t *testing.T, cs content.Store) {
c1, d1 := createContent(256)
w1, err := cs.Writer(ctx, content.WithRef("c1-checklabels"), content.WithDescriptor(ocispec.Descriptor{Size: 256, Digest: d1}))
w1, err := content.OpenWriter(ctx, cs, content.WithRef("c1-checklabels"), content.WithDescriptor(ocispec.Descriptor{Size: 256, Digest: d1}))
if err != nil {
t.Fatal(err)
}
@ -661,7 +661,7 @@ func checkResume(rf func(context.Context, content.Writer, []byte, int64, int64,
limit := int64(float64(size) * tp)
ref := fmt.Sprintf("ref-%d-%d", i, j)
w, err := cs.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
w, err := content.OpenWriter(ctx, cs, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
if err != nil {
t.Fatal(err)
}
@ -675,7 +675,7 @@ func checkResume(rf func(context.Context, content.Writer, []byte, int64, int64,
t.Fatal(err)
}
w, err = cs.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
w, err = content.OpenWriter(ctx, cs, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
if err != nil {
t.Fatal(err)
}
@ -835,7 +835,7 @@ func checkCrossNSShare(ctx context.Context, t *testing.T, cs content.Store) {
}
defer done(ctx2)
w, err := cs.Writer(ctx2, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
w, err := content.OpenWriter(ctx2, cs, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
if err != nil {
t.Fatal(err)
}
@ -894,7 +894,7 @@ func checkCrossNSAppend(ctx context.Context, t *testing.T, cs content.Store) {
copy(b2[size:], extra)
d2 := digest.FromBytes(b2)
w, err := cs.Writer(ctx2, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
w, err := content.OpenWriter(ctx2, cs, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
if err != nil {
t.Fatal(err)
}
@ -957,7 +957,7 @@ func checkCrossNSIsolate(ctx context.Context, t *testing.T, cs content.Store) {
defer done(ctx2)
t3 := time.Now()
w, err := cs.Writer(ctx2, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
w, err := content.OpenWriter(ctx2, cs, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
if err != nil {
t.Fatal(err)
}
@ -994,7 +994,7 @@ func checkSharedNSIsolate(ctx context.Context, t *testing.T, cs content.Store) {
}
defer done2(ctx2)
w, err := cs.Writer(ctx2, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
w, err := content.OpenWriter(ctx2, cs, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
if err != nil {
t.Fatal(err)
}