Split CRI image service from GRPC handler

Prepares the CRI image service for splitting CRI into multiple plugins.
Also prepares for config migration which will spread across multiple
different plugins.

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan 2023-09-20 09:22:47 -07:00
parent a2f167e836
commit d23ac1122e
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
18 changed files with 349 additions and 201 deletions

View File

@ -80,7 +80,13 @@ func printExecutions() {
} }
} }
func fuzzCRI(f *fuzz.ConsumeFuzzer, c server.CRIService) int { type fuzzCRIService interface {
server.CRIService
runtime.RuntimeServiceServer
runtime.ImageServiceServer
}
func fuzzCRI(f *fuzz.ConsumeFuzzer, c fuzzCRIService) int {
executionOrder = make([]string, 0) executionOrder = make([]string, 0)
defer printExecutions() defer printExecutions()
@ -157,7 +163,7 @@ func logExecution(apiName, request string) {
// createContainerFuzz creates a CreateContainerRequest and passes // createContainerFuzz creates a CreateContainerRequest and passes
// it to c.CreateContainer // it to c.CreateContainer
func createContainerFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error { func createContainerFuzz(c fuzzCRIService, f *fuzz.ConsumeFuzzer) error {
r := &runtime.CreateContainerRequest{} r := &runtime.CreateContainerRequest{}
err := f.GenerateStruct(r) err := f.GenerateStruct(r)
if err != nil { if err != nil {
@ -171,7 +177,7 @@ func createContainerFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error {
// removeContainerFuzz creates a RemoveContainerRequest and passes // removeContainerFuzz creates a RemoveContainerRequest and passes
// it to c.RemoveContainer // it to c.RemoveContainer
func removeContainerFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error { func removeContainerFuzz(c fuzzCRIService, f *fuzz.ConsumeFuzzer) error {
r := &runtime.RemoveContainerRequest{} r := &runtime.RemoveContainerRequest{}
err := f.GenerateStruct(r) err := f.GenerateStruct(r)
if err != nil { if err != nil {
@ -183,7 +189,7 @@ func removeContainerFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error {
return nil return nil
} }
func sandboxStore(cs server.CRIService) (*sandboxstore.Store, error) { func sandboxStore(cs fuzzCRIService) (*sandboxstore.Store, error) {
var ( var (
ss *sandboxstore.Store ss *sandboxstore.Store
err error err error
@ -201,7 +207,7 @@ func sandboxStore(cs server.CRIService) (*sandboxstore.Store, error) {
} }
// addSandboxesFuzz creates a sandbox and adds it to the sandboxstore // addSandboxesFuzz creates a sandbox and adds it to the sandboxstore
func addSandboxesFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error { func addSandboxesFuzz(c fuzzCRIService, f *fuzz.ConsumeFuzzer) error {
quantity, err := f.GetInt() quantity, err := f.GetInt()
if err != nil { if err != nil {
return err return err
@ -246,7 +252,7 @@ func getSandboxFuzz(f *fuzz.ConsumeFuzzer) (sandboxstore.Sandbox, error) {
// listContainersFuzz creates a ListContainersRequest and passes // listContainersFuzz creates a ListContainersRequest and passes
// it to c.ListContainers // it to c.ListContainers
func listContainersFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error { func listContainersFuzz(c fuzzCRIService, f *fuzz.ConsumeFuzzer) error {
r := &runtime.ListContainersRequest{} r := &runtime.ListContainersRequest{}
err := f.GenerateStruct(r) err := f.GenerateStruct(r)
if err != nil { if err != nil {
@ -260,7 +266,7 @@ func listContainersFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error {
// startContainerFuzz creates a StartContainerRequest and passes // startContainerFuzz creates a StartContainerRequest and passes
// it to c.StartContainer // it to c.StartContainer
func startContainerFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error { func startContainerFuzz(c fuzzCRIService, f *fuzz.ConsumeFuzzer) error {
r := &runtime.StartContainerRequest{} r := &runtime.StartContainerRequest{}
err := f.GenerateStruct(r) err := f.GenerateStruct(r)
if err != nil { if err != nil {
@ -274,7 +280,7 @@ func startContainerFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error {
// containerStatsFuzz creates a ContainerStatsRequest and passes // containerStatsFuzz creates a ContainerStatsRequest and passes
// it to c.ContainerStats // it to c.ContainerStats
func containerStatsFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error { func containerStatsFuzz(c fuzzCRIService, f *fuzz.ConsumeFuzzer) error {
r := &runtime.ContainerStatsRequest{} r := &runtime.ContainerStatsRequest{}
err := f.GenerateStruct(r) err := f.GenerateStruct(r)
if err != nil { if err != nil {
@ -288,7 +294,7 @@ func containerStatsFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error {
// listContainerStatsFuzz creates a ListContainerStatsRequest and // listContainerStatsFuzz creates a ListContainerStatsRequest and
// passes it to c.ListContainerStats // passes it to c.ListContainerStats
func listContainerStatsFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error { func listContainerStatsFuzz(c fuzzCRIService, f *fuzz.ConsumeFuzzer) error {
r := &runtime.ListContainerStatsRequest{} r := &runtime.ListContainerStatsRequest{}
err := f.GenerateStruct(r) err := f.GenerateStruct(r)
if err != nil { if err != nil {
@ -302,7 +308,7 @@ func listContainerStatsFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error {
// containerStatusFuzz creates a ContainerStatusRequest and passes // containerStatusFuzz creates a ContainerStatusRequest and passes
// it to c.ContainerStatus // it to c.ContainerStatus
func containerStatusFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error { func containerStatusFuzz(c fuzzCRIService, f *fuzz.ConsumeFuzzer) error {
r := &runtime.ContainerStatusRequest{} r := &runtime.ContainerStatusRequest{}
err := f.GenerateStruct(r) err := f.GenerateStruct(r)
if err != nil { if err != nil {
@ -316,7 +322,7 @@ func containerStatusFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error {
// stopContainerFuzz creates a StopContainerRequest and passes // stopContainerFuzz creates a StopContainerRequest and passes
// it to c.StopContainer // it to c.StopContainer
func stopContainerFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error { func stopContainerFuzz(c fuzzCRIService, f *fuzz.ConsumeFuzzer) error {
r := &runtime.StopContainerRequest{} r := &runtime.StopContainerRequest{}
err := f.GenerateStruct(r) err := f.GenerateStruct(r)
if err != nil { if err != nil {
@ -330,7 +336,7 @@ func stopContainerFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error {
// updateContainerResourcesFuzz creates a UpdateContainerResourcesRequest // updateContainerResourcesFuzz creates a UpdateContainerResourcesRequest
// and passes it to c.UpdateContainerResources // and passes it to c.UpdateContainerResources
func updateContainerResourcesFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error { func updateContainerResourcesFuzz(c fuzzCRIService, f *fuzz.ConsumeFuzzer) error {
r := &runtime.UpdateContainerResourcesRequest{} r := &runtime.UpdateContainerResourcesRequest{}
err := f.GenerateStruct(r) err := f.GenerateStruct(r)
if err != nil { if err != nil {
@ -344,7 +350,7 @@ func updateContainerResourcesFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) er
// listImagesFuzz creates a ListImagesRequest and passes it to // listImagesFuzz creates a ListImagesRequest and passes it to
// c.ListImages // c.ListImages
func listImagesFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error { func listImagesFuzz(c fuzzCRIService, f *fuzz.ConsumeFuzzer) error {
r := &runtime.ListImagesRequest{} r := &runtime.ListImagesRequest{}
err := f.GenerateStruct(r) err := f.GenerateStruct(r)
if err != nil { if err != nil {
@ -358,7 +364,7 @@ func listImagesFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error {
// removeImagesFuzz creates a RemoveImageRequest and passes it to // removeImagesFuzz creates a RemoveImageRequest and passes it to
// c.RemoveImage // c.RemoveImage
func removeImagesFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error { func removeImagesFuzz(c fuzzCRIService, f *fuzz.ConsumeFuzzer) error {
r := &runtime.RemoveImageRequest{} r := &runtime.RemoveImageRequest{}
err := f.GenerateStruct(r) err := f.GenerateStruct(r)
if err != nil { if err != nil {
@ -372,7 +378,7 @@ func removeImagesFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error {
// imageStatusFuzz creates an ImageStatusRequest and passes it to // imageStatusFuzz creates an ImageStatusRequest and passes it to
// c.ImageStatus // c.ImageStatus
func imageStatusFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error { func imageStatusFuzz(c fuzzCRIService, f *fuzz.ConsumeFuzzer) error {
r := &runtime.ImageStatusRequest{} r := &runtime.ImageStatusRequest{}
err := f.GenerateStruct(r) err := f.GenerateStruct(r)
if err != nil { if err != nil {
@ -386,7 +392,7 @@ func imageStatusFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error {
// imageFsInfoFuzz creates an ImageFsInfoRequest and passes it to // imageFsInfoFuzz creates an ImageFsInfoRequest and passes it to
// c.ImageFsInfo // c.ImageFsInfo
func imageFsInfoFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error { func imageFsInfoFuzz(c fuzzCRIService, f *fuzz.ConsumeFuzzer) error {
r := &runtime.ImageFsInfoRequest{} r := &runtime.ImageFsInfoRequest{}
err := f.GenerateStruct(r) err := f.GenerateStruct(r)
if err != nil { if err != nil {
@ -400,7 +406,7 @@ func imageFsInfoFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error {
// listPodSandboxFuzz creates a ListPodSandboxRequest and passes // listPodSandboxFuzz creates a ListPodSandboxRequest and passes
// it to c.ListPodSandbox // it to c.ListPodSandbox
func listPodSandboxFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error { func listPodSandboxFuzz(c fuzzCRIService, f *fuzz.ConsumeFuzzer) error {
r := &runtime.ListPodSandboxRequest{} r := &runtime.ListPodSandboxRequest{}
err := f.GenerateStruct(r) err := f.GenerateStruct(r)
if err != nil { if err != nil {
@ -414,7 +420,7 @@ func listPodSandboxFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error {
// portForwardFuzz creates a PortForwardRequest and passes it to // portForwardFuzz creates a PortForwardRequest and passes it to
// c.PortForward // c.PortForward
func portForwardFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error { func portForwardFuzz(c fuzzCRIService, f *fuzz.ConsumeFuzzer) error {
r := &runtime.PortForwardRequest{} r := &runtime.PortForwardRequest{}
err := f.GenerateStruct(r) err := f.GenerateStruct(r)
if err != nil { if err != nil {
@ -428,7 +434,7 @@ func portForwardFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error {
// removePodSandboxFuzz creates a RemovePodSandboxRequest and // removePodSandboxFuzz creates a RemovePodSandboxRequest and
// passes it to c.RemovePodSandbox // passes it to c.RemovePodSandbox
func removePodSandboxFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error { func removePodSandboxFuzz(c fuzzCRIService, f *fuzz.ConsumeFuzzer) error {
r := &runtime.RemovePodSandboxRequest{} r := &runtime.RemovePodSandboxRequest{}
err := f.GenerateStruct(r) err := f.GenerateStruct(r)
if err != nil { if err != nil {
@ -442,7 +448,7 @@ func removePodSandboxFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error {
// runPodSandboxFuzz creates a RunPodSandboxRequest and passes // runPodSandboxFuzz creates a RunPodSandboxRequest and passes
// it to c.RunPodSandbox // it to c.RunPodSandbox
func runPodSandboxFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error { func runPodSandboxFuzz(c fuzzCRIService, f *fuzz.ConsumeFuzzer) error {
r := &runtime.RunPodSandboxRequest{} r := &runtime.RunPodSandboxRequest{}
err := f.GenerateStruct(r) err := f.GenerateStruct(r)
if err != nil { if err != nil {
@ -456,7 +462,7 @@ func runPodSandboxFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error {
// podSandboxStatusFuzz creates a PodSandboxStatusRequest and // podSandboxStatusFuzz creates a PodSandboxStatusRequest and
// passes it to // passes it to
func podSandboxStatusFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error { func podSandboxStatusFuzz(c fuzzCRIService, f *fuzz.ConsumeFuzzer) error {
r := &runtime.PodSandboxStatusRequest{} r := &runtime.PodSandboxStatusRequest{}
err := f.GenerateStruct(r) err := f.GenerateStruct(r)
if err != nil { if err != nil {
@ -470,7 +476,7 @@ func podSandboxStatusFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error {
// stopPodSandboxFuzz creates a StopPodSandboxRequest and passes // stopPodSandboxFuzz creates a StopPodSandboxRequest and passes
// it to c.StopPodSandbox // it to c.StopPodSandbox
func stopPodSandboxFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error { func stopPodSandboxFuzz(c fuzzCRIService, f *fuzz.ConsumeFuzzer) error {
r := &runtime.StopPodSandboxRequest{} r := &runtime.StopPodSandboxRequest{}
err := f.GenerateStruct(r) err := f.GenerateStruct(r)
if err != nil { if err != nil {
@ -483,7 +489,7 @@ func stopPodSandboxFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error {
} }
// statusFuzz creates a StatusRequest and passes it to c.Status // statusFuzz creates a StatusRequest and passes it to c.Status
func statusFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error { func statusFuzz(c fuzzCRIService, f *fuzz.ConsumeFuzzer) error {
r := &runtime.StatusRequest{} r := &runtime.StatusRequest{}
err := f.GenerateStruct(r) err := f.GenerateStruct(r)
if err != nil { if err != nil {
@ -495,7 +501,7 @@ func statusFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error {
return nil return nil
} }
func updateRuntimeConfigFuzz(c server.CRIService, f *fuzz.ConsumeFuzzer) error { func updateRuntimeConfigFuzz(c fuzzCRIService, f *fuzz.ConsumeFuzzer) error {
r := &runtime.UpdateRuntimeConfigRequest{} r := &runtime.UpdateRuntimeConfigRequest{}
err := f.GenerateStruct(r) err := f.GenerateStruct(r)
if err != nil { if err != nil {

View File

@ -20,12 +20,14 @@ package fuzz
import ( import (
fuzz "github.com/AdaLogics/go-fuzz-headers" fuzz "github.com/AdaLogics/go-fuzz-headers"
"google.golang.org/grpc"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
containerd "github.com/containerd/containerd/v2/client" containerd "github.com/containerd/containerd/v2/client"
"github.com/containerd/containerd/v2/oci" "github.com/containerd/containerd/v2/oci"
criconfig "github.com/containerd/containerd/v2/pkg/cri/config" criconfig "github.com/containerd/containerd/v2/pkg/cri/config"
"github.com/containerd/containerd/v2/pkg/cri/instrument"
"github.com/containerd/containerd/v2/pkg/cri/server" "github.com/containerd/containerd/v2/pkg/cri/server"
"github.com/containerd/containerd/v2/pkg/cri/server/base"
"github.com/containerd/containerd/v2/pkg/cri/server/images" "github.com/containerd/containerd/v2/pkg/cri/server/images"
) )
@ -42,20 +44,38 @@ func FuzzCRIServer(data []byte) int {
config := criconfig.Config{} config := criconfig.Config{}
criBase := &base.CRIBase{ imageService, err := images.NewService(config, map[string]string{}, client)
Config: config, if err != nil {
panic(err)
}
is := images.NewGRPCService(imageService)
c, rs, err := server.NewCRIService(config, &server.CRIServiceOptions{
ImageService: imageService,
Client: client,
BaseOCISpecs: map[string]*oci.Spec{}, BaseOCISpecs: map[string]*oci.Spec{},
} })
imageService, err := images.NewService(config, client)
if err != nil { if err != nil {
panic(err) panic(err)
} }
c, err := server.NewCRIService(criBase, imageService, client, nil) return fuzzCRI(f, &service{
if err != nil { CRIService: c,
panic(err) RuntimeServiceServer: rs,
} ImageServiceServer: is,
})
return fuzzCRI(f, c) }
type service struct {
server.CRIService
runtime.RuntimeServiceServer
runtime.ImageServiceServer
}
func (c *service) Register(s *grpc.Server) error {
instrumented := instrument.NewService(c)
runtime.RegisterRuntimeServiceServer(s, instrumented)
runtime.RegisterImageServiceServer(s, instrumented)
return nil
} }

View File

@ -37,16 +37,13 @@ import (
"github.com/containerd/log/logtest" "github.com/containerd/log/logtest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1" ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
containerd "github.com/containerd/containerd/v2/client" containerd "github.com/containerd/containerd/v2/client"
"github.com/containerd/containerd/v2/content" "github.com/containerd/containerd/v2/content"
"github.com/containerd/containerd/v2/leases" "github.com/containerd/containerd/v2/leases"
"github.com/containerd/containerd/v2/namespaces" "github.com/containerd/containerd/v2/namespaces"
"github.com/containerd/containerd/v2/oci"
criconfig "github.com/containerd/containerd/v2/pkg/cri/config" criconfig "github.com/containerd/containerd/v2/pkg/cri/config"
criserver "github.com/containerd/containerd/v2/pkg/cri/server" criserver "github.com/containerd/containerd/v2/pkg/cri/server"
"github.com/containerd/containerd/v2/pkg/cri/server/base"
"github.com/containerd/containerd/v2/pkg/cri/server/images" "github.com/containerd/containerd/v2/pkg/cri/server/images"
) )
@ -94,11 +91,7 @@ func testCRIImagePullTimeoutBySlowCommitWriter(t *testing.T) {
ctx := namespaces.WithNamespace(logtest.WithT(context.Background(), t), k8sNamespace) ctx := namespaces.WithNamespace(logtest.WithT(context.Background(), t), k8sNamespace)
_, err = criService.PullImage(ctx, &runtimeapi.PullImageRequest{ _, err = criService.PullImage(ctx, pullProgressTestImageName, nil, nil)
Image: &runtimeapi.ImageSpec{
Image: pullProgressTestImageName,
},
})
assert.NoError(t, err) assert.NoError(t, err)
} }
@ -217,11 +210,7 @@ func testCRIImagePullTimeoutByHoldingContentOpenWriter(t *testing.T) {
go func() { go func() {
defer close(errCh) defer close(errCh)
_, err := criService.PullImage(ctx, &runtimeapi.PullImageRequest{ _, err := criService.PullImage(ctx, pullProgressTestImageName, nil, nil)
Image: &runtimeapi.ImageSpec{
Image: pullProgressTestImageName,
},
})
errCh <- err errCh <- err
}() }()
@ -304,11 +293,7 @@ func testCRIImagePullTimeoutByNoDataTransferred(t *testing.T) {
dctx, _, err := cli.WithLease(ctx) dctx, _, err := cli.WithLease(ctx)
assert.NoError(t, err) assert.NoError(t, err)
_, err = criService.PullImage(dctx, &runtimeapi.PullImageRequest{ _, err = criService.PullImage(dctx, fmt.Sprintf("%s/%s", mirrorURL.Host, "containerd/volume-ownership:2.1"), nil, nil)
Image: &runtimeapi.ImageSpec{
Image: fmt.Sprintf("%s/%s", mirrorURL.Host, "containerd/volume-ownership:2.1"),
},
})
assert.Equal(t, context.Canceled, errors.Unwrap(err), "[%v] expected canceled error, but got (%v)", idx, err) assert.Equal(t, context.Canceled, errors.Unwrap(err), "[%v] expected canceled error, but got (%v)", idx, err)
assert.True(t, mirrorSrv.limiter.clearHitCircuitBreaker(), "[%v] expected to hit circuit breaker", idx) assert.True(t, mirrorSrv.limiter.clearHitCircuitBreaker(), "[%v] expected to hit circuit breaker", idx)
@ -487,7 +472,7 @@ func (l *ioCopyLimiter) limitedCopy(ctx context.Context, dst io.Writer, src io.R
// //
// NOTE: We don't need to start the CRI plugin here because we just need the // NOTE: We don't need to start the CRI plugin here because we just need the
// ImageService API. // ImageService API.
func initLocalCRIPlugin(client *containerd.Client, tmpDir string, registryCfg criconfig.Registry) (criserver.CRIService, error) { func initLocalCRIPlugin(client *containerd.Client, tmpDir string, registryCfg criconfig.Registry) (criserver.ImageService, error) {
containerdRootDir := filepath.Join(tmpDir, "root") containerdRootDir := filepath.Join(tmpDir, "root")
criWorkDir := filepath.Join(tmpDir, "cri-plugin") criWorkDir := filepath.Join(tmpDir, "cri-plugin")
@ -505,15 +490,5 @@ func initLocalCRIPlugin(client *containerd.Client, tmpDir string, registryCfg cr
StateDir: filepath.Join(criWorkDir, "state"), StateDir: filepath.Join(criWorkDir, "state"),
} }
criBase := &base.CRIBase{ return images.NewService(cfg, map[string]string{}, client)
Config: cfg,
BaseOCISpecs: map[string]*oci.Spec{},
}
imageService, err := images.NewService(cfg, client)
if err != nil {
panic(err)
}
return criserver.NewCRIService(criBase, imageService, client, nil)
} }

View File

@ -18,13 +18,16 @@ package cri
import ( import (
"fmt" "fmt"
"io"
"github.com/containerd/log" "github.com/containerd/log"
"github.com/containerd/plugin" "github.com/containerd/plugin"
"github.com/containerd/plugin/registry" "github.com/containerd/plugin/registry"
containerd "github.com/containerd/containerd/v2/client" containerd "github.com/containerd/containerd/v2/client"
criconfig "github.com/containerd/containerd/v2/pkg/cri/config"
"github.com/containerd/containerd/v2/pkg/cri/constants" "github.com/containerd/containerd/v2/pkg/cri/constants"
"github.com/containerd/containerd/v2/pkg/cri/instrument"
"github.com/containerd/containerd/v2/pkg/cri/nri" "github.com/containerd/containerd/v2/pkg/cri/nri"
"github.com/containerd/containerd/v2/pkg/cri/server" "github.com/containerd/containerd/v2/pkg/cri/server"
"github.com/containerd/containerd/v2/pkg/cri/server/base" "github.com/containerd/containerd/v2/pkg/cri/server/base"
@ -32,6 +35,11 @@ import (
nriservice "github.com/containerd/containerd/v2/pkg/nri" nriservice "github.com/containerd/containerd/v2/pkg/nri"
"github.com/containerd/containerd/v2/platforms" "github.com/containerd/containerd/v2/platforms"
"github.com/containerd/containerd/v2/plugins" "github.com/containerd/containerd/v2/plugins"
"github.com/containerd/containerd/v2/sandbox"
"google.golang.org/grpc"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
) )
// Register CRI service plugin // Register CRI service plugin
@ -49,6 +57,7 @@ func init() {
plugins.ServicePlugin, plugins.ServicePlugin,
plugins.LeasePlugin, plugins.LeasePlugin,
plugins.SandboxStorePlugin, plugins.SandboxStorePlugin,
plugins.TransferPlugin,
}, },
InitFn: initCRIService, InitFn: initCRIService,
}) })
@ -63,6 +72,7 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) {
return nil, fmt.Errorf("unable to load CRI service base dependencies: %w", err) return nil, fmt.Errorf("unable to load CRI service base dependencies: %w", err)
} }
criBase := criBasePlugin.(*base.CRIBase) criBase := criBasePlugin.(*base.CRIBase)
c := criBase.Config
// Get image service. // Get image service.
criImagePlugin, err := ic.GetByID(plugins.CRIImagePlugin, "cri-image-service") criImagePlugin, err := ic.GetByID(plugins.CRIImagePlugin, "cri-image-service")
@ -83,7 +93,50 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) {
return nil, fmt.Errorf("failed to create containerd client: %w", err) return nil, fmt.Errorf("failed to create containerd client: %w", err)
} }
s, err := server.NewCRIService(criBase, imageService, client, getNRIAPI(ic)) // TODO: Update this logic to use runtime snapshotter
if client.SnapshotService(c.ContainerdConfig.Snapshotter) == nil {
return nil, fmt.Errorf("failed to find snapshotter %q", c.ContainerdConfig.Snapshotter)
}
// TODO(dmcgowan): Get the full list directly from configured plugins
sbControllers := map[string]sandbox.Controller{
string(criconfig.ModePodSandbox): client.SandboxController(string(criconfig.ModePodSandbox)),
string(criconfig.ModeShim): client.SandboxController(string(criconfig.ModeShim)),
}
//imageFSPaths := map[string]string{}
//for _, ociRuntime := range c.ContainerdConfig.Runtimes {
// // Can not use `c.RuntimeSnapshotter() yet, so hard-coding here.`
// snapshotter := ociRuntime.Snapshotter
// if snapshotter != "" {
// imageFSPaths[snapshotter] = filepath.Join(c.ContainerdRootDir, plugins.SnapshotPlugin.String()+"."+snapshotter)
// log.L.Infof("Get image filesystem path %q for snapshotter %q", imageFSPaths[snapshotter], snapshotter)
// }
// if _, ok := sbControllers[ociRuntime.Sandboxer]; !ok {
// sbControllers[ociRuntime.Sandboxer] = client.SandboxController(ociRuntime.Sandboxer)
// }
//}
//snapshotter := c.ContainerdConfig.Snapshotter
//imageFSPaths[snapshotter] = filepath.Join(c.ContainerdRootDir, plugins.SnapshotPlugin.String()+"."+snapshotter)
//log.L.Infof("Get image filesystem path %q for snapshotter %q", imageFSPaths[snapshotter], snapshotter)
// TODO: expose this as a separate containerd plugin.
//imageService, err := images.NewService(c, imageFSPaths, client)
//if err != nil {
// return nil, fmt.Errorf("unable to create CRI image service: %w", err)
//}
options := &server.CRIServiceOptions{
ImageService: imageService,
NRI: getNRIAPI(ic),
ImageFSPaths: imageService.ImageFSPaths(),
Client: client,
SandboxControllers: sbControllers,
BaseOCISpecs: criBase.BaseOCISpecs,
}
is := images.NewGRPCService(imageService)
s, rs, err := server.NewCRIService(criBase.Config, options)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create CRI service: %w", err) return nil, fmt.Errorf("failed to create CRI service: %w", err)
} }
@ -97,7 +150,52 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) {
// TODO(random-liu): Whether and how we can stop containerd. // TODO(random-liu): Whether and how we can stop containerd.
}() }()
return s, nil service := &criGRPCServer{
RuntimeServiceServer: rs,
ImageServiceServer: is,
Closer: s, // TODO: Where is close run?
initializer: s,
}
if c.DisableTCPService {
return service, nil
}
return criGRPCServerWithTCP{service}, nil
}
type initializer interface {
IsInitialized() bool
}
type criGRPCServer struct {
runtime.RuntimeServiceServer
runtime.ImageServiceServer
io.Closer
initializer
}
func (c *criGRPCServer) register(s *grpc.Server) error {
instrumented := instrument.NewService(c)
runtime.RegisterRuntimeServiceServer(s, instrumented)
runtime.RegisterImageServiceServer(s, instrumented)
return nil
}
// Register registers all required services onto a specific grpc server.
// This is used by containerd cri plugin.
func (c *criGRPCServer) Register(s *grpc.Server) error {
return c.register(s)
}
type criGRPCServerWithTCP struct {
*criGRPCServer
}
// RegisterTCP register all required services onto a GRPC server on TCP.
// This is used by containerd CRI plugin.
func (c criGRPCServerWithTCP) RegisterTCP(s *grpc.Server) error {
return c.register(s)
} }
// Get the NRI plugin, and set up our NRI API for it. // Get the NRI plugin, and set up our NRI API for it.

View File

@ -247,7 +247,7 @@ func TestContainerStatus(t *testing.T) {
if test.imageExist { if test.imageExist {
imageStore, err := imagestore.NewFakeStore([]imagestore.Image{*image}) imageStore, err := imagestore.NewFakeStore([]imagestore.Image{*image})
assert.NoError(t, err) assert.NoError(t, err)
c.imageService = &fakeImageService{imageStore: imageStore} c.ImageService = &fakeImageService{imageStore: imageStore}
} }
resp, err := c.ContainerStatus(context.Background(), &runtime.ContainerStatusRequest{ContainerId: container.ID}) resp, err := c.ContainerStatus(context.Background(), &runtime.ContainerStatusRequest{ContainerId: container.ID})
if test.expectErr { if test.expectErr {
@ -266,7 +266,6 @@ func TestContainerStatus(t *testing.T) {
} }
type fakeImageService struct { type fakeImageService struct {
runtime.ImageServiceServer
imageStore *imagestore.Store imageStore *imagestore.Store
} }
@ -288,6 +287,10 @@ func (s *fakeImageService) LocalResolve(refOrID string) (imagestore.Image, error
func (s *fakeImageService) ImageFSPaths() map[string]string { return make(map[string]string) } func (s *fakeImageService) ImageFSPaths() map[string]string { return make(map[string]string) }
func (s *fakeImageService) PullImage(context.Context, string, func(string) (string, string, error), *runtime.PodSandboxConfig) (string, error) {
return "", errors.New("not implemented")
}
func patchExceptedWithState(expected *runtime.ContainerStatus, state runtime.ContainerState) { func patchExceptedWithState(expected *runtime.ContainerStatus, state runtime.ContainerState) {
expected.State = state expected.State = state
switch state { switch state {

View File

@ -25,7 +25,8 @@ import (
// ListImages lists existing images. // ListImages lists existing images.
// TODO(random-liu): Add image list filters after CRI defines this more clear, and kubelet // TODO(random-liu): Add image list filters after CRI defines this more clear, and kubelet
// actually needs it. // actually needs it.
func (c *CRIImageService) ListImages(ctx context.Context, r *runtime.ListImagesRequest) (*runtime.ListImagesResponse, error) { func (c *GRPCCRIImageService) ListImages(ctx context.Context, r *runtime.ListImagesRequest) (*runtime.ListImagesResponse, error) {
// TODO: From CRIImageService directly
imagesInStore := c.imageStore.List() imagesInStore := c.imageStore.List()
var images []*runtime.Image var images []*runtime.Image

View File

@ -29,7 +29,7 @@ import (
) )
func TestListImages(t *testing.T) { func TestListImages(t *testing.T) {
c := newTestCRIService() _, c := newTestCRIService()
imagesInStore := []imagestore.Image{ imagesInStore := []imagestore.Image{
{ {
ID: "sha256:1123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef", ID: "sha256:1123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",

View File

@ -93,7 +93,30 @@ import (
// contents are missing but snapshots are ready, is the image still "READY"? // contents are missing but snapshots are ready, is the image still "READY"?
// PullImage pulls an image with authentication config. // PullImage pulls an image with authentication config.
func (c *CRIImageService) PullImage(ctx context.Context, r *runtime.PullImageRequest) (_ *runtime.PullImageResponse, err error) { func (c *GRPCCRIImageService) PullImage(ctx context.Context, r *runtime.PullImageRequest) (_ *runtime.PullImageResponse, err error) {
imageRef := r.GetImage().GetImage()
credentials := func(host string) (string, string, error) {
hostauth := r.GetAuth()
if hostauth == nil {
config := c.config.Registry.Configs[host]
if config.Auth != nil {
hostauth = toRuntimeAuthConfig(*config.Auth)
}
}
return ParseAuth(hostauth, host)
}
ref, err := c.CRIImageService.PullImage(ctx, imageRef, credentials, r.SandboxConfig)
if err != nil {
return nil, err
}
return &runtime.PullImageResponse{ImageRef: ref}, nil
}
func (c *CRIImageService) PullImage(ctx context.Context, name string, credentials func(string) (string, string, error), sandboxConfig *runtime.PodSandboxConfig) (_ string, err error) {
span := tracing.SpanFromContext(ctx) span := tracing.SpanFromContext(ctx)
defer func() { defer func() {
// TODO: add domain label for imagePulls metrics, and we may need to provide a mechanism // TODO: add domain label for imagePulls metrics, and we may need to provide a mechanism
@ -109,19 +132,18 @@ func (c *CRIImageService) PullImage(ctx context.Context, r *runtime.PullImageReq
defer inProgressImagePulls.Dec() defer inProgressImagePulls.Dec()
startTime := time.Now() startTime := time.Now()
imageRef := r.GetImage().GetImage() namedRef, err := distribution.ParseDockerRef(name)
namedRef, err := distribution.ParseDockerRef(imageRef)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to parse image reference %q: %w", imageRef, err) return "", fmt.Errorf("failed to parse image reference %q: %w", name, err)
} }
ref := namedRef.String() ref := namedRef.String()
if ref != imageRef { if ref != name {
log.G(ctx).Debugf("PullImage using normalized image ref: %q", ref) log.G(ctx).Debugf("PullImage using normalized image ref: %q", ref)
} }
imagePullProgressTimeout, err := time.ParseDuration(c.config.ImagePullProgressTimeout) imagePullProgressTimeout, err := time.ParseDuration(c.config.ImagePullProgressTimeout)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to parse image_pull_progress_timeout %q: %w", c.config.ImagePullProgressTimeout, err) return "", fmt.Errorf("failed to parse image_pull_progress_timeout %q: %w", c.config.ImagePullProgressTimeout, err)
} }
var ( var (
@ -131,7 +153,7 @@ func (c *CRIImageService) PullImage(ctx context.Context, r *runtime.PullImageReq
resolver = docker.NewResolver(docker.ResolverOptions{ resolver = docker.NewResolver(docker.ResolverOptions{
Headers: c.config.Registry.Headers, Headers: c.config.Registry.Headers,
Hosts: c.registryHosts(ctx, r.GetAuth(), pullReporter.optionUpdateClient), Hosts: c.registryHosts(ctx, credentials, pullReporter.optionUpdateClient),
}) })
isSchema1 bool isSchema1 bool
imageHandler containerdimages.HandlerFunc = func(_ context.Context, imageHandler containerdimages.HandlerFunc = func(_ context.Context,
@ -144,9 +166,9 @@ func (c *CRIImageService) PullImage(ctx context.Context, r *runtime.PullImageReq
) )
defer pcancel() defer pcancel()
snapshotter, err := c.snapshotterFromPodSandboxConfig(ctx, ref, r.SandboxConfig) snapshotter, err := c.snapshotterFromPodSandboxConfig(ctx, ref, sandboxConfig)
if err != nil { if err != nil {
return nil, err return "", err
} }
log.G(ctx).Debugf("PullImage %q with snapshotter %s", ref, snapshotter) log.G(ctx).Debugf("PullImage %q with snapshotter %s", ref, snapshotter)
span.SetAttributes( span.SetAttributes(
@ -187,13 +209,13 @@ func (c *CRIImageService) PullImage(ctx context.Context, r *runtime.PullImageReq
image, err := c.client.Pull(pctx, ref, pullOpts...) image, err := c.client.Pull(pctx, ref, pullOpts...)
pcancel() pcancel()
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to pull and unpack image %q: %w", ref, err) return "", fmt.Errorf("failed to pull and unpack image %q: %w", ref, err)
} }
span.AddEvent("Pull and unpack image complete") span.AddEvent("Pull and unpack image complete")
configDesc, err := image.Config(ctx) configDesc, err := image.Config(ctx)
if err != nil { if err != nil {
return nil, fmt.Errorf("get image config descriptor: %w", err) return "", fmt.Errorf("get image config descriptor: %w", err)
} }
imageID := configDesc.Digest.String() imageID := configDesc.Digest.String()
@ -203,13 +225,14 @@ func (c *CRIImageService) PullImage(ctx context.Context, r *runtime.PullImageReq
continue continue
} }
if err := c.createImageReference(ctx, r, image.Target(), labels); err != nil { if err := c.createImageReference(ctx, r, image.Target(), labels); err != nil {
return nil, fmt.Errorf("failed to create image reference %q: %w", r, err) return "", fmt.Errorf("failed to create image reference %q: %w", r, err)
} }
// Update image store to reflect the newest state in containerd. // Update image store to reflect the newest state in containerd.
// No need to use `updateImage`, because the image reference must // No need to use `updateImage`, because the image reference must
// have been managed by the cri plugin. // have been managed by the cri plugin.
// TODO: Use image service directly
if err := c.imageStore.Update(ctx, r); err != nil { if err := c.imageStore.Update(ctx, r); err != nil {
return nil, fmt.Errorf("failed to update image store %q: %w", r, err) return "", fmt.Errorf("failed to update image store %q: %w", r, err)
} }
} }
@ -218,14 +241,14 @@ func (c *CRIImageService) PullImage(ctx context.Context, r *runtime.PullImageReq
imagePullingSpeed := float64(size) / mbToByte / time.Since(startTime).Seconds() imagePullingSpeed := float64(size) / mbToByte / time.Since(startTime).Seconds()
imagePullThroughput.Observe(imagePullingSpeed) imagePullThroughput.Observe(imagePullingSpeed)
log.G(ctx).Infof("Pulled image %q with image id %q, repo tag %q, repo digest %q, size %q in %s", imageRef, imageID, log.G(ctx).Infof("Pulled image %q with image id %q, repo tag %q, repo digest %q, size %q in %s", name, imageID,
repoTag, repoDigest, strconv.FormatInt(size, 10), time.Since(startTime)) repoTag, repoDigest, strconv.FormatInt(size, 10), time.Since(startTime))
// NOTE(random-liu): the actual state in containerd is the source of truth, even we maintain // NOTE(random-liu): the actual state in containerd is the source of truth, even we maintain
// in-memory image store, it's only for in-memory indexing. The image could be removed // in-memory image store, it's only for in-memory indexing. The image could be removed
// by someone else anytime, before/during/after we create the metadata. We should always // by someone else anytime, before/during/after we create the metadata. We should always
// check the actual state in containerd before using the image or returning status of the // check the actual state in containerd before using the image or returning status of the
// image. // image.
return &runtime.PullImageResponse{ImageRef: imageID}, nil return imageID, nil
} }
// getRepoDigestAngTag returns image repoDigest and repoTag of the named image reference. // getRepoDigestAngTag returns image repoDigest and repoTag of the named image reference.
@ -295,6 +318,7 @@ func (c *CRIImageService) createImageReference(ctx context.Context, name string,
} }
// TODO(random-liu): Figure out which is the more performant sequence create then update or // TODO(random-liu): Figure out which is the more performant sequence create then update or
// update then create. // update then create.
// TODO: Call CRIImageService directly
oldImg, err := c.client.ImageService().Create(ctx, img) oldImg, err := c.client.ImageService().Create(ctx, img)
if err == nil || !errdefs.IsAlreadyExists(err) { if err == nil || !errdefs.IsAlreadyExists(err) {
return err return err
@ -328,6 +352,7 @@ func (c *CRIImageService) getLabels(ctx context.Context, name string) map[string
// in containerd. If the reference is not managed by the cri plugin, the function also // in containerd. If the reference is not managed by the cri plugin, the function also
// generates necessary metadata for the image and make it managed. // generates necessary metadata for the image and make it managed.
func (c *CRIImageService) UpdateImage(ctx context.Context, r string) error { func (c *CRIImageService) UpdateImage(ctx context.Context, r string) error {
// TODO: Use image service
img, err := c.client.GetImage(ctx, r) img, err := c.client.GetImage(ctx, r)
if err != nil && !errdefs.IsNotFound(err) { if err != nil && !errdefs.IsNotFound(err) {
return fmt.Errorf("get image by reference: %w", err) return fmt.Errorf("get image by reference: %w", err)
@ -377,22 +402,13 @@ func hostDirFromRoots(roots []string) func(string) (string, error) {
} }
// registryHosts is the registry hosts to be used by the resolver. // registryHosts is the registry hosts to be used by the resolver.
func (c *CRIImageService) registryHosts(ctx context.Context, auth *runtime.AuthConfig, updateClientFn config.UpdateClientFunc) docker.RegistryHosts { func (c *CRIImageService) registryHosts(ctx context.Context, credentials func(host string) (string, string, error), updateClientFn config.UpdateClientFunc) docker.RegistryHosts {
paths := filepath.SplitList(c.config.Registry.ConfigPath) paths := filepath.SplitList(c.config.Registry.ConfigPath)
if len(paths) > 0 { if len(paths) > 0 {
hostOptions := config.HostOptions{ hostOptions := config.HostOptions{
UpdateClient: updateClientFn, UpdateClient: updateClientFn,
} }
hostOptions.Credentials = func(host string) (string, string, error) { hostOptions.Credentials = credentials
hostauth := auth
if hostauth == nil {
config := c.config.Registry.Configs[host]
if config.Auth != nil {
hostauth = toRuntimeAuthConfig(*config.Auth)
}
}
return ParseAuth(hostauth, host)
}
hostOptions.HostDir = hostDirFromRoots(paths) hostOptions.HostDir = hostDirFromRoots(paths)
return config.ConfigureHosts(ctx, hostOptions) return config.ConfigureHosts(ctx, hostOptions)
@ -424,11 +440,15 @@ func (c *CRIImageService) registryHosts(ctx context.Context, auth *runtime.AuthC
} }
} }
// Make a copy of `auth`, so that different authorizers would not reference // Make a copy of `credentials`, so that different authorizers would not reference
// the same auth variable. // the same credentials variable.
auth := auth credentials := credentials
if auth == nil && config.Auth != nil { if credentials == nil && config.Auth != nil {
auth = toRuntimeAuthConfig(*config.Auth) auth := toRuntimeAuthConfig(*config.Auth)
credentials = func(host string) (string, string, error) {
return ParseAuth(auth, host)
}
} }
if updateClientFn != nil { if updateClientFn != nil {
@ -439,9 +459,7 @@ func (c *CRIImageService) registryHosts(ctx context.Context, auth *runtime.AuthC
authorizer := docker.NewDockerAuthorizer( authorizer := docker.NewDockerAuthorizer(
docker.WithAuthClient(client), docker.WithAuthClient(client),
docker.WithAuthCreds(func(host string) (string, string, error) { docker.WithAuthCreds(credentials))
return ParseAuth(auth, host)
}))
if u.Path == "" { if u.Path == "" {
u.Path = "/v2" u.Path = "/v2"

View File

@ -274,7 +274,7 @@ func TestRegistryEndpoints(t *testing.T) {
} { } {
test := test test := test
t.Run(test.desc, func(t *testing.T) { t.Run(test.desc, func(t *testing.T) {
c := newTestCRIService() c, _ := newTestCRIService()
c.config.Registry.Mirrors = test.mirrors c.config.Registry.Mirrors = test.mirrors
got, err := c.registryEndpoints(test.host) got, err := c.registryEndpoints(test.host)
assert.NoError(t, err) assert.NoError(t, err)
@ -368,7 +368,7 @@ func TestDefaultScheme(t *testing.T) {
// } { // } {
// test := test // test := test
// t.Run(test.desc, func(t *testing.T) { // t.Run(test.desc, func(t *testing.T) {
// c := newTestCRIService() // c, _ := newTestCRIService()
// c.config.ImageDecryption.KeyModel = test.keyModel // c.config.ImageDecryption.KeyModel = test.keyModel
// got := len(c.encryptedImagesPullOpts()) // got := len(c.encryptedImagesPullOpts())
// assert.Equal(t, test.expectedOpts, got) // assert.Equal(t, test.expectedOpts, got)
@ -424,7 +424,7 @@ func TestSnapshotterFromPodSandboxConfig(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) { t.Run(tt.desc, func(t *testing.T) {
cri := newTestCRIService() cri, _ := newTestCRIService()
cri.config.ContainerdConfig.Snapshotter = defaultSnashotter cri.config.ContainerdConfig.Snapshotter = defaultSnashotter
cri.config.ContainerdConfig.Runtimes = make(map[string]criconfig.Runtime) cri.config.ContainerdConfig.Runtimes = make(map[string]criconfig.Runtime)
cri.config.ContainerdConfig.Runtimes["exiting-runtime"] = criconfig.Runtime{ cri.config.ContainerdConfig.Runtimes["exiting-runtime"] = criconfig.Runtime{
@ -487,7 +487,7 @@ func TestGetRepoDigestAndTag(t *testing.T) {
func TestImageGetLabels(t *testing.T) { func TestImageGetLabels(t *testing.T) {
criService := newTestCRIService() criService, _ := newTestCRIService()
tests := []struct { tests := []struct {
name string name string

View File

@ -33,8 +33,10 @@ import (
// TODO(random-liu): We should change CRI to distinguish image id and image spec. // TODO(random-liu): We should change CRI to distinguish image id and image spec.
// Remove the whole image no matter the it's image id or reference. This is the // Remove the whole image no matter the it's image id or reference. This is the
// semantic defined in CRI now. // semantic defined in CRI now.
func (c *CRIImageService) RemoveImage(ctx context.Context, r *runtime.RemoveImageRequest) (*runtime.RemoveImageResponse, error) { func (c *GRPCCRIImageService) RemoveImage(ctx context.Context, r *runtime.RemoveImageRequest) (*runtime.RemoveImageResponse, error) {
span := tracing.SpanFromContext(ctx) span := tracing.SpanFromContext(ctx)
// TODO: Move to separate function
image, err := c.LocalResolve(r.GetImage().GetImage()) image, err := c.LocalResolve(r.GetImage().GetImage())
if err != nil { if err != nil {
if errdefs.IsNotFound(err) { if errdefs.IsNotFound(err) {

View File

@ -52,7 +52,7 @@ func TestImageStatus(t *testing.T) {
Username: "user", Username: "user",
} }
c := newTestCRIService() c, g := newTestCRIService()
t.Logf("should return nil image spec without error for non-exist image") t.Logf("should return nil image spec without error for non-exist image")
resp, err := c.ImageStatus(context.Background(), &runtime.ImageStatusRequest{ resp, err := c.ImageStatus(context.Background(), &runtime.ImageStatusRequest{
Image: &runtime.ImageSpec{Image: testID}, Image: &runtime.ImageSpec{Image: testID},
@ -65,7 +65,7 @@ func TestImageStatus(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
t.Logf("should return correct image status for exist image") t.Logf("should return correct image status for exist image")
resp, err = c.ImageStatus(context.Background(), &runtime.ImageStatusRequest{ resp, err = g.ImageStatus(context.Background(), &runtime.ImageStatusRequest{
Image: &runtime.ImageSpec{Image: testID}, Image: &runtime.ImageSpec{Image: testID},
}) })
assert.NoError(t, err) assert.NoError(t, err)

View File

@ -29,7 +29,7 @@ import (
) )
func TestImageFsInfo(t *testing.T) { func TestImageFsInfo(t *testing.T) {
c := newTestCRIService() c, g := newTestCRIService()
snapshots := []snapshotstore.Snapshot{ snapshots := []snapshotstore.Snapshot{
{ {
Key: snapshotstore.Key{ Key: snapshotstore.Key{
@ -71,7 +71,7 @@ func TestImageFsInfo(t *testing.T) {
for _, sn := range snapshots { for _, sn := range snapshots {
c.snapshotStore.Add(sn) c.snapshotStore.Add(sn)
} }
resp, err := c.ImageFsInfo(context.Background(), &runtime.ImageFsInfoRequest{}) resp, err := g.ImageFsInfo(context.Background(), &runtime.ImageFsInfoRequest{})
require.NoError(t, err) require.NoError(t, err)
stats := resp.GetImageFilesystems() stats := resp.GetImageFilesystems()
// stats[0] is for default snapshotter, stats[1] is for `overlayfs` // stats[0] is for default snapshotter, stats[1] is for `overlayfs`

View File

@ -22,12 +22,6 @@ import (
"path/filepath" "path/filepath"
"time" "time"
"github.com/containerd/log"
"github.com/containerd/plugin"
"github.com/containerd/plugin/registry"
docker "github.com/distribution/reference"
imagedigest "github.com/opencontainers/go-digest"
containerd "github.com/containerd/containerd/v2/client" containerd "github.com/containerd/containerd/v2/client"
criconfig "github.com/containerd/containerd/v2/pkg/cri/config" criconfig "github.com/containerd/containerd/v2/pkg/cri/config"
"github.com/containerd/containerd/v2/pkg/cri/constants" "github.com/containerd/containerd/v2/pkg/cri/constants"
@ -39,6 +33,13 @@ import (
"github.com/containerd/containerd/v2/platforms" "github.com/containerd/containerd/v2/platforms"
"github.com/containerd/containerd/v2/plugins" "github.com/containerd/containerd/v2/plugins"
snapshot "github.com/containerd/containerd/v2/snapshots" snapshot "github.com/containerd/containerd/v2/snapshots"
"github.com/containerd/log"
"github.com/containerd/plugin"
"github.com/containerd/plugin/registry"
docker "github.com/distribution/reference"
imagedigest "github.com/opencontainers/go-digest"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
) )
func init() { func init() {
@ -58,7 +59,7 @@ func init() {
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to load CRI service base dependencies: %w", err) return nil, fmt.Errorf("unable to load CRI service base dependencies: %w", err)
} }
cri := criPlugin.(*base.CRIBase) c := criPlugin.(*base.CRIBase).Config
client, err := containerd.New( client, err := containerd.New(
"", "",
@ -69,7 +70,23 @@ func init() {
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to init client for cri image service: %w", err) return nil, fmt.Errorf("unable to init client for cri image service: %w", err)
} }
service, err := NewService(cri.Config, client)
imageFSPaths := map[string]string{}
// TODO: Figure out a way to break this plugin's dependency on a shared runtime config
for _, ociRuntime := range c.ContainerdConfig.Runtimes {
// Can not use `c.RuntimeSnapshotter() yet, so hard-coding here.`
snapshotter := ociRuntime.Snapshotter
if snapshotter != "" {
imageFSPaths[snapshotter] = filepath.Join(c.ContainerdRootDir, plugins.SnapshotPlugin.String()+"."+snapshotter)
log.L.Infof("Get image filesystem path %q for snapshotter %q", imageFSPaths[snapshotter], snapshotter)
}
}
snapshotter := c.ContainerdConfig.Snapshotter
imageFSPaths[snapshotter] = filepath.Join(c.ContainerdRootDir, plugins.SnapshotPlugin.String()+"."+snapshotter)
log.L.Infof("Get image filesystem path %q for snapshotter %q", imageFSPaths[snapshotter], snapshotter)
// TODO: Pull out image specific configs here!
service, err := NewService(c, imageFSPaths, client)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create image service: %w", err) return nil, fmt.Errorf("failed to create image service: %w", err)
} }
@ -81,8 +98,21 @@ func init() {
type CRIImageService struct { type CRIImageService struct {
// config contains all configurations. // config contains all configurations.
// TODO: Migrate configs from cri type once moved to its own plugin
// - snapshotter
// - runtime snapshotter
// - Discard unpack layers
// - Disable snapshot annotations
// - Max concurrent downloads (moved to transfer service)
// - Pull progress timeout
// - Registry headers (moved to transfer service)
// - mirror (moved to transfer service)
// - image decryption (moved to transfer service)
// - default runtime
// - stats collection interval (only used to startup snapshot sync)
config criconfig.Config config criconfig.Config
// client is an instance of the containerd client // client is an instance of the containerd client
// TODO: Remove this in favor of using plugins directly
client *containerd.Client client *containerd.Client
// imageFSPaths contains path to image filesystem for snapshotters. // imageFSPaths contains path to image filesystem for snapshotters.
imageFSPaths map[string]string imageFSPaths map[string]string
@ -96,25 +126,21 @@ type CRIImageService struct {
unpackDuplicationSuppressor kmutex.KeyedLocker unpackDuplicationSuppressor kmutex.KeyedLocker
} }
func NewService(config criconfig.Config, client *containerd.Client) (*CRIImageService, error) { type GRPCCRIImageService struct {
if client.SnapshotService(config.ContainerdConfig.Snapshotter) == nil { *CRIImageService
return nil, fmt.Errorf("failed to find snapshotter %q", config.ContainerdConfig.Snapshotter) }
}
imageFSPaths := map[string]string{}
for _, ociRuntime := range config.ContainerdConfig.Runtimes {
// Can not use `c.RuntimeSnapshotter() yet, so hard-coding here.`
snapshotter := ociRuntime.Snapshotter
if snapshotter != "" {
imageFSPaths[snapshotter] = imageFSPath(config.ContainerdRootDir, snapshotter)
log.L.Infof("Get image filesystem path %q for snapshotter %q", imageFSPaths[snapshotter], snapshotter)
}
}
snapshotter := config.ContainerdConfig.Snapshotter
imageFSPaths[snapshotter] = imageFSPath(config.ContainerdRootDir, snapshotter)
log.L.Infof("Get image filesystem path %q for snapshotter %q", imageFSPaths[snapshotter], snapshotter)
// NewService creates a new CRI Image Service
//
// TODO:
// 1. Generalize the image service and merge with a single higher level image service
// 2. Update the options to remove client and imageFSPath
// - Platform configuration with Array/Map of snapshotter names + filesystem ID + platform matcher + runtime to snapshotter
// - Transfer service implementation
// - Image Service (from metadata)
// - Content store (from metadata)
// 3. Separate image cache and snapshot cache to first class plugins, make the snapshot cache much more efficient and intelligent
func NewService(config criconfig.Config, imageFSPaths map[string]string, client *containerd.Client) (*CRIImageService, error) {
svc := CRIImageService{ svc := CRIImageService{
config: config, config: config,
client: client, client: client,
@ -157,10 +183,9 @@ func NewService(config criconfig.Config, client *containerd.Client) (*CRIImageSe
return &svc, nil return &svc, nil
} }
// imageFSPath returns containerd image filesystem path. // NewGRPCService creates a new CRI Image Service grpc server.
// Note that if containerd changes directory layout, we also needs to change this. func NewGRPCService(imageService *CRIImageService) runtime.ImageServiceServer {
func imageFSPath(rootDir, snapshotter string) string { return &GRPCCRIImageService{imageService}
return filepath.Join(rootDir, plugins.SnapshotPlugin.String()+"."+snapshotter)
} }
// LocalResolve resolves image reference locally and returns corresponding image metadata. It // LocalResolve resolves image reference locally and returns corresponding image metadata. It

View File

@ -39,13 +39,14 @@ const (
) )
// newTestCRIService creates a fake criService for test. // newTestCRIService creates a fake criService for test.
func newTestCRIService() *CRIImageService { func newTestCRIService() (*CRIImageService, *GRPCCRIImageService) {
return &CRIImageService{ service := &CRIImageService{
config: testConfig, config: testConfig,
imageFSPaths: map[string]string{"overlayfs": testImageFSPath}, imageFSPaths: map[string]string{"overlayfs": testImageFSPath},
imageStore: imagestore.NewStore(nil, nil, platforms.Default()), imageStore: imagestore.NewStore(nil, nil, platforms.Default()),
snapshotStore: snapshotstore.NewStore(), snapshotStore: snapshotstore.NewStore(),
} }
return service, &GRPCCRIImageService{service}
} }
var testConfig = criconfig.Config{ var testConfig = criconfig.Config{
@ -67,7 +68,7 @@ func TestLocalResolve(t *testing.T) {
}, },
Size: 10, Size: 10,
} }
c := newTestCRIService() c, _ := newTestCRIService()
var err error var err error
c.imageStore, err = imagestore.NewFakeStore([]imagestore.Image{image}) c.imageStore, err = imagestore.NewFakeStore([]imagestore.Image{image})
assert.NoError(t, err) assert.NoError(t, err)
@ -123,7 +124,7 @@ func TestRuntimeSnapshotter(t *testing.T) {
} { } {
test := test test := test
t.Run(test.desc, func(t *testing.T) { t.Run(test.desc, func(t *testing.T) {
cri := newTestCRIService() cri, _ := newTestCRIService()
cri.config = criconfig.Config{ cri.config = criconfig.Config{
PluginConfig: criconfig.DefaultConfig(), PluginConfig: criconfig.DefaultConfig(),
} }

View File

@ -103,10 +103,9 @@ type CRIService interface {
// ImageService specifies dependencies to CRI image service. // ImageService specifies dependencies to CRI image service.
type ImageService interface { type ImageService interface {
runtime.ImageServiceServer
LocalResolve(refOrID string) (imagestore.Image, error) LocalResolve(refOrID string) (imagestore.Image, error)
GetImage(id string) (imagestore.Image, error) GetImage(id string) (imagestore.Image, error)
PullImage(ctx context.Context, name string, creds func(string) (string, string, error), sc *runtime.PodSandboxConfig) (string, error)
} }
type Controller struct { type Controller struct {

View File

@ -297,11 +297,11 @@ func (c *Controller) ensureImageExists(ctx context.Context, ref string, config *
return &image, nil return &image, nil
} }
// Pull image to ensure the image exists // Pull image to ensure the image exists
resp, err := c.imageService.PullImage(ctx, &runtime.PullImageRequest{Image: &runtime.ImageSpec{Image: ref}, SandboxConfig: config}) // TODO: Cleaner interface
imageID, err := c.imageService.PullImage(ctx, ref, nil, config)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to pull image %q: %w", ref, err) return nil, fmt.Errorf("failed to pull image %q: %w", ref, err)
} }
imageID := resp.GetImageRef()
newImage, err := c.imageService.GetImage(imageID) newImage, err := c.imageService.GetImage(imageID)
if err != nil { if err != nil {
// It's still possible that someone removed the image right after it is pulled. // It's still possible that someone removed the image right after it is pulled.

View File

@ -26,16 +26,14 @@ import (
"github.com/containerd/go-cni" "github.com/containerd/go-cni"
"github.com/containerd/log" "github.com/containerd/log"
"google.golang.org/grpc"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1" runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/kubelet/pkg/cri/streaming" "k8s.io/kubelet/pkg/cri/streaming"
containerd "github.com/containerd/containerd/v2/client" containerd "github.com/containerd/containerd/v2/client"
"github.com/containerd/containerd/v2/oci" "github.com/containerd/containerd/v2/oci"
criconfig "github.com/containerd/containerd/v2/pkg/cri/config" criconfig "github.com/containerd/containerd/v2/pkg/cri/config"
"github.com/containerd/containerd/v2/pkg/cri/instrument"
"github.com/containerd/containerd/v2/pkg/cri/nri" "github.com/containerd/containerd/v2/pkg/cri/nri"
"github.com/containerd/containerd/v2/pkg/cri/server/base"
"github.com/containerd/containerd/v2/pkg/cri/server/podsandbox" "github.com/containerd/containerd/v2/pkg/cri/server/podsandbox"
containerstore "github.com/containerd/containerd/v2/pkg/cri/store/container" containerstore "github.com/containerd/containerd/v2/pkg/cri/store/container"
imagestore "github.com/containerd/containerd/v2/pkg/cri/store/image" imagestore "github.com/containerd/containerd/v2/pkg/cri/store/image"
@ -53,26 +51,23 @@ const defaultNetworkPlugin = "default"
// CRIService is the interface implement CRI remote service server. // CRIService is the interface implement CRI remote service server.
type CRIService interface { type CRIService interface {
runtime.RuntimeServiceServer
runtime.ImageServiceServer
// Closer is used by containerd to gracefully stop cri service. // Closer is used by containerd to gracefully stop cri service.
io.Closer io.Closer
Run(ready func()) error IsInitialized() bool
Register(*grpc.Server) error Run(ready func()) error
} }
type sandboxService interface { type sandboxService interface {
SandboxController(config *runtime.PodSandboxConfig, runtimeHandler string) (sandbox.Controller, error) SandboxController(config *runtime.PodSandboxConfig, runtimeHandler string) (sandbox.Controller, error)
} }
// imageService specifies dependencies to image service. // ImageService specifies dependencies to image service.
type imageService interface { type ImageService interface {
runtime.ImageServiceServer
RuntimeSnapshotter(ctx context.Context, ociRuntime criconfig.Runtime) string RuntimeSnapshotter(ctx context.Context, ociRuntime criconfig.Runtime) string
PullImage(ctx context.Context, name string, credentials func(string) (string, string, error), sandboxConfig *runtime.PodSandboxConfig) (string, error)
UpdateImage(ctx context.Context, r string) error UpdateImage(ctx context.Context, r string) error
GetImage(id string) (imagestore.Image, error) GetImage(id string) (imagestore.Image, error)
@ -81,11 +76,13 @@ type imageService interface {
LocalResolve(refOrID string) (imagestore.Image, error) LocalResolve(refOrID string) (imagestore.Image, error)
ImageFSPaths() map[string]string ImageFSPaths() map[string]string
//ImageFsInfo(context.Context, *ImageFsInfoRequest) (*ImageFsInfoResponse, error)
} }
// criService implements CRIService. // criService implements CRIService.
type criService struct { type criService struct {
imageService ImageService
// config contains all configurations. // config contains all configurations.
config criconfig.Config config criconfig.Config
// imageFSPaths contains path to image filesystem for snapshotters. // imageFSPaths contains path to image filesystem for snapshotters.
@ -130,37 +127,60 @@ type criService struct {
sandboxService sandboxService sandboxService sandboxService
} }
type CRIServiceOptions struct {
ImageService ImageService
NRI *nri.API
// SandboxControllers is a map of all the loaded sandbox controllers
SandboxControllers map[string]sandbox.Controller
// ImageFSPath is the filesystem path for images
//
// TODO: Move this to cached snapshot metadata
ImageFSPaths map[string]string
// BaseOCISpecs contains cached OCI specs loaded via `Runtime.BaseRuntimeSpec`
BaseOCISpecs map[string]*oci.Spec
// Client is the base containerd client used for accessing services,
//
// TODO: Replace this gradually with directly configured instances
Client *containerd.Client
}
// NewCRIService returns a new instance of CRIService // NewCRIService returns a new instance of CRIService
func NewCRIService(criBase *base.CRIBase, imageService imageService, client *containerd.Client, nri *nri.API) (CRIService, error) { // TODO: Add criBase.BaseOCISpecs to options
func NewCRIService(config criconfig.Config, options *CRIServiceOptions) (CRIService, runtime.RuntimeServiceServer, error) {
var err error var err error
labels := label.NewStore() labels := label.NewStore()
config := criBase.Config
c := &criService{ c := &criService{
imageService: imageService, ImageService: options.ImageService,
config: config, config: config,
client: client, client: options.Client,
imageFSPaths: imageService.ImageFSPaths(), imageFSPaths: options.ImageFSPaths,
os: osinterface.RealOS{}, os: osinterface.RealOS{},
baseOCISpecs: criBase.BaseOCISpecs, baseOCISpecs: options.BaseOCISpecs,
sandboxStore: sandboxstore.NewStore(labels), sandboxStore: sandboxstore.NewStore(labels),
containerStore: containerstore.NewStore(labels), containerStore: containerstore.NewStore(labels),
sandboxNameIndex: registrar.NewRegistrar(), sandboxNameIndex: registrar.NewRegistrar(),
containerNameIndex: registrar.NewRegistrar(), containerNameIndex: registrar.NewRegistrar(),
netPlugin: make(map[string]cni.CNI), netPlugin: make(map[string]cni.CNI),
sandboxService: newCriSandboxService(&config, client), sandboxService: newCriSandboxService(&config, options.Client),
} }
// TODO: figure out a proper channel size. // TODO: figure out a proper channel size.
c.containerEventsChan = make(chan runtime.ContainerEventResponse, 1000) c.containerEventsChan = make(chan runtime.ContainerEventResponse, 1000)
if err := c.initPlatform(); err != nil { if err := c.initPlatform(); err != nil {
return nil, fmt.Errorf("initialize platform: %w", err) return nil, nil, fmt.Errorf("initialize platform: %w", err)
} }
// prepare streaming server // prepare streaming server
c.streamServer, err = newStreamServer(c, config.StreamServerAddress, config.StreamServerPort, config.StreamIdleTimeout) c.streamServer, err = newStreamServer(c, config.StreamServerAddress, config.StreamServerPort, config.StreamIdleTimeout)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create stream server: %w", err) return nil, nil, fmt.Errorf("failed to create stream server: %w", err)
} }
c.eventMonitor = newEventMonitor(c) c.eventMonitor = newEventMonitor(c)
@ -176,18 +196,20 @@ func NewCRIService(criBase *base.CRIBase, imageService imageService, client *con
if path != "" { if path != "" {
m, err := newCNINetConfSyncer(path, i, c.cniLoadOptions()) m, err := newCNINetConfSyncer(path, i, c.cniLoadOptions())
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create cni conf monitor for %s: %w", name, err) return nil, nil, fmt.Errorf("failed to create cni conf monitor for %s: %w", name, err)
} }
c.cniNetConfMonitor[name] = m c.cniNetConfMonitor[name] = m
} }
} }
podSandboxController := client.SandboxController(string(criconfig.ModePodSandbox)).(*podsandbox.Controller) // Initialize pod sandbox controller
// TODO: Get this from options, NOT client
podSandboxController := options.Client.SandboxController(string(criconfig.ModePodSandbox)).(*podsandbox.Controller)
podSandboxController.Init(c) podSandboxController.Init(c)
c.nri = nri c.nri = options.NRI
return c, nil return c, c, nil
} }
// BackOffEvent is a temporary workaround to call eventMonitor from controller.Stop. // BackOffEvent is a temporary workaround to call eventMonitor from controller.Stop.
@ -196,21 +218,6 @@ func (c *criService) BackOffEvent(id string, event interface{}) {
c.eventMonitor.backOff.enBackOff(id, event) c.eventMonitor.backOff.enBackOff(id, event)
} }
// Register registers all required services onto a specific grpc server.
// This is used by containerd cri plugin.
func (c *criService) Register(s *grpc.Server) error {
return c.register(s)
}
// RegisterTCP register all required services onto a GRPC server on TCP.
// This is used by containerd CRI plugin.
func (c *criService) RegisterTCP(s *grpc.Server) error {
if !c.config.DisableTCPService {
return c.register(s)
}
return nil
}
// Run starts the CRI service. // Run starts the CRI service.
func (c *criService) Run(ready func()) error { func (c *criService) Run(ready func()) error {
log.L.Info("Start subscribing containerd event") log.L.Info("Start subscribing containerd event")
@ -320,10 +327,3 @@ func (c *criService) Close() error {
func (c *criService) IsInitialized() bool { func (c *criService) IsInitialized() bool {
return c.initialized.Load() return c.initialized.Load()
} }
func (c *criService) register(s *grpc.Server) error {
instrumented := instrument.NewService(c)
runtime.RegisterRuntimeServiceServer(s, instrumented)
runtime.RegisterImageServiceServer(s, instrumented)
return nil
}

View File

@ -78,7 +78,7 @@ func (f fakeSandboxController) Metrics(ctx context.Context, sandboxID string) (*
func newTestCRIService() *criService { func newTestCRIService() *criService {
labels := label.NewStore() labels := label.NewStore()
return &criService{ return &criService{
imageService: &fakeImageService{}, ImageService: &fakeImageService{},
config: testConfig, config: testConfig,
os: ostesting.NewFakeOS(), os: ostesting.NewFakeOS(),
sandboxStore: sandboxstore.NewStore(labels), sandboxStore: sandboxstore.NewStore(labels),