Merge pull request #7401 from wllenyj/sandbox_stop

Sandbox API: implement Controller.Wait and Controller.Stop
This commit is contained in:
Phil Estes 2022-09-22 14:33:52 -04:00 committed by GitHub
commit 8f95bac049
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 317 additions and 133 deletions

View File

@ -18,15 +18,21 @@ package podsandbox
import (
"context"
"fmt"
"time"
"github.com/containerd/containerd"
api "github.com/containerd/containerd/api/services/sandbox/v1"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/oci"
criconfig "github.com/containerd/containerd/pkg/cri/config"
imagestore "github.com/containerd/containerd/pkg/cri/store/image"
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
ctrdutil "github.com/containerd/containerd/pkg/cri/util"
osinterface "github.com/containerd/containerd/pkg/os"
"github.com/containerd/containerd/protobuf"
"github.com/containerd/containerd/sandbox"
"github.com/sirupsen/logrus"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)
@ -35,8 +41,8 @@ import (
type CRIService interface {
EnsureImageExists(ctx context.Context, ref string, config *runtime.PodSandboxConfig) (*imagestore.Image, error)
// TODO: we should implement Controller.Wait and use it instead of this to monitor sandbox exit.
StartSandboxExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{}
// TODO: we should implement Event backoff in Controller.
BackOffEvent(id string, event interface{})
}
type Controller struct {
@ -52,6 +58,8 @@ type Controller struct {
cri CRIService
// baseOCISpecs contains cached OCI specs loaded via `Runtime.BaseRuntimeSpec`
baseOCISpecs map[string]*oci.Spec
store *Store
}
func New(
@ -69,25 +77,100 @@ func New(
os: os,
cri: cri,
baseOCISpecs: baseOCISpecs,
store: NewStore(),
}
}
var _ sandbox.Controller = (*Controller)(nil)
func (c *Controller) Stop(ctx context.Context, sandboxID string) (*api.ControllerStopResponse, error) {
//TODO implement me
panic("implement me")
}
func (c *Controller) Delete(ctx context.Context, sandboxID string) (*api.ControllerDeleteResponse, error) {
//TODO implement me
panic("implement me")
}
func (c *Controller) Wait(ctx context.Context, sandboxID string) (*api.ControllerWaitResponse, error) {
func (c *Controller) Status(ctx context.Context, sandboxID string) (*api.ControllerStatusResponse, error) {
//TODO implement me
panic("implement me")
}
func (c *Controller) Status(ctx context.Context, sandboxID string) (*api.ControllerStatusResponse, error) {
panic("implement me")
func (c *Controller) Wait(ctx context.Context, sandboxID string) (*api.ControllerWaitResponse, error) {
status := c.store.Get(sandboxID)
if status == nil {
return nil, fmt.Errorf("failed to get exit channel. %q", sandboxID)
}
exitStatus, exitedAt, err := c.waitSandboxExit(ctx, sandboxID, status.Waiter)
return &api.ControllerWaitResponse{
ExitStatus: exitStatus,
ExitedAt: protobuf.ToTimestamp(exitedAt),
}, err
}
func (c *Controller) waitSandboxExit(ctx context.Context, id string, exitCh <-chan containerd.ExitStatus) (exitStatus uint32, exitedAt time.Time, err error) {
exitStatus = unknownExitCode
exitedAt = time.Now()
select {
case exitRes := <-exitCh:
logrus.Debugf("received sandbox exit %+v", exitRes)
exitStatus, exitedAt, err = exitRes.Result()
if err != nil {
logrus.WithError(err).Errorf("failed to get task exit status for %q", id)
exitStatus = unknownExitCode
exitedAt = time.Now()
}
err = func() error {
dctx := ctrdutil.NamespacedContext()
dctx, dcancel := context.WithTimeout(dctx, handleEventTimeout)
defer dcancel()
sb, err := c.sandboxStore.Get(id)
if err == nil {
if err := handleSandboxExit(dctx, sb); err != nil {
return err
}
return nil
} else if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to get sandbox %s: %w", id, err)
}
return nil
}()
if err != nil {
logrus.WithError(err).Errorf("failed to handle sandbox TaskExit %s", id)
// Don't backoff, the caller is responsible for.
return
}
case <-ctx.Done():
return exitStatus, exitedAt, ctx.Err()
}
return
}
// handleSandboxExit handles TaskExit event for sandbox.
func handleSandboxExit(ctx context.Context, sb sandboxstore.Sandbox) error {
// No stream attached to sandbox container.
task, err := sb.Container.Task(ctx, nil)
if err != nil {
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to load task for sandbox: %w", err)
}
} else {
// TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker
if _, err = task.Delete(ctx, WithNRISandboxDelete(sb.ID), containerd.WithProcessKill); err != nil {
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to stop sandbox: %w", err)
}
// Move on to make sure container status is updated.
}
}
sb.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
status.State = sandboxstore.StateNotReady
status.Pid = 0
return status, nil
})
// Using channel to propagate the information of sandbox stop
sb.Stop()
return nil
}

View File

@ -23,6 +23,7 @@ import (
"path/filepath"
"strconv"
"strings"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/containers"
@ -64,6 +65,15 @@ const (
runtimeRunhcsV1 = "io.containerd.runhcs.v1"
)
const (
// unknownExitCode is the exit code when exit reason is unknown.
unknownExitCode = 255
)
const (
handleEventTimeout = 10 * time.Second
)
// getSandboxRootDir returns the root directory for managing sandbox files,
// e.g. hosts files.
func (c *Controller) getSandboxRootDir(id string) string {

View File

@ -225,6 +225,7 @@ func (c *Controller) Start(ctx context.Context, id string) (_ *api.ControllerSta
if err != nil {
return nil, fmt.Errorf("failed to wait for sandbox container task: %w", err)
}
c.store.Save(id, exitCh)
nric, err := nri.New()
if err != nil {
@ -244,13 +245,6 @@ func (c *Controller) Start(ctx context.Context, id string) (_ *api.ControllerSta
return nil, fmt.Errorf("failed to start sandbox container task %q: %w", id, err)
}
// start the monitor after adding sandbox into the store, this ensures
// that sandbox is in the store, when event monitor receives the TaskExit event.
//
// TaskOOM from containerd may come before sandbox is added to store,
// but we don't care about sandbox TaskOOM right now, so it is fine.
c.cri.StartSandboxExitMonitor(context.Background(), id, task.Pid(), exitCh) // TODO: Move back to CRI service.
resp := &api.ControllerStartResponse{
SandboxID: id,
Pid: task.Pid(),

View File

@ -0,0 +1,138 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podsandbox
import (
"context"
"fmt"
"syscall"
eventtypes "github.com/containerd/containerd/api/events"
api "github.com/containerd/containerd/api/services/sandbox/v1"
"github.com/containerd/containerd/errdefs"
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
ctrdutil "github.com/containerd/containerd/pkg/cri/util"
"github.com/containerd/containerd/protobuf"
"github.com/sirupsen/logrus"
)
func (c *Controller) Stop(ctx context.Context, sandboxID string) (*api.ControllerStopResponse, error) {
sandbox, err := c.sandboxStore.Get(sandboxID)
if err != nil {
return nil, fmt.Errorf("an error occurred when try to find sandbox %q: %w",
sandboxID, err)
}
if err := c.cleanupSandboxFiles(sandboxID, sandbox.Config); err != nil {
return nil, fmt.Errorf("failed to cleanup sandbox files: %w", err)
}
// TODO: The Controller maintains its own Status instead of CRI's sandboxStore.
// Only stop sandbox container when it's running or unknown.
state := sandbox.Status.Get().State
if state == sandboxstore.StateReady || state == sandboxstore.StateUnknown {
if err := c.stopSandboxContainer(ctx, sandbox); err != nil {
return nil, fmt.Errorf("failed to stop sandbox container %q in %q state: %w", sandboxID, state, err)
}
}
return &api.ControllerStopResponse{}, nil
}
// stopSandboxContainer kills the sandbox container.
// `task.Delete` is not called here because it will be called when
// the event monitor handles the `TaskExit` event.
func (c *Controller) stopSandboxContainer(ctx context.Context, sandbox sandboxstore.Sandbox) error {
id := sandbox.ID
container := sandbox.Container
state := sandbox.Status.Get().State
task, err := container.Task(ctx, nil)
if err != nil {
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to get sandbox container: %w", err)
}
// Don't return for unknown state, some cleanup needs to be done.
if state == sandboxstore.StateUnknown {
return cleanupUnknownSandbox(ctx, id, sandbox)
}
return nil
}
// Handle unknown state.
// The cleanup logic is the same with container unknown state.
if state == sandboxstore.StateUnknown {
// Start an exit handler for containers in unknown state.
waitCtx, waitCancel := context.WithCancel(ctrdutil.NamespacedContext())
defer waitCancel()
exitCh, err := task.Wait(waitCtx)
if err != nil {
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to wait for task: %w", err)
}
return cleanupUnknownSandbox(ctx, id, sandbox)
}
exitCtx, exitCancel := context.WithCancel(context.Background())
stopCh := make(chan struct{})
go func() {
defer close(stopCh)
exitStatus, exitedAt, err := c.waitSandboxExit(exitCtx, id, exitCh)
if err != nil && err != context.Canceled && err != context.DeadlineExceeded {
e := &eventtypes.TaskExit{
ContainerID: id,
ID: id,
Pid: task.Pid(),
ExitStatus: exitStatus,
ExitedAt: protobuf.ToTimestamp(exitedAt),
}
logrus.WithError(err).Errorf("Failed to wait sandbox exit %+v", e)
// TODO: how to backoff
c.cri.BackOffEvent(id, e)
}
}()
defer func() {
exitCancel()
// This ensures that exit monitor is stopped before
// `Wait` is cancelled, so no exit event is generated
// because of the `Wait` cancellation.
<-stopCh
}()
}
// Kill the sandbox container.
if err = task.Kill(ctx, syscall.SIGKILL); err != nil && !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to kill sandbox container: %w", err)
}
return c.waitSandboxStop(ctx, sandbox)
}
// waitSandboxStop waits for sandbox to be stopped until context is cancelled or
// the context deadline is exceeded.
func (c *Controller) waitSandboxStop(ctx context.Context, sandbox sandboxstore.Sandbox) error {
select {
case <-ctx.Done():
return fmt.Errorf("wait sandbox container %q: %w", sandbox.ID, ctx.Err())
case <-sandbox.Stopped():
return nil
}
}
// cleanupUnknownSandbox cleanup stopped sandbox in unknown state.
func cleanupUnknownSandbox(ctx context.Context, id string, sandbox sandboxstore.Sandbox) error {
// Reuse handleSandboxExit to do the cleanup.
return handleSandboxExit(ctx, sandbox)
}

View File

@ -0,0 +1,49 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podsandbox
import (
"sync"
"github.com/containerd/containerd"
)
type Status struct {
Waiter <-chan containerd.ExitStatus
}
type Store struct {
sync.Map
}
func NewStore() *Store {
return &Store{}
}
func (s *Store) Save(id string, exitCh <-chan containerd.ExitStatus) {
s.Store(id, &Status{Waiter: exitCh})
}
func (s *Store) Get(id string) *Status {
i, ok := s.LoadAndDelete(id)
if !ok {
// not exist
return nil
}
// Only save *Status
return i.(*Status)
}

View File

@ -27,6 +27,7 @@ import (
"strings"
"time"
eventtypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/protobuf"
sb "github.com/containerd/containerd/sandbox"
"github.com/containerd/go-cni"
@ -203,6 +204,26 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
return nil, fmt.Errorf("failed to add sandbox %+v into store: %w", sandbox, err)
}
// start the monitor after adding sandbox into the store, this ensures
// that sandbox is in the store, when event monitor receives the TaskExit event.
//
// TaskOOM from containerd may come before sandbox is added to store,
// but we don't care about sandbox TaskOOM right now, so it is fine.
go func() {
resp, err := c.sandboxController.Wait(context.Background(), id)
if err != nil && err != context.Canceled && err != context.DeadlineExceeded {
e := &eventtypes.TaskExit{
ContainerID: id,
ID: id,
// Pid is not used
Pid: 0,
ExitStatus: resp.ExitStatus,
ExitedAt: resp.ExitedAt,
}
c.eventMonitor.backOff.enBackOff(id, e)
}
}()
sandboxRuntimeCreateTimer.WithValues(labels["oci_runtime_type"]).UpdateSince(runtimeStart)
return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil

View File

@ -17,31 +17,10 @@
package sbserver
import (
"fmt"
"os"
"github.com/containerd/containerd"
"github.com/containerd/containerd/plugin"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
osinterface "github.com/containerd/containerd/pkg/os"
)
// cleanupSandboxFiles unmount some sandbox files, we rely on the removal of sandbox root directory to
// remove these files. Unmount should *NOT* return error if the mount point is already unmounted.
func (c *criService) cleanupSandboxFiles(id string, config *runtime.PodSandboxConfig) error {
if config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetIpc() != runtime.NamespaceMode_NODE {
path, err := c.os.FollowSymlinkInScope(c.getSandboxDevShm(id), "/")
if err != nil {
return fmt.Errorf("failed to follow symlink: %w", err)
}
if err := c.os.(osinterface.UNIX).Unmount(path); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to unmount %q: %w", path, err)
}
}
return nil
}
// taskOpts generates task options for a (sandbox) container.
func (c *criService) taskOpts(runtimeType string) []containerd.NewTaskOpts {
// TODO(random-liu): Remove this after shim v1 is deprecated.

View File

@ -21,15 +21,8 @@ package sbserver
import (
"github.com/containerd/containerd"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)
// cleanupSandboxFiles unmount some sandbox files, we rely on the removal of sandbox root directory to
// remove these files. Unmount should *NOT* return error if the mount point is already unmounted.
func (c *criService) cleanupSandboxFiles(id string, config *runtime.PodSandboxConfig) error {
return nil
}
// taskOpts generates task options for a (sandbox) container.
func (c *criService) taskOpts(runtimeType string) []containerd.NewTaskOpts {
return []containerd.NewTaskOpts{}

View File

@ -18,14 +18,8 @@ package sbserver
import (
"github.com/containerd/containerd"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)
// No sandbox files needed for windows.
func (c *criService) cleanupSandboxFiles(id string, config *runtime.PodSandboxConfig) error {
return nil
}
// No task options needed for windows.
func (c *criService) taskOpts(runtimeType string) []containerd.NewTaskOpts {
return nil

View File

@ -20,17 +20,12 @@ import (
"context"
"errors"
"fmt"
"syscall"
"time"
eventtypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/protobuf"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
ctrdutil "github.com/containerd/containerd/pkg/cri/util"
)
// StopPodSandbox stops the sandbox. If there are any running containers in the
@ -69,17 +64,10 @@ func (c *criService) stopPodSandbox(ctx context.Context, sandbox sandboxstore.Sa
}
}
if err := c.cleanupSandboxFiles(id, sandbox.Config); err != nil {
return fmt.Errorf("failed to cleanup sandbox files: %w", err)
if _, err := c.sandboxController.Stop(ctx, id); err != nil {
return fmt.Errorf("failed to stop sandbox %q: %w", id, err)
}
// Only stop sandbox container when it's running or unknown.
state := sandbox.Status.Get().State
if state == sandboxstore.StateReady || state == sandboxstore.StateUnknown {
if err := c.stopSandboxContainer(ctx, sandbox); err != nil {
return fmt.Errorf("failed to stop sandbox container %q in %q state: %w", id, state, err)
}
}
sandboxRuntimeStopTimer.WithValues(sandbox.RuntimeHandler).UpdateSince(stop)
// Teardown network for sandbox.
@ -106,58 +94,6 @@ func (c *criService) stopPodSandbox(ctx context.Context, sandbox sandboxstore.Sa
return nil
}
// stopSandboxContainer kills the sandbox container.
// `task.Delete` is not called here because it will be called when
// the event monitor handles the `TaskExit` event.
func (c *criService) stopSandboxContainer(ctx context.Context, sandbox sandboxstore.Sandbox) error {
id := sandbox.ID
container := sandbox.Container
state := sandbox.Status.Get().State
task, err := container.Task(ctx, nil)
if err != nil {
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to get sandbox container: %w", err)
}
// Don't return for unknown state, some cleanup needs to be done.
if state == sandboxstore.StateUnknown {
return cleanupUnknownSandbox(ctx, id, sandbox)
}
return nil
}
// Handle unknown state.
// The cleanup logic is the same with container unknown state.
if state == sandboxstore.StateUnknown {
// Start an exit handler for containers in unknown state.
waitCtx, waitCancel := context.WithCancel(ctrdutil.NamespacedContext())
defer waitCancel()
exitCh, err := task.Wait(waitCtx)
if err != nil {
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to wait for task: %w", err)
}
return cleanupUnknownSandbox(ctx, id, sandbox)
}
exitCtx, exitCancel := context.WithCancel(context.Background())
stopCh := c.eventMonitor.startSandboxExitMonitor(exitCtx, id, task.Pid(), exitCh)
defer func() {
exitCancel()
// This ensures that exit monitor is stopped before
// `Wait` is cancelled, so no exit event is generated
// because of the `Wait` cancellation.
<-stopCh
}()
}
// Kill the sandbox container.
if err = task.Kill(ctx, syscall.SIGKILL); err != nil && !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to kill sandbox container: %w", err)
}
return c.waitSandboxStop(ctx, sandbox)
}
// waitSandboxStop waits for sandbox to be stopped until context is cancelled or
// the context deadline is exceeded.
func (c *criService) waitSandboxStop(ctx context.Context, sandbox sandboxstore.Sandbox) error {
@ -188,15 +124,3 @@ func (c *criService) teardownPodNetwork(ctx context.Context, sandbox sandboxstor
return netPlugin.Remove(ctx, id, path, opts...)
}
// cleanupUnknownSandbox cleanup stopped sandbox in unknown state.
func cleanupUnknownSandbox(ctx context.Context, id string, sandbox sandboxstore.Sandbox) error {
// Reuse handleSandboxExit to do the cleanup.
return handleSandboxExit(ctx, &eventtypes.TaskExit{
ContainerID: id,
ID: id,
Pid: 0,
ExitStatus: unknownExitCode,
ExitedAt: protobuf.ToTimestamp(time.Now()),
}, sandbox)
}

View File

@ -17,7 +17,6 @@
package sbserver
import (
"context"
"encoding/json"
"fmt"
"io"
@ -191,10 +190,10 @@ func NewCRIService(config criconfig.Config, client *containerd.Client) (CRIServi
return c, nil
}
// StartSandboxExitMonitor is a temporary workaround to call monitor from pause controller.
// BackOffEvent is a temporary workaround to call eventMonitor from controller.Stop.
// TODO: get rid of this.
func (c *criService) StartSandboxExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} {
return c.eventMonitor.startSandboxExitMonitor(ctx, id, pid, exitCh)
func (c *criService) BackOffEvent(id string, event interface{}) {
c.eventMonitor.backOff.enBackOff(id, event)
}
// Register registers all required services onto a specific grpc server.