diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index 8c707cc4b17..590d0054a65 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -24,7 +24,6 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/record" @@ -75,6 +74,9 @@ type DeploymentController struct { // Added as a member to the struct to allow injection for testing. podStoreSynced func() bool + // A TTLCache of pod creates/deletes each deployment expects to see + expectations controller.ControllerExpectationsInterface + // Deployments that need to be synced queue *workqueue.Type } @@ -90,14 +92,15 @@ func NewDeploymentController(client client.Interface, resyncPeriod controller.Re expClient: client.Extensions(), eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "deployment-controller"}), queue: workqueue.New(), + expectations: controller.NewControllerExpectations(), } dc.dStore.Store, dc.dController = framework.NewInformer( &cache.ListWatch{ - ListFunc: func() (runtime.Object, error) { - return dc.expClient.Deployments(api.NamespaceAll).List(unversioned.ListOptions{}) + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return dc.expClient.Deployments(api.NamespaceAll).List(options) }, - WatchFunc: func(options unversioned.ListOptions) (watch.Interface, error) { + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return dc.expClient.Deployments(api.NamespaceAll).Watch(options) }, }, @@ -116,10 +119,10 @@ func NewDeploymentController(client client.Interface, resyncPeriod controller.Re dc.rcStore.Store, dc.rcController = framework.NewInformer( &cache.ListWatch{ - ListFunc: func() (runtime.Object, error) { - return dc.client.ReplicationControllers(api.NamespaceAll).List(unversioned.ListOptions{}) + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return dc.client.ReplicationControllers(api.NamespaceAll).List(options) }, - WatchFunc: func(options unversioned.ListOptions) (watch.Interface, error) { + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return dc.client.ReplicationControllers(api.NamespaceAll).Watch(options) }, }, @@ -132,15 +135,12 @@ func NewDeploymentController(client client.Interface, resyncPeriod controller.Re }, ) - // We do not event on anything from the podController, but we use the local - // podStore to make queries about the current state of pods (e.g. whether - // they are ready or not) more efficient. dc.podStore.Store, dc.podController = framework.NewInformer( &cache.ListWatch{ - ListFunc: func() (runtime.Object, error) { - return dc.client.Pods(api.NamespaceAll).List(unversioned.ListOptions{}) + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return dc.client.Pods(api.NamespaceAll).List(options) }, - WatchFunc: func(options unversioned.ListOptions) (watch.Interface, error) { + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return dc.client.Pods(api.NamespaceAll).Watch(options) }, }, @@ -149,6 +149,8 @@ func NewDeploymentController(client client.Interface, resyncPeriod controller.Re framework.ResourceEventHandlerFuncs{ // When pod updates (becomes ready), we need to enqueue deployment UpdateFunc: dc.updatePod, + // When pod is deleted, we need to update deployment's expectations + DeleteFunc: dc.deletePod, }, ) @@ -226,7 +228,6 @@ func (dc *DeploymentController) updateRC(old, cur interface{}) { // marker item. func (dc *DeploymentController) deleteRC(obj interface{}) { rc, ok := obj.(*api.ReplicationController) - glog.V(4).Infof("Replication controller %s deleted.", rc.Name) // When a delete is dropped, the relist will notice a pod in the store not // in the list, leading to the insertion of a tombstone object which contains @@ -244,6 +245,7 @@ func (dc *DeploymentController) deleteRC(obj interface{}) { return } } + glog.V(4).Infof("Replication controller %s deleted.", rc.Name) if d := dc.getDeploymentForRC(rc); d != nil { dc.enqueueDeployment(d) } @@ -287,6 +289,37 @@ func (dc *DeploymentController) updatePod(old, cur interface{}) { } } +// When a pod is deleted, update expectations of the controller that manages the pod. +// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item. +func (dc *DeploymentController) deletePod(obj interface{}) { + pod, ok := obj.(*api.Pod) + // When a delete is dropped, the relist will notice a pod in the store not + // in the list, leading to the insertion of a tombstone object which contains + // the deleted key/value. Note that this value might be stale. If the pod + // changed labels the new rc will not be woken up till the periodic resync. + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a controller recreates a replica", obj, controller.ExpectationsTimeout) + return + } + pod, ok = tombstone.Obj.(*api.Pod) + if !ok { + glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before controller recreates a replica", obj, controller.ExpectationsTimeout) + return + } + } + glog.V(4).Infof("Pod %s deleted.", pod.Name) + if d := dc.getDeploymentForPod(pod); d != nil { + dKey, err := controller.KeyFunc(d) + if err != nil { + glog.Errorf("Couldn't get key for deployment controller %#v: %v", d, err) + return + } + dc.expectations.DeletionObserved(dKey) + } +} + // obj could be an *api.Deployment, or a DeletionFinalStateUnknown marker item. func (dc *DeploymentController) enqueueDeployment(obj interface{}) { key, err := controller.KeyFunc(obj) @@ -338,6 +371,7 @@ func (dc *DeploymentController) syncDeployment(key string) error { } if !exists { glog.Infof("Deployment has been deleted %v", key) + dc.expectations.DeleteExpectations(key) return nil } d := *obj.(*extensions.Deployment) @@ -379,7 +413,7 @@ func (dc *DeploymentController) syncRollingUpdateDeployment(deployment extension } // Scale down, if we can. - scaledDown, err := dc.reconcileOldRCs(allRCs, oldRCs, newRC, deployment) + scaledDown, err := dc.reconcileOldRCs(allRCs, oldRCs, newRC, deployment, true) if err != nil { return err } @@ -401,11 +435,11 @@ func (dc *DeploymentController) syncRollingUpdateDeployment(deployment extension func (dc *DeploymentController) getOldRCs(deployment extensions.Deployment) ([]*api.ReplicationController, error) { return deploymentutil.GetOldRCsFromLists(deployment, dc.client, - func(namespace string, options unversioned.ListOptions) (*api.PodList, error) { + func(namespace string, options api.ListOptions) (*api.PodList, error) { podList, err := dc.podStore.Pods(namespace).List(labels.SelectorFromSet(deployment.Spec.Selector)) return &podList, err }, - func(namespace string, options unversioned.ListOptions) ([]api.ReplicationController, error) { + func(namespace string, options api.ListOptions) ([]api.ReplicationController, error) { return dc.rcStore.List() }) } @@ -414,7 +448,7 @@ func (dc *DeploymentController) getOldRCs(deployment extensions.Deployment) ([]* // It creates a new RC if required. func (dc *DeploymentController) getNewRC(deployment extensions.Deployment) (*api.ReplicationController, error) { existingNewRC, err := deploymentutil.GetNewRCFromList(deployment, dc.client, - func(namespace string, options unversioned.ListOptions) ([]api.ReplicationController, error) { + func(namespace string, options api.ListOptions) ([]api.ReplicationController, error) { return dc.rcStore.List() }) if err != nil || existingNewRC != nil { @@ -479,7 +513,8 @@ func (dc *DeploymentController) reconcileNewRC(allRCs []*api.ReplicationControll return true, err } -func (dc *DeploymentController) reconcileOldRCs(allRCs []*api.ReplicationController, oldRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment extensions.Deployment) (bool, error) { +// Set expectationsCheck to false to bypass expectations check when testing +func (dc *DeploymentController) reconcileOldRCs(allRCs []*api.ReplicationController, oldRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment extensions.Deployment, expectationsCheck bool) (bool, error) { oldPodsCount := deploymentutil.GetReplicaCountForRCs(oldRCs) if oldPodsCount == 0 { // Cant scale down further @@ -495,6 +530,15 @@ func (dc *DeploymentController) reconcileOldRCs(allRCs []*api.ReplicationControl // Check if we can scale down. minAvailable := deployment.Spec.Replicas - maxUnavailable minReadySeconds := deployment.Spec.Strategy.RollingUpdate.MinReadySeconds + // Check the expectations of deployment before counting available pods + dKey, err := controller.KeyFunc(&deployment) + if err != nil { + return false, fmt.Errorf("Couldn't get key for deployment %#v: %v", deployment, err) + } + if expectationsCheck && !dc.expectations.SatisfiedExpectations(dKey) { + fmt.Printf("Expectations not met yet before reconciling old RCs\n") + return false, nil + } // Find the number of ready pods. readyPodCount, err := deploymentutil.GetAvailablePodsForRCs(dc.client, allRCs, minReadySeconds) if err != nil { @@ -523,6 +567,13 @@ func (dc *DeploymentController) reconcileOldRCs(allRCs []*api.ReplicationControl return false, err } totalScaleDownCount -= scaleDownCount + dKey, err := controller.KeyFunc(&deployment) + if err != nil { + return false, fmt.Errorf("Couldn't get key for deployment %#v: %v", deployment, err) + } + if expectationsCheck { + dc.expectations.ExpectDeletions(dKey, scaleDownCount) + } } return true, err } diff --git a/pkg/controller/deployment/deployment_controller_test.go b/pkg/controller/deployment/deployment_controller_test.go index 9dd0e5a6086..df8403c2d82 100644 --- a/pkg/controller/deployment/deployment_controller_test.go +++ b/pkg/controller/deployment/deployment_controller_test.go @@ -194,7 +194,7 @@ func TestDeploymentController_reconcileOldRCs(t *testing.T) { client: fake, eventRecorder: &record.FakeRecorder{}, } - scaled, err := controller.reconcileOldRCs(allRcs, oldRcs, nil, deployment) + scaled, err := controller.reconcileOldRCs(allRcs, oldRcs, nil, deployment, false) if err != nil { t.Errorf("unexpected error: %v", err) continue @@ -267,7 +267,7 @@ var alwaysReady = func() bool { return true } func newDeployment(replicas int) *exp.Deployment { d := exp.Deployment{ - TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Default.Version()}, + TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Default.GroupVersion().String()}, ObjectMeta: api.ObjectMeta{ UID: util.NewUUID(), Name: "foobar", diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index 873f9af17fa..6ea68d16c19 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -210,7 +210,7 @@ func (rm *ReplicationManager) getPodController(pod *api.Pod) *api.ReplicationCon // overlap, sort by creation timestamp, subsort by name, then pick // the first. glog.Errorf("user error! more than one replication controller is selecting pods with labels: %+v", pod.Labels) - sort.Sort(overlappingControllers(controllers)) + sort.Sort(OverlappingControllers(controllers)) } return &controllers[0] } diff --git a/pkg/controller/replication/replication_controller_utils.go b/pkg/controller/replication/replication_controller_utils.go index 5033c29011c..b300e092205 100644 --- a/pkg/controller/replication/replication_controller_utils.go +++ b/pkg/controller/replication/replication_controller_utils.go @@ -57,12 +57,12 @@ func updateReplicaCount(rcClient client.ReplicationControllerInterface, controll } // OverlappingControllers sorts a list of controllers by creation timestamp, using their names as a tie breaker. -type overlappingControllers []api.ReplicationController +type OverlappingControllers []api.ReplicationController -func (o overlappingControllers) Len() int { return len(o) } -func (o overlappingControllers) Swap(i, j int) { o[i], o[j] = o[j], o[i] } +func (o OverlappingControllers) Len() int { return len(o) } +func (o OverlappingControllers) Swap(i, j int) { o[i], o[j] = o[j], o[i] } -func (o overlappingControllers) Less(i, j int) bool { +func (o OverlappingControllers) Less(i, j int) bool { if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) { return o[i].Name < o[j].Name } diff --git a/pkg/util/deployment/deployment.go b/pkg/util/deployment/deployment.go index 2cfd5843ea0..b7e797bb791 100644 --- a/pkg/util/deployment/deployment.go +++ b/pkg/util/deployment/deployment.go @@ -22,7 +22,6 @@ import ( "time" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/extensions" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/labels" @@ -32,17 +31,17 @@ import ( // GetOldRCs returns the old RCs targeted by the given Deployment; get PodList and RCList from client interface. func GetOldRCs(deployment extensions.Deployment, c client.Interface) ([]*api.ReplicationController, error) { return GetOldRCsFromLists(deployment, c, - func(namespace string, options unversioned.ListOptions) (*api.PodList, error) { + func(namespace string, options api.ListOptions) (*api.PodList, error) { return c.Pods(namespace).List(options) }, - func(namespace string, options unversioned.ListOptions) ([]api.ReplicationController, error) { + func(namespace string, options api.ListOptions) ([]api.ReplicationController, error) { rcList, err := c.ReplicationControllers(namespace).List(options) return rcList.Items, err }) } // GetOldRCsFromLists returns the old RCs targeted by the given Deployment; get PodList and RCList with input functions. -func GetOldRCsFromLists(deployment extensions.Deployment, c client.Interface, getPodList func(string, unversioned.ListOptions) (*api.PodList, error), getRcList func(string, unversioned.ListOptions) ([]api.ReplicationController, error)) ([]*api.ReplicationController, error) { +func GetOldRCsFromLists(deployment extensions.Deployment, c client.Interface, getPodList func(string, api.ListOptions) (*api.PodList, error), getRcList func(string, api.ListOptions) ([]api.ReplicationController, error)) ([]*api.ReplicationController, error) { namespace := deployment.ObjectMeta.Namespace // 1. Find all pods whose labels match deployment.Spec.Selector selector := labels.SelectorFromSet(deployment.Spec.Selector) @@ -87,7 +86,7 @@ func GetOldRCsFromLists(deployment extensions.Deployment, c client.Interface, ge // Returns nil if the new RC doesnt exist yet. func GetNewRC(deployment extensions.Deployment, c client.Interface) (*api.ReplicationController, error) { return GetNewRCFromList(deployment, c, - func(namespace string, options unversioned.ListOptions) ([]api.ReplicationController, error) { + func(namespace string, options api.ListOptions) ([]api.ReplicationController, error) { rcList, err := c.ReplicationControllers(namespace).List(options) return rcList.Items, err }) @@ -95,7 +94,7 @@ func GetNewRC(deployment extensions.Deployment, c client.Interface) (*api.Replic // GetNewRCFromList returns an RC that matches the intent of the given deployment; get RCList with the input function. // Returns nil if the new RC doesnt exist yet. -func GetNewRCFromList(deployment extensions.Deployment, c client.Interface, getRcList func(string, unversioned.ListOptions) ([]api.ReplicationController, error)) (*api.ReplicationController, error) { +func GetNewRCFromList(deployment extensions.Deployment, c client.Interface, getRcList func(string, api.ListOptions) ([]api.ReplicationController, error)) (*api.ReplicationController, error) { namespace := deployment.ObjectMeta.Namespace rcList, err := getRcList(namespace, api.ListOptions{}) if err != nil {