Merge pull request #84462 from klueska/upstream-cpu-manager-update-state-semantics

Update CPUManager stored state semantics
Kubernetes Prow Robot 2019-12-17 12:00:12 -08:00 committed by GitHub
commit a1fc96f41e
26 changed files with 1220 additions and 770 deletions
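
For orientation (this note is not part of the commit): the diff changes the CPUManager's checkpointed CPU assignments from being keyed by containerID to being keyed by (podUID, containerName), and introduces a containermap package that translates between containerIDs and those pairs. A minimal, standalone sketch of the two state shapes, using plain strings in place of cpuset.CPUSet purely for illustration:

// Standalone sketch (not kubelet code): old vs. new CPUManager state keying.
package main

import "fmt"

// Old semantics: containerID -> assigned CPUs.
type oldAssignments map[string]string

// New semantics: podUID -> containerName -> assigned CPUs, mirroring
// state.ContainerCPUAssignments (map[string]map[string]cpuset.CPUSet) in the diff below.
type newAssignments map[string]map[string]string

func (a newAssignments) set(podUID, containerName, cpus string) {
	if _, ok := a[podUID]; !ok {
		a[podUID] = make(map[string]string)
	}
	a[podUID][containerName] = cpus
}

func main() {
	old := oldAssignments{"docker://fakeContainerID": "1-2"}
	updated := newAssignments{}
	updated.set("fakePodUID", "fakeContainerName", "1-2")
	fmt.Println(old, updated)
}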


@ -50,6 +50,7 @@ go_library(
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/apis/core/v1/helper/qos:go_default_library",
"//pkg/kubelet/cadvisor:go_default_library",
"//pkg/kubelet/cm/cpumanager/containermap:go_default_library",
"//pkg/kubelet/cm/cpumanager/topology:go_default_library",
"//pkg/kubelet/cm/devicemanager:go_default_library",
"//pkg/kubelet/cm/util:go_default_library",
@ -100,6 +101,7 @@ go_library(
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/apis/core/v1/helper/qos:go_default_library",
"//pkg/kubelet/cadvisor:go_default_library",
"//pkg/kubelet/cm/cpumanager/containermap:go_default_library",
"//pkg/kubelet/cm/cpumanager/topology:go_default_library",
"//pkg/kubelet/cm/devicemanager:go_default_library",
"//pkg/kubelet/cm/util:go_default_library",


@ -50,6 +50,7 @@ import (
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap"
cputopology "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
@ -574,7 +575,11 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
// Initialize CPU manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService)
containerMap, err := buildContainerMapFromRuntime(runtimeService)
if err != nil {
return fmt.Errorf("failed to build map of initial containers from runtime: %v", err)
}
cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
}
// cache the node Info including resource capacity and
@ -686,6 +691,25 @@ func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {
}
}
func buildContainerMapFromRuntime(runtimeService internalapi.RuntimeService) (containermap.ContainerMap, error) {
podSandboxMap := make(map[string]string)
podSandboxList, _ := runtimeService.ListPodSandbox(nil)
for _, p := range podSandboxList {
podSandboxMap[p.Id] = p.Metadata.Uid
}
containerMap := containermap.NewContainerMap()
containerList, _ := runtimeService.ListContainers(nil)
for _, c := range containerList {
if _, exists := podSandboxMap[c.PodSandboxId]; !exists {
return nil, fmt.Errorf("no PodsandBox found with Id '%s'", c.PodSandboxId)
}
containerMap.Add(podSandboxMap[c.PodSandboxId], c.Metadata.Name, c.Id)
}
return containerMap, nil
}
func isProcessRunningInHost(pid int) (bool, error) {
// Get init pid namespace.
initPidNs, err := os.Readlink("/proc/1/ns/pid")


@ -3,7 +3,6 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"container_map.go",
"cpu_assignment.go",
"cpu_manager.go",
"fake_cpu_manager.go",
@ -15,6 +14,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/apis/core/v1/helper/qos:go_default_library",
"//pkg/kubelet/cm/cpumanager/containermap:go_default_library",
"//pkg/kubelet/cm/cpumanager/state:go_default_library",
"//pkg/kubelet/cm/cpumanager/topology:go_default_library",
"//pkg/kubelet/cm/cpuset:go_default_library",
@ -34,7 +34,6 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"container_map_test.go",
"cpu_assignment_test.go",
"cpu_manager_test.go",
"policy_none_test.go",
@ -44,6 +43,7 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/cm/cpumanager/containermap:go_default_library",
"//pkg/kubelet/cm/cpumanager/state:go_default_library",
"//pkg/kubelet/cm/cpumanager/topology:go_default_library",
"//pkg/kubelet/cm/cpuset:go_default_library",
@ -69,6 +69,7 @@ filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/kubelet/cm/cpumanager/containermap:all-srcs",
"//pkg/kubelet/cm/cpumanager/state:all-srcs",
"//pkg/kubelet/cm/cpumanager/topology:all-srcs",
],


@ -1,68 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cpumanager
import (
"fmt"
"k8s.io/api/core/v1"
)
// containerMap maps (podUID, containerName) -> containerID
type containerMap map[string]map[string]string
func newContainerMap() containerMap {
return make(containerMap)
}
func (cm containerMap) Add(p *v1.Pod, c *v1.Container, containerID string) {
podUID := string(p.UID)
if _, exists := cm[podUID]; !exists {
cm[podUID] = make(map[string]string)
}
cm[podUID][c.Name] = containerID
}
func (cm containerMap) Remove(containerID string) {
found := false
for podUID := range cm {
for containerName := range cm[podUID] {
if containerID == cm[podUID][containerName] {
delete(cm[podUID], containerName)
found = true
break
}
}
if len(cm[podUID]) == 0 {
delete(cm, podUID)
}
if found {
break
}
}
}
func (cm containerMap) Get(p *v1.Pod, c *v1.Container) (string, error) {
podUID := string(p.UID)
if _, exists := cm[podUID]; !exists {
return "", fmt.Errorf("pod %s not in containerMap", podUID)
}
if _, exists := cm[podUID][c.Name]; !exists {
return "", fmt.Errorf("container %s not in containerMap for pod %s", c.Name, podUID)
}
return cm[podUID][c.Name], nil
}


@ -0,0 +1,28 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["container_map.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap",
visibility = ["//visibility:public"],
)
go_test(
name = "go_default_test",
srcs = ["container_map_test.go"],
embed = [":go_default_library"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)


@ -0,0 +1,71 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package containermap
import (
"fmt"
)
// ContainerMap maps (containerID)->(podUID, containerName)
type ContainerMap map[string]struct {
podUID string
containerName string
}
// NewContainerMap creates a new ContainerMap struct
func NewContainerMap() ContainerMap {
return make(ContainerMap)
}
// Add adds a mapping of (containerID)->(podUID, containerName) to the ContainerMap
func (cm ContainerMap) Add(podUID, containerName, containerID string) {
cm[containerID] = struct {
podUID string
containerName string
}{podUID, containerName}
}
// RemoveByContainerID removes a mapping of (containerID)->(podUID, containerName) from the ContainerMap
func (cm ContainerMap) RemoveByContainerID(containerID string) {
delete(cm, containerID)
}
// RemoveByContainerRef removes a mapping of (containerID)->(podUID, containerName) from the ContainerMap
func (cm ContainerMap) RemoveByContainerRef(podUID, containerName string) {
containerID, err := cm.GetContainerID(podUID, containerName)
if err == nil {
cm.RemoveByContainerID(containerID)
}
}
// GetContainerID retrieves a ContainerID from the ContainerMap
func (cm ContainerMap) GetContainerID(podUID, containerName string) (string, error) {
for key, val := range cm {
if val.podUID == podUID && val.containerName == containerName {
return key, nil
}
}
return "", fmt.Errorf("container %s not in ContainerMap for pod %s", containerName, podUID)
}
// GetContainerRef retrieves a (podUID, containerName) pair from the ContainerMap
func (cm ContainerMap) GetContainerRef(containerID string) (string, string, error) {
if _, exists := cm[containerID]; !exists {
return "", "", fmt.Errorf("containerID %s not in ContainerMap", containerID)
}
return cm[containerID].podUID, cm[containerID].containerName, nil
}
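
For illustration only (the pod UID, container name, and container ID strings here are made up, and this snippet assumes it is built inside the kubernetes repo so the import path resolves), the API added in this file could be exercised like so:

// Hypothetical usage sketch of the containermap package introduced above.
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap"
)

func main() {
	cm := containermap.NewContainerMap()
	cm.Add("podUID-1", "app", "containerID-abc")

	// Forward lookup: (podUID, containerName) -> containerID.
	id, err := cm.GetContainerID("podUID-1", "app")
	fmt.Println(id, err) // containerID-abc <nil>

	// Reverse lookup: containerID -> (podUID, containerName).
	uid, name, err := cm.GetContainerRef("containerID-abc")
	fmt.Println(uid, name, err) // podUID-1 app <nil>

	// Entries can be removed by either key; removing a missing entry is a no-op.
	cm.RemoveByContainerID("containerID-abc")
	cm.RemoveByContainerRef("podUID-1", "app")
}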


@ -14,13 +14,10 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package cpumanager
package containermap
import (
"testing"
"k8s.io/api/core/v1"
apimachinery "k8s.io/apimachinery/pkg/types"
)
func TestContainerMap(t *testing.T) {
@ -37,34 +34,48 @@ func TestContainerMap(t *testing.T) {
}
for _, tc := range testCases {
pod := v1.Pod{}
pod.UID = apimachinery.UID(tc.podUID)
// Build a new containerMap from the testCases, checking proper
// addition, retrieval along the way.
cm := newContainerMap()
cm := NewContainerMap()
for i := range tc.containerNames {
container := v1.Container{Name: tc.containerNames[i]}
cm.Add(tc.podUID, tc.containerNames[i], tc.containerIDs[i])
cm.Add(&pod, &container, tc.containerIDs[i])
containerID, err := cm.Get(&pod, &container)
containerID, err := cm.GetContainerID(tc.podUID, tc.containerNames[i])
if err != nil {
t.Errorf("error adding and retrieving container: %v", err)
t.Errorf("error adding and retrieving containerID: %v", err)
}
if containerID != tc.containerIDs[i] {
t.Errorf("mismatched containerIDs %v, %v", containerID, tc.containerIDs[i])
}
podUID, containerName, err := cm.GetContainerRef(containerID)
if err != nil {
t.Errorf("error retrieving container reference: %v", err)
}
if podUID != tc.podUID {
t.Errorf("mismatched pod UID %v, %v", tc.podUID, podUID)
}
if containerName != tc.containerNames[i] {
t.Errorf("mismatched container Name %v, %v", tc.containerNames[i], containerName)
}
}
// Remove all entries from the containerMap, checking proper removal of
// each along the way.
for i := range tc.containerNames {
container := v1.Container{Name: tc.containerNames[i]}
cm.Remove(tc.containerIDs[i])
containerID, err := cm.Get(&pod, &container)
cm.RemoveByContainerID(tc.containerIDs[i])
containerID, err := cm.GetContainerID(tc.podUID, tc.containerNames[i])
if err == nil {
t.Errorf("unexpected retrieval of containerID after removal: %v", containerID)
}
cm.Add(tc.podUID, tc.containerNames[i], tc.containerIDs[i])
cm.RemoveByContainerRef(tc.podUID, tc.containerNames[i])
podUID, containerName, err := cm.GetContainerRef(tc.containerIDs[i])
if err == nil {
t.Errorf("unexpected retrieval of container reference after removal: (%v, %v)", podUID, containerName)
}
}
// Verify containerMap now empty.


@ -28,6 +28,7 @@ import (
"k8s.io/klog"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
@ -52,7 +53,7 @@ const cpuManagerStateFileName = "cpu_manager_state"
// Manager interface provides methods for Kubelet to manage pod cpus.
type Manager interface {
// Start is called during Kubelet initialization.
Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService)
Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap)
// AddContainer is called between container create and container start
// so that initial CPU affinity settings can be written through to the
@ -96,6 +97,10 @@ type manager struct {
// and the containerID of their containers
podStatusProvider status.PodStatusProvider
// containerMap provides a mapping from (pod, container) -> containerID
// for all containers in a pod
containerMap containermap.ContainerMap
topology *topology.CPUTopology
nodeAllocatableReservation v1.ResourceList
@ -103,6 +108,9 @@ type manager struct {
// sourcesReady provides the readiness of kubelet configuration sources such as apiserver update readiness.
// We use it to determine when we can purge inactive pods from checkpointed state.
sourcesReady config.SourcesReady
// stateFileDirectory holds the directory where the state file for checkpoints is held.
stateFileDirectory string
}
var _ Manager = &manager{}
@ -153,23 +161,19 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo
return nil, fmt.Errorf("unknown policy: \"%s\"", cpuPolicyName)
}
stateImpl, err := state.NewCheckpointState(stateFileDirectory, cpuManagerStateFileName, policy.Name())
if err != nil {
return nil, fmt.Errorf("could not initialize checkpoint manager: %v", err)
}
manager := &manager{
policy: policy,
reconcilePeriod: reconcilePeriod,
state: stateImpl,
containerMap: containermap.NewContainerMap(),
topology: topo,
nodeAllocatableReservation: nodeAllocatableReservation,
stateFileDirectory: stateFileDirectory,
}
manager.sourcesReady = &sourcesReadyStub{}
return manager, nil
}
func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService) {
func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) {
klog.Infof("[cpumanager] starting with %s policy", m.policy.Name())
klog.Infof("[cpumanager] reconciling every %v", m.reconcilePeriod)
m.sourcesReady = sourcesReady
@ -177,6 +181,13 @@ func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesRe
m.podStatusProvider = podStatusProvider
m.containerRuntime = containerRuntime
stateImpl, err := state.NewCheckpointState(m.stateFileDirectory, cpuManagerStateFileName, m.policy.Name(), initialContainers)
if err != nil {
klog.Errorf("[cpumanager] could not initialize checkpoint manager: %v\n", err)
panic("[cpumanager] - please drain node and remove policy state file")
}
m.state = stateImpl
m.policy.Start(m.state)
if m.policy.Name() == string(PolicyNone) {
return
@ -186,13 +197,24 @@ func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesRe
func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) error {
m.Lock()
err := m.policy.AddContainer(m.state, p, c, containerID)
// Proactively remove CPUs from init containers that have already run.
// They are guaranteed to have run to completion before any other
// container is run.
for _, initContainer := range p.Spec.InitContainers {
if c.Name != initContainer.Name {
err := m.policyRemoveContainerByRef(string(p.UID), initContainer.Name)
if err != nil {
klog.Warningf("[cpumanager] unable to remove init container (pod: %s, container: %s, error: %v)", string(p.UID), initContainer.Name, err)
}
}
}
err := m.policyAddContainer(p, c, containerID)
if err != nil {
klog.Errorf("[cpumanager] AddContainer error: %v", err)
m.Unlock()
return err
}
cpus := m.state.GetCPUSetOrDefault(containerID)
cpus := m.state.GetCPUSetOrDefault(string(p.UID), c.Name)
m.Unlock()
if !cpus.IsEmpty() {
@ -200,7 +222,7 @@ func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) e
if err != nil {
klog.Errorf("[cpumanager] AddContainer error: %v", err)
m.Lock()
err := m.policy.RemoveContainer(m.state, containerID)
err := m.policyRemoveContainerByID(containerID)
if err != nil {
klog.Errorf("[cpumanager] AddContainer rollback state error: %v", err)
}
@ -216,14 +238,46 @@ func (m *manager) RemoveContainer(containerID string) error {
m.Lock()
defer m.Unlock()
err := m.policy.RemoveContainer(m.state, containerID)
err := m.policyRemoveContainerByID(containerID)
if err != nil {
klog.Errorf("[cpumanager] RemoveContainer error: %v", err)
return err
}
return nil
}
func (m *manager) policyAddContainer(p *v1.Pod, c *v1.Container, containerID string) error {
err := m.policy.AddContainer(m.state, p, c)
if err == nil {
m.containerMap.Add(string(p.UID), c.Name, containerID)
}
return err
}
func (m *manager) policyRemoveContainerByID(containerID string) error {
podUID, containerName, err := m.containerMap.GetContainerRef(containerID)
if err != nil {
return nil
}
err = m.policy.RemoveContainer(m.state, podUID, containerName)
if err == nil {
m.containerMap.RemoveByContainerID(containerID)
}
return err
}
func (m *manager) policyRemoveContainerByRef(podUID string, containerName string) error {
err := m.policy.RemoveContainer(m.state, podUID, containerName)
if err == nil {
m.containerMap.RemoveByContainerRef(podUID, containerName)
}
return err
}
func (m *manager) State() state.Reader {
return m.state
}
@ -256,43 +310,35 @@ func (m *manager) removeStaleState() {
m.Lock()
defer m.Unlock()
// We remove stale state very conservatively, only removing *any* state
// once we know for sure that we won't be accidentally removing state that
// is still valid. Since this function is called periodically, we will just
// try again next time this function is called.
// Get the list of active pods.
activePods := m.activePods()
if len(activePods) == 0 {
// If there are no active pods, skip the removal of stale state.
// Since this function is called periodically, we will just try again
// next time this function is called.
return
}
// Build a list of containerIDs for all containers in all active Pods.
activeContainers := make(map[string]struct{})
// Build a list of (podUID, containerName) pairs for all containers in all active Pods.
activeContainers := make(map[string]map[string]struct{})
for _, pod := range activePods {
pstatus, ok := m.podStatusProvider.GetPodStatus(pod.UID)
if !ok {
// If even one pod does not have its status set, skip state removal.
return
}
activeContainers[string(pod.UID)] = make(map[string]struct{})
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
containerID, err := findContainerIDByName(&pstatus, container.Name)
if err != nil {
// If even one container does not have its containerID set, skip state removal.
return
}
activeContainers[containerID] = struct{}{}
activeContainers[string(pod.UID)][container.Name] = struct{}{}
}
}
// Loop through the CPUManager state. Remove any state for containers not
// in the `activeContainers` list built above. The shortcircuits in place
// above ensure that no erroneous state will ever be removed.
for containerID := range m.state.GetCPUAssignments() {
if _, ok := activeContainers[containerID]; !ok {
klog.Errorf("[cpumanager] removeStaleState: removing container: %s)", containerID)
err := m.policy.RemoveContainer(m.state, containerID)
if err != nil {
klog.Errorf("[cpumanager] removeStaleState: failed to remove container %s, error: %v)", containerID, err)
// in the `activeContainers` list built above.
assignments := m.state.GetCPUAssignments()
for podUID := range assignments {
for containerName := range assignments[podUID] {
if _, ok := activeContainers[podUID][containerName]; !ok {
klog.Errorf("[cpumanager] removeStaleState: removing (pod %s, container: %s)", podUID, containerName)
err := m.policyRemoveContainerByRef(podUID, containerName)
if err != nil {
klog.Errorf("[cpumanager] removeStaleState: failed to remove (pod %s, container %s), error: %v)", podUID, containerName, err)
}
}
}
}
@ -325,7 +371,7 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
// - policy does not want to track the container
// - kubelet has just been restarted - and there is no previous state file
// - container has been removed from state by RemoveContainer call (DeletionTimestamp is set)
if _, ok := m.state.GetCPUSet(containerID); !ok {
if _, ok := m.state.GetCPUSet(string(pod.UID), container.Name); !ok {
if status.Phase == v1.PodRunning && pod.DeletionTimestamp == nil {
klog.V(4).Infof("[cpumanager] reconcileState: container is not present in state - trying to add (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID)
err := m.AddContainer(pod, &container, containerID)
@ -341,7 +387,7 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
}
}
cset := m.state.GetCPUSetOrDefault(containerID)
cset := m.state.GetCPUSetOrDefault(string(pod.UID), container.Name)
if cset.IsEmpty() {
// NOTE: This should not happen outside of tests.
klog.Infof("[cpumanager] reconcileState: skipping container; assigned cpuset is empty (pod: %s, container: %s)", pod.Name, container.Name)


@ -19,6 +19,7 @@ package cpumanager
import (
"fmt"
"reflect"
"strconv"
"strings"
"testing"
"time"
@ -32,6 +33,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
@ -43,8 +45,8 @@ type mockState struct {
defaultCPUSet cpuset.CPUSet
}
func (s *mockState) GetCPUSet(containerID string) (cpuset.CPUSet, bool) {
res, ok := s.assignments[containerID]
func (s *mockState) GetCPUSet(podUID string, containerName string) (cpuset.CPUSet, bool) {
res, ok := s.assignments[podUID][containerName]
return res.Clone(), ok
}
@ -52,23 +54,29 @@ func (s *mockState) GetDefaultCPUSet() cpuset.CPUSet {
return s.defaultCPUSet.Clone()
}
func (s *mockState) GetCPUSetOrDefault(containerID string) cpuset.CPUSet {
if res, ok := s.GetCPUSet(containerID); ok {
func (s *mockState) GetCPUSetOrDefault(podUID string, containerName string) cpuset.CPUSet {
if res, ok := s.GetCPUSet(podUID, containerName); ok {
return res
}
return s.GetDefaultCPUSet()
}
func (s *mockState) SetCPUSet(containerID string, cset cpuset.CPUSet) {
s.assignments[containerID] = cset
func (s *mockState) SetCPUSet(podUID string, containerName string, cset cpuset.CPUSet) {
if _, exists := s.assignments[podUID]; !exists {
s.assignments[podUID] = make(map[string]cpuset.CPUSet)
}
s.assignments[podUID][containerName] = cset
}
func (s *mockState) SetDefaultCPUSet(cset cpuset.CPUSet) {
s.defaultCPUSet = cset
}
func (s *mockState) Delete(containerID string) {
delete(s.assignments, containerID)
func (s *mockState) Delete(podUID string, containerName string) {
delete(s.assignments[podUID], containerName)
if len(s.assignments[podUID]) == 0 {
delete(s.assignments, podUID)
}
}
func (s *mockState) ClearState() {
@ -95,11 +103,11 @@ func (p *mockPolicy) Name() string {
func (p *mockPolicy) Start(s state.State) {
}
func (p *mockPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container, containerID string) error {
func (p *mockPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error {
return p.err
}
func (p *mockPolicy) RemoveContainer(s state.State, containerID string) error {
func (p *mockPolicy) RemoveContainer(s state.State, podUID string, containerName string) error {
return p.err
}
@ -124,8 +132,8 @@ func (psp mockPodStatusProvider) GetPodStatus(uid types.UID) (v1.PodStatus, bool
return psp.podStatus, psp.found
}
func makePod(cpuRequest, cpuLimit string) *v1.Pod {
return &v1.Pod{
func makePod(podUID, containerName, cpuRequest, cpuLimit string) *v1.Pod {
pod := &v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
@ -143,10 +151,19 @@ func makePod(cpuRequest, cpuLimit string) *v1.Pod {
},
},
}
pod.UID = types.UID(podUID)
pod.Spec.Containers[0].Name = containerName
return pod
}
func makeMultiContainerPod(initCPUs, appCPUs []struct{ request, limit string }) *v1.Pod {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod",
UID: "podUID",
},
Spec: v1.PodSpec{
InitContainers: []v1.Container{},
Containers: []v1.Container{},
@ -155,7 +172,7 @@ func makeMultiContainerPod(initCPUs, appCPUs []struct{ request, limit string })
for i, cpu := range initCPUs {
pod.Spec.InitContainers = append(pod.Spec.InitContainers, v1.Container{
Name: "initContainer-" + string(i),
Name: "initContainer-" + strconv.Itoa(i),
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse(cpu.request),
@ -171,7 +188,7 @@ func makeMultiContainerPod(initCPUs, appCPUs []struct{ request, limit string })
for i, cpu := range appCPUs {
pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{
Name: "appContainer-" + string(i),
Name: "appContainer-" + strconv.Itoa(i),
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse(cpu.request),
@ -246,11 +263,12 @@ func TestCPUManagerAdd(t *testing.T) {
containerRuntime: mockRuntimeService{
err: testCase.updateErr,
},
containerMap: containermap.NewContainerMap(),
activePods: func() []*v1.Pod { return nil },
podStatusProvider: mockPodStatusProvider{},
}
pod := makePod("2", "2")
pod := makePod("fakePod", "fakeContainer", "2", "2")
container := &pod.Spec.Containers[0]
err := mgr.AddContainer(pod, container, "fakeID")
if !reflect.DeepEqual(err, testCase.expErr) {
@ -264,6 +282,237 @@ func TestCPUManagerAdd(t *testing.T) {
}
}
func TestCPUManagerAddWithInitContainers(t *testing.T) {
testCases := []struct {
description string
topo *topology.CPUTopology
numReservedCPUs int
initContainerIDs []string
containerIDs []string
stAssignments state.ContainerCPUAssignments
stDefaultCPUSet cpuset.CPUSet
pod *v1.Pod
expInitCSets []cpuset.CPUSet
expCSets []cpuset.CPUSet
}{
{
description: "No Guaranteed Init CPUs",
topo: topoSingleSocketHT,
numReservedCPUs: 0,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
initContainerIDs: []string{"initFakeID"},
containerIDs: []string{"appFakeID"},
pod: makeMultiContainerPod(
[]struct{ request, limit string }{{"100m", "100m"}},
[]struct{ request, limit string }{{"4000m", "4000m"}}),
expInitCSets: []cpuset.CPUSet{
cpuset.NewCPUSet()},
expCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4, 1, 5)},
},
{
description: "Equal Number of Guaranteed CPUs",
topo: topoSingleSocketHT,
numReservedCPUs: 0,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
initContainerIDs: []string{"initFakeID"},
containerIDs: []string{"appFakeID"},
pod: makeMultiContainerPod(
[]struct{ request, limit string }{{"4000m", "4000m"}},
[]struct{ request, limit string }{{"4000m", "4000m"}}),
expInitCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4, 1, 5)},
expCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4, 1, 5)},
},
{
description: "More Init Container Guaranteed CPUs",
topo: topoSingleSocketHT,
numReservedCPUs: 0,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
initContainerIDs: []string{"initFakeID"},
containerIDs: []string{"appFakeID"},
pod: makeMultiContainerPod(
[]struct{ request, limit string }{{"6000m", "6000m"}},
[]struct{ request, limit string }{{"4000m", "4000m"}}),
expInitCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4, 1, 5, 2, 6)},
expCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4, 1, 5)},
},
{
description: "Less Init Container Guaranteed CPUs",
topo: topoSingleSocketHT,
numReservedCPUs: 0,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
initContainerIDs: []string{"initFakeID"},
containerIDs: []string{"appFakeID"},
pod: makeMultiContainerPod(
[]struct{ request, limit string }{{"2000m", "2000m"}},
[]struct{ request, limit string }{{"4000m", "4000m"}}),
expInitCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4)},
expCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4, 1, 5)},
},
{
description: "Multi Init Container Equal CPUs",
topo: topoSingleSocketHT,
numReservedCPUs: 0,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
initContainerIDs: []string{"initFakeID-1", "initFakeID-2"},
containerIDs: []string{"appFakeID"},
pod: makeMultiContainerPod(
[]struct{ request, limit string }{
{"2000m", "2000m"},
{"2000m", "2000m"}},
[]struct{ request, limit string }{
{"2000m", "2000m"}}),
expInitCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4),
cpuset.NewCPUSet(0, 4)},
expCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4)},
},
{
description: "Multi Init Container Less CPUs",
topo: topoSingleSocketHT,
numReservedCPUs: 0,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
initContainerIDs: []string{"initFakeID-1", "initFakeID-2"},
containerIDs: []string{"appFakeID"},
pod: makeMultiContainerPod(
[]struct{ request, limit string }{
{"4000m", "4000m"},
{"4000m", "4000m"}},
[]struct{ request, limit string }{
{"2000m", "2000m"}}),
expInitCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4, 1, 5),
cpuset.NewCPUSet(0, 4, 1, 5)},
expCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4)},
},
{
description: "Multi Init Container More CPUs",
topo: topoSingleSocketHT,
numReservedCPUs: 0,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
initContainerIDs: []string{"initFakeID-1", "initFakeID-2"},
containerIDs: []string{"appFakeID"},
pod: makeMultiContainerPod(
[]struct{ request, limit string }{
{"2000m", "2000m"},
{"2000m", "2000m"}},
[]struct{ request, limit string }{
{"4000m", "4000m"}}),
expInitCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4),
cpuset.NewCPUSet(0, 4)},
expCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4, 1, 5)},
},
{
description: "Multi Init Container Increasing CPUs",
topo: topoSingleSocketHT,
numReservedCPUs: 0,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
initContainerIDs: []string{"initFakeID-1", "initFakeID-2"},
containerIDs: []string{"appFakeID"},
pod: makeMultiContainerPod(
[]struct{ request, limit string }{
{"2000m", "2000m"},
{"4000m", "4000m"}},
[]struct{ request, limit string }{
{"6000m", "6000m"}}),
expInitCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4),
cpuset.NewCPUSet(0, 4, 1, 5)},
expCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4, 1, 5, 2, 6)},
},
{
description: "Multi Init, Multi App Container Split CPUs",
topo: topoSingleSocketHT,
numReservedCPUs: 0,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
initContainerIDs: []string{"initFakeID-1", "initFakeID-2"},
containerIDs: []string{"appFakeID-1", "appFakeID-2"},
pod: makeMultiContainerPod(
[]struct{ request, limit string }{
{"2000m", "2000m"},
{"4000m", "4000m"}},
[]struct{ request, limit string }{
{"2000m", "2000m"},
{"2000m", "2000m"}}),
expInitCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4),
cpuset.NewCPUSet(0, 4, 1, 5)},
expCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4),
cpuset.NewCPUSet(1, 5)},
},
}
for _, testCase := range testCases {
policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
state := &mockState{
assignments: testCase.stAssignments,
defaultCPUSet: testCase.stDefaultCPUSet,
}
mgr := &manager{
policy: policy,
state: state,
containerRuntime: mockRuntimeService{},
containerMap: containermap.NewContainerMap(),
activePods: func() []*v1.Pod { return nil },
podStatusProvider: mockPodStatusProvider{},
}
containers := append(
testCase.pod.Spec.InitContainers,
testCase.pod.Spec.Containers...)
containerIDs := append(
testCase.initContainerIDs,
testCase.containerIDs...)
expCSets := append(
testCase.expInitCSets,
testCase.expCSets...)
for i := range containers {
err := mgr.AddContainer(testCase.pod, &containers[i], containerIDs[i])
if err != nil {
t.Errorf("StaticPolicy AddContainer() error (%v). unexpected error for container id: %v: %v",
testCase.description, containerIDs[i], err)
}
cset, found := state.assignments[string(testCase.pod.UID)][containers[i].Name]
if !expCSets[i].IsEmpty() && !found {
t.Errorf("StaticPolicy AddContainer() error (%v). expected container %v to be present in assignments %v",
testCase.description, containers[i].Name, state.assignments)
}
if found && !cset.Equals(expCSets[i]) {
t.Errorf("StaticPolicy AddContainer() error (%v). expected cpuset %v for container %v but got %v",
testCase.description, expCSets[i], containers[i].Name, cset)
}
}
}
}
func TestCPUManagerGenerate(t *testing.T) {
testCases := []struct {
description string
@ -377,6 +626,9 @@ func TestCPUManagerGenerate(t *testing.T) {
}
func TestCPUManagerRemove(t *testing.T) {
containerID := "fakeID"
containerMap := containermap.NewContainerMap()
mgr := &manager{
policy: &mockPolicy{
err: nil,
@ -386,11 +638,13 @@ func TestCPUManagerRemove(t *testing.T) {
defaultCPUSet: cpuset.NewCPUSet(),
},
containerRuntime: mockRuntimeService{},
containerMap: containerMap,
activePods: func() []*v1.Pod { return nil },
podStatusProvider: mockPodStatusProvider{},
}
err := mgr.RemoveContainer("fakeID")
containerMap.Add("", "", containerID)
err := mgr.RemoveContainer(containerID)
if err != nil {
t.Errorf("CPU Manager RemoveContainer() error. expected error to be nil but got: %v", err)
}
@ -401,11 +655,13 @@ func TestCPUManagerRemove(t *testing.T) {
},
state: state.NewMemoryState(),
containerRuntime: mockRuntimeService{},
containerMap: containerMap,
activePods: func() []*v1.Pod { return nil },
podStatusProvider: mockPodStatusProvider{},
}
err = mgr.RemoveContainer("fakeID")
containerMap.Add("", "", containerID)
err = mgr.RemoveContainer(containerID)
if !reflect.DeepEqual(err, fmt.Errorf("fake error")) {
t.Errorf("CPU Manager RemoveContainer() error. expected error: fake error but got: %v", err)
}
@ -429,12 +685,12 @@ func TestReconcileState(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{
Name: "fakePodName",
UID: "fakeUID",
UID: "fakePodUID",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "fakeName",
Name: "fakeContainerName",
},
},
},
@ -443,18 +699,20 @@ func TestReconcileState(t *testing.T) {
pspPS: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
Name: "fakeName",
ContainerID: "docker://fakeID",
Name: "fakeContainerName",
ContainerID: "docker://fakeContainerID",
},
},
},
pspFound: true,
stAssignments: state.ContainerCPUAssignments{
"fakeID": cpuset.NewCPUSet(1, 2),
"fakePodUID": map[string]cpuset.CPUSet{
"fakeContainerName": cpuset.NewCPUSet(1, 2),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(3, 4, 5, 6, 7),
updateErr: nil,
expectSucceededContainerName: "fakeName",
expectSucceededContainerName: "fakeContainerName",
expectFailedContainerName: "",
},
{
@ -463,12 +721,12 @@ func TestReconcileState(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{
Name: "fakePodName",
UID: "fakeUID",
UID: "fakePodUID",
},
Spec: v1.PodSpec{
InitContainers: []v1.Container{
{
Name: "fakeName",
Name: "fakeContainerName",
},
},
},
@ -477,18 +735,20 @@ func TestReconcileState(t *testing.T) {
pspPS: v1.PodStatus{
InitContainerStatuses: []v1.ContainerStatus{
{
Name: "fakeName",
ContainerID: "docker://fakeID",
Name: "fakeContainerName",
ContainerID: "docker://fakeContainerID",
},
},
},
pspFound: true,
stAssignments: state.ContainerCPUAssignments{
"fakeID": cpuset.NewCPUSet(1, 2),
"fakePodUID": map[string]cpuset.CPUSet{
"fakeContainerName": cpuset.NewCPUSet(1, 2),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(3, 4, 5, 6, 7),
updateErr: nil,
expectSucceededContainerName: "fakeName",
expectSucceededContainerName: "fakeContainerName",
expectFailedContainerName: "",
},
{
@ -497,12 +757,12 @@ func TestReconcileState(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{
Name: "fakePodName",
UID: "fakeUID",
UID: "fakePodUID",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "fakeName",
Name: "fakeContainerName",
},
},
},
@ -514,7 +774,7 @@ func TestReconcileState(t *testing.T) {
stDefaultCPUSet: cpuset.NewCPUSet(),
updateErr: nil,
expectSucceededContainerName: "",
expectFailedContainerName: "fakeName",
expectFailedContainerName: "fakeContainerName",
},
{
description: "cpu manager reconclie - container id not found",
@ -522,12 +782,12 @@ func TestReconcileState(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{
Name: "fakePodName",
UID: "fakeUID",
UID: "fakePodUID",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "fakeName",
Name: "fakeContainerName",
},
},
},
@ -536,8 +796,8 @@ func TestReconcileState(t *testing.T) {
pspPS: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
Name: "fakeName1",
ContainerID: "docker://fakeID",
Name: "fakeContainerName1",
ContainerID: "docker://fakeContainerID",
},
},
},
@ -546,7 +806,7 @@ func TestReconcileState(t *testing.T) {
stDefaultCPUSet: cpuset.NewCPUSet(),
updateErr: nil,
expectSucceededContainerName: "",
expectFailedContainerName: "fakeName",
expectFailedContainerName: "fakeContainerName",
},
{
description: "cpu manager reconclie - cpuset is empty",
@ -554,12 +814,12 @@ func TestReconcileState(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{
Name: "fakePodName",
UID: "fakeUID",
UID: "fakePodUID",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "fakeName",
Name: "fakeContainerName",
},
},
},
@ -568,19 +828,21 @@ func TestReconcileState(t *testing.T) {
pspPS: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
Name: "fakeName",
ContainerID: "docker://fakeID",
Name: "fakeContainerName",
ContainerID: "docker://fakeContainerID",
},
},
},
pspFound: true,
stAssignments: state.ContainerCPUAssignments{
"fakeID": cpuset.NewCPUSet(),
"fakePodUID": map[string]cpuset.CPUSet{
"fakeContainerName": cpuset.NewCPUSet(),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7),
updateErr: nil,
expectSucceededContainerName: "",
expectFailedContainerName: "fakeName",
expectFailedContainerName: "fakeContainerName",
},
{
description: "cpu manager reconclie - container update error",
@ -588,12 +850,12 @@ func TestReconcileState(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{
Name: "fakePodName",
UID: "fakeUID",
UID: "fakePodUID",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "fakeName",
Name: "fakeContainerName",
},
},
},
@ -602,19 +864,21 @@ func TestReconcileState(t *testing.T) {
pspPS: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
Name: "fakeName",
ContainerID: "docker://fakeID",
Name: "fakeContainerName",
ContainerID: "docker://fakeContainerID",
},
},
},
pspFound: true,
stAssignments: state.ContainerCPUAssignments{
"fakeID": cpuset.NewCPUSet(1, 2),
"fakePodUID": map[string]cpuset.CPUSet{
"fakeContainerName": cpuset.NewCPUSet(1, 2),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(3, 4, 5, 6, 7),
updateErr: fmt.Errorf("fake container update error"),
expectSucceededContainerName: "",
expectFailedContainerName: "fakeName",
expectFailedContainerName: "fakeContainerName",
},
}
@ -630,6 +894,7 @@ func TestReconcileState(t *testing.T) {
containerRuntime: mockRuntimeService{
err: testCase.updateErr,
},
containerMap: containermap.NewContainerMap(),
activePods: func() []*v1.Pod {
return testCase.activePods
},
@ -724,11 +989,12 @@ func TestCPUManagerAddWithResvList(t *testing.T) {
containerRuntime: mockRuntimeService{
err: testCase.updateErr,
},
containerMap: containermap.NewContainerMap(),
activePods: func() []*v1.Pod { return nil },
podStatusProvider: mockPodStatusProvider{},
}
pod := makePod("2", "2")
pod := makePod("fakePod", "fakeContainer", "2", "2")
container := &pod.Spec.Containers[0]
err := mgr.AddContainer(pod, container, "fakeID")
if !reflect.DeepEqual(err, testCase.expErr) {


@ -19,6 +19,7 @@ package cpumanager
import (
"k8s.io/api/core/v1"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
@ -29,7 +30,7 @@ type fakeManager struct {
state state.State
}
func (m *fakeManager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService) {
func (m *fakeManager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) {
klog.Info("[fake cpumanager] Start()")
}


@ -27,9 +27,9 @@ type Policy interface {
Name() string
Start(s state.State)
// AddContainer call is idempotent
AddContainer(s state.State, pod *v1.Pod, container *v1.Container, containerID string) error
AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error
// RemoveContainer call is idempotent
RemoveContainer(s state.State, containerID string) error
RemoveContainer(s state.State, podUID string, containerName string) error
// GetTopologyHints implements the topologymanager.HintProvider Interface
// and is consulted to achieve NUMA aware resource alignment among this
// and other resource controllers.


@ -43,11 +43,11 @@ func (p *nonePolicy) Start(s state.State) {
klog.Info("[cpumanager] none policy: Start")
}
func (p *nonePolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container, containerID string) error {
func (p *nonePolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error {
return nil
}
func (p *nonePolicy) RemoveContainer(s state.State, containerID string) error {
func (p *nonePolicy) RemoveContainer(s state.State, podUID string, containerName string) error {
return nil
}


@ -41,10 +41,10 @@ func TestNonePolicyAdd(t *testing.T) {
defaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7),
}
testPod := makePod("1000m", "1000m")
testPod := makePod("fakePod", "fakeContainer", "1000m", "1000m")
container := &testPod.Spec.Containers[0]
err := policy.AddContainer(st, testPod, container, "fakeID")
err := policy.AddContainer(st, testPod, container)
if err != nil {
t.Errorf("NonePolicy AddContainer() error. expected no error but got: %v", err)
}
@ -58,7 +58,10 @@ func TestNonePolicyRemove(t *testing.T) {
defaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7),
}
err := policy.RemoveContainer(st, "fakeID")
testPod := makePod("fakePod", "fakeContainer", "1000m", "1000m")
container := &testPod.Spec.Containers[0]
err := policy.RemoveContainer(st, string(testPod.UID), container.Name)
if err != nil {
t.Errorf("NonePolicy RemoveContainer() error. expected no error but got %v", err)
}


@ -75,10 +75,6 @@ type staticPolicy struct {
topology *topology.CPUTopology
// set of CPUs that is not available for exclusive assignment
reserved cpuset.CPUSet
// containerMap provides a mapping from
// (pod, container) -> containerID
// for all containers a pod
containerMap containerMap
// topology manager reference to get container Topology affinity
affinity topologymanager.Store
}
@ -110,10 +106,9 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
klog.Infof("[cpumanager] reserved %d CPUs (\"%s\") not available for exclusive assignment", reserved.Size(), reserved)
return &staticPolicy{
topology: topology,
reserved: reserved,
containerMap: newContainerMap(),
affinity: affinity,
topology: topology,
reserved: reserved,
affinity: affinity,
}
}
@ -153,11 +148,13 @@ func (p *staticPolicy) validateState(s state.State) error {
}
// 2. Check if state for static policy is consistent
for cID, cset := range tmpAssignments {
// None of the cpu in DEFAULT cset should be in s.assignments
if !tmpDefaultCPUset.Intersection(cset).IsEmpty() {
return fmt.Errorf("container id: %s cpuset: \"%s\" overlaps with default cpuset \"%s\"",
cID, cset.String(), tmpDefaultCPUset.String())
for pod := range tmpAssignments {
for container, cset := range tmpAssignments[pod] {
// None of the cpu in DEFAULT cset should be in s.assignments
if !tmpDefaultCPUset.Intersection(cset).IsEmpty() {
return fmt.Errorf("pod: %s, container: %s cpuset: \"%s\" overlaps with default cpuset \"%s\"",
pod, container, cset.String(), tmpDefaultCPUset.String())
}
}
}
@ -170,8 +167,10 @@ func (p *staticPolicy) validateState(s state.State) error {
// the set of CPUs stored in the state.
totalKnownCPUs := tmpDefaultCPUset.Clone()
tmpCPUSets := []cpuset.CPUSet{}
for _, cset := range tmpAssignments {
tmpCPUSets = append(tmpCPUSets, cset)
for pod := range tmpAssignments {
for _, cset := range tmpAssignments[pod] {
tmpCPUSets = append(tmpCPUSets, cset)
}
}
totalKnownCPUs = totalKnownCPUs.UnionAll(tmpCPUSets)
if !totalKnownCPUs.Equals(p.topology.CPUDetails.CPUs()) {
@ -187,40 +186,16 @@ func (p *staticPolicy) assignableCPUs(s state.State) cpuset.CPUSet {
return s.GetDefaultCPUSet().Difference(p.reserved)
}
func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container, containerID string) (rerr error) {
// So long as this function does not return an error,
// add (pod, container, containerID) to the containerMap.
defer func() {
if rerr == nil {
p.containerMap.Add(pod, container, containerID)
}
}()
func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error {
if numCPUs := p.guaranteedCPUs(pod, container); numCPUs != 0 {
klog.Infof("[cpumanager] static policy: AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID)
klog.Infof("[cpumanager] static policy: AddContainer (pod: %s, container: %s)", pod.Name, container.Name)
// container belongs in an exclusively allocated pool
if _, ok := s.GetCPUSet(containerID); ok {
klog.Infof("[cpumanager] static policy: container already present in state, skipping (container: %s, container id: %s)", container.Name, containerID)
if _, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
klog.Infof("[cpumanager] static policy: container already present in state, skipping (pod: %s, container: %s)", pod.Name, container.Name)
return nil
}
// Proactively remove CPUs from init containers that have already run.
// They are guaranteed to have run to completion before any other
// container is run.
for _, initContainer := range pod.Spec.InitContainers {
if container.Name != initContainer.Name {
initContainerID, err := p.containerMap.Get(pod, &initContainer)
if err != nil {
continue
}
err = p.RemoveContainer(s, initContainerID)
if err != nil {
klog.Warningf("[cpumanager] unable to remove init container (container id: %s, error: %v)", initContainerID, err)
}
}
}
// Call Topology Manager to get the aligned socket affinity across all hint providers.
hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
klog.Infof("[cpumanager] Pod %v, Container %v Topology Affinity is: %v", pod.UID, container.Name, hint)
@ -228,27 +203,19 @@ func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Co
// Allocate CPUs according to the NUMA affinity contained in the hint.
cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity)
if err != nil {
klog.Errorf("[cpumanager] unable to allocate %d CPUs (container id: %s, error: %v)", numCPUs, containerID, err)
klog.Errorf("[cpumanager] unable to allocate %d CPUs (pod: %s, container: %s, error: %v)", numCPUs, pod.Name, container.Name, err)
return err
}
s.SetCPUSet(containerID, cpuset)
s.SetCPUSet(string(pod.UID), container.Name, cpuset)
}
// container belongs in the shared pool (nothing to do; use default cpuset)
return nil
}
func (p *staticPolicy) RemoveContainer(s state.State, containerID string) (rerr error) {
// So long as this function does not return an error,
// remove containerID from the containerMap.
defer func() {
if rerr == nil {
p.containerMap.Remove(containerID)
}
}()
klog.Infof("[cpumanager] static policy: RemoveContainer (container id: %s)", containerID)
if toRelease, ok := s.GetCPUSet(containerID); ok {
s.Delete(containerID)
func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerName string) error {
klog.Infof("[cpumanager] static policy: RemoveContainer (pod: %s, container: %s)", podUID, containerName)
if toRelease, ok := s.GetCPUSet(podUID, containerName); ok {
s.Delete(podUID, containerName)
// Mutate the shared pool, adding released cpus.
s.SetDefaultCPUSet(s.GetDefaultCPUSet().Union(toRelease))
}
@ -328,8 +295,7 @@ func (p *staticPolicy) GetTopologyHints(s state.State, pod v1.Pod, container v1.
// Short circuit to regenerate the same hints if there are already
// guaranteed CPUs allocated to the Container. This might happen after a
// kubelet restart, for example.
containerID, _ := findContainerIDByName(&pod.Status, container.Name)
if allocated, exists := s.GetCPUSet(containerID); exists {
if allocated, exists := s.GetCPUSet(string(pod.UID), container.Name); exists {
if allocated.Size() != requested {
klog.Errorf("[cpumanager] CPUs already allocated to (pod %v, container %v) with different number than request: requested: %d, allocated: %d", string(pod.UID), container.Name, requested, allocated.Size())
return map[string][]topologymanager.TopologyHint{


@ -33,7 +33,8 @@ type staticPolicyTest struct {
description string
topo *topology.CPUTopology
numReservedCPUs int
containerID string
podUID string
containerName string
stAssignments state.ContainerCPUAssignments
stDefaultCPUSet cpuset.CPUSet
pod *v1.Pod
@ -43,19 +44,6 @@ type staticPolicyTest struct {
expPanic bool
}
type staticPolicyMultiContainerTest struct {
description string
topo *topology.CPUTopology
numReservedCPUs int
initContainerIDs []string
containerIDs []string
stAssignments state.ContainerCPUAssignments
stDefaultCPUSet cpuset.CPUSet
pod *v1.Pod
expInitCSets []cpuset.CPUSet
expCSets []cpuset.CPUSet
}
func TestStaticPolicyName(t *testing.T) {
policy := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
@ -72,7 +60,9 @@ func TestStaticPolicyStart(t *testing.T) {
description: "non-corrupted state",
topo: topoDualSocketHT,
stAssignments: state.ContainerCPUAssignments{
"0": cpuset.NewCPUSet(0),
"fakePod": map[string]cpuset.CPUSet{
"0": cpuset.NewCPUSet(0),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
expCSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
@ -97,7 +87,9 @@ func TestStaticPolicyStart(t *testing.T) {
description: "assigned core 2 is still present in available cpuset",
topo: topoDualSocketHT,
stAssignments: state.ContainerCPUAssignments{
"0": cpuset.NewCPUSet(0, 1, 2),
"fakePod": map[string]cpuset.CPUSet{
"0": cpuset.NewCPUSet(0, 1, 2),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
expPanic: true,
@ -106,8 +98,10 @@ func TestStaticPolicyStart(t *testing.T) {
description: "core 12 is not present in topology but is in state cpuset",
topo: topoDualSocketHT,
stAssignments: state.ContainerCPUAssignments{
"0": cpuset.NewCPUSet(0, 1, 2),
"1": cpuset.NewCPUSet(3, 4),
"fakePod": map[string]cpuset.CPUSet{
"0": cpuset.NewCPUSet(0, 1, 2),
"1": cpuset.NewCPUSet(3, 4),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(5, 6, 7, 8, 9, 10, 11, 12),
expPanic: true,
@ -116,8 +110,10 @@ func TestStaticPolicyStart(t *testing.T) {
description: "core 11 is present in topology but is not in state cpuset",
topo: topoDualSocketHT,
stAssignments: state.ContainerCPUAssignments{
"0": cpuset.NewCPUSet(0, 1, 2),
"1": cpuset.NewCPUSet(3, 4),
"fakePod": map[string]cpuset.CPUSet{
"0": cpuset.NewCPUSet(0, 1, 2),
"1": cpuset.NewCPUSet(3, 4),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(5, 6, 7, 8, 9, 10),
expPanic: true,
@ -179,10 +175,9 @@ func TestStaticPolicyAdd(t *testing.T) {
description: "GuPodSingleCore, SingleSocketHT, ExpectError",
topo: topoSingleSocketHT,
numReservedCPUs: 1,
containerID: "fakeID2",
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
pod: makePod("8000m", "8000m"),
pod: makePod("fakePod", "fakeContainer2", "8000m", "8000m"),
expErr: fmt.Errorf("not enough cpus available to satisfy request"),
expCPUAlloc: false,
expCSet: cpuset.NewCPUSet(),
@ -191,10 +186,9 @@ func TestStaticPolicyAdd(t *testing.T) {
description: "GuPodSingleCore, SingleSocketHT, ExpectAllocOneCPU",
topo: topoSingleSocketHT,
numReservedCPUs: 1,
containerID: "fakeID2",
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
pod: makePod("1000m", "1000m"),
pod: makePod("fakePod", "fakeContainer2", "1000m", "1000m"),
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(4), // expect sibling of partial core
@ -203,12 +197,13 @@ func TestStaticPolicyAdd(t *testing.T) {
description: "GuPodMultipleCores, SingleSocketHT, ExpectAllocOneCore",
topo: topoSingleSocketHT,
numReservedCPUs: 1,
containerID: "fakeID3",
stAssignments: state.ContainerCPUAssignments{
"fakeID100": cpuset.NewCPUSet(2, 3, 6, 7),
"fakePod": map[string]cpuset.CPUSet{
"fakeContainer100": cpuset.NewCPUSet(2, 3, 6, 7),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 4, 5),
pod: makePod("2000m", "2000m"),
pod: makePod("fakePod", "fakeContainer3", "2000m", "2000m"),
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(1, 5),
@ -217,12 +212,13 @@ func TestStaticPolicyAdd(t *testing.T) {
description: "GuPodMultipleCores, SingleSocketHT, ExpectSameAllocation",
topo: topoSingleSocketHT,
numReservedCPUs: 1,
containerID: "fakeID3",
stAssignments: state.ContainerCPUAssignments{
"fakeID3": cpuset.NewCPUSet(2, 3, 6, 7),
"fakePod": map[string]cpuset.CPUSet{
"fakeContainer3": cpuset.NewCPUSet(2, 3, 6, 7),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 4, 5),
pod: makePod("4000m", "4000m"),
pod: makePod("fakePod", "fakeContainer3", "4000m", "4000m"),
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(2, 3, 6, 7),
@ -231,12 +227,13 @@ func TestStaticPolicyAdd(t *testing.T) {
description: "GuPodMultipleCores, DualSocketHT, ExpectAllocOneSocket",
topo: topoDualSocketHT,
numReservedCPUs: 1,
containerID: "fakeID3",
stAssignments: state.ContainerCPUAssignments{
"fakeID100": cpuset.NewCPUSet(2),
"fakePod": map[string]cpuset.CPUSet{
"fakeContainer100": cpuset.NewCPUSet(2),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 11),
pod: makePod("6000m", "6000m"),
pod: makePod("fakePod", "fakeContainer3", "6000m", "6000m"),
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(1, 3, 5, 7, 9, 11),
@ -245,12 +242,13 @@ func TestStaticPolicyAdd(t *testing.T) {
description: "GuPodMultipleCores, DualSocketHT, ExpectAllocThreeCores",
topo: topoDualSocketHT,
numReservedCPUs: 1,
containerID: "fakeID3",
stAssignments: state.ContainerCPUAssignments{
"fakeID100": cpuset.NewCPUSet(1, 5),
"fakePod": map[string]cpuset.CPUSet{
"fakeContainer100": cpuset.NewCPUSet(1, 5),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 2, 3, 4, 6, 7, 8, 9, 10, 11),
pod: makePod("6000m", "6000m"),
pod: makePod("fakePod", "fakeContainer3", "6000m", "6000m"),
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(2, 3, 4, 8, 9, 10),
@ -259,12 +257,13 @@ func TestStaticPolicyAdd(t *testing.T) {
description: "GuPodMultipleCores, DualSocketNoHT, ExpectAllocOneSocket",
topo: topoDualSocketNoHT,
numReservedCPUs: 1,
containerID: "fakeID1",
stAssignments: state.ContainerCPUAssignments{
"fakeID100": cpuset.NewCPUSet(),
"fakePod": map[string]cpuset.CPUSet{
"fakeContainer100": cpuset.NewCPUSet(),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 3, 4, 5, 6, 7),
pod: makePod("4000m", "4000m"),
pod: makePod("fakePod", "fakeContainer1", "4000m", "4000m"),
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(4, 5, 6, 7),
@ -273,12 +272,13 @@ func TestStaticPolicyAdd(t *testing.T) {
description: "GuPodMultipleCores, DualSocketNoHT, ExpectAllocFourCores",
topo: topoDualSocketNoHT,
numReservedCPUs: 1,
containerID: "fakeID1",
stAssignments: state.ContainerCPUAssignments{
"fakeID100": cpuset.NewCPUSet(4, 5),
"fakePod": map[string]cpuset.CPUSet{
"fakeContainer100": cpuset.NewCPUSet(4, 5),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 3, 6, 7),
pod: makePod("4000m", "4000m"),
pod: makePod("fakePod", "fakeContainer1", "4000m", "4000m"),
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(1, 3, 6, 7),
@ -287,12 +287,13 @@ func TestStaticPolicyAdd(t *testing.T) {
description: "GuPodMultipleCores, DualSocketHT, ExpectAllocOneSocketOneCore",
topo: topoDualSocketHT,
numReservedCPUs: 1,
containerID: "fakeID3",
stAssignments: state.ContainerCPUAssignments{
"fakeID100": cpuset.NewCPUSet(2),
"fakePod": map[string]cpuset.CPUSet{
"fakeContainer100": cpuset.NewCPUSet(2),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 11),
pod: makePod("8000m", "8000m"),
pod: makePod("fakePod", "fakeContainer3", "8000m", "8000m"),
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(1, 3, 4, 5, 7, 9, 10, 11),
@ -301,10 +302,9 @@ func TestStaticPolicyAdd(t *testing.T) {
description: "NonGuPod, SingleSocketHT, NoAlloc",
topo: topoSingleSocketHT,
numReservedCPUs: 1,
containerID: "fakeID1",
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
pod: makePod("1000m", "2000m"),
pod: makePod("fakePod", "fakeContainer1", "1000m", "2000m"),
expErr: nil,
expCPUAlloc: false,
expCSet: cpuset.NewCPUSet(),
@ -313,10 +313,9 @@ func TestStaticPolicyAdd(t *testing.T) {
description: "GuPodNonIntegerCore, SingleSocketHT, NoAlloc",
topo: topoSingleSocketHT,
numReservedCPUs: 1,
containerID: "fakeID4",
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
pod: makePod("977m", "977m"),
pod: makePod("fakePod", "fakeContainer4", "977m", "977m"),
expErr: nil,
expCPUAlloc: false,
expCSet: cpuset.NewCPUSet(),
@ -325,12 +324,13 @@ func TestStaticPolicyAdd(t *testing.T) {
description: "GuPodMultipleCores, SingleSocketHT, NoAllocExpectError",
topo: topoSingleSocketHT,
numReservedCPUs: 1,
containerID: "fakeID5",
stAssignments: state.ContainerCPUAssignments{
"fakeID100": cpuset.NewCPUSet(1, 2, 3, 4, 5, 6),
"fakePod": map[string]cpuset.CPUSet{
"fakeContainer100": cpuset.NewCPUSet(1, 2, 3, 4, 5, 6),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 7),
pod: makePod("2000m", "2000m"),
pod: makePod("fakePod", "fakeContainer5", "2000m", "2000m"),
expErr: fmt.Errorf("not enough cpus available to satisfy request"),
expCPUAlloc: false,
expCSet: cpuset.NewCPUSet(),
@ -339,12 +339,13 @@ func TestStaticPolicyAdd(t *testing.T) {
description: "GuPodMultipleCores, DualSocketHT, NoAllocExpectError",
topo: topoDualSocketHT,
numReservedCPUs: 1,
containerID: "fakeID5",
stAssignments: state.ContainerCPUAssignments{
"fakeID100": cpuset.NewCPUSet(1, 2, 3),
"fakePod": map[string]cpuset.CPUSet{
"fakeContainer100": cpuset.NewCPUSet(1, 2, 3),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 4, 5, 6, 7, 8, 9, 10, 11),
pod: makePod("10000m", "10000m"),
pod: makePod("fakePod", "fakeContainer5", "10000m", "10000m"),
expErr: fmt.Errorf("not enough cpus available to satisfy request"),
expCPUAlloc: false,
expCSet: cpuset.NewCPUSet(),
@ -355,12 +356,13 @@ func TestStaticPolicyAdd(t *testing.T) {
// Expect all CPUs from Socket 0.
description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocSock0",
topo: topoQuadSocketFourWayHT,
containerID: "fakeID5",
stAssignments: state.ContainerCPUAssignments{
"fakeID100": cpuset.NewCPUSet(3, 11, 4, 5, 6, 7),
"fakePod": map[string]cpuset.CPUSet{
"fakeContainer100": cpuset.NewCPUSet(3, 11, 4, 5, 6, 7),
},
},
stDefaultCPUSet: largeTopoCPUSet.Difference(cpuset.NewCPUSet(3, 11, 4, 5, 6, 7)),
pod: makePod("72000m", "72000m"),
pod: makePod("fakePod", "fakeContainer5", "72000m", "72000m"),
expErr: nil,
expCPUAlloc: true,
expCSet: largeTopoSock0CPUSet,
@ -370,13 +372,14 @@ func TestStaticPolicyAdd(t *testing.T) {
// Expect CPUs from the 2 full cores available from the three Sockets.
description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocAllFullCoresFromThreeSockets",
topo: topoQuadSocketFourWayHT,
containerID: "fakeID5",
stAssignments: state.ContainerCPUAssignments{
"fakeID100": largeTopoCPUSet.Difference(cpuset.NewCPUSet(1, 25, 13, 38, 2, 9, 11, 35, 23, 48, 12, 51,
53, 173, 113, 233, 54, 61)),
"fakePod": map[string]cpuset.CPUSet{
"fakeContainer100": largeTopoCPUSet.Difference(cpuset.NewCPUSet(1, 25, 13, 38, 2, 9, 11, 35, 23, 48, 12, 51,
53, 173, 113, 233, 54, 61)),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(1, 25, 13, 38, 2, 9, 11, 35, 23, 48, 12, 51, 53, 173, 113, 233, 54, 61),
pod: makePod("12000m", "12000m"),
pod: makePod("fakePod", "fakeContainer5", "12000m", "12000m"),
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(1, 25, 13, 38, 11, 35, 23, 48, 53, 173, 113, 233),
@ -386,14 +389,15 @@ func TestStaticPolicyAdd(t *testing.T) {
// Expect all CPUs from Socket 1 and the hyper-threads from the full core.
description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocAllSock1+FullCore",
topo: topoQuadSocketFourWayHT,
containerID: "fakeID5",
stAssignments: state.ContainerCPUAssignments{
"fakeID100": largeTopoCPUSet.Difference(largeTopoSock1CPUSet.Union(cpuset.NewCPUSet(10, 34, 22, 47, 53,
173, 61, 181, 108, 228, 115, 235))),
"fakePod": map[string]cpuset.CPUSet{
"fakeContainer100": largeTopoCPUSet.Difference(largeTopoSock1CPUSet.Union(cpuset.NewCPUSet(10, 34, 22, 47, 53,
173, 61, 181, 108, 228, 115, 235))),
},
},
stDefaultCPUSet: largeTopoSock1CPUSet.Union(cpuset.NewCPUSet(10, 34, 22, 47, 53, 173, 61, 181, 108, 228,
115, 235)),
pod: makePod("76000m", "76000m"),
pod: makePod("fakePod", "fakeContainer5", "76000m", "76000m"),
expErr: nil,
expCPUAlloc: true,
expCSet: largeTopoSock1CPUSet.Union(cpuset.NewCPUSet(10, 34, 22, 47)),
@ -403,12 +407,13 @@ func TestStaticPolicyAdd(t *testing.T) {
// Expect allocation of all the CPUs from the partial cores.
description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocCPUs",
topo: topoQuadSocketFourWayHT,
containerID: "fakeID5",
stAssignments: state.ContainerCPUAssignments{
"fakeID100": largeTopoCPUSet.Difference(cpuset.NewCPUSet(10, 11, 53, 37, 55, 67, 52)),
"fakePod": map[string]cpuset.CPUSet{
"fakeContainer100": largeTopoCPUSet.Difference(cpuset.NewCPUSet(10, 11, 53, 37, 55, 67, 52)),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(10, 11, 53, 67, 52),
pod: makePod("5000m", "5000m"),
pod: makePod("fakePod", "fakeContainer5", "5000m", "5000m"),
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(10, 11, 53, 67, 52),
@ -419,12 +424,13 @@ func TestStaticPolicyAdd(t *testing.T) {
// Error is expected since the available CPUs are fewer than the request.
description: "GuPodMultipleCores, topoQuadSocketFourWayHT, NoAlloc",
topo: topoQuadSocketFourWayHT,
containerID: "fakeID5",
stAssignments: state.ContainerCPUAssignments{
"fakeID100": largeTopoCPUSet.Difference(cpuset.NewCPUSet(10, 11, 53, 37, 55, 67, 52)),
"fakePod": map[string]cpuset.CPUSet{
"fakeContainer100": largeTopoCPUSet.Difference(cpuset.NewCPUSet(10, 11, 53, 37, 55, 67, 52)),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(10, 11, 53, 37, 55, 67, 52),
pod: makePod("76000m", "76000m"),
pod: makePod("fakePod", "fakeContainer5", "76000m", "76000m"),
expErr: fmt.Errorf("not enough cpus available to satisfy request"),
expCPUAlloc: false,
expCSet: cpuset.NewCPUSet(),
@ -440,17 +446,17 @@ func TestStaticPolicyAdd(t *testing.T) {
}
container := &testCase.pod.Spec.Containers[0]
err := policy.AddContainer(st, testCase.pod, container, testCase.containerID)
err := policy.AddContainer(st, testCase.pod, container)
if !reflect.DeepEqual(err, testCase.expErr) {
t.Errorf("StaticPolicy AddContainer() error (%v). expected add error: %v but got: %v",
testCase.description, testCase.expErr, err)
}
if testCase.expCPUAlloc {
cset, found := st.assignments[testCase.containerID]
cset, found := st.assignments[string(testCase.pod.UID)][container.Name]
if !found {
t.Errorf("StaticPolicy AddContainer() error (%v). expected container id %v to be present in assignments %v",
testCase.description, testCase.containerID, st.assignments)
t.Errorf("StaticPolicy AddContainer() error (%v). expected container %v to be present in assignments %v",
testCase.description, container.Name, st.assignments)
}
if !reflect.DeepEqual(cset, testCase.expCSet) {
@ -465,221 +471,10 @@ func TestStaticPolicyAdd(t *testing.T) {
}
if !testCase.expCPUAlloc {
_, found := st.assignments[testCase.containerID]
_, found := st.assignments[string(testCase.pod.UID)][container.Name]
if found {
t.Errorf("StaticPolicy AddContainer() error (%v). Did not expect container id %v to be present in assignments %v",
testCase.description, testCase.containerID, st.assignments)
}
}
}
}
func TestStaticPolicyAddWithInitContainers(t *testing.T) {
testCases := []staticPolicyMultiContainerTest{
{
description: "No Guaranteed Init CPUs",
topo: topoSingleSocketHT,
numReservedCPUs: 0,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
initContainerIDs: []string{"initFakeID"},
containerIDs: []string{"appFakeID"},
pod: makeMultiContainerPod(
[]struct{ request, limit string }{{"100m", "100m"}},
[]struct{ request, limit string }{{"4000m", "4000m"}}),
expInitCSets: []cpuset.CPUSet{
cpuset.NewCPUSet()},
expCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4, 1, 5)},
},
{
description: "Equal Number of Guaranteed CPUs",
topo: topoSingleSocketHT,
numReservedCPUs: 0,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
initContainerIDs: []string{"initFakeID"},
containerIDs: []string{"appFakeID"},
pod: makeMultiContainerPod(
[]struct{ request, limit string }{{"4000m", "4000m"}},
[]struct{ request, limit string }{{"4000m", "4000m"}}),
expInitCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4, 1, 5)},
expCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4, 1, 5)},
},
{
description: "More Init Container Guaranteed CPUs",
topo: topoSingleSocketHT,
numReservedCPUs: 0,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
initContainerIDs: []string{"initFakeID"},
containerIDs: []string{"appFakeID"},
pod: makeMultiContainerPod(
[]struct{ request, limit string }{{"6000m", "6000m"}},
[]struct{ request, limit string }{{"4000m", "4000m"}}),
expInitCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4, 1, 5, 2, 6)},
expCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4, 1, 5)},
},
{
description: "Less Init Container Guaranteed CPUs",
topo: topoSingleSocketHT,
numReservedCPUs: 0,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
initContainerIDs: []string{"initFakeID"},
containerIDs: []string{"appFakeID"},
pod: makeMultiContainerPod(
[]struct{ request, limit string }{{"2000m", "2000m"}},
[]struct{ request, limit string }{{"4000m", "4000m"}}),
expInitCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4)},
expCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4, 1, 5)},
},
{
description: "Multi Init Container Equal CPUs",
topo: topoSingleSocketHT,
numReservedCPUs: 0,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
initContainerIDs: []string{"initFakeID-1", "initFakeID-2"},
containerIDs: []string{"appFakeID"},
pod: makeMultiContainerPod(
[]struct{ request, limit string }{
{"2000m", "2000m"},
{"2000m", "2000m"}},
[]struct{ request, limit string }{
{"2000m", "2000m"}}),
expInitCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4),
cpuset.NewCPUSet(0, 4)},
expCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4)},
},
{
description: "Multi Init Container Less CPUs",
topo: topoSingleSocketHT,
numReservedCPUs: 0,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
initContainerIDs: []string{"initFakeID-1", "initFakeID-2"},
containerIDs: []string{"appFakeID"},
pod: makeMultiContainerPod(
[]struct{ request, limit string }{
{"4000m", "4000m"},
{"4000m", "4000m"}},
[]struct{ request, limit string }{
{"2000m", "2000m"}}),
expInitCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4, 1, 5),
cpuset.NewCPUSet(0, 4, 1, 5)},
expCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4)},
},
{
description: "Multi Init Container More CPUs",
topo: topoSingleSocketHT,
numReservedCPUs: 0,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
initContainerIDs: []string{"initFakeID-1", "initFakeID-2"},
containerIDs: []string{"appFakeID"},
pod: makeMultiContainerPod(
[]struct{ request, limit string }{
{"2000m", "2000m"},
{"2000m", "2000m"}},
[]struct{ request, limit string }{
{"4000m", "4000m"}}),
expInitCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4),
cpuset.NewCPUSet(0, 4)},
expCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4, 1, 5)},
},
{
description: "Multi Init Container Increasing CPUs",
topo: topoSingleSocketHT,
numReservedCPUs: 0,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
initContainerIDs: []string{"initFakeID-1", "initFakeID-2"},
containerIDs: []string{"appFakeID"},
pod: makeMultiContainerPod(
[]struct{ request, limit string }{
{"2000m", "2000m"},
{"4000m", "4000m"}},
[]struct{ request, limit string }{
{"6000m", "6000m"}}),
expInitCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4),
cpuset.NewCPUSet(0, 4, 1, 5)},
expCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4, 1, 5, 2, 6)},
},
{
description: "Multi Init, Multi App Container Split CPUs",
topo: topoSingleSocketHT,
numReservedCPUs: 0,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
initContainerIDs: []string{"initFakeID-1", "initFakeID-2"},
containerIDs: []string{"appFakeID-1", "appFakeID-2"},
pod: makeMultiContainerPod(
[]struct{ request, limit string }{
{"2000m", "2000m"},
{"4000m", "4000m"}},
[]struct{ request, limit string }{
{"2000m", "2000m"},
{"2000m", "2000m"}}),
expInitCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4),
cpuset.NewCPUSet(0, 4, 1, 5)},
expCSets: []cpuset.CPUSet{
cpuset.NewCPUSet(0, 4),
cpuset.NewCPUSet(1, 5)},
},
}
for _, testCase := range testCases {
policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager())
st := &mockState{
assignments: testCase.stAssignments,
defaultCPUSet: testCase.stDefaultCPUSet,
}
containers := append(
testCase.pod.Spec.InitContainers,
testCase.pod.Spec.Containers...)
containerIDs := append(
testCase.initContainerIDs,
testCase.containerIDs...)
expCSets := append(
testCase.expInitCSets,
testCase.expCSets...)
for i := range containers {
err := policy.AddContainer(st, testCase.pod, &containers[i], containerIDs[i])
if err != nil {
t.Errorf("StaticPolicy AddContainer() error (%v). unexpected error for container id: %v: %v",
testCase.description, containerIDs[i], err)
}
cset, found := st.assignments[containerIDs[i]]
if !expCSets[i].IsEmpty() && !found {
t.Errorf("StaticPolicy AddContainer() error (%v). expected container id %v to be present in assignments %v",
testCase.description, containerIDs[i], st.assignments)
}
if found && !cset.Equals(expCSets[i]) {
t.Errorf("StaticPolicy AddContainer() error (%v). expected cpuset %v for container %v but got %v",
testCase.description, expCSets[i], containerIDs[i], cset)
t.Errorf("StaticPolicy AddContainer() error (%v). Did not expect container %v to be present in assignments %v",
testCase.description, container.Name, st.assignments)
}
}
}
@ -688,43 +483,55 @@ func TestStaticPolicyAddWithInitContainers(t *testing.T) {
func TestStaticPolicyRemove(t *testing.T) {
testCases := []staticPolicyTest{
{
description: "SingleSocketHT, DeAllocOneContainer",
topo: topoSingleSocketHT,
containerID: "fakeID1",
description: "SingleSocketHT, DeAllocOneContainer",
topo: topoSingleSocketHT,
podUID: "fakePod",
containerName: "fakeContainer1",
stAssignments: state.ContainerCPUAssignments{
"fakeID1": cpuset.NewCPUSet(1, 2, 3),
"fakePod": map[string]cpuset.CPUSet{
"fakeContainer1": cpuset.NewCPUSet(1, 2, 3),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(4, 5, 6, 7),
expCSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7),
},
{
description: "SingleSocketHT, DeAllocOneContainer, BeginEmpty",
topo: topoSingleSocketHT,
containerID: "fakeID1",
description: "SingleSocketHT, DeAllocOneContainer, BeginEmpty",
topo: topoSingleSocketHT,
podUID: "fakePod",
containerName: "fakeContainer1",
stAssignments: state.ContainerCPUAssignments{
"fakeID1": cpuset.NewCPUSet(1, 2, 3),
"fakeID2": cpuset.NewCPUSet(4, 5, 6, 7),
"fakePod": map[string]cpuset.CPUSet{
"fakeContainer1": cpuset.NewCPUSet(1, 2, 3),
"fakeContainer2": cpuset.NewCPUSet(4, 5, 6, 7),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(),
expCSet: cpuset.NewCPUSet(1, 2, 3),
},
{
description: "SingleSocketHT, DeAllocTwoContainer",
topo: topoSingleSocketHT,
containerID: "fakeID1",
description: "SingleSocketHT, DeAllocTwoContainer",
topo: topoSingleSocketHT,
podUID: "fakePod",
containerName: "fakeContainer1",
stAssignments: state.ContainerCPUAssignments{
"fakeID1": cpuset.NewCPUSet(1, 3, 5),
"fakeID2": cpuset.NewCPUSet(2, 4),
"fakePod": map[string]cpuset.CPUSet{
"fakeContainer1": cpuset.NewCPUSet(1, 3, 5),
"fakeContainer2": cpuset.NewCPUSet(2, 4),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(6, 7),
expCSet: cpuset.NewCPUSet(1, 3, 5, 6, 7),
},
{
description: "SingleSocketHT, NoDeAlloc",
topo: topoSingleSocketHT,
containerID: "fakeID2",
description: "SingleSocketHT, NoDeAlloc",
topo: topoSingleSocketHT,
podUID: "fakePod",
containerName: "fakeContainer2",
stAssignments: state.ContainerCPUAssignments{
"fakeID1": cpuset.NewCPUSet(1, 3, 5),
"fakePod": map[string]cpuset.CPUSet{
"fakeContainer1": cpuset.NewCPUSet(1, 3, 5),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(2, 4, 6, 7),
expCSet: cpuset.NewCPUSet(2, 4, 6, 7),
@ -739,16 +546,16 @@ func TestStaticPolicyRemove(t *testing.T) {
defaultCPUSet: testCase.stDefaultCPUSet,
}
policy.RemoveContainer(st, testCase.containerID)
policy.RemoveContainer(st, testCase.podUID, testCase.containerName)
if !reflect.DeepEqual(st.defaultCPUSet, testCase.expCSet) {
t.Errorf("StaticPolicy RemoveContainer() error (%v). expected default cpuset %v but got %v",
testCase.description, testCase.expCSet, st.defaultCPUSet)
}
if _, found := st.assignments[testCase.containerID]; found {
t.Errorf("StaticPolicy RemoveContainer() error (%v). expected containerID %v not be in assignments %v",
testCase.description, testCase.containerID, st.assignments)
if _, found := st.assignments[testCase.podUID][testCase.containerName]; found {
t.Errorf("StaticPolicy RemoveContainer() error (%v). expected (pod %v, container %v) not be in assignments %v",
testCase.description, testCase.podUID, testCase.containerName, st.assignments)
}
}
}
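
For reference, these tests now exercise a policy API that no longer takes an explicit containerID; a minimal sketch of the updated call pattern, assuming a policy and mockState built like the fixtures above (all names are illustrative):

	container := &pod.Spec.Containers[0]

	// Allocations are stored under (pod UID, container name) rather than container ID.
	if err := policy.AddContainer(st, pod, container); err != nil {
		// handle the allocation error
	}
	cset, ok := st.assignments[string(pod.UID)][container.Name]
	_, _ = cset, ok

	// Removal likewise keys on the (podUID, containerName) pair.
	policy.RemoveContainer(st, string(pod.UID), container.Name)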
@ -850,7 +657,6 @@ type staticPolicyTestWithResvList struct {
topo *topology.CPUTopology
numReservedCPUs int
reserved cpuset.CPUSet
containerID string
stAssignments state.ContainerCPUAssignments
stDefaultCPUSet cpuset.CPUSet
pod *v1.Pod
@ -925,10 +731,9 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
topo: topoSingleSocketHT,
numReservedCPUs: 1,
reserved: cpuset.NewCPUSet(0),
containerID: "fakeID2",
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
pod: makePod("8000m", "8000m"),
pod: makePod("fakePod", "fakeContainer2", "8000m", "8000m"),
expErr: fmt.Errorf("not enough cpus available to satisfy request"),
expCPUAlloc: false,
expCSet: cpuset.NewCPUSet(),
@ -938,10 +743,9 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
topo: topoSingleSocketHT,
numReservedCPUs: 2,
reserved: cpuset.NewCPUSet(0, 1),
containerID: "fakeID2",
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
pod: makePod("1000m", "1000m"),
pod: makePod("fakePod", "fakeContainer2", "1000m", "1000m"),
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(4), // expect sibling of partial core
@ -951,12 +755,13 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
topo: topoSingleSocketHT,
numReservedCPUs: 2,
reserved: cpuset.NewCPUSet(0, 1),
containerID: "fakeID3",
stAssignments: state.ContainerCPUAssignments{
"fakeID100": cpuset.NewCPUSet(2, 3, 6, 7),
"fakePod": map[string]cpuset.CPUSet{
"fakeContainer100": cpuset.NewCPUSet(2, 3, 6, 7),
},
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 4, 5),
pod: makePod("2000m", "2000m"),
pod: makePod("fakePod", "fakeContainer3", "2000m", "2000m"),
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(4, 5),
@ -972,17 +777,17 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
}
container := &testCase.pod.Spec.Containers[0]
err := policy.AddContainer(st, testCase.pod, container, testCase.containerID)
err := policy.AddContainer(st, testCase.pod, container)
if !reflect.DeepEqual(err, testCase.expErr) {
t.Errorf("StaticPolicy AddContainer() error (%v). expected add error: %v but got: %v",
testCase.description, testCase.expErr, err)
}
if testCase.expCPUAlloc {
cset, found := st.assignments[testCase.containerID]
cset, found := st.assignments[string(testCase.pod.UID)][container.Name]
if !found {
t.Errorf("StaticPolicy AddContainer() error (%v). expected container id %v to be present in assignments %v",
testCase.description, testCase.containerID, st.assignments)
t.Errorf("StaticPolicy AddContainer() error (%v). expected container %v to be present in assignments %v",
testCase.description, container.Name, st.assignments)
}
if !reflect.DeepEqual(cset, testCase.expCSet) {
@ -997,10 +802,10 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
}
if !testCase.expCPUAlloc {
_, found := st.assignments[testCase.containerID]
_, found := st.assignments[string(testCase.pod.UID)][container.Name]
if found {
t.Errorf("StaticPolicy AddContainer() error (%v). Did not expect container id %v to be present in assignments %v",
testCase.description, testCase.containerID, st.assignments)
t.Errorf("StaticPolicy AddContainer() error (%v). Did not expect container %v to be present in assignments %v",
testCase.description, container.Name, st.assignments)
}
}
}

View File

@ -15,6 +15,7 @@ go_library(
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/checkpointmanager/checksum:go_default_library",
"//pkg/kubelet/checkpointmanager/errors:go_default_library",
"//pkg/kubelet/cm/cpumanager/containermap:go_default_library",
"//pkg/kubelet/cm/cpuset:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
@ -30,6 +31,7 @@ go_test(
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/cm/cpumanager/containermap:go_default_library",
"//pkg/kubelet/cm/cpumanager/state/testing:go_default_library",
"//pkg/kubelet/cm/cpuset:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",

View File

@ -23,38 +23,87 @@ import (
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
)
var _ checkpointmanager.Checkpoint = &CPUManagerCheckpoint{}
// CPUManagerCheckpoint struct is used to store cpu/pod assignments in a checkpoint
type CPUManagerCheckpoint struct {
type CPUManagerCheckpoint = CPUManagerCheckpointV2
var _ checkpointmanager.Checkpoint = &CPUManagerCheckpointV1{}
var _ checkpointmanager.Checkpoint = &CPUManagerCheckpointV2{}
// CPUManagerCheckpointV1 struct is used to store cpu/pod assignments in a checkpoint in v1 format
type CPUManagerCheckpointV1 struct {
PolicyName string `json:"policyName"`
DefaultCPUSet string `json:"defaultCpuSet"`
Entries map[string]string `json:"entries,omitempty"`
Checksum checksum.Checksum `json:"checksum"`
}
// CPUManagerCheckpointV2 struct is used to store cpu/pod assignments in a checkpoint in v2 format
type CPUManagerCheckpointV2 struct {
PolicyName string `json:"policyName"`
DefaultCPUSet string `json:"defaultCpuSet"`
Entries map[string]map[string]string `json:"entries,omitempty"`
Checksum checksum.Checksum `json:"checksum"`
}
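
To make the two checkpoint layouts concrete, a hedged sketch of the same assignment expressed in each format (all field values are hypothetical):

	// v1 keys entries by container ID only.
	v1 := CPUManagerCheckpointV1{
		PolicyName:    "static",
		DefaultCPUSet: "0-1",
		Entries:       map[string]string{"containerID1": "2-3"},
	}

	// v2 keys entries by pod UID first, then container name.
	v2 := CPUManagerCheckpointV2{
		PolicyName:    "static",
		DefaultCPUSet: "0-1",
		Entries: map[string]map[string]string{
			"podUID": {"container1": "2-3"},
		},
	}
	_, _ = v1, v2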
// NewCPUManagerCheckpoint returns an instance of Checkpoint
func NewCPUManagerCheckpoint() *CPUManagerCheckpoint {
return &CPUManagerCheckpoint{
//lint:ignore unexported-type-in-api user-facing error message
return newCPUManagerCheckpointV2()
}
func newCPUManagerCheckpointV1() *CPUManagerCheckpointV1 {
return &CPUManagerCheckpointV1{
Entries: make(map[string]string),
}
}
// MarshalCheckpoint returns marshalled checkpoint
func (cp *CPUManagerCheckpoint) MarshalCheckpoint() ([]byte, error) {
func newCPUManagerCheckpointV2() *CPUManagerCheckpointV2 {
return &CPUManagerCheckpointV2{
Entries: make(map[string]map[string]string),
}
}
// MarshalCheckpoint returns marshalled checkpoint in v1 format
func (cp *CPUManagerCheckpointV1) MarshalCheckpoint() ([]byte, error) {
// make sure checksum wasn't set before so it doesn't affect output checksum
cp.Checksum = 0
cp.Checksum = checksum.New(cp)
return json.Marshal(*cp)
}
// UnmarshalCheckpoint tries to unmarshal passed bytes to checkpoint
func (cp *CPUManagerCheckpoint) UnmarshalCheckpoint(blob []byte) error {
// MarshalCheckpoint returns marshalled checkpoint in v2 format
func (cp *CPUManagerCheckpointV2) MarshalCheckpoint() ([]byte, error) {
// make sure checksum wasn't set before so it doesn't affect output checksum
cp.Checksum = 0
cp.Checksum = checksum.New(cp)
return json.Marshal(*cp)
}
// UnmarshalCheckpoint tries to unmarshal passed bytes to checkpoint in v1 format
func (cp *CPUManagerCheckpointV1) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, cp)
}
// VerifyChecksum verifies that current checksum of checkpoint is valid
func (cp *CPUManagerCheckpoint) VerifyChecksum() error {
// UnmarshalCheckpoint tries to unmarshal passed bytes to checkpoint in v2 format
func (cp *CPUManagerCheckpointV2) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, cp)
}
// VerifyChecksum verifies that current checksum of checkpoint is valid in v1 format
func (cp *CPUManagerCheckpointV1) VerifyChecksum() error {
if cp.Checksum == 0 {
// accept empty checksum for compatibility with old file backend
return nil
}
ck := cp.Checksum
cp.Checksum = 0
err := ck.Verify(cp)
cp.Checksum = ck
return err
}
// VerifyChecksum verifies that current checksum of checkpoint is valid in v2 format
func (cp *CPUManagerCheckpointV2) VerifyChecksum() error {
if cp.Checksum == 0 {
// accept empty checksum for compatibility with old file backend
return nil

View File

@ -21,30 +21,33 @@ import (
)
// ContainerCPUAssignments type used in cpu manager state
type ContainerCPUAssignments map[string]cpuset.CPUSet
type ContainerCPUAssignments map[string]map[string]cpuset.CPUSet
// Clone returns a copy of ContainerCPUAssignments
func (as ContainerCPUAssignments) Clone() ContainerCPUAssignments {
ret := make(ContainerCPUAssignments)
for key, val := range as {
ret[key] = val
for pod := range as {
ret[pod] = make(map[string]cpuset.CPUSet)
for container, cset := range as[pod] {
ret[pod][container] = cset
}
}
return ret
}
// Reader interface used to read current cpu/pod assignment state
type Reader interface {
GetCPUSet(containerID string) (cpuset.CPUSet, bool)
GetCPUSet(podUID string, containerName string) (cpuset.CPUSet, bool)
GetDefaultCPUSet() cpuset.CPUSet
GetCPUSetOrDefault(containerID string) cpuset.CPUSet
GetCPUSetOrDefault(podUID string, containerName string) cpuset.CPUSet
GetCPUAssignments() ContainerCPUAssignments
}
type writer interface {
SetCPUSet(containerID string, cpuset cpuset.CPUSet)
SetCPUSet(podUID string, containerName string, cpuset cpuset.CPUSet)
SetDefaultCPUSet(cpuset cpuset.CPUSet)
SetCPUAssignments(ContainerCPUAssignments)
Delete(containerID string)
Delete(podUID string, containerName string)
ClearState()
}
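
As a rough usage sketch of the reworked Reader/writer surface, assuming the in-memory State returned by NewMemoryState elsewhere in this package (the pod UID and container name are made up):

	s := NewMemoryState()
	s.SetCPUSet("podUID", "container1", cpuset.NewCPUSet(2, 3))

	// Lookups are now two-level: pod UID first, container name second.
	if cs, ok := s.GetCPUSet("podUID", "container1"); ok {
		_ = cs
	}
	s.Delete("podUID", "container1")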

View File

@ -24,6 +24,7 @@ import (
"k8s.io/klog"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)
@ -35,10 +36,11 @@ type stateCheckpoint struct {
cache State
checkpointManager checkpointmanager.CheckpointManager
checkpointName string
initialContainers containermap.ContainerMap
}
// NewCheckpointState creates new State for keeping track of cpu/pod assignment with checkpoint backend
func NewCheckpointState(stateDir, checkpointName, policyName string) (State, error) {
func NewCheckpointState(stateDir, checkpointName, policyName string, initialContainers containermap.ContainerMap) (State, error) {
checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
if err != nil {
return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
@ -48,6 +50,7 @@ func NewCheckpointState(stateDir, checkpointName, policyName string) (State, err
policyName: policyName,
checkpointManager: checkpointManager,
checkpointName: checkpointName,
initialContainers: initialContainers,
}
if err := stateCheckpoint.restoreState(); err != nil {
@ -60,6 +63,30 @@ func NewCheckpointState(stateDir, checkpointName, policyName string) (State, err
return stateCheckpoint, nil
}
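
A minimal construction sketch for the checkpoint-backed state; the path and names are hypothetical, and nil is acceptable for initialContainers when there is no v1 checkpoint to migrate (the tests below do the same):

	st, err := NewCheckpointState("/var/lib/kubelet", "cpu_manager_state", "static", nil)
	if err != nil {
		// restore can fail on a corrupt checkpoint, a policy mismatch, or a v1
		// checkpoint whose container IDs are missing from initialContainers
	}
	_ = st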
// migrateV1CheckpointToV2Checkpoint() converts checkpoints from the v1 format to the v2 format
func (sc *stateCheckpoint) migrateV1CheckpointToV2Checkpoint(src *CPUManagerCheckpointV1, dst *CPUManagerCheckpointV2) error {
if src.PolicyName != "" {
dst.PolicyName = src.PolicyName
}
if src.DefaultCPUSet != "" {
dst.DefaultCPUSet = src.DefaultCPUSet
}
for containerID, cset := range src.Entries {
podUID, containerName, err := sc.initialContainers.GetContainerRef(containerID)
if err != nil {
return fmt.Errorf("containerID '%v' not found in initial containers list", containerID)
}
if dst.Entries == nil {
dst.Entries = make(map[string]map[string]string)
}
if _, exists := dst.Entries[podUID]; !exists {
dst.Entries[podUID] = make(map[string]string)
}
dst.Entries[podUID][containerName] = cset
}
return nil
}
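
The migration leans on the container map assembled from the runtime at kubelet startup; a hedged sketch of the lookup it performs (all identifiers are hypothetical):

	cm := containermap.NewContainerMap()
	cm.Add("podUID", "container1", "containerID1") // (pod UID, container name, container ID)

	// GetContainerRef resolves an old v1 key (container ID) back to the
	// (pod UID, container name) pair used as the v2 key.
	podUID, containerName, err := cm.GetContainerRef("containerID1")
	if err != nil {
		// the checkpoint references a container the runtime no longer knows about
	}
	_, _ = podUID, containerName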
// restores state from a checkpoint and creates it if it doesn't exist
func (sc *stateCheckpoint) restoreState() error {
sc.mux.Lock()
@ -71,28 +98,40 @@ func (sc *stateCheckpoint) restoreState() error {
tmpDefaultCPUSet := cpuset.NewCPUSet()
tmpContainerCPUSet := cpuset.NewCPUSet()
checkpoint := NewCPUManagerCheckpoint()
if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint); err != nil {
if err == errors.ErrCheckpointNotFound {
sc.storeState()
return nil
checkpointV1 := newCPUManagerCheckpointV1()
checkpointV2 := newCPUManagerCheckpointV2()
if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpointV1); err != nil {
checkpointV1 = &CPUManagerCheckpointV1{} // reset it back to 0
if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpointV2); err != nil {
if err == errors.ErrCheckpointNotFound {
sc.storeState()
return nil
}
return err
}
return err
}
if sc.policyName != checkpoint.PolicyName {
return fmt.Errorf("configured policy %q differs from state checkpoint policy %q", sc.policyName, checkpoint.PolicyName)
if err = sc.migrateV1CheckpointToV2Checkpoint(checkpointV1, checkpointV2); err != nil {
return fmt.Errorf("error migrating v1 checkpoint state to v2 checkpoint state: %s", err)
}
if tmpDefaultCPUSet, err = cpuset.Parse(checkpoint.DefaultCPUSet); err != nil {
return fmt.Errorf("could not parse default cpu set %q: %v", checkpoint.DefaultCPUSet, err)
if sc.policyName != checkpointV2.PolicyName {
return fmt.Errorf("configured policy %q differs from state checkpoint policy %q", sc.policyName, checkpointV2.PolicyName)
}
for containerID, cpuString := range checkpoint.Entries {
if tmpContainerCPUSet, err = cpuset.Parse(cpuString); err != nil {
return fmt.Errorf("could not parse cpuset %q for container id %q: %v", cpuString, containerID, err)
if tmpDefaultCPUSet, err = cpuset.Parse(checkpointV2.DefaultCPUSet); err != nil {
return fmt.Errorf("could not parse default cpu set %q: %v", checkpointV2.DefaultCPUSet, err)
}
for pod := range checkpointV2.Entries {
tmpAssignments[pod] = make(map[string]cpuset.CPUSet)
for container, cpuString := range checkpointV2.Entries[pod] {
if tmpContainerCPUSet, err = cpuset.Parse(cpuString); err != nil {
return fmt.Errorf("could not parse cpuset %q for container %q in pod %q: %v", cpuString, container, pod, err)
}
tmpAssignments[pod][container] = tmpContainerCPUSet
}
tmpAssignments[containerID] = tmpContainerCPUSet
}
sc.cache.SetDefaultCPUSet(tmpDefaultCPUSet)
@ -110,8 +149,12 @@ func (sc *stateCheckpoint) storeState() {
checkpoint.PolicyName = sc.policyName
checkpoint.DefaultCPUSet = sc.cache.GetDefaultCPUSet().String()
for containerID, cset := range sc.cache.GetCPUAssignments() {
checkpoint.Entries[containerID] = cset.String()
assignments := sc.cache.GetCPUAssignments()
for pod := range assignments {
checkpoint.Entries[pod] = make(map[string]string)
for container, cset := range assignments[pod] {
checkpoint.Entries[pod][container] = cset.String()
}
}
err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
@ -122,11 +165,11 @@ func (sc *stateCheckpoint) storeState() {
}
// GetCPUSet returns current CPU set
func (sc *stateCheckpoint) GetCPUSet(containerID string) (cpuset.CPUSet, bool) {
func (sc *stateCheckpoint) GetCPUSet(podUID string, containerName string) (cpuset.CPUSet, bool) {
sc.mux.RLock()
defer sc.mux.RUnlock()
res, ok := sc.cache.GetCPUSet(containerID)
res, ok := sc.cache.GetCPUSet(podUID, containerName)
return res, ok
}
@ -139,11 +182,11 @@ func (sc *stateCheckpoint) GetDefaultCPUSet() cpuset.CPUSet {
}
// GetCPUSetOrDefault returns current CPU set, or default one if it wasn't changed
func (sc *stateCheckpoint) GetCPUSetOrDefault(containerID string) cpuset.CPUSet {
func (sc *stateCheckpoint) GetCPUSetOrDefault(podUID string, containerName string) cpuset.CPUSet {
sc.mux.RLock()
defer sc.mux.RUnlock()
return sc.cache.GetCPUSetOrDefault(containerID)
return sc.cache.GetCPUSetOrDefault(podUID, containerName)
}
// GetCPUAssignments returns current CPU to pod assignments
@ -155,10 +198,10 @@ func (sc *stateCheckpoint) GetCPUAssignments() ContainerCPUAssignments {
}
// SetCPUSet sets CPU set
func (sc *stateCheckpoint) SetCPUSet(containerID string, cset cpuset.CPUSet) {
func (sc *stateCheckpoint) SetCPUSet(podUID string, containerName string, cset cpuset.CPUSet) {
sc.mux.Lock()
defer sc.mux.Unlock()
sc.cache.SetCPUSet(containerID, cset)
sc.cache.SetCPUSet(podUID, containerName, cset)
sc.storeState()
}
@ -179,10 +222,10 @@ func (sc *stateCheckpoint) SetCPUAssignments(a ContainerCPUAssignments) {
}
// Delete deletes the assignment for the specified pod and container
func (sc *stateCheckpoint) Delete(containerID string) {
func (sc *stateCheckpoint) Delete(podUID string, containerName string) {
sc.mux.Lock()
defer sc.mux.Unlock()
sc.cache.Delete(containerID)
sc.cache.Delete(podUID, containerName)
sc.storeState()
}

View File

@ -22,6 +22,7 @@ import (
"testing"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap"
testutil "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state/testing"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)
@ -35,6 +36,7 @@ func TestCheckpointStateRestore(t *testing.T) {
description string
checkpointContent string
policyName string
initialContainers containermap.ContainerMap
expectedError string
expectedState *stateMemory
}{
@ -42,6 +44,7 @@ func TestCheckpointStateRestore(t *testing.T) {
"Restore non-existing checkpoint",
"",
"none",
containermap.ContainerMap{},
"",
&stateMemory{},
},
@ -51,9 +54,10 @@ func TestCheckpointStateRestore(t *testing.T) {
"policyName": "none",
"defaultCPUSet": "4-6",
"entries": {},
"checksum": 2912033808
"checksum": 2655485041
}`,
"none",
containermap.ContainerMap{},
"",
&stateMemory{
defaultCPUSet: cpuset.NewCPUSet(4, 5, 6),
@ -65,17 +69,22 @@ func TestCheckpointStateRestore(t *testing.T) {
"policyName": "none",
"defaultCPUSet": "1-3",
"entries": {
"container1": "4-6",
"container2": "1-3"
"pod": {
"container1": "4-6",
"container2": "1-3"
}
},
"checksum": 1535905563
"checksum": 3415933391
}`,
"none",
containermap.ContainerMap{},
"",
&stateMemory{
assignments: ContainerCPUAssignments{
"container1": cpuset.NewCPUSet(4, 5, 6),
"container2": cpuset.NewCPUSet(1, 2, 3),
"pod": map[string]cpuset.CPUSet{
"container1": cpuset.NewCPUSet(4, 5, 6),
"container2": cpuset.NewCPUSet(1, 2, 3),
},
},
defaultCPUSet: cpuset.NewCPUSet(1, 2, 3),
},
@ -89,6 +98,7 @@ func TestCheckpointStateRestore(t *testing.T) {
"checksum": 1337
}`,
"none",
containermap.ContainerMap{},
"checkpoint is corrupted",
&stateMemory{},
},
@ -96,6 +106,7 @@ func TestCheckpointStateRestore(t *testing.T) {
"Restore checkpoint with invalid JSON",
`{`,
"none",
containermap.ContainerMap{},
"unexpected end of JSON input",
&stateMemory{},
},
@ -105,9 +116,10 @@ func TestCheckpointStateRestore(t *testing.T) {
"policyName": "other",
"defaultCPUSet": "1-3",
"entries": {},
"checksum": 4195836012
"checksum": 698611581
}`,
"none",
containermap.ContainerMap{},
`configured policy "none" differs from state checkpoint policy "other"`,
&stateMemory{},
},
@ -117,9 +129,10 @@ func TestCheckpointStateRestore(t *testing.T) {
"policyName": "none",
"defaultCPUSet": "1.3",
"entries": {},
"checksum": 1025273327
"checksum": 1966990140
}`,
"none",
containermap.ContainerMap{},
`could not parse default cpu set "1.3": strconv.Atoi: parsing "1.3": invalid syntax`,
&stateMemory{},
},
@ -129,15 +142,47 @@ func TestCheckpointStateRestore(t *testing.T) {
"policyName": "none",
"defaultCPUSet": "1-3",
"entries": {
"container1": "4-6",
"container2": "asd"
"pod": {
"container1": "4-6",
"container2": "asd"
}
},
"checksum": 2764213924
"checksum": 3082925826
}`,
"none",
`could not parse cpuset "asd" for container id "container2": strconv.Atoi: parsing "asd": invalid syntax`,
containermap.ContainerMap{},
`could not parse cpuset "asd" for container "container2" in pod "pod": strconv.Atoi: parsing "asd": invalid syntax`,
&stateMemory{},
},
{
"Restore checkpoint with migration",
`{
"policyName": "none",
"defaultCPUSet": "1-3",
"entries": {
"containerID1": "4-6",
"containerID2": "1-3"
},
"checksum": 2832947348
}`,
"none",
func() containermap.ContainerMap {
cm := containermap.NewContainerMap()
cm.Add("pod", "container1", "containerID1")
cm.Add("pod", "container2", "containerID2")
return cm
}(),
"",
&stateMemory{
assignments: ContainerCPUAssignments{
"pod": map[string]cpuset.CPUSet{
"container1": cpuset.NewCPUSet(4, 5, 6),
"container2": cpuset.NewCPUSet(1, 2, 3),
},
},
defaultCPUSet: cpuset.NewCPUSet(1, 2, 3),
},
},
}
// create checkpoint manager for testing
@ -159,7 +204,7 @@ func TestCheckpointStateRestore(t *testing.T) {
}
}
restoredState, err := NewCheckpointState(testingDir, testingCheckpoint, tc.policyName)
restoredState, err := NewCheckpointState(testingDir, testingCheckpoint, tc.policyName, tc.initialContainers)
if err != nil {
if strings.TrimSpace(tc.expectedError) != "" {
tc.expectedError = "could not restore state from checkpoint: " + tc.expectedError
@ -189,8 +234,10 @@ func TestCheckpointStateStore(t *testing.T) {
{
"Store assignments",
&stateMemory{
assignments: map[string]cpuset.CPUSet{
"container1": cpuset.NewCPUSet(1, 5, 8),
assignments: map[string]map[string]cpuset.CPUSet{
"pod": {
"container1": cpuset.NewCPUSet(1, 5, 8),
},
},
},
},
@ -206,7 +253,7 @@ func TestCheckpointStateStore(t *testing.T) {
// ensure there is no previous checkpoint
cpm.RemoveCheckpoint(testingCheckpoint)
cs1, err := NewCheckpointState(testingDir, testingCheckpoint, "none")
cs1, err := NewCheckpointState(testingDir, testingCheckpoint, "none", nil)
if err != nil {
t.Fatalf("could not create testing checkpointState instance: %v", err)
}
@ -216,7 +263,7 @@ func TestCheckpointStateStore(t *testing.T) {
cs1.SetCPUAssignments(tc.expectedState.assignments)
// restore checkpoint with previously stored values
cs2, err := NewCheckpointState(testingDir, testingCheckpoint, "none")
cs2, err := NewCheckpointState(testingDir, testingCheckpoint, "none", nil)
if err != nil {
t.Fatalf("could not create testing checkpointState instance: %v", err)
}
@ -230,28 +277,34 @@ func TestCheckpointStateHelpers(t *testing.T) {
testCases := []struct {
description string
defaultCPUset cpuset.CPUSet
containers map[string]cpuset.CPUSet
assignments map[string]map[string]cpuset.CPUSet
}{
{
description: "One container",
defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8),
containers: map[string]cpuset.CPUSet{
"c1": cpuset.NewCPUSet(0, 1),
assignments: map[string]map[string]cpuset.CPUSet{
"pod": {
"c1": cpuset.NewCPUSet(0, 1),
},
},
},
{
description: "Two containers",
defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8),
containers: map[string]cpuset.CPUSet{
"c1": cpuset.NewCPUSet(0, 1),
"c2": cpuset.NewCPUSet(2, 3, 4, 5),
assignments: map[string]map[string]cpuset.CPUSet{
"pod": {
"c1": cpuset.NewCPUSet(0, 1),
"c2": cpuset.NewCPUSet(2, 3, 4, 5),
},
},
},
{
description: "Container without assigned cpus",
defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8),
containers: map[string]cpuset.CPUSet{
"c1": cpuset.NewCPUSet(),
assignments: map[string]map[string]cpuset.CPUSet{
"pod": {
"c1": cpuset.NewCPUSet(),
},
},
},
}
@ -266,21 +319,23 @@ func TestCheckpointStateHelpers(t *testing.T) {
// ensure there is no previous checkpoint
cpm.RemoveCheckpoint(testingCheckpoint)
state, err := NewCheckpointState(testingDir, testingCheckpoint, "none")
state, err := NewCheckpointState(testingDir, testingCheckpoint, "none", nil)
if err != nil {
t.Fatalf("could not create testing checkpointState instance: %v", err)
}
state.SetDefaultCPUSet(tc.defaultCPUset)
for container, set := range tc.containers {
state.SetCPUSet(container, set)
if cpus, _ := state.GetCPUSet(container); !cpus.Equals(set) {
t.Fatalf("state inconsistent, got %q instead of %q", set, cpus)
}
for pod := range tc.assignments {
for container, set := range tc.assignments[pod] {
state.SetCPUSet(pod, container, set)
if cpus, _ := state.GetCPUSet(pod, container); !cpus.Equals(set) {
t.Fatalf("state inconsistent, got %q instead of %q", set, cpus)
}
state.Delete(container)
if _, ok := state.GetCPUSet(container); ok {
t.Fatal("deleted container still existing in state")
state.Delete(pod, container)
if _, ok := state.GetCPUSet(pod, container); ok {
t.Fatal("deleted container still existing in state")
}
}
}
})
@ -291,34 +346,38 @@ func TestCheckpointStateClear(t *testing.T) {
testCases := []struct {
description string
defaultCPUset cpuset.CPUSet
containers map[string]cpuset.CPUSet
assignments map[string]map[string]cpuset.CPUSet
}{
{
"Valid state",
cpuset.NewCPUSet(1, 5, 10),
map[string]cpuset.CPUSet{
"container1": cpuset.NewCPUSet(1, 4),
map[string]map[string]cpuset.CPUSet{
"pod": {
"container1": cpuset.NewCPUSet(1, 4),
},
},
},
}
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
state, err := NewCheckpointState(testingDir, testingCheckpoint, "none")
state, err := NewCheckpointState(testingDir, testingCheckpoint, "none", nil)
if err != nil {
t.Fatalf("could not create testing checkpointState instance: %v", err)
}
state.SetDefaultCPUSet(tc.defaultCPUset)
state.SetCPUAssignments(tc.containers)
state.SetCPUAssignments(tc.assignments)
state.ClearState()
if !cpuset.NewCPUSet().Equals(state.GetDefaultCPUSet()) {
t.Fatal("cleared state with non-empty default cpu set")
}
for container := range tc.containers {
if _, ok := state.GetCPUSet(container); ok {
t.Fatalf("container %q with non-default cpu set in cleared state", container)
for pod := range tc.assignments {
for container := range tc.assignments[pod] {
if _, ok := state.GetCPUSet(pod, container); ok {
t.Fatalf("container %q in pod %q with non-default cpu set in cleared state", container, pod)
}
}
}
})

View File

@ -30,8 +30,10 @@ const compatibilityTestingCheckpoint = "cpumanager_state_compatibility_test"
var state = &stateMemory{
assignments: ContainerCPUAssignments{
"container1": cpuset.NewCPUSet(4, 5, 6),
"container2": cpuset.NewCPUSet(1, 2, 3),
"pod": map[string]cpuset.CPUSet{
"container1": cpuset.NewCPUSet(4, 5, 6),
"container2": cpuset.NewCPUSet(1, 2, 3),
},
},
defaultCPUSet: cpuset.NewCPUSet(1, 2, 3),
}
@ -44,12 +46,12 @@ func TestFileToCheckpointCompatibility(t *testing.T) {
// ensure testing state is removed after testing
defer os.Remove(statePath)
fileState := NewFileState(statePath, "none")
fileState := NewFileState(statePath, "none", nil)
fileState.SetDefaultCPUSet(state.defaultCPUSet)
fileState.SetCPUAssignments(state.assignments)
restoredState, err := NewCheckpointState(testingDir, compatibilityTestingCheckpoint, "none")
restoredState, err := NewCheckpointState(testingDir, compatibilityTestingCheckpoint, "none", nil)
if err != nil {
t.Fatalf("could not restore file state: %v", err)
}
@ -68,13 +70,13 @@ func TestCheckpointToFileCompatibility(t *testing.T) {
// ensure testing checkpoint is removed after testing
defer cpm.RemoveCheckpoint(compatibilityTestingCheckpoint)
checkpointState, err := NewCheckpointState(testingDir, compatibilityTestingCheckpoint, "none")
checkpointState, err := NewCheckpointState(testingDir, compatibilityTestingCheckpoint, "none", nil)
require.NoError(t, err)
checkpointState.SetDefaultCPUSet(state.defaultCPUSet)
checkpointState.SetCPUAssignments(state.assignments)
restoredState := NewFileState(path.Join(testingDir, compatibilityTestingCheckpoint), "none")
restoredState := NewFileState(path.Join(testingDir, compatibilityTestingCheckpoint), "none", nil)
AssertStateEqual(t, restoredState, state)
}

View File

@ -24,30 +24,39 @@ import (
"sync"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)
type stateFileData struct {
type stateFileDataV1 struct {
PolicyName string `json:"policyName"`
DefaultCPUSet string `json:"defaultCpuSet"`
Entries map[string]string `json:"entries,omitempty"`
}
type stateFileDataV2 struct {
PolicyName string `json:"policyName"`
DefaultCPUSet string `json:"defaultCpuSet"`
Entries map[string]map[string]string `json:"entries,omitempty"`
}
var _ State = &stateFile{}
type stateFile struct {
sync.RWMutex
stateFilePath string
policyName string
cache State
stateFilePath string
policyName string
cache State
initialContainers containermap.ContainerMap
}
// NewFileState creates new State for keeping track of cpu/pod assignment with file backend
func NewFileState(filePath string, policyName string) State {
func NewFileState(filePath string, policyName string, initialContainers containermap.ContainerMap) State {
stateFile := &stateFile{
stateFilePath: filePath,
cache: NewMemoryState(),
policyName: policyName,
stateFilePath: filePath,
cache: NewMemoryState(),
policyName: policyName,
initialContainers: initialContainers,
}
if err := stateFile.tryRestoreState(); err != nil {
@ -61,6 +70,30 @@ func NewFileState(filePath string, policyName string) State {
return stateFile
}
// migrateV1StateToV2State() converts state from the v1 format to the v2 format
func (sf *stateFile) migrateV1StateToV2State(src *stateFileDataV1, dst *stateFileDataV2) error {
if src.PolicyName != "" {
dst.PolicyName = src.PolicyName
}
if src.DefaultCPUSet != "" {
dst.DefaultCPUSet = src.DefaultCPUSet
}
for containerID, cset := range src.Entries {
podUID, containerName, err := sf.initialContainers.GetContainerRef(containerID)
if err != nil {
return fmt.Errorf("containerID '%v' not found in initial containers list", containerID)
}
if dst.Entries == nil {
dst.Entries = make(map[string]map[string]string)
}
if _, exists := dst.Entries[podUID]; !exists {
dst.Entries[podUID] = make(map[string]string)
}
dst.Entries[podUID][containerName] = cset
}
return nil
}
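
The file-backed state follows the same pattern; a minimal sketch of constructing it with an initial container map (the path and entries are hypothetical):

	cm := containermap.NewContainerMap()
	cm.Add("podUID", "container1", "containerID1")

	s := NewFileState("/var/lib/kubelet/cpu_manager_state", "static", cm)
	_ = s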
// tryRestoreState tries to read the state file; upon any error, the error
// message is logged and the state is left clean (un-initialized).
func (sf *stateFile) tryRestoreState() error {
@ -90,28 +123,40 @@ func (sf *stateFile) tryRestoreState() error {
}
// File exists; try to read it.
var readState stateFileData
var readStateV1 stateFileDataV1
var readStateV2 stateFileDataV2
if err = json.Unmarshal(content, &readState); err != nil {
klog.Errorf("[cpumanager] state file: could not unmarshal, corrupted state file - \"%s\"", sf.stateFilePath)
return err
}
if sf.policyName != readState.PolicyName {
return fmt.Errorf("policy configured \"%s\" != policy from state file \"%s\"", sf.policyName, readState.PolicyName)
}
if tmpDefaultCPUSet, err = cpuset.Parse(readState.DefaultCPUSet); err != nil {
klog.Errorf("[cpumanager] state file: could not parse state file - [defaultCpuSet:\"%s\"]", readState.DefaultCPUSet)
return err
}
for containerID, cpuString := range readState.Entries {
if tmpContainerCPUSet, err = cpuset.Parse(cpuString); err != nil {
klog.Errorf("[cpumanager] state file: could not parse state file - container id: %s, cpuset: \"%s\"", containerID, cpuString)
if err = json.Unmarshal(content, &readStateV1); err != nil {
readStateV1 = stateFileDataV1{} // reset it back to 0
if err = json.Unmarshal(content, &readStateV2); err != nil {
klog.Errorf("[cpumanager] state file: could not unmarshal, corrupted state file - \"%s\"", sf.stateFilePath)
return err
}
tmpAssignments[containerID] = tmpContainerCPUSet
}
if err = sf.migrateV1StateToV2State(&readStateV1, &readStateV2); err != nil {
klog.Errorf("[cpumanager] state file: could not migrate v1 state to v2 state - \"%s\"", sf.stateFilePath)
return err
}
if sf.policyName != readStateV2.PolicyName {
return fmt.Errorf("policy configured \"%s\" != policy from state file \"%s\"", sf.policyName, readStateV2.PolicyName)
}
if tmpDefaultCPUSet, err = cpuset.Parse(readStateV2.DefaultCPUSet); err != nil {
klog.Errorf("[cpumanager] state file: could not parse state file - [defaultCpuSet:\"%s\"]", readStateV2.DefaultCPUSet)
return err
}
for pod := range readStateV2.Entries {
tmpAssignments[pod] = make(map[string]cpuset.CPUSet)
for container, cpuString := range readStateV2.Entries[pod] {
if tmpContainerCPUSet, err = cpuset.Parse(cpuString); err != nil {
klog.Errorf("[cpumanager] state file: could not parse state file - pod: %s, container: %s, cpuset: \"%s\"", pod, container, cpuString)
return err
}
tmpAssignments[pod][container] = tmpContainerCPUSet
}
}
sf.cache.SetDefaultCPUSet(tmpDefaultCPUSet)
@ -128,14 +173,18 @@ func (sf *stateFile) storeState() {
var content []byte
var err error
data := stateFileData{
data := stateFileDataV2{
PolicyName: sf.policyName,
DefaultCPUSet: sf.cache.GetDefaultCPUSet().String(),
Entries: map[string]string{},
Entries: map[string]map[string]string{},
}
for containerID, cset := range sf.cache.GetCPUAssignments() {
data.Entries[containerID] = cset.String()
assignments := sf.cache.GetCPUAssignments()
for pod := range assignments {
data.Entries[pod] = map[string]string{}
for container, cset := range assignments[pod] {
data.Entries[pod][container] = cset.String()
}
}
if content, err = json.Marshal(data); err != nil {
@ -147,11 +196,11 @@ func (sf *stateFile) storeState() {
}
}
func (sf *stateFile) GetCPUSet(containerID string) (cpuset.CPUSet, bool) {
func (sf *stateFile) GetCPUSet(podUID string, containerName string) (cpuset.CPUSet, bool) {
sf.RLock()
defer sf.RUnlock()
res, ok := sf.cache.GetCPUSet(containerID)
res, ok := sf.cache.GetCPUSet(podUID, containerName)
return res, ok
}
@ -162,11 +211,11 @@ func (sf *stateFile) GetDefaultCPUSet() cpuset.CPUSet {
return sf.cache.GetDefaultCPUSet()
}
func (sf *stateFile) GetCPUSetOrDefault(containerID string) cpuset.CPUSet {
func (sf *stateFile) GetCPUSetOrDefault(podUID string, containerName string) cpuset.CPUSet {
sf.RLock()
defer sf.RUnlock()
return sf.cache.GetCPUSetOrDefault(containerID)
return sf.cache.GetCPUSetOrDefault(podUID, containerName)
}
func (sf *stateFile) GetCPUAssignments() ContainerCPUAssignments {
@ -175,10 +224,10 @@ func (sf *stateFile) GetCPUAssignments() ContainerCPUAssignments {
return sf.cache.GetCPUAssignments()
}
func (sf *stateFile) SetCPUSet(containerID string, cset cpuset.CPUSet) {
func (sf *stateFile) SetCPUSet(podUID string, containerName string, cset cpuset.CPUSet) {
sf.Lock()
defer sf.Unlock()
sf.cache.SetCPUSet(containerID, cset)
sf.cache.SetCPUSet(podUID, containerName, cset)
sf.storeState()
}
@ -196,10 +245,10 @@ func (sf *stateFile) SetCPUAssignments(a ContainerCPUAssignments) {
sf.storeState()
}
func (sf *stateFile) Delete(containerID string) {
func (sf *stateFile) Delete(podUID string, containerName string) {
sf.Lock()
defer sf.Unlock()
sf.cache.Delete(containerID)
sf.cache.Delete(podUID, containerName)
sf.storeState()
}

View File

@ -27,6 +27,7 @@ import (
"strings"
"testing"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)
@ -70,17 +71,19 @@ func stderrCapture(t *testing.T, f func() State) (bytes.Buffer, State) {
func TestFileStateTryRestore(t *testing.T) {
testCases := []struct {
description string
stateFileContent string
policyName string
expErr string
expPanic bool
expectedState *stateMemory
description string
stateFileContent string
policyName string
initialContainers containermap.ContainerMap
expErr string
expPanic bool
expectedState *stateMemory
}{
{
"Invalid JSON - one byte file",
"\n",
"none",
containermap.ContainerMap{},
"[cpumanager] state file: unable to restore state from disk (unexpected end of JSON input)",
true,
&stateMemory{},
@ -89,6 +92,7 @@ func TestFileStateTryRestore(t *testing.T) {
"Invalid JSON - invalid content",
"{",
"none",
containermap.ContainerMap{},
"[cpumanager] state file: unable to restore state from disk (unexpected end of JSON input)",
true,
&stateMemory{},
@ -97,6 +101,7 @@ func TestFileStateTryRestore(t *testing.T) {
"Try restore defaultCPUSet only",
`{"policyName": "none", "defaultCpuSet": "4-6"}`,
"none",
containermap.ContainerMap{},
"",
false,
&stateMemory{
@ -108,6 +113,7 @@ func TestFileStateTryRestore(t *testing.T) {
"Try restore defaultCPUSet only - invalid name",
`{"policyName": "none", "defaultCpuSet" "4-6"}`,
"none",
containermap.ContainerMap{},
`[cpumanager] state file: unable to restore state from disk (invalid character '"' after object key)`,
true,
&stateMemory{},
@ -117,17 +123,22 @@ func TestFileStateTryRestore(t *testing.T) {
`{
"policyName": "none",
"entries": {
"container1": "4-6",
"container2": "1-3"
"pod": {
"container1": "4-6",
"container2": "1-3"
}
}
}`,
"none",
containermap.ContainerMap{},
"",
false,
&stateMemory{
assignments: ContainerCPUAssignments{
"container1": cpuset.NewCPUSet(4, 5, 6),
"container2": cpuset.NewCPUSet(1, 2, 3),
"pod": map[string]cpuset.CPUSet{
"container1": cpuset.NewCPUSet(4, 5, 6),
"container2": cpuset.NewCPUSet(1, 2, 3),
},
},
defaultCPUSet: cpuset.NewCPUSet(),
},
@ -140,6 +151,7 @@ func TestFileStateTryRestore(t *testing.T) {
"entries": {}
}`,
"B",
containermap.ContainerMap{},
`[cpumanager] state file: unable to restore state from disk (policy configured "B" != policy from state file "A")`,
true,
&stateMemory{},
@ -148,6 +160,7 @@ func TestFileStateTryRestore(t *testing.T) {
"Try restore invalid assignments",
`{"entries": }`,
"none",
containermap.ContainerMap{},
"[cpumanager] state file: unable to restore state from disk (invalid character '}' looking for beginning of value)",
true,
&stateMemory{},
@ -158,17 +171,22 @@ func TestFileStateTryRestore(t *testing.T) {
"policyName": "none",
"defaultCpuSet": "23-24",
"entries": {
"container1": "4-6",
"container2": "1-3"
"pod": {
"container1": "4-6",
"container2": "1-3"
}
}
}`,
"none",
containermap.ContainerMap{},
"",
false,
&stateMemory{
assignments: ContainerCPUAssignments{
"container1": cpuset.NewCPUSet(4, 5, 6),
"container2": cpuset.NewCPUSet(1, 2, 3),
"pod": map[string]cpuset.CPUSet{
"container1": cpuset.NewCPUSet(4, 5, 6),
"container2": cpuset.NewCPUSet(1, 2, 3),
},
},
defaultCPUSet: cpuset.NewCPUSet(23, 24),
},
@ -180,6 +198,7 @@ func TestFileStateTryRestore(t *testing.T) {
"defaultCpuSet": "2-sd"
}`,
"none",
containermap.ContainerMap{},
`[cpumanager] state file: unable to restore state from disk (strconv.Atoi: parsing "sd": invalid syntax)`,
true,
&stateMemory{},
@ -190,11 +209,14 @@ func TestFileStateTryRestore(t *testing.T) {
"policyName": "none",
"defaultCpuSet": "23-24",
"entries": {
"container1": "p-6",
"container2": "1-3"
"pod": {
"container1": "p-6",
"container2": "1-3"
}
}
}`,
"none",
containermap.ContainerMap{},
`[cpumanager] state file: unable to restore state from disk (strconv.Atoi: parsing "p": invalid syntax)`,
true,
&stateMemory{},
@ -203,6 +225,7 @@ func TestFileStateTryRestore(t *testing.T) {
"tryRestoreState creates empty state file",
"",
"none",
containermap.ContainerMap{},
"",
false,
&stateMemory{
@ -210,6 +233,35 @@ func TestFileStateTryRestore(t *testing.T) {
defaultCPUSet: cpuset.NewCPUSet(),
},
},
{
"Try restore with migration",
`{
"policyName": "none",
"defaultCpuSet": "23-24",
"entries": {
"containerID1": "4-6",
"containerID2": "1-3"
}
}`,
"none",
func() containermap.ContainerMap {
cm := containermap.NewContainerMap()
cm.Add("pod", "container1", "containerID1")
cm.Add("pod", "container2", "containerID2")
return cm
}(),
"",
false,
&stateMemory{
assignments: ContainerCPUAssignments{
"pod": map[string]cpuset.CPUSet{
"container1": cpuset.NewCPUSet(4, 5, 6),
"container2": cpuset.NewCPUSet(1, 2, 3),
},
},
defaultCPUSet: cpuset.NewCPUSet(23, 24),
},
},
}
for idx, tc := range testCases {
@ -239,7 +291,7 @@ func TestFileStateTryRestore(t *testing.T) {
defer os.Remove(sfilePath.Name())
logData, fileState := stderrCapture(t, func() State {
return NewFileState(sfilePath.Name(), tc.policyName)
return NewFileState(sfilePath.Name(), tc.policyName, tc.initialContainers)
})
if tc.expErr != "" {
@ -284,7 +336,7 @@ func TestFileStateTryRestorePanic(t *testing.T) {
}
}
}()
NewFileState(sfilePath, "static")
NewFileState(sfilePath, "static", nil)
})
}
@ -315,8 +367,10 @@ func TestUpdateStateFile(t *testing.T) {
"",
&stateMemory{
assignments: ContainerCPUAssignments{
"container1": cpuset.NewCPUSet(4, 5, 6),
"container2": cpuset.NewCPUSet(1, 2, 3),
"pod": map[string]cpuset.CPUSet{
"container1": cpuset.NewCPUSet(4, 5, 6),
"container2": cpuset.NewCPUSet(1, 2, 3),
},
},
defaultCPUSet: cpuset.NewCPUSet(),
},
@ -363,7 +417,7 @@ func TestUpdateStateFile(t *testing.T) {
return
}
}
newFileState := NewFileState(sfilePath.Name(), "static")
newFileState := NewFileState(sfilePath.Name(), "static", nil)
AssertStateEqual(t, newFileState, tc.expectedState)
})
}
@ -373,35 +427,43 @@ func TestHelpersStateFile(t *testing.T) {
testCases := []struct {
description string
defaultCPUset cpuset.CPUSet
containers map[string]cpuset.CPUSet
assignments map[string]map[string]cpuset.CPUSet
}{
{
description: "one container",
defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8),
containers: map[string]cpuset.CPUSet{
"c1": cpuset.NewCPUSet(0, 1),
assignments: map[string]map[string]cpuset.CPUSet{
"pod": {
"c1": cpuset.NewCPUSet(0, 1),
},
},
},
{
description: "two containers",
defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8),
containers: map[string]cpuset.CPUSet{
"c1": cpuset.NewCPUSet(0, 1),
"c2": cpuset.NewCPUSet(2, 3, 4, 5),
assignments: map[string]map[string]cpuset.CPUSet{
"pod": {
"c1": cpuset.NewCPUSet(0, 1),
"c2": cpuset.NewCPUSet(2, 3, 4, 5),
},
},
},
{
description: "container with more cpus than is possible",
defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8),
containers: map[string]cpuset.CPUSet{
"c1": cpuset.NewCPUSet(0, 10),
assignments: map[string]map[string]cpuset.CPUSet{
"pod": {
"c1": cpuset.NewCPUSet(0, 10),
},
},
},
{
description: "container without assigned cpus",
defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8),
containers: map[string]cpuset.CPUSet{
"c1": cpuset.NewCPUSet(),
assignments: map[string]map[string]cpuset.CPUSet{
"pod": {
"c1": cpuset.NewCPUSet(),
},
},
},
}
@ -414,19 +476,21 @@ func TestHelpersStateFile(t *testing.T) {
t.Errorf("cannot create temporary test file: %q", err.Error())
}
state := NewFileState(sfFile.Name(), "static")
state := NewFileState(sfFile.Name(), "static", nil)
state.SetDefaultCPUSet(tc.defaultCPUset)
for containerName, containerCPUs := range tc.containers {
state.SetCPUSet(containerName, containerCPUs)
if cpus, _ := state.GetCPUSet(containerName); !cpus.Equals(containerCPUs) {
t.Errorf("state is inconsistent. Wants = %q Have = %q", containerCPUs, cpus)
}
state.Delete(containerName)
if cpus := state.GetCPUSetOrDefault(containerName); !cpus.Equals(tc.defaultCPUset) {
t.Error("deleted container still existing in state")
}
for podUID := range tc.assignments {
for containerName, containerCPUs := range tc.assignments[podUID] {
state.SetCPUSet(podUID, containerName, containerCPUs)
if cpus, _ := state.GetCPUSet(podUID, containerName); !cpus.Equals(containerCPUs) {
t.Errorf("state is inconsistent. Wants = %q Have = %q", containerCPUs, cpus)
}
state.Delete(podUID, containerName)
if cpus := state.GetCPUSetOrDefault(podUID, containerName); !cpus.Equals(tc.defaultCPUset) {
t.Error("deleted container still existing in state")
}
}
}
})
@ -437,15 +501,17 @@ func TestClearStateStateFile(t *testing.T) {
testCases := []struct {
description string
defaultCPUset cpuset.CPUSet
containers map[string]cpuset.CPUSet
assignments map[string]map[string]cpuset.CPUSet
}{
{
description: "valid file",
defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8),
containers: map[string]cpuset.CPUSet{
"c1": cpuset.NewCPUSet(0, 1),
"c2": cpuset.NewCPUSet(2, 3),
"c3": cpuset.NewCPUSet(4, 5),
assignments: map[string]map[string]cpuset.CPUSet{
"pod": {
"c1": cpuset.NewCPUSet(0, 1),
"c2": cpuset.NewCPUSet(2, 3),
"c3": cpuset.NewCPUSet(4, 5),
},
},
},
}
@ -457,19 +523,23 @@ func TestClearStateStateFile(t *testing.T) {
t.Errorf("cannot create temporary test file: %q", err.Error())
}
state := NewFileState(sfFile.Name(), "static")
state := NewFileState(sfFile.Name(), "static", nil)
state.SetDefaultCPUSet(testCase.defaultCPUset)
for containerName, containerCPUs := range testCase.containers {
state.SetCPUSet(containerName, containerCPUs)
for podUID := range testCase.assignments {
for containerName, containerCPUs := range testCase.assignments[podUID] {
state.SetCPUSet(podUID, containerName, containerCPUs)
}
}
state.ClearState()
if !cpuset.NewCPUSet().Equals(state.GetDefaultCPUSet()) {
t.Error("cleared state shouldn't has got information about available cpuset")
}
for containerName := range testCase.containers {
if !cpuset.NewCPUSet().Equals(state.GetCPUSetOrDefault(containerName)) {
t.Error("cleared state shouldn't has got information about containers")
for podUID := range testCase.assignments {
for containerName := range testCase.assignments[podUID] {
if !cpuset.NewCPUSet().Equals(state.GetCPUSetOrDefault(podUID, containerName)) {
t.Error("cleared state shouldn't has got information about containers")
}
}
}
})
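
For reference, the reworked file-state API these helpers exercise now takes an initial container map (nil when there is nothing to migrate) and keys every accessor by pod UID plus container name rather than container ID. A hedged usage sketch, assuming the in-tree import paths and a writable state path (both illustrative):

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)

func main() {
	// nil initial containers: no containerID-keyed entries to migrate.
	s := state.NewFileState("/var/lib/kubelet/cpu_manager_state", "static", nil)

	// Assignments are nested under the pod UID and container name.
	s.SetDefaultCPUSet(cpuset.NewCPUSet(0, 1, 2, 3))
	s.SetCPUSet("pod-uid-1", "container1", cpuset.NewCPUSet(2, 3))

	if cpus, ok := s.GetCPUSet("pod-uid-1", "container1"); ok {
		fmt.Printf("assigned cpuset: %s\n", cpus)
	}

	// Deleting the last container for a pod also drops the pod entry.
	s.Delete("pod-uid-1", "container1")
	fmt.Println(s.GetCPUSetOrDefault("pod-uid-1", "container1")) // falls back to the default set
}
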

View File

@ -40,11 +40,11 @@ func NewMemoryState() State {
}
}
func (s *stateMemory) GetCPUSet(containerID string) (cpuset.CPUSet, bool) {
func (s *stateMemory) GetCPUSet(podUID string, containerName string) (cpuset.CPUSet, bool) {
s.RLock()
defer s.RUnlock()
res, ok := s.assignments[containerID]
res, ok := s.assignments[podUID][containerName]
return res.Clone(), ok
}
@ -55,8 +55,8 @@ func (s *stateMemory) GetDefaultCPUSet() cpuset.CPUSet {
return s.defaultCPUSet.Clone()
}
func (s *stateMemory) GetCPUSetOrDefault(containerID string) cpuset.CPUSet {
if res, ok := s.GetCPUSet(containerID); ok {
func (s *stateMemory) GetCPUSetOrDefault(podUID string, containerName string) cpuset.CPUSet {
if res, ok := s.GetCPUSet(podUID, containerName); ok {
return res
}
return s.GetDefaultCPUSet()
@ -68,12 +68,16 @@ func (s *stateMemory) GetCPUAssignments() ContainerCPUAssignments {
return s.assignments.Clone()
}
func (s *stateMemory) SetCPUSet(containerID string, cset cpuset.CPUSet) {
func (s *stateMemory) SetCPUSet(podUID string, containerName string, cset cpuset.CPUSet) {
s.Lock()
defer s.Unlock()
s.assignments[containerID] = cset
klog.Infof("[cpumanager] updated desired cpuset (container id: %s, cpuset: \"%s\")", containerID, cset)
if _, ok := s.assignments[podUID]; !ok {
s.assignments[podUID] = make(map[string]cpuset.CPUSet)
}
s.assignments[podUID][containerName] = cset
klog.Infof("[cpumanager] updated desired cpuset (pod: %s, container: %s, cpuset: \"%s\")", podUID, containerName, cset)
}
func (s *stateMemory) SetDefaultCPUSet(cset cpuset.CPUSet) {
@ -92,12 +96,15 @@ func (s *stateMemory) SetCPUAssignments(a ContainerCPUAssignments) {
klog.Infof("[cpumanager] updated cpuset assignments: \"%v\"", a)
}
func (s *stateMemory) Delete(containerID string) {
func (s *stateMemory) Delete(podUID string, containerName string) {
s.Lock()
defer s.Unlock()
delete(s.assignments, containerID)
klog.V(2).Infof("[cpumanager] deleted cpuset assignment (container id: %s)", containerID)
delete(s.assignments[podUID], containerName)
if len(s.assignments[podUID]) == 0 {
delete(s.assignments, podUID)
}
klog.V(2).Infof("[cpumanager] deleted cpuset assignment (pod: %s, container: %s)", podUID, containerName)
}
func (s *stateMemory) ClearState() {
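
The updated Delete above also prunes a pod's inner map once its last container is removed, so stale pod UIDs never linger in the assignments. A self-contained sketch of that bookkeeping, with plain maps standing in for ContainerCPUAssignments:

package main

import "fmt"

// assignments mirrors the new two-level shape: pod UID -> container name -> cpus.
type assignments map[string]map[string]string

func (a assignments) set(podUID, containerName, cpus string) {
	if _, ok := a[podUID]; !ok {
		a[podUID] = make(map[string]string)
	}
	a[podUID][containerName] = cpus
}

func (a assignments) remove(podUID, containerName string) {
	delete(a[podUID], containerName)
	// Drop the pod entry entirely once its last container is gone,
	// so iteration over assignments only ever sees live pods.
	if len(a[podUID]) == 0 {
		delete(a, podUID)
	}
}

func main() {
	a := assignments{}
	a.set("pod", "container1", "4-6")
	a.set("pod", "container2", "1-3")
	a.remove("pod", "container1")
	a.remove("pod", "container2")
	fmt.Println(len(a)) // 0: no empty inner map left behind
}
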

View File

@ -31,13 +31,13 @@ import (
)
func TestGetTopologyHints(t *testing.T) {
testPod1 := makePod("2", "2")
testPod1 := makePod("fakePod", "fakeContainer", "2", "2")
testContainer1 := &testPod1.Spec.Containers[0]
testPod2 := makePod("5", "5")
testPod2 := makePod("fakePod", "fakeContainer", "5", "5")
testContainer2 := &testPod2.Spec.Containers[0]
testPod3 := makePod("7", "7")
testPod3 := makePod("fakePod", "fakeContainer", "7", "7")
testContainer3 := &testPod3.Spec.Containers[0]
testPod4 := makePod("11", "11")
testPod4 := makePod("fakePod", "fakeContainer", "11", "11")
testContainer4 := &testPod4.Spec.Containers[0]
firstSocketMask, _ := bitmask.NewBitMask(0)
@ -156,7 +156,9 @@ func TestGetTopologyHints(t *testing.T) {
pod: *testPod1,
container: *testContainer1,
assignments: state.ContainerCPUAssignments{
"": cpuset.NewCPUSet(0, 6),
string(testPod1.UID): map[string]cpuset.CPUSet{
testContainer1.Name: cpuset.NewCPUSet(0, 6),
},
},
defaultCPUSet: cpuset.NewCPUSet(),
expectedHints: []topologymanager.TopologyHint{
@ -175,7 +177,9 @@ func TestGetTopologyHints(t *testing.T) {
pod: *testPod1,
container: *testContainer1,
assignments: state.ContainerCPUAssignments{
"": cpuset.NewCPUSet(3, 9),
string(testPod1.UID): map[string]cpuset.CPUSet{
testContainer1.Name: cpuset.NewCPUSet(3, 9),
},
},
defaultCPUSet: cpuset.NewCPUSet(),
expectedHints: []topologymanager.TopologyHint{
@ -194,7 +198,9 @@ func TestGetTopologyHints(t *testing.T) {
pod: *testPod4,
container: *testContainer4,
assignments: state.ContainerCPUAssignments{
"": cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
string(testPod4.UID): map[string]cpuset.CPUSet{
testContainer4.Name: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
},
},
defaultCPUSet: cpuset.NewCPUSet(),
expectedHints: []topologymanager.TopologyHint{
@ -209,7 +215,9 @@ func TestGetTopologyHints(t *testing.T) {
pod: *testPod1,
container: *testContainer1,
assignments: state.ContainerCPUAssignments{
"": cpuset.NewCPUSet(0, 6, 3, 9),
string(testPod1.UID): map[string]cpuset.CPUSet{
testContainer1.Name: cpuset.NewCPUSet(0, 6, 3, 9),
},
},
defaultCPUSet: cpuset.NewCPUSet(),
expectedHints: []topologymanager.TopologyHint{},
@ -219,7 +227,9 @@ func TestGetTopologyHints(t *testing.T) {
pod: *testPod4,
container: *testContainer4,
assignments: state.ContainerCPUAssignments{
"": cpuset.NewCPUSet(0, 6, 3, 9),
string(testPod4.UID): map[string]cpuset.CPUSet{
testContainer4.Name: cpuset.NewCPUSet(0, 6, 3, 9),
},
},
defaultCPUSet: cpuset.NewCPUSet(),
expectedHints: []topologymanager.TopologyHint{},

View File

@ -124,7 +124,7 @@ func waitForContainerRemoval(containerName, podName, podNS string) {
func waitForStateFileCleanedUp() {
gomega.Eventually(func() bool {
restoredState, err := cpumanagerstate.NewCheckpointState("/var/lib/kubelet", "cpu_manager_state", "static")
restoredState, err := cpumanagerstate.NewCheckpointState("/var/lib/kubelet", "cpu_manager_state", "static", nil)
framework.ExpectNoError(err, "failed to create testing cpumanager state instance")
assignments := restoredState.GetCPUAssignments()
if len(assignments) == 0 {