sandbox: add methods to sandboxService

so that we cri service don't have to get sandbox controller everytime it
needs to call sandbox controller api.

Signed-off-by: Abel Feng <fshb1988@gmail.com>
This commit is contained in:
Abel Feng 2024-01-30 20:20:55 +08:00
parent 33e544e94a
commit 0f1d27412f
14 changed files with 169 additions and 98 deletions

View File

@ -129,7 +129,6 @@ func buildLocalContainerdClient(t *testing.T, tmpDir string, tweakInitFn tweakPl
containerd.WithDefaultNamespace(constants.K8sContainerdNamespace),
containerd.WithDefaultPlatform(platforms.Default()),
containerd.WithInMemoryServices(lastInitContext),
containerd.WithInMemorySandboxControllers(lastInitContext),
)
require.NoError(t, err)

View File

@ -63,12 +63,7 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
return nil, fmt.Errorf("failed to find sandbox id %q: %w", r.GetPodSandboxId(), err)
}
controller, err := c.sandboxService.SandboxController(sandbox.Config, sandbox.RuntimeHandler)
if err != nil {
return nil, fmt.Errorf("failed to get sandbox controller: %w", err)
}
cstatus, err := controller.Status(ctx, sandbox.ID, false)
cstatus, err := c.sandboxService.SandboxStatus(ctx, sandbox.Sandboxer, sandbox.ID, false)
if err != nil {
return nil, fmt.Errorf("failed to get controller status: %w", err)
}
@ -150,7 +145,7 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
}
}()
platform, err := controller.Platform(ctx, sandboxID)
platform, err := c.sandboxService.SandboxPlatform(ctx, sandbox.Sandboxer, sandboxID)
if err != nil {
return nil, fmt.Errorf("failed to query sandbox platform: %w", err)
}

View File

@ -68,15 +68,12 @@ func (c *criService) getMetricsHandler(ctx context.Context, sandboxID string) (m
if err != nil {
return nil, fmt.Errorf("failed to find sandbox id %q: %w", sandboxID, err)
}
controller, err := c.sandboxService.SandboxController(sandbox.Config, sandbox.RuntimeHandler)
if err != nil {
return nil, fmt.Errorf("failed to get sandbox controller: %w", err)
}
// Grab the platform that this containers sandbox advertises. Reason being, even if
// the host may be {insert platform}, if it virtualizes or emulates a different platform
// it will return stats in that format, and we need to handle the conversion logic based
// off of this info.
p, err := controller.Platform(ctx, sandboxID)
p, err := c.sandboxService.SandboxPlatform(ctx, sandbox.Sandboxer, sandboxID)
if err != nil {
return nil, err
}

View File

@ -22,16 +22,17 @@ import (
goruntime "runtime"
"time"
"github.com/containerd/errdefs"
"github.com/containerd/log"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
containerd "github.com/containerd/containerd/v2/client"
sandbox2 "github.com/containerd/containerd/v2/core/sandbox"
"github.com/containerd/containerd/v2/internal/cri/config"
"github.com/containerd/containerd/v2/internal/cri/server/podsandbox/types"
sandboxstore "github.com/containerd/containerd/v2/internal/cri/store/sandbox"
ctrdutil "github.com/containerd/containerd/v2/internal/cri/util"
"github.com/containerd/containerd/v2/pkg/netns"
"github.com/containerd/errdefs"
)
// loadContainerTimeout is the default timeout for loading a container/sandbox.
@ -144,6 +145,7 @@ func (c *Controller) RecoverContainer(ctx context.Context, cntr containerd.Conta
sandbox = sandboxstore.NewSandbox(*meta, s)
sandbox.Container = cntr
sandbox.Sandboxer = string(config.ModePodSandbox)
// Load network namespace.
sandbox.NetNS = getNetNS(meta)

View File

@ -58,8 +58,10 @@ func (c *criService) recover(ctx context.Context) error {
return fmt.Errorf("failed to list sandbox containers: %w", err)
}
podSandboxController := c.client.SandboxController(string(criconfig.ModePodSandbox))
podSandboxController, err := c.sandboxService.SandboxController(string(criconfig.ModePodSandbox))
if err != nil {
return fmt.Errorf("failed to get podsanbox controller %v", err)
}
podSandboxLoader, ok := podSandboxController.(podSandboxRecover)
if !ok {
log.G(ctx).Fatal("pod sandbox controller doesn't support recovery")
@ -134,6 +136,7 @@ func (c *criService) recover(ctx context.Context) error {
}
sb := sandboxstore.NewSandbox(metadata, sandboxstore.Status{State: state})
sb.Sandboxer = sbx.Sandboxer
// Load network namespace.
sb.NetNS = getNetNS(&metadata)
@ -149,20 +152,11 @@ func (c *criService) recover(ctx context.Context) error {
if status.State == sandboxstore.StateNotReady {
continue
}
controller, err := c.sandboxService.SandboxController(sb.Config, sb.RuntimeHandler)
exitCh, err := c.sandboxService.WaitSandbox(ctrdutil.NamespacedContext(), sb.Sandboxer, sb.ID)
if err != nil {
log.G(ctx).WithError(err).Error("failed to get sandbox controller while waiting sandbox")
log.G(ctx).WithError(err).Error("failed to wait sandbox")
continue
}
exitCh := make(chan containerd.ExitStatus, 1)
go func() {
exit, err := controller.Wait(ctrdutil.NamespacedContext(), sb.ID)
if err != nil {
log.G(ctx).WithError(err).Error("failed to wait for sandbox exit")
exitCh <- *containerd.NewExitStatus(containerd.UnknownExitStatus, time.Time{}, err)
}
exitCh <- *containerd.NewExitStatus(exit.ExitStatus, exit.ExitedAt, nil)
}()
c.eventMonitor.startSandboxExitMonitor(context.Background(), sb.ID, exitCh)
}
// Recover all containers.

View File

@ -79,13 +79,7 @@ func (c *criService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodS
}
}
// Use sandbox controller to delete sandbox
controller, err := c.sandboxService.SandboxController(sandbox.Config, sandbox.RuntimeHandler)
if err != nil {
return nil, fmt.Errorf("failed to get sandbox controller: %w", err)
}
if err := controller.Shutdown(ctx, id); err != nil && !errdefs.IsNotFound(err) {
if err := c.sandboxService.ShutdownSandbox(ctx, sandbox.Sandboxer, id); err != nil && !errdefs.IsNotFound(err) {
return nil, fmt.Errorf("failed to delete sandbox %q: %w", id, err)
}

View File

@ -31,7 +31,6 @@ import (
"github.com/containerd/typeurl/v2"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
containerd "github.com/containerd/containerd/v2/client"
sb "github.com/containerd/containerd/v2/core/sandbox"
"github.com/containerd/containerd/v2/internal/cri/annotations"
"github.com/containerd/containerd/v2/internal/cri/bandwidth"
@ -124,6 +123,7 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
CreatedAt: time.Now().UTC(),
},
)
sandbox.Sandboxer = ociRuntime.Sandboxer
if _, err := c.client.SandboxStore().Create(ctx, sandboxInfo); err != nil {
return nil, fmt.Errorf("failed to save sandbox metadata: %w", err)
@ -240,21 +240,16 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
return nil, fmt.Errorf("unable to save sandbox %q to store: %w", id, err)
}
controller, err := c.sandboxService.SandboxController(config, r.GetRuntimeHandler())
if err != nil {
return nil, fmt.Errorf("failed to get sandbox controller: %w", err)
}
// Save sandbox metadata to store
if sandboxInfo, err = c.client.SandboxStore().Update(ctx, sandboxInfo, "extensions"); err != nil {
return nil, fmt.Errorf("unable to update extensions for sandbox %q: %w", id, err)
}
if err := controller.Create(ctx, sandboxInfo, sb.WithOptions(config), sb.WithNetNSPath(sandbox.NetNSPath)); err != nil {
if err := c.sandboxService.CreateSandbox(ctx, sandboxInfo, sb.WithOptions(config), sb.WithNetNSPath(sandbox.NetNSPath)); err != nil {
return nil, fmt.Errorf("failed to create sandbox %q: %w", id, err)
}
ctrl, err := controller.Start(ctx, id)
ctrl, err := c.sandboxService.StartSandbox(ctx, sandbox.Sandboxer, id)
if err != nil {
var cerr podsandbox.CleanupErr
if errors.As(err, &cerr) {
@ -401,22 +396,11 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
// SandboxStatus from the store and include it in the event.
c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_CREATED_EVENT)
// TODO: Use sandbox client instead
exitCh := make(chan containerd.ExitStatus, 1)
go func() {
defer close(exitCh)
ctx := util.NamespacedContext()
resp, err := controller.Wait(ctx, id)
exitCh, err := c.sandboxService.WaitSandbox(util.NamespacedContext(), sandbox.Sandboxer, id)
if err != nil {
log.G(ctx).WithError(err).Error("failed to wait for sandbox exit")
exitCh <- *containerd.NewExitStatus(containerd.UnknownExitStatus, time.Time{}, err)
return
return nil, fmt.Errorf("failed to wait sandbox %s: %v", id, err)
}
exitCh <- *containerd.NewExitStatus(resp.ExitStatus, resp.ExitedAt, nil)
}()
// start the monitor after adding sandbox into the store, this ensures
// that sandbox is in the store, when event monitor receives the TaskExit event.
//

View File

@ -17,9 +17,11 @@
package server
import (
"context"
"fmt"
"time"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
"github.com/containerd/platforms"
"github.com/containerd/containerd/v2/client"
"github.com/containerd/containerd/v2/core/sandbox"
@ -27,21 +29,91 @@ import (
)
type criSandboxService struct {
cli *client.Client
sandboxControllers map[string]sandbox.Controller
config *criconfig.Config
}
func newCriSandboxService(config *criconfig.Config, c *client.Client) *criSandboxService {
func newCriSandboxService(config *criconfig.Config, sandboxers map[string]sandbox.Controller) *criSandboxService {
return &criSandboxService{
cli: c,
sandboxControllers: sandboxers,
config: config,
}
}
func (c *criSandboxService) SandboxController(config *runtime.PodSandboxConfig, runtimeHandler string) (sandbox.Controller, error) {
ociRuntime, err := c.config.GetSandboxRuntime(config, runtimeHandler)
if err != nil {
return nil, fmt.Errorf("failed to get sandbox runtime: %w", err)
func (c *criSandboxService) SandboxController(sandboxer string) (sandbox.Controller, error) {
sbController, ok := c.sandboxControllers[sandboxer]
if !ok {
return nil, fmt.Errorf("failed to get sandbox controller by %s", sandboxer)
}
return c.cli.SandboxController(ociRuntime.Sandboxer), nil
return sbController, nil
}
func (c *criSandboxService) CreateSandbox(ctx context.Context, info sandbox.Sandbox, opts ...sandbox.CreateOpt) error {
ctrl, err := c.SandboxController(info.Sandboxer)
if err != nil {
return err
}
return ctrl.Create(ctx, info, opts...)
}
func (c *criSandboxService) StartSandbox(ctx context.Context, sandboxer string, sandboxID string) (sandbox.ControllerInstance, error) {
ctrl, err := c.SandboxController(sandboxer)
if err != nil {
return sandbox.ControllerInstance{}, err
}
return ctrl.Start(ctx, sandboxID)
}
func (c *criSandboxService) WaitSandbox(ctx context.Context, sandboxer string, sandboxID string) (<-chan client.ExitStatus, error) {
ctrl, err := c.SandboxController(sandboxer)
if err != nil {
return nil, err
}
ch := make(chan client.ExitStatus, 1)
go func() {
defer close(ch)
exitStatus, err := ctrl.Wait(ctx, sandboxID)
if err != nil {
ch <- *client.NewExitStatus(client.UnknownExitStatus, time.Time{}, err)
return
}
ch <- *client.NewExitStatus(exitStatus.ExitStatus, exitStatus.ExitedAt, nil)
}()
return ch, nil
}
func (c *criSandboxService) SandboxStatus(ctx context.Context, sandboxer string, sandboxID string, verbose bool) (sandbox.ControllerStatus, error) {
ctrl, err := c.SandboxController(sandboxer)
if err != nil {
return sandbox.ControllerStatus{}, err
}
return ctrl.Status(ctx, sandboxID, verbose)
}
func (c *criSandboxService) SandboxPlatform(ctx context.Context, sandboxer string, sandboxID string) (platforms.Platform, error) {
ctrl, err := c.SandboxController(sandboxer)
if err != nil {
return platforms.Platform{}, err
}
return ctrl.Platform(ctx, sandboxID)
}
func (c *criSandboxService) ShutdownSandbox(ctx context.Context, sandboxer string, sandboxID string) error {
ctrl, err := c.SandboxController(sandboxer)
if err != nil {
return err
}
return ctrl.Shutdown(ctx, sandboxID)
}
func (c *criSandboxService) StopSandbox(ctx context.Context, sandboxer, sandboxID string, opts ...sandbox.StopOpt) error {
ctrl, err := c.SandboxController(sandboxer)
if err != nil {
return err
}
return ctrl.Stop(ctx, sandboxID, opts...)
}

View File

@ -41,17 +41,12 @@ func (c *criService) PodSandboxStatus(ctx context.Context, r *runtime.PodSandbox
return nil, fmt.Errorf("failed to get sandbox ip: %w", err)
}
controller, err := c.sandboxService.SandboxController(sandbox.Config, sandbox.RuntimeHandler)
if err != nil {
return nil, fmt.Errorf("failed to get sandbox controller: %w", err)
}
var (
createdAt time.Time
state string
info map[string]string
)
cstatus, err := controller.Status(ctx, sandbox.ID, r.GetVerbose())
cstatus, err := c.sandboxService.SandboxStatus(ctx, sandbox.Sandboxer, sandbox.ID, r.GetVerbose())
if err != nil {
// If the shim died unexpectedly (segfault etc.) let's set the state as
// NOTREADY and not just error out to make k8s and clients like crictl

View File

@ -68,13 +68,7 @@ func (c *criService) stopPodSandbox(ctx context.Context, sandbox sandboxstore.Sa
// Only stop sandbox container when it's running or unknown.
state := sandbox.Status.Get().State
if state == sandboxstore.StateReady || state == sandboxstore.StateUnknown {
// Use sandbox controller to stop sandbox
controller, err := c.sandboxService.SandboxController(sandbox.Config, sandbox.RuntimeHandler)
if err != nil {
return fmt.Errorf("failed to get sandbox controller: %w", err)
}
if err := controller.Stop(ctx, id); err != nil {
if err := c.sandboxService.StopSandbox(ctx, sandbox.Sandboxer, id); err != nil {
// Log and ignore the error if controller already removed the sandbox
if errdefs.IsNotFound(err) {
log.G(ctx).Warnf("sandbox %q is not found when stopping it", id)

View File

@ -28,9 +28,9 @@ import (
"github.com/containerd/go-cni"
"github.com/containerd/log"
"github.com/containerd/platforms"
"github.com/containerd/typeurl/v2"
"github.com/opencontainers/runtime-spec/specs-go/features"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/kubelet/pkg/cri/streaming"
@ -74,7 +74,14 @@ type CRIService interface {
}
type sandboxService interface {
SandboxController(config *runtime.PodSandboxConfig, runtimeHandler string) (sandbox.Controller, error)
CreateSandbox(ctx context.Context, info sandbox.Sandbox, opts ...sandbox.CreateOpt) error
StartSandbox(ctx context.Context, sandboxer string, sandboxID string) (sandbox.ControllerInstance, error)
WaitSandbox(ctx context.Context, sandboxer string, sandboxID string) (<-chan containerd.ExitStatus, error)
StopSandbox(ctx context.Context, sandboxer, sandboxID string, opts ...sandbox.StopOpt) error
ShutdownSandbox(ctx context.Context, sandboxer string, sandboxID string) error
SandboxStatus(ctx context.Context, sandboxer string, sandboxID string, verbose bool) (sandbox.ControllerStatus, error)
SandboxPlatform(ctx context.Context, sandboxer string, sandboxID string) (platforms.Platform, error)
SandboxController(sandboxer string) (sandbox.Controller, error)
}
// RuntimeService specifies dependencies to runtime service which provides
@ -188,7 +195,7 @@ func NewCRIService(options *CRIServiceOptions) (CRIService, runtime.RuntimeServi
sandboxNameIndex: registrar.NewRegistrar(),
containerNameIndex: registrar.NewRegistrar(),
netPlugin: make(map[string]cni.CNI),
sandboxService: newCriSandboxService(&config, options.Client),
sandboxService: newCriSandboxService(&config, options.SandboxControllers),
}
// TODO: Make discard time configurable
@ -231,8 +238,7 @@ func NewCRIService(options *CRIServiceOptions) (CRIService, runtime.RuntimeServi
}
// Initialize pod sandbox controller
// TODO: Get this from options, NOT client
podSandboxController := options.Client.SandboxController(string(criconfig.ModePodSandbox)).(*podsandbox.Controller)
podSandboxController := options.SandboxControllers[string(criconfig.ModePodSandbox)].(*podsandbox.Controller)
podSandboxController.Init(c)
c.nri = options.NRI

View File

@ -19,10 +19,12 @@ package server
import (
"context"
"github.com/containerd/errdefs"
"github.com/containerd/go-cni"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
"github.com/containerd/platforms"
"github.com/containerd/containerd/v2/api/types"
containerd "github.com/containerd/containerd/v2/client"
"github.com/containerd/containerd/v2/core/sandbox"
criconfig "github.com/containerd/containerd/v2/internal/cri/config"
containerstore "github.com/containerd/containerd/v2/internal/cri/store/container"
@ -32,13 +34,39 @@ import (
"github.com/containerd/containerd/v2/internal/registrar"
"github.com/containerd/containerd/v2/pkg/oci"
ostesting "github.com/containerd/containerd/v2/pkg/os/testing"
"github.com/containerd/errdefs"
"github.com/containerd/platforms"
)
type fakeSandboxService struct{}
func (f *fakeSandboxService) SandboxController(config *runtime.PodSandboxConfig, runtimeHandler string) (sandbox.Controller, error) {
func (f *fakeSandboxService) CreateSandbox(ctx context.Context, info sandbox.Sandbox, opts ...sandbox.CreateOpt) error {
return errdefs.ErrNotImplemented
}
func (f *fakeSandboxService) StartSandbox(ctx context.Context, sandboxer string, sandboxID string) (sandbox.ControllerInstance, error) {
return sandbox.ControllerInstance{}, errdefs.ErrNotImplemented
}
func (f *fakeSandboxService) StopSandbox(ctx context.Context, sandboxer, sandboxID string, opts ...sandbox.StopOpt) error {
return errdefs.ErrNotImplemented
}
func (f *fakeSandboxService) ShutdownSandbox(ctx context.Context, sandboxer string, sandboxID string) error {
return errdefs.ErrNotImplemented
}
func (f *fakeSandboxService) WaitSandbox(ctx context.Context, sandboxer string, sandboxID string) (<-chan containerd.ExitStatus, error) {
return nil, errdefs.ErrNotImplemented
}
func (f *fakeSandboxService) SandboxStatus(ctx context.Context, sandboxer string, sandboxID string, verbose bool) (sandbox.ControllerStatus, error) {
return sandbox.ControllerStatus{}, errdefs.ErrNotImplemented
}
func (f *fakeSandboxService) SandboxPlatform(ctx context.Context, sandboxer string, sandboxID string) (platforms.Platform, error) {
return platforms.DefaultSpec(), nil
}
func (f *fakeSandboxService) SandboxController(sandboxer string) (sandbox.Controller, error) {
return &fakeSandboxController{}, nil
}

View File

@ -37,6 +37,8 @@ type Sandbox struct {
Status StatusStorage
// Container is the containerd sandbox container client.
Container containerd.Container
// Sandboxer is the sandbox controller name of the sandbox
Sandboxer string
// CNI network namespace client.
// For hostnetwork pod, this is always nil;
// For non hostnetwork pod, this should never be nil.

View File

@ -24,6 +24,8 @@ import (
"github.com/containerd/log"
"github.com/containerd/plugin"
"github.com/containerd/plugin/registry"
"google.golang.org/grpc"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
containerd "github.com/containerd/containerd/v2/client"
"github.com/containerd/containerd/v2/core/sandbox"
@ -37,10 +39,6 @@ import (
"github.com/containerd/containerd/v2/plugins/services/warning"
"github.com/containerd/containerd/v2/version"
"github.com/containerd/platforms"
"google.golang.org/grpc"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)
// Register CRI service plugin
@ -127,10 +125,9 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) {
return nil, fmt.Errorf("failed to create containerd client: %w", err)
}
// TODO(dmcgowan): Get the full list directly from configured plugins
sbControllers := map[string]sandbox.Controller{
string(criconfig.ModePodSandbox): client.SandboxController(string(criconfig.ModePodSandbox)),
string(criconfig.ModeShim): client.SandboxController(string(criconfig.ModeShim)),
sbControllers, err := getSandboxControllers(ic)
if err != nil {
return nil, fmt.Errorf("failed to get sandbox controllers from plugins %v", err)
}
streamingConfig, err := config.StreamingConfig()
@ -240,3 +237,15 @@ func getNRIAPI(ic *plugin.InitContext) *nri.API {
return nri.NewAPI(api)
}
func getSandboxControllers(ic *plugin.InitContext) (map[string]sandbox.Controller, error) {
sandboxers, err := ic.GetByType(plugins.SandboxControllerPlugin)
if err != nil {
return nil, err
}
sc := make(map[string]sandbox.Controller)
for name, p := range sandboxers {
sc[name] = p.(sandbox.Controller)
}
return sc, nil
}