385 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			385 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
|  *
 | |
|  * Copyright 2022 gRPC authors.
 | |
|  *
 | |
|  * Licensed under the Apache License, Version 2.0 (the "License");
 | |
|  * you may not use this file except in compliance with the License.
 | |
|  * You may obtain a copy of the License at
 | |
|  *
 | |
|  *     http://www.apache.org/licenses/LICENSE-2.0
 | |
|  *
 | |
|  * Unless required by applicable law or agreed to in writing, software
 | |
|  * distributed under the License is distributed on an "AS IS" BASIS,
 | |
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
|  * See the License for the specific language governing permissions and
 | |
|  * limitations under the License.
 | |
|  *
 | |
|  */
 | |
| 
 | |
| // Package gracefulswitch implements a graceful switch load balancer.
 | |
| package gracefulswitch
 | |
| 
 | |
| import (
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"sync"
 | |
| 
 | |
| 	"google.golang.org/grpc/balancer"
 | |
| 	"google.golang.org/grpc/balancer/base"
 | |
| 	"google.golang.org/grpc/connectivity"
 | |
| 	"google.golang.org/grpc/resolver"
 | |
| )
 | |
| 
 | |
| var errBalancerClosed = errors.New("gracefulSwitchBalancer is closed")
 | |
| var _ balancer.Balancer = (*Balancer)(nil)
 | |
| 
 | |
| // NewBalancer returns a graceful switch Balancer.
 | |
| func NewBalancer(cc balancer.ClientConn, opts balancer.BuildOptions) *Balancer {
 | |
| 	return &Balancer{
 | |
| 		cc:    cc,
 | |
| 		bOpts: opts,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Balancer is a utility to gracefully switch from one balancer to
 | |
| // a new balancer. It implements the balancer.Balancer interface.
 | |
| type Balancer struct {
 | |
| 	bOpts balancer.BuildOptions
 | |
| 	cc    balancer.ClientConn
 | |
| 
 | |
| 	// mu protects the following fields and all fields within balancerCurrent
 | |
| 	// and balancerPending. mu does not need to be held when calling into the
 | |
| 	// child balancers, as all calls into these children happen only as a direct
 | |
| 	// result of a call into the gracefulSwitchBalancer, which are also
 | |
| 	// guaranteed to be synchronous. There is one exception: an UpdateState call
 | |
| 	// from a child balancer when current and pending are populated can lead to
 | |
| 	// calling Close() on the current. To prevent that racing with an
 | |
| 	// UpdateSubConnState from the channel, we hold currentMu during Close and
 | |
| 	// UpdateSubConnState calls.
 | |
| 	mu              sync.Mutex
 | |
| 	balancerCurrent *balancerWrapper
 | |
| 	balancerPending *balancerWrapper
 | |
| 	closed          bool // set to true when this balancer is closed
 | |
| 
 | |
| 	// currentMu must be locked before mu. This mutex guards against this
 | |
| 	// sequence of events: UpdateSubConnState() called, finds the
 | |
| 	// balancerCurrent, gives up lock, updateState comes in, causes Close() on
 | |
| 	// balancerCurrent before the UpdateSubConnState is called on the
 | |
| 	// balancerCurrent.
 | |
| 	currentMu sync.Mutex
 | |
| }
 | |
| 
 | |
| // swap swaps out the current lb with the pending lb and updates the ClientConn.
 | |
| // The caller must hold gsb.mu.
 | |
| func (gsb *Balancer) swap() {
 | |
| 	gsb.cc.UpdateState(gsb.balancerPending.lastState)
 | |
| 	cur := gsb.balancerCurrent
 | |
| 	gsb.balancerCurrent = gsb.balancerPending
 | |
| 	gsb.balancerPending = nil
 | |
| 	go func() {
 | |
| 		gsb.currentMu.Lock()
 | |
| 		defer gsb.currentMu.Unlock()
 | |
| 		cur.Close()
 | |
| 	}()
 | |
| }
 | |
| 
 | |
| // Helper function that checks if the balancer passed in is current or pending.
 | |
| // The caller must hold gsb.mu.
 | |
| func (gsb *Balancer) balancerCurrentOrPending(bw *balancerWrapper) bool {
 | |
| 	return bw == gsb.balancerCurrent || bw == gsb.balancerPending
 | |
| }
 | |
| 
 | |
| // SwitchTo initializes the graceful switch process, which completes based on
 | |
| // connectivity state changes on the current/pending balancer. Thus, the switch
 | |
| // process is not complete when this method returns. This method must be called
 | |
| // synchronously alongside the rest of the balancer.Balancer methods this
 | |
| // Graceful Switch Balancer implements.
 | |
| func (gsb *Balancer) SwitchTo(builder balancer.Builder) error {
 | |
| 	gsb.mu.Lock()
 | |
| 	if gsb.closed {
 | |
| 		gsb.mu.Unlock()
 | |
| 		return errBalancerClosed
 | |
| 	}
 | |
| 	bw := &balancerWrapper{
 | |
| 		gsb: gsb,
 | |
| 		lastState: balancer.State{
 | |
| 			ConnectivityState: connectivity.Connecting,
 | |
| 			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
 | |
| 		},
 | |
| 		subconns: make(map[balancer.SubConn]bool),
 | |
| 	}
 | |
| 	balToClose := gsb.balancerPending // nil if there is no pending balancer
 | |
| 	if gsb.balancerCurrent == nil {
 | |
| 		gsb.balancerCurrent = bw
 | |
| 	} else {
 | |
| 		gsb.balancerPending = bw
 | |
| 	}
 | |
| 	gsb.mu.Unlock()
 | |
| 	balToClose.Close()
 | |
| 	// This function takes a builder instead of a balancer because builder.Build
 | |
| 	// can call back inline, and this utility needs to handle the callbacks.
 | |
| 	newBalancer := builder.Build(bw, gsb.bOpts)
 | |
| 	if newBalancer == nil {
 | |
| 		// This is illegal and should never happen; we clear the balancerWrapper
 | |
| 		// we were constructing if it happens to avoid a potential panic.
 | |
| 		gsb.mu.Lock()
 | |
| 		if gsb.balancerPending != nil {
 | |
| 			gsb.balancerPending = nil
 | |
| 		} else {
 | |
| 			gsb.balancerCurrent = nil
 | |
| 		}
 | |
| 		gsb.mu.Unlock()
 | |
| 		return balancer.ErrBadResolverState
 | |
| 	}
 | |
| 
 | |
| 	// This write doesn't need to take gsb.mu because this field never gets read
 | |
| 	// or written to on any calls from the current or pending. Calls from grpc
 | |
| 	// to this balancer are guaranteed to be called synchronously, so this
 | |
| 	// bw.Balancer field will never be forwarded to until this SwitchTo()
 | |
| 	// function returns.
 | |
| 	bw.Balancer = newBalancer
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Returns nil if the graceful switch balancer is closed.
 | |
| func (gsb *Balancer) latestBalancer() *balancerWrapper {
 | |
| 	gsb.mu.Lock()
 | |
| 	defer gsb.mu.Unlock()
 | |
| 	if gsb.balancerPending != nil {
 | |
| 		return gsb.balancerPending
 | |
| 	}
 | |
| 	return gsb.balancerCurrent
 | |
| }
 | |
| 
 | |
| // UpdateClientConnState forwards the update to the latest balancer created.
 | |
| func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error {
 | |
| 	// The resolver data is only relevant to the most recent LB Policy.
 | |
| 	balToUpdate := gsb.latestBalancer()
 | |
| 	if balToUpdate == nil {
 | |
| 		return errBalancerClosed
 | |
| 	}
 | |
| 	// Perform this call without gsb.mu to prevent deadlocks if the child calls
 | |
| 	// back into the channel. The latest balancer can never be closed during a
 | |
| 	// call from the channel, even without gsb.mu held.
 | |
| 	return balToUpdate.UpdateClientConnState(state)
 | |
| }
 | |
| 
 | |
| // ResolverError forwards the error to the latest balancer created.
 | |
| func (gsb *Balancer) ResolverError(err error) {
 | |
| 	// The resolver data is only relevant to the most recent LB Policy.
 | |
| 	balToUpdate := gsb.latestBalancer()
 | |
| 	if balToUpdate == nil {
 | |
| 		return
 | |
| 	}
 | |
| 	// Perform this call without gsb.mu to prevent deadlocks if the child calls
 | |
| 	// back into the channel. The latest balancer can never be closed during a
 | |
| 	// call from the channel, even without gsb.mu held.
 | |
| 	balToUpdate.ResolverError(err)
 | |
| }
 | |
| 
 | |
| // ExitIdle forwards the call to the latest balancer created.
 | |
| //
 | |
| // If the latest balancer does not support ExitIdle, the subConns are
 | |
| // re-connected to manually.
 | |
| func (gsb *Balancer) ExitIdle() {
 | |
| 	balToUpdate := gsb.latestBalancer()
 | |
| 	if balToUpdate == nil {
 | |
| 		return
 | |
| 	}
 | |
| 	// There is no need to protect this read with a mutex, as the write to the
 | |
| 	// Balancer field happens in SwitchTo, which completes before this can be
 | |
| 	// called.
 | |
| 	if ei, ok := balToUpdate.Balancer.(balancer.ExitIdler); ok {
 | |
| 		ei.ExitIdle()
 | |
| 		return
 | |
| 	}
 | |
| 	gsb.mu.Lock()
 | |
| 	defer gsb.mu.Unlock()
 | |
| 	for sc := range balToUpdate.subconns {
 | |
| 		sc.Connect()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // UpdateSubConnState forwards the update to the appropriate child.
 | |
| func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
 | |
| 	gsb.currentMu.Lock()
 | |
| 	defer gsb.currentMu.Unlock()
 | |
| 	gsb.mu.Lock()
 | |
| 	// Forward update to the appropriate child.  Even if there is a pending
 | |
| 	// balancer, the current balancer should continue to get SubConn updates to
 | |
| 	// maintain the proper state while the pending is still connecting.
 | |
| 	var balToUpdate *balancerWrapper
 | |
| 	if gsb.balancerCurrent != nil && gsb.balancerCurrent.subconns[sc] {
 | |
| 		balToUpdate = gsb.balancerCurrent
 | |
| 	} else if gsb.balancerPending != nil && gsb.balancerPending.subconns[sc] {
 | |
| 		balToUpdate = gsb.balancerPending
 | |
| 	}
 | |
| 	gsb.mu.Unlock()
 | |
| 	if balToUpdate == nil {
 | |
| 		// SubConn belonged to a stale lb policy that has not yet fully closed,
 | |
| 		// or the balancer was already closed.
 | |
| 		return
 | |
| 	}
 | |
| 	balToUpdate.UpdateSubConnState(sc, state)
 | |
| }
 | |
| 
 | |
| // Close closes any active child balancers.
 | |
| func (gsb *Balancer) Close() {
 | |
| 	gsb.mu.Lock()
 | |
| 	gsb.closed = true
 | |
| 	currentBalancerToClose := gsb.balancerCurrent
 | |
| 	gsb.balancerCurrent = nil
 | |
| 	pendingBalancerToClose := gsb.balancerPending
 | |
| 	gsb.balancerPending = nil
 | |
| 	gsb.mu.Unlock()
 | |
| 
 | |
| 	currentBalancerToClose.Close()
 | |
| 	pendingBalancerToClose.Close()
 | |
| }
 | |
| 
 | |
| // balancerWrapper wraps a balancer.Balancer, and overrides some Balancer
 | |
| // methods to help cleanup SubConns created by the wrapped balancer.
 | |
| //
 | |
| // It implements the balancer.ClientConn interface and is passed down in that
 | |
| // capacity to the wrapped balancer. It maintains a set of subConns created by
 | |
| // the wrapped balancer and calls from the latter to create/update/remove
 | |
| // SubConns update this set before being forwarded to the parent ClientConn.
 | |
| // State updates from the wrapped balancer can result in invocation of the
 | |
| // graceful switch logic.
 | |
| type balancerWrapper struct {
 | |
| 	balancer.Balancer
 | |
| 	gsb *Balancer
 | |
| 
 | |
| 	lastState balancer.State
 | |
| 	subconns  map[balancer.SubConn]bool // subconns created by this balancer
 | |
| }
 | |
| 
 | |
| func (bw *balancerWrapper) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
 | |
| 	if state.ConnectivityState == connectivity.Shutdown {
 | |
| 		bw.gsb.mu.Lock()
 | |
| 		delete(bw.subconns, sc)
 | |
| 		bw.gsb.mu.Unlock()
 | |
| 	}
 | |
| 	// There is no need to protect this read with a mutex, as the write to the
 | |
| 	// Balancer field happens in SwitchTo, which completes before this can be
 | |
| 	// called.
 | |
| 	bw.Balancer.UpdateSubConnState(sc, state)
 | |
| }
 | |
| 
 | |
| // Close closes the underlying LB policy and removes the subconns it created. bw
 | |
| // must not be referenced via balancerCurrent or balancerPending in gsb when
 | |
| // called. gsb.mu must not be held.  Does not panic with a nil receiver.
 | |
| func (bw *balancerWrapper) Close() {
 | |
| 	// before Close is called.
 | |
| 	if bw == nil {
 | |
| 		return
 | |
| 	}
 | |
| 	// There is no need to protect this read with a mutex, as Close() is
 | |
| 	// impossible to be called concurrently with the write in SwitchTo(). The
 | |
| 	// callsites of Close() for this balancer in Graceful Switch Balancer will
 | |
| 	// never be called until SwitchTo() returns.
 | |
| 	bw.Balancer.Close()
 | |
| 	bw.gsb.mu.Lock()
 | |
| 	for sc := range bw.subconns {
 | |
| 		bw.gsb.cc.RemoveSubConn(sc)
 | |
| 	}
 | |
| 	bw.gsb.mu.Unlock()
 | |
| }
 | |
| 
 | |
| func (bw *balancerWrapper) UpdateState(state balancer.State) {
 | |
| 	// Hold the mutex for this entire call to ensure it cannot occur
 | |
| 	// concurrently with other updateState() calls. This causes updates to
 | |
| 	// lastState and calls to cc.UpdateState to happen atomically.
 | |
| 	bw.gsb.mu.Lock()
 | |
| 	defer bw.gsb.mu.Unlock()
 | |
| 	bw.lastState = state
 | |
| 
 | |
| 	if !bw.gsb.balancerCurrentOrPending(bw) {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if bw == bw.gsb.balancerCurrent {
 | |
| 		// In the case that the current balancer exits READY, and there is a pending
 | |
| 		// balancer, you can forward the pending balancer's cached State up to
 | |
| 		// ClientConn and swap the pending into the current. This is because there
 | |
| 		// is no reason to gracefully switch from and keep using the old policy as
 | |
| 		// the ClientConn is not connected to any backends.
 | |
| 		if state.ConnectivityState != connectivity.Ready && bw.gsb.balancerPending != nil {
 | |
| 			bw.gsb.swap()
 | |
| 			return
 | |
| 		}
 | |
| 		// Even if there is a pending balancer waiting to be gracefully switched to,
 | |
| 		// continue to forward current balancer updates to the Client Conn. Ignoring
 | |
| 		// state + picker from the current would cause undefined behavior/cause the
 | |
| 		// system to behave incorrectly from the current LB policies perspective.
 | |
| 		// Also, the current LB is still being used by grpc to choose SubConns per
 | |
| 		// RPC, and thus should use the most updated form of the current balancer.
 | |
| 		bw.gsb.cc.UpdateState(state)
 | |
| 		return
 | |
| 	}
 | |
| 	// This method is now dealing with a state update from the pending balancer.
 | |
| 	// If the current balancer is currently in a state other than READY, the new
 | |
| 	// policy can be swapped into place immediately. This is because there is no
 | |
| 	// reason to gracefully switch from and keep using the old policy as the
 | |
| 	// ClientConn is not connected to any backends.
 | |
| 	if state.ConnectivityState != connectivity.Connecting || bw.gsb.balancerCurrent.lastState.ConnectivityState != connectivity.Ready {
 | |
| 		bw.gsb.swap()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (bw *balancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
 | |
| 	bw.gsb.mu.Lock()
 | |
| 	if !bw.gsb.balancerCurrentOrPending(bw) {
 | |
| 		bw.gsb.mu.Unlock()
 | |
| 		return nil, fmt.Errorf("%T at address %p that called NewSubConn is deleted", bw, bw)
 | |
| 	}
 | |
| 	bw.gsb.mu.Unlock()
 | |
| 
 | |
| 	sc, err := bw.gsb.cc.NewSubConn(addrs, opts)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	bw.gsb.mu.Lock()
 | |
| 	if !bw.gsb.balancerCurrentOrPending(bw) { // balancer was closed during this call
 | |
| 		bw.gsb.cc.RemoveSubConn(sc)
 | |
| 		bw.gsb.mu.Unlock()
 | |
| 		return nil, fmt.Errorf("%T at address %p that called NewSubConn is deleted", bw, bw)
 | |
| 	}
 | |
| 	bw.subconns[sc] = true
 | |
| 	bw.gsb.mu.Unlock()
 | |
| 	return sc, nil
 | |
| }
 | |
| 
 | |
| func (bw *balancerWrapper) ResolveNow(opts resolver.ResolveNowOptions) {
 | |
| 	// Ignore ResolveNow requests from anything other than the most recent
 | |
| 	// balancer, because older balancers were already removed from the config.
 | |
| 	if bw != bw.gsb.latestBalancer() {
 | |
| 		return
 | |
| 	}
 | |
| 	bw.gsb.cc.ResolveNow(opts)
 | |
| }
 | |
| 
 | |
| func (bw *balancerWrapper) RemoveSubConn(sc balancer.SubConn) {
 | |
| 	bw.gsb.mu.Lock()
 | |
| 	if !bw.gsb.balancerCurrentOrPending(bw) {
 | |
| 		bw.gsb.mu.Unlock()
 | |
| 		return
 | |
| 	}
 | |
| 	bw.gsb.mu.Unlock()
 | |
| 	bw.gsb.cc.RemoveSubConn(sc)
 | |
| }
 | |
| 
 | |
| func (bw *balancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
 | |
| 	bw.gsb.mu.Lock()
 | |
| 	if !bw.gsb.balancerCurrentOrPending(bw) {
 | |
| 		bw.gsb.mu.Unlock()
 | |
| 		return
 | |
| 	}
 | |
| 	bw.gsb.mu.Unlock()
 | |
| 	bw.gsb.cc.UpdateAddresses(sc, addrs)
 | |
| }
 | |
| 
 | |
| func (bw *balancerWrapper) Target() string {
 | |
| 	return bw.gsb.cc.Target()
 | |
| }
 | 
