From 30c78c8ab37c29877abd9c8a6e650891816d0495 Mon Sep 17 00:00:00 2001 From: gmarek Date: Fri, 28 Oct 2016 13:50:39 +0200 Subject: [PATCH] Create multiple namespaces in the Density test --- test/e2e/density.go | 173 ++++++++++++++++++++++++++------------------ test/e2e/load.go | 28 +++---- 2 files changed, 118 insertions(+), 83 deletions(-) diff --git a/test/e2e/density.go b/test/e2e/density.go index ac664b291fc..11caf6ecec0 100644 --- a/test/e2e/density.go +++ b/test/e2e/density.go @@ -55,7 +55,6 @@ var MaxContainerFailures = 0 type DensityTestConfig struct { Configs []testutils.RCConfig ClientSet internalclientset.Interface - Namespace string PollInterval time.Duration PodCount int Timeout time.Duration @@ -160,9 +159,9 @@ func density30AddonResourceVerifier(numNodes int) map[string]framework.ResourceC return constraints } -func logPodStartupStatus(c internalclientset.Interface, expectedPods int, ns string, observedLabels map[string]string, period time.Duration, stopCh chan struct{}) { +func logPodStartupStatus(c internalclientset.Interface, expectedPods int, observedLabels map[string]string, period time.Duration, stopCh chan struct{}) { label := labels.SelectorFromSet(labels.Set(observedLabels)) - podStore := testutils.NewPodStore(c, ns, label, fields.Everything()) + podStore := testutils.NewPodStore(c, api.NamespaceAll, label, fields.Everything()) defer podStore.Stop() ticker := time.NewTicker(period) defer ticker.Stop() @@ -199,7 +198,7 @@ func runDensityTest(dtc DensityTestConfig) time.Duration { }() } logStopCh := make(chan struct{}) - go logPodStartupStatus(dtc.ClientSet, dtc.PodCount, dtc.Namespace, map[string]string{"type": "densityPod"}, dtc.PollInterval, logStopCh) + go logPodStartupStatus(dtc.ClientSet, dtc.PodCount, map[string]string{"type": "densityPod"}, dtc.PollInterval, logStopCh) wg.Wait() startupTime := time.Now().Sub(startTime) close(logStopCh) @@ -236,15 +235,15 @@ func cleanupDensityTest(dtc DensityTestConfig) { // We explicitly delete all pods to have API calls necessary for deletion accounted in metrics. for i := range dtc.Configs { rcName := dtc.Configs[i].Name - rc, err := dtc.ClientSet.Core().ReplicationControllers(dtc.Namespace).Get(rcName) + rc, err := dtc.ClientSet.Core().ReplicationControllers(dtc.Configs[i].Namespace).Get(rcName) if err == nil && rc.Spec.Replicas != 0 { if framework.TestContext.GarbageCollectorEnabled { By("Cleaning up only the replication controller, garbage collector will clean up the pods") - err := framework.DeleteRCAndWaitForGC(dtc.ClientSet, dtc.Namespace, rcName) + err := framework.DeleteRCAndWaitForGC(dtc.ClientSet, dtc.Configs[i].Namespace, rcName) framework.ExpectNoError(err) } else { By("Cleaning up the replication controller and pods") - err := framework.DeleteRCAndPods(dtc.ClientSet, dtc.Namespace, rcName) + err := framework.DeleteRCAndPods(dtc.ClientSet, dtc.Configs[i].Namespace, rcName) framework.ExpectNoError(err) } } @@ -392,19 +391,23 @@ var _ = framework.KubeDescribe("Density", func() { defer fileHndl.Close() timeout := 10 * time.Minute - // TODO: loop to podsPerNode instead of 1 when we're ready. - numberOrRCs := 1 - RCConfigs := make([]testutils.RCConfig, numberOrRCs) - for i := 0; i < numberOrRCs; i++ { - RCName := "density" + strconv.Itoa(totalPods) + "-" + strconv.Itoa(i) + "-" + uuid + // nodeCountPerNamespace and CreateNamespaces are defined in load.go + numberOfRCs := (nodeCount + nodeCountPerNamespace - 1) / nodeCountPerNamespace + namespaces, err := CreateNamespaces(f, numberOfRCs, fmt.Sprintf("density-%v", testArg.podsPerNode)) + framework.ExpectNoError(err) + + RCConfigs := make([]testutils.RCConfig, numberOfRCs) + for i := 0; i < numberOfRCs; i++ { + RCName := fmt.Sprintf("density%v-%v-%v", totalPods, i, uuid) + nsName := namespaces[i].Name RCConfigs[i] = testutils.RCConfig{Client: c, Image: framework.GetPauseImageName(f.ClientSet), Name: RCName, - Namespace: ns, + Namespace: nsName, Labels: map[string]string{"type": "densityPod"}, PollInterval: itArg.interval, PodStatusFile: fileHndl, - Replicas: (totalPods + numberOrRCs - 1) / numberOrRCs, + Replicas: (totalPods + numberOfRCs - 1) / numberOfRCs, CpuRequest: nodeCpuCapacity / 100, MemRequest: nodeMemCapacity / 100, MaxContainerFailures: &MaxContainerFailures, @@ -416,7 +419,6 @@ var _ = framework.KubeDescribe("Density", func() { ClientSet: f.ClientSet, Configs: RCConfigs, PodCount: totalPods, - Namespace: ns, PollInterval: itArg.interval, Timeout: timeout, } @@ -425,7 +427,7 @@ var _ = framework.KubeDescribe("Density", func() { By("Scheduling additional Pods to measure startup latencies") createTimes := make(map[string]unversioned.Time, 0) - nodes := make(map[string]string, 0) + nodeNames := make(map[string]string, 0) scheduleTimes := make(map[string]unversioned.Time, 0) runTimes := make(map[string]unversioned.Time, 0) watchTimes := make(map[string]unversioned.Time, 0) @@ -440,7 +442,7 @@ var _ = framework.KubeDescribe("Density", func() { if _, found := watchTimes[p.Name]; !found { watchTimes[p.Name] = unversioned.Now() createTimes[p.Name] = p.CreationTimestamp - nodes[p.Name] = p.Spec.NodeName + nodeNames[p.Name] = p.Spec.NodeName var startTime unversioned.Time for _, cs := range p.Status.ContainerStatuses { if cs.State.Running != nil { @@ -459,36 +461,48 @@ var _ = framework.KubeDescribe("Density", func() { } additionalPodsPrefix = "density-latency-pod" - latencyPodsStore, controller := cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - options.LabelSelector = labels.SelectorFromSet(labels.Set{"type": additionalPodsPrefix}) - obj, err := c.Core().Pods(ns).List(options) - return runtime.Object(obj), err - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - options.LabelSelector = labels.SelectorFromSet(labels.Set{"type": additionalPodsPrefix}) - return c.Core().Pods(ns).Watch(options) - }, - }, - &api.Pod{}, - 0, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - p, ok := obj.(*api.Pod) - Expect(ok).To(Equal(true)) - go checkPod(p) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - p, ok := newObj.(*api.Pod) - Expect(ok).To(Equal(true)) - go checkPod(p) - }, - }, - ) - stopCh := make(chan struct{}) - go controller.Run(stopCh) + + latencyPodStores := make([]cache.Store, len(namespaces)) + for i := 0; i < len(namespaces); i++ { + nsName := namespaces[i].Name + latencyPodsStore, controller := cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + options.LabelSelector = labels.SelectorFromSet(labels.Set{"type": additionalPodsPrefix}) + obj, err := c.Core().Pods(nsName).List(options) + return runtime.Object(obj), err + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + options.LabelSelector = labels.SelectorFromSet(labels.Set{"type": additionalPodsPrefix}) + return c.Core().Pods(nsName).Watch(options) + }, + }, + &api.Pod{}, + 0, + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + p, ok := obj.(*api.Pod) + if !ok { + framework.Logf("Failed to cast observed object to *api.Pod.") + } + Expect(ok).To(Equal(true)) + go checkPod(p) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + p, ok := newObj.(*api.Pod) + if !ok { + framework.Logf("Failed to cast observed object to *api.Pod.") + } + Expect(ok).To(Equal(true)) + go checkPod(p) + }, + }, + ) + latencyPodStores[i] = latencyPodsStore + + go controller.Run(stopCh) + } // Create some additional pods with throughput ~5 pods/sec. var wg sync.WaitGroup @@ -505,9 +519,12 @@ var _ = framework.KubeDescribe("Density", func() { cpuRequest = *resource.NewMilliQuantity(0, resource.DecimalSI) memRequest = *resource.NewQuantity(0, resource.DecimalSI) } + rcNameToNsMap := map[string]string{} for i := 1; i <= nodeCount; i++ { name := additionalPodsPrefix + "-" + strconv.Itoa(i) - go createRunningPodFromRC(&wg, c, name, ns, framework.GetPauseImageName(f.ClientSet), additionalPodsPrefix, cpuRequest, memRequest) + nsName := namespaces[i%len(namespaces)].Name + rcNameToNsMap[name] = nsName + go createRunningPodFromRC(&wg, c, name, nsName, framework.GetPauseImageName(f.ClientSet), additionalPodsPrefix, cpuRequest, memRequest) time.Sleep(200 * time.Millisecond) } wg.Wait() @@ -521,29 +538,34 @@ var _ = framework.KubeDescribe("Density", func() { close(stopCh) nodeToLatencyPods := make(map[string]int) - for _, item := range latencyPodsStore.List() { - pod := item.(*api.Pod) - nodeToLatencyPods[pod.Spec.NodeName]++ - } - for node, count := range nodeToLatencyPods { - if count > 1 { - framework.Logf("%d latency pods scheduled on %s", count, node) + for i := range latencyPodStores { + for _, item := range latencyPodStores[i].List() { + pod := item.(*api.Pod) + nodeToLatencyPods[pod.Spec.NodeName]++ + } + for node, count := range nodeToLatencyPods { + if count > 1 { + framework.Logf("%d latency pods scheduled on %s", count, node) + } } } - selector := fields.Set{ - "involvedObject.kind": "Pod", - "involvedObject.namespace": ns, - "source": api.DefaultSchedulerName, - }.AsSelector() - options := api.ListOptions{FieldSelector: selector} - schedEvents, err := c.Core().Events(ns).List(options) - framework.ExpectNoError(err) - for k := range createTimes { - for _, event := range schedEvents.Items { - if event.InvolvedObject.Name == k { - scheduleTimes[k] = event.FirstTimestamp - break + for i := 0; i < len(namespaces); i++ { + nsName := namespaces[i].Name + selector := fields.Set{ + "involvedObject.kind": "Pod", + "involvedObject.namespace": nsName, + "source": api.DefaultSchedulerName, + }.AsSelector() + options := api.ListOptions{FieldSelector: selector} + schedEvents, err := c.Core().Events(nsName).List(options) + framework.ExpectNoError(err) + for k := range createTimes { + for _, event := range schedEvents.Items { + if event.InvolvedObject.Name == k { + scheduleTimes[k] = event.FirstTimestamp + break + } } } } @@ -556,12 +578,24 @@ var _ = framework.KubeDescribe("Density", func() { for name, create := range createTimes { sched, ok := scheduleTimes[name] + if !ok { + framework.Logf("Failed to find schedule time for %v", name) + } Expect(ok).To(Equal(true)) run, ok := runTimes[name] + if !ok { + framework.Logf("Failed to find run time for %v", name) + } Expect(ok).To(Equal(true)) watch, ok := watchTimes[name] + if !ok { + framework.Logf("Failed to find watch time for %v", name) + } Expect(ok).To(Equal(true)) - node, ok := nodes[name] + node, ok := nodeNames[name] + if !ok { + framework.Logf("Failed to find node for %v", name) + } Expect(ok).To(Equal(true)) scheduleLag = append(scheduleLag, framework.PodLatencyData{Name: name, Node: node, Latency: sched.Time.Sub(create.Time)}) @@ -592,7 +626,7 @@ var _ = framework.KubeDescribe("Density", func() { By("Removing additional replication controllers") deleteRC := func(i int) { name := additionalPodsPrefix + "-" + strconv.Itoa(i+1) - framework.ExpectNoError(framework.DeleteRCAndWaitForGC(c, ns, name)) + framework.ExpectNoError(framework.DeleteRCAndWaitForGC(c, rcNameToNsMap[name], name)) } workqueue.Parallelize(16, nodeCount, deleteRC) } @@ -636,7 +670,6 @@ var _ = framework.KubeDescribe("Density", func() { ClientSet: f.ClientSet, Configs: RCConfigs, PodCount: totalPods, - Namespace: ns, PollInterval: 10 * time.Second, Timeout: 10 * time.Minute, } diff --git a/test/e2e/load.go b/test/e2e/load.go index 55ce1110b10..543b6e75bf9 100644 --- a/test/e2e/load.go +++ b/test/e2e/load.go @@ -66,7 +66,6 @@ var _ = framework.KubeDescribe("Load capacity", func() { var nodeCount int var ns string var configs []*testutils.RCConfig - var namespaces []*api.Namespace // Gathers metrics before teardown // TODO add flag that allows to skip cleanup on failure @@ -140,7 +139,9 @@ var _ = framework.KubeDescribe("Load capacity", func() { It(name, func() { // Create a number of namespaces. - namespaces = createNamespaces(f, nodeCount, itArg.podsPerNode) + namespaceCount := (nodeCount + nodeCountPerNamespace - 1) / nodeCountPerNamespace + namespaces, err := CreateNamespaces(f, namespaceCount, fmt.Sprintf("load-%v-nodepods", itArg.podsPerNode)) + framework.ExpectNoError(err) totalPods := itArg.podsPerNode * nodeCount configs = generateRCConfigs(totalPods, itArg.image, itArg.command, namespaces) @@ -210,17 +211,6 @@ var _ = framework.KubeDescribe("Load capacity", func() { } }) -func createNamespaces(f *framework.Framework, nodeCount, podsPerNode int) []*api.Namespace { - namespaceCount := (nodeCount + nodeCountPerNamespace - 1) / nodeCountPerNamespace - namespaces := []*api.Namespace{} - for i := 1; i <= namespaceCount; i++ { - namespace, err := f.CreateNamespace(fmt.Sprintf("load-%d-nodepods-%d", podsPerNode, i), nil) - framework.ExpectNoError(err) - namespaces = append(namespaces, namespace) - } - return namespaces -} - func createClients(numberOfClients int) ([]*internalclientset.Clientset, error) { clients := make([]*internalclientset.Clientset, numberOfClients) for i := 0; i < numberOfClients; i++ { @@ -414,3 +404,15 @@ func deleteRC(wg *sync.WaitGroup, config *testutils.RCConfig, deletingTime time. framework.ExpectNoError(framework.DeleteRCAndPods(config.Client, config.Namespace, config.Name), fmt.Sprintf("deleting rc %s", config.Name)) } } + +func CreateNamespaces(f *framework.Framework, namespaceCount int, namePrefix string) ([]*api.Namespace, error) { + namespaces := []*api.Namespace{} + for i := 1; i <= namespaceCount; i++ { + namespace, err := f.CreateNamespace(fmt.Sprintf("%v-%d", namePrefix, i), nil) + if err != nil { + return []*api.Namespace{}, err + } + namespaces = append(namespaces, namespace) + } + return namespaces, nil +}