Revert (most of) "Issue 70020; Flush Conntrack entities for SCTP"

This commit did not actually work; in between when it was first
written and tested, and when it merged, the code in
pkg/proxy/endpoints.go was changed to only add UDP endpoints to the
"stale endpoints"/"stale services" lists, and so checking for "either
UDP or SCTP" rather than just UDP when processing those lists had no
effect.

This reverts most of commit aa8521df66
(but leaves the changes related to
ipvs.IsRsGracefulTerminationNeeded() since that actually did have the
effect it meant to have).
This commit is contained in:
Dan Winship 2023-01-23 15:55:03 -05:00
parent 6a31757f45
commit 4381973a44
5 changed files with 42 additions and 89 deletions

View File

@ -744,35 +744,34 @@ func isServiceChainName(chainString string) bool {
return false
}
// After a UDP or SCTP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
// risk sending more traffic to it, all of which will be lost.
// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
// risk sending more traffic to it, all of which will be lost (because UDP).
// This assumes the proxier mutex is held
// TODO: move it to util
func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) {
for _, epSvcPair := range connectionMap {
if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == v1.ProtocolUDP {
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
nodePort := svcInfo.NodePort()
svcProto := svcInfo.Protocol()
var err error
if nodePort != 0 {
err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, svcProto)
err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to delete nodeport-related endpoint connections", "servicePortName", epSvcPair.ServicePortName)
}
}
err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, svcProto)
err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName)
}
for _, extIP := range svcInfo.ExternalIPStrings() {
err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, svcProto)
err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "externalIP", extIP)
}
}
for _, lbIP := range svcInfo.LoadBalancerIPStrings() {
err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, svcProto)
err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "loadBalancerIP", lbIP)
}
@ -839,8 +838,8 @@ func (proxier *Proxier) syncProxyRules() {
// merge stale services gathered from updateEndpointsMap
// an UDP service that changes from 0 to non-0 endpoints is considered stale.
for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP())
if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
klog.V(2).InfoS("Stale UDP service", "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP())
conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String())
for _, extIP := range svcInfo.ExternalIPStrings() {
conntrackCleanupServiceIPs.Insert(extIP)

View File

@ -131,7 +131,7 @@ func TestDeleteEndpointConnectionsIPv4(t *testing.T) {
return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
}
for _, tc := range testCases {
if conntrack.IsClearConntrackNeeded(tc.protocol) {
if tc.protocol == UDP {
var cmdOutput string
var simErr error
if tc.simulatedErr == "" {
@ -186,7 +186,7 @@ func TestDeleteEndpointConnectionsIPv4(t *testing.T) {
// For UDP and SCTP connections, check the executed conntrack command
var expExecs int
if conntrack.IsClearConntrackNeeded(tc.protocol) {
if tc.protocol == UDP {
isIPv6 := func(ip string) bool {
netIP := netutils.ParseIPSloppy(ip)
return netIP.To4() == nil
@ -265,7 +265,7 @@ func TestDeleteEndpointConnectionsIPv6(t *testing.T) {
}
// Create a fake executor for the conntrack utility. This should only be
// invoked for UDP and SCTP connections, since no conntrack cleanup is needed for TCP
// invoked for UDP connections, since no conntrack cleanup is needed for TCP
fcmd := fakeexec.FakeCmd{}
fexec := &fakeexec.FakeExec{
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
@ -274,7 +274,7 @@ func TestDeleteEndpointConnectionsIPv6(t *testing.T) {
return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
}
for _, tc := range testCases {
if conntrack.IsClearConntrackNeeded(tc.protocol) {
if tc.protocol == UDP {
var cmdOutput string
var simErr error
if tc.simulatedErr == "" {
@ -327,15 +327,15 @@ func TestDeleteEndpointConnectionsIPv6(t *testing.T) {
fp.deleteEndpointConnections(input)
// For UDP and SCTP connections, check the executed conntrack command
// For UDP connections, check the executed conntrack command
var expExecs int
if conntrack.IsClearConntrackNeeded(tc.protocol) {
if tc.protocol == UDP {
isIPv6 := func(ip string) bool {
netIP := netutils.ParseIPSloppy(ip)
return netIP.To4() == nil
}
endpointIP := utilproxy.IPPart(tc.endpoint)
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p %s", tc.svcIP, endpointIP, strings.ToLower(string((tc.protocol))))
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.svcIP, endpointIP)
if isIPv6(endpointIP) {
expectCommand += " -f ipv6"
}

View File

@ -953,8 +953,8 @@ func (proxier *Proxier) syncProxyRules() {
// merge stale services gathered from updateEndpointsMap
// an UDP service that changes from 0 to non-0 endpoints is considered stale.
for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP())
if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
klog.V(2).InfoS("Stale UDP service", "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP())
conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String())
for _, extIP := range svcInfo.ExternalIPStrings() {
conntrackCleanupServiceIPs.Insert(extIP)
@ -1813,35 +1813,34 @@ func (proxier *Proxier) createAndLinkKubeChain() {
}
// After a UDP or SCTP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
// risk sending more traffic to it, all of which will be lost.
// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
// risk sending more traffic to it, all of which will be lost (because UDP).
// This assumes the proxier mutex is held
// TODO: move it to util
func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) {
for _, epSvcPair := range connectionMap {
if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == v1.ProtocolUDP {
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
nodePort := svcInfo.NodePort()
svcProto := svcInfo.Protocol()
var err error
if nodePort != 0 {
err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, svcProto)
err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to delete nodeport-related endpoint connections", "servicePortName", epSvcPair.ServicePortName)
}
}
err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, svcProto)
err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName)
}
for _, extIP := range svcInfo.ExternalIPStrings() {
err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, svcProto)
err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "externalIP", extIP)
}
}
for _, lbIP := range svcInfo.LoadBalancerIPStrings() {
err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, svcProto)
err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "loadBalancerIP", lbIP)
}

View File

@ -100,7 +100,7 @@ func ClearEntriesForNAT(execer exec.Interface, origin, dest string, protocol v1.
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
// is expensive to baby sit all udp connections to kubernetes services.
return fmt.Errorf("error deleting conntrack entries for %s peer {%s, %s}, error: %v", protoStr(protocol), origin, dest, err)
return fmt.Errorf("error deleting conntrack entries for UDP peer {%s, %s}, error: %v", origin, dest, err)
}
return nil
}
@ -116,12 +116,7 @@ func ClearEntriesForPortNAT(execer exec.Interface, dest string, port int, protoc
parameters := parametersWithFamily(utilnet.IsIPv6String(dest), "-D", "-p", protoStr(protocol), "--dport", strconv.Itoa(port), "--dst-nat", dest)
err := Exec(execer, parameters...)
if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) {
return fmt.Errorf("error deleting conntrack entries for %s port: %d, error: %v", protoStr(protocol), port, err)
return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err)
}
return nil
}
// IsClearConntrackNeeded returns true if protocol requires conntrack cleanup for the stale connections
func IsClearConntrackNeeded(proto v1.Protocol) bool {
return proto == v1.ProtocolUDP || proto == v1.ProtocolSCTP
}

View File

@ -177,7 +177,7 @@ func TestClearUDPConntrackForPort(t *testing.T) {
}
}
func TestDeleteConnections(t *testing.T) {
func TestDeleteUDPConnections(t *testing.T) {
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeAction{
func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil },
@ -185,11 +185,6 @@ func TestDeleteConnections(t *testing.T) {
return []byte(""), nil, fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted")
},
func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil },
func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil },
func() ([]byte, []byte, error) {
return []byte(""), nil, fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted")
},
func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil },
},
}
fexec := &fakeexec.FakeExec{
@ -197,9 +192,6 @@ func TestDeleteConnections(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
@ -208,52 +200,30 @@ func TestDeleteConnections(t *testing.T) {
name string
origin string
dest string
proto v1.Protocol
}{
{
name: "UDP IPv4 success",
name: "IPv4 success",
origin: "1.2.3.4",
dest: "10.20.30.40",
proto: v1.ProtocolUDP,
},
{
name: "UDP IPv4 simulated failure",
name: "IPv4 simulated failure",
origin: "2.3.4.5",
dest: "20.30.40.50",
proto: v1.ProtocolUDP,
},
{
name: "UDP IPv6 success",
name: "IPv6 success",
origin: "fd00::600d:f00d",
dest: "2001:db8::5",
proto: v1.ProtocolUDP,
},
{
name: "SCTP IPv4 success",
origin: "1.2.3.5",
dest: "10.20.30.50",
proto: v1.ProtocolSCTP,
},
{
name: "SCTP IPv4 simulated failure",
origin: "2.3.4.6",
dest: "20.30.40.60",
proto: v1.ProtocolSCTP,
},
{
name: "SCTP IPv6 success",
origin: "fd00::600d:f00d",
dest: "2001:db8::6",
proto: v1.ProtocolSCTP,
},
}
svcCount := 0
for i, tc := range testCases {
err := ClearEntriesForNAT(fexec, tc.origin, tc.dest, tc.proto)
err := ClearEntriesForNAT(fexec, tc.origin, tc.dest, v1.ProtocolUDP)
if err != nil {
t.Errorf("%s test case: unexpected error: %v", tc.name, err)
}
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p %s", tc.origin, tc.dest, protoStr(tc.proto)) + familyParamStr(utilnet.IsIPv6String(tc.origin))
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.origin, tc.dest) + familyParamStr(utilnet.IsIPv6String(tc.origin))
execCommand := strings.Join(fcmd.CombinedOutputLog[i], " ")
if expectCommand != execCommand {
t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand)
@ -265,46 +235,36 @@ func TestDeleteConnections(t *testing.T) {
}
}
func TestClearConntrackForPortNAT(t *testing.T) {
func TestClearUDPConntrackForPortNAT(t *testing.T) {
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeAction{
func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil },
func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil },
},
}
fexec := &fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
testCases := []struct {
name string
port int
dest string
proto v1.Protocol
name string
port int
dest string
}{
{
name: "UDP IPv4 success",
port: 30211,
dest: "1.2.3.4",
proto: v1.ProtocolUDP,
},
{
name: "SCTP IPv4 success",
port: 30215,
dest: "1.2.3.5",
proto: v1.ProtocolSCTP,
name: "IPv4 success",
port: 30211,
dest: "1.2.3.4",
},
}
svcCount := 0
for i, tc := range testCases {
err := ClearEntriesForPortNAT(fexec, tc.dest, tc.port, tc.proto)
err := ClearEntriesForPortNAT(fexec, tc.dest, tc.port, v1.ProtocolUDP)
if err != nil {
t.Errorf("%s test case: unexpected error: %v", tc.name, err)
}
expectCommand := fmt.Sprintf("conntrack -D -p %s --dport %d --dst-nat %s", protoStr(tc.proto), tc.port, tc.dest) + familyParamStr(utilnet.IsIPv6String(tc.dest))
expectCommand := fmt.Sprintf("conntrack -D -p udp --dport %d --dst-nat %s", tc.port, tc.dest) + familyParamStr(utilnet.IsIPv6String(tc.dest))
execCommand := strings.Join(fcmd.CombinedOutputLog[i], " ")
if expectCommand != execCommand {
t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand)