From db68300a5aa5cb52b6d5eae58d9b89159da7b1ef Mon Sep 17 00:00:00 2001 From: Lantao Liu Date: Tue, 25 Sep 2018 23:38:26 -0700 Subject: [PATCH] Manage unmanaged images in k8s.io namespace Signed-off-by: Lantao Liu --- pkg/server/events.go | 41 +++++++++++++----------------- pkg/server/helpers.go | 4 +++ pkg/server/image_load.go | 7 +++-- pkg/server/image_pull.go | 55 ++++++++++++++++++++++++++++++++++------ pkg/server/restart.go | 9 +++---- pkg/server/service.go | 2 +- 6 files changed, 79 insertions(+), 39 deletions(-) diff --git a/pkg/server/events.go b/pkg/server/events.go index e6e0321b6..4681000df 100644 --- a/pkg/server/events.go +++ b/pkg/server/events.go @@ -34,7 +34,6 @@ import ( ctrdutil "github.com/containerd/cri/pkg/containerd/util" "github.com/containerd/cri/pkg/store" containerstore "github.com/containerd/cri/pkg/store/container" - imagestore "github.com/containerd/cri/pkg/store/image" sandboxstore "github.com/containerd/cri/pkg/store/sandbox" ) @@ -54,14 +53,12 @@ const ( // eventMonitor monitors containerd event and updates internal state correspondingly. // TODO(random-liu): Handle event for each container in a separate goroutine. type eventMonitor struct { - containerStore *containerstore.Store - sandboxStore *sandboxstore.Store - imageStore *imagestore.Store - ch <-chan *events.Envelope - errCh <-chan error - ctx context.Context - cancel context.CancelFunc - backOff *backOff + c *criService + ch <-chan *events.Envelope + errCh <-chan error + ctx context.Context + cancel context.CancelFunc + backOff *backOff } type backOff struct { @@ -84,16 +81,14 @@ type backOffQueue struct { // Create new event monitor. New event monitor will start subscribing containerd event. All events // happen after it should be monitored. -func newEventMonitor(c *containerstore.Store, s *sandboxstore.Store, i *imagestore.Store) *eventMonitor { +func newEventMonitor(c *criService) *eventMonitor { // event subscribe doesn't need namespace. ctx, cancel := context.WithCancel(context.Background()) return &eventMonitor{ - containerStore: c, - sandboxStore: s, - imageStore: i, - ctx: ctx, - cancel: cancel, - backOff: newBackOff(), + c: c, + ctx: ctx, + cancel: cancel, + backOff: newBackOff(), } } @@ -206,7 +201,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error { case *eventtypes.TaskExit: e := any.(*eventtypes.TaskExit) logrus.Infof("TaskExit event %+v", e) - cntr, err := em.containerStore.Get(e.ContainerID) + cntr, err := em.c.containerStore.Get(e.ContainerID) if err == nil { if err := handleContainerExit(ctx, e, cntr); err != nil { return errors.Wrap(err, "failed to handle container TaskExit event") @@ -216,7 +211,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error { return errors.Wrap(err, "can't find container for TaskExit event") } // Use GetAll to include sandbox in unknown state. - sb, err := em.sandboxStore.GetAll(e.ContainerID) + sb, err := em.c.sandboxStore.GetAll(e.ContainerID) if err == nil { if err := handleSandboxExit(ctx, e, sb); err != nil { return errors.Wrap(err, "failed to handle sandbox TaskExit event") @@ -229,12 +224,12 @@ func (em *eventMonitor) handleEvent(any interface{}) error { case *eventtypes.TaskOOM: e := any.(*eventtypes.TaskOOM) logrus.Infof("TaskOOM event %+v", e) - cntr, err := em.containerStore.Get(e.ContainerID) + cntr, err := em.c.containerStore.Get(e.ContainerID) if err != nil { if err != store.ErrNotExist { return errors.Wrap(err, "can't find container for TaskOOM event") } - if _, err = em.sandboxStore.Get(e.ContainerID); err != nil { + if _, err = em.c.sandboxStore.Get(e.ContainerID); err != nil { if err != store.ErrNotExist { return errors.Wrap(err, "can't find sandbox for TaskOOM event") } @@ -252,15 +247,15 @@ func (em *eventMonitor) handleEvent(any interface{}) error { case *eventtypes.ImageCreate: e := any.(*eventtypes.ImageCreate) logrus.Infof("ImageCreate event %+v", e) - return em.imageStore.Update(ctx, e.Name) + return em.c.updateImage(ctx, e.Name) case *eventtypes.ImageUpdate: e := any.(*eventtypes.ImageUpdate) logrus.Infof("ImageUpdate event %+v", e) - return em.imageStore.Update(ctx, e.Name) + return em.c.updateImage(ctx, e.Name) case *eventtypes.ImageDelete: e := any.(*eventtypes.ImageDelete) logrus.Infof("ImageDelete event %+v", e) - return em.imageStore.Update(ctx, e.Name) + return em.c.updateImage(ctx, e.Name) } return nil diff --git a/pkg/server/helpers.go b/pkg/server/helpers.go index 40e5ac0a5..d11918685 100644 --- a/pkg/server/helpers.go +++ b/pkg/server/helpers.go @@ -105,6 +105,10 @@ const ( containerKindSandbox = "sandbox" // containerKindContainer is a label value indicating container is application container containerKindContainer = "container" + // imageLabelKey is the label key indicating the image is managed by cri plugin. + imageLabelKey = criContainerdPrefix + ".image" + // imageLabelValue is the label value indicating the image is managed by cri plugin. + imageLabelValue = "managed" // sandboxMetadataExtension is an extension name that identify metadata of sandbox in CreateContainerRequest sandboxMetadataExtension = criContainerdPrefix + ".sandbox.metadata" // containerMetadataExtension is an extension name that identify metadata of container in CreateContainerRequest diff --git a/pkg/server/image_load.go b/pkg/server/image_load.go index d3741a021..f286048d3 100644 --- a/pkg/server/image_load.go +++ b/pkg/server/image_load.go @@ -44,8 +44,11 @@ func (c *criService) LoadImage(ctx context.Context, r *api.LoadImageRequest) (*a } for _, repoTag := range repoTags { // Update image store to reflect the newest state in containerd. - if err := c.imageStore.Update(ctx, repoTag); err != nil { - return nil, errors.Wrapf(err, "failed to update image store %q", repoTag) + // Image imported by importer.Import is not treated as managed + // by the cri plugin, call `updateImage` to make it managed. + // TODO(random-liu): Replace this with the containerd library (issue #909). + if err := c.updateImage(ctx, repoTag); err != nil { + return nil, errors.Wrapf(err, "update image store %q", repoTag) } logrus.Debugf("Imported image %q", repoTag) } diff --git a/pkg/server/image_pull.go b/pkg/server/image_pull.go index 48bc4c562..cb66f138a 100644 --- a/pkg/server/image_pull.go +++ b/pkg/server/image_pull.go @@ -114,14 +114,16 @@ func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest) imageID := configDesc.Digest.String() repoDigest, repoTag := getRepoDigestAndTag(namedRef, image.Target().Digest, isSchema1) - for _, r := range []string{repoTag, repoDigest} { + for _, r := range []string{imageID, repoTag, repoDigest} { if r == "" { continue } if err := c.createImageReference(ctx, r, image.Target()); err != nil { - return nil, errors.Wrapf(err, "failed to update image reference %q", r) + return nil, errors.Wrapf(err, "failed to create image reference %q", r) } // Update image store to reflect the newest state in containerd. + // No need to use `updateImage`, because the image reference must + // have been managed by the cri plugin. if err := c.imageStore.Update(ctx, r); err != nil { return nil, errors.Wrapf(err, "failed to update image store %q", r) } @@ -174,20 +176,57 @@ func (c *criService) createImageReference(ctx context.Context, name string, desc img := containerdimages.Image{ Name: name, Target: desc, + // Add a label to indicate that the image is managed by the cri plugin. + Labels: map[string]string{imageLabelKey: imageLabelValue}, } // TODO(random-liu): Figure out which is the more performant sequence create then update or // update then create. - _, err := c.client.ImageService().Create(ctx, img) - if err == nil { - return nil - } - if !errdefs.IsAlreadyExists(err) { + oldImg, err := c.client.ImageService().Create(ctx, img) + if err == nil || !errdefs.IsAlreadyExists(err) { return err } - _, err = c.client.ImageService().Update(ctx, img, "target") + if oldImg.Target.Digest == img.Target.Digest && oldImg.Labels[imageLabelKey] == imageLabelValue { + return nil + } + _, err = c.client.ImageService().Update(ctx, img, "target", "labels") return err } +// updateImage updates image store to reflect the newest state of an image reference +// in containerd. If the reference is not managed by the cri plugin, the function also +// generates necessary metadata for the image and make it managed. +func (c *criService) updateImage(ctx context.Context, r string) error { + img, err := c.client.GetImage(ctx, r) + if err != nil && !errdefs.IsNotFound(err) { + return errors.Wrap(err, "get image by reference") + } + if err == nil && img.Labels()[imageLabelKey] != imageLabelValue { + // Make sure the image has the image id as its unique + // identifier that references the image in its lifetime. + configDesc, err := img.Config(ctx) + if err != nil { + return errors.Wrap(err, "get image id") + } + id := configDesc.Digest.String() + if err := c.createImageReference(ctx, id, img.Target()); err != nil { + return errors.Wrapf(err, "create image id reference %q", id) + } + if err := c.imageStore.Update(ctx, id); err != nil { + return errors.Wrapf(err, "update image store for %q", id) + } + // The image id is ready, add the label to mark the image as managed. + if err := c.createImageReference(ctx, r, img.Target()); err != nil { + return errors.Wrap(err, "create managed label") + } + } + // If the image is not found, we should continue updating the cache, + // so that the image can be removed from the cache. + if err := c.imageStore.Update(ctx, r); err != nil { + return errors.Wrapf(err, "update image store for %q", r) + } + return nil +} + // credentials returns a credential function for docker resolver to use. func (c *criService) credentials(auth *runtime.AuthConfig) func(string) (string, string, error) { return func(host string) (string, string, error) { diff --git a/pkg/server/restart.go b/pkg/server/restart.go index bd25d152a..18d8d60c6 100644 --- a/pkg/server/restart.go +++ b/pkg/server/restart.go @@ -36,7 +36,6 @@ import ( cio "github.com/containerd/cri/pkg/server/io" containerstore "github.com/containerd/cri/pkg/store/container" - imagestore "github.com/containerd/cri/pkg/store/image" sandboxstore "github.com/containerd/cri/pkg/store/sandbox" ) @@ -96,7 +95,7 @@ func (c *criService) recover(ctx context.Context) error { if err != nil { return errors.Wrap(err, "failed to list images") } - loadImages(ctx, c.imageStore, cImages, c.config.ContainerdConfig.Snapshotter) + c.loadImages(ctx, cImages) // It's possible that containerd containers are deleted unexpectedly. In that case, // we can't even get metadata, we should cleanup orphaned sandbox/container directories @@ -411,8 +410,8 @@ func loadSandbox(ctx context.Context, cntr containerd.Container) (sandboxstore.S } // loadImages loads images from containerd. -func loadImages(ctx context.Context, store *imagestore.Store, cImages []containerd.Image, - snapshotter string) { +func (c *criService) loadImages(ctx context.Context, cImages []containerd.Image) { + snapshotter := c.config.ContainerdConfig.Snapshotter for _, i := range cImages { ok, _, _, _, err := containerdimages.Check(ctx, i.ContentStore(), i.Target(), platforms.Default()) if err != nil { @@ -433,7 +432,7 @@ func loadImages(ctx context.Context, store *imagestore.Store, cImages []containe logrus.Warnf("The image %s is not unpacked.", i.Name()) // TODO(random-liu): Consider whether we should try unpack here. } - if err := store.Update(ctx, i.Name()); err != nil { + if err := c.updateImage(ctx, i.Name()); err != nil { logrus.WithError(err).Warnf("Failed to update reference for image %q", i.Name()) continue } diff --git a/pkg/server/service.go b/pkg/server/service.go index bf9f73e66..ce648ae52 100644 --- a/pkg/server/service.go +++ b/pkg/server/service.go @@ -157,7 +157,7 @@ func NewCRIService(config criconfig.Config, client *containerd.Client) (CRIServi return nil, errors.Wrap(err, "failed to create stream server") } - c.eventMonitor = newEventMonitor(c.containerStore, c.sandboxStore, c.imageStore) + c.eventMonitor = newEventMonitor(c) return c, nil }