 dcf5687cab
			
		
	
	dcf5687cab
	
	
	
		
			
			Adds a service capable of streaming Any objects bi-directionally. This can be used by services to send data, received data, or to initiate requests from server to client. Signed-off-by: Derek McGowan <derek@mcg.dev>
		
			
				
	
	
		
			258 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			258 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
|    Copyright The containerd Authors.
 | |
| 
 | |
|    Licensed under the Apache License, Version 2.0 (the "License");
 | |
|    you may not use this file except in compliance with the License.
 | |
|    You may obtain a copy of the License at
 | |
| 
 | |
|        http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
|    Unless required by applicable law or agreed to in writing, software
 | |
|    distributed under the License is distributed on an "AS IS" BASIS,
 | |
|    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
|    See the License for the specific language governing permissions and
 | |
|    limitations under the License.
 | |
| */
 | |
| 
 | |
| package streaming
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"sync"
 | |
| 
 | |
| 	"github.com/containerd/containerd/errdefs"
 | |
| 	"github.com/containerd/containerd/gc"
 | |
| 	"github.com/containerd/containerd/leases"
 | |
| 	"github.com/containerd/containerd/metadata"
 | |
| 	"github.com/containerd/containerd/namespaces"
 | |
| 	"github.com/containerd/containerd/pkg/streaming"
 | |
| 	"github.com/containerd/containerd/plugin"
 | |
| )
 | |
| 
 | |
| func init() {
 | |
| 	plugin.Register(&plugin.Registration{
 | |
| 		Type: plugin.StreamingPlugin,
 | |
| 		ID:   "manager",
 | |
| 		Requires: []plugin.Type{
 | |
| 			plugin.MetadataPlugin,
 | |
| 		},
 | |
| 		InitFn: func(ic *plugin.InitContext) (interface{}, error) {
 | |
| 			md, err := ic.Get(plugin.MetadataPlugin)
 | |
| 			if err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 
 | |
| 			sm := &streamManager{
 | |
| 				streams: map[string]map[string]*managedStream{},
 | |
| 				byLease: map[string]map[string]map[string]struct{}{},
 | |
| 			}
 | |
| 			md.(*metadata.DB).RegisterCollectibleResource(metadata.ResourceStream, sm)
 | |
| 			return sm, nil
 | |
| 		},
 | |
| 	})
 | |
| }
 | |
| 
 | |
| type streamManager struct {
 | |
| 	// streams maps namespace -> name -> stream
 | |
| 	streams map[string]map[string]*managedStream
 | |
| 
 | |
| 	byLease map[string]map[string]map[string]struct{}
 | |
| 
 | |
| 	rwlock sync.RWMutex
 | |
| }
 | |
| 
 | |
| func (sm *streamManager) Register(ctx context.Context, name string, stream streaming.Stream) error {
 | |
| 	ns, _ := namespaces.Namespace(ctx)
 | |
| 	ls, _ := leases.FromContext(ctx)
 | |
| 
 | |
| 	ms := &managedStream{
 | |
| 		Stream:  stream,
 | |
| 		ns:      ns,
 | |
| 		name:    name,
 | |
| 		lease:   ls,
 | |
| 		manager: sm,
 | |
| 	}
 | |
| 
 | |
| 	sm.rwlock.Lock()
 | |
| 	defer sm.rwlock.Unlock()
 | |
| 	nsMap, ok := sm.streams[ns]
 | |
| 	if !ok {
 | |
| 		nsMap = make(map[string]*managedStream)
 | |
| 		sm.streams[ns] = nsMap
 | |
| 	}
 | |
| 	if _, ok := nsMap[name]; ok {
 | |
| 		return errdefs.ErrAlreadyExists
 | |
| 	}
 | |
| 	nsMap[name] = ms
 | |
| 
 | |
| 	if ls != "" {
 | |
| 		nsMap, ok := sm.byLease[ns]
 | |
| 		if !ok {
 | |
| 			nsMap = make(map[string]map[string]struct{})
 | |
| 			sm.byLease[ns] = nsMap
 | |
| 		}
 | |
| 		lsMap, ok := nsMap[ls]
 | |
| 		if !ok {
 | |
| 			lsMap = make(map[string]struct{})
 | |
| 			nsMap[ls] = lsMap
 | |
| 		}
 | |
| 		lsMap[name] = struct{}{}
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (sm *streamManager) Get(ctx context.Context, name string) (streaming.Stream, error) {
 | |
| 	ns, _ := namespaces.Namespace(ctx)
 | |
| 	sm.rwlock.RLock()
 | |
| 	defer sm.rwlock.RUnlock()
 | |
| 
 | |
| 	nsMap, ok := sm.streams[ns]
 | |
| 	if !ok {
 | |
| 		return nil, errdefs.ErrNotFound
 | |
| 	}
 | |
| 	stream, ok := nsMap[name]
 | |
| 	if !ok {
 | |
| 		return nil, errdefs.ErrNotFound
 | |
| 	}
 | |
| 
 | |
| 	return stream, nil
 | |
| }
 | |
| 
 | |
| func (sm *streamManager) StartCollection(context.Context) (metadata.CollectionContext, error) {
 | |
| 	// lock now and collection will unlock
 | |
| 	sm.rwlock.Lock()
 | |
| 
 | |
| 	return &collectionContext{
 | |
| 		manager: sm,
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| func (sm *streamManager) ReferenceLabel() string {
 | |
| 	return "stream"
 | |
| }
 | |
| 
 | |
| type managedStream struct {
 | |
| 	streaming.Stream
 | |
| 
 | |
| 	ns      string
 | |
| 	name    string
 | |
| 	lease   string
 | |
| 	manager *streamManager
 | |
| }
 | |
| 
 | |
| func (m *managedStream) Close() error {
 | |
| 	m.manager.rwlock.Lock()
 | |
| 	if nsMap, ok := m.manager.streams[m.ns]; ok {
 | |
| 		delete(nsMap, m.name)
 | |
| 		if len(nsMap) == 0 {
 | |
| 			delete(m.manager.streams, m.ns)
 | |
| 		}
 | |
| 	}
 | |
| 	if m.lease != "" {
 | |
| 		if nsMap, ok := m.manager.byLease[m.ns]; ok {
 | |
| 			if lsMap, ok := nsMap[m.lease]; ok {
 | |
| 				delete(lsMap, m.name)
 | |
| 				if len(lsMap) == 0 {
 | |
| 					delete(nsMap, m.lease)
 | |
| 				}
 | |
| 			}
 | |
| 			if len(nsMap) == 0 {
 | |
| 				delete(m.manager.byLease, m.ns)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	m.manager.rwlock.Unlock()
 | |
| 	return m.Stream.Close()
 | |
| }
 | |
| 
 | |
| type collectionContext struct {
 | |
| 	manager *streamManager
 | |
| 	removed []gc.Node
 | |
| }
 | |
| 
 | |
| func (cc *collectionContext) All(fn func(gc.Node)) {
 | |
| 	for ns, nsMap := range cc.manager.streams {
 | |
| 		for name := range nsMap {
 | |
| 			fn(gc.Node{
 | |
| 				Type:      metadata.ResourceStream,
 | |
| 				Namespace: ns,
 | |
| 				Key:       name,
 | |
| 			})
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| }
 | |
| 
 | |
| func (cc *collectionContext) Active(ns string, fn func(gc.Node)) {
 | |
| 	if nsMap, ok := cc.manager.streams[ns]; ok {
 | |
| 		for name, stream := range nsMap {
 | |
| 			// Don't consider leased streams as active, the lease
 | |
| 			// will determine the status
 | |
| 			// TODO: expire non-active streams
 | |
| 			if stream.lease == "" {
 | |
| 				fn(gc.Node{
 | |
| 					Type:      metadata.ResourceStream,
 | |
| 					Namespace: ns,
 | |
| 					Key:       name,
 | |
| 				})
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (cc *collectionContext) Leased(ns, lease string, fn func(gc.Node)) {
 | |
| 	if nsMap, ok := cc.manager.byLease[ns]; ok {
 | |
| 		if lsMap, ok := nsMap[lease]; ok {
 | |
| 			for name := range lsMap {
 | |
| 				fn(gc.Node{
 | |
| 					Type:      metadata.ResourceStream,
 | |
| 					Namespace: ns,
 | |
| 					Key:       name,
 | |
| 				})
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (cc *collectionContext) Remove(n gc.Node) {
 | |
| 	cc.removed = append(cc.removed, n)
 | |
| }
 | |
| 
 | |
| func (cc *collectionContext) Cancel() error {
 | |
| 	cc.manager.rwlock.Unlock()
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (cc *collectionContext) Finish() error {
 | |
| 	defer cc.manager.rwlock.Unlock()
 | |
| 	for _, node := range cc.removed {
 | |
| 		var lease string
 | |
| 		if nsMap, ok := cc.manager.streams[node.Namespace]; ok {
 | |
| 			if ms, ok := nsMap[node.Key]; ok {
 | |
| 				delete(nsMap, node.Key)
 | |
| 				ms.Close()
 | |
| 				lease = ms.lease
 | |
| 			}
 | |
| 			if len(nsMap) == 0 {
 | |
| 				delete(cc.manager.streams, node.Namespace)
 | |
| 			}
 | |
| 		}
 | |
| 		if lease != "" {
 | |
| 			if nsMap, ok := cc.manager.byLease[node.Namespace]; ok {
 | |
| 				if lsMap, ok := nsMap[lease]; ok {
 | |
| 					delete(lsMap, node.Key)
 | |
| 					if len(lsMap) == 0 {
 | |
| 						delete(nsMap, lease)
 | |
| 					}
 | |
| 				}
 | |
| 				if len(nsMap) == 0 {
 | |
| 					delete(cc.manager.byLease, node.Namespace)
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 |