Update CRI runtime platform and pinned image configuration
Updates the CRI image service to own image related configuration and separate it from the runtime configuration. Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
parent
11f311fdd5
commit
ad4c9f8a9d
@ -86,7 +86,7 @@ func testCRIImagePullTimeoutBySlowCommitWriter(t *testing.T) {
|
|||||||
delayDuration := 2 * defaultImagePullProgressTimeout
|
delayDuration := 2 * defaultImagePullProgressTimeout
|
||||||
cli := buildLocalContainerdClient(t, tmpDir, tweakContentInitFnWithDelayer(delayDuration))
|
cli := buildLocalContainerdClient(t, tmpDir, tweakContentInitFnWithDelayer(delayDuration))
|
||||||
|
|
||||||
criService, err := initLocalCRIPlugin(cli, tmpDir, criconfig.Registry{})
|
criService, err := initLocalCRIImageService(cli, tmpDir, criconfig.Registry{})
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
ctx := namespaces.WithNamespace(logtest.WithT(context.Background(), t), k8sNamespace)
|
ctx := namespaces.WithNamespace(logtest.WithT(context.Background(), t), k8sNamespace)
|
||||||
@ -109,7 +109,7 @@ func testCRIImagePullTimeoutByHoldingContentOpenWriter(t *testing.T) {
|
|||||||
|
|
||||||
cli := buildLocalContainerdClient(t, tmpDir, nil)
|
cli := buildLocalContainerdClient(t, tmpDir, nil)
|
||||||
|
|
||||||
criService, err := initLocalCRIPlugin(cli, tmpDir, criconfig.Registry{})
|
criService, err := initLocalCRIImageService(cli, tmpDir, criconfig.Registry{})
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
ctx := namespaces.WithNamespace(logtest.WithT(context.Background(), t), k8sNamespace)
|
ctx := namespaces.WithNamespace(logtest.WithT(context.Background(), t), k8sNamespace)
|
||||||
@ -287,7 +287,7 @@ func testCRIImagePullTimeoutByNoDataTransferred(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
} {
|
} {
|
||||||
criService, err := initLocalCRIPlugin(cli, tmpDir, registryCfg)
|
criService, err := initLocalCRIImageService(cli, tmpDir, registryCfg)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
dctx, _, err := cli.WithLease(ctx)
|
dctx, _, err := cli.WithLease(ctx)
|
||||||
@ -468,27 +468,27 @@ func (l *ioCopyLimiter) limitedCopy(ctx context.Context, dst io.Writer, src io.R
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// initLocalCRIPlugin uses containerd.Client to init CRI plugin.
|
// initLocalCRIImageService uses containerd.Client to init CRI plugin.
|
||||||
//
|
//
|
||||||
// NOTE: We don't need to start the CRI plugin here because we just need the
|
// NOTE: We don't need to start the CRI plugin here because we just need the
|
||||||
// ImageService API.
|
// ImageService API.
|
||||||
func initLocalCRIPlugin(client *containerd.Client, tmpDir string, registryCfg criconfig.Registry) (criserver.ImageService, error) {
|
func initLocalCRIImageService(client *containerd.Client, tmpDir string, registryCfg criconfig.Registry) (criserver.ImageService, error) {
|
||||||
containerdRootDir := filepath.Join(tmpDir, "root")
|
containerdRootDir := filepath.Join(tmpDir, "root")
|
||||||
criWorkDir := filepath.Join(tmpDir, "cri-plugin")
|
|
||||||
|
|
||||||
cfg := criconfig.Config{
|
cfg := criconfig.ImageConfig{
|
||||||
PluginConfig: criconfig.PluginConfig{
|
Snapshotter: containerd.DefaultSnapshotter,
|
||||||
ImageConfig: criconfig.ImageConfig{
|
Registry: registryCfg,
|
||||||
Snapshotter: containerd.DefaultSnapshotter,
|
ImagePullProgressTimeout: defaultImagePullProgressTimeout.String(),
|
||||||
Registry: registryCfg,
|
StatsCollectPeriod: 10,
|
||||||
ImagePullProgressTimeout: defaultImagePullProgressTimeout.String(),
|
|
||||||
StatsCollectPeriod: 10,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
ContainerdRootDir: containerdRootDir,
|
|
||||||
RootDir: filepath.Join(criWorkDir, "root"),
|
|
||||||
StateDir: filepath.Join(criWorkDir, "state"),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return images.NewService(cfg.ImageConfig, map[string]string{}, map[string]images.RuntimePlatform{}, client)
|
return images.NewService(cfg, &images.CRIImageServiceOptions{
|
||||||
|
ImageFSPaths: map[string]string{
|
||||||
|
containerd.DefaultSnapshotter: containerdRootDir,
|
||||||
|
},
|
||||||
|
RuntimePlatforms: map[string]images.ImagePlatform{},
|
||||||
|
Content: client.ContentStore(),
|
||||||
|
Images: client.ImageService(),
|
||||||
|
Client: client,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
@ -237,6 +237,15 @@ type ImageDecryption struct {
|
|||||||
KeyModel string `toml:"key_model" json:"keyModel"`
|
KeyModel string `toml:"key_model" json:"keyModel"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ImagePlatform represents the platform to use for an image including the
|
||||||
|
// snapshotter to use. If snapshotter is not provided, the platform default
|
||||||
|
// can be assumed. When platform is not provided, the default platform can
|
||||||
|
// be assumed
|
||||||
|
type ImagePlatform struct {
|
||||||
|
Platform string
|
||||||
|
Snapshotter string
|
||||||
|
}
|
||||||
|
|
||||||
type ImageConfig struct {
|
type ImageConfig struct {
|
||||||
// Snapshotter is the snapshotter used by containerd.
|
// Snapshotter is the snapshotter used by containerd.
|
||||||
Snapshotter string `toml:"snapshotter" json:"snapshotter"`
|
Snapshotter string `toml:"snapshotter" json:"snapshotter"`
|
||||||
@ -251,6 +260,20 @@ type ImageConfig struct {
|
|||||||
// layers to the snapshotter.
|
// layers to the snapshotter.
|
||||||
DiscardUnpackedLayers bool `toml:"discard_unpacked_layers" json:"discardUnpackedLayers"`
|
DiscardUnpackedLayers bool `toml:"discard_unpacked_layers" json:"discardUnpackedLayers"`
|
||||||
|
|
||||||
|
// PinnedImages are images which the CRI plugin uses and should not be
|
||||||
|
// removed by the CRI client.
|
||||||
|
// Image names should be full names including domain and tag
|
||||||
|
// Examples:
|
||||||
|
// docker.io/library/ubuntu:latest
|
||||||
|
// images.k8s.io/core/pause:1.55
|
||||||
|
PinnedImages []string
|
||||||
|
|
||||||
|
// RuntimePlatforms is map between the runtime and the image platform to
|
||||||
|
// use for that runtime. When resolving an image for a runtime, this
|
||||||
|
// mapping will be used to select the image for the platform and the
|
||||||
|
// snapshotter for unpacking.
|
||||||
|
RuntimePlatforms map[string]ImagePlatform
|
||||||
|
|
||||||
// Registry contains config related to the registry
|
// Registry contains config related to the registry
|
||||||
Registry Registry `toml:"registry" json:"registry"`
|
Registry Registry `toml:"registry" json:"registry"`
|
||||||
|
|
||||||
|
@ -38,6 +38,7 @@ import (
|
|||||||
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
|
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||||
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
|
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||||
|
|
||||||
|
eventstypes "github.com/containerd/containerd/v2/api/events"
|
||||||
containerd "github.com/containerd/containerd/v2/client"
|
containerd "github.com/containerd/containerd/v2/client"
|
||||||
"github.com/containerd/containerd/v2/diff"
|
"github.com/containerd/containerd/v2/diff"
|
||||||
"github.com/containerd/containerd/v2/errdefs"
|
"github.com/containerd/containerd/v2/errdefs"
|
||||||
@ -319,32 +320,44 @@ func (c *CRIImageService) createImageReference(ctx context.Context, name string,
|
|||||||
// TODO(random-liu): Figure out which is the more performant sequence create then update or
|
// TODO(random-liu): Figure out which is the more performant sequence create then update or
|
||||||
// update then create.
|
// update then create.
|
||||||
// TODO: Call CRIImageService directly
|
// TODO: Call CRIImageService directly
|
||||||
oldImg, err := c.client.ImageService().Create(ctx, img)
|
oldImg, err := c.images.Create(ctx, img)
|
||||||
if err == nil || !errdefs.IsAlreadyExists(err) {
|
if err == nil {
|
||||||
|
if c.publisher != nil {
|
||||||
|
if err := c.publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{
|
||||||
|
Name: img.Name,
|
||||||
|
Labels: img.Labels,
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
} else if !errdefs.IsAlreadyExists(err) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if oldImg.Target.Digest == img.Target.Digest && oldImg.Labels[crilabels.ImageLabelKey] == labels[crilabels.ImageLabelKey] {
|
if oldImg.Target.Digest == img.Target.Digest && oldImg.Labels[crilabels.ImageLabelKey] == labels[crilabels.ImageLabelKey] {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
_, err = c.client.ImageService().Update(ctx, img, "target", "labels."+crilabels.ImageLabelKey)
|
_, err = c.images.Update(ctx, img, "target", "labels."+crilabels.ImageLabelKey)
|
||||||
|
if err == nil && c.publisher != nil {
|
||||||
|
if c.publisher != nil {
|
||||||
|
if err := c.publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{
|
||||||
|
Name: img.Name,
|
||||||
|
Labels: img.Labels,
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// getLabels get image labels to be added on CRI image
|
// getLabels get image labels to be added on CRI image
|
||||||
func (c *CRIImageService) getLabels(ctx context.Context, name string) map[string]string {
|
func (c *CRIImageService) getLabels(ctx context.Context, name string) map[string]string {
|
||||||
labels := map[string]string{crilabels.ImageLabelKey: crilabels.ImageLabelValue}
|
labels := map[string]string{crilabels.ImageLabelKey: crilabels.ImageLabelValue}
|
||||||
// TODO: Separate config here to generalize pinned image list
|
for _, pinned := range c.config.PinnedImages {
|
||||||
configSandboxImage := "" //c.config.SandboxImage
|
if pinned == name {
|
||||||
// parse sandbox image
|
labels[crilabels.PinnedImageLabelKey] = crilabels.PinnedImageLabelValue
|
||||||
sandboxNamedRef, err := distribution.ParseDockerRef(configSandboxImage)
|
}
|
||||||
if err != nil {
|
|
||||||
log.G(ctx).Errorf("failed to parse sandbox image from config %s", sandboxNamedRef)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
sandboxRef := sandboxNamedRef.String()
|
|
||||||
// Adding pinned image label to sandbox image
|
|
||||||
if sandboxRef == name {
|
|
||||||
labels[crilabels.PinnedImageLabelKey] = crilabels.PinnedImageLabelValue
|
|
||||||
}
|
}
|
||||||
return labels
|
return labels
|
||||||
}
|
}
|
||||||
@ -767,9 +780,12 @@ func (c *CRIImageService) snapshotterFromPodSandboxConfig(ctx context.Context, i
|
|||||||
return snapshotter, nil
|
return snapshotter, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if p, ok := c.runtimePlatforms[runtimeHandler]; ok && p.Snapshotter != snapshotter {
|
// TODO: Ensure error is returned if runtime not found?
|
||||||
snapshotter = p.Snapshotter
|
if c.runtimePlatforms != nil {
|
||||||
log.G(ctx).Infof("experimental: PullImage %q for runtime %s, using snapshotter %s", imageRef, runtimeHandler, snapshotter)
|
if p, ok := c.runtimePlatforms[runtimeHandler]; ok && p.Snapshotter != snapshotter {
|
||||||
|
snapshotter = p.Snapshotter
|
||||||
|
log.G(ctx).Infof("experimental: PullImage %q for runtime %s, using snapshotter %s", imageRef, runtimeHandler, snapshotter)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return snapshotter, nil
|
return snapshotter, nil
|
||||||
|
@ -29,6 +29,7 @@ import (
|
|||||||
"github.com/containerd/containerd/v2/pkg/cri/annotations"
|
"github.com/containerd/containerd/v2/pkg/cri/annotations"
|
||||||
criconfig "github.com/containerd/containerd/v2/pkg/cri/config"
|
criconfig "github.com/containerd/containerd/v2/pkg/cri/config"
|
||||||
"github.com/containerd/containerd/v2/pkg/cri/labels"
|
"github.com/containerd/containerd/v2/pkg/cri/labels"
|
||||||
|
"github.com/containerd/containerd/v2/platforms"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestParseAuth(t *testing.T) {
|
func TestParseAuth(t *testing.T) {
|
||||||
@ -402,14 +403,13 @@ func TestSnapshotterFromPodSandboxConfig(t *testing.T) {
|
|||||||
expectSnapshotter: defaultSnashotter,
|
expectSnapshotter: defaultSnashotter,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
desc: "should return error for runtime not found",
|
desc: "should return default snapshotter for runtime not found",
|
||||||
podSandboxConfig: &runtime.PodSandboxConfig{
|
podSandboxConfig: &runtime.PodSandboxConfig{
|
||||||
Annotations: map[string]string{
|
Annotations: map[string]string{
|
||||||
annotations.RuntimeHandler: "runtime-not-exists",
|
annotations.RuntimeHandler: "runtime-not-exists",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectErr: true,
|
expectSnapshotter: defaultSnashotter,
|
||||||
expectSnapshotter: "",
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
desc: "should return snapshotter provided in podSandboxConfig.Annotations",
|
desc: "should return snapshotter provided in podSandboxConfig.Annotations",
|
||||||
@ -426,12 +426,10 @@ func TestSnapshotterFromPodSandboxConfig(t *testing.T) {
|
|||||||
t.Run(tt.desc, func(t *testing.T) {
|
t.Run(tt.desc, func(t *testing.T) {
|
||||||
cri, _ := newTestCRIService()
|
cri, _ := newTestCRIService()
|
||||||
cri.config.Snapshotter = defaultSnashotter
|
cri.config.Snapshotter = defaultSnashotter
|
||||||
/*
|
cri.runtimePlatforms["exiting-runtime"] = ImagePlatform{
|
||||||
cri.config.ContainerdConfig.Runtimes = make(map[string]criconfig.Runtime)
|
Platform: platforms.DefaultSpec(),
|
||||||
cri.config.ContainerdConfig.Runtimes["exiting-runtime"] = criconfig.Runtime{
|
Snapshotter: runtimeSnapshotter,
|
||||||
Snapshotter: runtimeSnapshotter,
|
}
|
||||||
}
|
|
||||||
*/
|
|
||||||
snapshotter, err := cri.snapshotterFromPodSandboxConfig(context.Background(), "test-image", tt.podSandboxConfig)
|
snapshotter, err := cri.snapshotterFromPodSandboxConfig(context.Background(), "test-image", tt.podSandboxConfig)
|
||||||
assert.Equal(t, tt.expectSnapshotter, snapshotter)
|
assert.Equal(t, tt.expectSnapshotter, snapshotter)
|
||||||
if tt.expectErr {
|
if tt.expectErr {
|
||||||
@ -492,49 +490,51 @@ func TestImageGetLabels(t *testing.T) {
|
|||||||
criService, _ := newTestCRIService()
|
criService, _ := newTestCRIService()
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
expectedLabel map[string]string
|
expectedLabel map[string]string
|
||||||
configSandboxImage string
|
pinnedImages []string
|
||||||
pullImageName string
|
pullImageName string
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "pinned image labels should get added on sandbox image",
|
name: "pinned image labels should get added on sandbox image",
|
||||||
expectedLabel: map[string]string{labels.ImageLabelKey: labels.ImageLabelValue, labels.PinnedImageLabelKey: labels.PinnedImageLabelValue},
|
expectedLabel: map[string]string{labels.ImageLabelKey: labels.ImageLabelValue, labels.PinnedImageLabelKey: labels.PinnedImageLabelValue},
|
||||||
configSandboxImage: "k8s.gcr.io/pause:3.9",
|
pinnedImages: []string{"k8s.gcr.io/pause:3.9"},
|
||||||
pullImageName: "k8s.gcr.io/pause:3.9",
|
pullImageName: "k8s.gcr.io/pause:3.9",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "pinned image labels should get added on sandbox image without tag",
|
name: "pinned image labels should get added on sandbox image without tag",
|
||||||
expectedLabel: map[string]string{labels.ImageLabelKey: labels.ImageLabelValue, labels.PinnedImageLabelKey: labels.PinnedImageLabelValue},
|
expectedLabel: map[string]string{labels.ImageLabelKey: labels.ImageLabelValue, labels.PinnedImageLabelKey: labels.PinnedImageLabelValue},
|
||||||
configSandboxImage: "k8s.gcr.io/pause",
|
pinnedImages: []string{"k8s.gcr.io/pause", "k8s.gcr.io/pause:latest"},
|
||||||
pullImageName: "k8s.gcr.io/pause:latest",
|
pullImageName: "k8s.gcr.io/pause:latest",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "pinned image labels should get added on sandbox image specified with tag and digest both",
|
name: "pinned image labels should get added on sandbox image specified with tag and digest both",
|
||||||
expectedLabel: map[string]string{labels.ImageLabelKey: labels.ImageLabelValue, labels.PinnedImageLabelKey: labels.PinnedImageLabelValue},
|
expectedLabel: map[string]string{labels.ImageLabelKey: labels.ImageLabelValue, labels.PinnedImageLabelKey: labels.PinnedImageLabelValue},
|
||||||
configSandboxImage: "k8s.gcr.io/pause:3.9@sha256:45b23dee08af5e43a7fea6c4cf9c25ccf269ee113168c19722f87876677c5cb2",
|
pinnedImages: []string{
|
||||||
pullImageName: "k8s.gcr.io/pause@sha256:45b23dee08af5e43a7fea6c4cf9c25ccf269ee113168c19722f87876677c5cb2",
|
"k8s.gcr.io/pause:3.9@sha256:45b23dee08af5e43a7fea6c4cf9c25ccf269ee113168c19722f87876677c5cb2",
|
||||||
|
"k8s.gcr.io/pause@sha256:45b23dee08af5e43a7fea6c4cf9c25ccf269ee113168c19722f87876677c5cb2",
|
||||||
|
},
|
||||||
|
pullImageName: "k8s.gcr.io/pause@sha256:45b23dee08af5e43a7fea6c4cf9c25ccf269ee113168c19722f87876677c5cb2",
|
||||||
},
|
},
|
||||||
|
|
||||||
{
|
{
|
||||||
name: "pinned image labels should get added on sandbox image specified with digest",
|
name: "pinned image labels should get added on sandbox image specified with digest",
|
||||||
expectedLabel: map[string]string{labels.ImageLabelKey: labels.ImageLabelValue, labels.PinnedImageLabelKey: labels.PinnedImageLabelValue},
|
expectedLabel: map[string]string{labels.ImageLabelKey: labels.ImageLabelValue, labels.PinnedImageLabelKey: labels.PinnedImageLabelValue},
|
||||||
configSandboxImage: "k8s.gcr.io/pause@sha256:45b23dee08af5e43a7fea6c4cf9c25ccf269ee113168c19722f87876677c5cb2",
|
pinnedImages: []string{"k8s.gcr.io/pause@sha256:45b23dee08af5e43a7fea6c4cf9c25ccf269ee113168c19722f87876677c5cb2"},
|
||||||
pullImageName: "k8s.gcr.io/pause@sha256:45b23dee08af5e43a7fea6c4cf9c25ccf269ee113168c19722f87876677c5cb2",
|
pullImageName: "k8s.gcr.io/pause@sha256:45b23dee08af5e43a7fea6c4cf9c25ccf269ee113168c19722f87876677c5cb2",
|
||||||
},
|
},
|
||||||
|
|
||||||
{
|
{
|
||||||
name: "pinned image labels should not get added on other image",
|
name: "pinned image labels should not get added on other image",
|
||||||
expectedLabel: map[string]string{labels.ImageLabelKey: labels.ImageLabelValue},
|
expectedLabel: map[string]string{labels.ImageLabelKey: labels.ImageLabelValue},
|
||||||
configSandboxImage: "k8s.gcr.io/pause:3.9",
|
pinnedImages: []string{"k8s.gcr.io/pause:3.9"},
|
||||||
pullImageName: "k8s.gcr.io/random:latest",
|
pullImageName: "k8s.gcr.io/random:latest",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
// Change this config name
|
criService.config.PinnedImages = tt.pinnedImages
|
||||||
//criService.config.SandboxImage = tt.configSandboxImage
|
|
||||||
labels := criService.getLabels(context.Background(), tt.pullImageName)
|
labels := criService.getLabels(context.Background(), tt.pullImageName)
|
||||||
assert.Equal(t, tt.expectedLabel, labels)
|
assert.Equal(t, tt.expectedLabel, labels)
|
||||||
|
|
||||||
|
@ -20,6 +20,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
eventstypes "github.com/containerd/containerd/v2/api/events"
|
||||||
"github.com/containerd/containerd/v2/errdefs"
|
"github.com/containerd/containerd/v2/errdefs"
|
||||||
"github.com/containerd/containerd/v2/images"
|
"github.com/containerd/containerd/v2/images"
|
||||||
"github.com/containerd/containerd/v2/tracing"
|
"github.com/containerd/containerd/v2/tracing"
|
||||||
@ -56,12 +57,20 @@ func (c *GRPCCRIImageService) RemoveImage(ctx context.Context, r *runtime.Remove
|
|||||||
// someone else before this point.
|
// someone else before this point.
|
||||||
opts = []images.DeleteOpt{images.SynchronousDelete()}
|
opts = []images.DeleteOpt{images.SynchronousDelete()}
|
||||||
}
|
}
|
||||||
err = c.client.ImageService().Delete(ctx, ref, opts...)
|
err = c.images.Delete(ctx, ref, opts...)
|
||||||
if err == nil || errdefs.IsNotFound(err) {
|
if err == nil || errdefs.IsNotFound(err) {
|
||||||
// Update image store to reflect the newest state in containerd.
|
// Update image store to reflect the newest state in containerd.
|
||||||
if err := c.imageStore.Update(ctx, ref); err != nil {
|
if err := c.imageStore.Update(ctx, ref); err != nil {
|
||||||
return nil, fmt.Errorf("failed to update image reference %q for %q: %w", ref, image.ID, err)
|
return nil, fmt.Errorf("failed to update image reference %q for %q: %w", ref, image.ID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if c.publisher != nil {
|
||||||
|
if err := c.publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{
|
||||||
|
Name: ref,
|
||||||
|
}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("failed to delete image reference %q for %q: %w", ref, image.ID, err)
|
return nil, fmt.Errorf("failed to delete image reference %q for %q: %w", ref, image.ID, err)
|
||||||
|
@ -23,6 +23,10 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
containerd "github.com/containerd/containerd/v2/client"
|
containerd "github.com/containerd/containerd/v2/client"
|
||||||
|
"github.com/containerd/containerd/v2/content"
|
||||||
|
"github.com/containerd/containerd/v2/events"
|
||||||
|
"github.com/containerd/containerd/v2/images"
|
||||||
|
"github.com/containerd/containerd/v2/metadata"
|
||||||
criconfig "github.com/containerd/containerd/v2/pkg/cri/config"
|
criconfig "github.com/containerd/containerd/v2/pkg/cri/config"
|
||||||
"github.com/containerd/containerd/v2/pkg/cri/constants"
|
"github.com/containerd/containerd/v2/pkg/cri/constants"
|
||||||
"github.com/containerd/containerd/v2/pkg/cri/server/base"
|
"github.com/containerd/containerd/v2/pkg/cri/server/base"
|
||||||
@ -31,7 +35,7 @@ import (
|
|||||||
"github.com/containerd/containerd/v2/pkg/kmutex"
|
"github.com/containerd/containerd/v2/pkg/kmutex"
|
||||||
"github.com/containerd/containerd/v2/platforms"
|
"github.com/containerd/containerd/v2/platforms"
|
||||||
"github.com/containerd/containerd/v2/plugins"
|
"github.com/containerd/containerd/v2/plugins"
|
||||||
snapshot "github.com/containerd/containerd/v2/snapshots"
|
"github.com/containerd/containerd/v2/snapshots"
|
||||||
"github.com/containerd/log"
|
"github.com/containerd/log"
|
||||||
"github.com/containerd/plugin"
|
"github.com/containerd/plugin"
|
||||||
"github.com/containerd/plugin/registry"
|
"github.com/containerd/plugin/registry"
|
||||||
@ -48,9 +52,11 @@ func init() {
|
|||||||
Requires: []plugin.Type{
|
Requires: []plugin.Type{
|
||||||
plugins.LeasePlugin,
|
plugins.LeasePlugin,
|
||||||
plugins.EventPlugin,
|
plugins.EventPlugin,
|
||||||
|
plugins.MetadataPlugin,
|
||||||
plugins.SandboxStorePlugin,
|
plugins.SandboxStorePlugin,
|
||||||
plugins.InternalPlugin,
|
plugins.InternalPlugin,
|
||||||
plugins.ServicePlugin,
|
plugins.ServicePlugin,
|
||||||
|
plugins.SnapshotPlugin,
|
||||||
},
|
},
|
||||||
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
|
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
|
||||||
// Get base CRI dependencies.
|
// Get base CRI dependencies.
|
||||||
@ -58,9 +64,30 @@ func init() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("unable to load CRI service base dependencies: %w", err)
|
return nil, fmt.Errorf("unable to load CRI service base dependencies: %w", err)
|
||||||
}
|
}
|
||||||
|
// TODO: Move this to migration directly
|
||||||
c := criPlugin.(*base.CRIBase).Config
|
c := criPlugin.(*base.CRIBase).Config
|
||||||
|
|
||||||
client, err := containerd.New(
|
m, err := ic.GetSingle(plugins.MetadataPlugin)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
mdb := m.(*metadata.DB)
|
||||||
|
|
||||||
|
ep, err := ic.GetSingle(plugins.EventPlugin)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
options := &CRIImageServiceOptions{
|
||||||
|
Content: mdb.ContentStore(),
|
||||||
|
Images: metadata.NewImageStore(mdb),
|
||||||
|
RuntimePlatforms: map[string]ImagePlatform{},
|
||||||
|
Snapshotters: map[string]snapshots.Snapshotter{},
|
||||||
|
ImageFSPaths: map[string]string{},
|
||||||
|
Publisher: ep.(events.Publisher),
|
||||||
|
}
|
||||||
|
|
||||||
|
options.Client, err = containerd.New(
|
||||||
"",
|
"",
|
||||||
containerd.WithDefaultNamespace(constants.K8sContainerdNamespace),
|
containerd.WithDefaultNamespace(constants.K8sContainerdNamespace),
|
||||||
containerd.WithDefaultPlatform(platforms.Default()),
|
containerd.WithDefaultPlatform(platforms.Default()),
|
||||||
@ -70,28 +97,62 @@ func init() {
|
|||||||
return nil, fmt.Errorf("unable to init client for cri image service: %w", err)
|
return nil, fmt.Errorf("unable to init client for cri image service: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
snapshotterOverrides := map[string]RuntimePlatform{}
|
allSnapshotters := mdb.Snapshotters()
|
||||||
imageFSPaths := map[string]string{}
|
defaultSnapshotter := c.ImageConfig.Snapshotter
|
||||||
// TODO: Figure out a way to break this plugin's dependency on a shared runtime config
|
if s, ok := allSnapshotters[defaultSnapshotter]; ok {
|
||||||
for runtimeName, ociRuntime := range c.ContainerdConfig.Runtimes {
|
options.Snapshotters[defaultSnapshotter] = s
|
||||||
|
} else {
|
||||||
|
return nil, fmt.Errorf("failed to find snapshotter %q", defaultSnapshotter)
|
||||||
|
}
|
||||||
|
var snapshotRoot string
|
||||||
|
if plugin := ic.Plugins().Get(plugins.SnapshotPlugin, defaultSnapshotter); plugin != nil {
|
||||||
|
snapshotRoot = plugin.Meta.Exports["root"]
|
||||||
|
}
|
||||||
|
if snapshotRoot == "" {
|
||||||
|
// Try a root in the same parent as this plugin
|
||||||
|
snapshotRoot = filepath.Join(filepath.Dir(ic.Properties[plugins.PropertyRootDir]), plugins.SnapshotPlugin.String()+"."+defaultSnapshotter)
|
||||||
|
}
|
||||||
|
options.ImageFSPaths[defaultSnapshotter] = snapshotRoot
|
||||||
|
log.L.Infof("Get image filesystem path %q for snapshotter %q", snapshotRoot, defaultSnapshotter)
|
||||||
|
|
||||||
|
for runtimeName, rp := range c.ImageConfig.RuntimePlatforms {
|
||||||
// Can not use `c.RuntimeSnapshotter() yet, so hard-coding here.`
|
// Can not use `c.RuntimeSnapshotter() yet, so hard-coding here.`
|
||||||
snapshotter := ociRuntime.Snapshotter
|
snapshotter := rp.Snapshotter
|
||||||
if snapshotter != "" {
|
if snapshotter == "" {
|
||||||
snapshotterOverrides[runtimeName] = RuntimePlatform{
|
snapshotter = defaultSnapshotter
|
||||||
Snapshotter: snapshotter,
|
} else if _, ok := options.ImageFSPaths[snapshotter]; !ok {
|
||||||
// TODO: This must be provided by runtime
|
if s, ok := options.Snapshotters[defaultSnapshotter]; ok {
|
||||||
Platform: platforms.DefaultSpec(),
|
options.Snapshotters[defaultSnapshotter] = s
|
||||||
|
} else {
|
||||||
|
return nil, fmt.Errorf("failed to find snapshotter %q", defaultSnapshotter)
|
||||||
}
|
}
|
||||||
imageFSPaths[snapshotter] = filepath.Join(c.ContainerdRootDir, plugins.SnapshotPlugin.String()+"."+snapshotter)
|
var snapshotRoot string
|
||||||
log.L.Infof("Get image filesystem path %q for snapshotter %q", imageFSPaths[snapshotter], snapshotter)
|
if plugin := ic.Plugins().Get(plugins.SnapshotPlugin, snapshotter); plugin != nil {
|
||||||
|
snapshotRoot = plugin.Meta.Exports["root"]
|
||||||
|
}
|
||||||
|
if snapshotRoot == "" {
|
||||||
|
// Try a root in the same parent as this plugin
|
||||||
|
snapshotRoot = filepath.Join(filepath.Dir(ic.Properties[plugins.PropertyRootDir]), plugins.SnapshotPlugin.String()+"."+snapshotter)
|
||||||
|
}
|
||||||
|
|
||||||
|
options.ImageFSPaths[defaultSnapshotter] = snapshotRoot
|
||||||
|
log.L.Infof("Get image filesystem path %q for snapshotter %q", options.ImageFSPaths[snapshotter], snapshotter)
|
||||||
|
}
|
||||||
|
platform := platforms.DefaultSpec()
|
||||||
|
if rp.Platform != "" {
|
||||||
|
p, err := platforms.Parse(rp.Platform)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("unable to parse platform %q: %w", rp.Platform, err)
|
||||||
|
}
|
||||||
|
platform = p
|
||||||
|
}
|
||||||
|
options.RuntimePlatforms[runtimeName] = ImagePlatform{
|
||||||
|
Snapshotter: snapshotter,
|
||||||
|
Platform: platform,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
snapshotter := c.ImageConfig.Snapshotter
|
|
||||||
imageFSPaths[snapshotter] = filepath.Join(c.ContainerdRootDir, plugins.SnapshotPlugin.String()+"."+snapshotter)
|
|
||||||
log.L.Infof("Get image filesystem path %q for snapshotter %q", imageFSPaths[snapshotter], snapshotter)
|
|
||||||
|
|
||||||
// TODO: Pull out image specific configs here!
|
service, err := NewService(c.ImageConfig, options)
|
||||||
service, err := NewService(c.ImageConfig, imageFSPaths, snapshotterOverrides, client)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create image service: %w", err)
|
return nil, fmt.Errorf("failed to create image service: %w", err)
|
||||||
}
|
}
|
||||||
@ -101,7 +162,13 @@ func init() {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
type RuntimePlatform struct {
|
type imageClient interface {
|
||||||
|
ListImages(context.Context, ...string) ([]containerd.Image, error)
|
||||||
|
GetImage(context.Context, string) (containerd.Image, error)
|
||||||
|
Pull(context.Context, string, ...containerd.RemoteOpt) (containerd.Image, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type ImagePlatform struct {
|
||||||
Snapshotter string
|
Snapshotter string
|
||||||
Platform platforms.Platform
|
Platform platforms.Platform
|
||||||
}
|
}
|
||||||
@ -121,13 +188,18 @@ type CRIImageService struct {
|
|||||||
// - default runtime
|
// - default runtime
|
||||||
// - stats collection interval (only used to startup snapshot sync)
|
// - stats collection interval (only used to startup snapshot sync)
|
||||||
config criconfig.ImageConfig
|
config criconfig.ImageConfig
|
||||||
// client is an instance of the containerd client
|
// images is the lower level image store used for raw storage,
|
||||||
// TODO: Remove this in favor of using plugins directly
|
// no event publishing should currently be assumed
|
||||||
client *containerd.Client
|
images images.Store
|
||||||
|
// publisher is the events publisher
|
||||||
|
publisher events.Publisher
|
||||||
|
// client is a subset of the containerd client
|
||||||
|
// and will be replaced by image store and transfer service
|
||||||
|
client imageClient
|
||||||
// imageFSPaths contains path to image filesystem for snapshotters.
|
// imageFSPaths contains path to image filesystem for snapshotters.
|
||||||
imageFSPaths map[string]string
|
imageFSPaths map[string]string
|
||||||
// runtimePlatforms are the platforms configured for a runtime.
|
// runtimePlatforms are the platforms configured for a runtime.
|
||||||
runtimePlatforms map[string]RuntimePlatform
|
runtimePlatforms map[string]ImagePlatform
|
||||||
// imageStore stores all resources associated with images.
|
// imageStore stores all resources associated with images.
|
||||||
imageStore *imagestore.Store
|
imageStore *imagestore.Store
|
||||||
// snapshotStore stores information of all snapshots.
|
// snapshotStore stores information of all snapshots.
|
||||||
@ -142,6 +214,22 @@ type GRPCCRIImageService struct {
|
|||||||
*CRIImageService
|
*CRIImageService
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type CRIImageServiceOptions struct {
|
||||||
|
Content content.Store
|
||||||
|
|
||||||
|
Images images.Store
|
||||||
|
|
||||||
|
ImageFSPaths map[string]string
|
||||||
|
|
||||||
|
RuntimePlatforms map[string]ImagePlatform
|
||||||
|
|
||||||
|
Snapshotters map[string]snapshots.Snapshotter
|
||||||
|
|
||||||
|
Publisher events.Publisher
|
||||||
|
|
||||||
|
Client imageClient
|
||||||
|
}
|
||||||
|
|
||||||
// NewService creates a new CRI Image Service
|
// NewService creates a new CRI Image Service
|
||||||
//
|
//
|
||||||
// TODO:
|
// TODO:
|
||||||
@ -152,40 +240,22 @@ type GRPCCRIImageService struct {
|
|||||||
// - Image Service (from metadata)
|
// - Image Service (from metadata)
|
||||||
// - Content store (from metadata)
|
// - Content store (from metadata)
|
||||||
// 3. Separate image cache and snapshot cache to first class plugins, make the snapshot cache much more efficient and intelligent
|
// 3. Separate image cache and snapshot cache to first class plugins, make the snapshot cache much more efficient and intelligent
|
||||||
func NewService(config criconfig.ImageConfig, imageFSPaths map[string]string, runtimePlatforms map[string]RuntimePlatform, client *containerd.Client) (*CRIImageService, error) {
|
func NewService(config criconfig.ImageConfig, options *CRIImageServiceOptions) (*CRIImageService, error) {
|
||||||
svc := CRIImageService{
|
svc := CRIImageService{
|
||||||
config: config,
|
config: config,
|
||||||
client: client,
|
images: options.Images,
|
||||||
imageStore: imagestore.NewStore(client.ImageService(), client.ContentStore(), platforms.Default()),
|
client: options.Client,
|
||||||
imageFSPaths: imageFSPaths,
|
imageStore: imagestore.NewStore(options.Images, options.Content, platforms.Default()),
|
||||||
runtimePlatforms: runtimePlatforms,
|
imageFSPaths: options.ImageFSPaths,
|
||||||
|
runtimePlatforms: options.RuntimePlatforms,
|
||||||
snapshotStore: snapshotstore.NewStore(),
|
snapshotStore: snapshotstore.NewStore(),
|
||||||
unpackDuplicationSuppressor: kmutex.New(),
|
unpackDuplicationSuppressor: kmutex.New(),
|
||||||
}
|
}
|
||||||
|
|
||||||
snapshotters := map[string]snapshot.Snapshotter{}
|
|
||||||
|
|
||||||
for _, rp := range runtimePlatforms {
|
|
||||||
if snapshotter := svc.client.SnapshotService(rp.Snapshotter); snapshotter != nil {
|
|
||||||
snapshotters[rp.Snapshotter] = snapshotter
|
|
||||||
} else {
|
|
||||||
return nil, fmt.Errorf("failed to find snapshotter %q", rp.Snapshotter)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add default snapshotter
|
|
||||||
snapshotterName := svc.config.Snapshotter
|
|
||||||
if snapshotter := svc.client.SnapshotService(snapshotterName); snapshotter != nil {
|
|
||||||
snapshotters[snapshotterName] = snapshotter
|
|
||||||
} else {
|
|
||||||
return nil, fmt.Errorf("failed to find snapshotter %q", snapshotterName)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start snapshot stats syncer, it doesn't need to be stopped.
|
|
||||||
log.L.Info("Start snapshots syncer")
|
log.L.Info("Start snapshots syncer")
|
||||||
snapshotsSyncer := newSnapshotsSyncer(
|
snapshotsSyncer := newSnapshotsSyncer(
|
||||||
svc.snapshotStore,
|
svc.snapshotStore,
|
||||||
snapshotters,
|
options.Snapshotters,
|
||||||
time.Duration(svc.config.StatsCollectPeriod)*time.Second,
|
time.Duration(svc.config.StatsCollectPeriod)*time.Second,
|
||||||
)
|
)
|
||||||
snapshotsSyncer.start()
|
snapshotsSyncer.start()
|
||||||
|
@ -41,11 +41,13 @@ const (
|
|||||||
// newTestCRIService creates a fake criService for test.
|
// newTestCRIService creates a fake criService for test.
|
||||||
func newTestCRIService() (*CRIImageService, *GRPCCRIImageService) {
|
func newTestCRIService() (*CRIImageService, *GRPCCRIImageService) {
|
||||||
service := &CRIImageService{
|
service := &CRIImageService{
|
||||||
config: testConfig.ImageConfig,
|
config: testConfig.ImageConfig,
|
||||||
imageFSPaths: map[string]string{"overlayfs": testImageFSPath},
|
runtimePlatforms: map[string]ImagePlatform{},
|
||||||
imageStore: imagestore.NewStore(nil, nil, platforms.Default()),
|
imageFSPaths: map[string]string{"overlayfs": testImageFSPath},
|
||||||
snapshotStore: snapshotstore.NewStore(),
|
imageStore: imagestore.NewStore(nil, nil, platforms.Default()),
|
||||||
|
snapshotStore: snapshotstore.NewStore(),
|
||||||
}
|
}
|
||||||
|
|
||||||
return service, &GRPCCRIImageService{service}
|
return service, &GRPCCRIImageService{service}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -53,6 +55,9 @@ var testConfig = criconfig.Config{
|
|||||||
RootDir: testRootDir,
|
RootDir: testRootDir,
|
||||||
StateDir: testStateDir,
|
StateDir: testStateDir,
|
||||||
PluginConfig: criconfig.PluginConfig{
|
PluginConfig: criconfig.PluginConfig{
|
||||||
|
ImageConfig: criconfig.ImageConfig{
|
||||||
|
PinnedImages: []string{testSandboxImage},
|
||||||
|
},
|
||||||
SandboxImage: testSandboxImage,
|
SandboxImage: testSandboxImage,
|
||||||
TolerateMissingHugetlbController: true,
|
TolerateMissingHugetlbController: true,
|
||||||
},
|
},
|
||||||
|
@ -55,8 +55,8 @@ type Image struct {
|
|||||||
Pinned bool
|
Pinned bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// ImageGetter is used to get images but does not make changes
|
// Getter is used to get images but does not make changes
|
||||||
type ImageGetter interface {
|
type Getter interface {
|
||||||
Get(ctx context.Context, name string) (images.Image, error)
|
Get(ctx context.Context, name string) (images.Image, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -67,7 +67,7 @@ type Store struct {
|
|||||||
refCache map[string]string
|
refCache map[string]string
|
||||||
|
|
||||||
// images is the local image store
|
// images is the local image store
|
||||||
images ImageGetter
|
images Getter
|
||||||
|
|
||||||
// content provider
|
// content provider
|
||||||
provider content.InfoReaderProvider
|
provider content.InfoReaderProvider
|
||||||
@ -81,7 +81,7 @@ type Store struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewStore creates an image store.
|
// NewStore creates an image store.
|
||||||
func NewStore(img ImageGetter, provider content.InfoReaderProvider, platform platforms.MatchComparer) *Store {
|
func NewStore(img Getter, provider content.InfoReaderProvider, platform platforms.MatchComparer) *Store {
|
||||||
return &Store{
|
return &Store{
|
||||||
refCache: make(map[string]string),
|
refCache: make(map[string]string),
|
||||||
images: img,
|
images: img,
|
||||||
|
Loading…
Reference in New Issue
Block a user