cri,nri: block NRI plugin sync during event processing.
Block the synchronization of newly registering NRI plugins while CRI events are being processed, so that a plugin does not end up in an inconsistent starting state after its initial sync (missing pods or containers, or missed events for some pods or containers). Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
This commit is contained in:
parent
e465b45f9c
commit
79cdbf61b6
@ -358,6 +358,15 @@ func (a *API) WithContainerExit(criCtr *cstore.Container) containerd.ProcessDele
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PluginSyncBlock is an alias for nri.PluginSyncBlock, the value returned by
// BlockPluginSync.
type PluginSyncBlock = nri.PluginSyncBlock
|
||||||
|
|
||||||
|
// BlockPluginSync returns nil when a.IsDisabled() reports true; otherwise it
// returns the result of a.nri.BlockPluginSync().
//
// Per the commit description, the returned block holds off the
// synchronization of registering NRI plugins while a CRI event is processed.
// The CRI handlers in this change use it as
// `defer c.nri.BlockPluginSync().Unblock()`.
// NOTE(review): with NRI disabled that pattern calls Unblock() on a nil
// *PluginSyncBlock — presumably nri.PluginSyncBlock.Unblock is nil-safe; confirm.
func (a *API) BlockPluginSync() *PluginSyncBlock {
|
||||||
|
if a.IsDisabled() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return a.nri.BlockPluginSync()
|
||||||
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
// NRI-CRI 'domain' interface
|
// NRI-CRI 'domain' interface
|
||||||
//
|
//
|
||||||
|
@ -108,6 +108,14 @@ func (*API) WithContainerExit(*cstore.Container) containerd.ProcessDeleteOpts {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PluginSyncBlock is an empty placeholder type for the no-op NRI-CRI
// implementation; its only method in this file, Unblock, does nothing.
type PluginSyncBlock struct{}
|
||||||
|
|
||||||
|
// BlockPluginSync is the no-op variant: it always returns nil. Calling
// Unblock() on the nil result is safe here because the no-op Unblock method
// never dereferences its receiver.
func (*API) BlockPluginSync() *PluginSyncBlock {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unblock does nothing. The receiver is unnamed and never dereferenced, so
// the method may be called on a nil *PluginSyncBlock.
func (*PluginSyncBlock) Unblock() {}
|
||||||
|
|
||||||
//
|
//
|
||||||
// NRI-CRI no-op 'domain' interface
|
// NRI-CRI no-op 'domain' interface
|
||||||
//
|
//
|
||||||
|
@ -313,6 +313,8 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
defer c.nri.BlockPluginSync().Unblock()
|
||||||
|
|
||||||
var cntr containerd.Container
|
var cntr containerd.Container
|
||||||
if cntr, err = c.client.NewContainer(ctx, id, opts...); err != nil {
|
if cntr, err = c.client.NewContainer(ctx, id, opts...); err != nil {
|
||||||
return nil, fmt.Errorf("failed to create containerd container: %w", err)
|
return nil, fmt.Errorf("failed to create containerd container: %w", err)
|
||||||
|
@ -44,6 +44,9 @@ func (c *criService) RemoveContainer(ctx context.Context, r *runtime.RemoveConta
|
|||||||
log.G(ctx).Tracef("RemoveContainer called for container %q that does not exist", ctrID)
|
log.G(ctx).Tracef("RemoveContainer called for container %q that does not exist", ctrID)
|
||||||
return &runtime.RemoveContainerResponse{}, nil
|
return &runtime.RemoveContainerResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer c.nri.BlockPluginSync().Unblock()
|
||||||
|
|
||||||
id := container.ID
|
id := container.ID
|
||||||
span.SetAttributes(tracing.Attribute("container.id", id))
|
span.SetAttributes(tracing.Attribute("container.id", id))
|
||||||
i, err := container.Container.Info(ctx)
|
i, err := container.Container.Info(ctx)
|
||||||
|
@ -156,6 +156,8 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
|
|||||||
return nil, fmt.Errorf("failed to wait for containerd task: %w", err)
|
return nil, fmt.Errorf("failed to wait for containerd task: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer c.nri.BlockPluginSync().Unblock()
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if retErr != nil {
|
if retErr != nil {
|
||||||
deferCtx, deferCancel := ctrdutil.DeferContext()
|
deferCtx, deferCancel := ctrdutil.DeferContext()
|
||||||
|
@ -51,6 +51,9 @@ func (c *criService) StopContainer(ctx context.Context, r *runtime.StopContainer
|
|||||||
// https://github.com/kubernetes/cri-api/blob/c20fa40/pkg/apis/runtime/v1/api.proto#L67-L68
|
// https://github.com/kubernetes/cri-api/blob/c20fa40/pkg/apis/runtime/v1/api.proto#L67-L68
|
||||||
return &runtime.StopContainerResponse{}, nil
|
return &runtime.StopContainerResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer c.nri.BlockPluginSync().Unblock()
|
||||||
|
|
||||||
span.SetAttributes(tracing.Attribute("container.id", container.ID))
|
span.SetAttributes(tracing.Attribute("container.id", container.ID))
|
||||||
if err := c.stopContainer(ctx, container, time.Duration(r.GetTimeout())*time.Second); err != nil {
|
if err := c.stopContainer(ctx, container, time.Duration(r.GetTimeout())*time.Second); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -47,6 +47,8 @@ func (c *criService) UpdateContainerResources(ctx context.Context, r *runtime.Up
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer c.nri.BlockPluginSync().Unblock()
|
||||||
|
|
||||||
resources := r.GetLinux()
|
resources := r.GetLinux()
|
||||||
updated, err := c.nri.UpdateContainerResources(ctx, &sandbox, &container, resources)
|
updated, err := c.nri.UpdateContainerResources(ctx, &sandbox, &container, resources)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -44,6 +44,9 @@ func (c *criService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodS
|
|||||||
r.GetPodSandboxId())
|
r.GetPodSandboxId())
|
||||||
return &runtime.RemovePodSandboxResponse{}, nil
|
return &runtime.RemovePodSandboxResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer c.nri.BlockPluginSync().Unblock()
|
||||||
|
|
||||||
// Use the full sandbox id.
|
// Use the full sandbox id.
|
||||||
id := sandbox.ID
|
id := sandbox.ID
|
||||||
span.SetAttributes(tracing.Attribute("sandbox.id", id))
|
span.SetAttributes(tracing.Attribute("sandbox.id", id))
|
||||||
|
@ -300,6 +300,8 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
|
|||||||
|
|
||||||
sandbox.ProcessLabel = labels["selinux_label"]
|
sandbox.ProcessLabel = labels["selinux_label"]
|
||||||
|
|
||||||
|
defer c.nri.BlockPluginSync().Unblock()
|
||||||
|
|
||||||
err = c.nri.RunPodSandbox(ctx, &sandbox)
|
err = c.nri.RunPodSandbox(ctx, &sandbox)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("NRI RunPodSandbox failed: %w", err)
|
return nil, fmt.Errorf("NRI RunPodSandbox failed: %w", err)
|
||||||
|
@ -46,6 +46,9 @@ func (c *criService) StopPodSandbox(ctx context.Context, r *runtime.StopPodSandb
|
|||||||
// https://github.com/kubernetes/cri-api/blob/c20fa40/pkg/apis/runtime/v1/api.proto#L45-L46
|
// https://github.com/kubernetes/cri-api/blob/c20fa40/pkg/apis/runtime/v1/api.proto#L45-L46
|
||||||
return &runtime.StopPodSandboxResponse{}, nil
|
return &runtime.StopPodSandboxResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer c.nri.BlockPluginSync().Unblock()
|
||||||
|
|
||||||
span.SetAttributes(tracing.Attribute("sandbox.id", sandbox.ID))
|
span.SetAttributes(tracing.Attribute("sandbox.id", sandbox.ID))
|
||||||
if err := c.stopPodSandbox(ctx, sandbox); err != nil {
|
if err := c.stopPodSandbox(ctx, sandbox); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -81,6 +81,9 @@ type API interface {
|
|||||||
|
|
||||||
// RemoveContainer relays container removal events to NRI.
|
// RemoveContainer relays container removal events to NRI.
|
||||||
RemoveContainer(context.Context, PodSandbox, Container) error
|
RemoveContainer(context.Context, PodSandbox, Container) error
|
||||||
|
|
||||||
|
// BlockPluginSync blocks plugin synchronization until it is Unblock()ed.
|
||||||
|
BlockPluginSync() *PluginSyncBlock
|
||||||
}
|
}
|
||||||
|
|
||||||
type State int
|
type State int
|
||||||
@ -435,6 +438,15 @@ func (l *local) RemoveContainer(ctx context.Context, pod PodSandbox, ctr Contain
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PluginSyncBlock is an alias for nri.PluginSyncBlock, the value returned by
// BlockPluginSync.
type PluginSyncBlock = nri.PluginSyncBlock
|
||||||
|
|
||||||
|
// BlockPluginSync returns nil when l.IsEnabled() reports false; otherwise it
// returns the result of l.nri.BlockPluginSync().
//
// Per the API interface comment, plugin synchronization stays blocked until
// the returned block is Unblock()ed.
// NOTE(review): callers may invoke Unblock() on the nil result when NRI is
// not enabled — presumably nri.PluginSyncBlock.Unblock tolerates a nil
// receiver; confirm.
func (l *local) BlockPluginSync() *PluginSyncBlock {
|
||||||
|
if !l.IsEnabled() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return l.nri.BlockPluginSync()
|
||||||
|
}
|
||||||
|
|
||||||
func (l *local) syncPlugin(ctx context.Context, syncFn nri.SyncCB) error {
|
func (l *local) syncPlugin(ctx context.Context, syncFn nri.SyncCB) error {
|
||||||
l.Lock()
|
l.Lock()
|
||||||
defer l.Unlock()
|
defer l.Unlock()
|
||||||
|
Loading…
Reference in New Issue
Block a user