From 267dae6435e105c3553096c97458717fb1dc5da6 Mon Sep 17 00:00:00 2001
From: Michail Kargakis
Date: Mon, 5 Dec 2016 02:14:46 +0100
Subject: [PATCH] controller: requeue replica sets for availability checks

---
 pkg/controller/replicaset/replica_set.go          | 28 +++++++++++++++++++
 .../replication/replication_controller.go         | 28 +++++++++++++++++++
 2 files changed, 56 insertions(+)

diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go
index 0bbb9491f36..12efcbaedaf 100644
--- a/pkg/controller/replicaset/replica_set.go
+++ b/pkg/controller/replicaset/replica_set.go
@@ -343,8 +343,19 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
 		}
 	}
 
+	changedToReady := !v1.IsPodReady(oldPod) && v1.IsPodReady(curPod)
 	if curRS := rsc.getPodReplicaSet(curPod); curRS != nil {
 		rsc.enqueueReplicaSet(curRS)
+		// TODO: MinReadySeconds in the Pod will generate an Available condition to be added to
+		// the Pod status which in turn will trigger a requeue of the owning replica set, thus
+		// having its status updated with the newly available replica. For now, we can fake the
+		// update by resyncing the controller MinReadySeconds after it is requeued because
+		// a Pod transitioned to Ready.
+		// Note that this still suffers from #29229; we are just moving the problem one level
+		// "closer" to kubelet (from the deployment to the replica set controller).
+		if changedToReady && curRS.Spec.MinReadySeconds > 0 {
+			rsc.enqueueReplicaSetAfter(curRS, time.Duration(curRS.Spec.MinReadySeconds)*time.Second)
+		}
 	}
 }
 
@@ -398,6 +409,23 @@ func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) {
 	rsc.queue.Add(key)
 }
 
+// obj could be an *extensions.ReplicaSet, or a DeletionFinalStateUnknown marker item.
+func (rsc *ReplicaSetController) enqueueReplicaSetAfter(obj interface{}, after time.Duration) {
+	key, err := controller.KeyFunc(obj)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
+		return
+	}
+
+	// TODO: Handle overlapping replica sets better. Either disallow them at admission time or
+	// deterministically avoid syncing replica sets that fight over pods. Currently, we only
+	// ensure that the same replica set is synced for a given pod. When we periodically relist
+	// all replica sets there will still be some replica instability. One way to handle this is
+	// by querying the store for all replica sets that this replica set overlaps, as well as all
+	// replica sets that overlap this ReplicaSet, and sorting them.
+	rsc.queue.AddAfter(key, after)
+}
+
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
 func (rsc *ReplicaSetController) worker() {
diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go
index bcb5192d3d7..9c8a5f07f57 100644
--- a/pkg/controller/replication/replication_controller.go
+++ b/pkg/controller/replication/replication_controller.go
@@ -400,8 +400,19 @@ func (rm *ReplicationManager) updatePod(old, cur interface{}) {
 		}
 	}
 
+	changedToReady := !v1.IsPodReady(oldPod) && v1.IsPodReady(curPod)
 	if curRC := rm.getPodController(curPod); curRC != nil {
 		rm.enqueueController(curRC)
+		// TODO: MinReadySeconds in the Pod will generate an Available condition to be added to
+		// the Pod status which in turn will trigger a requeue of the owning replication controller,
+		// thus having its status updated with the newly available replica. For now, we can fake the
+		// update by resyncing the controller MinReadySeconds after it is requeued because a Pod
+		// transitioned to Ready.
+		// Note that this still suffers from #29229; we are just moving the problem one level
+		// "closer" to kubelet (from the deployment to the replication controller manager).
+		if changedToReady && curRC.Spec.MinReadySeconds > 0 {
+			rm.enqueueControllerAfter(curRC, time.Duration(curRC.Spec.MinReadySeconds)*time.Second)
+		}
 	}
 }
 
@@ -455,6 +466,23 @@ func (rm *ReplicationManager) enqueueController(obj interface{}) {
 	rm.queue.Add(key)
 }
 
+// obj could be a *v1.ReplicationController, or a DeletionFinalStateUnknown marker item.
+func (rm *ReplicationManager) enqueueControllerAfter(obj interface{}, after time.Duration) {
+	key, err := controller.KeyFunc(obj)
+	if err != nil {
+		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
+		return
+	}
+
+	// TODO: Handle overlapping controllers better. Either disallow them at admission time or
+	// deterministically avoid syncing controllers that fight over pods. Currently, we only
+	// ensure that the same controller is synced for a given pod. When we periodically relist
+	// all controllers there will still be some replica instability. One way to handle this is
+	// by querying the store for all controllers that this rc overlaps, as well as all
+	// controllers that overlap this rc, and sorting them.
+	rm.queue.AddAfter(key, after)
+}
+
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
 func (rm *ReplicationManager) worker() {
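
Note (not part of the patch): both enqueue*After helpers bottom out in the work queue's AddAfter, which is what turns MinReadySeconds into a delayed resync — the key is re-delivered to workers only after the delay elapses, at which point the sync handler re-evaluates the pod's availability. A minimal standalone sketch of that behavior, assuming k8s.io/client-go/util/workqueue; the "default/my-rs" key is made up for illustration:

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// A delaying work queue of the kind the controllers embed; AddAfter
	// re-delivers a key to workers only once the delay has elapsed.
	queue := workqueue.NewDelayingQueue()
	defer queue.ShutDown()

	start := time.Now()

	// Simulate a Pod turning Ready for a replica set with MinReadySeconds=3:
	// the controller requeues the same key three seconds later so its sync
	// handler can re-check availability. "default/my-rs" is a made-up key.
	queue.AddAfter("default/my-rs", 3*time.Second)

	key, shutdown := queue.Get() // blocks until the delayed key is available
	if shutdown {
		return
	}
	defer queue.Done(key)

	fmt.Printf("resynced %v after ~%v\n", key, time.Since(start).Round(time.Second))
}

Because Get blocks until the delay expires, the handler observes the pod again only once it has been Ready for MinReadySeconds and can then recompute the set's available replicas.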