stream: fix the map of streams leak

In a connection, streams are added only, but not deleted.
When replying to the last data, delete the stream from map.

And delete operations require concurrency.

Signed-off-by: wllenyj <wllenyj@linux.alibaba.com>
This commit is contained in:
wllenyj 2022-06-07 02:06:13 +08:00
parent 74421d1018
commit 660ded4433

View File

@ -318,6 +318,7 @@ func (c *serverConn) run(sctx context.Context) {
responses = make(chan response)
recvErr = make(chan error, 1)
done = make(chan struct{})
streams = sync.Map{}
active int32
lastStreamID uint32
)
@ -347,7 +348,6 @@ func (c *serverConn) run(sctx context.Context) {
go func(recvErr chan error) {
defer close(recvErr)
streams := map[uint32]*streamHandler{}
for {
select {
case <-c.shutdown:
@ -383,12 +383,13 @@ func (c *serverConn) run(sctx context.Context) {
}
if mh.Type == messageTypeData {
sh, ok := streams[mh.StreamID]
i, ok := streams.Load(mh.StreamID)
if !ok {
if !sendStatus(mh.StreamID, status.Newf(codes.InvalidArgument, "StreamID is no longer active")) {
return
}
}
sh := i.(*streamHandler)
if mh.Flags&flagNoData != flagNoData {
unmarshal := func(obj interface{}) error {
err := protoUnmarshal(p, obj)
@ -458,7 +459,7 @@ func (c *serverConn) run(sctx context.Context) {
continue
}
streams[id] = sh
streams.Store(id, sh)
atomic.AddInt32(&active, 1)
}
// TODO: else we must ignore this for future compat. log this?
@ -518,6 +519,7 @@ func (c *serverConn) run(sctx context.Context) {
// The ttrpc protocol currently does not support the case where
// the server is localClosed but not remoteClosed. Once the server
// is closing, the whole stream may be considered finished
streams.Delete(response.id)
atomic.AddInt32(&active, -1)
}
case err := <-recvErr: