kubelet: pass context to VolumeManager.WaitFor*
This allows us to return with a timeout error as soon as the context is canceled. Previously, in cases where a mount would never succeed, pods could get stuck deleting for 2 minutes. In the Sync*Pod methods that call VolumeManager.WaitFor*, we must keep wait.Interrupted errors out of the logs, since they are part of control flow rather than runtime problems. Any early interruption should cause the Sync*Pod method to exit as quickly as possible without logging intermediate errors.
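Condensed into a short sketch, the calling pattern this change introduces in SyncPod (kl, pod, and events.FailedMountVolume are the kubelet identifiers visible in the diff below): the caller's context is threaded into the volume manager's wait, cancellation surfaces as a wait.Interrupted error, and that error is returned without being recorded as an event.

	if err := kl.volumeManager.WaitForAttachAndMount(ctx, pod); err != nil {
		if !wait.Interrupted(err) {
			// A genuine mount failure: record the event and log it.
			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
		}
		// Interrupted or not, stop syncing this pod now.
		return false, err
	}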
Author: Todd Neal
Committed by: Clayton Coleman
Parent: f33c4a1c79
Commit: 453f81d1ca
@@ -1641,10 +1641,8 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
 // This operation writes all events that are dispatched in order to provide
 // the most accurate information possible about an error situation to aid debugging.
 // Callers should not write an event if this operation returns an error.
-func (kl *Kubelet) SyncPod(_ context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
-	// TODO(#113606): connect this with the incoming context parameter, which comes from the pod worker.
-	// Currently, using that context causes test failures.
-	ctx, otelSpan := kl.tracer.Start(context.TODO(), "syncPod", trace.WithAttributes(
+func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
+	ctx, otelSpan := kl.tracer.Start(ctx, "syncPod", trace.WithAttributes(
 		attribute.String("k8s.pod.uid", string(pod.UID)),
 		attribute.String("k8s.pod", klog.KObj(pod).String()),
 		attribute.String("k8s.pod.name", pod.Name),
@@ -1739,13 +1737,15 @@ func (kl *Kubelet) SyncPod(_ context.Context, updateType kubetypes.SyncPodType,
 		var syncErr error
 		p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
 		if err := kl.killPod(ctx, pod, p, nil); err != nil {
-			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
-			syncErr = fmt.Errorf("error killing pod: %v", err)
-			utilruntime.HandleError(syncErr)
+			if !wait.Interrupted(err) {
+				kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
+				syncErr = fmt.Errorf("error killing pod: %w", err)
+				utilruntime.HandleError(syncErr)
+			}
 		} else {
 			// There was no error killing the pod, but the pod cannot be run.
 			// Return an error to signal that the sync loop should back off.
-			syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
+			syncErr = fmt.Errorf("pod cannot be run: %v", runnable.Message)
 		}
 		return false, syncErr
 	}
@@ -1791,6 +1791,9 @@ func (kl *Kubelet) SyncPod(_ context.Context, updateType kubetypes.SyncPodType,
 		if !pcm.Exists(pod) && !firstSync {
 			p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
 			if err := kl.killPod(ctx, pod, p, nil); err == nil {
+				if wait.Interrupted(err) {
+					return false, err
+				}
 				podKilled = true
 			} else {
 				klog.ErrorS(err, "KillPod failed", "pod", klog.KObj(pod), "podStatus", podStatus)
@@ -1854,15 +1857,13 @@ func (kl *Kubelet) SyncPod(_ context.Context, updateType kubetypes.SyncPodType,
 		return false, err
 	}

-	// Volume manager will not mount volumes for terminating pods
-	// TODO: once context cancellation is added this check can be removed
-	if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
-		// Wait for volumes to attach/mount
-		if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
+	// Wait for volumes to attach/mount
+	if err := kl.volumeManager.WaitForAttachAndMount(ctx, pod); err != nil {
+		if !wait.Interrupted(err) {
 			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
 			klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod))
-			return false, err
 		}
+		return false, err
 	}

 	// Fetch the pull secrets for the pod
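Dropping the IsPodTerminationRequested guard assumes the caller cancels the context instead. A hypothetical caller-side sketch (the pod worker's actual wiring is not part of this diff; parentCtx and terminationRequested are illustrative names):

	ctx, cancel := context.WithCancel(parentCtx)
	go func() {
		<-terminationRequested // illustrative signal that deletion was requested
		cancel()               // WaitForAttachAndMount then returns promptly with a wait.Interrupted error
	}()
	isTerminal, err := kl.SyncPod(ctx, updateType, pod, mirrorPod, podStatus)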
@@ -1881,8 +1882,13 @@ func (kl *Kubelet) SyncPod(_ context.Context, updateType kubetypes.SyncPodType,
 		}
 	}

+	// TODO(#113606): connect this with the incoming context parameter, which comes from the pod worker.
+	// Currently, using that context causes test failures. To remove this todoCtx, any wait.Interrupted
+	// errors need to be filtered from result and bypass the reasonCache - cancelling the context for
+	// SyncPod is a known and deliberate error, not a generic error.
+	todoCtx := context.TODO()
 	// Call the container runtime's SyncPod callback
-	result := kl.containerRuntime.SyncPod(ctx, pod, podStatus, pullSecrets, kl.backOff)
+	result := kl.containerRuntime.SyncPod(todoCtx, pod, podStatus, pullSecrets, kl.backOff)
 	kl.reasonCache.Update(pod.UID, result)
 	if err := result.Error(); err != nil {
 		// Do not return error if the only failures were pods in backoff
@@ -2056,7 +2062,7 @@ func (kl *Kubelet) SyncTerminatingRuntimePod(_ context.Context, runningPod *kube
 // This typically occurs when a pod is force deleted from configuration (local disk or API) and the
 // kubelet restarts in the middle of the action.
 func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
-	_, otelSpan := kl.tracer.Start(context.Background(), "syncTerminatedPod", trace.WithAttributes(
+	ctx, otelSpan := kl.tracer.Start(ctx, "syncTerminatedPod", trace.WithAttributes(
 		attribute.String("k8s.pod.uid", string(pod.UID)),
 		attribute.String("k8s.pod", klog.KObj(pod).String()),
 		attribute.String("k8s.pod.name", pod.Name),
@@ -2074,7 +2080,7 @@ func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus

 	// volumes are unmounted after the pod worker reports ShouldPodRuntimeBeRemoved (which is satisfied
 	// before syncTerminatedPod is invoked)
-	if err := kl.volumeManager.WaitForUnmount(pod); err != nil {
+	if err := kl.volumeManager.WaitForUnmount(ctx, pod); err != nil {
 		return err
 	}
 	klog.V(4).InfoS("Pod termination unmounted volumes", "pod", klog.KObj(pod), "podUID", pod.UID)
@@ -94,6 +94,7 @@ import (
 	"k8s.io/kubernetes/pkg/volume"
 	"k8s.io/kubernetes/pkg/volume/gcepd"
 	_ "k8s.io/kubernetes/pkg/volume/hostpath"
+	volumesecret "k8s.io/kubernetes/pkg/volume/secret"
 	volumetest "k8s.io/kubernetes/pkg/volume/testing"
 	"k8s.io/kubernetes/pkg/volume/util"
 	"k8s.io/kubernetes/pkg/volume/util/hostutil"
@@ -367,6 +368,7 @@ func newTestKubeletWithImageList(
 		allPlugins = append(allPlugins, plug)
 	} else {
 		allPlugins = append(allPlugins, gcepd.ProbeVolumePlugins()...)
+		allPlugins = append(allPlugins, volumesecret.ProbeVolumePlugins()...)
 	}

 	var prober volume.DynamicPluginProber // TODO (#51147) inject mock
@@ -17,8 +17,10 @@ limitations under the License.
 package kubelet

 import (
+	"context"
 	"fmt"
 	"testing"
+	"time"

 	"github.com/stretchr/testify/assert"
 	v1 "k8s.io/api/core/v1"
@@ -80,7 +82,7 @@ func TestListVolumesForPod(t *testing.T) {
 	defer close(stopCh)

 	kubelet.podManager.SetPods([]*v1.Pod{pod})
-	err := kubelet.volumeManager.WaitForAttachAndMount(pod)
+	err := kubelet.volumeManager.WaitForAttachAndMount(context.Background(), pod)
 	assert.NoError(t, err)

 	podName := util.GetUniquePodName(pod)
@@ -199,7 +201,7 @@ func TestPodVolumesExist(t *testing.T) {

 	kubelet.podManager.SetPods(pods)
 	for _, pod := range pods {
-		err := kubelet.volumeManager.WaitForAttachAndMount(pod)
+		err := kubelet.volumeManager.WaitForAttachAndMount(context.Background(), pod)
 		assert.NoError(t, err)
 	}

@@ -209,6 +211,131 @@ func TestPodVolumesExist(t *testing.T) {
 	}
 }

+func TestPodVolumeDeadlineAttachAndMount(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping test in short mode.")
+	}
+
+	testKubelet := newTestKubeletWithImageList(t, nil /*imageList*/, false, /* controllerAttachDetachEnabled */
+		false /*initFakeVolumePlugin*/, true /*localStorageCapacityIsolation*/)
+
+	defer testKubelet.Cleanup()
+	kubelet := testKubelet.kubelet
+
+	// any test cases added here should have volumes that fail to mount
+	pods := []*v1.Pod{
+		{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: "pod1",
+				UID:  "pod1uid",
+			},
+			Spec: v1.PodSpec{
+				Containers: []v1.Container{
+					{
+						Name: "container1",
+						VolumeMounts: []v1.VolumeMount{
+							{
+								Name:      "vol1",
+								MountPath: "/mnt/vol1",
+							},
+						},
+					},
+				},
+				Volumes: []v1.Volume{
+					{
+						Name: "vol1",
+						VolumeSource: v1.VolumeSource{
+							Secret: &v1.SecretVolumeSource{
+								SecretName: "non-existent",
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+
+	stopCh := runVolumeManager(kubelet)
+	defer close(stopCh)
+
+	kubelet.podManager.SetPods(pods)
+	for _, pod := range pods {
+		start := time.Now()
+		// ensure our context times out quickly
+		ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
+		err := kubelet.volumeManager.WaitForAttachAndMount(ctx, pod)
+		delta := time.Since(start)
+		// the standard timeout is 2 minutes, so if it's just a few seconds we know that the context timeout was the cause
+		assert.Lessf(t, delta, 10*time.Second, "WaitForAttachAndMount should timeout when the context is cancelled")
+		assert.ErrorIs(t, err, context.DeadlineExceeded)
+		cancel()
+	}
+}
+
+func TestPodVolumeDeadlineUnmount(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping test in short mode.")
+	}
+
+	testKubelet := newTestKubeletWithImageList(t, nil /*imageList*/, false, /* controllerAttachDetachEnabled */
+		true /*initFakeVolumePlugin*/, true /*localStorageCapacityIsolation*/)
+
+	defer testKubelet.Cleanup()
+	kubelet := testKubelet.kubelet
+
+	// any test cases added here should have volumes that succeed at mounting
+	pods := []*v1.Pod{
+		{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: "pod1",
+				UID:  "pod1uid",
+			},
+			Spec: v1.PodSpec{
+				Containers: []v1.Container{
+					{
+						Name: "container1",
+						VolumeMounts: []v1.VolumeMount{
+							{
+								Name:      "vol1",
+								MountPath: "/mnt/vol1",
+							},
+						},
+					},
+				},
+				Volumes: []v1.Volume{
+					{
+						Name: "vol1",
+						VolumeSource: v1.VolumeSource{
+							RBD: &v1.RBDVolumeSource{
+								RBDImage: "fake-device",
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+
+	stopCh := runVolumeManager(kubelet)
+	defer close(stopCh)
+
+	kubelet.podManager.SetPods(pods)
+	for i, pod := range pods {
+		if err := kubelet.volumeManager.WaitForAttachAndMount(context.Background(), pod); err != nil {
+			t.Fatalf("pod %d failed: %v", i, err)
+		}
+		start := time.Now()
+		// ensure our context times out quickly
+		ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
+		err := kubelet.volumeManager.WaitForUnmount(ctx, pod)
+		delta := time.Since(start)
+		// the standard timeout is 2 minutes, so if it's just a few seconds we know that the context timeout was the cause
+		assert.Lessf(t, delta, 10*time.Second, "WaitForUnmount should timeout when the context is cancelled")
+		assert.ErrorIs(t, err, context.DeadlineExceeded)
+		cancel()
+	}
+}
+
 func TestVolumeAttachAndMountControllerDisabled(t *testing.T) {
 	if testing.Short() {
 		t.Skip("skipping test in short mode.")
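The assert.ErrorIs checks above only pass because WaitForAttachAndMount and WaitForUnmount now wrap the poll error with %w (see the volume manager hunks below), keeping the context's error visible to errors.Is. An equivalent sketch of the assertion using context.WithTimeout and the standard errors package:

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	if err := kubelet.volumeManager.WaitForAttachAndMount(ctx, pod); !errors.Is(err, context.DeadlineExceeded) {
		t.Fatalf("expected context.DeadlineExceeded, got %v", err)
	}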
@@ -246,7 +373,7 @@ func TestVolumeAttachAndMountControllerDisabled(t *testing.T) {
 	defer close(stopCh)

 	kubelet.podManager.SetPods([]*v1.Pod{pod})
-	err := kubelet.volumeManager.WaitForAttachAndMount(pod)
+	err := kubelet.volumeManager.WaitForAttachAndMount(context.Background(), pod)
 	assert.NoError(t, err)

 	podVolumes := kubelet.volumeManager.GetMountedVolumesForPod(
@@ -308,7 +435,7 @@ func TestVolumeUnmountAndDetachControllerDisabled(t *testing.T) {
 	kubelet.podManager.SetPods([]*v1.Pod{pod})

 	// Verify volumes attached
-	err := kubelet.volumeManager.WaitForAttachAndMount(pod)
+	err := kubelet.volumeManager.WaitForAttachAndMount(context.Background(), pod)
 	assert.NoError(t, err)

 	podVolumes := kubelet.volumeManager.GetMountedVolumesForPod(
@@ -335,7 +462,7 @@ func TestVolumeUnmountAndDetachControllerDisabled(t *testing.T) {
 	kubelet.podWorkers.(*fakePodWorkers).setPodRuntimeBeRemoved(pod.UID)
 	kubelet.podManager.SetPods([]*v1.Pod{})

-	assert.NoError(t, kubelet.volumeManager.WaitForUnmount(pod))
+	assert.NoError(t, kubelet.volumeManager.WaitForUnmount(context.Background(), pod))
 	if actual := kubelet.volumeManager.GetMountedVolumesForPod(util.GetUniquePodName(pod)); len(actual) > 0 {
 		t.Fatalf("expected volume unmount to wait for no volumes: %v", actual)
 	}
@@ -418,7 +545,7 @@ func TestVolumeAttachAndMountControllerEnabled(t *testing.T) {
 		stopCh,
 		kubelet.volumeManager)

-	assert.NoError(t, kubelet.volumeManager.WaitForAttachAndMount(pod))
+	assert.NoError(t, kubelet.volumeManager.WaitForAttachAndMount(context.Background(), pod))

 	podVolumes := kubelet.volumeManager.GetMountedVolumesForPod(
 		util.GetUniquePodName(pod))
@@ -504,7 +631,7 @@ func TestVolumeUnmountAndDetachControllerEnabled(t *testing.T) {
 		kubelet.volumeManager)

 	// Verify volumes attached
-	assert.NoError(t, kubelet.volumeManager.WaitForAttachAndMount(pod))
+	assert.NoError(t, kubelet.volumeManager.WaitForAttachAndMount(context.Background(), pod))

 	podVolumes := kubelet.volumeManager.GetMountedVolumesForPod(
 		util.GetUniquePodName(pod))
@@ -17,6 +17,7 @@ limitations under the License.
 package volumemanager

 import (
+	"context"
 	"errors"
 	"fmt"
 	"sort"
@@ -97,14 +98,14 @@ type VolumeManager interface {
 	// actual state of the world).
 	// An error is returned if all volumes are not attached and mounted within
 	// the duration defined in podAttachAndMountTimeout.
-	WaitForAttachAndMount(pod *v1.Pod) error
+	WaitForAttachAndMount(ctx context.Context, pod *v1.Pod) error

 	// WaitForUnmount processes the volumes referenced in the specified
 	// pod and blocks until they are all unmounted (reflected in the actual
 	// state of the world).
 	// An error is returned if all volumes are not unmounted within
 	// the duration defined in podAttachAndMountTimeout.
-	WaitForUnmount(pod *v1.Pod) error
+	WaitForUnmount(ctx context.Context, pod *v1.Pod) error

 	// GetMountedVolumesForPod returns a VolumeMap containing the volumes
 	// referenced by the specified pod that are successfully attached and
@@ -385,7 +386,7 @@ func (vm *volumeManager) MarkVolumesAsReportedInUse(
 	vm.desiredStateOfWorld.MarkVolumesReportedInUse(volumesReportedAsInUse)
 }

-func (vm *volumeManager) WaitForAttachAndMount(pod *v1.Pod) error {
+func (vm *volumeManager) WaitForAttachAndMount(ctx context.Context, pod *v1.Pod) error {
 	if pod == nil {
 		return nil
 	}
@@ -404,9 +405,11 @@ func (vm *volumeManager) WaitForAttachAndMount(pod *v1.Pod) error {
 	// like Downward API, depend on this to update the contents of the volume).
 	vm.desiredStateOfWorldPopulator.ReprocessPod(uniquePodName)

-	err := wait.PollImmediate(
+	err := wait.PollUntilContextTimeout(
+		ctx,
 		podAttachAndMountRetryInterval,
 		podAttachAndMountTimeout,
+		true,
 		vm.verifyVolumesMountedFunc(uniquePodName, expectedVolumes))

 	if err != nil {
@@ -423,7 +426,7 @@ func (vm *volumeManager) WaitForAttachAndMount(pod *v1.Pod) error {
 		}

 		return fmt.Errorf(
-			"unmounted volumes=%v, unattached volumes=%v, failed to process volumes=%v: %s",
+			"unmounted volumes=%v, unattached volumes=%v, failed to process volumes=%v: %w",
 			unmountedVolumes,
 			unattachedVolumes,
 			volumesNotInDSW,
@@ -434,7 +437,7 @@ func (vm *volumeManager) WaitForAttachAndMount(pod *v1.Pod) error {
 	return nil
 }

-func (vm *volumeManager) WaitForUnmount(pod *v1.Pod) error {
+func (vm *volumeManager) WaitForUnmount(ctx context.Context, pod *v1.Pod) error {
 	if pod == nil {
 		return nil
 	}
@@ -444,9 +447,11 @@ func (vm *volumeManager) WaitForUnmount(pod *v1.Pod) error {

 	vm.desiredStateOfWorldPopulator.ReprocessPod(uniquePodName)

-	err := wait.PollImmediate(
+	err := wait.PollUntilContextTimeout(
+		ctx,
 		podAttachAndMountRetryInterval,
 		podAttachAndMountTimeout,
+		true,
 		vm.verifyVolumesUnmountedFunc(uniquePodName))

 	if err != nil {
@@ -461,7 +466,7 @@ func (vm *volumeManager) WaitForUnmount(pod *v1.Pod) error {
 		}

 		return fmt.Errorf(
-			"mounted volumes=%v: %s",
+			"mounted volumes=%v: %w",
 			mountedVolumes,
 			err)
 	}
@@ -499,8 +504,8 @@ func (vm *volumeManager) getUnattachedVolumes(uniquePodName types.UniquePodName)

 // verifyVolumesMountedFunc returns a method that returns true when all expected
 // volumes are mounted.
-func (vm *volumeManager) verifyVolumesMountedFunc(podName types.UniquePodName, expectedVolumes []string) wait.ConditionFunc {
-	return func() (done bool, err error) {
+func (vm *volumeManager) verifyVolumesMountedFunc(podName types.UniquePodName, expectedVolumes []string) wait.ConditionWithContextFunc {
+	return func(_ context.Context) (done bool, err error) {
 		if errs := vm.desiredStateOfWorld.PopPodErrors(podName); len(errs) > 0 {
 			return true, errors.New(strings.Join(errs, "; "))
 		}
@@ -510,8 +515,8 @@ func (vm *volumeManager) verifyVolumesMountedFunc(podName types.UniquePodName, e

 // verifyVolumesUnmountedFunc returns a method that is true when there are no mounted volumes for this
 // pod.
-func (vm *volumeManager) verifyVolumesUnmountedFunc(podName types.UniquePodName) wait.ConditionFunc {
-	return func() (done bool, err error) {
+func (vm *volumeManager) verifyVolumesUnmountedFunc(podName types.UniquePodName) wait.ConditionWithContextFunc {
+	return func(_ context.Context) (done bool, err error) {
 		if errs := vm.desiredStateOfWorld.PopPodErrors(podName); len(errs) > 0 {
 			return true, errors.New(strings.Join(errs, "; "))
 		}
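For reference, a small self-contained sketch of the wait.PollUntilContextTimeout shape adopted above (interval and timeout values are illustrative): the poll stops at whichever comes first, the caller's context or the timeout argument, and passing true checks the condition immediately, matching the old PollImmediate behavior.

	package main

	import (
		"context"
		"fmt"
		"time"

		"k8s.io/apimachinery/pkg/util/wait"
	)

	func main() {
		// The caller's deadline (2s) is much shorter than the poll timeout (2m),
		// so the wait returns as soon as the context expires.
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer cancel()

		err := wait.PollUntilContextTimeout(ctx, 300*time.Millisecond, 2*time.Minute, true,
			func(ctx context.Context) (done bool, err error) {
				return false, nil // condition never satisfied, like a mount that never succeeds
			})
		fmt.Println(err, wait.Interrupted(err)) // expect a context deadline error and true
	}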
@@ -17,6 +17,8 @@ limitations under the License.
 package volumemanager

 import (
+	"context"
+
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/kubernetes/pkg/kubelet/config"
 	"k8s.io/kubernetes/pkg/kubelet/container"
@@ -46,12 +48,12 @@ func (f *FakeVolumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan
 }

 // WaitForAttachAndMount is not implemented
-func (f *FakeVolumeManager) WaitForAttachAndMount(pod *v1.Pod) error {
+func (f *FakeVolumeManager) WaitForAttachAndMount(ctx context.Context, pod *v1.Pod) error {
 	return nil
 }

 // WaitForUnmount is not implemented
-func (f *FakeVolumeManager) WaitForUnmount(pod *v1.Pod) error {
+func (f *FakeVolumeManager) WaitForUnmount(ctx context.Context, pod *v1.Pod) error {
 	return nil
 }

@@ -106,7 +106,7 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) {
 				stopCh,
 				manager)

-			err = manager.WaitForAttachAndMount(pod)
+			err = manager.WaitForAttachAndMount(context.Background(), pod)
 			if err != nil && !test.expectError {
 				t.Errorf("Expected success: %v", err)
 			}
@@ -204,7 +204,7 @@ func TestWaitForAttachAndMountError(t *testing.T) {

 	podManager.SetPods([]*v1.Pod{pod})

-	err = manager.WaitForAttachAndMount(pod)
+	err = manager.WaitForAttachAndMount(context.Background(), pod)
 	if err == nil {
 		t.Errorf("Expected error, got none")
 	}
@@ -246,7 +246,7 @@ func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) {
 	go delayClaimBecomesBound(kubeClient, claim.GetNamespace(), claim.ObjectMeta.Name)

 	err = wait.Poll(100*time.Millisecond, 1*time.Second, func() (bool, error) {
-		err = manager.WaitForAttachAndMount(pod)
+		err = manager.WaitForAttachAndMount(context.Background(), pod)
 		if err != nil {
 			// Few "PVC not bound" errors are expected
 			return false, nil
@@ -330,7 +330,7 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) {
 			stopCh,
 			manager)

-		err = manager.WaitForAttachAndMount(pod)
+		err = manager.WaitForAttachAndMount(context.Background(), pod)
 		if err != nil {
 			t.Errorf("Expected success: %v", err)
 			continue

test/e2e_node/terminate_pods_test.go (new file, 94 lines)
@@ -0,0 +1,94 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2enode
+
+import (
+	"context"
+	"time"
+
+	"github.com/onsi/ginkgo/v2"
+	"github.com/onsi/gomega"
+	v1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/uuid"
+	admissionapi "k8s.io/pod-security-admission/api"
+
+	"k8s.io/kubernetes/test/e2e/framework"
+	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
+)
+
+var _ = SIGDescribe("Terminate Pods", func() {
+	f := framework.NewDefaultFramework("terminate-pods")
+	f.NamespacePodSecurityEnforceLevel = admissionapi.LevelBaseline
+
+	ginkgo.It("should not hang when terminating pods mounting non-existent volumes", func(ctx context.Context) {
+		pod := &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: "pod1",
+			},
+			Spec: v1.PodSpec{
+				Containers: []v1.Container{
+					{
+						Name:  "container1",
+						Image: busyboxImage,
+						VolumeMounts: []v1.VolumeMount{
+							{
+								Name:      "vol1",
+								MountPath: "/mnt/vol1",
+							},
+						},
+					},
+				},
+				Volumes: []v1.Volume{
+					{
+						Name: "vol1",
+						VolumeSource: v1.VolumeSource{
+							Secret: &v1.SecretVolumeSource{
+								SecretName: "non-existent-" + string(uuid.NewUUID()),
+							},
+						},
+					},
+				},
+			},
+		}
+		client := e2epod.NewPodClient(f)
+		pod = client.Create(context.TODO(), pod)
+		gomega.Expect(pod.Spec.NodeName).ToNot(gomega.BeEmpty())
+
+		gomega.Eventually(ctx, func() bool {
+			pod, _ = client.Get(context.TODO(), pod.Name, metav1.GetOptions{})
+			for _, c := range pod.Status.Conditions {
+				if c.Type == v1.ContainersReady && c.Status == v1.ConditionFalse {
+					return true
+				}
+			}
+			return false
+		}, 20*time.Second, 1*time.Second).Should(gomega.BeTrue())
+
+		err := client.Delete(context.Background(), pod.Name, metav1.DeleteOptions{})
+
+		// Wait for the pod to disappear from the API server up to 10 seconds, this shouldn't hang for minutes due to
+		// non-existent secret being mounted.
+		gomega.Eventually(ctx, func() bool {
+			_, err := client.Get(context.TODO(), pod.Name, metav1.GetOptions{})
+			return apierrors.IsNotFound(err)
+		}, 10*time.Second, time.Second).Should(gomega.BeTrue())
+
+		framework.ExpectNoError(err)
+	})
+})