Merge pull request #49488 from k82cn/k8s_46935
Automatic merge from submit-queue (batch tested with PRs 49488, 50407, 46105, 50456, 50258) Requeue DaemonSets if non-daemon pods were deleted. **What this PR does / why we need it**: Requeue DaemonSets if non-daemon pods were deleted. **Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #46935 **Release note**: ```release-note None ```
This commit is contained in:
@@ -31,6 +31,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
appsinformers "k8s.io/client-go/informers/apps/v1beta1"
|
||||
@@ -94,7 +95,8 @@ type DaemonSetsController struct {
|
||||
// To allow injection of syncDaemonSet for testing.
|
||||
syncHandler func(dsKey string) error
|
||||
// used for unit testing
|
||||
enqueueDaemonSet func(ds *extensions.DaemonSet)
|
||||
enqueueDaemonSet func(ds *extensions.DaemonSet)
|
||||
enqueueDaemonSetRateLimited func(ds *extensions.DaemonSet)
|
||||
// A TTLCache of pod creates/deletes each ds expects to see
|
||||
expectations controller.ControllerExpectationsInterface
|
||||
// dsLister can list/get daemonsets from the shared informer's store
|
||||
@@ -120,6 +122,11 @@ type DaemonSetsController struct {
|
||||
|
||||
// DaemonSet keys that need to be synced.
|
||||
queue workqueue.RateLimitingInterface
|
||||
|
||||
// The DaemonSet that has suspended pods on nodes; the key is node name, the value
|
||||
// is DaemonSet set that want to run pods but can't schedule in latest syncup cycle.
|
||||
suspendedDaemonPodsMutex sync.Mutex
|
||||
suspendedDaemonPods map[string]sets.String
|
||||
}
|
||||
|
||||
func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInformer, historyInformer appsinformers.ControllerRevisionInformer, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, kubeClient clientset.Interface) *DaemonSetsController {
|
||||
@@ -141,9 +148,10 @@ func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInfo
|
||||
crControl: controller.RealControllerRevisionControl{
|
||||
KubeClient: kubeClient,
|
||||
},
|
||||
burstReplicas: BurstReplicas,
|
||||
expectations: controller.NewControllerExpectations(),
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
|
||||
burstReplicas: BurstReplicas,
|
||||
expectations: controller.NewControllerExpectations(),
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
|
||||
suspendedDaemonPods: map[string]sets.String{},
|
||||
}
|
||||
|
||||
daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
@@ -191,6 +199,7 @@ func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInfo
|
||||
|
||||
dsc.syncHandler = dsc.syncDaemonSet
|
||||
dsc.enqueueDaemonSet = dsc.enqueue
|
||||
dsc.enqueueDaemonSetRateLimited = dsc.enqueueRateLimited
|
||||
return dsc
|
||||
}
|
||||
|
||||
@@ -267,6 +276,16 @@ func (dsc *DaemonSetsController) enqueue(ds *extensions.DaemonSet) {
|
||||
dsc.queue.Add(key)
|
||||
}
|
||||
|
||||
func (dsc *DaemonSetsController) enqueueRateLimited(ds *extensions.DaemonSet) {
|
||||
key, err := controller.KeyFunc(ds)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err))
|
||||
return
|
||||
}
|
||||
|
||||
dsc.queue.AddRateLimited(key)
|
||||
}
|
||||
|
||||
func (dsc *DaemonSetsController) enqueueDaemonSetAfter(obj interface{}, after time.Duration) {
|
||||
key, err := controller.KeyFunc(obj)
|
||||
if err != nil {
|
||||
@@ -519,6 +538,67 @@ func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
|
||||
}
|
||||
}
|
||||
|
||||
// listSuspendedDaemonPods lists the Daemon pods that 'want to run, but should not schedule'
|
||||
// for the node.
|
||||
func (dsc *DaemonSetsController) listSuspendedDaemonPods(node string) (dss []string) {
|
||||
dsc.suspendedDaemonPodsMutex.Lock()
|
||||
defer dsc.suspendedDaemonPodsMutex.Unlock()
|
||||
|
||||
if _, found := dsc.suspendedDaemonPods[node]; !found {
|
||||
return nil
|
||||
}
|
||||
|
||||
for k := range dsc.suspendedDaemonPods[node] {
|
||||
dss = append(dss, k)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// requeueSuspendedDaemonPods enqueues all DaemonSets which has pods that 'want to run,
|
||||
// but should not schedule' for the node; so DaemonSetController will sync up them again.
|
||||
func (dsc *DaemonSetsController) requeueSuspendedDaemonPods(node string) {
|
||||
dss := dsc.listSuspendedDaemonPods(node)
|
||||
for _, dsKey := range dss {
|
||||
if ns, name, err := cache.SplitMetaNamespaceKey(dsKey); err != nil {
|
||||
glog.Errorf("Failed to get DaemonSet's namespace and name from %s: %v", dsKey, err)
|
||||
continue
|
||||
} else if ds, err := dsc.dsLister.DaemonSets(ns).Get(name); err != nil {
|
||||
glog.Errorf("Failed to get DaemonSet %s/%s: %v", ns, name, err)
|
||||
continue
|
||||
} else {
|
||||
dsc.enqueueDaemonSetRateLimited(ds)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// addSuspendedDaemonPods adds DaemonSet which has pods that 'want to run,
|
||||
// but should not schedule' for the node to the suspended queue.
|
||||
func (dsc *DaemonSetsController) addSuspendedDaemonPods(node, ds string) {
|
||||
dsc.suspendedDaemonPodsMutex.Lock()
|
||||
defer dsc.suspendedDaemonPodsMutex.Unlock()
|
||||
|
||||
if _, found := dsc.suspendedDaemonPods[node]; !found {
|
||||
dsc.suspendedDaemonPods[node] = sets.NewString()
|
||||
}
|
||||
dsc.suspendedDaemonPods[node].Insert(ds)
|
||||
}
|
||||
|
||||
// removeSuspendedDaemonPods removes DaemonSet which has pods that 'want to run,
|
||||
// but should not schedule' for the node from suspended queue.
|
||||
func (dsc *DaemonSetsController) removeSuspendedDaemonPods(node, ds string) {
|
||||
dsc.suspendedDaemonPodsMutex.Lock()
|
||||
defer dsc.suspendedDaemonPodsMutex.Unlock()
|
||||
|
||||
if _, found := dsc.suspendedDaemonPods[node]; !found {
|
||||
return
|
||||
}
|
||||
dsc.suspendedDaemonPods[node].Delete(ds)
|
||||
|
||||
if len(dsc.suspendedDaemonPods[node]) == 0 {
|
||||
delete(dsc.suspendedDaemonPods, node)
|
||||
}
|
||||
}
|
||||
|
||||
func (dsc *DaemonSetsController) deletePod(obj interface{}) {
|
||||
pod, ok := obj.(*v1.Pod)
|
||||
// When a delete is dropped, the relist will notice a pod in the store not
|
||||
@@ -542,10 +622,18 @@ func (dsc *DaemonSetsController) deletePod(obj interface{}) {
|
||||
controllerRef := metav1.GetControllerOf(pod)
|
||||
if controllerRef == nil {
|
||||
// No controller should care about orphans being deleted.
|
||||
if len(pod.Spec.NodeName) != 0 {
|
||||
// If scheduled pods were deleted, requeue suspended daemon pods.
|
||||
dsc.requeueSuspendedDaemonPods(pod.Spec.NodeName)
|
||||
}
|
||||
return
|
||||
}
|
||||
ds := dsc.resolveControllerRef(pod.Namespace, controllerRef)
|
||||
if ds == nil {
|
||||
if len(pod.Spec.NodeName) != 0 {
|
||||
// If scheduled pods were deleted, requeue suspended daemon pods.
|
||||
dsc.requeueSuspendedDaemonPods(pod.Spec.NodeName)
|
||||
}
|
||||
return
|
||||
}
|
||||
dsKey, err := controller.KeyFunc(ds)
|
||||
@@ -729,20 +817,25 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet, hash string) e
|
||||
var nodesNeedingDaemonPods, podsToDelete []string
|
||||
var failedPodsObserved int
|
||||
for _, node := range nodeList {
|
||||
_, shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
|
||||
wantToRun, shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
daemonPods, exists := nodeToDaemonPods[node.Name]
|
||||
dsKey, _ := cache.MetaNamespaceKeyFunc(ds)
|
||||
dsc.removeSuspendedDaemonPods(node.Name, dsKey)
|
||||
|
||||
switch {
|
||||
case wantToRun && !shouldSchedule:
|
||||
// If daemon pod is supposed to run, but can not be scheduled, add to suspended list.
|
||||
dsc.addSuspendedDaemonPods(node.Name, dsKey)
|
||||
case shouldSchedule && !exists:
|
||||
// If daemon pod is supposed to be running on node, but isn't, create daemon pod.
|
||||
nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
|
||||
case shouldContinueRunning:
|
||||
// If a daemon pod failed, delete it
|
||||
// If there's no daemon pods left on this node, we will create it in the next sync loop
|
||||
// If there's non-daemon pods left on this node, we will create it in the next sync loop
|
||||
var daemonPodsRunning []*v1.Pod
|
||||
for _, pod := range daemonPods {
|
||||
if pod.Status.Phase == v1.PodFailed {
|
||||
|
Reference in New Issue
Block a user