WatcherDispatcher for federated controller tests

This commit is contained in:
Marcin Wielgus
2016-09-07 23:53:37 +02:00
parent 23147d30e9
commit 961b1a94e2

View File

@@ -17,6 +17,9 @@ limitations under the License.
package testutil package testutil
import ( import (
"os"
"runtime/pprof"
"sync"
"time" "time"
federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1" federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1"
@@ -25,14 +28,110 @@ import (
"k8s.io/kubernetes/pkg/client/testing/core" "k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
) )
// A structure that distributes eventes to multiple watchers.
type WatcherDispatcher struct {
sync.Mutex
watchers []*watch.FakeWatcher
eventsSoFar []*watch.Event
}
func (wd *WatcherDispatcher) register(watcher *watch.FakeWatcher) {
wd.Lock()
defer wd.Unlock()
wd.watchers = append(wd.watchers, watcher)
for _, event := range wd.eventsSoFar {
go watcher.Action(event.Type, event.Object)
}
}
// Add sends an add event.
func (wd *WatcherDispatcher) Add(obj runtime.Object) {
wd.Lock()
defer wd.Unlock()
event := &watch.Event{
Type: watch.Added,
Object: obj,
}
wd.eventsSoFar = append(wd.eventsSoFar, event)
for _, watcher := range wd.watchers {
go watcher.Add(obj)
}
}
// Modify sends a modify event.
func (wd *WatcherDispatcher) Modify(obj runtime.Object) {
wd.Lock()
defer wd.Unlock()
event := &watch.Event{
Type: watch.Modified,
Object: obj,
}
wd.eventsSoFar = append(wd.eventsSoFar, event)
for _, watcher := range wd.watchers {
go watcher.Modify(obj)
}
}
// Delete sends a delete event.
func (wd *WatcherDispatcher) Delete(lastValue runtime.Object) {
wd.Lock()
defer wd.Unlock()
event := &watch.Event{
Type: watch.Deleted,
Object: lastValue,
}
wd.eventsSoFar = append(wd.eventsSoFar, event)
for _, watcher := range wd.watchers {
go watcher.Delete(lastValue)
}
}
// Error sends an Error event.
func (wd *WatcherDispatcher) Error(errValue runtime.Object) {
wd.Lock()
defer wd.Unlock()
event := &watch.Event{
Type: watch.Error,
Object: errValue,
}
wd.eventsSoFar = append(wd.eventsSoFar, event)
for _, watcher := range wd.watchers {
go watcher.Error(errValue)
}
}
// Action sends an event of the requested type, for table-based testing.
func (wd *WatcherDispatcher) Action(action watch.EventType, obj runtime.Object) {
wd.Lock()
defer wd.Unlock()
event := &watch.Event{
Type: action,
Object: obj,
}
wd.eventsSoFar = append(wd.eventsSoFar, event)
for _, watcher := range wd.watchers {
go watcher.Action(action, obj)
}
}
// RegisterFakeWatch adds a new fake watcher for the specified resource in the given fake client. // RegisterFakeWatch adds a new fake watcher for the specified resource in the given fake client.
// All subsequent requrest for watch on the client will result in returning this fake watcher. // All subsequent requests for a watch on the client will result in returning this fake watcher.
func RegisterFakeWatch(resource string, client *core.Fake) *watch.FakeWatcher { func RegisterFakeWatch(resource string, client *core.Fake) *WatcherDispatcher {
watcher := watch.NewFake() dispatcher := &WatcherDispatcher{
client.AddWatchReactor(resource, func(action core.Action) (bool, watch.Interface, error) { return true, watcher, nil }) watchers: make([]*watch.FakeWatcher, 0),
return watcher eventsSoFar: make([]*watch.Event, 0),
}
client.AddWatchReactor(resource, func(action core.Action) (bool, watch.Interface, error) {
watcher := watch.NewFake()
dispatcher.register(watcher)
return true, watcher, nil
})
return dispatcher
} }
// RegisterFakeList registers a list response for the specified resource inside the given fake client. // RegisterFakeList registers a list response for the specified resource inside the given fake client.
@@ -43,10 +142,10 @@ func RegisterFakeList(resource string, client *core.Fake, obj runtime.Object) {
}) })
} }
// RegisterFakeCopyOnCreate register a reactor in the given fake client that passes // RegisterFakeCopyOnCreate registers a reactor in the given fake client that passes
// all created object to the given watcher and also copies them to a channel for // all created objects to the given watcher and also copies them to a channel for
// in-test inspection. // in-test inspection.
func RegisterFakeCopyOnCreate(resource string, client *core.Fake, watcher *watch.FakeWatcher) chan runtime.Object { func RegisterFakeCopyOnCreate(resource string, client *core.Fake, watcher *WatcherDispatcher) chan runtime.Object {
objChan := make(chan runtime.Object, 100) objChan := make(chan runtime.Object, 100)
client.AddReactor("create", resource, func(action core.Action) (bool, runtime.Object, error) { client.AddReactor("create", resource, func(action core.Action) (bool, runtime.Object, error) {
createAction := action.(core.CreateAction) createAction := action.(core.CreateAction)
@@ -60,15 +159,22 @@ func RegisterFakeCopyOnCreate(resource string, client *core.Fake, watcher *watch
return objChan return objChan
} }
// RegisterFakeCopyOnCreate register a reactor in the given fake client that passes // RegisterFakeCopyOnCreate registers a reactor in the given fake client that passes
// all updated object to the given watcher and also copies them to a channel for // all updated objects to the given watcher and also copies them to a channel for
// in-test inspection. // in-test inspection.
func RegisterFakeCopyOnUpdate(resource string, client *core.Fake, watcher *watch.FakeWatcher) chan runtime.Object { func RegisterFakeCopyOnUpdate(resource string, client *core.Fake, watcher *WatcherDispatcher) chan runtime.Object {
objChan := make(chan runtime.Object, 100) objChan := make(chan runtime.Object, 100)
client.AddReactor("update", resource, func(action core.Action) (bool, runtime.Object, error) { client.AddReactor("update", resource, func(action core.Action) (bool, runtime.Object, error) {
updateAction := action.(core.UpdateAction) updateAction := action.(core.UpdateAction)
obj := updateAction.GetObject() obj := updateAction.GetObject()
go func() { go func() {
glog.V(4).Infof("Object updated. Writing to channel: %v", obj)
defer func() {
// Sometimes the channel is already closed.
if panicVal := recover(); panicVal != nil {
glog.Errorf("Recovering from panic: %v", panicVal)
}
}()
watcher.Modify(obj) watcher.Modify(obj)
objChan <- obj objChan <- obj
}() }()
@@ -83,7 +189,8 @@ func GetObjectFromChan(c chan runtime.Object) runtime.Object {
select { select {
case obj := <-c: case obj := <-c:
return obj return obj
case <-time.After(time.Minute): case <-time.After(10 * time.Second):
pprof.Lookup("goroutine").WriteTo(os.Stderr, 1)
return nil return nil
} }
} }
@@ -93,11 +200,12 @@ func ToFederatedInformerForTestOnly(informer util.FederatedInformer) util.Federa
return inter.(util.FederatedInformerForTestOnly) return inter.(util.FederatedInformerForTestOnly)
} }
// NewCluster build a new cluster object. // NewCluster builds a new cluster object.
func NewCluster(name string, readyStatus api_v1.ConditionStatus) *federation_api.Cluster { func NewCluster(name string, readyStatus api_v1.ConditionStatus) *federation_api.Cluster {
return &federation_api.Cluster{ return &federation_api.Cluster{
ObjectMeta: api_v1.ObjectMeta{ ObjectMeta: api_v1.ObjectMeta{
Name: name, Name: name,
Annotations: map[string]string{},
}, },
Status: federation_api.ClusterStatus{ Status: federation_api.ClusterStatus{
Conditions: []federation_api.ClusterCondition{ Conditions: []federation_api.ClusterCondition{