Merge pull request #926 from Random-Liu/better-external-image-handling
Better external image handling
This commit is contained in:
commit
58ab1e09b2
148
integration/containerd_image_test.go
Normal file
148
integration/containerd_image_test.go
Normal file
@ -0,0 +1,148 @@
|
||||
/*
|
||||
Copyright 2018 The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package integration
|
||||
|
||||
import (
|
||||
"golang.org/x/net/context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
|
||||
)
|
||||
|
||||
// Test to test the CRI plugin should see image pulled into containerd directly.
|
||||
func TestContainerdImage(t *testing.T) {
|
||||
testImage := "docker.io/library/busybox:latest"
|
||||
ctx := context.Background()
|
||||
|
||||
t.Logf("make sure the test image doesn't exist in the cri plugin")
|
||||
i, err := imageService.ImageStatus(&runtime.ImageSpec{Image: testImage})
|
||||
require.NoError(t, err)
|
||||
if i != nil {
|
||||
require.NoError(t, imageService.RemoveImage(&runtime.ImageSpec{Image: testImage}))
|
||||
}
|
||||
|
||||
t.Logf("pull the image into containerd")
|
||||
_, err = containerdClient.Pull(ctx, testImage, containerd.WithPullUnpack)
|
||||
assert.NoError(t, err)
|
||||
defer func() {
|
||||
// Make sure the image is cleaned up in any case.
|
||||
if err := containerdClient.ImageService().Delete(ctx, testImage); err != nil {
|
||||
assert.True(t, errdefs.IsNotFound(err), err)
|
||||
}
|
||||
assert.NoError(t, imageService.RemoveImage(&runtime.ImageSpec{Image: testImage}))
|
||||
}()
|
||||
|
||||
t.Logf("the image should be seen by the cri plugin")
|
||||
var id string
|
||||
checkImage := func() (bool, error) {
|
||||
img, err := imageService.ImageStatus(&runtime.ImageSpec{Image: testImage})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if img == nil {
|
||||
t.Logf("Image %q not show up in the cri plugin yet", testImage)
|
||||
return false, nil
|
||||
}
|
||||
id = img.Id
|
||||
img, err = imageService.ImageStatus(&runtime.ImageSpec{Image: id})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if img == nil {
|
||||
// We always generate image id as a reference first, it must
|
||||
// be ready here.
|
||||
return false, errors.New("can't reference image by id")
|
||||
}
|
||||
if len(img.RepoTags) != 1 {
|
||||
// RepoTags must have been populated correctly.
|
||||
return false, errors.Errorf("unexpected repotags: %+v", img.RepoTags)
|
||||
}
|
||||
if img.RepoTags[0] != testImage {
|
||||
return false, errors.Errorf("unexpected repotag %q", img.RepoTags[0])
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
require.NoError(t, Eventually(checkImage, 100*time.Millisecond, 10*time.Second))
|
||||
require.NoError(t, Consistently(checkImage, 100*time.Millisecond, time.Second))
|
||||
defer func() {
|
||||
t.Logf("image should still be seen by id if only tag get deleted")
|
||||
if err := containerdClient.ImageService().Delete(ctx, testImage); err != nil {
|
||||
assert.True(t, errdefs.IsNotFound(err), err)
|
||||
}
|
||||
assert.NoError(t, Consistently(func() (bool, error) {
|
||||
img, err := imageService.ImageStatus(&runtime.ImageSpec{Image: id})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return img != nil, nil
|
||||
}, 100*time.Millisecond, time.Second))
|
||||
t.Logf("image should be removed from the cri plugin if all references get deleted")
|
||||
if err := containerdClient.ImageService().Delete(ctx, id); err != nil {
|
||||
assert.True(t, errdefs.IsNotFound(err), err)
|
||||
}
|
||||
assert.NoError(t, Eventually(func() (bool, error) {
|
||||
img, err := imageService.ImageStatus(&runtime.ImageSpec{Image: id})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return img == nil, nil
|
||||
}, 100*time.Millisecond, 10*time.Second))
|
||||
}()
|
||||
|
||||
t.Logf("the image should be marked as managed")
|
||||
imgByRef, err := containerdClient.GetImage(ctx, testImage)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, imgByRef.Labels()["io.cri-containerd.image"], "managed")
|
||||
|
||||
t.Logf("the image id should be created and managed")
|
||||
imgByID, err := containerdClient.GetImage(ctx, id)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, imgByID.Labels()["io.cri-containerd.image"], "managed")
|
||||
|
||||
t.Logf("should be able to start container with the image")
|
||||
sbConfig := PodSandboxConfig("sandbox", "containerd-image")
|
||||
sb, err := runtimeService.RunPodSandbox(sbConfig)
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
assert.NoError(t, runtimeService.StopPodSandbox(sb))
|
||||
assert.NoError(t, runtimeService.RemovePodSandbox(sb))
|
||||
}()
|
||||
|
||||
cnConfig := ContainerConfig(
|
||||
"test-container",
|
||||
id,
|
||||
WithCommand("top"),
|
||||
)
|
||||
cn, err := runtimeService.CreateContainer(sb, cnConfig, sbConfig)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, runtimeService.StartContainer(cn))
|
||||
checkContainer := func() (bool, error) {
|
||||
s, err := runtimeService.ContainerStatus(cn)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return s.GetState() == runtime.ContainerState_CONTAINER_RUNNING, nil
|
||||
}
|
||||
require.NoError(t, Eventually(checkContainer, 100*time.Millisecond, 10*time.Second))
|
||||
require.NoError(t, Consistently(checkContainer, 100*time.Millisecond, time.Second))
|
||||
}
|
@ -17,6 +17,7 @@ limitations under the License.
|
||||
package integration
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -128,6 +129,17 @@ func TestContainerdRestart(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
t.Logf("Pull test images")
|
||||
for _, image := range []string{"busybox", "alpine"} {
|
||||
img, err := imageService.PullImage(&runtime.ImageSpec{image}, nil)
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
assert.NoError(t, imageService.RemoveImage(&runtime.ImageSpec{Image: img}))
|
||||
}()
|
||||
}
|
||||
imagesBeforeRestart, err := imageService.ListImages(nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
t.Logf("Kill containerd")
|
||||
require.NoError(t, KillProcess("containerd"))
|
||||
defer func() {
|
||||
@ -179,4 +191,24 @@ func TestContainerdRestart(t *testing.T) {
|
||||
assert.NoError(t, runtimeService.StopPodSandbox(s.id))
|
||||
assert.NoError(t, runtimeService.RemovePodSandbox(s.id))
|
||||
}
|
||||
|
||||
t.Logf("Should recover all images")
|
||||
imagesAfterRestart, err := imageService.ListImages(nil)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, len(imagesBeforeRestart), len(imagesAfterRestart))
|
||||
for _, i1 := range imagesBeforeRestart {
|
||||
found := false
|
||||
for _, i2 := range imagesAfterRestart {
|
||||
if i1.Id == i2.Id {
|
||||
sort.Strings(i1.RepoTags)
|
||||
sort.Strings(i1.RepoDigests)
|
||||
sort.Strings(i2.RepoTags)
|
||||
sort.Strings(i2.RepoDigests)
|
||||
assert.Equal(t, i1, i2)
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
assert.True(t, found, "should find image %+v", i1)
|
||||
}
|
||||
}
|
||||
|
@ -260,6 +260,26 @@ func Eventually(f CheckFunc, period, timeout time.Duration) error {
|
||||
}
|
||||
}
|
||||
|
||||
// Consistently makes sure that f consistently returns true without
|
||||
// error before timeout exceeds. If f returns error, Consistently
|
||||
// will return the same error immediately.
|
||||
func Consistently(f CheckFunc, period, timeout time.Duration) error {
|
||||
start := time.Now()
|
||||
for {
|
||||
ok, err := f()
|
||||
if !ok {
|
||||
return errors.New("get false")
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if time.Since(start) >= timeout {
|
||||
return nil
|
||||
}
|
||||
time.Sleep(period)
|
||||
}
|
||||
}
|
||||
|
||||
// Randomize adds uuid after a string.
|
||||
func Randomize(str string) string {
|
||||
return str + "-" + util.GenerateID()
|
||||
|
@ -34,7 +34,6 @@ import (
|
||||
ctrdutil "github.com/containerd/cri/pkg/containerd/util"
|
||||
"github.com/containerd/cri/pkg/store"
|
||||
containerstore "github.com/containerd/cri/pkg/store/container"
|
||||
imagestore "github.com/containerd/cri/pkg/store/image"
|
||||
sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
|
||||
)
|
||||
|
||||
@ -54,14 +53,12 @@ const (
|
||||
// eventMonitor monitors containerd event and updates internal state correspondingly.
|
||||
// TODO(random-liu): Handle event for each container in a separate goroutine.
|
||||
type eventMonitor struct {
|
||||
containerStore *containerstore.Store
|
||||
sandboxStore *sandboxstore.Store
|
||||
imageStore *imagestore.Store
|
||||
ch <-chan *events.Envelope
|
||||
errCh <-chan error
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
backOff *backOff
|
||||
c *criService
|
||||
ch <-chan *events.Envelope
|
||||
errCh <-chan error
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
backOff *backOff
|
||||
}
|
||||
|
||||
type backOff struct {
|
||||
@ -84,16 +81,14 @@ type backOffQueue struct {
|
||||
|
||||
// Create new event monitor. New event monitor will start subscribing containerd event. All events
|
||||
// happen after it should be monitored.
|
||||
func newEventMonitor(c *containerstore.Store, s *sandboxstore.Store, i *imagestore.Store) *eventMonitor {
|
||||
func newEventMonitor(c *criService) *eventMonitor {
|
||||
// event subscribe doesn't need namespace.
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &eventMonitor{
|
||||
containerStore: c,
|
||||
sandboxStore: s,
|
||||
imageStore: i,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
backOff: newBackOff(),
|
||||
c: c,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
backOff: newBackOff(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -206,7 +201,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
|
||||
case *eventtypes.TaskExit:
|
||||
e := any.(*eventtypes.TaskExit)
|
||||
logrus.Infof("TaskExit event %+v", e)
|
||||
cntr, err := em.containerStore.Get(e.ContainerID)
|
||||
cntr, err := em.c.containerStore.Get(e.ContainerID)
|
||||
if err == nil {
|
||||
if err := handleContainerExit(ctx, e, cntr); err != nil {
|
||||
return errors.Wrap(err, "failed to handle container TaskExit event")
|
||||
@ -216,7 +211,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
|
||||
return errors.Wrap(err, "can't find container for TaskExit event")
|
||||
}
|
||||
// Use GetAll to include sandbox in unknown state.
|
||||
sb, err := em.sandboxStore.GetAll(e.ContainerID)
|
||||
sb, err := em.c.sandboxStore.GetAll(e.ContainerID)
|
||||
if err == nil {
|
||||
if err := handleSandboxExit(ctx, e, sb); err != nil {
|
||||
return errors.Wrap(err, "failed to handle sandbox TaskExit event")
|
||||
@ -229,12 +224,12 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
|
||||
case *eventtypes.TaskOOM:
|
||||
e := any.(*eventtypes.TaskOOM)
|
||||
logrus.Infof("TaskOOM event %+v", e)
|
||||
cntr, err := em.containerStore.Get(e.ContainerID)
|
||||
cntr, err := em.c.containerStore.Get(e.ContainerID)
|
||||
if err != nil {
|
||||
if err != store.ErrNotExist {
|
||||
return errors.Wrap(err, "can't find container for TaskOOM event")
|
||||
}
|
||||
if _, err = em.sandboxStore.Get(e.ContainerID); err != nil {
|
||||
if _, err = em.c.sandboxStore.Get(e.ContainerID); err != nil {
|
||||
if err != store.ErrNotExist {
|
||||
return errors.Wrap(err, "can't find sandbox for TaskOOM event")
|
||||
}
|
||||
@ -252,15 +247,15 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
|
||||
case *eventtypes.ImageCreate:
|
||||
e := any.(*eventtypes.ImageCreate)
|
||||
logrus.Infof("ImageCreate event %+v", e)
|
||||
return em.imageStore.Update(ctx, e.Name)
|
||||
return em.c.updateImage(ctx, e.Name)
|
||||
case *eventtypes.ImageUpdate:
|
||||
e := any.(*eventtypes.ImageUpdate)
|
||||
logrus.Infof("ImageUpdate event %+v", e)
|
||||
return em.imageStore.Update(ctx, e.Name)
|
||||
return em.c.updateImage(ctx, e.Name)
|
||||
case *eventtypes.ImageDelete:
|
||||
e := any.(*eventtypes.ImageDelete)
|
||||
logrus.Infof("ImageDelete event %+v", e)
|
||||
return em.imageStore.Update(ctx, e.Name)
|
||||
return em.c.updateImage(ctx, e.Name)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -105,6 +105,10 @@ const (
|
||||
containerKindSandbox = "sandbox"
|
||||
// containerKindContainer is a label value indicating container is application container
|
||||
containerKindContainer = "container"
|
||||
// imageLabelKey is the label key indicating the image is managed by cri plugin.
|
||||
imageLabelKey = criContainerdPrefix + ".image"
|
||||
// imageLabelValue is the label value indicating the image is managed by cri plugin.
|
||||
imageLabelValue = "managed"
|
||||
// sandboxMetadataExtension is an extension name that identify metadata of sandbox in CreateContainerRequest
|
||||
sandboxMetadataExtension = criContainerdPrefix + ".sandbox.metadata"
|
||||
// containerMetadataExtension is an extension name that identify metadata of container in CreateContainerRequest
|
||||
|
@ -44,8 +44,11 @@ func (c *criService) LoadImage(ctx context.Context, r *api.LoadImageRequest) (*a
|
||||
}
|
||||
for _, repoTag := range repoTags {
|
||||
// Update image store to reflect the newest state in containerd.
|
||||
if err := c.imageStore.Update(ctx, repoTag); err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to update image store %q", repoTag)
|
||||
// Image imported by importer.Import is not treated as managed
|
||||
// by the cri plugin, call `updateImage` to make it managed.
|
||||
// TODO(random-liu): Replace this with the containerd library (issue #909).
|
||||
if err := c.updateImage(ctx, repoTag); err != nil {
|
||||
return nil, errors.Wrapf(err, "update image store %q", repoTag)
|
||||
}
|
||||
logrus.Debugf("Imported image %q", repoTag)
|
||||
}
|
||||
|
@ -114,14 +114,16 @@ func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest)
|
||||
imageID := configDesc.Digest.String()
|
||||
|
||||
repoDigest, repoTag := getRepoDigestAndTag(namedRef, image.Target().Digest, isSchema1)
|
||||
for _, r := range []string{repoTag, repoDigest} {
|
||||
for _, r := range []string{imageID, repoTag, repoDigest} {
|
||||
if r == "" {
|
||||
continue
|
||||
}
|
||||
if err := c.createImageReference(ctx, r, image.Target()); err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to update image reference %q", r)
|
||||
return nil, errors.Wrapf(err, "failed to create image reference %q", r)
|
||||
}
|
||||
// Update image store to reflect the newest state in containerd.
|
||||
// No need to use `updateImage`, because the image reference must
|
||||
// have been managed by the cri plugin.
|
||||
if err := c.imageStore.Update(ctx, r); err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to update image store %q", r)
|
||||
}
|
||||
@ -174,20 +176,57 @@ func (c *criService) createImageReference(ctx context.Context, name string, desc
|
||||
img := containerdimages.Image{
|
||||
Name: name,
|
||||
Target: desc,
|
||||
// Add a label to indicate that the image is managed by the cri plugin.
|
||||
Labels: map[string]string{imageLabelKey: imageLabelValue},
|
||||
}
|
||||
// TODO(random-liu): Figure out which is the more performant sequence create then update or
|
||||
// update then create.
|
||||
_, err := c.client.ImageService().Create(ctx, img)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
if !errdefs.IsAlreadyExists(err) {
|
||||
oldImg, err := c.client.ImageService().Create(ctx, img)
|
||||
if err == nil || !errdefs.IsAlreadyExists(err) {
|
||||
return err
|
||||
}
|
||||
_, err = c.client.ImageService().Update(ctx, img, "target")
|
||||
if oldImg.Target.Digest == img.Target.Digest && oldImg.Labels[imageLabelKey] == imageLabelValue {
|
||||
return nil
|
||||
}
|
||||
_, err = c.client.ImageService().Update(ctx, img, "target", "labels")
|
||||
return err
|
||||
}
|
||||
|
||||
// updateImage updates image store to reflect the newest state of an image reference
|
||||
// in containerd. If the reference is not managed by the cri plugin, the function also
|
||||
// generates necessary metadata for the image and make it managed.
|
||||
func (c *criService) updateImage(ctx context.Context, r string) error {
|
||||
img, err := c.client.GetImage(ctx, r)
|
||||
if err != nil && !errdefs.IsNotFound(err) {
|
||||
return errors.Wrap(err, "get image by reference")
|
||||
}
|
||||
if err == nil && img.Labels()[imageLabelKey] != imageLabelValue {
|
||||
// Make sure the image has the image id as its unique
|
||||
// identifier that references the image in its lifetime.
|
||||
configDesc, err := img.Config(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "get image id")
|
||||
}
|
||||
id := configDesc.Digest.String()
|
||||
if err := c.createImageReference(ctx, id, img.Target()); err != nil {
|
||||
return errors.Wrapf(err, "create image id reference %q", id)
|
||||
}
|
||||
if err := c.imageStore.Update(ctx, id); err != nil {
|
||||
return errors.Wrapf(err, "update image store for %q", id)
|
||||
}
|
||||
// The image id is ready, add the label to mark the image as managed.
|
||||
if err := c.createImageReference(ctx, r, img.Target()); err != nil {
|
||||
return errors.Wrap(err, "create managed label")
|
||||
}
|
||||
}
|
||||
// If the image is not found, we should continue updating the cache,
|
||||
// so that the image can be removed from the cache.
|
||||
if err := c.imageStore.Update(ctx, r); err != nil {
|
||||
return errors.Wrapf(err, "update image store for %q", r)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// credentials returns a credential function for docker resolver to use.
|
||||
func (c *criService) credentials(auth *runtime.AuthConfig) func(string) (string, string, error) {
|
||||
return func(host string) (string, string, error) {
|
||||
|
@ -36,7 +36,6 @@ import (
|
||||
|
||||
cio "github.com/containerd/cri/pkg/server/io"
|
||||
containerstore "github.com/containerd/cri/pkg/store/container"
|
||||
imagestore "github.com/containerd/cri/pkg/store/image"
|
||||
sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
|
||||
)
|
||||
|
||||
@ -96,7 +95,7 @@ func (c *criService) recover(ctx context.Context) error {
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to list images")
|
||||
}
|
||||
loadImages(ctx, c.imageStore, cImages, c.config.ContainerdConfig.Snapshotter)
|
||||
c.loadImages(ctx, cImages)
|
||||
|
||||
// It's possible that containerd containers are deleted unexpectedly. In that case,
|
||||
// we can't even get metadata, we should cleanup orphaned sandbox/container directories
|
||||
@ -411,8 +410,8 @@ func loadSandbox(ctx context.Context, cntr containerd.Container) (sandboxstore.S
|
||||
}
|
||||
|
||||
// loadImages loads images from containerd.
|
||||
func loadImages(ctx context.Context, store *imagestore.Store, cImages []containerd.Image,
|
||||
snapshotter string) {
|
||||
func (c *criService) loadImages(ctx context.Context, cImages []containerd.Image) {
|
||||
snapshotter := c.config.ContainerdConfig.Snapshotter
|
||||
for _, i := range cImages {
|
||||
ok, _, _, _, err := containerdimages.Check(ctx, i.ContentStore(), i.Target(), platforms.Default())
|
||||
if err != nil {
|
||||
@ -433,7 +432,7 @@ func loadImages(ctx context.Context, store *imagestore.Store, cImages []containe
|
||||
logrus.Warnf("The image %s is not unpacked.", i.Name())
|
||||
// TODO(random-liu): Consider whether we should try unpack here.
|
||||
}
|
||||
if err := store.Update(ctx, i.Name()); err != nil {
|
||||
if err := c.updateImage(ctx, i.Name()); err != nil {
|
||||
logrus.WithError(err).Warnf("Failed to update reference for image %q", i.Name())
|
||||
continue
|
||||
}
|
||||
|
@ -157,7 +157,7 @@ func NewCRIService(config criconfig.Config, client *containerd.Client) (CRIServi
|
||||
return nil, errors.Wrap(err, "failed to create stream server")
|
||||
}
|
||||
|
||||
c.eventMonitor = newEventMonitor(c.containerStore, c.sandboxStore, c.imageStore)
|
||||
c.eventMonitor = newEventMonitor(c)
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user