Refactor package controller

This commit is contained in:
Ananya Kumar
2015-07-27 18:21:37 -07:00
committed by System Administrator
parent a89aeeb55b
commit 47dd0bc6f9
12 changed files with 612 additions and 415 deletions

View File

@@ -0,0 +1,19 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package replication contains logic for watching and synchronizing
// replication controllers.
package replication

View File

@@ -0,0 +1,434 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package replication
import (
"reflect"
"sort"
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/workqueue"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)
const (
// We'll attempt to recompute the required replicas of all replication controllers
// that have fulfilled their expectations at least this often. This recomputation
// happens based on contents in local pod storage.
FullControllerResyncPeriod = 30 * time.Second
// If a watch misdelivers info about a pod, it'll take at least this long
// to rectify the number of replicas. Note that dropped deletes are only
// rectified after the expectation times out because we don't know the
// final resting state of the pod.
PodRelistPeriod = 5 * time.Minute
// Realistic value of the burstReplicas field for the replication manager based on
// performance requirements for kubernetes 1.0.
BurstReplicas = 500
// We must avoid counting pods until the pod store has synced. If it hasn't synced, to
// avoid a hot loop, we'll wait this long between checks.
PodStoreSyncedPollPeriod = 100 * time.Millisecond
// The number of times we retry updating a replication controller's status.
statusUpdateRetries = 1
)
// ReplicationManager is responsible for synchronizing ReplicationController objects stored
// in the system with actual running pods.
type ReplicationManager struct {
kubeClient client.Interface
podControl controller.PodControlInterface
// An rc is temporarily suspended after creating/deleting this many replicas.
// It resumes normal action after observing the watch events for them.
burstReplicas int
// To allow injection of syncReplicationController for testing.
syncHandler func(rcKey string) error
// podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podStoreSynced func() bool
// A TTLCache of pod creates/deletes each rc expects to see
expectations controller.ControllerExpectationsInterface
// A store of replication controllers, populated by the rcController
rcStore cache.StoreToReplicationControllerLister
// A store of pods, populated by the podController
podStore cache.StoreToPodLister
// Watches changes to all replication controllers
rcController *framework.Controller
// Watches changes to all pods
podController *framework.Controller
// Controllers that need to be updated
queue *workqueue.Type
}
// NewReplicationManager creates a new ReplicationManager.
func NewReplicationManager(kubeClient client.Interface, burstReplicas int) *ReplicationManager {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
rm := &ReplicationManager{
kubeClient: kubeClient,
podControl: controller.RealPodControl{
KubeClient: kubeClient,
Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "replication-controller"}),
},
burstReplicas: burstReplicas,
expectations: controller.NewControllerExpectations(),
queue: workqueue.New(),
}
rm.rcStore.Store, rm.rcController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return rm.kubeClient.ReplicationControllers(api.NamespaceAll).List(labels.Everything())
},
WatchFunc: func(rv string) (watch.Interface, error) {
return rm.kubeClient.ReplicationControllers(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv)
},
},
&api.ReplicationController{},
FullControllerResyncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: rm.enqueueController,
UpdateFunc: func(old, cur interface{}) {
// We only really need to do this when the spec changes, but for correctness it is
// safer to periodically double-check. The double-check is overkill for two reasons:
// 1. Status.Replicas updates will cause a sync
// 2. Every 30s we will get a full resync (this will happen anyway every 5 minutes when pods relist)
// However, it shouldn't be that bad as rcs that haven't met expectations won't sync, and all
// the listing is done using local stores.
oldRC := old.(*api.ReplicationController)
curRC := cur.(*api.ReplicationController)
if oldRC.Status.Replicas != curRC.Status.Replicas {
glog.V(4).Infof("Observed updated replica count for rc: %v, %d->%d", curRC.Name, oldRC.Status.Replicas, curRC.Status.Replicas)
}
rm.enqueueController(cur)
},
// This will enter the sync loop and no-op, because the controller has been deleted from the store.
// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
// way of achieving this is by performing a `stop` operation on the controller.
DeleteFunc: rm.enqueueController,
},
)
rm.podStore.Store, rm.podController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return rm.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything())
},
WatchFunc: func(rv string) (watch.Interface, error) {
return rm.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv)
},
},
&api.Pod{},
PodRelistPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: rm.addPod,
// This invokes the rc sync for every pod change, e.g. host assignment. Though this might seem like overkill,
// the most frequent pod update is status, and the associated rc will only list from local storage, so
// it should be ok.
UpdateFunc: rm.updatePod,
DeleteFunc: rm.deletePod,
},
)
rm.syncHandler = rm.syncReplicationController
rm.podStoreSynced = rm.podController.HasSynced
return rm
}
// SetEventRecorder replaces the event recorder used by the replication manager
// with the given recorder. Only used for testing.
func (rm *ReplicationManager) SetEventRecorder(recorder record.EventRecorder) {
// TODO: Hack. We can't cleanly shutdown the event recorder, so benchmarks
// need to pass in a fake.
rm.podControl = controller.RealPodControl{KubeClient: rm.kubeClient, Recorder: recorder}
}
// Run begins watching and syncing.
func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
defer util.HandleCrash()
go rm.rcController.Run(stopCh)
go rm.podController.Run(stopCh)
for i := 0; i < workers; i++ {
go util.Until(rm.worker, time.Second, stopCh)
}
<-stopCh
glog.Infof("Shutting down RC Manager")
rm.queue.ShutDown()
}
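// A minimal usage sketch (assumed call site; the real wiring lives in
// kube-controller-manager, outside this diff, and the host, version, and
// worker count below are illustrative):
//
//	kubeClient := client.NewOrDie(&client.Config{Host: "http://127.0.0.1:8080", Version: "v1"})
//	rm := NewReplicationManager(kubeClient, BurstReplicas)
//	stopCh := make(chan struct{})
//	go rm.Run(5, stopCh) // Run blocks until stopCh is closed
//	// ... later, to shut down:
//	close(stopCh)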
// getPodControllers returns the controller managing the given pod.
// TODO: Surface that we are ignoring multiple controllers for a single pod.
func (rm *ReplicationManager) getPodControllers(pod *api.Pod) *api.ReplicationController {
controllers, err := rm.rcStore.GetPodControllers(pod)
if err != nil {
glog.V(4).Infof("No controllers found for pod %v, replication manager will avoid syncing", pod.Name)
return nil
}
// In theory, overlapping controllers is user error. This sorting will not prevent
// oscillation of replicas in all cases, e.g.:
// rc1 (older rc): [(k1:v1)], replicas=1 rc2: [(k2:v2), (k1:v1)], replicas=2
// pod: [(k1:v1)] will wake both rc1 and rc2, and we will sync rc1.
// pod: [(k2:v2), (k1:v1)] will wake rc2 which creates a new replica.
sort.Sort(overlappingControllers(controllers))
return &controllers[0]
}
// When a pod is created, enqueue the controller that manages it and update its expectations.
func (rm *ReplicationManager) addPod(obj interface{}) {
pod := obj.(*api.Pod)
if rc := rm.getPodControllers(pod); rc != nil {
rcKey, err := controller.KeyFunc(rc)
if err != nil {
glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)
return
}
rm.expectations.CreationObserved(rcKey)
rm.enqueueController(rc)
}
}
// When a pod is updated, figure out what controller(s) manage it and wake them
// up. If the labels of the pod have changed we need to awaken both the old
// and new controller. old and cur must be *api.Pod types.
func (rm *ReplicationManager) updatePod(old, cur interface{}) {
if api.Semantic.DeepEqual(old, cur) {
// A periodic relist will send update events for all known pods.
return
}
// TODO: Write a unit test for this case
curPod := cur.(*api.Pod)
if rc := rm.getPodControllers(curPod); rc != nil {
rm.enqueueController(rc)
}
oldPod := old.(*api.Pod)
// Only need to get the old controller if the labels changed.
if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) {
// If the old and new rc are the same, the first one that syncs
// will set expectations preventing any damage from the second.
if oldRC := rm.getPodControllers(oldPod); oldRC != nil {
rm.enqueueController(oldRC)
}
}
}
// When a pod is deleted, enqueue the controller that manages the pod and update its expectations.
// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
func (rm *ReplicationManager) deletePod(obj interface{}) {
pod, ok := obj.(*api.Pod)
// When a delete is dropped, the relist will notice a pod in the store not
// in the list, leading to the insertion of a tombstone object which contains
// the deleted key/value. Note that this value might be stale. If the pod
// changed labels the new rc will not be woken up till the periodic resync.
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a controller recreates a replica", obj, controller.ExpectationsTimeout)
return
}
pod, ok = tombstone.Obj.(*api.Pod)
if !ok {
glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before controller recreates a replica", obj, controller.ExpectationsTimeout)
return
}
}
if rc := rm.getPodControllers(pod); rc != nil {
rcKey, err := controller.KeyFunc(rc)
if err != nil {
glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)
return
}
rm.expectations.DeletionObserved(rcKey)
rm.enqueueController(rc)
}
}
// obj could be an *api.ReplicationController, or a DeletionFinalStateUnknown marker item.
func (rm *ReplicationManager) enqueueController(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
return
}
// TODO: Handle overlapping controllers better. Either disallow them at admission time or
// deterministically avoid syncing controllers that fight over pods. Currently, we only
// ensure that the same controller is synced for a given pod. When we periodically relist
// all controllers there will still be some replica instability. One way to handle this is
// by querying the store for all controllers that this rc overlaps, as well as all
// controllers that overlap this rc, and sorting them.
rm.queue.Add(key)
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (rm *ReplicationManager) worker() {
for {
key, quit := rm.queue.Get()
if quit {
// The queue was shut down; exit instead of spinning on a closed queue.
return
}
func() {
defer rm.queue.Done(key)
if err := rm.syncHandler(key.(string)); err != nil {
glog.Errorf("Error syncing replication controller: %v", err)
}
}()
}
}
// manageReplicas checks and updates replicas for the given replication controller.
func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.ReplicationController) {
diff := len(filteredPods) - rc.Spec.Replicas
rcKey, err := controller.KeyFunc(rc)
if err != nil {
glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)
return
}
if diff < 0 {
diff *= -1
if diff > rm.burstReplicas {
diff = rm.burstReplicas
}
rm.expectations.ExpectCreations(rcKey, diff)
wait := sync.WaitGroup{}
wait.Add(diff)
glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rc.Namespace, rc.Name, rc.Spec.Replicas, diff)
for i := 0; i < diff; i++ {
go func() {
defer wait.Done()
if err := rm.podControl.CreateReplica(rc.Namespace, rc); err != nil {
// Decrement the expected number of creates because the informer won't observe this pod
glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
rm.expectations.CreationObserved(rcKey)
util.HandleError(err)
}
}()
}
wait.Wait()
} else if diff > 0 {
if diff > rm.burstReplicas {
diff = rm.burstReplicas
}
rm.expectations.ExpectDeletions(rcKey, diff)
glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rc.Namespace, rc.Name, rc.Spec.Replicas, diff)
// No need to sort pods if we are about to delete all of them
if rc.Spec.Replicas != 0 {
// Sort the pods in the order such that not-ready < ready, unscheduled
// < scheduled, and pending < running. This ensures that we delete pods
// in the earlier stages whenever possible.
sort.Sort(controller.ActivePods(filteredPods))
}
wait := sync.WaitGroup{}
wait.Add(diff)
for i := 0; i < diff; i++ {
go func(ix int) {
defer wait.Done()
if err := rm.podControl.DeletePod(rc.Namespace, filteredPods[ix].Name); err != nil {
// Decrement the expected number of deletes because the informer won't observe this deletion
glog.V(2).Infof("Failed deletion, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
rm.expectations.DeletionObserved(rcKey)
}
}(i)
}
wait.Wait()
}
}
// syncReplicationController will sync the rc with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
func (rm *ReplicationManager) syncReplicationController(key string) error {
startTime := time.Now()
defer func() {
glog.V(4).Infof("Finished syncing controller %q (%v)", key, time.Now().Sub(startTime))
}()
obj, exists, err := rm.rcStore.Store.GetByKey(key)
// Check the error before existence, so a store error isn't mistaken for a deletion.
if err != nil {
glog.Infof("Unable to retrieve rc %v from store: %v", key, err)
rm.queue.Add(key)
return err
}
if !exists {
glog.Infof("Replication Controller has been deleted %v", key)
rm.expectations.DeleteExpectations(key)
return nil
}
rc := *obj.(*api.ReplicationController)
if !rm.podStoreSynced() {
// Sleep so we give the pod reflector goroutine a chance to run.
time.Sleep(PodStoreSyncedPollPeriod)
glog.Infof("Waiting for pods controller to sync, requeuing rc %v", rc.Name)
rm.enqueueController(&rc)
return nil
}
// Check the expectations of the rc before counting active pods, otherwise a new pod can sneak in
// and update the expectations after we've retrieved active pods from the store. If a new pod enters
// the store after we've checked the expectation, the rc sync is just deferred till the next relist.
rcKey, err := controller.KeyFunc(&rc)
if err != nil {
glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)
return err
}
rcNeedsSync := rm.expectations.SatisfiedExpectations(rcKey)
podList, err := rm.podStore.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelector())
if err != nil {
glog.Errorf("Error getting pods for rc %q: %v", key, err)
rm.queue.Add(key)
return err
}
// TODO: Do this in a single pass, or use an index.
filteredPods := controller.FilterActivePods(podList.Items)
if rcNeedsSync {
rm.manageReplicas(filteredPods, &rc)
}
// Always update the status as pods come up or die.
if err := updateReplicaCount(rm.kubeClient.ReplicationControllers(rc.Namespace), rc, len(filteredPods)); err != nil {
// Multiple things could lead to this update failing. Requeuing the controller ensures
// we retry with some fairness.
glog.V(2).Infof("Failed to update replica count for controller %v, requeuing", rc.Name)
rm.enqueueController(&rc)
}
return nil
}

View File

@@ -0,0 +1,977 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package replication
import (
"fmt"
"math/rand"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/securitycontext"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
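// FakePodControl records the CreateReplica and DeletePod calls made during a
// sync so tests can assert on them; setting err makes both calls fail.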
type FakePodControl struct {
controllerSpec []api.ReplicationController
deletePodName []string
lock sync.Mutex
err error
}
// Give each test that starts a background controller up to half a second.
// Since we need to start up a goroutine to test watch, this goroutine needs
// to get CPU before the test can complete. If the test is starved of CPU,
// the watch test will take up to half a second before timing out.
const controllerTimeout = 500 * time.Millisecond
var alwaysReady = func() bool { return true }
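// Test fixtures are created without SelfLinks; allow event references to
// objects with blank SelfLinks.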
func init() {
api.ForTesting_ReferencesAllowBlankSelfLinks = true
}
func (f *FakePodControl) CreateReplica(namespace string, spec *api.ReplicationController) error {
f.lock.Lock()
defer f.lock.Unlock()
if f.err != nil {
return f.err
}
f.controllerSpec = append(f.controllerSpec, *spec)
return nil
}
func (f *FakePodControl) DeletePod(namespace string, podName string) error {
f.lock.Lock()
defer f.lock.Unlock()
if f.err != nil {
return f.err
}
f.deletePodName = append(f.deletePodName, podName)
return nil
}
func (f *FakePodControl) clear() {
f.lock.Lock()
defer f.lock.Unlock()
f.deletePodName = []string{}
f.controllerSpec = []api.ReplicationController{}
}
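// getKey returns the store key for the given rc, failing the test on error.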
func getKey(rc *api.ReplicationController, t *testing.T) string {
if key, err := controller.KeyFunc(rc); err != nil {
t.Errorf("Unexpected error getting key for rc %v: %v", rc.Name, err)
return ""
} else {
return key
}
}
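// newReplicationController returns an rc named "foobar" in the default
// namespace with the given replica count and a foo=bar selector.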
func newReplicationController(replicas int) *api.ReplicationController {
rc := &api.ReplicationController{
TypeMeta: api.TypeMeta{APIVersion: testapi.Version()},
ObjectMeta: api.ObjectMeta{
UID: util.NewUUID(),
Name: "foobar",
Namespace: api.NamespaceDefault,
ResourceVersion: "18",
},
Spec: api.ReplicationControllerSpec{
Replicas: replicas,
Selector: map[string]string{"foo": "bar"},
Template: &api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{
"name": "foo",
"type": "production",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Image: "foo/bar",
TerminationMessagePath: api.TerminationMessagePathDefault,
ImagePullPolicy: api.PullIfNotPresent,
SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults(),
},
},
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSDefault,
NodeSelector: map[string]string{
"baz": "blah",
},
},
},
},
}
return rc
}
// newPodList creates count pods with the given phase for the given rc (same selector and namespace), adds them to the store if it is non-nil, and returns them in a PodList.
func newPodList(store cache.Store, count int, status api.PodPhase, rc *api.ReplicationController) *api.PodList {
pods := []api.Pod{}
for i := 0; i < count; i++ {
newPod := api.Pod{
ObjectMeta: api.ObjectMeta{
Name: fmt.Sprintf("pod%d", i),
Labels: rc.Spec.Selector,
Namespace: rc.Namespace,
},
Status: api.PodStatus{Phase: status},
}
if store != nil {
store.Add(&newPod)
}
pods = append(pods, newPod)
}
return &api.PodList{
Items: pods,
}
}
func validateSyncReplication(t *testing.T, fakePodControl *FakePodControl, expectedCreates, expectedDeletes int) {
if len(fakePodControl.controllerSpec) != expectedCreates {
t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", expectedCreates, len(fakePodControl.controllerSpec))
}
if len(fakePodControl.deletePodName) != expectedDeletes {
t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", expectedDeletes, len(fakePodControl.deletePodName))
}
}
func replicationControllerResourceName() string {
return "replicationcontrollers"
}
type serverResponse struct {
statusCode int
obj interface{}
}
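// makeTestServer returns a test server that serves canned responses for pod
// list, controller list, and controller update requests (404 for anything
// else), along with the update handler so callers can validate PUTs.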
func makeTestServer(t *testing.T, namespace, name string, podResponse, controllerResponse, updateResponse serverResponse) (*httptest.Server, *util.FakeHandler) {
fakePodHandler := util.FakeHandler{
StatusCode: podResponse.statusCode,
ResponseBody: runtime.EncodeOrDie(testapi.Codec(), podResponse.obj.(runtime.Object)),
}
fakeControllerHandler := util.FakeHandler{
StatusCode: controllerResponse.statusCode,
ResponseBody: runtime.EncodeOrDie(testapi.Codec(), controllerResponse.obj.(runtime.Object)),
}
fakeUpdateHandler := util.FakeHandler{
StatusCode: updateResponse.statusCode,
ResponseBody: runtime.EncodeOrDie(testapi.Codec(), updateResponse.obj.(runtime.Object)),
}
mux := http.NewServeMux()
mux.Handle(testapi.ResourcePath("pods", namespace, ""), &fakePodHandler)
mux.Handle(testapi.ResourcePath(replicationControllerResourceName(), "", ""), &fakeControllerHandler)
if namespace != "" {
mux.Handle(testapi.ResourcePath(replicationControllerResourceName(), namespace, ""), &fakeControllerHandler)
}
if name != "" {
mux.Handle(testapi.ResourcePath(replicationControllerResourceName(), namespace, name), &fakeUpdateHandler)
}
mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
t.Errorf("unexpected request: %v", req.RequestURI)
res.WriteHeader(http.StatusNotFound)
})
return httptest.NewServer(mux), &fakeUpdateHandler
}
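// startManagerAndWait starts the manager with a single worker and polls until
// the pod store holds the given number of pods, returning the stop channel.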
func startManagerAndWait(manager *ReplicationManager, pods int, t *testing.T) chan struct{} {
stopCh := make(chan struct{})
go manager.Run(1, stopCh)
err := wait.Poll(10*time.Millisecond, 100*time.Millisecond, func() (bool, error) {
podList, err := manager.podStore.List(labels.Everything())
if err != nil {
return false, err
}
return len(podList) == pods, nil
})
if err != nil {
t.Errorf("Failed to observe %d pods in 100ms", pods)
}
return stopCh
}
func TestSyncReplicationControllerDoesNothing(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
fakePodControl := FakePodControl{}
manager := NewReplicationManager(client, BurstReplicas)
manager.podStoreSynced = alwaysReady
// 2 running pods, a controller with 2 replicas, sync is a no-op
controllerSpec := newReplicationController(2)
manager.rcStore.Store.Add(controllerSpec)
newPodList(manager.podStore.Store, 2, api.PodRunning, controllerSpec)
manager.podControl = &fakePodControl
manager.syncReplicationController(getKey(controllerSpec, t))
validateSyncReplication(t, &fakePodControl, 0, 0)
}
func TestSyncReplicationControllerDeletes(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
fakePodControl := FakePodControl{}
manager := NewReplicationManager(client, BurstReplicas)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
// 2 running pods and a controller with 1 replica, one pod delete expected
controllerSpec := newReplicationController(1)
manager.rcStore.Store.Add(controllerSpec)
newPodList(manager.podStore.Store, 2, api.PodRunning, controllerSpec)
manager.syncReplicationController(getKey(controllerSpec, t))
validateSyncReplication(t, &fakePodControl, 0, 1)
}
func TestDeleteFinalStateUnknown(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
fakePodControl := FakePodControl{}
manager := NewReplicationManager(client, BurstReplicas)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
received := make(chan string)
manager.syncHandler = func(key string) error {
received <- key
return nil
}
// The DeletedFinalStateUnknown object should cause the rc manager to insert
// the controller matching the selectors of the deleted pod into the work queue.
controllerSpec := newReplicationController(1)
manager.rcStore.Store.Add(controllerSpec)
pods := newPodList(nil, 1, api.PodRunning, controllerSpec)
manager.deletePod(cache.DeletedFinalStateUnknown{Key: "foo", Obj: &pods.Items[0]})
go manager.worker()
expected := getKey(controllerSpec, t)
select {
case key := <-received:
if key != expected {
t.Errorf("Unexpected sync all for rc %v, expected %v", key, expected)
}
case <-time.After(100 * time.Millisecond):
t.Errorf("Processing DeleteFinalStateUnknown took longer than expected")
}
}
func TestSyncReplicationControllerCreates(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
manager := NewReplicationManager(client, BurstReplicas)
manager.podStoreSynced = alwaysReady
// A controller with 2 replicas and no pods in the store, 2 creates expected
controller := newReplicationController(2)
manager.rcStore.Store.Add(controller)
fakePodControl := FakePodControl{}
manager.podControl = &fakePodControl
manager.syncReplicationController(getKey(controller, t))
validateSyncReplication(t, &fakePodControl, 2, 0)
}
func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
// Setup a fake server to listen for requests, and run the rc manager in steady state
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: "",
}
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
manager := NewReplicationManager(client, BurstReplicas)
manager.podStoreSynced = alwaysReady
// Steady state for the replication controller, no Status.Replicas updates expected
activePods := 5
rc := newReplicationController(activePods)
manager.rcStore.Store.Add(rc)
rc.Status = api.ReplicationControllerStatus{Replicas: activePods}
newPodList(manager.podStore.Store, activePods, api.PodRunning, rc)
fakePodControl := FakePodControl{}
manager.podControl = &fakePodControl
manager.syncReplicationController(getKey(rc, t))
validateSyncReplication(t, &fakePodControl, 0, 0)
if fakeHandler.RequestReceived != nil {
t.Errorf("Unexpected update when pods and rcs are in a steady state")
}
// This response body is just so we don't err out decoding the http response, all
// we care about is the request body sent below.
response := runtime.EncodeOrDie(testapi.Codec(), &api.ReplicationController{})
fakeHandler.ResponseBody = response
rc.Generation = rc.Generation + 1
manager.syncReplicationController(getKey(rc, t))
rc.Status.ObservedGeneration = rc.Generation
updatedRc := runtime.EncodeOrDie(testapi.Codec(), rc)
fakeHandler.ValidateRequest(t, testapi.ResourcePath(replicationControllerResourceName(), rc.Namespace, rc.Name), "PUT", &updatedRc)
}
func TestControllerUpdateReplicas(t *testing.T) {
// This is a happy server just to record the PUT request we expect for status.Replicas
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: "",
}
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
manager := NewReplicationManager(client, BurstReplicas)
manager.podStoreSynced = alwaysReady
// Insufficient number of pods in the system, and Status.Replicas is wrong;
// Status.Replica should update to match number of pods in system, 1 new pod should be created.
rc := newReplicationController(5)
manager.rcStore.Store.Add(rc)
rc.Status = api.ReplicationControllerStatus{Replicas: 2, ObservedGeneration: 0}
rc.Generation = 1
newPodList(manager.podStore.Store, 4, api.PodRunning, rc)
// This response body is just so we don't err out decoding the http response
response := runtime.EncodeOrDie(testapi.Codec(), &api.ReplicationController{})
fakeHandler.ResponseBody = response
fakePodControl := FakePodControl{}
manager.podControl = &fakePodControl
manager.syncReplicationController(getKey(rc, t))
// 1. Status.Replicas should go up from 2->4 even though we created 5-4=1 pod.
// 2. Every update to the status should include the Generation of the spec.
rc.Status = api.ReplicationControllerStatus{Replicas: 4, ObservedGeneration: 1}
decRc := runtime.EncodeOrDie(testapi.Codec(), rc)
fakeHandler.ValidateRequest(t, testapi.ResourcePath(replicationControllerResourceName(), rc.Namespace, rc.Name), "PUT", &decRc)
validateSyncReplication(t, &fakePodControl, 1, 0)
}
func TestSyncReplicationControllerDormancy(t *testing.T) {
// Setup a test server so we can lie about the current state of pods
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: "",
}
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
fakePodControl := FakePodControl{}
manager := NewReplicationManager(client, BurstReplicas)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
controllerSpec := newReplicationController(2)
manager.rcStore.Store.Add(controllerSpec)
newPodList(manager.podStore.Store, 1, api.PodRunning, controllerSpec)
// Creates a replica and sets expectations
controllerSpec.Status.Replicas = 1
manager.syncReplicationController(getKey(controllerSpec, t))
validateSyncReplication(t, &fakePodControl, 1, 0)
// Expectations prevents replicas but not an update on status
controllerSpec.Status.Replicas = 0
fakePodControl.clear()
manager.syncReplicationController(getKey(controllerSpec, t))
validateSyncReplication(t, &fakePodControl, 0, 0)
// Get the key for the controller
rcKey, err := controller.KeyFunc(controllerSpec)
if err != nil {
t.Errorf("Couldn't get key for object %+v: %v", controllerSpec, err)
}
// Lowering expectations should lead to a sync that creates a replica, however the
// fakePodControl error will prevent this, leaving expectations at 0, 0
manager.expectations.CreationObserved(rcKey)
controllerSpec.Status.Replicas = 1
fakePodControl.clear()
fakePodControl.err = fmt.Errorf("Fake Error")
manager.syncReplicationController(getKey(controllerSpec, t))
validateSyncReplication(t, &fakePodControl, 0, 0)
// This replica should not need a Lowering of expectations, since the previous create failed
fakePodControl.err = nil
manager.syncReplicationController(getKey(controllerSpec, t))
validateSyncReplication(t, &fakePodControl, 1, 0)
// 1 PUT for the rc status during dormancy window.
// Note that the pod creates go through pod control so they're not recorded.
fakeHandler.ValidateRequestCount(t, 1)
}
func TestPodControllerLookup(t *testing.T) {
manager := NewReplicationManager(client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()}), BurstReplicas)
manager.podStoreSynced = alwaysReady
testCases := []struct {
inRCs []*api.ReplicationController
pod *api.Pod
outRCName string
}{
// pods without labels don't match any rcs
{
inRCs: []*api.ReplicationController{
{ObjectMeta: api.ObjectMeta{Name: "basic"}}},
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo1", Namespace: api.NamespaceAll}},
outRCName: "",
},
// Matching labels, not namespace
{
inRCs: []*api.ReplicationController{
{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.ReplicationControllerSpec{
Selector: map[string]string{"foo": "bar"},
},
},
},
pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo2", Namespace: "ns", Labels: map[string]string{"foo": "bar"}}},
outRCName: "",
},
// Matching ns and labels returns the key to the rc, not the rc name
{
inRCs: []*api.ReplicationController{
{
ObjectMeta: api.ObjectMeta{Name: "bar", Namespace: "ns"},
Spec: api.ReplicationControllerSpec{
Selector: map[string]string{"foo": "bar"},
},
},
},
pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo3", Namespace: "ns", Labels: map[string]string{"foo": "bar"}}},
outRCName: "bar",
},
}
for _, c := range testCases {
for _, r := range c.inRCs {
manager.rcStore.Add(r)
}
if rc := manager.getPodControllers(c.pod); rc != nil {
if c.outRCName != rc.Name {
t.Errorf("Got controller %+v expected %+v", rc.Name, c.outRCName)
}
} else if c.outRCName != "" {
t.Errorf("Expected a controller %v pod %v, found none", c.outRCName, c.pod.Name)
}
}
}
type FakeWatcher struct {
w *watch.FakeWatcher
*testclient.Fake
}
func TestWatchControllers(t *testing.T) {
fakeWatch := watch.NewFake()
client := &testclient.Fake{Watch: fakeWatch}
manager := NewReplicationManager(client, BurstReplicas)
manager.podStoreSynced = alwaysReady
var testControllerSpec api.ReplicationController
received := make(chan string)
// The update sent through the fakeWatcher should make its way into the workqueue,
// and eventually into the syncHandler. The handler validates the received controller
// and closes the received channel to indicate that the test can finish.
manager.syncHandler = func(key string) error {
obj, exists, err := manager.rcStore.Store.GetByKey(key)
if !exists || err != nil {
t.Errorf("Expected to find controller under key %v", key)
}
controllerSpec := *obj.(*api.ReplicationController)
if !api.Semantic.DeepDerivative(controllerSpec, testControllerSpec) {
t.Errorf("Expected %#v, but got %#v", testControllerSpec, controllerSpec)
}
close(received)
return nil
}
// Start only the rc watcher and the workqueue, send a watch event,
// and make sure it hits the sync method.
stopCh := make(chan struct{})
defer close(stopCh)
go manager.rcController.Run(stopCh)
go util.Until(manager.worker, 10*time.Millisecond, stopCh)
testControllerSpec.Name = "foo"
fakeWatch.Add(&testControllerSpec)
select {
case <-received:
case <-time.After(controllerTimeout):
t.Errorf("Expected 1 call but got 0")
}
}
func TestWatchPods(t *testing.T) {
fakeWatch := watch.NewFake()
client := &testclient.Fake{Watch: fakeWatch}
manager := NewReplicationManager(client, BurstReplicas)
manager.podStoreSynced = alwaysReady
// Put one rc and one pod into the controller's stores
testControllerSpec := newReplicationController(1)
manager.rcStore.Store.Add(testControllerSpec)
received := make(chan string)
// The pod update sent through the fakeWatcher should figure out the managing rc and
// send it into the syncHandler.
manager.syncHandler = func(key string) error {
obj, exists, err := manager.rcStore.Store.GetByKey(key)
if !exists || err != nil {
t.Errorf("Expected to find controller under key %v", key)
}
controllerSpec := obj.(*api.ReplicationController)
if !api.Semantic.DeepDerivative(controllerSpec, testControllerSpec) {
t.Errorf("\nExpected %#v,\nbut got %#v", testControllerSpec, controllerSpec)
}
close(received)
return nil
}
// Start only the pod watcher and the workqueue, send a watch event,
// and make sure it hits the sync method for the right rc.
stopCh := make(chan struct{})
defer close(stopCh)
go manager.podController.Run(stopCh)
go util.Until(manager.worker, 10*time.Millisecond, stopCh)
pods := newPodList(nil, 1, api.PodRunning, testControllerSpec)
testPod := pods.Items[0]
testPod.Status.Phase = api.PodFailed
fakeWatch.Add(&testPod)
select {
case <-received:
case <-time.After(controllerTimeout):
t.Errorf("Expected 1 call but got 0")
}
}
func TestUpdatePods(t *testing.T) {
fakeWatch := watch.NewFake()
client := &testclient.Fake{Watch: fakeWatch}
manager := NewReplicationManager(client, BurstReplicas)
manager.podStoreSynced = alwaysReady
received := make(chan string)
manager.syncHandler = func(key string) error {
obj, exists, err := manager.rcStore.Store.GetByKey(key)
if !exists || err != nil {
t.Errorf("Expected to find controller under key %v", key)
}
received <- obj.(*api.ReplicationController).Name
return nil
}
stopCh := make(chan struct{})
defer close(stopCh)
go util.Until(manager.worker, 10*time.Millisecond, stopCh)
// Put 2 rcs and one pod into the controller's stores
testControllerSpec1 := newReplicationController(1)
manager.rcStore.Store.Add(testControllerSpec1)
testControllerSpec2 := *testControllerSpec1
testControllerSpec2.Spec.Selector = map[string]string{"bar": "foo"}
testControllerSpec2.Name = "barfoo"
manager.rcStore.Store.Add(&testControllerSpec2)
// Put one pod in the podStore
pod1 := newPodList(manager.podStore.Store, 1, api.PodRunning, testControllerSpec1).Items[0]
pod2 := pod1
pod2.Labels = testControllerSpec2.Spec.Selector
// Send an update of the same pod with modified labels, and confirm we get a sync request for
// both controllers
manager.updatePod(&pod1, &pod2)
expected := util.NewStringSet(testControllerSpec1.Name, testControllerSpec2.Name)
for _, name := range expected.List() {
t.Logf("Expecting update for %+v", name)
select {
case got := <-received:
if !expected.Has(got) {
t.Errorf("Expected keys %#v got %v", expected, got)
}
case <-time.After(controllerTimeout):
t.Errorf("Expected update notifications for controllers within 100ms each")
}
}
}
func TestControllerUpdateRequeue(t *testing.T) {
// This server should force a requeue of the controller because it fails to update status.Replicas.
fakeHandler := util.FakeHandler{
StatusCode: 500,
ResponseBody: "",
}
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
manager := NewReplicationManager(client, BurstReplicas)
manager.podStoreSynced = alwaysReady
rc := newReplicationController(1)
manager.rcStore.Store.Add(rc)
rc.Status = api.ReplicationControllerStatus{Replicas: 2}
newPodList(manager.podStore.Store, 1, api.PodRunning, rc)
fakePodControl := FakePodControl{}
manager.podControl = &fakePodControl
manager.syncReplicationController(getKey(rc, t))
ch := make(chan interface{})
go func() {
item, _ := manager.queue.Get()
ch <- item
}()
select {
case key := <-ch:
expectedKey := getKey(rc, t)
if key != expectedKey {
t.Errorf("Expected requeue of controller with key %s got %s", expectedKey, key)
}
case <-time.After(controllerTimeout):
manager.queue.ShutDown()
t.Errorf("Expected to find an rc in the queue, found none.")
}
// 1 Update and 1 GET, both of which fail
fakeHandler.ValidateRequestCount(t, 2)
}
func TestControllerUpdateStatusWithFailure(t *testing.T) {
rc := newReplicationController(1)
fakeClient := &testclient.Fake{
ReactFn: func(f testclient.FakeAction) (runtime.Object, error) {
if f.Action == testclient.GetControllerAction {
return rc, nil
}
return &api.ReplicationController{}, fmt.Errorf("Fake error")
},
}
fakeRCClient := &testclient.FakeReplicationControllers{fakeClient, "default"}
numReplicas := 10
updateReplicaCount(fakeRCClient, *rc, numReplicas)
updates, gets := 0, 0
for _, a := range fakeClient.Actions {
switch a.Action {
case testclient.GetControllerAction:
gets++
// Make sure the get is for the right rc even though the update failed.
if s, ok := a.Value.(string); !ok || s != rc.Name {
t.Errorf("Expected get for rc %v, got %+v instead", rc.Name, s)
}
case testclient.UpdateControllerAction:
updates++
// Confirm that the update has the right status.Replicas even though the Get
// returned an rc with replicas=1.
if c, ok := a.Value.(*api.ReplicationController); !ok {
t.Errorf("Expected an rc as the argument to update, got %T", c)
} else if c.Status.Replicas != numReplicas {
t.Errorf("Expected update for rc to contain replicas %v, got %v instead",
numReplicas, c.Status.Replicas)
}
default:
t.Errorf("Unexpected action %+v", a)
break
}
}
if gets != 1 || updates != 2 {
t.Errorf("Expected 1 get and 2 updates, got %d gets %d updates", gets, updates)
}
}
func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
fakePodControl := FakePodControl{}
manager := NewReplicationManager(client, burstReplicas)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
controllerSpec := newReplicationController(numReplicas)
manager.rcStore.Store.Add(controllerSpec)
expectedPods := 0
pods := newPodList(nil, numReplicas, api.PodPending, controllerSpec)
rcKey, err := controller.KeyFunc(controllerSpec)
if err != nil {
t.Errorf("Couldn't get key for object %+v: %v", controllerSpec, err)
}
// Size up the controller, then size it down, and confirm the expected create/delete pattern
for _, replicas := range []int{numReplicas, 0} {
controllerSpec.Spec.Replicas = replicas
manager.rcStore.Store.Add(controllerSpec)
for i := 0; i < numReplicas; i += burstReplicas {
manager.syncReplicationController(getKey(controllerSpec, t))
// The store accrues active pods. It's also used by the rc to determine how many
// replicas to create.
activePods := len(manager.podStore.Store.List())
if replicas != 0 {
// This is the number of pods currently "in flight". They were created by the rc manager above,
// which then puts the rc to sleep till all of them have been observed.
expectedPods = replicas - activePods
if expectedPods > burstReplicas {
expectedPods = burstReplicas
}
// This validates the rc manager sync actually created pods
validateSyncReplication(t, &fakePodControl, expectedPods, 0)
// This simulates the watch events for all but 1 of the expected pods.
// None of these should wake the controller because it has expectations==BurstReplicas.
for i := 0; i < expectedPods-1; i++ {
manager.podStore.Store.Add(&pods.Items[i])
manager.addPod(&pods.Items[i])
}
podExp, exists, err := manager.expectations.GetExpectations(rcKey)
if !exists || err != nil {
t.Fatalf("Did not find expectations for rc.")
}
if add, _ := podExp.GetExpectations(); add != 1 {
t.Fatalf("Expectations are wrong %v", podExp)
}
} else {
expectedPods = (replicas - activePods) * -1
if expectedPods > burstReplicas {
expectedPods = burstReplicas
}
validateSyncReplication(t, &fakePodControl, 0, expectedPods)
for i := 0; i < expectedPods-1; i++ {
manager.podStore.Store.Delete(&pods.Items[i])
manager.deletePod(&pods.Items[i])
}
podExp, exists, err := manager.expectations.GetExpectations(rcKey)
if !exists || err != nil {
t.Fatalf("Did not find expectations for rc.")
}
if _, del := podExp.GetExpectations(); del != 1 {
t.Fatalf("Expectations are wrong %v", podExp)
}
}
// Check that the rc didn't take any action for all the above pods
fakePodControl.clear()
manager.syncReplicationController(getKey(controllerSpec, t))
validateSyncReplication(t, &fakePodControl, 0, 0)
// Create/Delete the last pod. Observing the last add/delete drops the rc's
// expectations to 0, which causes it to create/delete the remaining replicas
// up to burstReplicas.
if replicas != 0 {
manager.podStore.Store.Add(&pods.Items[expectedPods-1])
manager.addPod(&pods.Items[expectedPods-1])
} else {
manager.podStore.Store.Delete(&pods.Items[expectedPods-1])
manager.deletePod(&pods.Items[expectedPods-1])
}
pods.Items = pods.Items[expectedPods:]
}
// Confirm that we've created the right number of replicas
activePods := len(manager.podStore.Store.List())
if activePods != controllerSpec.Spec.Replicas {
t.Fatalf("Unexpected number of active pods, expected %d, got %d", controllerSpec.Spec.Replicas, activePods)
}
// Replenish the pod list, since we trimmed it down while sizing up.
pods = newPodList(nil, replicas, api.PodRunning, controllerSpec)
}
}
func TestControllerBurstReplicas(t *testing.T) {
doTestControllerBurstReplicas(t, 5, 30)
doTestControllerBurstReplicas(t, 5, 12)
doTestControllerBurstReplicas(t, 3, 2)
}
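// FakeRCExpectations wraps ControllerExpectations so SatisfiedExpectations
// returns a canned answer after running the expSatisfied hook, letting tests
// interleave work between the expectations check and the pod count.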
type FakeRCExpectations struct {
*controller.ControllerExpectations
satisfied bool
expSatisfied func()
}
func (fe FakeRCExpectations) SatisfiedExpectations(controllerKey string) bool {
fe.expSatisfied()
return fe.satisfied
}
// TestRCSyncExpectations tests that a pod cannot sneak in between counting active pods
// and checking expectations.
func TestRCSyncExpectations(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
fakePodControl := FakePodControl{}
manager := NewReplicationManager(client, 2)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
controllerSpec := newReplicationController(2)
manager.rcStore.Store.Add(controllerSpec)
pods := newPodList(nil, 2, api.PodPending, controllerSpec)
manager.podStore.Store.Add(&pods.Items[0])
postExpectationsPod := pods.Items[1]
manager.expectations = FakeRCExpectations{
controller.NewControllerExpectations(), true, func() {
// If we check active pods before checking expectations, the rc
// will create a new replica because it doesn't see this pod, but
// has fulfilled its expectations.
manager.podStore.Store.Add(&postExpectationsPod)
},
}
manager.syncReplicationController(getKey(controllerSpec, t))
validateSyncReplication(t, &fakePodControl, 0, 0)
}
func TestDeleteControllerAndExpectations(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
manager := NewReplicationManager(client, 10)
manager.podStoreSynced = alwaysReady
rc := newReplicationController(1)
manager.rcStore.Store.Add(rc)
fakePodControl := FakePodControl{}
manager.podControl = &fakePodControl
// This should set expectations for the rc
manager.syncReplicationController(getKey(rc, t))
validateSyncReplication(t, &fakePodControl, 1, 0)
fakePodControl.clear()
// Get the RC key
rcKey, err := controller.KeyFunc(rc)
if err != nil {
t.Errorf("Couldn't get key for object %+v: %v", rc, err)
}
// This is to simulate a concurrent addPod that holds a handle on the
// expectations while the rc is deleted.
podExp, exists, err := manager.expectations.GetExpectations(rcKey)
if !exists || err != nil {
t.Errorf("No expectations found for rc")
}
manager.rcStore.Delete(rc)
manager.syncReplicationController(getKey(rc, t))
if _, exists, err = manager.expectations.GetExpectations(rcKey); exists {
t.Errorf("Found expectaions, expected none since the rc has been deleted.")
}
// This should have no effect, since we've deleted the rc.
podExp.Seen(1, 0)
manager.podStore.Store.Replace(make([]interface{}, 0))
manager.syncReplicationController(getKey(rc, t))
validateSyncReplication(t, &fakePodControl, 0, 0)
}
func TestRCManagerNotReady(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
fakePodControl := FakePodControl{}
manager := NewReplicationManager(client, 2)
manager.podControl = &fakePodControl
manager.podStoreSynced = func() bool { return false }
// Simulates the rc reflector running before the pod reflector. We don't
// want to end up creating replicas in this case until the pod reflector
// has synced, so the rc manager should just requeue the rc.
controllerSpec := newReplicationController(1)
manager.rcStore.Store.Add(controllerSpec)
rcKey := getKey(controllerSpec, t)
manager.syncReplicationController(rcKey)
validateSyncReplication(t, &fakePodControl, 0, 0)
queueRC, _ := manager.queue.Get()
if queueRC != rcKey {
t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC)
}
manager.podStoreSynced = alwaysReady
manager.syncReplicationController(rcKey)
validateSyncReplication(t, &fakePodControl, 1, 0)
}
// shuffle returns a new, randomly shuffled list of replication controllers.
func shuffle(controllers []*api.ReplicationController) []*api.ReplicationController {
numControllers := len(controllers)
randIndexes := rand.Perm(numControllers)
shuffled := make([]*api.ReplicationController, numControllers)
for i := 0; i < numControllers; i++ {
shuffled[i] = controllers[randIndexes[i]]
}
return shuffled
}
func TestOverlappingRCs(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
for i := 0; i < 5; i++ {
manager := NewReplicationManager(client, 10)
manager.podStoreSynced = alwaysReady
// Create 9 rcs, shuffle them randomly, and insert them into the rc manager's store
var controllers []*api.ReplicationController
for j := 1; j < 10; j++ {
controllerSpec := newReplicationController(1)
controllerSpec.CreationTimestamp = util.Date(2014, time.December, j, 0, 0, 0, 0, time.Local)
controllerSpec.Name = string(util.NewUUID())
controllers = append(controllers, controllerSpec)
}
shuffledControllers := shuffle(controllers)
for j := range shuffledControllers {
manager.rcStore.Store.Add(shuffledControllers[j])
}
// Add a pod and make sure only the oldest rc is synced
pods := newPodList(nil, 1, api.PodPending, controllers[0])
rcKey := getKey(controllers[0], t)
manager.addPod(&pods.Items[0])
queueRC, _ := manager.queue.Get()
if queueRC != rcKey {
t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC)
}
}
}

View File

@@ -0,0 +1,72 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package replication
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/golang/glog"
)
// updateReplicaCount attempts to update the Status.Replicas of the given controller, with a single GET/PUT retry.
func updateReplicaCount(rcClient client.ReplicationControllerInterface, controller api.ReplicationController, numReplicas int) (updateErr error) {
// This is the steady state. It happens when the rc doesn't have any expectations, since
// we do a periodic relist every 30s. If the generations differ but the replicas are
// the same, a caller might've resized to the same replica count.
if controller.Status.Replicas == numReplicas &&
controller.Generation == controller.Status.ObservedGeneration {
return nil
}
// Save the generation number we acted on, otherwise we might wrongfully indicate
// that we've seen a spec update when we retry.
// TODO: This can clobber an update if we allow multiple agents to write to the
// same status.
generation := controller.Generation
var getErr error
for i, rc := 0, &controller; ; i++ {
glog.V(4).Infof("Updating replica count for rc: %v, %d->%d (need %d), sequence No: %v->%v",
controller.Name, controller.Status.Replicas, numReplicas, controller.Spec.Replicas, controller.Status.ObservedGeneration, generation)
rc.Status = api.ReplicationControllerStatus{Replicas: numReplicas, ObservedGeneration: generation}
_, updateErr = rcClient.Update(rc)
if updateErr == nil || i >= statusUpdateRetries {
return updateErr
}
// Update the controller with the latest resource version for the next poll
if rc, getErr = rcClient.Get(controller.Name); getErr != nil {
// If the GET fails we can't trust status.Replicas anymore. This error
// is bound to be more interesting than the update failure.
return getErr
}
}
// Failed 2 updates, one of which was with the latest controller; return the update error
return
}
// overlappingControllers sorts a list of controllers by creation timestamp, using their names as a tie breaker.
type overlappingControllers []api.ReplicationController
func (o overlappingControllers) Len() int { return len(o) }
func (o overlappingControllers) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o overlappingControllers) Less(i, j int) bool {
if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
return o[i].Name < o[j].Name
}
return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
}