Merge pull request #42254 from justinsb/volumes_dont_leak_nodestatusupdateneeded
Automatic merge from submit-queue volumes: SetNodeStatusUpdateNeeded on error If an error happened during the UpdateNodeStatuses loop, there were some code paths where we would not call SetNodeStatusUpdateNeeded, leaking the state. Add it to all paths by adding a function. Part of #40583 ```release-note NONE ```
This commit is contained in:
commit
18362beb0d
@ -22,6 +22,7 @@ go_library(
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
|
||||
],
|
||||
)
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/strategicpatch"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||
@ -83,65 +84,79 @@ func (nsu *nodeStatusUpdater) UpdateNodeStatuses() error {
|
||||
continue
|
||||
}
|
||||
|
||||
clonedNode, err := api.Scheme.DeepCopy(nodeObj)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error cloning node %q: %v",
|
||||
nodeName,
|
||||
err)
|
||||
}
|
||||
|
||||
node, ok := clonedNode.(*v1.Node)
|
||||
if !ok || node == nil {
|
||||
return fmt.Errorf(
|
||||
"failed to cast %q object %#v to Node",
|
||||
nodeName,
|
||||
clonedNode)
|
||||
}
|
||||
|
||||
// TODO: Change to pkg/util/node.UpdateNodeStatus.
|
||||
oldData, err := json.Marshal(node)
|
||||
if err != nil {
|
||||
return fmt.Errorf(
|
||||
"failed to Marshal oldData for node %q. %v",
|
||||
nodeName,
|
||||
err)
|
||||
}
|
||||
|
||||
node.Status.VolumesAttached = attachedVolumes
|
||||
|
||||
newData, err := json.Marshal(node)
|
||||
if err != nil {
|
||||
return fmt.Errorf(
|
||||
"failed to Marshal newData for node %q. %v",
|
||||
nodeName,
|
||||
err)
|
||||
}
|
||||
|
||||
patchBytes, err :=
|
||||
strategicpatch.CreateTwoWayMergePatch(oldData, newData, node)
|
||||
if err != nil {
|
||||
return fmt.Errorf(
|
||||
"failed to CreateTwoWayMergePatch for node %q. %v",
|
||||
nodeName,
|
||||
err)
|
||||
}
|
||||
|
||||
_, err = nsu.kubeClient.Core().Nodes().PatchStatus(string(nodeName), patchBytes)
|
||||
if err != nil {
|
||||
if err := nsu.updateNodeStatus(nodeName, nodeObj, attachedVolumes); err != nil {
|
||||
// If update node status fails, reset flag statusUpdateNeeded back to true
|
||||
// to indicate this node status needs to be updated again
|
||||
nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName)
|
||||
return fmt.Errorf(
|
||||
"failed to kubeClient.Core().Nodes().Patch for node %q. %v",
|
||||
|
||||
glog.V(2).Infof(
|
||||
"Could not update node status for %q; re-marking for update. %v",
|
||||
nodeName,
|
||||
err)
|
||||
}
|
||||
glog.V(2).Infof(
|
||||
"Updating status for node %q succeeded. patchBytes: %q VolumesAttached: %v",
|
||||
nodeName,
|
||||
string(patchBytes),
|
||||
node.Status.VolumesAttached)
|
||||
|
||||
// We currently always return immediately on error
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (nsu *nodeStatusUpdater) updateNodeStatus(nodeName types.NodeName, nodeObj *v1.Node, attachedVolumes []v1.AttachedVolume) error {
|
||||
clonedNode, err := api.Scheme.DeepCopy(nodeObj)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error cloning node %q: %v",
|
||||
nodeName,
|
||||
err)
|
||||
}
|
||||
|
||||
node, ok := clonedNode.(*v1.Node)
|
||||
if !ok || node == nil {
|
||||
return fmt.Errorf(
|
||||
"failed to cast %q object %#v to Node",
|
||||
nodeName,
|
||||
clonedNode)
|
||||
}
|
||||
|
||||
// TODO: Change to pkg/util/node.UpdateNodeStatus.
|
||||
oldData, err := json.Marshal(node)
|
||||
if err != nil {
|
||||
return fmt.Errorf(
|
||||
"failed to Marshal oldData for node %q. %v",
|
||||
nodeName,
|
||||
err)
|
||||
}
|
||||
|
||||
node.Status.VolumesAttached = attachedVolumes
|
||||
|
||||
newData, err := json.Marshal(node)
|
||||
if err != nil {
|
||||
return fmt.Errorf(
|
||||
"failed to Marshal newData for node %q. %v",
|
||||
nodeName,
|
||||
err)
|
||||
}
|
||||
|
||||
patchBytes, err :=
|
||||
strategicpatch.CreateTwoWayMergePatch(oldData, newData, node)
|
||||
if err != nil {
|
||||
return fmt.Errorf(
|
||||
"failed to CreateTwoWayMergePatch for node %q. %v",
|
||||
nodeName,
|
||||
err)
|
||||
}
|
||||
|
||||
_, err = nsu.kubeClient.Core().Nodes().PatchStatus(string(nodeName), patchBytes)
|
||||
if err != nil {
|
||||
return fmt.Errorf(
|
||||
"failed to kubeClient.Core().Nodes().Patch for node %q. %v",
|
||||
nodeName,
|
||||
err)
|
||||
}
|
||||
glog.V(4).Infof(
|
||||
"Updating status for node %q succeeded. patchBytes: %q VolumesAttached: %v",
|
||||
nodeName,
|
||||
string(patchBytes),
|
||||
node.Status.VolumesAttached)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user