merge test integration scheduler util
This commit is contained in:
@@ -26,27 +26,41 @@ import (
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
policy "k8s.io/api/policy/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/admission"
|
||||
cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/dynamic/dynamicinformer"
|
||||
"k8s.io/client-go/informers"
|
||||
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
corelisters "k8s.io/client-go/listers/core/v1"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/restmapper"
|
||||
"k8s.io/client-go/scale"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/events"
|
||||
pvutil "k8s.io/component-helpers/storage/volume"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kube-scheduler/config/v1beta3"
|
||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||
"k8s.io/kubernetes/pkg/controller/disruption"
|
||||
"k8s.io/kubernetes/pkg/scheduler"
|
||||
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||
configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
|
||||
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpreemption"
|
||||
"k8s.io/kubernetes/pkg/scheduler/profile"
|
||||
st "k8s.io/kubernetes/pkg/scheduler/testing"
|
||||
taintutils "k8s.io/kubernetes/pkg/util/taints"
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
imageutils "k8s.io/kubernetes/test/utils/image"
|
||||
"k8s.io/utils/pointer"
|
||||
)
|
||||
|
||||
// ShutdownFunc represents the function handle to be called, typically in a defer handler, to shutdown a running module
|
||||
@@ -456,3 +470,483 @@ func PodScheduled(c clientset.Interface, podNamespace, podName string) wait.Cond
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
// InitDisruptionController initializes and runs a Disruption Controller to properly
|
||||
// update PodDisuptionBudget objects.
|
||||
func InitDisruptionController(t *testing.T, testCtx *TestContext) *disruption.DisruptionController {
|
||||
informers := informers.NewSharedInformerFactory(testCtx.ClientSet, 12*time.Hour)
|
||||
|
||||
discoveryClient := cacheddiscovery.NewMemCacheClient(testCtx.ClientSet.Discovery())
|
||||
mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
|
||||
|
||||
config := restclient.Config{Host: testCtx.HTTPServer.URL}
|
||||
scaleKindResolver := scale.NewDiscoveryScaleKindResolver(testCtx.ClientSet.Discovery())
|
||||
scaleClient, err := scale.NewForConfig(&config, mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
|
||||
if err != nil {
|
||||
t.Fatalf("Error in create scaleClient: %v", err)
|
||||
}
|
||||
|
||||
dc := disruption.NewDisruptionController(
|
||||
informers.Core().V1().Pods(),
|
||||
informers.Policy().V1().PodDisruptionBudgets(),
|
||||
informers.Core().V1().ReplicationControllers(),
|
||||
informers.Apps().V1().ReplicaSets(),
|
||||
informers.Apps().V1().Deployments(),
|
||||
informers.Apps().V1().StatefulSets(),
|
||||
testCtx.ClientSet,
|
||||
mapper,
|
||||
scaleClient,
|
||||
testCtx.ClientSet.Discovery())
|
||||
|
||||
informers.Start(testCtx.Scheduler.StopEverything)
|
||||
informers.WaitForCacheSync(testCtx.Scheduler.StopEverything)
|
||||
go dc.Run(testCtx.Ctx)
|
||||
return dc
|
||||
}
|
||||
|
||||
// InitTestSchedulerWithNS initializes a test environment and creates API server and scheduler with default
|
||||
// configuration.
|
||||
func InitTestSchedulerWithNS(t *testing.T, nsPrefix string, opts ...scheduler.Option) *TestContext {
|
||||
testCtx := InitTestSchedulerWithOptions(t, InitTestAPIServer(t, nsPrefix, nil), opts...)
|
||||
SyncInformerFactory(testCtx)
|
||||
go testCtx.Scheduler.Run(testCtx.Ctx)
|
||||
return testCtx
|
||||
}
|
||||
|
||||
// InitTestDisablePreemption initializes a test environment and creates API server and scheduler with default
|
||||
// configuration but with pod preemption disabled.
|
||||
func InitTestDisablePreemption(t *testing.T, nsPrefix string) *TestContext {
|
||||
cfg := configtesting.V1beta3ToInternalWithDefaults(t, v1beta3.KubeSchedulerConfiguration{
|
||||
Profiles: []v1beta3.KubeSchedulerProfile{{
|
||||
SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
|
||||
Plugins: &v1beta3.Plugins{
|
||||
PostFilter: v1beta3.PluginSet{
|
||||
Disabled: []v1beta3.Plugin{
|
||||
{Name: defaultpreemption.Name},
|
||||
},
|
||||
},
|
||||
},
|
||||
}},
|
||||
})
|
||||
testCtx := InitTestSchedulerWithOptions(
|
||||
t, InitTestAPIServer(t, nsPrefix, nil),
|
||||
scheduler.WithProfiles(cfg.Profiles...))
|
||||
SyncInformerFactory(testCtx)
|
||||
go testCtx.Scheduler.Run(testCtx.Ctx)
|
||||
return testCtx
|
||||
}
|
||||
|
||||
// WaitForReflection waits till the passFunc confirms that the object it expects
|
||||
// to see is in the store. Used to observe reflected events.
|
||||
func WaitForReflection(t *testing.T, nodeLister corelisters.NodeLister, key string,
|
||||
passFunc func(n interface{}) bool) error {
|
||||
var nodes []*v1.Node
|
||||
err := wait.Poll(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
n, err := nodeLister.Get(key)
|
||||
|
||||
switch {
|
||||
case err == nil && passFunc(n):
|
||||
return true, nil
|
||||
case apierrors.IsNotFound(err):
|
||||
nodes = append(nodes, nil)
|
||||
case err != nil:
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
default:
|
||||
nodes = append(nodes, n)
|
||||
}
|
||||
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Logf("Logging consecutive node versions received from store:")
|
||||
for i, n := range nodes {
|
||||
t.Logf("%d: %#v", i, n)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func UpdateNode(cs clientset.Interface, node *v1.Node) (*v1.Node, error) {
|
||||
return cs.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{})
|
||||
}
|
||||
|
||||
func CreateNode(cs clientset.Interface, node *v1.Node) (*v1.Node, error) {
|
||||
return cs.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
|
||||
}
|
||||
|
||||
func createNodes(cs clientset.Interface, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) {
|
||||
nodes := make([]*v1.Node, numNodes)
|
||||
for i := 0; i < numNodes; i++ {
|
||||
nodeName := fmt.Sprintf("%v-%d", prefix, i)
|
||||
node, err := CreateNode(cs, wrapper.Name(nodeName).Obj())
|
||||
if err != nil {
|
||||
return nodes[:], err
|
||||
}
|
||||
nodes[i] = node
|
||||
}
|
||||
return nodes[:], nil
|
||||
}
|
||||
|
||||
// CreateAndWaitForNodesInCache calls createNodes(), and wait for the created
|
||||
// nodes to be present in scheduler cache.
|
||||
func CreateAndWaitForNodesInCache(testCtx *TestContext, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) {
|
||||
existingNodes := testCtx.Scheduler.Cache.NodeCount()
|
||||
nodes, err := createNodes(testCtx.ClientSet, prefix, wrapper, numNodes)
|
||||
if err != nil {
|
||||
return nodes, fmt.Errorf("cannot create nodes: %v", err)
|
||||
}
|
||||
return nodes, WaitForNodesInCache(testCtx.Scheduler, numNodes+existingNodes)
|
||||
}
|
||||
|
||||
// WaitForNodesInCache ensures at least <nodeCount> nodes are present in scheduler cache
|
||||
// within 30 seconds; otherwise returns false.
|
||||
func WaitForNodesInCache(sched *scheduler.Scheduler, nodeCount int) error {
|
||||
err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
return sched.Cache.NodeCount() >= nodeCount, nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot obtain available nodes in scheduler cache: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type PausePodConfig struct {
|
||||
Name string
|
||||
Namespace string
|
||||
Affinity *v1.Affinity
|
||||
Annotations, Labels, NodeSelector map[string]string
|
||||
Resources *v1.ResourceRequirements
|
||||
Tolerations []v1.Toleration
|
||||
NodeName string
|
||||
SchedulerName string
|
||||
Priority *int32
|
||||
PreemptionPolicy *v1.PreemptionPolicy
|
||||
PriorityClassName string
|
||||
}
|
||||
|
||||
// InitPausePod initializes a pod API object from the given config. It is used
|
||||
// mainly in pod creation process.
|
||||
func InitPausePod(conf *PausePodConfig) *v1.Pod {
|
||||
pod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: conf.Name,
|
||||
Namespace: conf.Namespace,
|
||||
Labels: conf.Labels,
|
||||
Annotations: conf.Annotations,
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
NodeSelector: conf.NodeSelector,
|
||||
Affinity: conf.Affinity,
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: conf.Name,
|
||||
Image: imageutils.GetPauseImageName(),
|
||||
},
|
||||
},
|
||||
Tolerations: conf.Tolerations,
|
||||
NodeName: conf.NodeName,
|
||||
SchedulerName: conf.SchedulerName,
|
||||
Priority: conf.Priority,
|
||||
PreemptionPolicy: conf.PreemptionPolicy,
|
||||
PriorityClassName: conf.PriorityClassName,
|
||||
},
|
||||
}
|
||||
if conf.Resources != nil {
|
||||
pod.Spec.Containers[0].Resources = *conf.Resources
|
||||
}
|
||||
return pod
|
||||
}
|
||||
|
||||
// CreatePausePod creates a pod with "Pause" image and the given config and
|
||||
// return its pointer and error status.
|
||||
func CreatePausePod(cs clientset.Interface, p *v1.Pod) (*v1.Pod, error) {
|
||||
return cs.CoreV1().Pods(p.Namespace).Create(context.TODO(), p, metav1.CreateOptions{})
|
||||
}
|
||||
|
||||
// CreatePausePodWithResource creates a pod with "Pause" image and the given
|
||||
// resources and returns its pointer and error status. The resource list can be
|
||||
// nil.
|
||||
func CreatePausePodWithResource(cs clientset.Interface, podName string,
|
||||
nsName string, res *v1.ResourceList) (*v1.Pod, error) {
|
||||
var conf PausePodConfig
|
||||
if res == nil {
|
||||
conf = PausePodConfig{
|
||||
Name: podName,
|
||||
Namespace: nsName,
|
||||
}
|
||||
} else {
|
||||
conf = PausePodConfig{
|
||||
Name: podName,
|
||||
Namespace: nsName,
|
||||
Resources: &v1.ResourceRequirements{
|
||||
Requests: *res,
|
||||
},
|
||||
}
|
||||
}
|
||||
return CreatePausePod(cs, InitPausePod(&conf))
|
||||
}
|
||||
|
||||
// RunPausePod creates a pod with "Pause" image and the given config and waits
|
||||
// until it is scheduled. It returns its pointer and error status.
|
||||
func RunPausePod(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) {
|
||||
pod, err := cs.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create pause pod: %v", err)
|
||||
}
|
||||
if err = WaitForPodToSchedule(cs, pod); err != nil {
|
||||
return pod, fmt.Errorf("Pod %v/%v didn't schedule successfully. Error: %v", pod.Namespace, pod.Name, err)
|
||||
}
|
||||
if pod, err = cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err != nil {
|
||||
return pod, fmt.Errorf("failed to get pod %v/%v info: %v", pod.Namespace, pod.Name, err)
|
||||
}
|
||||
return pod, nil
|
||||
}
|
||||
|
||||
type PodWithContainersConfig struct {
|
||||
Name string
|
||||
Namespace string
|
||||
Containers []v1.Container
|
||||
}
|
||||
|
||||
// InitPodWithContainers initializes a pod API object from the given config. This is used primarily for generating
|
||||
// pods with containers each having a specific image.
|
||||
func InitPodWithContainers(cs clientset.Interface, conf *PodWithContainersConfig) *v1.Pod {
|
||||
pod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: conf.Name,
|
||||
Namespace: conf.Namespace,
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Containers: conf.Containers,
|
||||
},
|
||||
}
|
||||
return pod
|
||||
}
|
||||
|
||||
// RunPodWithContainers creates a pod with given config and containers and waits
|
||||
// until it is scheduled. It returns its pointer and error status.
|
||||
func RunPodWithContainers(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) {
|
||||
pod, err := cs.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create pod-with-containers: %v", err)
|
||||
}
|
||||
if err = WaitForPodToSchedule(cs, pod); err != nil {
|
||||
return pod, fmt.Errorf("Pod %v didn't schedule successfully. Error: %v", pod.Name, err)
|
||||
}
|
||||
if pod, err = cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err != nil {
|
||||
return pod, fmt.Errorf("failed to get pod %v info: %v", pod.Name, err)
|
||||
}
|
||||
return pod, nil
|
||||
}
|
||||
|
||||
// PodIsGettingEvicted returns true if the pod's deletion timestamp is set.
|
||||
func PodIsGettingEvicted(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
|
||||
return func() (bool, error) {
|
||||
pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if pod.DeletionTimestamp != nil {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
// PodScheduledIn returns true if a given pod is placed onto one of the expected nodes.
|
||||
func PodScheduledIn(c clientset.Interface, podNamespace, podName string, nodeNames []string) wait.ConditionFunc {
|
||||
return func() (bool, error) {
|
||||
pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
// This could be a connection error so we want to retry.
|
||||
return false, nil
|
||||
}
|
||||
if pod.Spec.NodeName == "" {
|
||||
return false, nil
|
||||
}
|
||||
for _, nodeName := range nodeNames {
|
||||
if pod.Spec.NodeName == nodeName {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
// PodUnschedulable returns a condition function that returns true if the given pod
|
||||
// gets unschedulable status.
|
||||
func PodUnschedulable(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
|
||||
return func() (bool, error) {
|
||||
pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
// This could be a connection error so we want to retry.
|
||||
return false, nil
|
||||
}
|
||||
_, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
|
||||
return cond != nil && cond.Status == v1.ConditionFalse &&
|
||||
cond.Reason == v1.PodReasonUnschedulable && pod.Spec.NodeName == "", nil
|
||||
}
|
||||
}
|
||||
|
||||
// PodSchedulingError returns a condition function that returns true if the given pod
|
||||
// gets unschedulable status for reasons other than "Unschedulable". The scheduler
|
||||
// records such reasons in case of error.
|
||||
func PodSchedulingError(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
|
||||
return func() (bool, error) {
|
||||
pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
// This could be a connection error so we want to retry.
|
||||
return false, nil
|
||||
}
|
||||
_, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
|
||||
return cond != nil && cond.Status == v1.ConditionFalse &&
|
||||
cond.Reason != v1.PodReasonUnschedulable, nil
|
||||
}
|
||||
}
|
||||
|
||||
// waitForPodUnscheduleWithTimeout waits for a pod to fail scheduling and returns
|
||||
// an error if it does not become unschedulable within the given timeout.
|
||||
func WaitForPodUnschedulableWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
|
||||
return wait.Poll(100*time.Millisecond, timeout, PodUnschedulable(cs, pod.Namespace, pod.Name))
|
||||
}
|
||||
|
||||
// waitForPodUnschedule waits for a pod to fail scheduling and returns
|
||||
// an error if it does not become unschedulable within the timeout duration (30 seconds).
|
||||
func WaitForPodUnschedulable(cs clientset.Interface, pod *v1.Pod) error {
|
||||
return WaitForPodUnschedulableWithTimeout(cs, pod, 30*time.Second)
|
||||
}
|
||||
|
||||
// WaitForPDBsStable waits for PDBs to have "CurrentHealthy" status equal to
|
||||
// the expected values.
|
||||
func WaitForPDBsStable(testCtx *TestContext, pdbs []*policy.PodDisruptionBudget, pdbPodNum []int32) error {
|
||||
return wait.Poll(time.Second, 60*time.Second, func() (bool, error) {
|
||||
pdbList, err := testCtx.ClientSet.PolicyV1().PodDisruptionBudgets(testCtx.NS.Name).List(context.TODO(), metav1.ListOptions{})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if len(pdbList.Items) != len(pdbs) {
|
||||
return false, nil
|
||||
}
|
||||
for i, pdb := range pdbs {
|
||||
found := false
|
||||
for _, cpdb := range pdbList.Items {
|
||||
if pdb.Name == cpdb.Name && pdb.Namespace == cpdb.Namespace {
|
||||
found = true
|
||||
if cpdb.Status.CurrentHealthy != pdbPodNum[i] {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
}
|
||||
|
||||
// WaitCachedPodsStable waits until scheduler cache has the given pods.
|
||||
func WaitCachedPodsStable(testCtx *TestContext, pods []*v1.Pod) error {
|
||||
return wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
|
||||
cachedPods, err := testCtx.Scheduler.Cache.PodCount()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if len(pods) != cachedPods {
|
||||
return false, nil
|
||||
}
|
||||
for _, p := range pods {
|
||||
actualPod, err1 := testCtx.ClientSet.CoreV1().Pods(p.Namespace).Get(context.TODO(), p.Name, metav1.GetOptions{})
|
||||
if err1 != nil {
|
||||
return false, err1
|
||||
}
|
||||
cachedPod, err2 := testCtx.Scheduler.Cache.GetPod(actualPod)
|
||||
if err2 != nil || cachedPod == nil {
|
||||
return false, err2
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
}
|
||||
|
||||
// DeletePod deletes the given pod in the given namespace.
|
||||
func DeletePod(cs clientset.Interface, podName string, nsName string) error {
|
||||
return cs.CoreV1().Pods(nsName).Delete(context.TODO(), podName, *metav1.NewDeleteOptions(0))
|
||||
}
|
||||
|
||||
func GetPod(cs clientset.Interface, podName string, podNamespace string) (*v1.Pod, error) {
|
||||
return cs.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
|
||||
}
|
||||
|
||||
// podScheduled returns true if a node is assigned to the given pod.
|
||||
func podScheduled(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
|
||||
return func() (bool, error) {
|
||||
pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
// This could be a connection error so we want to retry.
|
||||
return false, nil
|
||||
}
|
||||
return pod.Spec.NodeName != "", nil
|
||||
}
|
||||
}
|
||||
|
||||
func CreateNamespacesWithLabels(cs clientset.Interface, namespaces []string, labels map[string]string) error {
|
||||
for _, n := range namespaces {
|
||||
ns := v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: n, Labels: labels}}
|
||||
if _, err := cs.CoreV1().Namespaces().Create(context.TODO(), &ns, metav1.CreateOptions{}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// timeout returns a timeout error if the given `f` function doesn't
|
||||
// complete within `d` duration; otherwise it returns nil.
|
||||
func timeout(ctx context.Context, d time.Duration, f func()) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, d)
|
||||
defer cancel()
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
f()
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// NextPodOrDie returns the next Pod in the scheduler queue.
|
||||
// The operation needs to be completed within 5 seconds; otherwise the test gets aborted.
|
||||
func NextPodOrDie(t *testing.T, testCtx *TestContext) *schedulerframework.QueuedPodInfo {
|
||||
t.Helper()
|
||||
|
||||
var podInfo *schedulerframework.QueuedPodInfo
|
||||
// NextPod() is a blocking operation. Wrap it in timeout() to avoid relying on
|
||||
// default go testing timeout (10m) to abort.
|
||||
if err := timeout(testCtx.Ctx, time.Second*5, func() {
|
||||
podInfo = testCtx.Scheduler.NextPod()
|
||||
}); err != nil {
|
||||
t.Fatalf("Timed out waiting for the Pod to be popped: %v", err)
|
||||
}
|
||||
return podInfo
|
||||
}
|
||||
|
||||
// NextPod returns the next Pod in the scheduler queue, with a 5 seconds timeout.
|
||||
func NextPod(t *testing.T, testCtx *TestContext) *schedulerframework.QueuedPodInfo {
|
||||
t.Helper()
|
||||
|
||||
var podInfo *schedulerframework.QueuedPodInfo
|
||||
// NextPod() is a blocking operation. Wrap it in timeout() to avoid relying on
|
||||
// default go testing timeout (10m) to abort.
|
||||
if err := timeout(testCtx.Ctx, time.Second*5, func() {
|
||||
podInfo = testCtx.Scheduler.NextPod()
|
||||
}); err != nil {
|
||||
return nil
|
||||
}
|
||||
return podInfo
|
||||
}
|
||||
|
Reference in New Issue
Block a user