Add integration test package

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan 2022-03-04 18:07:04 -08:00
parent a323535118
commit 84187a847b
6 changed files with 1217 additions and 0 deletions

View File

@ -16,4 +16,11 @@ generators = ["go"]
# This section maps protobuf imports to Go packages. These will become
# `-M` directives in the call to the go protobuf generator.
[packages]
"google/protobuf/any.proto" = "github.com/gogo/protobuf/types"
"proto/status.proto" = "google.golang.org/genproto/googleapis/rpc/status"
[[overrides]]
# enable ttrpc and disable fieldpath and grpc for the shim
prefixes = ["github.com/containerd/ttrpc/integration/streaming"]
generators = ["go", "go-ttrpc"]
plugins = ["ttrpc"]

View File

@ -0,0 +1,17 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package streaming

View File

@ -0,0 +1,355 @@
//
//Copyright The containerd Authors.
//
//Licensed under the Apache License, Version 2.0 (the "License");
//you may not use this file except in compliance with the License.
//You may obtain a copy of the License at
//
//http://www.apache.org/licenses/LICENSE-2.0
//
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//See the License for the specific language governing permissions and
//limitations under the License.
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.27.1
// protoc v3.11.4
// source: github.com/containerd/ttrpc/integration/streaming/test.proto
package streaming
import (
empty "github.com/golang/protobuf/ptypes/empty"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type EchoPayload struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Seq uint32 `protobuf:"varint,1,opt,name=seq,proto3" json:"seq,omitempty"`
Msg string `protobuf:"bytes,2,opt,name=msg,proto3" json:"msg,omitempty"`
}
func (x *EchoPayload) Reset() {
*x = EchoPayload{}
if protoimpl.UnsafeEnabled {
mi := &file_github_com_containerd_ttrpc_integration_streaming_test_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *EchoPayload) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*EchoPayload) ProtoMessage() {}
func (x *EchoPayload) ProtoReflect() protoreflect.Message {
mi := &file_github_com_containerd_ttrpc_integration_streaming_test_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use EchoPayload.ProtoReflect.Descriptor instead.
func (*EchoPayload) Descriptor() ([]byte, []int) {
return file_github_com_containerd_ttrpc_integration_streaming_test_proto_rawDescGZIP(), []int{0}
}
func (x *EchoPayload) GetSeq() uint32 {
if x != nil {
return x.Seq
}
return 0
}
func (x *EchoPayload) GetMsg() string {
if x != nil {
return x.Msg
}
return ""
}
type Part struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Add int32 `protobuf:"varint,1,opt,name=add,proto3" json:"add,omitempty"`
}
func (x *Part) Reset() {
*x = Part{}
if protoimpl.UnsafeEnabled {
mi := &file_github_com_containerd_ttrpc_integration_streaming_test_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Part) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Part) ProtoMessage() {}
func (x *Part) ProtoReflect() protoreflect.Message {
mi := &file_github_com_containerd_ttrpc_integration_streaming_test_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Part.ProtoReflect.Descriptor instead.
func (*Part) Descriptor() ([]byte, []int) {
return file_github_com_containerd_ttrpc_integration_streaming_test_proto_rawDescGZIP(), []int{1}
}
func (x *Part) GetAdd() int32 {
if x != nil {
return x.Add
}
return 0
}
type Sum struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Sum int32 `protobuf:"varint,1,opt,name=sum,proto3" json:"sum,omitempty"`
Num int32 `protobuf:"varint,2,opt,name=num,proto3" json:"num,omitempty"`
}
func (x *Sum) Reset() {
*x = Sum{}
if protoimpl.UnsafeEnabled {
mi := &file_github_com_containerd_ttrpc_integration_streaming_test_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Sum) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Sum) ProtoMessage() {}
func (x *Sum) ProtoReflect() protoreflect.Message {
mi := &file_github_com_containerd_ttrpc_integration_streaming_test_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Sum.ProtoReflect.Descriptor instead.
func (*Sum) Descriptor() ([]byte, []int) {
return file_github_com_containerd_ttrpc_integration_streaming_test_proto_rawDescGZIP(), []int{2}
}
func (x *Sum) GetSum() int32 {
if x != nil {
return x.Sum
}
return 0
}
func (x *Sum) GetNum() int32 {
if x != nil {
return x.Num
}
return 0
}
var File_github_com_containerd_ttrpc_integration_streaming_test_proto protoreflect.FileDescriptor
var file_github_com_containerd_ttrpc_integration_streaming_test_proto_rawDesc = []byte{
0x0a, 0x3c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e,
0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2f, 0x74, 0x74, 0x72, 0x70, 0x63, 0x2f, 0x69, 0x6e,
0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d,
0x69, 0x6e, 0x67, 0x2f, 0x74, 0x65, 0x73, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x1b,
0x74, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f,
0x6e, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x1a, 0x1b, 0x67, 0x6f, 0x6f,
0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70,
0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x31, 0x0a, 0x0b, 0x45, 0x63, 0x68, 0x6f,
0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x10, 0x0a, 0x03, 0x73, 0x65, 0x71, 0x18, 0x01,
0x20, 0x01, 0x28, 0x0d, 0x52, 0x03, 0x73, 0x65, 0x71, 0x12, 0x10, 0x0a, 0x03, 0x6d, 0x73, 0x67,
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6d, 0x73, 0x67, 0x22, 0x18, 0x0a, 0x04, 0x50,
0x61, 0x72, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x61, 0x64, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05,
0x52, 0x03, 0x61, 0x64, 0x64, 0x22, 0x29, 0x0a, 0x03, 0x53, 0x75, 0x6d, 0x12, 0x10, 0x0a, 0x03,
0x73, 0x75, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x73, 0x75, 0x6d, 0x12, 0x10,
0x0a, 0x03, 0x6e, 0x75, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x6e, 0x75, 0x6d,
0x32, 0xa0, 0x04, 0x0a, 0x09, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x12, 0x5a,
0x0a, 0x04, 0x45, 0x63, 0x68, 0x6f, 0x12, 0x28, 0x2e, 0x74, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x69,
0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x45, 0x63, 0x68, 0x6f, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64,
0x1a, 0x28, 0x2e, 0x74, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61,
0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x45,
0x63, 0x68, 0x6f, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x64, 0x0a, 0x0a, 0x45, 0x63,
0x68, 0x6f, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x28, 0x2e, 0x74, 0x74, 0x72, 0x70, 0x63,
0x2e, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x73, 0x74, 0x72,
0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x45, 0x63, 0x68, 0x6f, 0x50, 0x61, 0x79, 0x6c, 0x6f,
0x61, 0x64, 0x1a, 0x28, 0x2e, 0x74, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x67,
0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67,
0x2e, 0x45, 0x63, 0x68, 0x6f, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x28, 0x01, 0x30, 0x01,
0x12, 0x52, 0x0a, 0x09, 0x53, 0x75, 0x6d, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x21, 0x2e,
0x74, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f,
0x6e, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x50, 0x61, 0x72, 0x74,
0x1a, 0x20, 0x2e, 0x74, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61,
0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x53,
0x75, 0x6d, 0x28, 0x01, 0x12, 0x55, 0x0a, 0x0c, 0x44, 0x69, 0x76, 0x69, 0x64, 0x65, 0x53, 0x74,
0x72, 0x65, 0x61, 0x6d, 0x12, 0x20, 0x2e, 0x74, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x69, 0x6e, 0x74,
0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69,
0x6e, 0x67, 0x2e, 0x53, 0x75, 0x6d, 0x1a, 0x21, 0x2e, 0x74, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x69,
0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x30, 0x01, 0x12, 0x4e, 0x0a, 0x08, 0x45,
0x63, 0x68, 0x6f, 0x4e, 0x75, 0x6c, 0x6c, 0x12, 0x28, 0x2e, 0x74, 0x74, 0x72, 0x70, 0x63, 0x2e,
0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x73, 0x74, 0x72, 0x65,
0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x45, 0x63, 0x68, 0x6f, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61,
0x64, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x28, 0x01, 0x12, 0x56, 0x0a, 0x0e, 0x45,
0x63, 0x68, 0x6f, 0x4e, 0x75, 0x6c, 0x6c, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x28, 0x2e,
0x74, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f,
0x6e, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x45, 0x63, 0x68, 0x6f,
0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65,
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x28,
0x01, 0x30, 0x01, 0x42, 0x3d, 0x5a, 0x3b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f,
0x6d, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2f, 0x74, 0x74, 0x72,
0x70, 0x63, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x73,
0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x3b, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69,
0x6e, 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_github_com_containerd_ttrpc_integration_streaming_test_proto_rawDescOnce sync.Once
file_github_com_containerd_ttrpc_integration_streaming_test_proto_rawDescData = file_github_com_containerd_ttrpc_integration_streaming_test_proto_rawDesc
)
func file_github_com_containerd_ttrpc_integration_streaming_test_proto_rawDescGZIP() []byte {
file_github_com_containerd_ttrpc_integration_streaming_test_proto_rawDescOnce.Do(func() {
file_github_com_containerd_ttrpc_integration_streaming_test_proto_rawDescData = protoimpl.X.CompressGZIP(file_github_com_containerd_ttrpc_integration_streaming_test_proto_rawDescData)
})
return file_github_com_containerd_ttrpc_integration_streaming_test_proto_rawDescData
}
var file_github_com_containerd_ttrpc_integration_streaming_test_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_github_com_containerd_ttrpc_integration_streaming_test_proto_goTypes = []interface{}{
(*EchoPayload)(nil), // 0: ttrpc.integration.streaming.EchoPayload
(*Part)(nil), // 1: ttrpc.integration.streaming.Part
(*Sum)(nil), // 2: ttrpc.integration.streaming.Sum
(*empty.Empty)(nil), // 3: google.protobuf.Empty
}
var file_github_com_containerd_ttrpc_integration_streaming_test_proto_depIdxs = []int32{
0, // 0: ttrpc.integration.streaming.Streaming.Echo:input_type -> ttrpc.integration.streaming.EchoPayload
0, // 1: ttrpc.integration.streaming.Streaming.EchoStream:input_type -> ttrpc.integration.streaming.EchoPayload
1, // 2: ttrpc.integration.streaming.Streaming.SumStream:input_type -> ttrpc.integration.streaming.Part
2, // 3: ttrpc.integration.streaming.Streaming.DivideStream:input_type -> ttrpc.integration.streaming.Sum
0, // 4: ttrpc.integration.streaming.Streaming.EchoNull:input_type -> ttrpc.integration.streaming.EchoPayload
0, // 5: ttrpc.integration.streaming.Streaming.EchoNullStream:input_type -> ttrpc.integration.streaming.EchoPayload
0, // 6: ttrpc.integration.streaming.Streaming.Echo:output_type -> ttrpc.integration.streaming.EchoPayload
0, // 7: ttrpc.integration.streaming.Streaming.EchoStream:output_type -> ttrpc.integration.streaming.EchoPayload
2, // 8: ttrpc.integration.streaming.Streaming.SumStream:output_type -> ttrpc.integration.streaming.Sum
1, // 9: ttrpc.integration.streaming.Streaming.DivideStream:output_type -> ttrpc.integration.streaming.Part
3, // 10: ttrpc.integration.streaming.Streaming.EchoNull:output_type -> google.protobuf.Empty
3, // 11: ttrpc.integration.streaming.Streaming.EchoNullStream:output_type -> google.protobuf.Empty
6, // [6:12] is the sub-list for method output_type
0, // [0:6] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_github_com_containerd_ttrpc_integration_streaming_test_proto_init() }
func file_github_com_containerd_ttrpc_integration_streaming_test_proto_init() {
if File_github_com_containerd_ttrpc_integration_streaming_test_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_github_com_containerd_ttrpc_integration_streaming_test_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*EchoPayload); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_github_com_containerd_ttrpc_integration_streaming_test_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Part); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_github_com_containerd_ttrpc_integration_streaming_test_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Sum); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_github_com_containerd_ttrpc_integration_streaming_test_proto_rawDesc,
NumEnums: 0,
NumMessages: 3,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_github_com_containerd_ttrpc_integration_streaming_test_proto_goTypes,
DependencyIndexes: file_github_com_containerd_ttrpc_integration_streaming_test_proto_depIdxs,
MessageInfos: file_github_com_containerd_ttrpc_integration_streaming_test_proto_msgTypes,
}.Build()
File_github_com_containerd_ttrpc_integration_streaming_test_proto = out.File
file_github_com_containerd_ttrpc_integration_streaming_test_proto_rawDesc = nil
file_github_com_containerd_ttrpc_integration_streaming_test_proto_goTypes = nil
file_github_com_containerd_ttrpc_integration_streaming_test_proto_depIdxs = nil
}

View File

@ -0,0 +1,51 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
syntax = "proto3";
package ttrpc.integration.streaming;
import "google/protobuf/empty.proto";
option go_package = "github.com/containerd/ttrpc/integration/streaming;streaming";
// Shim service is launched for each container and is responsible for owning the IO
// for the container and its additional processes. The shim is also the parent of
// each container and allows reattaching to the IO and receiving the exit status
// for the container processes.
service Streaming {
rpc Echo(EchoPayload) returns (EchoPayload);
rpc EchoStream(stream EchoPayload) returns (stream EchoPayload);
rpc SumStream(stream Part) returns (Sum);
rpc DivideStream(Sum) returns (stream Part);
rpc EchoNull(stream EchoPayload) returns (google.protobuf.Empty);
rpc EchoNullStream(stream EchoPayload) returns (stream google.protobuf.Empty);
}
message EchoPayload {
uint32 seq = 1;
string msg = 2;
}
message Part {
int32 add = 1;
}
message Sum {
int32 sum = 1;
int32 num = 2;
}

View File

@ -0,0 +1,362 @@
// Code generated by protoc-gen-go-ttrpc. DO NOT EDIT.
// source: github.com/containerd/ttrpc/integration/streaming/test.proto
package streaming
import (
context "context"
ttrpc "github.com/containerd/ttrpc"
empty "github.com/golang/protobuf/ptypes/empty"
)
type StreamingService interface {
Echo(context.Context, *EchoPayload) (*EchoPayload, error)
EchoStream(context.Context, Streaming_EchoStreamServer) error
SumStream(context.Context, Streaming_SumStreamServer) (*Sum, error)
DivideStream(context.Context, *Sum, Streaming_DivideStreamServer) error
EchoNull(context.Context, Streaming_EchoNullServer) (*empty.Empty, error)
EchoNullStream(context.Context, Streaming_EchoNullStreamServer) error
}
type Streaming_EchoStreamServer interface {
Send(*EchoPayload) error
Recv() (*EchoPayload, error)
ttrpc.StreamServer
}
type streamingEchoStreamServer struct {
ttrpc.StreamServer
}
func (x *streamingEchoStreamServer) Send(m *EchoPayload) error {
return x.StreamServer.SendMsg(m)
}
func (x *streamingEchoStreamServer) Recv() (*EchoPayload, error) {
m := new(EchoPayload)
if err := x.StreamServer.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
type Streaming_SumStreamServer interface {
Recv() (*Part, error)
ttrpc.StreamServer
}
type streamingSumStreamServer struct {
ttrpc.StreamServer
}
func (x *streamingSumStreamServer) Recv() (*Part, error) {
m := new(Part)
if err := x.StreamServer.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
type Streaming_DivideStreamServer interface {
Send(*Part) error
ttrpc.StreamServer
}
type streamingDivideStreamServer struct {
ttrpc.StreamServer
}
func (x *streamingDivideStreamServer) Send(m *Part) error {
return x.StreamServer.SendMsg(m)
}
type Streaming_EchoNullServer interface {
Recv() (*EchoPayload, error)
ttrpc.StreamServer
}
type streamingEchoNullServer struct {
ttrpc.StreamServer
}
func (x *streamingEchoNullServer) Recv() (*EchoPayload, error) {
m := new(EchoPayload)
if err := x.StreamServer.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
type Streaming_EchoNullStreamServer interface {
Send(*empty.Empty) error
Recv() (*EchoPayload, error)
ttrpc.StreamServer
}
type streamingEchoNullStreamServer struct {
ttrpc.StreamServer
}
func (x *streamingEchoNullStreamServer) Send(m *empty.Empty) error {
return x.StreamServer.SendMsg(m)
}
func (x *streamingEchoNullStreamServer) Recv() (*EchoPayload, error) {
m := new(EchoPayload)
if err := x.StreamServer.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func RegisterStreamingService(srv *ttrpc.Server, svc StreamingService) {
srv.RegisterService("ttrpc.integration.streaming.Streaming", &ttrpc.ServiceDesc{
Methods: map[string]ttrpc.Method{
"Echo": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req EchoPayload
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.Echo(ctx, &req)
},
},
Streams: map[string]ttrpc.Stream{
"EchoStream": {
Handler: func(ctx context.Context, stream ttrpc.StreamServer) (interface{}, error) {
return nil, svc.EchoStream(ctx, &streamingEchoStreamServer{stream})
},
StreamingClient: true,
StreamingServer: true,
},
"SumStream": {
Handler: func(ctx context.Context, stream ttrpc.StreamServer) (interface{}, error) {
return svc.SumStream(ctx, &streamingSumStreamServer{stream})
},
StreamingClient: true,
StreamingServer: false,
},
"DivideStream": {
Handler: func(ctx context.Context, stream ttrpc.StreamServer) (interface{}, error) {
m := new(Sum)
if err := stream.RecvMsg(m); err != nil {
return nil, err
}
return nil, svc.DivideStream(ctx, m, &streamingDivideStreamServer{stream})
},
StreamingClient: false,
StreamingServer: true,
},
"EchoNull": {
Handler: func(ctx context.Context, stream ttrpc.StreamServer) (interface{}, error) {
return svc.EchoNull(ctx, &streamingEchoNullServer{stream})
},
StreamingClient: true,
StreamingServer: false,
},
"EchoNullStream": {
Handler: func(ctx context.Context, stream ttrpc.StreamServer) (interface{}, error) {
return nil, svc.EchoNullStream(ctx, &streamingEchoNullStreamServer{stream})
},
StreamingClient: true,
StreamingServer: true,
},
},
})
}
type StreamingClient interface {
Echo(context.Context, *EchoPayload) (*EchoPayload, error)
EchoStream(context.Context) (Streaming_EchoStreamClient, error)
SumStream(context.Context) (Streaming_SumStreamClient, error)
DivideStream(context.Context, *Sum) (Streaming_DivideStreamClient, error)
EchoNull(context.Context) (Streaming_EchoNullClient, error)
EchoNullStream(context.Context) (Streaming_EchoNullStreamClient, error)
}
type streamingClient struct {
client *ttrpc.Client
}
func NewStreamingClient(client *ttrpc.Client) StreamingClient {
return &streamingClient{
client: client,
}
}
func (c *streamingClient) Echo(ctx context.Context, req *EchoPayload) (*EchoPayload, error) {
var resp EchoPayload
if err := c.client.Call(ctx, "ttrpc.integration.streaming.Streaming", "Echo", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
func (c *streamingClient) EchoStream(ctx context.Context) (Streaming_EchoStreamClient, error) {
stream, err := c.client.NewStream(ctx, &ttrpc.StreamDesc{
StreamingClient: true,
StreamingServer: true,
}, "ttrpc.integration.streaming.Streaming", "EchoStream", nil)
if err != nil {
return nil, err
}
x := &streamingEchoStreamClient{stream}
return x, nil
}
type Streaming_EchoStreamClient interface {
Send(*EchoPayload) error
Recv() (*EchoPayload, error)
ttrpc.ClientStream
}
type streamingEchoStreamClient struct {
ttrpc.ClientStream
}
func (x *streamingEchoStreamClient) Send(m *EchoPayload) error {
return x.ClientStream.SendMsg(m)
}
func (x *streamingEchoStreamClient) Recv() (*EchoPayload, error) {
m := new(EchoPayload)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *streamingClient) SumStream(ctx context.Context) (Streaming_SumStreamClient, error) {
stream, err := c.client.NewStream(ctx, &ttrpc.StreamDesc{
StreamingClient: true,
StreamingServer: false,
}, "ttrpc.integration.streaming.Streaming", "SumStream", nil)
if err != nil {
return nil, err
}
x := &streamingSumStreamClient{stream}
return x, nil
}
type Streaming_SumStreamClient interface {
Send(*Part) error
CloseAndRecv() (*Sum, error)
ttrpc.ClientStream
}
type streamingSumStreamClient struct {
ttrpc.ClientStream
}
func (x *streamingSumStreamClient) Send(m *Part) error {
return x.ClientStream.SendMsg(m)
}
func (x *streamingSumStreamClient) CloseAndRecv() (*Sum, error) {
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
m := new(Sum)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *streamingClient) DivideStream(ctx context.Context, req *Sum) (Streaming_DivideStreamClient, error) {
stream, err := c.client.NewStream(ctx, &ttrpc.StreamDesc{
StreamingClient: false,
StreamingServer: true,
}, "ttrpc.integration.streaming.Streaming", "DivideStream", req)
if err != nil {
return nil, err
}
x := &streamingDivideStreamClient{stream}
return x, nil
}
type Streaming_DivideStreamClient interface {
Recv() (*Part, error)
ttrpc.ClientStream
}
type streamingDivideStreamClient struct {
ttrpc.ClientStream
}
func (x *streamingDivideStreamClient) Recv() (*Part, error) {
m := new(Part)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *streamingClient) EchoNull(ctx context.Context) (Streaming_EchoNullClient, error) {
stream, err := c.client.NewStream(ctx, &ttrpc.StreamDesc{
StreamingClient: true,
StreamingServer: false,
}, "ttrpc.integration.streaming.Streaming", "EchoNull", nil)
if err != nil {
return nil, err
}
x := &streamingEchoNullClient{stream}
return x, nil
}
type Streaming_EchoNullClient interface {
Send(*EchoPayload) error
CloseAndRecv() (*empty.Empty, error)
ttrpc.ClientStream
}
type streamingEchoNullClient struct {
ttrpc.ClientStream
}
func (x *streamingEchoNullClient) Send(m *EchoPayload) error {
return x.ClientStream.SendMsg(m)
}
func (x *streamingEchoNullClient) CloseAndRecv() (*empty.Empty, error) {
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
m := new(empty.Empty)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *streamingClient) EchoNullStream(ctx context.Context) (Streaming_EchoNullStreamClient, error) {
stream, err := c.client.NewStream(ctx, &ttrpc.StreamDesc{
StreamingClient: true,
StreamingServer: true,
}, "ttrpc.integration.streaming.Streaming", "EchoNullStream", nil)
if err != nil {
return nil, err
}
x := &streamingEchoNullStreamClient{stream}
return x, nil
}
type Streaming_EchoNullStreamClient interface {
Send(*EchoPayload) error
Recv() (*empty.Empty, error)
ttrpc.ClientStream
}
type streamingEchoNullStreamClient struct {
ttrpc.ClientStream
}
func (x *streamingEchoNullStreamClient) Send(m *EchoPayload) error {
return x.ClientStream.SendMsg(m)
}
func (x *streamingEchoNullStreamClient) Recv() (*empty.Empty, error) {
m := new(empty.Empty)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}

View File

@ -0,0 +1,425 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package integration
import (
"context"
"errors"
"fmt"
"io"
"math/rand"
"net"
"os"
"sync"
"testing"
"time"
"github.com/containerd/ttrpc"
"github.com/containerd/ttrpc/integration/streaming"
"github.com/golang/protobuf/ptypes/empty"
)
func runService(ctx context.Context, t testing.TB, service streaming.StreamingService) (streaming.StreamingClient, func()) {
server, err := ttrpc.NewServer()
if err != nil {
t.Fatal(err)
}
streaming.RegisterStreamingService(server, service)
addr := t.Name() + ".sock"
if err := os.RemoveAll(addr); err != nil {
t.Fatal(err)
}
listener, err := net.Listen("unix", addr)
if err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithCancel(ctx)
defer func() {
if t.Failed() {
cancel()
server.Close()
}
}()
go func() {
err := server.Serve(ctx, listener)
if err != nil && !errors.Is(err, ttrpc.ErrServerClosed) {
t.Error(err)
}
}()
conn, err := net.Dial("unix", addr)
if err != nil {
t.Fatal(err)
}
client := ttrpc.NewClient(conn)
return streaming.NewStreamingClient(client), func() {
client.Close()
server.Close()
conn.Close()
cancel()
}
}
type testStreamingService struct {
t testing.TB
}
func (tss *testStreamingService) Echo(_ context.Context, e *streaming.EchoPayload) (*streaming.EchoPayload, error) {
e.Seq++
return e, nil
}
func (tss *testStreamingService) EchoStream(_ context.Context, es streaming.Streaming_EchoStreamServer) error {
for {
var e streaming.EchoPayload
if err := es.RecvMsg(&e); err != nil {
if err == io.EOF {
return nil
}
return err
}
e.Seq++
if err := es.SendMsg(&e); err != nil {
return err
}
}
}
func (tss *testStreamingService) SumStream(_ context.Context, ss streaming.Streaming_SumStreamServer) (*streaming.Sum, error) {
var sum streaming.Sum
for {
var part streaming.Part
if err := ss.RecvMsg(&part); err != nil {
if err == io.EOF {
break
}
return nil, err
}
sum.Sum = sum.Sum + part.Add
sum.Num++
}
return &sum, nil
}
func (tss *testStreamingService) DivideStream(_ context.Context, sum *streaming.Sum, ss streaming.Streaming_DivideStreamServer) error {
parts := divideSum(sum)
for _, part := range parts {
if err := ss.Send(part); err != nil {
return err
}
}
return nil
}
func (tss *testStreamingService) EchoNull(_ context.Context, es streaming.Streaming_EchoNullServer) (*empty.Empty, error) {
msg := "non-empty empty"
for seq := uint32(0); ; seq++ {
var e streaming.EchoPayload
if err := es.RecvMsg(&e); err != nil {
if err == io.EOF {
break
}
return nil, err
}
if e.Seq != seq {
return nil, fmt.Errorf("unexpected sequence %d, expected %d", e.Seq, seq)
}
if e.Msg != msg {
return nil, fmt.Errorf("unexpected message %q, expected %q", e.Msg, msg)
}
}
return &empty.Empty{}, nil
}
func (tss *testStreamingService) EchoNullStream(_ context.Context, es streaming.Streaming_EchoNullStreamServer) error {
msg := "non-empty empty"
empty := &empty.Empty{}
var wg sync.WaitGroup
var sendErr error
var errOnce sync.Once
for seq := uint32(0); ; seq++ {
var e streaming.EchoPayload
if err := es.RecvMsg(&e); err != nil {
if err == io.EOF {
break
}
return err
}
if e.Seq != seq {
return fmt.Errorf("unexpected sequence %d, expected %d", e.Seq, seq)
}
if e.Msg != msg {
return fmt.Errorf("unexpected message %q, expected %q", e.Msg, msg)
}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
if err := es.SendMsg(empty); err != nil {
errOnce.Do(func() {
sendErr = err
})
}
}()
}
}
wg.Wait()
return sendErr
}
func TestStreamingService(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
client, cleanup := runService(ctx, t, &testStreamingService{t})
defer cleanup()
t.Run("Echo", echoTest(ctx, client))
t.Run("EchoStream", echoStreamTest(ctx, client))
t.Run("SumStream", sumStreamTest(ctx, client))
t.Run("DivideStream", divideStreamTest(ctx, client))
t.Run("EchoNull", echoNullTest(ctx, client))
t.Run("EchoNullStream", echoNullStreamTest(ctx, client))
}
func echoTest(ctx context.Context, client streaming.StreamingClient) func(t *testing.T) {
return func(t *testing.T) {
echo1 := &streaming.EchoPayload{
Seq: 1,
Msg: "Echo Me",
}
resp, err := client.Echo(ctx, echo1)
if err != nil {
t.Fatal(err)
}
assertNextEcho(t, echo1, resp)
}
}
func echoStreamTest(ctx context.Context, client streaming.StreamingClient) func(t *testing.T) {
return func(t *testing.T) {
stream, err := client.EchoStream(ctx)
if err != nil {
t.Fatal(err)
}
for i := 0; i < 100; i = i + 2 {
echoi := &streaming.EchoPayload{
Seq: uint32(i),
Msg: fmt.Sprintf("%d: Echo in a stream", i),
}
if err := stream.Send(echoi); err != nil {
t.Fatal(err)
}
resp, err := stream.Recv()
if err != nil {
t.Fatal(err)
}
assertNextEcho(t, echoi, resp)
}
if err := stream.CloseSend(); err != nil {
t.Fatal(err)
}
if _, err := stream.Recv(); err != io.EOF {
t.Fatalf("Expected io.EOF, got %v", err)
}
}
}
func sumStreamTest(ctx context.Context, client streaming.StreamingClient) func(t *testing.T) {
return func(t *testing.T) {
stream, err := client.SumStream(ctx)
if err != nil {
t.Fatal(err)
}
var sum streaming.Sum
if err := stream.Send(&streaming.Part{}); err != nil {
t.Fatal(err)
}
sum.Num++
for i := -99; i <= 100; i++ {
addi := &streaming.Part{
Add: int32(i),
}
if err := stream.Send(addi); err != nil {
t.Fatal(err)
}
sum.Sum = sum.Sum + int32(i)
sum.Num++
}
if err := stream.Send(&streaming.Part{}); err != nil {
t.Fatal(err)
}
sum.Num++
ssum, err := stream.CloseAndRecv()
if err != nil {
t.Fatal(err)
}
assertSum(t, ssum, &sum)
}
}
func divideStreamTest(ctx context.Context, client streaming.StreamingClient) func(t *testing.T) {
return func(t *testing.T) {
expected := &streaming.Sum{
Sum: 392,
Num: 30,
}
stream, err := client.DivideStream(ctx, expected)
if err != nil {
t.Fatal(err)
}
var actual streaming.Sum
for {
part, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
t.Fatal(err)
}
actual.Sum = actual.Sum + part.Add
actual.Num++
}
assertSum(t, &actual, expected)
}
}
func echoNullTest(ctx context.Context, client streaming.StreamingClient) func(t *testing.T) {
return func(t *testing.T) {
stream, err := client.EchoNull(ctx)
if err != nil {
t.Fatal(err)
}
for i := 0; i < 100; i++ {
echoi := &streaming.EchoPayload{
Seq: uint32(i),
Msg: "non-empty empty",
}
if err := stream.Send(echoi); err != nil {
t.Fatal(err)
}
}
if _, err := stream.CloseAndRecv(); err != nil {
t.Fatal(err)
}
}
}
func echoNullStreamTest(ctx context.Context, client streaming.StreamingClient) func(t *testing.T) {
return func(t *testing.T) {
stream, err := client.EchoNullStream(ctx)
if err != nil {
t.Fatal(err)
}
var c int
wait := make(chan error)
go func() {
defer close(wait)
for {
_, err := stream.Recv()
if err != nil {
if err != io.EOF {
wait <- err
}
return
}
c++
}
}()
for i := 0; i < 100; i++ {
echoi := &streaming.EchoPayload{
Seq: uint32(i),
Msg: "non-empty empty",
}
if err := stream.Send(echoi); err != nil {
t.Fatal(err)
}
}
if err := stream.CloseSend(); err != nil {
t.Fatal(err)
}
select {
case err := <-wait:
if err != nil {
t.Fatal(err)
}
case <-time.After(time.Second * 10):
t.Fatal("did not receive EOF within 10 seconds")
}
}
}
func assertNextEcho(t testing.TB, a, b *streaming.EchoPayload) {
t.Helper()
if a.Msg != b.Msg {
t.Fatalf("Mismatched messages: %q != %q", a.Msg, b.Msg)
}
if b.Seq != a.Seq+1 {
t.Fatalf("Wrong sequence ID: got %d, expected %d", b.Seq, a.Seq+1)
}
}
func assertSum(t testing.TB, a, b *streaming.Sum) {
t.Helper()
if a.Sum != b.Sum {
t.Fatalf("Wrong sum %d, expected %d", a.Sum, b.Sum)
}
if a.Num != b.Num {
t.Fatalf("Wrong num %d, expected %d", a.Num, b.Num)
}
}
func divideSum(sum *streaming.Sum) []*streaming.Part {
r := rand.New(rand.NewSource(14))
var total int32
parts := make([]*streaming.Part, sum.Num)
for i := int32(1); i < sum.Num-2; i++ {
add := r.Int31()%1000 - 500
parts[i] = &streaming.Part{
Add: add,
}
total = total + add
}
parts[0] = &streaming.Part{}
parts[sum.Num-2] = &streaming.Part{
Add: sum.Sum - total,
}
parts[sum.Num-1] = &streaming.Part{}
return parts
}