Merge pull request #67864 from k82cn/k8s_67823
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions here: https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md. Taint node in paralle. **Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*: Fixes #67823 **Release note**: ```release-note None ```
This commit is contained in:
		| @@ -36,6 +36,7 @@ go_library( | ||||
|         "//staging/src/k8s.io/client-go/tools/cache:go_default_library", | ||||
|         "//staging/src/k8s.io/client-go/tools/record:go_default_library", | ||||
|         "//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library", | ||||
|         "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", | ||||
|         "//vendor/github.com/golang/glog:go_default_library", | ||||
|         "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library", | ||||
|     ], | ||||
|   | ||||
| @@ -24,6 +24,8 @@ package nodelifecycle | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"hash/fnv" | ||||
| 	"io" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| @@ -46,6 +48,7 @@ import ( | ||||
| 	"k8s.io/client-go/tools/cache" | ||||
| 	"k8s.io/client-go/tools/record" | ||||
| 	"k8s.io/client-go/util/flowcontrol" | ||||
| 	"k8s.io/client-go/util/workqueue" | ||||
| 	v1node "k8s.io/kubernetes/pkg/api/v1/node" | ||||
| 	"k8s.io/kubernetes/pkg/cloudprovider" | ||||
| 	"k8s.io/kubernetes/pkg/controller" | ||||
| @@ -218,6 +221,9 @@ type Controller struct { | ||||
| 	// if set to true, NodeController will taint Nodes based on its condition for 'NetworkUnavailable', | ||||
| 	// 'MemoryPressure', 'OutOfDisk' and 'DiskPressure'. | ||||
| 	taintNodeByCondition bool | ||||
|  | ||||
| 	nodeUpdateChannels []chan *v1.Node | ||||
| 	nodeUpdateQueue    workqueue.Interface | ||||
| } | ||||
|  | ||||
| // NewNodeLifecycleController returns a new taint controller. | ||||
| @@ -276,6 +282,7 @@ func NewNodeLifecycleController(podInformer coreinformers.PodInformer, | ||||
| 		runTaintManager:             runTaintManager, | ||||
| 		useTaintBasedEvictions:      useTaintBasedEvictions && runTaintManager, | ||||
| 		taintNodeByCondition:        taintNodeByCondition, | ||||
| 		nodeUpdateQueue:             workqueue.New(), | ||||
| 	} | ||||
| 	if useTaintBasedEvictions { | ||||
| 		glog.Infof("Controller is using taint based evictions.") | ||||
| @@ -343,10 +350,12 @@ func NewNodeLifecycleController(podInformer coreinformers.PodInformer, | ||||
| 		glog.Infof("Controller will taint node by condition.") | ||||
| 		nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ | ||||
| 			AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error { | ||||
| 				return nc.doNoScheduleTaintingPass(node) | ||||
| 				nc.nodeUpdateQueue.Add(node) | ||||
| 				return nil | ||||
| 			}), | ||||
| 			UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error { | ||||
| 				return nc.doNoScheduleTaintingPass(newNode) | ||||
| 				nc.nodeUpdateQueue.Add(newNode) | ||||
| 				return nil | ||||
| 			}), | ||||
| 		}) | ||||
| 	} | ||||
| @@ -383,18 +392,52 @@ func (nc *Controller) Run(stopCh <-chan struct{}) { | ||||
| 	} | ||||
|  | ||||
| 	if nc.runTaintManager { | ||||
| 		go nc.taintManager.Run(wait.NeverStop) | ||||
| 		go nc.taintManager.Run(stopCh) | ||||
| 	} | ||||
|  | ||||
| 	if nc.taintNodeByCondition { | ||||
| 		for i := 0; i < scheduler.UpdateWorkerSize; i++ { | ||||
| 			nc.nodeUpdateChannels = append(nc.nodeUpdateChannels, make(chan *v1.Node, scheduler.NodeUpdateChannelSize)) | ||||
| 		} | ||||
|  | ||||
| 		// Dispatcher | ||||
| 		go func(stopCh <-chan struct{}) { | ||||
| 			for { | ||||
| 				obj, shutdown := nc.nodeUpdateQueue.Get() | ||||
| 				if shutdown { | ||||
| 					break | ||||
| 				} | ||||
|  | ||||
| 				node := obj.(*v1.Node) | ||||
| 				hash := hash(node.Name, scheduler.UpdateWorkerSize) | ||||
|  | ||||
| 				select { | ||||
| 				case <-stopCh: | ||||
| 					nc.nodeUpdateQueue.Done(node) | ||||
| 					return | ||||
| 				case nc.nodeUpdateChannels[hash] <- node: | ||||
| 				} | ||||
| 				nc.nodeUpdateQueue.Done(node) | ||||
| 			} | ||||
| 		}(stopCh) | ||||
| 		// Close node update queue to cleanup go routine. | ||||
| 		defer nc.nodeUpdateQueue.ShutDown() | ||||
|  | ||||
| 		// Start workers to update NoSchedule taint for nodes. | ||||
| 		for i := 0; i < scheduler.UpdateWorkerSize; i++ { | ||||
| 			go nc.doNoScheduleTaintingPassWorker(i, stopCh) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if nc.useTaintBasedEvictions { | ||||
| 		// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated | ||||
| 		// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints. | ||||
| 		go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, wait.NeverStop) | ||||
| 		go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, stopCh) | ||||
| 	} else { | ||||
| 		// Managing eviction of nodes: | ||||
| 		// When we delete pods off a node, if the node was not empty at the time we then | ||||
| 		// queue an eviction watcher. If we hit an error, retry deletion. | ||||
| 		go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, wait.NeverStop) | ||||
| 		go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, stopCh) | ||||
| 	} | ||||
|  | ||||
| 	// Incorporate the results of node status pushed from kubelet to master. | ||||
| @@ -402,7 +445,7 @@ func (nc *Controller) Run(stopCh <-chan struct{}) { | ||||
| 		if err := nc.monitorNodeStatus(); err != nil { | ||||
| 			glog.Errorf("Error monitoring node status: %v", err) | ||||
| 		} | ||||
| 	}, nc.nodeMonitorPeriod, wait.NeverStop) | ||||
| 	}, nc.nodeMonitorPeriod, stopCh) | ||||
|  | ||||
| 	<-stopCh | ||||
| } | ||||
| @@ -445,6 +488,19 @@ func (nc *Controller) doFixDeprecatedTaintKeyPass(node *v1.Node) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (nc *Controller) doNoScheduleTaintingPassWorker(i int, stopCh <-chan struct{}) { | ||||
| 	for { | ||||
| 		select { | ||||
| 		case <-stopCh: | ||||
| 			return | ||||
| 		case node := <-nc.nodeUpdateChannels[i]: | ||||
| 			if err := nc.doNoScheduleTaintingPass(node); err != nil { | ||||
| 				glog.Errorf("Failed to taint NoSchedule on node <%s>: %v", node.Name, err) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (nc *Controller) doNoScheduleTaintingPass(node *v1.Node) error { | ||||
| 	// Map node's condition to Taints. | ||||
| 	var taints []v1.Taint | ||||
| @@ -1197,3 +1253,9 @@ func (nc *Controller) ComputeZoneState(nodeReadyConditions []*v1.NodeCondition) | ||||
| 		return notReadyNodes, stateNormal | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func hash(val string, max int) int { | ||||
| 	hasher := fnv.New32a() | ||||
| 	io.WriteString(hasher, val) | ||||
| 	return int(hasher.Sum32()) % max | ||||
| } | ||||
|   | ||||
| @@ -40,9 +40,15 @@ import ( | ||||
| ) | ||||
|  | ||||
| const ( | ||||
| 	nodeUpdateChannelSize = 10 | ||||
| 	podUpdateChannelSize  = 1 | ||||
| 	retries               = 5 | ||||
| 	// TODO (k82cn): Figure out a reasonable number of workers/channels and propagate | ||||
| 	// the number of workers up making it a paramater of Run() function. | ||||
|  | ||||
| 	// NodeUpdateChannelSize defines the size of channel for node update events. | ||||
| 	NodeUpdateChannelSize = 10 | ||||
| 	// UpdateWorkerSize defines the size of workers for node update or/and pod update. | ||||
| 	UpdateWorkerSize     = 8 | ||||
| 	podUpdateChannelSize = 1 | ||||
| 	retries              = 5 | ||||
| ) | ||||
|  | ||||
| // Needed to make workqueue work | ||||
| @@ -204,11 +210,8 @@ func NewNoExecuteTaintManager(c clientset.Interface) *NoExecuteTaintManager { | ||||
| func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) { | ||||
| 	glog.V(0).Infof("Starting NoExecuteTaintManager") | ||||
|  | ||||
| 	// TODO: Figure out a reasonable number of workers and propagate the | ||||
| 	// number of workers up making it a paramater of Run() function. | ||||
| 	workers := 8 | ||||
| 	for i := 0; i < workers; i++ { | ||||
| 		tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan *nodeUpdateItem, nodeUpdateChannelSize)) | ||||
| 	for i := 0; i < UpdateWorkerSize; i++ { | ||||
| 		tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan *nodeUpdateItem, NodeUpdateChannelSize)) | ||||
| 		tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan *podUpdateItem, podUpdateChannelSize)) | ||||
| 	} | ||||
|  | ||||
| @@ -221,11 +224,11 @@ func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) { | ||||
| 				break | ||||
| 			} | ||||
| 			nodeUpdate := item.(*nodeUpdateItem) | ||||
| 			hash := hash(nodeUpdate.name(), workers) | ||||
| 			hash := hash(nodeUpdate.name(), UpdateWorkerSize) | ||||
| 			select { | ||||
| 			case <-stopCh: | ||||
| 				tc.nodeUpdateQueue.Done(item) | ||||
| 				break | ||||
| 				return | ||||
| 			case tc.nodeUpdateChannels[hash] <- nodeUpdate: | ||||
| 			} | ||||
| 			tc.nodeUpdateQueue.Done(item) | ||||
| @@ -239,11 +242,11 @@ func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) { | ||||
| 				break | ||||
| 			} | ||||
| 			podUpdate := item.(*podUpdateItem) | ||||
| 			hash := hash(podUpdate.nodeName(), workers) | ||||
| 			hash := hash(podUpdate.nodeName(), UpdateWorkerSize) | ||||
| 			select { | ||||
| 			case <-stopCh: | ||||
| 				tc.podUpdateQueue.Done(item) | ||||
| 				break | ||||
| 				return | ||||
| 			case tc.podUpdateChannels[hash] <- podUpdate: | ||||
| 			} | ||||
| 			tc.podUpdateQueue.Done(item) | ||||
| @@ -251,8 +254,8 @@ func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) { | ||||
| 	}(stopCh) | ||||
|  | ||||
| 	wg := sync.WaitGroup{} | ||||
| 	wg.Add(workers) | ||||
| 	for i := 0; i < workers; i++ { | ||||
| 	wg.Add(UpdateWorkerSize) | ||||
| 	for i := 0; i < UpdateWorkerSize; i++ { | ||||
| 		go tc.worker(i, wg.Done, stopCh) | ||||
| 	} | ||||
| 	wg.Wait() | ||||
|   | ||||
| @@ -639,7 +639,7 @@ func TestTaintNodeByCondition(t *testing.T) { | ||||
| 				t.Errorf("Failed to create node, err: %v", err) | ||||
| 			} | ||||
| 			if err := waitForNodeTaints(cs, node, test.expectedTaints); err != nil { | ||||
| 				t.Errorf("Failed to taint node, err: %v", err) | ||||
| 				t.Errorf("Failed to taint node <%s>, err: %v", node.Name, err) | ||||
| 			} | ||||
|  | ||||
| 			var pods []*v1.Pod | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Submit Queue
					Kubernetes Submit Queue