Phase 2: service and endpoint processing

This commit is contained in:
Khaled Henidak(Kal)
2019-08-19 20:45:22 +00:00
parent 5e8ccda71c
commit 93c06821e6
21 changed files with 1656 additions and 247 deletions

View File

@@ -56,10 +56,16 @@ type ServerRunOptions struct {
KubeletConfig kubeletclient.KubeletClientConfig
KubernetesServiceNodePort int
MaxConnectionBytesPerSec int64
ServiceClusterIPRange net.IPNet // TODO: make this a list
ServiceNodePortRange utilnet.PortRange
SSHKeyfile string
SSHUser string
// ServiceClusterIPRange is mapped to input provided by user
ServiceClusterIPRanges string
//PrimaryServiceClusterIPRange and SecondaryServiceClusterIPRange are the results
// of parsing ServiceClusterIPRange into actual values
PrimaryServiceClusterIPRange net.IPNet
SecondaryServiceClusterIPRange net.IPNet
ServiceNodePortRange utilnet.PortRange
SSHKeyfile string
SSHUser string
ProxyClientCertFile string
ProxyClientKeyFile string
@@ -114,7 +120,7 @@ func NewServerRunOptions() *ServerRunOptions {
},
ServiceNodePortRange: kubeoptions.DefaultServiceNodePortRange,
}
s.ServiceClusterIPRange = kubeoptions.DefaultServiceIPCIDR
s.ServiceClusterIPRanges = kubeoptions.DefaultServiceIPCIDR.String()
// Overwrite the default for storage data format.
s.Etcd.DefaultStorageMediaType = "application/vnd.kubernetes.protobuf"
@@ -179,7 +185,8 @@ func (s *ServerRunOptions) Flags() (fss cliflag.NamedFlagSets) {
"of type NodePort, using this as the value of the port. If zero, the Kubernetes master "+
"service will be of type ClusterIP.")
fs.IPNetVar(&s.ServiceClusterIPRange, "service-cluster-ip-range", s.ServiceClusterIPRange, ""+
// TODO (khenidak) change documentation as we move IPv6DualStack feature from ALPHA to BETA
fs.StringVar(&s.ServiceClusterIPRanges, "service-cluster-ip-range", s.ServiceClusterIPRanges, ""+
"A CIDR notation IP range from which to assign service cluster IPs. This must not "+
"overlap with any IP ranges assigned to nodes for pods.")

View File

@@ -118,7 +118,7 @@ func TestAddFlags(t *testing.T) {
// This is a snapshot of expected options parsed by args.
expected := &ServerRunOptions{
ServiceNodePortRange: kubeoptions.DefaultServiceNodePortRange,
ServiceClusterIPRange: kubeoptions.DefaultServiceIPCIDR,
ServiceClusterIPRanges: kubeoptions.DefaultServiceIPCIDR.String(),
MasterCount: 5,
EndpointReconcilerType: string(reconcilers.LeaseEndpointReconcilerType),
AllowPrivileged: false,

View File

@@ -19,26 +19,69 @@ package options
import (
"errors"
"fmt"
"net"
"strings"
apiextensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
utilfeature "k8s.io/apiserver/pkg/util/feature"
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/features"
netutils "k8s.io/utils/net"
)
// TODO: Longer term we should read this from some config store, rather than a flag.
// validateClusterIPFlags is expected to be called after Complete()
func validateClusterIPFlags(options *ServerRunOptions) []error {
var errs []error
if options.ServiceClusterIPRange.IP == nil {
errs = append(errs, errors.New("no --service-cluster-ip-range specified"))
// validate that primary has been processed by user provided values or it has been defaulted
if options.PrimaryServiceClusterIPRange.IP == nil {
errs = append(errs, errors.New("--service-cluster-ip-range must contain at least one valid cidr"))
}
var ones, bits = options.ServiceClusterIPRange.Mask.Size()
serviceClusterIPRangeList := strings.Split(options.ServiceClusterIPRanges, ",")
if len(serviceClusterIPRangeList) > 2 {
errs = append(errs, errors.New("--service-cluster-ip-range must not contain more than two entries"))
}
// Complete() expected to have set Primary* and Secondary*
// primary CIDR validation
var ones, bits = options.PrimaryServiceClusterIPRange.Mask.Size()
if bits-ones > 20 {
errs = append(errs, errors.New("specified --service-cluster-ip-range is too large"))
}
// Secondary IP validation
secondaryServiceClusterIPRangeUsed := (options.SecondaryServiceClusterIPRange.IP != nil)
if secondaryServiceClusterIPRangeUsed && !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
errs = append(errs, fmt.Errorf("--secondary-service-cluster-ip-range can only be used if %v feature is enabled", string(features.IPv6DualStack)))
}
// note: While the cluster might be dualstack (i.e. pods with multiple IPs), the user may choose
// to only ingress traffic within and into the cluster on one IP family only. this family is decided
// by the range set on --service-cluster-ip-range. If/when the user decides to use dual stack services
// the Secondary* must be of different IPFamily than --service-cluster-ip-range
if secondaryServiceClusterIPRangeUsed {
// Should be dualstack IPFamily(PrimaryServiceClusterIPRange) != IPFamily(SecondaryServiceClusterIPRange)
dualstack, err := netutils.IsDualStackCIDRs([]*net.IPNet{&options.PrimaryServiceClusterIPRange, &options.SecondaryServiceClusterIPRange})
if err != nil {
errs = append(errs, errors.New("error attempting to validate dualstack for --service-cluster-ip-range and --secondary-service-cluster-ip-range"))
}
if !dualstack {
errs = append(errs, errors.New("--service-cluster-ip-range and --secondary-service-cluster-ip-range must be of different IP family"))
}
// should be smallish sized cidr, this thing is kept in etcd
// bigger cidr (specially those offered by IPv6) will add no value
// significantly increase snapshotting time.
var ones, bits = options.SecondaryServiceClusterIPRange.Mask.Size()
if bits-ones > 20 {
errs = append(errs, errors.New("specified --secondary-service-cluster-ip-range is too large"))
}
}
return errs
}

View File

@@ -0,0 +1,132 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package options
import (
"net"
"testing"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
)
func makeOptionsWithCIDRs(serviceCIDR string, secondaryServiceCIDR string) *ServerRunOptions {
value := serviceCIDR
if len(secondaryServiceCIDR) > 0 {
value = value + "," + secondaryServiceCIDR
}
var primaryCIDR, secondaryCIDR net.IPNet
if len(serviceCIDR) > 0 {
_, cidr, _ := net.ParseCIDR(serviceCIDR)
if cidr != nil {
primaryCIDR = *(cidr)
}
}
if len(secondaryServiceCIDR) > 0 {
_, cidr, _ := net.ParseCIDR(secondaryServiceCIDR)
if cidr != nil {
secondaryCIDR = *(cidr)
}
}
return &ServerRunOptions{
ServiceClusterIPRanges: value,
PrimaryServiceClusterIPRange: primaryCIDR,
SecondaryServiceClusterIPRange: secondaryCIDR,
}
}
func TestClusterSerivceIPRange(t *testing.T) {
testCases := []struct {
name string
options *ServerRunOptions
enableDualStack bool
expectErrors bool
}{
{
name: "no service cidr",
expectErrors: true,
options: makeOptionsWithCIDRs("", ""),
enableDualStack: false,
},
{
name: "only secondary service cidr, dual stack gate on",
expectErrors: true,
options: makeOptionsWithCIDRs("", "10.0.0.0/16"),
enableDualStack: true,
},
{
name: "only secondary service cidr, dual stack gate off",
expectErrors: true,
options: makeOptionsWithCIDRs("", "10.0.0.0/16"),
enableDualStack: false,
},
{
name: "primary and secondary are provided but not dual stack v4-v4",
expectErrors: true,
options: makeOptionsWithCIDRs("10.0.0.0/16", "11.0.0.0/16"),
enableDualStack: true,
},
{
name: "primary and secondary are provided but not dual stack v6-v6",
expectErrors: true,
options: makeOptionsWithCIDRs("2000::/108", "3000::/108"),
enableDualStack: true,
},
{
name: "valid dual stack with gate disabled",
expectErrors: true,
options: makeOptionsWithCIDRs("10.0.0.0/16", "3000::/108"),
enableDualStack: false,
},
/* success cases */
{
name: "valid primary",
expectErrors: false,
options: makeOptionsWithCIDRs("10.0.0.0/16", ""),
enableDualStack: false,
},
{
name: "valid v4-v6 dual stack + gate on",
expectErrors: false,
options: makeOptionsWithCIDRs("10.0.0.0/16", "3000::/108"),
enableDualStack: true,
},
{
name: "valid v6-v4 dual stack + gate on",
expectErrors: false,
options: makeOptionsWithCIDRs("3000::/108", "10.0.0.0/16"),
enableDualStack: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, tc.enableDualStack)()
errs := validateClusterIPFlags(tc.options)
if len(errs) > 0 && !tc.expectErrors {
t.Errorf("expected no errors, errors found %+v", errs)
}
if len(errs) == 0 && tc.expectErrors {
t.Errorf("expected errors, no errors found")
}
})
}
}

View File

@@ -306,11 +306,21 @@ func CreateKubeAPIServerConfig(
PerConnectionBandwidthLimitBytesPerSec: s.MaxConnectionBytesPerSec,
})
serviceIPRange, apiServerServiceIP, lastErr := master.DefaultServiceIPRange(s.ServiceClusterIPRange)
serviceIPRange, apiServerServiceIP, lastErr := master.DefaultServiceIPRange(s.PrimaryServiceClusterIPRange)
if lastErr != nil {
return
}
// defaults to empty range and ip
var secondaryServiceIPRange net.IPNet
// process secondary range only if provided by user
if s.SecondaryServiceClusterIPRange.IP != nil {
secondaryServiceIPRange, _, lastErr = master.DefaultServiceIPRange(s.SecondaryServiceClusterIPRange)
if lastErr != nil {
return
}
}
clientCA, lastErr := readCAorNil(s.Authentication.ClientCert.ClientCA)
if lastErr != nil {
return
@@ -341,8 +351,10 @@ func CreateKubeAPIServerConfig(
Tunneler: nodeTunneler,
ServiceIPRange: serviceIPRange,
APIServerServiceIP: apiServerServiceIP,
ServiceIPRange: serviceIPRange,
APIServerServiceIP: apiServerServiceIP,
SecondaryServiceIPRange: secondaryServiceIPRange,
APIServerServicePort: 443,
ServiceNodePortRange: s.ServiceNodePortRange,
@@ -548,11 +560,49 @@ func Complete(s *options.ServerRunOptions) (completedServerRunOptions, error) {
if err := kubeoptions.DefaultAdvertiseAddress(s.GenericServerRunOptions, s.InsecureServing.DeprecatedInsecureServingOptions); err != nil {
return options, err
}
serviceIPRange, apiServerServiceIP, err := master.DefaultServiceIPRange(s.ServiceClusterIPRange)
if err != nil {
return options, fmt.Errorf("error determining service IP ranges: %v", err)
// process s.ServiceClusterIPRange from list to Primary and Secondary
// we process secondary only if provided by user
serviceClusterIPRangeList := strings.Split(s.ServiceClusterIPRanges, ",")
var apiServerServiceIP net.IP
var serviceIPRange net.IPNet
var err error
// nothing provided by user, use default range (only applies to the Primary)
if len(serviceClusterIPRangeList) == 0 {
var primaryServiceClusterCIDR net.IPNet
serviceIPRange, apiServerServiceIP, err = master.DefaultServiceIPRange(primaryServiceClusterCIDR)
if err != nil {
return options, fmt.Errorf("error determining service IP ranges: %v", err)
}
s.PrimaryServiceClusterIPRange = serviceIPRange
}
s.ServiceClusterIPRange = serviceIPRange
if len(serviceClusterIPRangeList) > 0 {
_, primaryServiceClusterCIDR, err := net.ParseCIDR(serviceClusterIPRangeList[0])
if err != nil {
return options, fmt.Errorf("service-cluster-ip-range[0] is not a valid cidr")
}
serviceIPRange, apiServerServiceIP, err = master.DefaultServiceIPRange(*(primaryServiceClusterCIDR))
if err != nil {
return options, fmt.Errorf("error determining service IP ranges for primary service cidr: %v", err)
}
s.PrimaryServiceClusterIPRange = serviceIPRange
}
// user provided at least two entries
if len(serviceClusterIPRangeList) > 1 {
_, secondaryServiceClusterCIDR, err := net.ParseCIDR(serviceClusterIPRangeList[1])
if err != nil {
return options, fmt.Errorf("service-cluster-ip-range[1] is not an ip net")
}
s.SecondaryServiceClusterIPRange = *(secondaryServiceClusterCIDR)
}
//note: validation asserts that the list is max of two dual stack entries
if err := s.SecureServing.MaybeDefaultWithSelfSignedCerts(s.GenericServerRunOptions.AdvertiseAddress.String(), []string{"kubernetes.default.svc", "kubernetes.default", "kubernetes"}, []net.IP{apiServerServiceIP}); err != nil {
return options, fmt.Errorf("error creating self-signed certificates: %v", err)
}

View File

@@ -133,8 +133,7 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
}
s.SecureServing.ServerCert.FixtureDirectory = path.Join(path.Dir(thisFile), "testdata")
s.ServiceClusterIPRange.IP = net.IPv4(10, 0, 0, 0)
s.ServiceClusterIPRange.Mask = net.CIDRMask(16, 32)
s.ServiceClusterIPRanges = "10.0.0.0/16"
s.Etcd.StorageConfig = *storageConfig
s.APIEnablement.RuntimeConfig.Set("api/all=true")

View File

@@ -45,6 +45,10 @@ import (
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/metrics"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
utilnet "k8s.io/utils/net"
)
const (
@@ -218,6 +222,37 @@ func (e *EndpointController) addPod(obj interface{}) {
}
}
func podToEndpointAddressForService(svc *v1.Service, pod *v1.Pod) (*v1.EndpointAddress, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
return podToEndpointAddress(pod), nil
}
// api-server service controller ensured that the service got the correct IP Family
// according to user setup, here we only need to match EndPoint IPs' family to service
// actual IP family. as in, we don't need to check service.IPFamily
ipv6ClusterIP := utilnet.IsIPv6String(svc.Spec.ClusterIP)
for _, podIP := range pod.Status.PodIPs {
ipv6PodIP := utilnet.IsIPv6String(podIP.IP)
// same family?
// TODO (khenidak) when we remove the max of 2 PodIP limit from pods
// we will have to return multiple endpoint addresses
if ipv6ClusterIP == ipv6PodIP {
return &v1.EndpointAddress{
IP: podIP.IP,
NodeName: &pod.Spec.NodeName,
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Namespace: pod.ObjectMeta.Namespace,
Name: pod.ObjectMeta.Name,
UID: pod.ObjectMeta.UID,
ResourceVersion: pod.ObjectMeta.ResourceVersion,
}}, nil
}
}
return nil, fmt.Errorf("failed to find a matching endpoint for service %v", svc.Name)
}
func podToEndpointAddress(pod *v1.Pod) *v1.EndpointAddress {
return &v1.EndpointAddress{
IP: pod.Status.PodIP,
@@ -244,7 +279,9 @@ func podChanged(oldPod, newPod *v1.Pod) bool {
return true
}
// Convert the pod to an EndpointAddress, clear inert fields,
// and see if they are the same.
// and see if they are the same. Even in a dual stack (multi pod IP) a pod
// will never change just one of its IPs, it will always change all. the below
// comparison to check if a pod has changed will still work
newEndpointAddress := podToEndpointAddress(newPod)
oldEndpointAddress := podToEndpointAddress(oldPod)
// Ignore the ResourceVersion because it changes
@@ -473,7 +510,14 @@ func (e *EndpointController) syncService(key string) error {
continue
}
epa := *podToEndpointAddress(pod)
ep, err := podToEndpointAddressForService(service, pod)
if err != nil {
// this will happen, if the cluster runs with some nodes configured as dual stack and some as not
// such as the case of an upgrade..
klog.V(2).Infof("failed to find endpoint for service:%v with ClusterIP:%v on pod:%v with error:%v", service.Name, service.Spec.ClusterIP, pod.Name, err)
continue
}
epa := *ep
hostname := pod.Spec.Hostname
if len(hostname) > 0 && pod.Spec.Subdomain == service.Name && service.Namespace == pod.Namespace {

View File

@@ -31,15 +31,18 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
utiltesting "k8s.io/client-go/util/testing"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/api/testapi"
endptspkg "k8s.io/kubernetes/pkg/api/v1/endpoints"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/features"
)
var alwaysReady = func() bool { return true }
@@ -49,7 +52,7 @@ var triggerTime = time.Date(2018, 01, 01, 0, 0, 0, 0, time.UTC)
var triggerTimeString = triggerTime.Format(time.RFC3339Nano)
var oldTriggerTimeString = triggerTime.Add(-time.Hour).Format(time.RFC3339Nano)
func testPod(namespace string, id int, nPorts int, isReady bool) *v1.Pod {
func testPod(namespace string, id int, nPorts int, isReady bool, makeDualstack bool) *v1.Pod {
p := &v1.Pod{
TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
ObjectMeta: metav1.ObjectMeta{
@@ -77,14 +80,24 @@ func testPod(namespace string, id int, nPorts int, isReady bool) *v1.Pod {
p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)})
}
if makeDualstack {
p.Status.PodIPs = []v1.PodIP{
{
IP: p.Status.PodIP,
},
{
IP: fmt.Sprintf("2000::%d", id),
},
}
}
return p
}
func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int) {
func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int, makeDualstack bool) {
for i := 0; i < nPods+nNotReady; i++ {
isReady := i < nPods
pod := testPod(namespace, i, nPorts, isReady)
pod := testPod(namespace, i, nPorts, isReady, makeDualstack)
store.Add(pod)
}
}
@@ -289,7 +302,7 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
}},
})
addPods(endpoints.podStore, ns, 1, 1, 0)
addPods(endpoints.podStore, ns, 1, 1, 0, false)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
@@ -330,7 +343,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "UDP"}},
}},
})
addPods(endpoints.podStore, ns, 1, 1, 0)
addPods(endpoints.podStore, ns, 1, 1, 0, false)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
@@ -371,7 +384,7 @@ func TestSyncEndpointsProtocolSCTP(t *testing.T) {
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "SCTP"}},
}},
})
addPods(endpoints.podStore, ns, 1, 1, 0)
addPods(endpoints.podStore, ns, 1, 1, 0, false)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
@@ -409,7 +422,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
},
Subsets: []v1.EndpointSubset{},
})
addPods(endpoints.podStore, ns, 1, 1, 0)
addPods(endpoints.podStore, ns, 1, 1, 0, false)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
@@ -446,7 +459,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
},
Subsets: []v1.EndpointSubset{},
})
addPods(endpoints.podStore, ns, 0, 1, 1)
addPods(endpoints.podStore, ns, 0, 1, 1, false)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
@@ -483,7 +496,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
},
Subsets: []v1.EndpointSubset{},
})
addPods(endpoints.podStore, ns, 1, 1, 1)
addPods(endpoints.podStore, ns, 1, 1, 1, false)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
@@ -524,7 +537,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
Ports: []v1.EndpointPort{{Port: 1000}},
}},
})
addPods(endpoints.podStore, ns, 1, 1, 0)
addPods(endpoints.podStore, ns, 1, 1, 0, false)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
@@ -564,7 +577,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
}},
})
addPods(endpoints.podStore, metav1.NamespaceDefault, 1, 1, 0)
addPods(endpoints.podStore, metav1.NamespaceDefault, 1, 1, 0, false)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: metav1.NamespaceDefault},
Spec: v1.ServiceSpec{
@@ -581,8 +594,9 @@ func TestSyncEndpointsItems(t *testing.T) {
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL, 0*time.Second)
addPods(endpoints.podStore, ns, 3, 2, 0)
addPods(endpoints.podStore, "blah", 5, 2, 0) // make sure these aren't found!
addPods(endpoints.podStore, ns, 3, 2, 0, false)
addPods(endpoints.podStore, "blah", 5, 2, 0, false) // make sure these aren't found!
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
@@ -622,7 +636,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) {
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL, 0*time.Second)
addPods(endpoints.podStore, ns, 3, 2, 0)
addPods(endpoints.podStore, ns, 3, 2, 0, false)
serviceLabels := map[string]string{"foo": "bar"}
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{
@@ -682,7 +696,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
Ports: []v1.EndpointPort{{Port: 1000}},
}},
})
addPods(endpoints.podStore, ns, 1, 1, 0)
addPods(endpoints.podStore, ns, 1, 1, 0, false)
serviceLabels := map[string]string{"baz": "blah"}
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{
@@ -731,7 +745,8 @@ func TestWaitsForAllInformersToBeSynced2(t *testing.T) {
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL, 0*time.Second)
addPods(endpoints.podStore, ns, 1, 1, 0)
addPods(endpoints.podStore, ns, 1, 1, 0, false)
service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
@@ -782,7 +797,7 @@ func TestSyncEndpointsHeadlessService(t *testing.T) {
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
}},
})
addPods(endpoints.podStore, ns, 1, 1, 0)
addPods(endpoints.podStore, ns, 1, 1, 0, false)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
@@ -928,7 +943,7 @@ func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) {
Ports: nil,
},
})
addPods(endpoints.podStore, ns, 1, 1, 0)
addPods(endpoints.podStore, ns, 1, 1, 0, false)
endpoints.syncService(ns + "/foo")
endpointsHandler.ValidateRequestCount(t, 1)
data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
@@ -1033,11 +1048,146 @@ func TestShouldPodBeInEndpoints(t *testing.T) {
}
}
}
func TestPodToEndpointAddressForService(t *testing.T) {
testCases := []struct {
name string
expectedEndPointIP string
enableDualStack bool
expectError bool
enableDualStackPod bool
service v1.Service
}{
{
name: "v4 service, in a single stack cluster",
expectedEndPointIP: "1.2.3.4",
enableDualStack: false,
expectError: false,
enableDualStackPod: false,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: "10.0.0.1",
},
},
},
{
name: "v4 service, in a dual stack cluster",
expectedEndPointIP: "1.2.3.4",
enableDualStack: true,
expectError: false,
enableDualStackPod: true,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: "10.0.0.1",
},
},
},
{
name: "v6 service, in a dual stack cluster. dual stack enabled",
expectedEndPointIP: "2000::0",
enableDualStack: true,
expectError: false,
enableDualStackPod: true,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: "3000::1",
},
},
},
// in reality this is a misconfigured cluster
// i.e user is not using dual stack and have PodIP == v4 and ServiceIP==v6
// we are testing that we will keep producing the expected behavior
{
name: "v6 service, in a v4 only cluster. dual stack disabled",
expectedEndPointIP: "1.2.3.4",
enableDualStack: false,
expectError: false,
enableDualStackPod: false,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: "3000::1",
},
},
},
{
name: "v6 service, in a v4 only cluster - dual stack enabled",
expectedEndPointIP: "1.2.3.4",
enableDualStack: true,
expectError: true,
enableDualStackPod: false,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: "3000::1",
},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, tc.enableDualStack)()
podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
ns := "test"
addPods(podStore, ns, 1, 1, 0, tc.enableDualStackPod)
pods := podStore.List()
if len(pods) != 1 {
t.Fatalf("podStore size: expected: %d, got: %d", 1, len(pods))
}
pod := pods[0].(*v1.Pod)
epa, err := podToEndpointAddressForService(&tc.service, pod)
if err != nil && !tc.expectError {
t.Fatalf("podToEndpointAddressForService returned unexpected error %v", err)
}
if err == nil && tc.expectError {
t.Fatalf("podToEndpointAddressForService should have returned error but it did not")
}
if err != nil && tc.expectError {
return
}
if epa.IP != tc.expectedEndPointIP {
t.Fatalf("IP: expected: %s, got: %s", pod.Status.PodIP, epa.IP)
}
if *(epa.NodeName) != pod.Spec.NodeName {
t.Fatalf("NodeName: expected: %s, got: %s", pod.Spec.NodeName, *(epa.NodeName))
}
if epa.TargetRef.Kind != "Pod" {
t.Fatalf("TargetRef.Kind: expected: %s, got: %s", "Pod", epa.TargetRef.Kind)
}
if epa.TargetRef.Namespace != pod.ObjectMeta.Namespace {
t.Fatalf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Namespace, epa.TargetRef.Namespace)
}
if epa.TargetRef.Name != pod.ObjectMeta.Name {
t.Fatalf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Name, epa.TargetRef.Name)
}
if epa.TargetRef.UID != pod.ObjectMeta.UID {
t.Fatalf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.UID, epa.TargetRef.UID)
}
if epa.TargetRef.ResourceVersion != pod.ObjectMeta.ResourceVersion {
t.Fatalf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.ResourceVersion, epa.TargetRef.ResourceVersion)
}
})
}
}
func TestPodToEndpointAddress(t *testing.T) {
podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
ns := "test"
addPods(podStore, ns, 1, 1, 0)
addPods(podStore, ns, 1, 1, 0, false)
pods := podStore.List()
if len(pods) != 1 {
t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods))
@@ -1071,7 +1221,7 @@ func TestPodToEndpointAddress(t *testing.T) {
func TestPodChanged(t *testing.T) {
podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
ns := "test"
addPods(podStore, ns, 1, 1, 0)
addPods(podStore, ns, 1, 1, 0, false)
pods := podStore.List()
if len(pods) != 1 {
t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods))
@@ -1102,6 +1252,80 @@ func TestPodChanged(t *testing.T) {
}
newPod.Status.PodIP = oldPod.Status.PodIP
/* dual stack tests */
// primary changes, because changing IPs is done by changing sandbox
// case 1: add new secondrary IP
newPod.Status.PodIP = "1.1.3.1"
newPod.Status.PodIPs = []v1.PodIP{
{
IP: "1.1.3.1",
},
{
IP: "2000::1",
},
}
if !podChanged(oldPod, newPod) {
t.Errorf("Expected pod to be changed with adding secondary IP")
}
// reset
newPod.Status.PodIPs = nil
newPod.Status.PodIP = oldPod.Status.PodIP
// case 2: removing a secondary IP
saved := oldPod.Status.PodIP
oldPod.Status.PodIP = "1.1.3.1"
oldPod.Status.PodIPs = []v1.PodIP{
{
IP: "1.1.3.1",
},
{
IP: "2000::1",
},
}
newPod.Status.PodIP = "1.2.3.4"
newPod.Status.PodIPs = []v1.PodIP{
{
IP: "1.2.3.4",
},
}
// reset
oldPod.Status.PodIPs = nil
newPod.Status.PodIPs = nil
oldPod.Status.PodIP = saved
newPod.Status.PodIP = saved
// case 3: change secondary
// case 2: removing a secondary IP
saved = oldPod.Status.PodIP
oldPod.Status.PodIP = "1.1.3.1"
oldPod.Status.PodIPs = []v1.PodIP{
{
IP: "1.1.3.1",
},
{
IP: "2000::1",
},
}
newPod.Status.PodIP = "1.2.3.4"
newPod.Status.PodIPs = []v1.PodIP{
{
IP: "1.2.3.4",
},
{
IP: "2000::2",
},
}
// reset
oldPod.Status.PodIPs = nil
newPod.Status.PodIPs = nil
oldPod.Status.PodIP = saved
newPod.Status.PodIP = saved
/* end dual stack testing */
newPod.ObjectMeta.Name = "wrong-name"
if !podChanged(oldPod, newPod) {
t.Errorf("Expected pod to be changed with pod name change")
@@ -1203,7 +1427,7 @@ func TestLastTriggerChangeTimeAnnotation(t *testing.T) {
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
}},
})
addPods(endpoints.podStore, ns, 1, 1, 0)
addPods(endpoints.podStore, ns, 1, 1, 0, false)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
Spec: v1.ServiceSpec{
@@ -1250,7 +1474,7 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) {
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
}},
})
addPods(endpoints.podStore, ns, 1, 1, 0)
addPods(endpoints.podStore, ns, 1, 1, 0, false)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
Spec: v1.ServiceSpec{
@@ -1298,7 +1522,7 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationCleared(t *testing.T) {
}},
})
// Neither pod nor service has trigger time, this should cause annotation to be cleared.
addPods(endpoints.podStore, ns, 1, 1, 0)
addPods(endpoints.podStore, ns, 1, 1, 0, false)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
@@ -1435,7 +1659,7 @@ func TestPodUpdatesBatching(t *testing.T) {
go endpoints.Run(1, stopCh)
addPods(endpoints.podStore, ns, tc.podsCount, 1, 0)
addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, false)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
@@ -1568,7 +1792,7 @@ func TestPodAddsBatching(t *testing.T) {
for i, add := range tc.adds {
time.Sleep(add.delay)
p := testPod(ns, i, 1, true)
p := testPod(ns, i, 1, true, false)
endpoints.podStore.Add(p)
endpoints.addPod(p)
}
@@ -1679,7 +1903,7 @@ func TestPodDeleteBatching(t *testing.T) {
go endpoints.Run(1, stopCh)
addPods(endpoints.podStore, ns, tc.podsCount, 1, 0)
addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, false)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},

View File

@@ -55,9 +55,12 @@ type Controller struct {
EventClient corev1client.EventsGetter
healthClient rest.Interface
ServiceClusterIPRegistry rangeallocation.RangeRegistry
ServiceClusterIPRegistry rangeallocation.RangeRegistry
ServiceClusterIPRange net.IPNet
SecondaryServiceClusterIPRegistry rangeallocation.RangeRegistry
SecondaryServiceClusterIPRange net.IPNet
ServiceClusterIPInterval time.Duration
ServiceClusterIPRange net.IPNet
ServiceNodePortRegistry rangeallocation.RangeRegistry
ServiceNodePortInterval time.Duration
@@ -106,8 +109,11 @@ func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.Lega
SystemNamespaces: systemNamespaces,
SystemNamespacesInterval: 1 * time.Minute,
ServiceClusterIPRegistry: legacyRESTStorage.ServiceClusterIPAllocator,
ServiceClusterIPRange: c.ExtraConfig.ServiceIPRange,
ServiceClusterIPRegistry: legacyRESTStorage.ServiceClusterIPAllocator,
ServiceClusterIPRange: c.ExtraConfig.ServiceIPRange,
SecondaryServiceClusterIPRegistry: legacyRESTStorage.SecondaryServiceClusterIPAllocator,
SecondaryServiceClusterIPRange: c.ExtraConfig.SecondaryServiceIPRange,
ServiceClusterIPInterval: 3 * time.Minute,
ServiceNodePortRegistry: legacyRESTStorage.ServiceNodePortAllocator,
@@ -148,7 +154,7 @@ func (c *Controller) Start() {
klog.Errorf("Unable to remove old endpoints from kubernetes service: %v", err)
}
repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, c.EventClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry)
repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, c.EventClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry, &c.SecondaryServiceClusterIPRange, c.SecondaryServiceClusterIPRegistry)
repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceClient, c.EventClient, c.ServiceNodePortRange, c.ServiceNodePortRegistry)
// run all of the controllers once prior to returning from Start.

View File

@@ -132,6 +132,13 @@ type ExtraConfig struct {
ServiceIPRange net.IPNet
// The IP address for the GenericAPIServer service (must be inside ServiceIPRange)
APIServerServiceIP net.IP
// dual stack services, the range represents an alternative IP range for service IP
// must be of different family than primary (ServiceIPRange)
SecondaryServiceIPRange net.IPNet
// the secondary IP address the GenericAPIServer service (must be inside SecondaryServiceIPRange)
SecondaryAPIServerServiceIP net.IP
// Port for the apiserver service.
APIServerServicePort int
@@ -323,6 +330,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
KubeletClientConfig: c.ExtraConfig.KubeletClientConfig,
EventTTL: c.ExtraConfig.EventTTL,
ServiceIPRange: c.ExtraConfig.ServiceIPRange,
SecondaryServiceIPRange: c.ExtraConfig.SecondaryServiceIPRange,
ServiceNodePortRange: c.ExtraConfig.ServiceNodePortRange,
LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
ServiceAccountIssuer: c.ExtraConfig.ServiceAccountIssuer,

View File

@@ -77,8 +77,10 @@ type LegacyRESTStorageProvider struct {
EventTTL time.Duration
// ServiceIPRange is used to build cluster IPs for discovery.
ServiceIPRange net.IPNet
ServiceNodePortRange utilnet.PortRange
ServiceIPRange net.IPNet
// allocates ips for secondary service cidr in dual stack clusters
SecondaryServiceIPRange net.IPNet
ServiceNodePortRange utilnet.PortRange
ServiceAccountIssuer serviceaccount.TokenGenerator
ServiceAccountMaxExpiration time.Duration
@@ -92,8 +94,9 @@ type LegacyRESTStorageProvider struct {
// master.go for wiring controllers.
// TODO remove this by running the controller as a poststarthook
type LegacyRESTStorage struct {
ServiceClusterIPAllocator rangeallocation.RangeRegistry
ServiceNodePortAllocator rangeallocation.RangeRegistry
ServiceClusterIPAllocator rangeallocation.RangeRegistry
SecondaryServiceClusterIPAllocator rangeallocation.RangeRegistry
ServiceNodePortAllocator rangeallocation.RangeRegistry
}
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
@@ -216,6 +219,26 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
}
restStorage.ServiceClusterIPAllocator = serviceClusterIPRegistry
// allocator for secondary service ip range
var secondaryServiceClusterIPAllocator ipallocator.Interface
if utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) && c.SecondaryServiceIPRange.IP != nil {
var secondaryServiceClusterIPRegistry rangeallocation.RangeRegistry
secondaryServiceClusterIPAllocator, err = ipallocator.NewAllocatorCIDRRange(&c.SecondaryServiceIPRange, func(max int, rangeSpec string) (allocator.Interface, error) {
mem := allocator.NewAllocationMap(max, rangeSpec)
// TODO etcdallocator package to return a storage interface via the storageFactory
etcd, err := serviceallocator.NewEtcd(mem, "/ranges/secondaryserviceips", api.Resource("serviceipallocations"), serviceStorageConfig)
if err != nil {
return nil, err
}
secondaryServiceClusterIPRegistry = etcd
return etcd, nil
})
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err)
}
restStorage.SecondaryServiceClusterIPAllocator = secondaryServiceClusterIPRegistry
}
var serviceNodePortRegistry rangeallocation.RangeRegistry
serviceNodePortAllocator, err := portallocator.NewPortAllocatorCustom(c.ServiceNodePortRange, func(max int, rangeSpec string) (allocator.Interface, error) {
mem := allocator.NewAllocationMap(max, rangeSpec)
@@ -237,7 +260,13 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
serviceRest, serviceRestProxy := servicestore.NewREST(serviceRESTStorage, endpointsStorage, podStorage.Pod, serviceClusterIPAllocator, serviceNodePortAllocator, c.ProxyTransport)
serviceRest, serviceRestProxy := servicestore.NewREST(serviceRESTStorage,
endpointsStorage,
podStorage.Pod,
serviceClusterIPAllocator,
secondaryServiceClusterIPAllocator,
serviceNodePortAllocator,
c.ProxyTransport)
restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,

View File

@@ -32,6 +32,7 @@ type Interface interface {
AllocateNext() (net.IP, error)
Release(net.IP) error
ForEach(func(net.IP))
CIDR() net.IPNet
// For testing
Has(ip net.IP) bool

View File

@@ -69,6 +69,12 @@ func TestAllocate(t *testing.T) {
if f := r.Free(); f != tc.free {
t.Errorf("Test %s unexpected free %d", tc.name, f)
}
rCIDR := r.CIDR()
if rCIDR.String() != tc.cidr {
t.Errorf("allocator returned a different cidr")
}
if f := r.Used(); f != 0 {
t.Errorf("Test %s unexpected used %d", tc.name, f)
}

View File

@@ -34,6 +34,10 @@ import (
"k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
netutil "k8s.io/utils/net"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
)
// Repair is a controller loop that periodically examines all service ClusterIP allocations
@@ -54,10 +58,14 @@ import (
type Repair struct {
interval time.Duration
serviceClient corev1client.ServicesGetter
network *net.IPNet
alloc rangeallocation.RangeRegistry
leaks map[string]int // counter per leaked IP
recorder record.EventRecorder
network *net.IPNet
alloc rangeallocation.RangeRegistry
secondaryNetwork *net.IPNet
secondaryAlloc rangeallocation.RangeRegistry
leaks map[string]int // counter per leaked IP
recorder record.EventRecorder
}
// How many times we need to detect a leak before we clean up. This is to
@@ -66,7 +74,7 @@ const numRepairsBeforeLeakCleanup = 3
// NewRepair creates a controller that periodically ensures that all clusterIPs are uniquely allocated across the cluster
// and generates informational warnings for a cluster that is not in sync.
func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter, eventClient corev1client.EventsGetter, network *net.IPNet, alloc rangeallocation.RangeRegistry) *Repair {
func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter, eventClient corev1client.EventsGetter, network *net.IPNet, alloc rangeallocation.RangeRegistry, secondaryNetwork *net.IPNet, secondaryAlloc rangeallocation.RangeRegistry) *Repair {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: eventClient.Events("")})
recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "ipallocator-repair-controller"})
@@ -74,10 +82,14 @@ func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter
return &Repair{
interval: interval,
serviceClient: serviceClient,
network: network,
alloc: alloc,
leaks: map[string]int{},
recorder: recorder,
network: network,
alloc: alloc,
secondaryNetwork: secondaryNetwork,
secondaryAlloc: secondaryAlloc,
leaks: map[string]int{},
recorder: recorder,
}
}
@@ -95,6 +107,29 @@ func (c *Repair) RunOnce() error {
return retry.RetryOnConflict(retry.DefaultBackoff, c.runOnce)
}
// selectAllocForIP returns an allocator for an IP based weather it belongs to the primary or the secondary allocator
func (c *Repair) selectAllocForIP(ip net.IP, primary ipallocator.Interface, secondary ipallocator.Interface) ipallocator.Interface {
if !c.shouldWorkOnSecondary() {
return primary
}
cidr := secondary.CIDR()
if netutil.IsIPv6CIDR(&cidr) && netutil.IsIPv6(ip) {
return secondary
}
return primary
}
// shouldWorkOnSecondary returns true if the repairer should perform work for secondary network (dual stack)
func (c *Repair) shouldWorkOnSecondary() bool {
if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
return false
}
return c.secondaryNetwork != nil && c.secondaryNetwork.IP != nil
}
// runOnce verifies the state of the cluster IP allocations and returns an error if an unrecoverable problem occurs.
func (c *Repair) runOnce() error {
// TODO: (per smarterclayton) if Get() or ListServices() is a weak consistency read,
@@ -107,10 +142,26 @@ func (c *Repair) runOnce() error {
// If etcd server is not running we should wait for some time and fail only then. This is particularly
// important when we start apiserver and etcd at the same time.
var snapshot *api.RangeAllocation
err := wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
var secondarySnapshot *api.RangeAllocation
var stored, secondaryStored ipallocator.Interface
var err, secondaryErr error
err = wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
var err error
snapshot, err = c.alloc.Get()
return err == nil, err
if err != nil {
return false, err
}
if c.shouldWorkOnSecondary() {
secondarySnapshot, err = c.secondaryAlloc.Get()
if err != nil {
return false, err
}
}
return true, nil
})
if err != nil {
return fmt.Errorf("unable to refresh the service IP block: %v", err)
@@ -119,10 +170,19 @@ func (c *Repair) runOnce() error {
if snapshot.Range == "" {
snapshot.Range = c.network.String()
}
if c.shouldWorkOnSecondary() && secondarySnapshot.Range == "" {
secondarySnapshot.Range = c.secondaryNetwork.String()
}
// Create an allocator because it is easy to use.
stored, err := ipallocator.NewFromSnapshot(snapshot)
if err != nil {
return fmt.Errorf("unable to rebuild allocator from snapshot: %v", err)
stored, err = ipallocator.NewFromSnapshot(snapshot)
if c.shouldWorkOnSecondary() {
secondaryStored, secondaryErr = ipallocator.NewFromSnapshot(secondarySnapshot)
}
if err != nil || secondaryErr != nil {
return fmt.Errorf("unable to rebuild allocator from snapshots: %v", err)
}
// We explicitly send no resource version, since the resource version
@@ -135,10 +195,20 @@ func (c *Repair) runOnce() error {
return fmt.Errorf("unable to refresh the service IP block: %v", err)
}
rebuilt, err := ipallocator.NewCIDRRange(c.network)
var rebuilt, secondaryRebuilt *ipallocator.Range
rebuilt, err = ipallocator.NewCIDRRange(c.network)
if err != nil {
return fmt.Errorf("unable to create CIDR range: %v", err)
}
if c.shouldWorkOnSecondary() {
secondaryRebuilt, err = ipallocator.NewCIDRRange(c.secondaryNetwork)
}
if err != nil {
return fmt.Errorf("unable to create CIDR range: %v", err)
}
// Check every Service's ClusterIP, and rebuild the state as we think it should be.
for _, svc := range list.Items {
if !helper.IsServiceIPSet(&svc) {
@@ -152,12 +222,15 @@ func (c *Repair) runOnce() error {
runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not a valid IP; please recreate", svc.Spec.ClusterIP, svc.Name, svc.Namespace))
continue
}
// mark it as in-use
switch err := rebuilt.Allocate(ip); err {
actualAlloc := c.selectAllocForIP(ip, rebuilt, secondaryRebuilt)
switch err := actualAlloc.Allocate(ip); err {
case nil:
if stored.Has(ip) {
actualStored := c.selectAllocForIP(ip, stored, secondaryStored)
if actualStored.Has(ip) {
// remove it from the old set, so we can find leaks
stored.Release(ip)
actualStored.Release(ip)
} else {
// cluster IP doesn't seem to be allocated
c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPNotAllocated", "Cluster IP %s is not allocated; repairing", ip)
@@ -174,14 +247,50 @@ func (c *Repair) runOnce() error {
runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not within the service CIDR %s; please recreate", ip, svc.Name, svc.Namespace, c.network))
case ipallocator.ErrFull:
// somehow we are out of IPs
c.recorder.Eventf(&svc, v1.EventTypeWarning, "ServiceCIDRFull", "Service CIDR %s is full; you must widen the CIDR in order to create new services", c.network)
return fmt.Errorf("the service CIDR %s is full; you must widen the CIDR in order to create new services", c.network)
cidr := actualAlloc.CIDR()
c.recorder.Eventf(&svc, v1.EventTypeWarning, "ServiceCIDRFull", "Service CIDR %v is full; you must widen the CIDR in order to create new services", cidr)
return fmt.Errorf("the service CIDR %v is full; you must widen the CIDR in order to create new services", cidr)
default:
c.recorder.Eventf(&svc, v1.EventTypeWarning, "UnknownError", "Unable to allocate cluster IP %s due to an unknown error", ip)
return fmt.Errorf("unable to allocate cluster IP %s for service %s/%s due to an unknown error, exiting: %v", ip, svc.Name, svc.Namespace, err)
}
}
c.checkLeaked(stored, rebuilt)
if c.shouldWorkOnSecondary() {
c.checkLeaked(secondaryStored, secondaryRebuilt)
}
// Blast the rebuilt state into storage.
err = c.saveSnapShot(rebuilt, c.alloc, snapshot)
if err != nil {
return err
}
if c.shouldWorkOnSecondary() {
err := c.saveSnapShot(secondaryRebuilt, c.secondaryAlloc, secondarySnapshot)
if err != nil {
return nil
}
}
return nil
}
func (c *Repair) saveSnapShot(rebuilt *ipallocator.Range, alloc rangeallocation.RangeRegistry, snapshot *api.RangeAllocation) error {
if err := rebuilt.Snapshot(snapshot); err != nil {
return fmt.Errorf("unable to snapshot the updated service IP allocations: %v", err)
}
if err := alloc.CreateOrUpdate(snapshot); err != nil {
if errors.IsConflict(err) {
return err
}
return fmt.Errorf("unable to persist the updated service IP allocations: %v", err)
}
return nil
}
func (c *Repair) checkLeaked(stored ipallocator.Interface, rebuilt *ipallocator.Range) {
// Check for IPs that are left in the old set. They appear to have been leaked.
stored.ForEach(func(ip net.IP) {
count, found := c.leaks[ip.String()]
@@ -203,15 +312,4 @@ func (c *Repair) runOnce() error {
}
})
// Blast the rebuilt state into storage.
if err := rebuilt.Snapshot(snapshot); err != nil {
return fmt.Errorf("unable to snapshot the updated service IP allocations: %v", err)
}
if err := c.alloc.CreateOrUpdate(snapshot); err != nil {
if errors.IsConflict(err) {
return err
}
return fmt.Errorf("unable to persist the updated service IP allocations: %v", err)
}
return nil
}

View File

@@ -27,6 +27,10 @@ import (
"k8s.io/client-go/kubernetes/fake"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
)
type mockRangeRegistry struct {
@@ -56,7 +60,7 @@ func TestRepair(t *testing.T) {
item: &api.RangeAllocation{Range: "192.168.1.0/24"},
}
_, cidr, _ := net.ParseCIDR(ipregistry.item.Range)
r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry)
r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, nil, nil)
if err := r.RunOnce(); err != nil {
t.Fatal(err)
@@ -69,7 +73,7 @@ func TestRepair(t *testing.T) {
item: &api.RangeAllocation{Range: "192.168.1.0/24"},
updateErr: fmt.Errorf("test error"),
}
r = NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry)
r = NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, nil, nil)
if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") {
t.Fatal(err)
}
@@ -100,7 +104,7 @@ func TestRepairLeak(t *testing.T) {
},
}
r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry)
r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, nil, nil)
// Run through the "leak detection holdoff" loops.
for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ {
if err := r.RunOnce(); err != nil {
@@ -176,7 +180,7 @@ func TestRepairWithExisting(t *testing.T) {
Data: dst.Data,
},
}
r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry)
r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, nil, nil)
if err := r.RunOnce(); err != nil {
t.Fatal(err)
}
@@ -191,3 +195,342 @@ func TestRepairWithExisting(t *testing.T) {
t.Errorf("unexpected ipallocator state: %d free", free)
}
}
func makeRangeRegistry(t *testing.T, cidrRange string) *mockRangeRegistry {
_, cidr, _ := net.ParseCIDR(cidrRange)
previous, err := ipallocator.NewCIDRRange(cidr)
if err != nil {
t.Fatal(err)
}
var dst api.RangeAllocation
err = previous.Snapshot(&dst)
if err != nil {
t.Fatal(err)
}
return &mockRangeRegistry{
item: &api.RangeAllocation{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "1",
},
Range: dst.Range,
Data: dst.Data,
},
}
}
func makeFakeClientSet() *fake.Clientset {
return fake.NewSimpleClientset()
}
func makeIPNet(cidr string) *net.IPNet {
_, net, _ := net.ParseCIDR(cidr)
return net
}
func TestShouldWorkOnSecondary(t *testing.T) {
testCases := []struct {
name string
enableDualStack bool
expectedResult bool
primaryNet *net.IPNet
secondaryNet *net.IPNet
}{
{
name: "not a dual stack, primary only",
enableDualStack: false,
expectedResult: false,
primaryNet: makeIPNet("10.0.0.0/16"),
secondaryNet: nil,
},
{
name: "not a dual stack, primary and secondary provided",
enableDualStack: false,
expectedResult: false,
primaryNet: makeIPNet("10.0.0.0/16"),
secondaryNet: makeIPNet("2000::/120"),
},
{
name: "dual stack, primary only",
enableDualStack: true,
expectedResult: false,
primaryNet: makeIPNet("10.0.0.0/16"),
secondaryNet: nil,
},
{
name: "dual stack, primary and secondary",
enableDualStack: true,
expectedResult: true,
primaryNet: makeIPNet("10.0.0.0/16"),
secondaryNet: makeIPNet("2000::/120"),
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, tc.enableDualStack)()
fakeClient := makeFakeClientSet()
primaryRegistry := makeRangeRegistry(t, tc.primaryNet.String())
var secondaryRegistery *mockRangeRegistry
if tc.secondaryNet != nil {
secondaryRegistery = makeRangeRegistry(t, tc.secondaryNet.String())
}
repair := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), tc.primaryNet, primaryRegistry, tc.secondaryNet, secondaryRegistery)
if repair.shouldWorkOnSecondary() != tc.expectedResult {
t.Errorf("shouldWorkOnSecondary should be %v and found %v", tc.expectedResult, repair.shouldWorkOnSecondary())
}
})
}
}
func TestRepairDualStack(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, true)()
fakeClient := fake.NewSimpleClientset()
ipregistry := &mockRangeRegistry{
item: &api.RangeAllocation{Range: "192.168.1.0/24"},
}
secondaryIPRegistry := &mockRangeRegistry{
item: &api.RangeAllocation{Range: "2000::/108"},
}
_, cidr, _ := net.ParseCIDR(ipregistry.item.Range)
_, secondaryCIDR, _ := net.ParseCIDR(secondaryIPRegistry.item.Range)
r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry)
if err := r.RunOnce(); err != nil {
t.Fatal(err)
}
if !ipregistry.updateCalled || ipregistry.updated == nil || ipregistry.updated.Range != cidr.String() || ipregistry.updated != ipregistry.item {
t.Errorf("unexpected ipregistry: %#v", ipregistry)
}
if !secondaryIPRegistry.updateCalled || secondaryIPRegistry.updated == nil || secondaryIPRegistry.updated.Range != secondaryCIDR.String() || secondaryIPRegistry.updated != secondaryIPRegistry.item {
t.Errorf("unexpected ipregistry: %#v", ipregistry)
}
ipregistry = &mockRangeRegistry{
item: &api.RangeAllocation{Range: "192.168.1.0/24"},
updateErr: fmt.Errorf("test error"),
}
secondaryIPRegistry = &mockRangeRegistry{
item: &api.RangeAllocation{Range: "2000::/108"},
updateErr: fmt.Errorf("test error"),
}
r = NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry)
if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") {
t.Fatal(err)
}
}
func TestRepairLeakDualStack(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, true)()
_, cidr, _ := net.ParseCIDR("192.168.1.0/24")
previous, err := ipallocator.NewCIDRRange(cidr)
if err != nil {
t.Fatal(err)
}
previous.Allocate(net.ParseIP("192.168.1.10"))
_, secondaryCIDR, _ := net.ParseCIDR("2000::/108")
secondaryPrevious, err := ipallocator.NewCIDRRange(secondaryCIDR)
if err != nil {
t.Fatal(err)
}
secondaryPrevious.Allocate(net.ParseIP("2000::1"))
var dst api.RangeAllocation
err = previous.Snapshot(&dst)
if err != nil {
t.Fatal(err)
}
var secondaryDST api.RangeAllocation
err = secondaryPrevious.Snapshot(&secondaryDST)
if err != nil {
t.Fatal(err)
}
fakeClient := fake.NewSimpleClientset()
ipregistry := &mockRangeRegistry{
item: &api.RangeAllocation{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "1",
},
Range: dst.Range,
Data: dst.Data,
},
}
secondaryIPRegistry := &mockRangeRegistry{
item: &api.RangeAllocation{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "1",
},
Range: secondaryDST.Range,
Data: secondaryDST.Data,
},
}
r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry)
// Run through the "leak detection holdoff" loops.
for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ {
if err := r.RunOnce(); err != nil {
t.Fatal(err)
}
after, err := ipallocator.NewFromSnapshot(ipregistry.updated)
if err != nil {
t.Fatal(err)
}
if !after.Has(net.ParseIP("192.168.1.10")) {
t.Errorf("expected ipallocator to still have leaked IP")
}
secondaryAfter, err := ipallocator.NewFromSnapshot(secondaryIPRegistry.updated)
if err != nil {
t.Fatal(err)
}
if !secondaryAfter.Has(net.ParseIP("2000::1")) {
t.Errorf("expected ipallocator to still have leaked IP")
}
}
// Run one more time to actually remove the leak.
if err := r.RunOnce(); err != nil {
t.Fatal(err)
}
after, err := ipallocator.NewFromSnapshot(ipregistry.updated)
if err != nil {
t.Fatal(err)
}
if after.Has(net.ParseIP("192.168.1.10")) {
t.Errorf("expected ipallocator to not have leaked IP")
}
secondaryAfter, err := ipallocator.NewFromSnapshot(secondaryIPRegistry.updated)
if err != nil {
t.Fatal(err)
}
if secondaryAfter.Has(net.ParseIP("2000::1")) {
t.Errorf("expected ipallocator to not have leaked IP")
}
}
func TestRepairWithExistingDualStack(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, true)()
_, cidr, _ := net.ParseCIDR("192.168.1.0/24")
previous, err := ipallocator.NewCIDRRange(cidr)
if err != nil {
t.Fatal(err)
}
_, secondaryCIDR, _ := net.ParseCIDR("2000::/108")
secondaryPrevious, err := ipallocator.NewCIDRRange(secondaryCIDR)
if err != nil {
t.Fatal(err)
}
var dst api.RangeAllocation
err = previous.Snapshot(&dst)
if err != nil {
t.Fatal(err)
}
var secondaryDST api.RangeAllocation
err = secondaryPrevious.Snapshot(&secondaryDST)
if err != nil {
t.Fatal(err)
}
fakeClient := fake.NewSimpleClientset(
&corev1.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "one"},
Spec: corev1.ServiceSpec{ClusterIP: "192.168.1.1"},
},
&corev1.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "one-v6"},
Spec: corev1.ServiceSpec{ClusterIP: "2000::1"},
},
&corev1.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "two", Name: "two"},
Spec: corev1.ServiceSpec{ClusterIP: "192.168.1.100"},
},
&corev1.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "two", Name: "two-6"},
Spec: corev1.ServiceSpec{ClusterIP: "2000::2"},
},
&corev1.Service{ // outside CIDR, will be dropped
ObjectMeta: metav1.ObjectMeta{Namespace: "three", Name: "three"},
Spec: corev1.ServiceSpec{ClusterIP: "192.168.0.1"},
},
&corev1.Service{ // outside CIDR, will be dropped
ObjectMeta: metav1.ObjectMeta{Namespace: "three", Name: "three-v6"},
Spec: corev1.ServiceSpec{ClusterIP: "3000::1"},
},
&corev1.Service{ // empty, ignored
ObjectMeta: metav1.ObjectMeta{Namespace: "four", Name: "four"},
Spec: corev1.ServiceSpec{ClusterIP: ""},
},
&corev1.Service{ // duplicate, dropped
ObjectMeta: metav1.ObjectMeta{Namespace: "five", Name: "five"},
Spec: corev1.ServiceSpec{ClusterIP: "192.168.1.1"},
},
&corev1.Service{ // duplicate, dropped
ObjectMeta: metav1.ObjectMeta{Namespace: "five", Name: "five-v6"},
Spec: corev1.ServiceSpec{ClusterIP: "2000::2"},
},
&corev1.Service{ // headless
ObjectMeta: metav1.ObjectMeta{Namespace: "six", Name: "six"},
Spec: corev1.ServiceSpec{ClusterIP: "None"},
},
)
ipregistry := &mockRangeRegistry{
item: &api.RangeAllocation{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "1",
},
Range: dst.Range,
Data: dst.Data,
},
}
secondaryIPRegistry := &mockRangeRegistry{
item: &api.RangeAllocation{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "1",
},
Range: secondaryDST.Range,
Data: secondaryDST.Data,
},
}
r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry)
if err := r.RunOnce(); err != nil {
t.Fatal(err)
}
after, err := ipallocator.NewFromSnapshot(ipregistry.updated)
if err != nil {
t.Fatal(err)
}
if !after.Has(net.ParseIP("192.168.1.1")) || !after.Has(net.ParseIP("192.168.1.100")) {
t.Errorf("unexpected ipallocator state: %#v", after)
}
if free := after.Free(); free != 252 {
t.Errorf("unexpected ipallocator state: %d free (number of free ips is not 252)", free)
}
secondaryAfter, err := ipallocator.NewFromSnapshot(secondaryIPRegistry.updated)
if err != nil {
t.Fatal(err)
}
if !secondaryAfter.Has(net.ParseIP("2000::1")) || !secondaryAfter.Has(net.ParseIP("2000::2")) {
t.Errorf("unexpected ipallocator state: %#v", secondaryAfter)
}
if free := secondaryAfter.Free(); free != 65532 {
t.Errorf("unexpected ipallocator state: %d free (number of free ips is not 65532)", free)
}
}

View File

@@ -46,16 +46,22 @@ import (
registry "k8s.io/kubernetes/pkg/registry/core/service"
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
netutil "k8s.io/utils/net"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
)
// REST adapts a service registry into apiserver's RESTStorage model.
type REST struct {
services ServiceStorage
endpoints EndpointsStorage
serviceIPs ipallocator.Interface
serviceNodePorts portallocator.Interface
proxyTransport http.RoundTripper
pods rest.Getter
services ServiceStorage
endpoints EndpointsStorage
serviceIPs ipallocator.Interface
secondaryServiceIPs ipallocator.Interface
defaultServiceIPFamily api.IPFamily
serviceNodePorts portallocator.Interface
proxyTransport http.RoundTripper
pods rest.Getter
}
// ServiceNodePort includes protocol and port number of a service NodePort.
@@ -94,16 +100,29 @@ func NewREST(
endpoints EndpointsStorage,
pods rest.Getter,
serviceIPs ipallocator.Interface,
secondaryServiceIPs ipallocator.Interface,
serviceNodePorts portallocator.Interface,
proxyTransport http.RoundTripper,
) (*REST, *registry.ProxyREST) {
// detect this cluster default Service IPFamily (ipfamily of --service-cluster-ip-range)
// we do it once here, to avoid having to do it over and over during ipfamily assignment
serviceIPFamily := api.IPv4Protocol
cidr := serviceIPs.CIDR()
if netutil.IsIPv6CIDR(&cidr) {
serviceIPFamily = api.IPv6Protocol
}
klog.V(0).Infof("the default service ipfamily for this cluster is: %s", string(serviceIPFamily))
rest := &REST{
services: services,
endpoints: endpoints,
serviceIPs: serviceIPs,
serviceNodePorts: serviceNodePorts,
proxyTransport: proxyTransport,
pods: pods,
services: services,
endpoints: endpoints,
serviceIPs: serviceIPs,
secondaryServiceIPs: secondaryServiceIPs,
serviceNodePorts: serviceNodePorts,
defaultServiceIPFamily: serviceIPFamily,
proxyTransport: proxyTransport,
pods: pods,
}
return rest, &registry.ProxyREST{Redirector: rest, ProxyTransport: proxyTransport}
}
@@ -160,6 +179,11 @@ func (rs *REST) Export(ctx context.Context, name string, opts metav1.ExportOptio
func (rs *REST) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
service := obj.(*api.Service)
// set the service ip family, if it was not already set
if utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) && service.Spec.IPFamily == nil {
service.Spec.IPFamily = &rs.defaultServiceIPFamily
}
if err := rest.BeforeCreate(registry.Strategy, ctx, obj); err != nil {
return nil, err
}
@@ -169,7 +193,8 @@ func (rs *REST) Create(ctx context.Context, obj runtime.Object, createValidation
defer func() {
if releaseServiceIP {
if helper.IsServiceIPSet(service) {
rs.serviceIPs.Release(net.ParseIP(service.Spec.ClusterIP))
allocator := rs.getAllocatorByClusterIP(service)
allocator.Release(net.ParseIP(service.Spec.ClusterIP))
}
}
}()
@@ -177,7 +202,8 @@ func (rs *REST) Create(ctx context.Context, obj runtime.Object, createValidation
var err error
if !dryrun.IsDryRun(options.DryRun) {
if service.Spec.Type != api.ServiceTypeExternalName {
if releaseServiceIP, err = initClusterIP(service, rs.serviceIPs); err != nil {
allocator := rs.getAllocatorBySpec(service)
if releaseServiceIP, err = initClusterIP(service, allocator); err != nil {
return nil, err
}
}
@@ -256,7 +282,8 @@ func (rs *REST) Delete(ctx context.Context, id string, deleteValidation rest.Val
func (rs *REST) releaseAllocatedResources(svc *api.Service) {
if helper.IsServiceIPSet(svc) {
rs.serviceIPs.Release(net.ParseIP(svc.Spec.ClusterIP))
allocator := rs.getAllocatorByClusterIP(svc)
allocator.Release(net.ParseIP(svc.Spec.ClusterIP))
}
for _, nodePort := range collectServiceNodePorts(svc) {
@@ -365,6 +392,7 @@ func (rs *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObj
}
service := obj.(*api.Service)
if !rest.ValidNamespace(ctx, &service.ObjectMeta) {
return nil, false, errors.NewConflict(api.Resource("services"), service.Namespace, fmt.Errorf("Service.Namespace does not match the provided context"))
}
@@ -379,7 +407,8 @@ func (rs *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObj
defer func() {
if releaseServiceIP {
if helper.IsServiceIPSet(service) {
rs.serviceIPs.Release(net.ParseIP(service.Spec.ClusterIP))
allocator := rs.getAllocatorByClusterIP(service)
allocator.Release(net.ParseIP(service.Spec.ClusterIP))
}
}
}()
@@ -389,15 +418,19 @@ func (rs *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObj
if !dryrun.IsDryRun(options.DryRun) {
// Update service from ExternalName to non-ExternalName, should initialize ClusterIP.
// Since we don't support changing the ip family of a service we don't need to handle
// oldService.Spec.ServiceIPFamily != service.Spec.ServiceIPFamily
if oldService.Spec.Type == api.ServiceTypeExternalName && service.Spec.Type != api.ServiceTypeExternalName {
if releaseServiceIP, err = initClusterIP(service, rs.serviceIPs); err != nil {
allocator := rs.getAllocatorBySpec(service)
if releaseServiceIP, err = initClusterIP(service, allocator); err != nil {
return nil, false, err
}
}
// Update service from non-ExternalName to ExternalName, should release ClusterIP if exists.
if oldService.Spec.Type != api.ServiceTypeExternalName && service.Spec.Type == api.ServiceTypeExternalName {
if helper.IsServiceIPSet(oldService) {
rs.serviceIPs.Release(net.ParseIP(oldService.Spec.ClusterIP))
allocator := rs.getAllocatorByClusterIP(service)
allocator.Release(net.ParseIP(oldService.Spec.ClusterIP))
}
}
}
@@ -521,6 +554,35 @@ func (r *REST) ConvertToTable(ctx context.Context, object runtime.Object, tableO
return r.services.ConvertToTable(ctx, object, tableOptions)
}
// When allocating we always use BySpec, when releasing we always use ByClusterIP
func (r *REST) getAllocatorByClusterIP(service *api.Service) ipallocator.Interface {
if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) || r.secondaryServiceIPs == nil {
return r.serviceIPs
}
secondaryAllocatorCIDR := r.secondaryServiceIPs.CIDR()
if netutil.IsIPv6String(service.Spec.ClusterIP) && netutil.IsIPv6CIDR(&secondaryAllocatorCIDR) {
return r.secondaryServiceIPs
}
return r.serviceIPs
}
func (r *REST) getAllocatorBySpec(service *api.Service) ipallocator.Interface {
if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) ||
service.Spec.IPFamily == nil ||
r.secondaryServiceIPs == nil {
return r.serviceIPs
}
secondaryAllocatorCIDR := r.secondaryServiceIPs.CIDR()
if *(service.Spec.IPFamily) == api.IPv6Protocol && netutil.IsIPv6CIDR(&secondaryAllocatorCIDR) {
return r.secondaryServiceIPs
}
return r.serviceIPs
}
func isValidAddress(ctx context.Context, addr *api.EndpointAddress, pods rest.Getter) error {
if addr.TargetRef == nil {
return fmt.Errorf("Address has no target ref, skipping: %v", addr)
@@ -603,11 +665,11 @@ func allocateHealthCheckNodePort(service *api.Service, nodePortOp *portallocator
}
// The return bool value indicates if a cluster IP is allocated successfully.
func initClusterIP(service *api.Service, serviceIPs ipallocator.Interface) (bool, error) {
func initClusterIP(service *api.Service, allocator ipallocator.Interface) (bool, error) {
switch {
case service.Spec.ClusterIP == "":
// Allocate next available.
ip, err := serviceIPs.AllocateNext()
ip, err := allocator.AllocateNext()
if err != nil {
// TODO: what error should be returned here? It's not a
// field-level validation failure (the field is valid), and it's
@@ -618,7 +680,7 @@ func initClusterIP(service *api.Service, serviceIPs ipallocator.Interface) (bool
return true, nil
case service.Spec.ClusterIP != api.ClusterIPNone && service.Spec.ClusterIP != "":
// Try to respect the requested IP.
if err := serviceIPs.Allocate(net.ParseIP(service.Spec.ClusterIP)); err != nil {
if err := allocator.Allocate(net.ParseIP(service.Spec.ClusterIP)); err != nil {
// TODO: when validation becomes versioned, this gets more complicated.
el := field.ErrorList{field.Invalid(field.NewPath("spec", "clusterIP"), service.Spec.ClusterIP, err.Error())}
return false, errors.NewInvalid(api.Kind("Service"), service.Name, el)

View File

@@ -17,6 +17,7 @@ limitations under the License.
package storage
import (
"bytes"
"context"
"net"
"reflect"
@@ -41,12 +42,15 @@ import (
"k8s.io/kubernetes/pkg/api/service"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/core/helper"
endpointstore "k8s.io/kubernetes/pkg/registry/core/endpoint/storage"
podstore "k8s.io/kubernetes/pkg/registry/core/pod/storage"
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
"k8s.io/kubernetes/pkg/registry/registrytest"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
)
// TODO(wojtek-t): Cleanup this file.
@@ -167,11 +171,11 @@ func generateRandomNodePort() int32 {
return int32(rand.IntnRange(30001, 30999))
}
func NewTestREST(t *testing.T, endpoints *api.EndpointsList) (*REST, *serviceStorage, *etcd3testing.EtcdTestServer) {
return NewTestRESTWithPods(t, endpoints, nil)
func NewTestREST(t *testing.T, endpoints *api.EndpointsList, dualStack bool) (*REST, *serviceStorage, *etcd3testing.EtcdTestServer) {
return NewTestRESTWithPods(t, endpoints, nil, dualStack)
}
func NewTestRESTWithPods(t *testing.T, endpoints *api.EndpointsList, pods *api.PodList) (*REST, *serviceStorage, *etcd3testing.EtcdTestServer) {
func NewTestRESTWithPods(t *testing.T, endpoints *api.EndpointsList, pods *api.PodList, dualStack bool) (*REST, *serviceStorage, *etcd3testing.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
serviceStorage := &serviceStorage{}
@@ -216,6 +220,13 @@ func NewTestRESTWithPods(t *testing.T, endpoints *api.EndpointsList, pods *api.P
if err != nil {
t.Fatalf("cannot create CIDR Range %v", err)
}
var rSecondary ipallocator.Interface
if dualStack {
rSecondary, err = ipallocator.NewCIDRRange(makeIPNet6(t))
if err != nil {
t.Fatalf("cannot create CIDR Range(secondary) %v", err)
}
}
portRange := utilnet.PortRange{Base: 30000, Size: 1000}
portAllocator, err := portallocator.NewPortAllocator(portRange)
@@ -223,7 +234,7 @@ func NewTestRESTWithPods(t *testing.T, endpoints *api.EndpointsList, pods *api.P
t.Fatalf("cannot create port allocator %v", err)
}
rest, _ := NewREST(serviceStorage, endpointStorage, podStorage.Pod, r, portAllocator, nil)
rest, _ := NewREST(serviceStorage, endpointStorage, podStorage.Pod, r, rSecondary, portAllocator, nil)
return rest, serviceStorage, server
}
@@ -235,6 +246,27 @@ func makeIPNet(t *testing.T) *net.IPNet {
}
return net
}
func makeIPNet6(t *testing.T) *net.IPNet {
_, net, err := net.ParseCIDR("2000::/108")
if err != nil {
t.Error(err)
}
return net
}
func ipnetGet(t *testing.T, secondary bool) *net.IPNet {
if secondary {
return makeIPNet6(t)
}
return makeIPNet(t)
}
func allocGet(r *REST, secondary bool) ipallocator.Interface {
if secondary {
return r.secondaryServiceIPs
}
return r.serviceIPs
}
func releaseServiceNodePorts(t *testing.T, ctx context.Context, svcName string, rest *REST, registry ServiceStorage) {
obj, err := registry.Get(ctx, svcName, &metav1.GetOptions{})
@@ -256,90 +288,214 @@ func releaseServiceNodePorts(t *testing.T, ctx context.Context, svcName string,
}
func TestServiceRegistryCreate(t *testing.T) {
storage, registry, server := NewTestREST(t, nil)
defer server.Terminate(t)
ipv4Service := api.IPv4Protocol
ipv6Service := api.IPv6Protocol
svc := &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeClusterIP,
Ports: []api.ServicePort{{
Port: 6502,
Protocol: api.ProtocolTCP,
TargetPort: intstr.FromInt(6502),
}},
testCases := []struct {
svc *api.Service
name string
enableDualStack bool
useSecondary bool
}{
{
name: "Service IPFamily default cluster dualstack:off",
enableDualStack: false,
useSecondary: false,
svc: &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeClusterIP,
Ports: []api.ServicePort{{
Port: 6502,
Protocol: api.ProtocolTCP,
TargetPort: intstr.FromInt(6502),
}},
},
},
},
{
name: "Service IPFamily:v4 dualstack off",
enableDualStack: false,
useSecondary: false,
svc: &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeClusterIP,
IPFamily: &ipv4Service,
Ports: []api.ServicePort{{
Port: 6502,
Protocol: api.ProtocolTCP,
TargetPort: intstr.FromInt(6502),
}},
},
},
},
{
name: "Service IPFamily:v4 dualstack on",
enableDualStack: true,
useSecondary: false,
svc: &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeClusterIP,
IPFamily: &ipv4Service,
Ports: []api.ServicePort{{
Port: 6502,
Protocol: api.ProtocolTCP,
TargetPort: intstr.FromInt(6502),
}},
},
},
},
{
name: "Service IPFamily:v6 dualstack on",
enableDualStack: true,
useSecondary: true,
svc: &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeClusterIP,
IPFamily: &ipv6Service,
Ports: []api.ServicePort{{
Port: 6502,
Protocol: api.ProtocolTCP,
TargetPort: intstr.FromInt(6502),
}},
},
},
},
}
ctx := genericapirequest.NewDefaultContext()
created_svc, err := storage.Create(ctx, svc, rest.ValidateAllObjectFunc, &metav1.CreateOptions{})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
created_service := created_svc.(*api.Service)
objMeta, err := meta.Accessor(created_service)
if err != nil {
t.Fatal(err)
}
if !metav1.HasObjectMetaSystemFieldValues(objMeta) {
t.Errorf("storage did not populate object meta field values")
}
if created_service.Name != "foo" {
t.Errorf("Expected foo, but got %v", created_service.Name)
}
if created_service.CreationTimestamp.IsZero() {
t.Errorf("Expected timestamp to be set, got: %v", created_service.CreationTimestamp)
}
if !makeIPNet(t).Contains(net.ParseIP(created_service.Spec.ClusterIP)) {
t.Errorf("Unexpected ClusterIP: %s", created_service.Spec.ClusterIP)
}
srv, err := registry.GetService(ctx, svc.Name, &metav1.GetOptions{})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if srv == nil {
t.Errorf("Failed to find service: %s", svc.Name)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, tc.enableDualStack)()
storage, registry, server := NewTestREST(t, nil, tc.enableDualStack)
defer server.Terminate(t)
ctx := genericapirequest.NewDefaultContext()
created_svc, err := storage.Create(ctx, tc.svc, rest.ValidateAllObjectFunc, &metav1.CreateOptions{})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
created_service := created_svc.(*api.Service)
objMeta, err := meta.Accessor(created_service)
if err != nil {
t.Fatal(err)
}
if !metav1.HasObjectMetaSystemFieldValues(objMeta) {
t.Errorf("storage did not populate object meta field values")
}
if created_service.Name != "foo" {
t.Errorf("Expected foo, but got %v", created_service.Name)
}
if created_service.CreationTimestamp.IsZero() {
t.Errorf("Expected timestamp to be set, got: %v", created_service.CreationTimestamp)
}
allocNet := ipnetGet(t, tc.useSecondary)
if !allocNet.Contains(net.ParseIP(created_service.Spec.ClusterIP)) {
t.Errorf("Unexpected ClusterIP: %s", created_service.Spec.ClusterIP)
}
srv, err := registry.GetService(ctx, tc.svc.Name, &metav1.GetOptions{})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if srv == nil {
t.Errorf("Failed to find service: %s", tc.svc.Name)
}
})
}
}
func TestServiceRegistryCreateDryRun(t *testing.T) {
storage, registry, server := NewTestREST(t, nil)
defer server.Terminate(t)
// Test dry run create request with cluster ip
svc := &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeClusterIP,
ClusterIP: "1.2.3.4",
Ports: []api.ServicePort{{
Port: 6502,
Protocol: api.ProtocolTCP,
TargetPort: intstr.FromInt(6502),
}},
ipv6Service := api.IPv6Protocol
testCases := []struct {
name string
svc *api.Service
enableDualStack bool
useSecondary bool
}{
{
name: "v4 service",
enableDualStack: false,
useSecondary: false,
svc: &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeClusterIP,
ClusterIP: "1.2.3.4",
Ports: []api.ServicePort{{
Port: 6502,
Protocol: api.ProtocolTCP,
TargetPort: intstr.FromInt(6502),
}},
},
},
},
{
name: "v6 service",
enableDualStack: true,
useSecondary: true,
svc: &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeClusterIP,
IPFamily: &ipv6Service,
ClusterIP: "2000:0:0:0:0:0:0:1",
Ports: []api.ServicePort{{
Port: 6502,
Protocol: api.ProtocolTCP,
TargetPort: intstr.FromInt(6502),
}},
},
},
},
}
ctx := genericapirequest.NewDefaultContext()
_, err := storage.Create(ctx, svc, rest.ValidateAllObjectFunc, &metav1.CreateOptions{DryRun: []string{metav1.DryRunAll}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if storage.serviceIPs.Has(net.ParseIP("1.2.3.4")) {
t.Errorf("unexpected side effect: ip allocated")
}
srv, err := registry.GetService(ctx, svc.Name, &metav1.GetOptions{})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if srv != nil {
t.Errorf("unexpected service found: %v", srv)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, tc.enableDualStack)()
storage, registry, server := NewTestREST(t, nil, tc.enableDualStack)
defer server.Terminate(t)
ctx := genericapirequest.NewDefaultContext()
_, err := storage.Create(ctx, tc.svc, rest.ValidateAllObjectFunc, &metav1.CreateOptions{DryRun: []string{metav1.DryRunAll}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
alloc := allocGet(storage, tc.useSecondary)
if alloc.Has(net.ParseIP(tc.svc.Spec.ClusterIP)) {
t.Errorf("unexpected side effect: ip allocated")
}
srv, err := registry.GetService(ctx, tc.svc.Name, &metav1.GetOptions{})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if srv != nil {
t.Errorf("unexpected service found: %v", srv)
}
})
}
}
func TestDryRunNodePort(t *testing.T) {
storage, registry, server := NewTestREST(t, nil, false)
defer server.Terminate(t)
// Test dry run create request with a node port
svc = &api.Service{
svc := &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
@@ -353,14 +509,16 @@ func TestServiceRegistryCreateDryRun(t *testing.T) {
}},
},
}
_, err = storage.Create(ctx, svc, rest.ValidateAllObjectFunc, &metav1.CreateOptions{DryRun: []string{metav1.DryRunAll}})
ctx := genericapirequest.NewDefaultContext()
_, err := storage.Create(ctx, svc, rest.ValidateAllObjectFunc, &metav1.CreateOptions{DryRun: []string{metav1.DryRunAll}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if storage.serviceNodePorts.Has(30010) {
t.Errorf("unexpected side effect: NodePort allocated")
}
srv, err = registry.GetService(ctx, svc.Name, &metav1.GetOptions{})
srv, err := registry.GetService(ctx, svc.Name, &metav1.GetOptions{})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -452,7 +610,8 @@ func TestServiceRegistryCreateDryRun(t *testing.T) {
}
func TestServiceRegistryCreateMultiNodePortsService(t *testing.T) {
storage, registry, server := NewTestREST(t, nil)
storage, registry, server := NewTestREST(t, nil, false)
defer server.Terminate(t)
testCases := []struct {
@@ -582,7 +741,7 @@ func TestServiceRegistryCreateMultiNodePortsService(t *testing.T) {
}
func TestServiceStorageValidatesCreate(t *testing.T) {
storage, _, server := NewTestREST(t, nil)
storage, _, server := NewTestREST(t, nil, false)
defer server.Terminate(t)
failureCases := map[string]api.Service{
"empty ID": {
@@ -636,7 +795,7 @@ func TestServiceStorageValidatesCreate(t *testing.T) {
func TestServiceRegistryUpdate(t *testing.T) {
ctx := genericapirequest.NewDefaultContext()
storage, registry, server := NewTestREST(t, nil)
storage, registry, server := NewTestREST(t, nil, false)
defer server.Terminate(t)
obj, err := registry.Create(ctx, &api.Service{
@@ -688,8 +847,9 @@ func TestServiceRegistryUpdate(t *testing.T) {
}
func TestServiceRegistryUpdateDryRun(t *testing.T) {
ctx := genericapirequest.NewDefaultContext()
storage, registry, server := NewTestREST(t, nil)
storage, registry, server := NewTestREST(t, nil, false)
defer server.Terminate(t)
obj, err := registry.Create(ctx, &api.Service{
@@ -854,7 +1014,7 @@ func TestServiceRegistryUpdateDryRun(t *testing.T) {
func TestServiceStorageValidatesUpdate(t *testing.T) {
ctx := genericapirequest.NewDefaultContext()
storage, registry, server := NewTestREST(t, nil)
storage, registry, server := NewTestREST(t, nil, false)
defer server.Terminate(t)
registry.Create(ctx, &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
@@ -907,7 +1067,7 @@ func TestServiceStorageValidatesUpdate(t *testing.T) {
func TestServiceRegistryExternalService(t *testing.T) {
ctx := genericapirequest.NewDefaultContext()
storage, registry, server := NewTestREST(t, nil)
storage, registry, server := NewTestREST(t, nil, false)
defer server.Terminate(t)
svc := &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
@@ -946,7 +1106,7 @@ func TestServiceRegistryExternalService(t *testing.T) {
func TestServiceRegistryDelete(t *testing.T) {
ctx := genericapirequest.NewDefaultContext()
storage, registry, server := NewTestREST(t, nil)
storage, registry, server := NewTestREST(t, nil, false)
defer server.Terminate(t)
svc := &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
@@ -969,7 +1129,7 @@ func TestServiceRegistryDelete(t *testing.T) {
func TestServiceRegistryDeleteDryRun(t *testing.T) {
ctx := genericapirequest.NewDefaultContext()
storage, registry, server := NewTestREST(t, nil)
storage, registry, server := NewTestREST(t, nil, false)
defer server.Terminate(t)
// Test dry run delete request with cluster ip
@@ -1035,7 +1195,7 @@ func TestServiceRegistryDeleteDryRun(t *testing.T) {
func TestServiceRegistryDeleteExternal(t *testing.T) {
ctx := genericapirequest.NewDefaultContext()
storage, registry, server := NewTestREST(t, nil)
storage, registry, server := NewTestREST(t, nil, false)
defer server.Terminate(t)
svc := &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
@@ -1058,7 +1218,7 @@ func TestServiceRegistryDeleteExternal(t *testing.T) {
func TestServiceRegistryUpdateExternalService(t *testing.T) {
ctx := genericapirequest.NewDefaultContext()
storage, registry, server := NewTestREST(t, nil)
storage, registry, server := NewTestREST(t, nil, false)
defer server.Terminate(t)
// Create non-external load balancer.
@@ -1097,7 +1257,7 @@ func TestServiceRegistryUpdateExternalService(t *testing.T) {
func TestServiceRegistryUpdateMultiPortExternalService(t *testing.T) {
ctx := genericapirequest.NewDefaultContext()
storage, registry, server := NewTestREST(t, nil)
storage, registry, server := NewTestREST(t, nil, false)
defer server.Terminate(t)
// Create external load balancer.
@@ -1135,7 +1295,7 @@ func TestServiceRegistryUpdateMultiPortExternalService(t *testing.T) {
func TestServiceRegistryGet(t *testing.T) {
ctx := genericapirequest.NewDefaultContext()
storage, registry, server := NewTestREST(t, nil)
storage, registry, server := NewTestREST(t, nil, false)
defer server.Terminate(t)
registry.Create(ctx, &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
@@ -1211,7 +1371,7 @@ func TestServiceRegistryResourceLocation(t *testing.T) {
},
},
}
storage, registry, server := NewTestRESTWithPods(t, endpoints, pods)
storage, registry, server := NewTestRESTWithPods(t, endpoints, pods, false)
defer server.Terminate(t)
for _, name := range []string{"foo", "bad"} {
registry.Create(ctx, &api.Service{
@@ -1311,7 +1471,7 @@ func TestServiceRegistryResourceLocation(t *testing.T) {
func TestServiceRegistryList(t *testing.T) {
ctx := genericapirequest.NewDefaultContext()
storage, registry, server := NewTestREST(t, nil)
storage, registry, server := NewTestREST(t, nil, false)
defer server.Terminate(t)
registry.Create(ctx, &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: metav1.NamespaceDefault},
@@ -1343,7 +1503,7 @@ func TestServiceRegistryList(t *testing.T) {
}
func TestServiceRegistryIPAllocation(t *testing.T) {
storage, _, server := NewTestREST(t, nil)
storage, _, server := NewTestREST(t, nil, false)
defer server.Terminate(t)
svc1 := &api.Service{
@@ -1426,7 +1586,7 @@ func TestServiceRegistryIPAllocation(t *testing.T) {
}
func TestServiceRegistryIPReallocation(t *testing.T) {
storage, _, server := NewTestREST(t, nil)
storage, _, server := NewTestREST(t, nil, false)
defer server.Terminate(t)
svc1 := &api.Service{
@@ -1482,7 +1642,7 @@ func TestServiceRegistryIPReallocation(t *testing.T) {
}
func TestServiceRegistryIPUpdate(t *testing.T) {
storage, _, server := NewTestREST(t, nil)
storage, _, server := NewTestREST(t, nil, false)
defer server.Terminate(t)
svc := &api.Service{
@@ -1537,7 +1697,7 @@ func TestServiceRegistryIPUpdate(t *testing.T) {
}
func TestServiceRegistryIPLoadBalancer(t *testing.T) {
storage, registry, server := NewTestREST(t, nil)
storage, registry, server := NewTestREST(t, nil, false)
defer server.Terminate(t)
svc := &api.Service{
@@ -1577,7 +1737,7 @@ func TestServiceRegistryIPLoadBalancer(t *testing.T) {
}
func TestUpdateServiceWithConflictingNamespace(t *testing.T) {
storage, _, server := NewTestREST(t, nil)
storage, _, server := NewTestREST(t, nil, false)
defer server.Terminate(t)
service := &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "not-default"},
@@ -1599,7 +1759,7 @@ func TestUpdateServiceWithConflictingNamespace(t *testing.T) {
// and type is LoadBalancer.
func TestServiceRegistryExternalTrafficHealthCheckNodePortAllocation(t *testing.T) {
ctx := genericapirequest.NewDefaultContext()
storage, registry, server := NewTestREST(t, nil)
storage, registry, server := NewTestREST(t, nil, false)
defer server.Terminate(t)
svc := &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "external-lb-esipp"},
@@ -1639,7 +1799,7 @@ func TestServiceRegistryExternalTrafficHealthCheckNodePortAllocation(t *testing.
func TestServiceRegistryExternalTrafficHealthCheckNodePortUserAllocation(t *testing.T) {
randomNodePort := generateRandomNodePort()
ctx := genericapirequest.NewDefaultContext()
storage, registry, server := NewTestREST(t, nil)
storage, registry, server := NewTestREST(t, nil, false)
defer server.Terminate(t)
svc := &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "external-lb-esipp"},
@@ -1682,7 +1842,7 @@ func TestServiceRegistryExternalTrafficHealthCheckNodePortUserAllocation(t *test
// Validate that the service creation fails when the requested port number is -1.
func TestServiceRegistryExternalTrafficHealthCheckNodePortNegative(t *testing.T) {
ctx := genericapirequest.NewDefaultContext()
storage, _, server := NewTestREST(t, nil)
storage, _, server := NewTestREST(t, nil, false)
defer server.Terminate(t)
svc := &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "external-lb-esipp"},
@@ -1709,7 +1869,7 @@ func TestServiceRegistryExternalTrafficHealthCheckNodePortNegative(t *testing.T)
// Validate that the health check nodePort is not allocated when ExternalTrafficPolicy is set to Global.
func TestServiceRegistryExternalTrafficGlobal(t *testing.T) {
ctx := genericapirequest.NewDefaultContext()
storage, registry, server := NewTestREST(t, nil)
storage, registry, server := NewTestREST(t, nil, false)
defer server.Terminate(t)
svc := &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "external-lb-esipp"},
@@ -1745,13 +1905,17 @@ func TestServiceRegistryExternalTrafficGlobal(t *testing.T) {
}
func TestInitClusterIP(t *testing.T) {
storage, _, server := NewTestREST(t, nil)
defer server.Terminate(t)
ipv4Service := api.IPv4Protocol
ipv6Service := api.IPv6Protocol
testCases := []struct {
name string
svc *api.Service
expectClusterIP bool
name string
svc *api.Service
expectClusterIP bool
enableDualStack bool
allocateSpecificIP bool
useSecondaryAlloc bool
expectedAllocatedIP string
}{
{
name: "Allocate new ClusterIP",
@@ -1769,6 +1933,27 @@ func TestInitClusterIP(t *testing.T) {
},
},
expectClusterIP: true,
enableDualStack: false,
},
{
name: "Allocate new ClusterIP-v6",
svc: &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeClusterIP,
IPFamily: &ipv6Service,
Ports: []api.ServicePort{{
Port: 6502,
Protocol: api.ProtocolTCP,
TargetPort: intstr.FromInt(6502),
}},
},
},
expectClusterIP: true,
useSecondaryAlloc: true,
enableDualStack: true,
},
{
name: "Allocate specified ClusterIP",
@@ -1778,6 +1963,7 @@ func TestInitClusterIP(t *testing.T) {
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeClusterIP,
IPFamily: &ipv4Service,
ClusterIP: "1.2.3.4",
Ports: []api.ServicePort{{
Port: 6502,
@@ -1786,7 +1972,33 @@ func TestInitClusterIP(t *testing.T) {
}},
},
},
expectClusterIP: true,
expectClusterIP: true,
allocateSpecificIP: true,
expectedAllocatedIP: "1.2.3.4",
enableDualStack: true,
},
{
name: "Allocate specified ClusterIP-v6",
svc: &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeClusterIP,
IPFamily: &ipv6Service,
ClusterIP: "2000:0:0:0:0:0:0:1",
Ports: []api.ServicePort{{
Port: 6502,
Protocol: api.ProtocolTCP,
TargetPort: intstr.FromInt(6502),
}},
},
},
expectClusterIP: true,
allocateSpecificIP: true,
expectedAllocatedIP: "2000:0:0:0:0:0:0:1",
useSecondaryAlloc: true,
enableDualStack: true,
},
{
name: "Shouldn't allocate ClusterIP",
@@ -1809,35 +2021,41 @@ func TestInitClusterIP(t *testing.T) {
}
for _, test := range testCases {
hasAllocatedIP, err := initClusterIP(test.svc, storage.serviceIPs)
if err != nil {
t.Errorf("%q: unexpected error: %v", test.name, err)
}
t.Run(test.name, func(t *testing.T) {
if hasAllocatedIP != test.expectClusterIP {
t.Errorf("%q: expected %v, but got %v", test.name, test.expectClusterIP, hasAllocatedIP)
}
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, test.enableDualStack)()
if test.expectClusterIP {
if !storage.serviceIPs.Has(net.ParseIP(test.svc.Spec.ClusterIP)) {
t.Errorf("%q: unexpected ClusterIP %q, out of range", test.name, test.svc.Spec.ClusterIP)
storage, _, server := NewTestREST(t, nil, test.enableDualStack)
defer server.Terminate(t)
whichAlloc := allocGet(storage, test.useSecondaryAlloc)
hasAllocatedIP, err := initClusterIP(test.svc, whichAlloc)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
}
if test.name == "Allocate specified ClusterIP" && test.svc.Spec.ClusterIP != "1.2.3.4" {
t.Errorf("%q: expected ClusterIP %q, but got %q", test.name, "1.2.3.4", test.svc.Spec.ClusterIP)
}
if hasAllocatedIP {
if helper.IsServiceIPSet(test.svc) {
storage.serviceIPs.Release(net.ParseIP(test.svc.Spec.ClusterIP))
if hasAllocatedIP != test.expectClusterIP {
t.Errorf("expected %v, but got %v", test.expectClusterIP, hasAllocatedIP)
}
}
if test.expectClusterIP {
alloc := allocGet(storage, test.useSecondaryAlloc)
if !alloc.Has(net.ParseIP(test.svc.Spec.ClusterIP)) {
t.Errorf("unexpected ClusterIP %q, out of range", test.svc.Spec.ClusterIP)
}
}
if test.allocateSpecificIP && test.expectedAllocatedIP != test.svc.Spec.ClusterIP {
t.Errorf(" expected ClusterIP %q, but got %q", test.expectedAllocatedIP, test.svc.Spec.ClusterIP)
}
})
}
}
func TestInitNodePorts(t *testing.T) {
storage, _, server := NewTestREST(t, nil)
storage, _, server := NewTestREST(t, nil, false)
defer server.Terminate(t)
nodePortOp := portallocator.StartOperation(storage.serviceNodePorts, false)
defer nodePortOp.Finish()
@@ -2019,7 +2237,7 @@ func TestInitNodePorts(t *testing.T) {
}
func TestUpdateNodePorts(t *testing.T) {
storage, _, server := NewTestREST(t, nil)
storage, _, server := NewTestREST(t, nil, false)
defer server.Terminate(t)
nodePortOp := portallocator.StartOperation(storage.serviceNodePorts, false)
defer nodePortOp.Finish()
@@ -2287,3 +2505,142 @@ func TestUpdateNodePorts(t *testing.T) {
}
}
}
func TestAllocGetters(t *testing.T) {
ipv4Service := api.IPv4Protocol
ipv6Service := api.IPv6Protocol
testCases := []struct {
name string
enableDualStack bool
specExpctPrimary bool
clusterIPExpectPrimary bool
svc *api.Service
}{
{
name: "spec:v4 ip:v4 dualstack:off",
specExpctPrimary: true,
clusterIPExpectPrimary: true,
enableDualStack: false,
svc: &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
Type: api.ServiceTypeClusterIP,
IPFamily: &ipv4Service,
ClusterIP: "10.0.0.1",
},
},
},
{
name: "spec:v4 ip:v4 dualstack:on",
specExpctPrimary: true,
clusterIPExpectPrimary: true,
enableDualStack: true,
svc: &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
Type: api.ServiceTypeClusterIP,
IPFamily: &ipv4Service,
ClusterIP: "10.0.0.1",
},
},
},
{
name: "spec:v4 ip:v6 dualstack:on",
specExpctPrimary: true,
clusterIPExpectPrimary: false,
enableDualStack: true,
svc: &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
Type: api.ServiceTypeClusterIP,
IPFamily: &ipv4Service,
ClusterIP: "2000::1",
},
},
},
{
name: "spec:v6 ip:v6 dualstack:on",
specExpctPrimary: false,
clusterIPExpectPrimary: false,
enableDualStack: true,
svc: &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
Type: api.ServiceTypeClusterIP,
IPFamily: &ipv6Service,
ClusterIP: "2000::1",
},
},
},
{
name: "spec:v6 ip:v4 dualstack:on",
specExpctPrimary: false,
clusterIPExpectPrimary: true,
enableDualStack: true,
svc: &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
Type: api.ServiceTypeClusterIP,
IPFamily: &ipv6Service,
ClusterIP: "10.0.0.10",
},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, tc.enableDualStack)()
storage, _, server := NewTestREST(t, nil, tc.enableDualStack)
defer server.Terminate(t)
if tc.enableDualStack && storage.secondaryServiceIPs == nil {
t.Errorf("storage must allocate secondary ServiceIPs allocator for dual stack")
return
}
alloc := storage.getAllocatorByClusterIP(tc.svc)
if tc.clusterIPExpectPrimary && !bytes.Equal(alloc.CIDR().IP, storage.serviceIPs.CIDR().IP) {
t.Errorf("expected primary allocator, but primary allocator was not selected")
return
}
if tc.enableDualStack && !tc.clusterIPExpectPrimary && !bytes.Equal(alloc.CIDR().IP, storage.secondaryServiceIPs.CIDR().IP) {
t.Errorf("expected secondary allocator, but secondary allocator was not selected")
}
alloc = storage.getAllocatorBySpec(tc.svc)
if tc.specExpctPrimary && !bytes.Equal(alloc.CIDR().IP, storage.serviceIPs.CIDR().IP) {
t.Errorf("expected primary allocator, but primary allocator was not selected")
return
}
if tc.enableDualStack && !tc.specExpctPrimary && !bytes.Equal(alloc.CIDR().IP, storage.secondaryServiceIPs.CIDR().IP) {
t.Errorf("expected secondary allocator, but secondary allocator was not selected")
}
})
}
}

View File

@@ -53,7 +53,7 @@ func (a *APIServer) Start() error {
if err != nil {
return err
}
o.ServiceClusterIPRange = *ipnet
o.ServiceClusterIPRanges = ipnet.String()
o.AllowPrivileged = true
o.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition"}
errCh := make(chan error)

View File

@@ -76,7 +76,7 @@ func StartRealMasterOrDie(t *testing.T, configFuncs ...func(*options.ServerRunOp
kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir
kubeAPIServerOptions.Etcd.StorageConfig.Transport.ServerList = []string{framework.GetEtcdURL()}
kubeAPIServerOptions.Etcd.DefaultStorageMediaType = runtime.ContentTypeJSON // force json we can easily interpret the result in etcd
kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange
kubeAPIServerOptions.ServiceClusterIPRanges = defaultServiceClusterIPRange.String()
kubeAPIServerOptions.Authorization.Modes = []string{"RBAC"}
kubeAPIServerOptions.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"}
kubeAPIServerOptions.APIEnablement.RuntimeConfig["api/all"] = "true"

View File

@@ -101,7 +101,7 @@ func TestAggregatedAPIServer(t *testing.T) {
kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir
kubeAPIServerOptions.InsecureServing.BindPort = 0
kubeAPIServerOptions.Etcd.StorageConfig.Transport.ServerList = []string{framework.GetEtcdURL()}
kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange
kubeAPIServerOptions.ServiceClusterIPRanges = defaultServiceClusterIPRange.String()
kubeAPIServerOptions.Authentication.RequestHeader.UsernameHeaders = []string{"X-Remote-User"}
kubeAPIServerOptions.Authentication.RequestHeader.GroupHeaders = []string{"X-Remote-Group"}
kubeAPIServerOptions.Authentication.RequestHeader.ExtraHeaderPrefixes = []string{"X-Remote-Extra-"}

View File

@@ -92,7 +92,7 @@ func StartTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup
kubeAPIServerOptions.InsecureServing.BindPort = 0
kubeAPIServerOptions.Etcd.StorageConfig.Prefix = path.Join("/", uuid.New(), "registry")
kubeAPIServerOptions.Etcd.StorageConfig.Transport.ServerList = []string{GetEtcdURL()}
kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange
kubeAPIServerOptions.ServiceClusterIPRanges = defaultServiceClusterIPRange.String()
kubeAPIServerOptions.Authentication.RequestHeader.UsernameHeaders = []string{"X-Remote-User"}
kubeAPIServerOptions.Authentication.RequestHeader.GroupHeaders = []string{"X-Remote-Group"}
kubeAPIServerOptions.Authentication.RequestHeader.ExtraHeaderPrefixes = []string{"X-Remote-Extra-"}