368 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			368 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 | 
						|
 *
 | 
						|
 * Copyright 2017 gRPC authors.
 | 
						|
 *
 | 
						|
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
 * you may not use this file except in compliance with the License.
 | 
						|
 * You may obtain a copy of the License at
 | 
						|
 *
 | 
						|
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 *
 | 
						|
 * Unless required by applicable law or agreed to in writing, software
 | 
						|
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
 * See the License for the specific language governing permissions and
 | 
						|
 * limitations under the License.
 | 
						|
 *
 | 
						|
 */
 | 
						|
 | 
						|
package grpc
 | 
						|
 | 
						|
import (
 | 
						|
	"sync"
 | 
						|
 | 
						|
	"golang.org/x/net/context"
 | 
						|
	"google.golang.org/grpc/balancer"
 | 
						|
	"google.golang.org/grpc/codes"
 | 
						|
	"google.golang.org/grpc/connectivity"
 | 
						|
	"google.golang.org/grpc/grpclog"
 | 
						|
	"google.golang.org/grpc/resolver"
 | 
						|
)
 | 
						|
 | 
						|
type balancerWrapperBuilder struct {
 | 
						|
	b Balancer // The v1 balancer.
 | 
						|
}
 | 
						|
 | 
						|
func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
 | 
						|
	bwb.b.Start(cc.Target(), BalancerConfig{
 | 
						|
		DialCreds: opts.DialCreds,
 | 
						|
		Dialer:    opts.Dialer,
 | 
						|
	})
 | 
						|
	_, pickfirst := bwb.b.(*pickFirst)
 | 
						|
	bw := &balancerWrapper{
 | 
						|
		balancer:  bwb.b,
 | 
						|
		pickfirst: pickfirst,
 | 
						|
		cc:        cc,
 | 
						|
		startCh:   make(chan struct{}),
 | 
						|
		conns:     make(map[resolver.Address]balancer.SubConn),
 | 
						|
		connSt:    make(map[balancer.SubConn]*scState),
 | 
						|
		csEvltr:   &connectivityStateEvaluator{},
 | 
						|
		state:     connectivity.Idle,
 | 
						|
	}
 | 
						|
	cc.UpdateBalancerState(connectivity.Idle, bw)
 | 
						|
	go bw.lbWatcher()
 | 
						|
	return bw
 | 
						|
}
 | 
						|
 | 
						|
func (bwb *balancerWrapperBuilder) Name() string {
 | 
						|
	return "wrapper"
 | 
						|
}
 | 
						|
 | 
						|
type scState struct {
 | 
						|
	addr Address // The v1 address type.
 | 
						|
	s    connectivity.State
 | 
						|
	down func(error)
 | 
						|
}
 | 
						|
 | 
						|
type balancerWrapper struct {
 | 
						|
	balancer  Balancer // The v1 balancer.
 | 
						|
	pickfirst bool
 | 
						|
 | 
						|
	cc balancer.ClientConn
 | 
						|
 | 
						|
	// To aggregate the connectivity state.
 | 
						|
	csEvltr *connectivityStateEvaluator
 | 
						|
	state   connectivity.State
 | 
						|
 | 
						|
	mu     sync.Mutex
 | 
						|
	conns  map[resolver.Address]balancer.SubConn
 | 
						|
	connSt map[balancer.SubConn]*scState
 | 
						|
	// This channel is closed when handling the first resolver result.
 | 
						|
	// lbWatcher blocks until this is closed, to avoid race between
 | 
						|
	// - NewSubConn is created, cc wants to notify balancer of state changes;
 | 
						|
	// - Build hasn't return, cc doesn't have access to balancer.
 | 
						|
	startCh chan struct{}
 | 
						|
}
 | 
						|
 | 
						|
// lbWatcher watches the Notify channel of the balancer and manages
 | 
						|
// connections accordingly.
 | 
						|
func (bw *balancerWrapper) lbWatcher() {
 | 
						|
	<-bw.startCh
 | 
						|
	grpclog.Infof("balancerWrapper: is pickfirst: %v\n", bw.pickfirst)
 | 
						|
	notifyCh := bw.balancer.Notify()
 | 
						|
	if notifyCh == nil {
 | 
						|
		// There's no resolver in the balancer. Connect directly.
 | 
						|
		a := resolver.Address{
 | 
						|
			Addr: bw.cc.Target(),
 | 
						|
			Type: resolver.Backend,
 | 
						|
		}
 | 
						|
		sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
 | 
						|
		if err != nil {
 | 
						|
			grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
 | 
						|
		} else {
 | 
						|
			bw.mu.Lock()
 | 
						|
			bw.conns[a] = sc
 | 
						|
			bw.connSt[sc] = &scState{
 | 
						|
				addr: Address{Addr: bw.cc.Target()},
 | 
						|
				s:    connectivity.Idle,
 | 
						|
			}
 | 
						|
			bw.mu.Unlock()
 | 
						|
			sc.Connect()
 | 
						|
		}
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	for addrs := range notifyCh {
 | 
						|
		grpclog.Infof("balancerWrapper: got update addr from Notify: %v\n", addrs)
 | 
						|
		if bw.pickfirst {
 | 
						|
			var (
 | 
						|
				oldA  resolver.Address
 | 
						|
				oldSC balancer.SubConn
 | 
						|
			)
 | 
						|
			bw.mu.Lock()
 | 
						|
			for oldA, oldSC = range bw.conns {
 | 
						|
				break
 | 
						|
			}
 | 
						|
			bw.mu.Unlock()
 | 
						|
			if len(addrs) <= 0 {
 | 
						|
				if oldSC != nil {
 | 
						|
					// Teardown old sc.
 | 
						|
					bw.mu.Lock()
 | 
						|
					delete(bw.conns, oldA)
 | 
						|
					delete(bw.connSt, oldSC)
 | 
						|
					bw.mu.Unlock()
 | 
						|
					bw.cc.RemoveSubConn(oldSC)
 | 
						|
				}
 | 
						|
				continue
 | 
						|
			}
 | 
						|
 | 
						|
			var newAddrs []resolver.Address
 | 
						|
			for _, a := range addrs {
 | 
						|
				newAddr := resolver.Address{
 | 
						|
					Addr:       a.Addr,
 | 
						|
					Type:       resolver.Backend, // All addresses from balancer are all backends.
 | 
						|
					ServerName: "",
 | 
						|
					Metadata:   a.Metadata,
 | 
						|
				}
 | 
						|
				newAddrs = append(newAddrs, newAddr)
 | 
						|
			}
 | 
						|
			if oldSC == nil {
 | 
						|
				// Create new sc.
 | 
						|
				sc, err := bw.cc.NewSubConn(newAddrs, balancer.NewSubConnOptions{})
 | 
						|
				if err != nil {
 | 
						|
					grpclog.Warningf("Error creating connection to %v. Err: %v", newAddrs, err)
 | 
						|
				} else {
 | 
						|
					bw.mu.Lock()
 | 
						|
					// For pickfirst, there should be only one SubConn, so the
 | 
						|
					// address doesn't matter. All states updating (up and down)
 | 
						|
					// and picking should all happen on that only SubConn.
 | 
						|
					bw.conns[resolver.Address{}] = sc
 | 
						|
					bw.connSt[sc] = &scState{
 | 
						|
						addr: addrs[0], // Use the first address.
 | 
						|
						s:    connectivity.Idle,
 | 
						|
					}
 | 
						|
					bw.mu.Unlock()
 | 
						|
					sc.Connect()
 | 
						|
				}
 | 
						|
			} else {
 | 
						|
				oldSC.UpdateAddresses(newAddrs)
 | 
						|
				bw.mu.Lock()
 | 
						|
				bw.connSt[oldSC].addr = addrs[0]
 | 
						|
				bw.mu.Unlock()
 | 
						|
			}
 | 
						|
		} else {
 | 
						|
			var (
 | 
						|
				add []resolver.Address // Addresses need to setup connections.
 | 
						|
				del []balancer.SubConn // Connections need to tear down.
 | 
						|
			)
 | 
						|
			resAddrs := make(map[resolver.Address]Address)
 | 
						|
			for _, a := range addrs {
 | 
						|
				resAddrs[resolver.Address{
 | 
						|
					Addr:       a.Addr,
 | 
						|
					Type:       resolver.Backend, // All addresses from balancer are all backends.
 | 
						|
					ServerName: "",
 | 
						|
					Metadata:   a.Metadata,
 | 
						|
				}] = a
 | 
						|
			}
 | 
						|
			bw.mu.Lock()
 | 
						|
			for a := range resAddrs {
 | 
						|
				if _, ok := bw.conns[a]; !ok {
 | 
						|
					add = append(add, a)
 | 
						|
				}
 | 
						|
			}
 | 
						|
			for a, c := range bw.conns {
 | 
						|
				if _, ok := resAddrs[a]; !ok {
 | 
						|
					del = append(del, c)
 | 
						|
					delete(bw.conns, a)
 | 
						|
					// Keep the state of this sc in bw.connSt until its state becomes Shutdown.
 | 
						|
				}
 | 
						|
			}
 | 
						|
			bw.mu.Unlock()
 | 
						|
			for _, a := range add {
 | 
						|
				sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
 | 
						|
				if err != nil {
 | 
						|
					grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
 | 
						|
				} else {
 | 
						|
					bw.mu.Lock()
 | 
						|
					bw.conns[a] = sc
 | 
						|
					bw.connSt[sc] = &scState{
 | 
						|
						addr: resAddrs[a],
 | 
						|
						s:    connectivity.Idle,
 | 
						|
					}
 | 
						|
					bw.mu.Unlock()
 | 
						|
					sc.Connect()
 | 
						|
				}
 | 
						|
			}
 | 
						|
			for _, c := range del {
 | 
						|
				bw.cc.RemoveSubConn(c)
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (bw *balancerWrapper) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
 | 
						|
	grpclog.Infof("balancerWrapper: handle subconn state change: %p, %v", sc, s)
 | 
						|
	bw.mu.Lock()
 | 
						|
	defer bw.mu.Unlock()
 | 
						|
	scSt, ok := bw.connSt[sc]
 | 
						|
	if !ok {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	if s == connectivity.Idle {
 | 
						|
		sc.Connect()
 | 
						|
	}
 | 
						|
	oldS := scSt.s
 | 
						|
	scSt.s = s
 | 
						|
	if oldS != connectivity.Ready && s == connectivity.Ready {
 | 
						|
		scSt.down = bw.balancer.Up(scSt.addr)
 | 
						|
	} else if oldS == connectivity.Ready && s != connectivity.Ready {
 | 
						|
		if scSt.down != nil {
 | 
						|
			scSt.down(errConnClosing)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	sa := bw.csEvltr.recordTransition(oldS, s)
 | 
						|
	if bw.state != sa {
 | 
						|
		bw.state = sa
 | 
						|
	}
 | 
						|
	bw.cc.UpdateBalancerState(bw.state, bw)
 | 
						|
	if s == connectivity.Shutdown {
 | 
						|
		// Remove state for this sc.
 | 
						|
		delete(bw.connSt, sc)
 | 
						|
	}
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
func (bw *balancerWrapper) HandleResolvedAddrs([]resolver.Address, error) {
 | 
						|
	bw.mu.Lock()
 | 
						|
	defer bw.mu.Unlock()
 | 
						|
	select {
 | 
						|
	case <-bw.startCh:
 | 
						|
	default:
 | 
						|
		close(bw.startCh)
 | 
						|
	}
 | 
						|
	// There should be a resolver inside the balancer.
 | 
						|
	// All updates here, if any, are ignored.
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
func (bw *balancerWrapper) Close() {
 | 
						|
	bw.mu.Lock()
 | 
						|
	defer bw.mu.Unlock()
 | 
						|
	select {
 | 
						|
	case <-bw.startCh:
 | 
						|
	default:
 | 
						|
		close(bw.startCh)
 | 
						|
	}
 | 
						|
	bw.balancer.Close()
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
// The picker is the balancerWrapper itself.
 | 
						|
// Pick should never return ErrNoSubConnAvailable.
 | 
						|
// It either blocks or returns error, consistent with v1 balancer Get().
 | 
						|
func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
 | 
						|
	failfast := true // Default failfast is true.
 | 
						|
	if ss, ok := rpcInfoFromContext(ctx); ok {
 | 
						|
		failfast = ss.failfast
 | 
						|
	}
 | 
						|
	a, p, err := bw.balancer.Get(ctx, BalancerGetOptions{BlockingWait: !failfast})
 | 
						|
	if err != nil {
 | 
						|
		return nil, nil, err
 | 
						|
	}
 | 
						|
	var done func(balancer.DoneInfo)
 | 
						|
	if p != nil {
 | 
						|
		done = func(i balancer.DoneInfo) { p() }
 | 
						|
	}
 | 
						|
	var sc balancer.SubConn
 | 
						|
	bw.mu.Lock()
 | 
						|
	defer bw.mu.Unlock()
 | 
						|
	if bw.pickfirst {
 | 
						|
		// Get the first sc in conns.
 | 
						|
		for _, sc = range bw.conns {
 | 
						|
			break
 | 
						|
		}
 | 
						|
	} else {
 | 
						|
		var ok bool
 | 
						|
		sc, ok = bw.conns[resolver.Address{
 | 
						|
			Addr:       a.Addr,
 | 
						|
			Type:       resolver.Backend,
 | 
						|
			ServerName: "",
 | 
						|
			Metadata:   a.Metadata,
 | 
						|
		}]
 | 
						|
		if !ok && failfast {
 | 
						|
			return nil, nil, Errorf(codes.Unavailable, "there is no connection available")
 | 
						|
		}
 | 
						|
		if s, ok := bw.connSt[sc]; failfast && (!ok || s.s != connectivity.Ready) {
 | 
						|
			// If the returned sc is not ready and RPC is failfast,
 | 
						|
			// return error, and this RPC will fail.
 | 
						|
			return nil, nil, Errorf(codes.Unavailable, "there is no connection available")
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return sc, done, nil
 | 
						|
}
 | 
						|
 | 
						|
// connectivityStateEvaluator gets updated by addrConns when their
 | 
						|
// states transition, based on which it evaluates the state of
 | 
						|
// ClientConn.
 | 
						|
type connectivityStateEvaluator struct {
 | 
						|
	mu                  sync.Mutex
 | 
						|
	numReady            uint64 // Number of addrConns in ready state.
 | 
						|
	numConnecting       uint64 // Number of addrConns in connecting state.
 | 
						|
	numTransientFailure uint64 // Number of addrConns in transientFailure.
 | 
						|
}
 | 
						|
 | 
						|
// recordTransition records state change happening in every subConn and based on
 | 
						|
// that it evaluates what aggregated state should be.
 | 
						|
// It can only transition between Ready, Connecting and TransientFailure. Other states,
 | 
						|
// Idle and Shutdown are transitioned into by ClientConn; in the beginning of the connection
 | 
						|
// before any subConn is created ClientConn is in idle state. In the end when ClientConn
 | 
						|
// closes it is in Shutdown state.
 | 
						|
// TODO Note that in later releases, a ClientConn with no activity will be put into an Idle state.
 | 
						|
func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) connectivity.State {
 | 
						|
	cse.mu.Lock()
 | 
						|
	defer cse.mu.Unlock()
 | 
						|
 | 
						|
	// Update counters.
 | 
						|
	for idx, state := range []connectivity.State{oldState, newState} {
 | 
						|
		updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
 | 
						|
		switch state {
 | 
						|
		case connectivity.Ready:
 | 
						|
			cse.numReady += updateVal
 | 
						|
		case connectivity.Connecting:
 | 
						|
			cse.numConnecting += updateVal
 | 
						|
		case connectivity.TransientFailure:
 | 
						|
			cse.numTransientFailure += updateVal
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// Evaluate.
 | 
						|
	if cse.numReady > 0 {
 | 
						|
		return connectivity.Ready
 | 
						|
	}
 | 
						|
	if cse.numConnecting > 0 {
 | 
						|
		return connectivity.Connecting
 | 
						|
	}
 | 
						|
	return connectivity.TransientFailure
 | 
						|
}
 |