go.mod: github.com/grpc-ecosystem/go-grpc-middleware v1.4.0

https://github.com/grpc-ecosystem/go-grpc-middleware/compare/v1.3.0...v1.4.0

Signed-off-by: Akihiro Suda <akihiro.suda.cz@hco.ntt.co.jp>
This commit is contained in:
Akihiro Suda
2023-07-23 02:40:22 +09:00
parent 0f033b6125
commit 4bda0a69e2
107 changed files with 20716 additions and 2960 deletions

View File

@@ -43,7 +43,6 @@ import (
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/binarylog"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/internal/transport"
@@ -146,7 +145,7 @@ type Server struct {
channelzID *channelz.Identifier
czData *channelzData
serverWorkerChannels []chan *serverWorkerData
serverWorkerChannel chan *serverWorkerData
}
type serverOptions struct {
@@ -561,40 +560,38 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption {
const serverWorkerResetThreshold = 1 << 16
// serverWorkers blocks on a *transport.Stream channel forever and waits for
// data to be fed by serveStreams. This allows different requests to be
// data to be fed by serveStreams. This allows multiple requests to be
// processed by the same goroutine, removing the need for expensive stack
// re-allocations (see the runtime.morestack problem [1]).
//
// [1] https://github.com/golang/go/issues/18138
func (s *Server) serverWorker(ch chan *serverWorkerData) {
// To make sure all server workers don't reset at the same time, choose a
// random number of iterations before resetting.
threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
for completed := 0; completed < threshold; completed++ {
data, ok := <-ch
func (s *Server) serverWorker() {
for completed := 0; completed < serverWorkerResetThreshold; completed++ {
data, ok := <-s.serverWorkerChannel
if !ok {
return
}
s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
data.wg.Done()
s.handleSingleStream(data)
}
go s.serverWorker(ch)
go s.serverWorker()
}
// initServerWorkers creates worker goroutines and channels to process incoming
func (s *Server) handleSingleStream(data *serverWorkerData) {
defer data.wg.Done()
s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
}
// initServerWorkers creates worker goroutines and a channel to process incoming
// connections to reduce the time spent overall on runtime.morestack.
func (s *Server) initServerWorkers() {
s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
s.serverWorkerChannel = make(chan *serverWorkerData)
for i := uint32(0); i < s.opts.numServerWorkers; i++ {
s.serverWorkerChannels[i] = make(chan *serverWorkerData)
go s.serverWorker(s.serverWorkerChannels[i])
go s.serverWorker()
}
}
func (s *Server) stopServerWorkers() {
for i := uint32(0); i < s.opts.numServerWorkers; i++ {
close(s.serverWorkerChannels[i])
}
close(s.serverWorkerChannel)
}
// NewServer creates a gRPC server which has no service registered and has not
@@ -898,7 +895,7 @@ func (s *Server) drainServerTransports(addr string) {
s.mu.Lock()
conns := s.conns[addr]
for st := range conns {
st.Drain()
st.Drain("")
}
s.mu.Unlock()
}
@@ -946,26 +943,21 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
defer st.Close(errors.New("finished serving streams for the server transport"))
var wg sync.WaitGroup
var roundRobinCounter uint32
st.HandleStreams(func(stream *transport.Stream) {
wg.Add(1)
if s.opts.numServerWorkers > 0 {
data := &serverWorkerData{st: st, wg: &wg, stream: stream}
select {
case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
case s.serverWorkerChannel <- data:
return
default:
// If all stream workers are busy, fallback to the default code path.
go func() {
s.handleStream(st, stream, s.traceInfo(st, stream))
wg.Done()
}()
}
} else {
go func() {
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
}
go func() {
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
}, func(ctx context.Context, method string) context.Context {
if !EnableTracing {
return ctx
@@ -1054,7 +1046,7 @@ func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
if s.drain {
// Transport added after we drained our existing conns: drain it
// immediately.
st.Drain()
st.Drain("")
}
if s.conns[addr] == nil {
@@ -1864,7 +1856,7 @@ func (s *Server) GracefulStop() {
if !s.drain {
for _, conns := range s.conns {
for st := range conns {
st.Drain()
st.Drain("graceful_stop")
}
}
s.drain = true