Add integration test to reproduce issue with empty payloads
Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
This commit is contained in:
@@ -15,6 +15,7 @@ type TTRPCStreamingService interface {
|
||||
DivideStream(context.Context, *Sum, TTRPCStreaming_DivideStreamServer) error
|
||||
EchoNull(context.Context, TTRPCStreaming_EchoNullServer) (*emptypb.Empty, error)
|
||||
EchoNullStream(context.Context, TTRPCStreaming_EchoNullStreamServer) error
|
||||
EmptyPayloadStream(context.Context, *emptypb.Empty, TTRPCStreaming_EmptyPayloadStreamServer) error
|
||||
}
|
||||
|
||||
type TTRPCStreaming_EchoStreamServer interface {
|
||||
@@ -108,6 +109,19 @@ func (x *ttrpcstreamingEchoNullStreamServer) Recv() (*EchoPayload, error) {
|
||||
return m, nil
|
||||
}
|
||||
|
||||
type TTRPCStreaming_EmptyPayloadStreamServer interface {
|
||||
Send(*EchoPayload) error
|
||||
ttrpc.StreamServer
|
||||
}
|
||||
|
||||
type ttrpcstreamingEmptyPayloadStreamServer struct {
|
||||
ttrpc.StreamServer
|
||||
}
|
||||
|
||||
func (x *ttrpcstreamingEmptyPayloadStreamServer) Send(m *EchoPayload) error {
|
||||
return x.StreamServer.SendMsg(m)
|
||||
}
|
||||
|
||||
func RegisterTTRPCStreamingService(srv *ttrpc.Server, svc TTRPCStreamingService) {
|
||||
srv.RegisterService("ttrpc.integration.streaming.Streaming", &ttrpc.ServiceDesc{
|
||||
Methods: map[string]ttrpc.Method{
|
||||
@@ -159,6 +173,17 @@ func RegisterTTRPCStreamingService(srv *ttrpc.Server, svc TTRPCStreamingService)
|
||||
StreamingClient: true,
|
||||
StreamingServer: true,
|
||||
},
|
||||
"EmptyPayloadStream": {
|
||||
Handler: func(ctx context.Context, stream ttrpc.StreamServer) (interface{}, error) {
|
||||
m := new(emptypb.Empty)
|
||||
if err := stream.RecvMsg(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, svc.EmptyPayloadStream(ctx, m, &ttrpcstreamingEmptyPayloadStreamServer{stream})
|
||||
},
|
||||
StreamingClient: false,
|
||||
StreamingServer: true,
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
@@ -170,6 +195,7 @@ type TTRPCStreamingClient interface {
|
||||
DivideStream(context.Context, *Sum) (TTRPCStreaming_DivideStreamClient, error)
|
||||
EchoNull(context.Context) (TTRPCStreaming_EchoNullClient, error)
|
||||
EchoNullStream(context.Context) (TTRPCStreaming_EchoNullStreamClient, error)
|
||||
EmptyPayloadStream(context.Context, *emptypb.Empty) (TTRPCStreaming_EmptyPayloadStreamClient, error)
|
||||
}
|
||||
|
||||
type ttrpcstreamingClient struct {
|
||||
@@ -360,3 +386,32 @@ func (x *ttrpcstreamingEchoNullStreamClient) Recv() (*emptypb.Empty, error) {
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func (c *ttrpcstreamingClient) EmptyPayloadStream(ctx context.Context, req *emptypb.Empty) (TTRPCStreaming_EmptyPayloadStreamClient, error) {
|
||||
stream, err := c.client.NewStream(ctx, &ttrpc.StreamDesc{
|
||||
StreamingClient: false,
|
||||
StreamingServer: true,
|
||||
}, "ttrpc.integration.streaming.Streaming", "EmptyPayloadStream", req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
x := &ttrpcstreamingEmptyPayloadStreamClient{stream}
|
||||
return x, nil
|
||||
}
|
||||
|
||||
type TTRPCStreaming_EmptyPayloadStreamClient interface {
|
||||
Recv() (*EchoPayload, error)
|
||||
ttrpc.ClientStream
|
||||
}
|
||||
|
||||
type ttrpcstreamingEmptyPayloadStreamClient struct {
|
||||
ttrpc.ClientStream
|
||||
}
|
||||
|
||||
func (x *ttrpcstreamingEmptyPayloadStreamClient) Recv() (*EchoPayload, error) {
|
||||
m := new(EchoPayload)
|
||||
if err := x.ClientStream.RecvMsg(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user