Merge pull request #505 from lavalamp/clientWatch
Add client watch capability
This commit is contained in:
@@ -18,6 +18,7 @@ package client
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
@@ -27,6 +28,8 @@ import (
|
|||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -191,12 +194,7 @@ func (r *Request) PollPeriod(d time.Duration) *Request {
|
|||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do formats and executes the request. Returns the API object received, or an error.
|
func (r *Request) finalURL() string {
|
||||||
func (r *Request) Do() Result {
|
|
||||||
for {
|
|
||||||
if r.err != nil {
|
|
||||||
return Result{err: r.err}
|
|
||||||
}
|
|
||||||
finalURL := r.c.host + r.path
|
finalURL := r.c.host + r.path
|
||||||
query := url.Values{}
|
query := url.Values{}
|
||||||
if r.selector != nil {
|
if r.selector != nil {
|
||||||
@@ -209,7 +207,38 @@ func (r *Request) Do() Result {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
finalURL += "?" + query.Encode()
|
finalURL += "?" + query.Encode()
|
||||||
req, err := http.NewRequest(r.verb, finalURL, r.body)
|
return finalURL
|
||||||
|
}
|
||||||
|
|
||||||
|
// Attempts to begin watching the requested location. Returns a watch.Interface, or an error.
|
||||||
|
func (r *Request) Watch() (watch.Interface, error) {
|
||||||
|
if r.err != nil {
|
||||||
|
return nil, r.err
|
||||||
|
}
|
||||||
|
req, err := http.NewRequest(r.verb, r.finalURL(), r.body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if r.c.auth != nil {
|
||||||
|
req.SetBasicAuth(r.c.auth.User, r.c.auth.Password)
|
||||||
|
}
|
||||||
|
response, err := r.c.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if response.StatusCode != http.StatusOK {
|
||||||
|
return nil, fmt.Errorf("Got status: %v", response.StatusCode)
|
||||||
|
}
|
||||||
|
return watch.NewStreamWatcher(tools.NewAPIEventDecoder(response.Body)), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do formats and executes the request. Returns the API object received, or an error.
|
||||||
|
func (r *Request) Do() Result {
|
||||||
|
for {
|
||||||
|
if r.err != nil {
|
||||||
|
return Result{err: r.err}
|
||||||
|
}
|
||||||
|
req, err := http.NewRequest(r.verb, r.finalURL(), r.body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Result{err: err}
|
return Result{err: err}
|
||||||
}
|
}
|
||||||
|
@@ -18,16 +18,20 @@ package client
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"encoding/base64"
|
||||||
|
"encoding/json"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestDoRequestNewWay(t *testing.T) {
|
func TestDoRequestNewWay(t *testing.T) {
|
||||||
@@ -303,3 +307,91 @@ func TestPolling(t *testing.T) {
|
|||||||
f()
|
f()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func authFromReq(r *http.Request) (*AuthInfo, bool) {
|
||||||
|
auth, ok := r.Header["Authorization"]
|
||||||
|
if !ok {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
encoded := strings.Split(auth[0], " ")
|
||||||
|
if len(encoded) != 2 || encoded[0] != "Basic" {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
decoded, err := base64.StdEncoding.DecodeString(encoded[1])
|
||||||
|
if err != nil {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
parts := strings.Split(string(decoded), ":")
|
||||||
|
if len(parts) != 2 {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
return &AuthInfo{User: parts[0], Password: parts[1]}, true
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkAuth sets errors if the auth found in r doesn't match the expectation.
|
||||||
|
// TODO: Move to util, test in more places.
|
||||||
|
func checkAuth(t *testing.T, expect AuthInfo, r *http.Request) {
|
||||||
|
foundAuth, found := authFromReq(r)
|
||||||
|
if !found {
|
||||||
|
t.Errorf("no auth found")
|
||||||
|
} else if e, a := expect, *foundAuth; !reflect.DeepEqual(e, a) {
|
||||||
|
t.Fatalf("Wrong basic auth: wanted %#v, got %#v", e, a)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWatch(t *testing.T) {
|
||||||
|
var table = []struct {
|
||||||
|
t watch.EventType
|
||||||
|
obj interface{}
|
||||||
|
}{
|
||||||
|
{watch.Added, &api.Pod{JSONBase: api.JSONBase{ID: "first"}}},
|
||||||
|
{watch.Modified, &api.Pod{JSONBase: api.JSONBase{ID: "second"}}},
|
||||||
|
{watch.Deleted, &api.Pod{JSONBase: api.JSONBase{ID: "third"}}},
|
||||||
|
}
|
||||||
|
|
||||||
|
auth := AuthInfo{User: "user", Password: "pass"}
|
||||||
|
testServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
checkAuth(t, auth, r)
|
||||||
|
flusher, ok := w.(http.Flusher)
|
||||||
|
if !ok {
|
||||||
|
panic("need flusher!")
|
||||||
|
}
|
||||||
|
|
||||||
|
w.Header().Set("Transfer-Encoding", "chunked")
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
flusher.Flush()
|
||||||
|
|
||||||
|
encoder := json.NewEncoder(w)
|
||||||
|
for _, item := range table {
|
||||||
|
encoder.Encode(&api.WatchEvent{item.t, api.APIObject{item.obj}})
|
||||||
|
flusher.Flush()
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
|
||||||
|
s := New(testServer.URL, &auth)
|
||||||
|
|
||||||
|
watching, err := s.Get().Path("path/to/watch/thing").Watch()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unexpected error")
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, item := range table {
|
||||||
|
got, ok := <-watching.ResultChan()
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("Unexpected early close")
|
||||||
|
}
|
||||||
|
if e, a := item.t, got.Type; e != a {
|
||||||
|
t.Errorf("Expected %v, got %v", e, a)
|
||||||
|
}
|
||||||
|
if e, a := item.obj, got.Object; !reflect.DeepEqual(e, a) {
|
||||||
|
t.Errorf("Expected %v, got %v", e, a)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_, ok := <-watching.ResultChan()
|
||||||
|
if ok {
|
||||||
|
t.Fatal("Unexpected non-close")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@@ -419,29 +419,7 @@ func TestSyncronize(t *testing.T) {
|
|||||||
validateSyncReplication(t, &fakePodControl, 7, 0)
|
validateSyncReplication(t, &fakePodControl, 7, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
type asyncTimeout struct {
|
|
||||||
doneChan chan bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func beginTimeout(d time.Duration) *asyncTimeout {
|
|
||||||
a := &asyncTimeout{doneChan: make(chan bool)}
|
|
||||||
go func() {
|
|
||||||
select {
|
|
||||||
case <-a.doneChan:
|
|
||||||
return
|
|
||||||
case <-time.After(d):
|
|
||||||
panic("Timeout expired!")
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
return a
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *asyncTimeout) done() {
|
|
||||||
close(a.doneChan)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWatchControllers(t *testing.T) {
|
func TestWatchControllers(t *testing.T) {
|
||||||
defer beginTimeout(20 * time.Second).done()
|
|
||||||
fakeEtcd := tools.MakeFakeEtcdClient(t)
|
fakeEtcd := tools.MakeFakeEtcdClient(t)
|
||||||
manager := MakeReplicationManager(fakeEtcd, nil)
|
manager := MakeReplicationManager(fakeEtcd, nil)
|
||||||
var testControllerSpec api.ReplicationController
|
var testControllerSpec api.ReplicationController
|
||||||
|
53
pkg/tools/decoder.go
Normal file
53
pkg/tools/decoder.go
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2014 Google Inc. All rights reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package tools
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||||
|
)
|
||||||
|
|
||||||
|
// APIEventDecoder implements the watch.Decoder interface for io.ReadClosers that
|
||||||
|
// have contents which consist of a series of api.WatchEvent objects encoded via JSON.
|
||||||
|
type APIEventDecoder struct {
|
||||||
|
stream io.ReadCloser
|
||||||
|
decoder *json.Decoder
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewAPIEventDecoder makes an APIEventDecoder for the given stream.
|
||||||
|
func NewAPIEventDecoder(stream io.ReadCloser) *APIEventDecoder {
|
||||||
|
return &APIEventDecoder{
|
||||||
|
stream: stream,
|
||||||
|
decoder: json.NewDecoder(stream),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decode blocks until it can return the next object in the stream. Returns an error
|
||||||
|
// if the stream is closed or an object can't be decoded.
|
||||||
|
func (d *APIEventDecoder) Decode() (action watch.EventType, object interface{}, err error) {
|
||||||
|
var got api.WatchEvent
|
||||||
|
err = d.decoder.Decode(&got)
|
||||||
|
return got.Type, got.Object.Object, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the underlying stream.
|
||||||
|
func (d *APIEventDecoder) Close() {
|
||||||
|
d.stream.Close()
|
||||||
|
}
|
85
pkg/tools/decoder_test.go
Normal file
85
pkg/tools/decoder_test.go
Normal file
@@ -0,0 +1,85 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2014 Google Inc. All rights reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package tools
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"io"
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestDecoder(t *testing.T) {
|
||||||
|
out, in := io.Pipe()
|
||||||
|
encoder := json.NewEncoder(in)
|
||||||
|
decoder := NewAPIEventDecoder(out)
|
||||||
|
|
||||||
|
expect := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
|
||||||
|
go func() {
|
||||||
|
err := encoder.Encode(api.WatchEvent{watch.Added, api.APIObject{expect}})
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected error %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
action, got, err := decoder.Decode()
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected error %v", err)
|
||||||
|
}
|
||||||
|
if e, a := watch.Added, action; e != a {
|
||||||
|
t.Errorf("Expected %v, got %v", e, a)
|
||||||
|
}
|
||||||
|
if e, a := expect, got; !reflect.DeepEqual(e, a) {
|
||||||
|
t.Errorf("Expected %v, got %v", e, a)
|
||||||
|
}
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
_, _, err := decoder.Decode()
|
||||||
|
if err == nil {
|
||||||
|
t.Errorf("Unexpected nil error")
|
||||||
|
}
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
decoder.Close()
|
||||||
|
|
||||||
|
<-done
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDecoder_SourceClose(t *testing.T) {
|
||||||
|
out, in := io.Pipe()
|
||||||
|
decoder := NewAPIEventDecoder(out)
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
_, _, err := decoder.Decode()
|
||||||
|
if err == nil {
|
||||||
|
t.Errorf("Unexpected nil error")
|
||||||
|
}
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
in.Close()
|
||||||
|
|
||||||
|
<-done
|
||||||
|
}
|
91
pkg/watch/iowatcher.go
Normal file
91
pkg/watch/iowatcher.go
Normal file
@@ -0,0 +1,91 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2014 Google Inc. All rights reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package watch
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Decoder allows StreamWatcher to watch any stream for which a Decoder can be written.
|
||||||
|
type Decoder interface {
|
||||||
|
// Decode should return the type of event, the decoded object, or an error.
|
||||||
|
// An error will cause StreamWatcher to call Close(). Decode should block until
|
||||||
|
// it has data or an error occurs.
|
||||||
|
Decode() (action EventType, object interface{}, err error)
|
||||||
|
|
||||||
|
// Close should close the underlying io.Reader, signalling to the source of
|
||||||
|
// the stream that it is no longer being watched. Close() must cause any
|
||||||
|
// outstanding call to Decode() to return with an error of some sort.
|
||||||
|
Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// StreamWatcher turns any stream for which you can write a Decoder interface
|
||||||
|
// into a watch.Interface.
|
||||||
|
type StreamWatcher struct {
|
||||||
|
source Decoder
|
||||||
|
result chan Event
|
||||||
|
sync.Mutex
|
||||||
|
stopped bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewStreamWatcher creates a StreamWatcher from the given decoder.
|
||||||
|
func NewStreamWatcher(d Decoder) *StreamWatcher {
|
||||||
|
sw := &StreamWatcher{
|
||||||
|
source: d,
|
||||||
|
// It's easy for a consumer to add buffering via an extra
|
||||||
|
// goroutine/channel, but impossible for them to remove it,
|
||||||
|
// so nonbuffered is better.
|
||||||
|
result: make(chan Event),
|
||||||
|
}
|
||||||
|
go sw.receive()
|
||||||
|
return sw
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResultChan implements Interface.
|
||||||
|
func (sw *StreamWatcher) ResultChan() <-chan Event {
|
||||||
|
return sw.result
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop implements Interface.
|
||||||
|
func (sw *StreamWatcher) Stop() {
|
||||||
|
// Call Close() exactly once by locking and setting a flag.
|
||||||
|
sw.Lock()
|
||||||
|
defer sw.Unlock()
|
||||||
|
if !sw.stopped {
|
||||||
|
sw.stopped = true
|
||||||
|
sw.source.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// In a loop, read a result from the decoder and send down the result channel.
|
||||||
|
func (sw *StreamWatcher) receive() {
|
||||||
|
defer close(sw.result)
|
||||||
|
defer sw.Stop()
|
||||||
|
defer util.HandleCrash()
|
||||||
|
for {
|
||||||
|
action, obj, err := sw.source.Decode()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sw.result <- Event{
|
||||||
|
Type: action,
|
||||||
|
Object: obj,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
65
pkg/watch/iowatcher_test.go
Normal file
65
pkg/watch/iowatcher_test.go
Normal file
@@ -0,0 +1,65 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2014 Google Inc. All rights reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package watch
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
type fakeDecoder struct {
|
||||||
|
items chan Event
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f fakeDecoder) Decode() (action EventType, object interface{}, err error) {
|
||||||
|
item, open := <-f.items
|
||||||
|
if !open {
|
||||||
|
return action, nil, io.EOF
|
||||||
|
}
|
||||||
|
return item.Type, item.Object, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f fakeDecoder) Close() {
|
||||||
|
close(f.items)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStreamWatcher(t *testing.T) {
|
||||||
|
table := []Event{
|
||||||
|
{Added, "foo"},
|
||||||
|
}
|
||||||
|
|
||||||
|
fd := fakeDecoder{make(chan Event, 5)}
|
||||||
|
sw := NewStreamWatcher(fd)
|
||||||
|
|
||||||
|
for _, item := range table {
|
||||||
|
fd.items <- item
|
||||||
|
got, open := <-sw.ResultChan()
|
||||||
|
if !open {
|
||||||
|
t.Errorf("unexpected early close")
|
||||||
|
}
|
||||||
|
if e, a := item, got; !reflect.DeepEqual(e, a) {
|
||||||
|
t.Errorf("expected %v, got %v", e, a)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sw.Stop()
|
||||||
|
_, open := <-sw.ResultChan()
|
||||||
|
if open {
|
||||||
|
t.Errorf("Unexpected failure to close")
|
||||||
|
}
|
||||||
|
}
|
@@ -29,7 +29,7 @@ type Interface interface {
|
|||||||
// Returns a chan which will receive all the events. If an error occurs
|
// Returns a chan which will receive all the events. If an error occurs
|
||||||
// or Stop() is called, this channel will be closed, in which case the
|
// or Stop() is called, this channel will be closed, in which case the
|
||||||
// watch should be completely cleaned up.
|
// watch should be completely cleaned up.
|
||||||
ResultChan() <-chan *Event
|
ResultChan() <-chan Event
|
||||||
}
|
}
|
||||||
|
|
||||||
// EventType defines the possible types of events.
|
// EventType defines the possible types of events.
|
||||||
@@ -52,14 +52,14 @@ type Event struct {
|
|||||||
|
|
||||||
// FakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
|
// FakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
|
||||||
type FakeWatcher struct {
|
type FakeWatcher struct {
|
||||||
result chan *Event
|
result chan Event
|
||||||
Stopped bool
|
Stopped bool
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFake() *FakeWatcher {
|
func NewFake() *FakeWatcher {
|
||||||
return &FakeWatcher{
|
return &FakeWatcher{
|
||||||
result: make(chan *Event),
|
result: make(chan Event),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -67,30 +67,32 @@ func NewFake() *FakeWatcher {
|
|||||||
func (f *FakeWatcher) Stop() {
|
func (f *FakeWatcher) Stop() {
|
||||||
f.Lock()
|
f.Lock()
|
||||||
defer f.Unlock()
|
defer f.Unlock()
|
||||||
|
if !f.Stopped {
|
||||||
close(f.result)
|
close(f.result)
|
||||||
f.Stopped = true
|
f.Stopped = true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FakeWatcher) ResultChan() <-chan *Event {
|
func (f *FakeWatcher) ResultChan() <-chan Event {
|
||||||
return f.result
|
return f.result
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add sends an add event.
|
// Add sends an add event.
|
||||||
func (f *FakeWatcher) Add(obj interface{}) {
|
func (f *FakeWatcher) Add(obj interface{}) {
|
||||||
f.result <- &Event{Added, obj}
|
f.result <- Event{Added, obj}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Modify sends a modify event.
|
// Modify sends a modify event.
|
||||||
func (f *FakeWatcher) Modify(obj interface{}) {
|
func (f *FakeWatcher) Modify(obj interface{}) {
|
||||||
f.result <- &Event{Modified, obj}
|
f.result <- Event{Modified, obj}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete sends a delete event.
|
// Delete sends a delete event.
|
||||||
func (f *FakeWatcher) Delete(lastValue interface{}) {
|
func (f *FakeWatcher) Delete(lastValue interface{}) {
|
||||||
f.result <- &Event{Deleted, lastValue}
|
f.result <- Event{Deleted, lastValue}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Action sends an event of the requested type, for table-based testing.
|
// Action sends an event of the requested type, for table-based testing.
|
||||||
func (f *FakeWatcher) Action(action EventType, obj interface{}) {
|
func (f *FakeWatcher) Action(action EventType, obj interface{}) {
|
||||||
f.result <- &Event{action, obj}
|
f.result <- Event{action, obj}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user