diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index a4f8dc8ef76..7633c6c9dcd 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -369,6 +369,7 @@ var iptablesJumpChains = []iptablesJumpChain{ {utiliptables.TableFilter, kubeExternalServicesChain, utiliptables.ChainInput, "kubernetes externally-visible service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}}, {utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainForward, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}}, {utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}}, + {utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainInput, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}}, {utiliptables.TableFilter, kubeForwardChain, utiliptables.ChainForward, "kubernetes forwarding rules", nil}, {utiliptables.TableNAT, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", nil}, {utiliptables.TableNAT, kubeServicesChain, utiliptables.ChainPrerouting, "kubernetes service portals", nil}, @@ -847,6 +848,7 @@ func (proxier *Proxier) syncProxyRules() { } writeLine(proxier.natRules, append(args, "-j", string(svcChain))...) } else { + // No endpoints. writeLine(proxier.filterRules, "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), @@ -917,6 +919,7 @@ func (proxier *Proxier) syncProxyRules() { // This covers cases like GCE load-balancers which get added to the local routing table. writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", string(svcChain))...) } else { + // No endpoints. writeLine(proxier.filterRules, "-A", string(kubeExternalServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), @@ -929,10 +932,10 @@ func (proxier *Proxier) syncProxyRules() { } // Capture load-balancer ingress. - if hasEndpoints { - fwChain := svcInfo.serviceFirewallChainName - for _, ingress := range svcInfo.LoadBalancerStatus.Ingress { - if ingress.IP != "" { + fwChain := svcInfo.serviceFirewallChainName + for _, ingress := range svcInfo.LoadBalancerStatus.Ingress { + if ingress.IP != "" { + if hasEndpoints { // create service firewall chain if chain, ok := existingNATChains[fwChain]; ok { writeBytesLine(proxier.natChains, chain) @@ -993,10 +996,19 @@ func (proxier *Proxier) syncProxyRules() { // If the packet was able to reach the end of firewall chain, then it did not get DNATed. // It means the packet cannot go thru the firewall, then mark it for DROP writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...) + } else { + // No endpoints. + writeLine(proxier.filterRules, + "-A", string(kubeServicesChain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), + "-m", protocol, "-p", protocol, + "-d", utilproxy.ToCIDR(net.ParseIP(ingress.IP)), + "--dport", strconv.Itoa(svcInfo.Port), + "-j", "REJECT", + ) } } } - // FIXME: do we need REJECT rules for load-balancer ingress if !hasEndpoints? // Capture nodeports. If we had more than 2 rules it might be // worthwhile to make a new per-service chain for nodeport rules, but @@ -1078,6 +1090,7 @@ func (proxier *Proxier) syncProxyRules() { writeLine(proxier.natRules, append(args, "-j", string(svcXlbChain))...) } } else { + // No endpoints. writeLine(proxier.filterRules, "-A", string(kubeExternalServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), diff --git a/test/e2e/framework/service_util.go b/test/e2e/framework/service_util.go index 172f52bd89a..b98ddba72e8 100644 --- a/test/e2e/framework/service_util.go +++ b/test/e2e/framework/service_util.go @@ -738,6 +738,28 @@ func (j *ServiceTestJig) RunOrFail(namespace string, tweak func(rc *v1.Replicati return result } +func (j *ServiceTestJig) Scale(namespace string, replicas int) { + rc := j.Name + scale, err := j.Client.CoreV1().ReplicationControllers(namespace).GetScale(rc, metav1.GetOptions{}) + if err != nil { + Failf("Failed to get scale for RC %q: %v", rc, err) + } + + scale.Spec.Replicas = int32(replicas) + _, err = j.Client.CoreV1().ReplicationControllers(namespace).UpdateScale(rc, scale) + if err != nil { + Failf("Failed to scale RC %q: %v", rc, err) + } + pods, err := j.waitForPodsCreated(namespace, replicas) + if err != nil { + Failf("Failed waiting for pods: %v", err) + } + if err := j.waitForPodsReady(namespace, pods); err != nil { + Failf("Failed waiting for pods to be running: %v", err) + } + return +} + func (j *ServiceTestJig) waitForPdbReady(namespace string) error { timeout := 2 * time.Minute for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) { @@ -911,6 +933,20 @@ func (j *ServiceTestJig) TestNotReachableHTTP(host string, port int, timeout tim } } +func (j *ServiceTestJig) TestRejectedHTTP(host string, port int, timeout time.Duration) { + pollfn := func() (bool, error) { + result := PokeHTTP(host, port, "/", nil) + if result.Status == HTTPRefused { + return true, nil + } + return false, nil // caller can retry + } + + if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil { + Failf("HTTP service %v:%v not rejected: %v", host, port, err) + } +} + func (j *ServiceTestJig) TestReachableUDP(host string, port int, timeout time.Duration) { pollfn := func() (bool, error) { result := PokeUDP(host, port, "echo hello", &UDPPokeParams{ @@ -941,6 +977,19 @@ func (j *ServiceTestJig) TestNotReachableUDP(host string, port int, timeout time } } +func (j *ServiceTestJig) TestRejectedUDP(host string, port int, timeout time.Duration) { + pollfn := func() (bool, error) { + result := PokeUDP(host, port, "echo hello", &UDPPokeParams{Timeout: 3 * time.Second}) + if result.Status == UDPRefused { + return true, nil + } + return false, nil // caller can retry + } + if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil { + Failf("UDP service %v:%v not rejected: %v", host, port, err) + } +} + func (j *ServiceTestJig) GetHTTPContent(host string, port int, timeout time.Duration, url string) bytes.Buffer { var body bytes.Buffer if pollErr := wait.PollImmediate(Poll, timeout, func() (bool, error) { diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index a60fdd41388..55f743992da 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -791,11 +791,47 @@ var _ = SIGDescribe("Services", func() { jig.TestReachableUDP(nodeIP, udpNodePort, framework.KubeProxyLagTimeout) By("hitting the TCP service's LoadBalancer") - jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout) // this may actually recreate the LB + jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout) if loadBalancerSupportsUDP { By("hitting the UDP service's LoadBalancer") - jig.TestReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout) // this may actually recreate the LB) + jig.TestReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout) + } + + By("Scaling the pods to 0") + jig.Scale(ns1, 0) + jig.Scale(ns2, 0) + + By("looking for ICMP REJECT on the TCP service's NodePort") + jig.TestRejectedHTTP(nodeIP, tcpNodePort, framework.KubeProxyLagTimeout) + + By("looking for ICMP REJECT on the UDP service's NodePort") + jig.TestRejectedUDP(nodeIP, udpNodePort, framework.KubeProxyLagTimeout) + + By("looking for ICMP REJECT on the TCP service's LoadBalancer") + jig.TestRejectedHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout) + + if loadBalancerSupportsUDP { + By("looking for ICMP REJECT on the UDP service's LoadBalancer") + jig.TestRejectedUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout) + } + + By("Scaling the pods to 1") + jig.Scale(ns1, 1) + jig.Scale(ns2, 1) + + By("hitting the TCP service's NodePort") + jig.TestReachableHTTP(nodeIP, tcpNodePort, framework.KubeProxyLagTimeout) + + By("hitting the UDP service's NodePort") + jig.TestReachableUDP(nodeIP, udpNodePort, framework.KubeProxyLagTimeout) + + By("hitting the TCP service's LoadBalancer") + jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout) + + if loadBalancerSupportsUDP { + By("hitting the UDP service's LoadBalancer") + jig.TestReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout) } // Change the services back to ClusterIP.