Upgrade OpenTelemetry dependencies

This commit upgrades the packages under go.opentelemetry.io/.

Signed-off-by: Kazuyoshi Kato <katokazu@amazon.com>
This commit is contained in:
Kazuyoshi Kato
2021-12-16 22:35:57 +00:00
parent 28ffedd06b
commit 2fb739aa21
247 changed files with 10755 additions and 2936 deletions

View File

@@ -25,55 +25,75 @@
// later release.
package attributes
import "fmt"
// Attributes is an immutable struct for storing and retrieving generic
// key/value pairs. Keys must be hashable, and users should define their own
// types for keys.
// types for keys. Values should not be modified after they are added to an
// Attributes or if they were received from one. If values implement 'Equal(o
// interface{}) bool', it will be called by (*Attributes).Equal to determine
// whether two values with the same key should be considered equal.
type Attributes struct {
m map[interface{}]interface{}
}
// New returns a new Attributes containing all key/value pairs in kvs. If the
// same key appears multiple times, the last value overwrites all previous
// values for that key. Panics if len(kvs) is not even.
func New(kvs ...interface{}) *Attributes {
if len(kvs)%2 != 0 {
panic(fmt.Sprintf("attributes.New called with unexpected input: len(kvs) = %v", len(kvs)))
}
a := &Attributes{m: make(map[interface{}]interface{}, len(kvs)/2)}
for i := 0; i < len(kvs)/2; i++ {
a.m[kvs[i*2]] = kvs[i*2+1]
}
return a
// New returns a new Attributes containing the key/value pair.
func New(key, value interface{}) *Attributes {
return &Attributes{m: map[interface{}]interface{}{key: value}}
}
// WithValues returns a new Attributes containing all key/value pairs in a and
// kvs. Panics if len(kvs) is not even. If the same key appears multiple
// times, the last value overwrites all previous values for that key. To
// remove an existing key, use a nil value.
func (a *Attributes) WithValues(kvs ...interface{}) *Attributes {
// WithValue returns a new Attributes containing the previous keys and values
// and the new key/value pair. If the same key appears multiple times, the
// last value overwrites all previous values for that key. To remove an
// existing key, use a nil value. value should not be modified later.
func (a *Attributes) WithValue(key, value interface{}) *Attributes {
if a == nil {
return New(kvs...)
return New(key, value)
}
if len(kvs)%2 != 0 {
panic(fmt.Sprintf("attributes.New called with unexpected input: len(kvs) = %v", len(kvs)))
}
n := &Attributes{m: make(map[interface{}]interface{}, len(a.m)+len(kvs)/2)}
n := &Attributes{m: make(map[interface{}]interface{}, len(a.m)+1)}
for k, v := range a.m {
n.m[k] = v
}
for i := 0; i < len(kvs)/2; i++ {
n.m[kvs[i*2]] = kvs[i*2+1]
}
n.m[key] = value
return n
}
// Value returns the value associated with these attributes for key, or nil if
// no value is associated with key.
// no value is associated with key. The returned value should not be modified.
func (a *Attributes) Value(key interface{}) interface{} {
if a == nil {
return nil
}
return a.m[key]
}
// Equal returns whether a and o are equivalent. If 'Equal(o interface{})
// bool' is implemented for a value in the attributes, it is called to
// determine if the value matches the one stored in the other attributes. If
// Equal is not implemented, standard equality is used to determine if the two
// values are equal.
func (a *Attributes) Equal(o *Attributes) bool {
if a == nil && o == nil {
return true
}
if a == nil || o == nil {
return false
}
if len(a.m) != len(o.m) {
return false
}
for k, v := range a.m {
ov, ok := o.m[k]
if !ok {
// o missing element of a
return false
}
if eq, ok := v.(interface{ Equal(o interface{}) bool }); ok {
if !eq.Equal(ov) {
return false
}
} else if v != ov {
// Fallback to a standard equality check if Value is unimplemented.
return false
}
}
return true
}

View File

@@ -174,25 +174,32 @@ type ClientConn interface {
// BuildOptions contains additional information for Build.
type BuildOptions struct {
// DialCreds is the transport credential the Balancer implementation can
// use to dial to a remote load balancer server. The Balancer implementations
// can ignore this if it does not need to talk to another party securely.
// DialCreds is the transport credentials to use when communicating with a
// remote load balancer server. Balancer implementations which do not
// communicate with a remote load balancer server can ignore this field.
DialCreds credentials.TransportCredentials
// CredsBundle is the credentials bundle that the Balancer can use.
// CredsBundle is the credentials bundle to use when communicating with a
// remote load balancer server. Balancer implementations which do not
// communicate with a remote load balancer server can ignore this field.
CredsBundle credentials.Bundle
// Dialer is the custom dialer the Balancer implementation can use to dial
// to a remote load balancer server. The Balancer implementations
// can ignore this if it doesn't need to talk to remote balancer.
// Dialer is the custom dialer to use when communicating with a remote load
// balancer server. Balancer implementations which do not communicate with a
// remote load balancer server can ignore this field.
Dialer func(context.Context, string) (net.Conn, error)
// ChannelzParentID is the entity parent's channelz unique identification number.
// Authority is the server name to use as part of the authentication
// handshake when communicating with a remote load balancer server. Balancer
// implementations which do not communicate with a remote load balancer
// server can ignore this field.
Authority string
// ChannelzParentID is the parent ClientConn's channelz ID.
ChannelzParentID int64
// CustomUserAgent is the custom user agent set on the parent ClientConn.
// The balancer should set the same custom user agent if it creates a
// ClientConn.
CustomUserAgent string
// Target contains the parsed address info of the dial target. It is the same resolver.Target as
// passed to the resolver.
// See the documentation for the resolver.Target type for details about what it contains.
// Target contains the parsed address info of the dial target. It is the
// same resolver.Target as passed to the resolver. See the documentation for
// the resolver.Target type for details about what it contains.
Target resolver.Target
}

View File

@@ -22,7 +22,6 @@ import (
"errors"
"fmt"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
@@ -42,7 +41,7 @@ func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions)
cc: cc,
pickerBuilder: bb.pickerBuilder,
subConns: make(map[resolver.Address]subConnInfo),
subConns: resolver.NewAddressMap(),
scStates: make(map[balancer.SubConn]connectivity.State),
csEvltr: &balancer.ConnectivityStateEvaluator{},
config: bb.config,
@@ -58,11 +57,6 @@ func (bb *baseBuilder) Name() string {
return bb.name
}
type subConnInfo struct {
subConn balancer.SubConn
attrs *attributes.Attributes
}
type baseBalancer struct {
cc balancer.ClientConn
pickerBuilder PickerBuilder
@@ -70,7 +64,7 @@ type baseBalancer struct {
csEvltr *balancer.ConnectivityStateEvaluator
state connectivity.State
subConns map[resolver.Address]subConnInfo // `attributes` is stripped from the keys of this map (the addresses)
subConns *resolver.AddressMap
scStates map[balancer.SubConn]connectivity.State
picker balancer.Picker
config Config
@@ -81,7 +75,7 @@ type baseBalancer struct {
func (b *baseBalancer) ResolverError(err error) {
b.resolverErr = err
if len(b.subConns) == 0 {
if b.subConns.Len() == 0 {
b.state = connectivity.TransientFailure
}
@@ -105,53 +99,29 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
// Successful resolution; clear resolver error and ensure we return nil.
b.resolverErr = nil
// addrsSet is the set converted from addrs, it's used for quick lookup of an address.
addrsSet := make(map[resolver.Address]struct{})
addrsSet := resolver.NewAddressMap()
for _, a := range s.ResolverState.Addresses {
// Strip attributes from addresses before using them as map keys. So
// that when two addresses only differ in attributes pointers (but with
// the same attribute content), they are considered the same address.
//
// Note that this doesn't handle the case where the attribute content is
// different. So if users want to set different attributes to create
// duplicate connections to the same backend, it doesn't work. This is
// fine for now, because duplicate is done by setting Metadata today.
//
// TODO: read attributes to handle duplicate connections.
aNoAttrs := a
aNoAttrs.Attributes = nil
addrsSet[aNoAttrs] = struct{}{}
if scInfo, ok := b.subConns[aNoAttrs]; !ok {
addrsSet.Set(a, nil)
if _, ok := b.subConns.Get(a); !ok {
// a is a new address (not existing in b.subConns).
//
// When creating SubConn, the original address with attributes is
// passed through. So that connection configurations in attributes
// (like creds) will be used.
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
if err != nil {
logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
continue
}
b.subConns[aNoAttrs] = subConnInfo{subConn: sc, attrs: a.Attributes}
b.subConns.Set(a, sc)
b.scStates[sc] = connectivity.Idle
b.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Idle)
sc.Connect()
} else {
// Always update the subconn's address in case the attributes
// changed.
//
// The SubConn does a reflect.DeepEqual of the new and old
// addresses. So this is a noop if the current address is the same
// as the old one (including attributes).
scInfo.attrs = a.Attributes
b.subConns[aNoAttrs] = scInfo
b.cc.UpdateAddresses(scInfo.subConn, []resolver.Address{a})
}
}
for a, scInfo := range b.subConns {
for _, a := range b.subConns.Keys() {
sci, _ := b.subConns.Get(a)
sc := sci.(balancer.SubConn)
// a was removed by resolver.
if _, ok := addrsSet[a]; !ok {
b.cc.RemoveSubConn(scInfo.subConn)
delete(b.subConns, a)
if _, ok := addrsSet.Get(a); !ok {
b.cc.RemoveSubConn(sc)
b.subConns.Delete(a)
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in UpdateSubConnState.
}
@@ -193,10 +163,11 @@ func (b *baseBalancer) regeneratePicker() {
readySCs := make(map[balancer.SubConn]SubConnInfo)
// Filter out all ready SCs from full subConn map.
for addr, scInfo := range b.subConns {
if st, ok := b.scStates[scInfo.subConn]; ok && st == connectivity.Ready {
addr.Attributes = scInfo.attrs
readySCs[scInfo.subConn] = SubConnInfo{Address: addr}
for _, addr := range b.subConns.Keys() {
sci, _ := b.subConns.Get(addr)
sc := sci.(balancer.SubConn)
if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
readySCs[sc] = SubConnInfo{Address: addr}
}
}
b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})

View File

@@ -39,7 +39,7 @@ type State struct {
// Set returns a copy of the provided state with attributes containing s. s's
// data should not be mutated after calling Set.
func Set(state resolver.State, s *State) resolver.State {
state.Attributes = state.Attributes.WithValues(key, s)
state.Attributes = state.Attributes.WithValue(key, s)
return state
}

View File

@@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"math"
"net/url"
"reflect"
"strings"
"sync"
@@ -37,7 +38,6 @@ import (
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpcutil"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
@@ -83,13 +83,13 @@ var (
// errTransportCredsAndBundle indicates that creds bundle is used together
// with other individual Transport Credentials.
errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
// errTransportCredentialsMissing indicates that users want to transmit security
// information (e.g., OAuth2 token) which requires secure connection on an insecure
// connection.
// errNoTransportCredsInBundle indicated that the configured creds bundle
// returned a transport credentials which was nil.
errNoTransportCredsInBundle = errors.New("grpc: credentials.Bundle must return non-nil transport credentials")
// errTransportCredentialsMissing indicates that users want to transmit
// security information (e.g., OAuth2 token) which requires secure
// connection on an insecure connection.
errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
// errCredentialsConflict indicates that grpc.WithTransportCredentials()
// and grpc.WithInsecure() are both called for a connection.
errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)
const (
@@ -177,17 +177,20 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
cc.csMgr.channelzID = cc.channelzID
}
if !cc.dopts.insecure {
if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
return nil, errNoTransportSecurity
}
if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
return nil, errTransportCredsAndBundle
}
} else {
if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
return nil, errCredentialsConflict
}
if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
return nil, errNoTransportSecurity
}
if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
return nil, errTransportCredsAndBundle
}
if cc.dopts.copts.CredsBundle != nil && cc.dopts.copts.CredsBundle.TransportCredentials() == nil {
return nil, errNoTransportCredsInBundle
}
transportCreds := cc.dopts.copts.TransportCredentials
if transportCreds == nil {
transportCreds = cc.dopts.copts.CredsBundle.TransportCredentials()
}
if transportCreds.Info().SecurityProtocol == "insecure" {
for _, cd := range cc.dopts.copts.PerRPCCredentials {
if cd.RequireTransportSecurity() {
return nil, errTransportCredentialsMissing
@@ -248,38 +251,15 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
// Determine the resolver to use.
cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)
channelz.Infof(logger, cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme)
resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
if resolverBuilder == nil {
// If resolver builder is still nil, the parsed target's scheme is
// not registered. Fallback to default resolver and set Endpoint to
// the original target.
channelz.Infof(logger, cc.channelzID, "scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
cc.parsedTarget = resolver.Target{
Scheme: resolver.GetDefaultScheme(),
Endpoint: target,
}
resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
if resolverBuilder == nil {
return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
}
resolverBuilder, err := cc.parseTargetAndFindResolver()
if err != nil {
return nil, err
}
creds := cc.dopts.copts.TransportCredentials
if creds != nil && creds.Info().ServerName != "" {
cc.authority = creds.Info().ServerName
} else if cc.dopts.insecure && cc.dopts.authority != "" {
cc.authority = cc.dopts.authority
} else if strings.HasPrefix(cc.target, "unix:") || strings.HasPrefix(cc.target, "unix-abstract:") {
cc.authority = "localhost"
} else if strings.HasPrefix(cc.parsedTarget.Endpoint, ":") {
cc.authority = "localhost" + cc.parsedTarget.Endpoint
} else {
// Use endpoint from "scheme://authority/endpoint" as the default
// authority for ClientConn.
cc.authority = cc.parsedTarget.Endpoint
cc.authority, err = determineAuthority(cc.parsedTarget.Endpoint, cc.target, cc.dopts)
if err != nil {
return nil, err
}
channelz.Infof(logger, cc.channelzID, "Channel authority set to %q", cc.authority)
if cc.dopts.scChan != nil && !scSet {
// Blocking wait for the initial service config.
@@ -305,6 +285,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
DialCreds: credsClone,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
Authority: cc.authority,
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
@@ -652,7 +633,10 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
}
var ret error
if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
if cc.dopts.disableServiceConfig {
channelz.Infof(logger, cc.channelzID, "ignoring service config from resolver (%v) and applying the default because service config is disabled", s.ServiceConfig)
cc.maybeApplyDefaultServiceConfig(s.Addresses)
} else if s.ServiceConfig == nil {
cc.maybeApplyDefaultServiceConfig(s.Addresses)
// TODO: do we need to apply a failing LB policy if there is no
// default, per the error handling design?
@@ -902,10 +886,7 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
// ac.state is Ready, try to find the connected address.
var curAddrFound bool
for _, a := range addrs {
// a.ServerName takes precedent over ClientConn authority, if present.
if a.ServerName == "" {
a.ServerName = ac.cc.authority
}
a.ServerName = ac.cc.getServerName(a)
if reflect.DeepEqual(ac.curAddr, a) {
curAddrFound = true
break
@@ -919,6 +900,26 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
return curAddrFound
}
// getServerName determines the serverName to be used in the connection
// handshake. The default value for the serverName is the authority on the
// ClientConn, which either comes from the user's dial target or through an
// authority override specified using the WithAuthority dial option. Name
// resolvers can specify a per-address override for the serverName through the
// resolver.Address.ServerName field which is used only if the WithAuthority
// dial option was not used. The rationale is that per-address authority
// overrides specified by the name resolver can represent a security risk, while
// an override specified by the user is more dependable since they probably know
// what they are doing.
func (cc *ClientConn) getServerName(addr resolver.Address) string {
if cc.dopts.authority != "" {
return cc.dopts.authority
}
if addr.ServerName != "" {
return addr.ServerName
}
return cc.authority
}
func getMethodConfig(sc *ServiceConfig, method string) MethodConfig {
if sc == nil {
return MethodConfig{}
@@ -1275,11 +1276,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
prefaceReceived := grpcsync.NewEvent()
connClosed := grpcsync.NewEvent()
// addr.ServerName takes precedent over ClientConn authority, if present.
if addr.ServerName == "" {
addr.ServerName = ac.cc.authority
}
addr.ServerName = ac.cc.getServerName(addr)
hctx, hcancel := context.WithCancel(ac.ctx)
hcStarted := false // protected by ac.mu
@@ -1621,3 +1618,114 @@ func (cc *ClientConn) connectionError() error {
defer cc.lceMu.Unlock()
return cc.lastConnectionError
}
func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) {
channelz.Infof(logger, cc.channelzID, "original dial target is: %q", cc.target)
var rb resolver.Builder
parsedTarget, err := parseTarget(cc.target)
if err != nil {
channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", cc.target, err)
} else {
channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget)
rb = cc.getResolver(parsedTarget.Scheme)
if rb != nil {
cc.parsedTarget = parsedTarget
return rb, nil
}
}
// We are here because the user's dial target did not contain a scheme or
// specified an unregistered scheme. We should fallback to the default
// scheme, except when a custom dialer is specified in which case, we should
// always use passthrough scheme.
defScheme := resolver.GetDefaultScheme()
channelz.Infof(logger, cc.channelzID, "fallback to scheme %q", defScheme)
canonicalTarget := defScheme + ":///" + cc.target
parsedTarget, err = parseTarget(canonicalTarget)
if err != nil {
channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", canonicalTarget, err)
return nil, err
}
channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget)
rb = cc.getResolver(parsedTarget.Scheme)
if rb == nil {
return nil, fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.Scheme)
}
cc.parsedTarget = parsedTarget
return rb, nil
}
// parseTarget uses RFC 3986 semantics to parse the given target into a
// resolver.Target struct containing scheme, authority and endpoint. Query
// params are stripped from the endpoint.
func parseTarget(target string) (resolver.Target, error) {
u, err := url.Parse(target)
if err != nil {
return resolver.Target{}, err
}
// For targets of the form "[scheme]://[authority]/endpoint, the endpoint
// value returned from url.Parse() contains a leading "/". Although this is
// in accordance with RFC 3986, we do not want to break existing resolver
// implementations which expect the endpoint without the leading "/". So, we
// end up stripping the leading "/" here. But this will result in an
// incorrect parsing for something like "unix:///path/to/socket". Since we
// own the "unix" resolver, we can workaround in the unix resolver by using
// the `URL` field instead of the `Endpoint` field.
endpoint := u.Path
if endpoint == "" {
endpoint = u.Opaque
}
endpoint = strings.TrimPrefix(endpoint, "/")
return resolver.Target{
Scheme: u.Scheme,
Authority: u.Host,
Endpoint: endpoint,
URL: *u,
}, nil
}
// Determine channel authority. The order of precedence is as follows:
// - user specified authority override using `WithAuthority` dial option
// - creds' notion of server name for the authentication handshake
// - endpoint from dial target of the form "scheme://[authority]/endpoint"
func determineAuthority(endpoint, target string, dopts dialOptions) (string, error) {
// Historically, we had two options for users to specify the serverName or
// authority for a channel. One was through the transport credentials
// (either in its constructor, or through the OverrideServerName() method).
// The other option (for cases where WithInsecure() dial option was used)
// was to use the WithAuthority() dial option.
//
// A few things have changed since:
// - `insecure` package with an implementation of the `TransportCredentials`
// interface for the insecure case
// - WithAuthority() dial option support for secure credentials
authorityFromCreds := ""
if creds := dopts.copts.TransportCredentials; creds != nil && creds.Info().ServerName != "" {
authorityFromCreds = creds.Info().ServerName
}
authorityFromDialOption := dopts.authority
if (authorityFromCreds != "" && authorityFromDialOption != "") && authorityFromCreds != authorityFromDialOption {
return "", fmt.Errorf("ClientConn's authority from transport creds %q and dial option %q don't match", authorityFromCreds, authorityFromDialOption)
}
switch {
case authorityFromDialOption != "":
return authorityFromDialOption, nil
case authorityFromCreds != "":
return authorityFromCreds, nil
case strings.HasPrefix(target, "unix:") || strings.HasPrefix(target, "unix-abstract:"):
// TODO: remove when the unix resolver implements optional interface to
// return channel authority.
return "localhost", nil
case strings.HasPrefix(endpoint, ":"):
return "localhost" + endpoint, nil
default:
// TODO: Define an optional interface on the resolver builder to return
// the channel authority given the user's dial target. For resolvers
// which don't implement this interface, we will use the endpoint from
// "scheme://authority/endpoint" as the default authority.
return endpoint, nil
}
}

View File

@@ -140,6 +140,11 @@ type TransportCredentials interface {
// Additionally, ClientHandshakeInfo data will be available via the context
// passed to this call.
//
// The second argument to this method is the `:authority` header value used
// while creating new streams on this connection after authentication
// succeeds. Implementations must use this as the server name during the
// authentication handshake.
//
// If the returned net.Conn is closed, it MUST close the net.Conn provided.
ClientHandshake(context.Context, string, net.Conn) (net.Conn, AuthInfo, error)
// ServerHandshake does the authentication handshake for servers. It returns
@@ -153,9 +158,13 @@ type TransportCredentials interface {
Info() ProtocolInfo
// Clone makes a copy of this TransportCredentials.
Clone() TransportCredentials
// OverrideServerName overrides the server name used to verify the hostname on the returned certificates from the server.
// gRPC internals also use it to override the virtual hosting name if it is set.
// It must be called before dialing. Currently, this is only used by grpclb.
// OverrideServerName specifies the value used for the following:
// - verifying the hostname on the returned certificates
// - as SNI in the client's handshake to support virtual hosting
// - as the value for `:authority` header at stream creation time
//
// Deprecated: use grpc.WithAuthority instead. Will be supported
// throughout 1.x.
OverrideServerName(string) error
}
@@ -169,8 +178,18 @@ type TransportCredentials interface {
//
// This API is experimental.
type Bundle interface {
// TransportCredentials returns the transport credentials from the Bundle.
//
// Implementations must return non-nil transport credentials. If transport
// security is not needed by the Bundle, implementations may choose to
// return insecure.NewCredentials().
TransportCredentials() TransportCredentials
// PerRPCCredentials returns the per-RPC credentials from the Bundle.
//
// May be nil if per-RPC credentials are not needed.
PerRPCCredentials() PerRPCCredentials
// NewWithMode should make a copy of Bundle, and switch mode. Modifying the
// existing Bundle may cause races.
//

View File

@@ -0,0 +1,77 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package insecure provides an implementation of the
// credentials.TransportCredentials interface which disables transport security.
//
// Experimental
//
// Notice: This package is EXPERIMENTAL and may be changed or removed in a
// later release.
package insecure
import (
"context"
"net"
"google.golang.org/grpc/credentials"
)
// NewCredentials returns a credentials which disables transport security.
//
// Note that using this credentials with per-RPC credentials which require
// transport security is incompatible and will cause grpc.Dial() to fail.
func NewCredentials() credentials.TransportCredentials {
return insecureTC{}
}
// insecureTC implements the insecure transport credentials. The handshake
// methods simply return the passed in net.Conn and set the security level to
// NoSecurity.
type insecureTC struct{}
func (insecureTC) ClientHandshake(ctx context.Context, _ string, conn net.Conn) (net.Conn, credentials.AuthInfo, error) {
return conn, info{credentials.CommonAuthInfo{SecurityLevel: credentials.NoSecurity}}, nil
}
func (insecureTC) ServerHandshake(conn net.Conn) (net.Conn, credentials.AuthInfo, error) {
return conn, info{credentials.CommonAuthInfo{SecurityLevel: credentials.NoSecurity}}, nil
}
func (insecureTC) Info() credentials.ProtocolInfo {
return credentials.ProtocolInfo{SecurityProtocol: "insecure"}
}
func (insecureTC) Clone() credentials.TransportCredentials {
return insecureTC{}
}
func (insecureTC) OverrideServerName(string) error {
return nil
}
// info contains the auth information for an insecure connection.
// It implements the AuthInfo interface.
type info struct {
credentials.CommonAuthInfo
}
// AuthType returns the type of info as a string.
func (info) AuthType() string {
return "insecure"
}

View File

@@ -27,9 +27,9 @@ import (
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
internalbackoff "google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
@@ -50,7 +50,6 @@ type dialOptions struct {
bs internalbackoff.Strategy
block bool
returnLastError bool
insecure bool
timeout time.Duration
scChan <-chan ServiceConfig
authority string
@@ -228,18 +227,14 @@ func WithServiceConfig(c <-chan ServiceConfig) DialOption {
})
}
// WithConnectParams configures the dialer to use the provided ConnectParams.
// WithConnectParams configures the ClientConn to use the provided ConnectParams
// for creating and maintaining connections to servers.
//
// The backoff configuration specified as part of the ConnectParams overrides
// all defaults specified in
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md. Consider
// using the backoff.DefaultConfig as a base, in cases where you want to
// override only a subset of the backoff configuration.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func WithConnectParams(p ConnectParams) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.bs = internalbackoff.Exponential{Config: p.Backoff}
@@ -303,11 +298,17 @@ func WithReturnConnectionError() DialOption {
}
// WithInsecure returns a DialOption which disables transport security for this
// ClientConn. Note that transport security is required unless WithInsecure is
// set.
// ClientConn. Under the hood, it uses insecure.NewCredentials().
//
// Note that using this DialOption with per-RPC credentials (through
// WithCredentialsBundle or WithPerRPCCredentials) which require transport
// security is incompatible and will cause grpc.Dial() to fail.
//
// Deprecated: use insecure.NewCredentials() instead.
// Will be supported throughout 1.x.
func WithInsecure() DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.insecure = true
o.copts.TransportCredentials = insecure.NewCredentials()
})
}
@@ -482,8 +483,7 @@ func WithChainStreamInterceptor(interceptors ...StreamClientInterceptor) DialOpt
}
// WithAuthority returns a DialOption that specifies the value to be used as the
// :authority pseudo-header. This value only works with WithInsecure and has no
// effect if TransportCredentials are present.
// :authority pseudo-header and as the server name in authentication handshake.
func WithAuthority(a string) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.authority = a
@@ -519,14 +519,16 @@ func WithDisableServiceConfig() DialOption {
// WithDefaultServiceConfig returns a DialOption that configures the default
// service config, which will be used in cases where:
//
// 1. WithDisableServiceConfig is also used.
// 2. Resolver does not return a service config or if the resolver returns an
// invalid service config.
// 1. WithDisableServiceConfig is also used, or
//
// Experimental
// 2. The name resolver does not provide a service config or provides an
// invalid service config.
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
// The parameter s is the JSON representation of the default service config.
// For more information about service configs, see:
// https://github.com/grpc/grpc/blob/master/doc/service_config.md
// For a simple example of usage, see:
// examples/features/load_balancing/client/main.go
func WithDefaultServiceConfig(s string) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.defaultServiceConfigRawJSON = &s
@@ -538,14 +540,8 @@ func WithDefaultServiceConfig(s string) DialOption {
// will happen automatically if no data is written to the wire or if the RPC is
// unprocessed by the remote server.
//
// Retry support is currently disabled by default, but will be enabled by
// default in the future. Until then, it may be enabled by setting the
// environment variable "GRPC_GO_RETRY" to "on".
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
// Retry support is currently enabled by default, but may be disabled by
// setting the environment variable "GRPC_GO_RETRY" to "off".
func WithDisableRetry() DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.disableRetry = true
@@ -585,7 +581,6 @@ func withHealthCheckFunc(f internal.HealthChecker) DialOption {
func defaultDialOptions() dialOptions {
return dialOptions{
disableRetry: !envconfig.Retry,
healthCheckFunc: internal.HealthCheckFunc,
copts: transport.ConnectOptions{
WriteBufferSize: defaultWriteBufSize,

View File

@@ -4,7 +4,8 @@ go 1.14
require (
github.com/cespare/xxhash/v2 v2.1.1
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403
github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/protobuf v1.4.3

View File

@@ -9,10 +9,13 @@ github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403 h1:cqQfy1jclcSy/FwLjemeg3SR1yaINm74aQyupQ0Bl8M=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158 h1:CevA8fI91PAnP8vpnXuB8ZYAZ5wqY86nAbxfgK8tWO4=
github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4 h1:hzAQntlaYRkVSFEfj9OTWlVV1H155FMD8BTKktLv0QI=
github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI=
github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1 h1:zH8ljVhhq7yC0MIeUL/IviMtY8hx2mK8cN9wEYb8ggw=
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=

View File

@@ -19,11 +19,14 @@
package grpclog
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"strconv"
"strings"
"google.golang.org/grpc/internal/grpclog"
)
@@ -95,8 +98,9 @@ var severityName = []string{
// loggerT is the default logger used by grpclog.
type loggerT struct {
m []*log.Logger
v int
m []*log.Logger
v int
jsonFormat bool
}
// NewLoggerV2 creates a loggerV2 with the provided writers.
@@ -105,19 +109,32 @@ type loggerT struct {
// Warning logs will be written to warningW and infoW.
// Info logs will be written to infoW.
func NewLoggerV2(infoW, warningW, errorW io.Writer) LoggerV2 {
return NewLoggerV2WithVerbosity(infoW, warningW, errorW, 0)
return newLoggerV2WithConfig(infoW, warningW, errorW, loggerV2Config{})
}
// NewLoggerV2WithVerbosity creates a loggerV2 with the provided writers and
// verbosity level.
func NewLoggerV2WithVerbosity(infoW, warningW, errorW io.Writer, v int) LoggerV2 {
return newLoggerV2WithConfig(infoW, warningW, errorW, loggerV2Config{verbose: v})
}
type loggerV2Config struct {
verbose int
jsonFormat bool
}
func newLoggerV2WithConfig(infoW, warningW, errorW io.Writer, c loggerV2Config) LoggerV2 {
var m []*log.Logger
m = append(m, log.New(infoW, severityName[infoLog]+": ", log.LstdFlags))
m = append(m, log.New(io.MultiWriter(infoW, warningW), severityName[warningLog]+": ", log.LstdFlags))
flag := log.LstdFlags
if c.jsonFormat {
flag = 0
}
m = append(m, log.New(infoW, "", flag))
m = append(m, log.New(io.MultiWriter(infoW, warningW), "", flag))
ew := io.MultiWriter(infoW, warningW, errorW) // ew will be used for error and fatal.
m = append(m, log.New(ew, severityName[errorLog]+": ", log.LstdFlags))
m = append(m, log.New(ew, severityName[fatalLog]+": ", log.LstdFlags))
return &loggerT{m: m, v: v}
m = append(m, log.New(ew, "", flag))
m = append(m, log.New(ew, "", flag))
return &loggerT{m: m, v: c.verbose, jsonFormat: c.jsonFormat}
}
// newLoggerV2 creates a loggerV2 to be used as default logger.
@@ -142,58 +159,79 @@ func newLoggerV2() LoggerV2 {
if vl, err := strconv.Atoi(vLevel); err == nil {
v = vl
}
return NewLoggerV2WithVerbosity(infoW, warningW, errorW, v)
jsonFormat := strings.EqualFold(os.Getenv("GRPC_GO_LOG_FORMATTER"), "json")
return newLoggerV2WithConfig(infoW, warningW, errorW, loggerV2Config{
verbose: v,
jsonFormat: jsonFormat,
})
}
func (g *loggerT) output(severity int, s string) {
sevStr := severityName[severity]
if !g.jsonFormat {
g.m[severity].Output(2, fmt.Sprintf("%v: %v", sevStr, s))
return
}
// TODO: we can also include the logging component, but that needs more
// (API) changes.
b, _ := json.Marshal(map[string]string{
"severity": sevStr,
"message": s,
})
g.m[severity].Output(2, string(b))
}
func (g *loggerT) Info(args ...interface{}) {
g.m[infoLog].Print(args...)
g.output(infoLog, fmt.Sprint(args...))
}
func (g *loggerT) Infoln(args ...interface{}) {
g.m[infoLog].Println(args...)
g.output(infoLog, fmt.Sprintln(args...))
}
func (g *loggerT) Infof(format string, args ...interface{}) {
g.m[infoLog].Printf(format, args...)
g.output(infoLog, fmt.Sprintf(format, args...))
}
func (g *loggerT) Warning(args ...interface{}) {
g.m[warningLog].Print(args...)
g.output(warningLog, fmt.Sprint(args...))
}
func (g *loggerT) Warningln(args ...interface{}) {
g.m[warningLog].Println(args...)
g.output(warningLog, fmt.Sprintln(args...))
}
func (g *loggerT) Warningf(format string, args ...interface{}) {
g.m[warningLog].Printf(format, args...)
g.output(warningLog, fmt.Sprintf(format, args...))
}
func (g *loggerT) Error(args ...interface{}) {
g.m[errorLog].Print(args...)
g.output(errorLog, fmt.Sprint(args...))
}
func (g *loggerT) Errorln(args ...interface{}) {
g.m[errorLog].Println(args...)
g.output(errorLog, fmt.Sprintln(args...))
}
func (g *loggerT) Errorf(format string, args ...interface{}) {
g.m[errorLog].Printf(format, args...)
g.output(errorLog, fmt.Sprintf(format, args...))
}
func (g *loggerT) Fatal(args ...interface{}) {
g.m[fatalLog].Fatal(args...)
// No need to call os.Exit() again because log.Logger.Fatal() calls os.Exit().
g.output(fatalLog, fmt.Sprint(args...))
os.Exit(1)
}
func (g *loggerT) Fatalln(args ...interface{}) {
g.m[fatalLog].Fatalln(args...)
// No need to call os.Exit() again because log.Logger.Fatal() calls os.Exit().
g.output(fatalLog, fmt.Sprintln(args...))
os.Exit(1)
}
func (g *loggerT) Fatalf(format string, args ...interface{}) {
g.m[fatalLog].Fatalf(format, args...)
// No need to call os.Exit() again because log.Logger.Fatal() calls os.Exit().
g.output(fatalLog, fmt.Sprintf(format, args...))
os.Exit(1)
}
func (g *loggerT) V(l int) bool {

View File

@@ -204,9 +204,9 @@ func RegisterChannel(c Channel, pid int64, ref string) int64 {
trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
}
if pid == 0 {
db.get().addChannel(id, cn, true, pid, ref)
db.get().addChannel(id, cn, true, pid)
} else {
db.get().addChannel(id, cn, false, pid, ref)
db.get().addChannel(id, cn, false, pid)
}
return id
}
@@ -228,7 +228,7 @@ func RegisterSubChannel(c Channel, pid int64, ref string) int64 {
pid: pid,
trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
}
db.get().addSubChannel(id, sc, pid, ref)
db.get().addSubChannel(id, sc, pid)
return id
}
@@ -258,7 +258,7 @@ func RegisterListenSocket(s Socket, pid int64, ref string) int64 {
}
id := idGen.genID()
ls := &listenSocket{refName: ref, s: s, id: id, pid: pid}
db.get().addListenSocket(id, ls, pid, ref)
db.get().addListenSocket(id, ls, pid)
return id
}
@@ -273,11 +273,11 @@ func RegisterNormalSocket(s Socket, pid int64, ref string) int64 {
}
id := idGen.genID()
ns := &normalSocket{refName: ref, s: s, id: id, pid: pid}
db.get().addNormalSocket(id, ns, pid, ref)
db.get().addNormalSocket(id, ns, pid)
return id
}
// RemoveEntry removes an entry with unique channelz trakcing id to be id from
// RemoveEntry removes an entry with unique channelz tracking id to be id from
// channelz database.
func RemoveEntry(id int64) {
db.get().removeEntry(id)
@@ -333,7 +333,7 @@ func (c *channelMap) addServer(id int64, s *server) {
c.mu.Unlock()
}
func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64, ref string) {
func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64) {
c.mu.Lock()
cn.cm = c
cn.trace.cm = c
@@ -346,7 +346,7 @@ func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid in
c.mu.Unlock()
}
func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64, ref string) {
func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64) {
c.mu.Lock()
sc.cm = c
sc.trace.cm = c
@@ -355,7 +355,7 @@ func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64, ref stri
c.mu.Unlock()
}
func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64, ref string) {
func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64) {
c.mu.Lock()
ls.cm = c
c.listenSockets[id] = ls
@@ -363,7 +363,7 @@ func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64, ref
c.mu.Unlock()
}
func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64, ref string) {
func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64) {
c.mu.Lock()
ns.cm = c
c.normalSockets[id] = ns

View File

@@ -22,19 +22,14 @@ package envconfig
import (
"os"
"strings"
xdsenv "google.golang.org/grpc/internal/xds/env"
)
const (
prefix = "GRPC_GO_"
retryStr = prefix + "RETRY"
txtErrIgnoreStr = prefix + "IGNORE_TXT_ERRORS"
)
var (
// Retry is set if retry is explicitly enabled via "GRPC_GO_RETRY=on" or if XDS retry support is enabled.
Retry = strings.EqualFold(os.Getenv(retryStr), "on") || xdsenv.RetrySupport
// TXTErrIgnore is set if TXT errors should be ignored ("GRPC_GO_IGNORE_TXT_ERRORS" is not "false").
TXTErrIgnore = !strings.EqualFold(os.Getenv(txtErrIgnoreStr), "false")
)

View File

@@ -0,0 +1,90 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package envconfig
import (
"os"
"strings"
)
const (
// XDSBootstrapFileNameEnv is the env variable to set bootstrap file name.
// Do not use this and read from env directly. Its value is read and kept in
// variable BootstrapFileName.
//
// When both bootstrap FileName and FileContent are set, FileName is used.
XDSBootstrapFileNameEnv = "GRPC_XDS_BOOTSTRAP"
// XDSBootstrapFileContentEnv is the env variable to set bootstrapp file
// content. Do not use this and read from env directly. Its value is read
// and kept in variable BootstrapFileName.
//
// When both bootstrap FileName and FileContent are set, FileName is used.
XDSBootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG"
ringHashSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"
clientSideSecuritySupportEnv = "GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT"
aggregateAndDNSSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"
rbacSupportEnv = "GRPC_XDS_EXPERIMENTAL_RBAC"
federationEnv = "GRPC_EXPERIMENTAL_XDS_FEDERATION"
c2pResolverTestOnlyTrafficDirectorURIEnv = "GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI"
)
var (
// XDSBootstrapFileName holds the name of the file which contains xDS
// bootstrap configuration. Users can specify the location of the bootstrap
// file by setting the environment variable "GRPC_XDS_BOOTSTRAP".
//
// When both bootstrap FileName and FileContent are set, FileName is used.
XDSBootstrapFileName = os.Getenv(XDSBootstrapFileNameEnv)
// XDSBootstrapFileContent holds the content of the xDS bootstrap
// configuration. Users can specify the bootstrap config by setting the
// environment variable "GRPC_XDS_BOOTSTRAP_CONFIG".
//
// When both bootstrap FileName and FileContent are set, FileName is used.
XDSBootstrapFileContent = os.Getenv(XDSBootstrapFileContentEnv)
// XDSRingHash indicates whether ring hash support is enabled, which can be
// disabled by setting the environment variable
// "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH" to "false".
XDSRingHash = !strings.EqualFold(os.Getenv(ringHashSupportEnv), "false")
// XDSClientSideSecurity is used to control processing of security
// configuration on the client-side.
//
// Note that there is no env var protection for the server-side because we
// have a brand new API on the server-side and users explicitly need to use
// the new API to get security integration on the server.
XDSClientSideSecurity = !strings.EqualFold(os.Getenv(clientSideSecuritySupportEnv), "false")
// XDSAggregateAndDNS indicates whether processing of aggregated cluster
// and DNS cluster is enabled, which can be enabled by setting the
// environment variable
// "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" to
// "true".
XDSAggregateAndDNS = strings.EqualFold(os.Getenv(aggregateAndDNSSupportEnv), "true")
// XDSRBAC indicates whether xDS configured RBAC HTTP Filter is enabled,
// which can be disabled by setting the environment variable
// "GRPC_XDS_EXPERIMENTAL_RBAC" to "false".
XDSRBAC = !strings.EqualFold(os.Getenv(rbacSupportEnv), "false")
// XDSFederation indicates whether federation support is enabled.
XDSFederation = strings.EqualFold(os.Getenv(federationEnv), "true")
// C2PResolverTestOnlyTrafficDirectorURI is the TD URI for testing.
C2PResolverTestOnlyTrafficDirectorURI = os.Getenv(c2pResolverTestOnlyTrafficDirectorURIEnv)
)

View File

@@ -0,0 +1,20 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package grpcutil provides utility functions used across the gRPC codebase.
package grpcutil

View File

@@ -0,0 +1,28 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpcutil
import "regexp"
// FullMatchWithRegex returns whether the full string matches the regex provided.
func FullMatchWithRegex(re *regexp.Regexp, string string) bool {
re.Longest()
rem := re.FindString(string)
return len(rem) == len(string)
}

View File

@@ -1,89 +0,0 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package grpcutil provides a bunch of utility functions to be used across the
// gRPC codebase.
package grpcutil
import (
"strings"
"google.golang.org/grpc/resolver"
)
// split2 returns the values from strings.SplitN(s, sep, 2).
// If sep is not found, it returns ("", "", false) instead.
func split2(s, sep string) (string, string, bool) {
spl := strings.SplitN(s, sep, 2)
if len(spl) < 2 {
return "", "", false
}
return spl[0], spl[1], true
}
// ParseTarget splits target into a resolver.Target struct containing scheme,
// authority and endpoint. skipUnixColonParsing indicates that the parse should
// not parse "unix:[path]" cases. This should be true in cases where a custom
// dialer is present, to prevent a behavior change.
//
// If target is not a valid scheme://authority/endpoint as specified in
// https://github.com/grpc/grpc/blob/master/doc/naming.md,
// it returns {Endpoint: target}.
func ParseTarget(target string, skipUnixColonParsing bool) (ret resolver.Target) {
var ok bool
if strings.HasPrefix(target, "unix-abstract:") {
if strings.HasPrefix(target, "unix-abstract://") {
// Maybe, with Authority specified, try to parse it
var remain string
ret.Scheme, remain, _ = split2(target, "://")
ret.Authority, ret.Endpoint, ok = split2(remain, "/")
if !ok {
// No Authority, add the "//" back
ret.Endpoint = "//" + remain
} else {
// Found Authority, add the "/" back
ret.Endpoint = "/" + ret.Endpoint
}
} else {
// Without Authority specified, split target on ":"
ret.Scheme, ret.Endpoint, _ = split2(target, ":")
}
return ret
}
ret.Scheme, ret.Endpoint, ok = split2(target, "://")
if !ok {
if strings.HasPrefix(target, "unix:") && !skipUnixColonParsing {
// Handle the "unix:[local/path]" and "unix:[/absolute/path]" cases,
// because splitting on :// only handles the
// "unix://[/absolute/path]" case. Only handle if the dialer is nil,
// to avoid a behavior change with custom dialers.
return resolver.Target{Scheme: "unix", Endpoint: target[len("unix:"):]}
}
return resolver.Target{Endpoint: target}
}
ret.Authority, ret.Endpoint, ok = split2(ret.Endpoint, "/")
if !ok {
return resolver.Target{Endpoint: target}
}
if ret.Scheme == "unix" {
// Add the "/" back in the unix case, so the unix resolver receives the
// actual endpoint in the "unix://[/absolute/path]" case.
ret.Endpoint = "/" + ret.Endpoint
}
return ret
}

View File

@@ -30,14 +30,38 @@ type mdKeyType string
const mdKey = mdKeyType("grpc.internal.address.metadata")
type mdValue metadata.MD
func (m mdValue) Equal(o interface{}) bool {
om, ok := o.(mdValue)
if !ok {
return false
}
if len(m) != len(om) {
return false
}
for k, v := range m {
ov := om[k]
if len(ov) != len(v) {
return false
}
for i, ve := range v {
if ov[i] != ve {
return false
}
}
}
return true
}
// Get returns the metadata of addr.
func Get(addr resolver.Address) metadata.MD {
attrs := addr.Attributes
if attrs == nil {
return nil
}
md, _ := attrs.Value(mdKey).(metadata.MD)
return md
md, _ := attrs.Value(mdKey).(mdValue)
return metadata.MD(md)
}
// Set sets (overrides) the metadata in addr.
@@ -45,6 +69,6 @@ func Get(addr resolver.Address) metadata.MD {
// When a SubConn is created with this address, the RPCs sent on it will all
// have this metadata.
func Set(addr resolver.Address, md metadata.MD) resolver.Address {
addr.Attributes = addr.Attributes.WithValues(mdKey, md)
addr.Attributes = addr.Attributes.WithValue(mdKey, mdValue(md))
return addr
}

View File

@@ -132,7 +132,7 @@ const csKey = csKeyType("grpc.internal.resolver.configSelector")
// SetConfigSelector sets the config selector in state and returns the new
// state.
func SetConfigSelector(state resolver.State, cs ConfigSelector) resolver.State {
state.Attributes = state.Attributes.WithValues(csKey, cs)
state.Attributes = state.Attributes.WithValue(csKey, cs)
return state
}

View File

@@ -37,7 +37,17 @@ func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, _ resolv
if target.Authority != "" {
return nil, fmt.Errorf("invalid (non-empty) authority: %v", target.Authority)
}
addr := resolver.Address{Addr: target.Endpoint}
// gRPC was parsing the dial target manually before PR #4817, and we
// switched to using url.Parse() in that PR. To avoid breaking existing
// resolver implementations we ended up stripping the leading "/" from the
// endpoint. This obviously does not work for the "unix" scheme. Hence we
// end up using the parsed URL instead.
endpoint := target.URL.Path
if endpoint == "" {
endpoint = target.URL.Opaque
}
addr := resolver.Address{Addr: endpoint}
if b.scheme == unixAbstractScheme {
// prepend "\x00" to address for unix-abstract
addr.Addr = "\x00" + addr.Addr

View File

@@ -133,6 +133,7 @@ type cleanupStream struct {
func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
type earlyAbortStream struct {
httpStatus uint32
streamID uint32
contentSubtype string
status *status.Status
@@ -771,9 +772,12 @@ func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error {
if l.side == clientSide {
return errors.New("earlyAbortStream not handled on client")
}
// In case the caller forgets to set the http status, default to 200.
if eas.httpStatus == 0 {
eas.httpStatus = 200
}
headerFields := []hpack.HeaderField{
{Name: ":status", Value: "200"},
{Name: ":status", Value: strconv.Itoa(int(eas.httpStatus))},
{Name: "content-type", Value: grpcutil.ContentType(eas.contentSubtype)},
{Name: "grpc-status", Value: strconv.Itoa(int(eas.status.Code()))},
{Name: "grpc-message", Value: encodeGrpcMessage(eas.status.Message())},

View File

@@ -136,12 +136,10 @@ type inFlow struct {
// newLimit updates the inflow window to a new value n.
// It assumes that n is always greater than the old limit.
func (f *inFlow) newLimit(n uint32) uint32 {
func (f *inFlow) newLimit(n uint32) {
f.mu.Lock()
d := n - f.limit
f.limit = n
f.mu.Unlock()
return d
}
func (f *inFlow) maybeAdjust(n uint32) uint32 {

View File

@@ -25,6 +25,7 @@ import (
"math"
"net"
"net/http"
"path/filepath"
"strconv"
"strings"
"sync"
@@ -146,13 +147,20 @@ func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error
address := addr.Addr
networkType, ok := networktype.Get(addr)
if fn != nil {
// Special handling for unix scheme with custom dialer. Back in the day,
// we did not have a unix resolver and therefore targets with a unix
// scheme would end up using the passthrough resolver. So, user's used a
// custom dialer in this case and expected the original dial target to
// be passed to the custom dialer. Now, we have a unix resolver. But if
// a custom dialer is specified, we want to retain the old behavior in
// terms of the address being passed to the custom dialer.
if networkType == "unix" && !strings.HasPrefix(address, "\x00") {
// For backward compatibility, if the user dialed "unix:///path",
// the passthrough resolver would be used and the user's custom
// dialer would see "unix:///path". Since the unix resolver is used
// and the address is now "/path", prepend "unix://" so the user's
// custom dialer sees the same address.
return fn(ctx, "unix://"+address)
// Supported unix targets are either "unix://absolute-path" or
// "unix:relative-path".
if filepath.IsAbs(address) {
return fn(ctx, "unix://"+address)
}
return fn(ctx, "unix:"+address)
}
return fn(ctx, address)
}
@@ -193,6 +201,12 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
}()
// gRPC, resolver, balancer etc. can specify arbitrary data in the
// Attributes field of resolver.Address, which is shoved into connectCtx
// and passed to the dialer and credential handshaker. This makes it possible for
// address specific arbitrary data to reach custom dialers and credential handshakers.
connectCtx = icredentials.NewClientHandshakeInfoContext(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})
conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent)
if err != nil {
if opts.FailOnNonTempDialError {
@@ -237,11 +251,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
}
if transportCreds != nil {
// gRPC, resolver, balancer etc. can specify arbitrary data in the
// Attributes field of resolver.Address, which is shoved into connectCtx
// and passed to the credential handshaker. This makes it possible for
// address specific arbitrary data to reach the credential handshaker.
connectCtx = icredentials.NewClientHandshakeInfoContext(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})
rawConn := conn
// Pull the deadline from the connectCtx, which will be used for
// timeouts in the authentication protocol handshake. Can ignore the
@@ -579,7 +588,7 @@ func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[s
return nil, err
}
return nil, status.Errorf(codes.Unauthenticated, "transport: %v", err)
return nil, status.Errorf(codes.Unauthenticated, "transport: per-RPC creds failed due to error: %v", err)
}
for k, v := range data {
// Capital header names are illegal in HTTP/2.
@@ -1073,7 +1082,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
}
// The server has closed the stream without sending trailers. Record that
// the read direction is closed, and set the status appropriately.
if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
if f.StreamEnded() {
t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
}
}
@@ -1403,26 +1412,6 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
}
isHeader := false
defer func() {
if t.statsHandler != nil {
if isHeader {
inHeader := &stats.InHeader{
Client: true,
WireLength: int(frame.Header().Length),
Header: s.header.Copy(),
Compression: s.recvCompress,
}
t.statsHandler.HandleRPC(s.ctx, inHeader)
} else {
inTrailer := &stats.InTrailer{
Client: true,
WireLength: int(frame.Header().Length),
Trailer: s.trailer.Copy(),
}
t.statsHandler.HandleRPC(s.ctx, inTrailer)
}
}
}()
// If headerChan hasn't been closed yet
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
@@ -1444,6 +1433,25 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
close(s.headerChan)
}
if t.statsHandler != nil {
if isHeader {
inHeader := &stats.InHeader{
Client: true,
WireLength: int(frame.Header().Length),
Header: metadata.MD(mdata).Copy(),
Compression: s.recvCompress,
}
t.statsHandler.HandleRPC(s.ctx, inHeader)
} else {
inTrailer := &stats.InTrailer{
Client: true,
WireLength: int(frame.Header().Length),
Trailer: metadata.MD(mdata).Copy(),
}
t.statsHandler.HandleRPC(s.ctx, inTrailer)
}
}
if !endStream {
return
}
@@ -1549,7 +1557,7 @@ func minTime(a, b time.Duration) time.Duration {
return b
}
// keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
// keepalive running in a separate goroutine makes sure the connection is alive by sending pings.
func (t *http2Client) keepalive() {
p := &ping{data: [8]byte{}}
// True iff a ping has been sent, and no data has been received since then.

View File

@@ -73,7 +73,6 @@ type http2Server struct {
writerDone chan struct{} // sync point to enable testing.
remoteAddr net.Addr
localAddr net.Addr
maxStreamID uint32 // max stream ID ever seen
authInfo credentials.AuthInfo // auth info about the connection
inTapHandle tap.ServerInHandle
framer *framer
@@ -123,13 +122,18 @@ type http2Server struct {
bufferPool *bufferPool
connectionID uint64
// maxStreamMu guards the maximum stream ID
// This lock may not be taken if mu is already held.
maxStreamMu sync.Mutex
maxStreamID uint32 // max stream ID ever seen
}
// NewServerTransport creates a http2 transport with conn and configuration
// options from config.
//
// It returns a non-nil transport and a nil error on success. On failure, it
// returns a non-nil transport and a nil-error. For a special case where the
// returns a nil transport and a non-nil error. For a special case where the
// underlying conn gets closed before the client preface could be read, it
// returns a nil transport and a nil error.
func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
@@ -290,10 +294,11 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
if _, err := io.ReadFull(t.conn, preface); err != nil {
// In deployments where a gRPC server runs behind a cloud load balancer
// which performs regular TCP level health checks, the connection is
// closed immediately by the latter. Skipping the error here will help
// reduce log clutter.
// closed immediately by the latter. Returning io.EOF here allows the
// grpc server implementation to recognize this scenario and suppress
// logging to reduce spam.
if err == io.EOF {
return nil, nil
return nil, io.EOF
}
return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
}
@@ -333,6 +338,10 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
// operateHeader takes action on the decoded headers.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
// Acquire max stream ID lock for entire duration
t.maxStreamMu.Lock()
defer t.maxStreamMu.Unlock()
streamID := frame.Header().StreamID
// frame.Truncated is set to true when framer detects that the current header
@@ -347,6 +356,15 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
return false
}
if streamID%2 != 1 || streamID <= t.maxStreamID {
// illegal gRPC stream id.
if logger.V(logLevel) {
logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
}
return true
}
t.maxStreamID = streamID
buf := newRecvBuffer()
s := &Stream{
id: streamID,
@@ -354,7 +372,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
buf: buf,
fc: &inFlow{limit: uint32(t.initialWindowSize)},
}
var (
// If a gRPC Response-Headers has already been received, then it means
// that the peer is speaking gRPC and we are in gRPC mode.
@@ -390,6 +407,13 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
if timeout, err = decodeTimeout(hf.Value); err != nil {
headerError = true
}
// "Transports must consider requests containing the Connection header
// as malformed." - A41
case "connection":
if logger.V(logLevel) {
logger.Errorf("transport: http2Server.operateHeaders parsed a :connection header which makes a request malformed as per the HTTP/2 spec")
}
headerError = true
default:
if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
break
@@ -404,6 +428,25 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
}
}
// "If multiple Host headers or multiple :authority headers are present, the
// request must be rejected with an HTTP status code 400 as required by Host
// validation in RFC 7230 §5.4, gRPC status code INTERNAL, or RST_STREAM
// with HTTP/2 error code PROTOCOL_ERROR." - A41. Since this is a HTTP/2
// error, this takes precedence over a client not speaking gRPC.
if len(mdata[":authority"]) > 1 || len(mdata["host"]) > 1 {
errMsg := fmt.Sprintf("num values of :authority: %v, num values of host: %v, both must only have 1 value as per HTTP/2 spec", len(mdata[":authority"]), len(mdata["host"]))
if logger.V(logLevel) {
logger.Errorf("transport: %v", errMsg)
}
t.controlBuf.put(&earlyAbortStream{
httpStatus: 400,
streamID: streamID,
contentSubtype: s.contentSubtype,
status: status.New(codes.Internal, errMsg),
})
return false
}
if !isGRPC || headerError {
t.controlBuf.put(&cleanupStream{
streamID: streamID,
@@ -414,6 +457,19 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
return false
}
// "If :authority is missing, Host must be renamed to :authority." - A41
if len(mdata[":authority"]) == 0 {
// No-op if host isn't present, no eventual :authority header is a valid
// RPC.
if host, ok := mdata["host"]; ok {
mdata[":authority"] = host
delete(mdata, "host")
}
} else {
// "If :authority is present, Host must be discarded" - A41
delete(mdata, "host")
}
if frame.StreamEnded() {
// s is just created by the caller. No lock needed.
s.state = streamReadDone
@@ -458,16 +514,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
s.cancel()
return false
}
if streamID%2 != 1 || streamID <= t.maxStreamID {
t.mu.Unlock()
// illegal gRPC stream id.
if logger.V(logLevel) {
logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
}
s.cancel()
return true
}
t.maxStreamID = streamID
if httpMethod != http.MethodPost {
t.mu.Unlock()
if logger.V(logLevel) {
@@ -494,6 +540,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
stat = status.New(codes.PermissionDenied, err.Error())
}
t.controlBuf.put(&earlyAbortStream{
httpStatus: 200,
streamID: s.id,
contentSubtype: s.contentSubtype,
status: stat,
@@ -734,7 +781,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
s.write(recvMsg{buffer: buffer})
}
}
if f.Header().Flags.Has(http2.FlagDataEndStream) {
if f.StreamEnded() {
// Received the end of stream from the client.
s.compareAndSwapState(streamActive, streamReadDone)
s.write(recvMsg{err: io.EOF})
@@ -1252,20 +1299,23 @@ var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
// Handles outgoing GoAway and returns true if loopy needs to put itself
// in draining mode.
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
t.maxStreamMu.Lock()
t.mu.Lock()
if t.state == closing { // TODO(mmukhi): This seems unnecessary.
t.mu.Unlock()
t.maxStreamMu.Unlock()
// The transport is closing.
return false, ErrConnClosing
}
sid := t.maxStreamID
if !g.headsUp {
// Stop accepting more streams now.
t.state = draining
sid := t.maxStreamID
if len(t.activeStreams) == 0 {
g.closeConn = true
}
t.mu.Unlock()
t.maxStreamMu.Unlock()
if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
return false, err
}
@@ -1278,6 +1328,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
return true, nil
}
t.mu.Unlock()
t.maxStreamMu.Unlock()
// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
// Follow that with a ping and wait for the ack to come back or a timer
// to expire. During this time accept new streams since they might have

View File

@@ -31,7 +31,7 @@ const key = keyType("grpc.internal.transport.networktype")
// Set returns a copy of the provided address with attributes containing networkType.
func Set(address resolver.Address, networkType string) resolver.Address {
address.Attributes = address.Attributes.WithValues(key, networkType)
address.Attributes = address.Attributes.WithValue(key, networkType)
return address
}

View File

@@ -37,7 +37,7 @@ var (
httpProxyFromEnvironment = http.ProxyFromEnvironment
)
func mapAddress(ctx context.Context, address string) (*url.URL, error) {
func mapAddress(address string) (*url.URL, error) {
req := &http.Request{
URL: &url.URL{
Scheme: "https",
@@ -114,7 +114,7 @@ func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr stri
// connection.
func proxyDial(ctx context.Context, addr string, grpcUA string) (conn net.Conn, err error) {
newAddr := addr
proxyURL, err := mapAddress(ctx, addr)
proxyURL, err := mapAddress(addr)
if err != nil {
return nil, err
}

View File

@@ -1,95 +0,0 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package env acts a single source of definition for all environment variables
// related to the xDS implementation in gRPC.
package env
import (
"os"
"strings"
)
const (
// BootstrapFileNameEnv is the env variable to set bootstrap file name.
// Do not use this and read from env directly. Its value is read and kept in
// variable BootstrapFileName.
//
// When both bootstrap FileName and FileContent are set, FileName is used.
BootstrapFileNameEnv = "GRPC_XDS_BOOTSTRAP"
// BootstrapFileContentEnv is the env variable to set bootstrapp file
// content. Do not use this and read from env directly. Its value is read
// and kept in variable BootstrapFileName.
//
// When both bootstrap FileName and FileContent are set, FileName is used.
BootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG"
ringHashSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"
clientSideSecuritySupportEnv = "GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT"
aggregateAndDNSSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"
retrySupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY"
rbacSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_RBAC"
c2pResolverSupportEnv = "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER"
c2pResolverTestOnlyTrafficDirectorURIEnv = "GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI"
)
var (
// BootstrapFileName holds the name of the file which contains xDS bootstrap
// configuration. Users can specify the location of the bootstrap file by
// setting the environment variable "GRPC_XDS_BOOTSTRAP".
//
// When both bootstrap FileName and FileContent are set, FileName is used.
BootstrapFileName = os.Getenv(BootstrapFileNameEnv)
// BootstrapFileContent holds the content of the xDS bootstrap
// configuration. Users can specify the bootstrap config by
// setting the environment variable "GRPC_XDS_BOOTSTRAP_CONFIG".
//
// When both bootstrap FileName and FileContent are set, FileName is used.
BootstrapFileContent = os.Getenv(BootstrapFileContentEnv)
// RingHashSupport indicates whether ring hash support is enabled, which can
// be disabled by setting the environment variable
// "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH" to "false".
RingHashSupport = !strings.EqualFold(os.Getenv(ringHashSupportEnv), "false")
// ClientSideSecuritySupport is used to control processing of security
// configuration on the client-side.
//
// Note that there is no env var protection for the server-side because we
// have a brand new API on the server-side and users explicitly need to use
// the new API to get security integration on the server.
ClientSideSecuritySupport = !strings.EqualFold(os.Getenv(clientSideSecuritySupportEnv), "false")
// AggregateAndDNSSupportEnv indicates whether processing of aggregated
// cluster and DNS cluster is enabled, which can be enabled by setting the
// environment variable
// "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" to
// "true".
AggregateAndDNSSupportEnv = strings.EqualFold(os.Getenv(aggregateAndDNSSupportEnv), "true")
// RetrySupport indicates whether xDS retry is enabled.
RetrySupport = !strings.EqualFold(os.Getenv(retrySupportEnv), "false")
// RBACSupport indicates whether xDS configured RBAC HTTP Filter is enabled.
RBACSupport = strings.EqualFold(os.Getenv(rbacSupportEnv), "true")
// C2PResolverSupport indicates whether support for C2P resolver is enabled.
// This can be enabled by setting the environment variable
// "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER" to "true".
C2PResolverSupport = strings.EqualFold(os.Getenv(c2pResolverSupportEnv), "true")
// C2PResolverTestOnlyTrafficDirectorURI is the TD URI for testing.
C2PResolverTestOnlyTrafficDirectorURI = os.Getenv(c2pResolverTestOnlyTrafficDirectorURIEnv)
)

View File

@@ -28,7 +28,7 @@ type handshakeClusterNameKey struct{}
// SetXDSHandshakeClusterName returns a copy of addr in which the Attributes field
// is updated with the cluster name.
func SetXDSHandshakeClusterName(addr resolver.Address, clusterName string) resolver.Address {
addr.Attributes = addr.Attributes.WithValues(handshakeClusterNameKey{}, clusterName)
addr.Attributes = addr.Attributes.WithValue(handshakeClusterNameKey{}, clusterName)
return addr
}

View File

@@ -144,7 +144,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
acw, ok := pickResult.SubConn.(*acBalancerWrapper)
if !ok {
logger.Error("subconn returned from pick is not *acBalancerWrapper")
logger.Errorf("subconn returned from pick is type %T, not *acBalancerWrapper", pickResult.SubConn)
continue
}
if t := acw.getAddrConn().getReadyTransport(); t != nil {

View File

@@ -125,7 +125,7 @@ func (b *pickfirstBalancer) Close() {
}
func (b *pickfirstBalancer) ExitIdle() {
if b.state == connectivity.Idle {
if b.sc != nil && b.state == connectivity.Idle {
b.sc.Connect()
}
}

View File

@@ -102,8 +102,8 @@ done
# The go_package option in grpc/lookup/v1/rls.proto doesn't match the
# current location. Move it into the right place.
mkdir -p ${WORKDIR}/out/google.golang.org/grpc/balancer/rls/internal/proto/grpc_lookup_v1
mv ${WORKDIR}/out/google.golang.org/grpc/lookup/grpc_lookup_v1/* ${WORKDIR}/out/google.golang.org/grpc/balancer/rls/internal/proto/grpc_lookup_v1
mkdir -p ${WORKDIR}/out/google.golang.org/grpc/internal/proto/grpc_lookup_v1
mv ${WORKDIR}/out/google.golang.org/grpc/lookup/grpc_lookup_v1/* ${WORKDIR}/out/google.golang.org/grpc/internal/proto/grpc_lookup_v1
# grpc_testingv3/testv3.pb.go is not re-generated because it was
# intentionally generated by an older version of protoc-gen-go.

109
vendor/google.golang.org/grpc/resolver/map.go generated vendored Normal file
View File

@@ -0,0 +1,109 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package resolver
type addressMapEntry struct {
addr Address
value interface{}
}
// AddressMap is a map of addresses to arbitrary values taking into account
// Attributes. BalancerAttributes are ignored, as are Metadata and Type.
// Multiple accesses may not be performed concurrently. Must be created via
// NewAddressMap; do not construct directly.
type AddressMap struct {
m map[string]addressMapEntryList
}
type addressMapEntryList []*addressMapEntry
// NewAddressMap creates a new AddressMap.
func NewAddressMap() *AddressMap {
return &AddressMap{m: make(map[string]addressMapEntryList)}
}
// find returns the index of addr in the addressMapEntry slice, or -1 if not
// present.
func (l addressMapEntryList) find(addr Address) int {
if len(l) == 0 {
return -1
}
for i, entry := range l {
if entry.addr.ServerName == addr.ServerName &&
entry.addr.Attributes.Equal(addr.Attributes) {
return i
}
}
return -1
}
// Get returns the value for the address in the map, if present.
func (a *AddressMap) Get(addr Address) (value interface{}, ok bool) {
entryList := a.m[addr.Addr]
if entry := entryList.find(addr); entry != -1 {
return entryList[entry].value, true
}
return nil, false
}
// Set updates or adds the value to the address in the map.
func (a *AddressMap) Set(addr Address, value interface{}) {
entryList := a.m[addr.Addr]
if entry := entryList.find(addr); entry != -1 {
a.m[addr.Addr][entry].value = value
return
}
a.m[addr.Addr] = append(a.m[addr.Addr], &addressMapEntry{addr: addr, value: value})
}
// Delete removes addr from the map.
func (a *AddressMap) Delete(addr Address) {
entryList := a.m[addr.Addr]
entry := entryList.find(addr)
if entry == -1 {
return
}
if len(entryList) == 1 {
entryList = nil
} else {
copy(entryList[entry:], entryList[entry+1:])
entryList = entryList[:len(entryList)-1]
}
a.m[addr.Addr] = entryList
}
// Len returns the number of entries in the map.
func (a *AddressMap) Len() int {
ret := 0
for _, entryList := range a.m {
ret += len(entryList)
}
return ret
}
// Keys returns a slice of all current map keys.
func (a *AddressMap) Keys() []Address {
ret := make([]Address, 0, a.Len())
for _, entryList := range a.m {
for _, entry := range entryList {
ret = append(ret, entry.addr)
}
}
return ret
}

View File

@@ -23,6 +23,7 @@ package resolver
import (
"context"
"net"
"net/url"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/credentials"
@@ -116,9 +117,14 @@ type Address struct {
ServerName string
// Attributes contains arbitrary data about this address intended for
// consumption by the load balancing policy.
// consumption by the SubConn.
Attributes *attributes.Attributes
// BalancerAttributes contains arbitrary data about this address intended
// for consumption by the LB policy. These attribes do not affect SubConn
// creation, connection establishment, handshaking, etc.
BalancerAttributes *attributes.Attributes
// Type is the type of this address.
//
// Deprecated: use Attributes instead.
@@ -131,6 +137,15 @@ type Address struct {
Metadata interface{}
}
// Equal returns whether a and o are identical. Metadata is compared directly,
// not with any recursive introspection.
func (a *Address) Equal(o Address) bool {
return a.Addr == o.Addr && a.ServerName == o.ServerName &&
a.Attributes.Equal(o.Attributes) &&
a.BalancerAttributes.Equal(o.BalancerAttributes) &&
a.Type == o.Type && a.Metadata == o.Metadata
}
// BuildOptions includes additional information for the builder to create
// the resolver.
type BuildOptions struct {
@@ -204,25 +219,36 @@ type ClientConn interface {
// Target represents a target for gRPC, as specified in:
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// It is parsed from the target string that gets passed into Dial or DialContext by the user. And
// grpc passes it to the resolver and the balancer.
// It is parsed from the target string that gets passed into Dial or DialContext
// by the user. And gRPC passes it to the resolver and the balancer.
//
// If the target follows the naming spec, and the parsed scheme is registered with grpc, we will
// parse the target string according to the spec. e.g. "dns://some_authority/foo.bar" will be parsed
// into &Target{Scheme: "dns", Authority: "some_authority", Endpoint: "foo.bar"}
// If the target follows the naming spec, and the parsed scheme is registered
// with gRPC, we will parse the target string according to the spec. If the
// target does not contain a scheme or if the parsed scheme is not registered
// (i.e. no corresponding resolver available to resolve the endpoint), we will
// apply the default scheme, and will attempt to reparse it.
//
// If the target does not contain a scheme, we will apply the default scheme, and set the Target to
// be the full target string. e.g. "foo.bar" will be parsed into
// &Target{Scheme: resolver.GetDefaultScheme(), Endpoint: "foo.bar"}.
// Examples:
//
// If the parsed scheme is not registered (i.e. no corresponding resolver available to resolve the
// endpoint), we set the Scheme to be the default scheme, and set the Endpoint to be the full target
// string. e.g. target string "unknown_scheme://authority/endpoint" will be parsed into
// &Target{Scheme: resolver.GetDefaultScheme(), Endpoint: "unknown_scheme://authority/endpoint"}.
// - "dns://some_authority/foo.bar"
// Target{Scheme: "dns", Authority: "some_authority", Endpoint: "foo.bar"}
// - "foo.bar"
// Target{Scheme: resolver.GetDefaultScheme(), Endpoint: "foo.bar"}
// - "unknown_scheme://authority/endpoint"
// Target{Scheme: resolver.GetDefaultScheme(), Endpoint: "unknown_scheme://authority/endpoint"}
type Target struct {
Scheme string
// Deprecated: use URL.Scheme instead.
Scheme string
// Deprecated: use URL.Host instead.
Authority string
Endpoint string
// Deprecated: use URL.Path or URL.Opaque instead. The latter is set when
// the former is empty.
Endpoint string
// URL contains the parsed dial target with an optional default scheme added
// to it if the original dial target contained no scheme or contained an
// unregistered scheme. Any query params specified in the original dial
// target can be accessed from here.
URL url.URL
}
// Builder creates a resolver that will be used to watch name resolution updates.

View File

@@ -712,13 +712,11 @@ func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxRecei
if err != nil {
return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
}
} else {
size = len(d)
}
if size > maxReceiveMessageSize {
// TODO: Revisit the error code. Currently keep it consistent with java
// implementation.
return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", size, maxReceiveMessageSize)
if size > maxReceiveMessageSize {
// TODO: Revisit the error code. Currently keep it consistent with java
// implementation.
return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message after decompression larger than max (%d vs. %d)", size, maxReceiveMessageSize)
}
}
return d, nil
}

View File

@@ -885,13 +885,11 @@ func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
// ErrConnDispatched means that the connection was dispatched away from
// gRPC; those connections should be left open.
if err != credentials.ErrConnDispatched {
c.Close()
}
// Don't log on ErrConnDispatched and io.EOF to prevent log spam.
if err != credentials.ErrConnDispatched {
// Don't log on ErrConnDispatched and io.EOF to prevent log spam.
if err != io.EOF {
channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
}
c.Close()
}
return nil
}
@@ -1106,16 +1104,21 @@ func chainUnaryServerInterceptors(s *Server) {
func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
var i int
var next UnaryHandler
next = func(ctx context.Context, req interface{}) (interface{}, error) {
if i == len(interceptors)-1 {
return interceptors[i](ctx, req, info, handler)
}
i++
return interceptors[i-1](ctx, req, info, next)
// the struct ensures the variables are allocated together, rather than separately, since we
// know they should be garbage collected together. This saves 1 allocation and decreases
// time/call by about 10% on the microbenchmark.
var state struct {
i int
next UnaryHandler
}
return next(ctx, req)
state.next = func(ctx context.Context, req interface{}) (interface{}, error) {
if state.i == len(interceptors)-1 {
return interceptors[state.i](ctx, req, info, handler)
}
state.i++
return interceptors[state.i-1](ctx, req, info, state.next)
}
return state.next(ctx, req)
}
}
@@ -1391,16 +1394,21 @@ func chainStreamServerInterceptors(s *Server) {
func chainStreamInterceptors(interceptors []StreamServerInterceptor) StreamServerInterceptor {
return func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
var i int
var next StreamHandler
next = func(srv interface{}, ss ServerStream) error {
if i == len(interceptors)-1 {
return interceptors[i](srv, ss, info, handler)
}
i++
return interceptors[i-1](srv, ss, info, next)
// the struct ensures the variables are allocated together, rather than separately, since we
// know they should be garbage collected together. This saves 1 allocation and decreases
// time/call by about 10% on the microbenchmark.
var state struct {
i int
next StreamHandler
}
return next(srv, ss)
state.next = func(srv interface{}, ss ServerStream) error {
if state.i == len(interceptors)-1 {
return interceptors[state.i](srv, ss, info, handler)
}
state.i++
return interceptors[state.i-1](srv, ss, info, state.next)
}
return state.next(srv, ss)
}
}

View File

@@ -29,6 +29,7 @@ package status
import (
"context"
"errors"
"fmt"
spb "google.golang.org/genproto/googleapis/rpc/status"
@@ -73,11 +74,16 @@ func FromProto(s *spb.Status) *Status {
return status.FromProto(s)
}
// FromError returns a Status representing err if it was produced by this
// package or has a method `GRPCStatus() *Status`.
// If err is nil, a Status is returned with codes.OK and no message.
// Otherwise, ok is false and a Status is returned with codes.Unknown and
// the original error message.
// FromError returns a Status representation of err.
//
// - If err was produced by this package or implements the method `GRPCStatus()
// *Status`, the appropriate Status is returned.
//
// - If err is nil, a Status is returned with codes.OK and no message.
//
// - Otherwise, err is an error not compatible with this package. In this
// case, a Status is returned with codes.Unknown and err's Error() message,
// and ok is false.
func FromError(err error) (s *Status, ok bool) {
if err == nil {
return nil, true
@@ -112,18 +118,18 @@ func Code(err error) codes.Code {
return codes.Unknown
}
// FromContextError converts a context error into a Status. It returns a
// Status with codes.OK if err is nil, or a Status with codes.Unknown if err is
// non-nil and not a context error.
// FromContextError converts a context error or wrapped context error into a
// Status. It returns a Status with codes.OK if err is nil, or a Status with
// codes.Unknown if err is non-nil and not a context error.
func FromContextError(err error) *Status {
switch err {
case nil:
if err == nil {
return nil
case context.DeadlineExceeded:
return New(codes.DeadlineExceeded, err.Error())
case context.Canceled:
return New(codes.Canceled, err.Error())
default:
return New(codes.Unknown, err.Error())
}
if errors.Is(err, context.DeadlineExceeded) {
return New(codes.DeadlineExceeded, err.Error())
}
if errors.Is(err, context.Canceled) {
return New(codes.Canceled, err.Error())
}
return New(codes.Unknown, err.Error())
}

View File

@@ -19,4 +19,4 @@
package grpc
// Version is the current grpc version.
const Version = "1.41.0"
const Version = "1.43.0"