Merge pull request #88591 from smarterclayton/status_update

kubelet: Avoid sending no-op patches
This commit is contained in:
Kubernetes Prow Robot 2020-03-03 09:43:38 -08:00 committed by GitHub
commit 06b798781a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 51 additions and 29 deletions

View File

@ -541,15 +541,19 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
} }
oldStatus := pod.Status.DeepCopy() oldStatus := pod.Status.DeepCopy()
newPod, patchBytes, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, *oldStatus, mergePodStatus(*oldStatus, status.status)) newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, *oldStatus, mergePodStatus(*oldStatus, status.status))
klog.V(3).Infof("Patch status for pod %q with %q", format.Pod(pod), patchBytes) klog.V(3).Infof("Patch status for pod %q with %q", format.Pod(pod), patchBytes)
if err != nil { if err != nil {
klog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err) klog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err)
return return
} }
pod = newPod if unchanged {
klog.V(3).Infof("Status for pod %q is up-to-date: (%d)", format.Pod(pod), status.version)
} else {
klog.V(3).Infof("Status for pod %q updated successfully: (%d, %+v)", format.Pod(pod), status.version, status.status)
pod = newPod
}
klog.V(3).Infof("Status for pod %q updated successfully: (%d, %+v)", format.Pod(pod), status.version, status.status)
m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version
// We don't handle graceful deletion of mirror pods. // We don't handle graceful deletion of mirror pods.

View File

@ -96,6 +96,7 @@ func getRandomPodStatus() v1.PodStatus {
} }
func verifyActions(t *testing.T, manager *manager, expectedActions []core.Action) { func verifyActions(t *testing.T, manager *manager, expectedActions []core.Action) {
t.Helper()
manager.consumeUpdates() manager.consumeUpdates()
actions := manager.kubeClient.(*fake.Clientset).Actions() actions := manager.kubeClient.(*fake.Clientset).Actions()
defer manager.kubeClient.(*fake.Clientset).ClearActions() defer manager.kubeClient.(*fake.Clientset).ClearActions()
@ -401,17 +402,17 @@ func TestStaleUpdates(t *testing.T) {
t.Logf("Nothing left in the channel to sync") t.Logf("Nothing left in the channel to sync")
verifyActions(t, m, []core.Action{}) verifyActions(t, m, []core.Action{})
t.Log("Unchanged status should not send an update.") t.Log("Unchanged status should not send an update")
m.SetPodStatus(pod, status) m.SetPodStatus(pod, status)
verifyUpdates(t, m, 0) verifyUpdates(t, m, 0)
t.Log("... unless it's stale.") t.Log("... even if it's stale as long as nothing changes")
mirrorPodUID := kubetypes.MirrorPodUID(pod.UID) mirrorPodUID := kubetypes.MirrorPodUID(pod.UID)
m.apiStatusVersions[mirrorPodUID] = m.apiStatusVersions[mirrorPodUID] - 1 m.apiStatusVersions[mirrorPodUID] = m.apiStatusVersions[mirrorPodUID] - 1
m.SetPodStatus(pod, status) m.SetPodStatus(pod, status)
m.syncBatch() m.syncBatch()
verifyActions(t, m, []core.Action{getAction(), patchAction()}) verifyActions(t, m, []core.Action{getAction()})
t.Logf("Nothing stuck in the pipe.") t.Logf("Nothing stuck in the pipe.")
verifyUpdates(t, m, 0) verifyUpdates(t, m, 0)
@ -821,8 +822,9 @@ func TestReconcilePodStatus(t *testing.T) {
t.Logf("If the pod status is the same, a reconciliation is not needed and syncBatch should do nothing") t.Logf("If the pod status is the same, a reconciliation is not needed and syncBatch should do nothing")
syncer.podManager.UpdatePod(testPod) syncer.podManager.UpdatePod(testPod)
if syncer.needsReconcile(testPod.UID, podStatus) { if syncer.needsReconcile(testPod.UID, podStatus) {
t.Errorf("Pod status is the same, a reconciliation is not needed") t.Fatalf("Pod status is the same, a reconciliation is not needed")
} }
syncer.SetPodStatus(testPod, podStatus)
syncer.syncBatch() syncer.syncBatch()
verifyActions(t, syncer, []core.Action{}) verifyActions(t, syncer, []core.Action{})
@ -835,17 +837,19 @@ func TestReconcilePodStatus(t *testing.T) {
testPod.Status.StartTime = &normalizedStartTime testPod.Status.StartTime = &normalizedStartTime
syncer.podManager.UpdatePod(testPod) syncer.podManager.UpdatePod(testPod)
if syncer.needsReconcile(testPod.UID, podStatus) { if syncer.needsReconcile(testPod.UID, podStatus) {
t.Errorf("Pod status only differs for timestamp format, a reconciliation is not needed") t.Fatalf("Pod status only differs for timestamp format, a reconciliation is not needed")
} }
syncer.SetPodStatus(testPod, podStatus)
syncer.syncBatch() syncer.syncBatch()
verifyActions(t, syncer, []core.Action{}) verifyActions(t, syncer, []core.Action{})
t.Logf("If the pod status is different, a reconciliation is needed, syncBatch should trigger an update") t.Logf("If the pod status is different, a reconciliation is needed, syncBatch should trigger an update")
testPod.Status = getRandomPodStatus() changedPodStatus := getRandomPodStatus()
syncer.podManager.UpdatePod(testPod) syncer.podManager.UpdatePod(testPod)
if !syncer.needsReconcile(testPod.UID, podStatus) { if !syncer.needsReconcile(testPod.UID, changedPodStatus) {
t.Errorf("Pod status is different, a reconciliation is needed") t.Fatalf("Pod status is different, a reconciliation is needed")
} }
syncer.SetPodStatus(testPod, changedPodStatus)
syncer.syncBatch() syncer.syncBatch()
verifyActions(t, syncer, []core.Action{getAction(), patchAction()}) verifyActions(t, syncer, []core.Action{getAction(), patchAction()})
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package pod package pod
import ( import (
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -28,26 +29,29 @@ import (
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
) )
// PatchPodStatus patches pod status. // PatchPodStatus patches pod status. It returns true and avoids an update if the patch contains no changes.
func PatchPodStatus(c clientset.Interface, namespace, name string, uid types.UID, oldPodStatus, newPodStatus v1.PodStatus) (*v1.Pod, []byte, error) { func PatchPodStatus(c clientset.Interface, namespace, name string, uid types.UID, oldPodStatus, newPodStatus v1.PodStatus) (*v1.Pod, []byte, bool, error) {
patchBytes, err := preparePatchBytesForPodStatus(namespace, name, uid, oldPodStatus, newPodStatus) patchBytes, unchanged, err := preparePatchBytesForPodStatus(namespace, name, uid, oldPodStatus, newPodStatus)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, false, err
}
if unchanged {
return nil, patchBytes, true, nil
} }
updatedPod, err := c.CoreV1().Pods(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status") updatedPod, err := c.CoreV1().Pods(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("failed to patch status %q for pod %q/%q: %v", patchBytes, namespace, name, err) return nil, nil, false, fmt.Errorf("failed to patch status %q for pod %q/%q: %v", patchBytes, namespace, name, err)
} }
return updatedPod, patchBytes, nil return updatedPod, patchBytes, false, nil
} }
func preparePatchBytesForPodStatus(namespace, name string, uid types.UID, oldPodStatus, newPodStatus v1.PodStatus) ([]byte, error) { func preparePatchBytesForPodStatus(namespace, name string, uid types.UID, oldPodStatus, newPodStatus v1.PodStatus) ([]byte, bool, error) {
oldData, err := json.Marshal(v1.Pod{ oldData, err := json.Marshal(v1.Pod{
Status: oldPodStatus, Status: oldPodStatus,
}) })
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to Marshal oldData for pod %q/%q: %v", namespace, name, err) return nil, false, fmt.Errorf("failed to Marshal oldData for pod %q/%q: %v", namespace, name, err)
} }
newData, err := json.Marshal(v1.Pod{ newData, err := json.Marshal(v1.Pod{
@ -55,12 +59,12 @@ func preparePatchBytesForPodStatus(namespace, name string, uid types.UID, oldPod
Status: newPodStatus, Status: newPodStatus,
}) })
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to Marshal newData for pod %q/%q: %v", namespace, name, err) return nil, false, fmt.Errorf("failed to Marshal newData for pod %q/%q: %v", namespace, name, err)
} }
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Pod{}) patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Pod{})
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to CreateTwoWayMergePatch for pod %q/%q: %v", namespace, name, err) return nil, false, fmt.Errorf("failed to CreateTwoWayMergePatch for pod %q/%q: %v", namespace, name, err)
} }
return patchBytes, nil return patchBytes, bytes.Equal(patchBytes, []byte(fmt.Sprintf(`{"metadata":{"uid":%q}}`, uid))), nil
} }

View File

@ -44,11 +44,13 @@ func TestPatchPodStatus(t *testing.T) {
testCases := []struct { testCases := []struct {
description string description string
mutate func(input v1.PodStatus) v1.PodStatus mutate func(input v1.PodStatus) v1.PodStatus
expectUnchanged bool
expectedPatchBytes []byte expectedPatchBytes []byte
}{ }{
{ {
"no change", "no change",
func(input v1.PodStatus) v1.PodStatus { return input }, func(input v1.PodStatus) v1.PodStatus { return input },
true,
[]byte(fmt.Sprintf(`{"metadata":{"uid":"myuid"}}`)), []byte(fmt.Sprintf(`{"metadata":{"uid":"myuid"}}`)),
}, },
{ {
@ -57,6 +59,7 @@ func TestPatchPodStatus(t *testing.T) {
input.Message = "random message" input.Message = "random message"
return input return input
}, },
false,
[]byte(fmt.Sprintf(`{"metadata":{"uid":"myuid"},"status":{"message":"random message"}}`)), []byte(fmt.Sprintf(`{"metadata":{"uid":"myuid"},"status":{"message":"random message"}}`)),
}, },
{ {
@ -65,6 +68,7 @@ func TestPatchPodStatus(t *testing.T) {
input.Conditions[0].Status = v1.ConditionFalse input.Conditions[0].Status = v1.ConditionFalse
return input return input
}, },
false,
[]byte(fmt.Sprintf(`{"metadata":{"uid":"myuid"},"status":{"$setElementOrder/conditions":[{"type":"Ready"},{"type":"PodScheduled"}],"conditions":[{"status":"False","type":"Ready"}]}}`)), []byte(fmt.Sprintf(`{"metadata":{"uid":"myuid"},"status":{"$setElementOrder/conditions":[{"type":"Ready"},{"type":"PodScheduled"}],"conditions":[{"status":"False","type":"Ready"}]}}`)),
}, },
{ {
@ -78,17 +82,23 @@ func TestPatchPodStatus(t *testing.T) {
} }
return input return input
}, },
false,
[]byte(fmt.Sprintf(`{"metadata":{"uid":"myuid"},"status":{"initContainerStatuses":[{"image":"","imageID":"","lastState":{},"name":"init-container","ready":true,"restartCount":0,"state":{}}]}}`)), []byte(fmt.Sprintf(`{"metadata":{"uid":"myuid"},"status":{"initContainerStatuses":[{"image":"","imageID":"","lastState":{},"name":"init-container","ready":true,"restartCount":0,"state":{}}]}}`)),
}, },
} }
for _, tc := range testCases { for _, tc := range testCases {
_, patchBytes, err := PatchPodStatus(client, ns, name, uid, getPodStatus(), tc.mutate(getPodStatus())) t.Run(tc.description, func(t *testing.T) {
if err != nil { _, patchBytes, unchanged, err := PatchPodStatus(client, ns, name, uid, getPodStatus(), tc.mutate(getPodStatus()))
t.Errorf("unexpected error: %v", err) if err != nil {
} t.Errorf("unexpected error: %v", err)
if !reflect.DeepEqual(patchBytes, tc.expectedPatchBytes) { }
t.Errorf("for test case %q, expect patchBytes: %q, got: %q\n", tc.description, tc.expectedPatchBytes, patchBytes) if unchanged != tc.expectUnchanged {
} t.Errorf("unexpected change: %t", unchanged)
}
if !reflect.DeepEqual(patchBytes, tc.expectedPatchBytes) {
t.Errorf("expect patchBytes: %q, got: %q\n", tc.expectedPatchBytes, patchBytes)
}
})
} }
} }