Merge pull request #9681 from dmcgowan/cri-runtime-plugin

Add CRI Service plugin type
This commit is contained in:
Derek McGowan 2024-01-29 16:05:16 +00:00 committed by GitHub
commit 1b6019b5ed
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
36 changed files with 659 additions and 522 deletions

View File

@ -19,6 +19,7 @@
package builtins package builtins
import ( import (
_ "github.com/containerd/containerd/v2/pkg/cri" _ "github.com/containerd/containerd/v2/plugins/cri"
_ "github.com/containerd/containerd/v2/plugins/cri/images" _ "github.com/containerd/containerd/v2/plugins/cri/images"
_ "github.com/containerd/containerd/v2/plugins/cri/runtime"
) )

View File

@ -19,10 +19,11 @@ package fuzz
import ( import (
// base containerd imports // base containerd imports
_ "github.com/containerd/containerd/v2/core/runtime/v2" _ "github.com/containerd/containerd/v2/core/runtime/v2"
_ "github.com/containerd/containerd/v2/pkg/cri"
_ "github.com/containerd/containerd/v2/pkg/events/plugin" _ "github.com/containerd/containerd/v2/pkg/events/plugin"
_ "github.com/containerd/containerd/v2/pkg/nri/plugin" _ "github.com/containerd/containerd/v2/pkg/nri/plugin"
_ "github.com/containerd/containerd/v2/plugins/cri"
_ "github.com/containerd/containerd/v2/plugins/cri/images" _ "github.com/containerd/containerd/v2/plugins/cri/images"
_ "github.com/containerd/containerd/v2/plugins/cri/runtime"
_ "github.com/containerd/containerd/v2/plugins/diff/walking/plugin" _ "github.com/containerd/containerd/v2/plugins/diff/walking/plugin"
_ "github.com/containerd/containerd/v2/plugins/gc" _ "github.com/containerd/containerd/v2/plugins/gc"
_ "github.com/containerd/containerd/v2/plugins/imageverifier" _ "github.com/containerd/containerd/v2/plugins/imageverifier"

View File

@ -29,6 +29,7 @@ import (
"github.com/containerd/containerd/v2/pkg/cri/server" "github.com/containerd/containerd/v2/pkg/cri/server"
"github.com/containerd/containerd/v2/pkg/cri/server/images" "github.com/containerd/containerd/v2/pkg/cri/server/images"
"github.com/containerd/containerd/v2/pkg/oci" "github.com/containerd/containerd/v2/pkg/oci"
"github.com/containerd/errdefs"
) )
func FuzzCRIServer(data []byte) int { func FuzzCRIServer(data []byte) int {
@ -42,7 +43,6 @@ func FuzzCRIServer(data []byte) int {
} }
defer client.Close() defer client.Close()
config := criconfig.Config{}
imageConfig := criconfig.ImageConfig{} imageConfig := criconfig.ImageConfig{}
imageService, err := images.NewService(imageConfig, &images.CRIImageServiceOptions{ imageService, err := images.NewService(imageConfig, &images.CRIImageServiceOptions{
@ -52,10 +52,10 @@ func FuzzCRIServer(data []byte) int {
panic(err) panic(err)
} }
c, rs, err := server.NewCRIService(config, &server.CRIServiceOptions{ c, rs, err := server.NewCRIService(&server.CRIServiceOptions{
ImageService: imageService, RuntimeService: &fakeRuntimeService{},
Client: client, ImageService: imageService,
BaseOCISpecs: map[string]*oci.Spec{}, Client: client,
}) })
if err != nil { if err != nil {
panic(err) panic(err)
@ -68,6 +68,16 @@ func FuzzCRIServer(data []byte) int {
}) })
} }
type fakeRuntimeService struct{}
func (fakeRuntimeService) Config() criconfig.Config {
return criconfig.Config{}
}
func (fakeRuntimeService) LoadOCISpec(string) (*oci.Spec, error) {
return nil, errdefs.ErrNotFound
}
type service struct { type service struct {
server.CRIService server.CRIService
runtime.RuntimeServiceServer runtime.RuntimeServiceServer

View File

@ -38,6 +38,7 @@ import (
_ "github.com/containerd/containerd/v2/core/runtime/v2/runc/options" _ "github.com/containerd/containerd/v2/core/runtime/v2/runc/options"
_ "github.com/containerd/containerd/v2/pkg/events/plugin" _ "github.com/containerd/containerd/v2/pkg/events/plugin"
_ "github.com/containerd/containerd/v2/plugins/cri/images" _ "github.com/containerd/containerd/v2/plugins/cri/images"
_ "github.com/containerd/containerd/v2/plugins/cri/runtime"
_ "github.com/containerd/containerd/v2/plugins/diff/walking/plugin" _ "github.com/containerd/containerd/v2/plugins/diff/walking/plugin"
_ "github.com/containerd/containerd/v2/plugins/gc" _ "github.com/containerd/containerd/v2/plugins/gc"
_ "github.com/containerd/containerd/v2/plugins/leases" _ "github.com/containerd/containerd/v2/plugins/leases"

View File

@ -51,7 +51,7 @@ import (
dialer "github.com/containerd/containerd/v2/integration/remote/util" dialer "github.com/containerd/containerd/v2/integration/remote/util"
criconfig "github.com/containerd/containerd/v2/pkg/cri/config" criconfig "github.com/containerd/containerd/v2/pkg/cri/config"
"github.com/containerd/containerd/v2/pkg/cri/constants" "github.com/containerd/containerd/v2/pkg/cri/constants"
"github.com/containerd/containerd/v2/pkg/cri/server/base" "github.com/containerd/containerd/v2/pkg/cri/types"
"github.com/containerd/containerd/v2/pkg/cri/util" "github.com/containerd/containerd/v2/pkg/cri/util"
) )
@ -686,7 +686,7 @@ func CRIConfig() (*criconfig.Config, error) {
} }
// SandboxInfo gets sandbox info. // SandboxInfo gets sandbox info.
func SandboxInfo(id string) (*runtime.PodSandboxStatus, *base.SandboxInfo, error) { func SandboxInfo(id string) (*runtime.PodSandboxStatus, *types.SandboxInfo, error) {
client, err := RawRuntimeClient() client, err := RawRuntimeClient()
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("failed to get raw runtime client: %w", err) return nil, nil, fmt.Errorf("failed to get raw runtime client: %w", err)
@ -699,7 +699,7 @@ func SandboxInfo(id string) (*runtime.PodSandboxStatus, *base.SandboxInfo, error
return nil, nil, fmt.Errorf("failed to get sandbox status: %w", err) return nil, nil, fmt.Errorf("failed to get sandbox status: %w", err)
} }
status := resp.GetStatus() status := resp.GetStatus()
var info base.SandboxInfo var info types.SandboxInfo
if err := json.Unmarshal([]byte(resp.GetInfo()["info"]), &info); err != nil { if err := json.Unmarshal([]byte(resp.GetInfo()["info"]), &info); err != nil {
return nil, nil, fmt.Errorf("failed to unmarshal sandbox info: %w", err) return nil, nil, fmt.Errorf("failed to unmarshal sandbox info: %w", err)
} }

View File

@ -36,7 +36,7 @@ import (
criapiv1 "k8s.io/cri-api/pkg/apis/runtime/v1" criapiv1 "k8s.io/cri-api/pkg/apis/runtime/v1"
"github.com/containerd/containerd/v2/internal/failpoint" "github.com/containerd/containerd/v2/internal/failpoint"
"github.com/containerd/containerd/v2/pkg/cri/server/base" "github.com/containerd/containerd/v2/pkg/cri/types"
) )
const ( const (
@ -299,7 +299,7 @@ func TestRunPodSandboxAndTeardownCNISlow(t *testing.T) {
} }
// sbserverSandboxInfo gets sandbox info. // sbserverSandboxInfo gets sandbox info.
func sbserverSandboxInfo(id string) (*criapiv1.PodSandboxStatus, *base.SandboxInfo, error) { func sbserverSandboxInfo(id string) (*criapiv1.PodSandboxStatus, *types.SandboxInfo, error) {
client, err := RawRuntimeClient() client, err := RawRuntimeClient()
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("failed to get raw runtime client: %w", err) return nil, nil, fmt.Errorf("failed to get raw runtime client: %w", err)
@ -312,7 +312,7 @@ func sbserverSandboxInfo(id string) (*criapiv1.PodSandboxStatus, *base.SandboxIn
return nil, nil, fmt.Errorf("failed to get sandbox status: %w", err) return nil, nil, fmt.Errorf("failed to get sandbox status: %w", err)
} }
status := resp.GetStatus() status := resp.GetStatus()
var info base.SandboxInfo var info types.SandboxInfo
if err := json.Unmarshal([]byte(resp.GetInfo()["info"]), &info); err != nil { if err := json.Unmarshal([]byte(resp.GetInfo()["info"]), &info); err != nil {
return nil, nil, fmt.Errorf("failed to unmarshal sandbox info: %w", err) return nil, nil, fmt.Errorf("failed to unmarshal sandbox info: %w", err)
} }

View File

@ -26,6 +26,7 @@ import (
"github.com/containerd/log" "github.com/containerd/log"
"github.com/pelletier/go-toml/v2" "github.com/pelletier/go-toml/v2"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1" runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/kubelet/pkg/cri/streaming"
runhcsoptions "github.com/Microsoft/hcsshim/cmd/containerd-shim-runhcs-v1/options" runhcsoptions "github.com/Microsoft/hcsshim/cmd/containerd-shim-runhcs-v1/options"
runcoptions "github.com/containerd/containerd/v2/core/runtime/v2/runc/options" runcoptions "github.com/containerd/containerd/v2/core/runtime/v2/runc/options"
@ -312,33 +313,18 @@ type ImageConfig struct {
StatsCollectPeriod int `toml:"stats_collect_period" json:"statsCollectPeriod"` StatsCollectPeriod int `toml:"stats_collect_period" json:"statsCollectPeriod"`
} }
// PluginConfig contains toml config related to CRI plugin, // RuntimeConfig contains toml config related to CRI plugin,
// it is a subset of Config. // it is a subset of Config.
type PluginConfig struct { type RuntimeConfig struct {
// ContainerdConfig contains config related to containerd // ContainerdConfig contains config related to containerd
ContainerdConfig `toml:"containerd" json:"containerd"` ContainerdConfig `toml:"containerd" json:"containerd"`
// CniConfig contains config related to cni // CniConfig contains config related to cni
CniConfig `toml:"cni" json:"cni"` CniConfig `toml:"cni" json:"cni"`
// DisableTCPService disables serving CRI on the TCP server.
DisableTCPService bool `toml:"disable_tcp_service" json:"disableTCPService"`
// StreamServerAddress is the ip address streaming server is listening on.
StreamServerAddress string `toml:"stream_server_address" json:"streamServerAddress"`
// StreamServerPort is the port streaming server is listening on.
StreamServerPort string `toml:"stream_server_port" json:"streamServerPort"`
// StreamIdleTimeout is the maximum time a streaming connection
// can be idle before the connection is automatically closed.
// The string is in the golang duration format, see:
// https://golang.org/pkg/time/#ParseDuration
StreamIdleTimeout string `toml:"stream_idle_timeout" json:"streamIdleTimeout"`
// EnableSelinux indicates to enable the selinux support. // EnableSelinux indicates to enable the selinux support.
EnableSelinux bool `toml:"enable_selinux" json:"enableSelinux"` EnableSelinux bool `toml:"enable_selinux" json:"enableSelinux"`
// SelinuxCategoryRange allows the upper bound on the category range to be set. // SelinuxCategoryRange allows the upper bound on the category range to be set.
// If not specified or set to 0, defaults to 1024 from the selinux package. // If not specified or set to 0, defaults to 1024 from the selinux package.
SelinuxCategoryRange int `toml:"selinux_category_range" json:"selinuxCategoryRange"` SelinuxCategoryRange int `toml:"selinux_category_range" json:"selinuxCategoryRange"`
// EnableTLSStreaming indicates to enable the TLS streaming support.
EnableTLSStreaming bool `toml:"enable_tls_streaming" json:"enableTLSStreaming"`
// X509KeyPairStreaming is a x509 key pair used for TLS streaming
X509KeyPairStreaming `toml:"x509_key_pair_streaming" json:"x509KeyPairStreaming"`
// MaxContainerLogLineSize is the maximum log line size in bytes for a container. // MaxContainerLogLineSize is the maximum log line size in bytes for a container.
// Log line longer than the limit will be split into multiple lines. Non-positive // Log line longer than the limit will be split into multiple lines. Non-positive
// value means no limit. // value means no limit.
@ -418,10 +404,10 @@ type X509KeyPairStreaming struct {
TLSKeyFile string `toml:"tls_key_file" json:"tlsKeyFile"` TLSKeyFile string `toml:"tls_key_file" json:"tlsKeyFile"`
} }
// Config contains all configurations for cri server. // Config contains all configurations for CRI runtime plugin.
type Config struct { type Config struct {
// PluginConfig is the config for CRI plugin. // RuntimeConfig is the config for CRI runtime.
PluginConfig RuntimeConfig
// ContainerdRootDir is the root directory path for containerd. // ContainerdRootDir is the root directory path for containerd.
ContainerdRootDir string `json:"containerdRootDir"` ContainerdRootDir string `json:"containerdRootDir"`
// ContainerdEndpoint is the containerd endpoint path. // ContainerdEndpoint is the containerd endpoint path.
@ -433,6 +419,25 @@ type Config struct {
StateDir string `json:"stateDir"` StateDir string `json:"stateDir"`
} }
// ServerConfig contains all the configuration for the CRI API server.
type ServerConfig struct {
// DisableTCPService disables serving CRI on the TCP server.
DisableTCPService bool `toml:"disable_tcp_service" json:"disableTCPService"`
// StreamServerAddress is the ip address streaming server is listening on.
StreamServerAddress string `toml:"stream_server_address" json:"streamServerAddress"`
// StreamServerPort is the port streaming server is listening on.
StreamServerPort string `toml:"stream_server_port" json:"streamServerPort"`
// StreamIdleTimeout is the maximum time a streaming connection
// can be idle before the connection is automatically closed.
// The string is in the golang duration format, see:
// https://golang.org/pkg/time/#ParseDuration
StreamIdleTimeout string `toml:"stream_idle_timeout" json:"streamIdleTimeout"`
// EnableTLSStreaming indicates to enable the TLS streaming support.
EnableTLSStreaming bool `toml:"enable_tls_streaming" json:"enableTLSStreaming"`
// X509KeyPairStreaming is a x509 key pair used for TLS streaming
X509KeyPairStreaming `toml:"x509_key_pair_streaming" json:"x509KeyPairStreaming"`
}
const ( const (
// RuntimeUntrusted is the implicit runtime defined for ContainerdConfig.UntrustedWorkloadRuntime // RuntimeUntrusted is the implicit runtime defined for ContainerdConfig.UntrustedWorkloadRuntime
RuntimeUntrusted = "untrusted" RuntimeUntrusted = "untrusted"
@ -494,8 +499,8 @@ func ValidateImageConfig(ctx context.Context, c *ImageConfig) ([]deprecation.War
return warnings, nil return warnings, nil
} }
// ValidatePluginConfig validates the given plugin configuration. // ValidateRuntimeConfig validates the given runtime configuration.
func ValidatePluginConfig(ctx context.Context, c *PluginConfig) ([]deprecation.Warning, error) { func ValidateRuntimeConfig(ctx context.Context, c *RuntimeConfig) ([]deprecation.Warning, error) {
var warnings []deprecation.Warning var warnings []deprecation.Warning
if c.ContainerdConfig.Runtimes == nil { if c.ContainerdConfig.Runtimes == nil {
c.ContainerdConfig.Runtimes = make(map[string]Runtime) c.ContainerdConfig.Runtimes = make(map[string]Runtime)
@ -520,13 +525,6 @@ func ValidatePluginConfig(ctx context.Context, c *PluginConfig) ([]deprecation.W
} }
} }
// Validation for stream_idle_timeout
if c.StreamIdleTimeout != "" {
if _, err := time.ParseDuration(c.StreamIdleTimeout); err != nil {
return warnings, fmt.Errorf("invalid stream idle timeout: %w", err)
}
}
// Validation for drain_exec_sync_io_timeout // Validation for drain_exec_sync_io_timeout
if c.DrainExecSyncIOTimeout != "" { if c.DrainExecSyncIOTimeout != "" {
if _, err := time.ParseDuration(c.DrainExecSyncIOTimeout); err != nil { if _, err := time.ParseDuration(c.DrainExecSyncIOTimeout); err != nil {
@ -539,6 +537,18 @@ func ValidatePluginConfig(ctx context.Context, c *PluginConfig) ([]deprecation.W
return warnings, nil return warnings, nil
} }
// ValidateServerConfig validates the given server configuration.
func ValidateServerConfig(ctx context.Context, c *ServerConfig) ([]deprecation.Warning, error) {
var warnings []deprecation.Warning
// Validation for stream_idle_timeout
if c.StreamIdleTimeout != "" {
if _, err := time.ParseDuration(c.StreamIdleTimeout); err != nil {
return warnings, fmt.Errorf("invalid stream idle timeout: %w", err)
}
}
return warnings, nil
}
func (config *Config) GetSandboxRuntime(podSandboxConfig *runtime.PodSandboxConfig, runtimeHandler string) (Runtime, error) { func (config *Config) GetSandboxRuntime(podSandboxConfig *runtime.PodSandboxConfig, runtimeHandler string) (Runtime, error) {
if untrustedWorkload(podSandboxConfig) { if untrustedWorkload(podSandboxConfig) {
// If the untrusted annotation is provided, runtimeHandler MUST be empty. // If the untrusted annotation is provided, runtimeHandler MUST be empty.
@ -627,3 +637,17 @@ func getRuntimeOptionsType(t string) interface{} {
return &runtimeoptions.Options{} return &runtimeoptions.Options{}
} }
} }
func DefaultServerConfig() ServerConfig {
return ServerConfig{
DisableTCPService: true,
StreamServerAddress: "127.0.0.1",
StreamServerPort: "0",
StreamIdleTimeout: streaming.DefaultConfig.StreamIdleTimeout.String(), // 4 hour
EnableTLSStreaming: false,
X509KeyPairStreaming: X509KeyPairStreaming{
TLSKeyFile: "",
TLSCertFile: "",
},
}
}

View File

@ -28,7 +28,7 @@ import (
var kernelGreaterEqualThan = kernel.GreaterEqualThan var kernelGreaterEqualThan = kernel.GreaterEqualThan
func ValidateEnableUnprivileged(ctx context.Context, c *PluginConfig) error { func ValidateEnableUnprivileged(ctx context.Context, c *RuntimeConfig) error {
if c.EnableUnprivilegedICMP || c.EnableUnprivilegedPorts { if c.EnableUnprivilegedICMP || c.EnableUnprivilegedPorts {
fourDotEleven := kernel.KernelVersion{Kernel: 4, Major: 11} fourDotEleven := kernel.KernelVersion{Kernel: 4, Major: 11}
ok, err := kernelGreaterEqualThan(fourDotEleven) ok, err := kernelGreaterEqualThan(fourDotEleven)

View File

@ -32,13 +32,13 @@ func TestValidateEnableUnprivileged(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
config *PluginConfig config *RuntimeConfig
kernelGreater bool kernelGreater bool
expectedErr string expectedErr string
}{ }{
{ {
name: "disable unprivileged_icmp and unprivileged_port", name: "disable unprivileged_icmp and unprivileged_port",
config: &PluginConfig{ config: &RuntimeConfig{
ContainerdConfig: ContainerdConfig{ ContainerdConfig: ContainerdConfig{
DefaultRuntimeName: RuntimeDefault, DefaultRuntimeName: RuntimeDefault,
Runtimes: map[string]Runtime{ Runtimes: map[string]Runtime{
@ -54,7 +54,7 @@ func TestValidateEnableUnprivileged(t *testing.T) {
}, },
{ {
name: "enable unprivileged_icmp or unprivileged_port, but kernel version is smaller than 4.11", name: "enable unprivileged_icmp or unprivileged_port, but kernel version is smaller than 4.11",
config: &PluginConfig{ config: &RuntimeConfig{
ContainerdConfig: ContainerdConfig{ ContainerdConfig: ContainerdConfig{
DefaultRuntimeName: RuntimeDefault, DefaultRuntimeName: RuntimeDefault,
Runtimes: map[string]Runtime{ Runtimes: map[string]Runtime{
@ -71,7 +71,7 @@ func TestValidateEnableUnprivileged(t *testing.T) {
}, },
{ {
name: "enable unprivileged_icmp or unprivileged_port, but kernel version is greater than or equal 4.11", name: "enable unprivileged_icmp or unprivileged_port, but kernel version is greater than or equal 4.11",
config: &PluginConfig{ config: &RuntimeConfig{
ContainerdConfig: ContainerdConfig{ ContainerdConfig: ContainerdConfig{
DefaultRuntimeName: RuntimeDefault, DefaultRuntimeName: RuntimeDefault,
Runtimes: map[string]Runtime{ Runtimes: map[string]Runtime{

View File

@ -22,6 +22,6 @@ import (
"context" "context"
) )
func ValidateEnableUnprivileged(ctx context.Context, c *PluginConfig) error { func ValidateEnableUnprivileged(ctx context.Context, c *RuntimeConfig) error {
return nil return nil
} }

View File

@ -28,29 +28,32 @@ import (
func TestValidateConfig(t *testing.T) { func TestValidateConfig(t *testing.T) {
for desc, test := range map[string]struct { for desc, test := range map[string]struct {
config *PluginConfig runtimeConfig *RuntimeConfig
expectedErr string runtimeExpectedErr string
expected *PluginConfig runtimeExpected *RuntimeConfig
imageConfig *ImageConfig imageConfig *ImageConfig
imageExpectedErr string imageExpectedErr string
imageExpected *ImageConfig imageExpected *ImageConfig
warnings []deprecation.Warning serverConfig *ServerConfig
serverExpectedErr string
serverExpected *ServerConfig
warnings []deprecation.Warning
}{ }{
"no default_runtime_name": { "no default_runtime_name": {
config: &PluginConfig{}, runtimeConfig: &RuntimeConfig{},
expectedErr: "`default_runtime_name` is empty", runtimeExpectedErr: "`default_runtime_name` is empty",
}, },
"no runtime[default_runtime_name]": { "no runtime[default_runtime_name]": {
config: &PluginConfig{ runtimeConfig: &RuntimeConfig{
ContainerdConfig: ContainerdConfig{ ContainerdConfig: ContainerdConfig{
DefaultRuntimeName: RuntimeDefault, DefaultRuntimeName: RuntimeDefault,
}, },
}, },
expectedErr: "no corresponding runtime configured in `containerd.runtimes` for `containerd` `default_runtime_name = \"default\"", runtimeExpectedErr: "no corresponding runtime configured in `containerd.runtimes` for `containerd` `default_runtime_name = \"default\"",
}, },
"deprecated auths": { "deprecated auths": {
config: &PluginConfig{ runtimeConfig: &RuntimeConfig{
ContainerdConfig: ContainerdConfig{ ContainerdConfig: ContainerdConfig{
DefaultRuntimeName: RuntimeDefault, DefaultRuntimeName: RuntimeDefault,
Runtimes: map[string]Runtime{ Runtimes: map[string]Runtime{
@ -58,7 +61,7 @@ func TestValidateConfig(t *testing.T) {
}, },
}, },
}, },
expected: &PluginConfig{ runtimeExpected: &RuntimeConfig{
ContainerdConfig: ContainerdConfig{ ContainerdConfig: ContainerdConfig{
DefaultRuntimeName: RuntimeDefault, DefaultRuntimeName: RuntimeDefault,
Runtimes: map[string]Runtime{ Runtimes: map[string]Runtime{
@ -92,18 +95,10 @@ func TestValidateConfig(t *testing.T) {
warnings: []deprecation.Warning{deprecation.CRIRegistryAuths}, warnings: []deprecation.Warning{deprecation.CRIRegistryAuths},
}, },
"invalid stream_idle_timeout": { "invalid stream_idle_timeout": {
config: &PluginConfig{ serverConfig: &ServerConfig{
StreamIdleTimeout: "invalid", StreamIdleTimeout: "invalid",
ContainerdConfig: ContainerdConfig{
DefaultRuntimeName: RuntimeDefault,
Runtimes: map[string]Runtime{
RuntimeDefault: {
Type: "default",
},
},
},
}, },
expectedErr: "invalid stream idle timeout", serverExpectedErr: "invalid stream idle timeout",
}, },
"conflicting mirror registry config": { "conflicting mirror registry config": {
imageConfig: &ImageConfig{ imageConfig: &ImageConfig{
@ -117,7 +112,7 @@ func TestValidateConfig(t *testing.T) {
imageExpectedErr: "`mirrors` cannot be set when `config_path` is provided", imageExpectedErr: "`mirrors` cannot be set when `config_path` is provided",
}, },
"deprecated mirrors": { "deprecated mirrors": {
config: &PluginConfig{ runtimeConfig: &RuntimeConfig{
ContainerdConfig: ContainerdConfig{ ContainerdConfig: ContainerdConfig{
DefaultRuntimeName: RuntimeDefault, DefaultRuntimeName: RuntimeDefault,
Runtimes: map[string]Runtime{ Runtimes: map[string]Runtime{
@ -132,7 +127,7 @@ func TestValidateConfig(t *testing.T) {
}, },
}, },
}, },
expected: &PluginConfig{ runtimeExpected: &RuntimeConfig{
ContainerdConfig: ContainerdConfig{ ContainerdConfig: ContainerdConfig{
DefaultRuntimeName: RuntimeDefault, DefaultRuntimeName: RuntimeDefault,
Runtimes: map[string]Runtime{ Runtimes: map[string]Runtime{
@ -152,7 +147,7 @@ func TestValidateConfig(t *testing.T) {
warnings: []deprecation.Warning{deprecation.CRIRegistryMirrors}, warnings: []deprecation.Warning{deprecation.CRIRegistryMirrors},
}, },
"deprecated configs": { "deprecated configs": {
config: &PluginConfig{ runtimeConfig: &RuntimeConfig{
ContainerdConfig: ContainerdConfig{ ContainerdConfig: ContainerdConfig{
DefaultRuntimeName: RuntimeDefault, DefaultRuntimeName: RuntimeDefault,
Runtimes: map[string]Runtime{ Runtimes: map[string]Runtime{
@ -171,7 +166,7 @@ func TestValidateConfig(t *testing.T) {
}, },
}, },
}, },
expected: &PluginConfig{ runtimeExpected: &RuntimeConfig{
ContainerdConfig: ContainerdConfig{ ContainerdConfig: ContainerdConfig{
DefaultRuntimeName: RuntimeDefault, DefaultRuntimeName: RuntimeDefault,
Runtimes: map[string]Runtime{ Runtimes: map[string]Runtime{
@ -195,7 +190,7 @@ func TestValidateConfig(t *testing.T) {
warnings: []deprecation.Warning{deprecation.CRIRegistryConfigs}, warnings: []deprecation.Warning{deprecation.CRIRegistryConfigs},
}, },
"privileged_without_host_devices_all_devices_allowed without privileged_without_host_devices": { "privileged_without_host_devices_all_devices_allowed without privileged_without_host_devices": {
config: &PluginConfig{ runtimeConfig: &RuntimeConfig{
ContainerdConfig: ContainerdConfig{ ContainerdConfig: ContainerdConfig{
DefaultRuntimeName: RuntimeDefault, DefaultRuntimeName: RuntimeDefault,
Runtimes: map[string]Runtime{ Runtimes: map[string]Runtime{
@ -207,10 +202,10 @@ func TestValidateConfig(t *testing.T) {
}, },
}, },
}, },
expectedErr: "`privileged_without_host_devices_all_devices_allowed` requires `privileged_without_host_devices` to be enabled", runtimeExpectedErr: "`privileged_without_host_devices_all_devices_allowed` requires `privileged_without_host_devices` to be enabled",
}, },
"invalid drain_exec_sync_io_timeout input": { "invalid drain_exec_sync_io_timeout input": {
config: &PluginConfig{ runtimeConfig: &RuntimeConfig{
ContainerdConfig: ContainerdConfig{ ContainerdConfig: ContainerdConfig{
DefaultRuntimeName: RuntimeDefault, DefaultRuntimeName: RuntimeDefault,
Runtimes: map[string]Runtime{ Runtimes: map[string]Runtime{
@ -221,18 +216,18 @@ func TestValidateConfig(t *testing.T) {
}, },
DrainExecSyncIOTimeout: "10", DrainExecSyncIOTimeout: "10",
}, },
expectedErr: "invalid `drain_exec_sync_io_timeout`", runtimeExpectedErr: "invalid `drain_exec_sync_io_timeout`",
}, },
} { } {
t.Run(desc, func(t *testing.T) { t.Run(desc, func(t *testing.T) {
var warnings []deprecation.Warning var warnings []deprecation.Warning
if test.config != nil { if test.runtimeConfig != nil {
w, err := ValidatePluginConfig(context.Background(), test.config) w, err := ValidateRuntimeConfig(context.Background(), test.runtimeConfig)
if test.expectedErr != "" { if test.runtimeExpectedErr != "" {
assert.Contains(t, err.Error(), test.expectedErr) assert.Contains(t, err.Error(), test.runtimeExpectedErr)
} else { } else {
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, test.expected, test.config) assert.Equal(t, test.runtimeExpected, test.runtimeConfig)
} }
warnings = append(warnings, w...) warnings = append(warnings, w...)
} }
@ -246,6 +241,16 @@ func TestValidateConfig(t *testing.T) {
} }
warnings = append(warnings, w...) warnings = append(warnings, w...)
} }
if test.serverConfig != nil {
w, err := ValidateServerConfig(context.Background(), test.serverConfig)
if test.serverExpectedErr != "" {
assert.Contains(t, err.Error(), test.serverExpectedErr)
} else {
assert.NoError(t, err)
assert.Equal(t, test.serverExpected, test.serverConfig)
}
warnings = append(warnings, w...)
}
if len(test.warnings) > 0 { if len(test.warnings) > 0 {
assert.ElementsMatch(t, test.warnings, warnings) assert.ElementsMatch(t, test.warnings, warnings)

View File

@ -21,7 +21,6 @@ package config
import ( import (
"github.com/containerd/containerd/v2/defaults" "github.com/containerd/containerd/v2/defaults"
"github.com/pelletier/go-toml/v2" "github.com/pelletier/go-toml/v2"
"k8s.io/kubelet/pkg/cri/streaming"
) )
func DefaultImageConfig() ImageConfig { func DefaultImageConfig() ImageConfig {
@ -41,8 +40,8 @@ func DefaultImageConfig() ImageConfig {
} }
} }
// DefaultConfig returns default configurations of cri plugin. // DefaultRuntimeConfig returns default configurations of cri plugin.
func DefaultConfig() PluginConfig { func DefaultRuntimeConfig() RuntimeConfig {
defaultRuncV2Opts := ` defaultRuncV2Opts := `
# NoNewKeyring disables new keyring for the container. # NoNewKeyring disables new keyring for the container.
NoNewKeyring = false NoNewKeyring = false
@ -71,7 +70,7 @@ func DefaultConfig() PluginConfig {
var m map[string]interface{} var m map[string]interface{}
toml.Unmarshal([]byte(defaultRuncV2Opts), &m) toml.Unmarshal([]byte(defaultRuncV2Opts), &m)
return PluginConfig{ return RuntimeConfig{
CniConfig: CniConfig{ CniConfig: CniConfig{
NetworkPluginBinDir: "/opt/cni/bin", NetworkPluginBinDir: "/opt/cni/bin",
NetworkPluginConfDir: "/etc/cni/net.d", NetworkPluginConfDir: "/etc/cni/net.d",
@ -89,17 +88,8 @@ func DefaultConfig() PluginConfig {
}, },
}, },
}, },
DisableTCPService: true, EnableSelinux: false,
StreamServerAddress: "127.0.0.1", SelinuxCategoryRange: 1024,
StreamServerPort: "0",
StreamIdleTimeout: streaming.DefaultConfig.StreamIdleTimeout.String(), // 4 hour
EnableSelinux: false,
SelinuxCategoryRange: 1024,
EnableTLSStreaming: false,
X509KeyPairStreaming: X509KeyPairStreaming{
TLSKeyFile: "",
TLSCertFile: "",
},
MaxContainerLogLineSize: 16 * 1024, MaxContainerLogLineSize: 16 * 1024,
DisableProcMount: false, DisableProcMount: false,
TolerateMissingHugetlbController: true, TolerateMissingHugetlbController: true,

View File

@ -21,7 +21,6 @@ import (
"path/filepath" "path/filepath"
"github.com/containerd/containerd/v2/defaults" "github.com/containerd/containerd/v2/defaults"
"k8s.io/kubelet/pkg/cri/streaming"
) )
func DefaultImageConfig() ImageConfig { func DefaultImageConfig() ImageConfig {
@ -39,9 +38,9 @@ func DefaultImageConfig() ImageConfig {
} }
} }
// DefaultConfig returns default configurations of cri plugin. // DefaultRuntimeConfig returns default configurations of cri plugin.
func DefaultConfig() PluginConfig { func DefaultRuntimeConfig() RuntimeConfig {
return PluginConfig{ return RuntimeConfig{
CniConfig: CniConfig{ CniConfig: CniConfig{
NetworkPluginBinDir: filepath.Join(os.Getenv("ProgramFiles"), "containerd", "cni", "bin"), NetworkPluginBinDir: filepath.Join(os.Getenv("ProgramFiles"), "containerd", "cni", "bin"),
NetworkPluginConfDir: filepath.Join(os.Getenv("ProgramFiles"), "containerd", "cni", "conf"), NetworkPluginConfDir: filepath.Join(os.Getenv("ProgramFiles"), "containerd", "cni", "conf"),
@ -78,15 +77,6 @@ func DefaultConfig() PluginConfig {
}, },
}, },
}, },
DisableTCPService: true,
StreamServerAddress: "127.0.0.1",
StreamServerPort: "0",
StreamIdleTimeout: streaming.DefaultConfig.StreamIdleTimeout.String(), // 4 hour
EnableTLSStreaming: false,
X509KeyPairStreaming: X509KeyPairStreaming{
TLSKeyFile: "",
TLSCertFile: "",
},
MaxContainerLogLineSize: 16 * 1024, MaxContainerLogLineSize: 16 * 1024,
IgnoreImageDefinedVolumes: false, IgnoreImageDefinedVolumes: false,
// TODO(windows): Add platform specific config, so that most common defaults can be shared. // TODO(windows): Add platform specific config, so that most common defaults can be shared.

163
pkg/cri/config/streaming.go Normal file
View File

@ -0,0 +1,163 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"crypto/tls"
"errors"
"fmt"
"net"
"os"
"time"
k8snet "k8s.io/apimachinery/pkg/util/net"
k8scert "k8s.io/client-go/util/cert"
"k8s.io/kubelet/pkg/cri/streaming"
)
type streamListenerMode int
const (
x509KeyPairTLS streamListenerMode = iota
selfSignTLS
withoutTLS
)
func getStreamListenerMode(config *ServerConfig) (streamListenerMode, error) {
if config.EnableTLSStreaming {
if config.X509KeyPairStreaming.TLSCertFile != "" && config.X509KeyPairStreaming.TLSKeyFile != "" {
return x509KeyPairTLS, nil
}
if config.X509KeyPairStreaming.TLSCertFile != "" && config.X509KeyPairStreaming.TLSKeyFile == "" {
return -1, errors.New("must set X509KeyPairStreaming.TLSKeyFile")
}
if config.X509KeyPairStreaming.TLSCertFile == "" && config.X509KeyPairStreaming.TLSKeyFile != "" {
return -1, errors.New("must set X509KeyPairStreaming.TLSCertFile")
}
return selfSignTLS, nil
}
if config.X509KeyPairStreaming.TLSCertFile != "" {
return -1, errors.New("X509KeyPairStreaming.TLSCertFile is set but EnableTLSStreaming is not set")
}
if config.X509KeyPairStreaming.TLSKeyFile != "" {
return -1, errors.New("X509KeyPairStreaming.TLSKeyFile is set but EnableTLSStreaming is not set")
}
return withoutTLS, nil
}
func (c *ServerConfig) StreamingConfig() (streaming.Config, error) {
var (
addr = c.StreamServerAddress
port = c.StreamServerPort
streamIdleTimeout = c.StreamIdleTimeout
)
if addr == "" {
a, err := k8snet.ResolveBindAddress(nil)
if err != nil {
return streaming.Config{}, fmt.Errorf("failed to get stream server address: %w", err)
}
addr = a.String()
}
config := streaming.DefaultConfig
if streamIdleTimeout != "" {
var err error
config.StreamIdleTimeout, err = time.ParseDuration(streamIdleTimeout)
if err != nil {
return streaming.Config{}, fmt.Errorf("invalid stream idle timeout: %w", err)
}
}
config.Addr = net.JoinHostPort(addr, port)
tlsMode, err := getStreamListenerMode(c)
if err != nil {
return streaming.Config{}, fmt.Errorf("invalid stream server configuration: %w", err)
}
switch tlsMode {
case x509KeyPairTLS:
tlsCert, err := tls.LoadX509KeyPair(c.X509KeyPairStreaming.TLSCertFile, c.X509KeyPairStreaming.TLSKeyFile)
if err != nil {
return streaming.Config{}, fmt.Errorf("failed to load x509 key pair for stream server: %w", err)
}
config.TLSConfig = &tls.Config{
Certificates: []tls.Certificate{tlsCert},
}
case selfSignTLS:
tlsCert, err := newTLSCert()
if err != nil {
return streaming.Config{}, fmt.Errorf("failed to generate tls certificate for stream server: %w", err)
}
config.TLSConfig = &tls.Config{
Certificates: []tls.Certificate{tlsCert},
}
case withoutTLS:
default:
return streaming.Config{}, errors.New("invalid configuration for the stream listener")
}
return config, nil
}
// newTLSCert returns a self CA signed tls.certificate.
// TODO (mikebrow): replace / rewrite this function to support using CA
// signing of the certificate. Requires a security plan for kubernetes regarding
// CRI connections / streaming, etc. For example, kubernetes could configure or
// require a CA service and pass a configuration down through CRI.
func newTLSCert() (tls.Certificate, error) {
fail := func(err error) (tls.Certificate, error) { return tls.Certificate{}, err }
hostName, err := os.Hostname()
if err != nil {
return fail(fmt.Errorf("failed to get hostname: %w", err))
}
addrs, err := net.InterfaceAddrs()
if err != nil {
return fail(fmt.Errorf("failed to get host IP addresses: %w", err))
}
var alternateIPs []net.IP
var alternateDNS []string
for _, addr := range addrs {
var ip net.IP
switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
default:
continue
}
alternateIPs = append(alternateIPs, ip)
alternateDNS = append(alternateDNS, ip.String())
}
// Generate a self signed certificate key (CA is self)
certPem, keyPem, err := k8scert.GenerateSelfSignedCertKey(hostName, alternateIPs, alternateDNS)
if err != nil {
return fail(fmt.Errorf("certificate key could not be created: %w", err))
}
// Load the tls certificate
tlsCert, err := tls.X509KeyPair(certPem, keyPem)
if err != nil {
return fail(fmt.Errorf("certificate could not be loaded: %w", err))
}
return tlsCert, nil
}

View File

@ -0,0 +1,130 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestValidateStreamServer(t *testing.T) {
for _, test := range []struct {
desc string
config ServerConfig
tlsMode streamListenerMode
expectErr bool
}{
{
desc: "should pass with default withoutTLS",
config: DefaultServerConfig(),
tlsMode: withoutTLS,
expectErr: false,
},
{
desc: "should pass with x509KeyPairTLS",
config: ServerConfig{
EnableTLSStreaming: true,
X509KeyPairStreaming: X509KeyPairStreaming{
TLSKeyFile: "non-empty",
TLSCertFile: "non-empty",
},
},
tlsMode: x509KeyPairTLS,
expectErr: false,
},
{
desc: "should pass with selfSign",
config: ServerConfig{
EnableTLSStreaming: true,
},
tlsMode: selfSignTLS,
expectErr: false,
},
{
desc: "should return error with X509 keypair but not EnableTLSStreaming",
config: ServerConfig{
EnableTLSStreaming: false,
X509KeyPairStreaming: X509KeyPairStreaming{
TLSKeyFile: "non-empty",
TLSCertFile: "non-empty",
},
},
tlsMode: -1,
expectErr: true,
},
{
desc: "should return error with X509 TLSCertFile empty",
config: ServerConfig{
EnableTLSStreaming: true,
X509KeyPairStreaming: X509KeyPairStreaming{
TLSKeyFile: "non-empty",
TLSCertFile: "",
},
},
tlsMode: -1,
expectErr: true,
},
{
desc: "should return error with X509 TLSKeyFile empty",
config: ServerConfig{
EnableTLSStreaming: true,
X509KeyPairStreaming: X509KeyPairStreaming{
TLSKeyFile: "",
TLSCertFile: "non-empty",
},
},
tlsMode: -1,
expectErr: true,
},
{
desc: "should return error without EnableTLSStreaming and only TLSCertFile set",
config: ServerConfig{
EnableTLSStreaming: false,
X509KeyPairStreaming: X509KeyPairStreaming{
TLSKeyFile: "",
TLSCertFile: "non-empty",
},
},
tlsMode: -1,
expectErr: true,
},
{
desc: "should return error without EnableTLSStreaming and only TLSKeyFile set",
config: ServerConfig{
EnableTLSStreaming: false,
X509KeyPairStreaming: X509KeyPairStreaming{
TLSKeyFile: "non-empty",
TLSCertFile: "",
},
},
tlsMode: -1,
expectErr: true,
},
} {
test := test
t.Run(test.desc, func(t *testing.T) {
tlsMode, err := getStreamListenerMode(&test.config)
if test.expectErr {
assert.Error(t, err)
return
}
assert.NoError(t, err)
assert.Equal(t, test.tlsMode, tlsMode)
})
}
}

View File

@ -394,9 +394,9 @@ func (c *criService) runtimeSpec(id string, platform platforms.Platform, baseSpe
container := &containers.Container{ID: id} container := &containers.Container{ID: id}
if baseSpecFile != "" { if baseSpecFile != "" {
baseSpec, ok := c.baseOCISpecs[baseSpecFile] baseSpec, err := c.LoadOCISpec(baseSpecFile)
if !ok { if err != nil {
return nil, fmt.Errorf("can't find base OCI spec %q", baseSpecFile) return nil, fmt.Errorf("can't load base OCI spec %q: %w", baseSpecFile, err)
} }
spec := oci.Spec{} spec := oci.Spec{}

View File

@ -1680,23 +1680,24 @@ func TestPrivilegedDevices(t *testing.T) {
} }
func TestBaseOCISpec(t *testing.T) { func TestBaseOCISpec(t *testing.T) {
c := newTestCRIService()
baseLimit := int64(100) baseLimit := int64(100)
c.baseOCISpecs = map[string]*oci.Spec{ c := newTestCRIService(withRuntimeService(&fakeRuntimeService{
"/etc/containerd/cri-base.json": { ocispecs: map[string]*oci.Spec{
Process: &runtimespec.Process{ "/etc/containerd/cri-base.json": {
User: runtimespec.User{AdditionalGids: []uint32{9999}}, Process: &runtimespec.Process{
Capabilities: &runtimespec.LinuxCapabilities{ User: runtimespec.User{AdditionalGids: []uint32{9999}},
Permitted: []string{"CAP_SETUID"}, Capabilities: &runtimespec.LinuxCapabilities{
Permitted: []string{"CAP_SETUID"},
},
}, },
}, Linux: &runtimespec.Linux{
Linux: &runtimespec.Linux{ Resources: &runtimespec.LinuxResources{
Resources: &runtimespec.LinuxResources{ Memory: &runtimespec.LinuxMemory{Limit: &baseLimit}, // Will be overwritten by `getCreateContainerTestData`
Memory: &runtimespec.LinuxMemory{Limit: &baseLimit}, // Will be overwritten by `getCreateContainerTestData` },
}, },
}, },
}, },
} }))
ociRuntime := config.Runtime{} ociRuntime := config.Runtime{}
ociRuntime.BaseRuntimeSpec = "/etc/containerd/cri-base.json" ociRuntime.BaseRuntimeSpec = "/etc/containerd/cri-base.json"

View File

@ -524,13 +524,14 @@ func TestContainerAnnotationPassthroughContainerSpec(t *testing.T) {
} }
func TestBaseRuntimeSpec(t *testing.T) { func TestBaseRuntimeSpec(t *testing.T) {
c := newTestCRIService() c := newTestCRIService(withRuntimeService(&fakeRuntimeService{
c.baseOCISpecs = map[string]*oci.Spec{ ocispecs: map[string]*oci.Spec{
"/etc/containerd/cri-base.json": { "/etc/containerd/cri-base.json": {
Version: "1.0.2", Version: "1.0.2",
Hostname: "old", Hostname: "old",
},
}, },
} }))
out, err := c.runtimeSpec( out, err := c.runtimeSpec(
"id1", "id1",
@ -546,8 +547,10 @@ func TestBaseRuntimeSpec(t *testing.T) {
assert.Equal(t, "new-domain", out.Domainname) assert.Equal(t, "new-domain", out.Domainname)
// Make sure original base spec not changed // Make sure original base spec not changed
assert.NotEqual(t, out, c.baseOCISpecs["/etc/containerd/cri-base.json"]) spec, err := c.LoadOCISpec("/etc/containerd/cri-base.json")
assert.Equal(t, c.baseOCISpecs["/etc/containerd/cri-base.json"].Hostname, "old") assert.NoError(t, err)
assert.NotEqual(t, out, spec)
assert.Equal(t, spec.Hostname, "old")
assert.Equal(t, filepath.Join("/", constants.K8sContainerdNamespace, "id1"), out.Linux.CgroupsPath) assert.Equal(t, filepath.Join("/", constants.K8sContainerdNamespace, "id1"), out.Linux.CgroupsPath)
} }

View File

@ -239,7 +239,7 @@ func TestUpdateOCILinuxResource(t *testing.T) {
test := test test := test
t.Run(test.desc, func(t *testing.T) { t.Run(test.desc, func(t *testing.T) {
config := criconfig.Config{ config := criconfig.Config{
PluginConfig: criconfig.PluginConfig{ RuntimeConfig: criconfig.RuntimeConfig{
TolerateMissingHugetlbController: true, TolerateMissingHugetlbController: true,
DisableHugetlbController: false, DisableHugetlbController: false,
}, },

View File

@ -31,7 +31,6 @@ import (
"github.com/containerd/containerd/v2/core/sandbox" "github.com/containerd/containerd/v2/core/sandbox"
criconfig "github.com/containerd/containerd/v2/pkg/cri/config" criconfig "github.com/containerd/containerd/v2/pkg/cri/config"
"github.com/containerd/containerd/v2/pkg/cri/constants" "github.com/containerd/containerd/v2/pkg/cri/constants"
"github.com/containerd/containerd/v2/pkg/cri/server/base"
"github.com/containerd/containerd/v2/pkg/cri/server/podsandbox/types" "github.com/containerd/containerd/v2/pkg/cri/server/podsandbox/types"
imagestore "github.com/containerd/containerd/v2/pkg/cri/store/image" imagestore "github.com/containerd/containerd/v2/pkg/cri/store/image"
ctrdutil "github.com/containerd/containerd/v2/pkg/cri/util" ctrdutil "github.com/containerd/containerd/v2/pkg/cri/util"
@ -51,8 +50,7 @@ func init() {
plugins.EventPlugin, plugins.EventPlugin,
plugins.LeasePlugin, plugins.LeasePlugin,
plugins.SandboxStorePlugin, plugins.SandboxStorePlugin,
plugins.InternalPlugin, plugins.CRIServicePlugin,
plugins.CRIImagePlugin,
plugins.ServicePlugin, plugins.ServicePlugin,
}, },
InitFn: func(ic *plugin.InitContext) (interface{}, error) { InitFn: func(ic *plugin.InitContext) (interface{}, error) {
@ -66,26 +64,26 @@ func init() {
return nil, fmt.Errorf("unable to init client for podsandbox: %w", err) return nil, fmt.Errorf("unable to init client for podsandbox: %w", err)
} }
// Get base CRI dependencies. // Get runtime service.
criBasePlugin, err := ic.GetByID(plugins.InternalPlugin, "cri") criRuntimePlugin, err := ic.GetByID(plugins.CRIServicePlugin, "runtime")
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to load CRI service base dependencies: %w", err) return nil, fmt.Errorf("unable to load CRI runtime service plugin dependency: %w", err)
} }
criBase := criBasePlugin.(*base.CRIBase) runtimeService := criRuntimePlugin.(RuntimeService)
// Get image service. // Get image service.
criImagePlugin, err := ic.GetSingle(plugins.CRIImagePlugin) criImagePlugin, err := ic.GetByID(plugins.CRIServicePlugin, "images")
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to load CRI image service plugin dependency: %w", err) return nil, fmt.Errorf("unable to load CRI image service plugin dependency: %w", err)
} }
c := Controller{ c := Controller{
client: client, client: client,
config: criBase.Config, config: runtimeService.Config(),
os: osinterface.RealOS{}, os: osinterface.RealOS{},
baseOCISpecs: criBase.BaseOCISpecs, runtimeService: runtimeService,
imageService: criImagePlugin.(ImageService), imageService: criImagePlugin.(ImageService),
store: NewStore(), store: NewStore(),
} }
return &c, nil return &c, nil
}, },
@ -99,6 +97,12 @@ type CRIService interface {
BackOffEvent(id string, event interface{}) BackOffEvent(id string, event interface{})
} }
// RuntimeService specifies dependencies to CRI runtime service.
type RuntimeService interface {
Config() criconfig.Config
LoadOCISpec(string) (*oci.Spec, error)
}
// ImageService specifies dependencies to CRI image service. // ImageService specifies dependencies to CRI image service.
type ImageService interface { type ImageService interface {
LocalResolve(refOrID string) (imagestore.Image, error) LocalResolve(refOrID string) (imagestore.Image, error)
@ -113,14 +117,14 @@ type Controller struct {
config criconfig.Config config criconfig.Config
// client is an instance of the containerd client // client is an instance of the containerd client
client *containerd.Client client *containerd.Client
// runtimeService is a dependency to CRI runtime service.
runtimeService RuntimeService
// imageService is a dependency to CRI image service. // imageService is a dependency to CRI image service.
imageService ImageService imageService ImageService
// os is an interface for all required os operations. // os is an interface for all required os operations.
os osinterface.OS os osinterface.OS
// cri is CRI service that provides missing gaps needed by controller. // cri is CRI service that provides missing gaps needed by controller.
cri CRIService cri CRIService
// baseOCISpecs contains cached OCI specs loaded via `Runtime.BaseRuntimeSpec`
baseOCISpecs map[string]*oci.Spec
store *Store store *Store
} }

View File

@ -38,7 +38,7 @@ const (
var testConfig = criconfig.Config{ var testConfig = criconfig.Config{
RootDir: testRootDir, RootDir: testRootDir,
StateDir: testStateDir, StateDir: testStateDir,
PluginConfig: criconfig.PluginConfig{ RuntimeConfig: criconfig.RuntimeConfig{
TolerateMissingHugetlbController: true, TolerateMissingHugetlbController: true,
}, },
} }

View File

@ -159,9 +159,9 @@ func (c *Controller) runtimeSpec(id string, baseSpecFile string, opts ...oci.Spe
container := &containers.Container{ID: id} container := &containers.Container{ID: id}
if baseSpecFile != "" { if baseSpecFile != "" {
baseSpec, ok := c.baseOCISpecs[baseSpecFile] baseSpec, err := c.runtimeService.LoadOCISpec(baseSpecFile)
if !ok { if err != nil {
return nil, fmt.Errorf("can't find base OCI spec %q", baseSpecFile) return nil, fmt.Errorf("can't load base OCI spec %q: %w", baseSpecFile, err)
} }
spec := oci.Spec{} spec := oci.Spec{}

View File

@ -26,8 +26,8 @@ import (
containerd "github.com/containerd/containerd/v2/client" containerd "github.com/containerd/containerd/v2/client"
"github.com/containerd/containerd/v2/core/containers" "github.com/containerd/containerd/v2/core/containers"
"github.com/containerd/containerd/v2/core/sandbox" "github.com/containerd/containerd/v2/core/sandbox"
"github.com/containerd/containerd/v2/pkg/cri/server/base"
"github.com/containerd/containerd/v2/pkg/cri/server/podsandbox/types" "github.com/containerd/containerd/v2/pkg/cri/server/podsandbox/types"
critypes "github.com/containerd/containerd/v2/pkg/cri/types"
"github.com/containerd/errdefs" "github.com/containerd/errdefs"
) )
@ -63,7 +63,7 @@ func (c *Controller) Status(ctx context.Context, sandboxID string, verbose bool)
// toCRISandboxInfo converts internal container object information to CRI sandbox status response info map. // toCRISandboxInfo converts internal container object information to CRI sandbox status response info map.
func toCRISandboxInfo(ctx context.Context, sb *types.PodSandbox) (map[string]string, error) { func toCRISandboxInfo(ctx context.Context, sb *types.PodSandbox) (map[string]string, error) {
si := &base.SandboxInfo{ si := &critypes.SandboxInfo{
Pid: sb.Pid, Pid: sb.Pid,
Config: sb.Metadata.Config, Config: sb.Metadata.Config,
RuntimeHandler: sb.Metadata.RuntimeHandler, RuntimeHandler: sb.Metadata.RuntimeHandler,

View File

@ -94,8 +94,8 @@ func TestRuntimeConfig(t *testing.T) {
test := test test := test
t.Run(test.desc, func(t *testing.T) { t.Run(test.desc, func(t *testing.T) {
c := newTestCRIService() c := newTestCRIService()
c.config.PluginConfig.ContainerdConfig.DefaultRuntimeName = test.defaultRuntime c.config.RuntimeConfig.ContainerdConfig.DefaultRuntimeName = test.defaultRuntime
c.config.PluginConfig.ContainerdConfig.Runtimes = test.runtimes c.config.RuntimeConfig.ContainerdConfig.Runtimes = test.runtimes
resp, err := c.RuntimeConfig(context.TODO(), &runtime.RuntimeConfigRequest{}) resp, err := c.RuntimeConfig(context.TODO(), &runtime.RuntimeConfigRequest{})
assert.NoError(t, err) assert.NoError(t, err)

View File

@ -24,8 +24,8 @@ import (
runtime "k8s.io/cri-api/pkg/apis/runtime/v1" runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
"github.com/containerd/containerd/v2/pkg/cri/server/base"
sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox" sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox"
"github.com/containerd/containerd/v2/pkg/cri/types"
"github.com/containerd/errdefs" "github.com/containerd/errdefs"
) )
@ -152,7 +152,7 @@ func toCRISandboxStatus(meta sandboxstore.Metadata, status string, createdAt tim
// but if controller.Status() returns a NotFound error, // but if controller.Status() returns a NotFound error,
// we should fallback to get SandboxInfo from cached sandbox itself. // we should fallback to get SandboxInfo from cached sandbox itself.
func toDeletedCRISandboxInfo(sandbox sandboxstore.Sandbox) (map[string]string, error) { func toDeletedCRISandboxInfo(sandbox sandboxstore.Sandbox) (map[string]string, error) {
si := &base.SandboxInfo{ si := &types.SandboxInfo{
Pid: sandbox.Status.Get().Pid, Pid: sandbox.Status.Get().Pid,
Config: sandbox.Config, Config: sandbox.Config,
RuntimeHandler: sandbox.RuntimeHandler, RuntimeHandler: sandbox.RuntimeHandler,

View File

@ -65,6 +65,15 @@ type sandboxService interface {
SandboxController(config *runtime.PodSandboxConfig, runtimeHandler string) (sandbox.Controller, error) SandboxController(config *runtime.PodSandboxConfig, runtimeHandler string) (sandbox.Controller, error)
} }
// RuntimeService specifies dependencies to runtime service which provides
// the runtime configuration and OCI spec loading.
type RuntimeService interface {
Config() criconfig.Config
// LoadCISpec loads cached OCI specs via `Runtime.BaseRuntimeSpec`
LoadOCISpec(string) (*oci.Spec, error)
}
// ImageService specifies dependencies to image service. // ImageService specifies dependencies to image service.
type ImageService interface { type ImageService interface {
RuntimeSnapshotter(ctx context.Context, ociRuntime criconfig.Runtime) string RuntimeSnapshotter(ctx context.Context, ociRuntime criconfig.Runtime) string
@ -84,6 +93,7 @@ type ImageService interface {
// criService implements CRIService. // criService implements CRIService.
type criService struct { type criService struct {
RuntimeService
ImageService ImageService
// config contains all configurations. // config contains all configurations.
config criconfig.Config config criconfig.Config
@ -115,8 +125,6 @@ type criService struct {
// cniNetConfMonitor is used to reload cni network conf if there is // cniNetConfMonitor is used to reload cni network conf if there is
// any valid fs change events from cni network conf dir. // any valid fs change events from cni network conf dir.
cniNetConfMonitor map[string]*cniNetConfSyncer cniNetConfMonitor map[string]*cniNetConfSyncer
// baseOCISpecs contains cached OCI specs loaded via `Runtime.BaseRuntimeSpec`
baseOCISpecs map[string]*oci.Spec
// allCaps is the list of the capabilities. // allCaps is the list of the capabilities.
// When nil, parsed from CapEff of /proc/self/status. // When nil, parsed from CapEff of /proc/self/status.
allCaps []string //nolint:nolintlint,unused // Ignore on non-Linux allCaps []string //nolint:nolintlint,unused // Ignore on non-Linux
@ -130,16 +138,17 @@ type criService struct {
} }
type CRIServiceOptions struct { type CRIServiceOptions struct {
RuntimeService RuntimeService
ImageService ImageService ImageService ImageService
StreamingConfig streaming.Config
NRI *nri.API NRI *nri.API
// SandboxControllers is a map of all the loaded sandbox controllers // SandboxControllers is a map of all the loaded sandbox controllers
SandboxControllers map[string]sandbox.Controller SandboxControllers map[string]sandbox.Controller
// BaseOCISpecs contains cached OCI specs loaded via `Runtime.BaseRuntimeSpec`
BaseOCISpecs map[string]*oci.Spec
// Client is the base containerd client used for accessing services, // Client is the base containerd client used for accessing services,
// //
// TODO: Replace this gradually with directly configured instances // TODO: Replace this gradually with directly configured instances
@ -147,18 +156,18 @@ type CRIServiceOptions struct {
} }
// NewCRIService returns a new instance of CRIService // NewCRIService returns a new instance of CRIService
// TODO: Add criBase.BaseOCISpecs to options func NewCRIService(options *CRIServiceOptions) (CRIService, runtime.RuntimeServiceServer, error) {
func NewCRIService(config criconfig.Config, options *CRIServiceOptions) (CRIService, runtime.RuntimeServiceServer, error) {
var err error var err error
labels := label.NewStore() labels := label.NewStore()
config := options.RuntimeService.Config()
c := &criService{ c := &criService{
RuntimeService: options.RuntimeService,
ImageService: options.ImageService, ImageService: options.ImageService,
config: config, config: config,
client: options.Client, client: options.Client,
imageFSPaths: options.ImageService.ImageFSPaths(), imageFSPaths: options.ImageService.ImageFSPaths(),
os: osinterface.RealOS{}, os: osinterface.RealOS{},
baseOCISpecs: options.BaseOCISpecs,
sandboxStore: sandboxstore.NewStore(labels), sandboxStore: sandboxstore.NewStore(labels),
containerStore: containerstore.NewStore(labels), containerStore: containerstore.NewStore(labels),
sandboxNameIndex: registrar.NewRegistrar(), sandboxNameIndex: registrar.NewRegistrar(),
@ -182,7 +191,7 @@ func NewCRIService(config criconfig.Config, options *CRIServiceOptions) (CRIServ
} }
// prepare streaming server // prepare streaming server
c.streamServer, err = newStreamServer(c, config.StreamServerAddress, config.StreamServerPort, config.StreamIdleTimeout) c.streamServer, err = streaming.NewServer(options.StreamingConfig, newStreamRuntime(c))
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("failed to create stream server: %w", err) return nil, nil, fmt.Errorf("failed to create stream server: %w", err)
} }

View File

@ -25,10 +25,12 @@ import (
"github.com/containerd/containerd/v2/api/types" "github.com/containerd/containerd/v2/api/types"
"github.com/containerd/containerd/v2/core/sandbox" "github.com/containerd/containerd/v2/core/sandbox"
"github.com/containerd/containerd/v2/internal/registrar" "github.com/containerd/containerd/v2/internal/registrar"
criconfig "github.com/containerd/containerd/v2/pkg/cri/config"
containerstore "github.com/containerd/containerd/v2/pkg/cri/store/container" containerstore "github.com/containerd/containerd/v2/pkg/cri/store/container"
"github.com/containerd/containerd/v2/pkg/cri/store/label" "github.com/containerd/containerd/v2/pkg/cri/store/label"
sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox" sandboxstore "github.com/containerd/containerd/v2/pkg/cri/store/sandbox"
servertesting "github.com/containerd/containerd/v2/pkg/cri/testing" servertesting "github.com/containerd/containerd/v2/pkg/cri/testing"
"github.com/containerd/containerd/v2/pkg/oci"
ostesting "github.com/containerd/containerd/v2/pkg/os/testing" ostesting "github.com/containerd/containerd/v2/pkg/os/testing"
"github.com/containerd/errdefs" "github.com/containerd/errdefs"
"github.com/containerd/platforms" "github.com/containerd/platforms"
@ -74,11 +76,34 @@ func (f fakeSandboxController) Metrics(ctx context.Context, sandboxID string) (*
return &types.Metric{}, errdefs.ErrNotImplemented return &types.Metric{}, errdefs.ErrNotImplemented
} }
type fakeRuntimeService struct {
ocispecs map[string]*oci.Spec
}
func (f fakeRuntimeService) Config() criconfig.Config {
return testConfig
}
func (f fakeRuntimeService) LoadOCISpec(filename string) (*oci.Spec, error) {
spec, ok := f.ocispecs[filename]
if !ok {
return nil, errdefs.ErrNotFound
}
return spec, nil
}
type testOpt func(*criService)
func withRuntimeService(rs RuntimeService) testOpt {
return func(service *criService) {
service.RuntimeService = rs
}
}
// newTestCRIService creates a fake criService for test. // newTestCRIService creates a fake criService for test.
func newTestCRIService() *criService { func newTestCRIService(opts ...testOpt) *criService {
labels := label.NewStore() labels := label.NewStore()
return &criService{ service := &criService{
ImageService: &fakeImageService{},
config: testConfig, config: testConfig,
os: ostesting.NewFakeOS(), os: ostesting.NewFakeOS(),
sandboxStore: sandboxstore.NewStore(labels), sandboxStore: sandboxstore.NewStore(labels),
@ -90,4 +115,15 @@ func newTestCRIService() *criService {
}, },
sandboxService: &fakeSandboxService{}, sandboxService: &fakeSandboxService{},
} }
for _, opt := range opts {
opt(service)
}
if service.RuntimeService == nil {
service.RuntimeService = &fakeRuntimeService{}
}
if service.ImageService == nil {
service.ImageService = &fakeImageService{}
}
return service
} }

View File

@ -18,104 +18,18 @@ package server
import ( import (
"context" "context"
"crypto/tls"
"errors"
"fmt" "fmt"
"io" "io"
"math" "math"
"net"
"os"
"time"
k8snet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/remotecommand" "k8s.io/client-go/tools/remotecommand"
k8scert "k8s.io/client-go/util/cert"
"k8s.io/utils/exec" "k8s.io/utils/exec"
ctrdutil "github.com/containerd/containerd/v2/pkg/cri/util" ctrdutil "github.com/containerd/containerd/v2/pkg/cri/util"
"k8s.io/kubelet/pkg/cri/streaming" "k8s.io/kubelet/pkg/cri/streaming"
) )
type streamListenerMode int
const (
x509KeyPairTLS streamListenerMode = iota
selfSignTLS
withoutTLS
)
func getStreamListenerMode(c *criService) (streamListenerMode, error) {
if c.config.EnableTLSStreaming {
if c.config.X509KeyPairStreaming.TLSCertFile != "" && c.config.X509KeyPairStreaming.TLSKeyFile != "" {
return x509KeyPairTLS, nil
}
if c.config.X509KeyPairStreaming.TLSCertFile != "" && c.config.X509KeyPairStreaming.TLSKeyFile == "" {
return -1, errors.New("must set X509KeyPairStreaming.TLSKeyFile")
}
if c.config.X509KeyPairStreaming.TLSCertFile == "" && c.config.X509KeyPairStreaming.TLSKeyFile != "" {
return -1, errors.New("must set X509KeyPairStreaming.TLSCertFile")
}
return selfSignTLS, nil
}
if c.config.X509KeyPairStreaming.TLSCertFile != "" {
return -1, errors.New("X509KeyPairStreaming.TLSCertFile is set but EnableTLSStreaming is not set")
}
if c.config.X509KeyPairStreaming.TLSKeyFile != "" {
return -1, errors.New("X509KeyPairStreaming.TLSKeyFile is set but EnableTLSStreaming is not set")
}
return withoutTLS, nil
}
func newStreamServer(c *criService, addr, port, streamIdleTimeout string) (streaming.Server, error) {
if addr == "" {
a, err := k8snet.ResolveBindAddress(nil)
if err != nil {
return nil, fmt.Errorf("failed to get stream server address: %w", err)
}
addr = a.String()
}
config := streaming.DefaultConfig
if streamIdleTimeout != "" {
var err error
config.StreamIdleTimeout, err = time.ParseDuration(streamIdleTimeout)
if err != nil {
return nil, fmt.Errorf("invalid stream idle timeout: %w", err)
}
}
config.Addr = net.JoinHostPort(addr, port)
run := newStreamRuntime(c)
tlsMode, err := getStreamListenerMode(c)
if err != nil {
return nil, fmt.Errorf("invalid stream server configuration: %w", err)
}
switch tlsMode {
case x509KeyPairTLS:
tlsCert, err := tls.LoadX509KeyPair(c.config.X509KeyPairStreaming.TLSCertFile, c.config.X509KeyPairStreaming.TLSKeyFile)
if err != nil {
return nil, fmt.Errorf("failed to load x509 key pair for stream server: %w", err)
}
config.TLSConfig = &tls.Config{
Certificates: []tls.Certificate{tlsCert},
}
return streaming.NewServer(config, run)
case selfSignTLS:
tlsCert, err := newTLSCert()
if err != nil {
return nil, fmt.Errorf("failed to generate tls certificate for stream server: %w", err)
}
config.TLSConfig = &tls.Config{
Certificates: []tls.Certificate{tlsCert},
InsecureSkipVerify: true,
}
return streaming.NewServer(config, run)
case withoutTLS:
return streaming.NewServer(config, run)
default:
return nil, errors.New("invalid configuration for the stream listener")
}
}
type streamRuntime struct { type streamRuntime struct {
c *criService c *criService
} }
@ -187,54 +101,3 @@ func handleResizing(ctx context.Context, resize <-chan remotecommand.TerminalSiz
} }
}() }()
} }
// newTLSCert returns a self CA signed tls.certificate.
// TODO (mikebrow): replace / rewrite this function to support using CA
// signing of the certificate. Requires a security plan for kubernetes regarding
// CRI connections / streaming, etc. For example, kubernetes could configure or
// require a CA service and pass a configuration down through CRI.
func newTLSCert() (tls.Certificate, error) {
fail := func(err error) (tls.Certificate, error) { return tls.Certificate{}, err }
hostName, err := os.Hostname()
if err != nil {
return fail(fmt.Errorf("failed to get hostname: %w", err))
}
addrs, err := net.InterfaceAddrs()
if err != nil {
return fail(fmt.Errorf("failed to get host IP addresses: %w", err))
}
var alternateIPs []net.IP
var alternateDNS []string
for _, addr := range addrs {
var ip net.IP
switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
default:
continue
}
alternateIPs = append(alternateIPs, ip)
alternateDNS = append(alternateDNS, ip.String())
}
// Generate a self signed certificate key (CA is self)
certPem, keyPem, err := k8scert.GenerateSelfSignedCertKey(hostName, alternateIPs, alternateDNS)
if err != nil {
return fail(fmt.Errorf("certificate key could not be created: %w", err))
}
// Load the tls certificate
tlsCert, err := tls.X509KeyPair(certPem, keyPem)
if err != nil {
return fail(fmt.Errorf("certificate could not be loaded: %w", err))
}
return tlsCert, nil
}

View File

@ -1,163 +0,0 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"testing"
"github.com/containerd/containerd/v2/pkg/cri/config"
"github.com/stretchr/testify/assert"
)
func TestValidateStreamServer(t *testing.T) {
for _, test := range []struct {
desc string
*criService
tlsMode streamListenerMode
expectErr bool
}{
{
desc: "should pass with default withoutTLS",
criService: &criService{
config: config.Config{
PluginConfig: config.DefaultConfig(),
},
},
tlsMode: withoutTLS,
expectErr: false,
},
{
desc: "should pass with x509KeyPairTLS",
criService: &criService{
config: config.Config{
PluginConfig: config.PluginConfig{
EnableTLSStreaming: true,
X509KeyPairStreaming: config.X509KeyPairStreaming{
TLSKeyFile: "non-empty",
TLSCertFile: "non-empty",
},
},
},
},
tlsMode: x509KeyPairTLS,
expectErr: false,
},
{
desc: "should pass with selfSign",
criService: &criService{
config: config.Config{
PluginConfig: config.PluginConfig{
EnableTLSStreaming: true,
},
},
},
tlsMode: selfSignTLS,
expectErr: false,
},
{
desc: "should return error with X509 keypair but not EnableTLSStreaming",
criService: &criService{
config: config.Config{
PluginConfig: config.PluginConfig{
EnableTLSStreaming: false,
X509KeyPairStreaming: config.X509KeyPairStreaming{
TLSKeyFile: "non-empty",
TLSCertFile: "non-empty",
},
},
},
},
tlsMode: -1,
expectErr: true,
},
{
desc: "should return error with X509 TLSCertFile empty",
criService: &criService{
config: config.Config{
PluginConfig: config.PluginConfig{
EnableTLSStreaming: true,
X509KeyPairStreaming: config.X509KeyPairStreaming{
TLSKeyFile: "non-empty",
TLSCertFile: "",
},
},
},
},
tlsMode: -1,
expectErr: true,
},
{
desc: "should return error with X509 TLSKeyFile empty",
criService: &criService{
config: config.Config{
PluginConfig: config.PluginConfig{
EnableTLSStreaming: true,
X509KeyPairStreaming: config.X509KeyPairStreaming{
TLSKeyFile: "",
TLSCertFile: "non-empty",
},
},
},
},
tlsMode: -1,
expectErr: true,
},
{
desc: "should return error without EnableTLSStreaming and only TLSCertFile set",
criService: &criService{
config: config.Config{
PluginConfig: config.PluginConfig{
EnableTLSStreaming: false,
X509KeyPairStreaming: config.X509KeyPairStreaming{
TLSKeyFile: "",
TLSCertFile: "non-empty",
},
},
},
},
tlsMode: -1,
expectErr: true,
},
{
desc: "should return error without EnableTLSStreaming and only TLSKeyFile set",
criService: &criService{
config: config.Config{
PluginConfig: config.PluginConfig{
EnableTLSStreaming: false,
X509KeyPairStreaming: config.X509KeyPairStreaming{
TLSKeyFile: "non-empty",
TLSCertFile: "",
},
},
},
},
tlsMode: -1,
expectErr: true,
},
} {
test := test
t.Run(test.desc, func(t *testing.T) {
tlsMode, err := getStreamListenerMode(test.criService)
if test.expectErr {
assert.Error(t, err)
return
}
assert.NoError(t, err)
assert.Equal(t, test.tlsMode, tlsMode)
})
}
}

View File

@ -26,7 +26,7 @@ const (
var testConfig = criconfig.Config{ var testConfig = criconfig.Config{
RootDir: testRootDir, RootDir: testRootDir,
StateDir: testStateDir, StateDir: testStateDir,
PluginConfig: criconfig.PluginConfig{ RuntimeConfig: criconfig.RuntimeConfig{
TolerateMissingHugetlbController: true, TolerateMissingHugetlbController: true,
ContainerdConfig: criconfig.ContainerdConfig{ ContainerdConfig: criconfig.ContainerdConfig{
DefaultRuntimeName: "runc", DefaultRuntimeName: "runc",

View File

@ -14,7 +14,7 @@
limitations under the License. limitations under the License.
*/ */
package base package types
import ( import (
"github.com/containerd/go-cni" "github.com/containerd/go-cni"

View File

@ -17,6 +17,7 @@
package cri package cri
import ( import (
"context"
"fmt" "fmt"
"io" "io"
@ -25,15 +26,16 @@ import (
"github.com/containerd/plugin/registry" "github.com/containerd/plugin/registry"
containerd "github.com/containerd/containerd/v2/client" containerd "github.com/containerd/containerd/v2/client"
srvconfig "github.com/containerd/containerd/v2/cmd/containerd/server/config"
"github.com/containerd/containerd/v2/core/sandbox" "github.com/containerd/containerd/v2/core/sandbox"
criconfig "github.com/containerd/containerd/v2/pkg/cri/config" criconfig "github.com/containerd/containerd/v2/pkg/cri/config"
"github.com/containerd/containerd/v2/pkg/cri/constants" "github.com/containerd/containerd/v2/pkg/cri/constants"
"github.com/containerd/containerd/v2/pkg/cri/instrument" "github.com/containerd/containerd/v2/pkg/cri/instrument"
"github.com/containerd/containerd/v2/pkg/cri/nri" "github.com/containerd/containerd/v2/pkg/cri/nri"
"github.com/containerd/containerd/v2/pkg/cri/server" "github.com/containerd/containerd/v2/pkg/cri/server"
"github.com/containerd/containerd/v2/pkg/cri/server/base"
nriservice "github.com/containerd/containerd/v2/pkg/nri" nriservice "github.com/containerd/containerd/v2/pkg/nri"
"github.com/containerd/containerd/v2/plugins" "github.com/containerd/containerd/v2/plugins"
"github.com/containerd/containerd/v2/plugins/services/warning"
"github.com/containerd/platforms" "github.com/containerd/platforms"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -43,13 +45,12 @@ import (
// Register CRI service plugin // Register CRI service plugin
func init() { func init() {
defaultConfig := criconfig.DefaultServerConfig()
registry.Register(&plugin.Registration{ registry.Register(&plugin.Registration{
Type: plugins.GRPCPlugin, Type: plugins.GRPCPlugin,
ID: "cri", ID: "cri",
Requires: []plugin.Type{ Requires: []plugin.Type{
plugins.CRIImagePlugin, plugins.CRIServicePlugin,
plugins.InternalPlugin,
plugins.SandboxControllerPlugin, plugins.SandboxControllerPlugin,
plugins.NRIApiPlugin, plugins.NRIApiPlugin,
plugins.EventPlugin, plugins.EventPlugin,
@ -57,6 +58,29 @@ func init() {
plugins.LeasePlugin, plugins.LeasePlugin,
plugins.SandboxStorePlugin, plugins.SandboxStorePlugin,
plugins.TransferPlugin, plugins.TransferPlugin,
plugins.WarningPlugin,
},
Config: &defaultConfig,
ConfigMigration: func(ctx context.Context, version int, pluginConfigs map[string]interface{}) error {
if version >= srvconfig.CurrentConfigVersion {
return nil
}
const pluginName = string(plugins.GRPCPlugin) + ".cri"
original, ok := pluginConfigs[pluginName]
if !ok {
return nil
}
src := original.(map[string]interface{})
// Currently only a single key migrated
if val, ok := src["disable_tcp_service"]; ok {
pluginConfigs[pluginName] = map[string]interface{}{
"disable_tcp_service": val,
}
} else {
delete(pluginConfigs, pluginName)
}
return nil
}, },
InitFn: initCRIService, InitFn: initCRIService,
}) })
@ -64,21 +88,33 @@ func init() {
func initCRIService(ic *plugin.InitContext) (interface{}, error) { func initCRIService(ic *plugin.InitContext) (interface{}, error) {
ctx := ic.Context ctx := ic.Context
config := ic.Config.(*criconfig.ServerConfig)
// Get base CRI dependencies. // Get runtime service.
criBasePlugin, err := ic.GetByID(plugins.InternalPlugin, "cri") criRuntimePlugin, err := ic.GetByID(plugins.CRIServicePlugin, "runtime")
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to load CRI service base dependencies: %w", err) return nil, fmt.Errorf("unable to load CRI runtime service plugin dependency: %w", err)
} }
criBase := criBasePlugin.(*base.CRIBase)
c := criBase.Config
// Get image service. // Get image service.
criImagePlugin, err := ic.GetSingle(plugins.CRIImagePlugin) criImagePlugin, err := ic.GetByID(plugins.CRIServicePlugin, "images")
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to load CRI image service plugin dependency: %w", err) return nil, fmt.Errorf("unable to load CRI image service plugin dependency: %w", err)
} }
if warnings, err := criconfig.ValidateServerConfig(ic.Context, config); err != nil {
return nil, fmt.Errorf("invalid cri image config: %w", err)
} else if len(warnings) > 0 {
ws, err := ic.GetSingle(plugins.WarningPlugin)
if err != nil {
return nil, err
}
warn := ws.(warning.Service)
for _, w := range warnings {
warn.Emit(ic.Context, w)
}
}
log.G(ctx).Info("Connect containerd service") log.G(ctx).Info("Connect containerd service")
client, err := containerd.New( client, err := containerd.New(
"", "",
@ -97,16 +133,22 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) {
string(criconfig.ModeShim): client.SandboxController(string(criconfig.ModeShim)), string(criconfig.ModeShim): client.SandboxController(string(criconfig.ModeShim)),
} }
streamingConfig, err := config.StreamingConfig()
if err != nil {
return nil, fmt.Errorf("failed to get streaming config: %w", err)
}
options := &server.CRIServiceOptions{ options := &server.CRIServiceOptions{
RuntimeService: criRuntimePlugin.(server.RuntimeService),
ImageService: criImagePlugin.(server.ImageService), ImageService: criImagePlugin.(server.ImageService),
StreamingConfig: streamingConfig,
NRI: getNRIAPI(ic), NRI: getNRIAPI(ic),
Client: client, Client: client,
SandboxControllers: sbControllers, SandboxControllers: sbControllers,
BaseOCISpecs: criBase.BaseOCISpecs,
} }
is := criImagePlugin.(imageService).GRPCService() is := criImagePlugin.(imageService).GRPCService()
s, rs, err := server.NewCRIService(criBase.Config, options) s, rs, err := server.NewCRIService(options)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create CRI service: %w", err) return nil, fmt.Errorf("failed to create CRI service: %w", err)
} }
@ -127,7 +169,7 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) {
initializer: s, initializer: s,
} }
if c.DisableTCPService { if config.DisableTCPService {
return service, nil return service, nil
} }

View File

@ -30,6 +30,7 @@ import (
"github.com/containerd/containerd/v2/pkg/cri/server/images" "github.com/containerd/containerd/v2/pkg/cri/server/images"
"github.com/containerd/containerd/v2/pkg/events" "github.com/containerd/containerd/v2/pkg/events"
"github.com/containerd/containerd/v2/plugins" "github.com/containerd/containerd/v2/plugins"
"github.com/containerd/containerd/v2/plugins/services/warning"
"github.com/containerd/log" "github.com/containerd/log"
"github.com/containerd/platforms" "github.com/containerd/platforms"
"github.com/containerd/plugin" "github.com/containerd/plugin"
@ -40,17 +41,17 @@ func init() {
config := criconfig.DefaultImageConfig() config := criconfig.DefaultImageConfig()
registry.Register(&plugin.Registration{ registry.Register(&plugin.Registration{
Type: plugins.CRIImagePlugin, Type: plugins.CRIServicePlugin,
ID: "local", ID: "images",
Config: &config, Config: &config,
Requires: []plugin.Type{ Requires: []plugin.Type{
plugins.LeasePlugin, plugins.LeasePlugin,
plugins.EventPlugin, plugins.EventPlugin,
plugins.MetadataPlugin, plugins.MetadataPlugin,
plugins.SandboxStorePlugin, plugins.SandboxStorePlugin,
plugins.InternalPlugin, // For config migration ordering
plugins.ServicePlugin, // For client plugins.ServicePlugin, // For client
plugins.SnapshotPlugin, // For root directory properties plugins.SnapshotPlugin, // For root directory properties
plugins.WarningPlugin,
}, },
InitFn: func(ic *plugin.InitContext) (interface{}, error) { InitFn: func(ic *plugin.InitContext) (interface{}, error) {
m, err := ic.GetSingle(plugins.MetadataPlugin) m, err := ic.GetSingle(plugins.MetadataPlugin)
@ -64,6 +65,19 @@ func init() {
return nil, err return nil, err
} }
if warnings, err := criconfig.ValidateImageConfig(ic.Context, &config); err != nil {
return nil, fmt.Errorf("invalid cri image config: %w", err)
} else if len(warnings) > 0 {
ws, err := ic.GetSingle(plugins.WarningPlugin)
if err != nil {
return nil, err
}
warn := ws.(warning.Service)
for _, w := range warnings {
warn.Emit(ic.Context, w)
}
}
options := &images.CRIImageServiceOptions{ options := &images.CRIImageServiceOptions{
Content: mdb.ContentStore(), Content: mdb.ContentStore(),
Images: metadata.NewImageStore(mdb), Images: metadata.NewImageStore(mdb),
@ -152,12 +166,12 @@ func configMigration(ctx context.Context, version int, pluginConfigs map[string]
if version >= srvconfig.CurrentConfigVersion { if version >= srvconfig.CurrentConfigVersion {
return nil return nil
} }
original, ok := pluginConfigs[string(plugins.InternalPlugin)+".cri"] original, ok := pluginConfigs[string(plugins.GRPCPlugin)+".cri"]
if !ok { if !ok {
return nil return nil
} }
src := original.(map[string]interface{}) src := original.(map[string]interface{})
updated, ok := pluginConfigs[string(plugins.CRIImagePlugin)+".local"] updated, ok := pluginConfigs[string(plugins.CRIServicePlugin)+".images"]
var dst map[string]interface{} var dst map[string]interface{}
if ok { if ok {
dst = updated.(map[string]interface{}) dst = updated.(map[string]interface{})
@ -166,7 +180,7 @@ func configMigration(ctx context.Context, version int, pluginConfigs map[string]
} }
migrateConfig(dst, src) migrateConfig(dst, src)
pluginConfigs[string(plugins.CRIImagePlugin)+".local"] = dst pluginConfigs[string(plugins.CRIServicePlugin)+".images"] = dst
return nil return nil
} }
func migrateConfig(dst, src map[string]interface{}) { func migrateConfig(dst, src map[string]interface{}) {

View File

@ -14,7 +14,7 @@
limitations under the License. limitations under the License.
*/ */
package base package runtime
import ( import (
"encoding/json" "encoding/json"

View File

@ -14,7 +14,7 @@
limitations under the License. limitations under the License.
*/ */
package base package runtime
import ( import (
"context" "context"
@ -36,52 +36,44 @@ import (
"github.com/containerd/containerd/v2/pkg/oci" "github.com/containerd/containerd/v2/pkg/oci"
"github.com/containerd/containerd/v2/plugins" "github.com/containerd/containerd/v2/plugins"
"github.com/containerd/containerd/v2/plugins/services/warning" "github.com/containerd/containerd/v2/plugins/services/warning"
"github.com/containerd/errdefs"
"github.com/containerd/platforms" "github.com/containerd/platforms"
) )
// CRIBase contains common dependencies for CRI's runtime, image, and podsandbox services.
type CRIBase struct {
// Config contains all configurations.
Config criconfig.Config
// BaseOCISpecs contains cached OCI specs loaded via `Runtime.BaseRuntimeSpec`
BaseOCISpecs map[string]*oci.Spec
}
func init() { func init() {
config := criconfig.DefaultConfig() config := criconfig.DefaultRuntimeConfig()
// Base plugin that other CRI services depend on. // Base plugin that other CRI services depend on.
registry.Register(&plugin.Registration{ registry.Register(&plugin.Registration{
Type: plugins.InternalPlugin, Type: plugins.CRIServicePlugin,
ID: "cri", ID: "runtime",
Config: &config, Config: &config,
Requires: []plugin.Type{ Requires: []plugin.Type{
plugins.WarningPlugin, plugins.WarningPlugin,
}, },
ConfigMigration: func(ctx context.Context, version int, plugins map[string]interface{}) error { ConfigMigration: func(ctx context.Context, version int, pluginConfigs map[string]interface{}) error {
if version >= srvconfig.CurrentConfigVersion { if version >= srvconfig.CurrentConfigVersion {
return nil return nil
} }
c, ok := plugins["io.containerd.grpc.v1.cri"] c, ok := pluginConfigs[string(plugins.GRPCPlugin)+".cri"]
if !ok { if !ok {
return nil return nil
} }
conf := c.(map[string]interface{}) conf := c.(map[string]interface{})
migrateConfig(conf) migrateConfig(conf)
plugins["io.containerd.internal.v1.cri"] = conf pluginConfigs[string(plugins.CRIServicePlugin)+".runtime"] = conf
delete(plugins, "io.containerd.grpc.v1.cri")
return nil return nil
}, },
InitFn: initCRIBase, InitFn: initCRIRuntime,
}) })
} }
func initCRIBase(ic *plugin.InitContext) (interface{}, error) { func initCRIRuntime(ic *plugin.InitContext) (interface{}, error) {
ic.Meta.Platforms = []imagespec.Platform{platforms.DefaultSpec()} ic.Meta.Platforms = []imagespec.Platform{platforms.DefaultSpec()}
ic.Meta.Exports = map[string]string{"CRIVersion": constants.CRIVersion} ic.Meta.Exports = map[string]string{"CRIVersion": constants.CRIVersion}
ctx := ic.Context ctx := ic.Context
pluginConfig := ic.Config.(*criconfig.PluginConfig) pluginConfig := ic.Config.(*criconfig.RuntimeConfig)
if warnings, err := criconfig.ValidatePluginConfig(ctx, pluginConfig); err != nil { if warnings, err := criconfig.ValidateRuntimeConfig(ctx, pluginConfig); err != nil {
return nil, fmt.Errorf("invalid plugin config: %w", err) return nil, fmt.Errorf("invalid plugin config: %w", err)
} else if len(warnings) > 0 { } else if len(warnings) > 0 {
ws, err := ic.GetSingle(plugins.WarningPlugin) ws, err := ic.GetSingle(plugins.WarningPlugin)
@ -100,7 +92,7 @@ func initCRIBase(ic *plugin.InitContext) (interface{}, error) {
containerdStateDir := filepath.Dir(ic.Properties[plugins.PropertyStateDir]) containerdStateDir := filepath.Dir(ic.Properties[plugins.PropertyStateDir])
stateDir := filepath.Join(containerdStateDir, "io.containerd.grpc.v1.cri") stateDir := filepath.Join(containerdStateDir, "io.containerd.grpc.v1.cri")
c := criconfig.Config{ c := criconfig.Config{
PluginConfig: *pluginConfig, RuntimeConfig: *pluginConfig,
ContainerdRootDir: containerdRootDir, ContainerdRootDir: containerdRootDir,
ContainerdEndpoint: ic.Properties[plugins.PropertyGRPCAddress], ContainerdEndpoint: ic.Properties[plugins.PropertyGRPCAddress],
RootDir: rootDir, RootDir: rootDir,
@ -118,12 +110,33 @@ func initCRIBase(ic *plugin.InitContext) (interface{}, error) {
return nil, fmt.Errorf("failed to create load basic oci spec: %w", err) return nil, fmt.Errorf("failed to create load basic oci spec: %w", err)
} }
return &CRIBase{ return &runtime{
Config: c, config: c,
BaseOCISpecs: ociSpec, baseOCISpecs: ociSpec,
}, nil }, nil
} }
// runtime contains common dependencies for CRI's runtime, image, and podsandbox services.
type runtime struct {
// Config contains all configurations.
config criconfig.Config
// BaseOCISpecs contains cached OCI specs loaded via `Runtime.BaseRuntimeSpec`
baseOCISpecs map[string]*oci.Spec
}
func (r *runtime) Config() criconfig.Config {
return r.config
}
func (r *runtime) LoadOCISpec(filename string) (*oci.Spec, error) {
spec, ok := r.baseOCISpecs[filename]
if !ok {
// TODO: Load here or only allow preloading...
return nil, errdefs.ErrNotFound
}
return spec, nil
}
func loadBaseOCISpecs(config *criconfig.Config) (map[string]*oci.Spec, error) { func loadBaseOCISpecs(config *criconfig.Config) (map[string]*oci.Spec, error) {
specs := map[string]*oci.Spec{} specs := map[string]*oci.Spec{}
for _, cfg := range config.Runtimes { for _, cfg := range config.Runtimes {

View File

@ -67,8 +67,8 @@ const (
ImageVerifierPlugin plugin.Type = "io.containerd.image-verifier.v1" ImageVerifierPlugin plugin.Type = "io.containerd.image-verifier.v1"
// WarningPlugin implements a warning service // WarningPlugin implements a warning service
WarningPlugin plugin.Type = "io.containerd.warning.v1" WarningPlugin plugin.Type = "io.containerd.warning.v1"
// CRIImagePlugin implements a cri image service // CRIServicePlugin implements a cri service
CRIImagePlugin plugin.Type = "io.containerd.cri.image.v1" CRIServicePlugin plugin.Type = "io.containerd.cri.v1"
) )
const ( const (