Use latest etcd from release-3.3 branch for dropping ugorji

Pick up changes from:
https://github.com/etcd-io/etcd/pull/10675

Change-Id: Ic4d6daa3c54824d3d27809a125b798e88db0bf7e
Author: Davanum Srinivas
Date:   2019-04-22 20:40:57 -04:00
Parent: c3e8a434eb
Commit: 8824e0fcf7
68 changed files with 318 additions and 55083 deletions


@@ -982,10 +982,23 @@ func (as *authStore) AuthInfoFromTLS(ctx context.Context) *AuthInfo {
cn := chain.Subject.CommonName
plog.Debugf("found common name %s", cn)
return &AuthInfo{
ai := &AuthInfo{
Username: cn,
Revision: as.Revision(),
}
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil
}
// gRPC-gateway proxy request to etcd server includes Grpcgateway-Accept
// header. The proxy uses etcd client server certificate. If the certificate
// has a CommonName we should never use this for authentication.
if gw := md["grpcgateway-accept"]; len(gw) > 0 {
plog.Warningf("ignoring common name in gRPC-gateway proxy request %s", ai.Username)
return nil
}
return ai
}
}
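
The hunk above makes AuthInfoFromTLS ignore the client certificate's CommonName when the request was forwarded by the gRPC-gateway proxy, which it detects through the grpcgateway-accept metadata key. A minimal standalone sketch of that detection, assuming only google.golang.org/grpc/metadata (the helper name and sample values are hypothetical, not etcd code):

package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/metadata"
)

// cameFromGRPCGateway reports whether the incoming context carries the
// header the gRPC-gateway proxy adds to requests it forwards to etcd.
func cameFromGRPCGateway(ctx context.Context) bool {
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return false
	}
	// gRPC normalizes metadata keys to lower case.
	return len(md["grpcgateway-accept"]) > 0
}

func main() {
	// Simulate an incoming context the way the gateway would produce it.
	gwCtx := metadata.NewIncomingContext(context.Background(),
		metadata.Pairs("grpcgateway-accept", "application/json"))
	fmt.Println(cameFromGRPCGateway(gwCtx))                // true
	fmt.Println(cameFromGRPCGateway(context.Background())) // false
}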


@@ -11,7 +11,7 @@ go_library(
"curl.go",
"discover.go",
"doc.go",
"keys.generated.go",
"json.go",
"keys.go",
"members.go",
"util.go",
@@ -24,7 +24,8 @@ go_library(
"//vendor/github.com/coreos/etcd/pkg/srv:go_default_library",
"//vendor/github.com/coreos/etcd/pkg/types:go_default_library",
"//vendor/github.com/coreos/etcd/version:go_default_library",
"//vendor/github.com/ugorji/go/codec:go_default_library",
"//vendor/github.com/json-iterator/go:go_default_library",
"//vendor/github.com/modern-go/reflect2:go_default_library",
],
)

vendor/github.com/coreos/etcd/client/json.go (new file, generated, vendored, 72 lines)

@@ -0,0 +1,72 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import (
"github.com/json-iterator/go"
"github.com/modern-go/reflect2"
"strconv"
"unsafe"
)
type customNumberExtension struct {
jsoniter.DummyExtension
}
func (cne *customNumberExtension) CreateDecoder(typ reflect2.Type) jsoniter.ValDecoder {
if typ.String() == "interface {}" {
return customNumberDecoder{}
}
return nil
}
type customNumberDecoder struct {
}
func (customNumberDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
switch iter.WhatIsNext() {
case jsoniter.NumberValue:
var number jsoniter.Number
iter.ReadVal(&number)
i64, err := strconv.ParseInt(string(number), 10, 64)
if err == nil {
*(*interface{})(ptr) = i64
return
}
f64, err := strconv.ParseFloat(string(number), 64)
if err == nil {
*(*interface{})(ptr) = f64
return
}
iter.ReportError("DecodeNumber", err.Error())
default:
*(*interface{})(ptr) = iter.Read()
}
}
// caseSensitiveJsonIterator returns a jsoniterator API that's configured to be
// case-sensitive when unmarshalling, and otherwise compatible with
// the encoding/json standard library.
func caseSensitiveJsonIterator() jsoniter.API {
config := jsoniter.Config{
EscapeHTML: true,
SortMapKeys: true,
ValidateJsonRawMessage: true,
CaseSensitive: true,
}.Froze()
// Force jsoniter to decode number to interface{} via int64/float64, if possible.
config.RegisterExtension(&customNumberExtension{})
return config
}
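
The customNumberExtension exists because jsoniter, like encoding/json, otherwise decodes every JSON number into float64 when the destination is interface{}, which can lose precision on large etcd indexes. A small sketch of that default behaviour, using the same Config fields as caseSensitiveJsonIterator but without the extension registered:

package main

import (
	"fmt"

	jsoniter "github.com/json-iterator/go"
)

func main() {
	// Same settings as caseSensitiveJsonIterator above, minus the custom
	// number extension, to show the behaviour the extension corrects.
	api := jsoniter.Config{
		EscapeHTML:             true,
		SortMapKeys:            true,
		ValidateJsonRawMessage: true,
		CaseSensitive:          true,
	}.Froze()

	var v map[string]interface{}
	if err := api.Unmarshal([]byte(`{"createdIndex": 7, "ttl": 1.5}`), &v); err != nil {
		fmt.Println("decode error:", err)
		return
	}

	// Without the extension both numbers come back as float64; with it
	// registered, 7 would decode as int64 and 1.5 as float64.
	fmt.Printf("%T %T\n", v["createdIndex"], v["ttl"])
}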

File diff suppressed because it is too large.


@@ -14,8 +14,6 @@
package client
//go:generate codecgen -d 1819 -r "Node|Response|Nodes" -o keys.generated.go keys.go
import (
"context"
"encoding/json"
@@ -28,7 +26,6 @@ import (
"time"
"github.com/coreos/etcd/pkg/pathutil"
"github.com/ugorji/go/codec"
)
const (
@@ -656,9 +653,11 @@ func unmarshalHTTPResponse(code int, header http.Header, body []byte) (res *Resp
return res, err
}
var jsonIterator = caseSensitiveJsonIterator()
func unmarshalSuccessfulKeysResponse(header http.Header, body []byte) (*Response, error) {
var res Response
err := codec.NewDecoderBytes(body, new(codec.JsonHandle)).Decode(&res)
err := jsonIterator.Unmarshal(body, &res)
if err != nil {
return nil, ErrInvalidJSON
}
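
With the codec dropped, the decode path is a plain Unmarshal on a frozen jsoniter API. A rough sketch of the same call shape, with trimmed-down stand-ins for the Response and Node types (the struct fields here are illustrative, not the full client types):

package main

import (
	"fmt"

	jsoniter "github.com/json-iterator/go"
)

// Trimmed-down stand-ins for client.Response and client.Node.
type node struct {
	Key           string `json:"key"`
	Value         string `json:"value"`
	CreatedIndex  uint64 `json:"createdIndex"`
	ModifiedIndex uint64 `json:"modifiedIndex"`
}

type response struct {
	Action string `json:"action"`
	Node   *node  `json:"node"`
}

func main() {
	body := []byte(`{"action":"get","node":{"key":"/foo","value":"bar","createdIndex":9,"modifiedIndex":9}}`)

	// Same shape of call as unmarshalSuccessfulKeysResponse above, here
	// against a locally frozen case-sensitive API rather than the
	// package-level jsonIterator defined in json.go.
	api := jsoniter.Config{CaseSensitive: true}.Froze()

	var res response
	if err := api.Unmarshal(body, &res); err != nil {
		fmt.Println("invalid JSON:", err) // keys.go maps this to ErrInvalidJSON
		return
	}
	fmt.Printf("%s %s=%s\n", res.Action, res.Node.Key, res.Node.Value)
}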


@@ -56,7 +56,7 @@ type Client struct {
cfg Config
creds *credentials.TransportCredentials
balancer *healthBalancer
mu *sync.Mutex
mu *sync.RWMutex
ctx context.Context
cancel context.CancelFunc
@@ -110,11 +110,13 @@ func (c *Client) Close() error {
func (c *Client) Ctx() context.Context { return c.ctx }
// Endpoints lists the registered endpoints for the client.
func (c *Client) Endpoints() (eps []string) {
func (c *Client) Endpoints() []string {
c.mu.RLock()
defer c.mu.RUnlock()
// copy the slice; protect original endpoints from being changed
eps = make([]string, len(c.cfg.Endpoints))
eps := make([]string, len(c.cfg.Endpoints))
copy(eps, c.cfg.Endpoints)
return
return eps
}
// SetEndpoints updates client's endpoints.
@@ -387,7 +389,7 @@ func newClient(cfg *Config) (*Client, error) {
creds: creds,
ctx: ctx,
cancel: cancel,
mu: new(sync.Mutex),
mu: new(sync.RWMutex),
callOpts: defaultCallOpts,
}
if cfg.Username != "" && cfg.Password != "" {
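
Moving from sync.Mutex to sync.RWMutex lets Endpoints take only a read lock while still returning a copy of the slice, so concurrent readers do not serialize and callers cannot mutate the client's internal endpoint list. A toy model of that pattern (types and values are illustrative only):

package main

import (
	"fmt"
	"sync"
)

// endpointList mirrors the locking pattern of (*Client).Endpoints above:
// readers share an RLock and receive a copy, writers take the full lock.
type endpointList struct {
	mu  sync.RWMutex
	eps []string
}

func (l *endpointList) Endpoints() []string {
	l.mu.RLock()
	defer l.mu.RUnlock()
	// Copy the slice so the caller cannot change the original endpoints.
	out := make([]string, len(l.eps))
	copy(out, l.eps)
	return out
}

func (l *endpointList) SetEndpoints(eps []string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.eps = eps
}

func main() {
	l := &endpointList{eps: []string{"http://127.0.0.1:2379"}}
	got := l.Endpoints()
	got[0] = "mutated"            // only the copy changes
	fmt.Println(l.Endpoints()[0]) // http://127.0.0.1:2379
}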


@@ -29,19 +29,19 @@ import (
)
const (
pathMetrics = "/metrics"
PathMetrics = "/metrics"
PathHealth = "/health"
)
// HandleMetricsHealth registers metrics and health handlers.
func HandleMetricsHealth(mux *http.ServeMux, srv etcdserver.ServerV2) {
mux.Handle(pathMetrics, promhttp.Handler())
mux.Handle(PathMetrics, promhttp.Handler())
mux.Handle(PathHealth, NewHealthHandler(func() Health { return checkHealth(srv) }))
}
// HandlePrometheus registers prometheus handler on '/metrics'.
func HandlePrometheus(mux *http.ServeMux) {
mux.Handle(pathMetrics, promhttp.Handler())
mux.Handle(PathMetrics, promhttp.Handler())
}
// NewHealthHandler handles '/health' requests.
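
Exporting PathMetrics (alongside the already exported PathHealth) lets other packages register handlers on the same path instead of hard-coding "/metrics"; the grpcproxy change further below depends on it. A hedged sketch of wiring a mux through that constant, assuming the usual promhttp handler (the listen address is hypothetical):

package main

import (
	"log"
	"net/http"

	"github.com/coreos/etcd/etcdserver/api/etcdhttp"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	mux := http.NewServeMux()
	// Register on the newly exported constant rather than a string literal.
	mux.Handle(etcdhttp.PathMetrics, promhttp.Handler())
	log.Fatal(http.ListenAndServe("127.0.0.1:2381", mux))
}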


@@ -344,7 +344,7 @@ func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
ok, exceed := r.td.Observe(ms[i].To)
if !ok {
// TODO: limit request rate.
plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed)
plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v, to %x)", r.heartbeat, exceed, ms[i].To)
plog.Warningf("server is likely overloaded")
heartbeatSendFailures.Inc()
}
@@ -403,7 +403,7 @@ func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id
},
)
if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
plog.Fatalf("create wal error: %v", err)
plog.Panicf("create wal error: %v", err)
}
peers := make([]raft.Peer, len(ids))
for i, id := range ids {
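
The wal.Create failure now panics instead of exiting. Assuming capnslog's usual semantics (Panicf logs and then panics, Fatalf logs and then exits the process), the practical difference is that a panic unwinds the stack, so deferred cleanup runs and callers may recover, while an exit skips every deferred function. A generic illustration, not the etcd code itself:

package main

import "fmt"

// createWALOrPanic stands in for the call site above: on failure it
// panics (roughly what plog.Panicf does after logging), rather than
// exiting the process the way plog.Fatalf would.
func createWALOrPanic(fail bool) {
	defer fmt.Println("deferred cleanup still runs")
	if fail {
		panic("create wal error")
	}
}

func main() {
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered:", r)
		}
	}()
	createWALOrPanic(true)
}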


@@ -21,7 +21,7 @@ import (
func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struct{}) bool {
totalStart := time.Now()
defer dbCompactionTotalDurations.Observe(float64(time.Since(totalStart) / time.Millisecond))
defer func() { dbCompactionTotalDurations.Observe(float64(time.Since(totalStart) / time.Millisecond)) }()
keyCompactions := 0
defer func() { dbCompactionKeysCounter.Add(float64(keyCompactions)) }()
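
This one is a genuine bug fix: arguments to a deferred call are evaluated when the defer statement runs, so the old line measured essentially zero elapsed time; wrapping the call in a closure postpones the time.Since evaluation until the surrounding function returns. A minimal demonstration:

package main

import (
	"fmt"
	"time"
)

func main() {
	start := time.Now()

	// Evaluated immediately: records ~0s regardless of how long main runs.
	defer fmt.Println("at defer time:", time.Since(start))

	// Evaluated at return time: records the real elapsed duration.
	// This is the form the compaction metric now uses.
	defer func() { fmt.Println("at return time:", time.Since(start)) }()

	time.Sleep(50 * time.Millisecond)
}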


@@ -99,9 +99,12 @@ func (c *cache) Add(req *pb.RangeRequest, resp *pb.RangeResponse) {
iv = c.cachedRanges.Find(ivl)
if iv == nil {
c.cachedRanges.Insert(ivl, []string{key})
val := map[string]struct{}{key: {}}
c.cachedRanges.Insert(ivl, val)
} else {
iv.Val = append(iv.Val.([]string), key)
val := iv.Val.(map[string]struct{})
val[key] = struct{}{}
iv.Val = val
}
}
@@ -141,8 +144,8 @@ func (c *cache) Invalidate(key, endkey []byte) {
ivs = c.cachedRanges.Stab(ivl)
for _, iv := range ivs {
keys := iv.Val.([]string)
for _, key := range keys {
keys := iv.Val.(map[string]struct{})
for key := range keys {
c.lru.Remove(key)
}
}
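
Switching the interval value from []string to map[string]struct{} makes Add idempotent for repeated keys and lets Invalidate iterate a set instead of a slice that may hold duplicates. A toy illustration of the data-structure change, not the proxy cache itself:

package main

import "fmt"

func main() {
	// With a slice, caching the same key under the same interval
	// repeatedly keeps appending duplicates.
	slice := []string{}
	for i := 0; i < 3; i++ {
		slice = append(slice, "/foo")
	}

	// With a map[string]struct{} (the type the hunk switches to),
	// re-adding the same key is a no-op.
	set := map[string]struct{}{}
	for i := 0; i < 3; i++ {
		set["/foo"] = struct{}{}
	}

	fmt.Println(len(slice), len(set)) // 3 1
}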


@@ -14,7 +14,17 @@
package grpcproxy
import "github.com/prometheus/client_golang/prometheus"
import (
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"strings"
"time"
"github.com/coreos/etcd/etcdserver/api/etcdhttp"
"github.com/prometheus/client_golang/prometheus"
)
var (
watchersCoalescing = prometheus.NewGauge(prometheus.GaugeOpts{
@@ -56,3 +66,49 @@ func init() {
prometheus.MustRegister(cacheHits)
prometheus.MustRegister(cachedMisses)
}
// HandleMetrics performs a GET request against etcd endpoint and returns '/metrics'.
func HandleMetrics(mux *http.ServeMux, c *http.Client, eps []string) {
// random shuffle endpoints
r := rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
if len(eps) > 1 {
eps = shuffleEndpoints(r, eps)
}
pathMetrics := etcdhttp.PathMetrics
mux.HandleFunc(pathMetrics, func(w http.ResponseWriter, r *http.Request) {
target := fmt.Sprintf("%s%s", eps[0], pathMetrics)
if !strings.HasPrefix(target, "http") {
scheme := "http"
if r.TLS != nil {
scheme = "https"
}
target = fmt.Sprintf("%s://%s", scheme, target)
}
resp, err := c.Get(target)
if err != nil {
http.Error(w, "Internal server error", http.StatusInternalServerError)
}
defer resp.Body.Close()
w.Header().Set("Content-Type", "text/plain; version=0.0.4")
body, _ := ioutil.ReadAll(resp.Body)
fmt.Fprintf(w, "%s", body)
})
}
func shuffleEndpoints(r *rand.Rand, eps []string) []string {
// copied from Go 1.9<= rand.Rand.Perm
n := len(eps)
p := make([]int, n)
for i := 0; i < n; i++ {
j := r.Intn(i + 1)
p[i] = p[j]
p[j] = i
}
neps := make([]string, n)
for i, k := range p {
neps[i] = eps[k]
}
return neps
}
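
HandleMetrics proxies GET /metrics to one randomly chosen etcd endpoint; shuffleEndpoints hand-rolls the permutation, per its comment copied from the Go <= 1.9 rand.Rand.Perm implementation, presumably because etcd 3.3 still builds on Go releases that predate rand.Shuffle. On Go 1.10+ the same copy-then-shuffle intent is a one-liner, sketched below with hypothetical endpoints:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	eps := []string{"https://a:2379", "https://b:2379", "https://c:2379"}

	// Copy first so the caller's slice is left untouched, then shuffle.
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	shuffled := make([]string, len(eps))
	copy(shuffled, eps)
	r.Shuffle(len(shuffled), func(i, j int) {
		shuffled[i], shuffled[j] = shuffled[j], shuffled[i]
	})

	// shuffled[0] is the endpoint the /metrics proxy would query first.
	fmt.Println(shuffled[0])
}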


@@ -26,7 +26,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "3.0.0"
Version = "3.3.10"
Version = "3.3.13"
APIVersion = "unknown"
// Git SHA Value will be set during build


@@ -223,44 +223,16 @@ func OpenForRead(dirpath string, snap walpb.Snapshot) (*WAL, error) {
}
func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) {
names, err := readWalNames(dirpath)
names, nameIndex, err := selectWALFiles(dirpath, snap)
if err != nil {
return nil, err
}
nameIndex, ok := searchIndex(names, snap.Index)
if !ok || !isValidSeq(names[nameIndex:]) {
return nil, ErrFileNotFound
rs, ls, closer, err := openWALFiles(dirpath, names, nameIndex, write)
if err != nil {
return nil, err
}
// open the wal files
rcs := make([]io.ReadCloser, 0)
rs := make([]io.Reader, 0)
ls := make([]*fileutil.LockedFile, 0)
for _, name := range names[nameIndex:] {
p := filepath.Join(dirpath, name)
if write {
l, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode)
if err != nil {
closeAll(rcs...)
return nil, err
}
ls = append(ls, l)
rcs = append(rcs, l)
} else {
rf, err := os.OpenFile(p, os.O_RDONLY, fileutil.PrivateFileMode)
if err != nil {
closeAll(rcs...)
return nil, err
}
ls = append(ls, nil)
rcs = append(rcs, rf)
}
rs = append(rs, rcs[len(rcs)-1])
}
closer := func() error { return closeAll(rcs...) }
// create a WAL ready for reading
w := &WAL{
dir: dirpath,
@@ -284,6 +256,52 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error)
return w, nil
}
func selectWALFiles(dirpath string, snap walpb.Snapshot) ([]string, int, error) {
names, err := readWalNames(dirpath)
if err != nil {
return nil, -1, err
}
nameIndex, ok := searchIndex(names, snap.Index)
if !ok || !isValidSeq(names[nameIndex:]) {
err = ErrFileNotFound
return nil, -1, err
}
return names, nameIndex, nil
}
func openWALFiles(dirpath string, names []string, nameIndex int, write bool) ([]io.Reader, []*fileutil.LockedFile, func() error, error) {
rcs := make([]io.ReadCloser, 0)
rs := make([]io.Reader, 0)
ls := make([]*fileutil.LockedFile, 0)
for _, name := range names[nameIndex:] {
p := filepath.Join(dirpath, name)
if write {
l, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode)
if err != nil {
closeAll(rcs...)
return nil, nil, nil, err
}
ls = append(ls, l)
rcs = append(rcs, l)
} else {
rf, err := os.OpenFile(p, os.O_RDONLY, fileutil.PrivateFileMode)
if err != nil {
closeAll(rcs...)
return nil, nil, nil, err
}
ls = append(ls, nil)
rcs = append(rcs, rf)
}
rs = append(rs, rcs[len(rcs)-1])
}
closer := func() error { return closeAll(rcs...) }
return rs, ls, closer, nil
}
// ReadAll reads out records of the current WAL.
// If opened in write mode, it must read out all records until EOF. Or an error
// will be returned.
@@ -398,6 +416,85 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
return metadata, state, ents, err
}
// Verify reads through the given WAL and verifies that it is not corrupted.
// It creates a new decoder to read through the records of the given WAL.
// It does not conflict with any open WAL, but it is recommended not to
// call this function after opening the WAL for writing.
// If it cannot read out the expected snap, it will return ErrSnapshotNotFound.
// If the loaded snap doesn't match with the expected one, it will
// return error ErrSnapshotMismatch.
func Verify(walDir string, snap walpb.Snapshot) error {
var metadata []byte
var err error
var match bool
rec := &walpb.Record{}
names, nameIndex, err := selectWALFiles(walDir, snap)
if err != nil {
return err
}
// open wal files in read mode, so that there is no conflict
// when the same WAL is opened elsewhere in write mode
rs, _, closer, err := openWALFiles(walDir, names, nameIndex, false)
if err != nil {
return err
}
// create a new decoder from the readers on the WAL files
decoder := newDecoder(rs...)
for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
switch rec.Type {
case metadataType:
if metadata != nil && !bytes.Equal(metadata, rec.Data) {
return ErrMetadataConflict
}
metadata = rec.Data
case crcType:
crc := decoder.crc.Sum32()
// Current crc of decoder must match the crc of the record.
// We need not match 0 crc, since the decoder is a new one at this point.
if crc != 0 && rec.Validate(crc) != nil {
return ErrCRCMismatch
}
decoder.updateCRC(rec.Crc)
case snapshotType:
var loadedSnap walpb.Snapshot
pbutil.MustUnmarshal(&loadedSnap, rec.Data)
if loadedSnap.Index == snap.Index {
if loadedSnap.Term != snap.Term {
return ErrSnapshotMismatch
}
match = true
}
// We ignore all entry and state type records as these
// are not necessary for validating the WAL contents
case entryType:
case stateType:
default:
return fmt.Errorf("unexpected block type %d", rec.Type)
}
}
if closer != nil {
closer()
}
// We do not have to read out all the WAL entries
// as the decoder is opened in read mode.
if err != io.EOF && err != io.ErrUnexpectedEOF {
return err
}
if !match {
return ErrSnapshotNotFound
}
return nil
}
// cut closes current file written and creates a new one ready to append.
// cut first creates a temp wal file and writes necessary headers into it.
// Then cut atomically rename temp wal file to a wal file.
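
The WAL refactor extracts selectWALFiles and openWALFiles so the new Verify entry point can reuse the read-only open path without taking write locks. A hedged usage sketch of Verify as added above, against the vendored import path (the directory and snapshot values are hypothetical):

package main

import (
	"fmt"

	"github.com/coreos/etcd/wal"
	"github.com/coreos/etcd/wal/walpb"
)

func main() {
	walDir := "/var/lib/etcd/member/wal" // hypothetical data directory
	snap := walpb.Snapshot{Index: 10, Term: 2}

	// Verify opens the WAL files read-only (openWALFiles with write=false),
	// so it is safe to run before anything opens the WAL for writing.
	if err := wal.Verify(walDir, snap); err != nil {
		fmt.Println("wal verification failed:", err)
		return
	}
	fmt.Println("wal verified up to the given snapshot")
}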