Merge pull request #8188 from dmcgowan/fix-streaming-gc-deadlock
Fix streaming manager deadlock on collection
This commit is contained in:
commit
4ced1fa69e
@ -359,6 +359,7 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) {
|
|||||||
marked, err := m.getMarked(ctx, c) // Pass in gc context
|
marked, err := m.getMarked(ctx, c) // Pass in gc context
|
||||||
if err != nil {
|
if err != nil {
|
||||||
m.wlock.Unlock()
|
m.wlock.Unlock()
|
||||||
|
c.cancel(ctx)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,6 +27,8 @@ import (
|
|||||||
"github.com/containerd/containerd/namespaces"
|
"github.com/containerd/containerd/namespaces"
|
||||||
"github.com/containerd/containerd/pkg/streaming"
|
"github.com/containerd/containerd/pkg/streaming"
|
||||||
"github.com/containerd/containerd/plugin"
|
"github.com/containerd/containerd/plugin"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-multierror"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -118,8 +120,8 @@ func (sm *streamManager) Get(ctx context.Context, name string) (streaming.Stream
|
|||||||
return stream, nil
|
return stream, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *streamManager) StartCollection(context.Context) (metadata.CollectionContext, error) {
|
func (sm *streamManager) StartCollection(ctx context.Context) (metadata.CollectionContext, error) {
|
||||||
// lock now and collection will unlock
|
// lock now and collection will unlock on cancel or finish
|
||||||
sm.rwlock.Lock()
|
sm.rwlock.Lock()
|
||||||
|
|
||||||
return &collectionContext{
|
return &collectionContext{
|
||||||
@ -225,13 +227,13 @@ func (cc *collectionContext) Cancel() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (cc *collectionContext) Finish() error {
|
func (cc *collectionContext) Finish() error {
|
||||||
defer cc.manager.rwlock.Unlock()
|
var closeStreams []streaming.Stream
|
||||||
for _, node := range cc.removed {
|
for _, node := range cc.removed {
|
||||||
var lease string
|
var lease string
|
||||||
if nsMap, ok := cc.manager.streams[node.Namespace]; ok {
|
if nsMap, ok := cc.manager.streams[node.Namespace]; ok {
|
||||||
if ms, ok := nsMap[node.Key]; ok {
|
if ms, ok := nsMap[node.Key]; ok {
|
||||||
delete(nsMap, node.Key)
|
delete(nsMap, node.Key)
|
||||||
ms.Close()
|
closeStreams = append(closeStreams, ms.Stream)
|
||||||
lease = ms.lease
|
lease = ms.lease
|
||||||
}
|
}
|
||||||
if len(nsMap) == 0 {
|
if len(nsMap) == 0 {
|
||||||
@ -252,6 +254,14 @@ func (cc *collectionContext) Finish() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
cc.manager.rwlock.Unlock()
|
||||||
|
|
||||||
return nil
|
var errs *multierror.Error
|
||||||
|
for _, s := range closeStreams {
|
||||||
|
if err := s.Close(); err != nil {
|
||||||
|
errs = multierror.Append(errs, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return errs.ErrorOrNil()
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user