Add test for watch goroutine handling, fix other review comments

This commit is contained in:
Daniel Smith
2014-06-18 13:10:19 -07:00
parent d8206503b8
commit a253209a2c
5 changed files with 131 additions and 31 deletions

View File

@@ -38,6 +38,9 @@ type ReplicationManager struct {
kubeClient client.ClientInterface
podControl PodControlInterface
syncTime <-chan time.Time
// To allow injection of syncReplicationController for testing.
syncHandler func(controllerSpec api.ReplicationController) error
}
// An interface that knows how to add or delete pods
@@ -74,13 +77,17 @@ func (r RealPodControl) deletePod(podID string) error {
}
func MakeReplicationManager(etcdClient util.EtcdClient, kubeClient client.ClientInterface) *ReplicationManager {
return &ReplicationManager{
rm := &ReplicationManager{
kubeClient: kubeClient,
etcdClient: etcdClient,
podControl: RealPodControl{
kubeClient: kubeClient,
},
}
rm.syncHandler = func(controllerSpec api.ReplicationController) error {
return rm.syncReplicationController(controllerSpec)
}
return rm
}
// Begin watching and syncing.
@@ -91,35 +98,40 @@ func (rm *ReplicationManager) Run(period time.Duration) {
func (rm *ReplicationManager) watchControllers() {
watchChannel := make(chan *etcd.Response)
stop := make(chan bool)
defer func() {
// Ensure that the call to watch ends.
close(stop)
}()
go func() {
defer util.HandleCrash()
defer func() {
close(watchChannel)
}()
rm.etcdClient.Watch("/registry/controllers", 0, true, watchChannel, nil)
_, err := rm.etcdClient.Watch("/registry/controllers", 0, true, watchChannel, stop)
if err != etcd.ErrWatchStoppedByUser {
log.Printf("etcd.Watch stopped unexpectedly: %v (%#v)", err, err)
}
}()
for {
select {
case <-rm.syncTime:
rm.synchronize()
case watchResponse, ok := <-watchChannel:
if !ok {
// watchChannel has been closed. Let the util.Forever() that
// called us call us again.
case watchResponse, open := <-watchChannel:
if !open || watchResponse == nil {
// watchChannel has been closed, or something else went
// wrong with our etcd watch call. Let the util.Forever()
// that called us call us again.
return
}
if watchResponse == nil {
time.Sleep(time.Second * 10)
continue
}
log.Printf("Got watch: %#v", watchResponse)
controller, err := rm.handleWatchResponse(watchResponse)
if err != nil {
log.Printf("Error handling data: %#v, %#v", err, watchResponse)
continue
}
rm.syncReplicationController(*controller)
rm.syncHandler(*controller)
}
}
}
@@ -179,9 +191,10 @@ func (rm *ReplicationManager) synchronize() {
err := helper.ExtractList("/registry/controllers", &controllerSpecs)
if err != nil {
log.Printf("Synchronization error: %v (%#v)", err, err)
return
}
for _, controllerSpec := range controllerSpecs {
err = rm.syncReplicationController(controllerSpec)
err = rm.syncHandler(controllerSpec)
if err != nil {
log.Printf("Error synchronizing: %#v", err)
}