From 5c6e9f83d41a61dceb803463851e09bb6e67f78e Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Wed, 1 Mar 2023 09:26:44 -0800 Subject: [PATCH] Fix streaming manager deadlock on collection Ensure that lock is released and stream is closed. Signed-off-by: Derek McGowan --- metadata/db.go | 1 + plugins/streaming/manager.go | 20 +++++++++++++++----- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/metadata/db.go b/metadata/db.go index 02b223440..8241930a9 100644 --- a/metadata/db.go +++ b/metadata/db.go @@ -359,6 +359,7 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) { marked, err := m.getMarked(ctx, c) // Pass in gc context if err != nil { m.wlock.Unlock() + c.cancel(ctx) return nil, err } diff --git a/plugins/streaming/manager.go b/plugins/streaming/manager.go index 6d0d52a80..85a4debac 100644 --- a/plugins/streaming/manager.go +++ b/plugins/streaming/manager.go @@ -27,6 +27,8 @@ import ( "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/pkg/streaming" "github.com/containerd/containerd/plugin" + + "github.com/hashicorp/go-multierror" ) func init() { @@ -118,8 +120,8 @@ func (sm *streamManager) Get(ctx context.Context, name string) (streaming.Stream return stream, nil } -func (sm *streamManager) StartCollection(context.Context) (metadata.CollectionContext, error) { - // lock now and collection will unlock +func (sm *streamManager) StartCollection(ctx context.Context) (metadata.CollectionContext, error) { + // lock now and collection will unlock on cancel or finish sm.rwlock.Lock() return &collectionContext{ @@ -225,13 +227,13 @@ func (cc *collectionContext) Cancel() error { } func (cc *collectionContext) Finish() error { - defer cc.manager.rwlock.Unlock() + var closeStreams []streaming.Stream for _, node := range cc.removed { var lease string if nsMap, ok := cc.manager.streams[node.Namespace]; ok { if ms, ok := nsMap[node.Key]; ok { delete(nsMap, node.Key) - ms.Close() + closeStreams = append(closeStreams, ms.Stream) lease = ms.lease } if len(nsMap) == 0 { @@ -252,6 +254,14 @@ func (cc *collectionContext) Finish() error { } } } + cc.manager.rwlock.Unlock() - return nil + var errs *multierror.Error + for _, s := range closeStreams { + if err := s.Close(); err != nil { + errs = multierror.Append(errs, err) + } + } + + return errs.ErrorOrNil() }