pkg/cri/sbserver: experimental NRI integration for CRI.

Hook the NRI service plugin into CRI sbserver request
processing.

Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
This commit is contained in:
Krisztian Litkey 2023-01-11 14:20:08 +02:00
parent 8a1dca0f4a
commit ebbcb57a4c
16 changed files with 227 additions and 70 deletions

View File

@ -37,7 +37,7 @@ func FuzzCRISandboxServer(data []byte) int {
} }
defer client.Close() defer client.Close()
c, err := sbserver.NewCRIService(criconfig.Config{}, client) c, err := sbserver.NewCRIService(criconfig.Config{}, client, nil)
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@ -90,7 +90,7 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) {
var s server.CRIService var s server.CRIService
if os.Getenv("ENABLE_CRI_SANDBOXES") != "" { if os.Getenv("ENABLE_CRI_SANDBOXES") != "" {
log.G(ctx).Info("using experimental CRI Sandbox server - unset ENABLE_CRI_SANDBOXES to disable") log.G(ctx).Info("using experimental CRI Sandbox server - unset ENABLE_CRI_SANDBOXES to disable")
s, err = sbserver.NewCRIService(c, client) s, err = sbserver.NewCRIService(c, client, getNRIAPI(ic))
} else { } else {
log.G(ctx).Info("using legacy CRI server") log.G(ctx).Info("using legacy CRI server")
s, err = server.NewCRIService(c, client, getNRIAPI(ic)) s, err = server.NewCRIService(c, client, getNRIAPI(ic))

View File

@ -278,6 +278,15 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
opts = append(opts, containerd.WithSandbox(sandboxID)) opts = append(opts, containerd.WithSandbox(sandboxID))
} }
opts = append(opts, c.nri.WithContainerAdjustment())
defer func() {
if retErr != nil {
deferCtx, deferCancel := util.DeferContext()
defer deferCancel()
c.nri.UndoCreateContainer(deferCtx, &sandbox, id, spec)
}
}()
var cntr containerd.Container var cntr containerd.Container
if cntr, err = c.client.NewContainer(ctx, id, opts...); err != nil { if cntr, err = c.client.NewContainer(ctx, id, opts...); err != nil {
return nil, fmt.Errorf("failed to create containerd container: %w", err) return nil, fmt.Errorf("failed to create containerd container: %w", err)
@ -318,6 +327,11 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
c.generateAndSendContainerEvent(ctx, id, sandboxID, runtime.ContainerEventType_CONTAINER_CREATED_EVENT) c.generateAndSendContainerEvent(ctx, id, sandboxID, runtime.ContainerEventType_CONTAINER_CREATED_EVENT)
err = c.nri.PostCreateContainer(ctx, &sandbox, &container)
if err != nil {
log.G(ctx).WithError(err).Errorf("NRI post-create notification failed")
}
containerCreateTimer.WithValues(ociRuntime.Type).UpdateSince(start) containerCreateTimer.WithValues(ociRuntime.Type).UpdateSince(start)
return &runtime.CreateContainerResponse{ContainerId: id}, nil return &runtime.CreateContainerResponse{ContainerId: id}, nil

View File

@ -73,6 +73,16 @@ func (c *criService) RemoveContainer(ctx context.Context, r *runtime.RemoveConta
} }
}() }()
sandbox, err := c.sandboxStore.Get(container.SandboxID)
if err != nil {
err = c.nri.RemoveContainer(ctx, nil, &container)
} else {
err = c.nri.RemoveContainer(ctx, &sandbox, &container)
}
if err != nil {
log.G(ctx).WithError(err).Error("NRI failed to remove container")
}
// NOTE(random-liu): Docker set container to "Dead" state when start removing the // NOTE(random-liu): Docker set container to "Dead" state when start removing the
// container so as to avoid start/restart the container again. However, for current // container so as to avoid start/restart the container again. However, for current
// kubelet implementation, we'll never start a container once we decide to remove it, // kubelet implementation, we'll never start a container once we decide to remove it,

View File

@ -27,8 +27,6 @@ import (
containerdio "github.com/containerd/containerd/cio" containerdio "github.com/containerd/containerd/cio"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/nri"
v1 "github.com/containerd/nri/types/v1"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1" runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
@ -135,7 +133,7 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
deferCtx, deferCancel := ctrdutil.DeferContext() deferCtx, deferCancel := ctrdutil.DeferContext()
defer deferCancel() defer deferCancel()
// It's possible that task is deleted by event monitor. // It's possible that task is deleted by event monitor.
if _, err := task.Delete(deferCtx, WithNRISandboxDelete(sandboxID), containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) { if _, err := task.Delete(deferCtx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
log.G(ctx).WithError(err).Errorf("Failed to delete containerd task %q", id) log.G(ctx).WithError(err).Errorf("Failed to delete containerd task %q", id)
} }
} }
@ -146,18 +144,22 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to wait for containerd task: %w", err) return nil, fmt.Errorf("failed to wait for containerd task: %w", err)
} }
nric, err := nri.New()
defer func() {
if retErr != nil {
deferCtx, deferCancel := ctrdutil.DeferContext()
defer deferCancel()
err = c.nri.StopContainer(deferCtx, &sandbox, &cntr)
if err != nil {
log.G(ctx).WithError(err).Errorf("NRI stop failed for failed container %q", id)
}
}
}()
err = c.nri.StartContainer(ctx, &sandbox, &cntr)
if err != nil { if err != nil {
log.G(ctx).WithError(err).Error("unable to create nri client") log.G(ctx).WithError(err).Errorf("NRI container start failed")
} return nil, fmt.Errorf("NRI container start failed: %w", err)
if nric != nil {
nriSB := &nri.Sandbox{
ID: sandboxID,
Labels: sandbox.Config.Labels,
}
if _, err := nric.InvokeWithSandbox(ctx, task, v1.Create, nriSB); err != nil {
return nil, fmt.Errorf("nri invoke: %w", err)
}
} }
// Start containerd task. // Start containerd task.
@ -179,6 +181,11 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
c.generateAndSendContainerEvent(ctx, id, sandboxID, runtime.ContainerEventType_CONTAINER_STARTED_EVENT) c.generateAndSendContainerEvent(ctx, id, sandboxID, runtime.ContainerEventType_CONTAINER_STARTED_EVENT)
err = c.nri.PostStartContainer(ctx, &sandbox, &cntr)
if err != nil {
log.G(ctx).WithError(err).Errorf("NRI post-start notification failed")
}
containerStartTimer.WithValues(info.Runtime.Name).UpdateSince(start) containerStartTimer.WithValues(info.Runtime.Name).UpdateSince(start)
return &runtime.StartContainerResponse{}, nil return &runtime.StartContainerResponse{}, nil

View File

@ -47,6 +47,16 @@ func (c *criService) StopContainer(ctx context.Context, r *runtime.StopContainer
return nil, err return nil, err
} }
sandbox, err := c.sandboxStore.Get(container.SandboxID)
if err != nil {
err = c.nri.StopContainer(ctx, nil, &container)
} else {
err = c.nri.StopContainer(ctx, &sandbox, &container)
}
if err != nil {
log.G(ctx).WithError(err).Error("NRI failed to stop container")
}
i, err := container.Container.Info(ctx) i, err := container.Container.Info(ctx)
if err != nil { if err != nil {
return nil, fmt.Errorf("get container info: %w", err) return nil, fmt.Errorf("get container info: %w", err)

View File

@ -42,6 +42,21 @@ func (c *criService) UpdateContainerResources(ctx context.Context, r *runtime.Up
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to find container: %w", err) return nil, fmt.Errorf("failed to find container: %w", err)
} }
sandbox, err := c.sandboxStore.Get(container.SandboxID)
if err != nil {
return nil, err
}
resources := r.GetLinux()
updated, err := c.nri.UpdateContainerResources(ctx, &sandbox, &container, resources)
if err != nil {
return nil, fmt.Errorf("NRI container update failed: %w", err)
}
if updated != nil {
*resources = *updated
}
// Update resources in status update transaction, so that: // Update resources in status update transaction, so that:
// 1) There won't be race condition with container start. // 1) There won't be race condition with container start.
// 2) There won't be concurrent resource update to the same container. // 2) There won't be concurrent resource update to the same container.
@ -50,6 +65,12 @@ func (c *criService) UpdateContainerResources(ctx context.Context, r *runtime.Up
}); err != nil { }); err != nil {
return nil, fmt.Errorf("failed to update resources: %w", err) return nil, fmt.Errorf("failed to update resources: %w", err)
} }
err = c.nri.PostUpdateContainerResources(ctx, &sandbox, &container)
if err != nil {
log.G(ctx).WithError(err).Errorf("NRI post-update notification failed")
}
return &runtime.UpdateContainerResourcesResponse{}, nil return &runtime.UpdateContainerResourcesResponse{}, nil
} }

View File

@ -385,7 +385,7 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta
} }
} else { } else {
// TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker // TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker
if _, err = task.Delete(ctx, WithNRISandboxDelete(cntr.SandboxID), containerd.WithProcessKill); err != nil { if _, err = task.Delete(ctx, c.nri.WithContainerExit(&cntr), containerd.WithProcessKill); err != nil {
if !errdefs.IsNotFound(err) { if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to stop container: %w", err) return fmt.Errorf("failed to stop container: %w", err)
} }
@ -428,7 +428,7 @@ func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxst
} }
} else { } else {
// TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker // TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker
if _, err = task.Delete(ctx, WithNRISandboxDelete(sb.ID), containerd.WithProcessKill); err != nil { if _, err = task.Delete(ctx, containerd.WithProcessKill); err != nil {
if !errdefs.IsNotFound(err) { if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to stop sandbox: %w", err) return fmt.Errorf("failed to stop sandbox: %w", err)
} }

43
pkg/cri/sbserver/nri.go Normal file
View File

@ -0,0 +1,43 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sbserver
import (
criconfig "github.com/containerd/containerd/pkg/cri/config"
cstore "github.com/containerd/containerd/pkg/cri/store/container"
sstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
)
// criImplementation is a thin adapter around a criService. Its methods hand
// out the service's configuration and stores and forward container operations
// to the service.
type criImplementation struct {
// c is the CRI service every method of this type delegates to.
c *criService
}
// Config returns a pointer to the configuration of the wrapped CRI service.
// The pointer refers to the service's own config field, not to a copy.
func (i *criImplementation) Config() *criconfig.Config {
	svc := i.c
	return &svc.config
}
// SandboxStore returns the sandbox store held by the wrapped CRI service.
func (i *criImplementation) SandboxStore() *sstore.Store {
	svc := i.c
	return svc.sandboxStore
}
// ContainerStore returns the container store held by the wrapped CRI service.
func (i *criImplementation) ContainerStore() *cstore.Store {
	svc := i.c
	return svc.containerStore
}
// ContainerMetadataExtensionKey returns the package-level
// containerMetadataExtension value. The receiver's state is not consulted.
func (i *criImplementation) ContainerMetadataExtensionKey() string {
	key := containerMetadataExtension
	return key
}

View File

@ -0,0 +1,35 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sbserver
import (
"context"
"time"
cstore "github.com/containerd/containerd/pkg/cri/store/container"
cri "k8s.io/cri-api/pkg/apis/runtime/v1"
)
// UpdateContainerResources forwards the request to the wrapped CRI service's
// updateContainerResources and returns its status and error unmodified.
func (i *criImplementation) UpdateContainerResources(ctx context.Context, ctr cstore.Container, req *cri.UpdateContainerResourcesRequest, status cstore.Status) (cstore.Status, error) {
	svc := i.c
	updated, err := svc.updateContainerResources(ctx, ctr, req, status)
	return updated, err
}
// StopContainer forwards to the wrapped CRI service's stopContainer with the
// given container and timeout, and returns whatever error it reports.
func (i *criImplementation) StopContainer(ctx context.Context, ctr cstore.Container, timeout time.Duration) error {
	svc := i.c
	return svc.stopContainer(ctx, ctr, timeout)
}

View File

@ -0,0 +1,35 @@
//go:build !linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sbserver
import (
"context"
"time"
cstore "github.com/containerd/containerd/pkg/cri/store/container"
cri "k8s.io/cri-api/pkg/apis/runtime/v1"
)
// UpdateContainerResources is the stub compiled for non-Linux builds. It
// performs no resource update and ignores ctx, ctr and req.
//
// The incoming status is returned unchanged together with a nil error.
// Returning the input rather than a zero-value cstore.Status means a caller
// that stores the result cannot accidentally replace the container's recorded
// status with an empty one.
func (i *criImplementation) UpdateContainerResources(ctx context.Context, ctr cstore.Container, req *cri.UpdateContainerResourcesRequest, status cstore.Status) (cstore.Status, error) {
	return status, nil
}
// StopContainer is the stub compiled for non-Linux builds. It ignores all of
// its arguments, stops nothing, and always returns nil.
func (i *criImplementation) StopContainer(ctx context.Context, ctr cstore.Container, timeout time.Duration) error {
return nil
}

View File

@ -1,51 +0,0 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sbserver
import (
"context"
"github.com/containerd/containerd"
"github.com/containerd/containerd/log"
"github.com/containerd/nri"
v1 "github.com/containerd/nri/types/v1"
)
// WithNRISandboxDelete returns a process delete option that invokes the NRI
// v1.Delete action for a task, passing along a sandbox description that
// carries only sandboxID.
//
// The option is best-effort: every failure is logged and swallowed, so the
// returned function always yields nil and never blocks the delete itself.
func WithNRISandboxDelete(sandboxID string) containerd.ProcessDeleteOpts {
	return func(ctx context.Context, p containerd.Process) error {
		// Only tasks are handed to NRI; any other process kind is skipped.
		task, isTask := p.(containerd.Task)
		if !isTask {
			return nil
		}

		client, err := nri.New()
		switch {
		case err != nil:
			log.G(ctx).WithError(err).Error("unable to create nri client")
			return nil
		case client == nil:
			// No client and no error: there is nothing to invoke.
			return nil
		}

		sandbox := &nri.Sandbox{ID: sandboxID}
		if _, invokeErr := client.InvokeWithSandbox(ctx, task, v1.Delete, sandbox); invokeErr != nil {
			log.G(ctx).WithError(invokeErr).Errorf("Failed to delete nri for %q", task.ID())
		}
		return nil
	}
}

View File

@ -90,6 +90,11 @@ func (c *criService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodS
return nil, fmt.Errorf("failed to delete sandbox %q: %w", id, err) return nil, fmt.Errorf("failed to delete sandbox %q: %w", id, err)
} }
err = c.nri.RemovePodSandbox(ctx, &sandbox)
if err != nil {
log.G(ctx).WithError(err).Errorf("NRI pod removal notification failed")
}
// Remove sandbox from sandbox store. Note that once the sandbox is successfully // Remove sandbox from sandbox store. Note that once the sandbox is successfully
// deleted: // deleted:
// 1) ListPodSandbox will not include this sandbox. // 1) ListPodSandbox will not include this sandbox.

View File

@ -262,6 +262,19 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
sandbox.ProcessLabel = labels["selinux_label"] sandbox.ProcessLabel = labels["selinux_label"]
err = c.nri.RunPodSandbox(ctx, &sandbox)
if err != nil {
return nil, fmt.Errorf("NRI RunPodSandbox failed: %w", err)
}
defer func() {
if retErr != nil {
deferCtx, deferCancel := util.DeferContext()
defer deferCancel()
c.nri.RemovePodSandbox(deferCtx, &sandbox)
}
}()
if err := sandbox.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) { if err := sandbox.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
// Set the pod sandbox as ready after successfully start sandbox container. // Set the pod sandbox as ready after successfully start sandbox container.
status.Pid = ctrl.Pid status.Pid = ctrl.Pid

View File

@ -80,6 +80,11 @@ func (c *criService) stopPodSandbox(ctx context.Context, sandbox sandboxstore.Sa
sandboxRuntimeStopTimer.WithValues(sandbox.RuntimeHandler).UpdateSince(stop) sandboxRuntimeStopTimer.WithValues(sandbox.RuntimeHandler).UpdateSince(stop)
err := c.nri.StopPodSandbox(ctx, &sandbox)
if err != nil {
log.G(ctx).WithError(err).Errorf("NRI sandbox stop notification failed")
}
// Teardown network for sandbox. // Teardown network for sandbox.
if sandbox.NetNS != nil { if sandbox.NetNS != nil {
netStop := time.Now() netStop := time.Now()

View File

@ -30,6 +30,7 @@ import (
"github.com/containerd/containerd" "github.com/containerd/containerd"
"github.com/containerd/containerd/oci" "github.com/containerd/containerd/oci"
"github.com/containerd/containerd/pkg/cri/instrument" "github.com/containerd/containerd/pkg/cri/instrument"
"github.com/containerd/containerd/pkg/cri/nri"
"github.com/containerd/containerd/pkg/cri/sbserver/podsandbox" "github.com/containerd/containerd/pkg/cri/sbserver/podsandbox"
"github.com/containerd/containerd/pkg/cri/streaming" "github.com/containerd/containerd/pkg/cri/streaming"
"github.com/containerd/containerd/pkg/kmutex" "github.com/containerd/containerd/pkg/kmutex"
@ -120,10 +121,12 @@ type criService struct {
// containerEventsChan is used to capture container events and send them // containerEventsChan is used to capture container events and send them
// to the caller of GetContainerEvents. // to the caller of GetContainerEvents.
containerEventsChan chan runtime.ContainerEventResponse containerEventsChan chan runtime.ContainerEventResponse
// nri is used to hook NRI into CRI request processing.
nri *nri.API
} }
// NewCRIService returns a new instance of CRIService // NewCRIService returns a new instance of CRIService
func NewCRIService(config criconfig.Config, client *containerd.Client) (CRIService, error) { func NewCRIService(config criconfig.Config, client *containerd.Client, nri *nri.API) (CRIService, error) {
var err error var err error
labels := label.NewStore() labels := label.NewStore()
c := &criService{ c := &criService{
@ -191,6 +194,8 @@ func NewCRIService(config criconfig.Config, client *containerd.Client) (CRIServi
c.sandboxControllers[criconfig.ModePodSandbox] = podsandbox.New(config, client, c.sandboxStore, c.os, c, c.baseOCISpecs) c.sandboxControllers[criconfig.ModePodSandbox] = podsandbox.New(config, client, c.sandboxStore, c.os, c, c.baseOCISpecs)
c.sandboxControllers[criconfig.ModeShim] = client.SandboxController() c.sandboxControllers[criconfig.ModeShim] = client.SandboxController()
c.nri = nri
return c, nil return c, nil
} }
@ -271,6 +276,11 @@ func (c *criService) Run() error {
} }
}() }()
// register CRI domain with NRI
if err := c.nri.Register(&criImplementation{c}); err != nil {
return fmt.Errorf("failed to set up NRI for CRI service: %w", err)
}
// Set the server as initialized. GRPC services could start serving traffic. // Set the server as initialized. GRPC services could start serving traffic.
c.initialized.Set() c.initialized.Set()