diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 2b96020d722..ab255e5216f 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1452,24 +1452,36 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { } // syncPod is the transaction script for the sync of a single pod (setting up) -// a pod. The reverse (teardown) is handled in syncTerminatingPod and -// syncTerminatedPod. If syncPod exits without error, then the pod runtime -// state is in sync with the desired configuration state (pod is running). -// If syncPod exits with a transient error, the next invocation of syncPod -// is expected to make progress towards reaching the runtime state. +// a pod. This method is reentrant and expected to converge a pod towards the +// desired state of the spec. The reverse (teardown) is handled in +// syncTerminatingPod and syncTerminatedPod. If syncPod exits without error, +// then the pod runtime state is in sync with the desired configuration state +// (pod is running). If syncPod exits with a transient error, the next +// invocation of syncPod is expected to make progress towards reaching the +// runtime state. syncPod exits with isTerminal when the pod was detected to +// have reached a terminal lifecycle phase due to container exits (for +// RestartNever or RestartOnFailure) and the next method invoked will by +// syncTerminatingPod. // // Arguments: // -// o - the SyncPodOptions for this invocation +// updateType - whether this is a create (first time) or an update, should +// only be used for metrics since this method must be reentrant +// pod - the pod that is being set up +// mirrorPod - the mirror pod known to the kubelet for this pod, if any +// podStatus - the most recent pod status observed for this pod which can +// be used to determine the set of actions that should be taken during +// this loop of syncPod // // The workflow is: -// * Kill the pod immediately if update type is SyncPodKill // * If the pod is being created, record pod worker start latency // * Call generateAPIPodStatus to prepare an v1.PodStatus for the pod // * If the pod is being seen as running for the first time, record pod // start latency // * Update the status of the pod in the status manager -// * Kill the pod if it should not be running due to soft admission +// * Stop the pod's containers if it should not be running due to soft +// admission +// * Ensure any background tracking for a runnable pod is started // * Create a mirror pod if the pod is a static pod, and does not // already have a mirror pod // * Create the data directories for the pod if they do not exist @@ -1483,10 +1495,12 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { // // This operation writes all events that are dispatched in order to provide // the most accurate information possible about an error situation to aid debugging. -// Callers should not throw an event if this operation returns an error. -func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error { +// Callers should not write an event if this operation returns an error. +func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) { klog.V(4).InfoS("syncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID) - defer klog.V(4).InfoS("syncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID) + defer func() { + klog.V(4).InfoS("syncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID, "isTerminal", isTerminal) + }() // Latency measurements for the main workflow are relative to the // first time the pod was seen by kubelet. @@ -1518,11 +1532,17 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType for _, ipInfo := range apiPodStatus.PodIPs { podStatus.IPs = append(podStatus.IPs, ipInfo.IP) } - if len(podStatus.IPs) == 0 && len(apiPodStatus.PodIP) > 0 { podStatus.IPs = []string{apiPodStatus.PodIP} } + // If the pod is terminal, we don't need to continue to setup the pod + if apiPodStatus.Phase == v1.PodSucceeded || apiPodStatus.Phase == v1.PodFailed { + kl.statusManager.SetPodStatus(pod, apiPodStatus) + isTerminal = true + return isTerminal, nil + } + // If the pod should not be running, we request the pod's containers be stopped. This is not the same // as termination (we want to stop the pod, but potentially restart it later if soft admission allows // it later). Set the status and phase appropriately @@ -1572,13 +1592,13 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType // Return an error to signal that the sync loop should back off. syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message) } - return syncErr + return false, syncErr } // If the network plugin is not ready, only start the pod if it uses the host network if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) { kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, err) - return fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err) + return false, fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err) } // ensure the kubelet knows about referenced secrets or configmaps used by the pod @@ -1635,7 +1655,7 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType } if err := pcm.EnsureExists(pod); err != nil { kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err) - return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err) + return false, fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err) } } } @@ -1676,7 +1696,7 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType if err := kl.makePodDataDirs(pod); err != nil { kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err) klog.ErrorS(err, "Unable to make pod data directories for pod", "pod", klog.KObj(pod)) - return err + return false, err } // Volume manager will not mount volumes for terminating pods @@ -1686,7 +1706,7 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil { kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err) klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod)) - return err + return false, err } } @@ -1702,14 +1722,14 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff { // Do not record an event here, as we keep all event logging for sync pod failures // local to container runtime, so we get better errors. - return err + return false, err } } - return nil + return false, nil } - return nil + return false, nil } // syncTerminatingPod is expected to terminate all running containers in a pod. Once this method diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index 2808ad02b2c..432db4add9f 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -915,6 +915,12 @@ func countRunningContainerStatus(status v1.PodStatus) int { return runningContainers } +// PodCouldHaveRunningContainers returns true if the pod with the given UID could still have running +// containers. This returns false if the pod has not yet been started or the pod is unknown. +func (kl *Kubelet) PodCouldHaveRunningContainers(pod *v1.Pod) bool { + return kl.podWorkers.CouldHaveRunningContainers(pod.UID) +} + // PodResourcesAreReclaimed returns true if all required node-level resources that a pod was consuming have // been reclaimed by the kubelet. Reclaiming resources is a prerequisite to deleting a pod from the API server. func (kl *Kubelet) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool { @@ -1424,7 +1430,7 @@ func getPhase(spec *v1.PodSpec, info []v1.ContainerStatus) v1.PodPhase { } // generateAPIPodStatus creates the final API pod status for a pod, given the -// internal pod status. +// internal pod status. This method should only be called from within sync*Pod methods. func (kl *Kubelet) generateAPIPodStatus(pod *v1.Pod, podStatus *kubecontainer.PodStatus) v1.PodStatus { klog.V(3).InfoS("Generating pod status", "pod", klog.KObj(pod)) diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index d7fe88c446d..e8c2d06f1df 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -505,9 +505,9 @@ func TestDispatchWorkOfCompletedPod(t *testing.T) { kubelet := testKubelet.kubelet var got bool kubelet.podWorkers = &fakePodWorkers{ - syncPodFn: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error { + syncPodFn: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) { got = true - return nil + return false, nil }, cache: kubelet.podCache, t: t, @@ -584,9 +584,9 @@ func TestDispatchWorkOfActivePod(t *testing.T) { kubelet := testKubelet.kubelet var got bool kubelet.podWorkers = &fakePodWorkers{ - syncPodFn: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error { + syncPodFn: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) { got = true - return nil + return false, nil }, cache: kubelet.podCache, t: t, @@ -1300,8 +1300,11 @@ func TestCreateMirrorPod(t *testing.T) { pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = "file" pods := []*v1.Pod{pod} kl.podManager.SetPods(pods) - err := kl.syncPod(context.Background(), updateType, pod, nil, &kubecontainer.PodStatus{}) + isTerminal, err := kl.syncPod(context.Background(), updateType, pod, nil, &kubecontainer.PodStatus{}) assert.NoError(t, err) + if isTerminal { + t.Fatalf("pod should not be terminal: %#v", pod) + } podFullName := kubecontainer.GetPodFullName(pod) assert.True(t, manager.HasPod(podFullName), "Expected mirror pod %q to be created", podFullName) assert.Equal(t, 1, manager.NumOfPods(), "Expected only 1 mirror pod %q, got %+v", podFullName, manager.GetPods()) @@ -1332,8 +1335,11 @@ func TestDeleteOutdatedMirrorPod(t *testing.T) { pods := []*v1.Pod{pod, mirrorPod} kl.podManager.SetPods(pods) - err := kl.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, &kubecontainer.PodStatus{}) + isTerminal, err := kl.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, &kubecontainer.PodStatus{}) assert.NoError(t, err) + if isTerminal { + t.Fatalf("pod should not be terminal: %#v", pod) + } name := kubecontainer.GetPodFullName(pod) creates, deletes := manager.GetCounts(name) if creates != 1 || deletes != 1 { @@ -1489,13 +1495,19 @@ func TestNetworkErrorsWithoutHostNetwork(t *testing.T) { }) kubelet.podManager.SetPods([]*v1.Pod{pod}) - err := kubelet.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{}) + isTerminal, err := kubelet.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{}) assert.Error(t, err, "expected pod with hostNetwork=false to fail when network in error") + if isTerminal { + t.Fatalf("pod should not be terminal: %#v", pod) + } pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = kubetypes.FileSource pod.Spec.HostNetwork = true - err = kubelet.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{}) + isTerminal, err = kubelet.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{}) assert.NoError(t, err, "expected pod with hostNetwork=true to succeed when network in error") + if isTerminal { + t.Fatalf("pod should not be terminal: %#v", pod) + } } func TestFilterOutInactivePods(t *testing.T) { diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index d8dfaa4034f..5632745e060 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -218,7 +218,7 @@ type PodWorkers interface { } // the function to invoke to perform a sync (reconcile the kubelet state to the desired shape of the pod) -type syncPodFnType func(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error +type syncPodFnType func(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) // the function to invoke to terminate a pod (ensure no running processes are present) type syncTerminatingPodFnType func(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, runningPod *kubecontainer.Pod, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error @@ -886,6 +886,7 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) { } klog.V(4).InfoS("Processing pod event", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType) + var isTerminal bool err := func() error { // The worker is responsible for ensuring the sync method sees the appropriate // status updates on resyncs (the result of the last sync), transitions to @@ -932,13 +933,14 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) { err = p.syncTerminatingPodFn(ctx, pod, status, update.Options.RunningPod, gracePeriod, podStatusFn) default: - err = p.syncPodFn(ctx, update.Options.UpdateType, pod, update.Options.MirrorPod, status) + isTerminal, err = p.syncPodFn(ctx, update.Options.UpdateType, pod, update.Options.MirrorPod, status) } lastSyncTime = time.Now() return err }() + var phaseTransition bool switch { case err == context.Canceled: // when the context is cancelled we expect an update to already be queued @@ -969,10 +971,17 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) { } // otherwise we move to the terminating phase p.completeTerminating(pod) + phaseTransition = true + + case isTerminal: + // if syncPod indicated we are now terminal, set the appropriate pod status to move to terminating + klog.V(4).InfoS("Pod is terminal", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType) + p.completeSync(pod) + phaseTransition = true } - // queue a retry for errors if necessary, then put the next event in the channel if any - p.completeWork(pod, err) + // queue a retry if necessary, then put the next event in the channel if any + p.completeWork(pod, phaseTransition, err) if start := update.Options.StartTime; !start.IsZero() { metrics.PodWorkerDuration.WithLabelValues(update.Options.UpdateType.String()).Observe(metrics.SinceInSeconds(start)) } @@ -1003,6 +1012,33 @@ func (p *podWorkers) acknowledgeTerminating(pod *v1.Pod) PodStatusFunc { return nil } +// completeSync is invoked when syncPod completes successfully and indicates the pod is now terminal and should +// be terminated. This happens when the natural pod lifecycle completes - any pod which is not RestartAlways +// exits. Unnatural completions, such as evictions, API driven deletion or phase transition, are handled by +// UpdatePod. +func (p *podWorkers) completeSync(pod *v1.Pod) { + p.podLock.Lock() + defer p.podLock.Unlock() + + klog.V(4).InfoS("Pod indicated lifecycle completed naturally and should now terminate", "pod", klog.KObj(pod), "podUID", pod.UID) + + if status, ok := p.podSyncStatuses[pod.UID]; ok { + if status.terminatingAt.IsZero() { + status.terminatingAt = time.Now() + } else { + klog.V(4).InfoS("Pod worker attempted to set terminatingAt twice, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID) + } + status.startedTerminating = true + } + + p.lastUndeliveredWorkUpdate[pod.UID] = podWork{ + WorkType: TerminatingPodWork, + Options: UpdatePodOptions{ + Pod: pod, + }, + } +} + // completeTerminating is invoked when syncTerminatingPod completes successfully, which means // no container is running, no container will be started in the future, and we are ready for // cleanup. This updates the termination state which prevents future syncs and will ensure @@ -1115,9 +1151,11 @@ func (p *podWorkers) completeUnstartedTerminated(pod *v1.Pod) { // completeWork requeues on error or the next sync interval and then immediately executes any pending // work. -func (p *podWorkers) completeWork(pod *v1.Pod, syncErr error) { +func (p *podWorkers) completeWork(pod *v1.Pod, phaseTransition bool, syncErr error) { // Requeue the last update if the last sync returned error. switch { + case phaseTransition: + p.workQueue.Enqueue(pod.UID, 0) case syncErr == nil: // No error; requeue at the regular resync interval. p.workQueue.Enqueue(pod.UID, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor)) diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go index afde2b0d789..c47947fa579 100644 --- a/pkg/kubelet/pod_workers_test.go +++ b/pkg/kubelet/pod_workers_test.go @@ -18,7 +18,6 @@ package kubelet import ( "context" - "flag" "reflect" "strconv" "sync" @@ -31,7 +30,6 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" - "k8s.io/klog/v2" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" @@ -48,6 +46,7 @@ type fakePodWorkers struct { t TestingInterface triggeredDeletion []types.UID + triggeredTerminal []types.UID statusLock sync.Mutex running map[types.UID]bool @@ -79,9 +78,13 @@ func (f *fakePodWorkers) UpdatePod(options UpdatePodOptions) { case kubetypes.SyncPodKill: f.triggeredDeletion = append(f.triggeredDeletion, uid) default: - if err := f.syncPodFn(context.Background(), options.UpdateType, options.Pod, options.MirrorPod, status); err != nil { + isTerminal, err := f.syncPodFn(context.Background(), options.UpdateType, options.Pod, options.MirrorPod, status) + if err != nil { f.t.Errorf("Unexpected error: %v", err) } + if isTerminal { + f.triggeredTerminal = append(f.triggeredTerminal, uid) + } } } @@ -249,7 +252,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]syncPodRecord) { fakeCache := containertest.NewFakeCache(fakeRuntime) fakeQueue := &fakeQueue{} w := newPodWorkers( - func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error { + func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) { func() { lock.Lock() defer lock.Unlock() @@ -259,7 +262,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]syncPodRecord) { updateType: updateType, }) }() - return nil + return false, nil }, func(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, runningPod *kubecontainer.Pod, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error { func() { @@ -530,9 +533,84 @@ func newUIDSet(uids ...types.UID) sets.String { return set } -func init() { - klog.InitFlags(nil) - flag.Lookup("v").Value.Set("5") +type terminalPhaseSync struct { + lock sync.Mutex + fn syncPodFnType + terminal sets.String +} + +func (s *terminalPhaseSync) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) { + isTerminal, err := s.fn(ctx, updateType, pod, mirrorPod, podStatus) + if err != nil { + return false, err + } + if !isTerminal { + s.lock.Lock() + defer s.lock.Unlock() + isTerminal = s.terminal.Has(string(pod.UID)) + } + return isTerminal, nil +} + +func (s *terminalPhaseSync) SetTerminal(uid types.UID) { + s.lock.Lock() + defer s.lock.Unlock() + s.terminal.Insert(string(uid)) +} + +func newTerminalPhaseSync(fn syncPodFnType) *terminalPhaseSync { + return &terminalPhaseSync{ + fn: fn, + terminal: sets.NewString(), + } +} + +func TestTerminalPhaseTransition(t *testing.T) { + podWorkers, _ := createPodWorkers() + var channels WorkChannel + podWorkers.workerChannelFn = channels.Intercept + terminalPhaseSyncer := newTerminalPhaseSync(podWorkers.syncPodFn) + podWorkers.syncPodFn = terminalPhaseSyncer.SyncPod + + // start pod + podWorkers.UpdatePod(UpdatePodOptions{ + Pod: newNamedPod("1", "test1", "pod1", false), + UpdateType: kubetypes.SyncPodUpdate, + }) + drainAllWorkers(podWorkers) + + // should observe pod running + pod1 := podWorkers.podSyncStatuses[types.UID("1")] + if pod1.IsTerminated() { + t.Fatalf("unexpected pod state: %#v", pod1) + } + + // send another update to the pod + podWorkers.UpdatePod(UpdatePodOptions{ + Pod: newNamedPod("1", "test1", "pod1", false), + UpdateType: kubetypes.SyncPodUpdate, + }) + drainAllWorkers(podWorkers) + + // should observe pod still running + pod1 = podWorkers.podSyncStatuses[types.UID("1")] + if pod1.IsTerminated() { + t.Fatalf("unexpected pod state: %#v", pod1) + } + + // the next sync should result in a transition to terminal + terminalPhaseSyncer.SetTerminal(types.UID("1")) + podWorkers.UpdatePod(UpdatePodOptions{ + Pod: newNamedPod("1", "test1", "pod1", false), + UpdateType: kubetypes.SyncPodUpdate, + }) + drainAllWorkers(podWorkers) + + // should observe pod terminating + pod1 = podWorkers.podSyncStatuses[types.UID("1")] + if !pod1.IsTerminationRequested() || !pod1.IsTerminated() { + t.Fatalf("unexpected pod state: %#v", pod1) + } } func TestStaticPodExclusion(t *testing.T) { @@ -1203,15 +1281,15 @@ type simpleFakeKubelet struct { wg sync.WaitGroup } -func (kl *simpleFakeKubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error { +func (kl *simpleFakeKubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) { kl.pod, kl.mirrorPod, kl.podStatus = pod, mirrorPod, podStatus - return nil + return false, nil } -func (kl *simpleFakeKubelet) syncPodWithWaitGroup(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error { +func (kl *simpleFakeKubelet) syncPodWithWaitGroup(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) { kl.pod, kl.mirrorPod, kl.podStatus = pod, mirrorPod, podStatus kl.wg.Done() - return nil + return false, nil } func (kl *simpleFakeKubelet) syncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, runningPod *kubecontainer.Pod, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error { diff --git a/pkg/kubelet/runonce.go b/pkg/kubelet/runonce.go index 19b8a4f6a7b..00f3022af5a 100644 --- a/pkg/kubelet/runonce.go +++ b/pkg/kubelet/runonce.go @@ -112,9 +112,10 @@ func (kl *Kubelet) runOnce(pods []*v1.Pod, retryDelay time.Duration) (results [] // runPod runs a single pod and wait until all containers are running. func (kl *Kubelet) runPod(pod *v1.Pod, retryDelay time.Duration) error { + var isTerminal bool delay := retryDelay retry := 0 - for { + for !isTerminal { status, err := kl.containerRuntime.GetPodStatus(pod.UID, pod.Name, pod.Namespace) if err != nil { return fmt.Errorf("unable to get status for pod %q: %v", format.Pod(pod), err) @@ -131,7 +132,7 @@ func (kl *Kubelet) runPod(pod *v1.Pod, retryDelay time.Duration) error { klog.ErrorS(err, "Failed creating a mirror pod", "pod", klog.KObj(pod)) } mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) - if err = kl.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, status); err != nil { + if isTerminal, err = kl.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, status); err != nil { return fmt.Errorf("error syncing pod %q: %v", format.Pod(pod), err) } if retry >= runOnceMaxRetries { @@ -143,6 +144,7 @@ func (kl *Kubelet) runPod(pod *v1.Pod, retryDelay time.Duration) error { retry++ delay *= runOnceRetryDelayBackoff } + return nil } // isPodRunning returns true if all containers of a manifest are running. diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index 84c8c6f92cf..8e956350882 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -83,8 +83,10 @@ type PodStatusProvider interface { // PodDeletionSafetyProvider provides guarantees that a pod can be safely deleted. type PodDeletionSafetyProvider interface { - // A function which returns true if the pod can safely be deleted + // PodResourcesAreReclaimed returns true if the pod can safely be deleted. PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool + // PodCouldHaveRunningContainers returns true if the pod could have running containers. + PodCouldHaveRunningContainers(pod *v1.Pod) bool } // Manager is the Source of truth for kubelet pod status, and should be kept up-to-date with @@ -335,19 +337,26 @@ func (m *manager) TerminatePod(pod *v1.Pod) { oldStatus = &cachedStatus.status } status := *oldStatus.DeepCopy() - for i := range status.ContainerStatuses { - if status.ContainerStatuses[i].State.Terminated != nil { - continue - } - status.ContainerStatuses[i].State = v1.ContainerState{ - Terminated: &v1.ContainerStateTerminated{ - Reason: "ContainerStatusUnknown", - Message: "The container could not be located when the pod was terminated", - ExitCode: 137, - }, + + // once a pod has initialized, any missing status is treated as a failure + if hasPodInitialized(pod) { + for i := range status.ContainerStatuses { + if status.ContainerStatuses[i].State.Terminated != nil { + continue + } + status.ContainerStatuses[i].State = v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ + Reason: "ContainerStatusUnknown", + Message: "The container could not be located when the pod was terminated", + ExitCode: 137, + }, + } } } - for i := range status.InitContainerStatuses { + + // all but the final suffix of init containers which have no evidence of a container start are + // marked as failed containers + for i := range initializedContainers(status.InitContainerStatuses) { if status.InitContainerStatuses[i].State.Terminated != nil { continue } @@ -364,6 +373,49 @@ func (m *manager) TerminatePod(pod *v1.Pod) { m.updateStatusInternal(pod, status, true) } +// hasPodInitialized returns true if the pod has no evidence of ever starting a regular container, which +// implies those containers should not be transitioned to terminated status. +func hasPodInitialized(pod *v1.Pod) bool { + // a pod without init containers is always initialized + if len(pod.Spec.InitContainers) == 0 { + return true + } + // if any container has ever moved out of waiting state, the pod has initialized + for _, status := range pod.Status.ContainerStatuses { + if status.LastTerminationState.Terminated != nil || status.State.Waiting == nil { + return true + } + } + // if the last init container has ever completed with a zero exit code, the pod is initialized + if l := len(pod.Status.InitContainerStatuses); l > 0 { + container := pod.Status.InitContainerStatuses[l-1] + if state := container.LastTerminationState; state.Terminated != nil && state.Terminated.ExitCode == 0 { + return true + } + if state := container.State; state.Terminated != nil && state.Terminated.ExitCode == 0 { + return true + } + } + // otherwise the pod has no record of being initialized + return false +} + +// initializedContainers returns all status except for suffix of containers that are in Waiting +// state, which is the set of containers that have attempted to start at least once. If all containers +// are Watiing, the first container is always returned. +func initializedContainers(containers []v1.ContainerStatus) []v1.ContainerStatus { + for i := len(containers) - 1; i >= 0; i-- { + if containers[i].State.Waiting == nil || containers[i].LastTerminationState.Terminated != nil { + return containers[0 : i+1] + } + } + // always return at least one container + if len(containers) > 0 { + return containers[0:1] + } + return nil +} + // checkContainerStateTransition ensures that no container is trying to transition // from a terminated to non-terminated state, which is illegal and indicates a // logical error in the kubelet. @@ -619,8 +671,9 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) { return } - oldStatus := pod.Status.DeepCopy() - newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, *oldStatus, mergePodStatus(*oldStatus, status.status)) + mergedStatus := mergePodStatus(pod.Status, status.status, m.podDeletionSafety.PodCouldHaveRunningContainers(pod)) + + newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, mergedStatus) klog.V(3).InfoS("Patch status for pod", "pod", klog.KObj(pod), "patch", string(patchBytes)) if err != nil { @@ -630,7 +683,7 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) { if unchanged { klog.V(3).InfoS("Status for pod is up-to-date", "pod", klog.KObj(pod), "statusVersion", status.version) } else { - klog.V(3).InfoS("Status for pod updated successfully", "pod", klog.KObj(pod), "statusVersion", status.version, "status", status.status) + klog.V(3).InfoS("Status for pod updated successfully", "pod", klog.KObj(pod), "statusVersion", status.version, "status", mergedStatus) pod = newPod } @@ -771,25 +824,49 @@ func normalizeStatus(pod *v1.Pod, status *v1.PodStatus) *v1.PodStatus { return status } -// mergePodStatus merges oldPodStatus and newPodStatus where pod conditions -// not owned by kubelet is preserved from oldPodStatus -func mergePodStatus(oldPodStatus, newPodStatus v1.PodStatus) v1.PodStatus { - podConditions := []v1.PodCondition{} +// mergePodStatus merges oldPodStatus and newPodStatus to preserve where pod conditions +// not owned by kubelet and to ensure terminal phase transition only happens after all +// running containers have terminated. This method does not modify the old status. +func mergePodStatus(oldPodStatus, newPodStatus v1.PodStatus, couldHaveRunningContainers bool) v1.PodStatus { + podConditions := make([]v1.PodCondition, 0, len(oldPodStatus.Conditions)+len(newPodStatus.Conditions)) + for _, c := range oldPodStatus.Conditions { if !kubetypes.PodConditionByKubelet(c.Type) { podConditions = append(podConditions, c) } } - for _, c := range newPodStatus.Conditions { if kubetypes.PodConditionByKubelet(c.Type) { podConditions = append(podConditions, c) } } newPodStatus.Conditions = podConditions + + // Delay transitioning a pod to a terminal status unless the pod is actually terminal. + // The Kubelet should never transition a pod to terminal status that could have running + // containers and thus actively be leveraging exclusive resources. Note that resources + // like volumes are reconciled by a subsystem in the Kubelet and will converge if a new + // pod reuses an exclusive resource (unmount -> free -> mount), which means we do not + // need wait for those resources to be detached by the Kubelet. In general, resources + // the Kubelet exclusively owns must be released prior to a pod being reported terminal, + // while resources that have participanting components above the API use the pod's + // transition to a terminal phase (or full deletion) to release those resources. + if !isPhaseTerminal(oldPodStatus.Phase) && isPhaseTerminal(newPodStatus.Phase) { + if couldHaveRunningContainers { + newPodStatus.Phase = oldPodStatus.Phase + newPodStatus.Reason = oldPodStatus.Reason + newPodStatus.Message = oldPodStatus.Message + } + } + return newPodStatus } +// isPhaseTerminal returns true if the pod's phase is terminal. +func isPhaseTerminal(phase v1.PodPhase) bool { + return phase == v1.PodFailed || phase == v1.PodSucceeded +} + // NeedToReconcilePodReadiness returns if the pod "Ready" condition need to be reconcile func NeedToReconcilePodReadiness(pod *v1.Pod) bool { if len(pod.Spec.ReadinessGates) == 0 { diff --git a/pkg/kubelet/status/status_manager_test.go b/pkg/kubelet/status/status_manager_test.go index 855fac52ecd..9ecdf67b300 100644 --- a/pkg/kubelet/status/status_manager_test.go +++ b/pkg/kubelet/status/status_manager_test.go @@ -25,6 +25,7 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" @@ -648,11 +649,14 @@ func TestTerminatePodWaiting(t *testing.T) { t.Logf("we expect the container statuses to have changed to terminated") newStatus := expectPodStatus(t, syncer, testPod) - for i := range newStatus.ContainerStatuses { - assert.False(t, newStatus.ContainerStatuses[i].State.Terminated == nil, "expected containers to be terminated") + for _, container := range newStatus.ContainerStatuses { + assert.False(t, container.State.Terminated == nil, "expected containers to be terminated") } - for i := range newStatus.InitContainerStatuses { - assert.False(t, newStatus.InitContainerStatuses[i].State.Terminated == nil, "expected init containers to be terminated") + for _, container := range newStatus.InitContainerStatuses[:2] { + assert.False(t, container.State.Terminated == nil, "expected init containers to be terminated") + } + for _, container := range newStatus.InitContainerStatuses[2:] { + assert.False(t, container.State.Waiting == nil, "expected init containers to be waiting") } expectUnknownState := v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "ContainerStatusUnknown", Message: "The container could not be located when the pod was terminated", ExitCode: 137}} @@ -662,8 +666,8 @@ func TestTerminatePodWaiting(t *testing.T) { if !reflect.DeepEqual(newStatus.InitContainerStatuses[1].State, firstStatus.InitContainerStatuses[1].State) { t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses) } - if !reflect.DeepEqual(newStatus.InitContainerStatuses[2].State, expectUnknownState) { - t.Errorf("waiting container state not defaulted: %s", diff.ObjectReflectDiff(newStatus.InitContainerStatuses[2].State, expectUnknownState)) + if !reflect.DeepEqual(newStatus.InitContainerStatuses[2].State, firstStatus.InitContainerStatuses[2].State) { + t.Errorf("waiting container state not defaulted: %s", diff.ObjectReflectDiff(newStatus.InitContainerStatuses[2].State, firstStatus.InitContainerStatuses[2].State)) } if !reflect.DeepEqual(newStatus.ContainerStatuses[0].State, expectUnknownState) { t.Errorf("terminated container state not defaulted: %s", diff.ObjectReflectDiff(newStatus.ContainerStatuses[0].State, expectUnknownState)) @@ -680,6 +684,308 @@ func TestTerminatePodWaiting(t *testing.T) { assert.Equal(t, newStatus.Message, firstStatus.Message) } +func TestTerminatePod_DefaultUnknownStatus(t *testing.T) { + newPod := func(initContainers, containers int, fns ...func(*v1.Pod)) *v1.Pod { + pod := getTestPod() + for i := 0; i < initContainers; i++ { + pod.Spec.InitContainers = append(pod.Spec.InitContainers, v1.Container{ + Name: fmt.Sprintf("init-%d", i), + }) + } + for i := 0; i < containers; i++ { + pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{ + Name: fmt.Sprintf("%d", i), + }) + } + pod.Status.StartTime = &metav1.Time{Time: time.Unix(1, 0).UTC()} + for _, fn := range fns { + fn(pod) + } + return pod + } + expectTerminatedUnknown := func(t *testing.T, state v1.ContainerState) { + t.Helper() + if state.Terminated == nil || state.Running != nil || state.Waiting != nil { + t.Fatalf("unexpected state: %#v", state) + } + if state.Terminated.ExitCode != 137 || state.Terminated.Reason != "ContainerStatusUnknown" || len(state.Terminated.Message) == 0 { + t.Fatalf("unexpected terminated state: %#v", state.Terminated) + } + } + expectTerminated := func(t *testing.T, state v1.ContainerState, exitCode int32) { + t.Helper() + if state.Terminated == nil || state.Running != nil || state.Waiting != nil { + t.Fatalf("unexpected state: %#v", state) + } + if state.Terminated.ExitCode != exitCode { + t.Fatalf("unexpected terminated state: %#v", state.Terminated) + } + } + expectWaiting := func(t *testing.T, state v1.ContainerState) { + t.Helper() + if state.Terminated != nil || state.Running != nil || state.Waiting == nil { + t.Fatalf("unexpected state: %#v", state) + } + } + + testCases := []struct { + name string + pod *v1.Pod + updateFn func(*v1.Pod) + expectFn func(t *testing.T, status v1.PodStatus) + }{ + {pod: newPod(0, 1, func(pod *v1.Pod) { pod.Status.Phase = v1.PodFailed })}, + {pod: newPod(0, 1, func(pod *v1.Pod) { pod.Status.Phase = v1.PodRunning })}, + {pod: newPod(0, 1, func(pod *v1.Pod) { + pod.Status.Phase = v1.PodRunning + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "Test", ExitCode: 2}}}, + } + })}, + { + name: "last termination state set", + pod: newPod(0, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + { + Name: "0", + LastTerminationState: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "Test", ExitCode: 2}}, + State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}, + }, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + container := status.ContainerStatuses[0] + if container.LastTerminationState.Terminated.ExitCode != 2 { + t.Fatalf("unexpected last state: %#v", container.LastTerminationState) + } + expectTerminatedUnknown(t, container.State) + }, + }, + { + name: "no previous state", + pod: newPod(0, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.ContainerStatuses[0].State) + }, + }, + { + name: "uninitialized pod defaults the first init container", + pod: newPod(1, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.InitContainerStatuses = []v1.ContainerStatus{ + {Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.InitContainerStatuses[0].State) + expectWaiting(t, status.ContainerStatuses[0].State) + }, + }, + { + name: "uninitialized pod defaults only the first init container", + pod: newPod(2, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.InitContainerStatuses = []v1.ContainerStatus{ + {Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + {Name: "init-1", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.InitContainerStatuses[0].State) + expectWaiting(t, status.InitContainerStatuses[1].State) + expectWaiting(t, status.ContainerStatuses[0].State) + }, + }, + { + name: "uninitialized pod defaults gaps", + pod: newPod(4, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.InitContainerStatuses = []v1.ContainerStatus{ + {Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + {Name: "init-1", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + {Name: "init-2", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 1}}}, + {Name: "init-3", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.InitContainerStatuses[0].State) + expectTerminatedUnknown(t, status.InitContainerStatuses[1].State) + expectTerminated(t, status.InitContainerStatuses[2].State, 1) + expectWaiting(t, status.InitContainerStatuses[3].State) + expectWaiting(t, status.ContainerStatuses[0].State) + }, + }, + { + name: "failed last container is uninitialized", + pod: newPod(3, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.InitContainerStatuses = []v1.ContainerStatus{ + {Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + {Name: "init-1", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + {Name: "init-2", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 1}}}, + } + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.InitContainerStatuses[0].State) + expectTerminatedUnknown(t, status.InitContainerStatuses[1].State) + expectTerminated(t, status.InitContainerStatuses[2].State, 1) + expectWaiting(t, status.ContainerStatuses[0].State) + }, + }, + { + name: "successful last container is initialized", + pod: newPod(3, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.InitContainerStatuses = []v1.ContainerStatus{ + {Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + {Name: "init-1", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + {Name: "init-2", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 0}}}, + } + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.InitContainerStatuses[0].State) + expectTerminatedUnknown(t, status.InitContainerStatuses[1].State) + expectTerminated(t, status.InitContainerStatuses[2].State, 0) + expectTerminatedUnknown(t, status.ContainerStatuses[0].State) + }, + }, + { + name: "successful last previous container is initialized, and container state is overwritten", + pod: newPod(3, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.InitContainerStatuses = []v1.ContainerStatus{ + {Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + {Name: "init-1", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + { + Name: "init-2", + LastTerminationState: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 0}}, + State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}, + }, + } + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.InitContainerStatuses[0].State) + expectTerminatedUnknown(t, status.InitContainerStatuses[1].State) + expectTerminatedUnknown(t, status.InitContainerStatuses[2].State) + expectTerminatedUnknown(t, status.ContainerStatuses[0].State) + }, + }, + { + name: "running container proves initialization", + pod: newPod(1, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.InitContainerStatuses = []v1.ContainerStatus{ + {Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Running: &v1.ContainerStateRunning{}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.InitContainerStatuses[0].State) + expectTerminatedUnknown(t, status.ContainerStatuses[0].State) + }, + }, + { + name: "evidence of terminated container proves initialization", + pod: newPod(1, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.InitContainerStatuses = []v1.ContainerStatus{ + {Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 0}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.InitContainerStatuses[0].State) + expectTerminated(t, status.ContainerStatuses[0].State, 0) + }, + }, + { + name: "evidence of previously terminated container proves initialization", + pod: newPod(1, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.InitContainerStatuses = []v1.ContainerStatus{ + {Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", LastTerminationState: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 0}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.InitContainerStatuses[0].State) + expectTerminatedUnknown(t, status.ContainerStatuses[0].State) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager(), kubeconfigmap.NewFakeManager()) + syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}).(*manager) + + original := tc.pod.DeepCopy() + syncer.SetPodStatus(original, original.Status) + + copied := tc.pod.DeepCopy() + if tc.updateFn != nil { + tc.updateFn(copied) + } + expected := copied.DeepCopy() + + syncer.TerminatePod(copied) + status := expectPodStatus(t, syncer, tc.pod.DeepCopy()) + if tc.expectFn != nil { + tc.expectFn(t, status) + return + } + if !reflect.DeepEqual(expected.Status, status) { + diff := cmp.Diff(expected.Status, status) + if len(diff) == 0 { + t.Fatalf("diff returned no results for failed DeepEqual: %#v != %#v", expected.Status, status) + } + t.Fatalf("unexpected status: %s", diff) + } + }) + } +} + func TestSetContainerReadiness(t *testing.T) { cID1 := kubecontainer.ContainerID{Type: "test", ID: "1"} cID2 := kubecontainer.ContainerID{Type: "test", ID: "2"} @@ -957,6 +1263,7 @@ func TestDeletePods(t *testing.T) { pod.DeletionTimestamp = &metav1.Time{Time: time.Now()} client := fake.NewSimpleClientset(pod) m := newTestManager(client) + m.podDeletionSafety.(*statustest.FakePodDeletionSafetyProvider).Reclaimed = true m.podManager.AddPod(pod) status := getRandomPodStatus() now := metav1.Now() @@ -966,6 +1273,22 @@ func TestDeletePods(t *testing.T) { verifyActions(t, m, []core.Action{getAction(), patchAction(), deleteAction()}) } +func TestDeletePodWhileReclaiming(t *testing.T) { + pod := getTestPod() + t.Logf("Set the deletion timestamp.") + pod.DeletionTimestamp = &metav1.Time{Time: time.Now()} + client := fake.NewSimpleClientset(pod) + m := newTestManager(client) + m.podDeletionSafety.(*statustest.FakePodDeletionSafetyProvider).Reclaimed = false + m.podManager.AddPod(pod) + status := getRandomPodStatus() + now := metav1.Now() + status.StartTime = &now + m.SetPodStatus(pod, status) + t.Logf("Expect to see a delete action.") + verifyActions(t, m, []core.Action{getAction(), patchAction()}) +} + func TestDoNotDeleteMirrorPods(t *testing.T) { staticPod := getTestPod() staticPod.Annotations = map[string]string{kubetypes.ConfigSourceAnnotationKey: "file"} @@ -1070,19 +1393,22 @@ func deleteAction() core.DeleteAction { func TestMergePodStatus(t *testing.T) { useCases := []struct { - desc string - oldPodStatus func(input v1.PodStatus) v1.PodStatus - newPodStatus func(input v1.PodStatus) v1.PodStatus - expectPodStatus v1.PodStatus + desc string + hasRunningContainers bool + oldPodStatus func(input v1.PodStatus) v1.PodStatus + newPodStatus func(input v1.PodStatus) v1.PodStatus + expectPodStatus v1.PodStatus }{ { "no change", + false, func(input v1.PodStatus) v1.PodStatus { return input }, func(input v1.PodStatus) v1.PodStatus { return input }, getPodStatus(), }, { "readiness changes", + false, func(input v1.PodStatus) v1.PodStatus { return input }, func(input v1.PodStatus) v1.PodStatus { input.Conditions[0].Status = v1.ConditionFalse @@ -1105,6 +1431,7 @@ func TestMergePodStatus(t *testing.T) { }, { "additional pod condition", + false, func(input v1.PodStatus) v1.PodStatus { input.Conditions = append(input.Conditions, v1.PodCondition{ Type: v1.PodConditionType("example.com/feature"), @@ -1134,6 +1461,7 @@ func TestMergePodStatus(t *testing.T) { }, { "additional pod condition and readiness changes", + false, func(input v1.PodStatus) v1.PodStatus { input.Conditions = append(input.Conditions, v1.PodCondition{ Type: v1.PodConditionType("example.com/feature"), @@ -1166,6 +1494,7 @@ func TestMergePodStatus(t *testing.T) { }, { "additional pod condition changes", + false, func(input v1.PodStatus) v1.PodStatus { input.Conditions = append(input.Conditions, v1.PodCondition{ Type: v1.PodConditionType("example.com/feature"), @@ -1199,13 +1528,77 @@ func TestMergePodStatus(t *testing.T) { Message: "Message", }, }, + { + "phase is transitioning to failed and no containers running", + false, + func(input v1.PodStatus) v1.PodStatus { + input.Phase = v1.PodRunning + input.Reason = "Unknown" + input.Message = "Message" + return input + }, + func(input v1.PodStatus) v1.PodStatus { + input.Phase = v1.PodFailed + input.Reason = "Evicted" + input.Message = "Was Evicted" + return input + }, + v1.PodStatus{ + Phase: v1.PodFailed, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, + { + Type: v1.PodScheduled, + Status: v1.ConditionTrue, + }, + }, + Reason: "Evicted", + Message: "Was Evicted", + }, + }, + { + "phase is transitioning to failed and containers running", + true, + func(input v1.PodStatus) v1.PodStatus { + input.Phase = v1.PodRunning + input.Reason = "Unknown" + input.Message = "Message" + return input + }, + func(input v1.PodStatus) v1.PodStatus { + input.Phase = v1.PodFailed + input.Reason = "Evicted" + input.Message = "Was Evicted" + return input + }, + v1.PodStatus{ + Phase: v1.PodRunning, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, + { + Type: v1.PodScheduled, + Status: v1.ConditionTrue, + }, + }, + Reason: "Unknown", + Message: "Message", + }, + }, } for _, tc := range useCases { - output := mergePodStatus(tc.oldPodStatus(getPodStatus()), tc.newPodStatus(getPodStatus())) - if !conditionsEqual(output.Conditions, tc.expectPodStatus.Conditions) || !statusEqual(output, tc.expectPodStatus) { - t.Errorf("test case %q failed, expect: %+v, got %+v", tc.desc, tc.expectPodStatus, output) - } + t.Run(tc.desc, func(t *testing.T) { + output := mergePodStatus(tc.oldPodStatus(getPodStatus()), tc.newPodStatus(getPodStatus()), tc.hasRunningContainers) + if !conditionsEqual(output.Conditions, tc.expectPodStatus.Conditions) || !statusEqual(output, tc.expectPodStatus) { + t.Fatalf("unexpected output: %s", cmp.Diff(tc.expectPodStatus, output)) + } + }) } } diff --git a/pkg/kubelet/status/testing/fake_pod_deletion_safety.go b/pkg/kubelet/status/testing/fake_pod_deletion_safety.go index 174c6e17515..98c3b226c0b 100644 --- a/pkg/kubelet/status/testing/fake_pod_deletion_safety.go +++ b/pkg/kubelet/status/testing/fake_pod_deletion_safety.go @@ -16,13 +16,18 @@ limitations under the License. package testing -import "k8s.io/api/core/v1" +import v1 "k8s.io/api/core/v1" // FakePodDeletionSafetyProvider is a fake PodDeletionSafetyProvider for test. -type FakePodDeletionSafetyProvider struct{} - -// PodResourcesAreReclaimed implements PodDeletionSafetyProvider. -// Always reports that all pod resources are reclaimed. -func (f *FakePodDeletionSafetyProvider) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool { - return true +type FakePodDeletionSafetyProvider struct { + Reclaimed bool + HasRunning bool +} + +func (f *FakePodDeletionSafetyProvider) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool { + return f.Reclaimed +} + +func (f *FakePodDeletionSafetyProvider) PodCouldHaveRunningContainers(pod *v1.Pod) bool { + return f.HasRunning } diff --git a/pkg/kubelet/status/testing/mock_pod_status_provider.go b/pkg/kubelet/status/testing/mock_pod_status_provider.go index 82618b28785..a58d2886b1d 100644 --- a/pkg/kubelet/status/testing/mock_pod_status_provider.go +++ b/pkg/kubelet/status/testing/mock_pod_status_provider.go @@ -103,6 +103,20 @@ func (mr *MockPodDeletionSafetyProviderMockRecorder) PodResourcesAreReclaimed(po return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PodResourcesAreReclaimed", reflect.TypeOf((*MockPodDeletionSafetyProvider)(nil).PodResourcesAreReclaimed), pod, status) } +// PodCouldHaveRunningContainers mocks base method +func (m *MockPodDeletionSafetyProvider) PodCouldHaveRunningContainers(pod *v1.Pod) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PodCouldHaveRunningContainers", pod) + ret0, _ := ret[0].(bool) + return ret0 +} + +// PodCouldHaveRunningContainers indicates an expected call of PodCouldHaveRunningContainers +func (mr *MockPodDeletionSafetyProviderMockRecorder) PodCouldHaveRunningContainers(pod interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PodCouldHaveRunningContainers", reflect.TypeOf((*MockPodDeletionSafetyProvider)(nil).PodCouldHaveRunningContainers), pod) +} + // MockManager is a mock of Manager interface type MockManager struct { ctrl *gomock.Controller