Merge pull request #67031 from krzysztof-jastrzebski/node_startup
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Reduce latency to node ready after CIDR is assigned. This adds code to execute an immediate runtime and node status update when the Kubelet sees that it has a CIDR, which significantly decreases the latency to node ready. ```release-note Speed up kubelet start time by executing an immediate runtime and node status update when the Kubelet sees that it has a CIDR. ```
This commit is contained in:
		| @@ -692,7 +692,9 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, | ||||
| 	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{}) | ||||
| 	klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime) | ||||
| 	klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy) | ||||
| 	klet.updatePodCIDR(kubeCfg.PodCIDR) | ||||
| 	if err := klet.updatePodCIDR(kubeCfg.PodCIDR); err != nil { | ||||
| 		glog.Errorf("Pod CIDR update failed %v", err) | ||||
| 	} | ||||
|  | ||||
| 	// setup containerGC | ||||
| 	containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy, klet.sourcesReady) | ||||
| @@ -1011,6 +1013,18 @@ type Kubelet struct { | ||||
| 	//    as it takes time to gather all necessary node information. | ||||
| 	nodeStatusUpdateFrequency time.Duration | ||||
|  | ||||
| 	// syncNodeStatusMux is a lock on updating the node status, because this path is not thread-safe. | ||||
| 	// This lock is used by Kublet.syncNodeStatus function and shouldn't be used anywhere else. | ||||
| 	syncNodeStatusMux sync.Mutex | ||||
|  | ||||
| 	// updatePodCIDRMux is a lock on updating pod CIDR, because this path is not thread-safe. | ||||
| 	// This lock is used by Kublet.syncNodeStatus function and shouldn't be used anywhere else. | ||||
| 	updatePodCIDRMux sync.Mutex | ||||
|  | ||||
| 	// updateRuntimeMux is a lock on updating runtime, because this path is not thread-safe. | ||||
| 	// This lock is used by Kublet.updateRuntimeUp function and shouldn't be used anywhere else. | ||||
| 	updateRuntimeMux sync.Mutex | ||||
|  | ||||
| 	// Generates pod events. | ||||
| 	pleg pleg.PodLifecycleEventGenerator | ||||
|  | ||||
| @@ -1347,6 +1361,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { | ||||
| 	if kl.kubeClient != nil { | ||||
| 		// Start syncing node status immediately, this may set up things the runtime needs to run. | ||||
| 		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop) | ||||
| 		go kl.fastStatusUpdateOnce() | ||||
| 	} | ||||
| 	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop) | ||||
|  | ||||
| @@ -2079,6 +2094,9 @@ func (kl *Kubelet) LatestLoopEntryTime() time.Time { | ||||
| // and returns an error if the status check fails.  If the status check is OK, | ||||
| // update the container runtime uptime in the kubelet runtimeState. | ||||
| func (kl *Kubelet) updateRuntimeUp() { | ||||
| 	kl.updateRuntimeMux.Lock() | ||||
| 	defer kl.updateRuntimeMux.Unlock() | ||||
|  | ||||
| 	s, err := kl.containerRuntime.Status() | ||||
| 	if err != nil { | ||||
| 		glog.Errorf("Container runtime sanity check failed: %v", err) | ||||
| @@ -2153,6 +2171,31 @@ func (kl *Kubelet) cleanUpContainersInPod(podID types.UID, exitedContainerID str | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // fastStatusUpdateOnce starts a loop that checks the internal node indexer cache for when a CIDR | ||||
| // is applied  and tries to update pod CIDR immediately. After pod CIDR is updated it fires off | ||||
| // a runtime update and a node status update. Function returns after one successful node status update. | ||||
| // Function is executed only during Kubelet start which improves latency to ready node by updating | ||||
| // pod CIDR, runtime status and node statuses ASAP. | ||||
| func (kl *Kubelet) fastStatusUpdateOnce() { | ||||
| 	for { | ||||
| 		time.Sleep(100 * time.Millisecond) | ||||
| 		node, err := kl.GetNode() | ||||
| 		if err != nil { | ||||
| 			glog.Errorf(err.Error()) | ||||
| 			continue | ||||
| 		} | ||||
| 		if node.Spec.PodCIDR != "" { | ||||
| 			if err := kl.updatePodCIDR(node.Spec.PodCIDR); err != nil { | ||||
| 				glog.Errorf("Pod CIDR update failed %v", err) | ||||
| 				continue | ||||
| 			} | ||||
| 			kl.updateRuntimeUp() | ||||
| 			kl.syncNodeStatus() | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // isSyncPodWorthy filters out events that are not worthy of pod syncing | ||||
| func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool { | ||||
| 	// ContatnerRemoved doesn't affect pod state | ||||
|   | ||||
| @@ -56,22 +56,25 @@ func (kl *Kubelet) providerRequiresNetworkingConfiguration() bool { | ||||
|  | ||||
| // updatePodCIDR updates the pod CIDR in the runtime state if it is different | ||||
| // from the current CIDR. | ||||
| func (kl *Kubelet) updatePodCIDR(cidr string) { | ||||
| func (kl *Kubelet) updatePodCIDR(cidr string) error { | ||||
| 	kl.updatePodCIDRMux.Lock() | ||||
| 	defer kl.updatePodCIDRMux.Unlock() | ||||
|  | ||||
| 	podCIDR := kl.runtimeState.podCIDR() | ||||
|  | ||||
| 	if podCIDR == cidr { | ||||
| 		return | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	// kubelet -> generic runtime -> runtime shim -> network plugin | ||||
| 	// docker/non-cri implementations have a passthrough UpdatePodCIDR | ||||
| 	if err := kl.getRuntime().UpdatePodCIDR(cidr); err != nil { | ||||
| 		glog.Errorf("Failed to update pod CIDR: %v", err) | ||||
| 		return | ||||
| 		return fmt.Errorf("failed to update pod CIDR: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	glog.Infof("Setting Pod CIDR: %v -> %v", podCIDR, cidr) | ||||
| 	kl.runtimeState.setPodCIDR(cidr) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // syncNetworkUtil ensures the network utility are present on host. | ||||
|   | ||||
| @@ -351,6 +351,9 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) { | ||||
| // It synchronizes node status to master, registering the kubelet first if | ||||
| // necessary. | ||||
| func (kl *Kubelet) syncNodeStatus() { | ||||
| 	kl.syncNodeStatusMux.Lock() | ||||
| 	defer kl.syncNodeStatusMux.Unlock() | ||||
|  | ||||
| 	if kl.kubeClient == nil || kl.heartbeatClient == nil { | ||||
| 		return | ||||
| 	} | ||||
| @@ -402,7 +405,9 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error { | ||||
| 	} | ||||
|  | ||||
| 	if node.Spec.PodCIDR != "" { | ||||
| 		kl.updatePodCIDR(node.Spec.PodCIDR) | ||||
| 		if err := kl.updatePodCIDR(node.Spec.PodCIDR); err != nil { | ||||
| 			glog.Errorf(err.Error()) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	kl.setNodeStatus(node) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Submit Queue
					Kubernetes Submit Queue