Create image reference cache.

Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
Lantao Liu
2018-07-26 08:05:44 +00:00
parent cfdf872493
commit 953d67d250
22 changed files with 640 additions and 456 deletions

View File

@@ -34,6 +34,7 @@ import (
ctrdutil "github.com/containerd/cri/pkg/containerd/util"
"github.com/containerd/cri/pkg/store"
containerstore "github.com/containerd/cri/pkg/store/container"
imagestore "github.com/containerd/cri/pkg/store/image"
sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
)
@@ -49,6 +50,7 @@ const (
type eventMonitor struct {
containerStore *containerstore.Store
sandboxStore *sandboxstore.Store
imageStore *imagestore.Store
ch <-chan *events.Envelope
errCh <-chan error
ctx context.Context
@@ -76,12 +78,13 @@ type backOffQueue struct {
// Create new event monitor. New event monitor will start subscribing containerd event. All events
// happen after it should be monitored.
func newEventMonitor(c *containerstore.Store, s *sandboxstore.Store) *eventMonitor {
func newEventMonitor(c *containerstore.Store, s *sandboxstore.Store, i *imagestore.Store) *eventMonitor {
// event subscribe doesn't need namespace.
ctx, cancel := context.WithCancel(context.Background())
return &eventMonitor{
containerStore: c,
sandboxStore: s,
imageStore: i,
ctx: ctx,
cancel: cancel,
backOff: newBackOff(),
@@ -93,12 +96,13 @@ func (em *eventMonitor) subscribe(subscriber events.Subscriber) {
filters := []string{
`topic=="/tasks/exit"`,
`topic=="/tasks/oom"`,
`topic~="/images/"`,
}
em.ch, em.errCh = subscriber.Subscribe(em.ctx, filters...)
}
func convertEvent(e *gogotypes.Any) (string, interface{}, error) {
containerID := ""
id := ""
evt, err := typeurl.UnmarshalAny(e)
if err != nil {
return "", nil, errors.Wrap(err, "failed to unmarshalany")
@@ -106,16 +110,22 @@ func convertEvent(e *gogotypes.Any) (string, interface{}, error) {
switch evt.(type) {
case *eventtypes.TaskExit:
containerID = evt.(*eventtypes.TaskExit).ContainerID
id = evt.(*eventtypes.TaskExit).ContainerID
case *eventtypes.TaskOOM:
containerID = evt.(*eventtypes.TaskOOM).ContainerID
id = evt.(*eventtypes.TaskOOM).ContainerID
case *eventtypes.ImageCreate:
id = evt.(*eventtypes.ImageCreate).Name
case *eventtypes.ImageUpdate:
id = evt.(*eventtypes.ImageUpdate).Name
case *eventtypes.ImageDelete:
id = evt.(*eventtypes.ImageDelete).Name
default:
return "", nil, errors.New("unsupported event")
}
return containerID, evt, nil
return id, evt, nil
}
// start starts the event monitor which monitors and handles all container events. It returns
// start starts the event monitor which monitors and handles all subscribed events. It returns
// an error channel for the caller to wait for stop errors from the event monitor.
// start must be called after subscribe.
func (em *eventMonitor) start() <-chan error {
@@ -130,19 +140,19 @@ func (em *eventMonitor) start() <-chan error {
select {
case e := <-em.ch:
logrus.Debugf("Received containerd event timestamp - %v, namespace - %q, topic - %q", e.Timestamp, e.Namespace, e.Topic)
cID, evt, err := convertEvent(e.Event)
id, evt, err := convertEvent(e.Event)
if err != nil {
logrus.WithError(err).Errorf("Failed to convert event %+v", e)
break
}
if em.backOff.isInBackOff(cID) {
logrus.Infof("Events for container %q is in backoff, enqueue event %+v", cID, evt)
em.backOff.enBackOff(cID, evt)
if em.backOff.isInBackOff(id) {
logrus.Infof("Events for %q is in backoff, enqueue event %+v", id, evt)
em.backOff.enBackOff(id, evt)
break
}
if err := em.handleEvent(evt); err != nil {
logrus.WithError(err).Errorf("Failed to handle event %+v for container %s", evt, cID)
em.backOff.enBackOff(cID, evt)
logrus.WithError(err).Errorf("Failed to handle event %+v for %s", evt, id)
em.backOff.enBackOff(id, evt)
}
case err := <-em.errCh:
// Close errCh in defer directly if there is no error.
@@ -152,13 +162,13 @@ func (em *eventMonitor) start() <-chan error {
}
return
case <-backOffCheckCh:
cIDs := em.backOff.getExpiredContainers()
for _, cID := range cIDs {
queue := em.backOff.deBackOff(cID)
ids := em.backOff.getExpiredIDs()
for _, id := range ids {
queue := em.backOff.deBackOff(id)
for i, any := range queue.events {
if err := em.handleEvent(any); err != nil {
logrus.WithError(err).Errorf("Failed to handle backOff event %+v for container %s", any, cID)
em.backOff.reBackOff(cID, queue.events[i:], queue.duration)
logrus.WithError(err).Errorf("Failed to handle backOff event %+v for %s", any, id)
em.backOff.reBackOff(id, queue.events[i:], queue.duration)
break
}
}
@@ -230,6 +240,18 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
if err != nil {
return errors.Wrap(err, "failed to update container status for TaskOOM event")
}
case *eventtypes.ImageCreate:
e := any.(*eventtypes.ImageCreate)
logrus.Infof("ImageCreate event %+v", e)
return em.imageStore.Update(ctx, e.Name)
case *eventtypes.ImageUpdate:
e := any.(*eventtypes.ImageUpdate)
logrus.Infof("ImageUpdate event %+v", e)
return em.imageStore.Update(ctx, e.Name)
case *eventtypes.ImageDelete:
e := any.(*eventtypes.ImageDelete)
logrus.Infof("ImageDelete event %+v", e)
return em.imageStore.Update(ctx, e.Name)
}
return nil
@@ -331,14 +353,14 @@ func newBackOff() *backOff {
}
}
func (b *backOff) getExpiredContainers() []string {
var containers []string
for c, q := range b.queuePool {
func (b *backOff) getExpiredIDs() []string {
var ids []string
for id, q := range b.queuePool {
if q.isExpire() {
containers = append(containers, c)
ids = append(ids, id)
}
}
return containers
return ids
}
func (b *backOff) isInBackOff(key string) bool {