pkg/cri/server: experimental NRI integration for CRI.

Implement the adaptation interface required by the NRI
service plugin to handle CRI sandboxes and containers.
Hook the NRI service plugin into CRI request processing.

Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
This commit is contained in:
Krisztian Litkey
2021-03-24 17:17:38 +02:00
parent 43704ca888
commit b27ef6f169
20 changed files with 1675 additions and 21 deletions

View File

@@ -25,6 +25,7 @@ import (
"github.com/containerd/containerd"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/pkg/cri/sbserver"
"github.com/containerd/containerd/pkg/nri"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
@@ -46,6 +47,7 @@ func init() {
Requires: []plugin.Type{
plugin.EventPlugin,
plugin.ServicePlugin,
plugin.NRIApiPlugin,
},
InitFn: initCRIService,
})
@@ -85,12 +87,19 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) {
}
var s server.CRIService
var nrip nri.API
if os.Getenv("ENABLE_CRI_SANDBOXES") != "" {
log.G(ctx).Info("using experimental CRI Sandbox server - unset ENABLE_CRI_SANDBOXES to disable")
s, err = sbserver.NewCRIService(c, client)
} else {
log.G(ctx).Info("using legacy CRI server")
s, err = server.NewCRIService(c, client)
nrip, err = getNRIPlugin(ic)
if err != nil {
log.G(ctx).Info("NRI service not found, disabling NRI support")
}
s, err = server.NewCRIService(c, client, nrip)
}
if err != nil {
return nil, fmt.Errorf("failed to create CRI service: %w", err)
@@ -102,6 +111,14 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) {
}
// TODO(random-liu): Whether and how we can stop containerd.
}()
if nrip != nil {
log.G(ctx).Info("using experimental NRI integration - disable nri plugin to prevent this")
if err = nrip.Start(); err != nil {
log.G(ctx).WithError(err).Fatal("Failed to start NRI service")
}
}
return s, nil
}
@@ -128,3 +145,24 @@ func setGLogLevel() error {
}
return nil
}
// Get the NRI plugin and verify its type.
func getNRIPlugin(ic *plugin.InitContext) (nri.API, error) {
const (
pluginType = plugin.NRIApiPlugin
pluginName = "nri"
)
p, err := ic.GetByID(pluginType, pluginName)
if err != nil {
return nil, err
}
api, ok := p.(nri.API)
if !ok {
return nil, fmt.Errorf("NRI plugin (%s, %q) has incompatible type %T",
pluginType, pluginName, api)
}
return api, nil
}

View File

@@ -246,6 +246,19 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
containerd.WithRuntime(sandboxInfo.Runtime.Name, runtimeOptions),
containerd.WithContainerLabels(containerLabels),
containerd.WithContainerExtension(containerMetadataExtension, &meta))
if c.nri.isEnabled() {
opts = append(opts, c.nri.WithContainerAdjustment())
defer func() {
if retErr != nil {
deferCtx, deferCancel := ctrdutil.DeferContext()
defer deferCancel()
c.nri.undoCreateContainer(deferCtx, &sandbox, id, spec)
}
}()
}
var cntr containerd.Container
if cntr, err = c.client.NewContainer(ctx, id, opts...); err != nil {
return nil, fmt.Errorf("failed to create containerd container: %w", err)
@@ -284,6 +297,13 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
return nil, fmt.Errorf("failed to add container %q into store: %w", id, err)
}
if c.nri.isEnabled() {
err = c.nri.postCreateContainer(ctx, &sandbox, &container)
if err != nil {
log.G(ctx).WithError(err).Errorf("NRI post-create notification failed")
}
}
containerCreateTimer.WithValues(ociRuntime.Type).UpdateSince(start)
return &runtime.CreateContainerResponse{ContainerId: id}, nil

View File

@@ -73,6 +73,18 @@ func (c *criService) RemoveContainer(ctx context.Context, r *runtime.RemoveConta
}
}()
if c.nri.isEnabled() {
sandbox, err := c.sandboxStore.Get(container.SandboxID)
if err != nil {
err = c.nri.removeContainer(ctx, nil, &container)
} else {
err = c.nri.removeContainer(ctx, &sandbox, &container)
}
if err != nil {
log.G(ctx).WithError(err).Error("NRI failed to remove container")
}
}
// NOTE(random-liu): Docker set container to "Dead" state when start removing the
// container so as to avoid start/restart the container again. However, for current
// kubelet implementation, we'll never start a container once we decide to remove it,

View File

@@ -160,6 +160,24 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
}
}
defer func() {
if retErr != nil {
deferCtx, deferCancel := ctrdutil.DeferContext()
defer deferCancel()
err = c.nri.stopContainer(deferCtx, &sandbox, &cntr)
if err != nil {
log.G(ctx).WithError(err).Errorf("NRI stop failed for failed container %q", id)
}
}
}()
if c.nri.isEnabled() {
err = c.nri.startContainer(ctx, &sandbox, &cntr)
if err != nil {
log.G(ctx).WithError(err).Errorf("NRI container start failed")
return nil, fmt.Errorf("NRI container start failed: %w", err)
}
}
// Start containerd task.
if err := task.Start(ctx); err != nil {
return nil, fmt.Errorf("failed to start containerd task %q: %w", id, err)
@@ -177,6 +195,13 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
// It handles the TaskExit event and update container state after this.
c.eventMonitor.startContainerExitMonitor(context.Background(), id, task.Pid(), exitCh)
if c.nri.isEnabled() {
err = c.nri.postStartContainer(ctx, &sandbox, &cntr)
if err != nil {
log.G(ctx).WithError(err).Errorf("NRI post-start notification failed")
}
}
containerStartTimer.WithValues(info.Runtime.Name).UpdateSince(start)
return &runtime.StartContainerResponse{}, nil

View File

@@ -47,6 +47,18 @@ func (c *criService) StopContainer(ctx context.Context, r *runtime.StopContainer
return nil, err
}
if c.nri.isEnabled() {
sandbox, err := c.sandboxStore.Get(container.SandboxID)
if err != nil {
err = c.nri.stopContainer(ctx, nil, &container)
} else {
err = c.nri.stopContainer(ctx, &sandbox, &container)
}
if err != nil {
log.G(ctx).WithError(err).Error("NRI failed to stop container")
}
}
i, err := container.Container.Info(ctx)
if err != nil {
return nil, fmt.Errorf("get container info: %w", err)
@@ -78,7 +90,7 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore
}
// Don't return for unknown state, some cleanup needs to be done.
if state == runtime.ContainerState_CONTAINER_UNKNOWN {
return cleanupUnknownContainer(ctx, id, container)
return c.cleanupUnknownContainer(ctx, id, container)
}
return nil
}
@@ -93,7 +105,7 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to wait for task for %q: %w", id, err)
}
return cleanupUnknownContainer(ctx, id, container)
return c.cleanupUnknownContainer(ctx, id, container)
}
exitCtx, exitCancel := context.WithCancel(context.Background())
@@ -196,7 +208,7 @@ func (c *criService) waitContainerStop(ctx context.Context, container containers
}
// cleanupUnknownContainer cleanup stopped container in unknown state.
func cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore.Container) error {
func (c *criService) cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore.Container) error {
// Reuse handleContainerExit to do the cleanup.
return handleContainerExit(ctx, &eventtypes.TaskExit{
ContainerID: id,
@@ -204,5 +216,5 @@ func cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore
Pid: 0,
ExitStatus: unknownExitCode,
ExitedAt: protobuf.ToTimestamp(time.Now()),
}, cntr)
}, cntr, c)
}

View File

@@ -33,6 +33,7 @@ import (
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
containerstore "github.com/containerd/containerd/pkg/cri/store/container"
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
ctrdutil "github.com/containerd/containerd/pkg/cri/util"
)
@@ -42,6 +43,24 @@ func (c *criService) UpdateContainerResources(ctx context.Context, r *runtime.Up
if err != nil {
return nil, fmt.Errorf("failed to find container: %w", err)
}
var sandbox sandboxstore.Sandbox
if c.nri.isEnabled() {
sandbox, err = c.sandboxStore.Get(container.SandboxID)
if err != nil {
return nil, err
}
resources := r.GetLinux()
updated, err := c.nri.updateContainer(ctx, &sandbox, &container, resources)
if err != nil {
return nil, fmt.Errorf("NRI container update failed: %w", err)
}
if updated != nil {
*resources = *updated
}
}
// Update resources in status update transaction, so that:
// 1) There won't be race condition with container start.
// 2) There won't be concurrent resource update to the same container.
@@ -50,6 +69,14 @@ func (c *criService) UpdateContainerResources(ctx context.Context, r *runtime.Up
}); err != nil {
return nil, fmt.Errorf("failed to update resources: %w", err)
}
if c.nri.isEnabled() {
err = c.nri.postUpdateContainer(ctx, &sandbox, &container)
if err != nil {
log.G(ctx).WithError(err).Errorf("NRI post-update notification failed")
}
}
return &runtime.UpdateContainerResourcesResponse{}, nil
}

View File

@@ -136,7 +136,7 @@ func (em *eventMonitor) startSandboxExitMonitor(ctx context.Context, id string,
sb, err := em.c.sandboxStore.Get(e.ID)
if err == nil {
if err := handleSandboxExit(dctx, e, sb); err != nil {
if err := handleSandboxExit(dctx, e, sb, em.c); err != nil {
return err
}
return nil
@@ -187,7 +187,7 @@ func (em *eventMonitor) startContainerExitMonitor(ctx context.Context, id string
cntr, err := em.c.containerStore.Get(e.ID)
if err == nil {
if err := handleContainerExit(dctx, e, cntr); err != nil {
if err := handleContainerExit(dctx, e, cntr, em.c); err != nil {
return err
}
return nil
@@ -313,7 +313,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
// Use ID instead of ContainerID to rule out TaskExit event for exec.
cntr, err := em.c.containerStore.Get(e.ID)
if err == nil {
if err := handleContainerExit(ctx, e, cntr); err != nil {
if err := handleContainerExit(ctx, e, cntr, em.c); err != nil {
return fmt.Errorf("failed to handle container TaskExit event: %w", err)
}
return nil
@@ -322,7 +322,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
}
sb, err := em.c.sandboxStore.Get(e.ID)
if err == nil {
if err := handleSandboxExit(ctx, e, sb); err != nil {
if err := handleSandboxExit(ctx, e, sb, em.c); err != nil {
return fmt.Errorf("failed to handle sandbox TaskExit event: %w", err)
}
return nil
@@ -362,7 +362,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
}
// handleContainerExit handles TaskExit event for container.
func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container) error {
func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container, c *criService) error {
// Attach container IO so that `Delete` could cleanup the stream properly.
task, err := cntr.Container.Task(ctx,
func(*containerdio.FIFOSet) (containerdio.IO, error) {
@@ -384,7 +384,7 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta
}
} else {
// TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker
if _, err = task.Delete(ctx, WithNRISandboxDelete(cntr.SandboxID), containerd.WithProcessKill); err != nil {
if _, err = task.Delete(ctx, WithNRISandboxDelete(cntr.SandboxID), c.nri.WithContainerExit(&cntr), containerd.WithProcessKill); err != nil {
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to stop container: %w", err)
}
@@ -415,7 +415,7 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta
}
// handleSandboxExit handles TaskExit event for sandbox.
func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxstore.Sandbox) error {
func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxstore.Sandbox, c *criService) error {
// No stream attached to sandbox container.
task, err := sb.Container.Task(ctx, nil)
if err != nil {

782
pkg/cri/server/nri-api.go Normal file
View File

@@ -0,0 +1,782 @@
//go:build linux
// +build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"context"
"encoding/json"
"fmt"
"github.com/containerd/containerd"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/pkg/cri/annotations"
cstore "github.com/containerd/containerd/pkg/cri/store/container"
sstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
ctrdutil "github.com/containerd/containerd/pkg/cri/util"
"github.com/containerd/typeurl"
"github.com/opencontainers/runtime-spec/specs-go"
runtimespec "github.com/opencontainers/runtime-spec/specs-go"
"github.com/opencontainers/runtime-tools/generate"
cri "k8s.io/cri-api/pkg/apis/runtime/v1"
"github.com/containerd/containerd/pkg/cri/constants"
"github.com/containerd/containerd/pkg/nri"
"github.com/containerd/nri/pkg/api"
nrigen "github.com/containerd/nri/pkg/runtime-tools/generate"
)
type nriAPI struct {
cri *criService
nri nri.API
}
func (a *nriAPI) register() {
if !a.isEnabled() {
return
}
nri.RegisterDomain(a)
}
func (a *nriAPI) isEnabled() bool {
return a != nil && a.nri != nil && a.nri.IsEnabled()
}
//
// CRI-NRI lifecycle hook interface
//
// These functions are used to hook NRI into the processing of
// the corresponding CRI lifecycle events using the common NRI
// interface.
//
func (a *nriAPI) runPodSandbox(ctx context.Context, criPod *sstore.Sandbox) error {
pod := a.nriPodSandbox(criPod)
err := a.nri.RunPodSandbox(ctx, pod)
if err != nil {
a.nri.StopPodSandbox(ctx, pod)
a.nri.RemovePodSandbox(ctx, pod)
}
return err
}
func (a *nriAPI) stopPodSandbox(ctx context.Context, criPod *sstore.Sandbox) error {
pod := a.nriPodSandbox(criPod)
err := a.nri.StopPodSandbox(ctx, pod)
return err
}
func (a *nriAPI) removePodSandbox(ctx context.Context, criPod *sstore.Sandbox) error {
pod := a.nriPodSandbox(criPod)
err := a.nri.RemovePodSandbox(ctx, pod)
return err
}
func (a *nriAPI) createContainer(ctx context.Context, ctrs *containers.Container, spec *specs.Spec) (*api.ContainerAdjustment, error) {
ctr := a.nriContainer(ctrs, spec)
criPod, err := a.cri.sandboxStore.Get(ctr.GetPodSandboxID())
if err != nil {
return nil, err
}
pod := a.nriPodSandbox(&criPod)
adjust, err := a.nri.CreateContainer(ctx, pod, ctr)
return adjust, err
}
func (a *nriAPI) postCreateContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error {
pod := a.nriPodSandbox(criPod)
ctr := a.nriContainer(criCtr, nil)
err := a.nri.PostCreateContainer(ctx, pod, ctr)
return err
}
func (a *nriAPI) startContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error {
pod := a.nriPodSandbox(criPod)
ctr := a.nriContainer(criCtr, nil)
err := a.nri.StartContainer(ctx, pod, ctr)
return err
}
func (a *nriAPI) postStartContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error {
pod := a.nriPodSandbox(criPod)
ctr := a.nriContainer(criCtr, nil)
err := a.nri.PostStartContainer(ctx, pod, ctr)
return err
}
func (a *nriAPI) updateContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container, req *cri.LinuxContainerResources) (*cri.LinuxContainerResources, error) {
const noOomAdj = 0
pod := a.nriPodSandbox(criPod)
ctr := a.nriContainer(criCtr, nil)
r, err := a.nri.UpdateContainer(ctx, pod, ctr, api.FromCRILinuxResources(req))
if err != nil {
return nil, err
}
return r.ToCRI(noOomAdj), nil
}
func (a *nriAPI) postUpdateContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error {
pod := a.nriPodSandbox(criPod)
ctr := a.nriContainer(criCtr, nil)
err := a.nri.PostUpdateContainer(ctx, pod, ctr)
return err
}
func (a *nriAPI) stopContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error {
ctr := a.nriContainer(criCtr, nil)
if criPod == nil || criPod.ID == "" {
criPod = &sstore.Sandbox{
Metadata: sstore.Metadata{
ID: ctr.GetPodSandboxID(),
},
}
}
pod := a.nriPodSandbox(criPod)
err := a.nri.StopContainer(ctx, pod, ctr)
return err
}
func (a *nriAPI) notifyContainerExit(ctx context.Context, criCtr *cstore.Container) {
ctr := a.nriContainer(criCtr, nil)
criPod, _ := a.cri.sandboxStore.Get(ctr.GetPodSandboxID())
if criPod.ID == "" {
criPod = sstore.Sandbox{
Metadata: sstore.Metadata{
ID: ctr.GetPodSandboxID(),
},
}
}
pod := a.nriPodSandbox(&criPod)
a.nri.NotifyContainerExit(ctx, pod, ctr)
}
func (a *nriAPI) removeContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error {
pod := a.nriPodSandbox(criPod)
ctr := a.nriContainer(criCtr, nil)
err := a.nri.RemoveContainer(ctx, pod, ctr)
return err
}
func (a *nriAPI) undoCreateContainer(ctx context.Context, criPod *sstore.Sandbox, id string, spec *specs.Spec) {
pod := a.nriPodSandbox(criPod)
ctr := a.nriContainer(&containers.Container{ID: id}, spec)
err := a.nri.StopContainer(ctx, pod, ctr)
if err != nil {
log.G(ctx).WithError(err).Error("container creation undo (stop) failed")
}
err = a.nri.RemoveContainer(ctx, pod, ctr)
if err != nil {
log.G(ctx).WithError(err).Error("container creation undo (remove) failed")
}
}
func (a *nriAPI) WithContainerAdjustment() containerd.NewContainerOpts {
resourceCheckOpt := nrigen.WithResourceChecker(
func(r *runtimespec.LinuxResources) error {
if r != nil {
if a.cri.config.DisableHugetlbController {
r.HugepageLimits = nil
}
}
return nil
},
)
rdtResolveOpt := nrigen.WithRdtResolver(
func(className string) (*runtimespec.LinuxIntelRdt, error) {
if className == "" {
return nil, nil
}
return &runtimespec.LinuxIntelRdt{
ClosID: className,
}, nil
},
)
blkioResolveOpt := nrigen.WithBlockIOResolver(
func(className string) (*runtimespec.LinuxBlockIO, error) {
if className == "" {
return nil, nil
}
blockIO, err := blockIOToLinuxOci(className)
if err != nil {
return nil, err
}
return blockIO, nil
},
)
return func(ctx context.Context, _ *containerd.Client, c *containers.Container) error {
spec := &specs.Spec{}
if err := json.Unmarshal(c.Spec.GetValue(), spec); err != nil {
return fmt.Errorf("failed to unmarshal container OCI Spec for NRI: %w", err)
}
adjust, err := a.createContainer(ctx, c, spec)
if err != nil {
return fmt.Errorf("failed to get NRI adjustment for container: %w", err)
}
sgen := generate.Generator{Config: spec}
ngen := nrigen.SpecGenerator(&sgen, resourceCheckOpt, rdtResolveOpt, blkioResolveOpt)
err = ngen.Adjust(adjust)
if err != nil {
return fmt.Errorf("failed to NRI-adjust container Spec: %w", err)
}
adjusted, err := typeurl.MarshalAny(spec)
if err != nil {
return fmt.Errorf("failed to marshal NRI-adjusted Spec: %w", err)
}
c.Spec = adjusted
return nil
}
}
func (a *nriAPI) WithContainerExit(criCtr *cstore.Container) containerd.ProcessDeleteOpts {
if !a.isEnabled() {
return func(_ context.Context, _ containerd.Process) error {
return nil
}
}
return func(_ context.Context, _ containerd.Process) error {
a.notifyContainerExit(context.Background(), criCtr)
return nil
}
}
//
// NRI-CRI 'domain' interface
//
// These functions are used to interface CRI pods and containers
// from the common NRI interface. They implement pod and container
// discovery, lookup and updating of container parameters.
//
const (
nriDomain = constants.K8sContainerdNamespace
)
func (a *nriAPI) GetName() string {
return nriDomain
}
func (a *nriAPI) ListPodSandboxes() []nri.PodSandbox {
pods := []nri.PodSandbox{}
for _, pod := range a.cri.sandboxStore.List() {
if pod.Status.Get().State != sstore.StateUnknown {
pod := pod
pods = append(pods, a.nriPodSandbox(&pod))
}
}
return pods
}
func (a *nriAPI) ListContainers() []nri.Container {
containers := []nri.Container{}
for _, ctr := range a.cri.containerStore.List() {
switch ctr.Status.Get().State() {
case cri.ContainerState_CONTAINER_EXITED:
continue
case cri.ContainerState_CONTAINER_UNKNOWN:
continue
}
ctr := ctr
containers = append(containers, a.nriContainer(&ctr, nil))
}
return containers
}
func (a *nriAPI) GetPodSandbox(id string) (nri.PodSandbox, bool) {
pod, err := a.cri.sandboxStore.Get(id)
if err != nil {
return nil, false
}
return a.nriPodSandbox(&pod), true
}
func (a *nriAPI) GetContainer(id string) (nri.Container, bool) {
ctr, err := a.cri.containerStore.Get(id)
if err != nil {
return nil, false
}
return a.nriContainer(&ctr, nil), true
}
func (a *nriAPI) UpdateContainer(ctx context.Context, u *api.ContainerUpdate) error {
ctr, err := a.cri.containerStore.Get(u.ContainerId)
if err != nil {
return nil
}
err = ctr.Status.UpdateSync(
func(status cstore.Status) (cstore.Status, error) {
criReq := &cri.UpdateContainerResourcesRequest{
ContainerId: u.ContainerId,
Linux: u.GetLinux().GetResources().ToCRI(0),
}
newStatus, err := a.cri.updateContainerResources(ctx, ctr, criReq, status)
return newStatus, err
},
)
if err != nil {
if !u.IgnoreFailure {
return err
}
}
return nil
}
func (a *nriAPI) EvictContainer(ctx context.Context, e *api.ContainerEviction) error {
ctr, err := a.cri.containerStore.Get(e.ContainerId)
if err != nil {
return nil
}
err = a.cri.stopContainer(ctx, ctr, 0)
if err != nil {
return err
}
return nil
}
//
// NRI integration wrapper for CRI Pods
//
type criPodSandbox struct {
*sstore.Sandbox
spec *specs.Spec
pid uint32
}
func (a *nriAPI) nriPodSandbox(pod *sstore.Sandbox) *criPodSandbox {
criPod := &criPodSandbox{
Sandbox: pod,
spec: &specs.Spec{},
}
if pod == nil || pod.Container == nil {
return criPod
}
ctx := ctrdutil.NamespacedContext()
task, err := pod.Container.Task(ctx, nil)
if err != nil {
if !errdefs.IsNotFound(err) {
log.L.WithError(err).Errorf("failed to get task for sandbox container %s",
pod.Container.ID())
}
return criPod
}
criPod.pid = task.Pid()
spec, err := task.Spec(ctx)
if err != nil {
if err != nil {
log.L.WithError(err).Errorf("failed to get spec for sandbox container %s",
pod.Container.ID())
}
return criPod
}
criPod.spec = spec
return criPod
}
func (p *criPodSandbox) GetDomain() string {
return nriDomain
}
func (p *criPodSandbox) GetID() string {
if p.Sandbox == nil {
return ""
}
return p.ID
}
func (p *criPodSandbox) GetName() string {
if p.Sandbox == nil {
return ""
}
return p.Config.GetMetadata().GetName()
}
func (p *criPodSandbox) GetUID() string {
if p.Sandbox == nil {
return ""
}
return p.Config.GetMetadata().GetUid()
}
func (p *criPodSandbox) GetNamespace() string {
if p.Sandbox == nil {
return ""
}
return p.Config.GetMetadata().GetNamespace()
}
func (p *criPodSandbox) GetAnnotations() map[string]string {
if p.Sandbox == nil {
return nil
}
annotations := map[string]string{}
for key, value := range p.Config.GetAnnotations() {
annotations[key] = value
}
for key, value := range p.spec.Annotations {
annotations[key] = value
}
return annotations
}
func (p *criPodSandbox) GetLabels() map[string]string {
if p.Sandbox == nil {
return nil
}
labels := map[string]string{}
for key, value := range p.Config.GetLabels() {
labels[key] = value
}
if p.Sandbox.Container == nil {
return labels
}
ctx := ctrdutil.NamespacedContext()
ctrd := p.Sandbox.Container
ctrs, err := ctrd.Info(ctx, containerd.WithoutRefreshedMetadata)
if err != nil {
log.L.WithError(err).Errorf("failed to get info for sandbox container %s", ctrd.ID())
return labels
}
for key, value := range ctrs.Labels {
labels[key] = value
}
return labels
}
func (p *criPodSandbox) GetRuntimeHandler() string {
if p.Sandbox == nil {
return ""
}
return p.RuntimeHandler
}
func (p *criPodSandbox) GetLinuxPodSandbox() nri.LinuxPodSandbox {
return p
}
func (p *criPodSandbox) GetLinuxNamespaces() []*api.LinuxNamespace {
if p.spec.Linux != nil {
return api.FromOCILinuxNamespaces(p.spec.Linux.Namespaces)
}
return nil
}
func (p *criPodSandbox) GetPodLinuxOverhead() *api.LinuxResources {
if p.Sandbox == nil {
return nil
}
return api.FromCRILinuxResources(p.Config.GetLinux().GetOverhead())
}
func (p *criPodSandbox) GetPodLinuxResources() *api.LinuxResources {
if p.Sandbox == nil {
return nil
}
return api.FromCRILinuxResources(p.Config.GetLinux().GetResources())
}
func (p *criPodSandbox) GetLinuxResources() *api.LinuxResources {
if p.spec.Linux == nil {
return nil
}
return api.FromOCILinuxResources(p.spec.Linux.Resources, nil)
}
func (p *criPodSandbox) GetCgroupParent() string {
if p.Sandbox == nil {
return ""
}
return p.Config.GetLinux().GetCgroupParent()
}
func (p *criPodSandbox) GetCgroupsPath() string {
if p.spec.Linux == nil {
return ""
}
return p.spec.Linux.CgroupsPath
}
func (p *criPodSandbox) GetPid() uint32 {
return p.pid
}
//
// NRI integration wrapper for CRI Containers
//
type criContainer struct {
api *nriAPI
ctrs *containers.Container
spec *specs.Spec
meta *cstore.Metadata
pid uint32
}
func (a *nriAPI) nriContainer(ctr interface{}, spec *specs.Spec) *criContainer {
switch c := ctr.(type) {
case *cstore.Container:
ctx := ctrdutil.NamespacedContext()
pid := uint32(0)
ctrd := c.Container
ctrs, err := ctrd.Info(ctx, containerd.WithoutRefreshedMetadata)
if err != nil {
log.L.WithError(err).Errorf("failed to get info for container %s", ctrd.ID())
}
spec, err := ctrd.Spec(ctx)
if err != nil {
log.L.WithError(err).Errorf("failed to get OCI Spec for container %s", ctrd.ID())
spec = &specs.Spec{}
}
task, err := ctrd.Task(ctx, nil)
if err != nil {
if !errdefs.IsNotFound(err) {
log.L.WithError(err).Errorf("failed to get task for container %s", ctrd.ID())
}
} else {
pid = task.Pid()
}
return &criContainer{
api: a,
ctrs: &ctrs,
meta: &c.Metadata,
spec: spec,
pid: pid,
}
case *containers.Container:
ctrs := c
meta := &cstore.Metadata{}
if ext := ctrs.Extensions[containerMetadataExtension]; ext != nil {
err := typeurl.UnmarshalTo(ext, meta)
if err != nil {
log.L.WithError(err).Errorf("failed to get metadata for container %s", ctrs.ID)
}
}
return &criContainer{
api: a,
ctrs: ctrs,
meta: meta,
spec: spec,
}
}
log.L.Errorf("can't wrap %T as NRI container", ctr)
return &criContainer{
api: a,
meta: &cstore.Metadata{},
spec: &specs.Spec{},
}
}
func (c *criContainer) GetDomain() string {
return nriDomain
}
func (c *criContainer) GetID() string {
if c.ctrs != nil {
return c.ctrs.ID
}
return ""
}
func (c *criContainer) GetPodSandboxID() string {
return c.spec.Annotations[annotations.SandboxID]
}
func (c *criContainer) GetName() string {
return c.spec.Annotations[annotations.ContainerName]
}
func (c *criContainer) GetState() api.ContainerState {
criCtr, err := c.api.cri.containerStore.Get(c.GetID())
if err != nil {
return api.ContainerState_CONTAINER_UNKNOWN
}
switch criCtr.Status.Get().State() {
case cri.ContainerState_CONTAINER_CREATED:
return api.ContainerState_CONTAINER_CREATED
case cri.ContainerState_CONTAINER_RUNNING:
return api.ContainerState_CONTAINER_RUNNING
case cri.ContainerState_CONTAINER_EXITED:
return api.ContainerState_CONTAINER_STOPPED
}
return api.ContainerState_CONTAINER_UNKNOWN
}
func (c *criContainer) GetLabels() map[string]string {
if c.ctrs == nil {
return nil
}
labels := map[string]string{}
for key, value := range c.ctrs.Labels {
labels[key] = value
}
if c.meta != nil && c.meta.Config != nil {
for key, value := range c.meta.Config.Labels {
labels[key] = value
}
}
return labels
}
func (c *criContainer) GetAnnotations() map[string]string {
annotations := map[string]string{}
for key, value := range c.spec.Annotations {
annotations[key] = value
}
if c.meta != nil && c.meta.Config != nil {
for key, value := range c.meta.Config.Annotations {
annotations[key] = value
}
}
return annotations
}
func (c *criContainer) GetArgs() []string {
if c.spec.Process == nil {
return nil
}
return api.DupStringSlice(c.spec.Process.Args)
}
func (c *criContainer) GetEnv() []string {
if c.spec.Process == nil {
return nil
}
return api.DupStringSlice(c.spec.Process.Env)
}
func (c *criContainer) GetMounts() []*api.Mount {
return api.FromOCIMounts(c.spec.Mounts)
}
func (c *criContainer) GetHooks() *api.Hooks {
return api.FromOCIHooks(c.spec.Hooks)
}
func (c *criContainer) GetLinuxContainer() nri.LinuxContainer {
return c
}
func (c *criContainer) GetLinuxNamespaces() []*api.LinuxNamespace {
if c.spec.Linux == nil {
return nil
}
return api.FromOCILinuxNamespaces(c.spec.Linux.Namespaces)
}
func (c *criContainer) GetLinuxDevices() []*api.LinuxDevice {
if c.spec.Linux == nil {
return nil
}
return api.FromOCILinuxDevices(c.spec.Linux.Devices)
}
func (c *criContainer) GetLinuxResources() *api.LinuxResources {
if c.spec.Linux == nil {
return nil
}
return api.FromOCILinuxResources(c.spec.Linux.Resources, c.spec.Annotations)
}
func (c *criContainer) GetOOMScoreAdj() *int {
if c.spec.Process == nil {
return nil
}
return c.spec.Process.OOMScoreAdj
}
func (c *criContainer) GetCgroupsPath() string {
if c.spec.Linux == nil {
return ""
}
return c.spec.Linux.CgroupsPath
}
func (c *criContainer) GetPid() uint32 {
return c.pid
}

View File

@@ -0,0 +1,134 @@
//go:build !linux
// +build !linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"context"
"github.com/containerd/containerd"
"github.com/containerd/containerd/containers"
cstore "github.com/containerd/containerd/pkg/cri/store/container"
sstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/containerd/containerd/pkg/cri/constants"
"github.com/containerd/containerd/pkg/nri"
"github.com/containerd/nri/pkg/api"
)
type nriAPI struct {
cri *criService
nri nri.API
}
func (a *nriAPI) register() {
}
func (a *nriAPI) isEnabled() bool {
return false
}
//
// CRI-NRI lifecycle hook no-op interface
//
func (*nriAPI) runPodSandbox(context.Context, *sstore.Sandbox) error {
return nil
}
func (*nriAPI) stopPodSandbox(context.Context, *sstore.Sandbox) error {
return nil
}
func (*nriAPI) removePodSandbox(context.Context, *sstore.Sandbox) error {
return nil
}
func (*nriAPI) postCreateContainer(context.Context, *sstore.Sandbox, *cstore.Container) error {
return nil
}
func (*nriAPI) startContainer(context.Context, *sstore.Sandbox, *cstore.Container) error {
return nil
}
func (*nriAPI) postStartContainer(context.Context, *sstore.Sandbox, *cstore.Container) error {
return nil
}
func (*nriAPI) stopContainer(context.Context, *sstore.Sandbox, *cstore.Container) error {
return nil
}
func (*nriAPI) removeContainer(context.Context, *sstore.Sandbox, *cstore.Container) error {
return nil
}
func (*nriAPI) undoCreateContainer(context.Context, *sstore.Sandbox, string, *specs.Spec) {
}
func (*nriAPI) WithContainerAdjustment() containerd.NewContainerOpts {
return func(ctx context.Context, _ *containerd.Client, c *containers.Container) error {
return nil
}
}
func (*nriAPI) WithContainerExit(*cstore.Container) containerd.ProcessDeleteOpts {
return func(_ context.Context, _ containerd.Process) error {
return nil
}
}
//
// NRI-CRI no-op 'domain' interface
//
const (
nriDomain = constants.K8sContainerdNamespace
)
func (*nriAPI) GetName() string {
return nriDomain
}
func (*nriAPI) ListPodSandboxes() []nri.PodSandbox {
return nil
}
func (*nriAPI) ListContainers() []nri.Container {
return nil
}
func (*nriAPI) GetPodSandbox(string) (nri.PodSandbox, bool) {
return nil, false
}
func (*nriAPI) GetContainer(string) (nri.Container, bool) {
return nil, false
}
func (*nriAPI) UpdateContainer(context.Context, *api.ContainerUpdate) error {
return nil
}
func (*nriAPI) EvictContainer(context.Context, *api.ContainerEviction) error {
return nil
}

View File

@@ -0,0 +1,37 @@
//go:build windows
// +build windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"context"
cstore "github.com/containerd/containerd/pkg/cri/store/container"
sstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
cri "k8s.io/cri-api/pkg/apis/runtime/v1"
)
func (*nriAPI) updateContainer(context.Context, *sstore.Sandbox, *cstore.Container, *cri.LinuxContainerResources) (*cri.LinuxContainerResources, error) {
return nil, nil
}
func (*nriAPI) postUpdateContainer(context.Context, *sstore.Sandbox, *cstore.Container) error {
return nil
}

View File

@@ -101,6 +101,13 @@ func (c *criService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodS
log.G(ctx).Tracef("Remove called for sandbox container %q that does not exist", id)
}
if c.nri.isEnabled() {
err = c.nri.removePodSandbox(ctx, &sandbox)
if err != nil {
log.G(ctx).WithError(err).Errorf("NRI pod removal notification failed")
}
}
// Remove sandbox from sandbox store. Note that once the sandbox is successfully
// deleted:
// 1) ListPodSandbox will not include this sandbox.

View File

@@ -373,6 +373,21 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
}
}
if c.nri.isEnabled() {
err = c.nri.runPodSandbox(ctx, &sandbox)
if err != nil {
return nil, fmt.Errorf("NRI RunPodSandbox failed: %w", err)
}
defer func() {
if retErr != nil {
deferCtx, deferCancel := ctrdutil.DeferContext()
defer deferCancel()
c.nri.removePodSandbox(deferCtx, &sandbox)
}
}()
}
if err := task.Start(ctx); err != nil {
return nil, fmt.Errorf("failed to start sandbox container task %q: %w", id, err)
}

View File

@@ -82,6 +82,13 @@ func (c *criService) stopPodSandbox(ctx context.Context, sandbox sandboxstore.Sa
}
sandboxRuntimeStopTimer.WithValues(sandbox.RuntimeHandler).UpdateSince(stop)
if c.nri.isEnabled() {
err := c.nri.stopPodSandbox(ctx, &sandbox)
if err != nil {
log.G(ctx).WithError(err).Errorf("NRI sandbox stop notification failed")
}
}
// Teardown network for sandbox.
if sandbox.NetNS != nil {
netStop := time.Now()
@@ -120,7 +127,7 @@ func (c *criService) stopSandboxContainer(ctx context.Context, sandbox sandboxst
}
// Don't return for unknown state, some cleanup needs to be done.
if state == sandboxstore.StateUnknown {
return cleanupUnknownSandbox(ctx, id, sandbox)
return c.cleanupUnknownSandbox(ctx, id, sandbox)
}
return nil
}
@@ -136,7 +143,7 @@ func (c *criService) stopSandboxContainer(ctx context.Context, sandbox sandboxst
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to wait for task: %w", err)
}
return cleanupUnknownSandbox(ctx, id, sandbox)
return c.cleanupUnknownSandbox(ctx, id, sandbox)
}
exitCtx, exitCancel := context.WithCancel(context.Background())
@@ -190,7 +197,7 @@ func (c *criService) teardownPodNetwork(ctx context.Context, sandbox sandboxstor
}
// cleanupUnknownSandbox cleanup stopped sandbox in unknown state.
func cleanupUnknownSandbox(ctx context.Context, id string, sandbox sandboxstore.Sandbox) error {
func (c *criService) cleanupUnknownSandbox(ctx context.Context, id string, sandbox sandboxstore.Sandbox) error {
// Reuse handleSandboxExit to do the cleanup.
return handleSandboxExit(ctx, &eventtypes.TaskExit{
ContainerID: id,
@@ -198,5 +205,5 @@ func cleanupUnknownSandbox(ctx context.Context, id string, sandbox sandboxstore.
Pid: 0,
ExitStatus: unknownExitCode,
ExitedAt: protobuf.ToTimestamp(time.Now()),
}, sandbox)
}, sandbox, c)
}

View File

@@ -30,6 +30,7 @@ import (
"github.com/containerd/containerd/oci"
"github.com/containerd/containerd/pkg/cri/streaming"
"github.com/containerd/containerd/pkg/kmutex"
"github.com/containerd/containerd/pkg/nri"
"github.com/containerd/containerd/plugin"
runtime_alpha "github.com/containerd/containerd/third_party/k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
cni "github.com/containerd/go-cni"
@@ -67,6 +68,7 @@ type grpcAlphaServices interface {
// CRIService is the interface implement CRI remote service server.
type CRIService interface {
Run() error
// io.Closer is used by containerd to gracefully stop cri service.
io.Closer
Register(*grpc.Server) error
@@ -118,10 +120,12 @@ type criService struct {
// one in-flight fetch request or unpack handler for a given descriptor's
// or chain ID.
unpackDuplicationSuppressor kmutex.KeyedLocker
nri *nriAPI
}
// NewCRIService returns a new instance of CRIService
func NewCRIService(config criconfig.Config, client *containerd.Client) (CRIService, error) {
func NewCRIService(config criconfig.Config, client *containerd.Client, nrip nri.API) (CRIService, error) {
var err error
labels := label.NewStore()
c := &criService{
@@ -181,6 +185,13 @@ func NewCRIService(config criconfig.Config, client *containerd.Client) (CRIServi
return nil, err
}
if nrip != nil {
c.nri = &nriAPI{
cri: c,
nri: nrip,
}
}
return c, nil
}
@@ -249,6 +260,8 @@ func (c *criService) Run() error {
}
}()
c.nri.register()
// Set the server as initialized. GRPC services could start serving traffic.
c.initialized.Set()