Merge pull request #35334 from timothysc/proxy_min_sync
Automatic merge from submit-queue Proxy min sync period <!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, read our contributor guidelines https://github.com/kubernetes/kubernetes/blob/master/CONTRIBUTING.md and developer guide https://github.com/kubernetes/kubernetes/blob/master/docs/devel/development.md 2. If you want *faster* PR reviews, read how: https://github.com/kubernetes/kubernetes/blob/master/docs/devel/faster_reviews.md 3. Follow the instructions for writing a release note: https://github.com/kubernetes/kubernetes/blob/master/docs/devel/pull-requests.md#release-notes --> **What this PR does / why we need it**: Gives the proxy the option to set a lower bound on the sync period when there are a high number of endpoint changes. This prevents excessive iptables re-writes under a number of conditions. fixes #33693 and alleviates the symptoms of #26637 **NOTE:** There are other minor fixes that I'm working on but keeping the PRs separate. **Release note**: <!-- Steps to write your release note: 1. Use the release-note-* labels to set the release note state (if you have access) 2. Enter your extended release note in the below block; leaving it blank means using the PR title as the release note. If no release note is required, just write `NONE`. --> `Added iptables-min-syn-period(2) to proxy to prevent excessive iptables writes`
This commit is contained in:
		@@ -77,7 +77,8 @@ func (s *ProxyServerConfig) AddFlags(fs *pflag.FlagSet) {
 | 
			
		||||
	fs.StringVar(&s.HostnameOverride, "hostname-override", s.HostnameOverride, "If non-empty, will use this string as identification instead of the actual hostname.")
 | 
			
		||||
	fs.Var(&s.Mode, "proxy-mode", "Which proxy mode to use: 'userspace' (older) or 'iptables' (faster). If blank, look at the Node object on the Kubernetes API and respect the '"+ExperimentalProxyModeAnnotation+"' annotation if provided.  Otherwise use the best-available proxy (currently iptables).  If the iptables proxy is selected, regardless of how, but the system's kernel or iptables versions are insufficient, this always falls back to the userspace proxy.")
 | 
			
		||||
	fs.Int32Var(s.IPTablesMasqueradeBit, "iptables-masquerade-bit", util.Int32PtrDerefOr(s.IPTablesMasqueradeBit, 14), "If using the pure iptables proxy, the bit of the fwmark space to mark packets requiring SNAT with.  Must be within the range [0, 31].")
 | 
			
		||||
	fs.DurationVar(&s.IPTablesSyncPeriod.Duration, "iptables-sync-period", s.IPTablesSyncPeriod.Duration, "How often iptables rules are refreshed (e.g. '5s', '1m', '2h22m').  Must be greater than 0.")
 | 
			
		||||
	fs.DurationVar(&s.IPTablesSyncPeriod.Duration, "iptables-sync-period", s.IPTablesSyncPeriod.Duration, "The maximum interval of how often iptables rules are refreshed (e.g. '5s', '1m', '2h22m').  Must be greater than 0.")
 | 
			
		||||
	fs.DurationVar(&s.IPTablesMinSyncPeriod.Duration, "iptables-min-sync-period", s.IPTablesMinSyncPeriod.Duration, "The minimum interval of how often the iptables rules can be refreshed as endpoints and services change (e.g. '5s', '1m', '2h22m').  Must be greater than 0.")
 | 
			
		||||
	fs.DurationVar(&s.ConfigSyncPeriod, "config-sync-period", s.ConfigSyncPeriod, "How often configuration from the apiserver is refreshed.  Must be greater than 0.")
 | 
			
		||||
	fs.BoolVar(&s.MasqueradeAll, "masquerade-all", s.MasqueradeAll, "If using the pure iptables proxy, SNAT everything")
 | 
			
		||||
	fs.StringVar(&s.ClusterCIDR, "cluster-cidr", s.ClusterCIDR, "The CIDR range of pods in the cluster. It is used to bridge traffic coming from outside of the cluster. If not provided, no off-cluster bridging will be performed.")
 | 
			
		||||
 
 | 
			
		||||
@@ -206,7 +206,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
 | 
			
		||||
			// IPTablesMasqueradeBit must be specified or defaulted.
 | 
			
		||||
			return nil, fmt.Errorf("Unable to read IPTablesMasqueradeBit from config")
 | 
			
		||||
		}
 | 
			
		||||
		proxierIPTables, err := iptables.NewProxier(iptInterface, utilsysctl.New(), execer, config.IPTablesSyncPeriod.Duration, config.MasqueradeAll, int(*config.IPTablesMasqueradeBit), config.ClusterCIDR, hostname, getNodeIP(client, hostname))
 | 
			
		||||
		proxierIPTables, err := iptables.NewProxier(iptInterface, utilsysctl.New(), execer, config.IPTablesSyncPeriod.Duration, config.IPTablesMinSyncPeriod.Duration, config.MasqueradeAll, int(*config.IPTablesMasqueradeBit), config.ClusterCIDR, hostname, getNodeIP(client, hostname))
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			glog.Fatalf("Unable to create proxier: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
@@ -229,6 +229,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
 | 
			
		||||
			iptInterface,
 | 
			
		||||
			*utilnet.ParsePortRangeOrDie(config.PortRange),
 | 
			
		||||
			config.IPTablesSyncPeriod.Duration,
 | 
			
		||||
			config.IPTablesMinSyncPeriod.Duration,
 | 
			
		||||
			config.UDPIdleTimeout.Duration,
 | 
			
		||||
		)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
 
 | 
			
		||||
@@ -272,6 +272,7 @@ instance-metadata
 | 
			
		||||
instance-name-prefix
 | 
			
		||||
iptables-drop-bit
 | 
			
		||||
iptables-masquerade-bit
 | 
			
		||||
iptables-min-sync-period
 | 
			
		||||
iptables-sync-period
 | 
			
		||||
ir-data-source
 | 
			
		||||
ir-dbname
 | 
			
		||||
 
 | 
			
		||||
@@ -44,6 +44,9 @@ type KubeProxyConfiguration struct {
 | 
			
		||||
	// iptablesSyncPeriod is the period that iptables rules are refreshed (e.g. '5s', '1m',
 | 
			
		||||
	// '2h22m').  Must be greater than 0.
 | 
			
		||||
	IPTablesSyncPeriod unversioned.Duration `json:"iptablesSyncPeriodSeconds"`
 | 
			
		||||
	// iptablesMinSyncPeriod is the minimum period that iptables rules are refreshed (e.g. '5s', '1m',
 | 
			
		||||
	// '2h22m').  Must be greater than 0.
 | 
			
		||||
	IPTablesMinSyncPeriod unversioned.Duration `json:"iptablesMinSyncPeriodSeconds"`
 | 
			
		||||
	// kubeconfigPath is the path to the kubeconfig file with authorization information (the
 | 
			
		||||
	// master location is set by the master flag).
 | 
			
		||||
	KubeconfigPath string `json:"kubeconfigPath"`
 | 
			
		||||
 
 | 
			
		||||
@@ -80,6 +80,9 @@ func SetDefaults_KubeProxyConfiguration(obj *KubeProxyConfiguration) {
 | 
			
		||||
	if obj.IPTablesSyncPeriod.Duration == 0 {
 | 
			
		||||
		obj.IPTablesSyncPeriod = unversioned.Duration{Duration: 30 * time.Second}
 | 
			
		||||
	}
 | 
			
		||||
	if obj.IPTablesMinSyncPeriod.Duration == 0 {
 | 
			
		||||
		obj.IPTablesMinSyncPeriod = unversioned.Duration{Duration: 2 * time.Second}
 | 
			
		||||
	}
 | 
			
		||||
	zero := unversioned.Duration{}
 | 
			
		||||
	if obj.UDPIdleTimeout == zero {
 | 
			
		||||
		obj.UDPIdleTimeout = unversioned.Duration{Duration: 250 * time.Millisecond}
 | 
			
		||||
 
 | 
			
		||||
@@ -41,6 +41,9 @@ type KubeProxyConfiguration struct {
 | 
			
		||||
	// iptablesSyncPeriod is the period that iptables rules are refreshed (e.g. '5s', '1m',
 | 
			
		||||
	// '2h22m').  Must be greater than 0.
 | 
			
		||||
	IPTablesSyncPeriod unversioned.Duration `json:"iptablesSyncPeriodSeconds"`
 | 
			
		||||
	// iptablesMinSyncPeriod is the minimum period that iptables rules are refreshed (e.g. '5s', '1m',
 | 
			
		||||
	// '2h22m').  Must be greater than 0.
 | 
			
		||||
	IPTablesMinSyncPeriod unversioned.Duration `json:"iptablesMinSyncPeriodSeconds"`
 | 
			
		||||
	// kubeconfigPath is the path to the kubeconfig file with authorization information (the
 | 
			
		||||
	// master location is set by the master flag).
 | 
			
		||||
	KubeconfigPath string `json:"kubeconfigPath"`
 | 
			
		||||
 
 | 
			
		||||
@@ -68,6 +68,7 @@ func autoConvert_v1alpha1_KubeProxyConfiguration_To_componentconfig_KubeProxyCon
 | 
			
		||||
	out.HostnameOverride = in.HostnameOverride
 | 
			
		||||
	out.IPTablesMasqueradeBit = (*int32)(unsafe.Pointer(in.IPTablesMasqueradeBit))
 | 
			
		||||
	out.IPTablesSyncPeriod = in.IPTablesSyncPeriod
 | 
			
		||||
	out.IPTablesMinSyncPeriod = in.IPTablesMinSyncPeriod
 | 
			
		||||
	out.KubeconfigPath = in.KubeconfigPath
 | 
			
		||||
	out.MasqueradeAll = in.MasqueradeAll
 | 
			
		||||
	out.Master = in.Master
 | 
			
		||||
@@ -96,6 +97,7 @@ func autoConvert_componentconfig_KubeProxyConfiguration_To_v1alpha1_KubeProxyCon
 | 
			
		||||
	out.HostnameOverride = in.HostnameOverride
 | 
			
		||||
	out.IPTablesMasqueradeBit = (*int32)(unsafe.Pointer(in.IPTablesMasqueradeBit))
 | 
			
		||||
	out.IPTablesSyncPeriod = in.IPTablesSyncPeriod
 | 
			
		||||
	out.IPTablesMinSyncPeriod = in.IPTablesMinSyncPeriod
 | 
			
		||||
	out.KubeconfigPath = in.KubeconfigPath
 | 
			
		||||
	out.MasqueradeAll = in.MasqueradeAll
 | 
			
		||||
	out.Master = in.Master
 | 
			
		||||
 
 | 
			
		||||
@@ -65,6 +65,7 @@ func DeepCopy_v1alpha1_KubeProxyConfiguration(in interface{}, out interface{}, c
 | 
			
		||||
			out.IPTablesMasqueradeBit = nil
 | 
			
		||||
		}
 | 
			
		||||
		out.IPTablesSyncPeriod = in.IPTablesSyncPeriod
 | 
			
		||||
		out.IPTablesMinSyncPeriod = in.IPTablesMinSyncPeriod
 | 
			
		||||
		out.KubeconfigPath = in.KubeconfigPath
 | 
			
		||||
		out.MasqueradeAll = in.MasqueradeAll
 | 
			
		||||
		out.Master = in.Master
 | 
			
		||||
 
 | 
			
		||||
@@ -153,6 +153,7 @@ func DeepCopy_componentconfig_KubeProxyConfiguration(in interface{}, out interfa
 | 
			
		||||
			out.IPTablesMasqueradeBit = nil
 | 
			
		||||
		}
 | 
			
		||||
		out.IPTablesSyncPeriod = in.IPTablesSyncPeriod
 | 
			
		||||
		out.IPTablesMinSyncPeriod = in.IPTablesMinSyncPeriod
 | 
			
		||||
		out.KubeconfigPath = in.KubeconfigPath
 | 
			
		||||
		out.MasqueradeAll = in.MasqueradeAll
 | 
			
		||||
		out.Master = in.Master
 | 
			
		||||
 
 | 
			
		||||
@@ -1911,6 +1911,12 @@ var OpenAPIDefinitions *common.OpenAPIDefinitions = &common.OpenAPIDefinitions{
 | 
			
		||||
							Ref:         spec.MustCreateRef("#/definitions/unversioned.Duration"),
 | 
			
		||||
						},
 | 
			
		||||
					},
 | 
			
		||||
					"iptablesMinSyncPeriodSeconds": {
 | 
			
		||||
						SchemaProps: spec.SchemaProps{
 | 
			
		||||
							Description: "iptablesMinSyncPeriod is the minimum period that iptables rules are refreshed (e.g. '5s', '1m', '2h22m').  Must be greater than 0.",
 | 
			
		||||
							Ref:         spec.MustCreateRef("#/definitions/unversioned.Duration"),
 | 
			
		||||
						},
 | 
			
		||||
					},
 | 
			
		||||
					"kubeconfigPath": {
 | 
			
		||||
						SchemaProps: spec.SchemaProps{
 | 
			
		||||
							Description: "kubeconfigPath is the path to the kubeconfig file with authorization information (the master location is set by the master flag).",
 | 
			
		||||
@@ -2000,7 +2006,7 @@ var OpenAPIDefinitions *common.OpenAPIDefinitions = &common.OpenAPIDefinitions{
 | 
			
		||||
						},
 | 
			
		||||
					},
 | 
			
		||||
				},
 | 
			
		||||
				Required: []string{"TypeMeta", "bindAddress", "clusterCIDR", "healthzBindAddress", "healthzPort", "hostnameOverride", "iptablesMasqueradeBit", "iptablesSyncPeriodSeconds", "kubeconfigPath", "masqueradeAll", "master", "oomScoreAdj", "mode", "portRange", "resourceContainer", "udpTimeoutMilliseconds", "conntrackMax", "conntrackMaxPerCore", "conntrackMin", "conntrackTCPEstablishedTimeout", "conntrackTCPCloseWaitTimeout"},
 | 
			
		||||
				Required: []string{"TypeMeta", "bindAddress", "clusterCIDR", "healthzBindAddress", "healthzPort", "hostnameOverride", "iptablesMasqueradeBit", "iptablesSyncPeriodSeconds", "iptablesMinSyncPeriodSeconds", "kubeconfigPath", "masqueradeAll", "master", "oomScoreAdj", "mode", "portRange", "resourceContainer", "udpTimeoutMilliseconds", "conntrackMax", "conntrackMaxPerCore", "conntrackMin", "conntrackTCPEstablishedTimeout", "conntrackTCPCloseWaitTimeout"},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		Dependencies: []string{
 | 
			
		||||
@@ -13645,6 +13651,12 @@ var OpenAPIDefinitions *common.OpenAPIDefinitions = &common.OpenAPIDefinitions{
 | 
			
		||||
							Ref:         spec.MustCreateRef("#/definitions/unversioned.Duration"),
 | 
			
		||||
						},
 | 
			
		||||
					},
 | 
			
		||||
					"iptablesMinSyncPeriodSeconds": {
 | 
			
		||||
						SchemaProps: spec.SchemaProps{
 | 
			
		||||
							Description: "iptablesMinSyncPeriod is the minimum period that iptables rules are refreshed (e.g. '5s', '1m', '2h22m').  Must be greater than 0.",
 | 
			
		||||
							Ref:         spec.MustCreateRef("#/definitions/unversioned.Duration"),
 | 
			
		||||
						},
 | 
			
		||||
					},
 | 
			
		||||
					"kubeconfigPath": {
 | 
			
		||||
						SchemaProps: spec.SchemaProps{
 | 
			
		||||
							Description: "kubeconfigPath is the path to the kubeconfig file with authorization information (the master location is set by the master flag).",
 | 
			
		||||
@@ -13734,7 +13746,7 @@ var OpenAPIDefinitions *common.OpenAPIDefinitions = &common.OpenAPIDefinitions{
 | 
			
		||||
						},
 | 
			
		||||
					},
 | 
			
		||||
				},
 | 
			
		||||
				Required: []string{"TypeMeta", "bindAddress", "clusterCIDR", "healthzBindAddress", "healthzPort", "hostnameOverride", "iptablesMasqueradeBit", "iptablesSyncPeriodSeconds", "kubeconfigPath", "masqueradeAll", "master", "oomScoreAdj", "mode", "portRange", "resourceContainer", "udpTimeoutMilliseconds", "conntrackMax", "conntrackMaxPerCore", "conntrackMin", "conntrackTCPEstablishedTimeout", "conntrackTCPCloseWaitTimeout"},
 | 
			
		||||
				Required: []string{"TypeMeta", "bindAddress", "clusterCIDR", "healthzBindAddress", "healthzPort", "hostnameOverride", "iptablesMasqueradeBit", "iptablesSyncPeriodSeconds", "iptablesMinSyncPeriodSeconds", "kubeconfigPath", "masqueradeAll", "master", "oomScoreAdj", "mode", "portRange", "resourceContainer", "udpTimeoutMilliseconds", "conntrackMax", "conntrackMaxPerCore", "conntrackMin", "conntrackTCPEstablishedTimeout", "conntrackTCPCloseWaitTimeout"},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		Dependencies: []string{
 | 
			
		||||
 
 | 
			
		||||
@@ -22,6 +22,7 @@ go_library(
 | 
			
		||||
        "//pkg/types:go_default_library",
 | 
			
		||||
        "//pkg/util/config:go_default_library",
 | 
			
		||||
        "//pkg/util/exec:go_default_library",
 | 
			
		||||
        "//pkg/util/flowcontrol:go_default_library",
 | 
			
		||||
        "//pkg/util/iptables:go_default_library",
 | 
			
		||||
        "//pkg/util/sets:go_default_library",
 | 
			
		||||
        "//pkg/util/slice:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -42,6 +42,7 @@ import (
 | 
			
		||||
	"k8s.io/kubernetes/pkg/types"
 | 
			
		||||
	featuregate "k8s.io/kubernetes/pkg/util/config"
 | 
			
		||||
	utilexec "k8s.io/kubernetes/pkg/util/exec"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/flowcontrol"
 | 
			
		||||
	utiliptables "k8s.io/kubernetes/pkg/util/iptables"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/sets"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/slice"
 | 
			
		||||
@@ -167,9 +168,11 @@ type Proxier struct {
 | 
			
		||||
	portsMap                    map[localPort]closeable
 | 
			
		||||
	haveReceivedServiceUpdate   bool // true once we've seen an OnServiceUpdate event
 | 
			
		||||
	haveReceivedEndpointsUpdate bool // true once we've seen an OnEndpointsUpdate event
 | 
			
		||||
	throttle                    flowcontrol.RateLimiter
 | 
			
		||||
 | 
			
		||||
	// These are effectively const and do not need the mutex to be held.
 | 
			
		||||
	syncPeriod     time.Duration
 | 
			
		||||
	minSyncPeriod  time.Duration
 | 
			
		||||
	iptables       utiliptables.Interface
 | 
			
		||||
	masqueradeAll  bool
 | 
			
		||||
	masqueradeMark string
 | 
			
		||||
@@ -217,7 +220,12 @@ var _ proxy.ProxyProvider = &Proxier{}
 | 
			
		||||
// An error will be returned if iptables fails to update or acquire the initial lock.
 | 
			
		||||
// Once a proxier is created, it will keep iptables up to date in the background and
 | 
			
		||||
// will not terminate if a particular iptables call fails.
 | 
			
		||||
func NewProxier(ipt utiliptables.Interface, sysctl utilsysctl.Interface, exec utilexec.Interface, syncPeriod time.Duration, masqueradeAll bool, masqueradeBit int, clusterCIDR string, hostname string, nodeIP net.IP) (*Proxier, error) {
 | 
			
		||||
func NewProxier(ipt utiliptables.Interface, sysctl utilsysctl.Interface, exec utilexec.Interface, syncPeriod time.Duration, minSyncPeriod time.Duration, masqueradeAll bool, masqueradeBit int, clusterCIDR string, hostname string, nodeIP net.IP) (*Proxier, error) {
 | 
			
		||||
	// check valid user input
 | 
			
		||||
	if minSyncPeriod == 0 || minSyncPeriod > syncPeriod {
 | 
			
		||||
		return nil, fmt.Errorf("min-sync (%v) must be < sync(%v) and > 0 ", minSyncPeriod, syncPeriod)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Set the route_localnet sysctl we need for
 | 
			
		||||
	if err := sysctl.SetSysctl(sysctlRouteLocalnet, 1); err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlRouteLocalnet, err)
 | 
			
		||||
@@ -244,11 +252,16 @@ func NewProxier(ipt utiliptables.Interface, sysctl utilsysctl.Interface, exec ut
 | 
			
		||||
 | 
			
		||||
	go healthcheck.Run()
 | 
			
		||||
 | 
			
		||||
	syncsPerSecond := float32(time.Second) / float32(minSyncPeriod)
 | 
			
		||||
 | 
			
		||||
	return &Proxier{
 | 
			
		||||
		serviceMap:     make(map[proxy.ServicePortName]*serviceInfo),
 | 
			
		||||
		endpointsMap:   make(map[proxy.ServicePortName][]*endpointsInfo),
 | 
			
		||||
		portsMap:       make(map[localPort]closeable),
 | 
			
		||||
		syncPeriod:     syncPeriod,
 | 
			
		||||
		serviceMap:    make(map[proxy.ServicePortName]*serviceInfo),
 | 
			
		||||
		endpointsMap:  make(map[proxy.ServicePortName][]*endpointsInfo),
 | 
			
		||||
		portsMap:      make(map[localPort]closeable),
 | 
			
		||||
		syncPeriod:    syncPeriod,
 | 
			
		||||
		minSyncPeriod: minSyncPeriod,
 | 
			
		||||
		// The average use case will process 2 updates in short succession
 | 
			
		||||
		throttle:       flowcontrol.NewTokenBucketRateLimiter(syncsPerSecond, 2),
 | 
			
		||||
		iptables:       ipt,
 | 
			
		||||
		masqueradeAll:  masqueradeAll,
 | 
			
		||||
		masqueradeMark: masqueradeMark,
 | 
			
		||||
@@ -765,6 +778,9 @@ func (proxier *Proxier) execConntrackTool(parameters ...string) error {
 | 
			
		||||
// The only other iptables rules are those that are setup in iptablesInit()
 | 
			
		||||
// assumes proxier.mu is held
 | 
			
		||||
func (proxier *Proxier) syncProxyRules() {
 | 
			
		||||
	if proxier.throttle != nil {
 | 
			
		||||
		proxier.throttle.Accept()
 | 
			
		||||
	}
 | 
			
		||||
	start := time.Now()
 | 
			
		||||
	defer func() {
 | 
			
		||||
		glog.V(4).Infof("syncProxyRules took %v", time.Since(start))
 | 
			
		||||
 
 | 
			
		||||
@@ -87,6 +87,7 @@ type Proxier struct {
 | 
			
		||||
	mu             sync.Mutex // protects serviceMap
 | 
			
		||||
	serviceMap     map[proxy.ServicePortName]*serviceInfo
 | 
			
		||||
	syncPeriod     time.Duration
 | 
			
		||||
	minSyncPeriod  time.Duration // unused atm, but plumbed through
 | 
			
		||||
	udpIdleTimeout time.Duration
 | 
			
		||||
	portMapMutex   sync.Mutex
 | 
			
		||||
	portMap        map[portMapKey]*portMapValue
 | 
			
		||||
@@ -139,7 +140,7 @@ func IsProxyLocked(err error) bool {
 | 
			
		||||
// if iptables fails to update or acquire the initial lock. Once a proxier is
 | 
			
		||||
// created, it will keep iptables up to date in the background and will not
 | 
			
		||||
// terminate if a particular iptables call fails.
 | 
			
		||||
func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr utilnet.PortRange, syncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
 | 
			
		||||
func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
 | 
			
		||||
	if listenIP.Equal(localhostIPv4) || listenIP.Equal(localhostIPv6) {
 | 
			
		||||
		return nil, ErrProxyOnLocalhost
 | 
			
		||||
	}
 | 
			
		||||
@@ -157,10 +158,10 @@ func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.In
 | 
			
		||||
	proxyPorts := newPortAllocator(pr)
 | 
			
		||||
 | 
			
		||||
	glog.V(2).Infof("Setting proxy IP to %v and initializing iptables", hostIP)
 | 
			
		||||
	return createProxier(loadBalancer, listenIP, iptables, hostIP, proxyPorts, syncPeriod, udpIdleTimeout)
 | 
			
		||||
	return createProxier(loadBalancer, listenIP, iptables, hostIP, proxyPorts, syncPeriod, minSyncPeriod, udpIdleTimeout)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
 | 
			
		||||
func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
 | 
			
		||||
	// convenient to pass nil for tests..
 | 
			
		||||
	if proxyPorts == nil {
 | 
			
		||||
		proxyPorts = newPortAllocator(utilnet.PortRange{})
 | 
			
		||||
@@ -175,10 +176,12 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables
 | 
			
		||||
		return nil, fmt.Errorf("failed to flush iptables: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	return &Proxier{
 | 
			
		||||
		loadBalancer:   loadBalancer,
 | 
			
		||||
		serviceMap:     make(map[proxy.ServicePortName]*serviceInfo),
 | 
			
		||||
		portMap:        make(map[portMapKey]*portMapValue),
 | 
			
		||||
		syncPeriod:     syncPeriod,
 | 
			
		||||
		loadBalancer: loadBalancer,
 | 
			
		||||
		serviceMap:   make(map[proxy.ServicePortName]*serviceInfo),
 | 
			
		||||
		portMap:      make(map[portMapKey]*portMapValue),
 | 
			
		||||
		syncPeriod:   syncPeriod,
 | 
			
		||||
		// plumbed through if needed, not used atm.
 | 
			
		||||
		minSyncPeriod:  minSyncPeriod,
 | 
			
		||||
		udpIdleTimeout: udpIdleTimeout,
 | 
			
		||||
		listenIP:       listenIP,
 | 
			
		||||
		iptables:       iptables,
 | 
			
		||||
 
 | 
			
		||||
@@ -210,7 +210,7 @@ func TestTCPProxy(t *testing.T) {
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -237,7 +237,7 @@ func TestUDPProxy(t *testing.T) {
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -264,7 +264,7 @@ func TestUDPProxyTimeout(t *testing.T) {
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -300,7 +300,7 @@ func TestMultiPortProxy(t *testing.T) {
 | 
			
		||||
		}},
 | 
			
		||||
	}})
 | 
			
		||||
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -327,7 +327,7 @@ func TestMultiPortOnServiceUpdate(t *testing.T) {
 | 
			
		||||
	serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"}
 | 
			
		||||
	serviceX := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "x"}
 | 
			
		||||
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -390,7 +390,7 @@ func TestTCPProxyStop(t *testing.T) {
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -434,7 +434,7 @@ func TestUDPProxyStop(t *testing.T) {
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -472,7 +472,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -509,7 +509,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -545,7 +545,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
	lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
 | 
			
		||||
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -598,7 +598,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
	lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
 | 
			
		||||
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -652,7 +652,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -700,7 +700,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -745,7 +745,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -796,7 +796,7 @@ func TestProxyUpdatePortal(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
	lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
 | 
			
		||||
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							@@ -44,6 +44,9 @@ type KubeProxyConfiguration struct {
 | 
			
		||||
	// iptablesSyncPeriod is the period that iptables rules are refreshed (e.g. '5s', '1m',
 | 
			
		||||
	// '2h22m').  Must be greater than 0.
 | 
			
		||||
	IPTablesSyncPeriod unversioned.Duration `json:"iptablesSyncPeriodSeconds"`
 | 
			
		||||
	// iptablesMinSyncPeriod is the minimum period that iptables rules are refreshed (e.g. '5s', '1m',
 | 
			
		||||
	// '2h22m').  Must be greater than 0.
 | 
			
		||||
	IPTablesMinSyncPeriod unversioned.Duration `json:"iptablesMinSyncPeriodSeconds"`
 | 
			
		||||
	// kubeconfigPath is the path to the kubeconfig file with authorization information (the
 | 
			
		||||
	// master location is set by the master flag).
 | 
			
		||||
	KubeconfigPath string `json:"kubeconfigPath"`
 | 
			
		||||
 
 | 
			
		||||
@@ -80,6 +80,9 @@ func SetDefaults_KubeProxyConfiguration(obj *KubeProxyConfiguration) {
 | 
			
		||||
	if obj.IPTablesSyncPeriod.Duration == 0 {
 | 
			
		||||
		obj.IPTablesSyncPeriod = unversioned.Duration{Duration: 30 * time.Second}
 | 
			
		||||
	}
 | 
			
		||||
	if obj.IPTablesMinSyncPeriod.Duration == 0 {
 | 
			
		||||
		obj.IPTablesMinSyncPeriod = unversioned.Duration{Duration: 2 * time.Second}
 | 
			
		||||
	}
 | 
			
		||||
	zero := unversioned.Duration{}
 | 
			
		||||
	if obj.UDPIdleTimeout == zero {
 | 
			
		||||
		obj.UDPIdleTimeout = unversioned.Duration{Duration: 250 * time.Millisecond}
 | 
			
		||||
 
 | 
			
		||||
@@ -41,6 +41,9 @@ type KubeProxyConfiguration struct {
 | 
			
		||||
	// iptablesSyncPeriod is the period that iptables rules are refreshed (e.g. '5s', '1m',
 | 
			
		||||
	// '2h22m').  Must be greater than 0.
 | 
			
		||||
	IPTablesSyncPeriod unversioned.Duration `json:"iptablesSyncPeriodSeconds"`
 | 
			
		||||
	// iptablesMinSyncPeriod is the minimum period that iptables rules are refreshed (e.g. '5s', '1m',
 | 
			
		||||
	// '2h22m').  Must be greater than 0.
 | 
			
		||||
	IPTablesMinSyncPeriod unversioned.Duration `json:"iptablesMinSyncPeriodSeconds"`
 | 
			
		||||
	// kubeconfigPath is the path to the kubeconfig file with authorization information (the
 | 
			
		||||
	// master location is set by the master flag).
 | 
			
		||||
	KubeconfigPath string `json:"kubeconfigPath"`
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										2
									
								
								vendor/BUILD
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										2
									
								
								vendor/BUILD
									
									
									
									
										vendored
									
									
								
							@@ -9870,13 +9870,11 @@ go_library(
 | 
			
		||||
        "k8s.io/client-go/pkg/apis/componentconfig/doc.go",
 | 
			
		||||
        "k8s.io/client-go/pkg/apis/componentconfig/helpers.go",
 | 
			
		||||
        "k8s.io/client-go/pkg/apis/componentconfig/register.go",
 | 
			
		||||
        "k8s.io/client-go/pkg/apis/componentconfig/types.generated.go",
 | 
			
		||||
        "k8s.io/client-go/pkg/apis/componentconfig/types.go",
 | 
			
		||||
        "k8s.io/client-go/pkg/apis/componentconfig/zz_generated.deepcopy.go",
 | 
			
		||||
    ],
 | 
			
		||||
    tags = ["automanaged"],
 | 
			
		||||
    deps = [
 | 
			
		||||
        "//vendor:github.com/ugorji/go/codec",
 | 
			
		||||
        "//vendor:k8s.io/client-go/pkg/api/unversioned",
 | 
			
		||||
        "//vendor:k8s.io/client-go/pkg/conversion",
 | 
			
		||||
        "//vendor:k8s.io/client-go/pkg/runtime",
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user