1888 lines
		
	
	
		
			58 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			1888 lines
		
	
	
		
			58 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 | 
						|
 *
 | 
						|
 * Copyright 2014 gRPC authors.
 | 
						|
 *
 | 
						|
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
 * you may not use this file except in compliance with the License.
 | 
						|
 * You may obtain a copy of the License at
 | 
						|
 *
 | 
						|
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 *
 | 
						|
 * Unless required by applicable law or agreed to in writing, software
 | 
						|
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
 * See the License for the specific language governing permissions and
 | 
						|
 * limitations under the License.
 | 
						|
 *
 | 
						|
 */
 | 
						|
 | 
						|
package grpc
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
	"math"
 | 
						|
	"net"
 | 
						|
	"net/http"
 | 
						|
	"reflect"
 | 
						|
	"runtime"
 | 
						|
	"strings"
 | 
						|
	"sync"
 | 
						|
	"sync/atomic"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"golang.org/x/net/trace"
 | 
						|
 | 
						|
	"google.golang.org/grpc/codes"
 | 
						|
	"google.golang.org/grpc/credentials"
 | 
						|
	"google.golang.org/grpc/encoding"
 | 
						|
	"google.golang.org/grpc/encoding/proto"
 | 
						|
	"google.golang.org/grpc/grpclog"
 | 
						|
	"google.golang.org/grpc/internal"
 | 
						|
	"google.golang.org/grpc/internal/binarylog"
 | 
						|
	"google.golang.org/grpc/internal/channelz"
 | 
						|
	"google.golang.org/grpc/internal/grpcrand"
 | 
						|
	"google.golang.org/grpc/internal/grpcsync"
 | 
						|
	"google.golang.org/grpc/internal/transport"
 | 
						|
	"google.golang.org/grpc/keepalive"
 | 
						|
	"google.golang.org/grpc/metadata"
 | 
						|
	"google.golang.org/grpc/peer"
 | 
						|
	"google.golang.org/grpc/stats"
 | 
						|
	"google.golang.org/grpc/status"
 | 
						|
	"google.golang.org/grpc/tap"
 | 
						|
)
 | 
						|
 | 
						|
const (
	// Default cap, in bytes, on the size of a single message the server will
	// receive (4 MiB).
	defaultServerMaxReceiveMessageSize = 4 << 20
	// Default cap, in bytes, on the size of a single message the server will
	// send.
	defaultServerMaxSendMessageSize = math.MaxInt32

	// listenerAddressForServeHTTP is the placeholder key used for connections
	// that arrive through ServeHTTP(). Server transports are tracked in a map
	// keyed on listener address: connections accepted in Serve() via Accept()
	// are stored under the real listener address, whereas ServeHTTP()
	// connections have no listener and are stored under this dummy value.
	listenerAddressForServeHTTP = "listenerAddressForServeHTTP"
)
 | 
						|
 | 
						|
func init() {
 | 
						|
	internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
 | 
						|
		return srv.opts.creds
 | 
						|
	}
 | 
						|
	internal.DrainServerTransports = func(srv *Server, addr string) {
 | 
						|
		srv.drainServerTransports(addr)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
var statusOK = status.New(codes.OK, "")
 | 
						|
var logger = grpclog.Component("core")
 | 
						|
 | 
						|
// methodHandler is the function type stored in MethodDesc.Handler. It is given
// the service implementation (srv), the call context, a dec callback and a
// unary interceptor, and returns a value and an error.
// NOTE(review): dec presumably decodes the request into its argument, and the
// returned value is presumably the reply message — confirm against callers.
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
 | 
						|
 | 
						|
// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
	MethodName string        // key under which the method is indexed at registration
	Handler    methodHandler // handler function for the method
}
 | 
						|
 | 
						|
// ServiceDesc represents an RPC service's specification. It is passed to
// RegisterService together with the service implementation.
type ServiceDesc struct {
	ServiceName string // key in the server's service map; duplicates are fatal at registration
	// The pointer to the service interface. Used to check whether the user
	// provided implementation satisfies the interface requirements.
	HandlerType interface{}
	Methods     []MethodDesc // indexed by MethodName when the service is registered
	Streams     []StreamDesc // indexed by StreamName when the service is registered
	Metadata    interface{}  // stored at registration and reported by GetServiceInfo
}
 | 
						|
 | 
						|
// serviceInfo wraps information about a service. It is very similar to
// ServiceDesc and is constructed from it for internal purposes.
type serviceInfo struct {
	// Contains the implementation for the methods in this service.
	serviceImpl interface{}
	methods     map[string]*MethodDesc // method name -> descriptor
	streams     map[string]*StreamDesc // stream name -> descriptor
	mdata       interface{}            // ServiceDesc.Metadata as given at registration
}
 | 
						|
 | 
						|
// serverWorkerData is the unit of work handed to a serverWorker goroutine over
// its channel: one stream to be handled on one server transport.
type serverWorkerData struct {
	st     transport.ServerTransport // transport passed to handleStream
	wg     *sync.WaitGroup           // Done is called once the stream has been handled
	stream *transport.Stream         // stream passed to handleStream
}
 | 
						|
 | 
						|
// Server is a gRPC server to serve RPC requests. Instances are created with
// NewServer, which initializes the maps, events and condition variable below.
type Server struct {
	opts serverOptions // configuration assembled by NewServer from the supplied options

	mu  sync.Mutex // guards following
	lis map[net.Listener]bool
	// conns contains all active server transports. It is a map keyed on a
	// listener address with the value being the set of active transports
	// belonging to that listener.
	conns    map[string]map[transport.ServerTransport]bool
	serve    bool
	drain    bool
	cv       *sync.Cond              // signaled when connections close for GracefulStop
	services map[string]*serviceInfo // service name -> service info
	events   trace.EventLog

	// quit and done are events created by NewServer.
	// NOTE(review): presumably fired when shutdown begins and completes
	// respectively — confirm against Stop/GracefulStop.
	quit               *grpcsync.Event
	done               *grpcsync.Event
	channelzRemoveOnce sync.Once
	serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

	// channelzID is the identifier returned by channelz.RegisterServer.
	channelzID *channelz.Identifier
	czData     *channelzData

	// serverWorkerChannels holds one channel per stream worker goroutine. It
	// is populated only when the numServerWorkers option is greater than zero.
	serverWorkerChannels []chan *serverWorkerData
}
 | 
						|
 | 
						|
// serverOptions is the full set of server configuration knobs. NewServer
// starts from a copy of defaultServerOptions and lets every supplied
// ServerOption mutate it; the result is stored in Server.opts.
type serverOptions struct {
	creds                 credentials.TransportCredentials
	codec                 baseCodec
	cp                    Compressor
	dc                    Decompressor
	unaryInt              UnaryServerInterceptor
	streamInt             StreamServerInterceptor
	chainUnaryInts        []UnaryServerInterceptor
	chainStreamInts       []StreamServerInterceptor
	inTapHandle           tap.ServerInHandle
	statsHandler          stats.Handler
	maxConcurrentStreams  uint32
	maxReceiveMessageSize int
	maxSendMessageSize    int
	unknownStreamDesc     *StreamDesc
	keepaliveParams       keepalive.ServerParameters
	keepalivePolicy       keepalive.EnforcementPolicy
	initialWindowSize     int32
	initialConnWindowSize int32
	writeBufferSize       int
	readBufferSize        int
	connectionTimeout     time.Duration
	maxHeaderListSize     *uint32
	headerTableSize       *uint32
	numServerWorkers      uint32
}
 | 
						|
 | 
						|
var defaultServerOptions = serverOptions{
 | 
						|
	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
 | 
						|
	maxSendMessageSize:    defaultServerMaxSendMessageSize,
 | 
						|
	connectionTimeout:     120 * time.Second,
 | 
						|
	writeBufferSize:       defaultWriteBufSize,
 | 
						|
	readBufferSize:        defaultReadBufSize,
 | 
						|
}
 | 
						|
 | 
						|
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
// NewServer applies each supplied option, in order, to a copy of the default
// server options.
type ServerOption interface {
	// apply mutates the given serverOptions in place.
	apply(*serverOptions)
}
 | 
						|
 | 
						|
// EmptyServerOption does not alter the server configuration. It can be embedded
// in another structure to build custom server options.
//
// Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type EmptyServerOption struct{}

// apply implements ServerOption and intentionally does nothing.
func (EmptyServerOption) apply(*serverOptions) {}
 | 
						|
 | 
						|
// funcServerOption wraps a function that modifies serverOptions into an
 | 
						|
// implementation of the ServerOption interface.
 | 
						|
type funcServerOption struct {
 | 
						|
	f func(*serverOptions)
 | 
						|
}
 | 
						|
 | 
						|
func (fdo *funcServerOption) apply(do *serverOptions) {
 | 
						|
	fdo.f(do)
 | 
						|
}
 | 
						|
 | 
						|
func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
 | 
						|
	return &funcServerOption{
 | 
						|
		f: f,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// WriteBufferSize determines how much data can be batched before doing a write on the wire.
 | 
						|
// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
 | 
						|
// The default value for this buffer is 32KB.
 | 
						|
// Zero will disable the write buffer such that each write will be on underlying connection.
 | 
						|
// Note: A Send call may not directly translate to a write.
 | 
						|
func WriteBufferSize(s int) ServerOption {
 | 
						|
	return newFuncServerOption(func(o *serverOptions) {
 | 
						|
		o.writeBufferSize = s
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
 | 
						|
// for one read syscall.
 | 
						|
// The default value for this buffer is 32KB.
 | 
						|
// Zero will disable read buffer for a connection so data framer can access the underlying
 | 
						|
// conn directly.
 | 
						|
func ReadBufferSize(s int) ServerOption {
 | 
						|
	return newFuncServerOption(func(o *serverOptions) {
 | 
						|
		o.readBufferSize = s
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// InitialWindowSize returns a ServerOption that sets window size for stream.
 | 
						|
// The lower bound for window size is 64K and any value smaller than that will be ignored.
 | 
						|
func InitialWindowSize(s int32) ServerOption {
 | 
						|
	return newFuncServerOption(func(o *serverOptions) {
 | 
						|
		o.initialWindowSize = s
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
 | 
						|
// The lower bound for window size is 64K and any value smaller than that will be ignored.
 | 
						|
func InitialConnWindowSize(s int32) ServerOption {
 | 
						|
	return newFuncServerOption(func(o *serverOptions) {
 | 
						|
		o.initialConnWindowSize = s
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
 | 
						|
func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
 | 
						|
	if kp.Time > 0 && kp.Time < time.Second {
 | 
						|
		logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
 | 
						|
		kp.Time = time.Second
 | 
						|
	}
 | 
						|
 | 
						|
	return newFuncServerOption(func(o *serverOptions) {
 | 
						|
		o.keepaliveParams = kp
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
 | 
						|
func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
 | 
						|
	return newFuncServerOption(func(o *serverOptions) {
 | 
						|
		o.keepalivePolicy = kep
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
 | 
						|
//
 | 
						|
// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
 | 
						|
//
 | 
						|
// Deprecated: register codecs using encoding.RegisterCodec. The server will
 | 
						|
// automatically use registered codecs based on the incoming requests' headers.
 | 
						|
// See also
 | 
						|
// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
 | 
						|
// Will be supported throughout 1.x.
 | 
						|
func CustomCodec(codec Codec) ServerOption {
 | 
						|
	return newFuncServerOption(func(o *serverOptions) {
 | 
						|
		o.codec = codec
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// ForceServerCodec returns a ServerOption that sets a codec for message
 | 
						|
// marshaling and unmarshaling.
 | 
						|
//
 | 
						|
// This will override any lookups by content-subtype for Codecs registered
 | 
						|
// with RegisterCodec.
 | 
						|
//
 | 
						|
// See Content-Type on
 | 
						|
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
 | 
						|
// more details. Also see the documentation on RegisterCodec and
 | 
						|
// CallContentSubtype for more details on the interaction between encoding.Codec
 | 
						|
// and content-subtype.
 | 
						|
//
 | 
						|
// This function is provided for advanced users; prefer to register codecs
 | 
						|
// using encoding.RegisterCodec.
 | 
						|
// The server will automatically use registered codecs based on the incoming
 | 
						|
// requests' headers. See also
 | 
						|
// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
 | 
						|
// Will be supported throughout 1.x.
 | 
						|
//
 | 
						|
// Experimental
 | 
						|
//
 | 
						|
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
 | 
						|
// later release.
 | 
						|
func ForceServerCodec(codec encoding.Codec) ServerOption {
 | 
						|
	return newFuncServerOption(func(o *serverOptions) {
 | 
						|
		o.codec = codec
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// RPCCompressor returns a ServerOption that sets a compressor for outbound
 | 
						|
// messages.  For backward compatibility, all outbound messages will be sent
 | 
						|
// using this compressor, regardless of incoming message compression.  By
 | 
						|
// default, server messages will be sent using the same compressor with which
 | 
						|
// request messages were sent.
 | 
						|
//
 | 
						|
// Deprecated: use encoding.RegisterCompressor instead. Will be supported
 | 
						|
// throughout 1.x.
 | 
						|
func RPCCompressor(cp Compressor) ServerOption {
 | 
						|
	return newFuncServerOption(func(o *serverOptions) {
 | 
						|
		o.cp = cp
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
 | 
						|
// messages.  It has higher priority than decompressors registered via
 | 
						|
// encoding.RegisterCompressor.
 | 
						|
//
 | 
						|
// Deprecated: use encoding.RegisterCompressor instead. Will be supported
 | 
						|
// throughout 1.x.
 | 
						|
func RPCDecompressor(dc Decompressor) ServerOption {
 | 
						|
	return newFuncServerOption(func(o *serverOptions) {
 | 
						|
		o.dc = dc
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
 | 
						|
// If this is not set, gRPC uses the default limit.
 | 
						|
//
 | 
						|
// Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x.
 | 
						|
func MaxMsgSize(m int) ServerOption {
 | 
						|
	return MaxRecvMsgSize(m)
 | 
						|
}
 | 
						|
 | 
						|
// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
 | 
						|
// If this is not set, gRPC uses the default 4MB.
 | 
						|
func MaxRecvMsgSize(m int) ServerOption {
 | 
						|
	return newFuncServerOption(func(o *serverOptions) {
 | 
						|
		o.maxReceiveMessageSize = m
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
 | 
						|
// If this is not set, gRPC uses the default `math.MaxInt32`.
 | 
						|
func MaxSendMsgSize(m int) ServerOption {
 | 
						|
	return newFuncServerOption(func(o *serverOptions) {
 | 
						|
		o.maxSendMessageSize = m
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
 | 
						|
// of concurrent streams to each ServerTransport.
 | 
						|
func MaxConcurrentStreams(n uint32) ServerOption {
 | 
						|
	return newFuncServerOption(func(o *serverOptions) {
 | 
						|
		o.maxConcurrentStreams = n
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// Creds returns a ServerOption that sets credentials for server connections.
 | 
						|
func Creds(c credentials.TransportCredentials) ServerOption {
 | 
						|
	return newFuncServerOption(func(o *serverOptions) {
 | 
						|
		o.creds = c
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
 | 
						|
// server. Only one unary interceptor can be installed. The construction of multiple
 | 
						|
// interceptors (e.g., chaining) can be implemented at the caller.
 | 
						|
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
 | 
						|
	return newFuncServerOption(func(o *serverOptions) {
 | 
						|
		if o.unaryInt != nil {
 | 
						|
			panic("The unary server interceptor was already set and may not be reset.")
 | 
						|
		}
 | 
						|
		o.unaryInt = i
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor
 | 
						|
// for unary RPCs. The first interceptor will be the outer most,
 | 
						|
// while the last interceptor will be the inner most wrapper around the real call.
 | 
						|
// All unary interceptors added by this method will be chained.
 | 
						|
func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
 | 
						|
	return newFuncServerOption(func(o *serverOptions) {
 | 
						|
		o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
 | 
						|
// server. Only one stream interceptor can be installed.
 | 
						|
func StreamInterceptor(i StreamServerInterceptor) ServerOption {
 | 
						|
	return newFuncServerOption(func(o *serverOptions) {
 | 
						|
		if o.streamInt != nil {
 | 
						|
			panic("The stream server interceptor was already set and may not be reset.")
 | 
						|
		}
 | 
						|
		o.streamInt = i
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
 | 
						|
// for streaming RPCs. The first interceptor will be the outer most,
 | 
						|
// while the last interceptor will be the inner most wrapper around the real call.
 | 
						|
// All stream interceptors added by this method will be chained.
 | 
						|
func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
 | 
						|
	return newFuncServerOption(func(o *serverOptions) {
 | 
						|
		o.chainStreamInts = append(o.chainStreamInts, interceptors...)
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// InTapHandle returns a ServerOption that sets the tap handle for all the server
 | 
						|
// transport to be created. Only one can be installed.
 | 
						|
//
 | 
						|
// Experimental
 | 
						|
//
 | 
						|
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
 | 
						|
// later release.
 | 
						|
func InTapHandle(h tap.ServerInHandle) ServerOption {
 | 
						|
	return newFuncServerOption(func(o *serverOptions) {
 | 
						|
		if o.inTapHandle != nil {
 | 
						|
			panic("The tap handle was already set and may not be reset.")
 | 
						|
		}
 | 
						|
		o.inTapHandle = h
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// StatsHandler returns a ServerOption that sets the stats handler for the server.
 | 
						|
func StatsHandler(h stats.Handler) ServerOption {
 | 
						|
	return newFuncServerOption(func(o *serverOptions) {
 | 
						|
		o.statsHandler = h
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// UnknownServiceHandler returns a ServerOption that allows for adding a custom
 | 
						|
// unknown service handler. The provided method is a bidi-streaming RPC service
 | 
						|
// handler that will be invoked instead of returning the "unimplemented" gRPC
 | 
						|
// error whenever a request is received for an unregistered service or method.
 | 
						|
// The handling function and stream interceptor (if set) have full access to
 | 
						|
// the ServerStream, including its Context.
 | 
						|
func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
 | 
						|
	return newFuncServerOption(func(o *serverOptions) {
 | 
						|
		o.unknownStreamDesc = &StreamDesc{
 | 
						|
			StreamName: "unknown_service_handler",
 | 
						|
			Handler:    streamHandler,
 | 
						|
			// We need to assume that the users of the streamHandler will want to use both.
 | 
						|
			ClientStreams: true,
 | 
						|
			ServerStreams: true,
 | 
						|
		}
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// ConnectionTimeout returns a ServerOption that sets the timeout for
 | 
						|
// connection establishment (up to and including HTTP/2 handshaking) for all
 | 
						|
// new connections.  If this is not set, the default is 120 seconds.  A zero or
 | 
						|
// negative value will result in an immediate timeout.
 | 
						|
//
 | 
						|
// Experimental
 | 
						|
//
 | 
						|
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
 | 
						|
// later release.
 | 
						|
func ConnectionTimeout(d time.Duration) ServerOption {
 | 
						|
	return newFuncServerOption(func(o *serverOptions) {
 | 
						|
		o.connectionTimeout = d
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
 | 
						|
// of header list that the server is prepared to accept.
 | 
						|
func MaxHeaderListSize(s uint32) ServerOption {
 | 
						|
	return newFuncServerOption(func(o *serverOptions) {
 | 
						|
		o.maxHeaderListSize = &s
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// HeaderTableSize returns a ServerOption that sets the size of dynamic
 | 
						|
// header table for stream.
 | 
						|
//
 | 
						|
// Experimental
 | 
						|
//
 | 
						|
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
 | 
						|
// later release.
 | 
						|
func HeaderTableSize(s uint32) ServerOption {
 | 
						|
	return newFuncServerOption(func(o *serverOptions) {
 | 
						|
		o.headerTableSize = &s
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// NumStreamWorkers returns a ServerOption that sets the number of worker
 | 
						|
// goroutines that should be used to process incoming streams. Setting this to
 | 
						|
// zero (default) will disable workers and spawn a new goroutine for each
 | 
						|
// stream.
 | 
						|
//
 | 
						|
// Experimental
 | 
						|
//
 | 
						|
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
 | 
						|
// later release.
 | 
						|
func NumStreamWorkers(numServerWorkers uint32) ServerOption {
 | 
						|
	// TODO: If/when this API gets stabilized (i.e. stream workers become the
 | 
						|
	// only way streams are processed), change the behavior of the zero value to
 | 
						|
	// a sane default. Preliminary experiments suggest that a value equal to the
 | 
						|
	// number of CPUs available is most performant; requires thorough testing.
 | 
						|
	return newFuncServerOption(func(o *serverOptions) {
 | 
						|
		o.numServerWorkers = numServerWorkers
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// serverWorkerResetThreshold defines how often the stack must be reset. Every
// N requests, by spawning a new goroutine in its place, a worker can reset its
// stack so that large stacks don't live in memory forever. 2^16 should allow
// each goroutine stack to live for at least a few seconds in a typical
// workload (assuming a QPS of a few thousand requests/sec).
//
// serverWorker adds grpcrand.Intn(serverWorkerResetThreshold) to this value,
// so the per-worker request count before a reset varies from worker to worker.
const serverWorkerResetThreshold = 1 << 16
 | 
						|
 | 
						|
// serverWorkers blocks on a *transport.Stream channel forever and waits for
 | 
						|
// data to be fed by serveStreams. This allows different requests to be
 | 
						|
// processed by the same goroutine, removing the need for expensive stack
 | 
						|
// re-allocations (see the runtime.morestack problem [1]).
 | 
						|
//
 | 
						|
// [1] https://github.com/golang/go/issues/18138
 | 
						|
func (s *Server) serverWorker(ch chan *serverWorkerData) {
 | 
						|
	// To make sure all server workers don't reset at the same time, choose a
 | 
						|
	// random number of iterations before resetting.
 | 
						|
	threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
 | 
						|
	for completed := 0; completed < threshold; completed++ {
 | 
						|
		data, ok := <-ch
 | 
						|
		if !ok {
 | 
						|
			return
 | 
						|
		}
 | 
						|
		s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
 | 
						|
		data.wg.Done()
 | 
						|
	}
 | 
						|
	go s.serverWorker(ch)
 | 
						|
}
 | 
						|
 | 
						|
// initServerWorkers creates worker goroutines and channels to process incoming
 | 
						|
// connections to reduce the time spent overall on runtime.morestack.
 | 
						|
func (s *Server) initServerWorkers() {
 | 
						|
	s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
 | 
						|
	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
 | 
						|
		s.serverWorkerChannels[i] = make(chan *serverWorkerData)
 | 
						|
		go s.serverWorker(s.serverWorkerChannels[i])
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) stopServerWorkers() {
 | 
						|
	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
 | 
						|
		close(s.serverWorkerChannels[i])
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// NewServer creates a gRPC server which has no service registered and has not
 | 
						|
// started to accept requests yet.
 | 
						|
func NewServer(opt ...ServerOption) *Server {
 | 
						|
	opts := defaultServerOptions
 | 
						|
	for _, o := range opt {
 | 
						|
		o.apply(&opts)
 | 
						|
	}
 | 
						|
	s := &Server{
 | 
						|
		lis:      make(map[net.Listener]bool),
 | 
						|
		opts:     opts,
 | 
						|
		conns:    make(map[string]map[transport.ServerTransport]bool),
 | 
						|
		services: make(map[string]*serviceInfo),
 | 
						|
		quit:     grpcsync.NewEvent(),
 | 
						|
		done:     grpcsync.NewEvent(),
 | 
						|
		czData:   new(channelzData),
 | 
						|
	}
 | 
						|
	chainUnaryServerInterceptors(s)
 | 
						|
	chainStreamServerInterceptors(s)
 | 
						|
	s.cv = sync.NewCond(&s.mu)
 | 
						|
	if EnableTracing {
 | 
						|
		_, file, line, _ := runtime.Caller(1)
 | 
						|
		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
 | 
						|
	}
 | 
						|
 | 
						|
	if s.opts.numServerWorkers > 0 {
 | 
						|
		s.initServerWorkers()
 | 
						|
	}
 | 
						|
 | 
						|
	s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
 | 
						|
	channelz.Info(logger, s.channelzID, "Server created")
 | 
						|
	return s
 | 
						|
}
 | 
						|
 | 
						|
// printf records an event in s's event log, unless s has been stopped.
 | 
						|
// REQUIRES s.mu is held.
 | 
						|
func (s *Server) printf(format string, a ...interface{}) {
 | 
						|
	if s.events != nil {
 | 
						|
		s.events.Printf(format, a...)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// errorf records an error in s's event log, unless s has been stopped.
 | 
						|
// REQUIRES s.mu is held.
 | 
						|
func (s *Server) errorf(format string, a ...interface{}) {
 | 
						|
	if s.events != nil {
 | 
						|
		s.events.Errorf(format, a...)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// ServiceRegistrar wraps a single method that supports service registration. It
// enables users to pass concrete types other than grpc.Server to the service
// registration methods exported by the IDL generated code. *Server satisfies
// this interface through its RegisterService method.
type ServiceRegistrar interface {
	// RegisterService registers a service and its implementation to the
	// concrete type implementing this interface.  It may not be called
	// once the server has started serving.
	// desc describes the service and its methods and handlers. impl is the
	// service implementation which is passed to the method handlers.
	RegisterService(desc *ServiceDesc, impl interface{})
}
 | 
						|
 | 
						|
// RegisterService registers a service and its implementation to the gRPC
 | 
						|
// server. It is called from the IDL generated code. This must be called before
 | 
						|
// invoking Serve. If ss is non-nil (for legacy code), its type is checked to
 | 
						|
// ensure it implements sd.HandlerType.
 | 
						|
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
 | 
						|
	if ss != nil {
 | 
						|
		ht := reflect.TypeOf(sd.HandlerType).Elem()
 | 
						|
		st := reflect.TypeOf(ss)
 | 
						|
		if !st.Implements(ht) {
 | 
						|
			logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	s.register(sd, ss)
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
 | 
						|
	s.mu.Lock()
 | 
						|
	defer s.mu.Unlock()
 | 
						|
	s.printf("RegisterService(%q)", sd.ServiceName)
 | 
						|
	if s.serve {
 | 
						|
		logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
 | 
						|
	}
 | 
						|
	if _, ok := s.services[sd.ServiceName]; ok {
 | 
						|
		logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
 | 
						|
	}
 | 
						|
	info := &serviceInfo{
 | 
						|
		serviceImpl: ss,
 | 
						|
		methods:     make(map[string]*MethodDesc),
 | 
						|
		streams:     make(map[string]*StreamDesc),
 | 
						|
		mdata:       sd.Metadata,
 | 
						|
	}
 | 
						|
	for i := range sd.Methods {
 | 
						|
		d := &sd.Methods[i]
 | 
						|
		info.methods[d.MethodName] = d
 | 
						|
	}
 | 
						|
	for i := range sd.Streams {
 | 
						|
		d := &sd.Streams[i]
 | 
						|
		info.streams[d.StreamName] = d
 | 
						|
	}
 | 
						|
	s.services[sd.ServiceName] = info
 | 
						|
}
 | 
						|
 | 
						|
// MethodInfo contains the information of an RPC including its method name and type.
// Values of this type are produced by Server.GetServiceInfo; unary methods
// are reported with both stream flags set to false.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string
	// IsClientStream indicates whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream indicates whether the RPC is a server streaming RPC.
	IsServerStream bool
}
 | 
						|
 | 
						|
// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
type ServiceInfo struct {
	// Methods lists the service's unary methods followed by its streaming
	// methods, in no particular order within each group.
	Methods []MethodInfo
	// Metadata is the metadata specified in ServiceDesc when registering service.
	Metadata interface{}
}
 | 
						|
 | 
						|
// GetServiceInfo returns a map from service names to ServiceInfo.
 | 
						|
// Service names include the package names, in the form of <package>.<service>.
 | 
						|
func (s *Server) GetServiceInfo() map[string]ServiceInfo {
 | 
						|
	ret := make(map[string]ServiceInfo)
 | 
						|
	for n, srv := range s.services {
 | 
						|
		methods := make([]MethodInfo, 0, len(srv.methods)+len(srv.streams))
 | 
						|
		for m := range srv.methods {
 | 
						|
			methods = append(methods, MethodInfo{
 | 
						|
				Name:           m,
 | 
						|
				IsClientStream: false,
 | 
						|
				IsServerStream: false,
 | 
						|
			})
 | 
						|
		}
 | 
						|
		for m, d := range srv.streams {
 | 
						|
			methods = append(methods, MethodInfo{
 | 
						|
				Name:           m,
 | 
						|
				IsClientStream: d.ClientStreams,
 | 
						|
				IsServerStream: d.ServerStreams,
 | 
						|
			})
 | 
						|
		}
 | 
						|
 | 
						|
		ret[n] = ServiceInfo{
 | 
						|
			Methods:  methods,
 | 
						|
			Metadata: srv.mdata,
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return ret
 | 
						|
}
 | 
						|
 | 
						|
// ErrServerStopped is returned when an operation is no longer permitted
// because the server has already been stopped.
var ErrServerStopped = errors.New("grpc: the server has been stopped")
 | 
						|
 | 
						|
type listenSocket struct {
 | 
						|
	net.Listener
 | 
						|
	channelzID *channelz.Identifier
 | 
						|
}
 | 
						|
 | 
						|
func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
 | 
						|
	return &channelz.SocketInternalMetric{
 | 
						|
		SocketOptions: channelz.GetSocketOption(l.Listener),
 | 
						|
		LocalAddr:     l.Listener.Addr(),
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (l *listenSocket) Close() error {
 | 
						|
	err := l.Listener.Close()
 | 
						|
	channelz.RemoveEntry(l.channelzID)
 | 
						|
	channelz.Info(logger, l.channelzID, "ListenSocket deleted")
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
// Serve accepts incoming connections on the listener lis, creating a new
 | 
						|
// ServerTransport and service goroutine for each. The service goroutines
 | 
						|
// read gRPC requests and then call the registered handlers to reply to them.
 | 
						|
// Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
 | 
						|
// this method returns.
 | 
						|
// Serve will return a non-nil error unless Stop or GracefulStop is called.
 | 
						|
func (s *Server) Serve(lis net.Listener) error {
 | 
						|
	s.mu.Lock()
 | 
						|
	s.printf("serving")
 | 
						|
	s.serve = true
 | 
						|
	if s.lis == nil {
 | 
						|
		// Serve called after Stop or GracefulStop.
 | 
						|
		s.mu.Unlock()
 | 
						|
		lis.Close()
 | 
						|
		return ErrServerStopped
 | 
						|
	}
 | 
						|
 | 
						|
	s.serveWG.Add(1)
 | 
						|
	defer func() {
 | 
						|
		s.serveWG.Done()
 | 
						|
		if s.quit.HasFired() {
 | 
						|
			// Stop or GracefulStop called; block until done and return nil.
 | 
						|
			<-s.done.Done()
 | 
						|
		}
 | 
						|
	}()
 | 
						|
 | 
						|
	ls := &listenSocket{Listener: lis}
 | 
						|
	s.lis[ls] = true
 | 
						|
 | 
						|
	defer func() {
 | 
						|
		s.mu.Lock()
 | 
						|
		if s.lis != nil && s.lis[ls] {
 | 
						|
			ls.Close()
 | 
						|
			delete(s.lis, ls)
 | 
						|
		}
 | 
						|
		s.mu.Unlock()
 | 
						|
	}()
 | 
						|
 | 
						|
	var err error
 | 
						|
	ls.channelzID, err = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
 | 
						|
	if err != nil {
 | 
						|
		s.mu.Unlock()
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	s.mu.Unlock()
 | 
						|
	channelz.Info(logger, ls.channelzID, "ListenSocket created")
 | 
						|
 | 
						|
	var tempDelay time.Duration // how long to sleep on accept failure
 | 
						|
	for {
 | 
						|
		rawConn, err := lis.Accept()
 | 
						|
		if err != nil {
 | 
						|
			if ne, ok := err.(interface {
 | 
						|
				Temporary() bool
 | 
						|
			}); ok && ne.Temporary() {
 | 
						|
				if tempDelay == 0 {
 | 
						|
					tempDelay = 5 * time.Millisecond
 | 
						|
				} else {
 | 
						|
					tempDelay *= 2
 | 
						|
				}
 | 
						|
				if max := 1 * time.Second; tempDelay > max {
 | 
						|
					tempDelay = max
 | 
						|
				}
 | 
						|
				s.mu.Lock()
 | 
						|
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
 | 
						|
				s.mu.Unlock()
 | 
						|
				timer := time.NewTimer(tempDelay)
 | 
						|
				select {
 | 
						|
				case <-timer.C:
 | 
						|
				case <-s.quit.Done():
 | 
						|
					timer.Stop()
 | 
						|
					return nil
 | 
						|
				}
 | 
						|
				continue
 | 
						|
			}
 | 
						|
			s.mu.Lock()
 | 
						|
			s.printf("done serving; Accept = %v", err)
 | 
						|
			s.mu.Unlock()
 | 
						|
 | 
						|
			if s.quit.HasFired() {
 | 
						|
				return nil
 | 
						|
			}
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		tempDelay = 0
 | 
						|
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
 | 
						|
		// loop goroutine.
 | 
						|
		//
 | 
						|
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
 | 
						|
		// s.conns before this conn can be added.
 | 
						|
		s.serveWG.Add(1)
 | 
						|
		go func() {
 | 
						|
			s.handleRawConn(lis.Addr().String(), rawConn)
 | 
						|
			s.serveWG.Done()
 | 
						|
		}()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// handleRawConn forks a goroutine to handle a just-accepted connection that
 | 
						|
// has not had any I/O performed on it yet.
 | 
						|
func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
 | 
						|
	if s.quit.HasFired() {
 | 
						|
		rawConn.Close()
 | 
						|
		return
 | 
						|
	}
 | 
						|
	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
 | 
						|
 | 
						|
	// Finish handshaking (HTTP2)
 | 
						|
	st := s.newHTTP2Transport(rawConn)
 | 
						|
	rawConn.SetDeadline(time.Time{})
 | 
						|
	if st == nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	if !s.addConn(lisAddr, st) {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	go func() {
 | 
						|
		s.serveStreams(st)
 | 
						|
		s.removeConn(lisAddr, st)
 | 
						|
	}()
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) drainServerTransports(addr string) {
 | 
						|
	s.mu.Lock()
 | 
						|
	conns := s.conns[addr]
 | 
						|
	for st := range conns {
 | 
						|
		st.Drain()
 | 
						|
	}
 | 
						|
	s.mu.Unlock()
 | 
						|
}
 | 
						|
 | 
						|
// newHTTP2Transport sets up a http/2 transport (using the
 | 
						|
// gRPC http2 server transport in transport/http2_server.go).
 | 
						|
func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
 | 
						|
	config := &transport.ServerConfig{
 | 
						|
		MaxStreams:            s.opts.maxConcurrentStreams,
 | 
						|
		ConnectionTimeout:     s.opts.connectionTimeout,
 | 
						|
		Credentials:           s.opts.creds,
 | 
						|
		InTapHandle:           s.opts.inTapHandle,
 | 
						|
		StatsHandler:          s.opts.statsHandler,
 | 
						|
		KeepaliveParams:       s.opts.keepaliveParams,
 | 
						|
		KeepalivePolicy:       s.opts.keepalivePolicy,
 | 
						|
		InitialWindowSize:     s.opts.initialWindowSize,
 | 
						|
		InitialConnWindowSize: s.opts.initialConnWindowSize,
 | 
						|
		WriteBufferSize:       s.opts.writeBufferSize,
 | 
						|
		ReadBufferSize:        s.opts.readBufferSize,
 | 
						|
		ChannelzParentID:      s.channelzID,
 | 
						|
		MaxHeaderListSize:     s.opts.maxHeaderListSize,
 | 
						|
		HeaderTableSize:       s.opts.headerTableSize,
 | 
						|
	}
 | 
						|
	st, err := transport.NewServerTransport(c, config)
 | 
						|
	if err != nil {
 | 
						|
		s.mu.Lock()
 | 
						|
		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
 | 
						|
		s.mu.Unlock()
 | 
						|
		// ErrConnDispatched means that the connection was dispatched away from
 | 
						|
		// gRPC; those connections should be left open.
 | 
						|
		if err != credentials.ErrConnDispatched {
 | 
						|
			// Don't log on ErrConnDispatched and io.EOF to prevent log spam.
 | 
						|
			if err != io.EOF {
 | 
						|
				channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
 | 
						|
			}
 | 
						|
			c.Close()
 | 
						|
		}
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	return st
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) serveStreams(st transport.ServerTransport) {
 | 
						|
	defer st.Close()
 | 
						|
	var wg sync.WaitGroup
 | 
						|
 | 
						|
	var roundRobinCounter uint32
 | 
						|
	st.HandleStreams(func(stream *transport.Stream) {
 | 
						|
		wg.Add(1)
 | 
						|
		if s.opts.numServerWorkers > 0 {
 | 
						|
			data := &serverWorkerData{st: st, wg: &wg, stream: stream}
 | 
						|
			select {
 | 
						|
			case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
 | 
						|
			default:
 | 
						|
				// If all stream workers are busy, fallback to the default code path.
 | 
						|
				go func() {
 | 
						|
					s.handleStream(st, stream, s.traceInfo(st, stream))
 | 
						|
					wg.Done()
 | 
						|
				}()
 | 
						|
			}
 | 
						|
		} else {
 | 
						|
			go func() {
 | 
						|
				defer wg.Done()
 | 
						|
				s.handleStream(st, stream, s.traceInfo(st, stream))
 | 
						|
			}()
 | 
						|
		}
 | 
						|
	}, func(ctx context.Context, method string) context.Context {
 | 
						|
		if !EnableTracing {
 | 
						|
			return ctx
 | 
						|
		}
 | 
						|
		tr := trace.New("grpc.Recv."+methodFamily(method), method)
 | 
						|
		return trace.NewContext(ctx, tr)
 | 
						|
	})
 | 
						|
	wg.Wait()
 | 
						|
}
 | 
						|
 | 
						|
var _ http.Handler = (*Server)(nil)
 | 
						|
 | 
						|
// ServeHTTP implements the Go standard library's http.Handler
 | 
						|
// interface by responding to the gRPC request r, by looking up
 | 
						|
// the requested gRPC method in the gRPC server s.
 | 
						|
//
 | 
						|
// The provided HTTP request must have arrived on an HTTP/2
 | 
						|
// connection. When using the Go standard library's server,
 | 
						|
// practically this means that the Request must also have arrived
 | 
						|
// over TLS.
 | 
						|
//
 | 
						|
// To share one port (such as 443 for https) between gRPC and an
 | 
						|
// existing http.Handler, use a root http.Handler such as:
 | 
						|
//
 | 
						|
//   if r.ProtoMajor == 2 && strings.HasPrefix(
 | 
						|
//   	r.Header.Get("Content-Type"), "application/grpc") {
 | 
						|
//   	grpcServer.ServeHTTP(w, r)
 | 
						|
//   } else {
 | 
						|
//   	yourMux.ServeHTTP(w, r)
 | 
						|
//   }
 | 
						|
//
 | 
						|
// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
 | 
						|
// separate from grpc-go's HTTP/2 server. Performance and features may vary
 | 
						|
// between the two paths. ServeHTTP does not support some gRPC features
 | 
						|
// available through grpc-go's HTTP/2 server.
 | 
						|
//
 | 
						|
// Experimental
 | 
						|
//
 | 
						|
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
 | 
						|
// later release.
 | 
						|
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 | 
						|
	st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
 | 
						|
	if err != nil {
 | 
						|
		http.Error(w, err.Error(), http.StatusInternalServerError)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	if !s.addConn(listenerAddressForServeHTTP, st) {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	defer s.removeConn(listenerAddressForServeHTTP, st)
 | 
						|
	s.serveStreams(st)
 | 
						|
}
 | 
						|
 | 
						|
// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
 | 
						|
// If tracing is not enabled, it returns nil.
 | 
						|
func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
 | 
						|
	if !EnableTracing {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	tr, ok := trace.FromContext(stream.Context())
 | 
						|
	if !ok {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	trInfo = &traceInfo{
 | 
						|
		tr: tr,
 | 
						|
		firstLine: firstLine{
 | 
						|
			client:     false,
 | 
						|
			remoteAddr: st.RemoteAddr(),
 | 
						|
		},
 | 
						|
	}
 | 
						|
	if dl, ok := stream.Context().Deadline(); ok {
 | 
						|
		trInfo.firstLine.deadline = time.Until(dl)
 | 
						|
	}
 | 
						|
	return trInfo
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
 | 
						|
	s.mu.Lock()
 | 
						|
	defer s.mu.Unlock()
 | 
						|
	if s.conns == nil {
 | 
						|
		st.Close()
 | 
						|
		return false
 | 
						|
	}
 | 
						|
	if s.drain {
 | 
						|
		// Transport added after we drained our existing conns: drain it
 | 
						|
		// immediately.
 | 
						|
		st.Drain()
 | 
						|
	}
 | 
						|
 | 
						|
	if s.conns[addr] == nil {
 | 
						|
		// Create a map entry if this is the first connection on this listener.
 | 
						|
		s.conns[addr] = make(map[transport.ServerTransport]bool)
 | 
						|
	}
 | 
						|
	s.conns[addr][st] = true
 | 
						|
	return true
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) removeConn(addr string, st transport.ServerTransport) {
 | 
						|
	s.mu.Lock()
 | 
						|
	defer s.mu.Unlock()
 | 
						|
 | 
						|
	conns := s.conns[addr]
 | 
						|
	if conns != nil {
 | 
						|
		delete(conns, st)
 | 
						|
		if len(conns) == 0 {
 | 
						|
			// If the last connection for this address is being removed, also
 | 
						|
			// remove the map entry corresponding to the address. This is used
 | 
						|
			// in GracefulStop() when waiting for all connections to be closed.
 | 
						|
			delete(s.conns, addr)
 | 
						|
		}
 | 
						|
		s.cv.Broadcast()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
 | 
						|
	return &channelz.ServerInternalMetric{
 | 
						|
		CallsStarted:             atomic.LoadInt64(&s.czData.callsStarted),
 | 
						|
		CallsSucceeded:           atomic.LoadInt64(&s.czData.callsSucceeded),
 | 
						|
		CallsFailed:              atomic.LoadInt64(&s.czData.callsFailed),
 | 
						|
		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) incrCallsStarted() {
 | 
						|
	atomic.AddInt64(&s.czData.callsStarted, 1)
 | 
						|
	atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) incrCallsSucceeded() {
 | 
						|
	atomic.AddInt64(&s.czData.callsSucceeded, 1)
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) incrCallsFailed() {
 | 
						|
	atomic.AddInt64(&s.czData.callsFailed, 1)
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
 | 
						|
	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
 | 
						|
	if err != nil {
 | 
						|
		channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err)
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	compData, err := compress(data, cp, comp)
 | 
						|
	if err != nil {
 | 
						|
		channelz.Error(logger, s.channelzID, "grpc: server failed to compress response: ", err)
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	hdr, payload := msgHeader(data, compData)
 | 
						|
	// TODO(dfawley): should we be checking len(data) instead?
 | 
						|
	if len(payload) > s.opts.maxSendMessageSize {
 | 
						|
		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
 | 
						|
	}
 | 
						|
	err = t.Write(stream, hdr, payload, opts)
 | 
						|
	if err == nil && s.opts.statsHandler != nil {
 | 
						|
		s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
 | 
						|
	}
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
// chainUnaryServerInterceptors chains all unary server interceptors into one.
 | 
						|
func chainUnaryServerInterceptors(s *Server) {
 | 
						|
	// Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
 | 
						|
	// be executed before any other chained interceptors.
 | 
						|
	interceptors := s.opts.chainUnaryInts
 | 
						|
	if s.opts.unaryInt != nil {
 | 
						|
		interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
 | 
						|
	}
 | 
						|
 | 
						|
	var chainedInt UnaryServerInterceptor
 | 
						|
	if len(interceptors) == 0 {
 | 
						|
		chainedInt = nil
 | 
						|
	} else if len(interceptors) == 1 {
 | 
						|
		chainedInt = interceptors[0]
 | 
						|
	} else {
 | 
						|
		chainedInt = chainUnaryInterceptors(interceptors)
 | 
						|
	}
 | 
						|
 | 
						|
	s.opts.unaryInt = chainedInt
 | 
						|
}
 | 
						|
 | 
						|
func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor {
 | 
						|
	return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
 | 
						|
		// the struct ensures the variables are allocated together, rather than separately, since we
 | 
						|
		// know they should be garbage collected together. This saves 1 allocation and decreases
 | 
						|
		// time/call by about 10% on the microbenchmark.
 | 
						|
		var state struct {
 | 
						|
			i    int
 | 
						|
			next UnaryHandler
 | 
						|
		}
 | 
						|
		state.next = func(ctx context.Context, req interface{}) (interface{}, error) {
 | 
						|
			if state.i == len(interceptors)-1 {
 | 
						|
				return interceptors[state.i](ctx, req, info, handler)
 | 
						|
			}
 | 
						|
			state.i++
 | 
						|
			return interceptors[state.i-1](ctx, req, info, state.next)
 | 
						|
		}
 | 
						|
		return state.next(ctx, req)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
 | 
						|
	sh := s.opts.statsHandler
 | 
						|
	if sh != nil || trInfo != nil || channelz.IsOn() {
 | 
						|
		if channelz.IsOn() {
 | 
						|
			s.incrCallsStarted()
 | 
						|
		}
 | 
						|
		var statsBegin *stats.Begin
 | 
						|
		if sh != nil {
 | 
						|
			beginTime := time.Now()
 | 
						|
			statsBegin = &stats.Begin{
 | 
						|
				BeginTime:      beginTime,
 | 
						|
				IsClientStream: false,
 | 
						|
				IsServerStream: false,
 | 
						|
			}
 | 
						|
			sh.HandleRPC(stream.Context(), statsBegin)
 | 
						|
		}
 | 
						|
		if trInfo != nil {
 | 
						|
			trInfo.tr.LazyLog(&trInfo.firstLine, false)
 | 
						|
		}
 | 
						|
		// The deferred error handling for tracing, stats handler and channelz are
 | 
						|
		// combined into one function to reduce stack usage -- a defer takes ~56-64
 | 
						|
		// bytes on the stack, so overflowing the stack will require a stack
 | 
						|
		// re-allocation, which is expensive.
 | 
						|
		//
 | 
						|
		// To maintain behavior similar to separate deferred statements, statements
 | 
						|
		// should be executed in the reverse order. That is, tracing first, stats
 | 
						|
		// handler second, and channelz last. Note that panics *within* defers will
 | 
						|
		// lead to different behavior, but that's an acceptable compromise; that
 | 
						|
		// would be undefined behavior territory anyway.
 | 
						|
		defer func() {
 | 
						|
			if trInfo != nil {
 | 
						|
				if err != nil && err != io.EOF {
 | 
						|
					trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
 | 
						|
					trInfo.tr.SetError()
 | 
						|
				}
 | 
						|
				trInfo.tr.Finish()
 | 
						|
			}
 | 
						|
 | 
						|
			if sh != nil {
 | 
						|
				end := &stats.End{
 | 
						|
					BeginTime: statsBegin.BeginTime,
 | 
						|
					EndTime:   time.Now(),
 | 
						|
				}
 | 
						|
				if err != nil && err != io.EOF {
 | 
						|
					end.Error = toRPCErr(err)
 | 
						|
				}
 | 
						|
				sh.HandleRPC(stream.Context(), end)
 | 
						|
			}
 | 
						|
 | 
						|
			if channelz.IsOn() {
 | 
						|
				if err != nil && err != io.EOF {
 | 
						|
					s.incrCallsFailed()
 | 
						|
				} else {
 | 
						|
					s.incrCallsSucceeded()
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}()
 | 
						|
	}
 | 
						|
 | 
						|
	binlog := binarylog.GetMethodLogger(stream.Method())
 | 
						|
	if binlog != nil {
 | 
						|
		ctx := stream.Context()
 | 
						|
		md, _ := metadata.FromIncomingContext(ctx)
 | 
						|
		logEntry := &binarylog.ClientHeader{
 | 
						|
			Header:     md,
 | 
						|
			MethodName: stream.Method(),
 | 
						|
			PeerAddr:   nil,
 | 
						|
		}
 | 
						|
		if deadline, ok := ctx.Deadline(); ok {
 | 
						|
			logEntry.Timeout = time.Until(deadline)
 | 
						|
			if logEntry.Timeout < 0 {
 | 
						|
				logEntry.Timeout = 0
 | 
						|
			}
 | 
						|
		}
 | 
						|
		if a := md[":authority"]; len(a) > 0 {
 | 
						|
			logEntry.Authority = a[0]
 | 
						|
		}
 | 
						|
		if peer, ok := peer.FromContext(ctx); ok {
 | 
						|
			logEntry.PeerAddr = peer.Addr
 | 
						|
		}
 | 
						|
		binlog.Log(logEntry)
 | 
						|
	}
 | 
						|
 | 
						|
	// comp and cp are used for compression.  decomp and dc are used for
 | 
						|
	// decompression.  If comp and decomp are both set, they are the same;
 | 
						|
	// however they are kept separate to ensure that at most one of the
 | 
						|
	// compressor/decompressor variable pairs are set for use later.
 | 
						|
	var comp, decomp encoding.Compressor
 | 
						|
	var cp Compressor
 | 
						|
	var dc Decompressor
 | 
						|
 | 
						|
	// If dc is set and matches the stream's compression, use it.  Otherwise, try
 | 
						|
	// to find a matching registered compressor for decomp.
 | 
						|
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
 | 
						|
		dc = s.opts.dc
 | 
						|
	} else if rc != "" && rc != encoding.Identity {
 | 
						|
		decomp = encoding.GetCompressor(rc)
 | 
						|
		if decomp == nil {
 | 
						|
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
 | 
						|
			t.WriteStatus(stream, st)
 | 
						|
			return st.Err()
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// If cp is set, use it.  Otherwise, attempt to compress the response using
 | 
						|
	// the incoming message compression method.
 | 
						|
	//
 | 
						|
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
 | 
						|
	if s.opts.cp != nil {
 | 
						|
		cp = s.opts.cp
 | 
						|
		stream.SetSendCompress(cp.Type())
 | 
						|
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
 | 
						|
		// Legacy compressor not specified; attempt to respond with same encoding.
 | 
						|
		comp = encoding.GetCompressor(rc)
 | 
						|
		if comp != nil {
 | 
						|
			stream.SetSendCompress(rc)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	var payInfo *payloadInfo
 | 
						|
	if sh != nil || binlog != nil {
 | 
						|
		payInfo = &payloadInfo{}
 | 
						|
	}
 | 
						|
	d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
 | 
						|
	if err != nil {
 | 
						|
		if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
 | 
						|
			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
 | 
						|
		}
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	if channelz.IsOn() {
 | 
						|
		t.IncrMsgRecv()
 | 
						|
	}
 | 
						|
	df := func(v interface{}) error {
 | 
						|
		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
 | 
						|
			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
 | 
						|
		}
 | 
						|
		if sh != nil {
 | 
						|
			sh.HandleRPC(stream.Context(), &stats.InPayload{
 | 
						|
				RecvTime:   time.Now(),
 | 
						|
				Payload:    v,
 | 
						|
				WireLength: payInfo.wireLength + headerLen,
 | 
						|
				Data:       d,
 | 
						|
				Length:     len(d),
 | 
						|
			})
 | 
						|
		}
 | 
						|
		if binlog != nil {
 | 
						|
			binlog.Log(&binarylog.ClientMessage{
 | 
						|
				Message: d,
 | 
						|
			})
 | 
						|
		}
 | 
						|
		if trInfo != nil {
 | 
						|
			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
 | 
						|
		}
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
 | 
						|
	reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
 | 
						|
	if appErr != nil {
 | 
						|
		appStatus, ok := status.FromError(appErr)
 | 
						|
		if !ok {
 | 
						|
			// Convert non-status application error to a status error with code
 | 
						|
			// Unknown, but handle context errors specifically.
 | 
						|
			appStatus = status.FromContextError(appErr)
 | 
						|
			appErr = appStatus.Err()
 | 
						|
		}
 | 
						|
		if trInfo != nil {
 | 
						|
			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
 | 
						|
			trInfo.tr.SetError()
 | 
						|
		}
 | 
						|
		if e := t.WriteStatus(stream, appStatus); e != nil {
 | 
						|
			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
 | 
						|
		}
 | 
						|
		if binlog != nil {
 | 
						|
			if h, _ := stream.Header(); h.Len() > 0 {
 | 
						|
				// Only log serverHeader if there was header. Otherwise it can
 | 
						|
				// be trailer only.
 | 
						|
				binlog.Log(&binarylog.ServerHeader{
 | 
						|
					Header: h,
 | 
						|
				})
 | 
						|
			}
 | 
						|
			binlog.Log(&binarylog.ServerTrailer{
 | 
						|
				Trailer: stream.Trailer(),
 | 
						|
				Err:     appErr,
 | 
						|
			})
 | 
						|
		}
 | 
						|
		return appErr
 | 
						|
	}
 | 
						|
	if trInfo != nil {
 | 
						|
		trInfo.tr.LazyLog(stringer("OK"), false)
 | 
						|
	}
 | 
						|
	opts := &transport.Options{Last: true}
 | 
						|
 | 
						|
	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
 | 
						|
		if err == io.EOF {
 | 
						|
			// The entire stream is done (for unary RPC only).
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		if sts, ok := status.FromError(err); ok {
 | 
						|
			if e := t.WriteStatus(stream, sts); e != nil {
 | 
						|
				channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
 | 
						|
			}
 | 
						|
		} else {
 | 
						|
			switch st := err.(type) {
 | 
						|
			case transport.ConnectionError:
 | 
						|
				// Nothing to do here.
 | 
						|
			default:
 | 
						|
				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
 | 
						|
			}
 | 
						|
		}
 | 
						|
		if binlog != nil {
 | 
						|
			h, _ := stream.Header()
 | 
						|
			binlog.Log(&binarylog.ServerHeader{
 | 
						|
				Header: h,
 | 
						|
			})
 | 
						|
			binlog.Log(&binarylog.ServerTrailer{
 | 
						|
				Trailer: stream.Trailer(),
 | 
						|
				Err:     appErr,
 | 
						|
			})
 | 
						|
		}
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	if binlog != nil {
 | 
						|
		h, _ := stream.Header()
 | 
						|
		binlog.Log(&binarylog.ServerHeader{
 | 
						|
			Header: h,
 | 
						|
		})
 | 
						|
		binlog.Log(&binarylog.ServerMessage{
 | 
						|
			Message: reply,
 | 
						|
		})
 | 
						|
	}
 | 
						|
	if channelz.IsOn() {
 | 
						|
		t.IncrMsgSent()
 | 
						|
	}
 | 
						|
	if trInfo != nil {
 | 
						|
		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
 | 
						|
	}
 | 
						|
	// TODO: Should we be logging if writing status failed here, like above?
 | 
						|
	// Should the logging be in WriteStatus?  Should we ignore the WriteStatus
 | 
						|
	// error or allow the stats handler to see it?
 | 
						|
	err = t.WriteStatus(stream, statusOK)
 | 
						|
	if binlog != nil {
 | 
						|
		binlog.Log(&binarylog.ServerTrailer{
 | 
						|
			Trailer: stream.Trailer(),
 | 
						|
			Err:     appErr,
 | 
						|
		})
 | 
						|
	}
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
// chainStreamServerInterceptors chains all stream server interceptors into one.
 | 
						|
func chainStreamServerInterceptors(s *Server) {
 | 
						|
	// Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
 | 
						|
	// be executed before any other chained interceptors.
 | 
						|
	interceptors := s.opts.chainStreamInts
 | 
						|
	if s.opts.streamInt != nil {
 | 
						|
		interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
 | 
						|
	}
 | 
						|
 | 
						|
	var chainedInt StreamServerInterceptor
 | 
						|
	if len(interceptors) == 0 {
 | 
						|
		chainedInt = nil
 | 
						|
	} else if len(interceptors) == 1 {
 | 
						|
		chainedInt = interceptors[0]
 | 
						|
	} else {
 | 
						|
		chainedInt = chainStreamInterceptors(interceptors)
 | 
						|
	}
 | 
						|
 | 
						|
	s.opts.streamInt = chainedInt
 | 
						|
}
 | 
						|
 | 
						|
func chainStreamInterceptors(interceptors []StreamServerInterceptor) StreamServerInterceptor {
 | 
						|
	return func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
 | 
						|
		// the struct ensures the variables are allocated together, rather than separately, since we
 | 
						|
		// know they should be garbage collected together. This saves 1 allocation and decreases
 | 
						|
		// time/call by about 10% on the microbenchmark.
 | 
						|
		var state struct {
 | 
						|
			i    int
 | 
						|
			next StreamHandler
 | 
						|
		}
 | 
						|
		state.next = func(srv interface{}, ss ServerStream) error {
 | 
						|
			if state.i == len(interceptors)-1 {
 | 
						|
				return interceptors[state.i](srv, ss, info, handler)
 | 
						|
			}
 | 
						|
			state.i++
 | 
						|
			return interceptors[state.i-1](srv, ss, info, state.next)
 | 
						|
		}
 | 
						|
		return state.next(srv, ss)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
 | 
						|
	if channelz.IsOn() {
 | 
						|
		s.incrCallsStarted()
 | 
						|
	}
 | 
						|
	sh := s.opts.statsHandler
 | 
						|
	var statsBegin *stats.Begin
 | 
						|
	if sh != nil {
 | 
						|
		beginTime := time.Now()
 | 
						|
		statsBegin = &stats.Begin{
 | 
						|
			BeginTime:      beginTime,
 | 
						|
			IsClientStream: sd.ClientStreams,
 | 
						|
			IsServerStream: sd.ServerStreams,
 | 
						|
		}
 | 
						|
		sh.HandleRPC(stream.Context(), statsBegin)
 | 
						|
	}
 | 
						|
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
 | 
						|
	ss := &serverStream{
 | 
						|
		ctx:                   ctx,
 | 
						|
		t:                     t,
 | 
						|
		s:                     stream,
 | 
						|
		p:                     &parser{r: stream},
 | 
						|
		codec:                 s.getCodec(stream.ContentSubtype()),
 | 
						|
		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
 | 
						|
		maxSendMessageSize:    s.opts.maxSendMessageSize,
 | 
						|
		trInfo:                trInfo,
 | 
						|
		statsHandler:          sh,
 | 
						|
	}
 | 
						|
 | 
						|
	if sh != nil || trInfo != nil || channelz.IsOn() {
 | 
						|
		// See comment in processUnaryRPC on defers.
 | 
						|
		defer func() {
 | 
						|
			if trInfo != nil {
 | 
						|
				ss.mu.Lock()
 | 
						|
				if err != nil && err != io.EOF {
 | 
						|
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
 | 
						|
					ss.trInfo.tr.SetError()
 | 
						|
				}
 | 
						|
				ss.trInfo.tr.Finish()
 | 
						|
				ss.trInfo.tr = nil
 | 
						|
				ss.mu.Unlock()
 | 
						|
			}
 | 
						|
 | 
						|
			if sh != nil {
 | 
						|
				end := &stats.End{
 | 
						|
					BeginTime: statsBegin.BeginTime,
 | 
						|
					EndTime:   time.Now(),
 | 
						|
				}
 | 
						|
				if err != nil && err != io.EOF {
 | 
						|
					end.Error = toRPCErr(err)
 | 
						|
				}
 | 
						|
				sh.HandleRPC(stream.Context(), end)
 | 
						|
			}
 | 
						|
 | 
						|
			if channelz.IsOn() {
 | 
						|
				if err != nil && err != io.EOF {
 | 
						|
					s.incrCallsFailed()
 | 
						|
				} else {
 | 
						|
					s.incrCallsSucceeded()
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}()
 | 
						|
	}
 | 
						|
 | 
						|
	ss.binlog = binarylog.GetMethodLogger(stream.Method())
 | 
						|
	if ss.binlog != nil {
 | 
						|
		md, _ := metadata.FromIncomingContext(ctx)
 | 
						|
		logEntry := &binarylog.ClientHeader{
 | 
						|
			Header:     md,
 | 
						|
			MethodName: stream.Method(),
 | 
						|
			PeerAddr:   nil,
 | 
						|
		}
 | 
						|
		if deadline, ok := ctx.Deadline(); ok {
 | 
						|
			logEntry.Timeout = time.Until(deadline)
 | 
						|
			if logEntry.Timeout < 0 {
 | 
						|
				logEntry.Timeout = 0
 | 
						|
			}
 | 
						|
		}
 | 
						|
		if a := md[":authority"]; len(a) > 0 {
 | 
						|
			logEntry.Authority = a[0]
 | 
						|
		}
 | 
						|
		if peer, ok := peer.FromContext(ss.Context()); ok {
 | 
						|
			logEntry.PeerAddr = peer.Addr
 | 
						|
		}
 | 
						|
		ss.binlog.Log(logEntry)
 | 
						|
	}
 | 
						|
 | 
						|
	// If dc is set and matches the stream's compression, use it.  Otherwise, try
 | 
						|
	// to find a matching registered compressor for decomp.
 | 
						|
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
 | 
						|
		ss.dc = s.opts.dc
 | 
						|
	} else if rc != "" && rc != encoding.Identity {
 | 
						|
		ss.decomp = encoding.GetCompressor(rc)
 | 
						|
		if ss.decomp == nil {
 | 
						|
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
 | 
						|
			t.WriteStatus(ss.s, st)
 | 
						|
			return st.Err()
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// If cp is set, use it.  Otherwise, attempt to compress the response using
 | 
						|
	// the incoming message compression method.
 | 
						|
	//
 | 
						|
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
 | 
						|
	if s.opts.cp != nil {
 | 
						|
		ss.cp = s.opts.cp
 | 
						|
		stream.SetSendCompress(s.opts.cp.Type())
 | 
						|
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
 | 
						|
		// Legacy compressor not specified; attempt to respond with same encoding.
 | 
						|
		ss.comp = encoding.GetCompressor(rc)
 | 
						|
		if ss.comp != nil {
 | 
						|
			stream.SetSendCompress(rc)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	ss.ctx = newContextWithRPCInfo(ss.ctx, false, ss.codec, ss.cp, ss.comp)
 | 
						|
 | 
						|
	if trInfo != nil {
 | 
						|
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
 | 
						|
	}
 | 
						|
	var appErr error
 | 
						|
	var server interface{}
 | 
						|
	if info != nil {
 | 
						|
		server = info.serviceImpl
 | 
						|
	}
 | 
						|
	if s.opts.streamInt == nil {
 | 
						|
		appErr = sd.Handler(server, ss)
 | 
						|
	} else {
 | 
						|
		info := &StreamServerInfo{
 | 
						|
			FullMethod:     stream.Method(),
 | 
						|
			IsClientStream: sd.ClientStreams,
 | 
						|
			IsServerStream: sd.ServerStreams,
 | 
						|
		}
 | 
						|
		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
 | 
						|
	}
 | 
						|
	if appErr != nil {
 | 
						|
		appStatus, ok := status.FromError(appErr)
 | 
						|
		if !ok {
 | 
						|
			// Convert non-status application error to a status error with code
 | 
						|
			// Unknown, but handle context errors specifically.
 | 
						|
			appStatus = status.FromContextError(appErr)
 | 
						|
			appErr = appStatus.Err()
 | 
						|
		}
 | 
						|
		if trInfo != nil {
 | 
						|
			ss.mu.Lock()
 | 
						|
			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
 | 
						|
			ss.trInfo.tr.SetError()
 | 
						|
			ss.mu.Unlock()
 | 
						|
		}
 | 
						|
		t.WriteStatus(ss.s, appStatus)
 | 
						|
		if ss.binlog != nil {
 | 
						|
			ss.binlog.Log(&binarylog.ServerTrailer{
 | 
						|
				Trailer: ss.s.Trailer(),
 | 
						|
				Err:     appErr,
 | 
						|
			})
 | 
						|
		}
 | 
						|
		// TODO: Should we log an error from WriteStatus here and below?
 | 
						|
		return appErr
 | 
						|
	}
 | 
						|
	if trInfo != nil {
 | 
						|
		ss.mu.Lock()
 | 
						|
		ss.trInfo.tr.LazyLog(stringer("OK"), false)
 | 
						|
		ss.mu.Unlock()
 | 
						|
	}
 | 
						|
	err = t.WriteStatus(ss.s, statusOK)
 | 
						|
	if ss.binlog != nil {
 | 
						|
		ss.binlog.Log(&binarylog.ServerTrailer{
 | 
						|
			Trailer: ss.s.Trailer(),
 | 
						|
			Err:     appErr,
 | 
						|
		})
 | 
						|
	}
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
 | 
						|
	sm := stream.Method()
 | 
						|
	if sm != "" && sm[0] == '/' {
 | 
						|
		sm = sm[1:]
 | 
						|
	}
 | 
						|
	pos := strings.LastIndex(sm, "/")
 | 
						|
	if pos == -1 {
 | 
						|
		if trInfo != nil {
 | 
						|
			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
 | 
						|
			trInfo.tr.SetError()
 | 
						|
		}
 | 
						|
		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
 | 
						|
		if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
 | 
						|
			if trInfo != nil {
 | 
						|
				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
 | 
						|
				trInfo.tr.SetError()
 | 
						|
			}
 | 
						|
			channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
 | 
						|
		}
 | 
						|
		if trInfo != nil {
 | 
						|
			trInfo.tr.Finish()
 | 
						|
		}
 | 
						|
		return
 | 
						|
	}
 | 
						|
	service := sm[:pos]
 | 
						|
	method := sm[pos+1:]
 | 
						|
 | 
						|
	srv, knownService := s.services[service]
 | 
						|
	if knownService {
 | 
						|
		if md, ok := srv.methods[method]; ok {
 | 
						|
			s.processUnaryRPC(t, stream, srv, md, trInfo)
 | 
						|
			return
 | 
						|
		}
 | 
						|
		if sd, ok := srv.streams[method]; ok {
 | 
						|
			s.processStreamingRPC(t, stream, srv, sd, trInfo)
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}
 | 
						|
	// Unknown service, or known server unknown method.
 | 
						|
	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
 | 
						|
		s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	var errDesc string
 | 
						|
	if !knownService {
 | 
						|
		errDesc = fmt.Sprintf("unknown service %v", service)
 | 
						|
	} else {
 | 
						|
		errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
 | 
						|
	}
 | 
						|
	if trInfo != nil {
 | 
						|
		trInfo.tr.LazyPrintf("%s", errDesc)
 | 
						|
		trInfo.tr.SetError()
 | 
						|
	}
 | 
						|
	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
 | 
						|
		if trInfo != nil {
 | 
						|
			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
 | 
						|
			trInfo.tr.SetError()
 | 
						|
		}
 | 
						|
		channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
 | 
						|
	}
 | 
						|
	if trInfo != nil {
 | 
						|
		trInfo.tr.Finish()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// streamKey is the context key under which the ServerTransportStream of an
// RPC is stored. Being an unexported type, it cannot collide with context keys
// defined by other packages.
type streamKey struct{}
 | 
						|
 | 
						|
// NewContextWithServerTransportStream creates a new context from ctx and
 | 
						|
// attaches stream to it.
 | 
						|
//
 | 
						|
// Experimental
 | 
						|
//
 | 
						|
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
 | 
						|
// later release.
 | 
						|
func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
 | 
						|
	return context.WithValue(ctx, streamKey{}, stream)
 | 
						|
}
 | 
						|
 | 
						|
// ServerTransportStream is a minimal interface that a transport stream must
 | 
						|
// implement. This can be used to mock an actual transport stream for tests of
 | 
						|
// handler code that use, for example, grpc.SetHeader (which requires some
 | 
						|
// stream to be in context).
 | 
						|
//
 | 
						|
// See also NewContextWithServerTransportStream.
 | 
						|
//
 | 
						|
// Experimental
 | 
						|
//
 | 
						|
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
 | 
						|
// later release.
 | 
						|
type ServerTransportStream interface {
 | 
						|
	Method() string
 | 
						|
	SetHeader(md metadata.MD) error
 | 
						|
	SendHeader(md metadata.MD) error
 | 
						|
	SetTrailer(md metadata.MD) error
 | 
						|
}
 | 
						|
 | 
						|
// ServerTransportStreamFromContext returns the ServerTransportStream saved in
 | 
						|
// ctx. Returns nil if the given context has no stream associated with it
 | 
						|
// (which implies it is not an RPC invocation context).
 | 
						|
//
 | 
						|
// Experimental
 | 
						|
//
 | 
						|
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
 | 
						|
// later release.
 | 
						|
func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
 | 
						|
	s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
 | 
						|
	return s
 | 
						|
}
 | 
						|
 | 
						|
// Stop stops the gRPC server. It immediately closes all open
 | 
						|
// connections and listeners.
 | 
						|
// It cancels all active RPCs on the server side and the corresponding
 | 
						|
// pending RPCs on the client side will get notified by connection
 | 
						|
// errors.
 | 
						|
func (s *Server) Stop() {
 | 
						|
	s.quit.Fire()
 | 
						|
 | 
						|
	defer func() {
 | 
						|
		s.serveWG.Wait()
 | 
						|
		s.done.Fire()
 | 
						|
	}()
 | 
						|
 | 
						|
	s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })
 | 
						|
 | 
						|
	s.mu.Lock()
 | 
						|
	listeners := s.lis
 | 
						|
	s.lis = nil
 | 
						|
	conns := s.conns
 | 
						|
	s.conns = nil
 | 
						|
	// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
 | 
						|
	s.cv.Broadcast()
 | 
						|
	s.mu.Unlock()
 | 
						|
 | 
						|
	for lis := range listeners {
 | 
						|
		lis.Close()
 | 
						|
	}
 | 
						|
	for _, cs := range conns {
 | 
						|
		for st := range cs {
 | 
						|
			st.Close()
 | 
						|
		}
 | 
						|
	}
 | 
						|
	if s.opts.numServerWorkers > 0 {
 | 
						|
		s.stopServerWorkers()
 | 
						|
	}
 | 
						|
 | 
						|
	s.mu.Lock()
 | 
						|
	if s.events != nil {
 | 
						|
		s.events.Finish()
 | 
						|
		s.events = nil
 | 
						|
	}
 | 
						|
	s.mu.Unlock()
 | 
						|
}
 | 
						|
 | 
						|
// GracefulStop stops the gRPC server gracefully. It stops the server from
 | 
						|
// accepting new connections and RPCs and blocks until all the pending RPCs are
 | 
						|
// finished.
 | 
						|
func (s *Server) GracefulStop() {
 | 
						|
	s.quit.Fire()
 | 
						|
	defer s.done.Fire()
 | 
						|
 | 
						|
	s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })
 | 
						|
	s.mu.Lock()
 | 
						|
	if s.conns == nil {
 | 
						|
		s.mu.Unlock()
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	for lis := range s.lis {
 | 
						|
		lis.Close()
 | 
						|
	}
 | 
						|
	s.lis = nil
 | 
						|
	if !s.drain {
 | 
						|
		for _, conns := range s.conns {
 | 
						|
			for st := range conns {
 | 
						|
				st.Drain()
 | 
						|
			}
 | 
						|
		}
 | 
						|
		s.drain = true
 | 
						|
	}
 | 
						|
 | 
						|
	// Wait for serving threads to be ready to exit.  Only then can we be sure no
 | 
						|
	// new conns will be created.
 | 
						|
	s.mu.Unlock()
 | 
						|
	s.serveWG.Wait()
 | 
						|
	s.mu.Lock()
 | 
						|
 | 
						|
	for len(s.conns) != 0 {
 | 
						|
		s.cv.Wait()
 | 
						|
	}
 | 
						|
	s.conns = nil
 | 
						|
	if s.events != nil {
 | 
						|
		s.events.Finish()
 | 
						|
		s.events = nil
 | 
						|
	}
 | 
						|
	s.mu.Unlock()
 | 
						|
}
 | 
						|
 | 
						|
// contentSubtype must be lowercase
 | 
						|
// cannot return nil
 | 
						|
func (s *Server) getCodec(contentSubtype string) baseCodec {
 | 
						|
	if s.opts.codec != nil {
 | 
						|
		return s.opts.codec
 | 
						|
	}
 | 
						|
	if contentSubtype == "" {
 | 
						|
		return encoding.GetCodec(proto.Name)
 | 
						|
	}
 | 
						|
	codec := encoding.GetCodec(contentSubtype)
 | 
						|
	if codec == nil {
 | 
						|
		return encoding.GetCodec(proto.Name)
 | 
						|
	}
 | 
						|
	return codec
 | 
						|
}
 | 
						|
 | 
						|
// SetHeader sets the header metadata to be sent from the server to the client.
 | 
						|
// The context provided must be the context passed to the server's handler.
 | 
						|
//
 | 
						|
// Streaming RPCs should prefer the SetHeader method of the ServerStream.
 | 
						|
//
 | 
						|
// When called multiple times, all the provided metadata will be merged.  All
 | 
						|
// the metadata will be sent out when one of the following happens:
 | 
						|
//
 | 
						|
// - grpc.SendHeader is called, or for streaming handlers, stream.SendHeader.
 | 
						|
// - The first response message is sent.  For unary handlers, this occurs when
 | 
						|
//   the handler returns; for streaming handlers, this can happen when stream's
 | 
						|
//   SendMsg method is called.
 | 
						|
// - An RPC status is sent out (error or success).  This occurs when the handler
 | 
						|
//   returns.
 | 
						|
//
 | 
						|
// SetHeader will fail if called after any of the events above.
 | 
						|
//
 | 
						|
// The error returned is compatible with the status package.  However, the
 | 
						|
// status code will often not match the RPC status as seen by the client
 | 
						|
// application, and therefore, should not be relied upon for this purpose.
 | 
						|
func SetHeader(ctx context.Context, md metadata.MD) error {
 | 
						|
	if md.Len() == 0 {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	stream := ServerTransportStreamFromContext(ctx)
 | 
						|
	if stream == nil {
 | 
						|
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
 | 
						|
	}
 | 
						|
	return stream.SetHeader(md)
 | 
						|
}
 | 
						|
 | 
						|
// SendHeader sends header metadata. It may be called at most once, and may not
 | 
						|
// be called after any event that causes headers to be sent (see SetHeader for
 | 
						|
// a complete list).  The provided md and headers set by SetHeader() will be
 | 
						|
// sent.
 | 
						|
//
 | 
						|
// The error returned is compatible with the status package.  However, the
 | 
						|
// status code will often not match the RPC status as seen by the client
 | 
						|
// application, and therefore, should not be relied upon for this purpose.
 | 
						|
func SendHeader(ctx context.Context, md metadata.MD) error {
 | 
						|
	stream := ServerTransportStreamFromContext(ctx)
 | 
						|
	if stream == nil {
 | 
						|
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
 | 
						|
	}
 | 
						|
	if err := stream.SendHeader(md); err != nil {
 | 
						|
		return toRPCErr(err)
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
 | 
						|
// When called more than once, all the provided metadata will be merged.
 | 
						|
//
 | 
						|
// The error returned is compatible with the status package.  However, the
 | 
						|
// status code will often not match the RPC status as seen by the client
 | 
						|
// application, and therefore, should not be relied upon for this purpose.
 | 
						|
func SetTrailer(ctx context.Context, md metadata.MD) error {
 | 
						|
	if md.Len() == 0 {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	stream := ServerTransportStreamFromContext(ctx)
 | 
						|
	if stream == nil {
 | 
						|
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
 | 
						|
	}
 | 
						|
	return stream.SetTrailer(md)
 | 
						|
}
 | 
						|
 | 
						|
// Method returns the method string for the server context.  The returned
 | 
						|
// string is in the format of "/service/method".
 | 
						|
func Method(ctx context.Context) (string, bool) {
 | 
						|
	s := ServerTransportStreamFromContext(ctx)
 | 
						|
	if s == nil {
 | 
						|
		return "", false
 | 
						|
	}
 | 
						|
	return s.Method(), true
 | 
						|
}
 | 
						|
 | 
						|
type channelzServer struct {
 | 
						|
	s *Server
 | 
						|
}
 | 
						|
 | 
						|
func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
 | 
						|
	return c.s.channelzMetric()
 | 
						|
}
 |