From 79cdbf61b6f7e4be2feb1bb2d631bdb1b9c5cd7f Mon Sep 17 00:00:00 2001 From: Krisztian Litkey Date: Fri, 31 Jan 2025 20:15:12 +0200 Subject: [PATCH] cri,nri: block NRI plugin sync. during event processing. Block the synchronization of registering NRI plugins during CRI events to avoid the plugin ending up in an inconsistent starting state after initial sync (missing pods, containers or missed events for some pods or containers). Signed-off-by: Krisztian Litkey --- internal/cri/nri/nri_api_linux.go | 9 +++++++++ internal/cri/nri/nri_api_other.go | 8 ++++++++ internal/cri/server/container_create.go | 2 ++ internal/cri/server/container_remove.go | 3 +++ internal/cri/server/container_start.go | 2 ++ internal/cri/server/container_stop.go | 3 +++ internal/cri/server/container_update_resources.go | 2 ++ internal/cri/server/sandbox_remove.go | 3 +++ internal/cri/server/sandbox_run.go | 2 ++ internal/cri/server/sandbox_stop.go | 3 +++ internal/nri/nri.go | 12 ++++++++++++ 11 files changed, 49 insertions(+) diff --git a/internal/cri/nri/nri_api_linux.go b/internal/cri/nri/nri_api_linux.go index 0adcd9d11..baf77daca 100644 --- a/internal/cri/nri/nri_api_linux.go +++ b/internal/cri/nri/nri_api_linux.go @@ -358,6 +358,15 @@ func (a *API) WithContainerExit(criCtr *cstore.Container) containerd.ProcessDele } } +type PluginSyncBlock = nri.PluginSyncBlock + +func (a *API) BlockPluginSync() *PluginSyncBlock { + if a.IsDisabled() { + return nil + } + return a.nri.BlockPluginSync() +} + // // NRI-CRI 'domain' interface // diff --git a/internal/cri/nri/nri_api_other.go b/internal/cri/nri/nri_api_other.go index 14bd7ddd5..808a36048 100644 --- a/internal/cri/nri/nri_api_other.go +++ b/internal/cri/nri/nri_api_other.go @@ -108,6 +108,14 @@ func (*API) WithContainerExit(*cstore.Container) containerd.ProcessDeleteOpts { } } +type PluginSyncBlock struct{} + +func (*API) BlockPluginSync() *PluginSyncBlock { + return nil +} + +func (*PluginSyncBlock) Unblock() {} + // // NRI-CRI no-op 'domain' interface // diff --git a/internal/cri/server/container_create.go b/internal/cri/server/container_create.go index e7e56f163..f27a92e88 100644 --- a/internal/cri/server/container_create.go +++ b/internal/cri/server/container_create.go @@ -313,6 +313,8 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta } }() + defer c.nri.BlockPluginSync().Unblock() + var cntr containerd.Container if cntr, err = c.client.NewContainer(ctx, id, opts...); err != nil { return nil, fmt.Errorf("failed to create containerd container: %w", err) diff --git a/internal/cri/server/container_remove.go b/internal/cri/server/container_remove.go index a666456e3..76cfaf6ea 100644 --- a/internal/cri/server/container_remove.go +++ b/internal/cri/server/container_remove.go @@ -44,6 +44,9 @@ func (c *criService) RemoveContainer(ctx context.Context, r *runtime.RemoveConta log.G(ctx).Tracef("RemoveContainer called for container %q that does not exist", ctrID) return &runtime.RemoveContainerResponse{}, nil } + + defer c.nri.BlockPluginSync().Unblock() + id := container.ID span.SetAttributes(tracing.Attribute("container.id", id)) i, err := container.Container.Info(ctx) diff --git a/internal/cri/server/container_start.go b/internal/cri/server/container_start.go index 36e4d7246..b69d7cc7d 100644 --- a/internal/cri/server/container_start.go +++ b/internal/cri/server/container_start.go @@ -156,6 +156,8 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain return nil, fmt.Errorf("failed to wait for containerd task: %w", err) } + defer c.nri.BlockPluginSync().Unblock() + defer func() { if retErr != nil { deferCtx, deferCancel := ctrdutil.DeferContext() diff --git a/internal/cri/server/container_stop.go b/internal/cri/server/container_stop.go index 736911fbd..408bf9f10 100644 --- a/internal/cri/server/container_stop.go +++ b/internal/cri/server/container_stop.go @@ -51,6 +51,9 @@ func (c *criService) StopContainer(ctx context.Context, r *runtime.StopContainer // https://github.com/kubernetes/cri-api/blob/c20fa40/pkg/apis/runtime/v1/api.proto#L67-L68 return &runtime.StopContainerResponse{}, nil } + + defer c.nri.BlockPluginSync().Unblock() + span.SetAttributes(tracing.Attribute("container.id", container.ID)) if err := c.stopContainer(ctx, container, time.Duration(r.GetTimeout())*time.Second); err != nil { return nil, err diff --git a/internal/cri/server/container_update_resources.go b/internal/cri/server/container_update_resources.go index 78849321e..c472ccbd8 100644 --- a/internal/cri/server/container_update_resources.go +++ b/internal/cri/server/container_update_resources.go @@ -47,6 +47,8 @@ func (c *criService) UpdateContainerResources(ctx context.Context, r *runtime.Up return nil, err } + defer c.nri.BlockPluginSync().Unblock() + resources := r.GetLinux() updated, err := c.nri.UpdateContainerResources(ctx, &sandbox, &container, resources) if err != nil { diff --git a/internal/cri/server/sandbox_remove.go b/internal/cri/server/sandbox_remove.go index f7e1ed880..e7c0e0654 100644 --- a/internal/cri/server/sandbox_remove.go +++ b/internal/cri/server/sandbox_remove.go @@ -44,6 +44,9 @@ func (c *criService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodS r.GetPodSandboxId()) return &runtime.RemovePodSandboxResponse{}, nil } + + defer c.nri.BlockPluginSync().Unblock() + // Use the full sandbox id. id := sandbox.ID span.SetAttributes(tracing.Attribute("sandbox.id", id)) diff --git a/internal/cri/server/sandbox_run.go b/internal/cri/server/sandbox_run.go index e895d66b3..8ed15b2c9 100644 --- a/internal/cri/server/sandbox_run.go +++ b/internal/cri/server/sandbox_run.go @@ -300,6 +300,8 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox sandbox.ProcessLabel = labels["selinux_label"] + defer c.nri.BlockPluginSync().Unblock() + err = c.nri.RunPodSandbox(ctx, &sandbox) if err != nil { return nil, fmt.Errorf("NRI RunPodSandbox failed: %w", err) diff --git a/internal/cri/server/sandbox_stop.go b/internal/cri/server/sandbox_stop.go index 95f5780a0..a27beec93 100644 --- a/internal/cri/server/sandbox_stop.go +++ b/internal/cri/server/sandbox_stop.go @@ -46,6 +46,9 @@ func (c *criService) StopPodSandbox(ctx context.Context, r *runtime.StopPodSandb // https://github.com/kubernetes/cri-api/blob/c20fa40/pkg/apis/runtime/v1/api.proto#L45-L46 return &runtime.StopPodSandboxResponse{}, nil } + + defer c.nri.BlockPluginSync().Unblock() + span.SetAttributes(tracing.Attribute("sandbox.id", sandbox.ID)) if err := c.stopPodSandbox(ctx, sandbox); err != nil { return nil, err diff --git a/internal/nri/nri.go b/internal/nri/nri.go index a3922bcf3..14fc107dc 100644 --- a/internal/nri/nri.go +++ b/internal/nri/nri.go @@ -81,6 +81,9 @@ type API interface { // RemoveContainer relays container removal events to NRI. RemoveContainer(context.Context, PodSandbox, Container) error + + // BlockPluginSync blocks plugin synchronization until it is Unblock()ed. + BlockPluginSync() *PluginSyncBlock } type State int @@ -435,6 +438,15 @@ func (l *local) RemoveContainer(ctx context.Context, pod PodSandbox, ctr Contain return err } +type PluginSyncBlock = nri.PluginSyncBlock + +func (l *local) BlockPluginSync() *PluginSyncBlock { + if !l.IsEnabled() { + return nil + } + return l.nri.BlockPluginSync() +} + func (l *local) syncPlugin(ctx context.Context, syncFn nri.SyncCB) error { l.Lock() defer l.Unlock()