diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index f8a8ea7d803..fd1ab56cb5c 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -744,42 +744,6 @@ func isServiceChainName(chainString string) bool { return false } -// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we -// risk sending more traffic to it, all of which will be lost (because UDP). -// This assumes the proxier mutex is held -// TODO: move it to util -func (proxier *Proxier) deleteUDPEndpointConnections(deletedUDPEndpoints []proxy.ServiceEndpoint) { - for _, epSvcPair := range deletedUDPEndpoints { - if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok { - endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) - nodePort := svcInfo.NodePort() - var err error - if nodePort != 0 { - err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, v1.ProtocolUDP) - if err != nil { - klog.ErrorS(err, "Failed to delete nodeport-related endpoint connections", "servicePortName", epSvcPair.ServicePortName) - } - } - err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP) - if err != nil { - klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName) - } - for _, extIP := range svcInfo.ExternalIPStrings() { - err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, v1.ProtocolUDP) - if err != nil { - klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "externalIP", extIP) - } - } - for _, lbIP := range svcInfo.LoadBalancerIPStrings() { - err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, v1.ProtocolUDP) - if err != nil { - klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "loadBalancerIP", lbIP) - } - } - } - } -} - // Assumes proxier.mu is held. func (proxier *Proxier) appendServiceCommentLocked(args []string, svcName string) []string { // Not printing these comments, can reduce size of iptables (in case of large @@ -831,29 +795,6 @@ func (proxier *Proxier) syncProxyRules() { serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges) endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges) - // We need to detect stale connections to UDP Services so we - // can clean dangling conntrack entries that can blackhole traffic. - conntrackCleanupServiceIPs := serviceUpdateResult.DeletedUDPClusterIPs - conntrackCleanupServiceNodePorts := sets.NewInt() - // merge stale services gathered from updateEndpointsMap - // an UDP service that changes from 0 to non-0 endpoints is considered stale. - for _, svcPortName := range endpointUpdateResult.NewlyActiveUDPServices { - if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok { - klog.V(4).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName) - conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String()) - for _, extIP := range svcInfo.ExternalIPStrings() { - conntrackCleanupServiceIPs.Insert(extIP) - } - for _, lbIP := range svcInfo.LoadBalancerIPStrings() { - conntrackCleanupServiceIPs.Insert(lbIP) - } - nodePort := svcInfo.NodePort() - if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 { - conntrackCleanupServiceNodePorts.Insert(nodePort) - } - } - } - klog.V(2).InfoS("Syncing iptables rules") success := false @@ -1629,24 +1570,8 @@ func (proxier *Proxier) syncProxyRules() { klog.ErrorS(err, "Error syncing healthcheck endpoints") } - // Finish housekeeping. - // Clear stale conntrack entries for UDP Services, this has to be done AFTER the iptables rules are programmed. - // TODO: these could be made more consistent. - klog.V(4).InfoS("Deleting conntrack stale entries for services", "IPs", conntrackCleanupServiceIPs.UnsortedList()) - for _, svcIP := range conntrackCleanupServiceIPs.UnsortedList() { - if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil { - klog.ErrorS(err, "Failed to delete stale service connections", "IP", svcIP) - } - } - klog.V(4).InfoS("Deleting conntrack stale entries for services", "nodePorts", conntrackCleanupServiceNodePorts.UnsortedList()) - for _, nodePort := range conntrackCleanupServiceNodePorts.UnsortedList() { - err := conntrack.ClearEntriesForPort(proxier.exec, nodePort, isIPv6, v1.ProtocolUDP) - if err != nil { - klog.ErrorS(err, "Failed to clear udp conntrack", "nodePort", nodePort) - } - } - klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.DeletedUDPEndpoints) - proxier.deleteUDPEndpointConnections(endpointUpdateResult.DeletedUDPEndpoints) + // Finish housekeeping, clear stale conntrack entries for UDP Services + conntrack.CleanStaleEntries(proxier.iptables.IsIPv6(), proxier.exec, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult) } func (proxier *Proxier) writeServiceToEndpointRules(svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) { diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index e348f41cd07..a3f70752e67 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -946,29 +946,6 @@ func (proxier *Proxier) syncProxyRules() { serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges) endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges) - // We need to detect stale connections to UDP Services so we - // can clean dangling conntrack entries that can blackhole traffic. - conntrackCleanupServiceIPs := serviceUpdateResult.DeletedUDPClusterIPs - conntrackCleanupServiceNodePorts := sets.NewInt() - // merge stale services gathered from updateEndpointsMap - // an UDP service that changes from 0 to non-0 endpoints is considered stale. - for _, svcPortName := range endpointUpdateResult.NewlyActiveUDPServices { - if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok { - klog.V(4).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName) - conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String()) - for _, extIP := range svcInfo.ExternalIPStrings() { - conntrackCleanupServiceIPs.Insert(extIP) - } - for _, lbIP := range svcInfo.LoadBalancerIPStrings() { - conntrackCleanupServiceIPs.Insert(lbIP) - } - nodePort := svcInfo.NodePort() - if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 { - conntrackCleanupServiceNodePorts.Insert(nodePort) - } - } - } - klog.V(3).InfoS("Syncing ipvs proxier rules") proxier.serviceNoLocalEndpointsInternal = sets.New[string]() @@ -1528,24 +1505,8 @@ func (proxier *Proxier) syncProxyRules() { metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(proxier.serviceNoLocalEndpointsInternal.Len())) metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(proxier.serviceNoLocalEndpointsExternal.Len())) - // Finish housekeeping. - // Clear stale conntrack entries for UDP Services, this has to be done AFTER the ipvs rules are programmed. - // TODO: these could be made more consistent. - klog.V(4).InfoS("Deleting conntrack stale entries for services", "IPs", conntrackCleanupServiceIPs.UnsortedList()) - for _, svcIP := range conntrackCleanupServiceIPs.UnsortedList() { - if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil { - klog.ErrorS(err, "Failed to delete stale service connections", "IP", svcIP) - } - } - klog.V(4).InfoS("Deleting conntrack stale entries for services", "nodePorts", conntrackCleanupServiceNodePorts.UnsortedList()) - for _, nodePort := range conntrackCleanupServiceNodePorts.UnsortedList() { - err := conntrack.ClearEntriesForPort(proxier.exec, nodePort, proxier.ipFamily == v1.IPv6Protocol, v1.ProtocolUDP) - if err != nil { - klog.ErrorS(err, "Failed to clear udp conntrack", "nodePort", nodePort) - } - } - klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.DeletedUDPEndpoints) - proxier.deleteUDPEndpointConnections(endpointUpdateResult.DeletedUDPEndpoints) + // Finish housekeeping, clear stale conntrack entries for UDP Services + conntrack.CleanStaleEntries(proxier.ipFamily == v1.IPv6Protocol, proxier.exec, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult) } // writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed @@ -1817,42 +1778,6 @@ func (proxier *Proxier) createAndLinkKubeChain() { } -// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we -// risk sending more traffic to it, all of which will be lost (because UDP). -// This assumes the proxier mutex is held -// TODO: move it to util -func (proxier *Proxier) deleteUDPEndpointConnections(deletedUDPEndpoints []proxy.ServiceEndpoint) { - for _, epSvcPair := range deletedUDPEndpoints { - if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok { - endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) - nodePort := svcInfo.NodePort() - var err error - if nodePort != 0 { - err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, v1.ProtocolUDP) - if err != nil { - klog.ErrorS(err, "Failed to delete nodeport-related endpoint connections", "servicePortName", epSvcPair.ServicePortName) - } - } - err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP) - if err != nil { - klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName) - } - for _, extIP := range svcInfo.ExternalIPStrings() { - err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, v1.ProtocolUDP) - if err != nil { - klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "externalIP", extIP) - } - } - for _, lbIP := range svcInfo.LoadBalancerIPStrings() { - err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, v1.ProtocolUDP) - if err != nil { - klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "loadBalancerIP", lbIP) - } - } - } - } -} - func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool, alreadyBoundAddrs sets.Set[string]) error { appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs) if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) {