Manage unmanaged images in k8s.io namespace

Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
Lantao Liu 2018-09-25 23:38:26 -07:00
parent 4d553cbef8
commit db68300a5a
6 changed files with 79 additions and 39 deletions

View File

@ -34,7 +34,6 @@ import (
ctrdutil "github.com/containerd/cri/pkg/containerd/util" ctrdutil "github.com/containerd/cri/pkg/containerd/util"
"github.com/containerd/cri/pkg/store" "github.com/containerd/cri/pkg/store"
containerstore "github.com/containerd/cri/pkg/store/container" containerstore "github.com/containerd/cri/pkg/store/container"
imagestore "github.com/containerd/cri/pkg/store/image"
sandboxstore "github.com/containerd/cri/pkg/store/sandbox" sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
) )
@ -54,9 +53,7 @@ const (
// eventMonitor monitors containerd event and updates internal state correspondingly. // eventMonitor monitors containerd event and updates internal state correspondingly.
// TODO(random-liu): Handle event for each container in a separate goroutine. // TODO(random-liu): Handle event for each container in a separate goroutine.
type eventMonitor struct { type eventMonitor struct {
containerStore *containerstore.Store c *criService
sandboxStore *sandboxstore.Store
imageStore *imagestore.Store
ch <-chan *events.Envelope ch <-chan *events.Envelope
errCh <-chan error errCh <-chan error
ctx context.Context ctx context.Context
@ -84,13 +81,11 @@ type backOffQueue struct {
// Create new event monitor. New event monitor will start subscribing containerd event. All events // Create new event monitor. New event monitor will start subscribing containerd event. All events
// happen after it should be monitored. // happen after it should be monitored.
func newEventMonitor(c *containerstore.Store, s *sandboxstore.Store, i *imagestore.Store) *eventMonitor { func newEventMonitor(c *criService) *eventMonitor {
// event subscribe doesn't need namespace. // event subscribe doesn't need namespace.
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
return &eventMonitor{ return &eventMonitor{
containerStore: c, c: c,
sandboxStore: s,
imageStore: i,
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
backOff: newBackOff(), backOff: newBackOff(),
@ -206,7 +201,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
case *eventtypes.TaskExit: case *eventtypes.TaskExit:
e := any.(*eventtypes.TaskExit) e := any.(*eventtypes.TaskExit)
logrus.Infof("TaskExit event %+v", e) logrus.Infof("TaskExit event %+v", e)
cntr, err := em.containerStore.Get(e.ContainerID) cntr, err := em.c.containerStore.Get(e.ContainerID)
if err == nil { if err == nil {
if err := handleContainerExit(ctx, e, cntr); err != nil { if err := handleContainerExit(ctx, e, cntr); err != nil {
return errors.Wrap(err, "failed to handle container TaskExit event") return errors.Wrap(err, "failed to handle container TaskExit event")
@ -216,7 +211,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
return errors.Wrap(err, "can't find container for TaskExit event") return errors.Wrap(err, "can't find container for TaskExit event")
} }
// Use GetAll to include sandbox in unknown state. // Use GetAll to include sandbox in unknown state.
sb, err := em.sandboxStore.GetAll(e.ContainerID) sb, err := em.c.sandboxStore.GetAll(e.ContainerID)
if err == nil { if err == nil {
if err := handleSandboxExit(ctx, e, sb); err != nil { if err := handleSandboxExit(ctx, e, sb); err != nil {
return errors.Wrap(err, "failed to handle sandbox TaskExit event") return errors.Wrap(err, "failed to handle sandbox TaskExit event")
@ -229,12 +224,12 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
case *eventtypes.TaskOOM: case *eventtypes.TaskOOM:
e := any.(*eventtypes.TaskOOM) e := any.(*eventtypes.TaskOOM)
logrus.Infof("TaskOOM event %+v", e) logrus.Infof("TaskOOM event %+v", e)
cntr, err := em.containerStore.Get(e.ContainerID) cntr, err := em.c.containerStore.Get(e.ContainerID)
if err != nil { if err != nil {
if err != store.ErrNotExist { if err != store.ErrNotExist {
return errors.Wrap(err, "can't find container for TaskOOM event") return errors.Wrap(err, "can't find container for TaskOOM event")
} }
if _, err = em.sandboxStore.Get(e.ContainerID); err != nil { if _, err = em.c.sandboxStore.Get(e.ContainerID); err != nil {
if err != store.ErrNotExist { if err != store.ErrNotExist {
return errors.Wrap(err, "can't find sandbox for TaskOOM event") return errors.Wrap(err, "can't find sandbox for TaskOOM event")
} }
@ -252,15 +247,15 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
case *eventtypes.ImageCreate: case *eventtypes.ImageCreate:
e := any.(*eventtypes.ImageCreate) e := any.(*eventtypes.ImageCreate)
logrus.Infof("ImageCreate event %+v", e) logrus.Infof("ImageCreate event %+v", e)
return em.imageStore.Update(ctx, e.Name) return em.c.updateImage(ctx, e.Name)
case *eventtypes.ImageUpdate: case *eventtypes.ImageUpdate:
e := any.(*eventtypes.ImageUpdate) e := any.(*eventtypes.ImageUpdate)
logrus.Infof("ImageUpdate event %+v", e) logrus.Infof("ImageUpdate event %+v", e)
return em.imageStore.Update(ctx, e.Name) return em.c.updateImage(ctx, e.Name)
case *eventtypes.ImageDelete: case *eventtypes.ImageDelete:
e := any.(*eventtypes.ImageDelete) e := any.(*eventtypes.ImageDelete)
logrus.Infof("ImageDelete event %+v", e) logrus.Infof("ImageDelete event %+v", e)
return em.imageStore.Update(ctx, e.Name) return em.c.updateImage(ctx, e.Name)
} }
return nil return nil

View File

@ -105,6 +105,10 @@ const (
containerKindSandbox = "sandbox" containerKindSandbox = "sandbox"
// containerKindContainer is a label value indicating container is application container // containerKindContainer is a label value indicating container is application container
containerKindContainer = "container" containerKindContainer = "container"
// imageLabelKey is the label key indicating the image is managed by cri plugin.
imageLabelKey = criContainerdPrefix + ".image"
// imageLabelValue is the label value indicating the image is managed by cri plugin.
imageLabelValue = "managed"
// sandboxMetadataExtension is an extension name that identify metadata of sandbox in CreateContainerRequest // sandboxMetadataExtension is an extension name that identify metadata of sandbox in CreateContainerRequest
sandboxMetadataExtension = criContainerdPrefix + ".sandbox.metadata" sandboxMetadataExtension = criContainerdPrefix + ".sandbox.metadata"
// containerMetadataExtension is an extension name that identify metadata of container in CreateContainerRequest // containerMetadataExtension is an extension name that identify metadata of container in CreateContainerRequest

View File

@ -44,8 +44,11 @@ func (c *criService) LoadImage(ctx context.Context, r *api.LoadImageRequest) (*a
} }
for _, repoTag := range repoTags { for _, repoTag := range repoTags {
// Update image store to reflect the newest state in containerd. // Update image store to reflect the newest state in containerd.
if err := c.imageStore.Update(ctx, repoTag); err != nil { // Image imported by importer.Import is not treated as managed
return nil, errors.Wrapf(err, "failed to update image store %q", repoTag) // by the cri plugin, call `updateImage` to make it managed.
// TODO(random-liu): Replace this with the containerd library (issue #909).
if err := c.updateImage(ctx, repoTag); err != nil {
return nil, errors.Wrapf(err, "update image store %q", repoTag)
} }
logrus.Debugf("Imported image %q", repoTag) logrus.Debugf("Imported image %q", repoTag)
} }

View File

@ -114,14 +114,16 @@ func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest)
imageID := configDesc.Digest.String() imageID := configDesc.Digest.String()
repoDigest, repoTag := getRepoDigestAndTag(namedRef, image.Target().Digest, isSchema1) repoDigest, repoTag := getRepoDigestAndTag(namedRef, image.Target().Digest, isSchema1)
for _, r := range []string{repoTag, repoDigest} { for _, r := range []string{imageID, repoTag, repoDigest} {
if r == "" { if r == "" {
continue continue
} }
if err := c.createImageReference(ctx, r, image.Target()); err != nil { if err := c.createImageReference(ctx, r, image.Target()); err != nil {
return nil, errors.Wrapf(err, "failed to update image reference %q", r) return nil, errors.Wrapf(err, "failed to create image reference %q", r)
} }
// Update image store to reflect the newest state in containerd. // Update image store to reflect the newest state in containerd.
// No need to use `updateImage`, because the image reference must
// have been managed by the cri plugin.
if err := c.imageStore.Update(ctx, r); err != nil { if err := c.imageStore.Update(ctx, r); err != nil {
return nil, errors.Wrapf(err, "failed to update image store %q", r) return nil, errors.Wrapf(err, "failed to update image store %q", r)
} }
@ -174,18 +176,55 @@ func (c *criService) createImageReference(ctx context.Context, name string, desc
img := containerdimages.Image{ img := containerdimages.Image{
Name: name, Name: name,
Target: desc, Target: desc,
// Add a label to indicate that the image is managed by the cri plugin.
Labels: map[string]string{imageLabelKey: imageLabelValue},
} }
// TODO(random-liu): Figure out which is the more performant sequence create then update or // TODO(random-liu): Figure out which is the more performant sequence create then update or
// update then create. // update then create.
_, err := c.client.ImageService().Create(ctx, img) oldImg, err := c.client.ImageService().Create(ctx, img)
if err == nil { if err == nil || !errdefs.IsAlreadyExists(err) {
return err
}
if oldImg.Target.Digest == img.Target.Digest && oldImg.Labels[imageLabelKey] == imageLabelValue {
return nil return nil
} }
if !errdefs.IsAlreadyExists(err) { _, err = c.client.ImageService().Update(ctx, img, "target", "labels")
return err return err
}
// updateImage updates image store to reflect the newest state of an image reference
// in containerd. If the reference is not managed by the cri plugin, the function also
// generates necessary metadata for the image and make it managed.
func (c *criService) updateImage(ctx context.Context, r string) error {
img, err := c.client.GetImage(ctx, r)
if err != nil && !errdefs.IsNotFound(err) {
return errors.Wrap(err, "get image by reference")
} }
_, err = c.client.ImageService().Update(ctx, img, "target") if err == nil && img.Labels()[imageLabelKey] != imageLabelValue {
return err // Make sure the image has the image id as its unique
// identifier that references the image in its lifetime.
configDesc, err := img.Config(ctx)
if err != nil {
return errors.Wrap(err, "get image id")
}
id := configDesc.Digest.String()
if err := c.createImageReference(ctx, id, img.Target()); err != nil {
return errors.Wrapf(err, "create image id reference %q", id)
}
if err := c.imageStore.Update(ctx, id); err != nil {
return errors.Wrapf(err, "update image store for %q", id)
}
// The image id is ready, add the label to mark the image as managed.
if err := c.createImageReference(ctx, r, img.Target()); err != nil {
return errors.Wrap(err, "create managed label")
}
}
// If the image is not found, we should continue updating the cache,
// so that the image can be removed from the cache.
if err := c.imageStore.Update(ctx, r); err != nil {
return errors.Wrapf(err, "update image store for %q", r)
}
return nil
} }
// credentials returns a credential function for docker resolver to use. // credentials returns a credential function for docker resolver to use.

View File

@ -36,7 +36,6 @@ import (
cio "github.com/containerd/cri/pkg/server/io" cio "github.com/containerd/cri/pkg/server/io"
containerstore "github.com/containerd/cri/pkg/store/container" containerstore "github.com/containerd/cri/pkg/store/container"
imagestore "github.com/containerd/cri/pkg/store/image"
sandboxstore "github.com/containerd/cri/pkg/store/sandbox" sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
) )
@ -96,7 +95,7 @@ func (c *criService) recover(ctx context.Context) error {
if err != nil { if err != nil {
return errors.Wrap(err, "failed to list images") return errors.Wrap(err, "failed to list images")
} }
loadImages(ctx, c.imageStore, cImages, c.config.ContainerdConfig.Snapshotter) c.loadImages(ctx, cImages)
// It's possible that containerd containers are deleted unexpectedly. In that case, // It's possible that containerd containers are deleted unexpectedly. In that case,
// we can't even get metadata, we should cleanup orphaned sandbox/container directories // we can't even get metadata, we should cleanup orphaned sandbox/container directories
@ -411,8 +410,8 @@ func loadSandbox(ctx context.Context, cntr containerd.Container) (sandboxstore.S
} }
// loadImages loads images from containerd. // loadImages loads images from containerd.
func loadImages(ctx context.Context, store *imagestore.Store, cImages []containerd.Image, func (c *criService) loadImages(ctx context.Context, cImages []containerd.Image) {
snapshotter string) { snapshotter := c.config.ContainerdConfig.Snapshotter
for _, i := range cImages { for _, i := range cImages {
ok, _, _, _, err := containerdimages.Check(ctx, i.ContentStore(), i.Target(), platforms.Default()) ok, _, _, _, err := containerdimages.Check(ctx, i.ContentStore(), i.Target(), platforms.Default())
if err != nil { if err != nil {
@ -433,7 +432,7 @@ func loadImages(ctx context.Context, store *imagestore.Store, cImages []containe
logrus.Warnf("The image %s is not unpacked.", i.Name()) logrus.Warnf("The image %s is not unpacked.", i.Name())
// TODO(random-liu): Consider whether we should try unpack here. // TODO(random-liu): Consider whether we should try unpack here.
} }
if err := store.Update(ctx, i.Name()); err != nil { if err := c.updateImage(ctx, i.Name()); err != nil {
logrus.WithError(err).Warnf("Failed to update reference for image %q", i.Name()) logrus.WithError(err).Warnf("Failed to update reference for image %q", i.Name())
continue continue
} }

View File

@ -157,7 +157,7 @@ func NewCRIService(config criconfig.Config, client *containerd.Client) (CRIServi
return nil, errors.Wrap(err, "failed to create stream server") return nil, errors.Wrap(err, "failed to create stream server")
} }
c.eventMonitor = newEventMonitor(c.containerStore, c.sandboxStore, c.imageStore) c.eventMonitor = newEventMonitor(c)
return c, nil return c, nil
} }