Update etcd client to 3.3.9

Joe Betz
2018-10-01 16:53:57 -07:00
parent 5d0c19c261
commit 4263c75211
432 changed files with 44092 additions and 43584 deletions

View File

@@ -32,7 +32,6 @@ go_library(
"//vendor/github.com/coreos/pkg/capnslog:go_default_library",
"//vendor/github.com/google/btree:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/golang.org/x/net/context:go_default_library",
],
)

View File

@@ -147,8 +147,11 @@ func newBackend(bcfg BackendConfig) *backend {
batchInterval: bcfg.BatchInterval,
batchLimit: bcfg.BatchLimit,
readTx: &readTx{buf: txReadBuffer{
txBuffer: txBuffer{make(map[string]*bucketBuffer)}},
readTx: &readTx{
buf: txReadBuffer{
txBuffer: txBuffer{make(map[string]*bucketBuffer)},
},
buckets: make(map[string]*bolt.Bucket),
},
stopc: make(chan struct{}),
@@ -289,7 +292,7 @@ func (b *backend) Defrag() error {
func (b *backend) defrag() error {
now := time.Now()
// TODO: make this non-blocking?
// lock batchTx to ensure nobody is using previous tx, and then
// close previous ongoing tx.
@@ -345,7 +348,7 @@ func (b *backend) defrag() error {
plog.Fatalf("cannot begin tx (%s)", err)
}
b.readTx.buf.reset()
b.readTx.reset()
b.readTx.tx = b.unsafeBegin(false)
size := b.readTx.tx.Size()

View File

@@ -16,7 +16,6 @@ package backend
import (
"bytes"
"fmt"
"math"
"sync"
"sync/atomic"
@@ -81,36 +80,32 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo
// UnsafeRange must be called holding the lock on the tx.
func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
k, v, err := unsafeRange(t.tx, bucketName, key, endKey, limit)
if err != nil {
plog.Fatal(err)
bucket := t.tx.Bucket(bucketName)
if bucket == nil {
plog.Fatalf("bucket %s does not exist", bucketName)
}
return k, v
return unsafeRange(bucket.Cursor(), key, endKey, limit)
}
func unsafeRange(tx *bolt.Tx, bucketName, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte, err error) {
bucket := tx.Bucket(bucketName)
if bucket == nil {
return nil, nil, fmt.Errorf("bucket %s does not exist", bucketName)
}
if len(endKey) == 0 {
if v := bucket.Get(key); v != nil {
return append(keys, key), append(vs, v), nil
}
return nil, nil, nil
}
func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
if limit <= 0 {
limit = math.MaxInt64
}
c := bucket.Cursor()
for ck, cv := c.Seek(key); ck != nil && bytes.Compare(ck, endKey) < 0; ck, cv = c.Next() {
var isMatch func(b []byte) bool
if len(endKey) > 0 {
isMatch = func(b []byte) bool { return bytes.Compare(b, endKey) < 0 }
} else {
isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
limit = 1
}
for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
vs = append(vs, cv)
keys = append(keys, ck)
if limit == int64(len(keys)) {
break
}
}
return keys, vs, nil
return keys, vs
}
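
For orientation, the refactored unsafeRange now takes a bolt cursor and folds the old single-key lookup into the range loop: an empty endKey matches exactly one key (limit is forced to 1), otherwise keys in [key, endKey) are scanned. A minimal sketch of how a caller might exercise both modes, assuming illustrative bucket and key names that are not taken from this diff:

tx := b.BatchTx()
tx.Lock()
// point lookup: a nil endKey means "exactly this key"; limit is forced to 1 internally
k1, v1 := tx.UnsafeRange(keyBucketName, revBytes, nil, 0)
// range scan: keys in [start, end), at most 100 entries
k2, v2 := tx.UnsafeRange(keyBucketName, start, end, 100)
tx.Unlock()
_, _, _, _ = k1, v1, k2, v2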
// UnsafeDelete must be called holding the lock on the tx.
@@ -238,8 +233,7 @@ func (t *batchTxBuffered) unsafeCommit(stop bool) {
if err := t.backend.readTx.tx.Rollback(); err != nil {
plog.Fatalf("cannot rollback tx (%s)", err)
}
t.backend.readTx.buf.reset()
t.backend.readTx.tx = nil
t.backend.readTx.reset()
}
t.batchTx.commit(stop)

View File

@@ -27,7 +27,8 @@ import (
// (https://github.com/torvalds/linux/releases/tag/v2.6.23), mmap might
// silently ignore this flag. Please update your kernel to prevent this.
var boltOpenOptions = &bolt.Options{
MmapFlags: syscall.MAP_POPULATE,
MmapFlags: syscall.MAP_POPULATE,
NoFreelistSync: true,
}
func (bcfg *BackendConfig) mmapSize() int { return int(bcfg.MmapSize) }

View File

@@ -40,9 +40,10 @@ type readTx struct {
mu sync.RWMutex
buf txReadBuffer
// txmu protects accesses to the Tx on Range requests
txmu sync.Mutex
tx *bolt.Tx
// txmu protects accesses to buckets and tx on Range requests.
txmu sync.RWMutex
tx *bolt.Tx
buckets map[string]*bolt.Bucket
}
func (rt *readTx) Lock() { rt.mu.RLock() }
@@ -63,30 +64,57 @@ func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]
if int64(len(keys)) == limit {
return keys, vals
}
// find/cache bucket
bn := string(bucketName)
rt.txmu.RLock()
bucket, ok := rt.buckets[bn]
rt.txmu.RUnlock()
if !ok {
rt.txmu.Lock()
bucket = rt.tx.Bucket(bucketName)
rt.buckets[bn] = bucket
rt.txmu.Unlock()
}
// ignore missing bucket since may have been created in this batch
if bucket == nil {
return keys, vals
}
rt.txmu.Lock()
// ignore error since bucket may have been created in this batch
k2, v2, _ := unsafeRange(rt.tx, bucketName, key, endKey, limit-int64(len(keys)))
c := bucket.Cursor()
rt.txmu.Unlock()
k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
return append(k2, keys...), append(v2, vals...)
}
func (rt *readTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
dups := make(map[string]struct{})
f1 := func(k, v []byte) error {
getDups := func(k, v []byte) error {
dups[string(k)] = struct{}{}
return visitor(k, v)
return nil
}
f2 := func(k, v []byte) error {
visitNoDup := func(k, v []byte) error {
if _, ok := dups[string(k)]; ok {
return nil
}
return visitor(k, v)
}
if err := rt.buf.ForEach(bucketName, f1); err != nil {
if err := rt.buf.ForEach(bucketName, getDups); err != nil {
return err
}
rt.txmu.Lock()
err := unsafeForEach(rt.tx, bucketName, f2)
err := unsafeForEach(rt.tx, bucketName, visitNoDup)
rt.txmu.Unlock()
return err
if err != nil {
return err
}
return rt.buf.ForEach(bucketName, visitor)
}
func (rt *readTx) reset() {
rt.buf.reset()
rt.buckets = make(map[string]*bolt.Bucket)
rt.tx = nil
}

View File

@@ -24,10 +24,12 @@ import (
type index interface {
Get(key []byte, atRev int64) (rev, created revision, ver int64, err error)
Range(key, end []byte, atRev int64) ([][]byte, []revision)
Revisions(key, end []byte, atRev int64) []revision
Put(key []byte, rev revision)
Tombstone(key []byte, rev revision) error
RangeSince(key, end []byte, rev int64) []revision
Compact(rev int64) map[revision]struct{}
Keep(rev int64) map[revision]struct{}
Equal(b index) bool
Insert(ki *keyIndex)
@@ -83,17 +85,8 @@ func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
return nil
}
func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []revision) {
if end == nil {
rev, _, _, err := ti.Get(key, atRev)
if err != nil {
return nil, nil
}
return [][]byte{key}, []revision{rev}
}
keyi := &keyIndex{key: key}
endi := &keyIndex{key: end}
func (ti *treeIndex) visit(key, end []byte, f func(ki *keyIndex)) {
keyi, endi := &keyIndex{key: key}, &keyIndex{key: end}
ti.RLock()
defer ti.RUnlock()
@@ -102,16 +95,41 @@ func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []
if len(endi.key) > 0 && !item.Less(endi) {
return false
}
curKeyi := item.(*keyIndex)
rev, _, _, err := curKeyi.get(atRev)
if err != nil {
return true
}
revs = append(revs, rev)
keys = append(keys, curKeyi.key)
f(item.(*keyIndex))
return true
})
}
func (ti *treeIndex) Revisions(key, end []byte, atRev int64) (revs []revision) {
if end == nil {
rev, _, _, err := ti.Get(key, atRev)
if err != nil {
return nil
}
return []revision{rev}
}
ti.visit(key, end, func(ki *keyIndex) {
if rev, _, _, err := ki.get(atRev); err == nil {
revs = append(revs, rev)
}
})
return revs
}
func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []revision) {
if end == nil {
rev, _, _, err := ti.Get(key, atRev)
if err != nil {
return nil, nil
}
return [][]byte{key}, []revision{rev}
}
ti.visit(key, end, func(ki *keyIndex) {
if rev, _, _, err := ki.get(atRev); err == nil {
revs = append(revs, rev)
keys = append(keys, ki.key)
}
})
return keys, revs
}
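
The new Revisions method exists so that callers needing only revision pairs (for example a count-only range) can skip building the keys slice. A hedged sketch, with hypothetical key range and revision values:

// illustrative only; "foo"/"foo0" and atRev are made-up values
revs := ti.Revisions([]byte("foo"), []byte("foo0"), atRev)
count := len(revs) // no [][]byte keys are materialized
// the older equivalent built both slices even when only the count mattered:
// keys, revs := ti.Range([]byte("foo"), []byte("foo0"), atRev)
_ = count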
@@ -133,10 +151,11 @@ func (ti *treeIndex) Tombstone(key []byte, rev revision) error {
// at or after the given rev. The returned slice is sorted in the order
// of revision.
func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision {
keyi := &keyIndex{key: key}
ti.RLock()
defer ti.RUnlock()
keyi := &keyIndex{key: key}
if end == nil {
item := ti.tree.Get(keyi)
if item == nil {
@@ -179,6 +198,19 @@ func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
return available
}
// Keep finds all revisions to be kept for a Compaction at the given rev.
func (ti *treeIndex) Keep(rev int64) map[revision]struct{} {
available := make(map[revision]struct{})
ti.RLock()
defer ti.RUnlock()
ti.tree.Ascend(func(i btree.Item) bool {
keyi := i.(*keyIndex)
keyi.keep(rev, available)
return true
})
return available
}
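
Keep mirrors Compact's traversal but is read-only; HashByRev uses it to learn which revisions a compaction at rev would retain without actually pruning the index. A brief contrast, illustrative rather than lifted from the diff:

kept := ti.Keep(rev) // read-only: revisions that would survive a compaction at rev
// ti.Compact(rev) returns roughly the same set, but also prunes generations from the index
_ = kept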
func compactIndex(rev int64, available map[revision]struct{}, emptyki *[]*keyIndex) func(i btree.Item) bool {
return func(i btree.Item) bool {
keyi := i.(*keyIndex)
@@ -190,16 +222,16 @@ func compactIndex(rev int64, available map[revision]struct{}, emptyki *[]*keyInd
}
}
func (a *treeIndex) Equal(bi index) bool {
func (ti *treeIndex) Equal(bi index) bool {
b := bi.(*treeIndex)
if a.tree.Len() != b.tree.Len() {
if ti.tree.Len() != b.tree.Len() {
return false
}
equal := true
a.tree.Ascend(func(item btree.Item) bool {
ti.tree.Ascend(func(item btree.Item) bool {
aki := item.(*keyIndex)
bki := b.tree.Get(item).(*keyIndex)
if !aki.equal(bki) {

View File

@@ -46,7 +46,7 @@ var (
// rev except the largest one. If the generation becomes empty
// during compaction, it will be removed. if all the generations get
// removed, the keyIndex should be removed.
//
// For example:
// compact(2) on the previous example
// generations:
@@ -187,9 +187,44 @@ func (ki *keyIndex) compact(atRev int64, available map[revision]struct{}) {
plog.Panicf("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key))
}
// walk until reaching the first revision that has a revision smaller or equal to
// the atRev.
// add it to the available map
genIdx, revIndex := ki.doCompact(atRev, available)
g := &ki.generations[genIdx]
if !g.isEmpty() {
// remove the previous contents.
if revIndex != -1 {
g.revs = g.revs[revIndex:]
}
// remove any tombstone
if len(g.revs) == 1 && genIdx != len(ki.generations)-1 {
delete(available, g.revs[0])
genIdx++
}
}
// remove the previous generations.
ki.generations = ki.generations[genIdx:]
}
// keep finds the revision to be kept if compact is called at given atRev.
func (ki *keyIndex) keep(atRev int64, available map[revision]struct{}) {
if ki.isEmpty() {
return
}
genIdx, revIndex := ki.doCompact(atRev, available)
g := &ki.generations[genIdx]
if !g.isEmpty() {
// remove any tombstone
if revIndex == len(g.revs)-1 && genIdx != len(ki.generations)-1 {
delete(available, g.revs[revIndex])
}
}
}
func (ki *keyIndex) doCompact(atRev int64, available map[revision]struct{}) (genIdx int, revIndex int) {
// walk until reaching the first revision smaller or equal to "atRev",
// and add the revision to the available map
f := func(rev revision) bool {
if rev.main <= atRev {
available[rev] = struct{}{}
@@ -198,30 +233,19 @@ func (ki *keyIndex) compact(atRev int64, available map[revision]struct{}) {
return true
}
i, g := 0, &ki.generations[0]
genIdx, g := 0, &ki.generations[0]
// find the first generation that includes atRev or was created after atRev
for i < len(ki.generations)-1 {
for genIdx < len(ki.generations)-1 {
if tomb := g.revs[len(g.revs)-1].main; tomb > atRev {
break
}
i++
g = &ki.generations[i]
genIdx++
g = &ki.generations[genIdx]
}
if !g.isEmpty() {
n := g.walk(f)
// remove the previous contents.
if n != -1 {
g.revs = g.revs[n:]
}
// remove any tombstone
if len(g.revs) == 1 && i != len(ki.generations)-1 {
delete(available, g.revs[0])
i++
}
}
// remove the previous generations.
ki.generations = ki.generations[i:]
revIndex = g.walk(f)
return genIdx, revIndex
}
func (ki *keyIndex) isEmpty() bool {

View File

@@ -107,10 +107,12 @@ type KV interface {
// Write creates a write transaction.
Write() TxnWrite
// Hash retrieves the hash of KV state and revision.
// This method is designed for consistency checking purposes.
// Hash computes the hash of the KV's backend.
Hash() (hash uint32, revision int64, err error)
// HashByRev computes the hash of all MVCC revisions up to a given revision.
HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error)
// Compact frees all superseded keys with revisions less than rev.
Compact(rev int64) (<-chan struct{}, error)

View File

@@ -15,10 +15,13 @@
package mvcc
import (
"context"
"encoding/binary"
"errors"
"hash/crc32"
"math"
"sync"
"sync/atomic"
"time"
"github.com/coreos/etcd/lease"
@@ -26,7 +29,6 @@ import (
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/coreos/etcd/pkg/schedule"
"github.com/coreos/pkg/capnslog"
"golang.org/x/net/context"
)
var (
@@ -67,6 +69,10 @@ type store struct {
ReadView
WriteView
// consistentIndex caches the "consistent_index" key's value. Accessed
// through atomics so must be 64-bit aligned.
consistentIndex uint64
// mu read locks for txns and write locks for non-txn store changes.
mu sync.RWMutex
@@ -159,6 +165,59 @@ func (s *store) Hash() (hash uint32, revision int64, err error) {
return h, s.currentRev, err
}
func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) {
start := time.Now()
s.mu.RLock()
s.revMu.RLock()
compactRev, currentRev = s.compactMainRev, s.currentRev
s.revMu.RUnlock()
if rev > 0 && rev <= compactRev {
s.mu.RUnlock()
return 0, 0, compactRev, ErrCompacted
} else if rev > 0 && rev > currentRev {
s.mu.RUnlock()
return 0, currentRev, 0, ErrFutureRev
}
if rev == 0 {
rev = currentRev
}
keep := s.kvindex.Keep(rev)
tx := s.b.ReadTx()
tx.Lock()
defer tx.Unlock()
s.mu.RUnlock()
upper := revision{main: rev + 1}
lower := revision{main: compactRev + 1}
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
h.Write(keyBucketName)
err = tx.UnsafeForEach(keyBucketName, func(k, v []byte) error {
kr := bytesToRev(k)
if !upper.GreaterThan(kr) {
return nil
}
// skip revisions that are scheduled for deletion
// due to compacting; don't skip if there isn't one.
if lower.GreaterThan(kr) && len(keep) > 0 {
if _, ok := keep[kr]; !ok {
return nil
}
}
h.Write(k)
h.Write(v)
return nil
})
hash = h.Sum32()
hashRevDurations.Observe(time.Since(start).Seconds())
return hash, currentRev, compactRev, err
}
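
A hypothetical caller sketch (not part of this diff) showing how the new HashByRev and its error cases might be used for a cross-member consistency check at a fixed revision; kv stands in for any mvcc.KV:

hash, currentRev, compactRev, err := kv.HashByRev(rev) // rev == 0 hashes up to the current revision
switch err {
case ErrCompacted:
	// rev <= compactRev on this member; retry with a revision newer than compactRev
case ErrFutureRev:
	// rev > currentRev here; wait for this member to catch up, then retry
case nil:
	// compare (hash, compactRev) with what other members report for the same rev
}
_, _ = hash, currentRev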
func (s *store) Compact(rev int64) (<-chan struct{}, error) {
s.mu.Lock()
defer s.mu.Unlock()
@@ -238,6 +297,7 @@ func (s *store) Restore(b backend.Backend) error {
close(s.stopc)
s.fifoSched.Stop()
atomic.StoreUint64(&s.consistentIndex, 0)
s.b = b
s.kvindex = newTreeIndex()
s.currentRev = 1
@@ -407,29 +467,23 @@ func (s *store) Close() error {
return nil
}
func (a *store) Equal(b *store) bool {
if a.currentRev != b.currentRev {
return false
}
if a.compactMainRev != b.compactMainRev {
return false
}
return a.kvindex.Equal(b.kvindex)
}
func (s *store) saveIndex(tx backend.BatchTx) {
if s.ig == nil {
return
}
bs := s.bytesBuf8
binary.BigEndian.PutUint64(bs, s.ig.ConsistentIndex())
ci := s.ig.ConsistentIndex()
binary.BigEndian.PutUint64(bs, ci)
// put the index into the underlying backend
// tx has been locked in TxnBegin, so there is no need to lock it again
tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
atomic.StoreUint64(&s.consistentIndex, ci)
}
func (s *store) ConsistentIndex() uint64 {
// TODO: cache index in a uint64 field?
if ci := atomic.LoadUint64(&s.consistentIndex); ci > 0 {
return ci
}
tx := s.b.BatchTx()
tx.Lock()
defer tx.Unlock()
@@ -437,7 +491,9 @@ func (s *store) ConsistentIndex() uint64 {
if len(vs) == 0 {
return 0
}
return binary.BigEndian.Uint64(vs[0])
v := binary.BigEndian.Uint64(vs[0])
atomic.StoreUint64(&s.consistentIndex, v)
return v
}
// appendMarkTombstone appends tombstone mark to normal revision bytes.
@@ -452,16 +508,3 @@ func appendMarkTombstone(b []byte) []byte {
func isTombstone(b []byte) bool {
return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone
}
// revBytesRange returns the range of revision bytes at
// the given revision.
func revBytesRange(rev revision) (start, end []byte) {
start = newRevBytes()
revToBytes(rev, start)
end = newRevBytes()
endRev := revision{main: rev.main, sub: rev.sub + 1}
revToBytes(endRev, end)
return start, end
}

View File

@@ -22,6 +22,8 @@ import (
func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struct{}) bool {
totalStart := time.Now()
defer dbCompactionTotalDurations.Observe(float64(time.Since(totalStart) / time.Millisecond))
keyCompactions := 0
defer func() { dbCompactionKeysCounter.Add(float64(keyCompactions)) }()
end := make([]byte, 8)
binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))
@@ -40,6 +42,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
rev = bytesToRev(key)
if _, ok := keep[rev]; !ok {
tx.UnsafeDelete(keyBucketName, key)
keyCompactions++
}
}

View File

@@ -51,7 +51,7 @@ func (tr *storeTxnRead) End() {
}
type storeTxnWrite struct {
*storeTxnRead
storeTxnRead
tx backend.BatchTx
// beginRev is the revision where the txn begins; it will write to the next revision.
beginRev int64
@@ -63,7 +63,7 @@ func (s *store) Write() TxnWrite {
tx := s.b.BatchTx()
tx.Lock()
tw := &storeTxnWrite{
storeTxnRead: &storeTxnRead{s, tx, 0, 0},
storeTxnRead: storeTxnRead{s, tx, 0, 0},
tx: tx,
beginRev: s.currentRev,
changes: make([]mvccpb.KeyValue, 0, 4),
@@ -120,7 +120,7 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
}
_, revpairs := tr.s.kvindex.Range(key, end, int64(rev))
revpairs := tr.s.kvindex.Revisions(key, end, int64(rev))
if len(revpairs) == 0 {
return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
}
@@ -128,22 +128,22 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
return &RangeResult{KVs: nil, Count: len(revpairs), Rev: curRev}, nil
}
var kvs []mvccpb.KeyValue
for _, revpair := range revpairs {
start, end := revBytesRange(revpair)
_, vs := tr.tx.UnsafeRange(keyBucketName, start, end, 0)
limit := int(ro.Limit)
if limit <= 0 || limit > len(revpairs) {
limit = len(revpairs)
}
kvs := make([]mvccpb.KeyValue, limit)
revBytes := newRevBytes()
for i, revpair := range revpairs[:len(kvs)] {
revToBytes(revpair, revBytes)
_, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0)
if len(vs) != 1 {
plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub)
}
var kv mvccpb.KeyValue
if err := kv.Unmarshal(vs[0]); err != nil {
if err := kvs[i].Unmarshal(vs[0]); err != nil {
plog.Fatalf("cannot unmarshal event: %v", err)
}
kvs = append(kvs, kv)
if ro.Limit > 0 && len(kvs) >= int(ro.Limit) {
break
}
}
return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
}

View File

@@ -131,6 +131,14 @@ var (
Buckets: prometheus.ExponentialBuckets(100, 2, 14),
})
dbCompactionKeysCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "etcd_debugging",
Subsystem: "mvcc",
Name: "db_compaction_keys_total",
Help: "Total number of db keys compacted.",
})
dbTotalSizeDebugging = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "etcd_debugging",
Subsystem: "mvcc",
@@ -186,6 +194,18 @@ var (
// highest bucket start of 0.01 sec * 2^14 == 163.84 sec
Buckets: prometheus.ExponentialBuckets(.01, 2, 15),
})
hashRevDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "mvcc",
Name: "hash_rev_duration_seconds",
Help: "The latency distribution of storage hash by revision operation.",
// 100 MB usually takes 100 ms, so start with 10 MB of 10 ms
// lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2
// highest bucket start of 0.01 sec * 2^14 == 163.84 sec
Buckets: prometheus.ExponentialBuckets(.01, 2, 15),
})
)
func init() {
@@ -202,10 +222,12 @@ func init() {
prometheus.MustRegister(indexCompactionPauseDurations)
prometheus.MustRegister(dbCompactionPauseDurations)
prometheus.MustRegister(dbCompactionTotalDurations)
prometheus.MustRegister(dbCompactionKeysCounter)
prometheus.MustRegister(dbTotalSizeDebugging)
prometheus.MustRegister(dbTotalSize)
prometheus.MustRegister(dbTotalSizeInUse)
prometheus.MustRegister(hashDurations)
prometheus.MustRegister(hashRevDurations)
}
// ReportEventReceived reports that an event is received.

View File

@@ -50,18 +50,10 @@ func (tw *metricsTxnWrite) Put(key, value []byte, lease lease.LeaseID) (rev int6
func (tw *metricsTxnWrite) End() {
defer tw.TxnWrite.End()
if sum := tw.ranges + tw.puts + tw.deletes; sum != 1 {
if sum > 1 {
txnCounter.Inc()
}
return
}
switch {
case tw.ranges == 1:
rangeCounter.Inc()
case tw.puts == 1:
putCounter.Inc()
case tw.deletes == 1:
deleteCounter.Inc()
if sum := tw.ranges + tw.puts + tw.deletes; sum > 1 {
txnCounter.Inc()
}
rangeCounter.Add(float64(tw.ranges))
putCounter.Add(float64(tw.puts))
deleteCounter.Add(float64(tw.deletes))
}
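
This rewrite changes what the per-operation counters mean: previously only single-operation transactions incremented rangeCounter, putCounter, or deleteCounter, and multi-operation transactions were visible only through txnCounter; now every operation is counted and txnCounter covers any transaction with more than one operation. A worked example of the new accounting (illustrative numbers, not from the diff):

// a transaction doing 2 ranges and 1 put now ends with:
//   txnCounter    += 1   (sum = 3 > 1)
//   rangeCounter  += 2
//   putCounter    += 1
//   deleteCounter += 0
// the old code counted such a transaction only in txnCounter.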

View File

@@ -6,7 +6,10 @@ go_library(
importmap = "k8s.io/kubernetes/vendor/github.com/coreos/etcd/mvcc/mvccpb",
importpath = "github.com/coreos/etcd/mvcc/mvccpb",
visibility = ["//visibility:public"],
deps = ["//vendor/github.com/golang/protobuf/proto:go_default_library"],
deps = [
"//vendor/github.com/gogo/protobuf/gogoproto:go_default_library",
"//vendor/github.com/golang/protobuf/proto:go_default_library",
],
)
filegroup(

View File

@@ -1,6 +1,5 @@
// Code generated by protoc-gen-gogo.
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: kv.proto
// DO NOT EDIT!
/*
Package mvccpb is a generated protocol buffer package.
@@ -21,6 +20,8 @@ import (
math "math"
_ "github.com/gogo/protobuf/gogoproto"
io "io"
)
@@ -198,24 +199,6 @@ func (m *Event) MarshalTo(dAtA []byte) (int, error) {
return i, nil
}
func encodeFixed64Kv(dAtA []byte, offset int, v uint64) int {
dAtA[offset] = uint8(v)
dAtA[offset+1] = uint8(v >> 8)
dAtA[offset+2] = uint8(v >> 16)
dAtA[offset+3] = uint8(v >> 24)
dAtA[offset+4] = uint8(v >> 32)
dAtA[offset+5] = uint8(v >> 40)
dAtA[offset+6] = uint8(v >> 48)
dAtA[offset+7] = uint8(v >> 56)
return offset + 8
}
func encodeFixed32Kv(dAtA []byte, offset int, v uint32) int {
dAtA[offset] = uint8(v)
dAtA[offset+1] = uint8(v >> 8)
dAtA[offset+2] = uint8(v >> 16)
dAtA[offset+3] = uint8(v >> 24)
return offset + 4
}
func encodeVarintKv(dAtA []byte, offset int, v uint64) int {
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)

View File

@@ -23,7 +23,8 @@ import (
"github.com/coreos/etcd/mvcc/mvccpb"
)
const (
// non-const so modifiable by tests
var (
// chanBufLen is the length of the buffered chan
// for sending out watched events.
// TODO: find a good buf value. 1024 is just a random one that
@@ -143,7 +144,6 @@ func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch c
func (s *watchableStore) cancelWatcher(wa *watcher) {
for {
s.mu.Lock()
if s.unsynced.delete(wa) {
slowWatcherGauge.Dec()
break
@@ -151,6 +151,9 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
break
} else if wa.compacted {
break
} else if wa.ch == nil {
// already canceled (e.g., cancel/close race)
break
}
if !wa.victim {
@@ -176,6 +179,7 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
}
watcherGauge.Dec()
wa.ch = nil
s.mu.Unlock()
}
@@ -425,7 +429,6 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
if eb.revs != 1 {
plog.Panicf("unexpected multiple revisions in notification")
}
if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
pendingEventsGauge.Add(float64(len(eb.evs)))
} else {

View File

@@ -129,16 +129,25 @@ func (ws *watchStream) Chan() <-chan WatchResponse {
func (ws *watchStream) Cancel(id WatchID) error {
ws.mu.Lock()
cancel, ok := ws.cancels[id]
w := ws.watchers[id]
ok = ok && !ws.closed
if ok {
delete(ws.cancels, id)
delete(ws.watchers, id)
}
ws.mu.Unlock()
if !ok {
return ErrWatcherNotExist
}
cancel()
ws.mu.Lock()
// The watch isn't removed until cancel so that if Close() is called,
// it will wait for the cancel. Otherwise, Close() could close the
// watch channel while the store is still posting events.
if ww := ws.watchers[id]; ww == w {
delete(ws.cancels, id)
delete(ws.watchers, id)
}
ws.mu.Unlock()
return nil
}