From 0f1d27412f2321f3eea4a0b03ea4c2414a945c1a Mon Sep 17 00:00:00 2001 From: Abel Feng Date: Tue, 30 Jan 2024 20:20:55 +0800 Subject: [PATCH 1/3] sandbox: add methods to sandboxService so that we cri service don't have to get sandbox controller everytime it needs to call sandbox controller api. Signed-off-by: Abel Feng --- .../build_local_containerd_helper_test.go | 1 - internal/cri/server/container_create.go | 9 +- internal/cri/server/container_stats_list.go | 7 +- internal/cri/server/podsandbox/recover.go | 4 +- internal/cri/server/restart.go | 20 ++-- internal/cri/server/sandbox_remove.go | 8 +- internal/cri/server/sandbox_run.go | 30 ++---- internal/cri/server/sandbox_service.go | 94 ++++++++++++++++--- internal/cri/server/sandbox_status.go | 7 +- internal/cri/server/sandbox_stop.go | 8 +- internal/cri/server/service.go | 16 +++- internal/cri/server/service_test.go | 36 ++++++- internal/cri/store/sandbox/sandbox.go | 2 + plugins/cri/cri.go | 25 +++-- 14 files changed, 169 insertions(+), 98 deletions(-) diff --git a/integration/build_local_containerd_helper_test.go b/integration/build_local_containerd_helper_test.go index 1a3a44193..20488feff 100644 --- a/integration/build_local_containerd_helper_test.go +++ b/integration/build_local_containerd_helper_test.go @@ -129,7 +129,6 @@ func buildLocalContainerdClient(t *testing.T, tmpDir string, tweakInitFn tweakPl containerd.WithDefaultNamespace(constants.K8sContainerdNamespace), containerd.WithDefaultPlatform(platforms.Default()), containerd.WithInMemoryServices(lastInitContext), - containerd.WithInMemorySandboxControllers(lastInitContext), ) require.NoError(t, err) diff --git a/internal/cri/server/container_create.go b/internal/cri/server/container_create.go index abc8f618e..f6ec18e91 100644 --- a/internal/cri/server/container_create.go +++ b/internal/cri/server/container_create.go @@ -63,12 +63,7 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta return nil, fmt.Errorf("failed to find sandbox id %q: %w", r.GetPodSandboxId(), err) } - controller, err := c.sandboxService.SandboxController(sandbox.Config, sandbox.RuntimeHandler) - if err != nil { - return nil, fmt.Errorf("failed to get sandbox controller: %w", err) - } - - cstatus, err := controller.Status(ctx, sandbox.ID, false) + cstatus, err := c.sandboxService.SandboxStatus(ctx, sandbox.Sandboxer, sandbox.ID, false) if err != nil { return nil, fmt.Errorf("failed to get controller status: %w", err) } @@ -150,7 +145,7 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta } }() - platform, err := controller.Platform(ctx, sandboxID) + platform, err := c.sandboxService.SandboxPlatform(ctx, sandbox.Sandboxer, sandboxID) if err != nil { return nil, fmt.Errorf("failed to query sandbox platform: %w", err) } diff --git a/internal/cri/server/container_stats_list.go b/internal/cri/server/container_stats_list.go index 4279c544a..950886e48 100644 --- a/internal/cri/server/container_stats_list.go +++ b/internal/cri/server/container_stats_list.go @@ -68,15 +68,12 @@ func (c *criService) getMetricsHandler(ctx context.Context, sandboxID string) (m if err != nil { return nil, fmt.Errorf("failed to find sandbox id %q: %w", sandboxID, err) } - controller, err := c.sandboxService.SandboxController(sandbox.Config, sandbox.RuntimeHandler) - if err != nil { - return nil, fmt.Errorf("failed to get sandbox controller: %w", err) - } + // Grab the platform that this containers sandbox advertises. Reason being, even if // the host may be {insert platform}, if it virtualizes or emulates a different platform // it will return stats in that format, and we need to handle the conversion logic based // off of this info. - p, err := controller.Platform(ctx, sandboxID) + p, err := c.sandboxService.SandboxPlatform(ctx, sandbox.Sandboxer, sandboxID) if err != nil { return nil, err } diff --git a/internal/cri/server/podsandbox/recover.go b/internal/cri/server/podsandbox/recover.go index 1693c43c1..8b8b250a2 100644 --- a/internal/cri/server/podsandbox/recover.go +++ b/internal/cri/server/podsandbox/recover.go @@ -22,16 +22,17 @@ import ( goruntime "runtime" "time" + "github.com/containerd/errdefs" "github.com/containerd/log" runtime "k8s.io/cri-api/pkg/apis/runtime/v1" containerd "github.com/containerd/containerd/v2/client" sandbox2 "github.com/containerd/containerd/v2/core/sandbox" + "github.com/containerd/containerd/v2/internal/cri/config" "github.com/containerd/containerd/v2/internal/cri/server/podsandbox/types" sandboxstore "github.com/containerd/containerd/v2/internal/cri/store/sandbox" ctrdutil "github.com/containerd/containerd/v2/internal/cri/util" "github.com/containerd/containerd/v2/pkg/netns" - "github.com/containerd/errdefs" ) // loadContainerTimeout is the default timeout for loading a container/sandbox. @@ -144,6 +145,7 @@ func (c *Controller) RecoverContainer(ctx context.Context, cntr containerd.Conta sandbox = sandboxstore.NewSandbox(*meta, s) sandbox.Container = cntr + sandbox.Sandboxer = string(config.ModePodSandbox) // Load network namespace. sandbox.NetNS = getNetNS(meta) diff --git a/internal/cri/server/restart.go b/internal/cri/server/restart.go index 63b87b37c..49b0c5832 100644 --- a/internal/cri/server/restart.go +++ b/internal/cri/server/restart.go @@ -58,8 +58,10 @@ func (c *criService) recover(ctx context.Context) error { return fmt.Errorf("failed to list sandbox containers: %w", err) } - podSandboxController := c.client.SandboxController(string(criconfig.ModePodSandbox)) - + podSandboxController, err := c.sandboxService.SandboxController(string(criconfig.ModePodSandbox)) + if err != nil { + return fmt.Errorf("failed to get podsanbox controller %v", err) + } podSandboxLoader, ok := podSandboxController.(podSandboxRecover) if !ok { log.G(ctx).Fatal("pod sandbox controller doesn't support recovery") @@ -134,6 +136,7 @@ func (c *criService) recover(ctx context.Context) error { } sb := sandboxstore.NewSandbox(metadata, sandboxstore.Status{State: state}) + sb.Sandboxer = sbx.Sandboxer // Load network namespace. sb.NetNS = getNetNS(&metadata) @@ -149,20 +152,11 @@ func (c *criService) recover(ctx context.Context) error { if status.State == sandboxstore.StateNotReady { continue } - controller, err := c.sandboxService.SandboxController(sb.Config, sb.RuntimeHandler) + exitCh, err := c.sandboxService.WaitSandbox(ctrdutil.NamespacedContext(), sb.Sandboxer, sb.ID) if err != nil { - log.G(ctx).WithError(err).Error("failed to get sandbox controller while waiting sandbox") + log.G(ctx).WithError(err).Error("failed to wait sandbox") continue } - exitCh := make(chan containerd.ExitStatus, 1) - go func() { - exit, err := controller.Wait(ctrdutil.NamespacedContext(), sb.ID) - if err != nil { - log.G(ctx).WithError(err).Error("failed to wait for sandbox exit") - exitCh <- *containerd.NewExitStatus(containerd.UnknownExitStatus, time.Time{}, err) - } - exitCh <- *containerd.NewExitStatus(exit.ExitStatus, exit.ExitedAt, nil) - }() c.eventMonitor.startSandboxExitMonitor(context.Background(), sb.ID, exitCh) } // Recover all containers. diff --git a/internal/cri/server/sandbox_remove.go b/internal/cri/server/sandbox_remove.go index ed3b1ae97..1829accd8 100644 --- a/internal/cri/server/sandbox_remove.go +++ b/internal/cri/server/sandbox_remove.go @@ -79,13 +79,7 @@ func (c *criService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodS } } - // Use sandbox controller to delete sandbox - controller, err := c.sandboxService.SandboxController(sandbox.Config, sandbox.RuntimeHandler) - if err != nil { - return nil, fmt.Errorf("failed to get sandbox controller: %w", err) - } - - if err := controller.Shutdown(ctx, id); err != nil && !errdefs.IsNotFound(err) { + if err := c.sandboxService.ShutdownSandbox(ctx, sandbox.Sandboxer, id); err != nil && !errdefs.IsNotFound(err) { return nil, fmt.Errorf("failed to delete sandbox %q: %w", id, err) } diff --git a/internal/cri/server/sandbox_run.go b/internal/cri/server/sandbox_run.go index d2d6822bd..f01cc4ef9 100644 --- a/internal/cri/server/sandbox_run.go +++ b/internal/cri/server/sandbox_run.go @@ -31,7 +31,6 @@ import ( "github.com/containerd/typeurl/v2" runtime "k8s.io/cri-api/pkg/apis/runtime/v1" - containerd "github.com/containerd/containerd/v2/client" sb "github.com/containerd/containerd/v2/core/sandbox" "github.com/containerd/containerd/v2/internal/cri/annotations" "github.com/containerd/containerd/v2/internal/cri/bandwidth" @@ -124,6 +123,7 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox CreatedAt: time.Now().UTC(), }, ) + sandbox.Sandboxer = ociRuntime.Sandboxer if _, err := c.client.SandboxStore().Create(ctx, sandboxInfo); err != nil { return nil, fmt.Errorf("failed to save sandbox metadata: %w", err) @@ -240,21 +240,16 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox return nil, fmt.Errorf("unable to save sandbox %q to store: %w", id, err) } - controller, err := c.sandboxService.SandboxController(config, r.GetRuntimeHandler()) - if err != nil { - return nil, fmt.Errorf("failed to get sandbox controller: %w", err) - } - // Save sandbox metadata to store if sandboxInfo, err = c.client.SandboxStore().Update(ctx, sandboxInfo, "extensions"); err != nil { return nil, fmt.Errorf("unable to update extensions for sandbox %q: %w", id, err) } - if err := controller.Create(ctx, sandboxInfo, sb.WithOptions(config), sb.WithNetNSPath(sandbox.NetNSPath)); err != nil { + if err := c.sandboxService.CreateSandbox(ctx, sandboxInfo, sb.WithOptions(config), sb.WithNetNSPath(sandbox.NetNSPath)); err != nil { return nil, fmt.Errorf("failed to create sandbox %q: %w", id, err) } - ctrl, err := controller.Start(ctx, id) + ctrl, err := c.sandboxService.StartSandbox(ctx, sandbox.Sandboxer, id) if err != nil { var cerr podsandbox.CleanupErr if errors.As(err, &cerr) { @@ -401,21 +396,10 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox // SandboxStatus from the store and include it in the event. c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_CREATED_EVENT) - // TODO: Use sandbox client instead - exitCh := make(chan containerd.ExitStatus, 1) - go func() { - defer close(exitCh) - - ctx := util.NamespacedContext() - resp, err := controller.Wait(ctx, id) - if err != nil { - log.G(ctx).WithError(err).Error("failed to wait for sandbox exit") - exitCh <- *containerd.NewExitStatus(containerd.UnknownExitStatus, time.Time{}, err) - return - } - - exitCh <- *containerd.NewExitStatus(resp.ExitStatus, resp.ExitedAt, nil) - }() + exitCh, err := c.sandboxService.WaitSandbox(util.NamespacedContext(), sandbox.Sandboxer, id) + if err != nil { + return nil, fmt.Errorf("failed to wait sandbox %s: %v", id, err) + } // start the monitor after adding sandbox into the store, this ensures // that sandbox is in the store, when event monitor receives the TaskExit event. diff --git a/internal/cri/server/sandbox_service.go b/internal/cri/server/sandbox_service.go index b4041ccd5..1a568ae93 100644 --- a/internal/cri/server/sandbox_service.go +++ b/internal/cri/server/sandbox_service.go @@ -17,9 +17,11 @@ package server import ( + "context" "fmt" + "time" - runtime "k8s.io/cri-api/pkg/apis/runtime/v1" + "github.com/containerd/platforms" "github.com/containerd/containerd/v2/client" "github.com/containerd/containerd/v2/core/sandbox" @@ -27,21 +29,91 @@ import ( ) type criSandboxService struct { - cli *client.Client - config *criconfig.Config + sandboxControllers map[string]sandbox.Controller + config *criconfig.Config } -func newCriSandboxService(config *criconfig.Config, c *client.Client) *criSandboxService { +func newCriSandboxService(config *criconfig.Config, sandboxers map[string]sandbox.Controller) *criSandboxService { return &criSandboxService{ - cli: c, - config: config, + sandboxControllers: sandboxers, + config: config, } } -func (c *criSandboxService) SandboxController(config *runtime.PodSandboxConfig, runtimeHandler string) (sandbox.Controller, error) { - ociRuntime, err := c.config.GetSandboxRuntime(config, runtimeHandler) - if err != nil { - return nil, fmt.Errorf("failed to get sandbox runtime: %w", err) +func (c *criSandboxService) SandboxController(sandboxer string) (sandbox.Controller, error) { + sbController, ok := c.sandboxControllers[sandboxer] + if !ok { + return nil, fmt.Errorf("failed to get sandbox controller by %s", sandboxer) } - return c.cli.SandboxController(ociRuntime.Sandboxer), nil + return sbController, nil +} + +func (c *criSandboxService) CreateSandbox(ctx context.Context, info sandbox.Sandbox, opts ...sandbox.CreateOpt) error { + ctrl, err := c.SandboxController(info.Sandboxer) + if err != nil { + return err + } + return ctrl.Create(ctx, info, opts...) +} + +func (c *criSandboxService) StartSandbox(ctx context.Context, sandboxer string, sandboxID string) (sandbox.ControllerInstance, error) { + ctrl, err := c.SandboxController(sandboxer) + if err != nil { + return sandbox.ControllerInstance{}, err + } + return ctrl.Start(ctx, sandboxID) +} + +func (c *criSandboxService) WaitSandbox(ctx context.Context, sandboxer string, sandboxID string) (<-chan client.ExitStatus, error) { + ctrl, err := c.SandboxController(sandboxer) + if err != nil { + return nil, err + } + + ch := make(chan client.ExitStatus, 1) + go func() { + defer close(ch) + + exitStatus, err := ctrl.Wait(ctx, sandboxID) + if err != nil { + ch <- *client.NewExitStatus(client.UnknownExitStatus, time.Time{}, err) + return + } + + ch <- *client.NewExitStatus(exitStatus.ExitStatus, exitStatus.ExitedAt, nil) + }() + + return ch, nil +} + +func (c *criSandboxService) SandboxStatus(ctx context.Context, sandboxer string, sandboxID string, verbose bool) (sandbox.ControllerStatus, error) { + ctrl, err := c.SandboxController(sandboxer) + if err != nil { + return sandbox.ControllerStatus{}, err + } + return ctrl.Status(ctx, sandboxID, verbose) +} + +func (c *criSandboxService) SandboxPlatform(ctx context.Context, sandboxer string, sandboxID string) (platforms.Platform, error) { + ctrl, err := c.SandboxController(sandboxer) + if err != nil { + return platforms.Platform{}, err + } + return ctrl.Platform(ctx, sandboxID) +} + +func (c *criSandboxService) ShutdownSandbox(ctx context.Context, sandboxer string, sandboxID string) error { + ctrl, err := c.SandboxController(sandboxer) + if err != nil { + return err + } + return ctrl.Shutdown(ctx, sandboxID) +} + +func (c *criSandboxService) StopSandbox(ctx context.Context, sandboxer, sandboxID string, opts ...sandbox.StopOpt) error { + ctrl, err := c.SandboxController(sandboxer) + if err != nil { + return err + } + return ctrl.Stop(ctx, sandboxID, opts...) } diff --git a/internal/cri/server/sandbox_status.go b/internal/cri/server/sandbox_status.go index 0f4da8a31..31a07cb6d 100644 --- a/internal/cri/server/sandbox_status.go +++ b/internal/cri/server/sandbox_status.go @@ -41,17 +41,12 @@ func (c *criService) PodSandboxStatus(ctx context.Context, r *runtime.PodSandbox return nil, fmt.Errorf("failed to get sandbox ip: %w", err) } - controller, err := c.sandboxService.SandboxController(sandbox.Config, sandbox.RuntimeHandler) - if err != nil { - return nil, fmt.Errorf("failed to get sandbox controller: %w", err) - } - var ( createdAt time.Time state string info map[string]string ) - cstatus, err := controller.Status(ctx, sandbox.ID, r.GetVerbose()) + cstatus, err := c.sandboxService.SandboxStatus(ctx, sandbox.Sandboxer, sandbox.ID, r.GetVerbose()) if err != nil { // If the shim died unexpectedly (segfault etc.) let's set the state as // NOTREADY and not just error out to make k8s and clients like crictl diff --git a/internal/cri/server/sandbox_stop.go b/internal/cri/server/sandbox_stop.go index 889bd75ec..f5903eafd 100644 --- a/internal/cri/server/sandbox_stop.go +++ b/internal/cri/server/sandbox_stop.go @@ -68,13 +68,7 @@ func (c *criService) stopPodSandbox(ctx context.Context, sandbox sandboxstore.Sa // Only stop sandbox container when it's running or unknown. state := sandbox.Status.Get().State if state == sandboxstore.StateReady || state == sandboxstore.StateUnknown { - // Use sandbox controller to stop sandbox - controller, err := c.sandboxService.SandboxController(sandbox.Config, sandbox.RuntimeHandler) - if err != nil { - return fmt.Errorf("failed to get sandbox controller: %w", err) - } - - if err := controller.Stop(ctx, id); err != nil { + if err := c.sandboxService.StopSandbox(ctx, sandbox.Sandboxer, id); err != nil { // Log and ignore the error if controller already removed the sandbox if errdefs.IsNotFound(err) { log.G(ctx).Warnf("sandbox %q is not found when stopping it", id) diff --git a/internal/cri/server/service.go b/internal/cri/server/service.go index 182590f60..9e4ba10b3 100644 --- a/internal/cri/server/service.go +++ b/internal/cri/server/service.go @@ -28,9 +28,9 @@ import ( "github.com/containerd/go-cni" "github.com/containerd/log" + "github.com/containerd/platforms" "github.com/containerd/typeurl/v2" "github.com/opencontainers/runtime-spec/specs-go/features" - runtime "k8s.io/cri-api/pkg/apis/runtime/v1" "k8s.io/kubelet/pkg/cri/streaming" @@ -74,7 +74,14 @@ type CRIService interface { } type sandboxService interface { - SandboxController(config *runtime.PodSandboxConfig, runtimeHandler string) (sandbox.Controller, error) + CreateSandbox(ctx context.Context, info sandbox.Sandbox, opts ...sandbox.CreateOpt) error + StartSandbox(ctx context.Context, sandboxer string, sandboxID string) (sandbox.ControllerInstance, error) + WaitSandbox(ctx context.Context, sandboxer string, sandboxID string) (<-chan containerd.ExitStatus, error) + StopSandbox(ctx context.Context, sandboxer, sandboxID string, opts ...sandbox.StopOpt) error + ShutdownSandbox(ctx context.Context, sandboxer string, sandboxID string) error + SandboxStatus(ctx context.Context, sandboxer string, sandboxID string, verbose bool) (sandbox.ControllerStatus, error) + SandboxPlatform(ctx context.Context, sandboxer string, sandboxID string) (platforms.Platform, error) + SandboxController(sandboxer string) (sandbox.Controller, error) } // RuntimeService specifies dependencies to runtime service which provides @@ -188,7 +195,7 @@ func NewCRIService(options *CRIServiceOptions) (CRIService, runtime.RuntimeServi sandboxNameIndex: registrar.NewRegistrar(), containerNameIndex: registrar.NewRegistrar(), netPlugin: make(map[string]cni.CNI), - sandboxService: newCriSandboxService(&config, options.Client), + sandboxService: newCriSandboxService(&config, options.SandboxControllers), } // TODO: Make discard time configurable @@ -231,8 +238,7 @@ func NewCRIService(options *CRIServiceOptions) (CRIService, runtime.RuntimeServi } // Initialize pod sandbox controller - // TODO: Get this from options, NOT client - podSandboxController := options.Client.SandboxController(string(criconfig.ModePodSandbox)).(*podsandbox.Controller) + podSandboxController := options.SandboxControllers[string(criconfig.ModePodSandbox)].(*podsandbox.Controller) podSandboxController.Init(c) c.nri = options.NRI diff --git a/internal/cri/server/service_test.go b/internal/cri/server/service_test.go index 2507632bb..b3eacf704 100644 --- a/internal/cri/server/service_test.go +++ b/internal/cri/server/service_test.go @@ -19,10 +19,12 @@ package server import ( "context" + "github.com/containerd/errdefs" "github.com/containerd/go-cni" - runtime "k8s.io/cri-api/pkg/apis/runtime/v1" + "github.com/containerd/platforms" "github.com/containerd/containerd/v2/api/types" + containerd "github.com/containerd/containerd/v2/client" "github.com/containerd/containerd/v2/core/sandbox" criconfig "github.com/containerd/containerd/v2/internal/cri/config" containerstore "github.com/containerd/containerd/v2/internal/cri/store/container" @@ -32,13 +34,39 @@ import ( "github.com/containerd/containerd/v2/internal/registrar" "github.com/containerd/containerd/v2/pkg/oci" ostesting "github.com/containerd/containerd/v2/pkg/os/testing" - "github.com/containerd/errdefs" - "github.com/containerd/platforms" ) type fakeSandboxService struct{} -func (f *fakeSandboxService) SandboxController(config *runtime.PodSandboxConfig, runtimeHandler string) (sandbox.Controller, error) { +func (f *fakeSandboxService) CreateSandbox(ctx context.Context, info sandbox.Sandbox, opts ...sandbox.CreateOpt) error { + return errdefs.ErrNotImplemented +} + +func (f *fakeSandboxService) StartSandbox(ctx context.Context, sandboxer string, sandboxID string) (sandbox.ControllerInstance, error) { + return sandbox.ControllerInstance{}, errdefs.ErrNotImplemented +} + +func (f *fakeSandboxService) StopSandbox(ctx context.Context, sandboxer, sandboxID string, opts ...sandbox.StopOpt) error { + return errdefs.ErrNotImplemented +} + +func (f *fakeSandboxService) ShutdownSandbox(ctx context.Context, sandboxer string, sandboxID string) error { + return errdefs.ErrNotImplemented +} + +func (f *fakeSandboxService) WaitSandbox(ctx context.Context, sandboxer string, sandboxID string) (<-chan containerd.ExitStatus, error) { + return nil, errdefs.ErrNotImplemented +} + +func (f *fakeSandboxService) SandboxStatus(ctx context.Context, sandboxer string, sandboxID string, verbose bool) (sandbox.ControllerStatus, error) { + return sandbox.ControllerStatus{}, errdefs.ErrNotImplemented +} + +func (f *fakeSandboxService) SandboxPlatform(ctx context.Context, sandboxer string, sandboxID string) (platforms.Platform, error) { + return platforms.DefaultSpec(), nil +} + +func (f *fakeSandboxService) SandboxController(sandboxer string) (sandbox.Controller, error) { return &fakeSandboxController{}, nil } diff --git a/internal/cri/store/sandbox/sandbox.go b/internal/cri/store/sandbox/sandbox.go index 40074069e..ee8877f44 100644 --- a/internal/cri/store/sandbox/sandbox.go +++ b/internal/cri/store/sandbox/sandbox.go @@ -37,6 +37,8 @@ type Sandbox struct { Status StatusStorage // Container is the containerd sandbox container client. Container containerd.Container + // Sandboxer is the sandbox controller name of the sandbox + Sandboxer string // CNI network namespace client. // For hostnetwork pod, this is always nil; // For non hostnetwork pod, this should never be nil. diff --git a/plugins/cri/cri.go b/plugins/cri/cri.go index 3641ca155..4151e4393 100644 --- a/plugins/cri/cri.go +++ b/plugins/cri/cri.go @@ -24,6 +24,8 @@ import ( "github.com/containerd/log" "github.com/containerd/plugin" "github.com/containerd/plugin/registry" + "google.golang.org/grpc" + runtime "k8s.io/cri-api/pkg/apis/runtime/v1" containerd "github.com/containerd/containerd/v2/client" "github.com/containerd/containerd/v2/core/sandbox" @@ -37,10 +39,6 @@ import ( "github.com/containerd/containerd/v2/plugins/services/warning" "github.com/containerd/containerd/v2/version" "github.com/containerd/platforms" - - "google.golang.org/grpc" - - runtime "k8s.io/cri-api/pkg/apis/runtime/v1" ) // Register CRI service plugin @@ -127,10 +125,9 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) { return nil, fmt.Errorf("failed to create containerd client: %w", err) } - // TODO(dmcgowan): Get the full list directly from configured plugins - sbControllers := map[string]sandbox.Controller{ - string(criconfig.ModePodSandbox): client.SandboxController(string(criconfig.ModePodSandbox)), - string(criconfig.ModeShim): client.SandboxController(string(criconfig.ModeShim)), + sbControllers, err := getSandboxControllers(ic) + if err != nil { + return nil, fmt.Errorf("failed to get sandbox controllers from plugins %v", err) } streamingConfig, err := config.StreamingConfig() @@ -240,3 +237,15 @@ func getNRIAPI(ic *plugin.InitContext) *nri.API { return nri.NewAPI(api) } + +func getSandboxControllers(ic *plugin.InitContext) (map[string]sandbox.Controller, error) { + sandboxers, err := ic.GetByType(plugins.SandboxControllerPlugin) + if err != nil { + return nil, err + } + sc := make(map[string]sandbox.Controller) + for name, p := range sandboxers { + sc[name] = p.(sandbox.Controller) + } + return sc, nil +} From a0b73ae114be56d9ae6a1220a240b6588a9acead Mon Sep 17 00:00:00 2001 From: Abel Feng Date: Fri, 12 Jan 2024 09:57:04 +0800 Subject: [PATCH 2/3] sandbox: optimize the lock in PodSandbox Signed-off-by: Abel Feng --- internal/cri/server/podsandbox/controller.go | 17 +++-- .../cri/server/podsandbox/controller_test.go | 9 +-- internal/cri/server/podsandbox/recover.go | 5 +- .../cri/server/podsandbox/recover_test.go | 2 +- internal/cri/server/podsandbox/sandbox_run.go | 17 +++-- .../cri/server/podsandbox/sandbox_status.go | 15 ++-- .../cri/server/podsandbox/sandbox_stop.go | 15 ++-- .../cri/server/podsandbox/types/podsandbox.go | 56 ++++++-------- .../podsandbox/types/podsandbox_test.go | 73 +++++++++++++++++++ 9 files changed, 138 insertions(+), 71 deletions(-) create mode 100644 internal/cri/server/podsandbox/types/podsandbox_test.go diff --git a/internal/cri/server/podsandbox/controller.go b/internal/cri/server/podsandbox/controller.go index 3d33fbc91..b68b06433 100644 --- a/internal/cri/server/podsandbox/controller.go +++ b/internal/cri/server/podsandbox/controller.go @@ -158,10 +158,10 @@ func (c *Controller) Wait(ctx context.Context, sandboxID string) (sandbox.ExitSt } -func (c *Controller) waitSandboxExit(ctx context.Context, p *types.PodSandbox, exitCh <-chan containerd.ExitStatus) (exitStatus uint32, exitedAt time.Time, err error) { +func (c *Controller) waitSandboxExit(ctx context.Context, p *types.PodSandbox, exitCh <-chan containerd.ExitStatus) error { select { case e := <-exitCh: - exitStatus, exitedAt, err = e.Result() + exitStatus, exitedAt, err := e.Result() if err != nil { log.G(ctx).WithError(err).Errorf("failed to get task exit status for %q", p.ID) exitStatus = unknownExitCode @@ -171,12 +171,16 @@ func (c *Controller) waitSandboxExit(ctx context.Context, p *types.PodSandbox, e dctx, dcancel := context.WithTimeout(dctx, handleEventTimeout) defer dcancel() event := &eventtypes.TaskExit{ExitStatus: exitStatus, ExitedAt: protobuf.ToTimestamp(exitedAt)} - if cleanErr := handleSandboxTaskExit(dctx, p, event); cleanErr != nil { + if err := handleSandboxTaskExit(dctx, p, event); err != nil { + // TODO will backoff the event to the controller's own EventMonitor, but not cri's, + // because we should call handleSandboxTaskExit again the next time + // eventMonitor handle this event. but now it goes into cri's EventMonitor, + // the handleSandboxTaskExit will not be called anymore c.cri.BackOffEvent(p.ID, e) } - return + return nil case <-ctx.Done(): - return unknownExitCode, time.Now(), ctx.Err() + return ctx.Err() } } @@ -196,5 +200,8 @@ func handleSandboxTaskExit(ctx context.Context, sb *types.PodSandbox, e *eventty } } } + if err := sb.Exit(e.ExitStatus, protobuf.FromTimestamp(e.ExitedAt)); err != nil { + return err + } return nil } diff --git a/internal/cri/server/podsandbox/controller_test.go b/internal/cri/server/podsandbox/controller_test.go index fccc24073..ff0155846 100644 --- a/internal/cri/server/podsandbox/controller_test.go +++ b/internal/cri/server/podsandbox/controller_test.go @@ -23,7 +23,6 @@ import ( "github.com/stretchr/testify/assert" - containerd "github.com/containerd/containerd/v2/client" criconfig "github.com/containerd/containerd/v2/internal/cri/config" "github.com/containerd/containerd/v2/internal/cri/server/podsandbox/types" sandboxstore "github.com/containerd/containerd/v2/internal/cri/store/sandbox" @@ -63,10 +62,7 @@ func Test_Status(t *testing.T) { CreatedAt: createdAt, }) sb.Metadata = sandboxstore.Metadata{ID: sandboxID} - err := controller.store.Save(sb) - if err != nil { - t.Fatal(err) - } + assert.NoError(t, controller.store.Save(sb)) s, err := controller.Status(context.Background(), sandboxID, false) if err != nil { t.Fatal(err) @@ -75,7 +71,8 @@ func Test_Status(t *testing.T) { assert.Equal(t, s.CreatedAt, createdAt) assert.Equal(t, s.State, sandboxstore.StateReady.String()) - sb.Exit(*containerd.NewExitStatus(exitStatus, exitedAt, nil)) + assert.NoError(t, sb.Exit(exitStatus, exitedAt)) + exit, err := controller.Wait(context.Background(), sandboxID) if err != nil { t.Fatal(err) diff --git a/internal/cri/server/podsandbox/recover.go b/internal/cri/server/podsandbox/recover.go index 8b8b250a2..bbd528bf0 100644 --- a/internal/cri/server/podsandbox/recover.go +++ b/internal/cri/server/podsandbox/recover.go @@ -134,8 +134,9 @@ func (c *Controller) RecoverContainer(ctx context.Context, cntr containerd.Conta } if ch != nil { go func() { - code, exitTime, err := c.waitSandboxExit(ctrdutil.NamespacedContext(), podSandbox, ch) - podSandbox.Exit(*containerd.NewExitStatus(code, exitTime, err)) + if err := c.waitSandboxExit(ctrdutil.NamespacedContext(), podSandbox, ch); err != nil { + log.G(context.Background()).Warnf("failed to wait pod sandbox exit %v", err) + } }() } diff --git a/internal/cri/server/podsandbox/recover_test.go b/internal/cri/server/podsandbox/recover_test.go index e9979c40b..dfa7a9304 100644 --- a/internal/cri/server/podsandbox/recover_test.go +++ b/internal/cri/server/podsandbox/recover_test.go @@ -403,7 +403,7 @@ func TestRecoverContainer(t *testing.T) { pSb := controller.store.Get(cont.ID()) assert.NotNil(t, pSb) - assert.Equal(t, c.expectedState, pSb.State, "%s state is not expected", cont.ID()) + assert.Equal(t, c.expectedState, pSb.Status.Get().State, "%s state is not expected", cont.ID()) if c.expectedExitCode > 0 { cont.t.waitExitCh <- struct{}{} diff --git a/internal/cri/server/podsandbox/sandbox_run.go b/internal/cri/server/podsandbox/sandbox_run.go index 8013314e2..ae2fc6a9d 100644 --- a/internal/cri/server/podsandbox/sandbox_run.go +++ b/internal/cri/server/podsandbox/sandbox_run.go @@ -216,7 +216,6 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll if err != nil { return cin, fmt.Errorf("failed to get sandbox container info: %w", err) } - podSandbox.CreatedAt = info.CreatedAt // Create sandbox task in containerd. log.G(ctx).Tracef("Create sandbox container (id=%q, name=%q).", id, metadata.Name) @@ -242,7 +241,6 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll } } }() - podSandbox.Pid = task.Pid() // wait is a long running background request, no timeout needed. exitCh, err := task.Wait(ctrdutil.NamespacedContext()) @@ -267,7 +265,15 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll if err := task.Start(ctx); err != nil { return cin, fmt.Errorf("failed to start sandbox container task %q: %w", id, err) } - podSandbox.State = sandboxstore.StateReady + pid := task.Pid() + if err := podSandbox.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) { + status.Pid = pid + status.State = sandboxstore.StateReady + status.CreatedAt = info.CreatedAt + return status, nil + }); err != nil { + return cin, fmt.Errorf("failed to update status of pod sandbox %q: %w", id, err) + } cin.SandboxID = id cin.Pid = task.Pid() @@ -275,8 +281,9 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll cin.Labels = labels go func() { - code, exitTime, err := c.waitSandboxExit(ctrdutil.NamespacedContext(), podSandbox, exitCh) - podSandbox.Exit(*containerd.NewExitStatus(code, exitTime, err)) + if err := c.waitSandboxExit(ctrdutil.NamespacedContext(), podSandbox, exitCh); err != nil { + log.G(context.Background()).Warnf("failed to wait pod sandbox exit %v", err) + } }() return diff --git a/internal/cri/server/podsandbox/sandbox_status.go b/internal/cri/server/podsandbox/sandbox_status.go index e6925b3dc..9eb4b6773 100644 --- a/internal/cri/server/podsandbox/sandbox_status.go +++ b/internal/cri/server/podsandbox/sandbox_status.go @@ -36,18 +36,15 @@ func (c *Controller) Status(ctx context.Context, sandboxID string, verbose bool) if sb == nil { return sandbox.ControllerStatus{}, fmt.Errorf("unable to find sandbox %q: %w", sandboxID, errdefs.ErrNotFound) } - + status := sb.Status.Get() cstatus := sandbox.ControllerStatus{ SandboxID: sandboxID, - Pid: sb.Pid, - State: sb.State.String(), - CreatedAt: sb.CreatedAt, + Pid: status.Pid, + State: status.State.String(), + CreatedAt: status.CreatedAt, + ExitedAt: status.ExitedAt, Extra: nil, } - exitStatus := sb.GetExitStatus() - if exitStatus != nil { - cstatus.ExitedAt = exitStatus.ExitTime() - } if verbose { info, err := toCRISandboxInfo(ctx, sb) @@ -64,7 +61,7 @@ func (c *Controller) Status(ctx context.Context, sandboxID string, verbose bool) // toCRISandboxInfo converts internal container object information to CRI sandbox status response info map. func toCRISandboxInfo(ctx context.Context, sb *types.PodSandbox) (map[string]string, error) { si := &critypes.SandboxInfo{ - Pid: sb.Pid, + Pid: sb.Status.Get().Pid, Config: sb.Metadata.Config, RuntimeHandler: sb.Metadata.RuntimeHandler, CNIResult: sb.Metadata.CNIResult, diff --git a/internal/cri/server/podsandbox/sandbox_stop.go b/internal/cri/server/podsandbox/sandbox_stop.go index b625621f5..be5e892c0 100644 --- a/internal/cri/server/podsandbox/sandbox_stop.go +++ b/internal/cri/server/podsandbox/sandbox_stop.go @@ -22,16 +22,15 @@ import ( "syscall" "time" + "github.com/containerd/errdefs" "github.com/containerd/log" eventtypes "github.com/containerd/containerd/v2/api/events" - containerd "github.com/containerd/containerd/v2/client" "github.com/containerd/containerd/v2/core/sandbox" "github.com/containerd/containerd/v2/internal/cri/server/podsandbox/types" sandboxstore "github.com/containerd/containerd/v2/internal/cri/store/sandbox" ctrdutil "github.com/containerd/containerd/v2/internal/cri/util" "github.com/containerd/containerd/v2/protobuf" - "github.com/containerd/errdefs" ) func (c *Controller) Stop(ctx context.Context, sandboxID string, _ ...sandbox.StopOpt) error { @@ -46,7 +45,7 @@ func (c *Controller) Stop(ctx context.Context, sandboxID string, _ ...sandbox.St if err != nil { return err } - state := podSandbox.State + state := podSandbox.Status.Get().State if state == sandboxstore.StateReady || state == sandboxstore.StateUnknown { if err := c.stopSandboxContainer(ctx, podSandbox); err != nil { return fmt.Errorf("failed to stop sandbox container %q in %q state: %w", sandboxID, state, err) @@ -64,7 +63,7 @@ func (c *Controller) Stop(ctx context.Context, sandboxID string, _ ...sandbox.St func (c *Controller) stopSandboxContainer(ctx context.Context, podSandbox *types.PodSandbox) error { id := podSandbox.ID container := podSandbox.Container - state := podSandbox.State + state := podSandbox.Status.Get().State task, err := container.Task(ctx, nil) if err != nil { if !errdefs.IsNotFound(err) { @@ -95,12 +94,8 @@ func (c *Controller) stopSandboxContainer(ctx context.Context, podSandbox *types stopCh := make(chan struct{}) go func() { defer close(stopCh) - exitStatus, exitedAt, err := c.waitSandboxExit(exitCtx, podSandbox, exitCh) - if err != context.Canceled && err != context.DeadlineExceeded { - // The error of context.Canceled or context.DeadlineExceeded indicates the task.Wait is not finished, - // so we can not set the exit status of the pod sandbox. - podSandbox.Exit(*containerd.NewExitStatus(exitStatus, exitedAt, err)) - } else { + err := c.waitSandboxExit(exitCtx, podSandbox, exitCh) + if err != nil { log.G(ctx).WithError(err).Errorf("Failed to wait pod sandbox exit %+v", err) } }() diff --git a/internal/cri/server/podsandbox/types/podsandbox.go b/internal/cri/server/podsandbox/types/podsandbox.go index 5dd08e5bb..bbc83db1c 100644 --- a/internal/cri/server/podsandbox/types/podsandbox.go +++ b/internal/cri/server/podsandbox/types/podsandbox.go @@ -18,7 +18,6 @@ package types import ( "context" - "sync" "time" containerd "github.com/containerd/containerd/v2/client" @@ -28,16 +27,12 @@ import ( ) type PodSandbox struct { - mu sync.Mutex - ID string - Container containerd.Container - State sandboxstore.State - Metadata sandboxstore.Metadata - Runtime sandbox.RuntimeOpts - Pid uint32 - CreatedAt time.Time - stopChan *store.StopCh - exitStatus *containerd.ExitStatus + ID string + Container containerd.Container + Metadata sandboxstore.Metadata + Runtime sandbox.RuntimeOpts + Status sandboxstore.StatusStorage + stopChan *store.StopCh } func NewPodSandbox(id string, status sandboxstore.Status) *PodSandbox { @@ -45,39 +40,34 @@ func NewPodSandbox(id string, status sandboxstore.Status) *PodSandbox { ID: id, Container: nil, stopChan: store.NewStopCh(), - CreatedAt: status.CreatedAt, - State: status.State, - Pid: status.Pid, + Status: sandboxstore.StoreStatus(status), } if status.State == sandboxstore.StateNotReady { - podSandbox.Exit(*containerd.NewExitStatus(status.ExitStatus, status.ExitedAt, nil)) + podSandbox.stopChan.Stop() } return podSandbox } -func (p *PodSandbox) Exit(status containerd.ExitStatus) { - p.mu.Lock() - defer p.mu.Unlock() - p.exitStatus = &status - p.State = sandboxstore.StateNotReady +func (p *PodSandbox) Exit(code uint32, exitTime time.Time) error { + if err := p.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) { + status.State = sandboxstore.StateNotReady + status.ExitStatus = code + status.ExitedAt = exitTime + status.Pid = 0 + return status, nil + }); err != nil { + return err + } p.stopChan.Stop() + return nil } -func (p *PodSandbox) Wait(ctx context.Context) (*containerd.ExitStatus, error) { - s := p.GetExitStatus() - if s != nil { - return s, nil - } +func (p *PodSandbox) Wait(ctx context.Context) (containerd.ExitStatus, error) { select { case <-ctx.Done(): - return nil, ctx.Err() + return containerd.ExitStatus{}, ctx.Err() case <-p.stopChan.Stopped(): - return p.GetExitStatus(), nil + status := p.Status.Get() + return *containerd.NewExitStatus(status.ExitStatus, status.ExitedAt, nil), nil } } - -func (p *PodSandbox) GetExitStatus() *containerd.ExitStatus { - p.mu.Lock() - defer p.mu.Unlock() - return p.exitStatus -} diff --git a/internal/cri/server/podsandbox/types/podsandbox_test.go b/internal/cri/server/podsandbox/types/podsandbox_test.go new file mode 100644 index 000000000..b68bfbefa --- /dev/null +++ b/internal/cri/server/podsandbox/types/podsandbox_test.go @@ -0,0 +1,73 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package types + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/containerd/containerd/v2/internal/cri/store/sandbox" +) + +func Test_PodSandbox(t *testing.T) { + p := NewPodSandbox("test", sandbox.Status{State: sandbox.StateUnknown}) + assert.Equal(t, p.Status.Get().State, sandbox.StateUnknown) + assert.Equal(t, p.ID, "test") + p.Metadata = sandbox.Metadata{ID: "test", NetNSPath: "/test"} + createAt := time.Now() + assert.NoError(t, p.Status.Update(func(status sandbox.Status) (sandbox.Status, error) { + status.State = sandbox.StateReady + status.Pid = uint32(100) + status.CreatedAt = createAt + return status, nil + })) + status := p.Status.Get() + assert.Equal(t, status.State, sandbox.StateReady) + assert.Equal(t, status.Pid, uint32(100)) + assert.Equal(t, status.CreatedAt, createAt) + + exitAt := time.Now().Add(time.Second) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) + defer cancel() + _, err := p.Wait(ctx) + assert.Equal(t, err, ctx.Err()) + }() + + wg.Add(1) + go func() { + defer wg.Done() + exitStatus, err := p.Wait(context.Background()) + assert.Equal(t, err, nil) + code, exitTime, err := exitStatus.Result() + assert.Equal(t, err, nil) + assert.Equal(t, code, uint32(128)) + assert.Equal(t, exitTime, exitAt) + }() + time.Sleep(time.Second) + if err := p.Exit(uint32(128), exitAt); err != nil { + t.Fatalf("failed to set exit of pod sandbox %v", err) + } + wg.Wait() +} From a60e52f582729e476e0e8ef3e78eb0ae66ddaaf2 Mon Sep 17 00:00:00 2001 From: Abel Feng Date: Tue, 30 Jan 2024 21:47:09 +0800 Subject: [PATCH 3/3] sandbox: add struct tags for PinnedImages Signed-off-by: Abel Feng --- internal/cri/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/cri/config/config.go b/internal/cri/config/config.go index 47befe598..e1420eab9 100644 --- a/internal/cri/config/config.go +++ b/internal/cri/config/config.go @@ -277,7 +277,7 @@ type ImageConfig struct { // "base": "docker.io/library/ubuntu:latest" // Migrated from: // (PluginConfig).SandboxImage string `toml:"sandbox_image" json:"sandboxImage"` - PinnedImages map[string]string + PinnedImages map[string]string `toml:"pinned_images" json:"pinned_images"` // RuntimePlatforms is map between the runtime and the image platform to // use for that runtime. When resolving an image for a runtime, this