Implements Service Internal Traffic Policy
1. Add API definitions; 2. Add feature gate and drops the field when feature gate is not on; 3. Set default values for the field; 4. Add API Validation 5. add kube-proxy iptables and ipvs implementations 6. add tests
This commit is contained in:
@@ -3475,6 +3475,19 @@ const (
|
||||
ServiceTypeExternalName ServiceType = "ExternalName"
|
||||
)
|
||||
|
||||
// ServiceInternalTrafficPolicyType describes the type of traffic routing for
|
||||
// internal traffic
|
||||
type ServiceInternalTrafficPolicyType string
|
||||
|
||||
const (
|
||||
// ServiceInternalTrafficPolicyCluster routes traffic to all endpoints
|
||||
ServiceInternalTrafficPolicyCluster ServiceInternalTrafficPolicyType = "Cluster"
|
||||
|
||||
// ServiceInternalTrafficPolicyLocal only routes to node-local
|
||||
// endpoints, otherwise drops the traffic
|
||||
ServiceInternalTrafficPolicyLocal ServiceInternalTrafficPolicyType = "Local"
|
||||
)
|
||||
|
||||
// ServiceExternalTrafficPolicyType string
|
||||
type ServiceExternalTrafficPolicyType string
|
||||
|
||||
@@ -3739,9 +3752,19 @@ type ServiceSpec struct {
|
||||
// implementation (e.g. cloud providers) should ignore Services that set this field.
|
||||
// This field can only be set when creating or updating a Service to type 'LoadBalancer'.
|
||||
// Once set, it can not be changed. This field will be wiped when a service is updated to a non 'LoadBalancer' type.
|
||||
// featureGate=LoadBalancerClass
|
||||
// +featureGate=LoadBalancerClass
|
||||
// +optional
|
||||
LoadBalancerClass *string
|
||||
|
||||
// InternalTrafficPolicy specifies if the cluster internal traffic
|
||||
// should be routed to all endpoints or node-local endpoints only.
|
||||
// "Cluster" routes internal traffic to a Service to all endpoints.
|
||||
// "Local" routes traffic to node-local endpoints only, traffic is
|
||||
// dropped if no node-local endpoints are ready.
|
||||
// The default value is "Cluster".
|
||||
// +featureGate=ServiceInternalTrafficPolicy
|
||||
// +optional
|
||||
InternalTrafficPolicy *ServiceInternalTrafficPolicyType
|
||||
}
|
||||
|
||||
// ServicePort represents the port on which the service is exposed
|
||||
|
@@ -131,6 +131,11 @@ func SetDefaults_Service(obj *v1.Service) {
|
||||
obj.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeCluster
|
||||
}
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) && obj.Spec.InternalTrafficPolicy == nil {
|
||||
serviceInternalTrafficPolicyCluster := v1.ServiceInternalTrafficPolicyCluster
|
||||
obj.Spec.InternalTrafficPolicy = &serviceInternalTrafficPolicyCluster
|
||||
}
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.ServiceLBNodePortControl) {
|
||||
if obj.Spec.Type == v1.ServiceTypeLoadBalancer {
|
||||
if obj.Spec.AllocateLoadBalancerNodePorts == nil {
|
||||
|
@@ -29,8 +29,11 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/diff"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||
corev1 "k8s.io/kubernetes/pkg/apis/core/v1"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
utilpointer "k8s.io/utils/pointer"
|
||||
|
||||
// ensure types are installed
|
||||
@@ -1798,3 +1801,64 @@ func TestSetDefaultEnableServiceLinks(t *testing.T) {
|
||||
t.Errorf("Expected enableServiceLinks value: %+v\ngot: %+v\n", v1.DefaultEnableServiceLinks, *output.Spec.EnableServiceLinks)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetDefaultServiceInternalTrafficPolicy(t *testing.T) {
|
||||
cluster := v1.ServiceInternalTrafficPolicyCluster
|
||||
local := v1.ServiceInternalTrafficPolicyLocal
|
||||
testCases := []struct {
|
||||
name string
|
||||
expectedInternalTrafficPolicy v1.ServiceInternalTrafficPolicyType
|
||||
svc v1.Service
|
||||
featureGateOn bool
|
||||
}{
|
||||
{
|
||||
name: "must set default internalTrafficPolicy",
|
||||
expectedInternalTrafficPolicy: v1.ServiceInternalTrafficPolicyCluster,
|
||||
svc: v1.Service{},
|
||||
featureGateOn: true,
|
||||
},
|
||||
{
|
||||
name: "must not set default internalTrafficPolicy when it's cluster",
|
||||
expectedInternalTrafficPolicy: v1.ServiceInternalTrafficPolicyCluster,
|
||||
svc: v1.Service{
|
||||
Spec: v1.ServiceSpec{
|
||||
InternalTrafficPolicy: &cluster,
|
||||
},
|
||||
},
|
||||
featureGateOn: true,
|
||||
},
|
||||
{
|
||||
name: "must not set default internalTrafficPolicy when it's local",
|
||||
expectedInternalTrafficPolicy: v1.ServiceInternalTrafficPolicyLocal,
|
||||
svc: v1.Service{
|
||||
Spec: v1.ServiceSpec{
|
||||
InternalTrafficPolicy: &local,
|
||||
},
|
||||
},
|
||||
featureGateOn: true,
|
||||
},
|
||||
{
|
||||
name: "must not set default internalTrafficPolicy when gate is disabled",
|
||||
expectedInternalTrafficPolicy: "",
|
||||
svc: v1.Service{},
|
||||
featureGateOn: false,
|
||||
},
|
||||
}
|
||||
for _, test := range testCases {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceInternalTrafficPolicy, test.featureGateOn)()
|
||||
obj := roundTrip(t, runtime.Object(&test.svc))
|
||||
svc := obj.(*v1.Service)
|
||||
|
||||
if test.expectedInternalTrafficPolicy == "" {
|
||||
if svc.Spec.InternalTrafficPolicy != nil {
|
||||
t.Fatalf("expected .spec.internalTrafficPolicy: null, got %v", *svc.Spec.InternalTrafficPolicy)
|
||||
}
|
||||
} else {
|
||||
if *svc.Spec.InternalTrafficPolicy != test.expectedInternalTrafficPolicy {
|
||||
t.Fatalf("expected .spec.internalTrafficPolicy: %v got %v", test.expectedInternalTrafficPolicy, *svc.Spec.InternalTrafficPolicy)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
2
pkg/apis/core/v1/zz_generated.conversion.go
generated
2
pkg/apis/core/v1/zz_generated.conversion.go
generated
@@ -7636,6 +7636,7 @@ func autoConvert_v1_ServiceSpec_To_core_ServiceSpec(in *v1.ServiceSpec, out *cor
|
||||
out.IPFamilyPolicy = (*core.IPFamilyPolicyType)(unsafe.Pointer(in.IPFamilyPolicy))
|
||||
out.AllocateLoadBalancerNodePorts = (*bool)(unsafe.Pointer(in.AllocateLoadBalancerNodePorts))
|
||||
out.LoadBalancerClass = (*string)(unsafe.Pointer(in.LoadBalancerClass))
|
||||
out.InternalTrafficPolicy = (*core.ServiceInternalTrafficPolicyType)(unsafe.Pointer(in.InternalTrafficPolicy))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -7664,6 +7665,7 @@ func autoConvert_core_ServiceSpec_To_v1_ServiceSpec(in *core.ServiceSpec, out *v
|
||||
out.TopologyKeys = *(*[]string)(unsafe.Pointer(&in.TopologyKeys))
|
||||
out.AllocateLoadBalancerNodePorts = (*bool)(unsafe.Pointer(in.AllocateLoadBalancerNodePorts))
|
||||
out.LoadBalancerClass = (*string)(unsafe.Pointer(in.LoadBalancerClass))
|
||||
out.InternalTrafficPolicy = (*v1.ServiceInternalTrafficPolicyType)(unsafe.Pointer(in.InternalTrafficPolicy))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@@ -4171,6 +4171,8 @@ var supportedSessionAffinityType = sets.NewString(string(core.ServiceAffinityCli
|
||||
var supportedServiceType = sets.NewString(string(core.ServiceTypeClusterIP), string(core.ServiceTypeNodePort),
|
||||
string(core.ServiceTypeLoadBalancer), string(core.ServiceTypeExternalName))
|
||||
|
||||
var supportedServiceInternalTrafficPolicy = sets.NewString(string(core.ServiceInternalTrafficPolicyCluster), string(core.ServiceExternalTrafficPolicyTypeLocal))
|
||||
|
||||
var supportedServiceIPFamily = sets.NewString(string(core.IPv4Protocol), string(core.IPv6Protocol))
|
||||
var supportedServiceIPFamilyPolicy = sets.NewString(string(core.IPFamilyPolicySingleStack), string(core.IPFamilyPolicyPreferDualStack), string(core.IPFamilyPolicyRequireDualStack))
|
||||
|
||||
@@ -4378,6 +4380,10 @@ func ValidateService(service *core.Service) field.ErrorList {
|
||||
|
||||
// external traffic fields
|
||||
allErrs = append(allErrs, validateServiceExternalTrafficFieldsValue(service)...)
|
||||
|
||||
// internal traffic policy field
|
||||
allErrs = append(allErrs, validateServiceInternalTrafficFieldsValue(service)...)
|
||||
|
||||
return allErrs
|
||||
}
|
||||
|
||||
@@ -4446,6 +4452,24 @@ func validateServiceExternalTrafficFieldsValue(service *core.Service) field.Erro
|
||||
return allErrs
|
||||
}
|
||||
|
||||
// validateServiceInternalTrafficFieldsValue validates InternalTraffic related
|
||||
// spec have legal value.
|
||||
func validateServiceInternalTrafficFieldsValue(service *core.Service) field.ErrorList {
|
||||
allErrs := field.ErrorList{}
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) {
|
||||
if service.Spec.InternalTrafficPolicy == nil {
|
||||
allErrs = append(allErrs, field.Required(field.NewPath("spec").Child("internalTrafficPolicy"), ""))
|
||||
}
|
||||
}
|
||||
|
||||
if service.Spec.InternalTrafficPolicy != nil && !supportedServiceInternalTrafficPolicy.Has(string(*service.Spec.InternalTrafficPolicy)) {
|
||||
allErrs = append(allErrs, field.NotSupported(field.NewPath("spec").Child("internalTrafficPolicy"), *service.Spec.InternalTrafficPolicy, supportedServiceInternalTrafficPolicy.List()))
|
||||
}
|
||||
|
||||
return allErrs
|
||||
}
|
||||
|
||||
// ValidateServiceExternalTrafficFieldsCombination validates if ExternalTrafficPolicy,
|
||||
// HealthCheckNodePort and Type combination are legal. For update, it should be called
|
||||
// after clearing externalTraffic related fields for the ease of transitioning between
|
||||
|
@@ -33,6 +33,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/validation"
|
||||
"k8s.io/apimachinery/pkg/util/validation/field"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/component-base/featuregate"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/kubernetes/pkg/apis/core"
|
||||
"k8s.io/kubernetes/pkg/capabilities"
|
||||
@@ -10135,9 +10136,10 @@ func TestValidateServiceCreate(t *testing.T) {
|
||||
preferDualStack := core.IPFamilyPolicyPreferDualStack
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
tweakSvc func(svc *core.Service) // given a basic valid service, each test case can customize it
|
||||
numErrs int
|
||||
name string
|
||||
tweakSvc func(svc *core.Service) // given a basic valid service, each test case can customize it
|
||||
numErrs int
|
||||
featureGates []featuregate.Feature
|
||||
}{
|
||||
{
|
||||
name: "missing namespace",
|
||||
@@ -10750,6 +10752,22 @@ func TestValidateServiceCreate(t *testing.T) {
|
||||
},
|
||||
numErrs: 1,
|
||||
},
|
||||
{
|
||||
name: "nil internalTraffic field when feature gate is on",
|
||||
tweakSvc: func(s *core.Service) {
|
||||
s.Spec.InternalTrafficPolicy = nil
|
||||
},
|
||||
featureGates: []featuregate.Feature{features.ServiceInternalTrafficPolicy},
|
||||
numErrs: 1,
|
||||
},
|
||||
{
|
||||
name: "invalid internalTraffic field",
|
||||
tweakSvc: func(s *core.Service) {
|
||||
invalid := core.ServiceInternalTrafficPolicyType("invalid")
|
||||
s.Spec.InternalTrafficPolicy = &invalid
|
||||
},
|
||||
numErrs: 1,
|
||||
},
|
||||
{
|
||||
name: "nagative healthCheckNodePort field",
|
||||
tweakSvc: func(s *core.Service) {
|
||||
@@ -11323,6 +11341,9 @@ func TestValidateServiceCreate(t *testing.T) {
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
for i := range tc.featureGates {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, tc.featureGates[i], true)()
|
||||
}
|
||||
svc := makeValidService()
|
||||
tc.tweakSvc(&svc)
|
||||
errs := ValidateServiceCreate(&svc)
|
||||
|
5
pkg/apis/core/zz_generated.deepcopy.go
generated
5
pkg/apis/core/zz_generated.deepcopy.go
generated
@@ -5335,6 +5335,11 @@ func (in *ServiceSpec) DeepCopyInto(out *ServiceSpec) {
|
||||
*out = new(string)
|
||||
**out = **in
|
||||
}
|
||||
if in.InternalTrafficPolicy != nil {
|
||||
in, out := &in.InternalTrafficPolicy, &out.InternalTrafficPolicy
|
||||
*out = new(ServiceInternalTrafficPolicyType)
|
||||
**out = **in
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user