From b550526ccd54287754bb3afa323e19cf0b753a8b Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Wed, 28 Dec 2022 19:56:03 -0800 Subject: [PATCH] Use cleanup.Background instead of context.Background for cleanup Use the cleanup context to re-use values from the original context Signed-off-by: Derek McGowan --- metadata/db.go | 13 ++++++----- pkg/unpack/unpacker.go | 11 ++++----- rootfs/diff.go | 17 ++++++-------- runtime/v1/linux/bundle.go | 2 +- runtime/v1/linux/runtime.go | 45 ++++++++++++++++--------------------- runtime/v2/manager.go | 14 +++++++----- runtime/v2/shim.go | 15 +++---------- runtime/v2/shim_load.go | 7 ++++-- 8 files changed, 56 insertions(+), 68 deletions(-) diff --git a/metadata/db.go b/metadata/db.go index 907b45755..60a65b5b4 100644 --- a/metadata/db.go +++ b/metadata/db.go @@ -32,6 +32,7 @@ import ( "github.com/containerd/containerd/gc" "github.com/containerd/containerd/log" "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/pkg/cleanup" "github.com/containerd/containerd/snapshots" bolt "go.etcd.io/bbolt" ) @@ -423,7 +424,7 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) { log.G(ctx).WithField("snapshotter", snapshotterName).Debug("schedule snapshotter cleanup") go func(snapshotterName string) { st1 := time.Now() - m.cleanupSnapshotter(snapshotterName) + m.cleanupSnapshotter(ctx, snapshotterName) sl.Lock() stats.SnapshotD[snapshotterName] = time.Since(st1) @@ -440,7 +441,7 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) { log.G(ctx).Debug("schedule content cleanup") go func() { ct1 := time.Now() - m.cleanupContent() + m.cleanupContent(ctx) stats.ContentD = time.Since(ct1) wg.Done() }() @@ -506,8 +507,8 @@ func (m *DB) getMarked(ctx context.Context, c *gcContext) (map[gc.Node]struct{}, return marked, nil } -func (m *DB) cleanupSnapshotter(name string) (time.Duration, error) { - ctx := context.Background() +func (m *DB) cleanupSnapshotter(ctx context.Context, name string) (time.Duration, error) { + ctx = cleanup.Background(ctx) sn, ok := m.ss[name] if !ok { return 0, nil @@ -523,8 +524,8 @@ func (m *DB) cleanupSnapshotter(name string) (time.Duration, error) { return d, err } -func (m *DB) cleanupContent() (time.Duration, error) { - ctx := context.Background() +func (m *DB) cleanupContent(ctx context.Context) (time.Duration, error) { + ctx = cleanup.Background(ctx) if m.cs == nil { return 0, nil } diff --git a/pkg/unpack/unpacker.go b/pkg/unpack/unpacker.go index b4cba82b9..8d72da5ef 100644 --- a/pkg/unpack/unpacker.go +++ b/pkg/unpack/unpacker.go @@ -35,6 +35,7 @@ import ( "github.com/containerd/containerd/labels" "github.com/containerd/containerd/log" "github.com/containerd/containerd/mount" + "github.com/containerd/containerd/pkg/cleanup" "github.com/containerd/containerd/pkg/kmutex" "github.com/containerd/containerd/platforms" "github.com/containerd/containerd/snapshots" @@ -368,11 +369,11 @@ func (u *Unpacker) unpack( select { case <-ctx.Done(): - abort(context.Background()) // Cleanup context + cleanup.Do(ctx, abort) return ctx.Err() case err := <-fetchErr: if err != nil { - abort(ctx) + cleanup.Do(ctx, abort) return err } case <-fetchC[i-fetchOffset]: @@ -380,16 +381,16 @@ func (u *Unpacker) unpack( diff, err := a.Apply(ctx, desc, mounts, unpack.ApplyOpts...) if err != nil { - abort(ctx) + cleanup.Do(ctx, abort) return fmt.Errorf("failed to extract layer %s: %w", diffIDs[i], err) } if diff.Digest != diffIDs[i] { - abort(ctx) + cleanup.Do(ctx, abort) return fmt.Errorf("wrong diff id calculated on extraction %q", diffIDs[i]) } if err = sn.Commit(ctx, chainID, key, opts...); err != nil { - abort(ctx) + cleanup.Do(ctx, abort) if errdefs.IsAlreadyExists(err) { return nil } diff --git a/rootfs/diff.go b/rootfs/diff.go index 226cebccf..da9dbe275 100644 --- a/rootfs/diff.go +++ b/rootfs/diff.go @@ -22,7 +22,7 @@ import ( "github.com/containerd/containerd/diff" "github.com/containerd/containerd/mount" - "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/pkg/cleanup" "github.com/containerd/containerd/snapshots" ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) @@ -32,13 +32,6 @@ import ( // the content creation and the provided snapshotter and mount differ are used // for calculating the diff. The descriptor for the layer diff is returned. func CreateDiff(ctx context.Context, snapshotID string, sn snapshots.Snapshotter, d diff.Comparer, opts ...diff.Opt) (ocispec.Descriptor, error) { - // dctx is used to handle cleanup things just in case the param ctx - // has been canceled, which causes that the defer cleanup fails. - dctx := context.Background() - if ns, ok := namespaces.Namespace(ctx); ok { - dctx = namespaces.WithNamespace(dctx, ns) - } - info, err := sn.Stat(ctx, snapshotID) if err != nil { return ocispec.Descriptor{}, err @@ -49,7 +42,9 @@ func CreateDiff(ctx context.Context, snapshotID string, sn snapshots.Snapshotter if err != nil { return ocispec.Descriptor{}, err } - defer sn.Remove(dctx, lowerKey) + defer cleanup.Do(ctx, func(ctx context.Context) { + sn.Remove(ctx, lowerKey) + }) var upper []mount.Mount if info.Kind == snapshots.KindActive { @@ -63,7 +58,9 @@ func CreateDiff(ctx context.Context, snapshotID string, sn snapshots.Snapshotter if err != nil { return ocispec.Descriptor{}, err } - defer sn.Remove(dctx, upperKey) + defer cleanup.Do(ctx, func(ctx context.Context) { + sn.Remove(ctx, upperKey) + }) } return d.Compare(ctx, lower, upper, opts...) diff --git a/runtime/v1/linux/bundle.go b/runtime/v1/linux/bundle.go index 0a5d07486..5f492ac9e 100644 --- a/runtime/v1/linux/bundle.go +++ b/runtime/v1/linux/bundle.go @@ -183,7 +183,7 @@ func (b *bundle) Delete() error { if err2 == nil { return err } - return fmt.Errorf("Failed to remove both bundle and workdir locations: %v: %w", err2, err) + return fmt.Errorf("failed to remove both bundle and workdir locations: %v: %w", err2, err) } func (b *bundle) legacyShimAddress(namespace string) string { diff --git a/runtime/v1/linux/runtime.go b/runtime/v1/linux/runtime.go index df7614829..f63d0fe26 100644 --- a/runtime/v1/linux/runtime.go +++ b/runtime/v1/linux/runtime.go @@ -37,6 +37,7 @@ import ( "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/pkg/cleanup" "github.com/containerd/containerd/pkg/process" "github.com/containerd/containerd/platforms" "github.com/containerd/containerd/plugin" @@ -165,6 +166,10 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts if err != nil { return nil, err } + ctx = log.WithLogger(ctx, log.G(ctx).WithError(err).WithFields(logrus.Fields{ + "id": id, + "namespace": namespace, + })) if err := identifiers.Validate(id); err != nil { return nil, fmt.Errorf("invalid task id: %w", err) @@ -206,11 +211,8 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts return } - if err = r.cleanupAfterDeadShim(context.Background(), bundle, namespace, id); err != nil { - log.G(ctx).WithError(err).WithFields(logrus.Fields{ - "id": id, - "namespace": namespace, - }).Warn("failed to clean up after killed shim") + if err = r.cleanupAfterDeadShim(cleanup.Background(ctx), bundle, namespace, id); err != nil { + log.G(ctx).WithError(err).Warn("failed to clean up after killed shim") } } shimopt = ShimRemote(r.config, r.address, cgroup, exitHandler) @@ -222,8 +224,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts } defer func() { if err != nil { - deferCtx, deferCancel := context.WithTimeout( - namespaces.WithNamespace(context.TODO(), namespace), cleanupTimeout) + deferCtx, deferCancel := context.WithTimeout(cleanup.Background(ctx), cleanupTimeout) defer deferCancel() if kerr := s.KillShim(deferCtx); kerr != nil { log.G(ctx).WithError(kerr).Error("failed to kill shim") @@ -359,6 +360,11 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { filepath.Join(r.root, ns, id), ) ctx = namespaces.WithNamespace(ctx, ns) + ctx = log.WithLogger(ctx, log.G(ctx).WithError(err).WithFields(logrus.Fields{ + "id": id, + "namespace": ns, + })) + pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, process.InitPidFile)) shimExit := make(chan struct{}) s, err := bundle.NewShimClient(ctx, ns, ShimConnect(r.config, func() { @@ -374,10 +380,7 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { } }), nil) if err != nil { - log.G(ctx).WithError(err).WithFields(logrus.Fields{ - "id": id, - "namespace": ns, - }).Error("connecting to shim") + log.G(ctx).WithError(err).Error("connecting to shim") err := r.cleanupAfterDeadShim(ctx, bundle, ns, id) if err != nil { log.G(ctx).WithError(err).WithField("bundle", bundle.path). @@ -402,11 +405,8 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { } shimStdoutLog, err := v1.OpenShimStdoutLog(ctx, logDirPath) if err != nil { - log.G(ctx).WithError(err).WithFields(logrus.Fields{ - "id": id, - "namespace": ns, - "logDirPath": logDirPath, - }).Error("opening shim stdout log pipe") + log.G(ctx).WithError(err).WithField("logDirPath", logDirPath). + Error("opening shim stdout log pipe") continue } if r.config.ShimDebug { @@ -417,11 +417,8 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { shimStderrLog, err := v1.OpenShimStderrLog(ctx, logDirPath) if err != nil { - log.G(ctx).WithError(err).WithFields(logrus.Fields{ - "id": id, - "namespace": ns, - "logDirPath": logDirPath, - }).Error("opening shim stderr log pipe") + log.G(ctx).WithError(err).WithField("logDirPath", logDirPath). + Error("opening shim stderr log pipe") continue } if r.config.ShimDebug { @@ -441,13 +438,9 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { } func (r *Runtime) cleanupAfterDeadShim(ctx context.Context, bundle *bundle, ns, id string) error { - log.G(ctx).WithFields(logrus.Fields{ - "id": id, - "namespace": ns, - }).Warn("cleaning up after shim dead") + log.G(ctx).Warn("cleaning up after shim dead") pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, process.InitPidFile)) - ctx = namespaces.WithNamespace(ctx, ns) if err := r.terminate(ctx, bundle, ns, id); err != nil { if r.config.ShimDebug { return fmt.Errorf("failed to terminate task, leaving bundle for debugging: %w", err) diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index fd2717c0d..909023606 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -31,6 +31,7 @@ import ( "github.com/containerd/containerd/log" "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/pkg/cleanup" "github.com/containerd/containerd/pkg/timeout" "github.com/containerd/containerd/platforms" "github.com/containerd/containerd/plugin" @@ -231,7 +232,7 @@ func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateO } defer func() { if retErr != nil { - m.cleanupShim(shim) + m.cleanupShim(ctx, shim) } }() @@ -247,6 +248,7 @@ func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string, if err != nil { return nil, err } + ctx = log.WithLogger(ctx, log.G(ctx).WithField("namespace", ns)) topts := opts.TaskOptions if topts == nil || topts.GetValue() == nil { @@ -267,7 +269,7 @@ func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string, shim, err := b.Start(ctx, protobuf.FromAny(topts), func() { log.G(ctx).WithField("id", id).Info("shim disconnected") - cleanupAfterDeadShim(context.Background(), id, ns, m.shims, m.events, b) + cleanupAfterDeadShim(cleanup.Background(ctx), id, m.shims, m.events, b) // Remove self from the runtime task list. Even though the cleanupAfterDeadShim() // would publish taskExit event, but the shim.Delete() would always failed with ttrpc // disconnect and there is no chance to remove this dead task from runtime task lists. @@ -360,8 +362,8 @@ func (m *ShimManager) resolveRuntimePath(runtime string) (string, error) { } // cleanupShim attempts to properly delete and cleanup shim after error -func (m *ShimManager) cleanupShim(shim *shim) { - dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout) +func (m *ShimManager) cleanupShim(ctx context.Context, shim *shim) { + dctx, cancel := timeout.WithContext(cleanup.Background(ctx), cleanupTimeout) defer cancel() _ = shim.Delete(dctx) @@ -429,14 +431,14 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr // NOTE: ctx contains required namespace information. m.manager.shims.Delete(ctx, taskID) - dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout) + dctx, cancel := timeout.WithContext(cleanup.Background(ctx), cleanupTimeout) defer cancel() sandboxed := opts.SandboxID != "" _, errShim := shimTask.delete(dctx, sandboxed, func(context.Context, string) {}) if errShim != nil { if errdefs.IsDeadlineExceeded(errShim) { - dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout) + dctx, cancel = timeout.WithContext(cleanup.Background(ctx), cleanupTimeout) defer cancel() } diff --git a/runtime/v2/shim.go b/runtime/v2/shim.go index 88c6aa1bf..3f1c84b87 100644 --- a/runtime/v2/shim.go +++ b/runtime/v2/shim.go @@ -32,7 +32,6 @@ import ( "github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/identifiers" "github.com/containerd/containerd/log" - "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/pkg/timeout" "github.com/containerd/containerd/protobuf" ptypes "github.com/containerd/containerd/protobuf/types" @@ -40,7 +39,6 @@ import ( client "github.com/containerd/containerd/runtime/v2/shim" "github.com/containerd/ttrpc" "github.com/hashicorp/go-multierror" - "github.com/sirupsen/logrus" ) const ( @@ -131,21 +129,14 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstan return shim, nil } -func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.NSMap[ShimInstance], events *exchange.Exchange, binaryCall *binary) { - ctx = namespaces.WithNamespace(ctx, ns) +func cleanupAfterDeadShim(ctx context.Context, id string, rt *runtime.NSMap[ShimInstance], events *exchange.Exchange, binaryCall *binary) { ctx, cancel := timeout.WithContext(ctx, cleanupTimeout) defer cancel() - log.G(ctx).WithFields(logrus.Fields{ - "id": id, - "namespace": ns, - }).Warn("cleaning up after shim disconnected") + log.G(ctx).WithField("id", id).Warn("cleaning up after shim disconnected") response, err := binaryCall.Delete(ctx) if err != nil { - log.G(ctx).WithError(err).WithFields(logrus.Fields{ - "id": id, - "namespace": ns, - }).Warn("failed to clean up after shim disconnected") + log.G(ctx).WithError(err).WithField("id", id).Warn("failed to clean up after shim disconnected") } if _, err := rt.Get(ctx, id); err != nil { diff --git a/runtime/v2/shim_load.go b/runtime/v2/shim_load.go index 9047c3eca..b5307ad18 100644 --- a/runtime/v2/shim_load.go +++ b/runtime/v2/shim_load.go @@ -26,6 +26,7 @@ import ( "github.com/containerd/containerd/log" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/pkg/cleanup" ) func (m *ShimManager) loadExistingTasks(ctx context.Context) error { @@ -60,6 +61,8 @@ func (m *ShimManager) loadShims(ctx context.Context) error { if err != nil { return err } + ctx = log.WithLogger(ctx, log.G(ctx).WithField("namespace", ns)) + shimDirs, err := os.ReadDir(filepath.Join(m.state, ns)) if err != nil { return err @@ -133,12 +136,12 @@ func (m *ShimManager) loadShims(ctx context.Context) error { instance, err := loadShim(ctx, bundle, func() { log.G(ctx).WithField("id", id).Info("shim disconnected") - cleanupAfterDeadShim(context.Background(), id, ns, m.shims, m.events, binaryCall) + cleanupAfterDeadShim(cleanup.Background(ctx), id, m.shims, m.events, binaryCall) // Remove self from the runtime task list. m.shims.Delete(ctx, id) }) if err != nil { - cleanupAfterDeadShim(ctx, id, ns, m.shims, m.events, binaryCall) + cleanupAfterDeadShim(ctx, id, m.shims, m.events, binaryCall) continue } shim := newShimTask(instance)