diff --git a/pkg/cri/config/config.go b/pkg/cri/config/config.go index 15617b1f0..6b0a0be93 100644 --- a/pkg/cri/config/config.go +++ b/pkg/cri/config/config.go @@ -26,6 +26,7 @@ import ( "github.com/containerd/log" "github.com/pelletier/go-toml/v2" runtime "k8s.io/cri-api/pkg/apis/runtime/v1" + "k8s.io/kubelet/pkg/cri/streaming" runhcsoptions "github.com/Microsoft/hcsshim/cmd/containerd-shim-runhcs-v1/options" runcoptions "github.com/containerd/containerd/v2/core/runtime/v2/runc/options" @@ -312,31 +313,18 @@ type ImageConfig struct { StatsCollectPeriod int `toml:"stats_collect_period" json:"statsCollectPeriod"` } -// PluginConfig contains toml config related to CRI plugin, +// RuntimeConfig contains toml config related to CRI plugin, // it is a subset of Config. -type PluginConfig struct { +type RuntimeConfig struct { // ContainerdConfig contains config related to containerd ContainerdConfig `toml:"containerd" json:"containerd"` // CniConfig contains config related to cni CniConfig `toml:"cni" json:"cni"` - // StreamServerAddress is the ip address streaming server is listening on. - StreamServerAddress string `toml:"stream_server_address" json:"streamServerAddress"` - // StreamServerPort is the port streaming server is listening on. - StreamServerPort string `toml:"stream_server_port" json:"streamServerPort"` - // StreamIdleTimeout is the maximum time a streaming connection - // can be idle before the connection is automatically closed. - // The string is in the golang duration format, see: - // https://golang.org/pkg/time/#ParseDuration - StreamIdleTimeout string `toml:"stream_idle_timeout" json:"streamIdleTimeout"` // EnableSelinux indicates to enable the selinux support. EnableSelinux bool `toml:"enable_selinux" json:"enableSelinux"` // SelinuxCategoryRange allows the upper bound on the category range to be set. // If not specified or set to 0, defaults to 1024 from the selinux package. SelinuxCategoryRange int `toml:"selinux_category_range" json:"selinuxCategoryRange"` - // EnableTLSStreaming indicates to enable the TLS streaming support. - EnableTLSStreaming bool `toml:"enable_tls_streaming" json:"enableTLSStreaming"` - // X509KeyPairStreaming is a x509 key pair used for TLS streaming - X509KeyPairStreaming `toml:"x509_key_pair_streaming" json:"x509KeyPairStreaming"` // MaxContainerLogLineSize is the maximum log line size in bytes for a container. // Log line longer than the limit will be split into multiple lines. Non-positive // value means no limit. @@ -416,10 +404,10 @@ type X509KeyPairStreaming struct { TLSKeyFile string `toml:"tls_key_file" json:"tlsKeyFile"` } -// Config contains all configurations for cri server. +// Config contains all configurations for CRI runtime plugin. type Config struct { - // PluginConfig is the config for CRI plugin. - PluginConfig + // RuntimeConfig is the config for CRI runtime. + RuntimeConfig // ContainerdRootDir is the root directory path for containerd. ContainerdRootDir string `json:"containerdRootDir"` // ContainerdEndpoint is the containerd endpoint path. @@ -431,10 +419,23 @@ type Config struct { StateDir string `json:"stateDir"` } -// ServiceConfig contains all the configuration for the CRI API server. -type ServiceConfig struct { +// ServerConfig contains all the configuration for the CRI API server. +type ServerConfig struct { // DisableTCPService disables serving CRI on the TCP server. DisableTCPService bool `toml:"disable_tcp_service" json:"disableTCPService"` + // StreamServerAddress is the ip address streaming server is listening on. + StreamServerAddress string `toml:"stream_server_address" json:"streamServerAddress"` + // StreamServerPort is the port streaming server is listening on. + StreamServerPort string `toml:"stream_server_port" json:"streamServerPort"` + // StreamIdleTimeout is the maximum time a streaming connection + // can be idle before the connection is automatically closed. + // The string is in the golang duration format, see: + // https://golang.org/pkg/time/#ParseDuration + StreamIdleTimeout string `toml:"stream_idle_timeout" json:"streamIdleTimeout"` + // EnableTLSStreaming indicates to enable the TLS streaming support. + EnableTLSStreaming bool `toml:"enable_tls_streaming" json:"enableTLSStreaming"` + // X509KeyPairStreaming is a x509 key pair used for TLS streaming + X509KeyPairStreaming `toml:"x509_key_pair_streaming" json:"x509KeyPairStreaming"` } const ( @@ -498,8 +499,8 @@ func ValidateImageConfig(ctx context.Context, c *ImageConfig) ([]deprecation.War return warnings, nil } -// ValidatePluginConfig validates the given plugin configuration. -func ValidatePluginConfig(ctx context.Context, c *PluginConfig) ([]deprecation.Warning, error) { +// ValidateRuntimeConfig validates the given runtime configuration. +func ValidateRuntimeConfig(ctx context.Context, c *RuntimeConfig) ([]deprecation.Warning, error) { var warnings []deprecation.Warning if c.ContainerdConfig.Runtimes == nil { c.ContainerdConfig.Runtimes = make(map[string]Runtime) @@ -524,13 +525,6 @@ func ValidatePluginConfig(ctx context.Context, c *PluginConfig) ([]deprecation.W } } - // Validation for stream_idle_timeout - if c.StreamIdleTimeout != "" { - if _, err := time.ParseDuration(c.StreamIdleTimeout); err != nil { - return warnings, fmt.Errorf("invalid stream idle timeout: %w", err) - } - } - // Validation for drain_exec_sync_io_timeout if c.DrainExecSyncIOTimeout != "" { if _, err := time.ParseDuration(c.DrainExecSyncIOTimeout); err != nil { @@ -543,6 +537,18 @@ func ValidatePluginConfig(ctx context.Context, c *PluginConfig) ([]deprecation.W return warnings, nil } +// ValidateServerConfig validates the given server configuration. +func ValidateServerConfig(ctx context.Context, c *ServerConfig) ([]deprecation.Warning, error) { + var warnings []deprecation.Warning + // Validation for stream_idle_timeout + if c.StreamIdleTimeout != "" { + if _, err := time.ParseDuration(c.StreamIdleTimeout); err != nil { + return warnings, fmt.Errorf("invalid stream idle timeout: %w", err) + } + } + return warnings, nil +} + func (config *Config) GetSandboxRuntime(podSandboxConfig *runtime.PodSandboxConfig, runtimeHandler string) (Runtime, error) { if untrustedWorkload(podSandboxConfig) { // If the untrusted annotation is provided, runtimeHandler MUST be empty. @@ -631,3 +637,17 @@ func getRuntimeOptionsType(t string) interface{} { return &runtimeoptions.Options{} } } + +func DefaultServerConfig() ServerConfig { + return ServerConfig{ + DisableTCPService: true, + StreamServerAddress: "127.0.0.1", + StreamServerPort: "0", + StreamIdleTimeout: streaming.DefaultConfig.StreamIdleTimeout.String(), // 4 hour + EnableTLSStreaming: false, + X509KeyPairStreaming: X509KeyPairStreaming{ + TLSKeyFile: "", + TLSCertFile: "", + }, + } +} diff --git a/pkg/cri/config/config_kernel_linux.go b/pkg/cri/config/config_kernel_linux.go index 9da860750..85c564ae3 100644 --- a/pkg/cri/config/config_kernel_linux.go +++ b/pkg/cri/config/config_kernel_linux.go @@ -28,7 +28,7 @@ import ( var kernelGreaterEqualThan = kernel.GreaterEqualThan -func ValidateEnableUnprivileged(ctx context.Context, c *PluginConfig) error { +func ValidateEnableUnprivileged(ctx context.Context, c *RuntimeConfig) error { if c.EnableUnprivilegedICMP || c.EnableUnprivilegedPorts { fourDotEleven := kernel.KernelVersion{Kernel: 4, Major: 11} ok, err := kernelGreaterEqualThan(fourDotEleven) diff --git a/pkg/cri/config/config_kernel_linux_test.go b/pkg/cri/config/config_kernel_linux_test.go index 703178282..0afc57420 100644 --- a/pkg/cri/config/config_kernel_linux_test.go +++ b/pkg/cri/config/config_kernel_linux_test.go @@ -32,13 +32,13 @@ func TestValidateEnableUnprivileged(t *testing.T) { tests := []struct { name string - config *PluginConfig + config *RuntimeConfig kernelGreater bool expectedErr string }{ { name: "disable unprivileged_icmp and unprivileged_port", - config: &PluginConfig{ + config: &RuntimeConfig{ ContainerdConfig: ContainerdConfig{ DefaultRuntimeName: RuntimeDefault, Runtimes: map[string]Runtime{ @@ -54,7 +54,7 @@ func TestValidateEnableUnprivileged(t *testing.T) { }, { name: "enable unprivileged_icmp or unprivileged_port, but kernel version is smaller than 4.11", - config: &PluginConfig{ + config: &RuntimeConfig{ ContainerdConfig: ContainerdConfig{ DefaultRuntimeName: RuntimeDefault, Runtimes: map[string]Runtime{ @@ -71,7 +71,7 @@ func TestValidateEnableUnprivileged(t *testing.T) { }, { name: "enable unprivileged_icmp or unprivileged_port, but kernel version is greater than or equal 4.11", - config: &PluginConfig{ + config: &RuntimeConfig{ ContainerdConfig: ContainerdConfig{ DefaultRuntimeName: RuntimeDefault, Runtimes: map[string]Runtime{ diff --git a/pkg/cri/config/config_kernel_other.go b/pkg/cri/config/config_kernel_other.go index b4012e163..433c1682f 100644 --- a/pkg/cri/config/config_kernel_other.go +++ b/pkg/cri/config/config_kernel_other.go @@ -22,6 +22,6 @@ import ( "context" ) -func ValidateEnableUnprivileged(ctx context.Context, c *PluginConfig) error { +func ValidateEnableUnprivileged(ctx context.Context, c *RuntimeConfig) error { return nil } diff --git a/pkg/cri/config/config_test.go b/pkg/cri/config/config_test.go index a52b87df5..f7a5f30df 100644 --- a/pkg/cri/config/config_test.go +++ b/pkg/cri/config/config_test.go @@ -28,29 +28,32 @@ import ( func TestValidateConfig(t *testing.T) { for desc, test := range map[string]struct { - config *PluginConfig - expectedErr string - expected *PluginConfig - imageConfig *ImageConfig - imageExpectedErr string - imageExpected *ImageConfig - warnings []deprecation.Warning + runtimeConfig *RuntimeConfig + runtimeExpectedErr string + runtimeExpected *RuntimeConfig + imageConfig *ImageConfig + imageExpectedErr string + imageExpected *ImageConfig + serverConfig *ServerConfig + serverExpectedErr string + serverExpected *ServerConfig + warnings []deprecation.Warning }{ "no default_runtime_name": { - config: &PluginConfig{}, - expectedErr: "`default_runtime_name` is empty", + runtimeConfig: &RuntimeConfig{}, + runtimeExpectedErr: "`default_runtime_name` is empty", }, "no runtime[default_runtime_name]": { - config: &PluginConfig{ + runtimeConfig: &RuntimeConfig{ ContainerdConfig: ContainerdConfig{ DefaultRuntimeName: RuntimeDefault, }, }, - expectedErr: "no corresponding runtime configured in `containerd.runtimes` for `containerd` `default_runtime_name = \"default\"", + runtimeExpectedErr: "no corresponding runtime configured in `containerd.runtimes` for `containerd` `default_runtime_name = \"default\"", }, "deprecated auths": { - config: &PluginConfig{ + runtimeConfig: &RuntimeConfig{ ContainerdConfig: ContainerdConfig{ DefaultRuntimeName: RuntimeDefault, Runtimes: map[string]Runtime{ @@ -58,7 +61,7 @@ func TestValidateConfig(t *testing.T) { }, }, }, - expected: &PluginConfig{ + runtimeExpected: &RuntimeConfig{ ContainerdConfig: ContainerdConfig{ DefaultRuntimeName: RuntimeDefault, Runtimes: map[string]Runtime{ @@ -92,18 +95,10 @@ func TestValidateConfig(t *testing.T) { warnings: []deprecation.Warning{deprecation.CRIRegistryAuths}, }, "invalid stream_idle_timeout": { - config: &PluginConfig{ + serverConfig: &ServerConfig{ StreamIdleTimeout: "invalid", - ContainerdConfig: ContainerdConfig{ - DefaultRuntimeName: RuntimeDefault, - Runtimes: map[string]Runtime{ - RuntimeDefault: { - Type: "default", - }, - }, - }, }, - expectedErr: "invalid stream idle timeout", + serverExpectedErr: "invalid stream idle timeout", }, "conflicting mirror registry config": { imageConfig: &ImageConfig{ @@ -117,7 +112,7 @@ func TestValidateConfig(t *testing.T) { imageExpectedErr: "`mirrors` cannot be set when `config_path` is provided", }, "deprecated mirrors": { - config: &PluginConfig{ + runtimeConfig: &RuntimeConfig{ ContainerdConfig: ContainerdConfig{ DefaultRuntimeName: RuntimeDefault, Runtimes: map[string]Runtime{ @@ -132,7 +127,7 @@ func TestValidateConfig(t *testing.T) { }, }, }, - expected: &PluginConfig{ + runtimeExpected: &RuntimeConfig{ ContainerdConfig: ContainerdConfig{ DefaultRuntimeName: RuntimeDefault, Runtimes: map[string]Runtime{ @@ -152,7 +147,7 @@ func TestValidateConfig(t *testing.T) { warnings: []deprecation.Warning{deprecation.CRIRegistryMirrors}, }, "deprecated configs": { - config: &PluginConfig{ + runtimeConfig: &RuntimeConfig{ ContainerdConfig: ContainerdConfig{ DefaultRuntimeName: RuntimeDefault, Runtimes: map[string]Runtime{ @@ -171,7 +166,7 @@ func TestValidateConfig(t *testing.T) { }, }, }, - expected: &PluginConfig{ + runtimeExpected: &RuntimeConfig{ ContainerdConfig: ContainerdConfig{ DefaultRuntimeName: RuntimeDefault, Runtimes: map[string]Runtime{ @@ -195,7 +190,7 @@ func TestValidateConfig(t *testing.T) { warnings: []deprecation.Warning{deprecation.CRIRegistryConfigs}, }, "privileged_without_host_devices_all_devices_allowed without privileged_without_host_devices": { - config: &PluginConfig{ + runtimeConfig: &RuntimeConfig{ ContainerdConfig: ContainerdConfig{ DefaultRuntimeName: RuntimeDefault, Runtimes: map[string]Runtime{ @@ -207,10 +202,10 @@ func TestValidateConfig(t *testing.T) { }, }, }, - expectedErr: "`privileged_without_host_devices_all_devices_allowed` requires `privileged_without_host_devices` to be enabled", + runtimeExpectedErr: "`privileged_without_host_devices_all_devices_allowed` requires `privileged_without_host_devices` to be enabled", }, "invalid drain_exec_sync_io_timeout input": { - config: &PluginConfig{ + runtimeConfig: &RuntimeConfig{ ContainerdConfig: ContainerdConfig{ DefaultRuntimeName: RuntimeDefault, Runtimes: map[string]Runtime{ @@ -221,18 +216,18 @@ func TestValidateConfig(t *testing.T) { }, DrainExecSyncIOTimeout: "10", }, - expectedErr: "invalid `drain_exec_sync_io_timeout`", + runtimeExpectedErr: "invalid `drain_exec_sync_io_timeout`", }, } { t.Run(desc, func(t *testing.T) { var warnings []deprecation.Warning - if test.config != nil { - w, err := ValidatePluginConfig(context.Background(), test.config) - if test.expectedErr != "" { - assert.Contains(t, err.Error(), test.expectedErr) + if test.runtimeConfig != nil { + w, err := ValidateRuntimeConfig(context.Background(), test.runtimeConfig) + if test.runtimeExpectedErr != "" { + assert.Contains(t, err.Error(), test.runtimeExpectedErr) } else { assert.NoError(t, err) - assert.Equal(t, test.expected, test.config) + assert.Equal(t, test.runtimeExpected, test.runtimeConfig) } warnings = append(warnings, w...) } @@ -246,6 +241,16 @@ func TestValidateConfig(t *testing.T) { } warnings = append(warnings, w...) } + if test.serverConfig != nil { + w, err := ValidateServerConfig(context.Background(), test.serverConfig) + if test.serverExpectedErr != "" { + assert.Contains(t, err.Error(), test.serverExpectedErr) + } else { + assert.NoError(t, err) + assert.Equal(t, test.serverExpected, test.serverConfig) + } + warnings = append(warnings, w...) + } if len(test.warnings) > 0 { assert.ElementsMatch(t, test.warnings, warnings) diff --git a/pkg/cri/config/config_unix.go b/pkg/cri/config/config_unix.go index c04651ecc..d31b090a1 100644 --- a/pkg/cri/config/config_unix.go +++ b/pkg/cri/config/config_unix.go @@ -21,7 +21,6 @@ package config import ( "github.com/containerd/containerd/v2/defaults" "github.com/pelletier/go-toml/v2" - "k8s.io/kubelet/pkg/cri/streaming" ) func DefaultImageConfig() ImageConfig { @@ -41,8 +40,8 @@ func DefaultImageConfig() ImageConfig { } } -// DefaultConfig returns default configurations of cri plugin. -func DefaultConfig() PluginConfig { +// DefaultRuntimeConfig returns default configurations of cri plugin. +func DefaultRuntimeConfig() RuntimeConfig { defaultRuncV2Opts := ` # NoNewKeyring disables new keyring for the container. NoNewKeyring = false @@ -71,7 +70,7 @@ func DefaultConfig() PluginConfig { var m map[string]interface{} toml.Unmarshal([]byte(defaultRuncV2Opts), &m) - return PluginConfig{ + return RuntimeConfig{ CniConfig: CniConfig{ NetworkPluginBinDir: "/opt/cni/bin", NetworkPluginConfDir: "/etc/cni/net.d", @@ -89,16 +88,8 @@ func DefaultConfig() PluginConfig { }, }, }, - StreamServerAddress: "127.0.0.1", - StreamServerPort: "0", - StreamIdleTimeout: streaming.DefaultConfig.StreamIdleTimeout.String(), // 4 hour - EnableSelinux: false, - SelinuxCategoryRange: 1024, - EnableTLSStreaming: false, - X509KeyPairStreaming: X509KeyPairStreaming{ - TLSKeyFile: "", - TLSCertFile: "", - }, + EnableSelinux: false, + SelinuxCategoryRange: 1024, MaxContainerLogLineSize: 16 * 1024, DisableProcMount: false, TolerateMissingHugetlbController: true, diff --git a/pkg/cri/config/config_windows.go b/pkg/cri/config/config_windows.go index db499a074..a1d4b072c 100644 --- a/pkg/cri/config/config_windows.go +++ b/pkg/cri/config/config_windows.go @@ -21,7 +21,6 @@ import ( "path/filepath" "github.com/containerd/containerd/v2/defaults" - "k8s.io/kubelet/pkg/cri/streaming" ) func DefaultImageConfig() ImageConfig { @@ -39,9 +38,9 @@ func DefaultImageConfig() ImageConfig { } } -// DefaultConfig returns default configurations of cri plugin. -func DefaultConfig() PluginConfig { - return PluginConfig{ +// DefaultRuntimeConfig returns default configurations of cri plugin. +func DefaultRuntimeConfig() RuntimeConfig { + return RuntimeConfig{ CniConfig: CniConfig{ NetworkPluginBinDir: filepath.Join(os.Getenv("ProgramFiles"), "containerd", "cni", "bin"), NetworkPluginConfDir: filepath.Join(os.Getenv("ProgramFiles"), "containerd", "cni", "conf"), @@ -78,14 +77,6 @@ func DefaultConfig() PluginConfig { }, }, }, - StreamServerAddress: "127.0.0.1", - StreamServerPort: "0", - StreamIdleTimeout: streaming.DefaultConfig.StreamIdleTimeout.String(), // 4 hour - EnableTLSStreaming: false, - X509KeyPairStreaming: X509KeyPairStreaming{ - TLSKeyFile: "", - TLSCertFile: "", - }, MaxContainerLogLineSize: 16 * 1024, IgnoreImageDefinedVolumes: false, // TODO(windows): Add platform specific config, so that most common defaults can be shared. diff --git a/pkg/cri/config/streaming.go b/pkg/cri/config/streaming.go new file mode 100644 index 000000000..b02e0bd58 --- /dev/null +++ b/pkg/cri/config/streaming.go @@ -0,0 +1,163 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package config + +import ( + "crypto/tls" + "errors" + "fmt" + "net" + "os" + "time" + + k8snet "k8s.io/apimachinery/pkg/util/net" + k8scert "k8s.io/client-go/util/cert" + + "k8s.io/kubelet/pkg/cri/streaming" +) + +type streamListenerMode int + +const ( + x509KeyPairTLS streamListenerMode = iota + selfSignTLS + withoutTLS +) + +func getStreamListenerMode(config *ServerConfig) (streamListenerMode, error) { + if config.EnableTLSStreaming { + if config.X509KeyPairStreaming.TLSCertFile != "" && config.X509KeyPairStreaming.TLSKeyFile != "" { + return x509KeyPairTLS, nil + } + if config.X509KeyPairStreaming.TLSCertFile != "" && config.X509KeyPairStreaming.TLSKeyFile == "" { + return -1, errors.New("must set X509KeyPairStreaming.TLSKeyFile") + } + if config.X509KeyPairStreaming.TLSCertFile == "" && config.X509KeyPairStreaming.TLSKeyFile != "" { + return -1, errors.New("must set X509KeyPairStreaming.TLSCertFile") + } + return selfSignTLS, nil + } + if config.X509KeyPairStreaming.TLSCertFile != "" { + return -1, errors.New("X509KeyPairStreaming.TLSCertFile is set but EnableTLSStreaming is not set") + } + if config.X509KeyPairStreaming.TLSKeyFile != "" { + return -1, errors.New("X509KeyPairStreaming.TLSKeyFile is set but EnableTLSStreaming is not set") + } + return withoutTLS, nil +} + +func (c *ServerConfig) StreamingConfig() (streaming.Config, error) { + var ( + addr = c.StreamServerAddress + port = c.StreamServerPort + streamIdleTimeout = c.StreamIdleTimeout + ) + if addr == "" { + a, err := k8snet.ResolveBindAddress(nil) + if err != nil { + return streaming.Config{}, fmt.Errorf("failed to get stream server address: %w", err) + } + addr = a.String() + } + config := streaming.DefaultConfig + if streamIdleTimeout != "" { + var err error + config.StreamIdleTimeout, err = time.ParseDuration(streamIdleTimeout) + if err != nil { + return streaming.Config{}, fmt.Errorf("invalid stream idle timeout: %w", err) + } + } + config.Addr = net.JoinHostPort(addr, port) + + tlsMode, err := getStreamListenerMode(c) + if err != nil { + return streaming.Config{}, fmt.Errorf("invalid stream server configuration: %w", err) + } + switch tlsMode { + case x509KeyPairTLS: + tlsCert, err := tls.LoadX509KeyPair(c.X509KeyPairStreaming.TLSCertFile, c.X509KeyPairStreaming.TLSKeyFile) + if err != nil { + return streaming.Config{}, fmt.Errorf("failed to load x509 key pair for stream server: %w", err) + } + config.TLSConfig = &tls.Config{ + Certificates: []tls.Certificate{tlsCert}, + } + case selfSignTLS: + tlsCert, err := newTLSCert() + if err != nil { + return streaming.Config{}, fmt.Errorf("failed to generate tls certificate for stream server: %w", err) + } + config.TLSConfig = &tls.Config{ + Certificates: []tls.Certificate{tlsCert}, + } + case withoutTLS: + default: + return streaming.Config{}, errors.New("invalid configuration for the stream listener") + } + return config, nil +} + +// newTLSCert returns a self CA signed tls.certificate. +// TODO (mikebrow): replace / rewrite this function to support using CA +// signing of the certificate. Requires a security plan for kubernetes regarding +// CRI connections / streaming, etc. For example, kubernetes could configure or +// require a CA service and pass a configuration down through CRI. +func newTLSCert() (tls.Certificate, error) { + fail := func(err error) (tls.Certificate, error) { return tls.Certificate{}, err } + + hostName, err := os.Hostname() + if err != nil { + return fail(fmt.Errorf("failed to get hostname: %w", err)) + } + + addrs, err := net.InterfaceAddrs() + if err != nil { + return fail(fmt.Errorf("failed to get host IP addresses: %w", err)) + } + + var alternateIPs []net.IP + var alternateDNS []string + for _, addr := range addrs { + var ip net.IP + + switch v := addr.(type) { + case *net.IPNet: + ip = v.IP + case *net.IPAddr: + ip = v.IP + default: + continue + } + + alternateIPs = append(alternateIPs, ip) + alternateDNS = append(alternateDNS, ip.String()) + } + + // Generate a self signed certificate key (CA is self) + certPem, keyPem, err := k8scert.GenerateSelfSignedCertKey(hostName, alternateIPs, alternateDNS) + if err != nil { + return fail(fmt.Errorf("certificate key could not be created: %w", err)) + } + + // Load the tls certificate + tlsCert, err := tls.X509KeyPair(certPem, keyPem) + if err != nil { + return fail(fmt.Errorf("certificate could not be loaded: %w", err)) + } + + return tlsCert, nil +} diff --git a/pkg/cri/config/streaming_test.go b/pkg/cri/config/streaming_test.go new file mode 100644 index 000000000..cb86cc6f6 --- /dev/null +++ b/pkg/cri/config/streaming_test.go @@ -0,0 +1,130 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package config + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestValidateStreamServer(t *testing.T) { + for _, test := range []struct { + desc string + config ServerConfig + tlsMode streamListenerMode + expectErr bool + }{ + { + desc: "should pass with default withoutTLS", + config: DefaultServerConfig(), + tlsMode: withoutTLS, + expectErr: false, + }, + { + desc: "should pass with x509KeyPairTLS", + config: ServerConfig{ + EnableTLSStreaming: true, + X509KeyPairStreaming: X509KeyPairStreaming{ + TLSKeyFile: "non-empty", + TLSCertFile: "non-empty", + }, + }, + tlsMode: x509KeyPairTLS, + expectErr: false, + }, + { + desc: "should pass with selfSign", + config: ServerConfig{ + EnableTLSStreaming: true, + }, + tlsMode: selfSignTLS, + expectErr: false, + }, + { + desc: "should return error with X509 keypair but not EnableTLSStreaming", + config: ServerConfig{ + EnableTLSStreaming: false, + X509KeyPairStreaming: X509KeyPairStreaming{ + TLSKeyFile: "non-empty", + TLSCertFile: "non-empty", + }, + }, + tlsMode: -1, + expectErr: true, + }, + { + desc: "should return error with X509 TLSCertFile empty", + config: ServerConfig{ + EnableTLSStreaming: true, + X509KeyPairStreaming: X509KeyPairStreaming{ + TLSKeyFile: "non-empty", + TLSCertFile: "", + }, + }, + tlsMode: -1, + expectErr: true, + }, + { + desc: "should return error with X509 TLSKeyFile empty", + config: ServerConfig{ + EnableTLSStreaming: true, + X509KeyPairStreaming: X509KeyPairStreaming{ + TLSKeyFile: "", + TLSCertFile: "non-empty", + }, + }, + tlsMode: -1, + expectErr: true, + }, + { + desc: "should return error without EnableTLSStreaming and only TLSCertFile set", + config: ServerConfig{ + EnableTLSStreaming: false, + X509KeyPairStreaming: X509KeyPairStreaming{ + TLSKeyFile: "", + TLSCertFile: "non-empty", + }, + }, + tlsMode: -1, + expectErr: true, + }, + { + desc: "should return error without EnableTLSStreaming and only TLSKeyFile set", + config: ServerConfig{ + EnableTLSStreaming: false, + X509KeyPairStreaming: X509KeyPairStreaming{ + TLSKeyFile: "non-empty", + TLSCertFile: "", + }, + }, + tlsMode: -1, + expectErr: true, + }, + } { + test := test + t.Run(test.desc, func(t *testing.T) { + tlsMode, err := getStreamListenerMode(&test.config) + if test.expectErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + assert.Equal(t, test.tlsMode, tlsMode) + }) + } +} diff --git a/pkg/cri/server/container_update_resources_linux_test.go b/pkg/cri/server/container_update_resources_linux_test.go index e81ef70be..ea9b36cff 100644 --- a/pkg/cri/server/container_update_resources_linux_test.go +++ b/pkg/cri/server/container_update_resources_linux_test.go @@ -239,7 +239,7 @@ func TestUpdateOCILinuxResource(t *testing.T) { test := test t.Run(test.desc, func(t *testing.T) { config := criconfig.Config{ - PluginConfig: criconfig.PluginConfig{ + RuntimeConfig: criconfig.RuntimeConfig{ TolerateMissingHugetlbController: true, DisableHugetlbController: false, }, diff --git a/pkg/cri/server/podsandbox/controller_test.go b/pkg/cri/server/podsandbox/controller_test.go index e71edf580..3a1580a10 100644 --- a/pkg/cri/server/podsandbox/controller_test.go +++ b/pkg/cri/server/podsandbox/controller_test.go @@ -38,7 +38,7 @@ const ( var testConfig = criconfig.Config{ RootDir: testRootDir, StateDir: testStateDir, - PluginConfig: criconfig.PluginConfig{ + RuntimeConfig: criconfig.RuntimeConfig{ TolerateMissingHugetlbController: true, }, } diff --git a/pkg/cri/server/runtime_config_linux_test.go b/pkg/cri/server/runtime_config_linux_test.go index 63768081c..4f409e21b 100644 --- a/pkg/cri/server/runtime_config_linux_test.go +++ b/pkg/cri/server/runtime_config_linux_test.go @@ -94,8 +94,8 @@ func TestRuntimeConfig(t *testing.T) { test := test t.Run(test.desc, func(t *testing.T) { c := newTestCRIService() - c.config.PluginConfig.ContainerdConfig.DefaultRuntimeName = test.defaultRuntime - c.config.PluginConfig.ContainerdConfig.Runtimes = test.runtimes + c.config.RuntimeConfig.ContainerdConfig.DefaultRuntimeName = test.defaultRuntime + c.config.RuntimeConfig.ContainerdConfig.Runtimes = test.runtimes resp, err := c.RuntimeConfig(context.TODO(), &runtime.RuntimeConfigRequest{}) assert.NoError(t, err) diff --git a/pkg/cri/server/service.go b/pkg/cri/server/service.go index dc1bbc23a..825287456 100644 --- a/pkg/cri/server/service.go +++ b/pkg/cri/server/service.go @@ -142,6 +142,8 @@ type CRIServiceOptions struct { ImageService ImageService + StreamingConfig streaming.Config + NRI *nri.API // SandboxControllers is a map of all the loaded sandbox controllers @@ -189,7 +191,7 @@ func NewCRIService(options *CRIServiceOptions) (CRIService, runtime.RuntimeServi } // prepare streaming server - c.streamServer, err = newStreamServer(c, config.StreamServerAddress, config.StreamServerPort, config.StreamIdleTimeout) + c.streamServer, err = streaming.NewServer(options.StreamingConfig, newStreamRuntime(c)) if err != nil { return nil, nil, fmt.Errorf("failed to create stream server: %w", err) } diff --git a/pkg/cri/server/streaming.go b/pkg/cri/server/streaming.go index 50ae7cb09..d3e9f9f27 100644 --- a/pkg/cri/server/streaming.go +++ b/pkg/cri/server/streaming.go @@ -18,104 +18,18 @@ package server import ( "context" - "crypto/tls" - "errors" "fmt" "io" "math" - "net" - "os" - "time" - k8snet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/remotecommand" - k8scert "k8s.io/client-go/util/cert" "k8s.io/utils/exec" ctrdutil "github.com/containerd/containerd/v2/pkg/cri/util" "k8s.io/kubelet/pkg/cri/streaming" ) -type streamListenerMode int - -const ( - x509KeyPairTLS streamListenerMode = iota - selfSignTLS - withoutTLS -) - -func getStreamListenerMode(c *criService) (streamListenerMode, error) { - if c.config.EnableTLSStreaming { - if c.config.X509KeyPairStreaming.TLSCertFile != "" && c.config.X509KeyPairStreaming.TLSKeyFile != "" { - return x509KeyPairTLS, nil - } - if c.config.X509KeyPairStreaming.TLSCertFile != "" && c.config.X509KeyPairStreaming.TLSKeyFile == "" { - return -1, errors.New("must set X509KeyPairStreaming.TLSKeyFile") - } - if c.config.X509KeyPairStreaming.TLSCertFile == "" && c.config.X509KeyPairStreaming.TLSKeyFile != "" { - return -1, errors.New("must set X509KeyPairStreaming.TLSCertFile") - } - return selfSignTLS, nil - } - if c.config.X509KeyPairStreaming.TLSCertFile != "" { - return -1, errors.New("X509KeyPairStreaming.TLSCertFile is set but EnableTLSStreaming is not set") - } - if c.config.X509KeyPairStreaming.TLSKeyFile != "" { - return -1, errors.New("X509KeyPairStreaming.TLSKeyFile is set but EnableTLSStreaming is not set") - } - return withoutTLS, nil -} - -func newStreamServer(c *criService, addr, port, streamIdleTimeout string) (streaming.Server, error) { - if addr == "" { - a, err := k8snet.ResolveBindAddress(nil) - if err != nil { - return nil, fmt.Errorf("failed to get stream server address: %w", err) - } - addr = a.String() - } - config := streaming.DefaultConfig - if streamIdleTimeout != "" { - var err error - config.StreamIdleTimeout, err = time.ParseDuration(streamIdleTimeout) - if err != nil { - return nil, fmt.Errorf("invalid stream idle timeout: %w", err) - } - } - config.Addr = net.JoinHostPort(addr, port) - run := newStreamRuntime(c) - tlsMode, err := getStreamListenerMode(c) - if err != nil { - return nil, fmt.Errorf("invalid stream server configuration: %w", err) - } - switch tlsMode { - case x509KeyPairTLS: - tlsCert, err := tls.LoadX509KeyPair(c.config.X509KeyPairStreaming.TLSCertFile, c.config.X509KeyPairStreaming.TLSKeyFile) - if err != nil { - return nil, fmt.Errorf("failed to load x509 key pair for stream server: %w", err) - } - config.TLSConfig = &tls.Config{ - Certificates: []tls.Certificate{tlsCert}, - } - return streaming.NewServer(config, run) - case selfSignTLS: - tlsCert, err := newTLSCert() - if err != nil { - return nil, fmt.Errorf("failed to generate tls certificate for stream server: %w", err) - } - config.TLSConfig = &tls.Config{ - Certificates: []tls.Certificate{tlsCert}, - InsecureSkipVerify: true, - } - return streaming.NewServer(config, run) - case withoutTLS: - return streaming.NewServer(config, run) - default: - return nil, errors.New("invalid configuration for the stream listener") - } -} - type streamRuntime struct { c *criService } @@ -187,54 +101,3 @@ func handleResizing(ctx context.Context, resize <-chan remotecommand.TerminalSiz } }() } - -// newTLSCert returns a self CA signed tls.certificate. -// TODO (mikebrow): replace / rewrite this function to support using CA -// signing of the certificate. Requires a security plan for kubernetes regarding -// CRI connections / streaming, etc. For example, kubernetes could configure or -// require a CA service and pass a configuration down through CRI. -func newTLSCert() (tls.Certificate, error) { - fail := func(err error) (tls.Certificate, error) { return tls.Certificate{}, err } - - hostName, err := os.Hostname() - if err != nil { - return fail(fmt.Errorf("failed to get hostname: %w", err)) - } - - addrs, err := net.InterfaceAddrs() - if err != nil { - return fail(fmt.Errorf("failed to get host IP addresses: %w", err)) - } - - var alternateIPs []net.IP - var alternateDNS []string - for _, addr := range addrs { - var ip net.IP - - switch v := addr.(type) { - case *net.IPNet: - ip = v.IP - case *net.IPAddr: - ip = v.IP - default: - continue - } - - alternateIPs = append(alternateIPs, ip) - alternateDNS = append(alternateDNS, ip.String()) - } - - // Generate a self signed certificate key (CA is self) - certPem, keyPem, err := k8scert.GenerateSelfSignedCertKey(hostName, alternateIPs, alternateDNS) - if err != nil { - return fail(fmt.Errorf("certificate key could not be created: %w", err)) - } - - // Load the tls certificate - tlsCert, err := tls.X509KeyPair(certPem, keyPem) - if err != nil { - return fail(fmt.Errorf("certificate could not be loaded: %w", err)) - } - - return tlsCert, nil -} diff --git a/pkg/cri/server/streaming_test.go b/pkg/cri/server/streaming_test.go deleted file mode 100644 index d93945d6b..000000000 --- a/pkg/cri/server/streaming_test.go +++ /dev/null @@ -1,163 +0,0 @@ -/* - Copyright The containerd Authors. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package server - -import ( - "testing" - - "github.com/containerd/containerd/v2/pkg/cri/config" - "github.com/stretchr/testify/assert" -) - -func TestValidateStreamServer(t *testing.T) { - for _, test := range []struct { - desc string - *criService - tlsMode streamListenerMode - expectErr bool - }{ - { - desc: "should pass with default withoutTLS", - criService: &criService{ - config: config.Config{ - PluginConfig: config.DefaultConfig(), - }, - }, - tlsMode: withoutTLS, - expectErr: false, - }, - { - desc: "should pass with x509KeyPairTLS", - criService: &criService{ - config: config.Config{ - PluginConfig: config.PluginConfig{ - EnableTLSStreaming: true, - X509KeyPairStreaming: config.X509KeyPairStreaming{ - TLSKeyFile: "non-empty", - TLSCertFile: "non-empty", - }, - }, - }, - }, - tlsMode: x509KeyPairTLS, - expectErr: false, - }, - { - desc: "should pass with selfSign", - criService: &criService{ - config: config.Config{ - PluginConfig: config.PluginConfig{ - EnableTLSStreaming: true, - }, - }, - }, - tlsMode: selfSignTLS, - expectErr: false, - }, - { - desc: "should return error with X509 keypair but not EnableTLSStreaming", - criService: &criService{ - config: config.Config{ - PluginConfig: config.PluginConfig{ - EnableTLSStreaming: false, - X509KeyPairStreaming: config.X509KeyPairStreaming{ - TLSKeyFile: "non-empty", - TLSCertFile: "non-empty", - }, - }, - }, - }, - tlsMode: -1, - expectErr: true, - }, - { - desc: "should return error with X509 TLSCertFile empty", - criService: &criService{ - config: config.Config{ - PluginConfig: config.PluginConfig{ - EnableTLSStreaming: true, - X509KeyPairStreaming: config.X509KeyPairStreaming{ - TLSKeyFile: "non-empty", - TLSCertFile: "", - }, - }, - }, - }, - tlsMode: -1, - expectErr: true, - }, - { - desc: "should return error with X509 TLSKeyFile empty", - criService: &criService{ - config: config.Config{ - PluginConfig: config.PluginConfig{ - EnableTLSStreaming: true, - X509KeyPairStreaming: config.X509KeyPairStreaming{ - TLSKeyFile: "", - TLSCertFile: "non-empty", - }, - }, - }, - }, - tlsMode: -1, - expectErr: true, - }, - { - desc: "should return error without EnableTLSStreaming and only TLSCertFile set", - criService: &criService{ - config: config.Config{ - PluginConfig: config.PluginConfig{ - EnableTLSStreaming: false, - X509KeyPairStreaming: config.X509KeyPairStreaming{ - TLSKeyFile: "", - TLSCertFile: "non-empty", - }, - }, - }, - }, - tlsMode: -1, - expectErr: true, - }, - { - desc: "should return error without EnableTLSStreaming and only TLSKeyFile set", - criService: &criService{ - config: config.Config{ - PluginConfig: config.PluginConfig{ - EnableTLSStreaming: false, - X509KeyPairStreaming: config.X509KeyPairStreaming{ - TLSKeyFile: "non-empty", - TLSCertFile: "", - }, - }, - }, - }, - tlsMode: -1, - expectErr: true, - }, - } { - test := test - t.Run(test.desc, func(t *testing.T) { - tlsMode, err := getStreamListenerMode(test.criService) - if test.expectErr { - assert.Error(t, err) - return - } - assert.NoError(t, err) - assert.Equal(t, test.tlsMode, tlsMode) - }) - } -} diff --git a/pkg/cri/server/test_config.go b/pkg/cri/server/test_config.go index a0ec785ff..6c0eaeaf7 100644 --- a/pkg/cri/server/test_config.go +++ b/pkg/cri/server/test_config.go @@ -26,7 +26,7 @@ const ( var testConfig = criconfig.Config{ RootDir: testRootDir, StateDir: testStateDir, - PluginConfig: criconfig.PluginConfig{ + RuntimeConfig: criconfig.RuntimeConfig{ TolerateMissingHugetlbController: true, ContainerdConfig: criconfig.ContainerdConfig{ DefaultRuntimeName: "runc", diff --git a/plugins/cri/cri.go b/plugins/cri/cri.go index a9071e40a..1f1956c89 100644 --- a/plugins/cri/cri.go +++ b/plugins/cri/cri.go @@ -35,6 +35,7 @@ import ( "github.com/containerd/containerd/v2/pkg/cri/server" nriservice "github.com/containerd/containerd/v2/pkg/nri" "github.com/containerd/containerd/v2/plugins" + "github.com/containerd/containerd/v2/plugins/services/warning" "github.com/containerd/platforms" "google.golang.org/grpc" @@ -44,6 +45,7 @@ import ( // Register CRI service plugin func init() { + defaultConfig := criconfig.DefaultServerConfig() registry.Register(&plugin.Registration{ Type: plugins.GRPCPlugin, ID: "cri", @@ -56,10 +58,9 @@ func init() { plugins.LeasePlugin, plugins.SandboxStorePlugin, plugins.TransferPlugin, + plugins.WarningPlugin, }, - Config: &criconfig.ServiceConfig{ - DisableTCPService: true, - }, + Config: &defaultConfig, ConfigMigration: func(ctx context.Context, version int, pluginConfigs map[string]interface{}) error { if version >= srvconfig.CurrentConfigVersion { return nil @@ -87,7 +88,7 @@ func init() { func initCRIService(ic *plugin.InitContext) (interface{}, error) { ctx := ic.Context - config := ic.Config.(*criconfig.ServiceConfig) + config := ic.Config.(*criconfig.ServerConfig) // Get runtime service. criRuntimePlugin, err := ic.GetByID(plugins.CRIServicePlugin, "runtime") @@ -101,6 +102,19 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) { return nil, fmt.Errorf("unable to load CRI image service plugin dependency: %w", err) } + if warnings, err := criconfig.ValidateServerConfig(ic.Context, config); err != nil { + return nil, fmt.Errorf("invalid cri image config: %w", err) + } else if len(warnings) > 0 { + ws, err := ic.GetSingle(plugins.WarningPlugin) + if err != nil { + return nil, err + } + warn := ws.(warning.Service) + for _, w := range warnings { + warn.Emit(ic.Context, w) + } + } + log.G(ctx).Info("Connect containerd service") client, err := containerd.New( "", @@ -119,16 +133,21 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) { string(criconfig.ModeShim): client.SandboxController(string(criconfig.ModeShim)), } + streamingConfig, err := config.StreamingConfig() + if err != nil { + return nil, fmt.Errorf("failed to get streaming config: %w", err) + } + options := &server.CRIServiceOptions{ RuntimeService: criRuntimePlugin.(server.RuntimeService), ImageService: criImagePlugin.(server.ImageService), + StreamingConfig: streamingConfig, NRI: getNRIAPI(ic), Client: client, SandboxControllers: sbControllers, } is := criImagePlugin.(imageService).GRPCService() - // TODO: More options specifically for grpc service? s, rs, err := server.NewCRIService(options) if err != nil { return nil, fmt.Errorf("failed to create CRI service: %w", err) diff --git a/plugins/cri/runtime/plugin.go b/plugins/cri/runtime/plugin.go index 7e8181a88..eb82ef2ce 100644 --- a/plugins/cri/runtime/plugin.go +++ b/plugins/cri/runtime/plugin.go @@ -41,7 +41,7 @@ import ( ) func init() { - config := criconfig.DefaultConfig() + config := criconfig.DefaultRuntimeConfig() // Base plugin that other CRI services depend on. registry.Register(&plugin.Registration{ @@ -72,8 +72,8 @@ func initCRIRuntime(ic *plugin.InitContext) (interface{}, error) { ic.Meta.Platforms = []imagespec.Platform{platforms.DefaultSpec()} ic.Meta.Exports = map[string]string{"CRIVersion": constants.CRIVersion} ctx := ic.Context - pluginConfig := ic.Config.(*criconfig.PluginConfig) - if warnings, err := criconfig.ValidatePluginConfig(ctx, pluginConfig); err != nil { + pluginConfig := ic.Config.(*criconfig.RuntimeConfig) + if warnings, err := criconfig.ValidateRuntimeConfig(ctx, pluginConfig); err != nil { return nil, fmt.Errorf("invalid plugin config: %w", err) } else if len(warnings) > 0 { ws, err := ic.GetSingle(plugins.WarningPlugin) @@ -92,7 +92,7 @@ func initCRIRuntime(ic *plugin.InitContext) (interface{}, error) { containerdStateDir := filepath.Dir(ic.Properties[plugins.PropertyStateDir]) stateDir := filepath.Join(containerdStateDir, "io.containerd.grpc.v1.cri") c := criconfig.Config{ - PluginConfig: *pluginConfig, + RuntimeConfig: *pluginConfig, ContainerdRootDir: containerdRootDir, ContainerdEndpoint: ic.Properties[plugins.PropertyGRPCAddress], RootDir: rootDir,