*: add DrainExecSyncIOTimeout config and disable it by default
Signed-off-by: Wei Fu <fuweid89@gmail.com>
commit 3c18decea7
parent a9cbddd65d
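For context (not part of the diff): the new option takes a Go duration string, and the shipped default "0" parses to a zero duration, which the CRI plugin treats as "no drain timeout", i.e. wait for the IO EOF event indefinitely. A minimal runnable sketch of that parsing behaviour:

package main

import (
    "fmt"
    "time"
)

func main() {
    // "0" is the shipped default: it parses to a zero duration, which
    // drainExecSyncIO interprets as "no timeout".
    for _, s := range []string{"0", "10s", "1m"} {
        d, err := time.ParseDuration(s)
        if err != nil {
            panic(err)
        }
        fmt.Printf("%q -> %v (drain disabled: %v)\n", s, d, d == 0)
    }
}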
@@ -351,6 +351,13 @@ type PluginConfig struct {
     // The string is in the golang duration format, see:
     // https://golang.org/pkg/time/#ParseDuration
     ImagePullProgressTimeout string `toml:"image_pull_progress_timeout" json:"imagePullProgressTimeout"`
+    // DrainExecSyncIOTimeout is the maximum duration to wait for the ExecSync
+    // API's IO EOF event after the exec init process exits. A zero value means
+    // there is no timeout.
+    //
+    // The string is in the golang duration format, see:
+    // https://golang.org/pkg/time/#ParseDuration
+    DrainExecSyncIOTimeout string `toml:"drain_exec_sync_io_timeout" json:"drainExecSyncIOTimeout"`
 }
 
 // X509KeyPairStreaming contains the x509 configuration for streaming
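The field keeps the duration as a string with both toml and json tags, so the same value round-trips through containerd's TOML config file and the JSON config status output. A tiny sketch of the serialized key name, using a hypothetical one-field mirror of PluginConfig (stdlib encoding/json only):

package main

import (
    "encoding/json"
    "fmt"
)

// pluginConfig is a hypothetical one-field mirror of the real PluginConfig,
// shown only to illustrate the serialized key names.
type pluginConfig struct {
    DrainExecSyncIOTimeout string `toml:"drain_exec_sync_io_timeout" json:"drainExecSyncIOTimeout"`
}

func main() {
    out, _ := json.Marshal(pluginConfig{DrainExecSyncIOTimeout: "10s"})
    fmt.Println(string(out)) // {"drainExecSyncIOTimeout":"10s"}
}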
@@ -509,5 +516,12 @@ func ValidatePluginConfig(ctx context.Context, c *PluginConfig) error {
             return fmt.Errorf("invalid image pull progress timeout: %w", err)
         }
     }
+
+    // Validation for drain_exec_sync_io_timeout
+    if c.DrainExecSyncIOTimeout != "" {
+        if _, err := time.ParseDuration(c.DrainExecSyncIOTimeout); err != nil {
+            return fmt.Errorf("invalid drain exec sync io timeout: %w", err)
+        }
+    }
     return nil
 }
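The validation only proves the string will parse; execInternal re-parses it on each call. A standalone sketch of the same validate-but-keep-as-string pattern, with a hypothetical validateDrainTimeout helper (not part of the commit):

package main

import (
    "fmt"
    "time"
)

// validateDrainTimeout mirrors the new check in ValidatePluginConfig:
// an empty value is accepted as "unset", anything else must parse as a
// Go duration. The parsed value is discarded; the config keeps the string.
func validateDrainTimeout(value string) error {
    if value == "" {
        return nil
    }
    if _, err := time.ParseDuration(value); err != nil {
        return fmt.Errorf("invalid drain exec sync io timeout: %w", err)
    }
    return nil
}

func main() {
    fmt.Println(validateDrainTimeout("10s"))      // <nil>
    fmt.Println(validateDrainTimeout("ten secs")) // parse error
}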
@@ -110,5 +110,6 @@ func DefaultConfig() PluginConfig {
         EnableCDI:                false,
         CDISpecDirs:              []string{"/etc/cdi", "/var/run/cdi"},
         ImagePullProgressTimeout: time.Minute.String(),
+        DrainExecSyncIOTimeout:   "0",
     }
 }
@@ -86,5 +86,6 @@ func DefaultConfig() PluginConfig {
             KeyModel: KeyModelNode,
         },
         ImagePullProgressTimeout: time.Minute.String(),
+        DrainExecSyncIOTimeout:   "0",
     }
 }
@@ -37,23 +37,6 @@ import (
     cioutil "github.com/containerd/containerd/pkg/ioutil"
 )
 
-// defaultDrainExecIOTimeout is used to drain exec io after exec process exits.
-//
-// By default, the child processes spawned by exec process will inherit standard
-// io file descriptors. The shim server creates a pipe as data channel. Both
-// exec process and its children write data into the write end of the pipe.
-// And the shim server will read data from the pipe. If the write end is still
-// open, the shim server will continue to wait for data from pipe.
-//
-// If the exec command is like `bash -c "sleep 365d &"`, the exec process
-// is bash and quit after create `sleep 365d`. But the `sleep 365d` will hold
-// the write end of the pipe for a year! It doesn't make senses that CRI plugin
-// should wait for it.
-//
-// So, CRI plugin uses 15 seconds to drain the exec io and then deletes exec
-// process to stop copy io in shim side.
-const defaultDrainExecIOTimeout = 15 * time.Second
-
 type cappedWriter struct {
     w      io.WriteCloser
     remain int
@@ -134,6 +117,11 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
     ctx, cancel := context.WithCancel(ctx)
     defer cancel()
 
+    drainExecSyncIOTimeout, err := time.ParseDuration(c.config.DrainExecSyncIOTimeout)
+    if err != nil {
+        return nil, fmt.Errorf("failed to parse drain_exec_sync_io_timeout %q: %w", c.config.DrainExecSyncIOTimeout, err)
+    }
+
     spec, err := container.Spec(ctx)
     if err != nil {
         return nil, fmt.Errorf("failed to get container spec: %w", err)
@@ -224,7 +212,7 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
             log.G(ctx).Debugf("Timeout received while waiting for exec process kill %q code %d and error %v",
                 execID, exitRes.ExitCode(), exitRes.Error())
 
-            if err := drainExecIO(ctx, process, attachDone); err != nil {
+            if err := drainExecSyncIO(ctx, process, drainExecSyncIOTimeout, attachDone); err != nil {
                 log.G(ctx).WithError(err).Warnf("failed to drain exec process %q io", execID)
             }
 
@@ -236,7 +224,7 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
         return nil, fmt.Errorf("failed while waiting for exec %q: %w", execID, err)
     }
 
-    if err := drainExecIO(ctx, process, attachDone); err != nil {
+    if err := drainExecSyncIO(ctx, process, drainExecSyncIOTimeout, attachDone); err != nil {
         return nil, fmt.Errorf("failed to drain exec process %q io: %w", execID, err)
     }
     return &code, nil
@@ -272,12 +260,30 @@ func (c *criService) execInContainer(ctx context.Context, id string, opts execOp
     return c.execInternal(ctx, cntr.Container, id, opts)
 }
 
-func drainExecIO(ctx context.Context, execProcess containerd.Process, attachDone <-chan struct{}) error {
-    timer := time.NewTimer(defaultDrainExecIOTimeout)
-    defer timer.Stop()
-
+// drainExecSyncIO drains process IO with a timeout after the exec init process exits.
+//
+// By default, the child processes spawned by the exec process will inherit the
+// standard io file descriptors. The shim server creates a pipe as the data
+// channel. Both the exec process and its children write data into the write
+// end of the pipe, and the shim server reads data from the pipe. If the write
+// end is still open, the shim server will continue to wait for data.
+//
+// If the exec command is like `bash -c "sleep 365d &"`, the exec process is
+// bash and quits after creating `sleep 365d`. But `sleep 365d` will hold the
+// write end of the pipe for a year! It doesn't make sense for the CRI plugin
+// to wait for it.
+func drainExecSyncIO(ctx context.Context, execProcess containerd.Process, drainExecIOTimeout time.Duration, attachDone <-chan struct{}) error {
+    var timerCh <-chan time.Time
+
+    if drainExecIOTimeout != 0 {
+        timer := time.NewTimer(drainExecIOTimeout)
+        defer timer.Stop()
+
+        timerCh = timer.C
+    }
+
     select {
-    case <-timer.C:
+    case <-timerCh:
     case <-attachDone:
         log.G(ctx).Debugf("Stream pipe for exec process %q done", execProcess.ID())
 
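The zero-value behaviour falls out of a standard Go idiom: receiving from a nil channel blocks forever, so when the timeout is 0 the timer case is never armed and the select waits on attachDone alone. A self-contained sketch of the idiom (names are illustrative, not the commit's code):

package main

import (
    "fmt"
    "time"
)

// drain mimics drainExecSyncIO's select: a nil timer channel never fires,
// so with timeout == 0 the function waits on done alone.
func drain(timeout time.Duration, done <-chan struct{}) error {
    var timerCh <-chan time.Time
    if timeout != 0 {
        timer := time.NewTimer(timeout)
        defer timer.Stop()
        timerCh = timer.C
    }
    select {
    case <-timerCh:
        return fmt.Errorf("drain timed out after %v", timeout)
    case <-done:
        return nil
    }
}

func main() {
    done := make(chan struct{})
    time.AfterFunc(50*time.Millisecond, func() { close(done) })
    fmt.Println(drain(0, done))                  // <nil>: nil timer channel, waits for done
    fmt.Println(drain(10*time.Millisecond, nil)) // error: nil done channel, timer fires
}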
@@ -18,8 +18,14 @@ package sbserver
 
 import (
     "bytes"
+    "context"
+    "os"
+    "syscall"
     "testing"
+    "time"
 
+    "github.com/containerd/containerd"
+    "github.com/containerd/containerd/cio"
     cioutil "github.com/containerd/containerd/pkg/ioutil"
     "github.com/stretchr/testify/assert"
 )
@@ -50,3 +56,95 @@ func TestCWClose(t *testing.T) {
     err := cw.Close()
     assert.NoError(t, err)
 }
+
+func TestDrainExecSyncIO(t *testing.T) {
+    ctx := context.TODO()
+
+    t.Run("NoTimeout", func(t *testing.T) {
+        ep := &fakeExecProcess{
+            id:  t.Name(),
+            pid: uint32(os.Getpid()),
+        }
+
+        attachDoneCh := make(chan struct{})
+        time.AfterFunc(2*time.Second, func() { close(attachDoneCh) })
+        assert.NoError(t, drainExecSyncIO(ctx, ep, 0, attachDoneCh))
+        assert.Equal(t, 0, len(ep.actionEvents))
+    })
+
+    t.Run("With3Seconds", func(t *testing.T) {
+        ep := &fakeExecProcess{
+            id:  t.Name(),
+            pid: uint32(os.Getpid()),
+        }
+
+        attachDoneCh := make(chan struct{})
+        time.AfterFunc(100*time.Second, func() { close(attachDoneCh) })
+        assert.Error(t, drainExecSyncIO(ctx, ep, 3*time.Second, attachDoneCh))
+        assert.Equal(t, []string{"Delete"}, ep.actionEvents)
+    })
+}
+
+type fakeExecProcess struct {
+    id           string
+    pid          uint32
+    actionEvents []string
+}
+
+// ID of the process
+func (p *fakeExecProcess) ID() string {
+    return p.id
+}
+
+// Pid is the system specific process id
+func (p *fakeExecProcess) Pid() uint32 {
+    return p.pid
+}
+
+// Start starts the process executing the user's defined binary
+func (p *fakeExecProcess) Start(context.Context) error {
+    p.actionEvents = append(p.actionEvents, "Start")
+    return nil
+}
+
+// Delete removes the process and any resources allocated returning the exit status
+func (p *fakeExecProcess) Delete(context.Context, ...containerd.ProcessDeleteOpts) (*containerd.ExitStatus, error) {
+    p.actionEvents = append(p.actionEvents, "Delete")
+    return nil, nil
+}
+
+// Kill sends the provided signal to the process
+func (p *fakeExecProcess) Kill(context.Context, syscall.Signal, ...containerd.KillOpts) error {
+    p.actionEvents = append(p.actionEvents, "Kill")
+    return nil
+}
+
+// Wait asynchronously waits for the process to exit, and sends the exit code to the returned channel
+func (p *fakeExecProcess) Wait(context.Context) (<-chan containerd.ExitStatus, error) {
+    p.actionEvents = append(p.actionEvents, "Wait")
+    return nil, nil
+}
+
+// CloseIO allows various pipes to be closed on the process
+func (p *fakeExecProcess) CloseIO(context.Context, ...containerd.IOCloserOpts) error {
+    p.actionEvents = append(p.actionEvents, "CloseIO")
+    return nil
+}
+
+// Resize changes the width and height of the process's terminal
+func (p *fakeExecProcess) Resize(ctx context.Context, w, h uint32) error {
+    p.actionEvents = append(p.actionEvents, "Resize")
+    return nil
+}
+
+// IO returns the io set for the process
+func (p *fakeExecProcess) IO() cio.IO {
+    p.actionEvents = append(p.actionEvents, "IO")
+    return nil
+}
+
+// Status returns the executing status of the process
+func (p *fakeExecProcess) Status(context.Context) (containerd.Status, error) {
+    p.actionEvents = append(p.actionEvents, "Status")
+    return containerd.Status{}, nil
+}
@@ -37,23 +37,6 @@ import (
     cioutil "github.com/containerd/containerd/pkg/ioutil"
 )
 
-// defaultDrainExecIOTimeout is used to drain exec io after exec process exits.
-//
-// By default, the child processes spawned by exec process will inherit standard
-// io file descriptors. The shim server creates a pipe as data channel. Both
-// exec process and its children write data into the write end of the pipe.
-// And the shim server will read data from the pipe. If the write end is still
-// open, the shim server will continue to wait for data from pipe.
-//
-// If the exec command is like `bash -c "sleep 365d &"`, the exec process
-// is bash and quit after create `sleep 365d`. But the `sleep 365d` will hold
-// the write end of the pipe for a year! It doesn't make senses that CRI plugin
-// should wait for it.
-//
-// So, CRI plugin uses 15 seconds to drain the exec io and then deletes exec
-// process to stop copy io in shim side.
-const defaultDrainExecIOTimeout = 15 * time.Second
-
 type cappedWriter struct {
     w      io.WriteCloser
     remain int
@@ -134,6 +117,11 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
     ctx, cancel := context.WithCancel(ctx)
     defer cancel()
 
+    drainExecSyncIOTimeout, err := time.ParseDuration(c.config.DrainExecSyncIOTimeout)
+    if err != nil {
+        return nil, fmt.Errorf("failed to parse drain_exec_sync_io_timeout %q: %w", c.config.DrainExecSyncIOTimeout, err)
+    }
+
     spec, err := container.Spec(ctx)
     if err != nil {
         return nil, fmt.Errorf("failed to get container spec: %w", err)
@@ -224,7 +212,7 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
             log.G(ctx).Debugf("Timeout received while waiting for exec process kill %q code %d and error %v",
                 execID, exitRes.ExitCode(), exitRes.Error())
 
-            if err := drainExecIO(ctx, process, attachDone); err != nil {
+            if err := drainExecSyncIO(ctx, process, drainExecSyncIOTimeout, attachDone); err != nil {
                 log.G(ctx).WithError(err).Warnf("failed to drain exec process %q io", execID)
             }
 
@@ -236,7 +224,7 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
         return nil, fmt.Errorf("failed while waiting for exec %q: %w", execID, err)
     }
 
-    if err := drainExecIO(ctx, process, attachDone); err != nil {
+    if err := drainExecSyncIO(ctx, process, drainExecSyncIOTimeout, attachDone); err != nil {
         return nil, fmt.Errorf("failed to drain exec process %q io: %w", execID, err)
     }
     return &code, nil
@@ -272,12 +260,30 @@ func (c *criService) execInContainer(ctx context.Context, id string, opts execOp
     return c.execInternal(ctx, cntr.Container, id, opts)
 }
 
-func drainExecIO(ctx context.Context, execProcess containerd.Process, attachDone <-chan struct{}) error {
-    timer := time.NewTimer(defaultDrainExecIOTimeout)
-    defer timer.Stop()
-
+// drainExecSyncIO drains process IO with a timeout after the exec init process exits.
+//
+// By default, the child processes spawned by the exec process will inherit the
+// standard io file descriptors. The shim server creates a pipe as the data
+// channel. Both the exec process and its children write data into the write
+// end of the pipe, and the shim server reads data from the pipe. If the write
+// end is still open, the shim server will continue to wait for data.
+//
+// If the exec command is like `bash -c "sleep 365d &"`, the exec process is
+// bash and quits after creating `sleep 365d`. But `sleep 365d` will hold the
+// write end of the pipe for a year! It doesn't make sense for the CRI plugin
+// to wait for it.
+func drainExecSyncIO(ctx context.Context, execProcess containerd.Process, drainExecIOTimeout time.Duration, attachDone <-chan struct{}) error {
+    var timerCh <-chan time.Time
+
+    if drainExecIOTimeout != 0 {
+        timer := time.NewTimer(drainExecIOTimeout)
+        defer timer.Stop()
+
+        timerCh = timer.C
+    }
+
     select {
-    case <-timer.C:
+    case <-timerCh:
     case <-attachDone:
         log.G(ctx).Debugf("Stream pipe for exec process %q done", execProcess.ID())
 
@@ -18,8 +18,14 @@ package server
 
 import (
     "bytes"
+    "context"
+    "os"
+    "syscall"
     "testing"
+    "time"
 
+    "github.com/containerd/containerd"
+    "github.com/containerd/containerd/cio"
     cioutil "github.com/containerd/containerd/pkg/ioutil"
     "github.com/stretchr/testify/assert"
 )
@@ -50,3 +56,95 @@ func TestCWClose(t *testing.T) {
     err := cw.Close()
     assert.NoError(t, err)
 }
+
+func TestDrainExecSyncIO(t *testing.T) {
+    ctx := context.TODO()
+
+    t.Run("NoTimeout", func(t *testing.T) {
+        ep := &fakeExecProcess{
+            id:  t.Name(),
+            pid: uint32(os.Getpid()),
+        }
+
+        attachDoneCh := make(chan struct{})
+        time.AfterFunc(2*time.Second, func() { close(attachDoneCh) })
+        assert.NoError(t, drainExecSyncIO(ctx, ep, 0, attachDoneCh))
+        assert.Equal(t, 0, len(ep.actionEvents))
+    })
+
+    t.Run("With3Seconds", func(t *testing.T) {
+        ep := &fakeExecProcess{
+            id:  t.Name(),
+            pid: uint32(os.Getpid()),
+        }
+
+        attachDoneCh := make(chan struct{})
+        time.AfterFunc(100*time.Second, func() { close(attachDoneCh) })
+        assert.Error(t, drainExecSyncIO(ctx, ep, 3*time.Second, attachDoneCh))
+        assert.Equal(t, []string{"Delete"}, ep.actionEvents)
+    })
+}
+
+type fakeExecProcess struct {
+    id           string
+    pid          uint32
+    actionEvents []string
+}
+
+// ID of the process
+func (p *fakeExecProcess) ID() string {
+    return p.id
+}
+
+// Pid is the system specific process id
+func (p *fakeExecProcess) Pid() uint32 {
+    return p.pid
+}
+
+// Start starts the process executing the user's defined binary
+func (p *fakeExecProcess) Start(context.Context) error {
+    p.actionEvents = append(p.actionEvents, "Start")
+    return nil
+}
+
+// Delete removes the process and any resources allocated returning the exit status
+func (p *fakeExecProcess) Delete(context.Context, ...containerd.ProcessDeleteOpts) (*containerd.ExitStatus, error) {
+    p.actionEvents = append(p.actionEvents, "Delete")
+    return nil, nil
+}
+
+// Kill sends the provided signal to the process
+func (p *fakeExecProcess) Kill(context.Context, syscall.Signal, ...containerd.KillOpts) error {
+    p.actionEvents = append(p.actionEvents, "Kill")
+    return nil
+}
+
+// Wait asynchronously waits for the process to exit, and sends the exit code to the returned channel
+func (p *fakeExecProcess) Wait(context.Context) (<-chan containerd.ExitStatus, error) {
+    p.actionEvents = append(p.actionEvents, "Wait")
+    return nil, nil
+}
+
+// CloseIO allows various pipes to be closed on the process
+func (p *fakeExecProcess) CloseIO(context.Context, ...containerd.IOCloserOpts) error {
+    p.actionEvents = append(p.actionEvents, "CloseIO")
+    return nil
+}
+
+// Resize changes the width and height of the process's terminal
+func (p *fakeExecProcess) Resize(ctx context.Context, w, h uint32) error {
+    p.actionEvents = append(p.actionEvents, "Resize")
+    return nil
+}
+
+// IO returns the io set for the process
+func (p *fakeExecProcess) IO() cio.IO {
+    p.actionEvents = append(p.actionEvents, "IO")
+    return nil
+}
+
+// Status returns the executing status of the process
+func (p *fakeExecProcess) Status(context.Context) (containerd.Status, error) {
+    p.actionEvents = append(p.actionEvents, "Status")
+    return containerd.Status{}, nil
+}
@@ -39,11 +39,23 @@ CONTAINERD_RUNTIME=${CONTAINERD_RUNTIME:-""}
 if [ -z "${CONTAINERD_CONFIG_FILE}" ]; then
   config_file="${CONTAINERD_CONFIG_DIR}/containerd-config-cri.toml"
   truncate --size 0 "${config_file}"
-  echo "version=2" >> ${config_file}
+  # TODO(fuweid): if the config.Imports supports patch update, it will be easy
+  # to write the integration test case with different configuration, like:
+  #
+  # 1. write configuration into importable containerd config path.
+  # 2. restart containerd
+  # 3. verify the behaviour
+  # 4. delete the configuration
+  # 5. restart containerd
+  cat >>${config_file} <<EOF
+version=2
+
+[plugins."io.containerd.grpc.v1.cri"]
+  drain_exec_sync_io_timeout = "10s"
+EOF
 
   if command -v sestatus >/dev/null 2>&1; then
     cat >>${config_file} <<EOF
 [plugins."io.containerd.grpc.v1.cri"]
   enable_selinux = true
 EOF
   fi