Level sets dependency graph to consume etcd 3.1.5

Timothy St. Clair
2017-04-04 20:54:55 -05:00
parent 1c34102d5b
commit 93c051e28f
392 changed files with 39050 additions and 21582 deletions

View File

@@ -20,7 +20,7 @@ import (
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"sync"
"sync/atomic"
"time"
@@ -40,7 +40,7 @@ var (
// This only works for linux.
InitialMmapSize = int64(10 * 1024 * 1024 * 1024)
plog = capnslog.NewPackageLogger("github.com/coreos/etcd/mvcc", "backend")
plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc/backend")
)
const (
@@ -303,6 +303,7 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
}
tmpb, berr := tmptx.CreateBucketIfNotExists(next)
tmpb.FillPercent = 0.9 // for sequential writes in this loop
if berr != nil {
return berr
}
@@ -319,6 +320,8 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
return err
}
tmpb = tmptx.Bucket(next)
tmpb.FillPercent = 0.9 // for sequential writes in this loop
count = 0
}
return tmpb.Put(k, v)
@@ -334,7 +337,7 @@ func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, strin
if err != nil {
plog.Fatal(err)
}
tmpPath := path.Join(dir, "database")
tmpPath := filepath.Join(dir, "database")
return newBackend(tmpPath, batchInterval, batchLimit), tmpPath
}
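
Annotation: path.Join always joins with forward slashes and is meant for slash-separated paths, while filepath.Join uses the host OS separator, which is why tmpPath now uses it. A minimal standalone sketch of the difference (the Windows behavior noted in the comments is this annotation's assumption, not part of the commit):

package main

import (
	"fmt"
	"path"
	"path/filepath"
)

func main() {
	// path.Join emits forward slashes regardless of operating system.
	fmt.Println(path.Join("tmp", "etcd", "database")) // tmp/etcd/database
	// filepath.Join uses the OS separator; on Windows it joins with
	// backslashes, so the temp database path is valid there too.
	fmt.Println(filepath.Join("tmp", "etcd", "database"))
}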

View File

@@ -162,11 +162,26 @@ func (t *batchTx) commit(stop bool) {
if t.pending == 0 && !stop {
t.backend.mu.RLock()
defer t.backend.mu.RUnlock()
atomic.StoreInt64(&t.backend.size, t.tx.Size())
// batchTx.commit(true) calls *bolt.Tx.Commit, which
// sets *bolt.Tx.db and *bolt.Tx.meta to nil, so a
// subsequent *bolt.Tx.Size() call panics.
//
// This nil pointer reference panic happens when:
// 1. batchTx.commit(false) from newBatchTx
// 2. batchTx.commit(true) from stopping backend
// 3. batchTx.commit(false) from inflight mvcc Hash call
//
// Check if db is nil to prevent this panic
if t.tx.DB() != nil {
atomic.StoreInt64(&t.backend.size, t.tx.Size())
}
return
}
start := time.Now()
// gofail: var beforeCommit struct{}
err = t.tx.Commit()
// gofail: var afterCommit struct{}
commitDurations.Observe(time.Since(start).Seconds())
atomic.AddInt64(&t.backend.commits, 1)
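
Annotation: the DB-nil guard above exists because bolt clears a transaction's internal state on commit. A minimal sketch of the same check, assuming the github.com/boltdb/bolt API vendored here (the helper name safeSize is hypothetical):

package backendsketch

import "github.com/boltdb/bolt"

// safeSize returns the transaction's on-disk size, or false when the
// *bolt.Tx has already been committed or rolled back: after Commit,
// Tx.DB() returns nil and Tx.Size() would dereference that nil *DB.
func safeSize(tx *bolt.Tx) (int64, bool) {
	if tx.DB() == nil {
		return 0, false
	}
	return tx.Size(), true
}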

View File

@@ -25,11 +25,11 @@ type index interface {
Get(key []byte, atRev int64) (rev, created revision, ver int64, err error)
Range(key, end []byte, atRev int64) ([][]byte, []revision)
Put(key []byte, rev revision)
Restore(key []byte, created, modified revision, ver int64)
Tombstone(key []byte, rev revision) error
RangeSince(key, end []byte, rev int64) []revision
Compact(rev int64) map[revision]struct{}
Equal(b index) bool
Insert(ki *keyIndex)
}
type treeIndex struct {
@@ -58,21 +58,6 @@ func (ti *treeIndex) Put(key []byte, rev revision) {
okeyi.put(rev.main, rev.sub)
}
func (ti *treeIndex) Restore(key []byte, created, modified revision, ver int64) {
keyi := &keyIndex{key: key}
ti.Lock()
defer ti.Unlock()
item := ti.tree.Get(keyi)
if item == nil {
keyi.restore(created, modified, ver)
ti.tree.ReplaceOrInsert(keyi)
return
}
okeyi := item.(*keyIndex)
okeyi.put(modified.main, modified.sub)
}
func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
keyi := &keyIndex{key: key}
@@ -215,3 +200,9 @@ func (a *treeIndex) Equal(bi index) bool {
return equal
}
func (ti *treeIndex) Insert(ki *keyIndex) {
ti.Lock()
defer ti.Unlock()
ti.tree.ReplaceOrInsert(ki)
}

View File

@@ -74,8 +74,9 @@ type store struct {
// the main revision of the last compaction
compactMainRev int64
tx backend.BatchTx
txnID int64 // tracks the current txnID to verify txn operations
tx backend.BatchTx
txnID int64 // tracks the current txnID to verify txn operations
txnModify bool
// bytesBuf8 is a byte slice of length 8
// to avoid a repetitive allocation in saveIndex.
@@ -180,7 +181,6 @@ func (s *store) TxnBegin() int64 {
s.currentRev.sub = 0
s.tx = s.b.BatchTx()
s.tx.Lock()
s.saveIndex()
s.txnID = rand.Int63()
return s.txnID
@@ -203,6 +203,14 @@ func (s *store) txnEnd(txnID int64) error {
return ErrTxnIDMismatch
}
// only update the index if the txn modified the mvcc state.
// a read-only txn may execute concurrently with a write txn;
// it must not write its index to mvcc.
if s.txnModify {
s.saveIndex()
}
s.txnModify = false
s.tx.Unlock()
if s.currentRev.sub != 0 {
s.currentRev.main += 1
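
Annotation: the new txnModify flag makes index persistence a per-transaction decision rather than an unconditional step in TxnBegin. A compact sketch of the dirty-flag pattern, with all names hypothetical:

package main

import "fmt"

// txnStore marks itself dirty on mutating operations and persists
// its index at most once, at transaction end.
type txnStore struct {
	dirty bool
}

func (s *txnStore) put() { s.dirty = true /* ... mutate state ... */ }

func (s *txnStore) end() {
	if s.dirty {
		fmt.Println("saveIndex") // only write txns pay this cost
	}
	s.dirty = false
}

func main() {
	s := &txnStore{}
	s.end() // read-only txn: nothing persisted
	s.put()
	s.end() // write txn: prints "saveIndex"
}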
@@ -314,20 +322,23 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {
return ch, nil
}
func (s *store) Hash() (uint32, int64, error) {
s.b.ForceCommit()
// DefaultIgnores is a map of keys to ignore in hash checking.
var DefaultIgnores map[backend.IgnoreKey]struct{}
func init() {
DefaultIgnores = map[backend.IgnoreKey]struct{}{
// consistent index might be changed due to v2 internal sync, which
// is not controllable by the user.
{Bucket: string(metaBucketName), Key: string(consistentIndexKeyName)}: {},
}
}
func (s *store) Hash() (uint32, int64, error) {
s.mu.Lock()
defer s.mu.Unlock()
s.b.ForceCommit()
// ignore hash consistent index field for now.
// consistent index might be changed due to v2 internal sync, which
// is not controllable by the user.
ignores := make(map[backend.IgnoreKey]struct{})
bk := backend.IgnoreKey{Bucket: string(metaBucketName), Key: string(consistentIndexKeyName)}
ignores[bk] = struct{}{}
h, err := s.b.Hash(ignores)
h, err := s.b.Hash(DefaultIgnores)
rev := s.currentRev.main
return h, rev, err
}
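
Annotation: DefaultIgnores hoists the ignore set, previously rebuilt on every Hash call, into a package-level map filled once in init(). A self-contained sketch of the struct-keyed map; the literal bucket and key strings are assumptions standing in for metaBucketName and consistentIndexKeyName:

package main

import "fmt"

// IgnoreKey mirrors backend.IgnoreKey: a comparable struct, so map
// lookups compare by field value rather than identity.
type IgnoreKey struct{ Bucket, Key string }

var defaultIgnores map[IgnoreKey]struct{}

func init() {
	defaultIgnores = map[IgnoreKey]struct{}{
		{Bucket: "meta", Key: "consistent_index"}: {}, // assumed literals
	}
}

func main() {
	_, ok := defaultIgnores[IgnoreKey{"meta", "consistent_index"}]
	fmt.Println(ok) // true: an equal struct value hits the same entry
}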
@@ -369,6 +380,11 @@ func (s *store) restore() error {
keyToLease := make(map[string]lease.LeaseID)
// use an unordered map to hold the temp index data to speed up
// the initial key index recovery.
// we will convert this unordered map into the tree index later.
unordered := make(map[string]*keyIndex, 100000)
// restore index
tx := s.b.BatchTx()
tx.Lock()
@@ -391,11 +407,20 @@ func (s *store) restore() error {
// restore index
switch {
case isTombstone(key):
s.kvindex.Tombstone(kv.Key, rev)
if ki, ok := unordered[string(kv.Key)]; ok {
ki.tombstone(rev.main, rev.sub)
}
delete(keyToLease, string(kv.Key))
default:
s.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version)
ki, ok := unordered[string(kv.Key)]
if ok {
ki.put(rev.main, rev.sub)
} else {
ki = &keyIndex{key: kv.Key}
ki.restore(revision{kv.CreateRevision, 0}, rev, kv.Version)
unordered[string(kv.Key)] = ki
}
if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
keyToLease[string(kv.Key)] = lid
@@ -408,6 +433,11 @@ func (s *store) restore() error {
s.currentRev = rev
}
// restore the tree index from the unordered index.
for _, v := range unordered {
s.kvindex.Insert(v)
}
// keys in the range [compacted revision - N, compaction revision] might all be deleted due to compaction.
// in that case the current revision should be set to the compaction revision, not the largest revision
// we have seen.
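
Annotation: restore now replays the backend into a plain map first and only afterwards builds the B-tree through the new kvindex.Insert, trading tree operations during replay for cheap map lookups. A sketch of the same two-phase idea against github.com/google/btree (the package treeIndex is built on), with a simplified item standing in for *keyIndex:

package main

import (
	"fmt"

	"github.com/google/btree"
)

// item is a stand-in for *keyIndex; Less makes it a btree.Item.
type item struct{ key string }

func (a *item) Less(b btree.Item) bool { return a.key < b.(*item).key }

func main() {
	// Phase 1: accumulate with O(1) map lookups while replaying.
	unordered := map[string]*item{"a": {"a"}, "b": {"b"}, "c": {"c"}}
	// Phase 2: bulk-insert into the ordered index, once per key.
	t := btree.New(32)
	for _, it := range unordered {
		t.ReplaceOrInsert(it)
	}
	fmt.Println(t.Len()) // 3
}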
@@ -509,23 +539,18 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64, countOnly bool
}
func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
s.txnModify = true
rev := s.currentRev.main + 1
c := rev
oldLease := lease.NoLease
// if the key existed before, reuse its previous create revision and
// fetch its previous leaseID
grev, created, ver, err := s.kvindex.Get(key, rev)
_, created, ver, err := s.kvindex.Get(key, rev)
if err == nil {
c = created.main
ibytes := newRevBytes()
revToBytes(grev, ibytes)
_, vs := s.tx.UnsafeRange(keyBucketName, ibytes, nil, 0)
var kv mvccpb.KeyValue
if err = kv.Unmarshal(vs[0]); err != nil {
plog.Fatalf("cannot unmarshal value: %v", err)
}
oldLease = lease.LeaseID(kv.Lease)
oldLease = s.le.GetLease(lease.LeaseItem{Key: string(key)})
}
ibytes := newRevBytes()
@@ -575,6 +600,8 @@ func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
}
func (s *store) deleteRange(key, end []byte) int64 {
s.txnModify = true
rrev := s.currentRev.main
if s.currentRev.sub > 0 {
rrev += 1
@@ -615,17 +642,11 @@ func (s *store) delete(key []byte, rev revision) {
s.changes = append(s.changes, kv)
s.currentRev.sub += 1
ibytes = newRevBytes()
revToBytes(rev, ibytes)
_, vs := s.tx.UnsafeRange(keyBucketName, ibytes, nil, 0)
item := lease.LeaseItem{Key: string(key)}
leaseID := s.le.GetLease(item)
kv.Reset()
if err = kv.Unmarshal(vs[0]); err != nil {
plog.Fatalf("cannot unmarshal value: %v", err)
}
if lease.LeaseID(kv.Lease) != lease.NoLease {
err = s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
if leaseID != lease.NoLease {
err = s.le.Detach(leaseID, []lease.LeaseItem{item})
if err != nil {
plog.Errorf("cannot detach %v", err)
}
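
Annotation: put and delete now resolve the previous lease through the lessor instead of ranging bolt for the old KeyValue and unmarshalling it, removing one backend read per overwrite or deletion. A hedged sketch of the lookup, assuming the lease.Lessor interface in this vendoring exposes the GetLease call visible above:

package leasesketch

import "github.com/coreos/etcd/lease"

// previousLease resolves the lease currently attached to key from the
// lessor's in-memory state, replacing the old read-and-unmarshal
// round trip through the key bucket.
func previousLease(le lease.Lessor, key []byte) lease.LeaseID {
	return le.GetLease(lease.LeaseItem{Key: string(key)})
}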

View File

@@ -31,7 +31,9 @@ var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
const _ = proto.ProtoPackageIsVersion1
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Event_EventType int32
@@ -103,91 +105,91 @@ func init() {
proto.RegisterType((*Event)(nil), "mvccpb.Event")
proto.RegisterEnum("mvccpb.Event_EventType", Event_EventType_name, Event_EventType_value)
}
func (m *KeyValue) Marshal() (data []byte, err error) {
func (m *KeyValue) Marshal() (dAtA []byte, err error) {
size := m.Size()
data = make([]byte, size)
n, err := m.MarshalTo(data)
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return data[:n], nil
return dAtA[:n], nil
}
func (m *KeyValue) MarshalTo(data []byte) (int, error) {
func (m *KeyValue) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if len(m.Key) > 0 {
data[i] = 0xa
dAtA[i] = 0xa
i++
i = encodeVarintKv(data, i, uint64(len(m.Key)))
i += copy(data[i:], m.Key)
i = encodeVarintKv(dAtA, i, uint64(len(m.Key)))
i += copy(dAtA[i:], m.Key)
}
if m.CreateRevision != 0 {
data[i] = 0x10
dAtA[i] = 0x10
i++
i = encodeVarintKv(data, i, uint64(m.CreateRevision))
i = encodeVarintKv(dAtA, i, uint64(m.CreateRevision))
}
if m.ModRevision != 0 {
data[i] = 0x18
dAtA[i] = 0x18
i++
i = encodeVarintKv(data, i, uint64(m.ModRevision))
i = encodeVarintKv(dAtA, i, uint64(m.ModRevision))
}
if m.Version != 0 {
data[i] = 0x20
dAtA[i] = 0x20
i++
i = encodeVarintKv(data, i, uint64(m.Version))
i = encodeVarintKv(dAtA, i, uint64(m.Version))
}
if len(m.Value) > 0 {
data[i] = 0x2a
dAtA[i] = 0x2a
i++
i = encodeVarintKv(data, i, uint64(len(m.Value)))
i += copy(data[i:], m.Value)
i = encodeVarintKv(dAtA, i, uint64(len(m.Value)))
i += copy(dAtA[i:], m.Value)
}
if m.Lease != 0 {
data[i] = 0x30
dAtA[i] = 0x30
i++
i = encodeVarintKv(data, i, uint64(m.Lease))
i = encodeVarintKv(dAtA, i, uint64(m.Lease))
}
return i, nil
}
func (m *Event) Marshal() (data []byte, err error) {
func (m *Event) Marshal() (dAtA []byte, err error) {
size := m.Size()
data = make([]byte, size)
n, err := m.MarshalTo(data)
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return data[:n], nil
return dAtA[:n], nil
}
func (m *Event) MarshalTo(data []byte) (int, error) {
func (m *Event) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if m.Type != 0 {
data[i] = 0x8
dAtA[i] = 0x8
i++
i = encodeVarintKv(data, i, uint64(m.Type))
i = encodeVarintKv(dAtA, i, uint64(m.Type))
}
if m.Kv != nil {
data[i] = 0x12
dAtA[i] = 0x12
i++
i = encodeVarintKv(data, i, uint64(m.Kv.Size()))
n1, err := m.Kv.MarshalTo(data[i:])
i = encodeVarintKv(dAtA, i, uint64(m.Kv.Size()))
n1, err := m.Kv.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n1
}
if m.PrevKv != nil {
data[i] = 0x1a
dAtA[i] = 0x1a
i++
i = encodeVarintKv(data, i, uint64(m.PrevKv.Size()))
n2, err := m.PrevKv.MarshalTo(data[i:])
i = encodeVarintKv(dAtA, i, uint64(m.PrevKv.Size()))
n2, err := m.PrevKv.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
@@ -196,31 +198,31 @@ func (m *Event) MarshalTo(data []byte) (int, error) {
return i, nil
}
func encodeFixed64Kv(data []byte, offset int, v uint64) int {
data[offset] = uint8(v)
data[offset+1] = uint8(v >> 8)
data[offset+2] = uint8(v >> 16)
data[offset+3] = uint8(v >> 24)
data[offset+4] = uint8(v >> 32)
data[offset+5] = uint8(v >> 40)
data[offset+6] = uint8(v >> 48)
data[offset+7] = uint8(v >> 56)
func encodeFixed64Kv(dAtA []byte, offset int, v uint64) int {
dAtA[offset] = uint8(v)
dAtA[offset+1] = uint8(v >> 8)
dAtA[offset+2] = uint8(v >> 16)
dAtA[offset+3] = uint8(v >> 24)
dAtA[offset+4] = uint8(v >> 32)
dAtA[offset+5] = uint8(v >> 40)
dAtA[offset+6] = uint8(v >> 48)
dAtA[offset+7] = uint8(v >> 56)
return offset + 8
}
func encodeFixed32Kv(data []byte, offset int, v uint32) int {
data[offset] = uint8(v)
data[offset+1] = uint8(v >> 8)
data[offset+2] = uint8(v >> 16)
data[offset+3] = uint8(v >> 24)
func encodeFixed32Kv(dAtA []byte, offset int, v uint32) int {
dAtA[offset] = uint8(v)
dAtA[offset+1] = uint8(v >> 8)
dAtA[offset+2] = uint8(v >> 16)
dAtA[offset+3] = uint8(v >> 24)
return offset + 4
}
func encodeVarintKv(data []byte, offset int, v uint64) int {
func encodeVarintKv(dAtA []byte, offset int, v uint64) int {
for v >= 1<<7 {
data[offset] = uint8(v&0x7f | 0x80)
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
data[offset] = uint8(v)
dAtA[offset] = uint8(v)
return offset + 1
}
func (m *KeyValue) Size() (n int) {
@@ -279,8 +281,8 @@ func sovKv(x uint64) (n int) {
func sozKv(x uint64) (n int) {
return sovKv(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *KeyValue) Unmarshal(data []byte) error {
l := len(data)
func (m *KeyValue) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
@@ -292,7 +294,7 @@ func (m *KeyValue) Unmarshal(data []byte) error {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
@@ -320,7 +322,7 @@ func (m *KeyValue) Unmarshal(data []byte) error {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
b := dAtA[iNdEx]
iNdEx++
byteLen |= (int(b) & 0x7F) << shift
if b < 0x80 {
@@ -334,7 +336,7 @@ func (m *KeyValue) Unmarshal(data []byte) error {
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Key = append(m.Key[:0], data[iNdEx:postIndex]...)
m.Key = append(m.Key[:0], dAtA[iNdEx:postIndex]...)
if m.Key == nil {
m.Key = []byte{}
}
@@ -351,7 +353,7 @@ func (m *KeyValue) Unmarshal(data []byte) error {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
b := dAtA[iNdEx]
iNdEx++
m.CreateRevision |= (int64(b) & 0x7F) << shift
if b < 0x80 {
@@ -370,7 +372,7 @@ func (m *KeyValue) Unmarshal(data []byte) error {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
b := dAtA[iNdEx]
iNdEx++
m.ModRevision |= (int64(b) & 0x7F) << shift
if b < 0x80 {
@@ -389,7 +391,7 @@ func (m *KeyValue) Unmarshal(data []byte) error {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
b := dAtA[iNdEx]
iNdEx++
m.Version |= (int64(b) & 0x7F) << shift
if b < 0x80 {
@@ -408,7 +410,7 @@ func (m *KeyValue) Unmarshal(data []byte) error {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
b := dAtA[iNdEx]
iNdEx++
byteLen |= (int(b) & 0x7F) << shift
if b < 0x80 {
@@ -422,7 +424,7 @@ func (m *KeyValue) Unmarshal(data []byte) error {
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Value = append(m.Value[:0], data[iNdEx:postIndex]...)
m.Value = append(m.Value[:0], dAtA[iNdEx:postIndex]...)
if m.Value == nil {
m.Value = []byte{}
}
@@ -439,7 +441,7 @@ func (m *KeyValue) Unmarshal(data []byte) error {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
b := dAtA[iNdEx]
iNdEx++
m.Lease |= (int64(b) & 0x7F) << shift
if b < 0x80 {
@@ -448,7 +450,7 @@ func (m *KeyValue) Unmarshal(data []byte) error {
}
default:
iNdEx = preIndex
skippy, err := skipKv(data[iNdEx:])
skippy, err := skipKv(dAtA[iNdEx:])
if err != nil {
return err
}
@@ -467,8 +469,8 @@ func (m *KeyValue) Unmarshal(data []byte) error {
}
return nil
}
func (m *Event) Unmarshal(data []byte) error {
l := len(data)
func (m *Event) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
@@ -480,7 +482,7 @@ func (m *Event) Unmarshal(data []byte) error {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
@@ -508,7 +510,7 @@ func (m *Event) Unmarshal(data []byte) error {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
b := dAtA[iNdEx]
iNdEx++
m.Type |= (Event_EventType(b) & 0x7F) << shift
if b < 0x80 {
@@ -527,7 +529,7 @@ func (m *Event) Unmarshal(data []byte) error {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
b := dAtA[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
@@ -544,7 +546,7 @@ func (m *Event) Unmarshal(data []byte) error {
if m.Kv == nil {
m.Kv = &KeyValue{}
}
if err := m.Kv.Unmarshal(data[iNdEx:postIndex]); err != nil {
if err := m.Kv.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
@@ -560,7 +562,7 @@ func (m *Event) Unmarshal(data []byte) error {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
b := dAtA[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
@@ -577,13 +579,13 @@ func (m *Event) Unmarshal(data []byte) error {
if m.PrevKv == nil {
m.PrevKv = &KeyValue{}
}
if err := m.PrevKv.Unmarshal(data[iNdEx:postIndex]); err != nil {
if err := m.PrevKv.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipKv(data[iNdEx:])
skippy, err := skipKv(dAtA[iNdEx:])
if err != nil {
return err
}
@@ -602,8 +604,8 @@ func (m *Event) Unmarshal(data []byte) error {
}
return nil
}
func skipKv(data []byte) (n int, err error) {
l := len(data)
func skipKv(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
var wire uint64
@@ -614,7 +616,7 @@ func skipKv(data []byte) (n int, err error) {
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := data[iNdEx]
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
@@ -632,7 +634,7 @@ func skipKv(data []byte) (n int, err error) {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if data[iNdEx-1] < 0x80 {
if dAtA[iNdEx-1] < 0x80 {
break
}
}
@@ -649,7 +651,7 @@ func skipKv(data []byte) (n int, err error) {
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := data[iNdEx]
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
@@ -672,7 +674,7 @@ func skipKv(data []byte) (n int, err error) {
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := data[iNdEx]
b := dAtA[iNdEx]
iNdEx++
innerWire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
@@ -683,7 +685,7 @@ func skipKv(data []byte) (n int, err error) {
if innerWireType == 4 {
break
}
next, err := skipKv(data[start:])
next, err := skipKv(dAtA[start:])
if err != nil {
return 0, err
}
@@ -707,6 +709,8 @@ var (
ErrIntOverflowKv = fmt.Errorf("proto: integer overflow")
)
func init() { proto.RegisterFile("kv.proto", fileDescriptorKv) }
var fileDescriptorKv = []byte{
// 303 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x6c, 0x90, 0x41, 0x4e, 0xc2, 0x40,
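
Annotation: the kv.pb.go churn is mechanical regeneration: gogoproto renamed the data parameter to dAtA (a spelling chosen so a proto field named data cannot collide with it) and bumped the compile-time guard to ProtoPackageIsVersion2. The varint encoder itself is unchanged; a standalone sketch of that base-128 encoding:

package main

import "fmt"

// encodeVarint mirrors encodeVarintKv: 7 payload bits per byte, with
// the continuation bit 0x80 set on every byte except the last.
func encodeVarint(buf []byte, offset int, v uint64) int {
	for v >= 1<<7 {
		buf[offset] = uint8(v&0x7f | 0x80)
		v >>= 7
		offset++
	}
	buf[offset] = uint8(v)
	return offset + 1
}

func main() {
	buf := make([]byte, 10)
	n := encodeVarint(buf, 0, 300)
	fmt.Printf("% x\n", buf[:n]) // ac 02
}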

View File

@@ -43,6 +43,7 @@ message Event {
// A DELETE/EXPIRE event contains the deleted key with
// its modification revision set to the revision of deletion.
KeyValue kv = 2;
// prev_kv holds the key-value pair before the event happens.
KeyValue prev_kv = 3;
}
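
Annotation: prev_kv gives watch consumers the pair as it stood before the event, so a delta can be computed without a second read. A small sketch of consuming it from a decoded Event; both fields are optional messages, hence the nil checks:

package mvccsketch

import "github.com/coreos/etcd/mvcc/mvccpb"

// values extracts the before/after values of an event. PrevKv is nil
// unless the server was asked to populate it.
func values(ev mvccpb.Event) (prev, cur []byte) {
	if ev.PrevKv != nil {
		prev = ev.PrevKv.Value
	}
	if ev.Kv != nil {
		cur = ev.Kv.Value
	}
	return prev, cur
}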

View File

@@ -35,7 +35,7 @@ const (
)
type watchable interface {
watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse) (*watcher, cancelFunc)
watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
progress(w *watcher)
rev() int64
}
@@ -185,7 +185,7 @@ func (s *watchableStore) NewWatchStream() WatchStream {
}
}
func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse) (*watcher, cancelFunc) {
func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
s.mu.Lock()
defer s.mu.Unlock()
@@ -195,6 +195,7 @@ func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch c
minRev: startRev,
id: id,
ch: ch,
fcs: fcs,
}
s.store.mu.Lock()
@@ -325,10 +326,9 @@ func (s *watchableStore) moveVictims() (moved int) {
for w, eb := range wb {
// watcher has observed the store up to, but not including, w.minRev
rev := w.minRev - 1
select {
case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}:
if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
pendingEventsGauge.Add(float64(len(eb.evs)))
default:
} else {
if newVictim == nil {
newVictim = make(watcherBatch)
}
@@ -419,10 +419,9 @@ func (s *watchableStore) syncWatchers() {
w.minRev = eb.moreRev
}
select {
case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}:
if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) {
pendingEventsGauge.Add(float64(len(eb.evs)))
default:
} else {
if victims == nil {
victims = make(watcherBatch)
}
@@ -480,10 +479,10 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
if eb.revs != 1 {
plog.Panicf("unexpected multiple revisions in notification")
}
select {
case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}:
if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
pendingEventsGauge.Add(float64(len(eb.evs)))
default:
} else {
// move slow watcher to victims
w.minRev = rev + 1
if victim == nil {
@@ -516,12 +515,9 @@ func (s *watchableStore) progress(w *watcher) {
defer s.mu.Unlock()
if _, ok := s.synced.watchers[w]; ok {
select {
case w.ch <- WatchResponse{WatchID: w.id, Revision: s.rev()}:
default:
// If the ch is full, this watcher is receiving events.
// We do not need to send progress at all.
}
w.send(WatchResponse{WatchID: w.id, Revision: s.rev()})
// If the ch is full, this watcher is receiving events.
// We do not need to send progress at all.
}
}
@@ -542,7 +538,40 @@ type watcher struct {
minRev int64
id WatchID
fcs []FilterFunc
// a chan to send out the watch response.
// The chan might be shared with other watchers.
ch chan<- WatchResponse
}
func (w *watcher) send(wr WatchResponse) bool {
progressEvent := len(wr.Events) == 0
if len(w.fcs) != 0 {
ne := make([]mvccpb.Event, 0, len(wr.Events))
for i := range wr.Events {
filtered := false
for _, filter := range w.fcs {
if filter(wr.Events[i]) {
filtered = true
break
}
}
if !filtered {
ne = append(ne, wr.Events[i])
}
}
wr.Events = ne
}
// if all events are filtered out, we should send nothing.
if !progressEvent && len(wr.Events) == 0 {
return true
}
select {
case w.ch <- wr:
return true
default:
return false
}
}
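
Annotation: watcher.send folds the three copies of the select-with-default idiom into one place and layers filtering on top. Its one subtle contract: a batch whose events are all filtered out returns true without touching the channel, so a fully filtered watcher is never misclassified as slow and parked in victims. A self-contained sketch of that contract with simplified types:

package main

import "fmt"

type event struct{ put bool }
type response struct{ events []event }
type filterFunc func(event) bool // true means drop the event

// send mirrors watcher.send: filter first, then a non-blocking send.
// An all-filtered batch reports success without using channel space.
func send(ch chan response, fcs []filterFunc, r response) bool {
	progress := len(r.events) == 0
	kept := make([]event, 0, len(r.events))
	for _, ev := range r.events {
		dropped := false
		for _, f := range fcs {
			if f(ev) {
				dropped = true
				break
			}
		}
		if !dropped {
			kept = append(kept, ev)
		}
	}
	r.events = kept
	if !progress && len(r.events) == 0 {
		return true // nothing left to deliver; not a slow watcher
	}
	select {
	case ch <- r:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan response) // unbuffered and unread
	dropAll := []filterFunc{func(event) bool { return true }}
	fmt.Println(send(ch, dropAll, response{events: []event{{true}}})) // true
}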

View File

@@ -28,6 +28,9 @@ var (
type WatchID int64
// FilterFunc returns true if the given event should be filtered out.
type FilterFunc func(e mvccpb.Event) bool
type WatchStream interface {
// Watch creates a watcher. The watcher watches the events happening or
// happened on the given key or range [key, end) from the given startRev.
@@ -38,7 +41,7 @@ type WatchStream interface {
// The returned `id` is the ID of this watcher. It appears as WatchID
// in events that are sent to the created watcher through stream channel.
//
Watch(key, end []byte, startRev int64) WatchID
Watch(key, end []byte, startRev int64, fcs ...FilterFunc) WatchID
// Chan returns a chan. All watch response will be sent to the returned chan.
Chan() <-chan WatchResponse
@@ -96,7 +99,7 @@ type watchStream struct {
// Watch creates a new watcher in the stream and returns its WatchID.
// TODO: return error if ws is closed?
func (ws *watchStream) Watch(key, end []byte, startRev int64) WatchID {
func (ws *watchStream) Watch(key, end []byte, startRev int64, fcs ...FilterFunc) WatchID {
// prevent wrong range where key >= end lexicographically
// watch request with 'WithFromKey' has empty-byte range end
if len(end) != 0 && bytes.Compare(key, end) != -1 {
@@ -112,7 +115,7 @@ func (ws *watchStream) Watch(key, end []byte, startRev int64) WatchID {
id := ws.nextID
ws.nextID++
w, c := ws.watchable.watch(key, end, startRev, id, ws.ch)
w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)
ws.cancels[id] = c
ws.watchers[id] = w

View File

@@ -78,6 +78,10 @@ func (wb watcherBatch) add(w *watcher, ev mvccpb.Event) {
// newWatcherBatch maps watchers to their matched events. It enables quick
// event lookup by watcher.
func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event) watcherBatch {
if len(wg.watchers) == 0 {
return nil
}
wb := make(watcherBatch)
for _, ev := range evs {
for w := range wg.watcherSetByKey(string(ev.Kv.Key)) {
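
Annotation: the early return added to newWatcherBatch skips allocating the batch map on every commit when nobody is watching. Returning nil is safe for callers because a nil map in Go is readable: lookups and range both work on it. A two-line demonstration:

package main

import "fmt"

func main() {
	var wb map[string]int // nil, as newWatcherBatch now returns
	v, ok := wb["w"]
	fmt.Println(v, ok) // 0 false: lookups on a nil map are safe
	for range wb {
		// a range over a nil map simply never executes
	}
}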