Expose flags for new NodeEviction logic in NodeController

gmarek
2016-08-05 14:50:19 +02:00
parent 68327f76bf
commit 4cf698ef04
10 changed files with 1146 additions and 938 deletions


@@ -40,31 +40,6 @@ const (
LargeClusterThreshold = 20
)
// This function is expected to get a slice of NodeReadyConditions for all Nodes in a given zone.
// The zone is considered:
// - fullyDisrupted if there're no Ready Nodes,
// - partiallyDisrupted if more than 1/3 of Nodes (at least 3) are not Ready,
// - normal otherwise
func ComputeZoneState(nodeReadyConditions []*api.NodeCondition) zoneState {
readyNodes := 0
notReadyNodes := 0
for i := range nodeReadyConditions {
if nodeReadyConditions[i] != nil && nodeReadyConditions[i].Status == api.ConditionTrue {
readyNodes++
} else {
notReadyNodes++
}
}
switch {
case readyNodes == 0 && notReadyNodes > 0:
return stateFullDisruption
case notReadyNodes > 2 && 2*notReadyNodes > readyNodes:
return statePartialDisruption
default:
return stateNormal
}
}
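As an aside on the classification rule removed here: the `2*notReadyNodes > readyNodes` check is exactly the "more than 1/3 of Nodes" condition from the comment above. A quick standalone check of that equivalence (illustrative only, not part of this commit):

```go
package main

import "fmt"

func main() {
	// 2*notReady > ready  <=>  notReady/(notReady+ready) > 1/3
	// (add notReady to both sides: 3*notReady > notReady+ready).
	notReady, ready := 4, 7 // 4 of 11 nodes not ready
	fmt.Println(2*notReady > ready)                                  // true
	fmt.Println(float64(notReady)/float64(notReady+ready) > 1.0/3.0) // true
}
```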
// cleanupOrphanedPods deletes pods that are bound to nodes that don't
// exist.
func cleanupOrphanedPods(pods []*api.Pod, nodeStore cache.Store, forcefulDeletePodFunc func(*api.Pod) error) {
@@ -336,15 +311,3 @@ func terminatePods(kubeClient clientset.Interface, recorder record.EventRecorder
}
return complete, nextAttempt, nil
}
func HealthyQPSFunc(nodeNum int, defaultQPS float32) float32 {
return defaultQPS
}
// If the cluster is large make evictions slower; if it is small, stop evictions altogether.
func ReducedQPSFunc(nodeNum int, defaultQPS float32) float32 {
if nodeNum > LargeClusterThreshold {
return defaultQPS / 10
}
return 0
}
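The two package-level helpers above are removed by this commit; equivalents reappear further down as methods on NodeController, where the reduced rate is no longer hard-coded to a tenth of the default but comes from the new secondaryEvictionLimiterQPS field. A minimal sketch of the old behaviour for comparison (the threshold matches the LargeClusterThreshold constant above; the QPS value is just an example):

```go
package main

import "fmt"

// oldReducedQPS mirrors the removed ReducedQPSFunc: large clusters evict at
// a tenth of the default rate, smaller clusters stop evicting altogether.
func oldReducedQPS(nodeNum int, defaultQPS float32) float32 {
	const largeClusterThreshold = 20 // matches the removed package constant
	if nodeNum > largeClusterThreshold {
		return defaultQPS / 10
	}
	return 0
}

func main() {
	fmt.Println(oldReducedQPS(50, 0.1)) // 0.01 - large cluster, slowed evictions
	fmt.Println(oldReducedQPS(10, 0.1)) // 0    - small cluster, evictions stop
}
```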


@@ -120,7 +120,6 @@ type NodeController struct {
// workers that evicts pods from unresponsive nodes.
zonePodEvictor map[string]*RateLimitedTimedQueue
zoneTerminationEvictor map[string]*RateLimitedTimedQueue
evictionLimiterQPS float32
podEvictionTimeout time.Duration
// The maximum duration before a pod evicted from a node can be forcefully terminated.
maximumGracePeriod time.Duration
@@ -140,10 +139,14 @@ type NodeController struct {
forcefullyDeletePod func(*api.Pod) error
nodeExistsInCloudProvider func(string) (bool, error)
computeZoneStateFunc func(nodeConditions []*api.NodeCondition) zoneState
enterPartialDisruptionFunc func(nodeNum int, defaultQPS float32) float32
enterFullDisruptionFunc func(nodeNum int, defaultQPS float32) float32
enterPartialDisruptionFunc func(nodeNum int) float32
enterFullDisruptionFunc func(nodeNum int) float32
zoneStates map[string]zoneState
zoneStates map[string]zoneState
evictionLimiterQPS float32
secondaryEvictionLimiterQPS float32
largeClusterThreshold int32
unhealthyZoneThreshold float32
// internalPodInformer is used to hold a personal informer. If we're using
// a normal shared informer, then the informer will be started for us. If
@@ -163,6 +166,9 @@ func NewNodeController(
kubeClient clientset.Interface,
podEvictionTimeout time.Duration,
evictionLimiterQPS float32,
secondaryEvictionLimiterQPS float32,
largeClusterThreshold int32,
unhealthyZoneThreshold float32,
nodeMonitorGracePeriod time.Duration,
nodeStartupGracePeriod time.Duration,
nodeMonitorPeriod time.Duration,
@@ -195,31 +201,34 @@ func NewNodeController(
}
nc := &NodeController{
cloud: cloud,
knownNodeSet: make(map[string]*api.Node),
kubeClient: kubeClient,
recorder: recorder,
podEvictionTimeout: podEvictionTimeout,
maximumGracePeriod: 5 * time.Minute,
zonePodEvictor: make(map[string]*RateLimitedTimedQueue),
zoneTerminationEvictor: make(map[string]*RateLimitedTimedQueue),
nodeStatusMap: make(map[string]nodeStatusData),
nodeMonitorGracePeriod: nodeMonitorGracePeriod,
nodeMonitorPeriod: nodeMonitorPeriod,
nodeStartupGracePeriod: nodeStartupGracePeriod,
lookupIP: net.LookupIP,
now: unversioned.Now,
clusterCIDR: clusterCIDR,
serviceCIDR: serviceCIDR,
allocateNodeCIDRs: allocateNodeCIDRs,
forcefullyDeletePod: func(p *api.Pod) error { return forcefullyDeletePod(kubeClient, p) },
nodeExistsInCloudProvider: func(nodeName string) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
enterPartialDisruptionFunc: ReducedQPSFunc,
enterFullDisruptionFunc: HealthyQPSFunc,
computeZoneStateFunc: ComputeZoneState,
evictionLimiterQPS: evictionLimiterQPS,
zoneStates: make(map[string]zoneState),
cloud: cloud,
knownNodeSet: make(map[string]*api.Node),
kubeClient: kubeClient,
recorder: recorder,
podEvictionTimeout: podEvictionTimeout,
maximumGracePeriod: 5 * time.Minute,
zonePodEvictor: make(map[string]*RateLimitedTimedQueue),
zoneTerminationEvictor: make(map[string]*RateLimitedTimedQueue),
nodeStatusMap: make(map[string]nodeStatusData),
nodeMonitorGracePeriod: nodeMonitorGracePeriod,
nodeMonitorPeriod: nodeMonitorPeriod,
nodeStartupGracePeriod: nodeStartupGracePeriod,
lookupIP: net.LookupIP,
now: unversioned.Now,
clusterCIDR: clusterCIDR,
serviceCIDR: serviceCIDR,
allocateNodeCIDRs: allocateNodeCIDRs,
forcefullyDeletePod: func(p *api.Pod) error { return forcefullyDeletePod(kubeClient, p) },
nodeExistsInCloudProvider: func(nodeName string) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
evictionLimiterQPS: evictionLimiterQPS,
secondaryEvictionLimiterQPS: secondaryEvictionLimiterQPS,
largeClusterThreshold: largeClusterThreshold,
unhealthyZoneThreshold: unhealthyZoneThreshold,
zoneStates: make(map[string]zoneState),
}
nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
nc.computeZoneStateFunc = nc.ComputeZoneState
podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
AddFunc: nc.maybeDeleteTerminatingPod,
@@ -336,6 +345,9 @@ func NewNodeControllerFromClient(
kubeClient clientset.Interface,
podEvictionTimeout time.Duration,
evictionLimiterQPS float32,
secondaryEvictionLimiterQPS float32,
largeClusterThreshold int32,
unhealthyZoneThreshold float32,
nodeMonitorGracePeriod time.Duration,
nodeStartupGracePeriod time.Duration,
nodeMonitorPeriod time.Duration,
@@ -344,8 +356,9 @@ func NewNodeControllerFromClient(
nodeCIDRMaskSize int,
allocateNodeCIDRs bool) (*NodeController, error) {
podInformer := informers.NewPodInformer(kubeClient, controller.NoResyncPeriodFunc())
nc, err := NewNodeController(podInformer, cloud, kubeClient, podEvictionTimeout, evictionLimiterQPS, nodeMonitorGracePeriod,
nodeStartupGracePeriod, nodeMonitorPeriod, clusterCIDR, serviceCIDR, nodeCIDRMaskSize, allocateNodeCIDRs)
nc, err := NewNodeController(podInformer, cloud, kubeClient, podEvictionTimeout, evictionLimiterQPS, secondaryEvictionLimiterQPS,
largeClusterThreshold, unhealthyZoneThreshold, nodeMonitorGracePeriod, nodeStartupGracePeriod, nodeMonitorPeriod, clusterCIDR,
serviceCIDR, nodeCIDRMaskSize, allocateNodeCIDRs)
if err != nil {
return nil, err
}
@@ -650,14 +663,14 @@ func (nc *NodeController) setLimiterInZone(zone string, zoneSize int, state zone
nc.zoneTerminationEvictor[zone].SwapLimiter(nc.evictionLimiterQPS)
case statePartialDisruption:
nc.zonePodEvictor[zone].SwapLimiter(
nc.enterPartialDisruptionFunc(zoneSize, nc.evictionLimiterQPS))
nc.enterPartialDisruptionFunc(zoneSize))
nc.zoneTerminationEvictor[zone].SwapLimiter(
nc.enterPartialDisruptionFunc(zoneSize, nc.evictionLimiterQPS))
nc.enterPartialDisruptionFunc(zoneSize))
case stateFullDisruption:
nc.zonePodEvictor[zone].SwapLimiter(
nc.enterFullDisruptionFunc(zoneSize, nc.evictionLimiterQPS))
nc.enterFullDisruptionFunc(zoneSize))
nc.zoneTerminationEvictor[zone].SwapLimiter(
nc.enterFullDisruptionFunc(zoneSize, nc.evictionLimiterQPS))
nc.enterFullDisruptionFunc(zoneSize))
}
}
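Putting the pieces together, setLimiterInZone now picks the eviction QPS for a zone purely from controller-owned configuration: the normal rate in the Normal and FullDisruption states, and the secondary rate (or zero for small zones) in the PartialDisruption state. A self-contained sketch of that selection, with placeholder values (the real ones come from the new flags this commit exposes):

```go
package main

import "fmt"

type zoneState string

const (
	stateNormal            zoneState = "Normal"
	statePartialDisruption zoneState = "PartialDisruption"
	stateFullDisruption    zoneState = "FullDisruption"
)

// pickEvictionQPS mirrors the switch in setLimiterInZone combined with the
// new HealthyQPSFunc/ReducedQPSFunc methods shown below.
func pickEvictionQPS(state zoneState, zoneSize int, evictionQPS, secondaryQPS float32, largeClusterThreshold int32) float32 {
	switch state {
	case statePartialDisruption:
		// ReducedQPSFunc: slow down large zones, stop evictions in small ones.
		if int32(zoneSize) > largeClusterThreshold {
			return secondaryQPS
		}
		return 0
	case stateFullDisruption:
		// HealthyQPSFunc: keep the normal rate.
		return evictionQPS
	default: // stateNormal
		return evictionQPS
	}
}

func main() {
	fmt.Println(pickEvictionQPS(statePartialDisruption, 50, 0.1, 0.01, 20)) // 0.01
	fmt.Println(pickEvictionQPS(statePartialDisruption, 10, 0.1, 0.01, 20)) // 0
	fmt.Println(pickEvictionQPS(stateFullDisruption, 10, 0.1, 0.01, 20))    // 0.1
}
```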
@@ -871,3 +884,41 @@ func (nc *NodeController) evictPods(node *api.Node) bool {
defer nc.evictorLock.Unlock()
return nc.zonePodEvictor[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID))
}
// HealthyQPSFunc returns the default cluster eviction rate; nodeNum is taken as an argument only for signature consistency with ReducedQPSFunc.
func (nc *NodeController) HealthyQPSFunc(nodeNum int) float32 {
return nc.evictionLimiterQPS
}
// If the cluster is large make evictions slower; if it is small, stop evictions altogether.
func (nc *NodeController) ReducedQPSFunc(nodeNum int) float32 {
if int32(nodeNum) > nc.largeClusterThreshold {
return nc.secondaryEvictionLimiterQPS
}
return 0
}
// This function is expected to get a slice of NodeReadyConditions for all Nodes in a given zone.
// The zone is considered:
// - fullyDisrupted if there're no Ready Nodes,
// - partiallyDisrupted if the fraction of not-Ready Nodes is at least nc.unhealthyZoneThreshold (and there are at least 3 of them),
// - normal otherwise
func (nc *NodeController) ComputeZoneState(nodeReadyConditions []*api.NodeCondition) zoneState {
readyNodes := 0
notReadyNodes := 0
for i := range nodeReadyConditions {
if nodeReadyConditions[i] != nil && nodeReadyConditions[i].Status == api.ConditionTrue {
readyNodes++
} else {
notReadyNodes++
}
}
switch {
case readyNodes == 0 && notReadyNodes > 0:
return stateFullDisruption
case notReadyNodes > 2 && float32(notReadyNodes)/float32(notReadyNodes+readyNodes) >= nc.unhealthyZoneThreshold:
return statePartialDisruption
default:
return stateNormal
}
}
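The new ComputeZoneState replaces the fixed one-third rule with the configurable unhealthyZoneThreshold ratio, while keeping the "at least 3 not-Ready Nodes" guard. A standalone sketch of the classification, with ready/not-ready counts passed directly instead of a []*api.NodeCondition slice; the 0.55 threshold matches the test constant below but is otherwise just an example:

```go
package main

import "fmt"

type zoneState string

const (
	stateNormal            zoneState = "Normal"
	statePartialDisruption zoneState = "PartialDisruption"
	stateFullDisruption    zoneState = "FullDisruption"
)

// computeZoneState mirrors the classification above, taking counts directly.
func computeZoneState(readyNodes, notReadyNodes int, unhealthyZoneThreshold float32) zoneState {
	switch {
	case readyNodes == 0 && notReadyNodes > 0:
		return stateFullDisruption
	case notReadyNodes > 2 &&
		float32(notReadyNodes)/float32(notReadyNodes+readyNodes) >= unhealthyZoneThreshold:
		return statePartialDisruption
	default:
		return stateNormal
	}
}

func main() {
	fmt.Println(computeZoneState(4, 6, 0.55)) // PartialDisruption: 6/10 = 0.6 >= 0.55
	fmt.Println(computeZoneState(7, 3, 0.55)) // Normal: 3/10 = 0.3 < 0.55
	fmt.Println(computeZoneState(0, 5, 0.55)) // FullDisruption: no Ready nodes at all
}
```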


@@ -36,6 +36,8 @@ const (
testNodeStartupGracePeriod = 60 * time.Second
testNodeMonitorPeriod = 5 * time.Second
testRateLimiterQPS = float32(10000)
testLargeClusterThreshold = 20
testUnhealtyThreshold = float32(0.55)
)
func TestMonitorNodeStatusEvictPods(t *testing.T) {
@@ -461,7 +463,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
for _, item := range table {
nodeController, _ := NewNodeControllerFromClient(nil, item.fakeNodeHandler,
evictionTimeout, testRateLimiterQPS, testNodeMonitorGracePeriod,
evictionTimeout, testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealtyThreshold, testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow }
for _, ds := range item.daemonSets {
@@ -978,13 +980,13 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) {
Clientset: fake.NewSimpleClientset(&api.PodList{Items: item.podList}),
}
nodeController, _ := NewNodeControllerFromClient(nil, fakeNodeHandler,
evictionTimeout, testRateLimiterQPS, testNodeMonitorGracePeriod,
evictionTimeout, testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealtyThreshold, testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow }
nodeController.enterPartialDisruptionFunc = func(nodeNum int, defaultQPS float32) float32 {
nodeController.enterPartialDisruptionFunc = func(nodeNum int) float32 {
return testRateLimiterQPS
}
nodeController.enterFullDisruptionFunc = func(nodeNum int, defaultQPS float32) float32 {
nodeController.enterFullDisruptionFunc = func(nodeNum int) float32 {
return testRateLimiterQPS
}
if err := nodeController.monitorNodeStatus(); err != nil {
@@ -1071,7 +1073,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) {
deleteWaitChan: make(chan struct{}),
}
nodeController, _ := NewNodeControllerFromClient(nil, fnh, 10*time.Minute,
testRateLimiterQPS,
testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealtyThreshold,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.cloud = &fakecloud.FakeCloud{}
@@ -1304,7 +1306,8 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
}
for i, item := range table {
nodeController, _ := NewNodeControllerFromClient(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
nodeController, _ := NewNodeControllerFromClient(nil, item.fakeNodeHandler, 5*time.Minute,
testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealtyThreshold,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
@@ -1454,7 +1457,8 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) {
}
for i, item := range table {
nodeController, _ := NewNodeControllerFromClient(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
nodeController, _ := NewNodeControllerFromClient(nil, item.fakeNodeHandler, 5*time.Minute,
testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealtyThreshold,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
@@ -1536,7 +1540,8 @@ func TestNodeDeletion(t *testing.T) {
Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}),
}
nodeController, _ := NewNodeControllerFromClient(nil, fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
nodeController, _ := NewNodeControllerFromClient(nil, fakeNodeHandler, 5*time.Minute,
testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealtyThreshold,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow }
@@ -1596,8 +1601,10 @@ func TestNodeEventGeneration(t *testing.T) {
Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}),
}
nodeController, _ := NewNodeControllerFromClient(nil, fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
nodeController, _ := NewNodeControllerFromClient(nil, fakeNodeHandler, 5*time.Minute,
testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealtyThreshold,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow }
fakeRecorder := NewFakeRecorder()
nodeController.recorder = fakeRecorder
@@ -1707,7 +1714,7 @@ func TestCheckPod(t *testing.T) {
},
}
nc, _ := NewNodeControllerFromClient(nil, nil, 0, 0, 0, 0, 0, nil, nil, 0, false)
nc, _ := NewNodeControllerFromClient(nil, nil, 0, 0, 0, 0, 0, 0, 0, 0, nil, nil, 0, false)
nc.nodeStore.Store = cache.NewStore(cache.MetaNamespaceKeyFunc)
nc.nodeStore.Store.Add(&api.Node{
ObjectMeta: api.ObjectMeta{
@@ -1774,7 +1781,7 @@ func TestCleanupOrphanedPods(t *testing.T) {
newPod("b", "bar"),
newPod("c", "gone"),
}
nc, _ := NewNodeControllerFromClient(nil, nil, 0, 0, 0, 0, 0, nil, nil, 0, false)
nc, _ := NewNodeControllerFromClient(nil, nil, 0, 0, 0, 0, 0, 0, 0, 0, nil, nil, 0, false)
nc.nodeStore.Store.Add(newNode("foo"))
nc.nodeStore.Store.Add(newNode("bar"))