Merge pull request #13058 from mvdan/go1.5
Race condition and test fixes
Commit b0457bee94
@@ -711,19 +711,19 @@ __EOF__

 ### Create and delete persistent volume examples
 # Pre-condition: no persistent volumes currently exist
-kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" ''
+kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" ''
 # Command
 kubectl create -f docs/user-guide/persistent-volumes/volumes/local-01.yaml "${kube_flags[@]}"
-kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" 'pv0001:'
+kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" 'pv0001:'
 kubectl delete pv pv0001 "${kube_flags[@]}"
 kubectl create -f docs/user-guide/persistent-volumes/volumes/local-02.yaml "${kube_flags[@]}"
-kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" 'pv0002:'
+kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" 'pv0002:'
 kubectl delete pv pv0002 "${kube_flags[@]}"
 kubectl create -f docs/user-guide/persistent-volumes/volumes/gce.yaml "${kube_flags[@]}"
-kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" 'pv0003:'
+kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" 'pv0003:'
 kubectl delete pv pv0003 "${kube_flags[@]}"
 # Post-condition: no PVs
-kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" ''
+kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" ''

 ############################
 # Persistent Volume Claims #
@@ -731,21 +731,21 @@ __EOF__

 ### Create and delete persistent volume claim examples
 # Pre-condition: no persistent volume claims currently exist
-kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" ''
+kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" ''
 # Command
 kubectl create -f docs/user-guide/persistent-volumes/claims/claim-01.yaml "${kube_flags[@]}"
-kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" 'myclaim-1:'
+kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" 'myclaim-1:'
 kubectl delete pvc myclaim-1 "${kube_flags[@]}"

 kubectl create -f docs/user-guide/persistent-volumes/claims/claim-02.yaml "${kube_flags[@]}"
-kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" 'myclaim-2:'
+kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" 'myclaim-2:'
 kubectl delete pvc myclaim-2 "${kube_flags[@]}"

 kubectl create -f docs/user-guide/persistent-volumes/claims/claim-03.json "${kube_flags[@]}"
-kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" 'myclaim-3:'
+kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" 'myclaim-3:'
 kubectl delete pvc myclaim-3 "${kube_flags[@]}"
 # Post-condition: no PVCs
-kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" ''
+kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" ''
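Both shell hunks drop a doubled dot from the go-template handed to kubectl. Assuming $id_field expands to a field path such as .metadata.name (an assumption here; the variable is defined elsewhere in the script), the old form rendered as {{..metadata.name}}, which Go 1.5's text/template parser rejects while earlier releases silently accepted it. A minimal sketch of that parser difference:

package main

import (
	"fmt"
	"text/template"
)

func main() {
	// Hypothetical expansion of id_field=".metadata.name" in the two shell forms.
	old := "{{range.items}}{{..metadata.name}}:{{end}}"  // from {{.$id_field}}
	fixed := "{{range.items}}{{.metadata.name}}:{{end}}" // from {{$id_field}}

	for _, tmpl := range []string{old, fixed} {
		_, err := template.New("assert").Parse(tmpl)
		fmt.Printf("%-50s parse error: %v\n", tmpl, err)
	}
}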
@@ -21,6 +21,7 @@ import (
 	"reflect"
 	"strconv"
 	"strings"
+	"sync"
 	"testing"

 	"k8s.io/kubernetes/pkg/api"
@@ -271,39 +272,41 @@ func TestEventf(t *testing.T) {
 	}

 	for _, item := range table {
-		called := make(chan struct{})
+		var wg sync.WaitGroup
+		// We expect only one callback
+		wg.Add(1)
 		testEvents := testEventSink{
 			OnCreate: func(event *api.Event) (*api.Event, error) {
+				defer wg.Done()
 				returnEvent, _ := validateEvent(event, item.expect, t)
 				if item.expectUpdate {
 					t.Errorf("Expected event update(), got event create()")
 				}
-				called <- struct{}{}
 				return returnEvent, nil
 			},
 			OnUpdate: func(event *api.Event) (*api.Event, error) {
+				defer wg.Done()
 				returnEvent, _ := validateEvent(event, item.expect, t)
 				if !item.expectUpdate {
 					t.Errorf("Expected event create(), got event update()")
 				}
-				called <- struct{}{}
 				return returnEvent, nil
 			},
 		}
 		eventBroadcaster := NewBroadcaster()
 		sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
 		logWatcher1 := eventBroadcaster.StartLogging(t.Logf) // Prove that it is useful
+		wg.Add(1)
 		logWatcher2 := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
+			defer wg.Done()
 			if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a {
 				t.Errorf("Expected '%v', got '%v'", e, a)
 			}
-			called <- struct{}{}
 		})
 		recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"})
 		recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...)

-		<-called
-		<-called
+		wg.Wait()
 		sinkWatcher.Stop()
 		logWatcher1.Stop()
 		logWatcher2.Stop()
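The rewritten loop swaps the called channel for a sync.WaitGroup: a single Add(1) covers the sink callback pair because exactly one of OnCreate/OnUpdate is expected to fire, a second Add(1) covers the logging callback, and Wait() runs before the watchers are stopped so no callback can touch the testing.T after the iteration ends. A self-contained sketch of that pattern (the callbacks and names here are stand-ins, not the Kubernetes event types):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup

	// One Add(1) covers the pair of sink callbacks: exactly one of them
	// is expected to fire, and each one does its own Done.
	wg.Add(1)
	onCreate := func() { defer wg.Done(); fmt.Println("sink: create") }
	onUpdate := func() { defer wg.Done(); fmt.Println("sink: update") }

	// A separate Add(1) for the logging callback.
	wg.Add(1)
	onLog := func(msg string) { defer wg.Done(); fmt.Println("log:", msg) }

	// Simulate the broadcaster delivering asynchronously.
	expectUpdate := false
	if expectUpdate {
		go onUpdate()
	} else {
		go onCreate()
	}
	go onLog("event recorded")

	// Wait for both callbacks before any teardown, so nothing can fire
	// after the surrounding test has finished.
	wg.Wait()
}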
@@ -316,17 +319,17 @@ func validateEvent(actualEvent *api.Event, expectedEvent *api.Event, t *testing.
 	if actualEvent.FirstTimestamp.IsZero() || actualEvent.LastTimestamp.IsZero() {
 		t.Errorf("timestamp wasn't set: %#v", *actualEvent)
 	}
-	if actualEvent.FirstTimestamp.Equal(actualEvent.LastTimestamp) {
-		if expectCompression {
-			t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be equal to indicate only one occurrence of the event, but were different. Actual Event: %#v", actualEvent.FirstTimestamp, actualEvent.LastTimestamp, *actualEvent)
-		}
-	} else {
-		if !expectCompression {
-			t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be different to indicate event compression happened, but were the same. Actual Event: %#v", actualEvent.FirstTimestamp, actualEvent.LastTimestamp, *actualEvent)
-		}
-	}
+	actualFirstTimestamp := actualEvent.FirstTimestamp
+	actualLastTimestamp := actualEvent.LastTimestamp
+	if actualFirstTimestamp.Equal(actualLastTimestamp) {
+		if expectCompression {
+			t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be different to indicate event compression happened, but were the same. Actual Event: %#v", actualFirstTimestamp, actualLastTimestamp, *actualEvent)
+		}
+	} else {
+		if expectedEvent.Count == 1 {
+			t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be equal to indicate only one occurrence of the event, but were different. Actual Event: %#v", actualFirstTimestamp, actualLastTimestamp, *actualEvent)
+		}
+	}
 	// Temp clear time stamps for comparison because actual values don't matter for comparison
 	actualEvent.FirstTimestamp = expectedEvent.FirstTimestamp
 	actualEvent.LastTimestamp = expectedEvent.LastTimestamp
@@ -335,10 +335,6 @@ func TestUpdate(t *testing.T) {
 		},
 	)

-	// Run the controller and run it until we close stop.
-	stop := make(chan struct{})
-	go controller.Run(stop)
-
 	pod := func(name, check string) *api.Pod {
 		return &api.Pod{
 			ObjectMeta: api.ObjectMeta{
@@ -371,11 +367,18 @@ func TestUpdate(t *testing.T) {
 		},
 	}

-	// run every test a few times, in parallel
 	const threads = 3
+	testDoneWG.Add(threads * len(tests))
+
+	// Run the controller and run it until we close stop.
+	// Once Run() is called, calls to testDoneWG.Done() might start, so
+	// all testDoneWG.Add() calls must happen before this point
+	stop := make(chan struct{})
+	go controller.Run(stop)
+
+	// run every test a few times, in parallel
 	var wg sync.WaitGroup
 	wg.Add(threads * len(tests))
-	testDoneWG.Add(threads * len(tests))
 	for i := 0; i < threads; i++ {
 		for j, f := range tests {
 			go func(name string, f func(string)) {
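The new comment encodes the ordering rule that sync.WaitGroup requires: every Add that a goroutine may later balance with Done has to happen before that goroutine starts, otherwise Wait can observe a zero counter too early or the counter can go negative and panic. A small sketch of the rule with hypothetical names:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var testDone sync.WaitGroup
	work := make(chan int)

	const workers = 3
	// Add for everything the worker goroutines will Done *before*
	// starting them; adding after the goroutines run races with Wait().
	testDone.Add(workers)

	for i := 0; i < workers; i++ {
		go func(id int) {
			defer testDone.Done()
			for n := range work {
				fmt.Printf("worker %d got %d\n", id, n)
			}
		}(i)
	}

	for n := 0; n < 5; n++ {
		work <- n
	}
	close(work)

	testDone.Wait() // safe: all Adds happened before any goroutine started
}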
@@ -29,6 +29,15 @@ import (
 	"k8s.io/kubernetes/pkg/probe"
 )

+func containsAny(s string, substrs []string) bool {
+	for _, substr := range substrs {
+		if strings.Contains(s, substr) {
+			return true
+		}
+	}
+	return false
+}
+
 func TestHTTPProbeChecker(t *testing.T) {
 	handleReq := func(s int, body string) func(w http.ResponseWriter) {
 		return func(w http.ResponseWriter) {
@@ -41,12 +50,31 @@ func TestHTTPProbeChecker(t *testing.T) {
 	testCases := []struct {
 		handler func(w http.ResponseWriter)
 		health probe.Result
-		body string
+		// go1.5: error message changed for timeout, need to support
+		// both old and new
+		accBodies []string
 	}{
 		// The probe will be filled in below. This is primarily testing that an HTTP GET happens.
-		{handleReq(http.StatusOK, "ok body"), probe.Success, "ok body"},
-		{handleReq(-1, "fail body"), probe.Failure, "fail body"},
-		{func(w http.ResponseWriter) { time.Sleep(3 * time.Second) }, probe.Failure, "use of closed network connection"},
+		{
+			handleReq(http.StatusOK, "ok body"),
+			probe.Success,
+			[]string{"ok body"},
+		},
+		{
+			handleReq(-1, "fail body"),
+			probe.Failure,
+			[]string{"fail body"},
+		},
+		{
+			func(w http.ResponseWriter) {
+				time.Sleep(3 * time.Second)
+			},
+			probe.Failure,
+			[]string{
+				"use of closed network connection",
+				"request canceled (Client.Timeout exceeded while awaiting headers)",
+			},
+		},
 	}
 	for _, test := range testCases {
 		server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -74,8 +102,8 @@ func TestHTTPProbeChecker(t *testing.T) {
 		if health != test.health {
 			t.Errorf("Expected %v, got %v", test.health, health)
 		}
-		if !strings.Contains(output, test.body) {
-			t.Errorf("Expected %v, got %v", test.body, output)
+		if !containsAny(output, test.accBodies) {
+			t.Errorf("Expected one of %#v, got %v", test.accBodies, output)
 		}
 	}
 }
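Switching the expectation to a list matched with containsAny accommodates a Go 1.5 behavior change: when http.Client.Timeout fires, older releases surfaced "use of closed network connection" while 1.5 reports a cancellation mentioning "Client.Timeout exceeded while awaiting headers". A rough illustration of the difference the test now tolerates (error wording varies by Go version, so this is a sketch, not an exact reproduction):

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
	"time"
)

func main() {
	// Handler that never answers within the client timeout.
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		time.Sleep(3 * time.Second)
	}))
	defer server.Close()

	client := &http.Client{Timeout: 1 * time.Second}
	_, err := client.Get(server.URL)

	accepted := []string{
		"use of closed network connection", // pre-1.5 wording
		"Client.Timeout exceeded",          // 1.5+ wording
	}
	for _, want := range accepted {
		if err != nil && strings.Contains(err.Error(), want) {
			fmt.Println("matched:", want)
		}
	}
}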
@@ -29,17 +29,28 @@ import (
 	"k8s.io/kubernetes/pkg/probe"
 )

+func containsAny(s string, substrs []string) bool {
+	for _, substr := range substrs {
+		if strings.Contains(s, substr) {
+			return true
+		}
+	}
+	return false
+}
+
 func TestTcpHealthChecker(t *testing.T) {
 	prober := New()
 	tests := []struct {
 		expectedStatus probe.Result
 		usePort bool
 		expectError bool
-		output string
+		// Some errors are different depending on your system, make
+		// the test pass on all of them
+		accOutputs []string
 	}{
 		// The probe will be filled in below. This is primarily testing that a connection is made.
-		{probe.Success, true, false, ""},
-		{probe.Failure, false, false, "tcp: unknown port"},
+		{probe.Success, true, false, []string{""}},
+		{probe.Failure, false, false, []string{"unknown port", "Servname not supported for ai_socktype"}},
	}

 	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -72,8 +83,8 @@ func TestTcpHealthChecker(t *testing.T) {
 		if err == nil && test.expectError {
 			t.Errorf("unexpected non-error.")
 		}
-		if !strings.Contains(output, test.output) {
-			t.Errorf("expected %s, got %s", test.output, output)
+		if !containsAny(output, test.accOutputs) {
+			t.Errorf("expected one of %#v, got %s", test.accOutputs, output)
 		}
 	}
 }
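Here the list covers platform differences rather than Go versions: dialing a bogus service name fails with "unknown port" through Go's built-in resolver but with getaddrinfo's "Servname not supported for ai_socktype" when the system resolver is used. A small hedged sketch of the failure the test accepts (the exact text depends on OS and resolver):

package main

import (
	"fmt"
	"net"
	"strings"
)

func main() {
	// "nosuchservice" is not a valid port or service name, so the dial
	// must fail; only the wording of the error differs across systems.
	_, err := net.Dial("tcp", "127.0.0.1:nosuchservice")
	if err == nil {
		fmt.Println("unexpectedly connected")
		return
	}
	accepted := []string{"unknown port", "Servname not supported for ai_socktype"}
	for _, want := range accepted {
		fmt.Printf("contains %q: %v\n", want, strings.Contains(err.Error(), want))
	}
}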
@@ -217,7 +217,7 @@ func (j *JSONPath) evalArray(input []reflect.Value, node *ArrayNode) ([]reflect.

 	value, isNil := template.Indirect(value)
 	if isNil || (value.Kind() != reflect.Array && value.Kind() != reflect.Slice) {
-		return input, fmt.Errorf("%v is not array or slice", value)
+		return input, fmt.Errorf("%v is not array or slice", value.Type())
 	}
 	params := node.Params
 	if !params[0].Known {
@@ -162,7 +162,7 @@ func TestStructInput(t *testing.T) {
 	failStoreTests := []jsonpathTest{
 		{"invalid identfier", "{hello}", storeData, "unrecongnized identifier hello"},
 		{"nonexistent field", "{.hello}", storeData, "hello is not found"},
-		{"invalid array", "{.Labels[0]}", storeData, "<map[string]int Value> is not array or slice"},
+		{"invalid array", "{.Labels[0]}", storeData, "map[string]int is not array or slice"},
 		{"invalid filter operator", "{.Book[?(@.Price<>10)]}", storeData, "unrecognized filter operator <>"},
 		{"redundent end", "{range .Labels.*}{@}{end}{end}", storeData, "not in range, nothing to end"},
 	}
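The error string change tracks a Go 1.5 fmt change: %v of a reflect.Value used to print the wrapper (the old expectation "<map[string]int Value>") and now prints the value it holds, so the code switches to the version-stable value.Type(). A hedged sketch of the difference (the pre-1.5 output is quoted from the old test expectation and is not reproducible on current Go):

package main

import (
	"fmt"
	"reflect"
)

func main() {
	labels := map[string]int{"web": 1}
	v := reflect.ValueOf(labels)

	// On Go 1.5+ this prints the map contents, e.g. "map[web:1]".
	// Pre-1.5 fmt printed the wrapper itself, e.g. "<map[string]int Value>".
	fmt.Printf("%%v of reflect.Value: %v\n", v)

	// Printing the type is stable across versions, which is why the
	// error message was switched to value.Type().
	fmt.Printf("value.Type():        %v\n", v.Type())
}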
@@ -41,8 +41,9 @@ const incomingQueueLength = 25
 type Broadcaster struct {
 	lock sync.Mutex

-	watchers    map[int64]*broadcasterWatcher
-	nextWatcher int64
+	watchers     map[int64]*broadcasterWatcher
+	nextWatcher  int64
+	distributing sync.WaitGroup

 	incoming chan Event

@@ -67,6 +68,7 @@ func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *B
 		watchQueueLength: queueLength,
 		fullChannelBehavior: fullChannelBehavior,
 	}
+	m.distributing.Add(1)
 	go m.loop()
 	return m
 }
@@ -146,9 +148,14 @@ func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
 }

 // Shutdown disconnects all watchers (but any queued events will still be distributed).
-// You must not call Action after calling Shutdown.
+// You must not call Action or Watch* after calling Shutdown. This call blocks
+// until all events have been distributed through the outbound channels. Note
+// that since they can be buffered, this means that the watchers might not
+// have received the data yet as it can remain sitting in the buffered
+// channel.
 func (m *Broadcaster) Shutdown() {
 	close(m.incoming)
+	m.distributing.Wait()
 }

 // loop receives from m.incoming and distributes to all watchers.
@@ -163,6 +170,7 @@ func (m *Broadcaster) loop() {
 		m.distribute(event)
 	}
 	m.closeAll()
+	m.distributing.Done()
 }

 // distribute sends event to all watchers. Blocking.
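Together these Broadcaster hunks give Shutdown blocking semantics: NewBroadcaster does distributing.Add(1) before starting the loop goroutine, loop calls Done once the incoming channel is drained, and Shutdown closes the channel and then waits. A stripped-down sketch of that lifecycle, with hypothetical names standing in for the real Broadcaster:

package main

import (
	"fmt"
	"sync"
)

type miniBroadcaster struct {
	incoming     chan string
	distributing sync.WaitGroup
}

func newMiniBroadcaster() *miniBroadcaster {
	m := &miniBroadcaster{incoming: make(chan string, 25)}
	m.distributing.Add(1) // before the goroutine starts, mirroring NewBroadcaster
	go m.loop()
	return m
}

func (m *miniBroadcaster) loop() {
	for event := range m.incoming { // exits once incoming is closed and drained
		fmt.Println("distribute:", event)
	}
	m.distributing.Done()
}

func (m *miniBroadcaster) Action(event string) { m.incoming <- event }

// Shutdown blocks until every queued event has been handed to the loop.
func (m *miniBroadcaster) Shutdown() {
	close(m.incoming)
	m.distributing.Wait()
}

func main() {
	b := newMiniBroadcaster()
	b.Action("added")
	b.Action("modified")
	b.Shutdown() // returns only after both events were distributed
}

Because the Add happens before go m.loop(), Shutdown's Wait can never return before the loop has finished draining the queue.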
@@ -124,9 +124,8 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) {
 	event2 := Event{Added, &myType{"bar", "hello world 2"}}

 	// Add a couple watchers
-	const testWatchers = 2
-	watches := make([]Interface, testWatchers)
-	for i := 0; i < testWatchers; i++ {
+	watches := make([]Interface, 2)
+	for i := range watches {
 		watches[i] = m.Watch()
 	}

@@ -139,8 +138,8 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) {

 	// Pull events from the queue.
 	wg := sync.WaitGroup{}
-	wg.Add(testWatchers)
-	for i := 0; i < testWatchers; i++ {
+	wg.Add(len(watches))
+	for i := range watches {
 		// Verify that each watcher only gets the first event because its watch
 		// queue of length one was full from the first one.
 		go func(watcher int, w Interface) {
@@ -148,14 +147,12 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) {
 			e1, ok := <-w.ResultChan()
 			if !ok {
 				t.Errorf("Watcher %v failed to retrieve first event.", watcher)
+				return
 			}
 			if e, a := event1, e1; !reflect.DeepEqual(e, a) {
 				t.Errorf("Watcher %v: Expected (%v, %#v), got (%v, %#v)",
 					watcher, e.Type, e.Object, a.Type, a.Object)
-			} else {
-				t.Logf("Got (%v, %#v)", e1.Type, e1.Object)
 			}
+			t.Logf("Got (%v, %#v)", e1.Type, e1.Object)
 			e2, ok := <-w.ResultChan()
 			if ok {
 				t.Errorf("Watcher %v received second event (%v, %#v) even though it shouldn't have.",