Version bump to etcd v3.2.11

This commit is contained in:
Joe Betz
2017-12-12 16:20:42 -08:00
parent 4956e65d59
commit 05afd248f2
287 changed files with 25980 additions and 5220 deletions

View File

@@ -7,12 +7,16 @@ go_library(
"index.go",
"key_index.go",
"kv.go",
"kv_view.go",
"kvstore.go",
"kvstore_compaction.go",
"kvstore_txn.go",
"metrics.go",
"metrics_txn.go",
"revision.go",
"util.go",
"watchable_store.go",
"watchable_store_txn.go",
"watcher.go",
"watcher_group.go",
],

View File

@@ -5,19 +5,24 @@ go_library(
srcs = [
"backend.go",
"batch_tx.go",
"boltoption_default.go",
"config_default.go",
"doc.go",
"metrics.go",
"read_tx.go",
"tx_buffer.go",
] + select({
"@io_bazel_rules_go//go/platform:linux_amd64": [
"boltoption_linux.go",
"config_linux.go",
],
"@io_bazel_rules_go//go/platform:windows_amd64": [
"config_windows.go",
],
"//conditions:default": [],
}),
importpath = "github.com/coreos/etcd/mvcc/backend",
visibility = ["//visibility:public"],
deps = [
"//vendor/github.com/boltdb/bolt:go_default_library",
"//vendor/github.com/coreos/bbolt:go_default_library",
"//vendor/github.com/coreos/pkg/capnslog:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
],

View File

@@ -25,7 +25,7 @@ import (
"sync/atomic"
"time"
"github.com/boltdb/bolt"
bolt "github.com/coreos/bbolt"
"github.com/coreos/pkg/capnslog"
)
@@ -35,25 +35,21 @@ var (
defragLimit = 10000
// InitialMmapSize is the initial size of the mmapped region. Setting this larger than
// initialMmapSize is the initial size of the mmapped region. Setting this larger than
// the potential max db size can prevent writer from blocking reader.
// This only works for linux.
InitialMmapSize = int64(10 * 1024 * 1024 * 1024)
initialMmapSize = uint64(10 * 1024 * 1024 * 1024)
plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc/backend")
)
const (
// DefaultQuotaBytes is the number of bytes the backend Size may
// consume before exceeding the space quota.
DefaultQuotaBytes = int64(2 * 1024 * 1024 * 1024) // 2GB
// MaxQuotaBytes is the maximum number of bytes suggested for a backend
// quota. A larger quota may lead to degraded performance.
MaxQuotaBytes = int64(8 * 1024 * 1024 * 1024) // 8GB
// minSnapshotWarningTimeout is the minimum threshold to trigger a long running snapshot warning.
minSnapshotWarningTimeout = time.Duration(30 * time.Second)
)
type Backend interface {
ReadTx() ReadTx
BatchTx() BatchTx
Snapshot() Snapshot
Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
// Size returns the current size of the backend.
@@ -86,36 +82,71 @@ type backend struct {
batchInterval time.Duration
batchLimit int
batchTx *batchTx
batchTx *batchTxBuffered
readTx *readTx
stopc chan struct{}
donec chan struct{}
}
func New(path string, d time.Duration, limit int) Backend {
return newBackend(path, d, limit)
type BackendConfig struct {
// Path is the file path to the backend file.
Path string
// BatchInterval is the maximum time before flushing the BatchTx.
BatchInterval time.Duration
// BatchLimit is the maximum puts before flushing the BatchTx.
BatchLimit int
// MmapSize is the number of bytes to mmap for the backend.
MmapSize uint64
}
func DefaultBackendConfig() BackendConfig {
return BackendConfig{
BatchInterval: defaultBatchInterval,
BatchLimit: defaultBatchLimit,
MmapSize: initialMmapSize,
}
}
func New(bcfg BackendConfig) Backend {
return newBackend(bcfg)
}
func NewDefaultBackend(path string) Backend {
return newBackend(path, defaultBatchInterval, defaultBatchLimit)
bcfg := DefaultBackendConfig()
bcfg.Path = path
return newBackend(bcfg)
}
func newBackend(path string, d time.Duration, limit int) *backend {
db, err := bolt.Open(path, 0600, boltOpenOptions)
func newBackend(bcfg BackendConfig) *backend {
bopts := &bolt.Options{}
if boltOpenOptions != nil {
*bopts = *boltOpenOptions
}
bopts.InitialMmapSize = bcfg.mmapSize()
db, err := bolt.Open(bcfg.Path, 0600, bopts)
if err != nil {
plog.Panicf("cannot open database at %s (%v)", path, err)
plog.Panicf("cannot open database at %s (%v)", bcfg.Path, err)
}
// In future, may want to make buffering optional for low-concurrency systems
// or dynamically swap between buffered/non-buffered depending on workload.
b := &backend{
db: db,
batchInterval: d,
batchLimit: limit,
batchInterval: bcfg.BatchInterval,
batchLimit: bcfg.BatchLimit,
readTx: &readTx{buf: txReadBuffer{
txBuffer: txBuffer{make(map[string]*bucketBuffer)}},
},
stopc: make(chan struct{}),
donec: make(chan struct{}),
}
b.batchTx = newBatchTx(b)
b.batchTx = newBatchTxBuffered(b)
go b.run()
return b
}
@@ -127,6 +158,8 @@ func (b *backend) BatchTx() BatchTx {
return b.batchTx
}
func (b *backend) ReadTx() ReadTx { return b.readTx }
// ForceCommit forces the current batching tx to commit.
func (b *backend) ForceCommit() {
b.batchTx.Commit()
@@ -141,7 +174,33 @@ func (b *backend) Snapshot() Snapshot {
if err != nil {
plog.Fatalf("cannot begin tx (%s)", err)
}
return &snapshot{tx}
stopc, donec := make(chan struct{}), make(chan struct{})
dbBytes := tx.Size()
go func() {
defer close(donec)
// sendRateBytes is based on transferring snapshot data over a 1 gigabit/s connection
// assuming a min tcp throughput of 100MB/s.
var sendRateBytes int64 = 100 * 1024 * 1014
warningTimeout := time.Duration(int64((float64(dbBytes) / float64(sendRateBytes)) * float64(time.Second)))
if warningTimeout < minSnapshotWarningTimeout {
warningTimeout = minSnapshotWarningTimeout
}
start := time.Now()
ticker := time.NewTicker(warningTimeout)
defer ticker.Stop()
for {
select {
case <-ticker.C:
plog.Warningf("snapshotting is taking more than %v seconds to finish transferring %v MB [started at %v]", time.Since(start).Seconds(), float64(dbBytes)/float64(1024*1014), start)
case <-stopc:
snapshotDurations.Observe(time.Since(start).Seconds())
return
}
}
}()
return &snapshot{tx, stopc, donec}
}
type IgnoreKey struct {
@@ -235,7 +294,11 @@ func (b *backend) defrag() error {
b.mu.Lock()
defer b.mu.Unlock()
b.batchTx.commit(true)
// block concurrent read requests while resetting tx
b.readTx.mu.Lock()
defer b.readTx.mu.Unlock()
b.batchTx.unsafeCommit(true)
b.batchTx.tx = nil
tmpdb, err := bolt.Open(b.db.Path()+".tmp", 0600, boltOpenOptions)
@@ -276,6 +339,10 @@ func (b *backend) defrag() error {
plog.Fatalf("cannot begin tx (%s)", err)
}
b.readTx.buf.reset()
b.readTx.tx = b.unsafeBegin(false)
atomic.StoreInt64(&b.size, b.readTx.tx.Size())
return nil
}
@@ -331,6 +398,22 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
return tmptx.Commit()
}
func (b *backend) begin(write bool) *bolt.Tx {
b.mu.RLock()
tx := b.unsafeBegin(write)
b.mu.RUnlock()
atomic.StoreInt64(&b.size, tx.Size())
return tx
}
func (b *backend) unsafeBegin(write bool) *bolt.Tx {
tx, err := b.db.Begin(write)
if err != nil {
plog.Fatalf("cannot begin tx (%s)", err)
}
return tx
}
// NewTmpBackend creates a backend implementation for testing.
func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) {
dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test")
@@ -338,7 +421,9 @@ func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, strin
plog.Fatal(err)
}
tmpPath := filepath.Join(dir, "database")
return newBackend(tmpPath, batchInterval, batchLimit), tmpPath
bcfg := DefaultBackendConfig()
bcfg.Path, bcfg.BatchInterval, bcfg.BatchLimit = tmpPath, batchInterval, batchLimit
return newBackend(bcfg), tmpPath
}
func NewDefaultTmpBackend() (*backend, string) {
@@ -347,6 +432,12 @@ func NewDefaultTmpBackend() (*backend, string) {
type snapshot struct {
*bolt.Tx
stopc chan struct{}
donec chan struct{}
}
func (s *snapshot) Close() error { return s.Tx.Rollback() }
func (s *snapshot) Close() error {
close(s.stopc)
<-s.donec
return s.Tx.Rollback()
}

View File

@@ -16,23 +16,24 @@ package backend
import (
"bytes"
"fmt"
"math"
"sync"
"sync/atomic"
"time"
"github.com/boltdb/bolt"
bolt "github.com/coreos/bbolt"
)
type BatchTx interface {
Lock()
Unlock()
ReadTx
UnsafeCreateBucket(name []byte)
UnsafePut(bucketName []byte, key []byte, value []byte)
UnsafeSeqPut(bucketName []byte, key []byte, value []byte)
UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
UnsafeDelete(bucketName []byte, key []byte)
UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
// Commit commits a previous tx and begins a new writable one.
Commit()
// CommitAndStop commits the previous tx and does not create a new one.
CommitAndStop()
}
@@ -40,13 +41,8 @@ type batchTx struct {
sync.Mutex
tx *bolt.Tx
backend *backend
pending int
}
func newBatchTx(backend *backend) *batchTx {
tx := &batchTx{backend: backend}
tx.Commit()
return tx
pending int
}
func (t *batchTx) UnsafeCreateBucket(name []byte) {
@@ -84,30 +80,37 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo
}
// UnsafeRange must be called holding the lock on the tx.
func (t *batchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
bucket := t.tx.Bucket(bucketName)
func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
k, v, err := unsafeRange(t.tx, bucketName, key, endKey, limit)
if err != nil {
plog.Fatal(err)
}
return k, v
}
func unsafeRange(tx *bolt.Tx, bucketName, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte, err error) {
bucket := tx.Bucket(bucketName)
if bucket == nil {
plog.Fatalf("bucket %s does not exist", bucketName)
return nil, nil, fmt.Errorf("bucket %s does not exist", bucketName)
}
if len(endKey) == 0 {
if v := bucket.Get(key); v == nil {
return keys, vs
} else {
return append(keys, key), append(vs, v)
if v := bucket.Get(key); v != nil {
return append(keys, key), append(vs, v), nil
}
return nil, nil, nil
}
if limit <= 0 {
limit = math.MaxInt64
}
c := bucket.Cursor()
for ck, cv := c.Seek(key); ck != nil && bytes.Compare(ck, endKey) < 0; ck, cv = c.Next() {
vs = append(vs, cv)
keys = append(keys, ck)
if limit > 0 && limit == int64(len(keys)) {
if limit == int64(len(keys)) {
break
}
}
return keys, vs
return keys, vs, nil
}
// UnsafeDelete must be called holding the lock on the tx.
@@ -125,12 +128,14 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
// UnsafeForEach must be called holding the lock on the tx.
func (t *batchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
b := t.tx.Bucket(bucketName)
if b == nil {
// bucket does not exist
return nil
return unsafeForEach(t.tx, bucketName, visitor)
}
func unsafeForEach(tx *bolt.Tx, bucket []byte, visitor func(k, v []byte) error) error {
if b := tx.Bucket(bucket); b != nil {
return b.ForEach(visitor)
}
return b.ForEach(visitor)
return nil
}
// Commit commits a previous tx and begins a new writable one.
@@ -140,7 +145,7 @@ func (t *batchTx) Commit() {
t.commit(false)
}
// CommitAndStop commits the previous tx and do not create a new one.
// CommitAndStop commits the previous tx and does not create a new one.
func (t *batchTx) CommitAndStop() {
t.Lock()
defer t.Unlock()
@@ -150,37 +155,28 @@ func (t *batchTx) CommitAndStop() {
func (t *batchTx) Unlock() {
if t.pending >= t.backend.batchLimit {
t.commit(false)
t.pending = 0
}
t.Mutex.Unlock()
}
func (t *batchTx) commit(stop bool) {
var err error
// commit the last tx
if t.tx != nil {
if t.pending == 0 && !stop {
t.backend.mu.RLock()
defer t.backend.mu.RUnlock()
// batchTx.commit(true) calls *bolt.Tx.Commit, which
// initializes *bolt.Tx.db and *bolt.Tx.meta as nil,
// and subsequent *bolt.Tx.Size() call panics.
//
// This nil pointer reference panic happens when:
// 1. batchTx.commit(false) from newBatchTx
// 2. batchTx.commit(true) from stopping backend
// 3. batchTx.commit(false) from inflight mvcc Hash call
//
// Check if db is nil to prevent this panic
if t.tx.DB() != nil {
atomic.StoreInt64(&t.backend.size, t.tx.Size())
}
// t.tx.DB()==nil if 'CommitAndStop' calls 'batchTx.commit(true)',
// which initializes *bolt.Tx.db and *bolt.Tx.meta as nil; panics t.tx.Size().
// Server must make sure 'batchTx.commit(false)' does not follow
// 'batchTx.commit(true)' (e.g. stopping backend, and inflight Hash call).
atomic.StoreInt64(&t.backend.size, t.tx.Size())
return
}
start := time.Now()
// gofail: var beforeCommit struct{}
err = t.tx.Commit()
err := t.tx.Commit()
// gofail: var afterCommit struct{}
commitDurations.Observe(time.Since(start).Seconds())
atomic.AddInt64(&t.backend.commits, 1)
@@ -190,17 +186,81 @@ func (t *batchTx) commit(stop bool) {
plog.Fatalf("cannot commit tx (%s)", err)
}
}
if stop {
return
if !stop {
t.tx = t.backend.begin(true)
}
t.backend.mu.RLock()
defer t.backend.mu.RUnlock()
// begin a new tx
t.tx, err = t.backend.db.Begin(true)
if err != nil {
plog.Fatalf("cannot begin tx (%s)", err)
}
atomic.StoreInt64(&t.backend.size, t.tx.Size())
}
type batchTxBuffered struct {
batchTx
buf txWriteBuffer
}
func newBatchTxBuffered(backend *backend) *batchTxBuffered {
tx := &batchTxBuffered{
batchTx: batchTx{backend: backend},
buf: txWriteBuffer{
txBuffer: txBuffer{make(map[string]*bucketBuffer)},
seq: true,
},
}
tx.Commit()
return tx
}
func (t *batchTxBuffered) Unlock() {
if t.pending != 0 {
t.backend.readTx.mu.Lock()
t.buf.writeback(&t.backend.readTx.buf)
t.backend.readTx.mu.Unlock()
if t.pending >= t.backend.batchLimit {
t.commit(false)
}
}
t.batchTx.Unlock()
}
func (t *batchTxBuffered) Commit() {
t.Lock()
defer t.Unlock()
t.commit(false)
}
func (t *batchTxBuffered) CommitAndStop() {
t.Lock()
defer t.Unlock()
t.commit(true)
}
func (t *batchTxBuffered) commit(stop bool) {
// all read txs must be closed to acquire boltdb commit rwlock
t.backend.readTx.mu.Lock()
defer t.backend.readTx.mu.Unlock()
t.unsafeCommit(stop)
}
func (t *batchTxBuffered) unsafeCommit(stop bool) {
if t.backend.readTx.tx != nil {
if err := t.backend.readTx.tx.Rollback(); err != nil {
plog.Fatalf("cannot rollback tx (%s)", err)
}
t.backend.readTx.buf.reset()
t.backend.readTx.tx = nil
}
t.batchTx.commit(stop)
if !stop {
t.backend.readTx.tx = t.backend.begin(false)
}
}
func (t *batchTxBuffered) UnsafePut(bucketName []byte, key []byte, value []byte) {
t.batchTx.UnsafePut(bucketName, key, value)
t.buf.put(bucketName, key, value)
}
func (t *batchTxBuffered) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
t.batchTx.UnsafeSeqPut(bucketName, key, value)
t.buf.putSeq(bucketName, key, value)
}

View File

@@ -12,10 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !linux
// +build !linux,!windows
package backend
import "github.com/boltdb/bolt"
import bolt "github.com/coreos/bbolt"
var boltOpenOptions *bolt.Options = nil
func (bcfg *BackendConfig) mmapSize() int { return int(bcfg.MmapSize) }

View File

@@ -17,7 +17,7 @@ package backend
import (
"syscall"
"github.com/boltdb/bolt"
bolt "github.com/coreos/bbolt"
)
// syscall.MAP_POPULATE on linux 2.6.23+ does sequential read-ahead
@@ -27,6 +27,7 @@ import (
// (https://github.com/torvalds/linux/releases/tag/v2.6.23), mmap might
// silently ignore this flag. Please update your kernel to prevent this.
var boltOpenOptions = &bolt.Options{
MmapFlags: syscall.MAP_POPULATE,
InitialMmapSize: int(InitialMmapSize),
MmapFlags: syscall.MAP_POPULATE,
}
func (bcfg *BackendConfig) mmapSize() int { return int(bcfg.MmapSize) }

View File

@@ -0,0 +1,26 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build windows
package backend
import bolt "github.com/coreos/bbolt"
var boltOpenOptions *bolt.Options = nil
// setting mmap size != 0 on windows will allocate the entire
// mmap size for the file, instead of growing it. So, force 0.
func (bcfg *BackendConfig) mmapSize() int { return 0 }

View File

@@ -24,8 +24,18 @@ var (
Help: "The latency distributions of commit called by backend.",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
})
snapshotDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "disk",
Name: "backend_snapshot_duration_seconds",
Help: "The latency distribution of backend snapshots.",
// 10 ms -> 655 seconds
Buckets: prometheus.ExponentialBuckets(.01, 2, 17),
})
)
func init() {
prometheus.MustRegister(commitDurations)
prometheus.MustRegister(snapshotDurations)
}

92
vendor/github.com/coreos/etcd/mvcc/backend/read_tx.go generated vendored Normal file
View File

@@ -0,0 +1,92 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package backend
import (
"bytes"
"math"
"sync"
bolt "github.com/coreos/bbolt"
)
// safeRangeBucket is a hack to avoid inadvertently reading duplicate keys;
// overwrites on a bucket should only fetch with limit=1, but safeRangeBucket
// is known to never overwrite any key so range is safe.
var safeRangeBucket = []byte("key")
type ReadTx interface {
Lock()
Unlock()
UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
}
type readTx struct {
// mu protects accesses to the txReadBuffer
mu sync.RWMutex
buf txReadBuffer
// txmu protects accesses to the Tx on Range requests
txmu sync.Mutex
tx *bolt.Tx
}
func (rt *readTx) Lock() { rt.mu.RLock() }
func (rt *readTx) Unlock() { rt.mu.RUnlock() }
func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
if endKey == nil {
// forbid duplicates for single keys
limit = 1
}
if limit <= 0 {
limit = math.MaxInt64
}
if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
panic("do not use unsafeRange on non-keys bucket")
}
keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
if int64(len(keys)) == limit {
return keys, vals
}
rt.txmu.Lock()
// ignore error since bucket may have been created in this batch
k2, v2, _ := unsafeRange(rt.tx, bucketName, key, endKey, limit-int64(len(keys)))
rt.txmu.Unlock()
return append(k2, keys...), append(v2, vals...)
}
func (rt *readTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
dups := make(map[string]struct{})
f1 := func(k, v []byte) error {
dups[string(k)] = struct{}{}
return visitor(k, v)
}
f2 := func(k, v []byte) error {
if _, ok := dups[string(k)]; ok {
return nil
}
return visitor(k, v)
}
if err := rt.buf.ForEach(bucketName, f1); err != nil {
return err
}
rt.txmu.Lock()
err := unsafeForEach(rt.tx, bucketName, f2)
rt.txmu.Unlock()
return err
}

181
vendor/github.com/coreos/etcd/mvcc/backend/tx_buffer.go generated vendored Normal file
View File

@@ -0,0 +1,181 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package backend
import (
"bytes"
"sort"
)
// txBuffer handles functionality shared between txWriteBuffer and txReadBuffer.
type txBuffer struct {
buckets map[string]*bucketBuffer
}
func (txb *txBuffer) reset() {
for k, v := range txb.buckets {
if v.used == 0 {
// demote
delete(txb.buckets, k)
}
v.used = 0
}
}
// txWriteBuffer buffers writes of pending updates that have not yet committed.
type txWriteBuffer struct {
txBuffer
seq bool
}
func (txw *txWriteBuffer) put(bucket, k, v []byte) {
txw.seq = false
txw.putSeq(bucket, k, v)
}
func (txw *txWriteBuffer) putSeq(bucket, k, v []byte) {
b, ok := txw.buckets[string(bucket)]
if !ok {
b = newBucketBuffer()
txw.buckets[string(bucket)] = b
}
b.add(k, v)
}
func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
for k, wb := range txw.buckets {
rb, ok := txr.buckets[k]
if !ok {
delete(txw.buckets, k)
txr.buckets[k] = wb
continue
}
if !txw.seq && wb.used > 1 {
// assume no duplicate keys
sort.Sort(wb)
}
rb.merge(wb)
}
txw.reset()
}
// txReadBuffer accesses buffered updates.
type txReadBuffer struct{ txBuffer }
func (txr *txReadBuffer) Range(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
if b := txr.buckets[string(bucketName)]; b != nil {
return b.Range(key, endKey, limit)
}
return nil, nil
}
func (txr *txReadBuffer) ForEach(bucketName []byte, visitor func(k, v []byte) error) error {
if b := txr.buckets[string(bucketName)]; b != nil {
return b.ForEach(visitor)
}
return nil
}
type kv struct {
key []byte
val []byte
}
// bucketBuffer buffers key-value pairs that are pending commit.
type bucketBuffer struct {
buf []kv
// used tracks number of elements in use so buf can be reused without reallocation.
used int
}
func newBucketBuffer() *bucketBuffer {
return &bucketBuffer{buf: make([]kv, 512), used: 0}
}
func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
f := func(i int) bool { return bytes.Compare(bb.buf[i].key, key) >= 0 }
idx := sort.Search(bb.used, f)
if idx < 0 {
return nil, nil
}
if len(endKey) == 0 {
if bytes.Equal(key, bb.buf[idx].key) {
keys = append(keys, bb.buf[idx].key)
vals = append(vals, bb.buf[idx].val)
}
return keys, vals
}
if bytes.Compare(endKey, bb.buf[idx].key) <= 0 {
return nil, nil
}
for i := idx; i < bb.used && int64(len(keys)) < limit; i++ {
if bytes.Compare(endKey, bb.buf[i].key) <= 0 {
break
}
keys = append(keys, bb.buf[i].key)
vals = append(vals, bb.buf[i].val)
}
return keys, vals
}
func (bb *bucketBuffer) ForEach(visitor func(k, v []byte) error) error {
for i := 0; i < bb.used; i++ {
if err := visitor(bb.buf[i].key, bb.buf[i].val); err != nil {
return err
}
}
return nil
}
func (bb *bucketBuffer) add(k, v []byte) {
bb.buf[bb.used].key, bb.buf[bb.used].val = k, v
bb.used++
if bb.used == len(bb.buf) {
buf := make([]kv, (3*len(bb.buf))/2)
copy(buf, bb.buf)
bb.buf = buf
}
}
// merge merges data from bb into bbsrc.
func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) {
for i := 0; i < bbsrc.used; i++ {
bb.add(bbsrc.buf[i].key, bbsrc.buf[i].val)
}
if bb.used == bbsrc.used {
return
}
if bytes.Compare(bb.buf[(bb.used-bbsrc.used)-1].key, bbsrc.buf[0].key) < 0 {
return
}
sort.Stable(bb)
// remove duplicates, using only newest update
widx := 0
for ridx := 1; ridx < bb.used; ridx++ {
if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) {
widx++
}
bb.buf[widx] = bb.buf[ridx]
}
bb.used = widx + 1
}
func (bb *bucketBuffer) Len() int { return bb.used }
func (bb *bucketBuffer) Less(i, j int) bool {
return bytes.Compare(bb.buf[i].key, bb.buf[j].key) < 0
}
func (bb *bucketBuffer) Swap(i, j int) { bb.buf[i], bb.buf[j] = bb.buf[j], bb.buf[i] }

View File

@@ -29,7 +29,9 @@ type index interface {
RangeSince(key, end []byte, rev int64) []revision
Compact(rev int64) map[revision]struct{}
Equal(b index) bool
Insert(ki *keyIndex)
KeyIndex(ki *keyIndex) *keyIndex
}
type treeIndex struct {
@@ -60,18 +62,27 @@ func (ti *treeIndex) Put(key []byte, rev revision) {
func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
keyi := &keyIndex{key: key}
ti.RLock()
defer ti.RUnlock()
item := ti.tree.Get(keyi)
if item == nil {
if keyi = ti.keyIndex(keyi); keyi == nil {
return revision{}, revision{}, 0, ErrRevisionNotFound
}
keyi = item.(*keyIndex)
return keyi.get(atRev)
}
func (ti *treeIndex) KeyIndex(keyi *keyIndex) *keyIndex {
ti.RLock()
defer ti.RUnlock()
return ti.keyIndex(keyi)
}
func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
if item := ti.tree.Get(keyi); item != nil {
return item.(*keyIndex)
}
return nil
}
func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []revision) {
if end == nil {
rev, _, _, err := ti.Get(key, atRev)

View File

@@ -222,7 +222,6 @@ func (ki *keyIndex) compact(atRev int64, available map[revision]struct{}) {
}
// remove the previous generations.
ki.generations = ki.generations[i:]
return
}
func (ki *keyIndex) isEmpty() bool {

View File

@@ -32,15 +32,15 @@ type RangeResult struct {
Count int
}
type KV interface {
// Rev returns the current revision of the KV.
Rev() int64
// FirstRev returns the first revision of the KV.
type ReadView interface {
// FirstRev returns the first KV revision at the time of opening the txn.
// After a compaction, the first revision increases to the compaction
// revision.
FirstRev() int64
// Rev returns the revision of the KV at the time of opening the txn.
Rev() int64
// Range gets the keys in the range at rangeRev.
// The returned rev is the current revision of the KV when the operation is executed.
// If rangeRev <=0, range gets the keys at currentRev.
@@ -50,14 +50,17 @@ type KV interface {
// Limit limits the number of keys returned.
// If the required rev is compacted, ErrCompacted will be returned.
Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error)
}
// Put puts the given key, value into the store. Put also takes additional argument lease to
// attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease
// id.
// A put also increases the rev of the store, and generates one event in the event history.
// The returned rev is the current revision of the KV when the operation is executed.
Put(key, value []byte, lease lease.LeaseID) (rev int64)
// TxnRead represents a read-only transaction with operations that will not
// block other read transactions.
type TxnRead interface {
ReadView
// End marks the transaction is complete and ready to commit.
End()
}
type WriteView interface {
// DeleteRange deletes the given range from the store.
// A deleteRange increases the rev of the store if any key in the range exists.
// The number of key deleted will be returned.
@@ -67,26 +70,51 @@ type KV interface {
// if the `end` is not nil, deleteRange deletes the keys in range [key, range_end).
DeleteRange(key, end []byte) (n, rev int64)
// TxnBegin begins a txn. Only Txn prefixed operation can be executed, others will be blocked
// until txn ends. Only one on-going txn is allowed.
// TxnBegin returns an int64 txn ID.
// All txn prefixed operations with same txn ID will be done with the same rev.
TxnBegin() int64
// TxnEnd ends the on-going txn with txn ID. If the on-going txn ID is not matched, error is returned.
TxnEnd(txnID int64) error
// TxnRange returns the current revision of the KV when the operation is executed.
TxnRange(txnID int64, key, end []byte, ro RangeOptions) (r *RangeResult, err error)
TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error)
TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error)
// Put puts the given key, value into the store. Put also takes additional argument lease to
// attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease
// id.
// A put also increases the rev of the store, and generates one event in the event history.
// The returned rev is the current revision of the KV when the operation is executed.
Put(key, value []byte, lease lease.LeaseID) (rev int64)
}
// TxnWrite represents a transaction that can modify the store.
type TxnWrite interface {
TxnRead
WriteView
// Changes gets the changes made since opening the write txn.
Changes() []mvccpb.KeyValue
}
// txnReadWrite coerces a read txn to a write, panicking on any write operation.
type txnReadWrite struct{ TxnRead }
func (trw *txnReadWrite) DeleteRange(key, end []byte) (n, rev int64) { panic("unexpected DeleteRange") }
func (trw *txnReadWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
panic("unexpected Put")
}
func (trw *txnReadWrite) Changes() []mvccpb.KeyValue { return nil }
func NewReadOnlyTxnWrite(txn TxnRead) TxnWrite { return &txnReadWrite{txn} }
type KV interface {
ReadView
WriteView
// Read creates a read transaction.
Read() TxnRead
// Write creates a write transaction.
Write() TxnWrite
// Hash retrieves the hash of KV state and revision.
// This method is designed for consistency checking purposes.
Hash() (hash uint32, revision int64, err error)
// Compact frees all superseded keys with revisions less than rev.
Compact(rev int64) (<-chan struct{}, error)
// Hash retrieves the hash of KV state and revision.
// This method is designed for consistency checking purpose.
Hash() (hash uint32, revision int64, err error)
// Commit commits txns into the underlying backend.
// Commit commits outstanding txns into the underlying backend.
Commit()
// Restore restores the KV store from a backend.

53
vendor/github.com/coreos/etcd/mvcc/kv_view.go generated vendored Normal file
View File

@@ -0,0 +1,53 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"github.com/coreos/etcd/lease"
)
type readView struct{ kv KV }
func (rv *readView) FirstRev() int64 {
tr := rv.kv.Read()
defer tr.End()
return tr.FirstRev()
}
func (rv *readView) Rev() int64 {
tr := rv.kv.Read()
defer tr.End()
return tr.Rev()
}
func (rv *readView) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
tr := rv.kv.Read()
defer tr.End()
return tr.Range(key, end, ro)
}
type writeView struct{ kv KV }
func (wv *writeView) DeleteRange(key, end []byte) (n, rev int64) {
tw := wv.kv.Write()
defer tw.End()
return tw.DeleteRange(key, end)
}
func (wv *writeView) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
tw := wv.kv.Write()
defer tw.End()
return tw.Put(key, value, lease)
}

View File

@@ -18,7 +18,6 @@ import (
"encoding/binary"
"errors"
"math"
"math/rand"
"sync"
"time"
@@ -34,25 +33,29 @@ var (
keyBucketName = []byte("key")
metaBucketName = []byte("meta")
consistentIndexKeyName = []byte("consistent_index")
scheduledCompactKeyName = []byte("scheduledCompactRev")
finishedCompactKeyName = []byte("finishedCompactRev")
ErrCompacted = errors.New("mvcc: required revision has been compacted")
ErrFutureRev = errors.New("mvcc: required revision is a future revision")
ErrCanceled = errors.New("mvcc: watcher is canceled")
ErrClosed = errors.New("mvcc: closed")
plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc")
)
const (
// markedRevBytesLen is the byte length of marked revision.
// The first `revBytesLen` bytes represents a normal revision. The last
// one byte is the mark.
markedRevBytesLen = revBytesLen + 1
markBytePosition = markedRevBytesLen - 1
markTombstone byte = 't'
consistentIndexKeyName = []byte("consistent_index")
scheduledCompactKeyName = []byte("scheduledCompactRev")
finishedCompactKeyName = []byte("finishedCompactRev")
ErrTxnIDMismatch = errors.New("mvcc: txn id mismatch")
ErrCompacted = errors.New("mvcc: required revision has been compacted")
ErrFutureRev = errors.New("mvcc: required revision is a future revision")
ErrCanceled = errors.New("mvcc: watcher is canceled")
plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc")
)
var restoreChunkKeys = 10000 // non-const for testing
// ConsistentIndexGetter is an interface that wraps the Get method.
// Consistent index is the offset of an entry in a consistent replicated log.
type ConsistentIndexGetter interface {
@@ -61,7 +64,11 @@ type ConsistentIndexGetter interface {
}
type store struct {
mu sync.Mutex // guards the following
ReadView
WriteView
// mu read locks for txns and write locks for non-txn store changes.
mu sync.RWMutex
ig ConsistentIndexGetter
@@ -70,19 +77,19 @@ type store struct {
le lease.Lessor
currentRev revision
// the main revision of the last compaction
// revMuLock protects currentRev and compactMainRev.
// Locked at end of write txn and released after write txn unlock lock.
// Locked before locking read txn and released after locking.
revMu sync.RWMutex
// currentRev is the revision of the last completed transaction.
currentRev int64
// compactMainRev is the main revision of the last compaction.
compactMainRev int64
tx backend.BatchTx
txnID int64 // tracks the current txnID to verify txn operations
txnModify bool
// bytesBuf8 is a byte slice of length 8
// to avoid a repetitive allocation in saveIndex.
bytesBuf8 []byte
changes []mvccpb.KeyValue
fifoSched schedule.Scheduler
stopc chan struct{}
@@ -98,17 +105,18 @@ func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *sto
le: le,
currentRev: revision{main: 1},
currentRev: 1,
compactMainRev: -1,
bytesBuf8: make([]byte, 8, 8),
bytesBuf8: make([]byte, 8),
fifoSched: schedule.NewFIFOScheduler(),
stopc: make(chan struct{}),
}
s.ReadView = &readView{s}
s.WriteView = &writeView{s}
if s.le != nil {
s.le.SetRangeDeleter(s)
s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
}
tx := s.b.BatchTx()
@@ -126,140 +134,6 @@ func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *sto
return s
}
func (s *store) Rev() int64 {
s.mu.Lock()
defer s.mu.Unlock()
return s.currentRev.main
}
func (s *store) FirstRev() int64 {
s.mu.Lock()
defer s.mu.Unlock()
return s.compactMainRev
}
func (s *store) Put(key, value []byte, lease lease.LeaseID) int64 {
id := s.TxnBegin()
s.put(key, value, lease)
s.txnEnd(id)
putCounter.Inc()
return int64(s.currentRev.main)
}
func (s *store) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
id := s.TxnBegin()
kvs, count, rev, err := s.rangeKeys(key, end, ro.Limit, ro.Rev, ro.Count)
s.txnEnd(id)
rangeCounter.Inc()
r = &RangeResult{
KVs: kvs,
Count: count,
Rev: rev,
}
return r, err
}
func (s *store) DeleteRange(key, end []byte) (n, rev int64) {
id := s.TxnBegin()
n = s.deleteRange(key, end)
s.txnEnd(id)
deleteCounter.Inc()
return n, int64(s.currentRev.main)
}
func (s *store) TxnBegin() int64 {
s.mu.Lock()
s.currentRev.sub = 0
s.tx = s.b.BatchTx()
s.tx.Lock()
s.txnID = rand.Int63()
return s.txnID
}
func (s *store) TxnEnd(txnID int64) error {
err := s.txnEnd(txnID)
if err != nil {
return err
}
txnCounter.Inc()
return nil
}
// txnEnd is used for unlocking an internal txn. It does
// not increase the txnCounter.
func (s *store) txnEnd(txnID int64) error {
if txnID != s.txnID {
return ErrTxnIDMismatch
}
// only update index if the txn modifies the mvcc state.
// read only txn might execute with one write txn concurrently,
// it should not write its index to mvcc.
if s.txnModify {
s.saveIndex()
}
s.txnModify = false
s.tx.Unlock()
if s.currentRev.sub != 0 {
s.currentRev.main += 1
}
s.currentRev.sub = 0
dbTotalSize.Set(float64(s.b.Size()))
s.mu.Unlock()
return nil
}
func (s *store) TxnRange(txnID int64, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
if txnID != s.txnID {
return nil, ErrTxnIDMismatch
}
kvs, count, rev, err := s.rangeKeys(key, end, ro.Limit, ro.Rev, ro.Count)
r = &RangeResult{
KVs: kvs,
Count: count,
Rev: rev,
}
return r, err
}
func (s *store) TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) {
if txnID != s.txnID {
return 0, ErrTxnIDMismatch
}
s.put(key, value, lease)
return int64(s.currentRev.main + 1), nil
}
func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
if txnID != s.txnID {
return 0, 0, ErrTxnIDMismatch
}
n = s.deleteRange(key, end)
if n != 0 || s.currentRev.sub != 0 {
rev = int64(s.currentRev.main + 1)
} else {
rev = int64(s.currentRev.main)
}
return n, rev, nil
}
func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
if ctx == nil || ctx.Err() != nil {
s.mu.Lock()
@@ -275,16 +149,25 @@ func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
close(ch)
}
func (s *store) Hash() (hash uint32, revision int64, err error) {
s.b.ForceCommit()
h, err := s.b.Hash(DefaultIgnores)
return h, s.currentRev, err
}
func (s *store) Compact(rev int64) (<-chan struct{}, error) {
s.mu.Lock()
defer s.mu.Unlock()
s.revMu.Lock()
defer s.revMu.Unlock()
if rev <= s.compactMainRev {
ch := make(chan struct{})
f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
s.fifoSched.Schedule(f)
return ch, ErrCompacted
}
if rev > s.currentRev.main {
if rev > s.currentRev {
return nil, ErrFutureRev
}
@@ -333,24 +216,14 @@ func init() {
}
}
func (s *store) Hash() (uint32, int64, error) {
s.mu.Lock()
defer s.mu.Unlock()
s.b.ForceCommit()
h, err := s.b.Hash(DefaultIgnores)
rev := s.currentRev.main
return h, rev, err
}
func (s *store) Commit() {
s.mu.Lock()
defer s.mu.Unlock()
s.tx = s.b.BatchTx()
s.tx.Lock()
s.saveIndex()
s.tx.Unlock()
tx := s.b.BatchTx()
tx.Lock()
s.saveIndex(tx)
tx.Unlock()
s.b.ForceCommit()
}
@@ -363,10 +236,8 @@ func (s *store) Restore(b backend.Backend) error {
s.b = b
s.kvindex = newTreeIndex()
s.currentRev = revision{main: 1}
s.currentRev = 1
s.compactMainRev = -1
s.tx = b.BatchTx()
s.txnID = -1
s.fifoSched = schedule.NewFIFOScheduler()
s.stopc = make(chan struct{})
@@ -374,75 +245,63 @@ func (s *store) Restore(b backend.Backend) error {
}
func (s *store) restore() error {
reportDbTotalSizeInBytesMu.Lock()
b := s.b
reportDbTotalSizeInBytes = func() float64 { return float64(b.Size()) }
reportDbTotalSizeInBytesMu.Unlock()
min, max := newRevBytes(), newRevBytes()
revToBytes(revision{main: 1}, min)
revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)
keyToLease := make(map[string]lease.LeaseID)
// use an unordered map to hold the temp index data to speed up
// the initial key index recovery.
// we will convert this unordered map into the tree index later.
unordered := make(map[string]*keyIndex, 100000)
// restore index
tx := s.b.BatchTx()
tx.Lock()
_, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
if len(finishedCompactBytes) != 0 {
s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
plog.Printf("restore compact to %d", s.compactMainRev)
}
// TODO: limit N to reduce max memory usage
keys, vals := tx.UnsafeRange(keyBucketName, min, max, 0)
for i, key := range keys {
var kv mvccpb.KeyValue
if err := kv.Unmarshal(vals[i]); err != nil {
plog.Fatalf("cannot unmarshal event: %v", err)
}
rev := bytesToRev(key[:revBytesLen])
// restore index
switch {
case isTombstone(key):
if ki, ok := unordered[string(kv.Key)]; ok {
ki.tombstone(rev.main, rev.sub)
}
delete(keyToLease, string(kv.Key))
default:
ki, ok := unordered[string(kv.Key)]
if ok {
ki.put(rev.main, rev.sub)
} else {
ki = &keyIndex{key: kv.Key}
ki.restore(revision{kv.CreateRevision, 0}, rev, kv.Version)
unordered[string(kv.Key)] = ki
}
if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
keyToLease[string(kv.Key)] = lid
} else {
delete(keyToLease, string(kv.Key))
}
}
// update revision
s.currentRev = rev
_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
scheduledCompact := int64(0)
if len(scheduledCompactBytes) != 0 {
scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
}
// restore the tree index from the unordered index.
for _, v := range unordered {
s.kvindex.Insert(v)
// index keys concurrently as they're loaded in from tx
keysGauge.Set(0)
rkvc, revc := restoreIntoIndex(s.kvindex)
for {
keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
if len(keys) == 0 {
break
}
// rkvc blocks if the total pending keys exceeds the restore
// chunk size to keep keys from consuming too much memory.
restoreChunk(rkvc, keys, vals, keyToLease)
if len(keys) < restoreChunkKeys {
// partial set implies final set
break
}
// next set begins after where this one ended
newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
newMin.sub++
revToBytes(newMin, min)
}
close(rkvc)
s.currentRev = <-revc
// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
// the correct revision should be set to compaction revision in the case, not the largest revision
// we have seen.
if s.currentRev.main < s.compactMainRev {
s.currentRev.main = s.compactMainRev
if s.currentRev < s.compactMainRev {
s.currentRev = s.compactMainRev
}
if scheduledCompact <= s.compactMainRev {
scheduledCompact = 0
}
for key, lid := range keyToLease {
@@ -455,15 +314,6 @@ func (s *store) restore() error {
}
}
_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
scheduledCompact := int64(0)
if len(scheduledCompactBytes) != 0 {
scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
if scheduledCompact <= s.compactMainRev {
scheduledCompact = 0
}
}
tx.Unlock()
if scheduledCompact != 0 {
@@ -474,6 +324,75 @@ func (s *store) restore() error {
return nil
}
type revKeyValue struct {
key []byte
kv mvccpb.KeyValue
kstr string
}
func restoreIntoIndex(idx index) (chan<- revKeyValue, <-chan int64) {
rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1)
go func() {
currentRev := int64(1)
defer func() { revc <- currentRev }()
// restore the tree index from streaming the unordered index.
kiCache := make(map[string]*keyIndex, restoreChunkKeys)
for rkv := range rkvc {
ki, ok := kiCache[rkv.kstr]
// purge kiCache if many keys but still missing in the cache
if !ok && len(kiCache) >= restoreChunkKeys {
i := 10
for k := range kiCache {
delete(kiCache, k)
if i--; i == 0 {
break
}
}
}
// cache miss, fetch from tree index if there
if !ok {
ki = &keyIndex{key: rkv.kv.Key}
if idxKey := idx.KeyIndex(ki); idxKey != nil {
kiCache[rkv.kstr], ki = idxKey, idxKey
ok = true
}
}
rev := bytesToRev(rkv.key)
currentRev = rev.main
if ok {
if isTombstone(rkv.key) {
ki.tombstone(rev.main, rev.sub)
continue
}
ki.put(rev.main, rev.sub)
} else if !isTombstone(rkv.key) {
ki.restore(revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
idx.Insert(ki)
kiCache[rkv.kstr] = ki
}
}
}()
return rkvc, revc
}
func restoreChunk(kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
for i, key := range keys {
rkv := revKeyValue{key: key}
if err := rkv.kv.Unmarshal(vals[i]); err != nil {
plog.Fatalf("cannot unmarshal event: %v", err)
}
rkv.kstr = string(rkv.kv.Key)
if isTombstone(key) {
delete(keyToLease, rkv.kstr)
} else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease {
keyToLease[rkv.kstr] = lid
} else {
delete(keyToLease, rkv.kstr)
}
kvc <- rkv
}
}
func (s *store) Close() error {
close(s.stopc)
s.fifoSched.Stop()
@@ -490,180 +409,10 @@ func (a *store) Equal(b *store) bool {
return a.kvindex.Equal(b.kvindex)
}
// range is a keyword in Go, add Keys suffix.
func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64, countOnly bool) (kvs []mvccpb.KeyValue, count int, curRev int64, err error) {
curRev = int64(s.currentRev.main)
if s.currentRev.sub > 0 {
curRev += 1
}
if rangeRev > curRev {
return nil, -1, s.currentRev.main, ErrFutureRev
}
var rev int64
if rangeRev <= 0 {
rev = curRev
} else {
rev = rangeRev
}
if rev < s.compactMainRev {
return nil, -1, 0, ErrCompacted
}
_, revpairs := s.kvindex.Range(key, end, int64(rev))
if len(revpairs) == 0 {
return nil, 0, curRev, nil
}
if countOnly {
return nil, len(revpairs), curRev, nil
}
for _, revpair := range revpairs {
start, end := revBytesRange(revpair)
_, vs := s.tx.UnsafeRange(keyBucketName, start, end, 0)
if len(vs) != 1 {
plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub)
}
var kv mvccpb.KeyValue
if err := kv.Unmarshal(vs[0]); err != nil {
plog.Fatalf("cannot unmarshal event: %v", err)
}
kvs = append(kvs, kv)
if limit > 0 && len(kvs) >= int(limit) {
break
}
}
return kvs, len(revpairs), curRev, nil
}
func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
s.txnModify = true
rev := s.currentRev.main + 1
c := rev
oldLease := lease.NoLease
// if the key exists before, use its previous created and
// get its previous leaseID
_, created, ver, err := s.kvindex.Get(key, rev)
if err == nil {
c = created.main
oldLease = s.le.GetLease(lease.LeaseItem{Key: string(key)})
}
ibytes := newRevBytes()
revToBytes(revision{main: rev, sub: s.currentRev.sub}, ibytes)
ver = ver + 1
kv := mvccpb.KeyValue{
Key: key,
Value: value,
CreateRevision: c,
ModRevision: rev,
Version: ver,
Lease: int64(leaseID),
}
d, err := kv.Marshal()
if err != nil {
plog.Fatalf("cannot marshal event: %v", err)
}
s.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
s.kvindex.Put(key, revision{main: rev, sub: s.currentRev.sub})
s.changes = append(s.changes, kv)
s.currentRev.sub += 1
if oldLease != lease.NoLease {
if s.le == nil {
panic("no lessor to detach lease")
}
err = s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
if err != nil {
plog.Errorf("unexpected error from lease detach: %v", err)
}
}
if leaseID != lease.NoLease {
if s.le == nil {
panic("no lessor to attach lease")
}
err = s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
if err != nil {
panic("unexpected error from lease Attach")
}
}
}
func (s *store) deleteRange(key, end []byte) int64 {
s.txnModify = true
rrev := s.currentRev.main
if s.currentRev.sub > 0 {
rrev += 1
}
keys, revs := s.kvindex.Range(key, end, rrev)
if len(keys) == 0 {
return 0
}
for i, key := range keys {
s.delete(key, revs[i])
}
return int64(len(keys))
}
func (s *store) delete(key []byte, rev revision) {
mainrev := s.currentRev.main + 1
ibytes := newRevBytes()
revToBytes(revision{main: mainrev, sub: s.currentRev.sub}, ibytes)
ibytes = appendMarkTombstone(ibytes)
kv := mvccpb.KeyValue{
Key: key,
}
d, err := kv.Marshal()
if err != nil {
plog.Fatalf("cannot marshal event: %v", err)
}
s.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
err = s.kvindex.Tombstone(key, revision{main: mainrev, sub: s.currentRev.sub})
if err != nil {
plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err)
}
s.changes = append(s.changes, kv)
s.currentRev.sub += 1
item := lease.LeaseItem{Key: string(key)}
leaseID := s.le.GetLease(item)
if leaseID != lease.NoLease {
err = s.le.Detach(leaseID, []lease.LeaseItem{item})
if err != nil {
plog.Errorf("cannot detach %v", err)
}
}
}
func (s *store) getChanges() []mvccpb.KeyValue {
changes := s.changes
s.changes = make([]mvccpb.KeyValue, 0, 4)
return changes
}
func (s *store) saveIndex() {
func (s *store) saveIndex(tx backend.BatchTx) {
if s.ig == nil {
return
}
tx := s.tx
bs := s.bytesBuf8
binary.BigEndian.PutUint64(bs, s.ig.ConsistentIndex())
// put the index into the underlying backend

253
vendor/github.com/coreos/etcd/mvcc/kvstore_txn.go generated vendored Normal file
View File

@@ -0,0 +1,253 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/mvcc/mvccpb"
)
type storeTxnRead struct {
s *store
tx backend.ReadTx
firstRev int64
rev int64
}
func (s *store) Read() TxnRead {
s.mu.RLock()
tx := s.b.ReadTx()
s.revMu.RLock()
tx.Lock()
firstRev, rev := s.compactMainRev, s.currentRev
s.revMu.RUnlock()
return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
}
func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev }
func (tr *storeTxnRead) Rev() int64 { return tr.rev }
func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
return tr.rangeKeys(key, end, tr.Rev(), ro)
}
func (tr *storeTxnRead) End() {
tr.tx.Unlock()
tr.s.mu.RUnlock()
}
type storeTxnWrite struct {
*storeTxnRead
tx backend.BatchTx
// beginRev is the revision where the txn begins; it will write to the next revision.
beginRev int64
changes []mvccpb.KeyValue
}
func (s *store) Write() TxnWrite {
s.mu.RLock()
tx := s.b.BatchTx()
tx.Lock()
tw := &storeTxnWrite{
storeTxnRead: &storeTxnRead{s, tx, 0, 0},
tx: tx,
beginRev: s.currentRev,
changes: make([]mvccpb.KeyValue, 0, 4),
}
return newMetricsTxnWrite(tw)
}
func (tw *storeTxnWrite) Rev() int64 { return tw.beginRev }
func (tw *storeTxnWrite) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
rev := tw.beginRev
if len(tw.changes) > 0 {
rev++
}
return tw.rangeKeys(key, end, rev, ro)
}
func (tw *storeTxnWrite) DeleteRange(key, end []byte) (int64, int64) {
if n := tw.deleteRange(key, end); n != 0 || len(tw.changes) > 0 {
return n, int64(tw.beginRev + 1)
}
return 0, int64(tw.beginRev)
}
func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
tw.put(key, value, lease)
return int64(tw.beginRev + 1)
}
func (tw *storeTxnWrite) End() {
// only update index if the txn modifies the mvcc state.
if len(tw.changes) != 0 {
tw.s.saveIndex(tw.tx)
// hold revMu lock to prevent new read txns from opening until writeback.
tw.s.revMu.Lock()
tw.s.currentRev++
}
tw.tx.Unlock()
if len(tw.changes) != 0 {
tw.s.revMu.Unlock()
}
tw.s.mu.RUnlock()
}
func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
rev := ro.Rev
if rev > curRev {
return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
}
if rev <= 0 {
rev = curRev
}
if rev < tr.s.compactMainRev {
return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
}
_, revpairs := tr.s.kvindex.Range(key, end, int64(rev))
if len(revpairs) == 0 {
return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
}
if ro.Count {
return &RangeResult{KVs: nil, Count: len(revpairs), Rev: curRev}, nil
}
var kvs []mvccpb.KeyValue
for _, revpair := range revpairs {
start, end := revBytesRange(revpair)
_, vs := tr.tx.UnsafeRange(keyBucketName, start, end, 0)
if len(vs) != 1 {
plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub)
}
var kv mvccpb.KeyValue
if err := kv.Unmarshal(vs[0]); err != nil {
plog.Fatalf("cannot unmarshal event: %v", err)
}
kvs = append(kvs, kv)
if ro.Limit > 0 && len(kvs) >= int(ro.Limit) {
break
}
}
return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
}
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
rev := tw.beginRev + 1
c := rev
oldLease := lease.NoLease
// if the key exists before, use its previous created and
// get its previous leaseID
_, created, ver, err := tw.s.kvindex.Get(key, rev)
if err == nil {
c = created.main
oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
}
ibytes := newRevBytes()
idxRev := revision{main: rev, sub: int64(len(tw.changes))}
revToBytes(idxRev, ibytes)
ver = ver + 1
kv := mvccpb.KeyValue{
Key: key,
Value: value,
CreateRevision: c,
ModRevision: rev,
Version: ver,
Lease: int64(leaseID),
}
d, err := kv.Marshal()
if err != nil {
plog.Fatalf("cannot marshal event: %v", err)
}
tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
tw.s.kvindex.Put(key, idxRev)
tw.changes = append(tw.changes, kv)
if oldLease != lease.NoLease {
if tw.s.le == nil {
panic("no lessor to detach lease")
}
err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
if err != nil {
plog.Errorf("unexpected error from lease detach: %v", err)
}
}
if leaseID != lease.NoLease {
if tw.s.le == nil {
panic("no lessor to attach lease")
}
err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
if err != nil {
panic("unexpected error from lease Attach")
}
}
}
func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {
rrev := tw.beginRev
if len(tw.changes) > 0 {
rrev += 1
}
keys, revs := tw.s.kvindex.Range(key, end, rrev)
if len(keys) == 0 {
return 0
}
for i, key := range keys {
tw.delete(key, revs[i])
}
return int64(len(keys))
}
func (tw *storeTxnWrite) delete(key []byte, rev revision) {
ibytes := newRevBytes()
idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
revToBytes(idxRev, ibytes)
ibytes = appendMarkTombstone(ibytes)
kv := mvccpb.KeyValue{Key: key}
d, err := kv.Marshal()
if err != nil {
plog.Fatalf("cannot marshal event: %v", err)
}
tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
err = tw.s.kvindex.Tombstone(key, idxRev)
if err != nil {
plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err)
}
tw.changes = append(tw.changes, kv)
item := lease.LeaseItem{Key: string(key)}
leaseID := tw.s.le.GetLease(item)
if leaseID != lease.NoLease {
err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item})
if err != nil {
plog.Errorf("cannot detach %v", err)
}
}
}
func (tw *storeTxnWrite) Changes() []mvccpb.KeyValue { return tw.changes }

View File

@@ -15,6 +15,8 @@
package mvcc
import (
"sync"
"github.com/prometheus/client_golang/prometheus"
)
@@ -129,12 +131,21 @@ var (
Buckets: prometheus.ExponentialBuckets(100, 2, 14),
})
dbTotalSize = prometheus.NewGauge(prometheus.GaugeOpts{
dbTotalSize = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "etcd_debugging",
Subsystem: "mvcc",
Name: "db_total_size_in_bytes",
Help: "Total size of the underlying database in bytes.",
})
},
func() float64 {
reportDbTotalSizeInBytesMu.RLock()
defer reportDbTotalSizeInBytesMu.RUnlock()
return reportDbTotalSizeInBytes()
},
)
// overridden by mvcc initialization
reportDbTotalSizeInBytesMu sync.RWMutex
reportDbTotalSizeInBytes func() float64 = func() float64 { return 0 }
)
func init() {

67
vendor/github.com/coreos/etcd/mvcc/metrics_txn.go generated vendored Normal file
View File

@@ -0,0 +1,67 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"github.com/coreos/etcd/lease"
)
type metricsTxnWrite struct {
TxnWrite
ranges uint
puts uint
deletes uint
}
func newMetricsTxnRead(tr TxnRead) TxnRead {
return &metricsTxnWrite{&txnReadWrite{tr}, 0, 0, 0}
}
func newMetricsTxnWrite(tw TxnWrite) TxnWrite {
return &metricsTxnWrite{tw, 0, 0, 0}
}
func (tw *metricsTxnWrite) Range(key, end []byte, ro RangeOptions) (*RangeResult, error) {
tw.ranges++
return tw.TxnWrite.Range(key, end, ro)
}
func (tw *metricsTxnWrite) DeleteRange(key, end []byte) (n, rev int64) {
tw.deletes++
return tw.TxnWrite.DeleteRange(key, end)
}
func (tw *metricsTxnWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
tw.puts++
return tw.TxnWrite.Put(key, value, lease)
}
func (tw *metricsTxnWrite) End() {
defer tw.TxnWrite.End()
if sum := tw.ranges + tw.puts + tw.deletes; sum != 1 {
if sum > 1 {
txnCounter.Inc()
}
return
}
switch {
case tw.ranges == 1:
rangeCounter.Inc()
case tw.puts == 1:
putCounter.Inc()
case tw.deletes == 1:
deleteCounter.Inc()
}
}

View File

@@ -713,7 +713,7 @@ func init() { proto.RegisterFile("kv.proto", fileDescriptorKv) }
var fileDescriptorKv = []byte{
// 303 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x6c, 0x90, 0x41, 0x4e, 0xc2, 0x40,
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x90, 0x41, 0x4e, 0xc2, 0x40,
0x14, 0x86, 0x3b, 0x14, 0x0a, 0x3e, 0x08, 0x36, 0x13, 0x12, 0x27, 0x2e, 0x26, 0x95, 0x8d, 0x18,
0x13, 0x4c, 0xf0, 0x06, 0xc6, 0xae, 0x70, 0x61, 0x1a, 0x74, 0x4b, 0x4a, 0x79, 0x21, 0xa4, 0x94,
0x69, 0x4a, 0x9d, 0xa4, 0x37, 0x71, 0xef, 0xde, 0x73, 0xb0, 0xe4, 0x08, 0x52, 0x2f, 0x62, 0xfa,

View File

@@ -41,10 +41,12 @@ type watchable interface {
}
type watchableStore struct {
mu sync.Mutex
*store
// mu protects watcher groups and batches. It should never be locked
// before locking store.mu to avoid deadlock.
mu sync.RWMutex
// victims are watcher batches that were blocked on the watch channel
victims []watcherBatch
victimc chan struct{}
@@ -76,9 +78,11 @@ func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGet
synced: newWatcherGroup(),
stopc: make(chan struct{}),
}
s.store.ReadView = &readView{s}
s.store.WriteView = &writeView{s}
if s.le != nil {
// use this store as the deleter so revokes trigger watch events
s.le.SetRangeDeleter(s)
s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
}
s.wg.Add(2)
go s.syncWatchersLoop()
@@ -86,89 +90,6 @@ func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGet
return s
}
func (s *watchableStore) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
s.mu.Lock()
defer s.mu.Unlock()
rev = s.store.Put(key, value, lease)
changes := s.store.getChanges()
if len(changes) != 1 {
plog.Panicf("unexpected len(changes) != 1 after put")
}
ev := mvccpb.Event{
Type: mvccpb.PUT,
Kv: &changes[0],
}
s.notify(rev, []mvccpb.Event{ev})
return rev
}
func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) {
s.mu.Lock()
defer s.mu.Unlock()
n, rev = s.store.DeleteRange(key, end)
changes := s.store.getChanges()
if len(changes) != int(n) {
plog.Panicf("unexpected len(changes) != n after deleteRange")
}
if n == 0 {
return n, rev
}
evs := make([]mvccpb.Event, n)
for i := range changes {
evs[i] = mvccpb.Event{
Type: mvccpb.DELETE,
Kv: &changes[i]}
evs[i].Kv.ModRevision = rev
}
s.notify(rev, evs)
return n, rev
}
func (s *watchableStore) TxnBegin() int64 {
s.mu.Lock()
return s.store.TxnBegin()
}
func (s *watchableStore) TxnEnd(txnID int64) error {
err := s.store.TxnEnd(txnID)
if err != nil {
return err
}
changes := s.getChanges()
if len(changes) == 0 {
s.mu.Unlock()
return nil
}
rev := s.store.Rev()
evs := make([]mvccpb.Event, len(changes))
for i, change := range changes {
switch change.CreateRevision {
case 0:
evs[i] = mvccpb.Event{
Type: mvccpb.DELETE,
Kv: &changes[i]}
evs[i].Kv.ModRevision = rev
default:
evs[i] = mvccpb.Event{
Type: mvccpb.PUT,
Kv: &changes[i]}
}
}
s.notify(rev, evs)
s.mu.Unlock()
return nil
}
func (s *watchableStore) Close() error {
close(s.stopc)
s.wg.Wait()
@@ -186,9 +107,6 @@ func (s *watchableStore) NewWatchStream() WatchStream {
}
func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
s.mu.Lock()
defer s.mu.Unlock()
wa := &watcher{
key: key,
end: end,
@@ -198,21 +116,24 @@ func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch c
fcs: fcs,
}
s.store.mu.Lock()
synced := startRev > s.store.currentRev.main || startRev == 0
s.mu.Lock()
s.revMu.RLock()
synced := startRev > s.store.currentRev || startRev == 0
if synced {
wa.minRev = s.store.currentRev.main + 1
wa.minRev = s.store.currentRev + 1
if startRev > wa.minRev {
wa.minRev = startRev
}
}
s.store.mu.Unlock()
if synced {
s.synced.add(wa)
} else {
slowWatcherGauge.Inc()
s.unsynced.add(wa)
}
s.revMu.RUnlock()
s.mu.Unlock()
watcherGauge.Inc()
return wa, func() { s.cancelWatcher(wa) }
@@ -258,17 +179,35 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
s.mu.Unlock()
}
func (s *watchableStore) Restore(b backend.Backend) error {
s.mu.Lock()
defer s.mu.Unlock()
err := s.store.Restore(b)
if err != nil {
return err
}
for wa := range s.synced.watchers {
s.unsynced.watchers.add(wa)
}
s.synced = newWatcherGroup()
return nil
}
// syncWatchersLoop syncs the watcher in the unsynced map every 100ms.
func (s *watchableStore) syncWatchersLoop() {
defer s.wg.Done()
for {
s.mu.Lock()
s.mu.RLock()
st := time.Now()
lastUnsyncedWatchers := s.unsynced.size()
s.syncWatchers()
unsyncedWatchers := s.unsynced.size()
s.mu.Unlock()
s.mu.RUnlock()
unsyncedWatchers := 0
if lastUnsyncedWatchers > 0 {
unsyncedWatchers = s.syncWatchers()
}
syncDuration := time.Since(st)
waitDuration := 100 * time.Millisecond
@@ -295,9 +234,9 @@ func (s *watchableStore) syncVictimsLoop() {
for s.moveVictims() != 0 {
// try to update all victim watchers
}
s.mu.Lock()
s.mu.RLock()
isEmpty := len(s.victims) == 0
s.mu.Unlock()
s.mu.RUnlock()
var tickc <-chan time.Time
if !isEmpty {
@@ -340,8 +279,8 @@ func (s *watchableStore) moveVictims() (moved int) {
// assign completed victim watchers to unsync/sync
s.mu.Lock()
s.store.mu.Lock()
curRev := s.store.currentRev.main
s.store.revMu.RLock()
curRev := s.store.currentRev
for w, eb := range wb {
if newVictim != nil && newVictim[w] != nil {
// couldn't send watch response; stays victim
@@ -358,7 +297,7 @@ func (s *watchableStore) moveVictims() (moved int) {
s.synced.add(w)
}
}
s.store.mu.Unlock()
s.store.revMu.RUnlock()
s.mu.Unlock()
}
@@ -376,19 +315,23 @@ func (s *watchableStore) moveVictims() (moved int) {
// 2. iterate over the set to get the minimum revision and remove compacted watchers
// 3. use minimum revision to get all key-value pairs and send those events to watchers
// 4. remove synced watchers in set from unsynced group and move to synced group
func (s *watchableStore) syncWatchers() {
func (s *watchableStore) syncWatchers() int {
s.mu.Lock()
defer s.mu.Unlock()
if s.unsynced.size() == 0 {
return
return 0
}
s.store.mu.Lock()
defer s.store.mu.Unlock()
s.store.revMu.RLock()
defer s.store.revMu.RUnlock()
// in order to find key-value pairs from unsynced watchers, we need to
// find min revision index, and these revisions can be used to
// query the backend store of key-value pairs
curRev := s.store.currentRev.main
curRev := s.store.currentRev
compactionRev := s.store.compactMainRev
wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
minBytes, maxBytes := newRevBytes(), newRevBytes()
revToBytes(revision{main: minRev}, minBytes)
@@ -396,7 +339,7 @@ func (s *watchableStore) syncWatchers() {
// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
// values are actual key-value pairs in backend.
tx := s.store.b.BatchTx()
tx := s.store.b.ReadTx()
tx.Lock()
revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
evs := kvsToEvents(wg, revs, vs)
@@ -446,6 +389,8 @@ func (s *watchableStore) syncWatchers() {
vsz += len(v)
}
slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))
return s.unsynced.size()
}
// kvsToEvents gets all events for the watchers from all key-value pairs
@@ -511,8 +456,8 @@ func (s *watchableStore) addVictim(victim watcherBatch) {
func (s *watchableStore) rev() int64 { return s.store.Rev() }
func (s *watchableStore) progress(w *watcher) {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.RLock()
defer s.mu.RUnlock()
if _, ok := s.synced.watchers[w]; ok {
w.send(WatchResponse{WatchID: w.id, Revision: s.rev()})

View File

@@ -0,0 +1,53 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"github.com/coreos/etcd/mvcc/mvccpb"
)
func (tw *watchableStoreTxnWrite) End() {
changes := tw.Changes()
if len(changes) == 0 {
tw.TxnWrite.End()
return
}
rev := tw.Rev() + 1
evs := make([]mvccpb.Event, len(changes))
for i, change := range changes {
evs[i].Kv = &changes[i]
if change.CreateRevision == 0 {
evs[i].Type = mvccpb.DELETE
evs[i].Kv.ModRevision = rev
} else {
evs[i].Type = mvccpb.PUT
}
}
// end write txn under watchable store lock so the updates are visible
// when asynchronous event posting checks the current store revision
tw.s.mu.Lock()
tw.s.notify(rev, evs)
tw.TxnWrite.End()
tw.s.mu.Unlock()
}
type watchableStoreTxnWrite struct {
TxnWrite
s *watchableStore
}
func (s *watchableStore) Write() TxnWrite { return &watchableStoreTxnWrite{s.store.Write(), s} }

View File

@@ -183,7 +183,7 @@ func (wg *watcherGroup) add(wa *watcher) {
// contains is whether the given key has a watcher in the group.
func (wg *watcherGroup) contains(key string) bool {
_, ok := wg.keyWatchers[key]
return ok || wg.ranges.Contains(adt.NewStringAffinePoint(key))
return ok || wg.ranges.Intersects(adt.NewStringAffinePoint(key))
}
// size gives the number of unique watchers in the group.