Merge pull request #83549 from danwinship/service-jig-sanitycheck

more e2eservice.TestJig cleanups
Kubernetes Prow Robot · 2019-10-18 12:37:31 -07:00 · committed by GitHub
10 changed files with 592 additions and 501 deletions


@@ -899,11 +899,12 @@ func testRollingUpdateDeploymentWithLocalTrafficLoadBalancer(f *framework.Framew
framework.ExpectNoError(err)
framework.Logf("Creating a service %s with type=LoadBalancer and externalTrafficPolicy=Local in namespace %s", name, ns)
jig := e2eservice.NewTestJig(c, name)
jig := e2eservice.NewTestJig(c, ns, name)
jig.Labels = podLabels
service := jig.CreateLoadBalancerService(ns, name, e2eservice.LoadBalancerCreateTimeoutDefault, func(svc *v1.Service) {
service, err := jig.CreateLoadBalancerService(e2eservice.LoadBalancerCreateTimeoutDefault, func(svc *v1.Service) {
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
})
framework.ExpectNoError(err)
lbNameOrAddress := e2eservice.GetIngressPoint(&service.Status.LoadBalancer.Ingress[0])
svcPort := int(service.Spec.Ports[0].Port)
@@ -921,13 +922,15 @@ func testRollingUpdateDeploymentWithLocalTrafficLoadBalancer(f *framework.Framew
defer close(done)
go func() {
defer ginkgo.GinkgoRecover()
expectedNodes := jig.GetEndpointNodeNames(service)
expectedNodes, err := jig.GetEndpointNodeNames()
framework.ExpectNoError(err)
// The affinity policy should ensure that before an old pod is
// deleted, a new pod will have been created on the same node.
// Thus the set of nodes with local endpoints for the service
// should remain unchanged.
wait.Until(func() {
actualNodes := jig.GetEndpointNodeNames(service)
actualNodes, err := jig.GetEndpointNodeNames()
framework.ExpectNoError(err)
if !actualNodes.Equal(expectedNodes) {
framework.Logf("The set of nodes with local endpoints changed; started with %v, now have %v", expectedNodes.List(), actualNodes.List())
failed <- struct{}{}
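For orientation, here is a minimal caller-side sketch of the API after this cleanup (the clientset `c`, namespace `ns`, and service name are stand-ins from the surrounding test, not part of the diff): the jig now carries the namespace, and the helpers return errors instead of calling `framework.Failf` internally.

```go
package example

import (
	v1 "k8s.io/api/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/test/e2e/framework"
	e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
)

func exampleJigFlow(c clientset.Interface, ns string) {
	// The namespace is now a constructor argument and lives on the jig.
	jig := e2eservice.NewTestJig(c, ns, "example-svc")

	// Create* helpers return (svc, err); the caller decides how to fail.
	svc, err := jig.CreateLoadBalancerService(e2eservice.LoadBalancerCreateTimeoutDefault, func(svc *v1.Service) {
		svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
	})
	framework.ExpectNoError(err)

	// Getters no longer take the Service; they use jig.Namespace/jig.Name.
	nodes, err := jig.GetEndpointNodeNames()
	framework.ExpectNoError(err)
	framework.Logf("service %s has endpoints on nodes %v", svc.Name, nodes.List())
}
```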


@@ -841,8 +841,8 @@ func (cont *NginxIngressController) Init() {
// --publish-service flag (see <IngressManifestPath>/nginx/rc.yaml) to make it work in private
// clusters, i.e. clusters where nodes don't have public IPs.
framework.Logf("Creating load balancer service for nginx ingress controller")
serviceJig := e2eservice.NewTestJig(cont.Client, "nginx-ingress-lb")
serviceJig.CreateTCPServiceOrFail(cont.Ns, func(svc *v1.Service) {
serviceJig := e2eservice.NewTestJig(cont.Client, cont.Ns, "nginx-ingress-lb")
_, err := serviceJig.CreateTCPService(func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeLoadBalancer
svc.Spec.Selector = map[string]string{"k8s-app": "nginx-ingress-lb"}
svc.Spec.Ports = []v1.ServicePort{
@@ -850,8 +850,9 @@ func (cont *NginxIngressController) Init() {
{Name: "https", Port: 443},
{Name: "stats", Port: 18080}}
})
cont.lbSvc = serviceJig.WaitForLoadBalancerOrFail(cont.Ns, "nginx-ingress-lb", e2eservice.GetServiceLoadBalancerCreationTimeout(cont.Client))
serviceJig.SanityCheckService(cont.lbSvc, v1.ServiceTypeLoadBalancer)
framework.ExpectNoError(err)
cont.lbSvc, err = serviceJig.WaitForLoadBalancer(e2eservice.GetServiceLoadBalancerCreationTimeout(cont.Client))
framework.ExpectNoError(err)
read := func(file string) string {
return string(testfiles.ReadOrDie(filepath.Join(IngressManifestPath, "nginx", file)))


@@ -15,7 +15,6 @@ go_library(
importpath = "k8s.io/kubernetes/test/e2e/framework/service",
visibility = ["//visibility:public"],
deps = [
"//pkg/apis/core:go_default_library",
"//pkg/registry/core/service/portallocator:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",


@@ -40,7 +40,6 @@ import (
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
"k8s.io/kubernetes/test/e2e/framework"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
@@ -54,16 +53,18 @@ var NodePortRange = utilnet.PortRange{Base: 30000, Size: 2768}
// TestJig is a test jig to help service testing.
type TestJig struct {
ID string
Name string
Client clientset.Interface
Labels map[string]string
Client clientset.Interface
Namespace string
Name string
ID string
Labels map[string]string
}
// NewTestJig allocates and inits a new TestJig.
func NewTestJig(client clientset.Interface, name string) *TestJig {
func NewTestJig(client clientset.Interface, namespace, name string) *TestJig {
j := &TestJig{}
j.Client = client
j.Namespace = namespace
j.Name = name
j.ID = j.Name + "-" + string(uuid.NewUUID())
j.Labels = map[string]string{"testid": j.ID}
@@ -74,10 +75,10 @@ func NewTestJig(client clientset.Interface, name string) *TestJig {
// newServiceTemplate returns the default v1.Service template for this j, but
// does not actually create the Service. The default Service has the same name
// as the j and exposes the given port.
func (j *TestJig) newServiceTemplate(namespace string, proto v1.Protocol, port int32) *v1.Service {
func (j *TestJig) newServiceTemplate(proto v1.Protocol, port int32) *v1.Service {
service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Namespace: j.Namespace,
Name: j.Name,
Labels: j.Labels,
},
@@ -97,54 +98,54 @@ func (j *TestJig) newServiceTemplate(namespace string, proto v1.Protocol, port i
// CreateTCPServiceWithPort creates a new TCP Service with given port based on the
// j's defaults. Callers can provide a function to tweak the Service object before
// it is created.
func (j *TestJig) CreateTCPServiceWithPort(namespace string, tweak func(svc *v1.Service), port int32) *v1.Service {
svc := j.newServiceTemplate(namespace, v1.ProtocolTCP, port)
func (j *TestJig) CreateTCPServiceWithPort(tweak func(svc *v1.Service), port int32) (*v1.Service, error) {
svc := j.newServiceTemplate(v1.ProtocolTCP, port)
if tweak != nil {
tweak(svc)
}
result, err := j.Client.CoreV1().Services(namespace).Create(svc)
result, err := j.Client.CoreV1().Services(j.Namespace).Create(svc)
if err != nil {
framework.Failf("Failed to create TCP Service %q: %v", svc.Name, err)
return nil, fmt.Errorf("failed to create TCP Service %q: %v", svc.Name, err)
}
return result
return j.sanityCheckService(result, svc.Spec.Type)
}
// CreateTCPServiceOrFail creates a new TCP Service based on the j's
// CreateTCPService creates a new TCP Service based on the j's
// defaults. Callers can provide a function to tweak the Service object before
// it is created.
func (j *TestJig) CreateTCPServiceOrFail(namespace string, tweak func(svc *v1.Service)) *v1.Service {
svc := j.newServiceTemplate(namespace, v1.ProtocolTCP, 80)
func (j *TestJig) CreateTCPService(tweak func(svc *v1.Service)) (*v1.Service, error) {
svc := j.newServiceTemplate(v1.ProtocolTCP, 80)
if tweak != nil {
tweak(svc)
}
result, err := j.Client.CoreV1().Services(namespace).Create(svc)
result, err := j.Client.CoreV1().Services(j.Namespace).Create(svc)
if err != nil {
framework.Failf("Failed to create TCP Service %q: %v", svc.Name, err)
return nil, fmt.Errorf("failed to create TCP Service %q: %v", svc.Name, err)
}
return result
return j.sanityCheckService(result, svc.Spec.Type)
}
// CreateUDPServiceOrFail creates a new UDP Service based on the j's
// CreateUDPService creates a new UDP Service based on the j's
// defaults. Callers can provide a function to tweak the Service object before
// it is created.
func (j *TestJig) CreateUDPServiceOrFail(namespace string, tweak func(svc *v1.Service)) *v1.Service {
svc := j.newServiceTemplate(namespace, v1.ProtocolUDP, 80)
func (j *TestJig) CreateUDPService(tweak func(svc *v1.Service)) (*v1.Service, error) {
svc := j.newServiceTemplate(v1.ProtocolUDP, 80)
if tweak != nil {
tweak(svc)
}
result, err := j.Client.CoreV1().Services(namespace).Create(svc)
result, err := j.Client.CoreV1().Services(j.Namespace).Create(svc)
if err != nil {
framework.Failf("Failed to create UDP Service %q: %v", svc.Name, err)
return nil, fmt.Errorf("failed to create UDP Service %q: %v", svc.Name, err)
}
return result
return j.sanityCheckService(result, svc.Spec.Type)
}
// CreateExternalNameServiceOrFail creates a new ExternalName type Service based on the j's defaults.
// CreateExternalNameService creates a new ExternalName type Service based on the j's defaults.
// Callers can provide a function to tweak the Service object before it is created.
func (j *TestJig) CreateExternalNameServiceOrFail(namespace string, tweak func(svc *v1.Service)) *v1.Service {
func (j *TestJig) CreateExternalNameService(tweak func(svc *v1.Service)) (*v1.Service, error) {
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Namespace: j.Namespace,
Name: j.Name,
Labels: j.Labels,
},
@@ -157,31 +158,17 @@ func (j *TestJig) CreateExternalNameServiceOrFail(namespace string, tweak func(s
if tweak != nil {
tweak(svc)
}
result, err := j.Client.CoreV1().Services(namespace).Create(svc)
result, err := j.Client.CoreV1().Services(j.Namespace).Create(svc)
if err != nil {
framework.Failf("Failed to create ExternalName Service %q: %v", svc.Name, err)
return nil, fmt.Errorf("failed to create ExternalName Service %q: %v", svc.Name, err)
}
return result
}
// CreateServiceWithServicePort creates a new Service with ServicePort.
func (j *TestJig) CreateServiceWithServicePort(labels map[string]string, namespace string, ports []v1.ServicePort) (*v1.Service, error) {
service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: j.Name,
},
Spec: v1.ServiceSpec{
Selector: labels,
Ports: ports,
},
}
return j.Client.CoreV1().Services(namespace).Create(service)
return j.sanityCheckService(result, svc.Spec.Type)
}
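The caller-side migration for these Create* helpers is mechanical; a hedged before/after sketch (the NodePort tweak is illustrative, not from this diff):

```go
package example

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/test/e2e/framework"
	e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
)

// Before: svc := jig.CreateTCPServiceOrFail(ns, tweak) — failure raised inside.
// After: the namespace lives on the jig and the error comes back to the caller.
func exampleCreate(jig *e2eservice.TestJig) *v1.Service {
	svc, err := jig.CreateTCPService(func(svc *v1.Service) {
		svc.Spec.Type = v1.ServiceTypeNodePort // illustrative tweak
	})
	framework.ExpectNoError(err)
	return svc
}
```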
// ChangeServiceType updates the given service's ServiceType to the given newType.
func (j *TestJig) ChangeServiceType(namespace, name string, newType v1.ServiceType, timeout time.Duration) {
func (j *TestJig) ChangeServiceType(newType v1.ServiceType, timeout time.Duration) error {
ingressIP := ""
svc := j.UpdateServiceOrFail(namespace, name, func(s *v1.Service) {
svc, err := j.UpdateService(func(s *v1.Service) {
for _, ing := range s.Status.LoadBalancer.Ingress {
if ing.IP != "" {
ingressIP = ing.IP
@@ -190,101 +177,117 @@ func (j *TestJig) ChangeServiceType(namespace, name string, newType v1.ServiceTy
s.Spec.Type = newType
s.Spec.Ports[0].NodePort = 0
})
if ingressIP != "" {
j.WaitForLoadBalancerDestroyOrFail(namespace, svc.Name, ingressIP, int(svc.Spec.Ports[0].Port), timeout)
if err != nil {
return err
}
if ingressIP != "" {
_, err = j.WaitForLoadBalancerDestroy(ingressIP, int(svc.Spec.Ports[0].Port), timeout)
}
return err
}
// CreateOnlyLocalNodePortService creates a NodePort service with
// ExternalTrafficPolicy set to Local and sanity checks its nodePort.
// If createPod is true, it also creates an RC with 1 replica of
// the standard netexec container used everywhere in this test.
func (j *TestJig) CreateOnlyLocalNodePortService(namespace, serviceName string, createPod bool) *v1.Service {
ginkgo.By("creating a service " + namespace + "/" + serviceName + " with type=NodePort and ExternalTrafficPolicy=Local")
svc := j.CreateTCPServiceOrFail(namespace, func(svc *v1.Service) {
func (j *TestJig) CreateOnlyLocalNodePortService(createPod bool) (*v1.Service, error) {
ginkgo.By("creating a service " + j.Namespace + "/" + j.Name + " with type=NodePort and ExternalTrafficPolicy=Local")
svc, err := j.CreateTCPService(func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeNodePort
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
svc.Spec.Ports = []v1.ServicePort{{Protocol: v1.ProtocolTCP, Port: 80}}
})
if err != nil {
return nil, err
}
if createPod {
ginkgo.By("creating a pod to be part of the service " + serviceName)
j.RunOrFail(namespace, nil)
ginkgo.By("creating a pod to be part of the service " + j.Name)
_, err = j.Run(nil)
if err != nil {
return nil, err
}
}
j.SanityCheckService(svc, v1.ServiceTypeNodePort)
return svc
return svc, nil
}
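A hedged usage sketch of the new signature (the NodePort extraction mirrors callers elsewhere in this PR):

```go
package example

import (
	"k8s.io/kubernetes/test/e2e/framework"
	e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
)

func exampleOnlyLocalNodePort(jig *e2eservice.TestJig) int {
	// createPod=true also starts one netexec pod behind the service.
	svc, err := jig.CreateOnlyLocalNodePortService(true)
	framework.ExpectNoError(err)
	return int(svc.Spec.Ports[0].NodePort) // allocated by the apiserver
}
```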
// CreateOnlyLocalLoadBalancerService creates a loadbalancer service with
// ExternalTrafficPolicy set to Local and waits for it to acquire an ingress IP.
// If createPod is true, it also creates an RC with 1 replica of
// the standard netexec container used everywhere in this test.
func (j *TestJig) CreateOnlyLocalLoadBalancerService(namespace, serviceName string, timeout time.Duration, createPod bool,
tweak func(svc *v1.Service)) *v1.Service {
ginkgo.By("creating a service " + namespace + "/" + serviceName + " with type=LoadBalancer and ExternalTrafficPolicy=Local")
j.CreateTCPServiceOrFail(namespace, func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeLoadBalancer
// We need to turn affinity off for our LB distribution tests
svc.Spec.SessionAffinity = v1.ServiceAffinityNone
func (j *TestJig) CreateOnlyLocalLoadBalancerService(timeout time.Duration, createPod bool,
tweak func(svc *v1.Service)) (*v1.Service, error) {
_, err := j.CreateLoadBalancerService(timeout, func(svc *v1.Service) {
ginkgo.By("setting ExternalTrafficPolicy=Local")
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
if tweak != nil {
tweak(svc)
}
})
if err != nil {
return nil, err
}
if createPod {
ginkgo.By("creating a pod to be part of the service " + serviceName)
j.RunOrFail(namespace, nil)
ginkgo.By("creating a pod to be part of the service " + j.Name)
_, err = j.Run(nil)
if err != nil {
return nil, err
}
}
ginkgo.By("waiting for loadbalancer for service " + namespace + "/" + serviceName)
svc := j.WaitForLoadBalancerOrFail(namespace, serviceName, timeout)
j.SanityCheckService(svc, v1.ServiceTypeLoadBalancer)
return svc
ginkgo.By("waiting for loadbalancer for service " + j.Namespace + "/" + j.Name)
return j.WaitForLoadBalancer(timeout)
}
// CreateLoadBalancerService creates a loadbalancer service and waits
// for it to acquire an ingress IP.
func (j *TestJig) CreateLoadBalancerService(namespace, serviceName string, timeout time.Duration, tweak func(svc *v1.Service)) *v1.Service {
ginkgo.By("creating a service " + namespace + "/" + serviceName + " with type=LoadBalancer")
j.CreateTCPServiceOrFail(namespace, func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeLoadBalancer
// We need to turn affinity off for our LB distribution tests
svc.Spec.SessionAffinity = v1.ServiceAffinityNone
if tweak != nil {
tweak(svc)
}
})
func (j *TestJig) CreateLoadBalancerService(timeout time.Duration, tweak func(svc *v1.Service)) (*v1.Service, error) {
ginkgo.By("creating a service " + j.Namespace + "/" + j.Name + " with type=LoadBalancer")
svc := j.newServiceTemplate(v1.ProtocolTCP, 80)
svc.Spec.Type = v1.ServiceTypeLoadBalancer
// We need to turn affinity off for our LB distribution tests
svc.Spec.SessionAffinity = v1.ServiceAffinityNone
if tweak != nil {
tweak(svc)
}
_, err := j.Client.CoreV1().Services(j.Namespace).Create(svc)
if err != nil {
return nil, fmt.Errorf("failed to create LoadBalancer Service %q: %v", svc.Name, err)
}
ginkgo.By("waiting for loadbalancer for service " + namespace + "/" + serviceName)
svc := j.WaitForLoadBalancerOrFail(namespace, serviceName, timeout)
j.SanityCheckService(svc, v1.ServiceTypeLoadBalancer)
return svc
ginkgo.By("waiting for loadbalancer for service " + j.Namespace + "/" + j.Name)
return j.WaitForLoadBalancer(timeout)
}
// GetEndpointNodes returns a map of nodenames:external-ip on which the
// endpoints of the given Service are running.
func (j *TestJig) GetEndpointNodes(svc *v1.Service) map[string][]string {
// endpoints of the Service are running.
func (j *TestJig) GetEndpointNodes() (map[string][]string, error) {
nodes, err := e2enode.GetBoundedReadySchedulableNodes(j.Client, MaxNodesForEndpointsTests)
framework.ExpectNoError(err)
epNodes := j.GetEndpointNodeNames(svc)
if err != nil {
return nil, err
}
epNodes, err := j.GetEndpointNodeNames()
if err != nil {
return nil, err
}
nodeMap := map[string][]string{}
for _, n := range nodes.Items {
if epNodes.Has(n.Name) {
nodeMap[n.Name] = e2enode.GetAddresses(&n, v1.NodeExternalIP)
}
}
return nodeMap
return nodeMap, nil
}
// GetEndpointNodeNames returns a string set of node names on which the
// endpoints of the given Service are running.
func (j *TestJig) GetEndpointNodeNames(svc *v1.Service) sets.String {
endpoints, err := j.Client.CoreV1().Endpoints(svc.Namespace).Get(svc.Name, metav1.GetOptions{})
func (j *TestJig) GetEndpointNodeNames() (sets.String, error) {
endpoints, err := j.Client.CoreV1().Endpoints(j.Namespace).Get(j.Name, metav1.GetOptions{})
if err != nil {
framework.Failf("Get endpoints for service %s/%s failed (%s)", svc.Namespace, svc.Name, err)
return nil, fmt.Errorf("get endpoints for service %s/%s failed (%s)", j.Namespace, j.Name, err)
}
if len(endpoints.Subsets) == 0 {
framework.Failf("Endpoint has no subsets, cannot determine node addresses.")
return nil, fmt.Errorf("endpoint has no subsets, cannot determine node addresses")
}
epNodes := sets.NewString()
for _, ss := range endpoints.Subsets {
@@ -294,15 +297,15 @@ func (j *TestJig) GetEndpointNodeNames(svc *v1.Service) sets.String {
}
}
}
return epNodes
return epNodes, nil
}
// WaitForEndpointOnNode waits for a service endpoint on the given node.
func (j *TestJig) WaitForEndpointOnNode(namespace, serviceName, nodeName string) {
err := wait.PollImmediate(framework.Poll, LoadBalancerCreateTimeoutDefault, func() (bool, error) {
endpoints, err := j.Client.CoreV1().Endpoints(namespace).Get(serviceName, metav1.GetOptions{})
func (j *TestJig) WaitForEndpointOnNode(nodeName string) error {
return wait.PollImmediate(framework.Poll, LoadBalancerCreateTimeoutDefault, func() (bool, error) {
endpoints, err := j.Client.CoreV1().Endpoints(j.Namespace).Get(j.Name, metav1.GetOptions{})
if err != nil {
framework.Logf("Get endpoints for service %s/%s failed (%s)", namespace, serviceName, err)
framework.Logf("Get endpoints for service %s/%s failed (%s)", j.Namespace, j.Name, err)
return false, nil
}
if len(endpoints.Subsets) == 0 {
@@ -315,20 +318,19 @@ func (j *TestJig) WaitForEndpointOnNode(namespace, serviceName, nodeName string)
return false, nil
}
epHostName := *endpoints.Subsets[0].Addresses[0].NodeName
framework.Logf("Pod for service %s/%s is on node %s", namespace, serviceName, epHostName)
framework.Logf("Pod for service %s/%s is on node %s", j.Namespace, j.Name, epHostName)
if epHostName != nodeName {
framework.Logf("Found endpoint on wrong node, expected %v, got %v", nodeName, epHostName)
return false, nil
}
return true, nil
})
framework.ExpectNoError(err)
}
// WaitForAvailableEndpoint waits for at least 1 endpoint to be available till timeout
func (j *TestJig) WaitForAvailableEndpoint(namespace, serviceName string, timeout time.Duration) {
func (j *TestJig) WaitForAvailableEndpoint(timeout time.Duration) error {
//Wait for endpoints to be created, this may take longer time if service backing pods are taking longer time to run
endpointSelector := fields.OneTermEqualSelector("metadata.name", serviceName)
endpointSelector := fields.OneTermEqualSelector("metadata.name", j.Name)
stopCh := make(chan struct{})
endpointAvailable := false
var controller cache.Controller
@@ -336,12 +338,12 @@ func (j *TestJig) WaitForAvailableEndpoint(namespace, serviceName string, timeou
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = endpointSelector.String()
obj, err := j.Client.CoreV1().Endpoints(namespace).List(options)
obj, err := j.Client.CoreV1().Endpoints(j.Namespace).List(options)
return runtime.Object(obj), err
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = endpointSelector.String()
return j.Client.CoreV1().Endpoints(namespace).Watch(options)
return j.Client.CoreV1().Endpoints(j.Namespace).Watch(options)
},
},
&v1.Endpoints{},
@@ -372,25 +374,32 @@ func (j *TestJig) WaitForAvailableEndpoint(namespace, serviceName string, timeou
err := wait.Poll(1*time.Second, timeout, func() (bool, error) {
return endpointAvailable, nil
})
framework.ExpectNoError(err, "No subset of available IP address found for the endpoint %s within timeout %v", serviceName, timeout)
if err != nil {
return fmt.Errorf("no subset of available IP address found for the endpoint %s within timeout %v", j.Name, timeout)
}
return nil
}
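Callers now get the timeout failure back as an error; a minimal sketch, assuming the exported ServiceEndpointsTimeout used elsewhere in this file:

```go
package example

import (
	"k8s.io/kubernetes/test/e2e/framework"
	e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
)

func exampleWaitForEndpoints(jig *e2eservice.TestJig) {
	// Blocks until the service has at least one endpoint address,
	// or returns an error once the timeout expires.
	err := jig.WaitForAvailableEndpoint(e2eservice.ServiceEndpointsTimeout)
	framework.ExpectNoError(err)
}
```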
// SanityCheckService performs sanity checks on the given service
func (j *TestJig) SanityCheckService(svc *v1.Service, svcType v1.ServiceType) {
// sanityCheckService performs sanity checks on the given service; in particular, ensuring
// that creating/updating a service allocates IPs, ports, etc, as needed.
func (j *TestJig) sanityCheckService(svc *v1.Service, svcType v1.ServiceType) (*v1.Service, error) {
if svcType == "" {
svcType = v1.ServiceTypeClusterIP
}
if svc.Spec.Type != svcType {
framework.Failf("unexpected Spec.Type (%s) for service, expected %s", svc.Spec.Type, svcType)
return nil, fmt.Errorf("unexpected Spec.Type (%s) for service, expected %s", svc.Spec.Type, svcType)
}
if svcType != v1.ServiceTypeExternalName {
if svc.Spec.ExternalName != "" {
framework.Failf("unexpected Spec.ExternalName (%s) for service, expected empty", svc.Spec.ExternalName)
return nil, fmt.Errorf("unexpected Spec.ExternalName (%s) for service, expected empty", svc.Spec.ExternalName)
}
if svc.Spec.ClusterIP != api.ClusterIPNone && svc.Spec.ClusterIP == "" {
framework.Failf("didn't get ClusterIP for non-ExternamName service")
if svc.Spec.ClusterIP == "" {
return nil, fmt.Errorf("didn't get ClusterIP for non-ExternalName service")
}
} else {
if svc.Spec.ClusterIP != "" {
framework.Failf("unexpected Spec.ClusterIP (%s) for ExternamName service, expected empty", svc.Spec.ClusterIP)
return nil, fmt.Errorf("unexpected Spec.ClusterIP (%s) for ExternalName service, expected empty", svc.Spec.ClusterIP)
}
}
@@ -401,11 +410,11 @@ func (j *TestJig) SanityCheckService(svc *v1.Service, svcType v1.ServiceType) {
for i, port := range svc.Spec.Ports {
hasNodePort := (port.NodePort != 0)
if hasNodePort != expectNodePorts {
framework.Failf("unexpected Spec.Ports[%d].NodePort (%d) for service", i, port.NodePort)
return nil, fmt.Errorf("unexpected Spec.Ports[%d].NodePort (%d) for service", i, port.NodePort)
}
if hasNodePort {
if !NodePortRange.Contains(int(port.NodePort)) {
framework.Failf("out-of-range nodePort (%d) for service", port.NodePort)
return nil, fmt.Errorf("out-of-range nodePort (%d) for service", port.NodePort)
}
}
}
@@ -415,53 +424,44 @@ func (j *TestJig) SanityCheckService(svc *v1.Service, svcType v1.ServiceType) {
}
hasIngress := len(svc.Status.LoadBalancer.Ingress) != 0
if hasIngress != expectIngress {
framework.Failf("unexpected number of Status.LoadBalancer.Ingress (%d) for service", len(svc.Status.LoadBalancer.Ingress))
return nil, fmt.Errorf("unexpected number of Status.LoadBalancer.Ingress (%d) for service", len(svc.Status.LoadBalancer.Ingress))
}
if hasIngress {
for i, ing := range svc.Status.LoadBalancer.Ingress {
if ing.IP == "" && ing.Hostname == "" {
framework.Failf("unexpected Status.LoadBalancer.Ingress[%d] for service: %#v", i, ing)
return nil, fmt.Errorf("unexpected Status.LoadBalancer.Ingress[%d] for service: %#v", i, ing)
}
}
}
return svc, nil
}
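The NodePort check above relies on `NodePortRange`, declared near the top of this file as Base 30000, Size 2768 (i.e. ports 30000-32767). A standalone sketch of that validation:

```go
package example

import (
	"fmt"

	utilnet "k8s.io/apimachinery/pkg/util/net"
)

// Same shape as the check in sanityCheckService: a NodePort must fall
// inside the apiserver's allocatable range.
var nodePortRange = utilnet.PortRange{Base: 30000, Size: 2768}

func checkNodePort(port int) error {
	if !nodePortRange.Contains(port) {
		return fmt.Errorf("out-of-range nodePort (%d) for service", port)
	}
	return nil
}
```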
// UpdateService fetches a service, calls the update function on it, and
// then attempts to send the updated service. It tries up to 3 times in the
// face of timeouts and conflicts.
func (j *TestJig) UpdateService(namespace, name string, update func(*v1.Service)) (*v1.Service, error) {
func (j *TestJig) UpdateService(update func(*v1.Service)) (*v1.Service, error) {
for i := 0; i < 3; i++ {
service, err := j.Client.CoreV1().Services(namespace).Get(name, metav1.GetOptions{})
service, err := j.Client.CoreV1().Services(j.Namespace).Get(j.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get Service %q: %v", name, err)
return nil, fmt.Errorf("failed to get Service %q: %v", j.Name, err)
}
update(service)
service, err = j.Client.CoreV1().Services(namespace).Update(service)
result, err := j.Client.CoreV1().Services(j.Namespace).Update(service)
if err == nil {
return service, nil
return j.sanityCheckService(result, service.Spec.Type)
}
if !errors.IsConflict(err) && !errors.IsServerTimeout(err) {
return nil, fmt.Errorf("failed to update Service %q: %v", name, err)
return nil, fmt.Errorf("failed to update Service %q: %v", j.Name, err)
}
}
return nil, fmt.Errorf("too many retries updating Service %q", name)
return nil, fmt.Errorf("too many retries updating Service %q", j.Name)
}
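UpdateService keeps the classic optimistic-concurrency loop: re-read, mutate, write, and retry only on Conflict or ServerTimeout. A hedged standalone restatement of that loop:

```go
package example

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
)

func updateServiceWithRetry(c clientset.Interface, ns, name string, mutate func(*v1.Service)) (*v1.Service, error) {
	for i := 0; i < 3; i++ {
		svc, err := c.CoreV1().Services(ns).Get(name, metav1.GetOptions{})
		if err != nil {
			return nil, fmt.Errorf("failed to get Service %q: %v", name, err)
		}
		mutate(svc) // apply the caller's change to a fresh copy
		result, err := c.CoreV1().Services(ns).Update(svc)
		if err == nil {
			return result, nil
		}
		// Only stale-read style failures are worth retrying.
		if !errors.IsConflict(err) && !errors.IsServerTimeout(err) {
			return nil, fmt.Errorf("failed to update Service %q: %v", name, err)
		}
	}
	return nil, fmt.Errorf("too many retries updating Service %q", name)
}
```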
// UpdateServiceOrFail fetches a service, calls the update function on it, and
// then attempts to send the updated service. It tries up to 3 times in the
// face of timeouts and conflicts.
func (j *TestJig) UpdateServiceOrFail(namespace, name string, update func(*v1.Service)) *v1.Service {
svc, err := j.UpdateService(namespace, name, update)
if err != nil {
framework.Failf(err.Error())
}
return svc
}
// WaitForNewIngressIPOrFail waits for the given service to get a new ingress IP, or fails after the given timeout
func (j *TestJig) WaitForNewIngressIPOrFail(namespace, name, existingIP string, timeout time.Duration) *v1.Service {
framework.Logf("Waiting up to %v for service %q to get a new ingress IP", timeout, name)
service := j.waitForConditionOrFail(namespace, name, timeout, "have a new ingress IP", func(svc *v1.Service) bool {
// WaitForNewIngressIP waits for the given service to get a new ingress IP, or returns an error after the given timeout
func (j *TestJig) WaitForNewIngressIP(existingIP string, timeout time.Duration) (*v1.Service, error) {
framework.Logf("Waiting up to %v for service %q to get a new ingress IP", timeout, j.Name)
service, err := j.waitForCondition(timeout, "have a new ingress IP", func(svc *v1.Service) bool {
if len(svc.Status.LoadBalancer.Ingress) == 0 {
return false
}
@@ -471,18 +471,21 @@ func (j *TestJig) WaitForNewIngressIPOrFail(namespace, name, existingIP string,
}
return true
})
return service
if err != nil {
return nil, err
}
return j.sanityCheckService(service, v1.ServiceTypeLoadBalancer)
}
// ChangeServiceNodePortOrFail changes node ports of the given service.
func (j *TestJig) ChangeServiceNodePortOrFail(namespace, name string, initial int) *v1.Service {
// ChangeServiceNodePort changes node ports of the given service.
func (j *TestJig) ChangeServiceNodePort(initial int) (*v1.Service, error) {
var err error
var service *v1.Service
for i := 1; i < NodePortRange.Size; i++ {
offs1 := initial - NodePortRange.Base
offs2 := (offs1 + i) % NodePortRange.Size
newPort := NodePortRange.Base + offs2
service, err = j.UpdateService(namespace, name, func(s *v1.Service) {
service, err = j.UpdateService(func(s *v1.Service) {
s.Spec.Ports[0].NodePort = int32(newPort)
})
if err != nil && strings.Contains(err.Error(), portallocator.ErrAllocated.Error()) {
@@ -492,23 +495,23 @@ func (j *TestJig) ChangeServiceNodePortOrFail(namespace, name string, initial in
// Otherwise err was nil or err was a real error
break
}
if err != nil {
framework.Failf("Could not change the nodePort: %v", err)
}
return service
return service, err
}
// WaitForLoadBalancerOrFail waits the given service to have a LoadBalancer, or fails after the given timeout
func (j *TestJig) WaitForLoadBalancerOrFail(namespace, name string, timeout time.Duration) *v1.Service {
framework.Logf("Waiting up to %v for service %q to have a LoadBalancer", timeout, name)
service := j.waitForConditionOrFail(namespace, name, timeout, "have a load balancer", func(svc *v1.Service) bool {
// WaitForLoadBalancer waits the given service to have a LoadBalancer, or returns an error after the given timeout
func (j *TestJig) WaitForLoadBalancer(timeout time.Duration) (*v1.Service, error) {
framework.Logf("Waiting up to %v for service %q to have a LoadBalancer", timeout, j.Name)
service, err := j.waitForCondition(timeout, "have a load balancer", func(svc *v1.Service) bool {
return len(svc.Status.LoadBalancer.Ingress) > 0
})
return service
if err != nil {
return nil, err
}
return j.sanityCheckService(service, v1.ServiceTypeLoadBalancer)
}
// WaitForLoadBalancerDestroyOrFail waits the given service to destroy a LoadBalancer, or fails after the given timeout
func (j *TestJig) WaitForLoadBalancerDestroyOrFail(namespace, name string, ip string, port int, timeout time.Duration) *v1.Service {
// WaitForLoadBalancerDestroy waits the given service to destroy a LoadBalancer, or returns an error after the given timeout
func (j *TestJig) WaitForLoadBalancerDestroy(ip string, port int, timeout time.Duration) (*v1.Service, error) {
// TODO: once support ticket 21807001 is resolved, reduce this timeout back to something reasonable
defer func() {
if err := framework.EnsureLoadBalancerResourcesDeleted(ip, strconv.Itoa(port)); err != nil {
@@ -516,17 +519,20 @@ func (j *TestJig) WaitForLoadBalancerDestroyOrFail(namespace, name string, ip st
}
}()
framework.Logf("Waiting up to %v for service %q to have no LoadBalancer", timeout, name)
service := j.waitForConditionOrFail(namespace, name, timeout, "have no load balancer", func(svc *v1.Service) bool {
framework.Logf("Waiting up to %v for service %q to have no LoadBalancer", timeout, j.Name)
service, err := j.waitForCondition(timeout, "have no load balancer", func(svc *v1.Service) bool {
return len(svc.Status.LoadBalancer.Ingress) == 0
})
return service
if err != nil {
return nil, err
}
return j.sanityCheckService(service, v1.ServiceTypeLoadBalancer)
}
func (j *TestJig) waitForConditionOrFail(namespace, name string, timeout time.Duration, message string, conditionFn func(*v1.Service) bool) *v1.Service {
func (j *TestJig) waitForCondition(timeout time.Duration, message string, conditionFn func(*v1.Service) bool) (*v1.Service, error) {
var service *v1.Service
pollFunc := func() (bool, error) {
svc, err := j.Client.CoreV1().Services(namespace).Get(name, metav1.GetOptions{})
svc, err := j.Client.CoreV1().Services(j.Namespace).Get(j.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
@@ -537,21 +543,21 @@ func (j *TestJig) waitForConditionOrFail(namespace, name string, timeout time.Du
return false, nil
}
if err := wait.PollImmediate(framework.Poll, timeout, pollFunc); err != nil {
framework.Failf("Timed out waiting for service %q to %s", name, message)
return nil, fmt.Errorf("timed out waiting for service %q to %s", j.Name, message)
}
return service
return service, nil
}
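waitForCondition is a thin wrapper over `wait.PollImmediate`; the shape, sketched standalone (the ingress condition is one example from this file, and the 2-second interval stands in for framework.Poll):

```go
package example

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
)

func waitForIngress(c clientset.Interface, ns, name string, timeout time.Duration) (*v1.Service, error) {
	var service *v1.Service
	err := wait.PollImmediate(2*time.Second, timeout, func() (bool, error) {
		svc, err := c.CoreV1().Services(ns).Get(name, metav1.GetOptions{})
		if err != nil {
			return false, err // hard error aborts the poll
		}
		if len(svc.Status.LoadBalancer.Ingress) == 0 {
			return false, nil // keep polling
		}
		service = svc
		return true, nil
	})
	if err != nil {
		return nil, fmt.Errorf("timed out waiting for service %q to have a load balancer", name)
	}
	return service, nil
}
```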
// newRCTemplate returns the default v1.ReplicationController object for
// this j, but does not actually create the RC. The default RC has the same
// name as the j and runs the "netexec" container.
func (j *TestJig) newRCTemplate(namespace string) *v1.ReplicationController {
func (j *TestJig) newRCTemplate() *v1.ReplicationController {
var replicas int32 = 1
var grace int64 = 3 // so we don't race with kube-proxy when scaling up/down
rc := &v1.ReplicationController{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Namespace: j.Namespace,
Name: j.Name,
Labels: j.Labels,
},
@@ -607,29 +613,29 @@ func (j *TestJig) AddRCAntiAffinity(rc *v1.ReplicationController) {
})
}
// CreatePDBOrFail returns a PodDisruptionBudget for the given ReplicationController, or fails if a PodDisruptionBudget isn't ready
func (j *TestJig) CreatePDBOrFail(namespace string, rc *v1.ReplicationController) *policyv1beta1.PodDisruptionBudget {
pdb := j.newPDBTemplate(namespace, rc)
newPdb, err := j.Client.PolicyV1beta1().PodDisruptionBudgets(namespace).Create(pdb)
// CreatePDB returns a PodDisruptionBudget for the given ReplicationController, or returns an error if a PodDisruptionBudget isn't ready
func (j *TestJig) CreatePDB(rc *v1.ReplicationController) (*policyv1beta1.PodDisruptionBudget, error) {
pdb := j.newPDBTemplate(rc)
newPdb, err := j.Client.PolicyV1beta1().PodDisruptionBudgets(j.Namespace).Create(pdb)
if err != nil {
framework.Failf("Failed to create PDB %q %v", pdb.Name, err)
return nil, fmt.Errorf("failed to create PDB %q %v", pdb.Name, err)
}
if err := j.waitForPdbReady(namespace); err != nil {
framework.Failf("Failed waiting for PDB to be ready: %v", err)
if err := j.waitForPdbReady(); err != nil {
return nil, fmt.Errorf("failed waiting for PDB to be ready: %v", err)
}
return newPdb
return newPdb, nil
}
// newPDBTemplate returns the default policyv1beta1.PodDisruptionBudget object for
// this j, but does not actually create the PDB. The default PDB specifies a
// MinAvailable of N-1 and matches the pods created by the RC.
func (j *TestJig) newPDBTemplate(namespace string, rc *v1.ReplicationController) *policyv1beta1.PodDisruptionBudget {
func (j *TestJig) newPDBTemplate(rc *v1.ReplicationController) *policyv1beta1.PodDisruptionBudget {
minAvailable := intstr.FromInt(int(*rc.Spec.Replicas) - 1)
pdb := &policyv1beta1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Namespace: j.Namespace,
Name: j.Name,
Labels: j.Labels,
},
@@ -642,54 +648,55 @@ func (j *TestJig) newPDBTemplate(namespace string, rc *v1.ReplicationController)
return pdb
}
// RunOrFail creates a ReplicationController and Pod(s) and waits for the
// Run creates a ReplicationController and Pod(s) and waits for the
// Pod(s) to be running. Callers can provide a function to tweak the RC object
// before it is created.
func (j *TestJig) RunOrFail(namespace string, tweak func(rc *v1.ReplicationController)) *v1.ReplicationController {
rc := j.newRCTemplate(namespace)
func (j *TestJig) Run(tweak func(rc *v1.ReplicationController)) (*v1.ReplicationController, error) {
rc := j.newRCTemplate()
if tweak != nil {
tweak(rc)
}
result, err := j.Client.CoreV1().ReplicationControllers(namespace).Create(rc)
result, err := j.Client.CoreV1().ReplicationControllers(j.Namespace).Create(rc)
if err != nil {
framework.Failf("Failed to create RC %q: %v", rc.Name, err)
return nil, fmt.Errorf("failed to create RC %q: %v", rc.Name, err)
}
pods, err := j.waitForPodsCreated(namespace, int(*(rc.Spec.Replicas)))
pods, err := j.waitForPodsCreated(int(*(rc.Spec.Replicas)))
if err != nil {
framework.Failf("Failed to create pods: %v", err)
return nil, fmt.Errorf("failed to create pods: %v", err)
}
if err := j.waitForPodsReady(namespace, pods); err != nil {
framework.Failf("Failed waiting for pods to be running: %v", err)
if err := j.waitForPodsReady(pods); err != nil {
return nil, fmt.Errorf("failed waiting for pods to be running: %v", err)
}
return result
return result, nil
}
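Caller side, Run and CreatePDB now compose with plain error checks; this mirrors the upgrade-test hunk later in this PR:

```go
package example

import (
	"k8s.io/kubernetes/test/e2e/framework"
	e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
)

func exampleRunWithPDB(jig *e2eservice.TestJig) {
	// Start the RC and its pod(s); anti-affinity keeps replicas apart.
	rc, err := jig.Run(jig.AddRCAntiAffinity)
	framework.ExpectNoError(err)

	// Cover the RC with a PodDisruptionBudget.
	_, err = jig.CreatePDB(rc)
	framework.ExpectNoError(err)
}
```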
// Scale scales pods to the given replicas
func (j *TestJig) Scale(namespace string, replicas int) {
func (j *TestJig) Scale(replicas int) error {
rc := j.Name
scale, err := j.Client.CoreV1().ReplicationControllers(namespace).GetScale(rc, metav1.GetOptions{})
scale, err := j.Client.CoreV1().ReplicationControllers(j.Namespace).GetScale(rc, metav1.GetOptions{})
if err != nil {
framework.Failf("Failed to get scale for RC %q: %v", rc, err)
return fmt.Errorf("failed to get scale for RC %q: %v", rc, err)
}
scale.Spec.Replicas = int32(replicas)
_, err = j.Client.CoreV1().ReplicationControllers(namespace).UpdateScale(rc, scale)
_, err = j.Client.CoreV1().ReplicationControllers(j.Namespace).UpdateScale(rc, scale)
if err != nil {
framework.Failf("Failed to scale RC %q: %v", rc, err)
return fmt.Errorf("failed to scale RC %q: %v", rc, err)
}
pods, err := j.waitForPodsCreated(namespace, replicas)
pods, err := j.waitForPodsCreated(replicas)
if err != nil {
framework.Failf("Failed waiting for pods: %v", err)
return fmt.Errorf("failed waiting for pods: %v", err)
}
if err := j.waitForPodsReady(namespace, pods); err != nil {
framework.Failf("Failed waiting for pods to be running: %v", err)
if err := j.waitForPodsReady(pods); err != nil {
return fmt.Errorf("failed waiting for pods to be running: %v", err)
}
return nil
}
func (j *TestJig) waitForPdbReady(namespace string) error {
func (j *TestJig) waitForPdbReady() error {
timeout := 2 * time.Minute
for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) {
pdb, err := j.Client.PolicyV1beta1().PodDisruptionBudgets(namespace).Get(j.Name, metav1.GetOptions{})
pdb, err := j.Client.PolicyV1beta1().PodDisruptionBudgets(j.Namespace).Get(j.Name, metav1.GetOptions{})
if err != nil {
return err
}
@@ -701,14 +708,14 @@ func (j *TestJig) waitForPdbReady(namespace string) error {
return fmt.Errorf("timeout waiting for PDB %q to be ready", j.Name)
}
func (j *TestJig) waitForPodsCreated(namespace string, replicas int) ([]string, error) {
func (j *TestJig) waitForPodsCreated(replicas int) ([]string, error) {
timeout := 2 * time.Minute
// List the pods, making sure we observe all the replicas.
label := labels.SelectorFromSet(labels.Set(j.Labels))
framework.Logf("Waiting up to %v for %d pods to be created", timeout, replicas)
for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) {
options := metav1.ListOptions{LabelSelector: label.String()}
pods, err := j.Client.CoreV1().Pods(namespace).List(options)
pods, err := j.Client.CoreV1().Pods(j.Namespace).List(options)
if err != nil {
return nil, err
}
@@ -729,28 +736,31 @@ func (j *TestJig) waitForPodsCreated(namespace string, replicas int) ([]string,
return nil, fmt.Errorf("timeout waiting for %d pods to be created", replicas)
}
func (j *TestJig) waitForPodsReady(namespace string, pods []string) error {
func (j *TestJig) waitForPodsReady(pods []string) error {
timeout := 2 * time.Minute
if !e2epod.CheckPodsRunningReady(j.Client, namespace, pods, timeout) {
if !e2epod.CheckPodsRunningReady(j.Client, j.Namespace, pods, timeout) {
return fmt.Errorf("timeout waiting for %d pods to be ready", len(pods))
}
return nil
}
func testReachabilityOverServiceName(serviceName string, sp v1.ServicePort, execPod *v1.Pod) {
testEndpointReachability(serviceName, sp.Port, sp.Protocol, execPod)
func testReachabilityOverServiceName(serviceName string, sp v1.ServicePort, execPod *v1.Pod) error {
return testEndpointReachability(serviceName, sp.Port, sp.Protocol, execPod)
}
func testReachabilityOverClusterIP(clusterIP string, sp v1.ServicePort, execPod *v1.Pod) {
func testReachabilityOverClusterIP(clusterIP string, sp v1.ServicePort, execPod *v1.Pod) error {
// If .spec.clusterIP is set to "" or "None" for service, ClusterIP is not created, so reachability can not be tested over clusterIP:servicePort
isClusterIPV46, err := regexp.MatchString(framework.RegexIPv4+"||"+framework.RegexIPv6, clusterIP)
framework.ExpectNoError(err, "Unable to parse ClusterIP: %s", clusterIP)
if isClusterIPV46 {
testEndpointReachability(clusterIP, sp.Port, sp.Protocol, execPod)
if err != nil {
return fmt.Errorf("unable to parse ClusterIP: %s", clusterIP)
}
if isClusterIPV46 {
return testEndpointReachability(clusterIP, sp.Port, sp.Protocol, execPod)
}
return nil
}
func testReachabilityOverNodePorts(nodes *v1.NodeList, sp v1.ServicePort, pod *v1.Pod) {
func testReachabilityOverNodePorts(nodes *v1.NodeList, sp v1.ServicePort, pod *v1.Pod) error {
internalAddrs := e2enode.CollectAddresses(nodes, v1.NodeInternalIP)
externalAddrs := e2enode.CollectAddresses(nodes, v1.NodeExternalIP)
for _, internalAddr := range internalAddrs {
@@ -760,11 +770,18 @@ func testReachabilityOverNodePorts(nodes *v1.NodeList, sp v1.ServicePort, pod *v
framework.Logf("skipping testEndpointReachability() for internal adddress %s", internalAddr)
continue
}
testEndpointReachability(internalAddr, sp.NodePort, sp.Protocol, pod)
err := testEndpointReachability(internalAddr, sp.NodePort, sp.Protocol, pod)
if err != nil {
return err
}
}
for _, externalAddr := range externalAddrs {
testEndpointReachability(externalAddr, sp.NodePort, sp.Protocol, pod)
err := testEndpointReachability(externalAddr, sp.NodePort, sp.Protocol, pod)
if err != nil {
return err
}
}
return nil
}
// isInvalidOrLocalhostAddress returns `true` if the provided `ip` is either not
@@ -780,7 +797,7 @@ func isInvalidOrLocalhostAddress(ip string) bool {
// testEndpointReachability tests reachability to endpoints (i.e. IP, ServiceName) and ports. Test request is initiated from specified execPod.
// TCP and UDP protocol based services are supported at this moment
// TODO: add support to test SCTP Protocol based services.
func testEndpointReachability(endpoint string, port int32, protocol v1.Protocol, execPod *v1.Pod) {
func testEndpointReachability(endpoint string, port int32, protocol v1.Protocol, execPod *v1.Pod) error {
ep := net.JoinHostPort(endpoint, strconv.Itoa(int(port)))
cmd := ""
switch protocol {
@@ -789,32 +806,44 @@ func testEndpointReachability(endpoint string, port int32, protocol v1.Protocol,
case v1.ProtocolUDP:
cmd = fmt.Sprintf("nc -zv -u -w 2 %s %v", endpoint, port)
default:
framework.Failf("Service reachablity check is not supported for %v", protocol)
return fmt.Errorf("service reachablity check is not supported for %v", protocol)
}
if cmd != "" {
err := wait.PollImmediate(1*time.Second, ServiceReachabilityShortPollTimeout, func() (bool, error) {
if _, err := framework.RunHostCmd(execPod.Namespace, execPod.Name, cmd); err != nil {
framework.Logf("Service reachability failing with error: %v\nRetrying...", err)
return false, nil
}
return true, nil
})
framework.ExpectNoError(err, "Service is not reachable within %v timeout on endpoint %s over %s protocol", ServiceReachabilityShortPollTimeout, ep, protocol)
err := wait.PollImmediate(1*time.Second, ServiceReachabilityShortPollTimeout, func() (bool, error) {
if _, err := framework.RunHostCmd(execPod.Namespace, execPod.Name, cmd); err != nil {
framework.Logf("Service reachability failing with error: %v\nRetrying...", err)
return false, nil
}
return true, nil
})
if err != nil {
return fmt.Errorf("service is not reachable within %v timeout on endpoint %s over %s protocol", ServiceReachabilityShortPollTimeout, ep, protocol)
}
return nil
}
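The probe itself is just `nc` run from the exec pod, polled until it connects; a hedged sketch of the UDP branch (TCP differs only in the flags):

```go
package example

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/kubernetes/test/e2e/framework"
	e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
)

func probeUDPEndpoint(execPod *v1.Pod, endpoint string, port int32) error {
	// -z: scan only, -u: UDP, -w 2: two-second per-attempt timeout.
	cmd := fmt.Sprintf("nc -zv -u -w 2 %s %v", endpoint, port)
	return wait.PollImmediate(1*time.Second, e2eservice.ServiceReachabilityShortPollTimeout, func() (bool, error) {
		if _, err := framework.RunHostCmd(execPod.Namespace, execPod.Name, cmd); err != nil {
			framework.Logf("service reachability check failed: %v; retrying", err)
			return false, nil
		}
		return true, nil
	})
}
```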
// checkClusterIPServiceReachability ensures that service of type ClusterIP is reachable over
// - ServiceName:ServicePort, ClusterIP:ServicePort
func (j *TestJig) checkClusterIPServiceReachability(namespace string, svc *v1.Service, pod *v1.Pod) {
func (j *TestJig) checkClusterIPServiceReachability(svc *v1.Service, pod *v1.Pod) error {
clusterIP := svc.Spec.ClusterIP
servicePorts := svc.Spec.Ports
j.WaitForAvailableEndpoint(namespace, svc.Name, ServiceEndpointsTimeout)
err := j.WaitForAvailableEndpoint(ServiceEndpointsTimeout)
if err != nil {
return err
}
for _, servicePort := range servicePorts {
testReachabilityOverServiceName(svc.Name, servicePort, pod)
testReachabilityOverClusterIP(clusterIP, servicePort, pod)
err = testReachabilityOverServiceName(svc.Name, servicePort, pod)
if err != nil {
return err
}
err = testReachabilityOverClusterIP(clusterIP, servicePort, pod)
if err != nil {
return err
}
}
return nil
}
// checkNodePortServiceReachability ensures that service of type nodePort are reachable
@@ -822,80 +851,100 @@ func (j *TestJig) checkClusterIPServiceReachability(namespace string, svc *v1.Se
// ServiceName:ServicePort, ClusterIP:ServicePort and NodeInternalIPs:NodePort
// - External clients should be reachable to service over -
// NodePublicIPs:NodePort
func (j *TestJig) checkNodePortServiceReachability(namespace string, svc *v1.Service, pod *v1.Pod) {
func (j *TestJig) checkNodePortServiceReachability(svc *v1.Service, pod *v1.Pod) error {
clusterIP := svc.Spec.ClusterIP
servicePorts := svc.Spec.Ports
// Consider only 2 nodes for testing
nodes, err := e2enode.GetBoundedReadySchedulableNodes(j.Client, 2)
framework.ExpectNoError(err)
if err != nil {
return err
}
j.WaitForAvailableEndpoint(namespace, svc.Name, ServiceEndpointsTimeout)
err = j.WaitForAvailableEndpoint(ServiceEndpointsTimeout)
if err != nil {
return err
}
for _, servicePort := range servicePorts {
testReachabilityOverServiceName(svc.Name, servicePort, pod)
testReachabilityOverClusterIP(clusterIP, servicePort, pod)
testReachabilityOverNodePorts(nodes, servicePort, pod)
err = testReachabilityOverServiceName(svc.Name, servicePort, pod)
if err != nil {
return err
}
err = testReachabilityOverClusterIP(clusterIP, servicePort, pod)
if err != nil {
return err
}
err = testReachabilityOverNodePorts(nodes, servicePort, pod)
if err != nil {
return err
}
}
return nil
}
// checkExternalServiceReachability ensures service of type externalName resolves to IP address and no fake externalName is set
// FQDN of kubernetes is used as externalName (for air-tight platforms).
func (j *TestJig) checkExternalServiceReachability(svc *v1.Service, pod *v1.Pod) {
func (j *TestJig) checkExternalServiceReachability(svc *v1.Service, pod *v1.Pod) error {
// Service must resolve to IP
cmd := fmt.Sprintf("nslookup %s", svc.Name)
_, err := framework.RunHostCmd(pod.Namespace, pod.Name, cmd)
framework.ExpectNoError(err, "ExternalName service must resolve to IP")
if err != nil {
return fmt.Errorf("ExternalName service %q must resolve to IP", pod.Namespace+"/"+pod.Name)
}
return nil
}
// CheckServiceReachability ensures that requests are served by the service. Only supports Services with type ClusterIP, NodePort and ExternalName.
func (j *TestJig) CheckServiceReachability(namespace string, svc *v1.Service, pod *v1.Pod) {
func (j *TestJig) CheckServiceReachability(svc *v1.Service, pod *v1.Pod) error {
svcType := svc.Spec.Type
j.SanityCheckService(svc, svcType)
_, err := j.sanityCheckService(svc, svcType)
if err != nil {
return err
}
switch svcType {
case v1.ServiceTypeClusterIP:
j.checkClusterIPServiceReachability(namespace, svc, pod)
return j.checkClusterIPServiceReachability(svc, pod)
case v1.ServiceTypeNodePort:
j.checkNodePortServiceReachability(namespace, svc, pod)
return j.checkNodePortServiceReachability(svc, pod)
case v1.ServiceTypeExternalName:
j.checkExternalServiceReachability(svc, pod)
return j.checkExternalServiceReachability(svc, pod)
default:
framework.Failf("Unsupported service type \"%s\" to verify service reachability for \"%s\" service. This may due to diverse implementation of the service type.", svcType, svc.Name)
return fmt.Errorf("unsupported service type \"%s\" to verify service reachability for \"%s\" service. This may due to diverse implementation of the service type", svcType, svc.Name)
}
}
// CreateServicePods creates a replication controller with the same labels as the service. The service listens on HTTP.
func (j *TestJig) CreateServicePods(c clientset.Interface, ns string, replica int) {
func (j *TestJig) CreateServicePods(replica int) error {
config := testutils.RCConfig{
Client: c,
Client: j.Client,
Name: j.Name,
Image: framework.ServeHostnameImage,
Command: []string{"/agnhost", "serve-hostname"},
Namespace: ns,
Namespace: j.Namespace,
Labels: j.Labels,
PollInterval: 3 * time.Second,
Timeout: framework.PodReadyBeforeTimeout,
Replicas: replica,
}
err := framework.RunRC(config)
framework.ExpectNoError(err, "Replica must be created")
return framework.RunRC(config)
}
// CreateTCPUDPServicePods creates a replication controller with the same labels as the service. The service listens on TCP and UDP.
func (j *TestJig) CreateTCPUDPServicePods(c clientset.Interface, ns string, replica int) {
func (j *TestJig) CreateTCPUDPServicePods(replica int) error {
config := testutils.RCConfig{
Client: c,
Client: j.Client,
Name: j.Name,
Image: framework.ServeHostnameImage,
Command: []string{"/agnhost", "serve-hostname", "--http=false", "--tcp", "--udp"},
Namespace: ns,
Namespace: j.Namespace,
Labels: j.Labels,
PollInterval: 3 * time.Second,
Timeout: framework.PodReadyBeforeTimeout,
Replicas: replica,
}
err := framework.RunRC(config)
framework.ExpectNoError(err, "Replica must be created")
return framework.RunRC(config)
}
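Both helpers delegate to `framework.RunRC`, so callers now check a single returned error; the dual-stack tests below use exactly this shape:

```go
package example

import (
	"k8s.io/kubernetes/test/e2e/framework"
	e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
)

func exampleBackingPods(jig *e2eservice.TestJig) {
	// Two replicas labeled to match the jig's service selector.
	err := jig.CreateServicePods(2)
	framework.ExpectNoError(err)
}
```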


@@ -210,7 +210,7 @@ var _ = SIGDescribe("[Feature:IPv6DualStackAlphaFeature] [LinuxOnly]", func() {
ginkgo.It("should create service with cluster ip from primary service range [Feature:IPv6DualStackAlphaFeature:Phase2]", func() {
serviceName := "defaultclusterip"
ns := f.Namespace.Name
jig := e2eservice.NewTestJig(cs, serviceName)
jig := e2eservice.NewTestJig(cs, ns, serviceName)
defaultIPFamily := v1.IPv4Protocol
if framework.TestContext.ClusterIsIPv6() {
@@ -229,7 +229,8 @@ var _ = SIGDescribe("[Feature:IPv6DualStackAlphaFeature] [LinuxOnly]", func() {
service := createService(t.ServiceName, t.Namespace, t.Labels, nil)
jig.Labels = t.Labels
jig.CreateServicePods(cs, ns, 2)
err := jig.CreateServicePods(2)
framework.ExpectNoError(err)
svc, err := t.CreateService(service)
framework.ExpectNoError(err, "failed to create service: %s in namespace: %s", serviceName, ns)
@@ -251,7 +252,7 @@ var _ = SIGDescribe("[Feature:IPv6DualStackAlphaFeature] [LinuxOnly]", func() {
ns := f.Namespace.Name
ipv4 := v1.IPv4Protocol
jig := e2eservice.NewTestJig(cs, serviceName)
jig := e2eservice.NewTestJig(cs, ns, serviceName)
t := e2eservice.NewServerTest(cs, ns, serviceName)
defer func() {
@@ -265,7 +266,8 @@ var _ = SIGDescribe("[Feature:IPv6DualStackAlphaFeature] [LinuxOnly]", func() {
service := createService(t.ServiceName, t.Namespace, t.Labels, &ipv4)
jig.Labels = t.Labels
jig.CreateServicePods(cs, ns, 2)
err := jig.CreateServicePods(2)
framework.ExpectNoError(err)
svc, err := t.CreateService(service)
framework.ExpectNoError(err, "failed to create service: %s in namespace: %s", serviceName, ns)
@@ -287,7 +289,7 @@ var _ = SIGDescribe("[Feature:IPv6DualStackAlphaFeature] [LinuxOnly]", func() {
ns := f.Namespace.Name
ipv6 := v1.IPv6Protocol
jig := e2eservice.NewTestJig(cs, serviceName)
jig := e2eservice.NewTestJig(cs, ns, serviceName)
t := e2eservice.NewServerTest(cs, ns, serviceName)
defer func() {
@@ -301,7 +303,8 @@ var _ = SIGDescribe("[Feature:IPv6DualStackAlphaFeature] [LinuxOnly]", func() {
service := createService(t.ServiceName, t.Namespace, t.Labels, &ipv6)
jig.Labels = t.Labels
jig.CreateServicePods(cs, ns, 2)
err := jig.CreateServicePods(2)
framework.ExpectNoError(err)
svc, err := t.CreateService(service)
framework.ExpectNoError(err, "failed to create service: %s in namespace: %s", serviceName, ns)


@@ -71,7 +71,7 @@ var _ = SIGDescribe("Firewall rule", func() {
framework.ExpectNoError(err)
framework.Logf("Got cluster ID: %v", clusterID)
jig := e2eservice.NewTestJig(cs, serviceName)
jig := e2eservice.NewTestJig(cs, ns, serviceName)
nodeList, err := e2enode.GetBoundedReadySchedulableNodes(cs, e2eservice.MaxNodesForEndpointsTests)
framework.ExpectNoError(err)
@@ -82,15 +82,17 @@ var _ = SIGDescribe("Firewall rule", func() {
nodesSet := sets.NewString(nodesNames...)
ginkgo.By("Creating a LoadBalancer type service with ExternalTrafficPolicy=Global")
svc := jig.CreateLoadBalancerService(ns, serviceName, e2eservice.LoadBalancerCreateTimeoutDefault, func(svc *v1.Service) {
svc, err := jig.CreateLoadBalancerService(e2eservice.LoadBalancerCreateTimeoutDefault, func(svc *v1.Service) {
svc.Spec.Ports = []v1.ServicePort{{Protocol: v1.ProtocolTCP, Port: firewallTestHTTPPort}}
svc.Spec.LoadBalancerSourceRanges = firewallTestSourceRanges
})
framework.ExpectNoError(err)
defer func() {
jig.UpdateServiceOrFail(svc.Namespace, svc.Name, func(svc *v1.Service) {
_, err = jig.UpdateService(func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeNodePort
svc.Spec.LoadBalancerSourceRanges = nil
})
framework.ExpectNoError(err)
err = cs.CoreV1().Services(svc.Namespace).Delete(svc.Name, nil)
framework.ExpectNoError(err)
ginkgo.By("Waiting for the local traffic health check firewall rule to be deleted")
@@ -116,9 +118,10 @@ var _ = SIGDescribe("Firewall rule", func() {
// OnlyLocal service is needed to examine which exact nodes the requests are being forwarded to by the Load Balancer on GCE
ginkgo.By("Updating LoadBalancer service to ExternalTrafficPolicy=Local")
svc = jig.UpdateServiceOrFail(svc.Namespace, svc.Name, func(svc *v1.Service) {
svc, err = jig.UpdateService(func(svc *v1.Service) {
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
})
framework.ExpectNoError(err)
ginkgo.By("Waiting for the nodes health check firewall rule to be deleted")
_, err = gce.WaitForFirewallRule(gceCloud, nodesHCFw.Name, false, e2eservice.LoadBalancerCleanupTimeout)


@@ -65,17 +65,19 @@ var _ = SIGDescribe("Services [Feature:GCEAlphaFeature][Slow]", func() {
svcName := "net-tiers-svc"
ns := f.Namespace.Name
jig := e2eservice.NewTestJig(cs, svcName)
jig := e2eservice.NewTestJig(cs, ns, svcName)
ginkgo.By("creating a pod to be part of the service " + svcName)
jig.RunOrFail(ns, nil)
_, err := jig.Run(nil)
framework.ExpectNoError(err)
// Test 1: create a standard tiered LB for the Service.
ginkgo.By("creating a Service of type LoadBalancer using the standard network tier")
svc := jig.CreateTCPServiceOrFail(ns, func(svc *v1.Service) {
svc, err := jig.CreateTCPService(func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeLoadBalancer
setNetworkTier(svc, string(gcecloud.NetworkTierAnnotationStandard))
})
framework.ExpectNoError(err)
// Verify that service has been updated properly.
svcTier, err := gcecloud.GetServiceNetworkTier(svc)
framework.ExpectNoError(err)
@@ -84,13 +86,14 @@ var _ = SIGDescribe("Services [Feature:GCEAlphaFeature][Slow]", func() {
serviceLBNames = append(serviceLBNames, cloudprovider.DefaultLoadBalancerName(svc))
// Wait and verify the LB.
ingressIP := waitAndVerifyLBWithTier(jig, ns, svcName, "", createTimeout, lagTimeout)
ingressIP := waitAndVerifyLBWithTier(jig, "", createTimeout, lagTimeout)
// Test 2: re-create a LB of a different tier for the updated Service.
ginkgo.By("updating the Service to use the premium (default) tier")
svc = jig.UpdateServiceOrFail(ns, svcName, func(svc *v1.Service) {
svc, err = jig.UpdateService(func(svc *v1.Service) {
clearNetworkTier(svc)
})
framework.ExpectNoError(err)
// Verify that service has been updated properly.
svcTier, err = gcecloud.GetServiceNetworkTier(svc)
framework.ExpectNoError(err)
@@ -98,7 +101,7 @@ var _ = SIGDescribe("Services [Feature:GCEAlphaFeature][Slow]", func() {
// Wait until the ingress IP changes. Each tier has its own pool of
// IPs, so changing tiers implies changing IPs.
ingressIP = waitAndVerifyLBWithTier(jig, ns, svcName, ingressIP, createTimeout, lagTimeout)
ingressIP = waitAndVerifyLBWithTier(jig, ingressIP, createTimeout, lagTimeout)
// Test 3: create a standard-tierd LB with a user-requested IP.
ginkgo.By("reserving a static IP for the load balancer")
@@ -119,10 +122,11 @@ var _ = SIGDescribe("Services [Feature:GCEAlphaFeature][Slow]", func() {
framework.Logf("Allocated static IP to be used by the load balancer: %q", requestedIP)
ginkgo.By("updating the Service to use the standard tier with a requested IP")
svc = jig.UpdateServiceOrFail(ns, svc.Name, func(svc *v1.Service) {
svc, err = jig.UpdateService(func(svc *v1.Service) {
svc.Spec.LoadBalancerIP = requestedIP
setNetworkTier(svc, string(gcecloud.NetworkTierAnnotationStandard))
})
framework.ExpectNoError(err)
// Verify that service has been updated properly.
framework.ExpectEqual(svc.Spec.LoadBalancerIP, requestedIP)
svcTier, err = gcecloud.GetServiceNetworkTier(svc)
@@ -130,20 +134,15 @@ var _ = SIGDescribe("Services [Feature:GCEAlphaFeature][Slow]", func() {
framework.ExpectEqual(svcTier, cloud.NetworkTierStandard)
// Wait until the ingress IP changes and verifies the LB.
ingressIP = waitAndVerifyLBWithTier(jig, ns, svcName, ingressIP, createTimeout, lagTimeout)
ingressIP = waitAndVerifyLBWithTier(jig, ingressIP, createTimeout, lagTimeout)
})
})
func waitAndVerifyLBWithTier(jig *e2eservice.TestJig, ns, svcName, existingIP string, waitTimeout, checkTimeout time.Duration) string {
var svc *v1.Service
if existingIP == "" {
// Creating the LB for the first time; wait for any ingress IP to show
// up.
svc = jig.WaitForNewIngressIPOrFail(ns, svcName, existingIP, waitTimeout)
} else {
// Re-creating the LB; wait for the ingress IP to change.
svc = jig.WaitForNewIngressIPOrFail(ns, svcName, existingIP, waitTimeout)
}
func waitAndVerifyLBWithTier(jig *e2eservice.TestJig, existingIP string, waitTimeout, checkTimeout time.Duration) string {
// If existingIP is "" this will wait for any ingress IP to show up. Otherwise
// it will wait for the ingress IP to change to something different.
svc, err := jig.WaitForNewIngressIP(existingIP, waitTimeout)
framework.ExpectNoError(err)
svcPort := int(svc.Spec.Ports[0].Port)
lbIngress := &svc.Status.LoadBalancer.Ingress[0]
@@ -154,7 +153,6 @@ func waitAndVerifyLBWithTier(jig *e2eservice.TestJig, ns, svcName, existingIP st
// Verify that the new ingress IP is the requested IP if it's set.
framework.ExpectEqual(ingressIP, svc.Spec.LoadBalancerIP)
}
jig.SanityCheckService(svc, v1.ServiceTypeLoadBalancer)
// If the IP has been used by previous test, sometimes we get the lingering
// 404 errors even after the LB is long gone. Tolerate and retry until the
// new LB is fully established since this feature is still Alpha in GCP.

File diff suppressed because it is too large


@@ -43,27 +43,30 @@ func shouldTestPDBs() bool { return framework.ProviderIs("gce", "gke") }
// Setup creates a service with a load balancer and makes sure it's reachable.
func (t *ServiceUpgradeTest) Setup(f *framework.Framework) {
serviceName := "service-test"
jig := e2eservice.NewTestJig(f.ClientSet, serviceName)
jig := e2eservice.NewTestJig(f.ClientSet, f.Namespace.Name, serviceName)
ns := f.Namespace
ginkgo.By("creating a TCP service " + serviceName + " with type=LoadBalancer in namespace " + ns.Name)
tcpService := jig.CreateTCPServiceOrFail(ns.Name, func(s *v1.Service) {
tcpService, err := jig.CreateTCPService(func(s *v1.Service) {
s.Spec.Type = v1.ServiceTypeLoadBalancer
})
tcpService = jig.WaitForLoadBalancerOrFail(ns.Name, tcpService.Name, e2eservice.LoadBalancerCreateTimeoutDefault)
jig.SanityCheckService(tcpService, v1.ServiceTypeLoadBalancer)
framework.ExpectNoError(err)
tcpService, err = jig.WaitForLoadBalancer(e2eservice.LoadBalancerCreateTimeoutDefault)
framework.ExpectNoError(err)
// Get info to hit it with
tcpIngressIP := e2eservice.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0])
svcPort := int(tcpService.Spec.Ports[0].Port)
ginkgo.By("creating pod to be part of service " + serviceName)
rc := jig.RunOrFail(ns.Name, jig.AddRCAntiAffinity)
rc, err := jig.Run(jig.AddRCAntiAffinity)
framework.ExpectNoError(err)
if shouldTestPDBs() {
ginkgo.By("creating a PodDisruptionBudget to cover the ReplicationController")
jig.CreatePDBOrFail(ns.Name, rc)
_, err = jig.CreatePDB(rc)
framework.ExpectNoError(err)
}
// Hit it once before considering ourselves ready
@@ -111,11 +114,9 @@ func (t *ServiceUpgradeTest) test(f *framework.Framework, done <-chan struct{},
<-done
}
// Sanity check and hit it once more
// Hit it once more
ginkgo.By("hitting the pod through the service's LoadBalancer")
e2eservice.TestReachableHTTP(t.tcpIngressIP, t.svcPort, e2eservice.LoadBalancerLagTimeoutDefault)
t.jig.SanityCheckService(t.tcpService, v1.ServiceTypeLoadBalancer)
if testFinalizer {
defer func() {
ginkgo.By("Check that service can be deleted with finalizer")


@@ -42,23 +42,21 @@ var _ = SIGDescribe("Services", func() {
serviceName := "nodeport-test"
ns := f.Namespace.Name
jig := e2eservice.NewTestJig(cs, serviceName)
jig := e2eservice.NewTestJig(cs, ns, serviceName)
nodeIP, err := e2enode.PickIP(jig.Client)
if err != nil {
framework.Logf("Unexpected error occurred: %v", err)
}
// TODO: write a wrapper for ExpectNoErrorWithOffset()
framework.ExpectNoErrorWithOffset(0, err)
framework.ExpectNoError(err)
ginkgo.By("creating service " + serviceName + " with type=NodePort in namespace " + ns)
e2eservice := jig.CreateTCPServiceOrFail(ns, func(svc *v1.Service) {
svc, err := jig.CreateTCPService(func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeNodePort
})
jig.SanityCheckService(e2eservice, v1.ServiceTypeNodePort)
nodePort := int(e2eservice.Spec.Ports[0].NodePort)
framework.ExpectNoError(err)
nodePort := int(svc.Spec.Ports[0].NodePort)
ginkgo.By("creating Pod to be part of service " + serviceName)
jig.RunOrFail(ns, nil)
_, err = jig.Run(nil)
framework.ExpectNoError(err)
//using hybrid_network methods
ginkgo.By("creating Windows testing Pod")