Extract image service from CRI

Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
This commit is contained in:
Maksym Pavlenko 2023-03-28 18:33:37 -07:00
parent 662ff50b73
commit 3557ac884b
33 changed files with 675 additions and 413 deletions

View File

@ -107,7 +107,7 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
// Prepare container image snapshot. For container, the image should have
// been pulled before creating the container, so do not ensure the image.
image, err := c.localResolve(config.GetImage().GetImage())
image, err := c.LocalResolve(config.GetImage().GetImage())
if err != nil {
return nil, fmt.Errorf("failed to resolve image %q: %w", config.GetImage().GetImage(), err)
}
@ -213,7 +213,7 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
// Set snapshotter before any other options.
opts := []containerd.NewContainerOpts{
containerd.WithSnapshotter(c.runtimeSnapshotter(ctx, ociRuntime)),
containerd.WithSnapshotter(c.RuntimeSnapshotter(ctx, ociRuntime)),
// Prepare container rootfs. This is always writeable even if
// the container wants a readonly rootfs since we want to give
// the runtime (runc) a chance to modify (e.g. to create mount
@ -400,17 +400,6 @@ func (c *criService) runtimeSpec(id string, platform platforms.Platform, baseSpe
return spec, nil
}
// Overrides the default snapshotter if Snapshotter is set for this runtime.
// See https://github.com/containerd/containerd/issues/6657
func (c *criService) runtimeSnapshotter(ctx context.Context, ociRuntime criconfig.Runtime) string {
if ociRuntime.Snapshotter == "" {
return c.config.ContainerdConfig.Snapshotter
}
log.G(ctx).Debugf("Set snapshotter for runtime %s to %s", ociRuntime.Type, ociRuntime.Snapshotter)
return ociRuntime.Snapshotter
}
const (
// relativeRootfsPath is the rootfs path relative to bundle path.
relativeRootfsPath = "rootfs"

View File

@ -435,35 +435,3 @@ func TestBaseRuntimeSpec(t *testing.T) {
assert.Equal(t, filepath.Join("/", constants.K8sContainerdNamespace, "id1"), out.Linux.CgroupsPath)
}
func TestRuntimeSnapshotter(t *testing.T) {
defaultRuntime := config.Runtime{
Snapshotter: "",
}
fooRuntime := config.Runtime{
Snapshotter: "devmapper",
}
for desc, test := range map[string]struct {
runtime config.Runtime
expectSnapshotter string
}{
"should return default snapshotter when runtime.Snapshotter is not set": {
runtime: defaultRuntime,
expectSnapshotter: config.DefaultConfig().Snapshotter,
},
"should return overridden snapshotter when runtime.Snapshotter is set": {
runtime: fooRuntime,
expectSnapshotter: "devmapper",
},
} {
t.Run(desc, func(t *testing.T) {
cri := newTestCRIService()
cri.config = config.Config{
PluginConfig: config.DefaultConfig(),
}
assert.Equal(t, test.expectSnapshotter, cri.runtimeSnapshotter(context.Background(), test.runtime))
})
}
}

View File

@ -37,7 +37,7 @@ func (c *criService) containerMetrics(
) (*runtime.ContainerStats, error) {
var cs runtime.ContainerStats
var usedBytes, inodesUsed uint64
sn, err := c.snapshotStore.Get(meta.ID)
sn, err := c.GetSnapshot(meta.ID)
// If snapshotstore doesn't have cached snapshot information
// set WritableLayer usage to zero
if err == nil {

View File

@ -34,7 +34,7 @@ func (c *criService) containerMetrics(
) (*runtime.ContainerStats, error) {
var cs runtime.ContainerStats
var usedBytes, inodesUsed uint64
sn, err := c.snapshotStore.Get(meta.ID)
sn, err := c.GetSnapshot(meta.ID)
// If snapshotstore doesn't have cached snapshot information
// set WritableLayer usage to zero
if err == nil {

View File

@ -22,6 +22,7 @@ import (
"fmt"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/pkg/cri/sbserver/images"
containerstore "github.com/containerd/containerd/pkg/cri/store/container"
runtimespec "github.com/opencontainers/runtime-spec/specs-go"
@ -42,13 +43,13 @@ func (c *criService) ContainerStatus(ctx context.Context, r *runtime.ContainerSt
// * ImageRef in container status is repo digest.
spec := container.Config.GetImage()
imageRef := container.ImageRef
image, err := c.imageStore.Get(imageRef)
image, err := c.GetImage(imageRef)
if err != nil {
if !errdefs.IsNotFound(err) {
return nil, fmt.Errorf("failed to get image %q: %w", imageRef, err)
}
} else {
repoTags, repoDigests := parseImageReferences(image.References)
repoTags, repoDigests := images.ParseImageReferences(image.References)
if len(repoTags) > 0 {
// Based on current behavior of dockershim, this field should be
// image tag.

View File

@ -18,9 +18,12 @@ package sbserver
import (
"context"
"errors"
"testing"
"time"
criconfig "github.com/containerd/containerd/pkg/cri/config"
snapshotstore "github.com/containerd/containerd/pkg/cri/store/snapshot"
"github.com/stretchr/testify/assert"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
@ -228,8 +231,9 @@ func TestContainerStatus(t *testing.T) {
assert.NoError(t, c.containerStore.Add(container))
}
if test.imageExist {
c.imageStore, err = imagestore.NewFakeStore([]imagestore.Image{*image})
imageStore, err := imagestore.NewFakeStore([]imagestore.Image{*image})
assert.NoError(t, err)
c.imageService = &fakeImageService{imageStore: imageStore}
}
resp, err := c.ContainerStatus(context.Background(), &runtime.ContainerStatusRequest{ContainerId: container.ID})
if test.expectErr {
@ -247,6 +251,27 @@ func TestContainerStatus(t *testing.T) {
}
}
type fakeImageService struct {
runtime.ImageServiceServer
imageStore *imagestore.Store
}
func (s *fakeImageService) RuntimeSnapshotter(ctx context.Context, ociRuntime criconfig.Runtime) string {
return ""
}
func (s *fakeImageService) UpdateImage(ctx context.Context, r string) error { return nil }
func (s *fakeImageService) GetImage(id string) (imagestore.Image, error) { return s.imageStore.Get(id) }
func (s *fakeImageService) GetSnapshot(key string) (snapshotstore.Snapshot, error) {
return snapshotstore.Snapshot{}, errors.New("not implemented")
}
func (s *fakeImageService) LocalResolve(refOrID string) (imagestore.Image, error) {
return imagestore.Image{}, errors.New("not implemented")
}
func patchExceptedWithState(expected *runtime.ContainerStatus, state runtime.ContainerState) {
expected.State = state
switch state {

View File

@ -133,7 +133,7 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore
// default SIGTERM is still better than returning error and leaving
// the container unstoppable. (See issue #990)
// TODO(random-liu): Remove this logic when containerd 1.2 is deprecated.
image, err := c.imageStore.Get(container.ImageRef)
image, err := c.GetImage(container.ImageRef)
if err != nil {
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to get image %q: %w", container.ImageRef, err)

View File

@ -350,13 +350,13 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
}
case *eventtypes.ImageCreate:
logrus.Infof("ImageCreate event %+v", e)
return em.c.updateImage(ctx, e.Name)
return em.c.UpdateImage(ctx, e.Name)
case *eventtypes.ImageUpdate:
logrus.Infof("ImageUpdate event %+v", e)
return em.c.updateImage(ctx, e.Name)
return em.c.UpdateImage(ctx, e.Name)
case *eventtypes.ImageDelete:
logrus.Infof("ImageDelete event %+v", e)
return em.c.updateImage(ctx, e.Name)
return em.c.UpdateImage(ctx, e.Name)
}
return nil

View File

@ -41,11 +41,9 @@ import (
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
runtimeoptions "github.com/containerd/containerd/pkg/runtimeoptions/v1"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/reference/docker"
runcoptions "github.com/containerd/containerd/runtime/v2/runc/options"
runhcsoptions "github.com/Microsoft/hcsshim/cmd/containerd-shim-runhcs-v1/options"
imagedigest "github.com/opencontainers/go-digest"
"github.com/pelletier/go-toml"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)
@ -83,10 +81,7 @@ const (
containerKindSandbox = "sandbox"
// containerKindContainer is a label value indicating container is application container
containerKindContainer = "container"
// imageLabelKey is the label key indicating the image is managed by cri plugin.
imageLabelKey = criContainerdPrefix + ".image"
// imageLabelValue is the label value indicating the image is managed by cri plugin.
imageLabelValue = "managed"
// sandboxMetadataExtension is an extension name that identify metadata of sandbox in CreateContainerRequest
sandboxMetadataExtension = criContainerdPrefix + ".sandbox.metadata"
// containerMetadataExtension is an extension name that identify metadata of container in CreateContainerRequest
@ -140,51 +135,6 @@ func criContainerStateToString(state runtime.ContainerState) string {
return runtime.ContainerState_name[int32(state)]
}
// getRepoDigestAngTag returns image repoDigest and repoTag of the named image reference.
func getRepoDigestAndTag(namedRef docker.Named, digest imagedigest.Digest, schema1 bool) (string, string) {
var repoTag, repoDigest string
if _, ok := namedRef.(docker.NamedTagged); ok {
repoTag = namedRef.String()
}
if _, ok := namedRef.(docker.Canonical); ok {
repoDigest = namedRef.String()
} else if !schema1 {
// digest is not actual repo digest for schema1 image.
repoDigest = namedRef.Name() + "@" + digest.String()
}
return repoDigest, repoTag
}
// localResolve resolves image reference locally and returns corresponding image metadata. It
// returns errdefs.ErrNotFound if the reference doesn't exist.
func (c *criService) localResolve(refOrID string) (imagestore.Image, error) {
getImageID := func(refOrId string) string {
if _, err := imagedigest.Parse(refOrID); err == nil {
return refOrID
}
return func(ref string) string {
// ref is not image id, try to resolve it locally.
// TODO(random-liu): Handle this error better for debugging.
normalized, err := docker.ParseDockerRef(ref)
if err != nil {
return ""
}
id, err := c.imageStore.Resolve(normalized.String())
if err != nil {
return ""
}
return id
}(refOrID)
}
imageID := getImageID(refOrID)
if imageID == "" {
// Try to treat ref as imageID
imageID = refOrID
}
return c.imageStore.Get(imageID)
}
// toContainerdImage converts an image object in image store to containerd image handler.
func (c *criService) toContainerdImage(ctx context.Context, image imagestore.Image) (containerd.Image, error) {
// image should always have at least one reference.
@ -213,30 +163,6 @@ func getUserFromImage(user string) (*int64, string) {
return &uid, ""
}
// EnsureImageExists returns corresponding metadata of the image reference, if image is not
// pulled yet, the function will pull the image.
func (c *criService) EnsureImageExists(ctx context.Context, ref string, config *runtime.PodSandboxConfig) (*imagestore.Image, error) {
image, err := c.localResolve(ref)
if err != nil && !errdefs.IsNotFound(err) {
return nil, fmt.Errorf("failed to get image %q: %w", ref, err)
}
if err == nil {
return &image, nil
}
// Pull image to ensure the image exists
resp, err := c.PullImage(ctx, &runtime.PullImageRequest{Image: &runtime.ImageSpec{Image: ref}, SandboxConfig: config})
if err != nil {
return nil, fmt.Errorf("failed to pull image %q: %w", ref, err)
}
imageID := resp.GetImageRef()
newImage, err := c.imageStore.Get(imageID)
if err != nil {
// It's still possible that someone removed the image right after it is pulled.
return nil, fmt.Errorf("failed to get image %q after pulling: %w", imageID, err)
}
return &newImage, nil
}
// validateTargetContainer checks that a container is a valid
// target for a container using PID NamespaceMode_TARGET.
// The target container must be in the same sandbox and must be running.
@ -298,34 +224,6 @@ func buildLabels(configLabels, imageConfigLabels map[string]string, containerTyp
return labels
}
// toRuntimeAuthConfig converts cri plugin auth config to runtime auth config.
func toRuntimeAuthConfig(a criconfig.AuthConfig) *runtime.AuthConfig {
return &runtime.AuthConfig{
Username: a.Username,
Password: a.Password,
Auth: a.Auth,
IdentityToken: a.IdentityToken,
}
}
// parseImageReferences parses a list of arbitrary image references and returns
// the repotags and repodigests
func parseImageReferences(refs []string) ([]string, []string) {
var tags, digests []string
for _, ref := range refs {
parsed, err := docker.ParseAnyReference(ref)
if err != nil {
continue
}
if _, ok := parsed.(docker.Canonical); ok {
digests = append(digests, parsed.String())
} else if _, ok := parsed.(docker.Tagged); ok {
tags = append(tags, parsed.String())
}
}
return tags, digests
}
// generateRuntimeOptions generates runtime options from cri plugin config.
func generateRuntimeOptions(r criconfig.Runtime, c criconfig.Config) (interface{}, error) {
if r.Options == nil {

View File

@ -27,18 +27,14 @@ import (
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/oci"
criconfig "github.com/containerd/containerd/pkg/cri/config"
containerstore "github.com/containerd/containerd/pkg/cri/store/container"
imagestore "github.com/containerd/containerd/pkg/cri/store/image"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/protobuf/types"
"github.com/containerd/containerd/reference/docker"
runcoptions "github.com/containerd/containerd/runtime/v2/runc/options"
"github.com/containerd/typeurl/v2"
imagedigest "github.com/opencontainers/go-digest"
runtimespec "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pelletier/go-toml"
"github.com/stretchr/testify/assert"
@ -85,46 +81,6 @@ func TestGetUserFromImage(t *testing.T) {
}
}
func TestGetRepoDigestAndTag(t *testing.T) {
digest := imagedigest.Digest("sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582")
for desc, test := range map[string]struct {
ref string
schema1 bool
expectedRepoDigest string
expectedRepoTag string
}{
"repo tag should be empty if original ref has no tag": {
ref: "gcr.io/library/busybox@" + digest.String(),
expectedRepoDigest: "gcr.io/library/busybox@" + digest.String(),
},
"repo tag should not be empty if original ref has tag": {
ref: "gcr.io/library/busybox:latest",
expectedRepoDigest: "gcr.io/library/busybox@" + digest.String(),
expectedRepoTag: "gcr.io/library/busybox:latest",
},
"repo digest should be empty if original ref is schema1 and has no digest": {
ref: "gcr.io/library/busybox:latest",
schema1: true,
expectedRepoDigest: "",
expectedRepoTag: "gcr.io/library/busybox:latest",
},
"repo digest should not be empty if original ref is schema1 but has digest": {
ref: "gcr.io/library/busybox@sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59594",
schema1: true,
expectedRepoDigest: "gcr.io/library/busybox@sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59594",
expectedRepoTag: "",
},
} {
t.Run(desc, func(t *testing.T) {
named, err := docker.ParseDockerRef(test.ref)
assert.NoError(t, err)
repoDigest, repoTag := getRepoDigestAndTag(named, digest, test.schema1)
assert.Equal(t, test.expectedRepoDigest, repoDigest)
assert.Equal(t, test.expectedRepoTag, repoTag)
})
}
}
func TestBuildLabels(t *testing.T) {
imageConfigLabels := map[string]string{
"a": "z",
@ -148,61 +104,6 @@ func TestBuildLabels(t *testing.T) {
assert.Equal(t, "b", configLabels["a"], "change in new labels should not affect original label")
}
func TestParseImageReferences(t *testing.T) {
refs := []string{
"gcr.io/library/busybox@sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582",
"gcr.io/library/busybox:1.2",
"sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582",
"arbitrary-ref",
}
expectedTags := []string{
"gcr.io/library/busybox:1.2",
}
expectedDigests := []string{"gcr.io/library/busybox@sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582"}
tags, digests := parseImageReferences(refs)
assert.Equal(t, expectedTags, tags)
assert.Equal(t, expectedDigests, digests)
}
func TestLocalResolve(t *testing.T) {
image := imagestore.Image{
ID: "sha256:c75bebcdd211f41b3a460c7bf82970ed6c75acaab9cd4c9a4e125b03ca113799",
ChainID: "test-chain-id-1",
References: []string{
"docker.io/library/busybox:latest",
"docker.io/library/busybox@sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582",
},
Size: 10,
}
c := newTestCRIService()
var err error
c.imageStore, err = imagestore.NewFakeStore([]imagestore.Image{image})
assert.NoError(t, err)
for _, ref := range []string{
"sha256:c75bebcdd211f41b3a460c7bf82970ed6c75acaab9cd4c9a4e125b03ca113799",
"busybox",
"busybox:latest",
"busybox@sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582",
"library/busybox",
"library/busybox:latest",
"library/busybox@sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582",
"docker.io/busybox",
"docker.io/busybox:latest",
"docker.io/busybox@sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582",
"docker.io/library/busybox",
"docker.io/library/busybox:latest",
"docker.io/library/busybox@sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582",
} {
img, err := c.localResolve(ref)
assert.NoError(t, err)
assert.Equal(t, image, img)
}
img, err := c.localResolve("randomid")
assert.Equal(t, errdefs.IsNotFound(err), true)
assert.Equal(t, imagestore.Image{}, img)
}
func TestGenerateRuntimeOptions(t *testing.T) {
nilOpts := `
systemd_cgroup = true

View File

@ -14,7 +14,7 @@
limitations under the License.
*/
package sbserver
package images
import (
"context"
@ -25,7 +25,7 @@ import (
// ListImages lists existing images.
// TODO(random-liu): Add image list filters after CRI defines this more clear, and kubelet
// actually needs it.
func (c *criService) ListImages(ctx context.Context, r *runtime.ListImagesRequest) (*runtime.ListImagesResponse, error) {
func (c *CRIImageService) ListImages(ctx context.Context, r *runtime.ListImagesRequest) (*runtime.ListImagesResponse, error) {
imagesInStore := c.imageStore.List()
var images []*runtime.Image

View File

@ -14,7 +14,7 @@
limitations under the License.
*/
package sbserver
package images
import (
"context"

View File

@ -14,13 +14,14 @@
limitations under the License.
*/
package sbserver
package images
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/base64"
"errors"
"fmt"
"io"
"net"
@ -36,6 +37,7 @@ import (
"github.com/containerd/imgcrypt"
"github.com/containerd/imgcrypt/images/encryption"
imagedigest "github.com/opencontainers/go-digest"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
@ -94,7 +96,7 @@ import (
// contents are missing but snapshots are ready, is the image still "READY"?
// PullImage pulls an image with authentication config.
func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest) (_ *runtime.PullImageResponse, err error) {
func (c *CRIImageService) PullImage(ctx context.Context, r *runtime.PullImageRequest) (_ *runtime.PullImageResponse, err error) {
span := tracing.SpanFromContext(ctx)
defer func() {
// TODO: add domain label for imagePulls metrics, and we may need to provide a mechanism
@ -224,6 +226,21 @@ func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest)
return &runtime.PullImageResponse{ImageRef: imageID}, nil
}
// getRepoDigestAngTag returns image repoDigest and repoTag of the named image reference.
func getRepoDigestAndTag(namedRef distribution.Named, digest imagedigest.Digest, schema1 bool) (string, string) {
var repoTag, repoDigest string
if _, ok := namedRef.(distribution.NamedTagged); ok {
repoTag = namedRef.String()
}
if _, ok := namedRef.(distribution.Canonical); ok {
repoDigest = namedRef.String()
} else if !schema1 {
// digest is not actual repo digest for schema1 image.
repoDigest = namedRef.Name() + "@" + digest.String()
}
return repoDigest, repoTag
}
// ParseAuth parses AuthConfig and returns username and password/secret required by containerd.
func ParseAuth(auth *runtime.AuthConfig, host string) (string, string, error) {
if auth == nil {
@ -267,7 +284,7 @@ func ParseAuth(auth *runtime.AuthConfig, host string) (string, string, error) {
// Note that because create and update are not finished in one transaction, there could be race. E.g.
// the image reference is deleted by someone else after create returns already exists, but before update
// happens.
func (c *criService) createImageReference(ctx context.Context, name string, desc imagespec.Descriptor) error {
func (c *CRIImageService) createImageReference(ctx context.Context, name string, desc imagespec.Descriptor) error {
img := containerdimages.Image{
Name: name,
Target: desc,
@ -287,10 +304,10 @@ func (c *criService) createImageReference(ctx context.Context, name string, desc
return err
}
// updateImage updates image store to reflect the newest state of an image reference
// UpdateImage updates image store to reflect the newest state of an image reference
// in containerd. If the reference is not managed by the cri plugin, the function also
// generates necessary metadata for the image and make it managed.
func (c *criService) updateImage(ctx context.Context, r string) error {
func (c *CRIImageService) UpdateImage(ctx context.Context, r string) error {
img, err := c.client.GetImage(ctx, r)
if err != nil && !errdefs.IsNotFound(err) {
return fmt.Errorf("get image by reference: %w", err)
@ -323,7 +340,7 @@ func (c *criService) updateImage(ctx context.Context, r string) error {
}
// getTLSConfig returns a TLSConfig configured with a CA/Cert/Key specified by registryTLSConfig
func (c *criService) getTLSConfig(registryTLSConfig criconfig.TLSConfig) (*tls.Config, error) {
func (c *CRIImageService) getTLSConfig(registryTLSConfig criconfig.TLSConfig) (*tls.Config, error) {
var (
tlsConfig = &tls.Config{}
cert tls.Certificate
@ -380,7 +397,7 @@ func hostDirFromRoots(roots []string) func(string) (string, error) {
}
// registryHosts is the registry hosts to be used by the resolver.
func (c *criService) registryHosts(ctx context.Context, auth *runtime.AuthConfig, updateClientFn config.UpdateClientFunc) docker.RegistryHosts {
func (c *CRIImageService) registryHosts(ctx context.Context, auth *runtime.AuthConfig, updateClientFn config.UpdateClientFunc) docker.RegistryHosts {
paths := filepath.SplitList(c.config.Registry.ConfigPath)
if len(paths) > 0 {
hostOptions := config.HostOptions{
@ -468,6 +485,16 @@ func (c *criService) registryHosts(ctx context.Context, auth *runtime.AuthConfig
}
}
// toRuntimeAuthConfig converts cri plugin auth config to runtime auth config.
func toRuntimeAuthConfig(a criconfig.AuthConfig) *runtime.AuthConfig {
return &runtime.AuthConfig{
Username: a.Username,
Password: a.Password,
Auth: a.Auth,
IdentityToken: a.IdentityToken,
}
}
// defaultScheme returns the default scheme for a registry host.
func defaultScheme(host string) string {
if docker.IsLocalhost(host) {
@ -492,7 +519,7 @@ func addDefaultScheme(endpoint string) (string, error) {
// registryEndpoints returns endpoints for a given host.
// It adds default registry endpoint if it does not exist in the passed-in endpoint list.
// It also supports wildcard host matching with `*`.
func (c *criService) registryEndpoints(host string) ([]string, error) {
func (c *CRIImageService) registryEndpoints(host string) ([]string, error) {
var endpoints []string
_, ok := c.config.Registry.Mirrors[host]
if ok {
@ -543,7 +570,7 @@ func newTransport() *http.Transport {
// encryptedImagesPullOpts returns the necessary list of pull options required
// for decryption of encrypted images based on the cri decryption configuration.
func (c *criService) encryptedImagesPullOpts() []containerd.RemoteOpt {
func (c *CRIImageService) encryptedImagesPullOpts() []containerd.RemoteOpt {
if c.config.ImageDecryption.KeyModel == criconfig.KeyModelNode {
ltdd := imgcrypt.Payload{}
decUnpackOpt := encryption.WithUnpackConfigApplyOpts(encryption.WithDecryptedUnpack(&ltdd))
@ -740,7 +767,7 @@ func (rt *pullRequestReporterRoundTripper) RoundTrip(req *http.Request) (*http.R
// passed from pod sandbox config to get the runtimeHandler. The annotation key is specified in configuration.
// Once we know the runtime, try to override default snapshotter if it is set for this runtime.
// See https://github.com/containerd/containerd/issues/6657
func (c *criService) snapshotterFromPodSandboxConfig(ctx context.Context, imageRef string,
func (c *CRIImageService) snapshotterFromPodSandboxConfig(ctx context.Context, imageRef string,
s *runtime.PodSandboxConfig) (string, error) {
snapshotter := c.config.ContainerdConfig.Snapshotter
if s == nil || s.Annotations == nil {
@ -752,12 +779,65 @@ func (c *criService) snapshotterFromPodSandboxConfig(ctx context.Context, imageR
return snapshotter, nil
}
// TODO: Find other way to retrieve sandbox runtime, this must belong to the Runtime part of the CRI.
ociRuntime, err := c.getSandboxRuntime(s, runtimeHandler)
if err != nil {
return "", fmt.Errorf("experimental: failed to get sandbox runtime for %s, err: %+v", runtimeHandler, err)
}
snapshotter = c.runtimeSnapshotter(context.Background(), ociRuntime)
snapshotter = c.RuntimeSnapshotter(context.Background(), ociRuntime)
log.G(ctx).Infof("experimental: PullImage %q for runtime %s, using snapshotter %s", imageRef, runtimeHandler, snapshotter)
return snapshotter, nil
}
// TODO: copy-pasted from the runtime service implementation. This should not be in image service.
func (c *CRIImageService) getSandboxRuntime(config *runtime.PodSandboxConfig, runtimeHandler string) (criconfig.Runtime, error) {
if untrustedWorkload(config) {
// If the untrusted annotation is provided, runtimeHandler MUST be empty.
if runtimeHandler != "" && runtimeHandler != criconfig.RuntimeUntrusted {
return criconfig.Runtime{}, errors.New("untrusted workload with explicit runtime handler is not allowed")
}
// If the untrusted workload is requesting access to the host/node, this request will fail.
//
// Note: If the workload is marked untrusted but requests privileged, this can be granted, as the
// runtime may support this. For example, in a virtual-machine isolated runtime, privileged
// is a supported option, granting the workload to access the entire guest VM instead of host.
// TODO(windows): Deprecate this so that we don't need to handle it for windows.
if hostAccessingSandbox(config) {
return criconfig.Runtime{}, errors.New("untrusted workload with host access is not allowed")
}
runtimeHandler = criconfig.RuntimeUntrusted
}
if runtimeHandler == "" {
runtimeHandler = c.config.ContainerdConfig.DefaultRuntimeName
}
handler, ok := c.config.ContainerdConfig.Runtimes[runtimeHandler]
if !ok {
return criconfig.Runtime{}, fmt.Errorf("no runtime for %q is configured", runtimeHandler)
}
return handler, nil
}
// untrustedWorkload returns true if the sandbox contains untrusted workload.
func untrustedWorkload(config *runtime.PodSandboxConfig) bool {
return config.GetAnnotations()[annotations.UntrustedWorkload] == "true"
}
// hostAccessingSandbox returns true if the sandbox configuration
// requires additional host access for the sandbox.
func hostAccessingSandbox(config *runtime.PodSandboxConfig) bool {
securityContext := config.GetLinux().GetSecurityContext()
namespaceOptions := securityContext.GetNamespaceOptions()
if namespaceOptions.GetNetwork() == runtime.NamespaceMode_NODE ||
namespaceOptions.GetPid() == runtime.NamespaceMode_NODE ||
namespaceOptions.GetIpc() == runtime.NamespaceMode_NODE {
return true
}
return false
}

View File

@ -14,7 +14,7 @@
limitations under the License.
*/
package sbserver
package images
import (
"context"
@ -22,6 +22,8 @@ import (
"fmt"
"testing"
"github.com/containerd/containerd/reference/docker"
"github.com/opencontainers/go-digest"
"github.com/stretchr/testify/assert"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
@ -395,3 +397,43 @@ func TestSnapshotterFromPodSandboxConfig(t *testing.T) {
})
}
}
func TestGetRepoDigestAndTag(t *testing.T) {
digest := digest.Digest("sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582")
for desc, test := range map[string]struct {
ref string
schema1 bool
expectedRepoDigest string
expectedRepoTag string
}{
"repo tag should be empty if original ref has no tag": {
ref: "gcr.io/library/busybox@" + digest.String(),
expectedRepoDigest: "gcr.io/library/busybox@" + digest.String(),
},
"repo tag should not be empty if original ref has tag": {
ref: "gcr.io/library/busybox:latest",
expectedRepoDigest: "gcr.io/library/busybox@" + digest.String(),
expectedRepoTag: "gcr.io/library/busybox:latest",
},
"repo digest should be empty if original ref is schema1 and has no digest": {
ref: "gcr.io/library/busybox:latest",
schema1: true,
expectedRepoDigest: "",
expectedRepoTag: "gcr.io/library/busybox:latest",
},
"repo digest should not be empty if original ref is schema1 but has digest": {
ref: "gcr.io/library/busybox@sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59594",
schema1: true,
expectedRepoDigest: "gcr.io/library/busybox@sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59594",
expectedRepoTag: "",
},
} {
t.Run(desc, func(t *testing.T) {
named, err := docker.ParseDockerRef(test.ref)
assert.NoError(t, err)
repoDigest, repoTag := getRepoDigestAndTag(named, digest, test.schema1)
assert.Equal(t, test.expectedRepoDigest, repoDigest)
assert.Equal(t, test.expectedRepoTag, repoTag)
})
}
}

View File

@ -14,7 +14,7 @@
limitations under the License.
*/
package sbserver
package images
import (
"context"
@ -33,9 +33,9 @@ import (
// TODO(random-liu): We should change CRI to distinguish image id and image spec.
// Remove the whole image no matter the it's image id or reference. This is the
// semantic defined in CRI now.
func (c *criService) RemoveImage(ctx context.Context, r *runtime.RemoveImageRequest) (*runtime.RemoveImageResponse, error) {
func (c *CRIImageService) RemoveImage(ctx context.Context, r *runtime.RemoveImageRequest) (*runtime.RemoveImageResponse, error) {
span := tracing.SpanFromContext(ctx)
image, err := c.localResolve(r.GetImage().GetImage())
image, err := c.LocalResolve(r.GetImage().GetImage())
if err != nil {
if errdefs.IsNotFound(err) {
span.AddEvent(err.Error())

View File

@ -14,16 +14,19 @@
limitations under the License.
*/
package sbserver
package images
import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
imagestore "github.com/containerd/containerd/pkg/cri/store/image"
"github.com/containerd/containerd/reference/docker"
"github.com/containerd/containerd/tracing"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
@ -33,9 +36,9 @@ import (
// ImageStatus returns the status of the image, returns nil if the image isn't present.
// TODO(random-liu): We should change CRI to distinguish image id and image spec. (See
// kubernetes/kubernetes#46255)
func (c *criService) ImageStatus(ctx context.Context, r *runtime.ImageStatusRequest) (*runtime.ImageStatusResponse, error) {
func (c *CRIImageService) ImageStatus(ctx context.Context, r *runtime.ImageStatusRequest) (*runtime.ImageStatusResponse, error) {
span := tracing.SpanFromContext(ctx)
image, err := c.localResolve(r.GetImage().GetImage())
image, err := c.LocalResolve(r.GetImage().GetImage())
if err != nil {
if errdefs.IsNotFound(err) {
span.AddEvent(err.Error())
@ -62,7 +65,7 @@ func (c *criService) ImageStatus(ctx context.Context, r *runtime.ImageStatusRequ
// toCRIImage converts internal image object to CRI runtime.Image.
func toCRIImage(image imagestore.Image) *runtime.Image {
repoTags, repoDigests := parseImageReferences(image.References)
repoTags, repoDigests := ParseImageReferences(image.References)
runtimeImage := &runtime.Image{
Id: image.ID,
RepoTags: repoTags,
@ -78,6 +81,43 @@ func toCRIImage(image imagestore.Image) *runtime.Image {
return runtimeImage
}
// getUserFromImage gets uid or user name of the image user.
// If user is numeric, it will be treated as uid; or else, it is treated as user name.
func getUserFromImage(user string) (*int64, string) {
// return both empty if user is not specified in the image.
if user == "" {
return nil, ""
}
// split instances where the id may contain user:group
user = strings.Split(user, ":")[0]
// user could be either uid or user name. Try to interpret as numeric uid.
uid, err := strconv.ParseInt(user, 10, 64)
if err != nil {
// If user is non numeric, assume it's user name.
return nil, user
}
// If user is a numeric uid.
return &uid, ""
}
// ParseImageReferences parses a list of arbitrary image references and returns
// the repotags and repodigests
func ParseImageReferences(refs []string) ([]string, []string) {
var tags, digests []string
for _, ref := range refs {
parsed, err := docker.ParseAnyReference(ref)
if err != nil {
continue
}
if _, ok := parsed.(docker.Canonical); ok {
digests = append(digests, parsed.String())
} else if _, ok := parsed.(docker.Tagged); ok {
tags = append(tags, parsed.String())
}
}
return tags, digests
}
// TODO (mikebrow): discuss moving this struct and / or constants for info map for some or all of these fields to CRI
type verboseImageInfo struct {
ChainID string `json:"chainID"`
@ -85,7 +125,7 @@ type verboseImageInfo struct {
}
// toCRIImageInfo converts internal image object information to CRI image status response info map.
func (c *criService) toCRIImageInfo(ctx context.Context, image *imagestore.Image, verbose bool) (map[string]string, error) {
func (c *CRIImageService) toCRIImageInfo(ctx context.Context, image *imagestore.Image, verbose bool) (map[string]string, error) {
if !verbose {
return nil, nil
}

View File

@ -14,7 +14,7 @@
limitations under the License.
*/
package sbserver
package images
import (
"context"
@ -72,3 +72,59 @@ func TestImageStatus(t *testing.T) {
assert.NotNil(t, resp)
assert.Equal(t, expected, resp.GetImage())
}
func TestParseImageReferences(t *testing.T) {
refs := []string{
"gcr.io/library/busybox@sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582",
"gcr.io/library/busybox:1.2",
"sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582",
"arbitrary-ref",
}
expectedTags := []string{
"gcr.io/library/busybox:1.2",
}
expectedDigests := []string{"gcr.io/library/busybox@sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582"}
tags, digests := ParseImageReferences(refs)
assert.Equal(t, expectedTags, tags)
assert.Equal(t, expectedDigests, digests)
}
// TestGetUserFromImage tests the logic of getting image uid or user name of image user.
func TestGetUserFromImage(t *testing.T) {
newI64 := func(i int64) *int64 { return &i }
for c, test := range map[string]struct {
user string
uid *int64
name string
}{
"no gid": {
user: "0",
uid: newI64(0),
},
"uid/gid": {
user: "0:1",
uid: newI64(0),
},
"empty user": {
user: "",
},
"multiple separators": {
user: "1:2:3",
uid: newI64(1),
},
"root username": {
user: "root:root",
name: "root",
},
"username": {
user: "test:test",
name: "test",
},
} {
t.Run(c, func(t *testing.T) {
actualUID, actualName := getUserFromImage(test.user)
assert.Equal(t, test.uid, actualUID)
assert.Equal(t, test.name, actualName)
})
}
}

View File

@ -14,7 +14,7 @@
limitations under the License.
*/
package sbserver
package images
import (
"context"
@ -25,7 +25,7 @@ import (
// ImageFsInfo returns information of the filesystem that is used to store images.
// TODO(windows): Usage for windows is always 0 right now. Support this for windows.
func (c *criService) ImageFsInfo(ctx context.Context, r *runtime.ImageFsInfoRequest) (*runtime.ImageFsInfoResponse, error) {
func (c *CRIImageService) ImageFsInfo(ctx context.Context, r *runtime.ImageFsInfoRequest) (*runtime.ImageFsInfoResponse, error) {
snapshots := c.snapshotStore.List()
timestamp := time.Now().UnixNano()
var usedBytes, inodesUsed uint64

View File

@ -14,7 +14,7 @@
limitations under the License.
*/
package sbserver
package images
import (
"context"

View File

@ -0,0 +1,46 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package images
import (
"github.com/docker/go-metrics"
prom "github.com/prometheus/client_golang/prometheus"
)
var (
imagePulls metrics.LabeledCounter
inProgressImagePulls metrics.Gauge
// pull duration / (image size / 1MBi)
imagePullThroughput prom.Histogram
)
func init() {
// these CRI metrics record latencies for successful operations around a sandbox and container's lifecycle.
ns := metrics.NewNamespace("containerd", "cri_sandboxed", nil)
imagePulls = ns.NewLabeledCounter("image_pulls", "succeeded and failed counters", "status")
inProgressImagePulls = ns.NewGauge("in_progress_image_pulls", "in progress pulls", metrics.Total)
imagePullThroughput = prom.NewHistogram(
prom.HistogramOpts{
Name: "image_pulling_throughput",
Help: "image pull throughput",
Buckets: prom.DefBuckets,
},
)
metrics.Register(ns)
}

View File

@ -0,0 +1,134 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package images
import (
"context"
"fmt"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/log"
criconfig "github.com/containerd/containerd/pkg/cri/config"
imagestore "github.com/containerd/containerd/pkg/cri/store/image"
snapshotstore "github.com/containerd/containerd/pkg/cri/store/snapshot"
"github.com/containerd/containerd/pkg/kmutex"
"github.com/containerd/containerd/reference/docker"
imagedigest "github.com/opencontainers/go-digest"
"github.com/sirupsen/logrus"
)
const (
// imageLabelKey is the label key indicating the image is managed by cri plugin.
imageLabelKey = "io.cri-containerd.image"
// imageLabelValue is the label value indicating the image is managed by cri plugin.
imageLabelValue = "managed"
)
type CRIImageService struct {
// config contains all configurations.
config criconfig.Config
// client is an instance of the containerd client
client *containerd.Client
// imageFSPath is the path to image filesystem.
imageFSPath string
// imageStore stores all resources associated with images.
imageStore *imagestore.Store
// snapshotStore stores information of all snapshots.
snapshotStore *snapshotstore.Store
// unpackDuplicationSuppressor is used to make sure that there is only
// one in-flight fetch request or unpack handler for a given descriptor's
// or chain ID.
unpackDuplicationSuppressor kmutex.KeyedLocker
}
func NewService(config criconfig.Config, imageFSPath string, client *containerd.Client) (*CRIImageService, error) {
svc := CRIImageService{
config: config,
client: client,
imageStore: imagestore.NewStore(client),
imageFSPath: imageFSPath,
snapshotStore: snapshotstore.NewStore(),
unpackDuplicationSuppressor: kmutex.New(),
}
if client.SnapshotService(svc.config.ContainerdConfig.Snapshotter) == nil {
return nil, fmt.Errorf("failed to find snapshotter %q", svc.config.ContainerdConfig.Snapshotter)
}
// Start snapshot stats syncer, it doesn't need to be stopped.
logrus.Info("Start snapshots syncer")
snapshotsSyncer := newSnapshotsSyncer(
svc.snapshotStore,
svc.client.SnapshotService(svc.config.ContainerdConfig.Snapshotter),
time.Duration(svc.config.StatsCollectPeriod)*time.Second,
)
snapshotsSyncer.start()
return &svc, nil
}
// LocalResolve resolves image reference locally and returns corresponding image metadata. It
// returns errdefs.ErrNotFound if the reference doesn't exist.
func (c *CRIImageService) LocalResolve(refOrID string) (imagestore.Image, error) {
getImageID := func(refOrId string) string {
if _, err := imagedigest.Parse(refOrID); err == nil {
return refOrID
}
return func(ref string) string {
// ref is not image id, try to resolve it locally.
// TODO(random-liu): Handle this error better for debugging.
normalized, err := docker.ParseDockerRef(ref)
if err != nil {
return ""
}
id, err := c.imageStore.Resolve(normalized.String())
if err != nil {
return ""
}
return id
}(refOrID)
}
imageID := getImageID(refOrID)
if imageID == "" {
// Try to treat ref as imageID
imageID = refOrID
}
return c.imageStore.Get(imageID)
}
// RuntimeSnapshotter overrides the default snapshotter if Snapshotter is set for this runtime.
// See https://github.com/containerd/containerd/issues/6657
func (c *CRIImageService) RuntimeSnapshotter(ctx context.Context, ociRuntime criconfig.Runtime) string {
if ociRuntime.Snapshotter == "" {
return c.config.ContainerdConfig.Snapshotter
}
log.G(ctx).Debugf("Set snapshotter for runtime %s to %s", ociRuntime.Type, ociRuntime.Snapshotter)
return ociRuntime.Snapshotter
}
// GetImage gets image metadata by image id.
func (c *CRIImageService) GetImage(id string) (imagestore.Image, error) {
return c.imageStore.Get(id)
}
// GetSnapshot returns the snapshot with specified key.
func (c *CRIImageService) GetSnapshot(key string) (snapshotstore.Snapshot, error) {
return c.snapshotStore.Get(key)
}

View File

@ -0,0 +1,128 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package images
import (
"context"
"testing"
"github.com/containerd/containerd/errdefs"
criconfig "github.com/containerd/containerd/pkg/cri/config"
imagestore "github.com/containerd/containerd/pkg/cri/store/image"
snapshotstore "github.com/containerd/containerd/pkg/cri/store/snapshot"
"github.com/stretchr/testify/assert"
)
const (
testImageFSPath = "/test/image/fs/path"
testRootDir = "/test/root"
testStateDir = "/test/state"
// Use an image id as test sandbox image to avoid image name resolve.
// TODO(random-liu): Change this to image name after we have complete image
// management unit test framework.
testSandboxImage = "sha256:c75bebcdd211f41b3a460c7bf82970ed6c75acaab9cd4c9a4e125b03ca113798"
)
// newTestCRIService creates a fake criService for test.
func newTestCRIService() *CRIImageService {
return &CRIImageService{
config: testConfig,
imageFSPath: testImageFSPath,
imageStore: imagestore.NewStore(nil),
snapshotStore: snapshotstore.NewStore(),
}
}
var testConfig = criconfig.Config{
RootDir: testRootDir,
StateDir: testStateDir,
PluginConfig: criconfig.PluginConfig{
SandboxImage: testSandboxImage,
TolerateMissingHugetlbController: true,
},
}
func TestLocalResolve(t *testing.T) {
image := imagestore.Image{
ID: "sha256:c75bebcdd211f41b3a460c7bf82970ed6c75acaab9cd4c9a4e125b03ca113799",
ChainID: "test-chain-id-1",
References: []string{
"docker.io/library/busybox:latest",
"docker.io/library/busybox@sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582",
},
Size: 10,
}
c := newTestCRIService()
var err error
c.imageStore, err = imagestore.NewFakeStore([]imagestore.Image{image})
assert.NoError(t, err)
for _, ref := range []string{
"sha256:c75bebcdd211f41b3a460c7bf82970ed6c75acaab9cd4c9a4e125b03ca113799",
"busybox",
"busybox:latest",
"busybox@sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582",
"library/busybox",
"library/busybox:latest",
"library/busybox@sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582",
"docker.io/busybox",
"docker.io/busybox:latest",
"docker.io/busybox@sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582",
"docker.io/library/busybox",
"docker.io/library/busybox:latest",
"docker.io/library/busybox@sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582",
} {
img, err := c.LocalResolve(ref)
assert.NoError(t, err)
assert.Equal(t, image, img)
}
img, err := c.LocalResolve("randomid")
assert.Equal(t, errdefs.IsNotFound(err), true)
assert.Equal(t, imagestore.Image{}, img)
}
func TestRuntimeSnapshotter(t *testing.T) {
defaultRuntime := criconfig.Runtime{
Snapshotter: "",
}
fooRuntime := criconfig.Runtime{
Snapshotter: "devmapper",
}
for desc, test := range map[string]struct {
runtime criconfig.Runtime
expectSnapshotter string
}{
"should return default snapshotter when runtime.Snapshotter is not set": {
runtime: defaultRuntime,
expectSnapshotter: criconfig.DefaultConfig().Snapshotter,
},
"should return overridden snapshotter when runtime.Snapshotter is set": {
runtime: fooRuntime,
expectSnapshotter: "devmapper",
},
} {
t.Run(desc, func(t *testing.T) {
cri := newTestCRIService()
cri.config = criconfig.Config{
PluginConfig: criconfig.DefaultConfig(),
}
assert.Equal(t, test.expectSnapshotter, cri.RuntimeSnapshotter(context.Background(), test.runtime))
})
}
}

View File

@ -14,7 +14,7 @@
limitations under the License.
*/
package sbserver
package images
import (
"context"

View File

@ -18,7 +18,6 @@ package sbserver
import (
"github.com/docker/go-metrics"
prom "github.com/prometheus/client_golang/prometheus"
)
var (
@ -39,11 +38,6 @@ var (
networkPluginOperations metrics.LabeledCounter
networkPluginOperationsErrors metrics.LabeledCounter
networkPluginOperationsLatency metrics.LabeledTimer
imagePulls metrics.LabeledCounter
inProgressImagePulls metrics.Gauge
// pull duration / (image size / 1MBi)
imagePullThroughput prom.Histogram
)
func init() {
@ -68,16 +62,6 @@ func init() {
networkPluginOperationsErrors = ns.NewLabeledCounter("network_plugin_operations_errors_total", "cumulative number of network plugin operations by operation type", "operation_type")
networkPluginOperationsLatency = ns.NewLabeledTimer("network_plugin_operations_duration_seconds", "latency in seconds of network plugin operations. Broken down by operation type", "operation_type")
imagePulls = ns.NewLabeledCounter("image_pulls", "succeeded and failed counters", "status")
inProgressImagePulls = ns.NewGauge("in_progress_image_pulls", "in progress pulls", metrics.Total)
imagePullThroughput = prom.NewHistogram(
prom.HistogramOpts{
Name: "image_pulling_throughput",
Help: "image pull throughput",
Buckets: prom.DefBuckets,
},
)
metrics.Register(ns)
}

View File

@ -41,8 +41,6 @@ import (
// CRIService interface contains things required by controller, but not yet refactored from criService.
// TODO: this will be removed in subsequent iterations.
type CRIService interface {
EnsureImageExists(ctx context.Context, ref string, config *runtime.PodSandboxConfig) (*imagestore.Image, error)
// TODO: we should implement Event backoff in Controller.
BackOffEvent(id string, event interface{})
@ -51,11 +49,21 @@ type CRIService interface {
GenerateAndSendContainerEvent(ctx context.Context, containerID string, sandboxID string, eventType runtime.ContainerEventType)
}
// ImageService specifies dependencies to CRI image service.
type ImageService interface {
runtime.ImageServiceServer
LocalResolve(refOrID string) (imagestore.Image, error)
GetImage(id string) (imagestore.Image, error)
}
type Controller struct {
// config contains all configurations.
config criconfig.Config
// client is an instance of the containerd client
client *containerd.Client
// imageService is a dependency to CRI image service.
imageService ImageService
// sandboxStore stores all resources associated with sandboxes.
sandboxStore *sandboxstore.Store
// os is an interface for all required os operations.
@ -74,11 +82,13 @@ func New(
sandboxStore *sandboxstore.Store,
os osinterface.OS,
cri CRIService,
imageService ImageService,
baseOCISpecs map[string]*oci.Spec,
) *Controller {
return &Controller{
config: config,
client: client,
imageService: imageService,
sandboxStore: sandboxStore,
os: os,
cri: cri,

View File

@ -21,8 +21,6 @@ import (
"fmt"
"path"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/containerd/containerd"
@ -103,25 +101,6 @@ func (c *Controller) toContainerdImage(ctx context.Context, image imagestore.Ima
return c.client.GetImage(ctx, image.References[0])
}
// getUserFromImage gets uid or user name of the image user.
// If user is numeric, it will be treated as uid; or else, it is treated as user name.
func getUserFromImage(user string) (*int64, string) {
// return both empty if user is not specified in the image.
if user == "" {
return nil, ""
}
// split instances where the id may contain user:group
user = strings.Split(user, ":")[0]
// user could be either uid or user name. Try to interpret as numeric uid.
uid, err := strconv.ParseInt(user, 10, 64)
if err != nil {
// If user is non numeric, assume it's user name.
return nil, user
}
// If user is a numeric uid.
return &uid, ""
}
// buildLabel builds the labels from config to be passed to containerd
func buildLabels(configLabels, imageConfigLabels map[string]string, containerType string) map[string]string {
labels := make(map[string]string)

View File

@ -29,46 +29,6 @@ import (
"github.com/stretchr/testify/assert"
)
// TestGetUserFromImage tests the logic of getting image uid or user name of image user.
func TestGetUserFromImage(t *testing.T) {
newI64 := func(i int64) *int64 { return &i }
for c, test := range map[string]struct {
user string
uid *int64
name string
}{
"no gid": {
user: "0",
uid: newI64(0),
},
"uid/gid": {
user: "0:1",
uid: newI64(0),
},
"empty user": {
user: "",
},
"multiple separators": {
user: "1:2:3",
uid: newI64(1),
},
"root username": {
user: "root:root",
name: "root",
},
"username": {
user: "test:test",
name: "test",
},
} {
t.Run(c, func(t *testing.T) {
actualUID, actualName := getUserFromImage(test.user)
assert.Equal(t, test.uid, actualUID)
assert.Equal(t, test.name, actualName)
})
}
}
func TestGetRepoDigestAndTag(t *testing.T) {
digest := imagedigest.Digest("sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582")
for desc, test := range map[string]struct {

View File

@ -21,6 +21,7 @@ import (
"errors"
"fmt"
imagestore "github.com/containerd/containerd/pkg/cri/store/image"
"github.com/containerd/nri"
v1 "github.com/containerd/nri/types/v1"
"github.com/containerd/typeurl/v2"
@ -80,7 +81,7 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll
)
// Ensure sandbox container image snapshot.
image, err := c.cri.EnsureImageExists(ctx, c.config.SandboxImage, config)
image, err := c.ensureImageExists(ctx, c.config.SandboxImage, config)
if err != nil {
return cin, fmt.Errorf("failed to get sandbox image %q: %w", c.config.SandboxImage, err)
}
@ -272,6 +273,28 @@ func (c *Controller) Create(ctx context.Context, _id string, _ ...sandbox.Create
return nil
}
func (c *Controller) ensureImageExists(ctx context.Context, ref string, config *runtime.PodSandboxConfig) (*imagestore.Image, error) {
image, err := c.imageService.LocalResolve(ref)
if err != nil && !errdefs.IsNotFound(err) {
return nil, fmt.Errorf("failed to get image %q: %w", ref, err)
}
if err == nil {
return &image, nil
}
// Pull image to ensure the image exists
resp, err := c.imageService.PullImage(ctx, &runtime.PullImageRequest{Image: &runtime.ImageSpec{Image: ref}, SandboxConfig: config})
if err != nil {
return nil, fmt.Errorf("failed to pull image %q: %w", ref, err)
}
imageID := resp.GetImageRef()
newImage, err := c.imageService.GetImage(imageID)
if err != nil {
// It's still possible that someone removed the image right after it is pulled.
return nil, fmt.Errorf("failed to get image %q after pulling: %w", imageID, err)
}
return &newImage, nil
}
// untrustedWorkload returns true if the sandbox contains untrusted workload.
func untrustedWorkload(config *runtime.PodSandboxConfig) bool {
return config.GetAnnotations()[annotations.UntrustedWorkload] == "true"

View File

@ -519,7 +519,7 @@ func (c *criService) loadImages(ctx context.Context, cImages []containerd.Image)
log.G(ctx).Warnf("The image %s is not unpacked.", i.Name())
// TODO(random-liu): Consider whether we should try unpack here.
}
if err := c.updateImage(ctx, i.Name()); err != nil {
if err := c.UpdateImage(ctx, i.Name()); err != nil {
log.G(ctx).WithError(err).Warnf("Failed to update reference for image %q", i.Name())
return
}

View File

@ -151,7 +151,7 @@ func (c *criService) toPodSandboxStats(sandbox sandboxstore.Sandbox, statsMap ma
// If snapshotstore doesn't have cached snapshot information
// set WritableLayer usage to zero
var usedBytes uint64
sn, err := c.snapshotStore.Get(cntr.ID)
sn, err := c.GetSnapshot(cntr.ID)
if err == nil {
usedBytes = sn.Size
}

View File

@ -26,15 +26,16 @@ import (
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/oci"
"github.com/containerd/containerd/pkg/cri/instrument"
"github.com/containerd/containerd/pkg/cri/nri"
"github.com/containerd/containerd/pkg/cri/sbserver/images"
"github.com/containerd/containerd/pkg/cri/sbserver/podsandbox"
imagestore "github.com/containerd/containerd/pkg/cri/store/image"
snapshotstore "github.com/containerd/containerd/pkg/cri/store/snapshot"
"github.com/containerd/containerd/pkg/cri/streaming"
"github.com/containerd/containerd/pkg/kmutex"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/sandbox"
"github.com/containerd/go-cni"
@ -46,9 +47,7 @@ import (
criconfig "github.com/containerd/containerd/pkg/cri/config"
containerstore "github.com/containerd/containerd/pkg/cri/store/container"
imagestore "github.com/containerd/containerd/pkg/cri/store/image"
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
snapshotstore "github.com/containerd/containerd/pkg/cri/store/snapshot"
ctrdutil "github.com/containerd/containerd/pkg/cri/util"
osinterface "github.com/containerd/containerd/pkg/os"
"github.com/containerd/containerd/pkg/registrar"
@ -69,8 +68,23 @@ type CRIService interface {
Register(*grpc.Server) error
}
// imageService specifies dependencies to image service.
type imageService interface {
runtime.ImageServiceServer
RuntimeSnapshotter(ctx context.Context, ociRuntime criconfig.Runtime) string
UpdateImage(ctx context.Context, r string) error
GetImage(id string) (imagestore.Image, error)
GetSnapshot(key string) (snapshotstore.Snapshot, error)
LocalResolve(refOrID string) (imagestore.Image, error)
}
// criService implements CRIService.
type criService struct {
imageService
// config contains all configurations.
config criconfig.Config
// imageFSPath is the path to image filesystem.
@ -90,10 +104,6 @@ type criService struct {
// containerNameIndex stores all container names and make sure each
// name is unique.
containerNameIndex *registrar.Registrar
// imageStore stores all resources associated with images.
imageStore *imagestore.Store
// snapshotStore stores information of all snapshots.
snapshotStore *snapshotstore.Store
// netPlugin is used to setup and teardown network when run/stop pod sandbox.
netPlugin map[string]cni.CNI
// client is an instance of the containerd client
@ -113,10 +123,6 @@ type criService struct {
// allCaps is the list of the capabilities.
// When nil, parsed from CapEff of /proc/self/status.
allCaps []string //nolint:nolintlint,unused // Ignore on non-Linux
// unpackDuplicationSuppressor is used to make sure that there is only
// one in-flight fetch request or unpack handler for a given descriptor's
// or chain ID.
unpackDuplicationSuppressor kmutex.KeyedLocker
// containerEventsChan is used to capture container events and send them
// to the caller of GetContainerEvents.
containerEventsChan chan runtime.ContainerEventResponse
@ -128,31 +134,37 @@ type criService struct {
func NewCRIService(config criconfig.Config, client *containerd.Client, nri *nri.API) (CRIService, error) {
var err error
labels := label.NewStore()
if client.SnapshotService(config.ContainerdConfig.Snapshotter) == nil {
return nil, fmt.Errorf("failed to find snapshotter %q", config.ContainerdConfig.Snapshotter)
}
imageFSPath := imageFSPath(config.ContainerdRootDir, config.ContainerdConfig.Snapshotter)
logrus.Infof("Get image filesystem path %q", imageFSPath)
// TODO: expose this as a separate containerd plugin.
imageService, err := images.NewService(config, imageFSPath, client)
if err != nil {
return nil, fmt.Errorf("unable to create CRI image service: %w", err)
}
c := &criService{
imageService: imageService,
config: config,
client: client,
imageFSPath: imageFSPath,
os: osinterface.RealOS{},
sandboxStore: sandboxstore.NewStore(labels),
containerStore: containerstore.NewStore(labels),
imageStore: imagestore.NewStore(client),
snapshotStore: snapshotstore.NewStore(),
sandboxNameIndex: registrar.NewRegistrar(),
containerNameIndex: registrar.NewRegistrar(),
netPlugin: make(map[string]cni.CNI),
unpackDuplicationSuppressor: kmutex.New(),
sandboxControllers: make(map[criconfig.SandboxControllerMode]sandbox.Controller),
}
// TODO: figure out a proper channel size.
c.containerEventsChan = make(chan runtime.ContainerEventResponse, 1000)
if client.SnapshotService(c.config.ContainerdConfig.Snapshotter) == nil {
return nil, fmt.Errorf("failed to find snapshotter %q", c.config.ContainerdConfig.Snapshotter)
}
c.imageFSPath = imageFSPath(config.ContainerdRootDir, config.ContainerdConfig.Snapshotter)
logrus.Infof("Get image filesystem path %q", c.imageFSPath)
if err := c.initPlatform(); err != nil {
return nil, fmt.Errorf("initialize platform: %w", err)
}
@ -189,7 +201,7 @@ func NewCRIService(config criconfig.Config, client *containerd.Client, nri *nri.
}
// Load all sandbox controllers(pod sandbox controller and remote shim controller)
c.sandboxControllers[criconfig.ModePodSandbox] = podsandbox.New(config, client, c.sandboxStore, c.os, c, c.baseOCISpecs)
c.sandboxControllers[criconfig.ModePodSandbox] = podsandbox.New(config, client, c.sandboxStore, c.os, c, imageService, c.baseOCISpecs)
c.sandboxControllers[criconfig.ModeShim] = client.SandboxController()
c.nri = nri
@ -238,15 +250,6 @@ func (c *criService) Run() error {
logrus.Info("Start event monitor")
eventMonitorErrCh := c.eventMonitor.start()
// Start snapshot stats syncer, it doesn't need to be stopped.
logrus.Info("Start snapshots syncer")
snapshotsSyncer := newSnapshotsSyncer(
c.snapshotStore,
c.client.SnapshotService(c.config.ContainerdConfig.Snapshotter),
time.Duration(c.config.StatsCollectPeriod)*time.Second,
)
snapshotsSyncer.start()
// Start CNI network conf syncers
cniNetConfMonitorErrCh := make(chan error, len(c.cniNetConfMonitor))
var netSyncGroup sync.WaitGroup

View File

@ -29,10 +29,8 @@ import (
criconfig "github.com/containerd/containerd/pkg/cri/config"
servertesting "github.com/containerd/containerd/pkg/cri/server/testing"
containerstore "github.com/containerd/containerd/pkg/cri/store/container"
imagestore "github.com/containerd/containerd/pkg/cri/store/image"
"github.com/containerd/containerd/pkg/cri/store/label"
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
snapshotstore "github.com/containerd/containerd/pkg/cri/store/snapshot"
ostesting "github.com/containerd/containerd/pkg/os/testing"
"github.com/containerd/containerd/pkg/registrar"
)
@ -41,12 +39,10 @@ import (
func newTestCRIService() *criService {
labels := label.NewStore()
return &criService{
imageService: &fakeImageService{},
config: testConfig,
imageFSPath: testImageFSPath,
os: ostesting.NewFakeOS(),
sandboxStore: sandboxstore.NewStore(labels),
imageStore: imagestore.NewStore(nil),
snapshotStore: snapshotstore.NewStore(),
sandboxNameIndex: registrar.NewRegistrar(),
containerStore: containerstore.NewStore(labels),
containerNameIndex: registrar.NewRegistrar(),

View File

@ -25,7 +25,6 @@ const (
// TODO(random-liu): Change this to image name after we have complete image
// management unit test framework.
testSandboxImage = "sha256:c75bebcdd211f41b3a460c7bf82970ed6c75acaab9cd4c9a4e125b03ca113798"
testImageFSPath = "/test/image/fs/path"
)
var testConfig = criconfig.Config{