Merge pull request #98130 from JornShen/optimze_redundant_listenPortOpener

migrate to use k8s.io/util/net/port in kube-proxy
This commit is contained in:
Kubernetes Prow Robot 2021-02-18 10:02:51 -08:00 committed by GitHub
commit 6dc317a107
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 162 additions and 364 deletions

View File

@ -58,6 +58,7 @@ go_test(
"//vendor/k8s.io/klog/v2:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
"//vendor/k8s.io/utils/exec/testing:go_default_library",
"//vendor/k8s.io/utils/net:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
],
)

View File

@ -189,7 +189,7 @@ type Proxier struct {
mu sync.Mutex // protects the following fields
serviceMap proxy.ServiceMap
endpointsMap proxy.EndpointsMap
portsMap map[utilproxy.LocalPort]utilproxy.Closeable
portsMap map[utilnet.LocalPort]utilnet.Closeable
nodeLabels map[string]string
// endpointsSynced, endpointSlicesSynced, and servicesSynced are set to true
// when corresponding objects are synced after startup. This is used to avoid
@ -209,7 +209,7 @@ type Proxier struct {
localDetector proxyutiliptables.LocalTrafficDetector
hostname string
nodeIP net.IP
portMapper utilproxy.PortOpener
portMapper utilnet.PortOpener
recorder record.EventRecorder
serviceHealthServer healthcheck.ServiceHealthServer
@ -242,14 +242,6 @@ type Proxier struct {
networkInterfacer utilproxy.NetworkInterfacer
}
// listenPortOpener opens ports by calling bind() and listen().
type listenPortOpener struct{}
// OpenLocalPort holds the given local port open.
func (l *listenPortOpener) OpenLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) {
return openLocalPort(lp, isIPv6)
}
// Proxier implements proxy.Provider
var _ proxy.Provider = &Proxier{}
@ -306,7 +298,7 @@ func NewProxier(ipt utiliptables.Interface,
}
proxier := &Proxier{
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
portsMap: make(map[utilnet.LocalPort]utilnet.Closeable),
serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
endpointsMap: make(proxy.EndpointsMap),
@ -319,7 +311,7 @@ func NewProxier(ipt utiliptables.Interface,
localDetector: localDetector,
hostname: hostname,
nodeIP: nodeIP,
portMapper: &listenPortOpener{},
portMapper: &utilnet.ListenPortOpener,
recorder: recorder,
serviceHealthServer: serviceHealthServer,
healthzServer: healthzServer,
@ -983,7 +975,7 @@ func (proxier *Proxier) syncProxyRules() {
activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set
// Accumulate the set of local ports that we will be holding open once this update is complete
replacementPortsMap := map[utilproxy.LocalPort]utilproxy.Closeable{}
replacementPortsMap := map[utilnet.LocalPort]utilnet.Closeable{}
// We are creating those slices ones here to avoid memory reallocations
// in every loop. Note that reuse the memory, instead of doing:
@ -1019,6 +1011,10 @@ func (proxier *Proxier) syncProxyRules() {
continue
}
isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP())
localPortIPFamily := utilnet.IPv4
if isIPv6 {
localPortIPFamily = utilnet.IPv6
}
protocol := strings.ToLower(string(svcInfo.Protocol()))
svcNameString := svcInfo.serviceNameString
@ -1104,17 +1100,18 @@ func (proxier *Proxier) syncProxyRules() {
// machine, hold the local port open so no other process can open it
// (because the socket might open but it would never work).
if (svcInfo.Protocol() != v1.ProtocolSCTP) && localAddrSet.Has(net.ParseIP(externalIP)) {
lp := utilproxy.LocalPort{
lp := utilnet.LocalPort{
Description: "externalIP for " + svcNameString,
IP: externalIP,
IPFamily: localPortIPFamily,
Port: svcInfo.Port(),
Protocol: protocol,
Protocol: utilnet.Protocol(svcInfo.Protocol()),
}
if proxier.portsMap[lp] != nil {
klog.V(4).InfoS("Port was open before and is still needed", "port", lp.String())
replacementPortsMap[lp] = proxier.portsMap[lp]
} else {
socket, err := proxier.portMapper.OpenLocalPort(&lp, isIPv6)
socket, err := proxier.portMapper.OpenLocalPort(&lp)
if err != nil {
msg := fmt.Sprintf("can't open %s, skipping this externalIP: %v", lp.String(), err)
@ -1128,6 +1125,7 @@ func (proxier *Proxier) syncProxyRules() {
klog.ErrorS(err, "can't open port, skipping externalIP", "port", lp.String())
continue
}
klog.V(2).InfoS("Opened local port", "port", lp.String())
replacementPortsMap[lp] = socket
}
}
@ -1261,13 +1259,14 @@ func (proxier *Proxier) syncProxyRules() {
continue
}
lps := make([]utilproxy.LocalPort, 0)
lps := make([]utilnet.LocalPort, 0)
for address := range nodeAddresses {
lp := utilproxy.LocalPort{
lp := utilnet.LocalPort{
Description: "nodePort for " + svcNameString,
IP: address,
IPFamily: localPortIPFamily,
Port: svcInfo.NodePort(),
Protocol: protocol,
Protocol: utilnet.Protocol(svcInfo.Protocol()),
}
if utilproxy.IsZeroCIDR(address) {
// Empty IP address means all
@ -1285,11 +1284,12 @@ func (proxier *Proxier) syncProxyRules() {
klog.V(4).InfoS("Port was open before and is still needed", "port", lp.String())
replacementPortsMap[lp] = proxier.portsMap[lp]
} else if svcInfo.Protocol() != v1.ProtocolSCTP {
socket, err := proxier.portMapper.OpenLocalPort(&lp, isIPv6)
socket, err := proxier.portMapper.OpenLocalPort(&lp)
if err != nil {
klog.ErrorS(err, "can't open port, skipping this nodePort", "port", lp.String())
continue
}
klog.V(2).InfoS("Opened local port", "port", lp.String())
replacementPortsMap[lp] = socket
}
}
@ -1666,49 +1666,3 @@ func (proxier *Proxier) syncProxyRules() {
klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.StaleEndpoints)
proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
}
func openLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) {
// For ports on node IPs, open the actual port and hold it, even though we
// use iptables to redirect traffic.
// This ensures a) that it's safe to use that port and b) that (a) stays
// true. The risk is that some process on the node (e.g. sshd or kubelet)
// is using a port and we give that same port out to a Service. That would
// be bad because iptables would silently claim the traffic but the process
// would never know.
// NOTE: We should not need to have a real listen()ing socket - bind()
// should be enough, but I can't figure out a way to e2e test without
// it. Tools like 'ss' and 'netstat' do not show sockets that are
// bind()ed but not listen()ed, and at least the default debian netcat
// has no way to avoid about 10 seconds of retries.
var socket utilproxy.Closeable
switch lp.Protocol {
case "tcp":
network := "tcp4"
if isIPv6 {
network = "tcp6"
}
listener, err := net.Listen(network, net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port)))
if err != nil {
return nil, err
}
socket = listener
case "udp":
network := "udp4"
if isIPv6 {
network = "udp6"
}
addr, err := net.ResolveUDPAddr(network, net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port)))
if err != nil {
return nil, err
}
conn, err := net.ListenUDP(network, addr)
if err != nil {
return nil, err
}
socket = conn
default:
return nil, fmt.Errorf("unknown protocol %q", lp.Protocol)
}
klog.V(2).InfoS("Opened local port", "port", lp.String())
return socket, nil
}

View File

@ -44,6 +44,7 @@ import (
iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
"k8s.io/utils/exec"
fakeexec "k8s.io/utils/exec/testing"
utilnet "k8s.io/utils/net"
utilpointer "k8s.io/utils/pointer"
)
@ -463,12 +464,12 @@ func (f *fakeCloseable) Close() error {
// fakePortOpener implements portOpener.
type fakePortOpener struct {
openPorts []*utilproxy.LocalPort
openPorts []*utilnet.LocalPort
}
// OpenLocalPort fakes out the listen() and bind() used by syncProxyRules
// to lock a local port.
func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) {
func (f *fakePortOpener) OpenLocalPort(lp *utilnet.LocalPort) (utilnet.Closeable, error) {
f.openPorts = append(f.openPorts, lp)
return &fakeCloseable{}, nil
}
@ -493,8 +494,8 @@ func NewFakeProxier(ipt utiliptables.Interface, endpointSlicesEnabled bool) *Pro
masqueradeMark: "0x4000",
localDetector: detectLocal,
hostname: testHostname,
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}},
portsMap: make(map[utilnet.LocalPort]utilnet.Closeable),
portMapper: &fakePortOpener{[]*utilnet.LocalPort{}},
serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
precomputedProbabilities: make([]string, 0, 1001),
iptablesData: bytes.NewBuffer(nil),

View File

@ -18,7 +18,6 @@ go_test(
"//pkg/proxy:go_default_library",
"//pkg/proxy/healthcheck:go_default_library",
"//pkg/proxy/ipvs/testing:go_default_library",
"//pkg/proxy/util:go_default_library",
"//pkg/proxy/util/iptables:go_default_library",
"//pkg/proxy/util/testing:go_default_library",
"//pkg/util/async:go_default_library",

View File

@ -219,7 +219,7 @@ type Proxier struct {
mu sync.Mutex // protects the following fields
serviceMap proxy.ServiceMap
endpointsMap proxy.EndpointsMap
portsMap map[utilproxy.LocalPort]utilproxy.Closeable
portsMap map[utilnet.LocalPort]utilnet.Closeable
nodeLabels map[string]string
// endpointsSynced, endpointSlicesSynced, and servicesSynced are set to true when
// corresponding objects are synced after startup. This is used to avoid updating
@ -246,7 +246,7 @@ type Proxier struct {
localDetector proxyutiliptables.LocalTrafficDetector
hostname string
nodeIP net.IP
portMapper utilproxy.PortOpener
portMapper utilnet.PortOpener
recorder record.EventRecorder
serviceHealthServer healthcheck.ServiceHealthServer
@ -459,7 +459,7 @@ func NewProxier(ipt utiliptables.Interface,
proxier := &Proxier{
ipFamily: ipFamily,
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
portsMap: make(map[utilnet.LocalPort]utilnet.Closeable),
serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
endpointsMap: make(proxy.EndpointsMap),
@ -474,7 +474,7 @@ func NewProxier(ipt utiliptables.Interface,
localDetector: localDetector,
hostname: hostname,
nodeIP: nodeIP,
portMapper: &listenPortOpener{},
portMapper: &utilnet.ListenPortOpener,
recorder: recorder,
serviceHealthServer: serviceHealthServer,
healthzServer: healthzServer,
@ -1093,7 +1093,7 @@ func (proxier *Proxier) syncProxyRules() {
}
// Accumulate the set of local ports that we will be holding open once this update is complete
replacementPortsMap := map[utilproxy.LocalPort]utilproxy.Closeable{}
replacementPortsMap := map[utilnet.LocalPort]utilnet.Closeable{}
// activeIPVSServices represents IPVS service successfully created in this round of sync
activeIPVSServices := map[string]bool{}
// currentIPVSServices represent IPVS services listed from the system
@ -1164,6 +1164,10 @@ func (proxier *Proxier) syncProxyRules() {
continue
}
isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP())
localPortIPFamily := utilnet.IPv4
if isIPv6 {
localPortIPFamily = utilnet.IPv6
}
protocol := strings.ToLower(string(svcInfo.Protocol()))
// Precompute svcNameString; with many services the many calls
// to ServicePortName.String() show up in CPU profiles.
@ -1246,17 +1250,18 @@ func (proxier *Proxier) syncProxyRules() {
// (because the socket might open but it would never work).
if (svcInfo.Protocol() != v1.ProtocolSCTP) && localAddrSet.Has(net.ParseIP(externalIP)) {
// We do not start listening on SCTP ports, according to our agreement in the SCTP support KEP
lp := utilproxy.LocalPort{
lp := utilnet.LocalPort{
Description: "externalIP for " + svcNameString,
IP: externalIP,
IPFamily: localPortIPFamily,
Port: svcInfo.Port(),
Protocol: protocol,
Protocol: utilnet.Protocol(svcInfo.Protocol()),
}
if proxier.portsMap[lp] != nil {
klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
replacementPortsMap[lp] = proxier.portsMap[lp]
} else {
socket, err := proxier.portMapper.OpenLocalPort(&lp, isIPv6)
socket, err := proxier.portMapper.OpenLocalPort(&lp)
if err != nil {
msg := fmt.Sprintf("can't open %s, skipping this externalIP: %v", lp.String(), err)
@ -1270,6 +1275,7 @@ func (proxier *Proxier) syncProxyRules() {
klog.Error(msg)
continue
}
klog.V(2).Infof("Opened local port %s", lp.String())
replacementPortsMap[lp] = socket
}
} // We're holding the port, so it's OK to install IPVS rules.
@ -1430,13 +1436,14 @@ func (proxier *Proxier) syncProxyRules() {
continue
}
var lps []utilproxy.LocalPort
var lps []utilnet.LocalPort
for _, address := range nodeAddresses {
lp := utilproxy.LocalPort{
lp := utilnet.LocalPort{
Description: "nodePort for " + svcNameString,
IP: address,
IPFamily: localPortIPFamily,
Port: svcInfo.NodePort(),
Protocol: protocol,
Protocol: utilnet.Protocol(svcInfo.Protocol()),
}
if utilproxy.IsZeroCIDR(address) {
// Empty IP address means all
@ -1456,12 +1463,14 @@ func (proxier *Proxier) syncProxyRules() {
// We do not start listening on SCTP ports, according to our agreement in the
// SCTP support KEP
} else if svcInfo.Protocol() != v1.ProtocolSCTP {
socket, err := proxier.portMapper.OpenLocalPort(&lp, isIPv6)
socket, err := proxier.portMapper.OpenLocalPort(&lp)
if err != nil {
klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
continue
}
if lp.Protocol == "udp" {
klog.V(2).Infof("Opened local port %s", lp.String())
if lp.Protocol == utilnet.UDP {
conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
}
replacementPortsMap[lp] = socket
@ -2194,60 +2203,6 @@ func (proxier *Proxier) getLegacyBindAddr(activeBindAddrs map[string]bool, curre
return legacyAddrs
}
// listenPortOpener opens ports by calling bind() and listen().
type listenPortOpener struct{}
// OpenLocalPort holds the given local port open.
func (l *listenPortOpener) OpenLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) {
return openLocalPort(lp, isIPv6)
}
func openLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) {
// For ports on node IPs, open the actual port and hold it, even though we
// use ipvs to redirect traffic.
// This ensures a) that it's safe to use that port and b) that (a) stays
// true. The risk is that some process on the node (e.g. sshd or kubelet)
// is using a port and we give that same port out to a Service. That would
// be bad because ipvs would silently claim the traffic but the process
// would never know.
// NOTE: We should not need to have a real listen()ing socket - bind()
// should be enough, but I can't figure out a way to e2e test without
// it. Tools like 'ss' and 'netstat' do not show sockets that are
// bind()ed but not listen()ed, and at least the default debian netcat
// has no way to avoid about 10 seconds of retries.
var socket utilproxy.Closeable
switch lp.Protocol {
case "tcp":
network := "tcp4"
if isIPv6 {
network = "tcp6"
}
listener, err := net.Listen(network, net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port)))
if err != nil {
return nil, err
}
socket = listener
case "udp":
network := "udp4"
if isIPv6 {
network = "udp6"
}
addr, err := net.ResolveUDPAddr(network, net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port)))
if err != nil {
return nil, err
}
conn, err := net.ListenUDP(network, addr)
if err != nil {
return nil, err
}
socket = conn
default:
return nil, fmt.Errorf("unknown protocol %q", lp.Protocol)
}
klog.V(2).Infof("Opened local port %s", lp.String())
return socket, nil
}
// ipvs Proxier fall back on iptables when it needs to do SNAT for engress packets
// It will only operate iptables *nat table.
// Create and link the kube postrouting chain for SNAT packets.

View File

@ -36,7 +36,6 @@ import (
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing"
"k8s.io/kubernetes/pkg/util/async"
@ -70,12 +69,12 @@ func (f *fakeIPGetter) BindedIPs() (sets.String, error) {
// fakePortOpener implements portOpener.
type fakePortOpener struct {
openPorts []*utilproxy.LocalPort
openPorts []*utilnet.LocalPort
}
// OpenLocalPort fakes out the listen() and bind() used by syncProxyRules
// to lock a local port.
func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) {
func (f *fakePortOpener) OpenLocalPort(lp *utilnet.LocalPort) (utilnet.Closeable, error) {
f.openPorts = append(f.openPorts, lp)
return nil, nil
}
@ -151,8 +150,8 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
strictARP: false,
localDetector: proxyutiliptables.NewNoOpLocalDetector(),
hostname: testHostname,
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}},
portsMap: make(map[utilnet.LocalPort]utilnet.Closeable),
portMapper: &fakePortOpener{[]*utilnet.LocalPort{}},
serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
ipvsScheduler: DefaultScheduler,
ipGetter: &fakeIPGetter{nodeIPs: nodeIPs},

View File

@ -5,7 +5,6 @@ go_library(
srcs = [
"endpoints.go",
"network.go",
"port.go",
"utils.go",
],
importpath = "k8s.io/kubernetes/pkg/proxy/util",
@ -27,7 +26,6 @@ go_test(
name = "go_default_test",
srcs = [
"endpoints_test.go",
"port_test.go",
"utils_test.go",
],
embed = [":go_default_library"],
@ -36,6 +34,7 @@ go_test(
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/utils/net:go_default_library",
],
)

View File

@ -1,67 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"net"
"strconv"
"k8s.io/klog/v2"
)
// LocalPort describes a port on specific IP address and protocol
type LocalPort struct {
// Description is the identity message of a given local port.
Description string
// IP is the IP address part of a given local port.
// If this string is empty, the port binds to all local IP addresses.
IP string
// Port is the port part of a given local port.
Port int
// Protocol is the protocol part of a given local port.
// The value is assumed to be lower-case. For example, "udp" not "UDP", "tcp" not "TCP".
Protocol string
}
func (lp *LocalPort) String() string {
ipPort := net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port))
return fmt.Sprintf("%q (%s/%s)", lp.Description, ipPort, lp.Protocol)
}
// Closeable is an interface around closing a port.
type Closeable interface {
Close() error
}
// PortOpener is an interface around port opening/closing.
// Abstracted out for testing.
type PortOpener interface {
OpenLocalPort(lp *LocalPort, isIPv6 bool) (Closeable, error)
}
// RevertPorts is closing ports in replacementPortsMap but not in originalPortsMap. In other words, it only
// closes the ports opened in this sync.
func RevertPorts(replacementPortsMap, originalPortsMap map[LocalPort]Closeable) {
for k, v := range replacementPortsMap {
// Only close newly opened local ports - leave ones that were open before this update
if originalPortsMap[k] == nil {
klog.V(2).Infof("Closing local port %s", k.String())
v.Close()
}
}
}

View File

@ -1,145 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import "testing"
type fakeClosable struct {
closed bool
}
func (c *fakeClosable) Close() error {
c.closed = true
return nil
}
func TestLocalPortString(t *testing.T) {
testCases := []struct {
description string
ip string
port int
protocol string
expectedStr string
}{
{"IPv4 UDP", "1.2.3.4", 9999, "udp", "\"IPv4 UDP\" (1.2.3.4:9999/udp)"},
{"IPv4 TCP", "5.6.7.8", 1053, "tcp", "\"IPv4 TCP\" (5.6.7.8:1053/tcp)"},
{"IPv6 TCP", "2001:db8::1", 80, "tcp", "\"IPv6 TCP\" ([2001:db8::1]:80/tcp)"},
{"IPv4 SCTP", "9.10.11.12", 7777, "sctp", "\"IPv4 SCTP\" (9.10.11.12:7777/sctp)"},
{"IPv6 SCTP", "2001:db8::2", 80, "sctp", "\"IPv6 SCTP\" ([2001:db8::2]:80/sctp)"},
}
for _, tc := range testCases {
lp := &LocalPort{
Description: tc.description,
IP: tc.ip,
Port: tc.port,
Protocol: tc.protocol,
}
str := lp.String()
if str != tc.expectedStr {
t.Errorf("Unexpected output for %s, expected: %s, got: %s", tc.description, tc.expectedStr, str)
}
}
}
func TestRevertPorts(t *testing.T) {
testCases := []struct {
replacementPorts []LocalPort
existingPorts []LocalPort
expectToBeClose []bool
}{
{
replacementPorts: []LocalPort{
{Port: 5001},
{Port: 5002},
{Port: 5003},
},
existingPorts: []LocalPort{},
expectToBeClose: []bool{true, true, true},
},
{
replacementPorts: []LocalPort{},
existingPorts: []LocalPort{
{Port: 5001},
{Port: 5002},
{Port: 5003},
},
expectToBeClose: []bool{},
},
{
replacementPorts: []LocalPort{
{Port: 5001},
{Port: 5002},
{Port: 5003},
},
existingPorts: []LocalPort{
{Port: 5001},
{Port: 5002},
{Port: 5003},
},
expectToBeClose: []bool{false, false, false},
},
{
replacementPorts: []LocalPort{
{Port: 5001},
{Port: 5002},
{Port: 5003},
},
existingPorts: []LocalPort{
{Port: 5001},
{Port: 5003},
},
expectToBeClose: []bool{false, true, false},
},
{
replacementPorts: []LocalPort{
{Port: 5001},
{Port: 5002},
{Port: 5003},
},
existingPorts: []LocalPort{
{Port: 5001},
{Port: 5002},
{Port: 5003},
{Port: 5004},
},
expectToBeClose: []bool{false, false, false},
},
}
for i, tc := range testCases {
replacementPortsMap := make(map[LocalPort]Closeable)
for _, lp := range tc.replacementPorts {
replacementPortsMap[lp] = &fakeClosable{}
}
existingPortsMap := make(map[LocalPort]Closeable)
for _, lp := range tc.existingPorts {
existingPortsMap[lp] = &fakeClosable{}
}
RevertPorts(replacementPortsMap, existingPortsMap)
for j, expectation := range tc.expectToBeClose {
if replacementPortsMap[tc.replacementPorts[j]].(*fakeClosable).closed != expectation {
t.Errorf("Expect replacement localport %v to be %v in test case %v", tc.replacementPorts[j], expectation, i)
}
}
for _, lp := range tc.existingPorts {
if existingPortsMap[lp].(*fakeClosable).closed == true {
t.Errorf("Expect existing localport %v to be false in test case %v", lp, i)
}
}
}
}

View File

@ -482,3 +482,15 @@ func WriteBytesLine(buf *bytes.Buffer, bytes []byte) {
buf.Write(bytes)
buf.WriteByte('\n')
}
// RevertPorts is closing ports in replacementPortsMap but not in originalPortsMap. In other words, it only
// closes the ports opened in this sync.
func RevertPorts(replacementPortsMap, originalPortsMap map[utilnet.LocalPort]utilnet.Closeable) {
for k, v := range replacementPortsMap {
// Only close newly opened local ports - leave ones that were open before this update
if originalPortsMap[k] == nil {
klog.V(2).Infof("Closing local port %s", k.String())
v.Close()
}
}
}

View File

@ -29,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
fake "k8s.io/kubernetes/pkg/proxy/util/testing"
utilnet "k8s.io/utils/net"
)
func TestValidateWorks(t *testing.T) {
@ -1048,7 +1049,103 @@ func TestGetClusterIPByFamily(t *testing.T) {
}
})
}
}
type fakeClosable struct {
closed bool
}
func (c *fakeClosable) Close() error {
c.closed = true
return nil
}
func TestRevertPorts(t *testing.T) {
testCases := []struct {
replacementPorts []utilnet.LocalPort
existingPorts []utilnet.LocalPort
expectToBeClose []bool
}{
{
replacementPorts: []utilnet.LocalPort{
{Port: 5001},
{Port: 5002},
{Port: 5003},
},
existingPorts: []utilnet.LocalPort{},
expectToBeClose: []bool{true, true, true},
},
{
replacementPorts: []utilnet.LocalPort{},
existingPorts: []utilnet.LocalPort{
{Port: 5001},
{Port: 5002},
{Port: 5003},
},
expectToBeClose: []bool{},
},
{
replacementPorts: []utilnet.LocalPort{
{Port: 5001},
{Port: 5002},
{Port: 5003},
},
existingPorts: []utilnet.LocalPort{
{Port: 5001},
{Port: 5002},
{Port: 5003},
},
expectToBeClose: []bool{false, false, false},
},
{
replacementPorts: []utilnet.LocalPort{
{Port: 5001},
{Port: 5002},
{Port: 5003},
},
existingPorts: []utilnet.LocalPort{
{Port: 5001},
{Port: 5003},
},
expectToBeClose: []bool{false, true, false},
},
{
replacementPorts: []utilnet.LocalPort{
{Port: 5001},
{Port: 5002},
{Port: 5003},
},
existingPorts: []utilnet.LocalPort{
{Port: 5001},
{Port: 5002},
{Port: 5003},
{Port: 5004},
},
expectToBeClose: []bool{false, false, false},
},
}
for i, tc := range testCases {
replacementPortsMap := make(map[utilnet.LocalPort]utilnet.Closeable)
for _, lp := range tc.replacementPorts {
replacementPortsMap[lp] = &fakeClosable{}
}
existingPortsMap := make(map[utilnet.LocalPort]utilnet.Closeable)
for _, lp := range tc.existingPorts {
existingPortsMap[lp] = &fakeClosable{}
}
RevertPorts(replacementPortsMap, existingPortsMap)
for j, expectation := range tc.expectToBeClose {
if replacementPortsMap[tc.replacementPorts[j]].(*fakeClosable).closed != expectation {
t.Errorf("Expect replacement localport %v to be %v in test case %v", tc.replacementPorts[j], expectation, i)
}
}
for _, lp := range tc.existingPorts {
if existingPortsMap[lp].(*fakeClosable).closed == true {
t.Errorf("Expect existing localport %v to be false in test case %v", lp, i)
}
}
}
}
func TestWriteLine(t *testing.T) {

View File

@ -23,7 +23,6 @@ go_library(
"//pkg/proxy/healthcheck:go_default_library",
"//pkg/proxy/metaproxier:go_default_library",
"//pkg/proxy/metrics:go_default_library",
"//pkg/proxy/util:go_default_library",
"//pkg/util/async:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/discovery/v1beta1:go_default_library",
@ -67,7 +66,6 @@ go_test(
"@io_bazel_rules_go//go/platform:windows": [
"//pkg/proxy:go_default_library",
"//pkg/proxy/healthcheck:go_default_library",
"//pkg/proxy/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/discovery/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

View File

@ -50,7 +50,6 @@ import (
"k8s.io/kubernetes/pkg/proxy/healthcheck"
"k8s.io/kubernetes/pkg/proxy/metaproxier"
"k8s.io/kubernetes/pkg/proxy/metrics"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/kubernetes/pkg/util/async"
utilnet "k8s.io/utils/net"
)
@ -446,7 +445,6 @@ type Proxier struct {
mu sync.Mutex // protects the following fields
serviceMap proxy.ServiceMap
endpointsMap proxy.EndpointsMap
portsMap map[utilproxy.LocalPort]utilproxy.Closeable
// endpointsSynced and servicesSynced are set to true when corresponding
// objects are synced after startup. This is used to avoid updating hns policies
// with some partial data after kube-proxy restart.
@ -619,7 +617,6 @@ func NewProxier(
endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.WindowsEndpointSliceProxying)
proxier := &Proxier{
endPointsRefCount: make(endPointsReferenceCountMap),
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
serviceMap: make(proxy.ServiceMap),
endpointsMap: make(proxy.EndpointsMap),
masqueradeAll: masqueradeAll,

View File

@ -27,7 +27,6 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
utilpointer "k8s.io/utils/pointer"
"net"
"strings"
@ -119,7 +118,6 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clust
networkType: networkType,
}
proxier := &Proxier{
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
serviceMap: make(proxy.ServiceMap),
endpointsMap: make(proxy.EndpointsMap),
clusterCIDR: clusterCIDR,