kubelet: Refactor tryUpdateNodeStatus() into smaller functions
This commit is contained in:
parent
b2c72feca8
commit
4a50fc4b8c
@ -479,21 +479,40 @@ func (kl *Kubelet) tryUpdateNodeStatus(ctx context.Context, tryNumber int) error
|
|||||||
if tryNumber == 0 {
|
if tryNumber == 0 {
|
||||||
util.FromApiserverCache(&opts)
|
util.FromApiserverCache(&opts)
|
||||||
}
|
}
|
||||||
node, err := kl.heartbeatClient.CoreV1().Nodes().Get(ctx, string(kl.nodeName), opts)
|
originalNode, err := kl.heartbeatClient.CoreV1().Nodes().Get(ctx, string(kl.nodeName), opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
|
return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
originalNode := node.DeepCopy()
|
|
||||||
if originalNode == nil {
|
if originalNode == nil {
|
||||||
return fmt.Errorf("nil %q node object", kl.nodeName)
|
return fmt.Errorf("nil %q node object", kl.nodeName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
node, changed := kl.updateNode(ctx, originalNode)
|
||||||
|
shouldPatchNodeStatus := changed || kl.clock.Since(kl.lastStatusReportTime) >= kl.nodeStatusReportFrequency
|
||||||
|
|
||||||
|
if !shouldPatchNodeStatus {
|
||||||
|
kl.markVolumesFromNode(node)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
updatedNode, err := kl.patchNodeStatus(originalNode, node)
|
||||||
|
if err == nil {
|
||||||
|
kl.markVolumesFromNode(updatedNode)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateNode creates a copy of originalNode and runs update logic on it.
|
||||||
|
// It returns the updated node object and a bool indicating if anything has been changed.
|
||||||
|
func (kl *Kubelet) updateNode(ctx context.Context, originalNode *v1.Node) (*v1.Node, bool) {
|
||||||
|
node := originalNode.DeepCopy()
|
||||||
|
|
||||||
podCIDRChanged := false
|
podCIDRChanged := false
|
||||||
if len(node.Spec.PodCIDRs) != 0 {
|
if len(node.Spec.PodCIDRs) != 0 {
|
||||||
// Pod CIDR could have been updated before, so we cannot rely on
|
// Pod CIDR could have been updated before, so we cannot rely on
|
||||||
// node.Spec.PodCIDR being non-empty. We also need to know if pod CIDR is
|
// node.Spec.PodCIDR being non-empty. We also need to know if pod CIDR is
|
||||||
// actually changed.
|
// actually changed.
|
||||||
|
var err error
|
||||||
podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
|
podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
|
||||||
if podCIDRChanged, err = kl.updatePodCIDR(ctx, podCIDRs); err != nil {
|
if podCIDRChanged, err = kl.updatePodCIDR(ctx, podCIDRs); err != nil {
|
||||||
klog.ErrorS(err, "Error updating pod CIDR")
|
klog.ErrorS(err, "Error updating pod CIDR")
|
||||||
@ -521,41 +540,48 @@ func (kl *Kubelet) tryUpdateNodeStatus(ctx context.Context, tryNumber int) error
|
|||||||
|
|
||||||
kl.setNodeStatus(ctx, node)
|
kl.setNodeStatus(ctx, node)
|
||||||
|
|
||||||
now := kl.clock.Now()
|
changed := podCIDRChanged || nodeStatusHasChanged(&originalNode.Status, &node.Status) || areRequiredLabelsNotPresent
|
||||||
if now.Before(kl.lastStatusReportTime.Add(kl.nodeStatusReportFrequency)) {
|
return node, changed
|
||||||
if !podCIDRChanged && !nodeStatusHasChanged(&originalNode.Status, &node.Status) && !areRequiredLabelsNotPresent {
|
}
|
||||||
// We must mark the volumes as ReportedInUse in volume manager's dsw even
|
|
||||||
// if no changes were made to the node status (no volumes were added or removed
|
|
||||||
// from the VolumesInUse list).
|
|
||||||
//
|
|
||||||
// The reason is that on a kubelet restart, the volume manager's dsw is
|
|
||||||
// repopulated and the volume ReportedInUse is initialized to false, while the
|
|
||||||
// VolumesInUse list from the Node object still contains the state from the
|
|
||||||
// previous kubelet instantiation.
|
|
||||||
//
|
|
||||||
// Once the volumes are added to the dsw, the ReportedInUse field needs to be
|
|
||||||
// synced from the VolumesInUse list in the Node.Status.
|
|
||||||
//
|
|
||||||
// The MarkVolumesAsReportedInUse() call cannot be performed in dsw directly
|
|
||||||
// because it does not have access to the Node object.
|
|
||||||
// This also cannot be populated on node status manager init because the volume
|
|
||||||
// may not have been added to dsw at that time.
|
|
||||||
kl.volumeManager.MarkVolumesAsReportedInUse(node.Status.VolumesInUse)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// patchNodeStatus patches node on the API server based on originalNode.
|
||||||
|
// It returns any potential error, or an updatedNode and refreshes the state of kubelet when successful.
|
||||||
|
func (kl *Kubelet) patchNodeStatus(originalNode, node *v1.Node) (*v1.Node, error) {
|
||||||
// Patch the current status on the API server
|
// Patch the current status on the API server
|
||||||
updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, node)
|
updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, node)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
kl.lastStatusReportTime = now
|
kl.lastStatusReportTime = kl.clock.Now()
|
||||||
kl.setLastObservedNodeAddresses(updatedNode.Status.Addresses)
|
kl.setLastObservedNodeAddresses(updatedNode.Status.Addresses)
|
||||||
// If update finishes successfully, mark the volumeInUse as reportedInUse to indicate
|
return updatedNode, nil
|
||||||
// those volumes are already updated in the node's status
|
}
|
||||||
kl.volumeManager.MarkVolumesAsReportedInUse(updatedNode.Status.VolumesInUse)
|
|
||||||
return nil
|
// markVolumesFromNode updates volumeManager with VolumesInUse status from node.
|
||||||
|
//
|
||||||
|
// In the case of node status update being unnecessary, call with the fetched node.
|
||||||
|
// We must mark the volumes as ReportedInUse in volume manager's dsw even
|
||||||
|
// if no changes were made to the node status (no volumes were added or removed
|
||||||
|
// from the VolumesInUse list).
|
||||||
|
//
|
||||||
|
// The reason is that on a kubelet restart, the volume manager's dsw is
|
||||||
|
// repopulated and the volume ReportedInUse is initialized to false, while the
|
||||||
|
// VolumesInUse list from the Node object still contains the state from the
|
||||||
|
// previous kubelet instantiation.
|
||||||
|
//
|
||||||
|
// Once the volumes are added to the dsw, the ReportedInUse field needs to be
|
||||||
|
// synced from the VolumesInUse list in the Node.Status.
|
||||||
|
//
|
||||||
|
// The MarkVolumesAsReportedInUse() call cannot be performed in dsw directly
|
||||||
|
// because it does not have access to the Node object.
|
||||||
|
// This also cannot be populated on node status manager init because the volume
|
||||||
|
// may not have been added to dsw at that time.
|
||||||
|
//
|
||||||
|
// Or, after a successful node status update, call with updatedNode returned from
|
||||||
|
// the patch call, to mark the volumeInUse as reportedInUse to indicate
|
||||||
|
// those volumes are already updated in the node's status
|
||||||
|
func (kl *Kubelet) markVolumesFromNode(node *v1.Node) {
|
||||||
|
kl.volumeManager.MarkVolumesAsReportedInUse(node.Status.VolumesInUse)
|
||||||
}
|
}
|
||||||
|
|
||||||
// recordNodeStatusEvent records an event of the given type with the given
|
// recordNodeStatusEvent records an event of the given type with the given
|
||||||
|
Loading…
Reference in New Issue
Block a user