Fix streaming manager deadlock on collection

Ensure that lock is released and stream is closed.

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan 2023-03-01 09:26:44 -08:00
parent 2be87c1a75
commit 5c6e9f83d4
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
2 changed files with 16 additions and 5 deletions

View File

@ -359,6 +359,7 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) {
marked, err := m.getMarked(ctx, c) // Pass in gc context marked, err := m.getMarked(ctx, c) // Pass in gc context
if err != nil { if err != nil {
m.wlock.Unlock() m.wlock.Unlock()
c.cancel(ctx)
return nil, err return nil, err
} }

View File

@ -27,6 +27,8 @@ import (
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/streaming" "github.com/containerd/containerd/pkg/streaming"
"github.com/containerd/containerd/plugin" "github.com/containerd/containerd/plugin"
"github.com/hashicorp/go-multierror"
) )
func init() { func init() {
@ -118,8 +120,8 @@ func (sm *streamManager) Get(ctx context.Context, name string) (streaming.Stream
return stream, nil return stream, nil
} }
func (sm *streamManager) StartCollection(context.Context) (metadata.CollectionContext, error) { func (sm *streamManager) StartCollection(ctx context.Context) (metadata.CollectionContext, error) {
// lock now and collection will unlock // lock now and collection will unlock on cancel or finish
sm.rwlock.Lock() sm.rwlock.Lock()
return &collectionContext{ return &collectionContext{
@ -225,13 +227,13 @@ func (cc *collectionContext) Cancel() error {
} }
func (cc *collectionContext) Finish() error { func (cc *collectionContext) Finish() error {
defer cc.manager.rwlock.Unlock() var closeStreams []streaming.Stream
for _, node := range cc.removed { for _, node := range cc.removed {
var lease string var lease string
if nsMap, ok := cc.manager.streams[node.Namespace]; ok { if nsMap, ok := cc.manager.streams[node.Namespace]; ok {
if ms, ok := nsMap[node.Key]; ok { if ms, ok := nsMap[node.Key]; ok {
delete(nsMap, node.Key) delete(nsMap, node.Key)
ms.Close() closeStreams = append(closeStreams, ms.Stream)
lease = ms.lease lease = ms.lease
} }
if len(nsMap) == 0 { if len(nsMap) == 0 {
@ -252,6 +254,14 @@ func (cc *collectionContext) Finish() error {
} }
} }
} }
cc.manager.rwlock.Unlock()
return nil var errs *multierror.Error
for _, s := range closeStreams {
if err := s.Close(); err != nil {
errs = multierror.Append(errs, err)
}
}
return errs.ErrorOrNil()
} }