Updates to container manager and internal container lifecycle to accommodate Topology Manager
Co-authored-by: Conor Nolan <conor.nolan@intel.com>
This commit is contained in:
		| @@ -693,6 +693,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan | |||||||
| 				ExperimentalPodPidsLimit:              s.PodPidsLimit, | 				ExperimentalPodPidsLimit:              s.PodPidsLimit, | ||||||
| 				EnforceCPULimits:                      s.CPUCFSQuota, | 				EnforceCPULimits:                      s.CPUCFSQuota, | ||||||
| 				CPUCFSQuotaPeriod:                     s.CPUCFSQuotaPeriod.Duration, | 				CPUCFSQuotaPeriod:                     s.CPUCFSQuotaPeriod.Duration, | ||||||
|  | 				ExperimentalTopologyManagerPolicy:     s.TopologyManagerPolicy, | ||||||
| 			}, | 			}, | ||||||
| 			s.FailSwapOn, | 			s.FailSwapOn, | ||||||
| 			devicePluginEnabled, | 			devicePluginEnabled, | ||||||
|   | |||||||
| @@ -28,6 +28,7 @@ go_library( | |||||||
|         "//pkg/features:go_default_library", |         "//pkg/features:go_default_library", | ||||||
|         "//pkg/kubelet/apis/podresources/v1alpha1:go_default_library", |         "//pkg/kubelet/apis/podresources/v1alpha1:go_default_library", | ||||||
|         "//pkg/kubelet/cm/cpumanager:go_default_library", |         "//pkg/kubelet/cm/cpumanager:go_default_library", | ||||||
|  |         "//pkg/kubelet/cm/topologymanager:go_default_library", | ||||||
|         "//pkg/kubelet/config:go_default_library", |         "//pkg/kubelet/config:go_default_library", | ||||||
|         "//pkg/kubelet/container:go_default_library", |         "//pkg/kubelet/container:go_default_library", | ||||||
|         "//pkg/kubelet/eviction/api:go_default_library", |         "//pkg/kubelet/eviction/api:go_default_library", | ||||||
|   | |||||||
| @@ -24,6 +24,7 @@ import ( | |||||||
| 	v1 "k8s.io/api/core/v1" | 	v1 "k8s.io/api/core/v1" | ||||||
| 	internalapi "k8s.io/cri-api/pkg/apis" | 	internalapi "k8s.io/cri-api/pkg/apis" | ||||||
| 	podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1" | 	podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1" | ||||||
|  | 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" | ||||||
| 	"k8s.io/kubernetes/pkg/kubelet/config" | 	"k8s.io/kubernetes/pkg/kubelet/config" | ||||||
| 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" | 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" | ||||||
| 	evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" | 	evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" | ||||||
| @@ -108,6 +109,9 @@ type ContainerManager interface { | |||||||
| 	// ShouldResetExtendedResourceCapacity returns whether or not the extended resources should be zeroed, | 	// ShouldResetExtendedResourceCapacity returns whether or not the extended resources should be zeroed, | ||||||
| 	// due to node recreation. | 	// due to node recreation. | ||||||
| 	ShouldResetExtendedResourceCapacity() bool | 	ShouldResetExtendedResourceCapacity() bool | ||||||
|  |  | ||||||
|  | 	// GetTopologyPodAdmitHandler returns an instance of the TopologyManager for Pod Admission | ||||||
|  | 	GetTopologyPodAdmitHandler() topologymanager.Manager | ||||||
| } | } | ||||||
|  |  | ||||||
| type NodeConfig struct { | type NodeConfig struct { | ||||||
| @@ -127,6 +131,7 @@ type NodeConfig struct { | |||||||
| 	ExperimentalPodPidsLimit              int64 | 	ExperimentalPodPidsLimit              int64 | ||||||
| 	EnforceCPULimits                      bool | 	EnforceCPULimits                      bool | ||||||
| 	CPUCFSQuotaPeriod                     time.Duration | 	CPUCFSQuotaPeriod                     time.Duration | ||||||
|  | 	ExperimentalTopologyManagerPolicy     string | ||||||
| } | } | ||||||
|  |  | ||||||
| type NodeAllocatableConfig struct { | type NodeAllocatableConfig struct { | ||||||
|   | |||||||
| @@ -48,6 +48,7 @@ import ( | |||||||
| 	"k8s.io/kubernetes/pkg/kubelet/cadvisor" | 	"k8s.io/kubernetes/pkg/kubelet/cadvisor" | ||||||
| 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" | 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" | ||||||
| 	"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager" | 	"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager" | ||||||
|  | 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" | ||||||
| 	cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util" | 	cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util" | ||||||
| 	"k8s.io/kubernetes/pkg/kubelet/config" | 	"k8s.io/kubernetes/pkg/kubelet/config" | ||||||
| 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" | 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" | ||||||
| @@ -139,6 +140,8 @@ type containerManagerImpl struct { | |||||||
| 	deviceManager devicemanager.Manager | 	deviceManager devicemanager.Manager | ||||||
| 	// Interface for CPU affinity management. | 	// Interface for CPU affinity management. | ||||||
| 	cpuManager cpumanager.Manager | 	cpuManager cpumanager.Manager | ||||||
|  | 	// Interface for Topology resource co-ordination | ||||||
|  | 	topologyManager topologymanager.Manager | ||||||
| } | } | ||||||
|  |  | ||||||
| type features struct { | type features struct { | ||||||
| @@ -284,6 +287,20 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I | |||||||
| 		qosContainerManager: qosContainerManager, | 		qosContainerManager: qosContainerManager, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) { | ||||||
|  | 		cm.topologyManager, err = topologymanager.NewManager( | ||||||
|  | 			nodeConfig.ExperimentalTopologyManagerPolicy, | ||||||
|  | 		) | ||||||
|  |  | ||||||
|  | 		if err != nil { | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		klog.Infof("[topologymanager] Initilizing Topology Manager with %s policy", nodeConfig.ExperimentalTopologyManagerPolicy) | ||||||
|  | 	} else { | ||||||
|  | 		cm.topologyManager = topologymanager.NewFakeManager() | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	klog.Infof("Creating device plugin manager: %t", devicePluginEnabled) | 	klog.Infof("Creating device plugin manager: %t", devicePluginEnabled) | ||||||
| 	if devicePluginEnabled { | 	if devicePluginEnabled { | ||||||
| 		cm.deviceManager, err = devicemanager.NewManagerImpl() | 		cm.deviceManager, err = devicemanager.NewManagerImpl() | ||||||
| @@ -332,7 +349,7 @@ func (cm *containerManagerImpl) NewPodContainerManager() PodContainerManager { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle { | func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle { | ||||||
| 	return &internalContainerLifecycleImpl{cm.cpuManager} | 	return &internalContainerLifecycleImpl{cm.cpuManager, cm.topologyManager} | ||||||
| } | } | ||||||
|  |  | ||||||
| // Create a cgroup container manager. | // Create a cgroup container manager. | ||||||
| @@ -644,6 +661,10 @@ func (cm *containerManagerImpl) UpdatePluginResources(node *schedulernodeinfo.No | |||||||
| 	return cm.deviceManager.Allocate(node, attrs) | 	return cm.deviceManager.Allocate(node, attrs) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func (cm *containerManagerImpl) GetTopologyPodAdmitHandler() topologymanager.Manager { | ||||||
|  | 	return cm.topologyManager | ||||||
|  | } | ||||||
|  |  | ||||||
| func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList { | func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList { | ||||||
| 	cpuLimit := int64(0) | 	cpuLimit := int64(0) | ||||||
|  |  | ||||||
|   | |||||||
| @@ -24,6 +24,7 @@ import ( | |||||||
| 	internalapi "k8s.io/cri-api/pkg/apis" | 	internalapi "k8s.io/cri-api/pkg/apis" | ||||||
| 	podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1" | 	podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1" | ||||||
| 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" | 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" | ||||||
|  | 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" | ||||||
| 	"k8s.io/kubernetes/pkg/kubelet/config" | 	"k8s.io/kubernetes/pkg/kubelet/config" | ||||||
| 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" | 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" | ||||||
| 	"k8s.io/kubernetes/pkg/kubelet/lifecycle" | 	"k8s.io/kubernetes/pkg/kubelet/lifecycle" | ||||||
| @@ -101,7 +102,7 @@ func (cm *containerManagerStub) UpdatePluginResources(*schedulernodeinfo.NodeInf | |||||||
| } | } | ||||||
|  |  | ||||||
| func (cm *containerManagerStub) InternalContainerLifecycle() InternalContainerLifecycle { | func (cm *containerManagerStub) InternalContainerLifecycle() InternalContainerLifecycle { | ||||||
| 	return &internalContainerLifecycleImpl{cpumanager.NewFakeManager()} | 	return &internalContainerLifecycleImpl{cpumanager.NewFakeManager(), topologymanager.NewFakeManager()} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (cm *containerManagerStub) GetPodCgroupRoot() string { | func (cm *containerManagerStub) GetPodCgroupRoot() string { | ||||||
| @@ -116,6 +117,10 @@ func (cm *containerManagerStub) ShouldResetExtendedResourceCapacity() bool { | |||||||
| 	return cm.shouldResetExtendedResourceCapacity | 	return cm.shouldResetExtendedResourceCapacity | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func (cm *containerManagerStub) GetTopologyPodAdmitHandler() topologymanager.Manager { | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
| func NewStubContainerManager() ContainerManager { | func NewStubContainerManager() ContainerManager { | ||||||
| 	return &containerManagerStub{shouldResetExtendedResourceCapacity: false} | 	return &containerManagerStub{shouldResetExtendedResourceCapacity: false} | ||||||
| } | } | ||||||
|   | |||||||
| @@ -34,6 +34,7 @@ import ( | |||||||
| 	podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1" | 	podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1" | ||||||
| 	"k8s.io/kubernetes/pkg/kubelet/cadvisor" | 	"k8s.io/kubernetes/pkg/kubelet/cadvisor" | ||||||
| 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" | 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" | ||||||
|  | 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" | ||||||
| 	"k8s.io/kubernetes/pkg/kubelet/config" | 	"k8s.io/kubernetes/pkg/kubelet/config" | ||||||
| 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" | 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" | ||||||
| 	"k8s.io/kubernetes/pkg/kubelet/lifecycle" | 	"k8s.io/kubernetes/pkg/kubelet/lifecycle" | ||||||
| @@ -161,7 +162,7 @@ func (cm *containerManagerImpl) UpdatePluginResources(*schedulernodeinfo.NodeInf | |||||||
| } | } | ||||||
|  |  | ||||||
| func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle { | func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle { | ||||||
| 	return &internalContainerLifecycleImpl{cpumanager.NewFakeManager()} | 	return &internalContainerLifecycleImpl{cpumanager.NewFakeManager(), topologymanager.NewFakeManager()} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (cm *containerManagerImpl) GetPodCgroupRoot() string { | func (cm *containerManagerImpl) GetPodCgroupRoot() string { | ||||||
| @@ -175,3 +176,7 @@ func (cm *containerManagerImpl) GetDevices(_, _ string) []*podresourcesapi.Conta | |||||||
| func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool { | func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool { | ||||||
| 	return false | 	return false | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func (cm *containerManagerImpl) GetTopologyPodAdmitHandler() topologymanager.Manager { | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|   | |||||||
| @@ -22,6 +22,7 @@ import ( | |||||||
| 	utilfeature "k8s.io/apiserver/pkg/util/feature" | 	utilfeature "k8s.io/apiserver/pkg/util/feature" | ||||||
| 	kubefeatures "k8s.io/kubernetes/pkg/features" | 	kubefeatures "k8s.io/kubernetes/pkg/features" | ||||||
| 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" | 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" | ||||||
|  | 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| type InternalContainerLifecycle interface { | type InternalContainerLifecycle interface { | ||||||
| @@ -32,12 +33,22 @@ type InternalContainerLifecycle interface { | |||||||
|  |  | ||||||
| // Implements InternalContainerLifecycle interface. | // Implements InternalContainerLifecycle interface. | ||||||
| type internalContainerLifecycleImpl struct { | type internalContainerLifecycleImpl struct { | ||||||
| 	cpuManager cpumanager.Manager | 	cpuManager      cpumanager.Manager | ||||||
|  | 	topologyManager topologymanager.Manager | ||||||
| } | } | ||||||
|  |  | ||||||
| func (i *internalContainerLifecycleImpl) PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error { | func (i *internalContainerLifecycleImpl) PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error { | ||||||
| 	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) { | 	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) { | ||||||
| 		return i.cpuManager.AddContainer(pod, container, containerID) | 		err := i.cpuManager.AddContainer(pod, container, containerID) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) { | ||||||
|  | 		err := i.topologyManager.AddContainer(pod, containerID) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| @@ -51,7 +62,16 @@ func (i *internalContainerLifecycleImpl) PreStopContainer(containerID string) er | |||||||
|  |  | ||||||
| func (i *internalContainerLifecycleImpl) PostStopContainer(containerID string) error { | func (i *internalContainerLifecycleImpl) PostStopContainer(containerID string) error { | ||||||
| 	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) { | 	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) { | ||||||
| 		return i.cpuManager.RemoveContainer(containerID) | 		err := i.cpuManager.RemoveContainer(containerID) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) { | ||||||
|  | 		err := i.topologyManager.RemoveContainer(containerID) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|   | |||||||
| @@ -859,7 +859,9 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, | |||||||
| 	} | 	} | ||||||
| 	klet.AddPodSyncLoopHandler(activeDeadlineHandler) | 	klet.AddPodSyncLoopHandler(activeDeadlineHandler) | ||||||
| 	klet.AddPodSyncHandler(activeDeadlineHandler) | 	klet.AddPodSyncHandler(activeDeadlineHandler) | ||||||
|  | 	if utilfeature.DefaultFeatureGate.Enabled(features.TopologyManager) { | ||||||
|  | 		klet.admitHandlers.AddPodAdmitHandler(klet.containerManager.GetTopologyPodAdmitHandler()) | ||||||
|  | 	} | ||||||
| 	criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder) | 	criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder) | ||||||
| 	klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources)) | 	klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources)) | ||||||
| 	// apply functional Option's | 	// apply functional Option's | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Louise Daly
					Louise Daly