266 lines
7.2 KiB
Go
266 lines
7.2 KiB
Go
/*
|
|
Copyright The containerd Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package cri
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
|
|
"github.com/containerd/log"
|
|
"github.com/containerd/plugin"
|
|
"github.com/containerd/plugin/registry"
|
|
"google.golang.org/grpc"
|
|
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
|
|
|
|
containerd "github.com/containerd/containerd/v2/client"
|
|
"github.com/containerd/containerd/v2/core/sandbox"
|
|
criconfig "github.com/containerd/containerd/v2/internal/cri/config"
|
|
"github.com/containerd/containerd/v2/internal/cri/constants"
|
|
"github.com/containerd/containerd/v2/internal/cri/instrument"
|
|
"github.com/containerd/containerd/v2/internal/cri/server"
|
|
nriservice "github.com/containerd/containerd/v2/internal/nri"
|
|
"github.com/containerd/containerd/v2/plugins"
|
|
"github.com/containerd/containerd/v2/plugins/services/warning"
|
|
"github.com/containerd/containerd/v2/version"
|
|
"github.com/containerd/platforms"
|
|
)
|
|
|
|
// Register CRI service plugin
|
|
func init() {
|
|
defaultConfig := criconfig.DefaultServerConfig()
|
|
registry.Register(&plugin.Registration{
|
|
Type: plugins.GRPCPlugin,
|
|
ID: "cri",
|
|
Requires: []plugin.Type{
|
|
plugins.CRIServicePlugin,
|
|
plugins.PodSandboxPlugin,
|
|
plugins.SandboxControllerPlugin,
|
|
plugins.NRIApiPlugin,
|
|
plugins.EventPlugin,
|
|
plugins.ServicePlugin,
|
|
plugins.LeasePlugin,
|
|
plugins.SandboxStorePlugin,
|
|
plugins.TransferPlugin,
|
|
plugins.WarningPlugin,
|
|
},
|
|
Config: &defaultConfig,
|
|
ConfigMigration: configMigration,
|
|
InitFn: initCRIService,
|
|
})
|
|
}
|
|
|
|
func initCRIService(ic *plugin.InitContext) (interface{}, error) {
|
|
ctx := ic.Context
|
|
config := ic.Config.(*criconfig.ServerConfig)
|
|
|
|
// Get runtime service.
|
|
criRuntimePlugin, err := ic.GetByID(plugins.CRIServicePlugin, "runtime")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to load CRI runtime service plugin dependency: %w", err)
|
|
}
|
|
|
|
// Get image service.
|
|
criImagePlugin, err := ic.GetByID(plugins.CRIServicePlugin, "images")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to load CRI image service plugin dependency: %w", err)
|
|
}
|
|
|
|
if warnings, err := criconfig.ValidateServerConfig(ic.Context, config); err != nil {
|
|
return nil, fmt.Errorf("invalid cri image config: %w", err)
|
|
} else if len(warnings) > 0 {
|
|
ws, err := ic.GetSingle(plugins.WarningPlugin)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
warn := ws.(warning.Service)
|
|
for _, w := range warnings {
|
|
warn.Emit(ic.Context, w)
|
|
}
|
|
}
|
|
|
|
log.G(ctx).Info("Connect containerd service")
|
|
client, err := containerd.New(
|
|
"",
|
|
containerd.WithDefaultNamespace(constants.K8sContainerdNamespace),
|
|
containerd.WithDefaultPlatform(platforms.Default()),
|
|
containerd.WithInMemoryServices(ic),
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create containerd client: %w", err)
|
|
}
|
|
|
|
sbControllers, err := getSandboxControllers(ic)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get sandbox controllers from plugins %v", err)
|
|
}
|
|
|
|
streamingConfig, err := config.StreamingConfig()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get streaming config: %w", err)
|
|
}
|
|
|
|
options := &server.CRIServiceOptions{
|
|
RuntimeService: criRuntimePlugin.(server.RuntimeService),
|
|
ImageService: criImagePlugin.(server.ImageService),
|
|
StreamingConfig: streamingConfig,
|
|
NRI: getNRIAPI(ic),
|
|
Client: client,
|
|
SandboxControllers: sbControllers,
|
|
}
|
|
is := criImagePlugin.(imageService).GRPCService()
|
|
|
|
s, rs, err := server.NewCRIService(options)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create CRI service: %w", err)
|
|
}
|
|
|
|
// RegisterReadiness() must be called after NewCRIService(): https://github.com/containerd/containerd/issues/9163
|
|
ready := ic.RegisterReadiness()
|
|
go func() {
|
|
if err := s.Run(ready); err != nil {
|
|
log.G(ctx).WithError(err).Fatal("Failed to run CRI service")
|
|
}
|
|
// TODO(random-liu): Whether and how we can stop containerd.
|
|
}()
|
|
|
|
service := &criGRPCServer{
|
|
RuntimeServiceServer: rs,
|
|
ImageServiceServer: is,
|
|
Closer: s, // TODO: Where is close run?
|
|
initializer: s,
|
|
}
|
|
|
|
if config.DisableTCPService {
|
|
return service, nil
|
|
}
|
|
|
|
return criGRPCServerWithTCP{service}, nil
|
|
}
|
|
|
|
type imageService interface {
|
|
GRPCService() runtime.ImageServiceServer
|
|
}
|
|
|
|
type initializer interface {
|
|
IsInitialized() bool
|
|
}
|
|
|
|
type criGRPCServer struct {
|
|
runtime.RuntimeServiceServer
|
|
runtime.ImageServiceServer
|
|
io.Closer
|
|
initializer
|
|
}
|
|
|
|
func (c *criGRPCServer) register(s *grpc.Server) error {
|
|
instrumented := instrument.NewService(c)
|
|
runtime.RegisterRuntimeServiceServer(s, instrumented)
|
|
runtime.RegisterImageServiceServer(s, instrumented)
|
|
return nil
|
|
}
|
|
|
|
// Register registers all required services onto a specific grpc server.
|
|
// This is used by containerd cri plugin.
|
|
func (c *criGRPCServer) Register(s *grpc.Server) error {
|
|
return c.register(s)
|
|
}
|
|
|
|
type criGRPCServerWithTCP struct {
|
|
*criGRPCServer
|
|
}
|
|
|
|
// RegisterTCP register all required services onto a GRPC server on TCP.
|
|
// This is used by containerd CRI plugin.
|
|
func (c criGRPCServerWithTCP) RegisterTCP(s *grpc.Server) error {
|
|
return c.register(s)
|
|
}
|
|
|
|
// Get the NRI plugin, and set up our NRI API for it.
|
|
func getNRIAPI(ic *plugin.InitContext) nriservice.API {
|
|
const (
|
|
pluginType = plugins.NRIApiPlugin
|
|
pluginName = "nri"
|
|
)
|
|
|
|
ctx := ic.Context
|
|
|
|
p, err := ic.GetByID(pluginType, pluginName)
|
|
if err != nil {
|
|
log.G(ctx).Info("NRI service not found, NRI support disabled")
|
|
return nil
|
|
}
|
|
|
|
api, ok := p.(nriservice.API)
|
|
if !ok {
|
|
log.G(ctx).Infof("NRI plugin (%s, %q) has incorrect type %T, NRI support disabled",
|
|
pluginType, pluginName, api)
|
|
return nil
|
|
}
|
|
|
|
log.G(ctx).Info("using experimental NRI integration - disable nri plugin to prevent this")
|
|
return api
|
|
}
|
|
|
|
func getSandboxControllers(ic *plugin.InitContext) (map[string]sandbox.Controller, error) {
|
|
sc := make(map[string]sandbox.Controller)
|
|
sandboxers, err := ic.GetByType(plugins.SandboxControllerPlugin)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for name, p := range sandboxers {
|
|
sc[name] = p.(sandbox.Controller)
|
|
}
|
|
|
|
podSandboxers, err := ic.GetByType(plugins.PodSandboxPlugin)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for name, p := range podSandboxers {
|
|
sc[name] = p.(sandbox.Controller)
|
|
}
|
|
return sc, nil
|
|
}
|
|
|
|
func configMigration(ctx context.Context, configVersion int, pluginConfigs map[string]interface{}) error {
|
|
if configVersion >= version.ConfigVersion {
|
|
return nil
|
|
}
|
|
const pluginName = string(plugins.GRPCPlugin) + ".cri"
|
|
src, ok := pluginConfigs[pluginName].(map[string]interface{})
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
dst := map[string]interface{}{}
|
|
for _, k := range []string{
|
|
"disable_tcp_service",
|
|
"stream_server_address",
|
|
"stream_server_port",
|
|
"stream_idle_timeout",
|
|
"enable_tls_streaming",
|
|
"x509_key_pair_streaming",
|
|
} {
|
|
if val, ok := src[k]; ok {
|
|
dst[k] = val
|
|
}
|
|
}
|
|
|
|
pluginConfigs[pluginName] = dst
|
|
return nil
|
|
}
|