Revert "Implement a field selector for ClusterIP on Services"
This commit is contained in:
		@@ -97,9 +97,6 @@ func addConversionFuncs(scheme *runtime.Scheme) error {
 | 
			
		||||
	if err := AddFieldLabelConversionsForSecret(scheme); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	if err := AddFieldLabelConversionsForService(scheme); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -491,20 +488,6 @@ func AddFieldLabelConversionsForSecret(scheme *runtime.Scheme) error {
 | 
			
		||||
		})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func AddFieldLabelConversionsForService(scheme *runtime.Scheme) error {
 | 
			
		||||
	return scheme.AddFieldLabelConversionFunc(SchemeGroupVersion.WithKind("Service"),
 | 
			
		||||
		func(label, value string) (string, string, error) {
 | 
			
		||||
			switch label {
 | 
			
		||||
			case "metadata.namespace",
 | 
			
		||||
				"metadata.name",
 | 
			
		||||
				"spec.clusterIP":
 | 
			
		||||
				return label, value, nil
 | 
			
		||||
			default:
 | 
			
		||||
				return "", "", fmt.Errorf("field label not supported: %s", label)
 | 
			
		||||
			}
 | 
			
		||||
		})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var initContainerAnnotations = map[string]bool{
 | 
			
		||||
	"pod.beta.kubernetes.io/init-containers":          true,
 | 
			
		||||
	"pod.alpha.kubernetes.io/init-containers":         true,
 | 
			
		||||
 
 | 
			
		||||
@@ -456,11 +456,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 | 
			
		||||
	var serviceLister corelisters.ServiceLister
 | 
			
		||||
	var serviceHasSynced cache.InformerSynced
 | 
			
		||||
	if kubeDeps.KubeClient != nil {
 | 
			
		||||
		// don't watch headless services, they are not needed since this informer is only used to create the environment variables for pods.
 | 
			
		||||
		// See https://issues.k8s.io/122394
 | 
			
		||||
		kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
 | 
			
		||||
			options.FieldSelector = fields.OneTermNotEqualSelector("spec.clusterIP", v1.ClusterIPNone).String()
 | 
			
		||||
		}))
 | 
			
		||||
		kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0)
 | 
			
		||||
		serviceLister = kubeInformers.Core().V1().Services().Lister()
 | 
			
		||||
		serviceHasSynced = kubeInformers.Core().V1().Services().Informer().HasSynced
 | 
			
		||||
		kubeInformers.Start(wait.NeverStop)
 | 
			
		||||
 
 | 
			
		||||
@@ -88,7 +88,6 @@ func NewREST(
 | 
			
		||||
	store := &genericregistry.Store{
 | 
			
		||||
		NewFunc:                   func() runtime.Object { return &api.Service{} },
 | 
			
		||||
		NewListFunc:               func() runtime.Object { return &api.ServiceList{} },
 | 
			
		||||
		PredicateFunc:             svcreg.Matcher,
 | 
			
		||||
		DefaultQualifiedResource:  api.Resource("services"),
 | 
			
		||||
		SingularQualifiedResource: api.Resource("service"),
 | 
			
		||||
		ReturnDeletedObject:       true,
 | 
			
		||||
@@ -100,10 +99,7 @@ func NewREST(
 | 
			
		||||
 | 
			
		||||
		TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
 | 
			
		||||
	}
 | 
			
		||||
	options := &generic.StoreOptions{
 | 
			
		||||
		RESTOptions: optsGetter,
 | 
			
		||||
		AttrFunc:    svcreg.GetAttrs,
 | 
			
		||||
	}
 | 
			
		||||
	options := &generic.StoreOptions{RESTOptions: optsGetter}
 | 
			
		||||
	if err := store.CompleteWithOptions(options); err != nil {
 | 
			
		||||
		return nil, nil, nil, err
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
@@ -18,16 +18,11 @@ package service
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"reflect"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/apimachinery/pkg/fields"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/labels"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/runtime"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/sets"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/validation/field"
 | 
			
		||||
	"k8s.io/apiserver/pkg/registry/generic"
 | 
			
		||||
	pkgstorage "k8s.io/apiserver/pkg/storage"
 | 
			
		||||
	"k8s.io/apiserver/pkg/storage/names"
 | 
			
		||||
	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api/legacyscheme"
 | 
			
		||||
@@ -171,33 +166,6 @@ func (serviceStatusStrategy) WarningsOnUpdate(ctx context.Context, obj, old runt
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetAttrs returns labels and fields of a given object for filtering purposes.
 | 
			
		||||
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
 | 
			
		||||
	service, ok := obj.(*api.Service)
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return nil, nil, fmt.Errorf("not a service")
 | 
			
		||||
	}
 | 
			
		||||
	return service.Labels, SelectableFields(service), nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Matcher returns a selection predicate for a given label and field selector.
 | 
			
		||||
func Matcher(label labels.Selector, field fields.Selector) pkgstorage.SelectionPredicate {
 | 
			
		||||
	return pkgstorage.SelectionPredicate{
 | 
			
		||||
		Label:    label,
 | 
			
		||||
		Field:    field,
 | 
			
		||||
		GetAttrs: GetAttrs,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SelectableFields returns a field set that can be used for filter selection
 | 
			
		||||
func SelectableFields(service *api.Service) fields.Set {
 | 
			
		||||
	objectMetaFieldsSet := generic.ObjectMetaFieldsSet(&service.ObjectMeta, false)
 | 
			
		||||
	serviceSpecificFieldsSet := fields.Set{
 | 
			
		||||
		"spec.clusterIP": service.Spec.ClusterIP,
 | 
			
		||||
	}
 | 
			
		||||
	return generic.MergeFieldsSets(objectMetaFieldsSet, serviceSpecificFieldsSet)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// dropServiceStatusDisabledFields drops fields that are not used if their associated feature gates
 | 
			
		||||
// are not enabled.  The typical pattern is:
 | 
			
		||||
//
 | 
			
		||||
 
 | 
			
		||||
@@ -23,8 +23,6 @@ import (
 | 
			
		||||
	"github.com/google/go-cmp/cmp"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/api/errors"
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/fields"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/labels"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/intstr"
 | 
			
		||||
	genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
 | 
			
		||||
	"k8s.io/apiserver/pkg/registry/rest"
 | 
			
		||||
@@ -788,95 +786,3 @@ func TestDropTypeDependentFields(t *testing.T) {
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestMatchService(t *testing.T) {
 | 
			
		||||
	testCases := []struct {
 | 
			
		||||
		name          string
 | 
			
		||||
		in            *api.Service
 | 
			
		||||
		fieldSelector fields.Selector
 | 
			
		||||
		expectMatch   bool
 | 
			
		||||
	}{
 | 
			
		||||
		{
 | 
			
		||||
			name: "match on headless service",
 | 
			
		||||
			in: &api.Service{
 | 
			
		||||
				Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone},
 | 
			
		||||
			},
 | 
			
		||||
			fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=None"),
 | 
			
		||||
			expectMatch:   true,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "no match on clusterIP service",
 | 
			
		||||
			in: &api.Service{
 | 
			
		||||
				Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"},
 | 
			
		||||
			},
 | 
			
		||||
			fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=None"),
 | 
			
		||||
			expectMatch:   false,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "match on clusterIP service",
 | 
			
		||||
			in: &api.Service{
 | 
			
		||||
				Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"},
 | 
			
		||||
			},
 | 
			
		||||
			fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=192.168.1.1"),
 | 
			
		||||
			expectMatch:   true,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "match on non-headless service",
 | 
			
		||||
			in: &api.Service{
 | 
			
		||||
				Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"},
 | 
			
		||||
			},
 | 
			
		||||
			fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP!=None"),
 | 
			
		||||
			expectMatch:   true,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "match on any ClusterIP set service",
 | 
			
		||||
			in: &api.Service{
 | 
			
		||||
				Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"},
 | 
			
		||||
			},
 | 
			
		||||
			fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP!=\"\""),
 | 
			
		||||
			expectMatch:   true,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "match on clusterIP IPv6 service",
 | 
			
		||||
			in: &api.Service{
 | 
			
		||||
				Spec: api.ServiceSpec{ClusterIP: "2001:db2::1"},
 | 
			
		||||
			},
 | 
			
		||||
			fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=2001:db2::1"),
 | 
			
		||||
			expectMatch:   true,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "no match on headless service",
 | 
			
		||||
			in: &api.Service{
 | 
			
		||||
				Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone},
 | 
			
		||||
			},
 | 
			
		||||
			fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=192.168.1.1"),
 | 
			
		||||
			expectMatch:   false,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "no match on headless service",
 | 
			
		||||
			in: &api.Service{
 | 
			
		||||
				Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone},
 | 
			
		||||
			},
 | 
			
		||||
			fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=2001:db2::1"),
 | 
			
		||||
			expectMatch:   false,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:          "no match on empty service",
 | 
			
		||||
			in:            &api.Service{},
 | 
			
		||||
			fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=None"),
 | 
			
		||||
			expectMatch:   false,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	for _, testCase := range testCases {
 | 
			
		||||
		t.Run(testCase.name, func(t *testing.T) {
 | 
			
		||||
			m := Matcher(labels.Everything(), testCase.fieldSelector)
 | 
			
		||||
			result, err := m.Matches(testCase.in)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Errorf("Unexpected error %v", err)
 | 
			
		||||
			}
 | 
			
		||||
			if result != testCase.expectMatch {
 | 
			
		||||
				t.Errorf("Result %v, Expected %v, Selector: %v, Service: %v", result, testCase.expectMatch, testCase.fieldSelector.String(), testCase.in)
 | 
			
		||||
			}
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -19,16 +19,10 @@ package service
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	corev1 "k8s.io/api/core/v1"
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/fields"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/labels"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	"k8s.io/client-go/informers"
 | 
			
		||||
	clientset "k8s.io/client-go/kubernetes"
 | 
			
		||||
	"k8s.io/client-go/tools/cache"
 | 
			
		||||
	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
 | 
			
		||||
	"k8s.io/kubernetes/test/integration/framework"
 | 
			
		||||
)
 | 
			
		||||
@@ -270,139 +264,3 @@ func Test_RemovingExternalIPsFromClusterIPServiceDropsExternalTrafficPolicy(t *t
 | 
			
		||||
		t.Error("service externalTrafficPolicy was not set for clusterIP Service with externalIPs")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Test_ServiceClusterIPSelector(t *testing.T) {
 | 
			
		||||
	server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
 | 
			
		||||
	defer server.TearDownFn()
 | 
			
		||||
 | 
			
		||||
	ctx, cancel := context.WithCancel(context.Background())
 | 
			
		||||
	defer cancel()
 | 
			
		||||
 | 
			
		||||
	client, err := clientset.NewForConfig(server.ClientConfig)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Error creating clientset: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ns := framework.CreateNamespaceOrDie(client, "test-external-name-drops-internal-traffic-policy", t)
 | 
			
		||||
	defer framework.DeleteNamespaceOrDie(client, ns, t)
 | 
			
		||||
 | 
			
		||||
	// create headless service
 | 
			
		||||
	service := &corev1.Service{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
			Name:      "test-headless",
 | 
			
		||||
			Namespace: ns.Name,
 | 
			
		||||
		},
 | 
			
		||||
		Spec: corev1.ServiceSpec{
 | 
			
		||||
			ClusterIP: corev1.ClusterIPNone,
 | 
			
		||||
			Type:      corev1.ServiceTypeClusterIP,
 | 
			
		||||
			Ports: []corev1.ServicePort{{
 | 
			
		||||
				Port: int32(80),
 | 
			
		||||
			}},
 | 
			
		||||
			Selector: map[string]string{
 | 
			
		||||
				"foo": "bar",
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	_, err = client.CoreV1().Services(ns.Name).Create(ctx, service, metav1.CreateOptions{})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Error creating test service: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// informer to watch only non-headless services
 | 
			
		||||
	kubeInformers := informers.NewSharedInformerFactoryWithOptions(client, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
 | 
			
		||||
		options.FieldSelector = fields.OneTermNotEqualSelector("spec.clusterIP", corev1.ClusterIPNone).String()
 | 
			
		||||
	}))
 | 
			
		||||
 | 
			
		||||
	serviceInformer := kubeInformers.Core().V1().Services().Informer()
 | 
			
		||||
	serviceLister := kubeInformers.Core().V1().Services().Lister()
 | 
			
		||||
	serviceHasSynced := serviceInformer.HasSynced
 | 
			
		||||
	if _, err = serviceInformer.AddEventHandler(
 | 
			
		||||
		cache.ResourceEventHandlerFuncs{
 | 
			
		||||
			AddFunc: func(obj interface{}) {
 | 
			
		||||
				svc := obj.(*corev1.Service)
 | 
			
		||||
				t.Logf("Added Service %#v", svc)
 | 
			
		||||
			},
 | 
			
		||||
			UpdateFunc: func(oldObj, newObj interface{}) {
 | 
			
		||||
				oldSvc := oldObj.(*corev1.Service)
 | 
			
		||||
				newSvc := newObj.(*corev1.Service)
 | 
			
		||||
				t.Logf("Updated Service %#v to %#v", oldSvc, newSvc)
 | 
			
		||||
			},
 | 
			
		||||
			DeleteFunc: func(obj interface{}) {
 | 
			
		||||
				svc := obj.(*corev1.Service)
 | 
			
		||||
				t.Logf("Deleted Service %#v", svc)
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	); err != nil {
 | 
			
		||||
		t.Fatalf("Error adding service informer handler: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	kubeInformers.Start(ctx.Done())
 | 
			
		||||
	cache.WaitForCacheSync(ctx.Done(), serviceHasSynced)
 | 
			
		||||
	svcs, err := serviceLister.List(labels.Everything())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Error listing services: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	// only the kubernetes.default service expected
 | 
			
		||||
	if len(svcs) != 1 || svcs[0].Name != "kubernetes" {
 | 
			
		||||
		t.Fatalf("expected 1 services, got %d", len(svcs))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// create a new service with ClusterIP
 | 
			
		||||
	service2 := service.DeepCopy()
 | 
			
		||||
	service2.Spec.ClusterIP = ""
 | 
			
		||||
	service2.Name = "test-clusterip"
 | 
			
		||||
	_, err = client.CoreV1().Services(ns.Name).Create(ctx, service2, metav1.CreateOptions{})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Error creating test service: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
 | 
			
		||||
		svc, err := serviceLister.Services(service2.Namespace).Get(service2.Name)
 | 
			
		||||
		if svc == nil || err != nil {
 | 
			
		||||
			return false, nil
 | 
			
		||||
		}
 | 
			
		||||
		return true, nil
 | 
			
		||||
	})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Error waiting for test service test-clusterip: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// mutate the Service to drop the ClusterIP, theoretically ClusterIP is inmutable but ...
 | 
			
		||||
	service.Spec.ExternalName = "test"
 | 
			
		||||
	service.Spec.Type = corev1.ServiceTypeExternalName
 | 
			
		||||
	_, err = client.CoreV1().Services(ns.Name).Update(ctx, service, metav1.UpdateOptions{})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Error creating test service: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
 | 
			
		||||
		svc, err := serviceLister.Services(service.Namespace).Get(service.Name)
 | 
			
		||||
		if svc == nil || err != nil {
 | 
			
		||||
			return false, nil
 | 
			
		||||
		}
 | 
			
		||||
		return true, nil
 | 
			
		||||
	})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Error waiting for test service without ClusterIP: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// mutate the Service to get the ClusterIP again
 | 
			
		||||
	service.Spec.ExternalName = ""
 | 
			
		||||
	service.Spec.ClusterIP = ""
 | 
			
		||||
	service.Spec.Type = corev1.ServiceTypeClusterIP
 | 
			
		||||
	_, err = client.CoreV1().Services(ns.Name).Update(ctx, service, metav1.UpdateOptions{})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Error creating test service: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
 | 
			
		||||
		svc, err := serviceLister.Services(service.Namespace).Get(service.Name)
 | 
			
		||||
		if svc == nil || err != nil {
 | 
			
		||||
			return false, nil
 | 
			
		||||
		}
 | 
			
		||||
		return true, nil
 | 
			
		||||
	})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Error waiting for test service with ClusterIP: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user