Add support for CFS quota in kubelet

Author: derekwaynecarr
Date:   2015-09-01 09:27:01 -04:00
Commit: 5dc74e8dbf (parent b6f2f396ba)

8 changed files with 101 additions and 6 deletions


@@ -124,7 +124,7 @@ type KubeletServer struct {
 	MaxPods               int
 	DockerExecHandlerName string
 	ResolverConfig        string
+	CPUCFSQuota           bool
 	// Flags intended for testing
 	// Crash immediately, rather than eating panics.
@@ -189,6 +189,7 @@ func NewKubeletServer() *KubeletServer {
 		SystemContainer:       "",
 		ConfigureCBR0:         false,
 		DockerExecHandlerName: "native",
+		CPUCFSQuota:           false,
 	}
 }
@@ -255,6 +256,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
 	fs.StringVar(&s.DockerExecHandlerName, "docker-exec-handler", s.DockerExecHandlerName, "Handler to use when executing a command in a container. Valid values are 'native' and 'nsenter'. Defaults to 'native'.")
 	fs.StringVar(&s.PodCIDR, "pod-cidr", "", "The CIDR to use for pod IP addresses, only used in standalone mode. In cluster mode, this is obtained from the master.")
 	fs.StringVar(&s.ResolverConfig, "resolv-conf", kubelet.ResolvConfDefault, "Resolver configuration file used as the basis for the container DNS resolution configuration.")
+	fs.BoolVar(&s.CPUCFSQuota, "cpu-cfs-quota", s.CPUCFSQuota, "Enable CPU CFS quota enforcement for containers that specify CPU limits")
 	// Flags intended for testing, not recommended used in production environments.
 	fs.BoolVar(&s.ReallyCrashForTesting, "really-crash-for-testing", s.ReallyCrashForTesting, "If true, when panics occur crash. Intended for testing.")
 	fs.Float64Var(&s.ChaosChance, "chaos-chance", s.ChaosChance, "If > 0.0, introduce random client errors and latency. Intended for testing. [default=0.0]")
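
With the flag registered above, enabling enforcement is just a matter of starting the kubelet with it (a minimal sketch; per the default above, enforcement stays off unless requested):

    kubelet --cpu-cfs-quota=true
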
@@ -362,6 +364,7 @@ func (s *KubeletServer) KubeletConfig() (*KubeletConfig, error) {
 		MaxPods:           s.MaxPods,
 		DockerExecHandler: dockerExecHandler,
 		ResolverConfig:    s.ResolverConfig,
+		CPUCFSQuota:       s.CPUCFSQuota,
 	}, nil
 }
@@ -604,6 +607,7 @@ func SimpleKubelet(client *client.Client,
 		MaxPods:           32,
 		DockerExecHandler: &dockertools.NativeExecHandler{},
 		ResolverConfig:    kubelet.ResolvConfDefault,
+		CPUCFSQuota:       false,
 	}
 	return &kcfg
 }
@@ -774,6 +778,7 @@ type KubeletConfig struct {
 	MaxPods           int
 	DockerExecHandler dockertools.ExecHandler
 	ResolverConfig    string
+	CPUCFSQuota       bool
 }
 func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.PodConfig, err error) {
@@ -833,7 +838,8 @@ func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod
 		kc.PodCIDR,
 		kc.MaxPods,
 		kc.DockerExecHandler,
-		kc.ResolverConfig)
+		kc.ResolverConfig,
+		kc.CPUCFSQuota)
 	if err != nil {
 		return nil, nil, err


@@ -262,6 +262,7 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
 		MaxPods:           s.MaxPods,
 		DockerExecHandler: dockerExecHandler,
 		ResolverConfig:    s.ResolverConfig,
+		CPUCFSQuota:       s.CPUCFSQuota,
 	}
 	kcfg.NodeName = kcfg.Hostname
@@ -364,6 +365,7 @@ func (ks *KubeletExecutorServer) createAndInitKubelet(
 		kc.MaxPods,
 		kc.DockerExecHandler,
 		kc.ResolverConfig,
+		kc.CPUCFSQuota,
 	)
 	if err != nil {
 		return nil, nil, err


@@ -264,3 +264,4 @@ whitelist-override-label
 www-prefix
 retry_time
 file_content_in_loop
+cpu-cfs-quota


@@ -50,6 +50,9 @@ const (
 	minShares     = 2
 	sharesPerCPU  = 1024
 	milliCPUToCPU = 1000
+	// 100000 is equivalent to 100ms
+	quotaPeriod = 100000
 )
 // DockerInterface is an abstract interface for testability. It abstracts the interface of docker.Client.
@@ -306,6 +309,28 @@ func ConnectToDockerOrDie(dockerEndpoint string) DockerInterface {
 	return client
 }
+// milliCPUToQuota converts milliCPU to CFS quota and period values
+func milliCPUToQuota(milliCPU int64) (quota int64, period int64) {
+	// CFS quota is measured in two values:
+	//  - cfs_period_us=100ms (the amount of time to measure usage across)
+	//  - cfs_quota=20ms (the amount of cpu time allowed to be used across a period)
+	// so in the above example, you are limited to 20% of a single CPU
+	// for multi-cpu environments, you just scale equivalent amounts
+	if milliCPU == 0 {
+		// take the default behavior from docker
+		return
+	}
+	// we set the period to 100ms by default
+	period = quotaPeriod
+	// we then convert your milliCPU to a value normalized over a period
+	quota = (milliCPU * quotaPeriod) / milliCPUToCPU
+	return
+}
 func milliCPUToShares(milliCPU int64) int64 {
 	if milliCPU == 0 {
 		// Docker converts zero milliCPU to unset, which maps to kernel default
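
To make the arithmetic above concrete, here is a self-contained sketch; the conversion function is copied from this commit, while the driver loop and sample values are illustrative only:

package main

import "fmt"

// milliCPUToQuota mirrors the function added above: quota is the CPU time
// (in microseconds) a container may consume per 100ms scheduling period.
func milliCPUToQuota(milliCPU int64) (quota int64, period int64) {
	const (
		quotaPeriod   = 100000 // 100ms expressed in microseconds
		milliCPUToCPU = 1000
	)
	if milliCPU == 0 {
		// no limit specified: leave quota/period unset, deferring to docker
		return
	}
	period = quotaPeriod
	quota = (milliCPU * quotaPeriod) / milliCPUToCPU
	return
}

func main() {
	// e.g. 250m CPU -> cfs_quota_us=25000 per cfs_period_us=100000,
	// i.e. 25% of a single CPU each period
	for _, m := range []int64{0, 250, 1000, 1500} {
		q, p := milliCPUToQuota(m)
		fmt.Printf("%4dm CPU -> cfs_quota_us=%d cfs_period_us=%d\n", m, q, p)
	}
}
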


@@ -737,3 +737,43 @@ func TestMakePortsAndBindings(t *testing.T) {
 		}
 	}
 }
+func TestMilliCPUToQuota(t *testing.T) {
+	testCases := []struct {
+		input  int64
+		quota  int64
+		period int64
+	}{
+		{
+			input:  int64(0),
+			quota:  int64(0),
+			period: int64(0),
+		},
+		{
+			input:  int64(200),
+			quota:  int64(20000),
+			period: int64(100000),
+		},
+		{
+			input:  int64(500),
+			quota:  int64(50000),
+			period: int64(100000),
+		},
+		{
+			input:  int64(1000),
+			quota:  int64(100000),
+			period: int64(100000),
+		},
+		{
+			input:  int64(1500),
+			quota:  int64(150000),
+			period: int64(100000),
+		},
+	}
+	for _, testCase := range testCases {
+		quota, period := milliCPUToQuota(testCase.input)
+		if quota != testCase.quota || period != testCase.period {
+			t.Errorf("Input %v, expected quota %v period %v, but got quota %v period %v", testCase.input, testCase.quota, testCase.period, quota, period)
+		}
+	}
+}
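
The table-driven test above can be exercised on its own; assuming the dockertools package path used by the kubelet tree, something like:

    go test ./pkg/kubelet/dockertools/ -run TestMilliCPUToQuota
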


@@ -46,7 +46,7 @@ func NewFakeDockerManager(
 	fakeProcFs := procfs.NewFakeProcFs()
 	dm := NewDockerManager(client, recorder, readinessManager, containerRefManager, machineInfo, podInfraContainerImage, qps,
 		burst, containerLogsDir, osInterface, networkPlugin, generator, httpClient, &NativeExecHandler{},
-		fakeOomAdjuster, fakeProcFs)
+		fakeOomAdjuster, fakeProcFs, false)
 	dm.dockerPuller = &FakeDockerPuller{}
 	dm.prober = prober.New(nil, readinessManager, containerRefManager, recorder)
 	return dm


@@ -132,6 +132,9 @@ type DockerManager struct {
 	// Get information from /proc mount.
 	procFs procfs.ProcFsInterface
+	// If true, enforce container cpu limits with CFS quota support
+	cpuCFSQuota bool
 }
 func NewDockerManager(
@@ -150,7 +153,8 @@ func NewDockerManager(
 	httpClient kubeletTypes.HttpGetter,
 	execHandler ExecHandler,
 	oomAdjuster *oom.OomAdjuster,
-	procFs procfs.ProcFsInterface) *DockerManager {
+	procFs procfs.ProcFsInterface,
+	cpuCFSQuota bool) *DockerManager {
 	// Work out the location of the Docker runtime, defaulting to /var/lib/docker
 	// if there are any problems.
 	dockerRoot := "/var/lib/docker"
@@ -201,6 +205,7 @@ func NewDockerManager(
 		execHandler: execHandler,
 		oomAdjuster: oomAdjuster,
 		procFs:      procFs,
+		cpuCFSQuota: cpuCFSQuota,
 	}
 	dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm)
 	dm.prober = prober.New(dm, readinessManager, containerRefManager, recorder)
@@ -673,6 +678,7 @@ func (dm *DockerManager) runContainer(
 		// of CPU shares.
 		cpuShares = milliCPUToShares(cpuRequest.MilliValue())
 	}
 	_, containerName := BuildDockerName(dockerName, container)
 	dockerOpts := docker.CreateContainerOptions{
 		Name: containerName,
@@ -742,6 +748,15 @@ func (dm *DockerManager) runContainer(
 		MemorySwap: -1,
 		CPUShares:  cpuShares,
 	}
+	if dm.cpuCFSQuota {
+		// if cpuLimit.Amount is nil, then the appropriate default value is returned to allow full usage of cpu resource.
+		cpuQuota, cpuPeriod := milliCPUToQuota(cpuLimit.MilliValue())
+		hc.CPUQuota = cpuQuota
+		hc.CPUPeriod = cpuPeriod
+	}
 	if len(opts.DNS) > 0 {
 		hc.DNS = opts.DNS
 	}
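
With dm.cpuCFSQuota enabled, the computed values reach the kernel through Docker's HostConfig and surface in the container's cpu cgroup. A hedged verification sketch follows; the cgroup v1 layout and the docker/<container-id> path are assumptions for illustration, not part of this commit:

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// readCFS reads the quota and period the kernel applies to a cpu cgroup
// directory, e.g. /sys/fs/cgroup/cpu/docker/<container-id> on cgroup v1.
func readCFS(dir string) (quota, period string, err error) {
	q, err := os.ReadFile(filepath.Join(dir, "cpu.cfs_quota_us"))
	if err != nil {
		return "", "", err
	}
	p, err := os.ReadFile(filepath.Join(dir, "cpu.cfs_period_us"))
	if err != nil {
		return "", "", err
	}
	return strings.TrimSpace(string(q)), strings.TrimSpace(string(p)), nil
}

func main() {
	// replace <container-id> with a real container's cgroup directory
	quota, period, err := readCFS("/sys/fs/cgroup/cpu/docker/<container-id>")
	if err != nil {
		fmt.Fprintln(os.Stderr, "read failed:", err)
		os.Exit(1)
	}
	fmt.Printf("cfs_quota_us=%s cfs_period_us=%s\n", quota, period)
}
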


@@ -164,7 +164,8 @@ func NewMainKubelet(
 	podCIDR string,
 	pods int,
 	dockerExecHandler dockertools.ExecHandler,
-	resolverConfig string) (*Kubelet, error) {
+	resolverConfig string,
+	cpuCFSQuota bool) (*Kubelet, error) {
 	if rootDirectory == "" {
 		return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
 	}
@@ -281,6 +282,7 @@ func NewMainKubelet(
 		pods:            pods,
 		syncLoopMonitor: util.AtomicValue{},
 		resolverConfig:  resolverConfig,
+		cpuCFSQuota:     cpuCFSQuota,
 	}
 	if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}); err != nil {
@@ -317,7 +319,8 @@ func NewMainKubelet(
 		klet.httpClient,
 		dockerExecHandler,
 		oomAdjuster,
-		procFs)
+		procFs,
+		klet.cpuCFSQuota)
 	case "rkt":
 		conf := &rkt.Config{
 			Path: rktPath,
@@ -556,6 +559,9 @@ type Kubelet struct {
 	// Optionally shape the bandwidth of a pod
 	shaper bandwidth.BandwidthShaper
+	// True if container cpu limits should be enforced via cgroup CFS quota
+	cpuCFSQuota bool
 }
 // getRootDir returns the full path to the directory under which kubelet can