From a338abc902d9f204dcb9df7212d39fd7d07ac06d Mon Sep 17 00:00:00 2001
From: ruiwen-zhao
Date: Fri, 12 Aug 2022 19:24:07 +0000
Subject: [PATCH] Add container event support to containerd

Signed-off-by: ruiwen-zhao
---
 integration/container_event_test.go      | 160 +++++++++++++++++++++++
 integration/cri-api/pkg/apis/services.go |   2 +
 integration/remote/remote_runtime.go     |  12 ++
 pkg/cri/server/container_create.go       |   1 +
 pkg/cri/server/container_events.go       |  14 +-
 pkg/cri/server/container_remove.go       |   2 +
 pkg/cri/server/container_start.go        |   2 +
 pkg/cri/server/container_stop.go         |   9 +-
 pkg/cri/server/events.go                 |   9 +-
 pkg/cri/server/helpers.go                |  65 +++++++++
 pkg/cri/server/sandbox_remove.go         |   3 +
 pkg/cri/server/sandbox_run.go            |   8 ++
 pkg/cri/server/service.go                |   6 +
 13 files changed, 283 insertions(+), 10 deletions(-)
 create mode 100644 integration/container_event_test.go

diff --git a/integration/container_event_test.go b/integration/container_event_test.go
new file mode 100644
index 000000000..4b30193c1
--- /dev/null
+++ b/integration/container_event_test.go
@@ -0,0 +1,160 @@
+/*
+   Copyright The containerd Authors.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package integration
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/containerd/containerd/integration/images"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
+)
+
+const (
+	drainContainerEventChannelTimeout = 2 * time.Second
+	readContainerEventChannelTimeout  = 500 * time.Millisecond
+)
+
+func TestContainerEvents(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	t.Cleanup(cancel)
+
+	t.Log("Set up container events streaming client")
+	containerEventsStreamingClient, err := runtimeService.GetContainerEvents(ctx, &runtime.GetEventsRequest{})
+	require.NoError(t, err)
+	containerEventsChan := make(chan *runtime.ContainerEventResponse)
+
+	go listenToEventChannel(ctx, t, containerEventsChan, containerEventsStreamingClient)
+	// drain all events emitted by previous tests.
+	drainContainerEventsChan(containerEventsChan)
+
+	t.Logf("Step 1: RunPodSandbox and check for expected events")
+	sandboxName := "container_events_sandbox"
+	sbConfig := PodSandboxConfig(sandboxName, "container_events")
+
+	sb, err := runtimeService.RunPodSandbox(sbConfig, *runtimeHandler)
+	require.NoError(t, err)
+	t.Cleanup(func() {
+		expectedContainerStates := []runtime.ContainerState{}
+		expectedPodSandboxStatus := &runtime.PodSandboxStatus{State: runtime.PodSandboxState_SANDBOX_NOTREADY}
+		t.Logf("Step 6: StopPodSandbox and check events")
+		assert.NoError(t, runtimeService.StopPodSandbox(sb))
+		checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
+		t.Logf("Step 7: RemovePodSandbox and check events")
+		assert.NoError(t, runtimeService.RemovePodSandbox(sb))
+		checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_DELETED_EVENT, nil, expectedContainerStates)
+	})
+
+	// PodSandbox ready, container state list empty
+	expectedPodSandboxStatus := &runtime.PodSandboxStatus{State: runtime.PodSandboxState_SANDBOX_READY}
+	expectedContainerStates := []runtime.ContainerState{}
+	// PodSandbox created. Check for create event for podsandbox container. Should be zero containers in the podsandbox
+	checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_CREATED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
+	// PodSandbox started. Check for start event for podsandbox container. Should be zero containers in the podsandbox
+	checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_STARTED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
+
+	t.Logf("Step 2: CreateContainer and check events")
+	pauseImage := images.Get(images.Pause)
+	EnsureImageExists(t, pauseImage)
+	containerConfig := ContainerConfig(
+		"container1",
+		pauseImage,
+		WithTestLabels(),
+		WithTestAnnotations(),
+	)
+	cn, err := runtimeService.CreateContainer(sb, containerConfig, sbConfig)
+	require.NoError(t, err)
+	expectedContainerStates = []runtime.ContainerState{runtime.ContainerState_CONTAINER_CREATED}
+	checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_CREATED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
+
+	t.Cleanup(func() {
+		t.Logf("Step 5: RemoveContainer and check events")
+		assert.NoError(t, runtimeService.RemoveContainer(cn))
+		// No container status after the container is removed
+		expectedContainerStates := []runtime.ContainerState{}
+		checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_DELETED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
+	})
+
+	t.Logf("Step 3: StartContainer and check events")
+	require.NoError(t, runtimeService.StartContainer(cn))
+	expectedContainerStates = []runtime.ContainerState{runtime.ContainerState_CONTAINER_RUNNING}
+	checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_STARTED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
+
+	t.Cleanup(func() {
+		t.Logf("Step 4: StopContainer and check events")
+		assert.NoError(t, runtimeService.StopContainer(cn, 10))
+		expectedContainerStates := []runtime.ContainerState{runtime.ContainerState_CONTAINER_EXITED}
+		checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
+	})
+}
+
+func listenToEventChannel(ctx context.Context, t *testing.T, containerEventsChan chan *runtime.ContainerEventResponse, containerEventsStreamingClient runtime.RuntimeService_GetContainerEventsClient) {
+	t.Helper()
+	for {
+		resp, err := containerEventsStreamingClient.Recv()
+		// Early return if context is canceled, in which case the error will be
+		// context canceled. Any other error is reported once and also ends the
+		// loop, because a broken stream never recovers.
+		if ctx.Err() != nil || !assert.NoError(t, err) {
+			return
+		}
+		select {
+		case containerEventsChan <- resp:
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+func drainContainerEventsChan(containerEventsChan chan *runtime.ContainerEventResponse) {
+	for {
+		select {
+		case <-containerEventsChan:
+		case <-time.After(drainContainerEventChannelTimeout):
+			return
+		}
+	}
+}
+
+func checkContainerEventResponse(t *testing.T, containerEventsChan chan *runtime.ContainerEventResponse, expectedType runtime.ContainerEventType, expectedPodSandboxStatus *runtime.PodSandboxStatus, expectedContainerStates []runtime.ContainerState) error {
+	t.Helper()
+	var resp *runtime.ContainerEventResponse
+	select {
+	case resp = <-containerEventsChan:
+	case <-time.After(readContainerEventChannelTimeout):
+		return fmt.Errorf("assertContainerEventResponse: timeout waiting for events from channel")
+	}
+	t.Logf("Container Event response received: %+v", *resp)
+	assert.Equal(t, expectedType, resp.ContainerEventType)
+
+	// Checking only the State field of PodSandboxStatus
+	if expectedPodSandboxStatus == nil {
+		assert.Nil(t, resp.PodSandboxStatus)
+	} else {
+		assert.Equal(t, expectedPodSandboxStatus.State, resp.PodSandboxStatus.State)
+	}
+
+	// Checking only the State field of ContainersStatus
+	for i, cs := range resp.ContainersStatuses {
+		assert.Equal(t, expectedContainerStates[i], cs.State)
+	}
+	return nil
+}
diff --git a/integration/cri-api/pkg/apis/services.go b/integration/cri-api/pkg/apis/services.go
index 6850093ae..74591cb42 100644
--- a/integration/cri-api/pkg/apis/services.go
+++ b/integration/cri-api/pkg/apis/services.go
@@ -35,6 +35,7 @@ limitations under the License.
 package cri
 
 import (
+	"context"
 	"time"
 
 	"google.golang.org/grpc"
@@ -75,6 +76,7 @@ type ContainerManager interface {
 	// for the container. If it returns error, new container log file MUST NOT
 	// be created.
 	ReopenContainerLog(ContainerID string, opts ...grpc.CallOption) error
+	GetContainerEvents(ctx context.Context, request *runtimeapi.GetEventsRequest, opts ...grpc.CallOption) (runtimeapi.RuntimeService_GetContainerEventsClient, error)
 }
 
 // PodSandboxManager contains methods for operating on PodSandboxes. The methods
diff --git a/integration/remote/remote_runtime.go b/integration/remote/remote_runtime.go
index b172b949a..22d3d72bb 100644
--- a/integration/remote/remote_runtime.go
+++ b/integration/remote/remote_runtime.go
@@ -594,3 +594,15 @@ func (r *RuntimeService) ReopenContainerLog(containerID string, opts ...grpc.Cal
 	klog.V(10).Infof("[RuntimeService] ReopenContainerLog Response (containerID=%v)", containerID)
 	return nil
 }
+
+// GetContainerEvents returns a GRPC client to stream container events
+func (r *RuntimeService) GetContainerEvents(ctx context.Context, request *runtimeapi.GetEventsRequest, opts ...grpc.CallOption) (runtimeapi.RuntimeService_GetContainerEventsClient, error) {
+	klog.V(10).Infof("[RuntimeService] GetContainerEvents (timeout=%v)", r.timeout)
+
+	client, err := r.runtimeClient.GetContainerEvents(ctx, request, opts...)
+	if err != nil {
+		klog.Errorf("GetContainerEvents from runtime service failed: %v", err)
+		return nil, err
+	}
+	return client, nil
+}
diff --git a/pkg/cri/server/container_create.go b/pkg/cri/server/container_create.go
index 331af8fb1..72c4a6df1 100644
--- a/pkg/cri/server/container_create.go
+++ b/pkg/cri/server/container_create.go
@@ -296,6 +296,7 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
 	if err := c.containerStore.Add(container); err != nil {
 		return nil, fmt.Errorf("failed to add container %q into store: %w", id, err)
 	}
+	c.generateAndSendContainerEvent(ctx, id, sandboxID, runtime.ContainerEventType_CONTAINER_CREATED_EVENT)
 
 	if c.nri.isEnabled() {
 		err = c.nri.postCreateContainer(ctx, &sandbox, &container)
diff --git a/pkg/cri/server/container_events.go b/pkg/cri/server/container_events.go
index 560798fb5..ee34633ea 100644
--- a/pkg/cri/server/container_events.go
+++ b/pkg/cri/server/container_events.go
@@ -17,11 +17,19 @@
 package server
 
 import (
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
 	runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
 )
 
 func (c *criService) GetContainerEvents(r *runtime.GetEventsRequest, s runtime.RuntimeService_GetContainerEventsServer) error {
-	return status.Errorf(codes.Unimplemented, "method GetContainerEvents not implemented")
+	// TODO (https://github.com/containerd/containerd/issues/7318):
+	// replace with a real implementation that broadcasts containerEventsChan
+	// to all subscribers.
+	// TODO (https://github.com/containerd/containerd/issues/7658): Add Sandbox
+	// server support.
+	for event := range c.containerEventsChan {
+		if err := s.Send(&event); err != nil {
+			return err
+		}
+	}
+	return nil
 }
diff --git a/pkg/cri/server/container_remove.go b/pkg/cri/server/container_remove.go
index 0a5b03d5f..dbd19984a 100644
--- a/pkg/cri/server/container_remove.go
+++ b/pkg/cri/server/container_remove.go
@@ -118,6 +118,8 @@ func (c *criService) RemoveContainer(ctx context.Context, r *runtime.RemoveConta
 
 	c.containerNameIndex.ReleaseByKey(id)
 
+	c.generateAndSendContainerEvent(ctx, id, container.SandboxID, runtime.ContainerEventType_CONTAINER_DELETED_EVENT)
+
 	containerRemoveTimer.WithValues(i.Runtime.Name).UpdateSince(start)
 
 	return &runtime.RemoveContainerResponse{}, nil
diff --git a/pkg/cri/server/container_start.go b/pkg/cri/server/container_start.go
index 885cc72ef..1aa541760 100644
--- a/pkg/cri/server/container_start.go
+++ b/pkg/cri/server/container_start.go
@@ -180,6 +180,8 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
 	// It handles the TaskExit event and update container state after this.
 	c.eventMonitor.startContainerExitMonitor(context.Background(), id, task.Pid(), exitCh)
 
+	c.generateAndSendContainerEvent(ctx, id, sandboxID, runtime.ContainerEventType_CONTAINER_STARTED_EVENT)
+
 	if c.nri.isEnabled() {
 		err = c.nri.postStartContainer(ctx, &sandbox, &cntr)
 		if err != nil {
diff --git a/pkg/cri/server/container_stop.go b/pkg/cri/server/container_stop.go
index 8a1a048d8..cbb52e8a8 100644
--- a/pkg/cri/server/container_stop.go
+++ b/pkg/cri/server/container_stop.go
@@ -72,6 +72,7 @@ func (c *criService) StopContainer(ctx context.Context, r *runtime.StopContainer
 // stopContainer stops a container based on the container metadata.
 func (c *criService) stopContainer(ctx context.Context, container containerstore.Container, timeout time.Duration) error {
 	id := container.ID
+	sandboxID := container.SandboxID
 
 	// Return without error if container is not running. This makes sure that
 	// stop only takes real action after the container is started.
@@ -90,7 +91,7 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore
 		}
 		// Don't return for unknown state, some cleanup needs to be done.
 		if state == runtime.ContainerState_CONTAINER_UNKNOWN {
-			return c.cleanupUnknownContainer(ctx, id, container)
+			return c.cleanupUnknownContainer(ctx, id, container, sandboxID)
 		}
 		return nil
 	}
@@ -105,7 +106,7 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore
 			if !errdefs.IsNotFound(err) {
 				return fmt.Errorf("failed to wait for task for %q: %w", id, err)
 			}
-			return c.cleanupUnknownContainer(ctx, id, container)
+			return c.cleanupUnknownContainer(ctx, id, container, sandboxID)
 		}
 
 		exitCtx, exitCancel := context.WithCancel(context.Background())
@@ -208,7 +209,7 @@ func (c *criService) waitContainerStop(ctx context.Context, container containers
 }
 
 // cleanupUnknownContainer cleanup stopped container in unknown state.
-func (c *criService) cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore.Container) error {
+func (c *criService) cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore.Container, sandboxID string) error {
 	// Reuse handleContainerExit to do the cleanup.
 	return handleContainerExit(ctx, &eventtypes.TaskExit{
 		ContainerID: id,
@@ -216,5 +217,5 @@ func (c *criService) cleanupUnknownContainer(ctx context.Context, id string, cnt
 		Pid:         0,
 		ExitStatus:  unknownExitCode,
 		ExitedAt:    protobuf.ToTimestamp(time.Now()),
-	}, cntr, c)
+	}, cntr, sandboxID, c)
 }
diff --git a/pkg/cri/server/events.go b/pkg/cri/server/events.go
index 1ac23a0b9..7e7f43059 100644
--- a/pkg/cri/server/events.go
+++ b/pkg/cri/server/events.go
@@ -35,6 +35,7 @@ import (
 	"github.com/containerd/containerd/protobuf"
 	"github.com/containerd/typeurl"
 	"github.com/sirupsen/logrus"
+	runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
 	"k8s.io/utils/clock"
 )
 
@@ -187,7 +188,7 @@ func (em *eventMonitor) startContainerExitMonitor(ctx context.Context, id string
 
 				cntr, err := em.c.containerStore.Get(e.ID)
 				if err == nil {
-					if err := handleContainerExit(dctx, e, cntr, em.c); err != nil {
+					if err := handleContainerExit(dctx, e, cntr, cntr.SandboxID, em.c); err != nil {
 						return err
 					}
 					return nil
@@ -313,7 +314,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
 		// Use ID instead of ContainerID to rule out TaskExit event for exec.
 		cntr, err := em.c.containerStore.Get(e.ID)
 		if err == nil {
-			if err := handleContainerExit(ctx, e, cntr, em.c); err != nil {
+			if err := handleContainerExit(ctx, e, cntr, cntr.SandboxID, em.c); err != nil {
 				return fmt.Errorf("failed to handle container TaskExit event: %w", err)
 			}
 			return nil
@@ -362,7 +363,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
 }
 
 // handleContainerExit handles TaskExit event for container.
-func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container, c *criService) error {
+func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container, sandboxID string, c *criService) error {
 	// Attach container IO so that `Delete` could cleanup the stream properly.
 	task, err := cntr.Container.Task(ctx,
 		func(*containerdio.FIFOSet) (containerdio.IO, error) {
@@ -411,6 +412,7 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta
 	}
 	// Using channel to propagate the information of container stop
 	cntr.Stop()
+	c.generateAndSendContainerEvent(ctx, cntr.ID, sandboxID, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT)
 	return nil
 }
 
@@ -441,6 +443,7 @@ func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxst
 	}
 	// Using channel to propagate the information of sandbox stop
 	sb.Stop()
+	c.generateAndSendContainerEvent(ctx, sb.ID, sb.ID, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT)
 	return nil
 }
 
diff --git a/pkg/cri/server/helpers.go b/pkg/cri/server/helpers.go
index 9edf8ae45..1cd49f57e 100644
--- a/pkg/cri/server/helpers.go
+++ b/pkg/cri/server/helpers.go
@@ -23,6 +23,7 @@ import (
 	"path/filepath"
 	"strconv"
 	"strings"
+	"time"
 
 	"github.com/containerd/containerd"
 	"github.com/containerd/containerd/containers"
@@ -517,3 +518,67 @@ func copyResourcesToStatus(spec *runtimespec.Spec, status containerstore.Status)
 	}
 	return status
 }
+
+func (c *criService) generateAndSendContainerEvent(ctx context.Context, containerID string, sandboxID string, eventType runtime.ContainerEventType) {
+	podSandboxStatus, err := c.getPodSandboxStatus(ctx, sandboxID)
+	if err != nil {
+		// TODO(https://github.com/containerd/containerd/issues/7785):
+		// Do not skip events with nil PodSandboxStatus.
+		logrus.Errorf("Failed to get podSandbox status for container event for sandboxID %q: %v. Skipping sending the event.", sandboxID, err)
+		return
+	}
+	containerStatuses, err := c.getContainerStatuses(ctx, sandboxID)
+	if err != nil {
+		logrus.Errorf("Failed to get container statuses for container event for sandboxID %q: %v", sandboxID, err)
+	}
+
+	event := runtime.ContainerEventResponse{
+		ContainerId:        containerID,
+		ContainerEventType: eventType,
+		CreatedAt:          time.Now().UnixNano(),
+		PodSandboxStatus:   podSandboxStatus,
+		ContainersStatuses: containerStatuses,
+	}
+
+	// TODO(ruiwen-zhao): write events to a cache, storage, or increase the size of the channel
+	select {
+	case c.containerEventsChan <- event:
+	default:
+		logrus.Debugf("containerEventsChan is full, discarding event %+v", event)
+	}
+}
+
+func (c *criService) getPodSandboxStatus(ctx context.Context, podSandboxID string) (*runtime.PodSandboxStatus, error) {
+	request := &runtime.PodSandboxStatusRequest{PodSandboxId: podSandboxID}
+	response, err := c.PodSandboxStatus(ctx, request)
+	if err != nil {
+		return nil, err
+	}
+	return response.GetStatus(), nil
+}
+
+func (c *criService) getContainerStatuses(ctx context.Context, podSandboxID string) ([]*runtime.ContainerStatus, error) {
+	response, err := c.ListContainers(ctx, &runtime.ListContainersRequest{
+		Filter: &runtime.ContainerFilter{
+			PodSandboxId: podSandboxID,
+		},
+	})
+	if err != nil {
+		return nil, err
+	}
+	containerStatuses := []*runtime.ContainerStatus{}
+	for _, container := range response.Containers {
+		statusResp, err := c.ContainerStatus(ctx, &runtime.ContainerStatusRequest{
+			ContainerId: container.Id,
+			Verbose:     false,
+		})
+		if err != nil {
+			if errdefs.IsNotFound(err) {
+				continue
+			}
+			return nil, err
+		}
+		containerStatuses = append(containerStatuses, statusResp.GetStatus())
+	}
+	return containerStatuses, nil
+}
diff --git a/pkg/cri/server/sandbox_remove.go b/pkg/cri/server/sandbox_remove.go
index 0461e9829..dab022b30 100644
--- a/pkg/cri/server/sandbox_remove.go
+++ b/pkg/cri/server/sandbox_remove.go
@@ -118,6 +118,9 @@ func (c *criService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodS
 	// Release the sandbox name reserved for the sandbox.
 	c.sandboxNameIndex.ReleaseByKey(id)
 
+	// Send CONTAINER_DELETED event with both ContainerId and SandboxId equal to SandboxId.
+	c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_DELETED_EVENT)
+
 	sandboxRemoveTimer.WithValues(sandbox.RuntimeHandler).UpdateSince(start)
 
 	return &runtime.RemovePodSandboxResponse{}, nil
diff --git a/pkg/cri/server/sandbox_run.go b/pkg/cri/server/sandbox_run.go
index 929cf1b25..72e28a06d 100644
--- a/pkg/cri/server/sandbox_run.go
+++ b/pkg/cri/server/sandbox_run.go
@@ -390,6 +390,11 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
 		return nil, fmt.Errorf("failed to add sandbox %+v into store: %w", sandbox, err)
 	}
 
+	// Send CONTAINER_CREATED event with both ContainerId and SandboxId equal to SandboxId.
+	// Note that this has to be done after sandboxStore.Add() because we need to get
+	// SandboxStatus from the store and include it in the event.
+	c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_CREATED_EVENT)
+
 	// start the monitor after adding sandbox into the store, this ensures
 	// that sandbox is in the store, when event monitor receives the TaskExit event.
 	//
@@ -397,6 +402,9 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
 	// but we don't care about sandbox TaskOOM right now, so it is fine.
 	c.eventMonitor.startSandboxExitMonitor(context.Background(), id, task.Pid(), exitCh)
 
+	// Send CONTAINER_STARTED event with both ContainerId and SandboxId equal to SandboxId.
+	c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_STARTED_EVENT)
+
 	sandboxRuntimeCreateTimer.WithValues(ociRuntime.Type).UpdateSince(runtimeStart)
 
 	return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil
diff --git a/pkg/cri/server/service.go b/pkg/cri/server/service.go
index c4da206c8..843b64230 100644
--- a/pkg/cri/server/service.go
+++ b/pkg/cri/server/service.go
@@ -122,6 +122,9 @@ type criService struct {
 	unpackDuplicationSuppressor kmutex.KeyedLocker
 
 	nri *nriAPI
+	// containerEventsChan is used to capture container events and send them
+	// to the caller of GetContainerEvents.
+	containerEventsChan chan runtime.ContainerEventResponse
 }
 
 // NewCRIService returns a new instance of CRIService
@@ -143,6 +146,9 @@ func NewCRIService(config criconfig.Config, client *containerd.Client, nrip nri.
 		unpackDuplicationSuppressor: kmutex.New(),
 	}
 
+	// TODO: figure out a proper channel size.
+	c.containerEventsChan = make(chan runtime.ContainerEventResponse, 1000)
+
 	if client.SnapshotService(c.config.ContainerdConfig.Snapshotter) == nil {
 		return nil, fmt.Errorf("failed to find snapshotter %q", c.config.ContainerdConfig.Snapshotter)
 	}