replacing handwritten client in several controllers

This commit is contained in:
Chao Xu
2016-01-14 21:00:58 -08:00
parent 55f039fd53
commit c72d234bbf
64 changed files with 648 additions and 531 deletions

View File

@@ -27,8 +27,8 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/validation"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_1"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
@@ -237,7 +237,7 @@ type PodControlInterface interface {
// RealPodControl is the default implementation of PodControlInterface.
type RealPodControl struct {
KubeClient client.Interface
KubeClient clientset.Interface
Recorder record.EventRecorder
}
@@ -321,7 +321,7 @@ func (r RealPodControl) createPods(nodeName, namespace string, template *api.Pod
if labels.Set(pod.Labels).AsSelector().Empty() {
return fmt.Errorf("unable to create pods, no labels")
}
if newPod, err := r.KubeClient.Pods(namespace).Create(pod); err != nil {
if newPod, err := r.KubeClient.Legacy().Pods(namespace).Create(pod); err != nil {
r.Recorder.Eventf(object, api.EventTypeWarning, "FailedCreate", "Error creating: %v", err)
return fmt.Errorf("unable to create pods: %v", err)
} else {
@@ -336,7 +336,7 @@ func (r RealPodControl) DeletePod(namespace string, podID string, object runtime
if err != nil {
return fmt.Errorf("object does not have ObjectMeta, %v", err)
}
if err := r.KubeClient.Pods(namespace).Delete(podID, nil); err != nil {
if err := r.KubeClient.Legacy().Pods(namespace).Delete(podID, nil); err != nil {
r.Recorder.Eventf(object, api.EventTypeWarning, "FailedDelete", "Error deleting: %v", err)
return fmt.Errorf("unable to delete pods: %v", err)
} else {
@@ -444,12 +444,12 @@ func FilterActivePods(pods []api.Pod) []*api.Pod {
//
// TODO: Extend this logic to load arbitrary local state for the controllers
// instead of just pods.
func SyncAllPodsWithStore(kubeClient client.Interface, store cache.Store) {
func SyncAllPodsWithStore(kubeClient clientset.Interface, store cache.Store) {
var allPods *api.PodList
var err error
listOptions := api.ListOptions{LabelSelector: labels.Everything(), FieldSelector: fields.Everything()}
for {
if allPods, err = kubeClient.Pods(api.NamespaceAll).List(listOptions); err != nil {
if allPods, err = kubeClient.Legacy().Pods(api.NamespaceAll).List(listOptions); err != nil {
glog.Warningf("Retrying pod list: %v", err)
continue
}

View File

@@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_1"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/runtime"
@@ -192,10 +193,10 @@ func TestCreatePods(t *testing.T) {
testServer := httptest.NewServer(&fakeHandler)
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
clientset := clientset.NewForConfigOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
podControl := RealPodControl{
KubeClient: client,
KubeClient: clientset,
Recorder: &record.FakeRecorder{},
}

View File

@@ -27,8 +27,10 @@ import (
"k8s.io/kubernetes/pkg/api/validation"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_1"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
unversioned_extensions "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned"
unversioned_legacy "k8s.io/kubernetes/pkg/client/typed/generated/legacy/unversioned"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/labels"
@@ -60,7 +62,7 @@ const (
// DaemonSetsController is responsible for synchronizing DaemonSet objects stored
// in the system with actual running pods.
type DaemonSetsController struct {
kubeClient client.Interface
kubeClient clientset.Interface
podControl controller.PodControlInterface
// An dsc is temporarily suspended after creating/deleting these many replicas.
@@ -91,10 +93,11 @@ type DaemonSetsController struct {
queue *workqueue.Type
}
func NewDaemonSetsController(kubeClient client.Interface, resyncPeriod controller.ResyncPeriodFunc) *DaemonSetsController {
func NewDaemonSetsController(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *DaemonSetsController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
// TODO: remove the wrapper when every clients have moved to use the clientset.
eventBroadcaster.StartRecordingToSink(&unversioned_legacy.EventSinkImpl{kubeClient.Legacy().Events("")})
dsc := &DaemonSetsController{
kubeClient: kubeClient,
@@ -142,10 +145,10 @@ func NewDaemonSetsController(kubeClient client.Interface, resyncPeriod controlle
dsc.podStore.Store, dsc.podController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return dsc.kubeClient.Pods(api.NamespaceAll).List(options)
return dsc.kubeClient.Legacy().Pods(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return dsc.kubeClient.Pods(api.NamespaceAll).Watch(options)
return dsc.kubeClient.Legacy().Pods(api.NamespaceAll).Watch(options)
},
},
&api.Pod{},
@@ -160,10 +163,10 @@ func NewDaemonSetsController(kubeClient client.Interface, resyncPeriod controlle
dsc.nodeStore.Store, dsc.nodeController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return dsc.kubeClient.Nodes().List(options)
return dsc.kubeClient.Legacy().Nodes().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return dsc.kubeClient.Nodes().Watch(options)
return dsc.kubeClient.Legacy().Nodes().Watch(options)
},
},
&api.Node{},
@@ -463,7 +466,7 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) {
deleteWait.Wait()
}
func storeDaemonSetStatus(dsClient client.DaemonSetInterface, ds *extensions.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled int) error {
func storeDaemonSetStatus(dsClient unversioned_extensions.DaemonSetInterface, ds *extensions.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled int) error {
if ds.Status.DesiredNumberScheduled == desiredNumberScheduled && ds.Status.CurrentNumberScheduled == currentNumberScheduled && ds.Status.NumberMisscheduled == numberMisscheduled {
return nil
}

View File

@@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_1"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/runtime"
@@ -134,8 +135,8 @@ func addPods(podStore cache.Store, nodeName string, label map[string]string, num
}
func newTestController() (*DaemonSetsController, *controller.FakePodControl) {
client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewDaemonSetsController(client, controller.NoResyncPeriodFunc)
clientset := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewDaemonSetsController(clientset, controller.NoResyncPeriodFunc)
manager.podStoreSynced = alwaysReady
podControl := &controller.FakePodControl{}
manager.podControl = podControl
@@ -480,8 +481,8 @@ func TestDSManagerInit(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewDaemonSetsController(client, controller.NoResyncPeriodFunc)
clientset := clientset.NewForConfigOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewDaemonSetsController(clientset, controller.NoResyncPeriodFunc)
manager.dsStore.Add(ds)
manager.nodeStore.Add(newNode(nodeName, nil))
manager.podStoreSynced = alwaysReady

View File

@@ -29,8 +29,9 @@ import (
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_1"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
unversioned_legacy "k8s.io/kubernetes/pkg/client/typed/generated/legacy/unversioned"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
@@ -56,8 +57,7 @@ const (
// DeploymentController is responsible for synchronizing Deployment objects stored
// in the system with actual running rcs and pods.
type DeploymentController struct {
client client.Interface
expClient client.ExtensionsInterface
client clientset.Interface
eventRecorder record.EventRecorder
// To allow injection of syncDeployment for testing.
@@ -94,14 +94,14 @@ type DeploymentController struct {
}
// NewDeploymentController creates a new DeploymentController.
func NewDeploymentController(client client.Interface, resyncPeriod controller.ResyncPeriodFunc) *DeploymentController {
func NewDeploymentController(client clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *DeploymentController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(client.Events(""))
// TODO: remove the wrapper when every clients have moved to use the clientset.
eventBroadcaster.StartRecordingToSink(&unversioned_legacy.EventSinkImpl{client.Legacy().Events("")})
dc := &DeploymentController{
client: client,
expClient: client.Extensions(),
eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "deployment-controller"}),
queue: workqueue.New(),
podExpectations: controller.NewControllerExpectations(),
@@ -111,10 +111,10 @@ func NewDeploymentController(client client.Interface, resyncPeriod controller.Re
dc.dStore.Store, dc.dController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return dc.expClient.Deployments(api.NamespaceAll).List(options)
return dc.client.Extensions().Deployments(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return dc.expClient.Deployments(api.NamespaceAll).Watch(options)
return dc.client.Extensions().Deployments(api.NamespaceAll).Watch(options)
},
},
&extensions.Deployment{},
@@ -143,10 +143,10 @@ func NewDeploymentController(client client.Interface, resyncPeriod controller.Re
dc.rcStore.Store, dc.rcController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return dc.client.ReplicationControllers(api.NamespaceAll).List(options)
return dc.client.Legacy().ReplicationControllers(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return dc.client.ReplicationControllers(api.NamespaceAll).Watch(options)
return dc.client.Legacy().ReplicationControllers(api.NamespaceAll).Watch(options)
},
},
&api.ReplicationController{},
@@ -161,10 +161,10 @@ func NewDeploymentController(client client.Interface, resyncPeriod controller.Re
dc.podStore.Store, dc.podController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return dc.client.Pods(api.NamespaceAll).List(options)
return dc.client.Legacy().Pods(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return dc.client.Pods(api.NamespaceAll).Watch(options)
return dc.client.Legacy().Pods(api.NamespaceAll).Watch(options)
},
},
&api.Pod{},
@@ -688,7 +688,7 @@ func (dc *DeploymentController) getNewRC(deployment extensions.Deployment, maxOl
if existingNewRC.Annotations[deploymentutil.RevisionAnnotation] != newRevision {
existingNewRC.Annotations[deploymentutil.RevisionAnnotation] = newRevision
glog.V(4).Infof("update existingNewRC %s revision to %s - %+v\n", existingNewRC.Name, newRevision)
return dc.client.ReplicationControllers(deployment.ObjectMeta.Namespace).Update(existingNewRC)
return dc.client.Legacy().ReplicationControllers(deployment.ObjectMeta.Namespace).Update(existingNewRC)
}
return existingNewRC, nil
}
@@ -728,7 +728,7 @@ func (dc *DeploymentController) getNewRC(deployment extensions.Deployment, maxOl
Template: &newRCTemplate,
},
}
createdRC, err := dc.client.ReplicationControllers(namespace).Create(&newRC)
createdRC, err := dc.client.Legacy().ReplicationControllers(namespace).Create(&newRC)
if err != nil {
dc.rcExpectations.DeleteExpectations(dKey)
return nil, fmt.Errorf("error creating replication controller: %v", err)
@@ -752,7 +752,7 @@ func (dc *DeploymentController) updateRCRevision(rc api.ReplicationController, r
rc.Annotations = make(map[string]string)
}
rc.Annotations[deploymentutil.RevisionAnnotation] = revision
_, err := dc.client.ReplicationControllers(rc.ObjectMeta.Namespace).Update(&rc)
_, err := dc.client.Legacy().ReplicationControllers(rc.ObjectMeta.Namespace).Update(&rc)
return err
}
@@ -901,7 +901,7 @@ func (dc *DeploymentController) cleanupOldRcs(oldRCs []*api.ReplicationControlle
if controller.Spec.Replicas != 0 || controller.Generation > controller.Status.ObservedGeneration {
continue
}
if err := dc.client.ReplicationControllers(controller.Namespace).Delete(controller.Name); err != nil && !errors.IsNotFound(err) {
if err := dc.client.Legacy().ReplicationControllers(controller.Namespace).Delete(controller.Name, nil); err != nil && !errors.IsNotFound(err) {
glog.V(2).Infof("Failed deleting old rc %v for deployment %v: %v", controller.Name, deployment.Name, err)
errList = append(errList, err)
}
@@ -923,7 +923,7 @@ func (dc *DeploymentController) updateDeploymentStatus(allRCs []*api.Replication
AvailableReplicas: availableReplicas,
UnavailableReplicas: unavailableReplicas,
}
_, err = dc.expClient.Deployments(deployment.ObjectMeta.Namespace).UpdateStatus(&newDeployment)
_, err = dc.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).UpdateStatus(&newDeployment)
return err
}
@@ -958,12 +958,12 @@ func (dc *DeploymentController) scaleRCAndRecordEvent(rc *api.ReplicationControl
func (dc *DeploymentController) scaleRC(rc *api.ReplicationController, newScale int) (*api.ReplicationController, error) {
// TODO: Using client for now, update to use store when it is ready.
rc.Spec.Replicas = newScale
return dc.client.ReplicationControllers(rc.ObjectMeta.Namespace).Update(rc)
return dc.client.Legacy().ReplicationControllers(rc.ObjectMeta.Namespace).Update(rc)
}
func (dc *DeploymentController) updateDeployment(deployment *extensions.Deployment) (*extensions.Deployment, error) {
// TODO: Using client for now, update to use store when it is ready.
return dc.expClient.Deployments(deployment.ObjectMeta.Namespace).Update(deployment)
return dc.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).Update(deployment)
}
func (dc *DeploymentController) rollbackToTemplate(deployment *extensions.Deployment, rc *api.ReplicationController) (d *extensions.Deployment, performedRollback bool, err error) {

View File

@@ -25,6 +25,8 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
exp "k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/client/testing/fake"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/runtime"
@@ -89,9 +91,9 @@ func TestDeploymentController_reconcileNewRC(t *testing.T) {
oldRc := rc("foo-v2", test.oldReplicas, nil)
allRcs := []*api.ReplicationController{newRc, oldRc}
deployment := deployment("foo", test.deploymentReplicas, test.maxSurge, intstr.FromInt(0))
fake := &testclient.Fake{}
fake := fake.Clientset{}
controller := &DeploymentController{
client: fake,
client: &fake,
eventRecorder: &record.FakeRecorder{},
}
scaled, err := controller.reconcileNewRC(allRcs, newRc, deployment)
@@ -166,10 +168,10 @@ func TestDeploymentController_reconcileOldRCs(t *testing.T) {
allRcs := []*api.ReplicationController{oldRc}
oldRcs := []*api.ReplicationController{oldRc}
deployment := deployment("foo", test.deploymentReplicas, intstr.FromInt(0), test.maxUnavailable)
fake := &testclient.Fake{}
fake.AddReactor("list", "pods", func(action testclient.Action) (handled bool, ret runtime.Object, err error) {
fakeClientset := fake.Clientset{}
fakeClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
switch action.(type) {
case testclient.ListAction:
case core.ListAction:
podList := &api.PodList{}
for podIndex := 0; podIndex < test.readyPods; podIndex++ {
podList.Items = append(podList.Items, api.Pod{
@@ -191,7 +193,7 @@ func TestDeploymentController_reconcileOldRCs(t *testing.T) {
return false, nil, nil
})
controller := &DeploymentController{
client: fake,
client: &fakeClientset,
eventRecorder: &record.FakeRecorder{},
}
scaled, err := controller.reconcileOldRCs(allRcs, oldRcs, nil, deployment, false)
@@ -201,18 +203,18 @@ func TestDeploymentController_reconcileOldRCs(t *testing.T) {
}
if !test.scaleExpected {
if scaled {
t.Errorf("unexpected scaling: %v", fake.Actions())
t.Errorf("unexpected scaling: %v", fakeClientset.Actions())
}
continue
}
if test.scaleExpected && !scaled {
t.Errorf("expected scaling to occur; actions: %v", fake.Actions())
t.Errorf("expected scaling to occur; actions: %v", fakeClientset.Actions())
continue
}
// There are both list and update actions logged, so extract the update
// action for verification.
var updateAction testclient.UpdateAction
for _, action := range fake.Actions() {
for _, action := range fakeClientset.Actions() {
switch a := action.(type) {
case testclient.UpdateAction:
if updateAction != nil {
@@ -269,7 +271,7 @@ func TestDeploymentController_cleanupOldRCs(t *testing.T) {
}
for i, test := range tests {
fake := &testclient.Fake{}
fake := &fake.Clientset{}
controller := NewDeploymentController(fake, controller.NoResyncPeriodFunc)
controller.eventRecorder = &record.FakeRecorder{}
@@ -395,8 +397,7 @@ func newListOptions() api.ListOptions {
type fixture struct {
t *testing.T
client *testclient.Fake
client *fake.Clientset
// Objects to put in the store.
dStore []*exp.Deployment
rcStore []*api.ReplicationController
@@ -404,22 +405,22 @@ type fixture struct {
// Actions expected to happen on the client. Objects from here are also
// preloaded into NewSimpleFake.
actions []testclient.Action
actions []core.Action
objects *api.List
}
func (f *fixture) expectUpdateDeploymentAction(d *exp.Deployment) {
f.actions = append(f.actions, testclient.NewUpdateAction("deployments", d.Namespace, d))
f.actions = append(f.actions, core.NewUpdateAction("deployments", d.Namespace, d))
f.objects.Items = append(f.objects.Items, d)
}
func (f *fixture) expectCreateRCAction(rc *api.ReplicationController) {
f.actions = append(f.actions, testclient.NewCreateAction("replicationcontrollers", rc.Namespace, rc))
f.actions = append(f.actions, core.NewCreateAction("replicationcontrollers", rc.Namespace, rc))
f.objects.Items = append(f.objects.Items, rc)
}
func (f *fixture) expectUpdateRCAction(rc *api.ReplicationController) {
f.actions = append(f.actions, testclient.NewUpdateAction("replicationcontrollers", rc.Namespace, rc))
f.actions = append(f.actions, core.NewUpdateAction("replicationcontrollers", rc.Namespace, rc))
f.objects.Items = append(f.objects.Items, rc)
}
@@ -435,7 +436,7 @@ func newFixture(t *testing.T) *fixture {
}
func (f *fixture) run(deploymentName string) {
f.client = testclient.NewSimpleFake(f.objects)
f.client = fake.NewSimpleClientset(f.objects)
c := NewDeploymentController(f.client, controller.NoResyncPeriodFunc)
c.eventRecorder = &record.FakeRecorder{}
c.rcStoreSynced = alwaysReady

View File

@@ -27,8 +27,9 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_1"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
unversioned_legacy "k8s.io/kubernetes/pkg/client/typed/generated/legacy/unversioned"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
@@ -39,7 +40,7 @@ import (
)
type JobController struct {
kubeClient client.Interface
kubeClient clientset.Interface
podControl controller.PodControlInterface
// To allow injection of updateJobStatus for testing.
@@ -68,10 +69,11 @@ type JobController struct {
recorder record.EventRecorder
}
func NewJobController(kubeClient client.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController {
func NewJobController(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
// TODO: remove the wrapper when every clients have moved to use the clientset.
eventBroadcaster.StartRecordingToSink(&unversioned_legacy.EventSinkImpl{kubeClient.Legacy().Events("")})
jm := &JobController{
kubeClient: kubeClient,
@@ -110,10 +112,10 @@ func NewJobController(kubeClient client.Interface, resyncPeriod controller.Resyn
jm.podStore.Store, jm.podController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return jm.kubeClient.Pods(api.NamespaceAll).List(options)
return jm.kubeClient.Legacy().Pods(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return jm.kubeClient.Pods(api.NamespaceAll).Watch(options)
return jm.kubeClient.Legacy().Pods(api.NamespaceAll).Watch(options)
},
},
&api.Pod{},

View File

@@ -25,6 +25,9 @@ import (
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_1"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/client/testing/fake"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
"k8s.io/kubernetes/pkg/controller"
@@ -204,8 +207,8 @@ func TestControllerSyncJob(t *testing.T) {
for name, tc := range testCases {
// job manager setup
client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewJobController(client, controller.NoResyncPeriodFunc)
clientset := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{Err: tc.podControllerError}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
@@ -299,8 +302,8 @@ func TestSyncJobPastDeadline(t *testing.T) {
for name, tc := range testCases {
// job manager setup
client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewJobController(client, controller.NoResyncPeriodFunc)
clientset := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
@@ -369,8 +372,8 @@ func getCondition(job *extensions.Job, condition extensions.JobConditionType) bo
}
func TestSyncPastDeadlineJobFinished(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewJobController(client, controller.NoResyncPeriodFunc)
clientset := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
@@ -403,8 +406,8 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) {
}
func TestSyncJobComplete(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewJobController(client, controller.NoResyncPeriodFunc)
clientset := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
@@ -428,8 +431,8 @@ func TestSyncJobComplete(t *testing.T) {
}
func TestSyncJobDeleted(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewJobController(client, controller.NoResyncPeriodFunc)
clientset := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
@@ -448,8 +451,8 @@ func TestSyncJobDeleted(t *testing.T) {
}
func TestSyncJobUpdateRequeue(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewJobController(client, controller.NoResyncPeriodFunc)
clientset := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
@@ -469,8 +472,8 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
}
func TestJobPodLookup(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewJobController(client, controller.NoResyncPeriodFunc)
clientset := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
manager.podStoreSynced = alwaysReady
testCases := []struct {
job *extensions.Job
@@ -559,8 +562,8 @@ func (fe FakeJobExpectations) SatisfiedExpectations(controllerKey string) bool {
// TestSyncJobExpectations tests that a pod cannot sneak in between counting active pods
// and checking expectations.
func TestSyncJobExpectations(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewJobController(client, controller.NoResyncPeriodFunc)
clientset := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
@@ -594,10 +597,10 @@ type FakeWatcher struct {
}
func TestWatchJobs(t *testing.T) {
client := testclient.NewSimpleFake()
clientset := fake.NewSimpleClientset()
fakeWatch := watch.NewFake()
client.PrependWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil))
manager := NewJobController(client, controller.NoResyncPeriodFunc)
clientset.PrependWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
manager.podStoreSynced = alwaysReady
var testJob extensions.Job
@@ -658,10 +661,10 @@ func TestIsJobFinished(t *testing.T) {
}
func TestWatchPods(t *testing.T) {
client := testclient.NewSimpleFake()
clientset := fake.NewSimpleClientset()
fakeWatch := watch.NewFake()
client.PrependWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil))
manager := NewJobController(client, controller.NoResyncPeriodFunc)
clientset.PrependWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
manager.podStoreSynced = alwaysReady
// Put one job and one pod into the store

View File

@@ -24,7 +24,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_1"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/runtime"
@@ -44,7 +44,7 @@ type PersistentVolumeClaimBinder struct {
}
// NewPersistentVolumeClaimBinder creates a new PersistentVolumeClaimBinder
func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time.Duration) *PersistentVolumeClaimBinder {
func NewPersistentVolumeClaimBinder(kubeClient clientset.Interface, syncPeriod time.Duration) *PersistentVolumeClaimBinder {
volumeIndex := NewPersistentVolumeOrderedIndex()
binderClient := NewBinderClient(kubeClient)
binder := &PersistentVolumeClaimBinder{
@@ -55,10 +55,10 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time
_, volumeController := framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return kubeClient.PersistentVolumes().List(options)
return kubeClient.Legacy().PersistentVolumes().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return kubeClient.PersistentVolumes().Watch(options)
return kubeClient.Legacy().PersistentVolumes().Watch(options)
},
},
&api.PersistentVolume{},
@@ -73,10 +73,10 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time
_, claimController := framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return kubeClient.PersistentVolumeClaims(api.NamespaceAll).List(options)
return kubeClient.Legacy().PersistentVolumeClaims(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return kubeClient.PersistentVolumeClaims(api.NamespaceAll).Watch(options)
return kubeClient.Legacy().PersistentVolumeClaims(api.NamespaceAll).Watch(options)
},
},
&api.PersistentVolumeClaim{},
@@ -453,38 +453,38 @@ type binderClient interface {
UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error)
}
func NewBinderClient(c client.Interface) binderClient {
func NewBinderClient(c clientset.Interface) binderClient {
return &realBinderClient{c}
}
type realBinderClient struct {
client client.Interface
client clientset.Interface
}
func (c *realBinderClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().Get(name)
return c.client.Legacy().PersistentVolumes().Get(name)
}
func (c *realBinderClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().Update(volume)
return c.client.Legacy().PersistentVolumes().Update(volume)
}
func (c *realBinderClient) DeletePersistentVolume(volume *api.PersistentVolume) error {
return c.client.PersistentVolumes().Delete(volume.Name)
return c.client.Legacy().PersistentVolumes().Delete(volume.Name, nil)
}
func (c *realBinderClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().UpdateStatus(volume)
return c.client.Legacy().PersistentVolumes().UpdateStatus(volume)
}
func (c *realBinderClient) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) {
return c.client.PersistentVolumeClaims(namespace).Get(name)
return c.client.Legacy().PersistentVolumeClaims(namespace).Get(name)
}
func (c *realBinderClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
return c.client.PersistentVolumeClaims(claim.Namespace).Update(claim)
return c.client.Legacy().PersistentVolumeClaims(claim.Namespace).Update(claim)
}
func (c *realBinderClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
return c.client.PersistentVolumeClaims(claim.Namespace).UpdateStatus(claim)
return c.client.Legacy().PersistentVolumeClaims(claim.Namespace).UpdateStatus(claim)
}

View File

@@ -28,15 +28,16 @@ import (
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/client/testing/fake"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/host_path"
)
func TestRunStop(t *testing.T) {
client := &testclient.Fake{}
binder := NewPersistentVolumeClaimBinder(client, 1*time.Second)
clientset := fake.NewSimpleClientset()
binder := NewPersistentVolumeClaimBinder(clientset, 1*time.Second)
if len(binder.stopChannels) != 0 {
t.Errorf("Non-running binder should not have any stopChannels. Got %v", len(binder.stopChannels))
@@ -290,18 +291,18 @@ func TestExampleObjects(t *testing.T) {
for name, scenario := range scenarios {
codec := api.Codecs.UniversalDecoder()
o := testclient.NewObjects(api.Scheme, codec)
if err := testclient.AddObjectsFromPath("../../../docs/user-guide/persistent-volumes/"+name, o, codec); err != nil {
o := core.NewObjects(api.Scheme, codec)
if err := core.AddObjectsFromPath("../../../docs/user-guide/persistent-volumes/"+name, o, codec); err != nil {
t.Fatal(err)
}
client := &testclient.Fake{}
client.AddReactor("*", "*", testclient.ObjectReaction(o, api.RESTMapper))
clientset := &fake.Clientset{}
clientset.AddReactor("*", "*", core.ObjectReaction(o, api.RESTMapper))
if reflect.TypeOf(scenario.expected) == reflect.TypeOf(&api.PersistentVolumeClaim{}) {
pvc, err := client.PersistentVolumeClaims("ns").Get("doesntmatter")
pvc, err := clientset.Legacy().PersistentVolumeClaims("ns").Get("doesntmatter")
if err != nil {
t.Errorf("Error retrieving object: %v", err)
t.Fatalf("Error retrieving object: %v", err)
}
expected := scenario.expected.(*api.PersistentVolumeClaim)
@@ -320,9 +321,9 @@ func TestExampleObjects(t *testing.T) {
}
if reflect.TypeOf(scenario.expected) == reflect.TypeOf(&api.PersistentVolume{}) {
pv, err := client.PersistentVolumes().Get("doesntmatter")
pv, err := clientset.Legacy().PersistentVolumes().Get("doesntmatter")
if err != nil {
t.Errorf("Error retrieving object: %v", err)
t.Fatalf("Error retrieving object: %v", err)
}
expected := scenario.expected.(*api.PersistentVolume)
@@ -354,18 +355,21 @@ func TestBindingWithExamples(t *testing.T) {
defer os.RemoveAll(tmpDir)
codec := api.Codecs.UniversalDecoder()
o := testclient.NewObjects(api.Scheme, codec)
if err := testclient.AddObjectsFromPath("../../../docs/user-guide/persistent-volumes/claims/claim-01.yaml", o, codec); err != nil {
o := core.NewObjects(api.Scheme, codec)
if err := core.AddObjectsFromPath("../../../docs/user-guide/persistent-volumes/claims/claim-01.yaml", o, codec); err != nil {
t.Fatal(err)
}
if err := testclient.AddObjectsFromPath("../../../docs/user-guide/persistent-volumes/volumes/local-01.yaml", o, codec); err != nil {
if err := core.AddObjectsFromPath("../../../docs/user-guide/persistent-volumes/volumes/local-01.yaml", o, codec); err != nil {
t.Fatal(err)
}
client := &testclient.Fake{}
client.AddReactor("*", "*", testclient.ObjectReaction(o, api.RESTMapper))
clientset := &fake.Clientset{}
clientset.AddReactor("*", "*", core.ObjectReaction(o, api.RESTMapper))
pv, err := client.PersistentVolumes().Get("any")
pv, err := clientset.Legacy().PersistentVolumes().Get("any")
if err != nil {
t.Errorf("Unexpected error getting PV from client: %v", err)
}
pv.Spec.PersistentVolumeReclaimPolicy = api.PersistentVolumeReclaimRecycle
if err != nil {
t.Errorf("Unexpected error getting PV from client: %v", err)
@@ -377,7 +381,7 @@ func TestBindingWithExamples(t *testing.T) {
// Test that !Pending gets correctly added
pv.Status.Phase = api.VolumeAvailable
claim, error := client.PersistentVolumeClaims("ns").Get("any")
claim, error := clientset.Legacy().PersistentVolumeClaims("ns").Get("any")
if error != nil {
t.Errorf("Unexpected error getting PVC from client: %v", err)
}
@@ -393,7 +397,7 @@ func TestBindingWithExamples(t *testing.T) {
plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newMockRecycler, volume.VolumeConfig{}), volume.NewFakeVolumeHost(tmpDir, nil, nil))
recycler := &PersistentVolumeRecycler{
kubeClient: client,
kubeClient: clientset,
client: mockClient,
pluginMgr: plugMgr,
}
@@ -463,8 +467,8 @@ func TestBindingWithExamples(t *testing.T) {
}
func TestCasting(t *testing.T) {
client := &testclient.Fake{}
binder := NewPersistentVolumeClaimBinder(client, 1*time.Second)
clientset := fake.NewSimpleClientset()
binder := NewPersistentVolumeClaimBinder(clientset, 1*time.Second)
pv := &api.PersistentVolume{}
unk := cache.DeletedFinalStateUnknown{}

View File

@@ -23,7 +23,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_1"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/conversion"
@@ -368,68 +368,68 @@ type controllerClient interface {
UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error)
// provided to give VolumeHost and plugins access to the kube client
GetKubeClient() client.Interface
GetKubeClient() clientset.Interface
}
func NewControllerClient(c client.Interface) controllerClient {
func NewControllerClient(c clientset.Interface) controllerClient {
return &realControllerClient{c}
}
var _ controllerClient = &realControllerClient{}
type realControllerClient struct {
client client.Interface
client clientset.Interface
}
func (c *realControllerClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().Get(name)
return c.client.Legacy().PersistentVolumes().Get(name)
}
func (c *realControllerClient) ListPersistentVolumes(options api.ListOptions) (*api.PersistentVolumeList, error) {
return c.client.PersistentVolumes().List(options)
return c.client.Legacy().PersistentVolumes().List(options)
}
func (c *realControllerClient) WatchPersistentVolumes(options api.ListOptions) (watch.Interface, error) {
return c.client.PersistentVolumes().Watch(options)
return c.client.Legacy().PersistentVolumes().Watch(options)
}
func (c *realControllerClient) CreatePersistentVolume(pv *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().Create(pv)
return c.client.Legacy().PersistentVolumes().Create(pv)
}
func (c *realControllerClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().Update(volume)
return c.client.Legacy().PersistentVolumes().Update(volume)
}
func (c *realControllerClient) DeletePersistentVolume(volume *api.PersistentVolume) error {
return c.client.PersistentVolumes().Delete(volume.Name)
return c.client.Legacy().PersistentVolumes().Delete(volume.Name, nil)
}
func (c *realControllerClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().UpdateStatus(volume)
return c.client.Legacy().PersistentVolumes().UpdateStatus(volume)
}
func (c *realControllerClient) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) {
return c.client.PersistentVolumeClaims(namespace).Get(name)
return c.client.Legacy().PersistentVolumeClaims(namespace).Get(name)
}
func (c *realControllerClient) ListPersistentVolumeClaims(namespace string, options api.ListOptions) (*api.PersistentVolumeClaimList, error) {
return c.client.PersistentVolumeClaims(namespace).List(options)
return c.client.Legacy().PersistentVolumeClaims(namespace).List(options)
}
func (c *realControllerClient) WatchPersistentVolumeClaims(namespace string, options api.ListOptions) (watch.Interface, error) {
return c.client.PersistentVolumeClaims(namespace).Watch(options)
return c.client.Legacy().PersistentVolumeClaims(namespace).Watch(options)
}
func (c *realControllerClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
return c.client.PersistentVolumeClaims(claim.Namespace).Update(claim)
return c.client.Legacy().PersistentVolumeClaims(claim.Namespace).Update(claim)
}
func (c *realControllerClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
return c.client.PersistentVolumeClaims(claim.Namespace).UpdateStatus(claim)
return c.client.Legacy().PersistentVolumeClaims(claim.Namespace).UpdateStatus(claim)
}
func (c *realControllerClient) GetKubeClient() client.Interface {
func (c *realControllerClient) GetKubeClient() clientset.Interface {
return c.client
}
@@ -469,7 +469,7 @@ func (c *PersistentVolumeProvisionerController) GetPodPluginDir(podUID types.UID
return ""
}
func (c *PersistentVolumeProvisionerController) GetKubeClient() client.Interface {
func (c *PersistentVolumeProvisionerController) GetKubeClient() clientset.Interface {
return c.client.GetKubeClient()
}

View File

@@ -25,7 +25,7 @@ import (
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/testapi"
client "k8s.io/kubernetes/pkg/client/unversioned"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_1"
fake_cloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/volume"
@@ -248,6 +248,6 @@ func (c *mockControllerClient) UpdatePersistentVolumeClaimStatus(claim *api.Pers
return claim, nil
}
func (c *mockControllerClient) GetKubeClient() client.Interface {
func (c *mockControllerClient) GetKubeClient() clientset.Interface {
return nil
}

View File

@@ -23,7 +23,7 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_1"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
@@ -43,13 +43,13 @@ type PersistentVolumeRecycler struct {
volumeController *framework.Controller
stopChannel chan struct{}
client recyclerClient
kubeClient client.Interface
kubeClient clientset.Interface
pluginMgr volume.VolumePluginMgr
cloud cloudprovider.Interface
}
// PersistentVolumeRecycler creates a new PersistentVolumeRecycler
func NewPersistentVolumeRecycler(kubeClient client.Interface, syncPeriod time.Duration, plugins []volume.VolumePlugin, cloud cloudprovider.Interface) (*PersistentVolumeRecycler, error) {
func NewPersistentVolumeRecycler(kubeClient clientset.Interface, syncPeriod time.Duration, plugins []volume.VolumePlugin, cloud cloudprovider.Interface) (*PersistentVolumeRecycler, error) {
recyclerClient := NewRecyclerClient(kubeClient)
recycler := &PersistentVolumeRecycler{
client: recyclerClient,
@@ -64,10 +64,10 @@ func NewPersistentVolumeRecycler(kubeClient client.Interface, syncPeriod time.Du
_, volumeController := framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return kubeClient.PersistentVolumes().List(options)
return kubeClient.Legacy().PersistentVolumes().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return kubeClient.PersistentVolumes().Watch(options)
return kubeClient.Legacy().PersistentVolumes().Watch(options)
},
},
&api.PersistentVolume{},
@@ -249,28 +249,28 @@ type recyclerClient interface {
UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error)
}
func NewRecyclerClient(c client.Interface) recyclerClient {
func NewRecyclerClient(c clientset.Interface) recyclerClient {
return &realRecyclerClient{c}
}
type realRecyclerClient struct {
client client.Interface
client clientset.Interface
}
func (c *realRecyclerClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().Get(name)
return c.client.Legacy().PersistentVolumes().Get(name)
}
func (c *realRecyclerClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().Update(volume)
return c.client.Legacy().PersistentVolumes().Update(volume)
}
func (c *realRecyclerClient) DeletePersistentVolume(volume *api.PersistentVolume) error {
return c.client.PersistentVolumes().Delete(volume.Name)
return c.client.Legacy().PersistentVolumes().Delete(volume.Name, nil)
}
func (c *realRecyclerClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().UpdateStatus(volume)
return c.client.Legacy().PersistentVolumes().UpdateStatus(volume)
}
// PersistentVolumeRecycler is host to the volume plugins, but does not actually mount any volumes.
@@ -287,7 +287,7 @@ func (f *PersistentVolumeRecycler) GetPodPluginDir(podUID types.UID, pluginName
return ""
}
func (f *PersistentVolumeRecycler) GetKubeClient() client.Interface {
func (f *PersistentVolumeRecycler) GetKubeClient() clientset.Interface {
return f.kubeClient
}

View File

@@ -21,7 +21,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
"k8s.io/kubernetes/pkg/client/testing/fake"
"k8s.io/kubernetes/pkg/volume"
)
@@ -56,7 +56,7 @@ func TestFailedRecycling(t *testing.T) {
plugMgr := volume.VolumePluginMgr{}
recycler := &PersistentVolumeRecycler{
kubeClient: &testclient.Fake{},
kubeClient: fake.NewSimpleClientset(),
client: mockClient,
pluginMgr: plugMgr,
}

View File

@@ -28,8 +28,9 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_1"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
unversioned_legacy "k8s.io/kubernetes/pkg/client/typed/generated/legacy/unversioned"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
@@ -59,7 +60,7 @@ const (
// ReplicaSetController is responsible for synchronizing ReplicaSet objects stored
// in the system with actual running pods.
type ReplicaSetController struct {
kubeClient client.Interface
kubeClient clientset.Interface
podControl controller.PodControlInterface
// A ReplicaSet is temporarily suspended after creating/deleting these many replicas.
@@ -88,10 +89,10 @@ type ReplicaSetController struct {
}
// NewReplicaSetController creates a new ReplicaSetController.
func NewReplicaSetController(kubeClient client.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int) *ReplicaSetController {
func NewReplicaSetController(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int) *ReplicaSetController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
eventBroadcaster.StartRecordingToSink(&unversioned_legacy.EventSinkImpl{kubeClient.Legacy().Events("")})
rsc := &ReplicaSetController{
kubeClient: kubeClient,
@@ -148,10 +149,10 @@ func NewReplicaSetController(kubeClient client.Interface, resyncPeriod controlle
rsc.podStore.Store, rsc.podController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return rsc.kubeClient.Pods(api.NamespaceAll).List(options)
return rsc.kubeClient.Legacy().Pods(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return rsc.kubeClient.Pods(api.NamespaceAll).Watch(options)
return rsc.kubeClient.Legacy().Pods(api.NamespaceAll).Watch(options)
},
},
&api.Pod{},

View File

@@ -30,6 +30,9 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_1"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/client/testing/fake"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
"k8s.io/kubernetes/pkg/controller"
@@ -133,7 +136,7 @@ type serverResponse struct {
}
func TestSyncReplicaSetDoesNothing(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
client := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
@@ -150,7 +153,7 @@ func TestSyncReplicaSetDoesNothing(t *testing.T) {
}
func TestSyncReplicaSetDeletes(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
client := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
@@ -167,7 +170,7 @@ func TestSyncReplicaSetDeletes(t *testing.T) {
}
func TestDeleteFinalStateUnknown(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
client := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
@@ -201,7 +204,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
}
func TestSyncReplicaSetCreates(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
client := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
@@ -224,7 +227,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
}
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
client := clientset.NewForConfigOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
@@ -267,7 +270,7 @@ func TestControllerUpdateReplicas(t *testing.T) {
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
client := clientset.NewForConfigOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
@@ -306,7 +309,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
}
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
client := clientset.NewForConfigOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas)
@@ -356,7 +359,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
}
func TestPodControllerLookup(t *testing.T) {
manager := NewReplicaSetController(client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}), controller.NoResyncPeriodFunc, BurstReplicas)
manager := NewReplicaSetController(clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}), controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
testCases := []struct {
inRSs []*extensions.ReplicaSet
@@ -417,13 +420,13 @@ func TestPodControllerLookup(t *testing.T) {
type FakeWatcher struct {
w *watch.FakeWatcher
*testclient.Fake
*fake.Clientset
}
func TestWatchControllers(t *testing.T) {
fakeWatch := watch.NewFake()
client := &testclient.Fake{}
client.AddWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil))
client := &fake.Clientset{}
client.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
@@ -465,8 +468,8 @@ func TestWatchControllers(t *testing.T) {
func TestWatchPods(t *testing.T) {
fakeWatch := watch.NewFake()
client := &testclient.Fake{}
client.AddWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil))
client := &fake.Clientset{}
client.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
@@ -510,7 +513,7 @@ func TestWatchPods(t *testing.T) {
}
func TestUpdatePods(t *testing.T) {
manager := NewReplicaSetController(testclient.NewSimpleFake(), controller.NoResyncPeriodFunc, BurstReplicas)
manager := NewReplicaSetController(fake.NewSimpleClientset(), controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
received := make(chan string)
@@ -570,7 +573,7 @@ func TestControllerUpdateRequeue(t *testing.T) {
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
client := clientset.NewForConfigOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
@@ -606,12 +609,12 @@ func TestControllerUpdateRequeue(t *testing.T) {
func TestControllerUpdateStatusWithFailure(t *testing.T) {
rs := newReplicaSet(1, map[string]string{"foo": "bar"})
fakeClient := &testclient.FakeExperimental{Fake: &testclient.Fake{}}
fakeClient.AddReactor("get", "replicasets", func(action testclient.Action) (bool, runtime.Object, error) { return true, rs, nil })
fakeClient.AddReactor("*", "*", func(action testclient.Action) (bool, runtime.Object, error) {
fakeClient := &fake.Clientset{}
fakeClient.AddReactor("get", "replicasets", func(action core.Action) (bool, runtime.Object, error) { return true, rs, nil })
fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
return true, &extensions.ReplicaSet{}, fmt.Errorf("Fake error")
})
fakeRSClient := &testclient.FakeReplicaSets{fakeClient, "default"}
fakeRSClient := fakeClient.Extensions().ReplicaSets("default")
numReplicas := 10
updateReplicaCount(fakeRSClient, *rs, numReplicas)
updates, gets := 0, 0
@@ -649,7 +652,7 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) {
}
func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) {
client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
client := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, burstReplicas)
manager.podStoreSynced = alwaysReady
@@ -771,7 +774,7 @@ func (fe FakeRSExpectations) SatisfiedExpectations(controllerKey string) bool {
// TestRSSyncExpectations tests that a pod cannot sneak in between counting active pods
// and checking expectations.
func TestRSSyncExpectations(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
client := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, 2)
manager.podStoreSynced = alwaysReady
@@ -797,7 +800,7 @@ func TestRSSyncExpectations(t *testing.T) {
}
func TestDeleteControllerAndExpectations(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
client := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, 10)
manager.podStoreSynced = alwaysReady
@@ -839,7 +842,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
}
func TestRSManagerNotReady(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
client := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, 2)
manager.podControl = &fakePodControl
@@ -876,7 +879,7 @@ func shuffle(controllers []*extensions.ReplicaSet) []*extensions.ReplicaSet {
}
func TestOverlappingRSs(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
client := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
labelMap := map[string]string{"foo": "bar"}
for i := 0; i < 5; i++ {

View File

@@ -27,8 +27,9 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_1"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
unversioned_legacy "k8s.io/kubernetes/pkg/client/typed/generated/legacy/unversioned"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/labels"
@@ -61,7 +62,7 @@ const (
// TODO: this really should be called ReplicationController. The only reason why it's a Manager
// is to distinguish this type from API object "ReplicationController". We should fix this.
type ReplicationManager struct {
kubeClient client.Interface
kubeClient clientset.Interface
podControl controller.PodControlInterface
// An rc is temporarily suspended after creating/deleting these many replicas.
@@ -90,10 +91,10 @@ type ReplicationManager struct {
}
// NewReplicationManager creates a new ReplicationManager.
func NewReplicationManager(kubeClient client.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int) *ReplicationManager {
func NewReplicationManager(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int) *ReplicationManager {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
eventBroadcaster.StartRecordingToSink(&unversioned_legacy.EventSinkImpl{kubeClient.Legacy().Events("")})
rm := &ReplicationManager{
kubeClient: kubeClient,
@@ -109,10 +110,10 @@ func NewReplicationManager(kubeClient client.Interface, resyncPeriod controller.
rm.rcStore.Store, rm.rcController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return rm.kubeClient.ReplicationControllers(api.NamespaceAll).List(options)
return rm.kubeClient.Legacy().ReplicationControllers(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return rm.kubeClient.ReplicationControllers(api.NamespaceAll).Watch(options)
return rm.kubeClient.Legacy().ReplicationControllers(api.NamespaceAll).Watch(options)
},
},
&api.ReplicationController{},
@@ -150,10 +151,10 @@ func NewReplicationManager(kubeClient client.Interface, resyncPeriod controller.
rm.podStore.Store, rm.podController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return rm.kubeClient.Pods(api.NamespaceAll).List(options)
return rm.kubeClient.Legacy().Pods(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return rm.kubeClient.Pods(api.NamespaceAll).Watch(options)
return rm.kubeClient.Legacy().Pods(api.NamespaceAll).Watch(options)
},
},
&api.Pod{},
@@ -451,7 +452,7 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
}
// Always updates status as pods come up or die.
if err := updateReplicaCount(rm.kubeClient.ReplicationControllers(rc.Namespace), rc, len(filteredPods)); err != nil {
if err := updateReplicaCount(rm.kubeClient.Legacy().ReplicationControllers(rc.Namespace), rc, len(filteredPods)); err != nil {
// Multiple things could lead to this update failing. Requeuing the controller ensures
// we retry with some fairness.
glog.V(2).Infof("Failed to update replica count for controller %v/%v; requeuing; error: %v", rc.Namespace, rc.Name, err)

View File

@@ -29,8 +29,10 @@ import (
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_1"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/client/testing/fake"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/securitycontext"
@@ -132,9 +134,9 @@ type serverResponse struct {
}
func TestSyncReplicationControllerDoesNothing(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
c := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas)
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
// 2 running pods, a controller with 2 replicas, sync is a no-op
@@ -148,9 +150,9 @@ func TestSyncReplicationControllerDoesNothing(t *testing.T) {
}
func TestSyncReplicationControllerDeletes(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
c := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas)
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
@@ -164,9 +166,9 @@ func TestSyncReplicationControllerDeletes(t *testing.T) {
}
func TestDeleteFinalStateUnknown(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
c := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas)
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
@@ -197,8 +199,8 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
}
func TestSyncReplicationControllerCreates(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas)
c := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
// A controller with 2 replicas and no pods in the store, 2 creates expected
@@ -220,8 +222,8 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
testServer := httptest.NewServer(&fakeHandler)
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas)
c := clientset.NewForConfigOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
// Steady state for the replication controller, no Status.Replicas updates expected
@@ -262,9 +264,8 @@ func TestControllerUpdateReplicas(t *testing.T) {
testServer := httptest.NewServer(&fakeHandler)
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas)
c := clientset.NewForConfigOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
// Insufficient number of pods in the system, and Status.Replicas is wrong;
@@ -302,10 +303,9 @@ func TestSyncReplicationControllerDormancy(t *testing.T) {
testServer := httptest.NewServer(&fakeHandler)
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
c := clientset.NewForConfigOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas)
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
@@ -351,7 +351,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) {
}
func TestPodControllerLookup(t *testing.T) {
manager := NewReplicationManager(client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}), controller.NoResyncPeriodFunc, BurstReplicas)
manager := NewReplicationManager(clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}), controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
testCases := []struct {
inRCs []*api.ReplicationController
@@ -410,16 +410,11 @@ func TestPodControllerLookup(t *testing.T) {
}
}
type FakeWatcher struct {
w *watch.FakeWatcher
*testclient.Fake
}
func TestWatchControllers(t *testing.T) {
fakeWatch := watch.NewFake()
client := &testclient.Fake{}
client.AddWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil))
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas)
c := &fake.Clientset{}
c.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
var testControllerSpec api.ReplicationController
@@ -460,9 +455,9 @@ func TestWatchControllers(t *testing.T) {
func TestWatchPods(t *testing.T) {
fakeWatch := watch.NewFake()
client := &testclient.Fake{}
client.AddWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil))
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas)
c := &fake.Clientset{}
c.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
// Put one rc and one pod into the controller's stores
@@ -504,7 +499,7 @@ func TestWatchPods(t *testing.T) {
}
func TestUpdatePods(t *testing.T) {
manager := NewReplicationManager(testclient.NewSimpleFake(), controller.NoResyncPeriodFunc, BurstReplicas)
manager := NewReplicationManager(fake.NewSimpleClientset(), controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
received := make(chan string)
@@ -563,8 +558,8 @@ func TestControllerUpdateRequeue(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas)
c := clientset.NewForConfigOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
rc := newReplicationController(1)
@@ -598,31 +593,31 @@ func TestControllerUpdateRequeue(t *testing.T) {
func TestControllerUpdateStatusWithFailure(t *testing.T) {
rc := newReplicationController(1)
fakeClient := &testclient.Fake{}
fakeClient.AddReactor("get", "replicationcontrollers", func(action testclient.Action) (bool, runtime.Object, error) {
c := &fake.Clientset{}
c.AddReactor("get", "replicationcontrollers", func(action core.Action) (bool, runtime.Object, error) {
return true, rc, nil
})
fakeClient.AddReactor("*", "*", func(action testclient.Action) (bool, runtime.Object, error) {
c.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
return true, &api.ReplicationController{}, fmt.Errorf("Fake error")
})
fakeRCClient := &testclient.FakeReplicationControllers{fakeClient, "default"}
fakeRCClient := c.Legacy().ReplicationControllers("default")
numReplicas := 10
updateReplicaCount(fakeRCClient, *rc, numReplicas)
updates, gets := 0, 0
for _, a := range fakeClient.Actions() {
for _, a := range c.Actions() {
if a.GetResource() != "replicationcontrollers" {
t.Errorf("Unexpected action %+v", a)
continue
}
switch action := a.(type) {
case testclient.GetAction:
case core.GetAction:
gets++
// Make sure the get is for the right rc even though the update failed.
if action.GetName() != rc.Name {
t.Errorf("Expected get for rc %v, got %+v instead", rc.Name, action.GetName())
}
case testclient.UpdateAction:
case core.UpdateAction:
updates++
// Confirm that the update has the right status.Replicas even though the Get
// returned an rc with replicas=1.
@@ -643,9 +638,9 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) {
}
func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) {
client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
c := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, burstReplicas)
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, burstReplicas)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
@@ -763,9 +758,9 @@ func (fe FakeRCExpectations) SatisfiedExpectations(controllerKey string) bool {
// TestRCSyncExpectations tests that a pod cannot sneak in between counting active pods
// and checking expectations.
func TestRCSyncExpectations(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
c := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, 2)
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, 2)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
@@ -788,8 +783,8 @@ func TestRCSyncExpectations(t *testing.T) {
}
func TestDeleteControllerAndExpectations(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, 10)
c := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, 10)
manager.podStoreSynced = alwaysReady
rc := newReplicationController(1)
@@ -830,9 +825,9 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
}
func TestRCManagerNotReady(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
c := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, 2)
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, 2)
manager.podControl = &fakePodControl
manager.podStoreSynced = func() bool { return false }
@@ -867,10 +862,10 @@ func shuffle(controllers []*api.ReplicationController) []*api.ReplicationControl
}
func TestOverlappingRCs(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
c := clientset.NewForConfigOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
for i := 0; i < 5; i++ {
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, 10)
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, 10)
manager.podStoreSynced = alwaysReady
// Create 10 rcs, shuffled them randomly and insert them into the rc manager's store
@@ -910,8 +905,8 @@ func TestRCManagerInit(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas)
c := clientset.NewForConfigOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas)
manager.rcStore.Store.Add(rc)
manager.podStoreSynced = alwaysReady
controller.SyncAllPodsWithStore(manager.kubeClient, manager.podStore.Store)

View File

@@ -21,11 +21,11 @@ package replication
import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
unversioned_legacy "k8s.io/kubernetes/pkg/client/typed/generated/legacy/unversioned"
)
// updateReplicaCount attempts to update the Status.Replicas of the given controller, with a single GET/PUT retry.
func updateReplicaCount(rcClient client.ReplicationControllerInterface, controller api.ReplicationController, numReplicas int) (updateErr error) {
func updateReplicaCount(rcClient unversioned_legacy.ReplicationControllerInterface, controller api.ReplicationController, numReplicas int) (updateErr error) {
// This is the steady state. It happens when the rc doesn't have any expectations, since
// we do a periodic relist every 30s. If the generations differ but the replicas are
// the same, a caller might've resized to the same replica count.