Add Get interfaces for container's checkpointed ResourcesAllocated and Resize values, remove error logging for valid standalone kubelet scenario

This commit is contained in:
vinay kulkarni 2023-03-06 06:34:53 +00:00
parent 12435b26fc
commit b0dce923f1
5 changed files with 103 additions and 73 deletions

View File

@ -2105,19 +2105,12 @@ func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, strin
// TODO: out of resource eviction should have a pod admitter call-out // TODO: out of resource eviction should have a pod admitter call-out
attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods} attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods}
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
// If pod resource allocation checkpoint manager (checkpointState) is nil, it is likely because
// kubelet is bootstrapping kube-system pods on master node. In this scenario, pod resource resize
// is neither expected nor can be handled. Fall back to regular 'canAdmitPod' processing.
// TODO(vinaykul,InPlacePodVerticalScaling): Investigate if we can toss out all this checkpointing
// code and instead rely on ResourceAllocation / Resize values persisted in PodStatus. (Ref: KEP 2527)
// Use allocated resources values from checkpoint store (source of truth) to determine fit // Use allocated resources values from checkpoint store (source of truth) to determine fit
checkpointState := kl.statusManager.State()
if checkpointState != nil {
otherPods := make([]*v1.Pod, 0, len(pods)) otherPods := make([]*v1.Pod, 0, len(pods))
for _, p := range pods { for _, p := range pods {
op := p.DeepCopy() op := p.DeepCopy()
for _, c := range op.Spec.Containers { for _, c := range op.Spec.Containers {
resourcesAllocated, found := checkpointState.GetContainerResourceAllocation(string(p.UID), c.Name) resourcesAllocated, found := kl.statusManager.GetContainerResourceAllocation(string(p.UID), c.Name)
if c.Resources.Requests != nil && found { if c.Resources.Requests != nil && found {
c.Resources.Requests[v1.ResourceCPU] = resourcesAllocated[v1.ResourceCPU] c.Resources.Requests[v1.ResourceCPU] = resourcesAllocated[v1.ResourceCPU]
c.Resources.Requests[v1.ResourceMemory] = resourcesAllocated[v1.ResourceMemory] c.Resources.Requests[v1.ResourceMemory] = resourcesAllocated[v1.ResourceMemory]
@ -2126,9 +2119,6 @@ func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, strin
otherPods = append(otherPods, op) otherPods = append(otherPods, op)
} }
attrs.OtherPods = otherPods attrs.OtherPods = otherPods
} else {
klog.ErrorS(nil, "pod resource allocation checkpoint manager is not initialized.")
}
} }
for _, podAdmitHandler := range kl.admitHandlers { for _, podAdmitHandler := range kl.admitHandlers {
if result := podAdmitHandler.Admit(attrs); !result.Admit { if result := podAdmitHandler.Admit(attrs); !result.Admit {
@ -2413,18 +2403,11 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
activePods := kl.filterOutInactivePods(existingPods) activePods := kl.filterOutInactivePods(existingPods)
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
// If pod resource allocation checkpoint manager (checkpointState) is nil, it is likely because
// kubelet is bootstrapping kube-system pods on master node. In this scenario, pod resource resize
// is neither expected nor can be handled. Fall back to regular 'canAdmitPod' processing.
// TODO(vinaykul,InPlacePodVerticalScaling): Investigate if we can toss out all this checkpointing
// code and instead rely on ResourceAllocation / Resize values persisted in PodStatus. (Ref: KEP 2527)
checkpointState := kl.statusManager.State()
if checkpointState != nil {
// To handle kubelet restarts, test pod admissibility using ResourcesAllocated values // To handle kubelet restarts, test pod admissibility using ResourcesAllocated values
// (for cpu & memory) from checkpoint store. If found, that is the source of truth. // (for cpu & memory) from checkpoint store. If found, that is the source of truth.
podCopy := pod.DeepCopy() podCopy := pod.DeepCopy()
for _, c := range podCopy.Spec.Containers { for _, c := range podCopy.Spec.Containers {
resourcesAllocated, found := checkpointState.GetContainerResourceAllocation(string(pod.UID), c.Name) resourcesAllocated, found := kl.statusManager.GetContainerResourceAllocation(string(pod.UID), c.Name)
if c.Resources.Requests != nil && found { if c.Resources.Requests != nil && found {
c.Resources.Requests[v1.ResourceCPU] = resourcesAllocated[v1.ResourceCPU] c.Resources.Requests[v1.ResourceCPU] = resourcesAllocated[v1.ResourceCPU]
c.Resources.Requests[v1.ResourceMemory] = resourcesAllocated[v1.ResourceMemory] c.Resources.Requests[v1.ResourceMemory] = resourcesAllocated[v1.ResourceMemory]
@ -2447,13 +2430,6 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
continue continue
} }
} }
} else {
// Check if we can admit the pod; if not, reject it.
if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
kl.rejectPod(pod, reason, message)
continue
}
}
} }
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start) kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)

View File

@ -1472,14 +1472,9 @@ func (kl *Kubelet) determinePodResizeStatus(pod *v1.Pod, podStatus *v1.PodStatus
klog.ErrorS(err, "SetPodResizeStatus failed", "pod", pod.Name) klog.ErrorS(err, "SetPodResizeStatus failed", "pod", pod.Name)
} }
} else { } else {
checkpointState := kl.statusManager.State() if resizeStatus, found := kl.statusManager.GetPodResizeStatus(string(pod.UID)); found {
if checkpointState != nil {
if resizeStatus, found := checkpointState.GetPodResizeStatus(string(pod.UID)); found {
podResizeStatus = resizeStatus podResizeStatus = resizeStatus
} }
} else {
klog.ErrorS(nil, "pod resource allocation checkpoint manager is not initialized.")
}
} }
return podResizeStatus return podResizeStatus
} }
@ -1773,12 +1768,7 @@ func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecon
container := kubecontainer.GetContainerSpec(pod, cName) container := kubecontainer.GetContainerSpec(pod, cName)
// ResourcesAllocated values come from checkpoint. It is the source-of-truth. // ResourcesAllocated values come from checkpoint. It is the source-of-truth.
found := false found := false
checkpointState := kl.statusManager.State() status.ResourcesAllocated, found = kl.statusManager.GetContainerResourceAllocation(string(pod.UID), cName)
if checkpointState != nil {
status.ResourcesAllocated, found = checkpointState.GetContainerResourceAllocation(string(pod.UID), cName)
} else {
klog.ErrorS(nil, "pod resource allocation checkpoint manager is not initialized.")
}
if !(container.Resources.Requests == nil && container.Resources.Limits == nil) && !found { if !(container.Resources.Requests == nil && container.Resources.Limits == nil) && !found {
// Log error and fallback to ResourcesAllocated in oldStatus if it exists // Log error and fallback to ResourcesAllocated in oldStatus if it exists
klog.ErrorS(nil, "resource allocation not found in checkpoint store", "pod", pod.Name, "container", cName) klog.ErrorS(nil, "resource allocation not found in checkpoint store", "pod", pod.Name, "container", cName)

View File

@ -68,6 +68,16 @@ func (m *fakeManager) State() state.Reader {
return m.state return m.state
} }
func (m *fakeManager) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceList, bool) {
klog.InfoS("GetContainerResourceAllocation()")
return m.state.GetContainerResourceAllocation(podUID, containerName)
}
func (m *fakeManager) GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) {
klog.InfoS("GetPodResizeStatus()")
return "", false
}
func (m *fakeManager) SetPodAllocation(pod *v1.Pod) error { func (m *fakeManager) SetPodAllocation(pod *v1.Pod) error {
klog.InfoS("SetPodAllocation()") klog.InfoS("SetPodAllocation()")
for _, container := range pod.Spec.Containers { for _, container := range pod.Spec.Containers {

View File

@ -140,6 +140,12 @@ type Manager interface {
// State returns a read-only interface to the internal status manager state. // State returns a read-only interface to the internal status manager state.
State() state.Reader State() state.Reader
// GetContainerResourceAllocation returns checkpointed ResourcesAllocated value for the container
GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceList, bool)
// GetPodResizeStatus returns checkpointed PodStatus.Resize value
GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool)
// SetPodAllocation checkpoints the resources allocated to a pod's containers. // SetPodAllocation checkpoints the resources allocated to a pod's containers.
SetPodAllocation(pod *v1.Pod) error SetPodAllocation(pod *v1.Pod) error
@ -234,6 +240,28 @@ func (m *manager) State() state.Reader {
return m.state return m.state
} }
// GetContainerResourceAllocation returns the last checkpointed ResourcesAllocated values
// If checkpoint manager has not been initialized, it returns nil, false
func (m *manager) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceList, bool) {
m.podStatusesLock.RLock()
defer m.podStatusesLock.RUnlock()
if m.state != nil {
return m.state.GetContainerResourceAllocation(podUID, containerName)
}
return nil, false
}
// GetPodResizeStatus returns the last checkpointed ResizeStaus value
// If checkpoint manager has not been initialized, it returns nil, false
func (m *manager) GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) {
m.podStatusesLock.RLock()
defer m.podStatusesLock.RUnlock()
if m.state != nil {
return m.state.GetPodResizeStatus(podUID)
}
return "", false
}
// SetPodAllocation checkpoints the resources allocated to a pod's containers // SetPodAllocation checkpoints the resources allocated to a pod's containers
func (m *manager) SetPodAllocation(pod *v1.Pod) error { func (m *manager) SetPodAllocation(pod *v1.Pod) error {
if m.state == nil { if m.state == nil {
@ -680,13 +708,11 @@ func (m *manager) deletePodStatus(uid types.UID) {
delete(m.podStatuses, uid) delete(m.podStatuses, uid)
m.podStartupLatencyHelper.DeletePodStartupState(uid) m.podStartupLatencyHelper.DeletePodStartupState(uid)
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
if m.state == nil { if m.state != nil {
klog.ErrorS(nil, "pod allocation checkpoint manager is not initialized")
return
}
m.state.Delete(string(uid), "") m.state.Delete(string(uid), "")
} }
} }
}
// TODO(filipg): It'd be cleaner if we can do this without signal from user. // TODO(filipg): It'd be cleaner if we can do this without signal from user.
func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) { func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
@ -697,15 +723,13 @@ func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
klog.V(5).InfoS("Removing pod from status map.", "podUID", key) klog.V(5).InfoS("Removing pod from status map.", "podUID", key)
delete(m.podStatuses, key) delete(m.podStatuses, key)
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
if m.state == nil { if m.state != nil {
klog.ErrorS(nil, "pod allocation checkpoint manager is not initialized")
continue
}
m.state.Delete(string(key), "") m.state.Delete(string(key), "")
} }
} }
} }
} }
}
// syncBatch syncs pods statuses with the apiserver. // syncBatch syncs pods statuses with the apiserver.
func (m *manager) syncBatch() { func (m *manager) syncBatch() {

View File

@ -189,6 +189,36 @@ func (m *MockManager) EXPECT() *MockManagerMockRecorder {
return m.recorder return m.recorder
} }
// GetContainerResourceAllocation mocks base method.
func (m *MockManager) GetContainerResourceAllocation(podUID, containerName string) (v1.ResourceList, bool) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetContainerResourceAllocation", podUID, containerName)
ret0, _ := ret[0].(v1.ResourceList)
ret1, _ := ret[1].(bool)
return ret0, ret1
}
// GetContainerResourceAllocation indicates an expected call of GetContainerResourceAllocation.
func (mr *MockManagerMockRecorder) GetContainerResourceAllocation(podUID, containerName interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetContainerResourceAllocation", reflect.TypeOf((*MockManager)(nil).GetContainerResourceAllocation), podUID, containerName)
}
// GetPodResizeStatus mocks base method.
func (m *MockManager) GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetPodResizeStatus", podUID)
ret0, _ := ret[0].(v1.PodResizeStatus)
ret1, _ := ret[1].(bool)
return ret0, ret1
}
// GetPodResizeStatus indicates an expected call of GetPodResizeStatus.
func (mr *MockManagerMockRecorder) GetPodResizeStatus(podUID interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPodResizeStatus", reflect.TypeOf((*MockManager)(nil).GetPodResizeStatus), podUID)
}
// GetPodStatus mocks base method. // GetPodStatus mocks base method.
func (m *MockManager) GetPodStatus(uid types.UID) (v1.PodStatus, bool) { func (m *MockManager) GetPodStatus(uid types.UID) (v1.PodStatus, bool) {
m.ctrl.T.Helper() m.ctrl.T.Helper()