kube-controller-manager: Decouple TaintManager from NodeLifeCycleController (KEP-3902)

Andrea Tosatto
2023-07-10 14:02:56 +01:00
parent 38ed3ef7b7
commit ccda2d6fd4
14 changed files with 556 additions and 224 deletions

View File

@@ -37,6 +37,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
appsv1informers "k8s.io/client-go/informers/apps/v1"
coordinformers "k8s.io/client-go/informers/coordination/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
@@ -54,7 +55,9 @@ import (
kubeletapis "k8s.io/kubelet/pkg/apis"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler"
"k8s.io/kubernetes/pkg/controller/tainteviction"
controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
"k8s.io/kubernetes/pkg/features"
taintutils "k8s.io/kubernetes/pkg/util/taints"
)
@@ -127,6 +130,13 @@ const (
// podUpdateWorkerSize assumes that in most cases a pod will be handled by the monitorNodeHealth pass.
// Pod update workers will only handle lagging cache pods. 4 workers should be enough.
podUpdateWorkerSize = 4
// nodeUpdateWorkerSize defines the number of workers for node and/or pod updates.
nodeUpdateWorkerSize = 8
// taintEvictionController is defined here in order to prevent imports of
// k8s.io/kubernetes/cmd/kube-controller-manager/names which would result in validation errors.
// This constant will be removed upon graduation of the SeparateTaintEvictionController feature.
taintEvictionController = "taint-eviction-controller"
)
// labelReconcileInfo lists Node labels to reconcile, and how to reconcile them.
@@ -207,7 +217,7 @@ type podUpdateItem struct {
// Controller is the controller that manages node's life cycle.
type Controller struct {
taintManager *scheduler.NoExecuteTaintManager
taintManager *tainteviction.Controller
podLister corelisters.PodLister
podInformerSynced cache.InformerSynced
@@ -326,7 +336,7 @@ func NewNodeLifecycleController(
nodeMonitorPeriod: nodeMonitorPeriod,
nodeStartupGracePeriod: nodeStartupGracePeriod,
nodeMonitorGracePeriod: nodeMonitorGracePeriod,
nodeUpdateWorkerSize: scheduler.UpdateWorkerSize,
nodeUpdateWorkerSize: nodeUpdateWorkerSize,
zoneNoExecuteTainter: make(map[string]*scheduler.RateLimitedTimedQueue),
nodesToRetry: sync.Map{},
zoneStates: make(map[string]ZoneState),
@@ -346,17 +356,11 @@ func NewNodeLifecycleController(
AddFunc: func(obj interface{}) {
pod := obj.(*v1.Pod)
nc.podUpdated(nil, pod)
if nc.taintManager != nil {
nc.taintManager.PodUpdated(nil, pod)
}
},
UpdateFunc: func(prev, obj interface{}) {
prevPod := prev.(*v1.Pod)
newPod := obj.(*v1.Pod)
nc.podUpdated(prevPod, newPod)
if nc.taintManager != nil {
nc.taintManager.PodUpdated(prevPod, newPod)
}
},
DeleteFunc: func(obj interface{}) {
pod, isPod := obj.(*v1.Pod)
@@ -374,9 +378,6 @@ func NewNodeLifecycleController(
}
}
nc.podUpdated(pod, nil)
if nc.taintManager != nil {
nc.taintManager.PodUpdated(pod, nil)
}
},
})
nc.podInformerSynced = podInformer.Informer().HasSynced
@@ -412,21 +413,14 @@ func NewNodeLifecycleController(
nc.podLister = podInformer.Lister()
nc.nodeLister = nodeInformer.Lister()
nc.taintManager = scheduler.NewNoExecuteTaintManager(ctx, kubeClient, nc.podLister, nc.nodeLister, nc.getPodsAssignedToNode)
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controllerutil.CreateAddNodeHandler(func(node *v1.Node) error {
nc.taintManager.NodeUpdated(nil, node)
return nil
}),
UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
nc.taintManager.NodeUpdated(oldNode, newNode)
return nil
}),
DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error {
nc.taintManager.NodeUpdated(node, nil)
return nil
}),
})
if !utilfeature.DefaultFeatureGate.Enabled(features.SeparateTaintEvictionController) {
logger.Info("Running TaintEvictionController as part of NodeLifecyleController")
tm, err := tainteviction.New(ctx, kubeClient, podInformer, nodeInformer, taintEvictionController)
if err != nil {
return nil, err
}
nc.taintManager = tm
}
logger.Info("Controller will reconcile labels")
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -480,10 +474,13 @@ func (nc *Controller) Run(ctx context.Context) {
return
}
go nc.taintManager.Run(ctx)
if !utilfeature.DefaultFeatureGate.Enabled(features.SeparateTaintEvictionController) {
logger.Info("Starting", "controller", taintEvictionController)
go nc.taintManager.Run(ctx)
}
// Start workers to reconcile labels and/or update NoSchedule taint for nodes.
for i := 0; i < scheduler.UpdateWorkerSize; i++ {
for i := 0; i < nodeUpdateWorkerSize; i++ {
// Thanks to "workqueue", each worker just need to get item from queue, because
// the item is flagged when got from queue: if new event come, the new item will
// be re-queued until "Done", so no more than one worker handle the same item and
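The net effect of this first file's changes: when the SeparateTaintEvictionController feature gate is disabled, NodeLifecycleController still constructs and runs the taint-eviction logic in-process, but now via the new tainteviction package; when the gate is enabled, nc.taintManager stays nil and kube-controller-manager is expected to run the eviction controller separately. A condensed sketch of that wiring, with field declarations and unrelated setup elided, so this is illustrative rather than the full constructor:

if !utilfeature.DefaultFeatureGate.Enabled(features.SeparateTaintEvictionController) {
	// Embedded mode: the eviction controller lives inside NodeLifecycleController.
	tm, err := tainteviction.New(ctx, kubeClient, podInformer, nodeInformer, taintEvictionController)
	if err != nil {
		return nil, err
	}
	nc.taintManager = tm
}

// And in Run(), the embedded controller is only started when the gate is off:
if !utilfeature.DefaultFeatureGate.Enabled(features.SeparateTaintEvictionController) {
	go nc.taintManager.Run(ctx)
}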

View File

@@ -1,535 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"context"
"fmt"
"hash/fnv"
"io"
"math"
"sync"
"time"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/apis/core/helper"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
utilpod "k8s.io/kubernetes/pkg/util/pod"
"k8s.io/klog/v2"
)
const (
// TODO (k82cn): Figure out a reasonable number of workers/channels and propagate
// the number of workers up, making it a parameter of the Run() function.
// NodeUpdateChannelSize defines the size of channel for node update events.
NodeUpdateChannelSize = 10
// UpdateWorkerSize defines the number of workers for node and/or pod updates.
UpdateWorkerSize = 8
podUpdateChannelSize = 1
retries = 5
)
type nodeUpdateItem struct {
nodeName string
}
type podUpdateItem struct {
podName string
podNamespace string
nodeName string
}
func hash(val string, max int) int {
hasher := fnv.New32a()
io.WriteString(hasher, val)
return int(hasher.Sum32() % uint32(max))
}
// GetPodsByNodeNameFunc returns the list of pods assigned to the specified node.
type GetPodsByNodeNameFunc func(nodeName string) ([]*v1.Pod, error)
// NoExecuteTaintManager listens to Taint/Toleration changes and is responsible for removing Pods
// from Nodes tainted with NoExecute Taints.
type NoExecuteTaintManager struct {
client clientset.Interface
broadcaster record.EventBroadcaster
recorder record.EventRecorder
podLister corelisters.PodLister
nodeLister corelisters.NodeLister
getPodsAssignedToNode GetPodsByNodeNameFunc
taintEvictionQueue *TimedWorkerQueue
// keeps a map from nodeName to all noExecute taints on that Node
taintedNodesLock sync.Mutex
taintedNodes map[string][]v1.Taint
nodeUpdateChannels []chan nodeUpdateItem
podUpdateChannels []chan podUpdateItem
nodeUpdateQueue workqueue.Interface
podUpdateQueue workqueue.Interface
}
func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName)) func(ctx context.Context, args *WorkArgs) error {
return func(ctx context.Context, args *WorkArgs) error {
ns := args.NamespacedName.Namespace
name := args.NamespacedName.Name
klog.FromContext(ctx).Info("NoExecuteTaintManager is deleting pod", "pod", args.NamespacedName.String())
if emitEventFunc != nil {
emitEventFunc(args.NamespacedName)
}
var err error
for i := 0; i < retries; i++ {
err = addConditionAndDeletePod(ctx, c, name, ns)
if err == nil {
break
}
time.Sleep(10 * time.Millisecond)
}
return err
}
}
func addConditionAndDeletePod(ctx context.Context, c clientset.Interface, name, ns string) (err error) {
if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
pod, err := c.CoreV1().Pods(ns).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return err
}
newStatus := pod.Status.DeepCopy()
updated := apipod.UpdatePodCondition(newStatus, &v1.PodCondition{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
Reason: "DeletionByTaintManager",
Message: "Taint manager: deleting due to NoExecute taint",
})
if updated {
if _, _, _, err := utilpod.PatchPodStatus(ctx, c, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil {
return err
}
}
}
return c.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{})
}
func getNoExecuteTaints(taints []v1.Taint) []v1.Taint {
result := []v1.Taint{}
for i := range taints {
if taints[i].Effect == v1.TaintEffectNoExecute {
result = append(result, taints[i])
}
}
return result
}
// getMinTolerationTime returns the minimal toleration time from the given slice, or -1 if it's infinite.
func getMinTolerationTime(tolerations []v1.Toleration) time.Duration {
minTolerationTime := int64(math.MaxInt64)
if len(tolerations) == 0 {
return 0
}
for i := range tolerations {
if tolerations[i].TolerationSeconds != nil {
tolerationSeconds := *(tolerations[i].TolerationSeconds)
if tolerationSeconds <= 0 {
return 0
} else if tolerationSeconds < minTolerationTime {
minTolerationTime = tolerationSeconds
}
}
}
if minTolerationTime == int64(math.MaxInt64) {
return -1
}
return time.Duration(minTolerationTime) * time.Second
}
// NewNoExecuteTaintManager creates a new NoExecuteTaintManager that will use the passed clientset to
// communicate with the API server.
func NewNoExecuteTaintManager(ctx context.Context, c clientset.Interface, podLister corelisters.PodLister, nodeLister corelisters.NodeLister, getPodsAssignedToNode GetPodsByNodeNameFunc) *NoExecuteTaintManager {
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "taint-controller"})
tm := &NoExecuteTaintManager{
client: c,
broadcaster: eventBroadcaster,
recorder: recorder,
podLister: podLister,
nodeLister: nodeLister,
getPodsAssignedToNode: getPodsAssignedToNode,
taintedNodes: make(map[string][]v1.Taint),
nodeUpdateQueue: workqueue.NewNamed("noexec_taint_node"),
podUpdateQueue: workqueue.NewNamed("noexec_taint_pod"),
}
tm.taintEvictionQueue = CreateWorkerQueue(deletePodHandler(c, tm.emitPodDeletionEvent))
return tm
}
// Run starts NoExecuteTaintManager, which will run in a loop until `ctx` is cancelled.
func (tc *NoExecuteTaintManager) Run(ctx context.Context) {
defer utilruntime.HandleCrash()
logger := klog.FromContext(ctx)
logger.Info("Starting NoExecuteTaintManager")
// Start events processing pipeline.
tc.broadcaster.StartStructuredLogging(0)
if tc.client != nil {
logger.Info("Sending events to api server")
tc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: tc.client.CoreV1().Events("")})
} else {
logger.Error(nil, "kubeClient is nil when starting NodeController")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
defer tc.broadcaster.Shutdown()
defer tc.nodeUpdateQueue.ShutDown()
defer tc.podUpdateQueue.ShutDown()
for i := 0; i < UpdateWorkerSize; i++ {
tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize))
tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize))
}
// Functions that are responsible for taking work items out of the workqueues and putting them
// into channels.
go func(stopCh <-chan struct{}) {
for {
item, shutdown := tc.nodeUpdateQueue.Get()
if shutdown {
break
}
nodeUpdate := item.(nodeUpdateItem)
hash := hash(nodeUpdate.nodeName, UpdateWorkerSize)
select {
case <-stopCh:
tc.nodeUpdateQueue.Done(item)
return
case tc.nodeUpdateChannels[hash] <- nodeUpdate:
// tc.nodeUpdateQueue.Done is called by the nodeUpdateChannels worker
}
}
}(ctx.Done())
go func(stopCh <-chan struct{}) {
for {
item, shutdown := tc.podUpdateQueue.Get()
if shutdown {
break
}
// The fact that pods are processed by the same worker as nodes is used to avoid races
// between the node worker setting tc.taintedNodes and the pod worker reading it to decide
// whether to delete the pod.
// It's possible that even without this assumption this code is still correct.
podUpdate := item.(podUpdateItem)
hash := hash(podUpdate.nodeName, UpdateWorkerSize)
select {
case <-stopCh:
tc.podUpdateQueue.Done(item)
return
case tc.podUpdateChannels[hash] <- podUpdate:
// tc.podUpdateQueue.Done is called by the podUpdateChannels worker
}
}
}(ctx.Done())
wg := sync.WaitGroup{}
wg.Add(UpdateWorkerSize)
for i := 0; i < UpdateWorkerSize; i++ {
go tc.worker(ctx, i, wg.Done, ctx.Done())
}
wg.Wait()
}
func (tc *NoExecuteTaintManager) worker(ctx context.Context, worker int, done func(), stopCh <-chan struct{}) {
defer done()
// When processing events we want to prioritize Node updates over Pod updates,
// as NodeUpdates that interest the NoExecuteTaintManager should be handled as soon as possible -
// we don't want the user (or system) to wait until the PodUpdate queue is drained before it can
// start evicting Pods from tainted Nodes.
for {
select {
case <-stopCh:
return
case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
tc.handleNodeUpdate(ctx, nodeUpdate)
tc.nodeUpdateQueue.Done(nodeUpdate)
case podUpdate := <-tc.podUpdateChannels[worker]:
// If we found a Pod update we need to empty Node queue first.
priority:
for {
select {
case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
tc.handleNodeUpdate(ctx, nodeUpdate)
tc.nodeUpdateQueue.Done(nodeUpdate)
default:
break priority
}
}
// After Node queue is emptied we process podUpdate.
tc.handlePodUpdate(ctx, podUpdate)
tc.podUpdateQueue.Done(podUpdate)
}
}
}
// PodUpdated is used to notify NoExecuteTaintManager about Pod changes.
func (tc *NoExecuteTaintManager) PodUpdated(oldPod *v1.Pod, newPod *v1.Pod) {
podName := ""
podNamespace := ""
nodeName := ""
oldTolerations := []v1.Toleration{}
if oldPod != nil {
podName = oldPod.Name
podNamespace = oldPod.Namespace
nodeName = oldPod.Spec.NodeName
oldTolerations = oldPod.Spec.Tolerations
}
newTolerations := []v1.Toleration{}
if newPod != nil {
podName = newPod.Name
podNamespace = newPod.Namespace
nodeName = newPod.Spec.NodeName
newTolerations = newPod.Spec.Tolerations
}
if oldPod != nil && newPod != nil && helper.Semantic.DeepEqual(oldTolerations, newTolerations) && oldPod.Spec.NodeName == newPod.Spec.NodeName {
return
}
updateItem := podUpdateItem{
podName: podName,
podNamespace: podNamespace,
nodeName: nodeName,
}
tc.podUpdateQueue.Add(updateItem)
}
// NodeUpdated is used to notify NoExecuteTaintManager about Node changes.
func (tc *NoExecuteTaintManager) NodeUpdated(oldNode *v1.Node, newNode *v1.Node) {
nodeName := ""
oldTaints := []v1.Taint{}
if oldNode != nil {
nodeName = oldNode.Name
oldTaints = getNoExecuteTaints(oldNode.Spec.Taints)
}
newTaints := []v1.Taint{}
if newNode != nil {
nodeName = newNode.Name
newTaints = getNoExecuteTaints(newNode.Spec.Taints)
}
if oldNode != nil && newNode != nil && helper.Semantic.DeepEqual(oldTaints, newTaints) {
return
}
updateItem := nodeUpdateItem{
nodeName: nodeName,
}
tc.nodeUpdateQueue.Add(updateItem)
}
func (tc *NoExecuteTaintManager) cancelWorkWithEvent(logger klog.Logger, nsName types.NamespacedName) {
if tc.taintEvictionQueue.CancelWork(logger, nsName.String()) {
tc.emitCancelPodDeletionEvent(nsName)
}
}
func (tc *NoExecuteTaintManager) processPodOnNode(
ctx context.Context,
podNamespacedName types.NamespacedName,
nodeName string,
tolerations []v1.Toleration,
taints []v1.Taint,
now time.Time,
) {
logger := klog.FromContext(ctx)
if len(taints) == 0 {
tc.cancelWorkWithEvent(logger, podNamespacedName)
}
allTolerated, usedTolerations := v1helper.GetMatchingTolerations(taints, tolerations)
if !allTolerated {
logger.V(2).Info("Not all taints are tolerated after update for pod on node", "pod", podNamespacedName.String(), "node", klog.KRef("", nodeName))
// We're canceling scheduled work (if any), as we're going to delete the Pod right away.
tc.cancelWorkWithEvent(logger, podNamespacedName)
tc.taintEvictionQueue.AddWork(ctx, NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), time.Now(), time.Now())
return
}
minTolerationTime := getMinTolerationTime(usedTolerations)
// getMinTolerationTime returns a negative value to denote infinite toleration.
if minTolerationTime < 0 {
logger.V(4).Info("Current tolerations for pod tolerate forever, cancelling any scheduled deletion", "pod", podNamespacedName.String())
tc.cancelWorkWithEvent(logger, podNamespacedName)
return
}
startTime := now
triggerTime := startTime.Add(minTolerationTime)
scheduledEviction := tc.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String())
if scheduledEviction != nil {
startTime = scheduledEviction.CreatedAt
if startTime.Add(minTolerationTime).Before(triggerTime) {
return
}
tc.cancelWorkWithEvent(logger, podNamespacedName)
}
tc.taintEvictionQueue.AddWork(ctx, NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
}
func (tc *NoExecuteTaintManager) handlePodUpdate(ctx context.Context, podUpdate podUpdateItem) {
pod, err := tc.podLister.Pods(podUpdate.podNamespace).Get(podUpdate.podName)
logger := klog.FromContext(ctx)
if err != nil {
if apierrors.IsNotFound(err) {
// Delete
podNamespacedName := types.NamespacedName{Namespace: podUpdate.podNamespace, Name: podUpdate.podName}
logger.V(4).Info("Noticed pod deletion", "pod", podNamespacedName)
tc.cancelWorkWithEvent(logger, podNamespacedName)
return
}
utilruntime.HandleError(fmt.Errorf("could not get pod %s/%s: %v", podUpdate.podName, podUpdate.podNamespace, err))
return
}
// We key the workqueue and shard workers by nodeName. If we don't match the current state we should not be the one processing the current object.
if pod.Spec.NodeName != podUpdate.nodeName {
return
}
// Create or Update
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
logger.V(4).Info("Noticed pod update", "pod", podNamespacedName)
nodeName := pod.Spec.NodeName
if nodeName == "" {
return
}
taints, ok := func() ([]v1.Taint, bool) {
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
taints, ok := tc.taintedNodes[nodeName]
return taints, ok
}()
// It's possible that the Node was deleted, or its Taints were removed earlier, which already
// triggered eviction cancelling if it was needed.
if !ok {
return
}
tc.processPodOnNode(ctx, podNamespacedName, nodeName, pod.Spec.Tolerations, taints, time.Now())
}
func (tc *NoExecuteTaintManager) handleNodeUpdate(ctx context.Context, nodeUpdate nodeUpdateItem) {
node, err := tc.nodeLister.Get(nodeUpdate.nodeName)
logger := klog.FromContext(ctx)
if err != nil {
if apierrors.IsNotFound(err) {
// Delete
logger.V(4).Info("Noticed node deletion", "node", klog.KRef("", nodeUpdate.nodeName))
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
delete(tc.taintedNodes, nodeUpdate.nodeName)
return
}
utilruntime.HandleError(fmt.Errorf("cannot get node %s: %v", nodeUpdate.nodeName, err))
return
}
// Create or Update
logger.V(4).Info("Noticed node update", "node", klog.KObj(node))
taints := getNoExecuteTaints(node.Spec.Taints)
func() {
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
logger.V(4).Info("Updating known taints on node", "node", klog.KObj(node), "taints", taints)
if len(taints) == 0 {
delete(tc.taintedNodes, node.Name)
} else {
tc.taintedNodes[node.Name] = taints
}
}()
// It is critical that we update tc.taintedNodes before we call getPodsAssignedToNode:
// getPodsAssignedToNode can be delayed as long as all future updates to pods call
// tc.PodUpdated, which will use tc.taintedNodes to potentially delete delayed pods.
pods, err := tc.getPodsAssignedToNode(node.Name)
if err != nil {
logger.Error(err, "Failed to get pods assigned to node", "node", klog.KObj(node))
return
}
if len(pods) == 0 {
return
}
// Short circuit, to make this controller a bit faster.
if len(taints) == 0 {
logger.V(4).Info("All taints were removed from the node. Cancelling all evictions...", "node", klog.KObj(node))
for i := range pods {
tc.cancelWorkWithEvent(logger, types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name})
}
return
}
now := time.Now()
for _, pod := range pods {
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
tc.processPodOnNode(ctx, podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now)
}
}
func (tc *NoExecuteTaintManager) emitPodDeletionEvent(nsName types.NamespacedName) {
if tc.recorder == nil {
return
}
ref := &v1.ObjectReference{
APIVersion: "v1",
Kind: "Pod",
Name: nsName.Name,
Namespace: nsName.Namespace,
}
tc.recorder.Eventf(ref, v1.EventTypeNormal, "TaintManagerEviction", "Marking for deletion Pod %s", nsName.String())
}
func (tc *NoExecuteTaintManager) emitCancelPodDeletionEvent(nsName types.NamespacedName) {
if tc.recorder == nil {
return
}
ref := &v1.ObjectReference{
APIVersion: "v1",
Kind: "Pod",
Name: nsName.Name,
Namespace: nsName.Namespace,
}
tc.recorder.Eventf(ref, v1.EventTypeNormal, "TaintManagerEviction", "Cancelling deletion of Pod %s", nsName.String())
}
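One detail of the deleted manager worth calling out: node and pod updates are sharded to workers by an FNV-32a hash of the node name, so every update for a given node (and for the pods on it) serializes through the same worker goroutine. A self-contained illustration of that sharding scheme (a standalone toy, not controller code):

package main

import (
	"fmt"
	"hash/fnv"
	"io"
)

// shard mirrors hash() above: FNV-32a of the node name, modulo the
// worker count, so a node's updates always land on the same worker.
func shard(nodeName string, workers int) int {
	hasher := fnv.New32a()
	io.WriteString(hasher, nodeName)
	return int(hasher.Sum32() % uint32(workers))
}

func main() {
	for _, name := range []string{"node1", "node2", "node3"} {
		fmt.Printf("%s -> worker %d of 8\n", name, shard(name, 8))
	}
}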

View File

@@ -1,962 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"context"
"fmt"
"sort"
"testing"
"time"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/controller/testutil"
"k8s.io/kubernetes/pkg/features"
)
var timeForControllerToProgressForSanityCheck = 20 * time.Millisecond
func getPodsAssignedToNode(ctx context.Context, c *fake.Clientset) GetPodsByNodeNameFunc {
return func(nodeName string) ([]*v1.Pod, error) {
selector := fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName})
pods, err := c.CoreV1().Pods(v1.NamespaceAll).List(ctx, metav1.ListOptions{
FieldSelector: selector.String(),
LabelSelector: labels.Everything().String(),
})
if err != nil {
return []*v1.Pod{}, fmt.Errorf("failed to get Pods assigned to node %v", nodeName)
}
rPods := make([]*v1.Pod, len(pods.Items))
for i := range pods.Items {
rPods[i] = &pods.Items[i]
}
return rPods, nil
}
}
func createNoExecuteTaint(index int) v1.Taint {
now := metav1.Now()
return v1.Taint{
Key: "testTaint" + fmt.Sprintf("%v", index),
Value: "test" + fmt.Sprintf("%v", index),
Effect: v1.TaintEffectNoExecute,
TimeAdded: &now,
}
}
func addToleration(pod *v1.Pod, index int, duration int64) *v1.Pod {
if pod.Annotations == nil {
pod.Annotations = map[string]string{}
}
if duration < 0 {
pod.Spec.Tolerations = []v1.Toleration{{Key: "testTaint" + fmt.Sprintf("%v", index), Value: "test" + fmt.Sprintf("%v", index), Effect: v1.TaintEffectNoExecute}}
} else {
pod.Spec.Tolerations = []v1.Toleration{{Key: "testTaint" + fmt.Sprintf("%v", index), Value: "test" + fmt.Sprintf("%v", index), Effect: v1.TaintEffectNoExecute, TolerationSeconds: &duration}}
}
return pod
}
func addTaintsToNode(node *v1.Node, key, value string, indices []int) *v1.Node {
taints := []v1.Taint{}
for _, index := range indices {
taints = append(taints, createNoExecuteTaint(index))
}
node.Spec.Taints = taints
return node
}
func setupNewNoExecuteTaintManager(ctx context.Context, fakeClientSet *fake.Clientset) (*NoExecuteTaintManager, cache.Indexer, cache.Indexer) {
informerFactory := informers.NewSharedInformerFactory(fakeClientSet, 0)
podIndexer := informerFactory.Core().V1().Pods().Informer().GetIndexer()
nodeIndexer := informerFactory.Core().V1().Nodes().Informer().GetIndexer()
mgr := NewNoExecuteTaintManager(ctx, fakeClientSet, informerFactory.Core().V1().Pods().Lister(), informerFactory.Core().V1().Nodes().Lister(), getPodsAssignedToNode(ctx, fakeClientSet))
return mgr, podIndexer, nodeIndexer
}
type timestampedPod struct {
names []string
timestamp time.Duration
}
type durationSlice []timestampedPod
func (a durationSlice) Len() int { return len(a) }
func (a durationSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a durationSlice) Less(i, j int) bool { return a[i].timestamp < a[j].timestamp }
func TestFilterNoExecuteTaints(t *testing.T) {
taints := []v1.Taint{
{
Key: "one",
Value: "one",
Effect: v1.TaintEffectNoExecute,
},
{
Key: "two",
Value: "two",
Effect: v1.TaintEffectNoSchedule,
},
}
taints = getNoExecuteTaints(taints)
if len(taints) != 1 || taints[0].Key != "one" {
t.Errorf("Filtering doesn't work. Got %v", taints)
}
}
func TestCreatePod(t *testing.T) {
testCases := []struct {
description string
pod *v1.Pod
taintedNodes map[string][]v1.Taint
expectPatch bool
expectDelete bool
enablePodDisruptionConditions bool
}{
{
description: "not scheduled - ignore",
pod: testutil.NewPod("pod1", ""),
taintedNodes: map[string][]v1.Taint{},
expectDelete: false,
},
{
description: "scheduled on untainted Node",
pod: testutil.NewPod("pod1", "node1"),
taintedNodes: map[string][]v1.Taint{},
expectDelete: false,
},
{
description: "schedule on tainted Node",
pod: testutil.NewPod("pod1", "node1"),
taintedNodes: map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectDelete: true,
},
{
description: "schedule on tainted Node; PodDisruptionConditions enabled",
pod: testutil.NewPod("pod1", "node1"),
taintedNodes: map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectPatch: true,
expectDelete: true,
enablePodDisruptionConditions: true,
},
{
description: "schedule on tainted Node with finite toleration",
pod: addToleration(testutil.NewPod("pod1", "node1"), 1, 100),
taintedNodes: map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectDelete: false,
},
{
description: "schedule on tainted Node with infinite toleration",
pod: addToleration(testutil.NewPod("pod1", "node1"), 1, -1),
taintedNodes: map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectDelete: false,
},
{
description: "schedule on tainted Node with infinite ivalid toleration",
pod: addToleration(testutil.NewPod("pod1", "node1"), 2, -1),
taintedNodes: map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectDelete: true,
},
}
for _, item := range testCases {
t.Run(item.description, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, item.enablePodDisruptionConditions)()
ctx, cancel := context.WithCancel(context.Background())
fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*item.pod}})
controller, podIndexer, _ := setupNewNoExecuteTaintManager(ctx, fakeClientset)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(ctx)
controller.taintedNodes = item.taintedNodes
podIndexer.Add(item.pod)
controller.PodUpdated(nil, item.pod)
verifyPodActions(t, item.description, fakeClientset, item.expectPatch, item.expectDelete)
cancel()
})
}
}
func TestDeletePod(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fakeClientset := fake.NewSimpleClientset()
controller, _, _ := setupNewNoExecuteTaintManager(ctx, fakeClientset)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(ctx)
controller.taintedNodes = map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)},
}
controller.PodUpdated(testutil.NewPod("pod1", "node1"), nil)
// wait a bit to make sure nothing panics
time.Sleep(timeForControllerToProgressForSanityCheck)
}
func TestUpdatePod(t *testing.T) {
testCases := []struct {
description string
prevPod *v1.Pod
awaitForScheduledEviction bool
newPod *v1.Pod
taintedNodes map[string][]v1.Taint
expectPatch bool
expectDelete bool
enablePodDisruptionConditions bool
}{
{
description: "scheduling onto tainted Node results in patch and delete when PodDisruptionConditions enabled",
prevPod: testutil.NewPod("pod1", ""),
newPod: testutil.NewPod("pod1", "node1"),
taintedNodes: map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectPatch: true,
expectDelete: true,
enablePodDisruptionConditions: true,
},
{
description: "scheduling onto tainted Node",
prevPod: testutil.NewPod("pod1", ""),
newPod: testutil.NewPod("pod1", "node1"),
taintedNodes: map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectDelete: true,
},
{
description: "scheduling onto tainted Node with toleration",
prevPod: addToleration(testutil.NewPod("pod1", ""), 1, -1),
newPod: addToleration(testutil.NewPod("pod1", "node1"), 1, -1),
taintedNodes: map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectDelete: false,
},
{
description: "removing toleration",
prevPod: addToleration(testutil.NewPod("pod1", "node1"), 1, 100),
newPod: testutil.NewPod("pod1", "node1"),
awaitForScheduledEviction: true,
taintedNodes: map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectDelete: true,
},
{
description: "lengthening toleration shouldn't work",
prevPod: addToleration(testutil.NewPod("pod1", "node1"), 1, 1),
newPod: addToleration(testutil.NewPod("pod1", "node1"), 1, 100),
awaitForScheduledEviction: true,
taintedNodes: map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectDelete: true,
},
}
for _, item := range testCases {
t.Run(item.description, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, item.enablePodDisruptionConditions)()
ctx, cancel := context.WithCancel(context.Background())
fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*item.prevPod}})
controller, podIndexer, _ := setupNewNoExecuteTaintManager(ctx, fakeClientset)
controller.recorder = testutil.NewFakeRecorder()
controller.taintedNodes = item.taintedNodes
go controller.Run(ctx)
podIndexer.Add(item.prevPod)
controller.PodUpdated(nil, item.prevPod)
if item.awaitForScheduledEviction {
nsName := types.NamespacedName{Namespace: item.prevPod.Namespace, Name: item.prevPod.Name}
err := wait.PollImmediate(time.Millisecond*10, time.Second, func() (bool, error) {
scheduledEviction := controller.taintEvictionQueue.GetWorkerUnsafe(nsName.String())
return scheduledEviction != nil, nil
})
if err != nil {
t.Fatalf("Failed to await for scheduled eviction: %q", err)
}
}
podIndexer.Update(item.newPod)
controller.PodUpdated(item.prevPod, item.newPod)
verifyPodActions(t, item.description, fakeClientset, item.expectPatch, item.expectDelete)
cancel()
})
}
}
func TestCreateNode(t *testing.T) {
testCases := []struct {
description string
pods []v1.Pod
node *v1.Node
expectPatch bool
expectDelete bool
}{
{
description: "Creating Node matching already assigned Pod",
pods: []v1.Pod{
*testutil.NewPod("pod1", "node1"),
},
node: testutil.NewNode("node1"),
expectPatch: false,
expectDelete: false,
},
{
description: "Creating tainted Node matching already assigned Pod",
pods: []v1.Pod{
*testutil.NewPod("pod1", "node1"),
},
node: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectPatch: true,
expectDelete: true,
},
{
description: "Creating tainted Node matching already assigned tolerating Pod",
pods: []v1.Pod{
*addToleration(testutil.NewPod("pod1", "node1"), 1, -1),
},
node: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectPatch: false,
expectDelete: false,
},
}
for _, item := range testCases {
ctx, cancel := context.WithCancel(context.Background())
fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods})
controller, _, nodeIndexer := setupNewNoExecuteTaintManager(ctx, fakeClientset)
nodeIndexer.Add(item.node)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(ctx)
controller.NodeUpdated(nil, item.node)
verifyPodActions(t, item.description, fakeClientset, item.expectPatch, item.expectDelete)
cancel()
}
}
func TestDeleteNode(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
fakeClientset := fake.NewSimpleClientset()
controller, _, _ := setupNewNoExecuteTaintManager(ctx, fakeClientset)
controller.recorder = testutil.NewFakeRecorder()
controller.taintedNodes = map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)},
}
go controller.Run(ctx)
controller.NodeUpdated(testutil.NewNode("node1"), nil)
// await until controller.taintedNodes is empty
err := wait.PollImmediate(10*time.Millisecond, time.Second, func() (bool, error) {
controller.taintedNodesLock.Lock()
defer controller.taintedNodesLock.Unlock()
_, ok := controller.taintedNodes["node1"]
return !ok, nil
})
if err != nil {
t.Errorf("Failed to await for processing node deleted: %q", err)
}
cancel()
}
func TestUpdateNode(t *testing.T) {
testCases := []struct {
description string
pods []v1.Pod
oldNode *v1.Node
newNode *v1.Node
expectPatch bool
expectDelete bool
additionalSleep time.Duration
enablePodDisruptionConditions bool
}{
{
description: "Added taint, expect node patched and deleted when PodDisruptionConditions is enabled",
pods: []v1.Pod{
*testutil.NewPod("pod1", "node1"),
},
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectPatch: true,
expectDelete: true,
enablePodDisruptionConditions: true,
},
{
description: "Added taint",
pods: []v1.Pod{
*testutil.NewPod("pod1", "node1"),
},
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectDelete: true,
},
{
description: "Added tolerated taint",
pods: []v1.Pod{
*addToleration(testutil.NewPod("pod1", "node1"), 1, 100),
},
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectDelete: false,
},
{
description: "Only one added taint tolerated",
pods: []v1.Pod{
*addToleration(testutil.NewPod("pod1", "node1"), 1, 100),
},
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1, 2}),
expectDelete: true,
},
{
description: "Taint removed",
pods: []v1.Pod{
*addToleration(testutil.NewPod("pod1", "node1"), 1, 1),
},
oldNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
newNode: testutil.NewNode("node1"),
expectDelete: false,
additionalSleep: 1500 * time.Millisecond,
},
{
description: "Pod with multiple tolerations are evicted when first one runs out",
pods: []v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "pod1",
},
Spec: v1.PodSpec{
NodeName: "node1",
Tolerations: []v1.Toleration{
{Key: "testTaint1", Value: "test1", Effect: v1.TaintEffectNoExecute, TolerationSeconds: &[]int64{1}[0]},
{Key: "testTaint2", Value: "test2", Effect: v1.TaintEffectNoExecute, TolerationSeconds: &[]int64{100}[0]},
},
},
Status: v1.PodStatus{
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
},
},
},
},
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1, 2}),
expectDelete: true,
},
}
for _, item := range testCases {
t.Run(item.description, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, item.enablePodDisruptionConditions)()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods})
controller, _, nodeIndexer := setupNewNoExecuteTaintManager(ctx, fakeClientset)
nodeIndexer.Add(item.newNode)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(ctx)
controller.NodeUpdated(item.oldNode, item.newNode)
if item.additionalSleep > 0 {
time.Sleep(item.additionalSleep)
}
verifyPodActions(t, item.description, fakeClientset, item.expectPatch, item.expectDelete)
})
}
}
func TestUpdateNodeWithMultipleTaints(t *testing.T) {
taint1 := createNoExecuteTaint(1)
taint2 := createNoExecuteTaint(2)
minute := int64(60)
pod := testutil.NewPod("pod1", "node1")
pod.Spec.Tolerations = []v1.Toleration{
{Key: taint1.Key, Operator: v1.TolerationOpExists, Effect: v1.TaintEffectNoExecute},
{Key: taint2.Key, Operator: v1.TolerationOpExists, Effect: v1.TaintEffectNoExecute, TolerationSeconds: &minute},
}
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
untaintedNode := testutil.NewNode("node1")
doubleTaintedNode := testutil.NewNode("node1")
doubleTaintedNode.Spec.Taints = []v1.Taint{taint1, taint2}
singleTaintedNode := testutil.NewNode("node1")
singleTaintedNode.Spec.Taints = []v1.Taint{taint1}
ctx, cancel := context.WithCancel(context.TODO())
fakeClientset := fake.NewSimpleClientset(pod)
controller, _, nodeIndexer := setupNewNoExecuteTaintManager(ctx, fakeClientset)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(ctx)
// no taint
nodeIndexer.Add(untaintedNode)
controller.handleNodeUpdate(ctx, nodeUpdateItem{"node1"})
// verify pod is not queued for deletion
if controller.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String()) != nil {
t.Fatalf("pod queued for deletion with no taints")
}
// no taint -> infinitely tolerated taint
nodeIndexer.Update(singleTaintedNode)
controller.handleNodeUpdate(ctx, nodeUpdateItem{"node1"})
// verify pod is not queued for deletion
if controller.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String()) != nil {
t.Fatalf("pod queued for deletion with permanently tolerated taint")
}
// infinitely tolerated taint -> temporarily tolerated taint
nodeIndexer.Update(doubleTaintedNode)
controller.handleNodeUpdate(ctx, nodeUpdateItem{"node1"})
// verify pod is queued for deletion
if controller.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String()) == nil {
t.Fatalf("pod not queued for deletion after addition of temporarily tolerated taint")
}
// temporarily tolerated taint -> infinitely tolerated taint
nodeIndexer.Update(singleTaintedNode)
controller.handleNodeUpdate(ctx, nodeUpdateItem{"node1"})
// verify pod is not queued for deletion
if controller.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String()) != nil {
t.Fatalf("pod queued for deletion after removal of temporarily tolerated taint")
}
// verify pod is not deleted
for _, action := range fakeClientset.Actions() {
if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" {
t.Error("Unexpected deletion")
}
}
cancel()
}
func TestUpdateNodeWithMultiplePods(t *testing.T) {
testCases := []struct {
description string
pods []v1.Pod
oldNode *v1.Node
newNode *v1.Node
expectedDeleteTimes durationSlice
}{
{
description: "Pods with different toleration times are evicted appropriately",
pods: []v1.Pod{
*testutil.NewPod("pod1", "node1"),
*addToleration(testutil.NewPod("pod2", "node1"), 1, 1),
*addToleration(testutil.NewPod("pod3", "node1"), 1, -1),
},
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectedDeleteTimes: durationSlice{
{[]string{"pod1"}, 0},
{[]string{"pod2"}, time.Second},
},
},
{
description: "Evict all pods not matching all taints instantly",
pods: []v1.Pod{
*testutil.NewPod("pod1", "node1"),
*addToleration(testutil.NewPod("pod2", "node1"), 1, 1),
*addToleration(testutil.NewPod("pod3", "node1"), 1, -1),
},
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1, 2}),
expectedDeleteTimes: durationSlice{
{[]string{"pod1", "pod2", "pod3"}, 0},
},
},
}
for _, item := range testCases {
t.Run(item.description, func(t *testing.T) {
t.Logf("Starting testcase %q", item.description)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods})
sort.Sort(item.expectedDeleteTimes)
controller, _, nodeIndexer := setupNewNoExecuteTaintManager(ctx, fakeClientset)
nodeIndexer.Add(item.newNode)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(ctx)
controller.NodeUpdated(item.oldNode, item.newNode)
startedAt := time.Now()
for i := range item.expectedDeleteTimes {
if i == 0 || item.expectedDeleteTimes[i-1].timestamp != item.expectedDeleteTimes[i].timestamp {
// compute a grace duration to give controller time to process updates. Choose big
// enough intervals in the test cases above to avoid flakes.
var increment time.Duration
if i == len(item.expectedDeleteTimes)-1 || item.expectedDeleteTimes[i+1].timestamp == item.expectedDeleteTimes[i].timestamp {
increment = 500 * time.Millisecond
} else {
increment = ((item.expectedDeleteTimes[i+1].timestamp - item.expectedDeleteTimes[i].timestamp) / time.Duration(2))
}
sleepTime := item.expectedDeleteTimes[i].timestamp - time.Since(startedAt) + increment
if sleepTime < 0 {
sleepTime = 0
}
t.Logf("Sleeping for %v", sleepTime)
time.Sleep(sleepTime)
}
for delay, podName := range item.expectedDeleteTimes[i].names {
deleted := false
for _, action := range fakeClientset.Actions() {
deleteAction, ok := action.(clienttesting.DeleteActionImpl)
if !ok {
t.Logf("Found not-delete action with verb %v. Ignoring.", action.GetVerb())
continue
}
if deleteAction.GetResource().Resource != "pods" {
continue
}
if podName == deleteAction.GetName() {
deleted = true
}
}
if !deleted {
t.Errorf("Failed to deleted pod %v after %v", podName, delay)
}
}
for _, action := range fakeClientset.Actions() {
deleteAction, ok := action.(clienttesting.DeleteActionImpl)
if !ok {
t.Logf("Found not-delete action with verb %v. Ignoring.", action.GetVerb())
continue
}
if deleteAction.GetResource().Resource != "pods" {
continue
}
deletedPodName := deleteAction.GetName()
expected := false
for _, podName := range item.expectedDeleteTimes[i].names {
if podName == deletedPodName {
expected = true
}
}
if !expected {
t.Errorf("Pod %v was deleted even though it shouldn't have", deletedPodName)
}
}
fakeClientset.ClearActions()
}
})
}
}
func TestGetMinTolerationTime(t *testing.T) {
one := int64(1)
two := int64(2)
oneSec := 1 * time.Second
tests := []struct {
tolerations []v1.Toleration
expected time.Duration
}{
{
tolerations: []v1.Toleration{},
expected: 0,
},
{
tolerations: []v1.Toleration{
{
TolerationSeconds: nil,
},
},
expected: -1,
},
{
tolerations: []v1.Toleration{
{
TolerationSeconds: &one,
},
{
TolerationSeconds: &two,
},
},
expected: oneSec,
},
{
tolerations: []v1.Toleration{
{
TolerationSeconds: &one,
},
{
TolerationSeconds: nil,
},
},
expected: oneSec,
},
{
tolerations: []v1.Toleration{
{
TolerationSeconds: nil,
},
{
TolerationSeconds: &one,
},
},
expected: oneSec,
},
}
for _, test := range tests {
got := getMinTolerationTime(test.tolerations)
if got != test.expected {
t.Errorf("Incorrect min toleration time: got %v, expected %v", got, test.expected)
}
}
}
// TestEventualConsistency verifies that if getPodsAssignedToNode returns incomplete data
// (e.g. due to watch latency), the remaining pods will eventually be reconciled.
// This scenario is partially covered by TestUpdatePod, but given that this is an important
// property of the TaintManager, it's better to have an explicit test for it.
func TestEventualConsistency(t *testing.T) {
testCases := []struct {
description string
pods []v1.Pod
prevPod *v1.Pod
newPod *v1.Pod
oldNode *v1.Node
newNode *v1.Node
expectPatch bool
expectDelete bool
}{
{
description: "existing pod2 scheduled onto tainted Node",
pods: []v1.Pod{
*testutil.NewPod("pod1", "node1"),
},
prevPod: testutil.NewPod("pod2", ""),
newPod: testutil.NewPod("pod2", "node1"),
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectPatch: true,
expectDelete: true,
},
{
description: "existing pod2 with taint toleration scheduled onto tainted Node",
pods: []v1.Pod{
*testutil.NewPod("pod1", "node1"),
},
prevPod: addToleration(testutil.NewPod("pod2", ""), 1, 100),
newPod: addToleration(testutil.NewPod("pod2", "node1"), 1, 100),
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectPatch: true,
expectDelete: true,
},
{
description: "new pod2 created on tainted Node",
pods: []v1.Pod{
*testutil.NewPod("pod1", "node1"),
},
prevPod: nil,
newPod: testutil.NewPod("pod2", "node1"),
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectPatch: true,
expectDelete: true,
},
{
description: "new pod2 with tait toleration created on tainted Node",
pods: []v1.Pod{
*testutil.NewPod("pod1", "node1"),
},
prevPod: nil,
newPod: addToleration(testutil.NewPod("pod2", "node1"), 1, 100),
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectPatch: true,
expectDelete: true,
},
}
for _, item := range testCases {
t.Run(item.description, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods})
controller, podIndexer, nodeIndexer := setupNewNoExecuteTaintManager(ctx, fakeClientset)
nodeIndexer.Add(item.newNode)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(ctx)
if item.prevPod != nil {
podIndexer.Add(item.prevPod)
controller.PodUpdated(nil, item.prevPod)
}
// First we simulate NodeUpdate that should delete 'pod1'. It doesn't know about 'pod2' yet.
controller.NodeUpdated(item.oldNode, item.newNode)
verifyPodActions(t, item.description, fakeClientset, item.expectPatch, item.expectDelete)
fakeClientset.ClearActions()
// And now the delayed update of 'pod2' comes to the TaintManager. We should delete it as well.
podIndexer.Update(item.newPod)
controller.PodUpdated(item.prevPod, item.newPod)
// wait a bit
time.Sleep(timeForControllerToProgressForSanityCheck)
})
}
}
func verifyPodActions(t *testing.T, description string, fakeClientset *fake.Clientset, expectPatch, expectDelete bool) {
t.Helper()
podPatched := false
podDeleted := false
// use Poll instead of PollImmediate to give the controller some processing time, so that the
// expected actions are likely to have already been sent
err := wait.Poll(10*time.Millisecond, 5*time.Second, func() (bool, error) {
for _, action := range fakeClientset.Actions() {
if action.GetVerb() == "patch" && action.GetResource().Resource == "pods" {
podPatched = true
}
if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" {
podDeleted = true
}
}
return podPatched == expectPatch && podDeleted == expectDelete, nil
})
if err != nil {
t.Errorf("Failed waiting for the expected actions: %q", err)
}
if podPatched != expectPatch {
t.Errorf("[%v] Unexpected test result. Expected patch %v, got %v", description, expectPatch, podPatched)
}
if podDeleted != expectDelete {
t.Errorf("[%v] Unexpected test result. Expected delete %v, got %v", description, expectDelete, podDeleted)
}
}
// TestPodDeletionEvent verifies that the output events are as expected.
func TestPodDeletionEvent(t *testing.T) {
f := func(path cmp.Path) bool {
switch path.String() {
// These fields change at runtime, so ignore them
case "LastTimestamp", "FirstTimestamp", "ObjectMeta.Name":
return true
}
return false
}
t.Run("emitPodDeletionEvent", func(t *testing.T) {
controller := &NoExecuteTaintManager{}
recorder := testutil.NewFakeRecorder()
controller.recorder = recorder
controller.emitPodDeletionEvent(types.NamespacedName{
Name: "test",
Namespace: "test",
})
want := []*v1.Event{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test",
},
InvolvedObject: v1.ObjectReference{
Kind: "Pod",
APIVersion: "v1",
Namespace: "test",
Name: "test",
},
Reason: "TaintManagerEviction",
Type: "Normal",
Count: 1,
Message: "Marking for deletion Pod test/test",
Source: v1.EventSource{Component: "nodeControllerTest"},
},
}
if diff := cmp.Diff(want, recorder.Events, cmp.FilterPath(f, cmp.Ignore())); len(diff) > 0 {
t.Errorf("emitPodDeletionEvent() returned data (-want,+got):\n%s", diff)
}
})
t.Run("emitCancelPodDeletionEvent", func(t *testing.T) {
controller := &NoExecuteTaintManager{}
recorder := testutil.NewFakeRecorder()
controller.recorder = recorder
controller.emitCancelPodDeletionEvent(types.NamespacedName{
Name: "test",
Namespace: "test",
})
want := []*v1.Event{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test",
},
InvolvedObject: v1.ObjectReference{
Kind: "Pod",
APIVersion: "v1",
Namespace: "test",
Name: "test",
},
Reason: "TaintManagerEviction",
Type: "Normal",
Count: 1,
Message: "Cancelling deletion of Pod test/test",
Source: v1.EventSource{Component: "nodeControllerTest"},
},
}
if diff := cmp.Diff(want, recorder.Events, cmp.FilterPath(f, cmp.Ignore())); len(diff) > 0 {
t.Errorf("emitPodDeletionEvent() returned data (-want,+got):\n%s", diff)
}
})
}
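TestGetMinTolerationTime above pins down the eviction-delay contract: no tolerations means evict immediately (0), only infinite tolerations means never (-1), and otherwise the shortest finite TolerationSeconds wins. A standalone restatement of that logic over plain *int64 seconds (an illustrative simplification, not the controller's v1.Toleration type):

package main

import (
	"fmt"
	"math"
	"time"
)

// minTolerationTime reimplements getMinTolerationTime's contract for
// illustration: 0 means evict now, a negative value means tolerate forever.
func minTolerationTime(tolerationSeconds []*int64) time.Duration {
	if len(tolerationSeconds) == 0 {
		return 0
	}
	min := int64(math.MaxInt64)
	for _, s := range tolerationSeconds {
		if s == nil {
			continue // a nil TolerationSeconds tolerates forever
		}
		if *s <= 0 {
			return 0
		}
		if *s < min {
			min = *s
		}
	}
	if min == int64(math.MaxInt64) {
		return -1
	}
	return time.Duration(min) * time.Second
}

func main() {
	one, hundred := int64(1), int64(100)
	fmt.Println(minTolerationTime(nil))                      // 0s: evict immediately
	fmt.Println(minTolerationTime([]*int64{nil}))            // -1ns: tolerate forever
	fmt.Println(minTolerationTime([]*int64{&one, &hundred})) // 1s: shortest finite toleration wins
}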

View File

@@ -1,156 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"context"
"sync"
"time"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
)
// WorkArgs keeps arguments that will be passed to the function executed by the worker.
type WorkArgs struct {
NamespacedName types.NamespacedName
}
// KeyFromWorkArgs creates a key for the given `WorkArgs`.
func (w *WorkArgs) KeyFromWorkArgs() string {
return w.NamespacedName.String()
}
// NewWorkArgs is a helper function to create new `WorkArgs`.
func NewWorkArgs(name, namespace string) *WorkArgs {
return &WorkArgs{types.NamespacedName{Namespace: namespace, Name: name}}
}
// TimedWorker is responsible for executing a function no earlier than its FireAt time.
type TimedWorker struct {
WorkItem *WorkArgs
CreatedAt time.Time
FireAt time.Time
Timer clock.Timer
}
// createWorker creates a TimedWorker that will execute `f` no earlier than `fireAt`.
func createWorker(ctx context.Context, args *WorkArgs, createdAt time.Time, fireAt time.Time, f func(ctx context.Context, args *WorkArgs) error, clock clock.WithDelayedExecution) *TimedWorker {
delay := fireAt.Sub(createdAt)
logger := klog.FromContext(ctx)
fWithErrorLogging := func() {
err := f(ctx, args)
if err != nil {
logger.Error(err, "NodeLifecycle: timed worker failed")
}
}
if delay <= 0 {
go fWithErrorLogging()
return nil
}
timer := clock.AfterFunc(delay, fWithErrorLogging)
return &TimedWorker{
WorkItem: args,
CreatedAt: createdAt,
FireAt: fireAt,
Timer: timer,
}
}
// Cancel cancels the execution of the function by the `TimedWorker`.
func (w *TimedWorker) Cancel() {
if w != nil {
w.Timer.Stop()
}
}
// TimedWorkerQueue keeps a set of TimedWorkers that are still waiting for execution.
type TimedWorkerQueue struct {
sync.Mutex
// map of workers keyed by the string returned by 'KeyFromWorkArgs' for the given worker.
workers map[string]*TimedWorker
workFunc func(ctx context.Context, args *WorkArgs) error
clock clock.WithDelayedExecution
}
// CreateWorkerQueue creates a new TimedWorkerQueue for workers that will execute
// given function `f`.
func CreateWorkerQueue(f func(ctx context.Context, args *WorkArgs) error) *TimedWorkerQueue {
return &TimedWorkerQueue{
workers: make(map[string]*TimedWorker),
workFunc: f,
clock: clock.RealClock{},
}
}
func (q *TimedWorkerQueue) getWrappedWorkerFunc(key string) func(ctx context.Context, args *WorkArgs) error {
return func(ctx context.Context, args *WorkArgs) error {
err := q.workFunc(ctx, args)
q.Lock()
defer q.Unlock()
if err == nil {
// To avoid duplicated calls we keep the key in the queue, to prevent
// subsequent additions.
q.workers[key] = nil
} else {
delete(q.workers, key)
}
return err
}
}
// AddWork adds work to the TimedWorkerQueue, to be executed no earlier than `fireAt`.
func (q *TimedWorkerQueue) AddWork(ctx context.Context, args *WorkArgs, createdAt time.Time, fireAt time.Time) {
key := args.KeyFromWorkArgs()
logger := klog.FromContext(ctx)
logger.V(4).Info("Adding TimedWorkerQueue item and to be fired at firedTime", "item", key, "createTime", createdAt, "firedTime", fireAt)
q.Lock()
defer q.Unlock()
if _, exists := q.workers[key]; exists {
logger.Info("Trying to add already existing work, skipping", "args", args)
return
}
worker := createWorker(ctx, args, createdAt, fireAt, q.getWrappedWorkerFunc(key), q.clock)
q.workers[key] = worker
}
// CancelWork removes scheduled function execution from the queue. Returns true if work was cancelled.
func (q *TimedWorkerQueue) CancelWork(logger klog.Logger, key string) bool {
q.Lock()
defer q.Unlock()
worker, found := q.workers[key]
result := false
if found {
logger.V(4).Info("Cancelling TimedWorkerQueue item", "item", key, "time", time.Now())
if worker != nil {
result = true
worker.Cancel()
}
delete(q.workers, key)
}
return result
}
// GetWorkerUnsafe returns a TimedWorker corresponding to the given key.
// Unsafe method - workers have attached goroutines which can fire after this function is called.
func (q *TimedWorkerQueue) GetWorkerUnsafe(key string) *TimedWorker {
q.Lock()
defer q.Unlock()
return q.workers[key]
}
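A short in-package sketch of how the queue above is meant to be used, and of its deduplication semantics (hypothetical caller code, assuming it sits next to the types above in package scheduler):

func exampleQueueUsage(ctx context.Context) {
	logger := klog.FromContext(ctx)
	q := CreateWorkerQueue(func(ctx context.Context, args *WorkArgs) error {
		klog.FromContext(ctx).Info("work fired", "item", args.KeyFromWorkArgs())
		return nil
	})
	now := time.Now()
	// Schedule work to fire no earlier than five seconds from now.
	q.AddWork(ctx, NewWorkArgs("pod1", "ns1"), now, now.Add(5*time.Second))
	// Re-adding the same key is skipped while the first entry exists...
	q.AddWork(ctx, NewWorkArgs("pod1", "ns1"), now, now.Add(time.Minute))
	// ...and after the work function returns nil, the key stays mapped to nil,
	// so later AddWork calls for it remain no-ops until CancelWork removes it.
	q.CancelWork(logger, NewWorkArgs("pod1", "ns1").KeyFromWorkArgs())
}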

View File

@@ -1,156 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"context"
"sync"
"sync/atomic"
"testing"
"time"
"k8s.io/klog/v2/ktesting"
testingclock "k8s.io/utils/clock/testing"
)
func TestExecute(t *testing.T) {
testVal := int32(0)
wg := sync.WaitGroup{}
wg.Add(5)
queue := CreateWorkerQueue(func(ctx context.Context, args *WorkArgs) error {
atomic.AddInt32(&testVal, 1)
wg.Done()
return nil
})
now := time.Now()
queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, now)
queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, now)
queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, now)
queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, now)
queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, now)
// Adding the same item a second time should be a no-op
queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, now)
queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, now)
queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, now)
queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, now)
queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, now)
wg.Wait()
lastVal := atomic.LoadInt32(&testVal)
if lastVal != 5 {
t.Errorf("Expected testVal = 5, got %v", lastVal)
}
}
func TestExecuteDelayed(t *testing.T) {
testVal := int32(0)
wg := sync.WaitGroup{}
wg.Add(5)
queue := CreateWorkerQueue(func(ctx context.Context, args *WorkArgs) error {
atomic.AddInt32(&testVal, 1)
wg.Done()
return nil
})
now := time.Now()
then := now.Add(10 * time.Second)
fakeClock := testingclock.NewFakeClock(now)
queue.clock = fakeClock
queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, then)
fakeClock.Step(11 * time.Second)
wg.Wait()
lastVal := atomic.LoadInt32(&testVal)
if lastVal != 5 {
t.Errorf("Expected testVal = 5, got %v", lastVal)
}
}
func TestCancel(t *testing.T) {
testVal := int32(0)
wg := sync.WaitGroup{}
wg.Add(3)
queue := CreateWorkerQueue(func(ctx context.Context, args *WorkArgs) error {
atomic.AddInt32(&testVal, 1)
wg.Done()
return nil
})
now := time.Now()
then := now.Add(10 * time.Second)
fakeClock := testingclock.NewFakeClock(now)
queue.clock = fakeClock
queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, then)
logger, _ := ktesting.NewTestContext(t)
queue.CancelWork(logger, NewWorkArgs("2", "2").KeyFromWorkArgs())
queue.CancelWork(logger, NewWorkArgs("4", "4").KeyFromWorkArgs())
fakeClock.Step(11 * time.Second)
wg.Wait()
lastVal := atomic.LoadInt32(&testVal)
if lastVal != 3 {
t.Errorf("Expected testVal = 3, got %v", lastVal)
}
}
func TestCancelAndReadd(t *testing.T) {
testVal := int32(0)
wg := sync.WaitGroup{}
wg.Add(4)
queue := CreateWorkerQueue(func(ctx context.Context, args *WorkArgs) error {
atomic.AddInt32(&testVal, 1)
wg.Done()
return nil
})
now := time.Now()
then := now.Add(10 * time.Second)
fakeClock := testingclock.NewFakeClock(now)
queue.clock = fakeClock
queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, then)
logger, _ := ktesting.NewTestContext(t)
queue.CancelWork(logger, NewWorkArgs("2", "2").KeyFromWorkArgs())
queue.CancelWork(logger, NewWorkArgs("4", "4").KeyFromWorkArgs())
queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, then)
fakeClock.Step(11 * time.Second)
wg.Wait()
lastVal := atomic.LoadInt32(&testVal)
if lastVal != 4 {
t.Errorf("Expected testVal = 4, got %v", lastVal)
}
}