528 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			528 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 | 
						|
   Copyright The containerd Authors.
 | 
						|
 | 
						|
   Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
   you may not use this file except in compliance with the License.
 | 
						|
   You may obtain a copy of the License at
 | 
						|
 | 
						|
       http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 | 
						|
   Unless required by applicable law or agreed to in writing, software
 | 
						|
   distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
   See the License for the specific language governing permissions and
 | 
						|
   limitations under the License.
 | 
						|
*/
 | 
						|
 | 
						|
package nri
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
	"path"
 | 
						|
	"sync"
 | 
						|
 | 
						|
	"github.com/containerd/containerd/log"
 | 
						|
	"github.com/sirupsen/logrus"
 | 
						|
 | 
						|
	"github.com/containerd/containerd/version"
 | 
						|
	nri "github.com/containerd/nri/pkg/adaptation"
 | 
						|
)
 | 
						|
 | 
						|
// API implements a common API for interfacing NRI from containerd. It is
 | 
						|
// agnostic to any internal containerd implementation details of pods and
 | 
						|
// containers. It needs corresponding Domain interfaces for each containerd
 | 
						|
// namespace it needs to handle. These domains take care of the namespace-
 | 
						|
// specific details of providing pod and container metadata to NRI and of
 | 
						|
// applying NRI-requested adjustments to the state of containers.
 | 
						|
type API interface {
 | 
						|
	// IsEnabled returns true if the NRI interface is enabled and initialized.
 | 
						|
	IsEnabled() bool
 | 
						|
 | 
						|
	// Start start the NRI interface, allowing external NRI plugins to
 | 
						|
	// connect, register, and hook themselves into the lifecycle events
 | 
						|
	// of pods and containers.
 | 
						|
	Start() error
 | 
						|
 | 
						|
	// Stop stops the NRI interface.
 | 
						|
	Stop()
 | 
						|
 | 
						|
	// RunPodSandbox relays pod creation events to NRI.
 | 
						|
	RunPodSandbox(context.Context, PodSandbox) error
 | 
						|
 | 
						|
	// StopPodSandbox relays pod shutdown events to NRI.
 | 
						|
	StopPodSandbox(context.Context, PodSandbox) error
 | 
						|
 | 
						|
	// RemovePodSandbox relays pod removal events to NRI.
 | 
						|
	RemovePodSandbox(context.Context, PodSandbox) error
 | 
						|
 | 
						|
	// CreateContainer relays container creation requests to NRI.
 | 
						|
	CreateContainer(context.Context, PodSandbox, Container) (*nri.ContainerAdjustment, error)
 | 
						|
 | 
						|
	// PostCreateContainer relays successful container creation events to NRI.
 | 
						|
	PostCreateContainer(context.Context, PodSandbox, Container) error
 | 
						|
 | 
						|
	// StartContainer relays container start request notifications to NRI.
 | 
						|
	StartContainer(context.Context, PodSandbox, Container) error
 | 
						|
 | 
						|
	// PostStartContainer relays successful container startup events to NRI.
 | 
						|
	PostStartContainer(context.Context, PodSandbox, Container) error
 | 
						|
 | 
						|
	// UpdateContainer relays container update requests to NRI.
 | 
						|
	UpdateContainer(context.Context, PodSandbox, Container, *nri.LinuxResources) (*nri.LinuxResources, error)
 | 
						|
 | 
						|
	// PostUpdateContainer relays successful container update events to NRI.
 | 
						|
	PostUpdateContainer(context.Context, PodSandbox, Container) error
 | 
						|
 | 
						|
	// StopContainer relays container stop requests to NRI.
 | 
						|
	StopContainer(context.Context, PodSandbox, Container) error
 | 
						|
 | 
						|
	// NotifyContainerExit handles the exit event of a container.
 | 
						|
	NotifyContainerExit(context.Context, PodSandbox, Container)
 | 
						|
 | 
						|
	// RemoveContainer relays container removal events to NRI.
 | 
						|
	RemoveContainer(context.Context, PodSandbox, Container) error
 | 
						|
}
 | 
						|
 | 
						|
type State int
 | 
						|
 | 
						|
const (
 | 
						|
	Created State = iota + 1
 | 
						|
	Running
 | 
						|
	Stopped
 | 
						|
	Removed
 | 
						|
)
 | 
						|
 | 
						|
type local struct {
 | 
						|
	sync.Mutex
 | 
						|
	cfg *Config
 | 
						|
	nri *nri.Adaptation
 | 
						|
 | 
						|
	state map[string]State
 | 
						|
}
 | 
						|
 | 
						|
var _ API = &local{}
 | 
						|
 | 
						|
// New creates an instance of the NRI interface with the given configuration.
 | 
						|
func New(cfg *Config) (API, error) {
 | 
						|
	l := &local{
 | 
						|
		cfg: cfg,
 | 
						|
	}
 | 
						|
 | 
						|
	if cfg.Disable {
 | 
						|
		logrus.Info("NRI interface is disabled by configuration.")
 | 
						|
		return l, nil
 | 
						|
	}
 | 
						|
 | 
						|
	var (
 | 
						|
		name     = path.Base(version.Package)
 | 
						|
		version  = version.Version
 | 
						|
		opts     = cfg.toOptions()
 | 
						|
		syncFn   = l.syncPlugin
 | 
						|
		updateFn = l.updateFromPlugin
 | 
						|
		err      error
 | 
						|
	)
 | 
						|
 | 
						|
	cfg.ConfigureTimeouts()
 | 
						|
 | 
						|
	l.nri, err = nri.New(name, version, syncFn, updateFn, opts...)
 | 
						|
	if err != nil {
 | 
						|
		return nil, fmt.Errorf("failed to initialize NRI interface: %w", err)
 | 
						|
	}
 | 
						|
 | 
						|
	l.state = make(map[string]State)
 | 
						|
 | 
						|
	logrus.Info("created NRI interface")
 | 
						|
 | 
						|
	return l, nil
 | 
						|
}
 | 
						|
 | 
						|
func (l *local) IsEnabled() bool {
 | 
						|
	return l != nil && !l.cfg.Disable
 | 
						|
}
 | 
						|
 | 
						|
func (l *local) Start() error {
 | 
						|
	if !l.IsEnabled() {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	err := l.nri.Start()
 | 
						|
	if err != nil {
 | 
						|
		return fmt.Errorf("failed to start NRI interface: %w", err)
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (l *local) Stop() {
 | 
						|
	if !l.IsEnabled() {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	l.Lock()
 | 
						|
	defer l.Unlock()
 | 
						|
 | 
						|
	l.nri.Stop()
 | 
						|
	l.nri = nil
 | 
						|
}
 | 
						|
 | 
						|
func (l *local) RunPodSandbox(ctx context.Context, pod PodSandbox) error {
 | 
						|
	if !l.IsEnabled() {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	l.Lock()
 | 
						|
	defer l.Unlock()
 | 
						|
 | 
						|
	request := &nri.RunPodSandboxRequest{
 | 
						|
		Pod: podSandboxToNRI(pod),
 | 
						|
	}
 | 
						|
 | 
						|
	err := l.nri.RunPodSandbox(ctx, request)
 | 
						|
	l.setState(pod.GetID(), Running)
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
func (l *local) StopPodSandbox(ctx context.Context, pod PodSandbox) error {
 | 
						|
	if !l.IsEnabled() {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	l.Lock()
 | 
						|
	defer l.Unlock()
 | 
						|
 | 
						|
	if !l.needsStopping(pod.GetID()) {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	request := &nri.StopPodSandboxRequest{
 | 
						|
		Pod: podSandboxToNRI(pod),
 | 
						|
	}
 | 
						|
 | 
						|
	err := l.nri.StopPodSandbox(ctx, request)
 | 
						|
	l.setState(pod.GetID(), Stopped)
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
func (l *local) RemovePodSandbox(ctx context.Context, pod PodSandbox) error {
 | 
						|
	if !l.IsEnabled() {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	l.Lock()
 | 
						|
	defer l.Unlock()
 | 
						|
 | 
						|
	if !l.needsRemoval(pod.GetID()) {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	request := &nri.RemovePodSandboxRequest{
 | 
						|
		Pod: podSandboxToNRI(pod),
 | 
						|
	}
 | 
						|
 | 
						|
	err := l.nri.RemovePodSandbox(ctx, request)
 | 
						|
	l.setState(pod.GetID(), Removed)
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
func (l *local) CreateContainer(ctx context.Context, pod PodSandbox, ctr Container) (*nri.ContainerAdjustment, error) {
 | 
						|
	if !l.IsEnabled() {
 | 
						|
		return nil, nil
 | 
						|
	}
 | 
						|
 | 
						|
	l.Lock()
 | 
						|
	defer l.Unlock()
 | 
						|
 | 
						|
	request := &nri.CreateContainerRequest{
 | 
						|
		Pod:       podSandboxToNRI(pod),
 | 
						|
		Container: containerToNRI(ctr),
 | 
						|
	}
 | 
						|
 | 
						|
	response, err := l.nri.CreateContainer(ctx, request)
 | 
						|
	l.setState(request.Container.Id, Created)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	_, err = l.evictContainers(ctx, response.Evict)
 | 
						|
	if err != nil {
 | 
						|
		// TODO(klihub): we ignore pre-create eviction failures for now
 | 
						|
		log.G(ctx).WithError(err).Warnf("pre-create eviction failed")
 | 
						|
	}
 | 
						|
 | 
						|
	if _, err := l.applyUpdates(ctx, response.Update); err != nil {
 | 
						|
		// TODO(klihub): we ignore pre-create update failures for now
 | 
						|
		log.G(ctx).WithError(err).Warnf("pre-create update failed")
 | 
						|
	}
 | 
						|
 | 
						|
	return response.Adjust, nil
 | 
						|
}
 | 
						|
 | 
						|
func (l *local) PostCreateContainer(ctx context.Context, pod PodSandbox, ctr Container) error {
 | 
						|
	if !l.IsEnabled() {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	l.Lock()
 | 
						|
	defer l.Unlock()
 | 
						|
 | 
						|
	request := &nri.PostCreateContainerRequest{
 | 
						|
		Pod:       podSandboxToNRI(pod),
 | 
						|
		Container: containerToNRI(ctr),
 | 
						|
	}
 | 
						|
 | 
						|
	return l.nri.PostCreateContainer(ctx, request)
 | 
						|
}
 | 
						|
 | 
						|
func (l *local) StartContainer(ctx context.Context, pod PodSandbox, ctr Container) error {
 | 
						|
	if !l.IsEnabled() {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	l.Lock()
 | 
						|
	defer l.Unlock()
 | 
						|
 | 
						|
	request := &nri.StartContainerRequest{
 | 
						|
		Pod:       podSandboxToNRI(pod),
 | 
						|
		Container: containerToNRI(ctr),
 | 
						|
	}
 | 
						|
 | 
						|
	err := l.nri.StartContainer(ctx, request)
 | 
						|
	l.setState(request.Container.Id, Running)
 | 
						|
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
func (l *local) PostStartContainer(ctx context.Context, pod PodSandbox, ctr Container) error {
 | 
						|
	if !l.IsEnabled() {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	l.Lock()
 | 
						|
	defer l.Unlock()
 | 
						|
 | 
						|
	request := &nri.PostStartContainerRequest{
 | 
						|
		Pod:       podSandboxToNRI(pod),
 | 
						|
		Container: containerToNRI(ctr),
 | 
						|
	}
 | 
						|
 | 
						|
	return l.nri.PostStartContainer(ctx, request)
 | 
						|
}
 | 
						|
 | 
						|
func (l *local) UpdateContainer(ctx context.Context, pod PodSandbox, ctr Container, req *nri.LinuxResources) (*nri.LinuxResources, error) {
 | 
						|
	if !l.IsEnabled() {
 | 
						|
		return nil, nil
 | 
						|
	}
 | 
						|
 | 
						|
	l.Lock()
 | 
						|
	defer l.Unlock()
 | 
						|
 | 
						|
	request := &nri.UpdateContainerRequest{
 | 
						|
		Pod:            podSandboxToNRI(pod),
 | 
						|
		Container:      containerToNRI(ctr),
 | 
						|
		LinuxResources: req,
 | 
						|
	}
 | 
						|
 | 
						|
	response, err := l.nri.UpdateContainer(ctx, request)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	_, err = l.evictContainers(ctx, response.Evict)
 | 
						|
	if err != nil {
 | 
						|
		// TODO(klihub): we ignore pre-update eviction failures for now
 | 
						|
		log.G(ctx).WithError(err).Warnf("pre-update eviction failed")
 | 
						|
	}
 | 
						|
 | 
						|
	cnt := len(response.Update)
 | 
						|
	if cnt == 0 {
 | 
						|
		return nil, nil
 | 
						|
	}
 | 
						|
 | 
						|
	if cnt > 1 {
 | 
						|
		_, err = l.applyUpdates(ctx, response.Update[0:cnt-1])
 | 
						|
		if err != nil {
 | 
						|
			// TODO(klihub): we ignore pre-update update failures for now
 | 
						|
			log.G(ctx).WithError(err).Warnf("pre-update update failed")
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return response.Update[cnt-1].GetLinux().GetResources(), nil
 | 
						|
}
 | 
						|
 | 
						|
func (l *local) PostUpdateContainer(ctx context.Context, pod PodSandbox, ctr Container) error {
 | 
						|
	if !l.IsEnabled() {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	l.Lock()
 | 
						|
	defer l.Unlock()
 | 
						|
 | 
						|
	request := &nri.PostUpdateContainerRequest{
 | 
						|
		Pod:       podSandboxToNRI(pod),
 | 
						|
		Container: containerToNRI(ctr),
 | 
						|
	}
 | 
						|
 | 
						|
	return l.nri.PostUpdateContainer(ctx, request)
 | 
						|
}
 | 
						|
 | 
						|
func (l *local) StopContainer(ctx context.Context, pod PodSandbox, ctr Container) error {
 | 
						|
	if !l.IsEnabled() {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	l.Lock()
 | 
						|
	defer l.Unlock()
 | 
						|
 | 
						|
	return l.stopContainer(ctx, pod, ctr)
 | 
						|
}
 | 
						|
 | 
						|
func (l *local) NotifyContainerExit(ctx context.Context, pod PodSandbox, ctr Container) {
 | 
						|
	go func() {
 | 
						|
		l.Lock()
 | 
						|
		defer l.Unlock()
 | 
						|
		l.stopContainer(ctx, pod, ctr)
 | 
						|
	}()
 | 
						|
}
 | 
						|
 | 
						|
func (l *local) stopContainer(ctx context.Context, pod PodSandbox, ctr Container) error {
 | 
						|
	if !l.needsStopping(ctr.GetID()) {
 | 
						|
		log.G(ctx).Tracef("NRI stopContainer: container %s does not need stopping",
 | 
						|
			ctr.GetID())
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	request := &nri.StopContainerRequest{
 | 
						|
		Pod:       podSandboxToNRI(pod),
 | 
						|
		Container: containerToNRI(ctr),
 | 
						|
	}
 | 
						|
 | 
						|
	response, err := l.nri.StopContainer(ctx, request)
 | 
						|
	l.setState(request.Container.Id, Stopped)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	_, err = l.applyUpdates(ctx, response.Update)
 | 
						|
	if err != nil {
 | 
						|
		// TODO(klihub): we ignore post-stop update failures for now
 | 
						|
		log.G(ctx).WithError(err).Warnf("post-stop update failed")
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (l *local) RemoveContainer(ctx context.Context, pod PodSandbox, ctr Container) error {
 | 
						|
	if !l.IsEnabled() {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	l.Lock()
 | 
						|
	defer l.Unlock()
 | 
						|
 | 
						|
	if !l.needsRemoval(ctr.GetID()) {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	l.stopContainer(ctx, pod, ctr)
 | 
						|
 | 
						|
	request := &nri.RemoveContainerRequest{
 | 
						|
		Pod:       podSandboxToNRI(pod),
 | 
						|
		Container: containerToNRI(ctr),
 | 
						|
	}
 | 
						|
	err := l.nri.RemoveContainer(ctx, request)
 | 
						|
	l.setState(request.Container.Id, Removed)
 | 
						|
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
func (l *local) syncPlugin(ctx context.Context, syncFn nri.SyncCB) error {
 | 
						|
	l.Lock()
 | 
						|
	defer l.Unlock()
 | 
						|
 | 
						|
	log.G(ctx).Info("Synchronizing NRI (plugin) with current runtime state")
 | 
						|
 | 
						|
	pods := podSandboxesToNRI(domains.listPodSandboxes())
 | 
						|
	containers := containersToNRI(domains.listContainers())
 | 
						|
 | 
						|
	for _, ctr := range containers {
 | 
						|
		switch ctr.GetState() {
 | 
						|
		case nri.ContainerState_CONTAINER_CREATED:
 | 
						|
			l.setState(ctr.GetId(), Created)
 | 
						|
		case nri.ContainerState_CONTAINER_RUNNING, nri.ContainerState_CONTAINER_PAUSED:
 | 
						|
			l.setState(ctr.GetId(), Running)
 | 
						|
		case nri.ContainerState_CONTAINER_STOPPED:
 | 
						|
			l.setState(ctr.GetId(), Stopped)
 | 
						|
		default:
 | 
						|
			l.setState(ctr.GetId(), Removed)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	updates, err := syncFn(ctx, pods, containers)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	_, err = l.applyUpdates(ctx, updates)
 | 
						|
	if err != nil {
 | 
						|
		// TODO(klihub): we ignore post-sync update failures for now
 | 
						|
		log.G(ctx).WithError(err).Warnf("post-sync update failed")
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (l *local) updateFromPlugin(ctx context.Context, req []*nri.ContainerUpdate) ([]*nri.ContainerUpdate, error) {
 | 
						|
	l.Lock()
 | 
						|
	defer l.Unlock()
 | 
						|
 | 
						|
	log.G(ctx).Trace("Unsolicited NRI container updates")
 | 
						|
 | 
						|
	failed, err := l.applyUpdates(ctx, req)
 | 
						|
	return failed, err
 | 
						|
}
 | 
						|
 | 
						|
func (l *local) applyUpdates(ctx context.Context, updates []*nri.ContainerUpdate) ([]*nri.ContainerUpdate, error) {
 | 
						|
	// TODO(klihub): should we pre-save state and attempt a rollback on failure ?
 | 
						|
	failed, err := domains.updateContainers(ctx, updates)
 | 
						|
	return failed, err
 | 
						|
}
 | 
						|
 | 
						|
func (l *local) evictContainers(ctx context.Context, evict []*nri.ContainerEviction) ([]*nri.ContainerEviction, error) {
 | 
						|
	failed, err := domains.evictContainers(ctx, evict)
 | 
						|
	return failed, err
 | 
						|
}
 | 
						|
 | 
						|
func (l *local) setState(id string, state State) {
 | 
						|
	if state != Removed {
 | 
						|
		l.state[id] = state
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	delete(l.state, id)
 | 
						|
}
 | 
						|
 | 
						|
func (l *local) getState(id string) State {
 | 
						|
	if state, ok := l.state[id]; ok {
 | 
						|
		return state
 | 
						|
	}
 | 
						|
 | 
						|
	return Removed
 | 
						|
}
 | 
						|
 | 
						|
func (l *local) needsStopping(id string) bool {
 | 
						|
	s := l.getState(id)
 | 
						|
	if s == Created || s == Running {
 | 
						|
		return true
 | 
						|
	}
 | 
						|
	return false
 | 
						|
}
 | 
						|
 | 
						|
func (l *local) needsRemoval(id string) bool {
 | 
						|
	s := l.getState(id)
 | 
						|
	if s == Created || s == Running || s == Stopped {
 | 
						|
		return true
 | 
						|
	}
 | 
						|
	return false
 | 
						|
}
 |