Merge pull request #13058 from mvdan/go1.5

Race condition and test fixes
This commit is contained in:
Mike Danese 2015-09-10 15:10:19 -07:00
commit b0457bee94
9 changed files with 105 additions and 55 deletions

View File

@ -711,19 +711,19 @@ __EOF__
### Create and delete persistent volume examples ### Create and delete persistent volume examples
# Pre-condition: no persistent volumes currently exist # Pre-condition: no persistent volumes currently exist
kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" '' kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" ''
# Command # Command
kubectl create -f docs/user-guide/persistent-volumes/volumes/local-01.yaml "${kube_flags[@]}" kubectl create -f docs/user-guide/persistent-volumes/volumes/local-01.yaml "${kube_flags[@]}"
kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" 'pv0001:' kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" 'pv0001:'
kubectl delete pv pv0001 "${kube_flags[@]}" kubectl delete pv pv0001 "${kube_flags[@]}"
kubectl create -f docs/user-guide/persistent-volumes/volumes/local-02.yaml "${kube_flags[@]}" kubectl create -f docs/user-guide/persistent-volumes/volumes/local-02.yaml "${kube_flags[@]}"
kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" 'pv0002:' kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" 'pv0002:'
kubectl delete pv pv0002 "${kube_flags[@]}" kubectl delete pv pv0002 "${kube_flags[@]}"
kubectl create -f docs/user-guide/persistent-volumes/volumes/gce.yaml "${kube_flags[@]}" kubectl create -f docs/user-guide/persistent-volumes/volumes/gce.yaml "${kube_flags[@]}"
kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" 'pv0003:' kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" 'pv0003:'
kubectl delete pv pv0003 "${kube_flags[@]}" kubectl delete pv pv0003 "${kube_flags[@]}"
# Post-condition: no PVs # Post-condition: no PVs
kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" '' kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" ''
############################ ############################
# Persistent Volume Claims # # Persistent Volume Claims #
@ -731,21 +731,21 @@ __EOF__
### Create and delete persistent volume claim examples ### Create and delete persistent volume claim examples
# Pre-condition: no persistent volume claims currently exist # Pre-condition: no persistent volume claims currently exist
kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" '' kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" ''
# Command # Command
kubectl create -f docs/user-guide/persistent-volumes/claims/claim-01.yaml "${kube_flags[@]}" kubectl create -f docs/user-guide/persistent-volumes/claims/claim-01.yaml "${kube_flags[@]}"
kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" 'myclaim-1:' kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" 'myclaim-1:'
kubectl delete pvc myclaim-1 "${kube_flags[@]}" kubectl delete pvc myclaim-1 "${kube_flags[@]}"
kubectl create -f docs/user-guide/persistent-volumes/claims/claim-02.yaml "${kube_flags[@]}" kubectl create -f docs/user-guide/persistent-volumes/claims/claim-02.yaml "${kube_flags[@]}"
kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" 'myclaim-2:' kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" 'myclaim-2:'
kubectl delete pvc myclaim-2 "${kube_flags[@]}" kubectl delete pvc myclaim-2 "${kube_flags[@]}"
kubectl create -f docs/user-guide/persistent-volumes/claims/claim-03.json "${kube_flags[@]}" kubectl create -f docs/user-guide/persistent-volumes/claims/claim-03.json "${kube_flags[@]}"
kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" 'myclaim-3:' kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" 'myclaim-3:'
kubectl delete pvc myclaim-3 "${kube_flags[@]}" kubectl delete pvc myclaim-3 "${kube_flags[@]}"
# Post-condition: no PVCs # Post-condition: no PVCs
kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" '' kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" ''

View File

@ -21,6 +21,7 @@ import (
"reflect" "reflect"
"strconv" "strconv"
"strings" "strings"
"sync"
"testing" "testing"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
@ -271,39 +272,41 @@ func TestEventf(t *testing.T) {
} }
for _, item := range table { for _, item := range table {
called := make(chan struct{}) var wg sync.WaitGroup
// We expect only one callback
wg.Add(1)
testEvents := testEventSink{ testEvents := testEventSink{
OnCreate: func(event *api.Event) (*api.Event, error) { OnCreate: func(event *api.Event) (*api.Event, error) {
defer wg.Done()
returnEvent, _ := validateEvent(event, item.expect, t) returnEvent, _ := validateEvent(event, item.expect, t)
if item.expectUpdate { if item.expectUpdate {
t.Errorf("Expected event update(), got event create()") t.Errorf("Expected event update(), got event create()")
} }
called <- struct{}{}
return returnEvent, nil return returnEvent, nil
}, },
OnUpdate: func(event *api.Event) (*api.Event, error) { OnUpdate: func(event *api.Event) (*api.Event, error) {
defer wg.Done()
returnEvent, _ := validateEvent(event, item.expect, t) returnEvent, _ := validateEvent(event, item.expect, t)
if !item.expectUpdate { if !item.expectUpdate {
t.Errorf("Expected event create(), got event update()") t.Errorf("Expected event create(), got event update()")
} }
called <- struct{}{}
return returnEvent, nil return returnEvent, nil
}, },
} }
eventBroadcaster := NewBroadcaster() eventBroadcaster := NewBroadcaster()
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
logWatcher1 := eventBroadcaster.StartLogging(t.Logf) // Prove that it is useful logWatcher1 := eventBroadcaster.StartLogging(t.Logf) // Prove that it is useful
wg.Add(1)
logWatcher2 := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) { logWatcher2 := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
defer wg.Done()
if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a { if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a {
t.Errorf("Expected '%v', got '%v'", e, a) t.Errorf("Expected '%v', got '%v'", e, a)
} }
called <- struct{}{}
}) })
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"}) recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"})
recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...) recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...)
<-called wg.Wait()
<-called
sinkWatcher.Stop() sinkWatcher.Stop()
logWatcher1.Stop() logWatcher1.Stop()
logWatcher2.Stop() logWatcher2.Stop()
@ -316,17 +319,17 @@ func validateEvent(actualEvent *api.Event, expectedEvent *api.Event, t *testing.
if actualEvent.FirstTimestamp.IsZero() || actualEvent.LastTimestamp.IsZero() { if actualEvent.FirstTimestamp.IsZero() || actualEvent.LastTimestamp.IsZero() {
t.Errorf("timestamp wasn't set: %#v", *actualEvent) t.Errorf("timestamp wasn't set: %#v", *actualEvent)
} }
if actualEvent.FirstTimestamp.Equal(actualEvent.LastTimestamp) {
if expectCompression {
t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be equal to indicate only one occurrence of the event, but were different. Actual Event: %#v", actualEvent.FirstTimestamp, actualEvent.LastTimestamp, *actualEvent)
}
} else {
if !expectCompression {
t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be different to indicate event compression happened, but were the same. Actual Event: %#v", actualEvent.FirstTimestamp, actualEvent.LastTimestamp, *actualEvent)
}
}
actualFirstTimestamp := actualEvent.FirstTimestamp actualFirstTimestamp := actualEvent.FirstTimestamp
actualLastTimestamp := actualEvent.LastTimestamp actualLastTimestamp := actualEvent.LastTimestamp
if actualFirstTimestamp.Equal(actualLastTimestamp) {
if expectCompression {
t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be different to indicate event compression happened, but were the same. Actual Event: %#v", actualFirstTimestamp, actualLastTimestamp, *actualEvent)
}
} else {
if expectedEvent.Count == 1 {
t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be equal to indicate only one occurrence of the event, but were different. Actual Event: %#v", actualFirstTimestamp, actualLastTimestamp, *actualEvent)
}
}
// Temp clear time stamps for comparison because actual values don't matter for comparison // Temp clear time stamps for comparison because actual values don't matter for comparison
actualEvent.FirstTimestamp = expectedEvent.FirstTimestamp actualEvent.FirstTimestamp = expectedEvent.FirstTimestamp
actualEvent.LastTimestamp = expectedEvent.LastTimestamp actualEvent.LastTimestamp = expectedEvent.LastTimestamp

View File

@ -335,10 +335,6 @@ func TestUpdate(t *testing.T) {
}, },
) )
// Run the controller and run it until we close stop.
stop := make(chan struct{})
go controller.Run(stop)
pod := func(name, check string) *api.Pod { pod := func(name, check string) *api.Pod {
return &api.Pod{ return &api.Pod{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
@ -371,11 +367,18 @@ func TestUpdate(t *testing.T) {
}, },
} }
// run every test a few times, in parallel
const threads = 3 const threads = 3
testDoneWG.Add(threads * len(tests))
// Run the controller and run it until we close stop.
// Once Run() is called, calls to testDoneWG.Done() might start, so
// all testDoneWG.Add() calls must happen before this point
stop := make(chan struct{})
go controller.Run(stop)
// run every test a few times, in parallel
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(threads * len(tests)) wg.Add(threads * len(tests))
testDoneWG.Add(threads * len(tests))
for i := 0; i < threads; i++ { for i := 0; i < threads; i++ {
for j, f := range tests { for j, f := range tests {
go func(name string, f func(string)) { go func(name string, f func(string)) {

View File

@ -29,6 +29,15 @@ import (
"k8s.io/kubernetes/pkg/probe" "k8s.io/kubernetes/pkg/probe"
) )
func containsAny(s string, substrs []string) bool {
for _, substr := range substrs {
if strings.Contains(s, substr) {
return true
}
}
return false
}
func TestHTTPProbeChecker(t *testing.T) { func TestHTTPProbeChecker(t *testing.T) {
handleReq := func(s int, body string) func(w http.ResponseWriter) { handleReq := func(s int, body string) func(w http.ResponseWriter) {
return func(w http.ResponseWriter) { return func(w http.ResponseWriter) {
@ -41,12 +50,31 @@ func TestHTTPProbeChecker(t *testing.T) {
testCases := []struct { testCases := []struct {
handler func(w http.ResponseWriter) handler func(w http.ResponseWriter)
health probe.Result health probe.Result
body string // go1.5: error message changed for timeout, need to support
// both old and new
accBodies []string
}{ }{
// The probe will be filled in below. This is primarily testing that an HTTP GET happens. // The probe will be filled in below. This is primarily testing that an HTTP GET happens.
{handleReq(http.StatusOK, "ok body"), probe.Success, "ok body"}, {
{handleReq(-1, "fail body"), probe.Failure, "fail body"}, handleReq(http.StatusOK, "ok body"),
{func(w http.ResponseWriter) { time.Sleep(3 * time.Second) }, probe.Failure, "use of closed network connection"}, probe.Success,
[]string{"ok body"},
},
{
handleReq(-1, "fail body"),
probe.Failure,
[]string{"fail body"},
},
{
func(w http.ResponseWriter) {
time.Sleep(3 * time.Second)
},
probe.Failure,
[]string{
"use of closed network connection",
"request canceled (Client.Timeout exceeded while awaiting headers)",
},
},
} }
for _, test := range testCases { for _, test := range testCases {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@ -74,8 +102,8 @@ func TestHTTPProbeChecker(t *testing.T) {
if health != test.health { if health != test.health {
t.Errorf("Expected %v, got %v", test.health, health) t.Errorf("Expected %v, got %v", test.health, health)
} }
if !strings.Contains(output, test.body) { if !containsAny(output, test.accBodies) {
t.Errorf("Expected %v, got %v", test.body, output) t.Errorf("Expected one of %#v, got %v", test.accBodies, output)
} }
} }
} }

View File

@ -29,17 +29,28 @@ import (
"k8s.io/kubernetes/pkg/probe" "k8s.io/kubernetes/pkg/probe"
) )
func containsAny(s string, substrs []string) bool {
for _, substr := range substrs {
if strings.Contains(s, substr) {
return true
}
}
return false
}
func TestTcpHealthChecker(t *testing.T) { func TestTcpHealthChecker(t *testing.T) {
prober := New() prober := New()
tests := []struct { tests := []struct {
expectedStatus probe.Result expectedStatus probe.Result
usePort bool usePort bool
expectError bool expectError bool
output string // Some errors are different depending on your system, make
// the test pass on all of them
accOutputs []string
}{ }{
// The probe will be filled in below. This is primarily testing that a connection is made. // The probe will be filled in below. This is primarily testing that a connection is made.
{probe.Success, true, false, ""}, {probe.Success, true, false, []string{""}},
{probe.Failure, false, false, "tcp: unknown port"}, {probe.Failure, false, false, []string{"unknown port", "Servname not supported for ai_socktype"}},
} }
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@ -72,8 +83,8 @@ func TestTcpHealthChecker(t *testing.T) {
if err == nil && test.expectError { if err == nil && test.expectError {
t.Errorf("unexpected non-error.") t.Errorf("unexpected non-error.")
} }
if !strings.Contains(output, test.output) { if !containsAny(output, test.accOutputs) {
t.Errorf("expected %s, got %s", test.output, output) t.Errorf("expected one of %#v, got %s", test.accOutputs, output)
} }
} }
} }

View File

@ -217,7 +217,7 @@ func (j *JSONPath) evalArray(input []reflect.Value, node *ArrayNode) ([]reflect.
value, isNil := template.Indirect(value) value, isNil := template.Indirect(value)
if isNil || (value.Kind() != reflect.Array && value.Kind() != reflect.Slice) { if isNil || (value.Kind() != reflect.Array && value.Kind() != reflect.Slice) {
return input, fmt.Errorf("%v is not array or slice", value) return input, fmt.Errorf("%v is not array or slice", value.Type())
} }
params := node.Params params := node.Params
if !params[0].Known { if !params[0].Known {

View File

@ -162,7 +162,7 @@ func TestStructInput(t *testing.T) {
failStoreTests := []jsonpathTest{ failStoreTests := []jsonpathTest{
{"invalid identfier", "{hello}", storeData, "unrecongnized identifier hello"}, {"invalid identfier", "{hello}", storeData, "unrecongnized identifier hello"},
{"nonexistent field", "{.hello}", storeData, "hello is not found"}, {"nonexistent field", "{.hello}", storeData, "hello is not found"},
{"invalid array", "{.Labels[0]}", storeData, "<map[string]int Value> is not array or slice"}, {"invalid array", "{.Labels[0]}", storeData, "map[string]int is not array or slice"},
{"invalid filter operator", "{.Book[?(@.Price<>10)]}", storeData, "unrecognized filter operator <>"}, {"invalid filter operator", "{.Book[?(@.Price<>10)]}", storeData, "unrecognized filter operator <>"},
{"redundent end", "{range .Labels.*}{@}{end}{end}", storeData, "not in range, nothing to end"}, {"redundent end", "{range .Labels.*}{@}{end}{end}", storeData, "not in range, nothing to end"},
} }

View File

@ -43,6 +43,7 @@ type Broadcaster struct {
watchers map[int64]*broadcasterWatcher watchers map[int64]*broadcasterWatcher
nextWatcher int64 nextWatcher int64
distributing sync.WaitGroup
incoming chan Event incoming chan Event
@ -67,6 +68,7 @@ func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *B
watchQueueLength: queueLength, watchQueueLength: queueLength,
fullChannelBehavior: fullChannelBehavior, fullChannelBehavior: fullChannelBehavior,
} }
m.distributing.Add(1)
go m.loop() go m.loop()
return m return m
} }
@ -146,9 +148,14 @@ func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
} }
// Shutdown disconnects all watchers (but any queued events will still be distributed). // Shutdown disconnects all watchers (but any queued events will still be distributed).
// You must not call Action after calling Shutdown. // You must not call Action or Watch* after calling Shutdown. This call blocks
// until all events have been distributed through the outbound channels. Note
// that since they can be buffered, this means that the watchers might not
// have received the data yet as it can remain sitting in the buffered
// channel.
func (m *Broadcaster) Shutdown() { func (m *Broadcaster) Shutdown() {
close(m.incoming) close(m.incoming)
m.distributing.Wait()
} }
// loop receives from m.incoming and distributes to all watchers. // loop receives from m.incoming and distributes to all watchers.
@ -163,6 +170,7 @@ func (m *Broadcaster) loop() {
m.distribute(event) m.distribute(event)
} }
m.closeAll() m.closeAll()
m.distributing.Done()
} }
// distribute sends event to all watchers. Blocking. // distribute sends event to all watchers. Blocking.

View File

@ -124,9 +124,8 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) {
event2 := Event{Added, &myType{"bar", "hello world 2"}} event2 := Event{Added, &myType{"bar", "hello world 2"}}
// Add a couple watchers // Add a couple watchers
const testWatchers = 2 watches := make([]Interface, 2)
watches := make([]Interface, testWatchers) for i := range watches {
for i := 0; i < testWatchers; i++ {
watches[i] = m.Watch() watches[i] = m.Watch()
} }
@ -139,8 +138,8 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) {
// Pull events from the queue. // Pull events from the queue.
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(testWatchers) wg.Add(len(watches))
for i := 0; i < testWatchers; i++ { for i := range watches {
// Verify that each watcher only gets the first event because its watch // Verify that each watcher only gets the first event because its watch
// queue of length one was full from the first one. // queue of length one was full from the first one.
go func(watcher int, w Interface) { go func(watcher int, w Interface) {
@ -148,14 +147,12 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) {
e1, ok := <-w.ResultChan() e1, ok := <-w.ResultChan()
if !ok { if !ok {
t.Errorf("Watcher %v failed to retrieve first event.", watcher) t.Errorf("Watcher %v failed to retrieve first event.", watcher)
return
} }
if e, a := event1, e1; !reflect.DeepEqual(e, a) { if e, a := event1, e1; !reflect.DeepEqual(e, a) {
t.Errorf("Watcher %v: Expected (%v, %#v), got (%v, %#v)", t.Errorf("Watcher %v: Expected (%v, %#v), got (%v, %#v)",
watcher, e.Type, e.Object, a.Type, a.Object) watcher, e.Type, e.Object, a.Type, a.Object)
} else {
t.Logf("Got (%v, %#v)", e1.Type, e1.Object)
} }
t.Logf("Got (%v, %#v)", e1.Type, e1.Object)
e2, ok := <-w.ResultChan() e2, ok := <-w.ResultChan()
if ok { if ok {
t.Errorf("Watcher %v received second event (%v, %#v) even though it shouldn't have.", t.Errorf("Watcher %v received second event (%v, %#v) even though it shouldn't have.",