Update etcd godep to 3.0.10 to fix known watch issue.

This commit is contained in:
Timothy St. Clair
2016-09-23 16:05:38 -05:00
parent 69083bcfce
commit a8dbab6631
4 changed files with 135 additions and 126 deletions

View File

@@ -127,6 +127,8 @@ type watchGrpcStream struct {
donec chan struct{}
// errc transmits errors from grpc Recv to the watch stream reconn logic
errc chan error
// closingc gets the watcherStream of closing watchers
closingc chan *watcherStream
// the error that closed the watch stream
closeErr error
@@ -189,11 +191,12 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
cancel: cancel,
streams: make(map[int64]*watcherStream),
respc: make(chan *pb.WatchResponse),
reqc: make(chan *watchRequest),
stopc: make(chan struct{}),
donec: make(chan struct{}),
errc: make(chan error, 1),
respc: make(chan *pb.WatchResponse),
reqc: make(chan *watchRequest),
stopc: make(chan struct{}),
donec: make(chan struct{}),
errc: make(chan error, 1),
closingc: make(chan *watcherStream),
}
go wgs.run()
return wgs
@@ -242,7 +245,6 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
case reqc <- wr:
ok = true
case <-wr.ctx.Done():
wgs.stopIfEmpty()
case <-donec:
if wgs.closeErr != nil {
closeCh <- WatchResponse{closeErr: wgs.closeErr}
@@ -352,15 +354,19 @@ func (w *watchGrpcStream) addStream(resp *pb.WatchResponse, pendingReq *watchReq
go w.serveStream(ws)
}
// closeStream closes the watcher resources and removes it
func (w *watchGrpcStream) closeStream(ws *watcherStream) {
func (w *watchGrpcStream) closeStream(ws *watcherStream) bool {
w.mu.Lock()
// cancels request stream; subscriber receives nil channel
close(ws.initReq.retc)
// close subscriber's channel
close(ws.outc)
delete(w.streams, ws.id)
empty := len(w.streams) == 0
if empty && w.stopc != nil {
w.stopc = nil
}
w.mu.Unlock()
return empty
}
// run is the root of the goroutines for managing a watcher client
@@ -464,6 +470,10 @@ func (w *watchGrpcStream) run() {
cancelSet = make(map[int64]struct{})
case <-stopc:
return
case ws := <-w.closingc:
if w.closeStream(ws) {
return
}
}
// send failed; queue for retry
@@ -522,6 +532,15 @@ func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
// serveStream forwards watch responses from run() to the subscriber
func (w *watchGrpcStream) serveStream(ws *watcherStream) {
defer func() {
// signal that this watcherStream is finished
select {
case w.closingc <- ws:
case <-w.donec:
w.closeStream(ws)
}
}()
var closeErr error
emptyWr := &WatchResponse{}
wrs := []*WatchResponse{}
@@ -602,20 +621,9 @@ func (w *watchGrpcStream) serveStream(ws *watcherStream) {
}
}
w.closeStream(ws)
w.stopIfEmpty()
// lazily send cancel message if events on missing id
}
func (wgs *watchGrpcStream) stopIfEmpty() {
wgs.mu.Lock()
if len(wgs.streams) == 0 && wgs.stopc != nil {
close(wgs.stopc)
wgs.stopc = nil
}
wgs.mu.Unlock()
}
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
ws, rerr := w.resume()
if rerr != nil {

View File

@@ -49,6 +49,7 @@ var (
"2.1.0": {streamTypeMsgAppV2, streamTypeMessage},
"2.2.0": {streamTypeMsgAppV2, streamTypeMessage},
"2.3.0": {streamTypeMsgAppV2, streamTypeMessage},
"3.0.0": {streamTypeMsgAppV2, streamTypeMessage},
}
)

View File

@@ -29,7 +29,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "2.3.0"
Version = "3.0.9"
Version = "3.0.10"
// Git SHA Value will be set during build
GitSHA = "Not provided (use ./build instead of go build)"