controller/nodelifecycle: Make monitorNodeHealth process nodes concurrently
Marking the pods not ready on a node requires looping over them and updating each pod's status one at a time. This is performed serially, and can take a while if we're processing each node serially as well. Since the time is spent waiting on io, there's an opportunity to go faster by processing multiple nodes concurrently. This change modifies the loop to process nodes in parallel, using the same number of workers as doNodeProcessingPassWorker. This change also introduces histogram metrics to better observe monitorNodeHealth.
This commit is contained in:
@@ -2451,6 +2451,128 @@ func TestMonitorNodeHealthMarkPodsNotReady(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestMonitorNodeHealthMarkPodsNotReadyWithWorkerSize tests the happy path of
|
||||
// TestMonitorNodeHealthMarkPodsNotReady with a large number of nodes/pods and
|
||||
// varying numbers of workers.
|
||||
func TestMonitorNodeHealthMarkPodsNotReadyWithWorkerSize(t *testing.T) {
|
||||
const numNodes = 50
|
||||
const podsPerNode = 100
|
||||
makeNodes := func() []*v1.Node {
|
||||
nodes := make([]*v1.Node, numNodes)
|
||||
// Node created long time ago, with status updated by kubelet exceeds grace period.
|
||||
// Expect pods status updated and Unknown node status posted from node controller
|
||||
for i := 0; i < numNodes; i++ {
|
||||
nodes[i] = &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("node%d", i),
|
||||
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
Status: v1.NodeStatus{
|
||||
Conditions: []v1.NodeCondition{
|
||||
{
|
||||
Type: v1.NodeReady,
|
||||
Status: v1.ConditionTrue,
|
||||
// Node status hasn't been updated for 1hr.
|
||||
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
|
||||
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
|
||||
},
|
||||
},
|
||||
Capacity: v1.ResourceList{
|
||||
v1.ResourceName(v1.ResourceCPU): resource.MustParse("10"),
|
||||
v1.ResourceName(v1.ResourceMemory): resource.MustParse("10G"),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
return nodes
|
||||
}
|
||||
makePods := func() []v1.Pod {
|
||||
pods := make([]v1.Pod, numNodes*podsPerNode)
|
||||
for i := 0; i < numNodes*podsPerNode; i++ {
|
||||
pods[i] = *testutil.NewPod(fmt.Sprintf("pod%d", i), fmt.Sprintf("node%d", i%numNodes))
|
||||
}
|
||||
return pods
|
||||
}
|
||||
|
||||
table := []struct {
|
||||
workers int
|
||||
}{
|
||||
{workers: 0}, // will default to scheduler.UpdateWorkerSize
|
||||
{workers: 1},
|
||||
}
|
||||
|
||||
for i, item := range table {
|
||||
fakeNow := metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
|
||||
|
||||
fakeNodeHandler := &testutil.FakeNodeHandler{
|
||||
Existing: makeNodes(),
|
||||
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: makePods()}),
|
||||
}
|
||||
|
||||
nodeController, _ := newNodeLifecycleControllerFromClient(
|
||||
context.TODO(),
|
||||
fakeNodeHandler,
|
||||
5*time.Minute,
|
||||
testRateLimiterQPS,
|
||||
testRateLimiterQPS,
|
||||
testLargeClusterThreshold,
|
||||
testUnhealthyThreshold,
|
||||
testNodeMonitorGracePeriod,
|
||||
testNodeStartupGracePeriod,
|
||||
testNodeMonitorPeriod,
|
||||
false)
|
||||
nodeController.now = func() metav1.Time { return fakeNow }
|
||||
nodeController.recorder = testutil.NewFakeRecorder()
|
||||
nodeController.getPodsAssignedToNode = fakeGetPodsAssignedToNode(fakeNodeHandler.Clientset)
|
||||
if item.workers != 0 {
|
||||
nodeController.nodeUpdateWorkerSize = item.workers
|
||||
}
|
||||
if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if err := nodeController.monitorNodeHealth(context.TODO()); err != nil {
|
||||
t.Errorf("Case[%d] unexpected error: %v", i, err)
|
||||
}
|
||||
|
||||
nodeController.now = func() metav1.Time { return metav1.Time{Time: fakeNow.Add(1 * time.Minute)} }
|
||||
for i := range fakeNodeHandler.Existing {
|
||||
fakeNodeHandler.Existing[i].Status = v1.NodeStatus{
|
||||
Conditions: []v1.NodeCondition{
|
||||
{
|
||||
Type: v1.NodeReady,
|
||||
Status: v1.ConditionTrue,
|
||||
// Node status hasn't been updated for 1hr.
|
||||
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
|
||||
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
|
||||
},
|
||||
},
|
||||
Capacity: v1.ResourceList{
|
||||
v1.ResourceName(v1.ResourceCPU): resource.MustParse("10"),
|
||||
v1.ResourceName(v1.ResourceMemory): resource.MustParse("10G"),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if err := nodeController.monitorNodeHealth(context.TODO()); err != nil {
|
||||
t.Errorf("Case[%d] unexpected error: %v", i, err)
|
||||
}
|
||||
|
||||
podStatusUpdates := 0
|
||||
for _, action := range fakeNodeHandler.Actions() {
|
||||
if action.GetVerb() == "update" && action.GetResource().Resource == "pods" && action.GetSubresource() == "status" {
|
||||
podStatusUpdates++
|
||||
}
|
||||
}
|
||||
const expectedPodStatusUpdates = numNodes * podsPerNode
|
||||
if podStatusUpdates != expectedPodStatusUpdates {
|
||||
t.Errorf("Case[%d] expect pod status updated to be %v, but got %v", i, expectedPodStatusUpdates, podStatusUpdates)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMonitorNodeHealthMarkPodsNotReadyRetry(t *testing.T) {
|
||||
type nodeIteration struct {
|
||||
timeToPass time.Duration
|
||||
|
Reference in New Issue
Block a user