kubelet: filter out terminated pods before rejecting pods
Currently, kubelet doesn't filter out terminated pods before determining whether a pod fits. This could lead to duplicated events for rejecting the pods. This change fixes that. This change also groups all related pod fitness checking functions into one function to improve readability.
This commit is contained in:
@@ -1181,23 +1181,8 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]metri
|
|||||||
}
|
}
|
||||||
kl.statusManager.RemoveOrphanedStatuses(podFullNames)
|
kl.statusManager.RemoveOrphanedStatuses(podFullNames)
|
||||||
|
|
||||||
// Reject pods that we cannot run.
|
// Handles pod admission.
|
||||||
kl.handleNotFittingPods(allPods)
|
pods := kl.admitPods(allPods, podSyncTypes)
|
||||||
|
|
||||||
// Reject new creation requests if diskspace is running low.
|
|
||||||
kl.handleOutOfDisk(allPods, podSyncTypes)
|
|
||||||
|
|
||||||
// Pod phase progresses monotonically. Once a pod has reached a final state,
|
|
||||||
// it should never leave irregardless of the restart policy. The statuses
|
|
||||||
// of such pods should not be changed, and there is no need to sync them.
|
|
||||||
// TODO: the logic here does not handle two cases:
|
|
||||||
// 1. If the containers were removed immediately after they died, kubelet
|
|
||||||
// may fail to generate correct statuses, let alone filtering correctly.
|
|
||||||
// 2. If kubelet restarted before writing the terminated status for a pod
|
|
||||||
// to the apiserver, it could still restart the terminated pod (even
|
|
||||||
// though the pod was not considered terminated by the apiserver).
|
|
||||||
// These two conditions could be alleviated by checkpointing kubelet.
|
|
||||||
pods := kl.filterOutTerminatedPods(allPods)
|
|
||||||
|
|
||||||
glog.V(4).Infof("Desired: %#v", pods)
|
glog.V(4).Infof("Desired: %#v", pods)
|
||||||
var err error
|
var err error
|
||||||
@@ -1358,10 +1343,10 @@ func (kl *Kubelet) checkCapacityExceeded(pods []*api.Pod) (fitting []*api.Pod, n
|
|||||||
}
|
}
|
||||||
|
|
||||||
// handleOutOfDisk detects if pods can't fit due to lack of disk space.
|
// handleOutOfDisk detects if pods can't fit due to lack of disk space.
|
||||||
func (kl *Kubelet) handleOutOfDisk(pods []*api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType) {
|
func (kl *Kubelet) handleOutOfDisk(pods []*api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType) []*api.Pod {
|
||||||
if len(podSyncTypes) == 0 {
|
if len(podSyncTypes) == 0 {
|
||||||
// regular sync. no new pods
|
// regular sync. no new pods
|
||||||
return
|
return pods
|
||||||
}
|
}
|
||||||
outOfDockerDisk := false
|
outOfDockerDisk := false
|
||||||
outOfRootDisk := false
|
outOfRootDisk := false
|
||||||
@@ -1381,18 +1366,26 @@ func (kl *Kubelet) handleOutOfDisk(pods []*api.Pod, podSyncTypes map[types.UID]m
|
|||||||
// We ignore the first disk check to ensure that running pods are not killed.
|
// We ignore the first disk check to ensure that running pods are not killed.
|
||||||
// Disk manager will only declare out of disk problems if unfreeze has been called.
|
// Disk manager will only declare out of disk problems if unfreeze has been called.
|
||||||
kl.diskSpaceManager.Unfreeze()
|
kl.diskSpaceManager.Unfreeze()
|
||||||
if outOfDockerDisk || outOfRootDisk {
|
|
||||||
for _, pod := range pods {
|
if !outOfDockerDisk && !outOfRootDisk {
|
||||||
// Only reject pods that didn't start yet.
|
// Disk space is fine.
|
||||||
if podSyncTypes[pod.UID] == metrics.SyncPodCreate {
|
return pods
|
||||||
kl.recorder.Eventf(pod, "OutOfDisk", "Cannot start the pod due to lack of disk space.")
|
|
||||||
kl.statusManager.SetPodStatus(pod, api.PodStatus{
|
|
||||||
Phase: api.PodFailed,
|
|
||||||
Message: "Pod cannot be started due to lack of disk space."})
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var fitting []*api.Pod
|
||||||
|
for i := range pods {
|
||||||
|
pod := pods[i]
|
||||||
|
// Only reject pods that didn't start yet.
|
||||||
|
if podSyncTypes[pod.UID] == metrics.SyncPodCreate {
|
||||||
|
kl.recorder.Eventf(pod, "OutOfDisk", "Cannot start the pod due to lack of disk space.")
|
||||||
|
kl.statusManager.SetPodStatus(pod, api.PodStatus{
|
||||||
|
Phase: api.PodFailed,
|
||||||
|
Message: "Pod cannot be started due to lack of disk space."})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
fitting = append(fitting, pod)
|
||||||
|
}
|
||||||
|
return fitting
|
||||||
}
|
}
|
||||||
|
|
||||||
// checkNodeSelectorMatching detects pods that do not match node's labels.
|
// checkNodeSelectorMatching detects pods that do not match node's labels.
|
||||||
@@ -1412,9 +1405,10 @@ func (kl *Kubelet) checkNodeSelectorMatching(pods []*api.Pod) (fitting []*api.Po
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleNotfittingPods handles pods that do not fit on the node.
|
// handleNotfittingPods handles pods that do not fit on the node and returns
|
||||||
// Currently conflicts on Port.HostPort values, matching node's labels and exceeding node's capacity are handled.
|
// the pods that fit. It currently checks host port conflicts, node selector
|
||||||
func (kl *Kubelet) handleNotFittingPods(pods []*api.Pod) {
|
// mismatches, and exceeded node capacity.
|
||||||
|
func (kl *Kubelet) handleNotFittingPods(pods []*api.Pod) []*api.Pod {
|
||||||
fitting, notFitting := checkHostPortConflicts(pods)
|
fitting, notFitting := checkHostPortConflicts(pods)
|
||||||
for _, pod := range notFitting {
|
for _, pod := range notFitting {
|
||||||
kl.recorder.Eventf(pod, "hostPortConflict", "Cannot start the pod due to host port conflict.")
|
kl.recorder.Eventf(pod, "hostPortConflict", "Cannot start the pod due to host port conflict.")
|
||||||
@@ -1436,6 +1430,38 @@ func (kl *Kubelet) handleNotFittingPods(pods []*api.Pod) {
|
|||||||
Phase: api.PodFailed,
|
Phase: api.PodFailed,
|
||||||
Message: "Pod cannot be started due to exceeded capacity"})
|
Message: "Pod cannot be started due to exceeded capacity"})
|
||||||
}
|
}
|
||||||
|
return fitting
|
||||||
|
}
|
||||||
|
|
||||||
|
// admitPods handles pod admission. It filters out terminated pods, and pods
|
||||||
|
// that don't fit on the node, and may reject pods if node is overcommitted.
|
||||||
|
func (kl *Kubelet) admitPods(allPods []*api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType) []*api.Pod {
|
||||||
|
// Pod phase progresses monotonically. Once a pod has reached a final state,
|
||||||
|
// it should never leave irregardless of the restart policy. The statuses
|
||||||
|
// of such pods should not be changed, and there is no need to sync them.
|
||||||
|
// TODO: the logic here does not handle two cases:
|
||||||
|
// 1. If the containers were removed immediately after they died, kubelet
|
||||||
|
// may fail to generate correct statuses, let alone filtering correctly.
|
||||||
|
// 2. If kubelet restarted before writing the terminated status for a pod
|
||||||
|
// to the apiserver, it could still restart the terminated pod (even
|
||||||
|
// though the pod was not considered terminated by the apiserver).
|
||||||
|
// These two conditions could be alleviated by checkpointing kubelet.
|
||||||
|
pods := kl.filterOutTerminatedPods(allPods)
|
||||||
|
|
||||||
|
// Respect the pod creation order when resolving conflicts.
|
||||||
|
sort.Sort(podsByCreationTime(pods))
|
||||||
|
|
||||||
|
// Reject pods that we cannot run.
|
||||||
|
// handleNotFittingPods relies on static information (e.g. immutable fields
|
||||||
|
// in the pod specs or machine information that doesn't change without
|
||||||
|
// rebooting), and the pods are sorted by immutable creation time. Hence it
|
||||||
|
// should only rejects new pods without checking the pod sync types.
|
||||||
|
fitting := kl.handleNotFittingPods(pods)
|
||||||
|
|
||||||
|
// Reject new creation requests if diskspace is running low.
|
||||||
|
admittedPods := kl.handleOutOfDisk(fitting, podSyncTypes)
|
||||||
|
|
||||||
|
return admittedPods
|
||||||
}
|
}
|
||||||
|
|
||||||
// syncLoop is the main loop for processing changes. It watches for changes from
|
// syncLoop is the main loop for processing changes. It watches for changes from
|
||||||
|
Reference in New Issue
Block a user