326 lines
		
	
	
		
			8.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			326 lines
		
	
	
		
			8.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
|    Copyright The containerd Authors.
 | |
| 
 | |
|    Licensed under the Apache License, Version 2.0 (the "License");
 | |
|    you may not use this file except in compliance with the License.
 | |
|    You may obtain a copy of the License at
 | |
| 
 | |
|        http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
|    Unless required by applicable law or agreed to in writing, software
 | |
|    distributed under the License is distributed on an "AS IS" BASIS,
 | |
|    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
|    See the License for the specific language governing permissions and
 | |
|    limitations under the License.
 | |
| */
 | |
| 
 | |
| package v2
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"io/ioutil"
 | |
| 	"os"
 | |
| 	"path/filepath"
 | |
| 
 | |
| 	"github.com/containerd/containerd/containers"
 | |
| 	"github.com/containerd/containerd/errdefs"
 | |
| 	"github.com/containerd/containerd/events/exchange"
 | |
| 	"github.com/containerd/containerd/log"
 | |
| 	"github.com/containerd/containerd/metadata"
 | |
| 	"github.com/containerd/containerd/mount"
 | |
| 	"github.com/containerd/containerd/namespaces"
 | |
| 	"github.com/containerd/containerd/pkg/timeout"
 | |
| 	"github.com/containerd/containerd/platforms"
 | |
| 	"github.com/containerd/containerd/plugin"
 | |
| 	"github.com/containerd/containerd/runtime"
 | |
| 	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
 | |
| )
 | |
| 
 | |
// Config holds the settings of the v2 runtime plugin.
type Config struct {
	// Platforms lists the platform specifiers the runtime supports. Each
	// entry is parsed with platforms.Parse when the plugin is initialized.
	Platforms []string `toml:"platforms"`
}
 | |
| 
 | |
| func init() {
 | |
| 	plugin.Register(&plugin.Registration{
 | |
| 		Type: plugin.RuntimePluginV2,
 | |
| 		ID:   "task",
 | |
| 		Requires: []plugin.Type{
 | |
| 			plugin.MetadataPlugin,
 | |
| 		},
 | |
| 		Config: &Config{
 | |
| 			Platforms: defaultPlatforms(),
 | |
| 		},
 | |
| 		InitFn: func(ic *plugin.InitContext) (interface{}, error) {
 | |
| 			supportedPlatforms, err := parsePlatforms(ic.Config.(*Config).Platforms)
 | |
| 			if err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 
 | |
| 			ic.Meta.Platforms = supportedPlatforms
 | |
| 			if err := os.MkdirAll(ic.Root, 0711); err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 			if err := os.MkdirAll(ic.State, 0711); err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 			m, err := ic.Get(plugin.MetadataPlugin)
 | |
| 			if err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 			cs := metadata.NewContainerStore(m.(*metadata.DB))
 | |
| 
 | |
| 			return New(ic.Context, ic.Root, ic.State, ic.Address, ic.TTRPCAddress, ic.Events, cs)
 | |
| 		},
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // New task manager for v2 shims
 | |
| func New(ctx context.Context, root, state, containerdAddress, containerdTTRPCAddress string, events *exchange.Exchange, cs containers.Store) (*TaskManager, error) {
 | |
| 	for _, d := range []string{root, state} {
 | |
| 		if err := os.MkdirAll(d, 0711); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 	m := &TaskManager{
 | |
| 		root:                   root,
 | |
| 		state:                  state,
 | |
| 		containerdAddress:      containerdAddress,
 | |
| 		containerdTTRPCAddress: containerdTTRPCAddress,
 | |
| 		tasks:                  runtime.NewTaskList(),
 | |
| 		events:                 events,
 | |
| 		containers:             cs,
 | |
| 	}
 | |
| 	if err := m.loadExistingTasks(ctx); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return m, nil
 | |
| }
 | |
| 
 | |
| // TaskManager manages v2 shim's and their tasks
 | |
| type TaskManager struct {
 | |
| 	root                   string
 | |
| 	state                  string
 | |
| 	containerdAddress      string
 | |
| 	containerdTTRPCAddress string
 | |
| 
 | |
| 	tasks      *runtime.TaskList
 | |
| 	events     *exchange.Exchange
 | |
| 	containers containers.Store
 | |
| }
 | |
| 
 | |
| // ID of the task manager
 | |
| func (m *TaskManager) ID() string {
 | |
| 	return fmt.Sprintf("%s.%s", plugin.RuntimePluginV2, "task")
 | |
| }
 | |
| 
 | |
| // Create a new task
 | |
| func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, err error) {
 | |
| 	ns, err := namespaces.NamespaceRequired(ctx)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec.Value)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	defer func() {
 | |
| 		if err != nil {
 | |
| 			bundle.Delete()
 | |
| 		}
 | |
| 	}()
 | |
| 	topts := opts.TaskOptions
 | |
| 	if topts == nil {
 | |
| 		topts = opts.RuntimeOptions
 | |
| 	}
 | |
| 
 | |
| 	b := shimBinary(ctx, bundle, opts.Runtime, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks)
 | |
| 	shim, err := b.Start(ctx, topts, func() {
 | |
| 		log.G(ctx).WithField("id", id).Info("shim disconnected")
 | |
| 
 | |
| 		cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, b)
 | |
| 		// Remove self from the runtime task list. Even though the cleanupAfterDeadShim()
 | |
| 		// would publish taskExit event, but the shim.Delete() would always failed with ttrpc
 | |
| 		// disconnect and there is no chance to remove this dead task from runtime task lists.
 | |
| 		// Thus it's better to delete it here.
 | |
| 		m.tasks.Delete(ctx, id)
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	defer func() {
 | |
| 		if err != nil {
 | |
| 			dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout)
 | |
| 			defer cancel()
 | |
| 			_, errShim := shim.Delete(dctx)
 | |
| 			if errShim != nil {
 | |
| 				if errdefs.IsDeadlineExceeded(errShim) {
 | |
| 					dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout)
 | |
| 					defer cancel()
 | |
| 				}
 | |
| 				shim.Shutdown(dctx)
 | |
| 				shim.Close()
 | |
| 			}
 | |
| 		}
 | |
| 	}()
 | |
| 	t, err := shim.Create(ctx, opts)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	m.tasks.Add(ctx, t)
 | |
| 	return t, nil
 | |
| }
 | |
| 
 | |
| // Get a specific task
 | |
| func (m *TaskManager) Get(ctx context.Context, id string) (runtime.Task, error) {
 | |
| 	return m.tasks.Get(ctx, id)
 | |
| }
 | |
| 
 | |
| // Add a runtime task
 | |
| func (m *TaskManager) Add(ctx context.Context, task runtime.Task) error {
 | |
| 	return m.tasks.Add(ctx, task)
 | |
| }
 | |
| 
 | |
| // Delete a runtime task
 | |
| func (m *TaskManager) Delete(ctx context.Context, id string) {
 | |
| 	m.tasks.Delete(ctx, id)
 | |
| }
 | |
| 
 | |
| // Tasks lists all tasks
 | |
| func (m *TaskManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) {
 | |
| 	return m.tasks.GetAll(ctx, all)
 | |
| }
 | |
| 
 | |
| func (m *TaskManager) loadExistingTasks(ctx context.Context) error {
 | |
| 	nsDirs, err := ioutil.ReadDir(m.state)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	for _, nsd := range nsDirs {
 | |
| 		if !nsd.IsDir() {
 | |
| 			continue
 | |
| 		}
 | |
| 		ns := nsd.Name()
 | |
| 		// skip hidden directories
 | |
| 		if len(ns) > 0 && ns[0] == '.' {
 | |
| 			continue
 | |
| 		}
 | |
| 		log.G(ctx).WithField("namespace", ns).Debug("loading tasks in namespace")
 | |
| 		if err := m.loadTasks(namespaces.WithNamespace(ctx, ns)); err != nil {
 | |
| 			log.G(ctx).WithField("namespace", ns).WithError(err).Error("loading tasks in namespace")
 | |
| 			continue
 | |
| 		}
 | |
| 		if err := m.cleanupWorkDirs(namespaces.WithNamespace(ctx, ns)); err != nil {
 | |
| 			log.G(ctx).WithField("namespace", ns).WithError(err).Error("cleanup working directory in namespace")
 | |
| 			continue
 | |
| 		}
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (m *TaskManager) loadTasks(ctx context.Context) error {
 | |
| 	ns, err := namespaces.NamespaceRequired(ctx)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	shimDirs, err := ioutil.ReadDir(filepath.Join(m.state, ns))
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	for _, sd := range shimDirs {
 | |
| 		if !sd.IsDir() {
 | |
| 			continue
 | |
| 		}
 | |
| 		id := sd.Name()
 | |
| 		// skip hidden directories
 | |
| 		if len(id) > 0 && id[0] == '.' {
 | |
| 			continue
 | |
| 		}
 | |
| 		bundle, err := LoadBundle(ctx, m.state, id)
 | |
| 		if err != nil {
 | |
| 			// fine to return error here, it is a programmer error if the context
 | |
| 			// does not have a namespace
 | |
| 			return err
 | |
| 		}
 | |
| 		// fast path
 | |
| 		bf, err := ioutil.ReadDir(bundle.Path)
 | |
| 		if err != nil {
 | |
| 			bundle.Delete()
 | |
| 			log.G(ctx).WithError(err).Errorf("fast path read bundle path for %s", bundle.Path)
 | |
| 			continue
 | |
| 		}
 | |
| 		if len(bf) == 0 {
 | |
| 			bundle.Delete()
 | |
| 			continue
 | |
| 		}
 | |
| 		container, err := m.container(ctx, id)
 | |
| 		if err != nil {
 | |
| 			log.G(ctx).WithError(err).Errorf("loading container %s", id)
 | |
| 			if err := mount.UnmountAll(filepath.Join(bundle.Path, "rootfs"), 0); err != nil {
 | |
| 				log.G(ctx).WithError(err).Errorf("forceful unmount of rootfs %s", id)
 | |
| 			}
 | |
| 			bundle.Delete()
 | |
| 			continue
 | |
| 		}
 | |
| 		binaryCall := shimBinary(ctx, bundle, container.Runtime.Name, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks)
 | |
| 		shim, err := loadShim(ctx, bundle, m.events, m.tasks, func() {
 | |
| 			log.G(ctx).WithField("id", id).Info("shim disconnected")
 | |
| 
 | |
| 			cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, binaryCall)
 | |
| 			// Remove self from the runtime task list.
 | |
| 			m.tasks.Delete(ctx, id)
 | |
| 		})
 | |
| 		if err != nil {
 | |
| 			cleanupAfterDeadShim(ctx, id, ns, m.tasks, m.events, binaryCall)
 | |
| 			continue
 | |
| 		}
 | |
| 		m.tasks.Add(ctx, shim)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (m *TaskManager) container(ctx context.Context, id string) (*containers.Container, error) {
 | |
| 	container, err := m.containers.Get(ctx, id)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return &container, nil
 | |
| }
 | |
| 
 | |
| func (m *TaskManager) cleanupWorkDirs(ctx context.Context) error {
 | |
| 	ns, err := namespaces.NamespaceRequired(ctx)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	dirs, err := ioutil.ReadDir(filepath.Join(m.root, ns))
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	for _, d := range dirs {
 | |
| 		// if the task was not loaded, cleanup and empty working directory
 | |
| 		// this can happen on a reboot where /run for the bundle state is cleaned up
 | |
| 		// but that persistent working dir is left
 | |
| 		if _, err := m.tasks.Get(ctx, d.Name()); err != nil {
 | |
| 			path := filepath.Join(m.root, ns, d.Name())
 | |
| 			if err := os.RemoveAll(path); err != nil {
 | |
| 				log.G(ctx).WithError(err).Errorf("cleanup working dir %s", path)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func parsePlatforms(platformStr []string) ([]ocispec.Platform, error) {
 | |
| 	p := make([]ocispec.Platform, len(platformStr))
 | |
| 	for i, v := range platformStr {
 | |
| 		parsed, err := platforms.Parse(v)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		p[i] = parsed
 | |
| 	}
 | |
| 	return p, nil
 | |
| }
 | 
