From db68300a5aa5cb52b6d5eae58d9b89159da7b1ef Mon Sep 17 00:00:00 2001 From: Lantao Liu Date: Tue, 25 Sep 2018 23:38:26 -0700 Subject: [PATCH 1/2] Manage unmanaged images in k8s.io namespace Signed-off-by: Lantao Liu --- pkg/server/events.go | 41 +++++++++++++----------------- pkg/server/helpers.go | 4 +++ pkg/server/image_load.go | 7 +++-- pkg/server/image_pull.go | 55 ++++++++++++++++++++++++++++++++++------ pkg/server/restart.go | 9 +++---- pkg/server/service.go | 2 +- 6 files changed, 79 insertions(+), 39 deletions(-) diff --git a/pkg/server/events.go b/pkg/server/events.go index e6e0321b6..4681000df 100644 --- a/pkg/server/events.go +++ b/pkg/server/events.go @@ -34,7 +34,6 @@ import ( ctrdutil "github.com/containerd/cri/pkg/containerd/util" "github.com/containerd/cri/pkg/store" containerstore "github.com/containerd/cri/pkg/store/container" - imagestore "github.com/containerd/cri/pkg/store/image" sandboxstore "github.com/containerd/cri/pkg/store/sandbox" ) @@ -54,14 +53,12 @@ const ( // eventMonitor monitors containerd event and updates internal state correspondingly. // TODO(random-liu): Handle event for each container in a separate goroutine. type eventMonitor struct { - containerStore *containerstore.Store - sandboxStore *sandboxstore.Store - imageStore *imagestore.Store - ch <-chan *events.Envelope - errCh <-chan error - ctx context.Context - cancel context.CancelFunc - backOff *backOff + c *criService + ch <-chan *events.Envelope + errCh <-chan error + ctx context.Context + cancel context.CancelFunc + backOff *backOff } type backOff struct { @@ -84,16 +81,14 @@ type backOffQueue struct { // Create new event monitor. New event monitor will start subscribing containerd event. All events // happen after it should be monitored. -func newEventMonitor(c *containerstore.Store, s *sandboxstore.Store, i *imagestore.Store) *eventMonitor { +func newEventMonitor(c *criService) *eventMonitor { // event subscribe doesn't need namespace. ctx, cancel := context.WithCancel(context.Background()) return &eventMonitor{ - containerStore: c, - sandboxStore: s, - imageStore: i, - ctx: ctx, - cancel: cancel, - backOff: newBackOff(), + c: c, + ctx: ctx, + cancel: cancel, + backOff: newBackOff(), } } @@ -206,7 +201,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error { case *eventtypes.TaskExit: e := any.(*eventtypes.TaskExit) logrus.Infof("TaskExit event %+v", e) - cntr, err := em.containerStore.Get(e.ContainerID) + cntr, err := em.c.containerStore.Get(e.ContainerID) if err == nil { if err := handleContainerExit(ctx, e, cntr); err != nil { return errors.Wrap(err, "failed to handle container TaskExit event") @@ -216,7 +211,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error { return errors.Wrap(err, "can't find container for TaskExit event") } // Use GetAll to include sandbox in unknown state. - sb, err := em.sandboxStore.GetAll(e.ContainerID) + sb, err := em.c.sandboxStore.GetAll(e.ContainerID) if err == nil { if err := handleSandboxExit(ctx, e, sb); err != nil { return errors.Wrap(err, "failed to handle sandbox TaskExit event") @@ -229,12 +224,12 @@ func (em *eventMonitor) handleEvent(any interface{}) error { case *eventtypes.TaskOOM: e := any.(*eventtypes.TaskOOM) logrus.Infof("TaskOOM event %+v", e) - cntr, err := em.containerStore.Get(e.ContainerID) + cntr, err := em.c.containerStore.Get(e.ContainerID) if err != nil { if err != store.ErrNotExist { return errors.Wrap(err, "can't find container for TaskOOM event") } - if _, err = em.sandboxStore.Get(e.ContainerID); err != nil { + if _, err = em.c.sandboxStore.Get(e.ContainerID); err != nil { if err != store.ErrNotExist { return errors.Wrap(err, "can't find sandbox for TaskOOM event") } @@ -252,15 +247,15 @@ func (em *eventMonitor) handleEvent(any interface{}) error { case *eventtypes.ImageCreate: e := any.(*eventtypes.ImageCreate) logrus.Infof("ImageCreate event %+v", e) - return em.imageStore.Update(ctx, e.Name) + return em.c.updateImage(ctx, e.Name) case *eventtypes.ImageUpdate: e := any.(*eventtypes.ImageUpdate) logrus.Infof("ImageUpdate event %+v", e) - return em.imageStore.Update(ctx, e.Name) + return em.c.updateImage(ctx, e.Name) case *eventtypes.ImageDelete: e := any.(*eventtypes.ImageDelete) logrus.Infof("ImageDelete event %+v", e) - return em.imageStore.Update(ctx, e.Name) + return em.c.updateImage(ctx, e.Name) } return nil diff --git a/pkg/server/helpers.go b/pkg/server/helpers.go index 40e5ac0a5..d11918685 100644 --- a/pkg/server/helpers.go +++ b/pkg/server/helpers.go @@ -105,6 +105,10 @@ const ( containerKindSandbox = "sandbox" // containerKindContainer is a label value indicating container is application container containerKindContainer = "container" + // imageLabelKey is the label key indicating the image is managed by cri plugin. + imageLabelKey = criContainerdPrefix + ".image" + // imageLabelValue is the label value indicating the image is managed by cri plugin. + imageLabelValue = "managed" // sandboxMetadataExtension is an extension name that identify metadata of sandbox in CreateContainerRequest sandboxMetadataExtension = criContainerdPrefix + ".sandbox.metadata" // containerMetadataExtension is an extension name that identify metadata of container in CreateContainerRequest diff --git a/pkg/server/image_load.go b/pkg/server/image_load.go index d3741a021..f286048d3 100644 --- a/pkg/server/image_load.go +++ b/pkg/server/image_load.go @@ -44,8 +44,11 @@ func (c *criService) LoadImage(ctx context.Context, r *api.LoadImageRequest) (*a } for _, repoTag := range repoTags { // Update image store to reflect the newest state in containerd. - if err := c.imageStore.Update(ctx, repoTag); err != nil { - return nil, errors.Wrapf(err, "failed to update image store %q", repoTag) + // Image imported by importer.Import is not treated as managed + // by the cri plugin, call `updateImage` to make it managed. + // TODO(random-liu): Replace this with the containerd library (issue #909). + if err := c.updateImage(ctx, repoTag); err != nil { + return nil, errors.Wrapf(err, "update image store %q", repoTag) } logrus.Debugf("Imported image %q", repoTag) } diff --git a/pkg/server/image_pull.go b/pkg/server/image_pull.go index 48bc4c562..cb66f138a 100644 --- a/pkg/server/image_pull.go +++ b/pkg/server/image_pull.go @@ -114,14 +114,16 @@ func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest) imageID := configDesc.Digest.String() repoDigest, repoTag := getRepoDigestAndTag(namedRef, image.Target().Digest, isSchema1) - for _, r := range []string{repoTag, repoDigest} { + for _, r := range []string{imageID, repoTag, repoDigest} { if r == "" { continue } if err := c.createImageReference(ctx, r, image.Target()); err != nil { - return nil, errors.Wrapf(err, "failed to update image reference %q", r) + return nil, errors.Wrapf(err, "failed to create image reference %q", r) } // Update image store to reflect the newest state in containerd. + // No need to use `updateImage`, because the image reference must + // have been managed by the cri plugin. if err := c.imageStore.Update(ctx, r); err != nil { return nil, errors.Wrapf(err, "failed to update image store %q", r) } @@ -174,20 +176,57 @@ func (c *criService) createImageReference(ctx context.Context, name string, desc img := containerdimages.Image{ Name: name, Target: desc, + // Add a label to indicate that the image is managed by the cri plugin. + Labels: map[string]string{imageLabelKey: imageLabelValue}, } // TODO(random-liu): Figure out which is the more performant sequence create then update or // update then create. - _, err := c.client.ImageService().Create(ctx, img) - if err == nil { - return nil - } - if !errdefs.IsAlreadyExists(err) { + oldImg, err := c.client.ImageService().Create(ctx, img) + if err == nil || !errdefs.IsAlreadyExists(err) { return err } - _, err = c.client.ImageService().Update(ctx, img, "target") + if oldImg.Target.Digest == img.Target.Digest && oldImg.Labels[imageLabelKey] == imageLabelValue { + return nil + } + _, err = c.client.ImageService().Update(ctx, img, "target", "labels") return err } +// updateImage updates image store to reflect the newest state of an image reference +// in containerd. If the reference is not managed by the cri plugin, the function also +// generates necessary metadata for the image and make it managed. +func (c *criService) updateImage(ctx context.Context, r string) error { + img, err := c.client.GetImage(ctx, r) + if err != nil && !errdefs.IsNotFound(err) { + return errors.Wrap(err, "get image by reference") + } + if err == nil && img.Labels()[imageLabelKey] != imageLabelValue { + // Make sure the image has the image id as its unique + // identifier that references the image in its lifetime. + configDesc, err := img.Config(ctx) + if err != nil { + return errors.Wrap(err, "get image id") + } + id := configDesc.Digest.String() + if err := c.createImageReference(ctx, id, img.Target()); err != nil { + return errors.Wrapf(err, "create image id reference %q", id) + } + if err := c.imageStore.Update(ctx, id); err != nil { + return errors.Wrapf(err, "update image store for %q", id) + } + // The image id is ready, add the label to mark the image as managed. + if err := c.createImageReference(ctx, r, img.Target()); err != nil { + return errors.Wrap(err, "create managed label") + } + } + // If the image is not found, we should continue updating the cache, + // so that the image can be removed from the cache. + if err := c.imageStore.Update(ctx, r); err != nil { + return errors.Wrapf(err, "update image store for %q", r) + } + return nil +} + // credentials returns a credential function for docker resolver to use. func (c *criService) credentials(auth *runtime.AuthConfig) func(string) (string, string, error) { return func(host string) (string, string, error) { diff --git a/pkg/server/restart.go b/pkg/server/restart.go index bd25d152a..18d8d60c6 100644 --- a/pkg/server/restart.go +++ b/pkg/server/restart.go @@ -36,7 +36,6 @@ import ( cio "github.com/containerd/cri/pkg/server/io" containerstore "github.com/containerd/cri/pkg/store/container" - imagestore "github.com/containerd/cri/pkg/store/image" sandboxstore "github.com/containerd/cri/pkg/store/sandbox" ) @@ -96,7 +95,7 @@ func (c *criService) recover(ctx context.Context) error { if err != nil { return errors.Wrap(err, "failed to list images") } - loadImages(ctx, c.imageStore, cImages, c.config.ContainerdConfig.Snapshotter) + c.loadImages(ctx, cImages) // It's possible that containerd containers are deleted unexpectedly. In that case, // we can't even get metadata, we should cleanup orphaned sandbox/container directories @@ -411,8 +410,8 @@ func loadSandbox(ctx context.Context, cntr containerd.Container) (sandboxstore.S } // loadImages loads images from containerd. -func loadImages(ctx context.Context, store *imagestore.Store, cImages []containerd.Image, - snapshotter string) { +func (c *criService) loadImages(ctx context.Context, cImages []containerd.Image) { + snapshotter := c.config.ContainerdConfig.Snapshotter for _, i := range cImages { ok, _, _, _, err := containerdimages.Check(ctx, i.ContentStore(), i.Target(), platforms.Default()) if err != nil { @@ -433,7 +432,7 @@ func loadImages(ctx context.Context, store *imagestore.Store, cImages []containe logrus.Warnf("The image %s is not unpacked.", i.Name()) // TODO(random-liu): Consider whether we should try unpack here. } - if err := store.Update(ctx, i.Name()); err != nil { + if err := c.updateImage(ctx, i.Name()); err != nil { logrus.WithError(err).Warnf("Failed to update reference for image %q", i.Name()) continue } diff --git a/pkg/server/service.go b/pkg/server/service.go index bf9f73e66..ce648ae52 100644 --- a/pkg/server/service.go +++ b/pkg/server/service.go @@ -157,7 +157,7 @@ func NewCRIService(config criconfig.Config, client *containerd.Client) (CRIServi return nil, errors.Wrap(err, "failed to create stream server") } - c.eventMonitor = newEventMonitor(c.containerStore, c.sandboxStore, c.imageStore) + c.eventMonitor = newEventMonitor(c) return c, nil } From 6905460b85fc9b2104bd0feb4123016946ca40a1 Mon Sep 17 00:00:00 2001 From: Lantao Liu Date: Tue, 25 Sep 2018 23:38:43 -0700 Subject: [PATCH 2/2] Add integration test Signed-off-by: Lantao Liu --- integration/containerd_image_test.go | 148 +++++++++++++++++++++++++++ integration/restart_test.go | 32 ++++++ integration/test_utils.go | 20 ++++ 3 files changed, 200 insertions(+) create mode 100644 integration/containerd_image_test.go diff --git a/integration/containerd_image_test.go b/integration/containerd_image_test.go new file mode 100644 index 000000000..291d52a4d --- /dev/null +++ b/integration/containerd_image_test.go @@ -0,0 +1,148 @@ +/* +Copyright 2018 The containerd Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package integration + +import ( + "golang.org/x/net/context" + "testing" + "time" + + "github.com/containerd/containerd" + "github.com/containerd/containerd/errdefs" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" +) + +// Test to test the CRI plugin should see image pulled into containerd directly. +func TestContainerdImage(t *testing.T) { + testImage := "docker.io/library/busybox:latest" + ctx := context.Background() + + t.Logf("make sure the test image doesn't exist in the cri plugin") + i, err := imageService.ImageStatus(&runtime.ImageSpec{Image: testImage}) + require.NoError(t, err) + if i != nil { + require.NoError(t, imageService.RemoveImage(&runtime.ImageSpec{Image: testImage})) + } + + t.Logf("pull the image into containerd") + _, err = containerdClient.Pull(ctx, testImage, containerd.WithPullUnpack) + assert.NoError(t, err) + defer func() { + // Make sure the image is cleaned up in any case. + if err := containerdClient.ImageService().Delete(ctx, testImage); err != nil { + assert.True(t, errdefs.IsNotFound(err), err) + } + assert.NoError(t, imageService.RemoveImage(&runtime.ImageSpec{Image: testImage})) + }() + + t.Logf("the image should be seen by the cri plugin") + var id string + checkImage := func() (bool, error) { + img, err := imageService.ImageStatus(&runtime.ImageSpec{Image: testImage}) + if err != nil { + return false, err + } + if img == nil { + t.Logf("Image %q not show up in the cri plugin yet", testImage) + return false, nil + } + id = img.Id + img, err = imageService.ImageStatus(&runtime.ImageSpec{Image: id}) + if err != nil { + return false, err + } + if img == nil { + // We always generate image id as a reference first, it must + // be ready here. + return false, errors.New("can't reference image by id") + } + if len(img.RepoTags) != 1 { + // RepoTags must have been populated correctly. + return false, errors.Errorf("unexpected repotags: %+v", img.RepoTags) + } + if img.RepoTags[0] != testImage { + return false, errors.Errorf("unexpected repotag %q", img.RepoTags[0]) + } + return true, nil + } + require.NoError(t, Eventually(checkImage, 100*time.Millisecond, 10*time.Second)) + require.NoError(t, Consistently(checkImage, 100*time.Millisecond, time.Second)) + defer func() { + t.Logf("image should still be seen by id if only tag get deleted") + if err := containerdClient.ImageService().Delete(ctx, testImage); err != nil { + assert.True(t, errdefs.IsNotFound(err), err) + } + assert.NoError(t, Consistently(func() (bool, error) { + img, err := imageService.ImageStatus(&runtime.ImageSpec{Image: id}) + if err != nil { + return false, err + } + return img != nil, nil + }, 100*time.Millisecond, time.Second)) + t.Logf("image should be removed from the cri plugin if all references get deleted") + if err := containerdClient.ImageService().Delete(ctx, id); err != nil { + assert.True(t, errdefs.IsNotFound(err), err) + } + assert.NoError(t, Eventually(func() (bool, error) { + img, err := imageService.ImageStatus(&runtime.ImageSpec{Image: id}) + if err != nil { + return false, err + } + return img == nil, nil + }, 100*time.Millisecond, 10*time.Second)) + }() + + t.Logf("the image should be marked as managed") + imgByRef, err := containerdClient.GetImage(ctx, testImage) + assert.NoError(t, err) + assert.Equal(t, imgByRef.Labels()["io.cri-containerd.image"], "managed") + + t.Logf("the image id should be created and managed") + imgByID, err := containerdClient.GetImage(ctx, id) + assert.NoError(t, err) + assert.Equal(t, imgByID.Labels()["io.cri-containerd.image"], "managed") + + t.Logf("should be able to start container with the image") + sbConfig := PodSandboxConfig("sandbox", "containerd-image") + sb, err := runtimeService.RunPodSandbox(sbConfig) + require.NoError(t, err) + defer func() { + assert.NoError(t, runtimeService.StopPodSandbox(sb)) + assert.NoError(t, runtimeService.RemovePodSandbox(sb)) + }() + + cnConfig := ContainerConfig( + "test-container", + id, + WithCommand("top"), + ) + cn, err := runtimeService.CreateContainer(sb, cnConfig, sbConfig) + require.NoError(t, err) + require.NoError(t, runtimeService.StartContainer(cn)) + checkContainer := func() (bool, error) { + s, err := runtimeService.ContainerStatus(cn) + if err != nil { + return false, err + } + return s.GetState() == runtime.ContainerState_CONTAINER_RUNNING, nil + } + require.NoError(t, Eventually(checkContainer, 100*time.Millisecond, 10*time.Second)) + require.NoError(t, Consistently(checkContainer, 100*time.Millisecond, time.Second)) +} diff --git a/integration/restart_test.go b/integration/restart_test.go index 5cf4dbbf0..e1c3bc1ad 100644 --- a/integration/restart_test.go +++ b/integration/restart_test.go @@ -17,6 +17,7 @@ limitations under the License. package integration import ( + "sort" "testing" "time" @@ -128,6 +129,17 @@ func TestContainerdRestart(t *testing.T) { } } + t.Logf("Pull test images") + for _, image := range []string{"busybox", "alpine"} { + img, err := imageService.PullImage(&runtime.ImageSpec{image}, nil) + require.NoError(t, err) + defer func() { + assert.NoError(t, imageService.RemoveImage(&runtime.ImageSpec{Image: img})) + }() + } + imagesBeforeRestart, err := imageService.ListImages(nil) + assert.NoError(t, err) + t.Logf("Kill containerd") require.NoError(t, KillProcess("containerd")) defer func() { @@ -179,4 +191,24 @@ func TestContainerdRestart(t *testing.T) { assert.NoError(t, runtimeService.StopPodSandbox(s.id)) assert.NoError(t, runtimeService.RemovePodSandbox(s.id)) } + + t.Logf("Should recover all images") + imagesAfterRestart, err := imageService.ListImages(nil) + assert.NoError(t, err) + assert.Equal(t, len(imagesBeforeRestart), len(imagesAfterRestart)) + for _, i1 := range imagesBeforeRestart { + found := false + for _, i2 := range imagesAfterRestart { + if i1.Id == i2.Id { + sort.Strings(i1.RepoTags) + sort.Strings(i1.RepoDigests) + sort.Strings(i2.RepoTags) + sort.Strings(i2.RepoDigests) + assert.Equal(t, i1, i2) + found = true + break + } + } + assert.True(t, found, "should find image %+v", i1) + } } diff --git a/integration/test_utils.go b/integration/test_utils.go index 2bde8ce4a..cece345df 100644 --- a/integration/test_utils.go +++ b/integration/test_utils.go @@ -260,6 +260,26 @@ func Eventually(f CheckFunc, period, timeout time.Duration) error { } } +// Consistently makes sure that f consistently returns true without +// error before timeout exceeds. If f returns error, Consistently +// will return the same error immediately. +func Consistently(f CheckFunc, period, timeout time.Duration) error { + start := time.Now() + for { + ok, err := f() + if !ok { + return errors.New("get false") + } + if err != nil { + return err + } + if time.Since(start) >= timeout { + return nil + } + time.Sleep(period) + } +} + // Randomize adds uuid after a string. func Randomize(str string) string { return str + "-" + util.GenerateID()