From ebbcb57a4c6e88b27dfdc79ee05b13e1946fea04 Mon Sep 17 00:00:00 2001 From: Krisztian Litkey Date: Wed, 11 Jan 2023 14:20:08 +0200 Subject: [PATCH] pkg/cri/sbserver: experimental NRI integration for CRI. Hook the NRI service plugin into CRI sbserver request processing. Signed-off-by: Krisztian Litkey --- contrib/fuzz/cri_sbserver_fuzzer.go | 2 +- pkg/cri/cri.go | 2 +- pkg/cri/sbserver/container_create.go | 14 +++++ pkg/cri/sbserver/container_remove.go | 10 ++++ pkg/cri/sbserver/container_start.go | 35 ++++++++----- pkg/cri/sbserver/container_stop.go | 10 ++++ .../sbserver/container_update_resources.go | 21 ++++++++ pkg/cri/sbserver/events.go | 4 +- pkg/cri/sbserver/nri.go | 43 ++++++++++++++++ pkg/cri/sbserver/nri_linux.go | 35 +++++++++++++ pkg/cri/sbserver/nri_other.go | 35 +++++++++++++ pkg/cri/sbserver/opts.go | 51 ------------------- pkg/cri/sbserver/sandbox_remove.go | 5 ++ pkg/cri/sbserver/sandbox_run.go | 13 +++++ pkg/cri/sbserver/sandbox_stop.go | 5 ++ pkg/cri/sbserver/service.go | 12 ++++- 16 files changed, 227 insertions(+), 70 deletions(-) create mode 100644 pkg/cri/sbserver/nri.go create mode 100644 pkg/cri/sbserver/nri_linux.go create mode 100644 pkg/cri/sbserver/nri_other.go delete mode 100644 pkg/cri/sbserver/opts.go diff --git a/contrib/fuzz/cri_sbserver_fuzzer.go b/contrib/fuzz/cri_sbserver_fuzzer.go index e2e2b63cb..ed3e88ec7 100644 --- a/contrib/fuzz/cri_sbserver_fuzzer.go +++ b/contrib/fuzz/cri_sbserver_fuzzer.go @@ -37,7 +37,7 @@ func FuzzCRISandboxServer(data []byte) int { } defer client.Close() - c, err := sbserver.NewCRIService(criconfig.Config{}, client) + c, err := sbserver.NewCRIService(criconfig.Config{}, client, nil) if err != nil { panic(err) } diff --git a/pkg/cri/cri.go b/pkg/cri/cri.go index 39fd30b56..8fa806b68 100644 --- a/pkg/cri/cri.go +++ b/pkg/cri/cri.go @@ -90,7 +90,7 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) { var s server.CRIService if os.Getenv("ENABLE_CRI_SANDBOXES") != "" { log.G(ctx).Info("using experimental CRI Sandbox server - unset ENABLE_CRI_SANDBOXES to disable") - s, err = sbserver.NewCRIService(c, client) + s, err = sbserver.NewCRIService(c, client, getNRIAPI(ic)) } else { log.G(ctx).Info("using legacy CRI server") s, err = server.NewCRIService(c, client, getNRIAPI(ic)) diff --git a/pkg/cri/sbserver/container_create.go b/pkg/cri/sbserver/container_create.go index a49e38e08..df9590a1c 100644 --- a/pkg/cri/sbserver/container_create.go +++ b/pkg/cri/sbserver/container_create.go @@ -278,6 +278,15 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta opts = append(opts, containerd.WithSandbox(sandboxID)) } + opts = append(opts, c.nri.WithContainerAdjustment()) + defer func() { + if retErr != nil { + deferCtx, deferCancel := util.DeferContext() + defer deferCancel() + c.nri.UndoCreateContainer(deferCtx, &sandbox, id, spec) + } + }() + var cntr containerd.Container if cntr, err = c.client.NewContainer(ctx, id, opts...); err != nil { return nil, fmt.Errorf("failed to create containerd container: %w", err) @@ -318,6 +327,11 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta c.generateAndSendContainerEvent(ctx, id, sandboxID, runtime.ContainerEventType_CONTAINER_CREATED_EVENT) + err = c.nri.PostCreateContainer(ctx, &sandbox, &container) + if err != nil { + log.G(ctx).WithError(err).Errorf("NRI post-create notification failed") + } + containerCreateTimer.WithValues(ociRuntime.Type).UpdateSince(start) return &runtime.CreateContainerResponse{ContainerId: id}, nil diff --git a/pkg/cri/sbserver/container_remove.go b/pkg/cri/sbserver/container_remove.go index 05f6ac213..ff4062944 100644 --- a/pkg/cri/sbserver/container_remove.go +++ b/pkg/cri/sbserver/container_remove.go @@ -73,6 +73,16 @@ func (c *criService) RemoveContainer(ctx context.Context, r *runtime.RemoveConta } }() + sandbox, err := c.sandboxStore.Get(container.SandboxID) + if err != nil { + err = c.nri.RemoveContainer(ctx, nil, &container) + } else { + err = c.nri.RemoveContainer(ctx, &sandbox, &container) + } + if err != nil { + log.G(ctx).WithError(err).Error("NRI failed to remove container") + } + // NOTE(random-liu): Docker set container to "Dead" state when start removing the // container so as to avoid start/restart the container again. However, for current // kubelet implementation, we'll never start a container once we decide to remove it, diff --git a/pkg/cri/sbserver/container_start.go b/pkg/cri/sbserver/container_start.go index 420fa742d..bc893ea78 100644 --- a/pkg/cri/sbserver/container_start.go +++ b/pkg/cri/sbserver/container_start.go @@ -27,8 +27,6 @@ import ( containerdio "github.com/containerd/containerd/cio" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/log" - "github.com/containerd/nri" - v1 "github.com/containerd/nri/types/v1" "github.com/sirupsen/logrus" runtime "k8s.io/cri-api/pkg/apis/runtime/v1" @@ -135,7 +133,7 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain deferCtx, deferCancel := ctrdutil.DeferContext() defer deferCancel() // It's possible that task is deleted by event monitor. - if _, err := task.Delete(deferCtx, WithNRISandboxDelete(sandboxID), containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) { + if _, err := task.Delete(deferCtx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) { log.G(ctx).WithError(err).Errorf("Failed to delete containerd task %q", id) } } @@ -146,18 +144,22 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain if err != nil { return nil, fmt.Errorf("failed to wait for containerd task: %w", err) } - nric, err := nri.New() + + defer func() { + if retErr != nil { + deferCtx, deferCancel := ctrdutil.DeferContext() + defer deferCancel() + err = c.nri.StopContainer(deferCtx, &sandbox, &cntr) + if err != nil { + log.G(ctx).WithError(err).Errorf("NRI stop failed for failed container %q", id) + } + } + }() + + err = c.nri.StartContainer(ctx, &sandbox, &cntr) if err != nil { - log.G(ctx).WithError(err).Error("unable to create nri client") - } - if nric != nil { - nriSB := &nri.Sandbox{ - ID: sandboxID, - Labels: sandbox.Config.Labels, - } - if _, err := nric.InvokeWithSandbox(ctx, task, v1.Create, nriSB); err != nil { - return nil, fmt.Errorf("nri invoke: %w", err) - } + log.G(ctx).WithError(err).Errorf("NRI container start failed") + return nil, fmt.Errorf("NRI container start failed: %w", err) } // Start containerd task. @@ -179,6 +181,11 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain c.generateAndSendContainerEvent(ctx, id, sandboxID, runtime.ContainerEventType_CONTAINER_STARTED_EVENT) + err = c.nri.PostStartContainer(ctx, &sandbox, &cntr) + if err != nil { + log.G(ctx).WithError(err).Errorf("NRI post-start notification failed") + } + containerStartTimer.WithValues(info.Runtime.Name).UpdateSince(start) return &runtime.StartContainerResponse{}, nil diff --git a/pkg/cri/sbserver/container_stop.go b/pkg/cri/sbserver/container_stop.go index 55bc3646e..ac36660ca 100644 --- a/pkg/cri/sbserver/container_stop.go +++ b/pkg/cri/sbserver/container_stop.go @@ -47,6 +47,16 @@ func (c *criService) StopContainer(ctx context.Context, r *runtime.StopContainer return nil, err } + sandbox, err := c.sandboxStore.Get(container.SandboxID) + if err != nil { + err = c.nri.StopContainer(ctx, nil, &container) + } else { + err = c.nri.StopContainer(ctx, &sandbox, &container) + } + if err != nil { + log.G(ctx).WithError(err).Error("NRI failed to stop container") + } + i, err := container.Container.Info(ctx) if err != nil { return nil, fmt.Errorf("get container info: %w", err) diff --git a/pkg/cri/sbserver/container_update_resources.go b/pkg/cri/sbserver/container_update_resources.go index 620798be8..1015f1c0c 100644 --- a/pkg/cri/sbserver/container_update_resources.go +++ b/pkg/cri/sbserver/container_update_resources.go @@ -42,6 +42,21 @@ func (c *criService) UpdateContainerResources(ctx context.Context, r *runtime.Up if err != nil { return nil, fmt.Errorf("failed to find container: %w", err) } + + sandbox, err := c.sandboxStore.Get(container.SandboxID) + if err != nil { + return nil, err + } + + resources := r.GetLinux() + updated, err := c.nri.UpdateContainerResources(ctx, &sandbox, &container, resources) + if err != nil { + return nil, fmt.Errorf("NRI container update failed: %w", err) + } + if updated != nil { + *resources = *updated + } + // Update resources in status update transaction, so that: // 1) There won't be race condition with container start. // 2) There won't be concurrent resource update to the same container. @@ -50,6 +65,12 @@ func (c *criService) UpdateContainerResources(ctx context.Context, r *runtime.Up }); err != nil { return nil, fmt.Errorf("failed to update resources: %w", err) } + + err = c.nri.PostUpdateContainerResources(ctx, &sandbox, &container) + if err != nil { + log.G(ctx).WithError(err).Errorf("NRI post-update notification failed") + } + return &runtime.UpdateContainerResourcesResponse{}, nil } diff --git a/pkg/cri/sbserver/events.go b/pkg/cri/sbserver/events.go index aa617f5db..ac8f6b813 100644 --- a/pkg/cri/sbserver/events.go +++ b/pkg/cri/sbserver/events.go @@ -385,7 +385,7 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta } } else { // TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker - if _, err = task.Delete(ctx, WithNRISandboxDelete(cntr.SandboxID), containerd.WithProcessKill); err != nil { + if _, err = task.Delete(ctx, c.nri.WithContainerExit(&cntr), containerd.WithProcessKill); err != nil { if !errdefs.IsNotFound(err) { return fmt.Errorf("failed to stop container: %w", err) } @@ -428,7 +428,7 @@ func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxst } } else { // TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker - if _, err = task.Delete(ctx, WithNRISandboxDelete(sb.ID), containerd.WithProcessKill); err != nil { + if _, err = task.Delete(ctx, containerd.WithProcessKill); err != nil { if !errdefs.IsNotFound(err) { return fmt.Errorf("failed to stop sandbox: %w", err) } diff --git a/pkg/cri/sbserver/nri.go b/pkg/cri/sbserver/nri.go new file mode 100644 index 000000000..d76b74759 --- /dev/null +++ b/pkg/cri/sbserver/nri.go @@ -0,0 +1,43 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package sbserver + +import ( + criconfig "github.com/containerd/containerd/pkg/cri/config" + cstore "github.com/containerd/containerd/pkg/cri/store/container" + sstore "github.com/containerd/containerd/pkg/cri/store/sandbox" +) + +type criImplementation struct { + c *criService +} + +func (i *criImplementation) Config() *criconfig.Config { + return &i.c.config +} + +func (i *criImplementation) SandboxStore() *sstore.Store { + return i.c.sandboxStore +} + +func (i *criImplementation) ContainerStore() *cstore.Store { + return i.c.containerStore +} + +func (i *criImplementation) ContainerMetadataExtensionKey() string { + return containerMetadataExtension +} diff --git a/pkg/cri/sbserver/nri_linux.go b/pkg/cri/sbserver/nri_linux.go new file mode 100644 index 000000000..38c2f5e85 --- /dev/null +++ b/pkg/cri/sbserver/nri_linux.go @@ -0,0 +1,35 @@ +//go:build linux + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package sbserver + +import ( + "context" + "time" + + cstore "github.com/containerd/containerd/pkg/cri/store/container" + cri "k8s.io/cri-api/pkg/apis/runtime/v1" +) + +func (i *criImplementation) UpdateContainerResources(ctx context.Context, ctr cstore.Container, req *cri.UpdateContainerResourcesRequest, status cstore.Status) (cstore.Status, error) { + return i.c.updateContainerResources(ctx, ctr, req, status) +} + +func (i *criImplementation) StopContainer(ctx context.Context, ctr cstore.Container, timeout time.Duration) error { + return i.c.stopContainer(ctx, ctr, timeout) +} diff --git a/pkg/cri/sbserver/nri_other.go b/pkg/cri/sbserver/nri_other.go new file mode 100644 index 000000000..f88c16a3a --- /dev/null +++ b/pkg/cri/sbserver/nri_other.go @@ -0,0 +1,35 @@ +//go:build !linux + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package sbserver + +import ( + "context" + "time" + + cstore "github.com/containerd/containerd/pkg/cri/store/container" + cri "k8s.io/cri-api/pkg/apis/runtime/v1" +) + +func (i *criImplementation) UpdateContainerResources(ctx context.Context, ctr cstore.Container, req *cri.UpdateContainerResourcesRequest, status cstore.Status) (cstore.Status, error) { + return cstore.Status{}, nil +} + +func (i *criImplementation) StopContainer(ctx context.Context, ctr cstore.Container, timeout time.Duration) error { + return nil +} diff --git a/pkg/cri/sbserver/opts.go b/pkg/cri/sbserver/opts.go deleted file mode 100644 index 1095fdd75..000000000 --- a/pkg/cri/sbserver/opts.go +++ /dev/null @@ -1,51 +0,0 @@ -/* - Copyright The containerd Authors. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package sbserver - -import ( - "context" - - "github.com/containerd/containerd" - "github.com/containerd/containerd/log" - "github.com/containerd/nri" - v1 "github.com/containerd/nri/types/v1" -) - -// WithNRISandboxDelete calls delete for a sandbox'd task -func WithNRISandboxDelete(sandboxID string) containerd.ProcessDeleteOpts { - return func(ctx context.Context, p containerd.Process) error { - task, ok := p.(containerd.Task) - if !ok { - return nil - } - nric, err := nri.New() - if err != nil { - log.G(ctx).WithError(err).Error("unable to create nri client") - return nil - } - if nric == nil { - return nil - } - sb := &nri.Sandbox{ - ID: sandboxID, - } - if _, err := nric.InvokeWithSandbox(ctx, task, v1.Delete, sb); err != nil { - log.G(ctx).WithError(err).Errorf("Failed to delete nri for %q", task.ID()) - } - return nil - } -} diff --git a/pkg/cri/sbserver/sandbox_remove.go b/pkg/cri/sbserver/sandbox_remove.go index 989e3f3ac..cd1228daf 100644 --- a/pkg/cri/sbserver/sandbox_remove.go +++ b/pkg/cri/sbserver/sandbox_remove.go @@ -90,6 +90,11 @@ func (c *criService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodS return nil, fmt.Errorf("failed to delete sandbox %q: %w", id, err) } + err = c.nri.RemovePodSandbox(ctx, &sandbox) + if err != nil { + log.G(ctx).WithError(err).Errorf("NRI pod removal notification failed") + } + // Remove sandbox from sandbox store. Note that once the sandbox is successfully // deleted: // 1) ListPodSandbox will not include this sandbox. diff --git a/pkg/cri/sbserver/sandbox_run.go b/pkg/cri/sbserver/sandbox_run.go index 4917c9a72..40da11c08 100644 --- a/pkg/cri/sbserver/sandbox_run.go +++ b/pkg/cri/sbserver/sandbox_run.go @@ -262,6 +262,19 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox sandbox.ProcessLabel = labels["selinux_label"] + err = c.nri.RunPodSandbox(ctx, &sandbox) + if err != nil { + return nil, fmt.Errorf("NRI RunPodSandbox failed: %w", err) + } + + defer func() { + if retErr != nil { + deferCtx, deferCancel := util.DeferContext() + defer deferCancel() + c.nri.RemovePodSandbox(deferCtx, &sandbox) + } + }() + if err := sandbox.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) { // Set the pod sandbox as ready after successfully start sandbox container. status.Pid = ctrl.Pid diff --git a/pkg/cri/sbserver/sandbox_stop.go b/pkg/cri/sbserver/sandbox_stop.go index 1ca8470e5..dbd2b7ac8 100644 --- a/pkg/cri/sbserver/sandbox_stop.go +++ b/pkg/cri/sbserver/sandbox_stop.go @@ -80,6 +80,11 @@ func (c *criService) stopPodSandbox(ctx context.Context, sandbox sandboxstore.Sa sandboxRuntimeStopTimer.WithValues(sandbox.RuntimeHandler).UpdateSince(stop) + err := c.nri.StopPodSandbox(ctx, &sandbox) + if err != nil { + log.G(ctx).WithError(err).Errorf("NRI sandbox stop notification failed") + } + // Teardown network for sandbox. if sandbox.NetNS != nil { netStop := time.Now() diff --git a/pkg/cri/sbserver/service.go b/pkg/cri/sbserver/service.go index 98e11c28e..9154ddb6c 100644 --- a/pkg/cri/sbserver/service.go +++ b/pkg/cri/sbserver/service.go @@ -30,6 +30,7 @@ import ( "github.com/containerd/containerd" "github.com/containerd/containerd/oci" "github.com/containerd/containerd/pkg/cri/instrument" + "github.com/containerd/containerd/pkg/cri/nri" "github.com/containerd/containerd/pkg/cri/sbserver/podsandbox" "github.com/containerd/containerd/pkg/cri/streaming" "github.com/containerd/containerd/pkg/kmutex" @@ -120,10 +121,12 @@ type criService struct { // containerEventsChan is used to capture container events and send them // to the caller of GetContainerEvents. containerEventsChan chan runtime.ContainerEventResponse + // nri is used to hook NRI into CRI request processing. + nri *nri.API } // NewCRIService returns a new instance of CRIService -func NewCRIService(config criconfig.Config, client *containerd.Client) (CRIService, error) { +func NewCRIService(config criconfig.Config, client *containerd.Client, nri *nri.API) (CRIService, error) { var err error labels := label.NewStore() c := &criService{ @@ -191,6 +194,8 @@ func NewCRIService(config criconfig.Config, client *containerd.Client) (CRIServi c.sandboxControllers[criconfig.ModePodSandbox] = podsandbox.New(config, client, c.sandboxStore, c.os, c, c.baseOCISpecs) c.sandboxControllers[criconfig.ModeShim] = client.SandboxController() + c.nri = nri + return c, nil } @@ -271,6 +276,11 @@ func (c *criService) Run() error { } }() + // register CRI domain with NRI + if err := c.nri.Register(&criImplementation{c}); err != nil { + return fmt.Errorf("failed to set up NRI for CRI service: %w", err) + } + // Set the server as initialized. GRPC services could start serving traffic. c.initialized.Set()