Update etcd Godeps to 3.0.17

This commit is contained in:
Wojciech Tyczynski
2017-02-16 09:10:26 +01:00
parent 567595f550
commit d53add76dd
14 changed files with 297 additions and 221 deletions

View File

@@ -73,14 +73,14 @@ func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
resp := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}}
ls.hdr.fill(resp.Header)
ttl, err := ls.le.LeaseRenew(lease.LeaseID(req.ID))
ttl, err := ls.le.LeaseRenew(stream.Context(), lease.LeaseID(req.ID))
if err == lease.ErrLeaseNotFound {
err = nil
ttl = 0
}
if err != nil {
return err
return togRPCError(err)
}
resp.TTL = ttl

View File

@@ -49,9 +49,13 @@ var (
ErrGRPCRoleNotGranted = grpc.Errorf(codes.FailedPrecondition, "etcdserver: role is not granted to the user")
ErrGRPCPermissionNotGranted = grpc.Errorf(codes.FailedPrecondition, "etcdserver: permission is not granted to the role")
ErrGRPCNoLeader = grpc.Errorf(codes.Unavailable, "etcdserver: no leader")
ErrGRPCNotCapable = grpc.Errorf(codes.Unavailable, "etcdserver: not capable")
ErrGRPCStopped = grpc.Errorf(codes.Unavailable, "etcdserver: server stopped")
ErrGRPCNoLeader = grpc.Errorf(codes.Unavailable, "etcdserver: no leader")
ErrGRPCNotCapable = grpc.Errorf(codes.Unavailable, "etcdserver: not capable")
ErrGRPCStopped = grpc.Errorf(codes.Unavailable, "etcdserver: server stopped")
ErrGRPCTimeout = grpc.Errorf(codes.Unavailable, "etcdserver: request timed out")
ErrGRPCTimeoutDueToLeaderFail = grpc.Errorf(codes.Unavailable, "etcdserver: request timed out, possibly due to previous leader failure")
ErrGRPCTimeoutDueToConnectionLost = grpc.Errorf(codes.Unavailable, "etcdserver: request timed out, possibly due to connection lost")
ErrGRPCUnhealthy = grpc.Errorf(codes.Unavailable, "etcdserver: unhealthy cluster")
errStringToError = map[string]error{
grpc.ErrorDesc(ErrGRPCEmptyKey): ErrGRPCEmptyKey,
@@ -82,9 +86,13 @@ var (
grpc.ErrorDesc(ErrGRPCRoleNotGranted): ErrGRPCRoleNotGranted,
grpc.ErrorDesc(ErrGRPCPermissionNotGranted): ErrGRPCPermissionNotGranted,
grpc.ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader,
grpc.ErrorDesc(ErrGRPCNotCapable): ErrGRPCNotCapable,
grpc.ErrorDesc(ErrGRPCStopped): ErrGRPCStopped,
grpc.ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader,
grpc.ErrorDesc(ErrGRPCNotCapable): ErrGRPCNotCapable,
grpc.ErrorDesc(ErrGRPCStopped): ErrGRPCStopped,
grpc.ErrorDesc(ErrGRPCTimeout): ErrGRPCTimeout,
grpc.ErrorDesc(ErrGRPCTimeoutDueToLeaderFail): ErrGRPCTimeoutDueToLeaderFail,
grpc.ErrorDesc(ErrGRPCTimeoutDueToConnectionLost): ErrGRPCTimeoutDueToConnectionLost,
grpc.ErrorDesc(ErrGRPCUnhealthy): ErrGRPCUnhealthy,
}
// client-side error
@@ -116,9 +124,13 @@ var (
ErrRoleNotGranted = Error(ErrGRPCRoleNotGranted)
ErrPermissionNotGranted = Error(ErrGRPCPermissionNotGranted)
ErrNoLeader = Error(ErrGRPCNoLeader)
ErrNotCapable = Error(ErrGRPCNotCapable)
ErrStopped = Error(ErrGRPCStopped)
ErrNoLeader = Error(ErrGRPCNoLeader)
ErrNotCapable = Error(ErrGRPCNotCapable)
ErrStopped = Error(ErrGRPCStopped)
ErrTimeout = Error(ErrGRPCTimeout)
ErrTimeoutDueToLeaderFail = Error(ErrGRPCTimeoutDueToLeaderFail)
ErrTimeoutDueToConnectionLost = Error(ErrGRPCTimeoutDueToConnectionLost)
ErrUnhealthy = Error(ErrGRPCUnhealthy)
)
// EtcdError defines gRPC server errors.

View File

@@ -38,6 +38,17 @@ func togRPCError(err error) error {
case etcdserver.ErrNoSpace:
return rpctypes.ErrGRPCNoSpace
case etcdserver.ErrNoLeader:
return rpctypes.ErrGRPCNoLeader
case etcdserver.ErrStopped:
return rpctypes.ErrGRPCStopped
case etcdserver.ErrTimeout:
return rpctypes.ErrGRPCTimeout
case etcdserver.ErrTimeoutDueToLeaderFail:
return rpctypes.ErrGRPCTimeoutDueToLeaderFail
case etcdserver.ErrTimeoutDueToConnectionLost:
return rpctypes.ErrGRPCTimeoutDueToConnectionLost
case auth.ErrRootUserNotExist:
return rpctypes.ErrGRPCRootUserNotExist
case auth.ErrRootRoleNotExist:

View File

@@ -18,6 +18,7 @@ import (
"encoding/json"
"expvar"
"fmt"
"math"
"math/rand"
"net/http"
"os"
@@ -398,11 +399,9 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
srv.be = be
srv.lessor = lease.NewLessor(srv.be)
minTTL := time.Duration((3*cfg.ElectionTicks)/2) * time.Duration(cfg.TickMs) * time.Millisecond
srv.lessor = lease.NewLessor(srv.be, int64(math.Ceil(minTTL.Seconds())))
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
srv.lessor = lease.NewLessor(srv.be)
srv.kv = mvcc.New(srv.be, srv.lessor, &srv.consistIndex)
if beExist {
kvindex := srv.kv.ConsistentIndex()

View File

@@ -20,6 +20,7 @@ import (
"time"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/lease/leasehttp"
"github.com/coreos/etcd/mvcc"
@@ -54,7 +55,7 @@ type Lessor interface {
// LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error
// is returned.
LeaseRenew(id lease.LeaseID) (int64, error)
LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error)
}
type Authenticator interface {
@@ -218,7 +219,7 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)
return result.resp.(*pb.LeaseRevokeResponse), nil
}
func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) {
func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
ttl, err := s.lessor.Renew(id)
if err == nil {
return ttl, nil
@@ -228,29 +229,44 @@ func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) {
}
// renewals don't go through raft; forward to leader manually
cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
defer cancel()
// renewals don't go through raft; forward to leader manually
for cctx.Err() == nil && err != nil {
leader, lerr := s.waitLeader(cctx)
if lerr != nil {
return -1, lerr
}
for _, url := range leader.PeerURLs {
lurl := url + "/leases"
ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
if err == nil || err == lease.ErrLeaseNotFound {
return ttl, err
}
}
}
return -1, ErrTimeout
}
func (s *EtcdServer) waitLeader(ctx context.Context) (*membership.Member, error) {
leader := s.cluster.Member(s.Leader())
for i := 0; i < 5 && leader == nil; i++ {
for leader == nil {
// wait an election
dur := time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond
select {
case <-time.After(dur):
leader = s.cluster.Member(s.Leader())
case <-s.done:
return -1, ErrStopped
return nil, ErrStopped
case <-ctx.Done():
return nil, ErrNoLeader
}
}
if leader == nil || len(leader.PeerURLs) == 0 {
return -1, ErrNoLeader
return nil, ErrNoLeader
}
for _, url := range leader.PeerURLs {
lurl := url + "/leases"
ttl, err = leasehttp.RenewHTTP(id, lurl, s.peerRt, s.Cfg.peerDialTimeout())
if err == nil {
break
}
}
return ttl, err
return leader, nil
}
func (s *EtcdServer) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) {