Merge pull request #114572 from huyinhou/fix-concurrent-map-access

kubelet/deviceplugin: fix concurrent map iteration and map write
This commit is contained in:
Kubernetes Prow Robot 2023-03-06 06:06:29 -08:00 committed by GitHub
commit 68eea2468c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 135 additions and 39 deletions

View File

@ -21,6 +21,8 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"reflect" "reflect"
"sync"
"sync/atomic"
"testing" "testing"
"time" "time"
@ -252,9 +254,10 @@ func TestDevicePluginReRegistrationProbeMode(t *testing.T) {
cleanup(t, m, p1) cleanup(t, m, p1)
} }
func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string) (Manager, <-chan interface{}) { func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string,
topology []cadvisorapi.Node) (Manager, <-chan interface{}) {
topologyStore := topologymanager.NewFakeManager() topologyStore := topologymanager.NewFakeManager()
m, err := newManagerImpl(socketName, nil, topologyStore) m, err := newManagerImpl(socketName, topology, topologyStore)
require.NoError(t, err) require.NoError(t, err)
updateChan := make(chan interface{}) updateChan := make(chan interface{})
@ -302,13 +305,13 @@ func runPluginManager(pluginManager pluginmanager.PluginManager) {
} }
func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *plugin.Stub) { func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *plugin.Stub) {
m, updateChan := setupDeviceManager(t, devs, callback, socketName) m, updateChan := setupDeviceManager(t, devs, callback, socketName, nil)
p := setupDevicePlugin(t, devs, pluginSocketName) p := setupDevicePlugin(t, devs, pluginSocketName)
return m, updateChan, p return m, updateChan, p
} }
func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *plugin.Stub, pluginmanager.PluginManager) { func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *plugin.Stub, pluginmanager.PluginManager) {
m, updateChan := setupDeviceManager(t, devs, callback, socketName) m, updateChan := setupDeviceManager(t, devs, callback, socketName, nil)
p := setupDevicePlugin(t, devs, pluginSocketName) p := setupDevicePlugin(t, devs, pluginSocketName)
pm := setupPluginManager(t, pluginSocketName, m) pm := setupPluginManager(t, pluginSocketName, m)
return m, updateChan, p, pm return m, updateChan, p, pm
@ -1403,3 +1406,83 @@ func TestReadPreNUMACheckpoint(t *testing.T) {
err = m.readCheckpoint() err = m.readCheckpoint()
require.NoError(t, err) require.NoError(t, err)
} }
// TestGetTopologyHintsWithUpdates verifies that GetTopologyHints and
// GetPodTopologyHints can run concurrently with device updates delivered
// through the plugin callback without triggering a
// "concurrent map iteration and map write" runtime throw.
func TestGetTopologyHintsWithUpdates(t *testing.T) {
	socketDir, socketName, _, err := tmpSocketDir()
	// Deferred before the error check so a partially created dir is
	// still cleaned up when require aborts the test.
	defer os.RemoveAll(socketDir)
	require.NoError(t, err)

	// Pre-size: we know exactly how many devices we build.
	devs := make([]pluginapi.Device, 0, 1000)
	for i := 0; i < 1000; i++ {
		devs = append(devs, pluginapi.Device{
			ID:     fmt.Sprintf("dev-%d", i),
			Health: pluginapi.Healthy,
			Topology: &pluginapi.TopologyInfo{
				Nodes: []*pluginapi.NUMANode{
					{ID: 0},
				},
			}})
	}
	testPod := makePod(v1.ResourceList{
		testResourceName: *resource.NewQuantity(int64(1), resource.DecimalSI),
	})
	topology := []cadvisorapi.Node{
		{Id: 0},
	}
	testCases := []struct {
		description string
		count       int
		devices     []pluginapi.Device
		testfunc    func(manager *wrappedManagerImpl)
	}{
		{
			description: "GetTopologyHints data race when update device",
			count:       10,
			devices:     devs,
			testfunc: func(manager *wrappedManagerImpl) {
				manager.GetTopologyHints(testPod, &testPod.Spec.Containers[0])
			},
		},
		{
			description: "GetPodTopologyHints data race when update device",
			count:       10,
			devices:     devs,
			testfunc: func(manager *wrappedManagerImpl) {
				manager.GetPodTopologyHints(testPod)
			},
		},
	}

	for _, test := range testCases {
		t.Run(test.description, func(t *testing.T) {
			m, _ := setupDeviceManager(t, nil, nil, socketName, topology)
			defer m.Stop()
			mimpl := m.(*wrappedManagerImpl)

			var wg sync.WaitGroup
			wg.Add(2)
			// Zero value of atomic.Bool is false; no explicit Store needed.
			var updated atomic.Bool
			go func() {
				defer wg.Done()
				for i := 0; i < test.count; i++ {
					// Simulate the device plugin sending device updates.
					// Use the per-case device list rather than capturing
					// the outer devs slice.
					mimpl.genericDeviceUpdateCallback(testResourceName, test.devices)
				}
				updated.Store(true)
			}()
			go func() {
				defer wg.Done()
				for !updated.Load() {
					// If a data race occurs, the runtime throws
					// ("concurrent map iteration and map write"), which
					// recover() cannot catch: the process exits and prints
					// all goroutine stacks. The test passing quietly means
					// no such race was detected.
					test.testfunc(mimpl)
				}
			}()
			wg.Wait()
		})
	}
}

View File

@ -38,47 +38,45 @@ func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map
// Loop through all device resources and generate TopologyHints for them.. // Loop through all device resources and generate TopologyHints for them..
deviceHints := make(map[string][]topologymanager.TopologyHint) deviceHints := make(map[string][]topologymanager.TopologyHint)
for resourceObj, requestedObj := range container.Resources.Limits { accumulatedResourceRequests := m.getContainerDeviceRequest(container)
resource := string(resourceObj)
requested := int(requestedObj.Value())
// Only consider resources associated with a device plugin. m.mutex.Lock()
if m.isDevicePluginResource(resource) { defer m.mutex.Unlock()
// Only consider devices that actually container topology information. for resource, requested := range accumulatedResourceRequests {
if aligned := m.deviceHasTopologyAlignment(resource); !aligned { // Only consider devices that actually contain topology information.
klog.InfoS("Resource does not have a topology preference", "resource", resource) if aligned := m.deviceHasTopologyAlignment(resource); !aligned {
deviceHints[resource] = nil klog.InfoS("Resource does not have a topology preference", "resource", resource)
continue deviceHints[resource] = nil
} continue
}
// Short circuit to regenerate the same hints if there are already // Short circuit to regenerate the same hints if there are already
// devices allocated to the Container. This might happen after a // devices allocated to the Container. This might happen after a
// kubelet restart, for example. // kubelet restart, for example.
allocated := m.podDevices.containerDevices(string(pod.UID), container.Name, resource) allocated := m.podDevices.containerDevices(string(pod.UID), container.Name, resource)
if allocated.Len() > 0 { if allocated.Len() > 0 {
if allocated.Len() != requested { if allocated.Len() != requested {
klog.ErrorS(nil, "Resource already allocated to pod with different number than request", "resource", resource, "pod", klog.KObj(pod), "containerName", container.Name, "request", requested, "allocated", allocated.Len()) klog.ErrorS(nil, "Resource already allocated to pod with different number than request", "resource", resource, "pod", klog.KObj(pod), "containerName", container.Name, "request", requested, "allocated", allocated.Len())
deviceHints[resource] = []topologymanager.TopologyHint{}
continue
}
klog.InfoS("Regenerating TopologyHints for resource already allocated to pod", "resource", resource, "pod", klog.KObj(pod), "containerName", container.Name)
deviceHints[resource] = m.generateDeviceTopologyHints(resource, allocated, sets.String{}, requested)
continue
}
// Get the list of available devices, for which TopologyHints should be generated.
available := m.getAvailableDevices(resource)
reusable := m.devicesToReuse[string(pod.UID)][resource]
if available.Union(reusable).Len() < requested {
klog.ErrorS(nil, "Unable to generate topology hints: requested number of devices unavailable", "resource", resource, "request", requested, "available", available.Union(reusable).Len())
deviceHints[resource] = []topologymanager.TopologyHint{} deviceHints[resource] = []topologymanager.TopologyHint{}
continue continue
} }
klog.InfoS("Regenerating TopologyHints for resource already allocated to pod", "resource", resource, "pod", klog.KObj(pod), "containerName", container.Name)
// Generate TopologyHints for this resource given the current deviceHints[resource] = m.generateDeviceTopologyHints(resource, allocated, sets.String{}, requested)
// request size and the list of available devices. continue
deviceHints[resource] = m.generateDeviceTopologyHints(resource, available, reusable, requested)
} }
// Get the list of available devices, for which TopologyHints should be generated.
available := m.getAvailableDevices(resource)
reusable := m.devicesToReuse[string(pod.UID)][resource]
if available.Union(reusable).Len() < requested {
klog.ErrorS(nil, "Unable to generate topology hints: requested number of devices unavailable", "resource", resource, "request", requested, "available", available.Union(reusable).Len())
deviceHints[resource] = []topologymanager.TopologyHint{}
continue
}
// Generate TopologyHints for this resource given the current
// request size and the list of available devices.
deviceHints[resource] = m.generateDeviceTopologyHints(resource, available, reusable, requested)
} }
return deviceHints return deviceHints
@ -97,6 +95,8 @@ func (m *ManagerImpl) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymana
deviceHints := make(map[string][]topologymanager.TopologyHint) deviceHints := make(map[string][]topologymanager.TopologyHint)
accumulatedResourceRequests := m.getPodDeviceRequest(pod) accumulatedResourceRequests := m.getPodDeviceRequest(pod)
m.mutex.Lock()
defer m.mutex.Unlock()
for resource, requested := range accumulatedResourceRequests { for resource, requested := range accumulatedResourceRequests {
// Only consider devices that actually contain topology information. // Only consider devices that actually contain topology information.
if aligned := m.deviceHasTopologyAlignment(resource); !aligned { if aligned := m.deviceHasTopologyAlignment(resource); !aligned {
@ -297,3 +297,16 @@ func (m *ManagerImpl) getPodDeviceRequest(pod *v1.Pod) map[string]int {
return podRequests return podRequests
} }
// getContainerDeviceRequest returns the requested amount, keyed by resource
// name, for every device-plugin-managed resource in the container's limits.
// Resources not owned by a device plugin are ignored.
func (m *ManagerImpl) getContainerDeviceRequest(container *v1.Container) map[string]int {
	requests := make(map[string]int)
	for name, quantity := range container.Resources.Limits {
		resourceName := string(name)
		if m.isDevicePluginResource(resourceName) {
			requests[resourceName] = int(quantity.Value())
		}
	}
	return requests
}