Move CRI image service into a separate plugin

Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
Signed-off-by: Abel Feng <fshb1988@gmail.com>
This commit is contained in:
Maksym Pavlenko 2023-09-20 12:12:37 -07:00 committed by Abel Feng
parent 349c8d12c8
commit e15c246550
7 changed files with 209 additions and 120 deletions

View File

@ -1,46 +0,0 @@
//go:build gofuzz
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fuzz
import (
fuzz "github.com/AdaLogics/go-fuzz-headers"
containerd "github.com/containerd/containerd/v2/client"
criconfig "github.com/containerd/containerd/v2/pkg/cri/config"
"github.com/containerd/containerd/v2/pkg/cri/server"
)
func FuzzCRISandboxServer(data []byte) int {
initDaemon.Do(startDaemon)
f := fuzz.NewConsumer(data)
client, err := containerd.New(defaultAddress)
if err != nil {
return 0
}
defer client.Close()
c, err := server.NewCRIService(criconfig.Config{}, client, nil)
if err != nil {
panic(err)
}
return fuzzCRI(f, c)
}

View File

@ -24,6 +24,7 @@ import (
containerd "github.com/containerd/containerd/v2/client" containerd "github.com/containerd/containerd/v2/client"
criconfig "github.com/containerd/containerd/v2/pkg/cri/config" criconfig "github.com/containerd/containerd/v2/pkg/cri/config"
"github.com/containerd/containerd/v2/pkg/cri/server" "github.com/containerd/containerd/v2/pkg/cri/server"
"github.com/containerd/containerd/v2/pkg/cri/server/images"
) )
func FuzzCRIServer(data []byte) int { func FuzzCRIServer(data []byte) int {
@ -37,7 +38,12 @@ func FuzzCRIServer(data []byte) int {
} }
defer client.Close() defer client.Close()
c, err := server.NewCRIService(criconfig.Config{}, client, nil) imageService, err := images.NewService(criconfig.Config{}, client)
if err != nil {
panic(err)
}
c, err := server.NewCRIService(criconfig.Config{}, imageService, client, nil)
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@ -17,21 +17,19 @@
package cri package cri
import ( import (
"flag"
"fmt" "fmt"
"path/filepath"
"github.com/containerd/log" "github.com/containerd/log"
"github.com/containerd/plugin" "github.com/containerd/plugin"
"github.com/containerd/plugin/registry" "github.com/containerd/plugin/registry"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
"k8s.io/klog/v2"
containerd "github.com/containerd/containerd/v2/client" containerd "github.com/containerd/containerd/v2/client"
criconfig "github.com/containerd/containerd/v2/pkg/cri/config" criconfig "github.com/containerd/containerd/v2/pkg/cri/config"
"github.com/containerd/containerd/v2/pkg/cri/constants" "github.com/containerd/containerd/v2/pkg/cri/constants"
"github.com/containerd/containerd/v2/pkg/cri/nri" "github.com/containerd/containerd/v2/pkg/cri/nri"
"github.com/containerd/containerd/v2/pkg/cri/server" "github.com/containerd/containerd/v2/pkg/cri/server"
"github.com/containerd/containerd/v2/pkg/cri/server/base"
"github.com/containerd/containerd/v2/pkg/cri/server/images"
nriservice "github.com/containerd/containerd/v2/pkg/nri" nriservice "github.com/containerd/containerd/v2/pkg/nri"
"github.com/containerd/containerd/v2/platforms" "github.com/containerd/containerd/v2/platforms"
"github.com/containerd/containerd/v2/plugins" "github.com/containerd/containerd/v2/plugins"
@ -43,7 +41,7 @@ func init() {
config := criconfig.DefaultConfig() config := criconfig.DefaultConfig()
registry.Register(&plugin.Registration{ registry.Register(&plugin.Registration{
Type: plugins.GRPCPlugin, Type: plugins.GRPCPlugin,
ID: "cri", ID: "cri-runtime-service",
Config: &config, Config: &config,
Requires: []plugin.Type{ Requires: []plugin.Type{
plugins.EventPlugin, plugins.EventPlugin,
@ -57,8 +55,6 @@ func init() {
} }
func initCRIService(ic *plugin.InitContext) (interface{}, error) { func initCRIService(ic *plugin.InitContext) (interface{}, error) {
ic.Meta.Platforms = []imagespec.Platform{platforms.DefaultSpec()}
ic.Meta.Exports = map[string]string{"CRIVersion": constants.CRIVersion}
ctx := ic.Context ctx := ic.Context
pluginConfig := ic.Config.(*criconfig.PluginConfig) pluginConfig := ic.Config.(*criconfig.PluginConfig)
if warnings, err := criconfig.ValidatePluginConfig(ctx, pluginConfig); err != nil { if warnings, err := criconfig.ValidatePluginConfig(ctx, pluginConfig); err != nil {
@ -74,18 +70,19 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) {
} }
} }
c := criconfig.Config{ // Get base CRI dependencies.
PluginConfig: *pluginConfig, criBasePlugin, err := ic.GetByID(plugins.GRPCPlugin, "cri")
ContainerdRootDir: filepath.Dir(ic.Properties[plugins.PropertyRootDir]), if err != nil {
ContainerdEndpoint: ic.Properties[plugins.PropertyGRPCAddress], return nil, fmt.Errorf("unable to load CRI service base dependencies: %w", err)
RootDir: ic.Properties[plugins.PropertyRootDir],
StateDir: ic.Properties[plugins.PropertyStateDir],
} }
log.G(ctx).Infof("Start cri plugin with config %+v", c) criBase := criBasePlugin.(*base.CRIBase)
if err := setGLogLevel(); err != nil { // Get image service.
return nil, fmt.Errorf("failed to set glog level: %w", err) criImagePlugin, err := ic.GetByID(plugins.GRPCPlugin, "cri-image-service")
if err != nil {
return nil, fmt.Errorf("unable to load CRI image service plugin dependency: %w", err)
} }
imageService := criImagePlugin.(*images.CRIImageService)
log.G(ctx).Info("Connect containerd service") log.G(ctx).Info("Connect containerd service")
client, err := containerd.New( client, err := containerd.New(
@ -95,11 +92,8 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) {
containerd.WithInMemoryServices(ic), containerd.WithInMemoryServices(ic),
containerd.WithInMemorySandboxControllers(ic), containerd.WithInMemorySandboxControllers(ic),
) )
if err != nil {
return nil, fmt.Errorf("failed to create containerd client: %w", err)
}
s, err := server.NewCRIService(c, client, getNRIAPI(ic)) s, err := server.NewCRIService(criBase.Config, imageService, client, getNRIAPI(ic))
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create CRI service: %w", err) return nil, fmt.Errorf("failed to create CRI service: %w", err)
} }
@ -116,27 +110,6 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) {
return s, nil return s, nil
} }
// Set glog level.
func setGLogLevel() error {
l := log.GetLevel()
fs := flag.NewFlagSet("klog", flag.PanicOnError)
klog.InitFlags(fs)
if err := fs.Set("logtostderr", "true"); err != nil {
return err
}
switch l {
case log.TraceLevel:
return fs.Set("v", "5")
case log.DebugLevel:
return fs.Set("v", "4")
case log.InfoLevel:
return fs.Set("v", "2")
default:
// glog doesn't support other filters. Defaults to v=0.
}
return nil
}
// Get the NRI plugin, and set up our NRI API for it. // Get the NRI plugin, and set up our NRI API for it.
func getNRIAPI(ic *plugin.InitContext) *nri.API { func getNRIAPI(ic *plugin.InitContext) *nri.API {
const ( const (

View File

@ -0,0 +1,121 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package base
import (
"flag"
"fmt"
"path/filepath"
"github.com/containerd/log"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
"k8s.io/klog/v2"
"github.com/containerd/containerd"
criconfig "github.com/containerd/containerd/pkg/cri/config"
"github.com/containerd/containerd/pkg/cri/constants"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/plugin/registry"
"github.com/containerd/containerd/plugins"
)
// CRIBase contains common dependencies for CRI's runtime, image, and podsandbox services.
type CRIBase struct {
// Config contains all configurations.
Config criconfig.Config
// Client is an instance of the containerd client
Client *containerd.Client
}
func init() {
config := criconfig.DefaultConfig()
// Base plugin that other CRI services depend on.
registry.Register(&plugin.Registration{
Type: plugins.GRPCPlugin,
ID: "cri",
Config: &config,
Requires: []plugin.Type{
plugins.EventPlugin,
plugins.ServicePlugin,
plugins.NRIApiPlugin,
},
InitFn: initCRIBase,
})
}
func initCRIBase(ic *plugin.InitContext) (interface{}, error) {
ic.Meta.Platforms = []imagespec.Platform{platforms.DefaultSpec()}
ic.Meta.Exports = map[string]string{"CRIVersion": constants.CRIVersion}
ctx := ic.Context
pluginConfig := ic.Config.(*criconfig.PluginConfig)
if err := criconfig.ValidatePluginConfig(ctx, pluginConfig); err != nil {
return nil, fmt.Errorf("invalid plugin config: %w", err)
}
c := criconfig.Config{
PluginConfig: *pluginConfig,
ContainerdRootDir: filepath.Dir(ic.Properties[plugins.PropertyRootDir]),
ContainerdEndpoint: ic.Properties[plugins.PropertyGRPCAddress],
RootDir: ic.Properties[plugins.PropertyRootDir],
StateDir: ic.Properties[plugins.PropertyStateDir],
}
log.G(ctx).Infof("Start cri plugin with config %+v", c)
if err := setGLogLevel(); err != nil {
return nil, fmt.Errorf("failed to set glog level: %w", err)
}
log.G(ctx).Info("Connect containerd service")
client, err := containerd.New(
"",
containerd.WithDefaultNamespace(constants.K8sContainerdNamespace),
containerd.WithDefaultPlatform(platforms.Default()),
containerd.WithInMemoryServices(ic),
)
if err != nil {
return nil, fmt.Errorf("failed to create containerd client: %w", err)
}
return &CRIBase{
Config: c,
Client: client,
}, nil
}
// Set glog level.
func setGLogLevel() error {
l := log.GetLevel()
fs := flag.NewFlagSet("klog", flag.PanicOnError)
klog.InitFlags(fs)
if err := fs.Set("logtostderr", "true"); err != nil {
return err
}
switch l {
case log.TraceLevel:
return fs.Set("v", "5")
case log.DebugLevel:
return fs.Set("v", "4")
case log.InfoLevel:
return fs.Set("v", "2")
default:
// glog doesn't support other filters. Defaults to v=0.
}
return nil
}

View File

@ -286,6 +286,8 @@ func (s *fakeImageService) LocalResolve(refOrID string) (imagestore.Image, error
return imagestore.Image{}, errors.New("not implemented") return imagestore.Image{}, errors.New("not implemented")
} }
func (s *fakeImageService) ImageFSPaths() map[string]string { return make(map[string]string) }
func patchExceptedWithState(expected *runtime.ContainerStatus, state runtime.ContainerState) { func patchExceptedWithState(expected *runtime.ContainerStatus, state runtime.ContainerState) {
expected.State = state expected.State = state
switch state { switch state {

View File

@ -19,21 +19,50 @@ package images
import ( import (
"context" "context"
"fmt" "fmt"
"path/filepath"
"time" "time"
"github.com/containerd/log"
"github.com/containerd/plugin"
"github.com/containerd/plugin/registry"
docker "github.com/distribution/reference"
imagedigest "github.com/opencontainers/go-digest"
containerd "github.com/containerd/containerd/v2/client" containerd "github.com/containerd/containerd/v2/client"
criconfig "github.com/containerd/containerd/v2/pkg/cri/config" criconfig "github.com/containerd/containerd/v2/pkg/cri/config"
"github.com/containerd/containerd/v2/pkg/cri/server/base"
imagestore "github.com/containerd/containerd/v2/pkg/cri/store/image" imagestore "github.com/containerd/containerd/v2/pkg/cri/store/image"
snapshotstore "github.com/containerd/containerd/v2/pkg/cri/store/snapshot" snapshotstore "github.com/containerd/containerd/v2/pkg/cri/store/snapshot"
ctrdutil "github.com/containerd/containerd/v2/pkg/cri/util" ctrdutil "github.com/containerd/containerd/v2/pkg/cri/util"
"github.com/containerd/containerd/v2/pkg/kmutex" "github.com/containerd/containerd/v2/pkg/kmutex"
"github.com/containerd/containerd/v2/platforms" "github.com/containerd/containerd/v2/platforms"
"github.com/containerd/containerd/v2/plugins"
snapshot "github.com/containerd/containerd/v2/snapshots" snapshot "github.com/containerd/containerd/v2/snapshots"
"github.com/containerd/log"
docker "github.com/distribution/reference"
imagedigest "github.com/opencontainers/go-digest"
) )
func init() {
registry.Register(&plugin.Registration{
Type: plugins.GRPCPlugin,
ID: "cri-image-service",
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
// Get base CRI dependencies.
criPlugin, err := ic.GetByID(plugins.GRPCPlugin, "cri")
if err != nil {
return nil, fmt.Errorf("unable to load CRI service base dependencies: %w", err)
}
cri := criPlugin.(*base.CRIBase)
service, err := NewService(cri.Config, cri.Client)
if err != nil {
return nil, fmt.Errorf("failed to create image service: %w", err)
}
return service, nil
},
})
}
type CRIImageService struct { type CRIImageService struct {
// config contains all configurations. // config contains all configurations.
config criconfig.Config config criconfig.Config
@ -51,7 +80,25 @@ type CRIImageService struct {
unpackDuplicationSuppressor kmutex.KeyedLocker unpackDuplicationSuppressor kmutex.KeyedLocker
} }
func NewService(config criconfig.Config, imageFSPaths map[string]string, client *containerd.Client) (*CRIImageService, error) { func NewService(config criconfig.Config, client *containerd.Client) (*CRIImageService, error) {
if client.SnapshotService(config.ContainerdConfig.Snapshotter) == nil {
return nil, fmt.Errorf("failed to find snapshotter %q", config.ContainerdConfig.Snapshotter)
}
imageFSPaths := map[string]string{}
for _, ociRuntime := range config.ContainerdConfig.Runtimes {
// Can not use `c.RuntimeSnapshotter() yet, so hard-coding here.`
snapshotter := ociRuntime.Snapshotter
if snapshotter != "" {
imageFSPaths[snapshotter] = imageFSPath(config.ContainerdRootDir, snapshotter)
log.L.Infof("Get image filesystem path %q for snapshotter %q", imageFSPaths[snapshotter], snapshotter)
}
}
snapshotter := config.ContainerdConfig.Snapshotter
imageFSPaths[snapshotter] = imageFSPath(config.ContainerdRootDir, snapshotter)
log.L.Infof("Get image filesystem path %q for snapshotter %q", imageFSPaths[snapshotter], snapshotter)
svc := CRIImageService{ svc := CRIImageService{
config: config, config: config,
client: client, client: client,
@ -94,6 +141,12 @@ func NewService(config criconfig.Config, imageFSPaths map[string]string, client
return &svc, nil return &svc, nil
} }
// imageFSPath returns containerd image filesystem path.
// Note that if containerd changes directory layout, we also needs to change this.
func imageFSPath(rootDir, snapshotter string) string {
return filepath.Join(rootDir, plugins.SnapshotPlugin.String()+"."+snapshotter)
}
// LocalResolve resolves image reference locally and returns corresponding image metadata. It // LocalResolve resolves image reference locally and returns corresponding image metadata. It
// returns errdefs.ErrNotFound if the reference doesn't exist. // returns errdefs.ErrNotFound if the reference doesn't exist.
func (c *CRIImageService) LocalResolve(refOrID string) (imagestore.Image, error) { func (c *CRIImageService) LocalResolve(refOrID string) (imagestore.Image, error) {
@ -148,3 +201,7 @@ func (c *CRIImageService) GetSnapshot(key, snapshotter string) (snapshotstore.Sn
} }
return c.snapshotStore.Get(snapshotKey) return c.snapshotStore.Get(snapshotKey)
} }
func (c *CRIImageService) ImageFSPaths() map[string]string {
return c.imageFSPaths
}

View File

@ -38,7 +38,6 @@ import (
criconfig "github.com/containerd/containerd/v2/pkg/cri/config" criconfig "github.com/containerd/containerd/v2/pkg/cri/config"
"github.com/containerd/containerd/v2/pkg/cri/instrument" "github.com/containerd/containerd/v2/pkg/cri/instrument"
"github.com/containerd/containerd/v2/pkg/cri/nri" "github.com/containerd/containerd/v2/pkg/cri/nri"
"github.com/containerd/containerd/v2/pkg/cri/server/images"
"github.com/containerd/containerd/v2/pkg/cri/server/podsandbox" "github.com/containerd/containerd/v2/pkg/cri/server/podsandbox"
containerstore "github.com/containerd/containerd/v2/pkg/cri/store/container" containerstore "github.com/containerd/containerd/v2/pkg/cri/store/container"
imagestore "github.com/containerd/containerd/v2/pkg/cri/store/image" imagestore "github.com/containerd/containerd/v2/pkg/cri/store/image"
@ -83,6 +82,8 @@ type imageService interface {
GetSnapshot(key, snapshotter string) (snapshotstore.Snapshot, error) GetSnapshot(key, snapshotter string) (snapshotstore.Snapshot, error)
LocalResolve(refOrID string) (imagestore.Image, error) LocalResolve(refOrID string) (imagestore.Image, error)
ImageFSPaths() map[string]string
} }
// criService implements CRIService. // criService implements CRIService.
@ -133,39 +134,14 @@ type criService struct {
} }
// NewCRIService returns a new instance of CRIService // NewCRIService returns a new instance of CRIService
func NewCRIService(config criconfig.Config, client *containerd.Client, nri *nri.API) (CRIService, error) { func NewCRIService(config criconfig.Config, imageService imageService, client *containerd.Client, nri *nri.API) (CRIService, error) {
var err error var err error
labels := label.NewStore() labels := label.NewStore()
if client.SnapshotService(config.ContainerdConfig.Snapshotter) == nil {
return nil, fmt.Errorf("failed to find snapshotter %q", config.ContainerdConfig.Snapshotter)
}
imageFSPaths := map[string]string{}
for _, ociRuntime := range config.ContainerdConfig.Runtimes {
// Can not use `c.RuntimeSnapshotter() yet, so hard-coding here.`
snapshotter := ociRuntime.Snapshotter
if snapshotter != "" {
imageFSPaths[snapshotter] = imageFSPath(config.ContainerdRootDir, snapshotter)
log.L.Infof("Get image filesystem path %q for snapshotter %q", imageFSPaths[snapshotter], snapshotter)
}
}
snapshotter := config.ContainerdConfig.Snapshotter
imageFSPaths[snapshotter] = imageFSPath(config.ContainerdRootDir, snapshotter)
log.L.Infof("Get image filesystem path %q for snapshotter %q", imageFSPaths[snapshotter], snapshotter)
// TODO: expose this as a separate containerd plugin.
imageService, err := images.NewService(config, imageFSPaths, client)
if err != nil {
return nil, fmt.Errorf("unable to create CRI image service: %w", err)
}
c := &criService{ c := &criService{
imageService: imageService, imageService: imageService,
config: config, config: config,
client: client, client: client,
imageFSPaths: imageFSPaths, imageFSPaths: imageService.ImageFSPaths(),
os: osinterface.RealOS{}, os: osinterface.RealOS{},
sandboxStore: sandboxstore.NewStore(labels), sandboxStore: sandboxstore.NewStore(labels),
containerStore: containerstore.NewStore(labels), containerStore: containerstore.NewStore(labels),