Merge pull request #144 from Iceber/stream_recv_channel

First process the pending messages in recv channel
This commit is contained in:
Fu Wei 2023-05-09 13:11:40 +08:00 committed by GitHub
commit ac26f8cbea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -483,23 +483,30 @@ func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) err
} }
defer c.deleteStream(s) defer c.deleteStream(s)
var msg *streamMessage
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case <-c.ctx.Done(): case <-c.ctx.Done():
return ErrClosed return ErrClosed
case <-s.recvClose: case <-s.recvClose:
return s.recvErr // If recv has a pending message, process that first
case msg := <-s.recv: select {
if msg.header.Type == messageTypeResponse { case msg = <-s.recv:
err = proto.Unmarshal(msg.payload[:msg.header.Length], resp) default:
} else { return s.recvErr
err = fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol)
} }
case msg = <-s.recv:
// return the payload buffer for reuse
c.channel.putmbuf(msg.payload)
return err
} }
if msg.header.Type == messageTypeResponse {
err = proto.Unmarshal(msg.payload[:msg.header.Length], resp)
} else {
err = fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol)
}
// return the payload buffer for reuse
c.channel.putmbuf(msg.payload)
return err
} }