implement field selector for clusterIP on services

This will allow components that don't need to watch headless services
(heavily used on ai/ml workloads) to filter them server side.

Specially useful for kubelet and kube-proxy

Co-authored-by: Jianbo Ma <sakuranlbj@gmail.com>

Change-Id: I6434d2c8c77aaf725ec5c07acbcda14311f24bfa

Change-Id: Iba9e25afb90712facfb3dee25c500bbe08ef38fc
This commit is contained in:
Antonio Ojea
2023-12-31 17:57:36 +00:00
parent 2a44cb0ab3
commit 0595ec7942
5 changed files with 290 additions and 1 deletions

View File

@@ -19,10 +19,16 @@ package service
import (
"context"
"testing"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/test/integration/framework"
)
@@ -264,3 +270,139 @@ func Test_RemovingExternalIPsFromClusterIPServiceDropsExternalTrafficPolicy(t *t
t.Error("service externalTrafficPolicy was not set for clusterIP Service with externalIPs")
}
}
func Test_ServiceClusterIPSelector(t *testing.T) {
server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
defer server.TearDownFn()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
client, err := clientset.NewForConfig(server.ClientConfig)
if err != nil {
t.Fatalf("Error creating clientset: %v", err)
}
ns := framework.CreateNamespaceOrDie(client, "test-external-name-drops-internal-traffic-policy", t)
defer framework.DeleteNamespaceOrDie(client, ns, t)
// create headless service
service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test-headless",
Namespace: ns.Name,
},
Spec: corev1.ServiceSpec{
ClusterIP: corev1.ClusterIPNone,
Type: corev1.ServiceTypeClusterIP,
Ports: []corev1.ServicePort{{
Port: int32(80),
}},
Selector: map[string]string{
"foo": "bar",
},
},
}
_, err = client.CoreV1().Services(ns.Name).Create(ctx, service, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Error creating test service: %v", err)
}
// informer to watch only non-headless services
kubeInformers := informers.NewSharedInformerFactoryWithOptions(client, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermNotEqualSelector("spec.clusterIP", corev1.ClusterIPNone).String()
}))
serviceInformer := kubeInformers.Core().V1().Services().Informer()
serviceLister := kubeInformers.Core().V1().Services().Lister()
serviceHasSynced := serviceInformer.HasSynced
if _, err = serviceInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
svc := obj.(*corev1.Service)
t.Logf("Added Service %#v", svc)
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldSvc := oldObj.(*corev1.Service)
newSvc := newObj.(*corev1.Service)
t.Logf("Updated Service %#v to %#v", oldSvc, newSvc)
},
DeleteFunc: func(obj interface{}) {
svc := obj.(*corev1.Service)
t.Logf("Deleted Service %#v", svc)
},
},
); err != nil {
t.Fatalf("Error adding service informer handler: %v", err)
}
kubeInformers.Start(ctx.Done())
cache.WaitForCacheSync(ctx.Done(), serviceHasSynced)
svcs, err := serviceLister.List(labels.Everything())
if err != nil {
t.Fatalf("Error listing services: %v", err)
}
// only the kubernetes.default service expected
if len(svcs) != 1 || svcs[0].Name != "kubernetes" {
t.Fatalf("expected 1 services, got %d", len(svcs))
}
// create a new service with ClusterIP
service2 := service.DeepCopy()
service2.Spec.ClusterIP = ""
service2.Name = "test-clusterip"
_, err = client.CoreV1().Services(ns.Name).Create(ctx, service2, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Error creating test service: %v", err)
}
err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
svc, err := serviceLister.Services(service2.Namespace).Get(service2.Name)
if svc == nil || err != nil {
return false, nil
}
return true, nil
})
if err != nil {
t.Fatalf("Error waiting for test service test-clusterip: %v", err)
}
// mutate the Service to drop the ClusterIP, theoretically ClusterIP is inmutable but ...
service.Spec.ExternalName = "test"
service.Spec.Type = corev1.ServiceTypeExternalName
_, err = client.CoreV1().Services(ns.Name).Update(ctx, service, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Error creating test service: %v", err)
}
err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
svc, err := serviceLister.Services(service.Namespace).Get(service.Name)
if svc == nil || err != nil {
return false, nil
}
return true, nil
})
if err != nil {
t.Fatalf("Error waiting for test service without ClusterIP: %v", err)
}
// mutate the Service to get the ClusterIP again
service.Spec.ExternalName = ""
service.Spec.ClusterIP = ""
service.Spec.Type = corev1.ServiceTypeClusterIP
_, err = client.CoreV1().Services(ns.Name).Update(ctx, service, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Error creating test service: %v", err)
}
err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
svc, err := serviceLister.Services(service.Namespace).Get(service.Name)
if svc == nil || err != nil {
return false, nil
}
return true, nil
})
if err != nil {
t.Fatalf("Error waiting for test service with ClusterIP: %v", err)
}
}