Merge pull request #7954 from klihub/devel/sbserver-nri-integration

pkg/cri/sbserver: experimental NRI integration for CRI.
This commit is contained in:
Kazuyoshi Kato 2023-02-15 10:42:25 -08:00 committed by GitHub
commit fe5d1d3e7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 699 additions and 389 deletions

View File

@ -37,7 +37,7 @@ func FuzzCRISandboxServer(data []byte) int {
} }
defer client.Close() defer client.Close()
c, err := sbserver.NewCRIService(criconfig.Config{}, client) c, err := sbserver.NewCRIService(criconfig.Config{}, client, nil)
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@ -24,8 +24,9 @@ import (
"github.com/containerd/containerd" "github.com/containerd/containerd"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/containerd/pkg/cri/nri"
"github.com/containerd/containerd/pkg/cri/sbserver" "github.com/containerd/containerd/pkg/cri/sbserver"
"github.com/containerd/containerd/pkg/nri" nriservice "github.com/containerd/containerd/pkg/nri"
"github.com/containerd/containerd/platforms" "github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin" "github.com/containerd/containerd/plugin"
imagespec "github.com/opencontainers/image-spec/specs-go/v1" imagespec "github.com/opencontainers/image-spec/specs-go/v1"
@ -87,19 +88,12 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) {
} }
var s server.CRIService var s server.CRIService
var nrip nri.API
if os.Getenv("ENABLE_CRI_SANDBOXES") != "" { if os.Getenv("ENABLE_CRI_SANDBOXES") != "" {
log.G(ctx).Info("using experimental CRI Sandbox server - unset ENABLE_CRI_SANDBOXES to disable") log.G(ctx).Info("using experimental CRI Sandbox server - unset ENABLE_CRI_SANDBOXES to disable")
s, err = sbserver.NewCRIService(c, client) s, err = sbserver.NewCRIService(c, client, getNRIAPI(ic))
} else { } else {
log.G(ctx).Info("using legacy CRI server") log.G(ctx).Info("using legacy CRI server")
s, err = server.NewCRIService(c, client, getNRIAPI(ic))
nrip, err = getNRIPlugin(ic)
if err != nil {
log.G(ctx).Info("NRI service not found, disabling NRI support")
}
s, err = server.NewCRIService(c, client, nrip)
} }
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create CRI service: %w", err) return nil, fmt.Errorf("failed to create CRI service: %w", err)
@ -112,13 +106,6 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) {
// TODO(random-liu): Whether and how we can stop containerd. // TODO(random-liu): Whether and how we can stop containerd.
}() }()
if nrip != nil {
log.G(ctx).Info("using experimental NRI integration - disable nri plugin to prevent this")
if err = nrip.Start(); err != nil {
log.G(ctx).WithError(err).Fatal("Failed to start NRI service")
}
}
return s, nil return s, nil
} }
@ -146,23 +133,29 @@ func setGLogLevel() error {
return nil return nil
} }
// Get the NRI plugin and verify its type. // Get the NRI plugin, and set up our NRI API for it.
func getNRIPlugin(ic *plugin.InitContext) (nri.API, error) { func getNRIAPI(ic *plugin.InitContext) *nri.API {
const ( const (
pluginType = plugin.NRIApiPlugin pluginType = plugin.NRIApiPlugin
pluginName = "nri" pluginName = "nri"
) )
ctx := ic.Context
p, err := ic.GetByID(pluginType, pluginName) p, err := ic.GetByID(pluginType, pluginName)
if err != nil { if err != nil {
return nil, err log.G(ctx).Info("NRI service not found, NRI support disabled")
return nil
} }
api, ok := p.(nri.API) api, ok := p.(nriservice.API)
if !ok { if !ok {
return nil, fmt.Errorf("NRI plugin (%s, %q) has incompatible type %T", log.G(ctx).Infof("NRI plugin (%s, %q) has incorrect type %T, NRI support disabled",
pluginType, pluginName, api) pluginType, pluginName, api)
return nil
} }
return api, nil log.G(ctx).Info("using experimental NRI integration - disable nri plugin to prevent this")
return nri.NewAPI(api)
} }

36
pkg/cri/nri/nri_api.go Normal file
View File

@ -0,0 +1,36 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nri
import (
"context"
"time"
criconfig "github.com/containerd/containerd/pkg/cri/config"
cstore "github.com/containerd/containerd/pkg/cri/store/container"
sstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
cri "k8s.io/cri-api/pkg/apis/runtime/v1"
)
type CRIImplementation interface {
Config() *criconfig.Config
SandboxStore() *sstore.Store
ContainerStore() *cstore.Store
ContainerMetadataExtensionKey() string
UpdateContainerResources(context.Context, cstore.Container, *cri.UpdateContainerResourcesRequest, cstore.Status) (cstore.Status, error)
StopContainer(context.Context, cstore.Container, time.Duration) error
}

View File

@ -1,5 +1,3 @@
//go:build linux
/* /*
Copyright The containerd Authors. Copyright The containerd Authors.
@ -16,7 +14,7 @@
limitations under the License. limitations under the License.
*/ */
package server package nri
import ( import (
"context" "context"
@ -29,6 +27,7 @@ import (
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/containerd/pkg/blockio" "github.com/containerd/containerd/pkg/blockio"
"github.com/containerd/containerd/pkg/cri/annotations" "github.com/containerd/containerd/pkg/cri/annotations"
"github.com/containerd/containerd/pkg/cri/constants"
cstore "github.com/containerd/containerd/pkg/cri/store/container" cstore "github.com/containerd/containerd/pkg/cri/store/container"
sstore "github.com/containerd/containerd/pkg/cri/store/sandbox" sstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
ctrdutil "github.com/containerd/containerd/pkg/cri/util" ctrdutil "github.com/containerd/containerd/pkg/cri/util"
@ -38,29 +37,37 @@ import (
"github.com/opencontainers/runtime-tools/generate" "github.com/opencontainers/runtime-tools/generate"
cri "k8s.io/cri-api/pkg/apis/runtime/v1" cri "k8s.io/cri-api/pkg/apis/runtime/v1"
"github.com/containerd/containerd/pkg/cri/constants"
"github.com/containerd/containerd/pkg/nri" "github.com/containerd/containerd/pkg/nri"
"github.com/containerd/nri/pkg/api" "github.com/containerd/nri/pkg/api"
nrigen "github.com/containerd/nri/pkg/runtime-tools/generate" nrigen "github.com/containerd/nri/pkg/runtime-tools/generate"
) )
type nriAPI struct { type API struct {
cri *criService cri CRIImplementation
nri nri.API nri nri.API
} }
func (a *nriAPI) register() { func NewAPI(nri nri.API) *API {
if !a.isEnabled() { return &API{
return nri: nri,
} }
nri.RegisterDomain(a)
} }
func (a *nriAPI) isEnabled() bool { func (a *API) IsDisabled() bool {
return a != nil && a.nri != nil && a.nri.IsEnabled() return a == nil || a.nri == nil || !a.nri.IsEnabled()
}
func (a *API) IsEnabled() bool { return !a.IsDisabled() }
func (a *API) Register(cri CRIImplementation) error {
if a.IsDisabled() {
return nil
}
a.cri = cri
nri.RegisterDomain(a)
return a.nri.Start()
} }
// //
@ -71,7 +78,11 @@ func (a *nriAPI) isEnabled() bool {
// interface. // interface.
// //
func (a *nriAPI) runPodSandbox(ctx context.Context, criPod *sstore.Sandbox) error { func (a *API) RunPodSandbox(ctx context.Context, criPod *sstore.Sandbox) error {
if a.IsDisabled() {
return nil
}
pod := a.nriPodSandbox(criPod) pod := a.nriPodSandbox(criPod)
err := a.nri.RunPodSandbox(ctx, pod) err := a.nri.RunPodSandbox(ctx, pod)
@ -83,14 +94,22 @@ func (a *nriAPI) runPodSandbox(ctx context.Context, criPod *sstore.Sandbox) erro
return err return err
} }
func (a *nriAPI) stopPodSandbox(ctx context.Context, criPod *sstore.Sandbox) error { func (a *API) StopPodSandbox(ctx context.Context, criPod *sstore.Sandbox) error {
if a.IsDisabled() {
return nil
}
pod := a.nriPodSandbox(criPod) pod := a.nriPodSandbox(criPod)
err := a.nri.StopPodSandbox(ctx, pod) err := a.nri.StopPodSandbox(ctx, pod)
return err return err
} }
func (a *nriAPI) removePodSandbox(ctx context.Context, criPod *sstore.Sandbox) error { func (a *API) RemovePodSandbox(ctx context.Context, criPod *sstore.Sandbox) error {
if a.IsDisabled() {
return nil
}
pod := a.nriPodSandbox(criPod) pod := a.nriPodSandbox(criPod)
err := a.nri.RemovePodSandbox(ctx, pod) err := a.nri.RemovePodSandbox(ctx, pod)
@ -98,10 +117,10 @@ func (a *nriAPI) removePodSandbox(ctx context.Context, criPod *sstore.Sandbox) e
return err return err
} }
func (a *nriAPI) createContainer(ctx context.Context, ctrs *containers.Container, spec *specs.Spec) (*api.ContainerAdjustment, error) { func (a *API) CreateContainer(ctx context.Context, ctrs *containers.Container, spec *specs.Spec) (*api.ContainerAdjustment, error) {
ctr := a.nriContainer(ctrs, spec) ctr := a.nriContainer(ctrs, spec)
criPod, err := a.cri.sandboxStore.Get(ctr.GetPodSandboxID()) criPod, err := a.cri.SandboxStore().Get(ctr.GetPodSandboxID())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -113,7 +132,11 @@ func (a *nriAPI) createContainer(ctx context.Context, ctrs *containers.Container
return adjust, err return adjust, err
} }
func (a *nriAPI) postCreateContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error { func (a *API) PostCreateContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error {
if a.IsDisabled() {
return nil
}
pod := a.nriPodSandbox(criPod) pod := a.nriPodSandbox(criPod)
ctr := a.nriContainer(criCtr, nil) ctr := a.nriContainer(criCtr, nil)
@ -122,7 +145,11 @@ func (a *nriAPI) postCreateContainer(ctx context.Context, criPod *sstore.Sandbox
return err return err
} }
func (a *nriAPI) startContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error { func (a *API) StartContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error {
if a.IsDisabled() {
return nil
}
pod := a.nriPodSandbox(criPod) pod := a.nriPodSandbox(criPod)
ctr := a.nriContainer(criCtr, nil) ctr := a.nriContainer(criCtr, nil)
@ -131,7 +158,11 @@ func (a *nriAPI) startContainer(ctx context.Context, criPod *sstore.Sandbox, cri
return err return err
} }
func (a *nriAPI) postStartContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error { func (a *API) PostStartContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error {
if a.IsDisabled() {
return nil
}
pod := a.nriPodSandbox(criPod) pod := a.nriPodSandbox(criPod)
ctr := a.nriContainer(criCtr, nil) ctr := a.nriContainer(criCtr, nil)
@ -140,7 +171,11 @@ func (a *nriAPI) postStartContainer(ctx context.Context, criPod *sstore.Sandbox,
return err return err
} }
func (a *nriAPI) updateContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container, req *cri.LinuxContainerResources) (*cri.LinuxContainerResources, error) { func (a *API) UpdateContainerResources(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container, req *cri.LinuxContainerResources) (*cri.LinuxContainerResources, error) {
if a.IsDisabled() {
return nil, nil
}
const noOomAdj = 0 const noOomAdj = 0
pod := a.nriPodSandbox(criPod) pod := a.nriPodSandbox(criPod)
@ -154,7 +189,11 @@ func (a *nriAPI) updateContainer(ctx context.Context, criPod *sstore.Sandbox, cr
return r.ToCRI(noOomAdj), nil return r.ToCRI(noOomAdj), nil
} }
func (a *nriAPI) postUpdateContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error { func (a *API) PostUpdateContainerResources(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error {
if a.IsDisabled() {
return nil
}
pod := a.nriPodSandbox(criPod) pod := a.nriPodSandbox(criPod)
ctr := a.nriContainer(criCtr, nil) ctr := a.nriContainer(criCtr, nil)
@ -163,7 +202,11 @@ func (a *nriAPI) postUpdateContainer(ctx context.Context, criPod *sstore.Sandbox
return err return err
} }
func (a *nriAPI) stopContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error { func (a *API) StopContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error {
if a.IsDisabled() {
return nil
}
ctr := a.nriContainer(criCtr, nil) ctr := a.nriContainer(criCtr, nil)
if criPod == nil || criPod.ID == "" { if criPod == nil || criPod.ID == "" {
@ -180,10 +223,14 @@ func (a *nriAPI) stopContainer(ctx context.Context, criPod *sstore.Sandbox, criC
return err return err
} }
func (a *nriAPI) notifyContainerExit(ctx context.Context, criCtr *cstore.Container) { func (a *API) NotifyContainerExit(ctx context.Context, criCtr *cstore.Container) {
if a.IsDisabled() {
return
}
ctr := a.nriContainer(criCtr, nil) ctr := a.nriContainer(criCtr, nil)
criPod, _ := a.cri.sandboxStore.Get(ctr.GetPodSandboxID()) criPod, _ := a.cri.SandboxStore().Get(ctr.GetPodSandboxID())
if criPod.ID == "" { if criPod.ID == "" {
criPod = sstore.Sandbox{ criPod = sstore.Sandbox{
Metadata: sstore.Metadata{ Metadata: sstore.Metadata{
@ -196,7 +243,11 @@ func (a *nriAPI) notifyContainerExit(ctx context.Context, criCtr *cstore.Contain
a.nri.NotifyContainerExit(ctx, pod, ctr) a.nri.NotifyContainerExit(ctx, pod, ctr)
} }
func (a *nriAPI) removeContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error { func (a *API) RemoveContainer(ctx context.Context, criPod *sstore.Sandbox, criCtr *cstore.Container) error {
if a.IsDisabled() {
return nil
}
pod := a.nriPodSandbox(criPod) pod := a.nriPodSandbox(criPod)
ctr := a.nriContainer(criCtr, nil) ctr := a.nriContainer(criCtr, nil)
@ -205,7 +256,11 @@ func (a *nriAPI) removeContainer(ctx context.Context, criPod *sstore.Sandbox, cr
return err return err
} }
func (a *nriAPI) undoCreateContainer(ctx context.Context, criPod *sstore.Sandbox, id string, spec *specs.Spec) { func (a *API) UndoCreateContainer(ctx context.Context, criPod *sstore.Sandbox, id string, spec *specs.Spec) {
if a.IsDisabled() {
return
}
pod := a.nriPodSandbox(criPod) pod := a.nriPodSandbox(criPod)
ctr := a.nriContainer(&containers.Container{ID: id}, spec) ctr := a.nriContainer(&containers.Container{ID: id}, spec)
@ -220,11 +275,17 @@ func (a *nriAPI) undoCreateContainer(ctx context.Context, criPod *sstore.Sandbox
} }
} }
func (a *nriAPI) WithContainerAdjustment() containerd.NewContainerOpts { func (a *API) WithContainerAdjustment() containerd.NewContainerOpts {
if a.IsDisabled() {
return func(context.Context, *containerd.Client, *containers.Container) error {
return nil
}
}
resourceCheckOpt := nrigen.WithResourceChecker( resourceCheckOpt := nrigen.WithResourceChecker(
func(r *runtimespec.LinuxResources) error { func(r *runtimespec.LinuxResources) error {
if r != nil { if r != nil {
if a.cri.config.DisableHugetlbController { if a.cri.Config().DisableHugetlbController {
r.HugepageLimits = nil r.HugepageLimits = nil
} }
} }
@ -262,7 +323,7 @@ func (a *nriAPI) WithContainerAdjustment() containerd.NewContainerOpts {
return fmt.Errorf("failed to unmarshal container OCI Spec for NRI: %w", err) return fmt.Errorf("failed to unmarshal container OCI Spec for NRI: %w", err)
} }
adjust, err := a.createContainer(ctx, c, spec) adjust, err := a.CreateContainer(ctx, c, spec)
if err != nil { if err != nil {
return fmt.Errorf("failed to get NRI adjustment for container: %w", err) return fmt.Errorf("failed to get NRI adjustment for container: %w", err)
} }
@ -285,15 +346,15 @@ func (a *nriAPI) WithContainerAdjustment() containerd.NewContainerOpts {
} }
} }
func (a *nriAPI) WithContainerExit(criCtr *cstore.Container) containerd.ProcessDeleteOpts { func (a *API) WithContainerExit(criCtr *cstore.Container) containerd.ProcessDeleteOpts {
if !a.isEnabled() { if a.IsDisabled() {
return func(_ context.Context, _ containerd.Process) error { return func(_ context.Context, _ containerd.Process) error {
return nil return nil
} }
} }
return func(_ context.Context, _ containerd.Process) error { return func(_ context.Context, _ containerd.Process) error {
a.notifyContainerExit(context.Background(), criCtr) a.NotifyContainerExit(context.Background(), criCtr)
return nil return nil
} }
} }
@ -310,13 +371,13 @@ const (
nriDomain = constants.K8sContainerdNamespace nriDomain = constants.K8sContainerdNamespace
) )
func (a *nriAPI) GetName() string { func (a *API) GetName() string {
return nriDomain return nriDomain
} }
func (a *nriAPI) ListPodSandboxes() []nri.PodSandbox { func (a *API) ListPodSandboxes() []nri.PodSandbox {
pods := []nri.PodSandbox{} pods := []nri.PodSandbox{}
for _, pod := range a.cri.sandboxStore.List() { for _, pod := range a.cri.SandboxStore().List() {
if pod.Status.Get().State != sstore.StateUnknown { if pod.Status.Get().State != sstore.StateUnknown {
pod := pod pod := pod
pods = append(pods, a.nriPodSandbox(&pod)) pods = append(pods, a.nriPodSandbox(&pod))
@ -325,9 +386,9 @@ func (a *nriAPI) ListPodSandboxes() []nri.PodSandbox {
return pods return pods
} }
func (a *nriAPI) ListContainers() []nri.Container { func (a *API) ListContainers() []nri.Container {
containers := []nri.Container{} containers := []nri.Container{}
for _, ctr := range a.cri.containerStore.List() { for _, ctr := range a.cri.ContainerStore().List() {
switch ctr.Status.Get().State() { switch ctr.Status.Get().State() {
case cri.ContainerState_CONTAINER_EXITED: case cri.ContainerState_CONTAINER_EXITED:
continue continue
@ -340,8 +401,8 @@ func (a *nriAPI) ListContainers() []nri.Container {
return containers return containers
} }
func (a *nriAPI) GetPodSandbox(id string) (nri.PodSandbox, bool) { func (a *API) GetPodSandbox(id string) (nri.PodSandbox, bool) {
pod, err := a.cri.sandboxStore.Get(id) pod, err := a.cri.SandboxStore().Get(id)
if err != nil { if err != nil {
return nil, false return nil, false
} }
@ -349,8 +410,8 @@ func (a *nriAPI) GetPodSandbox(id string) (nri.PodSandbox, bool) {
return a.nriPodSandbox(&pod), true return a.nriPodSandbox(&pod), true
} }
func (a *nriAPI) GetContainer(id string) (nri.Container, bool) { func (a *API) GetContainer(id string) (nri.Container, bool) {
ctr, err := a.cri.containerStore.Get(id) ctr, err := a.cri.ContainerStore().Get(id)
if err != nil { if err != nil {
return nil, false return nil, false
} }
@ -358,8 +419,8 @@ func (a *nriAPI) GetContainer(id string) (nri.Container, bool) {
return a.nriContainer(&ctr, nil), true return a.nriContainer(&ctr, nil), true
} }
func (a *nriAPI) UpdateContainer(ctx context.Context, u *api.ContainerUpdate) error { func (a *API) UpdateContainer(ctx context.Context, u *api.ContainerUpdate) error {
ctr, err := a.cri.containerStore.Get(u.ContainerId) ctr, err := a.cri.ContainerStore().Get(u.ContainerId)
if err != nil { if err != nil {
return nil return nil
} }
@ -370,7 +431,7 @@ func (a *nriAPI) UpdateContainer(ctx context.Context, u *api.ContainerUpdate) er
ContainerId: u.ContainerId, ContainerId: u.ContainerId,
Linux: u.GetLinux().GetResources().ToCRI(0), Linux: u.GetLinux().GetResources().ToCRI(0),
} }
newStatus, err := a.cri.updateContainerResources(ctx, ctr, criReq, status) newStatus, err := a.cri.UpdateContainerResources(ctx, ctr, criReq, status)
return newStatus, err return newStatus, err
}, },
) )
@ -383,12 +444,12 @@ func (a *nriAPI) UpdateContainer(ctx context.Context, u *api.ContainerUpdate) er
return nil return nil
} }
func (a *nriAPI) EvictContainer(ctx context.Context, e *api.ContainerEviction) error { func (a *API) EvictContainer(ctx context.Context, e *api.ContainerEviction) error {
ctr, err := a.cri.containerStore.Get(e.ContainerId) ctr, err := a.cri.ContainerStore().Get(e.ContainerId)
if err != nil { if err != nil {
return nil return nil
} }
err = a.cri.stopContainer(ctx, ctr, 0) err = a.cri.StopContainer(ctx, ctr, 0)
if err != nil { if err != nil {
return err return err
} }
@ -406,7 +467,7 @@ type criPodSandbox struct {
pid uint32 pid uint32
} }
func (a *nriAPI) nriPodSandbox(pod *sstore.Sandbox) *criPodSandbox { func (a *API) nriPodSandbox(pod *sstore.Sandbox) *criPodSandbox {
criPod := &criPodSandbox{ criPod := &criPodSandbox{
Sandbox: pod, Sandbox: pod,
spec: &specs.Spec{}, spec: &specs.Spec{},
@ -581,14 +642,14 @@ func (p *criPodSandbox) GetPid() uint32 {
// //
type criContainer struct { type criContainer struct {
api *nriAPI api *API
ctrs *containers.Container ctrs *containers.Container
spec *specs.Spec spec *specs.Spec
meta *cstore.Metadata meta *cstore.Metadata
pid uint32 pid uint32
} }
func (a *nriAPI) nriContainer(ctr interface{}, spec *specs.Spec) *criContainer { func (a *API) nriContainer(ctr interface{}, spec *specs.Spec) *criContainer {
switch c := ctr.(type) { switch c := ctr.(type) {
case *cstore.Container: case *cstore.Container:
ctx := ctrdutil.NamespacedContext() ctx := ctrdutil.NamespacedContext()
@ -623,7 +684,7 @@ func (a *nriAPI) nriContainer(ctr interface{}, spec *specs.Spec) *criContainer {
case *containers.Container: case *containers.Container:
ctrs := c ctrs := c
meta := &cstore.Metadata{} meta := &cstore.Metadata{}
if ext := ctrs.Extensions[containerMetadataExtension]; ext != nil { if ext := ctrs.Extensions[a.cri.ContainerMetadataExtensionKey()]; ext != nil {
err := typeurl.UnmarshalTo(ext, meta) err := typeurl.UnmarshalTo(ext, meta)
if err != nil { if err != nil {
log.L.WithError(err).Errorf("failed to get metadata for container %s", ctrs.ID) log.L.WithError(err).Errorf("failed to get metadata for container %s", ctrs.ID)
@ -666,7 +727,7 @@ func (c *criContainer) GetName() string {
} }
func (c *criContainer) GetState() api.ContainerState { func (c *criContainer) GetState() api.ContainerState {
criCtr, err := c.api.cri.containerStore.Get(c.GetID()) criCtr, err := c.api.cri.ContainerStore().Get(c.GetID())
if err != nil { if err != nil {
return api.ContainerState_CONTAINER_UNKNOWN return api.ContainerState_CONTAINER_UNKNOWN
} }

View File

@ -0,0 +1,145 @@
//go:build !linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nri
import (
"context"
"github.com/containerd/containerd"
"github.com/containerd/containerd/containers"
cstore "github.com/containerd/containerd/pkg/cri/store/container"
sstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
"github.com/opencontainers/runtime-spec/specs-go"
cri "k8s.io/cri-api/pkg/apis/runtime/v1"
"github.com/containerd/containerd/pkg/cri/constants"
"github.com/containerd/containerd/pkg/nri"
"github.com/containerd/nri/pkg/api"
)
type API struct {
}
func NewAPI(nri.API) *API {
return nil
}
func (a *API) Register(CRIImplementation) error {
return nil
}
func (a *API) IsEnabled() bool {
return false
}
//
// CRI-NRI lifecycle hook no-op interface
//
func (*API) RunPodSandbox(context.Context, *sstore.Sandbox) error {
return nil
}
func (*API) StopPodSandbox(context.Context, *sstore.Sandbox) error {
return nil
}
func (*API) RemovePodSandbox(context.Context, *sstore.Sandbox) error {
return nil
}
func (*API) PostCreateContainer(context.Context, *sstore.Sandbox, *cstore.Container) error {
return nil
}
func (*API) StartContainer(context.Context, *sstore.Sandbox, *cstore.Container) error {
return nil
}
func (*API) PostStartContainer(context.Context, *sstore.Sandbox, *cstore.Container) error {
return nil
}
func (*API) UpdateContainerResources(context.Context, *sstore.Sandbox, *cstore.Container, *cri.LinuxContainerResources) (*cri.LinuxContainerResources, error) {
return nil, nil
}
func (*API) PostUpdateContainerResources(context.Context, *sstore.Sandbox, *cstore.Container) error {
return nil
}
func (*API) StopContainer(context.Context, *sstore.Sandbox, *cstore.Container) error {
return nil
}
func (*API) RemoveContainer(context.Context, *sstore.Sandbox, *cstore.Container) error {
return nil
}
func (*API) UndoCreateContainer(context.Context, *sstore.Sandbox, string, *specs.Spec) {
}
func (*API) WithContainerAdjustment() containerd.NewContainerOpts {
return func(ctx context.Context, _ *containerd.Client, c *containers.Container) error {
return nil
}
}
func (*API) WithContainerExit(*cstore.Container) containerd.ProcessDeleteOpts {
return func(_ context.Context, _ containerd.Process) error {
return nil
}
}
//
// NRI-CRI no-op 'domain' interface
//
const (
nriDomain = constants.K8sContainerdNamespace
)
func (*API) GetName() string {
return nriDomain
}
func (*API) ListPodSandboxes() []nri.PodSandbox {
return nil
}
func (*API) ListContainers() []nri.Container {
return nil
}
func (*API) GetPodSandbox(string) (nri.PodSandbox, bool) {
return nil, false
}
func (*API) GetContainer(string) (nri.Container, bool) {
return nil, false
}
func (*API) UpdateContainer(context.Context, *api.ContainerUpdate) error {
return nil
}
func (*API) EvictContainer(context.Context, *api.ContainerEviction) error {
return nil
}

View File

@ -278,6 +278,15 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
opts = append(opts, containerd.WithSandbox(sandboxID)) opts = append(opts, containerd.WithSandbox(sandboxID))
} }
opts = append(opts, c.nri.WithContainerAdjustment())
defer func() {
if retErr != nil {
deferCtx, deferCancel := util.DeferContext()
defer deferCancel()
c.nri.UndoCreateContainer(deferCtx, &sandbox, id, spec)
}
}()
var cntr containerd.Container var cntr containerd.Container
if cntr, err = c.client.NewContainer(ctx, id, opts...); err != nil { if cntr, err = c.client.NewContainer(ctx, id, opts...); err != nil {
return nil, fmt.Errorf("failed to create containerd container: %w", err) return nil, fmt.Errorf("failed to create containerd container: %w", err)
@ -318,6 +327,11 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
c.generateAndSendContainerEvent(ctx, id, sandboxID, runtime.ContainerEventType_CONTAINER_CREATED_EVENT) c.generateAndSendContainerEvent(ctx, id, sandboxID, runtime.ContainerEventType_CONTAINER_CREATED_EVENT)
err = c.nri.PostCreateContainer(ctx, &sandbox, &container)
if err != nil {
log.G(ctx).WithError(err).Errorf("NRI post-create notification failed")
}
containerCreateTimer.WithValues(ociRuntime.Type).UpdateSince(start) containerCreateTimer.WithValues(ociRuntime.Type).UpdateSince(start)
return &runtime.CreateContainerResponse{ContainerId: id}, nil return &runtime.CreateContainerResponse{ContainerId: id}, nil

View File

@ -73,6 +73,16 @@ func (c *criService) RemoveContainer(ctx context.Context, r *runtime.RemoveConta
} }
}() }()
sandbox, err := c.sandboxStore.Get(container.SandboxID)
if err != nil {
err = c.nri.RemoveContainer(ctx, nil, &container)
} else {
err = c.nri.RemoveContainer(ctx, &sandbox, &container)
}
if err != nil {
log.G(ctx).WithError(err).Error("NRI failed to remove container")
}
// NOTE(random-liu): Docker set container to "Dead" state when start removing the // NOTE(random-liu): Docker set container to "Dead" state when start removing the
// container so as to avoid start/restart the container again. However, for current // container so as to avoid start/restart the container again. However, for current
// kubelet implementation, we'll never start a container once we decide to remove it, // kubelet implementation, we'll never start a container once we decide to remove it,

View File

@ -27,8 +27,6 @@ import (
containerdio "github.com/containerd/containerd/cio" containerdio "github.com/containerd/containerd/cio"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/nri"
v1 "github.com/containerd/nri/types/v1"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1" runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
@ -135,7 +133,7 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
deferCtx, deferCancel := ctrdutil.DeferContext() deferCtx, deferCancel := ctrdutil.DeferContext()
defer deferCancel() defer deferCancel()
// It's possible that task is deleted by event monitor. // It's possible that task is deleted by event monitor.
if _, err := task.Delete(deferCtx, WithNRISandboxDelete(sandboxID), containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) { if _, err := task.Delete(deferCtx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
log.G(ctx).WithError(err).Errorf("Failed to delete containerd task %q", id) log.G(ctx).WithError(err).Errorf("Failed to delete containerd task %q", id)
} }
} }
@ -146,18 +144,22 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to wait for containerd task: %w", err) return nil, fmt.Errorf("failed to wait for containerd task: %w", err)
} }
nric, err := nri.New()
defer func() {
if retErr != nil {
deferCtx, deferCancel := ctrdutil.DeferContext()
defer deferCancel()
err = c.nri.StopContainer(deferCtx, &sandbox, &cntr)
if err != nil { if err != nil {
log.G(ctx).WithError(err).Error("unable to create nri client") log.G(ctx).WithError(err).Errorf("NRI stop failed for failed container %q", id)
} }
if nric != nil {
nriSB := &nri.Sandbox{
ID: sandboxID,
Labels: sandbox.Config.Labels,
}
if _, err := nric.InvokeWithSandbox(ctx, task, v1.Create, nriSB); err != nil {
return nil, fmt.Errorf("nri invoke: %w", err)
} }
}()
err = c.nri.StartContainer(ctx, &sandbox, &cntr)
if err != nil {
log.G(ctx).WithError(err).Errorf("NRI container start failed")
return nil, fmt.Errorf("NRI container start failed: %w", err)
} }
// Start containerd task. // Start containerd task.
@ -179,6 +181,11 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
c.generateAndSendContainerEvent(ctx, id, sandboxID, runtime.ContainerEventType_CONTAINER_STARTED_EVENT) c.generateAndSendContainerEvent(ctx, id, sandboxID, runtime.ContainerEventType_CONTAINER_STARTED_EVENT)
err = c.nri.PostStartContainer(ctx, &sandbox, &cntr)
if err != nil {
log.G(ctx).WithError(err).Errorf("NRI post-start notification failed")
}
containerStartTimer.WithValues(info.Runtime.Name).UpdateSince(start) containerStartTimer.WithValues(info.Runtime.Name).UpdateSince(start)
return &runtime.StartContainerResponse{}, nil return &runtime.StartContainerResponse{}, nil

View File

@ -47,6 +47,16 @@ func (c *criService) StopContainer(ctx context.Context, r *runtime.StopContainer
return nil, err return nil, err
} }
sandbox, err := c.sandboxStore.Get(container.SandboxID)
if err != nil {
err = c.nri.StopContainer(ctx, nil, &container)
} else {
err = c.nri.StopContainer(ctx, &sandbox, &container)
}
if err != nil {
log.G(ctx).WithError(err).Error("NRI failed to stop container")
}
i, err := container.Container.Info(ctx) i, err := container.Container.Info(ctx)
if err != nil { if err != nil {
return nil, fmt.Errorf("get container info: %w", err) return nil, fmt.Errorf("get container info: %w", err)

View File

@ -42,6 +42,21 @@ func (c *criService) UpdateContainerResources(ctx context.Context, r *runtime.Up
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to find container: %w", err) return nil, fmt.Errorf("failed to find container: %w", err)
} }
sandbox, err := c.sandboxStore.Get(container.SandboxID)
if err != nil {
return nil, err
}
resources := r.GetLinux()
updated, err := c.nri.UpdateContainerResources(ctx, &sandbox, &container, resources)
if err != nil {
return nil, fmt.Errorf("NRI container update failed: %w", err)
}
if updated != nil {
*resources = *updated
}
// Update resources in status update transaction, so that: // Update resources in status update transaction, so that:
// 1) There won't be race condition with container start. // 1) There won't be race condition with container start.
// 2) There won't be concurrent resource update to the same container. // 2) There won't be concurrent resource update to the same container.
@ -50,6 +65,12 @@ func (c *criService) UpdateContainerResources(ctx context.Context, r *runtime.Up
}); err != nil { }); err != nil {
return nil, fmt.Errorf("failed to update resources: %w", err) return nil, fmt.Errorf("failed to update resources: %w", err)
} }
err = c.nri.PostUpdateContainerResources(ctx, &sandbox, &container)
if err != nil {
log.G(ctx).WithError(err).Errorf("NRI post-update notification failed")
}
return &runtime.UpdateContainerResourcesResponse{}, nil return &runtime.UpdateContainerResourcesResponse{}, nil
} }

View File

@ -385,7 +385,7 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta
} }
} else { } else {
// TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker // TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker
if _, err = task.Delete(ctx, WithNRISandboxDelete(cntr.SandboxID), containerd.WithProcessKill); err != nil { if _, err = task.Delete(ctx, c.nri.WithContainerExit(&cntr), containerd.WithProcessKill); err != nil {
if !errdefs.IsNotFound(err) { if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to stop container: %w", err) return fmt.Errorf("failed to stop container: %w", err)
} }
@ -428,7 +428,7 @@ func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxst
} }
} else { } else {
// TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker // TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker
if _, err = task.Delete(ctx, WithNRISandboxDelete(sb.ID), containerd.WithProcessKill); err != nil { if _, err = task.Delete(ctx, containerd.WithProcessKill); err != nil {
if !errdefs.IsNotFound(err) { if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to stop sandbox: %w", err) return fmt.Errorf("failed to stop sandbox: %w", err)
} }

43
pkg/cri/sbserver/nri.go Normal file
View File

@ -0,0 +1,43 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sbserver
import (
criconfig "github.com/containerd/containerd/pkg/cri/config"
cstore "github.com/containerd/containerd/pkg/cri/store/container"
sstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
)
type criImplementation struct {
c *criService
}
func (i *criImplementation) Config() *criconfig.Config {
return &i.c.config
}
func (i *criImplementation) SandboxStore() *sstore.Store {
return i.c.sandboxStore
}
func (i *criImplementation) ContainerStore() *cstore.Store {
return i.c.containerStore
}
func (i *criImplementation) ContainerMetadataExtensionKey() string {
return containerMetadataExtension
}

View File

@ -0,0 +1,35 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sbserver
import (
"context"
"time"
cstore "github.com/containerd/containerd/pkg/cri/store/container"
cri "k8s.io/cri-api/pkg/apis/runtime/v1"
)
func (i *criImplementation) UpdateContainerResources(ctx context.Context, ctr cstore.Container, req *cri.UpdateContainerResourcesRequest, status cstore.Status) (cstore.Status, error) {
return i.c.updateContainerResources(ctx, ctr, req, status)
}
func (i *criImplementation) StopContainer(ctx context.Context, ctr cstore.Container, timeout time.Duration) error {
return i.c.stopContainer(ctx, ctr, timeout)
}

View File

@ -0,0 +1,35 @@
//go:build !linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sbserver
import (
"context"
"time"
cstore "github.com/containerd/containerd/pkg/cri/store/container"
cri "k8s.io/cri-api/pkg/apis/runtime/v1"
)
func (i *criImplementation) UpdateContainerResources(ctx context.Context, ctr cstore.Container, req *cri.UpdateContainerResourcesRequest, status cstore.Status) (cstore.Status, error) {
return cstore.Status{}, nil
}
func (i *criImplementation) StopContainer(ctx context.Context, ctr cstore.Container, timeout time.Duration) error {
return nil
}

View File

@ -1,51 +0,0 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sbserver
import (
"context"
"github.com/containerd/containerd"
"github.com/containerd/containerd/log"
"github.com/containerd/nri"
v1 "github.com/containerd/nri/types/v1"
)
// WithNRISandboxDelete calls delete for a sandbox'd task
func WithNRISandboxDelete(sandboxID string) containerd.ProcessDeleteOpts {
return func(ctx context.Context, p containerd.Process) error {
task, ok := p.(containerd.Task)
if !ok {
return nil
}
nric, err := nri.New()
if err != nil {
log.G(ctx).WithError(err).Error("unable to create nri client")
return nil
}
if nric == nil {
return nil
}
sb := &nri.Sandbox{
ID: sandboxID,
}
if _, err := nric.InvokeWithSandbox(ctx, task, v1.Delete, sb); err != nil {
log.G(ctx).WithError(err).Errorf("Failed to delete nri for %q", task.ID())
}
return nil
}
}

View File

@ -90,6 +90,11 @@ func (c *criService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodS
return nil, fmt.Errorf("failed to delete sandbox %q: %w", id, err) return nil, fmt.Errorf("failed to delete sandbox %q: %w", id, err)
} }
err = c.nri.RemovePodSandbox(ctx, &sandbox)
if err != nil {
log.G(ctx).WithError(err).Errorf("NRI pod removal notification failed")
}
// Remove sandbox from sandbox store. Note that once the sandbox is successfully // Remove sandbox from sandbox store. Note that once the sandbox is successfully
// deleted: // deleted:
// 1) ListPodSandbox will not include this sandbox. // 1) ListPodSandbox will not include this sandbox.

View File

@ -262,6 +262,19 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
sandbox.ProcessLabel = labels["selinux_label"] sandbox.ProcessLabel = labels["selinux_label"]
err = c.nri.RunPodSandbox(ctx, &sandbox)
if err != nil {
return nil, fmt.Errorf("NRI RunPodSandbox failed: %w", err)
}
defer func() {
if retErr != nil {
deferCtx, deferCancel := util.DeferContext()
defer deferCancel()
c.nri.RemovePodSandbox(deferCtx, &sandbox)
}
}()
if err := sandbox.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) { if err := sandbox.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
// Set the pod sandbox as ready after successfully start sandbox container. // Set the pod sandbox as ready after successfully start sandbox container.
status.Pid = ctrl.Pid status.Pid = ctrl.Pid

View File

@ -80,6 +80,11 @@ func (c *criService) stopPodSandbox(ctx context.Context, sandbox sandboxstore.Sa
sandboxRuntimeStopTimer.WithValues(sandbox.RuntimeHandler).UpdateSince(stop) sandboxRuntimeStopTimer.WithValues(sandbox.RuntimeHandler).UpdateSince(stop)
err := c.nri.StopPodSandbox(ctx, &sandbox)
if err != nil {
log.G(ctx).WithError(err).Errorf("NRI sandbox stop notification failed")
}
// Teardown network for sandbox. // Teardown network for sandbox.
if sandbox.NetNS != nil { if sandbox.NetNS != nil {
netStop := time.Now() netStop := time.Now()

View File

@ -30,6 +30,7 @@ import (
"github.com/containerd/containerd" "github.com/containerd/containerd"
"github.com/containerd/containerd/oci" "github.com/containerd/containerd/oci"
"github.com/containerd/containerd/pkg/cri/instrument" "github.com/containerd/containerd/pkg/cri/instrument"
"github.com/containerd/containerd/pkg/cri/nri"
"github.com/containerd/containerd/pkg/cri/sbserver/podsandbox" "github.com/containerd/containerd/pkg/cri/sbserver/podsandbox"
"github.com/containerd/containerd/pkg/cri/streaming" "github.com/containerd/containerd/pkg/cri/streaming"
"github.com/containerd/containerd/pkg/kmutex" "github.com/containerd/containerd/pkg/kmutex"
@ -120,10 +121,12 @@ type criService struct {
// containerEventsChan is used to capture container events and send them // containerEventsChan is used to capture container events and send them
// to the caller of GetContainerEvents. // to the caller of GetContainerEvents.
containerEventsChan chan runtime.ContainerEventResponse containerEventsChan chan runtime.ContainerEventResponse
// nri is used to hook NRI into CRI request processing.
nri *nri.API
} }
// NewCRIService returns a new instance of CRIService // NewCRIService returns a new instance of CRIService
func NewCRIService(config criconfig.Config, client *containerd.Client) (CRIService, error) { func NewCRIService(config criconfig.Config, client *containerd.Client, nri *nri.API) (CRIService, error) {
var err error var err error
labels := label.NewStore() labels := label.NewStore()
c := &criService{ c := &criService{
@ -191,6 +194,8 @@ func NewCRIService(config criconfig.Config, client *containerd.Client) (CRIServi
c.sandboxControllers[criconfig.ModePodSandbox] = podsandbox.New(config, client, c.sandboxStore, c.os, c, c.baseOCISpecs) c.sandboxControllers[criconfig.ModePodSandbox] = podsandbox.New(config, client, c.sandboxStore, c.os, c, c.baseOCISpecs)
c.sandboxControllers[criconfig.ModeShim] = client.SandboxController() c.sandboxControllers[criconfig.ModeShim] = client.SandboxController()
c.nri = nri
return c, nil return c, nil
} }
@ -271,6 +276,11 @@ func (c *criService) Run() error {
} }
}() }()
// register CRI domain with NRI
if err := c.nri.Register(&criImplementation{c}); err != nil {
return fmt.Errorf("failed to set up NRI for CRI service: %w", err)
}
// Set the server as initialized. GRPC services could start serving traffic. // Set the server as initialized. GRPC services could start serving traffic.
c.initialized.Set() c.initialized.Set()

View File

@ -249,17 +249,14 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
containerd.WithContainerLabels(containerLabels), containerd.WithContainerLabels(containerLabels),
containerd.WithContainerExtension(containerMetadataExtension, &meta)) containerd.WithContainerExtension(containerMetadataExtension, &meta))
if c.nri.isEnabled() {
opts = append(opts, c.nri.WithContainerAdjustment()) opts = append(opts, c.nri.WithContainerAdjustment())
defer func() { defer func() {
if retErr != nil { if retErr != nil {
deferCtx, deferCancel := util.DeferContext() deferCtx, deferCancel := util.DeferContext()
defer deferCancel() defer deferCancel()
c.nri.undoCreateContainer(deferCtx, &sandbox, id, spec) c.nri.UndoCreateContainer(deferCtx, &sandbox, id, spec)
} }
}() }()
}
var cntr containerd.Container var cntr containerd.Container
if cntr, err = c.client.NewContainer(ctx, id, opts...); err != nil { if cntr, err = c.client.NewContainer(ctx, id, opts...); err != nil {
@ -300,12 +297,10 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
} }
c.generateAndSendContainerEvent(ctx, id, sandboxID, runtime.ContainerEventType_CONTAINER_CREATED_EVENT) c.generateAndSendContainerEvent(ctx, id, sandboxID, runtime.ContainerEventType_CONTAINER_CREATED_EVENT)
if c.nri.isEnabled() { err = c.nri.PostCreateContainer(ctx, &sandbox, &container)
err = c.nri.postCreateContainer(ctx, &sandbox, &container)
if err != nil { if err != nil {
log.G(ctx).WithError(err).Errorf("NRI post-create notification failed") log.G(ctx).WithError(err).Errorf("NRI post-create notification failed")
} }
}
containerCreateTimer.WithValues(ociRuntime.Type).UpdateSince(start) containerCreateTimer.WithValues(ociRuntime.Type).UpdateSince(start)

View File

@ -73,17 +73,15 @@ func (c *criService) RemoveContainer(ctx context.Context, r *runtime.RemoveConta
} }
}() }()
if c.nri.isEnabled() {
sandbox, err := c.sandboxStore.Get(container.SandboxID) sandbox, err := c.sandboxStore.Get(container.SandboxID)
if err != nil { if err != nil {
err = c.nri.removeContainer(ctx, nil, &container) err = c.nri.RemoveContainer(ctx, nil, &container)
} else { } else {
err = c.nri.removeContainer(ctx, &sandbox, &container) err = c.nri.RemoveContainer(ctx, &sandbox, &container)
} }
if err != nil { if err != nil {
log.G(ctx).WithError(err).Error("NRI failed to remove container") log.G(ctx).WithError(err).Error("NRI failed to remove container")
} }
}
// NOTE(random-liu): Docker set container to "Dead" state when start removing the // NOTE(random-liu): Docker set container to "Dead" state when start removing the
// container so as to avoid start/restart the container again. However, for current // container so as to avoid start/restart the container again. However, for current

View File

@ -149,19 +149,17 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
if retErr != nil { if retErr != nil {
deferCtx, deferCancel := ctrdutil.DeferContext() deferCtx, deferCancel := ctrdutil.DeferContext()
defer deferCancel() defer deferCancel()
err = c.nri.stopContainer(deferCtx, &sandbox, &cntr) err = c.nri.StopContainer(deferCtx, &sandbox, &cntr)
if err != nil { if err != nil {
log.G(ctx).WithError(err).Errorf("NRI stop failed for failed container %q", id) log.G(ctx).WithError(err).Errorf("NRI stop failed for failed container %q", id)
} }
} }
}() }()
if c.nri.isEnabled() { err = c.nri.StartContainer(ctx, &sandbox, &cntr)
err = c.nri.startContainer(ctx, &sandbox, &cntr)
if err != nil { if err != nil {
log.G(ctx).WithError(err).Errorf("NRI container start failed") log.G(ctx).WithError(err).Errorf("NRI container start failed")
return nil, fmt.Errorf("NRI container start failed: %w", err) return nil, fmt.Errorf("NRI container start failed: %w", err)
} }
}
// Start containerd task. // Start containerd task.
if err := task.Start(ctx); err != nil { if err := task.Start(ctx); err != nil {
@ -182,12 +180,10 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
c.generateAndSendContainerEvent(ctx, id, sandboxID, runtime.ContainerEventType_CONTAINER_STARTED_EVENT) c.generateAndSendContainerEvent(ctx, id, sandboxID, runtime.ContainerEventType_CONTAINER_STARTED_EVENT)
if c.nri.isEnabled() { err = c.nri.PostStartContainer(ctx, &sandbox, &cntr)
err = c.nri.postStartContainer(ctx, &sandbox, &cntr)
if err != nil { if err != nil {
log.G(ctx).WithError(err).Errorf("NRI post-start notification failed") log.G(ctx).WithError(err).Errorf("NRI post-start notification failed")
} }
}
containerStartTimer.WithValues(info.Runtime.Name).UpdateSince(start) containerStartTimer.WithValues(info.Runtime.Name).UpdateSince(start)

View File

@ -47,17 +47,15 @@ func (c *criService) StopContainer(ctx context.Context, r *runtime.StopContainer
return nil, err return nil, err
} }
if c.nri.isEnabled() {
sandbox, err := c.sandboxStore.Get(container.SandboxID) sandbox, err := c.sandboxStore.Get(container.SandboxID)
if err != nil { if err != nil {
err = c.nri.stopContainer(ctx, nil, &container) err = c.nri.StopContainer(ctx, nil, &container)
} else { } else {
err = c.nri.stopContainer(ctx, &sandbox, &container) err = c.nri.StopContainer(ctx, &sandbox, &container)
} }
if err != nil { if err != nil {
log.G(ctx).WithError(err).Error("NRI failed to stop container") log.G(ctx).WithError(err).Error("NRI failed to stop container")
} }
}
i, err := container.Container.Info(ctx) i, err := container.Container.Info(ctx)
if err != nil { if err != nil {

View File

@ -32,7 +32,6 @@ import (
runtime "k8s.io/cri-api/pkg/apis/runtime/v1" runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
containerstore "github.com/containerd/containerd/pkg/cri/store/container" containerstore "github.com/containerd/containerd/pkg/cri/store/container"
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
ctrdutil "github.com/containerd/containerd/pkg/cri/util" ctrdutil "github.com/containerd/containerd/pkg/cri/util"
) )
@ -43,22 +42,19 @@ func (c *criService) UpdateContainerResources(ctx context.Context, r *runtime.Up
return nil, fmt.Errorf("failed to find container: %w", err) return nil, fmt.Errorf("failed to find container: %w", err)
} }
var sandbox sandboxstore.Sandbox sandbox, err := c.sandboxStore.Get(container.SandboxID)
if c.nri.isEnabled() {
sandbox, err = c.sandboxStore.Get(container.SandboxID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
resources := r.GetLinux() resources := r.GetLinux()
updated, err := c.nri.updateContainer(ctx, &sandbox, &container, resources) updated, err := c.nri.UpdateContainerResources(ctx, &sandbox, &container, resources)
if err != nil { if err != nil {
return nil, fmt.Errorf("NRI container update failed: %w", err) return nil, fmt.Errorf("NRI container update failed: %w", err)
} }
if updated != nil { if updated != nil {
*resources = *updated *resources = *updated
} }
}
// Update resources in status update transaction, so that: // Update resources in status update transaction, so that:
// 1) There won't be race condition with container start. // 1) There won't be race condition with container start.
@ -69,12 +65,10 @@ func (c *criService) UpdateContainerResources(ctx context.Context, r *runtime.Up
return nil, fmt.Errorf("failed to update resources: %w", err) return nil, fmt.Errorf("failed to update resources: %w", err)
} }
if c.nri.isEnabled() { err = c.nri.PostUpdateContainerResources(ctx, &sandbox, &container)
err = c.nri.postUpdateContainer(ctx, &sandbox, &container)
if err != nil { if err != nil {
log.G(ctx).WithError(err).Errorf("NRI post-update notification failed") log.G(ctx).WithError(err).Errorf("NRI post-update notification failed")
} }
}
return &runtime.UpdateContainerResourcesResponse{}, nil return &runtime.UpdateContainerResourcesResponse{}, nil
} }

View File

@ -1,133 +0,0 @@
//go:build !linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"context"
"github.com/containerd/containerd"
"github.com/containerd/containerd/containers"
cstore "github.com/containerd/containerd/pkg/cri/store/container"
sstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/containerd/containerd/pkg/cri/constants"
"github.com/containerd/containerd/pkg/nri"
"github.com/containerd/nri/pkg/api"
)
type nriAPI struct {
cri *criService
nri nri.API
}
func (a *nriAPI) register() {
}
func (a *nriAPI) isEnabled() bool {
return false
}
//
// CRI-NRI lifecycle hook no-op interface
//
func (*nriAPI) runPodSandbox(context.Context, *sstore.Sandbox) error {
return nil
}
func (*nriAPI) stopPodSandbox(context.Context, *sstore.Sandbox) error {
return nil
}
func (*nriAPI) removePodSandbox(context.Context, *sstore.Sandbox) error {
return nil
}
func (*nriAPI) postCreateContainer(context.Context, *sstore.Sandbox, *cstore.Container) error {
return nil
}
func (*nriAPI) startContainer(context.Context, *sstore.Sandbox, *cstore.Container) error {
return nil
}
func (*nriAPI) postStartContainer(context.Context, *sstore.Sandbox, *cstore.Container) error {
return nil
}
func (*nriAPI) stopContainer(context.Context, *sstore.Sandbox, *cstore.Container) error {
return nil
}
func (*nriAPI) removeContainer(context.Context, *sstore.Sandbox, *cstore.Container) error {
return nil
}
func (*nriAPI) undoCreateContainer(context.Context, *sstore.Sandbox, string, *specs.Spec) {
}
func (*nriAPI) WithContainerAdjustment() containerd.NewContainerOpts {
return func(ctx context.Context, _ *containerd.Client, c *containers.Container) error {
return nil
}
}
func (*nriAPI) WithContainerExit(*cstore.Container) containerd.ProcessDeleteOpts {
return func(_ context.Context, _ containerd.Process) error {
return nil
}
}
//
// NRI-CRI no-op 'domain' interface
//
const (
nriDomain = constants.K8sContainerdNamespace
)
func (*nriAPI) GetName() string {
return nriDomain
}
func (*nriAPI) ListPodSandboxes() []nri.PodSandbox {
return nil
}
func (*nriAPI) ListContainers() []nri.Container {
return nil
}
func (*nriAPI) GetPodSandbox(string) (nri.PodSandbox, bool) {
return nil, false
}
func (*nriAPI) GetContainer(string) (nri.Container, bool) {
return nil, false
}
func (*nriAPI) UpdateContainer(context.Context, *api.ContainerUpdate) error {
return nil
}
func (*nriAPI) EvictContainer(context.Context, *api.ContainerEviction) error {
return nil
}

43
pkg/cri/server/nri.go Normal file
View File

@ -0,0 +1,43 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
criconfig "github.com/containerd/containerd/pkg/cri/config"
cstore "github.com/containerd/containerd/pkg/cri/store/container"
sstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
)
type criImplementation struct {
c *criService
}
func (i *criImplementation) Config() *criconfig.Config {
return &i.c.config
}
func (i *criImplementation) SandboxStore() *sstore.Store {
return i.c.sandboxStore
}
func (i *criImplementation) ContainerStore() *cstore.Store {
return i.c.containerStore
}
func (i *criImplementation) ContainerMetadataExtensionKey() string {
return containerMetadataExtension
}

View File

@ -0,0 +1,35 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"context"
"time"
cstore "github.com/containerd/containerd/pkg/cri/store/container"
cri "k8s.io/cri-api/pkg/apis/runtime/v1"
)
func (i *criImplementation) UpdateContainerResources(ctx context.Context, ctr cstore.Container, req *cri.UpdateContainerResourcesRequest, status cstore.Status) (cstore.Status, error) {
return i.c.updateContainerResources(ctx, ctr, req, status)
}
func (i *criImplementation) StopContainer(ctx context.Context, ctr cstore.Container, timeout time.Duration) error {
return i.c.stopContainer(ctx, ctr, timeout)
}

View File

@ -1,4 +1,4 @@
//go:build windows //go:build !linux
/* /*
Copyright The containerd Authors. Copyright The containerd Authors.
@ -20,17 +20,16 @@ package server
import ( import (
"context" "context"
"time"
cstore "github.com/containerd/containerd/pkg/cri/store/container" cstore "github.com/containerd/containerd/pkg/cri/store/container"
sstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
cri "k8s.io/cri-api/pkg/apis/runtime/v1" cri "k8s.io/cri-api/pkg/apis/runtime/v1"
) )
func (*nriAPI) updateContainer(context.Context, *sstore.Sandbox, *cstore.Container, *cri.LinuxContainerResources) (*cri.LinuxContainerResources, error) { func (i *criImplementation) UpdateContainerResources(ctx context.Context, ctr cstore.Container, req *cri.UpdateContainerResourcesRequest, status cstore.Status) (cstore.Status, error) {
return nil, nil return cstore.Status{}, nil
} }
func (*nriAPI) postUpdateContainer(context.Context, *sstore.Sandbox, *cstore.Container) error { func (i *criImplementation) StopContainer(ctx context.Context, ctr cstore.Container, timeout time.Duration) error {
return nil return nil
} }

View File

@ -101,12 +101,10 @@ func (c *criService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodS
log.G(ctx).Tracef("Remove called for sandbox container %q that does not exist", id) log.G(ctx).Tracef("Remove called for sandbox container %q that does not exist", id)
} }
if c.nri.isEnabled() { err = c.nri.RemovePodSandbox(ctx, &sandbox)
err = c.nri.removePodSandbox(ctx, &sandbox)
if err != nil { if err != nil {
log.G(ctx).WithError(err).Errorf("NRI pod removal notification failed") log.G(ctx).WithError(err).Errorf("NRI pod removal notification failed")
} }
}
// Remove sandbox from sandbox store. Note that once the sandbox is successfully // Remove sandbox from sandbox store. Note that once the sandbox is successfully
// deleted: // deleted:

View File

@ -461,8 +461,7 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
sandboxCreateNetworkTimer.UpdateSince(netStart) sandboxCreateNetworkTimer.UpdateSince(netStart)
} }
if c.nri.isEnabled() { err = c.nri.RunPodSandbox(ctx, &sandbox)
err = c.nri.runPodSandbox(ctx, &sandbox)
if err != nil { if err != nil {
return nil, fmt.Errorf("NRI RunPodSandbox failed: %w", err) return nil, fmt.Errorf("NRI RunPodSandbox failed: %w", err)
} }
@ -471,10 +470,9 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
if retErr != nil { if retErr != nil {
deferCtx, deferCancel := util.DeferContext() deferCtx, deferCancel := util.DeferContext()
defer deferCancel() defer deferCancel()
c.nri.removePodSandbox(deferCtx, &sandbox) c.nri.RemovePodSandbox(deferCtx, &sandbox)
} }
}() }()
}
if err := task.Start(ctx); err != nil { if err := task.Start(ctx); err != nil {
return nil, fmt.Errorf("failed to start sandbox container task %q: %w", id, err) return nil, fmt.Errorf("failed to start sandbox container task %q: %w", id, err)

View File

@ -82,12 +82,10 @@ func (c *criService) stopPodSandbox(ctx context.Context, sandbox sandboxstore.Sa
} }
sandboxRuntimeStopTimer.WithValues(sandbox.RuntimeHandler).UpdateSince(stop) sandboxRuntimeStopTimer.WithValues(sandbox.RuntimeHandler).UpdateSince(stop)
if c.nri.isEnabled() { err := c.nri.StopPodSandbox(ctx, &sandbox)
err := c.nri.stopPodSandbox(ctx, &sandbox)
if err != nil { if err != nil {
log.G(ctx).WithError(err).Errorf("NRI sandbox stop notification failed") log.G(ctx).WithError(err).Errorf("NRI sandbox stop notification failed")
} }
}
// Teardown network for sandbox. // Teardown network for sandbox.
if sandbox.NetNS != nil { if sandbox.NetNS != nil {

View File

@ -29,9 +29,9 @@ import (
"github.com/containerd/containerd" "github.com/containerd/containerd"
"github.com/containerd/containerd/oci" "github.com/containerd/containerd/oci"
"github.com/containerd/containerd/pkg/cri/instrument" "github.com/containerd/containerd/pkg/cri/instrument"
"github.com/containerd/containerd/pkg/cri/nri"
"github.com/containerd/containerd/pkg/cri/streaming" "github.com/containerd/containerd/pkg/cri/streaming"
"github.com/containerd/containerd/pkg/kmutex" "github.com/containerd/containerd/pkg/kmutex"
"github.com/containerd/containerd/pkg/nri"
"github.com/containerd/containerd/plugin" "github.com/containerd/containerd/plugin"
runtime_alpha "github.com/containerd/containerd/third_party/k8s.io/cri-api/pkg/apis/runtime/v1alpha2" runtime_alpha "github.com/containerd/containerd/third_party/k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
cni "github.com/containerd/go-cni" cni "github.com/containerd/go-cni"
@ -112,15 +112,15 @@ type criService struct {
// one in-flight fetch request or unpack handler for a given descriptor's // one in-flight fetch request or unpack handler for a given descriptor's
// or chain ID. // or chain ID.
unpackDuplicationSuppressor kmutex.KeyedLocker unpackDuplicationSuppressor kmutex.KeyedLocker
// nri is used to hook NRI into CRI request processing.
nri *nriAPI nri *nri.API
// containerEventsChan is used to capture container events and send them // containerEventsChan is used to capture container events and send them
// to the caller of GetContainerEvents. // to the caller of GetContainerEvents.
containerEventsChan chan runtime.ContainerEventResponse containerEventsChan chan runtime.ContainerEventResponse
} }
// NewCRIService returns a new instance of CRIService // NewCRIService returns a new instance of CRIService
func NewCRIService(config criconfig.Config, client *containerd.Client, nrip nri.API) (CRIService, error) { func NewCRIService(config criconfig.Config, client *containerd.Client, nri *nri.API) (CRIService, error) {
var err error var err error
labels := label.NewStore() labels := label.NewStore()
c := &criService{ c := &criService{
@ -183,12 +183,7 @@ func NewCRIService(config criconfig.Config, client *containerd.Client, nrip nri.
return nil, err return nil, err
} }
if nrip != nil { c.nri = nri
c.nri = &nriAPI{
cri: c,
nri: nrip,
}
}
return c, nil return c, nil
} }
@ -258,7 +253,10 @@ func (c *criService) Run() error {
} }
}() }()
c.nri.register() // register CRI domain with NRI
if err := c.nri.Register(&criImplementation{c}); err != nil {
return fmt.Errorf("failed to set up NRI for CRI service: %w", err)
}
// Set the server as initialized. GRPC services could start serving traffic. // Set the server as initialized. GRPC services could start serving traffic.
c.initialized.Set() c.initialized.Set()
@ -292,6 +290,7 @@ func (c *criService) Run() error {
if cniNetConfMonitorErr != nil { if cniNetConfMonitorErr != nil {
return fmt.Errorf("cni network conf monitor error: %w", cniNetConfMonitorErr) return fmt.Errorf("cni network conf monitor error: %w", cniNetConfMonitorErr)
} }
return nil return nil
} }