349 lines
9.5 KiB
Go
349 lines
9.5 KiB
Go
/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/
|
|
|
|
package v2
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"os"
|
|
"path/filepath"
|
|
|
|
"github.com/containerd/containerd/containers"
|
|
"github.com/containerd/containerd/errdefs"
|
|
"github.com/containerd/containerd/events/exchange"
|
|
"github.com/containerd/containerd/log"
|
|
"github.com/containerd/containerd/metadata"
|
|
"github.com/containerd/containerd/mount"
|
|
"github.com/containerd/containerd/namespaces"
|
|
"github.com/containerd/containerd/pkg/timeout"
|
|
"github.com/containerd/containerd/platforms"
|
|
"github.com/containerd/containerd/plugin"
|
|
"github.com/containerd/containerd/runtime"
|
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
// Config holds the configuration of the v2 runtime plugin.
type Config struct {
	// Platforms lists the supported platforms, one platform string per
	// entry. It is read from the "platforms" TOML key.
	Platforms []string `toml:"platforms"`
}
|
|
|
|
func init() {
|
|
plugin.Register(&plugin.Registration{
|
|
Type: plugin.RuntimePluginV2,
|
|
ID: "task",
|
|
Requires: []plugin.Type{
|
|
plugin.MetadataPlugin,
|
|
},
|
|
Config: &Config{
|
|
Platforms: defaultPlatforms(),
|
|
},
|
|
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
|
|
supportedPlatforms, err := parsePlatforms(ic.Config.(*Config).Platforms)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ic.Meta.Platforms = supportedPlatforms
|
|
if err := os.MkdirAll(ic.Root, 0711); err != nil {
|
|
return nil, err
|
|
}
|
|
if err := os.MkdirAll(ic.State, 0711); err != nil {
|
|
return nil, err
|
|
}
|
|
m, err := ic.Get(plugin.MetadataPlugin)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cs := metadata.NewContainerStore(m.(*metadata.DB))
|
|
|
|
return New(ic.Context, ic.Root, ic.State, ic.Address, ic.TTRPCAddress, ic.Events, cs)
|
|
},
|
|
})
|
|
}
|
|
|
|
// New task manager for v2 shims
|
|
func New(ctx context.Context, root, state, containerdAddress, containerdTTRPCAddress string, events *exchange.Exchange, cs containers.Store) (*TaskManager, error) {
|
|
for _, d := range []string{root, state} {
|
|
if err := os.MkdirAll(d, 0711); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
m := &TaskManager{
|
|
root: root,
|
|
state: state,
|
|
containerdAddress: containerdAddress,
|
|
containerdTTRPCAddress: containerdTTRPCAddress,
|
|
tasks: runtime.NewTaskList(),
|
|
events: events,
|
|
containers: cs,
|
|
}
|
|
if err := m.loadExistingTasks(ctx); err != nil {
|
|
return nil, err
|
|
}
|
|
return m, nil
|
|
}
|
|
|
|
// TaskManager manages v2 shim's and their tasks
|
|
type TaskManager struct {
|
|
root string
|
|
state string
|
|
containerdAddress string
|
|
containerdTTRPCAddress string
|
|
|
|
tasks *runtime.TaskList
|
|
events *exchange.Exchange
|
|
containers containers.Store
|
|
}
|
|
|
|
// ID of the task manager
|
|
func (m *TaskManager) ID() string {
|
|
return fmt.Sprintf("%s.%s", plugin.RuntimePluginV2, "task")
|
|
}
|
|
|
|
// Create a new task
|
|
func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, retErr error) {
|
|
bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec.Value)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer func() {
|
|
if retErr != nil {
|
|
bundle.Delete()
|
|
}
|
|
}()
|
|
|
|
shim, err := m.startShim(ctx, bundle, id, opts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer func() {
|
|
if retErr != nil {
|
|
m.deleteShim(shim)
|
|
}
|
|
}()
|
|
|
|
t, err := shim.Create(ctx, opts)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to create shim")
|
|
}
|
|
|
|
if err := m.tasks.Add(ctx, t); err != nil {
|
|
return nil, errors.Wrap(err, "failed to add task")
|
|
}
|
|
|
|
return t, nil
|
|
}
|
|
|
|
func (m *TaskManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shim, error) {
|
|
ns, err := namespaces.NamespaceRequired(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
topts := opts.TaskOptions
|
|
if topts == nil {
|
|
topts = opts.RuntimeOptions
|
|
}
|
|
|
|
b := shimBinary(ctx, bundle, opts.Runtime, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks)
|
|
shim, err := b.Start(ctx, topts, func() {
|
|
log.G(ctx).WithField("id", id).Info("shim disconnected")
|
|
|
|
cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, b)
|
|
// Remove self from the runtime task list. Even though the cleanupAfterDeadShim()
|
|
// would publish taskExit event, but the shim.Delete() would always failed with ttrpc
|
|
// disconnect and there is no chance to remove this dead task from runtime task lists.
|
|
// Thus it's better to delete it here.
|
|
m.tasks.Delete(ctx, id)
|
|
})
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "start failed")
|
|
}
|
|
|
|
return shim, nil
|
|
}
|
|
|
|
// deleteShim attempts to properly delete and cleanup shim after error
|
|
func (m *TaskManager) deleteShim(shim *shim) {
|
|
dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout)
|
|
defer cancel()
|
|
|
|
_, errShim := shim.Delete(dctx)
|
|
if errShim != nil {
|
|
if errdefs.IsDeadlineExceeded(errShim) {
|
|
dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout)
|
|
defer cancel()
|
|
}
|
|
shim.Shutdown(dctx)
|
|
shim.Close()
|
|
}
|
|
}
|
|
|
|
// Get a specific task
|
|
func (m *TaskManager) Get(ctx context.Context, id string) (runtime.Task, error) {
|
|
return m.tasks.Get(ctx, id)
|
|
}
|
|
|
|
// Add a runtime task
|
|
func (m *TaskManager) Add(ctx context.Context, task runtime.Task) error {
|
|
return m.tasks.Add(ctx, task)
|
|
}
|
|
|
|
// Delete a runtime task
|
|
func (m *TaskManager) Delete(ctx context.Context, id string) {
|
|
m.tasks.Delete(ctx, id)
|
|
}
|
|
|
|
// Tasks lists all tasks
|
|
func (m *TaskManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) {
|
|
return m.tasks.GetAll(ctx, all)
|
|
}
|
|
|
|
func (m *TaskManager) loadExistingTasks(ctx context.Context) error {
|
|
nsDirs, err := ioutil.ReadDir(m.state)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, nsd := range nsDirs {
|
|
if !nsd.IsDir() {
|
|
continue
|
|
}
|
|
ns := nsd.Name()
|
|
// skip hidden directories
|
|
if len(ns) > 0 && ns[0] == '.' {
|
|
continue
|
|
}
|
|
log.G(ctx).WithField("namespace", ns).Debug("loading tasks in namespace")
|
|
if err := m.loadTasks(namespaces.WithNamespace(ctx, ns)); err != nil {
|
|
log.G(ctx).WithField("namespace", ns).WithError(err).Error("loading tasks in namespace")
|
|
continue
|
|
}
|
|
if err := m.cleanupWorkDirs(namespaces.WithNamespace(ctx, ns)); err != nil {
|
|
log.G(ctx).WithField("namespace", ns).WithError(err).Error("cleanup working directory in namespace")
|
|
continue
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *TaskManager) loadTasks(ctx context.Context) error {
|
|
ns, err := namespaces.NamespaceRequired(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
shimDirs, err := ioutil.ReadDir(filepath.Join(m.state, ns))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, sd := range shimDirs {
|
|
if !sd.IsDir() {
|
|
continue
|
|
}
|
|
id := sd.Name()
|
|
// skip hidden directories
|
|
if len(id) > 0 && id[0] == '.' {
|
|
continue
|
|
}
|
|
bundle, err := LoadBundle(ctx, m.state, id)
|
|
if err != nil {
|
|
// fine to return error here, it is a programmer error if the context
|
|
// does not have a namespace
|
|
return err
|
|
}
|
|
// fast path
|
|
bf, err := ioutil.ReadDir(bundle.Path)
|
|
if err != nil {
|
|
bundle.Delete()
|
|
log.G(ctx).WithError(err).Errorf("fast path read bundle path for %s", bundle.Path)
|
|
continue
|
|
}
|
|
if len(bf) == 0 {
|
|
bundle.Delete()
|
|
continue
|
|
}
|
|
container, err := m.container(ctx, id)
|
|
if err != nil {
|
|
log.G(ctx).WithError(err).Errorf("loading container %s", id)
|
|
if err := mount.UnmountAll(filepath.Join(bundle.Path, "rootfs"), 0); err != nil {
|
|
log.G(ctx).WithError(err).Errorf("forceful unmount of rootfs %s", id)
|
|
}
|
|
bundle.Delete()
|
|
continue
|
|
}
|
|
binaryCall := shimBinary(ctx, bundle, container.Runtime.Name, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks)
|
|
shim, err := loadShim(ctx, bundle, m.events, m.tasks, func() {
|
|
log.G(ctx).WithField("id", id).Info("shim disconnected")
|
|
|
|
cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, binaryCall)
|
|
// Remove self from the runtime task list.
|
|
m.tasks.Delete(ctx, id)
|
|
})
|
|
if err != nil {
|
|
cleanupAfterDeadShim(ctx, id, ns, m.tasks, m.events, binaryCall)
|
|
continue
|
|
}
|
|
m.tasks.Add(ctx, shim)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *TaskManager) container(ctx context.Context, id string) (*containers.Container, error) {
|
|
container, err := m.containers.Get(ctx, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &container, nil
|
|
}
|
|
|
|
func (m *TaskManager) cleanupWorkDirs(ctx context.Context) error {
|
|
ns, err := namespaces.NamespaceRequired(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
dirs, err := ioutil.ReadDir(filepath.Join(m.root, ns))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, d := range dirs {
|
|
// if the task was not loaded, cleanup and empty working directory
|
|
// this can happen on a reboot where /run for the bundle state is cleaned up
|
|
// but that persistent working dir is left
|
|
if _, err := m.tasks.Get(ctx, d.Name()); err != nil {
|
|
path := filepath.Join(m.root, ns, d.Name())
|
|
if err := os.RemoveAll(path); err != nil {
|
|
log.G(ctx).WithError(err).Errorf("cleanup working dir %s", path)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func parsePlatforms(platformStr []string) ([]ocispec.Platform, error) {
|
|
p := make([]ocispec.Platform, len(platformStr))
|
|
for i, v := range platformStr {
|
|
parsed, err := platforms.Parse(v)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
p[i] = parsed
|
|
}
|
|
return p, nil
|
|
}
|