Merge pull request #123905 from aojea/field_selector
Field selector for Services based on ClusterIP and Type
This commit is contained in:
		| @@ -97,6 +97,9 @@ func addConversionFuncs(scheme *runtime.Scheme) error { | ||||
| 	if err := AddFieldLabelConversionsForSecret(scheme); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if err := AddFieldLabelConversionsForService(scheme); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @@ -488,6 +491,21 @@ func AddFieldLabelConversionsForSecret(scheme *runtime.Scheme) error { | ||||
| 		}) | ||||
| } | ||||
|  | ||||
| func AddFieldLabelConversionsForService(scheme *runtime.Scheme) error { | ||||
| 	return scheme.AddFieldLabelConversionFunc(SchemeGroupVersion.WithKind("Service"), | ||||
| 		func(label, value string) (string, string, error) { | ||||
| 			switch label { | ||||
| 			case "metadata.namespace", | ||||
| 				"metadata.name", | ||||
| 				"spec.clusterIP", | ||||
| 				"spec.type": | ||||
| 				return label, value, nil | ||||
| 			default: | ||||
| 				return "", "", fmt.Errorf("field label not supported: %s", label) | ||||
| 			} | ||||
| 		}) | ||||
| } | ||||
|  | ||||
| var initContainerAnnotations = map[string]bool{ | ||||
| 	"pod.beta.kubernetes.io/init-containers":          true, | ||||
| 	"pod.alpha.kubernetes.io/init-containers":         true, | ||||
|   | ||||
| @@ -455,7 +455,11 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, | ||||
| 	var serviceLister corelisters.ServiceLister | ||||
| 	var serviceHasSynced cache.InformerSynced | ||||
| 	if kubeDeps.KubeClient != nil { | ||||
| 		kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0) | ||||
| 		// don't watch headless services, they are not needed since this informer is only used to create the environment variables for pods. | ||||
| 		// See https://issues.k8s.io/122394 | ||||
| 		kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) { | ||||
| 			options.FieldSelector = fields.OneTermNotEqualSelector("spec.clusterIP", v1.ClusterIPNone).String() | ||||
| 		})) | ||||
| 		serviceLister = kubeInformers.Core().V1().Services().Lister() | ||||
| 		serviceHasSynced = kubeInformers.Core().V1().Services().Informer().HasSynced | ||||
| 		kubeInformers.Start(wait.NeverStop) | ||||
|   | ||||
| @@ -88,6 +88,7 @@ func NewREST( | ||||
| 	store := &genericregistry.Store{ | ||||
| 		NewFunc:                   func() runtime.Object { return &api.Service{} }, | ||||
| 		NewListFunc:               func() runtime.Object { return &api.ServiceList{} }, | ||||
| 		PredicateFunc:             svcreg.Matcher, | ||||
| 		DefaultQualifiedResource:  api.Resource("services"), | ||||
| 		SingularQualifiedResource: api.Resource("service"), | ||||
| 		ReturnDeletedObject:       true, | ||||
| @@ -99,7 +100,10 @@ func NewREST( | ||||
|  | ||||
| 		TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)}, | ||||
| 	} | ||||
| 	options := &generic.StoreOptions{RESTOptions: optsGetter} | ||||
| 	options := &generic.StoreOptions{ | ||||
| 		RESTOptions: optsGetter, | ||||
| 		AttrFunc:    svcreg.GetAttrs, | ||||
| 	} | ||||
| 	if err := store.CompleteWithOptions(options); err != nil { | ||||
| 		return nil, nil, nil, err | ||||
| 	} | ||||
|   | ||||
| @@ -18,11 +18,16 @@ package service | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"reflect" | ||||
|  | ||||
| 	"k8s.io/apimachinery/pkg/fields" | ||||
| 	"k8s.io/apimachinery/pkg/labels" | ||||
| 	"k8s.io/apimachinery/pkg/runtime" | ||||
| 	"k8s.io/apimachinery/pkg/util/sets" | ||||
| 	"k8s.io/apimachinery/pkg/util/validation/field" | ||||
| 	"k8s.io/apiserver/pkg/registry/generic" | ||||
| 	pkgstorage "k8s.io/apiserver/pkg/storage" | ||||
| 	"k8s.io/apiserver/pkg/storage/names" | ||||
| 	utilfeature "k8s.io/apiserver/pkg/util/feature" | ||||
| 	"k8s.io/kubernetes/pkg/api/legacyscheme" | ||||
| @@ -166,6 +171,34 @@ func (serviceStatusStrategy) WarningsOnUpdate(ctx context.Context, obj, old runt | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // GetAttrs returns labels and fields of a given object for filtering purposes. | ||||
| func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) { | ||||
| 	service, ok := obj.(*api.Service) | ||||
| 	if !ok { | ||||
| 		return nil, nil, fmt.Errorf("not a service") | ||||
| 	} | ||||
| 	return service.Labels, SelectableFields(service), nil | ||||
| } | ||||
|  | ||||
| // Matcher returns a selection predicate for a given label and field selector. | ||||
| func Matcher(label labels.Selector, field fields.Selector) pkgstorage.SelectionPredicate { | ||||
| 	return pkgstorage.SelectionPredicate{ | ||||
| 		Label:    label, | ||||
| 		Field:    field, | ||||
| 		GetAttrs: GetAttrs, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // SelectableFields returns a field set that can be used for filter selection | ||||
| func SelectableFields(service *api.Service) fields.Set { | ||||
| 	objectMetaFieldsSet := generic.ObjectMetaFieldsSet(&service.ObjectMeta, true) | ||||
| 	serviceSpecificFieldsSet := fields.Set{ | ||||
| 		"spec.clusterIP": service.Spec.ClusterIP, | ||||
| 		"spec.type":      string(service.Spec.Type), | ||||
| 	} | ||||
| 	return generic.MergeFieldsSets(objectMetaFieldsSet, serviceSpecificFieldsSet) | ||||
| } | ||||
|  | ||||
| // dropServiceStatusDisabledFields drops fields that are not used if their associated feature gates | ||||
| // are not enabled.  The typical pattern is: | ||||
| // | ||||
|   | ||||
| @@ -23,6 +23,8 @@ import ( | ||||
| 	"github.com/google/go-cmp/cmp" | ||||
| 	"k8s.io/apimachinery/pkg/api/errors" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/fields" | ||||
| 	"k8s.io/apimachinery/pkg/labels" | ||||
| 	"k8s.io/apimachinery/pkg/util/intstr" | ||||
| 	genericapirequest "k8s.io/apiserver/pkg/endpoints/request" | ||||
| 	"k8s.io/apiserver/pkg/registry/rest" | ||||
| @@ -785,3 +787,159 @@ func TestDropTypeDependentFields(t *testing.T) { | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestMatchService(t *testing.T) { | ||||
| 	testCases := []struct { | ||||
| 		name          string | ||||
| 		in            *api.Service | ||||
| 		fieldSelector fields.Selector | ||||
| 		expectMatch   bool | ||||
| 	}{ | ||||
| 		{ | ||||
| 			name: "match on name", | ||||
| 			in: &api.Service{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name:      "test", | ||||
| 					Namespace: "testns", | ||||
| 				}, | ||||
| 				Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone}, | ||||
| 			}, | ||||
| 			fieldSelector: fields.ParseSelectorOrDie("metadata.name=test"), | ||||
| 			expectMatch:   true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "match on namespace", | ||||
| 			in: &api.Service{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name:      "test", | ||||
| 					Namespace: "testns", | ||||
| 				}, | ||||
| 				Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone}, | ||||
| 			}, | ||||
| 			fieldSelector: fields.ParseSelectorOrDie("metadata.namespace=testns"), | ||||
| 			expectMatch:   true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "no match on name", | ||||
| 			in: &api.Service{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name:      "test", | ||||
| 					Namespace: "testns", | ||||
| 				}, | ||||
| 				Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone}, | ||||
| 			}, | ||||
| 			fieldSelector: fields.ParseSelectorOrDie("metadata.name=nomatch"), | ||||
| 			expectMatch:   false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "no match on namespace", | ||||
| 			in: &api.Service{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name:      "test", | ||||
| 					Namespace: "testns", | ||||
| 				}, | ||||
| 				Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone}, | ||||
| 			}, | ||||
| 			fieldSelector: fields.ParseSelectorOrDie("metadata.namespace=nomatch"), | ||||
| 			expectMatch:   false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "match on loadbalancer type service", | ||||
| 			in: &api.Service{ | ||||
| 				Spec: api.ServiceSpec{Type: api.ServiceTypeLoadBalancer}, | ||||
| 			}, | ||||
| 			fieldSelector: fields.ParseSelectorOrDie("spec.type=LoadBalancer"), | ||||
| 			expectMatch:   true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "no match on nodeport type service", | ||||
| 			in: &api.Service{ | ||||
| 				Spec: api.ServiceSpec{Type: api.ServiceTypeNodePort}, | ||||
| 			}, | ||||
| 			fieldSelector: fields.ParseSelectorOrDie("spec.type=LoadBalancer"), | ||||
| 			expectMatch:   false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "match on headless service", | ||||
| 			in: &api.Service{ | ||||
| 				Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone}, | ||||
| 			}, | ||||
| 			fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=None"), | ||||
| 			expectMatch:   true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "no match on clusterIP service", | ||||
| 			in: &api.Service{ | ||||
| 				Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"}, | ||||
| 			}, | ||||
| 			fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=None"), | ||||
| 			expectMatch:   false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "match on clusterIP service", | ||||
| 			in: &api.Service{ | ||||
| 				Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"}, | ||||
| 			}, | ||||
| 			fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=192.168.1.1"), | ||||
| 			expectMatch:   true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "match on non-headless service", | ||||
| 			in: &api.Service{ | ||||
| 				Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"}, | ||||
| 			}, | ||||
| 			fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP!=None"), | ||||
| 			expectMatch:   true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "match on any ClusterIP set service", | ||||
| 			in: &api.Service{ | ||||
| 				Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"}, | ||||
| 			}, | ||||
| 			fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP!=\"\""), | ||||
| 			expectMatch:   true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "match on clusterIP IPv6 service", | ||||
| 			in: &api.Service{ | ||||
| 				Spec: api.ServiceSpec{ClusterIP: "2001:db2::1"}, | ||||
| 			}, | ||||
| 			fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=2001:db2::1"), | ||||
| 			expectMatch:   true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "no match on headless service", | ||||
| 			in: &api.Service{ | ||||
| 				Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone}, | ||||
| 			}, | ||||
| 			fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=192.168.1.1"), | ||||
| 			expectMatch:   false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "no match on headless service", | ||||
| 			in: &api.Service{ | ||||
| 				Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone}, | ||||
| 			}, | ||||
| 			fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=2001:db2::1"), | ||||
| 			expectMatch:   false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:          "no match on empty service", | ||||
| 			in:            &api.Service{}, | ||||
| 			fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=None"), | ||||
| 			expectMatch:   false, | ||||
| 		}, | ||||
| 	} | ||||
| 	for _, testCase := range testCases { | ||||
| 		t.Run(testCase.name, func(t *testing.T) { | ||||
| 			m := Matcher(labels.Everything(), testCase.fieldSelector) | ||||
| 			result, err := m.Matches(testCase.in) | ||||
| 			if err != nil { | ||||
| 				t.Errorf("Unexpected error %v", err) | ||||
| 			} | ||||
| 			if result != testCase.expectMatch { | ||||
| 				t.Errorf("Result %v, Expected %v, Selector: %v, Service: %v", result, testCase.expectMatch, testCase.fieldSelector.String(), testCase.in) | ||||
| 			} | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -19,6 +19,7 @@ package service | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"context" | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| @@ -26,11 +27,19 @@ import ( | ||||
| 	corev1 "k8s.io/api/core/v1" | ||||
| 	discoveryv1 "k8s.io/api/discovery/v1" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/fields" | ||||
| 	"k8s.io/apimachinery/pkg/labels" | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
| 	"k8s.io/apimachinery/pkg/util/intstr" | ||||
| 	utilrand "k8s.io/apimachinery/pkg/util/rand" | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
| 	"k8s.io/apimachinery/pkg/watch" | ||||
| 	utilfeature "k8s.io/apiserver/pkg/util/feature" | ||||
| 	"k8s.io/client-go/informers" | ||||
| 	clientset "k8s.io/client-go/kubernetes" | ||||
| 	"k8s.io/client-go/tools/cache" | ||||
| 	watchtools "k8s.io/client-go/tools/watch" | ||||
| 	"k8s.io/client-go/util/retry" | ||||
| 	featuregatetesting "k8s.io/component-base/featuregate/testing" | ||||
| 	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" | ||||
| 	"k8s.io/kubernetes/pkg/controller/endpointslice" | ||||
| @@ -38,6 +47,7 @@ import ( | ||||
| 	"k8s.io/kubernetes/test/integration/framework" | ||||
| 	"k8s.io/kubernetes/test/utils/format" | ||||
| 	"k8s.io/kubernetes/test/utils/ktesting" | ||||
| 	"k8s.io/utils/ptr" | ||||
| ) | ||||
|  | ||||
| // Test_ExternalNameServiceStopsDefaultingInternalTrafficPolicy tests that Services no longer default | ||||
| @@ -557,3 +567,382 @@ func Test_TransitionsForTrafficDistribution(t *testing.T) { | ||||
| 	} | ||||
| 	logsBuffer.Reset() | ||||
| } | ||||
|  | ||||
| func Test_ServiceClusterIPSelector(t *testing.T) { | ||||
| 	server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()) | ||||
| 	defer server.TearDownFn() | ||||
|  | ||||
| 	ctx, cancel := context.WithCancel(context.Background()) | ||||
| 	defer cancel() | ||||
|  | ||||
| 	client, err := clientset.NewForConfig(server.ClientConfig) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Error creating clientset: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	ns := framework.CreateNamespaceOrDie(client, "test-external-name-drops-internal-traffic-policy", t) | ||||
| 	defer framework.DeleteNamespaceOrDie(client, ns, t) | ||||
|  | ||||
| 	// create headless service | ||||
| 	service := &corev1.Service{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| 			Name:      "test-headless", | ||||
| 			Namespace: ns.Name, | ||||
| 		}, | ||||
| 		Spec: corev1.ServiceSpec{ | ||||
| 			ClusterIP: corev1.ClusterIPNone, | ||||
| 			Type:      corev1.ServiceTypeClusterIP, | ||||
| 			Ports: []corev1.ServicePort{{ | ||||
| 				Port: int32(80), | ||||
| 			}}, | ||||
| 			Selector: map[string]string{ | ||||
| 				"foo": "bar", | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	_, err = client.CoreV1().Services(ns.Name).Create(ctx, service, metav1.CreateOptions{}) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Error creating test service: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	// informer to watch only non-headless services | ||||
| 	kubeInformers := informers.NewSharedInformerFactoryWithOptions(client, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) { | ||||
| 		options.FieldSelector = fields.OneTermNotEqualSelector("spec.clusterIP", corev1.ClusterIPNone).String() | ||||
| 	})) | ||||
|  | ||||
| 	serviceInformer := kubeInformers.Core().V1().Services().Informer() | ||||
| 	serviceLister := kubeInformers.Core().V1().Services().Lister() | ||||
| 	serviceHasSynced := serviceInformer.HasSynced | ||||
| 	if _, err = serviceInformer.AddEventHandler( | ||||
| 		cache.ResourceEventHandlerFuncs{ | ||||
| 			AddFunc: func(obj interface{}) { | ||||
| 				svc := obj.(*corev1.Service) | ||||
| 				t.Logf("Added Service %#v", svc) | ||||
| 			}, | ||||
| 			UpdateFunc: func(oldObj, newObj interface{}) { | ||||
| 				oldSvc := oldObj.(*corev1.Service) | ||||
| 				newSvc := newObj.(*corev1.Service) | ||||
| 				t.Logf("Updated Service %#v to %#v", oldSvc, newSvc) | ||||
| 			}, | ||||
| 			DeleteFunc: func(obj interface{}) { | ||||
| 				svc := obj.(*corev1.Service) | ||||
| 				t.Logf("Deleted Service %#v", svc) | ||||
| 			}, | ||||
| 		}, | ||||
| 	); err != nil { | ||||
| 		t.Fatalf("Error adding service informer handler: %v", err) | ||||
| 	} | ||||
| 	kubeInformers.Start(ctx.Done()) | ||||
| 	cache.WaitForCacheSync(ctx.Done(), serviceHasSynced) | ||||
| 	svcs, err := serviceLister.List(labels.Everything()) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Error listing services: %v", err) | ||||
| 	} | ||||
| 	// only the kubernetes.default service expected | ||||
| 	if len(svcs) != 1 || svcs[0].Name != "kubernetes" { | ||||
| 		t.Fatalf("expected 1 services, got %d", len(svcs)) | ||||
| 	} | ||||
|  | ||||
| 	// create a new service with ClusterIP | ||||
| 	service2 := service.DeepCopy() | ||||
| 	service2.Spec.ClusterIP = "" | ||||
| 	service2.Name = "test-clusterip" | ||||
| 	_, err = client.CoreV1().Services(ns.Name).Create(ctx, service2, metav1.CreateOptions{}) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Error creating test service: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) { | ||||
| 		svc, err := serviceLister.Services(service2.Namespace).Get(service2.Name) | ||||
| 		if svc == nil || err != nil { | ||||
| 			return false, nil | ||||
| 		} | ||||
| 		return true, nil | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Error waiting for test service test-clusterip: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	// mutate the Service to drop the ClusterIP, theoretically ClusterIP is inmutable but ... | ||||
| 	service.Spec.ExternalName = "test" | ||||
| 	service.Spec.Type = corev1.ServiceTypeExternalName | ||||
| 	_, err = client.CoreV1().Services(ns.Name).Update(ctx, service, metav1.UpdateOptions{}) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Error creating test service: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) { | ||||
| 		svc, err := serviceLister.Services(service.Namespace).Get(service.Name) | ||||
| 		if svc == nil || err != nil { | ||||
| 			return false, nil | ||||
| 		} | ||||
| 		return true, nil | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Error waiting for test service without ClusterIP: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	// mutate the Service to get the ClusterIP again | ||||
| 	service.Spec.ExternalName = "" | ||||
| 	service.Spec.ClusterIP = "" | ||||
| 	service.Spec.Type = corev1.ServiceTypeClusterIP | ||||
| 	_, err = client.CoreV1().Services(ns.Name).Update(ctx, service, metav1.UpdateOptions{}) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Error creating test service: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) { | ||||
| 		svc, err := serviceLister.Services(service.Namespace).Get(service.Name) | ||||
| 		if svc == nil || err != nil { | ||||
| 			return false, nil | ||||
| 		} | ||||
| 		return true, nil | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Error waiting for test service with ClusterIP: %v", err) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Repro https://github.com/kubernetes/kubernetes/issues/123853 | ||||
| func Test_ServiceWatchUntil(t *testing.T) { | ||||
| 	svcReadyTimeout := 30 * time.Second | ||||
|  | ||||
| 	server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()) | ||||
| 	defer server.TearDownFn() | ||||
|  | ||||
| 	ctx, cancel := context.WithCancel(context.Background()) | ||||
| 	defer cancel() | ||||
|  | ||||
| 	client, err := clientset.NewForConfig(server.ClientConfig) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Error creating clientset: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	ns := framework.CreateNamespaceOrDie(client, "test-service-watchuntil", t) | ||||
| 	defer framework.DeleteNamespaceOrDie(client, ns, t) | ||||
|  | ||||
| 	testSvcName := "test-service-" + utilrand.String(5) | ||||
| 	testSvcLabels := map[string]string{"test-service-static": "true"} | ||||
| 	testSvcLabelsFlat := "test-service-static=true" | ||||
|  | ||||
| 	w := &cache.ListWatch{ | ||||
| 		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { | ||||
| 			options.LabelSelector = testSvcLabelsFlat | ||||
| 			return client.CoreV1().Services(ns.Name).Watch(ctx, options) | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	svcList, err := client.CoreV1().Services("").List(ctx, metav1.ListOptions{LabelSelector: testSvcLabelsFlat}) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("failed to list Services: %v", err) | ||||
| 	} | ||||
| 	// create  service | ||||
| 	service := &corev1.Service{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| 			Name:   testSvcName, | ||||
| 			Labels: testSvcLabels, | ||||
| 		}, | ||||
| 		Spec: corev1.ServiceSpec{ | ||||
| 			Type: "LoadBalancer", | ||||
| 			Ports: []corev1.ServicePort{{ | ||||
| 				Name:       "http", | ||||
| 				Protocol:   corev1.ProtocolTCP, | ||||
| 				Port:       int32(80), | ||||
| 				TargetPort: intstr.FromInt32(80), | ||||
| 			}}, | ||||
| 			LoadBalancerClass: ptr.To[string]("example.com/internal-vip"), | ||||
| 		}, | ||||
| 	} | ||||
| 	_, err = client.CoreV1().Services(ns.Name).Create(ctx, service, metav1.CreateOptions{}) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Error creating test service: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	ctxUntil, cancel := context.WithTimeout(ctx, svcReadyTimeout) | ||||
| 	defer cancel() | ||||
| 	_, err = watchtools.Until(ctxUntil, svcList.ResourceVersion, w, func(event watch.Event) (bool, error) { | ||||
| 		if svc, ok := event.Object.(*corev1.Service); ok { | ||||
| 			found := svc.ObjectMeta.Name == service.ObjectMeta.Name && | ||||
| 				svc.ObjectMeta.Namespace == ns.Name && | ||||
| 				svc.Labels["test-service-static"] == "true" | ||||
| 			if !found { | ||||
| 				t.Logf("observed Service %v in namespace %v with labels: %v & ports %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels, svc.Spec.Ports) | ||||
| 				return false, nil | ||||
| 			} | ||||
| 			t.Logf("Found Service %v in namespace %v with labels: %v & ports %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels, svc.Spec.Ports) | ||||
| 			return found, nil | ||||
| 		} | ||||
| 		t.Logf("Observed event: %+v", event.Object) | ||||
| 		return false, nil | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Error service not found: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	t.Log("patching the ServiceStatus") | ||||
| 	lbStatus := corev1.LoadBalancerStatus{ | ||||
| 		Ingress: []corev1.LoadBalancerIngress{{IP: "203.0.113.1"}}, | ||||
| 	} | ||||
| 	lbStatusJSON, err := json.Marshal(lbStatus) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Error marshalling status: %v", err) | ||||
| 	} | ||||
| 	_, err = client.CoreV1().Services(ns.Name).Patch(ctx, testSvcName, types.MergePatchType, | ||||
| 		[]byte(`{"metadata":{"annotations":{"patchedstatus":"true"}},"status":{"loadBalancer":`+string(lbStatusJSON)+`}}`), | ||||
| 		metav1.PatchOptions{}, "status") | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Could not patch service status: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	t.Log("watching for the Service to be patched") | ||||
| 	ctxUntil, cancel = context.WithTimeout(ctx, svcReadyTimeout) | ||||
| 	defer cancel() | ||||
|  | ||||
| 	_, err = watchtools.Until(ctxUntil, svcList.ResourceVersion, w, func(event watch.Event) (bool, error) { | ||||
| 		if svc, ok := event.Object.(*corev1.Service); ok { | ||||
| 			found := svc.ObjectMeta.Name == service.ObjectMeta.Name && | ||||
| 				svc.ObjectMeta.Namespace == ns.Name && | ||||
| 				svc.Annotations["patchedstatus"] == "true" | ||||
| 			if !found { | ||||
| 				t.Logf("observed Service %v in namespace %v with annotations: %v & LoadBalancer: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Annotations, svc.Status.LoadBalancer) | ||||
| 				return false, nil | ||||
| 			} | ||||
| 			t.Logf("Found Service %v in namespace %v with annotations: %v & LoadBalancer: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Annotations, svc.Status.LoadBalancer) | ||||
| 			return found, nil | ||||
| 		} | ||||
| 		t.Logf("Observed event: %+v", event.Object) | ||||
| 		return false, nil | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("failed to locate Service %v in namespace %v", service.ObjectMeta.Name, ns) | ||||
| 	} | ||||
| 	t.Logf("Service %s has service status patched", testSvcName) | ||||
|  | ||||
| 	t.Log("updating the ServiceStatus") | ||||
|  | ||||
| 	var statusToUpdate, updatedStatus *corev1.Service | ||||
| 	err = retry.RetryOnConflict(retry.DefaultRetry, func() error { | ||||
| 		statusToUpdate, err = client.CoreV1().Services(ns.Name).Get(ctx, testSvcName, metav1.GetOptions{}) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		statusToUpdate.Status.Conditions = append(statusToUpdate.Status.Conditions, metav1.Condition{ | ||||
| 			Type:    "StatusUpdate", | ||||
| 			Status:  metav1.ConditionTrue, | ||||
| 			Reason:  "E2E", | ||||
| 			Message: "Set from e2e test", | ||||
| 		}) | ||||
|  | ||||
| 		updatedStatus, err = client.CoreV1().Services(ns.Name).UpdateStatus(ctx, statusToUpdate, metav1.UpdateOptions{}) | ||||
| 		return err | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("\n\n Failed to UpdateStatus. %v\n\n", err) | ||||
| 	} | ||||
| 	t.Logf("updatedStatus.Conditions: %#v", updatedStatus.Status.Conditions) | ||||
|  | ||||
| 	t.Log("watching for the Service to be updated") | ||||
| 	ctxUntil, cancel = context.WithTimeout(ctx, svcReadyTimeout) | ||||
| 	defer cancel() | ||||
| 	_, err = watchtools.Until(ctxUntil, svcList.ResourceVersion, w, func(event watch.Event) (bool, error) { | ||||
| 		if svc, ok := event.Object.(*corev1.Service); ok { | ||||
| 			found := svc.ObjectMeta.Name == service.ObjectMeta.Name && | ||||
| 				svc.ObjectMeta.Namespace == ns.Name && | ||||
| 				svc.Annotations["patchedstatus"] == "true" | ||||
| 			if !found { | ||||
| 				t.Logf("Observed Service %v in namespace %v with annotations: %v & Conditions: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Annotations, svc.Status.LoadBalancer) | ||||
| 				return false, nil | ||||
| 			} | ||||
| 			for _, cond := range svc.Status.Conditions { | ||||
| 				if cond.Type == "StatusUpdate" && | ||||
| 					cond.Reason == "E2E" && | ||||
| 					cond.Message == "Set from e2e test" { | ||||
| 					t.Logf("Found Service %v in namespace %v with annotations: %v & Conditions: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Annotations, svc.Status.Conditions) | ||||
| 					return found, nil | ||||
| 				} else { | ||||
| 					t.Logf("Observed Service %v in namespace %v with annotations: %v & Conditions: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Annotations, svc.Status.LoadBalancer) | ||||
| 					return false, nil | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 		t.Logf("Observed event: %+v", event.Object) | ||||
| 		return false, nil | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("failed to locate Service %v in namespace %v", service.ObjectMeta.Name, ns) | ||||
| 	} | ||||
| 	t.Logf("Service %s has service status updated", testSvcName) | ||||
|  | ||||
| 	t.Log("patching the service") | ||||
| 	servicePatchPayload, err := json.Marshal(corev1.Service{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| 			Labels: map[string]string{ | ||||
| 				"test-service": "patched", | ||||
| 			}, | ||||
| 		}, | ||||
| 	}) | ||||
|  | ||||
| 	_, err = client.CoreV1().Services(ns.Name).Patch(ctx, testSvcName, types.StrategicMergePatchType, []byte(servicePatchPayload), metav1.PatchOptions{}) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("failed to patch service. %v", err) | ||||
| 	} | ||||
|  | ||||
| 	t.Log("watching for the Service to be patched") | ||||
| 	ctxUntil, cancel = context.WithTimeout(ctx, svcReadyTimeout) | ||||
| 	defer cancel() | ||||
| 	_, err = watchtools.Until(ctxUntil, svcList.ResourceVersion, w, func(event watch.Event) (bool, error) { | ||||
| 		if svc, ok := event.Object.(*corev1.Service); ok { | ||||
| 			found := svc.ObjectMeta.Name == service.ObjectMeta.Name && | ||||
| 				svc.ObjectMeta.Namespace == ns.Name && | ||||
| 				svc.Labels["test-service"] == "patched" | ||||
| 			if !found { | ||||
| 				t.Logf("observed Service %v in namespace %v with labels: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels) | ||||
| 				return false, nil | ||||
| 			} | ||||
| 			t.Logf("Found Service %v in namespace %v with labels: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels) | ||||
| 			return found, nil | ||||
| 		} | ||||
| 		t.Logf("Observed event: %+v", event.Object) | ||||
| 		return false, nil | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("failed to locate Service %v in namespace %v", service.ObjectMeta.Name, ns) | ||||
| 	} | ||||
|  | ||||
| 	t.Logf("Service %s patched", testSvcName) | ||||
|  | ||||
| 	t.Log("deleting the service") | ||||
| 	err = client.CoreV1().Services(ns.Name).Delete(ctx, testSvcName, metav1.DeleteOptions{}) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("failed to delete the Service. %v", err) | ||||
| 	} | ||||
|  | ||||
| 	t.Log("watching for the Service to be deleted") | ||||
| 	ctxUntil, cancel = context.WithTimeout(ctx, svcReadyTimeout) | ||||
| 	defer cancel() | ||||
| 	_, err = watchtools.Until(ctxUntil, svcList.ResourceVersion, w, func(event watch.Event) (bool, error) { | ||||
| 		switch event.Type { | ||||
| 		case watch.Deleted: | ||||
| 			if svc, ok := event.Object.(*corev1.Service); ok { | ||||
| 				found := svc.ObjectMeta.Name == service.ObjectMeta.Name && | ||||
| 					svc.ObjectMeta.Namespace == ns.Name && | ||||
| 					svc.Labels["test-service-static"] == "true" | ||||
| 				if !found { | ||||
| 					t.Logf("observed Service %v in namespace %v with labels: %v & annotations: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels, svc.Annotations) | ||||
| 					return false, nil | ||||
| 				} | ||||
| 				t.Logf("Found Service %v in namespace %v with labels: %v & annotations: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels, svc.Annotations) | ||||
| 				return found, nil | ||||
| 			} | ||||
| 		default: | ||||
| 			t.Logf("Observed event: %+v", event.Type) | ||||
| 		} | ||||
| 		return false, nil | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("failed to delete Service %v in namespace %v", service.ObjectMeta.Name, ns) | ||||
| 	} | ||||
| 	t.Logf("Service %s deleted", testSvcName) | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Prow Robot
					Kubernetes Prow Robot