First process the pending messages in recv channel
Signed-off-by: Iceber Gu <wei.cai-nat@daocloud.io>
This commit is contained in:
parent
0ca69a9ca2
commit
c51165f20d
29
client.go
29
client.go
@ -483,23 +483,30 @@ func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) err
|
|||||||
}
|
}
|
||||||
defer c.deleteStream(s)
|
defer c.deleteStream(s)
|
||||||
|
|
||||||
|
var msg *streamMessage
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
case <-c.ctx.Done():
|
case <-c.ctx.Done():
|
||||||
return ErrClosed
|
return ErrClosed
|
||||||
case <-s.recvClose:
|
case <-s.recvClose:
|
||||||
return s.recvErr
|
// If recv has a pending message, process that first
|
||||||
case msg := <-s.recv:
|
select {
|
||||||
if msg.header.Type == messageTypeResponse {
|
case msg = <-s.recv:
|
||||||
err = proto.Unmarshal(msg.payload[:msg.header.Length], resp)
|
default:
|
||||||
} else {
|
return s.recvErr
|
||||||
err = fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol)
|
|
||||||
}
|
}
|
||||||
|
case msg = <-s.recv:
|
||||||
// return the payload buffer for reuse
|
|
||||||
c.channel.putmbuf(msg.payload)
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if msg.header.Type == messageTypeResponse {
|
||||||
|
err = proto.Unmarshal(msg.payload[:msg.header.Length], resp)
|
||||||
|
} else {
|
||||||
|
err = fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol)
|
||||||
|
}
|
||||||
|
|
||||||
|
// return the payload buffer for reuse
|
||||||
|
c.channel.putmbuf(msg.payload)
|
||||||
|
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user