e2e_node: change DRA test APIs to work with multiple plugins
This commit is contained in:
		| @@ -26,6 +26,7 @@ package e2enode | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"os" | ||||
| 	"path" | ||||
| 	"path/filepath" | ||||
| @@ -53,9 +54,8 @@ import ( | ||||
| const ( | ||||
| 	driverName                = "test-driver.cdi.k8s.io" | ||||
| 	cdiDir                    = "/var/run/cdi" | ||||
| 	endpoint                  = "/var/lib/kubelet/plugins/test-driver/dra.sock" | ||||
| 	endpointTemplate          = "/var/lib/kubelet/plugins/%s/dra.sock" | ||||
| 	pluginRegistrationPath    = "/var/lib/kubelet/plugins_registry" | ||||
| 	draAddress                = "/var/lib/kubelet/plugins/test-driver/dra.sock" | ||||
| 	pluginRegistrationTimeout = time.Second * 60 // how long to wait for a node plugin to be registered | ||||
| 	podInPendingStateTimeout  = time.Second * 60 // how long to wait for a pod to stay in pending state | ||||
| ) | ||||
| @@ -68,7 +68,7 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, | ||||
|  | ||||
| 	f.Context("Resource Kubelet Plugin", f.WithSerial(), func() { | ||||
| 		ginkgo.BeforeEach(func(ctx context.Context) { | ||||
| 			kubeletPlugin = newKubeletPlugin(ctx, getNodeName(ctx, f)) | ||||
| 			kubeletPlugin = newKubeletPlugin(ctx, getNodeName(ctx, f), driverName) | ||||
| 		}) | ||||
|  | ||||
| 		ginkgo.It("must register after Kubelet restart", func(ctx context.Context) { | ||||
| @@ -88,7 +88,7 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, | ||||
| 		ginkgo.It("must register after plugin restart", func(ctx context.Context) { | ||||
| 			ginkgo.By("restart Kubelet Plugin") | ||||
| 			kubeletPlugin.Stop() | ||||
| 			kubeletPlugin = newKubeletPlugin(ctx, getNodeName(ctx, f)) | ||||
| 			kubeletPlugin = newKubeletPlugin(ctx, getNodeName(ctx, f), driverName) | ||||
|  | ||||
| 			ginkgo.By("wait for Kubelet plugin re-registration") | ||||
| 			gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(pluginRegistrationTimeout).Should(testdriver.BeRegistered) | ||||
| @@ -97,7 +97,7 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, | ||||
| 		ginkgo.It("must process pod created when kubelet is not running", func(ctx context.Context) { | ||||
| 			// Stop Kubelet | ||||
| 			startKubelet := stopKubelet() | ||||
| 			pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", true) | ||||
| 			pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", true, []string{driverName}) | ||||
| 			// Pod must be in pending state | ||||
| 			err := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "Pending", framework.PodStartShortTimeout, func(pod *v1.Pod) (bool, error) { | ||||
| 				return pod.Status.Phase == v1.PodPending, nil | ||||
| @@ -305,9 +305,9 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, | ||||
| }) | ||||
|  | ||||
| // Run Kubelet plugin and wait until it's registered | ||||
| func newKubeletPlugin(ctx context.Context, nodeName string) *testdriver.ExamplePlugin { | ||||
| func newKubeletPlugin(ctx context.Context, nodeName, pluginName string) *testdriver.ExamplePlugin { | ||||
| 	ginkgo.By("start Kubelet plugin") | ||||
| 	logger := klog.LoggerWithValues(klog.LoggerWithName(klog.Background(), "kubelet plugin"), "node", nodeName) | ||||
| 	logger := klog.LoggerWithValues(klog.LoggerWithName(klog.Background(), "kubelet plugin "+pluginName), "node", nodeName) | ||||
| 	ctx = klog.NewContext(ctx, logger) | ||||
|  | ||||
| 	// Ensure that directories exist, creating them if necessary. We want | ||||
| @@ -315,18 +315,19 @@ func newKubeletPlugin(ctx context.Context, nodeName string) *testdriver.ExampleP | ||||
| 	// creating those directories. | ||||
| 	err := os.MkdirAll(cdiDir, os.FileMode(0750)) | ||||
| 	framework.ExpectNoError(err, "create CDI directory") | ||||
| 	endpoint := fmt.Sprintf(endpointTemplate, pluginName) | ||||
| 	err = os.MkdirAll(filepath.Dir(endpoint), 0750) | ||||
| 	framework.ExpectNoError(err, "create socket directory") | ||||
|  | ||||
| 	plugin, err := testdriver.StartPlugin( | ||||
| 		ctx, | ||||
| 		cdiDir, | ||||
| 		driverName, | ||||
| 		pluginName, | ||||
| 		"", | ||||
| 		testdriver.FileOperations{}, | ||||
| 		kubeletplugin.PluginSocketPath(endpoint), | ||||
| 		kubeletplugin.RegistrarSocketPath(path.Join(pluginRegistrationPath, driverName+"-reg.sock")), | ||||
| 		kubeletplugin.KubeletPluginSocketPath(draAddress), | ||||
| 		kubeletplugin.RegistrarSocketPath(path.Join(pluginRegistrationPath, pluginName+"-reg.sock")), | ||||
| 		kubeletplugin.KubeletPluginSocketPath(endpoint), | ||||
| 	) | ||||
| 	framework.ExpectNoError(err) | ||||
|  | ||||
| @@ -341,13 +342,13 @@ func newKubeletPlugin(ctx context.Context, nodeName string) *testdriver.ExampleP | ||||
| // NOTE: as scheduler and controller manager are not running by the Node e2e, | ||||
| // the objects must contain all required data to be processed correctly by the API server | ||||
| // and placed on the node without involving the scheduler and the DRA controller | ||||
| func createTestObjects(ctx context.Context, clientSet kubernetes.Interface, nodename, namespace, className, claimName, podName string, deferPodDeletion bool) *v1.Pod { | ||||
| func createTestObjects(ctx context.Context, clientSet kubernetes.Interface, nodename, namespace, className, claimName, podName string, deferPodDeletion bool, pluginNames []string) *v1.Pod { | ||||
| 	// ResourceClass | ||||
| 	class := &resourcev1alpha2.ResourceClass{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| 			Name: className, | ||||
| 		}, | ||||
| 		DriverName: driverName, | ||||
| 		DriverName: "controller", | ||||
| 	} | ||||
| 	_, err := clientSet.ResourceV1alpha2().ResourceClasses().Create(ctx, class, metav1.CreateOptions{}) | ||||
| 	framework.ExpectNoError(err) | ||||
| @@ -408,18 +409,20 @@ func createTestObjects(ctx context.Context, clientSet kubernetes.Interface, node | ||||
|  | ||||
| 	// Update claim status: set ReservedFor and AllocationResult | ||||
| 	// NOTE: This is usually done by the DRA controller | ||||
| 	resourceHandlers := make([]resourcev1alpha2.ResourceHandle, len(pluginNames)) | ||||
| 	for i, pluginName := range pluginNames { | ||||
| 		resourceHandlers[i] = resourcev1alpha2.ResourceHandle{ | ||||
| 			DriverName: pluginName, | ||||
| 			Data:       "{\"EnvVars\":{\"DRA_PARAM1\":\"PARAM1_VALUE\"},\"NodeName\":\"\"}", | ||||
| 		} | ||||
| 	} | ||||
| 	createdClaim.Status = resourcev1alpha2.ResourceClaimStatus{ | ||||
| 		DriverName: driverName, | ||||
| 		DriverName: "controller", | ||||
| 		ReservedFor: []resourcev1alpha2.ResourceClaimConsumerReference{ | ||||
| 			{Resource: "pods", Name: podName, UID: createdPod.UID}, | ||||
| 		}, | ||||
| 		Allocation: &resourcev1alpha2.AllocationResult{ | ||||
| 			ResourceHandles: []resourcev1alpha2.ResourceHandle{ | ||||
| 				{ | ||||
| 					DriverName: driverName, | ||||
| 					Data:       "{\"EnvVars\":{\"DRA_PARAM1\":\"PARAM1_VALUE\"},\"NodeName\":\"\"}", | ||||
| 				}, | ||||
| 			}, | ||||
| 			ResourceHandles: resourceHandlers, | ||||
| 		}, | ||||
| 	} | ||||
| 	_, err = clientSet.ResourceV1alpha2().ResourceClaims(namespace).UpdateStatus(ctx, createdClaim, metav1.UpdateOptions{}) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Ed Bartosh
					Ed Bartosh