Merge pull request #1152 from jterry75/deadline_exceeded
Return gRPC codes.DeadlineExceeded for all timeout operations
This commit is contained in:
commit
74d2b6947c
@ -164,16 +164,15 @@ func (c *criService) execInContainer(ctx context.Context, id string, opts execOp
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
var timeoutCh <-chan time.Time
|
execCtx := ctx
|
||||||
if opts.timeout == 0 {
|
if opts.timeout > 0 {
|
||||||
// Do not set timeout if it's 0.
|
var execCtxCancel context.CancelFunc
|
||||||
timeoutCh = make(chan time.Time)
|
execCtx, execCtxCancel = context.WithTimeout(ctx, opts.timeout)
|
||||||
} else {
|
defer execCtxCancel()
|
||||||
timeoutCh = time.After(opts.timeout)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-timeoutCh:
|
case <-execCtx.Done():
|
||||||
//TODO(Abhi) Use context.WithDeadline instead of timeout.
|
|
||||||
// Ignore the not found error because the process may exit itself before killing.
|
// Ignore the not found error because the process may exit itself before killing.
|
||||||
if err := process.Kill(ctx, syscall.SIGKILL); err != nil && !errdefs.IsNotFound(err) {
|
if err := process.Kill(ctx, syscall.SIGKILL); err != nil && !errdefs.IsNotFound(err) {
|
||||||
return nil, errors.Wrapf(err, "failed to kill exec %q", execID)
|
return nil, errors.Wrapf(err, "failed to kill exec %q", execID)
|
||||||
@ -184,7 +183,7 @@ func (c *criService) execInContainer(ctx context.Context, id string, opts execOp
|
|||||||
execID, exitRes.ExitCode(), exitRes.Error())
|
execID, exitRes.ExitCode(), exitRes.Error())
|
||||||
<-attachDone
|
<-attachDone
|
||||||
logrus.Debugf("Stream pipe for exec process %q done", execID)
|
logrus.Debugf("Stream pipe for exec process %q done", execID)
|
||||||
return nil, errors.Errorf("timeout %v exceeded", opts.timeout)
|
return nil, errors.Wrapf(execCtx.Err(), "timeout %v exceeded", opts.timeout)
|
||||||
case exitRes := <-exitCh:
|
case exitRes := <-exitCh:
|
||||||
code, _, err := exitRes.Result()
|
code, _, err := exitRes.Result()
|
||||||
logrus.Infof("Exec process %q exits with exit code %d and error %v", execID, code, err)
|
logrus.Infof("Exec process %q exits with exit code %d and error %v", execID, code, err)
|
||||||
|
@ -33,13 +33,6 @@ import (
|
|||||||
containerstore "github.com/containerd/cri/pkg/store/container"
|
containerstore "github.com/containerd/cri/pkg/store/container"
|
||||||
)
|
)
|
||||||
|
|
||||||
// killContainerTimeout is the timeout that we wait for the container to
|
|
||||||
// be SIGKILLed.
|
|
||||||
// The timeout is set to 1 min, because the default CRI operation timeout
|
|
||||||
// for StopContainer is (2 min + stop timeout). Set to 1 min, so that we
|
|
||||||
// have enough time for kill(all=true) and kill(all=false).
|
|
||||||
const killContainerTimeout = 1 * time.Minute
|
|
||||||
|
|
||||||
// StopContainer stops a running container with a grace period (i.e., timeout).
|
// StopContainer stops a running container with a grace period (i.e., timeout).
|
||||||
func (c *criService) StopContainer(ctx context.Context, r *runtime.StopContainerRequest) (*runtime.StopContainerResponse, error) {
|
func (c *criService) StopContainer(ctx context.Context, r *runtime.StopContainerRequest) (*runtime.StopContainerResponse, error) {
|
||||||
// Get container config from container store.
|
// Get container config from container store.
|
||||||
@ -141,11 +134,21 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore
|
|||||||
return errors.Wrapf(err, "failed to stop container %q", id)
|
return errors.Wrapf(err, "failed to stop container %q", id)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = c.waitContainerStop(ctx, container, timeout); err == nil || errors.Cause(err) == ctx.Err() {
|
sigTermCtx, sigTermCtxCancel := context.WithTimeout(ctx, timeout)
|
||||||
// Do not SIGKILL container if the context is cancelled.
|
err = c.waitContainerStop(sigTermCtx, container)
|
||||||
return err
|
if err == nil {
|
||||||
|
// Container stopped on first signal no need for SIGKILL
|
||||||
|
sigTermCtxCancel()
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
logrus.WithError(err).Errorf("An error occurs during waiting for container %q to be stopped", id)
|
// If the parent context was cancelled or exceeded return immediately
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
sigTermCtxCancel()
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
sigTermCtxCancel()
|
||||||
|
// sigTermCtx was exceeded. Send SIGKILL
|
||||||
|
logrus.Debugf("Stop container %q with signal %v timed out", id, sig)
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.Infof("Kill container %q", id)
|
logrus.Infof("Kill container %q", id)
|
||||||
@ -154,21 +157,19 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Wait for a fixed timeout until container stop is observed by event monitor.
|
// Wait for a fixed timeout until container stop is observed by event monitor.
|
||||||
if err = c.waitContainerStop(ctx, container, killContainerTimeout); err == nil {
|
err = c.waitContainerStop(ctx, container)
|
||||||
return nil
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "an error occurs during waiting for container %q to be killed", id)
|
||||||
}
|
}
|
||||||
return errors.Wrapf(err, "an error occurs during waiting for container %q to be killed", id)
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// waitContainerStop waits for container to be stopped until timeout exceeds or context is cancelled.
|
// waitContainerStop waits for container to be stopped until context is
|
||||||
func (c *criService) waitContainerStop(ctx context.Context, container containerstore.Container, timeout time.Duration) error {
|
// cancelled or the context deadline is exceeded.
|
||||||
timeoutTimer := time.NewTimer(timeout)
|
func (c *criService) waitContainerStop(ctx context.Context, container containerstore.Container) error {
|
||||||
defer timeoutTimer.Stop()
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return errors.Wrapf(ctx.Err(), "wait container %q is cancelled", container.ID)
|
return errors.Wrapf(ctx.Err(), "wait container %q", container.ID)
|
||||||
case <-timeoutTimer.C:
|
|
||||||
return errors.Errorf("wait container %q stop timeout", container.ID)
|
|
||||||
case <-container.Stopped():
|
case <-container.Stopped():
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -74,7 +74,12 @@ func TestWaitContainerStop(t *testing.T) {
|
|||||||
cancel()
|
cancel()
|
||||||
ctx = cancelledCtx
|
ctx = cancelledCtx
|
||||||
}
|
}
|
||||||
err = c.waitContainerStop(ctx, container, test.timeout)
|
if test.timeout > 0 {
|
||||||
|
timeoutCtx, cancel := context.WithTimeout(ctx, test.timeout)
|
||||||
|
defer cancel()
|
||||||
|
ctx = timeoutCtx
|
||||||
|
}
|
||||||
|
err = c.waitContainerStop(ctx, container)
|
||||||
assert.Equal(t, test.expectErr, err != nil, desc)
|
assert.Equal(t, test.expectErr, err != nil, desc)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -142,18 +142,15 @@ func (c *criService) stopSandboxContainer(ctx context.Context, sandbox sandboxst
|
|||||||
return errors.Wrap(err, "failed to kill sandbox container")
|
return errors.Wrap(err, "failed to kill sandbox container")
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.waitSandboxStop(ctx, sandbox, killContainerTimeout)
|
return c.waitSandboxStop(ctx, sandbox)
|
||||||
}
|
}
|
||||||
|
|
||||||
// waitSandboxStop waits for sandbox to be stopped until timeout exceeds or context is cancelled.
|
// waitSandboxStop waits for sandbox to be stopped until context is cancelled or
|
||||||
func (c *criService) waitSandboxStop(ctx context.Context, sandbox sandboxstore.Sandbox, timeout time.Duration) error {
|
// the context deadline is exceeded.
|
||||||
timeoutTimer := time.NewTimer(timeout)
|
func (c *criService) waitSandboxStop(ctx context.Context, sandbox sandboxstore.Sandbox) error {
|
||||||
defer timeoutTimer.Stop()
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return errors.Wrapf(ctx.Err(), "wait sandbox container %q is cancelled", sandbox.ID)
|
return errors.Wrapf(ctx.Err(), "wait sandbox container %q", sandbox.ID)
|
||||||
case <-timeoutTimer.C:
|
|
||||||
return errors.Errorf("wait sandbox container %q stop timeout", sandbox.ID)
|
|
||||||
case <-sandbox.Stopped():
|
case <-sandbox.Stopped():
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -62,7 +62,12 @@ func TestWaitSandboxStop(t *testing.T) {
|
|||||||
cancel()
|
cancel()
|
||||||
ctx = cancelledCtx
|
ctx = cancelledCtx
|
||||||
}
|
}
|
||||||
err := c.waitSandboxStop(ctx, sandbox, test.timeout)
|
if test.timeout > 0 {
|
||||||
|
timeoutCtx, cancel := context.WithTimeout(ctx, test.timeout)
|
||||||
|
defer cancel()
|
||||||
|
ctx = timeoutCtx
|
||||||
|
}
|
||||||
|
err := c.waitSandboxStop(ctx, sandbox)
|
||||||
assert.Equal(t, test.expectErr, err != nil, desc)
|
assert.Equal(t, test.expectErr, err != nil, desc)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user