Ensure that lock is released and stream is closed. Signed-off-by: Derek McGowan <derek@mcg.dev>
		
			
				
	
	
		
			268 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			268 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 | 
						|
   Copyright The containerd Authors.
 | 
						|
 | 
						|
   Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
   you may not use this file except in compliance with the License.
 | 
						|
   You may obtain a copy of the License at
 | 
						|
 | 
						|
       http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 | 
						|
   Unless required by applicable law or agreed to in writing, software
 | 
						|
   distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
   See the License for the specific language governing permissions and
 | 
						|
   limitations under the License.
 | 
						|
*/
 | 
						|
 | 
						|
package streaming
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"sync"
 | 
						|
 | 
						|
	"github.com/containerd/containerd/errdefs"
 | 
						|
	"github.com/containerd/containerd/gc"
 | 
						|
	"github.com/containerd/containerd/leases"
 | 
						|
	"github.com/containerd/containerd/metadata"
 | 
						|
	"github.com/containerd/containerd/namespaces"
 | 
						|
	"github.com/containerd/containerd/pkg/streaming"
 | 
						|
	"github.com/containerd/containerd/plugin"
 | 
						|
 | 
						|
	"github.com/hashicorp/go-multierror"
 | 
						|
)
 | 
						|
 | 
						|
func init() {
 | 
						|
	plugin.Register(&plugin.Registration{
 | 
						|
		Type: plugin.StreamingPlugin,
 | 
						|
		ID:   "manager",
 | 
						|
		Requires: []plugin.Type{
 | 
						|
			plugin.MetadataPlugin,
 | 
						|
		},
 | 
						|
		InitFn: func(ic *plugin.InitContext) (interface{}, error) {
 | 
						|
			md, err := ic.Get(plugin.MetadataPlugin)
 | 
						|
			if err != nil {
 | 
						|
				return nil, err
 | 
						|
			}
 | 
						|
 | 
						|
			sm := &streamManager{
 | 
						|
				streams: map[string]map[string]*managedStream{},
 | 
						|
				byLease: map[string]map[string]map[string]struct{}{},
 | 
						|
			}
 | 
						|
			md.(*metadata.DB).RegisterCollectibleResource(metadata.ResourceStream, sm)
 | 
						|
			return sm, nil
 | 
						|
		},
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
type streamManager struct {
 | 
						|
	// streams maps namespace -> name -> stream
 | 
						|
	streams map[string]map[string]*managedStream
 | 
						|
 | 
						|
	byLease map[string]map[string]map[string]struct{}
 | 
						|
 | 
						|
	rwlock sync.RWMutex
 | 
						|
}
 | 
						|
 | 
						|
func (sm *streamManager) Register(ctx context.Context, name string, stream streaming.Stream) error {
 | 
						|
	ns, _ := namespaces.Namespace(ctx)
 | 
						|
	ls, _ := leases.FromContext(ctx)
 | 
						|
 | 
						|
	ms := &managedStream{
 | 
						|
		Stream:  stream,
 | 
						|
		ns:      ns,
 | 
						|
		name:    name,
 | 
						|
		lease:   ls,
 | 
						|
		manager: sm,
 | 
						|
	}
 | 
						|
 | 
						|
	sm.rwlock.Lock()
 | 
						|
	defer sm.rwlock.Unlock()
 | 
						|
	nsMap, ok := sm.streams[ns]
 | 
						|
	if !ok {
 | 
						|
		nsMap = make(map[string]*managedStream)
 | 
						|
		sm.streams[ns] = nsMap
 | 
						|
	}
 | 
						|
	if _, ok := nsMap[name]; ok {
 | 
						|
		return errdefs.ErrAlreadyExists
 | 
						|
	}
 | 
						|
	nsMap[name] = ms
 | 
						|
 | 
						|
	if ls != "" {
 | 
						|
		nsMap, ok := sm.byLease[ns]
 | 
						|
		if !ok {
 | 
						|
			nsMap = make(map[string]map[string]struct{})
 | 
						|
			sm.byLease[ns] = nsMap
 | 
						|
		}
 | 
						|
		lsMap, ok := nsMap[ls]
 | 
						|
		if !ok {
 | 
						|
			lsMap = make(map[string]struct{})
 | 
						|
			nsMap[ls] = lsMap
 | 
						|
		}
 | 
						|
		lsMap[name] = struct{}{}
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (sm *streamManager) Get(ctx context.Context, name string) (streaming.Stream, error) {
 | 
						|
	ns, _ := namespaces.Namespace(ctx)
 | 
						|
	sm.rwlock.RLock()
 | 
						|
	defer sm.rwlock.RUnlock()
 | 
						|
 | 
						|
	nsMap, ok := sm.streams[ns]
 | 
						|
	if !ok {
 | 
						|
		return nil, errdefs.ErrNotFound
 | 
						|
	}
 | 
						|
	stream, ok := nsMap[name]
 | 
						|
	if !ok {
 | 
						|
		return nil, errdefs.ErrNotFound
 | 
						|
	}
 | 
						|
 | 
						|
	return stream, nil
 | 
						|
}
 | 
						|
 | 
						|
func (sm *streamManager) StartCollection(ctx context.Context) (metadata.CollectionContext, error) {
 | 
						|
	// lock now and collection will unlock on cancel or finish
 | 
						|
	sm.rwlock.Lock()
 | 
						|
 | 
						|
	return &collectionContext{
 | 
						|
		manager: sm,
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
func (sm *streamManager) ReferenceLabel() string {
 | 
						|
	return "stream"
 | 
						|
}
 | 
						|
 | 
						|
type managedStream struct {
 | 
						|
	streaming.Stream
 | 
						|
 | 
						|
	ns      string
 | 
						|
	name    string
 | 
						|
	lease   string
 | 
						|
	manager *streamManager
 | 
						|
}
 | 
						|
 | 
						|
func (m *managedStream) Close() error {
 | 
						|
	m.manager.rwlock.Lock()
 | 
						|
	if nsMap, ok := m.manager.streams[m.ns]; ok {
 | 
						|
		delete(nsMap, m.name)
 | 
						|
		if len(nsMap) == 0 {
 | 
						|
			delete(m.manager.streams, m.ns)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	if m.lease != "" {
 | 
						|
		if nsMap, ok := m.manager.byLease[m.ns]; ok {
 | 
						|
			if lsMap, ok := nsMap[m.lease]; ok {
 | 
						|
				delete(lsMap, m.name)
 | 
						|
				if len(lsMap) == 0 {
 | 
						|
					delete(nsMap, m.lease)
 | 
						|
				}
 | 
						|
			}
 | 
						|
			if len(nsMap) == 0 {
 | 
						|
				delete(m.manager.byLease, m.ns)
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	m.manager.rwlock.Unlock()
 | 
						|
	return m.Stream.Close()
 | 
						|
}
 | 
						|
 | 
						|
type collectionContext struct {
 | 
						|
	manager *streamManager
 | 
						|
	removed []gc.Node
 | 
						|
}
 | 
						|
 | 
						|
func (cc *collectionContext) All(fn func(gc.Node)) {
 | 
						|
	for ns, nsMap := range cc.manager.streams {
 | 
						|
		for name := range nsMap {
 | 
						|
			fn(gc.Node{
 | 
						|
				Type:      metadata.ResourceStream,
 | 
						|
				Namespace: ns,
 | 
						|
				Key:       name,
 | 
						|
			})
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
func (cc *collectionContext) Active(ns string, fn func(gc.Node)) {
 | 
						|
	if nsMap, ok := cc.manager.streams[ns]; ok {
 | 
						|
		for name, stream := range nsMap {
 | 
						|
			// Don't consider leased streams as active, the lease
 | 
						|
			// will determine the status
 | 
						|
			// TODO: expire non-active streams
 | 
						|
			if stream.lease == "" {
 | 
						|
				fn(gc.Node{
 | 
						|
					Type:      metadata.ResourceStream,
 | 
						|
					Namespace: ns,
 | 
						|
					Key:       name,
 | 
						|
				})
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (cc *collectionContext) Leased(ns, lease string, fn func(gc.Node)) {
 | 
						|
	if nsMap, ok := cc.manager.byLease[ns]; ok {
 | 
						|
		if lsMap, ok := nsMap[lease]; ok {
 | 
						|
			for name := range lsMap {
 | 
						|
				fn(gc.Node{
 | 
						|
					Type:      metadata.ResourceStream,
 | 
						|
					Namespace: ns,
 | 
						|
					Key:       name,
 | 
						|
				})
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (cc *collectionContext) Remove(n gc.Node) {
 | 
						|
	cc.removed = append(cc.removed, n)
 | 
						|
}
 | 
						|
 | 
						|
func (cc *collectionContext) Cancel() error {
 | 
						|
	cc.manager.rwlock.Unlock()
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (cc *collectionContext) Finish() error {
 | 
						|
	var closeStreams []streaming.Stream
 | 
						|
	for _, node := range cc.removed {
 | 
						|
		var lease string
 | 
						|
		if nsMap, ok := cc.manager.streams[node.Namespace]; ok {
 | 
						|
			if ms, ok := nsMap[node.Key]; ok {
 | 
						|
				delete(nsMap, node.Key)
 | 
						|
				closeStreams = append(closeStreams, ms.Stream)
 | 
						|
				lease = ms.lease
 | 
						|
			}
 | 
						|
			if len(nsMap) == 0 {
 | 
						|
				delete(cc.manager.streams, node.Namespace)
 | 
						|
			}
 | 
						|
		}
 | 
						|
		if lease != "" {
 | 
						|
			if nsMap, ok := cc.manager.byLease[node.Namespace]; ok {
 | 
						|
				if lsMap, ok := nsMap[lease]; ok {
 | 
						|
					delete(lsMap, node.Key)
 | 
						|
					if len(lsMap) == 0 {
 | 
						|
						delete(nsMap, lease)
 | 
						|
					}
 | 
						|
				}
 | 
						|
				if len(nsMap) == 0 {
 | 
						|
					delete(cc.manager.byLease, node.Namespace)
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	cc.manager.rwlock.Unlock()
 | 
						|
 | 
						|
	var errs *multierror.Error
 | 
						|
	for _, s := range closeStreams {
 | 
						|
		if err := s.Close(); err != nil {
 | 
						|
			errs = multierror.Append(errs, err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return errs.ErrorOrNil()
 | 
						|
}
 |