Merge pull request #6019 from klihub/pr/proto/nri
NRI: add support for NRI with extended scope.
This commit is contained in:
@@ -25,6 +25,7 @@ import (
|
||||
"github.com/containerd/containerd"
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/containerd/containerd/pkg/cri/sbserver"
|
||||
"github.com/containerd/containerd/pkg/nri"
|
||||
"github.com/containerd/containerd/platforms"
|
||||
"github.com/containerd/containerd/plugin"
|
||||
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
@@ -46,6 +47,7 @@ func init() {
|
||||
Requires: []plugin.Type{
|
||||
plugin.EventPlugin,
|
||||
plugin.ServicePlugin,
|
||||
plugin.NRIApiPlugin,
|
||||
},
|
||||
InitFn: initCRIService,
|
||||
})
|
||||
@@ -85,12 +87,19 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) {
|
||||
}
|
||||
|
||||
var s server.CRIService
|
||||
var nrip nri.API
|
||||
if os.Getenv("ENABLE_CRI_SANDBOXES") != "" {
|
||||
log.G(ctx).Info("using experimental CRI Sandbox server - unset ENABLE_CRI_SANDBOXES to disable")
|
||||
s, err = sbserver.NewCRIService(c, client)
|
||||
} else {
|
||||
log.G(ctx).Info("using legacy CRI server")
|
||||
s, err = server.NewCRIService(c, client)
|
||||
|
||||
nrip, err = getNRIPlugin(ic)
|
||||
if err != nil {
|
||||
log.G(ctx).Info("NRI service not found, disabling NRI support")
|
||||
}
|
||||
|
||||
s, err = server.NewCRIService(c, client, nrip)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create CRI service: %w", err)
|
||||
@@ -102,6 +111,14 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) {
|
||||
}
|
||||
// TODO(random-liu): Whether and how we can stop containerd.
|
||||
}()
|
||||
|
||||
if nrip != nil {
|
||||
log.G(ctx).Info("using experimental NRI integration - disable nri plugin to prevent this")
|
||||
if err = nrip.Start(); err != nil {
|
||||
log.G(ctx).WithError(err).Fatal("Failed to start NRI service")
|
||||
}
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
@@ -128,3 +145,24 @@ func setGLogLevel() error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get the NRI plugin and verify its type.
|
||||
func getNRIPlugin(ic *plugin.InitContext) (nri.API, error) {
|
||||
const (
|
||||
pluginType = plugin.NRIApiPlugin
|
||||
pluginName = "nri"
|
||||
)
|
||||
|
||||
p, err := ic.GetByID(pluginType, pluginName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
api, ok := p.(nri.API)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("NRI plugin (%s, %q) has incompatible type %T",
|
||||
pluginType, pluginName, api)
|
||||
}
|
||||
|
||||
return api, nil
|
||||
}
|
||||
|
||||
@@ -246,6 +246,19 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
|
||||
containerd.WithRuntime(sandboxInfo.Runtime.Name, runtimeOptions),
|
||||
containerd.WithContainerLabels(containerLabels),
|
||||
containerd.WithContainerExtension(containerMetadataExtension, &meta))
|
||||
|
||||
if c.nri.isEnabled() {
|
||||
opts = append(opts, c.nri.WithContainerAdjustment())
|
||||
|
||||
defer func() {
|
||||
if retErr != nil {
|
||||
deferCtx, deferCancel := ctrdutil.DeferContext()
|
||||
defer deferCancel()
|
||||
c.nri.undoCreateContainer(deferCtx, &sandbox, id, spec)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
var cntr containerd.Container
|
||||
if cntr, err = c.client.NewContainer(ctx, id, opts...); err != nil {
|
||||
return nil, fmt.Errorf("failed to create containerd container: %w", err)
|
||||
@@ -284,6 +297,13 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
|
||||
return nil, fmt.Errorf("failed to add container %q into store: %w", id, err)
|
||||
}
|
||||
|
||||
if c.nri.isEnabled() {
|
||||
err = c.nri.postCreateContainer(ctx, &sandbox, &container)
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Errorf("NRI post-create notification failed")
|
||||
}
|
||||
}
|
||||
|
||||
containerCreateTimer.WithValues(ociRuntime.Type).UpdateSince(start)
|
||||
|
||||
return &runtime.CreateContainerResponse{ContainerId: id}, nil
|
||||
|
||||
@@ -73,6 +73,18 @@ func (c *criService) RemoveContainer(ctx context.Context, r *runtime.RemoveConta
|
||||
}
|
||||
}()
|
||||
|
||||
if c.nri.isEnabled() {
|
||||
sandbox, err := c.sandboxStore.Get(container.SandboxID)
|
||||
if err != nil {
|
||||
err = c.nri.removeContainer(ctx, nil, &container)
|
||||
} else {
|
||||
err = c.nri.removeContainer(ctx, &sandbox, &container)
|
||||
}
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Error("NRI failed to remove container")
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE(random-liu): Docker set container to "Dead" state when start removing the
|
||||
// container so as to avoid start/restart the container again. However, for current
|
||||
// kubelet implementation, we'll never start a container once we decide to remove it,
|
||||
|
||||
@@ -27,8 +27,6 @@ import (
|
||||
containerdio "github.com/containerd/containerd/cio"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/containerd/nri"
|
||||
v1 "github.com/containerd/nri/types/v1"
|
||||
"github.com/sirupsen/logrus"
|
||||
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
|
||||
@@ -135,7 +133,7 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
|
||||
deferCtx, deferCancel := ctrdutil.DeferContext()
|
||||
defer deferCancel()
|
||||
// It's possible that task is deleted by event monitor.
|
||||
if _, err := task.Delete(deferCtx, WithNRISandboxDelete(sandboxID), containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
|
||||
if _, err := task.Delete(deferCtx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
|
||||
log.G(ctx).WithError(err).Errorf("Failed to delete containerd task %q", id)
|
||||
}
|
||||
}
|
||||
@@ -146,17 +144,22 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to wait for containerd task: %w", err)
|
||||
}
|
||||
nric, err := nri.New()
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Error("unable to create nri client")
|
||||
}
|
||||
if nric != nil {
|
||||
nriSB := &nri.Sandbox{
|
||||
ID: sandboxID,
|
||||
Labels: sandbox.Config.Labels,
|
||||
|
||||
defer func() {
|
||||
if retErr != nil {
|
||||
deferCtx, deferCancel := ctrdutil.DeferContext()
|
||||
defer deferCancel()
|
||||
err = c.nri.stopContainer(deferCtx, &sandbox, &cntr)
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Errorf("NRI stop failed for failed container %q", id)
|
||||
}
|
||||
}
|
||||
if _, err := nric.InvokeWithSandbox(ctx, task, v1.Create, nriSB); err != nil {
|
||||
return nil, fmt.Errorf("nri invoke: %w", err)
|
||||
}()
|
||||
if c.nri.isEnabled() {
|
||||
err = c.nri.startContainer(ctx, &sandbox, &cntr)
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Errorf("NRI container start failed")
|
||||
return nil, fmt.Errorf("NRI container start failed: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -177,6 +180,13 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
|
||||
// It handles the TaskExit event and update container state after this.
|
||||
c.eventMonitor.startContainerExitMonitor(context.Background(), id, task.Pid(), exitCh)
|
||||
|
||||
if c.nri.isEnabled() {
|
||||
err = c.nri.postStartContainer(ctx, &sandbox, &cntr)
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Errorf("NRI post-start notification failed")
|
||||
}
|
||||
}
|
||||
|
||||
containerStartTimer.WithValues(info.Runtime.Name).UpdateSince(start)
|
||||
|
||||
return &runtime.StartContainerResponse{}, nil
|
||||
|
||||
@@ -47,6 +47,18 @@ func (c *criService) StopContainer(ctx context.Context, r *runtime.StopContainer
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if c.nri.isEnabled() {
|
||||
sandbox, err := c.sandboxStore.Get(container.SandboxID)
|
||||
if err != nil {
|
||||
err = c.nri.stopContainer(ctx, nil, &container)
|
||||
} else {
|
||||
err = c.nri.stopContainer(ctx, &sandbox, &container)
|
||||
}
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Error("NRI failed to stop container")
|
||||
}
|
||||
}
|
||||
|
||||
i, err := container.Container.Info(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get container info: %w", err)
|
||||
@@ -78,7 +90,7 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore
|
||||
}
|
||||
// Don't return for unknown state, some cleanup needs to be done.
|
||||
if state == runtime.ContainerState_CONTAINER_UNKNOWN {
|
||||
return cleanupUnknownContainer(ctx, id, container)
|
||||
return c.cleanupUnknownContainer(ctx, id, container)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -93,7 +105,7 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore
|
||||
if !errdefs.IsNotFound(err) {
|
||||
return fmt.Errorf("failed to wait for task for %q: %w", id, err)
|
||||
}
|
||||
return cleanupUnknownContainer(ctx, id, container)
|
||||
return c.cleanupUnknownContainer(ctx, id, container)
|
||||
}
|
||||
|
||||
exitCtx, exitCancel := context.WithCancel(context.Background())
|
||||
@@ -196,7 +208,7 @@ func (c *criService) waitContainerStop(ctx context.Context, container containers
|
||||
}
|
||||
|
||||
// cleanupUnknownContainer cleanup stopped container in unknown state.
|
||||
func cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore.Container) error {
|
||||
func (c *criService) cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore.Container) error {
|
||||
// Reuse handleContainerExit to do the cleanup.
|
||||
return handleContainerExit(ctx, &eventtypes.TaskExit{
|
||||
ContainerID: id,
|
||||
@@ -204,5 +216,5 @@ func cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore
|
||||
Pid: 0,
|
||||
ExitStatus: unknownExitCode,
|
||||
ExitedAt: protobuf.ToTimestamp(time.Now()),
|
||||
}, cntr)
|
||||
}, cntr, c)
|
||||
}
|
||||
|
||||
@@ -33,6 +33,7 @@ import (
|
||||
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
|
||||
containerstore "github.com/containerd/containerd/pkg/cri/store/container"
|
||||
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
|
||||
ctrdutil "github.com/containerd/containerd/pkg/cri/util"
|
||||
)
|
||||
|
||||
@@ -42,6 +43,24 @@ func (c *criService) UpdateContainerResources(ctx context.Context, r *runtime.Up
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to find container: %w", err)
|
||||
}
|
||||
|
||||
var sandbox sandboxstore.Sandbox
|
||||
if c.nri.isEnabled() {
|
||||
sandbox, err = c.sandboxStore.Get(container.SandboxID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resources := r.GetLinux()
|
||||
updated, err := c.nri.updateContainer(ctx, &sandbox, &container, resources)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("NRI container update failed: %w", err)
|
||||
}
|
||||
if updated != nil {
|
||||
*resources = *updated
|
||||
}
|
||||
}
|
||||
|
||||
// Update resources in status update transaction, so that:
|
||||
// 1) There won't be race condition with container start.
|
||||
// 2) There won't be concurrent resource update to the same container.
|
||||
@@ -50,6 +69,14 @@ func (c *criService) UpdateContainerResources(ctx context.Context, r *runtime.Up
|
||||
}); err != nil {
|
||||
return nil, fmt.Errorf("failed to update resources: %w", err)
|
||||
}
|
||||
|
||||
if c.nri.isEnabled() {
|
||||
err = c.nri.postUpdateContainer(ctx, &sandbox, &container)
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Errorf("NRI post-update notification failed")
|
||||
}
|
||||
}
|
||||
|
||||
return &runtime.UpdateContainerResourcesResponse{}, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -136,7 +136,7 @@ func (em *eventMonitor) startSandboxExitMonitor(ctx context.Context, id string,
|
||||
|
||||
sb, err := em.c.sandboxStore.Get(e.ID)
|
||||
if err == nil {
|
||||
if err := handleSandboxExit(dctx, e, sb); err != nil {
|
||||
if err := handleSandboxExit(dctx, e, sb, em.c); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@@ -187,7 +187,7 @@ func (em *eventMonitor) startContainerExitMonitor(ctx context.Context, id string
|
||||
|
||||
cntr, err := em.c.containerStore.Get(e.ID)
|
||||
if err == nil {
|
||||
if err := handleContainerExit(dctx, e, cntr); err != nil {
|
||||
if err := handleContainerExit(dctx, e, cntr, em.c); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@@ -313,7 +313,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
|
||||
// Use ID instead of ContainerID to rule out TaskExit event for exec.
|
||||
cntr, err := em.c.containerStore.Get(e.ID)
|
||||
if err == nil {
|
||||
if err := handleContainerExit(ctx, e, cntr); err != nil {
|
||||
if err := handleContainerExit(ctx, e, cntr, em.c); err != nil {
|
||||
return fmt.Errorf("failed to handle container TaskExit event: %w", err)
|
||||
}
|
||||
return nil
|
||||
@@ -322,7 +322,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
|
||||
}
|
||||
sb, err := em.c.sandboxStore.Get(e.ID)
|
||||
if err == nil {
|
||||
if err := handleSandboxExit(ctx, e, sb); err != nil {
|
||||
if err := handleSandboxExit(ctx, e, sb, em.c); err != nil {
|
||||
return fmt.Errorf("failed to handle sandbox TaskExit event: %w", err)
|
||||
}
|
||||
return nil
|
||||
@@ -362,7 +362,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
|
||||
}
|
||||
|
||||
// handleContainerExit handles TaskExit event for container.
|
||||
func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container) error {
|
||||
func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container, c *criService) error {
|
||||
// Attach container IO so that `Delete` could cleanup the stream properly.
|
||||
task, err := cntr.Container.Task(ctx,
|
||||
func(*containerdio.FIFOSet) (containerdio.IO, error) {
|
||||
@@ -384,7 +384,7 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta
|
||||
}
|
||||
} else {
|
||||
// TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker
|
||||
if _, err = task.Delete(ctx, WithNRISandboxDelete(cntr.SandboxID), containerd.WithProcessKill); err != nil {
|
||||
if _, err = task.Delete(ctx, c.nri.WithContainerExit(&cntr), containerd.WithProcessKill); err != nil {
|
||||
if !errdefs.IsNotFound(err) {
|
||||
return fmt.Errorf("failed to stop container: %w", err)
|
||||
}
|
||||
@@ -415,7 +415,7 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta
|
||||
}
|
||||
|
||||
// handleSandboxExit handles TaskExit event for sandbox.
|
||||
func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxstore.Sandbox) error {
|
||||
func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxstore.Sandbox, c *criService) error {
|
||||
// No stream attached to sandbox container.
|
||||
task, err := sb.Container.Task(ctx, nil)
|
||||
if err != nil {
|
||||
@@ -424,7 +424,7 @@ func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxst
|
||||
}
|
||||
} else {
|
||||
// TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker
|
||||
if _, err = task.Delete(ctx, WithNRISandboxDelete(sb.ID), containerd.WithProcessKill); err != nil {
|
||||
if _, err = task.Delete(ctx, containerd.WithProcessKill); err != nil {
|
||||
if !errdefs.IsNotFound(err) {
|
||||
return fmt.Errorf("failed to stop sandbox: %w", err)
|
||||
}
|
||||
|
||||
782
pkg/cri/server/nri-api.go
Normal file
782
pkg/cri/server/nri-api.go
Normal file
@@ -0,0 +1,782 @@
|
||||
//go:build linux
|
||||
// +build linux
|
||||
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/containerd/containerd"
|
||||
"github.com/containerd/containerd/containers"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/containerd/containerd/pkg/cri/annotations"
|
||||
cstore "github.com/containerd/containerd/pkg/cri/store/container"
|
||||
sstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
|
||||
ctrdutil "github.com/containerd/containerd/pkg/cri/util"
|
||||
"github.com/containerd/typeurl"
|
||||
"github.com/opencontainers/runtime-spec/specs-go"
|
||||
runtimespec "github.com/opencontainers/runtime-spec/specs-go"
|
||||
"github.com/opencontainers/runtime-tools/generate"
|
||||
cri "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
|
||||
"github.com/containerd/containerd/pkg/cri/constants"
|
||||
"github.com/containerd/containerd/pkg/nri"
|
||||
|
||||
"github.com/containerd/nri/pkg/api"
|
||||
|
||||
nrigen "github.com/containerd/nri/pkg/runtime-tools/generate"
|
||||
)
|
||||
|
||||
type nriAPI struct {
|
||||
cri *criService
|
||||
nri nri.API
|
||||
}
|
||||
|
||||
func (a *nriAPI) register() {
|
||||
if !a.isEnabled() {
|
||||
return
|
||||
}
|
||||
|
||||
nri.RegisterDomain(a)
|
||||
}
|
||||
|
||||
func (a *nriAPI) isEnabled() bool {
|
||||
return a != nil && a.nri != nil && a.nri.IsEnabled()
|
||||
}
|
||||
|
||||
//
|
||||
// CRI-NRI lifecycle hook interface
|
||||
//
|
||||
// These functions are used to hook NRI into the processing of
|
||||
// the corresponding CRI lifecycle events using the common NRI
|
||||
// interface.
|
||||
//
|
||||
|
||||
func (a *nriAPI) runPodSandbox(ctx context.Context, criPod *sstore.Sandbox) error {
|
||||
pod := a.nriPodSandbox(criPod)
|
||||
err := a.nri.RunPodSandbox(ctx, pod)
|
||||
|
||||
if err != nil {
|
||||
a.nri.StopPodSandbox(ctx, pod)
|
||||
a.nri.RemovePodSandbox(ctx, pod)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (a *nriAPI) stopPodSandbox(ctx context.Context, criPod *sstore.Sandbox) error {
|
||||
pod := a.nriPodSandbox(criPod)
|
||||
err := a.nri.StopPodSandbox(ctx, pod)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (a *nriAPI) removePodSandbox(ctx context.Context, criPod *sstore.Sandbox) error {
|
||||
pod := a.nriPodSandbox(criPod)
|
||||
|
||||
err := a.nri.RemovePodSandbox(ctx, pod)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (a *nriAPI) createContainer(ctx context.Context, ctrs *containers.Container, spec *specs.Spec) (*api.ContainerAdjustment, error) {
|
||||
ctr := a.nriContainer(ctrs, spec)
|
||||
|
||||
criPod, err := a.cri.sandboxStore.Get(ctr.GetPodSandboxID())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pod := a.nriPodSandbox(&criPod)
|
||||
|
||||
adjust, err := a.nri.CreateContainer(ctx, pod, ctr)
|
||||
|
||||
return adjust, err
|
||||
}
|
||||
|
||||
func (a *nriAPI) postCreateContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error {
|
||||
pod := a.nriPodSandbox(criPod)
|
||||
ctr := a.nriContainer(criCtr, nil)
|
||||
|
||||
err := a.nri.PostCreateContainer(ctx, pod, ctr)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (a *nriAPI) startContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error {
|
||||
pod := a.nriPodSandbox(criPod)
|
||||
ctr := a.nriContainer(criCtr, nil)
|
||||
|
||||
err := a.nri.StartContainer(ctx, pod, ctr)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (a *nriAPI) postStartContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error {
|
||||
pod := a.nriPodSandbox(criPod)
|
||||
ctr := a.nriContainer(criCtr, nil)
|
||||
|
||||
err := a.nri.PostStartContainer(ctx, pod, ctr)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (a *nriAPI) updateContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container, req *cri.LinuxContainerResources) (*cri.LinuxContainerResources, error) {
|
||||
const noOomAdj = 0
|
||||
|
||||
pod := a.nriPodSandbox(criPod)
|
||||
ctr := a.nriContainer(criCtr, nil)
|
||||
|
||||
r, err := a.nri.UpdateContainer(ctx, pod, ctr, api.FromCRILinuxResources(req))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return r.ToCRI(noOomAdj), nil
|
||||
}
|
||||
|
||||
func (a *nriAPI) postUpdateContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error {
|
||||
pod := a.nriPodSandbox(criPod)
|
||||
ctr := a.nriContainer(criCtr, nil)
|
||||
|
||||
err := a.nri.PostUpdateContainer(ctx, pod, ctr)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (a *nriAPI) stopContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error {
|
||||
ctr := a.nriContainer(criCtr, nil)
|
||||
|
||||
if criPod == nil || criPod.ID == "" {
|
||||
criPod = &sstore.Sandbox{
|
||||
Metadata: sstore.Metadata{
|
||||
ID: ctr.GetPodSandboxID(),
|
||||
},
|
||||
}
|
||||
}
|
||||
pod := a.nriPodSandbox(criPod)
|
||||
|
||||
err := a.nri.StopContainer(ctx, pod, ctr)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (a *nriAPI) notifyContainerExit(ctx context.Context, criCtr *cstore.Container) {
|
||||
ctr := a.nriContainer(criCtr, nil)
|
||||
|
||||
criPod, _ := a.cri.sandboxStore.Get(ctr.GetPodSandboxID())
|
||||
if criPod.ID == "" {
|
||||
criPod = sstore.Sandbox{
|
||||
Metadata: sstore.Metadata{
|
||||
ID: ctr.GetPodSandboxID(),
|
||||
},
|
||||
}
|
||||
}
|
||||
pod := a.nriPodSandbox(&criPod)
|
||||
|
||||
a.nri.NotifyContainerExit(ctx, pod, ctr)
|
||||
}
|
||||
|
||||
func (a *nriAPI) removeContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error {
|
||||
pod := a.nriPodSandbox(criPod)
|
||||
ctr := a.nriContainer(criCtr, nil)
|
||||
|
||||
err := a.nri.RemoveContainer(ctx, pod, ctr)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (a *nriAPI) undoCreateContainer(ctx context.Context, criPod *sstore.Sandbox, id string, spec *specs.Spec) {
|
||||
pod := a.nriPodSandbox(criPod)
|
||||
ctr := a.nriContainer(&containers.Container{ID: id}, spec)
|
||||
|
||||
err := a.nri.StopContainer(ctx, pod, ctr)
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Error("container creation undo (stop) failed")
|
||||
}
|
||||
|
||||
err = a.nri.RemoveContainer(ctx, pod, ctr)
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Error("container creation undo (remove) failed")
|
||||
}
|
||||
}
|
||||
|
||||
func (a *nriAPI) WithContainerAdjustment() containerd.NewContainerOpts {
|
||||
resourceCheckOpt := nrigen.WithResourceChecker(
|
||||
func(r *runtimespec.LinuxResources) error {
|
||||
if r != nil {
|
||||
if a.cri.config.DisableHugetlbController {
|
||||
r.HugepageLimits = nil
|
||||
}
|
||||
}
|
||||
return nil
|
||||
},
|
||||
)
|
||||
|
||||
rdtResolveOpt := nrigen.WithRdtResolver(
|
||||
func(className string) (*runtimespec.LinuxIntelRdt, error) {
|
||||
if className == "" {
|
||||
return nil, nil
|
||||
}
|
||||
return &runtimespec.LinuxIntelRdt{
|
||||
ClosID: className,
|
||||
}, nil
|
||||
},
|
||||
)
|
||||
|
||||
blkioResolveOpt := nrigen.WithBlockIOResolver(
|
||||
func(className string) (*runtimespec.LinuxBlockIO, error) {
|
||||
if className == "" {
|
||||
return nil, nil
|
||||
}
|
||||
blockIO, err := blockIOToLinuxOci(className)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return blockIO, nil
|
||||
},
|
||||
)
|
||||
|
||||
return func(ctx context.Context, _ *containerd.Client, c *containers.Container) error {
|
||||
spec := &specs.Spec{}
|
||||
if err := json.Unmarshal(c.Spec.GetValue(), spec); err != nil {
|
||||
return fmt.Errorf("failed to unmarshal container OCI Spec for NRI: %w", err)
|
||||
}
|
||||
|
||||
adjust, err := a.createContainer(ctx, c, spec)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get NRI adjustment for container: %w", err)
|
||||
}
|
||||
|
||||
sgen := generate.Generator{Config: spec}
|
||||
ngen := nrigen.SpecGenerator(&sgen, resourceCheckOpt, rdtResolveOpt, blkioResolveOpt)
|
||||
|
||||
err = ngen.Adjust(adjust)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to NRI-adjust container Spec: %w", err)
|
||||
}
|
||||
|
||||
adjusted, err := typeurl.MarshalAny(spec)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal NRI-adjusted Spec: %w", err)
|
||||
}
|
||||
|
||||
c.Spec = adjusted
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (a *nriAPI) WithContainerExit(criCtr *cstore.Container) containerd.ProcessDeleteOpts {
|
||||
if !a.isEnabled() {
|
||||
return func(_ context.Context, _ containerd.Process) error {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return func(_ context.Context, _ containerd.Process) error {
|
||||
a.notifyContainerExit(context.Background(), criCtr)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// NRI-CRI 'domain' interface
|
||||
//
|
||||
// These functions are used to interface CRI pods and containers
|
||||
// from the common NRI interface. They implement pod and container
|
||||
// discovery, lookup and updating of container parameters.
|
||||
//
|
||||
|
||||
const (
|
||||
nriDomain = constants.K8sContainerdNamespace
|
||||
)
|
||||
|
||||
func (a *nriAPI) GetName() string {
|
||||
return nriDomain
|
||||
}
|
||||
|
||||
func (a *nriAPI) ListPodSandboxes() []nri.PodSandbox {
|
||||
pods := []nri.PodSandbox{}
|
||||
for _, pod := range a.cri.sandboxStore.List() {
|
||||
if pod.Status.Get().State != sstore.StateUnknown {
|
||||
pod := pod
|
||||
pods = append(pods, a.nriPodSandbox(&pod))
|
||||
}
|
||||
}
|
||||
return pods
|
||||
}
|
||||
|
||||
func (a *nriAPI) ListContainers() []nri.Container {
|
||||
containers := []nri.Container{}
|
||||
for _, ctr := range a.cri.containerStore.List() {
|
||||
switch ctr.Status.Get().State() {
|
||||
case cri.ContainerState_CONTAINER_EXITED:
|
||||
continue
|
||||
case cri.ContainerState_CONTAINER_UNKNOWN:
|
||||
continue
|
||||
}
|
||||
ctr := ctr
|
||||
containers = append(containers, a.nriContainer(&ctr, nil))
|
||||
}
|
||||
return containers
|
||||
}
|
||||
|
||||
func (a *nriAPI) GetPodSandbox(id string) (nri.PodSandbox, bool) {
|
||||
pod, err := a.cri.sandboxStore.Get(id)
|
||||
if err != nil {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
return a.nriPodSandbox(&pod), true
|
||||
}
|
||||
|
||||
func (a *nriAPI) GetContainer(id string) (nri.Container, bool) {
|
||||
ctr, err := a.cri.containerStore.Get(id)
|
||||
if err != nil {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
return a.nriContainer(&ctr, nil), true
|
||||
}
|
||||
|
||||
func (a *nriAPI) UpdateContainer(ctx context.Context, u *api.ContainerUpdate) error {
|
||||
ctr, err := a.cri.containerStore.Get(u.ContainerId)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
err = ctr.Status.UpdateSync(
|
||||
func(status cstore.Status) (cstore.Status, error) {
|
||||
criReq := &cri.UpdateContainerResourcesRequest{
|
||||
ContainerId: u.ContainerId,
|
||||
Linux: u.GetLinux().GetResources().ToCRI(0),
|
||||
}
|
||||
newStatus, err := a.cri.updateContainerResources(ctx, ctr, criReq, status)
|
||||
return newStatus, err
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
if !u.IgnoreFailure {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *nriAPI) EvictContainer(ctx context.Context, e *api.ContainerEviction) error {
|
||||
ctr, err := a.cri.containerStore.Get(e.ContainerId)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
err = a.cri.stopContainer(ctx, ctr, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
//
|
||||
// NRI integration wrapper for CRI Pods
|
||||
//
|
||||
|
||||
type criPodSandbox struct {
|
||||
*sstore.Sandbox
|
||||
spec *specs.Spec
|
||||
pid uint32
|
||||
}
|
||||
|
||||
func (a *nriAPI) nriPodSandbox(pod *sstore.Sandbox) *criPodSandbox {
|
||||
criPod := &criPodSandbox{
|
||||
Sandbox: pod,
|
||||
spec: &specs.Spec{},
|
||||
}
|
||||
|
||||
if pod == nil || pod.Container == nil {
|
||||
return criPod
|
||||
}
|
||||
|
||||
ctx := ctrdutil.NamespacedContext()
|
||||
task, err := pod.Container.Task(ctx, nil)
|
||||
if err != nil {
|
||||
if !errdefs.IsNotFound(err) {
|
||||
log.L.WithError(err).Errorf("failed to get task for sandbox container %s",
|
||||
pod.Container.ID())
|
||||
}
|
||||
return criPod
|
||||
}
|
||||
|
||||
criPod.pid = task.Pid()
|
||||
spec, err := task.Spec(ctx)
|
||||
if err != nil {
|
||||
if err != nil {
|
||||
log.L.WithError(err).Errorf("failed to get spec for sandbox container %s",
|
||||
pod.Container.ID())
|
||||
}
|
||||
return criPod
|
||||
}
|
||||
|
||||
criPod.spec = spec
|
||||
|
||||
return criPod
|
||||
}
|
||||
|
||||
func (p *criPodSandbox) GetDomain() string {
|
||||
return nriDomain
|
||||
}
|
||||
|
||||
func (p *criPodSandbox) GetID() string {
|
||||
if p.Sandbox == nil {
|
||||
return ""
|
||||
}
|
||||
return p.ID
|
||||
}
|
||||
|
||||
func (p *criPodSandbox) GetName() string {
|
||||
if p.Sandbox == nil {
|
||||
return ""
|
||||
}
|
||||
return p.Config.GetMetadata().GetName()
|
||||
}
|
||||
|
||||
func (p *criPodSandbox) GetUID() string {
|
||||
if p.Sandbox == nil {
|
||||
return ""
|
||||
}
|
||||
return p.Config.GetMetadata().GetUid()
|
||||
}
|
||||
|
||||
func (p *criPodSandbox) GetNamespace() string {
|
||||
if p.Sandbox == nil {
|
||||
return ""
|
||||
}
|
||||
return p.Config.GetMetadata().GetNamespace()
|
||||
}
|
||||
|
||||
func (p *criPodSandbox) GetAnnotations() map[string]string {
|
||||
if p.Sandbox == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
annotations := map[string]string{}
|
||||
|
||||
for key, value := range p.Config.GetAnnotations() {
|
||||
annotations[key] = value
|
||||
}
|
||||
for key, value := range p.spec.Annotations {
|
||||
annotations[key] = value
|
||||
}
|
||||
|
||||
return annotations
|
||||
}
|
||||
|
||||
func (p *criPodSandbox) GetLabels() map[string]string {
|
||||
if p.Sandbox == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
labels := map[string]string{}
|
||||
|
||||
for key, value := range p.Config.GetLabels() {
|
||||
labels[key] = value
|
||||
}
|
||||
|
||||
if p.Sandbox.Container == nil {
|
||||
return labels
|
||||
}
|
||||
|
||||
ctx := ctrdutil.NamespacedContext()
|
||||
ctrd := p.Sandbox.Container
|
||||
ctrs, err := ctrd.Info(ctx, containerd.WithoutRefreshedMetadata)
|
||||
if err != nil {
|
||||
log.L.WithError(err).Errorf("failed to get info for sandbox container %s", ctrd.ID())
|
||||
return labels
|
||||
}
|
||||
|
||||
for key, value := range ctrs.Labels {
|
||||
labels[key] = value
|
||||
}
|
||||
|
||||
return labels
|
||||
}
|
||||
|
||||
func (p *criPodSandbox) GetRuntimeHandler() string {
|
||||
if p.Sandbox == nil {
|
||||
return ""
|
||||
}
|
||||
return p.RuntimeHandler
|
||||
}
|
||||
|
||||
func (p *criPodSandbox) GetLinuxPodSandbox() nri.LinuxPodSandbox {
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *criPodSandbox) GetLinuxNamespaces() []*api.LinuxNamespace {
|
||||
if p.spec.Linux != nil {
|
||||
return api.FromOCILinuxNamespaces(p.spec.Linux.Namespaces)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *criPodSandbox) GetPodLinuxOverhead() *api.LinuxResources {
|
||||
if p.Sandbox == nil {
|
||||
return nil
|
||||
}
|
||||
return api.FromCRILinuxResources(p.Config.GetLinux().GetOverhead())
|
||||
}
|
||||
func (p *criPodSandbox) GetPodLinuxResources() *api.LinuxResources {
|
||||
if p.Sandbox == nil {
|
||||
return nil
|
||||
}
|
||||
return api.FromCRILinuxResources(p.Config.GetLinux().GetResources())
|
||||
}
|
||||
|
||||
func (p *criPodSandbox) GetLinuxResources() *api.LinuxResources {
|
||||
if p.spec.Linux == nil {
|
||||
return nil
|
||||
}
|
||||
return api.FromOCILinuxResources(p.spec.Linux.Resources, nil)
|
||||
}
|
||||
|
||||
func (p *criPodSandbox) GetCgroupParent() string {
|
||||
if p.Sandbox == nil {
|
||||
return ""
|
||||
}
|
||||
return p.Config.GetLinux().GetCgroupParent()
|
||||
}
|
||||
|
||||
func (p *criPodSandbox) GetCgroupsPath() string {
|
||||
if p.spec.Linux == nil {
|
||||
return ""
|
||||
}
|
||||
return p.spec.Linux.CgroupsPath
|
||||
}
|
||||
|
||||
func (p *criPodSandbox) GetPid() uint32 {
|
||||
return p.pid
|
||||
}
|
||||
|
||||
//
|
||||
// NRI integration wrapper for CRI Containers
|
||||
//
|
||||
|
||||
type criContainer struct {
|
||||
api *nriAPI
|
||||
ctrs *containers.Container
|
||||
spec *specs.Spec
|
||||
meta *cstore.Metadata
|
||||
pid uint32
|
||||
}
|
||||
|
||||
func (a *nriAPI) nriContainer(ctr interface{}, spec *specs.Spec) *criContainer {
|
||||
switch c := ctr.(type) {
|
||||
case *cstore.Container:
|
||||
ctx := ctrdutil.NamespacedContext()
|
||||
pid := uint32(0)
|
||||
ctrd := c.Container
|
||||
ctrs, err := ctrd.Info(ctx, containerd.WithoutRefreshedMetadata)
|
||||
if err != nil {
|
||||
log.L.WithError(err).Errorf("failed to get info for container %s", ctrd.ID())
|
||||
}
|
||||
spec, err := ctrd.Spec(ctx)
|
||||
if err != nil {
|
||||
log.L.WithError(err).Errorf("failed to get OCI Spec for container %s", ctrd.ID())
|
||||
spec = &specs.Spec{}
|
||||
}
|
||||
task, err := ctrd.Task(ctx, nil)
|
||||
if err != nil {
|
||||
if !errdefs.IsNotFound(err) {
|
||||
log.L.WithError(err).Errorf("failed to get task for container %s", ctrd.ID())
|
||||
}
|
||||
} else {
|
||||
pid = task.Pid()
|
||||
}
|
||||
|
||||
return &criContainer{
|
||||
api: a,
|
||||
ctrs: &ctrs,
|
||||
meta: &c.Metadata,
|
||||
spec: spec,
|
||||
pid: pid,
|
||||
}
|
||||
|
||||
case *containers.Container:
|
||||
ctrs := c
|
||||
meta := &cstore.Metadata{}
|
||||
if ext := ctrs.Extensions[containerMetadataExtension]; ext != nil {
|
||||
err := typeurl.UnmarshalTo(ext, meta)
|
||||
if err != nil {
|
||||
log.L.WithError(err).Errorf("failed to get metadata for container %s", ctrs.ID)
|
||||
}
|
||||
}
|
||||
|
||||
return &criContainer{
|
||||
api: a,
|
||||
ctrs: ctrs,
|
||||
meta: meta,
|
||||
spec: spec,
|
||||
}
|
||||
}
|
||||
|
||||
log.L.Errorf("can't wrap %T as NRI container", ctr)
|
||||
return &criContainer{
|
||||
api: a,
|
||||
meta: &cstore.Metadata{},
|
||||
spec: &specs.Spec{},
|
||||
}
|
||||
}
|
||||
|
||||
func (c *criContainer) GetDomain() string {
|
||||
return nriDomain
|
||||
}
|
||||
|
||||
func (c *criContainer) GetID() string {
|
||||
if c.ctrs != nil {
|
||||
return c.ctrs.ID
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (c *criContainer) GetPodSandboxID() string {
|
||||
return c.spec.Annotations[annotations.SandboxID]
|
||||
}
|
||||
|
||||
func (c *criContainer) GetName() string {
|
||||
return c.spec.Annotations[annotations.ContainerName]
|
||||
}
|
||||
|
||||
func (c *criContainer) GetState() api.ContainerState {
|
||||
criCtr, err := c.api.cri.containerStore.Get(c.GetID())
|
||||
if err != nil {
|
||||
return api.ContainerState_CONTAINER_UNKNOWN
|
||||
}
|
||||
switch criCtr.Status.Get().State() {
|
||||
case cri.ContainerState_CONTAINER_CREATED:
|
||||
return api.ContainerState_CONTAINER_CREATED
|
||||
case cri.ContainerState_CONTAINER_RUNNING:
|
||||
return api.ContainerState_CONTAINER_RUNNING
|
||||
case cri.ContainerState_CONTAINER_EXITED:
|
||||
return api.ContainerState_CONTAINER_STOPPED
|
||||
}
|
||||
|
||||
return api.ContainerState_CONTAINER_UNKNOWN
|
||||
}
|
||||
|
||||
func (c *criContainer) GetLabels() map[string]string {
|
||||
if c.ctrs == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
labels := map[string]string{}
|
||||
for key, value := range c.ctrs.Labels {
|
||||
labels[key] = value
|
||||
}
|
||||
|
||||
if c.meta != nil && c.meta.Config != nil {
|
||||
for key, value := range c.meta.Config.Labels {
|
||||
labels[key] = value
|
||||
}
|
||||
}
|
||||
|
||||
return labels
|
||||
}
|
||||
|
||||
func (c *criContainer) GetAnnotations() map[string]string {
|
||||
annotations := map[string]string{}
|
||||
|
||||
for key, value := range c.spec.Annotations {
|
||||
annotations[key] = value
|
||||
}
|
||||
if c.meta != nil && c.meta.Config != nil {
|
||||
for key, value := range c.meta.Config.Annotations {
|
||||
annotations[key] = value
|
||||
}
|
||||
}
|
||||
|
||||
return annotations
|
||||
}
|
||||
|
||||
func (c *criContainer) GetArgs() []string {
|
||||
if c.spec.Process == nil {
|
||||
return nil
|
||||
}
|
||||
return api.DupStringSlice(c.spec.Process.Args)
|
||||
}
|
||||
|
||||
func (c *criContainer) GetEnv() []string {
|
||||
if c.spec.Process == nil {
|
||||
return nil
|
||||
}
|
||||
return api.DupStringSlice(c.spec.Process.Env)
|
||||
}
|
||||
|
||||
func (c *criContainer) GetMounts() []*api.Mount {
|
||||
return api.FromOCIMounts(c.spec.Mounts)
|
||||
}
|
||||
|
||||
func (c *criContainer) GetHooks() *api.Hooks {
|
||||
return api.FromOCIHooks(c.spec.Hooks)
|
||||
}
|
||||
|
||||
func (c *criContainer) GetLinuxContainer() nri.LinuxContainer {
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *criContainer) GetLinuxNamespaces() []*api.LinuxNamespace {
|
||||
if c.spec.Linux == nil {
|
||||
return nil
|
||||
}
|
||||
return api.FromOCILinuxNamespaces(c.spec.Linux.Namespaces)
|
||||
}
|
||||
|
||||
func (c *criContainer) GetLinuxDevices() []*api.LinuxDevice {
|
||||
if c.spec.Linux == nil {
|
||||
return nil
|
||||
}
|
||||
return api.FromOCILinuxDevices(c.spec.Linux.Devices)
|
||||
}
|
||||
|
||||
func (c *criContainer) GetLinuxResources() *api.LinuxResources {
|
||||
if c.spec.Linux == nil {
|
||||
return nil
|
||||
}
|
||||
return api.FromOCILinuxResources(c.spec.Linux.Resources, c.spec.Annotations)
|
||||
}
|
||||
|
||||
func (c *criContainer) GetOOMScoreAdj() *int {
|
||||
if c.spec.Process == nil {
|
||||
return nil
|
||||
}
|
||||
return c.spec.Process.OOMScoreAdj
|
||||
}
|
||||
|
||||
func (c *criContainer) GetCgroupsPath() string {
|
||||
if c.spec.Linux == nil {
|
||||
return ""
|
||||
}
|
||||
return c.spec.Linux.CgroupsPath
|
||||
}
|
||||
|
||||
func (c *criContainer) GetPid() uint32 {
|
||||
return c.pid
|
||||
}
|
||||
134
pkg/cri/server/nri-api_other.go
Normal file
134
pkg/cri/server/nri-api_other.go
Normal file
@@ -0,0 +1,134 @@
|
||||
//go:build !linux
|
||||
// +build !linux
|
||||
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/containerd/containerd"
|
||||
"github.com/containerd/containerd/containers"
|
||||
cstore "github.com/containerd/containerd/pkg/cri/store/container"
|
||||
sstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
|
||||
"github.com/opencontainers/runtime-spec/specs-go"
|
||||
|
||||
"github.com/containerd/containerd/pkg/cri/constants"
|
||||
"github.com/containerd/containerd/pkg/nri"
|
||||
|
||||
"github.com/containerd/nri/pkg/api"
|
||||
)
|
||||
|
||||
type nriAPI struct {
|
||||
cri *criService
|
||||
nri nri.API
|
||||
}
|
||||
|
||||
func (a *nriAPI) register() {
|
||||
}
|
||||
|
||||
func (a *nriAPI) isEnabled() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
//
|
||||
// CRI-NRI lifecycle hook no-op interface
|
||||
//
|
||||
|
||||
func (*nriAPI) runPodSandbox(context.Context, *sstore.Sandbox) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*nriAPI) stopPodSandbox(context.Context, *sstore.Sandbox) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*nriAPI) removePodSandbox(context.Context, *sstore.Sandbox) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*nriAPI) postCreateContainer(context.Context, *sstore.Sandbox, *cstore.Container) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*nriAPI) startContainer(context.Context, *sstore.Sandbox, *cstore.Container) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*nriAPI) postStartContainer(context.Context, *sstore.Sandbox, *cstore.Container) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*nriAPI) stopContainer(context.Context, *sstore.Sandbox, *cstore.Container) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*nriAPI) removeContainer(context.Context, *sstore.Sandbox, *cstore.Container) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*nriAPI) undoCreateContainer(context.Context, *sstore.Sandbox, string, *specs.Spec) {
|
||||
}
|
||||
|
||||
func (*nriAPI) WithContainerAdjustment() containerd.NewContainerOpts {
|
||||
return func(ctx context.Context, _ *containerd.Client, c *containers.Container) error {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (*nriAPI) WithContainerExit(*cstore.Container) containerd.ProcessDeleteOpts {
|
||||
return func(_ context.Context, _ containerd.Process) error {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// NRI-CRI no-op 'domain' interface
|
||||
//
|
||||
|
||||
const (
|
||||
nriDomain = constants.K8sContainerdNamespace
|
||||
)
|
||||
|
||||
func (*nriAPI) GetName() string {
|
||||
return nriDomain
|
||||
}
|
||||
|
||||
func (*nriAPI) ListPodSandboxes() []nri.PodSandbox {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*nriAPI) ListContainers() []nri.Container {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*nriAPI) GetPodSandbox(string) (nri.PodSandbox, bool) {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func (*nriAPI) GetContainer(string) (nri.Container, bool) {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func (*nriAPI) UpdateContainer(context.Context, *api.ContainerUpdate) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*nriAPI) EvictContainer(context.Context, *api.ContainerEviction) error {
|
||||
return nil
|
||||
}
|
||||
37
pkg/cri/server/nri-api_windows.go
Normal file
37
pkg/cri/server/nri-api_windows.go
Normal file
@@ -0,0 +1,37 @@
|
||||
//go:build windows
|
||||
// +build windows
|
||||
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
cstore "github.com/containerd/containerd/pkg/cri/store/container"
|
||||
sstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
|
||||
|
||||
cri "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
)
|
||||
|
||||
func (*nriAPI) updateContainer(context.Context, *sstore.Sandbox, *cstore.Container, *cri.LinuxContainerResources) (*cri.LinuxContainerResources, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (*nriAPI) postUpdateContainer(context.Context, *sstore.Sandbox, *cstore.Container) error {
|
||||
return nil
|
||||
}
|
||||
@@ -1,51 +0,0 @@
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/containerd/containerd"
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/containerd/nri"
|
||||
v1 "github.com/containerd/nri/types/v1"
|
||||
)
|
||||
|
||||
// WithNRISandboxDelete calls delete for a sandbox'd task
|
||||
func WithNRISandboxDelete(sandboxID string) containerd.ProcessDeleteOpts {
|
||||
return func(ctx context.Context, p containerd.Process) error {
|
||||
task, ok := p.(containerd.Task)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
nric, err := nri.New()
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Error("unable to create nri client")
|
||||
return nil
|
||||
}
|
||||
if nric == nil {
|
||||
return nil
|
||||
}
|
||||
sb := &nri.Sandbox{
|
||||
ID: sandboxID,
|
||||
}
|
||||
if _, err := nric.InvokeWithSandbox(ctx, task, v1.Delete, sb); err != nil {
|
||||
log.G(ctx).WithError(err).Errorf("Failed to delete nri for %q", task.ID())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@@ -101,6 +101,13 @@ func (c *criService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodS
|
||||
log.G(ctx).Tracef("Remove called for sandbox container %q that does not exist", id)
|
||||
}
|
||||
|
||||
if c.nri.isEnabled() {
|
||||
err = c.nri.removePodSandbox(ctx, &sandbox)
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Errorf("NRI pod removal notification failed")
|
||||
}
|
||||
}
|
||||
|
||||
// Remove sandbox from sandbox store. Note that once the sandbox is successfully
|
||||
// deleted:
|
||||
// 1) ListPodSandbox will not include this sandbox.
|
||||
|
||||
@@ -28,8 +28,6 @@ import (
|
||||
"time"
|
||||
|
||||
cni "github.com/containerd/go-cni"
|
||||
"github.com/containerd/nri"
|
||||
v1 "github.com/containerd/nri/types/v1"
|
||||
"github.com/containerd/typeurl"
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
selinux "github.com/opencontainers/selinux/go-selinux"
|
||||
@@ -346,7 +344,7 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
|
||||
deferCtx, deferCancel := ctrdutil.DeferContext()
|
||||
defer deferCancel()
|
||||
// Cleanup the sandbox container if an error is returned.
|
||||
if _, err := task.Delete(deferCtx, WithNRISandboxDelete(id), containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
|
||||
if _, err := task.Delete(deferCtx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
|
||||
log.G(ctx).WithError(err).Errorf("Failed to delete sandbox container %q", id)
|
||||
cleanupErr = err
|
||||
}
|
||||
@@ -359,18 +357,19 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
|
||||
return nil, fmt.Errorf("failed to wait for sandbox container task: %w", err)
|
||||
}
|
||||
|
||||
nric, err := nri.New()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to create nri client: %w", err)
|
||||
}
|
||||
if nric != nil {
|
||||
nriSB := &nri.Sandbox{
|
||||
ID: id,
|
||||
Labels: config.Labels,
|
||||
}
|
||||
if _, err := nric.InvokeWithSandbox(ctx, task, v1.Create, nriSB); err != nil {
|
||||
return nil, fmt.Errorf("nri invoke: %w", err)
|
||||
if c.nri.isEnabled() {
|
||||
err = c.nri.runPodSandbox(ctx, &sandbox)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("NRI RunPodSandbox failed: %w", err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if retErr != nil {
|
||||
deferCtx, deferCancel := ctrdutil.DeferContext()
|
||||
defer deferCancel()
|
||||
c.nri.removePodSandbox(deferCtx, &sandbox)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
if err := task.Start(ctx); err != nil {
|
||||
|
||||
@@ -82,6 +82,13 @@ func (c *criService) stopPodSandbox(ctx context.Context, sandbox sandboxstore.Sa
|
||||
}
|
||||
sandboxRuntimeStopTimer.WithValues(sandbox.RuntimeHandler).UpdateSince(stop)
|
||||
|
||||
if c.nri.isEnabled() {
|
||||
err := c.nri.stopPodSandbox(ctx, &sandbox)
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Errorf("NRI sandbox stop notification failed")
|
||||
}
|
||||
}
|
||||
|
||||
// Teardown network for sandbox.
|
||||
if sandbox.NetNS != nil {
|
||||
netStop := time.Now()
|
||||
@@ -120,7 +127,7 @@ func (c *criService) stopSandboxContainer(ctx context.Context, sandbox sandboxst
|
||||
}
|
||||
// Don't return for unknown state, some cleanup needs to be done.
|
||||
if state == sandboxstore.StateUnknown {
|
||||
return cleanupUnknownSandbox(ctx, id, sandbox)
|
||||
return c.cleanupUnknownSandbox(ctx, id, sandbox)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -136,7 +143,7 @@ func (c *criService) stopSandboxContainer(ctx context.Context, sandbox sandboxst
|
||||
if !errdefs.IsNotFound(err) {
|
||||
return fmt.Errorf("failed to wait for task: %w", err)
|
||||
}
|
||||
return cleanupUnknownSandbox(ctx, id, sandbox)
|
||||
return c.cleanupUnknownSandbox(ctx, id, sandbox)
|
||||
}
|
||||
|
||||
exitCtx, exitCancel := context.WithCancel(context.Background())
|
||||
@@ -190,7 +197,7 @@ func (c *criService) teardownPodNetwork(ctx context.Context, sandbox sandboxstor
|
||||
}
|
||||
|
||||
// cleanupUnknownSandbox cleanup stopped sandbox in unknown state.
|
||||
func cleanupUnknownSandbox(ctx context.Context, id string, sandbox sandboxstore.Sandbox) error {
|
||||
func (c *criService) cleanupUnknownSandbox(ctx context.Context, id string, sandbox sandboxstore.Sandbox) error {
|
||||
// Reuse handleSandboxExit to do the cleanup.
|
||||
return handleSandboxExit(ctx, &eventtypes.TaskExit{
|
||||
ContainerID: id,
|
||||
@@ -198,5 +205,5 @@ func cleanupUnknownSandbox(ctx context.Context, id string, sandbox sandboxstore.
|
||||
Pid: 0,
|
||||
ExitStatus: unknownExitCode,
|
||||
ExitedAt: protobuf.ToTimestamp(time.Now()),
|
||||
}, sandbox)
|
||||
}, sandbox, c)
|
||||
}
|
||||
|
||||
@@ -30,6 +30,7 @@ import (
|
||||
"github.com/containerd/containerd/oci"
|
||||
"github.com/containerd/containerd/pkg/cri/streaming"
|
||||
"github.com/containerd/containerd/pkg/kmutex"
|
||||
"github.com/containerd/containerd/pkg/nri"
|
||||
"github.com/containerd/containerd/plugin"
|
||||
runtime_alpha "github.com/containerd/containerd/third_party/k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
|
||||
cni "github.com/containerd/go-cni"
|
||||
@@ -67,6 +68,7 @@ type grpcAlphaServices interface {
|
||||
// CRIService is the interface implement CRI remote service server.
|
||||
type CRIService interface {
|
||||
Run() error
|
||||
|
||||
// io.Closer is used by containerd to gracefully stop cri service.
|
||||
io.Closer
|
||||
Register(*grpc.Server) error
|
||||
@@ -118,10 +120,12 @@ type criService struct {
|
||||
// one in-flight fetch request or unpack handler for a given descriptor's
|
||||
// or chain ID.
|
||||
unpackDuplicationSuppressor kmutex.KeyedLocker
|
||||
|
||||
nri *nriAPI
|
||||
}
|
||||
|
||||
// NewCRIService returns a new instance of CRIService
|
||||
func NewCRIService(config criconfig.Config, client *containerd.Client) (CRIService, error) {
|
||||
func NewCRIService(config criconfig.Config, client *containerd.Client, nrip nri.API) (CRIService, error) {
|
||||
var err error
|
||||
labels := label.NewStore()
|
||||
c := &criService{
|
||||
@@ -181,6 +185,13 @@ func NewCRIService(config criconfig.Config, client *containerd.Client) (CRIServi
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if nrip != nil {
|
||||
c.nri = &nriAPI{
|
||||
cri: c,
|
||||
nri: nrip,
|
||||
}
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
@@ -249,6 +260,8 @@ func (c *criService) Run() error {
|
||||
}
|
||||
}()
|
||||
|
||||
c.nri.register()
|
||||
|
||||
// Set the server as initialized. GRPC services could start serving traffic.
|
||||
c.initialized.Set()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user