Add streaming service

Adds a service capable of streaming Any objects bi-directionally.
This can be used by services to send data, received data, or to
initiate requests from server to client.

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan 2022-04-12 18:44:22 -07:00
parent c469f67a2b
commit dcf5687cab
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
12 changed files with 826 additions and 0 deletions

View File

@ -4972,6 +4972,36 @@ file {
}
syntax: "proto3"
}
file {
name: "github.com/containerd/containerd/api/services/streaming/v1/streaming.proto"
package: "containerd.services.streaming.v1"
dependency: "gogoproto/gogo.proto"
dependency: "google/protobuf/any.proto"
message_type {
name: "StreamInit"
field {
name: "id"
number: 1
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "id"
}
}
service {
name: "Streaming"
method {
name: "Stream"
input_type: ".google.protobuf.Any"
output_type: ".google.protobuf.Any"
client_streaming: true
server_streaming: true
}
}
options {
go_package: "github.com/containerd/containerd/api/services/streaming/v1;streaming"
}
syntax: "proto3"
}
file {
name: "github.com/containerd/containerd/api/types/metrics.proto"
package: "containerd.types"

View File

@ -0,0 +1,17 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package streaming

View File

@ -0,0 +1,175 @@
//
//Copyright The containerd Authors.
//
//Licensed under the Apache License, Version 2.0 (the "License");
//you may not use this file except in compliance with the License.
//You may obtain a copy of the License at
//
//http://www.apache.org/licenses/LICENSE-2.0
//
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//See the License for the specific language governing permissions and
//limitations under the License.
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.0
// protoc v3.20.1
// source: github.com/containerd/containerd/api/services/streaming/v1/streaming.proto
package streaming
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
anypb "google.golang.org/protobuf/types/known/anypb"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type StreamInit struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
ID string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
}
func (x *StreamInit) Reset() {
*x = StreamInit{}
if protoimpl.UnsafeEnabled {
mi := &file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *StreamInit) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*StreamInit) ProtoMessage() {}
func (x *StreamInit) ProtoReflect() protoreflect.Message {
mi := &file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use StreamInit.ProtoReflect.Descriptor instead.
func (*StreamInit) Descriptor() ([]byte, []int) {
return file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_rawDescGZIP(), []int{0}
}
func (x *StreamInit) GetID() string {
if x != nil {
return x.ID
}
return ""
}
var File_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto protoreflect.FileDescriptor
var file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_rawDesc = []byte{
0x0a, 0x4a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e,
0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65,
0x72, 0x64, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f,
0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2f, 0x76, 0x31, 0x2f, 0x73, 0x74, 0x72,
0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x20, 0x63, 0x6f,
0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65,
0x73, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x1a, 0x19,
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f,
0x61, 0x6e, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x1c, 0x0a, 0x0a, 0x53, 0x74, 0x72,
0x65, 0x61, 0x6d, 0x49, 0x6e, 0x69, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x32, 0x45, 0x0a, 0x09, 0x53, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x69, 0x6e, 0x67, 0x12, 0x38, 0x0a, 0x06, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x14,
0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66,
0x2e, 0x41, 0x6e, 0x79, 0x1a, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x28, 0x01, 0x30, 0x01, 0x42, 0x46,
0x5a, 0x44, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e,
0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65,
0x72, 0x64, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f,
0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2f, 0x76, 0x31, 0x3b, 0x73, 0x74, 0x72,
0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_rawDescOnce sync.Once
file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_rawDescData = file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_rawDesc
)
func file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_rawDescGZIP() []byte {
file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_rawDescOnce.Do(func() {
file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_rawDescData = protoimpl.X.CompressGZIP(file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_rawDescData)
})
return file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_rawDescData
}
var file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
var file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_goTypes = []interface{}{
(*StreamInit)(nil), // 0: containerd.services.streaming.v1.StreamInit
(*anypb.Any)(nil), // 1: google.protobuf.Any
}
var file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_depIdxs = []int32{
1, // 0: containerd.services.streaming.v1.Streaming.Stream:input_type -> google.protobuf.Any
1, // 1: containerd.services.streaming.v1.Streaming.Stream:output_type -> google.protobuf.Any
1, // [1:2] is the sub-list for method output_type
0, // [0:1] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_init() }
func file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_init() {
if File_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*StreamInit); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_rawDesc,
NumEnums: 0,
NumMessages: 1,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_goTypes,
DependencyIndexes: file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_depIdxs,
MessageInfos: file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_msgTypes,
}.Build()
File_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto = out.File
file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_rawDesc = nil
file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_goTypes = nil
file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_depIdxs = nil
}

View File

@ -0,0 +1,31 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
syntax = "proto3";
package containerd.services.streaming.v1;
import "google/protobuf/any.proto";
option go_package = "github.com/containerd/containerd/api/services/streaming/v1;streaming";
service Streaming {
rpc Stream(stream google.protobuf.Any) returns (stream google.protobuf.Any);
}
message StreamInit {
string id = 1;
}

View File

@ -0,0 +1,138 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.20.1
// source: github.com/containerd/containerd/api/services/streaming/v1/streaming.proto
package streaming
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
anypb "google.golang.org/protobuf/types/known/anypb"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// StreamingClient is the client API for Streaming service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type StreamingClient interface {
Stream(ctx context.Context, opts ...grpc.CallOption) (Streaming_StreamClient, error)
}
type streamingClient struct {
cc grpc.ClientConnInterface
}
func NewStreamingClient(cc grpc.ClientConnInterface) StreamingClient {
return &streamingClient{cc}
}
func (c *streamingClient) Stream(ctx context.Context, opts ...grpc.CallOption) (Streaming_StreamClient, error) {
stream, err := c.cc.NewStream(ctx, &Streaming_ServiceDesc.Streams[0], "/containerd.services.streaming.v1.Streaming/Stream", opts...)
if err != nil {
return nil, err
}
x := &streamingStreamClient{stream}
return x, nil
}
type Streaming_StreamClient interface {
Send(*anypb.Any) error
Recv() (*anypb.Any, error)
grpc.ClientStream
}
type streamingStreamClient struct {
grpc.ClientStream
}
func (x *streamingStreamClient) Send(m *anypb.Any) error {
return x.ClientStream.SendMsg(m)
}
func (x *streamingStreamClient) Recv() (*anypb.Any, error) {
m := new(anypb.Any)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// StreamingServer is the server API for Streaming service.
// All implementations must embed UnimplementedStreamingServer
// for forward compatibility
type StreamingServer interface {
Stream(Streaming_StreamServer) error
mustEmbedUnimplementedStreamingServer()
}
// UnimplementedStreamingServer must be embedded to have forward compatible implementations.
type UnimplementedStreamingServer struct {
}
func (UnimplementedStreamingServer) Stream(Streaming_StreamServer) error {
return status.Errorf(codes.Unimplemented, "method Stream not implemented")
}
func (UnimplementedStreamingServer) mustEmbedUnimplementedStreamingServer() {}
// UnsafeStreamingServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to StreamingServer will
// result in compilation errors.
type UnsafeStreamingServer interface {
mustEmbedUnimplementedStreamingServer()
}
func RegisterStreamingServer(s grpc.ServiceRegistrar, srv StreamingServer) {
s.RegisterService(&Streaming_ServiceDesc, srv)
}
func _Streaming_Stream_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(StreamingServer).Stream(&streamingStreamServer{stream})
}
type Streaming_StreamServer interface {
Send(*anypb.Any) error
Recv() (*anypb.Any, error)
grpc.ServerStream
}
type streamingStreamServer struct {
grpc.ServerStream
}
func (x *streamingStreamServer) Send(m *anypb.Any) error {
return x.ServerStream.SendMsg(m)
}
func (x *streamingStreamServer) Recv() (*anypb.Any, error) {
m := new(anypb.Any)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// Streaming_ServiceDesc is the grpc.ServiceDesc for Streaming service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Streaming_ServiceDesc = grpc.ServiceDesc{
ServiceName: "containerd.services.streaming.v1.Streaming",
HandlerType: (*StreamingServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "Stream",
Handler: _Streaming_Stream_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "github.com/containerd/containerd/api/services/streaming/v1/streaming.proto",
}

View File

@ -24,6 +24,7 @@ import (
_ "github.com/containerd/containerd/leases/plugin"
_ "github.com/containerd/containerd/metadata/plugin"
_ "github.com/containerd/containerd/pkg/nri/plugin"
_ "github.com/containerd/containerd/plugins/streaming"
_ "github.com/containerd/containerd/runtime/restart/monitor"
_ "github.com/containerd/containerd/runtime/v2"
_ "github.com/containerd/containerd/services/containers"
@ -38,6 +39,7 @@ import (
_ "github.com/containerd/containerd/services/opt"
_ "github.com/containerd/containerd/services/sandbox"
_ "github.com/containerd/containerd/services/snapshots"
_ "github.com/containerd/containerd/services/streaming"
_ "github.com/containerd/containerd/services/tasks"
_ "github.com/containerd/containerd/services/version"
)

View File

@ -46,6 +46,8 @@ const (
ResourceIngest
// resourceEnd is the end of specified resource types
resourceEnd
// ResourceStream specifies a stream
ResourceStream
)
const (

View File

@ -0,0 +1,47 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package streaming
import (
"context"
"github.com/containerd/typeurl"
)
type StreamManager interface {
StreamGetter
Register(context.Context, string, Stream) error
}
type StreamGetter interface {
Get(context.Context, string) (Stream, error)
}
type StreamCreator interface {
Create(context.Context, string) (Stream, error)
}
type Stream interface {
// Send sends the object on the stream
Send(typeurl.Any) error
// Recv receives an object on the stream
Recv() (typeurl.Any, error)
// Close closes the stream
Close() error
}

View File

@ -78,6 +78,8 @@ const (
EventPlugin Type = "io.containerd.event.v1"
// LeasePlugin implements lease manager
LeasePlugin Type = "io.containerd.lease.v1"
// Streaming implements a stream manager
StreamingPlugin Type = "io.containerd.streaming.v1"
// TracingProcessorPlugin implements a open telemetry span processor
TracingProcessorPlugin Type = "io.containerd.tracing.processor.v1"
// NRIApiPlugin implements the NRI adaptation interface for containerd.

View File

@ -0,0 +1,257 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package streaming
import (
"context"
"sync"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/gc"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/streaming"
"github.com/containerd/containerd/plugin"
)
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.StreamingPlugin,
ID: "manager",
Requires: []plugin.Type{
plugin.MetadataPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
md, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
sm := &streamManager{
streams: map[string]map[string]*managedStream{},
byLease: map[string]map[string]map[string]struct{}{},
}
md.(*metadata.DB).RegisterCollectibleResource(metadata.ResourceStream, sm)
return sm, nil
},
})
}
type streamManager struct {
// streams maps namespace -> name -> stream
streams map[string]map[string]*managedStream
byLease map[string]map[string]map[string]struct{}
rwlock sync.RWMutex
}
func (sm *streamManager) Register(ctx context.Context, name string, stream streaming.Stream) error {
ns, _ := namespaces.Namespace(ctx)
ls, _ := leases.FromContext(ctx)
ms := &managedStream{
Stream: stream,
ns: ns,
name: name,
lease: ls,
manager: sm,
}
sm.rwlock.Lock()
defer sm.rwlock.Unlock()
nsMap, ok := sm.streams[ns]
if !ok {
nsMap = make(map[string]*managedStream)
sm.streams[ns] = nsMap
}
if _, ok := nsMap[name]; ok {
return errdefs.ErrAlreadyExists
}
nsMap[name] = ms
if ls != "" {
nsMap, ok := sm.byLease[ns]
if !ok {
nsMap = make(map[string]map[string]struct{})
sm.byLease[ns] = nsMap
}
lsMap, ok := nsMap[ls]
if !ok {
lsMap = make(map[string]struct{})
nsMap[ls] = lsMap
}
lsMap[name] = struct{}{}
}
return nil
}
func (sm *streamManager) Get(ctx context.Context, name string) (streaming.Stream, error) {
ns, _ := namespaces.Namespace(ctx)
sm.rwlock.RLock()
defer sm.rwlock.RUnlock()
nsMap, ok := sm.streams[ns]
if !ok {
return nil, errdefs.ErrNotFound
}
stream, ok := nsMap[name]
if !ok {
return nil, errdefs.ErrNotFound
}
return stream, nil
}
func (sm *streamManager) StartCollection(context.Context) (metadata.CollectionContext, error) {
// lock now and collection will unlock
sm.rwlock.Lock()
return &collectionContext{
manager: sm,
}, nil
}
func (sm *streamManager) ReferenceLabel() string {
return "stream"
}
type managedStream struct {
streaming.Stream
ns string
name string
lease string
manager *streamManager
}
func (m *managedStream) Close() error {
m.manager.rwlock.Lock()
if nsMap, ok := m.manager.streams[m.ns]; ok {
delete(nsMap, m.name)
if len(nsMap) == 0 {
delete(m.manager.streams, m.ns)
}
}
if m.lease != "" {
if nsMap, ok := m.manager.byLease[m.ns]; ok {
if lsMap, ok := nsMap[m.lease]; ok {
delete(lsMap, m.name)
if len(lsMap) == 0 {
delete(nsMap, m.lease)
}
}
if len(nsMap) == 0 {
delete(m.manager.byLease, m.ns)
}
}
}
m.manager.rwlock.Unlock()
return m.Stream.Close()
}
type collectionContext struct {
manager *streamManager
removed []gc.Node
}
func (cc *collectionContext) All(fn func(gc.Node)) {
for ns, nsMap := range cc.manager.streams {
for name := range nsMap {
fn(gc.Node{
Type: metadata.ResourceStream,
Namespace: ns,
Key: name,
})
}
}
}
func (cc *collectionContext) Active(ns string, fn func(gc.Node)) {
if nsMap, ok := cc.manager.streams[ns]; ok {
for name, stream := range nsMap {
// Don't consider leased streams as active, the lease
// will determine the status
// TODO: expire non-active streams
if stream.lease == "" {
fn(gc.Node{
Type: metadata.ResourceStream,
Namespace: ns,
Key: name,
})
}
}
}
}
func (cc *collectionContext) Leased(ns, lease string, fn func(gc.Node)) {
if nsMap, ok := cc.manager.byLease[ns]; ok {
if lsMap, ok := nsMap[lease]; ok {
for name := range lsMap {
fn(gc.Node{
Type: metadata.ResourceStream,
Namespace: ns,
Key: name,
})
}
}
}
}
func (cc *collectionContext) Remove(n gc.Node) {
cc.removed = append(cc.removed, n)
}
func (cc *collectionContext) Cancel() error {
cc.manager.rwlock.Unlock()
return nil
}
func (cc *collectionContext) Finish() error {
defer cc.manager.rwlock.Unlock()
for _, node := range cc.removed {
var lease string
if nsMap, ok := cc.manager.streams[node.Namespace]; ok {
if ms, ok := nsMap[node.Key]; ok {
delete(nsMap, node.Key)
ms.Close()
lease = ms.lease
}
if len(nsMap) == 0 {
delete(cc.manager.streams, node.Namespace)
}
}
if lease != "" {
if nsMap, ok := cc.manager.byLease[node.Namespace]; ok {
if lsMap, ok := nsMap[lease]; ok {
delete(lsMap, node.Key)
if len(lsMap) == 0 {
delete(nsMap, lease)
}
}
if len(nsMap) == 0 {
delete(cc.manager.byLease, node.Namespace)
}
}
}
}
return nil
}

View File

@ -37,4 +37,6 @@ const (
SandboxStoreService = "sandbox-store-service"
// SandboxControllerService is the id of Sandbox's controller service
SandboxControllerService = "sandbox-controller-service"
// Streaming service is the id of the streaming service
StreamingService = "streaming-service"
)

View File

@ -0,0 +1,123 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package streaming
import (
"io"
api "github.com/containerd/containerd/api/services/streaming/v1"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/pkg/streaming"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/protobuf"
ptypes "github.com/containerd/containerd/protobuf/types"
"github.com/containerd/typeurl"
"google.golang.org/grpc"
)
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.GRPCPlugin,
ID: "streaming",
Requires: []plugin.Type{
plugin.StreamingPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
i, err := ic.GetByID(plugin.StreamingPlugin, "manager")
if err != nil {
return nil, err
}
return &service{manager: i.(streaming.StreamManager)}, nil
},
})
}
type service struct {
manager streaming.StreamManager
api.UnimplementedStreamingServer
}
func (s *service) Register(server *grpc.Server) error {
api.RegisterStreamingServer(server, s)
return nil
}
func (s *service) Stream(srv api.Streaming_StreamServer) error {
// TODO: Timeout waiting
a, err := srv.Recv()
if err != nil {
return err
}
var i api.StreamInit
if err := typeurl.UnmarshalTo(a, &i); err != nil {
return err
}
// TODO: Save this response to avoid marshaling everytime
response, err := typeurl.MarshalAny(&ptypes.Empty{})
if err != nil {
return err
}
if err := srv.Send(protobuf.FromAny(response)); err != nil {
return err
}
cc := make(chan struct{})
ss := &serviceStream{
s: srv,
cc: cc,
}
log.G(srv.Context()).WithField("stream", i.ID).Debug("registering stream")
if err := s.manager.Register(srv.Context(), i.ID, ss); err != nil {
return err
}
select {
case <-srv.Context().Done():
// TODO: Should return error if not cancelled?
case <-cc:
}
return nil
}
type serviceStream struct {
s api.Streaming_StreamServer
cc chan struct{}
}
func (ss *serviceStream) Send(a typeurl.Any) error {
return errdefs.FromGRPC(ss.s.Send(protobuf.FromAny(a)))
}
func (ss *serviceStream) Recv() (a typeurl.Any, err error) {
a, err = ss.s.Recv()
if err != io.EOF {
err = errdefs.FromGRPC(err)
}
return
}
func (ss *serviceStream) Close() error {
select {
case <-ss.cc:
default:
close(ss.cc)
}
return nil
}