Move pod sandbox recovery to podsandbox/ package

Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
This commit is contained in:
Maksym Pavlenko 2023-05-30 10:58:50 -07:00
parent 45dbb4e542
commit cf56054594
4 changed files with 192 additions and 124 deletions

View File

@ -418,28 +418,11 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta
// handleSandboxExit handles TaskExit event for sandbox. // handleSandboxExit handles TaskExit event for sandbox.
func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxstore.Sandbox, c *criService) error { func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxstore.Sandbox, c *criService) error {
// TODO: Move pause container cleanup to podsandbox/ package.
if sb.Container != nil {
// No stream attached to sandbox container.
task, err := sb.Container.Task(ctx, nil)
if err != nil {
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to load task for sandbox: %w", err)
}
} else {
// TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker
if _, err = task.Delete(ctx, containerd.WithProcessKill); err != nil {
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to stop sandbox: %w", err)
}
// Move on to make sure container status is updated.
}
}
}
if err := sb.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) { if err := sb.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
status.State = sandboxstore.StateNotReady status.State = sandboxstore.StateNotReady
status.Pid = 0 status.Pid = 0
status.ExitStatus = e.ExitStatus
status.ExitedAt = e.ExitedAt.AsTime()
return status, nil return status, nil
}); err != nil { }); err != nil {
return fmt.Errorf("failed to update sandbox state: %w", err) return fmt.Errorf("failed to update sandbox state: %w", err)

View File

@ -41,7 +41,6 @@ import (
criconfig "github.com/containerd/containerd/pkg/cri/config" criconfig "github.com/containerd/containerd/pkg/cri/config"
containerstore "github.com/containerd/containerd/pkg/cri/store/container" containerstore "github.com/containerd/containerd/pkg/cri/store/container"
imagestore "github.com/containerd/containerd/pkg/cri/store/image" imagestore "github.com/containerd/containerd/pkg/cri/store/image"
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
runtimeoptions "github.com/containerd/containerd/pkg/runtimeoptions/v1" runtimeoptions "github.com/containerd/containerd/pkg/runtimeoptions/v1"
"github.com/containerd/containerd/plugin" "github.com/containerd/containerd/plugin"
runcoptions "github.com/containerd/containerd/runtime/v2/runc/options" runcoptions "github.com/containerd/containerd/runtime/v2/runc/options"
@ -81,8 +80,6 @@ const (
// containerKindContainer is a label value indicating container is application container // containerKindContainer is a label value indicating container is application container
containerKindContainer = "container" containerKindContainer = "container"
// sandboxMetadataExtension is an extension name that identify metadata of sandbox in CreateContainerRequest
sandboxMetadataExtension = criContainerdPrefix + ".sandbox.metadata"
// containerMetadataExtension is an extension name that identify metadata of container in CreateContainerRequest // containerMetadataExtension is an extension name that identify metadata of container in CreateContainerRequest
containerMetadataExtension = criContainerdPrefix + ".container.metadata" containerMetadataExtension = criContainerdPrefix + ".container.metadata"
@ -334,13 +331,6 @@ func unknownContainerStatus() containerstore.Status {
} }
} }
// unknownSandboxStatus returns the default sandbox status when its status is unknown.
func unknownSandboxStatus() sandboxstore.Status {
return sandboxstore.Status{
State: sandboxstore.StateUnknown,
}
}
// getPassthroughAnnotations filters requested pod annotations by comparing // getPassthroughAnnotations filters requested pod annotations by comparing
// against permitted annotations for the given runtime. // against permitted annotations for the given runtime.
func getPassthroughAnnotations(podAnnotations map[string]string, func getPassthroughAnnotations(podAnnotations map[string]string,

View File

@ -0,0 +1,174 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podsandbox
import (
"context"
"fmt"
goruntime "runtime"
"time"
"github.com/containerd/containerd/pkg/netns"
"github.com/containerd/typeurl/v2"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
"github.com/containerd/containerd"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
ctrdutil "github.com/containerd/containerd/pkg/cri/util"
)
// loadContainerTimeout is the default timeout for loading a container/sandbox.
// One container/sandbox hangs (e.g. containerd#2438) should not affect other
// containers/sandboxes.
// Most CRI container/sandbox related operations are per container, the ones
// which handle multiple containers at a time are:
// * ListPodSandboxes: Don't talk with containerd services.
// * ListContainers: Don't talk with containerd services.
// * ListContainerStats: Not in critical code path, a default timeout will
// be applied at CRI level.
// * Recovery logic: We should set a time for each container/sandbox recovery.
// * Event monitor: We should set a timeout for each container/sandbox event handling.
const loadContainerTimeout = 10 * time.Second
func (c *Controller) RecoverContainer(ctx context.Context, cntr containerd.Container) (sandboxstore.Sandbox, error) {
ctx, cancel := context.WithTimeout(ctx, loadContainerTimeout)
defer cancel()
var sandbox sandboxstore.Sandbox
// Load sandbox metadata.
exts, err := cntr.Extensions(ctx)
if err != nil {
return sandbox, fmt.Errorf("failed to get sandbox container extensions: %w", err)
}
ext, ok := exts[sandboxMetadataExtension]
if !ok {
return sandbox, fmt.Errorf("metadata extension %q not found", sandboxMetadataExtension)
}
data, err := typeurl.UnmarshalAny(ext)
if err != nil {
return sandbox, fmt.Errorf("failed to unmarshal metadata extension %q: %w", ext, err)
}
meta := data.(*sandboxstore.Metadata)
s, err := func() (sandboxstore.Status, error) {
status := sandboxstore.Status{
State: sandboxstore.StateUnknown,
}
// Load sandbox created timestamp.
info, err := cntr.Info(ctx)
if err != nil {
return status, fmt.Errorf("failed to get sandbox container info: %w", err)
}
status.CreatedAt = info.CreatedAt
// Load sandbox state.
t, err := cntr.Task(ctx, nil)
if err != nil && !errdefs.IsNotFound(err) {
return status, fmt.Errorf("failed to load task: %w", err)
}
var taskStatus containerd.Status
var notFound bool
if errdefs.IsNotFound(err) {
// Task is not found.
notFound = true
} else {
// Task is found. Get task status.
taskStatus, err = t.Status(ctx)
if err != nil {
// It's still possible that task is deleted during this window.
if !errdefs.IsNotFound(err) {
return status, fmt.Errorf("failed to get task status: %w", err)
}
notFound = true
}
}
if notFound {
// Task does not exist, set sandbox state as NOTREADY.
status.State = sandboxstore.StateNotReady
} else {
if taskStatus.Status == containerd.Running {
// Wait for the task for sandbox monitor.
// wait is a long running background request, no timeout needed.
exitCh, err := t.Wait(ctrdutil.NamespacedContext())
if err != nil {
if !errdefs.IsNotFound(err) {
return status, fmt.Errorf("failed to wait for task: %w", err)
}
status.State = sandboxstore.StateNotReady
} else {
// Task is running, set sandbox state as READY.
status.State = sandboxstore.StateReady
status.Pid = t.Pid()
go func() {
c.waitSandboxExit(context.Background(), meta.ID, exitCh)
}()
}
} else {
// Task is not running. Delete the task and set sandbox state as NOTREADY.
if _, err := t.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
return status, fmt.Errorf("failed to delete task: %w", err)
}
status.State = sandboxstore.StateNotReady
}
}
return status, nil
}()
if err != nil {
log.G(ctx).WithError(err).Errorf("Failed to load sandbox status for %q", cntr.ID())
}
sandbox = sandboxstore.NewSandbox(*meta, s)
sandbox.Container = cntr
// Load network namespace.
sandbox.NetNS = getNetNS(meta)
// It doesn't matter whether task is running or not. If it is running, sandbox
// status will be `READY`; if it is not running, sandbox status will be `NOT_READY`,
// kubelet will stop the sandbox which will properly cleanup everything.
return sandbox, nil
}
func getNetNS(meta *sandboxstore.Metadata) *netns.NetNS {
// Don't need to load netns for host network sandbox.
if hostNetwork(meta.Config) {
return nil
}
return netns.LoadNetNS(meta.NetNSPath)
}
// hostNetwork handles checking if host networking was requested.
// TODO: Copy pasted from sbserver to handle container sandbox events in podsandbox/ package, needs refactoring.
func hostNetwork(config *runtime.PodSandboxConfig) bool {
var hostNet bool
switch goruntime.GOOS {
case "windows":
// Windows HostProcess pods can only run on the host network
hostNet = config.GetWindows().GetSecurityContext().GetHostProcess()
case "darwin":
// No CNI on Darwin yet.
hostNet = true
default:
// Even on other platforms, the logic containerd uses is to check if NamespaceMode == NODE.
// So this handles Linux, as well as any other platforms not governed by the cases above
// that have special quirks.
hostNet = config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtime.NamespaceMode_NODE
}
return hostNet
}

View File

@ -31,6 +31,7 @@ import (
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
criconfig "github.com/containerd/containerd/pkg/cri/config" criconfig "github.com/containerd/containerd/pkg/cri/config"
"github.com/containerd/containerd/pkg/cri/sbserver/podsandbox" "github.com/containerd/containerd/pkg/cri/sbserver/podsandbox"
"github.com/containerd/containerd/pkg/netns"
"github.com/containerd/containerd/platforms" "github.com/containerd/containerd/platforms"
"github.com/containerd/typeurl/v2" "github.com/containerd/typeurl/v2"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
@ -40,7 +41,6 @@ import (
containerstore "github.com/containerd/containerd/pkg/cri/store/container" containerstore "github.com/containerd/containerd/pkg/cri/store/container"
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox" sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
ctrdutil "github.com/containerd/containerd/pkg/cri/util" ctrdutil "github.com/containerd/containerd/pkg/cri/util"
"github.com/containerd/containerd/pkg/netns"
) )
// NOTE: The recovery logic has following assumption: when the cri plugin is down: // NOTE: The recovery logic has following assumption: when the cri plugin is down:
@ -60,11 +60,21 @@ func (c *criService) recover(ctx context.Context) error {
return fmt.Errorf("failed to list sandbox containers: %w", err) return fmt.Errorf("failed to list sandbox containers: %w", err)
} }
podSandboxController, ok := c.sandboxControllers[criconfig.ModePodSandbox]
if !ok {
log.G(ctx).Fatal("unable to restore pod sandboxes, no controller found")
}
podSandboxLoader, ok := podSandboxController.(podSandboxRecover)
if !ok {
log.G(ctx).Fatal("pod sandbox controller doesn't support recovery")
}
eg, ctx2 := errgroup.WithContext(ctx) eg, ctx2 := errgroup.WithContext(ctx)
for _, sandbox := range sandboxes { for _, sandbox := range sandboxes {
sandbox := sandbox sandbox := sandbox
eg.Go(func() error { eg.Go(func() error {
sb, err := c.loadSandbox(ctx2, sandbox) sb, err := podSandboxLoader.RecoverContainer(ctx2, sandbox)
if err != nil { if err != nil {
log.G(ctx2).WithError(err).Errorf("Failed to load sandbox %q", sandbox.ID()) log.G(ctx2).WithError(err).Errorf("Failed to load sandbox %q", sandbox.ID())
return nil return nil
@ -388,99 +398,10 @@ func (c *criService) loadContainer(ctx context.Context, cntr containerd.Containe
return containerstore.NewContainer(*meta, opts...) return containerstore.NewContainer(*meta, opts...)
} }
// loadSandbox loads sandbox from containerd. // podSandboxRecover is an additional interface implemented by podsandbox/ controller to handle
func (c *criService) loadSandbox(ctx context.Context, cntr containerd.Container) (sandboxstore.Sandbox, error) { // Pod sandbox containers recovery.
ctx, cancel := context.WithTimeout(ctx, loadContainerTimeout) type podSandboxRecover interface {
defer cancel() RecoverContainer(ctx context.Context, cntr containerd.Container) (sandboxstore.Sandbox, error)
var sandbox sandboxstore.Sandbox
// Load sandbox metadata.
exts, err := cntr.Extensions(ctx)
if err != nil {
return sandbox, fmt.Errorf("failed to get sandbox container extensions: %w", err)
}
ext, ok := exts[sandboxMetadataExtension]
if !ok {
return sandbox, fmt.Errorf("metadata extension %q not found", sandboxMetadataExtension)
}
data, err := typeurl.UnmarshalAny(ext)
if err != nil {
return sandbox, fmt.Errorf("failed to unmarshal metadata extension %q: %w", ext, err)
}
meta := data.(*sandboxstore.Metadata)
s, err := func() (sandboxstore.Status, error) {
status := unknownSandboxStatus()
// Load sandbox created timestamp.
info, err := cntr.Info(ctx)
if err != nil {
return status, fmt.Errorf("failed to get sandbox container info: %w", err)
}
status.CreatedAt = info.CreatedAt
// Load sandbox state.
t, err := cntr.Task(ctx, nil)
if err != nil && !errdefs.IsNotFound(err) {
return status, fmt.Errorf("failed to load task: %w", err)
}
var taskStatus containerd.Status
var notFound bool
if errdefs.IsNotFound(err) {
// Task is not found.
notFound = true
} else {
// Task is found. Get task status.
taskStatus, err = t.Status(ctx)
if err != nil {
// It's still possible that task is deleted during this window.
if !errdefs.IsNotFound(err) {
return status, fmt.Errorf("failed to get task status: %w", err)
}
notFound = true
}
}
if notFound {
// Task does not exist, set sandbox state as NOTREADY.
status.State = sandboxstore.StateNotReady
} else {
if taskStatus.Status == containerd.Running {
// Wait for the task for sandbox monitor.
// wait is a long running background request, no timeout needed.
exitCh, err := t.Wait(ctrdutil.NamespacedContext())
if err != nil {
if !errdefs.IsNotFound(err) {
return status, fmt.Errorf("failed to wait for task: %w", err)
}
status.State = sandboxstore.StateNotReady
} else {
// Task is running, set sandbox state as READY.
status.State = sandboxstore.StateReady
status.Pid = t.Pid()
c.eventMonitor.startSandboxExitMonitor(context.Background(), meta.ID, status.Pid, exitCh)
}
} else {
// Task is not running. Delete the task and set sandbox state as NOTREADY.
if _, err := t.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
return status, fmt.Errorf("failed to delete task: %w", err)
}
status.State = sandboxstore.StateNotReady
}
}
return status, nil
}()
if err != nil {
log.G(ctx).WithError(err).Errorf("Failed to load sandbox status for %q", cntr.ID())
}
sandbox = sandboxstore.NewSandbox(*meta, s)
sandbox.Container = cntr
// Load network namespace.
sandbox.NetNS = getNetNS(meta)
// It doesn't matter whether task is running or not. If it is running, sandbox
// status will be `READY`; if it is not running, sandbox status will be `NOT_READY`,
// kubelet will stop the sandbox which will properly cleanup everything.
return sandbox, nil
} }
func getNetNS(meta *sandboxstore.Metadata) *netns.NetNS { func getNetNS(meta *sandboxstore.Metadata) *netns.NetNS {