Merge pull request #71573 from JacobTanenbaum/UDP_conntrack
Correctly Clear conntrack entry on endpoint changes when using nodeport
This commit is contained in:
@@ -607,7 +607,13 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceE
|
||||
for _, epSvcPair := range connectionMap {
|
||||
if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.GetProtocol() == v1.ProtocolUDP {
|
||||
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
|
||||
err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIPString(), endpointIP, v1.ProtocolUDP)
|
||||
nodePort := svcInfo.GetNodePort()
|
||||
var err error
|
||||
if nodePort != 0 {
|
||||
err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, v1.ProtocolUDP)
|
||||
} else {
|
||||
err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIPString(), endpointIP, v1.ProtocolUDP)
|
||||
}
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err)
|
||||
}
|
||||
|
@@ -74,6 +74,11 @@ func (info *BaseServiceInfo) GetHealthCheckNodePort() int {
|
||||
return info.HealthCheckNodePort
|
||||
}
|
||||
|
||||
// GetNodePort is part of the ServicePort interface.
|
||||
func (info *BaseServiceInfo) GetNodePort() int {
|
||||
return info.NodePort
|
||||
}
|
||||
|
||||
func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, service *v1.Service) *BaseServiceInfo {
|
||||
onlyNodeLocalEndpoints := false
|
||||
if apiservice.RequestsOnlyLocalTraffic(service) {
|
||||
|
@@ -54,6 +54,8 @@ type ServicePort interface {
|
||||
GetProtocol() v1.Protocol
|
||||
// GetHealthCheckNodePort returns service health check node port if present. If return 0, it means not present.
|
||||
GetHealthCheckNodePort() int
|
||||
// GetNodePort returns a service Node port if present. If return 0, it means not present.
|
||||
GetNodePort() int
|
||||
}
|
||||
|
||||
// Endpoint in an interface which abstracts information about an endpoint.
|
||||
|
@@ -107,3 +107,19 @@ func ClearEntriesForNAT(execer exec.Interface, origin, dest string, protocol v1.
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ClearEntriesForPortNAT uses the conntrack tool to delete the contrack entries
|
||||
// for connections specified by the {dest IP, port} pair.
|
||||
// Known issue:
|
||||
// https://github.com/kubernetes/kubernetes/issues/59368
|
||||
func ClearEntriesForPortNAT(execer exec.Interface, dest string, port int, protocol v1.Protocol) error {
|
||||
if port <= 0 {
|
||||
return fmt.Errorf("Wrong port number. The port number must be greater then zero")
|
||||
}
|
||||
parameters := parametersWithFamily(utilnet.IsIPv6String(dest), "-D", "-p", protoStr(protocol), "--dport", strconv.Itoa(port), "--dst-nat", dest)
|
||||
err := Exec(execer, parameters...)
|
||||
if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) {
|
||||
return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@@ -234,3 +234,50 @@ func TestDeleteUDPConnections(t *testing.T) {
|
||||
t.Errorf("Expect command executed %d times, but got %d", svcCount, fexec.CommandCalls)
|
||||
}
|
||||
}
|
||||
|
||||
func TestClearUDPConntrackForPortNAT(t *testing.T) {
|
||||
fcmd := fakeexec.FakeCmd{
|
||||
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
|
||||
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
|
||||
func() ([]byte, error) {
|
||||
return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted")
|
||||
},
|
||||
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
|
||||
},
|
||||
}
|
||||
fexec := fakeexec.FakeExec{
|
||||
CommandScript: []fakeexec.FakeCommandAction{
|
||||
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
|
||||
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
|
||||
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
|
||||
},
|
||||
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
|
||||
}
|
||||
testCases := []struct {
|
||||
name string
|
||||
port int
|
||||
dest string
|
||||
}{
|
||||
{
|
||||
name: "IPv4 success",
|
||||
port: 30211,
|
||||
dest: "1.2.3.4",
|
||||
},
|
||||
}
|
||||
svcCount := 0
|
||||
for i, tc := range testCases {
|
||||
err := ClearEntriesForPortNAT(&fexec, tc.dest, tc.port, v1.ProtocolUDP)
|
||||
if err != nil {
|
||||
t.Errorf("%s test case: unexpected error: %v", tc.name, err)
|
||||
}
|
||||
expectCommand := fmt.Sprintf("conntrack -D -p udp --dport %d --dst-nat %s", tc.port, tc.dest) + familyParamStr(utilnet.IsIPv6String(tc.dest))
|
||||
execCommand := strings.Join(fcmd.CombinedOutputLog[i], " ")
|
||||
if expectCommand != execCommand {
|
||||
t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand)
|
||||
}
|
||||
svcCount++
|
||||
}
|
||||
if svcCount != fexec.CommandCalls {
|
||||
t.Errorf("Expect command executed %d times, but got %d", svcCount, fexec.CommandCalls)
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user