Split CPUManager takeByTopology() into two different algorithms

The first implements the original algorithm, which packs CPUs onto NUMA nodes
if more than one NUMA node is required to satisfy the allocation. The second
distributes CPUs across NUMA nodes if they can't all fit into one (a
simplified sketch of the two strategies is included below, just before the
per-file diffs).

The "distributing" algorithm is currently a noop and just returns an error of
"unimplemented". A subsequent commit will add the logic to implement this
algorithm according to KEP 2902:

https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/2902-cpumanager-distribute-cpus-policy-option

Signed-off-by: Kevin Klues <kklues@nvidia.com>
Kevin Klues 2021-10-12 10:07:54 +00:00
parent 0e7928edce
commit 462544d079
3 changed files with 26 additions and 13 deletions
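
For illustration, the difference between the two strategies can be sketched with plain Go slices. This is a deliberately simplified model rather than the kubelet code: the real functions operate on a topology.CPUTopology and a cpuset.CPUSet and also account for sockets and whole cores, and the distributed algorithm specified in KEP 2902 is more involved than the round-robin shown here. The helper names packedAcrossNUMA and distributedAcrossNUMA are hypothetical.

// Simplified, self-contained illustration of the two allocation strategies.
// Each NUMA node is modeled as an ordered slice of free CPU IDs.
package main

import "fmt"

// packedAcrossNUMA fills one NUMA node completely before spilling onto the
// next, mirroring the intent of takeByTopologyNUMAPacked.
func packedAcrossNUMA(numaFreeCPUs [][]int, numCPUs int) []int {
	var result []int
	for _, node := range numaFreeCPUs {
		for _, cpu := range node {
			if len(result) == numCPUs {
				return result
			}
			result = append(result, cpu)
		}
	}
	if len(result) == numCPUs {
		return result
	}
	return nil // not enough free CPUs
}

// distributedAcrossNUMA spreads an allocation that cannot fit on a single
// NUMA node across several of them, the rough idea behind
// takeByTopologyNUMADistributed (the algorithm specified in KEP 2902 is more
// involved, e.g. it balances whole cores and minimizes the nodes used).
func distributedAcrossNUMA(numaFreeCPUs [][]int, numCPUs int) []int {
	// If the request fits on one node, behave like the packed case.
	for _, node := range numaFreeCPUs {
		if len(node) >= numCPUs {
			return append([]int{}, node[:numCPUs]...)
		}
	}
	// Otherwise take CPUs round-robin from every node.
	var result []int
	for i := 0; ; i++ {
		progress := false
		for _, node := range numaFreeCPUs {
			if i < len(node) {
				result = append(result, node[i])
				progress = true
				if len(result) == numCPUs {
					return result
				}
			}
		}
		if !progress {
			return nil // not enough free CPUs anywhere
		}
	}
}

func main() {
	// Two NUMA nodes with 4 free CPUs each, request for 6 CPUs.
	nodes := [][]int{{0, 1, 2, 3}, {4, 5, 6, 7}}
	fmt.Println(packedAcrossNUMA(nodes, 6))      // [0 1 2 3 4 5]: node 0 drained first
	fmt.Println(distributedAcrossNUMA(nodes, 6)) // [0 4 1 5 2 6]: 3 CPUs from each node
}

With two NUMA nodes of four free CPUs each and a request for six, the packed variant drains node 0 before touching node 1, while the distributed variant ends up with three CPUs taken from each node.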

pkg/kubelet/cm/cpumanager/cpu_assignment.go

@@ -318,7 +318,7 @@ func (a *cpuAccumulator) isFailed() bool {
 	return a.numCPUsNeeded > a.details.CPUs().Size()
 }
 
-func takeByTopology(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
+func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
 	acc := newCPUAccumulator(topo, availableCPUs, numCPUs)
 	if acc.isSatisfied() {
 		return acc.result, nil
@@ -358,3 +358,7 @@ func takeByTopology(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, num
 
 	return cpuset.NewCPUSet(), fmt.Errorf("failed to allocate cpus")
 }
+
+func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
+	return cpuset.NewCPUSet(), fmt.Errorf("unimplemented")
+}

pkg/kubelet/cm/cpumanager/cpu_assignment_test.go

@@ -506,7 +506,7 @@ func TestCPUAccumulatorTake(t *testing.T) {
 	}
 }
 
-func TestTakeByTopology(t *testing.T) {
+func TestTakeByTopologyNUMAPacked(t *testing.T) {
 	testCases := []struct {
 		description   string
 		topo          *topology.CPUTopology
@@ -631,7 +631,7 @@ func TestTakeByTopology(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.description, func(t *testing.T) {
-			result, err := takeByTopology(tc.topo, tc.availableCPUs, tc.numCPUs)
+			result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs)
 			if tc.expErr != "" && err.Error() != tc.expErr {
 				t.Errorf("expected error to be [%v] but it was [%v]", tc.expErr, err)
 			}

pkg/kubelet/cm/cpumanager/policy_static.go

@@ -118,6 +118,13 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
 
 	klog.InfoS("Static policy created with configuration", "options", opts)
 
+	policy := &staticPolicy{
+		topology:    topology,
+		affinity:    affinity,
+		cpusToReuse: make(map[string]cpuset.CPUSet),
+		options:     opts,
+	}
+
 	allCPUs := topology.CPUDetails.CPUs()
 	var reserved cpuset.CPUSet
 	if reservedCPUs.Size() > 0 {
@@ -128,7 +135,7 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
 		//
 		// For example: Given a system with 8 CPUs available and HT enabled,
 		// if numReservedCPUs=2, then reserved={0,4}
-		reserved, _ = takeByTopology(topology, allCPUs, numReservedCPUs)
+		reserved, _ = policy.takeByTopology(allCPUs, numReservedCPUs)
 	}
 
 	if reserved.Size() != numReservedCPUs {
@@ -137,14 +144,9 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
 	}
 
 	klog.InfoS("Reserved CPUs not available for exclusive assignment", "reservedSize", reserved.Size(), "reserved", reserved)
+	policy.reserved = reserved
 
-	return &staticPolicy{
-		topology:    topology,
-		reserved:    reserved,
-		affinity:    affinity,
-		cpusToReuse: make(map[string]cpuset.CPUSet),
-		options:     opts,
-	}, nil
+	return policy, nil
 }
 
 func (p *staticPolicy) Name() string {
@@ -318,7 +320,7 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bit
 			numAlignedToAlloc = numCPUs
 		}
 
-		alignedCPUs, err := takeByTopology(p.topology, alignedCPUs, numAlignedToAlloc)
+		alignedCPUs, err := p.takeByTopology(alignedCPUs, numAlignedToAlloc)
 		if err != nil {
 			return cpuset.NewCPUSet(), err
 		}
@@ -327,7 +329,7 @@
 	}
 
 	// Get any remaining CPUs from what's leftover after attempting to grab aligned ones.
-	remainingCPUs, err := takeByTopology(p.topology, allocatableCPUs.Difference(result), numCPUs-result.Size())
+	remainingCPUs, err := p.takeByTopology(allocatableCPUs.Difference(result), numCPUs-result.Size())
 	if err != nil {
 		return cpuset.NewCPUSet(), err
 	}
@@ -381,6 +383,13 @@ func (p *staticPolicy) podGuaranteedCPUs(pod *v1.Pod) int {
 	return requestedByAppContainers
 }
 
+func (p *staticPolicy) takeByTopology(availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
+	if p.options.DistributeCPUsAcrossNUMA {
+		return takeByTopologyNUMADistributed(p.topology, availableCPUs, numCPUs)
+	}
+	return takeByTopologyNUMAPacked(p.topology, availableCPUs, numCPUs)
+}
+
 func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
 	// Get a count of how many guaranteed CPUs have been requested.
 	requested := p.guaranteedCPUs(pod, container)
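
For context on the dispatch in the new staticPolicy.takeByTopology helper: the DistributeCPUsAcrossNUMA field it consults is expected to be driven by a CPU Manager policy option, which KEP 2902 names "distribute-cpus-across-numa". The sketch below shows, under that assumption, how such an option map could be folded into the flag; newStaticPolicyOptions here is a hypothetical stand-in, and the actual option parsing in the static policy may differ in detail.

// Hypothetical sketch of turning a policy-option map into the flag checked by
// staticPolicy.takeByTopology above. The option name is taken from KEP 2902;
// the real CPU Manager option parsing may differ.
package main

import (
	"fmt"
	"strconv"
)

const distributeCPUsAcrossNUMAOption = "distribute-cpus-across-numa"

type staticPolicyOptions struct {
	DistributeCPUsAcrossNUMA bool
}

func newStaticPolicyOptions(policyOptions map[string]string) (staticPolicyOptions, error) {
	var opts staticPolicyOptions
	for name, value := range policyOptions {
		switch name {
		case distributeCPUsAcrossNUMAOption:
			enabled, err := strconv.ParseBool(value)
			if err != nil {
				return opts, fmt.Errorf("bad value for option %q: %v", name, err)
			}
			opts.DistributeCPUsAcrossNUMA = enabled
		default:
			return opts, fmt.Errorf("unknown CPU Manager policy option: %q", name)
		}
	}
	return opts, nil
}

func main() {
	opts, err := newStaticPolicyOptions(map[string]string{distributeCPUsAcrossNUMAOption: "true"})
	fmt.Println(opts.DistributeCPUsAcrossNUMA, err) // true <nil>
}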