1703 lines
52 KiB
Go
1703 lines
52 KiB
Go
/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
|
|
|
|
package endpointslice
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"reflect"
|
|
"strconv"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
v1 "k8s.io/api/core/v1"
|
|
discovery "k8s.io/api/discovery/v1"
|
|
"k8s.io/apimachinery/pkg/api/resource"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/apimachinery/pkg/util/intstr"
|
|
"k8s.io/apimachinery/pkg/util/rand"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/client-go/informers"
|
|
"k8s.io/client-go/kubernetes/fake"
|
|
k8stesting "k8s.io/client-go/testing"
|
|
"k8s.io/client-go/tools/cache"
|
|
"k8s.io/kubernetes/pkg/controller"
|
|
"k8s.io/kubernetes/pkg/controller/endpointslice/topologycache"
|
|
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
|
|
endpointsliceutil "k8s.io/kubernetes/pkg/controller/util/endpointslice"
|
|
utilpointer "k8s.io/utils/pointer"
|
|
)
|
|
|
|
// Most of the tests related to EndpointSlice allocation can be found in reconciler_test.go.
// Tests here primarily focus on unique controller functionality before the reconciler begins.
|
|
|
|
// alwaysReady is a stub that unconditionally reports true. newController
// installs it as each of the controller's informer-synced checks.
var alwaysReady = func() bool {
	return true
}
|
|
|
|
type endpointSliceController struct {
|
|
*Controller
|
|
endpointSliceStore cache.Store
|
|
nodeStore cache.Store
|
|
podStore cache.Store
|
|
serviceStore cache.Store
|
|
}
|
|
|
|
func newController(nodeNames []string, batchPeriod time.Duration) (*fake.Clientset, *endpointSliceController) {
|
|
client := fake.NewSimpleClientset()
|
|
|
|
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
|
|
nodeInformer := informerFactory.Core().V1().Nodes()
|
|
indexer := nodeInformer.Informer().GetIndexer()
|
|
for _, nodeName := range nodeNames {
|
|
indexer.Add(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}})
|
|
}
|
|
|
|
esInformer := informerFactory.Discovery().V1().EndpointSlices()
|
|
esIndexer := esInformer.Informer().GetIndexer()
|
|
|
|
// These reactors are required to mock functionality that would be covered
|
|
// automatically if we weren't using the fake client.
|
|
client.PrependReactor("create", "endpointslices", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
|
|
endpointSlice := action.(k8stesting.CreateAction).GetObject().(*discovery.EndpointSlice)
|
|
|
|
if endpointSlice.ObjectMeta.GenerateName != "" {
|
|
endpointSlice.ObjectMeta.Name = fmt.Sprintf("%s-%s", endpointSlice.ObjectMeta.GenerateName, rand.String(8))
|
|
endpointSlice.ObjectMeta.GenerateName = ""
|
|
}
|
|
endpointSlice.Generation = 1
|
|
esIndexer.Add(endpointSlice)
|
|
|
|
return false, endpointSlice, nil
|
|
}))
|
|
client.PrependReactor("update", "endpointslices", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
|
|
endpointSlice := action.(k8stesting.CreateAction).GetObject().(*discovery.EndpointSlice)
|
|
endpointSlice.Generation++
|
|
esIndexer.Update(endpointSlice)
|
|
|
|
return false, endpointSlice, nil
|
|
}))
|
|
|
|
esController := NewController(
|
|
informerFactory.Core().V1().Pods(),
|
|
informerFactory.Core().V1().Services(),
|
|
nodeInformer,
|
|
esInformer,
|
|
int32(100),
|
|
client,
|
|
batchPeriod)
|
|
|
|
esController.nodesSynced = alwaysReady
|
|
esController.podsSynced = alwaysReady
|
|
esController.servicesSynced = alwaysReady
|
|
esController.endpointSlicesSynced = alwaysReady
|
|
|
|
return client, &endpointSliceController{
|
|
esController,
|
|
informerFactory.Discovery().V1().EndpointSlices().Informer().GetStore(),
|
|
informerFactory.Core().V1().Nodes().Informer().GetStore(),
|
|
informerFactory.Core().V1().Pods().Informer().GetStore(),
|
|
informerFactory.Core().V1().Services().Informer().GetStore(),
|
|
}
|
|
}
|
|
|
|
// Ensure SyncService for service with no selector results in no action
|
|
func TestSyncServiceNoSelector(t *testing.T) {
|
|
ns := metav1.NamespaceDefault
|
|
serviceName := "testing-1"
|
|
client, esController := newController([]string{"node-1"}, time.Duration(0))
|
|
esController.serviceStore.Add(&v1.Service{
|
|
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: ns},
|
|
Spec: v1.ServiceSpec{
|
|
Ports: []v1.ServicePort{{TargetPort: intstr.FromInt(80)}},
|
|
},
|
|
})
|
|
|
|
err := esController.syncService(fmt.Sprintf("%s/%s", ns, serviceName))
|
|
assert.NoError(t, err)
|
|
assert.Len(t, client.Actions(), 0)
|
|
}
|
|
|
|
// Ensure SyncService for service with pending deletion results in no action
|
|
func TestSyncServicePendingDeletion(t *testing.T) {
|
|
ns := metav1.NamespaceDefault
|
|
serviceName := "testing-1"
|
|
deletionTimestamp := metav1.Now()
|
|
client, esController := newController([]string{"node-1"}, time.Duration(0))
|
|
esController.serviceStore.Add(&v1.Service{
|
|
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: ns, DeletionTimestamp: &deletionTimestamp},
|
|
Spec: v1.ServiceSpec{
|
|
Selector: map[string]string{"foo": "bar"},
|
|
Ports: []v1.ServicePort{{TargetPort: intstr.FromInt(80)}},
|
|
},
|
|
})
|
|
|
|
err := esController.syncService(fmt.Sprintf("%s/%s", ns, serviceName))
|
|
assert.NoError(t, err)
|
|
assert.Len(t, client.Actions(), 0)
|
|
}
|
|
|
|
// Ensure SyncService for service with selector but no pods results in placeholder EndpointSlice
|
|
func TestSyncServiceWithSelector(t *testing.T) {
|
|
ns := metav1.NamespaceDefault
|
|
serviceName := "testing-1"
|
|
client, esController := newController([]string{"node-1"}, time.Duration(0))
|
|
standardSyncService(t, esController, ns, serviceName)
|
|
expectActions(t, client.Actions(), 1, "create", "endpointslices")
|
|
|
|
sliceList, err := client.DiscoveryV1().EndpointSlices(ns).List(context.TODO(), metav1.ListOptions{})
|
|
assert.Nil(t, err, "Expected no error fetching endpoint slices")
|
|
assert.Len(t, sliceList.Items, 1, "Expected 1 endpoint slices")
|
|
slice := sliceList.Items[0]
|
|
assert.Regexp(t, "^"+serviceName, slice.Name)
|
|
assert.Equal(t, serviceName, slice.Labels[discovery.LabelServiceName])
|
|
assert.EqualValues(t, []discovery.EndpointPort{}, slice.Ports)
|
|
assert.EqualValues(t, []discovery.Endpoint{}, slice.Endpoints)
|
|
assert.NotEmpty(t, slice.Annotations["endpoints.kubernetes.io/last-change-trigger-time"])
|
|
}
|
|
|
|
// Ensure SyncService gracefully handles a missing service. This test also
|
|
// populates another existing service to ensure a clean up process doesn't
|
|
// remove too much.
|
|
func TestSyncServiceMissing(t *testing.T) {
|
|
namespace := metav1.NamespaceDefault
|
|
client, esController := newController([]string{"node-1"}, time.Duration(0))
|
|
|
|
// Build up existing service
|
|
existingServiceName := "stillthere"
|
|
existingServiceKey := endpointutil.ServiceKey{Name: existingServiceName, Namespace: namespace}
|
|
esController.triggerTimeTracker.ServiceStates[existingServiceKey] = endpointutil.ServiceState{}
|
|
esController.serviceStore.Add(&v1.Service{
|
|
ObjectMeta: metav1.ObjectMeta{Name: existingServiceName, Namespace: namespace},
|
|
Spec: v1.ServiceSpec{
|
|
Ports: []v1.ServicePort{{TargetPort: intstr.FromInt(80)}},
|
|
Selector: map[string]string{"foo": "bar"},
|
|
},
|
|
})
|
|
|
|
// Add missing service to triggerTimeTracker to ensure the reference is cleaned up
|
|
missingServiceName := "notthere"
|
|
missingServiceKey := endpointutil.ServiceKey{Name: missingServiceName, Namespace: namespace}
|
|
esController.triggerTimeTracker.ServiceStates[missingServiceKey] = endpointutil.ServiceState{}
|
|
|
|
err := esController.syncService(fmt.Sprintf("%s/%s", namespace, missingServiceName))
|
|
|
|
// nil should be returned when the service doesn't exist
|
|
assert.Nil(t, err, "Expected no error syncing service")
|
|
|
|
// That should mean no client actions were performed
|
|
assert.Len(t, client.Actions(), 0)
|
|
|
|
// TriggerTimeTracker should have removed the reference to the missing service
|
|
assert.NotContains(t, esController.triggerTimeTracker.ServiceStates, missingServiceKey)
|
|
|
|
// TriggerTimeTracker should have left the reference to the missing service
|
|
assert.Contains(t, esController.triggerTimeTracker.ServiceStates, existingServiceKey)
|
|
}
|
|
|
|
// Ensure SyncService correctly selects Pods.
|
|
func TestSyncServicePodSelection(t *testing.T) {
|
|
client, esController := newController([]string{"node-1"}, time.Duration(0))
|
|
ns := metav1.NamespaceDefault
|
|
|
|
pod1 := newPod(1, ns, true, 0, false)
|
|
esController.podStore.Add(pod1)
|
|
|
|
// ensure this pod will not match the selector
|
|
pod2 := newPod(2, ns, true, 0, false)
|
|
pod2.Labels["foo"] = "boo"
|
|
esController.podStore.Add(pod2)
|
|
|
|
standardSyncService(t, esController, ns, "testing-1")
|
|
expectActions(t, client.Actions(), 1, "create", "endpointslices")
|
|
|
|
// an endpoint slice should be created, it should only reference pod1 (not pod2)
|
|
slices, err := client.DiscoveryV1().EndpointSlices(ns).List(context.TODO(), metav1.ListOptions{})
|
|
assert.Nil(t, err, "Expected no error fetching endpoint slices")
|
|
assert.Len(t, slices.Items, 1, "Expected 1 endpoint slices")
|
|
slice := slices.Items[0]
|
|
assert.Len(t, slice.Endpoints, 1, "Expected 1 endpoint in first slice")
|
|
assert.NotEmpty(t, slice.Annotations[v1.EndpointsLastChangeTriggerTime])
|
|
endpoint := slice.Endpoints[0]
|
|
assert.EqualValues(t, endpoint.TargetRef, &v1.ObjectReference{Kind: "Pod", Namespace: ns, Name: pod1.Name})
|
|
}
|
|
|
|
func TestSyncServiceEndpointSlicePendingDeletion(t *testing.T) {
|
|
client, esController := newController([]string{"node-1"}, time.Duration(0))
|
|
ns := metav1.NamespaceDefault
|
|
serviceName := "testing-1"
|
|
service := createService(t, esController, ns, serviceName)
|
|
err := esController.syncService(fmt.Sprintf("%s/%s", ns, serviceName))
|
|
assert.Nil(t, err, "Expected no error syncing service")
|
|
|
|
gvk := schema.GroupVersionKind{Version: "v1", Kind: "Service"}
|
|
ownerRef := metav1.NewControllerRef(service, gvk)
|
|
|
|
deletedTs := metav1.Now()
|
|
endpointSlice := &discovery.EndpointSlice{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "epSlice-1",
|
|
Namespace: ns,
|
|
OwnerReferences: []metav1.OwnerReference{*ownerRef},
|
|
Labels: map[string]string{
|
|
discovery.LabelServiceName: serviceName,
|
|
discovery.LabelManagedBy: controllerName,
|
|
},
|
|
DeletionTimestamp: &deletedTs,
|
|
},
|
|
AddressType: discovery.AddressTypeIPv4,
|
|
}
|
|
err = esController.endpointSliceStore.Add(endpointSlice)
|
|
if err != nil {
|
|
t.Fatalf("Expected no error adding EndpointSlice: %v", err)
|
|
}
|
|
_, err = client.DiscoveryV1().EndpointSlices(ns).Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
|
|
if err != nil {
|
|
t.Fatalf("Expected no error creating EndpointSlice: %v", err)
|
|
}
|
|
|
|
numActionsBefore := len(client.Actions())
|
|
err = esController.syncService(fmt.Sprintf("%s/%s", ns, serviceName))
|
|
assert.Nil(t, err, "Expected no error syncing service")
|
|
|
|
// The EndpointSlice marked for deletion should be ignored by the controller, and thus
|
|
// should not result in any action.
|
|
if len(client.Actions()) != numActionsBefore {
|
|
t.Errorf("Expected 0 more actions, got %d", len(client.Actions())-numActionsBefore)
|
|
}
|
|
}
|
|
|
|
// Ensure SyncService correctly selects and labels EndpointSlices.
|
|
func TestSyncServiceEndpointSliceLabelSelection(t *testing.T) {
|
|
client, esController := newController([]string{"node-1"}, time.Duration(0))
|
|
ns := metav1.NamespaceDefault
|
|
serviceName := "testing-1"
|
|
service := createService(t, esController, ns, serviceName)
|
|
|
|
gvk := schema.GroupVersionKind{Version: "v1", Kind: "Service"}
|
|
ownerRef := metav1.NewControllerRef(service, gvk)
|
|
|
|
// 5 slices, 3 with matching labels for our service
|
|
endpointSlices := []*discovery.EndpointSlice{{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "matching-1",
|
|
Namespace: ns,
|
|
OwnerReferences: []metav1.OwnerReference{*ownerRef},
|
|
Labels: map[string]string{
|
|
discovery.LabelServiceName: serviceName,
|
|
discovery.LabelManagedBy: controllerName,
|
|
},
|
|
},
|
|
AddressType: discovery.AddressTypeIPv4,
|
|
}, {
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "matching-2",
|
|
Namespace: ns,
|
|
OwnerReferences: []metav1.OwnerReference{*ownerRef},
|
|
Labels: map[string]string{
|
|
discovery.LabelServiceName: serviceName,
|
|
discovery.LabelManagedBy: controllerName,
|
|
},
|
|
},
|
|
AddressType: discovery.AddressTypeIPv4,
|
|
}, {
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "partially-matching-1",
|
|
Namespace: ns,
|
|
Labels: map[string]string{
|
|
discovery.LabelServiceName: serviceName,
|
|
},
|
|
},
|
|
AddressType: discovery.AddressTypeIPv4,
|
|
}, {
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "not-matching-1",
|
|
Namespace: ns,
|
|
Labels: map[string]string{
|
|
discovery.LabelServiceName: "something-else",
|
|
discovery.LabelManagedBy: controllerName,
|
|
},
|
|
},
|
|
AddressType: discovery.AddressTypeIPv4,
|
|
}, {
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "not-matching-2",
|
|
Namespace: ns,
|
|
Labels: map[string]string{
|
|
discovery.LabelServiceName: serviceName,
|
|
discovery.LabelManagedBy: "something-else",
|
|
},
|
|
},
|
|
AddressType: discovery.AddressTypeIPv4,
|
|
}}
|
|
|
|
cmc := newCacheMutationCheck(endpointSlices)
|
|
|
|
// need to add them to both store and fake clientset
|
|
for _, endpointSlice := range endpointSlices {
|
|
err := esController.endpointSliceStore.Add(endpointSlice)
|
|
if err != nil {
|
|
t.Fatalf("Expected no error adding EndpointSlice: %v", err)
|
|
}
|
|
_, err = client.DiscoveryV1().EndpointSlices(ns).Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
|
|
if err != nil {
|
|
t.Fatalf("Expected no error creating EndpointSlice: %v", err)
|
|
}
|
|
}
|
|
|
|
numActionsBefore := len(client.Actions())
|
|
err := esController.syncService(fmt.Sprintf("%s/%s", ns, serviceName))
|
|
assert.Nil(t, err, "Expected no error syncing service")
|
|
|
|
if len(client.Actions()) != numActionsBefore+2 {
|
|
t.Errorf("Expected 2 more actions, got %d", len(client.Actions())-numActionsBefore)
|
|
}
|
|
|
|
// only 2 slices should match, 2 should be deleted, 1 should be updated as a placeholder
|
|
expectAction(t, client.Actions(), numActionsBefore, "update", "endpointslices")
|
|
expectAction(t, client.Actions(), numActionsBefore+1, "delete", "endpointslices")
|
|
|
|
// ensure cache mutation has not occurred
|
|
cmc.Check(t)
|
|
}
|
|
|
|
func TestOnEndpointSliceUpdate(t *testing.T) {
|
|
_, esController := newController([]string{"node-1"}, time.Duration(0))
|
|
ns := metav1.NamespaceDefault
|
|
serviceName := "testing-1"
|
|
epSlice1 := &discovery.EndpointSlice{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "matching-1",
|
|
Namespace: ns,
|
|
Labels: map[string]string{
|
|
discovery.LabelServiceName: serviceName,
|
|
discovery.LabelManagedBy: controllerName,
|
|
},
|
|
},
|
|
AddressType: discovery.AddressTypeIPv4,
|
|
}
|
|
|
|
epSlice2 := epSlice1.DeepCopy()
|
|
epSlice2.Labels[discovery.LabelManagedBy] = "something else"
|
|
|
|
assert.Equal(t, 0, esController.queue.Len())
|
|
esController.onEndpointSliceUpdate(epSlice1, epSlice2)
|
|
err := wait.PollImmediate(100*time.Millisecond, 3*time.Second, func() (bool, error) {
|
|
if esController.queue.Len() > 0 {
|
|
return true, nil
|
|
}
|
|
return false, nil
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("unexpected error waiting for add to queue")
|
|
}
|
|
assert.Equal(t, 1, esController.queue.Len())
|
|
}
|
|
|
|
func TestSyncService(t *testing.T) {
|
|
creationTimestamp := metav1.Now()
|
|
deletionTimestamp := metav1.Now()
|
|
|
|
testcases := []struct {
|
|
name string
|
|
service *v1.Service
|
|
pods []*v1.Pod
|
|
expectedEndpointPorts []discovery.EndpointPort
|
|
expectedEndpoints []discovery.Endpoint
|
|
}{
|
|
{
|
|
name: "pods with multiple IPs and Service with ipFamilies=ipv4",
|
|
service: &v1.Service{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foobar",
|
|
Namespace: "default",
|
|
CreationTimestamp: creationTimestamp,
|
|
},
|
|
Spec: v1.ServiceSpec{
|
|
Ports: []v1.ServicePort{
|
|
{Name: "tcp-example", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP},
|
|
{Name: "udp-example", TargetPort: intstr.FromInt(161), Protocol: v1.ProtocolUDP},
|
|
{Name: "sctp-example", TargetPort: intstr.FromInt(3456), Protocol: v1.ProtocolSCTP},
|
|
},
|
|
Selector: map[string]string{"foo": "bar"},
|
|
IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
|
|
},
|
|
},
|
|
pods: []*v1.Pod{
|
|
{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Namespace: "default",
|
|
Name: "pod0",
|
|
Labels: map[string]string{"foo": "bar"},
|
|
DeletionTimestamp: nil,
|
|
},
|
|
Spec: v1.PodSpec{
|
|
Containers: []v1.Container{{
|
|
Name: "container-1",
|
|
}},
|
|
NodeName: "node-1",
|
|
},
|
|
Status: v1.PodStatus{
|
|
PodIP: "10.0.0.1",
|
|
PodIPs: []v1.PodIP{{
|
|
IP: "10.0.0.1",
|
|
}},
|
|
Conditions: []v1.PodCondition{
|
|
{
|
|
Type: v1.PodReady,
|
|
Status: v1.ConditionTrue,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Namespace: "default",
|
|
Name: "pod1",
|
|
Labels: map[string]string{"foo": "bar"},
|
|
DeletionTimestamp: nil,
|
|
},
|
|
Spec: v1.PodSpec{
|
|
Containers: []v1.Container{{
|
|
Name: "container-1",
|
|
}},
|
|
NodeName: "node-1",
|
|
},
|
|
Status: v1.PodStatus{
|
|
PodIP: "10.0.0.2",
|
|
PodIPs: []v1.PodIP{
|
|
{
|
|
IP: "10.0.0.2",
|
|
},
|
|
{
|
|
IP: "fd08::5678:0000:0000:9abc:def0",
|
|
},
|
|
},
|
|
Conditions: []v1.PodCondition{
|
|
{
|
|
Type: v1.PodReady,
|
|
Status: v1.ConditionTrue,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
expectedEndpointPorts: []discovery.EndpointPort{
|
|
{
|
|
Name: utilpointer.StringPtr("sctp-example"),
|
|
Protocol: protoPtr(v1.ProtocolSCTP),
|
|
Port: utilpointer.Int32Ptr(int32(3456)),
|
|
},
|
|
{
|
|
Name: utilpointer.StringPtr("udp-example"),
|
|
Protocol: protoPtr(v1.ProtocolUDP),
|
|
Port: utilpointer.Int32Ptr(int32(161)),
|
|
},
|
|
{
|
|
Name: utilpointer.StringPtr("tcp-example"),
|
|
Protocol: protoPtr(v1.ProtocolTCP),
|
|
Port: utilpointer.Int32Ptr(int32(80)),
|
|
},
|
|
},
|
|
expectedEndpoints: []discovery.Endpoint{
|
|
{
|
|
Conditions: discovery.EndpointConditions{
|
|
Ready: utilpointer.BoolPtr(true),
|
|
Serving: utilpointer.BoolPtr(true),
|
|
Terminating: utilpointer.BoolPtr(false),
|
|
},
|
|
Addresses: []string{"10.0.0.1"},
|
|
TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0"},
|
|
NodeName: utilpointer.StringPtr("node-1"),
|
|
},
|
|
{
|
|
Conditions: discovery.EndpointConditions{
|
|
Ready: utilpointer.BoolPtr(true),
|
|
Serving: utilpointer.BoolPtr(true),
|
|
Terminating: utilpointer.BoolPtr(false),
|
|
},
|
|
Addresses: []string{"10.0.0.2"},
|
|
TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod1"},
|
|
NodeName: utilpointer.StringPtr("node-1"),
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "pods with multiple IPs and Service with ipFamilies=ipv6",
|
|
service: &v1.Service{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foobar",
|
|
Namespace: "default",
|
|
CreationTimestamp: creationTimestamp,
|
|
},
|
|
Spec: v1.ServiceSpec{
|
|
Ports: []v1.ServicePort{
|
|
{Name: "tcp-example", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP},
|
|
{Name: "udp-example", TargetPort: intstr.FromInt(161), Protocol: v1.ProtocolUDP},
|
|
{Name: "sctp-example", TargetPort: intstr.FromInt(3456), Protocol: v1.ProtocolSCTP},
|
|
},
|
|
Selector: map[string]string{"foo": "bar"},
|
|
IPFamilies: []v1.IPFamily{v1.IPv6Protocol},
|
|
},
|
|
},
|
|
pods: []*v1.Pod{
|
|
{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Namespace: "default",
|
|
Name: "pod0",
|
|
Labels: map[string]string{"foo": "bar"},
|
|
DeletionTimestamp: nil,
|
|
},
|
|
Spec: v1.PodSpec{
|
|
Containers: []v1.Container{{
|
|
Name: "container-1",
|
|
}},
|
|
NodeName: "node-1",
|
|
},
|
|
Status: v1.PodStatus{
|
|
PodIP: "10.0.0.1",
|
|
PodIPs: []v1.PodIP{{
|
|
IP: "10.0.0.1",
|
|
}},
|
|
Conditions: []v1.PodCondition{
|
|
{
|
|
Type: v1.PodReady,
|
|
Status: v1.ConditionTrue,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Namespace: "default",
|
|
Name: "pod1",
|
|
Labels: map[string]string{"foo": "bar"},
|
|
DeletionTimestamp: nil,
|
|
},
|
|
Spec: v1.PodSpec{
|
|
Containers: []v1.Container{{
|
|
Name: "container-1",
|
|
}},
|
|
NodeName: "node-1",
|
|
},
|
|
Status: v1.PodStatus{
|
|
PodIP: "10.0.0.2",
|
|
PodIPs: []v1.PodIP{
|
|
{
|
|
IP: "10.0.0.2",
|
|
},
|
|
{
|
|
IP: "fd08::5678:0000:0000:9abc:def0",
|
|
},
|
|
},
|
|
Conditions: []v1.PodCondition{
|
|
{
|
|
Type: v1.PodReady,
|
|
Status: v1.ConditionTrue,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
expectedEndpointPorts: []discovery.EndpointPort{
|
|
{
|
|
Name: utilpointer.StringPtr("sctp-example"),
|
|
Protocol: protoPtr(v1.ProtocolSCTP),
|
|
Port: utilpointer.Int32Ptr(int32(3456)),
|
|
},
|
|
{
|
|
Name: utilpointer.StringPtr("udp-example"),
|
|
Protocol: protoPtr(v1.ProtocolUDP),
|
|
Port: utilpointer.Int32Ptr(int32(161)),
|
|
},
|
|
{
|
|
Name: utilpointer.StringPtr("tcp-example"),
|
|
Protocol: protoPtr(v1.ProtocolTCP),
|
|
Port: utilpointer.Int32Ptr(int32(80)),
|
|
},
|
|
},
|
|
expectedEndpoints: []discovery.Endpoint{
|
|
{
|
|
Conditions: discovery.EndpointConditions{
|
|
Ready: utilpointer.BoolPtr(true),
|
|
Serving: utilpointer.BoolPtr(true),
|
|
Terminating: utilpointer.BoolPtr(false),
|
|
},
|
|
Addresses: []string{"fd08::5678:0000:0000:9abc:def0"},
|
|
TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod1"},
|
|
NodeName: utilpointer.StringPtr("node-1"),
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "Terminating pods",
|
|
service: &v1.Service{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foobar",
|
|
Namespace: "default",
|
|
CreationTimestamp: creationTimestamp,
|
|
},
|
|
Spec: v1.ServiceSpec{
|
|
Ports: []v1.ServicePort{
|
|
{Name: "tcp-example", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP},
|
|
{Name: "udp-example", TargetPort: intstr.FromInt(161), Protocol: v1.ProtocolUDP},
|
|
{Name: "sctp-example", TargetPort: intstr.FromInt(3456), Protocol: v1.ProtocolSCTP},
|
|
},
|
|
Selector: map[string]string{"foo": "bar"},
|
|
IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
|
|
},
|
|
},
|
|
pods: []*v1.Pod{
|
|
{
|
|
// one ready pod for comparison
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Namespace: "default",
|
|
Name: "pod0",
|
|
Labels: map[string]string{"foo": "bar"},
|
|
DeletionTimestamp: nil,
|
|
},
|
|
Spec: v1.PodSpec{
|
|
Containers: []v1.Container{{
|
|
Name: "container-1",
|
|
}},
|
|
NodeName: "node-1",
|
|
},
|
|
Status: v1.PodStatus{
|
|
PodIP: "10.0.0.1",
|
|
PodIPs: []v1.PodIP{{
|
|
IP: "10.0.0.1",
|
|
}},
|
|
Conditions: []v1.PodCondition{
|
|
{
|
|
Type: v1.PodReady,
|
|
Status: v1.ConditionTrue,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Namespace: "default",
|
|
Name: "pod1",
|
|
Labels: map[string]string{"foo": "bar"},
|
|
DeletionTimestamp: &deletionTimestamp,
|
|
},
|
|
Spec: v1.PodSpec{
|
|
Containers: []v1.Container{{
|
|
Name: "container-1",
|
|
}},
|
|
NodeName: "node-1",
|
|
},
|
|
Status: v1.PodStatus{
|
|
PodIP: "10.0.0.2",
|
|
PodIPs: []v1.PodIP{
|
|
{
|
|
IP: "10.0.0.2",
|
|
},
|
|
},
|
|
Conditions: []v1.PodCondition{
|
|
{
|
|
Type: v1.PodReady,
|
|
Status: v1.ConditionTrue,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
expectedEndpointPorts: []discovery.EndpointPort{
|
|
{
|
|
Name: utilpointer.StringPtr("sctp-example"),
|
|
Protocol: protoPtr(v1.ProtocolSCTP),
|
|
Port: utilpointer.Int32Ptr(int32(3456)),
|
|
},
|
|
{
|
|
Name: utilpointer.StringPtr("udp-example"),
|
|
Protocol: protoPtr(v1.ProtocolUDP),
|
|
Port: utilpointer.Int32Ptr(int32(161)),
|
|
},
|
|
{
|
|
Name: utilpointer.StringPtr("tcp-example"),
|
|
Protocol: protoPtr(v1.ProtocolTCP),
|
|
Port: utilpointer.Int32Ptr(int32(80)),
|
|
},
|
|
},
|
|
expectedEndpoints: []discovery.Endpoint{
|
|
{
|
|
Conditions: discovery.EndpointConditions{
|
|
Ready: utilpointer.BoolPtr(true),
|
|
Serving: utilpointer.BoolPtr(true),
|
|
Terminating: utilpointer.BoolPtr(false),
|
|
},
|
|
Addresses: []string{"10.0.0.1"},
|
|
TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0"},
|
|
NodeName: utilpointer.StringPtr("node-1"),
|
|
},
|
|
{
|
|
Conditions: discovery.EndpointConditions{
|
|
Ready: utilpointer.BoolPtr(false),
|
|
Serving: utilpointer.BoolPtr(true),
|
|
Terminating: utilpointer.BoolPtr(true),
|
|
},
|
|
Addresses: []string{"10.0.0.2"},
|
|
TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod1"},
|
|
NodeName: utilpointer.StringPtr("node-1"),
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "Not ready terminating pods",
|
|
service: &v1.Service{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "foobar",
|
|
Namespace: "default",
|
|
CreationTimestamp: creationTimestamp,
|
|
},
|
|
Spec: v1.ServiceSpec{
|
|
Ports: []v1.ServicePort{
|
|
{Name: "tcp-example", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP},
|
|
{Name: "udp-example", TargetPort: intstr.FromInt(161), Protocol: v1.ProtocolUDP},
|
|
{Name: "sctp-example", TargetPort: intstr.FromInt(3456), Protocol: v1.ProtocolSCTP},
|
|
},
|
|
Selector: map[string]string{"foo": "bar"},
|
|
IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
|
|
},
|
|
},
|
|
pods: []*v1.Pod{
|
|
{
|
|
// one ready pod for comparison
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Namespace: "default",
|
|
Name: "pod0",
|
|
Labels: map[string]string{"foo": "bar"},
|
|
DeletionTimestamp: nil,
|
|
},
|
|
Spec: v1.PodSpec{
|
|
Containers: []v1.Container{{
|
|
Name: "container-1",
|
|
}},
|
|
NodeName: "node-1",
|
|
},
|
|
Status: v1.PodStatus{
|
|
PodIP: "10.0.0.1",
|
|
PodIPs: []v1.PodIP{{
|
|
IP: "10.0.0.1",
|
|
}},
|
|
Conditions: []v1.PodCondition{
|
|
{
|
|
Type: v1.PodReady,
|
|
Status: v1.ConditionTrue,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Namespace: "default",
|
|
Name: "pod1",
|
|
Labels: map[string]string{"foo": "bar"},
|
|
DeletionTimestamp: &deletionTimestamp,
|
|
},
|
|
Spec: v1.PodSpec{
|
|
Containers: []v1.Container{{
|
|
Name: "container-1",
|
|
}},
|
|
NodeName: "node-1",
|
|
},
|
|
Status: v1.PodStatus{
|
|
PodIP: "10.0.0.2",
|
|
PodIPs: []v1.PodIP{
|
|
{
|
|
IP: "10.0.0.2",
|
|
},
|
|
},
|
|
Conditions: []v1.PodCondition{
|
|
{
|
|
Type: v1.PodReady,
|
|
Status: v1.ConditionFalse,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
expectedEndpointPorts: []discovery.EndpointPort{
|
|
{
|
|
Name: utilpointer.StringPtr("sctp-example"),
|
|
Protocol: protoPtr(v1.ProtocolSCTP),
|
|
Port: utilpointer.Int32Ptr(int32(3456)),
|
|
},
|
|
{
|
|
Name: utilpointer.StringPtr("udp-example"),
|
|
Protocol: protoPtr(v1.ProtocolUDP),
|
|
Port: utilpointer.Int32Ptr(int32(161)),
|
|
},
|
|
{
|
|
Name: utilpointer.StringPtr("tcp-example"),
|
|
Protocol: protoPtr(v1.ProtocolTCP),
|
|
Port: utilpointer.Int32Ptr(int32(80)),
|
|
},
|
|
},
|
|
expectedEndpoints: []discovery.Endpoint{
|
|
{
|
|
Conditions: discovery.EndpointConditions{
|
|
Ready: utilpointer.BoolPtr(true),
|
|
Serving: utilpointer.BoolPtr(true),
|
|
Terminating: utilpointer.BoolPtr(false),
|
|
},
|
|
Addresses: []string{"10.0.0.1"},
|
|
TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0"},
|
|
NodeName: utilpointer.StringPtr("node-1"),
|
|
},
|
|
{
|
|
Conditions: discovery.EndpointConditions{
|
|
Ready: utilpointer.BoolPtr(false),
|
|
Serving: utilpointer.BoolPtr(false),
|
|
Terminating: utilpointer.BoolPtr(true),
|
|
},
|
|
Addresses: []string{"10.0.0.2"},
|
|
TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod1"},
|
|
NodeName: utilpointer.StringPtr("node-1"),
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, testcase := range testcases {
|
|
t.Run(testcase.name, func(t *testing.T) {
|
|
client, esController := newController([]string{"node-1"}, time.Duration(0))
|
|
|
|
for _, pod := range testcase.pods {
|
|
esController.podStore.Add(pod)
|
|
}
|
|
esController.serviceStore.Add(testcase.service)
|
|
|
|
_, err := esController.client.CoreV1().Services(testcase.service.Namespace).Create(context.TODO(), testcase.service, metav1.CreateOptions{})
|
|
assert.Nil(t, err, "Expected no error creating service")
|
|
|
|
err = esController.syncService(fmt.Sprintf("%s/%s", testcase.service.Namespace, testcase.service.Name))
|
|
assert.Nil(t, err)
|
|
|
|
// last action should be to create endpoint slice
|
|
expectActions(t, client.Actions(), 1, "create", "endpointslices")
|
|
sliceList, err := client.DiscoveryV1().EndpointSlices(testcase.service.Namespace).List(context.TODO(), metav1.ListOptions{})
|
|
assert.Nil(t, err, "Expected no error fetching endpoint slices")
|
|
assert.Len(t, sliceList.Items, 1, "Expected 1 endpoint slices")
|
|
|
|
// ensure all attributes of endpoint slice match expected state
|
|
slice := sliceList.Items[0]
|
|
assert.Equal(t, slice.Annotations[v1.EndpointsLastChangeTriggerTime], creationTimestamp.UTC().Format(time.RFC3339Nano))
|
|
assert.ElementsMatch(t, testcase.expectedEndpointPorts, slice.Ports)
|
|
assert.ElementsMatch(t, testcase.expectedEndpoints, slice.Endpoints)
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestPodAddsBatching verifies that endpoint updates caused by pod addition are batched together.
|
|
// This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
|
|
// TODO(mborsz): Migrate this test to mock clock when possible.
|
|
func TestPodAddsBatching(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
type podAdd struct {
|
|
delay time.Duration
|
|
}
|
|
|
|
tests := []struct {
|
|
name string
|
|
batchPeriod time.Duration
|
|
adds []podAdd
|
|
finalDelay time.Duration
|
|
wantRequestCount int
|
|
}{
|
|
{
|
|
name: "three adds with no batching",
|
|
batchPeriod: 0 * time.Second,
|
|
adds: []podAdd{
|
|
{
|
|
// endpoints.Run needs ~100 ms to start processing updates.
|
|
delay: 200 * time.Millisecond,
|
|
},
|
|
{
|
|
delay: 100 * time.Millisecond,
|
|
},
|
|
{
|
|
delay: 100 * time.Millisecond,
|
|
},
|
|
},
|
|
finalDelay: 3 * time.Second,
|
|
wantRequestCount: 3,
|
|
},
|
|
{
|
|
name: "three adds in one batch",
|
|
batchPeriod: 1 * time.Second,
|
|
adds: []podAdd{
|
|
{
|
|
// endpoints.Run needs ~100 ms to start processing updates.
|
|
delay: 200 * time.Millisecond,
|
|
},
|
|
{
|
|
delay: 100 * time.Millisecond,
|
|
},
|
|
{
|
|
delay: 100 * time.Millisecond,
|
|
},
|
|
},
|
|
finalDelay: 3 * time.Second,
|
|
wantRequestCount: 1,
|
|
},
|
|
{
|
|
name: "three adds in two batches",
|
|
batchPeriod: 1 * time.Second,
|
|
adds: []podAdd{
|
|
{
|
|
// endpoints.Run needs ~100 ms to start processing updates.
|
|
delay: 200 * time.Millisecond,
|
|
},
|
|
{
|
|
delay: 100 * time.Millisecond,
|
|
},
|
|
{
|
|
delay: 1 * time.Second,
|
|
},
|
|
},
|
|
finalDelay: 3 * time.Second,
|
|
wantRequestCount: 2,
|
|
},
|
|
}
|
|
|
|
for _, tc := range tests {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
ns := metav1.NamespaceDefault
|
|
client, esController := newController([]string{"node-1"}, tc.batchPeriod)
|
|
stopCh := make(chan struct{})
|
|
defer close(stopCh)
|
|
|
|
go esController.Run(1, stopCh)
|
|
|
|
esController.serviceStore.Add(&v1.Service{
|
|
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
|
|
Spec: v1.ServiceSpec{
|
|
Selector: map[string]string{"foo": "bar"},
|
|
IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
|
|
Ports: []v1.ServicePort{{Port: 80}},
|
|
},
|
|
})
|
|
|
|
for i, add := range tc.adds {
|
|
time.Sleep(add.delay)
|
|
|
|
p := newPod(i, ns, true, 0, false)
|
|
esController.podStore.Add(p)
|
|
esController.addPod(p)
|
|
}
|
|
|
|
time.Sleep(tc.finalDelay)
|
|
assert.Len(t, client.Actions(), tc.wantRequestCount)
|
|
// In case of error, make debugging easier.
|
|
for _, action := range client.Actions() {
|
|
t.Logf("action: %v %v", action.GetVerb(), action.GetResource())
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestPodUpdatesBatching verifies that endpoint updates caused by pod updates are batched together.
|
|
// This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
|
|
// TODO(mborsz): Migrate this test to mock clock when possible.
|
|
func TestPodUpdatesBatching(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
resourceVersion := 1
|
|
type podUpdate struct {
|
|
delay time.Duration
|
|
podName string
|
|
podIP string
|
|
}
|
|
|
|
tests := []struct {
|
|
name string
|
|
batchPeriod time.Duration
|
|
podsCount int
|
|
updates []podUpdate
|
|
finalDelay time.Duration
|
|
wantRequestCount int
|
|
}{
|
|
{
|
|
name: "three updates with no batching",
|
|
batchPeriod: 0 * time.Second,
|
|
podsCount: 10,
|
|
updates: []podUpdate{
|
|
{
|
|
// endpoints.Run needs ~100 ms to start processing updates.
|
|
delay: 200 * time.Millisecond,
|
|
podName: "pod0",
|
|
podIP: "10.0.0.0",
|
|
},
|
|
{
|
|
delay: 100 * time.Millisecond,
|
|
podName: "pod1",
|
|
podIP: "10.0.0.1",
|
|
},
|
|
{
|
|
delay: 100 * time.Millisecond,
|
|
podName: "pod2",
|
|
podIP: "10.0.0.2",
|
|
},
|
|
},
|
|
finalDelay: 3 * time.Second,
|
|
wantRequestCount: 3,
|
|
},
|
|
{
|
|
name: "three updates in one batch",
|
|
batchPeriod: 1 * time.Second,
|
|
podsCount: 10,
|
|
updates: []podUpdate{
|
|
{
|
|
// endpoints.Run needs ~100 ms to start processing updates.
|
|
delay: 200 * time.Millisecond,
|
|
podName: "pod0",
|
|
podIP: "10.0.0.0",
|
|
},
|
|
{
|
|
delay: 100 * time.Millisecond,
|
|
podName: "pod1",
|
|
podIP: "10.0.0.1",
|
|
},
|
|
{
|
|
delay: 100 * time.Millisecond,
|
|
podName: "pod2",
|
|
podIP: "10.0.0.2",
|
|
},
|
|
},
|
|
finalDelay: 3 * time.Second,
|
|
wantRequestCount: 1,
|
|
},
|
|
{
|
|
name: "three updates in two batches",
|
|
batchPeriod: 1 * time.Second,
|
|
podsCount: 10,
|
|
updates: []podUpdate{
|
|
{
|
|
// endpoints.Run needs ~100 ms to start processing updates.
|
|
delay: 200 * time.Millisecond,
|
|
podName: "pod0",
|
|
podIP: "10.0.0.0",
|
|
},
|
|
{
|
|
delay: 100 * time.Millisecond,
|
|
podName: "pod1",
|
|
podIP: "10.0.0.1",
|
|
},
|
|
{
|
|
delay: 1 * time.Second,
|
|
podName: "pod2",
|
|
podIP: "10.0.0.2",
|
|
},
|
|
},
|
|
finalDelay: 3 * time.Second,
|
|
wantRequestCount: 2,
|
|
},
|
|
}
|
|
|
|
for _, tc := range tests {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
ns := metav1.NamespaceDefault
|
|
client, esController := newController([]string{"node-1"}, tc.batchPeriod)
|
|
stopCh := make(chan struct{})
|
|
defer close(stopCh)
|
|
|
|
go esController.Run(1, stopCh)
|
|
|
|
addPods(t, esController, ns, tc.podsCount)
|
|
|
|
esController.serviceStore.Add(&v1.Service{
|
|
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
|
|
Spec: v1.ServiceSpec{
|
|
Selector: map[string]string{"foo": "bar"},
|
|
IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
|
|
Ports: []v1.ServicePort{{Port: 80}},
|
|
},
|
|
})
|
|
|
|
for _, update := range tc.updates {
|
|
time.Sleep(update.delay)
|
|
|
|
old, exists, err := esController.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName))
|
|
if err != nil {
|
|
t.Fatalf("Error while retrieving old value of %q: %v", update.podName, err)
|
|
}
|
|
if !exists {
|
|
t.Fatalf("Pod %q doesn't exist", update.podName)
|
|
}
|
|
oldPod := old.(*v1.Pod)
|
|
newPod := oldPod.DeepCopy()
|
|
newPod.Status.PodIPs[0].IP = update.podIP
|
|
newPod.ResourceVersion = strconv.Itoa(resourceVersion)
|
|
resourceVersion++
|
|
|
|
esController.podStore.Update(newPod)
|
|
esController.updatePod(oldPod, newPod)
|
|
}
|
|
|
|
time.Sleep(tc.finalDelay)
|
|
assert.Len(t, client.Actions(), tc.wantRequestCount)
|
|
// In case of error, make debugging easier.
|
|
for _, action := range client.Actions() {
|
|
t.Logf("action: %v %v", action.GetVerb(), action.GetResource())
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestPodDeleteBatching verifies that endpoint updates caused by pod deletion are batched together.
|
|
// This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
|
|
// TODO(mborsz): Migrate this test to mock clock when possible.
|
|
func TestPodDeleteBatching(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
type podDelete struct {
|
|
delay time.Duration
|
|
podName string
|
|
}
|
|
|
|
tests := []struct {
|
|
name string
|
|
batchPeriod time.Duration
|
|
podsCount int
|
|
deletes []podDelete
|
|
finalDelay time.Duration
|
|
wantRequestCount int
|
|
}{
|
|
{
|
|
name: "three deletes with no batching",
|
|
batchPeriod: 0 * time.Second,
|
|
podsCount: 10,
|
|
deletes: []podDelete{
|
|
{
|
|
// endpoints.Run needs ~100 ms to start processing updates.
|
|
delay: 200 * time.Millisecond,
|
|
podName: "pod0",
|
|
},
|
|
{
|
|
delay: 100 * time.Millisecond,
|
|
podName: "pod1",
|
|
},
|
|
{
|
|
delay: 100 * time.Millisecond,
|
|
podName: "pod2",
|
|
},
|
|
},
|
|
finalDelay: 3 * time.Second,
|
|
wantRequestCount: 3,
|
|
},
|
|
{
|
|
name: "three deletes in one batch",
|
|
batchPeriod: 1 * time.Second,
|
|
podsCount: 10,
|
|
deletes: []podDelete{
|
|
{
|
|
// endpoints.Run needs ~100 ms to start processing updates.
|
|
delay: 200 * time.Millisecond,
|
|
podName: "pod0",
|
|
},
|
|
{
|
|
delay: 100 * time.Millisecond,
|
|
podName: "pod1",
|
|
},
|
|
{
|
|
delay: 100 * time.Millisecond,
|
|
podName: "pod2",
|
|
},
|
|
},
|
|
finalDelay: 3 * time.Second,
|
|
wantRequestCount: 1,
|
|
},
|
|
{
|
|
name: "three deletes in two batches",
|
|
batchPeriod: 1 * time.Second,
|
|
podsCount: 10,
|
|
deletes: []podDelete{
|
|
{
|
|
// endpoints.Run needs ~100 ms to start processing updates.
|
|
delay: 200 * time.Millisecond,
|
|
podName: "pod0",
|
|
},
|
|
{
|
|
delay: 100 * time.Millisecond,
|
|
podName: "pod1",
|
|
},
|
|
{
|
|
delay: 1 * time.Second,
|
|
podName: "pod2",
|
|
},
|
|
},
|
|
finalDelay: 3 * time.Second,
|
|
wantRequestCount: 2,
|
|
},
|
|
}
|
|
|
|
for _, tc := range tests {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
ns := metav1.NamespaceDefault
|
|
client, esController := newController([]string{"node-1"}, tc.batchPeriod)
|
|
stopCh := make(chan struct{})
|
|
defer close(stopCh)
|
|
|
|
go esController.Run(1, stopCh)
|
|
|
|
addPods(t, esController, ns, tc.podsCount)
|
|
|
|
esController.serviceStore.Add(&v1.Service{
|
|
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
|
|
Spec: v1.ServiceSpec{
|
|
Selector: map[string]string{"foo": "bar"},
|
|
IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
|
|
Ports: []v1.ServicePort{{Port: 80}},
|
|
},
|
|
})
|
|
|
|
for _, update := range tc.deletes {
|
|
time.Sleep(update.delay)
|
|
|
|
old, exists, err := esController.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName))
|
|
assert.Nil(t, err, "error while retrieving old value of %q: %v", update.podName, err)
|
|
assert.Equal(t, true, exists, "pod should exist")
|
|
esController.podStore.Delete(old)
|
|
esController.deletePod(old)
|
|
}
|
|
|
|
time.Sleep(tc.finalDelay)
|
|
assert.Len(t, client.Actions(), tc.wantRequestCount)
|
|
// In case of error, make debugging easier.
|
|
for _, action := range client.Actions() {
|
|
t.Logf("action: %v %v", action.GetVerb(), action.GetResource())
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestSyncServiceStaleInformer(t *testing.T) {
|
|
testcases := []struct {
|
|
name string
|
|
informerGenerationNumber int64
|
|
trackerGenerationNumber int64
|
|
expectError bool
|
|
}{
|
|
{
|
|
name: "informer cache outdated",
|
|
informerGenerationNumber: 10,
|
|
trackerGenerationNumber: 12,
|
|
expectError: true,
|
|
},
|
|
{
|
|
name: "cache and tracker synced",
|
|
informerGenerationNumber: 10,
|
|
trackerGenerationNumber: 10,
|
|
expectError: false,
|
|
},
|
|
{
|
|
name: "tracker outdated",
|
|
informerGenerationNumber: 10,
|
|
trackerGenerationNumber: 1,
|
|
expectError: false,
|
|
},
|
|
}
|
|
|
|
for _, testcase := range testcases {
|
|
t.Run(testcase.name, func(t *testing.T) {
|
|
_, esController := newController([]string{"node-1"}, time.Duration(0))
|
|
ns := metav1.NamespaceDefault
|
|
serviceName := "testing-1"
|
|
|
|
// Store Service in the cache
|
|
esController.serviceStore.Add(&v1.Service{
|
|
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: ns},
|
|
Spec: v1.ServiceSpec{
|
|
Selector: map[string]string{"foo": "bar"},
|
|
Ports: []v1.ServicePort{{TargetPort: intstr.FromInt(80)}},
|
|
},
|
|
})
|
|
|
|
// Create EndpointSlice in the informer cache with informerGenerationNumber
|
|
epSlice1 := &discovery.EndpointSlice{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "matching-1",
|
|
Namespace: ns,
|
|
Generation: testcase.informerGenerationNumber,
|
|
Labels: map[string]string{
|
|
discovery.LabelServiceName: serviceName,
|
|
discovery.LabelManagedBy: controllerName,
|
|
},
|
|
},
|
|
AddressType: discovery.AddressTypeIPv4,
|
|
}
|
|
err := esController.endpointSliceStore.Add(epSlice1)
|
|
if err != nil {
|
|
t.Fatalf("Expected no error adding EndpointSlice: %v", err)
|
|
}
|
|
|
|
// Create EndpointSlice in the tracker with trackerGenerationNumber
|
|
epSlice2 := epSlice1.DeepCopy()
|
|
epSlice2.Generation = testcase.trackerGenerationNumber
|
|
esController.endpointSliceTracker.Update(epSlice2)
|
|
|
|
err = esController.syncService(fmt.Sprintf("%s/%s", ns, serviceName))
|
|
// Check if we got a StaleInformerCache error
|
|
if endpointsliceutil.IsStaleInformerCacheErr(err) != testcase.expectError {
|
|
t.Fatalf("Expected error because informer cache is outdated")
|
|
}
|
|
|
|
})
|
|
}
|
|
}
|
|
|
|
func Test_checkNodeTopologyDistribution(t *testing.T) {
|
|
zoneA := "zone-a"
|
|
zoneB := "zone-b"
|
|
zoneC := "zone-c"
|
|
|
|
readyTrue := true
|
|
readyFalse := false
|
|
|
|
cpu100 := resource.MustParse("100m")
|
|
cpu1000 := resource.MustParse("1000m")
|
|
cpu2000 := resource.MustParse("2000m")
|
|
|
|
type nodeInfo struct {
|
|
zoneLabel *string
|
|
ready *bool
|
|
cpu *resource.Quantity
|
|
}
|
|
|
|
testCases := []struct {
|
|
name string
|
|
nodes []nodeInfo
|
|
topologyCacheEnabled bool
|
|
endpointZoneInfo map[string]topologycache.EndpointZoneInfo
|
|
expectedQueueLen int
|
|
}{{
|
|
name: "empty",
|
|
nodes: []nodeInfo{},
|
|
topologyCacheEnabled: false,
|
|
endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{},
|
|
expectedQueueLen: 0,
|
|
}, {
|
|
name: "lopsided, queue required",
|
|
nodes: []nodeInfo{
|
|
{zoneLabel: &zoneA, ready: &readyTrue, cpu: &cpu100},
|
|
{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu1000},
|
|
{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
|
|
},
|
|
topologyCacheEnabled: true,
|
|
endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
|
|
"ns/svc1": {zoneA: 1, zoneB: 2, zoneC: 3},
|
|
},
|
|
expectedQueueLen: 1,
|
|
}, {
|
|
name: "lopsided but 1 unready, queue required because unready node means 0 CPU in one zone",
|
|
nodes: []nodeInfo{
|
|
{zoneLabel: &zoneA, ready: &readyFalse, cpu: &cpu100},
|
|
{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu1000},
|
|
{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
|
|
},
|
|
topologyCacheEnabled: true,
|
|
endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
|
|
"ns/svc1": {zoneA: 1, zoneB: 2, zoneC: 3},
|
|
},
|
|
expectedQueueLen: 1,
|
|
}, {
|
|
name: "even zones, uneven endpoint distribution but within threshold, no sync required",
|
|
nodes: []nodeInfo{
|
|
{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000},
|
|
{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000},
|
|
{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
|
|
{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
|
|
},
|
|
topologyCacheEnabled: true,
|
|
endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
|
|
"ns/svc1": {zoneB: 5, zoneC: 4},
|
|
},
|
|
expectedQueueLen: 0,
|
|
}, {
|
|
name: "even zones but node missing zone, sync required",
|
|
nodes: []nodeInfo{
|
|
{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000},
|
|
{ready: &readyTrue, cpu: &cpu2000},
|
|
{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
|
|
{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
|
|
},
|
|
topologyCacheEnabled: true,
|
|
endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
|
|
"ns/svc1": {zoneB: 5, zoneC: 4},
|
|
},
|
|
expectedQueueLen: 1,
|
|
}, {
|
|
name: "even zones but node missing cpu, sync required",
|
|
nodes: []nodeInfo{
|
|
{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000},
|
|
{zoneLabel: &zoneB, ready: &readyTrue},
|
|
{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
|
|
{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
|
|
},
|
|
topologyCacheEnabled: true,
|
|
endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
|
|
"ns/svc1": {zoneB: 5, zoneC: 4},
|
|
},
|
|
expectedQueueLen: 1,
|
|
}, {
|
|
name: "even zones, uneven endpoint distribution beyond threshold, no sync required",
|
|
nodes: []nodeInfo{
|
|
{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000},
|
|
{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000},
|
|
{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
|
|
{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
|
|
},
|
|
topologyCacheEnabled: true,
|
|
endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
|
|
"ns/svc1": {zoneB: 6, zoneC: 4},
|
|
},
|
|
expectedQueueLen: 1,
|
|
}, {
|
|
name: "3 uneven zones, matching endpoint distribution, no sync required",
|
|
nodes: []nodeInfo{
|
|
{zoneLabel: &zoneA, ready: &readyTrue, cpu: &cpu2000},
|
|
{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu1000},
|
|
{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu100},
|
|
},
|
|
topologyCacheEnabled: true,
|
|
endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
|
|
"ns/svc1": {zoneA: 20, zoneB: 10, zoneC: 1},
|
|
},
|
|
expectedQueueLen: 0,
|
|
}, {
|
|
name: "3 uneven zones, endpoint distribution within threshold but below 1, sync required",
|
|
nodes: []nodeInfo{
|
|
{zoneLabel: &zoneA, ready: &readyTrue, cpu: &cpu2000},
|
|
{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu1000},
|
|
{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu100},
|
|
},
|
|
topologyCacheEnabled: true,
|
|
endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
|
|
"ns/svc1": {zoneA: 20, zoneB: 10, zoneC: 0},
|
|
},
|
|
expectedQueueLen: 1,
|
|
}}
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
_, esController := newController([]string{}, time.Duration(0))
|
|
|
|
for i, nodeInfo := range tc.nodes {
|
|
node := &v1.Node{
|
|
ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("node-%d", i)},
|
|
Status: v1.NodeStatus{},
|
|
}
|
|
if nodeInfo.zoneLabel != nil {
|
|
node.Labels = map[string]string{v1.LabelTopologyZone: *nodeInfo.zoneLabel}
|
|
}
|
|
if nodeInfo.ready != nil {
|
|
status := v1.ConditionFalse
|
|
if *nodeInfo.ready {
|
|
status = v1.ConditionTrue
|
|
}
|
|
node.Status.Conditions = []v1.NodeCondition{{
|
|
Type: v1.NodeReady,
|
|
Status: status,
|
|
}}
|
|
}
|
|
if nodeInfo.cpu != nil {
|
|
node.Status.Allocatable = v1.ResourceList{
|
|
v1.ResourceCPU: *nodeInfo.cpu,
|
|
}
|
|
}
|
|
esController.nodeStore.Add(node)
|
|
if tc.topologyCacheEnabled {
|
|
esController.topologyCache = topologycache.NewTopologyCache()
|
|
for serviceKey, endpointZoneInfo := range tc.endpointZoneInfo {
|
|
esController.topologyCache.SetHints(serviceKey, discovery.AddressTypeIPv4, endpointZoneInfo)
|
|
}
|
|
}
|
|
}
|
|
|
|
esController.checkNodeTopologyDistribution()
|
|
|
|
if esController.queue.Len() != tc.expectedQueueLen {
|
|
t.Errorf("Expected %d services to be queued, got %d", tc.expectedQueueLen, esController.queue.Len())
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// Test helpers
|
|
func addPods(t *testing.T, esController *endpointSliceController, namespace string, podsCount int) {
|
|
t.Helper()
|
|
for i := 0; i < podsCount; i++ {
|
|
pod := newPod(i, namespace, true, 0, false)
|
|
esController.podStore.Add(pod)
|
|
}
|
|
}
|
|
|
|
func standardSyncService(t *testing.T, esController *endpointSliceController, namespace, serviceName string) {
|
|
t.Helper()
|
|
createService(t, esController, namespace, serviceName)
|
|
|
|
err := esController.syncService(fmt.Sprintf("%s/%s", namespace, serviceName))
|
|
assert.Nil(t, err, "Expected no error syncing service")
|
|
}
|
|
|
|
func createService(t *testing.T, esController *endpointSliceController, namespace, serviceName string) *v1.Service {
|
|
t.Helper()
|
|
service := &v1.Service{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: serviceName,
|
|
Namespace: namespace,
|
|
CreationTimestamp: metav1.NewTime(time.Now()),
|
|
UID: types.UID(namespace + "-" + serviceName),
|
|
},
|
|
Spec: v1.ServiceSpec{
|
|
Ports: []v1.ServicePort{{TargetPort: intstr.FromInt(80)}},
|
|
Selector: map[string]string{"foo": "bar"},
|
|
IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
|
|
},
|
|
}
|
|
esController.serviceStore.Add(service)
|
|
_, err := esController.client.CoreV1().Services(namespace).Create(context.TODO(), service, metav1.CreateOptions{})
|
|
assert.Nil(t, err, "Expected no error creating service")
|
|
return service
|
|
}
|
|
|
|
func expectAction(t *testing.T, actions []k8stesting.Action, index int, verb, resource string) {
|
|
t.Helper()
|
|
if len(actions) <= index {
|
|
t.Fatalf("Expected at least %d actions, got %d", index+1, len(actions))
|
|
}
|
|
|
|
action := actions[index]
|
|
if action.GetVerb() != verb {
|
|
t.Errorf("Expected action %d verb to be %s, got %s", index, verb, action.GetVerb())
|
|
}
|
|
|
|
if action.GetResource().Resource != resource {
|
|
t.Errorf("Expected action %d resource to be %s, got %s", index, resource, action.GetResource().Resource)
|
|
}
|
|
}
|
|
|
|
// protoPtr takes a Protocol and returns a pointer to it.
|
|
func protoPtr(proto v1.Protocol) *v1.Protocol {
|
|
return &proto
|
|
}
|
|
|
|
// cacheMutationCheck helps ensure that cached objects have not been changed
|
|
// in any way throughout a test run.
|
|
type cacheMutationCheck struct {
|
|
objects []cacheObject
|
|
}
|
|
|
|
// cacheObject stores a reference to an original object as well as a deep copy
|
|
// of that object to track any mutations in the original object.
|
|
type cacheObject struct {
|
|
original runtime.Object
|
|
deepCopy runtime.Object
|
|
}
|
|
|
|
// newCacheMutationCheck initializes a cacheMutationCheck with EndpointSlices.
|
|
func newCacheMutationCheck(endpointSlices []*discovery.EndpointSlice) cacheMutationCheck {
|
|
cmc := cacheMutationCheck{}
|
|
for _, endpointSlice := range endpointSlices {
|
|
cmc.Add(endpointSlice)
|
|
}
|
|
return cmc
|
|
}
|
|
|
|
// Add appends a runtime.Object and a deep copy of that object into the
|
|
// cacheMutationCheck.
|
|
func (cmc *cacheMutationCheck) Add(o runtime.Object) {
|
|
cmc.objects = append(cmc.objects, cacheObject{
|
|
original: o,
|
|
deepCopy: o.DeepCopyObject(),
|
|
})
|
|
}
|
|
|
|
// Check verifies that no objects in the cacheMutationCheck have been mutated.
|
|
func (cmc *cacheMutationCheck) Check(t *testing.T) {
|
|
for _, o := range cmc.objects {
|
|
if !reflect.DeepEqual(o.original, o.deepCopy) {
|
|
// Cached objects can't be safely mutated and instead should be deep
|
|
// copied before changed in any way.
|
|
t.Errorf("Cached object was unexpectedly mutated. Original: %+v, Mutated: %+v", o.deepCopy, o.original)
|
|
}
|
|
}
|
|
}
|
|
|
|
func Test_dropEndpointSlicesPendingDeletion(t *testing.T) {
|
|
now := metav1.Now()
|
|
endpointSlices := []*discovery.EndpointSlice{
|
|
{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "epSlice1",
|
|
DeletionTimestamp: &now,
|
|
},
|
|
},
|
|
{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "epSlice2",
|
|
},
|
|
AddressType: discovery.AddressTypeIPv4,
|
|
Endpoints: []discovery.Endpoint{
|
|
{
|
|
Addresses: []string{"172.18.0.2"},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "epSlice3",
|
|
},
|
|
AddressType: discovery.AddressTypeIPv6,
|
|
Endpoints: []discovery.Endpoint{
|
|
{
|
|
Addresses: []string{"3001:0da8:75a3:0000:0000:8a2e:0370:7334"},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
epSlice2 := endpointSlices[1]
|
|
epSlice3 := endpointSlices[2]
|
|
|
|
result := dropEndpointSlicesPendingDeletion(endpointSlices)
|
|
|
|
assert.Len(t, result, 2)
|
|
for _, epSlice := range result {
|
|
if epSlice.Name == "epSlice1" {
|
|
t.Errorf("Expected EndpointSlice marked for deletion to be dropped.")
|
|
}
|
|
}
|
|
|
|
// We don't use endpointSlices and instead check manually for equality, because
|
|
// `dropEndpointSlicesPendingDeletion` mutates the slice it receives, so it's easy
|
|
// to break this test later. This way, we can be absolutely sure that the result
|
|
// has exactly what we expect it to.
|
|
if !reflect.DeepEqual(epSlice2, result[0]) {
|
|
t.Errorf("EndpointSlice was unexpectedly mutated. Expected: %+v, Mutated: %+v", epSlice2, result[0])
|
|
}
|
|
if !reflect.DeepEqual(epSlice3, result[1]) {
|
|
t.Errorf("EndpointSlice was unexpectedly mutated. Expected: %+v, Mutated: %+v", epSlice3, result[1])
|
|
}
|
|
}
|