Update vendor package github.com/coreos/...

Guoliang Wang
2018-10-24 05:43:42 +00:00
parent c0974d7399
commit d462e1e8d7
137 changed files with 11138 additions and 556 deletions

View File

@@ -68,11 +68,10 @@ func (m *Mutex) Lock(ctx context.Context) error {
// wait for deletion revisions prior to myKey
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
- // release lock key if cancelled
- select {
- case <-ctx.Done():
+ // release lock key if wait failed
+ if werr != nil {
m.Unlock(client.Ctx())
- default:
+ } else {
m.hdr = hdr
}
return werr
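
For context, a minimal usage sketch of clientv3 concurrency.Mutex (not part of this diff; the endpoint and lock prefix are placeholders) showing where the changed Lock path matters: with the new code, a failed wait releases the key created for this lock attempt before Lock returns.

package main

import (
	"context"
	"log"
	"time"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/clientv3/concurrency"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	s, err := concurrency.NewSession(cli)
	if err != nil {
		log.Fatal(err)
	}
	defer s.Close()

	m := concurrency.NewMutex(s, "/my-lock/")
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := m.Lock(ctx); err != nil {
		// With the change above, Lock returns the wait error and the
		// key created for this attempt has already been released.
		log.Fatal(err)
	}
	defer m.Unlock(context.Background())
	// critical section
}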

View File

@@ -23,6 +23,7 @@ go_library(
"//vendor/github.com/coreos/etcd/rafthttp:go_default_library",
"//vendor/github.com/coreos/etcd/version:go_default_library",
"//vendor/github.com/coreos/pkg/capnslog:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus/promhttp:go_default_library",
],
)

View File

@@ -24,6 +24,7 @@ import (
"github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/raft"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
@@ -43,11 +44,6 @@ func HandlePrometheus(mux *http.ServeMux) {
mux.Handle(pathMetrics, promhttp.Handler())
}
// HandleHealth registers health handler on '/health'.
func HandleHealth(mux *http.ServeMux, srv etcdserver.ServerV2) {
mux.Handle(PathHealth, NewHealthHandler(func() Health { return checkHealth(srv) }))
}
// NewHealthHandler handles '/health' requests.
func NewHealthHandler(hfunc func() Health) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
@@ -67,6 +63,26 @@ func NewHealthHandler(hfunc func() Health) http.HandlerFunc {
}
}
var (
healthSuccess = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "health_success",
Help: "The total number of successful health checks",
})
healthFailed = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "health_failures",
Help: "The total number of failed health checks",
})
)
func init() {
prometheus.MustRegister(healthSuccess)
prometheus.MustRegister(healthFailed)
}
// Health defines etcd server health status.
// TODO: remove manual parsing in etcdctl cluster-health
type Health struct {
@@ -97,5 +113,11 @@ func checkHealth(srv etcdserver.ServerV2) Health {
h.Health = "false"
}
}
if h.Health == "true" {
healthSuccess.Inc()
} else {
healthFailed.Inc()
}
return h
}
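
A minimal, self-contained sketch (not part of this diff; the namespace and handler wiring are illustrative) of the pattern this hunk adds: a success/failure counter pair incremented by the health handler and exposed on /metrics via promhttp.Handler(), the same handler HandlePrometheus registers above.

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Counter pair mirroring the etcd_server_health_success/health_failures pattern.
var (
	healthSuccess = prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "example", Subsystem: "server", Name: "health_success",
		Help: "The total number of successful health checks",
	})
	healthFailed = prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "example", Subsystem: "server", Name: "health_failures",
		Help: "The total number of failed health checks",
	})
)

// healthy is a stand-in for checkHealth(srv).
func healthy() bool { return true }

func main() {
	prometheus.MustRegister(healthSuccess, healthFailed)

	mux := http.NewServeMux()
	mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		if healthy() {
			healthSuccess.Inc()
			w.Write([]byte(`{"health":"true"}`))
			return
		}
		healthFailed.Inc()
		http.Error(w, `{"health":"false"}`, http.StatusServiceUnavailable)
	})
	// Expose both counters, as HandlePrometheus does with promhttp.Handler().
	mux.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":2381", mux))
}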

View File

@@ -37,15 +37,17 @@ go_library(
"//vendor/github.com/coreos/etcd/version:go_default_library",
"//vendor/github.com/coreos/pkg/capnslog:go_default_library",
"//vendor/github.com/gogo/protobuf/proto:go_default_library",
"//vendor/github.com/grpc-ecosystem/go-grpc-middleware:go_default_library",
"//vendor/github.com/grpc-ecosystem/go-grpc-prometheus:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/go.uber.org/zap:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
"//vendor/google.golang.org/grpc/codes:go_default_library",
"//vendor/google.golang.org/grpc/credentials:go_default_library",
"//vendor/google.golang.org/grpc/grpclog:go_default_library",
"//vendor/google.golang.org/grpc/health:go_default_library",
"//vendor/google.golang.org/grpc/health/grpc_health_v1:go_default_library",
"//vendor/google.golang.org/grpc/metadata:go_default_library",
"//vendor/google.golang.org/grpc/peer:go_default_library",
"//vendor/google.golang.org/grpc/status:go_default_library",
],
)

View File

@@ -16,18 +16,15 @@ package v3rpc
import (
"crypto/tls"
"io/ioutil"
"math"
"os"
"sync"
"github.com/coreos/etcd/etcdserver"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
@@ -38,17 +35,21 @@ const (
maxSendBytes = math.MaxInt32
)
// integration tests call this multiple times, which is racey in gRPC side
var grpclogOnce sync.Once
func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server {
var opts []grpc.ServerOption
opts = append(opts, grpc.CustomCodec(&codec{}))
if tls != nil {
opts = append(opts, grpc.Creds(credentials.NewTLS(tls)))
}
- opts = append(opts, grpc.UnaryInterceptor(newUnaryInterceptor(s)))
- opts = append(opts, grpc.StreamInterceptor(newStreamInterceptor(s)))
+ opts = append(opts, grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
+ newLogUnaryInterceptor(s),
+ newUnaryInterceptor(s),
+ grpc_prometheus.UnaryServerInterceptor,
+ )))
+ opts = append(opts, grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
+ newStreamInterceptor(s),
+ grpc_prometheus.StreamServerInterceptor,
+ )))
opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))
opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
opts = append(opts, grpc.MaxConcurrentStreams(maxStreams))
@@ -71,16 +72,5 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOptio
// set zero values for metrics registered for this grpc server
grpc_prometheus.Register(grpcServer)
- grpclogOnce.Do(func() {
- if s.Cfg.Debug {
- grpc.EnableTracing = true
- // enable info, warning, error
- grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
- } else {
- // only discard info
- grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
- }
- })
return grpcServer
}
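
A minimal sketch (not part of this diff; the timing interceptor and server setup are illustrative) of the chaining pattern introduced above: go-grpc-middleware collapses several interceptors into the single grpc.UnaryInterceptor option a gRPC server accepts, with the go-grpc-prometheus interceptor last in the chain.

package main

import (
	"context"
	"log"
	"net"
	"time"

	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
	"google.golang.org/grpc"
)

// timing is a stand-in for an application interceptor such as newLogUnaryInterceptor.
func timing() grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		start := time.Now()
		resp, err := handler(ctx, req)
		log.Printf("%s took %v", info.FullMethod, time.Since(start))
		return resp, err
	}
}

func main() {
	// gRPC accepts only one unary interceptor option per server, hence the chain.
	srv := grpc.NewServer(
		grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
			timing(),
			grpc_prometheus.UnaryServerInterceptor,
		)),
	)
	grpc_prometheus.Register(srv) // pre-initialize per-method metrics to zero

	lis, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		log.Fatal(err)
	}
	log.Fatal(srv.Serve(lis))
}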

View File

@@ -25,9 +25,11 @@ import (
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
)
const (
@@ -40,7 +42,7 @@ type streamsMap struct {
}
func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
- return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
+ return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if !api.IsCapabilityEnabled(api.V3rpcCapability) {
return nil, rpctypes.ErrGRPCNotCapable
}
@@ -54,7 +56,124 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
}
}
- return prometheus.UnaryServerInterceptor(ctx, req, info, handler)
+ return handler(ctx, req)
}
}
func newLogUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
startTime := time.Now()
resp, err := handler(ctx, req)
defer logUnaryRequestStats(ctx, nil, info, startTime, req, resp)
return resp, err
}
}
func logUnaryRequestStats(ctx context.Context, lg *zap.Logger, info *grpc.UnaryServerInfo, startTime time.Time, req interface{}, resp interface{}) {
duration := time.Since(startTime)
remote := "No remote client info."
peerInfo, ok := peer.FromContext(ctx)
if ok {
remote = peerInfo.Addr.String()
}
var responseType string = info.FullMethod
var reqCount, respCount int64
var reqSize, respSize int
var reqContent string
switch _resp := resp.(type) {
case *pb.RangeResponse:
_req, ok := req.(*pb.RangeRequest)
if ok {
reqCount = 0
reqSize = _req.Size()
reqContent = _req.String()
}
if _resp != nil {
respCount = _resp.GetCount()
respSize = _resp.Size()
}
case *pb.PutResponse:
_req, ok := req.(*pb.PutRequest)
if ok {
reqCount = 1
reqSize = _req.Size()
reqContent = pb.NewLoggablePutRequest(_req).String()
// redact value field from request content, see PR #9821
}
if _resp != nil {
respCount = 0
respSize = _resp.Size()
}
case *pb.DeleteRangeResponse:
_req, ok := req.(*pb.DeleteRangeRequest)
if ok {
reqCount = 0
reqSize = _req.Size()
reqContent = _req.String()
}
if _resp != nil {
respCount = _resp.GetDeleted()
respSize = _resp.Size()
}
case *pb.TxnResponse:
_req, ok := req.(*pb.TxnRequest)
if ok && _resp != nil {
if _resp.GetSucceeded() { // determine the 'actual' count and size of request based on success or failure
reqCount = int64(len(_req.GetSuccess()))
reqSize = 0
for _, r := range _req.GetSuccess() {
reqSize += r.Size()
}
} else {
reqCount = int64(len(_req.GetFailure()))
reqSize = 0
for _, r := range _req.GetFailure() {
reqSize += r.Size()
}
}
reqContent = pb.NewLoggableTxnRequest(_req).String()
// redact value field from request content, see PR #9821
}
if _resp != nil {
respCount = 0
respSize = _resp.Size()
}
default:
reqCount = -1
reqSize = -1
respCount = -1
respSize = -1
}
logGenericRequestStats(lg, startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent)
}
func logGenericRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string,
reqCount int64, reqSize int, respCount int64, respSize int, reqContent string) {
if lg == nil {
plog.Debugf("start time = %v, "+
"time spent = %v, "+
"remote = %s, "+
"response type = %s, "+
"request count = %d, "+
"request size = %d, "+
"response count = %d, "+
"response size = %d, "+
"request content = %s",
startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent,
)
} else {
lg.Debug("request stats",
zap.Time("start time", startTime),
zap.Duration("time spent", duration),
zap.String("remote", remote),
zap.String("response type", responseType),
zap.Int64("request count", reqCount),
zap.Int("request size", reqSize),
zap.Int64("response count", respCount),
zap.Int("response size", respSize),
zap.String("request content", reqContent),
)
}
}
@@ -90,7 +209,7 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
}
}
- return prometheus.StreamServerInterceptor(srv, ss, info, handler)
+ return handler(srv, ss)
}
}

View File

@@ -59,7 +59,7 @@ func (as *InternalRaftStringer) String() string {
case as.Request.Put != nil:
return fmt.Sprintf("header:<%s> put:<%s>",
as.Request.Header.String(),
- newLoggablePutRequest(as.Request.Put).String(),
+ NewLoggablePutRequest(as.Request.Put).String(),
)
case as.Request.Txn != nil:
return fmt.Sprintf("header:<%s> txn:<%s>",
@@ -121,7 +121,7 @@ func newLoggableRequestOp(op *RequestOp) *requestOpStringer {
func (as *requestOpStringer) String() string {
switch op := as.Op.Request.(type) {
case *RequestOp_RequestPut:
return fmt.Sprintf("request_put:<%s>", newLoggablePutRequest(op.RequestPut).String())
return fmt.Sprintf("request_put:<%s>", NewLoggablePutRequest(op.RequestPut).String())
case *RequestOp_RequestTxn:
return fmt.Sprintf("request_txn:<%s>", NewLoggableTxnRequest(op.RequestTxn).String())
default:
@@ -167,7 +167,7 @@ type loggablePutRequest struct {
IgnoreLease bool `protobuf:"varint,6,opt,name=ignore_lease,proto3"`
}
- func newLoggablePutRequest(request *PutRequest) *loggablePutRequest {
+ func NewLoggablePutRequest(request *PutRequest) *loggablePutRequest {
return &loggablePutRequest{
request.Key,
len(request.Value),

View File

@@ -90,6 +90,12 @@ var (
Name: "slow_read_indexes_total",
Help: "The total number of pending read indexes not in sync with leader's or timed out read index requests.",
})
readIndexFailed = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "read_indexes_failed_total",
Help: "The total number of failed read indexes seen.",
})
quotaBackendBytes = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",
@@ -110,6 +116,13 @@ var (
Help: "Which Go version server is running with. 1 for 'server_go_version' label with current version.",
},
[]string{"server_go_version"})
serverID = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "id",
Help: "Server or member ID in hexadecimal format. 1 for 'server_id' label with current ID.",
},
[]string{"server_id"})
)
func init() {
@@ -124,9 +137,11 @@ func init() {
prometheus.MustRegister(proposalsFailed)
prometheus.MustRegister(leaseExpired)
prometheus.MustRegister(slowReadIndex)
prometheus.MustRegister(readIndexFailed)
prometheus.MustRegister(quotaBackendBytes)
prometheus.MustRegister(currentVersion)
prometheus.MustRegister(currentGoVersion)
prometheus.MustRegister(serverID)
currentVersion.With(prometheus.Labels{
"server_version": version.Version,

View File

@@ -59,6 +59,7 @@ import (
"github.com/coreos/go-semver/semver"
"github.com/coreos/pkg/capnslog"
"github.com/prometheus/client_golang/prometheus"
)
const (
@@ -435,6 +436,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
reqIDGen: idutil.NewGenerator(uint16(id), time.Now()),
forceVersionC: make(chan struct{}),
}
serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1)
srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}

View File

@@ -634,6 +634,7 @@ func (s *EtcdServer) linearizableReadLoop() {
return
}
plog.Errorf("failed to get read index from raft: %v", err)
readIndexFailed.Inc()
nr.notify(err)
continue
}
@@ -659,7 +660,7 @@ func (s *EtcdServer) linearizableReadLoop() {
}
case <-time.After(s.Cfg.ReqTimeout()):
plog.Warningf("timed out waiting for read index response")
plog.Warningf("timed out waiting for read index response (local node might have slow network)")
nr.notify(ErrTimeout)
timeout = true
slowReadIndex.Inc()

View File

@@ -22,6 +22,7 @@ import (
"net/http"
"path"
"strings"
"time"
pioutil "github.com/coreos/etcd/pkg/ioutil"
"github.com/coreos/etcd/pkg/types"
@@ -149,6 +150,8 @@ func newSnapshotHandler(tr Transporter, r Raft, snapshotter *snap.Snapshotter, c
}
}
const unknownSnapshotSender = "UNKNOWN_SNAPSHOT_SENDER"
// ServeHTTP serves HTTP request to receive and process snapshot message.
//
// If request sender dies without closing underlying TCP connection,
@@ -159,9 +162,12 @@ func newSnapshotHandler(tr Transporter, r Raft, snapshotter *snap.Snapshotter, c
// received and processed.
// 2. this case should happen rarely, so no further optimization is done.
func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
start := time.Now()
if r.Method != "POST" {
w.Header().Set("Allow", "POST")
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
return
}
@@ -169,6 +175,7 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
http.Error(w, err.Error(), http.StatusPreconditionFailed)
snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
return
}
@@ -177,19 +184,22 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
dec := &messageDecoder{r: r.Body}
// let snapshots be very large since they can exceed 512MB for large installations
m, err := dec.decodeLimit(uint64(1 << 63))
from := types.ID(m.From).String()
if err != nil {
msg := fmt.Sprintf("failed to decode raft message (%v)", err)
plog.Errorf(msg)
http.Error(w, msg, http.StatusBadRequest)
recvFailures.WithLabelValues(r.RemoteAddr).Inc()
snapshotReceiveFailures.WithLabelValues(from).Inc()
return
}
- receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
+ receivedBytes.WithLabelValues(from).Add(float64(m.Size()))
if m.Type != raftpb.MsgSnap {
plog.Errorf("unexpected raft message type %s on snapshot path", m.Type)
http.Error(w, "wrong raft message type", http.StatusBadRequest)
snapshotReceiveFailures.WithLabelValues(from).Inc()
return
}
@@ -200,9 +210,10 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
msg := fmt.Sprintf("failed to save KV snapshot (%v)", err)
plog.Error(msg)
http.Error(w, msg, http.StatusInternalServerError)
snapshotReceiveFailures.WithLabelValues(from).Inc()
return
}
- receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(n))
+ receivedBytes.WithLabelValues(from).Add(float64(n))
plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From))
if err := h.r.Process(context.TODO(), m); err != nil {
@@ -215,12 +226,16 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
msg := fmt.Sprintf("failed to process raft message (%v)", err)
plog.Warningf(msg)
http.Error(w, msg, http.StatusInternalServerError)
snapshotReceiveFailures.WithLabelValues(from).Inc()
}
return
}
// Write StatusNoContent header after the message has been processed by
// raft, which facilitates the client to report MsgSnap status.
w.WriteHeader(http.StatusNoContent)
snapshotReceive.WithLabelValues(from).Inc()
snapshotReceiveSeconds.WithLabelValues(from).Observe(time.Since(start).Seconds())
}
type streamHandler struct {

View File

@@ -53,6 +53,68 @@ var (
[]string{"From"},
)
snapshotSend = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_send_success",
Help: "Total number of successful snapshot sends",
},
[]string{"To"},
)
snapshotSendFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_send_failures",
Help: "Total number of snapshot send failures",
},
[]string{"To"},
)
snapshotSendSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_send_total_duration_seconds",
Help: "Total latency distributions of v3 snapshot sends",
// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
},
[]string{"To"},
)
snapshotReceive = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_receive_success",
Help: "Total number of successful snapshot receives",
},
[]string{"From"},
)
snapshotReceiveFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_receive_failures",
Help: "Total number of snapshot receive failures",
},
[]string{"From"},
)
snapshotReceiveSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_receive_total_duration_seconds",
Help: "Total latency distributions of v3 snapshot receives",
// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
},
[]string{"From"},
)
rtts = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "network",
@@ -69,5 +131,13 @@ func init() {
prometheus.MustRegister(receivedBytes)
prometheus.MustRegister(sentFailures)
prometheus.MustRegister(recvFailures)
prometheus.MustRegister(snapshotSend)
prometheus.MustRegister(snapshotSendFailures)
prometheus.MustRegister(snapshotSendSeconds)
prometheus.MustRegister(snapshotReceive)
prometheus.MustRegister(snapshotReceiveFailures)
prometheus.MustRegister(snapshotReceiveSeconds)
prometheus.MustRegister(rtts)
}
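
A tiny sketch (not part of this diff) confirming the bucket comments in the histograms above: prometheus.ExponentialBuckets(start, factor, count) returns count upper bounds starting at start and multiplying by factor each step, so (0.1, 2, 10) ends at 0.1 * 2^9 = 51.2 seconds.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// 10 upper bounds, doubling from 0.1s; the last is 0.1 * 2^9 = 51.2s.
	fmt.Println(prometheus.ExponentialBuckets(0.1, 2, 10))
	// prints: [0.1 0.2 0.4 0.8 1.6 3.2 6.4 12.8 25.6 51.2]
}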

View File

@@ -56,7 +56,7 @@ func (s *peerStatus) deactivate(failure failureType, reason string) {
msg := fmt.Sprintf("failed to %s %s on %s (%s)", failure.action, s.id, failure.source, reason)
if s.active {
plog.Errorf(msg)
plog.Infof("peer %s became inactive", s.id)
plog.Infof("peer %s became inactive (message send to peer failed)", s.id)
s.active = false
s.since = time.Time{}
return

View File

@@ -17,6 +17,7 @@ package rafthttp
import (
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/xiang90/probing"
)
@@ -28,7 +29,15 @@ var (
statusErrorInterval = 5 * time.Second
)
- func addPeerToProber(p probing.Prober, id string, us []string) {
+ const (
+ // RoundTripperNameRaftMessage is the name of round-tripper that sends
+ // all other Raft messages, other than "snap.Message".
+ RoundTripperNameRaftMessage = "ROUND_TRIPPER_RAFT_MESSAGE"
+ // RoundTripperNameSnapshot is the name of round-tripper that sends merged snapshot message.
+ RoundTripperNameSnapshot = "ROUND_TRIPPER_SNAPSHOT"
+ )
+ func addPeerToProber(p probing.Prober, id string, us []string, roundTripperName string, rttSecProm *prometheus.HistogramVec) {
hus := make([]string, len(us))
for i := range us {
hus[i] = us[i] + ProbingPrefix
@@ -40,26 +49,26 @@ func addPeerToProber(p probing.Prober, id string, us []string) {
if err != nil {
plog.Errorf("failed to add peer %s into prober", id)
} else {
- go monitorProbingStatus(s, id)
+ go monitorProbingStatus(s, id, roundTripperName, rttSecProm)
}
}
- func monitorProbingStatus(s probing.Status, id string) {
+ func monitorProbingStatus(s probing.Status, id string, roundTripperName string, rttSecProm *prometheus.HistogramVec) {
// set the first interval short to log error early.
interval := statusErrorInterval
for {
select {
case <-time.After(interval):
if !s.Health() {
plog.Warningf("health check for peer %s could not connect: %v", id, s.Err())
plog.Warningf("health check for peer %s could not connect: %v (prober %q)", id, s.Err(), roundTripperName)
interval = statusErrorInterval
} else {
interval = statusMonitoringInterval
}
if s.ClockDiff() > time.Second {
plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second)
plog.Warningf("the clock difference against peer %s is too high [%v > %v] (prober %q)", id, s.ClockDiff(), time.Second, roundTripperName)
}
- rtts.WithLabelValues(id).Observe(s.SRTT().Seconds())
+ rttSecProm.WithLabelValues(id).Observe(s.SRTT().Seconds())
case <-s.StopNotify():
return
}

View File

@@ -64,7 +64,10 @@ func newSnapshotSender(tr *Transport, picker *urlPicker, to types.ID, status *pe
func (s *snapshotSender) stop() { close(s.stopc) }
func (s *snapshotSender) send(merged snap.Message) {
start := time.Now()
m := merged.Message
to := types.ID(m.To).String()
body := createSnapBody(merged)
defer body.Close()
@@ -92,14 +95,18 @@ func (s *snapshotSender) send(merged snap.Message) {
// machine knows about it, it would pause a while and retry sending
// new snapshot message.
s.r.ReportSnapshot(m.To, raft.SnapshotFailure)
- sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
+ sentFailures.WithLabelValues(to).Inc()
+ snapshotSendFailures.WithLabelValues(to).Inc()
return
}
s.status.activate()
s.r.ReportSnapshot(m.To, raft.SnapshotFinish)
plog.Infof("database snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To))
- sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(merged.TotalSize))
+ sentBytes.WithLabelValues(to).Add(float64(merged.TotalSize))
+ snapshotSend.WithLabelValues(to).Inc()
+ snapshotSendSeconds.WithLabelValues(to).Observe(time.Since(start).Seconds())
}
// post posts the given request.

View File

@@ -127,7 +127,8 @@ type Transport struct {
remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
peers map[types.ID]Peer // peers map
- prober probing.Prober
+ pipelineProber probing.Prober
+ streamProber probing.Prober
}
func (t *Transport) Start() error {
@@ -142,7 +143,8 @@ func (t *Transport) Start() error {
}
t.remotes = make(map[types.ID]*remote)
t.peers = make(map[types.ID]Peer)
- t.prober = probing.NewProber(t.pipelineRt)
+ t.pipelineProber = probing.NewProber(t.pipelineRt)
+ t.streamProber = probing.NewProber(t.streamRt)
// If client didn't provide dial retry frequency, use the default
// (100ms backoff between attempts to create a new stream),
@@ -210,7 +212,8 @@ func (t *Transport) Stop() {
for _, p := range t.peers {
p.stop()
}
- t.prober.RemoveAll()
+ t.pipelineProber.RemoveAll()
+ t.streamProber.RemoveAll()
if tr, ok := t.streamRt.(*http.Transport); ok {
tr.CloseIdleConnections()
}
@@ -289,8 +292,8 @@ func (t *Transport) AddPeer(id types.ID, us []string) {
}
fs := t.LeaderStats.Follower(id.String())
t.peers[id] = startPeer(t, urls, id, fs)
- addPeerToProber(t.prober, id.String(), us)
+ addPeerToProber(t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rtts)
+ addPeerToProber(t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rtts)
plog.Infof("added peer %s", id)
}
@@ -317,7 +320,8 @@ func (t *Transport) removePeer(id types.ID) {
}
delete(t.peers, id)
delete(t.LeaderStats.Followers, id.String())
- t.prober.Remove(id.String())
+ t.pipelineProber.Remove(id.String())
+ t.streamProber.Remove(id.String())
plog.Infof("removed peer %s", id)
}
@@ -334,8 +338,10 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) {
}
t.peers[id].update(urls)
- t.prober.Remove(id.String())
- addPeerToProber(t.prober, id.String(), us)
+ t.pipelineProber.Remove(id.String())
+ addPeerToProber(t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rtts)
+ t.streamProber.Remove(id.String())
+ addPeerToProber(t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rtts)
plog.Infof("updated peer %s", id)
}

View File

@@ -21,6 +21,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"time"
"github.com/coreos/etcd/pkg/fileutil"
)
@@ -30,6 +31,8 @@ var ErrNoDBSnapshot = errors.New("snap: snapshot file doesn't exist")
// SaveDBFrom saves snapshot of the database from the given reader. It
// guarantees the save operation is atomic.
func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
start := time.Now()
f, err := ioutil.TempFile(s.dir, "tmp")
if err != nil {
return 0, err
@@ -37,7 +40,9 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
var n int64
n, err = io.Copy(f, r)
if err == nil {
fsyncStart := time.Now()
err = fileutil.Fsync(f)
snapDBFsyncSec.Observe(time.Since(fsyncStart).Seconds())
}
f.Close()
if err != nil {
@@ -57,6 +62,7 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
plog.Infof("saved database snapshot to disk [total bytes: %d]", n)
snapDBSaveSec.Observe(time.Since(start).Seconds())
return n, nil
}

View File

@@ -33,9 +33,33 @@ var (
Help: "The marshalling cost distributions of save called by snapshot.",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
})
snapDBSaveSec = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "snap_db",
Name: "save_total_duration_seconds",
Help: "The total latency distributions of v3 snapshot save",
// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
})
snapDBFsyncSec = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "snap_db",
Name: "fsync_duration_seconds",
Help: "The latency distributions of fsyncing .snap.db file",
// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
})
)
func init() {
prometheus.MustRegister(saveDurations)
prometheus.MustRegister(marshallingDurations)
prometheus.MustRegister(snapDBSaveSec)
prometheus.MustRegister(snapDBFsyncSec)
}

View File

@@ -26,7 +26,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "3.0.0"
Version = "3.3.9"
Version = "3.3.10"
APIVersion = "unknown"
// Git SHA Value will be set during build