From c51165f20d52c5b19c76cd8a1d16ead701146e89 Mon Sep 17 00:00:00 2001 From: Iceber Gu Date: Tue, 9 May 2023 11:40:38 +0800 Subject: [PATCH] First process the pending messages in recv channel Signed-off-by: Iceber Gu --- client.go | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/client.go b/client.go index b73116b..4b1e1e7 100644 --- a/client.go +++ b/client.go @@ -483,23 +483,30 @@ func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) err } defer c.deleteStream(s) + var msg *streamMessage select { case <-ctx.Done(): return ctx.Err() case <-c.ctx.Done(): return ErrClosed case <-s.recvClose: - return s.recvErr - case msg := <-s.recv: - if msg.header.Type == messageTypeResponse { - err = proto.Unmarshal(msg.payload[:msg.header.Length], resp) - } else { - err = fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol) + // If recv has a pending message, process that first + select { + case msg = <-s.recv: + default: + return s.recvErr } - - // return the payload buffer for reuse - c.channel.putmbuf(msg.payload) - - return err + case msg = <-s.recv: } + + if msg.header.Type == messageTypeResponse { + err = proto.Unmarshal(msg.payload[:msg.header.Length], resp) + } else { + err = fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol) + } + + // return the payload buffer for reuse + c.channel.putmbuf(msg.payload) + + return err }