Move services/server to cmd/containerd/server
Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
448
cmd/containerd/server/config/config.go
Normal file
448
cmd/containerd/server/config/config.go
Normal file
@@ -0,0 +1,448 @@
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// config is the global configuration for containerd
|
||||
//
|
||||
// Version History
|
||||
// 1: Deprecated and removed in containerd 2.0
|
||||
// 2: Uses fully qualified plugin names
|
||||
// 3: Added support for migration and warning on unknown fields
|
||||
package config
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"dario.cat/mergo"
|
||||
"github.com/pelletier/go-toml/v2"
|
||||
|
||||
"github.com/containerd/containerd/v2/errdefs"
|
||||
"github.com/containerd/log"
|
||||
"github.com/containerd/plugin"
|
||||
)
|
||||
|
||||
// CurrentConfigVersion is the max config version which is supported
|
||||
const CurrentConfigVersion = 3
|
||||
|
||||
// migrations hold the migration functions for every prior containerd config version
|
||||
var migrations = []func(context.Context, *Config) error{
|
||||
nil, // Version 0 is not defined, treated at version 1
|
||||
v1Migrate, // Version 1 plugins renamed to URI for version 2
|
||||
nil, // Version 2 has only plugin changes to version 3
|
||||
}
|
||||
|
||||
// NOTE: Any new map fields added also need to be handled in mergeConfig.
|
||||
|
||||
// Config provides containerd configuration data for the server
|
||||
type Config struct {
|
||||
// Version of the config file
|
||||
Version int `toml:"version"`
|
||||
// Root is the path to a directory where containerd will store persistent data
|
||||
Root string `toml:"root"`
|
||||
// State is the path to a directory where containerd will store transient data
|
||||
State string `toml:"state"`
|
||||
// TempDir is the path to a directory where to place containerd temporary files
|
||||
TempDir string `toml:"temp"`
|
||||
// PluginDir is the directory for dynamic plugins to be stored
|
||||
//
|
||||
// Deprecated: Please use proxy or binary external plugins.
|
||||
PluginDir string `toml:"plugin_dir"`
|
||||
// GRPC configuration settings
|
||||
GRPC GRPCConfig `toml:"grpc"`
|
||||
// TTRPC configuration settings
|
||||
TTRPC TTRPCConfig `toml:"ttrpc"`
|
||||
// Debug and profiling settings
|
||||
Debug Debug `toml:"debug"`
|
||||
// Metrics and monitoring settings
|
||||
Metrics MetricsConfig `toml:"metrics"`
|
||||
// DisabledPlugins are IDs of plugins to disable. Disabled plugins won't be
|
||||
// initialized and started.
|
||||
// DisabledPlugins must use a fully qualified plugin URI.
|
||||
DisabledPlugins []string `toml:"disabled_plugins"`
|
||||
// RequiredPlugins are IDs of required plugins. Containerd exits if any
|
||||
// required plugin doesn't exist or fails to be initialized or started.
|
||||
// RequiredPlugins must use a fully qualified plugin URI.
|
||||
RequiredPlugins []string `toml:"required_plugins"`
|
||||
// Plugins provides plugin specific configuration for the initialization of a plugin
|
||||
Plugins map[string]interface{} `toml:"plugins"`
|
||||
// OOMScore adjust the containerd's oom score
|
||||
OOMScore int `toml:"oom_score"`
|
||||
// Cgroup specifies cgroup information for the containerd daemon process
|
||||
Cgroup CgroupConfig `toml:"cgroup"`
|
||||
// ProxyPlugins configures plugins which are communicated to over GRPC
|
||||
ProxyPlugins map[string]ProxyPlugin `toml:"proxy_plugins"`
|
||||
// Timeouts specified as a duration
|
||||
Timeouts map[string]string `toml:"timeouts"`
|
||||
// Imports are additional file path list to config files that can overwrite main config file fields
|
||||
Imports []string `toml:"imports"`
|
||||
// StreamProcessors configuration
|
||||
StreamProcessors map[string]StreamProcessor `toml:"stream_processors"`
|
||||
}
|
||||
|
||||
// StreamProcessor provides configuration for diff content processors
|
||||
type StreamProcessor struct {
|
||||
// Accepts specific media-types
|
||||
Accepts []string `toml:"accepts"`
|
||||
// Returns the media-type
|
||||
Returns string `toml:"returns"`
|
||||
// Path or name of the binary
|
||||
Path string `toml:"path"`
|
||||
// Args to the binary
|
||||
Args []string `toml:"args"`
|
||||
// Environment variables for the binary
|
||||
Env []string `toml:"env"`
|
||||
}
|
||||
|
||||
// ValidateVersion validates the config for a v2 file
|
||||
func (c *Config) ValidateVersion() error {
|
||||
if c.Version > CurrentConfigVersion {
|
||||
return fmt.Errorf("expected containerd config version equal to or less than `%d`, got `%d`", CurrentConfigVersion, c.Version)
|
||||
}
|
||||
|
||||
for _, p := range c.DisabledPlugins {
|
||||
if !strings.ContainsAny(p, ".") {
|
||||
return fmt.Errorf("invalid disabled plugin URI %q expect io.containerd.x.vx", p)
|
||||
}
|
||||
}
|
||||
for _, p := range c.RequiredPlugins {
|
||||
if !strings.ContainsAny(p, ".") {
|
||||
return fmt.Errorf("invalid required plugin URI %q expect io.containerd.x.vx", p)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// MigrateConfig will convert the config to the latest version before using
|
||||
func (c *Config) MigrateConfig(ctx context.Context) error {
|
||||
for c.Version < CurrentConfigVersion {
|
||||
if m := migrations[c.Version]; m != nil {
|
||||
if err := m(ctx, c); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
c.Version++
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func v1Migrate(ctx context.Context, c *Config) error {
|
||||
plugins := make(map[string]interface{}, len(c.Plugins))
|
||||
|
||||
// corePlugins is the list of used plugins before v1 was deprecated
|
||||
corePlugins := map[string]string{
|
||||
"cri": "io.containerd.grpc.v1.cri",
|
||||
"cgroups": "io.containerd.monitor.v1.cgroups",
|
||||
"linux": "io.containerd.runtime.v1.linux",
|
||||
"scheduler": "io.containerd.gc.v1.scheduler",
|
||||
"bolt": "io.containerd.metadata.v1.bolt",
|
||||
"task": "io.containerd.runtime.v2.task",
|
||||
"opt": "io.containerd.internal.v1.opt",
|
||||
"restart": "io.containerd.internal.v1.restart",
|
||||
"tracing": "io.containerd.internal.v1.tracing",
|
||||
"otlp": "io.containerd.tracing.processor.v1.otlp",
|
||||
"aufs": "io.containerd.snapshotter.v1.aufs",
|
||||
"btrfs": "io.containerd.snapshotter.v1.btrfs",
|
||||
"devmapper": "io.containerd.snapshotter.v1.devmapper",
|
||||
"native": "io.containerd.snapshotter.v1.native",
|
||||
"overlayfs": "io.containerd.snapshotter.v1.overlayfs",
|
||||
"zfs": "io.containerd.snapshotter.v1.zfs",
|
||||
}
|
||||
for plugin, value := range c.Plugins {
|
||||
if !strings.ContainsAny(plugin, ".") {
|
||||
var ambiguous string
|
||||
if full, ok := corePlugins[plugin]; ok {
|
||||
plugin = full
|
||||
} else if strings.HasSuffix(plugin, "-service") {
|
||||
plugin = "io.containerd.service.v1." + plugin
|
||||
} else if plugin == "windows" || plugin == "windows-lcow" {
|
||||
// runtime, differ, and snapshotter plugins do not have configs for v1
|
||||
ambiguous = plugin
|
||||
plugin = "io.containerd.snapshotter.v1." + plugin
|
||||
} else {
|
||||
ambiguous = plugin
|
||||
plugin = "io.containerd.grpc.v1." + plugin
|
||||
}
|
||||
if ambiguous != "" {
|
||||
log.G(ctx).Warnf("Ambiguous %s plugin in v1 config, treating as %s", ambiguous, plugin)
|
||||
}
|
||||
}
|
||||
plugins[plugin] = value
|
||||
}
|
||||
c.Plugins = plugins
|
||||
return nil
|
||||
}
|
||||
|
||||
// GRPCConfig provides GRPC configuration for the socket
|
||||
type GRPCConfig struct {
|
||||
Address string `toml:"address"`
|
||||
TCPAddress string `toml:"tcp_address"`
|
||||
TCPTLSCA string `toml:"tcp_tls_ca"`
|
||||
TCPTLSCert string `toml:"tcp_tls_cert"`
|
||||
TCPTLSKey string `toml:"tcp_tls_key"`
|
||||
UID int `toml:"uid"`
|
||||
GID int `toml:"gid"`
|
||||
MaxRecvMsgSize int `toml:"max_recv_message_size"`
|
||||
MaxSendMsgSize int `toml:"max_send_message_size"`
|
||||
}
|
||||
|
||||
// TTRPCConfig provides TTRPC configuration for the socket
|
||||
type TTRPCConfig struct {
|
||||
Address string `toml:"address"`
|
||||
UID int `toml:"uid"`
|
||||
GID int `toml:"gid"`
|
||||
}
|
||||
|
||||
// Debug provides debug configuration
|
||||
type Debug struct {
|
||||
Address string `toml:"address"`
|
||||
UID int `toml:"uid"`
|
||||
GID int `toml:"gid"`
|
||||
Level string `toml:"level"`
|
||||
// Format represents the logging format. Supported values are 'text' and 'json'.
|
||||
Format string `toml:"format"`
|
||||
}
|
||||
|
||||
// MetricsConfig provides metrics configuration
|
||||
type MetricsConfig struct {
|
||||
Address string `toml:"address"`
|
||||
GRPCHistogram bool `toml:"grpc_histogram"`
|
||||
}
|
||||
|
||||
// CgroupConfig provides cgroup configuration
|
||||
type CgroupConfig struct {
|
||||
Path string `toml:"path"`
|
||||
}
|
||||
|
||||
// ProxyPlugin provides a proxy plugin configuration
|
||||
type ProxyPlugin struct {
|
||||
Type string `toml:"type"`
|
||||
Address string `toml:"address"`
|
||||
Platform string `toml:"platform"`
|
||||
Exports map[string]string `toml:"exports"`
|
||||
}
|
||||
|
||||
// Decode unmarshals a plugin specific configuration by plugin id
|
||||
func (c *Config) Decode(ctx context.Context, id string, config interface{}) (interface{}, error) {
|
||||
data, ok := c.Plugins[id]
|
||||
if !ok {
|
||||
return config, nil
|
||||
}
|
||||
|
||||
b, err := toml.Marshal(data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := toml.NewDecoder(bytes.NewReader(b)).DisallowUnknownFields().Decode(config); err != nil {
|
||||
var serr *toml.StrictMissingError
|
||||
if errors.As(err, &serr) {
|
||||
for _, derr := range serr.Errors {
|
||||
log.G(ctx).WithFields(log.Fields{
|
||||
"plugin": id,
|
||||
"key": strings.Join(derr.Key(), " "),
|
||||
}).WithError(err).Warn("Ignoring unknown key in TOML for plugin")
|
||||
}
|
||||
err = toml.Unmarshal(b, config)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return config, nil
|
||||
}
|
||||
|
||||
// LoadConfig loads the containerd server config from the provided path
|
||||
func LoadConfig(ctx context.Context, path string, out *Config) error {
|
||||
if out == nil {
|
||||
return fmt.Errorf("argument out must not be nil: %w", errdefs.ErrInvalidArgument)
|
||||
}
|
||||
|
||||
var (
|
||||
loaded = map[string]bool{}
|
||||
pending = []string{path}
|
||||
)
|
||||
|
||||
for len(pending) > 0 {
|
||||
path, pending = pending[0], pending[1:]
|
||||
|
||||
// Check if a file at the given path already loaded to prevent circular imports
|
||||
if _, ok := loaded[path]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
config, err := loadConfigFile(ctx, path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := mergeConfig(out, config); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
imports, err := resolveImports(path, config.Imports)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
loaded[path] = true
|
||||
pending = append(pending, imports...)
|
||||
}
|
||||
|
||||
// Fix up the list of config files loaded
|
||||
out.Imports = []string{}
|
||||
for path := range loaded {
|
||||
out.Imports = append(out.Imports, path)
|
||||
}
|
||||
|
||||
err := out.ValidateVersion()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to load TOML from %s: %w", path, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// loadConfigFile decodes a TOML file at the given path
|
||||
func loadConfigFile(ctx context.Context, path string) (*Config, error) {
|
||||
config := &Config{}
|
||||
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
if err := toml.NewDecoder(f).DisallowUnknownFields().Decode(config); err != nil {
|
||||
var serr *toml.StrictMissingError
|
||||
if errors.As(err, &serr) {
|
||||
for _, derr := range serr.Errors {
|
||||
row, col := derr.Position()
|
||||
log.G(ctx).WithFields(log.Fields{
|
||||
"file": path,
|
||||
"row": row,
|
||||
"column": col,
|
||||
"key": strings.Join(derr.Key(), " "),
|
||||
}).WithError(err).Warn("Ignoring unknown key in TOML")
|
||||
}
|
||||
|
||||
// Try decoding again with unknown fields
|
||||
config = &Config{}
|
||||
if _, seekerr := f.Seek(0, io.SeekStart); seekerr != nil {
|
||||
return nil, fmt.Errorf("unable to seek file to start %w: failed to unmarshal TOML with unknown fields: %w", seekerr, err)
|
||||
}
|
||||
err = toml.NewDecoder(f).Decode(config)
|
||||
}
|
||||
if err != nil {
|
||||
var derr *toml.DecodeError
|
||||
if errors.As(err, &derr) {
|
||||
row, column := derr.Position()
|
||||
log.G(ctx).WithFields(log.Fields{
|
||||
"file": path,
|
||||
"row": row,
|
||||
"column": column,
|
||||
}).WithError(err).Error("Failure unmarshaling TOML")
|
||||
return nil, fmt.Errorf("failed to unmarshal TOML at row %d column %d: %w", row, column, err)
|
||||
}
|
||||
return nil, fmt.Errorf("failed to unmarshal TOML: %w", err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return config, nil
|
||||
}
|
||||
|
||||
// resolveImports resolves import strings list to absolute paths list:
|
||||
// - If path contains *, glob pattern matching applied
|
||||
// - Non abs path is relative to parent config file directory
|
||||
// - Abs paths returned as is
|
||||
func resolveImports(parent string, imports []string) ([]string, error) {
|
||||
var out []string
|
||||
|
||||
for _, path := range imports {
|
||||
if strings.Contains(path, "*") {
|
||||
matches, err := filepath.Glob(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
out = append(out, matches...)
|
||||
} else {
|
||||
path = filepath.Clean(path)
|
||||
if !filepath.IsAbs(path) {
|
||||
path = filepath.Join(filepath.Dir(parent), path)
|
||||
}
|
||||
|
||||
out = append(out, path)
|
||||
}
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// mergeConfig merges Config structs with the following rules:
|
||||
// 'to' 'from' 'result'
|
||||
// "" "value" "value"
|
||||
// "value" "" "value"
|
||||
// 1 0 1
|
||||
// 0 1 1
|
||||
// []{"1"} []{"2"} []{"1","2"}
|
||||
// []{"1"} []{} []{"1"}
|
||||
// Maps merged by keys, but values are replaced entirely.
|
||||
func mergeConfig(to, from *Config) error {
|
||||
err := mergo.Merge(to, from, mergo.WithOverride, mergo.WithAppendSlice)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Replace entire sections instead of merging map's values.
|
||||
for k, v := range from.Plugins {
|
||||
to.Plugins[k] = v
|
||||
}
|
||||
|
||||
for k, v := range from.StreamProcessors {
|
||||
to.StreamProcessors[k] = v
|
||||
}
|
||||
|
||||
for k, v := range from.ProxyPlugins {
|
||||
to.ProxyPlugins[k] = v
|
||||
}
|
||||
|
||||
for k, v := range from.Timeouts {
|
||||
to.Timeouts[k] = v
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// V2DisabledFilter matches based on URI
|
||||
func V2DisabledFilter(list []string) plugin.DisableFilter {
|
||||
set := make(map[string]struct{}, len(list))
|
||||
for _, l := range list {
|
||||
set[l] = struct{}{}
|
||||
}
|
||||
return func(r *plugin.Registration) bool {
|
||||
_, ok := set[r.URI()]
|
||||
return ok
|
||||
}
|
||||
}
|
||||
249
cmd/containerd/server/config/config_test.go
Normal file
249
cmd/containerd/server/config/config_test.go
Normal file
@@ -0,0 +1,249 @@
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package config
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/containerd/log/logtest"
|
||||
)
|
||||
|
||||
func TestMigrations(t *testing.T) {
|
||||
if len(migrations) != CurrentConfigVersion {
|
||||
t.Fatalf("Migration missing, expected %d migrations, only %d defined", CurrentConfigVersion, len(migrations))
|
||||
}
|
||||
}
|
||||
|
||||
func TestMergeConfigs(t *testing.T) {
|
||||
a := &Config{
|
||||
Version: 2,
|
||||
Root: "old_root",
|
||||
RequiredPlugins: []string{"io.containerd.old_plugin.v1"},
|
||||
DisabledPlugins: []string{"io.containerd.old_plugin.v1"},
|
||||
State: "old_state",
|
||||
OOMScore: 1,
|
||||
Timeouts: map[string]string{"a": "1"},
|
||||
StreamProcessors: map[string]StreamProcessor{"1": {Path: "2", Returns: "4"}, "2": {Path: "5"}},
|
||||
}
|
||||
|
||||
b := &Config{
|
||||
Version: 2,
|
||||
Root: "new_root",
|
||||
RequiredPlugins: []string{"io.containerd.new_plugin1.v1", "io.containerd.new_plugin2.v1"},
|
||||
OOMScore: 2,
|
||||
Timeouts: map[string]string{"b": "2"},
|
||||
StreamProcessors: map[string]StreamProcessor{"1": {Path: "3"}},
|
||||
}
|
||||
|
||||
err := mergeConfig(a, b)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, 2, a.Version)
|
||||
assert.Equal(t, "new_root", a.Root)
|
||||
assert.Equal(t, "old_state", a.State)
|
||||
assert.Equal(t, 2, a.OOMScore)
|
||||
assert.Equal(t, []string{"io.containerd.old_plugin.v1", "io.containerd.new_plugin1.v1", "io.containerd.new_plugin2.v1"}, a.RequiredPlugins)
|
||||
assert.Equal(t, []string{"io.containerd.old_plugin.v1"}, a.DisabledPlugins)
|
||||
assert.Equal(t, map[string]string{"a": "1", "b": "2"}, a.Timeouts)
|
||||
assert.Equal(t, map[string]StreamProcessor{"1": {Path: "3"}, "2": {Path: "5"}}, a.StreamProcessors)
|
||||
|
||||
// Verify overrides for integers
|
||||
// https://github.com/containerd/containerd/blob/v1.6.0/services/server/config/config.go#L322-L323
|
||||
a = &Config{Version: 2, OOMScore: 1}
|
||||
b = &Config{Version: 2, OOMScore: 0} // OOMScore "not set / default"
|
||||
err = mergeConfig(a, b)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, a.OOMScore)
|
||||
|
||||
a = &Config{Version: 2, OOMScore: 1}
|
||||
b = &Config{Version: 2, OOMScore: 0} // OOMScore "not set / default"
|
||||
err = mergeConfig(a, b)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, a.OOMScore)
|
||||
}
|
||||
|
||||
func TestResolveImports(t *testing.T) {
|
||||
tempDir := t.TempDir()
|
||||
|
||||
for _, filename := range []string{"config_1.toml", "config_2.toml", "test.toml"} {
|
||||
err := os.WriteFile(filepath.Join(tempDir, filename), []byte(""), 0o600)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
imports, err := resolveImports(filepath.Join(tempDir, "root.toml"), []string{
|
||||
filepath.Join(tempDir, "config_*.toml"), // Glob
|
||||
filepath.Join(tempDir, "./test.toml"), // Path clean up
|
||||
"current.toml", // Resolve current working dir
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, imports, []string{
|
||||
filepath.Join(tempDir, "config_1.toml"),
|
||||
filepath.Join(tempDir, "config_2.toml"),
|
||||
filepath.Join(tempDir, "test.toml"),
|
||||
filepath.Join(tempDir, "current.toml"),
|
||||
})
|
||||
}
|
||||
|
||||
func TestLoadSingleConfig(t *testing.T) {
|
||||
data := `
|
||||
version = 2
|
||||
root = "/var/lib/containerd"
|
||||
|
||||
[stream_processors]
|
||||
[stream_processors."io.containerd.processor.v1.pigz"]
|
||||
accepts = ["application/vnd.docker.image.rootfs.diff.tar.gzip"]
|
||||
path = "unpigz"
|
||||
`
|
||||
tempDir := t.TempDir()
|
||||
|
||||
path := filepath.Join(tempDir, "config.toml")
|
||||
err := os.WriteFile(path, []byte(data), 0o600)
|
||||
assert.NoError(t, err)
|
||||
|
||||
var out Config
|
||||
err = LoadConfig(context.Background(), path, &out)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 2, out.Version)
|
||||
assert.Equal(t, "/var/lib/containerd", out.Root)
|
||||
assert.Equal(t, map[string]StreamProcessor{
|
||||
"io.containerd.processor.v1.pigz": {
|
||||
Accepts: []string{"application/vnd.docker.image.rootfs.diff.tar.gzip"},
|
||||
Path: "unpigz",
|
||||
},
|
||||
}, out.StreamProcessors)
|
||||
}
|
||||
|
||||
func TestLoadConfigWithImports(t *testing.T) {
|
||||
data1 := `
|
||||
version = 2
|
||||
root = "/var/lib/containerd"
|
||||
imports = ["data2.toml"]
|
||||
`
|
||||
|
||||
data2 := `
|
||||
disabled_plugins = ["io.containerd.v1.xyz"]
|
||||
`
|
||||
|
||||
tempDir := t.TempDir()
|
||||
|
||||
err := os.WriteFile(filepath.Join(tempDir, "data1.toml"), []byte(data1), 0o600)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = os.WriteFile(filepath.Join(tempDir, "data2.toml"), []byte(data2), 0o600)
|
||||
assert.NoError(t, err)
|
||||
|
||||
var out Config
|
||||
err = LoadConfig(context.Background(), filepath.Join(tempDir, "data1.toml"), &out)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, 2, out.Version)
|
||||
assert.Equal(t, "/var/lib/containerd", out.Root)
|
||||
assert.Equal(t, []string{"io.containerd.v1.xyz"}, out.DisabledPlugins)
|
||||
}
|
||||
|
||||
func TestLoadConfigWithCircularImports(t *testing.T) {
|
||||
data1 := `
|
||||
version = 2
|
||||
root = "/var/lib/containerd"
|
||||
imports = ["data2.toml", "data1.toml"]
|
||||
`
|
||||
|
||||
data2 := `
|
||||
disabled_plugins = ["io.containerd.v1.xyz"]
|
||||
imports = ["data1.toml", "data2.toml"]
|
||||
`
|
||||
tempDir := t.TempDir()
|
||||
|
||||
err := os.WriteFile(filepath.Join(tempDir, "data1.toml"), []byte(data1), 0o600)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = os.WriteFile(filepath.Join(tempDir, "data2.toml"), []byte(data2), 0o600)
|
||||
assert.NoError(t, err)
|
||||
|
||||
var out Config
|
||||
err = LoadConfig(context.Background(), filepath.Join(tempDir, "data1.toml"), &out)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, 2, out.Version)
|
||||
assert.Equal(t, "/var/lib/containerd", out.Root)
|
||||
assert.Equal(t, []string{"io.containerd.v1.xyz"}, out.DisabledPlugins)
|
||||
|
||||
sort.Strings(out.Imports)
|
||||
assert.Equal(t, []string{
|
||||
filepath.Join(tempDir, "data1.toml"),
|
||||
filepath.Join(tempDir, "data2.toml"),
|
||||
}, out.Imports)
|
||||
}
|
||||
|
||||
func TestDecodePlugin(t *testing.T) {
|
||||
ctx := logtest.WithT(context.Background(), t)
|
||||
data := `
|
||||
version = 2
|
||||
[plugins."io.containerd.runtime.v2.task"]
|
||||
shim_debug = true
|
||||
`
|
||||
|
||||
tempDir := t.TempDir()
|
||||
|
||||
path := filepath.Join(tempDir, "config.toml")
|
||||
err := os.WriteFile(path, []byte(data), 0o600)
|
||||
assert.NoError(t, err)
|
||||
|
||||
var out Config
|
||||
err = LoadConfig(context.Background(), path, &out)
|
||||
assert.NoError(t, err)
|
||||
|
||||
pluginConfig := map[string]interface{}{}
|
||||
_, err = out.Decode(ctx, "io.containerd.runtime.v2.task", &pluginConfig)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, true, pluginConfig["shim_debug"])
|
||||
}
|
||||
|
||||
// TestDecodePluginInV1Config tests decoding non-versioned config
|
||||
// (should be parsed as V1 config) and migrated to latest.
|
||||
func TestDecodePluginInV1Config(t *testing.T) {
|
||||
ctx := logtest.WithT(context.Background(), t)
|
||||
data := `
|
||||
[plugins.task]
|
||||
shim_debug = true
|
||||
`
|
||||
|
||||
path := filepath.Join(t.TempDir(), "config.toml")
|
||||
err := os.WriteFile(path, []byte(data), 0o600)
|
||||
assert.NoError(t, err)
|
||||
|
||||
var out Config
|
||||
err = LoadConfig(context.Background(), path, &out)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 0, out.Version)
|
||||
|
||||
err = out.MigrateConfig(ctx)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 3, out.Version)
|
||||
|
||||
pluginConfig := map[string]interface{}{}
|
||||
_, err = out.Decode(ctx, "io.containerd.runtime.v2.task", &pluginConfig)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, true, pluginConfig["shim_debug"])
|
||||
}
|
||||
52
cmd/containerd/server/namespace.go
Normal file
52
cmd/containerd/server/namespace.go
Normal file
@@ -0,0 +1,52 @@
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/containerd/containerd/v2/namespaces"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
func unaryNamespaceInterceptor(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
|
||||
if ns, ok := namespaces.Namespace(ctx); ok {
|
||||
// The above call checks the *incoming* metadata, this makes sure the outgoing metadata is also set
|
||||
ctx = namespaces.WithNamespace(ctx, ns)
|
||||
}
|
||||
return handler(ctx, req)
|
||||
}
|
||||
|
||||
func streamNamespaceInterceptor(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
|
||||
ctx := ss.Context()
|
||||
if ns, ok := namespaces.Namespace(ctx); ok {
|
||||
// The above call checks the *incoming* metadata, this makes sure the outgoing metadata is also set
|
||||
ctx = namespaces.WithNamespace(ctx, ns)
|
||||
ss = &wrappedSSWithContext{ctx: ctx, ServerStream: ss}
|
||||
}
|
||||
|
||||
return handler(srv, ss)
|
||||
}
|
||||
|
||||
type wrappedSSWithContext struct {
|
||||
grpc.ServerStream
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func (w *wrappedSSWithContext) Context() context.Context {
|
||||
return w.ctx
|
||||
}
|
||||
600
cmd/containerd/server/server.go
Normal file
600
cmd/containerd/server/server.go
Normal file
@@ -0,0 +1,600 @@
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"errors"
|
||||
"expvar"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/pprof"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/log"
|
||||
"github.com/containerd/ttrpc"
|
||||
"github.com/docker/go-metrics"
|
||||
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
|
||||
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
|
||||
v1 "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/backoff"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
|
||||
csapi "github.com/containerd/containerd/v2/api/services/content/v1"
|
||||
diffapi "github.com/containerd/containerd/v2/api/services/diff/v1"
|
||||
sbapi "github.com/containerd/containerd/v2/api/services/sandbox/v1"
|
||||
ssapi "github.com/containerd/containerd/v2/api/services/snapshots/v1"
|
||||
srvconfig "github.com/containerd/containerd/v2/cmd/containerd/server/config"
|
||||
csproxy "github.com/containerd/containerd/v2/core/content/proxy"
|
||||
"github.com/containerd/containerd/v2/core/diff"
|
||||
diffproxy "github.com/containerd/containerd/v2/core/diff/proxy"
|
||||
sbproxy "github.com/containerd/containerd/v2/core/sandbox/proxy"
|
||||
"github.com/containerd/containerd/v2/defaults"
|
||||
"github.com/containerd/containerd/v2/pkg/deprecation"
|
||||
"github.com/containerd/containerd/v2/pkg/dialer"
|
||||
"github.com/containerd/containerd/v2/pkg/timeout"
|
||||
"github.com/containerd/containerd/v2/platforms"
|
||||
"github.com/containerd/containerd/v2/plugins"
|
||||
"github.com/containerd/containerd/v2/plugins/content/local"
|
||||
"github.com/containerd/containerd/v2/services/warning"
|
||||
ssproxy "github.com/containerd/containerd/v2/snapshots/proxy"
|
||||
"github.com/containerd/containerd/v2/sys"
|
||||
"github.com/containerd/plugin"
|
||||
"github.com/containerd/plugin/dynamic"
|
||||
"github.com/containerd/plugin/registry"
|
||||
)
|
||||
|
||||
// CreateTopLevelDirectories creates the top-level root and state directories.
|
||||
func CreateTopLevelDirectories(config *srvconfig.Config) error {
|
||||
switch {
|
||||
case config.Root == "":
|
||||
return errors.New("root must be specified")
|
||||
case config.State == "":
|
||||
return errors.New("state must be specified")
|
||||
case config.Root == config.State:
|
||||
return errors.New("root and state must be different paths")
|
||||
}
|
||||
|
||||
if err := sys.MkdirAllWithACL(config.Root, 0o711); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := sys.MkdirAllWithACL(config.State, 0o711); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if config.TempDir != "" {
|
||||
if err := sys.MkdirAllWithACL(config.TempDir, 0o711); err != nil {
|
||||
return err
|
||||
}
|
||||
if runtime.GOOS == "windows" {
|
||||
// On Windows, the Host Compute Service (vmcompute) will read the
|
||||
// TEMP/TMP setting from the calling process when creating the
|
||||
// tempdir to extract an image layer to. This allows the
|
||||
// administrator to align the tempdir location with the same volume
|
||||
// as the snapshot dir to avoid a copy operation when moving the
|
||||
// extracted layer to the snapshot dir location.
|
||||
os.Setenv("TEMP", config.TempDir)
|
||||
os.Setenv("TMP", config.TempDir)
|
||||
} else {
|
||||
os.Setenv("TMPDIR", config.TempDir)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// New creates and initializes a new containerd server
|
||||
func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
|
||||
var (
|
||||
version = config.Version
|
||||
migrationT time.Duration
|
||||
)
|
||||
if version < srvconfig.CurrentConfigVersion {
|
||||
// Migrate config to latest version
|
||||
t1 := time.Now()
|
||||
err := config.MigrateConfig(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
migrationT = time.Since(t1)
|
||||
}
|
||||
|
||||
if err := apply(ctx, config); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for key, sec := range config.Timeouts {
|
||||
d, err := time.ParseDuration(sec)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to parse %s into a time duration", sec)
|
||||
}
|
||||
timeout.Set(key, d)
|
||||
}
|
||||
loaded, err := LoadPlugins(ctx, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for id, p := range config.StreamProcessors {
|
||||
diff.RegisterProcessor(diff.BinaryHandler(id, p.Returns, p.Accepts, p.Path, p.Args, p.Env))
|
||||
}
|
||||
|
||||
serverOpts := []grpc.ServerOption{
|
||||
grpc.StatsHandler(otelgrpc.NewServerHandler()),
|
||||
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
|
||||
grpc_prometheus.StreamServerInterceptor,
|
||||
streamNamespaceInterceptor,
|
||||
)),
|
||||
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
|
||||
grpc_prometheus.UnaryServerInterceptor,
|
||||
unaryNamespaceInterceptor,
|
||||
)),
|
||||
}
|
||||
if config.GRPC.MaxRecvMsgSize > 0 {
|
||||
serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(config.GRPC.MaxRecvMsgSize))
|
||||
}
|
||||
if config.GRPC.MaxSendMsgSize > 0 {
|
||||
serverOpts = append(serverOpts, grpc.MaxSendMsgSize(config.GRPC.MaxSendMsgSize))
|
||||
}
|
||||
ttrpcServer, err := newTTRPCServer()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tcpServerOpts := serverOpts
|
||||
if config.GRPC.TCPTLSCert != "" {
|
||||
log.G(ctx).Info("setting up tls on tcp GRPC services...")
|
||||
|
||||
tlsCert, err := tls.LoadX509KeyPair(config.GRPC.TCPTLSCert, config.GRPC.TCPTLSKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tlsConfig := &tls.Config{Certificates: []tls.Certificate{tlsCert}}
|
||||
|
||||
if config.GRPC.TCPTLSCA != "" {
|
||||
caCertPool := x509.NewCertPool()
|
||||
caCert, err := os.ReadFile(config.GRPC.TCPTLSCA)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to load CA file: %w", err)
|
||||
}
|
||||
caCertPool.AppendCertsFromPEM(caCert)
|
||||
tlsConfig.ClientCAs = caCertPool
|
||||
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
|
||||
}
|
||||
|
||||
tcpServerOpts = append(tcpServerOpts, grpc.Creds(credentials.NewTLS(tlsConfig)))
|
||||
}
|
||||
|
||||
// grpcService allows GRPC services to be registered with the underlying server
|
||||
type grpcService interface {
|
||||
Register(*grpc.Server) error
|
||||
}
|
||||
|
||||
// tcpService allows GRPC services to be registered with the underlying tcp server
|
||||
type tcpService interface {
|
||||
RegisterTCP(*grpc.Server) error
|
||||
}
|
||||
|
||||
// ttrpcService allows TTRPC services to be registered with the underlying server
|
||||
type ttrpcService interface {
|
||||
RegisterTTRPC(*ttrpc.Server) error
|
||||
}
|
||||
|
||||
var (
|
||||
grpcServer = grpc.NewServer(serverOpts...)
|
||||
tcpServer = grpc.NewServer(tcpServerOpts...)
|
||||
|
||||
grpcServices []grpcService
|
||||
tcpServices []tcpService
|
||||
ttrpcServices []ttrpcService
|
||||
|
||||
s = &Server{
|
||||
grpcServer: grpcServer,
|
||||
tcpServer: tcpServer,
|
||||
ttrpcServer: ttrpcServer,
|
||||
config: config,
|
||||
}
|
||||
initialized = plugin.NewPluginSet()
|
||||
required = make(map[string]struct{})
|
||||
)
|
||||
for _, r := range config.RequiredPlugins {
|
||||
required[r] = struct{}{}
|
||||
}
|
||||
|
||||
if version < srvconfig.CurrentConfigVersion {
|
||||
t1 := time.Now()
|
||||
// Run migration for each configuration version
|
||||
// Run each plugin migration for each version to ensure that migration logic is simple and
|
||||
// focused on upgrading from one version at a time.
|
||||
for v := version; v < srvconfig.CurrentConfigVersion; v++ {
|
||||
for _, p := range loaded {
|
||||
if p.ConfigMigration != nil {
|
||||
if err := p.ConfigMigration(ctx, v, config.Plugins); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
migrationT = migrationT + time.Since(t1)
|
||||
}
|
||||
if migrationT > 0 {
|
||||
log.G(ctx).WithField("t", migrationT).Warnf("Configuration migrated from version %d, use `containerd config migrate` to avoid migration", version)
|
||||
}
|
||||
|
||||
for _, p := range loaded {
|
||||
id := p.URI()
|
||||
log.G(ctx).WithFields(log.Fields{"id": id, "type": p.Type}).Info("loading plugin")
|
||||
var mustSucceed int32
|
||||
|
||||
initContext := plugin.NewContext(
|
||||
ctx,
|
||||
initialized,
|
||||
map[string]string{
|
||||
plugins.PropertyRootDir: filepath.Join(config.Root, id),
|
||||
plugins.PropertyStateDir: filepath.Join(config.State, id),
|
||||
plugins.PropertyGRPCAddress: config.GRPC.Address,
|
||||
plugins.PropertyTTRPCAddress: config.TTRPC.Address,
|
||||
},
|
||||
)
|
||||
initContext.RegisterReadiness = func() func() {
|
||||
atomic.StoreInt32(&mustSucceed, 1)
|
||||
return s.RegisterReadiness()
|
||||
}
|
||||
|
||||
// load the plugin specific configuration if it is provided
|
||||
if p.Config != nil {
|
||||
pc, err := config.Decode(ctx, id, p.Config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
initContext.Config = pc
|
||||
}
|
||||
result := p.Init(initContext)
|
||||
if err := initialized.Add(result); err != nil {
|
||||
return nil, fmt.Errorf("could not add plugin result to plugin set: %w", err)
|
||||
}
|
||||
|
||||
instance, err := result.Instance()
|
||||
if err != nil {
|
||||
if plugin.IsSkipPlugin(err) {
|
||||
log.G(ctx).WithFields(log.Fields{"error": err, "id": id, "type": p.Type}).Info("skip loading plugin")
|
||||
} else {
|
||||
log.G(ctx).WithFields(log.Fields{"error": err, "id": id, "type": p.Type}).Warn("failed to load plugin")
|
||||
}
|
||||
if _, ok := required[id]; ok {
|
||||
return nil, fmt.Errorf("load required plugin %s: %w", id, err)
|
||||
}
|
||||
// If readiness was registered during initialization, the plugin cannot fail
|
||||
if atomic.LoadInt32(&mustSucceed) != 0 {
|
||||
return nil, fmt.Errorf("plugin failed after registering readiness %s: %w", id, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
delete(required, id)
|
||||
// check for grpc services that should be registered with the server
|
||||
if src, ok := instance.(grpcService); ok {
|
||||
grpcServices = append(grpcServices, src)
|
||||
}
|
||||
if src, ok := instance.(ttrpcService); ok {
|
||||
ttrpcServices = append(ttrpcServices, src)
|
||||
}
|
||||
if service, ok := instance.(tcpService); ok {
|
||||
tcpServices = append(tcpServices, service)
|
||||
}
|
||||
|
||||
s.plugins = append(s.plugins, result)
|
||||
}
|
||||
if len(required) != 0 {
|
||||
var missing []string
|
||||
for id := range required {
|
||||
missing = append(missing, id)
|
||||
}
|
||||
return nil, fmt.Errorf("required plugin %s not included", missing)
|
||||
}
|
||||
|
||||
// register services after all plugins have been initialized
|
||||
for _, service := range grpcServices {
|
||||
if err := service.Register(grpcServer); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
for _, service := range ttrpcServices {
|
||||
if err := service.RegisterTTRPC(ttrpcServer); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
for _, service := range tcpServices {
|
||||
if err := service.RegisterTCP(tcpServer); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
recordConfigDeprecations(ctx, config, initialized)
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// recordConfigDeprecations attempts to record use of any deprecated config field. Failures are logged and ignored.
|
||||
func recordConfigDeprecations(ctx context.Context, config *srvconfig.Config, set *plugin.Set) {
|
||||
// record any detected deprecations without blocking server startup
|
||||
p := set.Get(plugins.WarningPlugin, plugins.DeprecationsPlugin)
|
||||
if p == nil {
|
||||
log.G(ctx).Warn("failed to find warning service to record deprecations")
|
||||
return
|
||||
}
|
||||
instance, err := p.Instance()
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Warn("failed to load warning service to record deprecations")
|
||||
return
|
||||
}
|
||||
warn, ok := instance.(warning.Service)
|
||||
if !ok {
|
||||
log.G(ctx).WithError(err).Warn("failed to load warning service to record deprecations, unexpected plugin type")
|
||||
return
|
||||
}
|
||||
|
||||
if config.PluginDir != "" { //nolint:staticcheck
|
||||
warn.Emit(ctx, deprecation.GoPluginLibrary)
|
||||
}
|
||||
}
|
||||
|
||||
// Server is the containerd main daemon
|
||||
type Server struct {
|
||||
grpcServer *grpc.Server
|
||||
ttrpcServer *ttrpc.Server
|
||||
tcpServer *grpc.Server
|
||||
config *srvconfig.Config
|
||||
plugins []*plugin.Plugin
|
||||
ready sync.WaitGroup
|
||||
}
|
||||
|
||||
// ServeGRPC provides the containerd grpc APIs on the provided listener
|
||||
func (s *Server) ServeGRPC(l net.Listener) error {
|
||||
if s.config.Metrics.GRPCHistogram {
|
||||
// enable grpc time histograms to measure rpc latencies
|
||||
grpc_prometheus.EnableHandlingTimeHistogram()
|
||||
}
|
||||
// before we start serving the grpc API register the grpc_prometheus metrics
|
||||
// handler. This needs to be the last service registered so that it can collect
|
||||
// metrics for every other service
|
||||
grpc_prometheus.Register(s.grpcServer)
|
||||
return trapClosedConnErr(s.grpcServer.Serve(l))
|
||||
}
|
||||
|
||||
// ServeTTRPC provides the containerd ttrpc APIs on the provided listener
|
||||
func (s *Server) ServeTTRPC(l net.Listener) error {
|
||||
return trapClosedConnErr(s.ttrpcServer.Serve(context.Background(), l))
|
||||
}
|
||||
|
||||
// ServeMetrics provides a prometheus endpoint for exposing metrics
|
||||
func (s *Server) ServeMetrics(l net.Listener) error {
|
||||
m := http.NewServeMux()
|
||||
m.Handle("/v1/metrics", metrics.Handler())
|
||||
srv := &http.Server{
|
||||
Handler: m,
|
||||
ReadHeaderTimeout: 5 * time.Minute, // "G112: Potential Slowloris Attack (gosec)"; not a real concern for our use, so setting a long timeout.
|
||||
}
|
||||
return trapClosedConnErr(srv.Serve(l))
|
||||
}
|
||||
|
||||
// ServeTCP allows services to serve over tcp
|
||||
func (s *Server) ServeTCP(l net.Listener) error {
|
||||
grpc_prometheus.Register(s.tcpServer)
|
||||
return trapClosedConnErr(s.tcpServer.Serve(l))
|
||||
}
|
||||
|
||||
// ServeDebug provides a debug endpoint
|
||||
func (s *Server) ServeDebug(l net.Listener) error {
|
||||
// don't use the default http server mux to make sure nothing gets registered
|
||||
// that we don't want to expose via containerd
|
||||
m := http.NewServeMux()
|
||||
m.Handle("/debug/vars", expvar.Handler())
|
||||
m.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
|
||||
m.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
|
||||
m.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
|
||||
m.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
|
||||
m.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
|
||||
srv := &http.Server{
|
||||
Handler: m,
|
||||
ReadHeaderTimeout: 5 * time.Minute, // "G112: Potential Slowloris Attack (gosec)"; not a real concern for our use, so setting a long timeout.
|
||||
}
|
||||
return trapClosedConnErr(srv.Serve(l))
|
||||
}
|
||||
|
||||
// Stop the containerd server canceling any open connections
|
||||
func (s *Server) Stop() {
|
||||
s.grpcServer.Stop()
|
||||
for i := len(s.plugins) - 1; i >= 0; i-- {
|
||||
p := s.plugins[i]
|
||||
instance, err := p.Instance()
|
||||
if err != nil {
|
||||
log.L.WithFields(log.Fields{"error": err, "id": p.Registration.URI()}).Error("could not get plugin instance")
|
||||
continue
|
||||
}
|
||||
closer, ok := instance.(io.Closer)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if err := closer.Close(); err != nil {
|
||||
log.L.WithFields(log.Fields{"error": err, "id": p.Registration.URI()}).Error("failed to close plugin")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) RegisterReadiness() func() {
|
||||
s.ready.Add(1)
|
||||
return func() {
|
||||
s.ready.Done()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) Wait() {
|
||||
s.ready.Wait()
|
||||
}
|
||||
|
||||
// LoadPlugins loads all plugins into containerd and generates an ordered graph
|
||||
// of all plugins.
|
||||
func LoadPlugins(ctx context.Context, config *srvconfig.Config) ([]plugin.Registration, error) {
|
||||
// load all plugins into containerd
|
||||
path := config.PluginDir //nolint:staticcheck
|
||||
if path == "" {
|
||||
path = filepath.Join(config.Root, "plugins")
|
||||
}
|
||||
if count, err := dynamic.Load(path); err != nil {
|
||||
return nil, err
|
||||
} else if count > 0 || config.PluginDir != "" { //nolint:staticcheck
|
||||
config.PluginDir = path //nolint:staticcheck
|
||||
log.G(ctx).Warningf("loaded %d dynamic plugins. `go_plugin` is deprecated, please use `external plugins` instead", count)
|
||||
}
|
||||
// load additional plugins that don't automatically register themselves
|
||||
registry.Register(&plugin.Registration{
|
||||
Type: plugins.ContentPlugin,
|
||||
ID: "content",
|
||||
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
|
||||
root := ic.Properties[plugins.PropertyRootDir]
|
||||
ic.Meta.Exports["root"] = root
|
||||
return local.NewStore(root)
|
||||
},
|
||||
})
|
||||
|
||||
clients := &proxyClients{}
|
||||
for name, pp := range config.ProxyPlugins {
|
||||
var (
|
||||
t plugin.Type
|
||||
f func(*grpc.ClientConn) interface{}
|
||||
|
||||
address = pp.Address
|
||||
p v1.Platform
|
||||
err error
|
||||
)
|
||||
|
||||
switch pp.Type {
|
||||
case string(plugins.SnapshotPlugin), "snapshot":
|
||||
t = plugins.SnapshotPlugin
|
||||
ssname := name
|
||||
f = func(conn *grpc.ClientConn) interface{} {
|
||||
return ssproxy.NewSnapshotter(ssapi.NewSnapshotsClient(conn), ssname)
|
||||
}
|
||||
|
||||
case string(plugins.ContentPlugin), "content":
|
||||
t = plugins.ContentPlugin
|
||||
f = func(conn *grpc.ClientConn) interface{} {
|
||||
return csproxy.NewContentStore(csapi.NewContentClient(conn))
|
||||
}
|
||||
case string(plugins.SandboxControllerPlugin), "sandbox":
|
||||
t = plugins.SandboxControllerPlugin
|
||||
f = func(conn *grpc.ClientConn) interface{} {
|
||||
return sbproxy.NewSandboxController(sbapi.NewControllerClient(conn))
|
||||
}
|
||||
case string(plugins.DiffPlugin), "diff":
|
||||
t = plugins.DiffPlugin
|
||||
f = func(conn *grpc.ClientConn) interface{} {
|
||||
return diffproxy.NewDiffApplier(diffapi.NewDiffClient(conn))
|
||||
}
|
||||
default:
|
||||
log.G(ctx).WithField("type", pp.Type).Warn("unknown proxy plugin type")
|
||||
}
|
||||
if pp.Platform != "" {
|
||||
p, err = platforms.Parse(pp.Platform)
|
||||
if err != nil {
|
||||
log.G(ctx).WithFields(log.Fields{"error": err, "plugin": name}).Warn("skipping proxy platform with bad platform")
|
||||
}
|
||||
} else {
|
||||
p = platforms.DefaultSpec()
|
||||
}
|
||||
|
||||
exports := pp.Exports
|
||||
if exports == nil {
|
||||
exports = map[string]string{}
|
||||
}
|
||||
exports["address"] = address
|
||||
|
||||
registry.Register(&plugin.Registration{
|
||||
Type: t,
|
||||
ID: name,
|
||||
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
|
||||
ic.Meta.Exports = exports
|
||||
ic.Meta.Platforms = append(ic.Meta.Platforms, p)
|
||||
conn, err := clients.getClient(address)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return f(conn), nil
|
||||
},
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
filter := srvconfig.V2DisabledFilter
|
||||
// return the ordered graph for plugins
|
||||
return registry.Graph(filter(config.DisabledPlugins)), nil
|
||||
}
|
||||
|
||||
type proxyClients struct {
|
||||
m sync.Mutex
|
||||
clients map[string]*grpc.ClientConn
|
||||
}
|
||||
|
||||
func (pc *proxyClients) getClient(address string) (*grpc.ClientConn, error) {
|
||||
pc.m.Lock()
|
||||
defer pc.m.Unlock()
|
||||
if pc.clients == nil {
|
||||
pc.clients = map[string]*grpc.ClientConn{}
|
||||
} else if c, ok := pc.clients[address]; ok {
|
||||
return c, nil
|
||||
}
|
||||
|
||||
backoffConfig := backoff.DefaultConfig
|
||||
backoffConfig.MaxDelay = 3 * time.Second
|
||||
connParams := grpc.ConnectParams{
|
||||
Backoff: backoffConfig,
|
||||
}
|
||||
gopts := []grpc.DialOption{
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
grpc.WithConnectParams(connParams),
|
||||
grpc.WithContextDialer(dialer.ContextDialer),
|
||||
|
||||
// TODO(stevvooe): We may need to allow configuration of this on the client.
|
||||
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize)),
|
||||
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize)),
|
||||
}
|
||||
|
||||
conn, err := grpc.Dial(dialer.DialAddress(address), gopts...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to dial %q: %w", address, err)
|
||||
}
|
||||
|
||||
pc.clients[address] = conn
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func trapClosedConnErr(err error) error {
|
||||
if err == nil || errors.Is(err, net.ErrClosed) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
70
cmd/containerd/server/server_linux.go
Normal file
70
cmd/containerd/server/server_linux.go
Normal file
@@ -0,0 +1,70 @@
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
|
||||
"github.com/containerd/cgroups/v3"
|
||||
cgroup1 "github.com/containerd/cgroups/v3/cgroup1"
|
||||
cgroupsv2 "github.com/containerd/cgroups/v3/cgroup2"
|
||||
srvconfig "github.com/containerd/containerd/v2/cmd/containerd/server/config"
|
||||
"github.com/containerd/containerd/v2/sys"
|
||||
"github.com/containerd/log"
|
||||
"github.com/containerd/ttrpc"
|
||||
specs "github.com/opencontainers/runtime-spec/specs-go"
|
||||
)
|
||||
|
||||
// apply sets config settings on the server process
|
||||
func apply(ctx context.Context, config *srvconfig.Config) error {
|
||||
if config.OOMScore != 0 {
|
||||
log.G(ctx).Debugf("changing OOM score to %d", config.OOMScore)
|
||||
if err := sys.SetOOMScore(os.Getpid(), config.OOMScore); err != nil {
|
||||
log.G(ctx).WithError(err).Errorf("failed to change OOM score to %d", config.OOMScore)
|
||||
}
|
||||
}
|
||||
if config.Cgroup.Path != "" {
|
||||
if cgroups.Mode() == cgroups.Unified {
|
||||
cg, err := cgroupsv2.Load(config.Cgroup.Path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := cg.AddProc(uint64(os.Getpid())); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
cg, err := cgroup1.Load(cgroup1.StaticPath(config.Cgroup.Path))
|
||||
if err != nil {
|
||||
if err != cgroup1.ErrCgroupDeleted {
|
||||
return err
|
||||
}
|
||||
if cg, err = cgroup1.New(cgroup1.StaticPath(config.Cgroup.Path), &specs.LinuxResources{}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := cg.AddProc(uint64(os.Getpid())); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func newTTRPCServer() (*ttrpc.Server, error) {
|
||||
return ttrpc.NewServer(ttrpc.WithServerHandshaker(ttrpc.UnixSocketRequireSameUser()))
|
||||
}
|
||||
27
cmd/containerd/server/server_solaris.go
Normal file
27
cmd/containerd/server/server_solaris.go
Normal file
@@ -0,0 +1,27 @@
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
srvconfig "github.com/containerd/containerd/v2/cmd/containerd/server/config"
|
||||
)
|
||||
|
||||
func apply(_ context.Context, _ *srvconfig.Config) error {
|
||||
return nil
|
||||
}
|
||||
149
cmd/containerd/server/server_test.go
Normal file
149
cmd/containerd/server/server_test.go
Normal file
@@ -0,0 +1,149 @@
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
srvconfig "github.com/containerd/containerd/v2/cmd/containerd/server/config"
|
||||
"github.com/containerd/plugin"
|
||||
"github.com/containerd/plugin/registry"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
const testPath = "/tmp/path/for/testing"
|
||||
|
||||
func TestCreateTopLevelDirectoriesErrorsWithSamePathForRootAndState(t *testing.T) {
|
||||
path := testPath
|
||||
err := CreateTopLevelDirectories(&srvconfig.Config{
|
||||
Root: path,
|
||||
State: path,
|
||||
})
|
||||
assert.EqualError(t, err, "root and state must be different paths")
|
||||
}
|
||||
|
||||
func TestCreateTopLevelDirectoriesWithEmptyStatePath(t *testing.T) {
|
||||
statePath := ""
|
||||
rootPath := testPath
|
||||
err := CreateTopLevelDirectories(&srvconfig.Config{
|
||||
Root: rootPath,
|
||||
State: statePath,
|
||||
})
|
||||
assert.EqualError(t, err, "state must be specified")
|
||||
}
|
||||
|
||||
func TestCreateTopLevelDirectoriesWithEmptyRootPath(t *testing.T) {
|
||||
statePath := testPath
|
||||
rootPath := ""
|
||||
err := CreateTopLevelDirectories(&srvconfig.Config{
|
||||
Root: rootPath,
|
||||
State: statePath,
|
||||
})
|
||||
assert.EqualError(t, err, "root must be specified")
|
||||
}
|
||||
|
||||
func TestMigration(t *testing.T) {
|
||||
registry.Reset()
|
||||
defer registry.Reset()
|
||||
|
||||
version := srvconfig.CurrentConfigVersion - 1
|
||||
|
||||
type testConfig struct {
|
||||
Migrated string `toml:"migrated"`
|
||||
NotMigrated string `toml:"notmigrated"`
|
||||
}
|
||||
|
||||
registry.Register(&plugin.Registration{
|
||||
Type: "io.containerd.test",
|
||||
ID: "t1",
|
||||
Config: &testConfig{},
|
||||
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
|
||||
c, ok := ic.Config.(*testConfig)
|
||||
if !ok {
|
||||
t.Error("expected first plugin to have configuration")
|
||||
} else {
|
||||
if c.Migrated != "" {
|
||||
t.Error("expected first plugin to have empty value for migrated config")
|
||||
}
|
||||
if c.NotMigrated != "don't migrate me" {
|
||||
t.Errorf("expected first plugin does not have correct value for not migrated config: %q", c.NotMigrated)
|
||||
}
|
||||
}
|
||||
return nil, nil
|
||||
},
|
||||
})
|
||||
registry.Register(&plugin.Registration{
|
||||
Type: "io.containerd.new",
|
||||
Requires: []plugin.Type{
|
||||
"io.containerd.test", // Ensure this test runs second
|
||||
},
|
||||
ID: "t2",
|
||||
Config: &testConfig{},
|
||||
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
|
||||
c, ok := ic.Config.(*testConfig)
|
||||
if !ok {
|
||||
t.Error("expected second plugin to have configuration")
|
||||
} else {
|
||||
if c.Migrated != "migrate me" {
|
||||
t.Errorf("expected second plugin does not have correct value for migrated config: %q", c.Migrated)
|
||||
}
|
||||
if c.NotMigrated != "" {
|
||||
t.Error("expected second plugin to have empty value for not migrated config")
|
||||
}
|
||||
}
|
||||
return nil, nil
|
||||
},
|
||||
ConfigMigration: func(ctx context.Context, v int, plugins map[string]interface{}) error {
|
||||
if v != version {
|
||||
t.Errorf("unxpected version: %d", v)
|
||||
}
|
||||
t1, ok := plugins["io.containerd.test.t1"]
|
||||
if !ok {
|
||||
t.Error("plugin not set as expected")
|
||||
return nil
|
||||
}
|
||||
conf, ok := t1.(map[string]interface{})
|
||||
if !ok {
|
||||
t.Errorf("unexpected config value: %v", t1)
|
||||
return nil
|
||||
}
|
||||
newconf := map[string]interface{}{
|
||||
"migrated": conf["migrated"],
|
||||
}
|
||||
delete(conf, "migrated")
|
||||
plugins["io.containerd.new.t2"] = newconf
|
||||
|
||||
return nil
|
||||
},
|
||||
})
|
||||
|
||||
config := &srvconfig.Config{}
|
||||
config.Version = version
|
||||
config.Plugins = map[string]interface{}{
|
||||
"io.containerd.test.t1": map[string]interface{}{
|
||||
"migrated": "migrate me",
|
||||
"notmigrated": "don't migrate me",
|
||||
},
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
_, err := New(ctx, config)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
34
cmd/containerd/server/server_unsupported.go
Normal file
34
cmd/containerd/server/server_unsupported.go
Normal file
@@ -0,0 +1,34 @@
|
||||
//go:build !linux && !windows && !solaris
|
||||
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
srvconfig "github.com/containerd/containerd/v2/cmd/containerd/server/config"
|
||||
"github.com/containerd/ttrpc"
|
||||
)
|
||||
|
||||
func apply(_ context.Context, _ *srvconfig.Config) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func newTTRPCServer() (*ttrpc.Server, error) {
|
||||
return ttrpc.NewServer()
|
||||
}
|
||||
32
cmd/containerd/server/server_windows.go
Normal file
32
cmd/containerd/server/server_windows.go
Normal file
@@ -0,0 +1,32 @@
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
srvconfig "github.com/containerd/containerd/v2/cmd/containerd/server/config"
|
||||
"github.com/containerd/ttrpc"
|
||||
)
|
||||
|
||||
func apply(_ context.Context, _ *srvconfig.Config) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func newTTRPCServer() (*ttrpc.Server, error) {
|
||||
return ttrpc.NewServer()
|
||||
}
|
||||
Reference in New Issue
Block a user