vendor: update protobuf, grpc and gogo

Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
Stephen J Day
2017-11-09 13:24:41 -08:00
parent 2b8ed96d2a
commit 35697865c0
121 changed files with 8472 additions and 4684 deletions

View File

@@ -1,33 +1,18 @@
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
* Copyright 2016 gRPC authors.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
* http://www.apache.org/licenses/LICENSE-2.0
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
@@ -43,7 +28,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1"
lbmpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/naming"
@@ -74,41 +59,21 @@ type balanceLoadClientStream struct {
ClientStream
}
func (x *balanceLoadClientStream) Send(m *lbpb.LoadBalanceRequest) error {
func (x *balanceLoadClientStream) Send(m *lbmpb.LoadBalanceRequest) error {
return x.ClientStream.SendMsg(m)
}
func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) {
m := new(lbpb.LoadBalanceResponse)
func (x *balanceLoadClientStream) Recv() (*lbmpb.LoadBalanceResponse, error) {
m := new(lbmpb.LoadBalanceResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// AddressType indicates the address type returned by name resolution.
type AddressType uint8
const (
// Backend indicates the server is a backend server.
Backend AddressType = iota
// GRPCLB indicates the server is a grpclb load balancer.
GRPCLB
)
// AddrMetadataGRPCLB contains the information the name resolution for grpclb should provide. The
// name resolver used by grpclb balancer is required to provide this type of metadata in
// its address updates.
type AddrMetadataGRPCLB struct {
// AddrType is the type of server (grpc load balancer or backend).
AddrType AddressType
// ServerName is the name of the grpc load balancer. Used for authentication.
ServerName string
}
// NewGRPCLBBalancer creates a grpclb load balancer.
func NewGRPCLBBalancer(r naming.Resolver) Balancer {
return &balancer{
return &grpclbBalancer{
r: r,
}
}
@@ -131,27 +96,27 @@ type grpclbAddrInfo struct {
dropForLoadBalancing bool
}
type balancer struct {
r naming.Resolver
target string
mu sync.Mutex
seq int // a sequence number to make sure addrCh does not get stale addresses.
w naming.Watcher
addrCh chan []Address
rbs []remoteBalancerInfo
addrs []*grpclbAddrInfo
next int
waitCh chan struct{}
done bool
expTimer *time.Timer
rand *rand.Rand
type grpclbBalancer struct {
r naming.Resolver
target string
mu sync.Mutex
seq int // a sequence number to make sure addrCh does not get stale addresses.
w naming.Watcher
addrCh chan []Address
rbs []remoteBalancerInfo
addrs []*grpclbAddrInfo
next int
waitCh chan struct{}
done bool
rand *rand.Rand
clientStats lbpb.ClientStats
clientStats lbmpb.ClientStats
}
func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerInfo) error {
func (b *grpclbBalancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerInfo) error {
updates, err := w.Next()
if err != nil {
grpclog.Warningf("grpclb: failed to get next addr update from watcher: %v", err)
return err
}
b.mu.Lock()
@@ -173,24 +138,24 @@ func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerIn
if exist {
continue
}
md, ok := update.Metadata.(*AddrMetadataGRPCLB)
md, ok := update.Metadata.(*naming.AddrMetadataGRPCLB)
if !ok {
// TODO: Revisit the handling here and may introduce some fallback mechanism.
grpclog.Printf("The name resolution contains unexpected metadata %v", update.Metadata)
grpclog.Errorf("The name resolution contains unexpected metadata %v", update.Metadata)
continue
}
switch md.AddrType {
case Backend:
case naming.Backend:
// TODO: Revisit the handling here and may introduce some fallback mechanism.
grpclog.Printf("The name resolution does not give grpclb addresses")
grpclog.Errorf("The name resolution does not give grpclb addresses")
continue
case GRPCLB:
case naming.GRPCLB:
b.rbs = append(b.rbs, remoteBalancerInfo{
addr: update.Addr,
name: md.ServerName,
})
default:
grpclog.Printf("Received unknow address type %d", md.AddrType)
grpclog.Errorf("Received unknow address type %d", md.AddrType)
continue
}
case naming.Delete:
@@ -202,7 +167,7 @@ func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerIn
}
}
default:
grpclog.Println("Unknown update.Op ", update.Op)
grpclog.Errorf("Unknown update.Op %v", update.Op)
}
}
// TODO: Fall back to the basic round-robin load balancing if the resulting address is
@@ -215,42 +180,33 @@ func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerIn
return nil
}
func (b *balancer) serverListExpire(seq int) {
b.mu.Lock()
defer b.mu.Unlock()
// TODO: gRPC interanls do not clear the connections when the server list is stale.
// This means RPCs will keep using the existing server list until b receives new
// server list even though the list is expired. Revisit this behavior later.
if b.done || seq < b.seq {
return
}
b.next = 0
b.addrs = nil
// Ask grpc internals to close all the corresponding connections.
b.addrCh <- nil
}
func convertDuration(d *lbpb.Duration) time.Duration {
func convertDuration(d *lbmpb.Duration) time.Duration {
if d == nil {
return 0
}
return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond
}
func (b *balancer) processServerList(l *lbpb.ServerList, seq int) {
func (b *grpclbBalancer) processServerList(l *lbmpb.ServerList, seq int) {
if l == nil {
return
}
servers := l.GetServers()
expiration := convertDuration(l.GetExpirationInterval())
var (
sl []*grpclbAddrInfo
addrs []Address
)
for _, s := range servers {
md := metadata.Pairs("lb-token", s.LoadBalanceToken)
ip := net.IP(s.IpAddress)
ipStr := ip.String()
if ip.To4() == nil {
// Add square brackets to ipv6 addresses, otherwise net.Dial() and
// net.SplitHostPort() will return too many colons error.
ipStr = fmt.Sprintf("[%s]", ipStr)
}
addr := Address{
Addr: fmt.Sprintf("%s:%d", net.IP(s.IpAddress), s.Port),
Addr: fmt.Sprintf("%s:%d", ipStr, s.Port),
Metadata: &md,
}
sl = append(sl, &grpclbAddrInfo{
@@ -270,20 +226,11 @@ func (b *balancer) processServerList(l *lbpb.ServerList, seq int) {
b.next = 0
b.addrs = sl
b.addrCh <- addrs
if b.expTimer != nil {
b.expTimer.Stop()
b.expTimer = nil
}
if expiration > 0 {
b.expTimer = time.AfterFunc(expiration, func() {
b.serverListExpire(seq)
})
}
}
return
}
func (b *balancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration, done <-chan struct{}) {
func (b *grpclbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration, done <-chan struct{}) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
@@ -294,29 +241,30 @@ func (b *balancer) sendLoadReport(s *balanceLoadClientStream, interval time.Dura
}
b.mu.Lock()
stats := b.clientStats
b.clientStats = lbpb.ClientStats{} // Clear the stats.
b.clientStats = lbmpb.ClientStats{} // Clear the stats.
b.mu.Unlock()
t := time.Now()
stats.Timestamp = &lbpb.Timestamp{
stats.Timestamp = &lbmpb.Timestamp{
Seconds: t.Unix(),
Nanos: int32(t.Nanosecond()),
}
if err := s.Send(&lbpb.LoadBalanceRequest{
LoadBalanceRequestType: &lbpb.LoadBalanceRequest_ClientStats{
if err := s.Send(&lbmpb.LoadBalanceRequest{
LoadBalanceRequestType: &lbmpb.LoadBalanceRequest_ClientStats{
ClientStats: &stats,
},
}); err != nil {
grpclog.Errorf("grpclb: failed to send load report: %v", err)
return
}
}
}
func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry bool) {
func (b *grpclbBalancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry bool) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := lbc.BalanceLoad(ctx)
if err != nil {
grpclog.Printf("Failed to perform RPC to the remote balancer %v", err)
grpclog.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
return
}
b.mu.Lock()
@@ -325,37 +273,39 @@ func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry b
return
}
b.mu.Unlock()
initReq := &lbpb.LoadBalanceRequest{
LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
InitialRequest: &lbpb.InitialLoadBalanceRequest{
initReq := &lbmpb.LoadBalanceRequest{
LoadBalanceRequestType: &lbmpb.LoadBalanceRequest_InitialRequest{
InitialRequest: &lbmpb.InitialLoadBalanceRequest{
Name: b.target,
},
},
}
if err := stream.Send(initReq); err != nil {
grpclog.Errorf("grpclb: failed to send init request: %v", err)
// TODO: backoff on retry?
return true
}
reply, err := stream.Recv()
if err != nil {
grpclog.Errorf("grpclb: failed to recv init response: %v", err)
// TODO: backoff on retry?
return true
}
initResp := reply.GetInitialResponse()
if initResp == nil {
grpclog.Println("Failed to receive the initial response from the remote balancer.")
grpclog.Errorf("grpclb: reply from remote balancer did not include initial response.")
return
}
// TODO: Support delegation.
if initResp.LoadBalancerDelegate != "" {
// delegation
grpclog.Println("TODO: Delegation is not supported yet.")
grpclog.Errorf("TODO: Delegation is not supported yet.")
return
}
streamDone := make(chan struct{})
defer close(streamDone)
b.mu.Lock()
b.clientStats = lbpb.ClientStats{} // Clear client stats.
b.clientStats = lbmpb.ClientStats{} // Clear client stats.
b.mu.Unlock()
if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 {
go b.sendLoadReport(stream, d, streamDone)
@@ -364,6 +314,7 @@ func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry b
for {
reply, err := stream.Recv()
if err != nil {
grpclog.Errorf("grpclb: failed to recv server list: %v", err)
break
}
b.mu.Lock()
@@ -381,7 +332,7 @@ func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry b
return true
}
func (b *balancer) Start(target string, config BalancerConfig) error {
func (b *grpclbBalancer) Start(target string, config BalancerConfig) error {
b.rand = rand.New(rand.NewSource(time.Now().Unix()))
// TODO: Fall back to the basic direct connection if there is no name resolver.
if b.r == nil {
@@ -397,6 +348,7 @@ func (b *balancer) Start(target string, config BalancerConfig) error {
w, err := b.r.Resolve(target)
if err != nil {
b.mu.Unlock()
grpclog.Errorf("grpclb: failed to resolve address: %v, err: %v", target, err)
return err
}
b.w = w
@@ -406,7 +358,7 @@ func (b *balancer) Start(target string, config BalancerConfig) error {
go func() {
for {
if err := b.watchAddrUpdates(w, balancerAddrsCh); err != nil {
grpclog.Printf("grpc: the naming watcher stops working due to %v.\n", err)
grpclog.Warningf("grpclb: the naming watcher stops working due to %v.\n", err)
close(balancerAddrsCh)
return
}
@@ -490,22 +442,32 @@ func (b *balancer) Start(target string, config BalancerConfig) error {
cc.Close()
}
// Talk to the remote load balancer to get the server list.
var err error
creds := config.DialCreds
ccError = make(chan struct{})
if creds == nil {
cc, err = Dial(rb.addr, WithInsecure())
} else {
var (
err error
dopts []DialOption
)
if creds := config.DialCreds; creds != nil {
if rb.name != "" {
if err := creds.OverrideServerName(rb.name); err != nil {
grpclog.Printf("Failed to override the server name in the credentials: %v", err)
grpclog.Warningf("grpclb: failed to override the server name in the credentials: %v", err)
continue
}
}
cc, err = Dial(rb.addr, WithTransportCredentials(creds))
dopts = append(dopts, WithTransportCredentials(creds))
} else {
dopts = append(dopts, WithInsecure())
}
if dialer := config.Dialer; dialer != nil {
// WithDialer takes a different type of function, so we instead use a special DialOption here.
dopts = append(dopts, func(o *dialOptions) { o.copts.Dialer = dialer })
}
dopts = append(dopts, WithBlock())
ccError = make(chan struct{})
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
cc, err = DialContext(ctx, rb.addr, dopts...)
cancel()
if err != nil {
grpclog.Printf("Failed to setup a connection to the remote balancer %v: %v", rb.addr, err)
grpclog.Warningf("grpclb: failed to setup a connection to the remote balancer %v: %v", rb.addr, err)
close(ccError)
continue
}
@@ -529,7 +491,7 @@ func (b *balancer) Start(target string, config BalancerConfig) error {
return nil
}
func (b *balancer) down(addr Address, err error) {
func (b *grpclbBalancer) down(addr Address, err error) {
b.mu.Lock()
defer b.mu.Unlock()
for _, a := range b.addrs {
@@ -540,7 +502,7 @@ func (b *balancer) down(addr Address, err error) {
}
}
func (b *balancer) Up(addr Address) func(error) {
func (b *grpclbBalancer) Up(addr Address) func(error) {
b.mu.Lock()
defer b.mu.Unlock()
if b.done {
@@ -568,7 +530,7 @@ func (b *balancer) Up(addr Address) func(error) {
}
}
func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
func (b *grpclbBalancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
var ch chan struct{}
b.mu.Lock()
if b.done {
@@ -638,17 +600,10 @@ func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Addre
}
}
if !opts.BlockingWait {
if len(b.addrs) == 0 {
b.clientStats.NumCallsFinished++
b.clientStats.NumCallsFinishedWithClientFailedToSend++
b.mu.Unlock()
err = Errorf(codes.Unavailable, "there is no address available")
return
}
// Returns the next addr on b.addrs for a failfast RPC.
addr = b.addrs[b.next].addr
b.next++
b.clientStats.NumCallsFinished++
b.clientStats.NumCallsFinishedWithClientFailedToSend++
b.mu.Unlock()
err = Errorf(codes.Unavailable, "there is no address available")
return
}
// Wait on b.waitCh for non-failfast RPCs.
@@ -725,17 +680,17 @@ func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Addre
}
}
func (b *balancer) Notify() <-chan []Address {
func (b *grpclbBalancer) Notify() <-chan []Address {
return b.addrCh
}
func (b *balancer) Close() error {
func (b *grpclbBalancer) Close() error {
b.mu.Lock()
defer b.mu.Unlock()
b.done = true
if b.expTimer != nil {
b.expTimer.Stop()
if b.done {
return errBalancerClosed
}
b.done = true
if b.waitCh != nil {
close(b.waitCh)
}