proxy/iptables: Abstract out shared OpenLocalPort code

Also, in the NodePort code, fix it to properly take advantage of the
fact that GetNodeAddresses() guarantees that if it returns a
"match-all" CIDR, then it doesn't return anything else. That also
makes it unnecessary to loop over the node addresses twice.
This commit is contained in:
Dan Winship 2021-10-28 11:41:18 -04:00
parent 9cd0552ddd
commit 4c64008181

View File

@ -1151,27 +1151,7 @@ func (proxier *Proxier) syncProxyRules() {
Port: svcInfo.Port(),
Protocol: netutils.Protocol(svcInfo.Protocol()),
}
if proxier.portsMap[lp] != nil {
klog.V(4).InfoS("Port was open before and is still needed", "port", lp)
replacementPortsMap[lp] = proxier.portsMap[lp]
} else {
socket, err := proxier.portMapper.OpenLocalPort(&lp)
if err != nil {
msg := fmt.Sprintf("can't open port %s, skipping it", lp.String())
proxier.recorder.Eventf(
&v1.ObjectReference{
Kind: "Node",
Name: proxier.hostname,
UID: types.UID(proxier.hostname),
Namespace: "",
}, nil, v1.EventTypeWarning, err.Error(), "SyncProxyRules", msg)
klog.ErrorS(err, "Can't open port, skipping it", "port", lp)
continue
}
klog.V(2).InfoS("Opened local port", "port", lp)
replacementPortsMap[lp] = socket
}
proxier.openPort(lp, replacementPortsMap)
}
if hasEndpoints {
@ -1308,8 +1288,12 @@ func (proxier *Proxier) syncProxyRules() {
if svcInfo.NodePort() != 0 && len(nodeAddresses) != 0 {
// Hold the local port open so no other process can open it
// (because the socket might open but it would never work).
lps := make([]netutils.LocalPort, 0)
// nodeAddresses only contains the addresses for this proxier's IP family.
for address := range nodeAddresses {
if utilproxy.IsZeroCIDR(address) {
address = ""
}
lp := netutils.LocalPort{
Description: "nodePort for " + svcNameString,
IP: address,
@ -1317,39 +1301,7 @@ func (proxier *Proxier) syncProxyRules() {
Port: svcInfo.NodePort(),
Protocol: netutils.Protocol(svcInfo.Protocol()),
}
if utilproxy.IsZeroCIDR(address) {
// Empty IP address means all
lp.IP = ""
lps = append(lps, lp)
// If we encounter a zero CIDR, then there is no point in processing the rest of the addresses.
break
}
lps = append(lps, lp)
}
// For ports on node IPs, open the actual port and hold it.
for _, lp := range lps {
if proxier.portsMap[lp] != nil {
klog.V(4).InfoS("Port was open before and is still needed", "port", lp)
replacementPortsMap[lp] = proxier.portsMap[lp]
} else if svcInfo.Protocol() != v1.ProtocolSCTP {
socket, err := proxier.portMapper.OpenLocalPort(&lp)
if err != nil {
msg := fmt.Sprintf("can't open port %s, skipping it", lp.String())
proxier.recorder.Eventf(
&v1.ObjectReference{
Kind: "Node",
Name: proxier.hostname,
UID: types.UID(proxier.hostname),
Namespace: "",
}, nil, v1.EventTypeWarning, err.Error(), "SyncProxyRules", msg)
klog.ErrorS(err, "Can't open port, skipping it", "port", lp)
continue
}
klog.V(2).InfoS("Opened local port", "port", lp)
replacementPortsMap[lp] = socket
}
proxier.openPort(lp, replacementPortsMap)
}
if hasEndpoints {
@ -1688,3 +1640,33 @@ func (proxier *Proxier) syncProxyRules() {
klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.StaleEndpoints)
proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
}
func (proxier *Proxier) openPort(lp netutils.LocalPort, replacementPortsMap map[netutils.LocalPort]netutils.Closeable) {
// We don't open ports for SCTP services
if lp.Protocol == netutils.Protocol(v1.ProtocolSCTP) {
return
}
if proxier.portsMap[lp] != nil {
klog.V(4).InfoS("Port was open before and is still needed", "port", lp)
replacementPortsMap[lp] = proxier.portsMap[lp]
return
}
socket, err := proxier.portMapper.OpenLocalPort(&lp)
if err != nil {
msg := fmt.Sprintf("can't open port %s, skipping it", lp.String())
proxier.recorder.Eventf(
&v1.ObjectReference{
Kind: "Node",
Name: proxier.hostname,
UID: types.UID(proxier.hostname),
Namespace: "",
}, nil, v1.EventTypeWarning, err.Error(), "SyncProxyRules", msg)
klog.ErrorS(err, "can't open port, skipping it", "port", lp)
return
}
klog.V(2).InfoS("Opened local port", "port", lp)
replacementPortsMap[lp] = socket
}