diff --git a/integration/build_local_containerd_helper_test.go b/integration/build_local_containerd_helper_test.go index f49df219a..790399705 100644 --- a/integration/build_local_containerd_helper_test.go +++ b/integration/build_local_containerd_helper_test.go @@ -21,8 +21,10 @@ import ( "path/filepath" "sync" "testing" + "time" containerd "github.com/containerd/containerd/v2/client" + "github.com/containerd/containerd/v2/content" "github.com/containerd/containerd/v2/pkg/cri/constants" "github.com/containerd/containerd/v2/platforms" "github.com/containerd/containerd/v2/plugins" @@ -30,6 +32,7 @@ import ( srvconfig "github.com/containerd/containerd/v2/services/server/config" "github.com/containerd/log/logtest" "github.com/containerd/plugin" + "github.com/opencontainers/go-digest" _ "github.com/containerd/containerd/v2/diff/walking/plugin" _ "github.com/containerd/containerd/v2/events/plugin" @@ -59,9 +62,11 @@ var ( loadedPluginsErr error ) +type tweakPluginInitFunc func(t *testing.T, p plugin.Registration) plugin.Registration + // buildLocalContainerdClient is to return containerd client with initialized // core plugins in local. -func buildLocalContainerdClient(t *testing.T, tmpDir string) *containerd.Client { +func buildLocalContainerdClient(t *testing.T, tmpDir string, tweakInitFn tweakPluginInitFunc) *containerd.Client { ctx := logtest.WithT(context.Background(), t) // load plugins @@ -104,6 +109,10 @@ func buildLocalContainerdClient(t *testing.T, tmpDir string) *containerd.Client initContext.Config = pc } + if tweakInitFn != nil { + p = tweakInitFn(t, p) + } + result := p.Init(initContext) assert.NoError(t, initialized.Add(result)) @@ -124,3 +133,61 @@ func buildLocalContainerdClient(t *testing.T, tmpDir string) *containerd.Client return client } + +func tweakContentInitFnWithDelayer(commitDelayDuration time.Duration) tweakPluginInitFunc { + return func(t *testing.T, p plugin.Registration) plugin.Registration { + if p.URI() != "io.containerd.content.v1.content" { + return p + } + + oldInitFn := p.InitFn + p.InitFn = func(ic *plugin.InitContext) (interface{}, error) { + instance, err := oldInitFn(ic) + if err != nil { + return nil, err + } + + return &contentStoreDelayer{ + t: t, + + Store: instance.(content.Store), + commitDelayDuration: commitDelayDuration, + }, nil + } + return p + } +} + +type contentStoreDelayer struct { + t *testing.T + + content.Store + commitDelayDuration time.Duration +} + +func (cs *contentStoreDelayer) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) { + w, err := cs.Store.Writer(ctx, opts...) + if err != nil { + return nil, err + } + + return &contentWriterDelayer{ + t: cs.t, + + Writer: w, + commitDelayDuration: cs.commitDelayDuration, + }, nil +} + +type contentWriterDelayer struct { + t *testing.T + + content.Writer + commitDelayDuration time.Duration +} + +func (w *contentWriterDelayer) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { + w.t.Logf("[testcase: %s] Commit %v blob after %v", w.t.Name(), expected, w.commitDelayDuration) + time.Sleep(w.commitDelayDuration) + return w.Writer.Commit(ctx, size, expected, opts...) +} diff --git a/integration/image_pull_timeout_test.go b/integration/image_pull_timeout_test.go index 9f9966db2..c5029bd30 100644 --- a/integration/image_pull_timeout_test.go +++ b/integration/image_pull_timeout_test.go @@ -61,6 +61,41 @@ func TestCRIImagePullTimeout(t *testing.T) { t.Run("HoldingContentOpenWriter", testCRIImagePullTimeoutByHoldingContentOpenWriter) t.Run("NoDataTransferred", testCRIImagePullTimeoutByNoDataTransferred) + t.Run("SlowCommitWriter", testCRIImagePullTimeoutBySlowCommitWriter) +} + +// testCRIImagePullTimeoutBySlowCommitWriter tests that +// +// It should not cancel if the content.Commit takes long time. +// +// After copying all the data from registry, the request should be inactive +// before content.Commit. If the blob is large, for instance, 2 GiB, the fsync +// during content.Commit maybe take long time during IO pressure. The +// content.Commit holds the bolt's writable mutex and blocks other goroutines +// which are going to commit blob as well. If the progress tracker still +// considers these requests active, it maybe file false alert and cancel the +// ImagePull. +// +// It's reproducer for #9347. +func testCRIImagePullTimeoutBySlowCommitWriter(t *testing.T) { + t.Parallel() + + tmpDir := t.TempDir() + + delayDuration := 2 * defaultImagePullProgressTimeout + cli := buildLocalContainerdClient(t, tmpDir, tweakContentInitFnWithDelayer(delayDuration)) + + criService, err := initLocalCRIPlugin(cli, tmpDir, criconfig.Registry{}) + assert.NoError(t, err) + + ctx := namespaces.WithNamespace(logtest.WithT(context.Background(), t), k8sNamespace) + + _, err = criService.PullImage(ctx, &runtimeapi.PullImageRequest{ + Image: &runtimeapi.ImageSpec{ + Image: pullProgressTestImageName, + }, + }) + assert.NoError(t, err) } // testCRIImagePullTimeoutByHoldingContentOpenWriter tests that @@ -75,7 +110,7 @@ func testCRIImagePullTimeoutByHoldingContentOpenWriter(t *testing.T) { tmpDir := t.TempDir() - cli := buildLocalContainerdClient(t, tmpDir) + cli := buildLocalContainerdClient(t, tmpDir, nil) criService, err := initLocalCRIPlugin(cli, tmpDir, criconfig.Registry{}) assert.NoError(t, err) @@ -213,7 +248,7 @@ func testCRIImagePullTimeoutByNoDataTransferred(t *testing.T) { tmpDir := t.TempDir() - cli := buildLocalContainerdClient(t, tmpDir) + cli := buildLocalContainerdClient(t, tmpDir, nil) mirrorSrv := newMirrorRegistryServer(mirrorRegistryServerConfig{ limitedBytesPerConn: 1024 * 1024 * 3, // 3MB