Bump grpc to v1.51.0

Signed-off-by: Luca Comellini <luca.com@gmail.com>
This commit is contained in:
Luca Comellini
2022-11-21 18:53:05 -08:00
parent 09d7286eb5
commit 2309c6b498
80 changed files with 941 additions and 472 deletions

View File

@@ -39,6 +39,7 @@ import (
imetadata "google.golang.org/grpc/internal/metadata"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/serviceconfig"
istatus "google.golang.org/grpc/internal/status"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
@@ -195,6 +196,13 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
if err != nil {
if st, ok := status.FromError(err); ok {
// Restrict the code to the list allowed by gRFC A54.
if istatus.IsRestrictedControlPlaneCode(st) {
err = status.Errorf(codes.Internal, "config selector returned illegal status: %v", err)
}
return nil, err
}
return nil, toRPCErr(err)
}
@@ -301,7 +309,14 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
if !cc.dopts.disableRetry {
cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
}
cs.binlog = binarylog.GetMethodLogger(method)
if ml := binarylog.GetMethodLogger(method); ml != nil {
cs.binlogs = append(cs.binlogs, ml)
}
if cc.dopts.binaryLogger != nil {
if ml := cc.dopts.binaryLogger.GetMethodLogger(method); ml != nil {
cs.binlogs = append(cs.binlogs, ml)
}
}
// Pick the transport to use and create a new stream on the transport.
// Assign cs.attempt upon success.
@@ -322,7 +337,7 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
return nil, err
}
if cs.binlog != nil {
if len(cs.binlogs) != 0 {
md, _ := metadata.FromOutgoingContext(ctx)
logEntry := &binarylog.ClientHeader{
OnClientSide: true,
@@ -336,7 +351,9 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
logEntry.Timeout = 0
}
}
cs.binlog.Log(logEntry)
for _, binlog := range cs.binlogs {
binlog.Log(logEntry)
}
}
if desc != unaryStreamDesc {
@@ -480,7 +497,7 @@ type clientStream struct {
retryThrottler *retryThrottler // The throttler active when the RPC began.
binlog binarylog.MethodLogger // Binary logger, can be nil.
binlogs []binarylog.MethodLogger
// serverHeaderBinlogged is a boolean for whether server header has been
// logged. Server header will be logged when the first time one of those
// happens: stream.Header(), stream.Recv().
@@ -735,17 +752,25 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func())
func (cs *clientStream) Header() (metadata.MD, error) {
var m metadata.MD
noHeader := false
err := cs.withRetry(func(a *csAttempt) error {
var err error
m, err = a.s.Header()
if err == transport.ErrNoHeaders {
noHeader = true
return nil
}
return toRPCErr(err)
}, cs.commitAttemptLocked)
if err != nil {
cs.finish(err)
return nil, err
}
if cs.binlog != nil && !cs.serverHeaderBinlogged {
// Only log if binary log is on and header has not been logged.
if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged && !noHeader {
// Only log if binary log is on and header has not been logged, and
// there is actually headers to log.
logEntry := &binarylog.ServerHeader{
OnClientSide: true,
Header: m,
@@ -754,8 +779,10 @@ func (cs *clientStream) Header() (metadata.MD, error) {
if peer, ok := peer.FromContext(cs.Context()); ok {
logEntry.PeerAddr = peer.Addr
}
cs.binlog.Log(logEntry)
cs.serverHeaderBinlogged = true
for _, binlog := range cs.binlogs {
binlog.Log(logEntry)
}
}
return m, nil
}
@@ -829,38 +856,44 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
return a.sendMsg(m, hdr, payload, data)
}
err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
if cs.binlog != nil && err == nil {
cs.binlog.Log(&binarylog.ClientMessage{
if len(cs.binlogs) != 0 && err == nil {
cm := &binarylog.ClientMessage{
OnClientSide: true,
Message: data,
})
}
for _, binlog := range cs.binlogs {
binlog.Log(cm)
}
}
return err
}
func (cs *clientStream) RecvMsg(m interface{}) error {
if cs.binlog != nil && !cs.serverHeaderBinlogged {
if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged {
// Call Header() to binary log header if it's not already logged.
cs.Header()
}
var recvInfo *payloadInfo
if cs.binlog != nil {
if len(cs.binlogs) != 0 {
recvInfo = &payloadInfo{}
}
err := cs.withRetry(func(a *csAttempt) error {
return a.recvMsg(m, recvInfo)
}, cs.commitAttemptLocked)
if cs.binlog != nil && err == nil {
cs.binlog.Log(&binarylog.ServerMessage{
if len(cs.binlogs) != 0 && err == nil {
sm := &binarylog.ServerMessage{
OnClientSide: true,
Message: recvInfo.uncompressedBytes,
})
}
for _, binlog := range cs.binlogs {
binlog.Log(sm)
}
}
if err != nil || !cs.desc.ServerStreams {
// err != nil or non-server-streaming indicates end of stream.
cs.finish(err)
if cs.binlog != nil {
if len(cs.binlogs) != 0 {
// finish will not log Trailer. Log Trailer here.
logEntry := &binarylog.ServerTrailer{
OnClientSide: true,
@@ -873,7 +906,9 @@ func (cs *clientStream) RecvMsg(m interface{}) error {
if peer, ok := peer.FromContext(cs.Context()); ok {
logEntry.PeerAddr = peer.Addr
}
cs.binlog.Log(logEntry)
for _, binlog := range cs.binlogs {
binlog.Log(logEntry)
}
}
}
return err
@@ -894,10 +929,13 @@ func (cs *clientStream) CloseSend() error {
return nil
}
cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
if cs.binlog != nil {
cs.binlog.Log(&binarylog.ClientHalfClose{
if len(cs.binlogs) != 0 {
chc := &binarylog.ClientHalfClose{
OnClientSide: true,
})
}
for _, binlog := range cs.binlogs {
binlog.Log(chc)
}
}
// We never returned an error here for reasons.
return nil
@@ -930,10 +968,13 @@ func (cs *clientStream) finish(err error) {
//
// Only one of cancel or trailer needs to be logged. In the cases where
// users don't call RecvMsg, users must have already canceled the RPC.
if cs.binlog != nil && status.Code(err) == codes.Canceled {
cs.binlog.Log(&binarylog.Cancel{
if len(cs.binlogs) != 0 && status.Code(err) == codes.Canceled {
c := &binarylog.Cancel{
OnClientSide: true,
})
}
for _, binlog := range cs.binlogs {
binlog.Log(c)
}
}
if err == nil {
cs.retryThrottler.successfulRPC()
@@ -1005,6 +1046,7 @@ func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
}
return io.EOF // indicates successful end of stream.
}
return toRPCErr(err)
}
if a.trInfo != nil {
@@ -1453,7 +1495,7 @@ type serverStream struct {
statsHandler []stats.Handler
binlog binarylog.MethodLogger
binlogs []binarylog.MethodLogger
// serverHeaderBinlogged indicates whether server header has been logged. It
// will happen when one of the following two happens: stream.SendHeader(),
// stream.Send().
@@ -1487,12 +1529,15 @@ func (ss *serverStream) SendHeader(md metadata.MD) error {
}
err = ss.t.WriteHeader(ss.s, md)
if ss.binlog != nil && !ss.serverHeaderBinlogged {
if len(ss.binlogs) != 0 && !ss.serverHeaderBinlogged {
h, _ := ss.s.Header()
ss.binlog.Log(&binarylog.ServerHeader{
sh := &binarylog.ServerHeader{
Header: h,
})
}
ss.serverHeaderBinlogged = true
for _, binlog := range ss.binlogs {
binlog.Log(sh)
}
}
return err
}
@@ -1549,17 +1594,23 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
return toRPCErr(err)
}
if ss.binlog != nil {
if len(ss.binlogs) != 0 {
if !ss.serverHeaderBinlogged {
h, _ := ss.s.Header()
ss.binlog.Log(&binarylog.ServerHeader{
sh := &binarylog.ServerHeader{
Header: h,
})
}
ss.serverHeaderBinlogged = true
for _, binlog := range ss.binlogs {
binlog.Log(sh)
}
}
ss.binlog.Log(&binarylog.ServerMessage{
sm := &binarylog.ServerMessage{
Message: data,
})
}
for _, binlog := range ss.binlogs {
binlog.Log(sm)
}
}
if len(ss.statsHandler) != 0 {
for _, sh := range ss.statsHandler {
@@ -1598,13 +1649,16 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
}
}()
var payInfo *payloadInfo
if len(ss.statsHandler) != 0 || ss.binlog != nil {
if len(ss.statsHandler) != 0 || len(ss.binlogs) != 0 {
payInfo = &payloadInfo{}
}
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil {
if err == io.EOF {
if ss.binlog != nil {
ss.binlog.Log(&binarylog.ClientHalfClose{})
if len(ss.binlogs) != 0 {
chc := &binarylog.ClientHalfClose{}
for _, binlog := range ss.binlogs {
binlog.Log(chc)
}
}
return err
}
@@ -1625,10 +1679,13 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
})
}
}
if ss.binlog != nil {
ss.binlog.Log(&binarylog.ClientMessage{
if len(ss.binlogs) != 0 {
cm := &binarylog.ClientMessage{
Message: payInfo.uncompressedBytes,
})
}
for _, binlog := range ss.binlogs {
binlog.Log(cm)
}
}
return nil
}