diff --git a/api/next.pb.txt b/api/next.pb.txt index 592bab979..24edcb541 100644 --- a/api/next.pb.txt +++ b/api/next.pb.txt @@ -4972,6 +4972,36 @@ file { } syntax: "proto3" } +file { + name: "github.com/containerd/containerd/api/services/streaming/v1/streaming.proto" + package: "containerd.services.streaming.v1" + dependency: "gogoproto/gogo.proto" + dependency: "google/protobuf/any.proto" + message_type { + name: "StreamInit" + field { + name: "id" + number: 1 + label: LABEL_OPTIONAL + type: TYPE_STRING + json_name: "id" + } + } + service { + name: "Streaming" + method { + name: "Stream" + input_type: ".google.protobuf.Any" + output_type: ".google.protobuf.Any" + client_streaming: true + server_streaming: true + } + } + options { + go_package: "github.com/containerd/containerd/api/services/streaming/v1;streaming" + } + syntax: "proto3" +} file { name: "github.com/containerd/containerd/api/types/metrics.proto" package: "containerd.types" diff --git a/api/services/streaming/v1/doc.go b/api/services/streaming/v1/doc.go new file mode 100644 index 000000000..04c4362d8 --- /dev/null +++ b/api/services/streaming/v1/doc.go @@ -0,0 +1,17 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package streaming diff --git a/api/services/streaming/v1/streaming.pb.go b/api/services/streaming/v1/streaming.pb.go new file mode 100644 index 000000000..17ba7c3e1 --- /dev/null +++ b/api/services/streaming/v1/streaming.pb.go @@ -0,0 +1,175 @@ +// +//Copyright The containerd Authors. +// +//Licensed under the Apache License, Version 2.0 (the "License"); +//you may not use this file except in compliance with the License. +//You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +//Unless required by applicable law or agreed to in writing, software +//distributed under the License is distributed on an "AS IS" BASIS, +//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +//See the License for the specific language governing permissions and +//limitations under the License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.0 +// protoc v3.20.1 +// source: github.com/containerd/containerd/api/services/streaming/v1/streaming.proto + +package streaming + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + anypb "google.golang.org/protobuf/types/known/anypb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type StreamInit struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ID string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` +} + +func (x *StreamInit) Reset() { + *x = StreamInit{} + if protoimpl.UnsafeEnabled { + mi := &file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StreamInit) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamInit) ProtoMessage() {} + +func (x *StreamInit) ProtoReflect() protoreflect.Message { + mi := &file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamInit.ProtoReflect.Descriptor instead. +func (*StreamInit) Descriptor() ([]byte, []int) { + return file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_rawDescGZIP(), []int{0} +} + +func (x *StreamInit) GetID() string { + if x != nil { + return x.ID + } + return "" +} + +var File_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto protoreflect.FileDescriptor + +var file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_rawDesc = []byte{ + 0x0a, 0x4a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e, + 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, + 0x72, 0x64, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, + 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2f, 0x76, 0x31, 0x2f, 0x73, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x20, 0x63, 0x6f, + 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, + 0x73, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x1a, 0x19, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, + 0x61, 0x6e, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x1c, 0x0a, 0x0a, 0x53, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0x49, 0x6e, 0x69, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x32, 0x45, 0x0a, 0x09, 0x53, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x69, 0x6e, 0x67, 0x12, 0x38, 0x0a, 0x06, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x14, + 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2e, 0x41, 0x6e, 0x79, 0x1a, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x28, 0x01, 0x30, 0x01, 0x42, 0x46, + 0x5a, 0x44, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e, + 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, + 0x72, 0x64, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, + 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2f, 0x76, 0x31, 0x3b, 0x73, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_rawDescOnce sync.Once + file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_rawDescData = file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_rawDesc +) + +func file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_rawDescGZIP() []byte { + file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_rawDescOnce.Do(func() { + file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_rawDescData = protoimpl.X.CompressGZIP(file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_rawDescData) + }) + return file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_rawDescData +} + +var file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_goTypes = []interface{}{ + (*StreamInit)(nil), // 0: containerd.services.streaming.v1.StreamInit + (*anypb.Any)(nil), // 1: google.protobuf.Any +} +var file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_depIdxs = []int32{ + 1, // 0: containerd.services.streaming.v1.Streaming.Stream:input_type -> google.protobuf.Any + 1, // 1: containerd.services.streaming.v1.Streaming.Stream:output_type -> google.protobuf.Any + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_init() } +func file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_init() { + if File_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StreamInit); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_goTypes, + DependencyIndexes: file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_depIdxs, + MessageInfos: file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_msgTypes, + }.Build() + File_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto = out.File + file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_rawDesc = nil + file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_goTypes = nil + file_github_com_containerd_containerd_api_services_streaming_v1_streaming_proto_depIdxs = nil +} diff --git a/api/services/streaming/v1/streaming.proto b/api/services/streaming/v1/streaming.proto new file mode 100644 index 000000000..4c14f2ecf --- /dev/null +++ b/api/services/streaming/v1/streaming.proto @@ -0,0 +1,31 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +syntax = "proto3"; + +package containerd.services.streaming.v1; + +import "google/protobuf/any.proto"; + +option go_package = "github.com/containerd/containerd/api/services/streaming/v1;streaming"; + +service Streaming { + rpc Stream(stream google.protobuf.Any) returns (stream google.protobuf.Any); +} + +message StreamInit { + string id = 1; +} diff --git a/api/services/streaming/v1/streaming_grpc.pb.go b/api/services/streaming/v1/streaming_grpc.pb.go new file mode 100644 index 000000000..a0a0bc59c --- /dev/null +++ b/api/services/streaming/v1/streaming_grpc.pb.go @@ -0,0 +1,138 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.20.1 +// source: github.com/containerd/containerd/api/services/streaming/v1/streaming.proto + +package streaming + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + anypb "google.golang.org/protobuf/types/known/anypb" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// StreamingClient is the client API for Streaming service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type StreamingClient interface { + Stream(ctx context.Context, opts ...grpc.CallOption) (Streaming_StreamClient, error) +} + +type streamingClient struct { + cc grpc.ClientConnInterface +} + +func NewStreamingClient(cc grpc.ClientConnInterface) StreamingClient { + return &streamingClient{cc} +} + +func (c *streamingClient) Stream(ctx context.Context, opts ...grpc.CallOption) (Streaming_StreamClient, error) { + stream, err := c.cc.NewStream(ctx, &Streaming_ServiceDesc.Streams[0], "/containerd.services.streaming.v1.Streaming/Stream", opts...) + if err != nil { + return nil, err + } + x := &streamingStreamClient{stream} + return x, nil +} + +type Streaming_StreamClient interface { + Send(*anypb.Any) error + Recv() (*anypb.Any, error) + grpc.ClientStream +} + +type streamingStreamClient struct { + grpc.ClientStream +} + +func (x *streamingStreamClient) Send(m *anypb.Any) error { + return x.ClientStream.SendMsg(m) +} + +func (x *streamingStreamClient) Recv() (*anypb.Any, error) { + m := new(anypb.Any) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// StreamingServer is the server API for Streaming service. +// All implementations must embed UnimplementedStreamingServer +// for forward compatibility +type StreamingServer interface { + Stream(Streaming_StreamServer) error + mustEmbedUnimplementedStreamingServer() +} + +// UnimplementedStreamingServer must be embedded to have forward compatible implementations. +type UnimplementedStreamingServer struct { +} + +func (UnimplementedStreamingServer) Stream(Streaming_StreamServer) error { + return status.Errorf(codes.Unimplemented, "method Stream not implemented") +} +func (UnimplementedStreamingServer) mustEmbedUnimplementedStreamingServer() {} + +// UnsafeStreamingServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to StreamingServer will +// result in compilation errors. +type UnsafeStreamingServer interface { + mustEmbedUnimplementedStreamingServer() +} + +func RegisterStreamingServer(s grpc.ServiceRegistrar, srv StreamingServer) { + s.RegisterService(&Streaming_ServiceDesc, srv) +} + +func _Streaming_Stream_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(StreamingServer).Stream(&streamingStreamServer{stream}) +} + +type Streaming_StreamServer interface { + Send(*anypb.Any) error + Recv() (*anypb.Any, error) + grpc.ServerStream +} + +type streamingStreamServer struct { + grpc.ServerStream +} + +func (x *streamingStreamServer) Send(m *anypb.Any) error { + return x.ServerStream.SendMsg(m) +} + +func (x *streamingStreamServer) Recv() (*anypb.Any, error) { + m := new(anypb.Any) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// Streaming_ServiceDesc is the grpc.ServiceDesc for Streaming service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Streaming_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "containerd.services.streaming.v1.Streaming", + HandlerType: (*StreamingServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Stream", + Handler: _Streaming_Stream_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "github.com/containerd/containerd/api/services/streaming/v1/streaming.proto", +} diff --git a/cmd/containerd/builtins/builtins.go b/cmd/containerd/builtins/builtins.go index 5fdbfcab7..81e508603 100644 --- a/cmd/containerd/builtins/builtins.go +++ b/cmd/containerd/builtins/builtins.go @@ -24,6 +24,7 @@ import ( _ "github.com/containerd/containerd/leases/plugin" _ "github.com/containerd/containerd/metadata/plugin" _ "github.com/containerd/containerd/pkg/nri/plugin" + _ "github.com/containerd/containerd/plugins/streaming" _ "github.com/containerd/containerd/runtime/restart/monitor" _ "github.com/containerd/containerd/runtime/v2" _ "github.com/containerd/containerd/services/containers" @@ -38,6 +39,7 @@ import ( _ "github.com/containerd/containerd/services/opt" _ "github.com/containerd/containerd/services/sandbox" _ "github.com/containerd/containerd/services/snapshots" + _ "github.com/containerd/containerd/services/streaming" _ "github.com/containerd/containerd/services/tasks" _ "github.com/containerd/containerd/services/version" ) diff --git a/metadata/gc.go b/metadata/gc.go index 288ace95c..87645d6d2 100644 --- a/metadata/gc.go +++ b/metadata/gc.go @@ -46,6 +46,8 @@ const ( ResourceIngest // resourceEnd is the end of specified resource types resourceEnd + // ResourceStream specifies a stream + ResourceStream ) const ( diff --git a/pkg/streaming/streaming.go b/pkg/streaming/streaming.go new file mode 100644 index 000000000..b89821d23 --- /dev/null +++ b/pkg/streaming/streaming.go @@ -0,0 +1,47 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package streaming + +import ( + "context" + + "github.com/containerd/typeurl" +) + +type StreamManager interface { + StreamGetter + Register(context.Context, string, Stream) error +} + +type StreamGetter interface { + Get(context.Context, string) (Stream, error) +} + +type StreamCreator interface { + Create(context.Context, string) (Stream, error) +} + +type Stream interface { + // Send sends the object on the stream + Send(typeurl.Any) error + + // Recv receives an object on the stream + Recv() (typeurl.Any, error) + + // Close closes the stream + Close() error +} diff --git a/plugin/plugin.go b/plugin/plugin.go index d4d3eaea4..1883d97d9 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -78,6 +78,8 @@ const ( EventPlugin Type = "io.containerd.event.v1" // LeasePlugin implements lease manager LeasePlugin Type = "io.containerd.lease.v1" + // Streaming implements a stream manager + StreamingPlugin Type = "io.containerd.streaming.v1" // TracingProcessorPlugin implements a open telemetry span processor TracingProcessorPlugin Type = "io.containerd.tracing.processor.v1" // NRIApiPlugin implements the NRI adaptation interface for containerd. diff --git a/plugins/streaming/manager.go b/plugins/streaming/manager.go new file mode 100644 index 000000000..6d0d52a80 --- /dev/null +++ b/plugins/streaming/manager.go @@ -0,0 +1,257 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package streaming + +import ( + "context" + "sync" + + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/gc" + "github.com/containerd/containerd/leases" + "github.com/containerd/containerd/metadata" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/pkg/streaming" + "github.com/containerd/containerd/plugin" +) + +func init() { + plugin.Register(&plugin.Registration{ + Type: plugin.StreamingPlugin, + ID: "manager", + Requires: []plugin.Type{ + plugin.MetadataPlugin, + }, + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + md, err := ic.Get(plugin.MetadataPlugin) + if err != nil { + return nil, err + } + + sm := &streamManager{ + streams: map[string]map[string]*managedStream{}, + byLease: map[string]map[string]map[string]struct{}{}, + } + md.(*metadata.DB).RegisterCollectibleResource(metadata.ResourceStream, sm) + return sm, nil + }, + }) +} + +type streamManager struct { + // streams maps namespace -> name -> stream + streams map[string]map[string]*managedStream + + byLease map[string]map[string]map[string]struct{} + + rwlock sync.RWMutex +} + +func (sm *streamManager) Register(ctx context.Context, name string, stream streaming.Stream) error { + ns, _ := namespaces.Namespace(ctx) + ls, _ := leases.FromContext(ctx) + + ms := &managedStream{ + Stream: stream, + ns: ns, + name: name, + lease: ls, + manager: sm, + } + + sm.rwlock.Lock() + defer sm.rwlock.Unlock() + nsMap, ok := sm.streams[ns] + if !ok { + nsMap = make(map[string]*managedStream) + sm.streams[ns] = nsMap + } + if _, ok := nsMap[name]; ok { + return errdefs.ErrAlreadyExists + } + nsMap[name] = ms + + if ls != "" { + nsMap, ok := sm.byLease[ns] + if !ok { + nsMap = make(map[string]map[string]struct{}) + sm.byLease[ns] = nsMap + } + lsMap, ok := nsMap[ls] + if !ok { + lsMap = make(map[string]struct{}) + nsMap[ls] = lsMap + } + lsMap[name] = struct{}{} + } + return nil +} + +func (sm *streamManager) Get(ctx context.Context, name string) (streaming.Stream, error) { + ns, _ := namespaces.Namespace(ctx) + sm.rwlock.RLock() + defer sm.rwlock.RUnlock() + + nsMap, ok := sm.streams[ns] + if !ok { + return nil, errdefs.ErrNotFound + } + stream, ok := nsMap[name] + if !ok { + return nil, errdefs.ErrNotFound + } + + return stream, nil +} + +func (sm *streamManager) StartCollection(context.Context) (metadata.CollectionContext, error) { + // lock now and collection will unlock + sm.rwlock.Lock() + + return &collectionContext{ + manager: sm, + }, nil +} + +func (sm *streamManager) ReferenceLabel() string { + return "stream" +} + +type managedStream struct { + streaming.Stream + + ns string + name string + lease string + manager *streamManager +} + +func (m *managedStream) Close() error { + m.manager.rwlock.Lock() + if nsMap, ok := m.manager.streams[m.ns]; ok { + delete(nsMap, m.name) + if len(nsMap) == 0 { + delete(m.manager.streams, m.ns) + } + } + if m.lease != "" { + if nsMap, ok := m.manager.byLease[m.ns]; ok { + if lsMap, ok := nsMap[m.lease]; ok { + delete(lsMap, m.name) + if len(lsMap) == 0 { + delete(nsMap, m.lease) + } + } + if len(nsMap) == 0 { + delete(m.manager.byLease, m.ns) + } + } + } + + m.manager.rwlock.Unlock() + return m.Stream.Close() +} + +type collectionContext struct { + manager *streamManager + removed []gc.Node +} + +func (cc *collectionContext) All(fn func(gc.Node)) { + for ns, nsMap := range cc.manager.streams { + for name := range nsMap { + fn(gc.Node{ + Type: metadata.ResourceStream, + Namespace: ns, + Key: name, + }) + } + } + +} + +func (cc *collectionContext) Active(ns string, fn func(gc.Node)) { + if nsMap, ok := cc.manager.streams[ns]; ok { + for name, stream := range nsMap { + // Don't consider leased streams as active, the lease + // will determine the status + // TODO: expire non-active streams + if stream.lease == "" { + fn(gc.Node{ + Type: metadata.ResourceStream, + Namespace: ns, + Key: name, + }) + } + } + } +} + +func (cc *collectionContext) Leased(ns, lease string, fn func(gc.Node)) { + if nsMap, ok := cc.manager.byLease[ns]; ok { + if lsMap, ok := nsMap[lease]; ok { + for name := range lsMap { + fn(gc.Node{ + Type: metadata.ResourceStream, + Namespace: ns, + Key: name, + }) + } + } + } +} + +func (cc *collectionContext) Remove(n gc.Node) { + cc.removed = append(cc.removed, n) +} + +func (cc *collectionContext) Cancel() error { + cc.manager.rwlock.Unlock() + return nil +} + +func (cc *collectionContext) Finish() error { + defer cc.manager.rwlock.Unlock() + for _, node := range cc.removed { + var lease string + if nsMap, ok := cc.manager.streams[node.Namespace]; ok { + if ms, ok := nsMap[node.Key]; ok { + delete(nsMap, node.Key) + ms.Close() + lease = ms.lease + } + if len(nsMap) == 0 { + delete(cc.manager.streams, node.Namespace) + } + } + if lease != "" { + if nsMap, ok := cc.manager.byLease[node.Namespace]; ok { + if lsMap, ok := nsMap[lease]; ok { + delete(lsMap, node.Key) + if len(lsMap) == 0 { + delete(nsMap, lease) + } + } + if len(nsMap) == 0 { + delete(cc.manager.byLease, node.Namespace) + } + } + } + } + + return nil +} diff --git a/services/services.go b/services/services.go index 5b47d9f68..26c800fc8 100644 --- a/services/services.go +++ b/services/services.go @@ -37,4 +37,6 @@ const ( SandboxStoreService = "sandbox-store-service" // SandboxControllerService is the id of Sandbox's controller service SandboxControllerService = "sandbox-controller-service" + // Streaming service is the id of the streaming service + StreamingService = "streaming-service" ) diff --git a/services/streaming/service.go b/services/streaming/service.go new file mode 100644 index 000000000..ffb0ece61 --- /dev/null +++ b/services/streaming/service.go @@ -0,0 +1,123 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package streaming + +import ( + "io" + + api "github.com/containerd/containerd/api/services/streaming/v1" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/pkg/streaming" + "github.com/containerd/containerd/plugin" + "github.com/containerd/containerd/protobuf" + ptypes "github.com/containerd/containerd/protobuf/types" + "github.com/containerd/typeurl" + "google.golang.org/grpc" +) + +func init() { + plugin.Register(&plugin.Registration{ + Type: plugin.GRPCPlugin, + ID: "streaming", + Requires: []plugin.Type{ + plugin.StreamingPlugin, + }, + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + i, err := ic.GetByID(plugin.StreamingPlugin, "manager") + if err != nil { + return nil, err + } + return &service{manager: i.(streaming.StreamManager)}, nil + }, + }) +} + +type service struct { + manager streaming.StreamManager + api.UnimplementedStreamingServer +} + +func (s *service) Register(server *grpc.Server) error { + api.RegisterStreamingServer(server, s) + return nil +} + +func (s *service) Stream(srv api.Streaming_StreamServer) error { + // TODO: Timeout waiting + a, err := srv.Recv() + if err != nil { + return err + } + var i api.StreamInit + if err := typeurl.UnmarshalTo(a, &i); err != nil { + return err + } + + // TODO: Save this response to avoid marshaling everytime + response, err := typeurl.MarshalAny(&ptypes.Empty{}) + if err != nil { + return err + } + if err := srv.Send(protobuf.FromAny(response)); err != nil { + return err + } + + cc := make(chan struct{}) + ss := &serviceStream{ + s: srv, + cc: cc, + } + log.G(srv.Context()).WithField("stream", i.ID).Debug("registering stream") + if err := s.manager.Register(srv.Context(), i.ID, ss); err != nil { + return err + } + + select { + case <-srv.Context().Done(): + // TODO: Should return error if not cancelled? + case <-cc: + } + + return nil +} + +type serviceStream struct { + s api.Streaming_StreamServer + cc chan struct{} +} + +func (ss *serviceStream) Send(a typeurl.Any) error { + return errdefs.FromGRPC(ss.s.Send(protobuf.FromAny(a))) +} + +func (ss *serviceStream) Recv() (a typeurl.Any, err error) { + a, err = ss.s.Recv() + if err != io.EOF { + err = errdefs.FromGRPC(err) + } + return +} + +func (ss *serviceStream) Close() error { + select { + case <-ss.cc: + default: + close(ss.cc) + } + return nil +}