Separate sync and list functionality in the reflector. #23394

This commit is contained in:
Robert Rati
2016-04-22 13:12:22 -04:00
parent c63ac4e664
commit e388c137bb
11 changed files with 236 additions and 67 deletions

View File

@@ -29,10 +29,23 @@ import (
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
"github.com/google/gofuzz"
)
type testLW struct {
ListFunc func(options api.ListOptions) (runtime.Object, error)
WatchFunc func(options api.ListOptions) (watch.Interface, error)
}
func (t *testLW) List(options api.ListOptions) (runtime.Object, error) {
return t.ListFunc(options)
}
func (t *testLW) Watch(options api.ListOptions) (watch.Interface, error) {
return t.WatchFunc(options)
}
func Example() {
// source simulates an apiserver object endpoint.
source := framework.NewFakeControllerSource()
@@ -295,18 +308,15 @@ func TestUpdate(t *testing.T) {
source := framework.NewFakeControllerSource()
const (
FROM = "from"
ADD_MISSED = "missed the add event"
TO = "to"
FROM = "from"
TO = "to"
)
// These are the transitions we expect to see; because this is
// asynchronous, there are a lot of valid possibilities.
type pair struct{ from, to string }
allowedTransitions := map[pair]bool{
pair{FROM, TO}: true,
pair{FROM, ADD_MISSED}: true,
pair{ADD_MISSED, TO}: true,
pair{FROM, TO}: true,
// Because a resync can happen when we've already observed one
// of the above but before the item is deleted.
@@ -337,21 +347,6 @@ func TestUpdate(t *testing.T) {
source.Add(pod(name, FROM, false))
source.Modify(pod(name, TO, true))
},
func(name string) {
name = "b-" + name
source.Add(pod(name, FROM, false))
source.ModifyDropWatch(pod(name, TO, true))
},
func(name string) {
name = "c-" + name
source.AddDropWatch(pod(name, FROM, false))
source.Modify(pod(name, ADD_MISSED, false))
source.Modify(pod(name, TO, true))
},
func(name string) {
name = "d-" + name
source.Add(pod(name, FROM, true))
},
}
const threads = 3
@@ -362,10 +357,20 @@ func TestUpdate(t *testing.T) {
// Make a controller that deletes things once it observes an update.
// It calls Done() on the wait group on deletions so we can tell when
// everything we've added has been deleted.
watchCh := make(chan struct{})
_, controller := framework.NewInformer(
source,
&testLW{
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
watch, err := source.Watch(options)
close(watchCh)
return watch, err
},
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return source.List(options)
},
},
&api.Pod{},
time.Millisecond*1,
0,
framework.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
o, n := oldObj.(*api.Pod), newObj.(*api.Pod)
@@ -388,6 +393,7 @@ func TestUpdate(t *testing.T) {
// all testDoneWG.Add() calls must happen before this point
stop := make(chan struct{})
go controller.Run(stop)
<-watchCh
// run every test a few times, in parallel
var wg sync.WaitGroup