Update google.golang.org/grpc from v1.27.1 to v1.38.0
v1.38.0 is used by Kubernetes since https://github.com/kubernetes/kubernetes/pull/100488.

Signed-off-by: Kazuyoshi Kato <katokazu@amazon.com>
Kazuyoshi Kato authored, committed by Davanum Srinivas

parent 8d2e156ddb
commit b7e79dc5ab
							
								
								
									
vendor/google.golang.org/grpc/server.go (generated, vendored): 506 changed lines
							| @@ -40,8 +40,10 @@ import ( | ||||
| 	"google.golang.org/grpc/encoding" | ||||
| 	"google.golang.org/grpc/encoding/proto" | ||||
| 	"google.golang.org/grpc/grpclog" | ||||
| 	"google.golang.org/grpc/internal" | ||||
| 	"google.golang.org/grpc/internal/binarylog" | ||||
| 	"google.golang.org/grpc/internal/channelz" | ||||
| 	"google.golang.org/grpc/internal/grpcrand" | ||||
| 	"google.golang.org/grpc/internal/grpcsync" | ||||
| 	"google.golang.org/grpc/internal/transport" | ||||
| 	"google.golang.org/grpc/keepalive" | ||||
| @@ -55,9 +57,26 @@ import ( | ||||
| const ( | ||||
| 	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4 | ||||
| 	defaultServerMaxSendMessageSize    = math.MaxInt32 | ||||
|  | ||||
| 	// Server transports are tracked in a map which is keyed on listener | ||||
| 	// address. For regular gRPC traffic, connections are accepted in Serve() | ||||
| 	// through a call to Accept(), and we use the actual listener address as key | ||||
| 	// when we add it to the map. But for connections received through | ||||
| 	// ServeHTTP(), we do not have a listener and hence use this dummy value. | ||||
| 	listenerAddressForServeHTTP = "listenerAddressForServeHTTP" | ||||
| ) | ||||
|  | ||||
| func init() { | ||||
| 	internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials { | ||||
| 		return srv.opts.creds | ||||
| 	} | ||||
| 	internal.DrainServerTransports = func(srv *Server, addr string) { | ||||
| 		srv.drainServerTransports(addr) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| var statusOK = status.New(codes.OK, "") | ||||
| var logger = grpclog.Component("core") | ||||
|  | ||||
| type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error) | ||||
|  | ||||
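
The init block above is gRPC's internal-hook pattern: the google.golang.org/grpc/internal package declares interface{} variables, this file assigns them at init time, and consumers (here, the xDS code) type-assert them, reaching unexported server behavior without widening the public API. A single-file, runnable sketch of the same pattern with illustrative names (hook and server are not grpc-go identifiers):

package main

import "fmt"

// hook is the loosely typed slot that would live in the internal package.
var hook interface{} // func(*server, string)

type server struct{ drained []string }

func (s *server) drain(addr string) { s.drained = append(s.drained, addr) }

func init() {
	// The owning package assigns the hook at init time.
	hook = func(s *server, addr string) { s.drain(addr) }
}

func main() {
	s := &server{}
	// A consumer retrieves the hook and type-asserts it, as the xDS code does.
	hook.(func(*server, string))(s, "127.0.0.1:0")
	fmt.Println(s.drained) // [127.0.0.1:0]
}
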
| @@ -78,27 +97,37 @@ type ServiceDesc struct { | ||||
| 	Metadata    interface{} | ||||
| } | ||||
|  | ||||
| // service consists of the information of the server serving this service and | ||||
| // the methods in this service. | ||||
| type service struct { | ||||
| 	server interface{} // the server for service methods | ||||
| 	md     map[string]*MethodDesc | ||||
| 	sd     map[string]*StreamDesc | ||||
| 	mdata  interface{} | ||||
| // serviceInfo wraps information about a service. It is very similar to | ||||
| // ServiceDesc and is constructed from it for internal purposes. | ||||
| type serviceInfo struct { | ||||
| 	// Contains the implementation for the methods in this service. | ||||
| 	serviceImpl interface{} | ||||
| 	methods     map[string]*MethodDesc | ||||
| 	streams     map[string]*StreamDesc | ||||
| 	mdata       interface{} | ||||
| } | ||||
|  | ||||
| type serverWorkerData struct { | ||||
| 	st     transport.ServerTransport | ||||
| 	wg     *sync.WaitGroup | ||||
| 	stream *transport.Stream | ||||
| } | ||||
|  | ||||
| // Server is a gRPC server to serve RPC requests. | ||||
| type Server struct { | ||||
| 	opts serverOptions | ||||
|  | ||||
| 	mu     sync.Mutex // guards following | ||||
| 	lis    map[net.Listener]bool | ||||
| 	conns  map[transport.ServerTransport]bool | ||||
| 	serve  bool | ||||
| 	drain  bool | ||||
| 	cv     *sync.Cond          // signaled when connections close for GracefulStop | ||||
| 	m      map[string]*service // service name -> service info | ||||
| 	events trace.EventLog | ||||
| 	mu  sync.Mutex // guards following | ||||
| 	lis map[net.Listener]bool | ||||
| 	// conns contains all active server transports. It is a map keyed on a | ||||
| 	// listener address with the value being the set of active transports | ||||
| 	// belonging to that listener. | ||||
| 	conns    map[string]map[transport.ServerTransport]bool | ||||
| 	serve    bool | ||||
| 	drain    bool | ||||
| 	cv       *sync.Cond              // signaled when connections close for GracefulStop | ||||
| 	services map[string]*serviceInfo // service name -> service info | ||||
| 	events   trace.EventLog | ||||
|  | ||||
| 	quit               *grpcsync.Event | ||||
| 	done               *grpcsync.Event | ||||
| @@ -107,6 +136,8 @@ type Server struct { | ||||
|  | ||||
| 	channelzID int64 // channelz unique identification number | ||||
| 	czData     *channelzData | ||||
|  | ||||
| 	serverWorkerChannels []chan *serverWorkerData | ||||
| } | ||||
|  | ||||
| type serverOptions struct { | ||||
| @@ -116,6 +147,8 @@ type serverOptions struct { | ||||
| 	dc                    Decompressor | ||||
| 	unaryInt              UnaryServerInterceptor | ||||
| 	streamInt             StreamServerInterceptor | ||||
| 	chainUnaryInts        []UnaryServerInterceptor | ||||
| 	chainStreamInts       []StreamServerInterceptor | ||||
| 	inTapHandle           tap.ServerInHandle | ||||
| 	statsHandler          stats.Handler | ||||
| 	maxConcurrentStreams  uint32 | ||||
| @@ -131,6 +164,7 @@ type serverOptions struct { | ||||
| 	connectionTimeout     time.Duration | ||||
| 	maxHeaderListSize     *uint32 | ||||
| 	headerTableSize       *uint32 | ||||
| 	numServerWorkers      uint32 | ||||
| } | ||||
|  | ||||
| var defaultServerOptions = serverOptions{ | ||||
| @@ -149,7 +183,10 @@ type ServerOption interface { | ||||
| // EmptyServerOption does not alter the server configuration. It can be embedded | ||||
| // in another structure to build custom server options. | ||||
| // | ||||
| // This API is EXPERIMENTAL. | ||||
| // Experimental | ||||
| // | ||||
| // Notice: This type is EXPERIMENTAL and may be changed or removed in a | ||||
| // later release. | ||||
| type EmptyServerOption struct{} | ||||
|  | ||||
| func (EmptyServerOption) apply(*serverOptions) {} | ||||
| @@ -211,7 +248,7 @@ func InitialConnWindowSize(s int32) ServerOption { | ||||
| // KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server. | ||||
| func KeepaliveParams(kp keepalive.ServerParameters) ServerOption { | ||||
| 	if kp.Time > 0 && kp.Time < time.Second { | ||||
| 		grpclog.Warning("Adjusting keepalive ping interval to minimum period of 1s") | ||||
| 		logger.Warning("Adjusting keepalive ping interval to minimum period of 1s") | ||||
| 		kp.Time = time.Second | ||||
| 	} | ||||
|  | ||||
| @@ -230,19 +267,55 @@ func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption { | ||||
| // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling. | ||||
| // | ||||
| // This will override any lookups by content-subtype for Codecs registered with RegisterCodec. | ||||
| // | ||||
| // Deprecated: register codecs using encoding.RegisterCodec. The server will | ||||
| // automatically use registered codecs based on the incoming requests' headers. | ||||
| // See also | ||||
| // https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec. | ||||
| // Will be supported throughout 1.x. | ||||
| func CustomCodec(codec Codec) ServerOption { | ||||
| 	return newFuncServerOption(func(o *serverOptions) { | ||||
| 		o.codec = codec | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| // ForceServerCodec returns a ServerOption that sets a codec for message | ||||
| // marshaling and unmarshaling. | ||||
| // | ||||
| // This will override any lookups by content-subtype for Codecs registered | ||||
| // with RegisterCodec. | ||||
| // | ||||
| // See Content-Type on | ||||
| // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for | ||||
| // more details. Also see the documentation on RegisterCodec and | ||||
| // CallContentSubtype for more details on the interaction between encoding.Codec | ||||
| // and content-subtype. | ||||
| // | ||||
| // This function is provided for advanced users; prefer to register codecs | ||||
| // using encoding.RegisterCodec. | ||||
| // The server will automatically use registered codecs based on the incoming | ||||
| // requests' headers. See also | ||||
| // https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec. | ||||
| // Will be supported throughout 1.x. | ||||
| // | ||||
| // Experimental | ||||
| // | ||||
| // Notice: This API is EXPERIMENTAL and may be changed or removed in a | ||||
| // later release. | ||||
| func ForceServerCodec(codec encoding.Codec) ServerOption { | ||||
| 	return newFuncServerOption(func(o *serverOptions) { | ||||
| 		o.codec = codec | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| // RPCCompressor returns a ServerOption that sets a compressor for outbound | ||||
| // messages.  For backward compatibility, all outbound messages will be sent | ||||
| // using this compressor, regardless of incoming message compression.  By | ||||
| // default, server messages will be sent using the same compressor with which | ||||
| // request messages were sent. | ||||
| // | ||||
| // Deprecated: use encoding.RegisterCompressor instead. | ||||
| // Deprecated: use encoding.RegisterCompressor instead. Will be supported | ||||
| // throughout 1.x. | ||||
| func RPCCompressor(cp Compressor) ServerOption { | ||||
| 	return newFuncServerOption(func(o *serverOptions) { | ||||
| 		o.cp = cp | ||||
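
This hunk deprecates CustomCodec (which takes the legacy Codec interface) in favor of the new ForceServerCodec, which takes an encoding.Codec. A hedged usage sketch; as the doc comment says, encoding.RegisterCodec remains the preferred path, and the lookup below relies on the default "proto" codec being linked in:

package main

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/encoding"
	_ "google.golang.org/grpc/encoding/proto" // registers the default "proto" codec
)

func main() {
	codec := encoding.GetCodec("proto") // any encoding.Codec implementation works
	s := grpc.NewServer(grpc.ForceServerCodec(codec))
	defer s.Stop()
	// register services and call s.Serve(lis) as usual
}
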
| @@ -253,7 +326,8 @@ func RPCCompressor(cp Compressor) ServerOption { | ||||
| // messages.  It has higher priority than decompressors registered via | ||||
| // encoding.RegisterCompressor. | ||||
| // | ||||
| // Deprecated: use encoding.RegisterCompressor instead. | ||||
| // Deprecated: use encoding.RegisterCompressor instead. Will be supported | ||||
| // throughout 1.x. | ||||
| func RPCDecompressor(dc Decompressor) ServerOption { | ||||
| 	return newFuncServerOption(func(o *serverOptions) { | ||||
| 		o.dc = dc | ||||
| @@ -263,7 +337,7 @@ func RPCDecompressor(dc Decompressor) ServerOption { | ||||
| // MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive. | ||||
| // If this is not set, gRPC uses the default limit. | ||||
| // | ||||
| // Deprecated: use MaxRecvMsgSize instead. | ||||
| // Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x. | ||||
| func MaxMsgSize(m int) ServerOption { | ||||
| 	return MaxRecvMsgSize(m) | ||||
| } | ||||
| @@ -311,6 +385,16 @@ func UnaryInterceptor(i UnaryServerInterceptor) ServerOption { | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| // ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor | ||||
| // for unary RPCs. The first interceptor will be the outermost, | ||||
| // while the last interceptor will be the innermost wrapper around the real call. | ||||
| // All unary interceptors added by this method will be chained. | ||||
| func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption { | ||||
| 	return newFuncServerOption(func(o *serverOptions) { | ||||
| 		o.chainUnaryInts = append(o.chainUnaryInts, interceptors...) | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| // StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the | ||||
| // server. Only one stream interceptor can be installed. | ||||
| func StreamInterceptor(i StreamServerInterceptor) ServerOption { | ||||
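
ChainUnaryInterceptor removes the old one-interceptor limit of UnaryInterceptor. A short sketch of the ordering described above; logging and auth are hypothetical interceptors, with logging outermost:

package main

import (
	"context"

	"google.golang.org/grpc"
)

func logging(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, h grpc.UnaryHandler) (interface{}, error) {
	// Outermost: first to see the request, last to see the response.
	return h(ctx, req)
}

func auth(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, h grpc.UnaryHandler) (interface{}, error) {
	// Innermost wrapper around the real method handler.
	return h(ctx, req)
}

func main() {
	s := grpc.NewServer(grpc.ChainUnaryInterceptor(logging, auth))
	_ = s
}
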
| @@ -322,8 +406,23 @@ func StreamInterceptor(i StreamServerInterceptor) ServerOption { | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| // ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor | ||||
| // for streaming RPCs. The first interceptor will be the outermost, | ||||
| // while the last interceptor will be the innermost wrapper around the real call. | ||||
| // All stream interceptors added by this method will be chained. | ||||
| func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption { | ||||
| 	return newFuncServerOption(func(o *serverOptions) { | ||||
| 		o.chainStreamInts = append(o.chainStreamInts, interceptors...) | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| // InTapHandle returns a ServerOption that sets the tap handle for all the server | ||||
| // transport to be created. Only one can be installed. | ||||
| // | ||||
| // Experimental | ||||
| // | ||||
| // Notice: This API is EXPERIMENTAL and may be changed or removed in a | ||||
| // later release. | ||||
| func InTapHandle(h tap.ServerInHandle) ServerOption { | ||||
| 	return newFuncServerOption(func(o *serverOptions) { | ||||
| 		if o.inTapHandle != nil { | ||||
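
ChainStreamInterceptor is the streaming counterpart, with the same outermost-to-innermost ordering. A minimal sketch; logStream is a hypothetical interceptor:

package main

import "google.golang.org/grpc"

func logStream(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, h grpc.StreamHandler) error {
	// Wraps the entire stream, including every Send/Recv on ss.
	return h(srv, ss)
}

func main() {
	s := grpc.NewServer(grpc.ChainStreamInterceptor(logStream))
	_ = s
}
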
| @@ -363,7 +462,10 @@ func UnknownServiceHandler(streamHandler StreamHandler) ServerOption { | ||||
| // new connections.  If this is not set, the default is 120 seconds.  A zero or | ||||
| // negative value will result in an immediate timeout. | ||||
| // | ||||
| // This API is EXPERIMENTAL. | ||||
| // Experimental | ||||
| // | ||||
| // Notice: This API is EXPERIMENTAL and may be changed or removed in a | ||||
| // later release. | ||||
| func ConnectionTimeout(d time.Duration) ServerOption { | ||||
| 	return newFuncServerOption(func(o *serverOptions) { | ||||
| 		o.connectionTimeout = d | ||||
| @@ -381,13 +483,79 @@ func MaxHeaderListSize(s uint32) ServerOption { | ||||
| // HeaderTableSize returns a ServerOption that sets the size of dynamic | ||||
| // header table for stream. | ||||
| // | ||||
| // This API is EXPERIMENTAL. | ||||
| // Experimental | ||||
| // | ||||
| // Notice: This API is EXPERIMENTAL and may be changed or removed in a | ||||
| // later release. | ||||
| func HeaderTableSize(s uint32) ServerOption { | ||||
| 	return newFuncServerOption(func(o *serverOptions) { | ||||
| 		o.headerTableSize = &s | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| // NumStreamWorkers returns a ServerOption that sets the number of worker | ||||
| // goroutines that should be used to process incoming streams. Setting this to | ||||
| // zero (default) will disable workers and spawn a new goroutine for each | ||||
| // stream. | ||||
| // | ||||
| // Experimental | ||||
| // | ||||
| // Notice: This API is EXPERIMENTAL and may be changed or removed in a | ||||
| // later release. | ||||
| func NumStreamWorkers(numServerWorkers uint32) ServerOption { | ||||
| 	// TODO: If/when this API gets stabilized (i.e. stream workers become the | ||||
| 	// only way streams are processed), change the behavior of the zero value to | ||||
| 	// a sane default. Preliminary experiments suggest that a value equal to the | ||||
| 	// number of CPUs available is most performant; requires thorough testing. | ||||
| 	return newFuncServerOption(func(o *serverOptions) { | ||||
| 		o.numServerWorkers = numServerWorkers | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| // serverWorkerResetThreshold defines how often the stack must be reset. Every | ||||
| // N requests, a worker can reset its stack by spawning a new goroutine in its | ||||
| // place, so that large stacks don't live in memory forever. 2^16 should allow | ||||
| // each goroutine stack to live for at least a few seconds in a typical | ||||
| // workload (assuming a QPS of a few thousand requests/sec). | ||||
| const serverWorkerResetThreshold = 1 << 16 | ||||
|  | ||||
| // serverWorker blocks on a *transport.Stream channel forever and waits for | ||||
| // data to be fed by serveStreams. This allows different requests to be | ||||
| // processed by the same goroutine, removing the need for expensive stack | ||||
| // re-allocations (see the runtime.morestack problem [1]). | ||||
| // | ||||
| // [1] https://github.com/golang/go/issues/18138 | ||||
| func (s *Server) serverWorker(ch chan *serverWorkerData) { | ||||
| 	// To make sure all server workers don't reset at the same time, choose a | ||||
| 	// random number of iterations before resetting. | ||||
| 	threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold) | ||||
| 	for completed := 0; completed < threshold; completed++ { | ||||
| 		data, ok := <-ch | ||||
| 		if !ok { | ||||
| 			return | ||||
| 		} | ||||
| 		s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream)) | ||||
| 		data.wg.Done() | ||||
| 	} | ||||
| 	go s.serverWorker(ch) | ||||
| } | ||||
|  | ||||
| // initServerWorkers creates worker goroutines and channels to process incoming | ||||
| // connections to reduce the time spent overall on runtime.morestack. | ||||
| func (s *Server) initServerWorkers() { | ||||
| 	s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers) | ||||
| 	for i := uint32(0); i < s.opts.numServerWorkers; i++ { | ||||
| 		s.serverWorkerChannels[i] = make(chan *serverWorkerData) | ||||
| 		go s.serverWorker(s.serverWorkerChannels[i]) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (s *Server) stopServerWorkers() { | ||||
| 	for i := uint32(0); i < s.opts.numServerWorkers; i++ { | ||||
| 		close(s.serverWorkerChannels[i]) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // NewServer creates a gRPC server which has no service registered and has not | ||||
| // started to accept requests yet. | ||||
| func NewServer(opt ...ServerOption) *Server { | ||||
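
NumStreamWorkers opts a server into the experimental worker pool implemented by serverWorker/initServerWorkers above; zero keeps the old goroutine-per-stream behavior. A hedged usage sketch, following the TODO's observation that roughly one worker per CPU performed best in preliminary experiments:

package main

import (
	"runtime"

	"google.golang.org/grpc"
)

func main() {
	workers := uint32(runtime.NumCPU()) // assumption: CPU count is a sane pool size
	s := grpc.NewServer(grpc.NumStreamWorkers(workers))
	_ = s
}
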
| @@ -396,20 +564,26 @@ func NewServer(opt ...ServerOption) *Server { | ||||
| 		o.apply(&opts) | ||||
| 	} | ||||
| 	s := &Server{ | ||||
| 		lis:    make(map[net.Listener]bool), | ||||
| 		opts:   opts, | ||||
| 		conns:  make(map[transport.ServerTransport]bool), | ||||
| 		m:      make(map[string]*service), | ||||
| 		quit:   grpcsync.NewEvent(), | ||||
| 		done:   grpcsync.NewEvent(), | ||||
| 		czData: new(channelzData), | ||||
| 		lis:      make(map[net.Listener]bool), | ||||
| 		opts:     opts, | ||||
| 		conns:    make(map[string]map[transport.ServerTransport]bool), | ||||
| 		services: make(map[string]*serviceInfo), | ||||
| 		quit:     grpcsync.NewEvent(), | ||||
| 		done:     grpcsync.NewEvent(), | ||||
| 		czData:   new(channelzData), | ||||
| 	} | ||||
| 	chainUnaryServerInterceptors(s) | ||||
| 	chainStreamServerInterceptors(s) | ||||
| 	s.cv = sync.NewCond(&s.mu) | ||||
| 	if EnableTracing { | ||||
| 		_, file, line, _ := runtime.Caller(1) | ||||
| 		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line)) | ||||
| 	} | ||||
|  | ||||
| 	if s.opts.numServerWorkers > 0 { | ||||
| 		s.initServerWorkers() | ||||
| 	} | ||||
|  | ||||
| 	if channelz.IsOn() { | ||||
| 		s.channelzID = channelz.RegisterServer(&channelzServer{s}, "") | ||||
| 	} | ||||
| @@ -432,14 +606,29 @@ func (s *Server) errorf(format string, a ...interface{}) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // ServiceRegistrar wraps a single method that supports service registration. It | ||||
| // enables users to pass concrete types other than grpc.Server to the service | ||||
| // registration methods exported by the IDL generated code. | ||||
| type ServiceRegistrar interface { | ||||
| 	// RegisterService registers a service and its implementation to the | ||||
| 	// concrete type implementing this interface.  It may not be called | ||||
| 	// once the server has started serving. | ||||
| 	// desc describes the service and its methods and handlers. impl is the | ||||
| 	// service implementation which is passed to the method handlers. | ||||
| 	RegisterService(desc *ServiceDesc, impl interface{}) | ||||
| } | ||||
|  | ||||
| // RegisterService registers a service and its implementation to the gRPC | ||||
| // server. It is called from the IDL generated code. This must be called before | ||||
| // invoking Serve. | ||||
| // invoking Serve. If ss is non-nil (for legacy code), its type is checked to | ||||
| // ensure it implements sd.HandlerType. | ||||
| func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) { | ||||
| 	ht := reflect.TypeOf(sd.HandlerType).Elem() | ||||
| 	st := reflect.TypeOf(ss) | ||||
| 	if !st.Implements(ht) { | ||||
| 		grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht) | ||||
| 	if ss != nil { | ||||
| 		ht := reflect.TypeOf(sd.HandlerType).Elem() | ||||
| 		st := reflect.TypeOf(ss) | ||||
| 		if !st.Implements(ht) { | ||||
| 			logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht) | ||||
| 		} | ||||
| 	} | ||||
| 	s.register(sd, ss) | ||||
| } | ||||
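
ServiceRegistrar decouples generated registration code from *grpc.Server. A sketch of what that enables; countingRegistrar is hypothetical, and generated RegisterXxxServer functions (which now take a grpc.ServiceRegistrar) would accept it in place of the server:

package main

import "google.golang.org/grpc"

// countingRegistrar wraps any grpc.ServiceRegistrar and counts registrations.
type countingRegistrar struct {
	inner grpc.ServiceRegistrar
	n     int
}

func (c *countingRegistrar) RegisterService(desc *grpc.ServiceDesc, impl interface{}) {
	c.n++
	c.inner.RegisterService(desc, impl)
}

func main() {
	r := &countingRegistrar{inner: grpc.NewServer()}
	_ = r // pass r to generated Register*Server functions
}
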
| @@ -449,26 +638,26 @@ func (s *Server) register(sd *ServiceDesc, ss interface{}) { | ||||
| 	defer s.mu.Unlock() | ||||
| 	s.printf("RegisterService(%q)", sd.ServiceName) | ||||
| 	if s.serve { | ||||
| 		grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName) | ||||
| 		logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName) | ||||
| 	} | ||||
| 	if _, ok := s.m[sd.ServiceName]; ok { | ||||
| 		grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName) | ||||
| 	if _, ok := s.services[sd.ServiceName]; ok { | ||||
| 		logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName) | ||||
| 	} | ||||
| 	srv := &service{ | ||||
| 		server: ss, | ||||
| 		md:     make(map[string]*MethodDesc), | ||||
| 		sd:     make(map[string]*StreamDesc), | ||||
| 		mdata:  sd.Metadata, | ||||
| 	info := &serviceInfo{ | ||||
| 		serviceImpl: ss, | ||||
| 		methods:     make(map[string]*MethodDesc), | ||||
| 		streams:     make(map[string]*StreamDesc), | ||||
| 		mdata:       sd.Metadata, | ||||
| 	} | ||||
| 	for i := range sd.Methods { | ||||
| 		d := &sd.Methods[i] | ||||
| 		srv.md[d.MethodName] = d | ||||
| 		info.methods[d.MethodName] = d | ||||
| 	} | ||||
| 	for i := range sd.Streams { | ||||
| 		d := &sd.Streams[i] | ||||
| 		srv.sd[d.StreamName] = d | ||||
| 		info.streams[d.StreamName] = d | ||||
| 	} | ||||
| 	s.m[sd.ServiceName] = srv | ||||
| 	s.services[sd.ServiceName] = info | ||||
| } | ||||
|  | ||||
| // MethodInfo contains the information of an RPC including its method name and type. | ||||
| @@ -492,16 +681,16 @@ type ServiceInfo struct { | ||||
| // Service names include the package names, in the form of <package>.<service>. | ||||
| func (s *Server) GetServiceInfo() map[string]ServiceInfo { | ||||
| 	ret := make(map[string]ServiceInfo) | ||||
| 	for n, srv := range s.m { | ||||
| 		methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd)) | ||||
| 		for m := range srv.md { | ||||
| 	for n, srv := range s.services { | ||||
| 		methods := make([]MethodInfo, 0, len(srv.methods)+len(srv.streams)) | ||||
| 		for m := range srv.methods { | ||||
| 			methods = append(methods, MethodInfo{ | ||||
| 				Name:           m, | ||||
| 				IsClientStream: false, | ||||
| 				IsServerStream: false, | ||||
| 			}) | ||||
| 		} | ||||
| 		for m, d := range srv.sd { | ||||
| 		for m, d := range srv.streams { | ||||
| 			methods = append(methods, MethodInfo{ | ||||
| 				Name:           m, | ||||
| 				IsClientStream: d.ClientStreams, | ||||
| @@ -636,7 +825,7 @@ func (s *Server) Serve(lis net.Listener) error { | ||||
| 		// s.conns before this conn can be added. | ||||
| 		s.serveWG.Add(1) | ||||
| 		go func() { | ||||
| 			s.handleRawConn(rawConn) | ||||
| 			s.handleRawConn(lis.Addr().String(), rawConn) | ||||
| 			s.serveWG.Done() | ||||
| 		}() | ||||
| 	} | ||||
| @@ -644,7 +833,7 @@ func (s *Server) Serve(lis net.Listener) error { | ||||
|  | ||||
| // handleRawConn forks a goroutine to handle a just-accepted connection that | ||||
| // has not had any I/O performed on it yet. | ||||
| func (s *Server) handleRawConn(rawConn net.Conn) { | ||||
| func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) { | ||||
| 	if s.quit.HasFired() { | ||||
| 		rawConn.Close() | ||||
| 		return | ||||
| @@ -658,7 +847,7 @@ func (s *Server) handleRawConn(rawConn net.Conn) { | ||||
| 			s.mu.Lock() | ||||
| 			s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err) | ||||
| 			s.mu.Unlock() | ||||
| 			grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err) | ||||
| 			channelz.Warningf(logger, s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err) | ||||
| 			rawConn.Close() | ||||
| 		} | ||||
| 		rawConn.SetDeadline(time.Time{}) | ||||
| @@ -672,15 +861,24 @@ func (s *Server) handleRawConn(rawConn net.Conn) { | ||||
| 	} | ||||
|  | ||||
| 	rawConn.SetDeadline(time.Time{}) | ||||
| 	if !s.addConn(st) { | ||||
| 	if !s.addConn(lisAddr, st) { | ||||
| 		return | ||||
| 	} | ||||
| 	go func() { | ||||
| 		s.serveStreams(st) | ||||
| 		s.removeConn(st) | ||||
| 		s.removeConn(lisAddr, st) | ||||
| 	}() | ||||
| } | ||||
|  | ||||
| func (s *Server) drainServerTransports(addr string) { | ||||
| 	s.mu.Lock() | ||||
| 	conns := s.conns[addr] | ||||
| 	for st := range conns { | ||||
| 		st.Drain() | ||||
| 	} | ||||
| 	s.mu.Unlock() | ||||
| } | ||||
|  | ||||
| // newHTTP2Transport sets up a http/2 transport (using the | ||||
| // gRPC http2 server transport in transport/http2_server.go). | ||||
| func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport { | ||||
| @@ -705,7 +903,7 @@ func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) tr | ||||
| 		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err) | ||||
| 		s.mu.Unlock() | ||||
| 		c.Close() | ||||
| 		grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err) | ||||
| 		channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err) | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| @@ -715,12 +913,27 @@ func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) tr | ||||
| func (s *Server) serveStreams(st transport.ServerTransport) { | ||||
| 	defer st.Close() | ||||
| 	var wg sync.WaitGroup | ||||
|  | ||||
| 	var roundRobinCounter uint32 | ||||
| 	st.HandleStreams(func(stream *transport.Stream) { | ||||
| 		wg.Add(1) | ||||
| 		go func() { | ||||
| 			defer wg.Done() | ||||
| 			s.handleStream(st, stream, s.traceInfo(st, stream)) | ||||
| 		}() | ||||
| 		if s.opts.numServerWorkers > 0 { | ||||
| 			data := &serverWorkerData{st: st, wg: &wg, stream: stream} | ||||
| 			select { | ||||
| 			case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data: | ||||
| 			default: | ||||
| 				// If all stream workers are busy, fall back to the default code path. | ||||
| 				go func() { | ||||
| 					s.handleStream(st, stream, s.traceInfo(st, stream)) | ||||
| 					wg.Done() | ||||
| 				}() | ||||
| 			} | ||||
| 		} else { | ||||
| 			go func() { | ||||
| 				defer wg.Done() | ||||
| 				s.handleStream(st, stream, s.traceInfo(st, stream)) | ||||
| 			}() | ||||
| 		} | ||||
| 	}, func(ctx context.Context, method string) context.Context { | ||||
| 		if !EnableTracing { | ||||
| 			return ctx | ||||
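
serveStreams now dispatches to the worker channels round-robin, falling back to a fresh goroutine when every worker is busy, so a full pool can never stall a stream. A self-contained sketch of that non-blocking select pattern with generic names (dispatch logic only, no gRPC types):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	const workers = 2
	chans := make([]chan func(), workers)
	for i := range chans {
		chans[i] = make(chan func())
		go func(ch chan func()) { // resident worker goroutine
			for job := range ch {
				job()
			}
		}(chans[i])
	}
	var wg sync.WaitGroup
	var rr uint32
	for i := 0; i < 5; i++ {
		i := i
		wg.Add(1)
		job := func() { fmt.Println("job", i); wg.Done() }
		select {
		case chans[atomic.AddUint32(&rr, 1)%workers] <- job: // a worker took it
		default:
			go job() // all workers busy: fall back to a new goroutine
		}
	}
	wg.Wait()
}
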
| @@ -755,18 +968,22 @@ var _ http.Handler = (*Server)(nil) | ||||
| // Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally | ||||
| // separate from grpc-go's HTTP/2 server. Performance and features may vary | ||||
| // between the two paths. ServeHTTP does not support some gRPC features | ||||
| // available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL | ||||
| // and subject to change. | ||||
| // available through grpc-go's HTTP/2 server. | ||||
| // | ||||
| // Experimental | ||||
| // | ||||
| // Notice: This API is EXPERIMENTAL and may be changed or removed in a | ||||
| // later release. | ||||
| func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { | ||||
| 	st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler) | ||||
| 	if err != nil { | ||||
| 		http.Error(w, err.Error(), http.StatusInternalServerError) | ||||
| 		return | ||||
| 	} | ||||
| 	if !s.addConn(st) { | ||||
| 	if !s.addConn(listenerAddressForServeHTTP, st) { | ||||
| 		return | ||||
| 	} | ||||
| 	defer s.removeConn(st) | ||||
| 	defer s.removeConn(listenerAddressForServeHTTP, st) | ||||
| 	s.serveStreams(st) | ||||
| } | ||||
|  | ||||
| @@ -794,7 +1011,7 @@ func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Strea | ||||
| 	return trInfo | ||||
| } | ||||
|  | ||||
| func (s *Server) addConn(st transport.ServerTransport) bool { | ||||
| func (s *Server) addConn(addr string, st transport.ServerTransport) bool { | ||||
| 	s.mu.Lock() | ||||
| 	defer s.mu.Unlock() | ||||
| 	if s.conns == nil { | ||||
| @@ -806,15 +1023,28 @@ func (s *Server) addConn(st transport.ServerTransport) bool { | ||||
| 		// immediately. | ||||
| 		st.Drain() | ||||
| 	} | ||||
| 	s.conns[st] = true | ||||
|  | ||||
| 	if s.conns[addr] == nil { | ||||
| 		// Create a map entry if this is the first connection on this listener. | ||||
| 		s.conns[addr] = make(map[transport.ServerTransport]bool) | ||||
| 	} | ||||
| 	s.conns[addr][st] = true | ||||
| 	return true | ||||
| } | ||||
|  | ||||
| func (s *Server) removeConn(st transport.ServerTransport) { | ||||
| func (s *Server) removeConn(addr string, st transport.ServerTransport) { | ||||
| 	s.mu.Lock() | ||||
| 	defer s.mu.Unlock() | ||||
| 	if s.conns != nil { | ||||
| 		delete(s.conns, st) | ||||
|  | ||||
| 	conns := s.conns[addr] | ||||
| 	if conns != nil { | ||||
| 		delete(conns, st) | ||||
| 		if len(conns) == 0 { | ||||
| 			// If the last connection for this address is being removed, also | ||||
| 			// remove the map entry corresponding to the address. This is used | ||||
| 			// in GracefulStop() when waiting for all connections to be closed. | ||||
| 			delete(s.conns, addr) | ||||
| 		} | ||||
| 		s.cv.Broadcast() | ||||
| 	} | ||||
| } | ||||
| @@ -844,12 +1074,12 @@ func (s *Server) incrCallsFailed() { | ||||
| func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error { | ||||
| 	data, err := encode(s.getCodec(stream.ContentSubtype()), msg) | ||||
| 	if err != nil { | ||||
| 		grpclog.Errorln("grpc: server failed to encode response: ", err) | ||||
| 		channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err) | ||||
| 		return err | ||||
| 	} | ||||
| 	compData, err := compress(data, cp, comp) | ||||
| 	if err != nil { | ||||
| 		grpclog.Errorln("grpc: server failed to compress response: ", err) | ||||
| 		channelz.Error(logger, s.channelzID, "grpc: server failed to compress response: ", err) | ||||
| 		return err | ||||
| 	} | ||||
| 	hdr, payload := msgHeader(data, compData) | ||||
| @@ -864,7 +1094,41 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str | ||||
| 	return err | ||||
| } | ||||
|  | ||||
| func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) { | ||||
| // chainUnaryServerInterceptors chains all unary server interceptors into one. | ||||
| func chainUnaryServerInterceptors(s *Server) { | ||||
| 	// Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will | ||||
| 	// be executed before any other chained interceptors. | ||||
| 	interceptors := s.opts.chainUnaryInts | ||||
| 	if s.opts.unaryInt != nil { | ||||
| 		interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...) | ||||
| 	} | ||||
|  | ||||
| 	var chainedInt UnaryServerInterceptor | ||||
| 	if len(interceptors) == 0 { | ||||
| 		chainedInt = nil | ||||
| 	} else if len(interceptors) == 1 { | ||||
| 		chainedInt = interceptors[0] | ||||
| 	} else { | ||||
| 		chainedInt = func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) { | ||||
| 			return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler)) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	s.opts.unaryInt = chainedInt | ||||
| } | ||||
|  | ||||
| // getChainUnaryHandler recursively generates the chained UnaryHandler | ||||
| func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler { | ||||
| 	if curr == len(interceptors)-1 { | ||||
| 		return finalHandler | ||||
| 	} | ||||
|  | ||||
| 	return func(ctx context.Context, req interface{}) (interface{}, error) { | ||||
| 		return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler)) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) { | ||||
| 	sh := s.opts.statsHandler | ||||
| 	if sh != nil || trInfo != nil || channelz.IsOn() { | ||||
| 		if channelz.IsOn() { | ||||
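
The recursion in getChainUnaryHandler bottoms out at the final handler and otherwise hands interceptor i a closure that invokes interceptor i+1. A stripped-down, runnable illustration of the same composition using plain string functions (handler and interceptor here are simplified stand-ins for the gRPC types):

package main

import "fmt"

type handler func(string) string
type interceptor func(string, handler) string

// chain mirrors getChainUnaryHandler: the last interceptor gets the final
// handler; every earlier one gets a closure calling its successor.
func chain(ints []interceptor, cur int, final handler) handler {
	if cur == len(ints)-1 {
		return final
	}
	return func(s string) string { return ints[cur+1](s, chain(ints, cur+1, final)) }
}

func main() {
	a := func(s string, h handler) string { return "a(" + h(s) + ")" }
	b := func(s string, h handler) string { return "b(" + h(s) + ")" }
	ints := []interceptor{a, b}
	fmt.Println(ints[0]("x", chain(ints, 0, func(s string) string { return s }))) // a(b(x))
}
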
| @@ -987,10 +1251,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. | ||||
| 	} | ||||
| 	d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp) | ||||
| 	if err != nil { | ||||
| 		if st, ok := status.FromError(err); ok { | ||||
| 			if e := t.WriteStatus(stream, st); e != nil { | ||||
| 				grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e) | ||||
| 			} | ||||
| 		if e := t.WriteStatus(stream, status.Convert(err)); e != nil { | ||||
| 			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e) | ||||
| 		} | ||||
| 		return err | ||||
| 	} | ||||
| @@ -1005,7 +1267,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. | ||||
| 			sh.HandleRPC(stream.Context(), &stats.InPayload{ | ||||
| 				RecvTime:   time.Now(), | ||||
| 				Payload:    v, | ||||
| 				WireLength: payInfo.wireLength, | ||||
| 				WireLength: payInfo.wireLength + headerLen, | ||||
| 				Data:       d, | ||||
| 				Length:     len(d), | ||||
| 			}) | ||||
| @@ -1021,7 +1283,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. | ||||
| 		return nil | ||||
| 	} | ||||
| 	ctx := NewContextWithServerTransportStream(stream.Context(), stream) | ||||
| 	reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt) | ||||
| 	reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt) | ||||
| 	if appErr != nil { | ||||
| 		appStatus, ok := status.FromError(appErr) | ||||
| 		if !ok { | ||||
| @@ -1034,7 +1296,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. | ||||
| 			trInfo.tr.SetError() | ||||
| 		} | ||||
| 		if e := t.WriteStatus(stream, appStatus); e != nil { | ||||
| 			grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e) | ||||
| 			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e) | ||||
| 		} | ||||
| 		if binlog != nil { | ||||
| 			if h, _ := stream.Header(); h.Len() > 0 { | ||||
| @@ -1061,9 +1323,9 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. | ||||
| 			// The entire stream is done (for unary RPC only). | ||||
| 			return err | ||||
| 		} | ||||
| 		if s, ok := status.FromError(err); ok { | ||||
| 			if e := t.WriteStatus(stream, s); e != nil { | ||||
| 				grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e) | ||||
| 		if sts, ok := status.FromError(err); ok { | ||||
| 			if e := t.WriteStatus(stream, sts); e != nil { | ||||
| 				channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e) | ||||
| 			} | ||||
| 		} else { | ||||
| 			switch st := err.(type) { | ||||
| @@ -1113,7 +1375,41 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. | ||||
| 	return err | ||||
| } | ||||
|  | ||||
| func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) { | ||||
| // chainStreamServerInterceptors chains all stream server interceptors into one. | ||||
| func chainStreamServerInterceptors(s *Server) { | ||||
| 	// Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will | ||||
| 	// be executed before any other chained interceptors. | ||||
| 	interceptors := s.opts.chainStreamInts | ||||
| 	if s.opts.streamInt != nil { | ||||
| 		interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...) | ||||
| 	} | ||||
|  | ||||
| 	var chainedInt StreamServerInterceptor | ||||
| 	if len(interceptors) == 0 { | ||||
| 		chainedInt = nil | ||||
| 	} else if len(interceptors) == 1 { | ||||
| 		chainedInt = interceptors[0] | ||||
| 	} else { | ||||
| 		chainedInt = func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error { | ||||
| 			return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler)) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	s.opts.streamInt = chainedInt | ||||
| } | ||||
|  | ||||
| // getChainStreamHandler recursively generates the chained StreamHandler | ||||
| func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler { | ||||
| 	if curr == len(interceptors)-1 { | ||||
| 		return finalHandler | ||||
| 	} | ||||
|  | ||||
| 	return func(srv interface{}, ss ServerStream) error { | ||||
| 		return interceptors[curr+1](srv, ss, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler)) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) { | ||||
| 	if channelz.IsOn() { | ||||
| 		s.incrCallsStarted() | ||||
| 	} | ||||
| @@ -1230,8 +1526,8 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp | ||||
| 	} | ||||
| 	var appErr error | ||||
| 	var server interface{} | ||||
| 	if srv != nil { | ||||
| 		server = srv.server | ||||
| 	if info != nil { | ||||
| 		server = info.serviceImpl | ||||
| 	} | ||||
| 	if s.opts.streamInt == nil { | ||||
| 		appErr = sd.Handler(server, ss) | ||||
| @@ -1297,7 +1593,7 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str | ||||
| 				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) | ||||
| 				trInfo.tr.SetError() | ||||
| 			} | ||||
| 			grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err) | ||||
| 			channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err) | ||||
| 		} | ||||
| 		if trInfo != nil { | ||||
| 			trInfo.tr.Finish() | ||||
| @@ -1307,13 +1603,13 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str | ||||
| 	service := sm[:pos] | ||||
| 	method := sm[pos+1:] | ||||
|  | ||||
| 	srv, knownService := s.m[service] | ||||
| 	srv, knownService := s.services[service] | ||||
| 	if knownService { | ||||
| 		if md, ok := srv.md[method]; ok { | ||||
| 		if md, ok := srv.methods[method]; ok { | ||||
| 			s.processUnaryRPC(t, stream, srv, md, trInfo) | ||||
| 			return | ||||
| 		} | ||||
| 		if sd, ok := srv.sd[method]; ok { | ||||
| 		if sd, ok := srv.streams[method]; ok { | ||||
| 			s.processStreamingRPC(t, stream, srv, sd, trInfo) | ||||
| 			return | ||||
| 		} | ||||
| @@ -1338,7 +1634,7 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str | ||||
| 			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) | ||||
| 			trInfo.tr.SetError() | ||||
| 		} | ||||
| 		grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err) | ||||
| 		channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err) | ||||
| 	} | ||||
| 	if trInfo != nil { | ||||
| 		trInfo.tr.Finish() | ||||
| @@ -1351,7 +1647,10 @@ type streamKey struct{} | ||||
| // NewContextWithServerTransportStream creates a new context from ctx and | ||||
| // attaches stream to it. | ||||
| // | ||||
| // This API is EXPERIMENTAL. | ||||
| // Experimental | ||||
| // | ||||
| // Notice: This API is EXPERIMENTAL and may be changed or removed in a | ||||
| // later release. | ||||
| func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context { | ||||
| 	return context.WithValue(ctx, streamKey{}, stream) | ||||
| } | ||||
| @@ -1363,7 +1662,10 @@ func NewContextWithServerTransportStream(ctx context.Context, stream ServerTrans | ||||
| // | ||||
| // See also NewContextWithServerTransportStream. | ||||
| // | ||||
| // This API is EXPERIMENTAL. | ||||
| // Experimental | ||||
| // | ||||
| // Notice: This type is EXPERIMENTAL and may be changed or removed in a | ||||
| // later release. | ||||
| type ServerTransportStream interface { | ||||
| 	Method() string | ||||
| 	SetHeader(md metadata.MD) error | ||||
| @@ -1375,7 +1677,10 @@ type ServerTransportStream interface { | ||||
| // ctx. Returns nil if the given context has no stream associated with it | ||||
| // (which implies it is not an RPC invocation context). | ||||
| // | ||||
| // This API is EXPERIMENTAL. | ||||
| // Experimental | ||||
| // | ||||
| // Notice: This API is EXPERIMENTAL and may be changed or removed in a | ||||
| // later release. | ||||
| func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream { | ||||
| 	s, _ := ctx.Value(streamKey{}).(ServerTransportStream) | ||||
| 	return s | ||||
| @@ -1403,7 +1708,7 @@ func (s *Server) Stop() { | ||||
| 	s.mu.Lock() | ||||
| 	listeners := s.lis | ||||
| 	s.lis = nil | ||||
| 	st := s.conns | ||||
| 	conns := s.conns | ||||
| 	s.conns = nil | ||||
| 	// interrupt GracefulStop if Stop and GracefulStop are called concurrently. | ||||
| 	s.cv.Broadcast() | ||||
| @@ -1412,8 +1717,13 @@ func (s *Server) Stop() { | ||||
| 	for lis := range listeners { | ||||
| 		lis.Close() | ||||
| 	} | ||||
| 	for c := range st { | ||||
| 		c.Close() | ||||
| 	for _, cs := range conns { | ||||
| 		for st := range cs { | ||||
| 			st.Close() | ||||
| 		} | ||||
| 	} | ||||
| 	if s.opts.numServerWorkers > 0 { | ||||
| 		s.stopServerWorkers() | ||||
| 	} | ||||
|  | ||||
| 	s.mu.Lock() | ||||
| @@ -1447,8 +1757,10 @@ func (s *Server) GracefulStop() { | ||||
| 	} | ||||
| 	s.lis = nil | ||||
| 	if !s.drain { | ||||
| 		for st := range s.conns { | ||||
| 			st.Drain() | ||||
| 		for _, conns := range s.conns { | ||||
| 			for st := range conns { | ||||
| 				st.Drain() | ||||
| 			} | ||||
| 		} | ||||
| 		s.drain = true | ||||
| 	} | ||||
|   | ||||
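
End-to-end, the per-listener conns map above is what GracefulStop now walks when draining. A usage sketch of the unchanged public API, stopping a server on SIGTERM; the address and wiring are illustrative:

package main

import (
	"net"
	"os"
	"os/signal"
	"syscall"

	"google.golang.org/grpc"
)

func main() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		panic(err)
	}
	s := grpc.NewServer()
	go func() {
		sig := make(chan os.Signal, 1)
		signal.Notify(sig, syscall.SIGTERM)
		<-sig
		s.GracefulStop() // drains every tracked transport, then waits
	}()
	_ = s.Serve(lis)
}
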