diff --git a/contrib/fuzz/cri_server_fuzzer.go b/contrib/fuzz/cri_server_fuzzer.go index 86ef288fe..f9ba0eb68 100644 --- a/contrib/fuzz/cri_server_fuzzer.go +++ b/contrib/fuzz/cri_server_fuzzer.go @@ -38,7 +38,7 @@ func FuzzCRIServer(data []byte) int { } defer client.Close() - c, err := server.NewCRIService(criconfig.Config{}, client) + c, err := server.NewCRIService(criconfig.Config{}, client, nil) if err != nil { panic(err) } diff --git a/go.mod b/go.mod index 1d4879e1e..2ebde3366 100644 --- a/go.mod +++ b/go.mod @@ -50,6 +50,9 @@ require ( github.com/opencontainers/image-spec v1.1.0-rc2.0.20221005185240-3a7f492d3f1b github.com/opencontainers/runc v1.1.4 github.com/opencontainers/runtime-spec v1.0.3-0.20220825212826-86290f6a00fb + // ATM the runtime-tools commit we need are beyond the latest tag. + // We use a replace to handle that until a new version is tagged. + github.com/opencontainers/runtime-tools v0.9.0 github.com/opencontainers/selinux v1.10.2 github.com/pelletier/go-toml v1.9.3 github.com/prometheus/client_golang v1.12.1 @@ -107,9 +110,6 @@ require ( github.com/moby/spdystream v0.2.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - // ATM the runtime-tools commit we need are beyond the latest tag. - // We use a replace to handle that until a new version is tagged. 
- github.com/opencontainers/runtime-tools v0.9.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect diff --git a/pkg/cri/cri.go b/pkg/cri/cri.go index 8509d92c5..a5e01b399 100644 --- a/pkg/cri/cri.go +++ b/pkg/cri/cri.go @@ -25,6 +25,7 @@ import ( "github.com/containerd/containerd" "github.com/containerd/containerd/log" "github.com/containerd/containerd/pkg/cri/sbserver" + "github.com/containerd/containerd/pkg/nri" "github.com/containerd/containerd/platforms" "github.com/containerd/containerd/plugin" imagespec "github.com/opencontainers/image-spec/specs-go/v1" @@ -46,6 +47,7 @@ func init() { Requires: []plugin.Type{ plugin.EventPlugin, plugin.ServicePlugin, + plugin.NRIApiPlugin, }, InitFn: initCRIService, }) @@ -85,12 +87,19 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) { } var s server.CRIService + var nrip nri.API if os.Getenv("ENABLE_CRI_SANDBOXES") != "" { log.G(ctx).Info("using experimental CRI Sandbox server - unset ENABLE_CRI_SANDBOXES to disable") s, err = sbserver.NewCRIService(c, client) } else { log.G(ctx).Info("using legacy CRI server") - s, err = server.NewCRIService(c, client) + + nrip, err = getNRIPlugin(ic) + if err != nil { + log.G(ctx).Info("NRI service not found, disabling NRI support") + } + + s, err = server.NewCRIService(c, client, nrip) } if err != nil { return nil, fmt.Errorf("failed to create CRI service: %w", err) @@ -102,6 +111,14 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) { } // TODO(random-liu): Whether and how we can stop containerd. }() + + if nrip != nil { + log.G(ctx).Info("using experimental NRI integration - disable nri plugin to prevent this") + if err = nrip.Start(); err != nil { + log.G(ctx).WithError(err).Fatal("Failed to start NRI service") + } + } + return s, nil } @@ -128,3 +145,24 @@ func setGLogLevel() error { } return nil } + +// Get the NRI plugin and verify its type. 
+func getNRIPlugin(ic *plugin.InitContext) (nri.API, error) {
+	const (
+		pluginType = plugin.NRIApiPlugin
+		pluginName = "nri"
+	)
+
+	p, err := ic.GetByID(pluginType, pluginName)
+	if err != nil {
+		return nil, err
+	}
+
+	api, ok := p.(nri.API)
+	if !ok {
+		return nil, fmt.Errorf("NRI plugin (%s, %q) has incompatible type %T",
+			pluginType, pluginName, p) // report p: api is the nil zero value after a failed assertion
+	}
+
+	return api, nil
+}
diff --git a/pkg/cri/server/container_create.go b/pkg/cri/server/container_create.go
index 610be347f..331af8fb1 100644
--- a/pkg/cri/server/container_create.go
+++ b/pkg/cri/server/container_create.go
@@ -246,6 +246,19 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
 		containerd.WithRuntime(sandboxInfo.Runtime.Name, runtimeOptions),
 		containerd.WithContainerLabels(containerLabels),
 		containerd.WithContainerExtension(containerMetadataExtension, &meta))
+
+	if c.nri.isEnabled() {
+		opts = append(opts, c.nri.WithContainerAdjustment())
+
+		defer func() {
+			if retErr != nil {
+				deferCtx, deferCancel := ctrdutil.DeferContext()
+				defer deferCancel()
+				c.nri.undoCreateContainer(deferCtx, &sandbox, id, spec)
+			}
+		}()
+	}
+
 	var cntr containerd.Container
 	if cntr, err = c.client.NewContainer(ctx, id, opts...); err != nil {
 		return nil, fmt.Errorf("failed to create containerd container: %w", err)
@@ -284,6 +297,13 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
 		return nil, fmt.Errorf("failed to add container %q into store: %w", id, err)
 	}
 
+	if c.nri.isEnabled() {
+		err = c.nri.postCreateContainer(ctx, &sandbox, &container)
+		if err != nil {
+			log.G(ctx).WithError(err).Errorf("NRI post-create notification failed")
+		}
+	}
+
 	containerCreateTimer.WithValues(ociRuntime.Type).UpdateSince(start)
 
 	return &runtime.CreateContainerResponse{ContainerId: id}, nil
diff --git a/pkg/cri/server/container_remove.go b/pkg/cri/server/container_remove.go
index 57a80dde1..0a5b03d5f 100644
--- a/pkg/cri/server/container_remove.go
+++
b/pkg/cri/server/container_remove.go
@@ -73,6 +73,18 @@ func (c *criService) RemoveContainer(ctx context.Context, r *runtime.RemoveConta
 		}
 	}()
 
+	if c.nri.isEnabled() {
+		sandbox, err := c.sandboxStore.Get(container.SandboxID)
+		if err != nil {
+			err = c.nri.removeContainer(ctx, nil, &container)
+		} else {
+			err = c.nri.removeContainer(ctx, &sandbox, &container)
+		}
+		if err != nil {
+			log.G(ctx).WithError(err).Error("NRI failed to remove container")
+		}
+	}
+
 	// NOTE(random-liu): Docker set container to "Dead" state when start removing the
 	// container so as to avoid start/restart the container again. However, for current
 	// kubelet implementation, we'll never start a container once we decide to remove it,
diff --git a/pkg/cri/server/container_start.go b/pkg/cri/server/container_start.go
index 5006ae960..df432048a 100644
--- a/pkg/cri/server/container_start.go
+++ b/pkg/cri/server/container_start.go
@@ -160,6 +160,24 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
 		}
 	}
 
+	defer func() {
+		if retErr != nil && c.nri.isEnabled() { // guard like every other NRI call site: stopContainer dereferences the NRI API unconditionally
+			deferCtx, deferCancel := ctrdutil.DeferContext()
+			defer deferCancel()
+			err = c.nri.stopContainer(deferCtx, &sandbox, &cntr)
+			if err != nil {
+				log.G(ctx).WithError(err).Errorf("NRI stop failed for failed container %q", id)
+			}
+		}
+	}()
+	if c.nri.isEnabled() {
+		err = c.nri.startContainer(ctx, &sandbox, &cntr)
+		if err != nil {
+			log.G(ctx).WithError(err).Errorf("NRI container start failed")
+			return nil, fmt.Errorf("NRI container start failed: %w", err)
+		}
+	}
+
 	// Start containerd task.
 	if err := task.Start(ctx); err != nil {
 		return nil, fmt.Errorf("failed to start containerd task %q: %w", id, err)
@@ -177,6 +195,13 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
 	// It handles the TaskExit event and update container state after this.
c.eventMonitor.startContainerExitMonitor(context.Background(), id, task.Pid(), exitCh) + if c.nri.isEnabled() { + err = c.nri.postStartContainer(ctx, &sandbox, &cntr) + if err != nil { + log.G(ctx).WithError(err).Errorf("NRI post-start notification failed") + } + } + containerStartTimer.WithValues(info.Runtime.Name).UpdateSince(start) return &runtime.StartContainerResponse{}, nil diff --git a/pkg/cri/server/container_stop.go b/pkg/cri/server/container_stop.go index 46c54d75a..8a1a048d8 100644 --- a/pkg/cri/server/container_stop.go +++ b/pkg/cri/server/container_stop.go @@ -47,6 +47,18 @@ func (c *criService) StopContainer(ctx context.Context, r *runtime.StopContainer return nil, err } + if c.nri.isEnabled() { + sandbox, err := c.sandboxStore.Get(container.SandboxID) + if err != nil { + err = c.nri.stopContainer(ctx, nil, &container) + } else { + err = c.nri.stopContainer(ctx, &sandbox, &container) + } + if err != nil { + log.G(ctx).WithError(err).Error("NRI failed to stop container") + } + } + i, err := container.Container.Info(ctx) if err != nil { return nil, fmt.Errorf("get container info: %w", err) @@ -78,7 +90,7 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore } // Don't return for unknown state, some cleanup needs to be done. 
if state == runtime.ContainerState_CONTAINER_UNKNOWN { - return cleanupUnknownContainer(ctx, id, container) + return c.cleanupUnknownContainer(ctx, id, container) } return nil } @@ -93,7 +105,7 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore if !errdefs.IsNotFound(err) { return fmt.Errorf("failed to wait for task for %q: %w", id, err) } - return cleanupUnknownContainer(ctx, id, container) + return c.cleanupUnknownContainer(ctx, id, container) } exitCtx, exitCancel := context.WithCancel(context.Background()) @@ -196,7 +208,7 @@ func (c *criService) waitContainerStop(ctx context.Context, container containers } // cleanupUnknownContainer cleanup stopped container in unknown state. -func cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore.Container) error { +func (c *criService) cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore.Container) error { // Reuse handleContainerExit to do the cleanup. return handleContainerExit(ctx, &eventtypes.TaskExit{ ContainerID: id, @@ -204,5 +216,5 @@ func cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore Pid: 0, ExitStatus: unknownExitCode, ExitedAt: protobuf.ToTimestamp(time.Now()), - }, cntr) + }, cntr, c) } diff --git a/pkg/cri/server/container_update_resources.go b/pkg/cri/server/container_update_resources.go index b3c9f6afe..3ca2bbee5 100644 --- a/pkg/cri/server/container_update_resources.go +++ b/pkg/cri/server/container_update_resources.go @@ -33,6 +33,7 @@ import ( runtime "k8s.io/cri-api/pkg/apis/runtime/v1" containerstore "github.com/containerd/containerd/pkg/cri/store/container" + sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox" ctrdutil "github.com/containerd/containerd/pkg/cri/util" ) @@ -42,6 +43,24 @@ func (c *criService) UpdateContainerResources(ctx context.Context, r *runtime.Up if err != nil { return nil, fmt.Errorf("failed to find container: %w", err) } + + var sandbox 
sandboxstore.Sandbox
+	if c.nri.isEnabled() {
+		sandbox, err = c.sandboxStore.Get(container.SandboxID)
+		if err != nil {
+			return nil, err
+		}
+
+		resources := r.GetLinux()
+		updated, err := c.nri.updateContainer(ctx, &sandbox, &container, resources)
+		if err != nil {
+			return nil, fmt.Errorf("NRI container update failed: %w", err)
+		}
+		if updated != nil {
+			r.Linux = updated // swap the pointer: safe when the request had no Linux resources, and avoids copying a protobuf message by value
+		}
+	}
+
 	// Update resources in status update transaction, so that:
 	// 1) There won't be race condition with container start.
 	// 2) There won't be concurrent resource update to the same container.
@@ -50,6 +69,14 @@ func (c *criService) UpdateContainerResources(ctx context.Context, r *runtime.Up
 	}); err != nil {
 		return nil, fmt.Errorf("failed to update resources: %w", err)
 	}
+
+	if c.nri.isEnabled() {
+		err = c.nri.postUpdateContainer(ctx, &sandbox, &container)
+		if err != nil {
+			log.G(ctx).WithError(err).Errorf("NRI post-update notification failed")
+		}
+	}
+
 	return &runtime.UpdateContainerResourcesResponse{}, nil
 }
 
diff --git a/pkg/cri/server/events.go b/pkg/cri/server/events.go
index 85e985d81..2199a97ac 100644
--- a/pkg/cri/server/events.go
+++ b/pkg/cri/server/events.go
@@ -136,7 +136,7 @@ func (em *eventMonitor) startSandboxExitMonitor(ctx context.Context, id string,
 
 				sb, err := em.c.sandboxStore.Get(e.ID)
 				if err == nil {
-					if err := handleSandboxExit(dctx, e, sb); err != nil {
+					if err := handleSandboxExit(dctx, e, sb, em.c); err != nil {
 						return err
 					}
 					return nil
@@ -187,7 +187,7 @@ func (em *eventMonitor) startContainerExitMonitor(ctx context.Context, id string
 
 				cntr, err := em.c.containerStore.Get(e.ID)
 				if err == nil {
-					if err := handleContainerExit(dctx, e, cntr); err != nil {
+					if err := handleContainerExit(dctx, e, cntr, em.c); err != nil {
 						return err
 					}
 					return nil
@@ -313,7 +313,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
 		// Use ID instead of ContainerID to rule out TaskExit event for exec.
cntr, err := em.c.containerStore.Get(e.ID) if err == nil { - if err := handleContainerExit(ctx, e, cntr); err != nil { + if err := handleContainerExit(ctx, e, cntr, em.c); err != nil { return fmt.Errorf("failed to handle container TaskExit event: %w", err) } return nil @@ -322,7 +322,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error { } sb, err := em.c.sandboxStore.Get(e.ID) if err == nil { - if err := handleSandboxExit(ctx, e, sb); err != nil { + if err := handleSandboxExit(ctx, e, sb, em.c); err != nil { return fmt.Errorf("failed to handle sandbox TaskExit event: %w", err) } return nil @@ -362,7 +362,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error { } // handleContainerExit handles TaskExit event for container. -func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container) error { +func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container, c *criService) error { // Attach container IO so that `Delete` could cleanup the stream properly. task, err := cntr.Container.Task(ctx, func(*containerdio.FIFOSet) (containerdio.IO, error) { @@ -384,7 +384,7 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta } } else { // TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker - if _, err = task.Delete(ctx, WithNRISandboxDelete(cntr.SandboxID), containerd.WithProcessKill); err != nil { + if _, err = task.Delete(ctx, WithNRISandboxDelete(cntr.SandboxID), c.nri.WithContainerExit(&cntr), containerd.WithProcessKill); err != nil { if !errdefs.IsNotFound(err) { return fmt.Errorf("failed to stop container: %w", err) } @@ -415,7 +415,7 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta } // handleSandboxExit handles TaskExit event for sandbox. 
-func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxstore.Sandbox) error { +func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxstore.Sandbox, c *criService) error { // No stream attached to sandbox container. task, err := sb.Container.Task(ctx, nil) if err != nil { diff --git a/pkg/cri/server/nri-api.go b/pkg/cri/server/nri-api.go new file mode 100644 index 000000000..3f256ee21 --- /dev/null +++ b/pkg/cri/server/nri-api.go @@ -0,0 +1,782 @@ +//go:build linux +// +build linux + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package server + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/containerd/containerd" + "github.com/containerd/containerd/containers" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/pkg/cri/annotations" + cstore "github.com/containerd/containerd/pkg/cri/store/container" + sstore "github.com/containerd/containerd/pkg/cri/store/sandbox" + ctrdutil "github.com/containerd/containerd/pkg/cri/util" + "github.com/containerd/typeurl" + "github.com/opencontainers/runtime-spec/specs-go" + runtimespec "github.com/opencontainers/runtime-spec/specs-go" + "github.com/opencontainers/runtime-tools/generate" + cri "k8s.io/cri-api/pkg/apis/runtime/v1" + + "github.com/containerd/containerd/pkg/cri/constants" + "github.com/containerd/containerd/pkg/nri" + + "github.com/containerd/nri/pkg/api" + + nrigen "github.com/containerd/nri/pkg/runtime-tools/generate" +) + +type nriAPI struct { + cri *criService + nri nri.API +} + +func (a *nriAPI) register() { + if !a.isEnabled() { + return + } + + nri.RegisterDomain(a) +} + +func (a *nriAPI) isEnabled() bool { + return a != nil && a.nri != nil && a.nri.IsEnabled() +} + +// +// CRI-NRI lifecycle hook interface +// +// These functions are used to hook NRI into the processing of +// the corresponding CRI lifecycle events using the common NRI +// interface. 
+// + +func (a *nriAPI) runPodSandbox(ctx context.Context, criPod *sstore.Sandbox) error { + pod := a.nriPodSandbox(criPod) + err := a.nri.RunPodSandbox(ctx, pod) + + if err != nil { + a.nri.StopPodSandbox(ctx, pod) + a.nri.RemovePodSandbox(ctx, pod) + } + + return err +} + +func (a *nriAPI) stopPodSandbox(ctx context.Context, criPod *sstore.Sandbox) error { + pod := a.nriPodSandbox(criPod) + err := a.nri.StopPodSandbox(ctx, pod) + + return err +} + +func (a *nriAPI) removePodSandbox(ctx context.Context, criPod *sstore.Sandbox) error { + pod := a.nriPodSandbox(criPod) + + err := a.nri.RemovePodSandbox(ctx, pod) + + return err +} + +func (a *nriAPI) createContainer(ctx context.Context, ctrs *containers.Container, spec *specs.Spec) (*api.ContainerAdjustment, error) { + ctr := a.nriContainer(ctrs, spec) + + criPod, err := a.cri.sandboxStore.Get(ctr.GetPodSandboxID()) + if err != nil { + return nil, err + } + + pod := a.nriPodSandbox(&criPod) + + adjust, err := a.nri.CreateContainer(ctx, pod, ctr) + + return adjust, err +} + +func (a *nriAPI) postCreateContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error { + pod := a.nriPodSandbox(criPod) + ctr := a.nriContainer(criCtr, nil) + + err := a.nri.PostCreateContainer(ctx, pod, ctr) + + return err +} + +func (a *nriAPI) startContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error { + pod := a.nriPodSandbox(criPod) + ctr := a.nriContainer(criCtr, nil) + + err := a.nri.StartContainer(ctx, pod, ctr) + + return err +} + +func (a *nriAPI) postStartContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error { + pod := a.nriPodSandbox(criPod) + ctr := a.nriContainer(criCtr, nil) + + err := a.nri.PostStartContainer(ctx, pod, ctr) + + return err +} + +func (a *nriAPI) updateContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container, req *cri.LinuxContainerResources) (*cri.LinuxContainerResources, error) { + const noOomAdj = 
0 + + pod := a.nriPodSandbox(criPod) + ctr := a.nriContainer(criCtr, nil) + + r, err := a.nri.UpdateContainer(ctx, pod, ctr, api.FromCRILinuxResources(req)) + if err != nil { + return nil, err + } + + return r.ToCRI(noOomAdj), nil +} + +func (a *nriAPI) postUpdateContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error { + pod := a.nriPodSandbox(criPod) + ctr := a.nriContainer(criCtr, nil) + + err := a.nri.PostUpdateContainer(ctx, pod, ctr) + + return err +} + +func (a *nriAPI) stopContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error { + ctr := a.nriContainer(criCtr, nil) + + if criPod == nil || criPod.ID == "" { + criPod = &sstore.Sandbox{ + Metadata: sstore.Metadata{ + ID: ctr.GetPodSandboxID(), + }, + } + } + pod := a.nriPodSandbox(criPod) + + err := a.nri.StopContainer(ctx, pod, ctr) + + return err +} + +func (a *nriAPI) notifyContainerExit(ctx context.Context, criCtr *cstore.Container) { + ctr := a.nriContainer(criCtr, nil) + + criPod, _ := a.cri.sandboxStore.Get(ctr.GetPodSandboxID()) + if criPod.ID == "" { + criPod = sstore.Sandbox{ + Metadata: sstore.Metadata{ + ID: ctr.GetPodSandboxID(), + }, + } + } + pod := a.nriPodSandbox(&criPod) + + a.nri.NotifyContainerExit(ctx, pod, ctr) +} + +func (a *nriAPI) removeContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error { + pod := a.nriPodSandbox(criPod) + ctr := a.nriContainer(criCtr, nil) + + err := a.nri.RemoveContainer(ctx, pod, ctr) + + return err +} + +func (a *nriAPI) undoCreateContainer(ctx context.Context, criPod *sstore.Sandbox, id string, spec *specs.Spec) { + pod := a.nriPodSandbox(criPod) + ctr := a.nriContainer(&containers.Container{ID: id}, spec) + + err := a.nri.StopContainer(ctx, pod, ctr) + if err != nil { + log.G(ctx).WithError(err).Error("container creation undo (stop) failed") + } + + err = a.nri.RemoveContainer(ctx, pod, ctr) + if err != nil { + log.G(ctx).WithError(err).Error("container creation 
undo (remove) failed") + } +} + +func (a *nriAPI) WithContainerAdjustment() containerd.NewContainerOpts { + resourceCheckOpt := nrigen.WithResourceChecker( + func(r *runtimespec.LinuxResources) error { + if r != nil { + if a.cri.config.DisableHugetlbController { + r.HugepageLimits = nil + } + } + return nil + }, + ) + + rdtResolveOpt := nrigen.WithRdtResolver( + func(className string) (*runtimespec.LinuxIntelRdt, error) { + if className == "" { + return nil, nil + } + return &runtimespec.LinuxIntelRdt{ + ClosID: className, + }, nil + }, + ) + + blkioResolveOpt := nrigen.WithBlockIOResolver( + func(className string) (*runtimespec.LinuxBlockIO, error) { + if className == "" { + return nil, nil + } + blockIO, err := blockIOToLinuxOci(className) + if err != nil { + return nil, err + } + return blockIO, nil + }, + ) + + return func(ctx context.Context, _ *containerd.Client, c *containers.Container) error { + spec := &specs.Spec{} + if err := json.Unmarshal(c.Spec.GetValue(), spec); err != nil { + return fmt.Errorf("failed to unmarshal container OCI Spec for NRI: %w", err) + } + + adjust, err := a.createContainer(ctx, c, spec) + if err != nil { + return fmt.Errorf("failed to get NRI adjustment for container: %w", err) + } + + sgen := generate.Generator{Config: spec} + ngen := nrigen.SpecGenerator(&sgen, resourceCheckOpt, rdtResolveOpt, blkioResolveOpt) + + err = ngen.Adjust(adjust) + if err != nil { + return fmt.Errorf("failed to NRI-adjust container Spec: %w", err) + } + + adjusted, err := typeurl.MarshalAny(spec) + if err != nil { + return fmt.Errorf("failed to marshal NRI-adjusted Spec: %w", err) + } + + c.Spec = adjusted + return nil + } +} + +func (a *nriAPI) WithContainerExit(criCtr *cstore.Container) containerd.ProcessDeleteOpts { + if !a.isEnabled() { + return func(_ context.Context, _ containerd.Process) error { + return nil + } + } + + return func(_ context.Context, _ containerd.Process) error { + a.notifyContainerExit(context.Background(), criCtr) + return nil 
+ } +} + +// +// NRI-CRI 'domain' interface +// +// These functions are used to interface CRI pods and containers +// from the common NRI interface. They implement pod and container +// discovery, lookup and updating of container parameters. +// + +const ( + nriDomain = constants.K8sContainerdNamespace +) + +func (a *nriAPI) GetName() string { + return nriDomain +} + +func (a *nriAPI) ListPodSandboxes() []nri.PodSandbox { + pods := []nri.PodSandbox{} + for _, pod := range a.cri.sandboxStore.List() { + if pod.Status.Get().State != sstore.StateUnknown { + pod := pod + pods = append(pods, a.nriPodSandbox(&pod)) + } + } + return pods +} + +func (a *nriAPI) ListContainers() []nri.Container { + containers := []nri.Container{} + for _, ctr := range a.cri.containerStore.List() { + switch ctr.Status.Get().State() { + case cri.ContainerState_CONTAINER_EXITED: + continue + case cri.ContainerState_CONTAINER_UNKNOWN: + continue + } + ctr := ctr + containers = append(containers, a.nriContainer(&ctr, nil)) + } + return containers +} + +func (a *nriAPI) GetPodSandbox(id string) (nri.PodSandbox, bool) { + pod, err := a.cri.sandboxStore.Get(id) + if err != nil { + return nil, false + } + + return a.nriPodSandbox(&pod), true +} + +func (a *nriAPI) GetContainer(id string) (nri.Container, bool) { + ctr, err := a.cri.containerStore.Get(id) + if err != nil { + return nil, false + } + + return a.nriContainer(&ctr, nil), true +} + +func (a *nriAPI) UpdateContainer(ctx context.Context, u *api.ContainerUpdate) error { + ctr, err := a.cri.containerStore.Get(u.ContainerId) + if err != nil { + return nil + } + + err = ctr.Status.UpdateSync( + func(status cstore.Status) (cstore.Status, error) { + criReq := &cri.UpdateContainerResourcesRequest{ + ContainerId: u.ContainerId, + Linux: u.GetLinux().GetResources().ToCRI(0), + } + newStatus, err := a.cri.updateContainerResources(ctx, ctr, criReq, status) + return newStatus, err + }, + ) + if err != nil { + if !u.IgnoreFailure { + return err + } + } + 
+ return nil +} + +func (a *nriAPI) EvictContainer(ctx context.Context, e *api.ContainerEviction) error { + ctr, err := a.cri.containerStore.Get(e.ContainerId) + if err != nil { + return nil + } + err = a.cri.stopContainer(ctx, ctr, 0) + if err != nil { + return err + } + + return nil +} + +// +// NRI integration wrapper for CRI Pods +// + +type criPodSandbox struct { + *sstore.Sandbox + spec *specs.Spec + pid uint32 +} + +func (a *nriAPI) nriPodSandbox(pod *sstore.Sandbox) *criPodSandbox { + criPod := &criPodSandbox{ + Sandbox: pod, + spec: &specs.Spec{}, + } + + if pod == nil || pod.Container == nil { + return criPod + } + + ctx := ctrdutil.NamespacedContext() + task, err := pod.Container.Task(ctx, nil) + if err != nil { + if !errdefs.IsNotFound(err) { + log.L.WithError(err).Errorf("failed to get task for sandbox container %s", + pod.Container.ID()) + } + return criPod + } + + criPod.pid = task.Pid() + spec, err := task.Spec(ctx) + if err != nil { + if err != nil { + log.L.WithError(err).Errorf("failed to get spec for sandbox container %s", + pod.Container.ID()) + } + return criPod + } + + criPod.spec = spec + + return criPod +} + +func (p *criPodSandbox) GetDomain() string { + return nriDomain +} + +func (p *criPodSandbox) GetID() string { + if p.Sandbox == nil { + return "" + } + return p.ID +} + +func (p *criPodSandbox) GetName() string { + if p.Sandbox == nil { + return "" + } + return p.Config.GetMetadata().GetName() +} + +func (p *criPodSandbox) GetUID() string { + if p.Sandbox == nil { + return "" + } + return p.Config.GetMetadata().GetUid() +} + +func (p *criPodSandbox) GetNamespace() string { + if p.Sandbox == nil { + return "" + } + return p.Config.GetMetadata().GetNamespace() +} + +func (p *criPodSandbox) GetAnnotations() map[string]string { + if p.Sandbox == nil { + return nil + } + + annotations := map[string]string{} + + for key, value := range p.Config.GetAnnotations() { + annotations[key] = value + } + for key, value := range p.spec.Annotations { 
+ annotations[key] = value + } + + return annotations +} + +func (p *criPodSandbox) GetLabels() map[string]string { + if p.Sandbox == nil { + return nil + } + + labels := map[string]string{} + + for key, value := range p.Config.GetLabels() { + labels[key] = value + } + + if p.Sandbox.Container == nil { + return labels + } + + ctx := ctrdutil.NamespacedContext() + ctrd := p.Sandbox.Container + ctrs, err := ctrd.Info(ctx, containerd.WithoutRefreshedMetadata) + if err != nil { + log.L.WithError(err).Errorf("failed to get info for sandbox container %s", ctrd.ID()) + return labels + } + + for key, value := range ctrs.Labels { + labels[key] = value + } + + return labels +} + +func (p *criPodSandbox) GetRuntimeHandler() string { + if p.Sandbox == nil { + return "" + } + return p.RuntimeHandler +} + +func (p *criPodSandbox) GetLinuxPodSandbox() nri.LinuxPodSandbox { + return p +} + +func (p *criPodSandbox) GetLinuxNamespaces() []*api.LinuxNamespace { + if p.spec.Linux != nil { + return api.FromOCILinuxNamespaces(p.spec.Linux.Namespaces) + } + return nil +} + +func (p *criPodSandbox) GetPodLinuxOverhead() *api.LinuxResources { + if p.Sandbox == nil { + return nil + } + return api.FromCRILinuxResources(p.Config.GetLinux().GetOverhead()) +} +func (p *criPodSandbox) GetPodLinuxResources() *api.LinuxResources { + if p.Sandbox == nil { + return nil + } + return api.FromCRILinuxResources(p.Config.GetLinux().GetResources()) +} + +func (p *criPodSandbox) GetLinuxResources() *api.LinuxResources { + if p.spec.Linux == nil { + return nil + } + return api.FromOCILinuxResources(p.spec.Linux.Resources, nil) +} + +func (p *criPodSandbox) GetCgroupParent() string { + if p.Sandbox == nil { + return "" + } + return p.Config.GetLinux().GetCgroupParent() +} + +func (p *criPodSandbox) GetCgroupsPath() string { + if p.spec.Linux == nil { + return "" + } + return p.spec.Linux.CgroupsPath +} + +func (p *criPodSandbox) GetPid() uint32 { + return p.pid +} + +// +// NRI integration wrapper for CRI 
Containers +// + +type criContainer struct { + api *nriAPI + ctrs *containers.Container + spec *specs.Spec + meta *cstore.Metadata + pid uint32 +} + +func (a *nriAPI) nriContainer(ctr interface{}, spec *specs.Spec) *criContainer { + switch c := ctr.(type) { + case *cstore.Container: + ctx := ctrdutil.NamespacedContext() + pid := uint32(0) + ctrd := c.Container + ctrs, err := ctrd.Info(ctx, containerd.WithoutRefreshedMetadata) + if err != nil { + log.L.WithError(err).Errorf("failed to get info for container %s", ctrd.ID()) + } + spec, err := ctrd.Spec(ctx) + if err != nil { + log.L.WithError(err).Errorf("failed to get OCI Spec for container %s", ctrd.ID()) + spec = &specs.Spec{} + } + task, err := ctrd.Task(ctx, nil) + if err != nil { + if !errdefs.IsNotFound(err) { + log.L.WithError(err).Errorf("failed to get task for container %s", ctrd.ID()) + } + } else { + pid = task.Pid() + } + + return &criContainer{ + api: a, + ctrs: &ctrs, + meta: &c.Metadata, + spec: spec, + pid: pid, + } + + case *containers.Container: + ctrs := c + meta := &cstore.Metadata{} + if ext := ctrs.Extensions[containerMetadataExtension]; ext != nil { + err := typeurl.UnmarshalTo(ext, meta) + if err != nil { + log.L.WithError(err).Errorf("failed to get metadata for container %s", ctrs.ID) + } + } + + return &criContainer{ + api: a, + ctrs: ctrs, + meta: meta, + spec: spec, + } + } + + log.L.Errorf("can't wrap %T as NRI container", ctr) + return &criContainer{ + api: a, + meta: &cstore.Metadata{}, + spec: &specs.Spec{}, + } +} + +func (c *criContainer) GetDomain() string { + return nriDomain +} + +func (c *criContainer) GetID() string { + if c.ctrs != nil { + return c.ctrs.ID + } + return "" +} + +func (c *criContainer) GetPodSandboxID() string { + return c.spec.Annotations[annotations.SandboxID] +} + +func (c *criContainer) GetName() string { + return c.spec.Annotations[annotations.ContainerName] +} + +func (c *criContainer) GetState() api.ContainerState { + criCtr, err := 
c.api.cri.containerStore.Get(c.GetID()) + if err != nil { + return api.ContainerState_CONTAINER_UNKNOWN + } + switch criCtr.Status.Get().State() { + case cri.ContainerState_CONTAINER_CREATED: + return api.ContainerState_CONTAINER_CREATED + case cri.ContainerState_CONTAINER_RUNNING: + return api.ContainerState_CONTAINER_RUNNING + case cri.ContainerState_CONTAINER_EXITED: + return api.ContainerState_CONTAINER_STOPPED + } + + return api.ContainerState_CONTAINER_UNKNOWN +} + +func (c *criContainer) GetLabels() map[string]string { + if c.ctrs == nil { + return nil + } + + labels := map[string]string{} + for key, value := range c.ctrs.Labels { + labels[key] = value + } + + if c.meta != nil && c.meta.Config != nil { + for key, value := range c.meta.Config.Labels { + labels[key] = value + } + } + + return labels +} + +func (c *criContainer) GetAnnotations() map[string]string { + annotations := map[string]string{} + + for key, value := range c.spec.Annotations { + annotations[key] = value + } + if c.meta != nil && c.meta.Config != nil { + for key, value := range c.meta.Config.Annotations { + annotations[key] = value + } + } + + return annotations +} + +func (c *criContainer) GetArgs() []string { + if c.spec.Process == nil { + return nil + } + return api.DupStringSlice(c.spec.Process.Args) +} + +func (c *criContainer) GetEnv() []string { + if c.spec.Process == nil { + return nil + } + return api.DupStringSlice(c.spec.Process.Env) +} + +func (c *criContainer) GetMounts() []*api.Mount { + return api.FromOCIMounts(c.spec.Mounts) +} + +func (c *criContainer) GetHooks() *api.Hooks { + return api.FromOCIHooks(c.spec.Hooks) +} + +func (c *criContainer) GetLinuxContainer() nri.LinuxContainer { + return c +} + +func (c *criContainer) GetLinuxNamespaces() []*api.LinuxNamespace { + if c.spec.Linux == nil { + return nil + } + return api.FromOCILinuxNamespaces(c.spec.Linux.Namespaces) +} + +func (c *criContainer) GetLinuxDevices() []*api.LinuxDevice { + if c.spec.Linux == nil { + return 
nil + } + return api.FromOCILinuxDevices(c.spec.Linux.Devices) +} + +func (c *criContainer) GetLinuxResources() *api.LinuxResources { + if c.spec.Linux == nil { + return nil + } + return api.FromOCILinuxResources(c.spec.Linux.Resources, c.spec.Annotations) +} + +func (c *criContainer) GetOOMScoreAdj() *int { + if c.spec.Process == nil { + return nil + } + return c.spec.Process.OOMScoreAdj +} + +func (c *criContainer) GetCgroupsPath() string { + if c.spec.Linux == nil { + return "" + } + return c.spec.Linux.CgroupsPath +} + +func (c *criContainer) GetPid() uint32 { + return c.pid +} diff --git a/pkg/cri/server/nri-api_other.go b/pkg/cri/server/nri-api_other.go new file mode 100644 index 000000000..382299e18 --- /dev/null +++ b/pkg/cri/server/nri-api_other.go @@ -0,0 +1,134 @@ +//go:build !linux +// +build !linux + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package server + +import ( + "context" + + "github.com/containerd/containerd" + "github.com/containerd/containerd/containers" + cstore "github.com/containerd/containerd/pkg/cri/store/container" + sstore "github.com/containerd/containerd/pkg/cri/store/sandbox" + "github.com/opencontainers/runtime-spec/specs-go" + + "github.com/containerd/containerd/pkg/cri/constants" + "github.com/containerd/containerd/pkg/nri" + + "github.com/containerd/nri/pkg/api" +) + +type nriAPI struct { + cri *criService + nri nri.API +} + +func (a *nriAPI) register() { +} + +func (a *nriAPI) isEnabled() bool { + return false +} + +// +// CRI-NRI lifecycle hook no-op interface +// + +func (*nriAPI) runPodSandbox(context.Context, *sstore.Sandbox) error { + return nil +} + +func (*nriAPI) stopPodSandbox(context.Context, *sstore.Sandbox) error { + return nil +} + +func (*nriAPI) removePodSandbox(context.Context, *sstore.Sandbox) error { + return nil +} + +func (*nriAPI) postCreateContainer(context.Context, *sstore.Sandbox, *cstore.Container) error { + return nil +} + +func (*nriAPI) startContainer(context.Context, *sstore.Sandbox, *cstore.Container) error { + return nil +} + +func (*nriAPI) postStartContainer(context.Context, *sstore.Sandbox, *cstore.Container) error { + return nil +} + +func (*nriAPI) stopContainer(context.Context, *sstore.Sandbox, *cstore.Container) error { + return nil +} + +func (*nriAPI) removeContainer(context.Context, *sstore.Sandbox, *cstore.Container) error { + return nil +} + +func (*nriAPI) undoCreateContainer(context.Context, *sstore.Sandbox, string, *specs.Spec) { +} + +func (*nriAPI) WithContainerAdjustment() containerd.NewContainerOpts { + return func(ctx context.Context, _ *containerd.Client, c *containers.Container) error { + return nil + } +} + +func (*nriAPI) WithContainerExit(*cstore.Container) containerd.ProcessDeleteOpts { + return func(_ context.Context, _ containerd.Process) error { + return nil + } +} + +// +// NRI-CRI no-op 'domain' interface +// 
+ +const ( + nriDomain = constants.K8sContainerdNamespace +) + +func (*nriAPI) GetName() string { + return nriDomain +} + +func (*nriAPI) ListPodSandboxes() []nri.PodSandbox { + return nil +} + +func (*nriAPI) ListContainers() []nri.Container { + return nil +} + +func (*nriAPI) GetPodSandbox(string) (nri.PodSandbox, bool) { + return nil, false +} + +func (*nriAPI) GetContainer(string) (nri.Container, bool) { + return nil, false +} + +func (*nriAPI) UpdateContainer(context.Context, *api.ContainerUpdate) error { + return nil +} + +func (*nriAPI) EvictContainer(context.Context, *api.ContainerEviction) error { + return nil +} diff --git a/pkg/cri/server/nri-api_windows.go b/pkg/cri/server/nri-api_windows.go new file mode 100644 index 000000000..bc83cd76b --- /dev/null +++ b/pkg/cri/server/nri-api_windows.go @@ -0,0 +1,37 @@ +//go:build windows +// +build windows + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package server + +import ( + "context" + + cstore "github.com/containerd/containerd/pkg/cri/store/container" + sstore "github.com/containerd/containerd/pkg/cri/store/sandbox" + + cri "k8s.io/cri-api/pkg/apis/runtime/v1" +) + +func (*nriAPI) updateContainer(context.Context, *sstore.Sandbox, *cstore.Container, *cri.LinuxContainerResources) (*cri.LinuxContainerResources, error) { + return nil, nil +} + +func (*nriAPI) postUpdateContainer(context.Context, *sstore.Sandbox, *cstore.Container) error { + return nil +} diff --git a/pkg/cri/server/sandbox_remove.go b/pkg/cri/server/sandbox_remove.go index c479cf56f..0461e9829 100644 --- a/pkg/cri/server/sandbox_remove.go +++ b/pkg/cri/server/sandbox_remove.go @@ -101,6 +101,13 @@ func (c *criService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodS log.G(ctx).Tracef("Remove called for sandbox container %q that does not exist", id) } + if c.nri.isEnabled() { + err = c.nri.removePodSandbox(ctx, &sandbox) + if err != nil { + log.G(ctx).WithError(err).Errorf("NRI pod removal notification failed") + } + } + // Remove sandbox from sandbox store. Note that once the sandbox is successfully // deleted: // 1) ListPodSandbox will not include this sandbox. 
diff --git a/pkg/cri/server/sandbox_run.go b/pkg/cri/server/sandbox_run.go index 9543c034e..e085352de 100644 --- a/pkg/cri/server/sandbox_run.go +++ b/pkg/cri/server/sandbox_run.go @@ -373,6 +373,21 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox } } + if c.nri.isEnabled() { + err = c.nri.runPodSandbox(ctx, &sandbox) + if err != nil { + return nil, fmt.Errorf("NRI RunPodSandbox failed: %w", err) + } + + defer func() { + if retErr != nil { + deferCtx, deferCancel := ctrdutil.DeferContext() + defer deferCancel() + c.nri.removePodSandbox(deferCtx, &sandbox) + } + }() + } + if err := task.Start(ctx); err != nil { return nil, fmt.Errorf("failed to start sandbox container task %q: %w", id, err) } diff --git a/pkg/cri/server/sandbox_stop.go b/pkg/cri/server/sandbox_stop.go index c9d3c0ca9..84bf7a3c3 100644 --- a/pkg/cri/server/sandbox_stop.go +++ b/pkg/cri/server/sandbox_stop.go @@ -82,6 +82,13 @@ func (c *criService) stopPodSandbox(ctx context.Context, sandbox sandboxstore.Sa } sandboxRuntimeStopTimer.WithValues(sandbox.RuntimeHandler).UpdateSince(stop) + if c.nri.isEnabled() { + err := c.nri.stopPodSandbox(ctx, &sandbox) + if err != nil { + log.G(ctx).WithError(err).Errorf("NRI sandbox stop notification failed") + } + } + // Teardown network for sandbox. if sandbox.NetNS != nil { netStop := time.Now() @@ -120,7 +127,7 @@ func (c *criService) stopSandboxContainer(ctx context.Context, sandbox sandboxst } // Don't return for unknown state, some cleanup needs to be done. 
if state == sandboxstore.StateUnknown { - return cleanupUnknownSandbox(ctx, id, sandbox) + return c.cleanupUnknownSandbox(ctx, id, sandbox) } return nil } @@ -136,7 +143,7 @@ func (c *criService) stopSandboxContainer(ctx context.Context, sandbox sandboxst if !errdefs.IsNotFound(err) { return fmt.Errorf("failed to wait for task: %w", err) } - return cleanupUnknownSandbox(ctx, id, sandbox) + return c.cleanupUnknownSandbox(ctx, id, sandbox) } exitCtx, exitCancel := context.WithCancel(context.Background()) @@ -190,7 +197,7 @@ func (c *criService) teardownPodNetwork(ctx context.Context, sandbox sandboxstor } // cleanupUnknownSandbox cleanup stopped sandbox in unknown state. -func cleanupUnknownSandbox(ctx context.Context, id string, sandbox sandboxstore.Sandbox) error { +func (c *criService) cleanupUnknownSandbox(ctx context.Context, id string, sandbox sandboxstore.Sandbox) error { // Reuse handleSandboxExit to do the cleanup. return handleSandboxExit(ctx, &eventtypes.TaskExit{ ContainerID: id, @@ -198,5 +205,5 @@ func cleanupUnknownSandbox(ctx context.Context, id string, sandbox sandboxstore. Pid: 0, ExitStatus: unknownExitCode, ExitedAt: protobuf.ToTimestamp(time.Now()), - }, sandbox) + }, sandbox, c) } diff --git a/pkg/cri/server/service.go b/pkg/cri/server/service.go index 0919b9d96..c4da206c8 100644 --- a/pkg/cri/server/service.go +++ b/pkg/cri/server/service.go @@ -30,6 +30,7 @@ import ( "github.com/containerd/containerd/oci" "github.com/containerd/containerd/pkg/cri/streaming" "github.com/containerd/containerd/pkg/kmutex" + "github.com/containerd/containerd/pkg/nri" "github.com/containerd/containerd/plugin" runtime_alpha "github.com/containerd/containerd/third_party/k8s.io/cri-api/pkg/apis/runtime/v1alpha2" cni "github.com/containerd/go-cni" @@ -67,6 +68,7 @@ type grpcAlphaServices interface { // CRIService is the interface implement CRI remote service server. 
type CRIService interface { Run() error + // io.Closer is used by containerd to gracefully stop cri service. io.Closer Register(*grpc.Server) error @@ -118,10 +120,12 @@ type criService struct { // one in-flight fetch request or unpack handler for a given descriptor's // or chain ID. unpackDuplicationSuppressor kmutex.KeyedLocker + + nri *nriAPI } // NewCRIService returns a new instance of CRIService -func NewCRIService(config criconfig.Config, client *containerd.Client) (CRIService, error) { +func NewCRIService(config criconfig.Config, client *containerd.Client, nrip nri.API) (CRIService, error) { var err error labels := label.NewStore() c := &criService{ @@ -181,6 +185,13 @@ func NewCRIService(config criconfig.Config, client *containerd.Client) (CRIServi return nil, err } + if nrip != nil { + c.nri = &nriAPI{ + cri: c, + nri: nrip, + } + } + return c, nil } @@ -249,6 +260,8 @@ func (c *criService) Run() error { } }() + c.nri.register() + // Set the server as initialized. GRPC services could start serving traffic. c.initialized.Set() diff --git a/vendor/github.com/containerd/nri/pkg/runtime-tools/generate/generate.go b/vendor/github.com/containerd/nri/pkg/runtime-tools/generate/generate.go new file mode 100644 index 000000000..81f4c1e7f --- /dev/null +++ b/vendor/github.com/containerd/nri/pkg/runtime-tools/generate/generate.go @@ -0,0 +1,422 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package generate + +import ( + "fmt" + "strings" + + rspec "github.com/opencontainers/runtime-spec/specs-go" + "github.com/opencontainers/runtime-tools/generate" + + nri "github.com/containerd/nri/pkg/api" +) + +// GeneratorOption is an option for Generator(). +type GeneratorOption func(*Generator) + +// Generator extends a stock runtime-tools Generator and extends it with +// a few functions for handling NRI container adjustment. +type Generator struct { + *generate.Generator + filterLabels func(map[string]string) (map[string]string, error) + filterAnnotations func(map[string]string) (map[string]string, error) + resolveBlockIO func(string) (*rspec.LinuxBlockIO, error) + resolveRdt func(string) (*rspec.LinuxIntelRdt, error) + checkResources func(*rspec.LinuxResources) error +} + +// SpecGenerator returns a wrapped OCI Spec Generator. +func SpecGenerator(gg *generate.Generator, opts ...GeneratorOption) *Generator { + g := &Generator{ + Generator: gg, + } + g.filterLabels = nopFilter + g.filterAnnotations = nopFilter + for _, o := range opts { + o(g) + } + return g +} + +// WithLabelFilter provides an option for filtering or rejecting labels. +func WithLabelFilter(fn func(map[string]string) (map[string]string, error)) GeneratorOption { + return func(g *Generator) { + g.filterLabels = fn + } +} + +// WithAnnotationFilter provides an option for filtering or rejecting annotations. +func WithAnnotationFilter(fn func(map[string]string) (map[string]string, error)) GeneratorOption { + return func(g *Generator) { + g.filterAnnotations = fn + } +} + +// WithBlockIOResolver specifies a function for resolving Block I/O classes by name. +func WithBlockIOResolver(fn func(string) (*rspec.LinuxBlockIO, error)) GeneratorOption { + return func(g *Generator) { + g.resolveBlockIO = fn + } +} + +// WithRdtResolver specifies a function for resolving RDT classes by name. 
+func WithRdtResolver(fn func(string) (*rspec.LinuxIntelRdt, error)) GeneratorOption { + return func(g *Generator) { + g.resolveRdt = fn + } +} + +// WithResourceChecker specifies a function to perform final resource adjustment. +func WithResourceChecker(fn func(*rspec.LinuxResources) error) GeneratorOption { + return func(g *Generator) { + g.checkResources = fn + } +} + +// Adjust adjusts all aspects of the OCI Spec that NRI knows/cares about. +func (g *Generator) Adjust(adjust *nri.ContainerAdjustment) error { + if adjust == nil { + return nil + } + + if err := g.AdjustAnnotations(adjust.GetAnnotations()); err != nil { + return fmt.Errorf("failed to adjust annotations in OCI Spec: %w", err) + } + g.AdjustEnv(adjust.GetEnv()) + g.AdjustHooks(adjust.GetHooks()) + g.AdjustDevices(adjust.GetLinux().GetDevices()) + g.AdjustCgroupsPath(adjust.GetLinux().GetCgroupsPath()) + + resources := adjust.GetLinux().GetResources() + if err := g.AdjustResources(resources); err != nil { + return err + } + if err := g.AdjustBlockIOClass(resources.GetBlockioClass().Get()); err != nil { + return err + } + if err := g.AdjustRdtClass(resources.GetRdtClass().Get()); err != nil { + return err + } + + if err := g.AdjustMounts(adjust.GetMounts()); err != nil { + return err + } + + return nil +} + +// AdjustEnv adjusts the environment of the OCI Spec. 
+func (g *Generator) AdjustEnv(env []*nri.KeyValue) { + mod := map[string]*nri.KeyValue{} + + for _, e := range env { + key, _ := nri.IsMarkedForRemoval(e.Key) + mod[key] = e + } + + // first modify existing environment + if len(mod) > 0 && g.Config != nil && g.Config.Process != nil { + old := g.Config.Process.Env + g.ClearProcessEnv() + for _, e := range old { + keyval := strings.SplitN(e, "=", 2) + if len(keyval) < 2 { + continue + } + if m, ok := mod[keyval[0]]; ok { + delete(mod, keyval[0]) + if _, marked := m.IsMarkedForRemoval(); !marked { + g.AddProcessEnv(m.Key, m.Value) + } + continue + } + g.AddProcessEnv(keyval[0], keyval[1]) + } + } + + // then append remaining unprocessed adjustments (new variables) + for _, e := range env { + if _, marked := e.IsMarkedForRemoval(); marked { + continue + } + if _, ok := mod[e.Key]; ok { + g.AddProcessEnv(e.Key, e.Value) + } + } +} + +// AdjustAnnotations adjusts the annotations in the OCI Spec. +func (g *Generator) AdjustAnnotations(annotations map[string]string) error { + var err error + + if annotations, err = g.filterAnnotations(annotations); err != nil { + return err + } + for k, v := range annotations { + if key, marked := nri.IsMarkedForRemoval(k); marked { + g.RemoveAnnotation(key) + } else { + g.AddAnnotation(k, v) + } + } + + return nil +} + +// AdjustHooks adjusts the OCI hooks in the OCI Spec. 
+func (g *Generator) AdjustHooks(hooks *nri.Hooks) { + if hooks == nil { + return + } + for _, h := range hooks.Prestart { + g.AddPreStartHook(h.ToOCI()) + } + for _, h := range hooks.Poststart { + g.AddPostStartHook(h.ToOCI()) + } + for _, h := range hooks.Poststop { + g.AddPostStopHook(h.ToOCI()) + } + for _, h := range hooks.CreateRuntime { + g.AddCreateRuntimeHook(h.ToOCI()) + } + for _, h := range hooks.CreateContainer { + g.AddCreateContainerHook(h.ToOCI()) + } + for _, h := range hooks.StartContainer { + g.AddStartContainerHook(h.ToOCI()) + } +} + +// AdjustResources adjusts the (Linux) resources in the OCI Spec. +func (g *Generator) AdjustResources(r *nri.LinuxResources) error { + if r == nil { + return nil + } + + g.initConfigLinux() + + if r.Cpu != nil { + g.SetLinuxResourcesCPUPeriod(r.Cpu.GetPeriod().GetValue()) + g.SetLinuxResourcesCPUQuota(r.Cpu.GetQuota().GetValue()) + g.SetLinuxResourcesCPUShares(r.Cpu.GetShares().GetValue()) + g.SetLinuxResourcesCPUCpus(r.Cpu.GetCpus()) + g.SetLinuxResourcesCPUMems(r.Cpu.GetMems()) + g.SetLinuxResourcesCPURealtimeRuntime(r.Cpu.GetRealtimeRuntime().GetValue()) + g.SetLinuxResourcesCPURealtimePeriod(r.Cpu.GetRealtimePeriod().GetValue()) + } + if r.Memory != nil { + if l := r.Memory.GetLimit().GetValue(); l != 0 { + g.SetLinuxResourcesMemoryLimit(l) + g.SetLinuxResourcesMemorySwap(l) + } + } + for _, l := range r.HugepageLimits { + g.AddLinuxResourcesHugepageLimit(l.PageSize, l.Limit) + } + for k, v := range r.Unified { + g.AddLinuxResourcesUnified(k, v) + } + + if g.checkResources != nil { + if err := g.checkResources(g.Config.Linux.Resources); err != nil { + return fmt.Errorf("failed to adjust resources in OCI Spec: %w", err) + } + } + + return nil +} + +// AdjustBlockIOClass adjusts the block I/O class in the OCI Spec. 
+func (g *Generator) AdjustBlockIOClass(blockIOClass *string) error { + if blockIOClass == nil || g.resolveBlockIO == nil { + return nil + } + + if *blockIOClass == "" { + g.ClearLinuxResourcesBlockIO() + return nil + } + + blockIO, err := g.resolveBlockIO(*blockIOClass) + if err != nil { + return fmt.Errorf("failed to adjust BlockIO class in OCI Spec: %w", err) + } + + g.SetLinuxResourcesBlockIO(blockIO) + return nil +} + +// AdjustRdtClass adjusts the RDT class in the OCI Spec. +func (g *Generator) AdjustRdtClass(rdtClass *string) error { + if rdtClass == nil || g.resolveRdt == nil { + return nil + } + + if *rdtClass == "" { + g.ClearLinuxIntelRdt() + return nil + } + + rdt, err := g.resolveRdt(*rdtClass) + if err != nil { + return fmt.Errorf("failed to adjust RDT class in OCI Spec: %w", err) + } + + g.SetLinuxIntelRdt(rdt) + return nil +} + +// AdjustCgroupsPath adjusts the cgroup pseudofs path in the OCI Spec. +func (g *Generator) AdjustCgroupsPath(path string) { + if path != "" { + g.SetLinuxCgroupsPath(path) + } +} + +// AdjustDevices adjusts the (Linux) devices in the OCI Spec. +func (g *Generator) AdjustDevices(devices []*nri.LinuxDevice) { + for _, d := range devices { + key, marked := d.IsMarkedForRemoval() + g.RemoveDevice(key) + if marked { + continue + } + g.AddDevice(d.ToOCI()) + major, minor, access := &d.Major, &d.Minor, d.AccessString() + g.AddLinuxResourcesDevice(true, d.Type, major, minor, access) + } +} + +// AdjustMounts adjusts the mounts in the OCI Spec. 
+func (g *Generator) AdjustMounts(mounts []*nri.Mount) error { + var ( + propagation string + ) + + for _, m := range mounts { + if destination, marked := m.IsMarkedForRemoval(); marked { + g.RemoveMount(destination) + continue + } + g.RemoveMount(m.Destination) + + mnt := m.ToOCI(&propagation) + + switch propagation { + case "rprivate": + case "rshared": + if err := ensurePropagation(mnt.Source, "rshared"); err != nil { + return fmt.Errorf("failed to adjust mounts in OCI Spec: %w", err) + } + if err := g.SetLinuxRootPropagation("rshared"); err != nil { + return fmt.Errorf("failed to adjust rootfs propagation in OCI Spec: %w", err) + } + case "rslave": + if err := ensurePropagation(mnt.Source, "rshared", "rslave"); err != nil { + return fmt.Errorf("failed to adjust mounts in OCI Spec: %w", err) + } + rootProp := g.Config.Linux.RootfsPropagation + if rootProp != "rshared" && rootProp != "rslave" { + if err := g.SetLinuxRootPropagation("rslave"); err != nil { + return fmt.Errorf("failed to adjust rootfs propagation in OCI Spec: %w", err) + } + } + } + g.AddMount(mnt) + } + + return nil +} + +func nopFilter(m map[string]string) (map[string]string, error) { + return m, nil +} + +// +// TODO: these could be added to the stock Spec generator... +// + +// AddCreateRuntimeHook adds a hooks new CreateRuntime hooks. +func (g *Generator) AddCreateRuntimeHook(hook rspec.Hook) { + g.initConfigHooks() + g.Config.Hooks.CreateRuntime = append(g.Config.Hooks.CreateRuntime, hook) +} + +// AddCreateContainerHook adds a hooks new CreateContainer hooks. +func (g *Generator) AddCreateContainerHook(hook rspec.Hook) { + g.initConfigHooks() + g.Config.Hooks.CreateContainer = append(g.Config.Hooks.CreateContainer, hook) +} + +// AddStartContainerHook adds a hooks new StartContainer hooks. 
+func (g *Generator) AddStartContainerHook(hook rspec.Hook) { + g.initConfigHooks() + g.Config.Hooks.StartContainer = append(g.Config.Hooks.StartContainer, hook) +} + +// ClearLinuxIntelRdt clears RDT CLOS. +func (g *Generator) ClearLinuxIntelRdt() { + g.initConfigLinux() + g.Config.Linux.IntelRdt = nil +} + +// SetLinuxIntelRdt sets RDT CLOS. +func (g *Generator) SetLinuxIntelRdt(rdt *rspec.LinuxIntelRdt) { + g.initConfigLinux() + g.Config.Linux.IntelRdt = rdt +} + +// ClearLinuxResourcesBlockIO clears Block I/O settings. +func (g *Generator) ClearLinuxResourcesBlockIO() { + g.initConfigLinuxResources() + g.Config.Linux.Resources.BlockIO = nil +} + +// SetLinuxResourcesBlockIO sets Block I/O settings. +func (g *Generator) SetLinuxResourcesBlockIO(blockIO *rspec.LinuxBlockIO) { + g.initConfigLinuxResources() + g.Config.Linux.Resources.BlockIO = blockIO +} + +func (g *Generator) initConfig() { + if g.Config == nil { + g.Config = &rspec.Spec{} + } +} + +func (g *Generator) initConfigHooks() { + g.initConfig() + if g.Config.Hooks == nil { + g.Config.Hooks = &rspec.Hooks{} + } +} + +func (g *Generator) initConfigLinux() { + g.initConfig() + if g.Config.Linux == nil { + g.Config.Linux = &rspec.Linux{} + } +} + +func (g *Generator) initConfigLinuxResources() { + g.initConfigLinux() + if g.Config.Linux.Resources == nil { + g.Config.Linux.Resources = &rspec.LinuxResources{} + } +} diff --git a/vendor/github.com/containerd/nri/pkg/runtime-tools/generate/helpers_linux.go b/vendor/github.com/containerd/nri/pkg/runtime-tools/generate/helpers_linux.go new file mode 100644 index 000000000..41088cb76 --- /dev/null +++ b/vendor/github.com/containerd/nri/pkg/runtime-tools/generate/helpers_linux.go @@ -0,0 +1,78 @@ +//go:build linux +// +build linux + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package generate + +import ( + "fmt" + "path/filepath" + "strings" + + "github.com/moby/sys/mountinfo" +) + +func getPropagation(path string) (string, error) { + var ( + dir = filepath.Clean(path) + mnt *mountinfo.Info + ) + + mounts, err := mountinfo.GetMounts(mountinfo.ParentsFilter(dir)) + if err != nil { + return "", err + } + if len(mounts) == 0 { + return "", fmt.Errorf("failed to get mount info for %q", path) + } + + maxLen := 0 + for _, m := range mounts { + if l := len(m.Mountpoint); l > maxLen { + mnt = m + maxLen = l + } + } + + for _, opt := range strings.Split(mnt.Optional, " ") { + switch { + case strings.HasPrefix(opt, "shared:"): + return "rshared", nil + case strings.HasPrefix(opt, "master:"): + return "rslave", nil + } + } + + return "", nil +} + +func ensurePropagation(path string, accepted ...string) error { + prop, err := getPropagation(path) + if err != nil { + return err + } + + for _, p := range accepted { + if p == prop { + return nil + } + } + + return fmt.Errorf("path %q mount propagation is %q, not %q", + path, prop, strings.Join(accepted, " or ")) +} diff --git a/vendor/github.com/containerd/nri/pkg/runtime-tools/generate/helpers_other.go b/vendor/github.com/containerd/nri/pkg/runtime-tools/generate/helpers_other.go new file mode 100644 index 000000000..54f58d3e7 --- /dev/null +++ b/vendor/github.com/containerd/nri/pkg/runtime-tools/generate/helpers_other.go @@ -0,0 +1,24 @@ +//go:build !linux +// +build !linux + +/* + Copyright The containerd Authors. 
+ + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package generate + +func ensurePropagation(path string, accepted ...string) error { + return nil +} diff --git a/vendor/modules.txt b/vendor/modules.txt index e4d9b9da4..722f7abb0 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -134,6 +134,7 @@ github.com/containerd/nri/pkg/api github.com/containerd/nri/pkg/log github.com/containerd/nri/pkg/net github.com/containerd/nri/pkg/net/multiplex +github.com/containerd/nri/pkg/runtime-tools/generate github.com/containerd/nri/types/v1 # github.com/containerd/ttrpc v1.1.1-0.20220420014843-944ef4a40df3 ## explicit; go 1.13