vendor: update grpc to 1.10.1

Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
Stephen J Day
2018-03-05 17:43:48 -08:00
parent 076cf7f16e
commit b5cbe7b590
52 changed files with 4516 additions and 2202 deletions

View File

@@ -32,11 +32,15 @@ import (
"sync"
"time"
"io/ioutil"
"golang.org/x/net/context"
"golang.org/x/net/http2"
"golang.org/x/net/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/proto"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/keepalive"
@@ -89,18 +93,20 @@ type Server struct {
conns map[io.Closer]bool
serve bool
drain bool
ctx context.Context
cancel context.CancelFunc
// A CondVar to let GracefulStop() blocks until all the pending RPCs are finished
// and all the transport goes away.
cv *sync.Cond
cv *sync.Cond // signaled when connections close for GracefulStop
m map[string]*service // service name -> service info
events trace.EventLog
quit chan struct{}
done chan struct{}
quitOnce sync.Once
doneOnce sync.Once
serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
}
type options struct {
creds credentials.TransportCredentials
codec Codec
codec baseCodec
cp Compressor
dc Decompressor
unaryInt UnaryServerInterceptor
@@ -177,20 +183,32 @@ func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
}
// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
//
// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
func CustomCodec(codec Codec) ServerOption {
return func(o *options) {
o.codec = codec
}
}
// RPCCompressor returns a ServerOption that sets a compressor for outbound messages.
// RPCCompressor returns a ServerOption that sets a compressor for outbound
// messages. For backward compatibility, all outbound messages will be sent
// using this compressor, regardless of incoming message compression. By
// default, server messages will be sent using the same compressor with which
// request messages were sent.
//
// Deprecated: use encoding.RegisterCompressor instead.
func RPCCompressor(cp Compressor) ServerOption {
return func(o *options) {
o.cp = cp
}
}
// RPCDecompressor returns a ServerOption that sets a decompressor for inbound messages.
// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
// messages. It has higher priority than decompressors registered via
// encoding.RegisterCompressor.
//
// Deprecated: use encoding.RegisterCompressor instead.
func RPCDecompressor(dc Decompressor) ServerOption {
return func(o *options) {
o.dc = dc
@@ -297,6 +315,8 @@ func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
// connection establishment (up to and including HTTP/2 handshaking) for all
// new connections. If this is not set, the default is 120 seconds. A zero or
// negative value will result in an immediate timeout.
//
// This API is EXPERIMENTAL.
func ConnectionTimeout(d time.Duration) ServerOption {
return func(o *options) {
o.connectionTimeout = d
@@ -310,18 +330,15 @@ func NewServer(opt ...ServerOption) *Server {
for _, o := range opt {
o(&opts)
}
if opts.codec == nil {
// Set the default codec.
opts.codec = protoCodec{}
}
s := &Server{
lis: make(map[net.Listener]bool),
opts: opts,
conns: make(map[io.Closer]bool),
m: make(map[string]*service),
quit: make(chan struct{}),
done: make(chan struct{}),
}
s.cv = sync.NewCond(&s.mu)
s.ctx, s.cancel = context.WithCancel(context.Background())
if EnableTracing {
_, file, line, _ := runtime.Caller(1)
s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
@@ -430,11 +447,9 @@ func (s *Server) GetServiceInfo() map[string]ServiceInfo {
return ret
}
var (
// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped.
ErrServerStopped = errors.New("grpc: the server has been stopped")
)
// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped.
var ErrServerStopped = errors.New("grpc: the server has been stopped")
func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
if s.opts.creds == nil {
@@ -448,16 +463,29 @@ func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credenti
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
// Serve always returns non-nil error.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock()
s.printf("serving")
s.serve = true
if s.lis == nil {
// Serve called after Stop or GracefulStop.
s.mu.Unlock()
lis.Close()
return ErrServerStopped
}
s.serveWG.Add(1)
defer func() {
s.serveWG.Done()
select {
// Stop or GracefulStop called; block until done and return nil.
case <-s.quit:
<-s.done
default:
}
}()
s.lis[lis] = true
s.mu.Unlock()
defer func() {
@@ -491,25 +519,39 @@ func (s *Server) Serve(lis net.Listener) error {
timer := time.NewTimer(tempDelay)
select {
case <-timer.C:
case <-s.ctx.Done():
case <-s.quit:
timer.Stop()
return nil
}
timer.Stop()
continue
}
s.mu.Lock()
s.printf("done serving; Accept = %v", err)
s.mu.Unlock()
select {
case <-s.quit:
return nil
default:
}
return err
}
tempDelay = 0
// Start a new goroutine to deal with rawConn
// so we don't stall this Accept loop goroutine.
go s.handleRawConn(rawConn)
// Start a new goroutine to deal with rawConn so we don't stall this Accept
// loop goroutine.
//
// Make sure we account for the goroutine so GracefulStop doesn't nil out
// s.conns before this conn can be added.
s.serveWG.Add(1)
go func() {
s.handleRawConn(rawConn)
s.serveWG.Done()
}()
}
}
// handleRawConn is run in its own goroutine and handles a just-accepted
// connection that has not had any I/O performed on it yet.
// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) {
rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
@@ -534,17 +576,28 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
}
s.mu.Unlock()
var serve func()
c := conn.(io.Closer)
if s.opts.useHandlerImpl {
rawConn.SetDeadline(time.Time{})
s.serveUsingHandler(conn)
serve = func() { s.serveUsingHandler(conn) }
} else {
// Finish handshaking (HTTP2)
st := s.newHTTP2Transport(conn, authInfo)
if st == nil {
return
}
rawConn.SetDeadline(time.Time{})
s.serveStreams(st)
c = st
serve = func() { s.serveStreams(st) }
}
rawConn.SetDeadline(time.Time{})
if !s.addConn(c) {
return
}
go func() {
serve()
s.removeConn(c)
}()
}
// newHTTP2Transport sets up a http/2 transport (using the
@@ -571,15 +624,10 @@ func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) tr
grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
return nil
}
if !s.addConn(st) {
st.Close()
return nil
}
return st
}
func (s *Server) serveStreams(st transport.ServerTransport) {
defer s.removeConn(st)
defer st.Close()
var wg sync.WaitGroup
st.HandleStreams(func(stream *transport.Stream) {
@@ -613,11 +661,6 @@ var _ http.Handler = (*Server)(nil)
//
// conn is the *tls.Conn that's already been authenticated.
func (s *Server) serveUsingHandler(conn net.Conn) {
if !s.addConn(conn) {
conn.Close()
return
}
defer s.removeConn(conn)
h2s := &http2.Server{
MaxConcurrentStreams: s.opts.maxConcurrentStreams,
}
@@ -651,13 +694,12 @@ func (s *Server) serveUsingHandler(conn net.Conn) {
// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
// and subject to change.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
st, err := transport.NewServerHandlerTransport(w, r)
st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if !s.addConn(st) {
st.Close()
return
}
defer s.removeConn(st)
@@ -687,9 +729,15 @@ func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Strea
func (s *Server) addConn(c io.Closer) bool {
s.mu.Lock()
defer s.mu.Unlock()
if s.conns == nil || s.drain {
if s.conns == nil {
c.Close()
return false
}
if s.drain {
// Transport added after we drained our existing conns: drain it
// immediately.
c.(transport.ServerTransport).Drain()
}
s.conns[c] = true
return true
}
@@ -703,18 +751,14 @@ func (s *Server) removeConn(c io.Closer) {
}
}
func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options) error {
func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
var (
cbuf *bytes.Buffer
outPayload *stats.OutPayload
)
if cp != nil {
cbuf = new(bytes.Buffer)
}
if s.opts.statsHandler != nil {
outPayload = &stats.OutPayload{}
}
hdr, data, err := encode(s.opts.codec, msg, cp, cbuf, outPayload)
hdr, data, err := encode(s.getCodec(stream.ContentSubtype()), msg, cp, outPayload, comp)
if err != nil {
grpclog.Errorln("grpc: server failed to encode response: ", err)
return err
@@ -758,10 +802,43 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
}()
}
if s.opts.cp != nil {
// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
stream.SetSendCompress(s.opts.cp.Type())
// comp and cp are used for compression. decomp and dc are used for
// decompression. If comp and decomp are both set, they are the same;
// however they are kept separate to ensure that at most one of the
// compressor/decompressor variable pairs are set for use later.
var comp, decomp encoding.Compressor
var cp Compressor
var dc Decompressor
// If dc is set and matches the stream's compression, use it. Otherwise, try
// to find a matching registered compressor for decomp.
if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
dc = s.opts.dc
} else if rc != "" && rc != encoding.Identity {
decomp = encoding.GetCompressor(rc)
if decomp == nil {
st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
t.WriteStatus(stream, st)
return st.Err()
}
}
// If cp is set, use it. Otherwise, attempt to compress the response using
// the incoming message compression method.
//
// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
if s.opts.cp != nil {
cp = s.opts.cp
stream.SetSendCompress(cp.Type())
} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
// Legacy compressor not specified; attempt to respond with same encoding.
comp = encoding.GetCompressor(rc)
if comp != nil {
stream.SetSendCompress(rc)
}
}
p := &parser{r: stream}
pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize)
if err == io.EOF {
@@ -769,7 +846,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
return err
}
if err == io.ErrUnexpectedEOF {
err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
}
if err != nil {
if st, ok := status.FromError(err); ok {
@@ -790,19 +867,11 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
return err
}
if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
if st, ok := status.FromError(err); ok {
if e := t.WriteStatus(stream, st); e != nil {
grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
return err
}
if e := t.WriteStatus(stream, status.New(codes.Internal, err.Error())); e != nil {
if st := checkRecvPayload(pf, stream.RecvCompress(), dc != nil || decomp != nil); st != nil {
if e := t.WriteStatus(stream, st); e != nil {
grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
// TODO checkRecvPayload always return RPC error. Add a return here if necessary.
return st.Err()
}
var inPayload *stats.InPayload
if sh != nil {
@@ -816,9 +885,17 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
if pf == compressionMade {
var err error
req, err = s.opts.dc.Do(bytes.NewReader(req))
if err != nil {
return Errorf(codes.Internal, err.Error())
if dc != nil {
req, err = dc.Do(bytes.NewReader(req))
if err != nil {
return status.Errorf(codes.Internal, err.Error())
}
} else {
tmp, _ := decomp.Decompress(bytes.NewReader(req))
req, err = ioutil.ReadAll(tmp)
if err != nil {
return status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
}
}
}
if len(req) > s.opts.maxReceiveMessageSize {
@@ -826,7 +903,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
// java implementation.
return status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(req), s.opts.maxReceiveMessageSize)
}
if err := s.opts.codec.Unmarshal(req, v); err != nil {
if err := s.getCodec(stream.ContentSubtype()).Unmarshal(req, v); err != nil {
return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
}
if inPayload != nil {
@@ -864,7 +941,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Last: true,
Delay: false,
}
if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
if err == io.EOF {
// The entire stream is done (for unary RPC only).
return err
@@ -913,21 +991,45 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
sh.HandleRPC(stream.Context(), end)
}()
}
if s.opts.cp != nil {
stream.SetSendCompress(s.opts.cp.Type())
}
ss := &serverStream{
t: t,
s: stream,
p: &parser{r: stream},
codec: s.opts.codec,
cp: s.opts.cp,
dc: s.opts.dc,
codec: s.getCodec(stream.ContentSubtype()),
maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
maxSendMessageSize: s.opts.maxSendMessageSize,
trInfo: trInfo,
statsHandler: sh,
}
// If dc is set and matches the stream's compression, use it. Otherwise, try
// to find a matching registered compressor for decomp.
if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
ss.dc = s.opts.dc
} else if rc != "" && rc != encoding.Identity {
ss.decomp = encoding.GetCompressor(rc)
if ss.decomp == nil {
st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
t.WriteStatus(ss.s, st)
return st.Err()
}
}
// If cp is set, use it. Otherwise, attempt to compress the response using
// the incoming message compression method.
//
// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
if s.opts.cp != nil {
ss.cp = s.opts.cp
stream.SetSendCompress(s.opts.cp.Type())
} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
// Legacy compressor not specified; attempt to respond with same encoding.
ss.comp = encoding.GetCompressor(rc)
if ss.comp != nil {
stream.SetSendCompress(rc)
}
}
if trInfo != nil {
trInfo.tr.LazyLog(&trInfo.firstLine, false)
defer func() {
@@ -1071,6 +1173,17 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
// pending RPCs on the client side will get notified by connection
// errors.
func (s *Server) Stop() {
s.quitOnce.Do(func() {
close(s.quit)
})
defer func() {
s.serveWG.Wait()
s.doneOnce.Do(func() {
close(s.done)
})
}()
s.mu.Lock()
listeners := s.lis
s.lis = nil
@@ -1088,7 +1201,6 @@ func (s *Server) Stop() {
}
s.mu.Lock()
s.cancel()
if s.events != nil {
s.events.Finish()
s.events = nil
@@ -1100,22 +1212,38 @@ func (s *Server) Stop() {
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
func (s *Server) GracefulStop() {
s.quitOnce.Do(func() {
close(s.quit)
})
defer func() {
s.doneOnce.Do(func() {
close(s.done)
})
}()
s.mu.Lock()
defer s.mu.Unlock()
if s.conns == nil {
s.mu.Unlock()
return
}
for lis := range s.lis {
lis.Close()
}
s.lis = nil
s.cancel()
if !s.drain {
for c := range s.conns {
c.(transport.ServerTransport).Drain()
}
s.drain = true
}
// Wait for serving threads to be ready to exit. Only then can we be sure no
// new conns will be created.
s.mu.Unlock()
s.serveWG.Wait()
s.mu.Lock()
for len(s.conns) != 0 {
s.cv.Wait()
}
@@ -1124,26 +1252,29 @@ func (s *Server) GracefulStop() {
s.events.Finish()
s.events = nil
}
s.mu.Unlock()
}
func init() {
internal.TestingCloseConns = func(arg interface{}) {
arg.(*Server).testingCloseConns()
}
internal.TestingUseHandlerImpl = func(arg interface{}) {
arg.(*Server).opts.useHandlerImpl = true
}
}
// testingCloseConns closes all existing transports but keeps s.lis
// accepting new connections.
func (s *Server) testingCloseConns() {
s.mu.Lock()
for c := range s.conns {
c.Close()
delete(s.conns, c)
// contentSubtype must be lowercase
// cannot return nil
func (s *Server) getCodec(contentSubtype string) baseCodec {
if s.opts.codec != nil {
return s.opts.codec
}
s.mu.Unlock()
if contentSubtype == "" {
return encoding.GetCodec(proto.Name)
}
codec := encoding.GetCodec(contentSubtype)
if codec == nil {
return encoding.GetCodec(proto.Name)
}
return codec
}
// SetHeader sets the header metadata.
@@ -1158,7 +1289,7 @@ func SetHeader(ctx context.Context, md metadata.MD) error {
}
stream, ok := transport.StreamFromContext(ctx)
if !ok {
return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
}
return stream.SetHeader(md)
}
@@ -1168,7 +1299,7 @@ func SetHeader(ctx context.Context, md metadata.MD) error {
func SendHeader(ctx context.Context, md metadata.MD) error {
stream, ok := transport.StreamFromContext(ctx)
if !ok {
return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
}
t := stream.ServerTransport()
if t == nil {
@@ -1188,7 +1319,7 @@ func SetTrailer(ctx context.Context, md metadata.MD) error {
}
stream, ok := transport.StreamFromContext(ctx)
if !ok {
return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
}
return stream.SetTrailer(md)
}