Merge pull request #13058 from mvdan/go1.5

Race condition and test fixes

commit b0457bee94
@@ -711,19 +711,19 @@ __EOF__
 ### Create and delete persistent volume examples
 # Pre-condition: no persistent volumes currently exist
-kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" ''
+kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" ''
 # Command
 kubectl create -f docs/user-guide/persistent-volumes/volumes/local-01.yaml "${kube_flags[@]}"
-kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" 'pv0001:'
+kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" 'pv0001:'
 kubectl delete pv pv0001 "${kube_flags[@]}"
 kubectl create -f docs/user-guide/persistent-volumes/volumes/local-02.yaml "${kube_flags[@]}"
-kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" 'pv0002:'
+kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" 'pv0002:'
 kubectl delete pv pv0002 "${kube_flags[@]}"
 kubectl create -f docs/user-guide/persistent-volumes/volumes/gce.yaml "${kube_flags[@]}"
-kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" 'pv0003:'
+kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" 'pv0003:'
 kubectl delete pv pv0003 "${kube_flags[@]}"
 # Post-condition: no PVs
-kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" ''
+kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" ''
 
 ############################
 # Persistent Volume Claims #
@@ -731,21 +731,21 @@ __EOF__
 ### Create and delete persistent volume claim examples
 # Pre-condition: no persistent volume claims currently exist
-kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" ''
+kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" ''
 # Command
 kubectl create -f docs/user-guide/persistent-volumes/claims/claim-01.yaml "${kube_flags[@]}"
-kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" 'myclaim-1:'
+kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" 'myclaim-1:'
 kubectl delete pvc myclaim-1 "${kube_flags[@]}"
 
 kubectl create -f docs/user-guide/persistent-volumes/claims/claim-02.yaml "${kube_flags[@]}"
-kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" 'myclaim-2:'
+kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" 'myclaim-2:'
 kubectl delete pvc myclaim-2 "${kube_flags[@]}"
 
 kubectl create -f docs/user-guide/persistent-volumes/claims/claim-03.json "${kube_flags[@]}"
-kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" 'myclaim-3:'
+kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" 'myclaim-3:'
 kubectl delete pvc myclaim-3 "${kube_flags[@]}"
 # Post-condition: no PVCs
-kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" ''
+kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" ''
 
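The only change in these two shell hunks is dropping the dot before `$id_field`. A minimal sketch of the likely reason, assuming `id_field` holds a dotted path such as `.metadata.name` (the convention used by this test script): after shell expansion, `{{.$id_field}}` yields a doubled dot, which Go 1.5's stricter text/template parser rejects, while `{{$id_field}}` expands to a valid field chain.

```go
package main

import (
	"fmt"
	"text/template"
)

func main() {
	// With id_field=".metadata.name", the two template strings the script
	// would produce after shell expansion are:
	before := "{{range.items}}{{..metadata.name}}:{{end}}" // from {{.$id_field}}
	after := "{{range.items}}{{.metadata.name}}:{{end}}"   // from {{$id_field}}

	for _, tmpl := range []string{before, after} {
		_, err := template.New("t").Parse(tmpl)
		fmt.Printf("%-45s parse error: %v\n", tmpl, err)
	}
	// On Go 1.5+ the first fails to parse ("unexpected . after term");
	// the second parses fine.
}
```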
@@ -21,6 +21,7 @@ import (
 	"reflect"
 	"strconv"
 	"strings"
+	"sync"
 	"testing"
 
 	"k8s.io/kubernetes/pkg/api"
@@ -271,39 +272,41 @@ func TestEventf(t *testing.T) {
 	}
 
 	for _, item := range table {
-		called := make(chan struct{})
+		var wg sync.WaitGroup
+		// We expect only one callback
+		wg.Add(1)
 		testEvents := testEventSink{
 			OnCreate: func(event *api.Event) (*api.Event, error) {
+				defer wg.Done()
 				returnEvent, _ := validateEvent(event, item.expect, t)
 				if item.expectUpdate {
 					t.Errorf("Expected event update(), got event create()")
 				}
-				called <- struct{}{}
 				return returnEvent, nil
 			},
 			OnUpdate: func(event *api.Event) (*api.Event, error) {
+				defer wg.Done()
 				returnEvent, _ := validateEvent(event, item.expect, t)
 				if !item.expectUpdate {
 					t.Errorf("Expected event create(), got event update()")
 				}
-				called <- struct{}{}
 				return returnEvent, nil
 			},
 		}
 		eventBroadcaster := NewBroadcaster()
 		sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
 		logWatcher1 := eventBroadcaster.StartLogging(t.Logf) // Prove that it is useful
+		wg.Add(1)
 		logWatcher2 := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
+			defer wg.Done()
 			if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a {
 				t.Errorf("Expected '%v', got '%v'", e, a)
 			}
-			called <- struct{}{}
 		})
 		recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"})
 		recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...)
 
-		<-called
-		<-called
+		wg.Wait()
 		sinkWatcher.Stop()
 		logWatcher1.Stop()
 		logWatcher2.Stop()
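The race fixed here: the old test sent on an unbuffered `called` channel from each callback and received exactly twice, but the number of callbacks per iteration is not fixed at two in every interleaving, so a run could block forever or return before a late callback fired. The replacement counts expected callbacks up front with a `sync.WaitGroup`. A minimal standalone sketch of that pattern; `register` is a hypothetical stand-in for `eventBroadcaster.StartLogging`, which invokes its callback asynchronously:

```go
package main

import (
	"fmt"
	"sync"
)

// register runs the callback on its own goroutine, like the broadcaster does.
func register(cb func(msg string)) {
	go cb("event")
}

func main() {
	var wg sync.WaitGroup

	wg.Add(1) // one callback expected from the first registration
	register(func(msg string) {
		defer wg.Done() // always signal, even on an early return
		fmt.Println("sink saw:", msg)
	})

	wg.Add(1) // Add before the callback can possibly fire
	register(func(msg string) {
		defer wg.Done()
		fmt.Println("log saw:", msg)
	})

	wg.Wait() // deterministic: returns once both callbacks have run
}
```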
@@ -316,17 +319,17 @@ func validateEvent(actualEvent *api.Event, expectedEvent *api.Event, t *testing.
 	if actualEvent.FirstTimestamp.IsZero() || actualEvent.LastTimestamp.IsZero() {
 		t.Errorf("timestamp wasn't set: %#v", *actualEvent)
 	}
-	if actualEvent.FirstTimestamp.Equal(actualEvent.LastTimestamp) {
-		if expectCompression {
-			t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be equal to indicate only one occurrence of the event, but were different. Actual Event: %#v", actualEvent.FirstTimestamp, actualEvent.LastTimestamp, *actualEvent)
-		}
-	} else {
-		if !expectCompression {
-			t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be different to indicate event compression happened, but were the same. Actual Event: %#v", actualEvent.FirstTimestamp, actualEvent.LastTimestamp, *actualEvent)
-		}
-	}
 	actualFirstTimestamp := actualEvent.FirstTimestamp
 	actualLastTimestamp := actualEvent.LastTimestamp
+	if actualFirstTimestamp.Equal(actualLastTimestamp) {
+		if expectCompression {
+			t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be different to indicate event compression happened, but were the same. Actual Event: %#v", actualFirstTimestamp, actualLastTimestamp, *actualEvent)
+		}
+	} else {
+		if expectedEvent.Count == 1 {
+			t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be equal to indicate only one occurrence of the event, but were different. Actual Event: %#v", actualFirstTimestamp, actualLastTimestamp, *actualEvent)
+		}
+	}
 	// Temp clear time stamps for comparison because actual values don't matter for comparison
 	actualEvent.FirstTimestamp = expectedEvent.FirstTimestamp
 	actualEvent.LastTimestamp = expectedEvent.LastTimestamp
|
@ -335,10 +335,6 @@ func TestUpdate(t *testing.T) {
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
// Run the controller and run it until we close stop.
|
|
||||||
stop := make(chan struct{})
|
|
||||||
go controller.Run(stop)
|
|
||||||
|
|
||||||
pod := func(name, check string) *api.Pod {
|
pod := func(name, check string) *api.Pod {
|
||||||
return &api.Pod{
|
return &api.Pod{
|
||||||
ObjectMeta: api.ObjectMeta{
|
ObjectMeta: api.ObjectMeta{
|
||||||
@@ -371,11 +367,18 @@ func TestUpdate(t *testing.T) {
 		},
 	}
 
-	// run every test a few times, in parallel
 	const threads = 3
+	testDoneWG.Add(threads * len(tests))
+
+	// Run the controller and run it until we close stop.
+	// Once Run() is called, calls to testDoneWG.Done() might start, so
+	// all testDoneWG.Add() calls must happen before this point
+	stop := make(chan struct{})
+	go controller.Run(stop)
+
+	// run every test a few times, in parallel
 	var wg sync.WaitGroup
 	wg.Add(threads * len(tests))
-	testDoneWG.Add(threads * len(tests))
 	for i := 0; i < threads; i++ {
 		for j, f := range tests {
 			go func(name string, f func(string)) {
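These two hunks reorder the test so that every `testDoneWG.Add` happens before `controller.Run` starts the goroutine that can call `testDoneWG.Done`: a `Done` that lands before its matching `Add` can drive the counter negative and panic. A minimal standalone sketch of the rule; `done`, `workers`, and the worker body are illustrative, not from the patch:

```go
package main

import "sync"

func main() {
	var done sync.WaitGroup

	// Correct order: Add for all expected Done calls first...
	const workers = 3
	done.Add(workers)

	// ...then start the machinery that triggers Done. If these goroutines
	// started before the Add above, a fast Done could drive the counter
	// negative and panic with "sync: negative WaitGroup counter".
	stop := make(chan struct{})
	for i := 0; i < workers; i++ {
		go func() {
			defer done.Done()
			<-stop // stand-in for the controller doing work until stopped
		}()
	}

	close(stop)
	done.Wait()
}
```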
@@ -29,6 +29,15 @@ import (
 	"k8s.io/kubernetes/pkg/probe"
 )
 
+func containsAny(s string, substrs []string) bool {
+	for _, substr := range substrs {
+		if strings.Contains(s, substr) {
+			return true
+		}
+	}
+	return false
+}
+
 func TestHTTPProbeChecker(t *testing.T) {
 	handleReq := func(s int, body string) func(w http.ResponseWriter) {
 		return func(w http.ResponseWriter) {
@@ -41,12 +50,31 @@ func TestHTTPProbeChecker(t *testing.T) {
 	testCases := []struct {
 		handler func(w http.ResponseWriter)
 		health  probe.Result
-		body    string
+		// go1.5: error message changed for timeout, need to support
+		// both old and new
+		accBodies []string
 	}{
 		// The probe will be filled in below. This is primarily testing that an HTTP GET happens.
-		{handleReq(http.StatusOK, "ok body"), probe.Success, "ok body"},
-		{handleReq(-1, "fail body"), probe.Failure, "fail body"},
-		{func(w http.ResponseWriter) { time.Sleep(3 * time.Second) }, probe.Failure, "use of closed network connection"},
+		{
+			handleReq(http.StatusOK, "ok body"),
+			probe.Success,
+			[]string{"ok body"},
+		},
+		{
+			handleReq(-1, "fail body"),
+			probe.Failure,
+			[]string{"fail body"},
+		},
+		{
+			func(w http.ResponseWriter) {
+				time.Sleep(3 * time.Second)
+			},
+			probe.Failure,
+			[]string{
+				"use of closed network connection",
+				"request canceled (Client.Timeout exceeded while awaiting headers)",
+			},
+		},
 	}
 	for _, test := range testCases {
 		server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -74,8 +102,8 @@ func TestHTTPProbeChecker(t *testing.T) {
 		if health != test.health {
 			t.Errorf("Expected %v, got %v", test.health, health)
 		}
-		if !strings.Contains(output, test.body) {
-			t.Errorf("Expected %v, got %v", test.body, output)
+		if !containsAny(output, test.accBodies) {
+			t.Errorf("Expected one of %#v, got %v", test.accBodies, output)
 		}
 	}
 }
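`containsAny` is a plain any-of variant of `strings.Contains`, duplicated into both probe test packages so each can accept whichever error wording the running Go release produces. A self-contained usage sketch; the `output` string here is illustrative, not taken from a real probe run:

```go
package main

import (
	"fmt"
	"strings"
)

func containsAny(s string, substrs []string) bool {
	for _, substr := range substrs {
		if strings.Contains(s, substr) {
			return true
		}
	}
	return false
}

func main() {
	// go1.4 and go1.5 word the client timeout error differently;
	// the test accepts either.
	output := "Get http://127.0.0.1: request canceled (Client.Timeout exceeded while awaiting headers)"
	acc := []string{
		"use of closed network connection",
		"request canceled (Client.Timeout exceeded while awaiting headers)",
	}
	fmt.Println(containsAny(output, acc)) // true
}
```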
@@ -29,17 +29,28 @@ import (
 	"k8s.io/kubernetes/pkg/probe"
 )
 
+func containsAny(s string, substrs []string) bool {
+	for _, substr := range substrs {
+		if strings.Contains(s, substr) {
+			return true
+		}
+	}
+	return false
+}
+
 func TestTcpHealthChecker(t *testing.T) {
 	prober := New()
 	tests := []struct {
 		expectedStatus probe.Result
 		usePort        bool
 		expectError    bool
-		output         string
+		// Some errors are different depending on your system, make
+		// the test pass on all of them
+		accOutputs []string
 	}{
 		// The probe will be filled in below. This is primarily testing that a connection is made.
-		{probe.Success, true, false, ""},
-		{probe.Failure, false, false, "tcp: unknown port"},
+		{probe.Success, true, false, []string{""}},
+		{probe.Failure, false, false, []string{"unknown port", "Servname not supported for ai_socktype"}},
 	}
 
 	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -72,8 +83,8 @@ func TestTcpHealthChecker(t *testing.T) {
 		if err == nil && test.expectError {
 			t.Errorf("unexpected non-error.")
 		}
-		if !strings.Contains(output, test.output) {
-			t.Errorf("expected %s, got %s", test.output, output)
+		if !containsAny(output, test.accOutputs) {
+			t.Errorf("expected one of %#v, got %s", test.accOutputs, output)
 		}
 	}
 }
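The two accepted outputs for the failing TCP case reflect different resolvers: Go's built-in port lookup reports "unknown port", while a lookup that goes through glibc's getaddrinfo reports "Servname not supported for ai_socktype". A quick way to see the local wording; the exact error text varies by platform and Go release, so treat the comments as examples:

```go
package main

import (
	"fmt"
	"net"
)

func main() {
	// Dial a service name no resolver knows; the error text depends on
	// whether the Go or the cgo (getaddrinfo) resolver handled the lookup.
	_, err := net.Dial("tcp", "localhost:nosuchservice")
	fmt.Println(err)
	// e.g. "... unknown port"                           (Go resolver)
	// or   "... Servname not supported for ai_socktype" (glibc)
}
```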
@@ -217,7 +217,7 @@ func (j *JSONPath) evalArray(input []reflect.Value, node *ArrayNode) ([]reflect.
 
 	value, isNil := template.Indirect(value)
 	if isNil || (value.Kind() != reflect.Array && value.Kind() != reflect.Slice) {
-		return input, fmt.Errorf("%v is not array or slice", value)
+		return input, fmt.Errorf("%v is not array or slice", value.Type())
 	}
 	params := node.Params
 	if !params[0].Known {
@@ -162,7 +162,7 @@ func TestStructInput(t *testing.T) {
 	failStoreTests := []jsonpathTest{
 		{"invalid identfier", "{hello}", storeData, "unrecongnized identifier hello"},
 		{"nonexistent field", "{.hello}", storeData, "hello is not found"},
-		{"invalid array", "{.Labels[0]}", storeData, "<map[string]int Value> is not array or slice"},
+		{"invalid array", "{.Labels[0]}", storeData, "map[string]int is not array or slice"},
 		{"invalid filter operator", "{.Book[?(@.Price<>10)]}", storeData, "unrecognized filter operator <>"},
 		{"redundent end", "{range .Labels.*}{@}{end}{end}", storeData, "not in range, nothing to end"},
 	}
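Both jsonpath changes track the same Go 1.5 behavior change: fmt now prints the value held inside a `reflect.Value` rather than the opaque `<map[string]int Value>` form, which made the old error message unstable across releases. Formatting `value.Type()` yields the same text everywhere. A minimal sketch:

```go
package main

import (
	"fmt"
	"reflect"
)

func main() {
	v := reflect.ValueOf(map[string]int{"a": 1})

	// Before go1.5 this printed "<map[string]int Value>"; since go1.5 fmt
	// unwraps the reflect.Value and prints "map[a:1]".
	fmt.Printf("%v is not array or slice\n", v)

	// Printing the type is stable on every release: "map[string]int".
	fmt.Printf("%v is not array or slice\n", v.Type())
}
```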
@@ -41,8 +41,9 @@ const incomingQueueLength = 25
 type Broadcaster struct {
 	lock sync.Mutex
 
 	watchers     map[int64]*broadcasterWatcher
 	nextWatcher  int64
+	distributing sync.WaitGroup
 
 	incoming chan Event
 
@@ -67,6 +68,7 @@ func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *B
 		watchQueueLength:    queueLength,
 		fullChannelBehavior: fullChannelBehavior,
 	}
+	m.distributing.Add(1)
 	go m.loop()
 	return m
 }
@@ -146,9 +148,14 @@ func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
 }
 
 // Shutdown disconnects all watchers (but any queued events will still be distributed).
-// You must not call Action after calling Shutdown.
+// You must not call Action or Watch* after calling Shutdown. This call blocks
+// until all events have been distributed through the outbound channels. Note
+// that since they can be buffered, this means that the watchers might not
+// have received the data yet as it can remain sitting in the buffered
+// channel.
 func (m *Broadcaster) Shutdown() {
 	close(m.incoming)
+	m.distributing.Wait()
 }
 
 // loop receives from m.incoming and distributes to all watchers.
@@ -163,6 +170,7 @@ func (m *Broadcaster) loop() {
 		m.distribute(event)
 	}
 	m.closeAll()
+	m.distributing.Done()
 }
 
 // distribute sends event to all watchers. Blocking.
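Taken together, the three watch hunks turn `Shutdown` from fire-and-forget into a blocking call: the constructor does `distributing.Add(1)`, `loop` does `Done` after draining and `closeAll`, and `Shutdown` closes `incoming` then waits. A standalone sketch of the same pattern; `broadcaster` and `newBroadcaster` here are illustrative names, not the real API:

```go
package main

import (
	"fmt"
	"sync"
)

type broadcaster struct {
	incoming     chan string
	distributing sync.WaitGroup
}

func newBroadcaster() *broadcaster {
	b := &broadcaster{incoming: make(chan string, 25)}
	b.distributing.Add(1) // matched by Done when loop drains and exits
	go b.loop()
	return b
}

func (b *broadcaster) loop() {
	defer b.distributing.Done()
	for ev := range b.incoming { // ends once incoming is closed and drained
		fmt.Println("distributed:", ev)
	}
}

// Shutdown blocks until every queued event has been distributed.
func (b *broadcaster) Shutdown() {
	close(b.incoming)
	b.distributing.Wait()
}

func main() {
	b := newBroadcaster()
	b.incoming <- "hello"
	b.Shutdown() // returns only after "hello" was handled
}
```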
@@ -124,9 +124,8 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) {
 	event2 := Event{Added, &myType{"bar", "hello world 2"}}
 
 	// Add a couple watchers
-	const testWatchers = 2
-	watches := make([]Interface, testWatchers)
-	for i := 0; i < testWatchers; i++ {
+	watches := make([]Interface, 2)
+	for i := range watches {
 		watches[i] = m.Watch()
 	}
 
@@ -139,8 +138,8 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) {
 
 	// Pull events from the queue.
 	wg := sync.WaitGroup{}
-	wg.Add(testWatchers)
-	for i := 0; i < testWatchers; i++ {
+	wg.Add(len(watches))
+	for i := range watches {
 		// Verify that each watcher only gets the first event because its watch
 		// queue of length one was full from the first one.
 		go func(watcher int, w Interface) {
@@ -148,14 +147,12 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) {
 		e1, ok := <-w.ResultChan()
 		if !ok {
 			t.Errorf("Watcher %v failed to retrieve first event.", watcher)
-			return
 		}
 		if e, a := event1, e1; !reflect.DeepEqual(e, a) {
 			t.Errorf("Watcher %v: Expected (%v, %#v), got (%v, %#v)",
 				watcher, e.Type, e.Object, a.Type, a.Object)
-		} else {
-			t.Logf("Got (%v, %#v)", e1.Type, e1.Object)
 		}
+		t.Logf("Got (%v, %#v)", e1.Type, e1.Object)
 		e2, ok := <-w.ResultChan()
 		if ok {
 			t.Errorf("Watcher %v received second event (%v, %#v) even though it shouldn't have.",