From 692f8aab277d8fba35d9ec48c544e128f86469c2 Mon Sep 17 00:00:00 2001 From: huyinhou Date: Mon, 19 Dec 2022 12:07:10 +0800 Subject: [PATCH 1/6] fix kubelet crash, concurrent map iteration and map write When kubelet starts a Pod that requires device resources, if the device plug-in updates the device at the same time, it may cause kubelet to crash. Signed-off-by: huyinhou --- pkg/kubelet/cm/devicemanager/topology_hints.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/kubelet/cm/devicemanager/topology_hints.go b/pkg/kubelet/cm/devicemanager/topology_hints.go index 8e9521d8d12..d45f45c1143 100644 --- a/pkg/kubelet/cm/devicemanager/topology_hints.go +++ b/pkg/kubelet/cm/devicemanager/topology_hints.go @@ -147,11 +147,15 @@ func (m *ManagerImpl) deviceHasTopologyAlignment(resource string) bool { } func (m *ManagerImpl) getAvailableDevices(resource string) sets.String { + m.mutex.Lock() + defer m.mutex.Unlock() // Strip all devices in use from the list of healthy ones. return m.healthyDevices[resource].Difference(m.allocatedDevices[resource]) } func (m *ManagerImpl) generateDeviceTopologyHints(resource string, available sets.String, reusable sets.String, request int) []topologymanager.TopologyHint { + m.mutex.Lock() + defer m.mutex.Unlock() // Initialize minAffinitySize to include all NUMA Nodes minAffinitySize := len(m.numaNodes) From 997cefc9da208e9b01ca848e48cddbfc09cbb9d8 Mon Sep 17 00:00:00 2001 From: huyinhou Date: Wed, 28 Dec 2022 19:11:52 +0800 Subject: [PATCH 2/6] add unit test --- pkg/kubelet/cm/devicemanager/manager_test.go | 56 +++++++++++++++++--- 1 file changed, 49 insertions(+), 7 deletions(-) diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index 5d12ea318c4..653b94417a4 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -111,7 +111,7 @@ func TestNewManagerImplStartProbeMode(t *testing.T) { socketDir, socketName, pluginSocketName, err := tmpSocketDir() require.NoError(t, err) defer os.RemoveAll(socketDir) - m, _, p, _ := setupInProbeMode(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName) + m, _, p, _ := setupInProbeMode(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName, nil) cleanup(t, m, p) } @@ -201,7 +201,7 @@ func TestDevicePluginReRegistrationProbeMode(t *testing.T) { {ID: "Dev3", Health: pluginapi.Healthy}, } - m, ch, p1, _ := setupInProbeMode(t, devs, nil, socketName, pluginSocketName) + m, ch, p1, _ := setupInProbeMode(t, devs, nil, socketName, pluginSocketName, nil) // Wait for the first callback to be issued. select { @@ -252,9 +252,10 @@ func TestDevicePluginReRegistrationProbeMode(t *testing.T) { cleanup(t, m, p1) } -func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string) (Manager, <-chan interface{}) { +func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, + topology []cadvisorapi.Node) (Manager, <-chan interface{}) { topologyStore := topologymanager.NewFakeManager() - m, err := newManagerImpl(socketName, nil, topologyStore) + m, err := newManagerImpl(socketName, topology, topologyStore) require.NoError(t, err) updateChan := make(chan interface{}) @@ -302,13 +303,13 @@ func runPluginManager(pluginManager pluginmanager.PluginManager) { } func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *plugin.Stub) { - m, updateChan := setupDeviceManager(t, devs, callback, socketName) + m, updateChan := setupDeviceManager(t, devs, callback, socketName, nil) p := setupDevicePlugin(t, devs, pluginSocketName) return m, updateChan, p } -func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *plugin.Stub, pluginmanager.PluginManager) { - m, updateChan := setupDeviceManager(t, devs, callback, socketName) +func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string, topology []cadvisorapi.Node) (Manager, <-chan interface{}, *plugin.Stub, pluginmanager.PluginManager) { + m, updateChan := setupDeviceManager(t, devs, callback, socketName, topology) p := setupDevicePlugin(t, devs, pluginSocketName) pm := setupPluginManager(t, pluginSocketName, m) return m, updateChan, p, pm @@ -1403,3 +1404,44 @@ func TestReadPreNUMACheckpoint(t *testing.T) { err = m.readCheckpoint() require.NoError(t, err) } + +func TestGetTopologyHintsWithUpdates(t *testing.T) { + socketDir, socketName, pluginSocketName, err := tmpSocketDir() + defer os.RemoveAll(socketDir) + testPod := makePod(v1.ResourceList{ + testResourceName: *resource.NewQuantity(int64(1), resource.DecimalSI)}) + require.NoError(t, err) + + devs := []*pluginapi.Device{} + for i := 0; i < 5000; i++ { + devs = append(devs, &pluginapi.Device{ + ID: fmt.Sprintf("dev-%d", i), + Health: pluginapi.Healthy, + Topology: &pluginapi.TopologyInfo{ + Nodes: []*pluginapi.NUMANode{ + {ID: 0}, + }, + }}) + } + topology := []cadvisorapi.Node{ + {Id: 0}, + } + m, ch, p1, _ := setupInProbeMode(t, devs, nil, socketName, pluginSocketName, topology) + + <-ch + go func() { + p1.Update(devs) + }() + + updated := false + for i := 0; i < 5000 && !updated; i++ { + m.GetTopologyHints(testPod, &testPod.Spec.Containers[0]) + select { + case <-ch: + updated = true + default: + } + } + + cleanup(t, m, p1) +} From b9987eeb6c5ad9a4c64bd2a140dbe339e16bdf62 Mon Sep 17 00:00:00 2001 From: huyinhou Date: Thu, 29 Dec 2022 18:27:08 +0800 Subject: [PATCH 3/6] fix allDevices map data race --- pkg/kubelet/cm/devicemanager/manager.go | 2 +- pkg/kubelet/cm/devicemanager/topology_hints.go | 8 +++++++- pkg/kubelet/cm/devicemanager/topology_hints_test.go | 2 +- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index 8cb57aa8190..5c3e4ddee0a 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -646,7 +646,7 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi func (m *ManagerImpl) filterByAffinity(podUID, contName, resource string, available sets.String) (sets.String, sets.String, sets.String) { // If alignment information is not available, just pass the available list back. hint := m.topologyAffinityStore.GetAffinity(podUID, contName) - if !m.deviceHasTopologyAlignment(resource) || hint.NUMANodeAffinity == nil { + if !m.deviceHasTopologyAlignmentLocked(resource) || hint.NUMANodeAffinity == nil { return sets.NewString(), sets.NewString(), available } diff --git a/pkg/kubelet/cm/devicemanager/topology_hints.go b/pkg/kubelet/cm/devicemanager/topology_hints.go index d45f45c1143..e32d97c4adb 100644 --- a/pkg/kubelet/cm/devicemanager/topology_hints.go +++ b/pkg/kubelet/cm/devicemanager/topology_hints.go @@ -136,7 +136,7 @@ func (m *ManagerImpl) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymana return deviceHints } -func (m *ManagerImpl) deviceHasTopologyAlignment(resource string) bool { +func (m *ManagerImpl) deviceHasTopologyAlignmentLocked(resource string) bool { // If any device has Topology NUMANodes available, we assume they care about alignment. for _, device := range m.allDevices[resource] { if device.Topology != nil && len(device.Topology.Nodes) > 0 { @@ -146,6 +146,12 @@ func (m *ManagerImpl) deviceHasTopologyAlignment(resource string) bool { return false } +func (m *ManagerImpl) deviceHasTopologyAlignment(resource string) bool { + m.mutex.Lock() + defer m.mutex.Unlock() + return m.deviceHasTopologyAlignmentLocked(resource) +} + func (m *ManagerImpl) getAvailableDevices(resource string) sets.String { m.mutex.Lock() defer m.mutex.Unlock() diff --git a/pkg/kubelet/cm/devicemanager/topology_hints_test.go b/pkg/kubelet/cm/devicemanager/topology_hints_test.go index a7bc5157366..33becf9febf 100644 --- a/pkg/kubelet/cm/devicemanager/topology_hints_test.go +++ b/pkg/kubelet/cm/devicemanager/topology_hints_test.go @@ -456,7 +456,7 @@ func TestTopologyAlignedAllocation(t *testing.T) { } alignment := make(map[int]int) - if m.deviceHasTopologyAlignment(tc.resource) { + if m.deviceHasTopologyAlignmentLocked(tc.resource) { for d := range allocated { if m.allDevices[tc.resource][d].Topology != nil { alignment[int(m.allDevices[tc.resource][d].Topology.Nodes[0].ID)]++ From 4702503d15bab810a128338f26120b3120a20836 Mon Sep 17 00:00:00 2001 From: huyinhou Date: Thu, 29 Dec 2022 19:00:14 +0800 Subject: [PATCH 4/6] update test case Signed-off-by: huyinhou --- pkg/kubelet/cm/devicemanager/manager_test.go | 99 +++++++++++++++----- 1 file changed, 73 insertions(+), 26 deletions(-) diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index 653b94417a4..08c5a39ada4 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -21,6 +21,8 @@ import ( "os" "path/filepath" "reflect" + "sync" + "sync/atomic" "testing" "time" @@ -111,7 +113,7 @@ func TestNewManagerImplStartProbeMode(t *testing.T) { socketDir, socketName, pluginSocketName, err := tmpSocketDir() require.NoError(t, err) defer os.RemoveAll(socketDir) - m, _, p, _ := setupInProbeMode(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName, nil) + m, _, p, _ := setupInProbeMode(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName) cleanup(t, m, p) } @@ -201,7 +203,7 @@ func TestDevicePluginReRegistrationProbeMode(t *testing.T) { {ID: "Dev3", Health: pluginapi.Healthy}, } - m, ch, p1, _ := setupInProbeMode(t, devs, nil, socketName, pluginSocketName, nil) + m, ch, p1, _ := setupInProbeMode(t, devs, nil, socketName, pluginSocketName) // Wait for the first callback to be issued. select { @@ -308,8 +310,8 @@ func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, soc return m, updateChan, p } -func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string, topology []cadvisorapi.Node) (Manager, <-chan interface{}, *plugin.Stub, pluginmanager.PluginManager) { - m, updateChan := setupDeviceManager(t, devs, callback, socketName, topology) +func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *plugin.Stub, pluginmanager.PluginManager) { + m, updateChan := setupDeviceManager(t, devs, callback, socketName, nil) p := setupDevicePlugin(t, devs, pluginSocketName) pm := setupPluginManager(t, pluginSocketName, m) return m, updateChan, p, pm @@ -1406,15 +1408,13 @@ func TestReadPreNUMACheckpoint(t *testing.T) { } func TestGetTopologyHintsWithUpdates(t *testing.T) { - socketDir, socketName, pluginSocketName, err := tmpSocketDir() + socketDir, socketName, _, err := tmpSocketDir() defer os.RemoveAll(socketDir) - testPod := makePod(v1.ResourceList{ - testResourceName: *resource.NewQuantity(int64(1), resource.DecimalSI)}) require.NoError(t, err) - devs := []*pluginapi.Device{} - for i := 0; i < 5000; i++ { - devs = append(devs, &pluginapi.Device{ + devs := []pluginapi.Device{} + for i := 0; i < 1000; i++ { + devs = append(devs, pluginapi.Device{ ID: fmt.Sprintf("dev-%d", i), Health: pluginapi.Healthy, Topology: &pluginapi.TopologyInfo{ @@ -1426,22 +1426,69 @@ func TestGetTopologyHintsWithUpdates(t *testing.T) { topology := []cadvisorapi.Node{ {Id: 0}, } - m, ch, p1, _ := setupInProbeMode(t, devs, nil, socketName, pluginSocketName, topology) - - <-ch - go func() { - p1.Update(devs) - }() - - updated := false - for i := 0; i < 5000 && !updated; i++ { - m.GetTopologyHints(testPod, &testPod.Spec.Containers[0]) - select { - case <-ch: - updated = true - default: - } + testCases := []struct { + description string + count int + devices []pluginapi.Device + testfunc func(manager *wrappedManagerImpl) + }{ + { + description: "getAvailableDevices data race when update device", + count: 1, + devices: devs, + testfunc: func(manager *wrappedManagerImpl) { + manager.getAvailableDevices(testResourceName) + }, + }, + { + description: "generateDeviceTopologyHints data race when update device", + count: 1, + devices: devs, + testfunc: func(manager *wrappedManagerImpl) { + manager.generateDeviceTopologyHints( + testResourceName, sets.NewString(), sets.NewString(), 1) + }, + }, + { + description: "deviceHasTopologyAlignment data race when update device", + count: 1000, + devices: devs[:1], + testfunc: func(manager *wrappedManagerImpl) { + manager.deviceHasTopologyAlignment(testResourceName) + }, + }, } - cleanup(t, m, p1) + for _, test := range testCases { + t.Run(test.description, func(t *testing.T) { + m, _ := setupDeviceManager(t, nil, nil, socketName, topology) + defer m.Stop() + mimpl := m.(*wrappedManagerImpl) + + wg := sync.WaitGroup{} + wg.Add(2) + + updated := atomic.Bool{} + updated.Store(false) + go func() { + defer wg.Done() + for i := 0; i < test.count; i++ { + // simulate the device plugin to send device updates + mimpl.genericDeviceUpdateCallback(testResourceName, devs) + } + updated.Store(true) + }() + go func() { + defer wg.Done() + for !updated.Load() { + // When a data race occurs, golang will throw an error, and recover() cannot catch this error, + // Such as: `throw("Concurrent map iteration and map writing")`. + // When this test ends quietly, no data race error occurs. + // Otherwise, the test process exits automatically and prints all goroutine call stacks. + test.testfunc(mimpl) + } + }() + wg.Wait() + }) + } } From 32495ae3f1a85e8a4c2c47a878232e94561a97ad Mon Sep 17 00:00:00 2001 From: huyinhou Date: Mon, 20 Feb 2023 10:46:04 +0800 Subject: [PATCH 5/6] add lock in generate topology hints function --- pkg/kubelet/cm/devicemanager/manager.go | 2 +- pkg/kubelet/cm/devicemanager/manager_test.go | 24 ++--- .../cm/devicemanager/topology_hints.go | 97 ++++++++++--------- .../cm/devicemanager/topology_hints_test.go | 2 +- 4 files changed, 60 insertions(+), 65 deletions(-) diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index 5c3e4ddee0a..8cb57aa8190 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -646,7 +646,7 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi func (m *ManagerImpl) filterByAffinity(podUID, contName, resource string, available sets.String) (sets.String, sets.String, sets.String) { // If alignment information is not available, just pass the available list back. hint := m.topologyAffinityStore.GetAffinity(podUID, contName) - if !m.deviceHasTopologyAlignmentLocked(resource) || hint.NUMANodeAffinity == nil { + if !m.deviceHasTopologyAlignment(resource) || hint.NUMANodeAffinity == nil { return sets.NewString(), sets.NewString(), available } diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index 08c5a39ada4..6b7830c2316 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -1423,6 +1423,9 @@ func TestGetTopologyHintsWithUpdates(t *testing.T) { }, }}) } + testPod := makePod(v1.ResourceList{ + testResourceName: *resource.NewQuantity(int64(1), resource.DecimalSI), + }) topology := []cadvisorapi.Node{ {Id: 0}, } @@ -1433,28 +1436,19 @@ func TestGetTopologyHintsWithUpdates(t *testing.T) { testfunc func(manager *wrappedManagerImpl) }{ { - description: "getAvailableDevices data race when update device", - count: 1, + description: "GetTopologyHints data race when update device", + count: 10, devices: devs, testfunc: func(manager *wrappedManagerImpl) { - manager.getAvailableDevices(testResourceName) + manager.GetTopologyHints(testPod, &testPod.Spec.Containers[0]) }, }, { - description: "generateDeviceTopologyHints data race when update device", - count: 1, + description: "GetPodTopologyHints data race when update device", + count: 10, devices: devs, testfunc: func(manager *wrappedManagerImpl) { - manager.generateDeviceTopologyHints( - testResourceName, sets.NewString(), sets.NewString(), 1) - }, - }, - { - description: "deviceHasTopologyAlignment data race when update device", - count: 1000, - devices: devs[:1], - testfunc: func(manager *wrappedManagerImpl) { - manager.deviceHasTopologyAlignment(testResourceName) + manager.GetPodTopologyHints(testPod) }, }, } diff --git a/pkg/kubelet/cm/devicemanager/topology_hints.go b/pkg/kubelet/cm/devicemanager/topology_hints.go index e32d97c4adb..9e03a9029b9 100644 --- a/pkg/kubelet/cm/devicemanager/topology_hints.go +++ b/pkg/kubelet/cm/devicemanager/topology_hints.go @@ -38,47 +38,44 @@ func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map // Loop through all device resources and generate TopologyHints for them.. deviceHints := make(map[string][]topologymanager.TopologyHint) - for resourceObj, requestedObj := range container.Resources.Limits { - resource := string(resourceObj) - requested := int(requestedObj.Value()) + accumulatedResourceRequests := m.getContainerDeviceRequest(container) + m.mutex.Lock() + defer m.mutex.Unlock() + for resource, requested := range accumulatedResourceRequests { + // Only consider devices that actually container topology information. + if aligned := m.deviceHasTopologyAlignment(resource); !aligned { + klog.InfoS("Resource does not have a topology preference", "resource", resource) + deviceHints[resource] = nil + continue + } - // Only consider resources associated with a device plugin. - if m.isDevicePluginResource(resource) { - // Only consider devices that actually container topology information. - if aligned := m.deviceHasTopologyAlignment(resource); !aligned { - klog.InfoS("Resource does not have a topology preference", "resource", resource) - deviceHints[resource] = nil - continue - } - - // Short circuit to regenerate the same hints if there are already - // devices allocated to the Container. This might happen after a - // kubelet restart, for example. - allocated := m.podDevices.containerDevices(string(pod.UID), container.Name, resource) - if allocated.Len() > 0 { - if allocated.Len() != requested { - klog.ErrorS(nil, "Resource already allocated to pod with different number than request", "resource", resource, "pod", klog.KObj(pod), "containerName", container.Name, "request", requested, "allocated", allocated.Len()) - deviceHints[resource] = []topologymanager.TopologyHint{} - continue - } - klog.InfoS("Regenerating TopologyHints for resource already allocated to pod", "resource", resource, "pod", klog.KObj(pod), "containerName", container.Name) - deviceHints[resource] = m.generateDeviceTopologyHints(resource, allocated, sets.String{}, requested) - continue - } - - // Get the list of available devices, for which TopologyHints should be generated. - available := m.getAvailableDevices(resource) - reusable := m.devicesToReuse[string(pod.UID)][resource] - if available.Union(reusable).Len() < requested { - klog.ErrorS(nil, "Unable to generate topology hints: requested number of devices unavailable", "resource", resource, "request", requested, "available", available.Union(reusable).Len()) + // Short circuit to regenerate the same hints if there are already + // devices allocated to the Container. This might happen after a + // kubelet restart, for example. + allocated := m.podDevices.containerDevices(string(pod.UID), container.Name, resource) + if allocated.Len() > 0 { + if allocated.Len() != requested { + klog.ErrorS(nil, "Resource already allocated to pod with different number than request", "resource", resource, "pod", klog.KObj(pod), "containerName", container.Name, "request", requested, "allocated", allocated.Len()) deviceHints[resource] = []topologymanager.TopologyHint{} continue } - - // Generate TopologyHints for this resource given the current - // request size and the list of available devices. - deviceHints[resource] = m.generateDeviceTopologyHints(resource, available, reusable, requested) + klog.InfoS("Regenerating TopologyHints for resource already allocated to pod", "resource", resource, "pod", klog.KObj(pod), "containerName", container.Name) + deviceHints[resource] = m.generateDeviceTopologyHints(resource, allocated, sets.String{}, requested) + continue } + + // Get the list of available devices, for which TopologyHints should be generated. + available := m.getAvailableDevices(resource) + reusable := m.devicesToReuse[string(pod.UID)][resource] + if available.Union(reusable).Len() < requested { + klog.ErrorS(nil, "Unable to generate topology hints: requested number of devices unavailable", "resource", resource, "request", requested, "available", available.Union(reusable).Len()) + deviceHints[resource] = []topologymanager.TopologyHint{} + continue + } + + // Generate TopologyHints for this resource given the current + // request size and the list of available devices. + deviceHints[resource] = m.generateDeviceTopologyHints(resource, available, reusable, requested) } return deviceHints @@ -96,7 +93,8 @@ func (m *ManagerImpl) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymana deviceHints := make(map[string][]topologymanager.TopologyHint) accumulatedResourceRequests := m.getPodDeviceRequest(pod) - + m.mutex.Lock() + defer m.mutex.Unlock() for resource, requested := range accumulatedResourceRequests { // Only consider devices that actually contain topology information. if aligned := m.deviceHasTopologyAlignment(resource); !aligned { @@ -136,7 +134,7 @@ func (m *ManagerImpl) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymana return deviceHints } -func (m *ManagerImpl) deviceHasTopologyAlignmentLocked(resource string) bool { +func (m *ManagerImpl) deviceHasTopologyAlignment(resource string) bool { // If any device has Topology NUMANodes available, we assume they care about alignment. for _, device := range m.allDevices[resource] { if device.Topology != nil && len(device.Topology.Nodes) > 0 { @@ -146,22 +144,12 @@ func (m *ManagerImpl) deviceHasTopologyAlignmentLocked(resource string) bool { return false } -func (m *ManagerImpl) deviceHasTopologyAlignment(resource string) bool { - m.mutex.Lock() - defer m.mutex.Unlock() - return m.deviceHasTopologyAlignmentLocked(resource) -} - func (m *ManagerImpl) getAvailableDevices(resource string) sets.String { - m.mutex.Lock() - defer m.mutex.Unlock() // Strip all devices in use from the list of healthy ones. return m.healthyDevices[resource].Difference(m.allocatedDevices[resource]) } func (m *ManagerImpl) generateDeviceTopologyHints(resource string, available sets.String, reusable sets.String, request int) []topologymanager.TopologyHint { - m.mutex.Lock() - defer m.mutex.Unlock() // Initialize minAffinitySize to include all NUMA Nodes minAffinitySize := len(m.numaNodes) @@ -307,3 +295,16 @@ func (m *ManagerImpl) getPodDeviceRequest(pod *v1.Pod) map[string]int { return podRequests } + +func (m *ManagerImpl) getContainerDeviceRequest(container *v1.Container) map[string]int { + containerRequests := make(map[string]int) + for resourceObj, requestedObj := range container.Resources.Limits { + resource := string(resourceObj) + requested := int(requestedObj.Value()) + if !m.isDevicePluginResource(resource) { + continue + } + containerRequests[resource] = requested + } + return containerRequests +} diff --git a/pkg/kubelet/cm/devicemanager/topology_hints_test.go b/pkg/kubelet/cm/devicemanager/topology_hints_test.go index 33becf9febf..a7bc5157366 100644 --- a/pkg/kubelet/cm/devicemanager/topology_hints_test.go +++ b/pkg/kubelet/cm/devicemanager/topology_hints_test.go @@ -456,7 +456,7 @@ func TestTopologyAlignedAllocation(t *testing.T) { } alignment := make(map[int]int) - if m.deviceHasTopologyAlignmentLocked(tc.resource) { + if m.deviceHasTopologyAlignment(tc.resource) { for d := range allocated { if m.allDevices[tc.resource][d].Topology != nil { alignment[int(m.allDevices[tc.resource][d].Topology.Nodes[0].ID)]++ From 88274d96fcf3817e9d1328e5e7c026bda172090e Mon Sep 17 00:00:00 2001 From: huyinhou Date: Mon, 6 Mar 2023 14:23:14 +0800 Subject: [PATCH 6/6] update code style Signed-off-by: huyinhou --- pkg/kubelet/cm/devicemanager/topology_hints.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/kubelet/cm/devicemanager/topology_hints.go b/pkg/kubelet/cm/devicemanager/topology_hints.go index 9e03a9029b9..709cd4d4653 100644 --- a/pkg/kubelet/cm/devicemanager/topology_hints.go +++ b/pkg/kubelet/cm/devicemanager/topology_hints.go @@ -39,10 +39,11 @@ func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map // Loop through all device resources and generate TopologyHints for them.. deviceHints := make(map[string][]topologymanager.TopologyHint) accumulatedResourceRequests := m.getContainerDeviceRequest(container) + m.mutex.Lock() defer m.mutex.Unlock() for resource, requested := range accumulatedResourceRequests { - // Only consider devices that actually container topology information. + // Only consider devices that actually contain topology information. if aligned := m.deviceHasTopologyAlignment(resource); !aligned { klog.InfoS("Resource does not have a topology preference", "resource", resource) deviceHints[resource] = nil @@ -93,6 +94,7 @@ func (m *ManagerImpl) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymana deviceHints := make(map[string][]topologymanager.TopologyHint) accumulatedResourceRequests := m.getPodDeviceRequest(pod) + m.mutex.Lock() defer m.mutex.Unlock() for resource, requested := range accumulatedResourceRequests {