Remove conntrack entry on udp rule add.
Moved conntrack util outside of proxy pkg Added warning message if conntrack binary is not found Addressed review comments. ran gofmt
This commit is contained in:
parent
fa5c815cca
commit
098a4467fe
@ -17,11 +17,13 @@ go_library(
|
||||
importpath = "k8s.io/kubernetes/pkg/kubelet/network/hostport",
|
||||
deps = [
|
||||
"//pkg/proxy/iptables:go_default_library",
|
||||
"//pkg/util/conntrack:go_default_library",
|
||||
"//pkg/util/iptables:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//vendor/k8s.io/utils/exec:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
@ -38,6 +40,7 @@ go_test(
|
||||
"//pkg/util/iptables:go_default_library",
|
||||
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/utils/exec:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
|
@ -26,9 +26,12 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/api/core/v1"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
iptablesproxy "k8s.io/kubernetes/pkg/proxy/iptables"
|
||||
"k8s.io/kubernetes/pkg/util/conntrack"
|
||||
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
|
||||
"k8s.io/utils/exec"
|
||||
)
|
||||
|
||||
// HostPortManager is an interface for adding and removing hostport for a given pod sandbox.
|
||||
@ -44,18 +47,26 @@ type HostPortManager interface {
|
||||
}
|
||||
|
||||
type hostportManager struct {
|
||||
hostPortMap map[hostport]closeable
|
||||
iptables utiliptables.Interface
|
||||
portOpener hostportOpener
|
||||
mu sync.Mutex
|
||||
hostPortMap map[hostport]closeable
|
||||
execer exec.Interface
|
||||
conntrackFound bool
|
||||
iptables utiliptables.Interface
|
||||
portOpener hostportOpener
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func NewHostportManager(iptables utiliptables.Interface) HostPortManager {
|
||||
return &hostportManager{
|
||||
h := &hostportManager{
|
||||
hostPortMap: make(map[hostport]closeable),
|
||||
execer: exec.New(),
|
||||
iptables: iptables,
|
||||
portOpener: openLocalPort,
|
||||
}
|
||||
h.conntrackFound = conntrack.Exists(h.execer)
|
||||
if !h.conntrackFound {
|
||||
glog.Warningf("The binary conntrack is not installed, this can cause failures in network connection cleanup.")
|
||||
}
|
||||
return h
|
||||
}
|
||||
|
||||
func (hm *hostportManager) Add(id string, podPortMapping *PodPortMapping, natInterfaceName string) (err error) {
|
||||
@ -103,10 +114,14 @@ func (hm *hostportManager) Add(id string, podPortMapping *PodPortMapping, natInt
|
||||
}
|
||||
|
||||
newChains := []utiliptables.Chain{}
|
||||
conntrackPortsToRemove := []int{}
|
||||
for _, pm := range hostportMappings {
|
||||
protocol := strings.ToLower(string(pm.Protocol))
|
||||
chain := getHostportChain(id, pm)
|
||||
newChains = append(newChains, chain)
|
||||
if pm.Protocol == v1.ProtocolUDP {
|
||||
conntrackPortsToRemove = append(conntrackPortsToRemove, int(pm.HostPort))
|
||||
}
|
||||
|
||||
// Add new hostport chain
|
||||
writeLine(natChains, utiliptables.MakeChainLine(chain))
|
||||
@ -150,6 +165,21 @@ func (hm *hostportManager) Add(id string, podPortMapping *PodPortMapping, natInt
|
||||
// clean up opened host port if encounter any error
|
||||
return utilerrors.NewAggregate([]error{err, hm.closeHostports(hostportMappings)})
|
||||
}
|
||||
isIpv6 := conntrack.IsIPv6(podPortMapping.IP)
|
||||
|
||||
// Remove conntrack entries just after adding the new iptables rules. If the conntrack entry is removed along with
|
||||
// the IP tables rule, it can be the case that the packets received by the node after iptables rule removal will
|
||||
// create a new conntrack entry without any DNAT. That will result in blackhole of the traffic even after correct
|
||||
// iptables rules have been added back.
|
||||
if hm.execer != nil && hm.conntrackFound {
|
||||
glog.Infof("Starting to delete udp conntrack entries: %v, isIPv6 - %v", conntrackPortsToRemove, isIpv6)
|
||||
for _, port := range conntrackPortsToRemove {
|
||||
err = conntrack.ClearEntriesForPort(hm.execer, port, isIpv6, v1.ProtocolUDP)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to clear udp conntrack for port %d, error: %v", port, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"k8s.io/api/core/v1"
|
||||
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
|
||||
"k8s.io/utils/exec"
|
||||
)
|
||||
|
||||
func TestHostportManager(t *testing.T) {
|
||||
@ -34,6 +35,7 @@ func TestHostportManager(t *testing.T) {
|
||||
hostPortMap: make(map[hostport]closeable),
|
||||
iptables: iptables,
|
||||
portOpener: portOpener.openFakeSocket,
|
||||
execer: exec.New(),
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
|
@ -21,6 +21,7 @@ go_library(
|
||||
"//pkg/proxy/metrics:go_default_library",
|
||||
"//pkg/proxy/util:go_default_library",
|
||||
"//pkg/util/async:go_default_library",
|
||||
"//pkg/util/conntrack:go_default_library",
|
||||
"//pkg/util/iptables:go_default_library",
|
||||
"//pkg/util/sysctl:go_default_library",
|
||||
"//pkg/util/version:go_default_library",
|
||||
@ -42,6 +43,7 @@ go_test(
|
||||
"//pkg/proxy:go_default_library",
|
||||
"//pkg/proxy/util:go_default_library",
|
||||
"//pkg/util/async:go_default_library",
|
||||
"//pkg/util/conntrack:go_default_library",
|
||||
"//pkg/util/iptables:go_default_library",
|
||||
"//pkg/util/iptables/testing:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
|
@ -46,6 +46,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/proxy/metrics"
|
||||
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
|
||||
"k8s.io/kubernetes/pkg/util/async"
|
||||
"k8s.io/kubernetes/pkg/util/conntrack"
|
||||
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
|
||||
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
|
||||
utilversion "k8s.io/kubernetes/pkg/util/version"
|
||||
@ -682,7 +683,7 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceE
|
||||
for _, epSvcPair := range connectionMap {
|
||||
if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == api.ProtocolUDP {
|
||||
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
|
||||
err := utilproxy.ClearUDPConntrackForPeers(proxier.exec, svcInfo.ClusterIP(), endpointIP)
|
||||
err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP(), endpointIP, v1.ProtocolUDP)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err)
|
||||
}
|
||||
@ -839,7 +840,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
glog.Errorf("Failed to cast serviceInfo %q", svcName.String())
|
||||
continue
|
||||
}
|
||||
isIPv6 := utilproxy.IsIPv6(svcInfo.clusterIP)
|
||||
isIPv6 := conntrack.IsIPv6(svcInfo.clusterIP)
|
||||
protocol := strings.ToLower(string(svcInfo.protocol))
|
||||
svcNameString = svcInfo.serviceNameString
|
||||
|
||||
@ -1052,7 +1053,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
// This is very low impact. The NodePort range is intentionally obscure, and unlikely to actually collide with real Services.
|
||||
// This only affects UDP connections, which are not common.
|
||||
// See issue: https://github.com/kubernetes/kubernetes/issues/49881
|
||||
err := utilproxy.ClearUDPConntrackForPort(proxier.exec, lp.Port, isIPv6)
|
||||
err := conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to clear udp conntrack for port %d, error: %v", lp.Port, err)
|
||||
}
|
||||
@ -1376,7 +1377,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
// Finish housekeeping.
|
||||
// TODO: these could be made more consistent.
|
||||
for _, svcIP := range staleServices.UnsortedList() {
|
||||
if err := utilproxy.ClearUDPConntrackForIP(proxier.exec, svcIP); err != nil {
|
||||
if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
|
||||
glog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err)
|
||||
}
|
||||
}
|
||||
|
@ -35,6 +35,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/proxy"
|
||||
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
|
||||
"k8s.io/kubernetes/pkg/util/async"
|
||||
"k8s.io/kubernetes/pkg/util/conntrack"
|
||||
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
|
||||
iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
|
||||
"k8s.io/utils/exec"
|
||||
@ -222,7 +223,7 @@ func TestDeleteEndpointConnections(t *testing.T) {
|
||||
svcPort: 80,
|
||||
protocol: UDP,
|
||||
endpoint: "10.240.0.3:80",
|
||||
simulatedErr: utilproxy.NoConnectionToDelete,
|
||||
simulatedErr: conntrack.NoConnectionToDelete,
|
||||
}, {
|
||||
description: "V4 UDP, unexpected error, should be glogged",
|
||||
svcName: "v4-udp-simulated-error",
|
||||
@ -328,7 +329,7 @@ func TestDeleteEndpointConnections(t *testing.T) {
|
||||
|
||||
// Check the number of new glog errors
|
||||
var expGlogErrs int64
|
||||
if tc.simulatedErr != "" && tc.simulatedErr != utilproxy.NoConnectionToDelete {
|
||||
if tc.simulatedErr != "" && tc.simulatedErr != conntrack.NoConnectionToDelete {
|
||||
expGlogErrs = 1
|
||||
}
|
||||
glogErrs := glog.Stats.Error.Lines() - priorGlogErrs
|
||||
|
@ -85,6 +85,7 @@ go_library(
|
||||
"//pkg/proxy/metrics:go_default_library",
|
||||
"//pkg/proxy/util:go_default_library",
|
||||
"//pkg/util/async:go_default_library",
|
||||
"//pkg/util/conntrack:go_default_library",
|
||||
"//pkg/util/ipset:go_default_library",
|
||||
"//pkg/util/iptables:go_default_library",
|
||||
"//pkg/util/ipvs:go_default_library",
|
||||
|
@ -45,6 +45,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/proxy/metrics"
|
||||
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
|
||||
"k8s.io/kubernetes/pkg/util/async"
|
||||
"k8s.io/kubernetes/pkg/util/conntrack"
|
||||
utilipset "k8s.io/kubernetes/pkg/util/ipset"
|
||||
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
|
||||
utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
|
||||
@ -295,7 +296,7 @@ func NewProxier(ipt utiliptables.Interface,
|
||||
|
||||
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
|
||||
|
||||
isIPv6 := utilproxy.IsIPv6(nodeIP)
|
||||
isIPv6 := conntrack.IsIPv6(nodeIP)
|
||||
|
||||
glog.V(2).Infof("nodeIP: %v, isIPv6: %v", nodeIP, isIPv6)
|
||||
|
||||
@ -1116,8 +1117,8 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
continue
|
||||
}
|
||||
if lp.Protocol == "udp" {
|
||||
isIPv6 := utilproxy.IsIPv6(svcInfo.clusterIP)
|
||||
utilproxy.ClearUDPConntrackForPort(proxier.exec, lp.Port, isIPv6)
|
||||
isIPv6 := conntrack.IsIPv6(svcInfo.clusterIP)
|
||||
conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, clientv1.ProtocolUDP)
|
||||
}
|
||||
replacementPortsMap[lp] = socket
|
||||
} // We're holding the port, so it's OK to install ipvs rules.
|
||||
@ -1349,7 +1350,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
// Finish housekeeping.
|
||||
// TODO: these could be made more consistent.
|
||||
for _, svcIP := range staleServices.UnsortedList() {
|
||||
if err := utilproxy.ClearUDPConntrackForIP(proxier.exec, svcIP); err != nil {
|
||||
if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, clientv1.ProtocolUDP); err != nil {
|
||||
glog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err)
|
||||
}
|
||||
}
|
||||
@ -1363,7 +1364,7 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceE
|
||||
for _, epSvcPair := range connectionMap {
|
||||
if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == api.ProtocolUDP {
|
||||
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
|
||||
err := utilproxy.ClearUDPConntrackForPeers(proxier.exec, svcInfo.ClusterIP(), endpointIP)
|
||||
err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP(), endpointIP, clientv1.ProtocolUDP)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err)
|
||||
}
|
||||
|
@ -57,9 +57,11 @@ go_library(
|
||||
"//pkg/apis/core/helper:go_default_library",
|
||||
"//pkg/proxy:go_default_library",
|
||||
"//pkg/proxy/util:go_default_library",
|
||||
"//pkg/util/conntrack:go_default_library",
|
||||
"//pkg/util/iptables:go_default_library",
|
||||
"//pkg/util/slice:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
@ -36,6 +37,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
|
||||
"k8s.io/kubernetes/pkg/util/conntrack"
|
||||
"k8s.io/kubernetes/pkg/util/iptables"
|
||||
utilexec "k8s.io/utils/exec"
|
||||
)
|
||||
@ -507,7 +509,7 @@ func (proxier *Proxier) unmergeService(service *api.Service, existingPorts sets.
|
||||
proxier.loadBalancer.DeleteService(serviceName)
|
||||
}
|
||||
for _, svcIP := range staleUDPServices.UnsortedList() {
|
||||
if err := proxyutil.ClearUDPConntrackForIP(proxier.exec, svcIP); err != nil {
|
||||
if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
|
||||
glog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err)
|
||||
}
|
||||
}
|
||||
|
@ -3,7 +3,6 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"conntrack.go",
|
||||
"endpoints.go",
|
||||
"port.go",
|
||||
"utils.go",
|
||||
@ -15,14 +14,12 @@ go_library(
|
||||
"//pkg/apis/core/helper:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//vendor/k8s.io/utils/exec:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"conntrack_test.go",
|
||||
"endpoints_test.go",
|
||||
"port_test.go",
|
||||
"utils_test.go",
|
||||
@ -32,8 +29,6 @@ go_test(
|
||||
"//pkg/apis/core:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//vendor/k8s.io/utils/exec:go_default_library",
|
||||
"//vendor/k8s.io/utils/exec/testing:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
|
@ -15,6 +15,7 @@ filegroup(
|
||||
"//pkg/util/bandwidth:all-srcs",
|
||||
"//pkg/util/config:all-srcs",
|
||||
"//pkg/util/configz:all-srcs",
|
||||
"//pkg/util/conntrack:all-srcs",
|
||||
"//pkg/util/dbus:all-srcs",
|
||||
"//pkg/util/ebtables:all-srcs",
|
||||
"//pkg/util/env:all-srcs",
|
||||
|
41
pkg/util/conntrack/BUILD
Normal file
41
pkg/util/conntrack/BUILD
Normal file
@ -0,0 +1,41 @@
|
||||
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"conntrack.go",
|
||||
],
|
||||
importpath = "k8s.io/kubernetes/pkg/util/conntrack",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/utils/exec:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"conntrack_test.go",
|
||||
],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/utils/exec:go_default_library",
|
||||
"//vendor/k8s.io/utils/exec/testing:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:private"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [":package-srcs"],
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:public"],
|
||||
)
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package util
|
||||
package conntrack
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@ -22,22 +22,30 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/utils/exec"
|
||||
)
|
||||
|
||||
// Utilities for dealing with conntrack
|
||||
|
||||
// NoConnectionToDelete is the error string returned by conntrack when no matching connections are found
|
||||
const NoConnectionToDelete = "0 flow entries have been deleted"
|
||||
|
||||
// IsIPv6 returns true if the given ip address is a valid ipv6 address
|
||||
func IsIPv6(netIP net.IP) bool {
|
||||
return netIP != nil && netIP.To4() == nil
|
||||
}
|
||||
|
||||
// IsIPv6String returns true if the given string is a valid ipv6 address
|
||||
func IsIPv6String(ip string) bool {
|
||||
netIP := net.ParseIP(ip)
|
||||
return IsIPv6(netIP)
|
||||
}
|
||||
|
||||
func protoStr(proto v1.Protocol) string {
|
||||
return strings.ToLower(string(proto))
|
||||
}
|
||||
|
||||
func parametersWithFamily(isIPv6 bool, parameters ...string) []string {
|
||||
if isIPv6 {
|
||||
parameters = append(parameters, "-f", "ipv6")
|
||||
@ -45,11 +53,11 @@ func parametersWithFamily(isIPv6 bool, parameters ...string) []string {
|
||||
return parameters
|
||||
}
|
||||
|
||||
// ClearUDPConntrackForIP uses the conntrack tool to delete the conntrack entries
|
||||
// ClearEntriesForIP uses the conntrack tool to delete the conntrack entries
|
||||
// for the UDP connections specified by the given service IP
|
||||
func ClearUDPConntrackForIP(execer exec.Interface, ip string) error {
|
||||
parameters := parametersWithFamily(IsIPv6String(ip), "-D", "--orig-dst", ip, "-p", "udp")
|
||||
err := ExecConntrackTool(execer, parameters...)
|
||||
func ClearEntriesForIP(execer exec.Interface, ip string, protocol v1.Protocol) error {
|
||||
parameters := parametersWithFamily(IsIPv6String(ip), "-D", "--orig-dst", ip, "-p", protoStr(protocol))
|
||||
err := Exec(execer, parameters...)
|
||||
if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) {
|
||||
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
|
||||
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
|
||||
@ -59,8 +67,8 @@ func ClearUDPConntrackForIP(execer exec.Interface, ip string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ExecConntrackTool executes the conntrack tool using the given parameters
|
||||
func ExecConntrackTool(execer exec.Interface, parameters ...string) error {
|
||||
// Exec executes the conntrack tool using the given parameters
|
||||
func Exec(execer exec.Interface, parameters ...string) error {
|
||||
conntrackPath, err := execer.LookPath("conntrack")
|
||||
if err != nil {
|
||||
return fmt.Errorf("error looking for path of conntrack: %v", err)
|
||||
@ -72,29 +80,36 @@ func ExecConntrackTool(execer exec.Interface, parameters ...string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ClearUDPConntrackForPort uses the conntrack tool to delete the conntrack entries
|
||||
// for the UDP connections specified by the port.
|
||||
// Exists returns true if conntrack binary is installed.
|
||||
func Exists(execer exec.Interface) bool {
|
||||
_, err := execer.LookPath("conntrack")
|
||||
return err == nil
|
||||
}
|
||||
|
||||
// ClearEntriesForPort uses the conntrack tool to delete the conntrack entries
|
||||
// for connections specified by the port.
|
||||
// When a packet arrives, it will not go through NAT table again, because it is not "the first" packet.
|
||||
// The solution is clearing the conntrack. Known issues:
|
||||
// https://github.com/docker/docker/issues/8795
|
||||
// https://github.com/kubernetes/kubernetes/issues/31983
|
||||
func ClearUDPConntrackForPort(execer exec.Interface, port int, isIPv6 bool) error {
|
||||
func ClearEntriesForPort(execer exec.Interface, port int, isIPv6 bool, protocol v1.Protocol) error {
|
||||
if port <= 0 {
|
||||
return fmt.Errorf("Wrong port number. The port number must be greater than zero")
|
||||
}
|
||||
parameters := parametersWithFamily(isIPv6, "-D", "-p", "udp", "--dport", strconv.Itoa(port))
|
||||
err := ExecConntrackTool(execer, parameters...)
|
||||
parameters := parametersWithFamily(isIPv6, "-D", "-p", protoStr(protocol), "--dport", strconv.Itoa(port))
|
||||
err := Exec(execer, parameters...)
|
||||
if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) {
|
||||
return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ClearUDPConntrackForPeers uses the conntrack tool to delete the conntrack entries
|
||||
// for the UDP connections specified by the {origin, dest} IP pair.
|
||||
func ClearUDPConntrackForPeers(execer exec.Interface, origin, dest string) error {
|
||||
parameters := parametersWithFamily(IsIPv6String(origin), "-D", "--orig-dst", origin, "--dst-nat", dest, "-p", "udp")
|
||||
err := ExecConntrackTool(execer, parameters...)
|
||||
// ClearEntriesForNAT uses the conntrack tool to delete the conntrack entries
|
||||
// for connections specified by the {origin, dest} IP pair.
|
||||
func ClearEntriesForNAT(execer exec.Interface, origin, dest string, protocol v1.Protocol) error {
|
||||
parameters := parametersWithFamily(IsIPv6String(origin), "-D", "--orig-dst", origin, "--dst-nat", dest,
|
||||
"-p", protoStr(protocol))
|
||||
err := Exec(execer, parameters...)
|
||||
if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) {
|
||||
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
|
||||
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package util
|
||||
package conntrack
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@ -22,6 +22,7 @@ import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/utils/exec"
|
||||
fakeexec "k8s.io/utils/exec/testing"
|
||||
)
|
||||
@ -61,7 +62,7 @@ func TestExecConntrackTool(t *testing.T) {
|
||||
expectErr := []bool{false, false, true}
|
||||
|
||||
for i := range testCases {
|
||||
err := ExecConntrackTool(&fexec, testCases[i]...)
|
||||
err := Exec(&fexec, testCases[i]...)
|
||||
|
||||
if expectErr[i] {
|
||||
if err == nil {
|
||||
@ -115,7 +116,7 @@ func TestClearUDPConntrackForIP(t *testing.T) {
|
||||
|
||||
svcCount := 0
|
||||
for _, tc := range testCases {
|
||||
if err := ClearUDPConntrackForIP(&fexec, tc.ip); err != nil {
|
||||
if err := ClearEntriesForIP(&fexec, tc.ip, v1.ProtocolUDP); err != nil {
|
||||
t.Errorf("%s test case:, Unexpected error: %v", tc.name, err)
|
||||
}
|
||||
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", tc.ip) + familyParamStr(IsIPv6String(tc.ip))
|
||||
@ -160,7 +161,7 @@ func TestClearUDPConntrackForPort(t *testing.T) {
|
||||
}
|
||||
svcCount := 0
|
||||
for _, tc := range testCases {
|
||||
err := ClearUDPConntrackForPort(&fexec, tc.port, tc.isIPv6)
|
||||
err := ClearEntriesForPort(&fexec, tc.port, tc.isIPv6, v1.ProtocolUDP)
|
||||
if err != nil {
|
||||
t.Errorf("%s test case: Unexpected error: %v", tc.name, err)
|
||||
}
|
||||
@ -218,7 +219,7 @@ func TestDeleteUDPConnections(t *testing.T) {
|
||||
}
|
||||
svcCount := 0
|
||||
for i, tc := range testCases {
|
||||
err := ClearUDPConntrackForPeers(&fexec, tc.origin, tc.dest)
|
||||
err := ClearEntriesForNAT(&fexec, tc.origin, tc.dest, v1.ProtocolUDP)
|
||||
if err != nil {
|
||||
t.Errorf("%s test case: unexpected error: %v", tc.name, err)
|
||||
}
|
Loading…
Reference in New Issue
Block a user