From df8c206a927986a6953586c556b01f594b951821 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenko Date: Thu, 7 Oct 2021 13:53:12 -0700 Subject: [PATCH] Cleanup shim loading Signed-off-by: Maksym Pavlenko --- runtime/v2/manager.go | 133 ++-------------------------------------- runtime/v2/shim_load.go | 53 +++++++--------- 2 files changed, 29 insertions(+), 157 deletions(-) diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index 7a4796743..76477dc89 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -20,14 +20,12 @@ import ( "context" "fmt" "os" - "path/filepath" "github.com/containerd/containerd/containers" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/log" "github.com/containerd/containerd/metadata" - "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/pkg/timeout" "github.com/containerd/containerd/platforms" @@ -94,6 +92,8 @@ func init() { ID: "task", Requires: []plugin.Type{ plugin.RuntimeShimPlugin, + plugin.EventPlugin, + plugin.MetadataPlugin, }, InitFn: func(ic *plugin.InitContext) (interface{}, error) { m, err := ic.Get(plugin.MetadataPlugin) @@ -120,6 +120,10 @@ func init() { return nil, err } + if err := shimManager.loadExistingTasks(ic.Context); err != nil { + return nil, err + } + // Internally task manager relies on shim manager to launch task shims. // It's also possible to use shim manager independently and launch other types of shims. // @@ -293,131 +297,6 @@ func (m *ShimManager) Delete(ctx context.Context, id string) error { return err } -func (m *ShimManager) loadExistingTasks(ctx context.Context) error { - nsDirs, err := os.ReadDir(m.state) - if err != nil { - return err - } - for _, nsd := range nsDirs { - if !nsd.IsDir() { - continue - } - ns := nsd.Name() - // skip hidden directories - if len(ns) > 0 && ns[0] == '.' { - continue - } - log.G(ctx).WithField("namespace", ns).Debug("loading tasks in namespace") - if err := m.loadShims(namespaces.WithNamespace(ctx, ns)); err != nil { - log.G(ctx).WithField("namespace", ns).WithError(err).Error("loading tasks in namespace") - continue - } - if err := m.cleanupWorkDirs(namespaces.WithNamespace(ctx, ns)); err != nil { - log.G(ctx).WithField("namespace", ns).WithError(err).Error("cleanup working directory in namespace") - continue - } - } - return nil -} - -func (m *ShimManager) loadShims(ctx context.Context) error { - ns, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return err - } - shimDirs, err := os.ReadDir(filepath.Join(m.state, ns)) - if err != nil { - return err - } - for _, sd := range shimDirs { - if !sd.IsDir() { - continue - } - id := sd.Name() - // skip hidden directories - if len(id) > 0 && id[0] == '.' { - continue - } - bundle, err := LoadBundle(ctx, m.state, id) - if err != nil { - // fine to return error here, it is a programmer error if the context - // does not have a namespace - return err - } - // fast path - bf, err := os.ReadDir(bundle.Path) - if err != nil { - bundle.Delete() - log.G(ctx).WithError(err).Errorf("fast path read bundle path for %s", bundle.Path) - continue - } - if len(bf) == 0 { - bundle.Delete() - continue - } - container, err := m.container(ctx, id) - if err != nil { - log.G(ctx).WithError(err).Errorf("loading container %s", id) - if err := mount.UnmountAll(filepath.Join(bundle.Path, "rootfs"), 0); err != nil { - log.G(ctx).WithError(err).Errorf("forceful unmount of rootfs %s", id) - } - bundle.Delete() - continue - } - binaryCall := shimBinary(bundle, - shimBinaryConfig{ - runtime: container.Runtime.Name, - address: m.containerdAddress, - ttrpcAddress: m.containerdTTRPCAddress, - schedCore: m.schedCore, - }) - shim, err := loadShim(ctx, bundle, func() { - log.G(ctx).WithField("id", id).Info("shim disconnected") - - cleanupAfterDeadShim(context.Background(), id, ns, m.list, m.events, binaryCall) - // Remove self from the runtime task list. - m.list.Delete(ctx, id) - }) - if err != nil { - cleanupAfterDeadShim(ctx, id, ns, m.list, m.events, binaryCall) - continue - } - m.list.Add(ctx, shim) - } - return nil -} - -func (m *ShimManager) container(ctx context.Context, id string) (*containers.Container, error) { - container, err := m.containers.Get(ctx, id) - if err != nil { - return nil, err - } - return &container, nil -} - -func (m *ShimManager) cleanupWorkDirs(ctx context.Context) error { - ns, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return err - } - dirs, err := os.ReadDir(filepath.Join(m.root, ns)) - if err != nil { - return err - } - for _, d := range dirs { - // if the task was not loaded, cleanup and empty working directory - // this can happen on a reboot where /run for the bundle state is cleaned up - // but that persistent working dir is left - if _, err := m.list.Get(ctx, d.Name()); err != nil { - path := filepath.Join(m.root, ns, d.Name()) - if err := os.RemoveAll(path); err != nil { - log.G(ctx).WithError(err).Errorf("cleanup working dir %s", path) - } - } - } - return nil -} - func parsePlatforms(platformStr []string) ([]ocispec.Platform, error) { p := make([]ocispec.Platform, len(platformStr)) for i, v := range platformStr { diff --git a/runtime/v2/shim_load.go b/runtime/v2/shim_load.go index 9dcfccf09..8eed0f467 100644 --- a/runtime/v2/shim_load.go +++ b/runtime/v2/shim_load.go @@ -21,25 +21,13 @@ import ( "os" "path/filepath" - "github.com/containerd/containerd/containers" - "github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/log" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" - "github.com/containerd/containerd/plugin" - "github.com/containerd/containerd/runtime" ) -func loadExistingTasks(ic *plugin.InitContext, list *runtime.TaskList, events *exchange.Exchange, containers containers.Store) error { - var ( - ctx = ic.Context - state = ic.State - root = ic.Root - containerdAddress = ic.Address - containerdTTRPCAddress = ic.TTRPCAddress - ) - - nsDirs, err := os.ReadDir(state) +func (m *ShimManager) loadExistingTasks(ctx context.Context) error { + nsDirs, err := os.ReadDir(m.state) if err != nil { return err } @@ -53,25 +41,24 @@ func loadExistingTasks(ic *plugin.InitContext, list *runtime.TaskList, events *e continue } log.G(ctx).WithField("namespace", ns).Debug("loading tasks in namespace") - if err := loadShims(namespaces.WithNamespace(ctx, ns), state, list, events, containers, containerdAddress, containerdTTRPCAddress); err != nil { + if err := m.loadShims(namespaces.WithNamespace(ctx, ns)); err != nil { log.G(ctx).WithField("namespace", ns).WithError(err).Error("loading tasks in namespace") continue } - if err := cleanupWorkDirs(namespaces.WithNamespace(ctx, ns), root, list); err != nil { + if err := m.cleanupWorkDirs(namespaces.WithNamespace(ctx, ns)); err != nil { log.G(ctx).WithField("namespace", ns).WithError(err).Error("cleanup working directory in namespace") continue } } - return nil } -func loadShims(ctx context.Context, state string, list *runtime.TaskList, events *exchange.Exchange, containers containers.Store, containerdAddress string, containerdTTRPCAddress string) error { +func (m *ShimManager) loadShims(ctx context.Context) error { ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return err } - shimDirs, err := os.ReadDir(filepath.Join(state, ns)) + shimDirs, err := os.ReadDir(filepath.Join(m.state, ns)) if err != nil { return err } @@ -84,7 +71,7 @@ func loadShims(ctx context.Context, state string, list *runtime.TaskList, events if len(id) > 0 && id[0] == '.' { continue } - bundle, err := LoadBundle(ctx, state, id) + bundle, err := LoadBundle(ctx, m.state, id) if err != nil { // fine to return error here, it is a programmer error if the context // does not have a namespace @@ -101,7 +88,7 @@ func loadShims(ctx context.Context, state string, list *runtime.TaskList, events bundle.Delete() continue } - container, err := containers.Get(ctx, id) + container, err := m.containers.Get(ctx, id) if err != nil { log.G(ctx).WithError(err).Errorf("loading container %s", id) if err := mount.UnmountAll(filepath.Join(bundle.Path, "rootfs"), 0); err != nil { @@ -110,29 +97,35 @@ func loadShims(ctx context.Context, state string, list *runtime.TaskList, events bundle.Delete() continue } - binaryCall := shimBinary(bundle, container.Runtime.Name, containerdAddress, containerdTTRPCAddress) + binaryCall := shimBinary(bundle, + shimBinaryConfig{ + runtime: container.Runtime.Name, + address: m.containerdAddress, + ttrpcAddress: m.containerdTTRPCAddress, + schedCore: m.schedCore, + }) shim, err := loadShim(ctx, bundle, func() { log.G(ctx).WithField("id", id).Info("shim disconnected") - cleanupAfterDeadShim(context.Background(), id, ns, list, events, binaryCall) + cleanupAfterDeadShim(context.Background(), id, ns, m.list, m.events, binaryCall) // Remove self from the runtime task list. - list.Delete(ctx, id) + m.list.Delete(ctx, id) }) if err != nil { - cleanupAfterDeadShim(ctx, id, ns, list, events, binaryCall) + cleanupAfterDeadShim(ctx, id, ns, m.list, m.events, binaryCall) continue } - list.Add(ctx, shim) + m.list.Add(ctx, shim) } return nil } -func cleanupWorkDirs(ctx context.Context, root string, list *runtime.TaskList) error { +func (m *ShimManager) cleanupWorkDirs(ctx context.Context) error { ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return err } - dirs, err := os.ReadDir(filepath.Join(root, ns)) + dirs, err := os.ReadDir(filepath.Join(m.root, ns)) if err != nil { return err } @@ -140,8 +133,8 @@ func cleanupWorkDirs(ctx context.Context, root string, list *runtime.TaskList) e // if the task was not loaded, cleanup and empty working directory // this can happen on a reboot where /run for the bundle state is cleaned up // but that persistent working dir is left - if _, err := list.Get(ctx, d.Name()); err != nil { - path := filepath.Join(root, ns, d.Name()) + if _, err := m.list.Get(ctx, d.Name()); err != nil { + path := filepath.Join(m.root, ns, d.Name()) if err := os.RemoveAll(path); err != nil { log.G(ctx).WithError(err).Errorf("cleanup working dir %s", path) }