Add pod disruption conditions for kubelet-initiated failures

Michal Wozniak
2022-10-10 13:58:40 +02:00
parent c519bc02e8
commit 52cd6755eb
17 changed files with 883 additions and 21 deletions

View File

@@ -34,8 +34,10 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/features"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/metrics"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
@@ -150,7 +152,8 @@ func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podD
func isPodStatusByKubeletEqual(oldStatus, status *v1.PodStatus) bool {
oldCopy := oldStatus.DeepCopy()
for _, c := range status.Conditions {
if kubetypes.PodConditionByKubelet(c.Type) {
// both owned and shared conditions are used for kubelet status equality
if kubetypes.PodConditionByKubelet(c.Type) || kubetypes.PodConditionSharedByKubelet(c.Type) {
_, oc := podutil.GetPodCondition(oldCopy, c.Type)
if oc == nil || oc.Status != c.Status || oc.Message != c.Message || oc.Reason != c.Reason {
return false
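
For reference, the two predicates above split kubelet-related conditions into two groups: types the kubelet owns outright, and types that other components may also write, such as the new DisruptionTarget condition. A minimal sketch of their contracts (the exact type lists are an assumption for illustration; the real definitions live in the kubelet types package):

package kubetypes

import v1 "k8s.io/api/core/v1"

// PodConditionByKubelet reports whether the condition type is owned
// exclusively by the kubelet (the list here is assumed for illustration).
func PodConditionByKubelet(conditionType v1.PodConditionType) bool {
	switch conditionType {
	case v1.PodScheduled, v1.PodReady, v1.PodInitialized, v1.ContainersReady:
		return true
	}
	return false
}

// PodConditionSharedByKubelet reports whether the condition type may be
// written both by the kubelet and by other components, as is the case for
// the DisruptionTarget condition handled in this commit.
func PodConditionSharedByKubelet(conditionType v1.PodConditionType) bool {
	return conditionType == v1.AlphaNoCompatGuaranteeDisruptionTarget
}
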
@@ -501,6 +504,11 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp
// Set PodScheduledCondition.LastTransitionTime.
updateLastTransitionTime(&status, &oldStatus, v1.PodScheduled)
if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
// Set DisruptionTarget.LastTransitionTime.
updateLastTransitionTime(&status, &oldStatus, v1.AlphaNoCompatGuaranteeDisruptionTarget)
}
// ensure that the start time does not change across updates.
if oldStatus.StartTime != nil && !oldStatus.StartTime.IsZero() {
status.StartTime = oldStatus.StartTime
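
The updateLastTransitionTime helper is defined elsewhere in this file and not shown in the hunk. Roughly, it keeps the previous LastTransitionTime when the condition status is unchanged and stamps the current time when it transitions; a sketch under that assumption (metav1 is the usual apimachinery import):

// updateLastTransitionTime sets LastTransitionTime on the condition of the
// given type in status: the old timestamp is preserved if the condition
// status did not change, otherwise the current time is stamped.
// (Sketch of the helper's assumed behavior, not the verbatim implementation.)
func updateLastTransitionTime(status, oldStatus *v1.PodStatus, conditionType v1.PodConditionType) {
	_, condition := podutil.GetPodCondition(status, conditionType)
	if condition == nil {
		return
	}
	lastTransitionTime := metav1.Now()
	_, oldCondition := podutil.GetPodCondition(oldStatus, conditionType)
	if oldCondition != nil && condition.Status == oldCondition.Status {
		// no transition: carry the previous timestamp forward
		lastTransitionTime = oldCondition.LastTransitionTime
	}
	condition.LastTransitionTime = lastTransitionTime
}
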
@@ -879,9 +887,17 @@ func mergePodStatus(oldPodStatus, newPodStatus v1.PodStatus, couldHaveRunningCon
podConditions = append(podConditions, c)
}
}
for _, c := range newPodStatus.Conditions {
if kubetypes.PodConditionByKubelet(c.Type) {
podConditions = append(podConditions, c)
} else if kubetypes.PodConditionSharedByKubelet(c.Type) {
// for shared conditions, update the existing entry in podConditions or append a new one
if i, _ := podutil.GetPodConditionFromList(podConditions, c.Type); i >= 0 {
podConditions[i] = c
} else {
podConditions = append(podConditions, c)
}
}
}
newPodStatus.Conditions = podConditions
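
The merge above leans on podutil.GetPodConditionFromList, whose contract is to return the index of, and a pointer to, the first condition of the given type, or (-1, nil) when absent. A minimal sketch of that contract:

// GetPodConditionFromList returns the index of, and a pointer to, the first
// condition of the given type in the list, or (-1, nil) if none is present
// (sketch of the podutil helper's contract).
func GetPodConditionFromList(conditions []v1.PodCondition, conditionType v1.PodConditionType) (int, *v1.PodCondition) {
	for i := range conditions {
		if conditions[i].Type == conditionType {
			return i, &conditions[i]
		}
	}
	return -1, nil
}
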
@@ -900,6 +916,16 @@ func mergePodStatus(oldPodStatus, newPodStatus v1.PodStatus, couldHaveRunningCon
newPodStatus.Phase = oldPodStatus.Phase
newPodStatus.Reason = oldPodStatus.Reason
newPodStatus.Message = oldPodStatus.Message
if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
// revert the pod disruption condition until the pod is terminal in order to avoid issuing
// an unnecessary PATCH request
revertPodCondition(&oldPodStatus, &newPodStatus, v1.AlphaNoCompatGuaranteeDisruptionTarget)
}
} else {
if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
// update the LastTransitionTime when transitioning into the failed state
updateLastTransitionTime(&newPodStatus, &oldPodStatus, v1.AlphaNoCompatGuaranteeDisruptionTarget)
}
}
}
@@ -920,6 +946,18 @@ func mergePodStatus(oldPodStatus, newPodStatus v1.PodStatus, couldHaveRunningCon
return newPodStatus
}
func revertPodCondition(oldPodStatus, newPodStatus *v1.PodStatus, cType v1.PodConditionType) {
if newIndex, newCondition := podutil.GetPodConditionFromList(newPodStatus.Conditions, cType); newCondition != nil {
if _, oldCondition := podutil.GetPodConditionFromList(oldPodStatus.Conditions, cType); oldCondition != nil {
// revert the new condition to its previous value
newPodStatus.Conditions[newIndex] = *oldCondition
} else {
// delete the new condition as it wasn't there before
newPodStatus.Conditions = append(newPodStatus.Conditions[:newIndex], newPodStatus.Conditions[newIndex+1:]...)
}
}
}
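
To make the revert behavior concrete, a hypothetical call (statuses invented for illustration): when the old status never carried the condition, revertPodCondition deletes it from the new status, so no PATCH is issued for it.

// Hypothetical statuses for illustration only.
oldStatus := v1.PodStatus{Phase: v1.PodRunning}
newStatus := v1.PodStatus{
	Phase: v1.PodRunning,
	Conditions: []v1.PodCondition{
		{Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, Status: v1.ConditionTrue},
	},
}
revertPodCondition(&oldStatus, &newStatus, v1.AlphaNoCompatGuaranteeDisruptionTarget)
// newStatus.Conditions is now empty: the condition was absent from
// oldStatus, so it is removed rather than reverted to an older value.
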
// NeedToReconcilePodReadiness returns whether the pod "Ready" condition needs to be reconciled
func NeedToReconcilePodReadiness(pod *v1.Pod) bool {
if len(pod.Spec.ReadinessGates) == 0 {

View File

@@ -34,11 +34,14 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/diff"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
featuregatetesting "k8s.io/component-base/featuregate/testing"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/features"
kubeconfigmap "k8s.io/kubernetes/pkg/kubelet/configmap"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
@@ -1400,22 +1403,204 @@ func deleteAction() core.DeleteAction {
func TestMergePodStatus(t *testing.T) {
useCases := []struct {
desc string
hasRunningContainers bool
oldPodStatus func(input v1.PodStatus) v1.PodStatus
newPodStatus func(input v1.PodStatus) v1.PodStatus
expectPodStatus v1.PodStatus
desc string
enablePodDisruptionConditions bool
hasRunningContainers bool
oldPodStatus func(input v1.PodStatus) v1.PodStatus
newPodStatus func(input v1.PodStatus) v1.PodStatus
expectPodStatus v1.PodStatus
}{
{
"no change",
false,
false,
func(input v1.PodStatus) v1.PodStatus { return input },
func(input v1.PodStatus) v1.PodStatus { return input },
getPodStatus(),
},
{
"add DisruptionTarget condition when transitioning into failed phase; PodDisruptionConditions enabled",
true,
false,
func(input v1.PodStatus) v1.PodStatus { return input },
func(input v1.PodStatus) v1.PodStatus {
input.Phase = v1.PodFailed
input.Conditions = append(input.Conditions, v1.PodCondition{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionTrue,
Reason: "TerminationByKubelet",
})
return input
},
v1.PodStatus{
Phase: v1.PodFailed,
Conditions: []v1.PodCondition{
{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionTrue,
Reason: "TerminationByKubelet",
},
{
Type: v1.PodReady,
Status: v1.ConditionFalse,
Reason: "PodFailed",
},
{
Type: v1.PodScheduled,
Status: v1.ConditionTrue,
},
{
Type: v1.ContainersReady,
Status: v1.ConditionFalse,
Reason: "PodFailed",
},
},
Message: "Message",
},
},
{
"don't add DisruptionTarget condition when transitioning into failed phase, but there are might still be running containers; PodDisruptionConditions enabled",
true,
true,
func(input v1.PodStatus) v1.PodStatus { return input },
func(input v1.PodStatus) v1.PodStatus {
input.Phase = v1.PodFailed
input.Conditions = append(input.Conditions, v1.PodCondition{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionTrue,
Reason: "TerminationByKubelet",
})
return input
},
v1.PodStatus{
Phase: v1.PodRunning,
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
{
Type: v1.PodScheduled,
Status: v1.ConditionTrue,
},
},
Message: "Message",
},
},
{
"preserve DisruptionTarget condition; PodDisruptionConditions enabled",
true,
false,
func(input v1.PodStatus) v1.PodStatus {
input.Conditions = append(input.Conditions, v1.PodCondition{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionTrue,
Reason: "TerminationByKubelet",
})
return input
},
func(input v1.PodStatus) v1.PodStatus {
return input
},
v1.PodStatus{
Phase: v1.PodRunning,
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
{
Type: v1.PodScheduled,
Status: v1.ConditionTrue,
},
{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionTrue,
Reason: "TerminationByKubelet",
},
},
Message: "Message",
},
},
{
"preserve DisruptionTarget condition; PodDisruptionConditions disabled",
false,
false,
func(input v1.PodStatus) v1.PodStatus {
input.Conditions = append(input.Conditions, v1.PodCondition{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionTrue,
Reason: "TerminationByKubelet",
})
return input
},
func(input v1.PodStatus) v1.PodStatus {
return input
},
v1.PodStatus{
Phase: v1.PodRunning,
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
{
Type: v1.PodScheduled,
Status: v1.ConditionTrue,
},
{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionTrue,
Reason: "TerminationByKubelet",
},
},
Message: "Message",
},
},
{
"override DisruptionTarget condition; PodDisruptionConditions enabled",
true,
false,
func(input v1.PodStatus) v1.PodStatus {
input.Conditions = append(input.Conditions, v1.PodCondition{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionTrue,
Reason: "EvictedByEvictionAPI",
})
return input
},
func(input v1.PodStatus) v1.PodStatus {
input.Conditions = append(input.Conditions, v1.PodCondition{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionTrue,
Reason: "TerminationByKubelet",
})
return input
},
v1.PodStatus{
Phase: v1.PodRunning,
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
{
Type: v1.PodScheduled,
Status: v1.ConditionTrue,
},
{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionTrue,
Reason: "TerminationByKubelet",
},
},
Message: "Message",
},
},
{
"readiness changes",
false,
false,
func(input v1.PodStatus) v1.PodStatus { return input },
func(input v1.PodStatus) v1.PodStatus {
input.Conditions[0].Status = v1.ConditionFalse
@@ -1439,6 +1624,7 @@ func TestMergePodStatus(t *testing.T) {
{
"additional pod condition",
false,
false,
func(input v1.PodStatus) v1.PodStatus {
input.Conditions = append(input.Conditions, v1.PodCondition{
Type: v1.PodConditionType("example.com/feature"),
@@ -1469,6 +1655,7 @@ func TestMergePodStatus(t *testing.T) {
{
"additional pod condition and readiness changes",
false,
false,
func(input v1.PodStatus) v1.PodStatus {
input.Conditions = append(input.Conditions, v1.PodCondition{
Type: v1.PodConditionType("example.com/feature"),
@@ -1502,6 +1689,7 @@ func TestMergePodStatus(t *testing.T) {
{
"additional pod condition changes",
false,
false,
func(input v1.PodStatus) v1.PodStatus {
input.Conditions = append(input.Conditions, v1.PodCondition{
Type: v1.PodConditionType("example.com/feature"),
@@ -1538,6 +1726,7 @@ func TestMergePodStatus(t *testing.T) {
{
"phase is transitioning to failed and no containers running",
false,
false,
func(input v1.PodStatus) v1.PodStatus {
input.Phase = v1.PodRunning
input.Reason = "Unknown"
@@ -1556,10 +1745,12 @@ func TestMergePodStatus(t *testing.T) {
{
Type: v1.PodReady,
Status: v1.ConditionFalse,
Reason: "PodFailed",
},
{
Type: v1.ContainersReady,
Status: v1.ConditionFalse,
Reason: "PodFailed",
},
{
Type: v1.PodScheduled,
@@ -1572,6 +1763,7 @@ func TestMergePodStatus(t *testing.T) {
},
{
"phase is transitioning to failed and containers running",
false,
true,
func(input v1.PodStatus) v1.PodStatus {
input.Phase = v1.PodRunning
@@ -1605,6 +1797,7 @@ func TestMergePodStatus(t *testing.T) {
for _, tc := range useCases {
t.Run(tc.desc, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, tc.enablePodDisruptionConditions)()
output := mergePodStatus(tc.oldPodStatus(getPodStatus()), tc.newPodStatus(getPodStatus()), tc.hasRunningContainers)
if !conditionsEqual(output.Conditions, tc.expectPodStatus.Conditions) || !statusEqual(output, tc.expectPodStatus) {
t.Fatalf("unexpected output: %s", cmp.Diff(tc.expectPodStatus, output))
@@ -1630,7 +1823,7 @@ func conditionsEqual(left, right []v1.PodCondition) bool {
for _, r := range right {
if l.Type == r.Type {
found = true
if l.Status != r.Status {
if l.Status != r.Status || l.Reason != r.Reason {
return false
}
}