Split streaming config from runtime config

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan 2024-01-28 22:00:11 -08:00
parent 58ff9d368d
commit 65b3922df7
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
18 changed files with 433 additions and 412 deletions

View File

@ -26,6 +26,7 @@ import (
"github.com/containerd/log"
"github.com/pelletier/go-toml/v2"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/kubelet/pkg/cri/streaming"
runhcsoptions "github.com/Microsoft/hcsshim/cmd/containerd-shim-runhcs-v1/options"
runcoptions "github.com/containerd/containerd/v2/core/runtime/v2/runc/options"
@ -312,31 +313,18 @@ type ImageConfig struct {
StatsCollectPeriod int `toml:"stats_collect_period" json:"statsCollectPeriod"`
}
// PluginConfig contains toml config related to CRI plugin,
// RuntimeConfig contains toml config related to CRI plugin,
// it is a subset of Config.
type PluginConfig struct {
type RuntimeConfig struct {
// ContainerdConfig contains config related to containerd
ContainerdConfig `toml:"containerd" json:"containerd"`
// CniConfig contains config related to cni
CniConfig `toml:"cni" json:"cni"`
// StreamServerAddress is the ip address streaming server is listening on.
StreamServerAddress string `toml:"stream_server_address" json:"streamServerAddress"`
// StreamServerPort is the port streaming server is listening on.
StreamServerPort string `toml:"stream_server_port" json:"streamServerPort"`
// StreamIdleTimeout is the maximum time a streaming connection
// can be idle before the connection is automatically closed.
// The string is in the golang duration format, see:
// https://golang.org/pkg/time/#ParseDuration
StreamIdleTimeout string `toml:"stream_idle_timeout" json:"streamIdleTimeout"`
// EnableSelinux indicates to enable the selinux support.
EnableSelinux bool `toml:"enable_selinux" json:"enableSelinux"`
// SelinuxCategoryRange allows the upper bound on the category range to be set.
// If not specified or set to 0, defaults to 1024 from the selinux package.
SelinuxCategoryRange int `toml:"selinux_category_range" json:"selinuxCategoryRange"`
// EnableTLSStreaming indicates to enable the TLS streaming support.
EnableTLSStreaming bool `toml:"enable_tls_streaming" json:"enableTLSStreaming"`
// X509KeyPairStreaming is a x509 key pair used for TLS streaming
X509KeyPairStreaming `toml:"x509_key_pair_streaming" json:"x509KeyPairStreaming"`
// MaxContainerLogLineSize is the maximum log line size in bytes for a container.
// Log line longer than the limit will be split into multiple lines. Non-positive
// value means no limit.
@ -416,10 +404,10 @@ type X509KeyPairStreaming struct {
TLSKeyFile string `toml:"tls_key_file" json:"tlsKeyFile"`
}
// Config contains all configurations for cri server.
// Config contains all configurations for CRI runtime plugin.
type Config struct {
// PluginConfig is the config for CRI plugin.
PluginConfig
// RuntimeConfig is the config for CRI runtime.
RuntimeConfig
// ContainerdRootDir is the root directory path for containerd.
ContainerdRootDir string `json:"containerdRootDir"`
// ContainerdEndpoint is the containerd endpoint path.
@ -431,10 +419,23 @@ type Config struct {
StateDir string `json:"stateDir"`
}
// ServiceConfig contains all the configuration for the CRI API server.
type ServiceConfig struct {
// ServerConfig contains all the configuration for the CRI API server.
type ServerConfig struct {
// DisableTCPService disables serving CRI on the TCP server.
DisableTCPService bool `toml:"disable_tcp_service" json:"disableTCPService"`
// StreamServerAddress is the ip address streaming server is listening on.
StreamServerAddress string `toml:"stream_server_address" json:"streamServerAddress"`
// StreamServerPort is the port streaming server is listening on.
StreamServerPort string `toml:"stream_server_port" json:"streamServerPort"`
// StreamIdleTimeout is the maximum time a streaming connection
// can be idle before the connection is automatically closed.
// The string is in the golang duration format, see:
// https://golang.org/pkg/time/#ParseDuration
StreamIdleTimeout string `toml:"stream_idle_timeout" json:"streamIdleTimeout"`
// EnableTLSStreaming indicates to enable the TLS streaming support.
EnableTLSStreaming bool `toml:"enable_tls_streaming" json:"enableTLSStreaming"`
// X509KeyPairStreaming is a x509 key pair used for TLS streaming
X509KeyPairStreaming `toml:"x509_key_pair_streaming" json:"x509KeyPairStreaming"`
}
const (
@ -498,8 +499,8 @@ func ValidateImageConfig(ctx context.Context, c *ImageConfig) ([]deprecation.War
return warnings, nil
}
// ValidatePluginConfig validates the given plugin configuration.
func ValidatePluginConfig(ctx context.Context, c *PluginConfig) ([]deprecation.Warning, error) {
// ValidateRuntimeConfig validates the given runtime configuration.
func ValidateRuntimeConfig(ctx context.Context, c *RuntimeConfig) ([]deprecation.Warning, error) {
var warnings []deprecation.Warning
if c.ContainerdConfig.Runtimes == nil {
c.ContainerdConfig.Runtimes = make(map[string]Runtime)
@ -524,13 +525,6 @@ func ValidatePluginConfig(ctx context.Context, c *PluginConfig) ([]deprecation.W
}
}
// Validation for stream_idle_timeout
if c.StreamIdleTimeout != "" {
if _, err := time.ParseDuration(c.StreamIdleTimeout); err != nil {
return warnings, fmt.Errorf("invalid stream idle timeout: %w", err)
}
}
// Validation for drain_exec_sync_io_timeout
if c.DrainExecSyncIOTimeout != "" {
if _, err := time.ParseDuration(c.DrainExecSyncIOTimeout); err != nil {
@ -543,6 +537,18 @@ func ValidatePluginConfig(ctx context.Context, c *PluginConfig) ([]deprecation.W
return warnings, nil
}
// ValidateServerConfig validates the given server configuration.
func ValidateServerConfig(ctx context.Context, c *ServerConfig) ([]deprecation.Warning, error) {
var warnings []deprecation.Warning
// Validation for stream_idle_timeout
if c.StreamIdleTimeout != "" {
if _, err := time.ParseDuration(c.StreamIdleTimeout); err != nil {
return warnings, fmt.Errorf("invalid stream idle timeout: %w", err)
}
}
return warnings, nil
}
func (config *Config) GetSandboxRuntime(podSandboxConfig *runtime.PodSandboxConfig, runtimeHandler string) (Runtime, error) {
if untrustedWorkload(podSandboxConfig) {
// If the untrusted annotation is provided, runtimeHandler MUST be empty.
@ -631,3 +637,17 @@ func getRuntimeOptionsType(t string) interface{} {
return &runtimeoptions.Options{}
}
}
func DefaultServerConfig() ServerConfig {
return ServerConfig{
DisableTCPService: true,
StreamServerAddress: "127.0.0.1",
StreamServerPort: "0",
StreamIdleTimeout: streaming.DefaultConfig.StreamIdleTimeout.String(), // 4 hour
EnableTLSStreaming: false,
X509KeyPairStreaming: X509KeyPairStreaming{
TLSKeyFile: "",
TLSCertFile: "",
},
}
}

View File

@ -28,7 +28,7 @@ import (
var kernelGreaterEqualThan = kernel.GreaterEqualThan
func ValidateEnableUnprivileged(ctx context.Context, c *PluginConfig) error {
func ValidateEnableUnprivileged(ctx context.Context, c *RuntimeConfig) error {
if c.EnableUnprivilegedICMP || c.EnableUnprivilegedPorts {
fourDotEleven := kernel.KernelVersion{Kernel: 4, Major: 11}
ok, err := kernelGreaterEqualThan(fourDotEleven)

View File

@ -32,13 +32,13 @@ func TestValidateEnableUnprivileged(t *testing.T) {
tests := []struct {
name string
config *PluginConfig
config *RuntimeConfig
kernelGreater bool
expectedErr string
}{
{
name: "disable unprivileged_icmp and unprivileged_port",
config: &PluginConfig{
config: &RuntimeConfig{
ContainerdConfig: ContainerdConfig{
DefaultRuntimeName: RuntimeDefault,
Runtimes: map[string]Runtime{
@ -54,7 +54,7 @@ func TestValidateEnableUnprivileged(t *testing.T) {
},
{
name: "enable unprivileged_icmp or unprivileged_port, but kernel version is smaller than 4.11",
config: &PluginConfig{
config: &RuntimeConfig{
ContainerdConfig: ContainerdConfig{
DefaultRuntimeName: RuntimeDefault,
Runtimes: map[string]Runtime{
@ -71,7 +71,7 @@ func TestValidateEnableUnprivileged(t *testing.T) {
},
{
name: "enable unprivileged_icmp or unprivileged_port, but kernel version is greater than or equal 4.11",
config: &PluginConfig{
config: &RuntimeConfig{
ContainerdConfig: ContainerdConfig{
DefaultRuntimeName: RuntimeDefault,
Runtimes: map[string]Runtime{

View File

@ -22,6 +22,6 @@ import (
"context"
)
func ValidateEnableUnprivileged(ctx context.Context, c *PluginConfig) error {
func ValidateEnableUnprivileged(ctx context.Context, c *RuntimeConfig) error {
return nil
}

View File

@ -28,29 +28,32 @@ import (
func TestValidateConfig(t *testing.T) {
for desc, test := range map[string]struct {
config *PluginConfig
expectedErr string
expected *PluginConfig
runtimeConfig *RuntimeConfig
runtimeExpectedErr string
runtimeExpected *RuntimeConfig
imageConfig *ImageConfig
imageExpectedErr string
imageExpected *ImageConfig
serverConfig *ServerConfig
serverExpectedErr string
serverExpected *ServerConfig
warnings []deprecation.Warning
}{
"no default_runtime_name": {
config: &PluginConfig{},
expectedErr: "`default_runtime_name` is empty",
runtimeConfig: &RuntimeConfig{},
runtimeExpectedErr: "`default_runtime_name` is empty",
},
"no runtime[default_runtime_name]": {
config: &PluginConfig{
runtimeConfig: &RuntimeConfig{
ContainerdConfig: ContainerdConfig{
DefaultRuntimeName: RuntimeDefault,
},
},
expectedErr: "no corresponding runtime configured in `containerd.runtimes` for `containerd` `default_runtime_name = \"default\"",
runtimeExpectedErr: "no corresponding runtime configured in `containerd.runtimes` for `containerd` `default_runtime_name = \"default\"",
},
"deprecated auths": {
config: &PluginConfig{
runtimeConfig: &RuntimeConfig{
ContainerdConfig: ContainerdConfig{
DefaultRuntimeName: RuntimeDefault,
Runtimes: map[string]Runtime{
@ -58,7 +61,7 @@ func TestValidateConfig(t *testing.T) {
},
},
},
expected: &PluginConfig{
runtimeExpected: &RuntimeConfig{
ContainerdConfig: ContainerdConfig{
DefaultRuntimeName: RuntimeDefault,
Runtimes: map[string]Runtime{
@ -92,18 +95,10 @@ func TestValidateConfig(t *testing.T) {
warnings: []deprecation.Warning{deprecation.CRIRegistryAuths},
},
"invalid stream_idle_timeout": {
config: &PluginConfig{
serverConfig: &ServerConfig{
StreamIdleTimeout: "invalid",
ContainerdConfig: ContainerdConfig{
DefaultRuntimeName: RuntimeDefault,
Runtimes: map[string]Runtime{
RuntimeDefault: {
Type: "default",
},
},
},
},
expectedErr: "invalid stream idle timeout",
serverExpectedErr: "invalid stream idle timeout",
},
"conflicting mirror registry config": {
imageConfig: &ImageConfig{
@ -117,7 +112,7 @@ func TestValidateConfig(t *testing.T) {
imageExpectedErr: "`mirrors` cannot be set when `config_path` is provided",
},
"deprecated mirrors": {
config: &PluginConfig{
runtimeConfig: &RuntimeConfig{
ContainerdConfig: ContainerdConfig{
DefaultRuntimeName: RuntimeDefault,
Runtimes: map[string]Runtime{
@ -132,7 +127,7 @@ func TestValidateConfig(t *testing.T) {
},
},
},
expected: &PluginConfig{
runtimeExpected: &RuntimeConfig{
ContainerdConfig: ContainerdConfig{
DefaultRuntimeName: RuntimeDefault,
Runtimes: map[string]Runtime{
@ -152,7 +147,7 @@ func TestValidateConfig(t *testing.T) {
warnings: []deprecation.Warning{deprecation.CRIRegistryMirrors},
},
"deprecated configs": {
config: &PluginConfig{
runtimeConfig: &RuntimeConfig{
ContainerdConfig: ContainerdConfig{
DefaultRuntimeName: RuntimeDefault,
Runtimes: map[string]Runtime{
@ -171,7 +166,7 @@ func TestValidateConfig(t *testing.T) {
},
},
},
expected: &PluginConfig{
runtimeExpected: &RuntimeConfig{
ContainerdConfig: ContainerdConfig{
DefaultRuntimeName: RuntimeDefault,
Runtimes: map[string]Runtime{
@ -195,7 +190,7 @@ func TestValidateConfig(t *testing.T) {
warnings: []deprecation.Warning{deprecation.CRIRegistryConfigs},
},
"privileged_without_host_devices_all_devices_allowed without privileged_without_host_devices": {
config: &PluginConfig{
runtimeConfig: &RuntimeConfig{
ContainerdConfig: ContainerdConfig{
DefaultRuntimeName: RuntimeDefault,
Runtimes: map[string]Runtime{
@ -207,10 +202,10 @@ func TestValidateConfig(t *testing.T) {
},
},
},
expectedErr: "`privileged_without_host_devices_all_devices_allowed` requires `privileged_without_host_devices` to be enabled",
runtimeExpectedErr: "`privileged_without_host_devices_all_devices_allowed` requires `privileged_without_host_devices` to be enabled",
},
"invalid drain_exec_sync_io_timeout input": {
config: &PluginConfig{
runtimeConfig: &RuntimeConfig{
ContainerdConfig: ContainerdConfig{
DefaultRuntimeName: RuntimeDefault,
Runtimes: map[string]Runtime{
@ -221,18 +216,18 @@ func TestValidateConfig(t *testing.T) {
},
DrainExecSyncIOTimeout: "10",
},
expectedErr: "invalid `drain_exec_sync_io_timeout`",
runtimeExpectedErr: "invalid `drain_exec_sync_io_timeout`",
},
} {
t.Run(desc, func(t *testing.T) {
var warnings []deprecation.Warning
if test.config != nil {
w, err := ValidatePluginConfig(context.Background(), test.config)
if test.expectedErr != "" {
assert.Contains(t, err.Error(), test.expectedErr)
if test.runtimeConfig != nil {
w, err := ValidateRuntimeConfig(context.Background(), test.runtimeConfig)
if test.runtimeExpectedErr != "" {
assert.Contains(t, err.Error(), test.runtimeExpectedErr)
} else {
assert.NoError(t, err)
assert.Equal(t, test.expected, test.config)
assert.Equal(t, test.runtimeExpected, test.runtimeConfig)
}
warnings = append(warnings, w...)
}
@ -246,6 +241,16 @@ func TestValidateConfig(t *testing.T) {
}
warnings = append(warnings, w...)
}
if test.serverConfig != nil {
w, err := ValidateServerConfig(context.Background(), test.serverConfig)
if test.serverExpectedErr != "" {
assert.Contains(t, err.Error(), test.serverExpectedErr)
} else {
assert.NoError(t, err)
assert.Equal(t, test.serverExpected, test.serverConfig)
}
warnings = append(warnings, w...)
}
if len(test.warnings) > 0 {
assert.ElementsMatch(t, test.warnings, warnings)

View File

@ -21,7 +21,6 @@ package config
import (
"github.com/containerd/containerd/v2/defaults"
"github.com/pelletier/go-toml/v2"
"k8s.io/kubelet/pkg/cri/streaming"
)
func DefaultImageConfig() ImageConfig {
@ -41,8 +40,8 @@ func DefaultImageConfig() ImageConfig {
}
}
// DefaultConfig returns default configurations of cri plugin.
func DefaultConfig() PluginConfig {
// DefaultRuntimeConfig returns default configurations of cri plugin.
func DefaultRuntimeConfig() RuntimeConfig {
defaultRuncV2Opts := `
# NoNewKeyring disables new keyring for the container.
NoNewKeyring = false
@ -71,7 +70,7 @@ func DefaultConfig() PluginConfig {
var m map[string]interface{}
toml.Unmarshal([]byte(defaultRuncV2Opts), &m)
return PluginConfig{
return RuntimeConfig{
CniConfig: CniConfig{
NetworkPluginBinDir: "/opt/cni/bin",
NetworkPluginConfDir: "/etc/cni/net.d",
@ -89,16 +88,8 @@ func DefaultConfig() PluginConfig {
},
},
},
StreamServerAddress: "127.0.0.1",
StreamServerPort: "0",
StreamIdleTimeout: streaming.DefaultConfig.StreamIdleTimeout.String(), // 4 hour
EnableSelinux: false,
SelinuxCategoryRange: 1024,
EnableTLSStreaming: false,
X509KeyPairStreaming: X509KeyPairStreaming{
TLSKeyFile: "",
TLSCertFile: "",
},
MaxContainerLogLineSize: 16 * 1024,
DisableProcMount: false,
TolerateMissingHugetlbController: true,

View File

@ -21,7 +21,6 @@ import (
"path/filepath"
"github.com/containerd/containerd/v2/defaults"
"k8s.io/kubelet/pkg/cri/streaming"
)
func DefaultImageConfig() ImageConfig {
@ -39,9 +38,9 @@ func DefaultImageConfig() ImageConfig {
}
}
// DefaultConfig returns default configurations of cri plugin.
func DefaultConfig() PluginConfig {
return PluginConfig{
// DefaultRuntimeConfig returns default configurations of cri plugin.
func DefaultRuntimeConfig() RuntimeConfig {
return RuntimeConfig{
CniConfig: CniConfig{
NetworkPluginBinDir: filepath.Join(os.Getenv("ProgramFiles"), "containerd", "cni", "bin"),
NetworkPluginConfDir: filepath.Join(os.Getenv("ProgramFiles"), "containerd", "cni", "conf"),
@ -78,14 +77,6 @@ func DefaultConfig() PluginConfig {
},
},
},
StreamServerAddress: "127.0.0.1",
StreamServerPort: "0",
StreamIdleTimeout: streaming.DefaultConfig.StreamIdleTimeout.String(), // 4 hour
EnableTLSStreaming: false,
X509KeyPairStreaming: X509KeyPairStreaming{
TLSKeyFile: "",
TLSCertFile: "",
},
MaxContainerLogLineSize: 16 * 1024,
IgnoreImageDefinedVolumes: false,
// TODO(windows): Add platform specific config, so that most common defaults can be shared.

163
pkg/cri/config/streaming.go Normal file
View File

@ -0,0 +1,163 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"crypto/tls"
"errors"
"fmt"
"net"
"os"
"time"
k8snet "k8s.io/apimachinery/pkg/util/net"
k8scert "k8s.io/client-go/util/cert"
"k8s.io/kubelet/pkg/cri/streaming"
)
type streamListenerMode int
const (
x509KeyPairTLS streamListenerMode = iota
selfSignTLS
withoutTLS
)
func getStreamListenerMode(config *ServerConfig) (streamListenerMode, error) {
if config.EnableTLSStreaming {
if config.X509KeyPairStreaming.TLSCertFile != "" && config.X509KeyPairStreaming.TLSKeyFile != "" {
return x509KeyPairTLS, nil
}
if config.X509KeyPairStreaming.TLSCertFile != "" && config.X509KeyPairStreaming.TLSKeyFile == "" {
return -1, errors.New("must set X509KeyPairStreaming.TLSKeyFile")
}
if config.X509KeyPairStreaming.TLSCertFile == "" && config.X509KeyPairStreaming.TLSKeyFile != "" {
return -1, errors.New("must set X509KeyPairStreaming.TLSCertFile")
}
return selfSignTLS, nil
}
if config.X509KeyPairStreaming.TLSCertFile != "" {
return -1, errors.New("X509KeyPairStreaming.TLSCertFile is set but EnableTLSStreaming is not set")
}
if config.X509KeyPairStreaming.TLSKeyFile != "" {
return -1, errors.New("X509KeyPairStreaming.TLSKeyFile is set but EnableTLSStreaming is not set")
}
return withoutTLS, nil
}
func (c *ServerConfig) StreamingConfig() (streaming.Config, error) {
var (
addr = c.StreamServerAddress
port = c.StreamServerPort
streamIdleTimeout = c.StreamIdleTimeout
)
if addr == "" {
a, err := k8snet.ResolveBindAddress(nil)
if err != nil {
return streaming.Config{}, fmt.Errorf("failed to get stream server address: %w", err)
}
addr = a.String()
}
config := streaming.DefaultConfig
if streamIdleTimeout != "" {
var err error
config.StreamIdleTimeout, err = time.ParseDuration(streamIdleTimeout)
if err != nil {
return streaming.Config{}, fmt.Errorf("invalid stream idle timeout: %w", err)
}
}
config.Addr = net.JoinHostPort(addr, port)
tlsMode, err := getStreamListenerMode(c)
if err != nil {
return streaming.Config{}, fmt.Errorf("invalid stream server configuration: %w", err)
}
switch tlsMode {
case x509KeyPairTLS:
tlsCert, err := tls.LoadX509KeyPair(c.X509KeyPairStreaming.TLSCertFile, c.X509KeyPairStreaming.TLSKeyFile)
if err != nil {
return streaming.Config{}, fmt.Errorf("failed to load x509 key pair for stream server: %w", err)
}
config.TLSConfig = &tls.Config{
Certificates: []tls.Certificate{tlsCert},
}
case selfSignTLS:
tlsCert, err := newTLSCert()
if err != nil {
return streaming.Config{}, fmt.Errorf("failed to generate tls certificate for stream server: %w", err)
}
config.TLSConfig = &tls.Config{
Certificates: []tls.Certificate{tlsCert},
}
case withoutTLS:
default:
return streaming.Config{}, errors.New("invalid configuration for the stream listener")
}
return config, nil
}
// newTLSCert returns a self CA signed tls.certificate.
// TODO (mikebrow): replace / rewrite this function to support using CA
// signing of the certificate. Requires a security plan for kubernetes regarding
// CRI connections / streaming, etc. For example, kubernetes could configure or
// require a CA service and pass a configuration down through CRI.
func newTLSCert() (tls.Certificate, error) {
fail := func(err error) (tls.Certificate, error) { return tls.Certificate{}, err }
hostName, err := os.Hostname()
if err != nil {
return fail(fmt.Errorf("failed to get hostname: %w", err))
}
addrs, err := net.InterfaceAddrs()
if err != nil {
return fail(fmt.Errorf("failed to get host IP addresses: %w", err))
}
var alternateIPs []net.IP
var alternateDNS []string
for _, addr := range addrs {
var ip net.IP
switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
default:
continue
}
alternateIPs = append(alternateIPs, ip)
alternateDNS = append(alternateDNS, ip.String())
}
// Generate a self signed certificate key (CA is self)
certPem, keyPem, err := k8scert.GenerateSelfSignedCertKey(hostName, alternateIPs, alternateDNS)
if err != nil {
return fail(fmt.Errorf("certificate key could not be created: %w", err))
}
// Load the tls certificate
tlsCert, err := tls.X509KeyPair(certPem, keyPem)
if err != nil {
return fail(fmt.Errorf("certificate could not be loaded: %w", err))
}
return tlsCert, nil
}

View File

@ -0,0 +1,130 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestValidateStreamServer(t *testing.T) {
for _, test := range []struct {
desc string
config ServerConfig
tlsMode streamListenerMode
expectErr bool
}{
{
desc: "should pass with default withoutTLS",
config: DefaultServerConfig(),
tlsMode: withoutTLS,
expectErr: false,
},
{
desc: "should pass with x509KeyPairTLS",
config: ServerConfig{
EnableTLSStreaming: true,
X509KeyPairStreaming: X509KeyPairStreaming{
TLSKeyFile: "non-empty",
TLSCertFile: "non-empty",
},
},
tlsMode: x509KeyPairTLS,
expectErr: false,
},
{
desc: "should pass with selfSign",
config: ServerConfig{
EnableTLSStreaming: true,
},
tlsMode: selfSignTLS,
expectErr: false,
},
{
desc: "should return error with X509 keypair but not EnableTLSStreaming",
config: ServerConfig{
EnableTLSStreaming: false,
X509KeyPairStreaming: X509KeyPairStreaming{
TLSKeyFile: "non-empty",
TLSCertFile: "non-empty",
},
},
tlsMode: -1,
expectErr: true,
},
{
desc: "should return error with X509 TLSCertFile empty",
config: ServerConfig{
EnableTLSStreaming: true,
X509KeyPairStreaming: X509KeyPairStreaming{
TLSKeyFile: "non-empty",
TLSCertFile: "",
},
},
tlsMode: -1,
expectErr: true,
},
{
desc: "should return error with X509 TLSKeyFile empty",
config: ServerConfig{
EnableTLSStreaming: true,
X509KeyPairStreaming: X509KeyPairStreaming{
TLSKeyFile: "",
TLSCertFile: "non-empty",
},
},
tlsMode: -1,
expectErr: true,
},
{
desc: "should return error without EnableTLSStreaming and only TLSCertFile set",
config: ServerConfig{
EnableTLSStreaming: false,
X509KeyPairStreaming: X509KeyPairStreaming{
TLSKeyFile: "",
TLSCertFile: "non-empty",
},
},
tlsMode: -1,
expectErr: true,
},
{
desc: "should return error without EnableTLSStreaming and only TLSKeyFile set",
config: ServerConfig{
EnableTLSStreaming: false,
X509KeyPairStreaming: X509KeyPairStreaming{
TLSKeyFile: "non-empty",
TLSCertFile: "",
},
},
tlsMode: -1,
expectErr: true,
},
} {
test := test
t.Run(test.desc, func(t *testing.T) {
tlsMode, err := getStreamListenerMode(&test.config)
if test.expectErr {
assert.Error(t, err)
return
}
assert.NoError(t, err)
assert.Equal(t, test.tlsMode, tlsMode)
})
}
}

View File

@ -239,7 +239,7 @@ func TestUpdateOCILinuxResource(t *testing.T) {
test := test
t.Run(test.desc, func(t *testing.T) {
config := criconfig.Config{
PluginConfig: criconfig.PluginConfig{
RuntimeConfig: criconfig.RuntimeConfig{
TolerateMissingHugetlbController: true,
DisableHugetlbController: false,
},

View File

@ -38,7 +38,7 @@ const (
var testConfig = criconfig.Config{
RootDir: testRootDir,
StateDir: testStateDir,
PluginConfig: criconfig.PluginConfig{
RuntimeConfig: criconfig.RuntimeConfig{
TolerateMissingHugetlbController: true,
},
}

View File

@ -94,8 +94,8 @@ func TestRuntimeConfig(t *testing.T) {
test := test
t.Run(test.desc, func(t *testing.T) {
c := newTestCRIService()
c.config.PluginConfig.ContainerdConfig.DefaultRuntimeName = test.defaultRuntime
c.config.PluginConfig.ContainerdConfig.Runtimes = test.runtimes
c.config.RuntimeConfig.ContainerdConfig.DefaultRuntimeName = test.defaultRuntime
c.config.RuntimeConfig.ContainerdConfig.Runtimes = test.runtimes
resp, err := c.RuntimeConfig(context.TODO(), &runtime.RuntimeConfigRequest{})
assert.NoError(t, err)

View File

@ -142,6 +142,8 @@ type CRIServiceOptions struct {
ImageService ImageService
StreamingConfig streaming.Config
NRI *nri.API
// SandboxControllers is a map of all the loaded sandbox controllers
@ -189,7 +191,7 @@ func NewCRIService(options *CRIServiceOptions) (CRIService, runtime.RuntimeServi
}
// prepare streaming server
c.streamServer, err = newStreamServer(c, config.StreamServerAddress, config.StreamServerPort, config.StreamIdleTimeout)
c.streamServer, err = streaming.NewServer(options.StreamingConfig, newStreamRuntime(c))
if err != nil {
return nil, nil, fmt.Errorf("failed to create stream server: %w", err)
}

View File

@ -18,104 +18,18 @@ package server
import (
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"math"
"net"
"os"
"time"
k8snet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/remotecommand"
k8scert "k8s.io/client-go/util/cert"
"k8s.io/utils/exec"
ctrdutil "github.com/containerd/containerd/v2/pkg/cri/util"
"k8s.io/kubelet/pkg/cri/streaming"
)
type streamListenerMode int
const (
x509KeyPairTLS streamListenerMode = iota
selfSignTLS
withoutTLS
)
func getStreamListenerMode(c *criService) (streamListenerMode, error) {
if c.config.EnableTLSStreaming {
if c.config.X509KeyPairStreaming.TLSCertFile != "" && c.config.X509KeyPairStreaming.TLSKeyFile != "" {
return x509KeyPairTLS, nil
}
if c.config.X509KeyPairStreaming.TLSCertFile != "" && c.config.X509KeyPairStreaming.TLSKeyFile == "" {
return -1, errors.New("must set X509KeyPairStreaming.TLSKeyFile")
}
if c.config.X509KeyPairStreaming.TLSCertFile == "" && c.config.X509KeyPairStreaming.TLSKeyFile != "" {
return -1, errors.New("must set X509KeyPairStreaming.TLSCertFile")
}
return selfSignTLS, nil
}
if c.config.X509KeyPairStreaming.TLSCertFile != "" {
return -1, errors.New("X509KeyPairStreaming.TLSCertFile is set but EnableTLSStreaming is not set")
}
if c.config.X509KeyPairStreaming.TLSKeyFile != "" {
return -1, errors.New("X509KeyPairStreaming.TLSKeyFile is set but EnableTLSStreaming is not set")
}
return withoutTLS, nil
}
func newStreamServer(c *criService, addr, port, streamIdleTimeout string) (streaming.Server, error) {
if addr == "" {
a, err := k8snet.ResolveBindAddress(nil)
if err != nil {
return nil, fmt.Errorf("failed to get stream server address: %w", err)
}
addr = a.String()
}
config := streaming.DefaultConfig
if streamIdleTimeout != "" {
var err error
config.StreamIdleTimeout, err = time.ParseDuration(streamIdleTimeout)
if err != nil {
return nil, fmt.Errorf("invalid stream idle timeout: %w", err)
}
}
config.Addr = net.JoinHostPort(addr, port)
run := newStreamRuntime(c)
tlsMode, err := getStreamListenerMode(c)
if err != nil {
return nil, fmt.Errorf("invalid stream server configuration: %w", err)
}
switch tlsMode {
case x509KeyPairTLS:
tlsCert, err := tls.LoadX509KeyPair(c.config.X509KeyPairStreaming.TLSCertFile, c.config.X509KeyPairStreaming.TLSKeyFile)
if err != nil {
return nil, fmt.Errorf("failed to load x509 key pair for stream server: %w", err)
}
config.TLSConfig = &tls.Config{
Certificates: []tls.Certificate{tlsCert},
}
return streaming.NewServer(config, run)
case selfSignTLS:
tlsCert, err := newTLSCert()
if err != nil {
return nil, fmt.Errorf("failed to generate tls certificate for stream server: %w", err)
}
config.TLSConfig = &tls.Config{
Certificates: []tls.Certificate{tlsCert},
InsecureSkipVerify: true,
}
return streaming.NewServer(config, run)
case withoutTLS:
return streaming.NewServer(config, run)
default:
return nil, errors.New("invalid configuration for the stream listener")
}
}
type streamRuntime struct {
c *criService
}
@ -187,54 +101,3 @@ func handleResizing(ctx context.Context, resize <-chan remotecommand.TerminalSiz
}
}()
}
// newTLSCert returns a self CA signed tls.certificate.
// TODO (mikebrow): replace / rewrite this function to support using CA
// signing of the certificate. Requires a security plan for kubernetes regarding
// CRI connections / streaming, etc. For example, kubernetes could configure or
// require a CA service and pass a configuration down through CRI.
func newTLSCert() (tls.Certificate, error) {
fail := func(err error) (tls.Certificate, error) { return tls.Certificate{}, err }
hostName, err := os.Hostname()
if err != nil {
return fail(fmt.Errorf("failed to get hostname: %w", err))
}
addrs, err := net.InterfaceAddrs()
if err != nil {
return fail(fmt.Errorf("failed to get host IP addresses: %w", err))
}
var alternateIPs []net.IP
var alternateDNS []string
for _, addr := range addrs {
var ip net.IP
switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
default:
continue
}
alternateIPs = append(alternateIPs, ip)
alternateDNS = append(alternateDNS, ip.String())
}
// Generate a self signed certificate key (CA is self)
certPem, keyPem, err := k8scert.GenerateSelfSignedCertKey(hostName, alternateIPs, alternateDNS)
if err != nil {
return fail(fmt.Errorf("certificate key could not be created: %w", err))
}
// Load the tls certificate
tlsCert, err := tls.X509KeyPair(certPem, keyPem)
if err != nil {
return fail(fmt.Errorf("certificate could not be loaded: %w", err))
}
return tlsCert, nil
}

View File

@ -1,163 +0,0 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"testing"
"github.com/containerd/containerd/v2/pkg/cri/config"
"github.com/stretchr/testify/assert"
)
func TestValidateStreamServer(t *testing.T) {
for _, test := range []struct {
desc string
*criService
tlsMode streamListenerMode
expectErr bool
}{
{
desc: "should pass with default withoutTLS",
criService: &criService{
config: config.Config{
PluginConfig: config.DefaultConfig(),
},
},
tlsMode: withoutTLS,
expectErr: false,
},
{
desc: "should pass with x509KeyPairTLS",
criService: &criService{
config: config.Config{
PluginConfig: config.PluginConfig{
EnableTLSStreaming: true,
X509KeyPairStreaming: config.X509KeyPairStreaming{
TLSKeyFile: "non-empty",
TLSCertFile: "non-empty",
},
},
},
},
tlsMode: x509KeyPairTLS,
expectErr: false,
},
{
desc: "should pass with selfSign",
criService: &criService{
config: config.Config{
PluginConfig: config.PluginConfig{
EnableTLSStreaming: true,
},
},
},
tlsMode: selfSignTLS,
expectErr: false,
},
{
desc: "should return error with X509 keypair but not EnableTLSStreaming",
criService: &criService{
config: config.Config{
PluginConfig: config.PluginConfig{
EnableTLSStreaming: false,
X509KeyPairStreaming: config.X509KeyPairStreaming{
TLSKeyFile: "non-empty",
TLSCertFile: "non-empty",
},
},
},
},
tlsMode: -1,
expectErr: true,
},
{
desc: "should return error with X509 TLSCertFile empty",
criService: &criService{
config: config.Config{
PluginConfig: config.PluginConfig{
EnableTLSStreaming: true,
X509KeyPairStreaming: config.X509KeyPairStreaming{
TLSKeyFile: "non-empty",
TLSCertFile: "",
},
},
},
},
tlsMode: -1,
expectErr: true,
},
{
desc: "should return error with X509 TLSKeyFile empty",
criService: &criService{
config: config.Config{
PluginConfig: config.PluginConfig{
EnableTLSStreaming: true,
X509KeyPairStreaming: config.X509KeyPairStreaming{
TLSKeyFile: "",
TLSCertFile: "non-empty",
},
},
},
},
tlsMode: -1,
expectErr: true,
},
{
desc: "should return error without EnableTLSStreaming and only TLSCertFile set",
criService: &criService{
config: config.Config{
PluginConfig: config.PluginConfig{
EnableTLSStreaming: false,
X509KeyPairStreaming: config.X509KeyPairStreaming{
TLSKeyFile: "",
TLSCertFile: "non-empty",
},
},
},
},
tlsMode: -1,
expectErr: true,
},
{
desc: "should return error without EnableTLSStreaming and only TLSKeyFile set",
criService: &criService{
config: config.Config{
PluginConfig: config.PluginConfig{
EnableTLSStreaming: false,
X509KeyPairStreaming: config.X509KeyPairStreaming{
TLSKeyFile: "non-empty",
TLSCertFile: "",
},
},
},
},
tlsMode: -1,
expectErr: true,
},
} {
test := test
t.Run(test.desc, func(t *testing.T) {
tlsMode, err := getStreamListenerMode(test.criService)
if test.expectErr {
assert.Error(t, err)
return
}
assert.NoError(t, err)
assert.Equal(t, test.tlsMode, tlsMode)
})
}
}

View File

@ -26,7 +26,7 @@ const (
var testConfig = criconfig.Config{
RootDir: testRootDir,
StateDir: testStateDir,
PluginConfig: criconfig.PluginConfig{
RuntimeConfig: criconfig.RuntimeConfig{
TolerateMissingHugetlbController: true,
ContainerdConfig: criconfig.ContainerdConfig{
DefaultRuntimeName: "runc",

View File

@ -35,6 +35,7 @@ import (
"github.com/containerd/containerd/v2/pkg/cri/server"
nriservice "github.com/containerd/containerd/v2/pkg/nri"
"github.com/containerd/containerd/v2/plugins"
"github.com/containerd/containerd/v2/plugins/services/warning"
"github.com/containerd/platforms"
"google.golang.org/grpc"
@ -44,6 +45,7 @@ import (
// Register CRI service plugin
func init() {
defaultConfig := criconfig.DefaultServerConfig()
registry.Register(&plugin.Registration{
Type: plugins.GRPCPlugin,
ID: "cri",
@ -56,10 +58,9 @@ func init() {
plugins.LeasePlugin,
plugins.SandboxStorePlugin,
plugins.TransferPlugin,
plugins.WarningPlugin,
},
Config: &criconfig.ServiceConfig{
DisableTCPService: true,
},
Config: &defaultConfig,
ConfigMigration: func(ctx context.Context, version int, pluginConfigs map[string]interface{}) error {
if version >= srvconfig.CurrentConfigVersion {
return nil
@ -87,7 +88,7 @@ func init() {
func initCRIService(ic *plugin.InitContext) (interface{}, error) {
ctx := ic.Context
config := ic.Config.(*criconfig.ServiceConfig)
config := ic.Config.(*criconfig.ServerConfig)
// Get runtime service.
criRuntimePlugin, err := ic.GetByID(plugins.CRIServicePlugin, "runtime")
@ -101,6 +102,19 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) {
return nil, fmt.Errorf("unable to load CRI image service plugin dependency: %w", err)
}
if warnings, err := criconfig.ValidateServerConfig(ic.Context, config); err != nil {
return nil, fmt.Errorf("invalid cri image config: %w", err)
} else if len(warnings) > 0 {
ws, err := ic.GetSingle(plugins.WarningPlugin)
if err != nil {
return nil, err
}
warn := ws.(warning.Service)
for _, w := range warnings {
warn.Emit(ic.Context, w)
}
}
log.G(ctx).Info("Connect containerd service")
client, err := containerd.New(
"",
@ -119,16 +133,21 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) {
string(criconfig.ModeShim): client.SandboxController(string(criconfig.ModeShim)),
}
streamingConfig, err := config.StreamingConfig()
if err != nil {
return nil, fmt.Errorf("failed to get streaming config: %w", err)
}
options := &server.CRIServiceOptions{
RuntimeService: criRuntimePlugin.(server.RuntimeService),
ImageService: criImagePlugin.(server.ImageService),
StreamingConfig: streamingConfig,
NRI: getNRIAPI(ic),
Client: client,
SandboxControllers: sbControllers,
}
is := criImagePlugin.(imageService).GRPCService()
// TODO: More options specifically for grpc service?
s, rs, err := server.NewCRIService(options)
if err != nil {
return nil, fmt.Errorf("failed to create CRI service: %w", err)

View File

@ -41,7 +41,7 @@ import (
)
func init() {
config := criconfig.DefaultConfig()
config := criconfig.DefaultRuntimeConfig()
// Base plugin that other CRI services depend on.
registry.Register(&plugin.Registration{
@ -72,8 +72,8 @@ func initCRIRuntime(ic *plugin.InitContext) (interface{}, error) {
ic.Meta.Platforms = []imagespec.Platform{platforms.DefaultSpec()}
ic.Meta.Exports = map[string]string{"CRIVersion": constants.CRIVersion}
ctx := ic.Context
pluginConfig := ic.Config.(*criconfig.PluginConfig)
if warnings, err := criconfig.ValidatePluginConfig(ctx, pluginConfig); err != nil {
pluginConfig := ic.Config.(*criconfig.RuntimeConfig)
if warnings, err := criconfig.ValidateRuntimeConfig(ctx, pluginConfig); err != nil {
return nil, fmt.Errorf("invalid plugin config: %w", err)
} else if len(warnings) > 0 {
ws, err := ic.GetSingle(plugins.WarningPlugin)
@ -92,7 +92,7 @@ func initCRIRuntime(ic *plugin.InitContext) (interface{}, error) {
containerdStateDir := filepath.Dir(ic.Properties[plugins.PropertyStateDir])
stateDir := filepath.Join(containerdStateDir, "io.containerd.grpc.v1.cri")
c := criconfig.Config{
PluginConfig: *pluginConfig,
RuntimeConfig: *pluginConfig,
ContainerdRootDir: containerdRootDir,
ContainerdEndpoint: ic.Properties[plugins.PropertyGRPCAddress],
RootDir: rootDir,