diff --git a/server.go b/server.go index 0e65122..b1859b8 100644 --- a/server.go +++ b/server.go @@ -318,6 +318,7 @@ func (c *serverConn) run(sctx context.Context) { responses = make(chan response) recvErr = make(chan error, 1) done = make(chan struct{}) + streams = sync.Map{} active int32 lastStreamID uint32 ) @@ -347,7 +348,6 @@ func (c *serverConn) run(sctx context.Context) { go func(recvErr chan error) { defer close(recvErr) - streams := map[uint32]*streamHandler{} for { select { case <-c.shutdown: @@ -383,12 +383,13 @@ func (c *serverConn) run(sctx context.Context) { } if mh.Type == messageTypeData { - sh, ok := streams[mh.StreamID] + i, ok := streams.Load(mh.StreamID) if !ok { if !sendStatus(mh.StreamID, status.Newf(codes.InvalidArgument, "StreamID is no longer active")) { return } } + sh := i.(*streamHandler) if mh.Flags&flagNoData != flagNoData { unmarshal := func(obj interface{}) error { err := protoUnmarshal(p, obj) @@ -458,7 +459,7 @@ func (c *serverConn) run(sctx context.Context) { continue } - streams[id] = sh + streams.Store(id, sh) atomic.AddInt32(&active, 1) } // TODO: else we must ignore this for future compat. log this? @@ -518,6 +519,7 @@ func (c *serverConn) run(sctx context.Context) { // The ttrpc protocol currently does not support the case where // the server is localClosed but not remoteClosed. Once the server // is closing, the whole stream may be considered finished + streams.Delete(response.id) atomic.AddInt32(&active, -1) } case err := <-recvErr: