From 9795677fe92368d5fc6b0967acc89571cf7bddc0 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Tue, 23 Jan 2024 21:53:37 -0800 Subject: [PATCH 1/5] Move cri base plugin to CRI runtime service Create new plugin type for CRI runtime and image services. Signed-off-by: Derek McGowan --- cmd/containerd/builtins/cri.go | 1 + contrib/fuzz/builtins.go | 1 + contrib/fuzz/cri_server_fuzzer.go | 20 +++++-- .../build_local_containerd_helper_test.go | 1 + pkg/cri/config/config.go | 8 ++- pkg/cri/config/config_unix.go | 1 - pkg/cri/config/config_windows.go | 1 - pkg/cri/cri.go | 49 ++++++++++++----- pkg/cri/server/container_create.go | 6 +-- pkg/cri/server/container_create_linux_test.go | 25 ++++----- pkg/cri/server/container_create_test.go | 19 ++++--- pkg/cri/server/podsandbox/controller.go | 36 +++++++------ pkg/cri/server/podsandbox/helpers.go | 6 +-- pkg/cri/server/service.go | 23 +++++--- pkg/cri/server/service_test.go | 42 +++++++++++++-- .../{server/base => types}/sandbox_info.go | 0 plugins/cri/images/plugin.go | 11 ++-- .../cri/runtime/load_test.go | 2 +- .../cri/runtime/plugin.go | 53 ++++++++++++------- plugins/types.go | 4 +- 20 files changed, 205 insertions(+), 104 deletions(-) rename pkg/cri/{server/base => types}/sandbox_info.go (100%) rename pkg/cri/server/base/cri_base_test.go => plugins/cri/runtime/load_test.go (98%) rename pkg/cri/server/base/cri_base.go => plugins/cri/runtime/plugin.go (85%) diff --git a/cmd/containerd/builtins/cri.go b/cmd/containerd/builtins/cri.go index 3673889d3..5c88cb9e4 100644 --- a/cmd/containerd/builtins/cri.go +++ b/cmd/containerd/builtins/cri.go @@ -21,4 +21,5 @@ package builtins import ( _ "github.com/containerd/containerd/v2/pkg/cri" _ "github.com/containerd/containerd/v2/plugins/cri/images" + _ "github.com/containerd/containerd/v2/plugins/cri/runtime" ) diff --git a/contrib/fuzz/builtins.go b/contrib/fuzz/builtins.go index 2b7c7fe12..6c4888748 100644 --- a/contrib/fuzz/builtins.go +++ b/contrib/fuzz/builtins.go @@ -23,6 +23,7 @@ import ( _ "github.com/containerd/containerd/v2/pkg/events/plugin" _ "github.com/containerd/containerd/v2/pkg/nri/plugin" _ "github.com/containerd/containerd/v2/plugins/cri/images" + _ "github.com/containerd/containerd/v2/plugins/cri/runtime" _ "github.com/containerd/containerd/v2/plugins/diff/walking/plugin" _ "github.com/containerd/containerd/v2/plugins/gc" _ "github.com/containerd/containerd/v2/plugins/imageverifier" diff --git a/contrib/fuzz/cri_server_fuzzer.go b/contrib/fuzz/cri_server_fuzzer.go index adb9a388b..08cf4a645 100644 --- a/contrib/fuzz/cri_server_fuzzer.go +++ b/contrib/fuzz/cri_server_fuzzer.go @@ -29,6 +29,7 @@ import ( "github.com/containerd/containerd/v2/pkg/cri/server" "github.com/containerd/containerd/v2/pkg/cri/server/images" "github.com/containerd/containerd/v2/pkg/oci" + "github.com/containerd/errdefs" ) func FuzzCRIServer(data []byte) int { @@ -42,7 +43,6 @@ func FuzzCRIServer(data []byte) int { } defer client.Close() - config := criconfig.Config{} imageConfig := criconfig.ImageConfig{} imageService, err := images.NewService(imageConfig, &images.CRIImageServiceOptions{ @@ -52,10 +52,10 @@ func FuzzCRIServer(data []byte) int { panic(err) } - c, rs, err := server.NewCRIService(config, &server.CRIServiceOptions{ - ImageService: imageService, - Client: client, - BaseOCISpecs: map[string]*oci.Spec{}, + c, rs, err := server.NewCRIService(&server.CRIServiceOptions{ + RuntimeService: &fakeRuntimeService{}, + ImageService: imageService, + Client: client, }) if err != nil { panic(err) @@ -68,6 +68,16 @@ func FuzzCRIServer(data []byte) int { }) } +type fakeRuntimeService struct{} + +func (fakeRuntimeService) Config() criconfig.Config { + return criconfig.Config{} +} + +func (fakeRuntimeService) LoadOCISpec(string) (*oci.Spec, error) { + return nil, errdefs.ErrNotFound +} + type service struct { server.CRIService runtime.RuntimeServiceServer diff --git a/integration/build_local_containerd_helper_test.go b/integration/build_local_containerd_helper_test.go index f9577485e..0a4471df6 100644 --- a/integration/build_local_containerd_helper_test.go +++ b/integration/build_local_containerd_helper_test.go @@ -38,6 +38,7 @@ import ( _ "github.com/containerd/containerd/v2/core/runtime/v2/runc/options" _ "github.com/containerd/containerd/v2/pkg/events/plugin" _ "github.com/containerd/containerd/v2/plugins/cri/images" + _ "github.com/containerd/containerd/v2/plugins/cri/runtime" _ "github.com/containerd/containerd/v2/plugins/diff/walking/plugin" _ "github.com/containerd/containerd/v2/plugins/gc" _ "github.com/containerd/containerd/v2/plugins/leases" diff --git a/pkg/cri/config/config.go b/pkg/cri/config/config.go index 1ae4f1341..15617b1f0 100644 --- a/pkg/cri/config/config.go +++ b/pkg/cri/config/config.go @@ -319,8 +319,6 @@ type PluginConfig struct { ContainerdConfig `toml:"containerd" json:"containerd"` // CniConfig contains config related to cni CniConfig `toml:"cni" json:"cni"` - // DisableTCPService disables serving CRI on the TCP server. - DisableTCPService bool `toml:"disable_tcp_service" json:"disableTCPService"` // StreamServerAddress is the ip address streaming server is listening on. StreamServerAddress string `toml:"stream_server_address" json:"streamServerAddress"` // StreamServerPort is the port streaming server is listening on. @@ -433,6 +431,12 @@ type Config struct { StateDir string `json:"stateDir"` } +// ServiceConfig contains all the configuration for the CRI API server. +type ServiceConfig struct { + // DisableTCPService disables serving CRI on the TCP server. + DisableTCPService bool `toml:"disable_tcp_service" json:"disableTCPService"` +} + const ( // RuntimeUntrusted is the implicit runtime defined for ContainerdConfig.UntrustedWorkloadRuntime RuntimeUntrusted = "untrusted" diff --git a/pkg/cri/config/config_unix.go b/pkg/cri/config/config_unix.go index 7a0405566..c04651ecc 100644 --- a/pkg/cri/config/config_unix.go +++ b/pkg/cri/config/config_unix.go @@ -89,7 +89,6 @@ func DefaultConfig() PluginConfig { }, }, }, - DisableTCPService: true, StreamServerAddress: "127.0.0.1", StreamServerPort: "0", StreamIdleTimeout: streaming.DefaultConfig.StreamIdleTimeout.String(), // 4 hour diff --git a/pkg/cri/config/config_windows.go b/pkg/cri/config/config_windows.go index 9c2eeac15..db499a074 100644 --- a/pkg/cri/config/config_windows.go +++ b/pkg/cri/config/config_windows.go @@ -78,7 +78,6 @@ func DefaultConfig() PluginConfig { }, }, }, - DisableTCPService: true, StreamServerAddress: "127.0.0.1", StreamServerPort: "0", StreamIdleTimeout: streaming.DefaultConfig.StreamIdleTimeout.String(), // 4 hour diff --git a/pkg/cri/cri.go b/pkg/cri/cri.go index 3488b9915..a9071e40a 100644 --- a/pkg/cri/cri.go +++ b/pkg/cri/cri.go @@ -17,6 +17,7 @@ package cri import ( + "context" "fmt" "io" @@ -25,13 +26,13 @@ import ( "github.com/containerd/plugin/registry" containerd "github.com/containerd/containerd/v2/client" + srvconfig "github.com/containerd/containerd/v2/cmd/containerd/server/config" "github.com/containerd/containerd/v2/core/sandbox" criconfig "github.com/containerd/containerd/v2/pkg/cri/config" "github.com/containerd/containerd/v2/pkg/cri/constants" "github.com/containerd/containerd/v2/pkg/cri/instrument" "github.com/containerd/containerd/v2/pkg/cri/nri" "github.com/containerd/containerd/v2/pkg/cri/server" - "github.com/containerd/containerd/v2/pkg/cri/server/base" nriservice "github.com/containerd/containerd/v2/pkg/nri" "github.com/containerd/containerd/v2/plugins" "github.com/containerd/platforms" @@ -43,13 +44,11 @@ import ( // Register CRI service plugin func init() { - registry.Register(&plugin.Registration{ Type: plugins.GRPCPlugin, ID: "cri", Requires: []plugin.Type{ - plugins.CRIImagePlugin, - plugins.InternalPlugin, + plugins.CRIServicePlugin, plugins.SandboxControllerPlugin, plugins.NRIApiPlugin, plugins.EventPlugin, @@ -58,23 +57,46 @@ func init() { plugins.SandboxStorePlugin, plugins.TransferPlugin, }, + Config: &criconfig.ServiceConfig{ + DisableTCPService: true, + }, + ConfigMigration: func(ctx context.Context, version int, pluginConfigs map[string]interface{}) error { + if version >= srvconfig.CurrentConfigVersion { + return nil + } + const pluginName = string(plugins.GRPCPlugin) + ".cri" + original, ok := pluginConfigs[pluginName] + if !ok { + return nil + } + src := original.(map[string]interface{}) + + // Currently only a single key migrated + if val, ok := src["disable_tcp_service"]; ok { + pluginConfigs[pluginName] = map[string]interface{}{ + "disable_tcp_service": val, + } + } else { + delete(pluginConfigs, pluginName) + } + return nil + }, InitFn: initCRIService, }) } func initCRIService(ic *plugin.InitContext) (interface{}, error) { ctx := ic.Context + config := ic.Config.(*criconfig.ServiceConfig) - // Get base CRI dependencies. - criBasePlugin, err := ic.GetByID(plugins.InternalPlugin, "cri") + // Get runtime service. + criRuntimePlugin, err := ic.GetByID(plugins.CRIServicePlugin, "runtime") if err != nil { - return nil, fmt.Errorf("unable to load CRI service base dependencies: %w", err) + return nil, fmt.Errorf("unable to load CRI runtime service plugin dependency: %w", err) } - criBase := criBasePlugin.(*base.CRIBase) - c := criBase.Config // Get image service. - criImagePlugin, err := ic.GetSingle(plugins.CRIImagePlugin) + criImagePlugin, err := ic.GetByID(plugins.CRIServicePlugin, "images") if err != nil { return nil, fmt.Errorf("unable to load CRI image service plugin dependency: %w", err) } @@ -98,15 +120,16 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) { } options := &server.CRIServiceOptions{ + RuntimeService: criRuntimePlugin.(server.RuntimeService), ImageService: criImagePlugin.(server.ImageService), NRI: getNRIAPI(ic), Client: client, SandboxControllers: sbControllers, - BaseOCISpecs: criBase.BaseOCISpecs, } is := criImagePlugin.(imageService).GRPCService() - s, rs, err := server.NewCRIService(criBase.Config, options) + // TODO: More options specifically for grpc service? + s, rs, err := server.NewCRIService(options) if err != nil { return nil, fmt.Errorf("failed to create CRI service: %w", err) } @@ -127,7 +150,7 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) { initializer: s, } - if c.DisableTCPService { + if config.DisableTCPService { return service, nil } diff --git a/pkg/cri/server/container_create.go b/pkg/cri/server/container_create.go index 272096aaf..b922bc976 100644 --- a/pkg/cri/server/container_create.go +++ b/pkg/cri/server/container_create.go @@ -394,9 +394,9 @@ func (c *criService) runtimeSpec(id string, platform platforms.Platform, baseSpe container := &containers.Container{ID: id} if baseSpecFile != "" { - baseSpec, ok := c.baseOCISpecs[baseSpecFile] - if !ok { - return nil, fmt.Errorf("can't find base OCI spec %q", baseSpecFile) + baseSpec, err := c.LoadOCISpec(baseSpecFile) + if err != nil { + return nil, fmt.Errorf("can't load base OCI spec %q: %w", baseSpecFile, err) } spec := oci.Spec{} diff --git a/pkg/cri/server/container_create_linux_test.go b/pkg/cri/server/container_create_linux_test.go index f106d327c..e46b00df1 100644 --- a/pkg/cri/server/container_create_linux_test.go +++ b/pkg/cri/server/container_create_linux_test.go @@ -1680,23 +1680,24 @@ func TestPrivilegedDevices(t *testing.T) { } func TestBaseOCISpec(t *testing.T) { - c := newTestCRIService() baseLimit := int64(100) - c.baseOCISpecs = map[string]*oci.Spec{ - "/etc/containerd/cri-base.json": { - Process: &runtimespec.Process{ - User: runtimespec.User{AdditionalGids: []uint32{9999}}, - Capabilities: &runtimespec.LinuxCapabilities{ - Permitted: []string{"CAP_SETUID"}, + c := newTestCRIService(withRuntimeService(&fakeRuntimeService{ + ocispecs: map[string]*oci.Spec{ + "/etc/containerd/cri-base.json": { + Process: &runtimespec.Process{ + User: runtimespec.User{AdditionalGids: []uint32{9999}}, + Capabilities: &runtimespec.LinuxCapabilities{ + Permitted: []string{"CAP_SETUID"}, + }, }, - }, - Linux: &runtimespec.Linux{ - Resources: &runtimespec.LinuxResources{ - Memory: &runtimespec.LinuxMemory{Limit: &baseLimit}, // Will be overwritten by `getCreateContainerTestData` + Linux: &runtimespec.Linux{ + Resources: &runtimespec.LinuxResources{ + Memory: &runtimespec.LinuxMemory{Limit: &baseLimit}, // Will be overwritten by `getCreateContainerTestData` + }, }, }, }, - } + })) ociRuntime := config.Runtime{} ociRuntime.BaseRuntimeSpec = "/etc/containerd/cri-base.json" diff --git a/pkg/cri/server/container_create_test.go b/pkg/cri/server/container_create_test.go index f0f93fcbb..fa5f6bbf7 100644 --- a/pkg/cri/server/container_create_test.go +++ b/pkg/cri/server/container_create_test.go @@ -524,13 +524,14 @@ func TestContainerAnnotationPassthroughContainerSpec(t *testing.T) { } func TestBaseRuntimeSpec(t *testing.T) { - c := newTestCRIService() - c.baseOCISpecs = map[string]*oci.Spec{ - "/etc/containerd/cri-base.json": { - Version: "1.0.2", - Hostname: "old", + c := newTestCRIService(withRuntimeService(&fakeRuntimeService{ + ocispecs: map[string]*oci.Spec{ + "/etc/containerd/cri-base.json": { + Version: "1.0.2", + Hostname: "old", + }, }, - } + })) out, err := c.runtimeSpec( "id1", @@ -546,8 +547,10 @@ func TestBaseRuntimeSpec(t *testing.T) { assert.Equal(t, "new-domain", out.Domainname) // Make sure original base spec not changed - assert.NotEqual(t, out, c.baseOCISpecs["/etc/containerd/cri-base.json"]) - assert.Equal(t, c.baseOCISpecs["/etc/containerd/cri-base.json"].Hostname, "old") + spec, err := c.LoadOCISpec("/etc/containerd/cri-base.json") + assert.NoError(t, err) + assert.NotEqual(t, out, spec) + assert.Equal(t, spec.Hostname, "old") assert.Equal(t, filepath.Join("/", constants.K8sContainerdNamespace, "id1"), out.Linux.CgroupsPath) } diff --git a/pkg/cri/server/podsandbox/controller.go b/pkg/cri/server/podsandbox/controller.go index 6ab044112..604b5168d 100644 --- a/pkg/cri/server/podsandbox/controller.go +++ b/pkg/cri/server/podsandbox/controller.go @@ -31,7 +31,6 @@ import ( "github.com/containerd/containerd/v2/core/sandbox" criconfig "github.com/containerd/containerd/v2/pkg/cri/config" "github.com/containerd/containerd/v2/pkg/cri/constants" - "github.com/containerd/containerd/v2/pkg/cri/server/base" "github.com/containerd/containerd/v2/pkg/cri/server/podsandbox/types" imagestore "github.com/containerd/containerd/v2/pkg/cri/store/image" ctrdutil "github.com/containerd/containerd/v2/pkg/cri/util" @@ -51,8 +50,7 @@ func init() { plugins.EventPlugin, plugins.LeasePlugin, plugins.SandboxStorePlugin, - plugins.InternalPlugin, - plugins.CRIImagePlugin, + plugins.CRIServicePlugin, plugins.ServicePlugin, }, InitFn: func(ic *plugin.InitContext) (interface{}, error) { @@ -66,26 +64,26 @@ func init() { return nil, fmt.Errorf("unable to init client for podsandbox: %w", err) } - // Get base CRI dependencies. - criBasePlugin, err := ic.GetByID(plugins.InternalPlugin, "cri") + // Get runtime service. + criRuntimePlugin, err := ic.GetByID(plugins.CRIServicePlugin, "runtime") if err != nil { - return nil, fmt.Errorf("unable to load CRI service base dependencies: %w", err) + return nil, fmt.Errorf("unable to load CRI runtime service plugin dependency: %w", err) } - criBase := criBasePlugin.(*base.CRIBase) + runtimeService := criRuntimePlugin.(RuntimeService) // Get image service. - criImagePlugin, err := ic.GetSingle(plugins.CRIImagePlugin) + criImagePlugin, err := ic.GetByID(plugins.CRIServicePlugin, "images") if err != nil { return nil, fmt.Errorf("unable to load CRI image service plugin dependency: %w", err) } c := Controller{ - client: client, - config: criBase.Config, - os: osinterface.RealOS{}, - baseOCISpecs: criBase.BaseOCISpecs, - imageService: criImagePlugin.(ImageService), - store: NewStore(), + client: client, + config: runtimeService.Config(), + os: osinterface.RealOS{}, + runtimeService: runtimeService, + imageService: criImagePlugin.(ImageService), + store: NewStore(), } return &c, nil }, @@ -99,6 +97,12 @@ type CRIService interface { BackOffEvent(id string, event interface{}) } +// RuntimeService specifies dependencies to CRI runtime service. +type RuntimeService interface { + Config() criconfig.Config + LoadOCISpec(string) (*oci.Spec, error) +} + // ImageService specifies dependencies to CRI image service. type ImageService interface { LocalResolve(refOrID string) (imagestore.Image, error) @@ -113,14 +117,14 @@ type Controller struct { config criconfig.Config // client is an instance of the containerd client client *containerd.Client + // runtimeService is a dependency to CRI runtime service. + runtimeService RuntimeService // imageService is a dependency to CRI image service. imageService ImageService // os is an interface for all required os operations. os osinterface.OS // cri is CRI service that provides missing gaps needed by controller. cri CRIService - // baseOCISpecs contains cached OCI specs loaded via `Runtime.BaseRuntimeSpec` - baseOCISpecs map[string]*oci.Spec store *Store } diff --git a/pkg/cri/server/podsandbox/helpers.go b/pkg/cri/server/podsandbox/helpers.go index 3fa231181..67b380d3a 100644 --- a/pkg/cri/server/podsandbox/helpers.go +++ b/pkg/cri/server/podsandbox/helpers.go @@ -159,9 +159,9 @@ func (c *Controller) runtimeSpec(id string, baseSpecFile string, opts ...oci.Spe container := &containers.Container{ID: id} if baseSpecFile != "" { - baseSpec, ok := c.baseOCISpecs[baseSpecFile] - if !ok { - return nil, fmt.Errorf("can't find base OCI spec %q", baseSpecFile) + baseSpec, err := c.runtimeService.LoadOCISpec(baseSpecFile) + if err != nil { + return nil, fmt.Errorf("can't load base OCI spec %q: %w", baseSpecFile, err) } spec := oci.Spec{} diff --git a/pkg/cri/server/service.go b/pkg/cri/server/service.go index 708088b92..dc1bbc23a 100644 --- a/pkg/cri/server/service.go +++ b/pkg/cri/server/service.go @@ -65,6 +65,15 @@ type sandboxService interface { SandboxController(config *runtime.PodSandboxConfig, runtimeHandler string) (sandbox.Controller, error) } +// RuntimeService specifies dependencies to runtime service which provides +// the runtime configuration and OCI spec loading. +type RuntimeService interface { + Config() criconfig.Config + + // LoadCISpec loads cached OCI specs via `Runtime.BaseRuntimeSpec` + LoadOCISpec(string) (*oci.Spec, error) +} + // ImageService specifies dependencies to image service. type ImageService interface { RuntimeSnapshotter(ctx context.Context, ociRuntime criconfig.Runtime) string @@ -84,6 +93,7 @@ type ImageService interface { // criService implements CRIService. type criService struct { + RuntimeService ImageService // config contains all configurations. config criconfig.Config @@ -115,8 +125,6 @@ type criService struct { // cniNetConfMonitor is used to reload cni network conf if there is // any valid fs change events from cni network conf dir. cniNetConfMonitor map[string]*cniNetConfSyncer - // baseOCISpecs contains cached OCI specs loaded via `Runtime.BaseRuntimeSpec` - baseOCISpecs map[string]*oci.Spec // allCaps is the list of the capabilities. // When nil, parsed from CapEff of /proc/self/status. allCaps []string //nolint:nolintlint,unused // Ignore on non-Linux @@ -130,6 +138,8 @@ type criService struct { } type CRIServiceOptions struct { + RuntimeService RuntimeService + ImageService ImageService NRI *nri.API @@ -137,9 +147,6 @@ type CRIServiceOptions struct { // SandboxControllers is a map of all the loaded sandbox controllers SandboxControllers map[string]sandbox.Controller - // BaseOCISpecs contains cached OCI specs loaded via `Runtime.BaseRuntimeSpec` - BaseOCISpecs map[string]*oci.Spec - // Client is the base containerd client used for accessing services, // // TODO: Replace this gradually with directly configured instances @@ -147,18 +154,18 @@ type CRIServiceOptions struct { } // NewCRIService returns a new instance of CRIService -// TODO: Add criBase.BaseOCISpecs to options -func NewCRIService(config criconfig.Config, options *CRIServiceOptions) (CRIService, runtime.RuntimeServiceServer, error) { +func NewCRIService(options *CRIServiceOptions) (CRIService, runtime.RuntimeServiceServer, error) { var err error labels := label.NewStore() + config := options.RuntimeService.Config() c := &criService{ + RuntimeService: options.RuntimeService, ImageService: options.ImageService, config: config, client: options.Client, imageFSPaths: options.ImageService.ImageFSPaths(), os: osinterface.RealOS{}, - baseOCISpecs: options.BaseOCISpecs, sandboxStore: sandboxstore.NewStore(labels), containerStore: containerstore.NewStore(labels), sandboxNameIndex: registrar.NewRegistrar(), diff --git a/pkg/cri/server/service_test.go b/pkg/cri/server/service_test.go index 4d8cc047b..f6d2c1d5f 100644 --- a/pkg/cri/server/service_test.go +++ b/pkg/cri/server/service_test.go @@ -25,10 +25,12 @@ import ( "github.com/containerd/containerd/v2/api/types" "github.com/containerd/containerd/v2/core/sandbox" "github.com/containerd/containerd/v2/internal/registrar" + criconfig "github.com/containerd/containerd/v2/pkg/cri/config" containerstore "github.com/containerd/containerd/v2/pkg/cri/store/container" "github.com/containerd/containerd/v2/pkg/cri/store/label" sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox" servertesting "github.com/containerd/containerd/v2/pkg/cri/testing" + "github.com/containerd/containerd/v2/pkg/oci" ostesting "github.com/containerd/containerd/v2/pkg/os/testing" "github.com/containerd/errdefs" "github.com/containerd/platforms" @@ -74,11 +76,34 @@ func (f fakeSandboxController) Metrics(ctx context.Context, sandboxID string) (* return &types.Metric{}, errdefs.ErrNotImplemented } +type fakeRuntimeService struct { + ocispecs map[string]*oci.Spec +} + +func (f fakeRuntimeService) Config() criconfig.Config { + return testConfig +} + +func (f fakeRuntimeService) LoadOCISpec(filename string) (*oci.Spec, error) { + spec, ok := f.ocispecs[filename] + if !ok { + return nil, errdefs.ErrNotFound + } + return spec, nil +} + +type testOpt func(*criService) + +func withRuntimeService(rs RuntimeService) testOpt { + return func(service *criService) { + service.RuntimeService = rs + } +} + // newTestCRIService creates a fake criService for test. -func newTestCRIService() *criService { +func newTestCRIService(opts ...testOpt) *criService { labels := label.NewStore() - return &criService{ - ImageService: &fakeImageService{}, + service := &criService{ config: testConfig, os: ostesting.NewFakeOS(), sandboxStore: sandboxstore.NewStore(labels), @@ -90,4 +115,15 @@ func newTestCRIService() *criService { }, sandboxService: &fakeSandboxService{}, } + for _, opt := range opts { + opt(service) + } + if service.RuntimeService == nil { + service.RuntimeService = &fakeRuntimeService{} + } + if service.ImageService == nil { + service.ImageService = &fakeImageService{} + } + + return service } diff --git a/pkg/cri/server/base/sandbox_info.go b/pkg/cri/types/sandbox_info.go similarity index 100% rename from pkg/cri/server/base/sandbox_info.go rename to pkg/cri/types/sandbox_info.go diff --git a/plugins/cri/images/plugin.go b/plugins/cri/images/plugin.go index c297a28a5..963ef3b50 100644 --- a/plugins/cri/images/plugin.go +++ b/plugins/cri/images/plugin.go @@ -40,15 +40,14 @@ func init() { config := criconfig.DefaultImageConfig() registry.Register(&plugin.Registration{ - Type: plugins.CRIImagePlugin, - ID: "local", + Type: plugins.CRIServicePlugin, + ID: "images", Config: &config, Requires: []plugin.Type{ plugins.LeasePlugin, plugins.EventPlugin, plugins.MetadataPlugin, plugins.SandboxStorePlugin, - plugins.InternalPlugin, // For config migration ordering plugins.ServicePlugin, // For client plugins.SnapshotPlugin, // For root directory properties }, @@ -152,12 +151,12 @@ func configMigration(ctx context.Context, version int, pluginConfigs map[string] if version >= srvconfig.CurrentConfigVersion { return nil } - original, ok := pluginConfigs[string(plugins.InternalPlugin)+".cri"] + original, ok := pluginConfigs[string(plugins.GRPCPlugin)+".cri"] if !ok { return nil } src := original.(map[string]interface{}) - updated, ok := pluginConfigs[string(plugins.CRIImagePlugin)+".local"] + updated, ok := pluginConfigs[string(plugins.CRIServicePlugin)+".images"] var dst map[string]interface{} if ok { dst = updated.(map[string]interface{}) @@ -166,7 +165,7 @@ func configMigration(ctx context.Context, version int, pluginConfigs map[string] } migrateConfig(dst, src) - pluginConfigs[string(plugins.CRIImagePlugin)+".local"] = dst + pluginConfigs[string(plugins.CRIServicePlugin)+".images"] = dst return nil } func migrateConfig(dst, src map[string]interface{}) { diff --git a/pkg/cri/server/base/cri_base_test.go b/plugins/cri/runtime/load_test.go similarity index 98% rename from pkg/cri/server/base/cri_base_test.go rename to plugins/cri/runtime/load_test.go index 07e796640..2abd21580 100644 --- a/pkg/cri/server/base/cri_base_test.go +++ b/plugins/cri/runtime/load_test.go @@ -14,7 +14,7 @@ limitations under the License. */ -package base +package runtime import ( "encoding/json" diff --git a/pkg/cri/server/base/cri_base.go b/plugins/cri/runtime/plugin.go similarity index 85% rename from pkg/cri/server/base/cri_base.go rename to plugins/cri/runtime/plugin.go index a28070700..7e8181a88 100644 --- a/pkg/cri/server/base/cri_base.go +++ b/plugins/cri/runtime/plugin.go @@ -14,7 +14,7 @@ limitations under the License. */ -package base +package runtime import ( "context" @@ -36,47 +36,39 @@ import ( "github.com/containerd/containerd/v2/pkg/oci" "github.com/containerd/containerd/v2/plugins" "github.com/containerd/containerd/v2/plugins/services/warning" + "github.com/containerd/errdefs" "github.com/containerd/platforms" ) -// CRIBase contains common dependencies for CRI's runtime, image, and podsandbox services. -type CRIBase struct { - // Config contains all configurations. - Config criconfig.Config - // BaseOCISpecs contains cached OCI specs loaded via `Runtime.BaseRuntimeSpec` - BaseOCISpecs map[string]*oci.Spec -} - func init() { config := criconfig.DefaultConfig() // Base plugin that other CRI services depend on. registry.Register(&plugin.Registration{ - Type: plugins.InternalPlugin, - ID: "cri", + Type: plugins.CRIServicePlugin, + ID: "runtime", Config: &config, Requires: []plugin.Type{ plugins.WarningPlugin, }, - ConfigMigration: func(ctx context.Context, version int, plugins map[string]interface{}) error { + ConfigMigration: func(ctx context.Context, version int, pluginConfigs map[string]interface{}) error { if version >= srvconfig.CurrentConfigVersion { return nil } - c, ok := plugins["io.containerd.grpc.v1.cri"] + c, ok := pluginConfigs[string(plugins.GRPCPlugin)+".cri"] if !ok { return nil } conf := c.(map[string]interface{}) migrateConfig(conf) - plugins["io.containerd.internal.v1.cri"] = conf - delete(plugins, "io.containerd.grpc.v1.cri") + pluginConfigs[string(plugins.CRIServicePlugin)+".runtime"] = conf return nil }, - InitFn: initCRIBase, + InitFn: initCRIRuntime, }) } -func initCRIBase(ic *plugin.InitContext) (interface{}, error) { +func initCRIRuntime(ic *plugin.InitContext) (interface{}, error) { ic.Meta.Platforms = []imagespec.Platform{platforms.DefaultSpec()} ic.Meta.Exports = map[string]string{"CRIVersion": constants.CRIVersion} ctx := ic.Context @@ -118,12 +110,33 @@ func initCRIBase(ic *plugin.InitContext) (interface{}, error) { return nil, fmt.Errorf("failed to create load basic oci spec: %w", err) } - return &CRIBase{ - Config: c, - BaseOCISpecs: ociSpec, + return &runtime{ + config: c, + baseOCISpecs: ociSpec, }, nil } +// runtime contains common dependencies for CRI's runtime, image, and podsandbox services. +type runtime struct { + // Config contains all configurations. + config criconfig.Config + // BaseOCISpecs contains cached OCI specs loaded via `Runtime.BaseRuntimeSpec` + baseOCISpecs map[string]*oci.Spec +} + +func (r *runtime) Config() criconfig.Config { + return r.config +} + +func (r *runtime) LoadOCISpec(filename string) (*oci.Spec, error) { + spec, ok := r.baseOCISpecs[filename] + if !ok { + // TODO: Load here or only allow preloading... + return nil, errdefs.ErrNotFound + } + return spec, nil +} + func loadBaseOCISpecs(config *criconfig.Config) (map[string]*oci.Spec, error) { specs := map[string]*oci.Spec{} for _, cfg := range config.Runtimes { diff --git a/plugins/types.go b/plugins/types.go index 740e2d253..c09734443 100644 --- a/plugins/types.go +++ b/plugins/types.go @@ -67,8 +67,8 @@ const ( ImageVerifierPlugin plugin.Type = "io.containerd.image-verifier.v1" // WarningPlugin implements a warning service WarningPlugin plugin.Type = "io.containerd.warning.v1" - // CRIImagePlugin implements a cri image service - CRIImagePlugin plugin.Type = "io.containerd.cri.image.v1" + // CRIServicePlugin implements a cri service + CRIServicePlugin plugin.Type = "io.containerd.cri.v1" ) const ( From d29a1bc6a0a6dbf7ebde120edc2b228968850987 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Tue, 23 Jan 2024 21:55:57 -0800 Subject: [PATCH 2/5] Move sandbox info to cri types packages Signed-off-by: Derek McGowan --- integration/main_test.go | 6 +++--- integration/sandbox_run_rollback_test.go | 6 +++--- pkg/cri/server/podsandbox/sandbox_status.go | 4 ++-- pkg/cri/server/sandbox_status.go | 4 ++-- pkg/cri/types/sandbox_info.go | 2 +- 5 files changed, 11 insertions(+), 11 deletions(-) diff --git a/integration/main_test.go b/integration/main_test.go index bfe37eb17..57e1617d6 100644 --- a/integration/main_test.go +++ b/integration/main_test.go @@ -51,7 +51,7 @@ import ( dialer "github.com/containerd/containerd/v2/integration/remote/util" criconfig "github.com/containerd/containerd/v2/pkg/cri/config" "github.com/containerd/containerd/v2/pkg/cri/constants" - "github.com/containerd/containerd/v2/pkg/cri/server/base" + "github.com/containerd/containerd/v2/pkg/cri/types" "github.com/containerd/containerd/v2/pkg/cri/util" ) @@ -686,7 +686,7 @@ func CRIConfig() (*criconfig.Config, error) { } // SandboxInfo gets sandbox info. -func SandboxInfo(id string) (*runtime.PodSandboxStatus, *base.SandboxInfo, error) { +func SandboxInfo(id string) (*runtime.PodSandboxStatus, *types.SandboxInfo, error) { client, err := RawRuntimeClient() if err != nil { return nil, nil, fmt.Errorf("failed to get raw runtime client: %w", err) @@ -699,7 +699,7 @@ func SandboxInfo(id string) (*runtime.PodSandboxStatus, *base.SandboxInfo, error return nil, nil, fmt.Errorf("failed to get sandbox status: %w", err) } status := resp.GetStatus() - var info base.SandboxInfo + var info types.SandboxInfo if err := json.Unmarshal([]byte(resp.GetInfo()["info"]), &info); err != nil { return nil, nil, fmt.Errorf("failed to unmarshal sandbox info: %w", err) } diff --git a/integration/sandbox_run_rollback_test.go b/integration/sandbox_run_rollback_test.go index cfaeab1c5..bd580aa49 100644 --- a/integration/sandbox_run_rollback_test.go +++ b/integration/sandbox_run_rollback_test.go @@ -36,7 +36,7 @@ import ( criapiv1 "k8s.io/cri-api/pkg/apis/runtime/v1" "github.com/containerd/containerd/v2/internal/failpoint" - "github.com/containerd/containerd/v2/pkg/cri/server/base" + "github.com/containerd/containerd/v2/pkg/cri/types" ) const ( @@ -299,7 +299,7 @@ func TestRunPodSandboxAndTeardownCNISlow(t *testing.T) { } // sbserverSandboxInfo gets sandbox info. -func sbserverSandboxInfo(id string) (*criapiv1.PodSandboxStatus, *base.SandboxInfo, error) { +func sbserverSandboxInfo(id string) (*criapiv1.PodSandboxStatus, *types.SandboxInfo, error) { client, err := RawRuntimeClient() if err != nil { return nil, nil, fmt.Errorf("failed to get raw runtime client: %w", err) @@ -312,7 +312,7 @@ func sbserverSandboxInfo(id string) (*criapiv1.PodSandboxStatus, *base.SandboxIn return nil, nil, fmt.Errorf("failed to get sandbox status: %w", err) } status := resp.GetStatus() - var info base.SandboxInfo + var info types.SandboxInfo if err := json.Unmarshal([]byte(resp.GetInfo()["info"]), &info); err != nil { return nil, nil, fmt.Errorf("failed to unmarshal sandbox info: %w", err) } diff --git a/pkg/cri/server/podsandbox/sandbox_status.go b/pkg/cri/server/podsandbox/sandbox_status.go index d75b376f5..f48fddd4a 100644 --- a/pkg/cri/server/podsandbox/sandbox_status.go +++ b/pkg/cri/server/podsandbox/sandbox_status.go @@ -26,8 +26,8 @@ import ( containerd "github.com/containerd/containerd/v2/client" "github.com/containerd/containerd/v2/core/containers" "github.com/containerd/containerd/v2/core/sandbox" - "github.com/containerd/containerd/v2/pkg/cri/server/base" "github.com/containerd/containerd/v2/pkg/cri/server/podsandbox/types" + critypes "github.com/containerd/containerd/v2/pkg/cri/types" "github.com/containerd/errdefs" ) @@ -63,7 +63,7 @@ func (c *Controller) Status(ctx context.Context, sandboxID string, verbose bool) // toCRISandboxInfo converts internal container object information to CRI sandbox status response info map. func toCRISandboxInfo(ctx context.Context, sb *types.PodSandbox) (map[string]string, error) { - si := &base.SandboxInfo{ + si := &critypes.SandboxInfo{ Pid: sb.Pid, Config: sb.Metadata.Config, RuntimeHandler: sb.Metadata.RuntimeHandler, diff --git a/pkg/cri/server/sandbox_status.go b/pkg/cri/server/sandbox_status.go index 1bbd539f5..5dc9163a9 100644 --- a/pkg/cri/server/sandbox_status.go +++ b/pkg/cri/server/sandbox_status.go @@ -24,8 +24,8 @@ import ( runtime "k8s.io/cri-api/pkg/apis/runtime/v1" - "github.com/containerd/containerd/v2/pkg/cri/server/base" sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox" + "github.com/containerd/containerd/v2/pkg/cri/types" "github.com/containerd/errdefs" ) @@ -152,7 +152,7 @@ func toCRISandboxStatus(meta sandboxstore.Metadata, status string, createdAt tim // but if controller.Status() returns a NotFound error, // we should fallback to get SandboxInfo from cached sandbox itself. func toDeletedCRISandboxInfo(sandbox sandboxstore.Sandbox) (map[string]string, error) { - si := &base.SandboxInfo{ + si := &types.SandboxInfo{ Pid: sandbox.Status.Get().Pid, Config: sandbox.Config, RuntimeHandler: sandbox.RuntimeHandler, diff --git a/pkg/cri/types/sandbox_info.go b/pkg/cri/types/sandbox_info.go index c38dd081b..1f5e63c8d 100644 --- a/pkg/cri/types/sandbox_info.go +++ b/pkg/cri/types/sandbox_info.go @@ -14,7 +14,7 @@ limitations under the License. */ -package base +package types import ( "github.com/containerd/go-cni" From 58ff9d368d2882df209d5eead4d0007e78947bc4 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Tue, 23 Jan 2024 22:05:52 -0800 Subject: [PATCH 3/5] Move cri plugin to plugins subpackage Signed-off-by: Derek McGowan --- cmd/containerd/builtins/cri.go | 2 +- contrib/fuzz/builtins.go | 2 +- {pkg => plugins}/cri/cri.go | 0 3 files changed, 2 insertions(+), 2 deletions(-) rename {pkg => plugins}/cri/cri.go (100%) diff --git a/cmd/containerd/builtins/cri.go b/cmd/containerd/builtins/cri.go index 5c88cb9e4..4e68b40dd 100644 --- a/cmd/containerd/builtins/cri.go +++ b/cmd/containerd/builtins/cri.go @@ -19,7 +19,7 @@ package builtins import ( - _ "github.com/containerd/containerd/v2/pkg/cri" + _ "github.com/containerd/containerd/v2/plugins/cri" _ "github.com/containerd/containerd/v2/plugins/cri/images" _ "github.com/containerd/containerd/v2/plugins/cri/runtime" ) diff --git a/contrib/fuzz/builtins.go b/contrib/fuzz/builtins.go index 6c4888748..a5e5c2c1b 100644 --- a/contrib/fuzz/builtins.go +++ b/contrib/fuzz/builtins.go @@ -19,9 +19,9 @@ package fuzz import ( // base containerd imports _ "github.com/containerd/containerd/v2/core/runtime/v2" - _ "github.com/containerd/containerd/v2/pkg/cri" _ "github.com/containerd/containerd/v2/pkg/events/plugin" _ "github.com/containerd/containerd/v2/pkg/nri/plugin" + _ "github.com/containerd/containerd/v2/plugins/cri" _ "github.com/containerd/containerd/v2/plugins/cri/images" _ "github.com/containerd/containerd/v2/plugins/cri/runtime" _ "github.com/containerd/containerd/v2/plugins/diff/walking/plugin" diff --git a/pkg/cri/cri.go b/plugins/cri/cri.go similarity index 100% rename from pkg/cri/cri.go rename to plugins/cri/cri.go From 65b3922df75de299fe95a53a74fed8835740d6ba Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Sun, 28 Jan 2024 22:00:11 -0800 Subject: [PATCH 4/5] Split streaming config from runtime config Signed-off-by: Derek McGowan --- pkg/cri/config/config.go | 78 +++++---- pkg/cri/config/config_kernel_linux.go | 2 +- pkg/cri/config/config_kernel_linux_test.go | 8 +- pkg/cri/config/config_kernel_other.go | 2 +- pkg/cri/config/config_test.go | 77 +++++---- pkg/cri/config/config_unix.go | 19 +- pkg/cri/config/config_windows.go | 15 +- pkg/cri/config/streaming.go | 163 ++++++++++++++++++ pkg/cri/config/streaming_test.go | 130 ++++++++++++++ .../container_update_resources_linux_test.go | 2 +- pkg/cri/server/podsandbox/controller_test.go | 2 +- pkg/cri/server/runtime_config_linux_test.go | 4 +- pkg/cri/server/service.go | 4 +- pkg/cri/server/streaming.go | 137 --------------- pkg/cri/server/streaming_test.go | 163 ------------------ pkg/cri/server/test_config.go | 2 +- plugins/cri/cri.go | 29 +++- plugins/cri/runtime/plugin.go | 8 +- 18 files changed, 433 insertions(+), 412 deletions(-) create mode 100644 pkg/cri/config/streaming.go create mode 100644 pkg/cri/config/streaming_test.go delete mode 100644 pkg/cri/server/streaming_test.go diff --git a/pkg/cri/config/config.go b/pkg/cri/config/config.go index 15617b1f0..6b0a0be93 100644 --- a/pkg/cri/config/config.go +++ b/pkg/cri/config/config.go @@ -26,6 +26,7 @@ import ( "github.com/containerd/log" "github.com/pelletier/go-toml/v2" runtime "k8s.io/cri-api/pkg/apis/runtime/v1" + "k8s.io/kubelet/pkg/cri/streaming" runhcsoptions "github.com/Microsoft/hcsshim/cmd/containerd-shim-runhcs-v1/options" runcoptions "github.com/containerd/containerd/v2/core/runtime/v2/runc/options" @@ -312,31 +313,18 @@ type ImageConfig struct { StatsCollectPeriod int `toml:"stats_collect_period" json:"statsCollectPeriod"` } -// PluginConfig contains toml config related to CRI plugin, +// RuntimeConfig contains toml config related to CRI plugin, // it is a subset of Config. -type PluginConfig struct { +type RuntimeConfig struct { // ContainerdConfig contains config related to containerd ContainerdConfig `toml:"containerd" json:"containerd"` // CniConfig contains config related to cni CniConfig `toml:"cni" json:"cni"` - // StreamServerAddress is the ip address streaming server is listening on. - StreamServerAddress string `toml:"stream_server_address" json:"streamServerAddress"` - // StreamServerPort is the port streaming server is listening on. - StreamServerPort string `toml:"stream_server_port" json:"streamServerPort"` - // StreamIdleTimeout is the maximum time a streaming connection - // can be idle before the connection is automatically closed. - // The string is in the golang duration format, see: - // https://golang.org/pkg/time/#ParseDuration - StreamIdleTimeout string `toml:"stream_idle_timeout" json:"streamIdleTimeout"` // EnableSelinux indicates to enable the selinux support. EnableSelinux bool `toml:"enable_selinux" json:"enableSelinux"` // SelinuxCategoryRange allows the upper bound on the category range to be set. // If not specified or set to 0, defaults to 1024 from the selinux package. SelinuxCategoryRange int `toml:"selinux_category_range" json:"selinuxCategoryRange"` - // EnableTLSStreaming indicates to enable the TLS streaming support. - EnableTLSStreaming bool `toml:"enable_tls_streaming" json:"enableTLSStreaming"` - // X509KeyPairStreaming is a x509 key pair used for TLS streaming - X509KeyPairStreaming `toml:"x509_key_pair_streaming" json:"x509KeyPairStreaming"` // MaxContainerLogLineSize is the maximum log line size in bytes for a container. // Log line longer than the limit will be split into multiple lines. Non-positive // value means no limit. @@ -416,10 +404,10 @@ type X509KeyPairStreaming struct { TLSKeyFile string `toml:"tls_key_file" json:"tlsKeyFile"` } -// Config contains all configurations for cri server. +// Config contains all configurations for CRI runtime plugin. type Config struct { - // PluginConfig is the config for CRI plugin. - PluginConfig + // RuntimeConfig is the config for CRI runtime. + RuntimeConfig // ContainerdRootDir is the root directory path for containerd. ContainerdRootDir string `json:"containerdRootDir"` // ContainerdEndpoint is the containerd endpoint path. @@ -431,10 +419,23 @@ type Config struct { StateDir string `json:"stateDir"` } -// ServiceConfig contains all the configuration for the CRI API server. -type ServiceConfig struct { +// ServerConfig contains all the configuration for the CRI API server. +type ServerConfig struct { // DisableTCPService disables serving CRI on the TCP server. DisableTCPService bool `toml:"disable_tcp_service" json:"disableTCPService"` + // StreamServerAddress is the ip address streaming server is listening on. + StreamServerAddress string `toml:"stream_server_address" json:"streamServerAddress"` + // StreamServerPort is the port streaming server is listening on. + StreamServerPort string `toml:"stream_server_port" json:"streamServerPort"` + // StreamIdleTimeout is the maximum time a streaming connection + // can be idle before the connection is automatically closed. + // The string is in the golang duration format, see: + // https://golang.org/pkg/time/#ParseDuration + StreamIdleTimeout string `toml:"stream_idle_timeout" json:"streamIdleTimeout"` + // EnableTLSStreaming indicates to enable the TLS streaming support. + EnableTLSStreaming bool `toml:"enable_tls_streaming" json:"enableTLSStreaming"` + // X509KeyPairStreaming is a x509 key pair used for TLS streaming + X509KeyPairStreaming `toml:"x509_key_pair_streaming" json:"x509KeyPairStreaming"` } const ( @@ -498,8 +499,8 @@ func ValidateImageConfig(ctx context.Context, c *ImageConfig) ([]deprecation.War return warnings, nil } -// ValidatePluginConfig validates the given plugin configuration. -func ValidatePluginConfig(ctx context.Context, c *PluginConfig) ([]deprecation.Warning, error) { +// ValidateRuntimeConfig validates the given runtime configuration. +func ValidateRuntimeConfig(ctx context.Context, c *RuntimeConfig) ([]deprecation.Warning, error) { var warnings []deprecation.Warning if c.ContainerdConfig.Runtimes == nil { c.ContainerdConfig.Runtimes = make(map[string]Runtime) @@ -524,13 +525,6 @@ func ValidatePluginConfig(ctx context.Context, c *PluginConfig) ([]deprecation.W } } - // Validation for stream_idle_timeout - if c.StreamIdleTimeout != "" { - if _, err := time.ParseDuration(c.StreamIdleTimeout); err != nil { - return warnings, fmt.Errorf("invalid stream idle timeout: %w", err) - } - } - // Validation for drain_exec_sync_io_timeout if c.DrainExecSyncIOTimeout != "" { if _, err := time.ParseDuration(c.DrainExecSyncIOTimeout); err != nil { @@ -543,6 +537,18 @@ func ValidatePluginConfig(ctx context.Context, c *PluginConfig) ([]deprecation.W return warnings, nil } +// ValidateServerConfig validates the given server configuration. +func ValidateServerConfig(ctx context.Context, c *ServerConfig) ([]deprecation.Warning, error) { + var warnings []deprecation.Warning + // Validation for stream_idle_timeout + if c.StreamIdleTimeout != "" { + if _, err := time.ParseDuration(c.StreamIdleTimeout); err != nil { + return warnings, fmt.Errorf("invalid stream idle timeout: %w", err) + } + } + return warnings, nil +} + func (config *Config) GetSandboxRuntime(podSandboxConfig *runtime.PodSandboxConfig, runtimeHandler string) (Runtime, error) { if untrustedWorkload(podSandboxConfig) { // If the untrusted annotation is provided, runtimeHandler MUST be empty. @@ -631,3 +637,17 @@ func getRuntimeOptionsType(t string) interface{} { return &runtimeoptions.Options{} } } + +func DefaultServerConfig() ServerConfig { + return ServerConfig{ + DisableTCPService: true, + StreamServerAddress: "127.0.0.1", + StreamServerPort: "0", + StreamIdleTimeout: streaming.DefaultConfig.StreamIdleTimeout.String(), // 4 hour + EnableTLSStreaming: false, + X509KeyPairStreaming: X509KeyPairStreaming{ + TLSKeyFile: "", + TLSCertFile: "", + }, + } +} diff --git a/pkg/cri/config/config_kernel_linux.go b/pkg/cri/config/config_kernel_linux.go index 9da860750..85c564ae3 100644 --- a/pkg/cri/config/config_kernel_linux.go +++ b/pkg/cri/config/config_kernel_linux.go @@ -28,7 +28,7 @@ import ( var kernelGreaterEqualThan = kernel.GreaterEqualThan -func ValidateEnableUnprivileged(ctx context.Context, c *PluginConfig) error { +func ValidateEnableUnprivileged(ctx context.Context, c *RuntimeConfig) error { if c.EnableUnprivilegedICMP || c.EnableUnprivilegedPorts { fourDotEleven := kernel.KernelVersion{Kernel: 4, Major: 11} ok, err := kernelGreaterEqualThan(fourDotEleven) diff --git a/pkg/cri/config/config_kernel_linux_test.go b/pkg/cri/config/config_kernel_linux_test.go index 703178282..0afc57420 100644 --- a/pkg/cri/config/config_kernel_linux_test.go +++ b/pkg/cri/config/config_kernel_linux_test.go @@ -32,13 +32,13 @@ func TestValidateEnableUnprivileged(t *testing.T) { tests := []struct { name string - config *PluginConfig + config *RuntimeConfig kernelGreater bool expectedErr string }{ { name: "disable unprivileged_icmp and unprivileged_port", - config: &PluginConfig{ + config: &RuntimeConfig{ ContainerdConfig: ContainerdConfig{ DefaultRuntimeName: RuntimeDefault, Runtimes: map[string]Runtime{ @@ -54,7 +54,7 @@ func TestValidateEnableUnprivileged(t *testing.T) { }, { name: "enable unprivileged_icmp or unprivileged_port, but kernel version is smaller than 4.11", - config: &PluginConfig{ + config: &RuntimeConfig{ ContainerdConfig: ContainerdConfig{ DefaultRuntimeName: RuntimeDefault, Runtimes: map[string]Runtime{ @@ -71,7 +71,7 @@ func TestValidateEnableUnprivileged(t *testing.T) { }, { name: "enable unprivileged_icmp or unprivileged_port, but kernel version is greater than or equal 4.11", - config: &PluginConfig{ + config: &RuntimeConfig{ ContainerdConfig: ContainerdConfig{ DefaultRuntimeName: RuntimeDefault, Runtimes: map[string]Runtime{ diff --git a/pkg/cri/config/config_kernel_other.go b/pkg/cri/config/config_kernel_other.go index b4012e163..433c1682f 100644 --- a/pkg/cri/config/config_kernel_other.go +++ b/pkg/cri/config/config_kernel_other.go @@ -22,6 +22,6 @@ import ( "context" ) -func ValidateEnableUnprivileged(ctx context.Context, c *PluginConfig) error { +func ValidateEnableUnprivileged(ctx context.Context, c *RuntimeConfig) error { return nil } diff --git a/pkg/cri/config/config_test.go b/pkg/cri/config/config_test.go index a52b87df5..f7a5f30df 100644 --- a/pkg/cri/config/config_test.go +++ b/pkg/cri/config/config_test.go @@ -28,29 +28,32 @@ import ( func TestValidateConfig(t *testing.T) { for desc, test := range map[string]struct { - config *PluginConfig - expectedErr string - expected *PluginConfig - imageConfig *ImageConfig - imageExpectedErr string - imageExpected *ImageConfig - warnings []deprecation.Warning + runtimeConfig *RuntimeConfig + runtimeExpectedErr string + runtimeExpected *RuntimeConfig + imageConfig *ImageConfig + imageExpectedErr string + imageExpected *ImageConfig + serverConfig *ServerConfig + serverExpectedErr string + serverExpected *ServerConfig + warnings []deprecation.Warning }{ "no default_runtime_name": { - config: &PluginConfig{}, - expectedErr: "`default_runtime_name` is empty", + runtimeConfig: &RuntimeConfig{}, + runtimeExpectedErr: "`default_runtime_name` is empty", }, "no runtime[default_runtime_name]": { - config: &PluginConfig{ + runtimeConfig: &RuntimeConfig{ ContainerdConfig: ContainerdConfig{ DefaultRuntimeName: RuntimeDefault, }, }, - expectedErr: "no corresponding runtime configured in `containerd.runtimes` for `containerd` `default_runtime_name = \"default\"", + runtimeExpectedErr: "no corresponding runtime configured in `containerd.runtimes` for `containerd` `default_runtime_name = \"default\"", }, "deprecated auths": { - config: &PluginConfig{ + runtimeConfig: &RuntimeConfig{ ContainerdConfig: ContainerdConfig{ DefaultRuntimeName: RuntimeDefault, Runtimes: map[string]Runtime{ @@ -58,7 +61,7 @@ func TestValidateConfig(t *testing.T) { }, }, }, - expected: &PluginConfig{ + runtimeExpected: &RuntimeConfig{ ContainerdConfig: ContainerdConfig{ DefaultRuntimeName: RuntimeDefault, Runtimes: map[string]Runtime{ @@ -92,18 +95,10 @@ func TestValidateConfig(t *testing.T) { warnings: []deprecation.Warning{deprecation.CRIRegistryAuths}, }, "invalid stream_idle_timeout": { - config: &PluginConfig{ + serverConfig: &ServerConfig{ StreamIdleTimeout: "invalid", - ContainerdConfig: ContainerdConfig{ - DefaultRuntimeName: RuntimeDefault, - Runtimes: map[string]Runtime{ - RuntimeDefault: { - Type: "default", - }, - }, - }, }, - expectedErr: "invalid stream idle timeout", + serverExpectedErr: "invalid stream idle timeout", }, "conflicting mirror registry config": { imageConfig: &ImageConfig{ @@ -117,7 +112,7 @@ func TestValidateConfig(t *testing.T) { imageExpectedErr: "`mirrors` cannot be set when `config_path` is provided", }, "deprecated mirrors": { - config: &PluginConfig{ + runtimeConfig: &RuntimeConfig{ ContainerdConfig: ContainerdConfig{ DefaultRuntimeName: RuntimeDefault, Runtimes: map[string]Runtime{ @@ -132,7 +127,7 @@ func TestValidateConfig(t *testing.T) { }, }, }, - expected: &PluginConfig{ + runtimeExpected: &RuntimeConfig{ ContainerdConfig: ContainerdConfig{ DefaultRuntimeName: RuntimeDefault, Runtimes: map[string]Runtime{ @@ -152,7 +147,7 @@ func TestValidateConfig(t *testing.T) { warnings: []deprecation.Warning{deprecation.CRIRegistryMirrors}, }, "deprecated configs": { - config: &PluginConfig{ + runtimeConfig: &RuntimeConfig{ ContainerdConfig: ContainerdConfig{ DefaultRuntimeName: RuntimeDefault, Runtimes: map[string]Runtime{ @@ -171,7 +166,7 @@ func TestValidateConfig(t *testing.T) { }, }, }, - expected: &PluginConfig{ + runtimeExpected: &RuntimeConfig{ ContainerdConfig: ContainerdConfig{ DefaultRuntimeName: RuntimeDefault, Runtimes: map[string]Runtime{ @@ -195,7 +190,7 @@ func TestValidateConfig(t *testing.T) { warnings: []deprecation.Warning{deprecation.CRIRegistryConfigs}, }, "privileged_without_host_devices_all_devices_allowed without privileged_without_host_devices": { - config: &PluginConfig{ + runtimeConfig: &RuntimeConfig{ ContainerdConfig: ContainerdConfig{ DefaultRuntimeName: RuntimeDefault, Runtimes: map[string]Runtime{ @@ -207,10 +202,10 @@ func TestValidateConfig(t *testing.T) { }, }, }, - expectedErr: "`privileged_without_host_devices_all_devices_allowed` requires `privileged_without_host_devices` to be enabled", + runtimeExpectedErr: "`privileged_without_host_devices_all_devices_allowed` requires `privileged_without_host_devices` to be enabled", }, "invalid drain_exec_sync_io_timeout input": { - config: &PluginConfig{ + runtimeConfig: &RuntimeConfig{ ContainerdConfig: ContainerdConfig{ DefaultRuntimeName: RuntimeDefault, Runtimes: map[string]Runtime{ @@ -221,18 +216,18 @@ func TestValidateConfig(t *testing.T) { }, DrainExecSyncIOTimeout: "10", }, - expectedErr: "invalid `drain_exec_sync_io_timeout`", + runtimeExpectedErr: "invalid `drain_exec_sync_io_timeout`", }, } { t.Run(desc, func(t *testing.T) { var warnings []deprecation.Warning - if test.config != nil { - w, err := ValidatePluginConfig(context.Background(), test.config) - if test.expectedErr != "" { - assert.Contains(t, err.Error(), test.expectedErr) + if test.runtimeConfig != nil { + w, err := ValidateRuntimeConfig(context.Background(), test.runtimeConfig) + if test.runtimeExpectedErr != "" { + assert.Contains(t, err.Error(), test.runtimeExpectedErr) } else { assert.NoError(t, err) - assert.Equal(t, test.expected, test.config) + assert.Equal(t, test.runtimeExpected, test.runtimeConfig) } warnings = append(warnings, w...) } @@ -246,6 +241,16 @@ func TestValidateConfig(t *testing.T) { } warnings = append(warnings, w...) } + if test.serverConfig != nil { + w, err := ValidateServerConfig(context.Background(), test.serverConfig) + if test.serverExpectedErr != "" { + assert.Contains(t, err.Error(), test.serverExpectedErr) + } else { + assert.NoError(t, err) + assert.Equal(t, test.serverExpected, test.serverConfig) + } + warnings = append(warnings, w...) + } if len(test.warnings) > 0 { assert.ElementsMatch(t, test.warnings, warnings) diff --git a/pkg/cri/config/config_unix.go b/pkg/cri/config/config_unix.go index c04651ecc..d31b090a1 100644 --- a/pkg/cri/config/config_unix.go +++ b/pkg/cri/config/config_unix.go @@ -21,7 +21,6 @@ package config import ( "github.com/containerd/containerd/v2/defaults" "github.com/pelletier/go-toml/v2" - "k8s.io/kubelet/pkg/cri/streaming" ) func DefaultImageConfig() ImageConfig { @@ -41,8 +40,8 @@ func DefaultImageConfig() ImageConfig { } } -// DefaultConfig returns default configurations of cri plugin. -func DefaultConfig() PluginConfig { +// DefaultRuntimeConfig returns default configurations of cri plugin. +func DefaultRuntimeConfig() RuntimeConfig { defaultRuncV2Opts := ` # NoNewKeyring disables new keyring for the container. NoNewKeyring = false @@ -71,7 +70,7 @@ func DefaultConfig() PluginConfig { var m map[string]interface{} toml.Unmarshal([]byte(defaultRuncV2Opts), &m) - return PluginConfig{ + return RuntimeConfig{ CniConfig: CniConfig{ NetworkPluginBinDir: "/opt/cni/bin", NetworkPluginConfDir: "/etc/cni/net.d", @@ -89,16 +88,8 @@ func DefaultConfig() PluginConfig { }, }, }, - StreamServerAddress: "127.0.0.1", - StreamServerPort: "0", - StreamIdleTimeout: streaming.DefaultConfig.StreamIdleTimeout.String(), // 4 hour - EnableSelinux: false, - SelinuxCategoryRange: 1024, - EnableTLSStreaming: false, - X509KeyPairStreaming: X509KeyPairStreaming{ - TLSKeyFile: "", - TLSCertFile: "", - }, + EnableSelinux: false, + SelinuxCategoryRange: 1024, MaxContainerLogLineSize: 16 * 1024, DisableProcMount: false, TolerateMissingHugetlbController: true, diff --git a/pkg/cri/config/config_windows.go b/pkg/cri/config/config_windows.go index db499a074..a1d4b072c 100644 --- a/pkg/cri/config/config_windows.go +++ b/pkg/cri/config/config_windows.go @@ -21,7 +21,6 @@ import ( "path/filepath" "github.com/containerd/containerd/v2/defaults" - "k8s.io/kubelet/pkg/cri/streaming" ) func DefaultImageConfig() ImageConfig { @@ -39,9 +38,9 @@ func DefaultImageConfig() ImageConfig { } } -// DefaultConfig returns default configurations of cri plugin. -func DefaultConfig() PluginConfig { - return PluginConfig{ +// DefaultRuntimeConfig returns default configurations of cri plugin. +func DefaultRuntimeConfig() RuntimeConfig { + return RuntimeConfig{ CniConfig: CniConfig{ NetworkPluginBinDir: filepath.Join(os.Getenv("ProgramFiles"), "containerd", "cni", "bin"), NetworkPluginConfDir: filepath.Join(os.Getenv("ProgramFiles"), "containerd", "cni", "conf"), @@ -78,14 +77,6 @@ func DefaultConfig() PluginConfig { }, }, }, - StreamServerAddress: "127.0.0.1", - StreamServerPort: "0", - StreamIdleTimeout: streaming.DefaultConfig.StreamIdleTimeout.String(), // 4 hour - EnableTLSStreaming: false, - X509KeyPairStreaming: X509KeyPairStreaming{ - TLSKeyFile: "", - TLSCertFile: "", - }, MaxContainerLogLineSize: 16 * 1024, IgnoreImageDefinedVolumes: false, // TODO(windows): Add platform specific config, so that most common defaults can be shared. diff --git a/pkg/cri/config/streaming.go b/pkg/cri/config/streaming.go new file mode 100644 index 000000000..b02e0bd58 --- /dev/null +++ b/pkg/cri/config/streaming.go @@ -0,0 +1,163 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package config + +import ( + "crypto/tls" + "errors" + "fmt" + "net" + "os" + "time" + + k8snet "k8s.io/apimachinery/pkg/util/net" + k8scert "k8s.io/client-go/util/cert" + + "k8s.io/kubelet/pkg/cri/streaming" +) + +type streamListenerMode int + +const ( + x509KeyPairTLS streamListenerMode = iota + selfSignTLS + withoutTLS +) + +func getStreamListenerMode(config *ServerConfig) (streamListenerMode, error) { + if config.EnableTLSStreaming { + if config.X509KeyPairStreaming.TLSCertFile != "" && config.X509KeyPairStreaming.TLSKeyFile != "" { + return x509KeyPairTLS, nil + } + if config.X509KeyPairStreaming.TLSCertFile != "" && config.X509KeyPairStreaming.TLSKeyFile == "" { + return -1, errors.New("must set X509KeyPairStreaming.TLSKeyFile") + } + if config.X509KeyPairStreaming.TLSCertFile == "" && config.X509KeyPairStreaming.TLSKeyFile != "" { + return -1, errors.New("must set X509KeyPairStreaming.TLSCertFile") + } + return selfSignTLS, nil + } + if config.X509KeyPairStreaming.TLSCertFile != "" { + return -1, errors.New("X509KeyPairStreaming.TLSCertFile is set but EnableTLSStreaming is not set") + } + if config.X509KeyPairStreaming.TLSKeyFile != "" { + return -1, errors.New("X509KeyPairStreaming.TLSKeyFile is set but EnableTLSStreaming is not set") + } + return withoutTLS, nil +} + +func (c *ServerConfig) StreamingConfig() (streaming.Config, error) { + var ( + addr = c.StreamServerAddress + port = c.StreamServerPort + streamIdleTimeout = c.StreamIdleTimeout + ) + if addr == "" { + a, err := k8snet.ResolveBindAddress(nil) + if err != nil { + return streaming.Config{}, fmt.Errorf("failed to get stream server address: %w", err) + } + addr = a.String() + } + config := streaming.DefaultConfig + if streamIdleTimeout != "" { + var err error + config.StreamIdleTimeout, err = time.ParseDuration(streamIdleTimeout) + if err != nil { + return streaming.Config{}, fmt.Errorf("invalid stream idle timeout: %w", err) + } + } + config.Addr = net.JoinHostPort(addr, port) + + tlsMode, err := getStreamListenerMode(c) + if err != nil { + return streaming.Config{}, fmt.Errorf("invalid stream server configuration: %w", err) + } + switch tlsMode { + case x509KeyPairTLS: + tlsCert, err := tls.LoadX509KeyPair(c.X509KeyPairStreaming.TLSCertFile, c.X509KeyPairStreaming.TLSKeyFile) + if err != nil { + return streaming.Config{}, fmt.Errorf("failed to load x509 key pair for stream server: %w", err) + } + config.TLSConfig = &tls.Config{ + Certificates: []tls.Certificate{tlsCert}, + } + case selfSignTLS: + tlsCert, err := newTLSCert() + if err != nil { + return streaming.Config{}, fmt.Errorf("failed to generate tls certificate for stream server: %w", err) + } + config.TLSConfig = &tls.Config{ + Certificates: []tls.Certificate{tlsCert}, + } + case withoutTLS: + default: + return streaming.Config{}, errors.New("invalid configuration for the stream listener") + } + return config, nil +} + +// newTLSCert returns a self CA signed tls.certificate. +// TODO (mikebrow): replace / rewrite this function to support using CA +// signing of the certificate. Requires a security plan for kubernetes regarding +// CRI connections / streaming, etc. For example, kubernetes could configure or +// require a CA service and pass a configuration down through CRI. +func newTLSCert() (tls.Certificate, error) { + fail := func(err error) (tls.Certificate, error) { return tls.Certificate{}, err } + + hostName, err := os.Hostname() + if err != nil { + return fail(fmt.Errorf("failed to get hostname: %w", err)) + } + + addrs, err := net.InterfaceAddrs() + if err != nil { + return fail(fmt.Errorf("failed to get host IP addresses: %w", err)) + } + + var alternateIPs []net.IP + var alternateDNS []string + for _, addr := range addrs { + var ip net.IP + + switch v := addr.(type) { + case *net.IPNet: + ip = v.IP + case *net.IPAddr: + ip = v.IP + default: + continue + } + + alternateIPs = append(alternateIPs, ip) + alternateDNS = append(alternateDNS, ip.String()) + } + + // Generate a self signed certificate key (CA is self) + certPem, keyPem, err := k8scert.GenerateSelfSignedCertKey(hostName, alternateIPs, alternateDNS) + if err != nil { + return fail(fmt.Errorf("certificate key could not be created: %w", err)) + } + + // Load the tls certificate + tlsCert, err := tls.X509KeyPair(certPem, keyPem) + if err != nil { + return fail(fmt.Errorf("certificate could not be loaded: %w", err)) + } + + return tlsCert, nil +} diff --git a/pkg/cri/config/streaming_test.go b/pkg/cri/config/streaming_test.go new file mode 100644 index 000000000..cb86cc6f6 --- /dev/null +++ b/pkg/cri/config/streaming_test.go @@ -0,0 +1,130 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package config + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestValidateStreamServer(t *testing.T) { + for _, test := range []struct { + desc string + config ServerConfig + tlsMode streamListenerMode + expectErr bool + }{ + { + desc: "should pass with default withoutTLS", + config: DefaultServerConfig(), + tlsMode: withoutTLS, + expectErr: false, + }, + { + desc: "should pass with x509KeyPairTLS", + config: ServerConfig{ + EnableTLSStreaming: true, + X509KeyPairStreaming: X509KeyPairStreaming{ + TLSKeyFile: "non-empty", + TLSCertFile: "non-empty", + }, + }, + tlsMode: x509KeyPairTLS, + expectErr: false, + }, + { + desc: "should pass with selfSign", + config: ServerConfig{ + EnableTLSStreaming: true, + }, + tlsMode: selfSignTLS, + expectErr: false, + }, + { + desc: "should return error with X509 keypair but not EnableTLSStreaming", + config: ServerConfig{ + EnableTLSStreaming: false, + X509KeyPairStreaming: X509KeyPairStreaming{ + TLSKeyFile: "non-empty", + TLSCertFile: "non-empty", + }, + }, + tlsMode: -1, + expectErr: true, + }, + { + desc: "should return error with X509 TLSCertFile empty", + config: ServerConfig{ + EnableTLSStreaming: true, + X509KeyPairStreaming: X509KeyPairStreaming{ + TLSKeyFile: "non-empty", + TLSCertFile: "", + }, + }, + tlsMode: -1, + expectErr: true, + }, + { + desc: "should return error with X509 TLSKeyFile empty", + config: ServerConfig{ + EnableTLSStreaming: true, + X509KeyPairStreaming: X509KeyPairStreaming{ + TLSKeyFile: "", + TLSCertFile: "non-empty", + }, + }, + tlsMode: -1, + expectErr: true, + }, + { + desc: "should return error without EnableTLSStreaming and only TLSCertFile set", + config: ServerConfig{ + EnableTLSStreaming: false, + X509KeyPairStreaming: X509KeyPairStreaming{ + TLSKeyFile: "", + TLSCertFile: "non-empty", + }, + }, + tlsMode: -1, + expectErr: true, + }, + { + desc: "should return error without EnableTLSStreaming and only TLSKeyFile set", + config: ServerConfig{ + EnableTLSStreaming: false, + X509KeyPairStreaming: X509KeyPairStreaming{ + TLSKeyFile: "non-empty", + TLSCertFile: "", + }, + }, + tlsMode: -1, + expectErr: true, + }, + } { + test := test + t.Run(test.desc, func(t *testing.T) { + tlsMode, err := getStreamListenerMode(&test.config) + if test.expectErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + assert.Equal(t, test.tlsMode, tlsMode) + }) + } +} diff --git a/pkg/cri/server/container_update_resources_linux_test.go b/pkg/cri/server/container_update_resources_linux_test.go index e81ef70be..ea9b36cff 100644 --- a/pkg/cri/server/container_update_resources_linux_test.go +++ b/pkg/cri/server/container_update_resources_linux_test.go @@ -239,7 +239,7 @@ func TestUpdateOCILinuxResource(t *testing.T) { test := test t.Run(test.desc, func(t *testing.T) { config := criconfig.Config{ - PluginConfig: criconfig.PluginConfig{ + RuntimeConfig: criconfig.RuntimeConfig{ TolerateMissingHugetlbController: true, DisableHugetlbController: false, }, diff --git a/pkg/cri/server/podsandbox/controller_test.go b/pkg/cri/server/podsandbox/controller_test.go index e71edf580..3a1580a10 100644 --- a/pkg/cri/server/podsandbox/controller_test.go +++ b/pkg/cri/server/podsandbox/controller_test.go @@ -38,7 +38,7 @@ const ( var testConfig = criconfig.Config{ RootDir: testRootDir, StateDir: testStateDir, - PluginConfig: criconfig.PluginConfig{ + RuntimeConfig: criconfig.RuntimeConfig{ TolerateMissingHugetlbController: true, }, } diff --git a/pkg/cri/server/runtime_config_linux_test.go b/pkg/cri/server/runtime_config_linux_test.go index 63768081c..4f409e21b 100644 --- a/pkg/cri/server/runtime_config_linux_test.go +++ b/pkg/cri/server/runtime_config_linux_test.go @@ -94,8 +94,8 @@ func TestRuntimeConfig(t *testing.T) { test := test t.Run(test.desc, func(t *testing.T) { c := newTestCRIService() - c.config.PluginConfig.ContainerdConfig.DefaultRuntimeName = test.defaultRuntime - c.config.PluginConfig.ContainerdConfig.Runtimes = test.runtimes + c.config.RuntimeConfig.ContainerdConfig.DefaultRuntimeName = test.defaultRuntime + c.config.RuntimeConfig.ContainerdConfig.Runtimes = test.runtimes resp, err := c.RuntimeConfig(context.TODO(), &runtime.RuntimeConfigRequest{}) assert.NoError(t, err) diff --git a/pkg/cri/server/service.go b/pkg/cri/server/service.go index dc1bbc23a..825287456 100644 --- a/pkg/cri/server/service.go +++ b/pkg/cri/server/service.go @@ -142,6 +142,8 @@ type CRIServiceOptions struct { ImageService ImageService + StreamingConfig streaming.Config + NRI *nri.API // SandboxControllers is a map of all the loaded sandbox controllers @@ -189,7 +191,7 @@ func NewCRIService(options *CRIServiceOptions) (CRIService, runtime.RuntimeServi } // prepare streaming server - c.streamServer, err = newStreamServer(c, config.StreamServerAddress, config.StreamServerPort, config.StreamIdleTimeout) + c.streamServer, err = streaming.NewServer(options.StreamingConfig, newStreamRuntime(c)) if err != nil { return nil, nil, fmt.Errorf("failed to create stream server: %w", err) } diff --git a/pkg/cri/server/streaming.go b/pkg/cri/server/streaming.go index 50ae7cb09..d3e9f9f27 100644 --- a/pkg/cri/server/streaming.go +++ b/pkg/cri/server/streaming.go @@ -18,104 +18,18 @@ package server import ( "context" - "crypto/tls" - "errors" "fmt" "io" "math" - "net" - "os" - "time" - k8snet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/remotecommand" - k8scert "k8s.io/client-go/util/cert" "k8s.io/utils/exec" ctrdutil "github.com/containerd/containerd/v2/pkg/cri/util" "k8s.io/kubelet/pkg/cri/streaming" ) -type streamListenerMode int - -const ( - x509KeyPairTLS streamListenerMode = iota - selfSignTLS - withoutTLS -) - -func getStreamListenerMode(c *criService) (streamListenerMode, error) { - if c.config.EnableTLSStreaming { - if c.config.X509KeyPairStreaming.TLSCertFile != "" && c.config.X509KeyPairStreaming.TLSKeyFile != "" { - return x509KeyPairTLS, nil - } - if c.config.X509KeyPairStreaming.TLSCertFile != "" && c.config.X509KeyPairStreaming.TLSKeyFile == "" { - return -1, errors.New("must set X509KeyPairStreaming.TLSKeyFile") - } - if c.config.X509KeyPairStreaming.TLSCertFile == "" && c.config.X509KeyPairStreaming.TLSKeyFile != "" { - return -1, errors.New("must set X509KeyPairStreaming.TLSCertFile") - } - return selfSignTLS, nil - } - if c.config.X509KeyPairStreaming.TLSCertFile != "" { - return -1, errors.New("X509KeyPairStreaming.TLSCertFile is set but EnableTLSStreaming is not set") - } - if c.config.X509KeyPairStreaming.TLSKeyFile != "" { - return -1, errors.New("X509KeyPairStreaming.TLSKeyFile is set but EnableTLSStreaming is not set") - } - return withoutTLS, nil -} - -func newStreamServer(c *criService, addr, port, streamIdleTimeout string) (streaming.Server, error) { - if addr == "" { - a, err := k8snet.ResolveBindAddress(nil) - if err != nil { - return nil, fmt.Errorf("failed to get stream server address: %w", err) - } - addr = a.String() - } - config := streaming.DefaultConfig - if streamIdleTimeout != "" { - var err error - config.StreamIdleTimeout, err = time.ParseDuration(streamIdleTimeout) - if err != nil { - return nil, fmt.Errorf("invalid stream idle timeout: %w", err) - } - } - config.Addr = net.JoinHostPort(addr, port) - run := newStreamRuntime(c) - tlsMode, err := getStreamListenerMode(c) - if err != nil { - return nil, fmt.Errorf("invalid stream server configuration: %w", err) - } - switch tlsMode { - case x509KeyPairTLS: - tlsCert, err := tls.LoadX509KeyPair(c.config.X509KeyPairStreaming.TLSCertFile, c.config.X509KeyPairStreaming.TLSKeyFile) - if err != nil { - return nil, fmt.Errorf("failed to load x509 key pair for stream server: %w", err) - } - config.TLSConfig = &tls.Config{ - Certificates: []tls.Certificate{tlsCert}, - } - return streaming.NewServer(config, run) - case selfSignTLS: - tlsCert, err := newTLSCert() - if err != nil { - return nil, fmt.Errorf("failed to generate tls certificate for stream server: %w", err) - } - config.TLSConfig = &tls.Config{ - Certificates: []tls.Certificate{tlsCert}, - InsecureSkipVerify: true, - } - return streaming.NewServer(config, run) - case withoutTLS: - return streaming.NewServer(config, run) - default: - return nil, errors.New("invalid configuration for the stream listener") - } -} - type streamRuntime struct { c *criService } @@ -187,54 +101,3 @@ func handleResizing(ctx context.Context, resize <-chan remotecommand.TerminalSiz } }() } - -// newTLSCert returns a self CA signed tls.certificate. -// TODO (mikebrow): replace / rewrite this function to support using CA -// signing of the certificate. Requires a security plan for kubernetes regarding -// CRI connections / streaming, etc. For example, kubernetes could configure or -// require a CA service and pass a configuration down through CRI. -func newTLSCert() (tls.Certificate, error) { - fail := func(err error) (tls.Certificate, error) { return tls.Certificate{}, err } - - hostName, err := os.Hostname() - if err != nil { - return fail(fmt.Errorf("failed to get hostname: %w", err)) - } - - addrs, err := net.InterfaceAddrs() - if err != nil { - return fail(fmt.Errorf("failed to get host IP addresses: %w", err)) - } - - var alternateIPs []net.IP - var alternateDNS []string - for _, addr := range addrs { - var ip net.IP - - switch v := addr.(type) { - case *net.IPNet: - ip = v.IP - case *net.IPAddr: - ip = v.IP - default: - continue - } - - alternateIPs = append(alternateIPs, ip) - alternateDNS = append(alternateDNS, ip.String()) - } - - // Generate a self signed certificate key (CA is self) - certPem, keyPem, err := k8scert.GenerateSelfSignedCertKey(hostName, alternateIPs, alternateDNS) - if err != nil { - return fail(fmt.Errorf("certificate key could not be created: %w", err)) - } - - // Load the tls certificate - tlsCert, err := tls.X509KeyPair(certPem, keyPem) - if err != nil { - return fail(fmt.Errorf("certificate could not be loaded: %w", err)) - } - - return tlsCert, nil -} diff --git a/pkg/cri/server/streaming_test.go b/pkg/cri/server/streaming_test.go deleted file mode 100644 index d93945d6b..000000000 --- a/pkg/cri/server/streaming_test.go +++ /dev/null @@ -1,163 +0,0 @@ -/* - Copyright The containerd Authors. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package server - -import ( - "testing" - - "github.com/containerd/containerd/v2/pkg/cri/config" - "github.com/stretchr/testify/assert" -) - -func TestValidateStreamServer(t *testing.T) { - for _, test := range []struct { - desc string - *criService - tlsMode streamListenerMode - expectErr bool - }{ - { - desc: "should pass with default withoutTLS", - criService: &criService{ - config: config.Config{ - PluginConfig: config.DefaultConfig(), - }, - }, - tlsMode: withoutTLS, - expectErr: false, - }, - { - desc: "should pass with x509KeyPairTLS", - criService: &criService{ - config: config.Config{ - PluginConfig: config.PluginConfig{ - EnableTLSStreaming: true, - X509KeyPairStreaming: config.X509KeyPairStreaming{ - TLSKeyFile: "non-empty", - TLSCertFile: "non-empty", - }, - }, - }, - }, - tlsMode: x509KeyPairTLS, - expectErr: false, - }, - { - desc: "should pass with selfSign", - criService: &criService{ - config: config.Config{ - PluginConfig: config.PluginConfig{ - EnableTLSStreaming: true, - }, - }, - }, - tlsMode: selfSignTLS, - expectErr: false, - }, - { - desc: "should return error with X509 keypair but not EnableTLSStreaming", - criService: &criService{ - config: config.Config{ - PluginConfig: config.PluginConfig{ - EnableTLSStreaming: false, - X509KeyPairStreaming: config.X509KeyPairStreaming{ - TLSKeyFile: "non-empty", - TLSCertFile: "non-empty", - }, - }, - }, - }, - tlsMode: -1, - expectErr: true, - }, - { - desc: "should return error with X509 TLSCertFile empty", - criService: &criService{ - config: config.Config{ - PluginConfig: config.PluginConfig{ - EnableTLSStreaming: true, - X509KeyPairStreaming: config.X509KeyPairStreaming{ - TLSKeyFile: "non-empty", - TLSCertFile: "", - }, - }, - }, - }, - tlsMode: -1, - expectErr: true, - }, - { - desc: "should return error with X509 TLSKeyFile empty", - criService: &criService{ - config: config.Config{ - PluginConfig: config.PluginConfig{ - EnableTLSStreaming: true, - X509KeyPairStreaming: config.X509KeyPairStreaming{ - TLSKeyFile: "", - TLSCertFile: "non-empty", - }, - }, - }, - }, - tlsMode: -1, - expectErr: true, - }, - { - desc: "should return error without EnableTLSStreaming and only TLSCertFile set", - criService: &criService{ - config: config.Config{ - PluginConfig: config.PluginConfig{ - EnableTLSStreaming: false, - X509KeyPairStreaming: config.X509KeyPairStreaming{ - TLSKeyFile: "", - TLSCertFile: "non-empty", - }, - }, - }, - }, - tlsMode: -1, - expectErr: true, - }, - { - desc: "should return error without EnableTLSStreaming and only TLSKeyFile set", - criService: &criService{ - config: config.Config{ - PluginConfig: config.PluginConfig{ - EnableTLSStreaming: false, - X509KeyPairStreaming: config.X509KeyPairStreaming{ - TLSKeyFile: "non-empty", - TLSCertFile: "", - }, - }, - }, - }, - tlsMode: -1, - expectErr: true, - }, - } { - test := test - t.Run(test.desc, func(t *testing.T) { - tlsMode, err := getStreamListenerMode(test.criService) - if test.expectErr { - assert.Error(t, err) - return - } - assert.NoError(t, err) - assert.Equal(t, test.tlsMode, tlsMode) - }) - } -} diff --git a/pkg/cri/server/test_config.go b/pkg/cri/server/test_config.go index a0ec785ff..6c0eaeaf7 100644 --- a/pkg/cri/server/test_config.go +++ b/pkg/cri/server/test_config.go @@ -26,7 +26,7 @@ const ( var testConfig = criconfig.Config{ RootDir: testRootDir, StateDir: testStateDir, - PluginConfig: criconfig.PluginConfig{ + RuntimeConfig: criconfig.RuntimeConfig{ TolerateMissingHugetlbController: true, ContainerdConfig: criconfig.ContainerdConfig{ DefaultRuntimeName: "runc", diff --git a/plugins/cri/cri.go b/plugins/cri/cri.go index a9071e40a..1f1956c89 100644 --- a/plugins/cri/cri.go +++ b/plugins/cri/cri.go @@ -35,6 +35,7 @@ import ( "github.com/containerd/containerd/v2/pkg/cri/server" nriservice "github.com/containerd/containerd/v2/pkg/nri" "github.com/containerd/containerd/v2/plugins" + "github.com/containerd/containerd/v2/plugins/services/warning" "github.com/containerd/platforms" "google.golang.org/grpc" @@ -44,6 +45,7 @@ import ( // Register CRI service plugin func init() { + defaultConfig := criconfig.DefaultServerConfig() registry.Register(&plugin.Registration{ Type: plugins.GRPCPlugin, ID: "cri", @@ -56,10 +58,9 @@ func init() { plugins.LeasePlugin, plugins.SandboxStorePlugin, plugins.TransferPlugin, + plugins.WarningPlugin, }, - Config: &criconfig.ServiceConfig{ - DisableTCPService: true, - }, + Config: &defaultConfig, ConfigMigration: func(ctx context.Context, version int, pluginConfigs map[string]interface{}) error { if version >= srvconfig.CurrentConfigVersion { return nil @@ -87,7 +88,7 @@ func init() { func initCRIService(ic *plugin.InitContext) (interface{}, error) { ctx := ic.Context - config := ic.Config.(*criconfig.ServiceConfig) + config := ic.Config.(*criconfig.ServerConfig) // Get runtime service. criRuntimePlugin, err := ic.GetByID(plugins.CRIServicePlugin, "runtime") @@ -101,6 +102,19 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) { return nil, fmt.Errorf("unable to load CRI image service plugin dependency: %w", err) } + if warnings, err := criconfig.ValidateServerConfig(ic.Context, config); err != nil { + return nil, fmt.Errorf("invalid cri image config: %w", err) + } else if len(warnings) > 0 { + ws, err := ic.GetSingle(plugins.WarningPlugin) + if err != nil { + return nil, err + } + warn := ws.(warning.Service) + for _, w := range warnings { + warn.Emit(ic.Context, w) + } + } + log.G(ctx).Info("Connect containerd service") client, err := containerd.New( "", @@ -119,16 +133,21 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) { string(criconfig.ModeShim): client.SandboxController(string(criconfig.ModeShim)), } + streamingConfig, err := config.StreamingConfig() + if err != nil { + return nil, fmt.Errorf("failed to get streaming config: %w", err) + } + options := &server.CRIServiceOptions{ RuntimeService: criRuntimePlugin.(server.RuntimeService), ImageService: criImagePlugin.(server.ImageService), + StreamingConfig: streamingConfig, NRI: getNRIAPI(ic), Client: client, SandboxControllers: sbControllers, } is := criImagePlugin.(imageService).GRPCService() - // TODO: More options specifically for grpc service? s, rs, err := server.NewCRIService(options) if err != nil { return nil, fmt.Errorf("failed to create CRI service: %w", err) diff --git a/plugins/cri/runtime/plugin.go b/plugins/cri/runtime/plugin.go index 7e8181a88..eb82ef2ce 100644 --- a/plugins/cri/runtime/plugin.go +++ b/plugins/cri/runtime/plugin.go @@ -41,7 +41,7 @@ import ( ) func init() { - config := criconfig.DefaultConfig() + config := criconfig.DefaultRuntimeConfig() // Base plugin that other CRI services depend on. registry.Register(&plugin.Registration{ @@ -72,8 +72,8 @@ func initCRIRuntime(ic *plugin.InitContext) (interface{}, error) { ic.Meta.Platforms = []imagespec.Platform{platforms.DefaultSpec()} ic.Meta.Exports = map[string]string{"CRIVersion": constants.CRIVersion} ctx := ic.Context - pluginConfig := ic.Config.(*criconfig.PluginConfig) - if warnings, err := criconfig.ValidatePluginConfig(ctx, pluginConfig); err != nil { + pluginConfig := ic.Config.(*criconfig.RuntimeConfig) + if warnings, err := criconfig.ValidateRuntimeConfig(ctx, pluginConfig); err != nil { return nil, fmt.Errorf("invalid plugin config: %w", err) } else if len(warnings) > 0 { ws, err := ic.GetSingle(plugins.WarningPlugin) @@ -92,7 +92,7 @@ func initCRIRuntime(ic *plugin.InitContext) (interface{}, error) { containerdStateDir := filepath.Dir(ic.Properties[plugins.PropertyStateDir]) stateDir := filepath.Join(containerdStateDir, "io.containerd.grpc.v1.cri") c := criconfig.Config{ - PluginConfig: *pluginConfig, + RuntimeConfig: *pluginConfig, ContainerdRootDir: containerdRootDir, ContainerdEndpoint: ic.Properties[plugins.PropertyGRPCAddress], RootDir: rootDir, From 64b4778fc2c360b153b5df5b504ccd858d3601d5 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Sun, 28 Jan 2024 23:14:19 -0800 Subject: [PATCH 5/5] Add deprecation warnings to CRI image server configuration Signed-off-by: Derek McGowan --- plugins/cri/images/plugin.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/plugins/cri/images/plugin.go b/plugins/cri/images/plugin.go index 963ef3b50..a0f2de88c 100644 --- a/plugins/cri/images/plugin.go +++ b/plugins/cri/images/plugin.go @@ -30,6 +30,7 @@ import ( "github.com/containerd/containerd/v2/pkg/cri/server/images" "github.com/containerd/containerd/v2/pkg/events" "github.com/containerd/containerd/v2/plugins" + "github.com/containerd/containerd/v2/plugins/services/warning" "github.com/containerd/log" "github.com/containerd/platforms" "github.com/containerd/plugin" @@ -50,6 +51,7 @@ func init() { plugins.SandboxStorePlugin, plugins.ServicePlugin, // For client plugins.SnapshotPlugin, // For root directory properties + plugins.WarningPlugin, }, InitFn: func(ic *plugin.InitContext) (interface{}, error) { m, err := ic.GetSingle(plugins.MetadataPlugin) @@ -63,6 +65,19 @@ func init() { return nil, err } + if warnings, err := criconfig.ValidateImageConfig(ic.Context, &config); err != nil { + return nil, fmt.Errorf("invalid cri image config: %w", err) + } else if len(warnings) > 0 { + ws, err := ic.GetSingle(plugins.WarningPlugin) + if err != nil { + return nil, err + } + warn := ws.(warning.Service) + for _, w := range warnings { + warn.Emit(ic.Context, w) + } + } + options := &images.CRIImageServiceOptions{ Content: mdb.ContentStore(), Images: metadata.NewImageStore(mdb),