Merge pull request #99737 from alculquicondor/indexed-job-e2e
Integration and e2e tests for Indexed job

@@ -19,12 +19,14 @@ package apps
import (
	"context"
	"fmt"
	"strconv"
	"time"

	batchv1 "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
	batchinternal "k8s.io/kubernetes/pkg/apis/batch"
@@ -67,6 +69,37 @@ var _ = SIGDescribe("Job", func() {
		framework.ExpectEqual(successes, completions, "expected %d successful job pods, but got %d", completions, successes)
	})

	/*
		Testcase: Ensure Pods of an Indexed Job get a unique index.
		Description: Create an Indexed Job, wait for completion, capture the output of the pods and verify that they contain the completion index.
	*/
	ginkgo.It("[Feature:IndexedJob] should create pods for an Indexed job with completion indexes", func() {
		ginkgo.By("Creating Indexed job")
		job := e2ejob.NewTestJob("succeed", "indexed-job", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
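		// Opt the Job into the alpha Indexed completion mode, so each Pod gets a unique completion index.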
		job.Spec.CompletionMode = batchv1.IndexedCompletion
		job, err := e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create indexed job in namespace %s", f.Namespace.Name)

		ginkgo.By("Ensuring job reaches completions")
		err = e2ejob.WaitForJobComplete(f.ClientSet, f.Namespace.Name, job.Name, completions)
		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensuring pods with index for job exist")
		pods, err := e2ejob.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to get pod list for job in namespace: %s", f.Namespace.Name)
		succeededIndexes := sets.NewInt()
		for _, pod := range pods.Items {
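			// The completion index is published on each Pod through the alpha annotation.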
			if pod.Status.Phase == v1.PodSucceeded && pod.Annotations != nil {
				ix, err := strconv.Atoi(pod.Annotations[batchv1.JobCompletionIndexAnnotationAlpha])
				framework.ExpectNoError(err, "failed obtaining completion index from pod in namespace: %s", f.Namespace.Name)
				succeededIndexes.Insert(ix)
			}
		}
		gotIndexes := succeededIndexes.List()
		wantIndexes := []int{0, 1, 2, 3}
		framework.ExpectEqual(gotIndexes, wantIndexes, "expected completed indexes %v, but got %v", wantIndexes, gotIndexes)
	})

	/*
		Testcase: Ensure that the pods associated with the job are removed once the job is deleted
		Description: Create a job and ensure the associated pod count is equal to the parallelism count. Delete the

@@ -18,19 +18,28 @@ package job

import (
	"context"
	"errors"
	"fmt"
	"strconv"
	"testing"
	"time"

	"github.com/google/go-cmp/cmp"
	batchv1 "k8s.io/api/batch/v1"
	"k8s.io/api/core/v1"
	eventsv1 "k8s.io/api/events/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/apiserver/pkg/util/feature"
	"k8s.io/client-go/informers"
	clientset "k8s.io/client-go/kubernetes"
	restclient "k8s.io/client-go/rest"
	featuregatetesting "k8s.io/component-base/featuregate/testing"
	"k8s.io/kubernetes/pkg/controller/job"
	"k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/test/integration/framework"
	"k8s.io/utils/pointer"
)
@@ -181,6 +190,85 @@ func TestParallelJobWithCompletions(t *testing.T) {
	})
}

func TestIndexedJob(t *testing.T) {
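	// IndexedJob is an alpha feature, so the gate must be enabled explicitly for this test.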
	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, true)()

	closeFn, restConfig, clientSet, ns := setup(t, "indexed")
	defer closeFn()
	ctx, cancel := startJobController(restConfig, clientSet)
	defer func() {
		cancel()
	}()

	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
		Spec: batchv1.JobSpec{
			Parallelism:    pointer.Int32Ptr(3),
			Completions:    pointer.Int32Ptr(4),
			CompletionMode: batchv1.IndexedCompletion,
		},
	})
	if err != nil {
		t.Fatalf("Failed to create Job: %v", err)
	}
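	// With parallelism 3 and completions 4, the controller should start indexes 0 to 2 first.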
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active: 3,
	})
	validateJobPodsIndexes(ctx, t, clientSet, jobObj, sets.NewInt(0, 1, 2), "")

	// One Pod succeeds.
	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
		t.Fatal("Failed trying to succeed pod with index 1")
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:    3,
		Succeeded: 1,
	})
	validateJobPodsIndexes(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1")

	// Disable feature gate and restart controller.
	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, false)()
	cancel()
	ctx, cancel = startJobController(restConfig, clientSet)
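	// Watch Events so the test can observe the IndexedJobDisabled event emitted by the controller.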
	events, err := clientSet.EventsV1().Events(ns.Name).Watch(ctx, metav1.ListOptions{})
	if err != nil {
		t.Fatal(err)
	}
	defer events.Stop()

	// One Pod fails, but no recreations happen because the feature is disabled.
	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
		t.Fatal("Failed trying to fail pod with index 2")
	}
	if err := waitForEvent(events, jobObj.UID, "IndexedJobDisabled"); err != nil {
		t.Errorf("Waiting for an IndexedJobDisabled event: %v", err)
	}
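	// Index 2 remains absent: the failed Pod is not recreated while the gate is disabled.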
	validateJobPodsIndexes(ctx, t, clientSet, jobObj, sets.NewInt(0, 3), "1")

	// Re-enable feature gate and restart controller. Failed Pod should be recreated now.
	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, true)()
	cancel()
	ctx, cancel = startJobController(restConfig, clientSet)

	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:    3,
		Failed:    1,
		Succeeded: 1,
	})
	validateJobPodsIndexes(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1")

	// Remaining Pods succeed.
	if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
		t.Fatal("Failed trying to succeed remaining pods")
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:    0,
		Failed:    1,
		Succeeded: 4,
	})
	validateJobPodsIndexes(ctx, t, clientSet, jobObj, nil, "0-3")
	validateJobSucceeded(ctx, t, clientSet, jobObj)
}

type podsByStatus struct {
	Active    int
	Failed    int
@@ -223,6 +311,62 @@ func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientse
	}
}

// validateJobPodsIndexes validates indexes of active and completed Pods of an
// Indexed Job. Call after validateJobPodsStatus.
func validateJobPodsIndexes(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, wantActive sets.Int, wantCompleted string) {
	t.Helper()
	updatedJob, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
	if err != nil {
		t.Fatalf("Failed to get updated Job: %v", err)
	}
	if updatedJob.Status.CompletedIndexes != wantCompleted {
		t.Errorf("Got completed indexes %q, want %q", updatedJob.Status.CompletedIndexes, wantCompleted)
	}
	pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
	if err != nil {
		t.Fatalf("Failed to list Job Pods: %v", err)
	}
	gotActive := sets.NewInt()
	for _, pod := range pods.Items {
		if isPodOwnedByJob(&pod, jobObj) {
			if pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning {
				if ix, err := getCompletionIndex(&pod); err != nil {
					t.Errorf("Failed getting completion index for pod %s: %v", pod.Name, err)
				} else {
					gotActive.Insert(ix)
				}
			}
		}
	}
	if wantActive == nil {
		wantActive = sets.NewInt()
	}
	if diff := cmp.Diff(wantActive.List(), gotActive.List()); diff != "" {
		t.Errorf("Unexpected active indexes (-want,+got):\n%s", diff)
	}
}
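
// waitForEvent consumes the event watch channel until it sees an event from
// the job-controller that matches the given object UID and reason, or times out.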
func waitForEvent(events watch.Interface, uid types.UID, reason string) error {
	return wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
		for {
			var ev watch.Event
			select {
			case ev = <-events.ResultChan():
			default:
				return false, nil
			}
			e, ok := ev.Object.(*eventsv1.Event)
			if !ok {
				continue
			}
			ctrl := "job-controller"
			if (e.ReportingController == ctrl || e.DeprecatedSource.Component == ctrl) && e.Reason == reason && e.Regarding.UID == uid {
				return true, nil
			}
		}
	})
}
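
// validateJobSucceeded polls the Job until it is reported as succeeded.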
func validateJobSucceeded(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) {
	t.Helper()
	if err := wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
@@ -265,6 +409,38 @@ func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj
	return nil
}
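
// setJobPhaseForIndex moves the active Pod with the given completion index to
// the given phase by updating its status subresource.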
func setJobPhaseForIndex(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, ix int) error {
	pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
	if err != nil {
		return fmt.Errorf("listing Job Pods: %w", err)
	}
	for _, pod := range pods.Items {
		if p := pod.Status.Phase; !isPodOwnedByJob(&pod, jobObj) || p == v1.PodFailed || p == v1.PodSucceeded {
			continue
		}
		if pix, err := getCompletionIndex(&pod); err == nil && pix == ix {
			pod.Status.Phase = phase
			_, err := clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, &pod, metav1.UpdateOptions{})
			if err != nil {
				return fmt.Errorf("updating pod %s status: %w", pod.Name, err)
			}
			return nil
		}
	}
	return errors.New("no pod matching index found")
}
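
// getCompletionIndex reads the completion index from the Pod's alpha annotation.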
func getCompletionIndex(p *v1.Pod) (int, error) {
	if p.Annotations == nil {
		return 0, errors.New("no annotations found")
	}
	v, ok := p.Annotations[batchv1.JobCompletionIndexAnnotationAlpha]
	if !ok {
		return 0, fmt.Errorf("annotation %s not found", batchv1.JobCompletionIndexAnnotationAlpha)
	}
	return strconv.Atoi(v)
}
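
// isPodOwnedByJob reports whether the Pod has an owner reference pointing to the Job.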
func isPodOwnedByJob(p *v1.Pod, j *batchv1.Job) bool {
	for _, owner := range p.ObjectMeta.OwnerReferences {
		if owner.Kind == "Job" && owner.UID == j.UID {
 