Merge pull request #120 from wllenyj/fix-streams

stream: fix the map of streams leak
This commit is contained in:
Derek McGowan 2022-09-28 22:34:27 -07:00 committed by GitHub
commit 89444d66c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -318,6 +318,7 @@ func (c *serverConn) run(sctx context.Context) {
responses = make(chan response) responses = make(chan response)
recvErr = make(chan error, 1) recvErr = make(chan error, 1)
done = make(chan struct{}) done = make(chan struct{})
streams = sync.Map{}
active int32 active int32
lastStreamID uint32 lastStreamID uint32
) )
@ -347,7 +348,6 @@ func (c *serverConn) run(sctx context.Context) {
go func(recvErr chan error) { go func(recvErr chan error) {
defer close(recvErr) defer close(recvErr)
streams := map[uint32]*streamHandler{}
for { for {
select { select {
case <-c.shutdown: case <-c.shutdown:
@ -383,12 +383,13 @@ func (c *serverConn) run(sctx context.Context) {
} }
if mh.Type == messageTypeData { if mh.Type == messageTypeData {
sh, ok := streams[mh.StreamID] i, ok := streams.Load(mh.StreamID)
if !ok { if !ok {
if !sendStatus(mh.StreamID, status.Newf(codes.InvalidArgument, "StreamID is no longer active")) { if !sendStatus(mh.StreamID, status.Newf(codes.InvalidArgument, "StreamID is no longer active")) {
return return
} }
} }
sh := i.(*streamHandler)
if mh.Flags&flagNoData != flagNoData { if mh.Flags&flagNoData != flagNoData {
unmarshal := func(obj interface{}) error { unmarshal := func(obj interface{}) error {
err := protoUnmarshal(p, obj) err := protoUnmarshal(p, obj)
@ -458,7 +459,7 @@ func (c *serverConn) run(sctx context.Context) {
continue continue
} }
streams[id] = sh streams.Store(id, sh)
atomic.AddInt32(&active, 1) atomic.AddInt32(&active, 1)
} }
// TODO: else we must ignore this for future compat. log this? // TODO: else we must ignore this for future compat. log this?
@ -518,6 +519,7 @@ func (c *serverConn) run(sctx context.Context) {
// The ttrpc protocol currently does not support the case where // The ttrpc protocol currently does not support the case where
// the server is localClosed but not remoteClosed. Once the server // the server is localClosed but not remoteClosed. Once the server
// is closing, the whole stream may be considered finished // is closing, the whole stream may be considered finished
streams.Delete(response.id)
atomic.AddInt32(&active, -1) atomic.AddInt32(&active, -1)
} }
case err := <-recvErr: case err := <-recvErr: