diff --git a/pkg/cri/sbserver/container_create.go b/pkg/cri/sbserver/container_create.go index f1c70a38b..0c793fede 100644 --- a/pkg/cri/sbserver/container_create.go +++ b/pkg/cri/sbserver/container_create.go @@ -283,6 +283,8 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta return nil, fmt.Errorf("failed to add container %q into store: %w", id, err) } + c.generateAndSendContainerEvent(ctx, id, sandboxID, runtime.ContainerEventType_CONTAINER_CREATED_EVENT) + containerCreateTimer.WithValues(ociRuntime.Type).UpdateSince(start) return &runtime.CreateContainerResponse{ContainerId: id}, nil diff --git a/pkg/cri/sbserver/container_events.go b/pkg/cri/sbserver/container_events.go index 4d718372d..563cd26c9 100644 --- a/pkg/cri/sbserver/container_events.go +++ b/pkg/cri/sbserver/container_events.go @@ -17,11 +17,17 @@ package sbserver import ( - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" runtime "k8s.io/cri-api/pkg/apis/runtime/v1" ) func (c *criService) GetContainerEvents(r *runtime.GetEventsRequest, s runtime.RuntimeService_GetContainerEventsServer) error { - return status.Errorf(codes.Unimplemented, "method GetContainerEvents not implemented") + // TODO (https://github.com/containerd/containerd/issues/7318): + // replace with a real implementation that broadcasts containerEventsChan + // to all subscribers. + for event := range c.containerEventsChan { + if err := s.Send(&event); err != nil { + return err + } + } + return nil } diff --git a/pkg/cri/sbserver/container_remove.go b/pkg/cri/sbserver/container_remove.go index 563d7cc73..05f6ac213 100644 --- a/pkg/cri/sbserver/container_remove.go +++ b/pkg/cri/sbserver/container_remove.go @@ -106,6 +106,8 @@ func (c *criService) RemoveContainer(ctx context.Context, r *runtime.RemoveConta c.containerNameIndex.ReleaseByKey(id) + c.generateAndSendContainerEvent(ctx, id, container.SandboxID, runtime.ContainerEventType_CONTAINER_DELETED_EVENT) + containerRemoveTimer.WithValues(i.Runtime.Name).UpdateSince(start) return &runtime.RemoveContainerResponse{}, nil diff --git a/pkg/cri/sbserver/container_start.go b/pkg/cri/sbserver/container_start.go index 6321524c8..420fa742d 100644 --- a/pkg/cri/sbserver/container_start.go +++ b/pkg/cri/sbserver/container_start.go @@ -177,6 +177,8 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain // It handles the TaskExit event and update container state after this. c.eventMonitor.startContainerExitMonitor(context.Background(), id, task.Pid(), exitCh) + c.generateAndSendContainerEvent(ctx, id, sandboxID, runtime.ContainerEventType_CONTAINER_STARTED_EVENT) + containerStartTimer.WithValues(info.Runtime.Name).UpdateSince(start) return &runtime.StartContainerResponse{}, nil diff --git a/pkg/cri/sbserver/container_stop.go b/pkg/cri/sbserver/container_stop.go index bffbfb51a..55bc3646e 100644 --- a/pkg/cri/sbserver/container_stop.go +++ b/pkg/cri/sbserver/container_stop.go @@ -60,6 +60,7 @@ func (c *criService) StopContainer(ctx context.Context, r *runtime.StopContainer // stopContainer stops a container based on the container metadata. func (c *criService) stopContainer(ctx context.Context, container containerstore.Container, timeout time.Duration) error { id := container.ID + sandboxID := container.SandboxID // Return without error if container is not running. This makes sure that // stop only takes real action after the container is started. @@ -78,7 +79,7 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore } // Don't return for unknown state, some cleanup needs to be done. if state == runtime.ContainerState_CONTAINER_UNKNOWN { - return cleanupUnknownContainer(ctx, id, container) + return cleanupUnknownContainer(ctx, id, container, sandboxID, c) } return nil } @@ -93,7 +94,7 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore if !errdefs.IsNotFound(err) { return fmt.Errorf("failed to wait for task for %q: %w", id, err) } - return cleanupUnknownContainer(ctx, id, container) + return cleanupUnknownContainer(ctx, id, container, sandboxID, c) } exitCtx, exitCancel := context.WithCancel(context.Background()) @@ -196,7 +197,7 @@ func (c *criService) waitContainerStop(ctx context.Context, container containers } // cleanupUnknownContainer cleanup stopped container in unknown state. -func cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore.Container) error { +func cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore.Container, sandboxID string, c *criService) error { // Reuse handleContainerExit to do the cleanup. return handleContainerExit(ctx, &eventtypes.TaskExit{ ContainerID: id, @@ -204,5 +205,5 @@ func cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore Pid: 0, ExitStatus: unknownExitCode, ExitedAt: protobuf.ToTimestamp(time.Now()), - }, cntr) + }, cntr, sandboxID, c) } diff --git a/pkg/cri/sbserver/events.go b/pkg/cri/sbserver/events.go index f0782df3c..cc6b2edec 100644 --- a/pkg/cri/sbserver/events.go +++ b/pkg/cri/sbserver/events.go @@ -35,6 +35,7 @@ import ( "github.com/containerd/containerd/protobuf" "github.com/containerd/typeurl" "github.com/sirupsen/logrus" + runtime "k8s.io/cri-api/pkg/apis/runtime/v1" "k8s.io/utils/clock" ) @@ -136,7 +137,7 @@ func (em *eventMonitor) startSandboxExitMonitor(ctx context.Context, id string, sb, err := em.c.sandboxStore.Get(e.ID) if err == nil { - if err := handleSandboxExit(dctx, e, sb); err != nil { + if err := handleSandboxExit(dctx, e, sb, em.c); err != nil { return err } return nil @@ -187,7 +188,7 @@ func (em *eventMonitor) startContainerExitMonitor(ctx context.Context, id string cntr, err := em.c.containerStore.Get(e.ID) if err == nil { - if err := handleContainerExit(dctx, e, cntr); err != nil { + if err := handleContainerExit(dctx, e, cntr, cntr.SandboxID, em.c); err != nil { return err } return nil @@ -313,7 +314,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error { // Use ID instead of ContainerID to rule out TaskExit event for exec. cntr, err := em.c.containerStore.Get(e.ID) if err == nil { - if err := handleContainerExit(ctx, e, cntr); err != nil { + if err := handleContainerExit(ctx, e, cntr, cntr.SandboxID, em.c); err != nil { return fmt.Errorf("failed to handle container TaskExit event: %w", err) } return nil @@ -322,7 +323,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error { } sb, err := em.c.sandboxStore.Get(e.ID) if err == nil { - if err := handleSandboxExit(ctx, e, sb); err != nil { + if err := handleSandboxExit(ctx, e, sb, em.c); err != nil { return fmt.Errorf("failed to handle sandbox TaskExit event: %w", err) } return nil @@ -362,7 +363,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error { } // handleContainerExit handles TaskExit event for container. -func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container) error { +func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container, sandboxID string, c *criService) error { // Attach container IO so that `Delete` could cleanup the stream properly. task, err := cntr.Container.Task(ctx, func(*containerdio.FIFOSet) (containerdio.IO, error) { @@ -411,11 +412,12 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta } // Using channel to propagate the information of container stop cntr.Stop() + c.generateAndSendContainerEvent(ctx, cntr.ID, sandboxID, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT) return nil } // handleSandboxExit handles TaskExit event for sandbox. -func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxstore.Sandbox) error { +func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxstore.Sandbox, c *criService) error { // No stream attached to sandbox container. task, err := sb.Container.Task(ctx, nil) if err != nil { @@ -441,6 +443,7 @@ func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxst } // Using channel to propagate the information of sandbox stop sb.Stop() + c.generateAndSendContainerEvent(ctx, sb.ID, sb.ID, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT) return nil } diff --git a/pkg/cri/sbserver/helpers.go b/pkg/cri/sbserver/helpers.go index ca3b4aa71..594eefd34 100644 --- a/pkg/cri/sbserver/helpers.go +++ b/pkg/cri/sbserver/helpers.go @@ -23,6 +23,7 @@ import ( "path/filepath" "strconv" "strings" + "time" "github.com/containerd/typeurl" runtimespec "github.com/opencontainers/runtime-spec/specs-go" @@ -508,3 +509,67 @@ func copyResourcesToStatus(spec *runtimespec.Spec, status containerstore.Status) } return status } + +func (c *criService) generateAndSendContainerEvent(ctx context.Context, containerID string, sandboxID string, eventType runtime.ContainerEventType) { + podSandboxStatus, err := c.getPodSandboxStatus(ctx, sandboxID) + if err != nil { + // TODO(https://github.com/containerd/containerd/issues/7785): + // Do not skip events with nil PodSandboxStatus. + logrus.Errorf("Failed to get podSandbox status for container event for sandboxID %q: %v. Skipping sending the event.", sandboxID, err) + return + } + containerStatuses, err := c.getContainerStatuses(ctx, sandboxID) + if err != nil { + logrus.Errorf("Failed to get container statuses for container event for sandboxID %q: %v", sandboxID, err) + } + + event := runtime.ContainerEventResponse{ + ContainerId: containerID, + ContainerEventType: eventType, + CreatedAt: time.Now().UnixNano(), + PodSandboxStatus: podSandboxStatus, + ContainersStatuses: containerStatuses, + } + + // TODO(ruiwen-zhao): write events to a cache, storage, or increase the size of the channel + select { + case c.containerEventsChan <- event: + default: + logrus.Debugf("containerEventsChan is full, discarding event %+v", event) + } +} + +func (c *criService) getPodSandboxStatus(ctx context.Context, podSandboxID string) (*runtime.PodSandboxStatus, error) { + request := &runtime.PodSandboxStatusRequest{PodSandboxId: podSandboxID} + response, err := c.PodSandboxStatus(ctx, request) + if err != nil { + return nil, err + } + return response.GetStatus(), nil +} + +func (c *criService) getContainerStatuses(ctx context.Context, podSandboxID string) ([]*runtime.ContainerStatus, error) { + response, err := c.ListContainers(ctx, &runtime.ListContainersRequest{ + Filter: &runtime.ContainerFilter{ + PodSandboxId: podSandboxID, + }, + }) + if err != nil { + return nil, err + } + containerStatuses := []*runtime.ContainerStatus{} + for _, container := range response.Containers { + statusResp, err := c.ContainerStatus(ctx, &runtime.ContainerStatusRequest{ + ContainerId: container.Id, + Verbose: false, + }) + if err != nil { + if errdefs.IsNotFound(err) { + continue + } + return nil, err + } + containerStatuses = append(containerStatuses, statusResp.GetStatus()) + } + return containerStatuses, nil +} diff --git a/pkg/cri/sbserver/sandbox_remove.go b/pkg/cri/sbserver/sandbox_remove.go index 78c884f14..55910a3a9 100644 --- a/pkg/cri/sbserver/sandbox_remove.go +++ b/pkg/cri/sbserver/sandbox_remove.go @@ -103,6 +103,9 @@ func (c *criService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodS // Release the sandbox name reserved for the sandbox. c.sandboxNameIndex.ReleaseByKey(id) + // Send CONTAINER_DELETED event with ContainerId equal to SandboxId. + c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_DELETED_EVENT) + sandboxRemoveTimer.WithValues(sandbox.RuntimeHandler).UpdateSince(start) return &runtime.RemovePodSandboxResponse{}, nil diff --git a/pkg/cri/sbserver/sandbox_run.go b/pkg/cri/sbserver/sandbox_run.go index 90642f2cb..32419b417 100644 --- a/pkg/cri/sbserver/sandbox_run.go +++ b/pkg/cri/sbserver/sandbox_run.go @@ -286,6 +286,11 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox return nil, fmt.Errorf("failed to add sandbox %+v into store: %w", sandbox, err) } + // Send CONTAINER_CREATED event with both ContainerId and SandboxId equal to SandboxId. + // Note that this has to be done after sandboxStore.Add() because we need to get + // SandboxStatus from the store and include it in the event. + c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_CREATED_EVENT) + // start the monitor after adding sandbox into the store, this ensures // that sandbox is in the store, when event monitor receives the TaskExit event. // @@ -306,6 +311,9 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox } }() + // Send CONTAINER_STARTED event with ContainerId equal to SandboxId. + c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_STARTED_EVENT) + sandboxRuntimeCreateTimer.WithValues(labels["oci_runtime_type"]).UpdateSince(runtimeStart) return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil diff --git a/pkg/cri/sbserver/service.go b/pkg/cri/sbserver/service.go index 78880d822..e19baec9a 100644 --- a/pkg/cri/sbserver/service.go +++ b/pkg/cri/sbserver/service.go @@ -123,6 +123,9 @@ type criService struct { // one in-flight fetch request or unpack handler for a given descriptor's // or chain ID. unpackDuplicationSuppressor kmutex.KeyedLocker + // containerEventsChan is used to capture container events and send them + // to the caller of GetContainerEvents. + containerEventsChan chan runtime.ContainerEventResponse } // NewCRIService returns a new instance of CRIService @@ -145,6 +148,9 @@ func NewCRIService(config criconfig.Config, client *containerd.Client) (CRIServi sandboxControllers: make(map[criconfig.SandboxControllerMode]sandbox.Controller), } + // TODO: figure out a proper channel size. + c.containerEventsChan = make(chan runtime.ContainerEventResponse, 1000) + if client.SnapshotService(c.config.ContainerdConfig.Snapshotter) == nil { return nil, fmt.Errorf("failed to find snapshotter %q", c.config.ContainerdConfig.Snapshotter) } diff --git a/pkg/cri/server/container_events.go b/pkg/cri/server/container_events.go index ee34633ea..83e322cd9 100644 --- a/pkg/cri/server/container_events.go +++ b/pkg/cri/server/container_events.go @@ -24,8 +24,6 @@ func (c *criService) GetContainerEvents(r *runtime.GetEventsRequest, s runtime.R // TODO (https://github.com/containerd/containerd/issues/7318): // replace with a real implementation that broadcasts containerEventsChan // to all subscribers. - // TODO (https://github.com/containerd/containerd/issues/7658): Add Sandbox - // server support. for event := range c.containerEventsChan { if err := s.Send(&event); err != nil { return err