Move PLEG events for pause container to podsandbox
Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
This commit is contained in:
parent
93a2823811
commit
21fe0ceaad
@ -418,32 +418,38 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta
|
||||
|
||||
// handleSandboxExit handles TaskExit event for sandbox.
|
||||
func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxstore.Sandbox, c *criService) error {
|
||||
// No stream attached to sandbox container.
|
||||
task, err := sb.Container.Task(ctx, nil)
|
||||
if err != nil {
|
||||
if !errdefs.IsNotFound(err) {
|
||||
return fmt.Errorf("failed to load task for sandbox: %w", err)
|
||||
}
|
||||
} else {
|
||||
// TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker
|
||||
if _, err = task.Delete(ctx, WithNRISandboxDelete(sb.ID), containerd.WithProcessKill); err != nil {
|
||||
// TODO: Move pause container cleanup to podsandbox/ package.
|
||||
if sb.Container != nil {
|
||||
// No stream attached to sandbox container.
|
||||
task, err := sb.Container.Task(ctx, nil)
|
||||
if err != nil {
|
||||
if !errdefs.IsNotFound(err) {
|
||||
return fmt.Errorf("failed to stop sandbox: %w", err)
|
||||
return fmt.Errorf("failed to load task for sandbox: %w", err)
|
||||
}
|
||||
// Move on to make sure container status is updated.
|
||||
} else {
|
||||
// TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker
|
||||
if _, err = task.Delete(ctx, WithNRISandboxDelete(sb.ID), containerd.WithProcessKill); err != nil {
|
||||
if !errdefs.IsNotFound(err) {
|
||||
return fmt.Errorf("failed to stop sandbox: %w", err)
|
||||
}
|
||||
// Move on to make sure container status is updated.
|
||||
}
|
||||
|
||||
c.generateAndSendContainerEvent(ctx, sb.ID, sb.ID, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT)
|
||||
}
|
||||
}
|
||||
err = sb.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
|
||||
|
||||
if err := sb.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
|
||||
status.State = sandboxstore.StateNotReady
|
||||
status.Pid = 0
|
||||
return status, nil
|
||||
})
|
||||
if err != nil {
|
||||
}); err != nil {
|
||||
return fmt.Errorf("failed to update sandbox state: %w", err)
|
||||
}
|
||||
|
||||
// Using channel to propagate the information of sandbox stop
|
||||
sb.Stop()
|
||||
c.generateAndSendContainerEvent(ctx, sb.ID, sb.ID, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -46,6 +46,10 @@ type CRIService interface {
|
||||
|
||||
// TODO: we should implement Event backoff in Controller.
|
||||
BackOffEvent(id string, event interface{})
|
||||
|
||||
// TODO: refactor event generator for PLEG.
|
||||
// GenerateAndSendContainerEvent is called by controller for sandbox container events.
|
||||
GenerateAndSendContainerEvent(ctx context.Context, containerID string, sandboxID string, eventType runtime.ContainerEventType)
|
||||
}
|
||||
|
||||
type Controller struct {
|
||||
|
@ -20,6 +20,8 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
|
||||
"github.com/containerd/containerd"
|
||||
api "github.com/containerd/containerd/api/services/sandbox/v1"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
@ -58,5 +60,10 @@ func (c *Controller) Shutdown(ctx context.Context, sandboxID string) (*api.Contr
|
||||
}
|
||||
}
|
||||
|
||||
c.sandboxStore.Delete(sandboxID)
|
||||
|
||||
// Send CONTAINER_DELETED event with ContainerId equal to SandboxId.
|
||||
c.cri.GenerateAndSendContainerEvent(ctx, sandboxID, sandboxID, runtime.ContainerEventType_CONTAINER_DELETED_EVENT)
|
||||
|
||||
return &api.ControllerShutdownResponse{}, nil
|
||||
}
|
||||
|
@ -155,6 +155,11 @@ func (c *Controller) Start(ctx context.Context, id string) (resp *api.Controller
|
||||
}
|
||||
}()
|
||||
|
||||
// Send CONTAINER_CREATED event with both ContainerId and SandboxId equal to SandboxId.
|
||||
// Note that this has to be done after sandboxStore.Add() because we need to get
|
||||
// SandboxStatus from the store and include it in the event.
|
||||
c.cri.GenerateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_CREATED_EVENT)
|
||||
|
||||
// Create sandbox container root directories.
|
||||
sandboxRootDir := c.getSandboxRootDir(id)
|
||||
if err := c.os.MkdirAll(sandboxRootDir, 0755); err != nil {
|
||||
@ -255,6 +260,9 @@ func (c *Controller) Start(ctx context.Context, id string) (resp *api.Controller
|
||||
return nil, fmt.Errorf("failed to start sandbox container task %q: %w", id, err)
|
||||
}
|
||||
|
||||
// Send CONTAINER_STARTED event with ContainerId equal to SandboxId.
|
||||
c.cri.GenerateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_STARTED_EVENT)
|
||||
|
||||
resp = &api.ControllerStartResponse{
|
||||
SandboxID: id,
|
||||
Pid: task.Pid(),
|
||||
|
@ -104,9 +104,6 @@ func (c *criService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodS
|
||||
// Release the sandbox name reserved for the sandbox.
|
||||
c.sandboxNameIndex.ReleaseByKey(id)
|
||||
|
||||
// Send CONTAINER_DELETED event with ContainerId equal to SandboxId.
|
||||
c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_DELETED_EVENT)
|
||||
|
||||
sandboxRemoveTimer.WithValues(sandbox.RuntimeHandler).UpdateSince(start)
|
||||
|
||||
return &runtime.RemovePodSandboxResponse{}, nil
|
||||
|
@ -272,11 +272,6 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
|
||||
return nil, fmt.Errorf("failed to add sandbox %+v into store: %w", sandbox, err)
|
||||
}
|
||||
|
||||
// Send CONTAINER_CREATED event with both ContainerId and SandboxId equal to SandboxId.
|
||||
// Note that this has to be done after sandboxStore.Add() because we need to get
|
||||
// SandboxStatus from the store and include it in the event.
|
||||
c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_CREATED_EVENT)
|
||||
|
||||
// start the monitor after adding sandbox into the store, this ensures
|
||||
// that sandbox is in the store, when event monitor receives the TaskExit event.
|
||||
//
|
||||
@ -300,9 +295,6 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
|
||||
c.eventMonitor.backOff.enBackOff(id, e)
|
||||
}()
|
||||
|
||||
// Send CONTAINER_STARTED event with ContainerId equal to SandboxId.
|
||||
c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_STARTED_EVENT)
|
||||
|
||||
sandboxRuntimeCreateTimer.WithValues(labels["oci_runtime_type"]).UpdateSince(runtimeStart)
|
||||
|
||||
return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil
|
||||
|
@ -17,6 +17,7 @@
|
||||
package sbserver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
@ -206,6 +207,12 @@ func (c *criService) BackOffEvent(id string, event interface{}) {
|
||||
c.eventMonitor.backOff.enBackOff(id, event)
|
||||
}
|
||||
|
||||
// GenerateAndSendContainerEvent is a temporary workaround to send PLEG events from podsandbopx/ controller
|
||||
// TODO: refactor PLEG event generator so both podsandbox/ controller and CRI service can access it.
|
||||
func (c *criService) GenerateAndSendContainerEvent(ctx context.Context, containerID string, sandboxID string, eventType runtime.ContainerEventType) {
|
||||
c.generateAndSendContainerEvent(ctx, containerID, sandboxID, eventType)
|
||||
}
|
||||
|
||||
// Register registers all required services onto a specific grpc server.
|
||||
// This is used by containerd cri plugin.
|
||||
func (c *criService) Register(s *grpc.Server) error {
|
||||
|
Loading…
Reference in New Issue
Block a user