Revert "Version bump to etcd v3.2.11, grpc v1.7.5"
This commit is contained in:

committed by
GitHub

parent
0d42e742da
commit
4e8526dc6b
4
vendor/github.com/coreos/etcd/mvcc/BUILD
generated
vendored
4
vendor/github.com/coreos/etcd/mvcc/BUILD
generated
vendored
@@ -7,16 +7,12 @@ go_library(
|
||||
"index.go",
|
||||
"key_index.go",
|
||||
"kv.go",
|
||||
"kv_view.go",
|
||||
"kvstore.go",
|
||||
"kvstore_compaction.go",
|
||||
"kvstore_txn.go",
|
||||
"metrics.go",
|
||||
"metrics_txn.go",
|
||||
"revision.go",
|
||||
"util.go",
|
||||
"watchable_store.go",
|
||||
"watchable_store_txn.go",
|
||||
"watcher.go",
|
||||
"watcher_group.go",
|
||||
],
|
||||
|
11
vendor/github.com/coreos/etcd/mvcc/backend/BUILD
generated
vendored
11
vendor/github.com/coreos/etcd/mvcc/backend/BUILD
generated
vendored
@@ -5,24 +5,19 @@ go_library(
|
||||
srcs = [
|
||||
"backend.go",
|
||||
"batch_tx.go",
|
||||
"config_default.go",
|
||||
"boltoption_default.go",
|
||||
"doc.go",
|
||||
"metrics.go",
|
||||
"read_tx.go",
|
||||
"tx_buffer.go",
|
||||
] + select({
|
||||
"@io_bazel_rules_go//go/platform:linux_amd64": [
|
||||
"config_linux.go",
|
||||
],
|
||||
"@io_bazel_rules_go//go/platform:windows_amd64": [
|
||||
"config_windows.go",
|
||||
"boltoption_linux.go",
|
||||
],
|
||||
"//conditions:default": [],
|
||||
}),
|
||||
importpath = "github.com/coreos/etcd/mvcc/backend",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//vendor/github.com/coreos/bbolt:go_default_library",
|
||||
"//vendor/github.com/boltdb/bolt:go_default_library",
|
||||
"//vendor/github.com/coreos/pkg/capnslog:go_default_library",
|
||||
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
|
||||
],
|
||||
|
141
vendor/github.com/coreos/etcd/mvcc/backend/backend.go
generated
vendored
141
vendor/github.com/coreos/etcd/mvcc/backend/backend.go
generated
vendored
@@ -25,7 +25,7 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
bolt "github.com/coreos/bbolt"
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
)
|
||||
|
||||
@@ -35,21 +35,25 @@ var (
|
||||
|
||||
defragLimit = 10000
|
||||
|
||||
// initialMmapSize is the initial size of the mmapped region. Setting this larger than
|
||||
// InitialMmapSize is the initial size of the mmapped region. Setting this larger than
|
||||
// the potential max db size can prevent writer from blocking reader.
|
||||
// This only works for linux.
|
||||
initialMmapSize = uint64(10 * 1024 * 1024 * 1024)
|
||||
InitialMmapSize = int64(10 * 1024 * 1024 * 1024)
|
||||
|
||||
plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc/backend")
|
||||
)
|
||||
|
||||
// minSnapshotWarningTimeout is the minimum threshold to trigger a long running snapshot warning.
|
||||
minSnapshotWarningTimeout = time.Duration(30 * time.Second)
|
||||
const (
|
||||
// DefaultQuotaBytes is the number of bytes the backend Size may
|
||||
// consume before exceeding the space quota.
|
||||
DefaultQuotaBytes = int64(2 * 1024 * 1024 * 1024) // 2GB
|
||||
// MaxQuotaBytes is the maximum number of bytes suggested for a backend
|
||||
// quota. A larger quota may lead to degraded performance.
|
||||
MaxQuotaBytes = int64(8 * 1024 * 1024 * 1024) // 8GB
|
||||
)
|
||||
|
||||
type Backend interface {
|
||||
ReadTx() ReadTx
|
||||
BatchTx() BatchTx
|
||||
|
||||
Snapshot() Snapshot
|
||||
Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
|
||||
// Size returns the current size of the backend.
|
||||
@@ -82,71 +86,36 @@ type backend struct {
|
||||
|
||||
batchInterval time.Duration
|
||||
batchLimit int
|
||||
batchTx *batchTxBuffered
|
||||
|
||||
readTx *readTx
|
||||
batchTx *batchTx
|
||||
|
||||
stopc chan struct{}
|
||||
donec chan struct{}
|
||||
}
|
||||
|
||||
type BackendConfig struct {
|
||||
// Path is the file path to the backend file.
|
||||
Path string
|
||||
// BatchInterval is the maximum time before flushing the BatchTx.
|
||||
BatchInterval time.Duration
|
||||
// BatchLimit is the maximum puts before flushing the BatchTx.
|
||||
BatchLimit int
|
||||
// MmapSize is the number of bytes to mmap for the backend.
|
||||
MmapSize uint64
|
||||
}
|
||||
|
||||
func DefaultBackendConfig() BackendConfig {
|
||||
return BackendConfig{
|
||||
BatchInterval: defaultBatchInterval,
|
||||
BatchLimit: defaultBatchLimit,
|
||||
MmapSize: initialMmapSize,
|
||||
}
|
||||
}
|
||||
|
||||
func New(bcfg BackendConfig) Backend {
|
||||
return newBackend(bcfg)
|
||||
func New(path string, d time.Duration, limit int) Backend {
|
||||
return newBackend(path, d, limit)
|
||||
}
|
||||
|
||||
func NewDefaultBackend(path string) Backend {
|
||||
bcfg := DefaultBackendConfig()
|
||||
bcfg.Path = path
|
||||
return newBackend(bcfg)
|
||||
return newBackend(path, defaultBatchInterval, defaultBatchLimit)
|
||||
}
|
||||
|
||||
func newBackend(bcfg BackendConfig) *backend {
|
||||
bopts := &bolt.Options{}
|
||||
if boltOpenOptions != nil {
|
||||
*bopts = *boltOpenOptions
|
||||
}
|
||||
bopts.InitialMmapSize = bcfg.mmapSize()
|
||||
|
||||
db, err := bolt.Open(bcfg.Path, 0600, bopts)
|
||||
func newBackend(path string, d time.Duration, limit int) *backend {
|
||||
db, err := bolt.Open(path, 0600, boltOpenOptions)
|
||||
if err != nil {
|
||||
plog.Panicf("cannot open database at %s (%v)", bcfg.Path, err)
|
||||
plog.Panicf("cannot open database at %s (%v)", path, err)
|
||||
}
|
||||
|
||||
// In future, may want to make buffering optional for low-concurrency systems
|
||||
// or dynamically swap between buffered/non-buffered depending on workload.
|
||||
b := &backend{
|
||||
db: db,
|
||||
|
||||
batchInterval: bcfg.BatchInterval,
|
||||
batchLimit: bcfg.BatchLimit,
|
||||
|
||||
readTx: &readTx{buf: txReadBuffer{
|
||||
txBuffer: txBuffer{make(map[string]*bucketBuffer)}},
|
||||
},
|
||||
batchInterval: d,
|
||||
batchLimit: limit,
|
||||
|
||||
stopc: make(chan struct{}),
|
||||
donec: make(chan struct{}),
|
||||
}
|
||||
b.batchTx = newBatchTxBuffered(b)
|
||||
b.batchTx = newBatchTx(b)
|
||||
go b.run()
|
||||
return b
|
||||
}
|
||||
@@ -158,8 +127,6 @@ func (b *backend) BatchTx() BatchTx {
|
||||
return b.batchTx
|
||||
}
|
||||
|
||||
func (b *backend) ReadTx() ReadTx { return b.readTx }
|
||||
|
||||
// ForceCommit forces the current batching tx to commit.
|
||||
func (b *backend) ForceCommit() {
|
||||
b.batchTx.Commit()
|
||||
@@ -174,33 +141,7 @@ func (b *backend) Snapshot() Snapshot {
|
||||
if err != nil {
|
||||
plog.Fatalf("cannot begin tx (%s)", err)
|
||||
}
|
||||
|
||||
stopc, donec := make(chan struct{}), make(chan struct{})
|
||||
dbBytes := tx.Size()
|
||||
go func() {
|
||||
defer close(donec)
|
||||
// sendRateBytes is based on transferring snapshot data over a 1 gigabit/s connection
|
||||
// assuming a min tcp throughput of 100MB/s.
|
||||
var sendRateBytes int64 = 100 * 1024 * 1014
|
||||
warningTimeout := time.Duration(int64((float64(dbBytes) / float64(sendRateBytes)) * float64(time.Second)))
|
||||
if warningTimeout < minSnapshotWarningTimeout {
|
||||
warningTimeout = minSnapshotWarningTimeout
|
||||
}
|
||||
start := time.Now()
|
||||
ticker := time.NewTicker(warningTimeout)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
plog.Warningf("snapshotting is taking more than %v seconds to finish transferring %v MB [started at %v]", time.Since(start).Seconds(), float64(dbBytes)/float64(1024*1014), start)
|
||||
case <-stopc:
|
||||
snapshotDurations.Observe(time.Since(start).Seconds())
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return &snapshot{tx, stopc, donec}
|
||||
return &snapshot{tx}
|
||||
}
|
||||
|
||||
type IgnoreKey struct {
|
||||
@@ -294,11 +235,7 @@ func (b *backend) defrag() error {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
// block concurrent read requests while resetting tx
|
||||
b.readTx.mu.Lock()
|
||||
defer b.readTx.mu.Unlock()
|
||||
|
||||
b.batchTx.unsafeCommit(true)
|
||||
b.batchTx.commit(true)
|
||||
b.batchTx.tx = nil
|
||||
|
||||
tmpdb, err := bolt.Open(b.db.Path()+".tmp", 0600, boltOpenOptions)
|
||||
@@ -339,10 +276,6 @@ func (b *backend) defrag() error {
|
||||
plog.Fatalf("cannot begin tx (%s)", err)
|
||||
}
|
||||
|
||||
b.readTx.buf.reset()
|
||||
b.readTx.tx = b.unsafeBegin(false)
|
||||
atomic.StoreInt64(&b.size, b.readTx.tx.Size())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -398,22 +331,6 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
|
||||
return tmptx.Commit()
|
||||
}
|
||||
|
||||
func (b *backend) begin(write bool) *bolt.Tx {
|
||||
b.mu.RLock()
|
||||
tx := b.unsafeBegin(write)
|
||||
b.mu.RUnlock()
|
||||
atomic.StoreInt64(&b.size, tx.Size())
|
||||
return tx
|
||||
}
|
||||
|
||||
func (b *backend) unsafeBegin(write bool) *bolt.Tx {
|
||||
tx, err := b.db.Begin(write)
|
||||
if err != nil {
|
||||
plog.Fatalf("cannot begin tx (%s)", err)
|
||||
}
|
||||
return tx
|
||||
}
|
||||
|
||||
// NewTmpBackend creates a backend implementation for testing.
|
||||
func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) {
|
||||
dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test")
|
||||
@@ -421,9 +338,7 @@ func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, strin
|
||||
plog.Fatal(err)
|
||||
}
|
||||
tmpPath := filepath.Join(dir, "database")
|
||||
bcfg := DefaultBackendConfig()
|
||||
bcfg.Path, bcfg.BatchInterval, bcfg.BatchLimit = tmpPath, batchInterval, batchLimit
|
||||
return newBackend(bcfg), tmpPath
|
||||
return newBackend(tmpPath, batchInterval, batchLimit), tmpPath
|
||||
}
|
||||
|
||||
func NewDefaultTmpBackend() (*backend, string) {
|
||||
@@ -432,12 +347,6 @@ func NewDefaultTmpBackend() (*backend, string) {
|
||||
|
||||
type snapshot struct {
|
||||
*bolt.Tx
|
||||
stopc chan struct{}
|
||||
donec chan struct{}
|
||||
}
|
||||
|
||||
func (s *snapshot) Close() error {
|
||||
close(s.stopc)
|
||||
<-s.donec
|
||||
return s.Tx.Rollback()
|
||||
}
|
||||
func (s *snapshot) Close() error { return s.Tx.Rollback() }
|
||||
|
168
vendor/github.com/coreos/etcd/mvcc/backend/batch_tx.go
generated
vendored
168
vendor/github.com/coreos/etcd/mvcc/backend/batch_tx.go
generated
vendored
@@ -16,24 +16,23 @@ package backend
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
bolt "github.com/coreos/bbolt"
|
||||
"github.com/boltdb/bolt"
|
||||
)
|
||||
|
||||
type BatchTx interface {
|
||||
ReadTx
|
||||
Lock()
|
||||
Unlock()
|
||||
UnsafeCreateBucket(name []byte)
|
||||
UnsafePut(bucketName []byte, key []byte, value []byte)
|
||||
UnsafeSeqPut(bucketName []byte, key []byte, value []byte)
|
||||
UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
|
||||
UnsafeDelete(bucketName []byte, key []byte)
|
||||
// Commit commits a previous tx and begins a new writable one.
|
||||
UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
|
||||
Commit()
|
||||
// CommitAndStop commits the previous tx and does not create a new one.
|
||||
CommitAndStop()
|
||||
}
|
||||
|
||||
@@ -41,10 +40,15 @@ type batchTx struct {
|
||||
sync.Mutex
|
||||
tx *bolt.Tx
|
||||
backend *backend
|
||||
|
||||
pending int
|
||||
}
|
||||
|
||||
func newBatchTx(backend *backend) *batchTx {
|
||||
tx := &batchTx{backend: backend}
|
||||
tx.Commit()
|
||||
return tx
|
||||
}
|
||||
|
||||
func (t *batchTx) UnsafeCreateBucket(name []byte) {
|
||||
_, err := t.tx.CreateBucket(name)
|
||||
if err != nil && err != bolt.ErrBucketExists {
|
||||
@@ -80,37 +84,30 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo
|
||||
}
|
||||
|
||||
// UnsafeRange must be called holding the lock on the tx.
|
||||
func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
|
||||
k, v, err := unsafeRange(t.tx, bucketName, key, endKey, limit)
|
||||
if err != nil {
|
||||
plog.Fatal(err)
|
||||
}
|
||||
return k, v
|
||||
}
|
||||
|
||||
func unsafeRange(tx *bolt.Tx, bucketName, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte, err error) {
|
||||
bucket := tx.Bucket(bucketName)
|
||||
func (t *batchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
|
||||
bucket := t.tx.Bucket(bucketName)
|
||||
if bucket == nil {
|
||||
return nil, nil, fmt.Errorf("bucket %s does not exist", bucketName)
|
||||
plog.Fatalf("bucket %s does not exist", bucketName)
|
||||
}
|
||||
|
||||
if len(endKey) == 0 {
|
||||
if v := bucket.Get(key); v != nil {
|
||||
return append(keys, key), append(vs, v), nil
|
||||
if v := bucket.Get(key); v == nil {
|
||||
return keys, vs
|
||||
} else {
|
||||
return append(keys, key), append(vs, v)
|
||||
}
|
||||
return nil, nil, nil
|
||||
}
|
||||
if limit <= 0 {
|
||||
limit = math.MaxInt64
|
||||
}
|
||||
|
||||
c := bucket.Cursor()
|
||||
for ck, cv := c.Seek(key); ck != nil && bytes.Compare(ck, endKey) < 0; ck, cv = c.Next() {
|
||||
vs = append(vs, cv)
|
||||
keys = append(keys, ck)
|
||||
if limit == int64(len(keys)) {
|
||||
if limit > 0 && limit == int64(len(keys)) {
|
||||
break
|
||||
}
|
||||
}
|
||||
return keys, vs, nil
|
||||
|
||||
return keys, vs
|
||||
}
|
||||
|
||||
// UnsafeDelete must be called holding the lock on the tx.
|
||||
@@ -128,14 +125,12 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
|
||||
|
||||
// UnsafeForEach must be called holding the lock on the tx.
|
||||
func (t *batchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
|
||||
return unsafeForEach(t.tx, bucketName, visitor)
|
||||
}
|
||||
|
||||
func unsafeForEach(tx *bolt.Tx, bucket []byte, visitor func(k, v []byte) error) error {
|
||||
if b := tx.Bucket(bucket); b != nil {
|
||||
return b.ForEach(visitor)
|
||||
b := t.tx.Bucket(bucketName)
|
||||
if b == nil {
|
||||
// bucket does not exist
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
return b.ForEach(visitor)
|
||||
}
|
||||
|
||||
// Commit commits a previous tx and begins a new writable one.
|
||||
@@ -145,7 +140,7 @@ func (t *batchTx) Commit() {
|
||||
t.commit(false)
|
||||
}
|
||||
|
||||
// CommitAndStop commits the previous tx and does not create a new one.
|
||||
// CommitAndStop commits the previous tx and do not create a new one.
|
||||
func (t *batchTx) CommitAndStop() {
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
@@ -155,28 +150,37 @@ func (t *batchTx) CommitAndStop() {
|
||||
func (t *batchTx) Unlock() {
|
||||
if t.pending >= t.backend.batchLimit {
|
||||
t.commit(false)
|
||||
t.pending = 0
|
||||
}
|
||||
t.Mutex.Unlock()
|
||||
}
|
||||
|
||||
func (t *batchTx) commit(stop bool) {
|
||||
var err error
|
||||
// commit the last tx
|
||||
if t.tx != nil {
|
||||
if t.pending == 0 && !stop {
|
||||
t.backend.mu.RLock()
|
||||
defer t.backend.mu.RUnlock()
|
||||
|
||||
// t.tx.DB()==nil if 'CommitAndStop' calls 'batchTx.commit(true)',
|
||||
// which initializes *bolt.Tx.db and *bolt.Tx.meta as nil; panics t.tx.Size().
|
||||
// Server must make sure 'batchTx.commit(false)' does not follow
|
||||
// 'batchTx.commit(true)' (e.g. stopping backend, and inflight Hash call).
|
||||
atomic.StoreInt64(&t.backend.size, t.tx.Size())
|
||||
// batchTx.commit(true) calls *bolt.Tx.Commit, which
|
||||
// initializes *bolt.Tx.db and *bolt.Tx.meta as nil,
|
||||
// and subsequent *bolt.Tx.Size() call panics.
|
||||
//
|
||||
// This nil pointer reference panic happens when:
|
||||
// 1. batchTx.commit(false) from newBatchTx
|
||||
// 2. batchTx.commit(true) from stopping backend
|
||||
// 3. batchTx.commit(false) from inflight mvcc Hash call
|
||||
//
|
||||
// Check if db is nil to prevent this panic
|
||||
if t.tx.DB() != nil {
|
||||
atomic.StoreInt64(&t.backend.size, t.tx.Size())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
// gofail: var beforeCommit struct{}
|
||||
err := t.tx.Commit()
|
||||
err = t.tx.Commit()
|
||||
// gofail: var afterCommit struct{}
|
||||
commitDurations.Observe(time.Since(start).Seconds())
|
||||
atomic.AddInt64(&t.backend.commits, 1)
|
||||
@@ -186,81 +190,17 @@ func (t *batchTx) commit(stop bool) {
|
||||
plog.Fatalf("cannot commit tx (%s)", err)
|
||||
}
|
||||
}
|
||||
if !stop {
|
||||
t.tx = t.backend.begin(true)
|
||||
}
|
||||
}
|
||||
|
||||
type batchTxBuffered struct {
|
||||
batchTx
|
||||
buf txWriteBuffer
|
||||
}
|
||||
|
||||
func newBatchTxBuffered(backend *backend) *batchTxBuffered {
|
||||
tx := &batchTxBuffered{
|
||||
batchTx: batchTx{backend: backend},
|
||||
buf: txWriteBuffer{
|
||||
txBuffer: txBuffer{make(map[string]*bucketBuffer)},
|
||||
seq: true,
|
||||
},
|
||||
}
|
||||
tx.Commit()
|
||||
return tx
|
||||
}
|
||||
|
||||
func (t *batchTxBuffered) Unlock() {
|
||||
if t.pending != 0 {
|
||||
t.backend.readTx.mu.Lock()
|
||||
t.buf.writeback(&t.backend.readTx.buf)
|
||||
t.backend.readTx.mu.Unlock()
|
||||
if t.pending >= t.backend.batchLimit {
|
||||
t.commit(false)
|
||||
}
|
||||
}
|
||||
t.batchTx.Unlock()
|
||||
}
|
||||
|
||||
func (t *batchTxBuffered) Commit() {
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
t.commit(false)
|
||||
}
|
||||
|
||||
func (t *batchTxBuffered) CommitAndStop() {
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
t.commit(true)
|
||||
}
|
||||
|
||||
func (t *batchTxBuffered) commit(stop bool) {
|
||||
// all read txs must be closed to acquire boltdb commit rwlock
|
||||
t.backend.readTx.mu.Lock()
|
||||
defer t.backend.readTx.mu.Unlock()
|
||||
t.unsafeCommit(stop)
|
||||
}
|
||||
|
||||
func (t *batchTxBuffered) unsafeCommit(stop bool) {
|
||||
if t.backend.readTx.tx != nil {
|
||||
if err := t.backend.readTx.tx.Rollback(); err != nil {
|
||||
plog.Fatalf("cannot rollback tx (%s)", err)
|
||||
}
|
||||
t.backend.readTx.buf.reset()
|
||||
t.backend.readTx.tx = nil
|
||||
if stop {
|
||||
return
|
||||
}
|
||||
|
||||
t.batchTx.commit(stop)
|
||||
|
||||
if !stop {
|
||||
t.backend.readTx.tx = t.backend.begin(false)
|
||||
t.backend.mu.RLock()
|
||||
defer t.backend.mu.RUnlock()
|
||||
// begin a new tx
|
||||
t.tx, err = t.backend.db.Begin(true)
|
||||
if err != nil {
|
||||
plog.Fatalf("cannot begin tx (%s)", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *batchTxBuffered) UnsafePut(bucketName []byte, key []byte, value []byte) {
|
||||
t.batchTx.UnsafePut(bucketName, key, value)
|
||||
t.buf.put(bucketName, key, value)
|
||||
}
|
||||
|
||||
func (t *batchTxBuffered) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
|
||||
t.batchTx.UnsafeSeqPut(bucketName, key, value)
|
||||
t.buf.putSeq(bucketName, key, value)
|
||||
atomic.StoreInt64(&t.backend.size, t.tx.Size())
|
||||
}
|
||||
|
@@ -12,12 +12,10 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// +build !linux,!windows
|
||||
// +build !linux
|
||||
|
||||
package backend
|
||||
|
||||
import bolt "github.com/coreos/bbolt"
|
||||
import "github.com/boltdb/bolt"
|
||||
|
||||
var boltOpenOptions *bolt.Options = nil
|
||||
|
||||
func (bcfg *BackendConfig) mmapSize() int { return int(bcfg.MmapSize) }
|
@@ -17,7 +17,7 @@ package backend
|
||||
import (
|
||||
"syscall"
|
||||
|
||||
bolt "github.com/coreos/bbolt"
|
||||
"github.com/boltdb/bolt"
|
||||
)
|
||||
|
||||
// syscall.MAP_POPULATE on linux 2.6.23+ does sequential read-ahead
|
||||
@@ -27,7 +27,6 @@ import (
|
||||
// (https://github.com/torvalds/linux/releases/tag/v2.6.23), mmap might
|
||||
// silently ignore this flag. Please update your kernel to prevent this.
|
||||
var boltOpenOptions = &bolt.Options{
|
||||
MmapFlags: syscall.MAP_POPULATE,
|
||||
MmapFlags: syscall.MAP_POPULATE,
|
||||
InitialMmapSize: int(InitialMmapSize),
|
||||
}
|
||||
|
||||
func (bcfg *BackendConfig) mmapSize() int { return int(bcfg.MmapSize) }
|
26
vendor/github.com/coreos/etcd/mvcc/backend/config_windows.go
generated
vendored
26
vendor/github.com/coreos/etcd/mvcc/backend/config_windows.go
generated
vendored
@@ -1,26 +0,0 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// +build windows
|
||||
|
||||
package backend
|
||||
|
||||
import bolt "github.com/coreos/bbolt"
|
||||
|
||||
var boltOpenOptions *bolt.Options = nil
|
||||
|
||||
// setting mmap size != 0 on windows will allocate the entire
|
||||
// mmap size for the file, instead of growing it. So, force 0.
|
||||
|
||||
func (bcfg *BackendConfig) mmapSize() int { return 0 }
|
10
vendor/github.com/coreos/etcd/mvcc/backend/metrics.go
generated
vendored
10
vendor/github.com/coreos/etcd/mvcc/backend/metrics.go
generated
vendored
@@ -24,18 +24,8 @@ var (
|
||||
Help: "The latency distributions of commit called by backend.",
|
||||
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
|
||||
})
|
||||
|
||||
snapshotDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "disk",
|
||||
Name: "backend_snapshot_duration_seconds",
|
||||
Help: "The latency distribution of backend snapshots.",
|
||||
// 10 ms -> 655 seconds
|
||||
Buckets: prometheus.ExponentialBuckets(.01, 2, 17),
|
||||
})
|
||||
)
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(commitDurations)
|
||||
prometheus.MustRegister(snapshotDurations)
|
||||
}
|
||||
|
92
vendor/github.com/coreos/etcd/mvcc/backend/read_tx.go
generated
vendored
92
vendor/github.com/coreos/etcd/mvcc/backend/read_tx.go
generated
vendored
@@ -1,92 +0,0 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package backend
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"math"
|
||||
"sync"
|
||||
|
||||
bolt "github.com/coreos/bbolt"
|
||||
)
|
||||
|
||||
// safeRangeBucket is a hack to avoid inadvertently reading duplicate keys;
|
||||
// overwrites on a bucket should only fetch with limit=1, but safeRangeBucket
|
||||
// is known to never overwrite any key so range is safe.
|
||||
var safeRangeBucket = []byte("key")
|
||||
|
||||
// ReadTx is the read side of a backend transaction.
type ReadTx interface {
	// Lock acquires the transaction for a series of Unsafe* calls.
	Lock()
	// Unlock releases what Lock acquired.
	Unlock()

	// UnsafeRange looks up key in bucketName, or the keys in [key, endKey)
	// when endKey is given, and returns the matching keys and values.
	UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)

	// UnsafeForEach calls visitor for the key-value pairs of bucketName and
	// stops at the first error visitor returns.
	UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
}
|
||||
|
||||
type readTx struct {
|
||||
// mu protects accesses to the txReadBuffer
|
||||
mu sync.RWMutex
|
||||
buf txReadBuffer
|
||||
|
||||
// txmu protects accesses to the Tx on Range requests
|
||||
txmu sync.Mutex
|
||||
tx *bolt.Tx
|
||||
}
|
||||
|
||||
func (rt *readTx) Lock() { rt.mu.RLock() }
|
||||
func (rt *readTx) Unlock() { rt.mu.RUnlock() }
|
||||
|
||||
func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
|
||||
if endKey == nil {
|
||||
// forbid duplicates for single keys
|
||||
limit = 1
|
||||
}
|
||||
if limit <= 0 {
|
||||
limit = math.MaxInt64
|
||||
}
|
||||
if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
|
||||
panic("do not use unsafeRange on non-keys bucket")
|
||||
}
|
||||
keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
|
||||
if int64(len(keys)) == limit {
|
||||
return keys, vals
|
||||
}
|
||||
rt.txmu.Lock()
|
||||
// ignore error since bucket may have been created in this batch
|
||||
k2, v2, _ := unsafeRange(rt.tx, bucketName, key, endKey, limit-int64(len(keys)))
|
||||
rt.txmu.Unlock()
|
||||
return append(k2, keys...), append(v2, vals...)
|
||||
}
|
||||
|
||||
func (rt *readTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
|
||||
dups := make(map[string]struct{})
|
||||
f1 := func(k, v []byte) error {
|
||||
dups[string(k)] = struct{}{}
|
||||
return visitor(k, v)
|
||||
}
|
||||
f2 := func(k, v []byte) error {
|
||||
if _, ok := dups[string(k)]; ok {
|
||||
return nil
|
||||
}
|
||||
return visitor(k, v)
|
||||
}
|
||||
if err := rt.buf.ForEach(bucketName, f1); err != nil {
|
||||
return err
|
||||
}
|
||||
rt.txmu.Lock()
|
||||
err := unsafeForEach(rt.tx, bucketName, f2)
|
||||
rt.txmu.Unlock()
|
||||
return err
|
||||
}
|
181
vendor/github.com/coreos/etcd/mvcc/backend/tx_buffer.go
generated
vendored
181
vendor/github.com/coreos/etcd/mvcc/backend/tx_buffer.go
generated
vendored
@@ -1,181 +0,0 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package backend
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"sort"
|
||||
)
|
||||
|
||||
// txBuffer handles functionality shared between txWriteBuffer and txReadBuffer.
|
||||
type txBuffer struct {
|
||||
buckets map[string]*bucketBuffer
|
||||
}
|
||||
|
||||
func (txb *txBuffer) reset() {
|
||||
for k, v := range txb.buckets {
|
||||
if v.used == 0 {
|
||||
// demote
|
||||
delete(txb.buckets, k)
|
||||
}
|
||||
v.used = 0
|
||||
}
|
||||
}
|
||||
|
||||
// txWriteBuffer buffers writes of pending updates that have not yet committed.
|
||||
type txWriteBuffer struct {
|
||||
txBuffer
|
||||
seq bool
|
||||
}
|
||||
|
||||
func (txw *txWriteBuffer) put(bucket, k, v []byte) {
|
||||
txw.seq = false
|
||||
txw.putSeq(bucket, k, v)
|
||||
}
|
||||
|
||||
func (txw *txWriteBuffer) putSeq(bucket, k, v []byte) {
|
||||
b, ok := txw.buckets[string(bucket)]
|
||||
if !ok {
|
||||
b = newBucketBuffer()
|
||||
txw.buckets[string(bucket)] = b
|
||||
}
|
||||
b.add(k, v)
|
||||
}
|
||||
|
||||
func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
|
||||
for k, wb := range txw.buckets {
|
||||
rb, ok := txr.buckets[k]
|
||||
if !ok {
|
||||
delete(txw.buckets, k)
|
||||
txr.buckets[k] = wb
|
||||
continue
|
||||
}
|
||||
if !txw.seq && wb.used > 1 {
|
||||
// assume no duplicate keys
|
||||
sort.Sort(wb)
|
||||
}
|
||||
rb.merge(wb)
|
||||
}
|
||||
txw.reset()
|
||||
}
|
||||
|
||||
// txReadBuffer accesses buffered updates.
|
||||
type txReadBuffer struct{ txBuffer }
|
||||
|
||||
func (txr *txReadBuffer) Range(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
|
||||
if b := txr.buckets[string(bucketName)]; b != nil {
|
||||
return b.Range(key, endKey, limit)
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (txr *txReadBuffer) ForEach(bucketName []byte, visitor func(k, v []byte) error) error {
|
||||
if b := txr.buckets[string(bucketName)]; b != nil {
|
||||
return b.ForEach(visitor)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// kv is one buffered key-value pair.
type kv struct {
	key []byte
	val []byte
}

// bucketBuffer buffers key-value pairs that are pending commit.
type bucketBuffer struct {
	buf []kv
	// used tracks number of elements in use so buf can be reused without reallocation.
	used int
}

// newBucketBuffer returns an empty bucketBuffer with 512 preallocated slots.
func newBucketBuffer() *bucketBuffer {
	return &bucketBuffer{buf: make([]kv, 512), used: 0}
}

// Range returns the buffered pairs whose keys lie in [key, endKey), at
// most limit of them. When endKey is empty it instead returns only the
// pair stored exactly at key, if any, and limit is not consulted.
//
// The in-use entries are binary-searched, so they must be sorted by key.
func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
	f := func(i int) bool { return bytes.Compare(bb.buf[i].key, key) >= 0 }
	idx := sort.Search(bb.used, f)
	if idx >= bb.used {
		// sort.Search returns bb.used when no in-use key is >= key. Slots at
		// or past used hold zero or stale entries (used is rewound without
		// clearing buf), so they must never be read as results.
		return nil, nil
	}
	if len(endKey) == 0 {
		if bytes.Equal(key, bb.buf[idx].key) {
			keys = append(keys, bb.buf[idx].key)
			vals = append(vals, bb.buf[idx].val)
		}
		return keys, vals
	}
	if bytes.Compare(endKey, bb.buf[idx].key) <= 0 {
		// the first candidate is already at or past the end of the range
		return nil, nil
	}
	for i := idx; i < bb.used && int64(len(keys)) < limit; i++ {
		if bytes.Compare(endKey, bb.buf[i].key) <= 0 {
			break
		}
		keys = append(keys, bb.buf[i].key)
		vals = append(vals, bb.buf[i].val)
	}
	return keys, vals
}

// ForEach calls visitor on every in-use pair in buffer order and returns
// the first error visitor reports.
func (bb *bucketBuffer) ForEach(visitor func(k, v []byte) error) error {
	for i := 0; i < bb.used; i++ {
		if err := visitor(bb.buf[i].key, bb.buf[i].val); err != nil {
			return err
		}
	}
	return nil
}

// add appends k/v after the last in-use entry. When that fills buf, buf is
// regrown to 1.5x its length so a free slot is always available for the
// next add.
func (bb *bucketBuffer) add(k, v []byte) {
	bb.buf[bb.used].key, bb.buf[bb.used].val = k, v
	bb.used++
	if bb.used == len(bb.buf) {
		buf := make([]kv, (3*len(bb.buf))/2)
		copy(buf, bb.buf)
		bb.buf = buf
	}
}

// merge merges data from bbsrc into bb.
func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) {
	for i := 0; i < bbsrc.used; i++ {
		bb.add(bbsrc.buf[i].key, bbsrc.buf[i].val)
	}
	if bb.used == bbsrc.used {
		// bb was empty before the merge; nothing to reconcile
		return
	}
	if bytes.Compare(bb.buf[(bb.used-bbsrc.used)-1].key, bbsrc.buf[0].key) < 0 {
		// bb's previous last key sorts before bbsrc's first key, so the
		// appended entries are taken to already be in place
		return
	}

	// stable sort keeps equal keys in insertion order, so the newest update
	// for a key is the last one in its run
	sort.Stable(bb)

	// remove duplicates, using only newest update
	widx := 0
	for ridx := 1; ridx < bb.used; ridx++ {
		if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) {
			widx++
		}
		bb.buf[widx] = bb.buf[ridx]
	}
	bb.used = widx + 1
}

// Len, Less and Swap implement sort.Interface over the in-use entries,
// ordering them by key.
func (bb *bucketBuffer) Len() int { return bb.used }

func (bb *bucketBuffer) Less(i, j int) bool {
	return bytes.Compare(bb.buf[i].key, bb.buf[j].key) < 0
}

func (bb *bucketBuffer) Swap(i, j int) { bb.buf[i], bb.buf[j] = bb.buf[j], bb.buf[i] }
|
21
vendor/github.com/coreos/etcd/mvcc/index.go
generated
vendored
21
vendor/github.com/coreos/etcd/mvcc/index.go
generated
vendored
@@ -29,9 +29,7 @@ type index interface {
|
||||
RangeSince(key, end []byte, rev int64) []revision
|
||||
Compact(rev int64) map[revision]struct{}
|
||||
Equal(b index) bool
|
||||
|
||||
Insert(ki *keyIndex)
|
||||
KeyIndex(ki *keyIndex) *keyIndex
|
||||
}
|
||||
|
||||
type treeIndex struct {
|
||||
@@ -62,27 +60,18 @@ func (ti *treeIndex) Put(key []byte, rev revision) {
|
||||
|
||||
func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
|
||||
keyi := &keyIndex{key: key}
|
||||
|
||||
ti.RLock()
|
||||
defer ti.RUnlock()
|
||||
if keyi = ti.keyIndex(keyi); keyi == nil {
|
||||
item := ti.tree.Get(keyi)
|
||||
if item == nil {
|
||||
return revision{}, revision{}, 0, ErrRevisionNotFound
|
||||
}
|
||||
|
||||
keyi = item.(*keyIndex)
|
||||
return keyi.get(atRev)
|
||||
}
|
||||
|
||||
func (ti *treeIndex) KeyIndex(keyi *keyIndex) *keyIndex {
|
||||
ti.RLock()
|
||||
defer ti.RUnlock()
|
||||
return ti.keyIndex(keyi)
|
||||
}
|
||||
|
||||
func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
|
||||
if item := ti.tree.Get(keyi); item != nil {
|
||||
return item.(*keyIndex)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []revision) {
|
||||
if end == nil {
|
||||
rev, _, _, err := ti.Get(key, atRev)
|
||||
|
1
vendor/github.com/coreos/etcd/mvcc/key_index.go
generated
vendored
1
vendor/github.com/coreos/etcd/mvcc/key_index.go
generated
vendored
@@ -222,6 +222,7 @@ func (ki *keyIndex) compact(atRev int64, available map[revision]struct{}) {
|
||||
}
|
||||
// remove the previous generations.
|
||||
ki.generations = ki.generations[i:]
|
||||
return
|
||||
}
|
||||
|
||||
func (ki *keyIndex) isEmpty() bool {
|
||||
|
82
vendor/github.com/coreos/etcd/mvcc/kv.go
generated
vendored
82
vendor/github.com/coreos/etcd/mvcc/kv.go
generated
vendored
@@ -32,15 +32,15 @@ type RangeResult struct {
|
||||
Count int
|
||||
}
|
||||
|
||||
type ReadView interface {
|
||||
// FirstRev returns the first KV revision at the time of opening the txn.
|
||||
type KV interface {
|
||||
// Rev returns the current revision of the KV.
|
||||
Rev() int64
|
||||
|
||||
// FirstRev returns the first revision of the KV.
|
||||
// After a compaction, the first revision increases to the compaction
|
||||
// revision.
|
||||
FirstRev() int64
|
||||
|
||||
// Rev returns the revision of the KV at the time of opening the txn.
|
||||
Rev() int64
|
||||
|
||||
// Range gets the keys in the range at rangeRev.
|
||||
// The returned rev is the current revision of the KV when the operation is executed.
|
||||
// If rangeRev <=0, range gets the keys at currentRev.
|
||||
@@ -50,17 +50,14 @@ type ReadView interface {
|
||||
// Limit limits the number of keys returned.
|
||||
// If the required rev is compacted, ErrCompacted will be returned.
|
||||
Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error)
|
||||
}
|
||||
|
||||
// TxnRead represents a read-only transaction with operations that will not
|
||||
// block other read transactions.
|
||||
type TxnRead interface {
|
||||
ReadView
|
||||
// End marks the transaction as complete and ready to commit.
|
||||
End()
|
||||
}
|
||||
// Put puts the given key, value into the store. Put also takes additional argument lease to
|
||||
// attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease
|
||||
// id.
|
||||
// A put also increases the rev of the store, and generates one event in the event history.
|
||||
// The returned rev is the current revision of the KV when the operation is executed.
|
||||
Put(key, value []byte, lease lease.LeaseID) (rev int64)
|
||||
|
||||
type WriteView interface {
|
||||
// DeleteRange deletes the given range from the store.
|
||||
// A deleteRange increases the rev of the store if any key in the range exists.
|
||||
// The number of keys deleted will be returned.
|
||||
@@ -70,51 +67,26 @@ type WriteView interface {
|
||||
// if the `end` is not nil, deleteRange deletes the keys in range [key, range_end).
|
||||
DeleteRange(key, end []byte) (n, rev int64)
|
||||
|
||||
// Put puts the given key, value into the store. Put also takes additional argument lease to
|
||||
// attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease
|
||||
// id.
|
||||
// A put also increases the rev of the store, and generates one event in the event history.
|
||||
// The returned rev is the current revision of the KV when the operation is executed.
|
||||
Put(key, value []byte, lease lease.LeaseID) (rev int64)
|
||||
}
|
||||
|
||||
// TxnWrite represents a transaction that can modify the store.
|
||||
type TxnWrite interface {
|
||||
TxnRead
|
||||
WriteView
|
||||
// Changes gets the changes made since opening the write txn.
|
||||
Changes() []mvccpb.KeyValue
|
||||
}
|
||||
|
||||
// txnReadWrite coerces a read txn to a write, panicking on any write operation.
|
||||
type txnReadWrite struct{ TxnRead }
|
||||
|
||||
func (trw *txnReadWrite) DeleteRange(key, end []byte) (n, rev int64) { panic("unexpected DeleteRange") }
|
||||
func (trw *txnReadWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
|
||||
panic("unexpected Put")
|
||||
}
|
||||
func (trw *txnReadWrite) Changes() []mvccpb.KeyValue { return nil }
|
||||
|
||||
func NewReadOnlyTxnWrite(txn TxnRead) TxnWrite { return &txnReadWrite{txn} }
|
||||
|
||||
type KV interface {
|
||||
ReadView
|
||||
WriteView
|
||||
|
||||
// Read creates a read transaction.
|
||||
Read() TxnRead
|
||||
|
||||
// Write creates a write transaction.
|
||||
Write() TxnWrite
|
||||
|
||||
// Hash retrieves the hash of KV state and revision.
|
||||
// This method is designed for consistency checking purposes.
|
||||
Hash() (hash uint32, revision int64, err error)
|
||||
// TxnBegin begins a txn. Only Txn prefixed operation can be executed, others will be blocked
|
||||
// until txn ends. Only one on-going txn is allowed.
|
||||
// TxnBegin returns an int64 txn ID.
|
||||
// All txn prefixed operations with same txn ID will be done with the same rev.
|
||||
TxnBegin() int64
|
||||
// TxnEnd ends the on-going txn with txn ID. If the on-going txn ID is not matched, error is returned.
|
||||
TxnEnd(txnID int64) error
|
||||
// TxnRange returns the current revision of the KV when the operation is executed.
|
||||
TxnRange(txnID int64, key, end []byte, ro RangeOptions) (r *RangeResult, err error)
|
||||
TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error)
|
||||
TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error)
|
||||
|
||||
// Compact frees all superseded keys with revisions less than rev.
|
||||
Compact(rev int64) (<-chan struct{}, error)
|
||||
|
||||
// Commit commits outstanding txns into the underlying backend.
|
||||
// Hash retrieves the hash of KV state and revision.
|
||||
// This method is designed for consistency checking purposes.
|
||||
Hash() (hash uint32, revision int64, err error)
|
||||
|
||||
// Commit commits txns into the underlying backend.
|
||||
Commit()
|
||||
|
||||
// Restore restores the KV store from a backend.
|
||||
|
53
vendor/github.com/coreos/etcd/mvcc/kv_view.go
generated
vendored
53
vendor/github.com/coreos/etcd/mvcc/kv_view.go
generated
vendored
@@ -1,53 +0,0 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package mvcc
|
||||
|
||||
import (
|
||||
"github.com/coreos/etcd/lease"
|
||||
)
|
||||
|
||||
type readView struct{ kv KV }
|
||||
|
||||
func (rv *readView) FirstRev() int64 {
|
||||
tr := rv.kv.Read()
|
||||
defer tr.End()
|
||||
return tr.FirstRev()
|
||||
}
|
||||
|
||||
func (rv *readView) Rev() int64 {
|
||||
tr := rv.kv.Read()
|
||||
defer tr.End()
|
||||
return tr.Rev()
|
||||
}
|
||||
|
||||
func (rv *readView) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
|
||||
tr := rv.kv.Read()
|
||||
defer tr.End()
|
||||
return tr.Range(key, end, ro)
|
||||
}
|
||||
|
||||
type writeView struct{ kv KV }
|
||||
|
||||
func (wv *writeView) DeleteRange(key, end []byte) (n, rev int64) {
|
||||
tw := wv.kv.Write()
|
||||
defer tw.End()
|
||||
return tw.DeleteRange(key, end)
|
||||
}
|
||||
|
||||
func (wv *writeView) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
|
||||
tw := wv.kv.Write()
|
||||
defer tw.End()
|
||||
return tw.Put(key, value, lease)
|
||||
}
|
557
vendor/github.com/coreos/etcd/mvcc/kvstore.go
generated
vendored
557
vendor/github.com/coreos/etcd/mvcc/kvstore.go
generated
vendored
@@ -18,6 +18,7 @@ import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"math"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -33,28 +34,24 @@ var (
|
||||
keyBucketName = []byte("key")
|
||||
metaBucketName = []byte("meta")
|
||||
|
||||
consistentIndexKeyName = []byte("consistent_index")
|
||||
scheduledCompactKeyName = []byte("scheduledCompactRev")
|
||||
finishedCompactKeyName = []byte("finishedCompactRev")
|
||||
|
||||
ErrCompacted = errors.New("mvcc: required revision has been compacted")
|
||||
ErrFutureRev = errors.New("mvcc: required revision is a future revision")
|
||||
ErrCanceled = errors.New("mvcc: watcher is canceled")
|
||||
ErrClosed = errors.New("mvcc: closed")
|
||||
|
||||
plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc")
|
||||
)
|
||||
|
||||
const (
|
||||
// markedRevBytesLen is the byte length of marked revision.
|
||||
// The first `revBytesLen` bytes represents a normal revision. The last
|
||||
// one byte is the mark.
|
||||
markedRevBytesLen = revBytesLen + 1
|
||||
markBytePosition = markedRevBytesLen - 1
|
||||
markTombstone byte = 't'
|
||||
)
|
||||
|
||||
var restoreChunkKeys = 10000 // non-const for testing
|
||||
consistentIndexKeyName = []byte("consistent_index")
|
||||
scheduledCompactKeyName = []byte("scheduledCompactRev")
|
||||
finishedCompactKeyName = []byte("finishedCompactRev")
|
||||
|
||||
ErrTxnIDMismatch = errors.New("mvcc: txn id mismatch")
|
||||
ErrCompacted = errors.New("mvcc: required revision has been compacted")
|
||||
ErrFutureRev = errors.New("mvcc: required revision is a future revision")
|
||||
ErrCanceled = errors.New("mvcc: watcher is canceled")
|
||||
|
||||
plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc")
|
||||
)
|
||||
|
||||
// ConsistentIndexGetter is an interface that wraps the Get method.
|
||||
// Consistent index is the offset of an entry in a consistent replicated log.
|
||||
@@ -64,11 +61,7 @@ type ConsistentIndexGetter interface {
|
||||
}
|
||||
|
||||
type store struct {
|
||||
ReadView
|
||||
WriteView
|
||||
|
||||
// mu read locks for txns and write locks for non-txn store changes.
|
||||
mu sync.RWMutex
|
||||
mu sync.Mutex // guards the following
|
||||
|
||||
ig ConsistentIndexGetter
|
||||
|
||||
@@ -77,19 +70,19 @@ type store struct {
|
||||
|
||||
le lease.Lessor
|
||||
|
||||
// revMuLock protects currentRev and compactMainRev.
|
||||
// Locked at end of write txn and released after write txn unlock lock.
|
||||
// Locked before locking read txn and released after locking.
|
||||
revMu sync.RWMutex
|
||||
// currentRev is the revision of the last completed transaction.
|
||||
currentRev int64
|
||||
// compactMainRev is the main revision of the last compaction.
|
||||
currentRev revision
|
||||
// the main revision of the last compaction
|
||||
compactMainRev int64
|
||||
|
||||
tx backend.BatchTx
|
||||
txnID int64 // tracks the current txnID to verify txn operations
|
||||
txnModify bool
|
||||
|
||||
// bytesBuf8 is a byte slice of length 8
|
||||
// to avoid a repetitive allocation in saveIndex.
|
||||
bytesBuf8 []byte
|
||||
|
||||
changes []mvccpb.KeyValue
|
||||
fifoSched schedule.Scheduler
|
||||
|
||||
stopc chan struct{}
|
||||
@@ -105,18 +98,17 @@ func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *sto
|
||||
|
||||
le: le,
|
||||
|
||||
currentRev: 1,
|
||||
currentRev: revision{main: 1},
|
||||
compactMainRev: -1,
|
||||
|
||||
bytesBuf8: make([]byte, 8),
|
||||
bytesBuf8: make([]byte, 8, 8),
|
||||
fifoSched: schedule.NewFIFOScheduler(),
|
||||
|
||||
stopc: make(chan struct{}),
|
||||
}
|
||||
s.ReadView = &readView{s}
|
||||
s.WriteView = &writeView{s}
|
||||
|
||||
if s.le != nil {
|
||||
s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
|
||||
s.le.SetRangeDeleter(s)
|
||||
}
|
||||
|
||||
tx := s.b.BatchTx()
|
||||
@@ -134,6 +126,140 @@ func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *sto
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *store) Rev() int64 {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
return s.currentRev.main
|
||||
}
|
||||
|
||||
func (s *store) FirstRev() int64 {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
return s.compactMainRev
|
||||
}
|
||||
|
||||
func (s *store) Put(key, value []byte, lease lease.LeaseID) int64 {
|
||||
id := s.TxnBegin()
|
||||
s.put(key, value, lease)
|
||||
s.txnEnd(id)
|
||||
|
||||
putCounter.Inc()
|
||||
|
||||
return int64(s.currentRev.main)
|
||||
}
|
||||
|
||||
func (s *store) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
|
||||
id := s.TxnBegin()
|
||||
kvs, count, rev, err := s.rangeKeys(key, end, ro.Limit, ro.Rev, ro.Count)
|
||||
s.txnEnd(id)
|
||||
|
||||
rangeCounter.Inc()
|
||||
|
||||
r = &RangeResult{
|
||||
KVs: kvs,
|
||||
Count: count,
|
||||
Rev: rev,
|
||||
}
|
||||
|
||||
return r, err
|
||||
}
|
||||
|
||||
func (s *store) DeleteRange(key, end []byte) (n, rev int64) {
|
||||
id := s.TxnBegin()
|
||||
n = s.deleteRange(key, end)
|
||||
s.txnEnd(id)
|
||||
|
||||
deleteCounter.Inc()
|
||||
|
||||
return n, int64(s.currentRev.main)
|
||||
}
|
||||
|
||||
func (s *store) TxnBegin() int64 {
|
||||
s.mu.Lock()
|
||||
s.currentRev.sub = 0
|
||||
s.tx = s.b.BatchTx()
|
||||
s.tx.Lock()
|
||||
|
||||
s.txnID = rand.Int63()
|
||||
return s.txnID
|
||||
}
|
||||
|
||||
func (s *store) TxnEnd(txnID int64) error {
|
||||
err := s.txnEnd(txnID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
txnCounter.Inc()
|
||||
return nil
|
||||
}
|
||||
|
||||
// txnEnd is used for unlocking an internal txn. It does
|
||||
// not increase the txnCounter.
|
||||
func (s *store) txnEnd(txnID int64) error {
|
||||
if txnID != s.txnID {
|
||||
return ErrTxnIDMismatch
|
||||
}
|
||||
|
||||
// only update index if the txn modifies the mvcc state.
|
||||
// read only txn might execute with one write txn concurrently,
|
||||
// it should not write its index to mvcc.
|
||||
if s.txnModify {
|
||||
s.saveIndex()
|
||||
}
|
||||
s.txnModify = false
|
||||
|
||||
s.tx.Unlock()
|
||||
if s.currentRev.sub != 0 {
|
||||
s.currentRev.main += 1
|
||||
}
|
||||
s.currentRev.sub = 0
|
||||
|
||||
dbTotalSize.Set(float64(s.b.Size()))
|
||||
s.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *store) TxnRange(txnID int64, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
|
||||
if txnID != s.txnID {
|
||||
return nil, ErrTxnIDMismatch
|
||||
}
|
||||
|
||||
kvs, count, rev, err := s.rangeKeys(key, end, ro.Limit, ro.Rev, ro.Count)
|
||||
|
||||
r = &RangeResult{
|
||||
KVs: kvs,
|
||||
Count: count,
|
||||
Rev: rev,
|
||||
}
|
||||
return r, err
|
||||
}
|
||||
|
||||
func (s *store) TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) {
|
||||
if txnID != s.txnID {
|
||||
return 0, ErrTxnIDMismatch
|
||||
}
|
||||
|
||||
s.put(key, value, lease)
|
||||
return int64(s.currentRev.main + 1), nil
|
||||
}
|
||||
|
||||
func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
|
||||
if txnID != s.txnID {
|
||||
return 0, 0, ErrTxnIDMismatch
|
||||
}
|
||||
|
||||
n = s.deleteRange(key, end)
|
||||
if n != 0 || s.currentRev.sub != 0 {
|
||||
rev = int64(s.currentRev.main + 1)
|
||||
} else {
|
||||
rev = int64(s.currentRev.main)
|
||||
}
|
||||
return n, rev, nil
|
||||
}
|
||||
|
||||
func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
|
||||
if ctx == nil || ctx.Err() != nil {
|
||||
s.mu.Lock()
|
||||
@@ -149,25 +275,16 @@ func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
|
||||
close(ch)
|
||||
}
|
||||
|
||||
func (s *store) Hash() (hash uint32, revision int64, err error) {
|
||||
s.b.ForceCommit()
|
||||
h, err := s.b.Hash(DefaultIgnores)
|
||||
return h, s.currentRev, err
|
||||
}
|
||||
|
||||
func (s *store) Compact(rev int64) (<-chan struct{}, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.revMu.Lock()
|
||||
defer s.revMu.Unlock()
|
||||
|
||||
if rev <= s.compactMainRev {
|
||||
ch := make(chan struct{})
|
||||
f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
|
||||
s.fifoSched.Schedule(f)
|
||||
return ch, ErrCompacted
|
||||
}
|
||||
if rev > s.currentRev {
|
||||
if rev > s.currentRev.main {
|
||||
return nil, ErrFutureRev
|
||||
}
|
||||
|
||||
@@ -216,14 +333,24 @@ func init() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *store) Hash() (uint32, int64, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.b.ForceCommit()
|
||||
|
||||
h, err := s.b.Hash(DefaultIgnores)
|
||||
rev := s.currentRev.main
|
||||
return h, rev, err
|
||||
}
|
||||
|
||||
func (s *store) Commit() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
tx := s.b.BatchTx()
|
||||
tx.Lock()
|
||||
s.saveIndex(tx)
|
||||
tx.Unlock()
|
||||
s.tx = s.b.BatchTx()
|
||||
s.tx.Lock()
|
||||
s.saveIndex()
|
||||
s.tx.Unlock()
|
||||
s.b.ForceCommit()
|
||||
}
|
||||
|
||||
@@ -236,8 +363,10 @@ func (s *store) Restore(b backend.Backend) error {
|
||||
|
||||
s.b = b
|
||||
s.kvindex = newTreeIndex()
|
||||
s.currentRev = 1
|
||||
s.currentRev = revision{main: 1}
|
||||
s.compactMainRev = -1
|
||||
s.tx = b.BatchTx()
|
||||
s.txnID = -1
|
||||
s.fifoSched = schedule.NewFIFOScheduler()
|
||||
s.stopc = make(chan struct{})
|
||||
|
||||
@@ -245,63 +374,75 @@ func (s *store) Restore(b backend.Backend) error {
|
||||
}
|
||||
|
||||
func (s *store) restore() error {
|
||||
reportDbTotalSizeInBytesMu.Lock()
|
||||
b := s.b
|
||||
reportDbTotalSizeInBytes = func() float64 { return float64(b.Size()) }
|
||||
reportDbTotalSizeInBytesMu.Unlock()
|
||||
|
||||
min, max := newRevBytes(), newRevBytes()
|
||||
revToBytes(revision{main: 1}, min)
|
||||
revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)
|
||||
|
||||
keyToLease := make(map[string]lease.LeaseID)
|
||||
|
||||
// use an unordered map to hold the temp index data to speed up
|
||||
// the initial key index recovery.
|
||||
// we will convert this unordered map into the tree index later.
|
||||
unordered := make(map[string]*keyIndex, 100000)
|
||||
|
||||
// restore index
|
||||
tx := s.b.BatchTx()
|
||||
tx.Lock()
|
||||
|
||||
_, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
|
||||
if len(finishedCompactBytes) != 0 {
|
||||
s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
|
||||
plog.Printf("restore compact to %d", s.compactMainRev)
|
||||
}
|
||||
_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
|
||||
scheduledCompact := int64(0)
|
||||
if len(scheduledCompactBytes) != 0 {
|
||||
scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
|
||||
|
||||
// TODO: limit N to reduce max memory usage
|
||||
keys, vals := tx.UnsafeRange(keyBucketName, min, max, 0)
|
||||
for i, key := range keys {
|
||||
var kv mvccpb.KeyValue
|
||||
if err := kv.Unmarshal(vals[i]); err != nil {
|
||||
plog.Fatalf("cannot unmarshal event: %v", err)
|
||||
}
|
||||
|
||||
rev := bytesToRev(key[:revBytesLen])
|
||||
|
||||
// restore index
|
||||
switch {
|
||||
case isTombstone(key):
|
||||
if ki, ok := unordered[string(kv.Key)]; ok {
|
||||
ki.tombstone(rev.main, rev.sub)
|
||||
}
|
||||
delete(keyToLease, string(kv.Key))
|
||||
|
||||
default:
|
||||
ki, ok := unordered[string(kv.Key)]
|
||||
if ok {
|
||||
ki.put(rev.main, rev.sub)
|
||||
} else {
|
||||
ki = &keyIndex{key: kv.Key}
|
||||
ki.restore(revision{kv.CreateRevision, 0}, rev, kv.Version)
|
||||
unordered[string(kv.Key)] = ki
|
||||
}
|
||||
|
||||
if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
|
||||
keyToLease[string(kv.Key)] = lid
|
||||
} else {
|
||||
delete(keyToLease, string(kv.Key))
|
||||
}
|
||||
}
|
||||
|
||||
// update revision
|
||||
s.currentRev = rev
|
||||
}
|
||||
|
||||
// index keys concurrently as they're loaded in from tx
|
||||
keysGauge.Set(0)
|
||||
rkvc, revc := restoreIntoIndex(s.kvindex)
|
||||
for {
|
||||
keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
|
||||
if len(keys) == 0 {
|
||||
break
|
||||
}
|
||||
// rkvc blocks if the total pending keys exceeds the restore
|
||||
// chunk size to keep keys from consuming too much memory.
|
||||
restoreChunk(rkvc, keys, vals, keyToLease)
|
||||
if len(keys) < restoreChunkKeys {
|
||||
// partial set implies final set
|
||||
break
|
||||
}
|
||||
// next set begins after where this one ended
|
||||
newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
|
||||
newMin.sub++
|
||||
revToBytes(newMin, min)
|
||||
// restore the tree index from the unordered index.
|
||||
for _, v := range unordered {
|
||||
s.kvindex.Insert(v)
|
||||
}
|
||||
close(rkvc)
|
||||
s.currentRev = <-revc
|
||||
|
||||
// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
|
||||
// the correct revision should be set to compaction revision in the case, not the largest revision
|
||||
// we have seen.
|
||||
if s.currentRev < s.compactMainRev {
|
||||
s.currentRev = s.compactMainRev
|
||||
}
|
||||
if scheduledCompact <= s.compactMainRev {
|
||||
scheduledCompact = 0
|
||||
if s.currentRev.main < s.compactMainRev {
|
||||
s.currentRev.main = s.compactMainRev
|
||||
}
|
||||
|
||||
for key, lid := range keyToLease {
|
||||
@@ -314,6 +455,15 @@ func (s *store) restore() error {
|
||||
}
|
||||
}
|
||||
|
||||
_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
|
||||
scheduledCompact := int64(0)
|
||||
if len(scheduledCompactBytes) != 0 {
|
||||
scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
|
||||
if scheduledCompact <= s.compactMainRev {
|
||||
scheduledCompact = 0
|
||||
}
|
||||
}
|
||||
|
||||
tx.Unlock()
|
||||
|
||||
if scheduledCompact != 0 {
|
||||
@@ -324,75 +474,6 @@ func (s *store) restore() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type revKeyValue struct {
|
||||
key []byte
|
||||
kv mvccpb.KeyValue
|
||||
kstr string
|
||||
}
|
||||
|
||||
func restoreIntoIndex(idx index) (chan<- revKeyValue, <-chan int64) {
|
||||
rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1)
|
||||
go func() {
|
||||
currentRev := int64(1)
|
||||
defer func() { revc <- currentRev }()
|
||||
// restore the tree index from streaming the unordered index.
|
||||
kiCache := make(map[string]*keyIndex, restoreChunkKeys)
|
||||
for rkv := range rkvc {
|
||||
ki, ok := kiCache[rkv.kstr]
|
||||
// purge kiCache if many keys but still missing in the cache
|
||||
if !ok && len(kiCache) >= restoreChunkKeys {
|
||||
i := 10
|
||||
for k := range kiCache {
|
||||
delete(kiCache, k)
|
||||
if i--; i == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
// cache miss, fetch from tree index if there
|
||||
if !ok {
|
||||
ki = &keyIndex{key: rkv.kv.Key}
|
||||
if idxKey := idx.KeyIndex(ki); idxKey != nil {
|
||||
kiCache[rkv.kstr], ki = idxKey, idxKey
|
||||
ok = true
|
||||
}
|
||||
}
|
||||
rev := bytesToRev(rkv.key)
|
||||
currentRev = rev.main
|
||||
if ok {
|
||||
if isTombstone(rkv.key) {
|
||||
ki.tombstone(rev.main, rev.sub)
|
||||
continue
|
||||
}
|
||||
ki.put(rev.main, rev.sub)
|
||||
} else if !isTombstone(rkv.key) {
|
||||
ki.restore(revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
|
||||
idx.Insert(ki)
|
||||
kiCache[rkv.kstr] = ki
|
||||
}
|
||||
}
|
||||
}()
|
||||
return rkvc, revc
|
||||
}
|
||||
|
||||
func restoreChunk(kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
|
||||
for i, key := range keys {
|
||||
rkv := revKeyValue{key: key}
|
||||
if err := rkv.kv.Unmarshal(vals[i]); err != nil {
|
||||
plog.Fatalf("cannot unmarshal event: %v", err)
|
||||
}
|
||||
rkv.kstr = string(rkv.kv.Key)
|
||||
if isTombstone(key) {
|
||||
delete(keyToLease, rkv.kstr)
|
||||
} else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease {
|
||||
keyToLease[rkv.kstr] = lid
|
||||
} else {
|
||||
delete(keyToLease, rkv.kstr)
|
||||
}
|
||||
kvc <- rkv
|
||||
}
|
||||
}
|
||||
|
||||
func (s *store) Close() error {
|
||||
close(s.stopc)
|
||||
s.fifoSched.Stop()
|
||||
@@ -409,10 +490,180 @@ func (a *store) Equal(b *store) bool {
|
||||
return a.kvindex.Equal(b.kvindex)
|
||||
}
|
||||
|
||||
func (s *store) saveIndex(tx backend.BatchTx) {
|
||||
// range is a keyword in Go, add Keys suffix.
|
||||
func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64, countOnly bool) (kvs []mvccpb.KeyValue, count int, curRev int64, err error) {
|
||||
curRev = int64(s.currentRev.main)
|
||||
if s.currentRev.sub > 0 {
|
||||
curRev += 1
|
||||
}
|
||||
|
||||
if rangeRev > curRev {
|
||||
return nil, -1, s.currentRev.main, ErrFutureRev
|
||||
}
|
||||
var rev int64
|
||||
if rangeRev <= 0 {
|
||||
rev = curRev
|
||||
} else {
|
||||
rev = rangeRev
|
||||
}
|
||||
if rev < s.compactMainRev {
|
||||
return nil, -1, 0, ErrCompacted
|
||||
}
|
||||
|
||||
_, revpairs := s.kvindex.Range(key, end, int64(rev))
|
||||
if len(revpairs) == 0 {
|
||||
return nil, 0, curRev, nil
|
||||
}
|
||||
if countOnly {
|
||||
return nil, len(revpairs), curRev, nil
|
||||
}
|
||||
|
||||
for _, revpair := range revpairs {
|
||||
start, end := revBytesRange(revpair)
|
||||
|
||||
_, vs := s.tx.UnsafeRange(keyBucketName, start, end, 0)
|
||||
if len(vs) != 1 {
|
||||
plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub)
|
||||
}
|
||||
|
||||
var kv mvccpb.KeyValue
|
||||
if err := kv.Unmarshal(vs[0]); err != nil {
|
||||
plog.Fatalf("cannot unmarshal event: %v", err)
|
||||
}
|
||||
kvs = append(kvs, kv)
|
||||
if limit > 0 && len(kvs) >= int(limit) {
|
||||
break
|
||||
}
|
||||
}
|
||||
return kvs, len(revpairs), curRev, nil
|
||||
}
|
||||
|
||||
func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
|
||||
s.txnModify = true
|
||||
|
||||
rev := s.currentRev.main + 1
|
||||
c := rev
|
||||
oldLease := lease.NoLease
|
||||
|
||||
// if the key exists before, use its previous created and
|
||||
// get its previous leaseID
|
||||
_, created, ver, err := s.kvindex.Get(key, rev)
|
||||
if err == nil {
|
||||
c = created.main
|
||||
oldLease = s.le.GetLease(lease.LeaseItem{Key: string(key)})
|
||||
}
|
||||
|
||||
ibytes := newRevBytes()
|
||||
revToBytes(revision{main: rev, sub: s.currentRev.sub}, ibytes)
|
||||
|
||||
ver = ver + 1
|
||||
kv := mvccpb.KeyValue{
|
||||
Key: key,
|
||||
Value: value,
|
||||
CreateRevision: c,
|
||||
ModRevision: rev,
|
||||
Version: ver,
|
||||
Lease: int64(leaseID),
|
||||
}
|
||||
|
||||
d, err := kv.Marshal()
|
||||
if err != nil {
|
||||
plog.Fatalf("cannot marshal event: %v", err)
|
||||
}
|
||||
|
||||
s.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
|
||||
s.kvindex.Put(key, revision{main: rev, sub: s.currentRev.sub})
|
||||
s.changes = append(s.changes, kv)
|
||||
s.currentRev.sub += 1
|
||||
|
||||
if oldLease != lease.NoLease {
|
||||
if s.le == nil {
|
||||
panic("no lessor to detach lease")
|
||||
}
|
||||
|
||||
err = s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
|
||||
if err != nil {
|
||||
plog.Errorf("unexpected error from lease detach: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if leaseID != lease.NoLease {
|
||||
if s.le == nil {
|
||||
panic("no lessor to attach lease")
|
||||
}
|
||||
|
||||
err = s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
|
||||
if err != nil {
|
||||
panic("unexpected error from lease Attach")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *store) deleteRange(key, end []byte) int64 {
|
||||
s.txnModify = true
|
||||
|
||||
rrev := s.currentRev.main
|
||||
if s.currentRev.sub > 0 {
|
||||
rrev += 1
|
||||
}
|
||||
keys, revs := s.kvindex.Range(key, end, rrev)
|
||||
|
||||
if len(keys) == 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
for i, key := range keys {
|
||||
s.delete(key, revs[i])
|
||||
}
|
||||
return int64(len(keys))
|
||||
}
|
||||
|
||||
func (s *store) delete(key []byte, rev revision) {
|
||||
mainrev := s.currentRev.main + 1
|
||||
|
||||
ibytes := newRevBytes()
|
||||
revToBytes(revision{main: mainrev, sub: s.currentRev.sub}, ibytes)
|
||||
ibytes = appendMarkTombstone(ibytes)
|
||||
|
||||
kv := mvccpb.KeyValue{
|
||||
Key: key,
|
||||
}
|
||||
|
||||
d, err := kv.Marshal()
|
||||
if err != nil {
|
||||
plog.Fatalf("cannot marshal event: %v", err)
|
||||
}
|
||||
|
||||
s.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
|
||||
err = s.kvindex.Tombstone(key, revision{main: mainrev, sub: s.currentRev.sub})
|
||||
if err != nil {
|
||||
plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err)
|
||||
}
|
||||
s.changes = append(s.changes, kv)
|
||||
s.currentRev.sub += 1
|
||||
|
||||
item := lease.LeaseItem{Key: string(key)}
|
||||
leaseID := s.le.GetLease(item)
|
||||
|
||||
if leaseID != lease.NoLease {
|
||||
err = s.le.Detach(leaseID, []lease.LeaseItem{item})
|
||||
if err != nil {
|
||||
plog.Errorf("cannot detach %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *store) getChanges() []mvccpb.KeyValue {
|
||||
changes := s.changes
|
||||
s.changes = make([]mvccpb.KeyValue, 0, 4)
|
||||
return changes
|
||||
}
|
||||
|
||||
func (s *store) saveIndex() {
|
||||
if s.ig == nil {
|
||||
return
|
||||
}
|
||||
tx := s.tx
|
||||
bs := s.bytesBuf8
|
||||
binary.BigEndian.PutUint64(bs, s.ig.ConsistentIndex())
|
||||
// put the index into the underlying backend
|
||||
|
253
vendor/github.com/coreos/etcd/mvcc/kvstore_txn.go
generated
vendored
253
vendor/github.com/coreos/etcd/mvcc/kvstore_txn.go
generated
vendored
@@ -1,253 +0,0 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package mvcc
|
||||
|
||||
import (
|
||||
"github.com/coreos/etcd/lease"
|
||||
"github.com/coreos/etcd/mvcc/backend"
|
||||
"github.com/coreos/etcd/mvcc/mvccpb"
|
||||
)
|
||||
|
||||
type storeTxnRead struct {
|
||||
s *store
|
||||
tx backend.ReadTx
|
||||
|
||||
firstRev int64
|
||||
rev int64
|
||||
}
|
||||
|
||||
func (s *store) Read() TxnRead {
|
||||
s.mu.RLock()
|
||||
tx := s.b.ReadTx()
|
||||
s.revMu.RLock()
|
||||
tx.Lock()
|
||||
firstRev, rev := s.compactMainRev, s.currentRev
|
||||
s.revMu.RUnlock()
|
||||
return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
|
||||
}
|
||||
|
||||
func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev }
|
||||
func (tr *storeTxnRead) Rev() int64 { return tr.rev }
|
||||
|
||||
func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
|
||||
return tr.rangeKeys(key, end, tr.Rev(), ro)
|
||||
}
|
||||
|
||||
func (tr *storeTxnRead) End() {
|
||||
tr.tx.Unlock()
|
||||
tr.s.mu.RUnlock()
|
||||
}
|
||||
|
||||
type storeTxnWrite struct {
|
||||
*storeTxnRead
|
||||
tx backend.BatchTx
|
||||
// beginRev is the revision where the txn begins; it will write to the next revision.
|
||||
beginRev int64
|
||||
changes []mvccpb.KeyValue
|
||||
}
|
||||
|
||||
func (s *store) Write() TxnWrite {
|
||||
s.mu.RLock()
|
||||
tx := s.b.BatchTx()
|
||||
tx.Lock()
|
||||
tw := &storeTxnWrite{
|
||||
storeTxnRead: &storeTxnRead{s, tx, 0, 0},
|
||||
tx: tx,
|
||||
beginRev: s.currentRev,
|
||||
changes: make([]mvccpb.KeyValue, 0, 4),
|
||||
}
|
||||
return newMetricsTxnWrite(tw)
|
||||
}
|
||||
|
||||
// Rev returns the revision at which the write transaction began.
func (tw *storeTxnWrite) Rev() int64 {
	return tw.beginRev
}
|
||||
|
||||
func (tw *storeTxnWrite) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
|
||||
rev := tw.beginRev
|
||||
if len(tw.changes) > 0 {
|
||||
rev++
|
||||
}
|
||||
return tw.rangeKeys(key, end, rev, ro)
|
||||
}
|
||||
|
||||
func (tw *storeTxnWrite) DeleteRange(key, end []byte) (int64, int64) {
|
||||
if n := tw.deleteRange(key, end); n != 0 || len(tw.changes) > 0 {
|
||||
return n, int64(tw.beginRev + 1)
|
||||
}
|
||||
return 0, int64(tw.beginRev)
|
||||
}
|
||||
|
||||
func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
|
||||
tw.put(key, value, lease)
|
||||
return int64(tw.beginRev + 1)
|
||||
}
|
||||
|
||||
func (tw *storeTxnWrite) End() {
|
||||
// only update index if the txn modifies the mvcc state.
|
||||
if len(tw.changes) != 0 {
|
||||
tw.s.saveIndex(tw.tx)
|
||||
// hold revMu lock to prevent new read txns from opening until writeback.
|
||||
tw.s.revMu.Lock()
|
||||
tw.s.currentRev++
|
||||
}
|
||||
tw.tx.Unlock()
|
||||
if len(tw.changes) != 0 {
|
||||
tw.s.revMu.Unlock()
|
||||
}
|
||||
tw.s.mu.RUnlock()
|
||||
}
|
||||
|
||||
func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
|
||||
rev := ro.Rev
|
||||
if rev > curRev {
|
||||
return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
|
||||
}
|
||||
if rev <= 0 {
|
||||
rev = curRev
|
||||
}
|
||||
if rev < tr.s.compactMainRev {
|
||||
return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
|
||||
}
|
||||
|
||||
_, revpairs := tr.s.kvindex.Range(key, end, int64(rev))
|
||||
if len(revpairs) == 0 {
|
||||
return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
|
||||
}
|
||||
if ro.Count {
|
||||
return &RangeResult{KVs: nil, Count: len(revpairs), Rev: curRev}, nil
|
||||
}
|
||||
|
||||
var kvs []mvccpb.KeyValue
|
||||
for _, revpair := range revpairs {
|
||||
start, end := revBytesRange(revpair)
|
||||
_, vs := tr.tx.UnsafeRange(keyBucketName, start, end, 0)
|
||||
if len(vs) != 1 {
|
||||
plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub)
|
||||
}
|
||||
|
||||
var kv mvccpb.KeyValue
|
||||
if err := kv.Unmarshal(vs[0]); err != nil {
|
||||
plog.Fatalf("cannot unmarshal event: %v", err)
|
||||
}
|
||||
kvs = append(kvs, kv)
|
||||
if ro.Limit > 0 && len(kvs) >= int(ro.Limit) {
|
||||
break
|
||||
}
|
||||
}
|
||||
return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
|
||||
}
|
||||
|
||||
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
|
||||
rev := tw.beginRev + 1
|
||||
c := rev
|
||||
oldLease := lease.NoLease
|
||||
|
||||
// if the key exists before, use its previous created and
|
||||
// get its previous leaseID
|
||||
_, created, ver, err := tw.s.kvindex.Get(key, rev)
|
||||
if err == nil {
|
||||
c = created.main
|
||||
oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
|
||||
}
|
||||
|
||||
ibytes := newRevBytes()
|
||||
idxRev := revision{main: rev, sub: int64(len(tw.changes))}
|
||||
revToBytes(idxRev, ibytes)
|
||||
|
||||
ver = ver + 1
|
||||
kv := mvccpb.KeyValue{
|
||||
Key: key,
|
||||
Value: value,
|
||||
CreateRevision: c,
|
||||
ModRevision: rev,
|
||||
Version: ver,
|
||||
Lease: int64(leaseID),
|
||||
}
|
||||
|
||||
d, err := kv.Marshal()
|
||||
if err != nil {
|
||||
plog.Fatalf("cannot marshal event: %v", err)
|
||||
}
|
||||
|
||||
tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
|
||||
tw.s.kvindex.Put(key, idxRev)
|
||||
tw.changes = append(tw.changes, kv)
|
||||
|
||||
if oldLease != lease.NoLease {
|
||||
if tw.s.le == nil {
|
||||
panic("no lessor to detach lease")
|
||||
}
|
||||
err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
|
||||
if err != nil {
|
||||
plog.Errorf("unexpected error from lease detach: %v", err)
|
||||
}
|
||||
}
|
||||
if leaseID != lease.NoLease {
|
||||
if tw.s.le == nil {
|
||||
panic("no lessor to attach lease")
|
||||
}
|
||||
err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
|
||||
if err != nil {
|
||||
panic("unexpected error from lease Attach")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {
|
||||
rrev := tw.beginRev
|
||||
if len(tw.changes) > 0 {
|
||||
rrev += 1
|
||||
}
|
||||
keys, revs := tw.s.kvindex.Range(key, end, rrev)
|
||||
if len(keys) == 0 {
|
||||
return 0
|
||||
}
|
||||
for i, key := range keys {
|
||||
tw.delete(key, revs[i])
|
||||
}
|
||||
return int64(len(keys))
|
||||
}
|
||||
|
||||
func (tw *storeTxnWrite) delete(key []byte, rev revision) {
|
||||
ibytes := newRevBytes()
|
||||
idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
|
||||
revToBytes(idxRev, ibytes)
|
||||
ibytes = appendMarkTombstone(ibytes)
|
||||
|
||||
kv := mvccpb.KeyValue{Key: key}
|
||||
|
||||
d, err := kv.Marshal()
|
||||
if err != nil {
|
||||
plog.Fatalf("cannot marshal event: %v", err)
|
||||
}
|
||||
|
||||
tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
|
||||
err = tw.s.kvindex.Tombstone(key, idxRev)
|
||||
if err != nil {
|
||||
plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err)
|
||||
}
|
||||
tw.changes = append(tw.changes, kv)
|
||||
|
||||
item := lease.LeaseItem{Key: string(key)}
|
||||
leaseID := tw.s.le.GetLease(item)
|
||||
|
||||
if leaseID != lease.NoLease {
|
||||
err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item})
|
||||
if err != nil {
|
||||
plog.Errorf("cannot detach %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Changes returns the key-values recorded by this transaction so far. The
// returned slice is the transaction's own slice, not a copy.
func (tw *storeTxnWrite) Changes() []mvccpb.KeyValue {
	return tw.changes
}
|
15
vendor/github.com/coreos/etcd/mvcc/metrics.go
generated
vendored
15
vendor/github.com/coreos/etcd/mvcc/metrics.go
generated
vendored
@@ -15,8 +15,6 @@
|
||||
package mvcc
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
@@ -131,21 +129,12 @@ var (
|
||||
Buckets: prometheus.ExponentialBuckets(100, 2, 14),
|
||||
})
|
||||
|
||||
dbTotalSize = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
dbTotalSize = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: "etcd_debugging",
|
||||
Subsystem: "mvcc",
|
||||
Name: "db_total_size_in_bytes",
|
||||
Help: "Total size of the underlying database in bytes.",
|
||||
},
|
||||
func() float64 {
|
||||
reportDbTotalSizeInBytesMu.RLock()
|
||||
defer reportDbTotalSizeInBytesMu.RUnlock()
|
||||
return reportDbTotalSizeInBytes()
|
||||
},
|
||||
)
|
||||
// overridden by mvcc initialization
|
||||
reportDbTotalSizeInBytesMu sync.RWMutex
|
||||
reportDbTotalSizeInBytes func() float64 = func() float64 { return 0 }
|
||||
})
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
67
vendor/github.com/coreos/etcd/mvcc/metrics_txn.go
generated
vendored
67
vendor/github.com/coreos/etcd/mvcc/metrics_txn.go
generated
vendored
@@ -1,67 +0,0 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package mvcc
|
||||
|
||||
import (
|
||||
"github.com/coreos/etcd/lease"
|
||||
)
|
||||
|
||||
type metricsTxnWrite struct {
|
||||
TxnWrite
|
||||
ranges uint
|
||||
puts uint
|
||||
deletes uint
|
||||
}
|
||||
|
||||
func newMetricsTxnRead(tr TxnRead) TxnRead {
|
||||
return &metricsTxnWrite{&txnReadWrite{tr}, 0, 0, 0}
|
||||
}
|
||||
|
||||
func newMetricsTxnWrite(tw TxnWrite) TxnWrite {
|
||||
return &metricsTxnWrite{tw, 0, 0, 0}
|
||||
}
|
||||
|
||||
func (tw *metricsTxnWrite) Range(key, end []byte, ro RangeOptions) (*RangeResult, error) {
|
||||
tw.ranges++
|
||||
return tw.TxnWrite.Range(key, end, ro)
|
||||
}
|
||||
|
||||
func (tw *metricsTxnWrite) DeleteRange(key, end []byte) (n, rev int64) {
|
||||
tw.deletes++
|
||||
return tw.TxnWrite.DeleteRange(key, end)
|
||||
}
|
||||
|
||||
func (tw *metricsTxnWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
|
||||
tw.puts++
|
||||
return tw.TxnWrite.Put(key, value, lease)
|
||||
}
|
||||
|
||||
func (tw *metricsTxnWrite) End() {
|
||||
defer tw.TxnWrite.End()
|
||||
if sum := tw.ranges + tw.puts + tw.deletes; sum != 1 {
|
||||
if sum > 1 {
|
||||
txnCounter.Inc()
|
||||
}
|
||||
return
|
||||
}
|
||||
switch {
|
||||
case tw.ranges == 1:
|
||||
rangeCounter.Inc()
|
||||
case tw.puts == 1:
|
||||
putCounter.Inc()
|
||||
case tw.deletes == 1:
|
||||
deleteCounter.Inc()
|
||||
}
|
||||
}
|
2
vendor/github.com/coreos/etcd/mvcc/mvccpb/kv.pb.go
generated
vendored
2
vendor/github.com/coreos/etcd/mvcc/mvccpb/kv.pb.go
generated
vendored
@@ -713,7 +713,7 @@ func init() { proto.RegisterFile("kv.proto", fileDescriptorKv) }
|
||||
|
||||
var fileDescriptorKv = []byte{
|
||||
// 303 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x90, 0x41, 0x4e, 0xc2, 0x40,
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x6c, 0x90, 0x41, 0x4e, 0xc2, 0x40,
|
||||
0x14, 0x86, 0x3b, 0x14, 0x0a, 0x3e, 0x08, 0x36, 0x13, 0x12, 0x27, 0x2e, 0x26, 0x95, 0x8d, 0x18,
|
||||
0x13, 0x4c, 0xf0, 0x06, 0xc6, 0xae, 0x70, 0x61, 0x1a, 0x74, 0x4b, 0x4a, 0x79, 0x21, 0xa4, 0x94,
|
||||
0x69, 0x4a, 0x9d, 0xa4, 0x37, 0x71, 0xef, 0xde, 0x73, 0xb0, 0xe4, 0x08, 0x52, 0x2f, 0x62, 0xfa,
|
||||
|
165
vendor/github.com/coreos/etcd/mvcc/watchable_store.go
generated
vendored
165
vendor/github.com/coreos/etcd/mvcc/watchable_store.go
generated
vendored
@@ -41,11 +41,9 @@ type watchable interface {
|
||||
}
|
||||
|
||||
type watchableStore struct {
|
||||
*store
|
||||
mu sync.Mutex
|
||||
|
||||
// mu protects watcher groups and batches. It should never be locked
|
||||
// before locking store.mu to avoid deadlock.
|
||||
mu sync.RWMutex
|
||||
*store
|
||||
|
||||
// victims are watcher batches that were blocked on the watch channel
|
||||
victims []watcherBatch
|
||||
@@ -78,11 +76,9 @@ func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGet
|
||||
synced: newWatcherGroup(),
|
||||
stopc: make(chan struct{}),
|
||||
}
|
||||
s.store.ReadView = &readView{s}
|
||||
s.store.WriteView = &writeView{s}
|
||||
if s.le != nil {
|
||||
// use this store as the deleter so revokes trigger watch events
|
||||
s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
|
||||
s.le.SetRangeDeleter(s)
|
||||
}
|
||||
s.wg.Add(2)
|
||||
go s.syncWatchersLoop()
|
||||
@@ -90,6 +86,89 @@ func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGet
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *watchableStore) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
rev = s.store.Put(key, value, lease)
|
||||
changes := s.store.getChanges()
|
||||
if len(changes) != 1 {
|
||||
plog.Panicf("unexpected len(changes) != 1 after put")
|
||||
}
|
||||
|
||||
ev := mvccpb.Event{
|
||||
Type: mvccpb.PUT,
|
||||
Kv: &changes[0],
|
||||
}
|
||||
s.notify(rev, []mvccpb.Event{ev})
|
||||
return rev
|
||||
}
|
||||
|
||||
func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
n, rev = s.store.DeleteRange(key, end)
|
||||
changes := s.store.getChanges()
|
||||
|
||||
if len(changes) != int(n) {
|
||||
plog.Panicf("unexpected len(changes) != n after deleteRange")
|
||||
}
|
||||
|
||||
if n == 0 {
|
||||
return n, rev
|
||||
}
|
||||
|
||||
evs := make([]mvccpb.Event, n)
|
||||
for i := range changes {
|
||||
evs[i] = mvccpb.Event{
|
||||
Type: mvccpb.DELETE,
|
||||
Kv: &changes[i]}
|
||||
evs[i].Kv.ModRevision = rev
|
||||
}
|
||||
s.notify(rev, evs)
|
||||
return n, rev
|
||||
}
|
||||
|
||||
func (s *watchableStore) TxnBegin() int64 {
|
||||
s.mu.Lock()
|
||||
return s.store.TxnBegin()
|
||||
}
|
||||
|
||||
func (s *watchableStore) TxnEnd(txnID int64) error {
|
||||
err := s.store.TxnEnd(txnID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
changes := s.getChanges()
|
||||
if len(changes) == 0 {
|
||||
s.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
rev := s.store.Rev()
|
||||
evs := make([]mvccpb.Event, len(changes))
|
||||
for i, change := range changes {
|
||||
switch change.CreateRevision {
|
||||
case 0:
|
||||
evs[i] = mvccpb.Event{
|
||||
Type: mvccpb.DELETE,
|
||||
Kv: &changes[i]}
|
||||
evs[i].Kv.ModRevision = rev
|
||||
default:
|
||||
evs[i] = mvccpb.Event{
|
||||
Type: mvccpb.PUT,
|
||||
Kv: &changes[i]}
|
||||
}
|
||||
}
|
||||
|
||||
s.notify(rev, evs)
|
||||
s.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *watchableStore) Close() error {
|
||||
close(s.stopc)
|
||||
s.wg.Wait()
|
||||
@@ -107,6 +186,9 @@ func (s *watchableStore) NewWatchStream() WatchStream {
|
||||
}
|
||||
|
||||
func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
wa := &watcher{
|
||||
key: key,
|
||||
end: end,
|
||||
@@ -116,24 +198,21 @@ func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch c
|
||||
fcs: fcs,
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
s.revMu.RLock()
|
||||
synced := startRev > s.store.currentRev || startRev == 0
|
||||
s.store.mu.Lock()
|
||||
synced := startRev > s.store.currentRev.main || startRev == 0
|
||||
if synced {
|
||||
wa.minRev = s.store.currentRev + 1
|
||||
wa.minRev = s.store.currentRev.main + 1
|
||||
if startRev > wa.minRev {
|
||||
wa.minRev = startRev
|
||||
}
|
||||
}
|
||||
s.store.mu.Unlock()
|
||||
if synced {
|
||||
s.synced.add(wa)
|
||||
} else {
|
||||
slowWatcherGauge.Inc()
|
||||
s.unsynced.add(wa)
|
||||
}
|
||||
s.revMu.RUnlock()
|
||||
s.mu.Unlock()
|
||||
|
||||
watcherGauge.Inc()
|
||||
|
||||
return wa, func() { s.cancelWatcher(wa) }
|
||||
@@ -179,35 +258,17 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *watchableStore) Restore(b backend.Backend) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
err := s.store.Restore(b)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for wa := range s.synced.watchers {
|
||||
s.unsynced.watchers.add(wa)
|
||||
}
|
||||
s.synced = newWatcherGroup()
|
||||
return nil
|
||||
}
|
||||
|
||||
// syncWatchersLoop syncs the watcher in the unsynced map every 100ms.
|
||||
func (s *watchableStore) syncWatchersLoop() {
|
||||
defer s.wg.Done()
|
||||
|
||||
for {
|
||||
s.mu.RLock()
|
||||
s.mu.Lock()
|
||||
st := time.Now()
|
||||
lastUnsyncedWatchers := s.unsynced.size()
|
||||
s.mu.RUnlock()
|
||||
|
||||
unsyncedWatchers := 0
|
||||
if lastUnsyncedWatchers > 0 {
|
||||
unsyncedWatchers = s.syncWatchers()
|
||||
}
|
||||
s.syncWatchers()
|
||||
unsyncedWatchers := s.unsynced.size()
|
||||
s.mu.Unlock()
|
||||
syncDuration := time.Since(st)
|
||||
|
||||
waitDuration := 100 * time.Millisecond
|
||||
@@ -234,9 +295,9 @@ func (s *watchableStore) syncVictimsLoop() {
|
||||
for s.moveVictims() != 0 {
|
||||
// try to update all victim watchers
|
||||
}
|
||||
s.mu.RLock()
|
||||
s.mu.Lock()
|
||||
isEmpty := len(s.victims) == 0
|
||||
s.mu.RUnlock()
|
||||
s.mu.Unlock()
|
||||
|
||||
var tickc <-chan time.Time
|
||||
if !isEmpty {
|
||||
@@ -279,8 +340,8 @@ func (s *watchableStore) moveVictims() (moved int) {
|
||||
|
||||
// assign completed victim watchers to unsync/sync
|
||||
s.mu.Lock()
|
||||
s.store.revMu.RLock()
|
||||
curRev := s.store.currentRev
|
||||
s.store.mu.Lock()
|
||||
curRev := s.store.currentRev.main
|
||||
for w, eb := range wb {
|
||||
if newVictim != nil && newVictim[w] != nil {
|
||||
// couldn't send watch response; stays victim
|
||||
@@ -297,7 +358,7 @@ func (s *watchableStore) moveVictims() (moved int) {
|
||||
s.synced.add(w)
|
||||
}
|
||||
}
|
||||
s.store.revMu.RUnlock()
|
||||
s.store.mu.Unlock()
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
@@ -315,23 +376,19 @@ func (s *watchableStore) moveVictims() (moved int) {
|
||||
// 2. iterate over the set to get the minimum revision and remove compacted watchers
|
||||
// 3. use minimum revision to get all key-value pairs and send those events to watchers
|
||||
// 4. remove synced watchers in set from unsynced group and move to synced group
|
||||
func (s *watchableStore) syncWatchers() int {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
func (s *watchableStore) syncWatchers() {
|
||||
if s.unsynced.size() == 0 {
|
||||
return 0
|
||||
return
|
||||
}
|
||||
|
||||
s.store.revMu.RLock()
|
||||
defer s.store.revMu.RUnlock()
|
||||
s.store.mu.Lock()
|
||||
defer s.store.mu.Unlock()
|
||||
|
||||
// in order to find key-value pairs from unsynced watchers, we need to
|
||||
// find min revision index, and these revisions can be used to
|
||||
// query the backend store of key-value pairs
|
||||
curRev := s.store.currentRev
|
||||
curRev := s.store.currentRev.main
|
||||
compactionRev := s.store.compactMainRev
|
||||
|
||||
wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
|
||||
minBytes, maxBytes := newRevBytes(), newRevBytes()
|
||||
revToBytes(revision{main: minRev}, minBytes)
|
||||
@@ -339,7 +396,7 @@ func (s *watchableStore) syncWatchers() int {
|
||||
|
||||
// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
|
||||
// values are actual key-value pairs in backend.
|
||||
tx := s.store.b.ReadTx()
|
||||
tx := s.store.b.BatchTx()
|
||||
tx.Lock()
|
||||
revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
|
||||
evs := kvsToEvents(wg, revs, vs)
|
||||
@@ -389,8 +446,6 @@ func (s *watchableStore) syncWatchers() int {
|
||||
vsz += len(v)
|
||||
}
|
||||
slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))
|
||||
|
||||
return s.unsynced.size()
|
||||
}
|
||||
|
||||
// kvsToEvents gets all events for the watchers from all key-value pairs
|
||||
@@ -456,8 +511,8 @@ func (s *watchableStore) addVictim(victim watcherBatch) {
|
||||
func (s *watchableStore) rev() int64 { return s.store.Rev() }
|
||||
|
||||
func (s *watchableStore) progress(w *watcher) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if _, ok := s.synced.watchers[w]; ok {
|
||||
w.send(WatchResponse{WatchID: w.id, Revision: s.rev()})
|
||||
|
53
vendor/github.com/coreos/etcd/mvcc/watchable_store_txn.go
generated
vendored
53
vendor/github.com/coreos/etcd/mvcc/watchable_store_txn.go
generated
vendored
@@ -1,53 +0,0 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package mvcc
|
||||
|
||||
import (
|
||||
"github.com/coreos/etcd/mvcc/mvccpb"
|
||||
)
|
||||
|
||||
func (tw *watchableStoreTxnWrite) End() {
|
||||
changes := tw.Changes()
|
||||
if len(changes) == 0 {
|
||||
tw.TxnWrite.End()
|
||||
return
|
||||
}
|
||||
|
||||
rev := tw.Rev() + 1
|
||||
evs := make([]mvccpb.Event, len(changes))
|
||||
for i, change := range changes {
|
||||
evs[i].Kv = &changes[i]
|
||||
if change.CreateRevision == 0 {
|
||||
evs[i].Type = mvccpb.DELETE
|
||||
evs[i].Kv.ModRevision = rev
|
||||
} else {
|
||||
evs[i].Type = mvccpb.PUT
|
||||
}
|
||||
}
|
||||
|
||||
// end write txn under watchable store lock so the updates are visible
|
||||
// when asynchronous event posting checks the current store revision
|
||||
tw.s.mu.Lock()
|
||||
tw.s.notify(rev, evs)
|
||||
tw.TxnWrite.End()
|
||||
tw.s.mu.Unlock()
|
||||
}
|
||||
|
||||
type watchableStoreTxnWrite struct {
|
||||
TxnWrite
|
||||
s *watchableStore
|
||||
}
|
||||
|
||||
// Write opens a write transaction on the underlying store and wraps it so
// that ending the transaction also notifies watchers.
func (s *watchableStore) Write() TxnWrite {
	return &watchableStoreTxnWrite{
		TxnWrite: s.store.Write(),
		s:        s,
	}
}
|
2
vendor/github.com/coreos/etcd/mvcc/watcher_group.go
generated
vendored
2
vendor/github.com/coreos/etcd/mvcc/watcher_group.go
generated
vendored
@@ -183,7 +183,7 @@ func (wg *watcherGroup) add(wa *watcher) {
|
||||
// contains is whether the given key has a watcher in the group.
|
||||
func (wg *watcherGroup) contains(key string) bool {
|
||||
_, ok := wg.keyWatchers[key]
|
||||
return ok || wg.ranges.Intersects(adt.NewStringAffinePoint(key))
|
||||
return ok || wg.ranges.Contains(adt.NewStringAffinePoint(key))
|
||||
}
|
||||
|
||||
// size gives the number of unique watchers in the group.
|
||||
|
Reference in New Issue
Block a user