Merge pull request #84525 from klueska/upstream-fix-hint-generation-after-kubelet-restart

Fix bug in TopologyManager hint generation after kubelet restart
Merged by: Kubernetes Prow Robot (committed by GitHub)
Date: 2019-11-06 15:33:50 -08:00
7 changed files with 348 additions and 34 deletions
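
Summary of the change: both the CPUManager and the DeviceManager now garbage-collect stale allocations before producing TopologyHints, and when a container already has resources recorded in state (for example after a kubelet restart) they regenerate hints from that existing allocation instead of treating it as a fresh request. Below is a minimal, self-contained Go sketch of that decision flow; the names are hypothetical stand-ins, not the kubelet's real types.

package main

import "fmt"

// hint stands in for topologymanager.TopologyHint (simplified, hypothetical).
type hint struct {
	numaNodes []int
	preferred bool
}

// hintsForContainer sketches the short circuit this PR adds: if the container
// already has an allocation recorded in state (e.g. after a kubelet restart),
// hints are regenerated from that allocation; a size mismatch yields an empty
// hint list; otherwise hints come from the available pool as before.
func hintsForContainer(requested int, allocated, available []string,
	generate func(from []string, n int) []hint) []hint {

	if len(allocated) > 0 {
		if len(allocated) != requested {
			return []hint{} // mismatched request vs. existing allocation
		}
		return generate(allocated, requested) // regenerate the same hints
	}
	if len(available) < requested {
		return []hint{} // not enough resources left
	}
	return generate(available, requested)
}

func main() {
	gen := func(from []string, n int) []hint {
		// Toy generator: one preferred hint on NUMA node 0.
		return []hint{{numaNodes: []int{0}, preferred: true}}
	}
	// After a restart, CPUs "2" and "3" are already assigned to the container.
	fmt.Println(hintsForContainer(2, []string{"2", "3"}, nil, gen))
}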

View File

@@ -229,6 +229,8 @@ func (m *manager) State() state.Reader {
}
func (m *manager) GetTopologyHints(pod v1.Pod, container v1.Container) map[string][]topologymanager.TopologyHint {
// Garbage collect any stranded resources before providing TopologyHints
m.removeStaleState()
// Delegate to active policy
return m.policy.GetTopologyHints(m.state, pod, container)
}
@@ -239,15 +241,68 @@ type reconciledContainer struct {
containerID string
}
func (m *manager) removeStaleState() {
// Only once all sources are ready do we attempt to remove any stale state.
// This ensures that the call to `m.activePods()` below will succeed with
// the actual active pods list.
if !m.sourcesReady.AllReady() {
return
}
// We grab the lock to ensure that no new containers will grab CPUs while
// executing the code below. Without this lock, it's possible that we end up
// removing state that is newly added by an asynchronous call to
// AddContainer() during the execution of this code.
m.Lock()
defer m.Unlock()
// We remove stale state very conservatively, only removing *any* state
// once we know for sure that we won't be accidentally removing state that
// is still valid. Since this function is called periodically, we will just
// try again next time this function is called.
activePods := m.activePods()
if len(activePods) == 0 {
// If there are no active pods, skip the removal of stale state.
return
}
// Build a list of containerIDs for all containers in all active Pods.
activeContainers := make(map[string]struct{})
for _, pod := range activePods {
pstatus, ok := m.podStatusProvider.GetPodStatus(pod.UID)
if !ok {
// If even one pod does not have its status set, skip state removal.
return
}
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
containerID, err := findContainerIDByName(&pstatus, container.Name)
if err != nil {
// If even one container does not have its containerID set, skip state removal.
return
}
activeContainers[containerID] = struct{}{}
}
}
// Loop through the CPUManager state. Remove any state for containers not
// in the `activeContainers` list built above. The short-circuits in place
// above ensure that no erroneous state will ever be removed.
for containerID := range m.state.GetCPUAssignments() {
if _, ok := activeContainers[containerID]; !ok {
klog.Errorf("[cpumanager] removeStaleState: removing container: %s)", containerID)
err := m.policy.RemoveContainer(m.state, containerID)
if err != nil {
klog.Errorf("[cpumanager] removeStaleState: failed to remove container %s, error: %v)", containerID, err)
}
}
}
}
func (m *manager) reconcileState() (success []reconciledContainer, failure []reconciledContainer) {
success = []reconciledContainer{}
failure = []reconciledContainer{}
activeContainers := make(map[string]*v1.Pod)
m.removeStaleState()
for _, pod := range m.activePods() {
allContainers := pod.Spec.InitContainers
allContainers = append(allContainers, pod.Spec.Containers...)
@@ -286,8 +341,6 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
}
}
activeContainers[containerID] = pod
cset := m.state.GetCPUSetOrDefault(containerID)
if cset.IsEmpty() {
// NOTE: This should not happen outside of tests.
@@ -306,16 +359,6 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
success = append(success, reconciledContainer{pod.Name, container.Name, containerID})
}
}
for containerID := range m.state.GetCPUAssignments() {
if pod, ok := activeContainers[containerID]; !ok {
err := m.RemoveContainer(containerID)
if err != nil {
klog.Errorf("[cpumanager] reconcileState: failed to remove container (pod: %s, container id: %s, error: %v)", pod.Name, containerID, err)
failure = append(failure, reconciledContainer{pod.Name, "", containerID})
}
}
}
return success, failure
}
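
The removeStaleState pass above is deliberately conservative: it bails out unless every active pod has a status and every container a resolvable ID, relying on the fact that it runs again on the next hint-generation or reconcile pass. A reduced, self-contained sketch of that pruning logic follows (hypothetical types, not the kubelet's):

package main

import "fmt"

// pruneStale removes assignments whose container ID does not belong to any
// active pod. lookupID returns ("", false) when a container's ID is not yet
// known; in that case the whole pass is skipped so that valid state is never
// removed by mistake (the caller simply tries again later).
func pruneStale(assignments map[string]string, activePods map[string][]string,
	lookupID func(pod, container string) (string, bool)) {

	if len(activePods) == 0 {
		return // nothing to compare against; skip removal
	}
	active := map[string]struct{}{}
	for pod, containers := range activePods {
		for _, c := range containers {
			id, ok := lookupID(pod, c)
			if !ok {
				return // incomplete information: skip removal entirely
			}
			active[id] = struct{}{}
		}
	}
	for id := range assignments {
		if _, ok := active[id]; !ok {
			delete(assignments, id) // stale: container no longer active
		}
	}
}

func main() {
	state := map[string]string{"old-id": "0-3", "live-id": "4-7"}
	pods := map[string][]string{"pod-a": {"ctr"}}
	pruneStale(state, pods, func(p, c string) (string, bool) { return "live-id", true })
	fmt.Println(state) // map[live-id:4-7]
}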

View File

@@ -320,6 +320,23 @@ func (p *staticPolicy) GetTopologyHints(s state.State, pod v1.Pod, container v1.
return nil
}
// Short circuit to regenerate the same hints if there are already
// guaranteed CPUs allocated to the Container. This might happen after a
// kubelet restart, for example.
containerID, _ := findContainerIDByName(&pod.Status, container.Name)
if allocated, exists := s.GetCPUSet(containerID); exists {
if allocated.Size() != requested {
klog.Errorf("[cpumanager] CPUs already allocated to (pod %v, container %v) with different number than request: requested: %d, allocated: %d", string(pod.UID), container.Name, requested, allocated.Size())
return map[string][]topologymanager.TopologyHint{
string(v1.ResourceCPU): {},
}
}
klog.Infof("[cpumanager] Regenerating TopologyHints for CPUs already allocated to (pod %v, container %v)", string(pod.UID), container.Name)
return map[string][]topologymanager.TopologyHint{
string(v1.ResourceCPU): p.generateCPUTopologyHints(allocated, requested),
}
}
// Get a list of available CPUs.
available := p.assignableCPUs(s)
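
The short circuit above keys off the container ID recorded in the pod status. The real findContainerIDByName helper lives elsewhere in the CPUManager; the sketch below only illustrates the kind of lookup such a helper has to perform against v1.ContainerStatus-style data (simplified types, runtime-prefixed IDs assumed):

package main

import (
	"fmt"
	"strings"
)

// containerStatus mirrors the two v1.ContainerStatus fields such a lookup
// needs (simplified; the real helper works on *v1.PodStatus).
type containerStatus struct {
	Name        string
	ContainerID string // e.g. "containerd://abc123"
}

// findIDByName scans the container statuses for a matching name and strips
// the runtime prefix from the ID, returning an error if no ID is recorded yet.
func findIDByName(statuses []containerStatus, name string) (string, error) {
	for _, s := range statuses {
		if s.Name != name || s.ContainerID == "" {
			continue
		}
		parts := strings.SplitN(s.ContainerID, "://", 2)
		if len(parts) == 2 {
			return parts[1], nil
		}
		return s.ContainerID, nil
	}
	return "", fmt.Errorf("unable to find ID for container %q", name)
}

func main() {
	statuses := []containerStatus{{Name: "app", ContainerID: "containerd://abc123"}}
	id, err := findIDByName(statuses, "app")
	fmt.Println(id, err) // abc123 <nil>
}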

View File

@@ -213,6 +213,20 @@ func TestStaticPolicyAdd(t *testing.T) {
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(1, 5),
},
{
description: "GuPodMultipleCores, SingleSocketHT, ExpectSameAllocation",
topo: topoSingleSocketHT,
numReservedCPUs: 1,
containerID: "fakeID3",
stAssignments: state.ContainerCPUAssignments{
"fakeID3": cpuset.NewCPUSet(2, 3, 6, 7),
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 4, 5),
pod: makePod("4000m", "4000m"),
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(2, 3, 6, 7),
},
{
description: "GuPodMultipleCores, DualSocketHT, ExpectAllocOneSocket",
topo: topoDualSocketHT,

View File

@@ -23,6 +23,7 @@ import (
cadvisorapi "github.com/google/cadvisor/info/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
@@ -72,6 +73,7 @@ func TestGetTopologyHints(t *testing.T) {
name string
pod v1.Pod
container v1.Container
assignments state.ContainerCPUAssignments
defaultCPUSet cpuset.CPUSet
expectedHints []topologymanager.TopologyHint
}{
@@ -142,6 +144,86 @@ func TestGetTopologyHints(t *testing.T) {
},
},
},
{
name: "Request more CPUs than available",
pod: *testPod2,
container: *testContainer2,
defaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3),
expectedHints: nil,
},
{
name: "Regenerate Single-Node NUMA Hints if already allocated 1/2",
pod: *testPod1,
container: *testContainer1,
assignments: state.ContainerCPUAssignments{
"": cpuset.NewCPUSet(0, 6),
},
defaultCPUSet: cpuset.NewCPUSet(),
expectedHints: []topologymanager.TopologyHint{
{
NUMANodeAffinity: firstSocketMask,
Preferred: true,
},
{
NUMANodeAffinity: crossSocketMask,
Preferred: false,
},
},
},
{
name: "Regenerate Single-Node NUMA Hints if already allocated 2/2",
pod: *testPod1,
container: *testContainer1,
assignments: state.ContainerCPUAssignments{
"": cpuset.NewCPUSet(3, 9),
},
defaultCPUSet: cpuset.NewCPUSet(),
expectedHints: []topologymanager.TopologyHint{
{
NUMANodeAffinity: secondSocketMask,
Preferred: true,
},
{
NUMANodeAffinity: crossSocketMask,
Preferred: false,
},
},
},
{
name: "Regenerate Cross-NUMA Hints if already allocated",
pod: *testPod4,
container: *testContainer4,
assignments: state.ContainerCPUAssignments{
"": cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
},
defaultCPUSet: cpuset.NewCPUSet(),
expectedHints: []topologymanager.TopologyHint{
{
NUMANodeAffinity: crossSocketMask,
Preferred: true,
},
},
},
{
name: "Requested less than already allocated",
pod: *testPod1,
container: *testContainer1,
assignments: state.ContainerCPUAssignments{
"": cpuset.NewCPUSet(0, 6, 3, 9),
},
defaultCPUSet: cpuset.NewCPUSet(),
expectedHints: []topologymanager.TopologyHint{},
},
{
name: "Requested more than already allocated",
pod: *testPod4,
container: *testContainer4,
assignments: state.ContainerCPUAssignments{
"": cpuset.NewCPUSet(0, 6, 3, 9),
},
defaultCPUSet: cpuset.NewCPUSet(),
expectedHints: []topologymanager.TopologyHint{},
},
}
for _, tc := range tcases {
topology, _ := topology.Discover(&machineInfo, numaNodeInfo)
@@ -151,9 +233,13 @@ func TestGetTopologyHints(t *testing.T) {
topology: topology,
},
state: &mockState{
assignments: tc.assignments,
defaultCPUSet: tc.defaultCPUSet,
},
topology: topology,
topology: topology,
activePods: func() []*v1.Pod { return nil },
podStatusProvider: mockPodStatusProvider{},
sourcesReady: &sourcesReadyStub{},
}
hints := m.GetTopologyHints(tc.pod, tc.container)[string(v1.ResourceCPU)]

View File

@@ -60,6 +60,7 @@ go_test(
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",

View File

@@ -28,19 +28,40 @@ import (
// ensures the Device Manager is consulted when Topology Aware Hints for each
// container are created.
func (m *ManagerImpl) GetTopologyHints(pod v1.Pod, container v1.Container) map[string][]topologymanager.TopologyHint {
deviceHints := make(map[string][]topologymanager.TopologyHint)
// Garbage collect any stranded device resources before providing TopologyHints
m.updateAllocatedDevices(m.activePods())
// Loop through all device resources and generate TopologyHints for them.
deviceHints := make(map[string][]topologymanager.TopologyHint)
for resourceObj, requestedObj := range container.Resources.Limits {
resource := string(resourceObj)
requested := int(requestedObj.Value())
// Only consider resources associated with a device plugin.
if m.isDevicePluginResource(resource) {
// Only consider devices that actually contain topology information.
if aligned := m.deviceHasTopologyAlignment(resource); !aligned {
klog.Infof("[devicemanager] Resource '%v' does not have a topology preference", resource)
deviceHints[resource] = nil
continue
}
// Short circuit to regenerate the same hints if there are already
// devices allocated to the Container. This might happen after a
// kubelet restart, for example.
allocated := m.podDevices.containerDevices(string(pod.UID), container.Name, resource)
if allocated.Len() > 0 {
if allocated.Len() != requested {
klog.Errorf("[devicemanager] Resource '%v' already allocated to (pod %v, container %v) with different number than request: requested: %d, allocated: %d", resource, string(pod.UID), container.Name, requested, allocated.Len())
deviceHints[resource] = []topologymanager.TopologyHint{}
continue
}
klog.Infof("[devicemanager] Regenerating TopologyHints for resource '%v' already allocated to (pod %v, container %v)", resource, string(pod.UID), container.Name)
deviceHints[resource] = m.generateDeviceTopologyHints(resource, allocated, requested)
continue
}
// Get the list of available devices, for which TopologyHints should be generated.
available := m.getAvailableDevices(resource)
if available.Len() < requested {
klog.Errorf("[devicemanager] Unable to generate topology hints: requested number of devices unavailable for '%s': requested: %d, available: %d", resource, requested, available.Len())
@@ -48,6 +69,8 @@ func (m *ManagerImpl) GetTopologyHints(pod v1.Pod, container v1.Container) map[s
continue
}
// Generate TopologyHints for this resource given the current
// request size and the list of available devices.
deviceHints[resource] = m.generateDeviceTopologyHints(resource, available, requested)
}
}
@@ -66,8 +89,6 @@ func (m *ManagerImpl) deviceHasTopologyAlignment(resource string) bool {
}
func (m *ManagerImpl) getAvailableDevices(resource string) sets.String {
// Gets Devices in use.
m.updateAllocatedDevices(m.activePods())
// Strip all devices in use from the list of healthy ones.
return m.healthyDevices[resource].Difference(m.allocatedDevices[resource])
}
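
Note the two different sentinel values used in GetTopologyHints above: a nil slice for a resource signals "no topology preference", while an empty, non-nil slice signals that the request cannot be satisfied (not enough devices, or a mismatched prior allocation). A tiny sketch of how a consumer can tell the two apart in Go:

package main

import "fmt"

// hint stands in for topologymanager.TopologyHint (simplified).
type hint struct{ preferred bool }

// describe distinguishes the two sentinel values used in the diff above:
// a nil slice ("no topology preference for this resource") versus an empty,
// non-nil slice ("request cannot be satisfied or mismatched allocation").
func describe(hints []hint) string {
	if hints == nil {
		return "no preference"
	}
	if len(hints) == 0 {
		return "unsatisfiable"
	}
	return fmt.Sprintf("%d candidate placement(s)", len(hints))
}

func main() {
	fmt.Println(describe(nil))        // no preference
	fmt.Println(describe([]hint{}))   // unsatisfiable
	fmt.Println(describe([]hint{{}})) // 1 candidate placement(s)
}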

View File

@@ -23,6 +23,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
@@ -52,13 +53,17 @@ func makeSocketMask(sockets ...int) bitmask.BitMask {
func TestGetTopologyHints(t *testing.T) {
tcases := []struct {
description string
podUID string
containerName string
request map[string]string
devices map[string][]pluginapi.Device
allocatedDevices map[string][]string
allocatedDevices map[string]map[string]map[string][]string
expectedHints map[string][]topologymanager.TopologyHint
}{
{
description: "Single Request, no alignment",
description: "Single Request, no alignment",
podUID: "fakePod",
containerName: "fakeContainer",
request: map[string]string{
"testdevice": "1",
},
@@ -73,7 +78,9 @@ func TestGetTopologyHints(t *testing.T) {
},
},
{
description: "Single Request, only one with alignment",
description: "Single Request, only one with alignment",
podUID: "fakePod",
containerName: "fakeContainer",
request: map[string]string{
"testdevice": "1",
},
@@ -97,7 +104,9 @@ func TestGetTopologyHints(t *testing.T) {
},
},
{
description: "Single Request, one device per socket",
description: "Single Request, one device per socket",
podUID: "fakePod",
containerName: "fakeContainer",
request: map[string]string{
"testdevice": "1",
},
@@ -125,7 +134,9 @@ func TestGetTopologyHints(t *testing.T) {
},
},
{
description: "Request for 2, one device per socket",
description: "Request for 2, one device per socket",
podUID: "fakePod",
containerName: "fakeContainer",
request: map[string]string{
"testdevice": "2",
},
@@ -145,7 +156,9 @@ func TestGetTopologyHints(t *testing.T) {
},
},
{
description: "Request for 2, 2 devices per socket",
description: "Request for 2, 2 devices per socket",
podUID: "fakePod",
containerName: "fakeContainer",
request: map[string]string{
"testdevice": "2",
},
@@ -175,7 +188,9 @@ func TestGetTopologyHints(t *testing.T) {
},
},
{
description: "Request for 2, optimal on 1 NUMA node, forced cross-NUMA",
description: "Request for 2, optimal on 1 NUMA node, forced cross-NUMA",
podUID: "fakePod",
containerName: "fakeContainer",
request: map[string]string{
"testdevice": "2",
},
@@ -187,8 +202,12 @@ func TestGetTopologyHints(t *testing.T) {
makeNUMADevice("Dev4", 1),
},
},
allocatedDevices: map[string][]string{
"testdevice": {"Dev1", "Dev2"},
allocatedDevices: map[string]map[string]map[string][]string{
"fakePod": {
"fakeOtherContainer": {
"testdevice": {"Dev1", "Dev2"},
},
},
},
expectedHints: map[string][]topologymanager.TopologyHint{
"testdevice": {
@@ -200,7 +219,9 @@ func TestGetTopologyHints(t *testing.T) {
},
},
{
description: "2 device types, mixed configuration",
description: "2 device types, mixed configuration",
podUID: "fakePod",
containerName: "fakeContainer",
request: map[string]string{
"testdevice1": "2",
"testdevice2": "1",
@@ -243,6 +264,110 @@ func TestGetTopologyHints(t *testing.T) {
},
},
},
{
description: "Single device type, more requested than available",
podUID: "fakePod",
containerName: "fakeContainer",
request: map[string]string{
"testdevice": "6",
},
devices: map[string][]pluginapi.Device{
"testdevice": {
makeNUMADevice("Dev1", 0),
makeNUMADevice("Dev2", 0),
makeNUMADevice("Dev3", 1),
makeNUMADevice("Dev4", 1),
},
},
expectedHints: map[string][]topologymanager.TopologyHint{
"testdevice": {},
},
},
{
description: "Single device type, all already allocated to container",
podUID: "fakePod",
containerName: "fakeContainer",
request: map[string]string{
"testdevice": "2",
},
devices: map[string][]pluginapi.Device{
"testdevice": {
makeNUMADevice("Dev1", 0),
makeNUMADevice("Dev2", 0),
},
},
allocatedDevices: map[string]map[string]map[string][]string{
"fakePod": {
"fakeContainer": {
"testdevice": {"Dev1", "Dev2"},
},
},
},
expectedHints: map[string][]topologymanager.TopologyHint{
"testdevice": {
{
NUMANodeAffinity: makeSocketMask(0),
Preferred: true,
},
{
NUMANodeAffinity: makeSocketMask(0, 1),
Preferred: false,
},
},
},
},
{
description: "Single device type, less already allocated to container than requested",
podUID: "fakePod",
containerName: "fakeContainer",
request: map[string]string{
"testdevice": "4",
},
devices: map[string][]pluginapi.Device{
"testdevice": {
makeNUMADevice("Dev1", 0),
makeNUMADevice("Dev2", 0),
makeNUMADevice("Dev3", 1),
makeNUMADevice("Dev4", 1),
},
},
allocatedDevices: map[string]map[string]map[string][]string{
"fakePod": {
"fakeContainer": {
"testdevice": {"Dev1", "Dev2"},
},
},
},
expectedHints: map[string][]topologymanager.TopologyHint{
"testdevice": {},
},
},
{
description: "Single device type, more already allocated to container than requested",
podUID: "fakePod",
containerName: "fakeContainer",
request: map[string]string{
"testdevice": "2",
},
devices: map[string][]pluginapi.Device{
"testdevice": {
makeNUMADevice("Dev1", 0),
makeNUMADevice("Dev2", 0),
makeNUMADevice("Dev3", 1),
makeNUMADevice("Dev4", 1),
},
},
allocatedDevices: map[string]map[string]map[string][]string{
"fakePod": {
"fakeContainer": {
"testdevice": {"Dev1", "Dev2", "Dev3", "Dev4"},
},
},
},
expectedHints: map[string][]topologymanager.TopologyHint{
"testdevice": {},
},
},
}
for _, tc := range tcases {
@@ -252,6 +377,8 @@ func TestGetTopologyHints(t *testing.T) {
}
pod := makePod(resourceList)
pod.UID = types.UID(tc.podUID)
pod.Spec.Containers[0].Name = tc.containerName
m := ManagerImpl{
allDevices: make(map[string]map[string]pluginapi.Device),
@@ -259,7 +386,7 @@ func TestGetTopologyHints(t *testing.T) {
allocatedDevices: make(map[string]sets.String),
podDevices: make(podDevices),
sourcesReady: &sourcesReadyStub{},
activePods: func() []*v1.Pod { return []*v1.Pod{} },
activePods: func() []*v1.Pod { return []*v1.Pod{pod} },
numaNodes: []int{0, 1},
}
@@ -273,11 +400,16 @@ func TestGetTopologyHints(t *testing.T) {
}
}
for r := range tc.allocatedDevices {
	m.allocatedDevices[r] = sets.NewString()
	for _, d := range tc.allocatedDevices[r] {
		m.allocatedDevices[r].Insert(d)
	}
}
for p := range tc.allocatedDevices {
	for c := range tc.allocatedDevices[p] {
		for r, devices := range tc.allocatedDevices[p][c] {
			m.podDevices.insert(p, c, r, sets.NewString(devices...), nil)
			m.allocatedDevices[r] = sets.NewString()
			for _, d := range devices {
				m.allocatedDevices[r].Insert(d)
			}
		}
	}
}