From 0595ec79423606c60d782b67649e28f994a0c85d Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Sun, 31 Dec 2023 17:57:36 +0000 Subject: [PATCH] implement field selector for clusterIP on services This will allow components that don't need to watch headless services (heavily used on ai/ml workloads) to filter them server side. Specially useful for kubelet and kube-proxy Co-authored-by: Jianbo Ma Change-Id: I6434d2c8c77aaf725ec5c07acbcda14311f24bfa Change-Id: Iba9e25afb90712facfb3dee25c500bbe08ef38fc --- pkg/apis/core/v1/conversion.go | 17 +++ pkg/registry/core/service/storage/storage.go | 6 +- pkg/registry/core/service/strategy.go | 32 +++++ pkg/registry/core/service/strategy_test.go | 94 ++++++++++++ test/integration/service/service_test.go | 142 +++++++++++++++++++ 5 files changed, 290 insertions(+), 1 deletion(-) diff --git a/pkg/apis/core/v1/conversion.go b/pkg/apis/core/v1/conversion.go index 6793616f3a8..28cf6e3efeb 100644 --- a/pkg/apis/core/v1/conversion.go +++ b/pkg/apis/core/v1/conversion.go @@ -97,6 +97,9 @@ func addConversionFuncs(scheme *runtime.Scheme) error { if err := AddFieldLabelConversionsForSecret(scheme); err != nil { return err } + if err := AddFieldLabelConversionsForService(scheme); err != nil { + return err + } return nil } @@ -488,6 +491,20 @@ func AddFieldLabelConversionsForSecret(scheme *runtime.Scheme) error { }) } +func AddFieldLabelConversionsForService(scheme *runtime.Scheme) error { + return scheme.AddFieldLabelConversionFunc(SchemeGroupVersion.WithKind("Service"), + func(label, value string) (string, string, error) { + switch label { + case "metadata.namespace", + "metadata.name", + "spec.clusterIP": + return label, value, nil + default: + return "", "", fmt.Errorf("field label not supported: %s", label) + } + }) +} + var initContainerAnnotations = map[string]bool{ "pod.beta.kubernetes.io/init-containers": true, "pod.alpha.kubernetes.io/init-containers": true, diff --git a/pkg/registry/core/service/storage/storage.go b/pkg/registry/core/service/storage/storage.go index b659aaad630..a59c79ed042 100644 --- a/pkg/registry/core/service/storage/storage.go +++ b/pkg/registry/core/service/storage/storage.go @@ -88,6 +88,7 @@ func NewREST( store := &genericregistry.Store{ NewFunc: func() runtime.Object { return &api.Service{} }, NewListFunc: func() runtime.Object { return &api.ServiceList{} }, + PredicateFunc: svcreg.Matcher, DefaultQualifiedResource: api.Resource("services"), SingularQualifiedResource: api.Resource("service"), ReturnDeletedObject: true, @@ -99,7 +100,10 @@ func NewREST( TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)}, } - options := &generic.StoreOptions{RESTOptions: optsGetter} + options := &generic.StoreOptions{ + RESTOptions: optsGetter, + AttrFunc: svcreg.GetAttrs, + } if err := store.CompleteWithOptions(options); err != nil { return nil, nil, nil, err } diff --git a/pkg/registry/core/service/strategy.go b/pkg/registry/core/service/strategy.go index 4a91778a37f..f2c08fbb7d8 100644 --- a/pkg/registry/core/service/strategy.go +++ b/pkg/registry/core/service/strategy.go @@ -18,11 +18,16 @@ package service import ( "context" + "fmt" "reflect" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/validation/field" + "k8s.io/apiserver/pkg/registry/generic" + pkgstorage "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/names" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/kubernetes/pkg/api/legacyscheme" @@ -162,6 +167,33 @@ func (serviceStatusStrategy) WarningsOnUpdate(ctx context.Context, obj, old runt return nil } +// GetAttrs returns labels and fields of a given object for filtering purposes. +func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) { + service, ok := obj.(*api.Service) + if !ok { + return nil, nil, fmt.Errorf("not a service") + } + return service.Labels, SelectableFields(service), nil +} + +// Matcher returns a selection predicate for a given label and field selector. +func Matcher(label labels.Selector, field fields.Selector) pkgstorage.SelectionPredicate { + return pkgstorage.SelectionPredicate{ + Label: label, + Field: field, + GetAttrs: GetAttrs, + } +} + +// SelectableFields returns a field set that can be used for filter selection +func SelectableFields(service *api.Service) fields.Set { + objectMetaFieldsSet := generic.ObjectMetaFieldsSet(&service.ObjectMeta, false) + serviceSpecificFieldsSet := fields.Set{ + "spec.clusterIP": service.Spec.ClusterIP, + } + return generic.MergeFieldsSets(objectMetaFieldsSet, serviceSpecificFieldsSet) +} + // dropServiceStatusDisabledFields drops fields that are not used if their associated feature gates // are not enabled. The typical pattern is: // diff --git a/pkg/registry/core/service/strategy_test.go b/pkg/registry/core/service/strategy_test.go index d36ed678adb..66fbd94ab97 100644 --- a/pkg/registry/core/service/strategy_test.go +++ b/pkg/registry/core/service/strategy_test.go @@ -23,6 +23,8 @@ import ( "github.com/google/go-cmp/cmp" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/intstr" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" @@ -782,3 +784,95 @@ func TestDropTypeDependentFields(t *testing.T) { }) } } + +func TestMatchService(t *testing.T) { + testCases := []struct { + name string + in *api.Service + fieldSelector fields.Selector + expectMatch bool + }{ + { + name: "match on headless service", + in: &api.Service{ + Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone}, + }, + fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=None"), + expectMatch: true, + }, + { + name: "no match on clusterIP service", + in: &api.Service{ + Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"}, + }, + fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=None"), + expectMatch: false, + }, + { + name: "match on clusterIP service", + in: &api.Service{ + Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"}, + }, + fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=192.168.1.1"), + expectMatch: true, + }, + { + name: "match on non-headless service", + in: &api.Service{ + Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"}, + }, + fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP!=None"), + expectMatch: true, + }, + { + name: "match on any ClusterIP set service", + in: &api.Service{ + Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"}, + }, + fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP!=\"\""), + expectMatch: true, + }, + { + name: "match on clusterIP IPv6 service", + in: &api.Service{ + Spec: api.ServiceSpec{ClusterIP: "2001:db2::1"}, + }, + fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=2001:db2::1"), + expectMatch: true, + }, + { + name: "no match on headless service", + in: &api.Service{ + Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone}, + }, + fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=192.168.1.1"), + expectMatch: false, + }, + { + name: "no match on headless service", + in: &api.Service{ + Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone}, + }, + fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=2001:db2::1"), + expectMatch: false, + }, + { + name: "no match on empty service", + in: &api.Service{}, + fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=None"), + expectMatch: false, + }, + } + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + m := Matcher(labels.Everything(), testCase.fieldSelector) + result, err := m.Matches(testCase.in) + if err != nil { + t.Errorf("Unexpected error %v", err) + } + if result != testCase.expectMatch { + t.Errorf("Result %v, Expected %v, Selector: %v, Service: %v", result, testCase.expectMatch, testCase.fieldSelector.String(), testCase.in) + } + }) + } +} diff --git a/test/integration/service/service_test.go b/test/integration/service/service_test.go index 5d675609e2a..5a626d7a4d0 100644 --- a/test/integration/service/service_test.go +++ b/test/integration/service/service_test.go @@ -19,10 +19,16 @@ package service import ( "context" "testing" + "time" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" "k8s.io/kubernetes/test/integration/framework" ) @@ -264,3 +270,139 @@ func Test_RemovingExternalIPsFromClusterIPServiceDropsExternalTrafficPolicy(t *t t.Error("service externalTrafficPolicy was not set for clusterIP Service with externalIPs") } } + +func Test_ServiceClusterIPSelector(t *testing.T) { + server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()) + defer server.TearDownFn() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client, err := clientset.NewForConfig(server.ClientConfig) + if err != nil { + t.Fatalf("Error creating clientset: %v", err) + } + + ns := framework.CreateNamespaceOrDie(client, "test-external-name-drops-internal-traffic-policy", t) + defer framework.DeleteNamespaceOrDie(client, ns, t) + + // create headless service + service := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-headless", + Namespace: ns.Name, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: corev1.ClusterIPNone, + Type: corev1.ServiceTypeClusterIP, + Ports: []corev1.ServicePort{{ + Port: int32(80), + }}, + Selector: map[string]string{ + "foo": "bar", + }, + }, + } + + _, err = client.CoreV1().Services(ns.Name).Create(ctx, service, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Error creating test service: %v", err) + } + + // informer to watch only non-headless services + kubeInformers := informers.NewSharedInformerFactoryWithOptions(client, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermNotEqualSelector("spec.clusterIP", corev1.ClusterIPNone).String() + })) + + serviceInformer := kubeInformers.Core().V1().Services().Informer() + serviceLister := kubeInformers.Core().V1().Services().Lister() + serviceHasSynced := serviceInformer.HasSynced + if _, err = serviceInformer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + svc := obj.(*corev1.Service) + t.Logf("Added Service %#v", svc) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + oldSvc := oldObj.(*corev1.Service) + newSvc := newObj.(*corev1.Service) + t.Logf("Updated Service %#v to %#v", oldSvc, newSvc) + }, + DeleteFunc: func(obj interface{}) { + svc := obj.(*corev1.Service) + t.Logf("Deleted Service %#v", svc) + }, + }, + ); err != nil { + t.Fatalf("Error adding service informer handler: %v", err) + } + kubeInformers.Start(ctx.Done()) + cache.WaitForCacheSync(ctx.Done(), serviceHasSynced) + svcs, err := serviceLister.List(labels.Everything()) + if err != nil { + t.Fatalf("Error listing services: %v", err) + } + // only the kubernetes.default service expected + if len(svcs) != 1 || svcs[0].Name != "kubernetes" { + t.Fatalf("expected 1 services, got %d", len(svcs)) + } + + // create a new service with ClusterIP + service2 := service.DeepCopy() + service2.Spec.ClusterIP = "" + service2.Name = "test-clusterip" + _, err = client.CoreV1().Services(ns.Name).Create(ctx, service2, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Error creating test service: %v", err) + } + + err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) { + svc, err := serviceLister.Services(service2.Namespace).Get(service2.Name) + if svc == nil || err != nil { + return false, nil + } + return true, nil + }) + if err != nil { + t.Fatalf("Error waiting for test service test-clusterip: %v", err) + } + + // mutate the Service to drop the ClusterIP, theoretically ClusterIP is inmutable but ... + service.Spec.ExternalName = "test" + service.Spec.Type = corev1.ServiceTypeExternalName + _, err = client.CoreV1().Services(ns.Name).Update(ctx, service, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Error creating test service: %v", err) + } + + err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) { + svc, err := serviceLister.Services(service.Namespace).Get(service.Name) + if svc == nil || err != nil { + return false, nil + } + return true, nil + }) + if err != nil { + t.Fatalf("Error waiting for test service without ClusterIP: %v", err) + } + + // mutate the Service to get the ClusterIP again + service.Spec.ExternalName = "" + service.Spec.ClusterIP = "" + service.Spec.Type = corev1.ServiceTypeClusterIP + _, err = client.CoreV1().Services(ns.Name).Update(ctx, service, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Error creating test service: %v", err) + } + + err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) { + svc, err := serviceLister.Services(service.Namespace).Get(service.Name) + if svc == nil || err != nil { + return false, nil + } + return true, nil + }) + if err != nil { + t.Fatalf("Error waiting for test service with ClusterIP: %v", err) + } +}