Merge pull request #82091 from khenidak/ipvs-dualstack
dualstack: IPVS proxier
This commit is contained in:
@@ -79,6 +79,7 @@ go_library(
|
||||
"//pkg/util/node:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
|
||||
"//vendor/k8s.io/utils/net:go_default_library",
|
||||
],
|
||||
"@io_bazel_rules_go//go/platform:darwin": [
|
||||
"//pkg/proxy/metrics:go_default_library",
|
||||
@@ -86,6 +87,7 @@ go_library(
|
||||
"//pkg/util/node:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
|
||||
"//vendor/k8s.io/utils/net:go_default_library",
|
||||
],
|
||||
"@io_bazel_rules_go//go/platform:dragonfly": [
|
||||
"//pkg/proxy/metrics:go_default_library",
|
||||
@@ -93,6 +95,7 @@ go_library(
|
||||
"//pkg/util/node:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
|
||||
"//vendor/k8s.io/utils/net:go_default_library",
|
||||
],
|
||||
"@io_bazel_rules_go//go/platform:freebsd": [
|
||||
"//pkg/proxy/metrics:go_default_library",
|
||||
@@ -100,6 +103,7 @@ go_library(
|
||||
"//pkg/util/node:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
|
||||
"//vendor/k8s.io/utils/net:go_default_library",
|
||||
],
|
||||
"@io_bazel_rules_go//go/platform:linux": [
|
||||
"//pkg/proxy/metrics:go_default_library",
|
||||
@@ -107,6 +111,7 @@ go_library(
|
||||
"//pkg/util/node:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
|
||||
"//vendor/k8s.io/utils/net:go_default_library",
|
||||
],
|
||||
"@io_bazel_rules_go//go/platform:nacl": [
|
||||
"//pkg/proxy/metrics:go_default_library",
|
||||
@@ -114,6 +119,7 @@ go_library(
|
||||
"//pkg/util/node:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
|
||||
"//vendor/k8s.io/utils/net:go_default_library",
|
||||
],
|
||||
"@io_bazel_rules_go//go/platform:netbsd": [
|
||||
"//pkg/proxy/metrics:go_default_library",
|
||||
@@ -121,6 +127,7 @@ go_library(
|
||||
"//pkg/util/node:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
|
||||
"//vendor/k8s.io/utils/net:go_default_library",
|
||||
],
|
||||
"@io_bazel_rules_go//go/platform:openbsd": [
|
||||
"//pkg/proxy/metrics:go_default_library",
|
||||
@@ -128,6 +135,7 @@ go_library(
|
||||
"//pkg/util/node:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
|
||||
"//vendor/k8s.io/utils/net:go_default_library",
|
||||
],
|
||||
"@io_bazel_rules_go//go/platform:plan9": [
|
||||
"//pkg/proxy/metrics:go_default_library",
|
||||
@@ -135,6 +143,7 @@ go_library(
|
||||
"//pkg/util/node:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
|
||||
"//vendor/k8s.io/utils/net:go_default_library",
|
||||
],
|
||||
"@io_bazel_rules_go//go/platform:solaris": [
|
||||
"//pkg/proxy/metrics:go_default_library",
|
||||
@@ -142,6 +151,7 @@ go_library(
|
||||
"//pkg/util/node:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
|
||||
"//vendor/k8s.io/utils/net:go_default_library",
|
||||
],
|
||||
"@io_bazel_rules_go//go/platform:windows": [
|
||||
"//pkg/proxy/winkernel:go_default_library",
|
||||
|
@@ -24,12 +24,15 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"strings"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/proxy"
|
||||
proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config"
|
||||
proxyconfigscheme "k8s.io/kubernetes/pkg/proxy/apis/config/scheme"
|
||||
@@ -46,6 +49,7 @@ import (
|
||||
utilnode "k8s.io/kubernetes/pkg/util/node"
|
||||
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
|
||||
"k8s.io/utils/exec"
|
||||
utilsnet "k8s.io/utils/net"
|
||||
|
||||
"k8s.io/klog"
|
||||
)
|
||||
@@ -170,6 +174,40 @@ func newProxyServer(
|
||||
metrics.RegisterMetrics()
|
||||
} else if proxyMode == proxyModeIPVS {
|
||||
klog.V(0).Info("Using ipvs Proxier.")
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
|
||||
klog.V(0).Info("creating dualStackProxier for ipvs.")
|
||||
|
||||
// Create iptables handlers for both families, one is already created
|
||||
var ipt [2]utiliptables.Interface
|
||||
if iptInterface.IsIpv6() {
|
||||
ipt[1] = iptInterface
|
||||
ipt[0] = utiliptables.New(execer, dbus, utiliptables.ProtocolIpv4)
|
||||
} else {
|
||||
ipt[0] = iptInterface
|
||||
ipt[1] = utiliptables.New(execer, dbus, utiliptables.ProtocolIpv6)
|
||||
}
|
||||
|
||||
proxier, err = ipvs.NewDualStackProxier(
|
||||
ipt,
|
||||
ipvsInterface,
|
||||
ipsetInterface,
|
||||
utilsysctl.New(),
|
||||
execer,
|
||||
config.IPVS.SyncPeriod.Duration,
|
||||
config.IPVS.MinSyncPeriod.Duration,
|
||||
config.IPVS.ExcludeCIDRs,
|
||||
config.IPVS.StrictARP,
|
||||
config.IPTables.MasqueradeAll,
|
||||
int(*config.IPTables.MasqueradeBit),
|
||||
cidrTuple(config.ClusterCIDR),
|
||||
hostname,
|
||||
nodeIPTuple(config.BindAddress),
|
||||
recorder,
|
||||
healthzServer,
|
||||
config.IPVS.Scheduler,
|
||||
config.NodePortAddresses,
|
||||
)
|
||||
} else {
|
||||
proxier, err = ipvs.NewProxier(
|
||||
iptInterface,
|
||||
ipvsInterface,
|
||||
@@ -190,6 +228,7 @@ func newProxyServer(
|
||||
config.IPVS.Scheduler,
|
||||
config.NodePortAddresses,
|
||||
)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to create proxier: %v", err)
|
||||
}
|
||||
@@ -238,6 +277,46 @@ func newProxyServer(
|
||||
}, nil
|
||||
}
|
||||
|
||||
// cidrTuple takes a comma separated list of CIDRs and return a tuple (ipv4cidr,ipv6cidr)
|
||||
// The returned tuple is guaranteed to have the order (ipv4,ipv6) and if no cidr from a family is found an
|
||||
// empty string "" is inserted.
|
||||
func cidrTuple(cidrList string) [2]string {
|
||||
cidrs := [2]string{"", ""}
|
||||
foundIPv4 := false
|
||||
foundIPv6 := false
|
||||
|
||||
for _, cidr := range strings.Split(cidrList, ",") {
|
||||
if utilsnet.IsIPv6CIDRString(cidr) && !foundIPv6 {
|
||||
cidrs[1] = cidr
|
||||
foundIPv6 = true
|
||||
} else if !foundIPv4 {
|
||||
cidrs[0] = cidr
|
||||
foundIPv4 = true
|
||||
}
|
||||
if foundIPv6 && foundIPv4 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return cidrs
|
||||
}
|
||||
|
||||
// nodeIPTuple takes an addresses and return a tuple (ipv4,ipv6)
|
||||
// The returned tuple is guaranteed to have the order (ipv4,ipv6). The address NOT of the passed address
|
||||
// will have "any" address (0.0.0.0 or ::) inserted.
|
||||
func nodeIPTuple(bindAddress string) [2]net.IP {
|
||||
nodes := [2]net.IP{net.IPv4zero, net.IPv6zero}
|
||||
|
||||
adr := net.ParseIP(bindAddress)
|
||||
if utilsnet.IsIPv6(adr) {
|
||||
nodes[1] = adr
|
||||
} else {
|
||||
nodes[0] = adr
|
||||
}
|
||||
|
||||
return nodes
|
||||
}
|
||||
|
||||
func getProxyMode(proxyMode string, khandle ipvs.KernelHandler, ipsetver ipvs.IPSetVersioner, kcompat iptables.KernelCompatTester) string {
|
||||
switch proxyMode {
|
||||
case proxyModeUserspace:
|
||||
|
@@ -43,10 +43,12 @@ go_library(
|
||||
srcs = [
|
||||
"graceful_termination.go",
|
||||
"ipset.go",
|
||||
"meta_proxier.go",
|
||||
"netlink.go",
|
||||
"netlink_linux.go",
|
||||
"netlink_unsupported.go",
|
||||
"proxier.go",
|
||||
"safe_ipset.go",
|
||||
],
|
||||
importpath = "k8s.io/kubernetes/pkg/proxy/ipvs",
|
||||
deps = [
|
||||
|
@@ -23,6 +23,7 @@ import (
|
||||
|
||||
"fmt"
|
||||
"k8s.io/klog"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -102,6 +103,20 @@ func NewIPSet(handle utilipset.Interface, name string, setType utilipset.Type, i
|
||||
hashFamily := utilipset.ProtocolFamilyIPV4
|
||||
if isIPv6 {
|
||||
hashFamily = utilipset.ProtocolFamilyIPV6
|
||||
// In dual-stack both ipv4 and ipv6 ipset's can co-exist. To
|
||||
// ensure unique names the prefix for ipv6 is changed from
|
||||
// "KUBE-" to "KUBE-6-". The "KUBE-" prefix is kept for
|
||||
// backward compatibility. The maximum name length of an ipset
|
||||
// is 31 characters which must be taken into account. The
|
||||
// ipv4 names are not altered to minimize the risk for
|
||||
// problems on upgrades.
|
||||
if strings.HasPrefix(name, "KUBE-") {
|
||||
name = strings.Replace(name, "KUBE-", "KUBE-6-", 1)
|
||||
if len(name) > 31 {
|
||||
klog.Warningf("ipset name truncated; [%s] -> [%s]", name, name[:31])
|
||||
name = name[:31]
|
||||
}
|
||||
}
|
||||
}
|
||||
set := &IPSet{
|
||||
IPSet: utilipset.IPSet{
|
||||
|
203
pkg/proxy/ipvs/meta_proxier.go
Normal file
203
pkg/proxy/ipvs/meta_proxier.go
Normal file
@@ -0,0 +1,203 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package ipvs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/klog"
|
||||
"k8s.io/kubernetes/pkg/proxy"
|
||||
utilnet "k8s.io/utils/net"
|
||||
|
||||
discovery "k8s.io/api/discovery/v1alpha1"
|
||||
)
|
||||
|
||||
type metaProxier struct {
|
||||
ipv4Proxier proxy.Provider
|
||||
ipv6Proxier proxy.Provider
|
||||
}
|
||||
|
||||
// NewMetaProxier returns a dual-stack "meta-proxier". Proxier API
|
||||
// calls will be dispatched to the ProxyProvider instances depending
|
||||
// on address family.
|
||||
func NewMetaProxier(ipv4Proxier, ipv6Proxier proxy.Provider) proxy.Provider {
|
||||
return proxy.Provider(&metaProxier{
|
||||
ipv4Proxier: ipv4Proxier,
|
||||
ipv6Proxier: ipv6Proxier,
|
||||
})
|
||||
}
|
||||
|
||||
// Sync immediately synchronizes the ProxyProvider's current state to
|
||||
// proxy rules.
|
||||
func (proxier *metaProxier) Sync() {
|
||||
proxier.ipv4Proxier.Sync()
|
||||
proxier.ipv6Proxier.Sync()
|
||||
}
|
||||
|
||||
// SyncLoop runs periodic work. This is expected to run as a
|
||||
// goroutine or as the main loop of the app. It does not return.
|
||||
func (proxier *metaProxier) SyncLoop() {
|
||||
go proxier.ipv6Proxier.SyncLoop() // Use go-routine here!
|
||||
proxier.ipv4Proxier.SyncLoop() // never returns
|
||||
}
|
||||
|
||||
// OnServiceAdd is called whenever creation of new service object is observed.
|
||||
func (proxier *metaProxier) OnServiceAdd(service *v1.Service) {
|
||||
if *(service.Spec.IPFamily) == v1.IPv4Protocol {
|
||||
proxier.ipv4Proxier.OnServiceAdd(service)
|
||||
return
|
||||
}
|
||||
proxier.ipv6Proxier.OnServiceAdd(service)
|
||||
}
|
||||
|
||||
// OnServiceUpdate is called whenever modification of an existing
|
||||
// service object is observed.
|
||||
func (proxier *metaProxier) OnServiceUpdate(oldService, service *v1.Service) {
|
||||
// IPFamily is immutable, hence we only need to check on the new service
|
||||
if *(service.Spec.IPFamily) == v1.IPv4Protocol {
|
||||
proxier.ipv4Proxier.OnServiceUpdate(oldService, service)
|
||||
return
|
||||
}
|
||||
|
||||
proxier.ipv6Proxier.OnServiceUpdate(oldService, service)
|
||||
}
|
||||
|
||||
// OnServiceDelete is called whenever deletion of an existing service
|
||||
// object is observed.
|
||||
func (proxier *metaProxier) OnServiceDelete(service *v1.Service) {
|
||||
if *(service.Spec.IPFamily) == v1.IPv4Protocol {
|
||||
proxier.ipv4Proxier.OnServiceDelete(service)
|
||||
return
|
||||
}
|
||||
proxier.ipv6Proxier.OnServiceDelete(service)
|
||||
}
|
||||
|
||||
// OnServiceSynced is called once all the initial event handlers were
|
||||
// called and the state is fully propagated to local cache.
|
||||
func (proxier *metaProxier) OnServiceSynced() {
|
||||
proxier.ipv4Proxier.OnServiceSynced()
|
||||
proxier.ipv6Proxier.OnServiceSynced()
|
||||
}
|
||||
|
||||
// OnEndpointsAdd is called whenever creation of new endpoints object
|
||||
// is observed.
|
||||
func (proxier *metaProxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
|
||||
ipFamily, err := endpointsIPFamily(endpoints)
|
||||
if err != nil {
|
||||
klog.Warningf("failed to add endpoints %s/%s with error %v", endpoints.ObjectMeta.Namespace, endpoints.ObjectMeta.Name, err)
|
||||
return
|
||||
}
|
||||
if *ipFamily == v1.IPv4Protocol {
|
||||
proxier.ipv4Proxier.OnEndpointsAdd(endpoints)
|
||||
return
|
||||
}
|
||||
proxier.ipv6Proxier.OnEndpointsAdd(endpoints)
|
||||
}
|
||||
|
||||
// OnEndpointsUpdate is called whenever modification of an existing
|
||||
// endpoints object is observed.
|
||||
func (proxier *metaProxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
|
||||
ipFamily, err := endpointsIPFamily(endpoints)
|
||||
if err != nil {
|
||||
klog.Warningf("failed to update endpoints %s/%s with error %v", endpoints.ObjectMeta.Namespace, endpoints.ObjectMeta.Name, err)
|
||||
return
|
||||
}
|
||||
|
||||
if *ipFamily == v1.IPv4Protocol {
|
||||
proxier.ipv4Proxier.OnEndpointsUpdate(oldEndpoints, endpoints)
|
||||
return
|
||||
}
|
||||
proxier.ipv6Proxier.OnEndpointsUpdate(oldEndpoints, endpoints)
|
||||
}
|
||||
|
||||
// OnEndpointsDelete is called whenever deletion of an existing
|
||||
// endpoints object is observed.
|
||||
func (proxier *metaProxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
|
||||
ipFamily, err := endpointsIPFamily(endpoints)
|
||||
if err != nil {
|
||||
klog.Warningf("failed to delete endpoints %s/%s with error %v", endpoints.ObjectMeta.Namespace, endpoints.ObjectMeta.Name, err)
|
||||
return
|
||||
}
|
||||
|
||||
if *ipFamily == v1.IPv4Protocol {
|
||||
proxier.ipv4Proxier.OnEndpointsDelete(endpoints)
|
||||
return
|
||||
}
|
||||
proxier.ipv6Proxier.OnEndpointsDelete(endpoints)
|
||||
}
|
||||
|
||||
// OnEndpointsSynced is called once all the initial event handlers
|
||||
// were called and the state is fully propagated to local cache.
|
||||
func (proxier *metaProxier) OnEndpointsSynced() {
|
||||
proxier.ipv4Proxier.OnEndpointsSynced()
|
||||
proxier.ipv6Proxier.OnEndpointsSynced()
|
||||
}
|
||||
|
||||
// TODO: (khenidak) implement EndpointSlice handling
|
||||
|
||||
// OnEndpointSliceAdd is called whenever creation of a new endpoint slice object
|
||||
// is observed.
|
||||
func (proxier *metaProxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {
|
||||
// noop
|
||||
}
|
||||
|
||||
// OnEndpointSliceUpdate is called whenever modification of an existing endpoint
|
||||
// slice object is observed.
|
||||
func (proxier *metaProxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) {
|
||||
//noop
|
||||
}
|
||||
|
||||
// OnEndpointSliceDelete is called whenever deletion of an existing endpoint slice
|
||||
// object is observed.
|
||||
func (proxier *metaProxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {
|
||||
//noop
|
||||
}
|
||||
|
||||
// OnEndpointSlicesSynced is called once all the initial event handlers were
|
||||
// called and the state is fully propagated to local cache.
|
||||
func (proxier *metaProxier) OnEndpointSlicesSynced() {
|
||||
//noop
|
||||
}
|
||||
|
||||
// endpointsIPFamily that returns IPFamily of endpoints or error if
|
||||
// failed to identify the IP family.
|
||||
func endpointsIPFamily(endpoints *v1.Endpoints) (*v1.IPFamily, error) {
|
||||
if len(endpoints.Subsets) == 0 {
|
||||
return nil, fmt.Errorf("failed to identify ipfamily for endpoints (no subsets)")
|
||||
}
|
||||
|
||||
// we only need to work with subset [0],endpoint controller
|
||||
// ensures that endpoints selected are of the same family.
|
||||
subset := endpoints.Subsets[0]
|
||||
if len(subset.Addresses) == 0 {
|
||||
return nil, fmt.Errorf("failed to identify ipfamily for endpoints (no addresses)")
|
||||
}
|
||||
// same apply on addresses
|
||||
address := subset.Addresses[0]
|
||||
if len(address.IP) == 0 {
|
||||
return nil, fmt.Errorf("failed to identify ipfamily for endpoints (address has no ip)")
|
||||
}
|
||||
|
||||
ipv4 := v1.IPv4Protocol
|
||||
ipv6 := v1.IPv6Protocol
|
||||
if utilnet.IsIPv6String(address.IP) {
|
||||
return &ipv6, nil
|
||||
}
|
||||
|
||||
return &ipv4, nil
|
||||
}
|
@@ -471,6 +471,62 @@ func NewProxier(ipt utiliptables.Interface,
|
||||
return proxier, nil
|
||||
}
|
||||
|
||||
// NewDualStackProxier returns a new Proxier for dual-stack operation
|
||||
func NewDualStackProxier(
|
||||
ipt [2]utiliptables.Interface,
|
||||
ipvs utilipvs.Interface,
|
||||
ipset utilipset.Interface,
|
||||
sysctl utilsysctl.Interface,
|
||||
exec utilexec.Interface,
|
||||
syncPeriod time.Duration,
|
||||
minSyncPeriod time.Duration,
|
||||
excludeCIDRs []string,
|
||||
strictARP bool,
|
||||
masqueradeAll bool,
|
||||
masqueradeBit int,
|
||||
clusterCIDR [2]string,
|
||||
hostname string,
|
||||
nodeIP [2]net.IP,
|
||||
recorder record.EventRecorder,
|
||||
healthzServer healthcheck.HealthzUpdater,
|
||||
scheduler string,
|
||||
nodePortAddresses []string,
|
||||
) (proxy.Provider, error) {
|
||||
|
||||
safeIpset := newSafeIpset(ipset)
|
||||
|
||||
// Create an ipv4 instance of the single-stack proxier
|
||||
ipv4Proxier, err := NewProxier(ipt[0], ipvs, safeIpset, sysctl,
|
||||
exec, syncPeriod, minSyncPeriod, filterCIDRs(false, excludeCIDRs), strictARP,
|
||||
masqueradeAll, masqueradeBit, clusterCIDR[0], hostname, nodeIP[0],
|
||||
recorder, healthzServer, scheduler, nodePortAddresses)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err)
|
||||
}
|
||||
|
||||
ipv6Proxier, err := NewProxier(ipt[1], ipvs, safeIpset, sysctl,
|
||||
exec, syncPeriod, minSyncPeriod, filterCIDRs(true, excludeCIDRs), strictARP,
|
||||
masqueradeAll, masqueradeBit, clusterCIDR[1], hostname, nodeIP[1],
|
||||
nil, nil, scheduler, nodePortAddresses)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to create ipv6 proxier: %v", err)
|
||||
}
|
||||
|
||||
// Return a meta-proxier that dispatch calls between the two
|
||||
// single-stack proxier instances
|
||||
return NewMetaProxier(ipv4Proxier, ipv6Proxier), nil
|
||||
}
|
||||
|
||||
func filterCIDRs(wantIPv6 bool, cidrs []string) []string {
|
||||
var filteredCIDRs []string
|
||||
for _, cidr := range cidrs {
|
||||
if utilnet.IsIPv6CIDRString(cidr) == wantIPv6 {
|
||||
filteredCIDRs = append(filteredCIDRs, cidr)
|
||||
}
|
||||
}
|
||||
return filteredCIDRs
|
||||
}
|
||||
|
||||
// internal struct for string service information
|
||||
type serviceInfo struct {
|
||||
*proxy.BaseServiceInfo
|
||||
@@ -1478,7 +1534,7 @@ func (proxier *Proxier) writeIptablesRules() {
|
||||
}
|
||||
args = append(args,
|
||||
"-m", "comment", "--comment", proxier.ipsetList[set.name].getComment(),
|
||||
"-m", "set", "--match-set", set.name,
|
||||
"-m", "set", "--match-set", proxier.ipsetList[set.name].Name,
|
||||
set.matchType,
|
||||
)
|
||||
writeLine(proxier.natRules, append(args, "-j", set.to)...)
|
||||
@@ -1489,7 +1545,7 @@ func (proxier *Proxier) writeIptablesRules() {
|
||||
args = append(args[:0],
|
||||
"-A", string(kubeServicesChain),
|
||||
"-m", "comment", "--comment", proxier.ipsetList[kubeClusterIPSet].getComment(),
|
||||
"-m", "set", "--match-set", kubeClusterIPSet,
|
||||
"-m", "set", "--match-set", proxier.ipsetList[kubeClusterIPSet].Name,
|
||||
)
|
||||
if proxier.masqueradeAll {
|
||||
writeLine(proxier.natRules, append(args, "dst,dst", "-j", string(KubeMarkMasqChain))...)
|
||||
@@ -1517,7 +1573,7 @@ func (proxier *Proxier) writeIptablesRules() {
|
||||
args = append(args[:0],
|
||||
"-A", string(kubeServicesChain),
|
||||
"-m", "comment", "--comment", proxier.ipsetList[kubeExternalIPSet].getComment(),
|
||||
"-m", "set", "--match-set", kubeExternalIPSet,
|
||||
"-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPSet].Name,
|
||||
"dst,dst",
|
||||
)
|
||||
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
|
||||
@@ -1611,7 +1667,7 @@ func (proxier *Proxier) acceptIPVSTraffic() {
|
||||
}
|
||||
writeLine(proxier.natRules, []string{
|
||||
"-A", string(kubeServicesChain),
|
||||
"-m", "set", "--match-set", set, matchType,
|
||||
"-m", "set", "--match-set", proxier.ipsetList[set].Name, matchType,
|
||||
"-j", "ACCEPT",
|
||||
}...)
|
||||
}
|
||||
@@ -1845,11 +1901,16 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
|
||||
}
|
||||
|
||||
func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, currentServices map[string]*utilipvs.VirtualServer, legacyBindAddrs map[string]bool) {
|
||||
isIPv6 := utilnet.IsIPv6(proxier.nodeIP)
|
||||
for cs := range currentServices {
|
||||
svc := currentServices[cs]
|
||||
if proxier.isIPInExcludeCIDRs(svc.Address) {
|
||||
continue
|
||||
}
|
||||
if utilnet.IsIPv6(svc.Address) != isIPv6 {
|
||||
// Not our family
|
||||
continue
|
||||
}
|
||||
if _, ok := activeServices[cs]; !ok {
|
||||
klog.V(4).Infof("Delete service %s", svc.String())
|
||||
if err := proxier.ipvs.DeleteVirtualServer(svc); err != nil {
|
||||
|
@@ -3716,3 +3716,32 @@ func TestEndpointSliceE2E(t *testing.T) {
|
||||
assert.Nil(t, rsErr2, "Expected no error getting real servers")
|
||||
assert.Len(t, realServers2, 0, "Expected 0 real servers")
|
||||
}
|
||||
func TestFilterCIDRs(t *testing.T) {
|
||||
var cidrList []string
|
||||
var cidrs []string
|
||||
var expected []string
|
||||
cidrs = filterCIDRs(true, []string{})
|
||||
if len(cidrs) > 0 {
|
||||
t.Errorf("An empty list produces a non-empty return %v", cidrs)
|
||||
}
|
||||
|
||||
cidrList = []string{"1000::/64", "10.0.0.0/16", "11.0.0.0/16", "2000::/64"}
|
||||
expected = []string{"1000::/64", "2000::/64"}
|
||||
cidrs = filterCIDRs(true, cidrList)
|
||||
if !reflect.DeepEqual(cidrs, expected) {
|
||||
t.Errorf("cidrs %v is not expected %v", cidrs, expected)
|
||||
}
|
||||
|
||||
expected = []string{"10.0.0.0/16", "11.0.0.0/16"}
|
||||
cidrs = filterCIDRs(false, cidrList)
|
||||
if !reflect.DeepEqual(cidrs, expected) {
|
||||
t.Errorf("cidrs %v is not expected %v", cidrs, expected)
|
||||
}
|
||||
|
||||
cidrList = []string{"1000::/64", "2000::/64"}
|
||||
expected = []string{}
|
||||
cidrs = filterCIDRs(false, cidrList)
|
||||
if len(cidrs) > 0 {
|
||||
t.Errorf("cidrs %v is not expected %v", cidrs, expected)
|
||||
}
|
||||
}
|
||||
|
104
pkg/proxy/ipvs/safe_ipset.go
Normal file
104
pkg/proxy/ipvs/safe_ipset.go
Normal file
@@ -0,0 +1,104 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package ipvs
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"k8s.io/kubernetes/pkg/util/ipset"
|
||||
)
|
||||
|
||||
type safeIpset struct {
|
||||
ipset ipset.Interface
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func newSafeIpset(ipset ipset.Interface) ipset.Interface {
|
||||
return &safeIpset{
|
||||
ipset: ipset,
|
||||
}
|
||||
}
|
||||
|
||||
// FlushSet deletes all entries from a named set.
|
||||
func (s *safeIpset) FlushSet(set string) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.ipset.FlushSet(set)
|
||||
}
|
||||
|
||||
// DestroySet deletes a named set.
|
||||
func (s *safeIpset) DestroySet(set string) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.ipset.DestroySet(set)
|
||||
}
|
||||
|
||||
// DestroyAllSets deletes all sets.
|
||||
func (s *safeIpset) DestroyAllSets() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.ipset.DestroyAllSets()
|
||||
}
|
||||
|
||||
// CreateSet creates a new set. It will ignore error when the set already exists if ignoreExistErr=true.
|
||||
func (s *safeIpset) CreateSet(set *ipset.IPSet, ignoreExistErr bool) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.ipset.CreateSet(set, ignoreExistErr)
|
||||
}
|
||||
|
||||
// AddEntry adds a new entry to the named set. It will ignore error when the entry already exists if ignoreExistErr=true.
|
||||
func (s *safeIpset) AddEntry(entry string, set *ipset.IPSet, ignoreExistErr bool) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.ipset.AddEntry(entry, set, ignoreExistErr)
|
||||
}
|
||||
|
||||
// DelEntry deletes one entry from the named set
|
||||
func (s *safeIpset) DelEntry(entry string, set string) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.ipset.DelEntry(entry, set)
|
||||
}
|
||||
|
||||
// Test test if an entry exists in the named set
|
||||
func (s *safeIpset) TestEntry(entry string, set string) (bool, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.ipset.TestEntry(entry, set)
|
||||
}
|
||||
|
||||
// ListEntries lists all the entries from a named set
|
||||
func (s *safeIpset) ListEntries(set string) ([]string, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.ipset.ListEntries(set)
|
||||
}
|
||||
|
||||
// ListSets list all set names from kernel
|
||||
func (s *safeIpset) ListSets() ([]string, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.ipset.ListSets()
|
||||
}
|
||||
|
||||
// GetVersion returns the "X.Y" version string for ipset.
|
||||
func (s *safeIpset) GetVersion() (string, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.ipset.GetVersion()
|
||||
}
|
Reference in New Issue
Block a user