Merge pull request #628 from yanxuean/nits

Handle containerd event reliably
This commit is contained in:
Lantao Liu 2018-03-15 11:35:59 -07:00 committed by GitHub
commit eff311d493
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 327 additions and 47 deletions

View File

@ -47,7 +47,7 @@ type OS interface {
// RealOS is used to dispatch the real system level operations.
type RealOS struct{}
// MkdirAll will will call os.MkdirAll to create a directory.
// MkdirAll will call os.MkdirAll to create a directory.
func (RealOS) MkdirAll(path string, perm os.FileMode) error {
return os.MkdirAll(path, perm)
}
@ -79,7 +79,7 @@ func (RealOS) ResolveSymbolicLink(path string) (string, error) {
return filepath.EvalSymlinks(path)
}
// CopyFile copys src file to dest file
// CopyFile will copy src file to dest file
func (RealOS) CopyFile(src, dest string, perm os.FileMode) error {
in, err := os.Open(src)
if err != nil {

View File

@ -18,14 +18,18 @@ package server
import (
"errors"
"fmt"
"time"
eventtypes "github.com/containerd/containerd/api/events"
containerdio "github.com/containerd/containerd/cio"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/typeurl"
gogotypes "github.com/gogo/protobuf/types"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
"k8s.io/apimachinery/pkg/util/clock"
ctrdutil "github.com/containerd/cri/pkg/containerd/util"
"github.com/containerd/cri/pkg/store"
@ -33,6 +37,12 @@ import (
sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
)
const (
backOffInitDuration = 1 * time.Second
backOffMaxDuration = 5 * time.Minute
backOffExpireCheckDuration = 1 * time.Second
)
// eventMonitor monitors containerd event and updates internal state correspondingly.
// TODO(random-liu): [P1] Figure out is it possible to drop event during containerd
// is running. If it is, we should do periodically list to sync state with containerd.
@ -43,6 +53,23 @@ type eventMonitor struct {
errCh <-chan error
ctx context.Context
cancel context.CancelFunc
backOff *backOff
}
type backOff struct {
queuePool map[string]*backOffQueue
ticker *time.Ticker
minDuration time.Duration
maxDuration time.Duration
checkDuration time.Duration
clock clock.Clock
}
type backOffQueue struct {
events []interface{}
expireTime time.Time
duration time.Duration
clock clock.Clock
}
// Create new event monitor. New event monitor will start subscribing containerd event. All events
@ -55,6 +82,8 @@ func newEventMonitor(c *containerstore.Store, s *sandboxstore.Store) *eventMonit
sandboxStore: s,
ctx: ctx,
cancel: cancel,
backOff: newBackOff(backOffInitDuration, backOffMaxDuration,
backOffExpireCheckDuration, clock.RealClock{}),
}
}
@ -67,6 +96,24 @@ func (em *eventMonitor) subscribe(subscriber events.Subscriber) {
em.ch, em.errCh = subscriber.Subscribe(em.ctx, filters...)
}
func convertEvent(e *gogotypes.Any) (string, interface{}, error) {
containerID := ""
evt, err := typeurl.UnmarshalAny(e)
if err != nil {
return "", nil, fmt.Errorf("failed to unmarshalany %+v", err)
}
switch evt.(type) {
case *eventtypes.TaskExit:
containerID = evt.(*eventtypes.TaskExit).ContainerID
case *eventtypes.TaskOOM:
containerID = evt.(*eventtypes.TaskOOM).ContainerID
default:
return "", nil, fmt.Errorf("unsupported event")
}
return containerID, evt, nil
}
// start starts the event monitor which monitors and handles all container events. It returns
// a channel for the caller to wait for the event monitor to stop. start must be called after
// subscribe.
@ -76,15 +123,40 @@ func (em *eventMonitor) start() (<-chan struct{}, error) {
}
closeCh := make(chan struct{})
go func() {
backOffCheckCh := em.backOff.start()
for {
select {
case e := <-em.ch:
logrus.Debugf("Received containerd event timestamp - %v, namespace - %q, topic - %q", e.Timestamp, e.Namespace, e.Topic)
em.handleEvent(e)
cID, evt, err := convertEvent(e.Event)
if err != nil {
logrus.WithError(err).Errorf("Failed to convert event %+v", e)
break
}
if em.backOff.isInBackOff(cID) {
em.backOff.enBackOff(cID, evt)
break
}
if err := em.handleEvent(evt); err != nil {
logrus.WithError(err).Errorf("Failed to handle event %+v for container %s", evt, cID)
em.backOff.enBackOff(cID, evt)
}
case err := <-em.errCh:
logrus.WithError(err).Error("Failed to handle event stream")
close(closeCh)
return
case <-backOffCheckCh:
cIDs := em.backOff.getExpiredContainers()
for _, cID := range cIDs {
queue := em.backOff.deBackOff(cID)
for i, any := range queue.events {
if err := em.handleEvent(any); err != nil {
logrus.WithError(err).Errorf("Failed to handle backOff event %+v for container %s", any, cID)
em.backOff.reBackOff(cID, queue.events[i:], queue.duration)
break
}
}
}
}
}
}()
@ -94,17 +166,13 @@ func (em *eventMonitor) start() (<-chan struct{}, error) {
// stop stops the event monitor. It will close the event channel.
// Once event monitor is stopped, it can't be started.
func (em *eventMonitor) stop() {
em.backOff.stop()
em.cancel()
}
// handleEvent handles a containerd event.
func (em *eventMonitor) handleEvent(evt *events.Envelope) {
func (em *eventMonitor) handleEvent(any interface{}) error {
ctx := ctrdutil.NamespacedContext()
any, err := typeurl.UnmarshalAny(evt.Event)
if err != nil {
logrus.WithError(err).Errorf("Failed to convert event envelope %+v", evt)
return
}
switch any.(type) {
// If containerd-shim exits unexpectedly, there will be no corresponding event.
// However, containerd could not retrieve container state in that case, so it's
@ -112,51 +180,59 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) {
// TODO(random-liu): [P2] Handle containerd-shim exit.
case *eventtypes.TaskExit:
e := any.(*eventtypes.TaskExit)
logrus.Infof("TaskExit event %+v", e)
cntr, err := em.containerStore.Get(e.ContainerID)
if err == nil {
handleContainerExit(ctx, e, cntr)
return
if err := handleContainerExit(ctx, e, cntr); err != nil {
return fmt.Errorf("failed to handle container TaskExit event:%+v", err)
}
return nil
} else if err != store.ErrNotExist {
logrus.WithError(err).Errorf("Failed to get container %q", e.ContainerID)
return
return fmt.Errorf("can't find container for TaskExit event:%+v", err)
}
// Use GetAll to include sandbox in unknown state.
sb, err := em.sandboxStore.GetAll(e.ContainerID)
if err == nil {
handleSandboxExit(ctx, e, sb)
return
} else if err != store.ErrNotExist {
logrus.WithError(err).Errorf("Failed to get sandbox %q", e.ContainerID)
return
if err := handleSandboxExit(ctx, e, sb); err != nil {
return fmt.Errorf("failed to handle sandbox TaskExit event:%+v", err)
}
return nil
} else if err != store.ErrNotExist {
return fmt.Errorf("can't find sandbox for TaskExit event:%+v", err)
}
return nil
case *eventtypes.TaskOOM:
e := any.(*eventtypes.TaskOOM)
logrus.Infof("TaskOOM event %+v", e)
cntr, err := em.containerStore.Get(e.ContainerID)
if err != nil {
if _, err := em.sandboxStore.Get(e.ContainerID); err == nil {
return
if err != store.ErrNotExist {
return fmt.Errorf("can't find container for TaskOOM event:%+v", err)
}
logrus.WithError(err).Errorf("Failed to get container %q", e.ContainerID)
return
if _, err = em.sandboxStore.Get(e.ContainerID); err != nil {
if err != store.ErrNotExist {
return fmt.Errorf("can't find sandbox for TaskOOM event:%+v", err)
}
return nil
}
return nil
}
err = cntr.Status.UpdateSync(func(status containerstore.Status) (containerstore.Status, error) {
status.Reason = oomExitReason
return status, nil
})
if err != nil {
logrus.WithError(err).Errorf("Failed to update container %q oom", e.ContainerID)
return
return fmt.Errorf("failed to update container status for TaskOOM event:%+v", err)
}
}
return nil
}
// handleContainerExit handles TaskExit event for container.
func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container) {
func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container) error {
if e.Pid != cntr.Status.Get().Pid {
// Non-init process died, ignore the event.
return
return nil
}
// Attach container IO so that `Delete` could cleanup the stream properly.
task, err := cntr.Container.Task(ctx,
@ -166,16 +242,13 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta
)
if err != nil {
if !errdefs.IsNotFound(err) {
logrus.WithError(err).Errorf("failed to load task for container %q", e.ContainerID)
return
return fmt.Errorf("failed to load task for container: %v", err)
}
} else {
// TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker
if _, err = task.Delete(ctx); err != nil {
// TODO(random-liu): [P0] Enqueue the event and retry.
if !errdefs.IsNotFound(err) {
logrus.WithError(err).Errorf("failed to stop container %q", e.ContainerID)
return
return fmt.Errorf("failed to stop container: %v", err)
}
// Move on to make sure container status is updated.
}
@ -192,34 +265,30 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta
return status, nil
})
if err != nil {
logrus.WithError(err).Errorf("Failed to update container %q state", e.ContainerID)
// TODO(random-liu): [P0] Enqueue the event and retry.
return
return fmt.Errorf("failed to update container state: %v", err)
}
// Using channel to propagate the information of container stop
cntr.Stop()
return nil
}
// handleSandboxExit handles TaskExit event for sandbox.
func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxstore.Sandbox) {
func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxstore.Sandbox) error {
if e.Pid != sb.Status.Get().Pid {
// Non-init process died, ignore the event.
return
return nil
}
// No stream attached to sandbox container.
task, err := sb.Container.Task(ctx, nil)
if err != nil {
if !errdefs.IsNotFound(err) {
logrus.WithError(err).Errorf("failed to load task for sandbox %q", e.ContainerID)
return
return fmt.Errorf("failed to load task for sandbox: %v", err)
}
} else {
// TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker
if _, err = task.Delete(ctx); err != nil {
// TODO(random-liu): [P0] Enqueue the event and retry.
if !errdefs.IsNotFound(err) {
logrus.WithError(err).Errorf("failed to stop sandbox %q", e.ContainerID)
return
return fmt.Errorf("failed to stop sandbox: %v", err)
}
// Move on to make sure container status is updated.
}
@ -238,10 +307,84 @@ func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxst
return status, nil
})
if err != nil {
logrus.WithError(err).Errorf("Failed to update sandbox %q state", e.ContainerID)
// TODO(random-liu): [P0] Enqueue the event and retry.
return
return fmt.Errorf("failed to update sandbox state: %v", err)
}
// Using channel to propagate the information of sandbox stop
sb.Stop()
return nil
}
func newBackOff(min, max, check time.Duration, c clock.Clock) *backOff {
return &backOff{
queuePool: map[string]*backOffQueue{},
minDuration: min,
maxDuration: max,
checkDuration: check,
clock: c,
}
}
func (b *backOff) getExpiredContainers() []string {
var containers []string
for c, q := range b.queuePool {
if q.isExpire() {
containers = append(containers, c)
}
}
return containers
}
func (b *backOff) isInBackOff(key string) bool {
if _, ok := b.queuePool[key]; ok {
return true
}
return false
}
// enBackOff start to backOff and put event to the tail of queue
func (b *backOff) enBackOff(key string, evt interface{}) {
if queue, ok := b.queuePool[key]; ok {
queue.events = append(queue.events, evt)
return
}
b.queuePool[key] = newBackOffQueue([]interface{}{evt}, b.minDuration, b.clock)
}
// enBackOff get out the whole queue
func (b *backOff) deBackOff(key string) *backOffQueue {
queue := b.queuePool[key]
delete(b.queuePool, key)
return queue
}
// enBackOff start to backOff again and put events to the queue
func (b *backOff) reBackOff(key string, events []interface{}, oldDuration time.Duration) {
duration := 2 * oldDuration
if duration > b.maxDuration {
duration = b.maxDuration
}
b.queuePool[key] = newBackOffQueue(events, duration, b.clock)
}
func (b *backOff) start() <-chan time.Time {
b.ticker = time.NewTicker(b.checkDuration)
return b.ticker.C
}
func (b *backOff) stop() {
b.ticker.Stop()
}
func newBackOffQueue(events []interface{}, init time.Duration, c clock.Clock) *backOffQueue {
return &backOffQueue{
events: events,
duration: init,
expireTime: c.Now().Add(init),
clock: c,
}
}
func (q *backOffQueue) isExpire() bool {
// return time.Now >= expireTime
return !q.clock.Now().Before(q.expireTime)
}

137
pkg/server/events_test.go Normal file
View File

@ -0,0 +1,137 @@
/*
Copyright 2018 The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"testing"
"time"
eventtypes "github.com/containerd/containerd/api/events"
//"github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/typeurl"
//gogotypes "github.com/gogo/protobuf/types"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/util/clock"
)
// TestBackOff tests the logic of backOff struct.
func TestBackOff(t *testing.T) {
testStartTime := time.Now()
testClock := clock.NewFakeClock(testStartTime)
inputQueues := map[string]*backOffQueue{
"container1": {
events: []interface{}{
&eventtypes.TaskExit{ContainerID: "container1", ID: "1"},
&eventtypes.TaskExit{ContainerID: "container1", ID: "2"},
},
},
"container2": {
events: []interface{}{
&eventtypes.TaskExit{ContainerID: "container2", ID: "1"},
&eventtypes.TaskExit{ContainerID: "container2", ID: "2"},
},
},
}
expectedQueues := map[string]*backOffQueue{
"container2": {
events: []interface{}{
&eventtypes.TaskExit{ContainerID: "container2", ID: "1"},
&eventtypes.TaskExit{ContainerID: "container2", ID: "2"},
},
expireTime: testClock.Now().Add(backOffInitDuration),
duration: backOffInitDuration,
clock: testClock,
},
"container1": {
events: []interface{}{
&eventtypes.TaskExit{ContainerID: "container1", ID: "1"},
&eventtypes.TaskExit{ContainerID: "container1", ID: "2"},
},
expireTime: testClock.Now().Add(backOffInitDuration),
duration: backOffInitDuration,
clock: testClock,
},
}
t.Logf("Should be able to backOff a event")
actual := newBackOff(backOffInitDuration, backOffMaxDuration, backOffExpireCheckDuration, testClock)
for k, queue := range inputQueues {
for _, event := range queue.events {
actual.enBackOff(k, event)
}
}
assert.Equal(t, actual.queuePool, expectedQueues)
t.Logf("Should be able to check if the container is in backOff state")
for k, queue := range inputQueues {
for _, e := range queue.events {
any, err := typeurl.MarshalAny(e)
assert.NoError(t, err)
key, _, err := convertEvent(any)
assert.NoError(t, err)
assert.Equal(t, k, key)
assert.Equal(t, actual.isInBackOff(key), true)
}
}
t.Logf("Should be able to check that a container isn't in backOff state")
notExistKey := "containerNotExist"
assert.Equal(t, actual.isInBackOff(notExistKey), false)
t.Logf("Should be able to get all keys which are expired for backOff")
testClock.Sleep(backOffInitDuration)
expKeyMap := map[string]struct{}{}
for k := range inputQueues {
expKeyMap[k] = struct{}{}
}
actKeyList := actual.getExpiredContainers()
actKeyMap := map[string]struct{}{} //assert.Equal can't compare slice without order
for _, k := range actKeyList {
actKeyMap[k] = struct{}{}
}
assert.Equal(t, actKeyMap, expKeyMap)
t.Logf("Should be able to get out all backOff events")
doneQueues := map[string]*backOffQueue{}
for k := range inputQueues {
actQueue := actual.deBackOff(k)
doneQueues[k] = actQueue
assert.Equal(t, actQueue, expectedQueues[k])
}
t.Logf("Should not get out the event again after having got out the backOff event")
for k := range inputQueues {
var expect *backOffQueue
actQueue := actual.deBackOff(k)
assert.Equal(t, actQueue, expect)
}
t.Logf("Should be able to reBackOff")
for k, queue := range doneQueues {
failEventIndex := 1
events := queue.events[failEventIndex:]
actual.reBackOff(k, events, queue.duration)
actQueue := actual.deBackOff(k)
expQueue := &backOffQueue{
events: events,
expireTime: testClock.Now().Add(2 * queue.duration),
duration: 2 * queue.duration,
clock: testClock,
}
assert.Equal(t, actQueue, expQueue)
}
}

View File

@ -126,9 +126,9 @@ func makeSandboxName(s *runtime.PodSandboxMetadata) string {
func makeContainerName(c *runtime.ContainerMetadata, s *runtime.PodSandboxMetadata) string {
return strings.Join([]string{
c.Name, // 0
s.Name, // 1: sandbox name
s.Namespace, // 2: sandbox namespace
s.Uid, // 3: sandbox uid
s.Name, // 1: pod name
s.Namespace, // 2: pod namespace
s.Uid, // 3: pod uid
fmt.Sprintf("%d", c.Attempt), // 4
}, nameDelimiter)
}