Merge pull request #10769 from klihub/devel/update-nri

Update NRI to latest.
This commit is contained in:
Derek McGowan 2024-10-16 23:38:34 +00:00 committed by GitHub
commit fff2236f49
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
25 changed files with 1629 additions and 817 deletions

4
go.mod
View File

@ -21,11 +21,11 @@ require (
github.com/containerd/go-runc v1.1.0
github.com/containerd/imgcrypt v1.2.0-rc1
github.com/containerd/log v0.1.0
github.com/containerd/nri v0.6.1
github.com/containerd/nri v0.6.2-0.20241010080438-159f5754db39
github.com/containerd/otelttrpc v0.0.0-20240305015340-ea5083fda723
github.com/containerd/platforms v0.2.1
github.com/containerd/plugin v0.1.0
github.com/containerd/ttrpc v1.2.5
github.com/containerd/ttrpc v1.2.6-0.20240827082320-b5cd6e4b3287
github.com/containerd/typeurl/v2 v2.2.0
github.com/containernetworking/cni v1.2.3
github.com/containernetworking/plugins v1.5.1

16
go.sum
View File

@ -683,8 +683,8 @@ github.com/containerd/imgcrypt v1.2.0-rc1 h1:XESaAcMqxrGlRjQIqLdzxqsO/ddNK4vwfe7
github.com/containerd/imgcrypt v1.2.0-rc1/go.mod h1:F9roK2DzKlFnV+h+ZJy/r2FoS28bIvxKgdcoV7o8Sms=
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
github.com/containerd/nri v0.6.1 h1:xSQ6elnQ4Ynidm9u49ARK9wRKHs80HCUI+bkXOxV4mA=
github.com/containerd/nri v0.6.1/go.mod h1:7+sX3wNx+LR7RzhjnJiUkFDhn18P5Bg/0VnJ/uXpRJM=
github.com/containerd/nri v0.6.2-0.20241010080438-159f5754db39 h1:M2Cz4Bm2++xdq0A/zWqunOjnp0HNYQnNDNSSzqCmqnE=
github.com/containerd/nri v0.6.2-0.20241010080438-159f5754db39/go.mod h1:uSkgBrCdEtAiEz4vnrq8gmAC4EnVAM5Klt0OuK5rZYQ=
github.com/containerd/otelttrpc v0.0.0-20240305015340-ea5083fda723 h1:swk9KxrmARZjSMrHc1Lzb39XhcDwAhYpqkBhinCFLCQ=
github.com/containerd/otelttrpc v0.0.0-20240305015340-ea5083fda723/go.mod h1:ZKzztepTSz/LKtbUSzfBNVwgqBEPABVZV9PQF/l53+Q=
github.com/containerd/platforms v0.2.1 h1:zvwtM3rz2YHPQsF2CHYM8+KtB5dvhISiXh5ZpSBQv6A=
@ -692,8 +692,8 @@ github.com/containerd/platforms v0.2.1/go.mod h1:XHCb+2/hzowdiut9rkudds9bE5yJ7np
github.com/containerd/plugin v0.1.0 h1:CYMyZk9beRAIe1FEKItbMLLAz/z16aXrGc+B+nv0fU4=
github.com/containerd/plugin v0.1.0/go.mod h1:j6HlpMtkiZMgT4UsfVNxPBUkwdw9KQGU6nCLfRxnq+w=
github.com/containerd/ttrpc v1.2.2/go.mod h1:sIT6l32Ph/H9cvnJsfXM5drIVzTr5A2flTf1G5tYZak=
github.com/containerd/ttrpc v1.2.5 h1:IFckT1EFQoFBMG4c3sMdT8EP3/aKfumK1msY+Ze4oLU=
github.com/containerd/ttrpc v1.2.5/go.mod h1:YCXHsb32f+Sq5/72xHubdiJRQY9inL4a4ZQrAbN1q9o=
github.com/containerd/ttrpc v1.2.6-0.20240827082320-b5cd6e4b3287 h1:zwv64tCdT888KxuXQuv5i36cEdljoXq3sVqLmOEbCQI=
github.com/containerd/ttrpc v1.2.6-0.20240827082320-b5cd6e4b3287/go.mod h1:YCXHsb32f+Sq5/72xHubdiJRQY9inL4a4ZQrAbN1q9o=
github.com/containerd/typeurl/v2 v2.2.0 h1:6NBDbQzr7I5LHgp34xAXYF5DOTQDn05X58lsPEmzLso=
github.com/containerd/typeurl/v2 v2.2.0/go.mod h1:8XOOxnyatxSWuG8OfsZXVnAF4iZfedjS/8UHSPJnX4g=
github.com/containernetworking/cni v1.2.3 h1:hhOcjNVUQTnzdRJ6alC5XF+wd9mfGIUaj8FuJbEslXM=
@ -1001,10 +1001,10 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8m
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA=
github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To=
github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk=
github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0=
github.com/onsi/ginkgo/v2 v2.19.1 h1:QXgq3Z8Crl5EL1WBAC98A5sEBHARrAJNzAmMxzLcRF0=
github.com/onsi/ginkgo/v2 v2.19.1/go.mod h1:O3DtEWQkPa/F7fBMgmZQKKsluAy8pd3rEQdrjkPb9zA=
github.com/onsi/gomega v1.34.0 h1:eSSPsPNp6ZpsG8X1OVmOTxig+CblTc4AxpPBykhe2Os=
github.com/onsi/gomega v1.34.0/go.mod h1:MIKI8c+f+QLWk+hxbePD4i0LMJSExPaZOVfkoex4cAo=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug=

3
vendor/github.com/containerd/nri/.codespellrc generated vendored Normal file
View File

@ -0,0 +1,3 @@
[codespell]
skip = .git,*.pdf,*.svg,go.sum,go.mod
ignore-words-list = clos

View File

@ -36,6 +36,9 @@ issues:
linters:
- govet
text: "copylocks: .*protobuf/internal/impl.MessageState.*"
# We dot-import ginkgo and gomega in some tests. Silence any related errors.
- path: 'pkg/adaptation|pkg/runtime-tools/generate|pkg/net/multiplex'
text: "dot-imports:"
run:
timeout: 2m

View File

@ -124,7 +124,7 @@ $(BIN_PATH)/template: $(wildcard plugins/template/*.go)
test-gopkgs: ginkgo-tests test-ulimits
SKIPPED_PKGS="ulimit-adjuster"
SKIPPED_PKGS="ulimit-adjuster,device-injector"
ginkgo-tests:
$(Q)$(GINKGO) run \
@ -137,12 +137,15 @@ ginkgo-tests:
--coverprofile coverprofile \
--succinct \
--skip-package $(SKIPPED_PKGS) \
-r .; \
-r && \
$(GO_CMD) tool cover -html=$(COVERAGE_PATH)/coverprofile -o $(COVERAGE_PATH)/coverage.html
test-ulimits:
$(Q)cd ./plugins/ulimit-adjuster && $(GO_TEST) -v
test-device-injector:
$(Q)cd ./plugins/device-injector && $(GO_TEST) -v
codecov: SHELL := $(shell which bash)
codecov:
bash <(curl -s https://codecov.io/bash) -f $(COVERAGE_PATH)/coverprofile
@ -163,6 +166,13 @@ vet:
golangci-lint:
$(Q)$(GOLANG_CILINT) run
validate-repo-no-changes:
$(Q)test -z "$$(git status --short | tee /dev/stderr)" || { \
echo "Repository has changes."; \
echo "Please make sure to commit all changes, including generated files."; \
exit 1; \
}
#
# proto generation targets
#

View File

@ -297,7 +297,7 @@ apply NRI responses to containers.
The plugin stub hides many of the low-level details of implementing an NRI
plugin. It takes care of connection establishment, plugin registration,
configuration, and event subscription. All [sample plugins](pkg/plugins)
configuration, and event subscription. All [sample plugins](plugins)
are implemented using the stub. Any of these can be used as a tutorial on
how the stub library should be used.
@ -308,6 +308,7 @@ The following sample plugins exist for NRI:
- [logger](plugins/logger)
- [differ](plugins/differ)
- [device injector](plugins/device-injector)
- [network device injector](plugins/network-device-injector)
- [OCI hook injector](plugins/hook-injector)
- [ulimit adjuster](plugins/ulimit-adjuster)
- [NRI v0.1.0 plugin adapter](plugins/v010-adapter)

View File

@ -65,6 +65,7 @@ type Adaptation struct {
serverOpts []ttrpc.ServerOpt
listener net.Listener
plugins []*plugin
syncLock sync.RWMutex
}
var (
@ -135,6 +136,7 @@ func New(name, version string, syncFn SyncFn, updateFn UpdateFn, opts ...Option)
pluginPath: DefaultPluginPath,
dropinPath: DefaultPluginConfigPath,
socketPath: DefaultSocketPath,
syncLock: sync.RWMutex{},
}
for _, o := range opts {
@ -336,25 +338,48 @@ func (r *Adaptation) startPlugins() (retErr error) {
}()
for i, name := range names {
log.Infof(noCtx, "starting plugin %q...", name)
log.Infof(noCtx, "starting pre-installed NRI plugin %q...", name)
id := ids[i]
p, err := r.newLaunchedPlugin(r.pluginPath, id, name, configs[i])
if err != nil {
return fmt.Errorf("failed to start NRI plugin %q: %w", name, err)
log.Warnf(noCtx, "failed to initialize pre-installed NRI plugin %q: %v", name, err)
continue
}
if err := p.start(r.name, r.version); err != nil {
return err
log.Warnf(noCtx, "failed to start pre-installed NRI plugin %q: %v", name, err)
continue
}
plugins = append(plugins, p)
}
// Although syncPlugins may return a non-nil error, r.syncFn could still ignore that error and return nil.
// We need to make sure that only successfully synchronized plugins remain in `plugins`.
syncPlugins := func(ctx context.Context, pods []*PodSandbox, containers []*Container) (updates []*ContainerUpdate, err error) {
startedPlugins := plugins
plugins = make([]*plugin, 0, len(plugins))
for _, plugin := range startedPlugins {
us, err := plugin.synchronize(ctx, pods, containers)
if err != nil {
plugin.stop()
log.Warnf(noCtx, "failed to synchronize pre-installed NRI plugin %q: %v", plugin.name(), err)
continue
}
plugins = append(plugins, plugin)
updates = append(updates, us...)
log.Infof(noCtx, "pre-installed NRI plugin %q synchronization success", plugin.name())
}
return updates, nil
}
if err := r.syncFn(noCtx, syncPlugins); err != nil {
return fmt.Errorf("failed to synchronize pre-installed NRI Plugins: %w", err)
}
r.plugins = plugins
r.sortPlugins()
return nil
}
@ -369,12 +394,22 @@ func (r *Adaptation) stopPlugins() {
}
func (r *Adaptation) removeClosedPlugins() {
active := []*plugin{}
var active, closed []*plugin
for _, p := range r.plugins {
if !p.isClosed() {
if p.isClosed() {
closed = append(closed, p)
} else {
active = append(active, p)
}
}
if len(closed) != 0 {
go func() {
for _, plugin := range closed {
plugin.stop()
}
}()
}
r.plugins = active
}
@ -431,6 +466,8 @@ func (r *Adaptation) acceptPluginConnections(l net.Listener) error {
continue
}
r.requestPluginSync()
err = r.syncFn(ctx, p.synchronize)
if err != nil {
log.Infof(ctx, "failed to synchronize plugin: %v", err)
@ -439,9 +476,10 @@ func (r *Adaptation) acceptPluginConnections(l net.Listener) error {
r.plugins = append(r.plugins, p)
r.sortPlugins()
r.Unlock()
log.Infof(ctx, "plugin %q connected and synchronized", p.name())
}
log.Infof(ctx, "plugin %q connected", p.name())
r.finishedPluginSync()
}
}()
@ -512,3 +550,30 @@ func (r *Adaptation) sortPlugins() {
}
}
}
// requestPluginSync acquires the plugin sync lock for writing, blocking
// until any read-side blocks placed by BlockPluginSync are released.
func (r *Adaptation) requestPluginSync() {
	r.syncLock.Lock()
}
// finishedPluginSync releases the plugin sync lock acquired by
// requestPluginSync.
func (r *Adaptation) finishedPluginSync() {
	r.syncLock.Unlock()
}
// PluginSyncBlock represents a block placed on plugin synchronization by
// BlockPluginSync. It stays in effect until released with Unblock.
type PluginSyncBlock struct {
	r *Adaptation
}
// BlockPluginSync blocks plugins from being synchronized/fully registered.
// The returned PluginSyncBlock must be released with Unblock to let plugin
// synchronization proceed again.
func (r *Adaptation) BlockPluginSync() *PluginSyncBlock {
	r.syncLock.RLock()
	return &PluginSyncBlock{r: r}
}
// Unblock releases a plugin sync block put in place by BlockPluginSync.
// It is safe to call multiple times, but only from a single goroutine.
func (b *PluginSyncBlock) Unblock() {
	if b != nil && b.r != nil {
		b.r.syncLock.RUnlock()
		b.r = nil
	}
}

View File

@ -78,6 +78,7 @@ type (
LinuxMemory = api.LinuxMemory
LinuxDevice = api.LinuxDevice
LinuxDeviceCgroup = api.LinuxDeviceCgroup
CDIDevice = api.CDIDevice
HugepageLimit = api.HugepageLimit
Hooks = api.Hooks
Hook = api.Hook

View File

@ -33,13 +33,15 @@ import (
"github.com/containerd/nri/pkg/net"
"github.com/containerd/nri/pkg/net/multiplex"
"github.com/containerd/ttrpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const (
// DefaultPluginRegistrationTimeout is the default timeout for plugin registration.
DefaultPluginRegistrationTimeout = 5 * time.Second
DefaultPluginRegistrationTimeout = api.DefaultPluginRegistrationTimeout
// DefaultPluginRequestTimeout is the default timeout for plugins to handle a request.
DefaultPluginRequestTimeout = 2 * time.Second
DefaultPluginRequestTimeout = api.DefaultPluginRequestTimeout
)
var (
@ -387,6 +389,8 @@ func (p *plugin) configure(ctx context.Context, name, version, config string) er
Config: config,
RuntimeName: name,
RuntimeVersion: version,
RegistrationTimeout: getPluginRegistrationTimeout().Milliseconds(),
RequestTimeout: getPluginRequestTimeout().Milliseconds(),
})
if err != nil {
return fmt.Errorf("failed to configure plugin: %w", err)
@ -412,18 +416,101 @@ func (p *plugin) synchronize(ctx context.Context, pods []*PodSandbox, containers
ctx, cancel := context.WithTimeout(ctx, getPluginRequestTimeout())
defer cancel()
var (
podsToSend = pods
ctrsToSend = containers
podsPerMsg = len(pods)
ctrsPerMsg = len(containers)
rpl *SynchronizeResponse
err error
)
for {
req := &SynchronizeRequest{
Pods: pods,
Containers: containers,
Pods: podsToSend[:podsPerMsg],
Containers: ctrsToSend[:ctrsPerMsg],
More: len(podsToSend) > podsPerMsg || len(ctrsToSend) > ctrsPerMsg,
}
rpl, err := p.stub.Synchronize(ctx, req)
log.Debugf(ctx, "sending sync message, %d/%d, %d/%d (more: %v)",
len(req.Pods), len(podsToSend), len(req.Containers), len(ctrsToSend), req.More)
rpl, err = p.stub.Synchronize(ctx, req)
if err == nil {
if !req.More {
break
}
if len(rpl.Update) > 0 || rpl.More != req.More {
p.close()
return nil, fmt.Errorf("plugin does not handle split sync requests")
}
podsToSend = podsToSend[podsPerMsg:]
ctrsToSend = ctrsToSend[ctrsPerMsg:]
if podsPerMsg > len(podsToSend) {
podsPerMsg = len(podsToSend)
}
if ctrsPerMsg > len(ctrsToSend) {
ctrsPerMsg = len(ctrsToSend)
}
} else {
podsPerMsg, ctrsPerMsg, err = recalcObjsPerSyncMsg(podsPerMsg, ctrsPerMsg, err)
if err != nil {
p.close()
return nil, err
}
log.Debugf(ctx, "oversized message, retrying in smaller chunks")
}
}
return rpl.Update, nil
}
// recalcObjsPerSyncMsg recalculates how many pods and containers to send
// per synchronization message after err rejected the previous attempt.
// Errors other than oversized-message rejections are propagated unchanged.
// For oversized messages the per-message object counts are scaled down
// proportionally to the rejected/allowed length ratio; once the counts
// cannot be reduced any further, synchronization is considered failed.
func recalcObjsPerSyncMsg(pods, ctrs int, err error) (int, int, error) {
	const minObjsPerMsg = 8

	if status.Code(err) != codes.ResourceExhausted {
		return pods, ctrs, err
	}

	splitFailed := fmt.Errorf("failed to synchronize plugin with split messages")

	if pods+ctrs <= minObjsPerMsg {
		return pods, ctrs, splitFailed
	}

	var oversized *ttrpc.OversizedMessageErr
	if !errors.As(err, &oversized) {
		return pods, ctrs, splitFailed
	}

	limit := oversized.MaximumLength()
	rejected := oversized.RejectedLength()
	if rejected == 0 || limit == 0 || rejected <= limit {
		// can't tell how badly oversized the message was; give up
		return pods, ctrs, splitFailed
	}

	// scale counts down by the allowed/rejected ratio, capped at 0.9 so
	// we always shrink by at least 10% per retry
	scale := float64(limit) / float64(rejected)
	if scale > 0.9 {
		scale = 0.9
	}

	pods = int(scale * float64(pods))
	ctrs = int(scale * float64(ctrs))
	if pods+ctrs < minObjsPerMsg {
		pods = minObjsPerMsg / 2
		ctrs = minObjsPerMsg / 2
	}

	return pods, ctrs, nil
}
// Relay CreateContainer request to plugin.
func (p *plugin) createContainer(ctx context.Context, req *CreateContainerRequest) (*CreateContainerResponse, error) {
if !p.events.IsSet(Event_CREATE_CONTAINER) {
@ -516,7 +603,7 @@ func (p *plugin) StateChange(ctx context.Context, evt *StateChangeEvent) error {
return nil
}
// isFatalError returns true if the error is fatal and the plugin connection shoudld be closed.
// isFatalError returns true if the error is fatal and the plugin connection should be closed.
func isFatalError(err error) bool {
switch {
case errors.Is(err, ttrpc.ErrClosed):

View File

@ -19,6 +19,8 @@ package adaptation
import (
"fmt"
"strings"
"github.com/containerd/nri/pkg/api"
)
type result struct {
@ -89,6 +91,7 @@ func collectCreateContainerResult(request *CreateContainerRequest) *result {
Env: []*KeyValue{},
Hooks: &Hooks{},
Rlimits: []*POSIXRlimit{},
CDIDevices: []*CDIDevice{},
Linux: &LinuxContainerAdjustment{
Devices: []*LinuxDevice{},
Resources: &LinuxResources{
@ -200,7 +203,7 @@ func (r *result) adjust(rpl *ContainerAdjustment, plugin string) error {
if err := r.adjustEnv(rpl.Env, plugin); err != nil {
return err
}
if err := r.adjustHooks(rpl.Hooks, plugin); err != nil {
if err := r.adjustHooks(rpl.Hooks); err != nil {
return err
}
if rpl.Linux != nil {
@ -213,10 +216,16 @@ func (r *result) adjust(rpl *ContainerAdjustment, plugin string) error {
if err := r.adjustCgroupsPath(rpl.Linux.CgroupsPath, plugin); err != nil {
return err
}
if err := r.adjustOomScoreAdj(rpl.Linux.OomScoreAdj, plugin); err != nil {
return err
}
}
if err := r.adjustRlimits(rpl.Rlimits, plugin); err != nil {
return err
}
if err := r.adjustCDIDevices(rpl.CDIDevices, plugin); err != nil {
return err
}
return nil
}
@ -322,6 +331,13 @@ func (r *result) adjustMounts(mounts []*Mount, plugin string) error {
r.reply.adjust.Mounts = append(r.reply.adjust.Mounts, m)
}
// next, apply deletions with no corresponding additions
for _, m := range del {
if _, ok := mod[api.ClearRemovalMarker(m.Destination)]; !ok {
r.reply.adjust.Mounts = append(r.reply.adjust.Mounts, m)
}
}
// finally, apply additions/modifications to plugin container creation request
create.Container.Mounts = append(create.Container.Mounts, add...)
@ -386,6 +402,36 @@ func (r *result) adjustDevices(devices []*LinuxDevice, plugin string) error {
return nil
}
// adjustCDIDevices applies the CDI device references requested by a plugin
// to the collected adjustments for the container being created.
//
// Notes:
//   CDI devices are opaque, typically vendor-specific references. They get
//   resolved to actual devices and any related mounts, environment variables,
//   etc. in the runtime. Unlike ordinary devices, CDI device references can
//   only be added to a container, never removed, and we pass them unresolved
//   to the runtime to be resolved there instead of including them in creation
//   requests. Since there is typically a strong ownership and a single related
//   management entity per vendor/class for these devices, injecting the same
//   CDI device reference more than once is treated as an error here.
func (r *result) adjustCDIDevices(devices []*CDIDevice, plugin string) error {
	if len(devices) == 0 {
		return nil
	}

	id := r.request.create.Container.Id

	// record each requested reference, rejecting duplicate injection
	for _, dev := range devices {
		if err := r.owners.claimCDIDevice(id, dev.Name, plugin); err != nil {
			return err
		}
		r.reply.adjust.CDIDevices = append(r.reply.adjust.CDIDevices, dev)
	}

	return nil
}
func (r *result) adjustEnv(env []*KeyValue, plugin string) error {
if len(env) == 0 {
return nil
@ -458,7 +504,7 @@ func splitEnvVar(s string) (string, string) {
return split[0], split[1]
}
func (r *result) adjustHooks(hooks *Hooks, plugin string) error {
func (r *result) adjustHooks(hooks *Hooks) error {
if hooks == nil {
return nil
}
@ -645,7 +691,16 @@ func (r *result) adjustResources(resources *LinuxResources, plugin string) error
container.RdtClass = String(v.GetValue())
reply.RdtClass = String(v.GetValue())
}
if v := resources.GetPids(); v != nil {
if err := r.owners.claimPidsLimit(id, plugin); err != nil {
return err
}
pidv := &api.LinuxPids{
Limit: v.GetLimit(),
}
container.Pids = pidv
reply.Pids = pidv
}
return nil
}
@ -666,6 +721,23 @@ func (r *result) adjustCgroupsPath(path, plugin string) error {
return nil
}
// adjustOomScoreAdj applies a plugin's OOM score adjustment to both the
// pending container creation request and the collected reply, after
// claiming ownership of the setting for the plugin.
func (r *result) adjustOomScoreAdj(oomScoreAdj *OptionalInt, plugin string) error {
	if oomScoreAdj == nil {
		return nil
	}

	create := r.request.create
	if err := r.owners.claimOomScoreAdj(create.Container.Id, plugin); err != nil {
		return err
	}

	create.Container.Linux.OomScoreAdj = oomScoreAdj
	r.reply.adjust.Linux.OomScoreAdj = oomScoreAdj

	return nil
}
func (r *result) adjustRlimits(rlimits []*POSIXRlimit, plugin string) error {
create, id, adjust := r.request.create, r.request.create.Container.Id, r.reply.adjust
for _, l := range rlimits {
@ -820,6 +892,14 @@ func (r *result) updateResources(reply, u *ContainerUpdate, plugin string) error
}
resources.RdtClass = String(v.GetValue())
}
if v := resources.GetPids(); v != nil {
if err := r.owners.claimPidsLimit(id, plugin); err != nil {
return err
}
resources.Pids = &api.LinuxPids{
Limit: v.GetLimit(),
}
}
// update request/reply from copy on success
reply.Linux.Resources = resources.Copy()
@ -872,6 +952,7 @@ type owners struct {
annotations map[string]string
mounts map[string]string
devices map[string]string
cdiDevices map[string]string
env map[string]string
memLimit string
memReservation string
@ -888,11 +969,13 @@ type owners struct {
cpuRealtimePeriod string
cpusetCpus string
cpusetMems string
pidsLimit string
hugepageLimits map[string]string
blockioClass string
rdtClass string
unified map[string]string
cgroupsPath string
oomScoreAdj string
rlimits map[string]string
}
@ -917,6 +1000,10 @@ func (ro resultOwners) claimDevice(id, path, plugin string) error {
return ro.ownersFor(id).claimDevice(path, plugin)
}
// claimCDIDevice claims a CDI device reference of a container for a plugin.
func (ro resultOwners) claimCDIDevice(id, path, plugin string) error {
	return ro.ownersFor(id).claimCDIDevice(path, plugin)
}
func (ro resultOwners) claimEnv(id, name, plugin string) error {
return ro.ownersFor(id).claimEnv(name, plugin)
}
@ -981,6 +1068,10 @@ func (ro resultOwners) claimCpusetMems(id, plugin string) error {
return ro.ownersFor(id).claimCpusetMems(plugin)
}
// claimPidsLimit claims the pids limit of a container for a plugin.
func (ro resultOwners) claimPidsLimit(id, plugin string) error {
	return ro.ownersFor(id).claimPidsLimit(plugin)
}
func (ro resultOwners) claimHugepageLimit(id, size, plugin string) error {
return ro.ownersFor(id).claimHugepageLimit(size, plugin)
}
@ -1001,6 +1092,10 @@ func (ro resultOwners) claimCgroupsPath(id, plugin string) error {
return ro.ownersFor(id).claimCgroupsPath(plugin)
}
// claimOomScoreAdj claims the OOM score adjustment of a container for a plugin.
func (ro resultOwners) claimOomScoreAdj(id, plugin string) error {
	return ro.ownersFor(id).claimOomScoreAdj(plugin)
}
func (ro resultOwners) claimRlimits(id, typ, plugin string) error {
return ro.ownersFor(id).claimRlimit(typ, plugin)
}
@ -1038,6 +1133,17 @@ func (o *owners) claimDevice(path, plugin string) error {
return nil
}
// claimCDIDevice claims the named CDI device for a plugin, returning a
// conflict error if another plugin has already claimed it.
func (o *owners) claimCDIDevice(name, plugin string) error {
	if o.cdiDevices == nil {
		o.cdiDevices = map[string]string{}
	}
	if claimer, ok := o.cdiDevices[name]; ok {
		return conflict(plugin, claimer, "CDI device", name)
	}
	o.cdiDevices[name] = plugin
	return nil
}
func (o *owners) claimEnv(name, plugin string) error {
if o.env == nil {
o.env = make(map[string]string)
@ -1169,6 +1275,14 @@ func (o *owners) claimCpusetMems(plugin string) error {
return nil
}
// claimPidsLimit claims the pids limit setting for a plugin, returning a
// conflict error if another plugin has already claimed it.
func (o *owners) claimPidsLimit(plugin string) error {
	if o.pidsLimit != "" {
		return conflict(plugin, o.pidsLimit, "pids pinning")
	}
	o.pidsLimit = plugin
	return nil
}
func (o *owners) claimHugepageLimit(size, plugin string) error {
if o.hugepageLimits == nil {
o.hugepageLimits = make(map[string]string)
@ -1227,6 +1341,14 @@ func (o *owners) claimCgroupsPath(plugin string) error {
return nil
}
// claimOomScoreAdj claims the OOM score adjustment setting for a plugin,
// returning a conflict error if another plugin has already claimed it.
func (o *owners) claimOomScoreAdj(plugin string) error {
	if o.oomScoreAdj != "" {
		return conflict(plugin, o.oomScoreAdj, "oom score adj")
	}
	o.oomScoreAdj = plugin
	return nil
}
func (ro resultOwners) clearAnnotation(id, key string) {
ro.ownersFor(id).clearAnnotation(key)
}

View File

@ -129,6 +129,11 @@ func (a *ContainerAdjustment) RemoveDevice(path string) {
})
}
// AddCDIDevice records the addition of the given CDI device to a container.
// The reference is stored as given, without copying.
func (a *ContainerAdjustment) AddCDIDevice(d *CDIDevice) {
	a.CDIDevices = append(a.CDIDevices, d) // TODO: should we dup d here ?
}
// SetLinuxMemoryLimit records setting the memory limit for a container.
func (a *ContainerAdjustment) SetLinuxMemoryLimit(value int64) {
a.initLinuxResourcesMemory()
@ -219,6 +224,12 @@ func (a *ContainerAdjustment) SetLinuxCPUSetMems(value string) {
a.Linux.Resources.Cpu.Mems = value
}
// SetLinuxPidLimits records setting the maximum number of pids for a container.
func (a *ContainerAdjustment) SetLinuxPidLimits(value int64) {
	a.initLinuxResourcesPids()
	a.Linux.Resources.Pids.Limit = value
}
// AddLinuxHugepageLimit records adding a hugepage limit for a container.
func (a *ContainerAdjustment) AddLinuxHugepageLimit(pageSize string, value uint64) {
a.initLinuxResources()
@ -253,6 +264,12 @@ func (a *ContainerAdjustment) SetLinuxCgroupsPath(value string) {
a.Linux.CgroupsPath = value
}
// SetLinuxOomScoreAdj records setting the kernel's Out-Of-Memory (OOM) killer score for a container.
func (a *ContainerAdjustment) SetLinuxOomScoreAdj(value *int) {
	a.initLinux()
	// Int (from ./options.go) optionally allocates a pointer to a normalized copy of value.
	a.Linux.OomScoreAdj = Int(value)
}
//
// Initializing a container adjustment and container update.
//
@ -302,6 +319,13 @@ func (a *ContainerAdjustment) initLinuxResourcesCPU() {
}
}
// initLinuxResourcesPids lazily initializes the nested structures down to
// Linux.Resources.Pids so that it is safe to assign to.
func (a *ContainerAdjustment) initLinuxResourcesPids() {
	a.initLinuxResources()
	if a.Linux.Resources.Pids == nil {
		a.Linux.Resources.Pids = &LinuxPids{}
	}
}
func (a *ContainerAdjustment) initLinuxResourcesUnified() {
a.initLinuxResources()
if a.Linux.Resources.Unified == nil {

File diff suppressed because it is too large Load Diff

View File

@ -46,7 +46,7 @@ message UpdateContainersRequest {
}
message UpdateContainersResponse {
// Containers that the runtime failed to udpate.
// Containers that the runtime failed to update.
repeated ContainerUpdate failed = 1;
}
@ -114,6 +114,10 @@ message ConfigureRequest {
string runtime_name = 2;
// Version of the runtime NRI is running in.
string runtime_version = 3;
// Configured registration timeout in milliseconds.
int64 registration_timeout = 4;
// Configured request processing timeout in milliseconds.
int64 request_timeout = 5;
}
message ConfigureResponse {
@ -127,11 +131,15 @@ message SynchronizeRequest {
repeated PodSandbox pods = 1;
// Containers known to the runtime.
repeated Container containers = 2;
// Whether there are more pods and containers to follow.
bool more = 3;
}
message SynchronizeResponse {
// Updates to containers requested by the plugin.
repeated ContainerUpdate update = 1;
// Whether the client is able to handle more advertised pods and containers.
bool more = 2;
}
message CreateContainerRequest {
@ -319,6 +327,11 @@ message LinuxDeviceCgroup {
string access = 5;
}
// A CDI device reference.
message CDIDevice {
string name = 1;
}
// Container (linux) resources.
message LinuxResources {
LinuxMemory memory = 1;
@ -328,6 +341,7 @@ message LinuxResources {
OptionalString rdt_class = 5;
map<string, string> unified = 6;
repeated LinuxDeviceCgroup devices = 7; // for NRI v1 emulation
LinuxPids pids = 8;
}
// Memory-related parts of (linux) resources.
@ -366,6 +380,11 @@ message POSIXRlimit {
uint64 soft = 3;
}
// Pids-related parts of (linux) resources.
message LinuxPids {
int64 limit = 1;
}
// Requested adjustments to a container being created.
message ContainerAdjustment {
map<string, string> annotations = 2;
@ -374,6 +393,7 @@ message ContainerAdjustment {
Hooks hooks = 5;
LinuxContainerAdjustment linux = 6;
repeated POSIXRlimit rlimits = 7;
repeated CDIDevice CDI_devices = 8;
}
// Adjustments to (linux) resources.
@ -381,6 +401,7 @@ message LinuxContainerAdjustment {
repeated LinuxDevice devices = 1;
LinuxResources resources = 2;
string cgroups_path = 3;
OptionalInt oom_score_adj = 4;
}
// Requested update to an already created container.

View File

@ -57,3 +57,14 @@ func IsMarkedForRemoval(key string) (string, bool) {
func MarkForRemoval(key string) string {
return "-" + key
}
// ClearRemovalMarker returns the key stripped of any leading removal
// marker ('-'); keys without a marker are returned unchanged.
func ClearRemovalMarker(key string) string {
	if len(key) > 0 && key[0] == '-' {
		return key[1:]
	}
	return key
}

View File

@ -22,7 +22,7 @@ import (
)
// FromOCILinuxResources returns resources from an OCI runtime Spec.
func FromOCILinuxResources(o *rspec.LinuxResources, ann map[string]string) *LinuxResources {
func FromOCILinuxResources(o *rspec.LinuxResources, _ map[string]string) *LinuxResources {
if o == nil {
return nil
}
@ -65,6 +65,11 @@ func FromOCILinuxResources(o *rspec.LinuxResources, ann map[string]string) *Linu
Access: d.Access,
})
}
if p := o.Pids; p != nil {
l.Pids = &LinuxPids{
Limit: p.Limit,
}
}
return l
}
@ -148,7 +153,11 @@ func (r *LinuxResources) ToOCI() *rspec.LinuxResources {
Access: d.Access,
})
}
if r.Pids != nil {
o.Pids = &rspec.LinuxPids{
Limit: r.Pids.Limit,
}
}
return o
}
@ -226,6 +235,11 @@ func (r *LinuxResources) Copy() *LinuxResources {
o.Unified[k] = v
}
}
if r.Pids != nil {
o.Pids = &LinuxPids{
Limit: r.Pids.Limit,
}
}
o.BlockioClass = String(r.BlockioClass)
o.RdtClass = String(r.RdtClass)

28
vendor/github.com/containerd/nri/pkg/api/timeouts.go generated vendored Normal file
View File

@ -0,0 +1,28 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package api
import (
"time"
)
const (
// DefaultPluginRegistrationTimeout is the default timeout for plugin registration.
DefaultPluginRegistrationTimeout = 5 * time.Second
// DefaultPluginRequestTimeout is the default timeout for plugins to handle a request.
DefaultPluginRequestTimeout = 2 * time.Second
)

View File

@ -112,6 +112,12 @@ func (u *ContainerUpdate) SetLinuxCPUSetMems(value string) {
u.Linux.Resources.Cpu.Mems = value
}
// SetLinuxPidLimits records setting the maximum number of pids for a container.
func (u *ContainerUpdate) SetLinuxPidLimits(value int64) {
	u.initLinuxResourcesPids()
	u.Linux.Resources.Pids.Limit = value
}
// AddLinuxHugepageLimit records adding a hugepage limit for a container.
func (u *ContainerUpdate) AddLinuxHugepageLimit(pageSize string, value uint64) {
u.initLinuxResources()
@ -184,3 +190,10 @@ func (u *ContainerUpdate) initLinuxResourcesUnified() {
u.Linux.Resources.Unified = make(map[string]string)
}
}
// initLinuxResourcesPids lazily initializes the nested structures down to
// Linux.Resources.Pids so that it is safe to assign to.
func (u *ContainerUpdate) initLinuxResourcesPids() {
	u.initLinuxResources()
	if u.Linux.Resources.Pids == nil {
		u.Linux.Resources.Pids = &LinuxPids{}
	}
}

View File

@ -139,7 +139,7 @@ const (
// length of frame header: 4-byte ConnID, 4-byte payload length
headerLen = 8
// max. allowed payload size
maxPayloadSize = 1 << 24
maxPayloadSize = ttrpcMessageHeaderLength + ttrpcMessageLengthMax
)
// conn represents a single multiplexed connection.
@ -234,18 +234,23 @@ func (m *mux) Listen(id ConnID) (net.Listener, error) {
}
func (m *mux) write(id ConnID, buf []byte) (int, error) {
var hdr [headerLen]byte
if len(buf) > maxPayloadSize {
return 0, syscall.EMSGSIZE
}
binary.BigEndian.PutUint32(hdr[0:4], uint32(id))
binary.BigEndian.PutUint32(hdr[4:8], uint32(len(buf)))
var (
hdr [headerLen]byte
data = buf[:]
size = len(data)
)
m.writeLock.Lock()
defer m.writeLock.Unlock()
for {
if size > maxPayloadSize {
size = maxPayloadSize
}
binary.BigEndian.PutUint32(hdr[0:4], uint32(id))
binary.BigEndian.PutUint32(hdr[4:8], uint32(size))
n, err := m.trunk.Write(hdr[:])
if err != nil {
err = fmt.Errorf("failed to write header to trunk: %w", err)
@ -256,16 +261,27 @@ func (m *mux) write(id ConnID, buf []byte) (int, error) {
return 0, err
}
n, err = m.trunk.Write(buf)
n, err = m.trunk.Write(data[:size])
if err != nil {
err = fmt.Errorf("failed to write payload to trunk: %w", err)
if n != 0 {
m.setError(err)
m.Close()
}
return 0, err
}
return n, err
data = data[size:]
if size > len(data) {
size = len(data)
}
if size == 0 {
break
}
}
return len(buf), nil
}
func (m *mux) reader() {
@ -429,16 +445,16 @@ func (c *conn) RemoteAddr() net.Addr {
}
// SetDeadline is the unimplemented stub for the corresponding net.Conn function.
func (c *conn) SetDeadline(t time.Time) error {
func (c *conn) SetDeadline(_ time.Time) error {
return nil
}
// SetReadDeadline is the unimplemented stub for the corresponding net.Conn function.
func (c *conn) SetReadDeadline(t time.Time) error {
func (c *conn) SetReadDeadline(_ time.Time) error {
return nil
}
// SetWriteDeadline is the unimplemented stub for the corresponding net.Conn function.
func (c *conn) SetWriteDeadline(t time.Time) error {
func (c *conn) SetWriteDeadline(_ time.Time) error {
return nil
}

View File

@ -22,3 +22,8 @@ const (
// RuntimeServiceConn is the mux connection ID for NRI runtime services.
RuntimeServiceConn
)
const (
ttrpcMessageHeaderLength = 10
ttrpcMessageLengthMax = 4 << 20
)

View File

@ -40,6 +40,7 @@ type Generator struct {
filterAnnotations func(map[string]string) (map[string]string, error)
resolveBlockIO func(string) (*rspec.LinuxBlockIO, error)
resolveRdt func(string) (*rspec.LinuxIntelRdt, error)
injectCDIDevices func(*rspec.Spec, []string) error
checkResources func(*rspec.LinuxResources) error
}
@ -91,6 +92,14 @@ func WithResourceChecker(fn func(*rspec.LinuxResources) error) GeneratorOption {
}
}
// WithCDIDeviceInjector specifies a runtime-specific function to use for CDI
// device resolution and injection into an OCI Spec. If no injector is set,
// CDI device adjustments are silently ignored by the generator.
func WithCDIDeviceInjector(fn func(*rspec.Spec, []string) error) GeneratorOption {
	return func(gen *Generator) {
		gen.injectCDIDevices = fn
	}
}
// Adjust adjusts all aspects of the OCI Spec that NRI knows/cares about.
func (g *Generator) Adjust(adjust *nri.ContainerAdjustment) error {
if adjust == nil {
@ -102,8 +111,12 @@ func (g *Generator) Adjust(adjust *nri.ContainerAdjustment) error {
}
g.AdjustEnv(adjust.GetEnv())
g.AdjustHooks(adjust.GetHooks())
if err := g.InjectCDIDevices(adjust.GetCDIDevices()); err != nil {
return err
}
g.AdjustDevices(adjust.GetLinux().GetDevices())
g.AdjustCgroupsPath(adjust.GetLinux().GetCgroupsPath())
g.AdjustOomScoreAdj(adjust.GetLinux().GetOomScoreAdj())
resources := adjust.GetLinux().GetResources()
if err := g.AdjustResources(resources); err != nil {
@ -252,7 +265,9 @@ func (g *Generator) AdjustResources(r *nri.LinuxResources) error {
for k, v := range r.Unified {
g.AddLinuxResourcesUnified(k, v)
}
if v := r.GetPids(); v != nil {
g.SetLinuxResourcesPidsLimit(v.GetLimit())
}
if g.checkResources != nil {
if err := g.checkResources(g.Config.Linux.Resources); err != nil {
return fmt.Errorf("failed to adjust resources in OCI Spec: %w", err)
@ -309,6 +324,17 @@ func (g *Generator) AdjustCgroupsPath(path string) {
}
}
// AdjustOomScoreAdj adjusts the kernel's Out-Of-Memory (OOM) killer score
// for the container. This may override kubelet's settings for OOM score.
// A nil score clears any previously set adjustment from the Spec.
func (g *Generator) AdjustOomScoreAdj(score *nri.OptionalInt) {
	if score == nil {
		// Reset to the zero value, then drop the field altogether.
		g.SetProcessOOMScoreAdj(0)
		g.Config.Process.OOMScoreAdj = nil
		return
	}
	g.SetProcessOOMScoreAdj(int(score.Value))
}
// AdjustDevices adjusts the (Linux) devices in the OCI Spec.
func (g *Generator) AdjustDevices(devices []*nri.LinuxDevice) {
for _, d := range devices {
@ -323,6 +349,23 @@ func (g *Generator) AdjustDevices(devices []*nri.LinuxDevice) {
}
}
// InjectCDIDevices injects the requested CDI devices into the OCI Spec.
// Devices are given by their fully qualified CDI device names. The actual
// device injection is done using a runtime-specific CDI injection function,
// set using the WithCDIDeviceInjector option. Without an injector, or with
// no devices requested, this is a no-op.
func (g *Generator) InjectCDIDevices(devices []*nri.CDIDevice) error {
	if g.injectCDIDevices == nil || len(devices) == 0 {
		return nil
	}
	names := make([]string, 0, len(devices))
	for _, dev := range devices {
		names = append(names, dev.Name)
	}
	return g.injectCDIDevices(g.Config, names)
}
func (g *Generator) AdjustRlimits(rlimits []*nri.POSIXRlimit) error {
for _, l := range rlimits {
if l == nil {

View File

@ -35,7 +35,7 @@ import (
)
// Plugin can implement a number of interfaces related to Pod and Container
// lifecycle events. No any single such inteface is mandatory, therefore the
// lifecycle events. No any single such interface is mandatory, therefore the
// Plugin interface itself is empty. Plugins are required to implement at
// least one of these interfaces and this is verified during stub creation.
// Trying to create a stub for a plugin violating this requirement will fail
@ -137,7 +137,9 @@ type PostUpdateContainerInterface interface {
// Stub is the interface the stub provides for the plugin implementation.
type Stub interface {
// Run the plugin. Starts the plugin then waits for an error or the plugin to stop
// Run starts the plugin then waits for the plugin service to exit, either due to a
// critical error or an explicit call to Stop(). Once Run() returns, the plugin can be
// restarted by calling Run() or Start() again.
Run(context.Context) error
// Start the plugin.
Start(context.Context) error
@ -148,11 +150,23 @@ type Stub interface {
// UpdateContainer requests unsolicited updates to containers.
UpdateContainers([]*api.ContainerUpdate) ([]*api.ContainerUpdate, error)
// RegistrationTimeout returns the registration timeout for the stub.
// This is the default timeout if the plugin has not been started or
// the timeout received in the Configure request otherwise.
RegistrationTimeout() time.Duration
// RequestTimeout returns the request timeout for the stub.
// This is the default timeout if the plugin has not been started or
// the timeout received in the Configure request otherwise.
RequestTimeout() time.Duration
}
const (
// Plugin registration timeout.
registrationTimeout = 2 * time.Second
// DefaultRegistrationTimeout is the default plugin registration timeout.
DefaultRegistrationTimeout = api.DefaultPluginRegistrationTimeout
// DefaultRequestTimeout is the default plugin request processing timeout.
DefaultRequestTimeout = api.DefaultPluginRequestTimeout
)
var (
@ -255,11 +269,14 @@ type stub struct {
rpcs *ttrpc.Server
rpcc *ttrpc.Client
runtime api.RuntimeService
closeOnce sync.Once
started bool
doneC chan struct{}
srvErrC chan error
cfgErrC chan error
syncReq *api.SynchronizeRequest
registrationTimeout time.Duration
requestTimeout time.Duration
}
// Handlers for NRI plugin event and request.
@ -288,7 +305,9 @@ func New(p interface{}, opts ...Option) (Stub, error) {
idx: os.Getenv(api.PluginIdxEnvVar),
socketPath: api.DefaultSocketPath,
dialer: func(p string) (stdnet.Conn, error) { return stdnet.Dial("unix", p) },
doneC: make(chan struct{}),
registrationTimeout: DefaultRegistrationTimeout,
requestTimeout: DefaultRequestTimeout,
}
for _, o := range opts {
@ -316,10 +335,10 @@ func (stub *stub) Start(ctx context.Context) (retErr error) {
stub.Lock()
defer stub.Unlock()
if stub.started {
if stub.isStarted() {
return fmt.Errorf("stub already started")
}
stub.started = true
stub.doneC = make(chan struct{})
err := stub.connect()
if err != nil {
@ -378,10 +397,11 @@ func (stub *stub) Start(ctx context.Context) (retErr error) {
stub.srvErrC = make(chan error, 1)
stub.cfgErrC = make(chan error, 1)
go func() {
stub.srvErrC <- rpcs.Serve(ctx, rpcl)
close(stub.doneC)
}()
go func(l stdnet.Listener, doneC chan struct{}, srvErrC chan error) {
srvErrC <- rpcs.Serve(ctx, l)
close(doneC)
}(rpcl, stub.doneC, stub.srvErrC)
stub.rpcm = rpcm
stub.rpcl = rpcl
@ -401,6 +421,7 @@ func (stub *stub) Start(ctx context.Context) (retErr error) {
log.Infof(ctx, "Started plugin %s...", stub.Name())
stub.started = true
return nil
}
@ -413,8 +434,24 @@ func (stub *stub) Stop() {
stub.close()
}
// IsStarted reports whether the plugin has been started, either by Start()
// or by Run(), and has not been stopped/closed since.
func (stub *stub) IsStarted() bool {
	stub.Lock()
	started := stub.isStarted()
	stub.Unlock()
	return started
}
// isStarted is the lock-free variant of IsStarted(); it is intended to be
// called with the stub lock already held.
func (stub *stub) isStarted() bool {
	return stub.started
}
// reset stub to the status that can initiate a new
// NRI connection, the caller must hold lock.
func (stub *stub) close() {
stub.closeOnce.Do(func() {
if !stub.isStarted() {
return
}
if stub.rpcl != nil {
stub.rpcl.Close()
}
@ -430,7 +467,10 @@ func (stub *stub) close() {
if stub.srvErrC != nil {
<-stub.doneC
}
})
stub.started = false
stub.conn = nil
stub.syncReq = nil
}
// Run the plugin. Start event processing then wait for an error or getting stopped.
@ -449,21 +489,26 @@ func (stub *stub) Run(ctx context.Context) error {
return err
}
// Wait for the plugin to stop.
// Wait for the plugin to stop, should be called after Start() or Run().
func (stub *stub) Wait() {
stub.Lock()
if stub.srvErrC == nil {
return
}
stub.Unlock()
if stub.IsStarted() {
<-stub.doneC
}
}
// Name returns the full indexed name ("<idx>-<name>") of the plugin.
func (stub *stub) Name() string {
	return fmt.Sprintf("%s-%s", stub.idx, stub.name)
}
// RegistrationTimeout returns the registration timeout for the stub. This is
// the default timeout if the plugin has not been started, or the timeout
// received in the Configure request otherwise.
func (stub *stub) RegistrationTimeout() time.Duration {
	return stub.registrationTimeout
}

// RequestTimeout returns the request processing timeout for the stub. This is
// the default timeout if the plugin has not been started, or the timeout
// received in the Configure request otherwise.
func (stub *stub) RequestTimeout() time.Duration {
	return stub.requestTimeout
}
// Connect the plugin to NRI.
func (stub *stub) connect() error {
if stub.conn != nil {
@ -502,7 +547,7 @@ func (stub *stub) connect() error {
func (stub *stub) register(ctx context.Context) error {
log.Infof(ctx, "Registering plugin %s...", stub.Name())
ctx, cancel := context.WithTimeout(ctx, registrationTimeout)
ctx, cancel := context.WithTimeout(ctx, stub.registrationTimeout)
defer cancel()
req := &api.RegisterPluginRequest{
@ -518,7 +563,9 @@ func (stub *stub) register(ctx context.Context) error {
// Handle a lost connection.
func (stub *stub) connClosed() {
stub.Lock()
stub.close()
stub.Unlock()
if stub.onClose != nil {
stub.onClose()
return
@ -558,6 +605,9 @@ func (stub *stub) Configure(ctx context.Context, req *api.ConfigureRequest) (rpl
log.Infof(ctx, "Configuring plugin %s for runtime %s/%s...", stub.Name(),
req.RuntimeName, req.RuntimeVersion)
stub.registrationTimeout = time.Duration(req.RegistrationTimeout * int64(time.Millisecond))
stub.requestTimeout = time.Duration(req.RequestTimeout * int64(time.Millisecond))
defer func() {
stub.cfgErrC <- retErr
}()
@ -596,16 +646,55 @@ func (stub *stub) Configure(ctx context.Context, req *api.ConfigureRequest) (rpl
func (stub *stub) Synchronize(ctx context.Context, req *api.SynchronizeRequest) (*api.SynchronizeResponse, error) {
handler := stub.handlers.Synchronize
if handler == nil {
return &api.SynchronizeResponse{}, nil
return &api.SynchronizeResponse{More: req.More}, nil
}
update, err := handler(ctx, req.Pods, req.Containers)
if req.More {
return stub.collectSync(req)
}
return stub.deliverSync(ctx, req)
}
// collectSync buffers one batch of a multi-part synchronization request.
// When the runtime splits synchronization into several requests (req.More
// set), the pods and containers of each batch are accumulated in
// stub.syncReq until the final batch is delivered.
func (stub *stub) collectSync(req *api.SynchronizeRequest) (*api.SynchronizeResponse, error) {
	stub.Lock()
	defer stub.Unlock()

	log.Debugf(noCtx, "collecting sync req with %d pods, %d containers...",
		len(req.Pods), len(req.Containers))

	if pending := stub.syncReq; pending != nil {
		pending.Pods = append(pending.Pods, req.Pods...)
		pending.Containers = append(pending.Containers, req.Containers...)
	} else {
		stub.syncReq = req
	}

	return &api.SynchronizeResponse{More: req.More}, nil
}
// deliverSync merges the final synchronization batch with any previously
// collected ones and invokes the plugin's Synchronize handler with the
// combined set of pods and containers.
func (stub *stub) deliverSync(ctx context.Context, req *api.SynchronizeRequest) (*api.SynchronizeResponse, error) {
	stub.Lock()
	collected := stub.syncReq
	stub.syncReq = nil
	stub.Unlock()

	if collected == nil {
		// Single-batch synchronization: deliver the request as-is.
		collected = req
	} else {
		collected.Pods = append(collected.Pods, req.Pods...)
		collected.Containers = append(collected.Containers, req.Containers...)
	}

	update, err := stub.handlers.Synchronize(ctx, collected.Pods, collected.Containers)
	return &api.SynchronizeResponse{
		Update: update,
		More:   false,
	}, err
}
// Shutdown the plugin.
func (stub *stub) Shutdown(ctx context.Context, req *api.ShutdownRequest) (*api.ShutdownResponse, error) {
func (stub *stub) Shutdown(ctx context.Context, _ *api.ShutdownRequest) (*api.ShutdownResponse, error) {
handler := stub.handlers.Shutdown
if handler != nil {
handler(ctx)

View File

@ -33,7 +33,7 @@ type Plugin struct {
//
// Normally located at /etc/nri/conf.json
type ConfigList struct {
// Verion of the list
// Version of the list
Version string `json:"version"`
// Plugins
Plugins []*Plugin `json:"plugins"`

View File

@ -143,10 +143,10 @@ func (ch *channel) recv() (messageHeader, []byte, error) {
}
func (ch *channel) send(streamID uint32, t messageType, flags uint8, p []byte) error {
// TODO: Error on send rather than on recv
//if len(p) > messageLengthMax {
// return status.Errorf(codes.InvalidArgument, "refusing to send, message length %v exceed maximum message size of %v", len(p), messageLengthMax)
//}
if len(p) > messageLengthMax {
return OversizedMessageError(len(p))
}
if err := writeMessageHeader(ch.bw, ch.hwbuf[:], messageHeader{Length: uint32(len(p)), StreamID: streamID, Type: t, Flags: flags}); err != nil {
return err
}

View File

@ -16,7 +16,12 @@
package ttrpc
import "errors"
import (
"errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
var (
// ErrProtocol is a general error in the handling the protocol.
@ -32,3 +37,44 @@ var (
// ErrStreamClosed is when the streaming connection is closed.
ErrStreamClosed = errors.New("ttrpc: stream closed")
)
// OversizedMessageErr is used to indicate refusal to send an oversized message.
// It wraps a ResourceExhausted grpc Status together with the offending message
// length.
type OversizedMessageErr struct {
	// messageLength is the length of the rejected message in bytes.
	messageLength int
	// err is the wrapped grpc Status error (codes.ResourceExhausted).
	err error
}
// OversizedMessageError returns an OversizedMessageErr error for the given
// message length if it exceeds the allowed maximum (messageLengthMax).
// Otherwise a nil error is returned.
func OversizedMessageError(messageLength int) error {
	if messageLength > messageLengthMax {
		return &OversizedMessageErr{
			messageLength: messageLength,
			err:           status.Errorf(codes.ResourceExhausted, "message length %v exceed maximum message size of %v", messageLength, messageLengthMax),
		}
	}
	return nil
}
// Error returns the error message for the corresponding grpc Status for the error.
func (e *OversizedMessageErr) Error() string {
	return e.err.Error()
}

// Unwrap returns the wrapped grpc Status error (ResourceExhausted), so that
// errors.Is/errors.As and status.FromError can see it.
func (e *OversizedMessageErr) Unwrap() error {
	return e.err
}

// RejectedLength retrieves the rejected message length which triggered the error.
func (e *OversizedMessageErr) RejectedLength() int {
	return e.messageLength
}

// MaximumLength retrieves the maximum allowed message length that triggered the error.
func (*OversizedMessageErr) MaximumLength() int {
	return messageLengthMax
}

6
vendor/modules.txt vendored
View File

@ -163,8 +163,8 @@ github.com/containerd/imgcrypt/images/encryption
## explicit; go 1.20
github.com/containerd/log
github.com/containerd/log/logtest
# github.com/containerd/nri v0.6.1
## explicit; go 1.19
# github.com/containerd/nri v0.6.2-0.20241010080438-159f5754db39
## explicit; go 1.21
github.com/containerd/nri
github.com/containerd/nri/pkg/adaptation
github.com/containerd/nri/pkg/api
@ -186,7 +186,7 @@ github.com/containerd/platforms
github.com/containerd/plugin
github.com/containerd/plugin/dynamic
github.com/containerd/plugin/registry
# github.com/containerd/ttrpc v1.2.5
# github.com/containerd/ttrpc v1.2.6-0.20240827082320-b5cd6e4b3287
## explicit; go 1.19
github.com/containerd/ttrpc
# github.com/containerd/typeurl/v2 v2.2.0