From 2e9686c054241b4643ba9e8a91b8ab9b39b63f20 Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Wed, 15 Nov 2023 18:35:26 +0800 Subject: [PATCH 1/3] fix: deflake TestCRIImagePullTimeout/HoldingContentOpenWriter The new active request is filed and there is no bytes read yet when the progress reporter just wakes up. If the timeout / 2 is less than the minPullProgressReportInternal, it's easy to file false alert. We should remove the minPullProgressReportInternal limit. Fixes: #8024 Signed-off-by: Wei Fu --- pkg/cri/server/images/image_pull.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pkg/cri/server/images/image_pull.go b/pkg/cri/server/images/image_pull.go index cb9db2738..840015052 100644 --- a/pkg/cri/server/images/image_pull.go +++ b/pkg/cri/server/images/image_pull.go @@ -555,9 +555,6 @@ func newTransport() *http.Transport { //} const ( - // minPullProgressReportInternal is used to prevent the reporter from - // eating more CPU resources - minPullProgressReportInternal = 5 * time.Second // defaultPullProgressReportInterval represents that how often the // reporter checks that pull progress. defaultPullProgressReportInterval = 10 * time.Second @@ -605,10 +602,6 @@ func (reporter *pullProgressReporter) start(ctx context.Context) { // check progress more frequently if timeout < default internal if reporter.timeout < reportInterval { reportInterval = reporter.timeout / 2 - - if reportInterval < minPullProgressReportInternal { - reportInterval = minPullProgressReportInternal - } } var ticker = time.NewTicker(reportInterval) From 7f410ae05a9d85700799e724fc8a8bbbf50bf853 Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Wed, 15 Nov 2023 19:47:05 +0800 Subject: [PATCH 2/3] integration: reproduce #9347 Signed-off-by: Wei Fu --- .../build_local_containerd_helper_test.go | 69 ++++++++++++++++++- integration/image_pull_timeout_test.go | 39 ++++++++++- 2 files changed, 105 insertions(+), 3 deletions(-) diff --git a/integration/build_local_containerd_helper_test.go b/integration/build_local_containerd_helper_test.go index f49df219a..790399705 100644 --- a/integration/build_local_containerd_helper_test.go +++ b/integration/build_local_containerd_helper_test.go @@ -21,8 +21,10 @@ import ( "path/filepath" "sync" "testing" + "time" containerd "github.com/containerd/containerd/v2/client" + "github.com/containerd/containerd/v2/content" "github.com/containerd/containerd/v2/pkg/cri/constants" "github.com/containerd/containerd/v2/platforms" "github.com/containerd/containerd/v2/plugins" @@ -30,6 +32,7 @@ import ( srvconfig "github.com/containerd/containerd/v2/services/server/config" "github.com/containerd/log/logtest" "github.com/containerd/plugin" + "github.com/opencontainers/go-digest" _ "github.com/containerd/containerd/v2/diff/walking/plugin" _ "github.com/containerd/containerd/v2/events/plugin" @@ -59,9 +62,11 @@ var ( loadedPluginsErr error ) +type tweakPluginInitFunc func(t *testing.T, p plugin.Registration) plugin.Registration + // buildLocalContainerdClient is to return containerd client with initialized // core plugins in local. -func buildLocalContainerdClient(t *testing.T, tmpDir string) *containerd.Client { +func buildLocalContainerdClient(t *testing.T, tmpDir string, tweakInitFn tweakPluginInitFunc) *containerd.Client { ctx := logtest.WithT(context.Background(), t) // load plugins @@ -104,6 +109,10 @@ func buildLocalContainerdClient(t *testing.T, tmpDir string) *containerd.Client initContext.Config = pc } + if tweakInitFn != nil { + p = tweakInitFn(t, p) + } + result := p.Init(initContext) assert.NoError(t, initialized.Add(result)) @@ -124,3 +133,61 @@ func buildLocalContainerdClient(t *testing.T, tmpDir string) *containerd.Client return client } + +func tweakContentInitFnWithDelayer(commitDelayDuration time.Duration) tweakPluginInitFunc { + return func(t *testing.T, p plugin.Registration) plugin.Registration { + if p.URI() != "io.containerd.content.v1.content" { + return p + } + + oldInitFn := p.InitFn + p.InitFn = func(ic *plugin.InitContext) (interface{}, error) { + instance, err := oldInitFn(ic) + if err != nil { + return nil, err + } + + return &contentStoreDelayer{ + t: t, + + Store: instance.(content.Store), + commitDelayDuration: commitDelayDuration, + }, nil + } + return p + } +} + +type contentStoreDelayer struct { + t *testing.T + + content.Store + commitDelayDuration time.Duration +} + +func (cs *contentStoreDelayer) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) { + w, err := cs.Store.Writer(ctx, opts...) + if err != nil { + return nil, err + } + + return &contentWriterDelayer{ + t: cs.t, + + Writer: w, + commitDelayDuration: cs.commitDelayDuration, + }, nil +} + +type contentWriterDelayer struct { + t *testing.T + + content.Writer + commitDelayDuration time.Duration +} + +func (w *contentWriterDelayer) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { + w.t.Logf("[testcase: %s] Commit %v blob after %v", w.t.Name(), expected, w.commitDelayDuration) + time.Sleep(w.commitDelayDuration) + return w.Writer.Commit(ctx, size, expected, opts...) +} diff --git a/integration/image_pull_timeout_test.go b/integration/image_pull_timeout_test.go index 9f9966db2..c5029bd30 100644 --- a/integration/image_pull_timeout_test.go +++ b/integration/image_pull_timeout_test.go @@ -61,6 +61,41 @@ func TestCRIImagePullTimeout(t *testing.T) { t.Run("HoldingContentOpenWriter", testCRIImagePullTimeoutByHoldingContentOpenWriter) t.Run("NoDataTransferred", testCRIImagePullTimeoutByNoDataTransferred) + t.Run("SlowCommitWriter", testCRIImagePullTimeoutBySlowCommitWriter) +} + +// testCRIImagePullTimeoutBySlowCommitWriter tests that +// +// It should not cancel if the content.Commit takes long time. +// +// After copying all the data from registry, the request should be inactive +// before content.Commit. If the blob is large, for instance, 2 GiB, the fsync +// during content.Commit maybe take long time during IO pressure. The +// content.Commit holds the bolt's writable mutex and blocks other goroutines +// which are going to commit blob as well. If the progress tracker still +// considers these requests active, it maybe file false alert and cancel the +// ImagePull. +// +// It's reproducer for #9347. +func testCRIImagePullTimeoutBySlowCommitWriter(t *testing.T) { + t.Parallel() + + tmpDir := t.TempDir() + + delayDuration := 2 * defaultImagePullProgressTimeout + cli := buildLocalContainerdClient(t, tmpDir, tweakContentInitFnWithDelayer(delayDuration)) + + criService, err := initLocalCRIPlugin(cli, tmpDir, criconfig.Registry{}) + assert.NoError(t, err) + + ctx := namespaces.WithNamespace(logtest.WithT(context.Background(), t), k8sNamespace) + + _, err = criService.PullImage(ctx, &runtimeapi.PullImageRequest{ + Image: &runtimeapi.ImageSpec{ + Image: pullProgressTestImageName, + }, + }) + assert.NoError(t, err) } // testCRIImagePullTimeoutByHoldingContentOpenWriter tests that @@ -75,7 +110,7 @@ func testCRIImagePullTimeoutByHoldingContentOpenWriter(t *testing.T) { tmpDir := t.TempDir() - cli := buildLocalContainerdClient(t, tmpDir) + cli := buildLocalContainerdClient(t, tmpDir, nil) criService, err := initLocalCRIPlugin(cli, tmpDir, criconfig.Registry{}) assert.NoError(t, err) @@ -213,7 +248,7 @@ func testCRIImagePullTimeoutByNoDataTransferred(t *testing.T) { tmpDir := t.TempDir() - cli := buildLocalContainerdClient(t, tmpDir) + cli := buildLocalContainerdClient(t, tmpDir, nil) mirrorSrv := newMirrorRegistryServer(mirrorRegistryServerConfig{ limitedBytesPerConn: 1024 * 1024 * 3, // 3MB From 80dd779debad2c8a4e00b457042e39b66adff16e Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Wed, 15 Nov 2023 20:00:26 +0800 Subject: [PATCH 3/3] remotes/docker: close connection if no more data Close connection if no more data. It's to fix false alert filed by image pull progress. ``` dst = OpenWriter (--> Content Store) src = Fetch Open (--> Registry) Mark it as active request Copy(dst, src) (--> Keep updating total received bytes) ^ | (Active Request > 0, but total received bytes won't be updated) v defer src.Close() content.Commit(dst) ``` Before migrating to transfer service, CRI plugin doesn't limit global concurrent downloads for ImagePulls. Each ImagePull requests have 3 concurrent goroutines to download blob and 1 goroutine to unpack blob. Like ext4 filesystem [1][1], the fsync from content.Commit may sync unrelated dirty pages into disk. The host is running under IO pressure, and then the content.Commit will take long time and block other goroutines. If httpreadseeker doesn't close the connection after io.EOF, this connection will be considered as active. The pull progress reporter reports there is no bytes transfered and cancels the ImagePull. The original 1-minute timeout[2][2] is from kubelet settting. Since CRI-plugin can't limit the total concurrent downloads, this patch is to update 1-minute to 5-minutes to prevent from unexpected cancel. [1]: https://lwn.net/Articles/842385/ [2]: https://github.com/kubernetes/kubernetes/blob/release-1.23/pkg/kubelet/config/flags.go#L45-L48 Signed-off-by: Wei Fu --- pkg/cri/config/config.go | 24 ++++++++++++++++++++++++ pkg/cri/config/config_unix.go | 4 +--- pkg/cri/config/config_windows.go | 3 +-- pkg/cri/server/images/image_pull.go | 4 ++-- remotes/docker/httpreadseeker.go | 10 ++++++++++ 5 files changed, 38 insertions(+), 7 deletions(-) diff --git a/pkg/cri/config/config.go b/pkg/cri/config/config.go index 43e954b5b..fb526ecd9 100644 --- a/pkg/cri/config/config.go +++ b/pkg/cri/config/config.go @@ -30,6 +30,30 @@ import ( "github.com/containerd/containerd/v2/pkg/deprecation" ) +const ( + // defaultImagePullProgressTimeoutDuration is the default value of imagePullProgressTimeout. + // + // NOTE: + // + // This ImagePullProgressTimeout feature is ported from kubelet/dockershim's + // --image-pull-progress-deadline. The original value is 1m0. Unlike docker + // daemon, the containerd doesn't have global concurrent download limitation + // before migrating to Transfer Service. If kubelet runs with concurrent + // image pull, the node will run under IO pressure. The ImagePull process + // could be impacted by self, if the target image is large one with a + // lot of layers. And also both container's writable layers and image's storage + // share one disk. The ImagePull process commits blob to content store + // with fsync, which might bring the unrelated files' dirty pages into + // disk in one transaction [1]. The 1m0 value isn't good enough. Based + // on #9347 case and kubernetes community's usage [2], the default value + // is updated to 5m0. If end-user still runs into unexpected cancel, + // they need to config it based on their environment. + // + // [1]: Fast commits for ext4 - https://lwn.net/Articles/842385/ + // [2]: https://github.com/kubernetes/kubernetes/blob/1635c380b26a1d8cc25d36e9feace9797f4bae3c/cluster/gce/util.sh#L882 + defaultImagePullProgressTimeoutDuration = 5 * time.Minute +) + type SandboxControllerMode string const ( diff --git a/pkg/cri/config/config_unix.go b/pkg/cri/config/config_unix.go index fbd308124..d3fe60e8e 100644 --- a/pkg/cri/config/config_unix.go +++ b/pkg/cri/config/config_unix.go @@ -19,8 +19,6 @@ package config import ( - "time" - containerd "github.com/containerd/containerd/v2/client" "github.com/pelletier/go-toml/v2" "k8s.io/kubelet/pkg/cri/streaming" @@ -100,7 +98,7 @@ func DefaultConfig() PluginConfig { }, EnableCDI: false, CDISpecDirs: []string{"/etc/cdi", "/var/run/cdi"}, - ImagePullProgressTimeout: time.Minute.String(), + ImagePullProgressTimeout: defaultImagePullProgressTimeoutDuration.String(), DrainExecSyncIOTimeout: "0s", EnableUnprivilegedPorts: true, EnableUnprivilegedICMP: true, diff --git a/pkg/cri/config/config_windows.go b/pkg/cri/config/config_windows.go index 53ee83f9f..f4f23ea7f 100644 --- a/pkg/cri/config/config_windows.go +++ b/pkg/cri/config/config_windows.go @@ -19,7 +19,6 @@ package config import ( "os" "path/filepath" - "time" containerd "github.com/containerd/containerd/v2/client" "k8s.io/kubelet/pkg/cri/streaming" @@ -84,7 +83,7 @@ func DefaultConfig() PluginConfig { ImageDecryption: ImageDecryption{ KeyModel: KeyModelNode, }, - ImagePullProgressTimeout: time.Minute.String(), + ImagePullProgressTimeout: defaultImagePullProgressTimeoutDuration.String(), DrainExecSyncIOTimeout: "0s", } } diff --git a/pkg/cri/server/images/image_pull.go b/pkg/cri/server/images/image_pull.go index 840015052..8de15f902 100644 --- a/pkg/cri/server/images/image_pull.go +++ b/pkg/cri/server/images/image_pull.go @@ -616,9 +616,9 @@ func (reporter *pullProgressReporter) start(ctx context.Context) { WithField("activeReqs", activeReqs). WithField("totalBytesRead", bytesRead). WithField("lastSeenBytesRead", lastSeenBytesRead). - WithField("lastSeenTimestamp", lastSeenTimestamp). + WithField("lastSeenTimestamp", lastSeenTimestamp.Format(time.RFC3339)). WithField("reportInterval", reportInterval). - Tracef("progress for image pull") + Debugf("progress for image pull") if activeReqs == 0 || bytesRead > lastSeenBytesRead { lastSeenBytesRead = bytesRead diff --git a/remotes/docker/httpreadseeker.go b/remotes/docker/httpreadseeker.go index 5ada7e247..5d873173c 100644 --- a/remotes/docker/httpreadseeker.go +++ b/remotes/docker/httpreadseeker.go @@ -76,6 +76,16 @@ func (hrs *httpReadSeeker) Read(p []byte) (n int, err error) { if _, err2 := hrs.reader(); err2 == nil { return n, nil } + } else if err == io.EOF { + // The CRI's imagePullProgressTimeout relies on responseBody.Close to + // update the process monitor's status. If the err is io.EOF, close + // the connection since there is no more available data. + if hrs.rc != nil { + if clsErr := hrs.rc.Close(); clsErr != nil { + log.L.WithError(clsErr).Error("httpReadSeeker: failed to close ReadCloser after io.EOF") + } + hrs.rc = nil + } } return }