Merge pull request #13058 from mvdan/go1.5

Race condition and test fixes
This commit is contained in:
Mike Danese 2015-09-10 15:10:19 -07:00
commit b0457bee94
9 changed files with 105 additions and 55 deletions

View File

@ -711,19 +711,19 @@ __EOF__
### Create and delete persistent volume examples
# Pre-condition: no persistent volumes currently exist
kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" ''
kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" ''
# Command
kubectl create -f docs/user-guide/persistent-volumes/volumes/local-01.yaml "${kube_flags[@]}"
kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" 'pv0001:'
kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" 'pv0001:'
kubectl delete pv pv0001 "${kube_flags[@]}"
kubectl create -f docs/user-guide/persistent-volumes/volumes/local-02.yaml "${kube_flags[@]}"
kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" 'pv0002:'
kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" 'pv0002:'
kubectl delete pv pv0002 "${kube_flags[@]}"
kubectl create -f docs/user-guide/persistent-volumes/volumes/gce.yaml "${kube_flags[@]}"
kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" 'pv0003:'
kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" 'pv0003:'
kubectl delete pv pv0003 "${kube_flags[@]}"
# Post-condition: no PVs
kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" ''
kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" ''
############################
# Persistent Volume Claims #
@ -731,21 +731,21 @@ __EOF__
### Create and delete persistent volume claim examples
# Pre-condition: no persistent volume claims currently exist
kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" ''
kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" ''
# Command
kubectl create -f docs/user-guide/persistent-volumes/claims/claim-01.yaml "${kube_flags[@]}"
kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" 'myclaim-1:'
kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" 'myclaim-1:'
kubectl delete pvc myclaim-1 "${kube_flags[@]}"
kubectl create -f docs/user-guide/persistent-volumes/claims/claim-02.yaml "${kube_flags[@]}"
kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" 'myclaim-2:'
kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" 'myclaim-2:'
kubectl delete pvc myclaim-2 "${kube_flags[@]}"
kubectl create -f docs/user-guide/persistent-volumes/claims/claim-03.json "${kube_flags[@]}"
kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" 'myclaim-3:'
kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" 'myclaim-3:'
kubectl delete pvc myclaim-3 "${kube_flags[@]}"
# Post-condition: no PVCs
kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" ''
kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" ''

View File

@ -21,6 +21,7 @@ import (
"reflect"
"strconv"
"strings"
"sync"
"testing"
"k8s.io/kubernetes/pkg/api"
@ -271,39 +272,41 @@ func TestEventf(t *testing.T) {
}
for _, item := range table {
called := make(chan struct{})
var wg sync.WaitGroup
// We expect only one callback
wg.Add(1)
testEvents := testEventSink{
OnCreate: func(event *api.Event) (*api.Event, error) {
defer wg.Done()
returnEvent, _ := validateEvent(event, item.expect, t)
if item.expectUpdate {
t.Errorf("Expected event update(), got event create()")
}
called <- struct{}{}
return returnEvent, nil
},
OnUpdate: func(event *api.Event) (*api.Event, error) {
defer wg.Done()
returnEvent, _ := validateEvent(event, item.expect, t)
if !item.expectUpdate {
t.Errorf("Expected event create(), got event update()")
}
called <- struct{}{}
return returnEvent, nil
},
}
eventBroadcaster := NewBroadcaster()
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
logWatcher1 := eventBroadcaster.StartLogging(t.Logf) // Prove that it is useful
wg.Add(1)
logWatcher2 := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
defer wg.Done()
if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a {
t.Errorf("Expected '%v', got '%v'", e, a)
}
called <- struct{}{}
})
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"})
recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...)
<-called
<-called
wg.Wait()
sinkWatcher.Stop()
logWatcher1.Stop()
logWatcher2.Stop()
@ -316,17 +319,17 @@ func validateEvent(actualEvent *api.Event, expectedEvent *api.Event, t *testing.
if actualEvent.FirstTimestamp.IsZero() || actualEvent.LastTimestamp.IsZero() {
t.Errorf("timestamp wasn't set: %#v", *actualEvent)
}
if actualEvent.FirstTimestamp.Equal(actualEvent.LastTimestamp) {
if expectCompression {
t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be equal to indicate only one occurrence of the event, but were different. Actual Event: %#v", actualEvent.FirstTimestamp, actualEvent.LastTimestamp, *actualEvent)
}
} else {
if !expectCompression {
t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be different to indicate event compression happened, but were the same. Actual Event: %#v", actualEvent.FirstTimestamp, actualEvent.LastTimestamp, *actualEvent)
}
}
actualFirstTimestamp := actualEvent.FirstTimestamp
actualLastTimestamp := actualEvent.LastTimestamp
if actualFirstTimestamp.Equal(actualLastTimestamp) {
if expectCompression {
t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be different to indicate event compression happened, but were the same. Actual Event: %#v", actualFirstTimestamp, actualLastTimestamp, *actualEvent)
}
} else {
if expectedEvent.Count == 1 {
t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be equal to indicate only one occurrence of the event, but were different. Actual Event: %#v", actualFirstTimestamp, actualLastTimestamp, *actualEvent)
}
}
// Temp clear time stamps for comparison because actual values don't matter for comparison
actualEvent.FirstTimestamp = expectedEvent.FirstTimestamp
actualEvent.LastTimestamp = expectedEvent.LastTimestamp

View File

@ -335,10 +335,6 @@ func TestUpdate(t *testing.T) {
},
)
// Run the controller and run it until we close stop.
stop := make(chan struct{})
go controller.Run(stop)
pod := func(name, check string) *api.Pod {
return &api.Pod{
ObjectMeta: api.ObjectMeta{
@ -371,11 +367,18 @@ func TestUpdate(t *testing.T) {
},
}
// run every test a few times, in parallel
const threads = 3
testDoneWG.Add(threads * len(tests))
// Run the controller and run it until we close stop.
// Once Run() is called, calls to testDoneWG.Done() might start, so
// all testDoneWG.Add() calls must happen before this point
stop := make(chan struct{})
go controller.Run(stop)
// run every test a few times, in parallel
var wg sync.WaitGroup
wg.Add(threads * len(tests))
testDoneWG.Add(threads * len(tests))
for i := 0; i < threads; i++ {
for j, f := range tests {
go func(name string, f func(string)) {

View File

@ -29,6 +29,15 @@ import (
"k8s.io/kubernetes/pkg/probe"
)
func containsAny(s string, substrs []string) bool {
for _, substr := range substrs {
if strings.Contains(s, substr) {
return true
}
}
return false
}
func TestHTTPProbeChecker(t *testing.T) {
handleReq := func(s int, body string) func(w http.ResponseWriter) {
return func(w http.ResponseWriter) {
@ -41,12 +50,31 @@ func TestHTTPProbeChecker(t *testing.T) {
testCases := []struct {
handler func(w http.ResponseWriter)
health probe.Result
body string
// go1.5: error message changed for timeout, need to support
// both old and new
accBodies []string
}{
// The probe will be filled in below. This is primarily testing that an HTTP GET happens.
{handleReq(http.StatusOK, "ok body"), probe.Success, "ok body"},
{handleReq(-1, "fail body"), probe.Failure, "fail body"},
{func(w http.ResponseWriter) { time.Sleep(3 * time.Second) }, probe.Failure, "use of closed network connection"},
{
handleReq(http.StatusOK, "ok body"),
probe.Success,
[]string{"ok body"},
},
{
handleReq(-1, "fail body"),
probe.Failure,
[]string{"fail body"},
},
{
func(w http.ResponseWriter) {
time.Sleep(3 * time.Second)
},
probe.Failure,
[]string{
"use of closed network connection",
"request canceled (Client.Timeout exceeded while awaiting headers)",
},
},
}
for _, test := range testCases {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@ -74,8 +102,8 @@ func TestHTTPProbeChecker(t *testing.T) {
if health != test.health {
t.Errorf("Expected %v, got %v", test.health, health)
}
if !strings.Contains(output, test.body) {
t.Errorf("Expected %v, got %v", test.body, output)
if !containsAny(output, test.accBodies) {
t.Errorf("Expected one of %#v, got %v", test.accBodies, output)
}
}
}

View File

@ -29,17 +29,28 @@ import (
"k8s.io/kubernetes/pkg/probe"
)
func containsAny(s string, substrs []string) bool {
for _, substr := range substrs {
if strings.Contains(s, substr) {
return true
}
}
return false
}
func TestTcpHealthChecker(t *testing.T) {
prober := New()
tests := []struct {
expectedStatus probe.Result
usePort bool
expectError bool
output string
// Some errors are different depending on your system, make
// the test pass on all of them
accOutputs []string
}{
// The probe will be filled in below. This is primarily testing that a connection is made.
{probe.Success, true, false, ""},
{probe.Failure, false, false, "tcp: unknown port"},
{probe.Success, true, false, []string{""}},
{probe.Failure, false, false, []string{"unknown port", "Servname not supported for ai_socktype"}},
}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@ -72,8 +83,8 @@ func TestTcpHealthChecker(t *testing.T) {
if err == nil && test.expectError {
t.Errorf("unexpected non-error.")
}
if !strings.Contains(output, test.output) {
t.Errorf("expected %s, got %s", test.output, output)
if !containsAny(output, test.accOutputs) {
t.Errorf("expected one of %#v, got %s", test.accOutputs, output)
}
}
}

View File

@ -217,7 +217,7 @@ func (j *JSONPath) evalArray(input []reflect.Value, node *ArrayNode) ([]reflect.
value, isNil := template.Indirect(value)
if isNil || (value.Kind() != reflect.Array && value.Kind() != reflect.Slice) {
return input, fmt.Errorf("%v is not array or slice", value)
return input, fmt.Errorf("%v is not array or slice", value.Type())
}
params := node.Params
if !params[0].Known {

View File

@ -162,7 +162,7 @@ func TestStructInput(t *testing.T) {
failStoreTests := []jsonpathTest{
{"invalid identfier", "{hello}", storeData, "unrecongnized identifier hello"},
{"nonexistent field", "{.hello}", storeData, "hello is not found"},
{"invalid array", "{.Labels[0]}", storeData, "<map[string]int Value> is not array or slice"},
{"invalid array", "{.Labels[0]}", storeData, "map[string]int is not array or slice"},
{"invalid filter operator", "{.Book[?(@.Price<>10)]}", storeData, "unrecognized filter operator <>"},
{"redundent end", "{range .Labels.*}{@}{end}{end}", storeData, "not in range, nothing to end"},
}

View File

@ -41,8 +41,9 @@ const incomingQueueLength = 25
type Broadcaster struct {
lock sync.Mutex
watchers map[int64]*broadcasterWatcher
nextWatcher int64
watchers map[int64]*broadcasterWatcher
nextWatcher int64
distributing sync.WaitGroup
incoming chan Event
@ -67,6 +68,7 @@ func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *B
watchQueueLength: queueLength,
fullChannelBehavior: fullChannelBehavior,
}
m.distributing.Add(1)
go m.loop()
return m
}
@ -146,9 +148,14 @@ func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
}
// Shutdown disconnects all watchers (but any queued events will still be distributed).
// You must not call Action after calling Shutdown.
// You must not call Action or Watch* after calling Shutdown. This call blocks
// until all events have been distributed through the outbound channels. Note
// that since they can be buffered, this means that the watchers might not
// have received the data yet as it can remain sitting in the buffered
// channel.
func (m *Broadcaster) Shutdown() {
close(m.incoming)
m.distributing.Wait()
}
// loop receives from m.incoming and distributes to all watchers.
@ -163,6 +170,7 @@ func (m *Broadcaster) loop() {
m.distribute(event)
}
m.closeAll()
m.distributing.Done()
}
// distribute sends event to all watchers. Blocking.

View File

@ -124,9 +124,8 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) {
event2 := Event{Added, &myType{"bar", "hello world 2"}}
// Add a couple watchers
const testWatchers = 2
watches := make([]Interface, testWatchers)
for i := 0; i < testWatchers; i++ {
watches := make([]Interface, 2)
for i := range watches {
watches[i] = m.Watch()
}
@ -139,8 +138,8 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) {
// Pull events from the queue.
wg := sync.WaitGroup{}
wg.Add(testWatchers)
for i := 0; i < testWatchers; i++ {
wg.Add(len(watches))
for i := range watches {
// Verify that each watcher only gets the first event because its watch
// queue of length one was full from the first one.
go func(watcher int, w Interface) {
@ -148,14 +147,12 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) {
e1, ok := <-w.ResultChan()
if !ok {
t.Errorf("Watcher %v failed to retrieve first event.", watcher)
return
}
if e, a := event1, e1; !reflect.DeepEqual(e, a) {
t.Errorf("Watcher %v: Expected (%v, %#v), got (%v, %#v)",
watcher, e.Type, e.Object, a.Type, a.Object)
} else {
t.Logf("Got (%v, %#v)", e1.Type, e1.Object)
}
t.Logf("Got (%v, %#v)", e1.Type, e1.Object)
e2, ok := <-w.ResultChan()
if ok {
t.Errorf("Watcher %v received second event (%v, %#v) even though it shouldn't have.",