From 4381973a4404a8a25c3076657349721bcb21d015 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Mon, 23 Jan 2023 15:55:03 -0500 Subject: [PATCH] Revert (most of) "Issue 70020; Flush Conntrack entities for SCTP" This commit did not actually work; in between when it was first written and tested, and when it merged, the code in pkg/proxy/endpoints.go was changed to only add UDP endpoints to the "stale endpoints"/"stale services" lists, and so checking for "either UDP or SCTP" rather than just UDP when processing those lists had no effect. This reverts most of commit aa8521df666c0cc8f65da7150a1961cf3277fc44 (but leaves the changes related to ipvs.IsRsGracefulTerminationNeeded() since that actually did have the effect it meant to have). --- pkg/proxy/iptables/proxier.go | 19 ++++---- pkg/proxy/iptables/proxier_test.go | 14 +++--- pkg/proxy/ipvs/proxier.go | 19 ++++---- pkg/util/conntrack/conntrack.go | 9 +--- pkg/util/conntrack/conntrack_test.go | 70 ++++++---------------------- 5 files changed, 42 insertions(+), 89 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index e00144de210..7d2c17141d9 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -744,35 +744,34 @@ func isServiceChainName(chainString string) bool { return false } -// After a UDP or SCTP endpoint has been removed, we must flush any pending conntrack entries to it, or else we -// risk sending more traffic to it, all of which will be lost. +// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we +// risk sending more traffic to it, all of which will be lost (because UDP). // This assumes the proxier mutex is held // TODO: move it to util func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) { for _, epSvcPair := range connectionMap { - if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { + if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == v1.ProtocolUDP { endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) nodePort := svcInfo.NodePort() - svcProto := svcInfo.Protocol() var err error if nodePort != 0 { - err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, svcProto) + err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to delete nodeport-related endpoint connections", "servicePortName", epSvcPair.ServicePortName) } } - err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, svcProto) + err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName) } for _, extIP := range svcInfo.ExternalIPStrings() { - err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, svcProto) + err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "externalIP", extIP) } } for _, lbIP := range svcInfo.LoadBalancerIPStrings() { - err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, svcProto) + err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "loadBalancerIP", lbIP) } @@ -839,8 +838,8 @@ func (proxier *Proxier) syncProxyRules() { // merge stale services gathered from updateEndpointsMap // an UDP service that changes from 0 to non-0 endpoints is considered stale. for _, svcPortName := range endpointUpdateResult.StaleServiceNames { - if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { - klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP()) + if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP { + klog.V(2).InfoS("Stale UDP service", "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP()) conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String()) for _, extIP := range svcInfo.ExternalIPStrings() { conntrackCleanupServiceIPs.Insert(extIP) diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index c6f64716063..a9d13b21ba9 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -131,7 +131,7 @@ func TestDeleteEndpointConnectionsIPv4(t *testing.T) { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) } for _, tc := range testCases { - if conntrack.IsClearConntrackNeeded(tc.protocol) { + if tc.protocol == UDP { var cmdOutput string var simErr error if tc.simulatedErr == "" { @@ -186,7 +186,7 @@ func TestDeleteEndpointConnectionsIPv4(t *testing.T) { // For UDP and SCTP connections, check the executed conntrack command var expExecs int - if conntrack.IsClearConntrackNeeded(tc.protocol) { + if tc.protocol == UDP { isIPv6 := func(ip string) bool { netIP := netutils.ParseIPSloppy(ip) return netIP.To4() == nil @@ -265,7 +265,7 @@ func TestDeleteEndpointConnectionsIPv6(t *testing.T) { } // Create a fake executor for the conntrack utility. This should only be - // invoked for UDP and SCTP connections, since no conntrack cleanup is needed for TCP + // invoked for UDP connections, since no conntrack cleanup is needed for TCP fcmd := fakeexec.FakeCmd{} fexec := &fakeexec.FakeExec{ LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, @@ -274,7 +274,7 @@ func TestDeleteEndpointConnectionsIPv6(t *testing.T) { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) } for _, tc := range testCases { - if conntrack.IsClearConntrackNeeded(tc.protocol) { + if tc.protocol == UDP { var cmdOutput string var simErr error if tc.simulatedErr == "" { @@ -327,15 +327,15 @@ func TestDeleteEndpointConnectionsIPv6(t *testing.T) { fp.deleteEndpointConnections(input) - // For UDP and SCTP connections, check the executed conntrack command + // For UDP connections, check the executed conntrack command var expExecs int - if conntrack.IsClearConntrackNeeded(tc.protocol) { + if tc.protocol == UDP { isIPv6 := func(ip string) bool { netIP := netutils.ParseIPSloppy(ip) return netIP.To4() == nil } endpointIP := utilproxy.IPPart(tc.endpoint) - expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p %s", tc.svcIP, endpointIP, strings.ToLower(string((tc.protocol)))) + expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.svcIP, endpointIP) if isIPv6(endpointIP) { expectCommand += " -f ipv6" } diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index d00801beb4b..e60d56bf0ba 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -953,8 +953,8 @@ func (proxier *Proxier) syncProxyRules() { // merge stale services gathered from updateEndpointsMap // an UDP service that changes from 0 to non-0 endpoints is considered stale. for _, svcPortName := range endpointUpdateResult.StaleServiceNames { - if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { - klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP()) + if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP { + klog.V(2).InfoS("Stale UDP service", "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP()) conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String()) for _, extIP := range svcInfo.ExternalIPStrings() { conntrackCleanupServiceIPs.Insert(extIP) @@ -1813,35 +1813,34 @@ func (proxier *Proxier) createAndLinkKubeChain() { } -// After a UDP or SCTP endpoint has been removed, we must flush any pending conntrack entries to it, or else we -// risk sending more traffic to it, all of which will be lost. +// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we +// risk sending more traffic to it, all of which will be lost (because UDP). // This assumes the proxier mutex is held // TODO: move it to util func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) { for _, epSvcPair := range connectionMap { - if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { + if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == v1.ProtocolUDP { endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) nodePort := svcInfo.NodePort() - svcProto := svcInfo.Protocol() var err error if nodePort != 0 { - err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, svcProto) + err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to delete nodeport-related endpoint connections", "servicePortName", epSvcPair.ServicePortName) } } - err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, svcProto) + err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName) } for _, extIP := range svcInfo.ExternalIPStrings() { - err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, svcProto) + err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "externalIP", extIP) } } for _, lbIP := range svcInfo.LoadBalancerIPStrings() { - err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, svcProto) + err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "loadBalancerIP", lbIP) } diff --git a/pkg/util/conntrack/conntrack.go b/pkg/util/conntrack/conntrack.go index 3a140c83df2..4b39f61ac79 100644 --- a/pkg/util/conntrack/conntrack.go +++ b/pkg/util/conntrack/conntrack.go @@ -100,7 +100,7 @@ func ClearEntriesForNAT(execer exec.Interface, origin, dest string, protocol v1. // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it // is expensive to baby sit all udp connections to kubernetes services. - return fmt.Errorf("error deleting conntrack entries for %s peer {%s, %s}, error: %v", protoStr(protocol), origin, dest, err) + return fmt.Errorf("error deleting conntrack entries for UDP peer {%s, %s}, error: %v", origin, dest, err) } return nil } @@ -116,12 +116,7 @@ func ClearEntriesForPortNAT(execer exec.Interface, dest string, port int, protoc parameters := parametersWithFamily(utilnet.IsIPv6String(dest), "-D", "-p", protoStr(protocol), "--dport", strconv.Itoa(port), "--dst-nat", dest) err := Exec(execer, parameters...) if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) { - return fmt.Errorf("error deleting conntrack entries for %s port: %d, error: %v", protoStr(protocol), port, err) + return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err) } return nil } - -// IsClearConntrackNeeded returns true if protocol requires conntrack cleanup for the stale connections -func IsClearConntrackNeeded(proto v1.Protocol) bool { - return proto == v1.ProtocolUDP || proto == v1.ProtocolSCTP -} diff --git a/pkg/util/conntrack/conntrack_test.go b/pkg/util/conntrack/conntrack_test.go index d7a395cde9b..18a7aab4e7b 100644 --- a/pkg/util/conntrack/conntrack_test.go +++ b/pkg/util/conntrack/conntrack_test.go @@ -177,7 +177,7 @@ func TestClearUDPConntrackForPort(t *testing.T) { } } -func TestDeleteConnections(t *testing.T) { +func TestDeleteUDPConnections(t *testing.T) { fcmd := fakeexec.FakeCmd{ CombinedOutputScript: []fakeexec.FakeAction{ func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil }, @@ -185,11 +185,6 @@ func TestDeleteConnections(t *testing.T) { return []byte(""), nil, fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted") }, func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil }, - func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil }, - func() ([]byte, []byte, error) { - return []byte(""), nil, fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted") - }, - func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil }, }, } fexec := &fakeexec.FakeExec{ @@ -197,9 +192,6 @@ func TestDeleteConnections(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, } @@ -208,52 +200,30 @@ func TestDeleteConnections(t *testing.T) { name string origin string dest string - proto v1.Protocol }{ { - name: "UDP IPv4 success", + name: "IPv4 success", origin: "1.2.3.4", dest: "10.20.30.40", - proto: v1.ProtocolUDP, }, { - name: "UDP IPv4 simulated failure", + name: "IPv4 simulated failure", origin: "2.3.4.5", dest: "20.30.40.50", - proto: v1.ProtocolUDP, }, { - name: "UDP IPv6 success", + name: "IPv6 success", origin: "fd00::600d:f00d", dest: "2001:db8::5", - proto: v1.ProtocolUDP, - }, - { - name: "SCTP IPv4 success", - origin: "1.2.3.5", - dest: "10.20.30.50", - proto: v1.ProtocolSCTP, - }, - { - name: "SCTP IPv4 simulated failure", - origin: "2.3.4.6", - dest: "20.30.40.60", - proto: v1.ProtocolSCTP, - }, - { - name: "SCTP IPv6 success", - origin: "fd00::600d:f00d", - dest: "2001:db8::6", - proto: v1.ProtocolSCTP, }, } svcCount := 0 for i, tc := range testCases { - err := ClearEntriesForNAT(fexec, tc.origin, tc.dest, tc.proto) + err := ClearEntriesForNAT(fexec, tc.origin, tc.dest, v1.ProtocolUDP) if err != nil { t.Errorf("%s test case: unexpected error: %v", tc.name, err) } - expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p %s", tc.origin, tc.dest, protoStr(tc.proto)) + familyParamStr(utilnet.IsIPv6String(tc.origin)) + expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.origin, tc.dest) + familyParamStr(utilnet.IsIPv6String(tc.origin)) execCommand := strings.Join(fcmd.CombinedOutputLog[i], " ") if expectCommand != execCommand { t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand) @@ -265,46 +235,36 @@ func TestDeleteConnections(t *testing.T) { } } -func TestClearConntrackForPortNAT(t *testing.T) { +func TestClearUDPConntrackForPortNAT(t *testing.T) { fcmd := fakeexec.FakeCmd{ CombinedOutputScript: []fakeexec.FakeAction{ func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil }, - func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil }, }, } fexec := &fakeexec.FakeExec{ CommandScript: []fakeexec.FakeCommandAction{ func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, } testCases := []struct { - name string - port int - dest string - proto v1.Protocol + name string + port int + dest string }{ { - name: "UDP IPv4 success", - port: 30211, - dest: "1.2.3.4", - proto: v1.ProtocolUDP, - }, - { - name: "SCTP IPv4 success", - port: 30215, - dest: "1.2.3.5", - proto: v1.ProtocolSCTP, + name: "IPv4 success", + port: 30211, + dest: "1.2.3.4", }, } svcCount := 0 for i, tc := range testCases { - err := ClearEntriesForPortNAT(fexec, tc.dest, tc.port, tc.proto) + err := ClearEntriesForPortNAT(fexec, tc.dest, tc.port, v1.ProtocolUDP) if err != nil { t.Errorf("%s test case: unexpected error: %v", tc.name, err) } - expectCommand := fmt.Sprintf("conntrack -D -p %s --dport %d --dst-nat %s", protoStr(tc.proto), tc.port, tc.dest) + familyParamStr(utilnet.IsIPv6String(tc.dest)) + expectCommand := fmt.Sprintf("conntrack -D -p udp --dport %d --dst-nat %s", tc.port, tc.dest) + familyParamStr(utilnet.IsIPv6String(tc.dest)) execCommand := strings.Join(fcmd.CombinedOutputLog[i], " ") if expectCommand != execCommand { t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand)