Introduction of a pod condition type indicating disruption. Its reason field indicates the cause of the disruption:

- PreemptionByKubeScheduler (Pod preempted by kube-scheduler)
- DeletionByTaintManager (Pod deleted by taint manager due to NoExecute taint)
- EvictionByEvictionAPI (Pod evicted by Eviction API)
- DeletionByPodGC (an orphaned Pod deleted by PodGC)
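For illustration, a minimal client-side sketch of how a consumer could detect the new condition; the condition type "DisruptionTarget" and the reason values are taken from the changes below, while the helper name is hypothetical:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// hasDisruptionTarget is a hypothetical helper: it reports whether the pod
// carries the DisruptionTarget condition with status True and returns the
// recorded reason (PreemptionByKubeScheduler, DeletionByTaintManager,
// EvictionByEvictionAPI or DeletionByPodGC in this commit).
func hasDisruptionTarget(pod *v1.Pod) (bool, string) {
	for _, cond := range pod.Status.Conditions {
		if cond.Type == "DisruptionTarget" && cond.Status == v1.ConditionTrue {
			return true, cond.Reason
		}
	}
	return false, ""
}

func main() {
	pod := &v1.Pod{Status: v1.PodStatus{Conditions: []v1.PodCondition{{
		Type:    "DisruptionTarget",
		Status:  v1.ConditionTrue,
		Reason:  "DeletionByTaintManager",
		Message: "Taint manager: deleting due to NoExecute taint",
	}}}}
	if ok, reason := hasDisruptionTarget(pod); ok {
		fmt.Println("pod is a disruption target:", reason)
	}
}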
Michal Wozniak
2022-08-02 07:58:08 +02:00
parent 719f3cf8da
commit 04fcbd721c
18 changed files with 979 additions and 396 deletions

View File

@@ -2430,6 +2430,10 @@ const (
PodReasonUnschedulable = "Unschedulable"
// ContainersReady indicates whether all containers in the pod are ready.
ContainersReady PodConditionType = "ContainersReady"
// AlphaNoCompatGuaranteeDisruptionTarget indicates the pod is about to be deleted due to a
// disruption (such as preemption, eviction API or garbage-collection).
// The constant is to be renamed once the name is accepted within the KEP-3329.
AlphaNoCompatGuaranteeDisruptionTarget PodConditionType = "DisruptionTarget"
)
// PodCondition represents pod's condition

View File

@@ -30,14 +30,18 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/apis/core/helper"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
utilpod "k8s.io/kubernetes/pkg/util/pod"
"k8s.io/klog/v2"
)
@@ -105,7 +109,7 @@ func deletePodHandler(c clientset.Interface, emitEventFunc func(types.Namespaced
}
var err error
for i := 0; i < retries; i++ {
err = c.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{})
err = addConditionAndDeletePod(ctx, c, name, ns)
if err == nil {
break
}
@@ -115,6 +119,27 @@ func deletePodHandler(c clientset.Interface, emitEventFunc func(types.Namespaced
}
}
func addConditionAndDeletePod(ctx context.Context, c clientset.Interface, name, ns string) (err error) {
if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
pod, err := c.CoreV1().Pods(ns).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return err
}
newStatus := pod.Status.DeepCopy()
if apipod.UpdatePodCondition(newStatus, &v1.PodCondition{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionTrue,
Reason: "DeletionByTaintManager",
Message: "Taint manager: deleting due to NoExecute taint",
}) {
if _, _, _, err = utilpod.PatchPodStatus(ctx, c, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil {
return err
}
}
}
return c.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{})
}
func getNoExecuteTaints(taints []v1.Taint) []v1.Taint {
result := []v1.Taint{}
for i := range taints {

View File

@@ -27,11 +27,14 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/controller/testutil"
"k8s.io/kubernetes/pkg/features"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
@@ -128,10 +131,12 @@ func TestFilterNoExecuteTaints(t *testing.T) {
func TestCreatePod(t *testing.T) {
testCases := []struct {
description string
pod *v1.Pod
taintedNodes map[string][]v1.Taint
expectDelete bool
description string
pod *v1.Pod
taintedNodes map[string][]v1.Taint
expectPatch bool
expectDelete bool
enablePodDisruptionConditions bool
}{
{
description: "not scheduled - ignore",
@@ -153,6 +158,16 @@ func TestCreatePod(t *testing.T) {
},
expectDelete: true,
},
{
description: "schedule on tainted Node; PodDisruptionConditions enabled",
pod: testutil.NewPod("pod1", "node1"),
taintedNodes: map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectPatch: true,
expectDelete: true,
enablePodDisruptionConditions: true,
},
{
description: "schedule on tainted Node with finite toleration",
pod: addToleration(testutil.NewPod("pod1", "node1"), 1, 100),
@@ -180,28 +195,24 @@ func TestCreatePod(t *testing.T) {
}
for _, item := range testCases {
ctx, cancel := context.WithCancel(context.Background())
fakeClientset := fake.NewSimpleClientset()
controller, podIndexer, _ := setupNewNoExecuteTaintManager(ctx, fakeClientset)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(ctx)
controller.taintedNodes = item.taintedNodes
t.Run(item.description, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, item.enablePodDisruptionConditions)()
ctx, cancel := context.WithCancel(context.Background())
fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*item.pod}})
controller, podIndexer, _ := setupNewNoExecuteTaintManager(ctx, fakeClientset)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(ctx)
controller.taintedNodes = item.taintedNodes
podIndexer.Add(item.pod)
controller.PodUpdated(nil, item.pod)
// wait a bit
time.Sleep(timeForControllerToProgress)
podIndexer.Add(item.pod)
controller.PodUpdated(nil, item.pod)
// wait a bit
time.Sleep(timeForControllerToProgress)
podDeleted := false
for _, action := range fakeClientset.Actions() {
if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" {
podDeleted = true
}
}
if podDeleted != item.expectDelete {
t.Errorf("%v: Unexpected test result. Expected delete %v, got %v", item.description, item.expectDelete, podDeleted)
}
cancel()
verifyPodActions(t, item.description, fakeClientset, item.expectPatch, item.expectDelete)
cancel()
})
}
}
@@ -222,13 +233,26 @@ func TestDeletePod(t *testing.T) {
func TestUpdatePod(t *testing.T) {
testCases := []struct {
description string
prevPod *v1.Pod
newPod *v1.Pod
taintedNodes map[string][]v1.Taint
expectDelete bool
additionalSleep time.Duration
description string
prevPod *v1.Pod
newPod *v1.Pod
taintedNodes map[string][]v1.Taint
expectPatch bool
expectDelete bool
additionalSleep time.Duration
enablePodDisruptionConditions bool
}{
{
description: "scheduling onto tainted Node results in patch and delete when PodDisruptionConditions enabled",
prevPod: testutil.NewPod("pod1", ""),
newPod: testutil.NewPod("pod1", "node1"),
taintedNodes: map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectPatch: true,
expectDelete: true,
enablePodDisruptionConditions: true,
},
{
description: "scheduling onto tainted Node",
prevPod: testutil.NewPod("pod1", ""),
@@ -269,36 +293,31 @@ func TestUpdatePod(t *testing.T) {
}
for _, item := range testCases {
ctx, cancel := context.WithCancel(context.Background())
fakeClientset := fake.NewSimpleClientset()
controller, podIndexer, _ := setupNewNoExecuteTaintManager(context.TODO(), fakeClientset)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(ctx)
controller.taintedNodes = item.taintedNodes
t.Run(item.description, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, item.enablePodDisruptionConditions)()
ctx, cancel := context.WithCancel(context.Background())
fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*item.prevPod}})
controller, podIndexer, _ := setupNewNoExecuteTaintManager(context.TODO(), fakeClientset)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(ctx)
controller.taintedNodes = item.taintedNodes
podIndexer.Add(item.prevPod)
controller.PodUpdated(nil, item.prevPod)
podIndexer.Add(item.prevPod)
controller.PodUpdated(nil, item.prevPod)
fakeClientset.ClearActions()
time.Sleep(timeForControllerToProgress)
podIndexer.Update(item.newPod)
controller.PodUpdated(item.prevPod, item.newPod)
// wait a bit
time.Sleep(timeForControllerToProgress)
if item.additionalSleep > 0 {
time.Sleep(item.additionalSleep)
}
podDeleted := false
for _, action := range fakeClientset.Actions() {
if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" {
podDeleted = true
fakeClientset.ClearActions()
time.Sleep(timeForControllerToProgress)
podIndexer.Update(item.newPod)
controller.PodUpdated(item.prevPod, item.newPod)
// wait a bit
time.Sleep(timeForControllerToProgress)
if item.additionalSleep > 0 {
time.Sleep(item.additionalSleep)
}
}
if podDeleted != item.expectDelete {
t.Errorf("%v: Unexpected test result. Expected delete %v, got %v", item.description, item.expectDelete, podDeleted)
}
cancel()
verifyPodActions(t, item.description, fakeClientset, item.expectPatch, item.expectDelete)
cancel()
})
}
}
@@ -346,15 +365,8 @@ func TestCreateNode(t *testing.T) {
// wait a bit
time.Sleep(timeForControllerToProgress)
podDeleted := false
for _, action := range fakeClientset.Actions() {
if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" {
podDeleted = true
}
}
if podDeleted != item.expectDelete {
t.Errorf("%v: Unexpected test result. Expected delete %v, got %v", item.description, item.expectDelete, podDeleted)
}
verifyPodActions(t, item.description, fakeClientset, false, item.expectDelete)
cancel()
}
}
@@ -381,13 +393,26 @@ func TestDeleteNode(t *testing.T) {
func TestUpdateNode(t *testing.T) {
testCases := []struct {
description string
pods []v1.Pod
oldNode *v1.Node
newNode *v1.Node
expectDelete bool
additionalSleep time.Duration
description string
pods []v1.Pod
oldNode *v1.Node
newNode *v1.Node
expectPatch bool
expectDelete bool
additionalSleep time.Duration
enablePodDisruptionConditions bool
}{
{
description: "Added taint, expect node patched and deleted when PodDisruptionConditions is enabled",
pods: []v1.Pod{
*testutil.NewPod("pod1", "node1"),
},
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectPatch: true,
expectDelete: true,
enablePodDisruptionConditions: true,
},
{
description: "Added taint",
pods: []v1.Pod{
@@ -458,29 +483,24 @@ func TestUpdateNode(t *testing.T) {
}
for _, item := range testCases {
stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods})
controller, _, nodeIndexer := setupNewNoExecuteTaintManager(context.TODO(), fakeClientset)
nodeIndexer.Add(item.newNode)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(context.TODO())
controller.NodeUpdated(item.oldNode, item.newNode)
// wait a bit
time.Sleep(timeForControllerToProgress)
if item.additionalSleep > 0 {
time.Sleep(item.additionalSleep)
}
podDeleted := false
for _, action := range fakeClientset.Actions() {
if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" {
podDeleted = true
t.Run(item.description, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, item.enablePodDisruptionConditions)()
stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods})
controller, _, nodeIndexer := setupNewNoExecuteTaintManager(context.TODO(), fakeClientset)
nodeIndexer.Add(item.newNode)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(context.TODO())
controller.NodeUpdated(item.oldNode, item.newNode)
// wait a bit
time.Sleep(timeForControllerToProgress)
if item.additionalSleep > 0 {
time.Sleep(item.additionalSleep)
}
}
if podDeleted != item.expectDelete {
t.Errorf("%v: Unexpected test result. Expected delete %v, got %v", item.description, item.expectDelete, podDeleted)
}
close(stopCh)
verifyPodActions(t, item.description, fakeClientset, item.expectPatch, item.expectDelete)
close(stopCh)
})
}
}
@@ -765,7 +785,7 @@ func TestEventualConsistency(t *testing.T) {
newPod: addToleration(testutil.NewPod("pod2", "node1"), 1, 100),
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectDelete: false,
expectDelete: true,
},
{
description: "new pod2 created on tainted Node",
@@ -787,7 +807,7 @@ func TestEventualConsistency(t *testing.T) {
newPod: addToleration(testutil.NewPod("pod2", "node1"), 1, 100),
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectDelete: false,
expectDelete: true,
},
}
@@ -809,15 +829,7 @@ func TestEventualConsistency(t *testing.T) {
// TODO(mborsz): Remove this sleep and other sleeps in this file.
time.Sleep(timeForControllerToProgress)
podDeleted := false
for _, action := range fakeClientset.Actions() {
if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" {
podDeleted = true
}
}
if !podDeleted {
t.Errorf("%v: Unexpected test result. Expected delete, got: %v", item.description, podDeleted)
}
verifyPodActions(t, item.description, fakeClientset, false, item.expectDelete)
fakeClientset.ClearActions()
// And now the delayed update of 'pod2' comes to the TaintManager. We should delete it as well.
@@ -826,15 +838,26 @@ func TestEventualConsistency(t *testing.T) {
// wait a bit
time.Sleep(timeForControllerToProgress)
podDeleted = false
for _, action := range fakeClientset.Actions() {
if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" {
podDeleted = true
}
}
if podDeleted != item.expectDelete {
t.Errorf("%v: Unexpected test result. Expected delete %v, got %v", item.description, item.expectDelete, podDeleted)
}
close(stopCh)
}
}
func verifyPodActions(t *testing.T, description string, fakeClientset *fake.Clientset, expectPatch, expectDelete bool) {
t.Helper()
podPatched := false
podDeleted := false
for _, action := range fakeClientset.Actions() {
if action.GetVerb() == "patch" && action.GetResource().Resource == "pods" {
podPatched = true
}
if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" {
podDeleted = true
}
}
if podPatched != expectPatch {
t.Errorf("[%v]Unexpected test result. Expected patch %v, got %v", description, expectPatch, podPatched)
}
if podDeleted != expectDelete {
t.Errorf("[%v]Unexpected test result. Expected delete %v, got %v", description, expectDelete, podDeleted)
}
}

View File

@@ -37,8 +37,10 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/component-base/metrics/prometheus/ratelimiter"
"k8s.io/klog/v2"
apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/features"
nodeutil "k8s.io/kubernetes/pkg/util/node"
utilpod "k8s.io/kubernetes/pkg/util/pod"
"k8s.io/kubernetes/pkg/util/taints"
)
@@ -171,13 +173,13 @@ func (gcc *PodGCController) gcTerminating(ctx context.Context, pods []*v1.Pod) {
var wait sync.WaitGroup
for i := 0; i < deleteCount; i++ {
wait.Add(1)
go func(namespace string, name string) {
go func(pod *v1.Pod) {
defer wait.Done()
if err := gcc.deletePod(ctx, namespace, name); err != nil {
if err := gcc.markFailedAndDeletePod(ctx, pod); err != nil {
// ignore not founds
utilruntime.HandleError(err)
}
}(terminatingPods[i].Namespace, terminatingPods[i].Name)
}(terminatingPods[i])
}
wait.Wait()
}
@@ -203,13 +205,13 @@ func (gcc *PodGCController) gcTerminated(ctx context.Context, pods []*v1.Pod) {
var wait sync.WaitGroup
for i := 0; i < deleteCount; i++ {
wait.Add(1)
go func(namespace string, name string) {
go func(pod *v1.Pod) {
defer wait.Done()
if err := gcc.deletePod(ctx, namespace, name); err != nil {
if err := gcc.markFailedAndDeletePod(ctx, pod); err != nil {
// ignore not founds
defer utilruntime.HandleError(err)
}
}(terminatedPods[i].Namespace, terminatedPods[i].Name)
}(terminatedPods[i])
}
wait.Wait()
}
@@ -238,7 +240,13 @@ func (gcc *PodGCController) gcOrphaned(ctx context.Context, pods []*v1.Pod, node
continue
}
klog.V(2).InfoS("Found orphaned Pod assigned to the Node, deleting.", "pod", klog.KObj(pod), "node", pod.Spec.NodeName)
if err := gcc.deletePod(ctx, pod.Namespace, pod.Name); err != nil {
condition := &v1.PodCondition{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionTrue,
Reason: "DeletionByPodGC",
Message: "PodGC: node no longer exists",
}
if err := gcc.markFailedAndDeletePodWithCondition(ctx, pod, condition); err != nil {
utilruntime.HandleError(err)
} else {
klog.V(0).InfoS("Forced deletion of orphaned Pod succeeded", "pod", klog.KObj(pod))
@@ -287,7 +295,7 @@ func (gcc *PodGCController) gcUnscheduledTerminating(ctx context.Context, pods [
}
klog.V(2).InfoS("Found unscheduled terminating Pod not assigned to any Node, deleting.", "pod", klog.KObj(pod))
if err := gcc.deletePod(ctx, pod.Namespace, pod.Name); err != nil {
if err := gcc.markFailedAndDeletePod(ctx, pod); err != nil {
utilruntime.HandleError(err)
} else {
klog.V(0).InfoS("Forced deletion of unscheduled terminating Pod succeeded", "pod", klog.KObj(pod))
@@ -308,7 +316,30 @@ func (o byCreationTimestamp) Less(i, j int) bool {
return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
}
func (gcc *PodGCController) deletePod(ctx context.Context, namespace, name string) error {
klog.InfoS("PodGC is force deleting Pod", "pod", klog.KRef(namespace, name))
return gcc.kubeClient.CoreV1().Pods(namespace).Delete(ctx, name, *metav1.NewDeleteOptions(0))
func (gcc *PodGCController) markFailedAndDeletePod(ctx context.Context, pod *v1.Pod) error {
return gcc.markFailedAndDeletePodWithCondition(ctx, pod, nil)
}
func (gcc *PodGCController) markFailedAndDeletePodWithCondition(ctx context.Context, pod *v1.Pod, condition *v1.PodCondition) error {
klog.InfoS("PodGC is force deleting Pod", "pod", klog.KRef(pod.Namespace, pod.Name))
if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
newStatus := pod.Status.DeepCopy()
updated := false
if condition != nil {
updated = apipod.UpdatePodCondition(newStatus, condition)
}
// Mark the pod as failed - this is especially important in case the pod
// is orphaned, in which case the pod would remain in the Running phase
// forever as there is no kubelet running to change the phase.
if pod.Status.Phase != v1.PodSucceeded && pod.Status.Phase != v1.PodFailed {
newStatus.Phase = v1.PodFailed
updated = true
}
if updated {
if _, _, _, err := utilpod.PatchPodStatus(ctx, gcc.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil {
return err
}
}
}
return gcc.kubeClient.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, *metav1.NewDeleteOptions(0))
}

View File

@@ -21,6 +21,7 @@ import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@@ -51,29 +52,6 @@ func NewFromClient(kubeClient clientset.Interface, terminatedPodThreshold int) (
return controller, podInformer, nodeInformer
}
func compareStringSetToList(set sets.String, list []string) bool {
for _, item := range list {
if !set.Has(item) {
return false
}
}
if len(list) != len(set) {
return false
}
return true
}
func getDeletedPodNames(client *fake.Clientset) []string {
deletedPodNames := make([]string, 0)
for _, action := range client.Actions() {
if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" {
deleteAction := action.(clienttesting.DeleteAction)
deletedPodNames = append(deletedPodNames, deleteAction.GetName())
}
}
return deletedPodNames
}
func TestGCTerminated(t *testing.T) {
type nameToPhase struct {
name string
@@ -81,11 +59,25 @@ func TestGCTerminated(t *testing.T) {
}
testCases := []struct {
name string
pods []nameToPhase
threshold int
deletedPodNames sets.String
name string
pods []nameToPhase
threshold int
deletedPodNames sets.String
patchedPodNames sets.String
enablePodDisruptionConditions bool
}{
{
name: "delete pod a which is PodFailed and pod b which is PodSucceeded; PodDisruptionConditions enabled",
pods: []nameToPhase{
{name: "a", phase: v1.PodFailed},
{name: "b", phase: v1.PodSucceeded},
{name: "c", phase: v1.PodFailed},
},
threshold: 1,
patchedPodNames: sets.NewString(),
deletedPodNames: sets.NewString("a", "b"),
enablePodDisruptionConditions: true,
},
{
name: "threshold = 0, disables terminated pod deletion",
pods: []nameToPhase{
@@ -136,29 +128,30 @@ func TestGCTerminated(t *testing.T) {
},
}
for i, test := range testCases {
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
client := fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*testutil.NewNode("node")}})
gcc, podInformer, _ := NewFromClient(client, test.threshold)
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, test.enablePodDisruptionConditions)()
creationTime := time.Unix(0, 0)
nodes := []*v1.Node{testutil.NewNode("node")}
pods := make([]*v1.Pod, 0, len(test.pods))
for _, pod := range test.pods {
creationTime = creationTime.Add(1 * time.Hour)
podInformer.Informer().GetStore().Add(&v1.Pod{
pods = append(pods, &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: pod.name, CreationTimestamp: metav1.Time{Time: creationTime}},
Status: v1.PodStatus{Phase: pod.phase},
Spec: v1.PodSpec{NodeName: "node"},
})
}
client := setupNewSimpleClient(nodes, pods)
gcc, podInformer, _ := NewFromClient(client, test.threshold)
for _, pod := range pods {
podInformer.Informer().GetStore().Add(pod)
}
gcc.gc(context.TODO())
deletedPodNames := getDeletedPodNames(client)
if pass := compareStringSetToList(test.deletedPodNames, deletedPodNames); !pass {
t.Errorf("[%v]pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v",
i, test.deletedPodNames.List(), deletedPodNames)
}
verifyDeletedAndPatchedPods(t, client, test.deletedPodNames, test.patchedPodNames)
})
}
}
@@ -185,17 +178,19 @@ func waitForAdded(q workqueue.DelayingInterface, depth int) error {
func TestGCOrphaned(t *testing.T) {
testCases := []struct {
name string
initialClientNodes []*v1.Node
initialInformerNodes []*v1.Node
delay time.Duration
addedClientNodes []*v1.Node
deletedClientNodes []*v1.Node
addedInformerNodes []*v1.Node
deletedInformerNodes []*v1.Node
pods []*v1.Pod
itemsInQueue int
deletedPodNames sets.String
name string
initialClientNodes []*v1.Node
initialInformerNodes []*v1.Node
delay time.Duration
addedClientNodes []*v1.Node
deletedClientNodes []*v1.Node
addedInformerNodes []*v1.Node
deletedInformerNodes []*v1.Node
pods []*v1.Pod
itemsInQueue int
deletedPodNames sets.String
patchedPodNames sets.String
enablePodDisruptionConditions bool
}{
{
name: "nodes present in lister",
@@ -237,6 +232,18 @@ func TestGCOrphaned(t *testing.T) {
itemsInQueue: 1,
deletedPodNames: sets.NewString("a", "b"),
},
{
name: "no nodes with PodDisruptionConditions enabled",
delay: 2 * quarantineTime,
pods: []*v1.Pod{
makePod("a", "deleted", v1.PodFailed),
makePod("b", "deleted", v1.PodSucceeded),
},
itemsInQueue: 1,
deletedPodNames: sets.NewString("a", "b"),
patchedPodNames: sets.NewString("a", "b"),
enablePodDisruptionConditions: true,
},
{
name: "quarantine not finished",
delay: quarantineTime / 2,
@@ -317,11 +324,16 @@ func TestGCOrphaned(t *testing.T) {
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
nodeList := &v1.NodeList{}
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, test.enablePodDisruptionConditions)()
nodes := make([]*v1.Node, 0, len(test.initialClientNodes))
for _, node := range test.initialClientNodes {
nodeList.Items = append(nodeList.Items, *node)
nodes = append(nodes, node)
}
client := fake.NewSimpleClientset(nodeList)
pods := make([]*v1.Pod, 0, len(test.pods))
for _, pod := range test.pods {
pods = append(pods, pod)
}
client := setupNewSimpleClient(nodes, pods)
gcc, podInformer, nodeInformer := NewFromClient(client, -1)
for _, node := range test.initialInformerNodes {
nodeInformer.Informer().GetStore().Add(node)
@@ -369,12 +381,7 @@ func TestGCOrphaned(t *testing.T) {
// Actual pod deletion
gcc.gc(context.TODO())
deletedPodNames = getDeletedPodNames(client)
if pass := compareStringSetToList(test.deletedPodNames, deletedPodNames); !pass {
t.Errorf("pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v",
test.deletedPodNames.List(), deletedPodNames)
}
verifyDeletedAndPatchedPods(t, client, test.deletedPodNames, test.patchedPodNames)
})
}
}
@@ -388,10 +395,23 @@ func TestGCUnscheduledTerminating(t *testing.T) {
}
testCases := []struct {
name string
pods []nameToPhase
deletedPodNames sets.String
name string
pods []nameToPhase
deletedPodNames sets.String
patchedPodNames sets.String
enablePodDisruptionConditions bool
}{
{
name: "Unscheduled pod in any phase must be deleted, the phase of the running pod is changed to Failed; PodDisruptionConditions enabled",
pods: []nameToPhase{
{name: "a", phase: v1.PodFailed, deletionTimeStamp: &metav1.Time{}, nodeName: ""},
{name: "b", phase: v1.PodSucceeded, deletionTimeStamp: &metav1.Time{}, nodeName: ""},
{name: "c", phase: v1.PodRunning, deletionTimeStamp: &metav1.Time{}, nodeName: ""},
},
deletedPodNames: sets.NewString("a", "b", "c"),
patchedPodNames: sets.NewString("c"),
enablePodDisruptionConditions: true,
},
{
name: "Unscheduled pod in any phase must be deleted",
pods: []nameToPhase{
@@ -412,21 +432,28 @@ func TestGCUnscheduledTerminating(t *testing.T) {
},
}
for i, test := range testCases {
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
client := fake.NewSimpleClientset()
gcc, podInformer, _ := NewFromClient(client, -1)
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, test.enablePodDisruptionConditions)()
creationTime := time.Unix(0, 0)
pods := make([]*v1.Pod, 0, len(test.pods))
for _, pod := range test.pods {
creationTime = creationTime.Add(1 * time.Hour)
podInformer.Informer().GetStore().Add(&v1.Pod{
pods = append(pods, &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: pod.name, CreationTimestamp: metav1.Time{Time: creationTime},
DeletionTimestamp: pod.deletionTimeStamp},
Status: v1.PodStatus{Phase: pod.phase},
Spec: v1.PodSpec{NodeName: pod.nodeName},
})
}
nodes := []*v1.Node{}
client := setupNewSimpleClient(nodes, pods)
gcc, podInformer, _ := NewFromClient(client, -1)
for _, pod := range pods {
podInformer.Informer().GetStore().Add(pod)
}
pods, err := podInformer.Lister().List(labels.Everything())
if err != nil {
@@ -434,12 +461,7 @@ func TestGCUnscheduledTerminating(t *testing.T) {
return
}
gcc.gcUnscheduledTerminating(context.TODO(), pods)
deletedPodNames := getDeletedPodNames(client)
if pass := compareStringSetToList(test.deletedPodNames, deletedPodNames); !pass {
t.Errorf("[%v]pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v, test: %v",
i, test.deletedPodNames.List(), deletedPodNames, test.name)
}
verifyDeletedAndPatchedPods(t, client, test.deletedPodNames, test.patchedPodNames)
})
}
}
@@ -460,10 +482,12 @@ func TestGCTerminating(t *testing.T) {
}
testCases := []struct {
name string
pods []nameToPodConfig
nodes []node
deletedPodNames sets.String
name string
pods []nameToPodConfig
nodes []node
deletedPodNames sets.String
patchedPodNames sets.String
enablePodDisruptionConditions bool
}{
{
name: "pods have deletion timestamp set and the corresponding nodes are not ready",
@@ -544,16 +568,31 @@ func TestGCTerminating(t *testing.T) {
},
deletedPodNames: sets.NewString("b1", "b4", "b5", "b6"),
},
{
name: "pods deleted from node tained out-of-service; PodDisruptionConditions enabled",
nodes: []node{
{name: "worker", readyCondition: v1.ConditionFalse, taints: []v1.Taint{{Key: v1.TaintNodeOutOfService,
Effect: v1.TaintEffectNoExecute}}},
},
pods: []nameToPodConfig{
{name: "a", phase: v1.PodRunning, deletionTimeStamp: &metav1.Time{}, nodeName: "worker"},
{name: "b", phase: v1.PodFailed, deletionTimeStamp: &metav1.Time{}, nodeName: "worker"},
{name: "c", phase: v1.PodSucceeded, deletionTimeStamp: &metav1.Time{}, nodeName: "worker"},
},
deletedPodNames: sets.NewString("a", "b", "c"),
patchedPodNames: sets.NewString("a"),
enablePodDisruptionConditions: true,
},
}
for i, test := range testCases {
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
client := fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*testutil.NewNode("node-a")}})
gcc, podInformer, nodeInformer := NewFromClient(client, -1)
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, test.enablePodDisruptionConditions)()
creationTime := time.Unix(0, 0)
nodes := make([]*v1.Node, 0, len(test.nodes))
for _, node := range test.nodes {
creationTime = creationTime.Add(2 * time.Hour)
nodeInformer.Informer().GetStore().Add(&v1.Node{
nodes = append(nodes, &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: node.name, CreationTimestamp: metav1.Time{Time: creationTime}},
Spec: v1.NodeSpec{
Taints: node.taints,
@@ -568,24 +607,74 @@ func TestGCTerminating(t *testing.T) {
},
})
}
pods := make([]*v1.Pod, 0, len(test.pods))
for _, pod := range test.pods {
creationTime = creationTime.Add(1 * time.Hour)
podInformer.Informer().GetStore().Add(&v1.Pod{
pods = append(pods, &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: pod.name, CreationTimestamp: metav1.Time{Time: creationTime},
DeletionTimestamp: pod.deletionTimeStamp},
Status: v1.PodStatus{Phase: pod.phase},
Spec: v1.PodSpec{NodeName: pod.nodeName},
})
}
client := setupNewSimpleClient(nodes, pods)
gcc, podInformer, nodeInformer := NewFromClient(client, -1)
for _, pod := range pods {
podInformer.Informer().GetStore().Add(pod)
}
for _, node := range nodes {
nodeInformer.Informer().GetStore().Add(node)
}
gcc.gc(context.TODO())
deletedPodNames := getDeletedPodNames(client)
if pass := compareStringSetToList(test.deletedPodNames, deletedPodNames); !pass {
t.Errorf("[%v]pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v",
i, test.deletedPodNames.List(), deletedPodNames)
}
verifyDeletedAndPatchedPods(t, client, test.deletedPodNames, test.patchedPodNames)
})
}
}
func verifyDeletedAndPatchedPods(t *testing.T, client *fake.Clientset, wantDeletedPodNames, wantPatchedPodNames sets.String) {
t.Helper()
deletedPodNames := getDeletedPodNames(client)
if diff := cmp.Diff(wantDeletedPodNames, deletedPodNames); diff != "" {
t.Errorf("Deleted pod names (-want,+got):\n%s", diff)
}
patchedPodNames := getPatchedPodNames(client)
if diff := cmp.Diff(wantPatchedPodNames, patchedPodNames); diff != "" {
t.Errorf("Patched pod names (-want,+got):\n%s", diff)
}
}
func setupNewSimpleClient(nodes []*v1.Node, pods []*v1.Pod) *fake.Clientset {
podList := &v1.PodList{}
for _, podItem := range pods {
podList.Items = append(podList.Items, *podItem)
}
nodeList := &v1.NodeList{}
for _, nodeItem := range nodes {
nodeList.Items = append(nodeList.Items, *nodeItem)
}
return fake.NewSimpleClientset(nodeList, podList)
}
func getDeletedPodNames(client *fake.Clientset) sets.String {
deletedPodNames := sets.NewString()
for _, action := range client.Actions() {
if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" {
deleteAction := action.(clienttesting.DeleteAction)
deletedPodNames.Insert(deleteAction.GetName())
}
}
return deletedPodNames
}
func getPatchedPodNames(client *fake.Clientset) sets.String {
patchedPodNames := sets.NewString()
for _, action := range client.Actions() {
if action.GetVerb() == "patch" && action.GetResource().Resource == "pods" {
patchAction := action.(clienttesting.PatchAction)
patchedPodNames.Insert(patchAction.GetName())
}
}
return patchedPodNames
}

View File

@@ -629,6 +629,14 @@ const (
// Enables controlling pod ranking on replicaset scale-down.
PodDeletionCost featuregate.Feature = "PodDeletionCost"
// owner: @mimowo
// kep: http://kep.k8s.io/3329
// alpha: v1.25
//
// Enables support for appending a dedicated pod condition indicating that
// the pod is being deleted due to a disruption.
PodDisruptionConditions featuregate.Feature = "PodDisruptionConditions"
// owner: @ddebroy
// alpha: v1.25
//
@@ -1005,6 +1013,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
PodDeletionCost: {Default: true, PreRelease: featuregate.Beta},
PodDisruptionConditions: {Default: false, PreRelease: featuregate.Alpha},
PodHasNetworkCondition: {Default: false, PreRelease: featuregate.Alpha},
PodOverhead: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.26
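The gate defaults to false while the feature is alpha, so the new code paths stay off unless PodDisruptionConditions=true is passed via --feature-gates on the affected components. The updated unit tests flip it per test case with the component-base helper; a minimal sketch of that pattern, with a made-up test name:

package example

import (
	"testing"

	utilfeature "k8s.io/apiserver/pkg/util/feature"
	featuregatetesting "k8s.io/component-base/featuregate/testing"
	"k8s.io/kubernetes/pkg/features"
)

func TestWithPodDisruptionConditions(t *testing.T) {
	// Enable the alpha gate only for the duration of this test; the deferred
	// call restores the previous value afterwards.
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, true)()

	if !utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
		t.Fatal("expected PodDisruptionConditions to be enabled")
	}
}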

View File

@@ -684,7 +684,7 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
mergedStatus := mergePodStatus(pod.Status, status.status, m.podDeletionSafety.PodCouldHaveRunningContainers(pod))
newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, mergedStatus)
newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(context.TODO(), m.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, mergedStatus)
klog.V(3).InfoS("Patch status for pod", "pod", klog.KObj(pod), "patch", string(patchBytes))
if err != nil {

View File

@@ -32,12 +32,14 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/util/dryrun"
"k8s.io/apiserver/pkg/util/feature"
policyclient "k8s.io/client-go/kubernetes/typed/policy/v1"
"k8s.io/client-go/util/retry"
pdbhelper "k8s.io/component-helpers/apps/poddisruptionbudget"
podutil "k8s.io/kubernetes/pkg/api/pod"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/policy"
"k8s.io/kubernetes/pkg/features"
)
const (
@@ -153,11 +155,10 @@ func (r *EvictionREST) Create(ctx context.Context, name string, obj runtime.Obje
}
err = retry.OnError(EvictionsRetry, shouldRetry, func() error {
obj, err = r.store.Get(ctx, eviction.Name, &metav1.GetOptions{})
pod, err = getPod(r, ctx, eviction.Name)
if err != nil {
return err
}
pod = obj.(*api.Pod)
// Evicting a terminal pod should result in direct deletion of pod as it already caused disruption by the time we are evicting.
// There is no need to check for pdb.
@@ -178,7 +179,7 @@ func (r *EvictionREST) Create(ctx context.Context, name string, obj runtime.Obje
deleteOptions = deleteOptions.DeepCopy()
setPreconditionsResourceVersion(deleteOptions, &pod.ResourceVersion)
}
_, _, err = r.store.Delete(ctx, eviction.Name, rest.ValidateAllObjectFunc, deleteOptions)
err = addConditionAndDeletePod(r, ctx, eviction.Name, rest.ValidateAllObjectFunc, deleteOptions)
if err != nil {
return err
}
@@ -276,7 +277,7 @@ func (r *EvictionREST) Create(ctx context.Context, name string, obj runtime.Obje
}
// Try the delete
_, _, err = r.store.Delete(ctx, eviction.Name, rest.ValidateAllObjectFunc, deleteOptions)
err = addConditionAndDeletePod(r, ctx, eviction.Name, rest.ValidateAllObjectFunc, deleteOptions)
if err != nil {
if errors.IsConflict(err) && updateDeletionOptions &&
(originalDeleteOptions.Preconditions == nil || originalDeleteOptions.Preconditions.ResourceVersion == nil) {
@@ -292,6 +293,41 @@ func (r *EvictionREST) Create(ctx context.Context, name string, obj runtime.Obje
return &metav1.Status{Status: metav1.StatusSuccess}, nil
}
func addConditionAndDeletePod(r *EvictionREST, ctx context.Context, name string, validation rest.ValidateObjectFunc, options *metav1.DeleteOptions) error {
if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
pod, err := getPod(r, ctx, name)
if err != nil {
return err
}
conditionAppender := func(_ context.Context, newObj, _ runtime.Object) (runtime.Object, error) {
podObj := newObj.(*api.Pod)
podutil.UpdatePodCondition(&podObj.Status, &api.PodCondition{
Type: api.AlphaNoCompatGuaranteeDisruptionTarget,
Status: api.ConditionTrue,
Reason: "EvictionByEvictionAPI",
Message: "Eviction API: evicting",
})
return podObj, nil
}
podCopyUpdated := rest.DefaultUpdatedObjectInfo(pod, conditionAppender)
if _, _, err = r.store.Update(ctx, name, podCopyUpdated, rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}); err != nil {
return err
}
}
_, _, err := r.store.Delete(ctx, name, rest.ValidateAllObjectFunc, options)
return err
}
func getPod(r *EvictionREST, ctx context.Context, name string) (*api.Pod, error) {
obj, err := r.store.Get(ctx, name, &metav1.GetOptions{})
if err != nil {
return nil, err
}
return obj.(*api.Pod), nil
}
func setPreconditionsResourceVersion(deleteOptions *metav1.DeleteOptions, resourceVersion *string) {
if deleteOptions.Preconditions == nil {
deleteOptions.Preconditions = &metav1.Preconditions{}

View File

@@ -105,7 +105,7 @@ func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGet
Pod: &REST{store, proxyTransport},
Binding: &BindingREST{store: store},
LegacyBinding: &LegacyBindingREST{bindingREST},
Eviction: newEvictionStorage(store, podDisruptionBudgetClient),
Eviction: newEvictionStorage(&statusStore, podDisruptionBudgetClient),
Status: &StatusREST{store: &statusStore},
EphemeralContainers: &EphemeralContainersREST{store: &ephemeralContainersStore},
Log: &podrest.LogREST{Store: store, KubeletConn: k},

View File

@@ -28,11 +28,14 @@ import (
policy "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/labels"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apiserver/pkg/util/feature"
corelisters "k8s.io/client-go/listers/core/v1"
policylisters "k8s.io/client-go/listers/policy/v1"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
"k8s.io/klog/v2"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util"
@@ -340,9 +343,26 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
// Otherwise we should delete the victim.
if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
waitingPod.Reject(pluginName, "preempted")
} else if err := util.DeletePod(ctx, cs, victim); err != nil {
klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
return framework.AsStatus(err)
} else {
if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
condition := &v1.PodCondition{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionTrue,
Reason: "PreemptionByKubeScheduler",
Message: "Kube-scheduler: preempting",
}
newStatus := pod.Status.DeepCopy()
if apipod.UpdatePodCondition(newStatus, condition) {
if err := util.PatchPodStatus(ctx, cs, victim, newStatus); err != nil {
klog.ErrorS(err, "Preparing pod preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
return framework.AsStatus(err)
}
}
}
if err := util.DeletePod(ctx, cs, victim); err != nil {
klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
return framework.AsStatus(err)
}
}
fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v",
pod.Namespace, pod.Name, c.Name())

View File

@@ -30,7 +30,7 @@ import (
)
// PatchPodStatus patches pod status. It returns true and avoids an update if the patch contains no changes.
func PatchPodStatus(c clientset.Interface, namespace, name string, uid types.UID, oldPodStatus, newPodStatus v1.PodStatus) (*v1.Pod, []byte, bool, error) {
func PatchPodStatus(ctx context.Context, c clientset.Interface, namespace, name string, uid types.UID, oldPodStatus, newPodStatus v1.PodStatus) (*v1.Pod, []byte, bool, error) {
patchBytes, unchanged, err := preparePatchBytesForPodStatus(namespace, name, uid, oldPodStatus, newPodStatus)
if err != nil {
return nil, nil, false, err
@@ -39,7 +39,7 @@ func PatchPodStatus(c clientset.Interface, namespace, name string, uid types.UID
return nil, patchBytes, true, nil
}
updatedPod, err := c.CoreV1().Pods(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
updatedPod, err := c.CoreV1().Pods(namespace).Patch(ctx, name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
if err != nil {
return nil, nil, false, fmt.Errorf("failed to patch status %q for pod %q/%q: %v", patchBytes, namespace, name, err)
}
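A usage sketch of the updated context-aware signature, assuming a fake clientset and made-up pod name and UID: when old and new status are identical the prepared patch carries no changes, so the helper reports unchanged without issuing the request.

package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes/fake"
	utilpod "k8s.io/kubernetes/pkg/util/pod"
)

func main() {
	client := fake.NewSimpleClientset()
	status := v1.PodStatus{Phase: v1.PodRunning}
	// Identical old and new status: unchanged == true is returned before
	// the strategic merge patch is sent to the API server.
	_, patch, unchanged, err := utilpod.PatchPodStatus(context.TODO(), client, "default", "pod1", types.UID("uid1"), status, status)
	if err != nil {
		panic(err)
	}
	fmt.Printf("unchanged=%v patch=%s\n", unchanged, patch)
}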

View File

@@ -88,7 +88,7 @@ func TestPatchPodStatus(t *testing.T) {
}
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
_, patchBytes, unchanged, err := PatchPodStatus(client, ns, name, uid, getPodStatus(), tc.mutate(getPodStatus()))
_, patchBytes, unchanged, err := PatchPodStatus(context.TODO(), client, ns, name, uid, getPodStatus(), tc.mutate(getPodStatus()))
if err != nil {
t.Errorf("unexpected error: %v", err)
}