Add Evented PLEG support to sandbox server
Signed-off-by: ruiwen-zhao <ruiwen@google.com>
This commit is contained in:
parent
a338abc902
commit
a6929f9f6b
@ -283,6 +283,8 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
|
||||
return nil, fmt.Errorf("failed to add container %q into store: %w", id, err)
|
||||
}
|
||||
|
||||
c.generateAndSendContainerEvent(ctx, id, sandboxID, runtime.ContainerEventType_CONTAINER_CREATED_EVENT)
|
||||
|
||||
containerCreateTimer.WithValues(ociRuntime.Type).UpdateSince(start)
|
||||
|
||||
return &runtime.CreateContainerResponse{ContainerId: id}, nil
|
||||
|
@ -17,11 +17,17 @@
|
||||
package sbserver
|
||||
|
||||
import (
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
)
|
||||
|
||||
func (c *criService) GetContainerEvents(r *runtime.GetEventsRequest, s runtime.RuntimeService_GetContainerEventsServer) error {
|
||||
return status.Errorf(codes.Unimplemented, "method GetContainerEvents not implemented")
|
||||
// TODO (https://github.com/containerd/containerd/issues/7318):
|
||||
// replace with a real implementation that broadcasts containerEventsChan
|
||||
// to all subscribers.
|
||||
for event := range c.containerEventsChan {
|
||||
if err := s.Send(&event); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -106,6 +106,8 @@ func (c *criService) RemoveContainer(ctx context.Context, r *runtime.RemoveConta
|
||||
|
||||
c.containerNameIndex.ReleaseByKey(id)
|
||||
|
||||
c.generateAndSendContainerEvent(ctx, id, container.SandboxID, runtime.ContainerEventType_CONTAINER_DELETED_EVENT)
|
||||
|
||||
containerRemoveTimer.WithValues(i.Runtime.Name).UpdateSince(start)
|
||||
|
||||
return &runtime.RemoveContainerResponse{}, nil
|
||||
|
@ -177,6 +177,8 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
|
||||
// It handles the TaskExit event and update container state after this.
|
||||
c.eventMonitor.startContainerExitMonitor(context.Background(), id, task.Pid(), exitCh)
|
||||
|
||||
c.generateAndSendContainerEvent(ctx, id, sandboxID, runtime.ContainerEventType_CONTAINER_STARTED_EVENT)
|
||||
|
||||
containerStartTimer.WithValues(info.Runtime.Name).UpdateSince(start)
|
||||
|
||||
return &runtime.StartContainerResponse{}, nil
|
||||
|
@ -60,6 +60,7 @@ func (c *criService) StopContainer(ctx context.Context, r *runtime.StopContainer
|
||||
// stopContainer stops a container based on the container metadata.
|
||||
func (c *criService) stopContainer(ctx context.Context, container containerstore.Container, timeout time.Duration) error {
|
||||
id := container.ID
|
||||
sandboxID := container.SandboxID
|
||||
|
||||
// Return without error if container is not running. This makes sure that
|
||||
// stop only takes real action after the container is started.
|
||||
@ -78,7 +79,7 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore
|
||||
}
|
||||
// Don't return for unknown state, some cleanup needs to be done.
|
||||
if state == runtime.ContainerState_CONTAINER_UNKNOWN {
|
||||
return cleanupUnknownContainer(ctx, id, container)
|
||||
return cleanupUnknownContainer(ctx, id, container, sandboxID, c)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -93,7 +94,7 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore
|
||||
if !errdefs.IsNotFound(err) {
|
||||
return fmt.Errorf("failed to wait for task for %q: %w", id, err)
|
||||
}
|
||||
return cleanupUnknownContainer(ctx, id, container)
|
||||
return cleanupUnknownContainer(ctx, id, container, sandboxID, c)
|
||||
}
|
||||
|
||||
exitCtx, exitCancel := context.WithCancel(context.Background())
|
||||
@ -196,7 +197,7 @@ func (c *criService) waitContainerStop(ctx context.Context, container containers
|
||||
}
|
||||
|
||||
// cleanupUnknownContainer cleanup stopped container in unknown state.
|
||||
func cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore.Container) error {
|
||||
func cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore.Container, sandboxID string, c *criService) error {
|
||||
// Reuse handleContainerExit to do the cleanup.
|
||||
return handleContainerExit(ctx, &eventtypes.TaskExit{
|
||||
ContainerID: id,
|
||||
@ -204,5 +205,5 @@ func cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore
|
||||
Pid: 0,
|
||||
ExitStatus: unknownExitCode,
|
||||
ExitedAt: protobuf.ToTimestamp(time.Now()),
|
||||
}, cntr)
|
||||
}, cntr, sandboxID, c)
|
||||
}
|
||||
|
@ -35,6 +35,7 @@ import (
|
||||
"github.com/containerd/containerd/protobuf"
|
||||
"github.com/containerd/typeurl"
|
||||
"github.com/sirupsen/logrus"
|
||||
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
"k8s.io/utils/clock"
|
||||
)
|
||||
|
||||
@ -136,7 +137,7 @@ func (em *eventMonitor) startSandboxExitMonitor(ctx context.Context, id string,
|
||||
|
||||
sb, err := em.c.sandboxStore.Get(e.ID)
|
||||
if err == nil {
|
||||
if err := handleSandboxExit(dctx, e, sb); err != nil {
|
||||
if err := handleSandboxExit(dctx, e, sb, em.c); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@ -187,7 +188,7 @@ func (em *eventMonitor) startContainerExitMonitor(ctx context.Context, id string
|
||||
|
||||
cntr, err := em.c.containerStore.Get(e.ID)
|
||||
if err == nil {
|
||||
if err := handleContainerExit(dctx, e, cntr); err != nil {
|
||||
if err := handleContainerExit(dctx, e, cntr, cntr.SandboxID, em.c); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@ -313,7 +314,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
|
||||
// Use ID instead of ContainerID to rule out TaskExit event for exec.
|
||||
cntr, err := em.c.containerStore.Get(e.ID)
|
||||
if err == nil {
|
||||
if err := handleContainerExit(ctx, e, cntr); err != nil {
|
||||
if err := handleContainerExit(ctx, e, cntr, cntr.SandboxID, em.c); err != nil {
|
||||
return fmt.Errorf("failed to handle container TaskExit event: %w", err)
|
||||
}
|
||||
return nil
|
||||
@ -322,7 +323,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
|
||||
}
|
||||
sb, err := em.c.sandboxStore.Get(e.ID)
|
||||
if err == nil {
|
||||
if err := handleSandboxExit(ctx, e, sb); err != nil {
|
||||
if err := handleSandboxExit(ctx, e, sb, em.c); err != nil {
|
||||
return fmt.Errorf("failed to handle sandbox TaskExit event: %w", err)
|
||||
}
|
||||
return nil
|
||||
@ -362,7 +363,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
|
||||
}
|
||||
|
||||
// handleContainerExit handles TaskExit event for container.
|
||||
func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container) error {
|
||||
func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container, sandboxID string, c *criService) error {
|
||||
// Attach container IO so that `Delete` could cleanup the stream properly.
|
||||
task, err := cntr.Container.Task(ctx,
|
||||
func(*containerdio.FIFOSet) (containerdio.IO, error) {
|
||||
@ -411,11 +412,12 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta
|
||||
}
|
||||
// Using channel to propagate the information of container stop
|
||||
cntr.Stop()
|
||||
c.generateAndSendContainerEvent(ctx, cntr.ID, sandboxID, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT)
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleSandboxExit handles TaskExit event for sandbox.
|
||||
func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxstore.Sandbox) error {
|
||||
func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxstore.Sandbox, c *criService) error {
|
||||
// No stream attached to sandbox container.
|
||||
task, err := sb.Container.Task(ctx, nil)
|
||||
if err != nil {
|
||||
@ -441,6 +443,7 @@ func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxst
|
||||
}
|
||||
// Using channel to propagate the information of sandbox stop
|
||||
sb.Stop()
|
||||
c.generateAndSendContainerEvent(ctx, sb.ID, sb.ID, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/typeurl"
|
||||
runtimespec "github.com/opencontainers/runtime-spec/specs-go"
|
||||
@ -508,3 +509,67 @@ func copyResourcesToStatus(spec *runtimespec.Spec, status containerstore.Status)
|
||||
}
|
||||
return status
|
||||
}
|
||||
|
||||
func (c *criService) generateAndSendContainerEvent(ctx context.Context, containerID string, sandboxID string, eventType runtime.ContainerEventType) {
|
||||
podSandboxStatus, err := c.getPodSandboxStatus(ctx, sandboxID)
|
||||
if err != nil {
|
||||
// TODO(https://github.com/containerd/containerd/issues/7785):
|
||||
// Do not skip events with nil PodSandboxStatus.
|
||||
logrus.Errorf("Failed to get podSandbox status for container event for sandboxID %q: %v. Skipping sending the event.", sandboxID, err)
|
||||
return
|
||||
}
|
||||
containerStatuses, err := c.getContainerStatuses(ctx, sandboxID)
|
||||
if err != nil {
|
||||
logrus.Errorf("Failed to get container statuses for container event for sandboxID %q: %v", sandboxID, err)
|
||||
}
|
||||
|
||||
event := runtime.ContainerEventResponse{
|
||||
ContainerId: containerID,
|
||||
ContainerEventType: eventType,
|
||||
CreatedAt: time.Now().UnixNano(),
|
||||
PodSandboxStatus: podSandboxStatus,
|
||||
ContainersStatuses: containerStatuses,
|
||||
}
|
||||
|
||||
// TODO(ruiwen-zhao): write events to a cache, storage, or increase the size of the channel
|
||||
select {
|
||||
case c.containerEventsChan <- event:
|
||||
default:
|
||||
logrus.Debugf("containerEventsChan is full, discarding event %+v", event)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *criService) getPodSandboxStatus(ctx context.Context, podSandboxID string) (*runtime.PodSandboxStatus, error) {
|
||||
request := &runtime.PodSandboxStatusRequest{PodSandboxId: podSandboxID}
|
||||
response, err := c.PodSandboxStatus(ctx, request)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return response.GetStatus(), nil
|
||||
}
|
||||
|
||||
func (c *criService) getContainerStatuses(ctx context.Context, podSandboxID string) ([]*runtime.ContainerStatus, error) {
|
||||
response, err := c.ListContainers(ctx, &runtime.ListContainersRequest{
|
||||
Filter: &runtime.ContainerFilter{
|
||||
PodSandboxId: podSandboxID,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
containerStatuses := []*runtime.ContainerStatus{}
|
||||
for _, container := range response.Containers {
|
||||
statusResp, err := c.ContainerStatus(ctx, &runtime.ContainerStatusRequest{
|
||||
ContainerId: container.Id,
|
||||
Verbose: false,
|
||||
})
|
||||
if err != nil {
|
||||
if errdefs.IsNotFound(err) {
|
||||
continue
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
containerStatuses = append(containerStatuses, statusResp.GetStatus())
|
||||
}
|
||||
return containerStatuses, nil
|
||||
}
|
||||
|
@ -103,6 +103,9 @@ func (c *criService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodS
|
||||
// Release the sandbox name reserved for the sandbox.
|
||||
c.sandboxNameIndex.ReleaseByKey(id)
|
||||
|
||||
// Send CONTAINER_DELETED event with ContainerId equal to SandboxId.
|
||||
c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_DELETED_EVENT)
|
||||
|
||||
sandboxRemoveTimer.WithValues(sandbox.RuntimeHandler).UpdateSince(start)
|
||||
|
||||
return &runtime.RemovePodSandboxResponse{}, nil
|
||||
|
@ -286,6 +286,11 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
|
||||
return nil, fmt.Errorf("failed to add sandbox %+v into store: %w", sandbox, err)
|
||||
}
|
||||
|
||||
// Send CONTAINER_CREATED event with both ContainerId and SandboxId equal to SandboxId.
|
||||
// Note that this has to be done after sandboxStore.Add() because we need to get
|
||||
// SandboxStatus from the store and include it in the event.
|
||||
c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_CREATED_EVENT)
|
||||
|
||||
// start the monitor after adding sandbox into the store, this ensures
|
||||
// that sandbox is in the store, when event monitor receives the TaskExit event.
|
||||
//
|
||||
@ -306,6 +311,9 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
|
||||
}
|
||||
}()
|
||||
|
||||
// Send CONTAINER_STARTED event with ContainerId equal to SandboxId.
|
||||
c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_STARTED_EVENT)
|
||||
|
||||
sandboxRuntimeCreateTimer.WithValues(labels["oci_runtime_type"]).UpdateSince(runtimeStart)
|
||||
|
||||
return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil
|
||||
|
@ -123,6 +123,9 @@ type criService struct {
|
||||
// one in-flight fetch request or unpack handler for a given descriptor's
|
||||
// or chain ID.
|
||||
unpackDuplicationSuppressor kmutex.KeyedLocker
|
||||
// containerEventsChan is used to capture container events and send them
|
||||
// to the caller of GetContainerEvents.
|
||||
containerEventsChan chan runtime.ContainerEventResponse
|
||||
}
|
||||
|
||||
// NewCRIService returns a new instance of CRIService
|
||||
@ -145,6 +148,9 @@ func NewCRIService(config criconfig.Config, client *containerd.Client) (CRIServi
|
||||
sandboxControllers: make(map[criconfig.SandboxControllerMode]sandbox.Controller),
|
||||
}
|
||||
|
||||
// TODO: figure out a proper channel size.
|
||||
c.containerEventsChan = make(chan runtime.ContainerEventResponse, 1000)
|
||||
|
||||
if client.SnapshotService(c.config.ContainerdConfig.Snapshotter) == nil {
|
||||
return nil, fmt.Errorf("failed to find snapshotter %q", c.config.ContainerdConfig.Snapshotter)
|
||||
}
|
||||
|
@ -24,8 +24,6 @@ func (c *criService) GetContainerEvents(r *runtime.GetEventsRequest, s runtime.R
|
||||
// TODO (https://github.com/containerd/containerd/issues/7318):
|
||||
// replace with a real implementation that broadcasts containerEventsChan
|
||||
// to all subscribers.
|
||||
// TODO (https://github.com/containerd/containerd/issues/7658): Add Sandbox
|
||||
// server support.
|
||||
for event := range c.containerEventsChan {
|
||||
if err := s.Send(&event); err != nil {
|
||||
return err
|
||||
|
Loading…
Reference in New Issue
Block a user