Merge pull request #157 from mxpv/empty_payload
Fix streaming with empty payloads
This commit is contained in:
commit
90d421ee7e
@ -211,7 +211,7 @@ var file_github_com_containerd_ttrpc_integration_streaming_test_proto_rawDesc =
|
|||||||
0x52, 0x03, 0x61, 0x64, 0x64, 0x22, 0x29, 0x0a, 0x03, 0x53, 0x75, 0x6d, 0x12, 0x10, 0x0a, 0x03,
|
0x52, 0x03, 0x61, 0x64, 0x64, 0x22, 0x29, 0x0a, 0x03, 0x53, 0x75, 0x6d, 0x12, 0x10, 0x0a, 0x03,
|
||||||
0x73, 0x75, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x73, 0x75, 0x6d, 0x12, 0x10,
|
0x73, 0x75, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x73, 0x75, 0x6d, 0x12, 0x10,
|
||||||
0x0a, 0x03, 0x6e, 0x75, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x6e, 0x75, 0x6d,
|
0x0a, 0x03, 0x6e, 0x75, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x6e, 0x75, 0x6d,
|
||||||
0x32, 0xa0, 0x04, 0x0a, 0x09, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x12, 0x5a,
|
0x32, 0xfa, 0x04, 0x0a, 0x09, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x12, 0x5a,
|
||||||
0x0a, 0x04, 0x45, 0x63, 0x68, 0x6f, 0x12, 0x28, 0x2e, 0x74, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x69,
|
0x0a, 0x04, 0x45, 0x63, 0x68, 0x6f, 0x12, 0x28, 0x2e, 0x74, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x69,
|
||||||
0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61,
|
0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61,
|
||||||
0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x45, 0x63, 0x68, 0x6f, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64,
|
0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x45, 0x63, 0x68, 0x6f, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64,
|
||||||
@ -245,11 +245,17 @@ var file_github_com_containerd_ttrpc_integration_streaming_test_proto_rawDesc =
|
|||||||
0x6e, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x45, 0x63, 0x68, 0x6f,
|
0x6e, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x45, 0x63, 0x68, 0x6f,
|
||||||
0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65,
|
0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65,
|
||||||
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x28,
|
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x28,
|
||||||
0x01, 0x30, 0x01, 0x42, 0x3d, 0x5a, 0x3b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f,
|
0x01, 0x30, 0x01, 0x12, 0x58, 0x0a, 0x12, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x50, 0x61, 0x79, 0x6c,
|
||||||
0x6d, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2f, 0x74, 0x74, 0x72,
|
0x6f, 0x61, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67,
|
||||||
0x70, 0x63, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x73,
|
0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74,
|
||||||
0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x3b, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69,
|
0x79, 0x1a, 0x28, 0x2e, 0x74, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72,
|
||||||
0x6e, 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e,
|
||||||
|
0x45, 0x63, 0x68, 0x6f, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x30, 0x01, 0x42, 0x3d, 0x5a,
|
||||||
|
0x3b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e, 0x74,
|
||||||
|
0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2f, 0x74, 0x74, 0x72, 0x70, 0x63, 0x2f, 0x69, 0x6e, 0x74,
|
||||||
|
0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69,
|
||||||
|
0x6e, 0x67, 0x3b, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x62, 0x06, 0x70, 0x72,
|
||||||
|
0x6f, 0x74, 0x6f, 0x33,
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -278,14 +284,16 @@ var file_github_com_containerd_ttrpc_integration_streaming_test_proto_depIdxs =
|
|||||||
2, // 3: ttrpc.integration.streaming.Streaming.DivideStream:input_type -> ttrpc.integration.streaming.Sum
|
2, // 3: ttrpc.integration.streaming.Streaming.DivideStream:input_type -> ttrpc.integration.streaming.Sum
|
||||||
0, // 4: ttrpc.integration.streaming.Streaming.EchoNull:input_type -> ttrpc.integration.streaming.EchoPayload
|
0, // 4: ttrpc.integration.streaming.Streaming.EchoNull:input_type -> ttrpc.integration.streaming.EchoPayload
|
||||||
0, // 5: ttrpc.integration.streaming.Streaming.EchoNullStream:input_type -> ttrpc.integration.streaming.EchoPayload
|
0, // 5: ttrpc.integration.streaming.Streaming.EchoNullStream:input_type -> ttrpc.integration.streaming.EchoPayload
|
||||||
0, // 6: ttrpc.integration.streaming.Streaming.Echo:output_type -> ttrpc.integration.streaming.EchoPayload
|
3, // 6: ttrpc.integration.streaming.Streaming.EmptyPayloadStream:input_type -> google.protobuf.Empty
|
||||||
0, // 7: ttrpc.integration.streaming.Streaming.EchoStream:output_type -> ttrpc.integration.streaming.EchoPayload
|
0, // 7: ttrpc.integration.streaming.Streaming.Echo:output_type -> ttrpc.integration.streaming.EchoPayload
|
||||||
2, // 8: ttrpc.integration.streaming.Streaming.SumStream:output_type -> ttrpc.integration.streaming.Sum
|
0, // 8: ttrpc.integration.streaming.Streaming.EchoStream:output_type -> ttrpc.integration.streaming.EchoPayload
|
||||||
1, // 9: ttrpc.integration.streaming.Streaming.DivideStream:output_type -> ttrpc.integration.streaming.Part
|
2, // 9: ttrpc.integration.streaming.Streaming.SumStream:output_type -> ttrpc.integration.streaming.Sum
|
||||||
3, // 10: ttrpc.integration.streaming.Streaming.EchoNull:output_type -> google.protobuf.Empty
|
1, // 10: ttrpc.integration.streaming.Streaming.DivideStream:output_type -> ttrpc.integration.streaming.Part
|
||||||
3, // 11: ttrpc.integration.streaming.Streaming.EchoNullStream:output_type -> google.protobuf.Empty
|
3, // 11: ttrpc.integration.streaming.Streaming.EchoNull:output_type -> google.protobuf.Empty
|
||||||
6, // [6:12] is the sub-list for method output_type
|
3, // 12: ttrpc.integration.streaming.Streaming.EchoNullStream:output_type -> google.protobuf.Empty
|
||||||
0, // [0:6] is the sub-list for method input_type
|
0, // 13: ttrpc.integration.streaming.Streaming.EmptyPayloadStream:output_type -> ttrpc.integration.streaming.EchoPayload
|
||||||
|
7, // [7:14] is the sub-list for method output_type
|
||||||
|
0, // [0:7] is the sub-list for method input_type
|
||||||
0, // [0:0] is the sub-list for extension type_name
|
0, // [0:0] is the sub-list for extension type_name
|
||||||
0, // [0:0] is the sub-list for extension extendee
|
0, // [0:0] is the sub-list for extension extendee
|
||||||
0, // [0:0] is the sub-list for field type_name
|
0, // [0:0] is the sub-list for field type_name
|
||||||
|
@ -34,6 +34,7 @@ service Streaming {
|
|||||||
rpc DivideStream(Sum) returns (stream Part);
|
rpc DivideStream(Sum) returns (stream Part);
|
||||||
rpc EchoNull(stream EchoPayload) returns (google.protobuf.Empty);
|
rpc EchoNull(stream EchoPayload) returns (google.protobuf.Empty);
|
||||||
rpc EchoNullStream(stream EchoPayload) returns (stream google.protobuf.Empty);
|
rpc EchoNullStream(stream EchoPayload) returns (stream google.protobuf.Empty);
|
||||||
|
rpc EmptyPayloadStream(google.protobuf.Empty) returns (stream EchoPayload);
|
||||||
}
|
}
|
||||||
|
|
||||||
message EchoPayload {
|
message EchoPayload {
|
||||||
|
@ -15,6 +15,7 @@ type TTRPCStreamingService interface {
|
|||||||
DivideStream(context.Context, *Sum, TTRPCStreaming_DivideStreamServer) error
|
DivideStream(context.Context, *Sum, TTRPCStreaming_DivideStreamServer) error
|
||||||
EchoNull(context.Context, TTRPCStreaming_EchoNullServer) (*emptypb.Empty, error)
|
EchoNull(context.Context, TTRPCStreaming_EchoNullServer) (*emptypb.Empty, error)
|
||||||
EchoNullStream(context.Context, TTRPCStreaming_EchoNullStreamServer) error
|
EchoNullStream(context.Context, TTRPCStreaming_EchoNullStreamServer) error
|
||||||
|
EmptyPayloadStream(context.Context, *emptypb.Empty, TTRPCStreaming_EmptyPayloadStreamServer) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type TTRPCStreaming_EchoStreamServer interface {
|
type TTRPCStreaming_EchoStreamServer interface {
|
||||||
@ -108,6 +109,19 @@ func (x *ttrpcstreamingEchoNullStreamServer) Recv() (*EchoPayload, error) {
|
|||||||
return m, nil
|
return m, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type TTRPCStreaming_EmptyPayloadStreamServer interface {
|
||||||
|
Send(*EchoPayload) error
|
||||||
|
ttrpc.StreamServer
|
||||||
|
}
|
||||||
|
|
||||||
|
type ttrpcstreamingEmptyPayloadStreamServer struct {
|
||||||
|
ttrpc.StreamServer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *ttrpcstreamingEmptyPayloadStreamServer) Send(m *EchoPayload) error {
|
||||||
|
return x.StreamServer.SendMsg(m)
|
||||||
|
}
|
||||||
|
|
||||||
func RegisterTTRPCStreamingService(srv *ttrpc.Server, svc TTRPCStreamingService) {
|
func RegisterTTRPCStreamingService(srv *ttrpc.Server, svc TTRPCStreamingService) {
|
||||||
srv.RegisterService("ttrpc.integration.streaming.Streaming", &ttrpc.ServiceDesc{
|
srv.RegisterService("ttrpc.integration.streaming.Streaming", &ttrpc.ServiceDesc{
|
||||||
Methods: map[string]ttrpc.Method{
|
Methods: map[string]ttrpc.Method{
|
||||||
@ -159,6 +173,17 @@ func RegisterTTRPCStreamingService(srv *ttrpc.Server, svc TTRPCStreamingService)
|
|||||||
StreamingClient: true,
|
StreamingClient: true,
|
||||||
StreamingServer: true,
|
StreamingServer: true,
|
||||||
},
|
},
|
||||||
|
"EmptyPayloadStream": {
|
||||||
|
Handler: func(ctx context.Context, stream ttrpc.StreamServer) (interface{}, error) {
|
||||||
|
m := new(emptypb.Empty)
|
||||||
|
if err := stream.RecvMsg(m); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return nil, svc.EmptyPayloadStream(ctx, m, &ttrpcstreamingEmptyPayloadStreamServer{stream})
|
||||||
|
},
|
||||||
|
StreamingClient: false,
|
||||||
|
StreamingServer: true,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -170,6 +195,7 @@ type TTRPCStreamingClient interface {
|
|||||||
DivideStream(context.Context, *Sum) (TTRPCStreaming_DivideStreamClient, error)
|
DivideStream(context.Context, *Sum) (TTRPCStreaming_DivideStreamClient, error)
|
||||||
EchoNull(context.Context) (TTRPCStreaming_EchoNullClient, error)
|
EchoNull(context.Context) (TTRPCStreaming_EchoNullClient, error)
|
||||||
EchoNullStream(context.Context) (TTRPCStreaming_EchoNullStreamClient, error)
|
EchoNullStream(context.Context) (TTRPCStreaming_EchoNullStreamClient, error)
|
||||||
|
EmptyPayloadStream(context.Context, *emptypb.Empty) (TTRPCStreaming_EmptyPayloadStreamClient, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type ttrpcstreamingClient struct {
|
type ttrpcstreamingClient struct {
|
||||||
@ -360,3 +386,32 @@ func (x *ttrpcstreamingEchoNullStreamClient) Recv() (*emptypb.Empty, error) {
|
|||||||
}
|
}
|
||||||
return m, nil
|
return m, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *ttrpcstreamingClient) EmptyPayloadStream(ctx context.Context, req *emptypb.Empty) (TTRPCStreaming_EmptyPayloadStreamClient, error) {
|
||||||
|
stream, err := c.client.NewStream(ctx, &ttrpc.StreamDesc{
|
||||||
|
StreamingClient: false,
|
||||||
|
StreamingServer: true,
|
||||||
|
}, "ttrpc.integration.streaming.Streaming", "EmptyPayloadStream", req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
x := &ttrpcstreamingEmptyPayloadStreamClient{stream}
|
||||||
|
return x, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type TTRPCStreaming_EmptyPayloadStreamClient interface {
|
||||||
|
Recv() (*EchoPayload, error)
|
||||||
|
ttrpc.ClientStream
|
||||||
|
}
|
||||||
|
|
||||||
|
type ttrpcstreamingEmptyPayloadStreamClient struct {
|
||||||
|
ttrpc.ClientStream
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *ttrpcstreamingEmptyPayloadStreamClient) Recv() (*EchoPayload, error) {
|
||||||
|
m := new(EchoPayload)
|
||||||
|
if err := x.ClientStream.RecvMsg(m); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return m, nil
|
||||||
|
}
|
||||||
|
@ -31,6 +31,7 @@ import (
|
|||||||
"github.com/containerd/ttrpc"
|
"github.com/containerd/ttrpc"
|
||||||
"github.com/containerd/ttrpc/integration/streaming"
|
"github.com/containerd/ttrpc/integration/streaming"
|
||||||
"github.com/golang/protobuf/ptypes/empty"
|
"github.com/golang/protobuf/ptypes/empty"
|
||||||
|
"google.golang.org/protobuf/types/known/emptypb"
|
||||||
)
|
)
|
||||||
|
|
||||||
func runService(ctx context.Context, t testing.TB, service streaming.TTRPCStreamingService) (streaming.TTRPCStreamingClient, func()) {
|
func runService(ctx context.Context, t testing.TB, service streaming.TTRPCStreamingService) (streaming.TTRPCStreamingClient, func()) {
|
||||||
@ -190,6 +191,14 @@ func (tss *testStreamingService) EchoNullStream(_ context.Context, es streaming.
|
|||||||
return sendErr
|
return sendErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (tss *testStreamingService) EmptyPayloadStream(_ context.Context, _ *emptypb.Empty, streamer streaming.TTRPCStreaming_EmptyPayloadStreamServer) error {
|
||||||
|
if err := streamer.Send(&streaming.EchoPayload{Seq: 1}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return streamer.Send(&streaming.EchoPayload{Seq: 2})
|
||||||
|
}
|
||||||
|
|
||||||
func TestStreamingService(t *testing.T) {
|
func TestStreamingService(t *testing.T) {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
@ -203,6 +212,7 @@ func TestStreamingService(t *testing.T) {
|
|||||||
t.Run("DivideStream", divideStreamTest(ctx, client))
|
t.Run("DivideStream", divideStreamTest(ctx, client))
|
||||||
t.Run("EchoNull", echoNullTest(ctx, client))
|
t.Run("EchoNull", echoNullTest(ctx, client))
|
||||||
t.Run("EchoNullStream", echoNullStreamTest(ctx, client))
|
t.Run("EchoNullStream", echoNullStreamTest(ctx, client))
|
||||||
|
t.Run("EmptyPayloadStream", emptyPayloadStream(ctx, client))
|
||||||
}
|
}
|
||||||
|
|
||||||
func echoTest(ctx context.Context, client streaming.TTRPCStreamingClient) func(t *testing.T) {
|
func echoTest(ctx context.Context, client streaming.TTRPCStreamingClient) func(t *testing.T) {
|
||||||
@ -385,6 +395,33 @@ func echoNullStreamTest(ctx context.Context, client streaming.TTRPCStreamingClie
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func emptyPayloadStream(ctx context.Context, client streaming.TTRPCStreamingClient) func(t *testing.T) {
|
||||||
|
return func(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
stream, err := client.EmptyPayloadStream(ctx, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := uint32(1); i < 3; i++ {
|
||||||
|
first, err := stream.Recv()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if first.Seq != i {
|
||||||
|
t.Fatalf("unexpected seq: %d != %d", first.Seq, i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := stream.Recv(); err != io.EOF {
|
||||||
|
t.Fatalf("Expected io.EOF, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func assertNextEcho(t testing.TB, a, b *streaming.EchoPayload) {
|
func assertNextEcho(t testing.TB, a, b *streaming.EchoPayload) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
if a.Msg != b.Msg {
|
if a.Msg != b.Msg {
|
||||||
|
@ -140,7 +140,11 @@ func (s *serviceSet) handle(ctx context.Context, req *Request, respond func(*sta
|
|||||||
respond(st, p, stream.StreamingServer, true)
|
respond(st, p, stream.StreamingServer, true)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if req.Payload != nil {
|
// Empty proto messages serialized to 0 payloads,
|
||||||
|
// so signatures like: rpc Stream(google.protobuf.Empty) returns (stream Data);
|
||||||
|
// don't get invoked here, which causes hang on client side.
|
||||||
|
// See https://github.com/containerd/ttrpc/issues/126
|
||||||
|
if req.Payload != nil || !info.StreamingClient {
|
||||||
unmarshal := func(obj interface{}) error {
|
unmarshal := func(obj interface{}) error {
|
||||||
return protoUnmarshal(req.Payload, obj)
|
return protoUnmarshal(req.Payload, obj)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user