Add informers to deployment controller

Sam Ghods
2015-09-21 00:06:45 -07:00
committed by Janet Kuo
parent 08c2cba266
commit b838d8ce18
7 changed files with 566 additions and 57 deletions


@@ -22,77 +22,308 @@ import (
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
deploymentutil "k8s.io/kubernetes/pkg/util/deployment"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
)
const (
// We'll attempt to recompute the required replicas of all deployments
// that have fulfilled their expectations at least this often. This recomputation
// happens based on contents in the local caches.
FullDeploymentResyncPeriod = 30 * time.Second
// We'll keep replication controller watches open up to this long. In the unlikely case
// that a watch misdelivers info about an RC, it'll take this long for
// that mistake to be rectified.
ControllerRelistPeriod = 5 * time.Minute
// We'll keep pod watches open up to this long. In the unlikely case
// that a watch misdelivers info about a pod, it'll take this long for
// that mistake to be rectified.
PodRelistPeriod = 5 * time.Minute
)
type DeploymentController struct {
client client.Interface
expClient client.ExtensionsInterface
eventRecorder record.EventRecorder
rcControl controller.RCControlInterface
// To allow injection of syncDeployment for testing.
syncHandler func(dKey string) error
// A store of deployments, populated by the dController
dStore cache.StoreToDeploymentLister
// Watches changes to all deployments
dController *framework.Controller
// A store of replication controllers, populated by the rcController
rcStore cache.StoreToReplicationControllerLister
// Watches changes to all replication controllers
rcController *framework.Controller
// rcStoreSynced returns true if the RC store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
rcStoreSynced func() bool
// A store of pods, populated by the podController
podStore cache.StoreToPodLister
// Watches changes to all pods
podController *framework.Controller
// podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podStoreSynced func() bool
// Deployments that need to be synced
queue *workqueue.Type
}
-func New(client client.Interface) *DeploymentController {
func NewDeploymentController(client client.Interface) *DeploymentController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(client.Events(""))
-return &DeploymentController{
dc := &DeploymentController{
client: client,
expClient: client.Extensions(),
eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "deployment-controller"}),
queue: workqueue.New(),
}
dc.dStore.Store, dc.dController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return dc.expClient.Deployments(api.NamespaceAll).List(labels.Everything(), fields.Everything())
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return dc.expClient.Deployments(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options)
},
},
&extensions.Deployment{},
FullDeploymentResyncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: dc.enqueueDeployment,
UpdateFunc: func(old, cur interface{}) {
// Resync on deployment object relist.
dc.enqueueDeployment(cur)
},
// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
// way of achieving this is by performing a `stop` operation on the deployment.
DeleteFunc: dc.enqueueDeployment,
},
)
dc.rcStore.Store, dc.rcController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return dc.client.ReplicationControllers(api.NamespaceAll).List(labels.Everything(), fields.Everything())
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return dc.client.ReplicationControllers(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options)
},
},
&api.ReplicationController{},
ControllerRelistPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: dc.addRC,
UpdateFunc: dc.updateRC,
DeleteFunc: dc.deleteRC,
},
)
// We do not handle any events from the podController; we use the local
// podStore only to make queries about the current state of pods (e.g. whether
// they are ready or not) more efficient.
dc.podStore.Store, dc.podController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return dc.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything())
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return dc.client.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options)
},
},
&api.Pod{},
PodRelistPeriod,
framework.ResourceEventHandlerFuncs{},
)
dc.syncHandler = dc.syncDeployment
return dc
}
// When an RC is created, enqueue the deployment that manages it.
func (dc *DeploymentController) addRC(obj interface{}) {
rc := obj.(*api.ReplicationController)
if d := dc.getDeploymentForRC(rc); d != nil {
dc.enqueueDeployment(d)
}
}
-func (d *DeploymentController) Run(syncPeriod time.Duration) {
-go util.Until(func() {
-errs := d.reconcileDeployments()
-for _, err := range errs {
-glog.Errorf("Failed to reconcile: %v", err)
-}
-}, syncPeriod, util.NeverStop)
-}
-func (d *DeploymentController) reconcileDeployments() []error {
-list, err := d.expClient.Deployments(api.NamespaceAll).List(api.ListOptions{})
// getDeploymentForRC returns the deployment managing the given RC.
// TODO: Surface that we are ignoring multiple deployments for a given controller.
func (dc *DeploymentController) getDeploymentForRC(rc *api.ReplicationController) *extensions.Deployment {
deployments, err := dc.dStore.GetDeploymentsForRC(rc)
if err != nil {
-return []error{fmt.Errorf("error listing deployments: %v", err)}
glog.V(4).Infof("No deployments found for replication controller %v, deployment controller will avoid syncing", rc.Name)
return nil
}
-errs := []error{}
-for _, deployment := range list.Items {
-if err := d.reconcileDeployment(&deployment); err != nil {
-errs = append(errs, fmt.Errorf("error in reconciling deployment %s: %v", deployment.Name, err))
// Because all RCs belonging to a deployment should carry a label key unique to
// that deployment, there should never be more than one deployment returned by
// the above method. If that happens we should probably dynamically repair the
// situation by ultimately trying to clean up one of the controllers; for now
// we just return the first deployment found, which is effectively arbitrary.
return &deployments[0]
}
// When a controller is updated, figure out what deployment/s manage it and wake them
// up. If the labels of the controller have changed we need to awaken both the old
// and new deployments. old and cur must be *api.ReplicationController types.
func (dc *DeploymentController) updateRC(old, cur interface{}) {
if api.Semantic.DeepEqual(old, cur) {
// A periodic relist will send update events for all known controllers.
return
}
// TODO: Write a unit test for this case
curRC := cur.(*api.ReplicationController)
if d := dc.getDeploymentForRC(curRC); d != nil {
dc.enqueueDeployment(d)
}
// A number of things could affect the old deployment: labels changing,
// pod template changing, etc.
oldRC := old.(*api.ReplicationController)
// TODO: Is this the right way to check this, or is checking names sufficient?
if !api.Semantic.DeepEqual(oldRC, curRC) {
if oldD := dc.getDeploymentForRC(oldRC); oldD != nil {
dc.enqueueDeployment(oldD)
}
}
-return errs
}
-func (d *DeploymentController) reconcileDeployment(deployment *extensions.Deployment) error {
-switch deployment.Spec.Strategy.Type {
-case extensions.RecreateDeploymentStrategyType:
-return d.reconcileRecreateDeployment(*deployment)
-case extensions.RollingUpdateDeploymentStrategyType:
-return d.reconcileRollingUpdateDeployment(*deployment)
// When a controller is deleted, enqueue the deployment that manages it.
// obj could be an *api.ReplicationController, or a DeletionFinalStateUnknown
// marker item.
func (dc *DeploymentController) deleteRC(obj interface{}) {
rc, ok := obj.(*api.ReplicationController)
// When a delete is dropped, the relist will notice an RC in the store not
// in the list, leading to the insertion of a tombstone object which contains
// the deleted key/value. Note that this value might be stale. If the RC
// changed labels the new deployment will not be woken up until the periodic resync.
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a deployment recreates/updates controllers", obj, FullDeploymentResyncPeriod)
return
}
rc, ok = tombstone.Obj.(*api.ReplicationController)
if !ok {
glog.Errorf("Tombstone contained object that is not an rc %+v, could take up to %v before a deployment recreates/updates controllers", obj, FullDeploymentResyncPeriod)
return
}
}
if d := dc.getDeploymentForRC(rc); d != nil {
dc.enqueueDeployment(d)
}
return fmt.Errorf("unexpected deployment strategy type: %s", deployment.Spec.Strategy.Type)
}
-func (d *DeploymentController) reconcileRecreateDeployment(deployment extensions.Deployment) error {
// obj could be an *api.Deployment, or a DeletionFinalStateUnknown marker item.
func (dc *DeploymentController) enqueueDeployment(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
return
}
// TODO: Handle overlapping deployments better. Either disallow them at admission time or
// deterministically avoid syncing deployments that fight over RC's. Currently, we only
// ensure that the same deployment is synced for a given RC. When we periodically relist
// all deployments there will still be some RC instability. One way to handle this is
// by querying the store for all deployments that this deployment overlaps, as well as all
// deployments that overlap this deployment, and sorting them.
dc.queue.Add(key)
}
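For orientation, a sketch of what flows through the queue (editor's illustration, not part of this commit; it assumes controller.KeyFunc is the standard namespace/name key function):

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/apis/extensions"
	"k8s.io/kubernetes/pkg/controller"
)

func main() {
	d := &extensions.Deployment{ObjectMeta: api.ObjectMeta{Namespace: "default", Name: "nginx"}}
	// Assumes controller.KeyFunc produces namespace/name cache keys.
	key, err := controller.KeyFunc(d)
	if err != nil {
		panic(err)
	}
	fmt.Println(key) // "default/nginx" - the key syncDeployment later passes to dStore.Store.GetByKey
}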
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
defer util.HandleCrash()
go dc.dController.Run(stopCh)
go dc.rcController.Run(stopCh)
go dc.podController.Run(stopCh)
for i := 0; i < workers; i++ {
go util.Until(dc.worker, time.Second, stopCh)
}
<-stopCh
glog.Infof("Shutting down deployment controller")
dc.queue.ShutDown()
}
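A minimal usage sketch (editor's illustration; the real wiring lives in kube-controller-manager, and the host address, client construction, and import path below are assumptions):

package main

import (
	client "k8s.io/kubernetes/pkg/client/unversioned"
	"k8s.io/kubernetes/pkg/controller/deployment"
)

func main() {
	// Assumed API server address, for illustration only.
	c := client.NewOrDie(&client.Config{Host: "http://127.0.0.1:8080"})
	dc := deployment.NewDeploymentController(c)
	stopCh := make(chan struct{}) // close(stopCh) from a signal handler to shut down
	dc.Run(5, stopCh)             // starts the three informers and five workers, then blocks
}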
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (dc *DeploymentController) worker() {
for {
func() {
key, quit := dc.queue.Get()
if quit {
return
}
defer dc.queue.Done(key)
err := dc.syncHandler(key.(string))
if err != nil {
glog.Errorf("Error syncing deployment: %v", err)
}
}()
}
}
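The workqueue is what makes this fan-out safe: it deduplicates pending keys and never hands the same key to two workers at once. A small sketch of that behavior (editor's illustration, not part of the diff):

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/util/workqueue"
)

func main() {
	q := workqueue.New()
	q.Add("default/nginx")
	q.Add("default/nginx") // deduplicated while the first Add is still pending
	key, _ := q.Get()
	fmt.Println(key, q.Len()) // default/nginx 0
	q.Done(key) // processing finished; the key may now be queued again
}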
func (dc *DeploymentController) syncDeployment(key string) error {
startTime := time.Now()
defer func() {
glog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Now().Sub(startTime))
}()
obj, exists, err := dc.dStore.Store.GetByKey(key)
if !exists {
glog.Infof("Deployment has been deleted %v", key)
return nil
}
if err != nil {
glog.Infof("Unable to retrieve deployment %v from store: %v", key, err)
dc.queue.Add(key)
return err
}
d := *obj.(*extensions.Deployment)
switch d.Spec.Strategy.Type {
case extensions.RecreateDeploymentStrategyType:
return dc.syncRecreateDeployment(d)
case extensions.RollingUpdateDeploymentStrategyType:
return dc.syncRollingUpdateDeployment(d)
}
return fmt.Errorf("Unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
func (dc *DeploymentController) syncRecreateDeployment(deployment extensions.Deployment) error {
// TODO: implement me.
return nil
}
-func (d *DeploymentController) reconcileRollingUpdateDeployment(deployment extensions.Deployment) error {
-newRC, err := d.getNewRC(deployment)
func (dc *DeploymentController) syncRollingUpdateDeployment(deployment extensions.Deployment) error {
newRC, err := dc.getNewRC(deployment)
if err != nil {
return err
}
-oldRCs, err := d.getOldRCs(deployment)
oldRCs, err := dc.getOldRCs(deployment)
if err != nil {
return err
}
@@ -100,36 +331,32 @@ func (d *DeploymentController) reconcileRollingUpdateDeployment(deployment exten
allRCs := append(oldRCs, newRC)
// Scale up, if we can.
-scaledUp, err := d.reconcileNewRC(allRCs, newRC, deployment)
scaledUp, err := dc.reconcileNewRC(allRCs, newRC, deployment)
if err != nil {
return err
}
if scaledUp {
// Update DeploymentStatus
-return d.updateDeploymentStatus(allRCs, newRC, deployment)
return dc.updateDeploymentStatus(allRCs, newRC, deployment)
}
// Scale down, if we can.
-scaledDown, err := d.reconcileOldRCs(allRCs, oldRCs, newRC, deployment)
scaledDown, err := dc.reconcileOldRCs(allRCs, oldRCs, newRC, deployment)
if err != nil {
return err
}
if scaledDown {
// Update DeploymentStatus
-return d.updateDeploymentStatus(allRCs, newRC, deployment)
return dc.updateDeploymentStatus(allRCs, newRC, deployment)
}
// TODO: raise an event, neither scaled up nor down.
return nil
}
-func (d *DeploymentController) getOldRCs(deployment extensions.Deployment) ([]*api.ReplicationController, error) {
-return deploymentutil.GetOldRCs(deployment, d.client)
-}
// Returns an RC that matches the intent of the given deployment.
// It creates a new RC if required.
-func (d *DeploymentController) getNewRC(deployment extensions.Deployment) (*api.ReplicationController, error) {
-existingNewRC, err := deploymentutil.GetNewRC(deployment, d.client)
func (dc *DeploymentController) getNewRC(deployment extensions.Deployment) (*api.ReplicationController, error) {
existingNewRC, err := deploymentutil.GetNewRC(deployment, dc.client)
if err != nil || existingNewRC != nil {
return existingNewRC, err
}
@@ -151,21 +378,55 @@ func (d *DeploymentController) getNewRC(deployment extensions.Deployment) (*api.
Template: &newRCTemplate,
},
}
-createdRC, err := d.client.ReplicationControllers(namespace).Create(&newRC)
createdRC, err := dc.client.ReplicationControllers(namespace).Create(&newRC)
if err != nil {
return nil, fmt.Errorf("error creating replication controller: %v", err)
}
return createdRC, nil
}
-func (d *DeploymentController) reconcileNewRC(allRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment extensions.Deployment) (bool, error) {
func (dc *DeploymentController) getOldRCs(deployment extensions.Deployment) ([]*api.ReplicationController, error) {
namespace := deployment.ObjectMeta.Namespace
// 1. Find all pods whose labels match deployment.Spec.Selector
podList, err := dc.podStore.Pods(namespace).List(labels.SelectorFromSet(deployment.Spec.Selector))
if err != nil {
return nil, fmt.Errorf("error listing pods: %v", err)
}
// 2. Find the corresponding RCs for pods in podList.
oldRCs := map[string]api.ReplicationController{}
rcList, err := dc.rcStore.List()
if err != nil {
return nil, fmt.Errorf("error listing replication controllers: %v", err)
}
for _, pod := range podList.Items {
podLabelsSelector := labels.Set(pod.ObjectMeta.Labels)
for _, rc := range rcList {
rcLabelsSelector := labels.SelectorFromSet(rc.Spec.Selector)
if rcLabelsSelector.Matches(podLabelsSelector) {
// Filter out RC that has the same pod template spec as the deployment - that is the new RC.
if api.Semantic.DeepEqual(rc.Spec.Template, deploymentutil.GetNewRCTemplate(deployment)) {
continue
}
oldRCs[rc.ObjectMeta.Name] = rc
}
}
}
rcSlice := []*api.ReplicationController{}
for _, value := range oldRCs {
value := value // copy; taking &value of the range variable would make every entry alias the same RC
rcSlice = append(rcSlice, &value)
}
return rcSlice, nil
}
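Step 2 above relies on set-based selector matching: an RC claims a pod when every key/value in the RC's selector appears in the pod's labels. A sketch (editor's illustration with assumed label values):

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/labels"
)

func main() {
	rcSelector := labels.SelectorFromSet(labels.Set{"app": "nginx"})
	podLabels := labels.Set{"app": "nginx", "deployment": "3866369572"}
	fmt.Println(rcSelector.Matches(podLabels)) // true: extra pod labels do not matter
}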
func (dc *DeploymentController) reconcileNewRC(allRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment extensions.Deployment) (bool, error) {
if newRC.Spec.Replicas == deployment.Spec.Replicas {
// Scaling not required.
return false, nil
}
if newRC.Spec.Replicas > deployment.Spec.Replicas {
// Scale down.
-_, err := d.scaleRCAndRecordEvent(newRC, deployment.Spec.Replicas, deployment)
_, err := dc.scaleRCAndRecordEvent(newRC, deployment.Spec.Replicas, deployment)
return true, err
}
// Check if we can scale up.
@@ -188,11 +449,11 @@ func (d *DeploymentController) reconcileNewRC(allRCs []*api.ReplicationControlle
// Do not exceed the number of desired replicas.
scaleUpCount = int(math.Min(float64(scaleUpCount), float64(deployment.Spec.Replicas-newRC.Spec.Replicas)))
newReplicasCount := newRC.Spec.Replicas + scaleUpCount
-_, err = d.scaleRCAndRecordEvent(newRC, newReplicasCount, deployment)
_, err = dc.scaleRCAndRecordEvent(newRC, newReplicasCount, deployment)
return true, err
}
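The clamping above is easier to see with numbers. A worked example (editor's illustration; the values and the surge arithmetic in the elided lines are assumptions):

package main

import (
	"fmt"
	"math"
)

func main() {
	// Assumed: 10 desired replicas, maxSurge 2, 11 pods across all RCs
	// right now, and the new RC currently at 3 replicas.
	desired, maxSurge, allReplicas, newReplicas := 10, 2, 11, 3
	scaleUpCount := desired + maxSurge - allReplicas // room left under the surge cap: 1
	// Do not exceed the number of desired replicas.
	scaleUpCount = int(math.Min(float64(scaleUpCount), float64(desired-newReplicas)))
	fmt.Println(newReplicas + scaleUpCount) // 4: the new RC is scaled up by one
}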
-func (d *DeploymentController) reconcileOldRCs(allRCs []*api.ReplicationController, oldRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment extensions.Deployment) (bool, error) {
func (dc *DeploymentController) reconcileOldRCs(allRCs []*api.ReplicationController, oldRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment extensions.Deployment) (bool, error) {
oldPodsCount := deploymentutil.GetReplicaCountForRCs(oldRCs)
if oldPodsCount == 0 {
// Can't scale down further
@@ -209,7 +470,7 @@ func (d *DeploymentController) reconcileOldRCs(allRCs []*api.ReplicationControll
minAvailable := deployment.Spec.Replicas - maxUnavailable
minReadySeconds := deployment.Spec.Strategy.RollingUpdate.MinReadySeconds
// Find the number of ready pods.
-readyPodCount, err := deploymentutil.GetAvailablePodsForRCs(d.client, allRCs, minReadySeconds)
readyPodCount, err := deploymentutil.GetAvailablePodsForRCs(dc.client, allRCs, minReadySeconds)
if err != nil {
return false, fmt.Errorf("could not find available pods: %v", err)
}
@@ -231,7 +492,7 @@ func (d *DeploymentController) reconcileOldRCs(allRCs []*api.ReplicationControll
// Scale down.
scaleDownCount := int(math.Min(float64(targetRC.Spec.Replicas), float64(totalScaleDownCount)))
newReplicasCount := targetRC.Spec.Replicas - scaleDownCount
-_, err = d.scaleRCAndRecordEvent(targetRC, newReplicasCount, deployment)
_, err = dc.scaleRCAndRecordEvent(targetRC, newReplicasCount, deployment)
if err != nil {
return false, err
}
@@ -240,7 +501,7 @@ func (d *DeploymentController) reconcileOldRCs(allRCs []*api.ReplicationControll
return true, err
}
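The scale-down path mirrors that arithmetic. A worked example (editor's illustration with assumed values):

package main

import (
	"fmt"
	"math"
)

func main() {
	// Assumed: 10 desired replicas, maxUnavailable 3, 9 pods ready, and the
	// old RC targeted for scale-down currently at 6 replicas.
	desired, maxUnavailable, readyPods, targetReplicas := 10, 3, 9, 6
	minAvailable := desired - maxUnavailable        // 7 pods must stay available
	totalScaleDownCount := readyPods - minAvailable // so at most 2 may be removed
	scaleDownCount := int(math.Min(float64(targetReplicas), float64(totalScaleDownCount)))
	fmt.Println(targetReplicas - scaleDownCount) // 4: the old RC is scaled down by two
}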
-func (d *DeploymentController) updateDeploymentStatus(allRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment extensions.Deployment) error {
func (dc *DeploymentController) updateDeploymentStatus(allRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment extensions.Deployment) error {
totalReplicas := deploymentutil.GetReplicaCountForRCs(allRCs)
updatedReplicas := deploymentutil.GetReplicaCountForRCs([]*api.ReplicationController{newRC})
newDeployment := deployment
@@ -249,29 +510,29 @@ func (d *DeploymentController) updateDeploymentStatus(allRCs []*api.ReplicationC
Replicas: totalReplicas,
UpdatedReplicas: updatedReplicas,
}
-_, err := d.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).UpdateStatus(&newDeployment)
_, err := dc.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).UpdateStatus(&newDeployment)
return err
}
-func (d *DeploymentController) scaleRCAndRecordEvent(rc *api.ReplicationController, newScale int, deployment extensions.Deployment) (*api.ReplicationController, error) {
func (dc *DeploymentController) scaleRCAndRecordEvent(rc *api.ReplicationController, newScale int, deployment extensions.Deployment) (*api.ReplicationController, error) {
scalingOperation := "down"
if rc.Spec.Replicas < newScale {
scalingOperation = "up"
}
-newRC, err := d.scaleRC(rc, newScale)
newRC, err := dc.scaleRC(rc, newScale)
if err == nil {
-d.eventRecorder.Eventf(&deployment, api.EventTypeNormal, "ScalingRC", "Scaled %s rc %s to %d", scalingOperation, rc.Name, newScale)
dc.eventRecorder.Eventf(&deployment, api.EventTypeNormal, "ScalingRC", "Scaled %s rc %s to %d", scalingOperation, rc.Name, newScale)
}
return newRC, err
}
-func (d *DeploymentController) scaleRC(rc *api.ReplicationController, newScale int) (*api.ReplicationController, error) {
func (dc *DeploymentController) scaleRC(rc *api.ReplicationController, newScale int) (*api.ReplicationController, error) {
// TODO: Using client for now, update to use store when it is ready.
rc.Spec.Replicas = newScale
-return d.client.ReplicationControllers(rc.ObjectMeta.Namespace).Update(rc)
return dc.client.ReplicationControllers(rc.ObjectMeta.Namespace).Update(rc)
}
-func (d *DeploymentController) updateDeployment(deployment *extensions.Deployment) (*extensions.Deployment, error) {
func (dc *DeploymentController) updateDeployment(deployment *extensions.Deployment) (*extensions.Deployment, error) {
// TODO: Using client for now, update to use store when it is ready.
-return d.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).Update(deployment)
return dc.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).Update(deployment)
}