Ratelimit replica creation

Prashanth Balasubramanian, 2015-05-06 14:39:14 -07:00
commit 4fdd5bc3f3, parent 12de230bb6
7 changed files with 142 additions and 26 deletions


@@ -58,12 +58,15 @@ const (
 	// of expectations, without it the RC could stay asleep forever. This should
 	// be set based on the expected latency of watch events.
 	//
 	// TODO: Set this per expectation, based on its size.
 	// Currently an rc can service (create *and* observe the watch events for said
-	// creation) about 10-20 pods a second, so it takes about 3.5 min to service
-	// 3000 pods. Just creation is limited to 30qps, and watching happens with
-	// ~10-30s latency/pod at scale.
-	ExpectationsTimeout = 6 * time.Minute
+	// creation) about 10-20 pods a second, so it takes about 1 min to service
+	// 500 pods. Just creation is limited to 20qps, and watching happens with ~10-30s
+	// latency/pod at the scale of 3000 pods over 100 nodes.
+	ExpectationsTimeout = 3 * time.Minute
+	// Realistic value of the burstReplica field for the replication manager based off
+	// performance requirements for kubernetes 1.0.
+	BurstReplicas = 500
 )
 // ReplicationManager is responsible for synchronizing ReplicationController objects stored
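For context on the new numbers: at the ~20qps creation limit, issuing 500 creates takes about 25 seconds, and the last watch event should arrive within roughly another 30 seconds, so a 3 minute expectations TTL leaves a comfortable margin. A throwaway Go sketch of that arithmetic (the constants below simply restate the figures from the comment; nothing here is part of the commit):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Figures quoted in the comment above; illustrative only.
	const (
		burstReplicas = 500              // BurstReplicas
		createQPS     = 20.0             // stated creation rate limit
		watchLatency  = 30 * time.Second // rough worst-case watch latency per pod
		timeout       = 3 * time.Minute  // ExpectationsTimeout
	)
	creation := time.Duration(float64(burstReplicas) / createQPS * float64(time.Second)) // ~25s to issue the creates
	worstCase := creation + watchLatency                                                 // ~55s until the last watch event lands
	fmt.Printf("worst case to observe a full burst: %v (budget %v)\n", worstCase, timeout)
}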
@@ -72,6 +75,9 @@ type ReplicationManager struct {
 	kubeClient client.Interface
 	podControl PodControlInterface
+	// An rc is temporarily suspended after creating/deleting these many replicas.
+	// It resumes normal action after observing the watch events for them.
+	burstReplicas int
 	// To allow injection of syncReplicationController for testing.
 	syncHandler func(rcKey string) error
 	// A TTLCache of pod creates/deletes each rc expects to see
@@ -89,7 +95,7 @@ type ReplicationManager struct {
 }
 // NewReplicationManager creates a new ReplicationManager.
-func NewReplicationManager(kubeClient client.Interface) *ReplicationManager {
+func NewReplicationManager(kubeClient client.Interface, burstReplicas int) *ReplicationManager {
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
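Every caller now has to choose a burst size explicitly: the BurstReplicas constant above is the value intended for a real cluster, and tests can pass something small to hit the limit quickly. A rough caller-side sketch of the new signature in use (the import paths, package name, Run signature and worker count are assumptions about the surrounding 2015 tree, not something this diff shows):

package main

import (
	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
)

// startRCManager is a hypothetical helper, not part of this commit.
func startRCManager(kubeClient client.Interface, stop <-chan struct{}) {
	// Pass the new burst size explicitly; a unit test could pass 10 instead.
	rm := controller.NewReplicationManager(kubeClient, controller.BurstReplicas)
	go rm.Run(5, stop) // the worker count here is illustrative
}

func main() {}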
@@ -99,8 +105,9 @@ func NewReplicationManager(kubeClient client.Interface) *ReplicationManager {
 			kubeClient: kubeClient,
 			recorder:   eventBroadcaster.NewRecorder(api.EventSource{Component: "replication-controller"}),
 		},
-		expectations: NewRCExpectations(),
-		queue:        workqueue.New(),
+		burstReplicas: burstReplicas,
+		expectations:  NewRCExpectations(),
+		queue:         workqueue.New(),
 	}
 	rm.controllerStore.Store, rm.rcController = framework.NewInformer(
@@ -277,15 +284,19 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, controller
 	diff := len(filteredPods) - controller.Spec.Replicas
 	if diff < 0 {
 		diff *= -1
+		if diff > rm.burstReplicas {
+			diff = rm.burstReplicas
+		}
 		rm.expectations.ExpectCreations(controller, diff)
 		wait := sync.WaitGroup{}
 		wait.Add(diff)
-		glog.V(2).Infof("Too few %q replicas, creating %d", controller.Name, diff)
+		glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", controller.Namespace, controller.Name, controller.Spec.Replicas, diff)
 		for i := 0; i < diff; i++ {
 			go func() {
 				defer wait.Done()
 				if err := rm.podControl.createReplica(controller.Namespace, controller); err != nil {
 					// Decrement the expected number of creates because the informer won't observe this pod
+					glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", controller.Namespace, controller.Name)
 					rm.expectations.CreationObserved(controller)
 					util.HandleError(err)
 				}
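The creation side of the change follows one pattern: clamp the shortfall to burstReplicas, register that many expected creations before issuing any of them, and give an expectation back for every create that fails, because the informer will never deliver a watch event for a pod that was never made. The deletion branch below mirrors this with ExpectDeletions/DeletionObserved. A stripped-down, self-contained illustration of the pattern with a toy expectations counter (the types and helpers here are invented for the sketch and are not the RCExpectations API):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// toyExpectations stands in for RCExpectations: it only counts how many
// creations we still expect to observe via the watch.
type toyExpectations struct{ pending int64 }

func (e *toyExpectations) ExpectCreations(n int) { atomic.AddInt64(&e.pending, int64(n)) }
func (e *toyExpectations) CreationObserved()     { atomic.AddInt64(&e.pending, -1) }

// createReplica pretends to create a pod and fails now and then.
func createReplica(i int) error {
	if i%7 == 0 {
		return fmt.Errorf("create %d failed", i)
	}
	return nil
}

func main() {
	const burstReplicas = 10
	diff := 25 // replicas we are short by
	if diff > burstReplicas {
		diff = burstReplicas // never issue more than a burst before waiting for watch events
	}
	exp := &toyExpectations{}
	exp.ExpectCreations(diff)

	wait := sync.WaitGroup{}
	wait.Add(diff)
	for i := 0; i < diff; i++ {
		go func(i int) {
			defer wait.Done()
			if err := createReplica(i); err != nil {
				// A failed create will never show up in the informer,
				// so hand the expectation back immediately.
				exp.CreationObserved()
			}
		}(i)
	}
	wait.Wait()
	fmt.Printf("issued %d creates, still expecting %d watch events\n", diff, atomic.LoadInt64(&exp.pending))
}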
@@ -293,8 +304,11 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, controller
 		}
 		wait.Wait()
 	} else if diff > 0 {
+		if diff > rm.burstReplicas {
+			diff = rm.burstReplicas
+		}
 		rm.expectations.ExpectDeletions(controller, diff)
-		glog.V(2).Infof("Too many %q replicas, deleting %d", controller.Name, diff)
+		glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", controller.Namespace, controller.Name, controller.Spec.Replicas, diff)
 		// Sort the pods in the order such that not-ready < ready, unscheduled
 		// < scheduled, and pending < running. This ensures that we delete pods
 		// in the earlier stages whenever possible.
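The ordering mentioned in the comment above prefers to delete the pods that have made the least progress: not-ready before ready, unscheduled before scheduled, pending before running. A small sketch of such a ranking over a simplified pod type (this is not the controller's actual comparator, just the idea the comment describes):

package main

import (
	"fmt"
	"sort"
)

// pod is a simplified stand-in for api.Pod with only the fields the ranking needs.
type pod struct {
	name      string
	ready     bool
	scheduled bool // has a node assigned
	running   bool // false means still pending
}

// rank is lower for pods that are cheaper to delete.
func rank(p pod) int {
	r := 0
	if p.ready {
		r += 4
	}
	if p.scheduled {
		r += 2
	}
	if p.running {
		r++
	}
	return r
}

type byDeletionCost []pod

func (s byDeletionCost) Len() int           { return len(s) }
func (s byDeletionCost) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
func (s byDeletionCost) Less(i, j int) bool { return rank(s[i]) < rank(s[j]) }

func main() {
	pods := []pod{
		{"a", true, true, true},
		{"b", false, false, false},
		{"c", false, true, false},
		{"d", true, true, false},
	}
	sort.Sort(byDeletionCost(pods))
	for _, p := range pods {
		fmt.Println(p.name, "rank", rank(p)) // pods at the front are deleted first
	}
}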
@@ -307,6 +321,7 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, controller
 				defer wait.Done()
 				if err := rm.podControl.deletePod(controller.Namespace, filteredPods[ix].Name); err != nil {
 					// Decrement the expected number of deletes because the informer won't observe this deletion
+					glog.V(2).Infof("Failed deletion, decrementing expectations for controller %q/%q", controller.Namespace, controller.Name)
 					rm.expectations.DeletionObserved(controller)
 				}
 			}(i)