diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go
index 2ab8b2b37ad..69d3a0a0519 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go
@@ -229,6 +229,8 @@ func (m *manager) State() state.Reader {
 }

 func (m *manager) GetTopologyHints(pod v1.Pod, container v1.Container) map[string][]topologymanager.TopologyHint {
+	// Garbage collect any stranded resources before providing TopologyHints
+	m.removeStaleState()
 	// Delegate to active policy
 	return m.policy.GetTopologyHints(m.state, pod, container)
 }
@@ -239,15 +241,68 @@ type reconciledContainer struct {
 	containerID   string
 }

-func (m *manager) reconcileState() (success []reconciledContainer, failure []reconciledContainer) {
+func (m *manager) removeStaleState() {
+	// Only once all sources are ready do we attempt to remove any stale state.
+	// This ensures that the call to `m.activePods()` below will succeed with
+	// the actual active pods list.
 	if !m.sourcesReady.AllReady() {
 		return
 	}
+
+	// We grab the lock to ensure that no new containers will grab CPUs while
+	// executing the code below. Without this lock, it's possible that we end up
+	// removing state that is newly added by an asynchronous call to
+	// AddContainer() during the execution of this code.
+	m.Lock()
+	defer m.Unlock()
+
+	// We remove stale state very conservatively, only removing *any* state
+	// once we know for sure that we won't be accidentally removing state that
+	// is still valid. Since this function is called periodically, we will just
+	// try again next time this function is called.
+	activePods := m.activePods()
+	if len(activePods) == 0 {
+		// If there are no active pods, skip the removal of stale state.
+		return
+	}
+
+	// Build a list of containerIDs for all containers in all active Pods.
+	activeContainers := make(map[string]struct{})
+	for _, pod := range activePods {
+		pstatus, ok := m.podStatusProvider.GetPodStatus(pod.UID)
+		if !ok {
+			// If even one pod does not have its status set, skip state removal.
+			return
+		}
+		for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
+			containerID, err := findContainerIDByName(&pstatus, container.Name)
+			if err != nil {
+				// If even one container does not have its containerID set, skip state removal.
+				return
+			}
+			activeContainers[containerID] = struct{}{}
+		}
+	}
+
+	// Loop through the CPUManager state. Remove any state for containers not
+	// in the `activeContainers` list built above. The short-circuits in place
+	// above ensure that no erroneous state will ever be removed.
+	for containerID := range m.state.GetCPUAssignments() {
+		if _, ok := activeContainers[containerID]; !ok {
+			klog.Errorf("[cpumanager] removeStaleState: removing container: %s", containerID)
+			err := m.policy.RemoveContainer(m.state, containerID)
+			if err != nil {
+				klog.Errorf("[cpumanager] removeStaleState: failed to remove container %s, error: %v", containerID, err)
+			}
+		}
+	}
+}
+
+func (m *manager) reconcileState() (success []reconciledContainer, failure []reconciledContainer) {
 	success = []reconciledContainer{}
 	failure = []reconciledContainer{}

-	activeContainers := make(map[string]*v1.Pod)
-
+	m.removeStaleState()
 	for _, pod := range m.activePods() {
 		allContainers := pod.Spec.InitContainers
 		allContainers = append(allContainers, pod.Spec.Containers...)
@@ -286,8 +341,6 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
 				}
 			}

-			activeContainers[containerID] = pod
-
 			cset := m.state.GetCPUSetOrDefault(containerID)
 			if cset.IsEmpty() {
 				// NOTE: This should not happen outside of tests.
@@ -306,16 +359,6 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
 			}
 			success = append(success, reconciledContainer{pod.Name, container.Name, containerID})
 		}
 	}
-
-	for containerID := range m.state.GetCPUAssignments() {
-		if pod, ok := activeContainers[containerID]; !ok {
-			err := m.RemoveContainer(containerID)
-			if err != nil {
-				klog.Errorf("[cpumanager] reconcileState: failed to remove container (pod: %s, container id: %s, error: %v)", pod.Name, containerID, err)
-				failure = append(failure, reconciledContainer{pod.Name, "", containerID})
-			}
-		}
-	}
 	return success, failure
 }
diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go
index e37c6114022..82c81d77a90 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static.go
@@ -320,6 +320,23 @@ func (p *staticPolicy) GetTopologyHints(s state.State, pod v1.Pod, container v1.
 		return nil
 	}

+	// Short circuit to regenerate the same hints if there are already
+	// guaranteed CPUs allocated to the Container. This might happen after a
+	// kubelet restart, for example.
+	containerID, _ := findContainerIDByName(&pod.Status, container.Name)
+	if allocated, exists := s.GetCPUSet(containerID); exists {
+		if allocated.Size() != requested {
+			klog.Errorf("[cpumanager] CPUs already allocated to (pod %v, container %v) with different number than request: requested: %d, allocated: %d", string(pod.UID), container.Name, requested, allocated.Size())
+			return map[string][]topologymanager.TopologyHint{
+				string(v1.ResourceCPU): {},
+			}
+		}
+		klog.Infof("[cpumanager] Regenerating TopologyHints for CPUs already allocated to (pod %v, container %v)", string(pod.UID), container.Name)
+		return map[string][]topologymanager.TopologyHint{
+			string(v1.ResourceCPU): p.generateCPUTopologyHints(allocated, requested),
+		}
+	}
+
 	// Get a list of available CPUs.
 	available := p.assignableCPUs(s)
diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go
index 3d30009917a..acba7c811aa 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static_test.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go
@@ -213,6 +213,20 @@ func TestStaticPolicyAdd(t *testing.T) {
 			expCPUAlloc: true,
 			expCSet:     cpuset.NewCPUSet(1, 5),
 		},
+		{
+			description:     "GuPodMultipleCores, SingleSocketHT, ExpectSameAllocation",
+			topo:            topoSingleSocketHT,
+			numReservedCPUs: 1,
+			containerID:     "fakeID3",
+			stAssignments: state.ContainerCPUAssignments{
+				"fakeID3": cpuset.NewCPUSet(2, 3, 6, 7),
+			},
+			stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 4, 5),
+			pod:             makePod("4000m", "4000m"),
+			expErr:          nil,
+			expCPUAlloc:     true,
+			expCSet:         cpuset.NewCPUSet(2, 3, 6, 7),
+		},
 		{
 			description: "GuPodMultipleCores, DualSocketHT, ExpectAllocOneSocket",
 			topo:        topoDualSocketHT,
diff --git a/pkg/kubelet/cm/cpumanager/topology_hints_test.go b/pkg/kubelet/cm/cpumanager/topology_hints_test.go
index f998e734159..0b70150da06 100644
--- a/pkg/kubelet/cm/cpumanager/topology_hints_test.go
+++ b/pkg/kubelet/cm/cpumanager/topology_hints_test.go
@@ -23,6 +23,7 @@ import (
 	cadvisorapi "github.com/google/cadvisor/info/v1"
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
@@ -72,6 +73,7 @@ func TestGetTopologyHints(t *testing.T) {
 		name          string
 		pod           v1.Pod
 		container     v1.Container
+		assignments   state.ContainerCPUAssignments
 		defaultCPUSet cpuset.CPUSet
 		expectedHints []topologymanager.TopologyHint
 	}{
@@ -142,6 +144,86 @@ func TestGetTopologyHints(t *testing.T) {
 			},
 		},
+		{
+			name:          "Request more CPUs than available",
+			pod:           *testPod2,
+			container:     *testContainer2,
+			defaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3),
+			expectedHints: nil,
+		},
+		{
+			name:      "Regenerate Single-Node NUMA Hints if already allocated 1/2",
+			pod:       *testPod1,
+			container: *testContainer1,
+			assignments: state.ContainerCPUAssignments{
+				"": cpuset.NewCPUSet(0, 6),
+			},
+			defaultCPUSet: cpuset.NewCPUSet(),
+			expectedHints: []topologymanager.TopologyHint{
+				{
+					NUMANodeAffinity: firstSocketMask,
+					Preferred:        true,
+				},
+				{
+					NUMANodeAffinity: crossSocketMask,
+					Preferred:        false,
+				},
+			},
+		},
+		{
+			name:      "Regenerate Single-Node NUMA Hints if already allocated 2/2",
+			pod:       *testPod1,
+			container: *testContainer1,
+			assignments: state.ContainerCPUAssignments{
+				"": cpuset.NewCPUSet(3, 9),
+			},
+			defaultCPUSet: cpuset.NewCPUSet(),
+			expectedHints: []topologymanager.TopologyHint{
+				{
+					NUMANodeAffinity: secondSocketMask,
+					Preferred:        true,
+				},
+				{
+					NUMANodeAffinity: crossSocketMask,
+					Preferred:        false,
+				},
+			},
+		},
+		{
+			name:      "Regenerate Cross-NUMA Hints if already allocated",
+			pod:       *testPod4,
+			container: *testContainer4,
+			assignments: state.ContainerCPUAssignments{
+				"": cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
+			},
+			defaultCPUSet: cpuset.NewCPUSet(),
+			expectedHints: []topologymanager.TopologyHint{
+				{
+					NUMANodeAffinity: crossSocketMask,
+					Preferred:        true,
+				},
+			},
+		},
+		{
+			name:      "Requested less than already allocated",
+			pod:       *testPod1,
+			container: *testContainer1,
+			assignments: state.ContainerCPUAssignments{
+				"": cpuset.NewCPUSet(0, 6, 3, 9),
+			},
+			defaultCPUSet: cpuset.NewCPUSet(),
+			expectedHints: []topologymanager.TopologyHint{},
+		},
+		{
+			name:      "Requested more than already allocated",
+			pod:       *testPod4,
+			container: *testContainer4,
+			assignments: state.ContainerCPUAssignments{
+				"": cpuset.NewCPUSet(0, 6, 3, 9),
+			},
+			defaultCPUSet: cpuset.NewCPUSet(),
+			expectedHints: []topologymanager.TopologyHint{},
+		},
 	}
 	for _, tc := range tcases {
 		topology, _ := topology.Discover(&machineInfo, numaNodeInfo)
@@ -151,9 +233,13 @@ func TestGetTopologyHints(t *testing.T) {
 				topology: topology,
 			},
 			state: &mockState{
+				assignments:   tc.assignments,
 				defaultCPUSet: tc.defaultCPUSet,
 			},
-			topology: topology,
+			topology:          topology,
+			activePods:        func() []*v1.Pod { return nil },
+			podStatusProvider: mockPodStatusProvider{},
+			sourcesReady:      &sourcesReadyStub{},
 		}

 		hints := m.GetTopologyHints(tc.pod, tc.container)[string(v1.ResourceCPU)]
diff --git a/pkg/kubelet/cm/devicemanager/BUILD b/pkg/kubelet/cm/devicemanager/BUILD
index 909309cd520..4c0ff545194 100644
--- a/pkg/kubelet/cm/devicemanager/BUILD
+++ b/pkg/kubelet/cm/devicemanager/BUILD
@@ -60,6 +60,7 @@ go_test(
        "//staging/src/k8s.io/api/core/v1:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+       "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
diff --git a/pkg/kubelet/cm/devicemanager/topology_hints.go b/pkg/kubelet/cm/devicemanager/topology_hints.go
index 8fbeff4858a..7a8d1925896 100644
--- a/pkg/kubelet/cm/devicemanager/topology_hints.go
+++ b/pkg/kubelet/cm/devicemanager/topology_hints.go
@@ -28,19 +28,40 @@ import (
 // ensures the Device Manager is consulted when Topology Aware Hints for each
 // container are created.
 func (m *ManagerImpl) GetTopologyHints(pod v1.Pod, container v1.Container) map[string][]topologymanager.TopologyHint {
-	deviceHints := make(map[string][]topologymanager.TopologyHint)
+	// Garbage collect any stranded device resources before providing TopologyHints
+	m.updateAllocatedDevices(m.activePods())

+	// Loop through all device resources and generate TopologyHints for them.
+	deviceHints := make(map[string][]topologymanager.TopologyHint)
 	for resourceObj, requestedObj := range container.Resources.Limits {
 		resource := string(resourceObj)
 		requested := int(requestedObj.Value())

+		// Only consider resources associated with a device plugin.
 		if m.isDevicePluginResource(resource) {
+			// Only consider devices that actually contain topology information.
 			if aligned := m.deviceHasTopologyAlignment(resource); !aligned {
 				klog.Infof("[devicemanager] Resource '%v' does not have a topology preference", resource)
 				deviceHints[resource] = nil
 				continue
 			}

+			// Short circuit to regenerate the same hints if there are already
+			// devices allocated to the Container. This might happen after a
+			// kubelet restart, for example.
+			allocated := m.podDevices.containerDevices(string(pod.UID), container.Name, resource)
+			if allocated.Len() > 0 {
+				if allocated.Len() != requested {
+					klog.Errorf("[devicemanager] Resource '%v' already allocated to (pod %v, container %v) with different number than request: requested: %d, allocated: %d", resource, string(pod.UID), container.Name, requested, allocated.Len())
+					deviceHints[resource] = []topologymanager.TopologyHint{}
+					continue
+				}
+				klog.Infof("[devicemanager] Regenerating TopologyHints for resource '%v' already allocated to (pod %v, container %v)", resource, string(pod.UID), container.Name)
+				deviceHints[resource] = m.generateDeviceTopologyHints(resource, allocated, requested)
+				continue
+			}
+
+			// Get the list of available devices, for which TopologyHints should be generated.
 			available := m.getAvailableDevices(resource)
 			if available.Len() < requested {
 				klog.Errorf("[devicemanager] Unable to generate topology hints: requested number of devices unavailable for '%s': requested: %d, available: %d", resource, requested, available.Len())
@@ -48,6 +69,8 @@ func (m *ManagerImpl) GetTopologyHints(pod v1.Pod, container v1.Container) map[s
 				continue
 			}

+			// Generate TopologyHints for this resource given the current
+			// request size and the list of available devices.
 			deviceHints[resource] = m.generateDeviceTopologyHints(resource, available, requested)
 		}
 	}
@@ -66,8 +89,6 @@ func (m *ManagerImpl) deviceHasTopologyAlignment(resource string) bool {
 }

 func (m *ManagerImpl) getAvailableDevices(resource string) sets.String {
-	// Gets Devices in use.
-	m.updateAllocatedDevices(m.activePods())
 	// Strip all devices in use from the list of healthy ones.
 	return m.healthyDevices[resource].Difference(m.allocatedDevices[resource])
 }
diff --git a/pkg/kubelet/cm/devicemanager/topology_hints_test.go b/pkg/kubelet/cm/devicemanager/topology_hints_test.go
index e77cf3b4957..b69a7e3c62d 100644
--- a/pkg/kubelet/cm/devicemanager/topology_hints_test.go
+++ b/pkg/kubelet/cm/devicemanager/topology_hints_test.go
@@ -23,6 +23,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
 	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
@@ -52,13 +53,17 @@ func makeSocketMask(sockets ...int) bitmask.BitMask {

 func TestGetTopologyHints(t *testing.T) {
 	tcases := []struct {
 		description      string
+		podUID           string
+		containerName    string
 		request          map[string]string
 		devices          map[string][]pluginapi.Device
-		allocatedDevices map[string][]string
+		allocatedDevices map[string]map[string]map[string][]string
 		expectedHints    map[string][]topologymanager.TopologyHint
 	}{
 		{
-			description: "Single Request, no alignment",
+			description:   "Single Request, no alignment",
+			podUID:        "fakePod",
+			containerName: "fakeContainer",
 			request: map[string]string{
 				"testdevice": "1",
 			},
@@ -73,7 +78,9 @@ func TestGetTopologyHints(t *testing.T) {
 			},
 		},
 		{
-			description: "Single Request, only one with alignment",
+			description:   "Single Request, only one with alignment",
+			podUID:        "fakePod",
+			containerName: "fakeContainer",
 			request: map[string]string{
 				"testdevice": "1",
 			},
@@ -97,7 +104,9 @@ func TestGetTopologyHints(t *testing.T) {
 			},
 		},
 		{
-			description: "Single Request, one device per socket",
+			description:   "Single Request, one device per socket",
+			podUID:        "fakePod",
+			containerName: "fakeContainer",
 			request: map[string]string{
 				"testdevice": "1",
 			},
@@ -125,7 +134,9 @@ func TestGetTopologyHints(t *testing.T) {
 			},
 		},
 		{
-			description: "Request for 2, one device per socket",
+			description:   "Request for 2, one device per socket",
+			podUID:        "fakePod",
+			containerName: "fakeContainer",
 			request: map[string]string{
 				"testdevice": "2",
 			},
@@ -145,7 +156,9 @@ func TestGetTopologyHints(t *testing.T) {
 			},
 		},
 		{
-			description: "Request for 2, 2 devices per socket",
+			description:   "Request for 2, 2 devices per socket",
+			podUID:        "fakePod",
+			containerName: "fakeContainer",
 			request: map[string]string{
 				"testdevice": "2",
 			},
@@ -175,7 +188,9 @@ func TestGetTopologyHints(t *testing.T) {
 			},
 		},
 		{
-			description: "Request for 2, optimal on 1 NUMA node, forced cross-NUMA",
+			description:   "Request for 2, optimal on 1 NUMA node, forced cross-NUMA",
+			podUID:        "fakePod",
+			containerName: "fakeContainer",
 			request: map[string]string{
 				"testdevice": "2",
 			},
@@ -187,8 +202,12 @@ func TestGetTopologyHints(t *testing.T) {
 					makeNUMADevice("Dev4", 1),
 				},
 			},
-			allocatedDevices: map[string][]string{
-				"testdevice": {"Dev1", "Dev2"},
+			allocatedDevices: map[string]map[string]map[string][]string{
+				"fakePod": {
+					"fakeOtherContainer": {
+						"testdevice": {"Dev1", "Dev2"},
+					},
+				},
 			},
 			expectedHints: map[string][]topologymanager.TopologyHint{
 				"testdevice": {
@@ -200,7 +219,9 @@ func TestGetTopologyHints(t *testing.T) {
 			},
 		},
 		{
-			description: "2 device types, mixed configuration",
+			description:   "2 device types, mixed configuration",
+			podUID:        "fakePod",
+			containerName: "fakeContainer",
 			request: map[string]string{
 				"testdevice1": "2",
 				"testdevice2": "1",
@@ -243,6 +264,110 @@ func TestGetTopologyHints(t *testing.T) {
 			},
 		},
+		{
+			description:   "Single device type, more requested than available",
+			podUID:        "fakePod",
+			containerName: "fakeContainer",
+			request: map[string]string{
+				"testdevice": "6",
+			},
+			devices: map[string][]pluginapi.Device{
+				"testdevice": {
+					makeNUMADevice("Dev1", 0),
+					makeNUMADevice("Dev2", 0),
+					makeNUMADevice("Dev3", 1),
+					makeNUMADevice("Dev4", 1),
+				},
+			},
+			expectedHints: map[string][]topologymanager.TopologyHint{
+				"testdevice": {},
+			},
+		},
+		{
+			description:   "Single device type, all already allocated to container",
+			podUID:        "fakePod",
+			containerName: "fakeContainer",
+			request: map[string]string{
+				"testdevice": "2",
+			},
+			devices: map[string][]pluginapi.Device{
+				"testdevice": {
+					makeNUMADevice("Dev1", 0),
+					makeNUMADevice("Dev2", 0),
+				},
+			},
+			allocatedDevices: map[string]map[string]map[string][]string{
+				"fakePod": {
+					"fakeContainer": {
+						"testdevice": {"Dev1", "Dev2"},
+					},
+				},
+			},
+			expectedHints: map[string][]topologymanager.TopologyHint{
+				"testdevice": {
+					{
+						NUMANodeAffinity: makeSocketMask(0),
+						Preferred:        true,
+					},
+					{
+						NUMANodeAffinity: makeSocketMask(0, 1),
+						Preferred:        false,
+					},
+				},
+			},
+		},
+		{
+			description:   "Single device type, less already allocated to container than requested",
+			podUID:        "fakePod",
+			containerName: "fakeContainer",
+			request: map[string]string{
+				"testdevice": "4",
+			},
+			devices: map[string][]pluginapi.Device{
+				"testdevice": {
+					makeNUMADevice("Dev1", 0),
+					makeNUMADevice("Dev2", 0),
+					makeNUMADevice("Dev3", 1),
+					makeNUMADevice("Dev4", 1),
+				},
+			},
+			allocatedDevices: map[string]map[string]map[string][]string{
+				"fakePod": {
+					"fakeContainer": {
+						"testdevice": {"Dev1", "Dev2"},
+					},
+				},
+			},
+			expectedHints: map[string][]topologymanager.TopologyHint{
+				"testdevice": {},
+			},
+		},
+		{
+			description:   "Single device type, more already allocated to container than requested",
+			podUID:        "fakePod",
+			containerName: "fakeContainer",
+			request: map[string]string{
+				"testdevice": "2",
+			},
+			devices: map[string][]pluginapi.Device{
+				"testdevice": {
+					makeNUMADevice("Dev1", 0),
+					makeNUMADevice("Dev2", 0),
+					makeNUMADevice("Dev3", 1),
+					makeNUMADevice("Dev4", 1),
+				},
+			},
+			allocatedDevices: map[string]map[string]map[string][]string{
+				"fakePod": {
+					"fakeContainer": {
+						"testdevice": {"Dev1", "Dev2", "Dev3", "Dev4"},
+					},
+				},
+			},
+			expectedHints: map[string][]topologymanager.TopologyHint{
+				"testdevice": {},
+			},
+		},
 	}

 	for _, tc := range tcases {
@@ -252,6 +377,8 @@ func TestGetTopologyHints(t *testing.T) {
 		}
 		pod := makePod(resourceList)
+		pod.UID = types.UID(tc.podUID)
+		pod.Spec.Containers[0].Name = tc.containerName

 		m := ManagerImpl{
 			allDevices:       make(map[string]map[string]pluginapi.Device),
@@ -259,7 +386,7 @@ func TestGetTopologyHints(t *testing.T) {
 			allocatedDevices: make(map[string]sets.String),
 			podDevices:       make(podDevices),
 			sourcesReady:     &sourcesReadyStub{},
-			activePods:       func() []*v1.Pod { return []*v1.Pod{} },
+			activePods:       func() []*v1.Pod { return []*v1.Pod{pod} },
 			numaNodes:        []int{0, 1},
 		}
@@ -273,11 +400,16 @@ func TestGetTopologyHints(t *testing.T) {
 			}
 		}

-		for r := range tc.allocatedDevices {
-			m.allocatedDevices[r] = sets.NewString()
+		for p := range tc.allocatedDevices {
+			for c := range tc.allocatedDevices[p] {
+				for r, devices := range tc.allocatedDevices[p][c] {
+					m.podDevices.insert(p, c, r, sets.NewString(devices...), nil)

-			for _, d := range tc.allocatedDevices[r] {
-				m.allocatedDevices[r].Insert(d)
+					m.allocatedDevices[r] = sets.NewString()
+					for _, d := range devices {
+						m.allocatedDevices[r].Insert(d)
+					}
+				}
 			}
 		}