diff --git a/pkg/kubelet/cm/cpumanager/BUILD b/pkg/kubelet/cm/cpumanager/BUILD index e44b07bc8a2..6236164c799 100644 --- a/pkg/kubelet/cm/cpumanager/BUILD +++ b/pkg/kubelet/cm/cpumanager/BUILD @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = [ + "container_map.go", "cpu_assignment.go", "cpu_manager.go", "fake_cpu_manager.go", diff --git a/pkg/kubelet/cm/cpumanager/container_map.go b/pkg/kubelet/cm/cpumanager/container_map.go new file mode 100644 index 00000000000..2da2c931b60 --- /dev/null +++ b/pkg/kubelet/cm/cpumanager/container_map.go @@ -0,0 +1,68 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cpumanager + +import ( + "fmt" + + "k8s.io/api/core/v1" +) + +// containerMap maps (podUID, containerName) -> containerID +type containerMap map[string]map[string]string + +func newContainerMap() containerMap { + return make(containerMap) +} + +func (cm containerMap) Add(p *v1.Pod, c *v1.Container, containerID string) { + podUID := string(p.UID) + if _, exists := cm[podUID]; !exists { + cm[podUID] = make(map[string]string) + } + cm[podUID][c.Name] = containerID +} + +func (cm containerMap) Remove(containerID string) { + found := false + for podUID := range cm { + for containerName := range cm[podUID] { + if containerID == cm[podUID][containerName] { + delete(cm[podUID], containerName) + found = true + break + } + } + if len(cm[podUID]) == 0 { + delete(cm, podUID) + } + if found { + break + } + } +} + +func (cm containerMap) Get(p *v1.Pod, c *v1.Container) (string, error) { + podUID := string(p.UID) + if _, exists := cm[podUID]; !exists { + return "", fmt.Errorf("pod %s not in containerMap", podUID) + } + if _, exists := cm[podUID][c.Name]; !exists { + return "", fmt.Errorf("container %s not in containerMap for pod %s", c.Name, podUID) + } + return cm[podUID][c.Name], nil +} diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index 38dce8b08e3..c374cb42e55 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -73,6 +73,10 @@ type staticPolicy struct { topology *topology.CPUTopology // set of CPUs that is not available for exclusive assignment reserved cpuset.CPUSet + // containerMap provides a mapping from + // (pod, container) -> containerID + // for all containers a pod + containerMap containerMap } // Ensure staticPolicy implements Policy interface @@ -97,8 +101,9 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int) Policy klog.Infof("[cpumanager] reserved %d CPUs (\"%s\") not available for exclusive assignment", reserved.Size(), reserved) return &staticPolicy{ - topology: topology, - reserved: reserved, + topology: topology, + reserved: reserved, + containerMap: newContainerMap(), } } @@ -172,7 +177,15 @@ func (p *staticPolicy) assignableCPUs(s state.State) cpuset.CPUSet { return s.GetDefaultCPUSet().Difference(p.reserved) } -func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container, containerID string) error { +func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container, containerID string) (rerr error) { + // So long as this function does not return an error, + // add (pod, container, containerID) to the containerMap. + defer func() { + if rerr == nil { + p.containerMap.Add(pod, container, containerID) + } + }() + if numCPUs := guaranteedCPUs(pod, container); numCPUs != 0 { klog.Infof("[cpumanager] static policy: AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID) // container belongs in an exclusively allocated pool @@ -182,6 +195,22 @@ func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Co return nil } + // Proactively remove CPUs from init containers that have already run. + // They are guaranteed to have run to completion before any other + // container is run. + for _, initContainer := range pod.Spec.InitContainers { + if container.Name != initContainer.Name { + initContainerID, err := p.containerMap.Get(pod, &initContainer) + if err != nil { + continue + } + err = p.RemoveContainer(s, initContainerID) + if err != nil { + klog.Warningf("[cpumanager] unable to remove init container (container id: %s, error: %v)", initContainerID, err) + } + } + } + cpuset, err := p.allocateCPUs(s, numCPUs) if err != nil { klog.Errorf("[cpumanager] unable to allocate %d CPUs (container id: %s, error: %v)", numCPUs, containerID, err) @@ -193,7 +222,15 @@ func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Co return nil } -func (p *staticPolicy) RemoveContainer(s state.State, containerID string) error { +func (p *staticPolicy) RemoveContainer(s state.State, containerID string) (rerr error) { + // So long as this function does not return an error, + // remove containerID from the containerMap. + defer func() { + if rerr == nil { + p.containerMap.Remove(containerID) + } + }() + klog.Infof("[cpumanager] static policy: RemoveContainer (container id: %s)", containerID) if toRelease, ok := s.GetCPUSet(containerID); ok { s.Delete(containerID)