Bump grpc to v1.47.0
Signed-off-by: Luca Comellini <luca.com@gmail.com>
parent 1f4e5175c4
commit 5fcde823dd
go.mod (2 changes)

@@ -63,7 +63,7 @@ require (
     golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
     golang.org/x/sys v0.0.0-20220422013727-9388b58f7150
     google.golang.org/genproto v0.0.0-20220426171045-31bebdecfb46
-    google.golang.org/grpc v1.46.0
+    google.golang.org/grpc v1.47.0
     google.golang.org/protobuf v1.28.0
     k8s.io/api v0.24.0
     k8s.io/apimachinery v0.24.0
go.sum (3 changes)

@@ -1475,8 +1475,9 @@ google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9K
 google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
 google.golang.org/grpc v1.43.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
 google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ=
-google.golang.org/grpc v1.46.0 h1:oCjezcn6g6A75TGoKYBPgKmVBLexhYLM6MebdrPApP8=
 google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
+google.golang.org/grpc v1.47.0 h1:9n77onPX5F3qfFCqjy9dhn8PbNQsIKeVU04J9G7umt8=
+google.golang.org/grpc v1.47.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
 google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
 google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
 google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@@ -58,7 +58,7 @@ require (
     golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
     golang.org/x/text v0.3.7 // indirect
     google.golang.org/genproto v0.0.0-20220426171045-31bebdecfb46 // indirect
-    google.golang.org/grpc v1.46.0 // indirect
+    google.golang.org/grpc v1.47.0 // indirect
     google.golang.org/protobuf v1.28.0 // indirect
     gopkg.in/yaml.v2 v2.4.0 // indirect
     gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect

@@ -1130,8 +1130,9 @@ google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9K
 google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
 google.golang.org/grpc v1.43.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
 google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ=
-google.golang.org/grpc v1.46.0 h1:oCjezcn6g6A75TGoKYBPgKmVBLexhYLM6MebdrPApP8=
 google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
+google.golang.org/grpc v1.47.0 h1:9n77onPX5F3qfFCqjy9dhn8PbNQsIKeVU04J9G7umt8=
+google.golang.org/grpc v1.47.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
 google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
 google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
 google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
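For context: a bump like this is normally generated with the Go toolchain rather than edited by hand; running `go get google.golang.org/grpc@v1.47.0` followed by `go mod tidy` and `go mod vendor` updates go.mod, go.sum, and the vendor/ tree (including vendor/modules.txt) shown in the remaining hunks.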
vendor/google.golang.org/grpc/clientconn.go (generated, vendored; 34 changes)

@@ -801,16 +801,31 @@ func (ac *addrConn) connect() error {
     return nil
 }
 
+func equalAddresses(a, b []resolver.Address) bool {
+    if len(a) != len(b) {
+        return false
+    }
+    for i, v := range a {
+        if !v.Equal(b[i]) {
+            return false
+        }
+    }
+    return true
+}
+
 // tryUpdateAddrs tries to update ac.addrs with the new addresses list.
 //
-// If ac is Connecting, it returns false. The caller should tear down the ac and
-// create a new one. Note that the backoff will be reset when this happens.
-//
 // If ac is TransientFailure, it updates ac.addrs and returns true. The updated
 // addresses will be picked up by retry in the next iteration after backoff.
 //
 // If ac is Shutdown or Idle, it updates ac.addrs and returns true.
 //
+// If the addresses is the same as the old list, it does nothing and returns
+// true.
+//
+// If ac is Connecting, it returns false. The caller should tear down the ac and
+// create a new one. Note that the backoff will be reset when this happens.
+//
 // If ac is Ready, it checks whether current connected address of ac is in the
 // new addrs list.
 //  - If true, it updates ac.addrs and returns true. The ac will keep using

@@ -827,6 +842,10 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
         return true
     }
 
+    if equalAddresses(ac.addrs, addrs) {
+        return true
+    }
+
     if ac.state == connectivity.Connecting {
         return false
     }

@@ -907,14 +926,10 @@ func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
 }
 
 func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
-    t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
+    return cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
         Ctx:            ctx,
         FullMethodName: method,
     })
-    if err != nil {
-        return nil, nil, toRPCErr(err)
-    }
-    return t, done, nil
 }
 
 func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {

@@ -1223,6 +1238,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
     ac.mu.Lock()
     defer ac.mu.Unlock()
     defer connClosed.Fire()
+    defer hcancel()
     if !hcStarted || hctx.Err() != nil {
         // We didn't start the health check or set the state to READY, so
         // no need to do anything else here.

@@ -1233,7 +1249,6 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
         // state, since there may be a new transport in this addrConn.
         return
     }
-    hcancel()
     ac.transport = nil
     // Refresh the name resolver
     ac.cc.resolveNow(resolver.ResolveNowOptions{})

@@ -1256,6 +1271,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
     newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, func() { prefaceReceived.Fire() }, onGoAway, onClose)
     if err != nil {
         // newTr is either nil, or closed.
+        hcancel()
         channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %s. Err: %v", addr, err)
         return err
     }
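Worth pausing on: the new equalAddresses helper is what lets tryUpdateAddrs return true early when the resolver re-delivers an unchanged address list, avoiding a needless reconnect. A minimal runnable sketch of its order-sensitive semantics, reusing the helper exactly as vendored above (the main function and sample addresses are illustrative, not from the diff):

package main

import (
    "fmt"

    "google.golang.org/grpc/resolver"
)

// equalAddresses mirrors the unexported helper added in clientconn.go:
// two lists are equal only if they have the same length and match
// pairwise, so even a reordered list still counts as a change.
func equalAddresses(a, b []resolver.Address) bool {
    if len(a) != len(b) {
        return false
    }
    for i, v := range a {
        if !v.Equal(b[i]) {
            return false
        }
    }
    return true
}

func main() {
    old := []resolver.Address{{Addr: "10.0.0.1:443"}, {Addr: "10.0.0.2:443"}}
    same := []resolver.Address{{Addr: "10.0.0.1:443"}, {Addr: "10.0.0.2:443"}}
    swapped := []resolver.Address{{Addr: "10.0.0.2:443"}, {Addr: "10.0.0.1:443"}}

    fmt.Println(equalAddresses(old, same))    // true: no reconnect needed
    fmt.Println(equalAddresses(old, swapped)) // false: treated as an update
}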
vendor/google.golang.org/grpc/encoding/encoding.go (generated, vendored; 2 changes)

@@ -108,7 +108,7 @@ var registeredCodecs = make(map[string]Codec)
 // more details.
 //
 // NOTE: this function must only be called during initialization time (i.e. in
-// an init() function), and is not thread-safe. If multiple Compressors are
+// an init() function), and is not thread-safe. If multiple Codecs are
 // registered with the same name, the one registered last will take effect.
 func RegisterCodec(codec Codec) {
     if codec == nil {
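The corrected comment concerns RegisterCodec's init-time-only contract. As an aside, a hedged sketch of what such a registration typically looks like for a custom codec (the rawCodec type is illustrative; encoding.RegisterCodec and the Codec interface are the real APIs):

package rawcodec

import (
    "fmt"

    "google.golang.org/grpc/encoding"
)

// rawCodec is a toy Codec that only handles []byte payloads.
type rawCodec struct{}

func (rawCodec) Marshal(v interface{}) ([]byte, error) {
    b, ok := v.([]byte)
    if !ok {
        return nil, fmt.Errorf("rawcodec: expected []byte, got %T", v)
    }
    return b, nil
}

func (rawCodec) Unmarshal(data []byte, v interface{}) error {
    p, ok := v.(*[]byte)
    if !ok {
        return fmt.Errorf("rawcodec: expected *[]byte, got %T", v)
    }
    *p = data
    return nil
}

func (rawCodec) Name() string { return "raw" }

// Registration happens at init time, as the (corrected) comment requires;
// calling encoding.RegisterCodec later would not be thread-safe.
func init() {
    encoding.RegisterCodec(rawCodec{})
}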
vendor/google.golang.org/grpc/internal/transport/controlbuf.go (generated, vendored; 6 changes)

@@ -137,6 +137,7 @@ type earlyAbortStream struct {
     streamID       uint32
     contentSubtype string
     status         *status.Status
+    rst            bool
 }
 
 func (*earlyAbortStream) isTransportResponseFrame() bool { return false }

@@ -786,6 +787,11 @@ func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error {
     if err := l.writeHeader(eas.streamID, true, headerFields, nil); err != nil {
         return err
     }
+    if eas.rst {
+        if err := l.framer.fr.WriteRSTStream(eas.streamID, http2.ErrCodeNo); err != nil {
+            return err
+        }
+    }
     return nil
 }
 
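The effect of the new rst flag: after writing the error response headers, the server can now also emit an RST_STREAM, so a client that has not half-closed stops sending into a dead stream. A standalone sketch of that frame sequence written directly with golang.org/x/net/http2 (the buffer, stream ID, and header block are illustrative stand-ins):

package main

import (
    "bytes"

    "golang.org/x/net/http2"
)

func main() {
    var buf bytes.Buffer
    fr := http2.NewFramer(&buf, nil)

    // 1. The error response headers with END_STREAM set, as writeHeader does.
    _ = fr.WriteHeaders(http2.HeadersFrameParam{
        StreamID:      1,
        BlockFragment: []byte{ /* HPACK-encoded status headers would go here */ },
        EndHeaders:    true,
        EndStream:     true,
    })

    // 2. The new part: an RST_STREAM with ErrCodeNo, telling a client that
    // has not finished sending to stop writing on this stream.
    _ = fr.WriteRSTStream(1, http2.ErrCodeNo)
}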
vendor/google.golang.org/grpc/internal/transport/http2_client.go (generated, vendored; 20 changes)

@@ -631,8 +631,8 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call
 // the wire. However, there are two notable exceptions:
 //
 //  1. If the stream headers violate the max header list size allowed by the
-//     server. In this case there is no reason to retry at all, as it is
-//     assumed the RPC would continue to fail on subsequent attempts.
+//     server. It's possible this could succeed on another transport, even if
+//     it's unlikely, but do not transparently retry.
 //  2. If the credentials errored when requesting their headers. In this case,
 //     it's possible a retry can fix the problem, but indefinitely transparently
 //     retrying is not appropriate as it is likely the credentials, if they can

@@ -640,8 +640,7 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call
 type NewStreamError struct {
     Err error
 
-    DoNotRetry            bool
-    DoNotTransparentRetry bool
+    AllowTransparentRetry bool
 }
 
 func (e NewStreamError) Error() string {

@@ -650,11 +649,11 @@ func (e NewStreamError) Error() string {
 
 // NewStream creates a stream and registers it into the transport as "active"
 // streams. All non-nil errors returned will be *NewStreamError.
-func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
+func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) {
     ctx = peer.NewContext(ctx, t.getPeer())
     headerFields, err := t.createHeaderFields(ctx, callHdr)
     if err != nil {
-        return nil, &NewStreamError{Err: err, DoNotTransparentRetry: true}
+        return nil, &NewStreamError{Err: err, AllowTransparentRetry: false}
     }
     s := t.newStream(ctx, callHdr)
     cleanup := func(err error) {

@@ -754,13 +753,14 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
         return true
     }, hdr)
     if err != nil {
-        return nil, &NewStreamError{Err: err}
+        // Connection closed.
+        return nil, &NewStreamError{Err: err, AllowTransparentRetry: true}
     }
     if success {
         break
     }
     if hdrListSizeErr != nil {
-        return nil, &NewStreamError{Err: hdrListSizeErr, DoNotRetry: true}
+        return nil, &NewStreamError{Err: hdrListSizeErr}
     }
     firstTry = false
     select {

@@ -768,9 +768,9 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
     case <-ctx.Done():
         return nil, &NewStreamError{Err: ContextErr(ctx.Err())}
     case <-t.goAway:
-        return nil, &NewStreamError{Err: errStreamDrain}
+        return nil, &NewStreamError{Err: errStreamDrain, AllowTransparentRetry: true}
     case <-t.ctx.Done():
-        return nil, &NewStreamError{Err: ErrConnClosing}
+        return nil, &NewStreamError{Err: ErrConnClosing, AllowTransparentRetry: true}
     }
 }
 if t.statsHandler != nil {
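The net effect of this file: the two negative flags (DoNotRetry, DoNotTransparentRetry) collapse into one positive AllowTransparentRetry, so the zero value is now the conservative "do not replay" case, and only errors where no data reached the wire (connection closed, GOAWAY, transport closing) opt in. A tiny self-contained illustration of that zero-value semantics (the local NewStreamError mirrors the vendored one; the error messages are made up):

package main

import "fmt"

// NewStreamError mirrors the reworked transport type above: one positive
// AllowTransparentRetry flag replaces the DoNotRetry/DoNotTransparentRetry
// pair, so the zero value is the conservative "do not replay" case.
type NewStreamError struct {
    Err                   error
    AllowTransparentRetry bool
}

func (e NewStreamError) Error() string { return e.Err.Error() }

func main() {
    cases := []NewStreamError{
        {Err: fmt.Errorf("connection closed"), AllowTransparentRetry: true},
        {Err: fmt.Errorf("header list size exceeds limit")}, // flag unset: no transparent retry
    }
    for _, e := range cases {
        fmt.Printf("%v -> transparent retry: %v\n", e.Err, e.AllowTransparentRetry)
    }
}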
vendor/google.golang.org/grpc/internal/transport/http2_server.go (generated, vendored; 75 changes)

@@ -21,7 +21,6 @@ package transport
 import (
     "bytes"
     "context"
-    "errors"
     "fmt"
     "io"
     "math"

@@ -53,10 +52,10 @@ import (
 var (
     // ErrIllegalHeaderWrite indicates that setting header is illegal because of
     // the stream's state.
-    ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
+    ErrIllegalHeaderWrite = status.Error(codes.Internal, "transport: SendHeader called multiple times")
     // ErrHeaderListSizeLimitViolation indicates that the header list size is larger
     // than the limit set by peer.
-    ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
+    ErrHeaderListSizeLimitViolation = status.Error(codes.Internal, "transport: trying to send header list size larger than the limit set by peer")
 )
 
 // serverConnectionCounter counts the number of connections a server has seen

@@ -449,6 +448,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
     streamID:       streamID,
     contentSubtype: s.contentSubtype,
     status:         status.New(codes.Internal, errMsg),
+    rst:            !frame.StreamEnded(),
 })
 return false
 }

@@ -522,14 +522,16 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
 }
 if httpMethod != http.MethodPost {
     t.mu.Unlock()
+    errMsg := fmt.Sprintf("http2Server.operateHeaders parsed a :method field: %v which should be POST", httpMethod)
     if logger.V(logLevel) {
-        logger.Infof("transport: http2Server.operateHeaders parsed a :method field: %v which should be POST", httpMethod)
+        logger.Infof("transport: %v", errMsg)
     }
-    t.controlBuf.put(&cleanupStream{
-        streamID: streamID,
-        rst:      true,
-        rstCode:  http2.ErrCodeProtocol,
-        onWrite:  func() {},
+    t.controlBuf.put(&earlyAbortStream{
+        httpStatus:     405,
+        streamID:       streamID,
+        contentSubtype: s.contentSubtype,
+        status:         status.New(codes.Internal, errMsg),
+        rst:            !frame.StreamEnded(),
     })
     s.cancel()
     return false

@@ -550,6 +552,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
     streamID:       s.id,
     contentSubtype: s.contentSubtype,
     status:         stat,
+    rst:            !frame.StreamEnded(),
 })
 return false
 }

@@ -931,11 +934,25 @@ func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
     return true
 }
 
+func (t *http2Server) streamContextErr(s *Stream) error {
+    select {
+    case <-t.done:
+        return ErrConnClosing
+    default:
+    }
+    return ContextErr(s.ctx.Err())
+}
+
 // WriteHeader sends the header metadata md back to the client.
 func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
-    if s.updateHeaderSent() || s.getState() == streamDone {
+    if s.updateHeaderSent() {
         return ErrIllegalHeaderWrite
     }
+
+    if s.getState() == streamDone {
+        return t.streamContextErr(s)
+    }
+
     s.hdrMu.Lock()
     if md.Len() > 0 {
         if s.header.Len() > 0 {

@@ -946,7 +963,7 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
     }
     if err := t.writeHeaderLocked(s); err != nil {
         s.hdrMu.Unlock()
-        return err
+        return status.Convert(err).Err()
     }
     s.hdrMu.Unlock()
     return nil

@@ -1062,23 +1079,12 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
 func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
     if !s.isHeaderSent() { // Headers haven't been written yet.
         if err := t.WriteHeader(s, nil); err != nil {
-            if _, ok := err.(ConnectionError); ok {
-                return err
-            }
-            // TODO(mmukhi, dfawley): Make sure this is the right code to return.
-            return status.Errorf(codes.Internal, "transport: %v", err)
+            return err
         }
     } else {
         // Writing headers checks for this condition.
         if s.getState() == streamDone {
-            // TODO(mmukhi, dfawley): Should the server write also return io.EOF?
-            s.cancel()
-            select {
-            case <-t.done:
-                return ErrConnClosing
-            default:
-            }
-            return ContextErr(s.ctx.Err())
+            return t.streamContextErr(s)
         }
     }
     df := &dataFrame{

@@ -1088,12 +1094,7 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
     onEachWrite: t.setResetPingStrikes,
 }
 if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
-    select {
-    case <-t.done:
-        return ErrConnClosing
-    default:
-    }
-    return ContextErr(s.ctx.Err())
+    return t.streamContextErr(s)
 }
 return t.controlBuf.put(df)
 }

@@ -1229,10 +1230,6 @@ func (t *http2Server) Close() {
 
 // deleteStream deletes the stream s from transport's active streams.
 func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
-    // In case stream sending and receiving are invoked in separate
-    // goroutines (e.g., bi-directional streaming), cancel needs to be
-    // called to interrupt the potential blocking on other goroutines.
-    s.cancel()
-
     t.mu.Lock()
     if _, ok := t.activeStreams[s.id]; ok {

@@ -1254,6 +1251,11 @@ func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
 
 // finishStream closes the stream and puts the trailing headerFrame into controlbuf.
 func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
+    // In case stream sending and receiving are invoked in separate
+    // goroutines (e.g., bi-directional streaming), cancel needs to be
+    // called to interrupt the potential blocking on other goroutines.
+    s.cancel()
+
     oldState := s.swapState(streamDone)
     if oldState == streamDone {
         // If the stream was already done, return.

@@ -1273,6 +1275,11 @@ func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, h
 
 // closeStream clears the footprint of a stream when the stream is not needed any more.
 func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
+    // In case stream sending and receiving are invoked in separate
+    // goroutines (e.g., bi-directional streaming), cancel needs to be
+    // called to interrupt the potential blocking on other goroutines.
+    s.cancel()
+
     s.swapState(streamDone)
     t.deleteStream(s, eosReceived)
 
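Much of this file is the same consolidation: the ad-hoc select blocks in WriteHeader and Write become streamContextErr, which prefers the transport-level "connection closing" error over the stream's own context error. A standalone sketch of that pattern (names and types here are illustrative, not the vendored ones):

package transportsketch

import (
    "context"
    "errors"
)

var errConnClosing = errors.New("transport is closing")

// streamErr mirrors the shape of streamContextErr: report the
// connection-level error first, since when both have fired it explains
// the failure better than the per-stream context error.
func streamErr(transportDone <-chan struct{}, streamCtx context.Context) error {
    select {
    case <-transportDone:
        return errConnClosing
    default:
    }
    return streamCtx.Err()
}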
vendor/google.golang.org/grpc/picker_wrapper.go (generated, vendored; 8 changes)

@@ -131,7 +131,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
 }
 if _, ok := status.FromError(err); ok {
     // Status error: end the RPC unconditionally with this status.
-    return nil, nil, err
+    return nil, nil, dropError{error: err}
 }
 // For all other errors, wait for ready RPCs should block and other
 // RPCs should fail with unavailable.

@@ -175,3 +175,9 @@ func (pw *pickerWrapper) close() {
     pw.done = true
     close(pw.blockingCh)
 }
+
+// dropError is a wrapper error that indicates the LB policy wishes to drop the
+// RPC and not retry it.
+type dropError struct {
+    error
+}
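dropError exists so the pick path can tag "the LB policy dropped this RPC" without losing the original status error; the retry code in stream.go (below) unwraps it with a plain type assertion and marks the attempt as non-retryable. A small runnable illustration of the embed-and-assert pattern (the error text is made up):

package main

import (
    "errors"
    "fmt"
)

// dropError mirrors the wrapper added above: embedding error keeps the
// original status error intact while adding a marker type.
type dropError struct {
    error
}

func main() {
    base := errors.New("rpc dropped by load balancer")
    var err error = dropError{error: base}

    // A plain type assertion is how csAttempt.getTransport unwraps it.
    if de, ok := err.(dropError); ok {
        fmt.Println("dropped:", de.error) // original error preserved
    }
}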
vendor/google.golang.org/grpc/server.go (generated, vendored; 40 changes)

@@ -1801,12 +1801,26 @@ func (s *Server) getCodec(contentSubtype string) baseCodec {
     return codec
 }
 
-// SetHeader sets the header metadata.
-// When called multiple times, all the provided metadata will be merged.
-// All the metadata will be sent out when one of the following happens:
-//  - grpc.SendHeader() is called;
-//  - The first response is sent out;
-//  - An RPC status is sent out (error or success).
+// SetHeader sets the header metadata to be sent from the server to the client.
+// The context provided must be the context passed to the server's handler.
+//
+// Streaming RPCs should prefer the SetHeader method of the ServerStream.
+//
+// When called multiple times, all the provided metadata will be merged. All
+// the metadata will be sent out when one of the following happens:
+//
+//  - grpc.SendHeader is called, or for streaming handlers, stream.SendHeader.
+//  - The first response message is sent. For unary handlers, this occurs when
+//    the handler returns; for streaming handlers, this can happen when stream's
+//    SendMsg method is called.
+//  - An RPC status is sent out (error or success). This occurs when the handler
+//    returns.
+//
+// SetHeader will fail if called after any of the events above.
+//
+// The error returned is compatible with the status package. However, the
+// status code will often not match the RPC status as seen by the client
+// application, and therefore, should not be relied upon for this purpose.
 func SetHeader(ctx context.Context, md metadata.MD) error {
     if md.Len() == 0 {
         return nil

@@ -1818,8 +1832,14 @@ func SetHeader(ctx context.Context, md metadata.MD) error {
     return stream.SetHeader(md)
 }
 
-// SendHeader sends header metadata. It may be called at most once.
-// The provided md and headers set by SetHeader() will be sent.
+// SendHeader sends header metadata. It may be called at most once, and may not
+// be called after any event that causes headers to be sent (see SetHeader for
+// a complete list). The provided md and headers set by SetHeader() will be
+// sent.
+//
+// The error returned is compatible with the status package. However, the
+// status code will often not match the RPC status as seen by the client
+// application, and therefore, should not be relied upon for this purpose.
 func SendHeader(ctx context.Context, md metadata.MD) error {
     stream := ServerTransportStreamFromContext(ctx)
     if stream == nil {

@@ -1833,6 +1853,10 @@ func SendHeader(ctx context.Context, md metadata.MD) error {
 
 // SetTrailer sets the trailer metadata that will be sent when an RPC returns.
 // When called more than once, all the provided metadata will be merged.
+//
+// The error returned is compatible with the status package. However, the
+// status code will often not match the RPC status as seen by the client
+// application, and therefore, should not be relied upon for this purpose.
 func SetTrailer(ctx context.Context, md metadata.MD) error {
     if md.Len() == 0 {
         return nil
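The rewritten comments pin down when headers are flushed and warn that the returned errors, while status-compatible, do not carry the client-visible status code. A hedged sketch of a unary handler exercising that contract (echoServer, Ping, and the metadata keys are illustrative; grpc.SetHeader, grpc.SendHeader, and grpc.SetTrailer are the APIs documented above):

package handlers

import (
    "context"
    "log"

    "google.golang.org/grpc"
    "google.golang.org/grpc/metadata"
    "google.golang.org/grpc/status"
    "google.golang.org/protobuf/types/known/emptypb"
)

type echoServer struct{}

// Ping walks the documented lifecycle: headers set, then flushed by
// SendHeader, after which further SetHeader calls must fail.
func (echoServer) Ping(ctx context.Context, _ *emptypb.Empty) (*emptypb.Empty, error) {
    // Merged with any later SetHeader calls; flushed by SendHeader below,
    // or at the latest when the handler returns.
    if err := grpc.SetHeader(ctx, metadata.Pairs("x-request-id", "42")); err != nil {
        return nil, err
    }
    if err := grpc.SendHeader(ctx, nil); err != nil {
        return nil, err
    }
    // Headers are on the wire now, so this fails; per the new comments the
    // error is status-compatible but its code is not what the client sees.
    if err := grpc.SetHeader(ctx, metadata.Pairs("late", "1")); err != nil {
        log.Printf("late SetHeader rejected: %v", status.Convert(err).Message())
    }
    // Trailers can still be set any time before the handler returns.
    _ = grpc.SetTrailer(ctx, metadata.Pairs("x-cache", "miss"))
    return &emptypb.Empty{}, nil
}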
vendor/google.golang.org/grpc/stream.go (generated, vendored; 203 changes)

@@ -303,14 +303,28 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
     }
     cs.binlog = binarylog.GetMethodLogger(method)
 
-    if err := cs.newAttemptLocked(false /* isTransparent */); err != nil {
+    cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */)
+    if err != nil {
         cs.finish(err)
         return nil, err
     }
 
-    op := func(a *csAttempt) error { return a.newStream() }
+    // Pick the transport to use and create a new stream on the transport.
+    // Assign cs.attempt upon success.
+    op := func(a *csAttempt) error {
+        if err := a.getTransport(); err != nil {
+            return err
+        }
+        if err := a.newStream(); err != nil {
+            return err
+        }
+        // Because this operation is always called either here (while creating
+        // the clientStream) or by the retry code while locked when replaying
+        // the operation, it is safe to access cs.attempt directly.
+        cs.attempt = a
+        return nil
+    }
     if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
-        cs.finish(err)
         return nil, err
     }
 

@@ -349,9 +363,15 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
     return cs, nil
 }
 
-// newAttemptLocked creates a new attempt with a transport.
-// If it succeeds, then it replaces clientStream's attempt with this new attempt.
-func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
+// newAttemptLocked creates a new csAttempt without a transport or stream.
+func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) {
+    if err := cs.ctx.Err(); err != nil {
+        return nil, toRPCErr(err)
+    }
+    if err := cs.cc.ctx.Err(); err != nil {
+        return nil, ErrClientConnClosing
+    }
+
     ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp)
     method := cs.callHdr.Method
     sh := cs.cc.dopts.copts.StatsHandler

@@ -385,27 +405,6 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
         ctx = trace.NewContext(ctx, trInfo.tr)
     }
 
-    newAttempt := &csAttempt{
-        ctx:          ctx,
-        beginTime:    beginTime,
-        cs:           cs,
-        dc:           cs.cc.dopts.dc,
-        statsHandler: sh,
-        trInfo:       trInfo,
-    }
-    defer func() {
-        if retErr != nil {
-            // This attempt is not set in the clientStream, so it's finish won't
-            // be called. Call it here for stats and trace in case they are not
-            // nil.
-            newAttempt.finish(retErr)
-        }
-    }()
-
-    if err := ctx.Err(); err != nil {
-        return toRPCErr(err)
-    }
-
     if cs.cc.parsedTarget.Scheme == "xds" {
         // Add extra metadata (metadata that will be added by transport) to context
         // so the balancer can see them.

@@ -413,16 +412,32 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
         "content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
     ))
     }
-    t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)
+    return &csAttempt{
+        ctx:          ctx,
+        beginTime:    beginTime,
+        cs:           cs,
+        dc:           cs.cc.dopts.dc,
+        statsHandler: sh,
+        trInfo:       trInfo,
+    }, nil
+}
+
+func (a *csAttempt) getTransport() error {
+    cs := a.cs
+
+    var err error
+    a.t, a.done, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method)
     if err != nil {
+        if de, ok := err.(dropError); ok {
+            err = de.error
+            a.drop = true
+        }
         return err
     }
-    if trInfo != nil {
-        trInfo.firstLine.SetRemoteAddr(t.RemoteAddr())
+    if a.trInfo != nil {
+        a.trInfo.firstLine.SetRemoteAddr(a.t.RemoteAddr())
     }
-    newAttempt.t = t
-    newAttempt.done = done
-    cs.attempt = newAttempt
     return nil
 }
 

@@ -431,12 +446,21 @@ func (a *csAttempt) newStream() error {
     cs.callHdr.PreviousAttempts = cs.numRetries
     s, err := a.t.NewStream(a.ctx, cs.callHdr)
     if err != nil {
-        // Return without converting to an RPC error so retry code can
-        // inspect.
-        return err
+        nse, ok := err.(*transport.NewStreamError)
+        if !ok {
+            // Unexpected.
+            return err
+        }
+
+        if nse.AllowTransparentRetry {
+            a.allowTransparentRetry = true
+        }
+
+        // Unwrap and convert error.
+        return toRPCErr(nse.Err)
     }
-    cs.attempt.s = s
-    cs.attempt.p = &parser{r: s}
+    a.s = s
+    a.p = &parser{r: s}
     return nil
 }
 

@@ -514,6 +538,11 @@ type csAttempt struct {
 
     statsHandler stats.Handler
     beginTime    time.Time
+
+    // set for newStream errors that may be transparently retried
+    allowTransparentRetry bool
+    // set for pick errors that are returned as a status
+    drop bool
 }
 
 func (cs *clientStream) commitAttemptLocked() {

@@ -533,41 +562,21 @@ func (cs *clientStream) commitAttempt() {
 // shouldRetry returns nil if the RPC should be retried; otherwise it returns
 // the error that should be returned by the operation. If the RPC should be
 // retried, the bool indicates whether it is being retried transparently.
-func (cs *clientStream) shouldRetry(err error) (bool, error) {
-    if cs.attempt.s == nil {
-        // Error from NewClientStream.
-        nse, ok := err.(*transport.NewStreamError)
-        if !ok {
-            // Unexpected, but assume no I/O was performed and the RPC is not
-            // fatal, so retry indefinitely.
-            return true, nil
-        }
-
-        // Unwrap and convert error.
-        err = toRPCErr(nse.Err)
-
-        // Never retry DoNotRetry errors, which indicate the RPC should not be
-        // retried due to max header list size violation, etc.
-        if nse.DoNotRetry {
-            return false, err
-        }
-
-        // In the event of a non-IO operation error from NewStream, we never
-        // attempted to write anything to the wire, so we can retry
-        // indefinitely.
-        if !nse.DoNotTransparentRetry {
-            return true, nil
-        }
-    }
-    if cs.finished || cs.committed {
-        // RPC is finished or committed; cannot retry.
+func (a *csAttempt) shouldRetry(err error) (bool, error) {
+    cs := a.cs
+
+    if cs.finished || cs.committed || a.drop {
+        // RPC is finished or committed or was dropped by the picker; cannot retry.
         return false, err
     }
+    if a.s == nil && a.allowTransparentRetry {
+        return true, nil
+    }
     // Wait for the trailers.
     unprocessed := false
-    if cs.attempt.s != nil {
-        <-cs.attempt.s.Done()
-        unprocessed = cs.attempt.s.Unprocessed()
+    if a.s != nil {
+        <-a.s.Done()
+        unprocessed = a.s.Unprocessed()
     }
     if cs.firstAttempt && unprocessed {
         // First attempt, stream unprocessed: transparently retry.

@@ -579,14 +588,14 @@ func (cs *clientStream) shouldRetry(err error) (bool, error) {
 
     pushback := 0
     hasPushback := false
-    if cs.attempt.s != nil {
-        if !cs.attempt.s.TrailersOnly() {
+    if a.s != nil {
+        if !a.s.TrailersOnly() {
             return false, err
         }
 
         // TODO(retry): Move down if the spec changes to not check server pushback
         // before considering this a failure for throttling.
-        sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"]
+        sps := a.s.Trailer()["grpc-retry-pushback-ms"]
         if len(sps) == 1 {
             var e error
             if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {

@@ -603,10 +612,10 @@ func (cs *clientStream) shouldRetry(err error) (bool, error) {
     }
 
     var code codes.Code
-    if cs.attempt.s != nil {
-        code = cs.attempt.s.Status().Code()
+    if a.s != nil {
+        code = a.s.Status().Code()
     } else {
-        code = status.Convert(err).Code()
+        code = status.Code(err)
     }
 
     rp := cs.methodConfig.RetryPolicy

@@ -651,19 +660,24 @@ func (cs *clientStream) shouldRetry(err error) (bool, error) {
 }
 
 // Returns nil if a retry was performed and succeeded; error otherwise.
-func (cs *clientStream) retryLocked(lastErr error) error {
+func (cs *clientStream) retryLocked(attempt *csAttempt, lastErr error) error {
     for {
-        cs.attempt.finish(toRPCErr(lastErr))
-        isTransparent, err := cs.shouldRetry(lastErr)
+        attempt.finish(toRPCErr(lastErr))
+        isTransparent, err := attempt.shouldRetry(lastErr)
         if err != nil {
             cs.commitAttemptLocked()
             return err
         }
         cs.firstAttempt = false
-        if err := cs.newAttemptLocked(isTransparent); err != nil {
+        attempt, err = cs.newAttemptLocked(isTransparent)
+        if err != nil {
+            // Only returns error if the clientconn is closed or the context of
+            // the stream is canceled.
             return err
         }
-        if lastErr = cs.replayBufferLocked(); lastErr == nil {
+        // Note that the first op in the replay buffer always sets cs.attempt
+        // if it is able to pick a transport and create a stream.
+        if lastErr = cs.replayBufferLocked(attempt); lastErr == nil {
             return nil
         }
     }

@@ -673,7 +687,10 @@ func (cs *clientStream) Context() context.Context {
     cs.commitAttempt()
     // No need to lock before using attempt, since we know it is committed and
    // cannot change.
-    return cs.attempt.s.Context()
+    if cs.attempt.s != nil {
+        return cs.attempt.s.Context()
+    }
+    return cs.ctx
 }
 
 func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {

@@ -703,7 +720,7 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func())
         cs.mu.Unlock()
         return err
     }
-    if err := cs.retryLocked(err); err != nil {
+    if err := cs.retryLocked(a, err); err != nil {
         cs.mu.Unlock()
         return err
     }

@@ -734,7 +751,7 @@ func (cs *clientStream) Header() (metadata.MD, error) {
     cs.binlog.Log(logEntry)
     cs.serverHeaderBinlogged = true
 }
-return m, err
+return m, nil
 }
 
 func (cs *clientStream) Trailer() metadata.MD {

@@ -752,10 +769,9 @@ func (cs *clientStream) Trailer() metadata.MD {
     return cs.attempt.s.Trailer()
 }
 
-func (cs *clientStream) replayBufferLocked() error {
-    a := cs.attempt
+func (cs *clientStream) replayBufferLocked(attempt *csAttempt) error {
     for _, f := range cs.buffer {
-        if err := f(a); err != nil {
+        if err := f(attempt); err != nil {
             return err
         }
     }

@@ -803,22 +819,17 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
     if len(payload) > *cs.callInfo.maxSendMessageSize {
         return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
     }
-    msgBytes := data // Store the pointer before setting to nil. For binary logging.
     op := func(a *csAttempt) error {
-        err := a.sendMsg(m, hdr, payload, data)
-        // nil out the message and uncomp when replaying; they are only needed for
-        // stats which is disabled for subsequent attempts.
-        m, data = nil, nil
-        return err
+        return a.sendMsg(m, hdr, payload, data)
     }
     err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
     if cs.binlog != nil && err == nil {
         cs.binlog.Log(&binarylog.ClientMessage{
             OnClientSide: true,
-            Message:      msgBytes,
+            Message:      data,
         })
     }
-    return
+    return err
 }
 
 func (cs *clientStream) RecvMsg(m interface{}) error {

@@ -1370,8 +1381,10 @@ func (as *addrConnStream) finish(err error) {
 
 // ServerStream defines the server-side behavior of a streaming RPC.
 //
-// All errors returned from ServerStream methods are compatible with the
-// status package.
+// Errors returned from ServerStream methods are compatible with the status
+// package. However, the status code will often not match the RPC status as
+// seen by the client application, and therefore, should not be relied upon for
+// this purpose.
 type ServerStream interface {
     // SetHeader sets the header metadata. It may be called multiple times.
     // When call multiple times, all the provided metadata will be merged.
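Taken together, the stream.go changes reshape the attempt lifecycle: newAttemptLocked now builds a bare csAttempt, and the first buffered op acquires the transport and stream, so every replay (including transparent retries) re-picks a connection. A toy, self-contained model of that replay-buffer shape (all names are illustrative; real gRPC bounds retries via the retry policy rather than a fixed loop):

package main

import (
    "errors"
    "fmt"
)

// attempt stands in for csAttempt: it starts bare, with no transport.
type attempt struct{ transport string }

var picks int

// getTransport stands in for csAttempt.getTransport; it fails once to
// force a replay, then succeeds.
func (a *attempt) getTransport() error {
    picks++
    if picks == 1 {
        return errors.New("transient: connection closed")
    }
    a.transport = fmt.Sprintf("conn-%d", picks)
    return nil
}

func main() {
    // The replay buffer: the first op is the one that acquires the
    // transport, exactly as in the reworked newClientStreamWithParams.
    buffer := []func(*attempt) error{
        func(a *attempt) error { return a.getTransport() },
        func(a *attempt) error { fmt.Println("sent on", a.transport); return nil },
    }

    replay := func(a *attempt) error {
        for _, op := range buffer {
            if err := op(a); err != nil {
                return err
            }
        }
        return nil
    }

    // Bounded retry loop; each retry replays the buffer against a fresh,
    // bare attempt, which re-picks a transport in its first op.
    for tries := 0; tries < 3; tries++ {
        if err := replay(&attempt{}); err == nil {
            return
        }
    }
}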
2
vendor/google.golang.org/grpc/version.go
generated
vendored
2
vendor/google.golang.org/grpc/version.go
generated
vendored
@ -19,4 +19,4 @@
|
|||||||
package grpc
|
package grpc
|
||||||
|
|
||||||
// Version is the current grpc version.
|
// Version is the current grpc version.
|
||||||
const Version = "1.46.0"
|
const Version = "1.47.0"
|
||||||
vendor/modules.txt (vendored; 2 changes)

@@ -531,7 +531,7 @@ google.golang.org/genproto/googleapis/rpc/code
 google.golang.org/genproto/googleapis/rpc/errdetails
 google.golang.org/genproto/googleapis/rpc/status
 google.golang.org/genproto/protobuf/field_mask
-# google.golang.org/grpc v1.46.0
+# google.golang.org/grpc v1.47.0
 ## explicit; go 1.14
 google.golang.org/grpc
 google.golang.org/grpc/attributes