Merge pull request #9369 from fuweid/fix-pull-progress
fix: ImagePull should close http connection if there is no available data to read.
This commit is contained in:
commit
f12e84b5c5
@ -21,8 +21,10 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
containerd "github.com/containerd/containerd/v2/client"
|
containerd "github.com/containerd/containerd/v2/client"
|
||||||
|
"github.com/containerd/containerd/v2/content"
|
||||||
"github.com/containerd/containerd/v2/pkg/cri/constants"
|
"github.com/containerd/containerd/v2/pkg/cri/constants"
|
||||||
"github.com/containerd/containerd/v2/platforms"
|
"github.com/containerd/containerd/v2/platforms"
|
||||||
"github.com/containerd/containerd/v2/plugins"
|
"github.com/containerd/containerd/v2/plugins"
|
||||||
@ -30,6 +32,7 @@ import (
|
|||||||
srvconfig "github.com/containerd/containerd/v2/services/server/config"
|
srvconfig "github.com/containerd/containerd/v2/services/server/config"
|
||||||
"github.com/containerd/log/logtest"
|
"github.com/containerd/log/logtest"
|
||||||
"github.com/containerd/plugin"
|
"github.com/containerd/plugin"
|
||||||
|
"github.com/opencontainers/go-digest"
|
||||||
|
|
||||||
_ "github.com/containerd/containerd/v2/diff/walking/plugin"
|
_ "github.com/containerd/containerd/v2/diff/walking/plugin"
|
||||||
_ "github.com/containerd/containerd/v2/events/plugin"
|
_ "github.com/containerd/containerd/v2/events/plugin"
|
||||||
@ -59,9 +62,11 @@ var (
|
|||||||
loadedPluginsErr error
|
loadedPluginsErr error
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type tweakPluginInitFunc func(t *testing.T, p plugin.Registration) plugin.Registration
|
||||||
|
|
||||||
// buildLocalContainerdClient is to return containerd client with initialized
|
// buildLocalContainerdClient is to return containerd client with initialized
|
||||||
// core plugins in local.
|
// core plugins in local.
|
||||||
func buildLocalContainerdClient(t *testing.T, tmpDir string) *containerd.Client {
|
func buildLocalContainerdClient(t *testing.T, tmpDir string, tweakInitFn tweakPluginInitFunc) *containerd.Client {
|
||||||
ctx := logtest.WithT(context.Background(), t)
|
ctx := logtest.WithT(context.Background(), t)
|
||||||
|
|
||||||
// load plugins
|
// load plugins
|
||||||
@ -104,6 +109,10 @@ func buildLocalContainerdClient(t *testing.T, tmpDir string) *containerd.Client
|
|||||||
initContext.Config = pc
|
initContext.Config = pc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if tweakInitFn != nil {
|
||||||
|
p = tweakInitFn(t, p)
|
||||||
|
}
|
||||||
|
|
||||||
result := p.Init(initContext)
|
result := p.Init(initContext)
|
||||||
assert.NoError(t, initialized.Add(result))
|
assert.NoError(t, initialized.Add(result))
|
||||||
|
|
||||||
@ -124,3 +133,61 @@ func buildLocalContainerdClient(t *testing.T, tmpDir string) *containerd.Client
|
|||||||
|
|
||||||
return client
|
return client
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func tweakContentInitFnWithDelayer(commitDelayDuration time.Duration) tweakPluginInitFunc {
|
||||||
|
return func(t *testing.T, p plugin.Registration) plugin.Registration {
|
||||||
|
if p.URI() != "io.containerd.content.v1.content" {
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
oldInitFn := p.InitFn
|
||||||
|
p.InitFn = func(ic *plugin.InitContext) (interface{}, error) {
|
||||||
|
instance, err := oldInitFn(ic)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &contentStoreDelayer{
|
||||||
|
t: t,
|
||||||
|
|
||||||
|
Store: instance.(content.Store),
|
||||||
|
commitDelayDuration: commitDelayDuration,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type contentStoreDelayer struct {
|
||||||
|
t *testing.T
|
||||||
|
|
||||||
|
content.Store
|
||||||
|
commitDelayDuration time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs *contentStoreDelayer) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
|
||||||
|
w, err := cs.Store.Writer(ctx, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &contentWriterDelayer{
|
||||||
|
t: cs.t,
|
||||||
|
|
||||||
|
Writer: w,
|
||||||
|
commitDelayDuration: cs.commitDelayDuration,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type contentWriterDelayer struct {
|
||||||
|
t *testing.T
|
||||||
|
|
||||||
|
content.Writer
|
||||||
|
commitDelayDuration time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *contentWriterDelayer) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
|
||||||
|
w.t.Logf("[testcase: %s] Commit %v blob after %v", w.t.Name(), expected, w.commitDelayDuration)
|
||||||
|
time.Sleep(w.commitDelayDuration)
|
||||||
|
return w.Writer.Commit(ctx, size, expected, opts...)
|
||||||
|
}
|
||||||
|
@ -61,6 +61,41 @@ func TestCRIImagePullTimeout(t *testing.T) {
|
|||||||
|
|
||||||
t.Run("HoldingContentOpenWriter", testCRIImagePullTimeoutByHoldingContentOpenWriter)
|
t.Run("HoldingContentOpenWriter", testCRIImagePullTimeoutByHoldingContentOpenWriter)
|
||||||
t.Run("NoDataTransferred", testCRIImagePullTimeoutByNoDataTransferred)
|
t.Run("NoDataTransferred", testCRIImagePullTimeoutByNoDataTransferred)
|
||||||
|
t.Run("SlowCommitWriter", testCRIImagePullTimeoutBySlowCommitWriter)
|
||||||
|
}
|
||||||
|
|
||||||
|
// testCRIImagePullTimeoutBySlowCommitWriter tests that
|
||||||
|
//
|
||||||
|
// It should not cancel if the content.Commit takes long time.
|
||||||
|
//
|
||||||
|
// After copying all the data from registry, the request should be inactive
|
||||||
|
// before content.Commit. If the blob is large, for instance, 2 GiB, the fsync
|
||||||
|
// during content.Commit maybe take long time during IO pressure. The
|
||||||
|
// content.Commit holds the bolt's writable mutex and blocks other goroutines
|
||||||
|
// which are going to commit blob as well. If the progress tracker still
|
||||||
|
// considers these requests active, it maybe file false alert and cancel the
|
||||||
|
// ImagePull.
|
||||||
|
//
|
||||||
|
// It's reproducer for #9347.
|
||||||
|
func testCRIImagePullTimeoutBySlowCommitWriter(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
tmpDir := t.TempDir()
|
||||||
|
|
||||||
|
delayDuration := 2 * defaultImagePullProgressTimeout
|
||||||
|
cli := buildLocalContainerdClient(t, tmpDir, tweakContentInitFnWithDelayer(delayDuration))
|
||||||
|
|
||||||
|
criService, err := initLocalCRIPlugin(cli, tmpDir, criconfig.Registry{})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
ctx := namespaces.WithNamespace(logtest.WithT(context.Background(), t), k8sNamespace)
|
||||||
|
|
||||||
|
_, err = criService.PullImage(ctx, &runtimeapi.PullImageRequest{
|
||||||
|
Image: &runtimeapi.ImageSpec{
|
||||||
|
Image: pullProgressTestImageName,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// testCRIImagePullTimeoutByHoldingContentOpenWriter tests that
|
// testCRIImagePullTimeoutByHoldingContentOpenWriter tests that
|
||||||
@ -75,7 +110,7 @@ func testCRIImagePullTimeoutByHoldingContentOpenWriter(t *testing.T) {
|
|||||||
|
|
||||||
tmpDir := t.TempDir()
|
tmpDir := t.TempDir()
|
||||||
|
|
||||||
cli := buildLocalContainerdClient(t, tmpDir)
|
cli := buildLocalContainerdClient(t, tmpDir, nil)
|
||||||
|
|
||||||
criService, err := initLocalCRIPlugin(cli, tmpDir, criconfig.Registry{})
|
criService, err := initLocalCRIPlugin(cli, tmpDir, criconfig.Registry{})
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
@ -213,7 +248,7 @@ func testCRIImagePullTimeoutByNoDataTransferred(t *testing.T) {
|
|||||||
|
|
||||||
tmpDir := t.TempDir()
|
tmpDir := t.TempDir()
|
||||||
|
|
||||||
cli := buildLocalContainerdClient(t, tmpDir)
|
cli := buildLocalContainerdClient(t, tmpDir, nil)
|
||||||
|
|
||||||
mirrorSrv := newMirrorRegistryServer(mirrorRegistryServerConfig{
|
mirrorSrv := newMirrorRegistryServer(mirrorRegistryServerConfig{
|
||||||
limitedBytesPerConn: 1024 * 1024 * 3, // 3MB
|
limitedBytesPerConn: 1024 * 1024 * 3, // 3MB
|
||||||
|
@ -30,6 +30,30 @@ import (
|
|||||||
"github.com/containerd/containerd/v2/pkg/deprecation"
|
"github.com/containerd/containerd/v2/pkg/deprecation"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// defaultImagePullProgressTimeoutDuration is the default value of imagePullProgressTimeout.
|
||||||
|
//
|
||||||
|
// NOTE:
|
||||||
|
//
|
||||||
|
// This ImagePullProgressTimeout feature is ported from kubelet/dockershim's
|
||||||
|
// --image-pull-progress-deadline. The original value is 1m0. Unlike docker
|
||||||
|
// daemon, the containerd doesn't have global concurrent download limitation
|
||||||
|
// before migrating to Transfer Service. If kubelet runs with concurrent
|
||||||
|
// image pull, the node will run under IO pressure. The ImagePull process
|
||||||
|
// could be impacted by self, if the target image is large one with a
|
||||||
|
// lot of layers. And also both container's writable layers and image's storage
|
||||||
|
// share one disk. The ImagePull process commits blob to content store
|
||||||
|
// with fsync, which might bring the unrelated files' dirty pages into
|
||||||
|
// disk in one transaction [1]. The 1m0 value isn't good enough. Based
|
||||||
|
// on #9347 case and kubernetes community's usage [2], the default value
|
||||||
|
// is updated to 5m0. If end-user still runs into unexpected cancel,
|
||||||
|
// they need to config it based on their environment.
|
||||||
|
//
|
||||||
|
// [1]: Fast commits for ext4 - https://lwn.net/Articles/842385/
|
||||||
|
// [2]: https://github.com/kubernetes/kubernetes/blob/1635c380b26a1d8cc25d36e9feace9797f4bae3c/cluster/gce/util.sh#L882
|
||||||
|
defaultImagePullProgressTimeoutDuration = 5 * time.Minute
|
||||||
|
)
|
||||||
|
|
||||||
type SandboxControllerMode string
|
type SandboxControllerMode string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -19,8 +19,6 @@
|
|||||||
package config
|
package config
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"time"
|
|
||||||
|
|
||||||
containerd "github.com/containerd/containerd/v2/client"
|
containerd "github.com/containerd/containerd/v2/client"
|
||||||
"github.com/pelletier/go-toml/v2"
|
"github.com/pelletier/go-toml/v2"
|
||||||
"k8s.io/kubelet/pkg/cri/streaming"
|
"k8s.io/kubelet/pkg/cri/streaming"
|
||||||
@ -100,7 +98,7 @@ func DefaultConfig() PluginConfig {
|
|||||||
},
|
},
|
||||||
EnableCDI: false,
|
EnableCDI: false,
|
||||||
CDISpecDirs: []string{"/etc/cdi", "/var/run/cdi"},
|
CDISpecDirs: []string{"/etc/cdi", "/var/run/cdi"},
|
||||||
ImagePullProgressTimeout: time.Minute.String(),
|
ImagePullProgressTimeout: defaultImagePullProgressTimeoutDuration.String(),
|
||||||
DrainExecSyncIOTimeout: "0s",
|
DrainExecSyncIOTimeout: "0s",
|
||||||
EnableUnprivilegedPorts: true,
|
EnableUnprivilegedPorts: true,
|
||||||
EnableUnprivilegedICMP: true,
|
EnableUnprivilegedICMP: true,
|
||||||
|
@ -19,7 +19,6 @@ package config
|
|||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"time"
|
|
||||||
|
|
||||||
containerd "github.com/containerd/containerd/v2/client"
|
containerd "github.com/containerd/containerd/v2/client"
|
||||||
"k8s.io/kubelet/pkg/cri/streaming"
|
"k8s.io/kubelet/pkg/cri/streaming"
|
||||||
@ -84,7 +83,7 @@ func DefaultConfig() PluginConfig {
|
|||||||
ImageDecryption: ImageDecryption{
|
ImageDecryption: ImageDecryption{
|
||||||
KeyModel: KeyModelNode,
|
KeyModel: KeyModelNode,
|
||||||
},
|
},
|
||||||
ImagePullProgressTimeout: time.Minute.String(),
|
ImagePullProgressTimeout: defaultImagePullProgressTimeoutDuration.String(),
|
||||||
DrainExecSyncIOTimeout: "0s",
|
DrainExecSyncIOTimeout: "0s",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -555,9 +555,6 @@ func newTransport() *http.Transport {
|
|||||||
//}
|
//}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// minPullProgressReportInternal is used to prevent the reporter from
|
|
||||||
// eating more CPU resources
|
|
||||||
minPullProgressReportInternal = 5 * time.Second
|
|
||||||
// defaultPullProgressReportInterval represents that how often the
|
// defaultPullProgressReportInterval represents that how often the
|
||||||
// reporter checks that pull progress.
|
// reporter checks that pull progress.
|
||||||
defaultPullProgressReportInterval = 10 * time.Second
|
defaultPullProgressReportInterval = 10 * time.Second
|
||||||
@ -605,10 +602,6 @@ func (reporter *pullProgressReporter) start(ctx context.Context) {
|
|||||||
// check progress more frequently if timeout < default internal
|
// check progress more frequently if timeout < default internal
|
||||||
if reporter.timeout < reportInterval {
|
if reporter.timeout < reportInterval {
|
||||||
reportInterval = reporter.timeout / 2
|
reportInterval = reporter.timeout / 2
|
||||||
|
|
||||||
if reportInterval < minPullProgressReportInternal {
|
|
||||||
reportInterval = minPullProgressReportInternal
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var ticker = time.NewTicker(reportInterval)
|
var ticker = time.NewTicker(reportInterval)
|
||||||
@ -623,9 +616,9 @@ func (reporter *pullProgressReporter) start(ctx context.Context) {
|
|||||||
WithField("activeReqs", activeReqs).
|
WithField("activeReqs", activeReqs).
|
||||||
WithField("totalBytesRead", bytesRead).
|
WithField("totalBytesRead", bytesRead).
|
||||||
WithField("lastSeenBytesRead", lastSeenBytesRead).
|
WithField("lastSeenBytesRead", lastSeenBytesRead).
|
||||||
WithField("lastSeenTimestamp", lastSeenTimestamp).
|
WithField("lastSeenTimestamp", lastSeenTimestamp.Format(time.RFC3339)).
|
||||||
WithField("reportInterval", reportInterval).
|
WithField("reportInterval", reportInterval).
|
||||||
Tracef("progress for image pull")
|
Debugf("progress for image pull")
|
||||||
|
|
||||||
if activeReqs == 0 || bytesRead > lastSeenBytesRead {
|
if activeReqs == 0 || bytesRead > lastSeenBytesRead {
|
||||||
lastSeenBytesRead = bytesRead
|
lastSeenBytesRead = bytesRead
|
||||||
|
@ -76,6 +76,16 @@ func (hrs *httpReadSeeker) Read(p []byte) (n int, err error) {
|
|||||||
if _, err2 := hrs.reader(); err2 == nil {
|
if _, err2 := hrs.reader(); err2 == nil {
|
||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
} else if err == io.EOF {
|
||||||
|
// The CRI's imagePullProgressTimeout relies on responseBody.Close to
|
||||||
|
// update the process monitor's status. If the err is io.EOF, close
|
||||||
|
// the connection since there is no more available data.
|
||||||
|
if hrs.rc != nil {
|
||||||
|
if clsErr := hrs.rc.Close(); clsErr != nil {
|
||||||
|
log.L.WithError(clsErr).Error("httpReadSeeker: failed to close ReadCloser after io.EOF")
|
||||||
|
}
|
||||||
|
hrs.rc = nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user