From 21fe0ceaada01e03c24f5fba1a899c7752961a03 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenko Date: Wed, 25 Jan 2023 19:28:48 -0800 Subject: [PATCH] Move PLEG events for pause container to podsandbox Signed-off-by: Maksym Pavlenko --- pkg/cri/sbserver/events.go | 36 +++++++++++-------- pkg/cri/sbserver/podsandbox/controller.go | 4 +++ pkg/cri/sbserver/podsandbox/sandbox_delete.go | 7 ++++ pkg/cri/sbserver/podsandbox/sandbox_run.go | 8 +++++ pkg/cri/sbserver/sandbox_remove.go | 3 -- pkg/cri/sbserver/sandbox_run.go | 8 ----- pkg/cri/sbserver/service.go | 7 ++++ 7 files changed, 47 insertions(+), 26 deletions(-) diff --git a/pkg/cri/sbserver/events.go b/pkg/cri/sbserver/events.go index cc6b2edec..b14ee8c51 100644 --- a/pkg/cri/sbserver/events.go +++ b/pkg/cri/sbserver/events.go @@ -418,32 +418,38 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta // handleSandboxExit handles TaskExit event for sandbox. func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxstore.Sandbox, c *criService) error { - // No stream attached to sandbox container. - task, err := sb.Container.Task(ctx, nil) - if err != nil { - if !errdefs.IsNotFound(err) { - return fmt.Errorf("failed to load task for sandbox: %w", err) - } - } else { - // TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker - if _, err = task.Delete(ctx, WithNRISandboxDelete(sb.ID), containerd.WithProcessKill); err != nil { + // TODO: Move pause container cleanup to podsandbox/ package. + if sb.Container != nil { + // No stream attached to sandbox container. + task, err := sb.Container.Task(ctx, nil) + if err != nil { if !errdefs.IsNotFound(err) { - return fmt.Errorf("failed to stop sandbox: %w", err) + return fmt.Errorf("failed to load task for sandbox: %w", err) } - // Move on to make sure container status is updated. + } else { + // TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker + if _, err = task.Delete(ctx, WithNRISandboxDelete(sb.ID), containerd.WithProcessKill); err != nil { + if !errdefs.IsNotFound(err) { + return fmt.Errorf("failed to stop sandbox: %w", err) + } + // Move on to make sure container status is updated. + } + + c.generateAndSendContainerEvent(ctx, sb.ID, sb.ID, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT) } } - err = sb.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) { + + if err := sb.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) { status.State = sandboxstore.StateNotReady status.Pid = 0 return status, nil - }) - if err != nil { + }); err != nil { return fmt.Errorf("failed to update sandbox state: %w", err) } + // Using channel to propagate the information of sandbox stop sb.Stop() - c.generateAndSendContainerEvent(ctx, sb.ID, sb.ID, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT) + return nil } diff --git a/pkg/cri/sbserver/podsandbox/controller.go b/pkg/cri/sbserver/podsandbox/controller.go index 9090ef1db..175780b97 100644 --- a/pkg/cri/sbserver/podsandbox/controller.go +++ b/pkg/cri/sbserver/podsandbox/controller.go @@ -46,6 +46,10 @@ type CRIService interface { // TODO: we should implement Event backoff in Controller. BackOffEvent(id string, event interface{}) + + // TODO: refactor event generator for PLEG. + // GenerateAndSendContainerEvent is called by controller for sandbox container events. + GenerateAndSendContainerEvent(ctx context.Context, containerID string, sandboxID string, eventType runtime.ContainerEventType) } type Controller struct { diff --git a/pkg/cri/sbserver/podsandbox/sandbox_delete.go b/pkg/cri/sbserver/podsandbox/sandbox_delete.go index 4efb38e67..fc1a4102e 100644 --- a/pkg/cri/sbserver/podsandbox/sandbox_delete.go +++ b/pkg/cri/sbserver/podsandbox/sandbox_delete.go @@ -20,6 +20,8 @@ import ( "context" "fmt" + runtime "k8s.io/cri-api/pkg/apis/runtime/v1" + "github.com/containerd/containerd" api "github.com/containerd/containerd/api/services/sandbox/v1" "github.com/containerd/containerd/errdefs" @@ -58,5 +60,10 @@ func (c *Controller) Shutdown(ctx context.Context, sandboxID string) (*api.Contr } } + c.sandboxStore.Delete(sandboxID) + + // Send CONTAINER_DELETED event with ContainerId equal to SandboxId. + c.cri.GenerateAndSendContainerEvent(ctx, sandboxID, sandboxID, runtime.ContainerEventType_CONTAINER_DELETED_EVENT) + return &api.ControllerShutdownResponse{}, nil } diff --git a/pkg/cri/sbserver/podsandbox/sandbox_run.go b/pkg/cri/sbserver/podsandbox/sandbox_run.go index 5cfe1d400..055efd598 100644 --- a/pkg/cri/sbserver/podsandbox/sandbox_run.go +++ b/pkg/cri/sbserver/podsandbox/sandbox_run.go @@ -155,6 +155,11 @@ func (c *Controller) Start(ctx context.Context, id string) (resp *api.Controller } }() + // Send CONTAINER_CREATED event with both ContainerId and SandboxId equal to SandboxId. + // Note that this has to be done after sandboxStore.Add() because we need to get + // SandboxStatus from the store and include it in the event. + c.cri.GenerateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_CREATED_EVENT) + // Create sandbox container root directories. sandboxRootDir := c.getSandboxRootDir(id) if err := c.os.MkdirAll(sandboxRootDir, 0755); err != nil { @@ -255,6 +260,9 @@ func (c *Controller) Start(ctx context.Context, id string) (resp *api.Controller return nil, fmt.Errorf("failed to start sandbox container task %q: %w", id, err) } + // Send CONTAINER_STARTED event with ContainerId equal to SandboxId. + c.cri.GenerateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_STARTED_EVENT) + resp = &api.ControllerStartResponse{ SandboxID: id, Pid: task.Pid(), diff --git a/pkg/cri/sbserver/sandbox_remove.go b/pkg/cri/sbserver/sandbox_remove.go index 41d006fb5..a64efb235 100644 --- a/pkg/cri/sbserver/sandbox_remove.go +++ b/pkg/cri/sbserver/sandbox_remove.go @@ -104,9 +104,6 @@ func (c *criService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodS // Release the sandbox name reserved for the sandbox. c.sandboxNameIndex.ReleaseByKey(id) - // Send CONTAINER_DELETED event with ContainerId equal to SandboxId. - c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_DELETED_EVENT) - sandboxRemoveTimer.WithValues(sandbox.RuntimeHandler).UpdateSince(start) return &runtime.RemovePodSandboxResponse{}, nil diff --git a/pkg/cri/sbserver/sandbox_run.go b/pkg/cri/sbserver/sandbox_run.go index db848ecab..aef452c1e 100644 --- a/pkg/cri/sbserver/sandbox_run.go +++ b/pkg/cri/sbserver/sandbox_run.go @@ -272,11 +272,6 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox return nil, fmt.Errorf("failed to add sandbox %+v into store: %w", sandbox, err) } - // Send CONTAINER_CREATED event with both ContainerId and SandboxId equal to SandboxId. - // Note that this has to be done after sandboxStore.Add() because we need to get - // SandboxStatus from the store and include it in the event. - c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_CREATED_EVENT) - // start the monitor after adding sandbox into the store, this ensures // that sandbox is in the store, when event monitor receives the TaskExit event. // @@ -300,9 +295,6 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox c.eventMonitor.backOff.enBackOff(id, e) }() - // Send CONTAINER_STARTED event with ContainerId equal to SandboxId. - c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_STARTED_EVENT) - sandboxRuntimeCreateTimer.WithValues(labels["oci_runtime_type"]).UpdateSince(runtimeStart) return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil diff --git a/pkg/cri/sbserver/service.go b/pkg/cri/sbserver/service.go index 8dd694d56..f8611210b 100644 --- a/pkg/cri/sbserver/service.go +++ b/pkg/cri/sbserver/service.go @@ -17,6 +17,7 @@ package sbserver import ( + "context" "encoding/json" "fmt" "io" @@ -206,6 +207,12 @@ func (c *criService) BackOffEvent(id string, event interface{}) { c.eventMonitor.backOff.enBackOff(id, event) } +// GenerateAndSendContainerEvent is a temporary workaround to send PLEG events from podsandbopx/ controller +// TODO: refactor PLEG event generator so both podsandbox/ controller and CRI service can access it. +func (c *criService) GenerateAndSendContainerEvent(ctx context.Context, containerID string, sandboxID string, eventType runtime.ContainerEventType) { + c.generateAndSendContainerEvent(ctx, containerID, sandboxID, eventType) +} + // Register registers all required services onto a specific grpc server. // This is used by containerd cri plugin. func (c *criService) Register(s *grpc.Server) error {