Bump gRPC to v1.7.4

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
This commit is contained in:
Sebastiaan van Stijn 2017-12-05 21:20:38 -08:00
parent 1eea02c23b
commit ce3e32680d
No known key found for this signature in database
GPG Key ID: 76698F39D527CE8C
8 changed files with 86 additions and 62 deletions

View File

@ -25,7 +25,7 @@ github.com/pmezard/go-difflib v1.0.0
github.com/containerd/fifo fbfb6a11ec671efbe94ad1c12c2e98773f19e1e6 github.com/containerd/fifo fbfb6a11ec671efbe94ad1c12c2e98773f19e1e6
github.com/urfave/cli 7bc6a0acffa589f415f88aca16cc1de5ffd66f9c github.com/urfave/cli 7bc6a0acffa589f415f88aca16cc1de5ffd66f9c
golang.org/x/net 7dcfb8076726a3fdd9353b6b8a1f1b6be6811bd6 golang.org/x/net 7dcfb8076726a3fdd9353b6b8a1f1b6be6811bd6
google.golang.org/grpc v1.7.2 google.golang.org/grpc v1.7.4
github.com/pkg/errors v0.8.0 github.com/pkg/errors v0.8.0
github.com/opencontainers/go-digest 21dfd564fd89c944783d00d069f33e3e7123c448 github.com/opencontainers/go-digest 21dfd564fd89c944783d00d069f33e3e7123c448
golang.org/x/sys 314a259e304ff91bd6985da2a7149bbf91237993 https://github.com/golang/sys golang.org/x/sys 314a259e304ff91bd6985da2a7149bbf91237993 https://github.com/golang/sys

View File

@ -171,7 +171,9 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
return nil, err return nil, err
} }
acbw := &acBalancerWrapper{ac: ac} acbw := &acBalancerWrapper{ac: ac}
ac.mu.Lock()
ac.acbw = acbw ac.acbw = acbw
ac.mu.Unlock()
return acbw, nil return acbw, nil
} }
@ -228,7 +230,9 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
return return
} }
acbw.ac = ac acbw.ac = ac
ac.mu.Lock()
ac.acbw = acbw ac.acbw = acbw
ac.mu.Unlock()
if acState != connectivity.Idle { if acState != connectivity.Idle {
ac.connect(false) ac.connect(false)
} }

View File

@ -929,6 +929,16 @@ func (ac *addrConn) resetTransport() error {
newTransport, err := transport.NewClientTransport(ac.cc.ctx, sinfo, copts, timeout) newTransport, err := transport.NewClientTransport(ac.cc.ctx, sinfo, copts, timeout)
if err != nil { if err != nil {
if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() { if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
ac.mu.Lock()
if ac.state != connectivity.Shutdown {
ac.state = connectivity.TransientFailure
if ac.cc.balancerWrapper != nil {
ac.cc.balancerWrapper.handleSubConnStateChange(ac.acbw, ac.state)
} else {
ac.cc.csMgr.updateState(ac.state)
}
}
ac.mu.Unlock()
return err return err
} }
grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, addr) grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, addr)

View File

@ -91,10 +91,14 @@ type TransportCredentials interface {
// (io.EOF, context.DeadlineExceeded or err.Temporary() == true). // (io.EOF, context.DeadlineExceeded or err.Temporary() == true).
// If the returned error is a wrapper error, implementations should make sure that // If the returned error is a wrapper error, implementations should make sure that
// the error implements Temporary() to have the correct retry behaviors. // the error implements Temporary() to have the correct retry behaviors.
//
// If the returned net.Conn is closed, it MUST close the net.Conn provided.
ClientHandshake(context.Context, string, net.Conn) (net.Conn, AuthInfo, error) ClientHandshake(context.Context, string, net.Conn) (net.Conn, AuthInfo, error)
// ServerHandshake does the authentication handshake for servers. It returns // ServerHandshake does the authentication handshake for servers. It returns
// the authenticated connection and the corresponding auth information about // the authenticated connection and the corresponding auth information about
// the connection. // the connection.
//
// If the returned net.Conn is closed, it MUST close the net.Conn provided.
ServerHandshake(net.Conn) (net.Conn, AuthInfo, error) ServerHandshake(net.Conn) (net.Conn, AuthInfo, error)
// Info provides the ProtocolInfo of this TransportCredentials. // Info provides the ProtocolInfo of this TransportCredentials.
Info() ProtocolInfo Info() ProtocolInfo

View File

@ -567,6 +567,6 @@ const SupportPackageIsVersion3 = true
const SupportPackageIsVersion4 = true const SupportPackageIsVersion4 = true
// Version is the current grpc version. // Version is the current grpc version.
const Version = "1.7.2" const Version = "1.7.4"
const grpcUA = "grpc-go/" + Version const grpcUA = "grpc-go/" + Version

View File

@ -118,11 +118,13 @@ type options struct {
initialConnWindowSize int32 initialConnWindowSize int32
writeBufferSize int writeBufferSize int
readBufferSize int readBufferSize int
connectionTimeout time.Duration
} }
var defaultServerOptions = options{ var defaultServerOptions = options{
maxReceiveMessageSize: defaultServerMaxReceiveMessageSize, maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
maxSendMessageSize: defaultServerMaxSendMessageSize, maxSendMessageSize: defaultServerMaxSendMessageSize,
connectionTimeout: 120 * time.Second,
} }
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc. // A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
@ -291,6 +293,16 @@ func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
} }
} }
// ConnectionTimeout returns a ServerOption that sets the timeout for
// connection establishment (up to and including HTTP/2 handshaking) for all
// new connections. If this is not set, the default is 120 seconds. A zero or
// negative value will result in an immediate timeout.
func ConnectionTimeout(d time.Duration) ServerOption {
return func(o *options) {
o.connectionTimeout = d
}
}
// NewServer creates a gRPC server which has no service registered and has not // NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet. // started to accept requests yet.
func NewServer(opt ...ServerOption) *Server { func NewServer(opt ...ServerOption) *Server {
@ -499,16 +511,18 @@ func (s *Server) Serve(lis net.Listener) error {
// handleRawConn is run in its own goroutine and handles a just-accepted // handleRawConn is run in its own goroutine and handles a just-accepted
// connection that has not had any I/O performed on it yet. // connection that has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) { func (s *Server) handleRawConn(rawConn net.Conn) {
rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
conn, authInfo, err := s.useTransportAuthenticator(rawConn) conn, authInfo, err := s.useTransportAuthenticator(rawConn)
if err != nil { if err != nil {
s.mu.Lock() s.mu.Lock()
s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err) s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
s.mu.Unlock() s.mu.Unlock()
grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err) grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
// If serverHandShake returns ErrConnDispatched, keep rawConn open. // If serverHandshake returns ErrConnDispatched, keep rawConn open.
if err != credentials.ErrConnDispatched { if err != credentials.ErrConnDispatched {
rawConn.Close() rawConn.Close()
} }
rawConn.SetDeadline(time.Time{})
return return
} }
@ -521,18 +535,21 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
s.mu.Unlock() s.mu.Unlock()
if s.opts.useHandlerImpl { if s.opts.useHandlerImpl {
rawConn.SetDeadline(time.Time{})
s.serveUsingHandler(conn) s.serveUsingHandler(conn)
} else { } else {
s.serveHTTP2Transport(conn, authInfo) st := s.newHTTP2Transport(conn, authInfo)
if st == nil {
return
}
rawConn.SetDeadline(time.Time{})
s.serveStreams(st)
} }
} }
// serveHTTP2Transport sets up a http/2 transport (using the // newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go) and // gRPC http2 server transport in transport/http2_server.go).
// serves streams on it. func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
// This is run in its own goroutine (it does network I/O in
// transport.NewServerTransport).
func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
config := &transport.ServerConfig{ config := &transport.ServerConfig{
MaxStreams: s.opts.maxConcurrentStreams, MaxStreams: s.opts.maxConcurrentStreams,
AuthInfo: authInfo, AuthInfo: authInfo,
@ -552,13 +569,13 @@ func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo)
s.mu.Unlock() s.mu.Unlock()
c.Close() c.Close()
grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err) grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
return return nil
} }
if !s.addConn(st) { if !s.addConn(st) {
st.Close() st.Close()
return return nil
} }
s.serveStreams(st) return st
} }
func (s *Server) serveStreams(st transport.ServerTransport) { func (s *Server) serveStreams(st transport.ServerTransport) {

View File

@ -123,10 +123,9 @@ type serverHandlerTransport struct {
// when WriteStatus is called. // when WriteStatus is called.
writes chan func() writes chan func()
mu sync.Mutex // block concurrent WriteStatus calls
// streamDone indicates whether WriteStatus has been called and writes channel // e.g. grpc/(*serverStream).SendMsg/RecvMsg
// has been closed. writeStatusMu sync.Mutex
streamDone bool
} }
func (ht *serverHandlerTransport) Close() error { func (ht *serverHandlerTransport) Close() error {
@ -177,13 +176,9 @@ func (ht *serverHandlerTransport) do(fn func()) error {
} }
func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error { func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error {
ht.mu.Lock() ht.writeStatusMu.Lock()
if ht.streamDone { defer ht.writeStatusMu.Unlock()
ht.mu.Unlock()
return nil
}
ht.streamDone = true
ht.mu.Unlock()
err := ht.do(func() { err := ht.do(func() {
ht.writeCommonHeaders(s) ht.writeCommonHeaders(s)
@ -222,7 +217,11 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
} }
} }
}) })
close(ht.writes)
if err == nil { // transport has not been closed
ht.Close()
close(ht.writes)
}
return err return err
} }

View File

@ -152,12 +152,12 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
Val: uint32(iwz)}) Val: uint32(iwz)})
} }
if err := framer.fr.WriteSettings(isettings...); err != nil { if err := framer.fr.WriteSettings(isettings...); err != nil {
return nil, connectionErrorf(true, err, "transport: %v", err) return nil, connectionErrorf(false, err, "transport: %v", err)
} }
// Adjust the connection flow control window if needed. // Adjust the connection flow control window if needed.
if delta := uint32(icwz - defaultWindowSize); delta > 0 { if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := framer.fr.WriteWindowUpdate(0, delta); err != nil { if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
return nil, connectionErrorf(true, err, "transport: %v", err) return nil, connectionErrorf(false, err, "transport: %v", err)
} }
} }
kp := config.KeepaliveParams kp := config.KeepaliveParams
@ -223,6 +223,31 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
t.stats.HandleConn(t.ctx, connBegin) t.stats.HandleConn(t.ctx, connBegin)
} }
t.framer.writer.Flush() t.framer.writer.Flush()
// Check the validity of client preface.
preface := make([]byte, len(clientPreface))
if _, err := io.ReadFull(t.conn, preface); err != nil {
return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
}
if !bytes.Equal(preface, clientPreface) {
return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
}
frame, err := t.framer.fr.ReadFrame()
if err == io.EOF || err == io.ErrUnexpectedEOF {
t.Close()
return
}
if err != nil {
return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
}
atomic.StoreUint32(&t.activity, 1)
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
}
t.handleSettings(sf)
go func() { go func() {
loopyWriter(t.ctx, t.controlBuf, t.itemHandler) loopyWriter(t.ctx, t.controlBuf, t.itemHandler)
t.Close() t.Close()
@ -354,41 +379,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
// typically run in a separate goroutine. // typically run in a separate goroutine.
// traceCtx attaches trace to ctx and returns the new context. // traceCtx attaches trace to ctx and returns the new context.
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) { func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
// Check the validity of client preface.
preface := make([]byte, len(clientPreface))
if _, err := io.ReadFull(t.conn, preface); err != nil {
// Only log if it isn't a simple tcp accept check (ie: tcp balancer doing open/close socket)
if err != io.EOF {
errorf("transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
}
t.Close()
return
}
if !bytes.Equal(preface, clientPreface) {
errorf("transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
t.Close()
return
}
frame, err := t.framer.fr.ReadFrame()
if err == io.EOF || err == io.ErrUnexpectedEOF {
t.Close()
return
}
if err != nil {
errorf("transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
t.Close()
return
}
atomic.StoreUint32(&t.activity, 1)
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
errorf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
t.Close()
return
}
t.handleSettings(sf)
for { for {
frame, err := t.framer.fr.ReadFrame() frame, err := t.framer.fr.ReadFrame()
atomic.StoreUint32(&t.activity, 1) atomic.StoreUint32(&t.activity, 1)