vendor: update grpc dependencies

Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
Stephen J Day
2017-05-25 16:52:15 -07:00
parent 26183b3a69
commit b626757d06
117 changed files with 40053 additions and 1867 deletions

View File

@@ -37,7 +37,6 @@ import (
"bytes"
"errors"
"io"
"math"
"sync"
"time"
@@ -46,6 +45,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
"google.golang.org/grpc/transport"
)
@@ -107,11 +107,18 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
var (
t transport.ClientTransport
s *transport.Stream
put func()
t transport.ClientTransport
s *transport.Stream
put func()
cancel context.CancelFunc
)
c := defaultCallInfo
if mc, ok := cc.getMethodConfig(method); ok {
c.failFast = !mc.WaitForReady
if mc.Timeout > 0 {
ctx, cancel = context.WithTimeout(ctx, mc.Timeout)
}
}
for _, o := range opts {
if err := o.before(&c); err != nil {
return nil, toRPCErr(err)
@@ -144,23 +151,25 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
}
}()
}
if stats.On() {
ctx = stats.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})
ctx = newContextWithRPCInfo(ctx)
sh := cc.dopts.copts.StatsHandler
if sh != nil {
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})
begin := &stats.Begin{
Client: true,
BeginTime: time.Now(),
FailFast: c.failFast,
}
stats.HandleRPC(ctx, begin)
sh.HandleRPC(ctx, begin)
}
defer func() {
if err != nil && stats.On() {
if err != nil && sh != nil {
// Only handle end stats if err != nil.
end := &stats.End{
Client: true,
Error: err,
}
stats.HandleRPC(ctx, end)
sh.HandleRPC(ctx, end)
}
}()
gopts := BalancerGetOptions{
@@ -170,7 +179,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
t, put, err = cc.getTransport(ctx, gopts)
if err != nil {
// TODO(zhaoq): Probably revisit the error handling.
if _, ok := err.(*rpcError); ok {
if _, ok := status.FromError(err); ok {
return nil, err
}
if err == errConnClosing || err == errConnUnavailable {
@@ -185,14 +194,17 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
s, err = t.NewStream(ctx, callHdr)
if err != nil {
if _, ok := err.(transport.ConnectionError); ok && put != nil {
// If error is connection error, transport was sending data on wire,
// and we are not sure if anything has been sent on wire.
// If error is not connection error, we are sure nothing has been sent.
updateRPCInfoInContext(ctx, rpcInfo{bytesSent: true, bytesReceived: false})
}
if put != nil {
put()
put = nil
}
if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
if c.failFast {
return nil, toRPCErr(err)
}
if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast {
continue
}
return nil, toRPCErr(err)
@@ -200,12 +212,14 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
break
}
cs := &clientStream{
opts: opts,
c: c,
desc: desc,
codec: cc.dopts.codec,
cp: cc.dopts.cp,
dc: cc.dopts.dc,
opts: opts,
c: c,
desc: desc,
codec: cc.dopts.codec,
cp: cc.dopts.cp,
dc: cc.dopts.dc,
maxMsgSize: cc.dopts.maxMsgSize,
cancel: cancel,
put: put,
t: t,
@@ -215,7 +229,8 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
tracing: EnableTracing,
trInfo: trInfo,
statsCtx: ctx,
statsCtx: ctx,
statsHandler: cc.dopts.copts.StatsHandler,
}
if cc.dopts.cp != nil {
cs.cbuf = new(bytes.Buffer)
@@ -226,14 +241,13 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
select {
case <-t.Error():
// Incur transport error, simply exit.
case <-cc.ctx.Done():
cs.finish(ErrClientConnClosing)
cs.closeTransportStream(ErrClientConnClosing)
case <-s.Done():
// TODO: The trace of the RPC is terminated here when there is no pending
// I/O, which is probably not the optimal solution.
if s.StatusCode() == codes.OK {
cs.finish(nil)
} else {
cs.finish(Errorf(s.StatusCode(), "%s", s.StatusDesc()))
}
cs.finish(s.Status().Err())
cs.closeTransportStream(nil)
case <-s.GoAway():
cs.finish(errConnDrain)
@@ -249,22 +263,25 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
// clientStream implements a client side Stream.
type clientStream struct {
opts []CallOption
c callInfo
t transport.ClientTransport
s *transport.Stream
p *parser
desc *StreamDesc
codec Codec
cp Compressor
cbuf *bytes.Buffer
dc Decompressor
opts []CallOption
c callInfo
t transport.ClientTransport
s *transport.Stream
p *parser
desc *StreamDesc
codec Codec
cp Compressor
cbuf *bytes.Buffer
dc Decompressor
maxMsgSize int
cancel context.CancelFunc
tracing bool // set to EnableTracing when the clientStream is created.
mu sync.Mutex
put func()
closed bool
mu sync.Mutex
put func()
closed bool
finished bool
// trInfo.tr is set when the clientStream is created (if EnableTracing is true),
// and is set to nil when the clientStream's finish method is called.
trInfo traceInfo
@@ -272,7 +289,8 @@ type clientStream struct {
// statsCtx keeps the user context for stats handling.
// All stats collection should use the statsCtx (instead of the stream context)
// so that all the generated stats for a particular RPC can be associated in the processing phase.
statsCtx context.Context
statsCtx context.Context
statsHandler stats.Handler
}
func (cs *clientStream) Context() context.Context {
@@ -326,7 +344,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
err = toRPCErr(err)
}()
var outPayload *stats.OutPayload
if stats.On() {
if cs.statsHandler != nil {
outPayload = &stats.OutPayload{
Client: true,
}
@@ -343,34 +361,19 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
err = cs.t.Write(cs.s, out, &transport.Options{Last: false})
if err == nil && outPayload != nil {
outPayload.SentTime = time.Now()
stats.HandleRPC(cs.statsCtx, outPayload)
cs.statsHandler.HandleRPC(cs.statsCtx, outPayload)
}
return err
}
func (cs *clientStream) RecvMsg(m interface{}) (err error) {
defer func() {
if err != nil && stats.On() {
// Only generate End if err != nil.
// If err == nil, it's not the last RecvMsg.
// The last RecvMsg gets either an RPC error or io.EOF.
end := &stats.End{
Client: true,
EndTime: time.Now(),
}
if err != io.EOF {
end.Error = toRPCErr(err)
}
stats.HandleRPC(cs.statsCtx, end)
}
}()
var inPayload *stats.InPayload
if stats.On() {
if cs.statsHandler != nil {
inPayload = &stats.InPayload{
Client: true,
}
}
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, inPayload)
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, cs.maxMsgSize, inPayload)
defer func() {
// err != nil indicates the termination of the stream.
if err != nil {
@@ -386,24 +389,24 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
cs.mu.Unlock()
}
if inPayload != nil {
stats.HandleRPC(cs.statsCtx, inPayload)
cs.statsHandler.HandleRPC(cs.statsCtx, inPayload)
}
if !cs.desc.ClientStreams || cs.desc.ServerStreams {
return
}
// Special handling for client streaming rpc.
// This recv expects EOF or errors, so we don't collect inPayload.
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, nil)
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, cs.maxMsgSize, nil)
cs.closeTransportStream(err)
if err == nil {
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
}
if err == io.EOF {
if cs.s.StatusCode() == codes.OK {
cs.finish(err)
return nil
if se := cs.s.Status().Err(); se != nil {
return se
}
return Errorf(cs.s.StatusCode(), "%s", cs.s.StatusDesc())
cs.finish(err)
return nil
}
return toRPCErr(err)
}
@@ -411,11 +414,11 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
cs.closeTransportStream(err)
}
if err == io.EOF {
if cs.s.StatusCode() == codes.OK {
// Returns io.EOF to indicate the end of the stream.
return
if statusErr := cs.s.Status().Err(); statusErr != nil {
return statusErr
}
return Errorf(cs.s.StatusCode(), "%s", cs.s.StatusDesc())
// Returns io.EOF to indicate the end of the stream.
return
}
return toRPCErr(err)
}
@@ -451,13 +454,37 @@ func (cs *clientStream) closeTransportStream(err error) {
func (cs *clientStream) finish(err error) {
cs.mu.Lock()
defer cs.mu.Unlock()
if cs.finished {
return
}
cs.finished = true
defer func() {
if cs.cancel != nil {
cs.cancel()
}
}()
for _, o := range cs.opts {
o.after(&cs.c)
}
if cs.put != nil {
updateRPCInfoInContext(cs.s.Context(), rpcInfo{
bytesSent: cs.s.BytesSent(),
bytesReceived: cs.s.BytesReceived(),
})
cs.put()
cs.put = nil
}
if cs.statsHandler != nil {
end := &stats.End{
Client: true,
EndTime: time.Now(),
}
if err != io.EOF {
// end.Error is nil if the RPC finished successfully.
end.Error = toRPCErr(err)
}
cs.statsHandler.HandleRPC(cs.statsCtx, end)
}
if !cs.tracing {
return
}
@@ -502,10 +529,10 @@ type serverStream struct {
dc Decompressor
cbuf *bytes.Buffer
maxMsgSize int
statusCode codes.Code
statusDesc string
trInfo *traceInfo
statsHandler stats.Handler
mu sync.Mutex // protects trInfo.tr after the service handler runs.
}
@@ -548,7 +575,7 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
}
}()
var outPayload *stats.OutPayload
if stats.On() {
if ss.statsHandler != nil {
outPayload = &stats.OutPayload{}
}
out, err := encode(ss.codec, m, ss.cp, ss.cbuf, outPayload)
@@ -566,7 +593,7 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
}
if outPayload != nil {
outPayload.SentTime = time.Now()
stats.HandleRPC(ss.s.Context(), outPayload)
ss.statsHandler.HandleRPC(ss.s.Context(), outPayload)
}
return nil
}
@@ -587,7 +614,7 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
}
}()
var inPayload *stats.InPayload
if stats.On() {
if ss.statsHandler != nil {
inPayload = &stats.InPayload{}
}
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize, inPayload); err != nil {
@@ -600,7 +627,7 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
return toRPCErr(err)
}
if inPayload != nil {
stats.HandleRPC(ss.s.Context(), inPayload)
ss.statsHandler.HandleRPC(ss.s.Context(), inPayload)
}
return nil
}