/* Copyright 2019 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package endpointslice import ( "context" "fmt" "reflect" "strconv" "testing" "time" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" k8stesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/endpointslice/topologycache" endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" endpointsliceutil "k8s.io/kubernetes/pkg/controller/util/endpointslice" "k8s.io/kubernetes/pkg/features" utilpointer "k8s.io/utils/pointer" ) // Most of the tests related to EndpointSlice allocation can be found in reconciler_test.go // Tests here primarily focus on unique controller functionality before the reconciler begins var alwaysReady = func() bool { return true } type endpointSliceController struct { *Controller endpointSliceStore cache.Store nodeStore cache.Store podStore cache.Store serviceStore cache.Store } func newController(nodeNames []string, batchPeriod time.Duration) (*fake.Clientset, *endpointSliceController) { client := fake.NewSimpleClientset() informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) nodeInformer := informerFactory.Core().V1().Nodes() indexer := nodeInformer.Informer().GetIndexer() for _, nodeName := range nodeNames { indexer.Add(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}) } esInformer := informerFactory.Discovery().V1().EndpointSlices() esIndexer := esInformer.Informer().GetIndexer() // These reactors are required to mock functionality that would be covered // automatically if we weren't using the fake client. client.PrependReactor("create", "endpointslices", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) { endpointSlice := action.(k8stesting.CreateAction).GetObject().(*discovery.EndpointSlice) if endpointSlice.ObjectMeta.GenerateName != "" { endpointSlice.ObjectMeta.Name = fmt.Sprintf("%s-%s", endpointSlice.ObjectMeta.GenerateName, rand.String(8)) endpointSlice.ObjectMeta.GenerateName = "" } endpointSlice.Generation = 1 esIndexer.Add(endpointSlice) return false, endpointSlice, nil })) client.PrependReactor("update", "endpointslices", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) { endpointSlice := action.(k8stesting.CreateAction).GetObject().(*discovery.EndpointSlice) endpointSlice.Generation++ esIndexer.Update(endpointSlice) return false, endpointSlice, nil })) esController := NewController( informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Services(), nodeInformer, esInformer, int32(100), client, batchPeriod) esController.nodesSynced = alwaysReady esController.podsSynced = alwaysReady esController.servicesSynced = alwaysReady esController.endpointSlicesSynced = alwaysReady return client, &endpointSliceController{ esController, informerFactory.Discovery().V1().EndpointSlices().Informer().GetStore(), informerFactory.Core().V1().Nodes().Informer().GetStore(), informerFactory.Core().V1().Pods().Informer().GetStore(), informerFactory.Core().V1().Services().Informer().GetStore(), } } // Ensure SyncService for service with no selector results in no action func TestSyncServiceNoSelector(t *testing.T) { ns := metav1.NamespaceDefault serviceName := "testing-1" client, esController := newController([]string{"node-1"}, time.Duration(0)) esController.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: ns}, Spec: v1.ServiceSpec{ Ports: []v1.ServicePort{{TargetPort: intstr.FromInt(80)}}, }, }) err := esController.syncService(fmt.Sprintf("%s/%s", ns, serviceName)) assert.NoError(t, err) assert.Len(t, client.Actions(), 0) } // Ensure SyncService for service with pending deletion results in no action func TestSyncServicePendingDeletion(t *testing.T) { ns := metav1.NamespaceDefault serviceName := "testing-1" deletionTimestamp := metav1.Now() client, esController := newController([]string{"node-1"}, time.Duration(0)) esController.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: ns, DeletionTimestamp: &deletionTimestamp}, Spec: v1.ServiceSpec{ Selector: map[string]string{"foo": "bar"}, Ports: []v1.ServicePort{{TargetPort: intstr.FromInt(80)}}, }, }) err := esController.syncService(fmt.Sprintf("%s/%s", ns, serviceName)) assert.NoError(t, err) assert.Len(t, client.Actions(), 0) } // Ensure SyncService for service with selector but no pods results in placeholder EndpointSlice func TestSyncServiceWithSelector(t *testing.T) { ns := metav1.NamespaceDefault serviceName := "testing-1" client, esController := newController([]string{"node-1"}, time.Duration(0)) standardSyncService(t, esController, ns, serviceName) expectActions(t, client.Actions(), 1, "create", "endpointslices") sliceList, err := client.DiscoveryV1().EndpointSlices(ns).List(context.TODO(), metav1.ListOptions{}) assert.Nil(t, err, "Expected no error fetching endpoint slices") assert.Len(t, sliceList.Items, 1, "Expected 1 endpoint slices") slice := sliceList.Items[0] assert.Regexp(t, "^"+serviceName, slice.Name) assert.Equal(t, serviceName, slice.Labels[discovery.LabelServiceName]) assert.EqualValues(t, []discovery.EndpointPort{}, slice.Ports) assert.EqualValues(t, []discovery.Endpoint{}, slice.Endpoints) assert.NotEmpty(t, slice.Annotations["endpoints.kubernetes.io/last-change-trigger-time"]) } // Ensure SyncService gracefully handles a missing service. This test also // populates another existing service to ensure a clean up process doesn't // remove too much. func TestSyncServiceMissing(t *testing.T) { namespace := metav1.NamespaceDefault client, esController := newController([]string{"node-1"}, time.Duration(0)) // Build up existing service existingServiceName := "stillthere" existingServiceKey := endpointutil.ServiceKey{Name: existingServiceName, Namespace: namespace} esController.triggerTimeTracker.ServiceStates[existingServiceKey] = endpointutil.ServiceState{} esController.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: existingServiceName, Namespace: namespace}, Spec: v1.ServiceSpec{ Ports: []v1.ServicePort{{TargetPort: intstr.FromInt(80)}}, Selector: map[string]string{"foo": "bar"}, }, }) // Add missing service to triggerTimeTracker to ensure the reference is cleaned up missingServiceName := "notthere" missingServiceKey := endpointutil.ServiceKey{Name: missingServiceName, Namespace: namespace} esController.triggerTimeTracker.ServiceStates[missingServiceKey] = endpointutil.ServiceState{} err := esController.syncService(fmt.Sprintf("%s/%s", namespace, missingServiceName)) // nil should be returned when the service doesn't exist assert.Nil(t, err, "Expected no error syncing service") // That should mean no client actions were performed assert.Len(t, client.Actions(), 0) // TriggerTimeTracker should have removed the reference to the missing service assert.NotContains(t, esController.triggerTimeTracker.ServiceStates, missingServiceKey) // TriggerTimeTracker should have left the reference to the missing service assert.Contains(t, esController.triggerTimeTracker.ServiceStates, existingServiceKey) } // Ensure SyncService correctly selects Pods. func TestSyncServicePodSelection(t *testing.T) { client, esController := newController([]string{"node-1"}, time.Duration(0)) ns := metav1.NamespaceDefault pod1 := newPod(1, ns, true, 0, false) esController.podStore.Add(pod1) // ensure this pod will not match the selector pod2 := newPod(2, ns, true, 0, false) pod2.Labels["foo"] = "boo" esController.podStore.Add(pod2) standardSyncService(t, esController, ns, "testing-1") expectActions(t, client.Actions(), 1, "create", "endpointslices") // an endpoint slice should be created, it should only reference pod1 (not pod2) slices, err := client.DiscoveryV1().EndpointSlices(ns).List(context.TODO(), metav1.ListOptions{}) assert.Nil(t, err, "Expected no error fetching endpoint slices") assert.Len(t, slices.Items, 1, "Expected 1 endpoint slices") slice := slices.Items[0] assert.Len(t, slice.Endpoints, 1, "Expected 1 endpoint in first slice") assert.NotEmpty(t, slice.Annotations[v1.EndpointsLastChangeTriggerTime]) endpoint := slice.Endpoints[0] assert.EqualValues(t, endpoint.TargetRef, &v1.ObjectReference{Kind: "Pod", Namespace: ns, Name: pod1.Name}) } func TestSyncServiceEndpointSlicePendingDeletion(t *testing.T) { client, esController := newController([]string{"node-1"}, time.Duration(0)) ns := metav1.NamespaceDefault serviceName := "testing-1" service := createService(t, esController, ns, serviceName) err := esController.syncService(fmt.Sprintf("%s/%s", ns, serviceName)) assert.Nil(t, err, "Expected no error syncing service") gvk := schema.GroupVersionKind{Version: "v1", Kind: "Service"} ownerRef := metav1.NewControllerRef(service, gvk) deletedTs := metav1.Now() endpointSlice := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: "epSlice-1", Namespace: ns, OwnerReferences: []metav1.OwnerReference{*ownerRef}, Labels: map[string]string{ discovery.LabelServiceName: serviceName, discovery.LabelManagedBy: controllerName, }, DeletionTimestamp: &deletedTs, }, AddressType: discovery.AddressTypeIPv4, } err = esController.endpointSliceStore.Add(endpointSlice) if err != nil { t.Fatalf("Expected no error adding EndpointSlice: %v", err) } _, err = client.DiscoveryV1().EndpointSlices(ns).Create(context.TODO(), endpointSlice, metav1.CreateOptions{}) if err != nil { t.Fatalf("Expected no error creating EndpointSlice: %v", err) } numActionsBefore := len(client.Actions()) err = esController.syncService(fmt.Sprintf("%s/%s", ns, serviceName)) assert.Nil(t, err, "Expected no error syncing service") // The EndpointSlice marked for deletion should be ignored by the controller, and thus // should not result in any action. if len(client.Actions()) != numActionsBefore { t.Errorf("Expected 0 more actions, got %d", len(client.Actions())-numActionsBefore) } } // Ensure SyncService correctly selects and labels EndpointSlices. func TestSyncServiceEndpointSliceLabelSelection(t *testing.T) { client, esController := newController([]string{"node-1"}, time.Duration(0)) ns := metav1.NamespaceDefault serviceName := "testing-1" service := createService(t, esController, ns, serviceName) gvk := schema.GroupVersionKind{Version: "v1", Kind: "Service"} ownerRef := metav1.NewControllerRef(service, gvk) // 5 slices, 3 with matching labels for our service endpointSlices := []*discovery.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ Name: "matching-1", Namespace: ns, OwnerReferences: []metav1.OwnerReference{*ownerRef}, Labels: map[string]string{ discovery.LabelServiceName: serviceName, discovery.LabelManagedBy: controllerName, }, }, AddressType: discovery.AddressTypeIPv4, }, { ObjectMeta: metav1.ObjectMeta{ Name: "matching-2", Namespace: ns, OwnerReferences: []metav1.OwnerReference{*ownerRef}, Labels: map[string]string{ discovery.LabelServiceName: serviceName, discovery.LabelManagedBy: controllerName, }, }, AddressType: discovery.AddressTypeIPv4, }, { ObjectMeta: metav1.ObjectMeta{ Name: "partially-matching-1", Namespace: ns, Labels: map[string]string{ discovery.LabelServiceName: serviceName, }, }, AddressType: discovery.AddressTypeIPv4, }, { ObjectMeta: metav1.ObjectMeta{ Name: "not-matching-1", Namespace: ns, Labels: map[string]string{ discovery.LabelServiceName: "something-else", discovery.LabelManagedBy: controllerName, }, }, AddressType: discovery.AddressTypeIPv4, }, { ObjectMeta: metav1.ObjectMeta{ Name: "not-matching-2", Namespace: ns, Labels: map[string]string{ discovery.LabelServiceName: serviceName, discovery.LabelManagedBy: "something-else", }, }, AddressType: discovery.AddressTypeIPv4, }} cmc := newCacheMutationCheck(endpointSlices) // need to add them to both store and fake clientset for _, endpointSlice := range endpointSlices { err := esController.endpointSliceStore.Add(endpointSlice) if err != nil { t.Fatalf("Expected no error adding EndpointSlice: %v", err) } _, err = client.DiscoveryV1().EndpointSlices(ns).Create(context.TODO(), endpointSlice, metav1.CreateOptions{}) if err != nil { t.Fatalf("Expected no error creating EndpointSlice: %v", err) } } numActionsBefore := len(client.Actions()) err := esController.syncService(fmt.Sprintf("%s/%s", ns, serviceName)) assert.Nil(t, err, "Expected no error syncing service") if len(client.Actions()) != numActionsBefore+2 { t.Errorf("Expected 2 more actions, got %d", len(client.Actions())-numActionsBefore) } // only 2 slices should match, 2 should be deleted, 1 should be updated as a placeholder expectAction(t, client.Actions(), numActionsBefore, "update", "endpointslices") expectAction(t, client.Actions(), numActionsBefore+1, "delete", "endpointslices") // ensure cache mutation has not occurred cmc.Check(t) } func TestOnEndpointSliceUpdate(t *testing.T) { _, esController := newController([]string{"node-1"}, time.Duration(0)) ns := metav1.NamespaceDefault serviceName := "testing-1" epSlice1 := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: "matching-1", Namespace: ns, Labels: map[string]string{ discovery.LabelServiceName: serviceName, discovery.LabelManagedBy: controllerName, }, }, AddressType: discovery.AddressTypeIPv4, } epSlice2 := epSlice1.DeepCopy() epSlice2.Labels[discovery.LabelManagedBy] = "something else" assert.Equal(t, 0, esController.queue.Len()) esController.onEndpointSliceUpdate(epSlice1, epSlice2) err := wait.PollImmediate(100*time.Millisecond, 3*time.Second, func() (bool, error) { if esController.queue.Len() > 0 { return true, nil } return false, nil }) if err != nil { t.Fatalf("unexpected error waiting for add to queue") } assert.Equal(t, 1, esController.queue.Len()) } func TestSyncService(t *testing.T) { creationTimestamp := metav1.Now() deletionTimestamp := metav1.Now() testcases := []struct { name string service *v1.Service pods []*v1.Pod expectedEndpointPorts []discovery.EndpointPort expectedEndpoints []discovery.Endpoint terminatingGateEnabled bool }{ { name: "pods with multiple IPs and Service with ipFamilies=ipv4", service: &v1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: "foobar", Namespace: "default", CreationTimestamp: creationTimestamp, }, Spec: v1.ServiceSpec{ Ports: []v1.ServicePort{ {Name: "tcp-example", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP}, {Name: "udp-example", TargetPort: intstr.FromInt(161), Protocol: v1.ProtocolUDP}, {Name: "sctp-example", TargetPort: intstr.FromInt(3456), Protocol: v1.ProtocolSCTP}, }, Selector: map[string]string{"foo": "bar"}, IPFamilies: []v1.IPFamily{v1.IPv4Protocol}, }, }, pods: []*v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ Namespace: "default", Name: "pod0", Labels: map[string]string{"foo": "bar"}, DeletionTimestamp: nil, }, Spec: v1.PodSpec{ Containers: []v1.Container{{ Name: "container-1", }}, NodeName: "node-1", }, Status: v1.PodStatus{ PodIP: "10.0.0.1", PodIPs: []v1.PodIP{{ IP: "10.0.0.1", }}, Conditions: []v1.PodCondition{ { Type: v1.PodReady, Status: v1.ConditionTrue, }, }, }, }, { ObjectMeta: metav1.ObjectMeta{ Namespace: "default", Name: "pod1", Labels: map[string]string{"foo": "bar"}, DeletionTimestamp: nil, }, Spec: v1.PodSpec{ Containers: []v1.Container{{ Name: "container-1", }}, NodeName: "node-1", }, Status: v1.PodStatus{ PodIP: "10.0.0.2", PodIPs: []v1.PodIP{ { IP: "10.0.0.2", }, { IP: "fd08::5678:0000:0000:9abc:def0", }, }, Conditions: []v1.PodCondition{ { Type: v1.PodReady, Status: v1.ConditionTrue, }, }, }, }, }, expectedEndpointPorts: []discovery.EndpointPort{ { Name: utilpointer.StringPtr("sctp-example"), Protocol: protoPtr(v1.ProtocolSCTP), Port: utilpointer.Int32Ptr(int32(3456)), }, { Name: utilpointer.StringPtr("udp-example"), Protocol: protoPtr(v1.ProtocolUDP), Port: utilpointer.Int32Ptr(int32(161)), }, { Name: utilpointer.StringPtr("tcp-example"), Protocol: protoPtr(v1.ProtocolTCP), Port: utilpointer.Int32Ptr(int32(80)), }, }, expectedEndpoints: []discovery.Endpoint{ { Conditions: discovery.EndpointConditions{ Ready: utilpointer.BoolPtr(true), }, Addresses: []string{"10.0.0.1"}, TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0"}, NodeName: utilpointer.StringPtr("node-1"), }, { Conditions: discovery.EndpointConditions{ Ready: utilpointer.BoolPtr(true), }, Addresses: []string{"10.0.0.2"}, TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod1"}, NodeName: utilpointer.StringPtr("node-1"), }, }, }, { name: "pods with multiple IPs and Service with ipFamilies=ipv6", service: &v1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: "foobar", Namespace: "default", CreationTimestamp: creationTimestamp, }, Spec: v1.ServiceSpec{ Ports: []v1.ServicePort{ {Name: "tcp-example", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP}, {Name: "udp-example", TargetPort: intstr.FromInt(161), Protocol: v1.ProtocolUDP}, {Name: "sctp-example", TargetPort: intstr.FromInt(3456), Protocol: v1.ProtocolSCTP}, }, Selector: map[string]string{"foo": "bar"}, IPFamilies: []v1.IPFamily{v1.IPv6Protocol}, }, }, pods: []*v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ Namespace: "default", Name: "pod0", Labels: map[string]string{"foo": "bar"}, DeletionTimestamp: nil, }, Spec: v1.PodSpec{ Containers: []v1.Container{{ Name: "container-1", }}, NodeName: "node-1", }, Status: v1.PodStatus{ PodIP: "10.0.0.1", PodIPs: []v1.PodIP{{ IP: "10.0.0.1", }}, Conditions: []v1.PodCondition{ { Type: v1.PodReady, Status: v1.ConditionTrue, }, }, }, }, { ObjectMeta: metav1.ObjectMeta{ Namespace: "default", Name: "pod1", Labels: map[string]string{"foo": "bar"}, DeletionTimestamp: nil, }, Spec: v1.PodSpec{ Containers: []v1.Container{{ Name: "container-1", }}, NodeName: "node-1", }, Status: v1.PodStatus{ PodIP: "10.0.0.2", PodIPs: []v1.PodIP{ { IP: "10.0.0.2", }, { IP: "fd08::5678:0000:0000:9abc:def0", }, }, Conditions: []v1.PodCondition{ { Type: v1.PodReady, Status: v1.ConditionTrue, }, }, }, }, }, expectedEndpointPorts: []discovery.EndpointPort{ { Name: utilpointer.StringPtr("sctp-example"), Protocol: protoPtr(v1.ProtocolSCTP), Port: utilpointer.Int32Ptr(int32(3456)), }, { Name: utilpointer.StringPtr("udp-example"), Protocol: protoPtr(v1.ProtocolUDP), Port: utilpointer.Int32Ptr(int32(161)), }, { Name: utilpointer.StringPtr("tcp-example"), Protocol: protoPtr(v1.ProtocolTCP), Port: utilpointer.Int32Ptr(int32(80)), }, }, expectedEndpoints: []discovery.Endpoint{ { Conditions: discovery.EndpointConditions{ Ready: utilpointer.BoolPtr(true), }, Addresses: []string{"fd08::5678:0000:0000:9abc:def0"}, TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod1"}, NodeName: utilpointer.StringPtr("node-1"), }, }, }, { name: "Terminating pods with EndpointSliceTerminatingCondition enabled", service: &v1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: "foobar", Namespace: "default", CreationTimestamp: creationTimestamp, }, Spec: v1.ServiceSpec{ Ports: []v1.ServicePort{ {Name: "tcp-example", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP}, {Name: "udp-example", TargetPort: intstr.FromInt(161), Protocol: v1.ProtocolUDP}, {Name: "sctp-example", TargetPort: intstr.FromInt(3456), Protocol: v1.ProtocolSCTP}, }, Selector: map[string]string{"foo": "bar"}, IPFamilies: []v1.IPFamily{v1.IPv4Protocol}, }, }, pods: []*v1.Pod{ { // one ready pod for comparison ObjectMeta: metav1.ObjectMeta{ Namespace: "default", Name: "pod0", Labels: map[string]string{"foo": "bar"}, DeletionTimestamp: nil, }, Spec: v1.PodSpec{ Containers: []v1.Container{{ Name: "container-1", }}, NodeName: "node-1", }, Status: v1.PodStatus{ PodIP: "10.0.0.1", PodIPs: []v1.PodIP{{ IP: "10.0.0.1", }}, Conditions: []v1.PodCondition{ { Type: v1.PodReady, Status: v1.ConditionTrue, }, }, }, }, { ObjectMeta: metav1.ObjectMeta{ Namespace: "default", Name: "pod1", Labels: map[string]string{"foo": "bar"}, DeletionTimestamp: &deletionTimestamp, }, Spec: v1.PodSpec{ Containers: []v1.Container{{ Name: "container-1", }}, NodeName: "node-1", }, Status: v1.PodStatus{ PodIP: "10.0.0.2", PodIPs: []v1.PodIP{ { IP: "10.0.0.2", }, }, Conditions: []v1.PodCondition{ { Type: v1.PodReady, Status: v1.ConditionTrue, }, }, }, }, }, expectedEndpointPorts: []discovery.EndpointPort{ { Name: utilpointer.StringPtr("sctp-example"), Protocol: protoPtr(v1.ProtocolSCTP), Port: utilpointer.Int32Ptr(int32(3456)), }, { Name: utilpointer.StringPtr("udp-example"), Protocol: protoPtr(v1.ProtocolUDP), Port: utilpointer.Int32Ptr(int32(161)), }, { Name: utilpointer.StringPtr("tcp-example"), Protocol: protoPtr(v1.ProtocolTCP), Port: utilpointer.Int32Ptr(int32(80)), }, }, expectedEndpoints: []discovery.Endpoint{ { Conditions: discovery.EndpointConditions{ Ready: utilpointer.BoolPtr(true), Serving: utilpointer.BoolPtr(true), Terminating: utilpointer.BoolPtr(false), }, Addresses: []string{"10.0.0.1"}, TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0"}, NodeName: utilpointer.StringPtr("node-1"), }, { Conditions: discovery.EndpointConditions{ Ready: utilpointer.BoolPtr(false), Serving: utilpointer.BoolPtr(true), Terminating: utilpointer.BoolPtr(true), }, Addresses: []string{"10.0.0.2"}, TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod1"}, NodeName: utilpointer.StringPtr("node-1"), }, }, terminatingGateEnabled: true, }, { name: "Terminating pods with EndpointSliceTerminatingCondition disabled", service: &v1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: "foobar", Namespace: "default", CreationTimestamp: creationTimestamp, }, Spec: v1.ServiceSpec{ Ports: []v1.ServicePort{ {Name: "tcp-example", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP}, {Name: "udp-example", TargetPort: intstr.FromInt(161), Protocol: v1.ProtocolUDP}, {Name: "sctp-example", TargetPort: intstr.FromInt(3456), Protocol: v1.ProtocolSCTP}, }, Selector: map[string]string{"foo": "bar"}, IPFamilies: []v1.IPFamily{v1.IPv4Protocol}, }, }, pods: []*v1.Pod{ { // one ready pod for comparison ObjectMeta: metav1.ObjectMeta{ Namespace: "default", Name: "pod0", Labels: map[string]string{"foo": "bar"}, DeletionTimestamp: nil, }, Spec: v1.PodSpec{ Containers: []v1.Container{{ Name: "container-1", }}, NodeName: "node-1", }, Status: v1.PodStatus{ PodIP: "10.0.0.1", PodIPs: []v1.PodIP{{ IP: "10.0.0.1", }}, Conditions: []v1.PodCondition{ { Type: v1.PodReady, Status: v1.ConditionTrue, }, }, }, }, { ObjectMeta: metav1.ObjectMeta{ Namespace: "default", Name: "pod1", Labels: map[string]string{"foo": "bar"}, DeletionTimestamp: &deletionTimestamp, }, Spec: v1.PodSpec{ Containers: []v1.Container{{ Name: "container-1", }}, NodeName: "node-1", }, Status: v1.PodStatus{ PodIP: "10.0.0.2", PodIPs: []v1.PodIP{ { IP: "10.0.0.2", }, }, Conditions: []v1.PodCondition{ { Type: v1.PodReady, Status: v1.ConditionTrue, }, }, }, }, }, expectedEndpointPorts: []discovery.EndpointPort{ { Name: utilpointer.StringPtr("sctp-example"), Protocol: protoPtr(v1.ProtocolSCTP), Port: utilpointer.Int32Ptr(int32(3456)), }, { Name: utilpointer.StringPtr("udp-example"), Protocol: protoPtr(v1.ProtocolUDP), Port: utilpointer.Int32Ptr(int32(161)), }, { Name: utilpointer.StringPtr("tcp-example"), Protocol: protoPtr(v1.ProtocolTCP), Port: utilpointer.Int32Ptr(int32(80)), }, }, expectedEndpoints: []discovery.Endpoint{ { Conditions: discovery.EndpointConditions{ Ready: utilpointer.BoolPtr(true), }, Addresses: []string{"10.0.0.1"}, TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0"}, NodeName: utilpointer.StringPtr("node-1"), }, }, terminatingGateEnabled: false, }, { name: "Not ready terminating pods with EndpointSliceTerminatingCondition enabled", service: &v1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: "foobar", Namespace: "default", CreationTimestamp: creationTimestamp, }, Spec: v1.ServiceSpec{ Ports: []v1.ServicePort{ {Name: "tcp-example", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP}, {Name: "udp-example", TargetPort: intstr.FromInt(161), Protocol: v1.ProtocolUDP}, {Name: "sctp-example", TargetPort: intstr.FromInt(3456), Protocol: v1.ProtocolSCTP}, }, Selector: map[string]string{"foo": "bar"}, IPFamilies: []v1.IPFamily{v1.IPv4Protocol}, }, }, pods: []*v1.Pod{ { // one ready pod for comparison ObjectMeta: metav1.ObjectMeta{ Namespace: "default", Name: "pod0", Labels: map[string]string{"foo": "bar"}, DeletionTimestamp: nil, }, Spec: v1.PodSpec{ Containers: []v1.Container{{ Name: "container-1", }}, NodeName: "node-1", }, Status: v1.PodStatus{ PodIP: "10.0.0.1", PodIPs: []v1.PodIP{{ IP: "10.0.0.1", }}, Conditions: []v1.PodCondition{ { Type: v1.PodReady, Status: v1.ConditionTrue, }, }, }, }, { ObjectMeta: metav1.ObjectMeta{ Namespace: "default", Name: "pod1", Labels: map[string]string{"foo": "bar"}, DeletionTimestamp: &deletionTimestamp, }, Spec: v1.PodSpec{ Containers: []v1.Container{{ Name: "container-1", }}, NodeName: "node-1", }, Status: v1.PodStatus{ PodIP: "10.0.0.2", PodIPs: []v1.PodIP{ { IP: "10.0.0.2", }, }, Conditions: []v1.PodCondition{ { Type: v1.PodReady, Status: v1.ConditionFalse, }, }, }, }, }, expectedEndpointPorts: []discovery.EndpointPort{ { Name: utilpointer.StringPtr("sctp-example"), Protocol: protoPtr(v1.ProtocolSCTP), Port: utilpointer.Int32Ptr(int32(3456)), }, { Name: utilpointer.StringPtr("udp-example"), Protocol: protoPtr(v1.ProtocolUDP), Port: utilpointer.Int32Ptr(int32(161)), }, { Name: utilpointer.StringPtr("tcp-example"), Protocol: protoPtr(v1.ProtocolTCP), Port: utilpointer.Int32Ptr(int32(80)), }, }, expectedEndpoints: []discovery.Endpoint{ { Conditions: discovery.EndpointConditions{ Ready: utilpointer.BoolPtr(true), Serving: utilpointer.BoolPtr(true), Terminating: utilpointer.BoolPtr(false), }, Addresses: []string{"10.0.0.1"}, TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0"}, NodeName: utilpointer.StringPtr("node-1"), }, { Conditions: discovery.EndpointConditions{ Ready: utilpointer.BoolPtr(false), Serving: utilpointer.BoolPtr(false), Terminating: utilpointer.BoolPtr(true), }, Addresses: []string{"10.0.0.2"}, TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod1"}, NodeName: utilpointer.StringPtr("node-1"), }, }, terminatingGateEnabled: true, }, { name: "Not ready terminating pods with EndpointSliceTerminatingCondition disabled", service: &v1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: "foobar", Namespace: "default", CreationTimestamp: creationTimestamp, }, Spec: v1.ServiceSpec{ Ports: []v1.ServicePort{ {Name: "tcp-example", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP}, {Name: "udp-example", TargetPort: intstr.FromInt(161), Protocol: v1.ProtocolUDP}, {Name: "sctp-example", TargetPort: intstr.FromInt(3456), Protocol: v1.ProtocolSCTP}, }, Selector: map[string]string{"foo": "bar"}, IPFamilies: []v1.IPFamily{v1.IPv4Protocol}, }, }, pods: []*v1.Pod{ { // one ready pod for comparison ObjectMeta: metav1.ObjectMeta{ Namespace: "default", Name: "pod0", Labels: map[string]string{"foo": "bar"}, DeletionTimestamp: nil, }, Spec: v1.PodSpec{ Containers: []v1.Container{{ Name: "container-1", }}, NodeName: "node-1", }, Status: v1.PodStatus{ PodIP: "10.0.0.1", PodIPs: []v1.PodIP{{ IP: "10.0.0.1", }}, Conditions: []v1.PodCondition{ { Type: v1.PodReady, Status: v1.ConditionTrue, }, }, }, }, { ObjectMeta: metav1.ObjectMeta{ Namespace: "default", Name: "pod1", Labels: map[string]string{"foo": "bar"}, DeletionTimestamp: &deletionTimestamp, }, Spec: v1.PodSpec{ Containers: []v1.Container{{ Name: "container-1", }}, NodeName: "node-1", }, Status: v1.PodStatus{ PodIP: "10.0.0.2", PodIPs: []v1.PodIP{ { IP: "10.0.0.2", }, }, Conditions: []v1.PodCondition{ { Type: v1.PodReady, Status: v1.ConditionFalse, }, }, }, }, }, expectedEndpointPorts: []discovery.EndpointPort{ { Name: utilpointer.StringPtr("sctp-example"), Protocol: protoPtr(v1.ProtocolSCTP), Port: utilpointer.Int32Ptr(int32(3456)), }, { Name: utilpointer.StringPtr("udp-example"), Protocol: protoPtr(v1.ProtocolUDP), Port: utilpointer.Int32Ptr(int32(161)), }, { Name: utilpointer.StringPtr("tcp-example"), Protocol: protoPtr(v1.ProtocolTCP), Port: utilpointer.Int32Ptr(int32(80)), }, }, expectedEndpoints: []discovery.Endpoint{ { Conditions: discovery.EndpointConditions{ Ready: utilpointer.BoolPtr(true), }, Addresses: []string{"10.0.0.1"}, TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0"}, NodeName: utilpointer.StringPtr("node-1"), }, }, terminatingGateEnabled: false, }, } for _, testcase := range testcases { t.Run(testcase.name, func(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EndpointSliceTerminatingCondition, testcase.terminatingGateEnabled)() client, esController := newController([]string{"node-1"}, time.Duration(0)) for _, pod := range testcase.pods { esController.podStore.Add(pod) } esController.serviceStore.Add(testcase.service) _, err := esController.client.CoreV1().Services(testcase.service.Namespace).Create(context.TODO(), testcase.service, metav1.CreateOptions{}) assert.Nil(t, err, "Expected no error creating service") err = esController.syncService(fmt.Sprintf("%s/%s", testcase.service.Namespace, testcase.service.Name)) assert.Nil(t, err) // last action should be to create endpoint slice expectActions(t, client.Actions(), 1, "create", "endpointslices") sliceList, err := client.DiscoveryV1().EndpointSlices(testcase.service.Namespace).List(context.TODO(), metav1.ListOptions{}) assert.Nil(t, err, "Expected no error fetching endpoint slices") assert.Len(t, sliceList.Items, 1, "Expected 1 endpoint slices") // ensure all attributes of endpoint slice match expected state slice := sliceList.Items[0] assert.Equal(t, slice.Annotations[v1.EndpointsLastChangeTriggerTime], creationTimestamp.UTC().Format(time.RFC3339Nano)) assert.ElementsMatch(t, testcase.expectedEndpointPorts, slice.Ports) assert.ElementsMatch(t, testcase.expectedEndpoints, slice.Endpoints) }) } } // TestPodAddsBatching verifies that endpoint updates caused by pod addition are batched together. // This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now. // TODO(mborsz): Migrate this test to mock clock when possible. func TestPodAddsBatching(t *testing.T) { t.Parallel() type podAdd struct { delay time.Duration } tests := []struct { name string batchPeriod time.Duration adds []podAdd finalDelay time.Duration wantRequestCount int }{ { name: "three adds with no batching", batchPeriod: 0 * time.Second, adds: []podAdd{ { // endpoints.Run needs ~100 ms to start processing updates. delay: 200 * time.Millisecond, }, { delay: 100 * time.Millisecond, }, { delay: 100 * time.Millisecond, }, }, finalDelay: 3 * time.Second, wantRequestCount: 3, }, { name: "three adds in one batch", batchPeriod: 1 * time.Second, adds: []podAdd{ { // endpoints.Run needs ~100 ms to start processing updates. delay: 200 * time.Millisecond, }, { delay: 100 * time.Millisecond, }, { delay: 100 * time.Millisecond, }, }, finalDelay: 3 * time.Second, wantRequestCount: 1, }, { name: "three adds in two batches", batchPeriod: 1 * time.Second, adds: []podAdd{ { // endpoints.Run needs ~100 ms to start processing updates. delay: 200 * time.Millisecond, }, { delay: 100 * time.Millisecond, }, { delay: 1 * time.Second, }, }, finalDelay: 3 * time.Second, wantRequestCount: 2, }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { ns := metav1.NamespaceDefault client, esController := newController([]string{"node-1"}, tc.batchPeriod) stopCh := make(chan struct{}) defer close(stopCh) go esController.Run(1, stopCh) esController.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ Selector: map[string]string{"foo": "bar"}, IPFamilies: []v1.IPFamily{v1.IPv4Protocol}, Ports: []v1.ServicePort{{Port: 80}}, }, }) for i, add := range tc.adds { time.Sleep(add.delay) p := newPod(i, ns, true, 0, false) esController.podStore.Add(p) esController.addPod(p) } time.Sleep(tc.finalDelay) assert.Len(t, client.Actions(), tc.wantRequestCount) // In case of error, make debugging easier. for _, action := range client.Actions() { t.Logf("action: %v %v", action.GetVerb(), action.GetResource()) } }) } } // TestPodUpdatesBatching verifies that endpoint updates caused by pod updates are batched together. // This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now. // TODO(mborsz): Migrate this test to mock clock when possible. func TestPodUpdatesBatching(t *testing.T) { t.Parallel() resourceVersion := 1 type podUpdate struct { delay time.Duration podName string podIP string } tests := []struct { name string batchPeriod time.Duration podsCount int updates []podUpdate finalDelay time.Duration wantRequestCount int }{ { name: "three updates with no batching", batchPeriod: 0 * time.Second, podsCount: 10, updates: []podUpdate{ { // endpoints.Run needs ~100 ms to start processing updates. delay: 200 * time.Millisecond, podName: "pod0", podIP: "10.0.0.0", }, { delay: 100 * time.Millisecond, podName: "pod1", podIP: "10.0.0.1", }, { delay: 100 * time.Millisecond, podName: "pod2", podIP: "10.0.0.2", }, }, finalDelay: 3 * time.Second, wantRequestCount: 3, }, { name: "three updates in one batch", batchPeriod: 1 * time.Second, podsCount: 10, updates: []podUpdate{ { // endpoints.Run needs ~100 ms to start processing updates. delay: 200 * time.Millisecond, podName: "pod0", podIP: "10.0.0.0", }, { delay: 100 * time.Millisecond, podName: "pod1", podIP: "10.0.0.1", }, { delay: 100 * time.Millisecond, podName: "pod2", podIP: "10.0.0.2", }, }, finalDelay: 3 * time.Second, wantRequestCount: 1, }, { name: "three updates in two batches", batchPeriod: 1 * time.Second, podsCount: 10, updates: []podUpdate{ { // endpoints.Run needs ~100 ms to start processing updates. delay: 200 * time.Millisecond, podName: "pod0", podIP: "10.0.0.0", }, { delay: 100 * time.Millisecond, podName: "pod1", podIP: "10.0.0.1", }, { delay: 1 * time.Second, podName: "pod2", podIP: "10.0.0.2", }, }, finalDelay: 3 * time.Second, wantRequestCount: 2, }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { ns := metav1.NamespaceDefault client, esController := newController([]string{"node-1"}, tc.batchPeriod) stopCh := make(chan struct{}) defer close(stopCh) go esController.Run(1, stopCh) addPods(t, esController, ns, tc.podsCount) esController.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ Selector: map[string]string{"foo": "bar"}, IPFamilies: []v1.IPFamily{v1.IPv4Protocol}, Ports: []v1.ServicePort{{Port: 80}}, }, }) for _, update := range tc.updates { time.Sleep(update.delay) old, exists, err := esController.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName)) if err != nil { t.Fatalf("Error while retrieving old value of %q: %v", update.podName, err) } if !exists { t.Fatalf("Pod %q doesn't exist", update.podName) } oldPod := old.(*v1.Pod) newPod := oldPod.DeepCopy() newPod.Status.PodIPs[0].IP = update.podIP newPod.ResourceVersion = strconv.Itoa(resourceVersion) resourceVersion++ esController.podStore.Update(newPod) esController.updatePod(oldPod, newPod) } time.Sleep(tc.finalDelay) assert.Len(t, client.Actions(), tc.wantRequestCount) // In case of error, make debugging easier. for _, action := range client.Actions() { t.Logf("action: %v %v", action.GetVerb(), action.GetResource()) } }) } } // TestPodDeleteBatching verifies that endpoint updates caused by pod deletion are batched together. // This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now. // TODO(mborsz): Migrate this test to mock clock when possible. func TestPodDeleteBatching(t *testing.T) { t.Parallel() type podDelete struct { delay time.Duration podName string } tests := []struct { name string batchPeriod time.Duration podsCount int deletes []podDelete finalDelay time.Duration wantRequestCount int }{ { name: "three deletes with no batching", batchPeriod: 0 * time.Second, podsCount: 10, deletes: []podDelete{ { // endpoints.Run needs ~100 ms to start processing updates. delay: 200 * time.Millisecond, podName: "pod0", }, { delay: 100 * time.Millisecond, podName: "pod1", }, { delay: 100 * time.Millisecond, podName: "pod2", }, }, finalDelay: 3 * time.Second, wantRequestCount: 3, }, { name: "three deletes in one batch", batchPeriod: 1 * time.Second, podsCount: 10, deletes: []podDelete{ { // endpoints.Run needs ~100 ms to start processing updates. delay: 200 * time.Millisecond, podName: "pod0", }, { delay: 100 * time.Millisecond, podName: "pod1", }, { delay: 100 * time.Millisecond, podName: "pod2", }, }, finalDelay: 3 * time.Second, wantRequestCount: 1, }, { name: "three deletes in two batches", batchPeriod: 1 * time.Second, podsCount: 10, deletes: []podDelete{ { // endpoints.Run needs ~100 ms to start processing updates. delay: 200 * time.Millisecond, podName: "pod0", }, { delay: 100 * time.Millisecond, podName: "pod1", }, { delay: 1 * time.Second, podName: "pod2", }, }, finalDelay: 3 * time.Second, wantRequestCount: 2, }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { ns := metav1.NamespaceDefault client, esController := newController([]string{"node-1"}, tc.batchPeriod) stopCh := make(chan struct{}) defer close(stopCh) go esController.Run(1, stopCh) addPods(t, esController, ns, tc.podsCount) esController.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ Selector: map[string]string{"foo": "bar"}, IPFamilies: []v1.IPFamily{v1.IPv4Protocol}, Ports: []v1.ServicePort{{Port: 80}}, }, }) for _, update := range tc.deletes { time.Sleep(update.delay) old, exists, err := esController.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName)) assert.Nil(t, err, "error while retrieving old value of %q: %v", update.podName, err) assert.Equal(t, true, exists, "pod should exist") esController.podStore.Delete(old) esController.deletePod(old) } time.Sleep(tc.finalDelay) assert.Len(t, client.Actions(), tc.wantRequestCount) // In case of error, make debugging easier. for _, action := range client.Actions() { t.Logf("action: %v %v", action.GetVerb(), action.GetResource()) } }) } } func TestSyncServiceStaleInformer(t *testing.T) { testcases := []struct { name string informerGenerationNumber int64 trackerGenerationNumber int64 expectError bool }{ { name: "informer cache outdated", informerGenerationNumber: 10, trackerGenerationNumber: 12, expectError: true, }, { name: "cache and tracker synced", informerGenerationNumber: 10, trackerGenerationNumber: 10, expectError: false, }, { name: "tracker outdated", informerGenerationNumber: 10, trackerGenerationNumber: 1, expectError: false, }, } for _, testcase := range testcases { t.Run(testcase.name, func(t *testing.T) { _, esController := newController([]string{"node-1"}, time.Duration(0)) ns := metav1.NamespaceDefault serviceName := "testing-1" // Store Service in the cache esController.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: ns}, Spec: v1.ServiceSpec{ Selector: map[string]string{"foo": "bar"}, Ports: []v1.ServicePort{{TargetPort: intstr.FromInt(80)}}, }, }) // Create EndpointSlice in the informer cache with informerGenerationNumber epSlice1 := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: "matching-1", Namespace: ns, Generation: testcase.informerGenerationNumber, Labels: map[string]string{ discovery.LabelServiceName: serviceName, discovery.LabelManagedBy: controllerName, }, }, AddressType: discovery.AddressTypeIPv4, } err := esController.endpointSliceStore.Add(epSlice1) if err != nil { t.Fatalf("Expected no error adding EndpointSlice: %v", err) } // Create EndpointSlice in the tracker with trackerGenerationNumber epSlice2 := epSlice1.DeepCopy() epSlice2.Generation = testcase.trackerGenerationNumber esController.endpointSliceTracker.Update(epSlice2) err = esController.syncService(fmt.Sprintf("%s/%s", ns, serviceName)) // Check if we got a StaleInformerCache error if endpointsliceutil.IsStaleInformerCacheErr(err) != testcase.expectError { t.Fatalf("Expected error because informer cache is outdated") } }) } } func Test_checkNodeTopologyDistribution(t *testing.T) { zoneA := "zone-a" zoneB := "zone-b" zoneC := "zone-c" readyTrue := true readyFalse := false cpu100 := resource.MustParse("100m") cpu1000 := resource.MustParse("1000m") cpu2000 := resource.MustParse("2000m") type nodeInfo struct { zoneLabel *string ready *bool cpu *resource.Quantity } testCases := []struct { name string nodes []nodeInfo topologyCacheEnabled bool endpointZoneInfo map[string]topologycache.EndpointZoneInfo expectedQueueLen int }{{ name: "empty", nodes: []nodeInfo{}, topologyCacheEnabled: false, endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{}, expectedQueueLen: 0, }, { name: "lopsided, queue required", nodes: []nodeInfo{ {zoneLabel: &zoneA, ready: &readyTrue, cpu: &cpu100}, {zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu1000}, {zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000}, }, topologyCacheEnabled: true, endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{ "ns/svc1": {zoneA: 1, zoneB: 2, zoneC: 3}, }, expectedQueueLen: 1, }, { name: "lopsided but 1 unready, queue required because unready node means 0 CPU in one zone", nodes: []nodeInfo{ {zoneLabel: &zoneA, ready: &readyFalse, cpu: &cpu100}, {zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu1000}, {zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000}, }, topologyCacheEnabled: true, endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{ "ns/svc1": {zoneA: 1, zoneB: 2, zoneC: 3}, }, expectedQueueLen: 1, }, { name: "even zones, uneven endpoint distribution but within threshold, no sync required", nodes: []nodeInfo{ {zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000}, {zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000}, {zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000}, {zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000}, }, topologyCacheEnabled: true, endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{ "ns/svc1": {zoneB: 5, zoneC: 4}, }, expectedQueueLen: 0, }, { name: "even zones but node missing zone, sync required", nodes: []nodeInfo{ {zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000}, {ready: &readyTrue, cpu: &cpu2000}, {zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000}, {zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000}, }, topologyCacheEnabled: true, endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{ "ns/svc1": {zoneB: 5, zoneC: 4}, }, expectedQueueLen: 1, }, { name: "even zones but node missing cpu, sync required", nodes: []nodeInfo{ {zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000}, {zoneLabel: &zoneB, ready: &readyTrue}, {zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000}, {zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000}, }, topologyCacheEnabled: true, endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{ "ns/svc1": {zoneB: 5, zoneC: 4}, }, expectedQueueLen: 1, }, { name: "even zones, uneven endpoint distribution beyond threshold, no sync required", nodes: []nodeInfo{ {zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000}, {zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000}, {zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000}, {zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000}, }, topologyCacheEnabled: true, endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{ "ns/svc1": {zoneB: 6, zoneC: 4}, }, expectedQueueLen: 1, }, { name: "3 uneven zones, matching endpoint distribution, no sync required", nodes: []nodeInfo{ {zoneLabel: &zoneA, ready: &readyTrue, cpu: &cpu2000}, {zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu1000}, {zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu100}, }, topologyCacheEnabled: true, endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{ "ns/svc1": {zoneA: 20, zoneB: 10, zoneC: 1}, }, expectedQueueLen: 0, }, { name: "3 uneven zones, endpoint distribution within threshold but below 1, sync required", nodes: []nodeInfo{ {zoneLabel: &zoneA, ready: &readyTrue, cpu: &cpu2000}, {zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu1000}, {zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu100}, }, topologyCacheEnabled: true, endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{ "ns/svc1": {zoneA: 20, zoneB: 10, zoneC: 0}, }, expectedQueueLen: 1, }} for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { _, esController := newController([]string{}, time.Duration(0)) for i, nodeInfo := range tc.nodes { node := &v1.Node{ ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("node-%d", i)}, Status: v1.NodeStatus{}, } if nodeInfo.zoneLabel != nil { node.Labels = map[string]string{v1.LabelTopologyZone: *nodeInfo.zoneLabel} } if nodeInfo.ready != nil { status := v1.ConditionFalse if *nodeInfo.ready { status = v1.ConditionTrue } node.Status.Conditions = []v1.NodeCondition{{ Type: v1.NodeReady, Status: status, }} } if nodeInfo.cpu != nil { node.Status.Allocatable = v1.ResourceList{ v1.ResourceCPU: *nodeInfo.cpu, } } esController.nodeStore.Add(node) if tc.topologyCacheEnabled { esController.topologyCache = topologycache.NewTopologyCache() for serviceKey, endpointZoneInfo := range tc.endpointZoneInfo { esController.topologyCache.SetHints(serviceKey, discovery.AddressTypeIPv4, endpointZoneInfo) } } } esController.checkNodeTopologyDistribution() if esController.queue.Len() != tc.expectedQueueLen { t.Errorf("Expected %d services to be queued, got %d", tc.expectedQueueLen, esController.queue.Len()) } }) } } // Test helpers func addPods(t *testing.T, esController *endpointSliceController, namespace string, podsCount int) { t.Helper() for i := 0; i < podsCount; i++ { pod := newPod(i, namespace, true, 0, false) esController.podStore.Add(pod) } } func standardSyncService(t *testing.T, esController *endpointSliceController, namespace, serviceName string) { t.Helper() createService(t, esController, namespace, serviceName) err := esController.syncService(fmt.Sprintf("%s/%s", namespace, serviceName)) assert.Nil(t, err, "Expected no error syncing service") } func createService(t *testing.T, esController *endpointSliceController, namespace, serviceName string) *v1.Service { t.Helper() service := &v1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: serviceName, Namespace: namespace, CreationTimestamp: metav1.NewTime(time.Now()), UID: types.UID(namespace + "-" + serviceName), }, Spec: v1.ServiceSpec{ Ports: []v1.ServicePort{{TargetPort: intstr.FromInt(80)}}, Selector: map[string]string{"foo": "bar"}, IPFamilies: []v1.IPFamily{v1.IPv4Protocol}, }, } esController.serviceStore.Add(service) _, err := esController.client.CoreV1().Services(namespace).Create(context.TODO(), service, metav1.CreateOptions{}) assert.Nil(t, err, "Expected no error creating service") return service } func expectAction(t *testing.T, actions []k8stesting.Action, index int, verb, resource string) { t.Helper() if len(actions) <= index { t.Fatalf("Expected at least %d actions, got %d", index+1, len(actions)) } action := actions[index] if action.GetVerb() != verb { t.Errorf("Expected action %d verb to be %s, got %s", index, verb, action.GetVerb()) } if action.GetResource().Resource != resource { t.Errorf("Expected action %d resource to be %s, got %s", index, resource, action.GetResource().Resource) } } // protoPtr takes a Protocol and returns a pointer to it. func protoPtr(proto v1.Protocol) *v1.Protocol { return &proto } // cacheMutationCheck helps ensure that cached objects have not been changed // in any way throughout a test run. type cacheMutationCheck struct { objects []cacheObject } // cacheObject stores a reference to an original object as well as a deep copy // of that object to track any mutations in the original object. type cacheObject struct { original runtime.Object deepCopy runtime.Object } // newCacheMutationCheck initializes a cacheMutationCheck with EndpointSlices. func newCacheMutationCheck(endpointSlices []*discovery.EndpointSlice) cacheMutationCheck { cmc := cacheMutationCheck{} for _, endpointSlice := range endpointSlices { cmc.Add(endpointSlice) } return cmc } // Add appends a runtime.Object and a deep copy of that object into the // cacheMutationCheck. func (cmc *cacheMutationCheck) Add(o runtime.Object) { cmc.objects = append(cmc.objects, cacheObject{ original: o, deepCopy: o.DeepCopyObject(), }) } // Check verifies that no objects in the cacheMutationCheck have been mutated. func (cmc *cacheMutationCheck) Check(t *testing.T) { for _, o := range cmc.objects { if !reflect.DeepEqual(o.original, o.deepCopy) { // Cached objects can't be safely mutated and instead should be deep // copied before changed in any way. t.Errorf("Cached object was unexpectedly mutated. Original: %+v, Mutated: %+v", o.deepCopy, o.original) } } } func Test_dropEndpointSlicesPendingDeletion(t *testing.T) { now := metav1.Now() endpointSlices := []*discovery.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ Name: "epSlice1", DeletionTimestamp: &now, }, }, { ObjectMeta: metav1.ObjectMeta{ Name: "epSlice2", }, AddressType: discovery.AddressTypeIPv4, Endpoints: []discovery.Endpoint{ { Addresses: []string{"172.18.0.2"}, }, }, }, { ObjectMeta: metav1.ObjectMeta{ Name: "epSlice3", }, AddressType: discovery.AddressTypeIPv6, Endpoints: []discovery.Endpoint{ { Addresses: []string{"3001:0da8:75a3:0000:0000:8a2e:0370:7334"}, }, }, }, } epSlice2 := endpointSlices[1] epSlice3 := endpointSlices[2] result := dropEndpointSlicesPendingDeletion(endpointSlices) assert.Len(t, result, 2) for _, epSlice := range result { if epSlice.Name == "epSlice1" { t.Errorf("Expected EndpointSlice marked for deletion to be dropped.") } } // We don't use endpointSlices and instead check manually for equality, because // `dropEndpointSlicesPendingDeletion` mutates the slice it receives, so it's easy // to break this test later. This way, we can be absolutely sure that the result // has exactly what we expect it to. if !reflect.DeepEqual(epSlice2, result[0]) { t.Errorf("EndpointSlice was unexpectedly mutated. Expected: %+v, Mutated: %+v", epSlice2, result[0]) } if !reflect.DeepEqual(epSlice3, result[1]) { t.Errorf("EndpointSlice was unexpectedly mutated. Expected: %+v, Mutated: %+v", epSlice3, result[1]) } }