enforce the interface relationship between ServicePort and BaseServiceInfo

Currently the BaseServiceInfo struct implements the ServicePort interface, but
only uses that interface sometimes. All the elements of BaseServiceInfo are exported
and sometimes the interface is used to access them and othertimes not

I extended the ServicePort interface so that all relevent values can be accessed through
it and unexported all the elements of BaseServiceInfo
This commit is contained in:
Jacob Tanenbaum 2019-04-08 14:11:36 -04:00
parent b6b01e17ec
commit c0392d72e9
6 changed files with 232 additions and 195 deletions

View File

@ -156,7 +156,7 @@ func newServiceInfo(port *v1.ServicePort, service *v1.Service, baseInfo *proxy.B
// Store the following for performance reasons. // Store the following for performance reasons.
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name} svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name}
protocol := strings.ToLower(string(info.Protocol)) protocol := strings.ToLower(string(info.Protocol()))
info.serviceNameString = svcPortName.String() info.serviceNameString = svcPortName.String()
info.servicePortChainName = servicePortChainName(info.serviceNameString, protocol) info.servicePortChainName = servicePortChainName(info.serviceNameString, protocol)
info.serviceFirewallChainName = serviceFirewallChainName(info.serviceNameString, protocol) info.serviceFirewallChainName = serviceFirewallChainName(info.serviceNameString, protocol)
@ -620,14 +620,14 @@ func servicePortEndpointChainName(servicePortName string, protocol string, endpo
// TODO: move it to util // TODO: move it to util
func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) { func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) {
for _, epSvcPair := range connectionMap { for _, epSvcPair := range connectionMap {
if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.GetProtocol() == v1.ProtocolUDP { if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == v1.ProtocolUDP {
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
nodePort := svcInfo.GetNodePort() nodePort := svcInfo.NodePort()
var err error var err error
if nodePort != 0 { if nodePort != 0 {
err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, v1.ProtocolUDP) err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, v1.ProtocolUDP)
} else { } else {
err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIPString(), endpointIP, v1.ProtocolUDP) err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP)
} }
if err != nil { if err != nil {
klog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err) klog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err)
@ -689,9 +689,9 @@ func (proxier *Proxier) syncProxyRules() {
staleServices := serviceUpdateResult.UDPStaleClusterIP staleServices := serviceUpdateResult.UDPStaleClusterIP
// merge stale services gathered from updateEndpointsMap // merge stale services gathered from updateEndpointsMap
for _, svcPortName := range endpointUpdateResult.StaleServiceNames { for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.GetProtocol() == v1.ProtocolUDP { if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIPString()) klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIP().String())
staleServices.Insert(svcInfo.ClusterIPString()) staleServices.Insert(svcInfo.ClusterIP().String())
for _, extIP := range svcInfo.ExternalIPStrings() { for _, extIP := range svcInfo.ExternalIPStrings() {
staleServices.Insert(extIP) staleServices.Insert(extIP)
} }
@ -820,8 +820,8 @@ func (proxier *Proxier) syncProxyRules() {
klog.Errorf("Failed to cast serviceInfo %q", svcName.String()) klog.Errorf("Failed to cast serviceInfo %q", svcName.String())
continue continue
} }
isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP) isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP())
protocol := strings.ToLower(string(svcInfo.Protocol)) protocol := strings.ToLower(string(svcInfo.Protocol()))
svcNameString := svcInfo.serviceNameString svcNameString := svcInfo.serviceNameString
hasEndpoints := len(proxier.endpointsMap[svcName]) > 0 hasEndpoints := len(proxier.endpointsMap[svcName]) > 0
@ -837,7 +837,7 @@ func (proxier *Proxier) syncProxyRules() {
} }
svcXlbChain := svcInfo.serviceLBChainName svcXlbChain := svcInfo.serviceLBChainName
if svcInfo.OnlyNodeLocalEndpoints { if svcInfo.OnlyNodeLocalEndpoints() {
// Only for services request OnlyLocal traffic // Only for services request OnlyLocal traffic
// create the per-service LB chain, retaining counters if possible. // create the per-service LB chain, retaining counters if possible.
if lbChain, ok := existingNATChains[svcXlbChain]; ok { if lbChain, ok := existingNATChains[svcXlbChain]; ok {
@ -854,8 +854,8 @@ func (proxier *Proxier) syncProxyRules() {
"-A", string(kubeServicesChain), "-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"-d", utilproxy.ToCIDR(svcInfo.ClusterIP), "-d", utilproxy.ToCIDR(svcInfo.ClusterIP()),
"--dport", strconv.Itoa(svcInfo.Port), "--dport", strconv.Itoa(svcInfo.Port()),
) )
if proxier.masqueradeAll { if proxier.masqueradeAll {
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
@ -874,24 +874,24 @@ func (proxier *Proxier) syncProxyRules() {
"-A", string(kubeServicesChain), "-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"-d", utilproxy.ToCIDR(svcInfo.ClusterIP), "-d", utilproxy.ToCIDR(svcInfo.ClusterIP()),
"--dport", strconv.Itoa(svcInfo.Port), "--dport", strconv.Itoa(svcInfo.Port()),
"-j", "REJECT", "-j", "REJECT",
) )
} }
// Capture externalIPs. // Capture externalIPs.
for _, externalIP := range svcInfo.ExternalIPs { for _, externalIP := range svcInfo.ExternalIPStrings() {
// If the "external" IP happens to be an IP that is local to this // If the "external" IP happens to be an IP that is local to this
// machine, hold the local port open so no other process can open it // machine, hold the local port open so no other process can open it
// (because the socket might open but it would never work). // (because the socket might open but it would never work).
if local, err := utilproxy.IsLocalIP(externalIP); err != nil { if local, err := utilproxy.IsLocalIP(externalIP); err != nil {
klog.Errorf("can't determine if IP is local, assuming not: %v", err) klog.Errorf("can't determine if IP is local, assuming not: %v", err)
} else if local && (svcInfo.GetProtocol() != v1.ProtocolSCTP) { } else if local && (svcInfo.Protocol() != v1.ProtocolSCTP) {
lp := utilproxy.LocalPort{ lp := utilproxy.LocalPort{
Description: "externalIP for " + svcNameString, Description: "externalIP for " + svcNameString,
IP: externalIP, IP: externalIP,
Port: svcInfo.Port, Port: svcInfo.Port(),
Protocol: protocol, Protocol: protocol,
} }
if proxier.portsMap[lp] != nil { if proxier.portsMap[lp] != nil {
@ -922,7 +922,7 @@ func (proxier *Proxier) syncProxyRules() {
"-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"-d", utilproxy.ToCIDR(net.ParseIP(externalIP)), "-d", utilproxy.ToCIDR(net.ParseIP(externalIP)),
"--dport", strconv.Itoa(svcInfo.Port), "--dport", strconv.Itoa(svcInfo.Port()),
) )
// We have to SNAT packets to external IPs. // We have to SNAT packets to external IPs.
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
@ -946,7 +946,7 @@ func (proxier *Proxier) syncProxyRules() {
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"-d", utilproxy.ToCIDR(net.ParseIP(externalIP)), "-d", utilproxy.ToCIDR(net.ParseIP(externalIP)),
"--dport", strconv.Itoa(svcInfo.Port), "--dport", strconv.Itoa(svcInfo.Port()),
"-j", "REJECT", "-j", "REJECT",
) )
} }
@ -954,8 +954,8 @@ func (proxier *Proxier) syncProxyRules() {
// Capture load-balancer ingress. // Capture load-balancer ingress.
fwChain := svcInfo.serviceFirewallChainName fwChain := svcInfo.serviceFirewallChainName
for _, ingress := range svcInfo.LoadBalancerStatus.Ingress { for _, ingress := range svcInfo.LoadBalancerIPStrings() {
if ingress.IP != "" { if ingress != "" {
if hasEndpoints { if hasEndpoints {
// create service firewall chain // create service firewall chain
if chain, ok := existingNATChains[fwChain]; ok { if chain, ok := existingNATChains[fwChain]; ok {
@ -972,8 +972,8 @@ func (proxier *Proxier) syncProxyRules() {
"-A", string(kubeServicesChain), "-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"-d", utilproxy.ToCIDR(net.ParseIP(ingress.IP)), "-d", utilproxy.ToCIDR(net.ParseIP(ingress)),
"--dport", strconv.Itoa(svcInfo.Port), "--dport", strconv.Itoa(svcInfo.Port()),
) )
// jump to service firewall chain // jump to service firewall chain
writeLine(proxier.natRules, append(args, "-j", string(fwChain))...) writeLine(proxier.natRules, append(args, "-j", string(fwChain))...)
@ -987,18 +987,18 @@ func (proxier *Proxier) syncProxyRules() {
chosenChain := svcXlbChain chosenChain := svcXlbChain
// If we are proxying globally, we need to masquerade in case we cross nodes. // If we are proxying globally, we need to masquerade in case we cross nodes.
// If we are proxying only locally, we can retain the source IP. // If we are proxying only locally, we can retain the source IP.
if !svcInfo.OnlyNodeLocalEndpoints { if !svcInfo.OnlyNodeLocalEndpoints() {
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
chosenChain = svcChain chosenChain = svcChain
} }
if len(svcInfo.LoadBalancerSourceRanges) == 0 { if len(svcInfo.LoadBalancerSourceRanges()) == 0 {
// allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain // allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain
writeLine(proxier.natRules, append(args, "-j", string(chosenChain))...) writeLine(proxier.natRules, append(args, "-j", string(chosenChain))...)
} else { } else {
// firewall filter based on each source range // firewall filter based on each source range
allowFromNode := false allowFromNode := false
for _, src := range svcInfo.LoadBalancerSourceRanges { for _, src := range svcInfo.LoadBalancerSourceRanges() {
writeLine(proxier.natRules, append(args, "-s", src, "-j", string(chosenChain))...) writeLine(proxier.natRules, append(args, "-s", src, "-j", string(chosenChain))...)
// ignore error because it has been validated // ignore error because it has been validated
_, cidr, _ := net.ParseCIDR(src) _, cidr, _ := net.ParseCIDR(src)
@ -1010,7 +1010,7 @@ func (proxier *Proxier) syncProxyRules() {
// loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly. // loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly.
// Need to add the following rule to allow request on host. // Need to add the following rule to allow request on host.
if allowFromNode { if allowFromNode {
writeLine(proxier.natRules, append(args, "-s", utilproxy.ToCIDR(net.ParseIP(ingress.IP)), "-j", string(chosenChain))...) writeLine(proxier.natRules, append(args, "-s", utilproxy.ToCIDR(net.ParseIP(ingress)), "-j", string(chosenChain))...)
} }
} }
@ -1023,8 +1023,8 @@ func (proxier *Proxier) syncProxyRules() {
"-A", string(kubeServicesChain), "-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"-d", utilproxy.ToCIDR(net.ParseIP(ingress.IP)), "-d", utilproxy.ToCIDR(net.ParseIP(ingress)),
"--dport", strconv.Itoa(svcInfo.Port), "--dport", strconv.Itoa(svcInfo.Port()),
"-j", "REJECT", "-j", "REJECT",
) )
} }
@ -1034,7 +1034,7 @@ func (proxier *Proxier) syncProxyRules() {
// Capture nodeports. If we had more than 2 rules it might be // Capture nodeports. If we had more than 2 rules it might be
// worthwhile to make a new per-service chain for nodeport rules, but // worthwhile to make a new per-service chain for nodeport rules, but
// with just 2 rules it ends up being a waste and a cognitive burden. // with just 2 rules it ends up being a waste and a cognitive burden.
if svcInfo.NodePort != 0 { if svcInfo.NodePort() != 0 {
// Hold the local port open so no other process can open it // Hold the local port open so no other process can open it
// (because the socket might open but it would never work). // (because the socket might open but it would never work).
addresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer) addresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
@ -1048,7 +1048,7 @@ func (proxier *Proxier) syncProxyRules() {
lp := utilproxy.LocalPort{ lp := utilproxy.LocalPort{
Description: "nodePort for " + svcNameString, Description: "nodePort for " + svcNameString,
IP: address, IP: address,
Port: svcInfo.NodePort, Port: svcInfo.NodePort(),
Protocol: protocol, Protocol: protocol,
} }
if utilproxy.IsZeroCIDR(address) { if utilproxy.IsZeroCIDR(address) {
@ -1066,7 +1066,7 @@ func (proxier *Proxier) syncProxyRules() {
if proxier.portsMap[lp] != nil { if proxier.portsMap[lp] != nil {
klog.V(4).Infof("Port %s was open before and is still needed", lp.String()) klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
replacementPortsMap[lp] = proxier.portsMap[lp] replacementPortsMap[lp] = proxier.portsMap[lp]
} else if svcInfo.GetProtocol() != v1.ProtocolSCTP { } else if svcInfo.Protocol() != v1.ProtocolSCTP {
socket, err := proxier.portMapper.OpenLocalPort(&lp) socket, err := proxier.portMapper.OpenLocalPort(&lp)
if err != nil { if err != nil {
klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err) klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
@ -1091,9 +1091,9 @@ func (proxier *Proxier) syncProxyRules() {
"-A", string(kubeNodePortsChain), "-A", string(kubeNodePortsChain),
"-m", "comment", "--comment", svcNameString, "-m", "comment", "--comment", svcNameString,
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"--dport", strconv.Itoa(svcInfo.NodePort), "--dport", strconv.Itoa(svcInfo.NodePort()),
) )
if !svcInfo.OnlyNodeLocalEndpoints { if !svcInfo.OnlyNodeLocalEndpoints() {
// Nodeports need SNAT, unless they're local. // Nodeports need SNAT, unless they're local.
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
// Jump to the service chain. // Jump to the service chain.
@ -1117,7 +1117,7 @@ func (proxier *Proxier) syncProxyRules() {
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
"-m", "addrtype", "--dst-type", "LOCAL", "-m", "addrtype", "--dst-type", "LOCAL",
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"--dport", strconv.Itoa(svcInfo.NodePort), "--dport", strconv.Itoa(svcInfo.NodePort()),
"-j", "REJECT", "-j", "REJECT",
) )
} }
@ -1153,7 +1153,7 @@ func (proxier *Proxier) syncProxyRules() {
} }
// First write session affinity rules, if applicable. // First write session affinity rules, if applicable.
if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP { if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
for _, endpointChain := range endpointChains { for _, endpointChain := range endpointChains {
args = append(args[:0], args = append(args[:0],
"-A", string(svcChain), "-A", string(svcChain),
@ -1161,7 +1161,7 @@ func (proxier *Proxier) syncProxyRules() {
proxier.appendServiceCommentLocked(args, svcNameString) proxier.appendServiceCommentLocked(args, svcNameString)
args = append(args, args = append(args,
"-m", "recent", "--name", string(endpointChain), "-m", "recent", "--name", string(endpointChain),
"--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds), "--reap", "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap",
"-j", string(endpointChain), "-j", string(endpointChain),
) )
writeLine(proxier.natRules, args...) writeLine(proxier.natRules, args...)
@ -1174,7 +1174,7 @@ func (proxier *Proxier) syncProxyRules() {
localEndpointChains := make([]utiliptables.Chain, 0) localEndpointChains := make([]utiliptables.Chain, 0)
for i, endpointChain := range endpointChains { for i, endpointChain := range endpointChains {
// Write ingress loadbalancing & DNAT rules only for services that request OnlyLocal traffic. // Write ingress loadbalancing & DNAT rules only for services that request OnlyLocal traffic.
if svcInfo.OnlyNodeLocalEndpoints && endpoints[i].IsLocal { if svcInfo.OnlyNodeLocalEndpoints() && endpoints[i].IsLocal {
// These slices parallel each other; must be kept in sync // These slices parallel each other; must be kept in sync
localEndpoints = append(localEndpoints, endpoints[i]) localEndpoints = append(localEndpoints, endpoints[i])
localEndpointChains = append(localEndpointChains, endpointChains[i]) localEndpointChains = append(localEndpointChains, endpointChains[i])
@ -1207,7 +1207,7 @@ func (proxier *Proxier) syncProxyRules() {
"-s", utilproxy.ToCIDR(net.ParseIP(epIP)), "-s", utilproxy.ToCIDR(net.ParseIP(epIP)),
"-j", string(KubeMarkMasqChain))...) "-j", string(KubeMarkMasqChain))...)
// Update client-affinity lists. // Update client-affinity lists.
if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP { if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
args = append(args, "-m", "recent", "--name", string(endpointChain), "--set") args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
} }
// DNAT to final destination. // DNAT to final destination.
@ -1216,7 +1216,7 @@ func (proxier *Proxier) syncProxyRules() {
} }
// The logic below this applies only if this service is marked as OnlyLocal // The logic below this applies only if this service is marked as OnlyLocal
if !svcInfo.OnlyNodeLocalEndpoints { if !svcInfo.OnlyNodeLocalEndpoints() {
continue continue
} }
@ -1247,13 +1247,13 @@ func (proxier *Proxier) syncProxyRules() {
writeLine(proxier.natRules, args...) writeLine(proxier.natRules, args...)
} else { } else {
// First write session affinity rules only over local endpoints, if applicable. // First write session affinity rules only over local endpoints, if applicable.
if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP { if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
for _, endpointChain := range localEndpointChains { for _, endpointChain := range localEndpointChains {
writeLine(proxier.natRules, writeLine(proxier.natRules,
"-A", string(svcXlbChain), "-A", string(svcXlbChain),
"-m", "comment", "--comment", svcNameString, "-m", "comment", "--comment", svcNameString,
"-m", "recent", "--name", string(endpointChain), "-m", "recent", "--name", string(endpointChain),
"--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds), "--reap", "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap",
"-j", string(endpointChain)) "-j", string(endpointChain))
} }
} }

View File

@ -147,30 +147,18 @@ func TestGetChainLinesMultipleTables(t *testing.T) {
checkAllLines(t, utiliptables.TableNAT, []byte(iptablesSave), expected) checkAllLines(t, utiliptables.TableNAT, []byte(iptablesSave), expected)
} }
func newFakeServiceInfo(service proxy.ServicePortName, ip net.IP, port int, protocol v1.Protocol, onlyNodeLocalEndpoints bool) *serviceInfo {
return &serviceInfo{
BaseServiceInfo: &proxy.BaseServiceInfo{
SessionAffinityType: v1.ServiceAffinityNone, // default
StickyMaxAgeSeconds: int(v1.DefaultClientIPServiceAffinitySeconds), // default
ClusterIP: ip,
Port: port,
Protocol: protocol,
OnlyNodeLocalEndpoints: onlyNodeLocalEndpoints,
},
}
}
func TestDeleteEndpointConnections(t *testing.T) { func TestDeleteEndpointConnections(t *testing.T) {
const ( const (
UDP = v1.ProtocolUDP UDP = v1.ProtocolUDP
TCP = v1.ProtocolTCP TCP = v1.ProtocolTCP
SCTP = v1.ProtocolSCTP SCTP = v1.ProtocolSCTP
) )
testCases := []struct { testCases := []struct {
description string description string
svcName string svcName string
svcIP string svcIP string
svcPort int svcPort int32
protocol v1.Protocol protocol v1.Protocol
endpoint string // IP:port endpoint endpoint string // IP:port endpoint
epSvcPair proxy.ServiceEndpoint // Will be generated by test epSvcPair proxy.ServiceEndpoint // Will be generated by test
@ -237,21 +225,6 @@ func TestDeleteEndpointConnections(t *testing.T) {
}, },
} }
// Create a service map that has service info entries for all test cases
// and generate an endpoint service pair for each test case
serviceMap := make(map[proxy.ServicePortName]proxy.ServicePort)
for i, tc := range testCases {
svc := proxy.ServicePortName{
NamespacedName: types.NamespacedName{Namespace: "ns1", Name: tc.svcName},
Port: "p80",
}
serviceMap[svc] = newFakeServiceInfo(svc, net.ParseIP(tc.svcIP), 80, tc.protocol, false)
testCases[i].epSvcPair = proxy.ServiceEndpoint{
Endpoint: tc.endpoint,
ServicePortName: svc,
}
}
// Create a fake executor for the conntrack utility. This should only be // Create a fake executor for the conntrack utility. This should only be
// invoked for UDP connections, since no conntrack cleanup is needed for TCP // invoked for UDP connections, since no conntrack cleanup is needed for TCP
fcmd := fakeexec.FakeCmd{} fcmd := fakeexec.FakeCmd{}
@ -276,16 +249,43 @@ func TestDeleteEndpointConnections(t *testing.T) {
} }
} }
// Create a proxier using the fake conntrack executor and service map ipt := iptablestest.NewFake()
fakeProxier := Proxier{exec: &fexec, serviceMap: serviceMap} fp := NewFakeProxier(ipt)
fp.exec = &fexec
for _, tc := range testCases {
makeServiceMap(fp,
makeTestService("ns1", tc.svcName, func(svc *v1.Service) {
svc.Spec.ClusterIP = tc.svcIP
svc.Spec.Ports = []v1.ServicePort{{
Name: "p80",
Port: tc.svcPort,
Protocol: tc.protocol,
}}
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
}),
)
proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
}
// Run the test cases // Run the test cases
for _, tc := range testCases { for _, tc := range testCases {
priorExecs := fexec.CommandCalls priorExecs := fexec.CommandCalls
priorGlogErrs := klog.Stats.Error.Lines() priorGlogErrs := klog.Stats.Error.Lines()
input := []proxy.ServiceEndpoint{tc.epSvcPair} svc := proxy.ServicePortName{
fakeProxier.deleteEndpointConnections(input) NamespacedName: types.NamespacedName{Namespace: "ns1", Name: tc.svcName},
Port: "p80",
}
input := []proxy.ServiceEndpoint{
{
Endpoint: tc.endpoint,
ServicePortName: svc,
},
}
fp.deleteEndpointConnections(input)
// For UDP connections, check the executed conntrack command // For UDP connections, check the executed conntrack command
var expExecs int var expExecs int

View File

@ -758,9 +758,9 @@ func (proxier *Proxier) syncProxyRules() {
staleServices := serviceUpdateResult.UDPStaleClusterIP staleServices := serviceUpdateResult.UDPStaleClusterIP
// merge stale services gathered from updateEndpointsMap // merge stale services gathered from updateEndpointsMap
for _, svcPortName := range endpointUpdateResult.StaleServiceNames { for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.GetProtocol() == v1.ProtocolUDP { if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIPString()) klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIP().String())
staleServices.Insert(svcInfo.ClusterIPString()) staleServices.Insert(svcInfo.ClusterIP().String())
for _, extIP := range svcInfo.ExternalIPStrings() { for _, extIP := range svcInfo.ExternalIPStrings() {
staleServices.Insert(extIP) staleServices.Insert(extIP)
} }
@ -815,7 +815,7 @@ func (proxier *Proxier) syncProxyRules() {
klog.Errorf("Failed to cast serviceInfo %q", svcName.String()) klog.Errorf("Failed to cast serviceInfo %q", svcName.String())
continue continue
} }
protocol := strings.ToLower(string(svcInfo.Protocol)) protocol := strings.ToLower(string(svcInfo.Protocol()))
// Precompute svcNameString; with many services the many calls // Precompute svcNameString; with many services the many calls
// to ServicePortName.String() show up in CPU profiles. // to ServicePortName.String() show up in CPU profiles.
svcNameString := svcName.String() svcNameString := svcName.String()
@ -853,8 +853,8 @@ func (proxier *Proxier) syncProxyRules() {
// Capture the clusterIP. // Capture the clusterIP.
// ipset call // ipset call
entry := &utilipset.Entry{ entry := &utilipset.Entry{
IP: svcInfo.ClusterIP.String(), IP: svcInfo.ClusterIP().String(),
Port: svcInfo.Port, Port: svcInfo.Port(),
Protocol: protocol, Protocol: protocol,
SetType: utilipset.HashIPPort, SetType: utilipset.HashIPPort,
} }
@ -867,15 +867,15 @@ func (proxier *Proxier) syncProxyRules() {
proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String()) proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String())
// ipvs call // ipvs call
serv := &utilipvs.VirtualServer{ serv := &utilipvs.VirtualServer{
Address: svcInfo.ClusterIP, Address: svcInfo.ClusterIP(),
Port: uint16(svcInfo.Port), Port: uint16(svcInfo.Port()),
Protocol: string(svcInfo.Protocol), Protocol: string(svcInfo.Protocol()),
Scheduler: proxier.ipvsScheduler, Scheduler: proxier.ipvsScheduler,
} }
// Set session affinity flag and timeout for IPVS service // Set session affinity flag and timeout for IPVS service
if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP { if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
serv.Flags |= utilipvs.FlagPersistent serv.Flags |= utilipvs.FlagPersistent
serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds) serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
} }
// We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService() // We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService()
if err := proxier.syncService(svcNameString, serv, true); err == nil { if err := proxier.syncService(svcNameString, serv, true); err == nil {
@ -891,16 +891,16 @@ func (proxier *Proxier) syncProxyRules() {
} }
// Capture externalIPs. // Capture externalIPs.
for _, externalIP := range svcInfo.ExternalIPs { for _, externalIP := range svcInfo.ExternalIPStrings() {
if local, err := utilproxy.IsLocalIP(externalIP); err != nil { if local, err := utilproxy.IsLocalIP(externalIP); err != nil {
klog.Errorf("can't determine if IP is local, assuming not: %v", err) klog.Errorf("can't determine if IP is local, assuming not: %v", err)
// We do not start listening on SCTP ports, according to our agreement in the // We do not start listening on SCTP ports, according to our agreement in the
// SCTP support KEP // SCTP support KEP
} else if local && (svcInfo.GetProtocol() != v1.ProtocolSCTP) { } else if local && (svcInfo.Protocol() != v1.ProtocolSCTP) {
lp := utilproxy.LocalPort{ lp := utilproxy.LocalPort{
Description: "externalIP for " + svcNameString, Description: "externalIP for " + svcNameString,
IP: externalIP, IP: externalIP,
Port: svcInfo.Port, Port: svcInfo.Port(),
Protocol: protocol, Protocol: protocol,
} }
if proxier.portsMap[lp] != nil { if proxier.portsMap[lp] != nil {
@ -928,7 +928,7 @@ func (proxier *Proxier) syncProxyRules() {
// ipset call // ipset call
entry := &utilipset.Entry{ entry := &utilipset.Entry{
IP: externalIP, IP: externalIP,
Port: svcInfo.Port, Port: svcInfo.Port(),
Protocol: protocol, Protocol: protocol,
SetType: utilipset.HashIPPort, SetType: utilipset.HashIPPort,
} }
@ -942,13 +942,13 @@ func (proxier *Proxier) syncProxyRules() {
// ipvs call // ipvs call
serv := &utilipvs.VirtualServer{ serv := &utilipvs.VirtualServer{
Address: net.ParseIP(externalIP), Address: net.ParseIP(externalIP),
Port: uint16(svcInfo.Port), Port: uint16(svcInfo.Port()),
Protocol: string(svcInfo.Protocol), Protocol: string(svcInfo.Protocol()),
Scheduler: proxier.ipvsScheduler, Scheduler: proxier.ipvsScheduler,
} }
if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP { if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
serv.Flags |= utilipvs.FlagPersistent serv.Flags |= utilipvs.FlagPersistent
serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds) serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
} }
if err := proxier.syncService(svcNameString, serv, true); err == nil { if err := proxier.syncService(svcNameString, serv, true); err == nil {
activeIPVSServices[serv.String()] = true activeIPVSServices[serv.String()] = true
@ -962,12 +962,12 @@ func (proxier *Proxier) syncProxyRules() {
} }
// Capture load-balancer ingress. // Capture load-balancer ingress.
for _, ingress := range svcInfo.LoadBalancerStatus.Ingress { for _, ingress := range svcInfo.LoadBalancerIPStrings() {
if ingress.IP != "" { if ingress != "" {
// ipset call // ipset call
entry = &utilipset.Entry{ entry = &utilipset.Entry{
IP: ingress.IP, IP: ingress,
Port: svcInfo.Port, Port: svcInfo.Port(),
Protocol: protocol, Protocol: protocol,
SetType: utilipset.HashIPPort, SetType: utilipset.HashIPPort,
} }
@ -981,14 +981,14 @@ func (proxier *Proxier) syncProxyRules() {
} }
proxier.ipsetList[kubeLoadBalancerSet].activeEntries.Insert(entry.String()) proxier.ipsetList[kubeLoadBalancerSet].activeEntries.Insert(entry.String())
// insert loadbalancer entry to lbIngressLocalSet if service externaltrafficpolicy=local // insert loadbalancer entry to lbIngressLocalSet if service externaltrafficpolicy=local
if svcInfo.OnlyNodeLocalEndpoints { if svcInfo.OnlyNodeLocalEndpoints() {
if valid := proxier.ipsetList[kubeLoadBalancerLocalSet].validateEntry(entry); !valid { if valid := proxier.ipsetList[kubeLoadBalancerLocalSet].validateEntry(entry); !valid {
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerLocalSet].Name)) klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerLocalSet].Name))
continue continue
} }
proxier.ipsetList[kubeLoadBalancerLocalSet].activeEntries.Insert(entry.String()) proxier.ipsetList[kubeLoadBalancerLocalSet].activeEntries.Insert(entry.String())
} }
if len(svcInfo.LoadBalancerSourceRanges) != 0 { if len(svcInfo.LoadBalancerSourceRanges()) != 0 {
// The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field. // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
// This currently works for loadbalancers that preserves source ips. // This currently works for loadbalancers that preserves source ips.
// For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply. // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
@ -998,11 +998,11 @@ func (proxier *Proxier) syncProxyRules() {
} }
proxier.ipsetList[kubeLoadbalancerFWSet].activeEntries.Insert(entry.String()) proxier.ipsetList[kubeLoadbalancerFWSet].activeEntries.Insert(entry.String())
allowFromNode := false allowFromNode := false
for _, src := range svcInfo.LoadBalancerSourceRanges { for _, src := range svcInfo.LoadBalancerSourceRanges() {
// ipset call // ipset call
entry = &utilipset.Entry{ entry = &utilipset.Entry{
IP: ingress.IP, IP: ingress,
Port: svcInfo.Port, Port: svcInfo.Port(),
Protocol: protocol, Protocol: protocol,
Net: src, Net: src,
SetType: utilipset.HashIPPortNet, SetType: utilipset.HashIPPortNet,
@ -1025,10 +1025,10 @@ func (proxier *Proxier) syncProxyRules() {
// Need to add the following rule to allow request on host. // Need to add the following rule to allow request on host.
if allowFromNode { if allowFromNode {
entry = &utilipset.Entry{ entry = &utilipset.Entry{
IP: ingress.IP, IP: ingress,
Port: svcInfo.Port, Port: svcInfo.Port(),
Protocol: protocol, Protocol: protocol,
IP2: ingress.IP, IP2: ingress,
SetType: utilipset.HashIPPortIP, SetType: utilipset.HashIPPortIP,
} }
// enumerate all white list source ip // enumerate all white list source ip
@ -1042,19 +1042,19 @@ func (proxier *Proxier) syncProxyRules() {
// ipvs call // ipvs call
serv := &utilipvs.VirtualServer{ serv := &utilipvs.VirtualServer{
Address: net.ParseIP(ingress.IP), Address: net.ParseIP(ingress),
Port: uint16(svcInfo.Port), Port: uint16(svcInfo.Port()),
Protocol: string(svcInfo.Protocol), Protocol: string(svcInfo.Protocol()),
Scheduler: proxier.ipvsScheduler, Scheduler: proxier.ipvsScheduler,
} }
if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP { if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
serv.Flags |= utilipvs.FlagPersistent serv.Flags |= utilipvs.FlagPersistent
serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds) serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
} }
if err := proxier.syncService(svcNameString, serv, true); err == nil { if err := proxier.syncService(svcNameString, serv, true); err == nil {
activeIPVSServices[serv.String()] = true activeIPVSServices[serv.String()] = true
activeBindAddrs[serv.Address.String()] = true activeBindAddrs[serv.Address.String()] = true
if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints, serv); err != nil { if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints(), serv); err != nil {
klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err) klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
} }
} else { } else {
@ -1063,7 +1063,7 @@ func (proxier *Proxier) syncProxyRules() {
} }
} }
if svcInfo.NodePort != 0 { if svcInfo.NodePort() != 0 {
addresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer) addresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
if err != nil { if err != nil {
klog.Errorf("Failed to get node ip address matching nodeport cidr: %v", err) klog.Errorf("Failed to get node ip address matching nodeport cidr: %v", err)
@ -1075,7 +1075,7 @@ func (proxier *Proxier) syncProxyRules() {
lp := utilproxy.LocalPort{ lp := utilproxy.LocalPort{
Description: "nodePort for " + svcNameString, Description: "nodePort for " + svcNameString,
IP: address, IP: address,
Port: svcInfo.NodePort, Port: svcInfo.NodePort(),
Protocol: protocol, Protocol: protocol,
} }
if utilproxy.IsZeroCIDR(address) { if utilproxy.IsZeroCIDR(address) {
@ -1095,14 +1095,14 @@ func (proxier *Proxier) syncProxyRules() {
replacementPortsMap[lp] = proxier.portsMap[lp] replacementPortsMap[lp] = proxier.portsMap[lp]
// We do not start listening on SCTP ports, according to our agreement in the // We do not start listening on SCTP ports, according to our agreement in the
// SCTP support KEP // SCTP support KEP
} else if svcInfo.GetProtocol() != v1.ProtocolSCTP { } else if svcInfo.Protocol() != v1.ProtocolSCTP {
socket, err := proxier.portMapper.OpenLocalPort(&lp) socket, err := proxier.portMapper.OpenLocalPort(&lp)
if err != nil { if err != nil {
klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err) klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
continue continue
} }
if lp.Protocol == "udp" { if lp.Protocol == "udp" {
isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP) isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP())
conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP) conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
} }
replacementPortsMap[lp] = socket replacementPortsMap[lp] = socket
@ -1111,13 +1111,14 @@ func (proxier *Proxier) syncProxyRules() {
// Nodeports need SNAT, unless they're local. // Nodeports need SNAT, unless they're local.
// ipset call // ipset call
var nodePortSet *IPSet var nodePortSet *IPSet
switch protocol { switch protocol {
case "tcp": case "tcp":
nodePortSet = proxier.ipsetList[kubeNodePortSetTCP] nodePortSet = proxier.ipsetList[kubeNodePortSetTCP]
entry = &utilipset.Entry{ entry = &utilipset.Entry{
// No need to provide ip info // No need to provide ip info
Port: svcInfo.NodePort, Port: svcInfo.NodePort(),
Protocol: protocol, Protocol: protocol,
SetType: utilipset.BitmapPort, SetType: utilipset.BitmapPort,
} }
@ -1125,7 +1126,7 @@ func (proxier *Proxier) syncProxyRules() {
nodePortSet = proxier.ipsetList[kubeNodePortSetUDP] nodePortSet = proxier.ipsetList[kubeNodePortSetUDP]
entry = &utilipset.Entry{ entry = &utilipset.Entry{
// No need to provide ip info // No need to provide ip info
Port: svcInfo.NodePort, Port: svcInfo.NodePort(),
Protocol: protocol, Protocol: protocol,
SetType: utilipset.BitmapPort, SetType: utilipset.BitmapPort,
} }
@ -1133,7 +1134,7 @@ func (proxier *Proxier) syncProxyRules() {
nodePortSet = proxier.ipsetList[kubeNodePortSetSCTP] nodePortSet = proxier.ipsetList[kubeNodePortSetSCTP]
entry = &utilipset.Entry{ entry = &utilipset.Entry{
IP: proxier.nodeIP.String(), IP: proxier.nodeIP.String(),
Port: svcInfo.NodePort, Port: svcInfo.NodePort(),
Protocol: protocol, Protocol: protocol,
SetType: utilipset.HashIPPort, SetType: utilipset.HashIPPort,
} }
@ -1150,7 +1151,7 @@ func (proxier *Proxier) syncProxyRules() {
} }
// Add externaltrafficpolicy=local type nodeport entry // Add externaltrafficpolicy=local type nodeport entry
if svcInfo.OnlyNodeLocalEndpoints { if svcInfo.OnlyNodeLocalEndpoints() {
var nodePortLocalSet *IPSet var nodePortLocalSet *IPSet
switch protocol { switch protocol {
case "tcp": case "tcp":
@ -1189,18 +1190,18 @@ func (proxier *Proxier) syncProxyRules() {
// ipvs call // ipvs call
serv := &utilipvs.VirtualServer{ serv := &utilipvs.VirtualServer{
Address: nodeIP, Address: nodeIP,
Port: uint16(svcInfo.NodePort), Port: uint16(svcInfo.NodePort()),
Protocol: string(svcInfo.Protocol), Protocol: string(svcInfo.Protocol()),
Scheduler: proxier.ipvsScheduler, Scheduler: proxier.ipvsScheduler,
} }
if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP { if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
serv.Flags |= utilipvs.FlagPersistent serv.Flags |= utilipvs.FlagPersistent
serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds) serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
} }
// There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`. // There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`.
if err := proxier.syncService(svcNameString, serv, false); err == nil { if err := proxier.syncService(svcNameString, serv, false); err == nil {
activeIPVSServices[serv.String()] = true activeIPVSServices[serv.String()] = true
if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints, serv); err != nil { if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints(), serv); err != nil {
klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err) klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
} }
} else { } else {
@ -1529,9 +1530,9 @@ func (proxier *Proxier) getExistingChains(buffer *bytes.Buffer, table utiliptabl
// This assumes the proxier mutex is held // This assumes the proxier mutex is held
func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) { func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) {
for _, epSvcPair := range connectionMap { for _, epSvcPair := range connectionMap {
if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.GetProtocol() == v1.ProtocolUDP { if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == v1.ProtocolUDP {
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIPString(), endpointIP, v1.ProtocolUDP) err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP)
if err != nil { if err != nil {
klog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err) klog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err)
} }

View File

@ -40,60 +40,85 @@ import (
// or can be used for constructing a more specific ServiceInfo struct // or can be used for constructing a more specific ServiceInfo struct
// defined by the proxier if needed. // defined by the proxier if needed.
type BaseServiceInfo struct { type BaseServiceInfo struct {
ClusterIP net.IP clusterIP net.IP
Port int port int
Protocol v1.Protocol protocol v1.Protocol
NodePort int nodePort int
LoadBalancerStatus v1.LoadBalancerStatus loadBalancerStatus v1.LoadBalancerStatus
SessionAffinityType v1.ServiceAffinity sessionAffinityType v1.ServiceAffinity
StickyMaxAgeSeconds int stickyMaxAgeSeconds int
ExternalIPs []string externalIPs []string
LoadBalancerSourceRanges []string loadBalancerSourceRanges []string
HealthCheckNodePort int healthCheckNodePort int
OnlyNodeLocalEndpoints bool onlyNodeLocalEndpoints bool
} }
var _ ServicePort = &BaseServiceInfo{} var _ ServicePort = &BaseServiceInfo{}
// String is part of ServicePort interface. // String is part of ServicePort interface.
func (info *BaseServiceInfo) String() string { func (info *BaseServiceInfo) String() string {
return fmt.Sprintf("%s:%d/%s", info.ClusterIP, info.Port, info.Protocol) return fmt.Sprintf("%s:%d/%s", info.clusterIP, info.port, info.protocol)
} }
// ClusterIPString is part of ServicePort interface. // ClusterIP is part of ServicePort interface.
func (info *BaseServiceInfo) ClusterIPString() string { func (info *BaseServiceInfo) ClusterIP() net.IP {
return info.ClusterIP.String() return info.clusterIP
} }
// GetProtocol is part of ServicePort interface. // Port is part of ServicePort interface.
func (info *BaseServiceInfo) GetProtocol() v1.Protocol { func (info *BaseServiceInfo) Port() int {
return info.Protocol return info.port
} }
// GetHealthCheckNodePort is part of ServicePort interface. // SessionAffinityType is part of the ServicePort interface.
func (info *BaseServiceInfo) GetHealthCheckNodePort() int { func (info *BaseServiceInfo) SessionAffinityType() v1.ServiceAffinity {
return info.HealthCheckNodePort return info.sessionAffinityType
} }
// GetNodePort is part of the ServicePort interface. // StickyMaxAgeSeconds is part of the ServicePort interface
func (info *BaseServiceInfo) GetNodePort() int { func (info *BaseServiceInfo) StickyMaxAgeSeconds() int {
return info.NodePort return info.stickyMaxAgeSeconds
}
// Protocol is part of ServicePort interface.
func (info *BaseServiceInfo) Protocol() v1.Protocol {
return info.protocol
}
// LoadBalancerSourceRanges is part of ServicePort interface
func (info *BaseServiceInfo) LoadBalancerSourceRanges() []string {
return info.loadBalancerSourceRanges
}
// HealthCheckNodePort is part of ServicePort interface.
func (info *BaseServiceInfo) HealthCheckNodePort() int {
return info.healthCheckNodePort
}
// NodePort is part of the ServicePort interface.
func (info *BaseServiceInfo) NodePort() int {
return info.nodePort
} }
// ExternalIPStrings is part of ServicePort interface. // ExternalIPStrings is part of ServicePort interface.
func (info *BaseServiceInfo) ExternalIPStrings() []string { func (info *BaseServiceInfo) ExternalIPStrings() []string {
return info.ExternalIPs return info.externalIPs
} }
// LoadBalancerIPStrings is part of ServicePort interface. // LoadBalancerIPStrings is part of ServicePort interface.
func (info *BaseServiceInfo) LoadBalancerIPStrings() []string { func (info *BaseServiceInfo) LoadBalancerIPStrings() []string {
var ips []string var ips []string
for _, ing := range info.LoadBalancerStatus.Ingress { for _, ing := range info.loadBalancerStatus.Ingress {
ips = append(ips, ing.IP) ips = append(ips, ing.IP)
} }
return ips return ips
} }
// OnlyNodeLocalEndpoints is part of ServicePort interface.
func (info *BaseServiceInfo) OnlyNodeLocalEndpoints() bool {
return info.onlyNodeLocalEndpoints
}
func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, service *v1.Service) *BaseServiceInfo { func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, service *v1.Service) *BaseServiceInfo {
onlyNodeLocalEndpoints := false onlyNodeLocalEndpoints := false
if apiservice.RequestsOnlyLocalTraffic(service) { if apiservice.RequestsOnlyLocalTraffic(service) {
@ -105,32 +130,32 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic
stickyMaxAgeSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds) stickyMaxAgeSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds)
} }
info := &BaseServiceInfo{ info := &BaseServiceInfo{
ClusterIP: net.ParseIP(service.Spec.ClusterIP), clusterIP: net.ParseIP(service.Spec.ClusterIP),
Port: int(port.Port), port: int(port.Port),
Protocol: port.Protocol, protocol: port.Protocol,
NodePort: int(port.NodePort), nodePort: int(port.NodePort),
// Deep-copy in case the service instance changes // Deep-copy in case the service instance changes
LoadBalancerStatus: *service.Status.LoadBalancer.DeepCopy(), loadBalancerStatus: *service.Status.LoadBalancer.DeepCopy(),
SessionAffinityType: service.Spec.SessionAffinity, sessionAffinityType: service.Spec.SessionAffinity,
StickyMaxAgeSeconds: stickyMaxAgeSeconds, stickyMaxAgeSeconds: stickyMaxAgeSeconds,
OnlyNodeLocalEndpoints: onlyNodeLocalEndpoints, onlyNodeLocalEndpoints: onlyNodeLocalEndpoints,
} }
if sct.isIPv6Mode == nil { if sct.isIPv6Mode == nil {
info.ExternalIPs = make([]string, len(service.Spec.ExternalIPs)) info.externalIPs = make([]string, len(service.Spec.ExternalIPs))
info.LoadBalancerSourceRanges = make([]string, len(service.Spec.LoadBalancerSourceRanges)) info.loadBalancerSourceRanges = make([]string, len(service.Spec.LoadBalancerSourceRanges))
copy(info.LoadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges) copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges)
copy(info.ExternalIPs, service.Spec.ExternalIPs) copy(info.externalIPs, service.Spec.ExternalIPs)
} else { } else {
// Filter out the incorrect IP version case. // Filter out the incorrect IP version case.
// If ExternalIPs and LoadBalancerSourceRanges on service contains incorrect IP versions, // If ExternalIPs and LoadBalancerSourceRanges on service contains incorrect IP versions,
// only filter out the incorrect ones. // only filter out the incorrect ones.
var incorrectIPs []string var incorrectIPs []string
info.ExternalIPs, incorrectIPs = utilproxy.FilterIncorrectIPVersion(service.Spec.ExternalIPs, *sct.isIPv6Mode) info.externalIPs, incorrectIPs = utilproxy.FilterIncorrectIPVersion(service.Spec.ExternalIPs, *sct.isIPv6Mode)
if len(incorrectIPs) > 0 { if len(incorrectIPs) > 0 {
utilproxy.LogAndEmitIncorrectIPVersionEvent(sct.recorder, "externalIPs", strings.Join(incorrectIPs, ","), service.Namespace, service.Name, service.UID) utilproxy.LogAndEmitIncorrectIPVersionEvent(sct.recorder, "externalIPs", strings.Join(incorrectIPs, ","), service.Namespace, service.Name, service.UID)
} }
info.LoadBalancerSourceRanges, incorrectIPs = utilproxy.FilterIncorrectCIDRVersion(service.Spec.LoadBalancerSourceRanges, *sct.isIPv6Mode) info.loadBalancerSourceRanges, incorrectIPs = utilproxy.FilterIncorrectCIDRVersion(service.Spec.LoadBalancerSourceRanges, *sct.isIPv6Mode)
if len(incorrectIPs) > 0 { if len(incorrectIPs) > 0 {
utilproxy.LogAndEmitIncorrectIPVersionEvent(sct.recorder, "loadBalancerSourceRanges", strings.Join(incorrectIPs, ","), service.Namespace, service.Name, service.UID) utilproxy.LogAndEmitIncorrectIPVersionEvent(sct.recorder, "loadBalancerSourceRanges", strings.Join(incorrectIPs, ","), service.Namespace, service.Name, service.UID)
} }
@ -141,7 +166,7 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic
if p == 0 { if p == 0 {
klog.Errorf("Service %s/%s has no healthcheck nodeport", service.Namespace, service.Name) klog.Errorf("Service %s/%s has no healthcheck nodeport", service.Namespace, service.Name)
} else { } else {
info.HealthCheckNodePort = int(p) info.healthCheckNodePort = int(p)
} }
} }
@ -239,8 +264,8 @@ func UpdateServiceMap(serviceMap ServiceMap, changes *ServiceChangeTracker) (res
// computing this incrementally similarly to serviceMap. // computing this incrementally similarly to serviceMap.
result.HCServiceNodePorts = make(map[types.NamespacedName]uint16) result.HCServiceNodePorts = make(map[types.NamespacedName]uint16)
for svcPortName, info := range serviceMap { for svcPortName, info := range serviceMap {
if info.GetHealthCheckNodePort() != 0 { if info.HealthCheckNodePort() != 0 {
result.HCServiceNodePorts[svcPortName.NamespacedName] = uint16(info.GetHealthCheckNodePort()) result.HCServiceNodePorts[svcPortName.NamespacedName] = uint16(info.HealthCheckNodePort())
} }
} }
@ -355,8 +380,8 @@ func (sm *ServiceMap) unmerge(other ServiceMap, UDPStaleClusterIP sets.String) {
info, exists := (*sm)[svcPortName] info, exists := (*sm)[svcPortName]
if exists { if exists {
klog.V(1).Infof("Removing service port %q", svcPortName) klog.V(1).Infof("Removing service port %q", svcPortName)
if info.GetProtocol() == v1.ProtocolUDP { if info.Protocol() == v1.ProtocolUDP {
UDPStaleClusterIP.Insert(info.ClusterIPString()) UDPStaleClusterIP.Insert(info.ClusterIP().String())
} }
delete(*sm, svcPortName) delete(*sm, svcPortName)
} else { } else {

View File

@ -33,12 +33,12 @@ const testHostname = "test-hostname"
func makeTestServiceInfo(clusterIP string, port int, protocol string, healthcheckNodePort int, svcInfoFuncs ...func(*BaseServiceInfo)) *BaseServiceInfo { func makeTestServiceInfo(clusterIP string, port int, protocol string, healthcheckNodePort int, svcInfoFuncs ...func(*BaseServiceInfo)) *BaseServiceInfo {
info := &BaseServiceInfo{ info := &BaseServiceInfo{
ClusterIP: net.ParseIP(clusterIP), clusterIP: net.ParseIP(clusterIP),
Port: port, port: port,
Protocol: v1.Protocol(protocol), protocol: v1.Protocol(protocol),
} }
if healthcheckNodePort != 0 { if healthcheckNodePort != 0 {
info.HealthCheckNodePort = healthcheckNodePort info.healthCheckNodePort = healthcheckNodePort
} }
for _, svcInfoFunc := range svcInfoFuncs { for _, svcInfoFunc := range svcInfoFuncs {
svcInfoFunc(info) svcInfoFunc(info)
@ -269,8 +269,8 @@ func TestServiceToServiceMap(t *testing.T) {
}, },
expected: map[ServicePortName]*BaseServiceInfo{ expected: map[ServicePortName]*BaseServiceInfo{
makeServicePortName("test", "validIPv4", "testPort"): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(info *BaseServiceInfo) { makeServicePortName("test", "validIPv4", "testPort"): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(info *BaseServiceInfo) {
info.ExternalIPs = []string{testExternalIPv4} info.externalIPs = []string{testExternalIPv4}
info.LoadBalancerSourceRanges = []string{testSourceRangeIPv4} info.loadBalancerSourceRanges = []string{testSourceRangeIPv4}
}), }),
}, },
isIPv6Mode: &falseVal, isIPv6Mode: &falseVal,
@ -297,8 +297,8 @@ func TestServiceToServiceMap(t *testing.T) {
}, },
expected: map[ServicePortName]*BaseServiceInfo{ expected: map[ServicePortName]*BaseServiceInfo{
makeServicePortName("test", "validIPv6", "testPort"): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(info *BaseServiceInfo) { makeServicePortName("test", "validIPv6", "testPort"): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(info *BaseServiceInfo) {
info.ExternalIPs = []string{testExternalIPv6} info.externalIPs = []string{testExternalIPv6}
info.LoadBalancerSourceRanges = []string{testSourceRangeIPv6} info.loadBalancerSourceRanges = []string{testSourceRangeIPv6}
}), }),
}, },
isIPv6Mode: &trueVal, isIPv6Mode: &trueVal,
@ -325,8 +325,8 @@ func TestServiceToServiceMap(t *testing.T) {
}, },
expected: map[ServicePortName]*BaseServiceInfo{ expected: map[ServicePortName]*BaseServiceInfo{
makeServicePortName("test", "filterIPv6InIPV4Mode", "testPort"): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(info *BaseServiceInfo) { makeServicePortName("test", "filterIPv6InIPV4Mode", "testPort"): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(info *BaseServiceInfo) {
info.ExternalIPs = []string{testExternalIPv4} info.externalIPs = []string{testExternalIPv4}
info.LoadBalancerSourceRanges = []string{testSourceRangeIPv4} info.loadBalancerSourceRanges = []string{testSourceRangeIPv4}
}), }),
}, },
isIPv6Mode: &falseVal, isIPv6Mode: &falseVal,
@ -353,8 +353,8 @@ func TestServiceToServiceMap(t *testing.T) {
}, },
expected: map[ServicePortName]*BaseServiceInfo{ expected: map[ServicePortName]*BaseServiceInfo{
makeServicePortName("test", "filterIPv4InIPV6Mode", "testPort"): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(info *BaseServiceInfo) { makeServicePortName("test", "filterIPv4InIPV6Mode", "testPort"): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(info *BaseServiceInfo) {
info.ExternalIPs = []string{testExternalIPv6} info.externalIPs = []string{testExternalIPv6}
info.LoadBalancerSourceRanges = []string{testSourceRangeIPv6} info.loadBalancerSourceRanges = []string{testSourceRangeIPv6}
}), }),
}, },
isIPv6Mode: &trueVal, isIPv6Mode: &trueVal,
@ -371,12 +371,12 @@ func TestServiceToServiceMap(t *testing.T) {
} }
for svcKey, expectedInfo := range tc.expected { for svcKey, expectedInfo := range tc.expected {
svcInfo := newServices[svcKey].(*BaseServiceInfo) svcInfo := newServices[svcKey].(*BaseServiceInfo)
if !svcInfo.ClusterIP.Equal(expectedInfo.ClusterIP) || if !svcInfo.clusterIP.Equal(expectedInfo.clusterIP) ||
svcInfo.Port != expectedInfo.Port || svcInfo.port != expectedInfo.port ||
svcInfo.Protocol != expectedInfo.Protocol || svcInfo.protocol != expectedInfo.protocol ||
svcInfo.HealthCheckNodePort != expectedInfo.HealthCheckNodePort || svcInfo.healthCheckNodePort != expectedInfo.healthCheckNodePort ||
!sets.NewString(svcInfo.ExternalIPs...).Equal(sets.NewString(expectedInfo.ExternalIPs...)) || !sets.NewString(svcInfo.externalIPs...).Equal(sets.NewString(expectedInfo.externalIPs...)) ||
!sets.NewString(svcInfo.LoadBalancerSourceRanges...).Equal(sets.NewString(expectedInfo.LoadBalancerSourceRanges...)) { !sets.NewString(svcInfo.loadBalancerSourceRanges...).Equal(sets.NewString(expectedInfo.loadBalancerSourceRanges...)) {
t.Errorf("[%s] expected new[%v]to be %v, got %v", tc.desc, svcKey, expectedInfo, *svcInfo) t.Errorf("[%s] expected new[%v]to be %v, got %v", tc.desc, svcKey, expectedInfo, *svcInfo)
} }
} }

View File

@ -18,6 +18,7 @@ package proxy
import ( import (
"fmt" "fmt"
"net"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
@ -52,18 +53,28 @@ func (spn ServicePortName) String() string {
type ServicePort interface { type ServicePort interface {
// String returns service string. An example format can be: `IP:Port/Protocol`. // String returns service string. An example format can be: `IP:Port/Protocol`.
String() string String() string
// ClusterIPString returns service cluster IP in string format. // GetClusterIP returns service cluster IP in net.IP format.
ClusterIPString() string ClusterIP() net.IP
// GetPort returns service port if present. If return 0 means not present.
Port() int
// GetSessionAffinityType returns service session affinity type
SessionAffinityType() v1.ServiceAffinity
// GetStickyMaxAgeSeconds returns service max connection age
StickyMaxAgeSeconds() int
// ExternalIPStrings returns service ExternalIPs as a string array. // ExternalIPStrings returns service ExternalIPs as a string array.
ExternalIPStrings() []string ExternalIPStrings() []string
// LoadBalancerIPStrings returns service LoadBalancerIPs as a string array. // LoadBalancerIPStrings returns service LoadBalancerIPs as a string array.
LoadBalancerIPStrings() []string LoadBalancerIPStrings() []string
// GetProtocol returns service protocol. // GetProtocol returns service protocol.
GetProtocol() v1.Protocol Protocol() v1.Protocol
// LoadBalancerSourceRanges returns service LoadBalancerSourceRanges if present empty array if not
LoadBalancerSourceRanges() []string
// GetHealthCheckNodePort returns service health check node port if present. If return 0, it means not present. // GetHealthCheckNodePort returns service health check node port if present. If return 0, it means not present.
GetHealthCheckNodePort() int HealthCheckNodePort() int
// GetNodePort returns a service Node port if present. If return 0, it means not present. // GetNodePort returns a service Node port if present. If return 0, it means not present.
GetNodePort() int NodePort() int
// GetOnlyNodeLocalEndpoints returns if a service has only node local endpoints
OnlyNodeLocalEndpoints() bool
} }
// Endpoint in an interface which abstracts information about an endpoint. // Endpoint in an interface which abstracts information about an endpoint.