add FullyLabeledReplicas in Replicaset Status and ReplicationController Status
This commit is contained in:
@@ -547,8 +547,21 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
|
||||
rsc.manageReplicas(filteredPods, &rs)
|
||||
}
|
||||
|
||||
// Count the number of pods that have labels matching the labels of the pod
|
||||
// template of the replicaSet, the matching pods may have more labels than
|
||||
// are in the template. Because the label of podTemplateSpec is a superset
|
||||
// of the selector of the replicaset, so the possible matching pods must be
|
||||
// part of the filteredPods.
|
||||
fullyLabeledReplicasCount := 0
|
||||
templateLabel := labels.Set(rs.Spec.Template.Labels).AsSelector()
|
||||
for _, pod := range filteredPods {
|
||||
if templateLabel.Matches(labels.Set(pod.Labels)) {
|
||||
fullyLabeledReplicasCount++
|
||||
}
|
||||
}
|
||||
|
||||
// Always updates status as pods come up or die.
|
||||
if err := updateReplicaCount(rsc.kubeClient.Extensions().ReplicaSets(rs.Namespace), rs, len(filteredPods)); err != nil {
|
||||
if err := updateReplicaCount(rsc.kubeClient.Extensions().ReplicaSets(rs.Namespace), rs, len(filteredPods), fullyLabeledReplicasCount); err != nil {
|
||||
// Multiple things could lead to this update failing. Requeuing the replica set ensures
|
||||
// we retry with some fairness.
|
||||
glog.V(2).Infof("Failed to update replica count for controller %v/%v; requeuing; error: %v", rs.Namespace, rs.Name, err)
|
||||
|
@@ -98,12 +98,12 @@ func newReplicaSet(replicas int, selectorMap map[string]string) *extensions.Repl
|
||||
}
|
||||
|
||||
// create count pods with the given phase for the given ReplicaSet (same selectors and namespace), and add them to the store.
|
||||
func newPodList(store cache.Store, count int, status api.PodPhase, labelMap map[string]string, rs *extensions.ReplicaSet) *api.PodList {
|
||||
func newPodList(store cache.Store, count int, status api.PodPhase, labelMap map[string]string, rs *extensions.ReplicaSet, name string) *api.PodList {
|
||||
pods := []api.Pod{}
|
||||
for i := 0; i < count; i++ {
|
||||
newPod := api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: fmt.Sprintf("pod%d", i),
|
||||
Name: fmt.Sprintf("%s%d", name, i),
|
||||
Labels: labelMap,
|
||||
Namespace: rs.Namespace,
|
||||
},
|
||||
@@ -147,7 +147,7 @@ func TestSyncReplicaSetDoesNothing(t *testing.T) {
|
||||
labelMap := map[string]string{"foo": "bar"}
|
||||
rsSpec := newReplicaSet(2, labelMap)
|
||||
manager.rsStore.Store.Add(rsSpec)
|
||||
newPodList(manager.podStore.Store, 2, api.PodRunning, labelMap, rsSpec)
|
||||
newPodList(manager.podStore.Store, 2, api.PodRunning, labelMap, rsSpec, "pod")
|
||||
|
||||
manager.podControl = &fakePodControl
|
||||
manager.syncReplicaSet(getKey(rsSpec, t))
|
||||
@@ -165,7 +165,7 @@ func TestSyncReplicaSetDeletes(t *testing.T) {
|
||||
labelMap := map[string]string{"foo": "bar"}
|
||||
rsSpec := newReplicaSet(1, labelMap)
|
||||
manager.rsStore.Store.Add(rsSpec)
|
||||
newPodList(manager.podStore.Store, 2, api.PodRunning, labelMap, rsSpec)
|
||||
newPodList(manager.podStore.Store, 2, api.PodRunning, labelMap, rsSpec, "pod")
|
||||
|
||||
manager.syncReplicaSet(getKey(rsSpec, t))
|
||||
validateSyncReplicaSet(t, &fakePodControl, 0, 1)
|
||||
@@ -189,7 +189,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
|
||||
labelMap := map[string]string{"foo": "bar"}
|
||||
rsSpec := newReplicaSet(1, labelMap)
|
||||
manager.rsStore.Store.Add(rsSpec)
|
||||
pods := newPodList(nil, 1, api.PodRunning, labelMap, rsSpec)
|
||||
pods := newPodList(nil, 1, api.PodRunning, labelMap, rsSpec, "pod")
|
||||
manager.deletePod(cache.DeletedFinalStateUnknown{Key: "foo", Obj: &pods.Items[0]})
|
||||
|
||||
go manager.worker()
|
||||
@@ -239,7 +239,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
|
||||
rs := newReplicaSet(activePods, labelMap)
|
||||
manager.rsStore.Store.Add(rs)
|
||||
rs.Status = extensions.ReplicaSetStatus{Replicas: activePods}
|
||||
newPodList(manager.podStore.Store, activePods, api.PodRunning, labelMap, rs)
|
||||
newPodList(manager.podStore.Store, activePods, api.PodRunning, labelMap, rs, "pod")
|
||||
|
||||
fakePodControl := controller.FakePodControl{}
|
||||
manager.podControl = &fakePodControl
|
||||
@@ -279,11 +279,14 @@ func TestControllerUpdateReplicas(t *testing.T) {
|
||||
// Insufficient number of pods in the system, and Status.Replicas is wrong;
|
||||
// Status.Replica should update to match number of pods in system, 1 new pod should be created.
|
||||
labelMap := map[string]string{"foo": "bar"}
|
||||
extraLabelMap := map[string]string{"foo": "bar", "extraKey": "extraValue"}
|
||||
rs := newReplicaSet(5, labelMap)
|
||||
rs.Spec.Template.Labels = extraLabelMap
|
||||
manager.rsStore.Store.Add(rs)
|
||||
rs.Status = extensions.ReplicaSetStatus{Replicas: 2, ObservedGeneration: 0}
|
||||
rs.Status = extensions.ReplicaSetStatus{Replicas: 2, FullyLabeledReplicas: 6, ObservedGeneration: 0}
|
||||
rs.Generation = 1
|
||||
newPodList(manager.podStore.Store, 4, api.PodRunning, labelMap, rs)
|
||||
newPodList(manager.podStore.Store, 2, api.PodRunning, labelMap, rs, "pod")
|
||||
newPodList(manager.podStore.Store, 2, api.PodRunning, extraLabelMap, rs, "podWithExtraLabel")
|
||||
|
||||
// This response body is just so we don't err out decoding the http response
|
||||
response := runtime.EncodeOrDie(testapi.Extensions.Codec(), &extensions.ReplicaSet{})
|
||||
@@ -295,8 +298,10 @@ func TestControllerUpdateReplicas(t *testing.T) {
|
||||
manager.syncReplicaSet(getKey(rs, t))
|
||||
|
||||
// 1. Status.Replicas should go up from 2->4 even though we created 5-4=1 pod.
|
||||
// 2. Every update to the status should include the Generation of the spec.
|
||||
rs.Status = extensions.ReplicaSetStatus{Replicas: 4, ObservedGeneration: 1}
|
||||
// 2. Status.FullyLabeledReplicas should equal to the number of pods that
|
||||
// has the extra labels, i.e., 2.
|
||||
// 3. Every update to the status should include the Generation of the spec.
|
||||
rs.Status = extensions.ReplicaSetStatus{Replicas: 4, FullyLabeledReplicas: 2, ObservedGeneration: 1}
|
||||
|
||||
decRc := runtime.EncodeOrDie(testapi.Extensions.Codec(), rs)
|
||||
fakeHandler.ValidateRequest(t, testapi.Extensions.ResourcePath(replicaSetResourceName(), rs.Namespace, rs.Name)+"/status", "PUT", &decRc)
|
||||
@@ -321,7 +326,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
|
||||
labelMap := map[string]string{"foo": "bar"}
|
||||
rsSpec := newReplicaSet(2, labelMap)
|
||||
manager.rsStore.Store.Add(rsSpec)
|
||||
newPodList(manager.podStore.Store, 1, api.PodRunning, labelMap, rsSpec)
|
||||
newPodList(manager.podStore.Store, 1, api.PodRunning, labelMap, rsSpec, "pod")
|
||||
|
||||
// Creates a replica and sets expectations
|
||||
rsSpec.Status.Replicas = 1
|
||||
@@ -502,7 +507,7 @@ func TestWatchPods(t *testing.T) {
|
||||
go manager.podController.Run(stopCh)
|
||||
go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
|
||||
|
||||
pods := newPodList(nil, 1, api.PodRunning, labelMap, testRSSpec)
|
||||
pods := newPodList(nil, 1, api.PodRunning, labelMap, testRSSpec, "pod")
|
||||
testPod := pods.Items[0]
|
||||
testPod.Status.Phase = api.PodFailed
|
||||
fakeWatch.Add(&testPod)
|
||||
@@ -544,7 +549,7 @@ func TestUpdatePods(t *testing.T) {
|
||||
manager.rsStore.Store.Add(&testRSSpec2)
|
||||
|
||||
// Put one pod in the podStore
|
||||
pod1 := newPodList(manager.podStore.Store, 1, api.PodRunning, labelMap1, testRSSpec1).Items[0]
|
||||
pod1 := newPodList(manager.podStore.Store, 1, api.PodRunning, labelMap1, testRSSpec1, "pod").Items[0]
|
||||
pod2 := pod1
|
||||
pod2.Labels = labelMap2
|
||||
|
||||
@@ -583,7 +588,7 @@ func TestControllerUpdateRequeue(t *testing.T) {
|
||||
rs := newReplicaSet(1, labelMap)
|
||||
manager.rsStore.Store.Add(rs)
|
||||
rs.Status = extensions.ReplicaSetStatus{Replicas: 2}
|
||||
newPodList(manager.podStore.Store, 1, api.PodRunning, labelMap, rs)
|
||||
newPodList(manager.podStore.Store, 1, api.PodRunning, labelMap, rs, "pod")
|
||||
|
||||
fakePodControl := controller.FakePodControl{}
|
||||
manager.podControl = &fakePodControl
|
||||
@@ -618,7 +623,7 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) {
|
||||
})
|
||||
fakeRSClient := fakeClient.Extensions().ReplicaSets("default")
|
||||
numReplicas := 10
|
||||
updateReplicaCount(fakeRSClient, *rs, numReplicas)
|
||||
updateReplicaCount(fakeRSClient, *rs, numReplicas, 0)
|
||||
updates, gets := 0, 0
|
||||
for _, a := range fakeClient.Actions() {
|
||||
if a.GetResource() != "replicasets" {
|
||||
@@ -666,7 +671,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
|
||||
manager.rsStore.Store.Add(rsSpec)
|
||||
|
||||
expectedPods := 0
|
||||
pods := newPodList(nil, numReplicas, api.PodPending, labelMap, rsSpec)
|
||||
pods := newPodList(nil, numReplicas, api.PodPending, labelMap, rsSpec, "pod")
|
||||
|
||||
rsKey, err := controller.KeyFunc(rsSpec)
|
||||
if err != nil {
|
||||
@@ -783,7 +788,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
|
||||
t.Fatalf("Unexpected number of active pods, expected %d, got %d", rsSpec.Spec.Replicas, activePods)
|
||||
}
|
||||
// Replenish the pod list, since we cut it down sizing up
|
||||
pods = newPodList(nil, replicas, api.PodRunning, labelMap, rsSpec)
|
||||
pods = newPodList(nil, replicas, api.PodRunning, labelMap, rsSpec, "pod")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -816,7 +821,7 @@ func TestRSSyncExpectations(t *testing.T) {
|
||||
labelMap := map[string]string{"foo": "bar"}
|
||||
rsSpec := newReplicaSet(2, labelMap)
|
||||
manager.rsStore.Store.Add(rsSpec)
|
||||
pods := newPodList(nil, 2, api.PodPending, labelMap, rsSpec)
|
||||
pods := newPodList(nil, 2, api.PodPending, labelMap, rsSpec, "pod")
|
||||
manager.podStore.Store.Add(&pods.Items[0])
|
||||
postExpectationsPod := pods.Items[1]
|
||||
|
||||
@@ -932,7 +937,7 @@ func TestOverlappingRSs(t *testing.T) {
|
||||
manager.rsStore.Store.Add(shuffledControllers[j])
|
||||
}
|
||||
// Add a pod and make sure only the oldest ReplicaSet is synced
|
||||
pods := newPodList(nil, 1, api.PodPending, labelMap, controllers[0])
|
||||
pods := newPodList(nil, 1, api.PodPending, labelMap, controllers[0], "pod")
|
||||
rsKey := getKey(controllers[0], t)
|
||||
|
||||
manager.addPod(&pods.Items[0])
|
||||
@@ -955,7 +960,7 @@ func TestDeletionTimestamp(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Errorf("Couldn't get key for object %+v: %v", rs, err)
|
||||
}
|
||||
pod := newPodList(nil, 1, api.PodPending, labelMap, rs).Items[0]
|
||||
pod := newPodList(nil, 1, api.PodPending, labelMap, rs, "pod").Items[0]
|
||||
pod.DeletionTimestamp = &unversioned.Time{time.Now()}
|
||||
manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)})
|
||||
|
||||
@@ -975,7 +980,7 @@ func TestDeletionTimestamp(t *testing.T) {
|
||||
|
||||
// An update from no deletion timestamp to having one should be treated
|
||||
// as a deletion.
|
||||
oldPod := newPodList(nil, 1, api.PodPending, labelMap, rs).Items[0]
|
||||
oldPod := newPodList(nil, 1, api.PodPending, labelMap, rs, "pod").Items[0]
|
||||
manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)})
|
||||
manager.updatePod(&oldPod, &pod)
|
||||
|
||||
|
@@ -19,17 +19,20 @@ limitations under the License.
|
||||
package replicaset
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
)
|
||||
|
||||
// updateReplicaCount attempts to update the Status.Replicas of the given ReplicaSet, with a single GET/PUT retry.
|
||||
func updateReplicaCount(rsClient client.ReplicaSetInterface, rs extensions.ReplicaSet, numReplicas int) (updateErr error) {
|
||||
func updateReplicaCount(rsClient client.ReplicaSetInterface, rs extensions.ReplicaSet, numReplicas, numFullyLabeledReplicas int) (updateErr error) {
|
||||
// This is the steady state. It happens when the ReplicaSet doesn't have any expectations, since
|
||||
// we do a periodic relist every 30s. If the generations differ but the replicas are
|
||||
// the same, a caller might've resized to the same replica count.
|
||||
if rs.Status.Replicas == numReplicas &&
|
||||
rs.Status.FullyLabeledReplicas == numFullyLabeledReplicas &&
|
||||
rs.Generation == rs.Status.ObservedGeneration {
|
||||
return nil
|
||||
}
|
||||
@@ -41,10 +44,12 @@ func updateReplicaCount(rsClient client.ReplicaSetInterface, rs extensions.Repli
|
||||
|
||||
var getErr error
|
||||
for i, rs := 0, &rs; ; i++ {
|
||||
glog.V(4).Infof("Updating replica count for ReplicaSet: %v, %d->%d (need %d), sequence No: %v->%v",
|
||||
rs.Name, rs.Status.Replicas, numReplicas, rs.Spec.Replicas, rs.Status.ObservedGeneration, generation)
|
||||
glog.V(4).Infof(fmt.Sprintf("Updating replica count for ReplicaSet: %s/%s, ", rs.Namespace, rs.Name) +
|
||||
fmt.Sprintf("replicas %d->%d (need %d), ", rs.Status.Replicas, numReplicas, rs.Spec.Replicas) +
|
||||
fmt.Sprintf("fullyLabeledReplicas %d->%d, ", rs.Status.FullyLabeledReplicas, numFullyLabeledReplicas) +
|
||||
fmt.Sprintf("sequence No: %v->%v", rs.Status.ObservedGeneration, generation))
|
||||
|
||||
rs.Status = extensions.ReplicaSetStatus{Replicas: numReplicas, ObservedGeneration: generation}
|
||||
rs.Status = extensions.ReplicaSetStatus{Replicas: numReplicas, FullyLabeledReplicas: numFullyLabeledReplicas, ObservedGeneration: generation}
|
||||
_, updateErr = rsClient.UpdateStatus(rs)
|
||||
if updateErr == nil || i >= statusUpdateRetries {
|
||||
return updateErr
|
||||
|
@@ -540,8 +540,21 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
|
||||
rm.manageReplicas(filteredPods, &rc)
|
||||
}
|
||||
|
||||
// Count the number of pods that have labels matching the labels of the pod
|
||||
// template of the replication controller, the matching pods may have more
|
||||
// labels than are in the template. Because the label of podTemplateSpec is
|
||||
// a superset of the selector of the replication controller, so the possible
|
||||
// matching pods must be part of the filteredPods.
|
||||
fullyLabeledReplicasCount := 0
|
||||
templateLabel := labels.Set(rc.Spec.Template.Labels).AsSelector()
|
||||
for _, pod := range filteredPods {
|
||||
if templateLabel.Matches(labels.Set(pod.Labels)) {
|
||||
fullyLabeledReplicasCount++
|
||||
}
|
||||
}
|
||||
|
||||
// Always updates status as pods come up or die.
|
||||
if err := updateReplicaCount(rm.kubeClient.Core().ReplicationControllers(rc.Namespace), rc, len(filteredPods)); err != nil {
|
||||
if err := updateReplicaCount(rm.kubeClient.Core().ReplicationControllers(rc.Namespace), rc, len(filteredPods), fullyLabeledReplicasCount); err != nil {
|
||||
// Multiple things could lead to this update failing. Requeuing the controller ensures
|
||||
// we retry with some fairness.
|
||||
glog.V(2).Infof("Failed to update replica count for controller %v/%v; requeuing; error: %v", rc.Namespace, rc.Name, err)
|
||||
|
@@ -96,12 +96,12 @@ func newReplicationController(replicas int) *api.ReplicationController {
|
||||
}
|
||||
|
||||
// create count pods with the given phase for the given rc (same selectors and namespace), and add them to the store.
|
||||
func newPodList(store cache.Store, count int, status api.PodPhase, rc *api.ReplicationController) *api.PodList {
|
||||
func newPodList(store cache.Store, count int, status api.PodPhase, rc *api.ReplicationController, name string) *api.PodList {
|
||||
pods := []api.Pod{}
|
||||
for i := 0; i < count; i++ {
|
||||
newPod := api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: fmt.Sprintf("pod%d", i),
|
||||
Name: fmt.Sprintf("%s%d", name, i),
|
||||
Labels: rc.Spec.Selector,
|
||||
Namespace: rc.Namespace,
|
||||
},
|
||||
@@ -144,7 +144,7 @@ func TestSyncReplicationControllerDoesNothing(t *testing.T) {
|
||||
// 2 running pods, a controller with 2 replicas, sync is a no-op
|
||||
controllerSpec := newReplicationController(2)
|
||||
manager.rcStore.Store.Add(controllerSpec)
|
||||
newPodList(manager.podStore.Store, 2, api.PodRunning, controllerSpec)
|
||||
newPodList(manager.podStore.Store, 2, api.PodRunning, controllerSpec, "pod")
|
||||
|
||||
manager.podControl = &fakePodControl
|
||||
manager.syncReplicationController(getKey(controllerSpec, t))
|
||||
@@ -161,7 +161,7 @@ func TestSyncReplicationControllerDeletes(t *testing.T) {
|
||||
// 2 running pods and a controller with 1 replica, one pod delete expected
|
||||
controllerSpec := newReplicationController(1)
|
||||
manager.rcStore.Store.Add(controllerSpec)
|
||||
newPodList(manager.podStore.Store, 2, api.PodRunning, controllerSpec)
|
||||
newPodList(manager.podStore.Store, 2, api.PodRunning, controllerSpec, "pod")
|
||||
|
||||
manager.syncReplicationController(getKey(controllerSpec, t))
|
||||
validateSyncReplication(t, &fakePodControl, 0, 1)
|
||||
@@ -184,7 +184,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
|
||||
// the controller matching the selectors of the deleted pod into the work queue.
|
||||
controllerSpec := newReplicationController(1)
|
||||
manager.rcStore.Store.Add(controllerSpec)
|
||||
pods := newPodList(nil, 1, api.PodRunning, controllerSpec)
|
||||
pods := newPodList(nil, 1, api.PodRunning, controllerSpec, "pod")
|
||||
manager.deletePod(cache.DeletedFinalStateUnknown{Key: "foo", Obj: &pods.Items[0]})
|
||||
|
||||
go manager.worker()
|
||||
@@ -233,7 +233,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
|
||||
rc := newReplicationController(activePods)
|
||||
manager.rcStore.Store.Add(rc)
|
||||
rc.Status = api.ReplicationControllerStatus{Replicas: activePods}
|
||||
newPodList(manager.podStore.Store, activePods, api.PodRunning, rc)
|
||||
newPodList(manager.podStore.Store, activePods, api.PodRunning, rc, "pod")
|
||||
|
||||
fakePodControl := controller.FakePodControl{}
|
||||
manager.podControl = &fakePodControl
|
||||
@@ -274,9 +274,13 @@ func TestControllerUpdateReplicas(t *testing.T) {
|
||||
// Status.Replica should update to match number of pods in system, 1 new pod should be created.
|
||||
rc := newReplicationController(5)
|
||||
manager.rcStore.Store.Add(rc)
|
||||
rc.Status = api.ReplicationControllerStatus{Replicas: 2, ObservedGeneration: 0}
|
||||
rc.Status = api.ReplicationControllerStatus{Replicas: 2, FullyLabeledReplicas: 6, ObservedGeneration: 0}
|
||||
rc.Generation = 1
|
||||
newPodList(manager.podStore.Store, 4, api.PodRunning, rc)
|
||||
newPodList(manager.podStore.Store, 2, api.PodRunning, rc, "pod")
|
||||
rcCopy := *rc
|
||||
extraLabelMap := map[string]string{"foo": "bar", "extraKey": "extraValue"}
|
||||
rcCopy.Spec.Selector = extraLabelMap
|
||||
newPodList(manager.podStore.Store, 2, api.PodRunning, &rcCopy, "podWithExtraLabel")
|
||||
|
||||
// This response body is just so we don't err out decoding the http response
|
||||
response := runtime.EncodeOrDie(testapi.Default.Codec(), &api.ReplicationController{})
|
||||
@@ -288,7 +292,9 @@ func TestControllerUpdateReplicas(t *testing.T) {
|
||||
manager.syncReplicationController(getKey(rc, t))
|
||||
|
||||
// 1. Status.Replicas should go up from 2->4 even though we created 5-4=1 pod.
|
||||
// 2. Every update to the status should include the Generation of the spec.
|
||||
// 2. Status.FullyLabeledReplicas should equal to the number of pods that
|
||||
// has the extra labels, i.e., 2.
|
||||
// 3. Every update to the status should include the Generation of the spec.
|
||||
rc.Status = api.ReplicationControllerStatus{Replicas: 4, ObservedGeneration: 1}
|
||||
|
||||
decRc := runtime.EncodeOrDie(testapi.Default.Codec(), rc)
|
||||
@@ -313,7 +319,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) {
|
||||
|
||||
controllerSpec := newReplicationController(2)
|
||||
manager.rcStore.Store.Add(controllerSpec)
|
||||
newPodList(manager.podStore.Store, 1, api.PodRunning, controllerSpec)
|
||||
newPodList(manager.podStore.Store, 1, api.PodRunning, controllerSpec, "pod")
|
||||
|
||||
// Creates a replica and sets expectations
|
||||
controllerSpec.Status.Replicas = 1
|
||||
@@ -488,7 +494,7 @@ func TestWatchPods(t *testing.T) {
|
||||
go manager.podController.Run(stopCh)
|
||||
go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
|
||||
|
||||
pods := newPodList(nil, 1, api.PodRunning, testControllerSpec)
|
||||
pods := newPodList(nil, 1, api.PodRunning, testControllerSpec, "pod")
|
||||
testPod := pods.Items[0]
|
||||
testPod.Status.Phase = api.PodFailed
|
||||
fakeWatch.Add(&testPod)
|
||||
@@ -528,7 +534,7 @@ func TestUpdatePods(t *testing.T) {
|
||||
manager.rcStore.Store.Add(&testControllerSpec2)
|
||||
|
||||
// Put one pod in the podStore
|
||||
pod1 := newPodList(manager.podStore.Store, 1, api.PodRunning, testControllerSpec1).Items[0]
|
||||
pod1 := newPodList(manager.podStore.Store, 1, api.PodRunning, testControllerSpec1, "pod").Items[0]
|
||||
pod2 := pod1
|
||||
pod2.Labels = testControllerSpec2.Spec.Selector
|
||||
|
||||
@@ -567,7 +573,7 @@ func TestControllerUpdateRequeue(t *testing.T) {
|
||||
rc := newReplicationController(1)
|
||||
manager.rcStore.Store.Add(rc)
|
||||
rc.Status = api.ReplicationControllerStatus{Replicas: 2}
|
||||
newPodList(manager.podStore.Store, 1, api.PodRunning, rc)
|
||||
newPodList(manager.podStore.Store, 1, api.PodRunning, rc, "pod")
|
||||
|
||||
fakePodControl := controller.FakePodControl{}
|
||||
manager.podControl = &fakePodControl
|
||||
@@ -604,7 +610,7 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) {
|
||||
})
|
||||
fakeRCClient := c.Core().ReplicationControllers("default")
|
||||
numReplicas := 10
|
||||
updateReplicaCount(fakeRCClient, *rc, numReplicas)
|
||||
updateReplicaCount(fakeRCClient, *rc, numReplicas, 0)
|
||||
updates, gets := 0, 0
|
||||
for _, a := range c.Actions() {
|
||||
if a.GetResource() != "replicationcontrollers" {
|
||||
@@ -651,7 +657,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
|
||||
manager.rcStore.Store.Add(controllerSpec)
|
||||
|
||||
expectedPods := 0
|
||||
pods := newPodList(nil, numReplicas, api.PodPending, controllerSpec)
|
||||
pods := newPodList(nil, numReplicas, api.PodPending, controllerSpec, "pod")
|
||||
|
||||
rcKey, err := controller.KeyFunc(controllerSpec)
|
||||
if err != nil {
|
||||
@@ -767,7 +773,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
|
||||
t.Fatalf("Unexpected number of active pods, expected %d, got %d", controllerSpec.Spec.Replicas, activePods)
|
||||
}
|
||||
// Replenish the pod list, since we cut it down sizing up
|
||||
pods = newPodList(nil, replicas, api.PodRunning, controllerSpec)
|
||||
pods = newPodList(nil, replicas, api.PodRunning, controllerSpec, "pod")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -799,7 +805,7 @@ func TestRCSyncExpectations(t *testing.T) {
|
||||
|
||||
controllerSpec := newReplicationController(2)
|
||||
manager.rcStore.Store.Add(controllerSpec)
|
||||
pods := newPodList(nil, 2, api.PodPending, controllerSpec)
|
||||
pods := newPodList(nil, 2, api.PodPending, controllerSpec, "pod")
|
||||
manager.podStore.Store.Add(&pods.Items[0])
|
||||
postExpectationsPod := pods.Items[1]
|
||||
|
||||
@@ -914,7 +920,7 @@ func TestOverlappingRCs(t *testing.T) {
|
||||
manager.rcStore.Store.Add(shuffledControllers[j])
|
||||
}
|
||||
// Add a pod and make sure only the oldest rc is synced
|
||||
pods := newPodList(nil, 1, api.PodPending, controllers[0])
|
||||
pods := newPodList(nil, 1, api.PodPending, controllers[0], "pod")
|
||||
rcKey := getKey(controllers[0], t)
|
||||
|
||||
manager.addPod(&pods.Items[0])
|
||||
@@ -936,7 +942,7 @@ func TestDeletionTimestamp(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Errorf("Couldn't get key for object %+v: %v", controllerSpec, err)
|
||||
}
|
||||
pod := newPodList(nil, 1, api.PodPending, controllerSpec).Items[0]
|
||||
pod := newPodList(nil, 1, api.PodPending, controllerSpec, "pod").Items[0]
|
||||
pod.DeletionTimestamp = &unversioned.Time{time.Now()}
|
||||
manager.expectations.ExpectDeletions(rcKey, []string{controller.PodKey(&pod)})
|
||||
|
||||
@@ -956,7 +962,7 @@ func TestDeletionTimestamp(t *testing.T) {
|
||||
|
||||
// An update from no deletion timestamp to having one should be treated
|
||||
// as a deletion.
|
||||
oldPod := newPodList(nil, 1, api.PodPending, controllerSpec).Items[0]
|
||||
oldPod := newPodList(nil, 1, api.PodPending, controllerSpec, "pod").Items[0]
|
||||
manager.expectations.ExpectDeletions(rcKey, []string{controller.PodKey(&pod)})
|
||||
manager.updatePod(&oldPod, &pod)
|
||||
|
||||
|
@@ -19,17 +19,20 @@ limitations under the License.
|
||||
package replication
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned"
|
||||
)
|
||||
|
||||
// updateReplicaCount attempts to update the Status.Replicas of the given controller, with a single GET/PUT retry.
|
||||
func updateReplicaCount(rcClient unversionedcore.ReplicationControllerInterface, controller api.ReplicationController, numReplicas int) (updateErr error) {
|
||||
func updateReplicaCount(rcClient unversionedcore.ReplicationControllerInterface, controller api.ReplicationController, numReplicas, numFullyLabeledReplicas int) (updateErr error) {
|
||||
// This is the steady state. It happens when the rc doesn't have any expectations, since
|
||||
// we do a periodic relist every 30s. If the generations differ but the replicas are
|
||||
// the same, a caller might've resized to the same replica count.
|
||||
if controller.Status.Replicas == numReplicas &&
|
||||
controller.Status.FullyLabeledReplicas == numFullyLabeledReplicas &&
|
||||
controller.Generation == controller.Status.ObservedGeneration {
|
||||
return nil
|
||||
}
|
||||
@@ -41,10 +44,12 @@ func updateReplicaCount(rcClient unversionedcore.ReplicationControllerInterface,
|
||||
|
||||
var getErr error
|
||||
for i, rc := 0, &controller; ; i++ {
|
||||
glog.V(4).Infof("Updating replica count for rc: %v, %d->%d (need %d), sequence No: %v->%v",
|
||||
controller.Name, controller.Status.Replicas, numReplicas, controller.Spec.Replicas, controller.Status.ObservedGeneration, generation)
|
||||
glog.V(4).Infof(fmt.Sprintf("Updating replica count for rc: %s/%s, ", controller.Namespace, controller.Name) +
|
||||
fmt.Sprintf("replicas %d->%d (need %d), ", controller.Status.Replicas, numReplicas, controller.Spec.Replicas) +
|
||||
fmt.Sprintf("fullyLabeledReplicas %d->%d, ", controller.Status.FullyLabeledReplicas, numFullyLabeledReplicas) +
|
||||
fmt.Sprintf("sequence No: %v->%v", controller.Status.ObservedGeneration, generation))
|
||||
|
||||
rc.Status = api.ReplicationControllerStatus{Replicas: numReplicas, ObservedGeneration: generation}
|
||||
rc.Status = api.ReplicationControllerStatus{Replicas: numReplicas, FullyLabeledReplicas: numFullyLabeledReplicas, ObservedGeneration: generation}
|
||||
_, updateErr = rcClient.UpdateStatus(rc)
|
||||
if updateErr == nil || i >= statusUpdateRetries {
|
||||
return updateErr
|
||||
|
Reference in New Issue
Block a user