Merge pull request #7861 from dmcgowan/cleanup-context

Add cleanup package for context management during cleanup
This commit is contained in:
Fu Wei 2023-01-05 13:18:31 +08:00 committed by GitHub
commit 5fc727224e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 166 additions and 68 deletions

View File

@ -32,6 +32,7 @@ import (
"github.com/containerd/containerd/gc"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/cleanup"
"github.com/containerd/containerd/snapshots"
bolt "go.etcd.io/bbolt"
)
@ -423,7 +424,7 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) {
log.G(ctx).WithField("snapshotter", snapshotterName).Debug("schedule snapshotter cleanup")
go func(snapshotterName string) {
st1 := time.Now()
m.cleanupSnapshotter(snapshotterName)
m.cleanupSnapshotter(ctx, snapshotterName)
sl.Lock()
stats.SnapshotD[snapshotterName] = time.Since(st1)
@ -440,7 +441,7 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) {
log.G(ctx).Debug("schedule content cleanup")
go func() {
ct1 := time.Now()
m.cleanupContent()
m.cleanupContent(ctx)
stats.ContentD = time.Since(ct1)
wg.Done()
}()
@ -506,8 +507,8 @@ func (m *DB) getMarked(ctx context.Context, c *gcContext) (map[gc.Node]struct{},
return marked, nil
}
func (m *DB) cleanupSnapshotter(name string) (time.Duration, error) {
ctx := context.Background()
func (m *DB) cleanupSnapshotter(ctx context.Context, name string) (time.Duration, error) {
ctx = cleanup.Background(ctx)
sn, ok := m.ss[name]
if !ok {
return 0, nil
@ -523,8 +524,8 @@ func (m *DB) cleanupSnapshotter(name string) (time.Duration, error) {
return d, err
}
func (m *DB) cleanupContent() (time.Duration, error) {
ctx := context.Background()
func (m *DB) cleanupContent(ctx context.Context) (time.Duration, error) {
ctx = cleanup.Background(ctx)
if m.cs == nil {
return 0, nil
}

52
pkg/cleanup/context.go Normal file
View File

@ -0,0 +1,52 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package providing utilies to help cleanup
package cleanup
import (
"context"
"time"
)
type clearCancel struct {
context.Context
}
func (cc clearCancel) Deadline() (deadline time.Time, ok bool) {
return
}
func (cc clearCancel) Done() <-chan struct{} {
return nil
}
func (cc clearCancel) Err() error {
return nil
}
// Background creates a new context which clears out the parent errors
func Background(ctx context.Context) context.Context {
return clearCancel{ctx}
}
// Do runs the provided function with a context in which the
// errors are cleared out and will timeout after 10 seconds.
func Do(ctx context.Context, do func(context.Context)) {
ctx, cancel := context.WithTimeout(clearCancel{ctx}, 10*time.Second)
do(ctx)
cancel()
}

View File

@ -0,0 +1,58 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cleanup
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
)
func TestBackground(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
var k struct{}
v := "incontext"
ctx = context.WithValue(ctx, k, v)
assert.Nil(t, contextError(ctx))
assert.Equal(t, ctx.Value(k), v)
cancel()
assert.Error(t, contextError(ctx))
assert.Equal(t, ctx.Value(k), v)
// cleanup context should no longer be canceled
ctx = Background(ctx)
assert.Nil(t, contextError(ctx))
assert.Equal(t, ctx.Value(k), v)
// cleanup contexts can be rewrapped in cancel context
ctx, cancel = context.WithCancel(ctx)
cancel()
assert.Error(t, contextError(ctx))
assert.Equal(t, ctx.Value(k), v)
}
func contextError(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
return nil
}
}

View File

@ -35,6 +35,7 @@ import (
"github.com/containerd/containerd/labels"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/pkg/cleanup"
"github.com/containerd/containerd/pkg/kmutex"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/snapshots"
@ -368,11 +369,11 @@ func (u *Unpacker) unpack(
select {
case <-ctx.Done():
abort(context.Background()) // Cleanup context
cleanup.Do(ctx, abort)
return ctx.Err()
case err := <-fetchErr:
if err != nil {
abort(ctx)
cleanup.Do(ctx, abort)
return err
}
case <-fetchC[i-fetchOffset]:
@ -380,16 +381,16 @@ func (u *Unpacker) unpack(
diff, err := a.Apply(ctx, desc, mounts, unpack.ApplyOpts...)
if err != nil {
abort(ctx)
cleanup.Do(ctx, abort)
return fmt.Errorf("failed to extract layer %s: %w", diffIDs[i], err)
}
if diff.Digest != diffIDs[i] {
abort(ctx)
cleanup.Do(ctx, abort)
return fmt.Errorf("wrong diff id calculated on extraction %q", diffIDs[i])
}
if err = sn.Commit(ctx, chainID, key, opts...); err != nil {
abort(ctx)
cleanup.Do(ctx, abort)
if errdefs.IsAlreadyExists(err) {
return nil
}

View File

@ -22,7 +22,7 @@ import (
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/cleanup"
"github.com/containerd/containerd/snapshots"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
@ -32,13 +32,6 @@ import (
// the content creation and the provided snapshotter and mount differ are used
// for calculating the diff. The descriptor for the layer diff is returned.
func CreateDiff(ctx context.Context, snapshotID string, sn snapshots.Snapshotter, d diff.Comparer, opts ...diff.Opt) (ocispec.Descriptor, error) {
// dctx is used to handle cleanup things just in case the param ctx
// has been canceled, which causes that the defer cleanup fails.
dctx := context.Background()
if ns, ok := namespaces.Namespace(ctx); ok {
dctx = namespaces.WithNamespace(dctx, ns)
}
info, err := sn.Stat(ctx, snapshotID)
if err != nil {
return ocispec.Descriptor{}, err
@ -49,7 +42,9 @@ func CreateDiff(ctx context.Context, snapshotID string, sn snapshots.Snapshotter
if err != nil {
return ocispec.Descriptor{}, err
}
defer sn.Remove(dctx, lowerKey)
defer cleanup.Do(ctx, func(ctx context.Context) {
sn.Remove(ctx, lowerKey)
})
var upper []mount.Mount
if info.Kind == snapshots.KindActive {
@ -63,7 +58,9 @@ func CreateDiff(ctx context.Context, snapshotID string, sn snapshots.Snapshotter
if err != nil {
return ocispec.Descriptor{}, err
}
defer sn.Remove(dctx, upperKey)
defer cleanup.Do(ctx, func(ctx context.Context) {
sn.Remove(ctx, upperKey)
})
}
return d.Compare(ctx, lower, upper, opts...)

View File

@ -183,7 +183,7 @@ func (b *bundle) Delete() error {
if err2 == nil {
return err
}
return fmt.Errorf("Failed to remove both bundle and workdir locations: %v: %w", err2, err)
return fmt.Errorf("failed to remove both bundle and workdir locations: %v: %w", err2, err)
}
func (b *bundle) legacyShimAddress(namespace string) string {

View File

@ -37,6 +37,7 @@ import (
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/cleanup"
"github.com/containerd/containerd/pkg/process"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin"
@ -165,6 +166,10 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
if err != nil {
return nil, err
}
ctx = log.WithLogger(ctx, log.G(ctx).WithError(err).WithFields(logrus.Fields{
"id": id,
"namespace": namespace,
}))
if err := identifiers.Validate(id); err != nil {
return nil, fmt.Errorf("invalid task id: %w", err)
@ -206,11 +211,8 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
return
}
if err = r.cleanupAfterDeadShim(context.Background(), bundle, namespace, id); err != nil {
log.G(ctx).WithError(err).WithFields(logrus.Fields{
"id": id,
"namespace": namespace,
}).Warn("failed to clean up after killed shim")
if err = r.cleanupAfterDeadShim(cleanup.Background(ctx), bundle, namespace, id); err != nil {
log.G(ctx).WithError(err).Warn("failed to clean up after killed shim")
}
}
shimopt = ShimRemote(r.config, r.address, cgroup, exitHandler)
@ -222,8 +224,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
}
defer func() {
if err != nil {
deferCtx, deferCancel := context.WithTimeout(
namespaces.WithNamespace(context.TODO(), namespace), cleanupTimeout)
deferCtx, deferCancel := context.WithTimeout(cleanup.Background(ctx), cleanupTimeout)
defer deferCancel()
if kerr := s.KillShim(deferCtx); kerr != nil {
log.G(ctx).WithError(kerr).Error("failed to kill shim")
@ -359,6 +360,11 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
filepath.Join(r.root, ns, id),
)
ctx = namespaces.WithNamespace(ctx, ns)
ctx = log.WithLogger(ctx, log.G(ctx).WithError(err).WithFields(logrus.Fields{
"id": id,
"namespace": ns,
}))
pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, process.InitPidFile))
shimExit := make(chan struct{})
s, err := bundle.NewShimClient(ctx, ns, ShimConnect(r.config, func() {
@ -374,10 +380,7 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
}
}), nil)
if err != nil {
log.G(ctx).WithError(err).WithFields(logrus.Fields{
"id": id,
"namespace": ns,
}).Error("connecting to shim")
log.G(ctx).WithError(err).Error("connecting to shim")
err := r.cleanupAfterDeadShim(ctx, bundle, ns, id)
if err != nil {
log.G(ctx).WithError(err).WithField("bundle", bundle.path).
@ -402,11 +405,8 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
}
shimStdoutLog, err := v1.OpenShimStdoutLog(ctx, logDirPath)
if err != nil {
log.G(ctx).WithError(err).WithFields(logrus.Fields{
"id": id,
"namespace": ns,
"logDirPath": logDirPath,
}).Error("opening shim stdout log pipe")
log.G(ctx).WithError(err).WithField("logDirPath", logDirPath).
Error("opening shim stdout log pipe")
continue
}
if r.config.ShimDebug {
@ -417,11 +417,8 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
shimStderrLog, err := v1.OpenShimStderrLog(ctx, logDirPath)
if err != nil {
log.G(ctx).WithError(err).WithFields(logrus.Fields{
"id": id,
"namespace": ns,
"logDirPath": logDirPath,
}).Error("opening shim stderr log pipe")
log.G(ctx).WithError(err).WithField("logDirPath", logDirPath).
Error("opening shim stderr log pipe")
continue
}
if r.config.ShimDebug {
@ -441,13 +438,9 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
}
func (r *Runtime) cleanupAfterDeadShim(ctx context.Context, bundle *bundle, ns, id string) error {
log.G(ctx).WithFields(logrus.Fields{
"id": id,
"namespace": ns,
}).Warn("cleaning up after shim dead")
log.G(ctx).Warn("cleaning up after shim dead")
pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, process.InitPidFile))
ctx = namespaces.WithNamespace(ctx, ns)
if err := r.terminate(ctx, bundle, ns, id); err != nil {
if r.config.ShimDebug {
return fmt.Errorf("failed to terminate task, leaving bundle for debugging: %w", err)

View File

@ -31,6 +31,7 @@ import (
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/cleanup"
"github.com/containerd/containerd/pkg/timeout"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin"
@ -231,7 +232,7 @@ func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateO
}
defer func() {
if retErr != nil {
m.cleanupShim(shim)
m.cleanupShim(ctx, shim)
}
}()
@ -247,6 +248,7 @@ func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string,
if err != nil {
return nil, err
}
ctx = log.WithLogger(ctx, log.G(ctx).WithField("namespace", ns))
topts := opts.TaskOptions
if topts == nil || topts.GetValue() == nil {
@ -267,7 +269,7 @@ func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string,
shim, err := b.Start(ctx, protobuf.FromAny(topts), func() {
log.G(ctx).WithField("id", id).Info("shim disconnected")
cleanupAfterDeadShim(context.Background(), id, ns, m.shims, m.events, b)
cleanupAfterDeadShim(cleanup.Background(ctx), id, m.shims, m.events, b)
// Remove self from the runtime task list. Even though the cleanupAfterDeadShim()
// would publish taskExit event, but the shim.Delete() would always failed with ttrpc
// disconnect and there is no chance to remove this dead task from runtime task lists.
@ -360,8 +362,8 @@ func (m *ShimManager) resolveRuntimePath(runtime string) (string, error) {
}
// cleanupShim attempts to properly delete and cleanup shim after error
func (m *ShimManager) cleanupShim(shim *shim) {
dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout)
func (m *ShimManager) cleanupShim(ctx context.Context, shim *shim) {
dctx, cancel := timeout.WithContext(cleanup.Background(ctx), cleanupTimeout)
defer cancel()
_ = shim.Delete(dctx)
@ -429,14 +431,14 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr
// NOTE: ctx contains required namespace information.
m.manager.shims.Delete(ctx, taskID)
dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout)
dctx, cancel := timeout.WithContext(cleanup.Background(ctx), cleanupTimeout)
defer cancel()
sandboxed := opts.SandboxID != ""
_, errShim := shimTask.delete(dctx, sandboxed, func(context.Context, string) {})
if errShim != nil {
if errdefs.IsDeadlineExceeded(errShim) {
dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout)
dctx, cancel = timeout.WithContext(cleanup.Background(ctx), cleanupTimeout)
defer cancel()
}

View File

@ -32,7 +32,6 @@ import (
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/identifiers"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/timeout"
"github.com/containerd/containerd/protobuf"
ptypes "github.com/containerd/containerd/protobuf/types"
@ -40,7 +39,6 @@ import (
client "github.com/containerd/containerd/runtime/v2/shim"
"github.com/containerd/ttrpc"
"github.com/hashicorp/go-multierror"
"github.com/sirupsen/logrus"
)
const (
@ -131,21 +129,14 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstan
return shim, nil
}
func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.NSMap[ShimInstance], events *exchange.Exchange, binaryCall *binary) {
ctx = namespaces.WithNamespace(ctx, ns)
func cleanupAfterDeadShim(ctx context.Context, id string, rt *runtime.NSMap[ShimInstance], events *exchange.Exchange, binaryCall *binary) {
ctx, cancel := timeout.WithContext(ctx, cleanupTimeout)
defer cancel()
log.G(ctx).WithFields(logrus.Fields{
"id": id,
"namespace": ns,
}).Warn("cleaning up after shim disconnected")
log.G(ctx).WithField("id", id).Warn("cleaning up after shim disconnected")
response, err := binaryCall.Delete(ctx)
if err != nil {
log.G(ctx).WithError(err).WithFields(logrus.Fields{
"id": id,
"namespace": ns,
}).Warn("failed to clean up after shim disconnected")
log.G(ctx).WithError(err).WithField("id", id).Warn("failed to clean up after shim disconnected")
}
if _, err := rt.Get(ctx, id); err != nil {

View File

@ -26,6 +26,7 @@ import (
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/cleanup"
)
func (m *ShimManager) loadExistingTasks(ctx context.Context) error {
@ -60,6 +61,8 @@ func (m *ShimManager) loadShims(ctx context.Context) error {
if err != nil {
return err
}
ctx = log.WithLogger(ctx, log.G(ctx).WithField("namespace", ns))
shimDirs, err := os.ReadDir(filepath.Join(m.state, ns))
if err != nil {
return err
@ -133,12 +136,12 @@ func (m *ShimManager) loadShims(ctx context.Context) error {
instance, err := loadShim(ctx, bundle, func() {
log.G(ctx).WithField("id", id).Info("shim disconnected")
cleanupAfterDeadShim(context.Background(), id, ns, m.shims, m.events, binaryCall)
cleanupAfterDeadShim(cleanup.Background(ctx), id, m.shims, m.events, binaryCall)
// Remove self from the runtime task list.
m.shims.Delete(ctx, id)
})
if err != nil {
cleanupAfterDeadShim(ctx, id, ns, m.shims, m.events, binaryCall)
cleanupAfterDeadShim(ctx, id, m.shims, m.events, binaryCall)
continue
}
shim := newShimTask(instance)