implement field selector for clusterIP on services
This will allow components that don't need to watch headless services (heavily used on ai/ml workloads) to filter them server side. Specially useful for kubelet and kube-proxy Change-Id: If36c2c27f2ec80db400c9133c61428d14e124f3e
This commit is contained in:
		@@ -97,6 +97,9 @@ func addConversionFuncs(scheme *runtime.Scheme) error {
 | 
				
			|||||||
	if err := AddFieldLabelConversionsForSecret(scheme); err != nil {
 | 
						if err := AddFieldLabelConversionsForSecret(scheme); err != nil {
 | 
				
			||||||
		return err
 | 
							return err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						if err := AddFieldLabelConversionsForService(scheme); err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -488,6 +491,20 @@ func AddFieldLabelConversionsForSecret(scheme *runtime.Scheme) error {
 | 
				
			|||||||
		})
 | 
							})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func AddFieldLabelConversionsForService(scheme *runtime.Scheme) error {
 | 
				
			||||||
 | 
						return scheme.AddFieldLabelConversionFunc(SchemeGroupVersion.WithKind("Service"),
 | 
				
			||||||
 | 
							func(label, value string) (string, string, error) {
 | 
				
			||||||
 | 
								switch label {
 | 
				
			||||||
 | 
								case "metadata.namespace",
 | 
				
			||||||
 | 
									"metadata.name",
 | 
				
			||||||
 | 
									"spec.clusterIP":
 | 
				
			||||||
 | 
									return label, value, nil
 | 
				
			||||||
 | 
								default:
 | 
				
			||||||
 | 
									return "", "", fmt.Errorf("field label not supported: %s", label)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
var initContainerAnnotations = map[string]bool{
 | 
					var initContainerAnnotations = map[string]bool{
 | 
				
			||||||
	"pod.beta.kubernetes.io/init-containers":          true,
 | 
						"pod.beta.kubernetes.io/init-containers":          true,
 | 
				
			||||||
	"pod.alpha.kubernetes.io/init-containers":         true,
 | 
						"pod.alpha.kubernetes.io/init-containers":         true,
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -88,6 +88,7 @@ func NewREST(
 | 
				
			|||||||
	store := &genericregistry.Store{
 | 
						store := &genericregistry.Store{
 | 
				
			||||||
		NewFunc:                   func() runtime.Object { return &api.Service{} },
 | 
							NewFunc:                   func() runtime.Object { return &api.Service{} },
 | 
				
			||||||
		NewListFunc:               func() runtime.Object { return &api.ServiceList{} },
 | 
							NewListFunc:               func() runtime.Object { return &api.ServiceList{} },
 | 
				
			||||||
 | 
							PredicateFunc:             svcreg.Matcher,
 | 
				
			||||||
		DefaultQualifiedResource:  api.Resource("services"),
 | 
							DefaultQualifiedResource:  api.Resource("services"),
 | 
				
			||||||
		SingularQualifiedResource: api.Resource("service"),
 | 
							SingularQualifiedResource: api.Resource("service"),
 | 
				
			||||||
		ReturnDeletedObject:       true,
 | 
							ReturnDeletedObject:       true,
 | 
				
			||||||
@@ -99,7 +100,10 @@ func NewREST(
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
		TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
 | 
							TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	options := &generic.StoreOptions{RESTOptions: optsGetter}
 | 
						options := &generic.StoreOptions{
 | 
				
			||||||
 | 
							RESTOptions: optsGetter,
 | 
				
			||||||
 | 
							AttrFunc:    svcreg.GetAttrs,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	if err := store.CompleteWithOptions(options); err != nil {
 | 
						if err := store.CompleteWithOptions(options); err != nil {
 | 
				
			||||||
		return nil, nil, nil, err
 | 
							return nil, nil, nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -18,11 +18,16 @@ package service
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
	"reflect"
 | 
						"reflect"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/fields"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/labels"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/runtime"
 | 
						"k8s.io/apimachinery/pkg/runtime"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/util/sets"
 | 
						"k8s.io/apimachinery/pkg/util/sets"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/util/validation/field"
 | 
						"k8s.io/apimachinery/pkg/util/validation/field"
 | 
				
			||||||
 | 
						"k8s.io/apiserver/pkg/registry/generic"
 | 
				
			||||||
 | 
						pkgstorage "k8s.io/apiserver/pkg/storage"
 | 
				
			||||||
	"k8s.io/apiserver/pkg/storage/names"
 | 
						"k8s.io/apiserver/pkg/storage/names"
 | 
				
			||||||
	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
						utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/api/legacyscheme"
 | 
						"k8s.io/kubernetes/pkg/api/legacyscheme"
 | 
				
			||||||
@@ -166,6 +171,33 @@ func (serviceStatusStrategy) WarningsOnUpdate(ctx context.Context, obj, old runt
 | 
				
			|||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// GetAttrs returns labels and fields of a given object for filtering purposes.
 | 
				
			||||||
 | 
					func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
 | 
				
			||||||
 | 
						service, ok := obj.(*api.Service)
 | 
				
			||||||
 | 
						if !ok {
 | 
				
			||||||
 | 
							return nil, nil, fmt.Errorf("not a service")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return service.Labels, SelectableFields(service), nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Matcher returns a selection predicate for a given label and field selector.
 | 
				
			||||||
 | 
					func Matcher(label labels.Selector, field fields.Selector) pkgstorage.SelectionPredicate {
 | 
				
			||||||
 | 
						return pkgstorage.SelectionPredicate{
 | 
				
			||||||
 | 
							Label:    label,
 | 
				
			||||||
 | 
							Field:    field,
 | 
				
			||||||
 | 
							GetAttrs: GetAttrs,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// SelectableFields returns a field set that can be used for filter selection
 | 
				
			||||||
 | 
					func SelectableFields(service *api.Service) fields.Set {
 | 
				
			||||||
 | 
						objectMetaFieldsSet := generic.ObjectMetaFieldsSet(&service.ObjectMeta, true)
 | 
				
			||||||
 | 
						serviceSpecificFieldsSet := fields.Set{
 | 
				
			||||||
 | 
							"spec.clusterIP": service.Spec.ClusterIP,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return generic.MergeFieldsSets(objectMetaFieldsSet, serviceSpecificFieldsSet)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// dropServiceStatusDisabledFields drops fields that are not used if their associated feature gates
 | 
					// dropServiceStatusDisabledFields drops fields that are not used if their associated feature gates
 | 
				
			||||||
// are not enabled.  The typical pattern is:
 | 
					// are not enabled.  The typical pattern is:
 | 
				
			||||||
//
 | 
					//
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -23,6 +23,8 @@ import (
 | 
				
			|||||||
	"github.com/google/go-cmp/cmp"
 | 
						"github.com/google/go-cmp/cmp"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/api/errors"
 | 
						"k8s.io/apimachinery/pkg/api/errors"
 | 
				
			||||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
						metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/fields"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/labels"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/util/intstr"
 | 
						"k8s.io/apimachinery/pkg/util/intstr"
 | 
				
			||||||
	genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
 | 
						genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
 | 
				
			||||||
	"k8s.io/apiserver/pkg/registry/rest"
 | 
						"k8s.io/apiserver/pkg/registry/rest"
 | 
				
			||||||
@@ -786,3 +788,143 @@ func TestDropTypeDependentFields(t *testing.T) {
 | 
				
			|||||||
		})
 | 
							})
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestMatchService(t *testing.T) {
 | 
				
			||||||
 | 
						testCases := []struct {
 | 
				
			||||||
 | 
							name          string
 | 
				
			||||||
 | 
							in            *api.Service
 | 
				
			||||||
 | 
							fieldSelector fields.Selector
 | 
				
			||||||
 | 
							expectMatch   bool
 | 
				
			||||||
 | 
						}{
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name: "match on name",
 | 
				
			||||||
 | 
								in: &api.Service{
 | 
				
			||||||
 | 
									ObjectMeta: metav1.ObjectMeta{
 | 
				
			||||||
 | 
										Name:      "test",
 | 
				
			||||||
 | 
										Namespace: "testns",
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
									Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								fieldSelector: fields.ParseSelectorOrDie("metadata.name=test"),
 | 
				
			||||||
 | 
								expectMatch:   true,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name: "match on namespace",
 | 
				
			||||||
 | 
								in: &api.Service{
 | 
				
			||||||
 | 
									ObjectMeta: metav1.ObjectMeta{
 | 
				
			||||||
 | 
										Name:      "test",
 | 
				
			||||||
 | 
										Namespace: "testns",
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
									Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								fieldSelector: fields.ParseSelectorOrDie("metadata.namespace=testns"),
 | 
				
			||||||
 | 
								expectMatch:   true,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name: "no match on name",
 | 
				
			||||||
 | 
								in: &api.Service{
 | 
				
			||||||
 | 
									ObjectMeta: metav1.ObjectMeta{
 | 
				
			||||||
 | 
										Name:      "test",
 | 
				
			||||||
 | 
										Namespace: "testns",
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
									Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								fieldSelector: fields.ParseSelectorOrDie("metadata.name=nomatch"),
 | 
				
			||||||
 | 
								expectMatch:   false,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name: "no match on namespace",
 | 
				
			||||||
 | 
								in: &api.Service{
 | 
				
			||||||
 | 
									ObjectMeta: metav1.ObjectMeta{
 | 
				
			||||||
 | 
										Name:      "test",
 | 
				
			||||||
 | 
										Namespace: "testns",
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
									Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								fieldSelector: fields.ParseSelectorOrDie("metadata.namespace=nomatch"),
 | 
				
			||||||
 | 
								expectMatch:   false,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name: "match on headless service",
 | 
				
			||||||
 | 
								in: &api.Service{
 | 
				
			||||||
 | 
									Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=None"),
 | 
				
			||||||
 | 
								expectMatch:   true,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name: "no match on clusterIP service",
 | 
				
			||||||
 | 
								in: &api.Service{
 | 
				
			||||||
 | 
									Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=None"),
 | 
				
			||||||
 | 
								expectMatch:   false,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name: "match on clusterIP service",
 | 
				
			||||||
 | 
								in: &api.Service{
 | 
				
			||||||
 | 
									Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=192.168.1.1"),
 | 
				
			||||||
 | 
								expectMatch:   true,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name: "match on non-headless service",
 | 
				
			||||||
 | 
								in: &api.Service{
 | 
				
			||||||
 | 
									Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP!=None"),
 | 
				
			||||||
 | 
								expectMatch:   true,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name: "match on any ClusterIP set service",
 | 
				
			||||||
 | 
								in: &api.Service{
 | 
				
			||||||
 | 
									Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP!=\"\""),
 | 
				
			||||||
 | 
								expectMatch:   true,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name: "match on clusterIP IPv6 service",
 | 
				
			||||||
 | 
								in: &api.Service{
 | 
				
			||||||
 | 
									Spec: api.ServiceSpec{ClusterIP: "2001:db2::1"},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=2001:db2::1"),
 | 
				
			||||||
 | 
								expectMatch:   true,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name: "no match on headless service",
 | 
				
			||||||
 | 
								in: &api.Service{
 | 
				
			||||||
 | 
									Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=192.168.1.1"),
 | 
				
			||||||
 | 
								expectMatch:   false,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name: "no match on headless service",
 | 
				
			||||||
 | 
								in: &api.Service{
 | 
				
			||||||
 | 
									Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=2001:db2::1"),
 | 
				
			||||||
 | 
								expectMatch:   false,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:          "no match on empty service",
 | 
				
			||||||
 | 
								in:            &api.Service{},
 | 
				
			||||||
 | 
								fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=None"),
 | 
				
			||||||
 | 
								expectMatch:   false,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						for _, testCase := range testCases {
 | 
				
			||||||
 | 
							t.Run(testCase.name, func(t *testing.T) {
 | 
				
			||||||
 | 
								m := Matcher(labels.Everything(), testCase.fieldSelector)
 | 
				
			||||||
 | 
								result, err := m.Matches(testCase.in)
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									t.Errorf("Unexpected error %v", err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if result != testCase.expectMatch {
 | 
				
			||||||
 | 
									t.Errorf("Result %v, Expected %v, Selector: %v, Service: %v", result, testCase.expectMatch, testCase.fieldSelector.String(), testCase.in)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -19,6 +19,7 @@ package service
 | 
				
			|||||||
import (
 | 
					import (
 | 
				
			||||||
	"bytes"
 | 
						"bytes"
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
 | 
						"encoding/json"
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"testing"
 | 
						"testing"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
@@ -26,11 +27,19 @@ import (
 | 
				
			|||||||
	corev1 "k8s.io/api/core/v1"
 | 
						corev1 "k8s.io/api/core/v1"
 | 
				
			||||||
	discoveryv1 "k8s.io/api/discovery/v1"
 | 
						discoveryv1 "k8s.io/api/discovery/v1"
 | 
				
			||||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
						metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/fields"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/labels"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/types"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/util/intstr"
 | 
						"k8s.io/apimachinery/pkg/util/intstr"
 | 
				
			||||||
 | 
						utilrand "k8s.io/apimachinery/pkg/util/rand"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
						"k8s.io/apimachinery/pkg/util/wait"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/watch"
 | 
				
			||||||
	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
						utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
				
			||||||
	"k8s.io/client-go/informers"
 | 
						"k8s.io/client-go/informers"
 | 
				
			||||||
	clientset "k8s.io/client-go/kubernetes"
 | 
						clientset "k8s.io/client-go/kubernetes"
 | 
				
			||||||
 | 
						"k8s.io/client-go/tools/cache"
 | 
				
			||||||
 | 
						watchtools "k8s.io/client-go/tools/watch"
 | 
				
			||||||
 | 
						"k8s.io/client-go/util/retry"
 | 
				
			||||||
	featuregatetesting "k8s.io/component-base/featuregate/testing"
 | 
						featuregatetesting "k8s.io/component-base/featuregate/testing"
 | 
				
			||||||
	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
 | 
						kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/controller/endpointslice"
 | 
						"k8s.io/kubernetes/pkg/controller/endpointslice"
 | 
				
			||||||
@@ -38,6 +47,7 @@ import (
 | 
				
			|||||||
	"k8s.io/kubernetes/test/integration/framework"
 | 
						"k8s.io/kubernetes/test/integration/framework"
 | 
				
			||||||
	"k8s.io/kubernetes/test/utils/format"
 | 
						"k8s.io/kubernetes/test/utils/format"
 | 
				
			||||||
	"k8s.io/kubernetes/test/utils/ktesting"
 | 
						"k8s.io/kubernetes/test/utils/ktesting"
 | 
				
			||||||
 | 
						"k8s.io/utils/ptr"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Test_ExternalNameServiceStopsDefaultingInternalTrafficPolicy tests that Services no longer default
 | 
					// Test_ExternalNameServiceStopsDefaultingInternalTrafficPolicy tests that Services no longer default
 | 
				
			||||||
@@ -557,3 +567,382 @@ func Test_TransitionsForTrafficDistribution(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	logsBuffer.Reset()
 | 
						logsBuffer.Reset()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func Test_ServiceClusterIPSelector(t *testing.T) {
 | 
				
			||||||
 | 
						server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
 | 
				
			||||||
 | 
						defer server.TearDownFn()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						ctx, cancel := context.WithCancel(context.Background())
 | 
				
			||||||
 | 
						defer cancel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						client, err := clientset.NewForConfig(server.ClientConfig)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("Error creating clientset: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						ns := framework.CreateNamespaceOrDie(client, "test-external-name-drops-internal-traffic-policy", t)
 | 
				
			||||||
 | 
						defer framework.DeleteNamespaceOrDie(client, ns, t)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// create headless service
 | 
				
			||||||
 | 
						service := &corev1.Service{
 | 
				
			||||||
 | 
							ObjectMeta: metav1.ObjectMeta{
 | 
				
			||||||
 | 
								Name:      "test-headless",
 | 
				
			||||||
 | 
								Namespace: ns.Name,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							Spec: corev1.ServiceSpec{
 | 
				
			||||||
 | 
								ClusterIP: corev1.ClusterIPNone,
 | 
				
			||||||
 | 
								Type:      corev1.ServiceTypeClusterIP,
 | 
				
			||||||
 | 
								Ports: []corev1.ServicePort{{
 | 
				
			||||||
 | 
									Port: int32(80),
 | 
				
			||||||
 | 
								}},
 | 
				
			||||||
 | 
								Selector: map[string]string{
 | 
				
			||||||
 | 
									"foo": "bar",
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						_, err = client.CoreV1().Services(ns.Name).Create(ctx, service, metav1.CreateOptions{})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("Error creating test service: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// informer to watch only non-headless services
 | 
				
			||||||
 | 
						kubeInformers := informers.NewSharedInformerFactoryWithOptions(client, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
 | 
				
			||||||
 | 
							options.FieldSelector = fields.OneTermNotEqualSelector("spec.clusterIP", corev1.ClusterIPNone).String()
 | 
				
			||||||
 | 
						}))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						serviceInformer := kubeInformers.Core().V1().Services().Informer()
 | 
				
			||||||
 | 
						serviceLister := kubeInformers.Core().V1().Services().Lister()
 | 
				
			||||||
 | 
						serviceHasSynced := serviceInformer.HasSynced
 | 
				
			||||||
 | 
						if _, err = serviceInformer.AddEventHandler(
 | 
				
			||||||
 | 
							cache.ResourceEventHandlerFuncs{
 | 
				
			||||||
 | 
								AddFunc: func(obj interface{}) {
 | 
				
			||||||
 | 
									svc := obj.(*corev1.Service)
 | 
				
			||||||
 | 
									t.Logf("Added Service %#v", svc)
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								UpdateFunc: func(oldObj, newObj interface{}) {
 | 
				
			||||||
 | 
									oldSvc := oldObj.(*corev1.Service)
 | 
				
			||||||
 | 
									newSvc := newObj.(*corev1.Service)
 | 
				
			||||||
 | 
									t.Logf("Updated Service %#v to %#v", oldSvc, newSvc)
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								DeleteFunc: func(obj interface{}) {
 | 
				
			||||||
 | 
									svc := obj.(*corev1.Service)
 | 
				
			||||||
 | 
									t.Logf("Deleted Service %#v", svc)
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						); err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("Error adding service informer handler: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						kubeInformers.Start(ctx.Done())
 | 
				
			||||||
 | 
						cache.WaitForCacheSync(ctx.Done(), serviceHasSynced)
 | 
				
			||||||
 | 
						svcs, err := serviceLister.List(labels.Everything())
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("Error listing services: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// only the kubernetes.default service expected
 | 
				
			||||||
 | 
						if len(svcs) != 1 || svcs[0].Name != "kubernetes" {
 | 
				
			||||||
 | 
							t.Fatalf("expected 1 services, got %d", len(svcs))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// create a new service with ClusterIP
 | 
				
			||||||
 | 
						service2 := service.DeepCopy()
 | 
				
			||||||
 | 
						service2.Spec.ClusterIP = ""
 | 
				
			||||||
 | 
						service2.Name = "test-clusterip"
 | 
				
			||||||
 | 
						_, err = client.CoreV1().Services(ns.Name).Create(ctx, service2, metav1.CreateOptions{})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("Error creating test service: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
 | 
				
			||||||
 | 
							svc, err := serviceLister.Services(service2.Namespace).Get(service2.Name)
 | 
				
			||||||
 | 
							if svc == nil || err != nil {
 | 
				
			||||||
 | 
								return false, nil
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return true, nil
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("Error waiting for test service test-clusterip: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// mutate the Service to drop the ClusterIP, theoretically ClusterIP is inmutable but ...
 | 
				
			||||||
 | 
						service.Spec.ExternalName = "test"
 | 
				
			||||||
 | 
						service.Spec.Type = corev1.ServiceTypeExternalName
 | 
				
			||||||
 | 
						_, err = client.CoreV1().Services(ns.Name).Update(ctx, service, metav1.UpdateOptions{})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("Error creating test service: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
 | 
				
			||||||
 | 
							svc, err := serviceLister.Services(service.Namespace).Get(service.Name)
 | 
				
			||||||
 | 
							if svc == nil || err != nil {
 | 
				
			||||||
 | 
								return false, nil
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return true, nil
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("Error waiting for test service without ClusterIP: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// mutate the Service to get the ClusterIP again
 | 
				
			||||||
 | 
						service.Spec.ExternalName = ""
 | 
				
			||||||
 | 
						service.Spec.ClusterIP = ""
 | 
				
			||||||
 | 
						service.Spec.Type = corev1.ServiceTypeClusterIP
 | 
				
			||||||
 | 
						_, err = client.CoreV1().Services(ns.Name).Update(ctx, service, metav1.UpdateOptions{})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("Error creating test service: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
 | 
				
			||||||
 | 
							svc, err := serviceLister.Services(service.Namespace).Get(service.Name)
 | 
				
			||||||
 | 
							if svc == nil || err != nil {
 | 
				
			||||||
 | 
								return false, nil
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return true, nil
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("Error waiting for test service with ClusterIP: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Repro https://github.com/kubernetes/kubernetes/issues/123853
 | 
				
			||||||
 | 
					func Test_ServiceWatchUntil(t *testing.T) {
 | 
				
			||||||
 | 
						svcReadyTimeout := 30 * time.Second
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
 | 
				
			||||||
 | 
						defer server.TearDownFn()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						ctx, cancel := context.WithCancel(context.Background())
 | 
				
			||||||
 | 
						defer cancel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						client, err := clientset.NewForConfig(server.ClientConfig)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("Error creating clientset: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						ns := framework.CreateNamespaceOrDie(client, "test-service-watchuntil", t)
 | 
				
			||||||
 | 
						defer framework.DeleteNamespaceOrDie(client, ns, t)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						testSvcName := "test-service-" + utilrand.String(5)
 | 
				
			||||||
 | 
						testSvcLabels := map[string]string{"test-service-static": "true"}
 | 
				
			||||||
 | 
						testSvcLabelsFlat := "test-service-static=true"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						w := &cache.ListWatch{
 | 
				
			||||||
 | 
							WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
 | 
				
			||||||
 | 
								options.LabelSelector = testSvcLabelsFlat
 | 
				
			||||||
 | 
								return client.CoreV1().Services(ns.Name).Watch(ctx, options)
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						svcList, err := client.CoreV1().Services("").List(ctx, metav1.ListOptions{LabelSelector: testSvcLabelsFlat})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("failed to list Services: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// create  service
 | 
				
			||||||
 | 
						service := &corev1.Service{
 | 
				
			||||||
 | 
							ObjectMeta: metav1.ObjectMeta{
 | 
				
			||||||
 | 
								Name:   testSvcName,
 | 
				
			||||||
 | 
								Labels: testSvcLabels,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							Spec: corev1.ServiceSpec{
 | 
				
			||||||
 | 
								Type: "LoadBalancer",
 | 
				
			||||||
 | 
								Ports: []corev1.ServicePort{{
 | 
				
			||||||
 | 
									Name:       "http",
 | 
				
			||||||
 | 
									Protocol:   corev1.ProtocolTCP,
 | 
				
			||||||
 | 
									Port:       int32(80),
 | 
				
			||||||
 | 
									TargetPort: intstr.FromInt32(80),
 | 
				
			||||||
 | 
								}},
 | 
				
			||||||
 | 
								LoadBalancerClass: ptr.To[string]("example.com/internal-vip"),
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						_, err = client.CoreV1().Services(ns.Name).Create(ctx, service, metav1.CreateOptions{})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("Error creating test service: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						ctxUntil, cancel := context.WithTimeout(ctx, svcReadyTimeout)
 | 
				
			||||||
 | 
						defer cancel()
 | 
				
			||||||
 | 
						_, err = watchtools.Until(ctxUntil, svcList.ResourceVersion, w, func(event watch.Event) (bool, error) {
 | 
				
			||||||
 | 
							if svc, ok := event.Object.(*corev1.Service); ok {
 | 
				
			||||||
 | 
								found := svc.ObjectMeta.Name == service.ObjectMeta.Name &&
 | 
				
			||||||
 | 
									svc.ObjectMeta.Namespace == ns.Name &&
 | 
				
			||||||
 | 
									svc.Labels["test-service-static"] == "true"
 | 
				
			||||||
 | 
								if !found {
 | 
				
			||||||
 | 
									t.Logf("observed Service %v in namespace %v with labels: %v & ports %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels, svc.Spec.Ports)
 | 
				
			||||||
 | 
									return false, nil
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								t.Logf("Found Service %v in namespace %v with labels: %v & ports %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels, svc.Spec.Ports)
 | 
				
			||||||
 | 
								return found, nil
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							t.Logf("Observed event: %+v", event.Object)
 | 
				
			||||||
 | 
							return false, nil
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("Error service not found: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						t.Log("patching the ServiceStatus")
 | 
				
			||||||
 | 
						lbStatus := corev1.LoadBalancerStatus{
 | 
				
			||||||
 | 
							Ingress: []corev1.LoadBalancerIngress{{IP: "203.0.113.1"}},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						lbStatusJSON, err := json.Marshal(lbStatus)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("Error marshalling status: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						_, err = client.CoreV1().Services(ns.Name).Patch(ctx, testSvcName, types.MergePatchType,
 | 
				
			||||||
 | 
							[]byte(`{"metadata":{"annotations":{"patchedstatus":"true"}},"status":{"loadBalancer":`+string(lbStatusJSON)+`}}`),
 | 
				
			||||||
 | 
							metav1.PatchOptions{}, "status")
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("Could not patch service status: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						t.Log("watching for the Service to be patched")
 | 
				
			||||||
 | 
						ctxUntil, cancel = context.WithTimeout(ctx, svcReadyTimeout)
 | 
				
			||||||
 | 
						defer cancel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						_, err = watchtools.Until(ctxUntil, svcList.ResourceVersion, w, func(event watch.Event) (bool, error) {
 | 
				
			||||||
 | 
							if svc, ok := event.Object.(*corev1.Service); ok {
 | 
				
			||||||
 | 
								found := svc.ObjectMeta.Name == service.ObjectMeta.Name &&
 | 
				
			||||||
 | 
									svc.ObjectMeta.Namespace == ns.Name &&
 | 
				
			||||||
 | 
									svc.Annotations["patchedstatus"] == "true"
 | 
				
			||||||
 | 
								if !found {
 | 
				
			||||||
 | 
									t.Logf("observed Service %v in namespace %v with annotations: %v & LoadBalancer: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Annotations, svc.Status.LoadBalancer)
 | 
				
			||||||
 | 
									return false, nil
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								t.Logf("Found Service %v in namespace %v with annotations: %v & LoadBalancer: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Annotations, svc.Status.LoadBalancer)
 | 
				
			||||||
 | 
								return found, nil
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							t.Logf("Observed event: %+v", event.Object)
 | 
				
			||||||
 | 
							return false, nil
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("failed to locate Service %v in namespace %v", service.ObjectMeta.Name, ns)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						t.Logf("Service %s has service status patched", testSvcName)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						t.Log("updating the ServiceStatus")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var statusToUpdate, updatedStatus *corev1.Service
 | 
				
			||||||
 | 
						err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
 | 
				
			||||||
 | 
							statusToUpdate, err = client.CoreV1().Services(ns.Name).Get(ctx, testSvcName, metav1.GetOptions{})
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							statusToUpdate.Status.Conditions = append(statusToUpdate.Status.Conditions, metav1.Condition{
 | 
				
			||||||
 | 
								Type:    "StatusUpdate",
 | 
				
			||||||
 | 
								Status:  metav1.ConditionTrue,
 | 
				
			||||||
 | 
								Reason:  "E2E",
 | 
				
			||||||
 | 
								Message: "Set from e2e test",
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							updatedStatus, err = client.CoreV1().Services(ns.Name).UpdateStatus(ctx, statusToUpdate, metav1.UpdateOptions{})
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("\n\n Failed to UpdateStatus. %v\n\n", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						t.Logf("updatedStatus.Conditions: %#v", updatedStatus.Status.Conditions)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						t.Log("watching for the Service to be updated")
 | 
				
			||||||
 | 
						ctxUntil, cancel = context.WithTimeout(ctx, svcReadyTimeout)
 | 
				
			||||||
 | 
						defer cancel()
 | 
				
			||||||
 | 
						_, err = watchtools.Until(ctxUntil, svcList.ResourceVersion, w, func(event watch.Event) (bool, error) {
 | 
				
			||||||
 | 
							if svc, ok := event.Object.(*corev1.Service); ok {
 | 
				
			||||||
 | 
								found := svc.ObjectMeta.Name == service.ObjectMeta.Name &&
 | 
				
			||||||
 | 
									svc.ObjectMeta.Namespace == ns.Name &&
 | 
				
			||||||
 | 
									svc.Annotations["patchedstatus"] == "true"
 | 
				
			||||||
 | 
								if !found {
 | 
				
			||||||
 | 
									t.Logf("Observed Service %v in namespace %v with annotations: %v & Conditions: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Annotations, svc.Status.LoadBalancer)
 | 
				
			||||||
 | 
									return false, nil
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								for _, cond := range svc.Status.Conditions {
 | 
				
			||||||
 | 
									if cond.Type == "StatusUpdate" &&
 | 
				
			||||||
 | 
										cond.Reason == "E2E" &&
 | 
				
			||||||
 | 
										cond.Message == "Set from e2e test" {
 | 
				
			||||||
 | 
										t.Logf("Found Service %v in namespace %v with annotations: %v & Conditions: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Annotations, svc.Status.Conditions)
 | 
				
			||||||
 | 
										return found, nil
 | 
				
			||||||
 | 
									} else {
 | 
				
			||||||
 | 
										t.Logf("Observed Service %v in namespace %v with annotations: %v & Conditions: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Annotations, svc.Status.LoadBalancer)
 | 
				
			||||||
 | 
										return false, nil
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							t.Logf("Observed event: %+v", event.Object)
 | 
				
			||||||
 | 
							return false, nil
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("failed to locate Service %v in namespace %v", service.ObjectMeta.Name, ns)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						t.Logf("Service %s has service status updated", testSvcName)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						t.Log("patching the service")
 | 
				
			||||||
 | 
						servicePatchPayload, err := json.Marshal(corev1.Service{
 | 
				
			||||||
 | 
							ObjectMeta: metav1.ObjectMeta{
 | 
				
			||||||
 | 
								Labels: map[string]string{
 | 
				
			||||||
 | 
									"test-service": "patched",
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						_, err = client.CoreV1().Services(ns.Name).Patch(ctx, testSvcName, types.StrategicMergePatchType, []byte(servicePatchPayload), metav1.PatchOptions{})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("failed to patch service. %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						t.Log("watching for the Service to be patched")
 | 
				
			||||||
 | 
						ctxUntil, cancel = context.WithTimeout(ctx, svcReadyTimeout)
 | 
				
			||||||
 | 
						defer cancel()
 | 
				
			||||||
 | 
						_, err = watchtools.Until(ctxUntil, svcList.ResourceVersion, w, func(event watch.Event) (bool, error) {
 | 
				
			||||||
 | 
							if svc, ok := event.Object.(*corev1.Service); ok {
 | 
				
			||||||
 | 
								found := svc.ObjectMeta.Name == service.ObjectMeta.Name &&
 | 
				
			||||||
 | 
									svc.ObjectMeta.Namespace == ns.Name &&
 | 
				
			||||||
 | 
									svc.Labels["test-service"] == "patched"
 | 
				
			||||||
 | 
								if !found {
 | 
				
			||||||
 | 
									t.Logf("observed Service %v in namespace %v with labels: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels)
 | 
				
			||||||
 | 
									return false, nil
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								t.Logf("Found Service %v in namespace %v with labels: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels)
 | 
				
			||||||
 | 
								return found, nil
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							t.Logf("Observed event: %+v", event.Object)
 | 
				
			||||||
 | 
							return false, nil
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("failed to locate Service %v in namespace %v", service.ObjectMeta.Name, ns)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						t.Logf("Service %s patched", testSvcName)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						t.Log("deleting the service")
 | 
				
			||||||
 | 
						err = client.CoreV1().Services(ns.Name).Delete(ctx, testSvcName, metav1.DeleteOptions{})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("failed to delete the Service. %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						t.Log("watching for the Service to be deleted")
 | 
				
			||||||
 | 
						ctxUntil, cancel = context.WithTimeout(ctx, svcReadyTimeout)
 | 
				
			||||||
 | 
						defer cancel()
 | 
				
			||||||
 | 
						_, err = watchtools.Until(ctxUntil, svcList.ResourceVersion, w, func(event watch.Event) (bool, error) {
 | 
				
			||||||
 | 
							switch event.Type {
 | 
				
			||||||
 | 
							case watch.Deleted:
 | 
				
			||||||
 | 
								if svc, ok := event.Object.(*corev1.Service); ok {
 | 
				
			||||||
 | 
									found := svc.ObjectMeta.Name == service.ObjectMeta.Name &&
 | 
				
			||||||
 | 
										svc.ObjectMeta.Namespace == ns.Name &&
 | 
				
			||||||
 | 
										svc.Labels["test-service-static"] == "true"
 | 
				
			||||||
 | 
									if !found {
 | 
				
			||||||
 | 
										t.Logf("observed Service %v in namespace %v with labels: %v & annotations: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels, svc.Annotations)
 | 
				
			||||||
 | 
										return false, nil
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									t.Logf("Found Service %v in namespace %v with labels: %v & annotations: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels, svc.Annotations)
 | 
				
			||||||
 | 
									return found, nil
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							default:
 | 
				
			||||||
 | 
								t.Logf("Observed event: %+v", event.Type)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return false, nil
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("failed to delete Service %v in namespace %v", service.ObjectMeta.Name, ns)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						t.Logf("Service %s deleted", testSvcName)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user