Move deps from _workspace/ to vendor/

godep restore
pushd $GOPATH/src/github.com/appc/spec
git co master
popd
go get go4.org/errorutil
rm -rf Godeps
godep save ./...
git add vendor
git add -f $(git ls-files --other vendor/)
git co -- Godeps/LICENSES Godeps/.license_file_state Godeps/OWNERS
This commit is contained in:
Tim Hockin
2016-05-08 20:30:21 -07:00
parent 899f9b4e31
commit 3c0c5ed4e0
4400 changed files with 16739 additions and 376 deletions

27
vendor/github.com/coreos/etcd/rafthttp/coder.go generated vendored Normal file
View File

@@ -0,0 +1,27 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import "github.com/coreos/etcd/raft/raftpb"
type encoder interface {
// encode encodes the given message to an output stream.
encode(m raftpb.Message) error
}
type decoder interface {
// decode decodes the message from an input stream.
decode() (raftpb.Message, error)
}

16
vendor/github.com/coreos/etcd/rafthttp/doc.go generated vendored Normal file
View File

@@ -0,0 +1,16 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package rafthttp implements HTTP transportation layer for etcd/raft pkg.
package rafthttp

343
vendor/github.com/coreos/etcd/rafthttp/http.go generated vendored Normal file
View File

@@ -0,0 +1,343 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"errors"
"fmt"
"io/ioutil"
"net/http"
"path"
"strings"
pioutil "github.com/coreos/etcd/pkg/ioutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/version"
"golang.org/x/net/context"
)
const (
// connReadLimitByte limits the number of bytes
// a single read can read out.
//
// 64KB should be large enough for not causing
// throughput bottleneck as well as small enough
// for not causing a read timeout.
connReadLimitByte = 64 * 1024
)
var (
RaftPrefix = "/raft"
ProbingPrefix = path.Join(RaftPrefix, "probing")
RaftStreamPrefix = path.Join(RaftPrefix, "stream")
RaftSnapshotPrefix = path.Join(RaftPrefix, "snapshot")
errIncompatibleVersion = errors.New("incompatible version")
errClusterIDMismatch = errors.New("cluster ID mismatch")
)
type peerGetter interface {
Get(id types.ID) Peer
}
type writerToResponse interface {
WriteTo(w http.ResponseWriter)
}
type pipelineHandler struct {
tr Transporter
r Raft
cid types.ID
}
// newPipelineHandler returns a handler for handling raft messages
// from pipeline for RaftPrefix.
//
// The handler reads out the raft message from request body,
// and forwards it to the given raft state machine for processing.
func newPipelineHandler(tr Transporter, r Raft, cid types.ID) http.Handler {
return &pipelineHandler{
tr: tr,
r: r,
cid: cid,
}
}
func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.Header().Set("Allow", "POST")
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
return
}
w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
http.Error(w, err.Error(), http.StatusPreconditionFailed)
return
}
if from, err := types.IDFromString(r.Header.Get("X-Server-From")); err != nil {
if urls := r.Header.Get("X-PeerURLs"); urls != "" {
h.tr.AddRemote(from, strings.Split(urls, ","))
}
}
// Limit the data size that could be read from the request body, which ensures that read from
// connection will not time out accidentally due to possible blocking in underlying implementation.
limitedr := pioutil.NewLimitedBufferReader(r.Body, connReadLimitByte)
b, err := ioutil.ReadAll(limitedr)
if err != nil {
plog.Errorf("failed to read raft message (%v)", err)
http.Error(w, "error reading raft message", http.StatusBadRequest)
return
}
var m raftpb.Message
if err := m.Unmarshal(b); err != nil {
plog.Errorf("failed to unmarshal raft message (%v)", err)
http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
return
}
if err := h.r.Process(context.TODO(), m); err != nil {
switch v := err.(type) {
case writerToResponse:
v.WriteTo(w)
default:
plog.Warningf("failed to process raft message (%v)", err)
http.Error(w, "error processing raft message", http.StatusInternalServerError)
}
return
}
// Write StatusNoContent header after the message has been processed by
// raft, which facilitates the client to report MsgSnap status.
w.WriteHeader(http.StatusNoContent)
}
type snapshotHandler struct {
tr Transporter
r Raft
snapshotter *snap.Snapshotter
cid types.ID
}
func newSnapshotHandler(tr Transporter, r Raft, snapshotter *snap.Snapshotter, cid types.ID) http.Handler {
return &snapshotHandler{
tr: tr,
r: r,
snapshotter: snapshotter,
cid: cid,
}
}
// ServeHTTP serves HTTP request to receive and process snapshot message.
//
// If request sender dies without closing underlying TCP connection,
// the handler will keep waiting for the request body until TCP keepalive
// finds out that the connection is broken after several minutes.
// This is acceptable because
// 1. snapshot messages sent through other TCP connections could still be
// received and processed.
// 2. this case should happen rarely, so no further optimization is done.
func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.Header().Set("Allow", "POST")
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
return
}
w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
http.Error(w, err.Error(), http.StatusPreconditionFailed)
return
}
if from, err := types.IDFromString(r.Header.Get("X-Server-From")); err != nil {
if urls := r.Header.Get("X-PeerURLs"); urls != "" {
h.tr.AddRemote(from, strings.Split(urls, ","))
}
}
dec := &messageDecoder{r: r.Body}
m, err := dec.decode()
if err != nil {
msg := fmt.Sprintf("failed to decode raft message (%v)", err)
plog.Errorf(msg)
http.Error(w, msg, http.StatusBadRequest)
return
}
if m.Type != raftpb.MsgSnap {
plog.Errorf("unexpected raft message type %s on snapshot path", m.Type)
http.Error(w, "wrong raft message type", http.StatusBadRequest)
return
}
plog.Infof("receiving database snapshot [index:%d, from %s] ...", m.Snapshot.Metadata.Index, types.ID(m.From))
// save incoming database snapshot.
if err := h.snapshotter.SaveDBFrom(r.Body, m.Snapshot.Metadata.Index); err != nil {
msg := fmt.Sprintf("failed to save KV snapshot (%v)", err)
plog.Error(msg)
http.Error(w, msg, http.StatusInternalServerError)
return
}
plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From))
if err := h.r.Process(context.TODO(), m); err != nil {
switch v := err.(type) {
// Process may return writerToResponse error when doing some
// additional checks before calling raft.Node.Step.
case writerToResponse:
v.WriteTo(w)
default:
msg := fmt.Sprintf("failed to process raft message (%v)", err)
plog.Warningf(msg)
http.Error(w, msg, http.StatusInternalServerError)
}
return
}
// Write StatusNoContent header after the message has been processed by
// raft, which facilitates the client to report MsgSnap status.
w.WriteHeader(http.StatusNoContent)
}
type streamHandler struct {
tr *Transport
peerGetter peerGetter
r Raft
id types.ID
cid types.ID
}
func newStreamHandler(tr *Transport, pg peerGetter, r Raft, id, cid types.ID) http.Handler {
return &streamHandler{
tr: tr,
peerGetter: pg,
r: r,
id: id,
cid: cid,
}
}
func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
w.Header().Set("Allow", "GET")
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
return
}
w.Header().Set("X-Server-Version", version.Version)
w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
http.Error(w, err.Error(), http.StatusPreconditionFailed)
return
}
var t streamType
switch path.Dir(r.URL.Path) {
case streamTypeMsgAppV2.endpoint():
t = streamTypeMsgAppV2
case streamTypeMessage.endpoint():
t = streamTypeMessage
default:
plog.Debugf("ignored unexpected streaming request path %s", r.URL.Path)
http.Error(w, "invalid path", http.StatusNotFound)
return
}
fromStr := path.Base(r.URL.Path)
from, err := types.IDFromString(fromStr)
if err != nil {
plog.Errorf("failed to parse from %s into ID (%v)", fromStr, err)
http.Error(w, "invalid from", http.StatusNotFound)
return
}
if h.r.IsIDRemoved(uint64(from)) {
plog.Warningf("rejected the stream from peer %s since it was removed", from)
http.Error(w, "removed member", http.StatusGone)
return
}
p := h.peerGetter.Get(from)
if p == nil {
// This may happen in following cases:
// 1. user starts a remote peer that belongs to a different cluster
// with the same cluster ID.
// 2. local etcd falls behind of the cluster, and cannot recognize
// the members that joined after its current progress.
if urls := r.Header.Get("X-PeerURLs"); urls != "" {
h.tr.AddRemote(from, strings.Split(urls, ","))
}
plog.Errorf("failed to find member %s in cluster %s", from, h.cid)
http.Error(w, "error sender not found", http.StatusNotFound)
return
}
wto := h.id.String()
if gto := r.Header.Get("X-Raft-To"); gto != wto {
plog.Errorf("streaming request ignored (ID mismatch got %s want %s)", gto, wto)
http.Error(w, "to field mismatch", http.StatusPreconditionFailed)
return
}
w.WriteHeader(http.StatusOK)
w.(http.Flusher).Flush()
c := newCloseNotifier()
conn := &outgoingConn{
t: t,
Writer: w,
Flusher: w.(http.Flusher),
Closer: c,
}
p.attachOutgoingConn(conn)
<-c.closeNotify()
}
// checkClusterCompatibilityFromHeader checks the cluster compatibility of
// the local member from the given header.
// It checks whether the version of local member is compatible with
// the versions in the header, and whether the cluster ID of local member
// matches the one in the header.
func checkClusterCompatibilityFromHeader(header http.Header, cid types.ID) error {
if err := checkVersionCompability(header.Get("X-Server-From"), serverVersion(header), minClusterVersion(header)); err != nil {
plog.Errorf("request version incompatibility (%v)", err)
return errIncompatibleVersion
}
if gcid := header.Get("X-Etcd-Cluster-ID"); gcid != cid.String() {
plog.Errorf("request cluster ID mismatch (got %s want %s)", gcid, cid)
return errClusterIDMismatch
}
return nil
}
type closeNotifier struct {
done chan struct{}
}
func newCloseNotifier() *closeNotifier {
return &closeNotifier{
done: make(chan struct{}),
}
}
func (n *closeNotifier) Close() error {
close(n.done)
return nil
}
func (n *closeNotifier) closeNotify() <-chan struct{} { return n.done }

70
vendor/github.com/coreos/etcd/rafthttp/metrics.go generated vendored Normal file
View File

@@ -0,0 +1,70 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"time"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
"github.com/prometheus/client_golang/prometheus"
)
var (
// TODO: create a separate histogram for recording
// snapshot sending metric. snapshot can be large and
// take a long time to send. So it needs a different
// time range than other type of messages.
msgSentDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "rafthttp",
Name: "message_sent_latency_seconds",
Help: "message sent latency distributions.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
},
[]string{"sendingType", "remoteID", "msgType"},
)
msgSentFailed = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "rafthttp",
Name: "message_sent_failed_total",
Help: "The total number of failed messages sent.",
},
[]string{"sendingType", "remoteID", "msgType"},
)
)
func init() {
prometheus.MustRegister(msgSentDuration)
prometheus.MustRegister(msgSentFailed)
}
func reportSentDuration(sendingType string, m raftpb.Message, duration time.Duration) {
typ := m.Type.String()
if isLinkHeartbeatMessage(m) {
typ = "MsgLinkHeartbeat"
}
msgSentDuration.WithLabelValues(sendingType, types.ID(m.To).String(), typ).Observe(float64(duration) / float64(time.Second))
}
func reportSentFailure(sendingType string, m raftpb.Message) {
typ := m.Type.String()
if isLinkHeartbeatMessage(m) {
typ = "MsgLinkHeartbeat"
}
msgSentFailed.WithLabelValues(sendingType, types.ID(m.To).String(), typ).Inc()
}

55
vendor/github.com/coreos/etcd/rafthttp/msg_codec.go generated vendored Normal file
View File

@@ -0,0 +1,55 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"encoding/binary"
"io"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/raft/raftpb"
)
// messageEncoder is a encoder that can encode all kinds of messages.
// It MUST be used with a paired messageDecoder.
type messageEncoder struct {
w io.Writer
}
func (enc *messageEncoder) encode(m raftpb.Message) error {
if err := binary.Write(enc.w, binary.BigEndian, uint64(m.Size())); err != nil {
return err
}
_, err := enc.w.Write(pbutil.MustMarshal(&m))
return err
}
// messageDecoder is a decoder that can decode all kinds of messages.
type messageDecoder struct {
r io.Reader
}
func (dec *messageDecoder) decode() (raftpb.Message, error) {
var m raftpb.Message
var l uint64
if err := binary.Read(dec.r, binary.BigEndian, &l); err != nil {
return m, err
}
buf := make([]byte, int(l))
if _, err := io.ReadFull(dec.r, buf); err != nil {
return m, err
}
return m, m.Unmarshal(buf)
}

View File

@@ -0,0 +1,248 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"encoding/binary"
"fmt"
"io"
"time"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
)
const (
msgTypeLinkHeartbeat uint8 = 0
msgTypeAppEntries uint8 = 1
msgTypeApp uint8 = 2
msgAppV2BufSize = 1024 * 1024
)
// msgappv2 stream sends three types of message: linkHeartbeatMessage,
// AppEntries and MsgApp. AppEntries is the MsgApp that is sent in
// replicate state in raft, whose index and term are fully predictable.
//
// Data format of linkHeartbeatMessage:
// | offset | bytes | description |
// +--------+-------+-------------+
// | 0 | 1 | \x00 |
//
// Data format of AppEntries:
// | offset | bytes | description |
// +--------+-------+-------------+
// | 0 | 1 | \x01 |
// | 1 | 8 | length of entries |
// | 9 | 8 | length of first entry |
// | 17 | n1 | first entry |
// ...
// | x | 8 | length of k-th entry data |
// | x+8 | nk | k-th entry data |
// | x+8+nk | 8 | commit index |
//
// Data format of MsgApp:
// | offset | bytes | description |
// +--------+-------+-------------+
// | 0 | 1 | \x01 |
// | 1 | 8 | length of encoded message |
// | 9 | n | encoded message |
type msgAppV2Encoder struct {
w io.Writer
fs *stats.FollowerStats
term uint64
index uint64
buf []byte
uint64buf []byte
uint8buf []byte
}
func newMsgAppV2Encoder(w io.Writer, fs *stats.FollowerStats) *msgAppV2Encoder {
return &msgAppV2Encoder{
w: w,
fs: fs,
buf: make([]byte, msgAppV2BufSize),
uint64buf: make([]byte, 8),
uint8buf: make([]byte, 1),
}
}
func (enc *msgAppV2Encoder) encode(m raftpb.Message) error {
start := time.Now()
switch {
case isLinkHeartbeatMessage(m):
enc.uint8buf[0] = byte(msgTypeLinkHeartbeat)
if _, err := enc.w.Write(enc.uint8buf); err != nil {
return err
}
case enc.index == m.Index && enc.term == m.LogTerm && m.LogTerm == m.Term:
enc.uint8buf[0] = byte(msgTypeAppEntries)
if _, err := enc.w.Write(enc.uint8buf); err != nil {
return err
}
// write length of entries
binary.BigEndian.PutUint64(enc.uint64buf, uint64(len(m.Entries)))
if _, err := enc.w.Write(enc.uint64buf); err != nil {
return err
}
for i := 0; i < len(m.Entries); i++ {
// write length of entry
binary.BigEndian.PutUint64(enc.uint64buf, uint64(m.Entries[i].Size()))
if _, err := enc.w.Write(enc.uint64buf); err != nil {
return err
}
if n := m.Entries[i].Size(); n < msgAppV2BufSize {
if _, err := m.Entries[i].MarshalTo(enc.buf); err != nil {
return err
}
if _, err := enc.w.Write(enc.buf[:n]); err != nil {
return err
}
} else {
if _, err := enc.w.Write(pbutil.MustMarshal(&m.Entries[i])); err != nil {
return err
}
}
enc.index++
}
// write commit index
binary.BigEndian.PutUint64(enc.uint64buf, m.Commit)
if _, err := enc.w.Write(enc.uint64buf); err != nil {
return err
}
enc.fs.Succ(time.Since(start))
default:
if err := binary.Write(enc.w, binary.BigEndian, msgTypeApp); err != nil {
return err
}
// write size of message
if err := binary.Write(enc.w, binary.BigEndian, uint64(m.Size())); err != nil {
return err
}
// write message
if _, err := enc.w.Write(pbutil.MustMarshal(&m)); err != nil {
return err
}
enc.term = m.Term
enc.index = m.Index
if l := len(m.Entries); l > 0 {
enc.index = m.Entries[l-1].Index
}
enc.fs.Succ(time.Since(start))
}
return nil
}
type msgAppV2Decoder struct {
r io.Reader
local, remote types.ID
term uint64
index uint64
buf []byte
uint64buf []byte
uint8buf []byte
}
func newMsgAppV2Decoder(r io.Reader, local, remote types.ID) *msgAppV2Decoder {
return &msgAppV2Decoder{
r: r,
local: local,
remote: remote,
buf: make([]byte, msgAppV2BufSize),
uint64buf: make([]byte, 8),
uint8buf: make([]byte, 1),
}
}
func (dec *msgAppV2Decoder) decode() (raftpb.Message, error) {
var (
m raftpb.Message
typ uint8
)
if _, err := io.ReadFull(dec.r, dec.uint8buf); err != nil {
return m, err
}
typ = uint8(dec.uint8buf[0])
switch typ {
case msgTypeLinkHeartbeat:
return linkHeartbeatMessage, nil
case msgTypeAppEntries:
m = raftpb.Message{
Type: raftpb.MsgApp,
From: uint64(dec.remote),
To: uint64(dec.local),
Term: dec.term,
LogTerm: dec.term,
Index: dec.index,
}
// decode entries
if _, err := io.ReadFull(dec.r, dec.uint64buf); err != nil {
return m, err
}
l := binary.BigEndian.Uint64(dec.uint64buf)
m.Entries = make([]raftpb.Entry, int(l))
for i := 0; i < int(l); i++ {
if _, err := io.ReadFull(dec.r, dec.uint64buf); err != nil {
return m, err
}
size := binary.BigEndian.Uint64(dec.uint64buf)
var buf []byte
if size < msgAppV2BufSize {
buf = dec.buf[:size]
if _, err := io.ReadFull(dec.r, buf); err != nil {
return m, err
}
} else {
buf = make([]byte, int(size))
if _, err := io.ReadFull(dec.r, buf); err != nil {
return m, err
}
}
dec.index++
// 1 alloc
pbutil.MustUnmarshal(&m.Entries[i], buf)
}
// decode commit index
if _, err := io.ReadFull(dec.r, dec.uint64buf); err != nil {
return m, err
}
m.Commit = binary.BigEndian.Uint64(dec.uint64buf)
case msgTypeApp:
var size uint64
if err := binary.Read(dec.r, binary.BigEndian, &size); err != nil {
return m, err
}
buf := make([]byte, int(size))
if _, err := io.ReadFull(dec.r, buf); err != nil {
return m, err
}
pbutil.MustUnmarshal(&m, buf)
dec.term = m.Term
dec.index = m.Index
if l := len(m.Entries); l > 0 {
dec.index = m.Entries[l-1].Index
}
default:
return m, fmt.Errorf("failed to parse type %d in msgappv2 stream", typ)
}
return m, nil
}

272
vendor/github.com/coreos/etcd/rafthttp/peer.go generated vendored Normal file
View File

@@ -0,0 +1,272 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"sync"
"time"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"golang.org/x/net/context"
)
const (
// ConnReadTimeout and ConnWriteTimeout are the i/o timeout set on each connection rafthttp pkg creates.
// A 5 seconds timeout is good enough for recycling bad connections. Or we have to wait for
// tcp keepalive failing to detect a bad connection, which is at minutes level.
// For long term streaming connections, rafthttp pkg sends application level linkHeartbeatMessage
// to keep the connection alive.
// For short term pipeline connections, the connection MUST be killed to avoid it being
// put back to http pkg connection pool.
ConnReadTimeout = 5 * time.Second
ConnWriteTimeout = 5 * time.Second
recvBufSize = 4096
// maxPendingProposals holds the proposals during one leader election process.
// Generally one leader election takes at most 1 sec. It should have
// 0-2 election conflicts, and each one takes 0.5 sec.
// We assume the number of concurrent proposers is smaller than 4096.
// One client blocks on its proposal for at least 1 sec, so 4096 is enough
// to hold all proposals.
maxPendingProposals = 4096
streamAppV2 = "streamMsgAppV2"
streamMsg = "streamMsg"
pipelineMsg = "pipeline"
sendSnap = "sendMsgSnap"
)
type Peer interface {
// send sends the message to the remote peer. The function is non-blocking
// and has no promise that the message will be received by the remote.
// When it fails to send message out, it will report the status to underlying
// raft.
send(m raftpb.Message)
// sendSnap sends the merged snapshot message to the remote peer. Its behavior
// is similar to send.
sendSnap(m snap.Message)
// update updates the urls of remote peer.
update(urls types.URLs)
// attachOutgoingConn attaches the outgoing connection to the peer for
// stream usage. After the call, the ownership of the outgoing
// connection hands over to the peer. The peer will close the connection
// when it is no longer used.
attachOutgoingConn(conn *outgoingConn)
// activeSince returns the time that the connection with the
// peer becomes active.
activeSince() time.Time
// stop performs any necessary finalization and terminates the peer
// elegantly.
stop()
}
// peer is the representative of a remote raft node. Local raft node sends
// messages to the remote through peer.
// Each peer has two underlying mechanisms to send out a message: stream and
// pipeline.
// A stream is a receiver initialized long-polling connection, which
// is always open to transfer messages. Besides general stream, peer also has
// a optimized stream for sending msgApp since msgApp accounts for large part
// of all messages. Only raft leader uses the optimized stream to send msgApp
// to the remote follower node.
// A pipeline is a series of http clients that send http requests to the remote.
// It is only used when the stream has not been established.
type peer struct {
// id of the remote raft peer node
id types.ID
r Raft
status *peerStatus
picker *urlPicker
msgAppV2Writer *streamWriter
writer *streamWriter
pipeline *pipeline
snapSender *snapshotSender // snapshot sender to send v3 snapshot messages
msgAppV2Reader *streamReader
msgAppReader *streamReader
sendc chan raftpb.Message
recvc chan raftpb.Message
propc chan raftpb.Message
mu sync.Mutex
paused bool
cancel context.CancelFunc // cancel pending works in go routine created by peer.
stopc chan struct{}
}
func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer {
status := newPeerStatus(to)
picker := newURLPicker(urls)
p := &peer{
id: to,
r: r,
status: status,
picker: picker,
msgAppV2Writer: startStreamWriter(to, status, fs, r),
writer: startStreamWriter(to, status, fs, r),
pipeline: newPipeline(transport, picker, local, to, cid, status, fs, r, errorc),
snapSender: newSnapshotSender(transport, picker, local, to, cid, status, r, errorc),
sendc: make(chan raftpb.Message),
recvc: make(chan raftpb.Message, recvBufSize),
propc: make(chan raftpb.Message, maxPendingProposals),
stopc: make(chan struct{}),
}
ctx, cancel := context.WithCancel(context.Background())
p.cancel = cancel
go func() {
for {
select {
case mm := <-p.recvc:
if err := r.Process(ctx, mm); err != nil {
plog.Warningf("failed to process raft message (%v)", err)
}
case <-p.stopc:
return
}
}
}()
// r.Process might block for processing proposal when there is no leader.
// Thus propc must be put into a separate routine with recvc to avoid blocking
// processing other raft messages.
go func() {
for {
select {
case mm := <-p.propc:
if err := r.Process(ctx, mm); err != nil {
plog.Warningf("failed to process raft message (%v)", err)
}
case <-p.stopc:
return
}
}
}()
p.msgAppV2Reader = startStreamReader(transport, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc)
p.msgAppReader = startStreamReader(transport, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc)
return p
}
func (p *peer) send(m raftpb.Message) {
p.mu.Lock()
paused := p.paused
p.mu.Unlock()
if paused {
return
}
writec, name := p.pick(m)
select {
case writec <- m:
default:
p.r.ReportUnreachable(m.To)
if isMsgSnap(m) {
p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
}
if p.status.isActive() {
plog.MergeWarningf("dropped internal raft message to %s since %s's sending buffer is full (bad/overloaded network)", p.id, name)
}
plog.Debugf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name)
}
}
func (p *peer) sendSnap(m snap.Message) {
go p.snapSender.send(m)
}
func (p *peer) update(urls types.URLs) {
p.picker.update(urls)
}
func (p *peer) attachOutgoingConn(conn *outgoingConn) {
var ok bool
switch conn.t {
case streamTypeMsgAppV2:
ok = p.msgAppV2Writer.attach(conn)
case streamTypeMessage:
ok = p.writer.attach(conn)
default:
plog.Panicf("unhandled stream type %s", conn.t)
}
if !ok {
conn.Close()
}
}
func (p *peer) activeSince() time.Time { return p.status.activeSince }
// Pause pauses the peer. The peer will simply drops all incoming
// messages without returning an error.
func (p *peer) Pause() {
p.mu.Lock()
defer p.mu.Unlock()
p.paused = true
p.msgAppReader.pause()
p.msgAppV2Reader.pause()
}
// Resume resumes a paused peer.
func (p *peer) Resume() {
p.mu.Lock()
defer p.mu.Unlock()
p.paused = false
p.msgAppReader.resume()
p.msgAppV2Reader.resume()
}
func (p *peer) stop() {
close(p.stopc)
p.cancel()
p.msgAppV2Writer.stop()
p.writer.stop()
p.pipeline.stop()
p.snapSender.stop()
p.msgAppV2Reader.stop()
p.msgAppReader.stop()
}
// pick picks a chan for sending the given message. The picked chan and the picked chan
// string name are returned.
func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) {
var ok bool
// Considering MsgSnap may have a big size, e.g., 1G, and will block
// stream for a long time, only use one of the N pipelines to send MsgSnap.
if isMsgSnap(m) {
return p.pipeline.msgc, pipelineMsg
} else if writec, ok = p.msgAppV2Writer.writec(); ok && isMsgApp(m) {
return writec, streamAppV2
} else if writec, ok = p.writer.writec(); ok {
return writec, streamMsg
}
return p.pipeline.msgc, pipelineMsg
}
func isMsgApp(m raftpb.Message) bool { return m.Type == raftpb.MsgApp }
func isMsgSnap(m raftpb.Message) bool { return m.Type == raftpb.MsgSnap }

71
vendor/github.com/coreos/etcd/rafthttp/peer_status.go generated vendored Normal file
View File

@@ -0,0 +1,71 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"fmt"
"sync"
"time"
"github.com/coreos/etcd/pkg/types"
)
type failureType struct {
source string
action string
}
type peerStatus struct {
id types.ID
mu sync.Mutex // protect variables below
active bool
activeSince time.Time
}
func newPeerStatus(id types.ID) *peerStatus {
return &peerStatus{
id: id,
}
}
func (s *peerStatus) activate() {
s.mu.Lock()
defer s.mu.Unlock()
if !s.active {
plog.Infof("the connection with %s became active", s.id)
s.active = true
s.activeSince = time.Now()
}
}
func (s *peerStatus) deactivate(failure failureType, reason string) {
s.mu.Lock()
defer s.mu.Unlock()
msg := fmt.Sprintf("failed to %s %s on %s (%s)", failure.action, s.id, failure.source, reason)
if s.active {
plog.Errorf(msg)
plog.Infof("the connection with %s became inactive", s.id)
s.active = false
s.activeSince = time.Time{}
return
}
plog.Debugf(msg)
}
func (s *peerStatus) isActive() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.active
}

169
vendor/github.com/coreos/etcd/rafthttp/pipeline.go generated vendored Normal file
View File

@@ -0,0 +1,169 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"bytes"
"errors"
"io/ioutil"
"sync"
"time"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/httputil"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
)
const (
connPerPipeline = 4
// pipelineBufSize is the size of pipeline buffer, which helps hold the
// temporary network latency.
// The size ensures that pipeline does not drop messages when the network
// is out of work for less than 1 second in good path.
pipelineBufSize = 64
)
var errStopped = errors.New("stopped")
type pipeline struct {
from, to types.ID
cid types.ID
tr *Transport
picker *urlPicker
status *peerStatus
fs *stats.FollowerStats
r Raft
errorc chan error
msgc chan raftpb.Message
// wait for the handling routines
wg sync.WaitGroup
stopc chan struct{}
}
func newPipeline(tr *Transport, picker *urlPicker, from, to, cid types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft, errorc chan error) *pipeline {
p := &pipeline{
from: from,
to: to,
cid: cid,
tr: tr,
picker: picker,
status: status,
fs: fs,
r: r,
errorc: errorc,
stopc: make(chan struct{}),
msgc: make(chan raftpb.Message, pipelineBufSize),
}
p.wg.Add(connPerPipeline)
for i := 0; i < connPerPipeline; i++ {
go p.handle()
}
return p
}
func (p *pipeline) stop() {
close(p.stopc)
p.wg.Wait()
}
func (p *pipeline) handle() {
defer p.wg.Done()
for {
select {
case m := <-p.msgc:
start := time.Now()
err := p.post(pbutil.MustMarshal(&m))
end := time.Now()
if err != nil {
p.status.deactivate(failureType{source: pipelineMsg, action: "write"}, err.Error())
reportSentFailure(pipelineMsg, m)
if m.Type == raftpb.MsgApp && p.fs != nil {
p.fs.Fail()
}
p.r.ReportUnreachable(m.To)
if isMsgSnap(m) {
p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
}
continue
}
p.status.activate()
if m.Type == raftpb.MsgApp && p.fs != nil {
p.fs.Succ(end.Sub(start))
}
if isMsgSnap(m) {
p.r.ReportSnapshot(m.To, raft.SnapshotFinish)
}
reportSentDuration(pipelineMsg, m, time.Since(start))
case <-p.stopc:
return
}
}
}
// post POSTs a data payload to a url. Returns nil if the POST succeeds,
// error on any failure.
func (p *pipeline) post(data []byte) (err error) {
u := p.picker.pick()
req := createPostRequest(u, RaftPrefix, bytes.NewBuffer(data), "application/protobuf", p.tr.URLs, p.from, p.cid)
done := make(chan struct{}, 1)
cancel := httputil.RequestCanceler(p.tr.pipelineRt, req)
go func() {
select {
case <-done:
case <-p.stopc:
waitSchedule()
cancel()
}
}()
resp, err := p.tr.pipelineRt.RoundTrip(req)
done <- struct{}{}
if err != nil {
p.picker.unreachable(u)
return err
}
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
p.picker.unreachable(u)
return err
}
resp.Body.Close()
err = checkPostResponse(resp, b, req, p.to)
if err != nil {
p.picker.unreachable(u)
// errMemberRemoved is a critical error since a removed member should
// always be stopped. So we use reportCriticalError to report it to errorc.
if err == errMemberRemoved {
reportCriticalError(err, p.errorc)
}
return err
}
return nil
}
// waitSchedule waits other goroutines to be scheduled for a while
func waitSchedule() { time.Sleep(time.Millisecond) }

View File

@@ -0,0 +1,60 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"time"
"github.com/xiang90/probing"
)
var (
// proberInterval must be shorter than read timeout.
// Or the connection will time-out.
proberInterval = ConnReadTimeout - time.Second
statusMonitoringInterval = 30 * time.Second
)
func addPeerToProber(p probing.Prober, id string, us []string) {
hus := make([]string, len(us))
for i := range us {
hus[i] = us[i] + ProbingPrefix
}
p.AddHTTP(id, proberInterval, hus)
s, err := p.Status(id)
if err != nil {
plog.Errorf("failed to add peer %s into prober", id)
} else {
go monitorProbingStatus(s, id)
}
}
func monitorProbingStatus(s probing.Status, id string) {
for {
select {
case <-time.After(statusMonitoringInterval):
if !s.Health() {
plog.Warningf("the connection to peer %s is unhealthy", id)
}
if s.ClockDiff() > time.Second {
plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second)
}
case <-s.StopNotify():
return
}
}
}

51
vendor/github.com/coreos/etcd/rafthttp/remote.go generated vendored Normal file
View File

@@ -0,0 +1,51 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
)
type remote struct {
id types.ID
status *peerStatus
pipeline *pipeline
}
func startRemote(tr *Transport, urls types.URLs, local, to, cid types.ID, r Raft, errorc chan error) *remote {
picker := newURLPicker(urls)
status := newPeerStatus(to)
return &remote{
id: to,
status: status,
pipeline: newPipeline(tr, picker, local, to, cid, status, nil, r, errorc),
}
}
func (g *remote) send(m raftpb.Message) {
select {
case g.pipeline.msgc <- m:
default:
if g.status.isActive() {
plog.MergeWarningf("dropped internal raft message to %s since sending buffer is full (bad/overloaded network)", g.id)
}
plog.Debugf("dropped %s to %s since sending buffer is full", m.Type, g.id)
}
}
func (g *remote) stop() {
g.pipeline.stop()
}

View File

@@ -0,0 +1,156 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"bytes"
"io"
"io/ioutil"
"net/http"
"time"
"github.com/coreos/etcd/pkg/httputil"
pioutil "github.com/coreos/etcd/pkg/ioutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/snap"
)
var (
// timeout for reading snapshot response body
snapResponseReadTimeout = 5 * time.Second
)
type snapshotSender struct {
from, to types.ID
cid types.ID
tr *Transport
picker *urlPicker
status *peerStatus
r Raft
errorc chan error
stopc chan struct{}
}
func newSnapshotSender(tr *Transport, picker *urlPicker, from, to, cid types.ID, status *peerStatus, r Raft, errorc chan error) *snapshotSender {
return &snapshotSender{
from: from,
to: to,
cid: cid,
tr: tr,
picker: picker,
status: status,
r: r,
errorc: errorc,
stopc: make(chan struct{}),
}
}
func (s *snapshotSender) stop() { close(s.stopc) }
func (s *snapshotSender) send(merged snap.Message) {
m := merged.Message
start := time.Now()
body := createSnapBody(merged)
defer body.Close()
u := s.picker.pick()
req := createPostRequest(u, RaftSnapshotPrefix, body, "application/octet-stream", s.tr.URLs, s.from, s.cid)
plog.Infof("start to send database snapshot [index: %d, to %s]...", m.Snapshot.Metadata.Index, types.ID(m.To))
err := s.post(req)
defer merged.CloseWithError(err)
if err != nil {
plog.Warningf("database snapshot [index: %d, to: %s] failed to be sent out (%v)", m.Snapshot.Metadata.Index, types.ID(m.To), err)
// errMemberRemoved is a critical error since a removed member should
// always be stopped. So we use reportCriticalError to report it to errorc.
if err == errMemberRemoved {
reportCriticalError(err, s.errorc)
}
s.picker.unreachable(u)
reportSentFailure(sendSnap, m)
s.status.deactivate(failureType{source: sendSnap, action: "post"}, err.Error())
s.r.ReportUnreachable(m.To)
// report SnapshotFailure to raft state machine. After raft state
// machine knows about it, it would pause a while and retry sending
// new snapshot message.
s.r.ReportSnapshot(m.To, raft.SnapshotFailure)
return
}
reportSentDuration(sendSnap, m, time.Since(start))
s.status.activate()
s.r.ReportSnapshot(m.To, raft.SnapshotFinish)
plog.Infof("database snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To))
}
// post posts the given request.
// It returns nil when request is sent out and processed successfully.
func (s *snapshotSender) post(req *http.Request) (err error) {
cancel := httputil.RequestCanceler(s.tr.pipelineRt, req)
type responseAndError struct {
resp *http.Response
body []byte
err error
}
result := make(chan responseAndError, 1)
go func() {
resp, err := s.tr.pipelineRt.RoundTrip(req)
if err != nil {
result <- responseAndError{resp, nil, err}
return
}
// close the response body when timeouts.
// prevents from reading the body forever when the other side dies right after
// successfully receives the request body.
time.AfterFunc(snapResponseReadTimeout, func() { httputil.GracefulClose(resp) })
body, err := ioutil.ReadAll(resp.Body)
result <- responseAndError{resp, body, err}
}()
select {
case <-s.stopc:
cancel()
return errStopped
case r := <-result:
if r.err != nil {
return r.err
}
return checkPostResponse(r.resp, r.body, req, s.to)
}
}
func createSnapBody(merged snap.Message) io.ReadCloser {
buf := new(bytes.Buffer)
enc := &messageEncoder{w: buf}
// encode raft message
if err := enc.encode(merged.Message); err != nil {
plog.Panicf("encode message error (%v)", err)
}
return &pioutil.ReaderAndCloser{
Reader: io.MultiReader(buf, merged.ReadCloser),
Closer: merged.ReadCloser,
}
}

496
vendor/github.com/coreos/etcd/rafthttp/stream.go generated vendored Normal file
View File

@@ -0,0 +1,496 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"path"
"strings"
"sync"
"time"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/httputil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/version"
"github.com/coreos/go-semver/semver"
)
const (
streamTypeMessage streamType = "message"
streamTypeMsgAppV2 streamType = "msgappv2"
streamBufSize = 4096
)
var (
errUnsupportedStreamType = fmt.Errorf("unsupported stream type")
// the key is in string format "major.minor.patch"
supportedStream = map[string][]streamType{
"2.0.0": {},
"2.1.0": {streamTypeMsgAppV2, streamTypeMessage},
"2.2.0": {streamTypeMsgAppV2, streamTypeMessage},
"2.3.0": {streamTypeMsgAppV2, streamTypeMessage},
}
)
type streamType string
func (t streamType) endpoint() string {
switch t {
case streamTypeMsgAppV2:
return path.Join(RaftStreamPrefix, "msgapp")
case streamTypeMessage:
return path.Join(RaftStreamPrefix, "message")
default:
plog.Panicf("unhandled stream type %v", t)
return ""
}
}
func (t streamType) String() string {
switch t {
case streamTypeMsgAppV2:
return "stream MsgApp v2"
case streamTypeMessage:
return "stream Message"
default:
return "unknown stream"
}
}
var (
// linkHeartbeatMessage is a special message used as heartbeat message in
// link layer. It never conflicts with messages from raft because raft
// doesn't send out messages without From and To fields.
linkHeartbeatMessage = raftpb.Message{Type: raftpb.MsgHeartbeat}
)
func isLinkHeartbeatMessage(m raftpb.Message) bool {
return m.Type == raftpb.MsgHeartbeat && m.From == 0 && m.To == 0
}
type outgoingConn struct {
t streamType
io.Writer
http.Flusher
io.Closer
}
// streamWriter writes messages to the attached outgoingConn.
type streamWriter struct {
id types.ID
status *peerStatus
fs *stats.FollowerStats
r Raft
mu sync.Mutex // guard field working and closer
closer io.Closer
working bool
msgc chan raftpb.Message
connc chan *outgoingConn
stopc chan struct{}
done chan struct{}
}
// startStreamWriter creates a streamWrite and starts a long running go-routine that accepts
// messages and writes to the attached outgoing connection.
func startStreamWriter(id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter {
w := &streamWriter{
id: id,
status: status,
fs: fs,
r: r,
msgc: make(chan raftpb.Message, streamBufSize),
connc: make(chan *outgoingConn),
stopc: make(chan struct{}),
done: make(chan struct{}),
}
go w.run()
return w
}
func (cw *streamWriter) run() {
var (
msgc chan raftpb.Message
heartbeatc <-chan time.Time
t streamType
enc encoder
flusher http.Flusher
batched int
)
tickc := time.Tick(ConnReadTimeout / 3)
for {
select {
case <-heartbeatc:
start := time.Now()
err := enc.encode(linkHeartbeatMessage)
if err == nil {
flusher.Flush()
batched = 0
reportSentDuration(string(t), linkHeartbeatMessage, time.Since(start))
continue
}
reportSentFailure(string(t), linkHeartbeatMessage)
cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error())
cw.close()
heartbeatc, msgc = nil, nil
case m := <-msgc:
start := time.Now()
err := enc.encode(m)
if err == nil {
if len(msgc) == 0 || batched > streamBufSize/2 {
flusher.Flush()
batched = 0
} else {
batched++
}
reportSentDuration(string(t), m, time.Since(start))
continue
}
reportSentFailure(string(t), m)
cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error())
cw.close()
heartbeatc, msgc = nil, nil
cw.r.ReportUnreachable(m.To)
case conn := <-cw.connc:
cw.close()
t = conn.t
switch conn.t {
case streamTypeMsgAppV2:
enc = newMsgAppV2Encoder(conn.Writer, cw.fs)
case streamTypeMessage:
enc = &messageEncoder{w: conn.Writer}
default:
plog.Panicf("unhandled stream type %s", conn.t)
}
flusher = conn.Flusher
cw.mu.Lock()
cw.status.activate()
cw.closer = conn.Closer
cw.working = true
cw.mu.Unlock()
heartbeatc, msgc = tickc, cw.msgc
case <-cw.stopc:
cw.close()
close(cw.done)
return
}
}
}
func (cw *streamWriter) writec() (chan<- raftpb.Message, bool) {
cw.mu.Lock()
defer cw.mu.Unlock()
return cw.msgc, cw.working
}
func (cw *streamWriter) close() {
cw.mu.Lock()
defer cw.mu.Unlock()
if !cw.working {
return
}
cw.closer.Close()
if len(cw.msgc) > 0 {
cw.r.ReportUnreachable(uint64(cw.id))
}
cw.msgc = make(chan raftpb.Message, streamBufSize)
cw.working = false
}
func (cw *streamWriter) attach(conn *outgoingConn) bool {
select {
case cw.connc <- conn:
return true
case <-cw.done:
return false
}
}
func (cw *streamWriter) stop() {
close(cw.stopc)
<-cw.done
}
// streamReader is a long-running go-routine that dials to the remote stream
// endpoint and reads messages from the response body returned.
type streamReader struct {
tr *Transport
picker *urlPicker
t streamType
local, remote types.ID
cid types.ID
status *peerStatus
recvc chan<- raftpb.Message
propc chan<- raftpb.Message
errorc chan<- error
mu sync.Mutex
paused bool
cancel func()
closer io.Closer
stopc chan struct{}
done chan struct{}
}
func startStreamReader(tr *Transport, picker *urlPicker, t streamType, local, remote, cid types.ID, status *peerStatus, recvc chan<- raftpb.Message, propc chan<- raftpb.Message, errorc chan<- error) *streamReader {
r := &streamReader{
tr: tr,
picker: picker,
t: t,
local: local,
remote: remote,
cid: cid,
status: status,
recvc: recvc,
propc: propc,
errorc: errorc,
stopc: make(chan struct{}),
done: make(chan struct{}),
}
go r.run()
return r
}
func (cr *streamReader) run() {
for {
t := cr.t
rc, err := cr.dial(t)
if err != nil {
if err != errUnsupportedStreamType {
cr.status.deactivate(failureType{source: t.String(), action: "dial"}, err.Error())
}
} else {
cr.status.activate()
err := cr.decodeLoop(rc, t)
switch {
// all data is read out
case err == io.EOF:
// connection is closed by the remote
case isClosedConnectionError(err):
default:
cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error())
}
}
select {
// Wait 100ms to create a new stream, so it doesn't bring too much
// overhead when retry.
case <-time.After(100 * time.Millisecond):
case <-cr.stopc:
close(cr.done)
return
}
}
}
func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
var dec decoder
cr.mu.Lock()
switch t {
case streamTypeMsgAppV2:
dec = newMsgAppV2Decoder(rc, cr.local, cr.remote)
case streamTypeMessage:
dec = &messageDecoder{r: rc}
default:
plog.Panicf("unhandled stream type %s", t)
}
cr.closer = rc
cr.mu.Unlock()
for {
m, err := dec.decode()
if err != nil {
cr.mu.Lock()
cr.close()
cr.mu.Unlock()
return err
}
cr.mu.Lock()
paused := cr.paused
cr.mu.Unlock()
if paused {
continue
}
if isLinkHeartbeatMessage(m) {
// raft is not interested in link layer
// heartbeat message, so we should ignore
// it.
continue
}
recvc := cr.recvc
if m.Type == raftpb.MsgProp {
recvc = cr.propc
}
select {
case recvc <- m:
default:
if cr.status.isActive() {
plog.MergeWarningf("dropped internal raft message from %s since receiving buffer is full (overloaded network)", types.ID(m.From))
}
plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
}
}
}
func (cr *streamReader) stop() {
close(cr.stopc)
cr.mu.Lock()
if cr.cancel != nil {
cr.cancel()
}
cr.close()
cr.mu.Unlock()
<-cr.done
}
func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
u := cr.picker.pick()
uu := u
uu.Path = path.Join(t.endpoint(), cr.local.String())
req, err := http.NewRequest("GET", uu.String(), nil)
if err != nil {
cr.picker.unreachable(u)
return nil, fmt.Errorf("failed to make http request to %s (%v)", u, err)
}
req.Header.Set("X-Server-From", cr.local.String())
req.Header.Set("X-Server-Version", version.Version)
req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
req.Header.Set("X-Etcd-Cluster-ID", cr.cid.String())
req.Header.Set("X-Raft-To", cr.remote.String())
setPeerURLsHeader(req, cr.tr.URLs)
cr.mu.Lock()
select {
case <-cr.stopc:
cr.mu.Unlock()
return nil, fmt.Errorf("stream reader is stopped")
default:
}
cr.cancel = httputil.RequestCanceler(cr.tr.streamRt, req)
cr.mu.Unlock()
resp, err := cr.tr.streamRt.RoundTrip(req)
if err != nil {
cr.picker.unreachable(u)
return nil, err
}
rv := serverVersion(resp.Header)
lv := semver.Must(semver.NewVersion(version.Version))
if compareMajorMinorVersion(rv, lv) == -1 && !checkStreamSupport(rv, t) {
httputil.GracefulClose(resp)
cr.picker.unreachable(u)
return nil, errUnsupportedStreamType
}
switch resp.StatusCode {
case http.StatusGone:
httputil.GracefulClose(resp)
cr.picker.unreachable(u)
err := fmt.Errorf("the member has been permanently removed from the cluster")
select {
case cr.errorc <- err:
default:
}
return nil, err
case http.StatusOK:
return resp.Body, nil
case http.StatusNotFound:
httputil.GracefulClose(resp)
cr.picker.unreachable(u)
return nil, fmt.Errorf("remote member %s could not recognize local member", cr.remote)
case http.StatusPreconditionFailed:
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
cr.picker.unreachable(u)
return nil, err
}
httputil.GracefulClose(resp)
cr.picker.unreachable(u)
switch strings.TrimSuffix(string(b), "\n") {
case errIncompatibleVersion.Error():
plog.Errorf("request sent was ignored by peer %s (server version incompatible)", cr.remote)
return nil, errIncompatibleVersion
case errClusterIDMismatch.Error():
plog.Errorf("request sent was ignored (cluster ID mismatch: remote[%s]=%s, local=%s)",
cr.remote, resp.Header.Get("X-Etcd-Cluster-ID"), cr.cid)
return nil, errClusterIDMismatch
default:
return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b))
}
default:
httputil.GracefulClose(resp)
cr.picker.unreachable(u)
return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode)
}
}
func (cr *streamReader) close() {
if cr.closer != nil {
cr.closer.Close()
}
cr.closer = nil
}
func (cr *streamReader) pause() {
cr.mu.Lock()
defer cr.mu.Unlock()
cr.paused = true
}
func (cr *streamReader) resume() {
cr.mu.Lock()
defer cr.mu.Unlock()
cr.paused = false
}
func isClosedConnectionError(err error) bool {
operr, ok := err.(*net.OpError)
return ok && operr.Err.Error() == "use of closed network connection"
}
// checkStreamSupport checks whether the stream type is supported in the
// given version.
func checkStreamSupport(v *semver.Version, t streamType) bool {
nv := &semver.Version{Major: v.Major, Minor: v.Minor}
for _, s := range supportedStream[nv.String()] {
if s == t {
return true
}
}
return false
}

356
vendor/github.com/coreos/etcd/rafthttp/transport.go generated vendored Normal file
View File

@@ -0,0 +1,356 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"net/http"
"sync"
"time"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/logutil"
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/pkg/capnslog"
"github.com/xiang90/probing"
"golang.org/x/net/context"
)
var plog = logutil.NewMergeLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "rafthttp"))
type Raft interface {
Process(ctx context.Context, m raftpb.Message) error
IsIDRemoved(id uint64) bool
ReportUnreachable(id uint64)
ReportSnapshot(id uint64, status raft.SnapshotStatus)
}
type Transporter interface {
// Start starts the given Transporter.
// Start MUST be called before calling other functions in the interface.
Start() error
// Handler returns the HTTP handler of the transporter.
// A transporter HTTP handler handles the HTTP requests
// from remote peers.
// The handler MUST be used to handle RaftPrefix(/raft)
// endpoint.
Handler() http.Handler
// Send sends out the given messages to the remote peers.
// Each message has a To field, which is an id that maps
// to an existing peer in the transport.
// If the id cannot be found in the transport, the message
// will be ignored.
Send(m []raftpb.Message)
// SendSnapshot sends out the given snapshot message to a remote peer.
// The behavior of SendSnapshot is similar to Send.
SendSnapshot(m snap.Message)
// AddRemote adds a remote with given peer urls into the transport.
// A remote helps newly joined member to catch up the progress of cluster,
// and will not be used after that.
// It is the caller's responsibility to ensure the urls are all valid,
// or it panics.
AddRemote(id types.ID, urls []string)
// AddPeer adds a peer with given peer urls into the transport.
// It is the caller's responsibility to ensure the urls are all valid,
// or it panics.
// Peer urls are used to connect to the remote peer.
AddPeer(id types.ID, urls []string)
// RemovePeer removes the peer with given id.
RemovePeer(id types.ID)
// RemoveAllPeers removes all the existing peers in the transport.
RemoveAllPeers()
// UpdatePeer updates the peer urls of the peer with the given id.
// It is the caller's responsibility to ensure the urls are all valid,
// or it panics.
UpdatePeer(id types.ID, urls []string)
// ActiveSince returns the time that the connection with the peer
// of the given id becomes active.
// If the connection is active since peer was added, it returns the adding time.
// If the connection is currently inactive, it returns zero time.
ActiveSince(id types.ID) time.Time
// Stop closes the connections and stops the transporter.
Stop()
}
// Transport implements Transporter interface. It provides the functionality
// to send raft messages to peers, and receive raft messages from peers.
// User should call Handler method to get a handler to serve requests
// received from peerURLs.
// User needs to call Start before calling other functions, and call
// Stop when the Transport is no longer used.
type Transport struct {
DialTimeout time.Duration // maximum duration before timing out dial of the request
TLSInfo transport.TLSInfo // TLS information used when creating connection
ID types.ID // local member ID
URLs types.URLs // local peer URLs
ClusterID types.ID // raft cluster ID for request validation
Raft Raft // raft state machine, to which the Transport forwards received messages and reports status
Snapshotter *snap.Snapshotter
ServerStats *stats.ServerStats // used to record general transportation statistics
// used to record transportation statistics with followers when
// performing as leader in raft protocol
LeaderStats *stats.LeaderStats
// ErrorC is used to report detected critical errors, e.g.,
// the member has been permanently removed from the cluster
// When an error is received from ErrorC, user should stop raft state
// machine and thus stop the Transport.
ErrorC chan error
streamRt http.RoundTripper // roundTripper used by streams
pipelineRt http.RoundTripper // roundTripper used by pipelines
mu sync.RWMutex // protect the remote and peer map
remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
peers map[types.ID]Peer // peers map
prober probing.Prober
}
func (t *Transport) Start() error {
var err error
t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout)
if err != nil {
return err
}
t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout)
if err != nil {
return err
}
t.remotes = make(map[types.ID]*remote)
t.peers = make(map[types.ID]Peer)
t.prober = probing.NewProber(t.pipelineRt)
return nil
}
func (t *Transport) Handler() http.Handler {
pipelineHandler := newPipelineHandler(t, t.Raft, t.ClusterID)
streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID)
snapHandler := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID)
mux := http.NewServeMux()
mux.Handle(RaftPrefix, pipelineHandler)
mux.Handle(RaftStreamPrefix+"/", streamHandler)
mux.Handle(RaftSnapshotPrefix, snapHandler)
mux.Handle(ProbingPrefix, probing.NewHandler())
return mux
}
func (t *Transport) Get(id types.ID) Peer {
t.mu.RLock()
defer t.mu.RUnlock()
return t.peers[id]
}
func (t *Transport) Send(msgs []raftpb.Message) {
for _, m := range msgs {
if m.To == 0 {
// ignore intentionally dropped message
continue
}
to := types.ID(m.To)
t.mu.RLock()
p, pok := t.peers[to]
g, rok := t.remotes[to]
t.mu.RUnlock()
if pok {
if m.Type == raftpb.MsgApp {
t.ServerStats.SendAppendReq(m.Size())
}
p.send(m)
continue
}
if rok {
g.send(m)
continue
}
plog.Debugf("ignored message %s (sent to unknown peer %s)", m.Type, to)
}
}
func (t *Transport) Stop() {
t.mu.Lock()
defer t.mu.Unlock()
for _, r := range t.remotes {
r.stop()
}
for _, p := range t.peers {
p.stop()
}
t.prober.RemoveAll()
if tr, ok := t.streamRt.(*http.Transport); ok {
tr.CloseIdleConnections()
}
if tr, ok := t.pipelineRt.(*http.Transport); ok {
tr.CloseIdleConnections()
}
}
func (t *Transport) AddRemote(id types.ID, us []string) {
t.mu.Lock()
defer t.mu.Unlock()
if _, ok := t.peers[id]; ok {
return
}
if _, ok := t.remotes[id]; ok {
return
}
urls, err := types.NewURLs(us)
if err != nil {
plog.Panicf("newURLs %+v should never fail: %+v", us, err)
}
t.remotes[id] = startRemote(t, urls, t.ID, id, t.ClusterID, t.Raft, t.ErrorC)
}
func (t *Transport) AddPeer(id types.ID, us []string) {
t.mu.Lock()
defer t.mu.Unlock()
if _, ok := t.peers[id]; ok {
return
}
urls, err := types.NewURLs(us)
if err != nil {
plog.Panicf("newURLs %+v should never fail: %+v", us, err)
}
fs := t.LeaderStats.Follower(id.String())
t.peers[id] = startPeer(t, urls, t.ID, id, t.ClusterID, t.Raft, fs, t.ErrorC)
addPeerToProber(t.prober, id.String(), us)
}
func (t *Transport) RemovePeer(id types.ID) {
t.mu.Lock()
defer t.mu.Unlock()
t.removePeer(id)
}
func (t *Transport) RemoveAllPeers() {
t.mu.Lock()
defer t.mu.Unlock()
for id := range t.peers {
t.removePeer(id)
}
}
// the caller of this function must have the peers mutex.
func (t *Transport) removePeer(id types.ID) {
if peer, ok := t.peers[id]; ok {
peer.stop()
} else {
plog.Panicf("unexpected removal of unknown peer '%d'", id)
}
delete(t.peers, id)
delete(t.LeaderStats.Followers, id.String())
t.prober.Remove(id.String())
}
func (t *Transport) UpdatePeer(id types.ID, us []string) {
t.mu.Lock()
defer t.mu.Unlock()
// TODO: return error or just panic?
if _, ok := t.peers[id]; !ok {
return
}
urls, err := types.NewURLs(us)
if err != nil {
plog.Panicf("newURLs %+v should never fail: %+v", us, err)
}
t.peers[id].update(urls)
t.prober.Remove(id.String())
addPeerToProber(t.prober, id.String(), us)
}
func (t *Transport) ActiveSince(id types.ID) time.Time {
t.mu.Lock()
defer t.mu.Unlock()
if p, ok := t.peers[id]; ok {
return p.activeSince()
}
return time.Time{}
}
func (t *Transport) SendSnapshot(m snap.Message) {
t.mu.Lock()
defer t.mu.Unlock()
p := t.peers[types.ID(m.To)]
if p == nil {
m.CloseWithError(errMemberNotFound)
return
}
p.sendSnap(m)
}
// Pausable is a testing interface for pausing transport traffic.
type Pausable interface {
Pause()
Resume()
}
func (t *Transport) Pause() {
for _, p := range t.peers {
p.(Pausable).Pause()
}
}
func (t *Transport) Resume() {
for _, p := range t.peers {
p.(Pausable).Resume()
}
}
type nopTransporter struct{}
func NewNopTransporter() Transporter {
return &nopTransporter{}
}
func (s *nopTransporter) Start() error { return nil }
func (s *nopTransporter) Handler() http.Handler { return nil }
func (s *nopTransporter) Send(m []raftpb.Message) {}
func (s *nopTransporter) SendSnapshot(m snap.Message) {}
func (s *nopTransporter) AddRemote(id types.ID, us []string) {}
func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
func (s *nopTransporter) RemovePeer(id types.ID) {}
func (s *nopTransporter) RemoveAllPeers() {}
func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time{} }
func (s *nopTransporter) Stop() {}
func (s *nopTransporter) Pause() {}
func (s *nopTransporter) Resume() {}
type snapTransporter struct {
nopTransporter
snapDoneC chan snap.Message
snapDir string
}
func NewSnapTransporter(snapDir string) (Transporter, <-chan snap.Message) {
ch := make(chan snap.Message, 1)
tr := &snapTransporter{snapDoneC: ch, snapDir: snapDir}
return tr, ch
}
func (s *snapTransporter) SendSnapshot(m snap.Message) {
ss := snap.New(s.snapDir)
ss.SaveDBFrom(m.ReadCloser, m.Snapshot.Metadata.Index+1)
m.CloseWithError(nil)
s.snapDoneC <- m
}

57
vendor/github.com/coreos/etcd/rafthttp/urlpick.go generated vendored Normal file
View File

@@ -0,0 +1,57 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"net/url"
"sync"
"github.com/coreos/etcd/pkg/types"
)
type urlPicker struct {
mu sync.Mutex // guards urls and picked
urls types.URLs
picked int
}
func newURLPicker(urls types.URLs) *urlPicker {
return &urlPicker{
urls: urls,
}
}
func (p *urlPicker) update(urls types.URLs) {
p.mu.Lock()
defer p.mu.Unlock()
p.urls = urls
p.picked = 0
}
func (p *urlPicker) pick() url.URL {
p.mu.Lock()
defer p.mu.Unlock()
return p.urls[p.picked]
}
// unreachable notices the picker that the given url is unreachable,
// and it should use other possible urls.
func (p *urlPicker) unreachable(u url.URL) {
p.mu.Lock()
defer p.mu.Unlock()
if u == p.urls[p.picked] {
p.picked = (p.picked + 1) % len(p.urls)
}
}

205
vendor/github.com/coreos/etcd/rafthttp/util.go generated vendored Normal file
View File

@@ -0,0 +1,205 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"crypto/tls"
"encoding/binary"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strings"
"time"
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/version"
"github.com/coreos/go-semver/semver"
)
var (
errMemberRemoved = fmt.Errorf("the member has been permanently removed from the cluster")
errMemberNotFound = fmt.Errorf("member not found")
)
// NewListener returns a listener for raft message transfer between peers.
// It uses timeout listener to identify broken streams promptly.
func NewListener(u url.URL, tlscfg *tls.Config) (net.Listener, error) {
return transport.NewTimeoutListener(u.Host, u.Scheme, tlscfg, ConnReadTimeout, ConnWriteTimeout)
}
// NewRoundTripper returns a roundTripper used to send requests
// to rafthttp listener of remote peers.
func NewRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) {
// It uses timeout transport to pair with remote timeout listeners.
// It sets no read/write timeout, because message in requests may
// take long time to write out before reading out the response.
return transport.NewTimeoutTransport(tlsInfo, dialTimeout, 0, 0)
}
// newStreamRoundTripper returns a roundTripper used to send stream requests
// to rafthttp listener of remote peers.
// Read/write timeout is set for stream roundTripper to promptly
// find out broken status, which minimizes the number of messages
// sent on broken connection.
func newStreamRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) {
return transport.NewTimeoutTransport(tlsInfo, dialTimeout, ConnReadTimeout, ConnWriteTimeout)
}
func writeEntryTo(w io.Writer, ent *raftpb.Entry) error {
size := ent.Size()
if err := binary.Write(w, binary.BigEndian, uint64(size)); err != nil {
return err
}
b, err := ent.Marshal()
if err != nil {
return err
}
_, err = w.Write(b)
return err
}
func readEntryFrom(r io.Reader, ent *raftpb.Entry) error {
var l uint64
if err := binary.Read(r, binary.BigEndian, &l); err != nil {
return err
}
buf := make([]byte, int(l))
if _, err := io.ReadFull(r, buf); err != nil {
return err
}
return ent.Unmarshal(buf)
}
// createPostRequest creates a HTTP POST request that sends raft message.
func createPostRequest(u url.URL, path string, body io.Reader, ct string, urls types.URLs, from, cid types.ID) *http.Request {
uu := u
uu.Path = path
req, err := http.NewRequest("POST", uu.String(), body)
if err != nil {
plog.Panicf("unexpected new request error (%v)", err)
}
req.Header.Set("Content-Type", ct)
req.Header.Set("X-Server-From", from.String())
req.Header.Set("X-Server-Version", version.Version)
req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
req.Header.Set("X-Etcd-Cluster-ID", cid.String())
setPeerURLsHeader(req, urls)
return req
}
// checkPostResponse checks the response of the HTTP POST request that sends
// raft message.
func checkPostResponse(resp *http.Response, body []byte, req *http.Request, to types.ID) error {
switch resp.StatusCode {
case http.StatusPreconditionFailed:
switch strings.TrimSuffix(string(body), "\n") {
case errIncompatibleVersion.Error():
plog.Errorf("request sent was ignored by peer %s (server version incompatible)", to)
return errIncompatibleVersion
case errClusterIDMismatch.Error():
plog.Errorf("request sent was ignored (cluster ID mismatch: remote[%s]=%s, local=%s)",
to, resp.Header.Get("X-Etcd-Cluster-ID"), req.Header.Get("X-Etcd-Cluster-ID"))
return errClusterIDMismatch
default:
return fmt.Errorf("unhandled error %q when precondition failed", string(body))
}
case http.StatusForbidden:
return errMemberRemoved
case http.StatusNoContent:
return nil
default:
return fmt.Errorf("unexpected http status %s while posting to %q", http.StatusText(resp.StatusCode), req.URL.String())
}
}
// reportCriticalError reports the given error through sending it into
// the given error channel.
// If the error channel is filled up when sending error, it drops the error
// because the fact that error has happened is reported, which is
// good enough.
func reportCriticalError(err error, errc chan<- error) {
select {
case errc <- err:
default:
}
}
// compareMajorMinorVersion returns an integer comparing two versions based on
// their major and minor version. The result will be 0 if a==b, -1 if a < b,
// and 1 if a > b.
func compareMajorMinorVersion(a, b *semver.Version) int {
na := &semver.Version{Major: a.Major, Minor: a.Minor}
nb := &semver.Version{Major: b.Major, Minor: b.Minor}
switch {
case na.LessThan(*nb):
return -1
case nb.LessThan(*na):
return 1
default:
return 0
}
}
// serverVersion returns the server version from the given header.
func serverVersion(h http.Header) *semver.Version {
verStr := h.Get("X-Server-Version")
// backward compatibility with etcd 2.0
if verStr == "" {
verStr = "2.0.0"
}
return semver.Must(semver.NewVersion(verStr))
}
// serverVersion returns the min cluster version from the given header.
func minClusterVersion(h http.Header) *semver.Version {
verStr := h.Get("X-Min-Cluster-Version")
// backward compatibility with etcd 2.0
if verStr == "" {
verStr = "2.0.0"
}
return semver.Must(semver.NewVersion(verStr))
}
// checkVersionCompability checks whether the given version is compatible
// with the local version.
func checkVersionCompability(name string, server, minCluster *semver.Version) error {
localServer := semver.Must(semver.NewVersion(version.Version))
localMinCluster := semver.Must(semver.NewVersion(version.MinClusterVersion))
if compareMajorMinorVersion(server, localMinCluster) == -1 {
return fmt.Errorf("remote version is too low: remote[%s]=%s, local=%s", name, server, localServer)
}
if compareMajorMinorVersion(minCluster, localServer) == 1 {
return fmt.Errorf("local version is too low: remote[%s]=%s, local=%s", name, server, localServer)
}
return nil
}
// setPeerURLsHeader reports local urls for peer discovery
func setPeerURLsHeader(req *http.Request, urls types.URLs) {
if urls == nil {
// often not set in unit tests
return
}
var peerURLs []string
for _, url := range urls {
peerURLs = append(peerURLs, url.String())
}
req.Header.Set("X-PeerURLs", strings.Join(peerURLs, ","))
}