Merge pull request #8792 from zhuangqh/fix/writer-deadlock
content: reduce the contention between ref lock and boltdb lock
This commit is contained in:
commit
74e205f1e7
@ -31,7 +31,6 @@ import (
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/filters"
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/containerd/containerd/pkg/randutil"
|
||||
|
||||
"github.com/opencontainers/go-digest"
|
||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
@ -458,25 +457,11 @@ func (s *store) Writer(ctx context.Context, opts ...content.WriterOpt) (content.
|
||||
if wOpts.Ref == "" {
|
||||
return nil, fmt.Errorf("ref must not be empty: %w", errdefs.ErrInvalidArgument)
|
||||
}
|
||||
var lockErr error
|
||||
for count := uint64(0); count < 10; count++ {
|
||||
|
||||
if err := tryLock(wOpts.Ref); err != nil {
|
||||
if !errdefs.IsUnavailable(err) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
lockErr = err
|
||||
} else {
|
||||
lockErr = nil
|
||||
break
|
||||
}
|
||||
time.Sleep(time.Millisecond * time.Duration(randutil.Intn(1<<count)))
|
||||
}
|
||||
|
||||
if lockErr != nil {
|
||||
return nil, lockErr
|
||||
}
|
||||
|
||||
w, err := s.writer(ctx, wOpts.Ref, wOpts.Desc.Size, wOpts.Desc.Digest)
|
||||
if err != nil {
|
||||
unlock(wOpts.Ref)
|
||||
|
@ -148,28 +148,28 @@ var labels = map[string]string{
|
||||
|
||||
func checkContentStoreWriter(ctx context.Context, t *testing.T, cs content.Store) {
|
||||
c1, d1 := createContent(256)
|
||||
w1, err := cs.Writer(ctx, content.WithRef("c1"))
|
||||
w1, err := content.OpenWriter(ctx, cs, content.WithRef("c1"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer w1.Close()
|
||||
|
||||
c2, d2 := createContent(256)
|
||||
w2, err := cs.Writer(ctx, content.WithRef("c2"), content.WithDescriptor(ocispec.Descriptor{Size: int64(len(c2))}))
|
||||
w2, err := content.OpenWriter(ctx, cs, content.WithRef("c2"), content.WithDescriptor(ocispec.Descriptor{Size: int64(len(c2))}))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer w2.Close()
|
||||
|
||||
c3, d3 := createContent(256)
|
||||
w3, err := cs.Writer(ctx, content.WithRef("c3"), content.WithDescriptor(ocispec.Descriptor{Digest: d3}))
|
||||
w3, err := content.OpenWriter(ctx, cs, content.WithRef("c3"), content.WithDescriptor(ocispec.Descriptor{Digest: d3}))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer w3.Close()
|
||||
|
||||
c4, d4 := createContent(256)
|
||||
w4, err := cs.Writer(ctx, content.WithRef("c4"), content.WithDescriptor(ocispec.Descriptor{Size: int64(len(c4)), Digest: d4}))
|
||||
w4, err := content.OpenWriter(ctx, cs, content.WithRef("c4"), content.WithDescriptor(ocispec.Descriptor{Size: int64(len(c4)), Digest: d4}))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -252,7 +252,7 @@ func checkResumeWriter(ctx context.Context, t *testing.T, cs content.Store) {
|
||||
)
|
||||
|
||||
preStart := time.Now()
|
||||
w1, err := cs.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: 256, Digest: dgst}))
|
||||
w1, err := content.OpenWriter(ctx, cs, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: 256, Digest: dgst}))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -273,7 +273,7 @@ func checkResumeWriter(ctx context.Context, t *testing.T, cs content.Store) {
|
||||
checkStatus(t, w1, expected, dgstFirst, preStart, postStart, preUpdate, postUpdate)
|
||||
assert.Nil(t, w1.Close(), "close first writer")
|
||||
|
||||
w2, err := cs.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: 256, Digest: dgst}))
|
||||
w2, err := content.OpenWriter(ctx, cs, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: 256, Digest: dgst}))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -320,7 +320,7 @@ func checkCommitExists(ctx context.Context, t *testing.T, cs content.Store) {
|
||||
},
|
||||
{},
|
||||
} {
|
||||
w, err := cs.Writer(ctx, content.WithRef(fmt.Sprintf("c1-commitexists-%d", i)))
|
||||
w, err := content.OpenWriter(ctx, cs, content.WithRef(fmt.Sprintf("c1-commitexists-%d", i)))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -359,7 +359,7 @@ func checkCommitErrorState(ctx context.Context, t *testing.T, cs content.Store)
|
||||
}
|
||||
|
||||
ref := "c1-commiterror-state"
|
||||
w, err := cs.Writer(ctx, content.WithRef(ref))
|
||||
w, err := content.OpenWriter(ctx, cs, content.WithRef(ref))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -383,7 +383,7 @@ func checkCommitErrorState(ctx context.Context, t *testing.T, cs content.Store)
|
||||
t.Fatalf("Unexpected error: %+v", err)
|
||||
}
|
||||
|
||||
w, err = cs.Writer(ctx, content.WithRef(ref))
|
||||
w, err = content.OpenWriter(ctx, cs, content.WithRef(ref))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -409,7 +409,7 @@ func checkCommitErrorState(ctx context.Context, t *testing.T, cs content.Store)
|
||||
}
|
||||
w.Close()
|
||||
|
||||
w, err = cs.Writer(ctx, content.WithRef(ref))
|
||||
w, err = content.OpenWriter(ctx, cs, content.WithRef(ref))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -432,7 +432,7 @@ func checkCommitErrorState(ctx context.Context, t *testing.T, cs content.Store)
|
||||
}
|
||||
|
||||
w.Close()
|
||||
w, err = cs.Writer(ctx, content.WithRef(ref))
|
||||
w, err = content.OpenWriter(ctx, cs, content.WithRef(ref))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -448,7 +448,7 @@ func checkCommitErrorState(ctx context.Context, t *testing.T, cs content.Store)
|
||||
}
|
||||
|
||||
w.Close()
|
||||
w, err = cs.Writer(ctx, content.WithRef(ref))
|
||||
w, err = content.OpenWriter(ctx, cs, content.WithRef(ref))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -465,7 +465,7 @@ func checkCommitErrorState(ctx context.Context, t *testing.T, cs content.Store)
|
||||
|
||||
w.Close()
|
||||
// Create another writer with same reference
|
||||
w, err = cs.Writer(ctx, content.WithRef(ref))
|
||||
w, err = content.OpenWriter(ctx, cs, content.WithRef(ref))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to open writer: %+v", err)
|
||||
}
|
||||
@ -491,7 +491,7 @@ func checkCommitErrorState(ctx context.Context, t *testing.T, cs content.Store)
|
||||
}
|
||||
|
||||
w.Close()
|
||||
w, err = cs.Writer(ctx, content.WithRef(ref))
|
||||
w, err = content.OpenWriter(ctx, cs, content.WithRef(ref))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -503,7 +503,7 @@ func checkCommitErrorState(ctx context.Context, t *testing.T, cs content.Store)
|
||||
}
|
||||
|
||||
// Create another writer with same reference to check available
|
||||
w, err = cs.Writer(ctx, content.WithRef(ref))
|
||||
w, err = content.OpenWriter(ctx, cs, content.WithRef(ref))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to open writer: %+v", err)
|
||||
}
|
||||
@ -516,7 +516,7 @@ func checkUpdateStatus(ctx context.Context, t *testing.T, cs content.Store) {
|
||||
c1, d1 := createContent(256)
|
||||
|
||||
preStart := time.Now()
|
||||
w1, err := cs.Writer(ctx, content.WithRef("c1"), content.WithDescriptor(ocispec.Descriptor{Size: 256, Digest: d1}))
|
||||
w1, err := content.OpenWriter(ctx, cs, content.WithRef("c1"), content.WithDescriptor(ocispec.Descriptor{Size: 256, Digest: d1}))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -584,7 +584,7 @@ func checkUpdateStatus(ctx context.Context, t *testing.T, cs content.Store) {
|
||||
func checkLabels(ctx context.Context, t *testing.T, cs content.Store) {
|
||||
c1, d1 := createContent(256)
|
||||
|
||||
w1, err := cs.Writer(ctx, content.WithRef("c1-checklabels"), content.WithDescriptor(ocispec.Descriptor{Size: 256, Digest: d1}))
|
||||
w1, err := content.OpenWriter(ctx, cs, content.WithRef("c1-checklabels"), content.WithDescriptor(ocispec.Descriptor{Size: 256, Digest: d1}))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -661,7 +661,7 @@ func checkResume(rf func(context.Context, content.Writer, []byte, int64, int64,
|
||||
limit := int64(float64(size) * tp)
|
||||
ref := fmt.Sprintf("ref-%d-%d", i, j)
|
||||
|
||||
w, err := cs.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
|
||||
w, err := content.OpenWriter(ctx, cs, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -675,7 +675,7 @@ func checkResume(rf func(context.Context, content.Writer, []byte, int64, int64,
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
w, err = cs.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
|
||||
w, err = content.OpenWriter(ctx, cs, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -835,7 +835,7 @@ func checkCrossNSShare(ctx context.Context, t *testing.T, cs content.Store) {
|
||||
}
|
||||
defer done(ctx2)
|
||||
|
||||
w, err := cs.Writer(ctx2, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
|
||||
w, err := content.OpenWriter(ctx2, cs, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -894,7 +894,7 @@ func checkCrossNSAppend(ctx context.Context, t *testing.T, cs content.Store) {
|
||||
copy(b2[size:], extra)
|
||||
d2 := digest.FromBytes(b2)
|
||||
|
||||
w, err := cs.Writer(ctx2, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
|
||||
w, err := content.OpenWriter(ctx2, cs, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -957,7 +957,7 @@ func checkCrossNSIsolate(ctx context.Context, t *testing.T, cs content.Store) {
|
||||
defer done(ctx2)
|
||||
|
||||
t3 := time.Now()
|
||||
w, err := cs.Writer(ctx2, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
|
||||
w, err := content.OpenWriter(ctx2, cs, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -994,7 +994,7 @@ func checkSharedNSIsolate(ctx context.Context, t *testing.T, cs content.Store) {
|
||||
}
|
||||
defer done2(ctx2)
|
||||
|
||||
w, err := cs.Writer(ctx2, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
|
||||
w, err := content.OpenWriter(ctx2, cs, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user