nri: add experimental NRI plugin.

Add a common NRI 'service' plugin. It takes care of relaying
requests and respones to and from NRI (external NRI plugins)
and the high-level containerd namespace-independent logic of
applying NRI container adjustments and updates to actual CRI
and other containers.

The namespace-dependent details of the necessary container
manipulation operations are to be implemented by namespace-
specific adaptations. This NRI plugin defines the API which
such adaptations need to implement.

Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
This commit is contained in:
Krisztian Litkey
2022-08-31 16:10:24 +03:00
parent e0be97ccee
commit 43704ca888
118 changed files with 12178 additions and 10066 deletions

View File

@@ -1,22 +1,41 @@
linters:
enable:
- structcheck
- varcheck
- staticcheck
- unconvert
- gofmt
- goimports
- golint
- ineffassign
- vet
- unused
- misspell
- revive
disable:
- errcheck
issues:
include:
- EXC0002
exclude-rules:
# We have protoc-generated ttRPC/gRPC code. When adding extra functions
# for generated types we want to consistently violate golint's semantic
# function name spelling rules, instead of inconsistently doing so only
# from automatically generated files. These rules are for that.
- path: pkg/adaptation/result.go
linters:
- golint
- revive
text: "should be claim"
# Ignore naming violation in the test suite as well.
- path: pkg/adaptation/adaptation_suite_test.go
linters:
- golint
- revive
text: "should be strip"
# Differ copies pods and containers with Mutexes for diffing. Should be harmless.
- path: plugins/differ/nri-differ.go
linters:
- govet
text: "copylocks: .*protobuf/internal/impl.MessageState.*"
run:
timeout: 2m

View File

@@ -12,5 +12,166 @@
# See the License for the specific language governing permissions and
# limitations under the License.
all:
go build -v
PROTO_SOURCES = $(shell find . -name '*.proto' | grep -v /vendor/)
PROTO_GOFILES = $(patsubst %.proto,%.pb.go,$(PROTO_SOURCES))
PROTO_INCLUDE = -I$(PWD):/usr/local/include:/usr/include
PROTO_OPTIONS = --proto_path=. $(PROTO_INCLUDE) \
--go_opt=paths=source_relative --go_out=. \
--go-ttrpc_opt=paths=source_relative --go-ttrpc_out=.
PROTO_COMPILE = PATH=$(PATH):$(shell go env GOPATH)/bin; protoc $(PROTO_OPTIONS)
GO_CMD := go
GO_BUILD := $(GO_CMD) build
GO_INSTALL := $(GO_CMD) install
GO_TEST := $(GO_CMD) test
GO_LINT := golint -set_exit_status
GO_FMT := gofmt
GO_VET := $(GO_CMD) vet
GO_MODULES := $(shell $(GO_CMD) list ./...)
GOLANG_CILINT := golangci-lint
GINKGO := ginkgo
BUILD_PATH := $(shell pwd)/build
BIN_PATH := $(BUILD_PATH)/bin
COVERAGE_PATH := $(BUILD_PATH)/coverage
PLUGINS := \
$(BIN_PATH)/logger \
$(BIN_PATH)/device-injector \
$(BIN_PATH)/hook-injector \
$(BIN_PATH)/differ \
$(BIN_PATH)/v010-adapter \
$(BIN_PATH)/template
ifneq ($(V),1)
Q := @
endif
#
# top-level targets
#
all: build build-plugins
build: build-proto build-check
clean: clean-plugins
allclean: clean clean-cache
test: test-gopkgs
#
# build targets
#
build-proto: $(PROTO_GOFILES)
build-plugins: $(PLUGINS)
build-check:
$(Q)$(GO_BUILD) -v $(GO_MODULES)
#
# clean targets
#
clean-plugins:
$(Q)rm -f $(PLUGINS)
clean-cache:
$(Q)$(GO_CMD) clean -cache -testcache
#
# plugins build targets
#
$(BIN_PATH)/logger: $(wildcard plugins/logger/*.go)
$(Q)echo "Building $@..."; \
cd $(dir $<) && $(GO_BUILD) -o $@ .
$(BIN_PATH)/device-injector: $(wildcard plugins/device-injector/*.go)
$(Q)echo "Building $@..."; \
cd $(dir $<) && $(GO_BUILD) -o $@ .
$(BIN_PATH)/hook-injector: $(wildcard plugins/hook-injector/*.go)
$(Q)echo "Building $@..."; \
cd $(dir $<) && $(GO_BUILD) -o $@ .
$(BIN_PATH)/differ: $(wildcard plugins/differ/*.go)
$(Q)echo "Building $@..."; \
cd $(dir $<) && $(GO_BUILD) -o $@ .
$(BIN_PATH)/v010-adapter: $(wildcard plugins/v010-adapter/*.go)
$(Q)echo "Building $@..."; \
cd $(dir $<) && $(GO_BUILD) -o $@ .
$(BIN_PATH)/template: $(wildcard plugins/template/*.go)
$(Q)echo "Building $@..."; \
cd $(dir $<) && $(GO_BUILD) -o $@ .
#
# test targets
#
test-gopkgs: ginkgo-tests
ginkgo-tests:
$(Q)$(GINKGO) run \
--race \
--trace \
--cover \
--covermode atomic \
--output-dir $(COVERAGE_PATH) \
--junit-report junit.xml \
--coverprofile coverprofile \
--succinct \
-r .; \
$(GO_CMD) tool cover -html=$(COVERAGE_PATH)/coverprofile -o $(COVERAGE_PATH)/coverage.html
codecov: SHELL := $(shell which bash)
codecov:
bash <(curl -s https://codecov.io/bash) -f $(COVERAGE_PATH)/coverprofile
#
# other validation targets
#
fmt format:
$(Q)$(GO_FMT) -s -d -e .
lint:
$(Q)$(GO_LINT) -set_exit_status ./...
vet:
$(Q)$(GO_VET) ./...
golangci-lint:
$(Q)$(GOLANG_CILINT) run
#
# proto generation targets
#
%.pb.go: %.proto
$(Q)echo "Generating $@..."; \
$(PROTO_COMPILE) $<
#
# targets for installing dependencies
#
install-protoc install-protobuf:
$(Q)./scripts/install-protobuf && \
install-ttrpc-plugin:
$(Q)$(GO_INSTALL) github.com/containerd/ttrpc/cmd/protoc-gen-go-ttrpc@74421d10189e8c118870d294c9f7f62db2d33ec1
install-protoc-dependencies:
$(Q)$(GO_INSTALL) google.golang.org/protobuf/cmd/protoc-gen-go@v1.28.0
install-ginkgo:
$(Q)$(GO_INSTALL) -mod=mod github.com/onsi/ginkgo/v2/ginkgo

115
vendor/github.com/containerd/nri/README-v0.1.0.md generated vendored Normal file
View File

@@ -0,0 +1,115 @@
# nri - Node Resource Interface
[![PkgGoDev](https://pkg.go.dev/badge/github.com/containerd/nri)](https://pkg.go.dev/github.com/containerd/nri)
[![Build Status](https://github.com/containerd/nri/workflows/CI/badge.svg)](https://github.com/containerd/nri/actions?query=workflow%3ACI)
[![codecov](https://codecov.io/gh/containerd/nri/branch/main/graph/badge.svg)](https://codecov.io/gh/containerd/nri)
[![Go Report Card](https://goreportcard.com/badge/github.com/containerd/nri)](https://goreportcard.com/report/github.com/containerd/nri)
*This project is currently in DRAFT status*
This project is a WIP for a new, CNI like, interface for managing resources on a node for Pods and Containers.
## Documentation
The basic interface, concepts and plugin design of the Container Network Interface (CNI) is an elegant way to handle multiple implementations of the network stack for containers.
This concept can be used for additional interfaces to customize a container's runtime environment.
This proposal covers a new interface for resource management on a node with a structured API and plugin design for containers.
## Lifecycle
The big selling point for CNI is that it has a structured interface for modifying the network namespace for a container.
This is different from generic hooks as they lack a type safe API injected into the lifecycle of a container.
The lifecycle point that CNI and NRI plugins will be injected into is the point between `Create` and `Start` of the container's init process.
`Create->NRI->Start`
## Configuration
Configuration is split into two parts. One is the payload that is specific to a plugin invocation while the second is the host level configuration and options that specify what plugins to run and provide additional configuration to a plugin.
### Filepath and Binaries
Plugin binary paths can be configured via the consumer but will default to `/opt/nri/bin`.
Binaries are named with their type as the binary name, same as the CNI plugin naming scheme.
### Host Level Config
The config's default location will be `/etc/nri/resource.d/*.conf`.
```json
{
"version": "0.1",
"plugins": [
{
"type": "konfine",
"conf": {
"systemReserved": [0, 1]
}
},
{
"type": "clearcfs"
}
]
}
```
### Input
Input to a plugin is provided via `STDIN` as a `json` payload.
```json
{
"version": "0.1",
"state": "create",
"id": "redis",
"pid": 1234,
"spec": {
"resources": {},
"cgroupsPath": "default/redis",
"namespaces": {
"pid": "/proc/44/ns/pid",
"mount": "/proc/44/ns/mnt",
"net": "/proc/44/ns/net"
},
"annotations": {
"qos.class": "ls"
}
}
}
```
### Output
```json
{
"version": "0.1",
"state": "create",
"id": "redis",
"pid": 1234,
"cgroupsPath": "qos-ls/default/redis"
}
```
## Commands
* Invoke - provides invocations into different lifecycle changes of a container
- states: `setup|pause|resume|update|delete`
## Packages
A Go based API and client package will be created for both producers of plugins and consumers, commonly being the container runtime (containerd).
### Sample Plugin
* [clearcfs](examples/clearcfs/main.go)
## Project details
nri is a containerd sub-project, licensed under the [Apache 2.0 license](./LICENSE).
As a containerd sub-project, you will find the:
* [Project governance](https://github.com/containerd/project/blob/main/GOVERNANCE.md),
* [Maintainers](https://github.com/containerd/project/blob/main/MAINTAINERS),
* and [Contributing guidelines](https://github.com/containerd/project/blob/main/CONTRIBUTING.md)
information in our [`containerd/project`](https://github.com/containerd/project) repository.

View File

@@ -1,167 +1,306 @@
# nri - Node Resource Interface
## Node Resource Interface, Revisited
[![PkgGoDev](https://pkg.go.dev/badge/github.com/containerd/nri)](https://pkg.go.dev/github.com/containerd/nri)
[![Build Status](https://github.com/containerd/nri/workflows/CI/badge.svg)](https://github.com/containerd/nri/actions?query=workflow%3ACI)
[![codecov](https://codecov.io/gh/containerd/nri/branch/master/graph/badge.svg)](https://codecov.io/gh/containerd/nri)
[![Go Report Card](https://goreportcard.com/badge/github.com/containerd/nri)](https://goreportcard.com/report/github.com/containerd/nri)
### Goal
*This project is currently in DRAFT status*
NRI allows plugging domain- or vendor-specific custom logic into OCI-
compatible runtimes. This logic can make controlled changes to containers
or perform extra actions outside the scope of OCI at certain points in a
containers lifecycle. This can be used, for instance, for improved allocation
and management of devices and other container resources.
This project is a WIP for a new, CNI like, interface for managing resources on a node for Pods and Containers.
NRI defines the interfaces and implements the common infrastructure for
enabling such pluggable runtime extensions, NRI plugins. This also keeps
the plugins themselves runtime-agnostic.
## Documentation
The goal is to enable NRI support in the most commonly used OCI runtimes,
[containerd](https://github.com/containerd/containerd) and
[CRI-O](https://github.com/cri-o/cri-o).
The basic interface, concepts and plugin design of the Container Network Interface (CNI) is an elegant way to handle multiple implementations of the network stack for containers.
This concept can be used for additional interfaces to customize a container's runtime environment.
This proposal covers a new interface for resource management on a node with a structured API and plugin design for containers.
### Background
## Lifecycle
The revisited API is a major rewrite of NRI. It changes the scope of NRI
and how it gets integrated into runtimes. It reworks how plugins are
implemented, how they communicate with the runtime, and what kind of
changes they can make to containers.
The big selling point for CNI is that it has a structured interface for modifying the network namespace for a container.
This is different from generic hooks as they lack a type safe API injected into the lifecycle of a container.
The lifecycle point that CNI and NRI plugins will be injected into is the point between `Create` and `Start` of the container's init process.
[NRI v0.1.0](README-v0.1.0.md) used an OCI hook-like one-shot plugin invocation
mechanism where a separate instance of a plugin was spawned for every NRI
event. This instance then used its standard input and output to receive a
request and provide a response, both as JSON data.
`Create->NRI->Start`
Plugins in NRI are daemon-like entities. A single instance of a plugin is
now responsible for handling the full stream of NRI events and requests. A
unix-domain socket is used as the transport for communication. Instead of
JSON requests and responses NRI is defined as a formal, protobuf-based
'NRI plugin protocol' which is compiled into ttRPC bindings. This should
result in improved communication efficiency with lower per-message overhead,
and enable straightforward implementation of stateful NRI plugins.
## Configuration
### Components
Configuration is split into two parts. One is the payload that is specific to a plugin invocation while the second is the host level configuration and options that specify what plugins to run and provide additional configuration to a plugin.
The NRI implementation consists of a number of components. The core of
these are essential for implementing working end-to-end NRI support in
runtimes. These core components are the actual [NRI protocol](pkg/api),
and the [NRI runtime adaptation](pkg/adaptation).
### Filepath and Binaries
Together these establish the model of how a runtime interacts with NRI and
how plugins interact with containers in the runtime through NRI. They also
define under which conditions plugins can make changes to containers and
the extent of these changes.
Plugin binary paths can be configured via the consumer but will default to `/opt/nri/bin`.
Binaries are named with their type as the binary name, same as the CNI plugin naming scheme.
The rest of the components are the [NRI plugin stub library](pkg/stub)
and [some sample NRI plugins](plugins). [Some plugins](plugins/hook-injector)
implement useful functionality in real world scenarios. [A few](plugins/differ)
[others](plugins/logger) are useful for debugging. All of the sample plugins
serve as practical examples of how the stub library can be used to implement
NRI plugins.
### Host Level Config
### Protocol, Plugin API
The config's default location will be `/etc/nri/resource.d/*.conf`.
The core of NRI is defined by a protobuf [protocol definition](pkg/api/api.proto)
of the low-level plugin API. The API defines two services, Runtime and Plugin.
```json
{
"version": "0.1",
"plugins": [
{
"type": "konfine",
"conf": {
"systemReserved": [0, 1]
}
},
{
"type": "clearcfs"
}
]
}
```
The Runtime service is the public interface runtimes expose for NRI plugins. All
requests on this interface are initiated by the plugin. The interface provides
functions for
### Input
- initiating plugin registration
- requesting unsolicited updates to containers
Input to a plugin is provided via `STDIN` as a `json` payload.
The Plugin service is the public interface NRI uses to interact with plugins.
All requests on this interface are initiated by NRI/the runtime. The interface
provides functions for
```json
{
"version": "0.1",
"state": "create",
"id": "redis",
"pid": 1234,
"spec": {
"resources": {},
"cgroupsPath": "default/redis",
"namespaces": {
"pid": "/proc/44/ns/pid",
"mount": "/proc/44/ns/mnt",
"net": "/proc/44/ns/net"
},
"annotations": {
"qos.class": "ls"
}
}
}
```
- configuring the plugin
- getting initial list of already existing pods and containers
- hooking the plugin into pod/container lifecycle events
- shutting down the plugin
### Output
#### Plugin Registration
```json
{
"version": "0.1",
"state": "create",
"id": "redis",
"pid": 1234,
"cgroupsPath": "qos-ls/default/redis"
}
```
Before a plugin can start receiving and processing container events, it needs
to register itself with NRI. During registration the plugin and NRI perform a
handshake sequence which consists of the following steps:
## Commands
1. the plugin identifies itself to the runtime
2. NRI provides plugin-specific configuration data to the plugin
3. the plugin subscribes to pod and container lifecycle events of interest
4. NRI sends list of existing pods and containers to plugin
5. the plugin requests any updates deemed necessary to existing containers
* Invoke - provides invocations into different lifecycle changes of a container
- states: `setup|pause|resume|update|delete`
The plugin identifies itself to NRI by a plugin name and a plugin index. The
plugin index is used by NRI to determine in which order the plugin is hooked
into pod and container lifecycle event processing with respect to any other
plugins.
## Packages
The plugin name is used to pick plugin-specific data to send to the plugin
as configuration. This data is only present if the plugin has been launched
by NRI. If the plugin has been externally started it is expected to acquire
its configuration also by external means. The plugin subscribes to pod and
container lifecycle events of interest in its response to configuration.
A Go based API and client package will be created for both producers of plugins and consumers, commonly being the container runtime (containerd).
As the last step in the registration and handshaking process, NRI sends the
full set of pods and containers known to the runtime. The plugin can request
updates it considers necessary to any of the known containers in response.
### Sample Plugin
Once the handshake sequence is over and the plugin has registered with NRI,
it will start receiving pod and container lifecycle events according to its
subscription.
**clearcfs**
#### Pod Data and Available Lifecycle Events
Clear the cfs quotas for `ls` services.
<details>
<summary>NRI Pod Lifecycle Events</summary>
<p align="center">
<img src="./docs/nri-pod-lifecycle.svg" title="NRI Pod Lifecycle Events">
</p>
</details>
NRI plugins can subscribe to the following pod lifecycle events:
- creation
- stopping
- removal
The following pieces of pod metadata are available to plugins in NRI:
- ID
- name
- UID
- namespace
- labels
- annotations
- cgroup parent directory
- runtime handler name
#### Container Data and Available Lifecycle Events
<details>
<summary>NRI Container Lifecycle Events</summary>
<p align="center">
<img src="./docs/nri-container-lifecycle.svg" title="NRI Container Lifecycle Events">
</p>
</details>
NRI plugins can subscribe to the following container lifecycle events:
- creation (*)
- post-creation
- starting
- post-start
- updating (*)
- post-update
- stopping (*)
- removal
*) Plugins can request adjustment or updates to containers in response to
these events.
The following pieces of container metadata are available to plugins in NRI:
- ID
- pod ID
- name
- state
- labels
- annotations
- command line arguments
- environment variables
- mounts
- OCI hooks
- linux
- namespace IDs
- devices
- resources
- memory
- limit
- reservation
- swap limit
- kernel limit
- kernel TCP limit
- swappiness
- OOM disabled flag
- hierarchical accounting flag
- hugepage limits
- CPU
- shares
- quota
- period
- realtime runtime
- realtime period
- cpuset CPUs
- cpuset memory
- Block I/O class
- RDT class
Apart from data identifying the container, these pieces of information
represent the corresponding data in the container's OCI Spec.
#### Container Adjustment
During container creation plugins can request changes to the following
container parameters:
- annotations
- mounts
- environment variables
- OCI hooks
- linux
- devices
- resources
- memory
- limit
- reservation
- swap limit
- kernel limit
- kernel TCP limit
- swappiness
- OOM disabled flag
- hierarchical accounting flag
- hugepage limits
- CPU
- shares
- quota
- period
- realtime runtime
- realtime period
- cpuset CPUs
- cpuset memory
- Block I/O class
- RDT class
#### Container Updates
Once a container has been created plugins can request updates to them.
These updates can be requested in response to another containers creation
request, in response to any containers update request, in response to any
containers stop request, or they can be requested as part of a separate
unsolicited container update request. The following container parameters
can be updated this way:
- resources
- memory
- limit
- reservation
- swap limit
- kernel limit
- kernel TCP limit
- swappiness
- OOM disabled flag
- hierarchical accounting flag
- hugepage limits
- CPU
- shares
- quota
- period
- realtime runtime
- realtime period
- cpuset CPUs
- cpuset memory
- Block I/O class
- RDT class
```go
package main
### Runtime Adaptation
import (
"context"
"fmt"
"os"
The NRI [runtime adaptation](pkg/adaptation) package is the interface
runtimes use to integrate to NRI and interact with NRI plugins. It
implements basic plugin discovery, startup and configuration. It also
provides the functions necessary to hook NRI plugins into lifecycle
events of pods and containers from the runtime.
"github.com/containerd/containerd/pkg/nri/skel"
"github.com/containerd/containerd/pkg/nri/types"
"github.com/sirupsen/logrus"
)
The package hides the fact that multiple NRI plugins might be processing
any single pod or container lifecycle event. It takes care of invoking
plugins in the correct order and combining responses by multiple plugins
into a single one. While combining responses, the package detects any
unintentional conflicting changes made by multiple plugins to a single
container and flags such an event as an error to the runtime.
var max = []byte("max")
### Wrapped OCI Spec Generator
// clearCFS clears any cfs quotas for the containers
type clearCFS struct {
}
The [OCI Spec generator](pkg/runtime-tools/generate) package wraps the
[corresponding package](https://github.com/opencontainers/runtime-tools/tree/master/generate)
and adds functions for applying NRI container adjustments and updates to
OCI Specs. This package can be used by runtime NRI integration code to
apply NRI responses to containers.
func (c *clearCFS) Type() string {
return "clearcfs"
}
### Plugin Stub Library
func (c *clearCFS) Invoke(ctx context.Context, r *types.Request) (*types.Result, error) {
result := r.NewResult()
if r.State != types.Create {
return result, nil
}
switch r.Spec.Annotations["qos.class"] {
case "ls":
logrus.Debugf("clearing cfs for %s", r.ID)
group, err := cg.Load(r.Spec.CgroupsPath)
if err != nil {
return nil, err
}
return result, group.Write(cg.CFSMax)
}
return result, nil
}
The plugin stub hides many of the low-level details of implementing an NRI
plugin. It takes care of connection establishment, plugin registration,
configuration, and event subscription. All [sample plugins](pkg/plugins)
are implemented using the stub. Any of these can be used as a tutorial on
how the stub library should be used.
func main() {
ctx := context.Background()
if err := skel.Run(ctx, &clearCFS{}); err != nil {
fmt.Fprintf(os.Stderr, "%s", err)
os.Exit(1)
}
}
```
### Sample Plugins
## Project details
The following sample plugins exist for NRI:
nri is a containerd sub-project, licensed under the [Apache 2.0 license](./LICENSE).
As a containerd sub-project, you will find the:
- [logger](plugins/logger)
- [differ](plugins/differ)
- [device injector](plugins/device-injector)
- [OCI hook injector](plugins/hook-injector)
- [NRI v0.1.0 plugin adapter](plugins/v010-adapter)
* [Project governance](https://github.com/containerd/project/blob/master/GOVERNANCE.md),
* [Maintainers](https://github.com/containerd/project/blob/master/MAINTAINERS),
* and [Contributing guidelines](https://github.com/containerd/project/blob/master/CONTRIBUTING.md)
information in our [`containerd/project`](https://github.com/containerd/project) repository.
Please see the documentation of these plugins for further details
about what and how each of these plugins can be used for.

View File

@@ -28,7 +28,6 @@ import (
"github.com/containerd/containerd"
"github.com/containerd/containerd/oci"
types "github.com/containerd/nri/types/v1"
"github.com/pkg/errors"
)
const (
@@ -107,7 +106,7 @@ func (c *Client) InvokeWithSandbox(ctx context.Context, task containerd.Task, st
r.Conf = p.Conf
result, err := c.invokePlugin(ctx, p.Type, r)
if err != nil {
return nil, errors.Wrapf(err, "plugin: %s", p.Type)
return nil, fmt.Errorf("plugin: %s: %w", p.Type, err)
}
r.Results = append(r.Results, result)
}
@@ -141,7 +140,7 @@ func (c *Client) invokePlugin(ctx context.Context, name string, r *types.Request
}
var result types.Result
if err := json.Unmarshal(out, &result); err != nil {
return nil, errors.Errorf("failed to unmarshal plugin output: %s: %s", err.Error(), msg)
return nil, fmt.Errorf("failed to unmarshal plugin output %s: %w", msg, err)
}
if result.Err() != nil {
return nil, result.Err()

View File

@@ -0,0 +1,504 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package adaptation
import (
"context"
"errors"
"fmt"
"io/fs"
"net"
"os"
"path/filepath"
"sort"
"sync"
"github.com/containerd/nri/pkg/api"
"github.com/containerd/nri/pkg/log"
)
const (
// DefaultConfigPath is the default path to the NRI configuration.
DefaultConfigPath = "/etc/nri/nri.conf"
// DefaultPluginPath is the default path to search for NRI plugins.
DefaultPluginPath = "/opt/nri/plugins"
// DefaultSocketPath is the default socket path for external plugins.
DefaultSocketPath = api.DefaultSocketPath
)
// SyncFn is a container runtime function for state synchronization.
type SyncFn func(context.Context, SyncCB) error
// SyncCB is an NRI function used to synchronize plugins with the runtime.
type SyncCB func(context.Context, []*PodSandbox, []*Container) ([]*ContainerUpdate, error)
// UpdateFn is a container runtime function for unsolicited container updates.
type UpdateFn func(context.Context, []*ContainerUpdate) ([]*ContainerUpdate, error)
// Adaptation is the NRI abstraction for container runtime NRI adaptation/integration.
type Adaptation struct {
sync.Mutex
name string
version string
configPath string
pluginPath string
socketPath string
syncFn SyncFn
updateFn UpdateFn
cfg *Config
listener net.Listener
plugins []*plugin
}
var (
// Used instead of nil Context in logging.
noCtx = context.TODO()
)
// Option to apply to the NRI runtime.
type Option func(*Adaptation) error
// WithConfigPath returns an option to override the default NRI config path.
func WithConfigPath(path string) Option {
return func(r *Adaptation) error {
r.configPath = path
return nil
}
}
// WithConfig returns an option to provide a pre-parsed NRI configuration.
func WithConfig(cfg *Config) Option {
return func(r *Adaptation) error {
r.cfg = cfg
r.configPath = cfg.path
return nil
}
}
// WithPluginPath returns an option to override the default NRI plugin path.
func WithPluginPath(path string) Option {
return func(r *Adaptation) error {
r.pluginPath = path
return nil
}
}
// WithSocketPath returns an option to override the default NRI socket path.
func WithSocketPath(path string) Option {
return func(r *Adaptation) error {
r.socketPath = path
return nil
}
}
// New creates a new NRI Runtime.
func New(name, version string, syncFn SyncFn, updateFn UpdateFn, opts ...Option) (*Adaptation, error) {
var err error
r := &Adaptation{
name: name,
version: version,
syncFn: syncFn,
updateFn: updateFn,
configPath: DefaultConfigPath,
pluginPath: DefaultPluginPath,
socketPath: DefaultSocketPath,
}
for _, o := range opts {
if err = o(r); err != nil {
return nil, fmt.Errorf("failed to apply option: %w", err)
}
}
if r.cfg == nil {
if r.cfg, err = ReadConfig(r.configPath); err != nil {
return nil, err
}
}
log.Infof(noCtx, "runtime interface created")
return r, nil
}
// Start up the NRI runtime.
func (r *Adaptation) Start() error {
log.Infof(noCtx, "runtime interface starting up...")
r.Lock()
defer r.Unlock()
if err := r.startPlugins(); err != nil {
return err
}
if err := r.startListener(); err != nil {
return err
}
return nil
}
// Stop the NRI runtime.
func (r *Adaptation) Stop() {
log.Infof(noCtx, "runtime interface shutting down...")
r.Lock()
defer r.Unlock()
r.stopListener()
r.stopPlugins()
}
// RunPodSandbox relays the corresponding CRI event to plugins.
func (r *Adaptation) RunPodSandbox(ctx context.Context, evt *StateChangeEvent) error {
evt.Event = Event_RUN_POD_SANDBOX
return r.StateChange(ctx, evt)
}
// StopPodSandbox relays the corresponding CRI event to plugins.
func (r *Adaptation) StopPodSandbox(ctx context.Context, evt *StateChangeEvent) error {
evt.Event = Event_STOP_POD_SANDBOX
return r.StateChange(ctx, evt)
}
// RemovePodSandbox relays the corresponding CRI event to plugins.
func (r *Adaptation) RemovePodSandbox(ctx context.Context, evt *StateChangeEvent) error {
evt.Event = Event_REMOVE_POD_SANDBOX
return r.StateChange(ctx, evt)
}
// CreateContainer relays the corresponding CRI request to plugins.
func (r *Adaptation) CreateContainer(ctx context.Context, req *CreateContainerRequest) (*CreateContainerResponse, error) {
r.Lock()
defer r.Unlock()
defer r.removeClosedPlugins()
result := collectCreateContainerResult(req)
for _, plugin := range r.plugins {
rpl, err := plugin.createContainer(ctx, req)
if err != nil {
return nil, err
}
err = result.apply(rpl, plugin.name())
if err != nil {
return nil, err
}
}
return result.createContainerResponse(), nil
}
// PostCreateContainer relays the corresponding CRI event to plugins.
func (r *Adaptation) PostCreateContainer(ctx context.Context, evt *StateChangeEvent) error {
evt.Event = Event_POST_CREATE_CONTAINER
return r.StateChange(ctx, evt)
}
// StartContainer relays the corresponding CRI event to plugins.
func (r *Adaptation) StartContainer(ctx context.Context, evt *StateChangeEvent) error {
evt.Event = Event_START_CONTAINER
return r.StateChange(ctx, evt)
}
// PostStartContainer relays the corresponding CRI event to plugins.
func (r *Adaptation) PostStartContainer(ctx context.Context, evt *StateChangeEvent) error {
evt.Event = Event_POST_START_CONTAINER
return r.StateChange(ctx, evt)
}
// UpdateContainer relays the corresponding CRI request to plugins.
func (r *Adaptation) UpdateContainer(ctx context.Context, req *UpdateContainerRequest) (*UpdateContainerResponse, error) {
r.Lock()
defer r.Unlock()
defer r.removeClosedPlugins()
result := collectUpdateContainerResult(req)
for _, plugin := range r.plugins {
rpl, err := plugin.updateContainer(ctx, req)
if err != nil {
return nil, err
}
err = result.apply(rpl, plugin.name())
if err != nil {
return nil, err
}
}
return result.updateContainerResponse(), nil
}
// PostUpdateContainer relays the corresponding CRI event to plugins.
func (r *Adaptation) PostUpdateContainer(ctx context.Context, evt *StateChangeEvent) error {
evt.Event = Event_POST_UPDATE_CONTAINER
return r.StateChange(ctx, evt)
}
// StopContainer relays the corresponding CRI request to plugins.
func (r *Adaptation) StopContainer(ctx context.Context, req *StopContainerRequest) (*StopContainerResponse, error) {
r.Lock()
defer r.Unlock()
defer r.removeClosedPlugins()
result := collectStopContainerResult()
for _, plugin := range r.plugins {
rpl, err := plugin.stopContainer(ctx, req)
if err != nil {
return nil, err
}
err = result.apply(rpl, plugin.name())
if err != nil {
return nil, err
}
}
return result.stopContainerResponse(), nil
}
// RemoveContainer relays the corresponding CRI event to plugins.
func (r *Adaptation) RemoveContainer(ctx context.Context, evt *StateChangeEvent) error {
evt.Event = Event_REMOVE_CONTAINER
return r.StateChange(ctx, evt)
}
// StateChange relays pod- or container events to plugins.
func (r *Adaptation) StateChange(ctx context.Context, evt *StateChangeEvent) error {
if evt.Event == Event_UNKNOWN {
return errors.New("invalid (unset) event in state change notification")
}
r.Lock()
defer r.Unlock()
defer r.removeClosedPlugins()
for _, plugin := range r.plugins {
err := plugin.StateChange(ctx, evt)
if err != nil {
return err
}
}
return nil
}
// Perform a set of unsolicited container updates requested by a plugin.
func (r *Adaptation) updateContainers(ctx context.Context, req []*ContainerUpdate) ([]*ContainerUpdate, error) {
r.Lock()
defer r.Unlock()
return r.updateFn(ctx, req)
}
// Start up pre-installed plugins.
func (r *Adaptation) startPlugins() (retErr error) {
var plugins []*plugin
log.Infof(noCtx, "starting plugins...")
ids, names, configs, err := r.discoverPlugins()
if err != nil {
return err
}
defer func() {
if retErr != nil {
for _, p := range plugins {
p.stop()
}
}
}()
for i, name := range names {
log.Infof(noCtx, "starting plugin %q...", name)
id := ids[i]
p, err := newLaunchedPlugin(r.pluginPath, id, name, configs[i])
if err != nil {
return fmt.Errorf("failed to start NRI plugin %q: %w", name, err)
}
if err := p.start(r.name, r.version); err != nil {
return err
}
plugins = append(plugins, p)
}
r.plugins = plugins
r.sortPlugins()
return nil
}
// Stop plugins.
func (r *Adaptation) stopPlugins() {
log.Infof(noCtx, "stopping plugins...")
for _, p := range r.plugins {
p.stop()
}
r.plugins = nil
}
func (r *Adaptation) removeClosedPlugins() {
active := []*plugin{}
for _, p := range r.plugins {
if !p.isClosed() {
active = append(active, p)
}
}
r.plugins = active
}
func (r *Adaptation) startListener() error {
if r.cfg.DisableConnections {
log.Infof(noCtx, "connection from external plugins disabled")
return nil
}
os.Remove(r.socketPath)
if err := os.MkdirAll(filepath.Dir(r.socketPath), 0755); err != nil {
return fmt.Errorf("failed to create socket %q: %w", r.socketPath, err)
}
l, err := net.ListenUnix("unix", &net.UnixAddr{
Name: r.socketPath,
Net: "unix",
})
if err != nil {
return fmt.Errorf("failed to create socket %q: %w", r.socketPath, err)
}
r.acceptPluginConnections(l)
return nil
}
func (r *Adaptation) stopListener() {
if r.listener != nil {
r.listener.Close()
}
}
func (r *Adaptation) acceptPluginConnections(l net.Listener) error {
r.listener = l
ctx := context.Background()
go func() {
for {
conn, err := l.Accept()
if err != nil {
log.Infof(ctx, "stopped accepting plugin connections (%v)", err)
return
}
p, err := newExternalPlugin(conn)
if err != nil {
log.Errorf(ctx, "failed to create external plugin: %v", err)
continue
}
if err := p.start(r.name, r.version); err != nil {
log.Errorf(ctx, "failed to start external plugin: %v", err)
continue
}
r.Lock()
err = r.syncFn(ctx, p.synchronize)
if err != nil {
log.Infof(ctx, "failed to synchronize plugin: %v", err)
} else {
r.plugins = append(r.plugins, p)
r.sortPlugins()
}
r.Unlock()
log.Infof(ctx, "plugin %q connected", p.name())
}
}()
return nil
}
func (r *Adaptation) discoverPlugins() ([]string, []string, []string, error) {
var (
plugins []string
indices []string
configs []string
entries []os.DirEntry
info fs.FileInfo
err error
)
if entries, err = os.ReadDir(r.pluginPath); err != nil {
if os.IsNotExist(err) {
return nil, nil, nil, nil
}
return nil, nil, nil, fmt.Errorf("failed to discover plugins in %s: %w",
r.pluginPath, err)
}
for _, e := range entries {
if e.IsDir() {
continue
}
if info, err = e.Info(); err != nil {
continue
}
if info.Mode()&fs.FileMode(0o111) == 0 {
continue
}
name := e.Name()
idx, base, err := api.ParsePluginName(name)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to discover plugins in %s: %w",
r.pluginPath, err)
}
cfg, err := r.cfg.getPluginConfig(idx, base)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to discover plugins in %s: %w",
r.pluginPath, err)
}
log.Infof(noCtx, "discovered plugin %s", name)
indices = append(indices, idx)
plugins = append(plugins, base)
configs = append(configs, cfg)
}
return indices, plugins, configs, nil
}
func (r *Adaptation) sortPlugins() {
r.removeClosedPlugins()
sort.Slice(r.plugins, func(i, j int) bool {
return r.plugins[i].idx < r.plugins[j].idx
})
if len(r.plugins) > 0 {
log.Infof(noCtx, "plugin invocation order")
for i, p := range r.plugins {
log.Infof(noCtx, " #%d: %q (%s)", i+1, p.name(), p.qualifiedName())
}
}
}

151
vendor/github.com/containerd/nri/pkg/adaptation/api.go generated vendored Normal file
View File

@@ -0,0 +1,151 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package adaptation
import (
"github.com/containerd/nri/pkg/api"
)
//
// Alias types, consts and functions from api for the runtime.
//
// Aliased request/response/event types for api/api.proto.
// nolint
type (
RegisterPluginRequest = api.RegisterPluginRequest
RegisterPluginResponse = api.Empty
UpdateContainersRequest = api.UpdateContainersRequest
UpdateContainersResponse = api.UpdateContainersResponse
ConfigureRequest = api.ConfigureRequest
ConfigureResponse = api.ConfigureResponse
SynchronizeRequest = api.SynchronizeRequest
SynchronizeResponse = api.SynchronizeResponse
CreateContainerRequest = api.CreateContainerRequest
CreateContainerResponse = api.CreateContainerResponse
UpdateContainerRequest = api.UpdateContainerRequest
UpdateContainerResponse = api.UpdateContainerResponse
StopContainerRequest = api.StopContainerRequest
StopContainerResponse = api.StopContainerResponse
StateChangeEvent = api.StateChangeEvent
StateChangeResponse = api.StateChangeResponse
RunPodSandboxRequest = api.RunPodSandboxRequest
StopPodSandboxRequest = api.StopPodSandboxRequest
RemovePodSandboxRequest = api.RemovePodSandboxRequest
StartContainerRequest = api.StartContainerRequest
StartContainerResponse = api.StartContainerResponse
RemoveContainerRequest = api.RemoveContainerRequest
RemoveContainerResponse = api.RemoveContainerResponse
PostCreateContainerRequest = api.PostCreateContainerRequest
PostCreateContainerResponse = api.PostCreateContainerResponse
PostStartContainerRequest = api.PostStartContainerRequest
PostStartContainerResponse = api.PostStartContainerResponse
PostUpdateContainerRequest = api.PostUpdateContainerRequest
PostUpdateContainerResponse = api.PostUpdateContainerResponse
PodSandbox = api.PodSandbox
LinuxPodSandbox = api.LinuxPodSandbox
Container = api.Container
ContainerAdjustment = api.ContainerAdjustment
LinuxContainerAdjustment = api.LinuxContainerAdjustment
ContainerUpdate = api.ContainerUpdate
LinuxContainerUpdate = api.LinuxContainerUpdate
ContainerEviction = api.ContainerEviction
ContainerState = api.ContainerState
KeyValue = api.KeyValue
Mount = api.Mount
LinuxContainer = api.LinuxContainer
LinuxNamespace = api.LinuxNamespace
LinuxResources = api.LinuxResources
LinuxCPU = api.LinuxCPU
LinuxMemory = api.LinuxMemory
LinuxDevice = api.LinuxDevice
LinuxDeviceCgroup = api.LinuxDeviceCgroup
HugepageLimit = api.HugepageLimit
Hooks = api.Hooks
Hook = api.Hook
EventMask = api.EventMask
)
// Aliased consts for api/api.proto.
// nolint
const (
Event_UNKNOWN = api.Event_UNKNOWN
Event_RUN_POD_SANDBOX = api.Event_RUN_POD_SANDBOX
Event_STOP_POD_SANDBOX = api.Event_STOP_POD_SANDBOX
Event_REMOVE_POD_SANDBOX = api.Event_REMOVE_POD_SANDBOX
Event_CREATE_CONTAINER = api.Event_CREATE_CONTAINER
Event_POST_CREATE_CONTAINER = api.Event_POST_CREATE_CONTAINER
Event_START_CONTAINER = api.Event_START_CONTAINER
Event_POST_START_CONTAINER = api.Event_POST_START_CONTAINER
Event_UPDATE_CONTAINER = api.Event_UPDATE_CONTAINER
Event_POST_UPDATE_CONTAINER = api.Event_POST_UPDATE_CONTAINER
Event_STOP_CONTAINER = api.Event_STOP_CONTAINER
Event_REMOVE_CONTAINER = api.Event_REMOVE_CONTAINER
ValidEvents = api.ValidEvents
ContainerState_CONTAINER_UNKNOWN = api.ContainerState_CONTAINER_UNKNOWN
ContainerState_CONTAINER_CREATED = api.ContainerState_CONTAINER_CREATED
ContainerState_CONTAINER_PAUSED = api.ContainerState_CONTAINER_PAUSED
ContainerState_CONTAINER_RUNNING = api.ContainerState_CONTAINER_RUNNING
ContainerState_CONTAINER_STOPPED = api.ContainerState_CONTAINER_STOPPED
ContainerState_CONTAINER_EXITED = api.ContainerState_CONTAINER_STOPPED
)
// Aliased types for api/optional.go.
// nolint
type (
OptionalString = api.OptionalString
OptionalInt = api.OptionalInt
OptionalInt32 = api.OptionalInt32
OptionalUInt32 = api.OptionalUInt32
OptionalInt64 = api.OptionalInt64
OptionalUInt64 = api.OptionalUInt64
OptionalBool = api.OptionalBool
OptionalFileMode = api.OptionalFileMode
)
// Aliased functions for api/optional.go.
// nolint
var (
String = api.String
Int = api.Int
Int32 = api.Int32
UInt32 = api.UInt32
Int64 = api.Int64
UInt64 = api.UInt64
Bool = api.Bool
FileMode = api.FileMode
)
// Aliased functions for api/types.go.
// nolint
var (
FromOCIMounts = api.FromOCIMounts
FromOCIHooks = api.FromOCIHooks
FromOCILinuxNamespaces = api.FromOCILinuxNamespaces
FromOCILinuxDevices = api.FromOCILinuxDevices
FromOCILinuxResources = api.FromOCILinuxResources
DupStringSlice = api.DupStringSlice
DupStringMap = api.DupStringMap
IsMarkedForRemoval = api.IsMarkedForRemoval
MarkForRemoval = api.MarkForRemoval
)

View File

@@ -0,0 +1,95 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package adaptation
import (
"fmt"
"os"
"path/filepath"
"sigs.k8s.io/yaml"
)
const (
// PluginConfigSubdir is the drop-in directory for plugin configuration.
PluginConfigSubdir = "conf.d"
)
// Config is the runtime configuration for NRI.
type Config struct {
// DisableConnections disables plugin-initiated connections.
DisableConnections bool `json:"disableConnections"`
path string
dropIn string
}
// DefaultConfig returns the default NRI configuration for a given path.
// This configuration should be identical to what ReadConfig would return
// for an empty file at the given location. If the given path is empty,
// DefaultConfigPath is used instead.
func DefaultConfig(path string) *Config {
if path == "" {
path = DefaultConfigPath
}
return &Config{
path: path,
dropIn: filepath.Join(filepath.Dir(path), PluginConfigSubdir),
}
}
// ReadConfig reads the NRI runtime configuration from a file.
func ReadConfig(path string) (*Config, error) {
buf, err := os.ReadFile(path)
if os.IsNotExist(err) {
return nil, err
}
if err != nil {
return nil, fmt.Errorf("failed to read file %q: %w", path, err)
}
cfg := &Config{}
err = yaml.UnmarshalStrict(buf, cfg)
if err != nil {
return nil, fmt.Errorf("failed to parse file %q: %w", path, err)
}
cfg.path = path
cfg.dropIn = filepath.Join(filepath.Dir(path), PluginConfigSubdir)
return cfg, nil
}
func (cfg *Config) getPluginConfig(id, base string) (string, error) {
name := id + "-" + base
dropIns := []string{
filepath.Join(cfg.dropIn, name+".conf"),
filepath.Join(cfg.dropIn, base+".conf"),
}
for _, path := range dropIns {
buf, err := os.ReadFile(path)
if err == nil {
return string(buf), nil
}
if !os.IsNotExist(err) {
return "", fmt.Errorf("failed to read configuration for plugin %q: %w", name, err)
}
}
return "", nil
}

View File

@@ -0,0 +1,468 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package adaptation
import (
"context"
"errors"
"fmt"
stdnet "net"
"os"
"os/exec"
"path/filepath"
"strconv"
"sync"
"time"
"github.com/containerd/nri/pkg/api"
"github.com/containerd/nri/pkg/log"
"github.com/containerd/nri/pkg/net"
"github.com/containerd/nri/pkg/net/multiplex"
"github.com/containerd/ttrpc"
)
const (
pluginRegistrationTimeout = 2 * time.Second
pluginRequestTimeout = 2 * time.Second
)
type plugin struct {
sync.Mutex
idx string
base string
cfg string
pid int
cmd *exec.Cmd
mux multiplex.Mux
rpcc *ttrpc.Client
rpcl stdnet.Listener
rpcs *ttrpc.Server
events EventMask
closed bool
stub api.PluginService
regC chan error
closeC chan struct{}
r *Adaptation
}
// Launch a pre-installed plugin with a pre-connected socketpair.
func newLaunchedPlugin(dir, idx, base, cfg string) (p *plugin, retErr error) {
name := idx + "-" + base
sockets, err := net.NewSocketPair()
if err != nil {
return nil, fmt.Errorf("failed to create plugin connection for plugin %q: %w", name, err)
}
defer sockets.Close()
conn, err := sockets.LocalConn()
if err != nil {
return nil, fmt.Errorf("failed to set up local connection for plugin %q: %w", name, err)
}
peerFile := sockets.PeerFile()
defer func() {
peerFile.Close()
if retErr != nil {
conn.Close()
}
}()
cmd := exec.Command(filepath.Join(dir, name))
cmd.ExtraFiles = []*os.File{peerFile}
cmd.Env = []string{
api.PluginNameEnvVar + "=" + name,
api.PluginIdxEnvVar + "=" + idx,
api.PluginSocketEnvVar + "=3",
}
p = &plugin{
cfg: cfg,
cmd: cmd,
idx: idx,
base: base,
regC: make(chan error, 1),
closeC: make(chan struct{}),
}
if err = p.cmd.Start(); err != nil {
return nil, fmt.Errorf("failed launch plugin %q: %w", p.name(), err)
}
if err = p.connect(conn); err != nil {
return nil, err
}
return p, nil
}
// Create a plugin (stub) for an accepted external plugin connection.
func newExternalPlugin(conn stdnet.Conn) (p *plugin, retErr error) {
p = &plugin{
regC: make(chan error, 1),
closeC: make(chan struct{}),
}
if err := p.connect(conn); err != nil {
return nil, err
}
return p, nil
}
// Check if the plugin is external (was not launched by us).
func (p *plugin) isExternal() bool {
return p.cmd == nil
}
// 'connect' a plugin, setting up multiplexing on its socket.
func (p *plugin) connect(conn stdnet.Conn) (retErr error) {
mux := multiplex.Multiplex(conn, multiplex.WithBlockedRead())
defer func() {
if retErr != nil {
mux.Close()
}
}()
pconn, err := mux.Open(multiplex.PluginServiceConn)
if err != nil {
return fmt.Errorf("failed to mux plugin connection for plugin %q: %w", p.name(), err)
}
rpcc := ttrpc.NewClient(pconn, ttrpc.WithOnClose(
func() {
log.Infof(noCtx, "connection to plugin %q closed", p.name())
close(p.closeC)
p.close()
}))
defer func() {
if retErr != nil {
rpcc.Close()
}
}()
stub := api.NewPluginClient(rpcc)
rpcs, err := ttrpc.NewServer()
if err != nil {
return fmt.Errorf("failed to create ttrpc server for plugin %q: %w", p.name(), err)
}
defer func() {
if retErr != nil {
rpcs.Close()
}
}()
rpcl, err := mux.Listen(multiplex.RuntimeServiceConn)
if err != nil {
return fmt.Errorf("failed to create mux runtime listener for plugin %q: %w", p.name(), err)
}
p.mux = mux
p.rpcc = rpcc
p.rpcl = rpcl
p.rpcs = rpcs
p.stub = stub
p.pid, err = getPeerPid(p.mux.Trunk())
if err != nil {
log.Warnf(noCtx, "failed to determine plugin pid pid: %v", err)
}
api.RegisterRuntimeService(p.rpcs, p)
return nil
}
// Start Runtime service, wait for plugin to register, then configure it.
func (p *plugin) start(name, version string) error {
var err error
go func() {
err := p.rpcs.Serve(context.Background(), p.rpcl)
if err != ttrpc.ErrServerClosed {
log.Infof(noCtx, "ttrpc server for plugin %q closed (%v)", p.name(), err)
}
p.close()
}()
p.mux.Unblock()
select {
case err = <-p.regC:
if err != nil {
return fmt.Errorf("failed to register plugin: %w", err)
}
case <-p.closeC:
return fmt.Errorf("failed to register plugin, connection closed: %w", err)
case <-time.After(pluginRegistrationTimeout):
p.close()
p.stop()
return errors.New("plugin registration timed out")
}
err = p.configure(context.Background(), name, version, p.cfg)
if err != nil {
p.close()
p.stop()
return err
}
return nil
}
// close a plugin shutting down its multiplexed ttrpc connections.
func (p *plugin) close() {
p.Lock()
defer p.Unlock()
if p.closed {
return
}
p.closed = true
p.mux.Close()
p.rpcc.Close()
p.rpcs.Close()
p.rpcl.Close()
}
func (p *plugin) isClosed() bool {
p.Lock()
defer p.Unlock()
return p.closed
}
// stop a plugin (if it was launched by us)
func (p *plugin) stop() error {
if p.isExternal() || p.cmd.Process == nil {
return nil
}
// TODO(klihub):
// We should attempt a graceful shutdown of the process here...
// - send it SIGINT
// - give the it some slack waiting with a timeout
// - butcher it with SIGKILL after the timeout
p.cmd.Process.Kill()
p.cmd.Process.Wait()
p.cmd.Process.Release()
return nil
}
// Name returns a string indentication for the plugin.
func (p *plugin) name() string {
return p.idx + "-" + p.base
}
func (p *plugin) qualifiedName() string {
var kind, idx, base string
if p.isExternal() {
kind = "external"
} else {
kind = "pre-connected"
}
if idx = p.idx; idx == "" {
idx = "??"
}
if base = p.base; base == "" {
base = "plugin"
}
return kind + ":" + idx + "-" + base + "[" + strconv.Itoa(p.pid) + "]"
}
// RegisterPlugin handles the plugin's registration request.
func (p *plugin) RegisterPlugin(ctx context.Context, req *RegisterPluginRequest) (*RegisterPluginResponse, error) {
if p.isExternal() {
if req.PluginName == "" {
p.regC <- fmt.Errorf("plugin %q registered empty name", p.qualifiedName())
return &RegisterPluginResponse{}, errors.New("invalid (empty) plugin name")
}
if err := api.CheckPluginIndex(req.PluginIdx); err != nil {
p.regC <- fmt.Errorf("plugin %q registered invalid index: %w", req.PluginName, err)
return &RegisterPluginResponse{}, fmt.Errorf("invalid plugin index: %w", err)
}
p.base = req.PluginName
p.idx = req.PluginIdx
}
log.Infof(ctx, "plugin %q registered as %q", p.qualifiedName(), p.name())
p.regC <- nil
return &RegisterPluginResponse{}, nil
}
// UpdateContainers relays container update request to the runtime.
func (p *plugin) UpdateContainers(ctx context.Context, req *UpdateContainersRequest) (*UpdateContainersResponse, error) {
log.Infof(ctx, "plugin %q requested container updates", p.name())
failed, err := p.r.updateContainers(ctx, req.Update)
return &UpdateContainersResponse{
Failed: failed,
}, err
}
// configure the plugin and subscribe it for the events it requested.
func (p *plugin) configure(ctx context.Context, name, version, config string) error {
ctx, cancel := context.WithTimeout(ctx, pluginRequestTimeout)
defer cancel()
rpl, err := p.stub.Configure(ctx, &ConfigureRequest{
Config: config,
RuntimeName: name,
RuntimeVersion: version,
})
if err != nil {
return fmt.Errorf("failed to configure plugin: %w", err)
}
events := EventMask(rpl.Events)
if events != 0 {
if extra := events &^ ValidEvents; extra != 0 {
return fmt.Errorf("invalid plugin events: 0x%x", extra)
}
} else {
events = ValidEvents
}
p.events = events
return nil
}
// synchronize the plugin with the current state of the runtime.
func (p *plugin) synchronize(ctx context.Context, pods []*PodSandbox, containers []*Container) ([]*ContainerUpdate, error) {
log.Infof(ctx, "synchronizing plugin %s", p.name())
ctx, cancel := context.WithTimeout(ctx, pluginRequestTimeout)
defer cancel()
req := &SynchronizeRequest{
Pods: pods,
Containers: containers,
}
rpl, err := p.stub.Synchronize(ctx, req)
if err != nil {
return nil, err
}
return rpl.Update, nil
}
// Relay CreateContainer request to plugin.
func (p *plugin) createContainer(ctx context.Context, req *CreateContainerRequest) (*CreateContainerResponse, error) {
if !p.events.IsSet(Event_CREATE_CONTAINER) {
return nil, nil
}
ctx, cancel := context.WithTimeout(ctx, pluginRequestTimeout)
defer cancel()
rpl, err := p.stub.CreateContainer(ctx, req)
if err != nil {
if isFatalError(err) {
log.Errorf(ctx, "closing plugin %s, failed to handle CreateContainer request: %w",
p.name(), err)
p.close()
return nil, nil
}
return nil, err
}
return rpl, nil
}
// Relay UpdateContainer request to plugin.
func (p *plugin) updateContainer(ctx context.Context, req *UpdateContainerRequest) (*UpdateContainerResponse, error) {
if !p.events.IsSet(Event_UPDATE_CONTAINER) {
return nil, nil
}
ctx, cancel := context.WithTimeout(ctx, pluginRequestTimeout)
defer cancel()
rpl, err := p.stub.UpdateContainer(ctx, req)
if err != nil {
if isFatalError(err) {
log.Errorf(ctx, "closing plugin %s, failed to handle UpdateContainer request: %w",
p.name(), err)
p.close()
return nil, nil
}
return nil, err
}
return rpl, nil
}
// Relay StopContainer request to the plugin.
func (p *plugin) stopContainer(ctx context.Context, req *StopContainerRequest) (*StopContainerResponse, error) {
if !p.events.IsSet(Event_STOP_CONTAINER) {
return nil, nil
}
ctx, cancel := context.WithTimeout(ctx, pluginRequestTimeout)
defer cancel()
rpl, err := p.stub.StopContainer(ctx, req)
if err != nil {
if isFatalError(err) {
log.Errorf(ctx, "closing plugin %s, failed to handle StopContainer request: %w",
p.name(), err)
p.close()
return nil, nil
}
return nil, err
}
return rpl, nil
}
// Relay other pod or container state change events to the plugin.
func (p *plugin) StateChange(ctx context.Context, evt *StateChangeEvent) error {
if !p.events.IsSet(evt.Event) {
return nil
}
ctx, cancel := context.WithTimeout(ctx, pluginRequestTimeout)
defer cancel()
_, err := p.stub.StateChange(ctx, evt)
if err != nil {
if isFatalError(err) {
log.Errorf(ctx, "closing plugin %s, failed to handle event %d: %w",
p.name(), evt.Event, err)
p.close()
return nil
}
return err
}
return nil
}
// isFatalError returns true if the error is fatal and the plugin connection shoudld be closed.
func isFatalError(err error) bool {
switch {
case errors.Is(err, ttrpc.ErrClosed):
return true
case errors.Is(err, ttrpc.ErrServerClosed):
return true
case errors.Is(err, ttrpc.ErrProtocol):
return true
case errors.Is(err, context.DeadlineExceeded):
return true
}
return false
}

View File

@@ -0,0 +1,55 @@
//go:build linux
// +build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package adaptation
import (
"errors"
"fmt"
stdnet "net"
"golang.org/x/sys/unix"
)
// getPeerPid returns the process id at the other end of the connection.
func getPeerPid(conn stdnet.Conn) (int, error) {
var cred *unix.Ucred
uc, ok := conn.(*stdnet.UnixConn)
if !ok {
return 0, errors.New("invalid connection, not *net.UnixConn")
}
raw, err := uc.SyscallConn()
if err != nil {
return 0, fmt.Errorf("failed get raw unix domain connection: %w", err)
}
ctrlErr := raw.Control(func(fd uintptr) {
cred, err = unix.GetsockoptUcred(int(fd), unix.SOL_SOCKET, unix.SO_PEERCRED)
})
if err != nil {
return 0, fmt.Errorf("failed to get process credentials: %w", err)
}
if ctrlErr != nil {
return 0, fmt.Errorf("uc.SyscallConn().Control() failed: %w", ctrlErr)
}
return int(cred.Pid), nil
}

View File

@@ -0,0 +1,31 @@
//go:build !linux
// +build !linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package adaptation
import (
"fmt"
"net"
"runtime"
)
// getPeerPid returns the process id at the other end of the connection.
func getPeerPid(conn net.Conn) (int, error) {
return 0, fmt.Errorf("getPeerPid() unimplemented on %s", runtime.GOOS)
}

1245
vendor/github.com/containerd/nri/pkg/adaptation/result.go generated vendored Normal file

File diff suppressed because it is too large Load Diff

295
vendor/github.com/containerd/nri/pkg/api/adjustment.go generated vendored Normal file
View File

@@ -0,0 +1,295 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package api
//
// Notes:
// Adjustment of metadata that is stored in maps (labels and annotations)
// currently assumes that a single plugin will never do an add prior to a
// delete for any key. IOW, it is always assumed that if both a deletion
// and an addition/setting was recorded for a key then the final desired
// state is the addition. This seems like a reasonably safe assumption. A
// removal is usually done only to protect against triggering the conflict
// in the runtime when a plugin intends to touch a key which is known to
// have been put there or already modified by another plugin.
//
// An alternative without this implicit ordering assumption would be to
// store the adjustment for such data as a sequence of add/del operations
// in a slice. At the moment that does not seem to be necessary.
//
// AddAnnotation records the addition of the annotation key=value.
func (a *ContainerAdjustment) AddAnnotation(key, value string) {
a.initAnnotations()
a.Annotations[key] = value
}
// RemoveAnnotation records the removal of the annotation for the given key.
// Normally it is an error for a plugin to try and alter an annotation
// touched by another plugin. However, this is not an error if the plugin
// removes that annotation prior to touching it.
func (a *ContainerAdjustment) RemoveAnnotation(key string) {
a.initAnnotations()
a.Annotations[MarkForRemoval(key)] = ""
}
// AddMount records the addition of a mount to a container.
func (a *ContainerAdjustment) AddMount(m *Mount) {
a.Mounts = append(a.Mounts, m) // TODO: should we dup m here ?
}
// RemoveMount records the removal of a mount from a container.
// Normally it is an error for a plugin to try and alter a mount
// touched by another plugin. However, this is not an error if the
// plugin removes that mount prior to touching it.
func (a *ContainerAdjustment) RemoveMount(ContainerPath string) {
a.Mounts = append(a.Mounts, &Mount{
Destination: MarkForRemoval(ContainerPath),
})
}
// AddEnv records the addition of an environment variable to a container.
func (a *ContainerAdjustment) AddEnv(key, value string) {
a.Env = append(a.Env, &KeyValue{
Key: key,
Value: value,
})
}
// RemoveEnv records the removal of an environment variable from a container.
// Normally it is an error for a plugin to try and alter an environment
// variable touched by another container. However, this is not an error if
// the plugin removes that variable prior to touching it.
func (a *ContainerAdjustment) RemoveEnv(key string) {
a.Env = append(a.Env, &KeyValue{
Key: MarkForRemoval(key),
})
}
// AddHooks records the addition of the given hooks to a container.
func (a *ContainerAdjustment) AddHooks(h *Hooks) {
a.initHooks()
if h.Prestart != nil {
a.Hooks.Prestart = append(a.Hooks.Prestart, h.Prestart...)
}
if h.CreateRuntime != nil {
a.Hooks.CreateRuntime = append(a.Hooks.CreateRuntime, h.CreateRuntime...)
}
if h.CreateContainer != nil {
a.Hooks.CreateContainer = append(a.Hooks.CreateContainer, h.CreateContainer...)
}
if h.StartContainer != nil {
a.Hooks.StartContainer = append(a.Hooks.StartContainer, h.StartContainer...)
}
if h.Poststart != nil {
a.Hooks.Poststart = append(a.Hooks.Poststart, h.Poststart...)
}
if h.Poststop != nil {
a.Hooks.Poststop = append(a.Hooks.Poststop, h.Poststop...)
}
}
// AddDevice records the addition of the given device to a container.
func (a *ContainerAdjustment) AddDevice(d *LinuxDevice) {
a.initLinux()
a.Linux.Devices = append(a.Linux.Devices, d) // TODO: should we dup d here ?
}
// RemoveDevice records the removal of a device from a container.
// Normally it is an error for a plugin to try and alter an device
// touched by another container. However, this is not an error if
// the plugin removes that device prior to touching it.
func (a *ContainerAdjustment) RemoveDevice(path string) {
a.initLinux()
a.Linux.Devices = append(a.Linux.Devices, &LinuxDevice{
Path: MarkForRemoval(path),
})
}
// SetLinuxMemoryLimit records setting the memory limit for a container.
func (a *ContainerAdjustment) SetLinuxMemoryLimit(value int64) {
a.initLinuxResourcesMemory()
a.Linux.Resources.Memory.Limit = Int64(value)
}
// SetLinuxMemoryReservation records setting the memory reservation for a container.
func (a *ContainerAdjustment) SetLinuxMemoryReservation(value int64) {
a.initLinuxResourcesMemory()
a.Linux.Resources.Memory.Reservation = Int64(value)
}
// SetLinuxMemorySwap records records setting the memory swap limit for a container.
func (a *ContainerAdjustment) SetLinuxMemorySwap(value int64) {
a.initLinuxResourcesMemory()
a.Linux.Resources.Memory.Swap = Int64(value)
}
// SetLinuxMemoryKernel records setting the memory kernel limit for a container.
func (a *ContainerAdjustment) SetLinuxMemoryKernel(value int64) {
a.initLinuxResourcesMemory()
a.Linux.Resources.Memory.Kernel = Int64(value)
}
// SetLinuxMemoryKernelTCP records setting the memory kernel TCP limit for a container.
func (a *ContainerAdjustment) SetLinuxMemoryKernelTCP(value int64) {
a.initLinuxResourcesMemory()
a.Linux.Resources.Memory.KernelTcp = Int64(value)
}
// SetLinuxMemorySwappiness records setting the memory swappiness for a container.
func (a *ContainerAdjustment) SetLinuxMemorySwappiness(value uint64) {
a.initLinuxResourcesMemory()
a.Linux.Resources.Memory.Swappiness = UInt64(value)
}
// SetLinuxMemoryDisableOomKiller records disabling the OOM killer for a container.
func (a *ContainerAdjustment) SetLinuxMemoryDisableOomKiller() {
a.initLinuxResourcesMemory()
a.Linux.Resources.Memory.DisableOomKiller = Bool(true)
}
// SetLinuxMemoryUseHierarchy records enabling hierarchical memory accounting for a container.
func (a *ContainerAdjustment) SetLinuxMemoryUseHierarchy() {
a.initLinuxResourcesMemory()
a.Linux.Resources.Memory.UseHierarchy = Bool(true)
}
// SetLinuxCPUShares records setting the scheduler's CPU shares for a container.
func (a *ContainerAdjustment) SetLinuxCPUShares(value uint64) {
a.initLinuxResourcesCPU()
a.Linux.Resources.Cpu.Shares = UInt64(value)
}
// SetLinuxCPUQuota records setting the scheduler's CPU quota for a container.
func (a *ContainerAdjustment) SetLinuxCPUQuota(value int64) {
a.initLinuxResourcesCPU()
a.Linux.Resources.Cpu.Quota = Int64(value)
}
// SetLinuxCPUPeriod records setting the scheduler's CPU period for a container.
func (a *ContainerAdjustment) SetLinuxCPUPeriod(value int64) {
a.initLinuxResourcesCPU()
a.Linux.Resources.Cpu.Period = UInt64(value)
}
// SetLinuxCPURealtimeRuntime records setting the scheduler's realtime runtime for a container.
func (a *ContainerAdjustment) SetLinuxCPURealtimeRuntime(value int64) {
a.initLinuxResourcesCPU()
a.Linux.Resources.Cpu.RealtimeRuntime = Int64(value)
}
// SetLinuxCPURealtimePeriod records setting the scheduler's realtime period for a container.
func (a *ContainerAdjustment) SetLinuxCPURealtimePeriod(value uint64) {
a.initLinuxResourcesCPU()
a.Linux.Resources.Cpu.RealtimePeriod = UInt64(value)
}
// SetLinuxCPUSetCPUs records setting the cpuset CPUs for a container.
func (a *ContainerAdjustment) SetLinuxCPUSetCPUs(value string) {
a.initLinuxResourcesCPU()
a.Linux.Resources.Cpu.Cpus = value
}
// SetLinuxCPUSetMems records setting the cpuset memory for a container.
func (a *ContainerAdjustment) SetLinuxCPUSetMems(value string) {
a.initLinuxResourcesCPU()
a.Linux.Resources.Cpu.Mems = value
}
// AddLinuxHugepageLimit records adding a hugepage limit for a container.
func (a *ContainerAdjustment) AddLinuxHugepageLimit(pageSize string, value uint64) {
a.initLinuxResources()
a.Linux.Resources.HugepageLimits = append(a.Linux.Resources.HugepageLimits,
&HugepageLimit{
PageSize: pageSize,
Limit: value,
})
}
// SetLinuxBlockIOClass records setting the Block I/O class for a container.
func (a *ContainerAdjustment) SetLinuxBlockIOClass(value string) {
a.initLinuxResources()
a.Linux.Resources.BlockioClass = String(value)
}
// SetLinuxRDTClass records setting the RDT class for a container.
func (a *ContainerAdjustment) SetLinuxRDTClass(value string) {
a.initLinuxResources()
a.Linux.Resources.RdtClass = String(value)
}
// AddLinuxUnified sets a cgroupv2 unified resource.
func (a *ContainerAdjustment) AddLinuxUnified(key, value string) {
a.initLinuxResourcesUnified()
a.Linux.Resources.Unified[key] = value
}
// SetLinuxCgroupsPath records setting the cgroups path for a container.
func (a *ContainerAdjustment) SetLinuxCgroupsPath(value string) {
a.initLinux()
a.Linux.CgroupsPath = value
}
//
// Initializing a container adjustment and container update.
//
func (a *ContainerAdjustment) initAnnotations() {
if a.Annotations == nil {
a.Annotations = make(map[string]string)
}
}
func (a *ContainerAdjustment) initHooks() {
if a.Hooks == nil {
a.Hooks = &Hooks{}
}
}
func (a *ContainerAdjustment) initLinux() {
if a.Linux == nil {
a.Linux = &LinuxContainerAdjustment{}
}
}
func (a *ContainerAdjustment) initLinuxResources() {
a.initLinux()
if a.Linux.Resources == nil {
a.Linux.Resources = &LinuxResources{}
}
}
func (a *ContainerAdjustment) initLinuxResourcesMemory() {
a.initLinuxResources()
if a.Linux.Resources.Memory == nil {
a.Linux.Resources.Memory = &LinuxMemory{}
}
}
func (a *ContainerAdjustment) initLinuxResourcesCPU() {
a.initLinuxResources()
if a.Linux.Resources.Cpu == nil {
a.Linux.Resources.Cpu = &LinuxCPU{}
}
}
func (a *ContainerAdjustment) initLinuxResourcesUnified() {
a.initLinuxResources()
if a.Linux.Resources.Unified == nil {
a.Linux.Resources.Unified = make(map[string]string)
}
}

4295
vendor/github.com/containerd/nri/pkg/api/api.pb.go generated vendored Normal file

File diff suppressed because it is too large Load Diff

441
vendor/github.com/containerd/nri/pkg/api/api.proto generated vendored Normal file
View File

@@ -0,0 +1,441 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
syntax = "proto3";
package nri.pkg.api.v1alpha1;
option go_package = "github.com/containerd/nri/pkg/api;api";
// Runtime service is the public API runtimes expose for NRI plugins.
// On this interface RPC requests are initiated by the plugin. This
// only covers plugin registration and unsolicited container updates.
// The rest of the API is defined by the Plugin service.
service Runtime {
// RegisterPlugin registers the plugin with the runtime.
rpc RegisterPlugin(RegisterPluginRequest) returns (Empty);
// UpdateContainers requests unsolicited updates to a set of containers.
rpc UpdateContainers(UpdateContainersRequest) returns (UpdateContainersResponse);
}
message RegisterPluginRequest {
// Name of the plugin to register.
string plugin_name = 1;
// Plugin invocation index. Plugins are called in ascending index order.
string plugin_idx = 2;
}
message UpdateContainersRequest {
// List of containers to update.
repeated ContainerUpdate update = 1;
// List of containers to evict.
repeated ContainerEviction evict = 2;
}
message UpdateContainersResponse {
// Containers that the runtime failed to udpate.
repeated ContainerUpdate failed = 1;
}
//
// Plugin is the API NRI uses to interact with plugins. It is used to
// - configure a plugin and subscribe it for lifecycle events
// - synchronize the state of a plugin with that of the runtime
// - hook a plugin into the lifecycle events of its interest
//
// During configuration the plugin tells the runtime which lifecycle events
// it wishes to get hooked into. Once configured, the plugin is synchronized
// with the runtime by receiving the list of pods and containers known to
// the runtime. The plugin can request changes to any of the containers in
// response. After initial synchronization the plugin starts receiving the
// events it subscribed for as they occur in the runtime. For container
// creation, update, and stop events, the plugin can request changes, both
// to the container that triggered the event or any other existing container
// in the runtime.
//
// For a subset of the container lifecycle events, NRI defines an additional
// Post-variant of the event. These variants are defined for CreateContainer,
// StartContainer, and UpdateContainer. For creation and update, these events
// can be used by plugins to discover the full extent of changes applied to
// the container, including any changes made by other active plugins.
//
service Plugin {
// Configure the plugin and get its event subscription.
rpc Configure(ConfigureRequest) returns (ConfigureResponse);
// Synchronize the plugin with the state of the runtime.
rpc Synchronize(SynchronizeRequest) returns (SynchronizeResponse);
// Shutdown a plugin (let it know the runtime is going down).
rpc Shutdown(Empty) returns (Empty);
// CreateContainer relays the corresponding request to the plugin. In
// response, the plugin can adjust the container being created, and
// update other containers in the runtime. Container adjustment can
// alter labels, annotations, mounts, devices, environment variables,
// OCI hooks, and assigned container resources. Updates can alter
// assigned container resources.
rpc CreateContainer(CreateContainerRequest) returns (CreateContainerResponse);
// UpdateContainer relays the corresponding request to the plugin.
// The plugin can alter how the container is updated and request updates
// to additional containers in the runtime.
rpc UpdateContainer(UpdateContainerRequest) returns (UpdateContainerResponse);
// StopContainer relays the corresponding request to the plugin. The plugin
// can update any of the remaining containers in the runtime in response.
rpc StopContainer(StopContainerRequest) returns (StopContainerResponse);
// StateChange relays any remaining pod or container lifecycle/state change
// events the plugin has subscribed for. These can be used to trigger any
// plugin-specific processing which needs to occur in connection with any of
// these events.
rpc StateChange(StateChangeEvent) returns (Empty);
}
message ConfigureRequest {
// Any plugin-specific data, if present among the NRI configuration.
string config = 1;
// Name of the runtime NRI is running in.
string runtime_name = 2;
// Version of the runtime NRI is running in.
string runtime_version = 3;
}
message ConfigureResponse {
// Events to subscribe the plugin for. Each bit set corresponds to an
// enumerated Event.
int32 events = 2;
}
message SynchronizeRequest {
// Pods known to the runtime.
repeated PodSandbox pods = 1;
// Containers known to the runtime.
repeated Container containers = 2;
}
message SynchronizeResponse {
// Updates to containers requested by the plugin.
repeated ContainerUpdate update = 1;
}
message CreateContainerRequest {
// Pod of container being created.
PodSandbox pod = 1;
// Container being created.
Container container = 2;
}
message CreateContainerResponse {
// Requested adjustments to container being created.
ContainerAdjustment adjust = 1;
// Requested updates to other existing containers.
repeated ContainerUpdate update = 2;
// Requested eviction of existing containers.
repeated ContainerEviction evict = 3;
}
message UpdateContainerRequest {
// Pod of container being updated.
PodSandbox pod = 1;
// Container being updated.
Container container = 2;
// Resources to update.
LinuxResources linux_resources = 3;
}
message UpdateContainerResponse {
// Requested updates to containers.
repeated ContainerUpdate update = 1;
// Requested eviction of containers.
repeated ContainerEviction evict = 2;
}
message StopContainerRequest {
// Pod of container being stopped.
PodSandbox pod = 1;
// Container being stopped.
Container container = 2;
}
message StopContainerResponse {
// Requested updates to containers.
repeated ContainerUpdate update = 1;
}
message StateChangeEvent {
// Event type of notification.
Event event = 1;
// Pod this notification is sent for. If this event is related to a container,
// pod is set to the pod of the container.
PodSandbox pod = 2;
// Container this notification is sent for. If the event is related to a pod,
// container is nil.
Container container = 3;
}
// Empty response for those *Requests that are semantically events.
message Empty {}
// Events that plugins can subscribe to in ConfigureResponse.
enum Event {
UNKNOWN = 0;
RUN_POD_SANDBOX = 1;
STOP_POD_SANDBOX = 2;
REMOVE_POD_SANDBOX = 3;
CREATE_CONTAINER = 4;
POST_CREATE_CONTAINER = 5;
START_CONTAINER = 6;
POST_START_CONTAINER = 7;
UPDATE_CONTAINER = 8;
POST_UPDATE_CONTAINER = 9;
STOP_CONTAINER = 10;
REMOVE_CONTAINER = 11;
LAST = 12;
}
// Pod metadata that is considered relevant for a plugin.
message PodSandbox {
string id = 1;
string name = 2;
string uid = 3;
string namespace = 4;
map<string, string> labels = 5;
map<string, string> annotations = 6;
string runtime_handler = 7;
LinuxPodSandbox linux = 8;
uint32 pid = 9; // for NRI v1 emulation
}
// PodSandbox linux-specific metadata
message LinuxPodSandbox {
LinuxResources pod_overhead = 1;
LinuxResources pod_resources = 2;
string cgroup_parent = 3;
string cgroups_path = 4; // for NRI v1 emulation
repeated LinuxNamespace namespaces = 5; // for NRI v1 emulation
LinuxResources resources = 6; // for NRI v1 emulation
}
// Container metadata that is considered relevant for a plugin.
message Container {
string id = 1;
string pod_sandbox_id = 2;
string name = 3;
ContainerState state = 4;
map<string, string> labels = 5;
map<string, string> annotations = 6;
repeated string args = 7;
repeated string env = 8;
repeated Mount mounts = 9;
Hooks hooks = 10;
LinuxContainer linux = 11;
uint32 pid = 12; // for NRI v1 emulation
}
// Possible container states.
enum ContainerState {
CONTAINER_UNKNOWN = 0;
CONTAINER_CREATED = 1;
CONTAINER_PAUSED = 2; // is this useful/necessary ?
CONTAINER_RUNNING = 3;
CONTAINER_STOPPED = 4;
}
// A container mount.
message Mount {
string destination = 1;
string type = 2;
string source = 3;
repeated string options = 4;
}
// Container OCI hooks.
message Hooks {
repeated Hook prestart = 1;
repeated Hook create_runtime = 2;
repeated Hook create_container = 3;
repeated Hook start_container = 4;
repeated Hook poststart = 5;
repeated Hook poststop = 6;
}
// One OCI hook.
message Hook {
string path = 1;
repeated string args = 2;
repeated string env = 3;
OptionalInt timeout = 4;
}
// Container (linux) metadata.
message LinuxContainer {
repeated LinuxNamespace namespaces = 1;
repeated LinuxDevice devices = 2;
LinuxResources resources = 3;
OptionalInt oom_score_adj = 4;
string cgroups_path = 5;
}
// A linux namespace.
message LinuxNamespace {
string type = 1;
string path = 2;
}
// A container (linux) device.
message LinuxDevice {
string path = 1;
string type = 2;
int64 major = 3;
int64 minor = 4;
OptionalFileMode file_mode = 5;
OptionalUInt32 uid = 6;
OptionalUInt32 gid = 7;
}
// A linux device cgroup controller rule.
message LinuxDeviceCgroup {
bool allow = 1;
string type = 2;
OptionalInt64 major = 3;
OptionalInt64 minor = 4;
string access = 5;
}
// Container (linux) resources.
message LinuxResources {
LinuxMemory memory = 1;
LinuxCPU cpu = 2;
repeated HugepageLimit hugepage_limits = 3;
OptionalString blockio_class = 4;
OptionalString rdt_class = 5;
map<string, string> unified = 6;
repeated LinuxDeviceCgroup devices = 7; // for NRI v1 emulation
}
// Memory-related parts of (linux) resources.
message LinuxMemory {
OptionalInt64 limit = 1;
OptionalInt64 reservation = 2;
OptionalInt64 swap = 3;
OptionalInt64 kernel = 4;
OptionalInt64 kernel_tcp = 5;
OptionalUInt64 swappiness = 6;
OptionalBool disable_oom_killer = 7;
OptionalBool use_hierarchy = 8;
}
// CPU-related parts of (linux) resources.
message LinuxCPU {
OptionalUInt64 shares = 1;
OptionalInt64 quota = 2;
OptionalUInt64 period = 3;
OptionalInt64 realtime_runtime = 4;
OptionalUInt64 realtime_period = 5;
string cpus = 6;
string mems = 7;
}
// Container huge page limit.
message HugepageLimit {
string page_size = 1;
uint64 limit = 2;
}
// Requested adjustments to a container being created.
message ContainerAdjustment {
map<string, string> annotations = 2;
repeated Mount mounts = 3;
repeated KeyValue env = 4;
Hooks hooks = 5;
LinuxContainerAdjustment linux = 6;
}
// Adjustments to (linux) resources.
message LinuxContainerAdjustment {
repeated LinuxDevice devices = 1;
LinuxResources resources = 2;
string cgroups_path = 3;
}
// Requested update to an already created container.
message ContainerUpdate {
string container_id = 1;
LinuxContainerUpdate linux = 2;
bool ignore_failure = 3;
}
// Updates to (linux) resources.
message LinuxContainerUpdate {
LinuxResources resources = 1;
}
// Request to evict (IOW unsolicitedly stop) a container.
message ContainerEviction {
// Container to evict.
string container_id = 1;
// Human-readable reason for eviction.
string reason = 2;
}
// KeyValue represents an environment variable.
message KeyValue {
string key = 1;
string value = 2;
}
// An optional string value.
message OptionalString {
string value = 1;
}
// An optional signed integer value.
message OptionalInt {
int64 value = 1;
}
// An optional 32-bit signed integer value.
message OptionalInt32 {
int32 value = 1;
}
// An optional 32-bit unsigned integer value.
message OptionalUInt32 {
uint32 value = 1;
}
// An optional 64-bit signed integer value.
message OptionalInt64 {
int64 value = 1;
}
// An optional 64-bit unsigned integer value.
message OptionalUInt64 {
uint64 value = 1;
}
// An optional boolean value.
message OptionalBool {
bool value = 1;
}
// An optional value of file permissions.
message OptionalFileMode {
uint32 value = 1;
}

View File

@@ -0,0 +1,179 @@
// Code generated by protoc-gen-go-ttrpc. DO NOT EDIT.
// source: pkg/api/api.proto
package api
import (
context "context"
ttrpc "github.com/containerd/ttrpc"
)
type RuntimeService interface {
RegisterPlugin(ctx context.Context, req *RegisterPluginRequest) (*Empty, error)
UpdateContainers(ctx context.Context, req *UpdateContainersRequest) (*UpdateContainersResponse, error)
}
func RegisterRuntimeService(srv *ttrpc.Server, svc RuntimeService) {
srv.Register("nri.pkg.api.v1alpha1.Runtime", map[string]ttrpc.Method{
"RegisterPlugin": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req RegisterPluginRequest
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.RegisterPlugin(ctx, &req)
},
"UpdateContainers": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req UpdateContainersRequest
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.UpdateContainers(ctx, &req)
},
})
}
type runtimeClient struct {
client *ttrpc.Client
}
func NewRuntimeClient(client *ttrpc.Client) RuntimeService {
return &runtimeClient{
client: client,
}
}
func (c *runtimeClient) RegisterPlugin(ctx context.Context, req *RegisterPluginRequest) (*Empty, error) {
var resp Empty
if err := c.client.Call(ctx, "nri.pkg.api.v1alpha1.Runtime", "RegisterPlugin", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
func (c *runtimeClient) UpdateContainers(ctx context.Context, req *UpdateContainersRequest) (*UpdateContainersResponse, error) {
var resp UpdateContainersResponse
if err := c.client.Call(ctx, "nri.pkg.api.v1alpha1.Runtime", "UpdateContainers", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
type PluginService interface {
Configure(ctx context.Context, req *ConfigureRequest) (*ConfigureResponse, error)
Synchronize(ctx context.Context, req *SynchronizeRequest) (*SynchronizeResponse, error)
Shutdown(ctx context.Context, req *Empty) (*Empty, error)
CreateContainer(ctx context.Context, req *CreateContainerRequest) (*CreateContainerResponse, error)
UpdateContainer(ctx context.Context, req *UpdateContainerRequest) (*UpdateContainerResponse, error)
StopContainer(ctx context.Context, req *StopContainerRequest) (*StopContainerResponse, error)
StateChange(ctx context.Context, req *StateChangeEvent) (*Empty, error)
}
func RegisterPluginService(srv *ttrpc.Server, svc PluginService) {
srv.Register("nri.pkg.api.v1alpha1.Plugin", map[string]ttrpc.Method{
"Configure": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req ConfigureRequest
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.Configure(ctx, &req)
},
"Synchronize": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req SynchronizeRequest
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.Synchronize(ctx, &req)
},
"Shutdown": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req Empty
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.Shutdown(ctx, &req)
},
"CreateContainer": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req CreateContainerRequest
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.CreateContainer(ctx, &req)
},
"UpdateContainer": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req UpdateContainerRequest
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.UpdateContainer(ctx, &req)
},
"StopContainer": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req StopContainerRequest
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.StopContainer(ctx, &req)
},
"StateChange": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req StateChangeEvent
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.StateChange(ctx, &req)
},
})
}
type pluginClient struct {
client *ttrpc.Client
}
func NewPluginClient(client *ttrpc.Client) PluginService {
return &pluginClient{
client: client,
}
}
func (c *pluginClient) Configure(ctx context.Context, req *ConfigureRequest) (*ConfigureResponse, error) {
var resp ConfigureResponse
if err := c.client.Call(ctx, "nri.pkg.api.v1alpha1.Plugin", "Configure", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
func (c *pluginClient) Synchronize(ctx context.Context, req *SynchronizeRequest) (*SynchronizeResponse, error) {
var resp SynchronizeResponse
if err := c.client.Call(ctx, "nri.pkg.api.v1alpha1.Plugin", "Synchronize", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
func (c *pluginClient) Shutdown(ctx context.Context, req *Empty) (*Empty, error) {
var resp Empty
if err := c.client.Call(ctx, "nri.pkg.api.v1alpha1.Plugin", "Shutdown", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
func (c *pluginClient) CreateContainer(ctx context.Context, req *CreateContainerRequest) (*CreateContainerResponse, error) {
var resp CreateContainerResponse
if err := c.client.Call(ctx, "nri.pkg.api.v1alpha1.Plugin", "CreateContainer", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
func (c *pluginClient) UpdateContainer(ctx context.Context, req *UpdateContainerRequest) (*UpdateContainerResponse, error) {
var resp UpdateContainerResponse
if err := c.client.Call(ctx, "nri.pkg.api.v1alpha1.Plugin", "UpdateContainer", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
func (c *pluginClient) StopContainer(ctx context.Context, req *StopContainerRequest) (*StopContainerResponse, error) {
var resp StopContainerResponse
if err := c.client.Call(ctx, "nri.pkg.api.v1alpha1.Plugin", "StopContainer", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
func (c *pluginClient) StateChange(ctx context.Context, req *StateChangeEvent) (*Empty, error) {
var resp Empty
if err := c.client.Call(ctx, "nri.pkg.api.v1alpha1.Plugin", "StateChange", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}

89
vendor/github.com/containerd/nri/pkg/api/device.go generated vendored Normal file
View File

@@ -0,0 +1,89 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package api
import (
rspec "github.com/opencontainers/runtime-spec/specs-go"
)
// FromOCILinuxDevices returns a device slice from an OCI runtime Spec.
func FromOCILinuxDevices(o []rspec.LinuxDevice) []*LinuxDevice {
var devices []*LinuxDevice
for _, d := range o {
devices = append(devices, &LinuxDevice{
Path: d.Path,
Type: d.Type,
Major: d.Major,
Minor: d.Minor,
FileMode: FileMode(d.FileMode),
Uid: UInt32(d.UID),
Gid: UInt32(d.GID),
})
}
return devices
}
// ToOCI returns the linux devices for an OCI runtime Spec.
func (d *LinuxDevice) ToOCI() rspec.LinuxDevice {
if d == nil {
return rspec.LinuxDevice{}
}
return rspec.LinuxDevice{
Path: d.Path,
Type: d.Type,
Major: d.Major,
Minor: d.Minor,
FileMode: d.FileMode.Get(),
UID: d.Uid.Get(),
GID: d.Gid.Get(),
}
}
// AccessString returns an OCI access string for the device.
func (d *LinuxDevice) AccessString() string {
r, w, m := "r", "w", ""
if mode := d.FileMode.Get(); mode != nil {
perm := mode.Perm()
if (perm & 0444) != 0 {
r = "r"
}
if (perm & 0222) != 0 {
w = "w"
}
}
if d.Type == "b" {
m = "m"
}
return r + w + m
}
// Cmp returns true if the devices are equal.
func (d *LinuxDevice) Cmp(v *LinuxDevice) bool {
if v == nil {
return false
}
return d.Major != v.Major || d.Minor != v.Minor
}
// IsMarkedForRemoval checks if a LinuxDevice is marked for removal.
func (d *LinuxDevice) IsMarkedForRemoval() (string, bool) {
key, marked := IsMarkedForRemoval(d.Path)
return key, marked
}

17
vendor/github.com/containerd/nri/pkg/api/doc.go generated vendored Normal file
View File

@@ -0,0 +1,17 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package api

60
vendor/github.com/containerd/nri/pkg/api/env.go generated vendored Normal file
View File

@@ -0,0 +1,60 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package api
import (
"strings"
)
// ToOCI returns an OCI Env entry for the KeyValue.
func (e *KeyValue) ToOCI() string {
return e.Key + "=" + e.Value
}
// FromOCIEnv returns KeyValues from an OCI runtime Spec environment.
func FromOCIEnv(in []string) []*KeyValue {
if in == nil {
return nil
}
out := []*KeyValue{}
for _, keyval := range in {
var key, val string
split := strings.SplitN(keyval, "=", 2)
switch len(split) {
case 0:
continue
case 1:
key = split[0]
case 2:
key = split[0]
val = split[1]
default:
val = strings.Join(split[1:], "=")
}
out = append(out, &KeyValue{
Key: key,
Value: val,
})
}
return out
}
// IsMarkedForRemoval checks if an environment variable is marked for removal.
func (e *KeyValue) IsMarkedForRemoval() (string, bool) {
key, marked := IsMarkedForRemoval(e.Key)
return key, marked
}

172
vendor/github.com/containerd/nri/pkg/api/event.go generated vendored Normal file
View File

@@ -0,0 +1,172 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package api
import (
"fmt"
"strings"
)
const (
// ValidEvents is the event mask of all valid events.
ValidEvents = EventMask((1 << (Event_LAST - 1)) - 1)
)
// nolint
type (
// Define *Request/*Response type aliases for *Event/Empty pairs.
StateChangeResponse = Empty
RunPodSandboxRequest = StateChangeEvent
RunPodSandboxResponse = Empty
StopPodSandboxRequest = StateChangeEvent
StopPodSandboxResponse = Empty
RemovePodSandboxRequest = StateChangeEvent
RemovePodSandboxResponse = Empty
StartContainerRequest = StateChangeEvent
StartContainerResponse = Empty
RemoveContainerRequest = StateChangeEvent
RemoveContainerResponse = Empty
PostCreateContainerRequest = StateChangeEvent
PostCreateContainerResponse = Empty
PostStartContainerRequest = StateChangeEvent
PostStartContainerResponse = Empty
PostUpdateContainerRequest = StateChangeEvent
PostUpdateContainerResponse = Empty
ShutdownRequest = Empty
ShutdownResponse = Empty
)
// EventMask corresponds to a set of enumerated Events.
type EventMask int32
// ParseEventMask parses a string representation into an EventMask.
func ParseEventMask(events ...string) (EventMask, error) {
var mask EventMask
bits := map[string]Event{
"runpodsandbox": Event_RUN_POD_SANDBOX,
"stoppodsandbox": Event_STOP_POD_SANDBOX,
"removepodsandbox": Event_REMOVE_POD_SANDBOX,
"createcontainer": Event_CREATE_CONTAINER,
"postcreatecontainer": Event_POST_CREATE_CONTAINER,
"startcontainer": Event_START_CONTAINER,
"poststartcontainer": Event_POST_START_CONTAINER,
"updatecontainer": Event_UPDATE_CONTAINER,
"postupdatecontainer": Event_POST_UPDATE_CONTAINER,
"stopcontainer": Event_STOP_CONTAINER,
"removecontainer": Event_REMOVE_CONTAINER,
}
for _, event := range events {
lcEvents := strings.ToLower(event)
for _, name := range strings.Split(lcEvents, ",") {
switch name {
case "all":
mask |= ValidEvents
continue
case "pod", "podsandbox":
for name, bit := range bits {
if strings.Contains(name, "Pod") {
mask.Set(bit)
}
}
continue
case "container":
for name, bit := range bits {
if strings.Contains(name, "Container") {
mask.Set(bit)
}
}
continue
}
bit, ok := bits[strings.TrimSpace(name)]
if !ok {
return 0, fmt.Errorf("unknown event %q", name)
}
mask.Set(bit)
}
}
return mask, nil
}
// MustParseEventMask parses the given events, panic()ing on errors.
func MustParseEventMask(events ...string) EventMask {
mask, err := ParseEventMask(events...)
if err != nil {
panic(fmt.Sprintf("failed to parse events %s", strings.Join(events, " ")))
}
return mask
}
// PrettyString returns a human-readable string representation of an EventMask.
func (m *EventMask) PrettyString() string {
names := map[Event]string{
Event_RUN_POD_SANDBOX: "RunPodSandbox",
Event_STOP_POD_SANDBOX: "StopPodSandbox",
Event_REMOVE_POD_SANDBOX: "RemovePodSandbox",
Event_CREATE_CONTAINER: "CreateContainer",
Event_POST_CREATE_CONTAINER: "PostCreateContainer",
Event_START_CONTAINER: "StartContainer",
Event_POST_START_CONTAINER: "PostStartContainer",
Event_UPDATE_CONTAINER: "UpdateContainer",
Event_POST_UPDATE_CONTAINER: "PostUpdateContainer",
Event_STOP_CONTAINER: "StopContainer",
Event_REMOVE_CONTAINER: "RemoveContainer",
}
mask := *m
events, sep := "", ""
for bit := Event_UNKNOWN + 1; bit <= Event_LAST; bit++ {
if mask.IsSet(bit) {
events += sep + names[bit]
sep = ","
mask.Clear(bit)
}
}
if mask != 0 {
events += sep + fmt.Sprintf("unknown(0x%x)", mask)
}
return events
}
// Set sets the given Events in the mask.
func (m *EventMask) Set(events ...Event) *EventMask {
for _, e := range events {
*m |= (1 << (e - 1))
}
return m
}
// Clear clears the given Events in the mask.
func (m *EventMask) Clear(events ...Event) *EventMask {
for _, e := range events {
*m &^= (1 << (e - 1))
}
return m
}
// IsSet check if the given Event is set in the mask.
func (m *EventMask) IsSet(e Event) bool {
return *m&(1<<(e-1)) != 0
}

59
vendor/github.com/containerd/nri/pkg/api/helpers.go generated vendored Normal file
View File

@@ -0,0 +1,59 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package api
// DupStringSlice creates a copy of a string slice.
func DupStringSlice(in []string) []string {
if in == nil {
return nil
}
out := make([]string, len(in))
copy(out, in)
return out
}
// DupStringMap creates a copy of a map with string keys and values.
func DupStringMap(in map[string]string) map[string]string {
if in == nil {
return nil
}
out := map[string]string{}
for k, v := range in {
out[k] = v
}
return out
}
// IsMarkedForRemoval checks if a key is marked for removal.
//
// The key can be an annotation name, a mount container path, a device path,
// or an environment variable name. These are all marked for removal in
// adjustments by preceding their corresponding key with a '-'.
func IsMarkedForRemoval(key string) (string, bool) {
if key == "" {
return "", false
}
if key[0] != '-' {
return key, false
}
return key[1:], true
}
// MarkForRemoval returns a key marked for removal.
func MarkForRemoval(key string) string {
return "-" + key
}

103
vendor/github.com/containerd/nri/pkg/api/hooks.go generated vendored Normal file
View File

@@ -0,0 +1,103 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package api
import (
rspec "github.com/opencontainers/runtime-spec/specs-go"
)
// Append appends the given hooks to the existing ones.
func (hooks *Hooks) Append(h *Hooks) *Hooks {
if h == nil {
return hooks
}
hooks.Prestart = append(hooks.Prestart, h.Prestart...)
hooks.CreateRuntime = append(hooks.CreateRuntime, h.CreateRuntime...)
hooks.CreateContainer = append(hooks.CreateContainer, h.CreateContainer...)
hooks.StartContainer = append(hooks.StartContainer, h.StartContainer...)
hooks.Poststart = append(hooks.Poststart, h.Poststart...)
hooks.Poststop = append(hooks.Poststop, h.Poststop...)
return hooks
}
// Hooks returns itself it any of its hooks is set. Otherwise it returns nil.
func (hooks *Hooks) Hooks() *Hooks {
if hooks == nil {
return nil
}
if len(hooks.Prestart) > 0 {
return hooks
}
if len(hooks.CreateRuntime) > 0 {
return hooks
}
if len(hooks.CreateContainer) > 0 {
return hooks
}
if len(hooks.StartContainer) > 0 {
return hooks
}
if len(hooks.Poststart) > 0 {
return hooks
}
if len(hooks.Poststop) > 0 {
return hooks
}
return nil
}
// ToOCI returns the hook for an OCI runtime Spec.
func (h *Hook) ToOCI() rspec.Hook {
return rspec.Hook{
Path: h.Path,
Args: DupStringSlice(h.Args),
Env: DupStringSlice(h.Env),
Timeout: h.Timeout.Get(),
}
}
// FromOCIHooks returns hooks from an OCI runtime Spec.
func FromOCIHooks(o *rspec.Hooks) *Hooks {
if o == nil {
return nil
}
return &Hooks{
Prestart: FromOCIHookSlice(o.Prestart),
CreateRuntime: FromOCIHookSlice(o.CreateRuntime),
CreateContainer: FromOCIHookSlice(o.CreateContainer),
StartContainer: FromOCIHookSlice(o.StartContainer),
Poststart: FromOCIHookSlice(o.Poststart),
Poststop: FromOCIHookSlice(o.Poststop),
}
}
// FromOCIHookSlice returns a hook slice from an OCI runtime Spec.
func FromOCIHookSlice(o []rspec.Hook) []*Hook {
var hooks []*Hook
for _, h := range o {
hooks = append(hooks, &Hook{
Path: h.Path,
Args: DupStringSlice(h.Args),
Env: DupStringSlice(h.Env),
Timeout: Int(h.Timeout),
})
}
return hooks
}

89
vendor/github.com/containerd/nri/pkg/api/mount.go generated vendored Normal file
View File

@@ -0,0 +1,89 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package api
import (
"sort"
rspec "github.com/opencontainers/runtime-spec/specs-go"
)
const (
// SELinuxRelabel is a Mount pseudo-option to request relabeling.
SELinuxRelabel = "relabel"
)
// FromOCIMounts returns a Mount slice for an OCI runtime Spec.
func FromOCIMounts(o []rspec.Mount) []*Mount {
var mounts []*Mount
for _, m := range o {
mounts = append(mounts, &Mount{
Destination: m.Destination,
Type: m.Type,
Source: m.Source,
Options: DupStringSlice(m.Options),
})
}
return mounts
}
// ToOCI returns a Mount for an OCI runtime Spec.
func (m *Mount) ToOCI(propagationQuery *string) rspec.Mount {
o := rspec.Mount{
Destination: m.Destination,
Type: m.Type,
Source: m.Source,
Options: []string{},
}
for _, opt := range m.Options {
o.Options = append(o.Options, opt)
if propagationQuery != nil && (opt == "rprivate" || opt == "rshared" || opt == "rslave") {
*propagationQuery = opt
}
}
return o
}
// Cmp returns true if the mounts are equal.
func (m *Mount) Cmp(v *Mount) bool {
if v == nil {
return false
}
if m.Destination != v.Destination || m.Type != v.Type || m.Source != v.Source ||
len(m.Options) != len(v.Options) {
return false
}
mOpts := make([]string, len(m.Options))
vOpts := make([]string, len(m.Options))
sort.Strings(mOpts)
sort.Strings(vOpts)
for i, o := range mOpts {
if vOpts[i] != o {
return false
}
}
return true
}
// IsMarkedForRemoval checks if a Mount is marked for removal.
func (m *Mount) IsMarkedForRemoval() (string, bool) {
key, marked := IsMarkedForRemoval(m.Destination)
return key, marked
}

33
vendor/github.com/containerd/nri/pkg/api/namespace.go generated vendored Normal file
View File

@@ -0,0 +1,33 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package api
import (
rspec "github.com/opencontainers/runtime-spec/specs-go"
)
// FromOCILinuxNamespaces returns a namespace slice from an OCI runtime Spec.
func FromOCILinuxNamespaces(o []rspec.LinuxNamespace) []*LinuxNamespace {
var namespaces []*LinuxNamespace
for _, ns := range o {
namespaces = append(namespaces, &LinuxNamespace{
Type: string(ns.Type),
Path: ns.Path,
})
}
return namespaces
}

341
vendor/github.com/containerd/nri/pkg/api/optional.go generated vendored Normal file
View File

@@ -0,0 +1,341 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package api
import (
"os"
)
//
// XXX FIXME:
//
// The optional interface constructor should be updated/split up
// to avoid having to take an interface{} argument. Instead The
// optional types should have a
// - constructor taking the underlying native type
// - a Copy() function for copying them
// - a FromPointer constructor to create them from an optionally nil
// pointer to the underlying native type (to help constructing from
// structures that use a pointer to the native underlying type to
// denote optionality (OCI Spec mostly))
// Creating from any other type should use one of these with any explicit
// cast for the argument as necessary.
//
// String creates an Optional wrapper from its argument.
func String(v interface{}) *OptionalString {
var value string
switch o := v.(type) {
case string:
value = o
case *string:
if o == nil {
return nil
}
value = *o
case *OptionalString:
if o == nil {
return nil
}
value = o.Value
default:
return nil
}
return &OptionalString{
Value: value,
}
}
// Get returns nil if its value is unset or a pointer to the value itself.
func (o *OptionalString) Get() *string {
if o == nil {
return nil
}
v := o.Value
return &v
}
// Int creates an Optional wrapper from its argument.
func Int(v interface{}) *OptionalInt {
var value int64
switch o := v.(type) {
case int:
value = int64(o)
case *int:
if o == nil {
return nil
}
value = int64(*o)
case *OptionalInt:
if o == nil {
return nil
}
value = o.Value
default:
return nil
}
return &OptionalInt{
Value: value,
}
}
// Get returns nil if its value is unset or a pointer to the value itself.
func (o *OptionalInt) Get() *int {
if o == nil {
return nil
}
v := int(o.Value)
return &v
}
// Int32 creates an Optional wrapper from its argument.
func Int32(v interface{}) *OptionalInt32 {
var value int32
switch o := v.(type) {
case int32:
value = o
case *int32:
if o == nil {
return nil
}
value = *o
case *OptionalInt32:
if o == nil {
return nil
}
value = o.Value
default:
return nil
}
return &OptionalInt32{
Value: value,
}
}
// Get returns nil if its value is unset or a pointer to the value itself.
func (o *OptionalInt32) Get() *int32 {
if o == nil {
return nil
}
v := o.Value
return &v
}
// UInt32 creates an Optional wrapper from its argument.
func UInt32(v interface{}) *OptionalUInt32 {
var value uint32
switch o := v.(type) {
case uint32:
value = o
case *uint32:
if o == nil {
return nil
}
value = *o
case *OptionalUInt32:
if o == nil {
return nil
}
value = o.Value
default:
return nil
}
return &OptionalUInt32{
Value: value,
}
}
// Get returns nil if its value is unset or a pointer to the value itself.
func (o *OptionalUInt32) Get() *uint32 {
if o == nil {
return nil
}
v := o.Value
return &v
}
// Int64 creates an Optional wrapper from its argument.
func Int64(v interface{}) *OptionalInt64 {
var value int64
switch o := v.(type) {
case int:
value = int64(o)
case uint:
value = int64(o)
case uint64:
value = int64(o)
case int64:
value = o
case *int64:
if o == nil {
return nil
}
value = *o
case *uint64:
if o == nil {
return nil
}
value = int64(*o)
case *OptionalInt64:
if o == nil {
return nil
}
value = o.Value
default:
return nil
}
return &OptionalInt64{
Value: value,
}
}
// Get returns nil if its value is unset or a pointer to the value itself.
func (o *OptionalInt64) Get() *int64 {
if o == nil {
return nil
}
v := o.Value
return &v
}
// UInt64 creates an Optional wrapper from its argument.
func UInt64(v interface{}) *OptionalUInt64 {
var value uint64
switch o := v.(type) {
case int:
value = uint64(o)
case uint:
value = uint64(o)
case int64:
value = uint64(o)
case uint64:
value = o
case *int64:
if o == nil {
return nil
}
value = uint64(*o)
case *uint64:
if o == nil {
return nil
}
value = *o
case *OptionalUInt64:
if o == nil {
return nil
}
value = o.Value
default:
return nil
}
return &OptionalUInt64{
Value: value,
}
}
// Get returns nil if its value is unset or a pointer to the value itself.
func (o *OptionalUInt64) Get() *uint64 {
if o == nil {
return nil
}
v := o.Value
return &v
}
// Bool creates an Optional wrapper from its argument.
func Bool(v interface{}) *OptionalBool {
var value bool
switch o := v.(type) {
case bool:
value = o
case *bool:
if o == nil {
return nil
}
value = *o
case *OptionalBool:
if o == nil {
return nil
}
value = o.Value
default:
return nil
}
return &OptionalBool{
Value: value,
}
}
// Get returns nil if its value is unset or a pointer to the value itself.
func (o *OptionalBool) Get() *bool {
if o == nil {
return nil
}
v := o.Value
return &v
}
// FileMode creates an Optional wrapper from its argument.
func FileMode(v interface{}) *OptionalFileMode {
var value os.FileMode
switch o := v.(type) {
case *os.FileMode:
if o == nil {
return nil
}
value = *o
case os.FileMode:
value = o
case *OptionalFileMode:
if o == nil {
return nil
}
value = os.FileMode(o.Value)
case uint32:
value = os.FileMode(o)
default:
return nil
}
return &OptionalFileMode{
Value: uint32(value),
}
}
// Get returns nil if its value is unset or a pointer to the value itself.
func (o *OptionalFileMode) Get() *os.FileMode {
if o == nil {
return nil
}
v := os.FileMode(o.Value)
return &v
}

58
vendor/github.com/containerd/nri/pkg/api/plugin.go generated vendored Normal file
View File

@@ -0,0 +1,58 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package api
import (
"fmt"
"strings"
)
const (
// DefaultSocketPath is the default socket path for external plugins.
DefaultSocketPath = "/var/run/nri.sock"
// PluginSocketEnvVar is used to inform plugins about pre-connected sockets.
PluginSocketEnvVar = "NRI_PLUGIN_SOCKET"
// PluginNameEnvVar is used to inform NRI-launched plugins about their name.
PluginNameEnvVar = "NRI_PLUGIN_NAME"
// PluginIdxEnvVar is used to inform NRI-launched plugins about their ID.
PluginIdxEnvVar = "NRI_PLUGIN_IDX"
)
// ParsePluginName parses the (file)name of a plugin into an index and a base.
func ParsePluginName(name string) (string, string, error) {
split := strings.SplitN(name, "-", 2)
if len(split) < 2 {
return "", "", fmt.Errorf("invalid plugin name %q, idx-pluginname expected", name)
}
if err := CheckPluginIndex(split[0]); err != nil {
return "", "", err
}
return split[0], split[1], nil
}
// CheckPluginIndex checks the validity of a plugin index.
func CheckPluginIndex(idx string) error {
if len(idx) != 2 {
return fmt.Errorf("invalid plugin index %q, must be 2 digits", idx)
}
if !('0' <= idx[0] && idx[0] <= '9') || !('0' <= idx[1] && idx[1] <= '9') {
return fmt.Errorf("invalid plugin index %q (not [0-9][0-9])", idx)
}
return nil
}

230
vendor/github.com/containerd/nri/pkg/api/resources.go generated vendored Normal file
View File

@@ -0,0 +1,230 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package api
import (
rspec "github.com/opencontainers/runtime-spec/specs-go"
cri "k8s.io/cri-api/pkg/apis/runtime/v1"
)
// FromOCILinuxResources returns resources from an OCI runtime Spec.
func FromOCILinuxResources(o *rspec.LinuxResources, ann map[string]string) *LinuxResources {
if o == nil {
return nil
}
l := &LinuxResources{}
if m := o.Memory; m != nil {
l.Memory = &LinuxMemory{
Limit: Int64(m.Limit),
Reservation: Int64(m.Reservation),
Swap: Int64(m.Swap),
Kernel: Int64(m.Kernel),
KernelTcp: Int64(m.KernelTCP),
Swappiness: UInt64(m.Swappiness),
DisableOomKiller: Bool(m.DisableOOMKiller),
UseHierarchy: Bool(m.UseHierarchy),
}
}
if c := o.CPU; c != nil {
l.Cpu = &LinuxCPU{
Shares: UInt64(c.Shares),
Quota: Int64(c.Quota),
Period: UInt64(c.Period),
RealtimeRuntime: Int64(c.RealtimeRuntime),
RealtimePeriod: UInt64(c.RealtimePeriod),
Cpus: c.Cpus,
Mems: c.Mems,
}
}
for _, h := range o.HugepageLimits {
l.HugepageLimits = append(l.HugepageLimits, &HugepageLimit{
PageSize: h.Pagesize,
Limit: h.Limit,
})
}
for _, d := range o.Devices {
l.Devices = append(l.Devices, &LinuxDeviceCgroup{
Allow: d.Allow,
Type: d.Type,
Major: Int64(d.Major),
Minor: Int64(d.Minor),
Access: d.Access,
})
}
return l
}
func FromCRILinuxResources(c *cri.LinuxContainerResources) *LinuxResources {
if c == nil {
return nil
}
shares, quota, period := uint64(c.CpuShares), c.CpuQuota, uint64(c.CpuPeriod)
r := &LinuxResources{
Cpu: &LinuxCPU{
Shares: UInt64(&shares),
Quota: Int64(&quota),
Period: UInt64(&period),
Cpus: c.CpusetCpus,
Mems: c.CpusetMems,
},
Memory: &LinuxMemory{
Limit: Int64(&c.MemoryLimitInBytes),
},
}
for _, l := range c.HugepageLimits {
r.HugepageLimits = append(r.HugepageLimits,
&HugepageLimit{
PageSize: l.PageSize,
Limit: l.Limit,
})
}
return r
}
// ToOCI returns resources for an OCI runtime Spec.
func (r *LinuxResources) ToOCI() *rspec.LinuxResources {
if r == nil {
return nil
}
o := &rspec.LinuxResources{}
if r.Memory != nil {
o.Memory = &rspec.LinuxMemory{
Limit: r.Memory.Limit.Get(),
Reservation: r.Memory.Reservation.Get(),
Swap: r.Memory.Swap.Get(),
Kernel: r.Memory.Kernel.Get(),
KernelTCP: r.Memory.KernelTcp.Get(),
Swappiness: r.Memory.Swappiness.Get(),
DisableOOMKiller: r.Memory.DisableOomKiller.Get(),
UseHierarchy: r.Memory.UseHierarchy.Get(),
}
}
if r.Cpu != nil {
o.CPU = &rspec.LinuxCPU{
Shares: r.Cpu.Shares.Get(),
Quota: r.Cpu.Quota.Get(),
Period: r.Cpu.Period.Get(),
RealtimeRuntime: r.Cpu.RealtimeRuntime.Get(),
RealtimePeriod: r.Cpu.RealtimePeriod.Get(),
Cpus: r.Cpu.Cpus,
Mems: r.Cpu.Mems,
}
}
for _, l := range r.HugepageLimits {
o.HugepageLimits = append(o.HugepageLimits, rspec.LinuxHugepageLimit{
Pagesize: l.PageSize,
Limit: l.Limit,
})
}
if len(r.Unified) != 0 {
o.Unified = make(map[string]string)
for k, v := range r.Unified {
o.Unified[k] = v
}
}
for _, d := range r.Devices {
o.Devices = append(o.Devices, rspec.LinuxDeviceCgroup{
Allow: d.Allow,
Type: d.Type,
Major: d.Major.Get(),
Minor: d.Minor.Get(),
Access: d.Access,
})
}
return o
}
// ToCRI returns resources for CRI.
func (r *LinuxResources) ToCRI(oomScoreAdj int64) *cri.LinuxContainerResources {
if r == nil {
return nil
}
o := &cri.LinuxContainerResources{}
if r.Memory != nil {
o.MemoryLimitInBytes = r.Memory.GetLimit().GetValue()
o.OomScoreAdj = oomScoreAdj
}
if r.Cpu != nil {
o.CpuShares = int64(r.Cpu.GetShares().GetValue())
o.CpuPeriod = int64(r.Cpu.GetPeriod().GetValue())
o.CpuQuota = r.Cpu.GetQuota().GetValue()
o.CpusetCpus = r.Cpu.Cpus
o.CpusetMems = r.Cpu.Mems
}
for _, l := range r.HugepageLimits {
o.HugepageLimits = append(o.HugepageLimits, &cri.HugepageLimit{
PageSize: l.PageSize,
Limit: l.Limit,
})
}
if len(r.Unified) != 0 {
o.Unified = make(map[string]string)
for k, v := range r.Unified {
o.Unified[k] = v
}
}
return o
}
// Copy creates a copy of the resources.
func (r *LinuxResources) Copy() *LinuxResources {
if r == nil {
return nil
}
o := &LinuxResources{}
if r.Memory != nil {
o.Memory = &LinuxMemory{
Limit: Int64(r.Memory.GetLimit()),
Reservation: Int64(r.Memory.GetReservation()),
Swap: Int64(r.Memory.GetSwap()),
Kernel: Int64(r.Memory.GetKernel()),
KernelTcp: Int64(r.Memory.GetKernelTcp()),
Swappiness: UInt64(r.Memory.GetSwappiness()),
DisableOomKiller: Bool(r.Memory.GetDisableOomKiller()),
UseHierarchy: Bool(r.Memory.GetUseHierarchy()),
}
}
if r.Cpu != nil {
o.Cpu = &LinuxCPU{
Shares: UInt64(r.Cpu.GetShares()),
Quota: Int64(r.Cpu.GetQuota()),
Period: UInt64(r.Cpu.GetPeriod()),
RealtimeRuntime: Int64(r.Cpu.GetRealtimeRuntime()),
RealtimePeriod: UInt64(r.Cpu.GetRealtimePeriod()),
Cpus: r.Cpu.GetCpus(),
Mems: r.Cpu.GetMems(),
}
}
for _, l := range r.HugepageLimits {
o.HugepageLimits = append(o.HugepageLimits, &HugepageLimit{
PageSize: l.PageSize,
Limit: l.Limit,
})
}
if len(r.Unified) != 0 {
o.Unified = make(map[string]string)
for k, v := range r.Unified {
o.Unified[k] = v
}
}
o.BlockioClass = String(r.BlockioClass)
o.RdtClass = String(r.RdtClass)
return o
}

186
vendor/github.com/containerd/nri/pkg/api/update.go generated vendored Normal file
View File

@@ -0,0 +1,186 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package api
//nolint
// SetContainerId sets the id of the container to update.
func (u *ContainerUpdate) SetContainerId(id string) {
u.ContainerId = id
}
// SetLinuxMemoryLimit records setting the memory limit for a container.
func (u *ContainerUpdate) SetLinuxMemoryLimit(value int64) {
u.initLinuxResourcesMemory()
u.Linux.Resources.Memory.Limit = Int64(value)
}
// SetLinuxMemoryReservation records setting the memory reservation for a container.
func (u *ContainerUpdate) SetLinuxMemoryReservation(value int64) {
u.initLinuxResourcesMemory()
u.Linux.Resources.Memory.Reservation = Int64(value)
}
// SetLinuxMemorySwap records records setting the memory swap limit for a container.
func (u *ContainerUpdate) SetLinuxMemorySwap(value int64) {
u.initLinuxResourcesMemory()
u.Linux.Resources.Memory.Swap = Int64(value)
}
// SetLinuxMemoryKernel records setting the memory kernel limit for a container.
func (u *ContainerUpdate) SetLinuxMemoryKernel(value int64) {
u.initLinuxResourcesMemory()
u.Linux.Resources.Memory.Kernel = Int64(value)
}
// SetLinuxMemoryKernelTCP records setting the memory kernel TCP limit for a container.
func (u *ContainerUpdate) SetLinuxMemoryKernelTCP(value int64) {
u.initLinuxResourcesMemory()
u.Linux.Resources.Memory.KernelTcp = Int64(value)
}
// SetLinuxMemorySwappiness records setting the memory swappiness for a container.
func (u *ContainerUpdate) SetLinuxMemorySwappiness(value uint64) {
u.initLinuxResourcesMemory()
u.Linux.Resources.Memory.Swappiness = UInt64(value)
}
// SetLinuxMemoryDisableOomKiller records disabling the OOM killer for a container.
func (u *ContainerUpdate) SetLinuxMemoryDisableOomKiller() {
u.initLinuxResourcesMemory()
u.Linux.Resources.Memory.DisableOomKiller = Bool(true)
}
// SetLinuxMemoryUseHierarchy records enabling hierarchical memory accounting for a container.
func (u *ContainerUpdate) SetLinuxMemoryUseHierarchy() {
u.initLinuxResourcesMemory()
u.Linux.Resources.Memory.UseHierarchy = Bool(true)
}
// SetLinuxCPUShares records setting the scheduler's CPU shares for a container.
func (u *ContainerUpdate) SetLinuxCPUShares(value uint64) {
u.initLinuxResourcesCPU()
u.Linux.Resources.Cpu.Shares = UInt64(value)
}
// SetLinuxCPUQuota records setting the scheduler's CPU quota for a container.
func (u *ContainerUpdate) SetLinuxCPUQuota(value int64) {
u.initLinuxResourcesCPU()
u.Linux.Resources.Cpu.Quota = Int64(value)
}
// SetLinuxCPUPeriod records setting the scheduler's CPU period for a container.
func (u *ContainerUpdate) SetLinuxCPUPeriod(value int64) {
u.initLinuxResourcesCPU()
u.Linux.Resources.Cpu.Period = UInt64(value)
}
// SetLinuxCPURealtimeRuntime records setting the scheduler's realtime runtime for a container.
func (u *ContainerUpdate) SetLinuxCPURealtimeRuntime(value int64) {
u.initLinuxResourcesCPU()
u.Linux.Resources.Cpu.RealtimeRuntime = Int64(value)
}
// SetLinuxCPURealtimePeriod records setting the scheduler's realtime period for a container.
func (u *ContainerUpdate) SetLinuxCPURealtimePeriod(value uint64) {
u.initLinuxResourcesCPU()
u.Linux.Resources.Cpu.RealtimePeriod = UInt64(value)
}
// SetLinuxCPUSetCPUs records setting the cpuset CPUs for a container.
func (u *ContainerUpdate) SetLinuxCPUSetCPUs(value string) {
u.initLinuxResourcesCPU()
u.Linux.Resources.Cpu.Cpus = value
}
// SetLinuxCPUSetMems records setting the cpuset memory for a container.
func (u *ContainerUpdate) SetLinuxCPUSetMems(value string) {
u.initLinuxResourcesCPU()
u.Linux.Resources.Cpu.Mems = value
}
// AddLinuxHugepageLimit records adding a hugepage limit for a container.
func (u *ContainerUpdate) AddLinuxHugepageLimit(pageSize string, value uint64) {
u.initLinuxResources()
u.Linux.Resources.HugepageLimits = append(u.Linux.Resources.HugepageLimits,
&HugepageLimit{
PageSize: pageSize,
Limit: value,
})
}
// SetLinuxBlockIOClass records setting the Block I/O class for a container.
func (u *ContainerUpdate) SetLinuxBlockIOClass(value string) {
u.initLinuxResources()
u.Linux.Resources.BlockioClass = String(value)
}
// SetLinuxRDTClass records setting the RDT class for a container.
func (u *ContainerUpdate) SetLinuxRDTClass(value string) {
u.initLinuxResources()
u.Linux.Resources.RdtClass = String(value)
}
// AddLinuxUnified sets a cgroupv2 unified resource.
func (u *ContainerUpdate) AddLinuxUnified(key, value string) {
u.initLinuxResourcesUnified()
u.Linux.Resources.Unified[key] = value
}
// SetIgnoreFailure marks an Update as ignored for failures.
// Such updates will not prevent the related container operation
// from succeeding if the update fails.
func (u *ContainerUpdate) SetIgnoreFailure() {
u.IgnoreFailure = true
}
//
// Initializing a container update.
//
func (u *ContainerUpdate) initLinux() {
if u.Linux == nil {
u.Linux = &LinuxContainerUpdate{}
}
}
func (u *ContainerUpdate) initLinuxResources() {
u.initLinux()
if u.Linux.Resources == nil {
u.Linux.Resources = &LinuxResources{}
}
}
func (u *ContainerUpdate) initLinuxResourcesMemory() {
u.initLinuxResources()
if u.Linux.Resources.Memory == nil {
u.Linux.Resources.Memory = &LinuxMemory{}
}
}
func (u *ContainerUpdate) initLinuxResourcesCPU() {
u.initLinuxResources()
if u.Linux.Resources.Cpu == nil {
u.Linux.Resources.Cpu = &LinuxCPU{}
}
}
func (u *ContainerUpdate) initLinuxResourcesUnified() {
u.initLinuxResources()
if u.Linux.Resources.Unified == nil {
u.Linux.Resources.Unified = make(map[string]string)
}
}

87
vendor/github.com/containerd/nri/pkg/log/log.go generated vendored Normal file
View File

@@ -0,0 +1,87 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package log
import (
"context"
"github.com/sirupsen/logrus"
)
var (
log Logger = &fallbackLogger{}
)
// Logger is the interface NRI uses for logging.
type Logger interface {
Debugf(ctx context.Context, format string, args ...interface{})
Infof(ctx context.Context, format string, args ...interface{})
Warnf(ctx context.Context, format string, args ...interface{})
Errorf(ctx context.Context, format string, args ...interface{})
}
// Set the logger used by NRI.
func Set(l Logger) {
log = l
}
// Get the logger used by NRI.
func Get() Logger {
return log
}
// Debugf logs a formatted debug message.
func Debugf(ctx context.Context, format string, args ...interface{}) {
log.Debugf(ctx, format, args...)
}
// Infof logs a formatted informational message.
func Infof(ctx context.Context, format string, args ...interface{}) {
log.Infof(ctx, format, args...)
}
// Warnf logs a formatted warning message.
func Warnf(ctx context.Context, format string, args ...interface{}) {
log.Warnf(ctx, format, args...)
}
// Errorf logs a formatted error message.
func Errorf(ctx context.Context, format string, args ...interface{}) {
log.Errorf(ctx, format, args...)
}
type fallbackLogger struct{}
// Debugf logs a formatted debug message.
func (f *fallbackLogger) Debugf(ctx context.Context, format string, args ...interface{}) {
logrus.WithContext(ctx).Debugf(format, args...)
}
// Infof logs a formatted informational message.
func (f *fallbackLogger) Infof(ctx context.Context, format string, args ...interface{}) {
logrus.WithContext(ctx).Infof(format, args...)
}
// Warnf logs a formatted warning message.
func (f *fallbackLogger) Warnf(ctx context.Context, format string, args ...interface{}) {
logrus.WithContext(ctx).Warnf(format, args...)
}
// Errorf logs a formatted error message.
func (f *fallbackLogger) Errorf(ctx context.Context, format string, args ...interface{}) {
logrus.WithContext(ctx).Errorf(format, args...)
}

93
vendor/github.com/containerd/nri/pkg/net/conn.go generated vendored Normal file
View File

@@ -0,0 +1,93 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package net
import (
"fmt"
"io"
"net"
"os"
"strconv"
"sync"
)
// NewFdConn creates a net.Conn for the given (socket) fd.
func NewFdConn(fd int) (net.Conn, error) {
f := os.NewFile(uintptr(fd), "fd #"+strconv.Itoa(fd))
conn, err := net.FileConn(f)
if err != nil {
return nil, fmt.Errorf("failed to create net.Conn for fd #%d: %w", fd, err)
}
f.Close()
return conn, nil
}
// connListener wraps a pre-connected socket in a net.Listener.
type connListener struct {
next chan net.Conn
conn net.Conn
addr net.Addr
lock sync.RWMutex // for Close()
closed bool
}
// NewConnListener wraps an existing net.Conn in a net.Listener.
//
// The first call to Accept() on the listener will return the wrapped
// connection. Subsequent calls to Accept() block until the listener
// is closed, then return io.EOF. Close() closes the listener and the
// wrapped connection.
func NewConnListener(conn net.Conn) net.Listener {
next := make(chan net.Conn, 1)
next <- conn
return &connListener{
next: next,
conn: conn,
addr: conn.LocalAddr(),
}
}
// Accept returns the wrapped connection when it is called the first
// time. Later calls to Accept block until the listener is closed, then
// return io.EOF.
func (l *connListener) Accept() (net.Conn, error) {
conn := <-l.next
if conn == nil {
return nil, io.EOF
}
return conn, nil
}
// Close closes the listener and the wrapped connection.
func (l *connListener) Close() error {
l.lock.Lock()
defer l.lock.Unlock()
if l.closed {
return nil
}
close(l.next)
l.closed = true
return l.conn.Close()
}
// Addr returns the local address of the wrapped connection.
func (l *connListener) Addr() net.Addr {
return l.addr
}

View File

@@ -0,0 +1,444 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package multiplex
import (
"encoding/binary"
"errors"
"fmt"
"io"
"net"
"sync"
"syscall"
"time"
nrinet "github.com/containerd/nri/pkg/net"
"github.com/containerd/ttrpc"
)
// Mux multiplexes several logical connections over a single net.Conn.
//
// Connections are identified within a Mux by ConnIDs which are simple
// 32-bit unsigned integers. Opening a connection returns a net.Conn
// corrponding to the ConnID. This can then be used to write and read
// data through the connection with the Mux performing multiplexing
// and demultiplexing of data.
//
// Writing to a connection is fully synchronous. The caller can safely
// reuse the buffer once the call returns. Reading from a connection
// returns the oldest demultiplexed buffer for the connection, blocking
// if the connections incoming queue is empty. If any incoming queue is
// ever overflown the underlying trunk and all multiplexed connections
// are closed and an error is recorded. This error is later returned by
// any subsequent read from any connection. All connections of the Mux
// have the same fixed incoming queue length which can be configured
// using the WithReadQueueLength Option during Mux creation.
//
// The Mux interface also provides functions that emulate net.Dial and
// net.Listen for a connection. Usually these can be used for passing
// multiplexed connections to packages that insist to Dial or Accept
// themselves for connection establishment.
//
// Note that opening a connection is a virtual operation in the sense
// that it has no effects outside the Mux. It is performed without any
// signalling or other communication. It merely acquires the net.Conn
// corresponding to the connection and blindly assumes that the same
// ConnID is or will be opened at the other end of the Mux.
type Mux interface {
// Open the connection for the given ConnID.
Open(ConnID) (net.Conn, error)
// Close the Mux and all connections associated with it.
Close() error
// Dialer returns a net.Dial-like function for the connection.
//
// Calling the returned function (with arguments) will return a
// net.Conn for the connection.
Dialer(ConnID) func(string, string) (net.Conn, error)
// Listener returns a net.Listener for the connection. The first
// call to Accept() on the listener will return a net.Conn for the
// connection. Subsequent calls to Accept() will block until the
// connection is closed then return io.EOF.
Listen(ConnID) (net.Listener, error)
// Trunk returns the trunk connection for the Mux.
Trunk() net.Conn
// Unblock unblocks the Mux reader.
Unblock()
}
// ConnID uniquely identifies a logical connection within a Mux.
type ConnID uint32
const (
// ConnID 0 is reserved for future use.
reservedConnID ConnID = iota
// LowestConnID is the lowest externally usable ConnID.
LowestConnID
)
// Option to apply to a Mux.
type Option func(*mux)
// WithBlockedRead causes the Mux to be blocked for reading until gets Unblock()'ed.
func WithBlockedRead() Option {
return func(m *mux) {
if m.blockC == nil {
m.blockC = make(chan struct{})
}
}
}
// WithReadQueueLength overrides the default read queue size.
func WithReadQueueLength(length int) Option {
return func(m *mux) {
m.qlen = length
}
}
// Multiplex returns a multiplexer for the given connection.
func Multiplex(trunk net.Conn, options ...Option) Mux {
return newMux(trunk, options...)
}
// mux is our implementation of Mux.
type mux struct {
trunk net.Conn
writeLock sync.Mutex
conns map[ConnID]*conn
connLock sync.RWMutex
qlen int
errOnce sync.Once
err error
unblkOnce sync.Once
blockC chan struct{}
closeOnce sync.Once
doneC chan struct{}
}
const (
// default read queue length for a single connection
readQueueLen = 256
// length of frame header: 4-byte ConnID, 4-byte payload length
headerLen = 8
// max. allowed payload size
maxPayloadSize = 1 << 24
)
// conn represents a single multiplexed connection.
type conn struct {
id ConnID
mux *mux
readC chan []byte
closeOnce sync.Once
doneC chan error
}
func newMux(trunk net.Conn, options ...Option) *mux {
m := &mux{
trunk: trunk,
conns: make(map[ConnID]*conn),
qlen: readQueueLen,
doneC: make(chan struct{}),
}
for _, o := range options {
o(m)
}
if m.blockC == nil {
WithBlockedRead()(m)
m.Unblock()
}
go m.reader()
return m
}
func (m *mux) Trunk() net.Conn {
return m.trunk
}
func (m *mux) Unblock() {
m.unblkOnce.Do(func() {
close(m.blockC)
})
}
func (m *mux) Open(id ConnID) (net.Conn, error) {
if id == reservedConnID {
return nil, fmt.Errorf("ConnID %d is reserved", id)
}
m.connLock.Lock()
defer m.connLock.Unlock()
c, ok := m.conns[id]
if !ok {
c = &conn{
id: id,
mux: m,
doneC: make(chan error, 1),
readC: make(chan []byte, m.qlen),
}
m.conns[id] = c
}
return c, nil
}
func (m *mux) Close() error {
m.closeOnce.Do(func() {
m.connLock.Lock()
defer m.connLock.Unlock()
for _, conn := range m.conns {
conn.close()
}
close(m.doneC)
m.trunk.Close()
})
return nil
}
func (m *mux) Dialer(id ConnID) func(string, string) (net.Conn, error) {
return func(string, string) (net.Conn, error) {
return m.Open(id)
}
}
func (m *mux) Listen(id ConnID) (net.Listener, error) {
conn, err := m.Open(id)
if err != nil {
return nil, err
}
return nrinet.NewConnListener(conn), nil
}
func (m *mux) write(id ConnID, buf []byte) (int, error) {
var hdr [headerLen]byte
if len(buf) > maxPayloadSize {
return 0, syscall.EMSGSIZE
}
binary.BigEndian.PutUint32(hdr[0:4], uint32(id))
binary.BigEndian.PutUint32(hdr[4:8], uint32(len(buf)))
m.writeLock.Lock()
defer m.writeLock.Unlock()
n, err := m.trunk.Write(hdr[:])
if err != nil {
err = fmt.Errorf("failed to write header to trunk: %w", err)
if n != 0 {
m.setError(err)
m.Close()
}
return 0, err
}
n, err = m.trunk.Write(buf)
if err != nil {
err = fmt.Errorf("failed to write payload to trunk: %w", err)
if n != 0 {
m.setError(err)
m.Close()
}
}
return n, err
}
func (m *mux) reader() {
var (
hdr [headerLen]byte
cid uint32
cnt uint32
buf []byte
err error
)
<-m.blockC
for {
select {
case <-m.doneC:
return
default:
}
_, err = io.ReadFull(m.trunk, hdr[:])
if err != nil {
switch {
case errors.Is(err, io.EOF):
case errors.Is(err, ttrpc.ErrClosed):
err = io.EOF
case errors.Is(err, ttrpc.ErrServerClosed):
err = io.EOF
case errors.Is(err, net.ErrClosed):
err = io.EOF
default:
err = fmt.Errorf("failed to read header from trunk: %w", err)
}
m.setError(err)
m.Close()
return
}
cid = binary.BigEndian.Uint32(hdr[0:4])
cnt = binary.BigEndian.Uint32(hdr[4:8])
buf = make([]byte, int(cnt))
_, err = io.ReadFull(m.trunk, buf)
if err != nil {
switch {
case errors.Is(err, io.EOF):
case errors.Is(err, ttrpc.ErrClosed):
err = io.EOF
case errors.Is(err, ttrpc.ErrServerClosed):
err = io.EOF
case errors.Is(err, net.ErrClosed):
err = io.EOF
default:
err = fmt.Errorf("failed to read payload from trunk: %w", err)
}
m.setError(err)
m.Close()
return
}
m.connLock.RLock()
conn, ok := m.conns[ConnID(cid)]
m.connLock.RUnlock()
if ok {
select {
case conn.readC <- buf:
default:
m.setError(errors.New("failed to queue payload for reading"))
m.Close()
return
}
}
}
}
func (m *mux) setError(err error) {
m.errOnce.Do(func() {
m.err = err
})
}
// nolint
func (m *mux) error() error {
m.errOnce.Do(func() {
if m.err == nil {
m.err = io.EOF
}
})
return m.err
}
//
// multiplexed connections
//
// Reads reads the next message from the multiplexed connection.
func (c *conn) Read(buf []byte) (int, error) {
var (
msg []byte
err error
ok bool
)
select {
case err, ok = <-c.doneC:
if !ok || err == nil {
err = c.mux.error()
}
return 0, err
case msg, ok = <-c.readC:
if !ok {
return 0, c.mux.error()
}
if cap(buf) < len(msg) {
return 0, syscall.ENOMEM
}
}
copy(buf, msg)
return len(msg), nil
}
// Write writes the given data to the multiplexed connection.
func (c *conn) Write(b []byte) (int, error) {
select {
case err := <-c.doneC:
if err == nil {
err = io.EOF
}
return 0, err
default:
}
return c.mux.write(c.id, b)
}
// Close closes the multiplexed connection.
func (c *conn) Close() error {
c.mux.connLock.Lock()
defer c.mux.connLock.Unlock()
if c.mux.conns[c.id] == c {
delete(c.mux.conns, c.id)
}
return c.close()
}
func (c *conn) close() error {
c.closeOnce.Do(func() {
close(c.doneC)
})
return nil
}
// LocalAddr is the unimplemented stub for the corresponding net.Conn function.
func (c *conn) LocalAddr() net.Addr {
return nil
}
// RemoteAddr is the unimplemented stub for the corresponding net.Conn function.
func (c *conn) RemoteAddr() net.Addr {
return nil
}
// SetDeadline is the unimplemented stub for the corresponding net.Conn function.
func (c *conn) SetDeadline(t time.Time) error {
return nil
}
// SetReadDeadline is the unimplemented stub for the corresponding net.Conn function.
func (c *conn) SetReadDeadline(t time.Time) error {
return nil
}
// SetWriteDeadline is the unimplemented stub for the corresponding net.Conn function.
func (c *conn) SetWriteDeadline(t time.Time) error {
return nil
}

View File

@@ -0,0 +1,24 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package multiplex
const (
// PluginServiceConn is the mux connection ID for NRI plugin services.
PluginServiceConn ConnID = iota + 1
// RuntimeServiceConn is the mux connection ID for NRI runtime services.
RuntimeServiceConn
)

View File

@@ -0,0 +1,98 @@
//go:build !windows
// +build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package net
import (
"fmt"
"net"
"os"
syscall "golang.org/x/sys/unix"
)
const (
local = 0
peer = 1
)
// SocketPair contains the file descriptors of a connected pair of sockets.
type SocketPair [2]int
// NewSocketPair returns a connected pair of sockets.
func NewSocketPair() (SocketPair, error) {
fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0)
if err != nil {
return [2]int{-1, -1}, fmt.Errorf("failed to create socketpair: %w", err)
}
return fds, nil
}
// LocalFile returns the socketpair fd for local usage as an *os.File.
func (fds SocketPair) LocalFile() *os.File {
return os.NewFile(uintptr(fds[local]), fds.fileName()+"[0]")
}
// PeerFile returns the socketpair fd for peer usage as an *os.File.
func (fds SocketPair) PeerFile() *os.File {
return os.NewFile(uintptr(fds[peer]), fds.fileName()+"[1]")
}
// LocalConn returns a net.Conn for the local end of the socketpair.
func (fds SocketPair) LocalConn() (net.Conn, error) {
file := fds.LocalFile()
defer file.Close()
conn, err := net.FileConn(file)
if err != nil {
return nil, fmt.Errorf("failed to create net.Conn for %s[0]: %w", fds.fileName(), err)
}
return conn, nil
}
// PeerConn returns a net.Conn for the peer end of the socketpair.
func (fds SocketPair) PeerConn() (net.Conn, error) {
file := fds.PeerFile()
defer file.Close()
conn, err := net.FileConn(file)
if err != nil {
return nil, fmt.Errorf("failed to create net.Conn for %s[1]: %w", fds.fileName(), err)
}
return conn, nil
}
// Close closes both ends of the socketpair.
func (fds SocketPair) Close() {
fds.LocalClose()
fds.PeerClose()
}
// LocalClose closes the local end of the socketpair.
func (fds SocketPair) LocalClose() {
syscall.Close(fds[local])
}
// PeerClose closes the peer end of the socketpair.
func (fds SocketPair) PeerClose() {
syscall.Close(fds[peer])
}
func (fds SocketPair) fileName() string {
return fmt.Sprintf("socketpair-#%d:%d[0]", fds[local], fds[peer])
}

View File

@@ -0,0 +1,198 @@
//go:build windows
// +build windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package net
import (
"fmt"
"net"
"os"
"unsafe"
sys "golang.org/x/sys/windows"
)
// SocketPair contains a connected pair of sockets.
type SocketPair [2]sys.Handle
const (
local = 0
peer = 1
)
// NewSocketPair returns a connected pair of sockets.
func NewSocketPair() (SocketPair, error) {
/* return [2]sys.Handle{sys.InvalidHandle, sys.InvalidHandle},
errors.New("failed to emulatesocketpair, unimplemented for windows")*/
// untested: return emulateWithPreConnect()
return emulateWithPreConnect()
}
func emulateWithPreConnect() (SocketPair, error) {
var (
invalid = SocketPair{sys.InvalidHandle, sys.InvalidHandle}
sa sys.SockaddrInet4
//sn sys.Sockaddr
l sys.Handle
a sys.Handle
p sys.Handle
err error
)
l, err = socket(sys.AF_INET, sys.SOCK_STREAM, 0)
if err != nil {
return invalid, fmt.Errorf("failed to emulate socketpair (local Socket()): %w", err)
}
defer func() {
if err != nil {
sys.CloseHandle(l)
}
}()
sa.Addr[0] = 127
sa.Addr[3] = 1
sa.Port = 9999
err = sys.Bind(l, &sa)
if err != nil {
return invalid, fmt.Errorf("failed to emulate socketpair (Bind()): %w", err)
}
/*sn, err = sys.Getsockname(l)
if err != nil {
return invalid, fmt.Errorf("failed to emulate socketpair (Getsockname()): %w", err)
}*/
p, err = socket(sys.AF_INET, sys.SOCK_STREAM, 0)
if err != nil {
return invalid, fmt.Errorf("failed to emulate socketpair (peer Socket()): %w", err)
}
defer func() {
if err != nil {
sys.CloseHandle(p)
}
}()
err = sys.Listen(l, 0)
if err != nil {
return invalid, fmt.Errorf("failed to emulate socketpair (Listen()): %w", err)
}
go func() {
err = connect(p, &sa)
}()
a, err = accept(l, sys.AF_INET, sys.SOCK_STREAM, 0)
if err != nil {
return invalid, fmt.Errorf("failed to emualte socketpair (Accept()): %w", err)
}
defer func() {
if err != nil {
sys.CloseHandle(a)
}
}()
sys.CloseHandle(l)
return SocketPair{a, p}, nil
}
// Close closes both ends of the socketpair.
func (sp SocketPair) Close() {
sp.LocalClose()
sp.PeerClose()
}
// LocalFile returns the socketpair fd for local usage as an *os.File.
func (sp SocketPair) LocalFile() *os.File {
return os.NewFile(uintptr(sp[local]), sp.fileName()+"[0]")
}
// PeerFile returns the socketpair fd for peer usage as an *os.File.
func (sp SocketPair) PeerFile() *os.File {
return os.NewFile(uintptr(sp[peer]), sp.fileName()+"[1]")
}
// LocalConn returns a net.Conn for the local end of the socketpair.
func (sp SocketPair) LocalConn() (net.Conn, error) {
file := sp.LocalFile()
defer file.Close()
conn, err := net.FileConn(file)
if err != nil {
return nil, fmt.Errorf("failed to create net.Conn for %s[0]: %w", sp.fileName(), err)
}
return conn, nil
}
// PeerConn returns a net.Conn for the peer end of the socketpair.
func (sp SocketPair) PeerConn() (net.Conn, error) {
file := sp.PeerFile()
defer file.Close()
conn, err := net.FileConn(file)
if err != nil {
return nil, fmt.Errorf("failed to create net.Conn for %s[1]: %w", sp.fileName(), err)
}
return conn, nil
}
// LocalClose closes the local end of the socketpair.
func (sp SocketPair) LocalClose() {
sys.CloseHandle(sp[local])
}
// PeerClose closes the peer end of the socketpair.
func (sp SocketPair) PeerClose() {
sys.CloseHandle(sp[peer])
}
func (sp SocketPair) fileName() string {
return fmt.Sprintf("socketpair-#%d:%d[0]", sp[local], sp[peer])
}
func socket(domain, typ, proto int) (sys.Handle, error) {
return sys.WSASocket(int32(domain), int32(typ), int32(proto), nil, 0, sys.WSA_FLAG_OVERLAPPED)
}
func connect(s sys.Handle, sa sys.Sockaddr) error {
o := &sys.Overlapped{}
return sys.ConnectEx(s, sa, nil, 0, nil, o)
}
func accept(l sys.Handle, domain, typ, proto int) (sys.Handle, error) {
var (
a sys.Handle
err error
buf = [1024]byte{}
overlap = sys.Overlapped{}
cnt = uint32(16 + 256)
)
a, err = socket(sys.AF_INET, sys.SOCK_STREAM, 0)
if err != nil {
return sys.InvalidHandle, fmt.Errorf("failed to emulate socketpair (accept): %w", err)
}
err = sys.AcceptEx(l, a, (*byte)(unsafe.Pointer(&buf)), 0, cnt, cnt, &cnt, &overlap)
if err != nil {
return sys.InvalidHandle, fmt.Errorf("failed to emulate socketpair (AcceptEx()): %w", err)
}
return a, nil
}

View File

@@ -18,8 +18,7 @@ package v1
import (
"encoding/json"
"github.com/pkg/errors"
"errors"
)
// Plugin type and configuration