Merge pull request #105527 from rphillips/fixes/filter_terminated_pods
kubelet: set terminated podWorker status for terminated pods
This commit is contained in:
		| @@ -28,6 +28,7 @@ import ( | ||||
| 	"k8s.io/apimachinery/pkg/util/runtime" | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
| 	"k8s.io/client-go/tools/record" | ||||
| 	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" | ||||
| 	"k8s.io/klog/v2" | ||||
| 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" | ||||
| 	"k8s.io/kubernetes/pkg/kubelet/events" | ||||
| @@ -501,6 +502,22 @@ func (p *podWorkers) IsPodForMirrorPodTerminatingByFullName(podFullName string) | ||||
| 	return ok | ||||
| } | ||||
|  | ||||
| // isPodStatusCacheTerminal reports whether the cached runtime status shows no | ||||
| // container in the running state and no sandbox in the ready state. | ||||
| func isPodStatusCacheTerminal(status *kubecontainer.PodStatus) bool { | ||||
| 	// A single running container is enough to rule out a terminal status. | ||||
| 	for _, cs := range status.ContainerStatuses { | ||||
| 		if cs.State == kubecontainer.ContainerStateRunning { | ||||
| 			return false | ||||
| 		} | ||||
| 	} | ||||
| 	// The same holds for any sandbox that is still ready. | ||||
| 	for _, sandbox := range status.SandboxStatuses { | ||||
| 		if sandbox.State == runtimeapi.PodSandboxState_SANDBOX_READY { | ||||
| 			return false | ||||
| 		} | ||||
| 	} | ||||
| 	return true | ||||
| } | ||||
|  | ||||
| // UpdatePod carries a configuration change or termination state to a pod. A pod is either runnable, | ||||
| // terminating, or terminated, and will transition to terminating if deleted on the apiserver, it is | ||||
| // discovered to have a terminal phase (Succeeded or Failed), or if it is evicted by the kubelet. | ||||
| @@ -536,6 +553,22 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) { | ||||
| 		status = &podSyncStatus{ | ||||
| 			syncedAt: now, | ||||
| 		} | ||||
| 		// if this pod is being synced for the first time, we need to make sure it is an active pod | ||||
| 		if !isRuntimePod && (pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded) { | ||||
| 			// check to see if the pod is not running and the pod is terminal. | ||||
| 			// If this succeeds then record in the podWorker that it is terminated. | ||||
| 			if statusCache, err := p.podCache.Get(pod.UID); err == nil { | ||||
| 				if isPodStatusCacheTerminal(statusCache) { | ||||
| 					status = &podSyncStatus{ | ||||
| 						terminatedAt:       now, | ||||
| 						terminatingAt:      now, | ||||
| 						syncedAt:           now, | ||||
| 						startedTerminating: true, | ||||
| 						finished:           true, | ||||
| 					} | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 		p.podSyncStatuses[uid] = status | ||||
| 	} | ||||
|  | ||||
|   | ||||
| @@ -140,6 +140,18 @@ func newPod(uid, name string) *v1.Pod { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // newPodWithPhase returns a pod carrying the given UID and name whose status | ||||
| // phase is set to phase; every other field keeps its zero value. | ||||
| func newPodWithPhase(uid, name string, phase v1.PodPhase) *v1.Pod { | ||||
| 	pod := &v1.Pod{} | ||||
| 	pod.ObjectMeta.UID = types.UID(uid) | ||||
| 	pod.ObjectMeta.Name = name | ||||
| 	pod.Status.Phase = phase | ||||
| 	return pod | ||||
| } | ||||
|  | ||||
| // syncPodRecord is a record of a sync pod call | ||||
| type syncPodRecord struct { | ||||
| 	name       string | ||||
| @@ -273,6 +285,36 @@ func TestUpdatePod(t *testing.T) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // TestUpdatePodWithTerminatedPod submits a pod in the Succeeded phase, a pod with | ||||
| // no phase set, and a kill request for a runtime-only pod, then checks which of | ||||
| // them the pod workers report as known terminated. | ||||
| func TestUpdatePodWithTerminatedPod(t *testing.T) { | ||||
| 	workers, _ := createPodWorkers() | ||||
| 	succeeded := newPodWithPhase("0000-0000-0000", "done-pod", v1.PodSucceeded) | ||||
| 	runtimeOnly := &kubecontainer.Pod{ID: "0000-0000-0001", Name: "done-pod"} | ||||
| 	active := newPod("0000-0000-0002", "running-pod") | ||||
|  | ||||
| 	// The updates are delivered in this exact order. | ||||
| 	updates := []UpdatePodOptions{ | ||||
| 		{Pod: succeeded, UpdateType: kubetypes.SyncPodCreate}, | ||||
| 		{Pod: active, UpdateType: kubetypes.SyncPodCreate}, | ||||
| 		{UpdateType: kubetypes.SyncPodKill, RunningPod: runtimeOnly}, | ||||
| 	} | ||||
| 	for _, opts := range updates { | ||||
| 		workers.UpdatePod(opts) | ||||
| 	} | ||||
|  | ||||
| 	if workers.IsPodKnownTerminated(active.UID) { | ||||
| 		t.Errorf("podWorker state should not be terminated") | ||||
| 	} | ||||
| 	if !workers.IsPodKnownTerminated(succeeded.UID) { | ||||
| 		t.Errorf("podWorker state should be terminated") | ||||
| 	} | ||||
| 	if workers.IsPodKnownTerminated(runtimeOnly.ID) { | ||||
| 		t.Errorf("podWorker state should not be marked terminated for a running pod") | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestUpdatePodForRuntimePod(t *testing.T) { | ||||
| 	podWorkers, processed := createPodWorkers() | ||||
|  | ||||
|   | ||||
| @@ -28,8 +28,10 @@ import ( | ||||
| 	"time" | ||||
|  | ||||
| 	"k8s.io/api/core/v1" | ||||
| 	"k8s.io/apimachinery/pkg/api/resource" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/kubernetes/test/e2e/framework" | ||||
| 	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper" | ||||
| 	testutils "k8s.io/kubernetes/test/utils" | ||||
| 	imageutils "k8s.io/kubernetes/test/utils/image" | ||||
|  | ||||
| @@ -49,7 +51,7 @@ func waitForPods(f *framework.Framework, podCount int, timeout time.Duration) (r | ||||
|  | ||||
| 		runningPods = []*v1.Pod{} | ||||
| 		for _, pod := range podList.Items { | ||||
| 			if r, err := testutils.PodRunningReady(&pod); err != nil || !r { | ||||
| 			if r, err := testutils.PodRunningReadyOrSucceeded(&pod); err != nil || !r { | ||||
| 				continue | ||||
| 			} | ||||
| 			runningPods = append(runningPods, &pod) | ||||
| @@ -62,7 +64,7 @@ func waitForPods(f *framework.Framework, podCount int, timeout time.Duration) (r | ||||
| 	return runningPods | ||||
| } | ||||
|  | ||||
| var _ = SIGDescribe("Restart [Serial] [Slow] [Disruptive] [NodeFeature:ContainerRuntimeRestart]", func() { | ||||
| var _ = SIGDescribe("Restart [Serial] [Slow] [Disruptive]", func() { | ||||
| 	const ( | ||||
| 		// Saturate the node. It's not necessary that all these pods enter | ||||
| 		// Running/Ready, because we don't know the number of cores in the | ||||
| @@ -145,4 +147,70 @@ var _ = SIGDescribe("Restart [Serial] [Slow] [Disruptive] [NodeFeature:Container | ||||
| 			}) | ||||
| 		}) | ||||
| 	}) | ||||
|  | ||||
| 	// Kubelet restart scenario: pods that already ran to completion must not be | ||||
| 	// counted against node CPU once the kubelet has been restarted. | ||||
| 	ginkgo.Context("Kubelet", func() { | ||||
| 		ginkgo.It("should correctly account for terminated pods after restart", func() { | ||||
| 			// Size the test from the node's allocatable CPU; skip when it is below one. | ||||
| 			node := getLocalNode(f) | ||||
| 			cpus := node.Status.Allocatable[v1.ResourceCPU] | ||||
| 			numCpus := int((&cpus).Value()) | ||||
| 			if numCpus < 1 { | ||||
| 				e2eskipper.Skipf("insufficient CPU available for kubelet restart test") | ||||
| 			} | ||||
|  | ||||
| 			// create as many restartNever pods as there are allocatable CPUs; | ||||
| 			// if they are not correctly accounted for as terminated later, | ||||
| 			// they will fill up all node capacity | ||||
| 			podCountRestartNever := numCpus | ||||
| 			ginkgo.By(fmt.Sprintf("creating %d RestartNever pods on node", podCountRestartNever)) | ||||
| 			restartNeverPods := newTestPods(podCountRestartNever, false, imageutils.GetE2EImage(imageutils.BusyBox), "restart-kubelet-test") | ||||
| 			// Each pod runs `echo hi` once, is never restarted, and has a CPU limit of 1. | ||||
| 			for _, pod := range restartNeverPods { | ||||
| 				pod.Spec.RestartPolicy = "Never" | ||||
| 				pod.Spec.Containers[0].Command = []string{"echo", "hi"} | ||||
| 				pod.Spec.Containers[0].Resources.Limits = v1.ResourceList{ | ||||
| 					v1.ResourceCPU: resource.MustParse("1"), | ||||
| 				} | ||||
| 			} | ||||
| 			createBatchPodWithRateControl(f, restartNeverPods, podCreationInterval) | ||||
| 			defer deletePodsSync(f, restartNeverPods) | ||||
|  | ||||
| 			// Give the first batch up to a minute; fail if waitForPods returns fewer pods than were created. | ||||
| 			completedPods := waitForPods(f, podCountRestartNever, time.Minute) | ||||
| 			if len(completedPods) < podCountRestartNever { | ||||
| 				framework.Failf("Failed to run sufficient restartNever pods, got %d but expected %d", len(completedPods), podCountRestartNever) | ||||
| 			} | ||||
|  | ||||
| 			// Second batch: (numCpus/2)+1 pause-image pods, each also limited to 1 CPU, so the | ||||
| 			// CPU limits of both batches together exceed numCpus. | ||||
| 			podCountRestartAlways := (numCpus / 2) + 1 | ||||
| 			ginkgo.By(fmt.Sprintf("creating %d RestartAlways pods on node", podCountRestartAlways)) | ||||
| 			restartAlwaysPods := newTestPods(podCountRestartAlways, false, imageutils.GetPauseImageName(), "restart-kubelet-test") | ||||
| 			for _, pod := range restartAlwaysPods { | ||||
| 				pod.Spec.Containers[0].Resources.Limits = v1.ResourceList{ | ||||
| 					v1.ResourceCPU: resource.MustParse("1"), | ||||
| 				} | ||||
| 			} | ||||
| 			createBatchPodWithRateControl(f, restartAlwaysPods, podCreationInterval) | ||||
| 			defer deletePodsSync(f, restartAlwaysPods) | ||||
|  | ||||
| 			// Both batches together must be returned by waitForPods before the restart. | ||||
| 			numAllPods := podCountRestartNever + podCountRestartAlways | ||||
| 			allPods := waitForPods(f, numAllPods, startTimeout) | ||||
| 			if len(allPods) < numAllPods { | ||||
| 				framework.Failf("Failed to run sufficient restartAlways pods, got %d but expected %d", len(allPods), numAllPods) | ||||
| 			} | ||||
|  | ||||
| 			ginkgo.By("killing and restarting kubelet") | ||||
| 			// We want to kill the kubelet rather than perform a graceful restart. | ||||
| 			// stopKubelet hands back a function, which is invoked right away | ||||
| 			// (presumably it starts the kubelet again; confirm against the helper). | ||||
| 			startKubelet := stopKubelet() | ||||
| 			startKubelet() | ||||
|  | ||||
| 			// If this test works correctly, each of these pods will exit | ||||
| 			// with no issue. But if accounting breaks, pods scheduled after | ||||
| 			// restart may think these old pods are consuming CPU and we | ||||
| 			// will get an OutOfCpu error. | ||||
| 			ginkgo.By("verifying restartNever pods succeed and restartAlways pods stay running") | ||||
| 			// Re-check every 10 seconds until startTimeout has elapsed; fail as soon as | ||||
| 			// fewer than numAllPods pods are returned. | ||||
| 			for start := time.Now(); time.Since(start) < startTimeout; time.Sleep(10 * time.Second) { | ||||
| 				postRestartRunningPods := waitForPods(f, numAllPods, time.Minute) | ||||
| 				if len(postRestartRunningPods) < numAllPods { | ||||
| 					framework.Failf("less pods are running after node restart, got %d but expected %d", len(postRestartRunningPods), numAllPods) | ||||
| 				} | ||||
| 			} | ||||
| 		}) | ||||
| 	}) | ||||
| }) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Prow Robot
					Kubernetes Prow Robot