Update etcd client to 3.2.24 for latest release

Signed-off-by: Timothy St. Clair <timothysc@gmail.com>
Timothy St. Clair
2018-08-31 13:57:37 -05:00
parent 674401ace1
commit 0bb21f647f
54 changed files with 1472 additions and 613 deletions


@@ -61,6 +61,7 @@ go_library(
"//vendor/github.com/coreos/go-semver/semver:go_default_library",
"//vendor/github.com/coreos/pkg/capnslog:go_default_library",
"//vendor/github.com/gogo/protobuf/proto:go_default_library",
"//vendor/github.com/golang/protobuf/proto:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/golang.org/x/net/context:go_default_library",
],


@@ -15,6 +15,8 @@
package v3election
import (
"errors"
"golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
@@ -22,6 +24,10 @@ import (
epb "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
)
// ErrMissingLeaderKey is returned when election API request
// is missing the "leader" field.
var ErrMissingLeaderKey = errors.New(`"leader" field must be provided`)
type electionServer struct {
c *clientv3.Client
}
@@ -51,6 +57,9 @@ func (es *electionServer) Campaign(ctx context.Context, req *epb.CampaignRequest
}
func (es *electionServer) Proclaim(ctx context.Context, req *epb.ProclaimRequest) (*epb.ProclaimResponse, error) {
if req.Leader == nil {
return nil, ErrMissingLeaderKey
}
s, err := es.session(ctx, req.Leader.Lease)
if err != nil {
return nil, err
@@ -98,6 +107,9 @@ func (es *electionServer) Leader(ctx context.Context, req *epb.LeaderRequest) (*
}
func (es *electionServer) Resign(ctx context.Context, req *epb.ResignRequest) (*epb.ResignResponse, error) {
if req.Leader == nil {
return nil, ErrMissingLeaderKey
}
s, err := es.session(ctx, req.Leader.Lease)
if err != nil {
return nil, err
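
The new nil-leader guard in Proclaim and Resign turns a request that omits the "leader" message into a typed error instead of a nil-pointer panic. A minimal standalone sketch of the same pattern, using hypothetical stand-in types rather than the real v3electionpb messages:

package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the v3electionpb types referenced above (assumed shapes).
type LeaderKey struct{ Lease int64 }
type ProclaimRequest struct{ Leader *LeaderKey }

var ErrMissingLeaderKey = errors.New(`"leader" field must be provided`)

// proclaim mirrors the guard added to Proclaim/Resign: reject a request whose
// "leader" message was never set before dereferencing it.
func proclaim(req *ProclaimRequest) error {
	if req.Leader == nil {
		return ErrMissingLeaderKey
	}
	_ = req.Leader.Lease // safe to touch only after the guard
	return nil
}

func main() {
	fmt.Println(proclaim(&ProclaimRequest{}))                       // "leader" field must be provided
	fmt.Println(proclaim(&ProclaimRequest{Leader: &LeaderKey{1}}))  // <nil>
}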


@@ -44,6 +44,7 @@ go_library(
"//vendor/google.golang.org/grpc/credentials:go_default_library",
"//vendor/google.golang.org/grpc/grpclog:go_default_library",
"//vendor/google.golang.org/grpc/metadata:go_default_library",
"//vendor/google.golang.org/grpc/status:go_default_library",
],
)


@@ -92,7 +92,11 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
return nil
}
if err != nil {
plog.Debugf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
if isClientCtxErr(stream.Context().Err(), err) {
plog.Debugf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
} else {
plog.Warningf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
}
return err
}
@@ -118,7 +122,11 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
resp.TTL = ttl
err = stream.Send(resp)
if err != nil {
plog.Debugf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
if isClientCtxErr(stream.Context().Err(), err) {
plog.Debugf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
} else {
plog.Warningf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
}
return err
}
}


@@ -32,8 +32,9 @@ var (
ErrGRPCFutureRev = grpc.Errorf(codes.OutOfRange, "etcdserver: mvcc: required revision is a future revision")
ErrGRPCNoSpace = grpc.Errorf(codes.ResourceExhausted, "etcdserver: mvcc: database space exceeded")
ErrGRPCLeaseNotFound = grpc.Errorf(codes.NotFound, "etcdserver: requested lease not found")
ErrGRPCLeaseExist = grpc.Errorf(codes.FailedPrecondition, "etcdserver: lease already exists")
ErrGRPCLeaseTTLTooLarge = grpc.Errorf(codes.OutOfRange, "etcdserver: too large lease TTL")
ErrGRPCMemberExist = grpc.Errorf(codes.FailedPrecondition, "etcdserver: member ID already exist")
ErrGRPCPeerURLExist = grpc.Errorf(codes.FailedPrecondition, "etcdserver: Peer URLs already exists")
@@ -79,8 +80,9 @@ var (
grpc.ErrorDesc(ErrGRPCFutureRev): ErrGRPCFutureRev,
grpc.ErrorDesc(ErrGRPCNoSpace): ErrGRPCNoSpace,
grpc.ErrorDesc(ErrGRPCLeaseNotFound): ErrGRPCLeaseNotFound,
grpc.ErrorDesc(ErrGRPCLeaseExist): ErrGRPCLeaseExist,
grpc.ErrorDesc(ErrGRPCLeaseTTLTooLarge): ErrGRPCLeaseTTLTooLarge,
grpc.ErrorDesc(ErrGRPCMemberExist): ErrGRPCMemberExist,
grpc.ErrorDesc(ErrGRPCPeerURLExist): ErrGRPCPeerURLExist,
@@ -126,8 +128,9 @@ var (
ErrFutureRev = Error(ErrGRPCFutureRev)
ErrNoSpace = Error(ErrGRPCNoSpace)
ErrLeaseNotFound = Error(ErrGRPCLeaseNotFound)
ErrLeaseExist = Error(ErrGRPCLeaseExist)
ErrLeaseTTLTooLarge = Error(ErrGRPCLeaseTTLTooLarge)
ErrMemberExist = Error(ErrGRPCMemberExist)
ErrPeerURLExist = Error(ErrGRPCPeerURLExist)


@@ -15,14 +15,18 @@
package v3rpc
import (
"strings"
"github.com/coreos/etcd/auth"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func togRPCError(err error) error {
@@ -68,6 +72,8 @@ func togRPCError(err error) error {
return rpctypes.ErrGRPCLeaseNotFound
case lease.ErrLeaseExists:
return rpctypes.ErrGRPCLeaseExist
case lease.ErrLeaseTTLTooLarge:
return rpctypes.ErrGRPCLeaseTTLTooLarge
case auth.ErrRootUserNotExist:
return rpctypes.ErrGRPCRootUserNotExist
@@ -101,3 +107,35 @@ func togRPCError(err error) error {
return grpc.Errorf(codes.Unknown, err.Error())
}
}
func isClientCtxErr(ctxErr error, err error) bool {
if ctxErr != nil {
return true
}
ev, ok := status.FromError(err)
if !ok {
return false
}
switch ev.Code() {
case codes.Canceled, codes.DeadlineExceeded:
// client-side context cancel or deadline exceeded
// "rpc error: code = Canceled desc = context canceled"
// "rpc error: code = DeadlineExceeded desc = context deadline exceeded"
return true
case codes.Unavailable:
msg := ev.Message()
// client-side context cancel or deadline exceeded with TLS ("http2.errClientDisconnected")
// "rpc error: code = Unavailable desc = client disconnected"
if msg == "client disconnected" {
return true
}
// "grpc/transport.ClientTransport.CloseStream" on canceled streams
// "rpc error: code = Unavailable desc = stream error: stream ID 21; CANCEL")
if strings.HasPrefix(msg, "stream error: ") && strings.HasSuffix(msg, "; CANCEL") {
return true
}
}
return false
}
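
isClientCtxErr lets the lease and watch handlers log client-initiated stream teardowns at debug level while keeping genuine server failures at warning level. A sketch of how the classifier behaves, written as a package-internal test (the test placement and exact case set are assumptions):

package v3rpc

import (
	"context"
	"testing"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func TestIsClientCtxErr(t *testing.T) {
	canceled, cancel := context.WithCancel(context.Background())
	cancel()

	cases := []struct {
		ctxErr error
		err    error
		want   bool
	}{
		{canceled.Err(), status.Error(codes.Canceled, "context canceled"), true},           // client canceled
		{nil, status.Error(codes.DeadlineExceeded, "context deadline exceeded"), true},     // client deadline
		{nil, status.Error(codes.Unavailable, "client disconnected"), true},                // TLS client disconnect
		{nil, status.Error(codes.Unavailable, "stream error: stream ID 21; CANCEL"), true}, // canceled stream
		{nil, status.Error(codes.Internal, "marshaling error"), false},                     // genuine server failure
	}
	for i, c := range cases {
		if got := isClientCtxErr(c.ctxErr, c.err); got != c.want {
			t.Errorf("#%d: got %v, want %v", i, got, c.want)
		}
	}
}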


@@ -141,7 +141,11 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
// deadlock when calling sws.close().
go func() {
if rerr := sws.recvLoop(); rerr != nil {
plog.Debugf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
if isClientCtxErr(stream.Context().Err(), rerr) {
plog.Debugf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
} else {
plog.Warningf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
}
errc <- rerr
}
}()
@@ -338,7 +342,11 @@ func (sws *serverWatchStream) sendLoop() {
mvcc.ReportEventReceived(len(evs))
if err := sws.gRPCStream.Send(wr); err != nil {
plog.Debugf("failed to send watch response to gRPC stream (%q)", err.Error())
if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
plog.Debugf("failed to send watch response to gRPC stream (%q)", err.Error())
} else {
plog.Warningf("failed to send watch response to gRPC stream (%q)", err.Error())
}
return
}
@@ -355,7 +363,11 @@ func (sws *serverWatchStream) sendLoop() {
}
if err := sws.gRPCStream.Send(c); err != nil {
plog.Debugf("failed to send watch control response to gRPC stream (%q)", err.Error())
if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
plog.Debugf("failed to send watch control response to gRPC stream (%q)", err.Error())
} else {
plog.Warningf("failed to send watch control response to gRPC stream (%q)", err.Error())
}
return
}
@@ -371,7 +383,11 @@ func (sws *serverWatchStream) sendLoop() {
for _, v := range pending[wid] {
mvcc.ReportEventReceived(len(v.Events))
if err := sws.gRPCStream.Send(v); err != nil {
plog.Debugf("failed to send pending watch response to gRPC stream (%q)", err.Error())
if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
plog.Debugf("failed to send pending watch response to gRPC stream (%q)", err.Error())
} else {
plog.Warningf("failed to send pending watch response to gRPC stream (%q)", err.Error())
}
return
}
}


@@ -89,6 +89,9 @@ func (s *EtcdServer) newApplierV3() applierV3 {
func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
ar := &applyResult{}
defer func(start time.Time) {
warnOfExpensiveRequest(start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
}(time.Now())
// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
switch {


@@ -105,10 +105,12 @@ func (a *applierV2store) Sync(r *pb.Request) Response {
return Response{}
}
// applyV2Request interprets r as a call to v2store.X
// and returns a Response interpreted from v2store.Event
func (s *EtcdServer) applyV2Request(r *pb.Request) Response {
defer warnOfExpensiveRequest(time.Now(), r, nil, nil)
toTTLOptions(r)
switch r.Method {
case "POST":
return s.applyV2.Post(r)


@@ -48,8 +48,38 @@ type ServerConfig struct {
ForceNewCluster bool
PeerTLSInfo transport.TLSInfo
TickMs uint
ElectionTicks int
// If InitialElectionTickAdvance is true, the local member fast-forwards
// election ticks to speed up the "initial" leader election trigger. This
// benefits the case of larger election ticks. For instance, a cross-datacenter
// deployment may require an election timeout of 10 seconds. If true, the
// local node does not wait up to 10 seconds; instead, it fast-forwards its
// election ticks to 8 seconds, leaving only 2 seconds before a leader
// election can trigger.
//
// Major assumptions are that:
// - cluster has no active leader thus advancing ticks enables faster
// leader election, or
// - cluster already has an established leader, and rejoining follower
// is likely to receive heartbeats from the leader after tick advance
// and before election timeout.
//
// However, when the network from the leader to a rejoining follower is
// congested, and the follower does not receive a leader heartbeat within
// the remaining election ticks, a disruptive election has to happen, thus
// affecting cluster availability.
//
// Disabling this slows down the initial bootstrap process for cross-datacenter
// deployments. Make your own tradeoff by configuring
// --initial-election-tick-advance, at the cost of slower initial bootstrap.
//
// If single-node, it advances ticks regardless.
//
// See https://github.com/coreos/etcd/issues/9333 for more detail.
InitialElectionTickAdvance bool
BootstrapTimeout time.Duration
AutoCompactionRetention int
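
To make the comment's 10-second example concrete, here is a small self-contained sketch of the tick arithmetic (the flag values are assumptions for illustration): with a 1000 ms tick and 10 election ticks, a rejoining multi-node member fast-forwards all but two ticks, leaving roughly 2 seconds for an existing leader's heartbeat to arrive before an election could fire.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Assumed flags: --heartbeat-interval=1000 --election-timeout=10000,
	// i.e. 1s per tick and 10 election ticks.
	tickMs, electionTicks := 1000, 10
	forwarded := electionTicks - 2 // multi-node rejoin case (see adjustTicks below)
	left := time.Duration(electionTicks-forwarded) * time.Duration(tickMs) * time.Millisecond
	fmt.Printf("fast-forward %d ticks; %v left before an election can fire\n", forwarded, left)
	// Output: fast-forward 8 ticks; 2s left before an election can fire
}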


@@ -5,6 +5,7 @@ go_library(
srcs = [
"etcdserver.pb.go",
"raft_internal.pb.go",
"raft_internal_stringer.go",
"rpc.pb.go",
],
importmap = "k8s.io/kubernetes/vendor/github.com/coreos/etcd/etcdserver/etcdserverpb",


@@ -0,0 +1,179 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserverpb
import (
"fmt"
"strings"
proto "github.com/golang/protobuf/proto"
)
// InternalRaftStringer implements custom proto Stringer:
// redact password, replace value fields with value_size fields.
type InternalRaftStringer struct {
Request *InternalRaftRequest
}
func (as *InternalRaftStringer) String() string {
switch {
case as.Request.LeaseGrant != nil:
return fmt.Sprintf("header:<%s> lease_grant:<ttl:%d-second id:%016x>",
as.Request.Header.String(),
as.Request.LeaseGrant.TTL,
as.Request.LeaseGrant.ID,
)
case as.Request.LeaseRevoke != nil:
return fmt.Sprintf("header:<%s> lease_revoke:<id:%016x>",
as.Request.Header.String(),
as.Request.LeaseRevoke.ID,
)
case as.Request.Authenticate != nil:
return fmt.Sprintf("header:<%s> authenticate:<name:%s simple_token:%s>",
as.Request.Header.String(),
as.Request.Authenticate.Name,
as.Request.Authenticate.SimpleToken,
)
case as.Request.AuthUserAdd != nil:
return fmt.Sprintf("header:<%s> auth_user_add:<name:%s>",
as.Request.Header.String(),
as.Request.AuthUserAdd.Name,
)
case as.Request.AuthUserChangePassword != nil:
return fmt.Sprintf("header:<%s> auth_user_change_password:<name:%s>",
as.Request.Header.String(),
as.Request.AuthUserChangePassword.Name,
)
case as.Request.Put != nil:
return fmt.Sprintf("header:<%s> put:<%s>",
as.Request.Header.String(),
newLoggablePutRequest(as.Request.Put).String(),
)
case as.Request.Txn != nil:
return fmt.Sprintf("header:<%s> txn:<%s>",
as.Request.Header.String(),
NewLoggableTxnRequest(as.Request.Txn).String(),
)
default:
// nothing to redact
}
return as.Request.String()
}
// txnRequestStringer implements a custom proto String to replace value bytes fields with value size
// fields in any nested txn and put operations.
type txnRequestStringer struct {
Request *TxnRequest
}
func NewLoggableTxnRequest(request *TxnRequest) *txnRequestStringer {
return &txnRequestStringer{request}
}
func (as *txnRequestStringer) String() string {
var compare []string
for _, c := range as.Request.Compare {
switch cv := c.TargetUnion.(type) {
case *Compare_Value:
compare = append(compare, newLoggableValueCompare(c, cv).String())
default:
// nothing to redact
compare = append(compare, c.String())
}
}
var success []string
for _, s := range as.Request.Success {
success = append(success, newLoggableRequestOp(s).String())
}
var failure []string
for _, f := range as.Request.Failure {
failure = append(failure, newLoggableRequestOp(f).String())
}
return fmt.Sprintf("compare:<%s> success:<%s> failure:<%s>",
strings.Join(compare, " "),
strings.Join(success, " "),
strings.Join(failure, " "),
)
}
// requestOpStringer implements a custom proto String to replace value bytes fields with value
// size fields in any nested txn and put operations.
type requestOpStringer struct {
Op *RequestOp
}
func newLoggableRequestOp(op *RequestOp) *requestOpStringer {
return &requestOpStringer{op}
}
func (as *requestOpStringer) String() string {
switch op := as.Op.Request.(type) {
case *RequestOp_RequestPut:
return fmt.Sprintf("request_put:<%s>", newLoggablePutRequest(op.RequestPut).String())
default:
// nothing to redact
}
return as.Op.String()
}
// loggableValueCompare implements a custom proto String for Compare.Value union member types to
// replace the value bytes field with a value size field.
// To preserve proto encoding of the key and range_end bytes, a faked out proto type is used here.
type loggableValueCompare struct {
Result Compare_CompareResult `protobuf:"varint,1,opt,name=result,proto3,enum=etcdserverpb.Compare_CompareResult"`
Target Compare_CompareTarget `protobuf:"varint,2,opt,name=target,proto3,enum=etcdserverpb.Compare_CompareTarget"`
Key []byte `protobuf:"bytes,3,opt,name=key,proto3"`
ValueSize int `protobuf:"bytes,7,opt,name=value_size,proto3"`
}
func newLoggableValueCompare(c *Compare, cv *Compare_Value) *loggableValueCompare {
return &loggableValueCompare{
c.Result,
c.Target,
c.Key,
len(cv.Value),
}
}
func (m *loggableValueCompare) Reset() { *m = loggableValueCompare{} }
func (m *loggableValueCompare) String() string { return proto.CompactTextString(m) }
func (*loggableValueCompare) ProtoMessage() {}
// loggablePutRequest implements a custom proto String to replace value bytes field with a value
// size field.
// To preserve proto encoding of the key bytes, a faked out proto type is used here.
type loggablePutRequest struct {
Key []byte `protobuf:"bytes,1,opt,name=key,proto3"`
ValueSize int `protobuf:"varint,2,opt,name=value_size,proto3"`
Lease int64 `protobuf:"varint,3,opt,name=lease,proto3"`
PrevKv bool `protobuf:"varint,4,opt,name=prev_kv,proto3"`
IgnoreValue bool `protobuf:"varint,5,opt,name=ignore_value,proto3"`
IgnoreLease bool `protobuf:"varint,6,opt,name=ignore_lease,proto3"`
}
func newLoggablePutRequest(request *PutRequest) *loggablePutRequest {
return &loggablePutRequest{
request.Key,
len(request.Value),
request.Lease,
request.PrevKv,
request.IgnoreValue,
request.IgnoreLease,
}
}
func (m *loggablePutRequest) Reset() { *m = loggablePutRequest{} }
func (m *loggablePutRequest) String() string { return proto.CompactTextString(m) }
func (*loggablePutRequest) ProtoMessage() {}
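
Since NewLoggableTxnRequest is exported, the redaction is easy to observe in isolation. A minimal sketch (the exact printed layout is illustrative, not guaranteed):

package main

import (
	"fmt"

	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)

func main() {
	// A txn whose put carries a 1 MiB value: the loggable stringer replaces
	// the raw bytes with value_size, keeping logs small and secret-free.
	txn := &pb.TxnRequest{
		Success: []*pb.RequestOp{{
			Request: &pb.RequestOp_RequestPut{
				RequestPut: &pb.PutRequest{Key: []byte("k"), Value: make([]byte, 1<<20)},
			},
		}},
	}
	fmt.Println(pb.NewLoggableTxnRequest(txn).String())
	// prints something like:
	// compare:<> success:<request_put:<key:"k" value_size:1048576 >> failure:<>
}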


@@ -15,9 +15,11 @@
package etcdserver
import (
goruntime "runtime"
"time"
"github.com/coreos/etcd/pkg/runtime"
"github.com/coreos/etcd/version"
"github.com/prometheus/client_golang/prometheus"
)
@@ -28,12 +30,30 @@ var (
Name: "has_leader",
Help: "Whether or not a leader exists. 1 is existence, 0 is not.",
})
isLeader = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "is_leader",
Help: "Whether or not this member is a leader. 1 if is, 0 otherwise.",
})
leaderChanges = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "leader_changes_seen_total",
Help: "The number of leader changes seen.",
})
heartbeatSendFailures = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "heartbeat_send_failures_total",
Help: "The total number of leader heartbeat send failures (likely overloaded from slow disk).",
})
slowApplies = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "slow_apply_total",
Help: "The total number of slow apply requests (likely overloaded from slow disk).",
})
proposalsCommitted = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",
@@ -64,16 +84,56 @@ var (
Name: "lease_expired_total",
Help: "The total number of expired leases.",
})
slowReadIndex = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "slow_read_indexes_total",
Help: "The total number of pending read indexes not in sync with leader's or timed out read index requests.",
})
quotaBackendBytes = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "quota_backend_bytes",
Help: "Current backend storage quota size in bytes.",
})
currentVersion = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "version",
Help: "Which version is running. 1 for 'server_version' label with current version.",
},
[]string{"server_version"})
currentGoVersion = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "go_version",
Help: "Which Go version server is running with. 1 for 'server_go_version' label with current version.",
},
[]string{"server_go_version"})
)
func init() {
prometheus.MustRegister(hasLeader)
prometheus.MustRegister(isLeader)
prometheus.MustRegister(leaderChanges)
prometheus.MustRegister(heartbeatSendFailures)
prometheus.MustRegister(slowApplies)
prometheus.MustRegister(proposalsCommitted)
prometheus.MustRegister(proposalsApplied)
prometheus.MustRegister(proposalsPending)
prometheus.MustRegister(proposalsFailed)
prometheus.MustRegister(leaseExpired)
prometheus.MustRegister(slowReadIndex)
prometheus.MustRegister(quotaBackendBytes)
prometheus.MustRegister(currentVersion)
prometheus.MustRegister(currentGoVersion)
currentVersion.With(prometheus.Labels{
"server_version": version.Version,
}).Set(1)
currentGoVersion.With(prometheus.Labels{
"server_go_version": goruntime.Version(),
}).Set(1)
}
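
currentVersion and currentGoVersion follow the Prometheus "info gauge" idiom: the interesting value lives in a label while the sample value is a constant 1, so dashboards can join on it. A minimal sketch of the idiom outside etcd:

package main

import (
	"fmt"
	goruntime "runtime"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	v := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "etcd", Subsystem: "server", Name: "go_version",
		Help: "Which Go version server is running with.",
	}, []string{"server_go_version"})
	v.With(prometheus.Labels{"server_go_version": goruntime.Version()}).Set(1)

	// Read the sample back to show the label/value shape.
	var m dto.Metric
	v.With(prometheus.Labels{"server_go_version": goruntime.Version()}).Write(&m)
	fmt.Println(m.GetLabel()[0].GetValue(), m.GetGauge().GetValue()) // e.g. go1.10 1
}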
func monitorFileDescriptor(done <-chan struct{}) {


@@ -14,9 +14,7 @@
package etcdserver
import pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
const (
// DefaultQuotaBytes is the number of bytes the backend Size may
@@ -58,15 +56,20 @@ const (
)
func NewBackendQuota(s *EtcdServer) Quota {
quotaBackendBytes.Set(float64(s.Cfg.QuotaBackendBytes))
if s.Cfg.QuotaBackendBytes < 0 {
// disable quotas if negative
plog.Warningf("disabling backend quota")
return &passthroughQuota{}
}
if s.Cfg.QuotaBackendBytes == 0 {
// use default size if no quota size given
quotaBackendBytes.Set(float64(DefaultQuotaBytes))
return &backendQuota{s, DefaultQuotaBytes}
}
if s.Cfg.QuotaBackendBytes > MaxQuotaBytes {
plog.Warningf("backend quota %v exceeds maximum recommended quota %v", s.Cfg.QuotaBackendBytes, MaxQuotaBytes)
}
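
The quota selection above condenses to a small decision table: a negative value disables enforcement, zero selects the default, and anything above the recommended maximum is allowed with a warning. A standalone sketch, with constants assumed to match DefaultQuotaBytes and MaxQuotaBytes:

package main

import "fmt"

// Assumed to match the etcd constants above.
const (
	defaultQuotaBytes = int64(2 * 1024 * 1024 * 1024) // 2 GB
	maxQuotaBytes     = int64(8 * 1024 * 1024 * 1024) // 8 GB
)

// effectiveQuota condenses NewBackendQuota's rules.
func effectiveQuota(configured int64) (quota int64, enforced bool) {
	if configured < 0 {
		return 0, false // disable quotas if negative
	}
	if configured == 0 {
		return defaultQuotaBytes, true // use default size if no quota given
	}
	if configured > maxQuotaBytes {
		fmt.Printf("backend quota %v exceeds maximum recommended quota %v\n", configured, maxQuotaBytes)
	}
	return configured, true
}

func main() {
	fmt.Println(effectiveQuota(-1)) // 0 false
	fmt.Println(effectiveQuota(0))  // 2147483648 true
	fmt.Println(effectiveQuota(16 * 1024 * 1024 * 1024)) // warns, then 17179869184 true
}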


@@ -95,6 +95,7 @@ type raftNode struct {
term uint64
lead uint64
tickMu *sync.Mutex
raftNodeConfig
// a chan to send/receive snapshot
@@ -131,6 +132,7 @@ type raftNodeConfig struct {
func newRaftNode(cfg raftNodeConfig) *raftNode {
r := &raftNode{
tickMu: new(sync.Mutex),
raftNodeConfig: cfg,
// set up contention detectors for raft heartbeat message.
// expect to send a heartbeat within 2 heartbeat intervals.
@@ -149,6 +151,13 @@ func newRaftNode(cfg raftNodeConfig) *raftNode {
return r
}
// raft.Node does not have locks in Raft package
func (r *raftNode) tick() {
r.tickMu.Lock()
r.Tick()
r.tickMu.Unlock()
}
// start prepares and starts raftNode in a new goroutine. It is no longer safe
// to modify the fields after it has been started.
func (r *raftNode) start(rh *raftReadyHandler) {
@@ -161,7 +170,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
for {
select {
case <-r.ticker.C:
r.tick()
case rd := <-r.Ready():
if rd.SoftState != nil {
newLeader := rd.SoftState.Lead != raft.None && atomic.LoadUint64(&r.lead) != rd.SoftState.Lead
@@ -177,6 +186,11 @@ func (r *raftNode) start(rh *raftReadyHandler) {
atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
islead = rd.RaftState == raft.StateLeader
if islead {
isLeader.Set(1)
} else {
isLeader.Set(0)
}
rh.updateLeadership(newLeader)
r.td.Reset()
}
@@ -332,6 +346,7 @@ func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
// TODO: limit request rate.
plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed)
plog.Warningf("server is likely overloaded")
heartbeatSendFailures.Inc()
}
}
}
@@ -368,13 +383,13 @@ func (r *raftNode) resumeSending() {
p.Resume()
}
// advanceTicks advances ticks of Raft node.
// This can be used for fast-forwarding election
// ticks in multi data-center deployments, thus
// speeding up election process.
func (r *raftNode) advanceTicks(ticks int) {
for i := 0; i < ticks; i++ {
r.tick()
}
}
@@ -415,8 +430,8 @@ func startNode(cfg *ServerConfig, cl *membership.RaftCluster, ids []types.ID) (i
raftStatusMu.Lock()
raftStatus = n.Status
raftStatusMu.Unlock()
return id, n, s, w
}
func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
@@ -449,7 +464,6 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membe
raftStatusMu.Lock()
raftStatus = n.Status
raftStatusMu.Unlock()
return id, cl, n, s, w
}
@@ -498,6 +512,7 @@ func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (type
Storage: s,
MaxSizePerMsg: maxSizePerMsg,
MaxInflightMsgs: maxInflightMsgs,
CheckQuorum: true,
}
n := raft.RestartNode(c)
raftStatus = n.Status
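
The new tickMu exists because ticks now come from two goroutines: the main raftNode loop and the adjustTicks fast-forward path. A toy sketch of why funneling all ticks through one mutex matters (names are stand-ins, not etcd's):

package main

import (
	"fmt"
	"sync"
)

type ticker struct {
	mu    sync.Mutex
	ticks int // stands in for raft.Node's internal tick state
}

func (t *ticker) tick() { // mirrors raftNode.tick: one mutex funnels all ticks
	t.mu.Lock()
	t.ticks++
	t.mu.Unlock()
}

func (t *ticker) advanceTicks(n int) {
	for i := 0; i < n; i++ {
		t.tick()
	}
}

func main() {
	t := &ticker{}
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ { // concurrent tickers, as in the server
		wg.Add(1)
		go func() { defer wg.Done(); t.advanceTicks(100) }()
	}
	wg.Wait()
	fmt.Println(t.ticks) // always 200 with the mutex; racy without it
}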


@@ -513,11 +513,56 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
return srv, nil
}
func (s *EtcdServer) adjustTicks() {
clusterN := len(s.cluster.Members())
// single-node fresh start, or single-node recovers from snapshot
if clusterN == 1 {
ticks := s.Cfg.ElectionTicks - 1
plog.Infof("%s as single-node; fast-forwarding %d ticks (election ticks %d)", s.ID(), ticks, s.Cfg.ElectionTicks)
s.r.advanceTicks(ticks)
return
}
if !s.Cfg.InitialElectionTickAdvance {
plog.Infof("skipping initial election tick advance (election tick %d)", s.Cfg.ElectionTicks)
return
}
// retry up to "rafthttp.ConnReadTimeout", which is 5-sec
// until peer connection reports; otherwise:
// 1. all connections failed, or
// 2. no active peers, or
// 3. restarted single-node with no snapshot
// then, do nothing, because advancing ticks would have no effect
waitTime := rafthttp.ConnReadTimeout
itv := 50 * time.Millisecond
for i := int64(0); i < int64(waitTime/itv); i++ {
select {
case <-time.After(itv):
case <-s.stopping:
return
}
peerN := s.r.transport.ActivePeers()
if peerN > 1 {
// multi-node cluster received peer connection reports;
// adjust ticks in case of slow leader message delivery
ticks := s.Cfg.ElectionTicks - 2
plog.Infof("%s initialzed peer connection; fast-forwarding %d ticks (election ticks %d) with %d active peer(s)", s.ID(), ticks, s.Cfg.ElectionTicks, peerN)
s.r.advanceTicks(ticks)
return
}
}
}
// Start performs any initialization of the Server necessary for it to
// begin serving requests. It must be called before Do or Process.
// Start must be non-blocking; any long-running server functionality
// should be implemented in goroutines.
func (s *EtcdServer) Start() {
s.start()
s.goAttach(func() { s.adjustTicks() })
s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
s.goAttach(s.purgeFile)
s.goAttach(func() { monitorFileDescriptor(s.stopping) })
@@ -552,18 +597,21 @@ func (s *EtcdServer) start() {
}
func (s *EtcdServer) purgeFile() {
var dberrc, serrc, werrc <-chan error
if s.Cfg.MaxSnapFiles > 0 {
dberrc = fileutil.PurgeFile(s.Cfg.SnapDir(), "snap.db", s.Cfg.MaxSnapFiles, purgeFileInterval, s.done)
serrc = fileutil.PurgeFile(s.Cfg.SnapDir(), "snap", s.Cfg.MaxSnapFiles, purgeFileInterval, s.done)
}
if s.Cfg.MaxWALFiles > 0 {
werrc = fileutil.PurgeFile(s.Cfg.WALDir(), "wal", s.Cfg.MaxWALFiles, purgeFileInterval, s.done)
}
select {
case e := <-dberrc:
plog.Fatalf("failed to purge snap db file %v", e)
case e := <-serrc:
plog.Fatalf("failed to purge snap file %v", e)
case e := <-werrc:
plog.Fatalf("failed to purge wal file %v", e)
case <-s.stopping:
return
}
@@ -743,8 +791,13 @@ func (s *EtcdServer) run() {
}
lid := lease.ID
s.goAttach(func() {
_, lerr := s.LeaseRevoke(s.ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})
if lerr == nil {
leaseExpired.Inc()
} else {
plog.Warningf("failed to revoke %016x (%q)", lid, lerr.Error())
}
<-c
})
}
@@ -765,14 +818,8 @@ func (s *EtcdServer) run() {
func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
s.applySnapshot(ep, apply)
s.applyEntries(ep, apply)
proposalsApplied.Set(float64(ep.appliedi))
s.applyWait.Trigger(ep.appliedi)
// wait for the raft routine to finish the disk writes before triggering a


@@ -74,10 +74,10 @@ type serverStats struct {
func (ss *ServerStats) JSON() []byte {
ss.Lock()
stats := ss.serverStats
stats.SendingPkgRate, stats.SendingBandwidthRate = stats.sendRateQueue.Rate()
stats.RecvingPkgRate, stats.RecvingBandwidthRate = stats.recvRateQueue.Rate()
stats.LeaderInfo.Uptime = time.Since(stats.LeaderInfo.StartTime).String()
ss.Unlock()
b, err := json.Marshal(stats)
// TODO(jonboulle): appropriate error handling?
if err != nil {

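The reordering in ServerStats.JSON keeps the rate-queue reads inside the critical section; previously they ran after Unlock and could race with concurrent senders. The idiom in miniature, with a hypothetical type:

package main

import (
	"fmt"
	"sync"
)

type counter struct {
	mu sync.Mutex
	n  int
}

// snapshot copies everything it needs while still holding the lock,
// then formats outside the critical section — the same shape as JSON() above.
func (c *counter) snapshot() string {
	c.mu.Lock()
	n := c.n // all shared reads happen here, before Unlock
	c.mu.Unlock()
	return fmt.Sprintf("n=%d", n)
}

func main() {
	c := &counter{n: 7}
	fmt.Println(c.snapshot())
}
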

@@ -15,11 +15,16 @@
package etcdserver
import (
"fmt"
"reflect"
"strings"
"time"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/rafthttp"
"github.com/golang/protobuf/proto"
)
// isConnectedToQuorumSince checks whether the local member is connected to the
@@ -95,3 +100,56 @@ func (nc *notifier) notify(err error) {
nc.err = err
close(nc.c)
}
func warnOfExpensiveRequest(now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) {
var resp string
if !isNil(respMsg) {
resp = fmt.Sprintf("size:%d", proto.Size(respMsg))
}
warnOfExpensiveGenericRequest(now, reqStringer, "", resp, err)
}
func warnOfExpensiveReadOnlyTxnRequest(now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) {
reqStringer := pb.NewLoggableTxnRequest(r)
var resp string
if !isNil(txnResponse) {
var resps []string
for _, r := range txnResponse.Responses {
switch op := r.Response.(type) {
case *pb.ResponseOp_ResponseRange:
resps = append(resps, fmt.Sprintf("range_response_count:%d", len(op.ResponseRange.Kvs)))
default:
// only range responses should be in a read only txn request
}
}
resp = fmt.Sprintf("responses:<%s> size:%d", strings.Join(resps, " "), proto.Size(txnResponse))
}
warnOfExpensiveGenericRequest(now, reqStringer, "read-only range ", resp, err)
}
func warnOfExpensiveReadOnlyRangeRequest(now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) {
var resp string
if !isNil(rangeResponse) {
resp = fmt.Sprintf("range_response_count:%d size:%d", len(rangeResponse.Kvs), proto.Size(rangeResponse))
}
warnOfExpensiveGenericRequest(now, reqStringer, "read-only range ", resp, err)
}
func warnOfExpensiveGenericRequest(now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) {
// TODO: add metrics
d := time.Since(now)
if d > warnApplyDuration {
var result string
if err != nil {
result = fmt.Sprintf("error:%v", err)
} else {
result = resp
}
plog.Warningf("%srequest %q with result %q took too long (%v) to execute", prefix, reqStringer.String(), result, d)
slowApplies.Inc()
}
}
func isNil(msg proto.Message) bool {
return msg == nil || reflect.ValueOf(msg).IsNil()
}
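
warnOfExpensiveRequest relies on the defer-with-argument idiom: time.Now() is evaluated when the defer statement is registered, not when the function returns, so the deferred call sees the true start time. A self-contained sketch (the 100 ms threshold is assumed to match warnApplyDuration):

package main

import (
	"fmt"
	"time"
)

const warnApplyDuration = 100 * time.Millisecond // assumed threshold

func warnIfSlow(start time.Time, req, result string) {
	if d := time.Since(start); d > warnApplyDuration {
		fmt.Printf("request %q with result %q took too long (%v) to execute\n", req, result, d)
	}
}

func handle() (result string) {
	// time.Now() is captured here, when the defer is registered.
	defer func(start time.Time) { warnIfSlow(start, "range k", result) }(time.Now())
	time.Sleep(150 * time.Millisecond) // simulated slow apply
	return "range_response_count:1"
}

func main() { handle() }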


@@ -19,8 +19,6 @@ import (
"encoding/binary"
"time"
"github.com/gogo/protobuf/proto"
"github.com/coreos/etcd/auth"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
@@ -28,7 +26,7 @@ import (
"github.com/coreos/etcd/lease/leasehttp"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/raft"
"github.com/gogo/protobuf/proto"
"golang.org/x/net/context"
)
@@ -82,20 +80,26 @@ type Authenticator interface {
}
func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
var resp *pb.RangeResponse
var err error
defer func(start time.Time) {
warnOfExpensiveReadOnlyRangeRequest(start, r, resp, err)
}(time.Now())
if !r.Serializable {
err = s.linearizableReadNotify(ctx)
if err != nil {
return nil, err
}
}
chk := func(ai *auth.AuthInfo) error {
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
}
get := func() { resp, err = s.applyV3Base.Range(nil, r) }
if serr := s.doSerialize(ctx, chk, get); serr != nil {
err = serr
return nil, err
}
return resp, err
}
@@ -129,12 +133,18 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
chk := func(ai *auth.AuthInfo) error {
return checkTxnAuth(s.authStore, ai, r)
}
defer func(start time.Time) {
warnOfExpensiveReadOnlyTxnRequest(start, r, resp, err)
}(time.Now())
get := func() { resp, err = s.applyV3Base.Txn(r) }
if serr := s.doSerialize(ctx, chk, get); serr != nil {
return nil, serr
}
return resp, err
}
resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Txn: r})
if err != nil {
return nil, err
@@ -587,8 +597,9 @@ func (s *EtcdServer) linearizableReadLoop() {
var rs raft.ReadState
for {
ctxToSend := make([]byte, 8)
id1 := s.reqIDGen.Next()
binary.BigEndian.PutUint64(ctxToSend, id1)
select {
case <-s.readwaitc:
@@ -604,7 +615,7 @@ func (s *EtcdServer) linearizableReadLoop() {
s.readMu.Unlock()
cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
if err := s.r.ReadIndex(cctx, ctxToSend); err != nil {
cancel()
if err == raft.ErrStopped {
return
@@ -622,16 +633,24 @@ func (s *EtcdServer) linearizableReadLoop() {
for !timeout && !done {
select {
case rs = <-s.r.readStateC:
done = bytes.Equal(rs.RequestCtx, ctxToSend)
if !done {
// a previous request might time out. now we should ignore the response of it and
// continue waiting for the response of the current requests.
plog.Warningf("ignored out-of-date read index response (want %v, got %v)", rs.RequestCtx, ctx)
id2 := uint64(0)
if len(rs.RequestCtx) == 8 {
id2 = binary.BigEndian.Uint64(rs.RequestCtx)
}
plog.Warningf("ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader (request ID want %d, got %d)", id1, id2)
slowReadIndex.Inc()
}
case <-time.After(s.Cfg.ReqTimeout()):
plog.Warningf("timed out waiting for read index response")
nr.notify(ErrTimeout)
timeout = true
slowReadIndex.Inc()
case <-s.stopping:
return
}
@@ -681,6 +700,5 @@ func (s *EtcdServer) AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error
return authInfo, nil
}
}
return s.AuthStore().AuthInfoFromCtx(ctx)
}
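
The linearizableReadLoop changes above replace an opaque byte-slice comparison with an 8-byte big-endian request ID, so out-of-date ReadIndex responses can be reported by ID rather than raw bytes. A small sketch of the encode/compare round trip:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

func main() {
	// Tag the outgoing ReadIndex request with an 8-byte big-endian ID.
	ctxToSend := make([]byte, 8)
	binary.BigEndian.PutUint64(ctxToSend, 42)

	// A late response from an earlier request carries a different ID.
	stale := make([]byte, 8)
	binary.BigEndian.PutUint64(stale, 41)

	if !bytes.Equal(stale, ctxToSend) {
		fmt.Printf("ignored out-of-date read index response (request ID want %d, got %d)\n",
			binary.BigEndian.Uint64(ctxToSend), binary.BigEndian.Uint64(stale))
	}
}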