Merge pull request #168 from kevpar/deadlock
client: Fix deadlock when writing to pipe blocks
This commit is contained in:
commit
ef5734239e
37
client.go
37
client.go
@ -386,25 +386,44 @@ func (c *Client) receiveLoop() error {
|
|||||||
// createStream creates a new stream and registers it with the client
|
// createStream creates a new stream and registers it with the client
|
||||||
// Introduce stream types for multiple or single response
|
// Introduce stream types for multiple or single response
|
||||||
func (c *Client) createStream(flags uint8, b []byte) (*stream, error) {
|
func (c *Client) createStream(flags uint8, b []byte) (*stream, error) {
|
||||||
c.streamLock.Lock()
|
// sendLock must be held across both allocation of the stream ID and sending it across the wire.
|
||||||
|
// This ensures that new stream IDs sent on the wire are always increasing, which is a
|
||||||
|
// requirement of the TTRPC protocol.
|
||||||
|
// This use of sendLock could be split into another mutex that covers stream creation + first send,
|
||||||
|
// and just use sendLock to guard writing to the wire, but for now it seems simpler to have fewer mutexes.
|
||||||
|
c.sendLock.Lock()
|
||||||
|
defer c.sendLock.Unlock()
|
||||||
|
|
||||||
// Check if closed since lock acquired to prevent adding
|
// Check if closed since lock acquired to prevent adding
|
||||||
// anything after cleanup completes
|
// anything after cleanup completes
|
||||||
select {
|
select {
|
||||||
case <-c.ctx.Done():
|
case <-c.ctx.Done():
|
||||||
c.streamLock.Unlock()
|
|
||||||
return nil, ErrClosed
|
return nil, ErrClosed
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stream ID should be allocated at same time
|
var s *stream
|
||||||
s := newStream(c.nextStreamID, c)
|
if err := func() error {
|
||||||
c.streams[s.id] = s
|
// In the future this could be replaced with a sync.Map instead of streamLock+map.
|
||||||
c.nextStreamID = c.nextStreamID + 2
|
c.streamLock.Lock()
|
||||||
|
defer c.streamLock.Unlock()
|
||||||
|
|
||||||
c.sendLock.Lock()
|
// Check if closed since lock acquired to prevent adding
|
||||||
defer c.sendLock.Unlock()
|
// anything after cleanup completes
|
||||||
c.streamLock.Unlock()
|
select {
|
||||||
|
case <-c.ctx.Done():
|
||||||
|
return ErrClosed
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
s = newStream(c.nextStreamID, c)
|
||||||
|
c.streams[s.id] = s
|
||||||
|
c.nextStreamID = c.nextStreamID + 2
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
if err := c.channel.send(uint32(s.id), messageTypeRequest, flags, b); err != nil {
|
if err := c.channel.send(uint32(s.id), messageTypeRequest, flags, b); err != nil {
|
||||||
return s, filterCloseErr(err)
|
return s, filterCloseErr(err)
|
||||||
|
Loading…
Reference in New Issue
Block a user