From d9adf2036085b462c46adc05c26fb750377daadd Mon Sep 17 00:00:00 2001
From: Kevin Klues
Date: Tue, 22 Oct 2019 19:05:43 +0200
Subject: [PATCH 1/5] Abstract removeStaleState from reconcileState in CPUManager

This will become especially important as we move to a model where exclusive
CPUs are assigned at pod admission time rather than at pod creation time.
Having this function will allow us to do garbage collection on these CPUs
anytime we are about to allocate CPUs to a new set of containers, in addition
to reclaiming state periodically in the reconcileState() loop.
---
 pkg/kubelet/cm/cpumanager/cpu_manager.go | 71 +++++++++++++++++++-----
 1 file changed, 56 insertions(+), 15 deletions(-)

diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go
index 2ab8b2b37ad..ac8a009ae05 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go
@@ -239,15 +239,68 @@ type reconciledContainer struct {
 containerID string
 }
 
-func (m *manager) reconcileState() (success []reconciledContainer, failure []reconciledContainer) {
+func (m *manager) removeStaleState() {
+ // Only once all sources are ready do we attempt to remove any stale state.
+ // This ensures that the call to `m.activePods()` below will succeed with
+ // the actual active pods list.
 if !m.sourcesReady.AllReady() {
 return
 }
+
+ // We grab the lock to ensure that no new containers will grab CPUs while
+ // executing the code below. Without this lock, it's possible that we end up
+ // removing state that is newly added by an asynchronous call to
+ // AddContainer() during the execution of this code.
+ m.Lock()
+ defer m.Unlock()
+
+ // We remove stale state very conservatively, only removing *any* state
+ // once we know for sure that we won't be accidentally removing state that
+ // is still valid. Since this function is called periodically, we will just
+ // try again next time this function is called.
+ activePods := m.activePods()
+ if len(activePods) == 0 {
+ // If there are no active pods, skip the removal of stale state.
+ return
+ }
+
+ // Build a list of containerIDs for all containers in all active Pods.
+ activeContainers := make(map[string]struct{})
+ for _, pod := range activePods {
+ pstatus, ok := m.podStatusProvider.GetPodStatus(pod.UID)
+ if !ok {
+ // If even one pod does not have its status set, skip state removal.
+ return
+ }
+ for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
+ containerID, err := findContainerIDByName(&pstatus, container.Name)
+ if err != nil {
+ // If even one container does not have its containerID set, skip state removal.
+ return
+ }
+ activeContainers[containerID] = struct{}{}
+ }
+ }
+
+ // Loop through the CPUManager state. Remove any state for containers not
+ // in the `activeContainers` list built above. The short-circuits in place
+ // above ensure that no erroneous state will ever be removed.
+ for containerID := range m.state.GetCPUAssignments() {
+ if _, ok := activeContainers[containerID]; !ok {
+ klog.Errorf("[cpumanager] removeStaleState: removing container: %s", containerID)
+ err := m.policy.RemoveContainer(m.state, containerID)
+ if err != nil {
+ klog.Errorf("[cpumanager] removeStaleState: failed to remove container %s, error: %v", containerID, err)
+ }
+ }
+ }
+}
+
+func (m *manager) reconcileState() (success []reconciledContainer, failure []reconciledContainer) {
 success = []reconciledContainer{}
 failure = []reconciledContainer{}
- activeContainers := make(map[string]*v1.Pod)
-
+ m.removeStaleState()
 for _, pod := range m.activePods() {
 allContainers := pod.Spec.InitContainers
 allContainers = append(allContainers, pod.Spec.Containers...)
@@ -286,8 +339,6 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
 }
 }
 
- activeContainers[containerID] = pod
-
 cset := m.state.GetCPUSetOrDefault(containerID)
 if cset.IsEmpty() {
 // NOTE: This should not happen outside of tests.
@@ -306,16 +357,6 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
 success = append(success, reconciledContainer{pod.Name, container.Name, containerID})
 }
 }
-
- for containerID := range m.state.GetCPUAssignments() {
- if pod, ok := activeContainers[containerID]; !ok {
- err := m.RemoveContainer(containerID)
- if err != nil {
- klog.Errorf("[cpumanager] reconcileState: failed to remove container (pod: %s, container id: %s, error: %v)", pod.Name, containerID, err)
- failure = append(failure, reconciledContainer{pod.Name, "", containerID})
- }
- }
- }
 return success, failure
 }

From 58f3554ebe67c22a9b3170528df95baa9264c80f Mon Sep 17 00:00:00 2001
From: Kevin Klues
Date: Tue, 5 Nov 2019 13:00:20 +0000
Subject: [PATCH 2/5] Sync all CPU and device state before generating TopologyHints for them

This ensures that we have the most up-to-date state when generating topology
hints for a container. Without this, it's possible that some resources will be
seen as allocated when they are actually free.
---
 pkg/kubelet/cm/cpumanager/cpu_manager.go | 2 ++
 pkg/kubelet/cm/devicemanager/topology_hints.go | 6 +++---
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go
index ac8a009ae05..69d3a0a0519 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go
@@ -229,6 +229,8 @@ func (m *manager) State() state.Reader {
 }
 
 func (m *manager) GetTopologyHints(pod v1.Pod, container v1.Container) map[string][]topologymanager.TopologyHint {
+ // Garbage collect any stranded resources before providing TopologyHints
+ m.removeStaleState()
 // Delegate to active policy
 return m.policy.GetTopologyHints(m.state, pod, container)
 }
diff --git a/pkg/kubelet/cm/devicemanager/topology_hints.go b/pkg/kubelet/cm/devicemanager/topology_hints.go
index 8fbeff4858a..4b6fcdb95c2 100644
--- a/pkg/kubelet/cm/devicemanager/topology_hints.go
+++ b/pkg/kubelet/cm/devicemanager/topology_hints.go
@@ -28,8 +28,10 @@ import (
 // ensures the Device Manager is consulted when Topology Aware Hints for each
 // container are created.
 func (m *ManagerImpl) GetTopologyHints(pod v1.Pod, container v1.Container) map[string][]topologymanager.TopologyHint {
- deviceHints := make(map[string][]topologymanager.TopologyHint)
+ // Garbage collect any stranded device resources before providing TopologyHints
+ m.updateAllocatedDevices(m.activePods())
+ deviceHints := make(map[string][]topologymanager.TopologyHint)
 for resourceObj, requestedObj := range container.Resources.Limits {
 resource := string(resourceObj)
 requested := int(requestedObj.Value())
@@ -66,8 +68,6 @@ func (m *ManagerImpl) deviceHasTopologyAlignment(resource string) bool {
 }
 
 func (m *ManagerImpl) getAvailableDevices(resource string) sets.String {
- // Gets Devices in use.
- m.updateAllocatedDevices(m.activePods())
 // Strip all devices in use from the list of healthy ones.
 return m.healthyDevices[resource].Difference(m.allocatedDevices[resource])
 }

From a338c8f7fda50dc9a483e5bdca09cc4811dbe376 Mon Sep 17 00:00:00 2001
From: Kevin Klues
Date: Tue, 5 Nov 2019 13:06:23 +0000
Subject: [PATCH 3/5] Add some more comments to GetTopologyHints() in the devicemanager

---
 pkg/kubelet/cm/devicemanager/topology_hints.go | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/pkg/kubelet/cm/devicemanager/topology_hints.go b/pkg/kubelet/cm/devicemanager/topology_hints.go
index 4b6fcdb95c2..81fb3577ea2 100644
--- a/pkg/kubelet/cm/devicemanager/topology_hints.go
+++ b/pkg/kubelet/cm/devicemanager/topology_hints.go
@@ -31,18 +31,22 @@ func (m *ManagerImpl) GetTopologyHints(pod v1.Pod, container v1.Container) map[s
 // Garbage collect any stranded device resources before providing TopologyHints
 m.updateAllocatedDevices(m.activePods())
 
+ // Loop through all device resources and generate TopologyHints for them.
 deviceHints := make(map[string][]topologymanager.TopologyHint)
 for resourceObj, requestedObj := range container.Resources.Limits {
 resource := string(resourceObj)
 requested := int(requestedObj.Value())
 
+ // Only consider resources associated with a device plugin.
 if m.isDevicePluginResource(resource) {
+ // Only consider devices that actually contain topology information.
 if aligned := m.deviceHasTopologyAlignment(resource); !aligned {
 klog.Infof("[devicemanager] Resource '%v' does not have a topology preference", resource)
 deviceHints[resource] = nil
 continue
 }
 
+ // Get the list of available devices, for which TopologyHints should be generated.
 available := m.getAvailableDevices(resource)
 if available.Len() < requested {
 klog.Errorf("[devicemanager] Unable to generate topology hints: requested number of devices unavailable for '%s': requested: %d, available: %d", resource, requested, available.Len())
 deviceHints[resource] = []topologymanager.TopologyHint{}
 continue
 }
 
+ // Generate TopologyHints for this resource given the current
+ // request size and the list of available devices.
 deviceHints[resource] = m.generateDeviceTopologyHints(resource, available, requested)
 }
 }

From 9dc116eb087160ab4b508ccc5179e30e8693564f Mon Sep 17 00:00:00 2001
From: Kevin Klues
Date: Tue, 5 Nov 2019 13:09:49 +0000
Subject: [PATCH 4/5] Ensure CPUManager TopologyHints are regenerated after kubelet restart

This patch also includes tests to make sure the newly added logic works as
expected.
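To make the new behavior concrete, the decision added to the static policy can
be sketched roughly as follows (illustrative only, not code from this patch;
hintsForContainer and generate are hypothetical stand-ins for
staticPolicy.GetTopologyHints and its generateCPUTopologyHints helper):

package sketch

import (
    "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
    "k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
    "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
)

// hintsForContainer sketches how hints are chosen for a container that may
// already own exclusive CPUs, e.g. restored from the CPUManager state
// checkpoint after a kubelet restart.
func hintsForContainer(s state.State, containerID string, requested int,
    generate func(cpuset.CPUSet, int) []topologymanager.TopologyHint) []topologymanager.TopologyHint {
    if allocated, exists := s.GetCPUSet(containerID); exists {
        if allocated.Size() != requested {
            // The restored allocation no longer matches the request, so
            // return no preferred hints and let admission fail cleanly.
            return []topologymanager.TopologyHint{}
        }
        // Regenerate the same hints from the CPUs already allocated.
        return generate(allocated, requested)
    }
    // No prior allocation: nil here means "fall through to the normal
    // assignable-CPUs path".
    return nil
}

Returning an explicit empty hint list on a size mismatch is the signal that
lets the TopologyManager reject the pod instead of silently mis-aligning it.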
--- pkg/kubelet/cm/cpumanager/policy_static.go | 17 ++++ .../cm/cpumanager/policy_static_test.go | 14 +++ .../cm/cpumanager/topology_hints_test.go | 88 ++++++++++++++++++- 3 files changed, 118 insertions(+), 1 deletion(-) diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index e37c6114022..82c81d77a90 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -320,6 +320,23 @@ func (p *staticPolicy) GetTopologyHints(s state.State, pod v1.Pod, container v1. return nil } + // Short circuit to regenerate the same hints if there are already + // guaranteed CPUs allocated to the Container. This might happen after a + // kubelet restart, for example. + containerID, _ := findContainerIDByName(&pod.Status, container.Name) + if allocated, exists := s.GetCPUSet(containerID); exists { + if allocated.Size() != requested { + klog.Errorf("[cpumanager] CPUs already allocated to (pod %v, container %v) with different number than request: requested: %d, allocated: %d", string(pod.UID), container.Name, requested, allocated.Size()) + return map[string][]topologymanager.TopologyHint{ + string(v1.ResourceCPU): {}, + } + } + klog.Infof("[cpumanager] Regenerating TopologyHints for CPUs already allocated to (pod %v, container %v)", string(pod.UID), container.Name) + return map[string][]topologymanager.TopologyHint{ + string(v1.ResourceCPU): p.generateCPUTopologyHints(allocated, requested), + } + } + // Get a list of available CPUs. available := p.assignableCPUs(s) diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go index 3d30009917a..acba7c811aa 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static_test.go +++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go @@ -213,6 +213,20 @@ func TestStaticPolicyAdd(t *testing.T) { expCPUAlloc: true, expCSet: cpuset.NewCPUSet(1, 5), }, + { + description: "GuPodMultipleCores, SingleSocketHT, ExpectSameAllocation", + topo: topoSingleSocketHT, + numReservedCPUs: 1, + containerID: "fakeID3", + stAssignments: state.ContainerCPUAssignments{ + "fakeID3": cpuset.NewCPUSet(2, 3, 6, 7), + }, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 4, 5), + pod: makePod("4000m", "4000m"), + expErr: nil, + expCPUAlloc: true, + expCSet: cpuset.NewCPUSet(2, 3, 6, 7), + }, { description: "GuPodMultipleCores, DualSocketHT, ExpectAllocOneSocket", topo: topoDualSocketHT, diff --git a/pkg/kubelet/cm/cpumanager/topology_hints_test.go b/pkg/kubelet/cm/cpumanager/topology_hints_test.go index f998e734159..0b70150da06 100644 --- a/pkg/kubelet/cm/cpumanager/topology_hints_test.go +++ b/pkg/kubelet/cm/cpumanager/topology_hints_test.go @@ -23,6 +23,7 @@ import ( cadvisorapi "github.com/google/cadvisor/info/v1" v1 "k8s.io/api/core/v1" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" @@ -72,6 +73,7 @@ func TestGetTopologyHints(t *testing.T) { name string pod v1.Pod container v1.Container + assignments state.ContainerCPUAssignments defaultCPUSet cpuset.CPUSet expectedHints []topologymanager.TopologyHint }{ @@ -142,6 +144,86 @@ func TestGetTopologyHints(t *testing.T) { }, }, }, + { + name: "Request more CPUs than available", + pod: *testPod2, + container: *testContainer2, + defaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3), + expectedHints: nil, + }, + { + name: "Regenerate Single-Node NUMA Hints if already allocated 
1/2", + pod: *testPod1, + container: *testContainer1, + assignments: state.ContainerCPUAssignments{ + "": cpuset.NewCPUSet(0, 6), + }, + defaultCPUSet: cpuset.NewCPUSet(), + expectedHints: []topologymanager.TopologyHint{ + { + NUMANodeAffinity: firstSocketMask, + Preferred: true, + }, + { + NUMANodeAffinity: crossSocketMask, + Preferred: false, + }, + }, + }, + { + name: "Regenerate Single-Node NUMA Hints if already allocated 1/2", + pod: *testPod1, + container: *testContainer1, + assignments: state.ContainerCPUAssignments{ + "": cpuset.NewCPUSet(3, 9), + }, + defaultCPUSet: cpuset.NewCPUSet(), + expectedHints: []topologymanager.TopologyHint{ + { + NUMANodeAffinity: secondSocketMask, + Preferred: true, + }, + { + NUMANodeAffinity: crossSocketMask, + Preferred: false, + }, + }, + }, + { + name: "Regenerate Cross-NUMA Hints if already allocated", + pod: *testPod4, + container: *testContainer4, + assignments: state.ContainerCPUAssignments{ + "": cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10), + }, + defaultCPUSet: cpuset.NewCPUSet(), + expectedHints: []topologymanager.TopologyHint{ + { + NUMANodeAffinity: crossSocketMask, + Preferred: true, + }, + }, + }, + { + name: "Requested less than already allocated", + pod: *testPod1, + container: *testContainer1, + assignments: state.ContainerCPUAssignments{ + "": cpuset.NewCPUSet(0, 6, 3, 9), + }, + defaultCPUSet: cpuset.NewCPUSet(), + expectedHints: []topologymanager.TopologyHint{}, + }, + { + name: "Requested more than already allocated", + pod: *testPod4, + container: *testContainer4, + assignments: state.ContainerCPUAssignments{ + "": cpuset.NewCPUSet(0, 6, 3, 9), + }, + defaultCPUSet: cpuset.NewCPUSet(), + expectedHints: []topologymanager.TopologyHint{}, + }, } for _, tc := range tcases { topology, _ := topology.Discover(&machineInfo, numaNodeInfo) @@ -151,9 +233,13 @@ func TestGetTopologyHints(t *testing.T) { topology: topology, }, state: &mockState{ + assignments: tc.assignments, defaultCPUSet: tc.defaultCPUSet, }, - topology: topology, + topology: topology, + activePods: func() []*v1.Pod { return nil }, + podStatusProvider: mockPodStatusProvider{}, + sourcesReady: &sourcesReadyStub{}, } hints := m.GetTopologyHints(tc.pod, tc.container)[string(v1.ResourceCPU)] From 4d4d4bdd6152d34203bc2f273160ac5a172a0917 Mon Sep 17 00:00:00 2001 From: Kevin Klues Date: Tue, 5 Nov 2019 13:16:48 +0000 Subject: [PATCH 5/5] Ensure devicemanager TopologyHints are regenerated after kubelet restart This patch also includes test to make sure the newly added logic works as expected. 
--- pkg/kubelet/cm/devicemanager/BUILD | 1 + .../cm/devicemanager/topology_hints.go | 15 ++ .../cm/devicemanager/topology_hints_test.go | 162 ++++++++++++++++-- 3 files changed, 163 insertions(+), 15 deletions(-) diff --git a/pkg/kubelet/cm/devicemanager/BUILD b/pkg/kubelet/cm/devicemanager/BUILD index 909309cd520..4c0ff545194 100644 --- a/pkg/kubelet/cm/devicemanager/BUILD +++ b/pkg/kubelet/cm/devicemanager/BUILD @@ -60,6 +60,7 @@ go_test( "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", diff --git a/pkg/kubelet/cm/devicemanager/topology_hints.go b/pkg/kubelet/cm/devicemanager/topology_hints.go index 81fb3577ea2..7a8d1925896 100644 --- a/pkg/kubelet/cm/devicemanager/topology_hints.go +++ b/pkg/kubelet/cm/devicemanager/topology_hints.go @@ -46,6 +46,21 @@ func (m *ManagerImpl) GetTopologyHints(pod v1.Pod, container v1.Container) map[s continue } + // Short circuit to regenerate the same hints if there are already + // devices allocated to the Container. This might happen after a + // kubelet restart, for example. + allocated := m.podDevices.containerDevices(string(pod.UID), container.Name, resource) + if allocated.Len() > 0 { + if allocated.Len() != requested { + klog.Errorf("[devicemanager] Resource '%v' already allocated to (pod %v, container %v) with different number than request: requested: %d, allocated: %d", resource, string(pod.UID), container.Name, requested, allocated.Len()) + deviceHints[resource] = []topologymanager.TopologyHint{} + continue + } + klog.Infof("[devicemanager] Regenerating TopologyHints for resource '%v' already allocated to (pod %v, container %v)", resource, string(pod.UID), container.Name) + deviceHints[resource] = m.generateDeviceTopologyHints(resource, allocated, requested) + continue + } + // Get the list of available devices, for which TopologyHints should be generated. 
available := m.getAvailableDevices(resource) if available.Len() < requested { diff --git a/pkg/kubelet/cm/devicemanager/topology_hints_test.go b/pkg/kubelet/cm/devicemanager/topology_hints_test.go index e77cf3b4957..b69a7e3c62d 100644 --- a/pkg/kubelet/cm/devicemanager/topology_hints_test.go +++ b/pkg/kubelet/cm/devicemanager/topology_hints_test.go @@ -23,6 +23,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" @@ -52,13 +53,17 @@ func makeSocketMask(sockets ...int) bitmask.BitMask { func TestGetTopologyHints(t *testing.T) { tcases := []struct { description string + podUID string + containerName string request map[string]string devices map[string][]pluginapi.Device - allocatedDevices map[string][]string + allocatedDevices map[string]map[string]map[string][]string expectedHints map[string][]topologymanager.TopologyHint }{ { - description: "Single Request, no alignment", + description: "Single Request, no alignment", + podUID: "fakePod", + containerName: "fakeContainer", request: map[string]string{ "testdevice": "1", }, @@ -73,7 +78,9 @@ func TestGetTopologyHints(t *testing.T) { }, }, { - description: "Single Request, only one with alignment", + description: "Single Request, only one with alignment", + podUID: "fakePod", + containerName: "fakeContainer", request: map[string]string{ "testdevice": "1", }, @@ -97,7 +104,9 @@ func TestGetTopologyHints(t *testing.T) { }, }, { - description: "Single Request, one device per socket", + description: "Single Request, one device per socket", + podUID: "fakePod", + containerName: "fakeContainer", request: map[string]string{ "testdevice": "1", }, @@ -125,7 +134,9 @@ func TestGetTopologyHints(t *testing.T) { }, }, { - description: "Request for 2, one device per socket", + description: "Request for 2, one device per socket", + podUID: "fakePod", + containerName: "fakeContainer", request: map[string]string{ "testdevice": "2", }, @@ -145,7 +156,9 @@ func TestGetTopologyHints(t *testing.T) { }, }, { - description: "Request for 2, 2 devices per socket", + description: "Request for 2, 2 devices per socket", + podUID: "fakePod", + containerName: "fakeContainer", request: map[string]string{ "testdevice": "2", }, @@ -175,7 +188,9 @@ func TestGetTopologyHints(t *testing.T) { }, }, { - description: "Request for 2, optimal on 1 NUMA node, forced cross-NUMA", + description: "Request for 2, optimal on 1 NUMA node, forced cross-NUMA", + podUID: "fakePod", + containerName: "fakeContainer", request: map[string]string{ "testdevice": "2", }, @@ -187,8 +202,12 @@ func TestGetTopologyHints(t *testing.T) { makeNUMADevice("Dev4", 1), }, }, - allocatedDevices: map[string][]string{ - "testdevice": {"Dev1", "Dev2"}, + allocatedDevices: map[string]map[string]map[string][]string{ + "fakePod": { + "fakeOtherContainer": { + "testdevice": {"Dev1", "Dev2"}, + }, + }, }, expectedHints: map[string][]topologymanager.TopologyHint{ "testdevice": { @@ -200,7 +219,9 @@ func TestGetTopologyHints(t *testing.T) { }, }, { - description: "2 device types, mixed configuration", + description: "2 device types, mixed configuration", + podUID: "fakePod", + containerName: "fakeContainer", request: map[string]string{ "testdevice1": "2", "testdevice2": "1", @@ -243,6 +264,110 @@ func TestGetTopologyHints(t *testing.T) { }, }, }, + { + description: "Single device type, more requested than available", + 
podUID: "fakePod", + containerName: "fakeContainer", + request: map[string]string{ + "testdevice": "6", + }, + devices: map[string][]pluginapi.Device{ + "testdevice": { + makeNUMADevice("Dev1", 0), + makeNUMADevice("Dev2", 0), + makeNUMADevice("Dev3", 1), + makeNUMADevice("Dev4", 1), + }, + }, + expectedHints: map[string][]topologymanager.TopologyHint{ + "testdevice": {}, + }, + }, + { + description: "Single device type, all already allocated to container", + podUID: "fakePod", + containerName: "fakeContainer", + request: map[string]string{ + "testdevice": "2", + }, + devices: map[string][]pluginapi.Device{ + "testdevice": { + makeNUMADevice("Dev1", 0), + makeNUMADevice("Dev2", 0), + }, + }, + allocatedDevices: map[string]map[string]map[string][]string{ + "fakePod": { + "fakeContainer": { + "testdevice": {"Dev1", "Dev2"}, + }, + }, + }, + expectedHints: map[string][]topologymanager.TopologyHint{ + "testdevice": { + { + NUMANodeAffinity: makeSocketMask(0), + Preferred: true, + }, + { + NUMANodeAffinity: makeSocketMask(0, 1), + Preferred: false, + }, + }, + }, + }, + { + description: "Single device type, less already allocated to container than requested", + podUID: "fakePod", + containerName: "fakeContainer", + request: map[string]string{ + "testdevice": "4", + }, + devices: map[string][]pluginapi.Device{ + "testdevice": { + makeNUMADevice("Dev1", 0), + makeNUMADevice("Dev2", 0), + makeNUMADevice("Dev3", 1), + makeNUMADevice("Dev4", 1), + }, + }, + allocatedDevices: map[string]map[string]map[string][]string{ + "fakePod": { + "fakeContainer": { + "testdevice": {"Dev1", "Dev2"}, + }, + }, + }, + expectedHints: map[string][]topologymanager.TopologyHint{ + "testdevice": {}, + }, + }, + { + description: "Single device type, more already allocated to container than requested", + podUID: "fakePod", + containerName: "fakeContainer", + request: map[string]string{ + "testdevice": "2", + }, + devices: map[string][]pluginapi.Device{ + "testdevice": { + makeNUMADevice("Dev1", 0), + makeNUMADevice("Dev2", 0), + makeNUMADevice("Dev3", 1), + makeNUMADevice("Dev4", 1), + }, + }, + allocatedDevices: map[string]map[string]map[string][]string{ + "fakePod": { + "fakeContainer": { + "testdevice": {"Dev1", "Dev2", "Dev3", "Dev4"}, + }, + }, + }, + expectedHints: map[string][]topologymanager.TopologyHint{ + "testdevice": {}, + }, + }, } for _, tc := range tcases { @@ -252,6 +377,8 @@ func TestGetTopologyHints(t *testing.T) { } pod := makePod(resourceList) + pod.UID = types.UID(tc.podUID) + pod.Spec.Containers[0].Name = tc.containerName m := ManagerImpl{ allDevices: make(map[string]map[string]pluginapi.Device), @@ -259,7 +386,7 @@ func TestGetTopologyHints(t *testing.T) { allocatedDevices: make(map[string]sets.String), podDevices: make(podDevices), sourcesReady: &sourcesReadyStub{}, - activePods: func() []*v1.Pod { return []*v1.Pod{} }, + activePods: func() []*v1.Pod { return []*v1.Pod{pod} }, numaNodes: []int{0, 1}, } @@ -273,11 +400,16 @@ func TestGetTopologyHints(t *testing.T) { } } - for r := range tc.allocatedDevices { - m.allocatedDevices[r] = sets.NewString() + for p := range tc.allocatedDevices { + for c := range tc.allocatedDevices[p] { + for r, devices := range tc.allocatedDevices[p][c] { + m.podDevices.insert(p, c, r, sets.NewString(devices...), nil) - for _, d := range tc.allocatedDevices[r] { - m.allocatedDevices[r].Insert(d) + m.allocatedDevices[r] = sets.NewString() + for _, d := range devices { + m.allocatedDevices[r].Insert(d) + } + } } }