sandbox: support more sandbox controllers

make containerd extensible to support more sandbox controllers
registered into containerd by config.
we change the default sandbox controller plugin's name from "local" to "shim".
to make sure we can get the controller by the plugin name it registered into
containerd.

Signed-off-by: Abel Feng <fshb1988@gmail.com>
This commit is contained in:
Abel Feng 2023-06-05 21:01:43 +08:00 committed by f00589305
parent 8b35976850
commit 2951fb6dc6
13 changed files with 191 additions and 78 deletions

View File

@ -719,9 +719,13 @@ func (c *Client) SandboxStore() sandbox.Store {
}
// SandboxController returns the underlying sandbox controller client
func (c *Client) SandboxController() sandbox.Controller {
if c.sandboxController != nil {
return c.sandboxController
func (c *Client) SandboxController(name string) sandbox.Controller {
// default sandboxer is shim
if len(name) == 0 {
name = "shim"
}
if c.sandboxers != nil {
return c.sandboxers[name]
}
c.connMu.Lock()
defer c.connMu.Unlock()

View File

@ -274,10 +274,7 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
containerd.WithContainerExtension(containerMetadataExtension, &meta),
)
// When using sandboxed shims, containerd's runtime needs to know which sandbox shim instance to use.
if ociRuntime.Sandboxer == string(criconfig.ModeShim) {
opts = append(opts, containerd.WithSandbox(sandboxID))
}
opts = append(opts, containerd.WithSandbox(sandboxID))
opts = append(opts, c.nri.WithContainerAdjustment())
defer func() {

View File

@ -34,10 +34,26 @@ import (
ctrdutil "github.com/containerd/containerd/pkg/cri/util"
osinterface "github.com/containerd/containerd/pkg/os"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/plugin/registry"
"github.com/containerd/containerd/plugins"
"github.com/containerd/containerd/protobuf"
"github.com/containerd/containerd/sandbox"
)
func init() {
registry.Register(&plugin.Registration{
Type: plugins.SandboxControllerPlugin,
ID: "podsandbox",
Requires: []plugin.Type{},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
// register the global controller to containerd plugin manager,
// the global controller will be initialized when cri plugin is initializing
return controller, nil
},
})
}
// CRIService interface contains things required by controller, but not yet refactored from criService.
// TODO: this will be removed in subsequent iterations.
type CRIService interface {
@ -53,6 +69,11 @@ type ImageService interface {
GetImage(id string) (imagestore.Image, error)
}
// As the dependency from this controller to cri plugin is hard to decouple,
// we define a global podsandbox controller and register it to containerd plugin manager first,
// we will initialize this controller when we initialize the cri plugin.
var controller = &Controller{}
type Controller struct {
// config contains all configurations.
config criconfig.Config
@ -72,7 +93,7 @@ type Controller struct {
store *Store
}
func New(
func Init(
config criconfig.Config,
client *containerd.Client,
sandboxStore *sandboxstore.Store,
@ -80,17 +101,15 @@ func New(
cri CRIService,
imageService ImageService,
baseOCISpecs map[string]*oci.Spec,
) *Controller {
return &Controller{
config: config,
client: client,
imageService: imageService,
sandboxStore: sandboxStore,
os: os,
cri: cri,
baseOCISpecs: baseOCISpecs,
store: NewStore(),
}
) {
controller.cri = cri
controller.client = client
controller.config = config
controller.sandboxStore = sandboxStore
controller.os = os
controller.baseOCISpecs = baseOCISpecs
controller.store = NewStore()
controller.imageService = imageService
}
var _ sandbox.Controller = (*Controller)(nil)

View File

@ -60,10 +60,7 @@ func (c *criService) recover(ctx context.Context) error {
return fmt.Errorf("failed to list sandbox containers: %w", err)
}
podSandboxController, ok := c.sandboxControllers[criconfig.ModePodSandbox]
if !ok {
log.G(ctx).Fatal("unable to restore pod sandboxes, no controller found")
}
podSandboxController := c.client.SandboxController(string(criconfig.ModePodSandbox))
podSandboxLoader, ok := podSandboxController.(podSandboxRecover)
if !ok {
@ -115,7 +112,7 @@ func (c *criService) recover(ctx context.Context) error {
var (
state = sandboxstore.StateUnknown
controller = c.sandboxControllers[criconfig.ModeShim]
controller = c.client.SandboxController(sbx.Sandboxer)
)
status, err := controller.Status(ctx, sbx.ID, false)

View File

@ -92,6 +92,7 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
}
sandboxInfo.Runtime.Name = ociRuntime.Type
sandboxInfo.Sandboxer = ociRuntime.Sandboxer
runtimeStart := time.Now()
// Retrieve runtime options
@ -683,25 +684,6 @@ func (c *criService) getSandboxRuntime(config *runtime.PodSandboxConfig, runtime
return handler, nil
}
// getSandboxController returns the sandbox controller configuration for sandbox.
// If absent in legacy case, it will return the default controller.
func (c *criService) getSandboxController(config *runtime.PodSandboxConfig, runtimeHandler string) (sb.Controller, error) {
ociRuntime, err := c.getSandboxRuntime(config, runtimeHandler)
if err != nil {
return nil, fmt.Errorf("failed to get sandbox runtime: %w", err)
}
// Validate mode
if err = ValidateMode(ociRuntime.Sandboxer); err != nil {
return nil, err
}
// Use sandbox controller to delete sandbox
controller, exist := c.sandboxControllers[criconfig.SandboxControllerMode(ociRuntime.Sandboxer)]
if !exist {
return nil, fmt.Errorf("sandbox controller %s not exist", ociRuntime.Sandboxer)
}
return controller, nil
}
func logDebugCNIResult(ctx context.Context, sandboxID string, result *cni.Result) {
if log.GetLevel() < log.DebugLevel {
return

View File

@ -98,9 +98,6 @@ type criService struct {
sandboxNameIndex *registrar.Registrar
// containerStore stores all resources associated with containers.
containerStore *containerstore.Store
// sandboxControllers contains different sandbox controller type,
// every controller controls sandbox lifecycle (and hides implementation details behind).
sandboxControllers map[criconfig.SandboxControllerMode]sandbox.Controller
// containerNameIndex stores all container names and make sure each
// name is unique.
containerNameIndex *registrar.Registrar
@ -159,7 +156,6 @@ func NewCRIService(config criconfig.Config, client *containerd.Client, nri *nri.
sandboxNameIndex: registrar.NewRegistrar(),
containerNameIndex: registrar.NewRegistrar(),
netPlugin: make(map[string]cni.CNI),
sandboxControllers: make(map[criconfig.SandboxControllerMode]sandbox.Controller),
}
// TODO: figure out a proper channel size.
@ -200,9 +196,8 @@ func NewCRIService(config criconfig.Config, client *containerd.Client, nri *nri.
return nil, err
}
// Load all sandbox controllers(pod sandbox controller and remote shim controller)
c.sandboxControllers[criconfig.ModePodSandbox] = podsandbox.New(config, client, c.sandboxStore, c.os, c, imageService, c.baseOCISpecs)
c.sandboxControllers[criconfig.ModeShim] = client.SandboxController()
// init the global podsandbox controller
podsandbox.Init(config, client, c.sandboxStore, c.os, c, c.imageService, c.baseOCISpecs)
c.nri = nri
@ -347,6 +342,17 @@ func (c *criService) register(s *grpc.Server) error {
return nil
}
// getSandboxController returns the sandbox controller configuration for sandbox.
// If absent in legacy case, it will return the default controller.
func (c *criService) getSandboxController(config *runtime.PodSandboxConfig, runtimeHandler string) (sandbox.Controller, error) {
ociRuntime, err := c.getSandboxRuntime(config, runtimeHandler)
if err != nil {
return nil, fmt.Errorf("failed to get sandbox runtime: %w", err)
}
return c.client.SandboxController(ociRuntime.Sandboxer), nil
}
// imageFSPath returns containerd image filesystem path.
// Note that if containerd changes directory layout, we also needs to change this.
func imageFSPath(rootDir, snapshotter string) string {

View File

@ -42,7 +42,7 @@ import (
func init() {
registry.Register(&plugin.Registration{
Type: plugins.SandboxControllerPlugin,
ID: "local",
ID: "shim",
Requires: []plugin.Type{
plugins.RuntimePluginV2,
plugins.EventPlugin,

View File

@ -81,7 +81,7 @@ func (s *sandboxClient) Labels(ctx context.Context) (map[string]string, error) {
}
func (s *sandboxClient) Start(ctx context.Context) error {
resp, err := s.client.SandboxController().Start(ctx, s.ID())
resp, err := s.client.SandboxController(s.metadata.Sandboxer).Start(ctx, s.ID())
if err != nil {
return err
}
@ -95,7 +95,7 @@ func (s *sandboxClient) Wait(ctx context.Context) (<-chan ExitStatus, error) {
go func() {
defer close(c)
exitStatus, err := s.client.SandboxController().Wait(ctx, s.ID())
exitStatus, err := s.client.SandboxController(s.metadata.Sandboxer).Wait(ctx, s.ID())
if err != nil {
c <- ExitStatus{
code: UnknownExitStatus,
@ -114,11 +114,11 @@ func (s *sandboxClient) Wait(ctx context.Context) (<-chan ExitStatus, error) {
}
func (s *sandboxClient) Stop(ctx context.Context) error {
return s.client.SandboxController().Stop(ctx, s.ID())
return s.client.SandboxController(s.metadata.Sandboxer).Stop(ctx, s.ID())
}
func (s *sandboxClient) Shutdown(ctx context.Context) error {
if err := s.client.SandboxController().Shutdown(ctx, s.ID()); err != nil {
if err := s.client.SandboxController(s.metadata.Sandboxer).Shutdown(ctx, s.ID()); err != nil {
return fmt.Errorf("failed to shutdown sandbox: %w", err)
}
@ -166,7 +166,7 @@ func (c *Client) LoadSandbox(ctx context.Context, id string) (Sandbox, error) {
return nil, err
}
status, err := c.SandboxController().Status(ctx, id, false)
status, err := c.SandboxController(sandbox.Sandboxer).Status(ctx, id, false)
if err != nil {
return nil, fmt.Errorf("failed to load sandbox %s, status request failed: %w", id, err)
}

View File

@ -50,7 +50,7 @@ type services struct {
leasesService leases.Manager
introspectionService introspection.Service
sandboxStore sandbox.Store
sandboxController sandbox.Controller
sandboxers map[string]sandbox.Controller
}
// ServicesOpt allows callers to set options on the services
@ -87,6 +87,16 @@ func WithSnapshotters(snapshotters map[string]snapshots.Snapshotter) ServicesOpt
}
}
// WithSandboxers sets the sandbox controllers.
func WithSandboxers(sandboxers map[string]sandbox.Controller) ServicesOpt {
return func(s *services) {
s.sandboxers = make(map[string]sandbox.Controller)
for n, sn := range sandboxers {
s.sandboxers[n] = sn
}
}
}
// WithContainerClient sets the container service to use using a containers client.
func WithContainerClient(containerService containersapi.ContainersClient) ServicesOpt {
return func(s *services) {
@ -171,13 +181,6 @@ func WithSandboxStore(client sandbox.Store) ServicesOpt {
}
}
// WithSandboxController sets the sandbox controller.
func WithSandboxController(client sandbox.Controller) ServicesOpt {
return func(s *services) {
s.sandboxController = client
}
}
// WithInMemoryServices is suitable for cases when there is need to use containerd's client from
// another (in-memory) containerd plugin (such as CRI).
func WithInMemoryServices(ic *plugin.InitContext) ClientOpt {
@ -193,9 +196,6 @@ func WithInMemoryServices(ic *plugin.InitContext) ClientOpt {
plugins.SandboxStorePlugin: func(i interface{}) ServicesOpt {
return WithSandboxStore(i.(sandbox.Store))
},
plugins.SandboxControllerPlugin: func(i interface{}) ServicesOpt {
return WithSandboxController(i.(sandbox.Controller))
},
} {
i, err := ic.Get(t)
if err != nil {
@ -218,6 +218,9 @@ func WithInMemoryServices(ic *plugin.InitContext) ClientOpt {
srv.SnapshotsService: func(s interface{}) ServicesOpt {
return WithSnapshotters(s.(map[string]snapshots.Snapshotter))
},
srv.SandboxControllersService: func(s interface{}) ServicesOpt {
return WithSandboxers(s.(map[string]sandbox.Controller))
},
srv.ContainersService: func(s interface{}) ServicesOpt {
return WithContainerClient(s.(containersapi.ContainersClient))
},

View File

@ -18,6 +18,9 @@ package sandbox
import (
"context"
"errors"
"fmt"
"time"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/anypb"
@ -31,6 +34,7 @@ import (
"github.com/containerd/containerd/plugins"
"github.com/containerd/containerd/protobuf"
"github.com/containerd/containerd/sandbox"
"github.com/containerd/containerd/services"
"github.com/containerd/log"
)
@ -39,22 +43,30 @@ func init() {
Type: plugins.GRPCPlugin,
ID: "sandbox-controllers",
Requires: []plugin.Type{
plugins.SandboxControllerPlugin,
plugins.ServicePlugin,
plugins.EventPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
sc, err := ic.GetByID(plugins.SandboxControllerPlugin, "local")
plugs, err := ic.GetByType(plugins.ServicePlugin)
if err != nil {
return nil, err
}
p, ok := plugs[services.SandboxControllersService]
if !ok {
return nil, errors.New("sandboxes service not found")
}
i, err := p.Instance()
if err != nil {
return nil, err
}
sc := i.(map[string]sandbox.Controller)
ep, err := ic.Get(plugins.EventPlugin)
if err != nil {
return nil, err
}
return &controllerService{
local: sc.(sandbox.Controller),
sc: sc,
publisher: ep.(events.Publisher),
}, nil
},
@ -62,7 +74,7 @@ func init() {
}
type controllerService struct {
local sandbox.Controller
sc map[string]sandbox.Controller
publisher events.Publisher
api.UnimplementedControllerServer
}
@ -74,10 +86,24 @@ func (s *controllerService) Register(server *grpc.Server) error {
return nil
}
func (s *controllerService) getController(name string) (sandbox.Controller, error) {
if len(name) == 0 {
return nil, fmt.Errorf("%w: sandbox controller name can not be empty", errdefs.ErrInvalidArgument)
}
if ctrl, ok := s.sc[name]; ok {
return ctrl, nil
}
return nil, fmt.Errorf("%w: failed to get sandbox controller by %s", errdefs.ErrNotFound, name)
}
func (s *controllerService) Create(ctx context.Context, req *api.ControllerCreateRequest) (*api.ControllerCreateResponse, error) {
log.G(ctx).WithField("req", req).Debug("create sandbox")
// TODO: Rootfs
err := s.local.Create(ctx, req.GetSandboxID(), sandbox.WithOptions(req.GetOptions()))
ctrl, err := s.getController(req.Sandboxer)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
err = ctrl.Create(ctx, req.GetSandboxID(), sandbox.WithOptions(req.GetOptions()))
if err != nil {
return &api.ControllerCreateResponse{}, errdefs.ToGRPC(err)
}
@ -95,7 +121,11 @@ func (s *controllerService) Create(ctx context.Context, req *api.ControllerCreat
func (s *controllerService) Start(ctx context.Context, req *api.ControllerStartRequest) (*api.ControllerStartResponse, error) {
log.G(ctx).WithField("req", req).Debug("start sandbox")
inst, err := s.local.Start(ctx, req.GetSandboxID())
ctrl, err := s.getController(req.Sandboxer)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
inst, err := ctrl.Start(ctx, req.GetSandboxID())
if err != nil {
return &api.ControllerStartResponse{}, errdefs.ToGRPC(err)
}
@ -116,12 +146,20 @@ func (s *controllerService) Start(ctx context.Context, req *api.ControllerStartR
func (s *controllerService) Stop(ctx context.Context, req *api.ControllerStopRequest) (*api.ControllerStopResponse, error) {
log.G(ctx).WithField("req", req).Debug("delete sandbox")
return &api.ControllerStopResponse{}, errdefs.ToGRPC(s.local.Stop(ctx, req.GetSandboxID()))
ctrl, err := s.getController(req.Sandboxer)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return &api.ControllerStopResponse{}, errdefs.ToGRPC(ctrl.Stop(ctx, req.GetSandboxID(), sandbox.WithTimeout(time.Duration(req.TimeoutSecs)*time.Second)))
}
func (s *controllerService) Wait(ctx context.Context, req *api.ControllerWaitRequest) (*api.ControllerWaitResponse, error) {
log.G(ctx).WithField("req", req).Debug("wait sandbox")
exitStatus, err := s.local.Wait(ctx, req.GetSandboxID())
ctrl, err := s.getController(req.Sandboxer)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
exitStatus, err := ctrl.Wait(ctx, req.GetSandboxID())
if err != nil {
return &api.ControllerWaitResponse{}, errdefs.ToGRPC(err)
}
@ -142,7 +180,11 @@ func (s *controllerService) Wait(ctx context.Context, req *api.ControllerWaitReq
func (s *controllerService) Status(ctx context.Context, req *api.ControllerStatusRequest) (*api.ControllerStatusResponse, error) {
log.G(ctx).WithField("req", req).Debug("sandbox status")
cstatus, err := s.local.Status(ctx, req.GetSandboxID(), req.GetVerbose())
ctrl, err := s.getController(req.Sandboxer)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
cstatus, err := ctrl.Status(ctx, req.GetSandboxID(), req.GetVerbose())
if err != nil {
return &api.ControllerStatusResponse{}, errdefs.ToGRPC(err)
}
@ -166,7 +208,11 @@ func (s *controllerService) Status(ctx context.Context, req *api.ControllerStatu
func (s *controllerService) Shutdown(ctx context.Context, req *api.ControllerShutdownRequest) (*api.ControllerShutdownResponse, error) {
log.G(ctx).WithField("req", req).Debug("shutdown sandbox")
return &api.ControllerShutdownResponse{}, errdefs.ToGRPC(s.local.Shutdown(ctx, req.GetSandboxID()))
ctrl, err := s.getController(req.Sandboxer)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return &api.ControllerShutdownResponse{}, errdefs.ToGRPC(ctrl.Shutdown(ctx, req.GetSandboxID()))
}
func (s *controllerService) Metrics(ctx context.Context, req *api.ControllerMetricsRequest) (*api.ControllerMetricsResponse, error) {

View File

@ -0,0 +1,50 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sandbox
import (
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/plugin/registry"
"github.com/containerd/containerd/plugins"
"github.com/containerd/containerd/sandbox"
"github.com/containerd/containerd/services"
)
func init() {
registry.Register(&plugin.Registration{
Type: plugins.ServicePlugin,
ID: services.SandboxControllersService,
Requires: []plugin.Type{
plugins.SandboxControllerPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
sandboxesRaw, err := ic.GetByType(plugins.SandboxControllerPlugin)
if err != nil {
return nil, err
}
sandboxers := make(map[string]sandbox.Controller)
for name, srv := range sandboxesRaw {
inst, err := srv.Instance()
if err != nil {
return nil, err
}
sandboxers[name] = inst.(sandbox.Controller)
}
return sandboxers, nil
},
})
}

View File

@ -36,6 +36,7 @@ import (
csapi "github.com/containerd/containerd/api/services/content/v1"
diffapi "github.com/containerd/containerd/api/services/diff/v1"
sbapi "github.com/containerd/containerd/api/services/sandbox/v1"
ssapi "github.com/containerd/containerd/api/services/snapshots/v1"
"github.com/containerd/containerd/content/local"
csproxy "github.com/containerd/containerd/content/proxy"
@ -49,6 +50,7 @@ import (
"github.com/containerd/containerd/plugin/dynamic"
"github.com/containerd/containerd/plugin/registry"
"github.com/containerd/containerd/plugins"
sbproxy "github.com/containerd/containerd/sandbox/proxy"
srvconfig "github.com/containerd/containerd/services/server/config"
ssproxy "github.com/containerd/containerd/snapshots/proxy"
"github.com/containerd/containerd/sys"
@ -474,6 +476,11 @@ func LoadPlugins(ctx context.Context, config *srvconfig.Config) ([]plugin.Regist
f = func(conn *grpc.ClientConn) interface{} {
return csproxy.NewContentStore(csapi.NewContentClient(conn))
}
case string(plugins.SandboxControllerPlugin), "sandbox":
t = plugins.SandboxControllerPlugin
f = func(conn *grpc.ClientConn) interface{} {
return sbproxy.NewSandboxController(sbapi.NewControllerClient(conn))
}
case string(plugins.DiffPlugin), "diff":
t = plugins.DiffPlugin
f = func(conn *grpc.ClientConn) interface{} {

View File

@ -21,6 +21,8 @@ const (
ContentService = "content-service"
// SnapshotsService is id of snapshots service.
SnapshotsService = "snapshots-service"
// SandboxControllersService is id of snapshots service.
SandboxControllersService = "sandboxes-service"
// ImagesService is id of images service.
ImagesService = "images-service"
// ContainersService is id of containers service.