Add container event support to containerd

Signed-off-by: ruiwen-zhao <ruiwen@google.com>
This commit is contained in:
ruiwen-zhao 2022-08-12 19:24:07 +00:00
parent ae6c244995
commit a338abc902
13 changed files with 283 additions and 10 deletions

View File

@ -0,0 +1,160 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package integration
import (
"context"
"fmt"
"testing"
"time"
"github.com/containerd/containerd/integration/images"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)
const (
// drainContainerEventChannelTimeout is how long drainContainerEventsChan
// waits for another event before it considers the channel drained.
drainContainerEventChannelTimeout = 2 * time.Second
// readContainerEventChannelTimeout is how long checkContainerEventResponse
// waits for the next event before returning a timeout error.
readContainerEventChannelTimeout = 500 * time.Millisecond
)
// TestContainerEvents drives one pod sandbox and one container through their
// lifecycle and checks the events delivered by the GetContainerEvents stream.
//
// Steps 4-7 are registered with t.Cleanup. Cleanups run in reverse
// registration order, so the container is stopped and removed before the
// sandbox is stopped and removed, and the stream context is cancelled last.
func TestContainerEvents(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	t.Log("Set up container events streaming client")
	containerEventsStreamingClient, err := runtimeService.GetContainerEvents(ctx, &runtime.GetEventsRequest{})
	// Without a stream there is nothing to listen on; abort instead of handing
	// an unusable client to the listener goroutine.
	require.NoError(t, err)
	containerEventsChan := make(chan *runtime.ContainerEventResponse)

	go listenToEventChannel(ctx, t, containerEventsChan, containerEventsStreamingClient)
	// drain all events emitted by previous tests.
	drainContainerEventsChan(containerEventsChan)

	// expectEvent checks the next event and fails the test when none arrives
	// in time; checkContainerEventResponse only reports a timeout through its
	// return value, so that value must not be discarded.
	expectEvent := func(eventType runtime.ContainerEventType, sandboxStatus *runtime.PodSandboxStatus, containerStates []runtime.ContainerState) {
		t.Helper()
		assert.NoError(t, checkContainerEventResponse(t, containerEventsChan, eventType, sandboxStatus, containerStates))
	}

	t.Logf("Step 1: RunPodSandbox and check for expected events")
	sandboxName := "container_events_sandbox"
	sbConfig := PodSandboxConfig(sandboxName, "container_events")

	sb, err := runtimeService.RunPodSandbox(sbConfig, *runtimeHandler)
	require.NoError(t, err)

	t.Cleanup(func() {
		expectedContainerStates := []runtime.ContainerState{}
		expectedPodSandboxStatus := &runtime.PodSandboxStatus{State: runtime.PodSandboxState_SANDBOX_NOTREADY}
		t.Logf("Step 6: StopPodSandbox and check events")
		assert.NoError(t, runtimeService.StopPodSandbox(sb))
		expectEvent(runtime.ContainerEventType_CONTAINER_STOPPED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
		t.Logf("Step 7: RemovePodSandbox and check events")
		assert.NoError(t, runtimeService.RemovePodSandbox(sb))
		// NOTE(review): the runtime currently skips events whose sandbox
		// status cannot be fetched (containerd issue 7785), which is
		// presumably the case once the sandbox has been removed. A missing
		// event is therefore logged rather than treated as a failure; when
		// an event does arrive its content is still verified.
		if err := checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_DELETED_EVENT, nil, expectedContainerStates); err != nil {
			t.Logf("no event observed after RemovePodSandbox: %v", err)
		}
	})

	// PodSandbox ready, container state list empty
	expectedPodSandboxStatus := &runtime.PodSandboxStatus{State: runtime.PodSandboxState_SANDBOX_READY}
	expectedContainerStates := []runtime.ContainerState{}
	// PodSandbox created. Check for create event for podsandbox container. Should be zero containers in the podsandbox
	expectEvent(runtime.ContainerEventType_CONTAINER_CREATED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
	// PodSandbox started. Check for start event for podsandbox container. Should be zero containers in the podsandbox
	expectEvent(runtime.ContainerEventType_CONTAINER_STARTED_EVENT, expectedPodSandboxStatus, expectedContainerStates)

	t.Logf("Step 2: CreateContainer and check events")
	pauseImage := images.Get(images.Pause)
	EnsureImageExists(t, pauseImage)
	containerConfig := ContainerConfig(
		"container1",
		pauseImage,
		WithTestLabels(),
		WithTestAnnotations(),
	)
	cn, err := runtimeService.CreateContainer(sb, containerConfig, sbConfig)
	require.NoError(t, err)
	expectedContainerStates = []runtime.ContainerState{runtime.ContainerState_CONTAINER_CREATED}
	expectEvent(runtime.ContainerEventType_CONTAINER_CREATED_EVENT, expectedPodSandboxStatus, expectedContainerStates)

	t.Cleanup(func() {
		t.Logf("Step 5: RemoveContainer and check events")
		assert.NoError(t, runtimeService.RemoveContainer(cn))
		// No container status after the container is removed
		expectedContainerStates := []runtime.ContainerState{}
		expectEvent(runtime.ContainerEventType_CONTAINER_DELETED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
	})

	t.Logf("Step 3: StartContainer and check events")
	require.NoError(t, runtimeService.StartContainer(cn))
	expectedContainerStates = []runtime.ContainerState{runtime.ContainerState_CONTAINER_RUNNING}
	expectEvent(runtime.ContainerEventType_CONTAINER_STARTED_EVENT, expectedPodSandboxStatus, expectedContainerStates)

	t.Cleanup(func() {
		t.Logf("Step 4: StopContainer and check events")
		assert.NoError(t, runtimeService.StopContainer(cn, 10))
		expectedContainerStates := []runtime.ContainerState{runtime.ContainerState_CONTAINER_EXITED}
		expectEvent(runtime.ContainerEventType_CONTAINER_STOPPED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
	})
}
// listenToEventChannel receives events from the streaming client and forwards
// every non-nil response to containerEventsChan. It is meant to run in its own
// goroutine and returns when ctx is cancelled or the stream reports an error.
func listenToEventChannel(ctx context.Context, t *testing.T, containerEventsChan chan *runtime.ContainerEventResponse, containerEventsStreamingClient runtime.RuntimeService_GetContainerEventsClient) {
	t.Helper()
	for {
		resp, err := containerEventsStreamingClient.Recv()
		// Early return if context is canceled, in which case the error will be
		// context canceled
		if ctx.Err() != nil {
			return
		}
		if err != nil {
			// Report the failure once and stop: looping on a stream that has
			// already returned an error would only repeat the same failure.
			assert.NoError(t, err)
			return
		}
		if resp == nil {
			continue
		}
		// The channel is unbuffered; also watch ctx so this goroutine cannot
		// stay blocked on the send after the test has stopped reading.
		select {
		case containerEventsChan <- resp:
		case <-ctx.Done():
			return
		}
	}
}
// drainContainerEventsChan discards pending events from containerEventsChan.
// It returns once no event has been received for
// drainContainerEventChannelTimeout; the wait restarts after every event.
func drainContainerEventsChan(containerEventsChan chan *runtime.ContainerEventResponse) {
	for quiet := false; !quiet; {
		select {
		case <-time.After(drainContainerEventChannelTimeout):
			quiet = true
		case <-containerEventsChan:
			// Stale event from an earlier test; drop it and keep waiting.
		}
	}
}
// checkContainerEventResponse reads the next event from containerEventsChan
// and compares it with the expectation.
//
// It checks the event type, the State of the pod sandbox status (or that the
// status is nil when expectedPodSandboxStatus is nil), and the State of each
// container status in order against expectedContainerStates.
//
// Mismatches are reported on t. The returned error is non-nil only when no
// event arrives within readContainerEventChannelTimeout; the timeout itself is
// not reported on t, so callers must check the return value.
func checkContainerEventResponse(t *testing.T, containerEventsChan chan *runtime.ContainerEventResponse, expectedType runtime.ContainerEventType, expectedPodSandboxStatus *runtime.PodSandboxStatus, expectedContainerStates []runtime.ContainerState) error {
	t.Helper()
	var resp *runtime.ContainerEventResponse
	select {
	case resp = <-containerEventsChan:
	case <-time.After(readContainerEventChannelTimeout):
		return fmt.Errorf("assertContainerEventResponse: timeout waiting for events from channel")
	}
	t.Logf("Container Event response received: %+v", *resp)
	assert.Equal(t, expectedType, resp.ContainerEventType)

	// Checking only the State field of PodSandboxStatus
	if expectedPodSandboxStatus == nil {
		assert.Nil(t, resp.PodSandboxStatus)
	} else if assert.NotNil(t, resp.PodSandboxStatus) {
		// Guarded by NotNil so a missing status is a test failure rather than
		// a nil-pointer panic.
		assert.Equal(t, expectedPodSandboxStatus.State, resp.PodSandboxStatus.State)
	}

	// Checking only the State field of ContainersStatus. The length is
	// compared first: indexing expectedContainerStates blindly would panic on
	// extra statuses and would let missing statuses pass unnoticed.
	if assert.Len(t, resp.ContainersStatuses, len(expectedContainerStates)) {
		for i, cs := range resp.ContainersStatuses {
			assert.Equal(t, expectedContainerStates[i], cs.State)
		}
	}
	return nil
}

View File

@ -35,6 +35,7 @@ limitations under the License.
package cri
import (
"context"
"time"
"google.golang.org/grpc"
@ -75,6 +76,7 @@ type ContainerManager interface {
// for the container. If it returns error, new container log file MUST NOT
// be created.
ReopenContainerLog(ContainerID string, opts ...grpc.CallOption) error
GetContainerEvents(ctx context.Context, request *runtimeapi.GetEventsRequest, opts ...grpc.CallOption) (runtimeapi.RuntimeService_GetContainerEventsClient, error)
}
// PodSandboxManager contains methods for operating on PodSandboxes. The methods

View File

@ -594,3 +594,15 @@ func (r *RuntimeService) ReopenContainerLog(containerID string, opts ...grpc.Cal
klog.V(10).Infof("[RuntimeService] ReopenContainerLog Response (containerID=%v)", containerID)
return nil
}
// GetContainerEvents returns a GRPC client to stream container events.
//
// The call is made with the caller's ctx and the given call options. On
// failure the error is logged and returned with a nil client.
func (r *RuntimeService) GetContainerEvents(ctx context.Context, request *runtimeapi.GetEventsRequest, opts ...grpc.CallOption) (runtimeapi.RuntimeService_GetContainerEventsClient, error) {
	// The format string needs a verb for r.timeout; without one the value is
	// rendered as a "%!(EXTRA ...)" suffix and go vet rejects the call.
	klog.V(10).Infof("[RuntimeService] GetContainerEvents (timeout=%v)", r.timeout)

	client, err := r.runtimeClient.GetContainerEvents(ctx, request, opts...)
	if err != nil {
		klog.Errorf("GetContainerEvents from runtime service failed: %v", err)
		return nil, err
	}
	return client, nil
}

View File

@ -296,6 +296,7 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
if err := c.containerStore.Add(container); err != nil {
return nil, fmt.Errorf("failed to add container %q into store: %w", id, err)
}
c.generateAndSendContainerEvent(ctx, id, sandboxID, runtime.ContainerEventType_CONTAINER_CREATED_EVENT)
if c.nri.isEnabled() {
err = c.nri.postCreateContainer(ctx, &sandbox, &container)

View File

@ -17,11 +17,19 @@
package server
import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)
// GetContainerEvents is the streaming handler for container events: it is
// written to forward events from c.containerEventsChan to the stream s.
//
// NOTE(review): as shown here the first statement returns an Unimplemented
// status unconditionally, so the forwarding loop below it is never reached.
// This looks like the old stub line that the loop was meant to replace —
// confirm it should be removed (together with the codes/status imports).
func (c *criService) GetContainerEvents(r *runtime.GetEventsRequest, s runtime.RuntimeService_GetContainerEventsServer) error {
return status.Errorf(codes.Unimplemented, "method GetContainerEvents not implemented")
// TODO (https://github.com/containerd/containerd/issues/7318):
// replace with a real implementation that broadcasts containerEventsChan
// to all subscribers.
// TODO (https://github.com/containerd/containerd/issues/7658): Add Sandbox
// server support.
// Each event received from the channel is sent on the stream; a send error
// ends the call with that error. The loop itself only ends when the channel
// is closed.
for event := range c.containerEventsChan {
if err := s.Send(&event); err != nil {
return err
}
}
return nil
}

View File

@ -118,6 +118,8 @@ func (c *criService) RemoveContainer(ctx context.Context, r *runtime.RemoveConta
c.containerNameIndex.ReleaseByKey(id)
c.generateAndSendContainerEvent(ctx, id, container.SandboxID, runtime.ContainerEventType_CONTAINER_DELETED_EVENT)
containerRemoveTimer.WithValues(i.Runtime.Name).UpdateSince(start)
return &runtime.RemoveContainerResponse{}, nil

View File

@ -180,6 +180,8 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
// It handles the TaskExit event and update container state after this.
c.eventMonitor.startContainerExitMonitor(context.Background(), id, task.Pid(), exitCh)
c.generateAndSendContainerEvent(ctx, id, sandboxID, runtime.ContainerEventType_CONTAINER_STARTED_EVENT)
if c.nri.isEnabled() {
err = c.nri.postStartContainer(ctx, &sandbox, &cntr)
if err != nil {

View File

@ -72,6 +72,7 @@ func (c *criService) StopContainer(ctx context.Context, r *runtime.StopContainer
// stopContainer stops a container based on the container metadata.
func (c *criService) stopContainer(ctx context.Context, container containerstore.Container, timeout time.Duration) error {
id := container.ID
sandboxID := container.SandboxID
// Return without error if container is not running. This makes sure that
// stop only takes real action after the container is started.
@ -90,7 +91,7 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore
}
// Don't return for unknown state, some cleanup needs to be done.
if state == runtime.ContainerState_CONTAINER_UNKNOWN {
return c.cleanupUnknownContainer(ctx, id, container)
return c.cleanupUnknownContainer(ctx, id, container, sandboxID)
}
return nil
}
@ -105,7 +106,7 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to wait for task for %q: %w", id, err)
}
return c.cleanupUnknownContainer(ctx, id, container)
return c.cleanupUnknownContainer(ctx, id, container, sandboxID)
}
exitCtx, exitCancel := context.WithCancel(context.Background())
@ -208,7 +209,7 @@ func (c *criService) waitContainerStop(ctx context.Context, container containers
}
// cleanupUnknownContainer cleanup stopped container in unknown state.
func (c *criService) cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore.Container) error {
func (c *criService) cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore.Container, sandboxID string) error {
// Reuse handleContainerExit to do the cleanup.
return handleContainerExit(ctx, &eventtypes.TaskExit{
ContainerID: id,
@ -216,5 +217,5 @@ func (c *criService) cleanupUnknownContainer(ctx context.Context, id string, cnt
Pid: 0,
ExitStatus: unknownExitCode,
ExitedAt: protobuf.ToTimestamp(time.Now()),
}, cntr, c)
}, cntr, sandboxID, c)
}

View File

@ -35,6 +35,7 @@ import (
"github.com/containerd/containerd/protobuf"
"github.com/containerd/typeurl"
"github.com/sirupsen/logrus"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/utils/clock"
)
@ -187,7 +188,7 @@ func (em *eventMonitor) startContainerExitMonitor(ctx context.Context, id string
cntr, err := em.c.containerStore.Get(e.ID)
if err == nil {
if err := handleContainerExit(dctx, e, cntr, em.c); err != nil {
if err := handleContainerExit(dctx, e, cntr, cntr.SandboxID, em.c); err != nil {
return err
}
return nil
@ -313,7 +314,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
// Use ID instead of ContainerID to rule out TaskExit event for exec.
cntr, err := em.c.containerStore.Get(e.ID)
if err == nil {
if err := handleContainerExit(ctx, e, cntr, em.c); err != nil {
if err := handleContainerExit(ctx, e, cntr, cntr.SandboxID, em.c); err != nil {
return fmt.Errorf("failed to handle container TaskExit event: %w", err)
}
return nil
@ -362,7 +363,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
}
// handleContainerExit handles TaskExit event for container.
func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container, c *criService) error {
func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container, sandboxID string, c *criService) error {
// Attach container IO so that `Delete` could cleanup the stream properly.
task, err := cntr.Container.Task(ctx,
func(*containerdio.FIFOSet) (containerdio.IO, error) {
@ -411,6 +412,7 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta
}
// Using channel to propagate the information of container stop
cntr.Stop()
c.generateAndSendContainerEvent(ctx, cntr.ID, sandboxID, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT)
return nil
}
@ -441,6 +443,7 @@ func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxst
}
// Using channel to propagate the information of sandbox stop
sb.Stop()
c.generateAndSendContainerEvent(ctx, sb.ID, sb.ID, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT)
return nil
}

View File

@ -23,6 +23,7 @@ import (
"path/filepath"
"strconv"
"strings"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/containers"
@ -517,3 +518,67 @@ func copyResourcesToStatus(spec *runtimespec.Spec, status containerstore.Status)
}
return status
}
// generateAndSendContainerEvent builds a ContainerEventResponse of eventType
// for containerID and queues it on c.containerEventsChan without blocking.
//
// The event carries the current status of the sandbox identified by sandboxID
// and the statuses of the containers listed for that sandbox. If the sandbox
// status cannot be fetched the event is not sent. If the container statuses
// cannot be fetched the failure is logged and the event is sent without them.
// If the channel is full the event is discarded.
func (c *criService) generateAndSendContainerEvent(ctx context.Context, containerID string, sandboxID string, eventType runtime.ContainerEventType) {
	sbStatus, sbErr := c.getPodSandboxStatus(ctx, sandboxID)
	if sbErr != nil {
		// TODO(https://github.com/containerd/containerd/issues/7785):
		// Do not skip events with nil PodSandboxStatus.
		logrus.Errorf("Failed to get podSandbox status for container event for sandboxID %q: %v. Skipping sending the event.", sandboxID, sbErr)
		return
	}

	statuses, listErr := c.getContainerStatuses(ctx, sandboxID)
	if listErr != nil {
		logrus.Errorf("Failed to get container statuses for container event for sandboxID %q: %v", sandboxID, listErr)
	}

	var event runtime.ContainerEventResponse
	event.ContainerId = containerID
	event.ContainerEventType = eventType
	event.CreatedAt = time.Now().UnixNano()
	event.PodSandboxStatus = sbStatus
	event.ContainersStatuses = statuses

	// TODO(ruiwen-zhao): write events to a cache, storage, or increase the size of the channel
	select {
	case c.containerEventsChan <- event:
		// Queued for the GetContainerEvents stream.
	default:
		logrus.Debugf("containerEventsChan is full, discarding event %+v", event)
	}
}
// getPodSandboxStatus looks up the sandbox podSandboxID through
// c.PodSandboxStatus and returns the status carried in the response, or the
// lookup error.
func (c *criService) getPodSandboxStatus(ctx context.Context, podSandboxID string) (*runtime.PodSandboxStatus, error) {
	resp, err := c.PodSandboxStatus(ctx, &runtime.PodSandboxStatusRequest{
		PodSandboxId: podSandboxID,
	})
	if err != nil {
		return nil, err
	}
	return resp.GetStatus(), nil
}
// getContainerStatuses lists the containers filtered by podSandboxID and
// returns the status of each one.
//
// A container whose status lookup fails with a not-found error is left out of
// the result; any other error aborts the call and is returned. On success the
// returned slice is non-nil, even when it is empty.
func (c *criService) getContainerStatuses(ctx context.Context, podSandboxID string) ([]*runtime.ContainerStatus, error) {
	filter := &runtime.ContainerFilter{PodSandboxId: podSandboxID}
	listed, err := c.ListContainers(ctx, &runtime.ListContainersRequest{Filter: filter})
	if err != nil {
		return nil, err
	}

	statuses := []*runtime.ContainerStatus{}
	for _, item := range listed.Containers {
		req := &runtime.ContainerStatusRequest{
			ContainerId: item.Id,
			Verbose:     false,
		}
		resp, err := c.ContainerStatus(ctx, req)
		switch {
		case err == nil:
			statuses = append(statuses, resp.GetStatus())
		case errdefs.IsNotFound(err):
			// The container is gone by the time its status is requested;
			// leave it out.
		default:
			return nil, err
		}
	}
	return statuses, nil
}

View File

@ -118,6 +118,9 @@ func (c *criService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodS
// Release the sandbox name reserved for the sandbox.
c.sandboxNameIndex.ReleaseByKey(id)
// Send CONTAINER_DELETED event with both ContainerId and SandboxId equal to SandboxId.
c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_DELETED_EVENT)
sandboxRemoveTimer.WithValues(sandbox.RuntimeHandler).UpdateSince(start)
return &runtime.RemovePodSandboxResponse{}, nil

View File

@ -390,6 +390,11 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
return nil, fmt.Errorf("failed to add sandbox %+v into store: %w", sandbox, err)
}
// Send CONTAINER_CREATED event with both ContainerId and SandboxId equal to SandboxId.
// Note that this has to be done after sandboxStore.Add() because we need to get
// SandboxStatus from the store and include it in the event.
c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_CREATED_EVENT)
// start the monitor after adding sandbox into the store, this ensures
// that sandbox is in the store, when event monitor receives the TaskExit event.
//
@ -397,6 +402,9 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
// but we don't care about sandbox TaskOOM right now, so it is fine.
c.eventMonitor.startSandboxExitMonitor(context.Background(), id, task.Pid(), exitCh)
// Send CONTAINER_STARTED event with both ContainerId and SandboxId equal to SandboxId.
c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_STARTED_EVENT)
sandboxRuntimeCreateTimer.WithValues(ociRuntime.Type).UpdateSince(runtimeStart)
return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil

View File

@ -122,6 +122,9 @@ type criService struct {
unpackDuplicationSuppressor kmutex.KeyedLocker
nri *nriAPI
// containerEventsChan is used to capture container events and send them
// to the caller of GetContainerEvents.
containerEventsChan chan runtime.ContainerEventResponse
}
// NewCRIService returns a new instance of CRIService
@ -143,6 +146,9 @@ func NewCRIService(config criconfig.Config, client *containerd.Client, nrip nri.
unpackDuplicationSuppressor: kmutex.New(),
}
// TODO: figure out a proper channel size.
c.containerEventsChan = make(chan runtime.ContainerEventResponse, 1000)
if client.SnapshotService(c.config.ContainerdConfig.Snapshotter) == nil {
return nil, fmt.Errorf("failed to find snapshotter %q", c.config.ContainerdConfig.Snapshotter)
}