Merge pull request #51180 from ConnorDoyle/cpu-manager-static-policy

Automatic merge from submit-queue (batch tested with PRs 51180, 51893)

CPU manager static policy

Blocker for CPU manager #49186 (5 of 6)

* Previous PR in this series: #51357
* Next PR in this series: #51041

cc @derekwaynecarr @sjenning @flyingcougar @balajismaniam 

I've tried to keep the main authorship fairly accurate, at least at a file level -- please let me know if anyone has a better idea on how to improve this.

For posterity, here are the Kubelet flags to run the static policy (assuming `/kube-reserved` is a cgroup that exists for all required controllers):

`--feature-gates=CPUManager=true --cpu-manager-policy=static --cpu-manager-reconcile-period=5s --enforce-node-allocatable=pods,kube-reserved --kube-reserved-cgroup=/kube-reserved --kube-reserved=cpu=500m`
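Under the static policy, exclusive CPUs go only to containers in Guaranteed pods (requests equal to limits) that request an integer number of CPUs; everything else keeps running on the shared pool. As a hypothetical illustration, in the same `v1.Pod` literal style the new tests use (the function name, pod name, and image are placeholders, not from this PR):

```go
package main

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// guaranteedPod sketches a Guaranteed-QoS pod (requests == limits) with an
// integer CPU quantity; the static policy pins such a container to exclusive
// CPUs. A fractional request like "500m" would stay in the shared pool.
func guaranteedPod() *v1.Pod {
	return &v1.Pod{
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Name:  "pinned-app",
				Image: "k8s.gcr.io/pause:3.0",
				Resources: v1.ResourceRequirements{
					Requests: v1.ResourceList{
						v1.ResourceCPU:    resource.MustParse("2"),
						v1.ResourceMemory: resource.MustParse("1Gi"),
					},
					Limits: v1.ResourceList{
						v1.ResourceCPU:    resource.MustParse("2"),
						v1.ResourceMemory: resource.MustParse("1Gi"),
					},
				},
			}},
		},
	}
}
```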

**Release note**:

```release-note
NONE
```
Committed by Kubernetes Submit Queue (via GitHub) on 2017-09-04 19:42:15 -07:00.
39 changed files with 2701 additions and 31 deletions

View File

@@ -327,6 +327,8 @@ func AddKubeletConfigFlags(fs *pflag.FlagSet, c *kubeletconfig.KubeletConfigurat
fs.BoolVar(&c.CgroupsPerQOS, "cgroups-per-qos", c.CgroupsPerQOS, "Enable creation of QoS cgroup hierarchy, if true top level QoS and pod cgroups are created.")
fs.StringVar(&c.CgroupDriver, "cgroup-driver", c.CgroupDriver, "Driver that the kubelet uses to manipulate cgroups on the host. Possible values: 'cgroupfs', 'systemd'")
fs.StringVar(&c.CgroupRoot, "cgroup-root", c.CgroupRoot, "Optional root cgroup to use for pods. This is handled by the container runtime on a best effort basis. Default: '', which means use the container runtime default.")
fs.StringVar(&c.CPUManagerPolicy, "cpu-manager-policy", c.CPUManagerPolicy, "<Warning: Alpha feature> CPU Manager policy to use. Possible values: 'none', 'static'. Default: 'none'")
fs.DurationVar(&c.CPUManagerReconcilePeriod.Duration, "cpu-manager-reconcile-period", c.CPUManagerReconcilePeriod.Duration, "<Warning: Alpha feature> CPU Manager reconciliation period. Examples: '10s', or '1m'. If not supplied, defaults to `NodeStatusUpdateFrequency`")
fs.StringVar(&c.ContainerRuntime, "container-runtime", c.ContainerRuntime, "The container runtime to use. Possible values: 'docker', 'rkt'.")
fs.DurationVar(&c.RuntimeRequestTimeout.Duration, "runtime-request-timeout", c.RuntimeRequestTimeout.Duration, "Timeout of all runtime requests except long running request - pull, logs, exec and attach. When timeout exceeded, kubelet will cancel the request, throw out an error and retry later.")
fs.StringVar(&c.LockFilePath, "lock-file", c.LockFilePath, "<Warning: Alpha feature> The path to file for kubelet to use as a lock file.")

View File

@@ -451,6 +451,8 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
HardEvictionThresholds: hardEvictionThresholds,
},
ExperimentalQOSReserved: *experimentalQOSReserved,
ExperimentalCPUManagerPolicy: s.CPUManagerPolicy,
ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
},
s.FailSwapOn,
devicePluginEnabled,

View File

@@ -151,6 +151,12 @@ const (
//
// Enable mount propagation of volumes.
MountPropagation utilfeature.Feature = "MountPropagation"
// owner: @ConnorDoyle
// alpha: v1.8
//
// Alternative container-level CPU affinity policies.
CPUManager utilfeature.Feature = "CPUManager"
)
func init() {
@@ -180,6 +186,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
TaintNodesByCondition: {Default: false, PreRelease: utilfeature.Alpha},
MountPropagation: {Default: false, PreRelease: utilfeature.Alpha},
ExpandPersistentVolumes: {Default: false, PreRelease: utilfeature.Alpha},
CPUManager: {Default: false, PreRelease: utilfeature.Alpha},
// inherited features from generic apiserver, relisted here to get a conflict if it is changed
// unintentionally on either side:

View File

@@ -214,6 +214,10 @@ type KubeletConfiguration struct {
RemoteRuntimeEndpoint string
// remoteImageEndpoint is the endpoint of remote image service
RemoteImageEndpoint string
// CPUManagerPolicy is the name of the policy to use.
CPUManagerPolicy string
// CPU Manager reconciliation period.
CPUManagerReconcilePeriod metav1.Duration
// runtimeRequestTimeout is the timeout for all runtime requests except long running
// requests - pull, logs, exec and attach.
// +optional

View File

@@ -171,6 +171,12 @@ func SetDefaults_KubeletConfiguration(obj *KubeletConfiguration) {
if obj.NodeStatusUpdateFrequency == zeroDuration {
obj.NodeStatusUpdateFrequency = metav1.Duration{Duration: 10 * time.Second}
}
if obj.CPUManagerPolicy == "" {
obj.CPUManagerPolicy = "none"
}
if obj.CPUManagerReconcilePeriod == zeroDuration {
obj.CPUManagerReconcilePeriod = obj.NodeStatusUpdateFrequency
}
if obj.OOMScoreAdj == nil {
temp := int32(qos.KubeletOOMScoreAdj)
obj.OOMScoreAdj = &temp

View File

@@ -206,6 +206,10 @@ type KubeletConfiguration struct {
RemoteRuntimeEndpoint string `json:"remoteRuntimeEndpoint"`
// remoteImageEndpoint is the endpoint of remote image service
RemoteImageEndpoint string `json:"remoteImageEndpoint"`
// CPUManagerPolicy is the name of the policy to use.
CPUManagerPolicy string `json:"cpuManagerPolicy"`
// CPU Manager reconciliation period.
CPUManagerReconcilePeriod metav1.Duration `json:"cpuManagerReconcilePeriod"`
// runtimeRequestTimeout is the timeout for all runtime requests except long running
// requests - pull, logs, exec and attach.
RuntimeRequestTimeout metav1.Duration `json:"runtimeRequestTimeout"`

View File

@@ -227,6 +227,8 @@ func autoConvert_v1alpha1_KubeletConfiguration_To_kubeletconfig_KubeletConfigura
out.ContainerRuntime = in.ContainerRuntime
out.RemoteRuntimeEndpoint = in.RemoteRuntimeEndpoint
out.RemoteImageEndpoint = in.RemoteImageEndpoint
out.CPUManagerPolicy = in.CPUManagerPolicy
out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod
out.RuntimeRequestTimeout = in.RuntimeRequestTimeout
out.ExperimentalMounterPath = in.ExperimentalMounterPath
if err := v1.Convert_Pointer_string_To_string(&in.LockFilePath, &out.LockFilePath, s); err != nil {
@@ -390,6 +392,8 @@ func autoConvert_kubeletconfig_KubeletConfiguration_To_v1alpha1_KubeletConfigura
out.ContainerRuntime = in.ContainerRuntime
out.RemoteRuntimeEndpoint = in.RemoteRuntimeEndpoint
out.RemoteImageEndpoint = in.RemoteImageEndpoint
out.CPUManagerPolicy = in.CPUManagerPolicy
out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod
out.RuntimeRequestTimeout = in.RuntimeRequestTimeout
out.ExperimentalMounterPath = in.ExperimentalMounterPath
if err := v1.Convert_string_To_Pointer_string(&in.LockFilePath, &out.LockFilePath, s); err != nil {

View File

@@ -299,6 +299,7 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) {
**out = **in
}
}
out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod
out.RuntimeRequestTimeout = in.RuntimeRequestTimeout
if in.LockFilePath != nil {
in, out := &in.LockFilePath, &out.LockFilePath

View File

@@ -164,6 +164,7 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) {
out.NodeStatusUpdateFrequency = in.NodeStatusUpdateFrequency
out.ImageMinimumGCAge = in.ImageMinimumGCAge
out.VolumeStatsAggPeriod = in.VolumeStatsAggPeriod
out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod
out.RuntimeRequestTimeout = in.RuntimeRequestTimeout
if in.RegisterWithTaints != nil {
in, out := &in.RegisterWithTaints, &out.RegisterWithTaints

View File

@@ -9,7 +9,9 @@ go_library(
"container_manager_unsupported.go", "container_manager_unsupported.go",
"device_plugin_handler.go", "device_plugin_handler.go",
"device_plugin_handler_stub.go", "device_plugin_handler_stub.go",
"fake_internal_container_lifecycle.go",
"helpers_unsupported.go", "helpers_unsupported.go",
"internal_container_lifecycle.go",
"pod_container_manager_stub.go", "pod_container_manager_stub.go",
"pod_container_manager_unsupported.go", "pod_container_manager_unsupported.go",
"types.go", "types.go",
@@ -29,18 +31,23 @@ go_library(
}),
visibility = ["//visibility:public"],
deps = [
"//pkg/features:go_default_library",
"//pkg/kubelet/apis/cri:go_default_library",
"//pkg/kubelet/apis/deviceplugin/v1alpha1:go_default_library",
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/cadvisor:go_default_library",
"//pkg/kubelet/cm/cpumanager:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/deviceplugin:go_default_library",
"//pkg/kubelet/eviction/api:go_default_library",
"//pkg/kubelet/status:go_default_library",
"//pkg/util/mount:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
] + select({
"@io_bazel_rules_go//go/platform:linux_amd64": [

View File

@@ -17,12 +17,16 @@ limitations under the License.
package cm
import (
"time"
"k8s.io/apimachinery/pkg/util/sets"
// TODO: Migrate kubelet to either use its own internal objects or client library.
"k8s.io/api/core/v1"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
"k8s.io/kubernetes/pkg/kubelet/status"
"fmt"
"strconv"
@@ -36,7 +40,7 @@ type ContainerManager interface {
// Runs the container manager's housekeeping.
// - Ensures that the Docker daemon is in a container.
// - Creates the system container where all non-containerized processes run.
Start(*v1.Node, ActivePodsFunc, status.PodStatusProvider, internalapi.RuntimeService) error
// Returns resources allocated to system cgroups in the machine.
// These cgroups include the system and Kubernetes services.
@@ -71,6 +75,8 @@ type ContainerManager interface {
// Returns RunContainerOptions with devices, mounts, and env fields populated for
// extended resources required by container.
GetResources(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) (*kubecontainer.RunContainerOptions, error)
InternalContainerLifecycle() InternalContainerLifecycle
}
type NodeConfig struct {
@@ -84,6 +90,8 @@ type NodeConfig struct {
ProtectKernelDefaults bool
NodeAllocatableConfig
ExperimentalQOSReserved map[v1.ResourceName]int64
ExperimentalCPUManagerPolicy string
ExperimentalCPUManagerReconcilePeriod time.Duration
}
type NodeAllocatableConfig struct {

View File

@@ -33,16 +33,22 @@ import (
"github.com/opencontainers/runc/libcontainer/cgroups" "github.com/opencontainers/runc/libcontainer/cgroups"
"github.com/opencontainers/runc/libcontainer/cgroups/fs" "github.com/opencontainers/runc/libcontainer/cgroups/fs"
"github.com/opencontainers/runc/libcontainer/configs" "github.com/opencontainers/runc/libcontainer/configs"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
utilerrors "k8s.io/apimachinery/pkg/util/errors" utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
kubefeatures "k8s.io/kubernetes/pkg/features"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
"k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util" cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/qos" "k8s.io/kubernetes/pkg/kubelet/qos"
"k8s.io/kubernetes/pkg/kubelet/status"
utilfile "k8s.io/kubernetes/pkg/util/file" utilfile "k8s.io/kubernetes/pkg/util/file"
"k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/oom" "k8s.io/kubernetes/pkg/util/oom"
@@ -120,6 +126,8 @@ type containerManagerImpl struct {
qosContainerManager QOSContainerManager
// Interface for exporting and allocating devices reported by device plugins.
devicePluginHandler DevicePluginHandler
// Interface for CPU affinity management.
cpuManager cpumanager.Manager
}
type features struct {
@@ -219,11 +227,11 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
// It is safe to invoke `MachineInfo` on cAdvisor before logically initializing cAdvisor here because
// machine info is computed and cached once as part of cAdvisor object creation.
// But `RootFsInfo` and `ImagesFsInfo` are not available at this moment so they will be called later during manager starts
machineInfo, err := cadvisorInterface.MachineInfo()
if err != nil {
return nil, err
}
capacity = cadvisor.CapacityFromMachineInfo(machineInfo)
cgroupRoot := nodeConfig.CgroupRoot
cgroupManager := NewCgroupManager(subsystems, nodeConfig.CgroupDriver)
@@ -287,6 +295,20 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
return nil, err
}
// Initialize CPU manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
cm.cpuManager, err = cpumanager.NewManager(
nodeConfig.ExperimentalCPUManagerPolicy,
nodeConfig.ExperimentalCPUManagerReconcilePeriod,
machineInfo,
cm.GetNodeAllocatableReservation(),
)
if err != nil {
glog.Errorf("failed to initialize cpu manager: %v", err)
return nil, err
}
}
return cm, nil
}
@@ -306,6 +328,10 @@ func (cm *containerManagerImpl) NewPodContainerManager() PodContainerManager {
}
}
func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle {
return &internalContainerLifecycleImpl{cm.cpuManager}
}
// Create a cgroup container manager.
func createManager(containerName string) *fs.Manager {
allowAllDevices := true
@@ -512,7 +538,16 @@ func (cm *containerManagerImpl) Status() Status {
return cm.status
}
func (cm *containerManagerImpl) Start(node *v1.Node,
activePods ActivePodsFunc,
podStatusProvider status.PodStatusProvider,
runtimeService internalapi.RuntimeService) error {
// Initialize CPU manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), podStatusProvider, runtimeService)
}
// cache the node Info including resource capacity and
// allocatable of the node
cm.nodeInfo = node

View File

@@ -20,14 +20,17 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/status"
)
type containerManagerStub struct{}
var _ ContainerManager = &containerManagerStub{}
func (cm *containerManagerStub) Start(_ *v1.Node, _ ActivePodsFunc, _ status.PodStatusProvider, _ internalapi.RuntimeService) error {
glog.V(2).Infof("Starting stub container manager")
return nil
}
@@ -72,6 +75,10 @@ func (cm *containerManagerStub) GetResources(pod *v1.Pod, container *v1.Containe
return &kubecontainer.RunContainerOptions{}, nil
}
func (cm *containerManagerStub) InternalContainerLifecycle() InternalContainerLifecycle {
return &internalContainerLifecycleImpl{cpumanager.NewFakeManager()}
}
func NewStubContainerManager() ContainerManager {
return &containerManagerStub{}
}

View File

@@ -23,8 +23,11 @@ import (
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
"k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/mount"
) )
@@ -33,7 +36,7 @@ type unsupportedContainerManager struct {
var _ ContainerManager = &unsupportedContainerManager{} var _ ContainerManager = &unsupportedContainerManager{}
func (unsupportedContainerManager) Start(_ *v1.Node, _ ActivePodsFunc) error { func (unsupportedContainerManager) Start(_ *v1.Node, _ ActivePodsFunc, _ status.PodStatusProvider, _ internalapi.RuntimeService) error {
return fmt.Errorf("Container Manager is unsupported in this build") return fmt.Errorf("Container Manager is unsupported in this build")
} }
@@ -77,6 +80,10 @@ func (cm *unsupportedContainerManager) GetResources(pod *v1.Pod, container *v1.C
return &kubecontainer.RunContainerOptions{}, nil return &kubecontainer.RunContainerOptions{}, nil
} }
func (cm *unsupportedContainerManager) InternalContainerLifecycle() InternalContainerLifecycle {
return &internalContainerLifecycleImpl{cpumanager.NewFakeManager()}
}
func NewContainerManager(_ mount.Interface, _ cadvisor.Interface, _ NodeConfig, failSwapOn bool, devicePluginEnabled bool, recorder record.EventRecorder) (ContainerManager, error) {
return &unsupportedContainerManager{}, nil
}

View File

@@ -23,7 +23,9 @@ import (
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
"k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/mount"
) )
@@ -33,7 +35,7 @@ type containerManagerImpl struct {
var _ ContainerManager = &containerManagerImpl{} var _ ContainerManager = &containerManagerImpl{}
func (cm *containerManagerImpl) Start(_ *v1.Node, _ ActivePodsFunc) error { func (cm *containerManagerImpl) Start(_ *v1.Node, _ ActivePodsFunc, _ status.PodStatusProvider, _ internalapi.RuntimeService) error {
glog.V(2).Infof("Starting Windows stub container manager") glog.V(2).Infof("Starting Windows stub container manager")
return nil return nil
} }

View File

@@ -1,17 +1,51 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library") load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = [
"cpu_assignment.go",
"cpu_manager.go", "cpu_manager.go",
"fake_cpu_manager.go",
"policy.go", "policy.go",
"policy_none.go",
"policy_static.go",
], ],
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = [ deps = [
"//pkg/api/v1/helper/qos:go_default_library",
"//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library", "//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library",
"//pkg/kubelet/cm/cpumanager/state:go_default_library", "//pkg/kubelet/cm/cpumanager/state:go_default_library",
"//pkg/kubelet/cm/cpumanager/topology:go_default_library",
"//pkg/kubelet/cm/cpuset:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/status:go_default_library", "//pkg/kubelet/status:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/google/cadvisor/info/v1:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = [
"cpu_assignment_test.go",
"cpu_manager_test.go",
"policy_none_test.go",
"policy_static_test.go",
"policy_test.go",
],
library = ":go_default_library",
deps = [
"//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library",
"//pkg/kubelet/cm/cpumanager/state:go_default_library",
"//pkg/kubelet/cm/cpumanager/topology:go_default_library",
"//pkg/kubelet/cm/cpuset:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
], ],
) )

View File

@@ -0,0 +1,197 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cpumanager
import (
"fmt"
"sort"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)
type cpuAccumulator struct {
topo *topology.CPUTopology
details topology.CPUDetails
numCPUsNeeded int
result cpuset.CPUSet
}
func newCPUAccumulator(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) *cpuAccumulator {
return &cpuAccumulator{
topo: topo,
details: topo.CPUDetails.KeepOnly(availableCPUs),
numCPUsNeeded: numCPUs,
result: cpuset.NewCPUSet(),
}
}
func (a *cpuAccumulator) take(cpus cpuset.CPUSet) {
a.result = a.result.Union(cpus)
a.details = a.details.KeepOnly(a.details.CPUs().Difference(a.result))
a.numCPUsNeeded -= cpus.Size()
}
// Returns true if the supplied socket is fully available in `topoDetails`.
func (a *cpuAccumulator) isSocketFree(socketID int) bool {
return a.details.CPUsInSocket(socketID).Size() == a.topo.CPUsPerSocket()
}
// Returns true if the supplied core is fully available in `topoDetails`.
func (a *cpuAccumulator) isCoreFree(coreID int) bool {
return a.details.CPUsInCore(coreID).Size() == a.topo.CPUsPerCore()
}
// Returns free socket IDs as a slice sorted by:
// - socket ID, ascending.
func (a *cpuAccumulator) freeSockets() []int {
return a.details.Sockets().Filter(a.isSocketFree).ToSlice()
}
// Returns core IDs as a slice sorted by:
// - the number of whole available cores on the socket, ascending
// - socket ID, ascending
// - core ID, ascending
func (a *cpuAccumulator) freeCores() []int {
socketIDs := a.details.Sockets().ToSlice()
sort.Slice(socketIDs,
func(i, j int) bool {
iCores := a.details.CoresInSocket(socketIDs[i]).Filter(a.isCoreFree)
jCores := a.details.CoresInSocket(socketIDs[j]).Filter(a.isCoreFree)
return iCores.Size() < jCores.Size() || socketIDs[i] < socketIDs[j]
})
coreIDs := []int{}
for _, s := range socketIDs {
coreIDs = append(coreIDs, a.details.CoresInSocket(s).Filter(a.isCoreFree).ToSlice()...)
}
return coreIDs
}
// Returns CPU IDs as a slice sorted by:
// - socket affinity with result
// - number of CPUs available on the same socket
// - number of CPUs available on the same core
// - socket ID.
// - core ID.
func (a *cpuAccumulator) freeCPUs() []int {
result := []int{}
cores := a.details.Cores().ToSlice()
sort.Slice(
cores,
func(i, j int) bool {
iCore := cores[i]
jCore := cores[j]
iCPUs := a.topo.CPUDetails.CPUsInCore(iCore).ToSlice()
jCPUs := a.topo.CPUDetails.CPUsInCore(jCore).ToSlice()
iSocket := a.topo.CPUDetails[iCPUs[0]].SocketID
jSocket := a.topo.CPUDetails[jCPUs[0]].SocketID
// Compute the number of CPUs in the result that reside on the same socket
// as each core.
iSocketColoScore := a.topo.CPUDetails.CPUsInSocket(iSocket).Intersection(a.result).Size()
jSocketColoScore := a.topo.CPUDetails.CPUsInSocket(jSocket).Intersection(a.result).Size()
// Compute the number of CPUs available on the same socket
// as each core.
iSocketFreeScore := a.details.CPUsInSocket(iSocket).Size()
jSocketFreeScore := a.details.CPUsInSocket(jSocket).Size()
// Compute the number of available CPUs on each core.
iCoreFreeScore := a.details.CPUsInCore(iCore).Size()
jCoreFreeScore := a.details.CPUsInCore(jCore).Size()
return iSocketColoScore > jSocketColoScore ||
iSocketFreeScore < jSocketFreeScore ||
iCoreFreeScore < jCoreFreeScore ||
iSocket < jSocket ||
iCore < jCore
})
// For each core, append sorted CPU IDs to result.
for _, core := range cores {
result = append(result, a.details.CPUsInCore(core).ToSlice()...)
}
return result
}
func (a *cpuAccumulator) needs(n int) bool {
return a.numCPUsNeeded >= n
}
func (a *cpuAccumulator) isSatisfied() bool {
return a.numCPUsNeeded < 1
}
func (a *cpuAccumulator) isFailed() bool {
return a.numCPUsNeeded > a.details.CPUs().Size()
}
func takeByTopology(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
acc := newCPUAccumulator(topo, availableCPUs, numCPUs)
if acc.isSatisfied() {
return acc.result, nil
}
if acc.isFailed() {
return cpuset.NewCPUSet(), fmt.Errorf("not enough cpus available to satisfy request")
}
// Algorithm: topology-aware best-fit
// 1. Acquire whole sockets, if available and the container requires at
// least a socket's-worth of CPUs.
for _, s := range acc.freeSockets() {
if acc.needs(acc.topo.CPUsPerSocket()) {
glog.V(4).Infof("[cpumanager] takeByTopology: claiming socket [%d]", s)
acc.take(acc.details.CPUsInSocket(s))
if acc.isSatisfied() {
return acc.result, nil
}
}
}
// 2. Acquire whole cores, if available and the container requires at least
// a core's-worth of CPUs.
for _, c := range acc.freeCores() {
if acc.needs(acc.topo.CPUsPerCore()) {
glog.V(4).Infof("[cpumanager] takeByTopology: claiming core [%d]", c)
acc.take(acc.details.CPUsInCore(c))
if acc.isSatisfied() {
return acc.result, nil
}
}
}
// 3. Acquire single threads, preferring to fill partially-allocated cores
// on the same sockets as the whole cores we have already taken in this
// allocation.
for _, c := range acc.freeCPUs() {
glog.V(4).Infof("[cpumanager] takeByTopology: claiming CPU [%d]", c)
if acc.needs(1) {
acc.take(cpuset.NewCPUSet(c))
}
if acc.isSatisfied() {
return acc.result, nil
}
}
return cpuset.NewCPUSet(), fmt.Errorf("failed to allocate cpus")
}
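// Hypothetical usage sketch, not part of this diff: it assumes the
// CPUTopology and CPUInfo shapes produced by topology.Discover. On one
// socket with two cores and two hyperthreads per core, a request for two
// CPUs takes a whole free core rather than one thread from each core:
//
//	topo := &topology.CPUTopology{
//		NumCPUs:    4,
//		NumSockets: 1,
//		NumCores:   2,
//		CPUDetails: map[int]topology.CPUInfo{
//			0: {CoreID: 0, SocketID: 0},
//			2: {CoreID: 0, SocketID: 0},
//			1: {CoreID: 1, SocketID: 0},
//			3: {CoreID: 1, SocketID: 0},
//		},
//	}
//	result, _ := takeByTopology(topo, cpuset.NewCPUSet(0, 1, 2, 3), 2)
//	// result holds both hyperthreads of one core, e.g. CPUs {0, 2}.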

View File

@@ -0,0 +1,385 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cpumanager
import (
"reflect"
"testing"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)
func TestCPUAccumulatorFreeSockets(t *testing.T) {
testCases := []struct {
description string
topo *topology.CPUTopology
availableCPUs cpuset.CPUSet
expect []int
}{
{
"single socket HT, 1 socket free",
topoSingleSocketHT,
cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
[]int{0},
},
{
"single socket HT, 0 sockets free",
topoSingleSocketHT,
cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7),
[]int{},
},
{
"dual socket HT, 2 sockets free",
topoDualSocketHT,
cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
[]int{0, 1},
},
{
"dual socket HT, 1 socket free",
topoDualSocketHT,
cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11),
[]int{1},
},
{
"dual socket HT, 0 sockets free",
topoDualSocketHT,
cpuset.NewCPUSet(0, 2, 3, 4, 5, 6, 7, 8, 9, 11),
[]int{},
},
}
for _, tc := range testCases {
acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0)
result := acc.freeSockets()
if !reflect.DeepEqual(result, tc.expect) {
t.Errorf("[%s] expected %v to equal %v", tc.description, result, tc.expect)
}
}
}
func TestCPUAccumulatorFreeCores(t *testing.T) {
testCases := []struct {
description string
topo *topology.CPUTopology
availableCPUs cpuset.CPUSet
expect []int
}{
{
"single socket HT, 4 cores free",
topoSingleSocketHT,
cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
[]int{0, 1, 2, 3},
},
{
"single socket HT, 3 cores free",
topoSingleSocketHT,
cpuset.NewCPUSet(0, 1, 2, 4, 5, 6),
[]int{0, 1, 2},
},
{
"single socket HT, 3 cores free (1 partially consumed)",
topoSingleSocketHT,
cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6),
[]int{0, 1, 2},
},
{
"single socket HT, 0 cores free",
topoSingleSocketHT,
cpuset.NewCPUSet(),
[]int{},
},
{
"single socket HT, 0 cores free (4 partially consumed)",
topoSingleSocketHT,
cpuset.NewCPUSet(0, 1, 2, 3),
[]int{},
},
{
"dual socket HT, 6 cores free",
topoDualSocketHT,
cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
[]int{0, 2, 4, 1, 3, 5},
},
{
"dual socket HT, 5 cores free (1 consumed from socket 0)",
topoDualSocketHT,
cpuset.NewCPUSet(2, 1, 3, 4, 5, 7, 8, 9, 10, 11),
[]int{2, 4, 1, 3, 5},
},
{
"dual socket HT, 4 cores free (1 consumed from each socket)",
topoDualSocketHT,
cpuset.NewCPUSet(2, 3, 4, 5, 8, 9, 10, 11),
[]int{2, 4, 3, 5},
},
}
for _, tc := range testCases {
acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0)
result := acc.freeCores()
if !reflect.DeepEqual(result, tc.expect) {
t.Errorf("[%s] expected %v to equal %v", tc.description, result, tc.expect)
}
}
}
func TestCPUAccumulatorFreeCPUs(t *testing.T) {
testCases := []struct {
description string
topo *topology.CPUTopology
availableCPUs cpuset.CPUSet
expect []int
}{
{
"single socket HT, 8 cpus free",
topoSingleSocketHT,
cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
[]int{0, 4, 1, 5, 2, 6, 3, 7},
},
{
"single socket HT, 5 cpus free",
topoSingleSocketHT,
cpuset.NewCPUSet(3, 4, 5, 6, 7),
[]int{4, 5, 6, 3, 7},
},
{
"dual socket HT, 12 cpus free",
topoDualSocketHT,
cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
[]int{0, 6, 2, 8, 4, 10, 1, 7, 3, 9, 5, 11},
},
{
"dual socket HT, 11 cpus free",
topoDualSocketHT,
cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
[]int{6, 2, 8, 4, 10, 1, 7, 3, 9, 5, 11},
},
{
"dual socket HT, 10 cpus free",
topoDualSocketHT,
cpuset.NewCPUSet(1, 2, 3, 4, 5, 7, 8, 9, 10, 11),
[]int{2, 8, 4, 10, 1, 7, 3, 9, 5, 11},
},
}
for _, tc := range testCases {
acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0)
result := acc.freeCPUs()
if !reflect.DeepEqual(result, tc.expect) {
t.Errorf("[%s] expected %v to equal %v", tc.description, result, tc.expect)
}
}
}
func TestCPUAccumulatorTake(t *testing.T) {
testCases := []struct {
description string
topo *topology.CPUTopology
availableCPUs cpuset.CPUSet
takeCPUs []cpuset.CPUSet
numCPUs int
expectSatisfied bool
expectFailed bool
}{
{
"take 0 cpus from a single socket HT, require 1",
topoSingleSocketHT,
cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
[]cpuset.CPUSet{cpuset.NewCPUSet()},
1,
false,
false,
},
{
"take 0 cpus from a single socket HT, require 1, none available",
topoSingleSocketHT,
cpuset.NewCPUSet(),
[]cpuset.CPUSet{cpuset.NewCPUSet()},
1,
false,
true,
},
{
"take 1 cpu from a single socket HT, require 1",
topoSingleSocketHT,
cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
[]cpuset.CPUSet{cpuset.NewCPUSet(0)},
1,
true,
false,
},
{
"take 1 cpu from a single socket HT, require 2",
topoSingleSocketHT,
cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
[]cpuset.CPUSet{cpuset.NewCPUSet(0)},
2,
false,
false,
},
{
"take 2 cpu from a single socket HT, require 4, expect failed",
topoSingleSocketHT,
cpuset.NewCPUSet(0, 1, 2),
[]cpuset.CPUSet{cpuset.NewCPUSet(0), cpuset.NewCPUSet(1)},
4,
false,
true,
},
{
"take all cpus one at a time from a single socket HT, require 8",
topoSingleSocketHT,
cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
[]cpuset.CPUSet{
cpuset.NewCPUSet(0),
cpuset.NewCPUSet(1),
cpuset.NewCPUSet(2),
cpuset.NewCPUSet(3),
cpuset.NewCPUSet(4),
cpuset.NewCPUSet(5),
cpuset.NewCPUSet(6),
cpuset.NewCPUSet(7),
},
8,
true,
false,
},
}
for _, tc := range testCases {
acc := newCPUAccumulator(tc.topo, tc.availableCPUs, tc.numCPUs)
totalTaken := 0
for _, cpus := range tc.takeCPUs {
acc.take(cpus)
totalTaken += cpus.Size()
}
if tc.expectSatisfied != acc.isSatisfied() {
t.Errorf("[%s] expected acc.isSatisfied() to be %t", tc.description, tc.expectSatisfied)
}
if tc.expectFailed != acc.isFailed() {
t.Errorf("[%s] expected acc.isFailed() to be %t", tc.description, tc.expectFailed)
}
for _, cpus := range tc.takeCPUs {
availableCPUs := acc.details.CPUs()
if cpus.Intersection(availableCPUs).Size() > 0 {
t.Errorf("[%s] expected intersection of taken cpus [%s] and acc.details.CPUs() [%s] to be empty", tc.description, cpus, availableCPUs)
}
if !cpus.IsSubsetOf(acc.result) {
t.Errorf("[%s] expected [%s] to be a subset of acc.result [%s]", tc.description, cpus, acc.result)
}
}
expNumCPUsNeeded := tc.numCPUs - totalTaken
if acc.numCPUsNeeded != expNumCPUsNeeded {
t.Errorf("[%s] expected acc.numCPUsNeeded to be %d (got %d)", tc.description, expNumCPUsNeeded, acc.numCPUsNeeded)
}
}
}
func TestTakeByTopology(t *testing.T) {
testCases := []struct {
description string
topo *topology.CPUTopology
availableCPUs cpuset.CPUSet
numCPUs int
expErr string
expResult cpuset.CPUSet
}{
{
"take more cpus than are available from single socket with HT",
topoSingleSocketHT,
cpuset.NewCPUSet(0, 2, 4, 6),
5,
"not enough cpus available to satisfy request",
cpuset.NewCPUSet(),
},
{
"take zero cpus from single socket with HT",
topoSingleSocketHT,
cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
0,
"",
cpuset.NewCPUSet(),
},
{
"take one cpu from single socket with HT",
topoSingleSocketHT,
cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
1,
"",
cpuset.NewCPUSet(0),
},
{
"take one cpu from single socket with HT, some cpus are taken",
topoSingleSocketHT,
cpuset.NewCPUSet(1, 3, 5, 6, 7),
1,
"",
cpuset.NewCPUSet(6),
},
{
"take two cpus from single socket with HT",
topoSingleSocketHT,
cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
2,
"",
cpuset.NewCPUSet(0, 4),
},
{
"take all cpus from single socket with HT",
topoSingleSocketHT,
cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
8,
"",
cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
},
{
"take two cpus from single socket with HT, only one core totally free",
topoSingleSocketHT,
cpuset.NewCPUSet(0, 1, 2, 3, 6),
2,
"",
cpuset.NewCPUSet(2, 6),
},
{
"take three cpus from dual socket with HT - core from Socket 0",
topoDualSocketHT,
cpuset.NewCPUSet(1, 2, 3, 4, 5, 7, 8, 9, 10, 11),
1,
"",
cpuset.NewCPUSet(2),
},
{
"take a socket of cpus from dual socket with HT",
topoDualSocketHT,
cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
6,
"",
cpuset.NewCPUSet(0, 2, 4, 6, 8, 10),
},
}
for _, tc := range testCases {
result, err := takeByTopology(tc.topo, tc.availableCPUs, tc.numCPUs)
if tc.expErr != "" && err.Error() != tc.expErr {
t.Errorf("expected error to be [%v] but it was [%v] in test \"%s\"", tc.expErr, err, tc.description)
}
if !result.Equals(tc.expResult) {
t.Errorf("expected result [%s] to equal [%s] in test \"%s\"", result, tc.expResult, tc.description)
}
}
}

View File

@@ -17,10 +17,21 @@ limitations under the License.
package cpumanager
import (
"fmt"
"math"
"sync"
"time"
"github.com/golang/glog"
cadvisorapi "github.com/google/cadvisor/info/v1"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/kubelet/status"
) )
@@ -51,3 +62,215 @@ type Manager interface {
// State returns a read-only interface to the internal CPU manager state.
State() state.Reader
}
type manager struct {
sync.Mutex
policy Policy
// reconcilePeriod is the duration between calls to reconcileState.
reconcilePeriod time.Duration
// state allows pluggable CPU assignment policies while sharing a common
// representation of state for the system to inspect and reconcile.
state state.State
// containerRuntime is the container runtime service interface needed
// to make UpdateContainerResources() calls against the containers.
containerRuntime runtimeService
// activePods is a method for listing active pods on the node
// so all the containers can be updated in the reconciliation loop.
activePods ActivePodsFunc
// podStatusProvider provides a method for obtaining pod statuses
// and the containerID of their containers
podStatusProvider status.PodStatusProvider
machineInfo *cadvisorapi.MachineInfo
nodeAllocatableReservation v1.ResourceList
}
var _ Manager = &manager{}
// NewManager creates a new cpu manager based on the provided policy.
func NewManager(
cpuPolicyName string,
reconcilePeriod time.Duration,
machineInfo *cadvisorapi.MachineInfo,
nodeAllocatableReservation v1.ResourceList,
) (Manager, error) {
var policy Policy
switch policyName(cpuPolicyName) {
case PolicyNone:
policy = NewNonePolicy()
case PolicyStatic:
topo, err := topology.Discover(machineInfo)
if err != nil {
return nil, err
}
glog.Infof("[cpumanager] detected CPU topology: %v", topo)
reservedCPUs, ok := nodeAllocatableReservation[v1.ResourceCPU]
if !ok {
// The static policy cannot initialize without this information. Panic!
panic("[cpumanager] unable to determine reserved CPU resources for static policy")
}
if reservedCPUs.IsZero() {
// Panic!
//
// The static policy requires this to be nonzero. Zero CPU reservation
// would allow the shared pool to be completely exhausted. At that point
// either we would violate our guarantee of exclusivity or need to evict
// any pod that has at least one container that requires zero CPUs.
// See the comments in policy_static.go for more details.
panic("[cpumanager] the static policy requires systemreserved.cpu + kubereserved.cpu to be greater than zero")
}
// Take the ceiling of the reservation, since fractional CPUs cannot be
// exclusively allocated.
reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000
numReservedCPUs := int(math.Ceil(reservedCPUsFloat))
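// For example, with --kube-reserved=cpu=500m (as in the PR description
// above), reservedCPUsFloat is 0.5 and numReservedCPUs rounds up to 1.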
policy = NewStaticPolicy(topo, numReservedCPUs)
default:
glog.Errorf("[cpumanager] Unknown policy \"%s\", falling back to default policy \"%s\"", cpuPolicyName, PolicyNone)
policy = NewNonePolicy()
}
manager := &manager{
policy: policy,
reconcilePeriod: reconcilePeriod,
state: state.NewMemoryState(),
machineInfo: machineInfo,
nodeAllocatableReservation: nodeAllocatableReservation,
}
return manager, nil
}
func (m *manager) Start(activePods ActivePodsFunc, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService) {
glog.Infof("[cpumanger] starting with %s policy", m.policy.Name())
glog.Infof("[cpumanger] reconciling every %v", m.reconcilePeriod)
m.activePods = activePods
m.podStatusProvider = podStatusProvider
m.containerRuntime = containerRuntime
m.policy.Start(m.state)
if m.policy.Name() == string(PolicyNone) {
return
}
go wait.Until(func() { m.reconcileState() }, m.reconcilePeriod, wait.NeverStop)
}
func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) error {
m.Lock()
err := m.policy.AddContainer(m.state, p, c, containerID)
if err != nil {
glog.Errorf("[cpumanager] AddContainer error: %v", err)
m.Unlock()
return err
}
cpus := m.state.GetCPUSetOrDefault(containerID)
m.Unlock()
err = m.updateContainerCPUSet(containerID, cpus)
if err != nil {
glog.Errorf("[cpumanager] AddContainer error: %v", err)
return err
}
return nil
}
func (m *manager) RemoveContainer(containerID string) error {
m.Lock()
defer m.Unlock()
err := m.policy.RemoveContainer(m.state, containerID)
if err != nil {
glog.Errorf("[cpumanager] RemoveContainer error: %v", err)
return err
}
return nil
}
func (m *manager) State() state.Reader {
return m.state
}
type reconciledContainer struct {
podName string
containerName string
containerID string
}
func (m *manager) reconcileState() (success []reconciledContainer, failure []reconciledContainer) {
success = []reconciledContainer{}
failure = []reconciledContainer{}
for _, pod := range m.activePods() {
allContainers := pod.Spec.InitContainers
allContainers = append(allContainers, pod.Spec.Containers...)
for _, container := range allContainers {
status, ok := m.podStatusProvider.GetPodStatus(pod.UID)
if !ok {
glog.Warningf("[cpumanager] reconcileState: skipping pod; status not found (pod: %s, container: %s)", pod.Name, container.Name)
failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
break
}
containerID, err := findContainerIDByName(&status, container.Name)
if err != nil {
glog.Warningf("[cpumanager] reconcileState: skipping container; ID not found in status (pod: %s, container: %s, error: %v)", pod.Name, container.Name, err)
failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
continue
}
cset := m.state.GetCPUSetOrDefault(containerID)
if cset.IsEmpty() {
// NOTE: This should not happen outside of tests.
glog.Infof("[cpumanager] reconcileState: skipping container; assigned cpuset is empty (pod: %s, container: %s)", pod.Name, container.Name)
failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID})
continue
}
glog.Infof("[cpumanager] reconcileState: updating container (pod: %s, container: %s, container id: %s, cpuset: \"%v\")", pod.Name, container.Name, containerID, cset)
err = m.updateContainerCPUSet(containerID, cset)
if err != nil {
glog.Errorf("[cpumanager] reconcileState: failed to update container (pod: %s, container: %s, container id: %s, cpuset: \"%v\", error: %v)", pod.Name, container.Name, containerID, cset, err)
failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID})
continue
}
success = append(success, reconciledContainer{pod.Name, container.Name, containerID})
}
}
return success, failure
}
func findContainerIDByName(status *v1.PodStatus, name string) (string, error) {
for _, container := range status.ContainerStatuses {
if container.Name == name && container.ContainerID != "" {
cid := &kubecontainer.ContainerID{}
err := cid.ParseString(container.ContainerID)
if err != nil {
return "", err
}
return cid.ID, nil
}
}
return "", fmt.Errorf("unable to find ID for container with name %v in pod status (it may not be running)", name)
}
func (m *manager) updateContainerCPUSet(containerID string, cpus cpuset.CPUSet) error {
// TODO: Consider adding a `ResourceConfigForContainer` helper in
// helpers_linux.go similar to what exists for pods.
// It would be better to pass the full container resources here instead of
// this patch-like partial resources.
return m.containerRuntime.UpdateContainerResources(
containerID,
&runtimeapi.LinuxContainerResources{
CpusetCpus: cpus.String(),
})
}

View File

@@ -0,0 +1,452 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cpumanager
import (
"fmt"
"reflect"
"testing"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)
type mockState struct {
assignments map[string]cpuset.CPUSet
defaultCPUSet cpuset.CPUSet
}
func (s *mockState) GetCPUSet(containerID string) (cpuset.CPUSet, bool) {
res, ok := s.assignments[containerID]
return res.Clone(), ok
}
func (s *mockState) GetDefaultCPUSet() cpuset.CPUSet {
return s.defaultCPUSet.Clone()
}
func (s *mockState) GetCPUSetOrDefault(containerID string) cpuset.CPUSet {
if res, ok := s.GetCPUSet(containerID); ok {
return res
}
return s.GetDefaultCPUSet()
}
func (s *mockState) SetCPUSet(containerID string, cset cpuset.CPUSet) {
s.assignments[containerID] = cset
}
func (s *mockState) SetDefaultCPUSet(cset cpuset.CPUSet) {
s.defaultCPUSet = cset
}
func (s *mockState) Delete(containerID string) {
delete(s.assignments, containerID)
}
type mockPolicy struct {
err error
}
func (p *mockPolicy) Name() string {
return "mock"
}
func (p *mockPolicy) Start(s state.State) {
}
func (p *mockPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container, containerID string) error {
return p.err
}
func (p *mockPolicy) RemoveContainer(s state.State, containerID string) error {
return p.err
}
type mockRuntimeService struct {
err error
}
func (rt mockRuntimeService) UpdateContainerResources(id string, resources *runtimeapi.LinuxContainerResources) error {
return rt.err
}
type mockPodStatusProvider struct {
podStatus v1.PodStatus
found bool
}
func (psp mockPodStatusProvider) GetPodStatus(uid types.UID) (v1.PodStatus, bool) {
return psp.podStatus, psp.found
}
type mockPodKiller struct {
killedPods []*v1.Pod
}
func (f *mockPodKiller) killPodNow(pod *v1.Pod, status v1.PodStatus, gracePeriodOverride *int64) error {
f.killedPods = append(f.killedPods, pod)
return nil
}
type mockPodProvider struct {
pods []*v1.Pod
}
func (f *mockPodProvider) getPods() []*v1.Pod {
return f.pods
}
type mockRecorder struct{}
func (r *mockRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
}
func makePod(cpuRequest, cpuLimit string) *v1.Pod {
return &v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse(cpuRequest),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("1G"),
},
Limits: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse(cpuLimit),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("1G"),
},
},
},
},
},
}
}
// CpuAllocatable must be <= CpuCapacity
func prepareCPUNodeStatus(CPUCapacity, CPUAllocatable string) v1.NodeStatus {
nodestatus := v1.NodeStatus{
Capacity: make(v1.ResourceList, 1),
Allocatable: make(v1.ResourceList, 1),
}
cpucap, _ := resource.ParseQuantity(CPUCapacity)
cpuall, _ := resource.ParseQuantity(CPUAllocatable)
nodestatus.Capacity[v1.ResourceCPU] = cpucap
nodestatus.Allocatable[v1.ResourceCPU] = cpuall
return nodestatus
}
func TestCPUManagerAdd(t *testing.T) {
testCases := []struct {
description string
regErr error
updateErr error
expErr error
}{
{
description: "cpu manager add - no error",
regErr: nil,
updateErr: nil,
expErr: nil,
},
{
description: "cpu manager add - policy add container error",
regErr: fmt.Errorf("fake reg error"),
updateErr: nil,
expErr: fmt.Errorf("fake reg error"),
},
{
description: "cpu manager add - container update error",
regErr: nil,
updateErr: fmt.Errorf("fake update error"),
expErr: fmt.Errorf("fake update error"),
},
}
for _, testCase := range testCases {
mgr := &manager{
policy: &mockPolicy{
err: testCase.regErr,
},
state: &mockState{
assignments: map[string]cpuset.CPUSet{},
defaultCPUSet: cpuset.NewCPUSet(),
},
containerRuntime: mockRuntimeService{
err: testCase.updateErr,
},
activePods: func() []*v1.Pod { return nil },
podStatusProvider: mockPodStatusProvider{},
}
pod := makePod("1000", "1000")
container := &pod.Spec.Containers[0]
err := mgr.AddContainer(pod, container, "fakeID")
if !reflect.DeepEqual(err, testCase.expErr) {
t.Errorf("CPU Manager AddContainer() error (%v). expected error: %v but got: %v",
testCase.description, testCase.expErr, err)
}
}
}
func TestCPUManagerRemove(t *testing.T) {
mgr := &manager{
policy: &mockPolicy{
err: nil,
},
state: &mockState{
assignments: map[string]cpuset.CPUSet{},
defaultCPUSet: cpuset.NewCPUSet(),
},
containerRuntime: mockRuntimeService{},
activePods: func() []*v1.Pod { return nil },
podStatusProvider: mockPodStatusProvider{},
}
err := mgr.RemoveContainer("fakeID")
if err != nil {
t.Errorf("CPU Manager RemoveContainer() error. expected error to be nil but got: %v", err)
}
mgr = &manager{
policy: &mockPolicy{
err: fmt.Errorf("fake error"),
},
state: state.NewMemoryState(),
containerRuntime: mockRuntimeService{},
activePods: func() []*v1.Pod { return nil },
podStatusProvider: mockPodStatusProvider{},
}
err = mgr.RemoveContainer("fakeID")
if !reflect.DeepEqual(err, fmt.Errorf("fake error")) {
t.Errorf("CPU Manager RemoveContainer() error. expected error: fake error but got: %v", err)
}
}
func TestReconcileState(t *testing.T) {
testCases := []struct {
description string
activePods []*v1.Pod
pspPS v1.PodStatus
pspFound bool
stAssignments map[string]cpuset.CPUSet
stDefaultCPUSet cpuset.CPUSet
updateErr error
expectFailedContainerName string
}{
{
description: "cpu manager reconclie - no error",
activePods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "fakePodName",
UID: "fakeUID",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "fakeName",
},
},
},
},
},
pspPS: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
Name: "fakeName",
ContainerID: "docker://fakeID",
},
},
},
pspFound: true,
stAssignments: map[string]cpuset.CPUSet{
"fakeID": cpuset.NewCPUSet(1, 2),
},
stDefaultCPUSet: cpuset.NewCPUSet(3, 4, 5, 6, 7),
updateErr: nil,
expectFailedContainerName: "",
},
{
description: "cpu manager reconcile - pod status not found",
activePods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "fakePodName",
UID: "fakeUID",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "fakeName",
},
},
},
},
},
pspPS: v1.PodStatus{},
pspFound: false,
stAssignments: map[string]cpuset.CPUSet{},
stDefaultCPUSet: cpuset.NewCPUSet(),
updateErr: nil,
expectFailedContainerName: "fakeName",
},
{
description: "cpu manager reconcile - container id not found",
activePods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "fakePodName",
UID: "fakeUID",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "fakeName",
},
},
},
},
},
pspPS: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
Name: "fakeName1",
ContainerID: "docker://fakeID",
},
},
},
pspFound: true,
stAssignments: map[string]cpuset.CPUSet{},
stDefaultCPUSet: cpuset.NewCPUSet(),
updateErr: nil,
expectFailedContainerName: "fakeName",
},
{
description: "cpu manager reconcile - cpuset is empty",
activePods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "fakePodName",
UID: "fakeUID",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "fakeName",
},
},
},
},
},
pspPS: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
Name: "fakeName",
ContainerID: "docker://fakeID",
},
},
},
pspFound: true,
stAssignments: map[string]cpuset.CPUSet{
"fakeID": cpuset.NewCPUSet(),
},
stDefaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7),
updateErr: nil,
expectFailedContainerName: "fakeName",
},
{
description: "cpu manager reconcile - container update error",
activePods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "fakePodName",
UID: "fakeUID",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "fakeName",
},
},
},
},
},
pspPS: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
Name: "fakeName",
ContainerID: "docker://fakeID",
},
},
},
pspFound: true,
stAssignments: map[string]cpuset.CPUSet{
"fakeID": cpuset.NewCPUSet(1, 2),
},
stDefaultCPUSet: cpuset.NewCPUSet(3, 4, 5, 6, 7),
updateErr: fmt.Errorf("fake container update error"),
expectFailedContainerName: "fakeName",
},
}
for _, testCase := range testCases {
mgr := &manager{
policy: &mockPolicy{
err: nil,
},
state: &mockState{
assignments: testCase.stAssignments,
defaultCPUSet: testCase.stDefaultCPUSet,
},
containerRuntime: mockRuntimeService{
err: testCase.updateErr,
},
activePods: func() []*v1.Pod {
return testCase.activePods
},
podStatusProvider: mockPodStatusProvider{
podStatus: testCase.pspPS,
found: testCase.pspFound,
},
}
_, failure := mgr.reconcileState()
if testCase.expectFailedContainerName != "" {
// Search failed reconciled containers for the supplied name.
foundFailedContainer := false
for _, reconciled := range failure {
if reconciled.containerName == testCase.expectFailedContainerName {
foundFailedContainer = true
break
}
}
if !foundFailedContainer {
t.Errorf("Expected reconciliation failure for container: %s", testCase.expectFailedContainerName)
}
}
}
}

View File

@@ -0,0 +1,58 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cpumanager
import (
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/status"
)
type fakeManager struct {
state state.State
}
func (m *fakeManager) Start(activePods ActivePodsFunc, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService) {
glog.Info("[fake cpumanager] Start()")
}
func (m *fakeManager) Policy() Policy {
glog.Info("[fake cpumanager] Policy()")
return NewNonePolicy()
}
func (m *fakeManager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) error {
glog.Infof("[fake cpumanager] AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID)
return nil
}
func (m *fakeManager) RemoveContainer(containerID string) error {
glog.Infof("[fake cpumanager] RemoveContainer (container id: %s)", containerID)
return nil
}
func (m *fakeManager) State() state.Reader {
return m.state
}
// NewFakeManager creates an empty/fake CPU manager.
func NewFakeManager() Manager {
return &fakeManager{
state: state.NewMemoryState(),
}
}

View File

@@ -0,0 +1,51 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cpumanager
import (
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
)
type nonePolicy struct{}
var _ Policy = &nonePolicy{}
// PolicyNone name of none policy
const PolicyNone policyName = "none"
// NewNonePolicy returns a CPU manager policy that does nothing.
func NewNonePolicy() Policy {
return &nonePolicy{}
}
func (p *nonePolicy) Name() string {
return string(PolicyNone)
}
func (p *nonePolicy) Start(s state.State) {
glog.Info("[cpumanager] none policy: Start")
}
func (p *nonePolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container, containerID string) error {
return nil
}
func (p *nonePolicy) RemoveContainer(s state.State, containerID string) error {
return nil
}

View File

@@ -0,0 +1,64 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cpumanager
import (
"testing"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)
func TestNonePolicyName(t *testing.T) {
policy := &nonePolicy{}
policyName := policy.Name()
if policyName != "none" {
t.Errorf("NonePolicy Name() error. expected: none, returned: %v",
policyName)
}
}
func TestNonePolicyAdd(t *testing.T) {
policy := &nonePolicy{}
st := &mockState{
assignments: map[string]cpuset.CPUSet{},
defaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7),
}
testPod := makePod("1000m", "1000m")
container := &testPod.Spec.Containers[0]
err := policy.AddContainer(st, testPod, container, "fakeID")
if err != nil {
t.Errorf("NonePolicy AddContainer() error. expected no error but got: %v", err)
}
}
func TestNonePolicyRemove(t *testing.T) {
policy := &nonePolicy{}
st := &mockState{
assignments: map[string]cpuset.CPUSet{},
defaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7),
}
err := policy.RemoveContainer(st, "fakeID")
if err != nil {
t.Errorf("NonePolicy RemoveContainer() error. expected no error but got %v", err)
}
}

View File

@@ -0,0 +1,172 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cpumanager
import (
"fmt"
"github.com/golang/glog"
"k8s.io/api/core/v1"
v1qos "k8s.io/kubernetes/pkg/api/v1/helper/qos"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)
// PolicyStatic is the name of the static policy
const PolicyStatic policyName = "static"
// staticPolicy is a CPU manager policy that does not change CPU
// assignments for exclusively pinned guaranteed containers after the main
// container process starts.
//
// This policy allocates CPUs exclusively for a container if all the following
// conditions are met:
//
// - The pod QoS class is Guaranteed.
// - The CPU request is a positive integer.
//
// The static policy maintains the following sets of logical CPUs:
//
// - SHARED: Burstable, BestEffort, and non-integral Guaranteed containers
// run here. Initially this contains all CPU IDs on the system. As
// exclusive allocations are created and destroyed, this CPU set shrinks
// and grows, accordingly. This is stored in the state as the default
// CPU set.
//
// - RESERVED: A subset of the shared pool which is not exclusively
// allocatable. The membership of this pool is static for the lifetime of
// the Kubelet. The size of the reserved pool is
// ceil(systemreserved.cpu + kubereserved.cpu).
// Reserved CPUs are taken topologically starting with lowest-indexed
// physical core, as reported by cAdvisor.
//
// - ASSIGNABLE: Equal to SHARED - RESERVED. Exclusive CPUs are allocated
// from this pool.
//
// - EXCLUSIVE ALLOCATIONS: CPU sets assigned exclusively to one container.
// These are stored as explicit assignments in the state.
//
// When an exclusive allocation is made, the static policy also updates the
// default cpuset in the state abstraction. The CPU manager's periodic
// reconcile loop takes care of rewriting the cpuset in cgroupfs for any
// containers that may be running in the shared pool. For this reason,
// applications running within exclusively-allocated containers must tolerate
// potentially sharing their allocated CPUs for up to the CPU manager
// reconcile period.
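//
// As an illustrative (hypothetical) example, not taken verbatim from this
// PR: on an 8-CPU system with one reserved CPU, SHARED initially contains
// CPUs 0-7, RESERVED contains CPU 0, and ASSIGNABLE contains CPUs 1-7. A
// Guaranteed container requesting exactly 2 CPUs receives an exclusive
// assignment of two CPUs taken from the assignable pool, and SHARED
// shrinks to the remainder.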
type staticPolicy struct {
// cpu socket topology
topology *topology.CPUTopology
// set of CPUs that is not available for exclusive assignment
reserved cpuset.CPUSet
}
// Ensure staticPolicy implements Policy interface
var _ Policy = &staticPolicy{}
// NewStaticPolicy returns a CPU manager policy that does not change CPU
// assignments for exclusively pinned guaranteed containers after the main
// container process starts.
func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int) Policy {
allCPUs := topology.CPUDetails.CPUs()
// takeByTopology allocates CPUs associated with low-numbered cores from
// allCPUs.
//
// For example: Given a system with 8 CPUs available and HT enabled,
// if numReservedCPUs=2, then reserved={0,4}
reserved, _ := takeByTopology(topology, allCPUs, numReservedCPUs)
if reserved.Size() != numReservedCPUs {
panic(fmt.Sprintf("[cpumanager] unable to reserve the required amount of CPUs (size of %s did not equal %d)", reserved, numReservedCPUs))
}
glog.Infof("[cpumanager] reserved %d CPUs (\"%s\") not available for exclusive assignment", reserved.Size(), reserved)
return &staticPolicy{
topology: topology,
reserved: reserved,
}
}
func (p *staticPolicy) Name() string {
return string(PolicyStatic)
}
func (p *staticPolicy) Start(s state.State) {
// Configure the shared pool to include all detected CPU IDs.
allCPUs := p.topology.CPUDetails.CPUs()
s.SetDefaultCPUSet(allCPUs)
}
// assignableCPUs returns the set of unassigned CPUs minus the reserved set.
func (p *staticPolicy) assignableCPUs(s state.State) cpuset.CPUSet {
return s.GetDefaultCPUSet().Difference(p.reserved)
}
func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container, containerID string) error {
glog.Infof("[cpumanager] static policy: AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID)
if numCPUs := guaranteedCPUs(pod, container); numCPUs != 0 {
// container belongs in an exclusively allocated pool
cpuset, err := p.allocateCPUs(s, numCPUs)
if err != nil {
glog.Errorf("[cpumanager] unable to allocate %d CPUs (container id: %s, error: %v)", numCPUs, containerID, err)
return err
}
s.SetCPUSet(containerID, cpuset)
}
// container belongs in the shared pool (nothing to do; use default cpuset)
return nil
}
func (p *staticPolicy) RemoveContainer(s state.State, containerID string) error {
glog.Infof("[cpumanager] static policy: RemoveContainer (container id: %s)", containerID)
if toRelease, ok := s.GetCPUSet(containerID); ok {
s.Delete(containerID)
// Mutate the shared pool, adding released cpus.
s.SetDefaultCPUSet(s.GetDefaultCPUSet().Union(toRelease))
}
return nil
}
func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int) (cpuset.CPUSet, error) {
glog.Infof("[cpumanager] allocateCpus: (numCPUs: %d)", numCPUs)
result, err := takeByTopology(p.topology, p.assignableCPUs(s), numCPUs)
if err != nil {
return cpuset.NewCPUSet(), err
}
// Remove allocated CPUs from the shared CPUSet.
s.SetDefaultCPUSet(s.GetDefaultCPUSet().Difference(result))
glog.Infof("[cpumanager] allocateCPUs: returning \"%v\"", result)
return result, nil
}
func guaranteedCPUs(pod *v1.Pod, container *v1.Container) int {
if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed {
return 0
}
cpuQuantity := container.Resources.Requests[v1.ResourceCPU]
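// A request like "1500m" yields Value()==2 (Value rounds up) but
// MilliValue()==1500, so any non-integral CPU request fails the check
// below and the container stays in the shared pool.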
if cpuQuantity.Value()*1000 != cpuQuantity.MilliValue() {
return 0
}
// This downcast is safe for all systems with fewer than 2.1 billion CPUs.
// Per the language spec, `int` is guaranteed to be at least 32 bits wide.
// https://golang.org/ref/spec#Numeric_types
return int(cpuQuantity.Value())
}

View File

@@ -0,0 +1,341 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cpumanager
import (
"fmt"
"reflect"
"testing"
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)
type staticPolicyTest struct {
description string
topo *topology.CPUTopology
numReservedCPUs int
containerID string
stAssignments map[string]cpuset.CPUSet
stDefaultCPUSet cpuset.CPUSet
pod *v1.Pod
expErr error
expCPUAlloc bool
expCSet cpuset.CPUSet
}
func TestStaticPolicyName(t *testing.T) {
policy := NewStaticPolicy(topoSingleSocketHT, 1)
policyName := policy.Name()
if policyName != "static" {
t.Errorf("StaticPolicy Name() error. expected: static, returned: %v",
policyName)
}
}
func TestStaticPolicyStart(t *testing.T) {
policy := NewStaticPolicy(topoSingleSocketHT, 1).(*staticPolicy)
st := &mockState{
assignments: map[string]cpuset.CPUSet{},
defaultCPUSet: cpuset.NewCPUSet(),
}
policy.Start(st)
for cpuid := 1; cpuid < policy.topology.NumCPUs; cpuid++ {
if !st.defaultCPUSet.Contains(cpuid) {
t.Errorf("StaticPolicy Start() error. expected cpuid %d to be present in defaultCPUSet", cpuid)
}
}
}
func TestStaticPolicyAdd(t *testing.T) {
testCases := []staticPolicyTest{
{
description: "GuPodSingleCore, SingleSocketHT, ExpectError",
topo: topoSingleSocketHT,
numReservedCPUs: 1,
containerID: "fakeID2",
stAssignments: map[string]cpuset.CPUSet{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
pod: makePod("8000m", "8000m"),
expErr: fmt.Errorf("not enough cpus available to satisfy request"),
expCPUAlloc: false,
expCSet: cpuset.NewCPUSet(),
},
{
description: "GuPodSingleCore, SingleSocketHT, ExpectAllocOneCPU",
topo: topoSingleSocketHT,
numReservedCPUs: 1,
containerID: "fakeID2",
stAssignments: map[string]cpuset.CPUSet{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
pod: makePod("1000m", "1000m"),
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(4), // expect sibling of partial core
},
{
description: "GuPodMultipleCores, SingleSocketHT, ExpectAllocOneCore",
topo: topoSingleSocketHT,
numReservedCPUs: 1,
containerID: "fakeID3",
stAssignments: map[string]cpuset.CPUSet{
"fakeID100": cpuset.NewCPUSet(2, 3, 6, 7),
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 4, 5),
pod: makePod("2000m", "2000m"),
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(1, 5),
},
{
description: "GuPodMultipleCores, DualSocketHT, ExpectAllocOneSocket",
topo: topoDualSocketHT,
numReservedCPUs: 1,
containerID: "fakeID3",
stAssignments: map[string]cpuset.CPUSet{
"fakeID100": cpuset.NewCPUSet(2),
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 11),
pod: makePod("6000m", "6000m"),
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(1, 3, 5, 7, 9, 11),
},
{
description: "GuPodMultipleCores, DualSocketHT, ExpectAllocThreeCores",
topo: topoDualSocketHT,
numReservedCPUs: 1,
containerID: "fakeID3",
stAssignments: map[string]cpuset.CPUSet{
"fakeID100": cpuset.NewCPUSet(1, 5),
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 2, 3, 4, 6, 7, 8, 9, 10, 11),
pod: makePod("6000m", "6000m"),
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(2, 3, 4, 8, 9, 10),
},
{
description: "GuPodMultipleCores, DualSocketNoHT, ExpectAllocOneSocket",
topo: topoDualSocketNoHT,
numReservedCPUs: 1,
containerID: "fakeID1",
stAssignments: map[string]cpuset.CPUSet{
"fakeID100": cpuset.NewCPUSet(),
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 3, 4, 5, 6, 7),
pod: makePod("4000m", "4000m"),
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(4, 5, 6, 7),
},
{
description: "GuPodMultipleCores, DualSocketNoHT, ExpectAllocFourCores",
topo: topoDualSocketNoHT,
numReservedCPUs: 1,
containerID: "fakeID1",
stAssignments: map[string]cpuset.CPUSet{
"fakeID100": cpuset.NewCPUSet(4, 5),
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 3, 6, 7),
pod: makePod("4000m", "4000m"),
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(1, 3, 6, 7),
},
{
description: "GuPodMultipleCores, DualSocketHT, ExpectAllocOneSocketOneCore",
topo: topoDualSocketHT,
numReservedCPUs: 1,
containerID: "fakeID3",
stAssignments: map[string]cpuset.CPUSet{
"fakeID100": cpuset.NewCPUSet(2),
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 11),
pod: makePod("8000m", "8000m"),
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(1, 3, 4, 5, 7, 9, 10, 11),
},
{
description: "NonGuPod, SingleSocketHT, NoAlloc",
topo: topoSingleSocketHT,
numReservedCPUs: 1,
containerID: "fakeID1",
stAssignments: map[string]cpuset.CPUSet{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
pod: makePod("1000m", "2000m"),
expErr: nil,
expCPUAlloc: false,
expCSet: cpuset.NewCPUSet(),
},
{
description: "GuPodNonIntegerCore, SingleSocketHT, NoAlloc",
topo: topoSingleSocketHT,
numReservedCPUs: 1,
containerID: "fakeID4",
stAssignments: map[string]cpuset.CPUSet{},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
pod: makePod("977m", "977m"),
expErr: nil,
expCPUAlloc: false,
expCSet: cpuset.NewCPUSet(),
},
{
description: "GuPodMultipleCores, SingleSocketHT, NoAllocExpectError",
topo: topoSingleSocketHT,
numReservedCPUs: 1,
containerID: "fakeID5",
stAssignments: map[string]cpuset.CPUSet{
"fakeID100": cpuset.NewCPUSet(1, 2, 3, 4, 5, 6),
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 7),
pod: makePod("2000m", "2000m"),
expErr: fmt.Errorf("not enough cpus available to satisfy request"),
expCPUAlloc: false,
expCSet: cpuset.NewCPUSet(),
},
{
description: "GuPodMultipleCores, DualSocketHT, NoAllocExpectError",
topo: topoDualSocketHT,
numReservedCPUs: 1,
containerID: "fakeID5",
stAssignments: map[string]cpuset.CPUSet{
"fakeID100": cpuset.NewCPUSet(1, 2, 3),
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 4, 5, 6, 7, 8, 9, 10, 11),
pod: makePod("10000m", "10000m"),
expErr: fmt.Errorf("not enough cpus available to satisfy request"),
expCPUAlloc: false,
expCSet: cpuset.NewCPUSet(),
},
}
for _, testCase := range testCases {
policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs)
st := &mockState{
assignments: testCase.stAssignments,
defaultCPUSet: testCase.stDefaultCPUSet,
}
container := &testCase.pod.Spec.Containers[0]
err := policy.AddContainer(st, testCase.pod, container, testCase.containerID)
if !reflect.DeepEqual(err, testCase.expErr) {
t.Errorf("StaticPolicy AddContainer() error (%v). expected add error: %v but got: %v",
testCase.description, testCase.expErr, err)
}
if testCase.expCPUAlloc {
cset, found := st.assignments[testCase.containerID]
if !found {
t.Errorf("StaticPolicy AddContainer() error (%v). expected container id %v to be present in assignments %v",
testCase.description, testCase.containerID, st.assignments)
}
if !reflect.DeepEqual(cset, testCase.expCSet) {
t.Errorf("StaticPolicy AddContainer() error (%v). expected cpuset %v but got %v",
testCase.description, testCase.expCSet, cset)
}
if !cset.Intersection(st.defaultCPUSet).IsEmpty() {
t.Errorf("StaticPolicy AddContainer() error (%v). expected cpuset %v to be disjoint from the shared cpuset %v",
testCase.description, cset, st.defaultCPUSet)
}
}
if !testCase.expCPUAlloc {
_, found := st.assignments[testCase.containerID]
if found {
t.Errorf("StaticPolicy AddContainer() error (%v). Did not expect container id %v to be present in assignments %v",
testCase.description, testCase.containerID, st.assignments)
}
}
}
}
func TestStaticPolicyRemove(t *testing.T) {
testCases := []staticPolicyTest{
{
description: "SingleSocketHT, DeAllocOneContainer",
topo: topoSingleSocketHT,
containerID: "fakeID1",
stAssignments: map[string]cpuset.CPUSet{
"fakeID1": cpuset.NewCPUSet(1, 2, 3),
},
stDefaultCPUSet: cpuset.NewCPUSet(4, 5, 6, 7),
expCSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7),
},
{
description: "SingleSocketHT, DeAllocOneContainer, BeginEmpty",
topo: topoSingleSocketHT,
containerID: "fakeID1",
stAssignments: map[string]cpuset.CPUSet{
"fakeID1": cpuset.NewCPUSet(1, 2, 3),
"fakeID2": cpuset.NewCPUSet(4, 5, 6, 7),
},
stDefaultCPUSet: cpuset.NewCPUSet(),
expCSet: cpuset.NewCPUSet(1, 2, 3),
},
{
description: "SingleSocketHT, DeAllocTwoContainer",
topo: topoSingleSocketHT,
containerID: "fakeID1",
stAssignments: map[string]cpuset.CPUSet{
"fakeID1": cpuset.NewCPUSet(1, 3, 5),
"fakeID2": cpuset.NewCPUSet(2, 4),
},
stDefaultCPUSet: cpuset.NewCPUSet(6, 7),
expCSet: cpuset.NewCPUSet(1, 3, 5, 6, 7),
},
{
description: "SingleSocketHT, NoDeAlloc",
topo: topoSingleSocketHT,
containerID: "fakeID2",
stAssignments: map[string]cpuset.CPUSet{
"fakeID1": cpuset.NewCPUSet(1, 3, 5),
},
stDefaultCPUSet: cpuset.NewCPUSet(2, 4, 6, 7),
expCSet: cpuset.NewCPUSet(2, 4, 6, 7),
},
}
for _, testCase := range testCases {
policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs)
st := &mockState{
assignments: testCase.stAssignments,
defaultCPUSet: testCase.stDefaultCPUSet,
}
policy.RemoveContainer(st, testCase.containerID)
if !reflect.DeepEqual(st.defaultCPUSet, testCase.expCSet) {
t.Errorf("StaticPolicy RemoveContainer() error (%v). expected default cpuset %v but got %v",
testCase.description, testCase.expCSet, st.defaultCPUSet)
}
if _, found := st.assignments[testCase.containerID]; found {
t.Errorf("StaticPolicy RemoveContainer() error (%v). expected containerID %v not be in assignments %v",
testCase.description, testCase.containerID, st.assignments)
}
}
}

View File

@@ -0,0 +1,75 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cpumanager
import (
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
)
var (
topoSingleSocketHT = &topology.CPUTopology{
NumCPUs: 8,
NumSockets: 1,
NumCores: 4,
CPUDetails: map[int]topology.CPUInfo{
0: {CoreID: 0, SocketID: 0},
1: {CoreID: 1, SocketID: 0},
2: {CoreID: 2, SocketID: 0},
3: {CoreID: 3, SocketID: 0},
4: {CoreID: 0, SocketID: 0},
5: {CoreID: 1, SocketID: 0},
6: {CoreID: 2, SocketID: 0},
7: {CoreID: 3, SocketID: 0},
},
}
topoDualSocketHT = &topology.CPUTopology{
NumCPUs: 12,
NumSockets: 2,
NumCores: 6,
CPUDetails: map[int]topology.CPUInfo{
0: {CoreID: 0, SocketID: 0},
1: {CoreID: 1, SocketID: 1},
2: {CoreID: 2, SocketID: 0},
3: {CoreID: 3, SocketID: 1},
4: {CoreID: 4, SocketID: 0},
5: {CoreID: 5, SocketID: 1},
6: {CoreID: 0, SocketID: 0},
7: {CoreID: 1, SocketID: 1},
8: {CoreID: 2, SocketID: 0},
9: {CoreID: 3, SocketID: 1},
10: {CoreID: 4, SocketID: 0},
11: {CoreID: 5, SocketID: 1},
},
}
topoDualSocketNoHT = &topology.CPUTopology{
NumCPUs: 8,
NumSockets: 2,
NumCores: 8,
CPUDetails: map[int]topology.CPUInfo{
0: {CoreID: 0, SocketID: 0},
1: {CoreID: 1, SocketID: 0},
2: {CoreID: 2, SocketID: 0},
3: {CoreID: 3, SocketID: 0},
4: {CoreID: 4, SocketID: 1},
5: {CoreID: 5, SocketID: 1},
6: {CoreID: 6, SocketID: 1},
7: {CoreID: 7, SocketID: 1},
},
}
)

View File

@@ -2,9 +2,15 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"state.go",
"state_mem.go",
],
visibility = ["//visibility:public"],
deps = [
"//pkg/kubelet/cm/cpuset:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
],
)
filegroup(

View File

@@ -0,0 +1,90 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
"sync"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)
type stateMemory struct {
sync.RWMutex
assignments map[string]cpuset.CPUSet
defaultCPUSet cpuset.CPUSet
}
var _ State = &stateMemory{}
// NewMemoryState creates a new State for keeping track of CPU/pod assignments.
func NewMemoryState() State {
glog.Infof("[cpumanager] initializing new in-memory state store")
return &stateMemory{
assignments: map[string]cpuset.CPUSet{},
defaultCPUSet: cpuset.NewCPUSet(),
}
}
func (s *stateMemory) GetCPUSet(containerID string) (cpuset.CPUSet, bool) {
s.RLock()
defer s.RUnlock()
res, ok := s.assignments[containerID]
return res.Clone(), ok
}
func (s *stateMemory) GetDefaultCPUSet() cpuset.CPUSet {
s.RLock()
defer s.RUnlock()
return s.defaultCPUSet.Clone()
}
func (s *stateMemory) GetCPUSetOrDefault(containerID string) cpuset.CPUSet {
s.RLock()
defer s.RUnlock()
// Read the fields directly instead of calling GetCPUSet/GetDefaultCPUSet,
// which would re-acquire the read lock and can deadlock if a writer is
// queued between the two acquisitions.
if res, ok := s.assignments[containerID]; ok {
return res.Clone()
}
return s.defaultCPUSet.Clone()
}
func (s *stateMemory) SetCPUSet(containerID string, cset cpuset.CPUSet) {
s.Lock()
defer s.Unlock()
s.assignments[containerID] = cset
glog.Infof("[cpumanager] updated desired cpuset (container id: %s, cpuset: \"%s\")", containerID, cset)
}
func (s *stateMemory) SetDefaultCPUSet(cset cpuset.CPUSet) {
s.Lock()
defer s.Unlock()
s.defaultCPUSet = cset
glog.Infof("[cpumanager] updated default cpuset: \"%s\"", cset)
}
func (s *stateMemory) Delete(containerID string) {
s.Lock()
defer s.Unlock()
delete(s.assignments, containerID)
glog.V(2).Infof("[cpumanager] deleted cpuset assignment (container id: %s)", containerID)
}

View File

@@ -1,9 +1,16 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library") load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = ["doc.go"], srcs = [
"doc.go",
"topology.go",
],
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = [
"//pkg/kubelet/cm/cpuset:go_default_library",
"//vendor/github.com/google/cadvisor/info/v1:go_default_library",
],
) )
filegroup( filegroup(
@@ -19,3 +26,10 @@ filegroup(
tags = ["automanaged"], tags = ["automanaged"],
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
) )
go_test(
name = "go_default_test",
srcs = ["topology_test.go"],
library = ":go_default_library",
deps = ["//vendor/github.com/google/cadvisor/info/v1:go_default_library"],
)

View File

@@ -0,0 +1,169 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package topology
import (
"fmt"
cadvisorapi "github.com/google/cadvisor/info/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)
// CPUDetails is a map from CPU ID to Core ID and Socket ID.
type CPUDetails map[int]CPUInfo
// CPUTopology contains details of the node's CPU layout, where:
// CPU - a logical CPU (a "thread" in cadvisor terms)
// Core - a physical CPU (a "Core" in cadvisor terms)
// Socket - a CPU socket (a "Node" in cadvisor terms)
type CPUTopology struct {
NumCPUs int
NumCores int
NumSockets int
CPUDetails CPUDetails
}
// CPUsPerCore returns the number of logical CPUs associated with
// each physical core.
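// For example, the single-socket hyperthreaded topology used in the tests
// (8 logical CPUs on 4 physical cores) reports 2 CPUs per core.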
func (topo *CPUTopology) CPUsPerCore() int {
if topo.NumCores == 0 {
return 0
}
return topo.NumCPUs / topo.NumCores
}
// CPUsPerSocket returns the number of logical CPUs associated with
// each socket.
func (topo *CPUTopology) CPUsPerSocket() int {
if topo.NumSockets == 0 {
return 0
}
return topo.NumCPUs / topo.NumSockets
}
// CPUInfo contains the socket and core IDs associated with a CPU.
type CPUInfo struct {
SocketID int
CoreID int
}
// KeepOnly returns a new CPUDetails object with only the supplied cpus.
func (d CPUDetails) KeepOnly(cpus cpuset.CPUSet) CPUDetails {
result := CPUDetails{}
for cpu, info := range d {
if cpus.Contains(cpu) {
result[cpu] = info
}
}
return result
}
// Sockets returns all of the socket IDs associated with the CPUs in this
// CPUDetails.
func (d CPUDetails) Sockets() cpuset.CPUSet {
b := cpuset.NewBuilder()
for _, info := range d {
b.Add(info.SocketID)
}
return b.Result()
}
// CPUsInSocket returns all of the logical CPU IDs associated with the
// given socket ID in this CPUDetails.
func (d CPUDetails) CPUsInSocket(id int) cpuset.CPUSet {
b := cpuset.NewBuilder()
for cpu, info := range d {
if info.SocketID == id {
b.Add(cpu)
}
}
return b.Result()
}
// Cores returns all of the core IDs associated with the CPUs in this
// CPUDetails.
func (d CPUDetails) Cores() cpuset.CPUSet {
b := cpuset.NewBuilder()
for _, info := range d {
b.Add(info.CoreID)
}
return b.Result()
}
// CoresInSocket returns all of the core IDs associated with the given
// socket ID in this CPUDetails.
func (d CPUDetails) CoresInSocket(id int) cpuset.CPUSet {
b := cpuset.NewBuilder()
for _, info := range d {
if info.SocketID == id {
b.Add(info.CoreID)
}
}
return b.Result()
}
// CPUs returns all of the logical CPU IDs in this CPUDetails.
func (d CPUDetails) CPUs() cpuset.CPUSet {
b := cpuset.NewBuilder()
for cpuID := range d {
b.Add(cpuID)
}
return b.Result()
}
// CPUsInCore returns all of the logical CPU IDs associated with the
// given core ID in this CPUDetails.
func (d CPUDetails) CPUsInCore(id int) cpuset.CPUSet {
b := cpuset.NewBuilder()
for cpu, info := range d {
if info.CoreID == id {
b.Add(cpu)
}
}
return b.Result()
}
// Discover returns CPUTopology based on cadvisor node info
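// Each cadvisor thread maps to one logical CPU, physical cores are summed
// across all sockets, and each cadvisor topology Node is treated as a socket.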
func Discover(machineInfo *cadvisorapi.MachineInfo) (*CPUTopology, error) {
if machineInfo.NumCores == 0 {
return nil, fmt.Errorf("could not detect number of cpus")
}
cpuDetails := CPUDetails{}
numCPUs := machineInfo.NumCores
numPhysicalCores := 0
for _, socket := range machineInfo.Topology {
numPhysicalCores += len(socket.Cores)
for _, core := range socket.Cores {
for _, cpu := range core.Threads {
cpuDetails[cpu] = CPUInfo{
CoreID: core.Id,
SocketID: socket.Id,
}
}
}
}
return &CPUTopology{
NumCPUs: numCPUs,
NumSockets: len(machineInfo.Topology),
NumCores: numPhysicalCores,
CPUDetails: cpuDetails,
}, nil
}

View File

@@ -0,0 +1,123 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package topology
import (
"reflect"
"testing"
cadvisorapi "github.com/google/cadvisor/info/v1"
)
func Test_Discover(t *testing.T) {
tests := []struct {
name string
args *cadvisorapi.MachineInfo
want *CPUTopology
wantErr bool
}{
{
name: "FailNumCores",
args: &cadvisorapi.MachineInfo{
NumCores: 0,
},
want: &CPUTopology{},
wantErr: true,
},
{
name: "OneSocketHT",
args: &cadvisorapi.MachineInfo{
NumCores: 8,
Topology: []cadvisorapi.Node{
{Id: 0,
Cores: []cadvisorapi.Core{
{Id: 0, Threads: []int{0, 4}},
{Id: 1, Threads: []int{1, 5}},
{Id: 2, Threads: []int{2, 6}},
{Id: 3, Threads: []int{3, 7}},
},
},
},
},
want: &CPUTopology{
NumCPUs: 8,
NumSockets: 1,
NumCores: 4,
CPUDetails: map[int]CPUInfo{
0: {CoreID: 0, SocketID: 0},
1: {CoreID: 1, SocketID: 0},
2: {CoreID: 2, SocketID: 0},
3: {CoreID: 3, SocketID: 0},
4: {CoreID: 0, SocketID: 0},
5: {CoreID: 1, SocketID: 0},
6: {CoreID: 2, SocketID: 0},
7: {CoreID: 3, SocketID: 0},
},
},
wantErr: false,
},
{
name: "DualSocketNoHT",
args: &cadvisorapi.MachineInfo{
NumCores: 4,
Topology: []cadvisorapi.Node{
{Id: 0,
Cores: []cadvisorapi.Core{
{Id: 0, Threads: []int{0}},
{Id: 2, Threads: []int{2}},
},
},
{Id: 1,
Cores: []cadvisorapi.Core{
{Id: 1, Threads: []int{1}},
{Id: 3, Threads: []int{3}},
},
},
},
},
want: &CPUTopology{
NumCPUs: 4,
NumSockets: 2,
NumCores: 4,
CPUDetails: map[int]CPUInfo{
0: {CoreID: 0, SocketID: 0},
1: {CoreID: 1, SocketID: 1},
2: {CoreID: 2, SocketID: 0},
3: {CoreID: 3, SocketID: 1},
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := Discover(tt.args)
if err != nil {
if tt.wantErr {
t.Logf("Discover() expected error = %v", err)
} else {
t.Errorf("Discover() error = %v, wantErr %v", err, tt.wantErr)
}
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("Discover() = %v, want %v", got, tt.want)
}
})
}
}

View File

@@ -1,25 +1,16 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["cpuset.go"],
visibility = ["//visibility:public"],
deps = ["//vendor/github.com/golang/glog:go_default_library"],
)
go_test(
name = "go_default_test",
srcs = ["cpuset_test.go"],
library = ":go_default_library",
)
filegroup(
@@ -33,4 +24,5 @@ filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@@ -0,0 +1,39 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cm
import (
"k8s.io/api/core/v1"
)
func NewFakeInternalContainerLifecycle() *fakeInternalContainerLifecycle {
return &fakeInternalContainerLifecycle{}
}
type fakeInternalContainerLifecycle struct{}
func (f *fakeInternalContainerLifecycle) PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error {
return nil
}
func (f *fakeInternalContainerLifecycle) PreStopContainer(containerID string) error {
return nil
}
func (f *fakeInternalContainerLifecycle) PostStopContainer(containerID string) error {
return nil
}

View File

@@ -0,0 +1,57 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cm
import (
"k8s.io/api/core/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
)
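// InternalContainerLifecycle defines the internal hooks that the generic
// runtime manager invokes around container state transitions:
// PreStartContainer after a container is created but before it starts,
// PreStopContainer before a container is killed, and PostStopContainer once
// a container is removed (see the kuberuntime changes later in this diff).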
type InternalContainerLifecycle interface {
PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error
PreStopContainer(containerID string) error
PostStopContainer(containerID string) error
}
// Implements InternalContainerLifecycle interface.
type internalContainerLifecycleImpl struct {
cpuManager cpumanager.Manager
}
func (i *internalContainerLifecycleImpl) PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error {
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
return i.cpuManager.AddContainer(pod, container, containerID)
}
return nil
}
func (i *internalContainerLifecycleImpl) PreStopContainer(containerID string) error {
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
return i.cpuManager.RemoveContainer(containerID)
}
return nil
}
func (i *internalContainerLifecycleImpl) PostStopContainer(containerID string) error {
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
return i.cpuManager.RemoveContainer(containerID)
}
return nil
}

View File

@@ -622,6 +622,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
if err != nil {
return nil, err
}
klet.runtimeService = runtimeService
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
klet.livenessManager,
@@ -639,6 +640,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
kubeCfg.CPUCFSQuota,
runtimeService,
imageService,
kubeDeps.ContainerManager.InternalContainerLifecycle(),
)
if err != nil {
return nil, err
@@ -979,6 +981,11 @@ type Kubelet struct {
// Container runtime.
containerRuntime kubecontainer.Runtime
// Container runtime service (needed by container runtime Start()).
// TODO(CD): try to make this available without holding a reference in this
// struct. For example, by adding a getter to generic runtime.
runtimeService internalapi.RuntimeService
// reasonCache caches the failure reason of the last creation of all containers, which is
// used for generating ContainerStatus.
reasonCache *ReasonCache
@@ -1243,7 +1250,7 @@ func (kl *Kubelet) initializeModules() error {
return fmt.Errorf("Kubelet failed to get node info: %v", err)
}
if err := kl.containerManager.Start(node, kl.GetActivePods, kl.statusManager, kl.runtimeService); err != nil {
return fmt.Errorf("Failed to start ContainerManager %v", err)
}

View File

@@ -29,6 +29,7 @@ go_library(
"//pkg/credentialprovider:go_default_library", "//pkg/credentialprovider:go_default_library",
"//pkg/kubelet/apis/cri:go_default_library", "//pkg/kubelet/apis/cri:go_default_library",
"//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library", "//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library",
"//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/container:go_default_library", "//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/events:go_default_library", "//pkg/kubelet/events:go_default_library",
"//pkg/kubelet/images:go_default_library", "//pkg/kubelet/images:go_default_library",

View File

@@ -27,6 +27,7 @@ import (
"k8s.io/client-go/util/flowcontrol" "k8s.io/client-go/util/flowcontrol"
"k8s.io/kubernetes/pkg/credentialprovider" "k8s.io/kubernetes/pkg/credentialprovider"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri" internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/images" "k8s.io/kubernetes/pkg/kubelet/images"
"k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/lifecycle"
@@ -69,6 +70,7 @@ func NewFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageS
runtimeService: runtimeService, runtimeService: runtimeService,
imageService: imageService, imageService: imageService,
keyring: keyring, keyring: keyring,
internalLifecycle: cm.NewFakeInternalContainerLifecycle(),
} }
typedVersion, err := runtimeService.Version(kubeRuntimeAPIVersion) typedVersion, err := runtimeService.Version(kubeRuntimeAPIVersion)

View File

@@ -116,6 +116,11 @@ func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandb
m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
return grpc.ErrorDesc(err), ErrCreateContainer
}
err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
if err != nil {
m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedToStartContainer, "Internal PreStartContainer hook failed: %v", err)
return "Internal PreStartContainer hook failed", err
}
m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.CreatedContainer, "Created container")
if ref != nil {
@@ -575,6 +580,11 @@ func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubec
glog.V(2).Infof("Killing container %q with %d second grace period", containerID.String(), gracePeriod)
// Run internal pre-stop lifecycle hook
if err := m.internalLifecycle.PreStopContainer(containerID.ID); err != nil {
return err
}
// Run the pre-stop lifecycle hooks if applicable and if there is enough time to run it
if containerSpec.Lifecycle != nil && containerSpec.Lifecycle.PreStop != nil && gracePeriod > 0 {
gracePeriod = gracePeriod - m.executePreStopHook(pod, containerID, containerSpec, gracePeriod)
@@ -806,6 +816,11 @@ func (m *kubeGenericRuntimeManager) RunInContainer(id kubecontainer.ContainerID,
// it will not write container logs anymore in that state.
func (m *kubeGenericRuntimeManager) removeContainer(containerID string) error {
glog.V(4).Infof("Removing container %q", containerID)
// Call internal container post-stop lifecycle hook.
if err := m.internalLifecycle.PostStopContainer(containerID); err != nil {
return err
}
// Remove the container log.
// TODO: Separate log and container lifecycle management.
if err := m.removeContainerLog(containerID); err != nil {

View File

@@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/credentialprovider" "k8s.io/kubernetes/pkg/credentialprovider"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri" internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/images" "k8s.io/kubernetes/pkg/kubelet/images"
@@ -108,6 +109,9 @@ type kubeGenericRuntimeManager struct {
// The directory path for seccomp profiles. // The directory path for seccomp profiles.
seccompProfileRoot string seccompProfileRoot string
// Internal lifecycle event handlers for container resource management.
internalLifecycle cm.InternalContainerLifecycle
} }
type KubeGenericRuntime interface { type KubeGenericRuntime interface {
@@ -134,6 +138,7 @@ func NewKubeGenericRuntimeManager(
cpuCFSQuota bool, cpuCFSQuota bool,
runtimeService internalapi.RuntimeService, runtimeService internalapi.RuntimeService,
imageService internalapi.ImageManagerService, imageService internalapi.ImageManagerService,
internalLifecycle cm.InternalContainerLifecycle,
) (KubeGenericRuntime, error) { ) (KubeGenericRuntime, error) {
kubeRuntimeManager := &kubeGenericRuntimeManager{ kubeRuntimeManager := &kubeGenericRuntimeManager{
recorder: recorder, recorder: recorder,
@@ -147,6 +152,7 @@ func NewKubeGenericRuntimeManager(
runtimeService: newInstrumentedRuntimeService(runtimeService), runtimeService: newInstrumentedRuntimeService(runtimeService),
imageService: newInstrumentedImageManagerService(imageService), imageService: newInstrumentedImageManagerService(imageService),
keyring: credentialprovider.NewDockerKeyring(), keyring: credentialprovider.NewDockerKeyring(),
internalLifecycle: internalLifecycle,
} }
typedVersion, err := kubeRuntimeManager.runtimeService.Version(kubeRuntimeAPIVersion) typedVersion, err := kubeRuntimeManager.runtimeService.Version(kubeRuntimeAPIVersion)