Second attempt: Plumb context to Kubelet CRI calls (#113591)

* plumb context from CRI calls through kubelet

* clean up extra timeouts

* try fixing incorrectly cancelled context
Author: David Ashpole
Committed by: GitHub
Date: 2022-11-05 09:02:13 -04:00
Parent: 27766455f1
Commit: 64af1adace

115 changed files with 1444 additions and 1190 deletions
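
As an orientation aid before the diff, here is a minimal sketch of the pattern this commit applies: callers create or receive a context.Context and thread it down to CRI-facing calls, instead of each layer starting from context.TODO() or context.Background(). The kubelet, runtimeService, fakeRuntime, and ListContainers names below are illustrative stand-ins, not the real kubelet types; only the updateNodeStatus(ctx) shape mirrors the diff that follows.

package main

import (
	"context"
	"fmt"
	"time"
)

// runtimeService stands in for the CRI client; the real CRI client is
// gRPC-based, so its calls accept a context.
type runtimeService interface {
	ListContainers(ctx context.Context) ([]string, error)
}

type kubelet struct {
	runtime runtimeService
}

// updateNodeStatus mirrors the signature change in the diff below: the
// caller's ctx is passed through, so cancellation and deadlines reach the
// CRI call instead of being dropped at this layer.
func (kl *kubelet) updateNodeStatus(ctx context.Context) error {
	containers, err := kl.runtime.ListContainers(ctx)
	if err != nil {
		return err
	}
	fmt.Println("containers:", len(containers))
	return nil
}

type fakeRuntime struct{}

func (fakeRuntime) ListContainers(ctx context.Context) ([]string, error) {
	// A context-aware client honors cancellation.
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	return []string{"c1"}, nil
}

func main() {
	kl := &kubelet{runtime: fakeRuntime{}}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	_ = kl.updateNodeStatus(ctx)
}

The test hunks below apply the same idea mechanically: each test creates ctx := context.Background() once, passes it to updateNodeStatus, and reuses it where the fake client previously used context.TODO().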

@@ -177,6 +177,7 @@ func TestUpdateNewNodeStatus(t *testing.T) {
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
+ ctx := context.Background()
// generate one more in inputImageList than we configure the Kubelet to report,
// or 5 images if unlimited
numTestImages := int(tc.nodeStatusMaxImages) + 1
@@ -290,7 +291,7 @@ func TestUpdateNewNodeStatus(t *testing.T) {
}
kubelet.updateRuntimeUp()
- assert.NoError(t, kubelet.updateNodeStatus())
+ assert.NoError(t, kubelet.updateNodeStatus(ctx))
actions := kubeClient.Actions()
require.Len(t, actions, 2)
require.True(t, actions[1].Matches("patch", "nodes"))
@@ -315,6 +316,7 @@ func TestUpdateNewNodeStatus(t *testing.T) {
}
func TestUpdateExistingNodeStatus(t *testing.T) {
+ ctx := context.Background()
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
@@ -478,7 +480,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
}
kubelet.updateRuntimeUp()
- assert.NoError(t, kubelet.updateNodeStatus())
+ assert.NoError(t, kubelet.updateNodeStatus(ctx))
actions := kubeClient.Actions()
assert.Len(t, actions, 2)
@@ -506,6 +508,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
}
func TestUpdateExistingNodeStatusTimeout(t *testing.T) {
+ ctx := context.Background()
if testing.Short() {
t.Skip("skipping test in short mode.")
}
@@ -559,7 +562,7 @@ func TestUpdateExistingNodeStatusTimeout(t *testing.T) {
}
// should return an error, but not hang
- assert.Error(t, kubelet.updateNodeStatus())
+ assert.Error(t, kubelet.updateNodeStatus(ctx))
// should have attempted multiple times
if actualAttempts := atomic.LoadInt64(&attempts); actualAttempts < nodeStatusUpdateRetry {
@@ -572,6 +575,7 @@ func TestUpdateExistingNodeStatusTimeout(t *testing.T) {
}
func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
+ ctx := context.Background()
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
@@ -681,13 +685,13 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
checkNodeStatus := func(status v1.ConditionStatus, reason string) {
kubeClient.ClearActions()
- assert.NoError(t, kubelet.updateNodeStatus())
+ assert.NoError(t, kubelet.updateNodeStatus(ctx))
actions := kubeClient.Actions()
require.Len(t, actions, 2)
require.True(t, actions[1].Matches("patch", "nodes"))
require.Equal(t, actions[1].GetSubresource(), "status")
- updatedNode, err := kubeClient.CoreV1().Nodes().Get(context.TODO(), testKubeletHostname, metav1.GetOptions{})
+ updatedNode, err := kubeClient.CoreV1().Nodes().Get(ctx, testKubeletHostname, metav1.GetOptions{})
require.NoError(t, err, "can't apply node status patch")
for i, cond := range updatedNode.Status.Conditions {
@@ -781,17 +785,19 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
}
func TestUpdateNodeStatusError(t *testing.T) {
+ ctx := context.Background()
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
kubelet.kubeClient = nil // ensure only the heartbeat client is used
// No matching node for the kubelet
testKubelet.fakeKubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{}}).ReactionChain
- assert.Error(t, kubelet.updateNodeStatus())
+ assert.Error(t, kubelet.updateNodeStatus(ctx))
assert.Len(t, testKubelet.fakeKubeClient.Actions(), nodeStatusUpdateRetry)
}
func TestUpdateNodeStatusWithLease(t *testing.T) {
+ ctx := context.Background()
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
clock := testKubelet.fakeClock
@@ -911,7 +917,7 @@ func TestUpdateNodeStatusWithLease(t *testing.T) {
// Update node status when node status is created.
// Report node status.
kubelet.updateRuntimeUp()
- assert.NoError(t, kubelet.updateNodeStatus())
+ assert.NoError(t, kubelet.updateNodeStatus(ctx))
actions := kubeClient.Actions()
assert.Len(t, actions, 2)
@@ -934,7 +940,7 @@ func TestUpdateNodeStatusWithLease(t *testing.T) {
// Update node status again when nothing is changed (except heartbeat time).
// Report node status if it has exceeded the duration of nodeStatusReportFrequency.
clock.Step(time.Minute)
- assert.NoError(t, kubelet.updateNodeStatus())
+ assert.NoError(t, kubelet.updateNodeStatus(ctx))
// 2 more action (There were 2 actions before).
actions = kubeClient.Actions()
@@ -959,7 +965,7 @@ func TestUpdateNodeStatusWithLease(t *testing.T) {
// Update node status again when nothing is changed (except heartbeat time).
// Do not report node status if it is within the duration of nodeStatusReportFrequency.
clock.Step(10 * time.Second)
- assert.NoError(t, kubelet.updateNodeStatus())
+ assert.NoError(t, kubelet.updateNodeStatus(ctx))
// Only 1 more action (There were 4 actions before).
actions = kubeClient.Actions()
@@ -977,7 +983,7 @@ func TestUpdateNodeStatusWithLease(t *testing.T) {
newMachineInfo := oldMachineInfo.Clone()
newMachineInfo.MemoryCapacity = uint64(newMemoryCapacity)
kubelet.setCachedMachineInfo(newMachineInfo)
- assert.NoError(t, kubelet.updateNodeStatus())
+ assert.NoError(t, kubelet.updateNodeStatus(ctx))
// 2 more action (There were 5 actions before).
actions = kubeClient.Actions()
@@ -1009,7 +1015,7 @@ func TestUpdateNodeStatusWithLease(t *testing.T) {
updatedNode.Spec.PodCIDR = podCIDRs[0]
updatedNode.Spec.PodCIDRs = podCIDRs
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*updatedNode}}).ReactionChain
- assert.NoError(t, kubelet.updateNodeStatus())
+ assert.NoError(t, kubelet.updateNodeStatus(ctx))
assert.Equal(t, strings.Join(podCIDRs, ","), kubelet.runtimeState.podCIDR(), "Pod CIDR should be updated now")
// 2 more action (There were 7 actions before).
actions = kubeClient.Actions()
@@ -1022,7 +1028,7 @@ func TestUpdateNodeStatusWithLease(t *testing.T) {
clock.Step(10 * time.Second)
assert.Equal(t, strings.Join(podCIDRs, ","), kubelet.runtimeState.podCIDR(), "Pod CIDR should already be updated")
- assert.NoError(t, kubelet.updateNodeStatus())
+ assert.NoError(t, kubelet.updateNodeStatus(ctx))
// Only 1 more action (There were 9 actions before).
actions = kubeClient.Actions()
assert.Len(t, actions, 10)
@@ -1078,6 +1084,7 @@ func TestUpdateNodeStatusAndVolumesInUseWithNodeLease(t *testing.T) {
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
+ ctx := context.Background()
// Setup
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
@@ -1094,7 +1101,7 @@ func TestUpdateNodeStatusAndVolumesInUseWithNodeLease(t *testing.T) {
kubelet.volumeManager = fakeVolumeManager
// Only test VolumesInUse setter
- kubelet.setNodeStatusFuncs = []func(*v1.Node) error{
+ kubelet.setNodeStatusFuncs = []func(context.Context, *v1.Node) error{
nodestatus.VolumesInUse(kubelet.volumeManager.ReconcilerStatesHasBeenSynced,
kubelet.volumeManager.GetVolumesInUse),
}
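
The hunk above changes the element type of setNodeStatusFuncs from func(*v1.Node) error to func(context.Context, *v1.Node) error. As a hypothetical illustration (this adapter is not part of the PR), a setter that does not need the context can be wrapped to fit the new shape, which is what the TestUpdateNodeAddresses hunk near the end of this file does inline with func(_ context.Context, node *v1.Node) error:

package main

import (
	"context"

	v1 "k8s.io/api/core/v1"
)

// ignoreContext is a hypothetical adapter: it wraps a setter that ignores
// the context so it satisfies the new context-aware setter signature.
func ignoreContext(f func(*v1.Node) error) func(context.Context, *v1.Node) error {
	return func(_ context.Context, node *v1.Node) error {
		return f(node)
	}
}

func main() {
	setters := []func(context.Context, *v1.Node) error{
		ignoreContext(func(node *v1.Node) error {
			node.Status.NodeInfo.KubeletVersion = "v0.0.0-example" // illustrative value
			return nil
		}),
	}
	node := &v1.Node{}
	for _, set := range setters {
		_ = set(context.Background(), node)
	}
}

Setters that do issue API or CRI calls would instead use the context they now receive.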
@@ -1103,7 +1110,7 @@ func TestUpdateNodeStatusAndVolumesInUseWithNodeLease(t *testing.T) {
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*tc.existingNode}}).ReactionChain
// Execute
- assert.NoError(t, kubelet.updateNodeStatus())
+ assert.NoError(t, kubelet.updateNodeStatus(ctx))
// Validate
actions := kubeClient.Actions()
@@ -1345,6 +1352,7 @@ func TestTryRegisterWithApiServer(t *testing.T) {
}
func TestUpdateNewNodeStatusTooLargeReservation(t *testing.T) {
+ ctx := context.Background()
const nodeStatusMaxImages = 5
// generate one more in inputImageList than we configure the Kubelet to report
@@ -1403,7 +1411,7 @@ func TestUpdateNewNodeStatusTooLargeReservation(t *testing.T) {
}
kubelet.updateRuntimeUp()
- assert.NoError(t, kubelet.updateNodeStatus())
+ assert.NoError(t, kubelet.updateNodeStatus(ctx))
actions := kubeClient.Actions()
require.Len(t, actions, 2)
require.True(t, actions[1].Matches("patch", "nodes"))
@@ -2817,6 +2825,7 @@ func TestUpdateNodeAddresses(t *testing.T) {
for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
+ ctx := context.Background()
oldNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
Spec: v1.NodeSpec{},
@@ -2832,15 +2841,15 @@ func TestUpdateNodeAddresses(t *testing.T) {
},
}
- _, err := kubeClient.CoreV1().Nodes().Update(context.TODO(), oldNode, metav1.UpdateOptions{})
+ _, err := kubeClient.CoreV1().Nodes().Update(ctx, oldNode, metav1.UpdateOptions{})
assert.NoError(t, err)
- kubelet.setNodeStatusFuncs = []func(*v1.Node) error{
- func(node *v1.Node) error {
+ kubelet.setNodeStatusFuncs = []func(context.Context, *v1.Node) error{
+ func(_ context.Context, node *v1.Node) error {
node.Status.Addresses = expectedNode.Status.Addresses
return nil
},
}
- assert.NoError(t, kubelet.updateNodeStatus())
+ assert.NoError(t, kubelet.updateNodeStatus(ctx))
actions := kubeClient.Actions()
lastAction := actions[len(actions)-1]