Merge pull request #34544 from gmarek/split

Automatic merge from submit-queue

Move RunRC-like functions to test/utils

Ref. #34336

cc @timothysc - the "move" part of the small refactoring. @jayunit100
This commit is contained in:
Kubernetes Submit Queue
2016-10-14 07:22:38 -07:00
committed by GitHub
27 changed files with 895 additions and 756 deletions

View File

@@ -23,7 +23,6 @@ import (
"fmt"
"io"
"io/ioutil"
"math"
"math/rand"
"net"
"net/http"
@@ -46,11 +45,9 @@ import (
"k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_5"
"k8s.io/kubernetes/pkg/api"
apierrs "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
"k8s.io/kubernetes/pkg/client/restclient"
@@ -79,7 +76,7 @@ import (
utilyaml "k8s.io/kubernetes/pkg/util/yaml"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/watch"
testutil "k8s.io/kubernetes/test/utils"
testutils "k8s.io/kubernetes/test/utils"
"github.com/blang/semver"
"golang.org/x/crypto/ssh"
@@ -112,9 +109,6 @@ const (
// How long to wait for a service endpoint to be resolvable.
ServiceStartTimeout = 1 * time.Minute
// String used to mark pod deletion
nonExist = "NonExist"
// How often to Poll pods, nodes and claims.
Poll = 2 * time.Second
@@ -250,110 +244,6 @@ func GetMasterHost() string {
return masterUrl.Host
}
// Convenient wrapper around cache.Store that returns list of api.Pod instead of interface{}.
type PodStore struct {
cache.Store
stopCh chan struct{}
reflector *cache.Reflector
}
func NewPodStore(c *client.Client, namespace string, label labels.Selector, field fields.Selector) *PodStore {
lw := &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
options.LabelSelector = label
options.FieldSelector = field
return c.Pods(namespace).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
options.LabelSelector = label
options.FieldSelector = field
return c.Pods(namespace).Watch(options)
},
}
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
stopCh := make(chan struct{})
reflector := cache.NewReflector(lw, &api.Pod{}, store, 0)
reflector.RunUntil(stopCh)
return &PodStore{store, stopCh, reflector}
}
func (s *PodStore) List() []*api.Pod {
objects := s.Store.List()
pods := make([]*api.Pod, 0)
for _, o := range objects {
pods = append(pods, o.(*api.Pod))
}
return pods
}
func (s *PodStore) Stop() {
close(s.stopCh)
}
type RCConfig struct {
Client *client.Client
Image string
Command []string
Name string
Namespace string
PollInterval time.Duration
Timeout time.Duration
PodStatusFile *os.File
Replicas int
CpuRequest int64 // millicores
CpuLimit int64 // millicores
MemRequest int64 // bytes
MemLimit int64 // bytes
ReadinessProbe *api.Probe
DNSPolicy *api.DNSPolicy
// Env vars, set the same for every pod.
Env map[string]string
// Extra labels added to every pod.
Labels map[string]string
// Node selector for pods in the RC.
NodeSelector map[string]string
// Ports to declare in the container (map of name to containerPort).
Ports map[string]int
// Ports to declare in the container as host and container ports.
HostPorts map[string]int
Volumes []api.Volume
VolumeMounts []api.VolumeMount
// Pointer to a list of pods; if non-nil, will be set to a list of pods
// created by this RC by RunRC.
CreatedPods *[]*api.Pod
// Maximum allowable container failures. If exceeded, RunRC returns an error.
// Defaults to replicas*0.1 if unspecified.
MaxContainerFailures *int
// If set to false starting RC will print progress, otherwise only errors will be printed.
Silent bool
// If set this function will be used to print log lines instead of glog.
LogFunc func(fmt string, args ...interface{})
}
func (rc *RCConfig) RCConfigLog(fmt string, args ...interface{}) {
if rc.LogFunc != nil {
rc.LogFunc(fmt, args...)
}
glog.Infof(fmt, args...)
}
type DeploymentConfig struct {
RCConfig
}
type ReplicaSetConfig struct {
RCConfig
}
func nowStamp() string {
return time.Now().Format(time.StampMilli)
}
@@ -470,17 +360,6 @@ var providersWithMasterSSH = []string{"gce", "gke", "kubemark", "aws"}
type podCondition func(pod *api.Pod) (bool, error)
// podReady returns whether pod has a condition of Ready with a status of true.
// TODO: should be replaced with api.IsPodReady
func podReady(pod *api.Pod) bool {
for _, cond := range pod.Status.Conditions {
if cond.Type == api.PodReady && cond.Status == api.ConditionTrue {
return true
}
}
return false
}
// logPodStates logs basic info of provided pods for debugging.
func logPodStates(pods []api.Pod) {
// Find maximum widths for pod, node, and phase strings for column printing.
@@ -541,40 +420,6 @@ func errorBadPodsStates(badPods []api.Pod, desiredPods int, ns, desiredState str
return errStr + buf.String()
}
// PodRunningReady checks whether pod p's phase is running and it has a ready
// condition of status true.
func PodRunningReady(p *api.Pod) (bool, error) {
// Check the phase is running.
if p.Status.Phase != api.PodRunning {
return false, fmt.Errorf("want pod '%s' on '%s' to be '%v' but was '%v'",
p.ObjectMeta.Name, p.Spec.NodeName, api.PodRunning, p.Status.Phase)
}
// Check the ready condition is true.
if !podReady(p) {
return false, fmt.Errorf("pod '%s' on '%s' didn't have condition {%v %v}; conditions: %v",
p.ObjectMeta.Name, p.Spec.NodeName, api.PodReady, api.ConditionTrue, p.Status.Conditions)
}
return true, nil
}
func PodRunningReadyOrSucceeded(p *api.Pod) (bool, error) {
// Check if the phase is succeeded.
if p.Status.Phase == api.PodSucceeded {
return true, nil
}
return PodRunningReady(p)
}
// PodNotReady checks whether pod p's has a ready condition of status false.
func PodNotReady(p *api.Pod) (bool, error) {
// Check the ready condition is false.
if podReady(p) {
return false, fmt.Errorf("pod '%s' on '%s' didn't have condition {%v %v}; conditions: %v",
p.ObjectMeta.Name, p.Spec.NodeName, api.PodReady, api.ConditionFalse, p.Status.Conditions)
}
return true, nil
}
// check if a Pod is controlled by a Replication Controller in the List
func hasReplicationControllersForPod(rcs *api.ReplicationControllerList, pod api.Pod) bool {
for _, rc := range rcs.Items {
@@ -681,7 +526,7 @@ func WaitForPodsRunningReady(c *client.Client, ns string, minPods int32, timeout
Logf("%v in state %v, ignoring", pod.Name, pod.Status.Phase)
continue
}
if res, err := PodRunningReady(&pod); res && err == nil {
if res, err := testutils.PodRunningReady(&pod); res && err == nil {
nOk++
if hasReplicationControllersForPod(rcList, pod) {
replicaOk++
@@ -751,7 +596,7 @@ func RunKubernetesServiceTestContainer(c *client.Client, ns string) {
}
}()
timeout := 5 * time.Minute
if err := waitForPodCondition(c, ns, p.Name, "clusterapi-tester", timeout, PodRunningReady); err != nil {
if err := waitForPodCondition(c, ns, p.Name, "clusterapi-tester", timeout, testutils.PodRunningReady); err != nil {
Logf("Pod %v took longer than %v to enter running/ready: %v", p.Name, timeout, err)
return
}
@@ -788,7 +633,7 @@ func LogFailedContainers(c *client.Client, ns string, logFunc func(ftm string, a
}
logFunc("Running kubectl logs on non-ready containers in %v", ns)
for _, pod := range podList.Items {
if res, err := PodRunningReady(&pod); !res || err != nil {
if res, err := testutils.PodRunningReady(&pod); !res || err != nil {
kubectlLogPod(c, pod, "", Logf)
}
}
@@ -911,7 +756,7 @@ func waitForPodCondition(c *client.Client, ns, podName, desc string, timeout tim
}
Logf("Waiting for pod %[1]s in namespace '%[2]s' status to be '%[3]s'"+
"(found phase: %[4]q, readiness: %[5]t) (%[6]v elapsed)",
podName, ns, desc, pod.Status.Phase, podReady(pod), time.Since(start))
podName, ns, desc, pod.Status.Phase, testutils.PodReady(pod), time.Since(start))
}
return fmt.Errorf("gave up waiting for pod '%s' to be '%s' after %v", podName, desc, timeout)
}
@@ -2338,471 +2183,25 @@ func (f *Framework) MatchContainerOutput(
return nil
}
// podInfo contains pod information useful for debugging e2e tests.
type podInfo struct {
oldHostname string
oldPhase string
hostname string
phase string
}
// PodDiff is a map of pod name to podInfos
type PodDiff map[string]*podInfo
// Print formats and prints the give PodDiff.
func (p PodDiff) String(ignorePhases sets.String) string {
ret := ""
for name, info := range p {
if ignorePhases.Has(info.phase) {
continue
}
if info.phase == nonExist {
ret += fmt.Sprintf("Pod %v was deleted, had phase %v and host %v\n", name, info.oldPhase, info.oldHostname)
continue
}
phaseChange, hostChange := false, false
msg := fmt.Sprintf("Pod %v ", name)
if info.oldPhase != info.phase {
phaseChange = true
if info.oldPhase == nonExist {
msg += fmt.Sprintf("in phase %v ", info.phase)
} else {
msg += fmt.Sprintf("went from phase: %v -> %v ", info.oldPhase, info.phase)
}
}
if info.oldHostname != info.hostname {
hostChange = true
if info.oldHostname == nonExist || info.oldHostname == "" {
msg += fmt.Sprintf("assigned host %v ", info.hostname)
} else {
msg += fmt.Sprintf("went from host: %v -> %v ", info.oldHostname, info.hostname)
}
}
if phaseChange || hostChange {
ret += msg + "\n"
}
}
return ret
}
// Diff computes a PodDiff given 2 lists of pods.
func Diff(oldPods []*api.Pod, curPods []*api.Pod) PodDiff {
podInfoMap := PodDiff{}
// New pods will show up in the curPods list but not in oldPods. They have oldhostname/phase == nonexist.
for _, pod := range curPods {
podInfoMap[pod.Name] = &podInfo{hostname: pod.Spec.NodeName, phase: string(pod.Status.Phase), oldHostname: nonExist, oldPhase: nonExist}
}
// Deleted pods will show up in the oldPods list but not in curPods. They have a hostname/phase == nonexist.
for _, pod := range oldPods {
if info, ok := podInfoMap[pod.Name]; ok {
info.oldHostname, info.oldPhase = pod.Spec.NodeName, string(pod.Status.Phase)
} else {
podInfoMap[pod.Name] = &podInfo{hostname: nonExist, phase: nonExist, oldHostname: pod.Spec.NodeName, oldPhase: string(pod.Status.Phase)}
}
}
return podInfoMap
}
// RunDeployment Launches (and verifies correctness) of a Deployment
// and will wait for all pods it spawns to become "Running".
// It's the caller's responsibility to clean up externally (i.e. use the
// namespace lifecycle for handling Cleanup).
func RunDeployment(config DeploymentConfig) error {
err := config.create()
if err != nil {
return err
}
return config.start()
}
func (config *DeploymentConfig) create() error {
func RunDeployment(config testutils.DeploymentConfig) error {
By(fmt.Sprintf("creating deployment %s in namespace %s", config.Name, config.Namespace))
deployment := &extensions.Deployment{
ObjectMeta: api.ObjectMeta{
Name: config.Name,
},
Spec: extensions.DeploymentSpec{
Replicas: int32(config.Replicas),
Selector: &unversioned.LabelSelector{
MatchLabels: map[string]string{
"name": config.Name,
},
},
Template: api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{"name": config.Name},
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: config.Name,
Image: config.Image,
Command: config.Command,
Ports: []api.ContainerPort{{ContainerPort: 80}},
},
},
},
},
},
}
config.applyTo(&deployment.Spec.Template)
_, err := config.Client.Deployments(config.Namespace).Create(deployment)
if err != nil {
return fmt.Errorf("Error creating deployment: %v", err)
}
config.RCConfigLog("Created deployment with name: %v, namespace: %v, replica count: %v", deployment.Name, config.Namespace, deployment.Spec.Replicas)
return nil
config.NodeDumpFunc = DumpNodeDebugInfo
config.ContainerDumpFunc = LogFailedContainers
return testutils.RunDeployment(config)
}
// RunReplicaSet launches (and verifies correctness) of a ReplicaSet
// and waits until all the pods it launches to reach the "Running" state.
// It's the caller's responsibility to clean up externally (i.e. use the
// namespace lifecycle for handling Cleanup).
func RunReplicaSet(config ReplicaSetConfig) error {
err := config.create()
if err != nil {
return err
}
return config.start()
}
func (config *ReplicaSetConfig) create() error {
func RunReplicaSet(config testutils.ReplicaSetConfig) error {
By(fmt.Sprintf("creating replicaset %s in namespace %s", config.Name, config.Namespace))
rs := &extensions.ReplicaSet{
ObjectMeta: api.ObjectMeta{
Name: config.Name,
},
Spec: extensions.ReplicaSetSpec{
Replicas: int32(config.Replicas),
Selector: &unversioned.LabelSelector{
MatchLabels: map[string]string{
"name": config.Name,
},
},
Template: api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{"name": config.Name},
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: config.Name,
Image: config.Image,
Command: config.Command,
Ports: []api.ContainerPort{{ContainerPort: 80}},
},
},
},
},
},
}
config.applyTo(&rs.Spec.Template)
_, err := config.Client.ReplicaSets(config.Namespace).Create(rs)
if err != nil {
return fmt.Errorf("Error creating replica set: %v", err)
}
config.RCConfigLog("Created replica set with name: %v, namespace: %v, replica count: %v", rs.Name, config.Namespace, rs.Spec.Replicas)
return nil
config.NodeDumpFunc = DumpNodeDebugInfo
config.ContainerDumpFunc = LogFailedContainers
return testutils.RunReplicaSet(config)
}
// RunRC Launches (and verifies correctness) of a Replication Controller
// and will wait for all pods it spawns to become "Running".
// It's the caller's responsibility to clean up externally (i.e. use the
// namespace lifecycle for handling Cleanup).
func RunRC(config RCConfig) error {
err := config.create()
if err != nil {
return err
}
return config.start()
}
func (config *RCConfig) create() error {
func RunRC(config testutils.RCConfig) error {
By(fmt.Sprintf("creating replication controller %s in namespace %s", config.Name, config.Namespace))
dnsDefault := api.DNSDefault
if config.DNSPolicy == nil {
config.DNSPolicy = &dnsDefault
}
rc := &api.ReplicationController{
ObjectMeta: api.ObjectMeta{
Name: config.Name,
},
Spec: api.ReplicationControllerSpec{
Replicas: int32(config.Replicas),
Selector: map[string]string{
"name": config.Name,
},
Template: &api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{"name": config.Name},
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: config.Name,
Image: config.Image,
Command: config.Command,
Ports: []api.ContainerPort{{ContainerPort: 80}},
ReadinessProbe: config.ReadinessProbe,
},
},
DNSPolicy: *config.DNSPolicy,
NodeSelector: config.NodeSelector,
},
},
},
}
config.applyTo(rc.Spec.Template)
_, err := config.Client.ReplicationControllers(config.Namespace).Create(rc)
if err != nil {
return fmt.Errorf("Error creating replication controller: %v", err)
}
config.RCConfigLog("Created replication controller with name: %v, namespace: %v, replica count: %v", rc.Name, config.Namespace, rc.Spec.Replicas)
return nil
}
func (config *RCConfig) applyTo(template *api.PodTemplateSpec) {
if config.Env != nil {
for k, v := range config.Env {
c := &template.Spec.Containers[0]
c.Env = append(c.Env, api.EnvVar{Name: k, Value: v})
}
}
if config.Labels != nil {
for k, v := range config.Labels {
template.ObjectMeta.Labels[k] = v
}
}
if config.NodeSelector != nil {
template.Spec.NodeSelector = make(map[string]string)
for k, v := range config.NodeSelector {
template.Spec.NodeSelector[k] = v
}
}
if config.Ports != nil {
for k, v := range config.Ports {
c := &template.Spec.Containers[0]
c.Ports = append(c.Ports, api.ContainerPort{Name: k, ContainerPort: int32(v)})
}
}
if config.HostPorts != nil {
for k, v := range config.HostPorts {
c := &template.Spec.Containers[0]
c.Ports = append(c.Ports, api.ContainerPort{Name: k, ContainerPort: int32(v), HostPort: int32(v)})
}
}
if config.CpuLimit > 0 || config.MemLimit > 0 {
template.Spec.Containers[0].Resources.Limits = api.ResourceList{}
}
if config.CpuLimit > 0 {
template.Spec.Containers[0].Resources.Limits[api.ResourceCPU] = *resource.NewMilliQuantity(config.CpuLimit, resource.DecimalSI)
}
if config.MemLimit > 0 {
template.Spec.Containers[0].Resources.Limits[api.ResourceMemory] = *resource.NewQuantity(config.MemLimit, resource.DecimalSI)
}
if config.CpuRequest > 0 || config.MemRequest > 0 {
template.Spec.Containers[0].Resources.Requests = api.ResourceList{}
}
if config.CpuRequest > 0 {
template.Spec.Containers[0].Resources.Requests[api.ResourceCPU] = *resource.NewMilliQuantity(config.CpuRequest, resource.DecimalSI)
}
if config.MemRequest > 0 {
template.Spec.Containers[0].Resources.Requests[api.ResourceMemory] = *resource.NewQuantity(config.MemRequest, resource.DecimalSI)
}
if len(config.Volumes) > 0 {
template.Spec.Volumes = config.Volumes
}
if len(config.VolumeMounts) > 0 {
template.Spec.Containers[0].VolumeMounts = config.VolumeMounts
}
}
type RCStartupStatus struct {
Expected int
Terminating int
Running int
RunningButNotReady int
Waiting int
Pending int
Unknown int
Inactive int
FailedContainers int
Created []*api.Pod
ContainerRestartNodes sets.String
}
func (s *RCStartupStatus) String(name string) string {
return fmt.Sprintf("%v Pods: %d out of %d created, %d running, %d pending, %d waiting, %d inactive, %d terminating, %d unknown, %d runningButNotReady ",
name, len(s.Created), s.Expected, s.Running, s.Pending, s.Waiting, s.Inactive, s.Terminating, s.Unknown, s.RunningButNotReady)
}
func ComputeRCStartupStatus(pods []*api.Pod, expected int) RCStartupStatus {
startupStatus := RCStartupStatus{
Expected: expected,
Created: make([]*api.Pod, 0, expected),
ContainerRestartNodes: sets.NewString(),
}
for _, p := range pods {
if p.DeletionTimestamp != nil {
startupStatus.Terminating++
continue
}
startupStatus.Created = append(startupStatus.Created, p)
if p.Status.Phase == api.PodRunning {
ready := false
for _, c := range p.Status.Conditions {
if c.Type == api.PodReady && c.Status == api.ConditionTrue {
ready = true
break
}
}
if ready {
// Only count a pod is running when it is also ready.
startupStatus.Running++
} else {
startupStatus.RunningButNotReady++
}
for _, v := range FailedContainers(p) {
startupStatus.FailedContainers = startupStatus.FailedContainers + v.Restarts
startupStatus.ContainerRestartNodes.Insert(p.Spec.NodeName)
}
} else if p.Status.Phase == api.PodPending {
if p.Spec.NodeName == "" {
startupStatus.Waiting++
} else {
startupStatus.Pending++
}
} else if p.Status.Phase == api.PodSucceeded || p.Status.Phase == api.PodFailed {
startupStatus.Inactive++
} else if p.Status.Phase == api.PodUnknown {
startupStatus.Unknown++
}
}
return startupStatus
}
func (config *RCConfig) start() error {
// Don't force tests to fail if they don't care about containers restarting.
var maxContainerFailures int
if config.MaxContainerFailures == nil {
maxContainerFailures = int(math.Max(1.0, float64(config.Replicas)*.01))
} else {
maxContainerFailures = *config.MaxContainerFailures
}
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.Name}))
PodStore := NewPodStore(config.Client, config.Namespace, label, fields.Everything())
defer PodStore.Stop()
interval := config.PollInterval
if interval <= 0 {
interval = 10 * time.Second
}
timeout := config.Timeout
if timeout <= 0 {
timeout = 5 * time.Minute
}
oldPods := make([]*api.Pod, 0)
oldRunning := 0
lastChange := time.Now()
for oldRunning != config.Replicas {
time.Sleep(interval)
pods := PodStore.List()
startupStatus := ComputeRCStartupStatus(pods, config.Replicas)
pods = startupStatus.Created
if config.CreatedPods != nil {
*config.CreatedPods = pods
}
if !config.Silent {
config.RCConfigLog(startupStatus.String(config.Name))
}
if config.PodStatusFile != nil {
fmt.Fprintf(config.PodStatusFile, "%d, running, %d, pending, %d, waiting, %d, inactive, %d, unknown, %d, runningButNotReady\n", startupStatus.Running, startupStatus.Pending, startupStatus.Waiting, startupStatus.Inactive, startupStatus.Unknown, startupStatus.RunningButNotReady)
}
if startupStatus.FailedContainers > maxContainerFailures {
DumpNodeDebugInfo(config.Client, startupStatus.ContainerRestartNodes.List(), config.RCConfigLog)
// Get the logs from the failed containers to help diagnose what caused them to fail
LogFailedContainers(config.Client, config.Namespace, config.RCConfigLog)
return fmt.Errorf("%d containers failed which is more than allowed %d", startupStatus.FailedContainers, maxContainerFailures)
}
if len(pods) < len(oldPods) || len(pods) > config.Replicas {
// This failure mode includes:
// kubelet is dead, so node controller deleted pods and rc creates more
// - diagnose by noting the pod diff below.
// pod is unhealthy, so replication controller creates another to take its place
// - diagnose by comparing the previous "2 Pod states" lines for inactive pods
errorStr := fmt.Sprintf("Number of reported pods for %s changed: %d vs %d", config.Name, len(pods), len(oldPods))
config.RCConfigLog("%v, pods that changed since the last iteration:", errorStr)
config.RCConfigLog(Diff(oldPods, pods).String(sets.NewString()))
return fmt.Errorf(errorStr)
}
if len(pods) > len(oldPods) || startupStatus.Running > oldRunning {
lastChange = time.Now()
}
oldPods = pods
oldRunning = startupStatus.Running
if time.Since(lastChange) > timeout {
break
}
}
if oldRunning != config.Replicas {
// List only pods from a given replication controller.
options := api.ListOptions{LabelSelector: label}
if pods, err := config.Client.Pods(api.NamespaceAll).List(options); err == nil {
for _, pod := range pods.Items {
config.RCConfigLog("Pod %s\t%s\t%s\t%s", pod.Name, pod.Spec.NodeName, pod.Status.Phase, pod.DeletionTimestamp)
}
} else {
config.RCConfigLog("Can't list pod debug info: %v", err)
}
return fmt.Errorf("Only %d pods started out of %d", oldRunning, config.Replicas)
}
return nil
}
// Simplified version of RunRC, that does not create RC, but creates plain Pods.
// Optionally waits for pods to start running (if waitForRunning == true).
// The number of replicas must be non-zero.
func StartPods(c *client.Client, replicas int, namespace string, podNamePrefix string,
pod api.Pod, waitForRunning bool, logFunc func(fmt string, args ...interface{})) error {
// no pod to start
if replicas < 1 {
panic("StartPods: number of replicas must be non-zero")
}
startPodsID := string(uuid.NewUUID()) // So that we can label and find them
for i := 0; i < replicas; i++ {
podName := fmt.Sprintf("%v-%v", podNamePrefix, i)
pod.ObjectMeta.Name = podName
pod.ObjectMeta.Labels["name"] = podName
pod.ObjectMeta.Labels["startPodsID"] = startPodsID
pod.Spec.Containers[0].Name = podName
_, err := c.Pods(namespace).Create(&pod)
return err
}
logFunc("Waiting for running...")
if waitForRunning {
label := labels.SelectorFromSet(labels.Set(map[string]string{"startPodsID": startPodsID}))
err := WaitForPodsWithLabelRunning(c, namespace, label)
return fmt.Errorf("Error waiting for %d pods to be running - probably a timeout: %v", replicas, err)
}
return nil
config.NodeDumpFunc = DumpNodeDebugInfo
config.ContainerDumpFunc = LogFailedContainers
return testutils.RunRC(config)
}
type EventsLister func(opts v1.ListOptions, ns string) (*v1.EventList, error)
@@ -3014,7 +2413,7 @@ func WaitForAllNodesSchedulable(c *client.Client) error {
}
func AddOrUpdateLabelOnNode(c clientset.Interface, nodeName string, labelKey, labelValue string) {
ExpectNoError(testutil.AddLabelsToNode(c, nodeName, map[string]string{labelKey: labelValue}))
ExpectNoError(testutils.AddLabelsToNode(c, nodeName, map[string]string{labelKey: labelValue}))
}
func ExpectNodeHasLabel(c clientset.Interface, nodeName string, labelKey string, labelValue string) {
@@ -3028,10 +2427,10 @@ func ExpectNodeHasLabel(c clientset.Interface, nodeName string, labelKey string,
// won't fail if target label doesn't exist or has been removed.
func RemoveLabelOffNode(c clientset.Interface, nodeName string, labelKey string) {
By("removing the label " + labelKey + " off the node " + nodeName)
ExpectNoError(testutil.RemoveLabelOffNode(c, nodeName, []string{labelKey}))
ExpectNoError(testutils.RemoveLabelOffNode(c, nodeName, []string{labelKey}))
By("verifying the node doesn't have the label " + labelKey)
ExpectNoError(testutil.VerifyLabelsRemoved(c, nodeName, []string{labelKey}))
ExpectNoError(testutils.VerifyLabelsRemoved(c, nodeName, []string{labelKey}))
}
func AddOrUpdateTaintOnNode(c *client.Client, nodeName string, taint api.Taint) {
@@ -3189,42 +2588,16 @@ func WaitForRCPodsRunning(c *client.Client, ns, rcName string) error {
return err
}
selector := labels.SelectorFromSet(labels.Set(rc.Spec.Selector))
err = WaitForPodsWithLabelRunning(c, ns, selector)
err = testutils.WaitForPodsWithLabelRunning(c, ns, selector)
if err != nil {
return fmt.Errorf("Error while waiting for replication controller %s pods to be running: %v", rcName, err)
}
return nil
}
// Wait up to 10 minutes for all matching pods to become Running and at least one
// matching pod exists.
func WaitForPodsWithLabelRunning(c *client.Client, ns string, label labels.Selector) error {
running := false
PodStore := NewPodStore(c, ns, label, fields.Everything())
defer PodStore.Stop()
waitLoop:
for start := time.Now(); time.Since(start) < 10*time.Minute; time.Sleep(5 * time.Second) {
pods := PodStore.List()
if len(pods) == 0 {
continue waitLoop
}
for _, p := range pods {
if p.Status.Phase != api.PodRunning {
continue waitLoop
}
}
running = true
break
}
if !running {
return fmt.Errorf("Timeout while waiting for pods with labels %q to be running", label.String())
}
return nil
}
// Returns true if all the specified pods are scheduled, else returns false.
func podsWithLabelScheduled(c *client.Client, ns string, label labels.Selector) (bool, error) {
PodStore := NewPodStore(c, ns, label, fields.Everything())
PodStore := testutils.NewPodStore(c, ns, label, fields.Everything())
defer PodStore.Stop()
pods := PodStore.List()
if len(pods) == 0 {
@@ -3383,11 +2756,11 @@ func DeleteRCAndWaitForGC(c *client.Client, ns, name string) error {
// podStoreForRC creates a PodStore that monitors pods belong to the rc. It
// waits until the reflector does a List() before returning.
func podStoreForRC(c *client.Client, rc *api.ReplicationController) (*PodStore, error) {
func podStoreForRC(c *client.Client, rc *api.ReplicationController) (*testutils.PodStore, error) {
labels := labels.SelectorFromSet(rc.Spec.Selector)
ps := NewPodStore(c, rc.Namespace, labels, fields.Everything())
ps := testutils.NewPodStore(c, rc.Namespace, labels, fields.Everything())
err := wait.Poll(1*time.Second, 1*time.Minute, func() (bool, error) {
if len(ps.reflector.LastSyncResourceVersion()) != 0 {
if len(ps.Reflector.LastSyncResourceVersion()) != 0 {
return true, nil
}
return false, nil
@@ -3399,7 +2772,7 @@ func podStoreForRC(c *client.Client, rc *api.ReplicationController) (*PodStore,
// This is to make a fair comparison of deletion time between DeleteRCAndPods
// and DeleteRCAndWaitForGC, because the RC controller decreases status.replicas
// when the pod is inactvie.
func waitForPodsInactive(ps *PodStore, interval, timeout time.Duration) error {
func waitForPodsInactive(ps *testutils.PodStore, interval, timeout time.Duration) error {
return wait.PollImmediate(interval, timeout, func() (bool, error) {
pods := ps.List()
for _, pod := range pods {
@@ -3412,7 +2785,7 @@ func waitForPodsInactive(ps *PodStore, interval, timeout time.Duration) error {
}
// waitForPodsGone waits until there are no pods left in the PodStore.
func waitForPodsGone(ps *PodStore, interval, timeout time.Duration) error {
func waitForPodsGone(ps *testutils.PodStore, interval, timeout time.Duration) error {
return wait.PollImmediate(interval, timeout, func() (bool, error) {
if pods := ps.List(); len(pods) == 0 {
return true, nil
@@ -3833,38 +3206,6 @@ func UpdateDeploymentWithRetries(c clientset.Interface, namespace, name string,
return deployment, err
}
// FailedContainers inspects all containers in a pod and returns failure
// information for containers that have failed or been restarted.
// A map is returned where the key is the containerID and the value is a
// struct containing the restart and failure information
func FailedContainers(pod *api.Pod) map[string]ContainerFailures {
var state ContainerFailures
states := make(map[string]ContainerFailures)
statuses := pod.Status.ContainerStatuses
if len(statuses) == 0 {
return nil
} else {
for _, status := range statuses {
if status.State.Terminated != nil {
states[status.ContainerID] = ContainerFailures{status: status.State.Terminated}
} else if status.LastTerminationState.Terminated != nil {
states[status.ContainerID] = ContainerFailures{status: status.LastTerminationState.Terminated}
}
if status.RestartCount > 0 {
var ok bool
if state, ok = states[status.ContainerID]; !ok {
state = ContainerFailures{}
}
state.Restarts = int(status.RestartCount)
states[status.ContainerID] = state
}
}
}
return states
}
// Prints the histogram of the events and returns the number of bad events.
func BadEvents(events []*api.Event) int {
type histogramKey struct {
@@ -4085,14 +3426,14 @@ func GetSigner(provider string) (ssh.Signer, error) {
// podNames in namespace ns are running and ready, using c and waiting at most
// timeout.
func CheckPodsRunningReady(c *client.Client, ns string, podNames []string, timeout time.Duration) bool {
return CheckPodsCondition(c, ns, podNames, timeout, PodRunningReady, "running and ready")
return CheckPodsCondition(c, ns, podNames, timeout, testutils.PodRunningReady, "running and ready")
}
// CheckPodsRunningReadyOrSucceeded returns whether all pods whose names are
// listed in podNames in namespace ns are running and ready, or succeeded; use
// c and waiting at most timeout.
func CheckPodsRunningReadyOrSucceeded(c *client.Client, ns string, podNames []string, timeout time.Duration) bool {
return CheckPodsCondition(c, ns, podNames, timeout, PodRunningReadyOrSucceeded, "running and ready, or succeeded")
return CheckPodsCondition(c, ns, podNames, timeout, testutils.PodRunningReadyOrSucceeded, "running and ready, or succeeded")
}
// CheckPodsCondition returns whether all pods whose names are listed in podNames
@@ -4692,7 +4033,7 @@ func ScaleRCByLabels(client *client.Client, clientset clientset.Interface, ns st
return fmt.Errorf("error while waiting for pods gone %s: %v", name, err)
}
} else {
if err := WaitForPodsWithLabelRunning(
if err := testutils.WaitForPodsWithLabelRunning(
client, ns, labels.SelectorFromSet(labels.Set(rc.Spec.Selector))); err != nil {
return err
}