Remaining refactor for PodTemplateSpec and fixing test cases
This commit is contained in:
@@ -35,14 +35,14 @@ type ReplicationManager struct {
|
||||
syncTime <-chan time.Time
|
||||
|
||||
// To allow injection of syncReplicationController for testing.
|
||||
syncHandler func(controllerSpec api.ReplicationController) error
|
||||
syncHandler func(controller api.ReplicationController) error
|
||||
}
|
||||
|
||||
// PodControlInterface is an interface that knows how to add or delete pods
|
||||
// created as an interface to allow testing.
|
||||
type PodControlInterface interface {
|
||||
// createReplica creates new replicated pods according to the spec.
|
||||
createReplica(namespace string, controllerSpec api.ReplicationController)
|
||||
createReplica(namespace string, controller api.ReplicationController)
|
||||
// deletePod deletes the pod identified by podID.
|
||||
deletePod(namespace string, podID string) error
|
||||
}
|
||||
@@ -52,16 +52,19 @@ type RealPodControl struct {
|
||||
kubeClient client.Interface
|
||||
}
|
||||
|
||||
func (r RealPodControl) createReplica(namespace string, controllerSpec api.ReplicationController) {
|
||||
func (r RealPodControl) createReplica(namespace string, controller api.ReplicationController) {
|
||||
desiredLabels := make(labels.Set)
|
||||
for k, v := range controllerSpec.DesiredState.PodTemplate.Labels {
|
||||
for k, v := range controller.Spec.Template.Labels {
|
||||
desiredLabels[k] = v
|
||||
}
|
||||
pod := &api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Labels: desiredLabels,
|
||||
},
|
||||
DesiredState: controllerSpec.DesiredState.PodTemplate.DesiredState,
|
||||
}
|
||||
if err := api.Scheme.Convert(&controller.Spec.Template.Spec, &pod.DesiredState.Manifest); err != nil {
|
||||
glog.Errorf("Unable to convert pod template: %v", err)
|
||||
return
|
||||
}
|
||||
if _, err := r.kubeClient.Pods(namespace).Create(pod); err != nil {
|
||||
glog.Errorf("Unable to create pod replica: %v", err)
|
||||
@@ -142,14 +145,14 @@ func (rm *ReplicationManager) filterActivePods(pods []api.Pod) []api.Pod {
|
||||
return result
|
||||
}
|
||||
|
||||
func (rm *ReplicationManager) syncReplicationController(controllerSpec api.ReplicationController) error {
|
||||
s := labels.Set(controllerSpec.DesiredState.ReplicaSelector).AsSelector()
|
||||
podList, err := rm.kubeClient.Pods(controllerSpec.Namespace).List(s)
|
||||
func (rm *ReplicationManager) syncReplicationController(controller api.ReplicationController) error {
|
||||
s := labels.Set(controller.Spec.Selector).AsSelector()
|
||||
podList, err := rm.kubeClient.Pods(controller.Namespace).List(s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
filteredList := rm.filterActivePods(podList.Items)
|
||||
diff := len(filteredList) - controllerSpec.DesiredState.Replicas
|
||||
diff := len(filteredList) - controller.Spec.Replicas
|
||||
if diff < 0 {
|
||||
diff *= -1
|
||||
wait := sync.WaitGroup{}
|
||||
@@ -158,7 +161,7 @@ func (rm *ReplicationManager) syncReplicationController(controllerSpec api.Repli
|
||||
for i := 0; i < diff; i++ {
|
||||
go func() {
|
||||
defer wait.Done()
|
||||
rm.podControl.createReplica(controllerSpec.Namespace, controllerSpec)
|
||||
rm.podControl.createReplica(controller.Namespace, controller)
|
||||
}()
|
||||
}
|
||||
wait.Wait()
|
||||
@@ -169,7 +172,7 @@ func (rm *ReplicationManager) syncReplicationController(controllerSpec api.Repli
|
||||
for i := 0; i < diff; i++ {
|
||||
go func(ix int) {
|
||||
defer wait.Done()
|
||||
rm.podControl.deletePod(controllerSpec.Namespace, filteredList[ix].Name)
|
||||
rm.podControl.deletePod(controller.Namespace, filteredList[ix].Name)
|
||||
}(i)
|
||||
}
|
||||
wait.Wait()
|
||||
@@ -180,20 +183,20 @@ func (rm *ReplicationManager) syncReplicationController(controllerSpec api.Repli
|
||||
func (rm *ReplicationManager) synchronize() {
|
||||
// TODO: remove this method completely and rely on the watch.
|
||||
// Add resource version tracking to watch to make this work.
|
||||
var controllerSpecs []api.ReplicationController
|
||||
var controllers []api.ReplicationController
|
||||
list, err := rm.kubeClient.ReplicationControllers(api.NamespaceAll).List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Errorf("Synchronization error: %v (%#v)", err, err)
|
||||
return
|
||||
}
|
||||
controllerSpecs = list.Items
|
||||
controllers = list.Items
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(len(controllerSpecs))
|
||||
for ix := range controllerSpecs {
|
||||
wg.Add(len(controllers))
|
||||
for ix := range controllers {
|
||||
go func(ix int) {
|
||||
defer wg.Done()
|
||||
glog.V(4).Infof("periodic sync of %v", controllerSpecs[ix].Name)
|
||||
err := rm.syncHandler(controllerSpecs[ix])
|
||||
glog.V(4).Infof("periodic sync of %v", controllers[ix].Name)
|
||||
err := rm.syncHandler(controllers[ix])
|
||||
if err != nil {
|
||||
glog.Errorf("Error synchronizing: %#v", err)
|
||||
}
|
||||
|
Reference in New Issue
Block a user