diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index e00144de210..7d2c17141d9 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -744,35 +744,34 @@ func isServiceChainName(chainString string) bool { return false } -// After a UDP or SCTP endpoint has been removed, we must flush any pending conntrack entries to it, or else we -// risk sending more traffic to it, all of which will be lost. +// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we +// risk sending more traffic to it, all of which will be lost (because UDP). // This assumes the proxier mutex is held // TODO: move it to util func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) { for _, epSvcPair := range connectionMap { - if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { + if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == v1.ProtocolUDP { endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) nodePort := svcInfo.NodePort() - svcProto := svcInfo.Protocol() var err error if nodePort != 0 { - err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, svcProto) + err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to delete nodeport-related endpoint connections", "servicePortName", epSvcPair.ServicePortName) } } - err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, svcProto) + err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName) } for _, extIP := range svcInfo.ExternalIPStrings() { - err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, svcProto) + err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "externalIP", extIP) } } for _, lbIP := range svcInfo.LoadBalancerIPStrings() { - err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, svcProto) + err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "loadBalancerIP", lbIP) } @@ -839,8 +838,8 @@ func (proxier *Proxier) syncProxyRules() { // merge stale services gathered from updateEndpointsMap // an UDP service that changes from 0 to non-0 endpoints is considered stale. for _, svcPortName := range endpointUpdateResult.StaleServiceNames { - if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { - klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP()) + if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP { + klog.V(2).InfoS("Stale UDP service", "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP()) conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String()) for _, extIP := range svcInfo.ExternalIPStrings() { conntrackCleanupServiceIPs.Insert(extIP) diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index c6f64716063..a9d13b21ba9 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -131,7 +131,7 @@ func TestDeleteEndpointConnectionsIPv4(t *testing.T) { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) } for _, tc := range testCases { - if conntrack.IsClearConntrackNeeded(tc.protocol) { + if tc.protocol == UDP { var cmdOutput string var simErr error if tc.simulatedErr == "" { @@ -186,7 +186,7 @@ func TestDeleteEndpointConnectionsIPv4(t *testing.T) { // For UDP and SCTP connections, check the executed conntrack command var expExecs int - if conntrack.IsClearConntrackNeeded(tc.protocol) { + if tc.protocol == UDP { isIPv6 := func(ip string) bool { netIP := netutils.ParseIPSloppy(ip) return netIP.To4() == nil @@ -265,7 +265,7 @@ func TestDeleteEndpointConnectionsIPv6(t *testing.T) { } // Create a fake executor for the conntrack utility. This should only be - // invoked for UDP and SCTP connections, since no conntrack cleanup is needed for TCP + // invoked for UDP connections, since no conntrack cleanup is needed for TCP fcmd := fakeexec.FakeCmd{} fexec := &fakeexec.FakeExec{ LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, @@ -274,7 +274,7 @@ func TestDeleteEndpointConnectionsIPv6(t *testing.T) { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) } for _, tc := range testCases { - if conntrack.IsClearConntrackNeeded(tc.protocol) { + if tc.protocol == UDP { var cmdOutput string var simErr error if tc.simulatedErr == "" { @@ -327,15 +327,15 @@ func TestDeleteEndpointConnectionsIPv6(t *testing.T) { fp.deleteEndpointConnections(input) - // For UDP and SCTP connections, check the executed conntrack command + // For UDP connections, check the executed conntrack command var expExecs int - if conntrack.IsClearConntrackNeeded(tc.protocol) { + if tc.protocol == UDP { isIPv6 := func(ip string) bool { netIP := netutils.ParseIPSloppy(ip) return netIP.To4() == nil } endpointIP := utilproxy.IPPart(tc.endpoint) - expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p %s", tc.svcIP, endpointIP, strings.ToLower(string((tc.protocol)))) + expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.svcIP, endpointIP) if isIPv6(endpointIP) { expectCommand += " -f ipv6" } diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index d00801beb4b..e60d56bf0ba 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -953,8 +953,8 @@ func (proxier *Proxier) syncProxyRules() { // merge stale services gathered from updateEndpointsMap // an UDP service that changes from 0 to non-0 endpoints is considered stale. for _, svcPortName := range endpointUpdateResult.StaleServiceNames { - if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { - klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP()) + if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP { + klog.V(2).InfoS("Stale UDP service", "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP()) conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String()) for _, extIP := range svcInfo.ExternalIPStrings() { conntrackCleanupServiceIPs.Insert(extIP) @@ -1813,35 +1813,34 @@ func (proxier *Proxier) createAndLinkKubeChain() { } -// After a UDP or SCTP endpoint has been removed, we must flush any pending conntrack entries to it, or else we -// risk sending more traffic to it, all of which will be lost. +// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we +// risk sending more traffic to it, all of which will be lost (because UDP). // This assumes the proxier mutex is held // TODO: move it to util func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) { for _, epSvcPair := range connectionMap { - if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { + if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == v1.ProtocolUDP { endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) nodePort := svcInfo.NodePort() - svcProto := svcInfo.Protocol() var err error if nodePort != 0 { - err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, svcProto) + err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to delete nodeport-related endpoint connections", "servicePortName", epSvcPair.ServicePortName) } } - err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, svcProto) + err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName) } for _, extIP := range svcInfo.ExternalIPStrings() { - err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, svcProto) + err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "externalIP", extIP) } } for _, lbIP := range svcInfo.LoadBalancerIPStrings() { - err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, svcProto) + err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "loadBalancerIP", lbIP) } diff --git a/pkg/util/conntrack/conntrack.go b/pkg/util/conntrack/conntrack.go index 3a140c83df2..4b39f61ac79 100644 --- a/pkg/util/conntrack/conntrack.go +++ b/pkg/util/conntrack/conntrack.go @@ -100,7 +100,7 @@ func ClearEntriesForNAT(execer exec.Interface, origin, dest string, protocol v1. // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it // is expensive to baby sit all udp connections to kubernetes services. - return fmt.Errorf("error deleting conntrack entries for %s peer {%s, %s}, error: %v", protoStr(protocol), origin, dest, err) + return fmt.Errorf("error deleting conntrack entries for UDP peer {%s, %s}, error: %v", origin, dest, err) } return nil } @@ -116,12 +116,7 @@ func ClearEntriesForPortNAT(execer exec.Interface, dest string, port int, protoc parameters := parametersWithFamily(utilnet.IsIPv6String(dest), "-D", "-p", protoStr(protocol), "--dport", strconv.Itoa(port), "--dst-nat", dest) err := Exec(execer, parameters...) if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) { - return fmt.Errorf("error deleting conntrack entries for %s port: %d, error: %v", protoStr(protocol), port, err) + return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err) } return nil } - -// IsClearConntrackNeeded returns true if protocol requires conntrack cleanup for the stale connections -func IsClearConntrackNeeded(proto v1.Protocol) bool { - return proto == v1.ProtocolUDP || proto == v1.ProtocolSCTP -} diff --git a/pkg/util/conntrack/conntrack_test.go b/pkg/util/conntrack/conntrack_test.go index d7a395cde9b..18a7aab4e7b 100644 --- a/pkg/util/conntrack/conntrack_test.go +++ b/pkg/util/conntrack/conntrack_test.go @@ -177,7 +177,7 @@ func TestClearUDPConntrackForPort(t *testing.T) { } } -func TestDeleteConnections(t *testing.T) { +func TestDeleteUDPConnections(t *testing.T) { fcmd := fakeexec.FakeCmd{ CombinedOutputScript: []fakeexec.FakeAction{ func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil }, @@ -185,11 +185,6 @@ func TestDeleteConnections(t *testing.T) { return []byte(""), nil, fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted") }, func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil }, - func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil }, - func() ([]byte, []byte, error) { - return []byte(""), nil, fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted") - }, - func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil }, }, } fexec := &fakeexec.FakeExec{ @@ -197,9 +192,6 @@ func TestDeleteConnections(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, } @@ -208,52 +200,30 @@ func TestDeleteConnections(t *testing.T) { name string origin string dest string - proto v1.Protocol }{ { - name: "UDP IPv4 success", + name: "IPv4 success", origin: "1.2.3.4", dest: "10.20.30.40", - proto: v1.ProtocolUDP, }, { - name: "UDP IPv4 simulated failure", + name: "IPv4 simulated failure", origin: "2.3.4.5", dest: "20.30.40.50", - proto: v1.ProtocolUDP, }, { - name: "UDP IPv6 success", + name: "IPv6 success", origin: "fd00::600d:f00d", dest: "2001:db8::5", - proto: v1.ProtocolUDP, - }, - { - name: "SCTP IPv4 success", - origin: "1.2.3.5", - dest: "10.20.30.50", - proto: v1.ProtocolSCTP, - }, - { - name: "SCTP IPv4 simulated failure", - origin: "2.3.4.6", - dest: "20.30.40.60", - proto: v1.ProtocolSCTP, - }, - { - name: "SCTP IPv6 success", - origin: "fd00::600d:f00d", - dest: "2001:db8::6", - proto: v1.ProtocolSCTP, }, } svcCount := 0 for i, tc := range testCases { - err := ClearEntriesForNAT(fexec, tc.origin, tc.dest, tc.proto) + err := ClearEntriesForNAT(fexec, tc.origin, tc.dest, v1.ProtocolUDP) if err != nil { t.Errorf("%s test case: unexpected error: %v", tc.name, err) } - expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p %s", tc.origin, tc.dest, protoStr(tc.proto)) + familyParamStr(utilnet.IsIPv6String(tc.origin)) + expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.origin, tc.dest) + familyParamStr(utilnet.IsIPv6String(tc.origin)) execCommand := strings.Join(fcmd.CombinedOutputLog[i], " ") if expectCommand != execCommand { t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand) @@ -265,46 +235,36 @@ func TestDeleteConnections(t *testing.T) { } } -func TestClearConntrackForPortNAT(t *testing.T) { +func TestClearUDPConntrackForPortNAT(t *testing.T) { fcmd := fakeexec.FakeCmd{ CombinedOutputScript: []fakeexec.FakeAction{ func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil }, - func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil }, }, } fexec := &fakeexec.FakeExec{ CommandScript: []fakeexec.FakeCommandAction{ func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, } testCases := []struct { - name string - port int - dest string - proto v1.Protocol + name string + port int + dest string }{ { - name: "UDP IPv4 success", - port: 30211, - dest: "1.2.3.4", - proto: v1.ProtocolUDP, - }, - { - name: "SCTP IPv4 success", - port: 30215, - dest: "1.2.3.5", - proto: v1.ProtocolSCTP, + name: "IPv4 success", + port: 30211, + dest: "1.2.3.4", }, } svcCount := 0 for i, tc := range testCases { - err := ClearEntriesForPortNAT(fexec, tc.dest, tc.port, tc.proto) + err := ClearEntriesForPortNAT(fexec, tc.dest, tc.port, v1.ProtocolUDP) if err != nil { t.Errorf("%s test case: unexpected error: %v", tc.name, err) } - expectCommand := fmt.Sprintf("conntrack -D -p %s --dport %d --dst-nat %s", protoStr(tc.proto), tc.port, tc.dest) + familyParamStr(utilnet.IsIPv6String(tc.dest)) + expectCommand := fmt.Sprintf("conntrack -D -p udp --dport %d --dst-nat %s", tc.port, tc.dest) + familyParamStr(utilnet.IsIPv6String(tc.dest)) execCommand := strings.Join(fcmd.CombinedOutputLog[i], " ") if expectCommand != execCommand { t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand)