diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index 5d12ea318c4..6b7830c2316 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -21,6 +21,8 @@ import ( "os" "path/filepath" "reflect" + "sync" + "sync/atomic" "testing" "time" @@ -252,9 +254,10 @@ func TestDevicePluginReRegistrationProbeMode(t *testing.T) { cleanup(t, m, p1) } -func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string) (Manager, <-chan interface{}) { +func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, + topology []cadvisorapi.Node) (Manager, <-chan interface{}) { topologyStore := topologymanager.NewFakeManager() - m, err := newManagerImpl(socketName, nil, topologyStore) + m, err := newManagerImpl(socketName, topology, topologyStore) require.NoError(t, err) updateChan := make(chan interface{}) @@ -302,13 +305,13 @@ func runPluginManager(pluginManager pluginmanager.PluginManager) { } func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *plugin.Stub) { - m, updateChan := setupDeviceManager(t, devs, callback, socketName) + m, updateChan := setupDeviceManager(t, devs, callback, socketName, nil) p := setupDevicePlugin(t, devs, pluginSocketName) return m, updateChan, p } func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *plugin.Stub, pluginmanager.PluginManager) { - m, updateChan := setupDeviceManager(t, devs, callback, socketName) + m, updateChan := setupDeviceManager(t, devs, callback, socketName, nil) p := setupDevicePlugin(t, devs, pluginSocketName) pm := setupPluginManager(t, pluginSocketName, m) return m, updateChan, p, pm @@ -1403,3 +1406,83 @@ func TestReadPreNUMACheckpoint(t *testing.T) { err = m.readCheckpoint() require.NoError(t, err) } + +func TestGetTopologyHintsWithUpdates(t *testing.T) { + socketDir, socketName, _, err := tmpSocketDir() + defer os.RemoveAll(socketDir) + require.NoError(t, err) + + devs := []pluginapi.Device{} + for i := 0; i < 1000; i++ { + devs = append(devs, pluginapi.Device{ + ID: fmt.Sprintf("dev-%d", i), + Health: pluginapi.Healthy, + Topology: &pluginapi.TopologyInfo{ + Nodes: []*pluginapi.NUMANode{ + {ID: 0}, + }, + }}) + } + testPod := makePod(v1.ResourceList{ + testResourceName: *resource.NewQuantity(int64(1), resource.DecimalSI), + }) + topology := []cadvisorapi.Node{ + {Id: 0}, + } + testCases := []struct { + description string + count int + devices []pluginapi.Device + testfunc func(manager *wrappedManagerImpl) + }{ + { + description: "GetTopologyHints data race when update device", + count: 10, + devices: devs, + testfunc: func(manager *wrappedManagerImpl) { + manager.GetTopologyHints(testPod, &testPod.Spec.Containers[0]) + }, + }, + { + description: "GetPodTopologyHints data race when update device", + count: 10, + devices: devs, + testfunc: func(manager *wrappedManagerImpl) { + manager.GetPodTopologyHints(testPod) + }, + }, + } + + for _, test := range testCases { + t.Run(test.description, func(t *testing.T) { + m, _ := setupDeviceManager(t, nil, nil, socketName, topology) + defer m.Stop() + mimpl := m.(*wrappedManagerImpl) + + wg := sync.WaitGroup{} + wg.Add(2) + + updated := atomic.Bool{} + updated.Store(false) + go func() { + defer wg.Done() + for i := 0; i < test.count; i++ { + // simulate the device plugin to send device updates + mimpl.genericDeviceUpdateCallback(testResourceName, devs) + } + updated.Store(true) + }() + go func() { + defer wg.Done() + for !updated.Load() { + // When a data race occurs, golang will throw an error, and recover() cannot catch this error, + // Such as: `throw("Concurrent map iteration and map writing")`. + // When this test ends quietly, no data race error occurs. + // Otherwise, the test process exits automatically and prints all goroutine call stacks. + test.testfunc(mimpl) + } + }() + wg.Wait() + }) + } +} diff --git a/pkg/kubelet/cm/devicemanager/topology_hints.go b/pkg/kubelet/cm/devicemanager/topology_hints.go index 8e9521d8d12..709cd4d4653 100644 --- a/pkg/kubelet/cm/devicemanager/topology_hints.go +++ b/pkg/kubelet/cm/devicemanager/topology_hints.go @@ -38,47 +38,45 @@ func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map // Loop through all device resources and generate TopologyHints for them.. deviceHints := make(map[string][]topologymanager.TopologyHint) - for resourceObj, requestedObj := range container.Resources.Limits { - resource := string(resourceObj) - requested := int(requestedObj.Value()) + accumulatedResourceRequests := m.getContainerDeviceRequest(container) - // Only consider resources associated with a device plugin. - if m.isDevicePluginResource(resource) { - // Only consider devices that actually container topology information. - if aligned := m.deviceHasTopologyAlignment(resource); !aligned { - klog.InfoS("Resource does not have a topology preference", "resource", resource) - deviceHints[resource] = nil - continue - } + m.mutex.Lock() + defer m.mutex.Unlock() + for resource, requested := range accumulatedResourceRequests { + // Only consider devices that actually contain topology information. + if aligned := m.deviceHasTopologyAlignment(resource); !aligned { + klog.InfoS("Resource does not have a topology preference", "resource", resource) + deviceHints[resource] = nil + continue + } - // Short circuit to regenerate the same hints if there are already - // devices allocated to the Container. This might happen after a - // kubelet restart, for example. - allocated := m.podDevices.containerDevices(string(pod.UID), container.Name, resource) - if allocated.Len() > 0 { - if allocated.Len() != requested { - klog.ErrorS(nil, "Resource already allocated to pod with different number than request", "resource", resource, "pod", klog.KObj(pod), "containerName", container.Name, "request", requested, "allocated", allocated.Len()) - deviceHints[resource] = []topologymanager.TopologyHint{} - continue - } - klog.InfoS("Regenerating TopologyHints for resource already allocated to pod", "resource", resource, "pod", klog.KObj(pod), "containerName", container.Name) - deviceHints[resource] = m.generateDeviceTopologyHints(resource, allocated, sets.String{}, requested) - continue - } - - // Get the list of available devices, for which TopologyHints should be generated. - available := m.getAvailableDevices(resource) - reusable := m.devicesToReuse[string(pod.UID)][resource] - if available.Union(reusable).Len() < requested { - klog.ErrorS(nil, "Unable to generate topology hints: requested number of devices unavailable", "resource", resource, "request", requested, "available", available.Union(reusable).Len()) + // Short circuit to regenerate the same hints if there are already + // devices allocated to the Container. This might happen after a + // kubelet restart, for example. + allocated := m.podDevices.containerDevices(string(pod.UID), container.Name, resource) + if allocated.Len() > 0 { + if allocated.Len() != requested { + klog.ErrorS(nil, "Resource already allocated to pod with different number than request", "resource", resource, "pod", klog.KObj(pod), "containerName", container.Name, "request", requested, "allocated", allocated.Len()) deviceHints[resource] = []topologymanager.TopologyHint{} continue } - - // Generate TopologyHints for this resource given the current - // request size and the list of available devices. - deviceHints[resource] = m.generateDeviceTopologyHints(resource, available, reusable, requested) + klog.InfoS("Regenerating TopologyHints for resource already allocated to pod", "resource", resource, "pod", klog.KObj(pod), "containerName", container.Name) + deviceHints[resource] = m.generateDeviceTopologyHints(resource, allocated, sets.String{}, requested) + continue } + + // Get the list of available devices, for which TopologyHints should be generated. + available := m.getAvailableDevices(resource) + reusable := m.devicesToReuse[string(pod.UID)][resource] + if available.Union(reusable).Len() < requested { + klog.ErrorS(nil, "Unable to generate topology hints: requested number of devices unavailable", "resource", resource, "request", requested, "available", available.Union(reusable).Len()) + deviceHints[resource] = []topologymanager.TopologyHint{} + continue + } + + // Generate TopologyHints for this resource given the current + // request size and the list of available devices. + deviceHints[resource] = m.generateDeviceTopologyHints(resource, available, reusable, requested) } return deviceHints @@ -97,6 +95,8 @@ func (m *ManagerImpl) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymana deviceHints := make(map[string][]topologymanager.TopologyHint) accumulatedResourceRequests := m.getPodDeviceRequest(pod) + m.mutex.Lock() + defer m.mutex.Unlock() for resource, requested := range accumulatedResourceRequests { // Only consider devices that actually contain topology information. if aligned := m.deviceHasTopologyAlignment(resource); !aligned { @@ -297,3 +297,16 @@ func (m *ManagerImpl) getPodDeviceRequest(pod *v1.Pod) map[string]int { return podRequests } + +func (m *ManagerImpl) getContainerDeviceRequest(container *v1.Container) map[string]int { + containerRequests := make(map[string]int) + for resourceObj, requestedObj := range container.Resources.Limits { + resource := string(resourceObj) + requested := int(requestedObj.Value()) + if !m.isDevicePluginResource(resource) { + continue + } + containerRequests[resource] = requested + } + return containerRequests +}