Merge pull request #7073 from ruiwen-zhao/event

Add container event support to containerd
This commit is contained in:
Fu Wei
2022-12-09 15:24:23 +08:00
committed by GitHub
23 changed files with 392 additions and 23 deletions

View File

@@ -296,6 +296,7 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
if err := c.containerStore.Add(container); err != nil {
return nil, fmt.Errorf("failed to add container %q into store: %w", id, err)
}
c.generateAndSendContainerEvent(ctx, id, sandboxID, runtime.ContainerEventType_CONTAINER_CREATED_EVENT)
if c.nri.isEnabled() {
err = c.nri.postCreateContainer(ctx, &sandbox, &container)

View File

@@ -17,11 +17,17 @@
package server
import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)
func (c *criService) GetContainerEvents(r *runtime.GetEventsRequest, s runtime.RuntimeService_GetContainerEventsServer) error {
return status.Errorf(codes.Unimplemented, "method GetContainerEvents not implemented")
// TODO (https://github.com/containerd/containerd/issues/7318):
// replace with a real implementation that broadcasts containerEventsChan
// to all subscribers.
for event := range c.containerEventsChan {
if err := s.Send(&event); err != nil {
return err
}
}
return nil
}

View File

@@ -118,6 +118,8 @@ func (c *criService) RemoveContainer(ctx context.Context, r *runtime.RemoveConta
c.containerNameIndex.ReleaseByKey(id)
c.generateAndSendContainerEvent(ctx, id, container.SandboxID, runtime.ContainerEventType_CONTAINER_DELETED_EVENT)
containerRemoveTimer.WithValues(i.Runtime.Name).UpdateSince(start)
return &runtime.RemoveContainerResponse{}, nil

View File

@@ -180,6 +180,8 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
// It handles the TaskExit event and update container state after this.
c.eventMonitor.startContainerExitMonitor(context.Background(), id, task.Pid(), exitCh)
c.generateAndSendContainerEvent(ctx, id, sandboxID, runtime.ContainerEventType_CONTAINER_STARTED_EVENT)
if c.nri.isEnabled() {
err = c.nri.postStartContainer(ctx, &sandbox, &cntr)
if err != nil {

View File

@@ -72,6 +72,7 @@ func (c *criService) StopContainer(ctx context.Context, r *runtime.StopContainer
// stopContainer stops a container based on the container metadata.
func (c *criService) stopContainer(ctx context.Context, container containerstore.Container, timeout time.Duration) error {
id := container.ID
sandboxID := container.SandboxID
// Return without error if container is not running. This makes sure that
// stop only takes real action after the container is started.
@@ -90,7 +91,7 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore
}
// Don't return for unknown state, some cleanup needs to be done.
if state == runtime.ContainerState_CONTAINER_UNKNOWN {
return c.cleanupUnknownContainer(ctx, id, container)
return c.cleanupUnknownContainer(ctx, id, container, sandboxID)
}
return nil
}
@@ -105,7 +106,7 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to wait for task for %q: %w", id, err)
}
return c.cleanupUnknownContainer(ctx, id, container)
return c.cleanupUnknownContainer(ctx, id, container, sandboxID)
}
exitCtx, exitCancel := context.WithCancel(context.Background())
@@ -208,7 +209,7 @@ func (c *criService) waitContainerStop(ctx context.Context, container containers
}
// cleanupUnknownContainer cleanup stopped container in unknown state.
func (c *criService) cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore.Container) error {
func (c *criService) cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore.Container, sandboxID string) error {
// Reuse handleContainerExit to do the cleanup.
return handleContainerExit(ctx, &eventtypes.TaskExit{
ContainerID: id,
@@ -216,5 +217,5 @@ func (c *criService) cleanupUnknownContainer(ctx context.Context, id string, cnt
Pid: 0,
ExitStatus: unknownExitCode,
ExitedAt: protobuf.ToTimestamp(time.Now()),
}, cntr, c)
}, cntr, sandboxID, c)
}

View File

@@ -35,6 +35,7 @@ import (
"github.com/containerd/containerd/protobuf"
"github.com/containerd/typeurl"
"github.com/sirupsen/logrus"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/utils/clock"
)
@@ -187,7 +188,7 @@ func (em *eventMonitor) startContainerExitMonitor(ctx context.Context, id string
cntr, err := em.c.containerStore.Get(e.ID)
if err == nil {
if err := handleContainerExit(dctx, e, cntr, em.c); err != nil {
if err := handleContainerExit(dctx, e, cntr, cntr.SandboxID, em.c); err != nil {
return err
}
return nil
@@ -313,7 +314,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
// Use ID instead of ContainerID to rule out TaskExit event for exec.
cntr, err := em.c.containerStore.Get(e.ID)
if err == nil {
if err := handleContainerExit(ctx, e, cntr, em.c); err != nil {
if err := handleContainerExit(ctx, e, cntr, cntr.SandboxID, em.c); err != nil {
return fmt.Errorf("failed to handle container TaskExit event: %w", err)
}
return nil
@@ -362,7 +363,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
}
// handleContainerExit handles TaskExit event for container.
func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container, c *criService) error {
func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container, sandboxID string, c *criService) error {
// Attach container IO so that `Delete` could cleanup the stream properly.
task, err := cntr.Container.Task(ctx,
func(*containerdio.FIFOSet) (containerdio.IO, error) {
@@ -411,6 +412,7 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta
}
// Using channel to propagate the information of container stop
cntr.Stop()
c.generateAndSendContainerEvent(ctx, cntr.ID, sandboxID, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT)
return nil
}
@@ -441,6 +443,7 @@ func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxst
}
// Using channel to propagate the information of sandbox stop
sb.Stop()
c.generateAndSendContainerEvent(ctx, sb.ID, sb.ID, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT)
return nil
}

View File

@@ -23,6 +23,7 @@ import (
"path/filepath"
"strconv"
"strings"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/containers"
@@ -517,3 +518,67 @@ func copyResourcesToStatus(spec *runtimespec.Spec, status containerstore.Status)
}
return status
}
func (c *criService) generateAndSendContainerEvent(ctx context.Context, containerID string, sandboxID string, eventType runtime.ContainerEventType) {
podSandboxStatus, err := c.getPodSandboxStatus(ctx, sandboxID)
if err != nil {
// TODO(https://github.com/containerd/containerd/issues/7785):
// Do not skip events with nil PodSandboxStatus.
logrus.Errorf("Failed to get podSandbox status for container event for sandboxID %q: %v. Skipping sending the event.", sandboxID, err)
return
}
containerStatuses, err := c.getContainerStatuses(ctx, sandboxID)
if err != nil {
logrus.Errorf("Failed to get container statuses for container event for sandboxID %q: %v", sandboxID, err)
}
event := runtime.ContainerEventResponse{
ContainerId: containerID,
ContainerEventType: eventType,
CreatedAt: time.Now().UnixNano(),
PodSandboxStatus: podSandboxStatus,
ContainersStatuses: containerStatuses,
}
// TODO(ruiwen-zhao): write events to a cache, storage, or increase the size of the channel
select {
case c.containerEventsChan <- event:
default:
logrus.Debugf("containerEventsChan is full, discarding event %+v", event)
}
}
func (c *criService) getPodSandboxStatus(ctx context.Context, podSandboxID string) (*runtime.PodSandboxStatus, error) {
request := &runtime.PodSandboxStatusRequest{PodSandboxId: podSandboxID}
response, err := c.PodSandboxStatus(ctx, request)
if err != nil {
return nil, err
}
return response.GetStatus(), nil
}
func (c *criService) getContainerStatuses(ctx context.Context, podSandboxID string) ([]*runtime.ContainerStatus, error) {
response, err := c.ListContainers(ctx, &runtime.ListContainersRequest{
Filter: &runtime.ContainerFilter{
PodSandboxId: podSandboxID,
},
})
if err != nil {
return nil, err
}
containerStatuses := []*runtime.ContainerStatus{}
for _, container := range response.Containers {
statusResp, err := c.ContainerStatus(ctx, &runtime.ContainerStatusRequest{
ContainerId: container.Id,
Verbose: false,
})
if err != nil {
if errdefs.IsNotFound(err) {
continue
}
return nil, err
}
containerStatuses = append(containerStatuses, statusResp.GetStatus())
}
return containerStatuses, nil
}

View File

@@ -118,6 +118,9 @@ func (c *criService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodS
// Release the sandbox name reserved for the sandbox.
c.sandboxNameIndex.ReleaseByKey(id)
// Send CONTAINER_DELETED event with both ContainerId and SandboxId equal to SandboxId.
c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_DELETED_EVENT)
sandboxRemoveTimer.WithValues(sandbox.RuntimeHandler).UpdateSince(start)
return &runtime.RemovePodSandboxResponse{}, nil

View File

@@ -390,6 +390,11 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
return nil, fmt.Errorf("failed to add sandbox %+v into store: %w", sandbox, err)
}
// Send CONTAINER_CREATED event with both ContainerId and SandboxId equal to SandboxId.
// Note that this has to be done after sandboxStore.Add() because we need to get
// SandboxStatus from the store and include it in the event.
c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_CREATED_EVENT)
// start the monitor after adding sandbox into the store, this ensures
// that sandbox is in the store, when event monitor receives the TaskExit event.
//
@@ -397,6 +402,9 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
// but we don't care about sandbox TaskOOM right now, so it is fine.
c.eventMonitor.startSandboxExitMonitor(context.Background(), id, task.Pid(), exitCh)
// Send CONTAINER_STARTED event with both ContainerId and SandboxId equal to SandboxId.
c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_STARTED_EVENT)
sandboxRuntimeCreateTimer.WithValues(ociRuntime.Type).UpdateSince(runtimeStart)
return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil

View File

@@ -122,6 +122,9 @@ type criService struct {
unpackDuplicationSuppressor kmutex.KeyedLocker
nri *nriAPI
// containerEventsChan is used to capture container events and send them
// to the caller of GetContainerEvents.
containerEventsChan chan runtime.ContainerEventResponse
}
// NewCRIService returns a new instance of CRIService
@@ -143,6 +146,9 @@ func NewCRIService(config criconfig.Config, client *containerd.Client, nrip nri.
unpackDuplicationSuppressor: kmutex.New(),
}
// TODO: figure out a proper channel size.
c.containerEventsChan = make(chan runtime.ContainerEventResponse, 1000)
if client.SnapshotService(c.config.ContainerdConfig.Snapshotter) == nil {
return nil, fmt.Errorf("failed to find snapshotter %q", c.config.ContainerdConfig.Snapshotter)
}