Merge pull request #124617 from bart0sh/PR144-e2e_node-DRA-test-plugin-failures
e2e_node: DRA: test plugin failures
@@ -33,22 +33,42 @@ var BeRegistered = gcustom.MakeMatcher(func(actualCalls []GRPCCall) (bool, error
 	return false, nil
 }).WithMessage("contain successful NotifyRegistrationStatus call")
 
-// NodePrepareResouceCalled checks that NodePrepareResource API has been called
-var NodePrepareResourceCalled = gcustom.MakeMatcher(func(actualCalls []GRPCCall) (bool, error) {
+// NodePrepareResoucesSucceeded checks that NodePrepareResources API has been called and succeeded
+var NodePrepareResourcesSucceeded = gcustom.MakeMatcher(func(actualCalls []GRPCCall) (bool, error) {
 	for _, call := range actualCalls {
-		if strings.HasSuffix(call.FullMethod, "/NodePrepareResource") && call.Err == nil {
+		if strings.HasSuffix(call.FullMethod, "/NodePrepareResources") && call.Response != nil && call.Err == nil {
 			return true, nil
 		}
 	}
 	return false, nil
-}).WithMessage("contain NodePrepareResource call")
+}).WithMessage("contain successful NodePrepareResources call")
 
-// NodePrepareResoucesCalled checks that NodePrepareResources API has been called
-var NodePrepareResourcesCalled = gcustom.MakeMatcher(func(actualCalls []GRPCCall) (bool, error) {
+// NodePrepareResoucesFailed checks that NodePrepareResources API has been called and returned an error
+var NodePrepareResourcesFailed = gcustom.MakeMatcher(func(actualCalls []GRPCCall) (bool, error) {
 	for _, call := range actualCalls {
-		if strings.HasSuffix(call.FullMethod, "/NodePrepareResources") && call.Err == nil {
+		if strings.HasSuffix(call.FullMethod, "/NodePrepareResources") && call.Err != nil {
 			return true, nil
 		}
 	}
 	return false, nil
-}).WithMessage("contain NodePrepareResources call")
+}).WithMessage("contain unsuccessful NodePrepareResources call")
+
+// NodeUnprepareResoucesSucceeded checks that NodeUnprepareResources API has been called and succeeded
+var NodeUnprepareResourcesSucceeded = gcustom.MakeMatcher(func(actualCalls []GRPCCall) (bool, error) {
+	for _, call := range actualCalls {
+		if strings.HasSuffix(call.FullMethod, "/NodeUnprepareResources") && call.Response != nil && call.Err == nil {
+			return true, nil
+		}
+	}
+	return false, nil
+}).WithMessage("contain successful NodeUnprepareResources call")
+
+// NodeUnprepareResoucesFailed checks that NodeUnprepareResources API has been called and returned an error
+var NodeUnprepareResourcesFailed = gcustom.MakeMatcher(func(actualCalls []GRPCCall) (bool, error) {
+	for _, call := range actualCalls {
+		if strings.HasSuffix(call.FullMethod, "/NodeUnprepareResources") && call.Err != nil {
+			return true, nil
+		}
+	}
+	return false, nil
+}).WithMessage("contain unsuccessful NodeUnprepareResources call")
 
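Note: these matchers are consumed by polling the example plugin's recorded gRPC calls; the reworked node e2e tests later in this diff use them roughly as in this minimal sketch (timeout choice taken from those tests):

    // Poll the plugin's call log until a successful NodePrepareResources
    // call has been recorded, or the timeout expires.
    gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodePrepareResourcesSucceeded)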
@@ -23,6 +23,7 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
+	"strings"
 	"sync"
 
 	"github.com/google/go-cmp/cmp"
@@ -54,7 +55,14 @@ type ExamplePlugin struct {
 	prepared       map[ClaimID]any
 	gRPCCalls      []GRPCCall
 
-	block bool
+	blockPrepareResourcesMutex   sync.Mutex
+	blockUnprepareResourcesMutex sync.Mutex
+
+	prepareResourcesFailure   error
+	failPrepareResourcesMutex sync.Mutex
+
+	unprepareResourcesFailure   error
+	failUnprepareResourcesMutex sync.Mutex
 }
 
 type GRPCCall struct {
@@ -162,10 +170,60 @@ func (ex *ExamplePlugin) IsRegistered() bool {
 	return status.PluginRegistered
 }
 
-// Block sets a flag to block Node[Un]PrepareResources
-// to emulate time consuming or stuck calls
-func (ex *ExamplePlugin) Block() {
-	ex.block = true
+// BlockNodePrepareResources locks blockPrepareResourcesMutex and returns unlocking function for it
+func (ex *ExamplePlugin) BlockNodePrepareResources() func() {
+	ex.blockPrepareResourcesMutex.Lock()
+	return func() {
+		ex.blockPrepareResourcesMutex.Unlock()
+	}
+}
+
+// BlockNodeUnprepareResources locks blockUnprepareResourcesMutex and returns unlocking function for it
+func (ex *ExamplePlugin) BlockNodeUnprepareResources() func() {
+	ex.blockUnprepareResourcesMutex.Lock()
+	return func() {
+		ex.blockUnprepareResourcesMutex.Unlock()
+	}
+}
+
+// SetNodePrepareResourcesFailureMode sets the failure mode for NodePrepareResources call
+// and returns a function to unset the failure mode
+func (ex *ExamplePlugin) SetNodePrepareResourcesFailureMode() func() {
+	ex.failPrepareResourcesMutex.Lock()
+	ex.prepareResourcesFailure = errors.New("simulated PrepareResources failure")
+	ex.failPrepareResourcesMutex.Unlock()
+
+	return func() {
+		ex.failPrepareResourcesMutex.Lock()
+		ex.prepareResourcesFailure = nil
+		ex.failPrepareResourcesMutex.Unlock()
+	}
+}
+
+func (ex *ExamplePlugin) getPrepareResourcesFailure() error {
+	ex.failPrepareResourcesMutex.Lock()
+	defer ex.failPrepareResourcesMutex.Unlock()
+	return ex.prepareResourcesFailure
+}
+
+// SetNodeUnprepareResourcesFailureMode sets the failure mode for NodeUnprepareResources call
+// and returns a function to unset the failure mode
+func (ex *ExamplePlugin) SetNodeUnprepareResourcesFailureMode() func() {
+	ex.failUnprepareResourcesMutex.Lock()
+	ex.unprepareResourcesFailure = errors.New("simulated UnprepareResources failure")
+	ex.failUnprepareResourcesMutex.Unlock()
+
+	return func() {
+		ex.failUnprepareResourcesMutex.Lock()
+		ex.unprepareResourcesFailure = nil
+		ex.failUnprepareResourcesMutex.Unlock()
+	}
+}
+
+func (ex *ExamplePlugin) getUnprepareResourcesFailure() error {
+	ex.failUnprepareResourcesMutex.Lock()
+	defer ex.failUnprepareResourcesMutex.Unlock()
+	return ex.unprepareResourcesFailure
 }
 
 // NodePrepareResource ensures that the CDI file for the claim exists. It uses
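Note: the new helpers replace the old boolean flag with mutex hand-off and injected errors: BlockNode[Un]prepareResources holds the mutex that the gRPC path also locks, so the call hangs until the returned function runs, while SetNode[Un]prepareResourcesFailureMode makes the handler return a simulated error until it is unset. A minimal sketch of the test-side pattern (names as defined above):

    // Make NodePrepareResources hang in the plugin until unblock() runs.
    unblock := kubeletPlugin.BlockNodePrepareResources()
    defer unblock()

    // Make NodeUnprepareResources return a simulated error until unset() runs.
    unset := kubeletPlugin.SetNodeUnprepareResourcesFailureMode()
    defer unset()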
@@ -175,15 +233,10 @@ func (ex *ExamplePlugin) Block() {
 func (ex *ExamplePlugin) nodePrepareResource(ctx context.Context, claimName string, claimUID string, resourceHandle string, structuredResourceHandle []*resourceapi.StructuredResourceHandle) ([]string, error) {
 	logger := klog.FromContext(ctx)
 
-	// Block to emulate plugin stuckness or slowness.
-	// By default the call will not be blocked as ex.block = false.
-	if ex.block {
-		<-ctx.Done()
-		return nil, ctx.Err()
-	}
-
 	ex.mutex.Lock()
 	defer ex.mutex.Unlock()
+	ex.blockPrepareResourcesMutex.Lock()
+	defer ex.blockPrepareResourcesMutex.Unlock()
 
 	deviceName := "claim-" + claimUID
 	vendor := ex.driverName
@@ -309,6 +362,11 @@ func (ex *ExamplePlugin) NodePrepareResources(ctx context.Context, req *drapbv1a
 	resp := &drapbv1alpha3.NodePrepareResourcesResponse{
 		Claims: make(map[string]*drapbv1alpha3.NodePrepareResourceResponse),
 	}
+
+	if failure := ex.getPrepareResourcesFailure(); failure != nil {
+		return resp, failure
+	}
+
 	for _, claimReq := range req.Claims {
 		cdiDevices, err := ex.nodePrepareResource(ctx, claimReq.Name, claimReq.Uid, claimReq.ResourceHandle, claimReq.StructuredResourceHandle)
 		if err != nil {
@@ -328,14 +386,10 @@ func (ex *ExamplePlugin) NodePrepareResources(ctx context.Context, req *drapbv1a
 // NodePrepareResource. It's idempotent, therefore it is not an error when that
 // file is already gone.
 func (ex *ExamplePlugin) nodeUnprepareResource(ctx context.Context, claimName string, claimUID string, resourceHandle string, structuredResourceHandle []*resourceapi.StructuredResourceHandle) error {
-	logger := klog.FromContext(ctx)
-
-	// Block to emulate plugin stuckness or slowness.
-	// By default the call will not be blocked as ex.block = false.
-	if ex.block {
-		<-ctx.Done()
-		return ctx.Err()
-	}
+	ex.blockUnprepareResourcesMutex.Lock()
+	defer ex.blockUnprepareResourcesMutex.Unlock()
+
+	logger := klog.FromContext(ctx)
 
 	filePath := ex.getJSONFilePath(claimUID)
 	if err := ex.fileOps.Remove(filePath); err != nil {
@@ -381,6 +435,11 @@ func (ex *ExamplePlugin) NodeUnprepareResources(ctx context.Context, req *drapbv
 	resp := &drapbv1alpha3.NodeUnprepareResourcesResponse{
 		Claims: make(map[string]*drapbv1alpha3.NodeUnprepareResourceResponse),
 	}
+
+	if failure := ex.getUnprepareResourcesFailure(); failure != nil {
+		return resp, failure
+	}
+
 	for _, claimReq := range req.Claims {
 		err := ex.nodeUnprepareResource(ctx, claimReq.Name, claimReq.Uid, claimReq.ResourceHandle, claimReq.StructuredResourceHandle)
 		if err != nil {
@@ -487,3 +546,14 @@ func (ex *ExamplePlugin) GetGRPCCalls() []GRPCCall {
 	calls = append(calls, ex.gRPCCalls...)
 	return calls
 }
+
+// CountCalls counts GRPC calls with the given method suffix.
+func (ex *ExamplePlugin) CountCalls(methodSuffix string) int {
+	count := 0
+	for _, call := range ex.GetGRPCCalls() {
+		if strings.HasSuffix(call.FullMethod, methodSuffix) {
+			count += 1
+		}
+	}
+	return count
+}
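Note: CountCalls backs the new "must not call NodePrepareResources for deleted pod after Kubelet restart" test further down; that test snapshots the call count once the kubelet is back and then checks that no further NodePrepareResources calls arrive, as in these lines taken from the test:

    calls := kubeletPlugin.CountCalls("/NodePrepareResources")
    ginkgo.By("make sure NodePrepareResources is not called again")
    gomega.Consistently(kubeletPlugin.CountCalls("/NodePrepareResources")).WithTimeout(draplugin.PluginClientTimeout).Should(gomega.Equal(calls))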
@@ -26,6 +26,7 @@ package e2enode
 
 import (
 	"context"
+	"fmt"
 	"os"
 	"path"
 	"path/filepath"
@@ -39,7 +40,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/klog/v2"
-	dra "k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin"
+	draplugin "k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin"
 	admissionapi "k8s.io/pod-security-admission/api"
 
 	"k8s.io/kubernetes/test/e2e/feature"
@@ -52,10 +53,11 @@ import (
 
 const (
 	driverName                = "test-driver.cdi.k8s.io"
+	kubeletPlugin1Name        = "test-driver1.cdi.k8s.io"
+	kubeletPlugin2Name        = "test-driver2.cdi.k8s.io"
 	cdiDir                    = "/var/run/cdi"
-	endpoint                  = "/var/lib/kubelet/plugins/test-driver/dra.sock"
+	endpointTemplate          = "/var/lib/kubelet/plugins/%s/dra.sock"
 	pluginRegistrationPath    = "/var/lib/kubelet/plugins_registry"
-	draAddress                = "/var/lib/kubelet/plugins/test-driver/dra.sock"
 	pluginRegistrationTimeout = time.Second * 60 // how long to wait for a node plugin to be registered
 	podInPendingStateTimeout  = time.Second * 60 // how long to wait for a pod to stay in pending state
 )
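Note: with endpointTemplate replacing the fixed endpoint/draAddress pair, each test plugin gets its own socket derived from its plugin name; newKubeletPlugin (further down) builds it as in this sketch:

    // e.g. "/var/lib/kubelet/plugins/test-driver1.cdi.k8s.io/dra.sock" for kubeletPlugin1Name
    endpoint := fmt.Sprintf(endpointTemplate, pluginName)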
@@ -64,11 +66,11 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
 	f := framework.NewDefaultFramework("dra-node")
 	f.NamespacePodSecurityLevel = admissionapi.LevelBaseline
 
-	var kubeletPlugin *testdriver.ExamplePlugin
+	var kubeletPlugin, kubeletPlugin1, kubeletPlugin2 *testdriver.ExamplePlugin
 
 	f.Context("Resource Kubelet Plugin", f.WithSerial(), func() {
 		ginkgo.BeforeEach(func(ctx context.Context) {
-			kubeletPlugin = newKubeletPlugin(ctx, getNodeName(ctx, f))
+			kubeletPlugin = newKubeletPlugin(ctx, getNodeName(ctx, f), driverName)
 		})
 
 		ginkgo.It("must register after Kubelet restart", func(ctx context.Context) {
@@ -88,7 +90,7 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
 		ginkgo.It("must register after plugin restart", func(ctx context.Context) {
 			ginkgo.By("restart Kubelet Plugin")
 			kubeletPlugin.Stop()
-			kubeletPlugin = newKubeletPlugin(ctx, getNodeName(ctx, f))
+			kubeletPlugin = newKubeletPlugin(ctx, getNodeName(ctx, f), driverName)
 
 			ginkgo.By("wait for Kubelet plugin re-registration")
 			gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(pluginRegistrationTimeout).Should(testdriver.BeRegistered)
@@ -97,7 +99,7 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
 		ginkgo.It("must process pod created when kubelet is not running", func(ctx context.Context) {
 			// Stop Kubelet
 			startKubelet := stopKubelet()
-			pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod")
+			pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", true, []string{driverName})
 			// Pod must be in pending state
 			err := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "Pending", framework.PodStartShortTimeout, func(pod *v1.Pod) (bool, error) {
 				return pod.Status.Phase == v1.PodPending, nil
@@ -111,9 +113,9 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
 		})
 
 		ginkgo.It("must keep pod in pending state if NodePrepareResources times out", func(ctx context.Context) {
-			ginkgo.By("set delay for the NodePrepareResources call")
-			kubeletPlugin.Block()
-			pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod")
+			unblock := kubeletPlugin.BlockNodePrepareResources()
+			defer unblock()
+			pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", true, []string{driverName})
 
 			ginkgo.By("wait for pod to be in Pending state")
 			err := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "Pending", framework.PodStartShortTimeout, func(pod *v1.Pod) (bool, error) {
@@ -121,22 +123,312 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
 			})
 			framework.ExpectNoError(err)
 
-			ginkgo.By("wait for NodePrepareResources call")
-			gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(dra.PluginClientTimeout * 2).Should(testdriver.NodePrepareResourcesCalled)
-
 			// TODO: Check condition or event when implemented
 			// see https://github.com/kubernetes/kubernetes/issues/118468 for details
 			ginkgo.By("check that pod is consistently in Pending state")
 			gomega.Consistently(ctx, e2epod.Get(f.ClientSet, pod)).WithTimeout(podInPendingStateTimeout).Should(e2epod.BeInPhase(v1.PodPending),
 				"Pod should be in Pending state as resource preparation time outed")
 		})
+
+		ginkgo.It("must run pod if NodePrepareResources fails and then succeeds", func(ctx context.Context) {
+			unset := kubeletPlugin.SetNodePrepareResourcesFailureMode()
+			pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", true, []string{driverName})
+
+			ginkgo.By("wait for pod to be in Pending state")
+			err := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "Pending", framework.PodStartShortTimeout, func(pod *v1.Pod) (bool, error) {
+				return pod.Status.Phase == v1.PodPending, nil
+			})
+			framework.ExpectNoError(err)
+
+			ginkgo.By("wait for NodePrepareResources call to fail")
+			gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodePrepareResourcesFailed)
+
+			unset()
+
+			ginkgo.By("wait for NodePrepareResources call to succeed")
+			gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodePrepareResourcesSucceeded)
+
+			ginkgo.By("wait for pod to succeed")
+			err = e2epod.WaitForPodSuccessInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name)
+			framework.ExpectNoError(err)
+		})
+
+		ginkgo.It("must run pod if NodeUnprepareResources fails and then succeeds", func(ctx context.Context) {
+			unset := kubeletPlugin.SetNodeUnprepareResourcesFailureMode()
+			pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", true, []string{driverName})
+
+			ginkgo.By("wait for NodePrepareResources call to succeed")
+			gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodePrepareResourcesSucceeded)
+
+			ginkgo.By("wait for NodeUnprepareResources call to fail")
+			gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodeUnprepareResourcesFailed)
+
+			unset()
+
+			ginkgo.By("wait for NodeUnprepareResources call to succeed")
+			gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodeUnprepareResourcesSucceeded)
+
+			ginkgo.By("wait for pod to succeed")
+			err := e2epod.WaitForPodSuccessInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name)
+			framework.ExpectNoError(err)
+		})
+
+		ginkgo.It("must retry NodePrepareResources after Kubelet restart", func(ctx context.Context) {
+			unset := kubeletPlugin.SetNodePrepareResourcesFailureMode()
+			pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", true, []string{driverName})
+
+			ginkgo.By("wait for pod to be in Pending state")
+			err := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "Pending", framework.PodStartShortTimeout, func(pod *v1.Pod) (bool, error) {
+				return pod.Status.Phase == v1.PodPending, nil
+			})
+			framework.ExpectNoError(err)
+
+			ginkgo.By("wait for NodePrepareResources call to fail")
+			gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodePrepareResourcesFailed)
+
+			ginkgo.By("stop Kubelet")
+			startKubelet := stopKubelet()
+
+			unset()
+
+			ginkgo.By("start Kubelet")
+			startKubelet()
+
+			ginkgo.By("wait for NodePrepareResources call to succeed")
+			gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodePrepareResourcesSucceeded)
+
+			ginkgo.By("wait for pod to succeed")
+			err = e2epod.WaitForPodSuccessInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name)
+			framework.ExpectNoError(err)
+		})
+
+		ginkgo.It("must retry NodeUnprepareResources after Kubelet restart", func(ctx context.Context) {
+			unset := kubeletPlugin.SetNodeUnprepareResourcesFailureMode()
+			pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", true, []string{driverName})
+			ginkgo.By("wait for NodePrepareResources call to succeed")
+			gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodePrepareResourcesSucceeded)
+
+			ginkgo.By("wait for NodeUnprepareResources call to fail")
+			gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodeUnprepareResourcesFailed)
+
+			ginkgo.By("stop Kubelet")
+			startKubelet := stopKubelet()
+
+			unset()
+
+			ginkgo.By("start Kubelet")
+			startKubelet()
+
+			ginkgo.By("wait for NodeUnprepareResources call to succeed")
+			gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodeUnprepareResourcesSucceeded)
+
+			ginkgo.By("wait for pod to succeed")
+			err := e2epod.WaitForPodSuccessInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name)
+			framework.ExpectNoError(err)
+		})
+
+		ginkgo.It("must call NodeUnprepareResources for deleted pod", func(ctx context.Context) {
+			unset := kubeletPlugin.SetNodeUnprepareResourcesFailureMode()
+			pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", false, []string{driverName})
+
+			ginkgo.By("wait for NodePrepareResources call to succeed")
+			gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodePrepareResourcesSucceeded)
+
+			ginkgo.By("wait for NodeUnprepareResources call to fail")
+			gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodeUnprepareResourcesFailed)
+
+			ginkgo.By("delete pod")
+			e2epod.DeletePodOrFail(ctx, f.ClientSet, f.Namespace.Name, pod.Name)
+
+			ginkgo.By("wait for NodeUnprepareResources call to fail")
+			gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodeUnprepareResourcesFailed)
+
+			unset()
+
+			ginkgo.By("wait for NodeUnprepareResources call to succeed")
+			gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodeUnprepareResourcesSucceeded)
+		})
+
+		ginkgo.It("must call NodeUnprepareResources for deleted pod after Kubelet restart", func(ctx context.Context) {
+			unset := kubeletPlugin.SetNodeUnprepareResourcesFailureMode()
+			pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", false, []string{driverName})
+
+			ginkgo.By("wait for NodePrepareResources call to succeed")
+			gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodePrepareResourcesSucceeded)
+
+			ginkgo.By("wait for NodeUnprepareResources call to fail")
+			gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodeUnprepareResourcesFailed)
+
+			ginkgo.By("delete pod")
+			err := e2epod.DeletePodWithGracePeriod(ctx, f.ClientSet, pod, 0)
+			framework.ExpectNoError(err)
+
+			ginkgo.By("wait for NodeUnprepareResources call to fail")
+			gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodeUnprepareResourcesFailed)
+
+			ginkgo.By("restart Kubelet")
+			stopKubelet()()
+
+			ginkgo.By("wait for NodeUnprepareResources call to fail")
+			gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodeUnprepareResourcesFailed)
+
+			unset()
+
+			ginkgo.By("wait for NodeUnprepareResources call to succeed")
+			gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodeUnprepareResourcesSucceeded)
+		})
+
+		ginkgo.It("must not call NodePrepareResources for deleted pod after Kubelet restart", func(ctx context.Context) {
+			unblock := kubeletPlugin.BlockNodePrepareResources()
+			pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", false, []string{driverName})
+
+			ginkgo.By("wait for pod to be in Pending state")
+			err := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "Pending", framework.PodStartShortTimeout, func(pod *v1.Pod) (bool, error) {
+				return pod.Status.Phase == v1.PodPending, nil
+			})
+			framework.ExpectNoError(err)
+
+			ginkgo.By("stop Kubelet")
+			startKubelet := stopKubelet()
+
+			ginkgo.By("delete pod")
+			e2epod.DeletePodOrFail(ctx, f.ClientSet, f.Namespace.Name, pod.Name)
+
+			unblock()
+
+			ginkgo.By("start Kubelet")
+			startKubelet()
+
+			calls := kubeletPlugin.CountCalls("/NodePrepareResources")
+			ginkgo.By("make sure NodePrepareResources is not called again")
+			gomega.Consistently(kubeletPlugin.CountCalls("/NodePrepareResources")).WithTimeout(draplugin.PluginClientTimeout).Should(gomega.Equal(calls))
+		})
+	})
+
+	f.Context("Two resource Kubelet Plugins", f.WithSerial(), func() {
+		ginkgo.BeforeEach(func(ctx context.Context) {
+			kubeletPlugin1 = newKubeletPlugin(ctx, getNodeName(ctx, f), kubeletPlugin1Name)
+			kubeletPlugin2 = newKubeletPlugin(ctx, getNodeName(ctx, f), kubeletPlugin2Name)
+
+			ginkgo.By("wait for Kubelet plugin registration")
+			gomega.Eventually(kubeletPlugin1.GetGRPCCalls()).WithTimeout(pluginRegistrationTimeout).Should(testdriver.BeRegistered)
+			gomega.Eventually(kubeletPlugin2.GetGRPCCalls()).WithTimeout(pluginRegistrationTimeout).Should(testdriver.BeRegistered)
+		})
+
+		ginkgo.It("must prepare and unprepare resources", func(ctx context.Context) {
+			pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", true, []string{kubeletPlugin1Name, kubeletPlugin2Name})
+
+			ginkgo.By("wait for pod to succeed")
+			err := e2epod.WaitForPodSuccessInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name)
+			framework.ExpectNoError(err)
+
+			ginkgo.By("wait for NodePrepareResources calls to succeed")
+			gomega.Eventually(kubeletPlugin1.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodePrepareResourcesSucceeded)
+			gomega.Eventually(kubeletPlugin2.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodePrepareResourcesSucceeded)
+
+			ginkgo.By("wait for NodeUnprepareResources calls to succeed")
+			gomega.Eventually(kubeletPlugin1.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodeUnprepareResourcesSucceeded)
+			gomega.Eventually(kubeletPlugin2.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodeUnprepareResourcesSucceeded)
+		})
+
+		ginkgo.It("must run pod if NodePrepareResources fails for one plugin and then succeeds", func(ctx context.Context) {
+			unset := kubeletPlugin2.SetNodePrepareResourcesFailureMode()
+			pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", true, []string{kubeletPlugin1Name, kubeletPlugin2Name})
+
+			ginkgo.By("wait for pod to be in Pending state")
+			err := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "Pending", framework.PodStartShortTimeout, func(pod *v1.Pod) (bool, error) {
+				return pod.Status.Phase == v1.PodPending, nil
+			})
+			framework.ExpectNoError(err)
+
+			ginkgo.By("wait for plugin2 NodePrepareResources call to fail")
+			gomega.Eventually(kubeletPlugin2.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodePrepareResourcesFailed)
+
+			unset()
+
+			ginkgo.By("wait for plugin2 NodePrepareResources call to succeed")
+			gomega.Eventually(kubeletPlugin2.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodePrepareResourcesSucceeded)
+
+			ginkgo.By("wait for pod to succeed")
+			err = e2epod.WaitForPodSuccessInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name)
+			framework.ExpectNoError(err)
+		})
+
+		ginkgo.It("must run pod if NodeUnprepareResources fails for one plugin and then succeeds", func(ctx context.Context) {
+			unset := kubeletPlugin2.SetNodeUnprepareResourcesFailureMode()
+			pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", true, []string{kubeletPlugin1Name, kubeletPlugin2Name})
+
+			ginkgo.By("wait for plugin1 NodePrepareResources call to succeed")
+			gomega.Eventually(kubeletPlugin1.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodePrepareResourcesSucceeded)
+
+			ginkgo.By("wait for plugin2 NodePrepareResources call to succeed")
+			gomega.Eventually(kubeletPlugin2.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodePrepareResourcesSucceeded)
+
+			ginkgo.By("wait for plugin2 NodeUnprepareResources call to fail")
+			gomega.Eventually(kubeletPlugin2.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodeUnprepareResourcesFailed)
+
+			unset()
+
+			ginkgo.By("wait for plugin2 NodeUnprepareResources call to succeed")
+			gomega.Eventually(kubeletPlugin2.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodeUnprepareResourcesSucceeded)
+
+			ginkgo.By("wait for pod to succeed")
+			err := e2epod.WaitForPodSuccessInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name)
+			framework.ExpectNoError(err)
+		})
+
+		ginkgo.It("must run pod if NodePrepareResources is in progress for one plugin when Kubelet restarts", func(ctx context.Context) {
+			unblock := kubeletPlugin.BlockNodePrepareResources()
+			pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", true, []string{kubeletPlugin1Name, kubeletPlugin2Name})
+
+			ginkgo.By("wait for pod to be in Pending state")
+			err := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "Pending", framework.PodStartShortTimeout, func(pod *v1.Pod) (bool, error) {
+				return pod.Status.Phase == v1.PodPending, nil
+			})
+			framework.ExpectNoError(err)
+
+			ginkgo.By("restart Kubelet")
+			restartKubelet(true)
+
+			unblock()
+
+			ginkgo.By("wait for plugin2 NodePrepareResources call to succeed")
+			gomega.Eventually(kubeletPlugin2.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodePrepareResourcesSucceeded)
+
+			ginkgo.By("wait for pod to succeed")
+			err = e2epod.WaitForPodSuccessInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name)
+			framework.ExpectNoError(err)
+		})
+
+		ginkgo.It("must call NodeUnprepareResources again if it's in progress for one plugin when Kubelet restarts", func(ctx context.Context) {
+			unblock := kubeletPlugin2.BlockNodeUnprepareResources()
+			pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", true, []string{kubeletPlugin1Name, kubeletPlugin2Name})
+
+			ginkgo.By("wait for plugin1 NodePrepareResources call to succeed")
+			gomega.Eventually(kubeletPlugin1.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodePrepareResourcesSucceeded)
+
+			ginkgo.By("wait for plugin2 NodePrepareResources call to succeed")
+			gomega.Eventually(kubeletPlugin2.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodePrepareResourcesSucceeded)
+
+			ginkgo.By("restart Kubelet")
+			restartKubelet(true)
+
+			unblock()
+
+			ginkgo.By("wait for plugin2 NodeUnprepareResources call to succeed")
+			gomega.Eventually(kubeletPlugin2.GetGRPCCalls).WithTimeout(draplugin.PluginClientTimeout * 2).Should(testdriver.NodeUnprepareResourcesSucceeded)
+
+			ginkgo.By("wait for pod to succeed")
+			err := e2epod.WaitForPodSuccessInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name)
+			framework.ExpectNoError(err)
+		})
 	})
 })
 
 // Run Kubelet plugin and wait until it's registered
-func newKubeletPlugin(ctx context.Context, nodeName string) *testdriver.ExamplePlugin {
+func newKubeletPlugin(ctx context.Context, nodeName, pluginName string) *testdriver.ExamplePlugin {
 	ginkgo.By("start Kubelet plugin")
-	logger := klog.LoggerWithValues(klog.LoggerWithName(klog.Background(), "kubelet plugin"), "node", nodeName)
+	logger := klog.LoggerWithValues(klog.LoggerWithName(klog.Background(), "kubelet plugin "+pluginName), "node", nodeName)
 	ctx = klog.NewContext(ctx, logger)
 
 	// Ensure that directories exist, creating them if necessary. We want
@@ -144,18 +436,19 @@ func newKubeletPlugin(ctx context.Context, nodeName string) *testdriver.ExampleP
 	// creating those directories.
 	err := os.MkdirAll(cdiDir, os.FileMode(0750))
 	framework.ExpectNoError(err, "create CDI directory")
+	endpoint := fmt.Sprintf(endpointTemplate, pluginName)
 	err = os.MkdirAll(filepath.Dir(endpoint), 0750)
 	framework.ExpectNoError(err, "create socket directory")
 
 	plugin, err := testdriver.StartPlugin(
 		ctx,
 		cdiDir,
-		driverName,
+		pluginName,
 		"",
 		testdriver.FileOperations{},
 		kubeletplugin.PluginSocketPath(endpoint),
-		kubeletplugin.RegistrarSocketPath(path.Join(pluginRegistrationPath, driverName+"-reg.sock")),
-		kubeletplugin.KubeletPluginSocketPath(draAddress),
+		kubeletplugin.RegistrarSocketPath(path.Join(pluginRegistrationPath, pluginName+"-reg.sock")),
+		kubeletplugin.KubeletPluginSocketPath(endpoint),
 	)
 	framework.ExpectNoError(err)
 
@@ -170,13 +463,13 @@ func newKubeletPlugin(ctx context.Context, nodeName string) *testdriver.ExampleP
 // NOTE: as scheduler and controller manager are not running by the Node e2e,
 // the objects must contain all required data to be processed correctly by the API server
 // and placed on the node without involving the scheduler and the DRA controller
-func createTestObjects(ctx context.Context, clientSet kubernetes.Interface, nodename, namespace, className, claimName, podName string) *v1.Pod {
+func createTestObjects(ctx context.Context, clientSet kubernetes.Interface, nodename, namespace, className, claimName, podName string, deferPodDeletion bool, pluginNames []string) *v1.Pod {
 	// ResourceClass
 	class := &resourcev1alpha2.ResourceClass{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: className,
 		},
-		DriverName: driverName,
+		DriverName: "controller",
 	}
 	_, err := clientSet.ResourceV1alpha2().ResourceClasses().Create(ctx, class, metav1.CreateOptions{})
 	framework.ExpectNoError(err)
@@ -231,22 +524,26 @@ func createTestObjects(ctx context.Context, clientSet kubernetes.Interface, node
 	createdPod, err := clientSet.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{})
 	framework.ExpectNoError(err)
 
-	ginkgo.DeferCleanup(clientSet.CoreV1().Pods(namespace).Delete, podName, metav1.DeleteOptions{})
+	if deferPodDeletion {
+		ginkgo.DeferCleanup(clientSet.CoreV1().Pods(namespace).Delete, podName, metav1.DeleteOptions{})
+	}
 
 	// Update claim status: set ReservedFor and AllocationResult
 	// NOTE: This is usually done by the DRA controller
+	resourceHandlers := make([]resourcev1alpha2.ResourceHandle, len(pluginNames))
+	for i, pluginName := range pluginNames {
+		resourceHandlers[i] = resourcev1alpha2.ResourceHandle{
+			DriverName: pluginName,
+			Data:       "{\"EnvVars\":{\"DRA_PARAM1\":\"PARAM1_VALUE\"},\"NodeName\":\"\"}",
+		}
+	}
 	createdClaim.Status = resourcev1alpha2.ResourceClaimStatus{
-		DriverName: driverName,
+		DriverName: "controller",
 		ReservedFor: []resourcev1alpha2.ResourceClaimConsumerReference{
 			{Resource: "pods", Name: podName, UID: createdPod.UID},
 		},
 		Allocation: &resourcev1alpha2.AllocationResult{
-			ResourceHandles: []resourcev1alpha2.ResourceHandle{
-				{
-					DriverName: driverName,
-					Data:       "{\"EnvVars\":{\"DRA_PARAM1\":\"PARAM1_VALUE\"},\"NodeName\":\"\"}",
-				},
-			},
+			ResourceHandles: resourceHandlers,
 		},
 	}
 	_, err = clientSet.ResourceV1alpha2().ResourceClaims(namespace).UpdateStatus(ctx, createdClaim, metav1.UpdateOptions{})
 