RcManager uses informers

Prashanth Balasubramanian
2015-04-21 13:40:35 -07:00
parent c9f4d8e57e
commit 7592dabeba
11 changed files with 1160 additions and 361 deletions


@@ -17,107 +17,67 @@ limitations under the License.
package controller
import (
"encoding/json"
"fmt"
"reflect"
"sort"
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/workqueue"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)
var (
rcKeyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc
)
const (
// We'll attempt to recompute the required replicas of all replication controllers
// that have fulfilled their expectations at least this often.
FullControllerResyncPeriod = 30 * time.Second
// If a watch misdelivers info about a pod, it'll take this long
// to rectify the number of replicas.
PodRelistPeriod = 5 * time.Minute
// If a watch drops an add or delete event for a pod, it'll take this long
// before a dormant rc waiting for those events is woken up anyway. This
// should typically be somewhere between the PodRelistPeriod and the
// FullControllerResyncPeriod. It is specifically targeted at the case
// where some problem prevents an update of expectations; without it, the
// RC could stay asleep forever.
ExpectationsTimeout = 2 * time.Minute
)
// ReplicationManager is responsible for synchronizing ReplicationController objects stored
// in the system with actual running pods.
type ReplicationManager struct {
kubeClient client.Interface
podControl PodControlInterface
syncTime <-chan time.Time
// To allow injection of syncReplicationController for testing.
syncHandler func(controller api.ReplicationController) error
}
// PodControlInterface is an interface that knows how to add or delete pods
// created as an interface to allow testing.
type PodControlInterface interface {
// createReplica creates new replicated pods according to the spec.
createReplica(namespace string, controller api.ReplicationController)
// deletePod deletes the pod identified by podID.
deletePod(namespace string, podID string) error
}
// RealPodControl is the default implementation of PodControlInterface.
type RealPodControl struct {
kubeClient client.Interface
recorder record.EventRecorder
}
// Time period of main replication controller sync loop
const DefaultSyncPeriod = 5 * time.Second
const CreatedByAnnotation = "kubernetes.io/created-by"
func (r RealPodControl) createReplica(namespace string, controller api.ReplicationController) {
desiredLabels := make(labels.Set)
for k, v := range controller.Spec.Template.Labels {
desiredLabels[k] = v
}
desiredAnnotations := make(labels.Set)
for k, v := range controller.Spec.Template.Annotations {
desiredAnnotations[k] = v
}
createdByRef, err := api.GetReference(&controller)
if err != nil {
util.HandleError(fmt.Errorf("unable to get controller reference: %v", err))
return
}
createdByRefJson, err := json.Marshal(createdByRef)
if err != nil {
util.HandleError(fmt.Errorf("unable to serialize controller reference: %v", err))
return
}
desiredAnnotations[CreatedByAnnotation] = string(createdByRefJson)
// use the dash (if the name isn't too long) to make the pod name a bit prettier
prefix := fmt.Sprintf("%s-", controller.Name)
if ok, _ := validation.ValidatePodName(prefix, true); !ok {
prefix = controller.Name
}
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Labels: desiredLabels,
Annotations: desiredAnnotations,
GenerateName: prefix,
},
}
if err := api.Scheme.Convert(&controller.Spec.Template.Spec, &pod.Spec); err != nil {
util.HandleError(fmt.Errorf("unable to convert pod template: %v", err))
return
}
if labels.Set(pod.Labels).AsSelector().Empty() {
util.HandleError(fmt.Errorf("unable to create pod replica, no labels"))
return
}
if _, err := r.kubeClient.Pods(namespace).Create(pod); err != nil {
r.recorder.Eventf(&controller, "failedCreate", "Error creating: %v", err)
util.HandleError(fmt.Errorf("unable to create pod replica: %v", err))
}
}
func (r RealPodControl) deletePod(namespace, podID string) error {
return r.kubeClient.Pods(namespace).Delete(podID)
syncHandler func(rcKey string) error
// A TTLCache of pod creates/deletes each rc expects to see
expectations *RCExpectations
// A store of controllers, populated by the rcController
controllerStore cache.StoreToControllerLister
// A store of pods, populated by the podController
podStore cache.StoreToPodLister
// Watches changes to all replication controllers
rcController *framework.Controller
// Watches changes to all pods
podController *framework.Controller
// Controllers that need to be updated
queue *workqueue.Type
}
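// Illustrative sketch only, not part of this commit's diff: the RCExpectations type
// referenced above is defined elsewhere in this change. The idea it captures is a
// TTL-bounded count of the creates/deletes an rc still expects the informers to
// deliver; a hypothetical equivalent might look like this.
type podExpectations struct {
	add, del  int       // hypothetical counts of creates/deletes not yet observed
	timestamp time.Time // when the expectations were recorded
}

// fulfilled (hypothetical helper): expectations are satisfied once every expected
// event has been observed, or once the entry ages past ExpectationsTimeout.
func (e *podExpectations) fulfilled() bool {
	return (e.add <= 0 && e.del <= 0) || time.Since(e.timestamp) > ExpectationsTimeout
}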
// NewReplicationManager creates a new ReplicationManager.
@@ -131,181 +91,254 @@ func NewReplicationManager(kubeClient client.Interface) *ReplicationManager {
kubeClient: kubeClient,
recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "replication-controller"}),
},
expectations: NewRCExpectations(),
queue: workqueue.New(),
}
rm.controllerStore.Store, rm.rcController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return rm.kubeClient.ReplicationControllers(api.NamespaceAll).List(labels.Everything())
},
WatchFunc: func(rv string) (watch.Interface, error) {
return rm.kubeClient.ReplicationControllers(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv)
},
},
&api.ReplicationController{},
FullControllerResyncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: rm.enqueueController,
UpdateFunc: func(old, cur interface{}) {
// We only really need to do this when spec changes, but for correctness it is safer to
// periodically double check. It is overkill for 2 reasons:
// 1. Status.Replicas updates will cause a sync
// 2. Every 30s we will get a full resync (this will happen anyway every 5 minutes when pods relist)
// However, it shouldn't be that bad as rcs that haven't met expectations won't sync, and all
// the listing is done using local stores.
oldRC := old.(*api.ReplicationController)
curRC := cur.(*api.ReplicationController)
if oldRC.Status.Replicas != curRC.Status.Replicas {
glog.V(4).Infof("Observed updated replica count for rc: %v, %d->%d", curRC.Name, oldRC.Status.Replicas, curRC.Status.Replicas)
}
rm.enqueueController(cur)
},
// This will enter the sync loop and no-op, because the controller has been deleted from the store.
// Note that deleting a controller immediately after resizing it to 0 will not work. The recommended
// way of achieving this is by performing a `stop` operation on the controller.
DeleteFunc: rm.enqueueController,
},
)
rm.podStore.Store, rm.podController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return rm.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything())
},
WatchFunc: func(rv string) (watch.Interface, error) {
return rm.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv)
},
},
&api.Pod{},
PodRelistPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: rm.addPod,
// This invokes the rc for every pod change, e.g. host assignment. Though this might seem like overkill,
// the most frequent pod update is status, and the associated rc will only list from local storage, so
// it should be ok.
UpdateFunc: rm.updatePod,
DeleteFunc: rm.deletePod,
},
)
rm.syncHandler = rm.syncReplicationController
return rm
}
// Run begins watching and syncing.
func (rm *ReplicationManager) Run(period time.Duration) {
rm.syncTime = time.Tick(period)
resourceVersion := ""
go util.Forever(func() { rm.watchControllers(&resourceVersion) }, period)
func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
defer util.HandleCrash()
go rm.rcController.Run(stopCh)
go rm.podController.Run(stopCh)
for i := 0; i < workers; i++ {
go util.Until(rm.worker, time.Second, stopCh)
}
<-stopCh
rm.queue.ShutDown()
}
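// Illustrative usage only (the real wiring lives in the controller-manager binary,
// not in this file): construct the manager and run it with a small pool of workers
// until the stop channel is closed. The worker count of 5 is an arbitrary example.
//
//	rm := NewReplicationManager(kubeClient)
//	stopCh := make(chan struct{})
//	go rm.Run(5, stopCh)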
// resourceVersion is a pointer to the resource version to use/update.
func (rm *ReplicationManager) watchControllers(resourceVersion *string) {
watching, err := rm.kubeClient.ReplicationControllers(api.NamespaceAll).Watch(
labels.Everything(),
fields.Everything(),
*resourceVersion,
)
// getPodControllers returns the controller managing the given pod.
// TODO: Surface that we are ignoring multiple controllers for a single pod.
func (rm *ReplicationManager) getPodControllers(pod *api.Pod) *api.ReplicationController {
controllers, err := rm.controllerStore.GetPodControllers(pod)
if err != nil {
util.HandleError(fmt.Errorf("unable to watch: %v", err))
time.Sleep(5 * time.Second)
glog.V(4).Infof("No controllers found for pod %v, replication manager will avoid syncing", pod.Name)
return nil
}
return &controllers[0]
}
// When a pod is created, enqueue the controller that manages it and update its expectations.
func (rm *ReplicationManager) addPod(obj interface{}) {
pod := obj.(*api.Pod)
if rc := rm.getPodControllers(pod); rc != nil {
rm.expectations.CreationObserved(rc)
rm.enqueueController(rc)
}
}
// When a pod is updated, figure out what controller(s) manage it and wake them
// up. If the labels of the pod have changed, we need to awaken both the old
// and new controller. old and cur must be *api.Pod types.
func (rm *ReplicationManager) updatePod(old, cur interface{}) {
if api.Semantic.DeepEqual(old, cur) {
// A periodic relist will send update events for all known pods.
return
}
// TODO: Write a unittest for this case
curPod := cur.(*api.Pod)
if rc := rm.getPodControllers(curPod); rc != nil {
rm.enqueueController(rc)
}
oldPod := old.(*api.Pod)
// Only need to get the old controller if the labels changed.
if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) {
// If the old and new rc are the same, the first one that syncs
// will set expectations preventing any damage from the second.
if oldRC := rm.getPodControllers(oldPod); oldRC != nil {
rm.enqueueController(oldRC)
}
}
}
// When a pod is deleted, enqueue the controller that manages the pod and update its expectations.
// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
func (rm *ReplicationManager) deletePod(obj interface{}) {
if pod, ok := obj.(*api.Pod); ok {
if rc := rm.getPodControllers(pod); rc != nil {
rm.expectations.DeletionObserved(rc)
rm.enqueueController(rc)
}
return
}
podKey, err := framework.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
return
}
// A periodic relist might not include a pod that the store has; in such cases we are sent a tombstone key.
// We don't know which controllers to sync, so just let the controller relist handle this.
glog.Infof("Pod %q was deleted but we don't have a record of its final state so it could take up to %v before a controller recreates a replica.", podKey, ExpectationsTimeout)
}
// obj could be an *api.ReplicationController, or a DeletionFinalStateUnknown marker item.
func (rm *ReplicationManager) enqueueController(obj interface{}) {
key, err := rcKeyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
return
}
rm.queue.Add(key)
}
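// The key produced by rcKeyFunc is the usual "<namespace>/<name>" form (for example
// "default/frontend"); syncReplicationController uses the same key to look the rc
// back up in controllerStore.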
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (rm *ReplicationManager) worker() {
for {
select {
case <-rm.syncTime:
rm.synchronize()
case event, open := <-watching.ResultChan():
if !open {
// watchChannel has been closed, or something else went
// wrong with our watch call. Let the util.Forever()
// that called us call us again.
func() {
key, quit := rm.queue.Get()
if quit {
return
}
if event.Type == watch.Error {
util.HandleError(fmt.Errorf("error from watch during sync: %v", errors.FromObject(event.Object)))
// Clear the resource version, this may cause us to skip some elements on the watch,
// but we'll catch them on the synchronize() call, so it works out.
*resourceVersion = ""
continue
defer rm.queue.Done(key)
err := rm.syncHandler(key.(string))
if err != nil {
glog.Errorf("Error syncing replication controller: %v", err)
}
glog.V(4).Infof("Got watch: %#v", event)
rc, ok := event.Object.(*api.ReplicationController)
if !ok {
if status, ok := event.Object.(*api.Status); ok {
if status.Status == api.StatusFailure {
glog.Errorf("Failed to watch: %v", status)
// Clear resource version here, as above, this won't hurt consistency, but we
// should consider introspecting more carefully here. (or make the apiserver smarter)
// "why not both?"
*resourceVersion = ""
continue
}
}
util.HandleError(fmt.Errorf("unexpected object: %#v", event.Object))
continue
}
// If we get disconnected, start where we left off.
*resourceVersion = rc.ResourceVersion
// Sync even if this is a deletion event, to ensure that we leave
// it in the desired state.
glog.V(4).Infof("About to sync from watch: %q", rc.Name)
if err := rm.syncHandler(*rc); err != nil {
util.HandleError(fmt.Errorf("unexpected sync error: %v", err))
}
}
}()
}
}
// filterActivePods returns pods that have not terminated.
func filterActivePods(pods []api.Pod) []*api.Pod {
var result []*api.Pod
for i := range pods {
if api.PodSucceeded != pods[i].Status.Phase &&
api.PodFailed != pods[i].Status.Phase {
result = append(result, &pods[i])
}
}
return result
}
type activePods []*api.Pod
func (s activePods) Len() int { return len(s) }
func (s activePods) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s activePods) Less(i, j int) bool {
// Unassigned < assigned
if s[i].Spec.Host == "" && s[j].Spec.Host != "" {
return true
}
// PodPending < PodUnknown < PodRunning
m := map[api.PodPhase]int{api.PodPending: 0, api.PodUnknown: 1, api.PodRunning: 2}
if m[s[i].Status.Phase] != m[s[j].Status.Phase] {
return m[s[i].Status.Phase] < m[s[j].Status.Phase]
}
// Not ready < ready
if !api.IsPodReady(s[i]) && api.IsPodReady(s[j]) {
return true
}
return false
}
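// For example (illustrative, not part of this commit): given a running ready pod, an
// assigned pending pod, and an unassigned pending pod, sort.Sort(activePods(pods))
// yields [unassigned pending, assigned pending, running ready], so deleting the first
// N entries removes the least-established pods first.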
func (rm *ReplicationManager) syncReplicationController(controller api.ReplicationController) error {
s := labels.Set(controller.Spec.Selector).AsSelector()
podList, err := rm.kubeClient.Pods(controller.Namespace).List(s, fields.Everything())
if err != nil {
return err
}
filteredList := filterActivePods(podList.Items)
numActivePods := len(filteredList)
diff := numActivePods - controller.Spec.Replicas
// manageReplicas checks and updates replicas for the given replication controller.
func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, controller *api.ReplicationController) {
diff := len(filteredPods) - controller.Spec.Replicas
if diff < 0 {
diff *= -1
rm.expectations.ExpectCreations(controller, diff)
wait := sync.WaitGroup{}
wait.Add(diff)
glog.V(2).Infof("Too few %q replicas, creating %d", controller.Name, diff)
for i := 0; i < diff; i++ {
go func() {
defer wait.Done()
rm.podControl.createReplica(controller.Namespace, controller)
if err := rm.podControl.createReplica(controller.Namespace, controller); err != nil {
// Decrement the expected number of creates because the informer won't observe this pod
rm.expectations.CreationObserved(controller)
util.HandleError(err)
}
}()
}
wait.Wait()
} else if diff > 0 {
rm.expectations.ExpectDeletions(controller, diff)
glog.V(2).Infof("Too many %q replicas, deleting %d", controller.Name, diff)
// Sort the pods in the order such that not-ready < ready, unscheduled
// < scheduled, and pending < running. This ensures that we delete pods
// in the earlier stages whenever possible.
sort.Sort(activePods(filteredList))
sort.Sort(activePods(filteredPods))
wait := sync.WaitGroup{}
wait.Add(diff)
for i := 0; i < diff; i++ {
go func(ix int) {
defer wait.Done()
rm.podControl.deletePod(controller.Namespace, filteredList[ix].Name)
if err := rm.podControl.deletePod(controller.Namespace, filteredPods[ix].Name); err != nil {
// Decrement the expected number of deletes because the informer won't observe this deletion
rm.expectations.DeletionObserved(controller)
}
}(i)
}
wait.Wait()
}
if controller.Status.Replicas != numActivePods {
controller.Status.Replicas = numActivePods
_, err = rm.kubeClient.ReplicationControllers(controller.Namespace).Update(&controller)
if err != nil {
return err
}
}
// syncReplicationController will sync the rc with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
func (rm *ReplicationManager) syncReplicationController(key string) error {
startTime := time.Now()
defer func() {
glog.V(4).Infof("Finished syncing controller %q (%v)", key, time.Now().Sub(startTime))
}()
obj, exists, err := rm.controllerStore.Store.GetByKey(key)
if !exists {
glog.Infof("Replication Controller has been deleted %v", key)
return nil
}
if err != nil {
glog.Infof("Unable to retrieve rc %v from store: %v", key, err)
rm.queue.Add(key)
return err
}
controller := *obj.(*api.ReplicationController)
podList, err := rm.podStore.Pods(controller.Namespace).List(labels.Set(controller.Spec.Selector).AsSelector())
if err != nil {
glog.Errorf("Error getting pods for rc %q: %v", key, err)
rm.queue.Add(key)
return err
}
// TODO: Do this in a single pass, or use an index.
filteredPods := filterActivePods(podList.Items)
if rm.expectations.SatisfiedExpectations(&controller) {
rm.manageReplicas(filteredPods, &controller)
}
// Always update the status as pods come up or die.
if err := updateReplicaCount(rm.kubeClient.ReplicationControllers(controller.Namespace), &controller, len(filteredPods)); err != nil {
glog.V(2).Infof("Failed to update replica count for controller %v, will try on next sync", controller.Name)
}
return nil
}
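// updateReplicaCount is defined elsewhere in this commit; a minimal sketch of the idea
// (an assumption, not the committed code) is to update the rc's status only when the
// observed count differs from what it already reports:
//
//	func updateReplicaCount(rcClient client.ReplicationControllerInterface, controller *api.ReplicationController, numReplicas int) error {
//		if controller.Status.Replicas == numReplicas {
//			return nil
//		}
//		controller.Status.Replicas = numReplicas
//		_, err := rcClient.Update(controller)
//		return err
//	}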
func (rm *ReplicationManager) synchronize() {
// TODO: remove this method completely and rely on the watch.
// Add resource version tracking to watch to make this work.
var controllers []api.ReplicationController
list, err := rm.kubeClient.ReplicationControllers(api.NamespaceAll).List(labels.Everything())
if err != nil {
util.HandleError(fmt.Errorf("synchronization error: %v", err))
return
}
controllers = list.Items
wg := sync.WaitGroup{}
wg.Add(len(controllers))
for ix := range controllers {
go func(ix int) {
defer wg.Done()
glog.V(4).Infof("periodic sync of %v", controllers[ix].Name)
err := rm.syncHandler(controllers[ix])
if err != nil {
util.HandleError(fmt.Errorf("error synchronizing: %v", err))
}
}(ix)
}
wg.Wait()
}