Cleanup shim loading

Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
This commit is contained in:
Maksym Pavlenko 2021-10-07 13:53:12 -07:00
parent b554b577b0
commit df8c206a92
2 changed files with 29 additions and 157 deletions

View File

@ -20,14 +20,12 @@ import (
"context" "context"
"fmt" "fmt"
"os" "os"
"path/filepath"
"github.com/containerd/containerd/containers" "github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/containerd/metadata" "github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/timeout" "github.com/containerd/containerd/pkg/timeout"
"github.com/containerd/containerd/platforms" "github.com/containerd/containerd/platforms"
@ -94,6 +92,8 @@ func init() {
ID: "task", ID: "task",
Requires: []plugin.Type{ Requires: []plugin.Type{
plugin.RuntimeShimPlugin, plugin.RuntimeShimPlugin,
plugin.EventPlugin,
plugin.MetadataPlugin,
}, },
InitFn: func(ic *plugin.InitContext) (interface{}, error) { InitFn: func(ic *plugin.InitContext) (interface{}, error) {
m, err := ic.Get(plugin.MetadataPlugin) m, err := ic.Get(plugin.MetadataPlugin)
@ -120,6 +120,10 @@ func init() {
return nil, err return nil, err
} }
if err := shimManager.loadExistingTasks(ic.Context); err != nil {
return nil, err
}
// Internally task manager relies on shim manager to launch task shims. // Internally task manager relies on shim manager to launch task shims.
// It's also possible to use shim manager independently and launch other types of shims. // It's also possible to use shim manager independently and launch other types of shims.
// //
@ -293,131 +297,6 @@ func (m *ShimManager) Delete(ctx context.Context, id string) error {
return err return err
} }
func (m *ShimManager) loadExistingTasks(ctx context.Context) error {
nsDirs, err := os.ReadDir(m.state)
if err != nil {
return err
}
for _, nsd := range nsDirs {
if !nsd.IsDir() {
continue
}
ns := nsd.Name()
// skip hidden directories
if len(ns) > 0 && ns[0] == '.' {
continue
}
log.G(ctx).WithField("namespace", ns).Debug("loading tasks in namespace")
if err := m.loadShims(namespaces.WithNamespace(ctx, ns)); err != nil {
log.G(ctx).WithField("namespace", ns).WithError(err).Error("loading tasks in namespace")
continue
}
if err := m.cleanupWorkDirs(namespaces.WithNamespace(ctx, ns)); err != nil {
log.G(ctx).WithField("namespace", ns).WithError(err).Error("cleanup working directory in namespace")
continue
}
}
return nil
}
func (m *ShimManager) loadShims(ctx context.Context) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
shimDirs, err := os.ReadDir(filepath.Join(m.state, ns))
if err != nil {
return err
}
for _, sd := range shimDirs {
if !sd.IsDir() {
continue
}
id := sd.Name()
// skip hidden directories
if len(id) > 0 && id[0] == '.' {
continue
}
bundle, err := LoadBundle(ctx, m.state, id)
if err != nil {
// fine to return error here, it is a programmer error if the context
// does not have a namespace
return err
}
// fast path
bf, err := os.ReadDir(bundle.Path)
if err != nil {
bundle.Delete()
log.G(ctx).WithError(err).Errorf("fast path read bundle path for %s", bundle.Path)
continue
}
if len(bf) == 0 {
bundle.Delete()
continue
}
container, err := m.container(ctx, id)
if err != nil {
log.G(ctx).WithError(err).Errorf("loading container %s", id)
if err := mount.UnmountAll(filepath.Join(bundle.Path, "rootfs"), 0); err != nil {
log.G(ctx).WithError(err).Errorf("forceful unmount of rootfs %s", id)
}
bundle.Delete()
continue
}
binaryCall := shimBinary(bundle,
shimBinaryConfig{
runtime: container.Runtime.Name,
address: m.containerdAddress,
ttrpcAddress: m.containerdTTRPCAddress,
schedCore: m.schedCore,
})
shim, err := loadShim(ctx, bundle, func() {
log.G(ctx).WithField("id", id).Info("shim disconnected")
cleanupAfterDeadShim(context.Background(), id, ns, m.list, m.events, binaryCall)
// Remove self from the runtime task list.
m.list.Delete(ctx, id)
})
if err != nil {
cleanupAfterDeadShim(ctx, id, ns, m.list, m.events, binaryCall)
continue
}
m.list.Add(ctx, shim)
}
return nil
}
func (m *ShimManager) container(ctx context.Context, id string) (*containers.Container, error) {
container, err := m.containers.Get(ctx, id)
if err != nil {
return nil, err
}
return &container, nil
}
func (m *ShimManager) cleanupWorkDirs(ctx context.Context) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
dirs, err := os.ReadDir(filepath.Join(m.root, ns))
if err != nil {
return err
}
for _, d := range dirs {
// if the task was not loaded, cleanup and empty working directory
// this can happen on a reboot where /run for the bundle state is cleaned up
// but that persistent working dir is left
if _, err := m.list.Get(ctx, d.Name()); err != nil {
path := filepath.Join(m.root, ns, d.Name())
if err := os.RemoveAll(path); err != nil {
log.G(ctx).WithError(err).Errorf("cleanup working dir %s", path)
}
}
}
return nil
}
func parsePlatforms(platformStr []string) ([]ocispec.Platform, error) { func parsePlatforms(platformStr []string) ([]ocispec.Platform, error) {
p := make([]ocispec.Platform, len(platformStr)) p := make([]ocispec.Platform, len(platformStr))
for i, v := range platformStr { for i, v := range platformStr {

View File

@ -21,25 +21,13 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount" "github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/runtime"
) )
func loadExistingTasks(ic *plugin.InitContext, list *runtime.TaskList, events *exchange.Exchange, containers containers.Store) error { func (m *ShimManager) loadExistingTasks(ctx context.Context) error {
var ( nsDirs, err := os.ReadDir(m.state)
ctx = ic.Context
state = ic.State
root = ic.Root
containerdAddress = ic.Address
containerdTTRPCAddress = ic.TTRPCAddress
)
nsDirs, err := os.ReadDir(state)
if err != nil { if err != nil {
return err return err
} }
@ -53,25 +41,24 @@ func loadExistingTasks(ic *plugin.InitContext, list *runtime.TaskList, events *e
continue continue
} }
log.G(ctx).WithField("namespace", ns).Debug("loading tasks in namespace") log.G(ctx).WithField("namespace", ns).Debug("loading tasks in namespace")
if err := loadShims(namespaces.WithNamespace(ctx, ns), state, list, events, containers, containerdAddress, containerdTTRPCAddress); err != nil { if err := m.loadShims(namespaces.WithNamespace(ctx, ns)); err != nil {
log.G(ctx).WithField("namespace", ns).WithError(err).Error("loading tasks in namespace") log.G(ctx).WithField("namespace", ns).WithError(err).Error("loading tasks in namespace")
continue continue
} }
if err := cleanupWorkDirs(namespaces.WithNamespace(ctx, ns), root, list); err != nil { if err := m.cleanupWorkDirs(namespaces.WithNamespace(ctx, ns)); err != nil {
log.G(ctx).WithField("namespace", ns).WithError(err).Error("cleanup working directory in namespace") log.G(ctx).WithField("namespace", ns).WithError(err).Error("cleanup working directory in namespace")
continue continue
} }
} }
return nil return nil
} }
func loadShims(ctx context.Context, state string, list *runtime.TaskList, events *exchange.Exchange, containers containers.Store, containerdAddress string, containerdTTRPCAddress string) error { func (m *ShimManager) loadShims(ctx context.Context) error {
ns, err := namespaces.NamespaceRequired(ctx) ns, err := namespaces.NamespaceRequired(ctx)
if err != nil { if err != nil {
return err return err
} }
shimDirs, err := os.ReadDir(filepath.Join(state, ns)) shimDirs, err := os.ReadDir(filepath.Join(m.state, ns))
if err != nil { if err != nil {
return err return err
} }
@ -84,7 +71,7 @@ func loadShims(ctx context.Context, state string, list *runtime.TaskList, events
if len(id) > 0 && id[0] == '.' { if len(id) > 0 && id[0] == '.' {
continue continue
} }
bundle, err := LoadBundle(ctx, state, id) bundle, err := LoadBundle(ctx, m.state, id)
if err != nil { if err != nil {
// fine to return error here, it is a programmer error if the context // fine to return error here, it is a programmer error if the context
// does not have a namespace // does not have a namespace
@ -101,7 +88,7 @@ func loadShims(ctx context.Context, state string, list *runtime.TaskList, events
bundle.Delete() bundle.Delete()
continue continue
} }
container, err := containers.Get(ctx, id) container, err := m.containers.Get(ctx, id)
if err != nil { if err != nil {
log.G(ctx).WithError(err).Errorf("loading container %s", id) log.G(ctx).WithError(err).Errorf("loading container %s", id)
if err := mount.UnmountAll(filepath.Join(bundle.Path, "rootfs"), 0); err != nil { if err := mount.UnmountAll(filepath.Join(bundle.Path, "rootfs"), 0); err != nil {
@ -110,29 +97,35 @@ func loadShims(ctx context.Context, state string, list *runtime.TaskList, events
bundle.Delete() bundle.Delete()
continue continue
} }
binaryCall := shimBinary(bundle, container.Runtime.Name, containerdAddress, containerdTTRPCAddress) binaryCall := shimBinary(bundle,
shimBinaryConfig{
runtime: container.Runtime.Name,
address: m.containerdAddress,
ttrpcAddress: m.containerdTTRPCAddress,
schedCore: m.schedCore,
})
shim, err := loadShim(ctx, bundle, func() { shim, err := loadShim(ctx, bundle, func() {
log.G(ctx).WithField("id", id).Info("shim disconnected") log.G(ctx).WithField("id", id).Info("shim disconnected")
cleanupAfterDeadShim(context.Background(), id, ns, list, events, binaryCall) cleanupAfterDeadShim(context.Background(), id, ns, m.list, m.events, binaryCall)
// Remove self from the runtime task list. // Remove self from the runtime task list.
list.Delete(ctx, id) m.list.Delete(ctx, id)
}) })
if err != nil { if err != nil {
cleanupAfterDeadShim(ctx, id, ns, list, events, binaryCall) cleanupAfterDeadShim(ctx, id, ns, m.list, m.events, binaryCall)
continue continue
} }
list.Add(ctx, shim) m.list.Add(ctx, shim)
} }
return nil return nil
} }
func cleanupWorkDirs(ctx context.Context, root string, list *runtime.TaskList) error { func (m *ShimManager) cleanupWorkDirs(ctx context.Context) error {
ns, err := namespaces.NamespaceRequired(ctx) ns, err := namespaces.NamespaceRequired(ctx)
if err != nil { if err != nil {
return err return err
} }
dirs, err := os.ReadDir(filepath.Join(root, ns)) dirs, err := os.ReadDir(filepath.Join(m.root, ns))
if err != nil { if err != nil {
return err return err
} }
@ -140,8 +133,8 @@ func cleanupWorkDirs(ctx context.Context, root string, list *runtime.TaskList) e
// if the task was not loaded, cleanup and empty working directory // if the task was not loaded, cleanup and empty working directory
// this can happen on a reboot where /run for the bundle state is cleaned up // this can happen on a reboot where /run for the bundle state is cleaned up
// but that persistent working dir is left // but that persistent working dir is left
if _, err := list.Get(ctx, d.Name()); err != nil { if _, err := m.list.Get(ctx, d.Name()); err != nil {
path := filepath.Join(root, ns, d.Name()) path := filepath.Join(m.root, ns, d.Name())
if err := os.RemoveAll(path); err != nil { if err := os.RemoveAll(path); err != nil {
log.G(ctx).WithError(err).Errorf("cleanup working dir %s", path) log.G(ctx).WithError(err).Errorf("cleanup working dir %s", path)
} }