Merge pull request #11329 from klihub/fixes/containerd/2.0.x/nri-plugin-sync
[release/2.0] fix initial sync race of registering NRI plugins.
This commit is contained in:
commit
be08040e8d
@ -358,6 +358,15 @@ func (a *API) WithContainerExit(criCtr *cstore.Container) containerd.ProcessDele
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PluginSyncBlock = nri.PluginSyncBlock
|
||||||
|
|
||||||
|
func (a *API) BlockPluginSync() *PluginSyncBlock {
|
||||||
|
if a.IsDisabled() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return a.nri.BlockPluginSync()
|
||||||
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
// NRI-CRI 'domain' interface
|
// NRI-CRI 'domain' interface
|
||||||
//
|
//
|
||||||
|
@ -108,6 +108,14 @@ func (*API) WithContainerExit(*cstore.Container) containerd.ProcessDeleteOpts {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PluginSyncBlock struct{}
|
||||||
|
|
||||||
|
func (*API) BlockPluginSync() *PluginSyncBlock {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*PluginSyncBlock) Unblock() {}
|
||||||
|
|
||||||
//
|
//
|
||||||
// NRI-CRI no-op 'domain' interface
|
// NRI-CRI no-op 'domain' interface
|
||||||
//
|
//
|
||||||
|
@ -313,6 +313,8 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
defer c.nri.BlockPluginSync().Unblock()
|
||||||
|
|
||||||
var cntr containerd.Container
|
var cntr containerd.Container
|
||||||
if cntr, err = c.client.NewContainer(ctx, id, opts...); err != nil {
|
if cntr, err = c.client.NewContainer(ctx, id, opts...); err != nil {
|
||||||
return nil, fmt.Errorf("failed to create containerd container: %w", err)
|
return nil, fmt.Errorf("failed to create containerd container: %w", err)
|
||||||
|
@ -44,6 +44,9 @@ func (c *criService) RemoveContainer(ctx context.Context, r *runtime.RemoveConta
|
|||||||
log.G(ctx).Tracef("RemoveContainer called for container %q that does not exist", ctrID)
|
log.G(ctx).Tracef("RemoveContainer called for container %q that does not exist", ctrID)
|
||||||
return &runtime.RemoveContainerResponse{}, nil
|
return &runtime.RemoveContainerResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer c.nri.BlockPluginSync().Unblock()
|
||||||
|
|
||||||
id := container.ID
|
id := container.ID
|
||||||
span.SetAttributes(tracing.Attribute("container.id", id))
|
span.SetAttributes(tracing.Attribute("container.id", id))
|
||||||
i, err := container.Container.Info(ctx)
|
i, err := container.Container.Info(ctx)
|
||||||
|
@ -156,6 +156,8 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
|
|||||||
return nil, fmt.Errorf("failed to wait for containerd task: %w", err)
|
return nil, fmt.Errorf("failed to wait for containerd task: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer c.nri.BlockPluginSync().Unblock()
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if retErr != nil {
|
if retErr != nil {
|
||||||
deferCtx, deferCancel := ctrdutil.DeferContext()
|
deferCtx, deferCancel := ctrdutil.DeferContext()
|
||||||
|
@ -51,6 +51,9 @@ func (c *criService) StopContainer(ctx context.Context, r *runtime.StopContainer
|
|||||||
// https://github.com/kubernetes/cri-api/blob/c20fa40/pkg/apis/runtime/v1/api.proto#L67-L68
|
// https://github.com/kubernetes/cri-api/blob/c20fa40/pkg/apis/runtime/v1/api.proto#L67-L68
|
||||||
return &runtime.StopContainerResponse{}, nil
|
return &runtime.StopContainerResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer c.nri.BlockPluginSync().Unblock()
|
||||||
|
|
||||||
span.SetAttributes(tracing.Attribute("container.id", container.ID))
|
span.SetAttributes(tracing.Attribute("container.id", container.ID))
|
||||||
if err := c.stopContainer(ctx, container, time.Duration(r.GetTimeout())*time.Second); err != nil {
|
if err := c.stopContainer(ctx, container, time.Duration(r.GetTimeout())*time.Second); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -47,6 +47,8 @@ func (c *criService) UpdateContainerResources(ctx context.Context, r *runtime.Up
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer c.nri.BlockPluginSync().Unblock()
|
||||||
|
|
||||||
resources := r.GetLinux()
|
resources := r.GetLinux()
|
||||||
updated, err := c.nri.UpdateContainerResources(ctx, &sandbox, &container, resources)
|
updated, err := c.nri.UpdateContainerResources(ctx, &sandbox, &container, resources)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -44,6 +44,9 @@ func (c *criService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodS
|
|||||||
r.GetPodSandboxId())
|
r.GetPodSandboxId())
|
||||||
return &runtime.RemovePodSandboxResponse{}, nil
|
return &runtime.RemovePodSandboxResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer c.nri.BlockPluginSync().Unblock()
|
||||||
|
|
||||||
// Use the full sandbox id.
|
// Use the full sandbox id.
|
||||||
id := sandbox.ID
|
id := sandbox.ID
|
||||||
span.SetAttributes(tracing.Attribute("sandbox.id", id))
|
span.SetAttributes(tracing.Attribute("sandbox.id", id))
|
||||||
|
@ -300,6 +300,8 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
|
|||||||
|
|
||||||
sandbox.ProcessLabel = labels["selinux_label"]
|
sandbox.ProcessLabel = labels["selinux_label"]
|
||||||
|
|
||||||
|
defer c.nri.BlockPluginSync().Unblock()
|
||||||
|
|
||||||
err = c.nri.RunPodSandbox(ctx, &sandbox)
|
err = c.nri.RunPodSandbox(ctx, &sandbox)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("NRI RunPodSandbox failed: %w", err)
|
return nil, fmt.Errorf("NRI RunPodSandbox failed: %w", err)
|
||||||
|
@ -46,6 +46,9 @@ func (c *criService) StopPodSandbox(ctx context.Context, r *runtime.StopPodSandb
|
|||||||
// https://github.com/kubernetes/cri-api/blob/c20fa40/pkg/apis/runtime/v1/api.proto#L45-L46
|
// https://github.com/kubernetes/cri-api/blob/c20fa40/pkg/apis/runtime/v1/api.proto#L45-L46
|
||||||
return &runtime.StopPodSandboxResponse{}, nil
|
return &runtime.StopPodSandboxResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer c.nri.BlockPluginSync().Unblock()
|
||||||
|
|
||||||
span.SetAttributes(tracing.Attribute("sandbox.id", sandbox.ID))
|
span.SetAttributes(tracing.Attribute("sandbox.id", sandbox.ID))
|
||||||
if err := c.stopPodSandbox(ctx, sandbox); err != nil {
|
if err := c.stopPodSandbox(ctx, sandbox); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -81,6 +81,9 @@ type API interface {
|
|||||||
|
|
||||||
// RemoveContainer relays container removal events to NRI.
|
// RemoveContainer relays container removal events to NRI.
|
||||||
RemoveContainer(context.Context, PodSandbox, Container) error
|
RemoveContainer(context.Context, PodSandbox, Container) error
|
||||||
|
|
||||||
|
// BlockPluginSync blocks plugin synchronization until it is Unblock()ed.
|
||||||
|
BlockPluginSync() *PluginSyncBlock
|
||||||
}
|
}
|
||||||
|
|
||||||
type State int
|
type State int
|
||||||
@ -435,6 +438,15 @@ func (l *local) RemoveContainer(ctx context.Context, pod PodSandbox, ctr Contain
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PluginSyncBlock = nri.PluginSyncBlock
|
||||||
|
|
||||||
|
func (l *local) BlockPluginSync() *PluginSyncBlock {
|
||||||
|
if !l.IsEnabled() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return l.nri.BlockPluginSync()
|
||||||
|
}
|
||||||
|
|
||||||
func (l *local) syncPlugin(ctx context.Context, syncFn nri.SyncCB) error {
|
func (l *local) syncPlugin(ctx context.Context, syncFn nri.SyncCB) error {
|
||||||
l.Lock()
|
l.Lock()
|
||||||
defer l.Unlock()
|
defer l.Unlock()
|
||||||
|
Loading…
Reference in New Issue
Block a user