Rework client.Interface

This commit is contained in:
derekwaynecarr
2014-10-21 17:14:35 -04:00
parent 2bbd11eda6
commit 580cb5ea4f
20 changed files with 640 additions and 368 deletions

View File

@@ -42,9 +42,9 @@ type ReplicationManager struct {
// created as an interface to allow testing.
type PodControlInterface interface {
// createReplica creates new replicated pods according to the spec.
createReplica(ctx api.Context, controllerSpec api.ReplicationController)
createReplica(namespace string, controllerSpec api.ReplicationController)
// deletePod deletes the pod identified by podID.
deletePod(ctx api.Context, podID string) error
deletePod(namespace string, podID string) error
}
// RealPodControl is the default implementation of PodControllerInterface.
@@ -52,7 +52,7 @@ type RealPodControl struct {
kubeClient client.Interface
}
func (r RealPodControl) createReplica(ctx api.Context, controllerSpec api.ReplicationController) {
func (r RealPodControl) createReplica(namespace string, controllerSpec api.ReplicationController) {
desiredLabels := make(labels.Set)
for k, v := range controllerSpec.DesiredState.PodTemplate.Labels {
desiredLabels[k] = v
@@ -65,13 +65,13 @@ func (r RealPodControl) createReplica(ctx api.Context, controllerSpec api.Replic
},
DesiredState: controllerSpec.DesiredState.PodTemplate.DesiredState,
}
if _, err := r.kubeClient.CreatePod(ctx, pod); err != nil {
if _, err := r.kubeClient.Pods(namespace).Create(pod); err != nil {
glog.Errorf("Unable to create pod replica: %v", err)
}
}
func (r RealPodControl) deletePod(ctx api.Context, podID string) error {
return r.kubeClient.DeletePod(ctx, podID)
func (r RealPodControl) deletePod(namespace, podID string) error {
return r.kubeClient.Pods(namespace).Delete(podID)
}
// NewReplicationManager creates a new ReplicationManager.
@@ -95,9 +95,7 @@ func (rm *ReplicationManager) Run(period time.Duration) {
// resourceVersion is a pointer to the resource version to use/update.
func (rm *ReplicationManager) watchControllers(resourceVersion *string) {
ctx := api.NewContext()
watching, err := rm.kubeClient.WatchReplicationControllers(
ctx,
watching, err := rm.kubeClient.ReplicationControllers(api.NamespaceAll).Watch(
labels.Everything(),
labels.Everything(),
*resourceVersion,
@@ -147,8 +145,7 @@ func (rm *ReplicationManager) filterActivePods(pods []api.Pod) []api.Pod {
func (rm *ReplicationManager) syncReplicationController(controllerSpec api.ReplicationController) error {
s := labels.Set(controllerSpec.DesiredState.ReplicaSelector).AsSelector()
ctx := api.WithNamespace(api.NewContext(), controllerSpec.Namespace)
podList, err := rm.kubeClient.ListPods(ctx, s)
podList, err := rm.kubeClient.Pods(controllerSpec.Namespace).List(s)
if err != nil {
return err
}
@@ -162,7 +159,7 @@ func (rm *ReplicationManager) syncReplicationController(controllerSpec api.Repli
for i := 0; i < diff; i++ {
go func() {
defer wait.Done()
rm.podControl.createReplica(ctx, controllerSpec)
rm.podControl.createReplica(controllerSpec.Namespace, controllerSpec)
}()
}
wait.Wait()
@@ -173,7 +170,7 @@ func (rm *ReplicationManager) syncReplicationController(controllerSpec api.Repli
for i := 0; i < diff; i++ {
go func(ix int) {
defer wait.Done()
rm.podControl.deletePod(ctx, filteredList[ix].Name)
rm.podControl.deletePod(controllerSpec.Namespace, filteredList[ix].Name)
}(i)
}
wait.Wait()
@@ -185,8 +182,7 @@ func (rm *ReplicationManager) synchronize() {
// TODO: remove this method completely and rely on the watch.
// Add resource version tracking to watch to make this work.
var controllerSpecs []api.ReplicationController
ctx := api.NewContext()
list, err := rm.kubeClient.ListReplicationControllers(ctx, labels.Everything())
list, err := rm.kubeClient.ReplicationControllers(api.NamespaceAll).List(labels.Everything())
if err != nil {
glog.Errorf("Synchronization error: %v (%#v)", err, err)
return