integration: reproduce #9347
Signed-off-by: Wei Fu <fuweid89@gmail.com>
This commit is contained in:
parent
2e9686c054
commit
7f410ae05a
@ -21,8 +21,10 @@ import (
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
containerd "github.com/containerd/containerd/v2/client"
|
||||
"github.com/containerd/containerd/v2/content"
|
||||
"github.com/containerd/containerd/v2/pkg/cri/constants"
|
||||
"github.com/containerd/containerd/v2/platforms"
|
||||
"github.com/containerd/containerd/v2/plugins"
|
||||
@ -30,6 +32,7 @@ import (
|
||||
srvconfig "github.com/containerd/containerd/v2/services/server/config"
|
||||
"github.com/containerd/log/logtest"
|
||||
"github.com/containerd/plugin"
|
||||
"github.com/opencontainers/go-digest"
|
||||
|
||||
_ "github.com/containerd/containerd/v2/diff/walking/plugin"
|
||||
_ "github.com/containerd/containerd/v2/events/plugin"
|
||||
@ -59,9 +62,11 @@ var (
|
||||
loadedPluginsErr error
|
||||
)
|
||||
|
||||
type tweakPluginInitFunc func(t *testing.T, p plugin.Registration) plugin.Registration
|
||||
|
||||
// buildLocalContainerdClient is to return containerd client with initialized
|
||||
// core plugins in local.
|
||||
func buildLocalContainerdClient(t *testing.T, tmpDir string) *containerd.Client {
|
||||
func buildLocalContainerdClient(t *testing.T, tmpDir string, tweakInitFn tweakPluginInitFunc) *containerd.Client {
|
||||
ctx := logtest.WithT(context.Background(), t)
|
||||
|
||||
// load plugins
|
||||
@ -104,6 +109,10 @@ func buildLocalContainerdClient(t *testing.T, tmpDir string) *containerd.Client
|
||||
initContext.Config = pc
|
||||
}
|
||||
|
||||
if tweakInitFn != nil {
|
||||
p = tweakInitFn(t, p)
|
||||
}
|
||||
|
||||
result := p.Init(initContext)
|
||||
assert.NoError(t, initialized.Add(result))
|
||||
|
||||
@ -124,3 +133,61 @@ func buildLocalContainerdClient(t *testing.T, tmpDir string) *containerd.Client
|
||||
|
||||
return client
|
||||
}
|
||||
|
||||
func tweakContentInitFnWithDelayer(commitDelayDuration time.Duration) tweakPluginInitFunc {
|
||||
return func(t *testing.T, p plugin.Registration) plugin.Registration {
|
||||
if p.URI() != "io.containerd.content.v1.content" {
|
||||
return p
|
||||
}
|
||||
|
||||
oldInitFn := p.InitFn
|
||||
p.InitFn = func(ic *plugin.InitContext) (interface{}, error) {
|
||||
instance, err := oldInitFn(ic)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &contentStoreDelayer{
|
||||
t: t,
|
||||
|
||||
Store: instance.(content.Store),
|
||||
commitDelayDuration: commitDelayDuration,
|
||||
}, nil
|
||||
}
|
||||
return p
|
||||
}
|
||||
}
|
||||
|
||||
type contentStoreDelayer struct {
|
||||
t *testing.T
|
||||
|
||||
content.Store
|
||||
commitDelayDuration time.Duration
|
||||
}
|
||||
|
||||
func (cs *contentStoreDelayer) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
|
||||
w, err := cs.Store.Writer(ctx, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &contentWriterDelayer{
|
||||
t: cs.t,
|
||||
|
||||
Writer: w,
|
||||
commitDelayDuration: cs.commitDelayDuration,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type contentWriterDelayer struct {
|
||||
t *testing.T
|
||||
|
||||
content.Writer
|
||||
commitDelayDuration time.Duration
|
||||
}
|
||||
|
||||
func (w *contentWriterDelayer) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
|
||||
w.t.Logf("[testcase: %s] Commit %v blob after %v", w.t.Name(), expected, w.commitDelayDuration)
|
||||
time.Sleep(w.commitDelayDuration)
|
||||
return w.Writer.Commit(ctx, size, expected, opts...)
|
||||
}
|
||||
|
@ -61,6 +61,41 @@ func TestCRIImagePullTimeout(t *testing.T) {
|
||||
|
||||
t.Run("HoldingContentOpenWriter", testCRIImagePullTimeoutByHoldingContentOpenWriter)
|
||||
t.Run("NoDataTransferred", testCRIImagePullTimeoutByNoDataTransferred)
|
||||
t.Run("SlowCommitWriter", testCRIImagePullTimeoutBySlowCommitWriter)
|
||||
}
|
||||
|
||||
// testCRIImagePullTimeoutBySlowCommitWriter tests that
|
||||
//
|
||||
// It should not cancel if the content.Commit takes long time.
|
||||
//
|
||||
// After copying all the data from registry, the request should be inactive
|
||||
// before content.Commit. If the blob is large, for instance, 2 GiB, the fsync
|
||||
// during content.Commit maybe take long time during IO pressure. The
|
||||
// content.Commit holds the bolt's writable mutex and blocks other goroutines
|
||||
// which are going to commit blob as well. If the progress tracker still
|
||||
// considers these requests active, it maybe file false alert and cancel the
|
||||
// ImagePull.
|
||||
//
|
||||
// It's reproducer for #9347.
|
||||
func testCRIImagePullTimeoutBySlowCommitWriter(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tmpDir := t.TempDir()
|
||||
|
||||
delayDuration := 2 * defaultImagePullProgressTimeout
|
||||
cli := buildLocalContainerdClient(t, tmpDir, tweakContentInitFnWithDelayer(delayDuration))
|
||||
|
||||
criService, err := initLocalCRIPlugin(cli, tmpDir, criconfig.Registry{})
|
||||
assert.NoError(t, err)
|
||||
|
||||
ctx := namespaces.WithNamespace(logtest.WithT(context.Background(), t), k8sNamespace)
|
||||
|
||||
_, err = criService.PullImage(ctx, &runtimeapi.PullImageRequest{
|
||||
Image: &runtimeapi.ImageSpec{
|
||||
Image: pullProgressTestImageName,
|
||||
},
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
// testCRIImagePullTimeoutByHoldingContentOpenWriter tests that
|
||||
@ -75,7 +110,7 @@ func testCRIImagePullTimeoutByHoldingContentOpenWriter(t *testing.T) {
|
||||
|
||||
tmpDir := t.TempDir()
|
||||
|
||||
cli := buildLocalContainerdClient(t, tmpDir)
|
||||
cli := buildLocalContainerdClient(t, tmpDir, nil)
|
||||
|
||||
criService, err := initLocalCRIPlugin(cli, tmpDir, criconfig.Registry{})
|
||||
assert.NoError(t, err)
|
||||
@ -213,7 +248,7 @@ func testCRIImagePullTimeoutByNoDataTransferred(t *testing.T) {
|
||||
|
||||
tmpDir := t.TempDir()
|
||||
|
||||
cli := buildLocalContainerdClient(t, tmpDir)
|
||||
cli := buildLocalContainerdClient(t, tmpDir, nil)
|
||||
|
||||
mirrorSrv := newMirrorRegistryServer(mirrorRegistryServerConfig{
|
||||
limitedBytesPerConn: 1024 * 1024 * 3, // 3MB
|
||||
|
Loading…
Reference in New Issue
Block a user