go.{mod,sum}: update NRI deps and re-vendor.

Update NRI dependencies to point to the latest main/HEAD.

Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
This commit is contained in:
Krisztian Litkey 2024-10-04 10:22:26 +03:00
parent 8b41368e7b
commit 4bd3a71dd6
No known key found for this signature in database
GPG Key ID: 637F2939D50AF85D
25 changed files with 1629 additions and 817 deletions

4
go.mod
View File

@ -21,11 +21,11 @@ require (
github.com/containerd/go-runc v1.1.0 github.com/containerd/go-runc v1.1.0
github.com/containerd/imgcrypt v1.2.0-rc1 github.com/containerd/imgcrypt v1.2.0-rc1
github.com/containerd/log v0.1.0 github.com/containerd/log v0.1.0
github.com/containerd/nri v0.6.1 github.com/containerd/nri v0.6.2-0.20241010080438-159f5754db39
github.com/containerd/otelttrpc v0.0.0-20240305015340-ea5083fda723 github.com/containerd/otelttrpc v0.0.0-20240305015340-ea5083fda723
github.com/containerd/platforms v0.2.1 github.com/containerd/platforms v0.2.1
github.com/containerd/plugin v0.1.0 github.com/containerd/plugin v0.1.0
github.com/containerd/ttrpc v1.2.5 github.com/containerd/ttrpc v1.2.6-0.20240827082320-b5cd6e4b3287
github.com/containerd/typeurl/v2 v2.2.0 github.com/containerd/typeurl/v2 v2.2.0
github.com/containernetworking/cni v1.2.3 github.com/containernetworking/cni v1.2.3
github.com/containernetworking/plugins v1.5.1 github.com/containernetworking/plugins v1.5.1

16
go.sum
View File

@ -683,8 +683,8 @@ github.com/containerd/imgcrypt v1.2.0-rc1 h1:XESaAcMqxrGlRjQIqLdzxqsO/ddNK4vwfe7
github.com/containerd/imgcrypt v1.2.0-rc1/go.mod h1:F9roK2DzKlFnV+h+ZJy/r2FoS28bIvxKgdcoV7o8Sms= github.com/containerd/imgcrypt v1.2.0-rc1/go.mod h1:F9roK2DzKlFnV+h+ZJy/r2FoS28bIvxKgdcoV7o8Sms=
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
github.com/containerd/nri v0.6.1 h1:xSQ6elnQ4Ynidm9u49ARK9wRKHs80HCUI+bkXOxV4mA= github.com/containerd/nri v0.6.2-0.20241010080438-159f5754db39 h1:M2Cz4Bm2++xdq0A/zWqunOjnp0HNYQnNDNSSzqCmqnE=
github.com/containerd/nri v0.6.1/go.mod h1:7+sX3wNx+LR7RzhjnJiUkFDhn18P5Bg/0VnJ/uXpRJM= github.com/containerd/nri v0.6.2-0.20241010080438-159f5754db39/go.mod h1:uSkgBrCdEtAiEz4vnrq8gmAC4EnVAM5Klt0OuK5rZYQ=
github.com/containerd/otelttrpc v0.0.0-20240305015340-ea5083fda723 h1:swk9KxrmARZjSMrHc1Lzb39XhcDwAhYpqkBhinCFLCQ= github.com/containerd/otelttrpc v0.0.0-20240305015340-ea5083fda723 h1:swk9KxrmARZjSMrHc1Lzb39XhcDwAhYpqkBhinCFLCQ=
github.com/containerd/otelttrpc v0.0.0-20240305015340-ea5083fda723/go.mod h1:ZKzztepTSz/LKtbUSzfBNVwgqBEPABVZV9PQF/l53+Q= github.com/containerd/otelttrpc v0.0.0-20240305015340-ea5083fda723/go.mod h1:ZKzztepTSz/LKtbUSzfBNVwgqBEPABVZV9PQF/l53+Q=
github.com/containerd/platforms v0.2.1 h1:zvwtM3rz2YHPQsF2CHYM8+KtB5dvhISiXh5ZpSBQv6A= github.com/containerd/platforms v0.2.1 h1:zvwtM3rz2YHPQsF2CHYM8+KtB5dvhISiXh5ZpSBQv6A=
@ -692,8 +692,8 @@ github.com/containerd/platforms v0.2.1/go.mod h1:XHCb+2/hzowdiut9rkudds9bE5yJ7np
github.com/containerd/plugin v0.1.0 h1:CYMyZk9beRAIe1FEKItbMLLAz/z16aXrGc+B+nv0fU4= github.com/containerd/plugin v0.1.0 h1:CYMyZk9beRAIe1FEKItbMLLAz/z16aXrGc+B+nv0fU4=
github.com/containerd/plugin v0.1.0/go.mod h1:j6HlpMtkiZMgT4UsfVNxPBUkwdw9KQGU6nCLfRxnq+w= github.com/containerd/plugin v0.1.0/go.mod h1:j6HlpMtkiZMgT4UsfVNxPBUkwdw9KQGU6nCLfRxnq+w=
github.com/containerd/ttrpc v1.2.2/go.mod h1:sIT6l32Ph/H9cvnJsfXM5drIVzTr5A2flTf1G5tYZak= github.com/containerd/ttrpc v1.2.2/go.mod h1:sIT6l32Ph/H9cvnJsfXM5drIVzTr5A2flTf1G5tYZak=
github.com/containerd/ttrpc v1.2.5 h1:IFckT1EFQoFBMG4c3sMdT8EP3/aKfumK1msY+Ze4oLU= github.com/containerd/ttrpc v1.2.6-0.20240827082320-b5cd6e4b3287 h1:zwv64tCdT888KxuXQuv5i36cEdljoXq3sVqLmOEbCQI=
github.com/containerd/ttrpc v1.2.5/go.mod h1:YCXHsb32f+Sq5/72xHubdiJRQY9inL4a4ZQrAbN1q9o= github.com/containerd/ttrpc v1.2.6-0.20240827082320-b5cd6e4b3287/go.mod h1:YCXHsb32f+Sq5/72xHubdiJRQY9inL4a4ZQrAbN1q9o=
github.com/containerd/typeurl/v2 v2.2.0 h1:6NBDbQzr7I5LHgp34xAXYF5DOTQDn05X58lsPEmzLso= github.com/containerd/typeurl/v2 v2.2.0 h1:6NBDbQzr7I5LHgp34xAXYF5DOTQDn05X58lsPEmzLso=
github.com/containerd/typeurl/v2 v2.2.0/go.mod h1:8XOOxnyatxSWuG8OfsZXVnAF4iZfedjS/8UHSPJnX4g= github.com/containerd/typeurl/v2 v2.2.0/go.mod h1:8XOOxnyatxSWuG8OfsZXVnAF4iZfedjS/8UHSPJnX4g=
github.com/containernetworking/cni v1.2.3 h1:hhOcjNVUQTnzdRJ6alC5XF+wd9mfGIUaj8FuJbEslXM= github.com/containernetworking/cni v1.2.3 h1:hhOcjNVUQTnzdRJ6alC5XF+wd9mfGIUaj8FuJbEslXM=
@ -1001,10 +1001,10 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8m
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA= github.com/onsi/ginkgo/v2 v2.19.1 h1:QXgq3Z8Crl5EL1WBAC98A5sEBHARrAJNzAmMxzLcRF0=
github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= github.com/onsi/ginkgo/v2 v2.19.1/go.mod h1:O3DtEWQkPa/F7fBMgmZQKKsluAy8pd3rEQdrjkPb9zA=
github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk= github.com/onsi/gomega v1.34.0 h1:eSSPsPNp6ZpsG8X1OVmOTxig+CblTc4AxpPBykhe2Os=
github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0= github.com/onsi/gomega v1.34.0/go.mod h1:MIKI8c+f+QLWk+hxbePD4i0LMJSExPaZOVfkoex4cAo=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug=

3
vendor/github.com/containerd/nri/.codespellrc generated vendored Normal file
View File

@ -0,0 +1,3 @@
[codespell]
skip = .git,*.pdf,*.svg,go.sum,go.mod
ignore-words-list = clos

View File

@ -36,6 +36,9 @@ issues:
linters: linters:
- govet - govet
text: "copylocks: .*protobuf/internal/impl.MessageState.*" text: "copylocks: .*protobuf/internal/impl.MessageState.*"
# We dot-import ginkgo and gomega in some tests. Silence any related errors.
- path: 'pkg/adaptation|pkg/runtime-tools/generate|pkg/net/multiplex'
text: "dot-imports:"
run: run:
timeout: 2m timeout: 2m

View File

@ -124,7 +124,7 @@ $(BIN_PATH)/template: $(wildcard plugins/template/*.go)
test-gopkgs: ginkgo-tests test-ulimits test-gopkgs: ginkgo-tests test-ulimits
SKIPPED_PKGS="ulimit-adjuster" SKIPPED_PKGS="ulimit-adjuster,device-injector"
ginkgo-tests: ginkgo-tests:
$(Q)$(GINKGO) run \ $(Q)$(GINKGO) run \
@ -137,12 +137,15 @@ ginkgo-tests:
--coverprofile coverprofile \ --coverprofile coverprofile \
--succinct \ --succinct \
--skip-package $(SKIPPED_PKGS) \ --skip-package $(SKIPPED_PKGS) \
-r .; \ -r && \
$(GO_CMD) tool cover -html=$(COVERAGE_PATH)/coverprofile -o $(COVERAGE_PATH)/coverage.html $(GO_CMD) tool cover -html=$(COVERAGE_PATH)/coverprofile -o $(COVERAGE_PATH)/coverage.html
test-ulimits: test-ulimits:
$(Q)cd ./plugins/ulimit-adjuster && $(GO_TEST) -v $(Q)cd ./plugins/ulimit-adjuster && $(GO_TEST) -v
test-device-injector:
$(Q)cd ./plugins/device-injector && $(GO_TEST) -v
codecov: SHELL := $(shell which bash) codecov: SHELL := $(shell which bash)
codecov: codecov:
bash <(curl -s https://codecov.io/bash) -f $(COVERAGE_PATH)/coverprofile bash <(curl -s https://codecov.io/bash) -f $(COVERAGE_PATH)/coverprofile
@ -163,6 +166,13 @@ vet:
golangci-lint: golangci-lint:
$(Q)$(GOLANG_CILINT) run $(Q)$(GOLANG_CILINT) run
validate-repo-no-changes:
$(Q)test -z "$$(git status --short | tee /dev/stderr)" || { \
echo "Repository has changes."; \
echo "Please make sure to commit all changes, including generated files."; \
exit 1; \
}
# #
# proto generation targets # proto generation targets
# #

View File

@ -297,7 +297,7 @@ apply NRI responses to containers.
The plugin stub hides many of the low-level details of implementing an NRI The plugin stub hides many of the low-level details of implementing an NRI
plugin. It takes care of connection establishment, plugin registration, plugin. It takes care of connection establishment, plugin registration,
configuration, and event subscription. All [sample plugins](pkg/plugins) configuration, and event subscription. All [sample plugins](plugins)
are implemented using the stub. Any of these can be used as a tutorial on are implemented using the stub. Any of these can be used as a tutorial on
how the stub library should be used. how the stub library should be used.
@ -308,6 +308,7 @@ The following sample plugins exist for NRI:
- [logger](plugins/logger) - [logger](plugins/logger)
- [differ](plugins/differ) - [differ](plugins/differ)
- [device injector](plugins/device-injector) - [device injector](plugins/device-injector)
- [network device injector](plugins/network-device-injector)
- [OCI hook injector](plugins/hook-injector) - [OCI hook injector](plugins/hook-injector)
- [ulimit adjuster](plugins/ulimit-adjuster) - [ulimit adjuster](plugins/ulimit-adjuster)
- [NRI v0.1.0 plugin adapter](plugins/v010-adapter) - [NRI v0.1.0 plugin adapter](plugins/v010-adapter)

View File

@ -65,6 +65,7 @@ type Adaptation struct {
serverOpts []ttrpc.ServerOpt serverOpts []ttrpc.ServerOpt
listener net.Listener listener net.Listener
plugins []*plugin plugins []*plugin
syncLock sync.RWMutex
} }
var ( var (
@ -135,6 +136,7 @@ func New(name, version string, syncFn SyncFn, updateFn UpdateFn, opts ...Option)
pluginPath: DefaultPluginPath, pluginPath: DefaultPluginPath,
dropinPath: DefaultPluginConfigPath, dropinPath: DefaultPluginConfigPath,
socketPath: DefaultSocketPath, socketPath: DefaultSocketPath,
syncLock: sync.RWMutex{},
} }
for _, o := range opts { for _, o := range opts {
@ -336,25 +338,48 @@ func (r *Adaptation) startPlugins() (retErr error) {
}() }()
for i, name := range names { for i, name := range names {
log.Infof(noCtx, "starting plugin %q...", name) log.Infof(noCtx, "starting pre-installed NRI plugin %q...", name)
id := ids[i] id := ids[i]
p, err := r.newLaunchedPlugin(r.pluginPath, id, name, configs[i]) p, err := r.newLaunchedPlugin(r.pluginPath, id, name, configs[i])
if err != nil { if err != nil {
return fmt.Errorf("failed to start NRI plugin %q: %w", name, err) log.Warnf(noCtx, "failed to initialize pre-installed NRI plugin %q: %v", name, err)
continue
} }
if err := p.start(r.name, r.version); err != nil { if err := p.start(r.name, r.version); err != nil {
return err log.Warnf(noCtx, "failed to start pre-installed NRI plugin %q: %v", name, err)
continue
} }
plugins = append(plugins, p) plugins = append(plugins, p)
} }
// Although the error returned by syncPlugins may not be nil, r.syncFn could still ignores this error and returns a nil error.
// We need to make sure that the plugins are successfully synchronized in the `plugins`
syncPlugins := func(ctx context.Context, pods []*PodSandbox, containers []*Container) (updates []*ContainerUpdate, err error) {
startedPlugins := plugins
plugins = make([]*plugin, 0, len(plugins))
for _, plugin := range startedPlugins {
us, err := plugin.synchronize(ctx, pods, containers)
if err != nil {
plugin.stop()
log.Warnf(noCtx, "failed to synchronize pre-installed NRI plugin %q: %v", plugin.name(), err)
continue
}
plugins = append(plugins, plugin)
updates = append(updates, us...)
log.Infof(noCtx, "pre-installed NRI plugin %q synchronization success", plugin.name())
}
return updates, nil
}
if err := r.syncFn(noCtx, syncPlugins); err != nil {
return fmt.Errorf("failed to synchronize pre-installed NRI Plugins: %w", err)
}
r.plugins = plugins r.plugins = plugins
r.sortPlugins() r.sortPlugins()
return nil return nil
} }
@ -369,12 +394,22 @@ func (r *Adaptation) stopPlugins() {
} }
func (r *Adaptation) removeClosedPlugins() { func (r *Adaptation) removeClosedPlugins() {
active := []*plugin{} var active, closed []*plugin
for _, p := range r.plugins { for _, p := range r.plugins {
if !p.isClosed() { if p.isClosed() {
closed = append(closed, p)
} else {
active = append(active, p) active = append(active, p)
} }
} }
if len(closed) != 0 {
go func() {
for _, plugin := range closed {
plugin.stop()
}
}()
}
r.plugins = active r.plugins = active
} }
@ -431,6 +466,8 @@ func (r *Adaptation) acceptPluginConnections(l net.Listener) error {
continue continue
} }
r.requestPluginSync()
err = r.syncFn(ctx, p.synchronize) err = r.syncFn(ctx, p.synchronize)
if err != nil { if err != nil {
log.Infof(ctx, "failed to synchronize plugin: %v", err) log.Infof(ctx, "failed to synchronize plugin: %v", err)
@ -439,9 +476,10 @@ func (r *Adaptation) acceptPluginConnections(l net.Listener) error {
r.plugins = append(r.plugins, p) r.plugins = append(r.plugins, p)
r.sortPlugins() r.sortPlugins()
r.Unlock() r.Unlock()
log.Infof(ctx, "plugin %q connected and synchronized", p.name())
} }
log.Infof(ctx, "plugin %q connected", p.name()) r.finishedPluginSync()
} }
}() }()
@ -512,3 +550,30 @@ func (r *Adaptation) sortPlugins() {
} }
} }
} }
func (r *Adaptation) requestPluginSync() {
r.syncLock.Lock()
}
func (r *Adaptation) finishedPluginSync() {
r.syncLock.Unlock()
}
type PluginSyncBlock struct {
r *Adaptation
}
// BlockPluginSync blocks plugins from being synchronized/fully registered.
func (r *Adaptation) BlockPluginSync() *PluginSyncBlock {
r.syncLock.RLock()
return &PluginSyncBlock{r: r}
}
// Unblock a plugin sync. block put in place by BlockPluginSync. Safe to call
// multiple times but only from a single goroutine.
func (b *PluginSyncBlock) Unblock() {
if b != nil && b.r != nil {
b.r.syncLock.RUnlock()
b.r = nil
}
}

View File

@ -78,6 +78,7 @@ type (
LinuxMemory = api.LinuxMemory LinuxMemory = api.LinuxMemory
LinuxDevice = api.LinuxDevice LinuxDevice = api.LinuxDevice
LinuxDeviceCgroup = api.LinuxDeviceCgroup LinuxDeviceCgroup = api.LinuxDeviceCgroup
CDIDevice = api.CDIDevice
HugepageLimit = api.HugepageLimit HugepageLimit = api.HugepageLimit
Hooks = api.Hooks Hooks = api.Hooks
Hook = api.Hook Hook = api.Hook

View File

@ -33,13 +33,15 @@ import (
"github.com/containerd/nri/pkg/net" "github.com/containerd/nri/pkg/net"
"github.com/containerd/nri/pkg/net/multiplex" "github.com/containerd/nri/pkg/net/multiplex"
"github.com/containerd/ttrpc" "github.com/containerd/ttrpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
) )
const ( const (
// DefaultPluginRegistrationTimeout is the default timeout for plugin registration. // DefaultPluginRegistrationTimeout is the default timeout for plugin registration.
DefaultPluginRegistrationTimeout = 5 * time.Second DefaultPluginRegistrationTimeout = api.DefaultPluginRegistrationTimeout
// DefaultPluginRequestTimeout is the default timeout for plugins to handle a request. // DefaultPluginRequestTimeout is the default timeout for plugins to handle a request.
DefaultPluginRequestTimeout = 2 * time.Second DefaultPluginRequestTimeout = api.DefaultPluginRequestTimeout
) )
var ( var (
@ -384,9 +386,11 @@ func (p *plugin) configure(ctx context.Context, name, version, config string) er
defer cancel() defer cancel()
rpl, err := p.stub.Configure(ctx, &ConfigureRequest{ rpl, err := p.stub.Configure(ctx, &ConfigureRequest{
Config: config, Config: config,
RuntimeName: name, RuntimeName: name,
RuntimeVersion: version, RuntimeVersion: version,
RegistrationTimeout: getPluginRegistrationTimeout().Milliseconds(),
RequestTimeout: getPluginRequestTimeout().Milliseconds(),
}) })
if err != nil { if err != nil {
return fmt.Errorf("failed to configure plugin: %w", err) return fmt.Errorf("failed to configure plugin: %w", err)
@ -412,18 +416,101 @@ func (p *plugin) synchronize(ctx context.Context, pods []*PodSandbox, containers
ctx, cancel := context.WithTimeout(ctx, getPluginRequestTimeout()) ctx, cancel := context.WithTimeout(ctx, getPluginRequestTimeout())
defer cancel() defer cancel()
req := &SynchronizeRequest{ var (
Pods: pods, podsToSend = pods
Containers: containers, ctrsToSend = containers
} podsPerMsg = len(pods)
rpl, err := p.stub.Synchronize(ctx, req) ctrsPerMsg = len(containers)
if err != nil {
return nil, err rpl *SynchronizeResponse
err error
)
for {
req := &SynchronizeRequest{
Pods: podsToSend[:podsPerMsg],
Containers: ctrsToSend[:ctrsPerMsg],
More: len(podsToSend) > podsPerMsg || len(ctrsToSend) > ctrsPerMsg,
}
log.Debugf(ctx, "sending sync message, %d/%d, %d/%d (more: %v)",
len(req.Pods), len(podsToSend), len(req.Containers), len(ctrsToSend), req.More)
rpl, err = p.stub.Synchronize(ctx, req)
if err == nil {
if !req.More {
break
}
if len(rpl.Update) > 0 || rpl.More != req.More {
p.close()
return nil, fmt.Errorf("plugin does not handle split sync requests")
}
podsToSend = podsToSend[podsPerMsg:]
ctrsToSend = ctrsToSend[ctrsPerMsg:]
if podsPerMsg > len(podsToSend) {
podsPerMsg = len(podsToSend)
}
if ctrsPerMsg > len(ctrsToSend) {
ctrsPerMsg = len(ctrsToSend)
}
} else {
podsPerMsg, ctrsPerMsg, err = recalcObjsPerSyncMsg(podsPerMsg, ctrsPerMsg, err)
if err != nil {
p.close()
return nil, err
}
log.Debugf(ctx, "oversized message, retrying in smaller chunks")
}
} }
return rpl.Update, nil return rpl.Update, nil
} }
func recalcObjsPerSyncMsg(pods, ctrs int, err error) (int, int, error) {
const (
minObjsPerMsg = 8
)
if status.Code(err) != codes.ResourceExhausted {
return pods, ctrs, err
}
if pods+ctrs <= minObjsPerMsg {
return pods, ctrs, fmt.Errorf("failed to synchronize plugin with split messages")
}
var e *ttrpc.OversizedMessageErr
if !errors.As(err, &e) {
return pods, ctrs, fmt.Errorf("failed to synchronize plugin with split messages")
}
maxLen := e.MaximumLength()
msgLen := e.RejectedLength()
if msgLen == 0 || maxLen == 0 || msgLen <= maxLen {
return pods, ctrs, fmt.Errorf("failed to synchronize plugin with split messages")
}
factor := float64(maxLen) / float64(msgLen)
if factor > 0.9 {
factor = 0.9
}
pods = int(float64(pods) * factor)
ctrs = int(float64(ctrs) * factor)
if pods+ctrs < minObjsPerMsg {
pods = minObjsPerMsg / 2
ctrs = minObjsPerMsg / 2
}
return pods, ctrs, nil
}
// Relay CreateContainer request to plugin. // Relay CreateContainer request to plugin.
func (p *plugin) createContainer(ctx context.Context, req *CreateContainerRequest) (*CreateContainerResponse, error) { func (p *plugin) createContainer(ctx context.Context, req *CreateContainerRequest) (*CreateContainerResponse, error) {
if !p.events.IsSet(Event_CREATE_CONTAINER) { if !p.events.IsSet(Event_CREATE_CONTAINER) {
@ -516,7 +603,7 @@ func (p *plugin) StateChange(ctx context.Context, evt *StateChangeEvent) error {
return nil return nil
} }
// isFatalError returns true if the error is fatal and the plugin connection shoudld be closed. // isFatalError returns true if the error is fatal and the plugin connection should be closed.
func isFatalError(err error) bool { func isFatalError(err error) bool {
switch { switch {
case errors.Is(err, ttrpc.ErrClosed): case errors.Is(err, ttrpc.ErrClosed):

View File

@ -19,6 +19,8 @@ package adaptation
import ( import (
"fmt" "fmt"
"strings" "strings"
"github.com/containerd/nri/pkg/api"
) )
type result struct { type result struct {
@ -89,6 +91,7 @@ func collectCreateContainerResult(request *CreateContainerRequest) *result {
Env: []*KeyValue{}, Env: []*KeyValue{},
Hooks: &Hooks{}, Hooks: &Hooks{},
Rlimits: []*POSIXRlimit{}, Rlimits: []*POSIXRlimit{},
CDIDevices: []*CDIDevice{},
Linux: &LinuxContainerAdjustment{ Linux: &LinuxContainerAdjustment{
Devices: []*LinuxDevice{}, Devices: []*LinuxDevice{},
Resources: &LinuxResources{ Resources: &LinuxResources{
@ -200,7 +203,7 @@ func (r *result) adjust(rpl *ContainerAdjustment, plugin string) error {
if err := r.adjustEnv(rpl.Env, plugin); err != nil { if err := r.adjustEnv(rpl.Env, plugin); err != nil {
return err return err
} }
if err := r.adjustHooks(rpl.Hooks, plugin); err != nil { if err := r.adjustHooks(rpl.Hooks); err != nil {
return err return err
} }
if rpl.Linux != nil { if rpl.Linux != nil {
@ -213,10 +216,16 @@ func (r *result) adjust(rpl *ContainerAdjustment, plugin string) error {
if err := r.adjustCgroupsPath(rpl.Linux.CgroupsPath, plugin); err != nil { if err := r.adjustCgroupsPath(rpl.Linux.CgroupsPath, plugin); err != nil {
return err return err
} }
if err := r.adjustOomScoreAdj(rpl.Linux.OomScoreAdj, plugin); err != nil {
return err
}
} }
if err := r.adjustRlimits(rpl.Rlimits, plugin); err != nil { if err := r.adjustRlimits(rpl.Rlimits, plugin); err != nil {
return err return err
} }
if err := r.adjustCDIDevices(rpl.CDIDevices, plugin); err != nil {
return err
}
return nil return nil
} }
@ -322,6 +331,13 @@ func (r *result) adjustMounts(mounts []*Mount, plugin string) error {
r.reply.adjust.Mounts = append(r.reply.adjust.Mounts, m) r.reply.adjust.Mounts = append(r.reply.adjust.Mounts, m)
} }
// next, apply deletions with no corresponding additions
for _, m := range del {
if _, ok := mod[api.ClearRemovalMarker(m.Destination)]; !ok {
r.reply.adjust.Mounts = append(r.reply.adjust.Mounts, m)
}
}
// finally, apply additions/modifications to plugin container creation request // finally, apply additions/modifications to plugin container creation request
create.Container.Mounts = append(create.Container.Mounts, add...) create.Container.Mounts = append(create.Container.Mounts, add...)
@ -386,6 +402,36 @@ func (r *result) adjustDevices(devices []*LinuxDevice, plugin string) error {
return nil return nil
} }
func (r *result) adjustCDIDevices(devices []*CDIDevice, plugin string) error {
if len(devices) == 0 {
return nil
}
// Notes:
// CDI devices are opaque references, typically to vendor specific
// devices. They get resolved to actual devices and potential related
// mounts, environment variables, etc. in the runtime. Unlike with
// devices, we only allow CDI device references to be added to a
// container, not removed. We pass them unresolved to the runtime and
// have them resolved there. Also unlike with devices, we don't include
// CDI device references in creation requests. However, since there
// is typically a strong ownership and a single related management entity
// per vendor/class for these devices we do treat multiple injection of
// the same CDI device reference as an error here.
id := r.request.create.Container.Id
// apply additions to collected adjustments
for _, d := range devices {
if err := r.owners.claimCDIDevice(id, d.Name, plugin); err != nil {
return err
}
r.reply.adjust.CDIDevices = append(r.reply.adjust.CDIDevices, d)
}
return nil
}
func (r *result) adjustEnv(env []*KeyValue, plugin string) error { func (r *result) adjustEnv(env []*KeyValue, plugin string) error {
if len(env) == 0 { if len(env) == 0 {
return nil return nil
@ -458,7 +504,7 @@ func splitEnvVar(s string) (string, string) {
return split[0], split[1] return split[0], split[1]
} }
func (r *result) adjustHooks(hooks *Hooks, plugin string) error { func (r *result) adjustHooks(hooks *Hooks) error {
if hooks == nil { if hooks == nil {
return nil return nil
} }
@ -645,7 +691,16 @@ func (r *result) adjustResources(resources *LinuxResources, plugin string) error
container.RdtClass = String(v.GetValue()) container.RdtClass = String(v.GetValue())
reply.RdtClass = String(v.GetValue()) reply.RdtClass = String(v.GetValue())
} }
if v := resources.GetPids(); v != nil {
if err := r.owners.claimPidsLimit(id, plugin); err != nil {
return err
}
pidv := &api.LinuxPids{
Limit: v.GetLimit(),
}
container.Pids = pidv
reply.Pids = pidv
}
return nil return nil
} }
@ -666,6 +721,23 @@ func (r *result) adjustCgroupsPath(path, plugin string) error {
return nil return nil
} }
func (r *result) adjustOomScoreAdj(OomScoreAdj *OptionalInt, plugin string) error {
if OomScoreAdj == nil {
return nil
}
create, id := r.request.create, r.request.create.Container.Id
if err := r.owners.claimOomScoreAdj(id, plugin); err != nil {
return err
}
create.Container.Linux.OomScoreAdj = OomScoreAdj
r.reply.adjust.Linux.OomScoreAdj = OomScoreAdj
return nil
}
func (r *result) adjustRlimits(rlimits []*POSIXRlimit, plugin string) error { func (r *result) adjustRlimits(rlimits []*POSIXRlimit, plugin string) error {
create, id, adjust := r.request.create, r.request.create.Container.Id, r.reply.adjust create, id, adjust := r.request.create, r.request.create.Container.Id, r.reply.adjust
for _, l := range rlimits { for _, l := range rlimits {
@ -820,6 +892,14 @@ func (r *result) updateResources(reply, u *ContainerUpdate, plugin string) error
} }
resources.RdtClass = String(v.GetValue()) resources.RdtClass = String(v.GetValue())
} }
if v := resources.GetPids(); v != nil {
if err := r.owners.claimPidsLimit(id, plugin); err != nil {
return err
}
resources.Pids = &api.LinuxPids{
Limit: v.GetLimit(),
}
}
// update request/reply from copy on success // update request/reply from copy on success
reply.Linux.Resources = resources.Copy() reply.Linux.Resources = resources.Copy()
@ -872,6 +952,7 @@ type owners struct {
annotations map[string]string annotations map[string]string
mounts map[string]string mounts map[string]string
devices map[string]string devices map[string]string
cdiDevices map[string]string
env map[string]string env map[string]string
memLimit string memLimit string
memReservation string memReservation string
@ -888,11 +969,13 @@ type owners struct {
cpuRealtimePeriod string cpuRealtimePeriod string
cpusetCpus string cpusetCpus string
cpusetMems string cpusetMems string
pidsLimit string
hugepageLimits map[string]string hugepageLimits map[string]string
blockioClass string blockioClass string
rdtClass string rdtClass string
unified map[string]string unified map[string]string
cgroupsPath string cgroupsPath string
oomScoreAdj string
rlimits map[string]string rlimits map[string]string
} }
@ -917,6 +1000,10 @@ func (ro resultOwners) claimDevice(id, path, plugin string) error {
return ro.ownersFor(id).claimDevice(path, plugin) return ro.ownersFor(id).claimDevice(path, plugin)
} }
func (ro resultOwners) claimCDIDevice(id, path, plugin string) error {
return ro.ownersFor(id).claimCDIDevice(path, plugin)
}
func (ro resultOwners) claimEnv(id, name, plugin string) error { func (ro resultOwners) claimEnv(id, name, plugin string) error {
return ro.ownersFor(id).claimEnv(name, plugin) return ro.ownersFor(id).claimEnv(name, plugin)
} }
@ -981,6 +1068,10 @@ func (ro resultOwners) claimCpusetMems(id, plugin string) error {
return ro.ownersFor(id).claimCpusetMems(plugin) return ro.ownersFor(id).claimCpusetMems(plugin)
} }
func (ro resultOwners) claimPidsLimit(id, plugin string) error {
return ro.ownersFor(id).claimPidsLimit(plugin)
}
func (ro resultOwners) claimHugepageLimit(id, size, plugin string) error { func (ro resultOwners) claimHugepageLimit(id, size, plugin string) error {
return ro.ownersFor(id).claimHugepageLimit(size, plugin) return ro.ownersFor(id).claimHugepageLimit(size, plugin)
} }
@ -1001,6 +1092,10 @@ func (ro resultOwners) claimCgroupsPath(id, plugin string) error {
return ro.ownersFor(id).claimCgroupsPath(plugin) return ro.ownersFor(id).claimCgroupsPath(plugin)
} }
func (ro resultOwners) claimOomScoreAdj(id, plugin string) error {
return ro.ownersFor(id).claimOomScoreAdj(plugin)
}
func (ro resultOwners) claimRlimits(id, typ, plugin string) error { func (ro resultOwners) claimRlimits(id, typ, plugin string) error {
return ro.ownersFor(id).claimRlimit(typ, plugin) return ro.ownersFor(id).claimRlimit(typ, plugin)
} }
@ -1038,6 +1133,17 @@ func (o *owners) claimDevice(path, plugin string) error {
return nil return nil
} }
func (o *owners) claimCDIDevice(name, plugin string) error {
if o.cdiDevices == nil {
o.cdiDevices = make(map[string]string)
}
if other, taken := o.cdiDevices[name]; taken {
return conflict(plugin, other, "CDI device", name)
}
o.cdiDevices[name] = plugin
return nil
}
func (o *owners) claimEnv(name, plugin string) error { func (o *owners) claimEnv(name, plugin string) error {
if o.env == nil { if o.env == nil {
o.env = make(map[string]string) o.env = make(map[string]string)
@ -1169,6 +1275,14 @@ func (o *owners) claimCpusetMems(plugin string) error {
return nil return nil
} }
func (o *owners) claimPidsLimit(plugin string) error {
if other := o.pidsLimit; other != "" {
return conflict(plugin, other, "pids pinning")
}
o.pidsLimit = plugin
return nil
}
func (o *owners) claimHugepageLimit(size, plugin string) error { func (o *owners) claimHugepageLimit(size, plugin string) error {
if o.hugepageLimits == nil { if o.hugepageLimits == nil {
o.hugepageLimits = make(map[string]string) o.hugepageLimits = make(map[string]string)
@ -1227,6 +1341,14 @@ func (o *owners) claimCgroupsPath(plugin string) error {
return nil return nil
} }
func (o *owners) claimOomScoreAdj(plugin string) error {
if other := o.oomScoreAdj; other != "" {
return conflict(plugin, other, "oom score adj")
}
o.oomScoreAdj = plugin
return nil
}
func (ro resultOwners) clearAnnotation(id, key string) { func (ro resultOwners) clearAnnotation(id, key string) {
ro.ownersFor(id).clearAnnotation(key) ro.ownersFor(id).clearAnnotation(key)
} }

View File

@ -129,6 +129,11 @@ func (a *ContainerAdjustment) RemoveDevice(path string) {
}) })
} }
// AddCDIDevice records the addition of the given CDI device to a container.
func (a *ContainerAdjustment) AddCDIDevice(d *CDIDevice) {
a.CDIDevices = append(a.CDIDevices, d) // TODO: should we dup d here ?
}
// SetLinuxMemoryLimit records setting the memory limit for a container. // SetLinuxMemoryLimit records setting the memory limit for a container.
func (a *ContainerAdjustment) SetLinuxMemoryLimit(value int64) { func (a *ContainerAdjustment) SetLinuxMemoryLimit(value int64) {
a.initLinuxResourcesMemory() a.initLinuxResourcesMemory()
@ -219,6 +224,12 @@ func (a *ContainerAdjustment) SetLinuxCPUSetMems(value string) {
a.Linux.Resources.Cpu.Mems = value a.Linux.Resources.Cpu.Mems = value
} }
// SetLinuxPidLimits records setting the pid max number for a container.
func (a *ContainerAdjustment) SetLinuxPidLimits(value int64) {
a.initLinuxResourcesPids()
a.Linux.Resources.Pids.Limit = value
}
// AddLinuxHugepageLimit records adding a hugepage limit for a container. // AddLinuxHugepageLimit records adding a hugepage limit for a container.
func (a *ContainerAdjustment) AddLinuxHugepageLimit(pageSize string, value uint64) { func (a *ContainerAdjustment) AddLinuxHugepageLimit(pageSize string, value uint64) {
a.initLinuxResources() a.initLinuxResources()
@ -253,6 +264,12 @@ func (a *ContainerAdjustment) SetLinuxCgroupsPath(value string) {
a.Linux.CgroupsPath = value a.Linux.CgroupsPath = value
} }
// SetLinuxOomScoreAdj records setting the kernel's Out-Of-Memory (OOM) killer score for a container.
func (a *ContainerAdjustment) SetLinuxOomScoreAdj(value *int) {
a.initLinux()
a.Linux.OomScoreAdj = Int(value) // using Int(value) from ./options.go to optionally allocate a pointer to normalized copy of value
}
// //
// Initializing a container adjustment and container update. // Initializing a container adjustment and container update.
// //
@ -302,6 +319,13 @@ func (a *ContainerAdjustment) initLinuxResourcesCPU() {
} }
} }
func (a *ContainerAdjustment) initLinuxResourcesPids() {
a.initLinuxResources()
if a.Linux.Resources.Pids == nil {
a.Linux.Resources.Pids = &LinuxPids{}
}
}
func (a *ContainerAdjustment) initLinuxResourcesUnified() { func (a *ContainerAdjustment) initLinuxResourcesUnified() {
a.initLinuxResources() a.initLinuxResources()
if a.Linux.Resources.Unified == nil { if a.Linux.Resources.Unified == nil {

File diff suppressed because it is too large Load Diff

View File

@ -46,7 +46,7 @@ message UpdateContainersRequest {
} }
message UpdateContainersResponse { message UpdateContainersResponse {
// Containers that the runtime failed to udpate. // Containers that the runtime failed to update.
repeated ContainerUpdate failed = 1; repeated ContainerUpdate failed = 1;
} }
@ -114,6 +114,10 @@ message ConfigureRequest {
string runtime_name = 2; string runtime_name = 2;
// Version of the runtime NRI is running in. // Version of the runtime NRI is running in.
string runtime_version = 3; string runtime_version = 3;
// Configured registration timeout in milliseconds.
int64 registration_timeout = 4;
// Configured request processing timeout in milliseconds.
int64 request_timeout = 5;
} }
message ConfigureResponse { message ConfigureResponse {
@ -127,11 +131,15 @@ message SynchronizeRequest {
repeated PodSandbox pods = 1; repeated PodSandbox pods = 1;
// Containers known to the runtime. // Containers known to the runtime.
repeated Container containers = 2; repeated Container containers = 2;
// Whether there are more pods and containers to follow.
bool more = 3;
} }
message SynchronizeResponse { message SynchronizeResponse {
// Updates to containers requested by the plugin. // Updates to containers requested by the plugin.
repeated ContainerUpdate update = 1; repeated ContainerUpdate update = 1;
// Whether the client is able to handle more advertised pods and containers.
bool more = 2;
} }
message CreateContainerRequest { message CreateContainerRequest {
@ -319,6 +327,11 @@ message LinuxDeviceCgroup {
string access = 5; string access = 5;
} }
// A CDI device reference.
message CDIDevice {
string name = 1;
}
// Container (linux) resources. // Container (linux) resources.
message LinuxResources { message LinuxResources {
LinuxMemory memory = 1; LinuxMemory memory = 1;
@ -328,6 +341,7 @@ message LinuxResources {
OptionalString rdt_class = 5; OptionalString rdt_class = 5;
map<string, string> unified = 6; map<string, string> unified = 6;
repeated LinuxDeviceCgroup devices = 7; // for NRI v1 emulation repeated LinuxDeviceCgroup devices = 7; // for NRI v1 emulation
LinuxPids pids = 8;
} }
// Memory-related parts of (linux) resources. // Memory-related parts of (linux) resources.
@ -366,6 +380,11 @@ message POSIXRlimit {
uint64 soft = 3; uint64 soft = 3;
} }
// Pids-related parts of (linux) resources.
message LinuxPids {
int64 limit = 1;
}
// Requested adjustments to a container being created. // Requested adjustments to a container being created.
message ContainerAdjustment { message ContainerAdjustment {
map<string, string> annotations = 2; map<string, string> annotations = 2;
@ -374,6 +393,7 @@ message ContainerAdjustment {
Hooks hooks = 5; Hooks hooks = 5;
LinuxContainerAdjustment linux = 6; LinuxContainerAdjustment linux = 6;
repeated POSIXRlimit rlimits = 7; repeated POSIXRlimit rlimits = 7;
repeated CDIDevice CDI_devices = 8;
} }
// Adjustments to (linux) resources. // Adjustments to (linux) resources.
@ -381,6 +401,7 @@ message LinuxContainerAdjustment {
repeated LinuxDevice devices = 1; repeated LinuxDevice devices = 1;
LinuxResources resources = 2; LinuxResources resources = 2;
string cgroups_path = 3; string cgroups_path = 3;
OptionalInt oom_score_adj = 4;
} }
// Requested update to an already created container. // Requested update to an already created container.

View File

@ -57,3 +57,14 @@ func IsMarkedForRemoval(key string) (string, bool) {
func MarkForRemoval(key string) string { func MarkForRemoval(key string) string {
return "-" + key return "-" + key
} }
// ClearRemovalMarker returns a key cleared from any removal marker.
func ClearRemovalMarker(key string) string {
if key == "" {
return ""
}
if key[0] == '-' {
return key[1:]
}
return key
}

View File

@ -22,7 +22,7 @@ import (
) )
// FromOCILinuxResources returns resources from an OCI runtime Spec. // FromOCILinuxResources returns resources from an OCI runtime Spec.
func FromOCILinuxResources(o *rspec.LinuxResources, ann map[string]string) *LinuxResources { func FromOCILinuxResources(o *rspec.LinuxResources, _ map[string]string) *LinuxResources {
if o == nil { if o == nil {
return nil return nil
} }
@ -65,6 +65,11 @@ func FromOCILinuxResources(o *rspec.LinuxResources, ann map[string]string) *Linu
Access: d.Access, Access: d.Access,
}) })
} }
if p := o.Pids; p != nil {
l.Pids = &LinuxPids{
Limit: p.Limit,
}
}
return l return l
} }
@ -148,7 +153,11 @@ func (r *LinuxResources) ToOCI() *rspec.LinuxResources {
Access: d.Access, Access: d.Access,
}) })
} }
if r.Pids != nil {
o.Pids = &rspec.LinuxPids{
Limit: r.Pids.Limit,
}
}
return o return o
} }
@ -226,6 +235,11 @@ func (r *LinuxResources) Copy() *LinuxResources {
o.Unified[k] = v o.Unified[k] = v
} }
} }
if r.Pids != nil {
o.Pids = &LinuxPids{
Limit: r.Pids.Limit,
}
}
o.BlockioClass = String(r.BlockioClass) o.BlockioClass = String(r.BlockioClass)
o.RdtClass = String(r.RdtClass) o.RdtClass = String(r.RdtClass)

28
vendor/github.com/containerd/nri/pkg/api/timeouts.go generated vendored Normal file
View File

@ -0,0 +1,28 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package api
import (
"time"
)
const (
// DefaultPluginRegistrationTimeout is the default timeout for plugin registration.
DefaultPluginRegistrationTimeout = 5 * time.Second
// DefaultPluginRequestTimeout is the default timeout for plugins to handle a request.
DefaultPluginRequestTimeout = 2 * time.Second
)

View File

@ -112,6 +112,12 @@ func (u *ContainerUpdate) SetLinuxCPUSetMems(value string) {
u.Linux.Resources.Cpu.Mems = value u.Linux.Resources.Cpu.Mems = value
} }
// SetLinuxPidLimits records setting the pid max number for a container.
func (u *ContainerUpdate) SetLinuxPidLimits(value int64) {
u.initLinuxResourcesPids()
u.Linux.Resources.Pids.Limit = value
}
// AddLinuxHugepageLimit records adding a hugepage limit for a container. // AddLinuxHugepageLimit records adding a hugepage limit for a container.
func (u *ContainerUpdate) AddLinuxHugepageLimit(pageSize string, value uint64) { func (u *ContainerUpdate) AddLinuxHugepageLimit(pageSize string, value uint64) {
u.initLinuxResources() u.initLinuxResources()
@ -184,3 +190,10 @@ func (u *ContainerUpdate) initLinuxResourcesUnified() {
u.Linux.Resources.Unified = make(map[string]string) u.Linux.Resources.Unified = make(map[string]string)
} }
} }
func (u *ContainerUpdate) initLinuxResourcesPids() {
u.initLinuxResources()
if u.Linux.Resources.Pids == nil {
u.Linux.Resources.Pids = &LinuxPids{}
}
}

View File

@ -139,7 +139,7 @@ const (
// length of frame header: 4-byte ConnID, 4-byte payload length // length of frame header: 4-byte ConnID, 4-byte payload length
headerLen = 8 headerLen = 8
// max. allowed payload size // max. allowed payload size
maxPayloadSize = 1 << 24 maxPayloadSize = ttrpcMessageHeaderLength + ttrpcMessageLengthMax
) )
// conn represents a single multiplexed connection. // conn represents a single multiplexed connection.
@ -234,38 +234,54 @@ func (m *mux) Listen(id ConnID) (net.Listener, error) {
} }
func (m *mux) write(id ConnID, buf []byte) (int, error) { func (m *mux) write(id ConnID, buf []byte) (int, error) {
var hdr [headerLen]byte var (
hdr [headerLen]byte
if len(buf) > maxPayloadSize { data = buf[:]
return 0, syscall.EMSGSIZE size = len(data)
} )
binary.BigEndian.PutUint32(hdr[0:4], uint32(id))
binary.BigEndian.PutUint32(hdr[4:8], uint32(len(buf)))
m.writeLock.Lock() m.writeLock.Lock()
defer m.writeLock.Unlock() defer m.writeLock.Unlock()
n, err := m.trunk.Write(hdr[:]) for {
if err != nil { if size > maxPayloadSize {
err = fmt.Errorf("failed to write header to trunk: %w", err) size = maxPayloadSize
if n != 0 {
m.setError(err)
m.Close()
} }
return 0, err
}
n, err = m.trunk.Write(buf) binary.BigEndian.PutUint32(hdr[0:4], uint32(id))
if err != nil { binary.BigEndian.PutUint32(hdr[4:8], uint32(size))
err = fmt.Errorf("failed to write payload to trunk: %w", err)
if n != 0 { n, err := m.trunk.Write(hdr[:])
m.setError(err) if err != nil {
m.Close() err = fmt.Errorf("failed to write header to trunk: %w", err)
if n != 0 {
m.setError(err)
m.Close()
}
return 0, err
}
n, err = m.trunk.Write(data[:size])
if err != nil {
err = fmt.Errorf("failed to write payload to trunk: %w", err)
if n != 0 {
m.setError(err)
m.Close()
}
return 0, err
}
data = data[size:]
if size > len(data) {
size = len(data)
}
if size == 0 {
break
} }
} }
return n, err return len(buf), nil
} }
func (m *mux) reader() { func (m *mux) reader() {
@ -429,16 +445,16 @@ func (c *conn) RemoteAddr() net.Addr {
} }
// SetDeadline is the unimplemented stub for the corresponding net.Conn function. // SetDeadline is the unimplemented stub for the corresponding net.Conn function.
func (c *conn) SetDeadline(t time.Time) error { func (c *conn) SetDeadline(_ time.Time) error {
return nil return nil
} }
// SetReadDeadline is the unimplemented stub for the corresponding net.Conn function. // SetReadDeadline is the unimplemented stub for the corresponding net.Conn function.
func (c *conn) SetReadDeadline(t time.Time) error { func (c *conn) SetReadDeadline(_ time.Time) error {
return nil return nil
} }
// SetWriteDeadline is the unimplemented stub for the corresponding net.Conn function. // SetWriteDeadline is the unimplemented stub for the corresponding net.Conn function.
func (c *conn) SetWriteDeadline(t time.Time) error { func (c *conn) SetWriteDeadline(_ time.Time) error {
return nil return nil
} }

View File

@ -22,3 +22,8 @@ const (
// RuntimeServiceConn is the mux connection ID for NRI runtime services. // RuntimeServiceConn is the mux connection ID for NRI runtime services.
RuntimeServiceConn RuntimeServiceConn
) )
const (
ttrpcMessageHeaderLength = 10
ttrpcMessageLengthMax = 4 << 20
)

View File

@ -40,6 +40,7 @@ type Generator struct {
filterAnnotations func(map[string]string) (map[string]string, error) filterAnnotations func(map[string]string) (map[string]string, error)
resolveBlockIO func(string) (*rspec.LinuxBlockIO, error) resolveBlockIO func(string) (*rspec.LinuxBlockIO, error)
resolveRdt func(string) (*rspec.LinuxIntelRdt, error) resolveRdt func(string) (*rspec.LinuxIntelRdt, error)
injectCDIDevices func(*rspec.Spec, []string) error
checkResources func(*rspec.LinuxResources) error checkResources func(*rspec.LinuxResources) error
} }
@ -91,6 +92,14 @@ func WithResourceChecker(fn func(*rspec.LinuxResources) error) GeneratorOption {
} }
} }
// WithCDIDeviceInjector specifies a runtime-specific function to use for CDI
// device resolution and injection into an OCI Spec.
func WithCDIDeviceInjector(fn func(*rspec.Spec, []string) error) GeneratorOption {
return func(g *Generator) {
g.injectCDIDevices = fn
}
}
// Adjust adjusts all aspects of the OCI Spec that NRI knows/cares about. // Adjust adjusts all aspects of the OCI Spec that NRI knows/cares about.
func (g *Generator) Adjust(adjust *nri.ContainerAdjustment) error { func (g *Generator) Adjust(adjust *nri.ContainerAdjustment) error {
if adjust == nil { if adjust == nil {
@ -102,8 +111,12 @@ func (g *Generator) Adjust(adjust *nri.ContainerAdjustment) error {
} }
g.AdjustEnv(adjust.GetEnv()) g.AdjustEnv(adjust.GetEnv())
g.AdjustHooks(adjust.GetHooks()) g.AdjustHooks(adjust.GetHooks())
if err := g.InjectCDIDevices(adjust.GetCDIDevices()); err != nil {
return err
}
g.AdjustDevices(adjust.GetLinux().GetDevices()) g.AdjustDevices(adjust.GetLinux().GetDevices())
g.AdjustCgroupsPath(adjust.GetLinux().GetCgroupsPath()) g.AdjustCgroupsPath(adjust.GetLinux().GetCgroupsPath())
g.AdjustOomScoreAdj(adjust.GetLinux().GetOomScoreAdj())
resources := adjust.GetLinux().GetResources() resources := adjust.GetLinux().GetResources()
if err := g.AdjustResources(resources); err != nil { if err := g.AdjustResources(resources); err != nil {
@ -252,7 +265,9 @@ func (g *Generator) AdjustResources(r *nri.LinuxResources) error {
for k, v := range r.Unified { for k, v := range r.Unified {
g.AddLinuxResourcesUnified(k, v) g.AddLinuxResourcesUnified(k, v)
} }
if v := r.GetPids(); v != nil {
g.SetLinuxResourcesPidsLimit(v.GetLimit())
}
if g.checkResources != nil { if g.checkResources != nil {
if err := g.checkResources(g.Config.Linux.Resources); err != nil { if err := g.checkResources(g.Config.Linux.Resources); err != nil {
return fmt.Errorf("failed to adjust resources in OCI Spec: %w", err) return fmt.Errorf("failed to adjust resources in OCI Spec: %w", err)
@ -309,6 +324,17 @@ func (g *Generator) AdjustCgroupsPath(path string) {
} }
} }
// AdjustOomScoreAdj adjusts the kernel's Out-Of-Memory (OOM) killer score for the container.
// This may override kubelet's settings for OOM score.
func (g *Generator) AdjustOomScoreAdj(score *nri.OptionalInt) {
if score != nil {
g.SetProcessOOMScoreAdj(int(score.Value))
} else {
g.SetProcessOOMScoreAdj(0)
g.Config.Process.OOMScoreAdj = nil
}
}
// AdjustDevices adjusts the (Linux) devices in the OCI Spec. // AdjustDevices adjusts the (Linux) devices in the OCI Spec.
func (g *Generator) AdjustDevices(devices []*nri.LinuxDevice) { func (g *Generator) AdjustDevices(devices []*nri.LinuxDevice) {
for _, d := range devices { for _, d := range devices {
@ -323,6 +349,23 @@ func (g *Generator) AdjustDevices(devices []*nri.LinuxDevice) {
} }
} }
// InjectCDIDevices injects the requested CDI devices into the OCI Spec.
// Devices are given by their fully qualified CDI device names. The
// actual device injection is done using a runtime-specific CDI
// injection function, set using the WithCDIDeviceInjector option.
func (g *Generator) InjectCDIDevices(devices []*nri.CDIDevice) error {
if len(devices) == 0 || g.injectCDIDevices == nil {
return nil
}
names := []string{}
for _, d := range devices {
names = append(names, d.Name)
}
return g.injectCDIDevices(g.Config, names)
}
func (g *Generator) AdjustRlimits(rlimits []*nri.POSIXRlimit) error { func (g *Generator) AdjustRlimits(rlimits []*nri.POSIXRlimit) error {
for _, l := range rlimits { for _, l := range rlimits {
if l == nil { if l == nil {

View File

@ -35,7 +35,7 @@ import (
) )
// Plugin can implement a number of interfaces related to Pod and Container // Plugin can implement a number of interfaces related to Pod and Container
// lifecycle events. No any single such inteface is mandatory, therefore the // lifecycle events. No any single such interface is mandatory, therefore the
// Plugin interface itself is empty. Plugins are required to implement at // Plugin interface itself is empty. Plugins are required to implement at
// least one of these interfaces and this is verified during stub creation. // least one of these interfaces and this is verified during stub creation.
// Trying to create a stub for a plugin violating this requirement will fail // Trying to create a stub for a plugin violating this requirement will fail
@ -137,7 +137,9 @@ type PostUpdateContainerInterface interface {
// Stub is the interface the stub provides for the plugin implementation. // Stub is the interface the stub provides for the plugin implementation.
type Stub interface { type Stub interface {
// Run the plugin. Starts the plugin then waits for an error or the plugin to stop // Run starts the plugin then waits for the plugin service to exit, either due to a
// critical error or an explicit call to Stop(). Once Run() returns, the plugin can be
// restarted by calling Run() or Start() again.
Run(context.Context) error Run(context.Context) error
// Start the plugin. // Start the plugin.
Start(context.Context) error Start(context.Context) error
@ -148,11 +150,23 @@ type Stub interface {
// UpdateContainer requests unsolicited updates to containers. // UpdateContainer requests unsolicited updates to containers.
UpdateContainers([]*api.ContainerUpdate) ([]*api.ContainerUpdate, error) UpdateContainers([]*api.ContainerUpdate) ([]*api.ContainerUpdate, error)
// RegistrationTimeout returns the registration timeout for the stub.
// This is the default timeout if the plugin has not been started or
// the timeout received in the Configure request otherwise.
RegistrationTimeout() time.Duration
// RequestTimeout returns the request timeout for the stub.
// This is the default timeout if the plugin has not been started or
// the timeout received in the Configure request otherwise.
RequestTimeout() time.Duration
} }
const ( const (
// Plugin registration timeout. // DefaultRegistrationTimeout is the default plugin registration timeout.
registrationTimeout = 2 * time.Second DefaultRegistrationTimeout = api.DefaultPluginRegistrationTimeout
// DefaultRequestTimeout is the default plugin request processing timeout.
DefaultRequestTimeout = api.DefaultPluginRequestTimeout
) )
var ( var (
@ -255,11 +269,14 @@ type stub struct {
rpcs *ttrpc.Server rpcs *ttrpc.Server
rpcc *ttrpc.Client rpcc *ttrpc.Client
runtime api.RuntimeService runtime api.RuntimeService
closeOnce sync.Once
started bool started bool
doneC chan struct{} doneC chan struct{}
srvErrC chan error srvErrC chan error
cfgErrC chan error cfgErrC chan error
syncReq *api.SynchronizeRequest
registrationTimeout time.Duration
requestTimeout time.Duration
} }
// Handlers for NRI plugin event and request. // Handlers for NRI plugin event and request.
@ -288,7 +305,9 @@ func New(p interface{}, opts ...Option) (Stub, error) {
idx: os.Getenv(api.PluginIdxEnvVar), idx: os.Getenv(api.PluginIdxEnvVar),
socketPath: api.DefaultSocketPath, socketPath: api.DefaultSocketPath,
dialer: func(p string) (stdnet.Conn, error) { return stdnet.Dial("unix", p) }, dialer: func(p string) (stdnet.Conn, error) { return stdnet.Dial("unix", p) },
doneC: make(chan struct{}),
registrationTimeout: DefaultRegistrationTimeout,
requestTimeout: DefaultRequestTimeout,
} }
for _, o := range opts { for _, o := range opts {
@ -316,10 +335,10 @@ func (stub *stub) Start(ctx context.Context) (retErr error) {
stub.Lock() stub.Lock()
defer stub.Unlock() defer stub.Unlock()
if stub.started { if stub.isStarted() {
return fmt.Errorf("stub already started") return fmt.Errorf("stub already started")
} }
stub.started = true stub.doneC = make(chan struct{})
err := stub.connect() err := stub.connect()
if err != nil { if err != nil {
@ -378,10 +397,11 @@ func (stub *stub) Start(ctx context.Context) (retErr error) {
stub.srvErrC = make(chan error, 1) stub.srvErrC = make(chan error, 1)
stub.cfgErrC = make(chan error, 1) stub.cfgErrC = make(chan error, 1)
go func() {
stub.srvErrC <- rpcs.Serve(ctx, rpcl) go func(l stdnet.Listener, doneC chan struct{}, srvErrC chan error) {
close(stub.doneC) srvErrC <- rpcs.Serve(ctx, l)
}() close(doneC)
}(rpcl, stub.doneC, stub.srvErrC)
stub.rpcm = rpcm stub.rpcm = rpcm
stub.rpcl = rpcl stub.rpcl = rpcl
@ -401,6 +421,7 @@ func (stub *stub) Start(ctx context.Context) (retErr error) {
log.Infof(ctx, "Started plugin %s...", stub.Name()) log.Infof(ctx, "Started plugin %s...", stub.Name())
stub.started = true
return nil return nil
} }
@ -413,24 +434,43 @@ func (stub *stub) Stop() {
stub.close() stub.close()
} }
// IsStarted returns true if the plugin has been started either by Start() or by Run().
func (stub *stub) IsStarted() bool {
stub.Lock()
defer stub.Unlock()
return stub.isStarted()
}
func (stub *stub) isStarted() bool {
return stub.started
}
// reset stub to the status that can initiate a new
// NRI connection, the caller must hold lock.
func (stub *stub) close() { func (stub *stub) close() {
stub.closeOnce.Do(func() { if !stub.isStarted() {
if stub.rpcl != nil { return
stub.rpcl.Close() }
}
if stub.rpcs != nil { if stub.rpcl != nil {
stub.rpcs.Close() stub.rpcl.Close()
} }
if stub.rpcc != nil { if stub.rpcs != nil {
stub.rpcc.Close() stub.rpcs.Close()
} }
if stub.rpcm != nil { if stub.rpcc != nil {
stub.rpcm.Close() stub.rpcc.Close()
} }
if stub.srvErrC != nil { if stub.rpcm != nil {
<-stub.doneC stub.rpcm.Close()
} }
}) if stub.srvErrC != nil {
<-stub.doneC
}
stub.started = false
stub.conn = nil
stub.syncReq = nil
} }
// Run the plugin. Start event processing then wait for an error or getting stopped. // Run the plugin. Start event processing then wait for an error or getting stopped.
@ -449,14 +489,11 @@ func (stub *stub) Run(ctx context.Context) error {
return err return err
} }
// Wait for the plugin to stop. // Wait for the plugin to stop, should be called after Start() or Run().
func (stub *stub) Wait() { func (stub *stub) Wait() {
stub.Lock() if stub.IsStarted() {
if stub.srvErrC == nil { <-stub.doneC
return
} }
stub.Unlock()
<-stub.doneC
} }
// Name returns the full indexed name of the plugin. // Name returns the full indexed name of the plugin.
@ -464,6 +501,14 @@ func (stub *stub) Name() string {
return stub.idx + "-" + stub.name return stub.idx + "-" + stub.name
} }
func (stub *stub) RegistrationTimeout() time.Duration {
return stub.registrationTimeout
}
func (stub *stub) RequestTimeout() time.Duration {
return stub.requestTimeout
}
// Connect the plugin to NRI. // Connect the plugin to NRI.
func (stub *stub) connect() error { func (stub *stub) connect() error {
if stub.conn != nil { if stub.conn != nil {
@ -502,7 +547,7 @@ func (stub *stub) connect() error {
func (stub *stub) register(ctx context.Context) error { func (stub *stub) register(ctx context.Context) error {
log.Infof(ctx, "Registering plugin %s...", stub.Name()) log.Infof(ctx, "Registering plugin %s...", stub.Name())
ctx, cancel := context.WithTimeout(ctx, registrationTimeout) ctx, cancel := context.WithTimeout(ctx, stub.registrationTimeout)
defer cancel() defer cancel()
req := &api.RegisterPluginRequest{ req := &api.RegisterPluginRequest{
@ -518,7 +563,9 @@ func (stub *stub) register(ctx context.Context) error {
// Handle a lost connection. // Handle a lost connection.
func (stub *stub) connClosed() { func (stub *stub) connClosed() {
stub.Lock()
stub.close() stub.close()
stub.Unlock()
if stub.onClose != nil { if stub.onClose != nil {
stub.onClose() stub.onClose()
return return
@ -558,6 +605,9 @@ func (stub *stub) Configure(ctx context.Context, req *api.ConfigureRequest) (rpl
log.Infof(ctx, "Configuring plugin %s for runtime %s/%s...", stub.Name(), log.Infof(ctx, "Configuring plugin %s for runtime %s/%s...", stub.Name(),
req.RuntimeName, req.RuntimeVersion) req.RuntimeName, req.RuntimeVersion)
stub.registrationTimeout = time.Duration(req.RegistrationTimeout * int64(time.Millisecond))
stub.requestTimeout = time.Duration(req.RequestTimeout * int64(time.Millisecond))
defer func() { defer func() {
stub.cfgErrC <- retErr stub.cfgErrC <- retErr
}() }()
@ -596,16 +646,55 @@ func (stub *stub) Configure(ctx context.Context, req *api.ConfigureRequest) (rpl
func (stub *stub) Synchronize(ctx context.Context, req *api.SynchronizeRequest) (*api.SynchronizeResponse, error) { func (stub *stub) Synchronize(ctx context.Context, req *api.SynchronizeRequest) (*api.SynchronizeResponse, error) {
handler := stub.handlers.Synchronize handler := stub.handlers.Synchronize
if handler == nil { if handler == nil {
return &api.SynchronizeResponse{}, nil return &api.SynchronizeResponse{More: req.More}, nil
} }
update, err := handler(ctx, req.Pods, req.Containers)
if req.More {
return stub.collectSync(req)
}
return stub.deliverSync(ctx, req)
}
func (stub *stub) collectSync(req *api.SynchronizeRequest) (*api.SynchronizeResponse, error) {
stub.Lock()
defer stub.Unlock()
log.Debugf(noCtx, "collecting sync req with %d pods, %d containers...",
len(req.Pods), len(req.Containers))
if stub.syncReq == nil {
stub.syncReq = req
} else {
stub.syncReq.Pods = append(stub.syncReq.Pods, req.Pods...)
stub.syncReq.Containers = append(stub.syncReq.Containers, req.Containers...)
}
return &api.SynchronizeResponse{More: req.More}, nil
}
func (stub *stub) deliverSync(ctx context.Context, req *api.SynchronizeRequest) (*api.SynchronizeResponse, error) {
stub.Lock()
syncReq := stub.syncReq
stub.syncReq = nil
stub.Unlock()
if syncReq == nil {
syncReq = req
} else {
syncReq.Pods = append(syncReq.Pods, req.Pods...)
syncReq.Containers = append(syncReq.Containers, req.Containers...)
}
update, err := stub.handlers.Synchronize(ctx, syncReq.Pods, syncReq.Containers)
return &api.SynchronizeResponse{ return &api.SynchronizeResponse{
Update: update, Update: update,
More: false,
}, err }, err
} }
// Shutdown the plugin. // Shutdown the plugin.
func (stub *stub) Shutdown(ctx context.Context, req *api.ShutdownRequest) (*api.ShutdownResponse, error) { func (stub *stub) Shutdown(ctx context.Context, _ *api.ShutdownRequest) (*api.ShutdownResponse, error) {
handler := stub.handlers.Shutdown handler := stub.handlers.Shutdown
if handler != nil { if handler != nil {
handler(ctx) handler(ctx)

View File

@ -33,7 +33,7 @@ type Plugin struct {
// //
// Normally located at /etc/nri/conf.json // Normally located at /etc/nri/conf.json
type ConfigList struct { type ConfigList struct {
// Verion of the list // Version of the list
Version string `json:"version"` Version string `json:"version"`
// Plugins // Plugins
Plugins []*Plugin `json:"plugins"` Plugins []*Plugin `json:"plugins"`

View File

@ -143,10 +143,10 @@ func (ch *channel) recv() (messageHeader, []byte, error) {
} }
func (ch *channel) send(streamID uint32, t messageType, flags uint8, p []byte) error { func (ch *channel) send(streamID uint32, t messageType, flags uint8, p []byte) error {
// TODO: Error on send rather than on recv if len(p) > messageLengthMax {
//if len(p) > messageLengthMax { return OversizedMessageError(len(p))
// return status.Errorf(codes.InvalidArgument, "refusing to send, message length %v exceed maximum message size of %v", len(p), messageLengthMax) }
//}
if err := writeMessageHeader(ch.bw, ch.hwbuf[:], messageHeader{Length: uint32(len(p)), StreamID: streamID, Type: t, Flags: flags}); err != nil { if err := writeMessageHeader(ch.bw, ch.hwbuf[:], messageHeader{Length: uint32(len(p)), StreamID: streamID, Type: t, Flags: flags}); err != nil {
return err return err
} }

View File

@ -16,7 +16,12 @@
package ttrpc package ttrpc
import "errors" import (
"errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
var ( var (
// ErrProtocol is a general error in the handling the protocol. // ErrProtocol is a general error in the handling the protocol.
@ -32,3 +37,44 @@ var (
// ErrStreamClosed is when the streaming connection is closed. // ErrStreamClosed is when the streaming connection is closed.
ErrStreamClosed = errors.New("ttrpc: stream closed") ErrStreamClosed = errors.New("ttrpc: stream closed")
) )
// OversizedMessageErr is used to indicate refusal to send an oversized message.
// It wraps a ResourceExhausted grpc Status together with the offending message
// length.
type OversizedMessageErr struct {
messageLength int
err error
}
// OversizedMessageError returns an OversizedMessageErr error for the given message
// length if it exceeds the allowed maximum. Otherwise a nil error is returned.
func OversizedMessageError(messageLength int) error {
if messageLength <= messageLengthMax {
return nil
}
return &OversizedMessageErr{
messageLength: messageLength,
err: status.Errorf(codes.ResourceExhausted, "message length %v exceed maximum message size of %v", messageLength, messageLengthMax),
}
}
// Error returns the error message for the corresponding grpc Status for the error.
func (e *OversizedMessageErr) Error() string {
return e.err.Error()
}
// Unwrap returns the corresponding error with our grpc status code.
func (e *OversizedMessageErr) Unwrap() error {
return e.err
}
// RejectedLength retrieves the rejected message length which triggered the error.
func (e *OversizedMessageErr) RejectedLength() int {
return e.messageLength
}
// MaximumLength retrieves the maximum allowed message length that triggered the error.
func (*OversizedMessageErr) MaximumLength() int {
return messageLengthMax
}

6
vendor/modules.txt vendored
View File

@ -163,8 +163,8 @@ github.com/containerd/imgcrypt/images/encryption
## explicit; go 1.20 ## explicit; go 1.20
github.com/containerd/log github.com/containerd/log
github.com/containerd/log/logtest github.com/containerd/log/logtest
# github.com/containerd/nri v0.6.1 # github.com/containerd/nri v0.6.2-0.20241010080438-159f5754db39
## explicit; go 1.19 ## explicit; go 1.21
github.com/containerd/nri github.com/containerd/nri
github.com/containerd/nri/pkg/adaptation github.com/containerd/nri/pkg/adaptation
github.com/containerd/nri/pkg/api github.com/containerd/nri/pkg/api
@ -186,7 +186,7 @@ github.com/containerd/platforms
github.com/containerd/plugin github.com/containerd/plugin
github.com/containerd/plugin/dynamic github.com/containerd/plugin/dynamic
github.com/containerd/plugin/registry github.com/containerd/plugin/registry
# github.com/containerd/ttrpc v1.2.5 # github.com/containerd/ttrpc v1.2.6-0.20240827082320-b5cd6e4b3287
## explicit; go 1.19 ## explicit; go 1.19
github.com/containerd/ttrpc github.com/containerd/ttrpc
# github.com/containerd/typeurl/v2 v2.2.0 # github.com/containerd/typeurl/v2 v2.2.0