diff --git a/pkg/cri/sbserver/events.go b/pkg/cri/sbserver/events.go index d5abe820b..2a3f319d9 100644 --- a/pkg/cri/sbserver/events.go +++ b/pkg/cri/sbserver/events.go @@ -418,28 +418,11 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta // handleSandboxExit handles TaskExit event for sandbox. func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxstore.Sandbox, c *criService) error { - // TODO: Move pause container cleanup to podsandbox/ package. - if sb.Container != nil { - // No stream attached to sandbox container. - task, err := sb.Container.Task(ctx, nil) - if err != nil { - if !errdefs.IsNotFound(err) { - return fmt.Errorf("failed to load task for sandbox: %w", err) - } - } else { - // TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker - if _, err = task.Delete(ctx, containerd.WithProcessKill); err != nil { - if !errdefs.IsNotFound(err) { - return fmt.Errorf("failed to stop sandbox: %w", err) - } - // Move on to make sure container status is updated. - } - } - } - if err := sb.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) { status.State = sandboxstore.StateNotReady status.Pid = 0 + status.ExitStatus = e.ExitStatus + status.ExitedAt = e.ExitedAt.AsTime() return status, nil }); err != nil { return fmt.Errorf("failed to update sandbox state: %w", err) diff --git a/pkg/cri/sbserver/helpers.go b/pkg/cri/sbserver/helpers.go index fefb1646c..37cdb019f 100644 --- a/pkg/cri/sbserver/helpers.go +++ b/pkg/cri/sbserver/helpers.go @@ -41,7 +41,6 @@ import ( criconfig "github.com/containerd/containerd/pkg/cri/config" containerstore "github.com/containerd/containerd/pkg/cri/store/container" imagestore "github.com/containerd/containerd/pkg/cri/store/image" - sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox" runtimeoptions "github.com/containerd/containerd/pkg/runtimeoptions/v1" "github.com/containerd/containerd/plugin" runcoptions "github.com/containerd/containerd/runtime/v2/runc/options" @@ -81,8 +80,6 @@ const ( // containerKindContainer is a label value indicating container is application container containerKindContainer = "container" - // sandboxMetadataExtension is an extension name that identify metadata of sandbox in CreateContainerRequest - sandboxMetadataExtension = criContainerdPrefix + ".sandbox.metadata" // containerMetadataExtension is an extension name that identify metadata of container in CreateContainerRequest containerMetadataExtension = criContainerdPrefix + ".container.metadata" @@ -334,13 +331,6 @@ func unknownContainerStatus() containerstore.Status { } } -// unknownSandboxStatus returns the default sandbox status when its status is unknown. -func unknownSandboxStatus() sandboxstore.Status { - return sandboxstore.Status{ - State: sandboxstore.StateUnknown, - } -} - // getPassthroughAnnotations filters requested pod annotations by comparing // against permitted annotations for the given runtime. func getPassthroughAnnotations(podAnnotations map[string]string, diff --git a/pkg/cri/sbserver/podsandbox/recover.go b/pkg/cri/sbserver/podsandbox/recover.go new file mode 100644 index 000000000..eb12cf665 --- /dev/null +++ b/pkg/cri/sbserver/podsandbox/recover.go @@ -0,0 +1,174 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package podsandbox + +import ( + "context" + "fmt" + goruntime "runtime" + "time" + + "github.com/containerd/containerd/pkg/netns" + "github.com/containerd/typeurl/v2" + runtime "k8s.io/cri-api/pkg/apis/runtime/v1" + + "github.com/containerd/containerd" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/log" + sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox" + ctrdutil "github.com/containerd/containerd/pkg/cri/util" +) + +// loadContainerTimeout is the default timeout for loading a container/sandbox. +// One container/sandbox hangs (e.g. containerd#2438) should not affect other +// containers/sandboxes. +// Most CRI container/sandbox related operations are per container, the ones +// which handle multiple containers at a time are: +// * ListPodSandboxes: Don't talk with containerd services. +// * ListContainers: Don't talk with containerd services. +// * ListContainerStats: Not in critical code path, a default timeout will +// be applied at CRI level. +// * Recovery logic: We should set a time for each container/sandbox recovery. +// * Event monitor: We should set a timeout for each container/sandbox event handling. +const loadContainerTimeout = 10 * time.Second + +func (c *Controller) RecoverContainer(ctx context.Context, cntr containerd.Container) (sandboxstore.Sandbox, error) { + ctx, cancel := context.WithTimeout(ctx, loadContainerTimeout) + defer cancel() + var sandbox sandboxstore.Sandbox + // Load sandbox metadata. + exts, err := cntr.Extensions(ctx) + if err != nil { + return sandbox, fmt.Errorf("failed to get sandbox container extensions: %w", err) + } + ext, ok := exts[sandboxMetadataExtension] + if !ok { + return sandbox, fmt.Errorf("metadata extension %q not found", sandboxMetadataExtension) + } + data, err := typeurl.UnmarshalAny(ext) + if err != nil { + return sandbox, fmt.Errorf("failed to unmarshal metadata extension %q: %w", ext, err) + } + meta := data.(*sandboxstore.Metadata) + + s, err := func() (sandboxstore.Status, error) { + status := sandboxstore.Status{ + State: sandboxstore.StateUnknown, + } + // Load sandbox created timestamp. + info, err := cntr.Info(ctx) + if err != nil { + return status, fmt.Errorf("failed to get sandbox container info: %w", err) + } + status.CreatedAt = info.CreatedAt + + // Load sandbox state. + t, err := cntr.Task(ctx, nil) + if err != nil && !errdefs.IsNotFound(err) { + return status, fmt.Errorf("failed to load task: %w", err) + } + var taskStatus containerd.Status + var notFound bool + if errdefs.IsNotFound(err) { + // Task is not found. + notFound = true + } else { + // Task is found. Get task status. + taskStatus, err = t.Status(ctx) + if err != nil { + // It's still possible that task is deleted during this window. + if !errdefs.IsNotFound(err) { + return status, fmt.Errorf("failed to get task status: %w", err) + } + notFound = true + } + } + if notFound { + // Task does not exist, set sandbox state as NOTREADY. + status.State = sandboxstore.StateNotReady + } else { + if taskStatus.Status == containerd.Running { + // Wait for the task for sandbox monitor. + // wait is a long running background request, no timeout needed. + exitCh, err := t.Wait(ctrdutil.NamespacedContext()) + if err != nil { + if !errdefs.IsNotFound(err) { + return status, fmt.Errorf("failed to wait for task: %w", err) + } + status.State = sandboxstore.StateNotReady + } else { + // Task is running, set sandbox state as READY. + status.State = sandboxstore.StateReady + status.Pid = t.Pid() + + go func() { + c.waitSandboxExit(context.Background(), meta.ID, exitCh) + }() + } + } else { + // Task is not running. Delete the task and set sandbox state as NOTREADY. + if _, err := t.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) { + return status, fmt.Errorf("failed to delete task: %w", err) + } + status.State = sandboxstore.StateNotReady + } + } + return status, nil + }() + if err != nil { + log.G(ctx).WithError(err).Errorf("Failed to load sandbox status for %q", cntr.ID()) + } + + sandbox = sandboxstore.NewSandbox(*meta, s) + sandbox.Container = cntr + + // Load network namespace. + sandbox.NetNS = getNetNS(meta) + + // It doesn't matter whether task is running or not. If it is running, sandbox + // status will be `READY`; if it is not running, sandbox status will be `NOT_READY`, + // kubelet will stop the sandbox which will properly cleanup everything. + return sandbox, nil +} + +func getNetNS(meta *sandboxstore.Metadata) *netns.NetNS { + // Don't need to load netns for host network sandbox. + if hostNetwork(meta.Config) { + return nil + } + return netns.LoadNetNS(meta.NetNSPath) +} + +// hostNetwork handles checking if host networking was requested. +// TODO: Copy pasted from sbserver to handle container sandbox events in podsandbox/ package, needs refactoring. +func hostNetwork(config *runtime.PodSandboxConfig) bool { + var hostNet bool + switch goruntime.GOOS { + case "windows": + // Windows HostProcess pods can only run on the host network + hostNet = config.GetWindows().GetSecurityContext().GetHostProcess() + case "darwin": + // No CNI on Darwin yet. + hostNet = true + default: + // Even on other platforms, the logic containerd uses is to check if NamespaceMode == NODE. + // So this handles Linux, as well as any other platforms not governed by the cases above + // that have special quirks. + hostNet = config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtime.NamespaceMode_NODE + } + return hostNet +} diff --git a/pkg/cri/sbserver/restart.go b/pkg/cri/sbserver/restart.go index 7c88818ba..070e405be 100644 --- a/pkg/cri/sbserver/restart.go +++ b/pkg/cri/sbserver/restart.go @@ -31,6 +31,7 @@ import ( "github.com/containerd/containerd/log" criconfig "github.com/containerd/containerd/pkg/cri/config" "github.com/containerd/containerd/pkg/cri/sbserver/podsandbox" + "github.com/containerd/containerd/pkg/netns" "github.com/containerd/containerd/platforms" "github.com/containerd/typeurl/v2" "golang.org/x/sync/errgroup" @@ -40,7 +41,6 @@ import ( containerstore "github.com/containerd/containerd/pkg/cri/store/container" sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox" ctrdutil "github.com/containerd/containerd/pkg/cri/util" - "github.com/containerd/containerd/pkg/netns" ) // NOTE: The recovery logic has following assumption: when the cri plugin is down: @@ -60,11 +60,21 @@ func (c *criService) recover(ctx context.Context) error { return fmt.Errorf("failed to list sandbox containers: %w", err) } + podSandboxController, ok := c.sandboxControllers[criconfig.ModePodSandbox] + if !ok { + log.G(ctx).Fatal("unable to restore pod sandboxes, no controller found") + } + + podSandboxLoader, ok := podSandboxController.(podSandboxRecover) + if !ok { + log.G(ctx).Fatal("pod sandbox controller doesn't support recovery") + } + eg, ctx2 := errgroup.WithContext(ctx) for _, sandbox := range sandboxes { sandbox := sandbox eg.Go(func() error { - sb, err := c.loadSandbox(ctx2, sandbox) + sb, err := podSandboxLoader.RecoverContainer(ctx2, sandbox) if err != nil { log.G(ctx2).WithError(err).Errorf("Failed to load sandbox %q", sandbox.ID()) return nil @@ -388,99 +398,10 @@ func (c *criService) loadContainer(ctx context.Context, cntr containerd.Containe return containerstore.NewContainer(*meta, opts...) } -// loadSandbox loads sandbox from containerd. -func (c *criService) loadSandbox(ctx context.Context, cntr containerd.Container) (sandboxstore.Sandbox, error) { - ctx, cancel := context.WithTimeout(ctx, loadContainerTimeout) - defer cancel() - var sandbox sandboxstore.Sandbox - // Load sandbox metadata. - exts, err := cntr.Extensions(ctx) - if err != nil { - return sandbox, fmt.Errorf("failed to get sandbox container extensions: %w", err) - } - ext, ok := exts[sandboxMetadataExtension] - if !ok { - return sandbox, fmt.Errorf("metadata extension %q not found", sandboxMetadataExtension) - } - data, err := typeurl.UnmarshalAny(ext) - if err != nil { - return sandbox, fmt.Errorf("failed to unmarshal metadata extension %q: %w", ext, err) - } - meta := data.(*sandboxstore.Metadata) - - s, err := func() (sandboxstore.Status, error) { - status := unknownSandboxStatus() - // Load sandbox created timestamp. - info, err := cntr.Info(ctx) - if err != nil { - return status, fmt.Errorf("failed to get sandbox container info: %w", err) - } - status.CreatedAt = info.CreatedAt - - // Load sandbox state. - t, err := cntr.Task(ctx, nil) - if err != nil && !errdefs.IsNotFound(err) { - return status, fmt.Errorf("failed to load task: %w", err) - } - var taskStatus containerd.Status - var notFound bool - if errdefs.IsNotFound(err) { - // Task is not found. - notFound = true - } else { - // Task is found. Get task status. - taskStatus, err = t.Status(ctx) - if err != nil { - // It's still possible that task is deleted during this window. - if !errdefs.IsNotFound(err) { - return status, fmt.Errorf("failed to get task status: %w", err) - } - notFound = true - } - } - if notFound { - // Task does not exist, set sandbox state as NOTREADY. - status.State = sandboxstore.StateNotReady - } else { - if taskStatus.Status == containerd.Running { - // Wait for the task for sandbox monitor. - // wait is a long running background request, no timeout needed. - exitCh, err := t.Wait(ctrdutil.NamespacedContext()) - if err != nil { - if !errdefs.IsNotFound(err) { - return status, fmt.Errorf("failed to wait for task: %w", err) - } - status.State = sandboxstore.StateNotReady - } else { - // Task is running, set sandbox state as READY. - status.State = sandboxstore.StateReady - status.Pid = t.Pid() - c.eventMonitor.startSandboxExitMonitor(context.Background(), meta.ID, status.Pid, exitCh) - } - } else { - // Task is not running. Delete the task and set sandbox state as NOTREADY. - if _, err := t.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) { - return status, fmt.Errorf("failed to delete task: %w", err) - } - status.State = sandboxstore.StateNotReady - } - } - return status, nil - }() - if err != nil { - log.G(ctx).WithError(err).Errorf("Failed to load sandbox status for %q", cntr.ID()) - } - - sandbox = sandboxstore.NewSandbox(*meta, s) - sandbox.Container = cntr - - // Load network namespace. - sandbox.NetNS = getNetNS(meta) - - // It doesn't matter whether task is running or not. If it is running, sandbox - // status will be `READY`; if it is not running, sandbox status will be `NOT_READY`, - // kubelet will stop the sandbox which will properly cleanup everything. - return sandbox, nil +// podSandboxRecover is an additional interface implemented by podsandbox/ controller to handle +// Pod sandbox containers recovery. +type podSandboxRecover interface { + RecoverContainer(ctx context.Context, cntr containerd.Container) (sandboxstore.Sandbox, error) } func getNetNS(meta *sandboxstore.Metadata) *netns.NetNS {