Merge pull request #1457 from brendandburns/pull
Add a rate limiter, use it to rate limit docker pulls.
This commit is contained in:
@@ -61,6 +61,8 @@ var (
|
|||||||
etcdServerList util.StringList
|
etcdServerList util.StringList
|
||||||
rootDirectory = flag.String("root_dir", defaultRootDir, "Directory path for managing kubelet files (volume mounts,etc).")
|
rootDirectory = flag.String("root_dir", defaultRootDir, "Directory path for managing kubelet files (volume mounts,etc).")
|
||||||
allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow containers to request privileged mode. [default=false]")
|
allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow containers to request privileged mode. [default=false]")
|
||||||
|
registryPullQPS = flag.Float64("registry_qps", 0.0, "If > 0, limit registry pull QPS to this value. If 0, unlimited. [default=0.0]")
|
||||||
|
registryBurst = flag.Int("registry_burst", 10, "Maximum size of a bursty pulls, temporarily allows pulls to burst to this number, while still not exceeding registry_qps. Only used if --registry_qps > 0")
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@@ -157,7 +159,9 @@ func main() {
|
|||||||
cadvisorClient,
|
cadvisorClient,
|
||||||
etcdClient,
|
etcdClient,
|
||||||
*rootDirectory,
|
*rootDirectory,
|
||||||
*syncFrequency)
|
*syncFrequency,
|
||||||
|
float32(*registryPullQPS),
|
||||||
|
*registryBurst)
|
||||||
|
|
||||||
health.AddHealthChecker("exec", health.NewExecHealthChecker(k))
|
health.AddHealthChecker("exec", health.NewExecHealthChecker(k))
|
||||||
health.AddHealthChecker("http", health.NewHTTPHealthChecker(&http.Client{}))
|
health.AddHealthChecker("http", health.NewHTTPHealthChecker(&http.Client{}))
|
||||||
|
@@ -28,6 +28,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
"github.com/fsouza/go-dockerclient"
|
"github.com/fsouza/go-dockerclient"
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
@@ -64,8 +65,13 @@ type dockerPuller struct {
|
|||||||
keyring *dockerKeyring
|
keyring *dockerKeyring
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type throttledDockerPuller struct {
|
||||||
|
puller dockerPuller
|
||||||
|
limiter util.RateLimiter
|
||||||
|
}
|
||||||
|
|
||||||
// NewDockerPuller creates a new instance of the default implementation of DockerPuller.
|
// NewDockerPuller creates a new instance of the default implementation of DockerPuller.
|
||||||
func NewDockerPuller(client DockerInterface) DockerPuller {
|
func NewDockerPuller(client DockerInterface, qps float32, burst int) DockerPuller {
|
||||||
dp := dockerPuller{
|
dp := dockerPuller{
|
||||||
client: client,
|
client: client,
|
||||||
keyring: newDockerKeyring(),
|
keyring: newDockerKeyring(),
|
||||||
@@ -81,8 +87,13 @@ func NewDockerPuller(client DockerInterface) DockerPuller {
|
|||||||
if dp.keyring.count() == 0 {
|
if dp.keyring.count() == 0 {
|
||||||
glog.V(1).Infof("Continuing with empty Docker keyring")
|
glog.V(1).Infof("Continuing with empty Docker keyring")
|
||||||
}
|
}
|
||||||
|
if qps == 0.0 {
|
||||||
return dp
|
return dp
|
||||||
|
}
|
||||||
|
return &throttledDockerPuller{
|
||||||
|
puller: dp,
|
||||||
|
limiter: util.NewTokenBucketRateLimiter(qps, burst),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type dockerContainerCommandRunner struct{}
|
type dockerContainerCommandRunner struct{}
|
||||||
@@ -130,6 +141,13 @@ func (p dockerPuller) Pull(image string) error {
|
|||||||
return p.client.PullImage(opts, creds)
|
return p.client.PullImage(opts, creds)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p throttledDockerPuller) Pull(image string) error {
|
||||||
|
if p.limiter.CanAccept() {
|
||||||
|
return p.puller.Pull(image)
|
||||||
|
}
|
||||||
|
return fmt.Errorf("pull QPS exceeded.")
|
||||||
|
}
|
||||||
|
|
||||||
// DockerContainers is a map of containers
|
// DockerContainers is a map of containers
|
||||||
type DockerContainers map[DockerID]*docker.APIContainers
|
type DockerContainers map[DockerID]*docker.APIContainers
|
||||||
|
|
||||||
|
@@ -67,7 +67,9 @@ func NewMainKubelet(
|
|||||||
cc CadvisorInterface,
|
cc CadvisorInterface,
|
||||||
ec tools.EtcdClient,
|
ec tools.EtcdClient,
|
||||||
rd string,
|
rd string,
|
||||||
ri time.Duration) *Kubelet {
|
ri time.Duration,
|
||||||
|
pullQPS float32,
|
||||||
|
pullBurst int) *Kubelet {
|
||||||
return &Kubelet{
|
return &Kubelet{
|
||||||
hostname: hn,
|
hostname: hn,
|
||||||
dockerClient: dc,
|
dockerClient: dc,
|
||||||
@@ -78,6 +80,8 @@ func NewMainKubelet(
|
|||||||
podWorkers: newPodWorkers(),
|
podWorkers: newPodWorkers(),
|
||||||
runner: dockertools.NewDockerContainerCommandRunner(),
|
runner: dockertools.NewDockerContainerCommandRunner(),
|
||||||
httpClient: &http.Client{},
|
httpClient: &http.Client{},
|
||||||
|
pullQPS: pullQPS,
|
||||||
|
pullBurst: pullBurst,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -119,6 +123,10 @@ type Kubelet struct {
|
|||||||
runner dockertools.ContainerCommandRunner
|
runner dockertools.ContainerCommandRunner
|
||||||
// Optional, client for http requests, defaults to empty client
|
// Optional, client for http requests, defaults to empty client
|
||||||
httpClient httpGetInterface
|
httpClient httpGetInterface
|
||||||
|
// Optional, maximum pull QPS from the docker registry, 0.0 means unlimited.
|
||||||
|
pullQPS float32
|
||||||
|
// Optional, maximum burst QPS from the docker registry, must be positive if QPS is > 0.0
|
||||||
|
pullBurst int
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run starts the kubelet reacting to config updates
|
// Run starts the kubelet reacting to config updates
|
||||||
@@ -127,7 +135,7 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
|
|||||||
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
|
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
|
||||||
}
|
}
|
||||||
if kl.dockerPuller == nil {
|
if kl.dockerPuller == nil {
|
||||||
kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient)
|
kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst)
|
||||||
}
|
}
|
||||||
if kl.healthChecker == nil {
|
if kl.healthChecker == nil {
|
||||||
kl.healthChecker = health.NewHealthChecker()
|
kl.healthChecker = health.NewHealthChecker()
|
||||||
@@ -376,7 +384,9 @@ func (kl *Kubelet) createNetworkContainer(pod *Pod) (dockertools.DockerID, error
|
|||||||
Image: networkContainerImage,
|
Image: networkContainerImage,
|
||||||
Ports: ports,
|
Ports: ports,
|
||||||
}
|
}
|
||||||
kl.dockerPuller.Pull(networkContainerImage)
|
if err := kl.dockerPuller.Pull(networkContainerImage); err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
return kl.runContainer(pod, container, nil, "")
|
return kl.runContainer(pod, container, nil, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
104
pkg/util/throttle.go
Normal file
104
pkg/util/throttle.go
Normal file
@@ -0,0 +1,104 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2014 Google Inc. All rights reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package util
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type RateLimiter interface {
|
||||||
|
// CanAccept returns true if the rate is below the limit, false otherwise
|
||||||
|
CanAccept() bool
|
||||||
|
// Stop stops the rate limiter, subsequent calls to CanAccept will return false
|
||||||
|
Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
type tickRateLimiter struct {
|
||||||
|
lock sync.Mutex
|
||||||
|
tokens chan bool
|
||||||
|
ticker <-chan time.Time
|
||||||
|
stop chan bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewTokenBucketRateLimiter creates a rate limiter which implements a token bucket approach.
|
||||||
|
// The rate limiter allows bursts of up to 'burst' to exceed the QPS, while still maintaining a
|
||||||
|
// smoothed qps rate of 'qps'.
|
||||||
|
// The bucket is initially filled with 'burst' tokens, the rate limiter spawns a go routine
|
||||||
|
// which refills the bucket with one token at a rate of 'qps'. The maximum number of tokens in
|
||||||
|
// the bucket is capped at 'burst'.
|
||||||
|
// When done with the limiter, Stop() must be called to halt the associated goroutine.
|
||||||
|
func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
|
||||||
|
ticker := time.Tick(time.Duration(float32(time.Second) / qps))
|
||||||
|
rate := newTokenBucketRateLimiterFromTicker(ticker, burst)
|
||||||
|
go rate.run()
|
||||||
|
return rate
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTokenBucketRateLimiterFromTicker(ticker <-chan time.Time, burst int) *tickRateLimiter {
|
||||||
|
if burst < 1 {
|
||||||
|
panic("burst must be a positive integer")
|
||||||
|
}
|
||||||
|
rate := &tickRateLimiter{
|
||||||
|
tokens: make(chan bool, burst),
|
||||||
|
ticker: ticker,
|
||||||
|
stop: make(chan bool),
|
||||||
|
}
|
||||||
|
for i := 0; i < burst; i++ {
|
||||||
|
rate.tokens <- true
|
||||||
|
}
|
||||||
|
return rate
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *tickRateLimiter) CanAccept() bool {
|
||||||
|
select {
|
||||||
|
case <-t.tokens:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *tickRateLimiter) Stop() {
|
||||||
|
close(t.stop)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *tickRateLimiter) run() {
|
||||||
|
for {
|
||||||
|
if !r.step() {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *tickRateLimiter) step() bool {
|
||||||
|
select {
|
||||||
|
case <-r.ticker:
|
||||||
|
r.increment()
|
||||||
|
return true
|
||||||
|
case <-r.stop:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *tickRateLimiter) increment() {
|
||||||
|
// non-blocking send
|
||||||
|
select {
|
||||||
|
case t.tokens <- true:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
62
pkg/util/throttle_test.go
Normal file
62
pkg/util/throttle_test.go
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2014 Google Inc. All rights reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package util
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestBasicThrottle(t *testing.T) {
|
||||||
|
ticker := make(chan time.Time, 1)
|
||||||
|
r := newTokenBucketRateLimiterFromTicker(ticker, 3)
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
|
if !r.CanAccept() {
|
||||||
|
t.Error("unexpected false accept")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if r.CanAccept() {
|
||||||
|
t.Error("unexpected true accept")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestIncrementThrottle(t *testing.T) {
|
||||||
|
ticker := make(chan time.Time, 1)
|
||||||
|
r := newTokenBucketRateLimiterFromTicker(ticker, 1)
|
||||||
|
if !r.CanAccept() {
|
||||||
|
t.Error("unexpected false accept")
|
||||||
|
}
|
||||||
|
if r.CanAccept() {
|
||||||
|
t.Error("unexpected true accept")
|
||||||
|
}
|
||||||
|
ticker <- time.Now()
|
||||||
|
r.step()
|
||||||
|
|
||||||
|
if !r.CanAccept() {
|
||||||
|
t.Error("unexpected false accept")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOverBurst(t *testing.T) {
|
||||||
|
ticker := make(chan time.Time, 1)
|
||||||
|
r := newTokenBucketRateLimiterFromTicker(ticker, 3)
|
||||||
|
|
||||||
|
for i := 0; i < 4; i++ {
|
||||||
|
ticker <- time.Now()
|
||||||
|
r.step()
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user