fix evictions test and refactor the test
This commit is contained in:
		| @@ -21,6 +21,8 @@ package evictions | |||||||
| import ( | import ( | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"net/http/httptest" | 	"net/http/httptest" | ||||||
|  | 	"sync" | ||||||
|  | 	"sync/atomic" | ||||||
| 	"testing" | 	"testing" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| @@ -33,15 +35,138 @@ import ( | |||||||
| 	"k8s.io/kubernetes/pkg/client/restclient" | 	"k8s.io/kubernetes/pkg/client/restclient" | ||||||
| 	"k8s.io/kubernetes/pkg/controller/disruption" | 	"k8s.io/kubernetes/pkg/controller/disruption" | ||||||
| 	"k8s.io/kubernetes/pkg/controller/informers" | 	"k8s.io/kubernetes/pkg/controller/informers" | ||||||
|  | 	utilerrors "k8s.io/kubernetes/pkg/util/errors" | ||||||
| 	"k8s.io/kubernetes/pkg/util/intstr" | 	"k8s.io/kubernetes/pkg/util/intstr" | ||||||
| 	"k8s.io/kubernetes/pkg/util/wait" | 	"k8s.io/kubernetes/pkg/util/wait" | ||||||
| 	"k8s.io/kubernetes/test/integration/framework" | 	"k8s.io/kubernetes/test/integration/framework" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| const ( | const ( | ||||||
| 	defaultTimeout = 10 * time.Minute | 	numOfEvictions = 10 | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | // TestConcurrentEvictionRequests is to make sure pod disruption budgets (PDB) controller is able to | ||||||
|  | // handle concurrent eviction requests. Original issue:#37605 | ||||||
|  | func TestConcurrentEvictionRequests(t *testing.T) { | ||||||
|  | 	podNameFormat := "test-pod-%d" | ||||||
|  |  | ||||||
|  | 	s, rm, podInformer, clientSet := rmSetup(t) | ||||||
|  | 	defer s.Close() | ||||||
|  |  | ||||||
|  | 	ns := framework.CreateTestingNamespace("concurrent-eviction-requests", s, t) | ||||||
|  | 	defer framework.DeleteTestingNamespace(ns, s, t) | ||||||
|  |  | ||||||
|  | 	stopCh := make(chan struct{}) | ||||||
|  | 	go podInformer.Run(stopCh) | ||||||
|  | 	go rm.Run(stopCh) | ||||||
|  | 	defer close(stopCh) | ||||||
|  |  | ||||||
|  | 	config := restclient.Config{Host: s.URL} | ||||||
|  | 	clientSet, err := clientset.NewForConfig(&config) | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Fatalf("Failed to create clientset: %v", err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	var gracePeriodSeconds int64 = 30 | ||||||
|  | 	deleteOption := &v1.DeleteOptions{ | ||||||
|  | 		GracePeriodSeconds: &gracePeriodSeconds, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Generate numOfEvictions pods to evict | ||||||
|  | 	for i := 0; i < numOfEvictions; i++ { | ||||||
|  | 		podName := fmt.Sprintf(podNameFormat, i) | ||||||
|  | 		pod := newPod(podName) | ||||||
|  |  | ||||||
|  | 		if _, err := clientSet.Core().Pods(ns.Name).Create(pod); err != nil { | ||||||
|  | 			t.Errorf("Failed to create pod: %v", err) | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		addPodConditionReady(pod) | ||||||
|  | 		if _, err := clientSet.Core().Pods(ns.Name).UpdateStatus(pod); err != nil { | ||||||
|  | 			t.Fatal(err) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	waitToObservePods(t, podInformer, numOfEvictions) | ||||||
|  |  | ||||||
|  | 	pdb := newPDB() | ||||||
|  | 	if _, err := clientSet.Policy().PodDisruptionBudgets(ns.Name).Create(pdb); err != nil { | ||||||
|  | 		t.Errorf("Failed to create PodDisruptionBudget: %v", err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	waitPDBStable(t, clientSet, numOfEvictions, ns.Name, pdb.Name) | ||||||
|  |  | ||||||
|  | 	var numberPodsEvicted uint32 = 0 | ||||||
|  | 	errCh := make(chan error, 3*numOfEvictions) | ||||||
|  | 	var wg sync.WaitGroup | ||||||
|  | 	// spawn numOfEvictions goroutines to concurrently evict the pods | ||||||
|  | 	for i := 0; i < numOfEvictions; i++ { | ||||||
|  | 		wg.Add(1) | ||||||
|  | 		go func(id int, errCh chan error) { | ||||||
|  | 			defer wg.Done() | ||||||
|  | 			podName := fmt.Sprintf(podNameFormat, id) | ||||||
|  | 			eviction := newEviction(ns.Name, podName, deleteOption) | ||||||
|  |  | ||||||
|  | 			err := wait.PollImmediate(5*time.Second, 60*time.Second, func() (bool, error) { | ||||||
|  | 				e := clientSet.Policy().Evictions(ns.Name).Evict(eviction) | ||||||
|  | 				switch { | ||||||
|  | 				case errors.IsTooManyRequests(e): | ||||||
|  | 					return false, nil | ||||||
|  | 				case errors.IsConflict(e): | ||||||
|  | 					return false, fmt.Errorf("Unexpected Conflict (409) error caused by failing to handle concurrent PDB updates: %v", e) | ||||||
|  | 				case e == nil: | ||||||
|  | 					return true, nil | ||||||
|  | 				default: | ||||||
|  | 					return false, e | ||||||
|  | 				} | ||||||
|  | 			}) | ||||||
|  |  | ||||||
|  | 			if err != nil { | ||||||
|  | 				errCh <- err | ||||||
|  | 				// should not return here otherwise we would leak the pod | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			_, err = clientSet.Core().Pods(ns.Name).Get(podName, metav1.GetOptions{}) | ||||||
|  | 			switch { | ||||||
|  | 			case errors.IsNotFound(err): | ||||||
|  | 				atomic.AddUint32(&numberPodsEvicted, 1) | ||||||
|  | 				// pod was evicted and deleted so return from goroutine immediately | ||||||
|  | 				return | ||||||
|  | 			case err == nil: | ||||||
|  | 				// this shouldn't happen if the pod was evicted successfully | ||||||
|  | 				errCh <- fmt.Errorf("Pod %q is expected to be evicted", podName) | ||||||
|  | 			default: | ||||||
|  | 				errCh <- err | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			// delete pod which still exists due to error | ||||||
|  | 			e := clientSet.Core().Pods(ns.Name).Delete(podName, deleteOption) | ||||||
|  | 			if e != nil { | ||||||
|  | 				errCh <- e | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 		}(i, errCh) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	wg.Wait() | ||||||
|  |  | ||||||
|  | 	close(errCh) | ||||||
|  | 	var errList []error | ||||||
|  | 	if err := clientSet.Policy().PodDisruptionBudgets(ns.Name).Delete(pdb.Name, deleteOption); err != nil { | ||||||
|  | 		errList = append(errList, fmt.Errorf("Failed to delete PodDisruptionBudget: %v", err)) | ||||||
|  | 	} | ||||||
|  | 	for err := range errCh { | ||||||
|  | 		errList = append(errList, err) | ||||||
|  | 	} | ||||||
|  | 	if len(errList) > 0 { | ||||||
|  | 		t.Fatal(utilerrors.NewAggregate(errList)) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if atomic.LoadUint32(&numberPodsEvicted) != numOfEvictions { | ||||||
|  | 		t.Fatalf("fewer number of successful evictions than expected :", numberPodsEvicted) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
| func newPod(podName string) *v1.Pod { | func newPod(podName string) *v1.Pod { | ||||||
| 	return &v1.Pod{ | 	return &v1.Pod{ | ||||||
| 		ObjectMeta: v1.ObjectMeta{ | 		ObjectMeta: v1.ObjectMeta{ | ||||||
| @@ -121,111 +246,6 @@ func rmSetup(t *testing.T) (*httptest.Server, *disruption.DisruptionController, | |||||||
| 	return s, rm, informers.Pods().Informer(), clientSet | 	return s, rm, informers.Pods().Informer(), clientSet | ||||||
| } | } | ||||||
|  |  | ||||||
| func TestConcurrentEvictionRequests(t *testing.T) { |  | ||||||
| 	podNameFormat := "test-pod-%d" |  | ||||||
|  |  | ||||||
| 	s, rm, podInformer, clientSet := rmSetup(t) |  | ||||||
| 	defer s.Close() |  | ||||||
|  |  | ||||||
| 	ns := framework.CreateTestingNamespace("concurrent-eviction-requests", s, t) |  | ||||||
| 	defer framework.DeleteTestingNamespace(ns, s, t) |  | ||||||
|  |  | ||||||
| 	stopCh := make(chan struct{}) |  | ||||||
| 	go podInformer.Run(stopCh) |  | ||||||
| 	go rm.Run(stopCh) |  | ||||||
|  |  | ||||||
| 	config := restclient.Config{Host: s.URL} |  | ||||||
| 	clientSet, err := clientset.NewForConfig(&config) |  | ||||||
|  |  | ||||||
| 	var gracePeriodSeconds int64 = 30 |  | ||||||
| 	deleteOption := &v1.DeleteOptions{ |  | ||||||
| 		GracePeriodSeconds: &gracePeriodSeconds, |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// Generate 10 pods to evict |  | ||||||
| 	for i := 0; i < 10; i++ { |  | ||||||
| 		podName := fmt.Sprintf(podNameFormat, i) |  | ||||||
| 		pod := newPod(podName) |  | ||||||
|  |  | ||||||
| 		if _, err := clientSet.Core().Pods(ns.Name).Create(pod); err != nil { |  | ||||||
| 			t.Errorf("Failed to create pod: %v", err) |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		addPodConditionReady(pod) |  | ||||||
| 		if _, err := clientSet.Core().Pods(ns.Name).UpdateStatus(pod); err != nil { |  | ||||||
| 			t.Fatal(err) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	waitToObservePods(t, podInformer, 10) |  | ||||||
|  |  | ||||||
| 	pdb := newPDB() |  | ||||||
| 	if _, err := clientSet.Policy().PodDisruptionBudgets(ns.Name).Create(pdb); err != nil { |  | ||||||
| 		t.Errorf("Failed to create PodDisruptionBudget: %v", err) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	waitPDBStable(t, clientSet, 10, ns.Name, pdb.Name) |  | ||||||
|  |  | ||||||
| 	doneCh := make(chan bool, 10) |  | ||||||
| 	errCh := make(chan error, 1) |  | ||||||
| 	// spawn 10 goroutine to concurrently evict the pods |  | ||||||
| 	for i := 0; i < 10; i++ { |  | ||||||
| 		go func(id int, doneCh chan bool, errCh chan error) { |  | ||||||
| 			evictionName := fmt.Sprintf(podNameFormat, id) |  | ||||||
| 			eviction := newEviction(ns.Name, evictionName, deleteOption) |  | ||||||
|  |  | ||||||
| 			var e error |  | ||||||
| 			for { |  | ||||||
| 				e = clientSet.Policy().Evictions(ns.Name).Evict(eviction) |  | ||||||
| 				if errors.IsTooManyRequests(e) { |  | ||||||
| 					time.Sleep(5 * time.Second) |  | ||||||
| 				} else { |  | ||||||
| 					break |  | ||||||
| 				} |  | ||||||
| 			} |  | ||||||
| 			if e != nil { |  | ||||||
| 				if errors.IsConflict(err) { |  | ||||||
| 					fmt.Errorf("Unexpected Conflict (409) error caused by failing to handle concurrent PDB updates: %v", e) |  | ||||||
| 				} else { |  | ||||||
| 					errCh <- e |  | ||||||
| 				} |  | ||||||
| 				return |  | ||||||
| 			} |  | ||||||
| 			doneCh <- true |  | ||||||
| 		}(i, doneCh, errCh) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	doneCount := 0 |  | ||||||
| 	for { |  | ||||||
| 		select { |  | ||||||
| 		case err := <-errCh: |  | ||||||
| 			t.Errorf("%v", err) |  | ||||||
| 			return |  | ||||||
| 		case <-doneCh: |  | ||||||
| 			doneCount++ |  | ||||||
| 			if doneCount == 10 { |  | ||||||
| 				return |  | ||||||
| 			} |  | ||||||
| 		case <-time.After(defaultTimeout): |  | ||||||
| 			t.Errorf("Eviction did not complete within %v", defaultTimeout) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	for i := 0; i < 10; i++ { |  | ||||||
| 		podName := fmt.Sprintf(podNameFormat, i) |  | ||||||
| 		_, err := clientSet.Core().Pods(ns.Name).Get(podName, metav1.GetOptions{}) |  | ||||||
| 		if !errors.IsNotFound(err) { |  | ||||||
| 			t.Errorf("Pod %q is expected to be evicted", podName) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	if err := clientSet.Policy().PodDisruptionBudgets(ns.Name).Delete(pdb.Name, deleteOption); err != nil { |  | ||||||
| 		t.Errorf("Failed to delete PodDisruptionBudget: %v", err) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	close(stopCh) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // wait for the podInformer to observe the pods. Call this function before | // wait for the podInformer to observe the pods. Call this function before | ||||||
| // running the RS controller to prevent the rc manager from creating new pods | // running the RS controller to prevent the rc manager from creating new pods | ||||||
| // rather than adopting the existing ones. | // rather than adopting the existing ones. | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 ymqytw
					ymqytw