Splitting IP address type into IPv4 and IPv6 for EndpointSlices
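The gist of the change: instead of tagging every slice with the deprecated catch-all IP address type, the controller now derives a concrete IPv4 or IPv6 address type from Service.Spec.IPFamily and filters pod addresses accordingly. A minimal standalone sketch of that mapping (the types here are local stand-ins for corev1.IPFamily and discovery.AddressType, not the in-tree definitions):

```go
package main

import "fmt"

// Local stand-ins for corev1.IPFamily and discovery.AddressType so this
// sketch runs outside the Kubernetes tree.
type IPFamily string
type AddressType string

const (
	IPv6Protocol    IPFamily    = "IPv6"
	AddressTypeIPv4 AddressType = "IPv4"
	AddressTypeIPv6 AddressType = "IPv6"
)

// addressTypeForService mirrors the selection logic this commit adds to
// reconcile(): default to IPv4, use IPv6 only when the Service asks for it.
func addressTypeForService(ipFamily *IPFamily) AddressType {
	if ipFamily != nil && *ipFamily == IPv6Protocol {
		return AddressTypeIPv6
	}
	return AddressTypeIPv4
}

func main() {
	v6 := IPv6Protocol
	fmt.Println(addressTypeForService(nil)) // IPv4: an unset family defaults to IPv4
	fmt.Println(addressTypeForService(&v6)) // IPv6
}
```

Defaulting to IPv4 when IPFamily is unset matches the reconciler diff below, where addressType starts as AddressTypeIPv4.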
@@ -41,6 +41,7 @@ go_library(
         "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
         "//staging/src/k8s.io/component-base/metrics/prometheus/ratelimiter:go_default_library",
         "//vendor/k8s.io/klog:go_default_library",
+        "//vendor/k8s.io/utils/net:go_default_library",
     ],
 )
@@ -198,6 +198,7 @@ func TestSyncServiceEndpointSliceLabelSelection(t *testing.T) {
             discovery.LabelManagedBy:   controllerName,
         },
     },
+    AddressType: discovery.AddressTypeIPv4,
 }, {
     ObjectMeta: metav1.ObjectMeta{
         Name: "matching-2",
@@ -207,6 +208,7 @@ func TestSyncServiceEndpointSliceLabelSelection(t *testing.T) {
             discovery.LabelManagedBy:   controllerName,
         },
     },
+    AddressType: discovery.AddressTypeIPv4,
 }, {
     ObjectMeta: metav1.ObjectMeta{
         Name: "partially-matching-1",
@@ -215,6 +217,7 @@ func TestSyncServiceEndpointSliceLabelSelection(t *testing.T) {
             discovery.LabelServiceName: serviceName,
         },
     },
+    AddressType: discovery.AddressTypeIPv4,
 }, {
     ObjectMeta: metav1.ObjectMeta{
         Name: "not-matching-1",
@@ -224,6 +227,7 @@ func TestSyncServiceEndpointSliceLabelSelection(t *testing.T) {
             discovery.LabelManagedBy:   controllerName,
         },
     },
+    AddressType: discovery.AddressTypeIPv4,
 }, {
     ObjectMeta: metav1.ObjectMeta{
         Name: "not-matching-2",
@@ -233,6 +237,7 @@ func TestSyncServiceEndpointSliceLabelSelection(t *testing.T) {
             discovery.LabelManagedBy:   "something-else",
         },
     },
+    AddressType: discovery.AddressTypeIPv4,
 }}

 // need to add them to both store and fake clientset
@@ -272,16 +277,13 @@ func TestSyncServiceFull(t *testing.T) {
     client, esController := newController([]string{"node-1"})
     namespace := metav1.NamespaceDefault
     serviceName := "all-the-protocols"
+    ipv6Family := v1.IPv6Protocol

     // pod 1 only uses PodIP status attr
     pod1 := newPod(1, namespace, true, 0)
     pod1.Status.PodIP = "1.2.3.4"
-    pod1.Status.PodIPs = []v1.PodIP{}
+    pod1.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.4"}}
     esController.podStore.Add(pod1)

     // pod 2 only uses PodIPs status attr
     pod2 := newPod(2, namespace, true, 0)
     pod2.Status.PodIP = ""
     pod2.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.5"}, {IP: "1234::5678:0000:0000:9abc:def0"}}
     esController.podStore.Add(pod2)

@@ -300,6 +302,7 @@ func TestSyncServiceFull(t *testing.T) {
             {Name: "sctp-example", TargetPort: intstr.FromInt(3456), Protocol: v1.ProtocolSCTP},
         },
         Selector: map[string]string{"foo": "bar"},
+        IPFamily: &ipv6Family,
     },
 }
 esController.serviceStore.Add(service)
@@ -318,7 +321,7 @@ func TestSyncServiceFull(t *testing.T) {

     // ensure all attributes of endpoint slice match expected state
     slice := sliceList.Items[0]
-    assert.Len(t, slice.Endpoints, 2, "Expected 2 endpoints in first slice")
+    assert.Len(t, slice.Endpoints, 1, "Expected 1 endpoints in first slice")
     assert.Equal(t, slice.Annotations["endpoints.kubernetes.io/last-change-trigger-time"], serviceCreateTime.Format(time.RFC3339Nano))
     assert.ElementsMatch(t, []discovery.EndpointPort{{
         Name: strPtr("tcp-example"),
@@ -336,12 +339,7 @@ func TestSyncServiceFull(t *testing.T) {

     assert.ElementsMatch(t, []discovery.Endpoint{{
-        Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
-        Addresses:  []string{"1.2.3.4"},
-        TargetRef:  &v1.ObjectReference{Kind: "Pod", Namespace: namespace, Name: pod1.Name},
-        Topology:   map[string]string{"kubernetes.io/hostname": "node-1"},
-    }, {
         Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
-        Addresses:  []string{"1.2.3.5", "1234::5678:0000:0000:9abc:def0"},
+        Addresses:  []string{"1234::5678:0000:0000:9abc:def0"},
         TargetRef:  &v1.ObjectReference{Kind: "Pod", Namespace: namespace, Name: pod2.Name},
         Topology:   map[string]string{"kubernetes.io/hostname": "node-1"},
     }}, slice.Endpoints)

@@ -47,7 +47,7 @@ type reconciler struct {
 // that logic in reconciler
 type endpointMeta struct {
     Ports       []discovery.EndpointPort `json:"ports" protobuf:"bytes,2,rep,name=ports"`
-    AddressType *discovery.AddressType   `json:"addressType" protobuf:"bytes,3,rep,name=addressType"`
+    AddressType discovery.AddressType    `json:"addressType" protobuf:"bytes,3,rep,name=addressType"`
 }

 // reconcile takes a set of pods currently matching a service selector and
@@ -55,13 +55,26 @@ type endpointMeta struct {
 // slices for the given service. It creates, updates, or deletes endpoint slices
 // to ensure the desired set of pods are represented by endpoint slices.
 func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time) error {
+    addressType := discovery.AddressTypeIPv4
+    if service.Spec.IPFamily != nil && *service.Spec.IPFamily == corev1.IPv6Protocol {
+        addressType = discovery.AddressTypeIPv6
+    }
+
+    slicesToCreate := []*discovery.EndpointSlice{}
+    slicesToUpdate := []*discovery.EndpointSlice{}
+    slicesToDelete := []*discovery.EndpointSlice{}
+
     // Build data structures for existing state.
     existingSlicesByPortMap := map[endpointutil.PortMapKey][]*discovery.EndpointSlice{}
     numExistingEndpoints := 0
     for _, existingSlice := range existingSlices {
-        epHash := endpointutil.NewPortMapKey(existingSlice.Ports)
-        existingSlicesByPortMap[epHash] = append(existingSlicesByPortMap[epHash], existingSlice)
-        numExistingEndpoints += len(existingSlice.Endpoints)
+        if existingSlice.AddressType == addressType {
+            epHash := endpointutil.NewPortMapKey(existingSlice.Ports)
+            existingSlicesByPortMap[epHash] = append(existingSlicesByPortMap[epHash], existingSlice)
+            numExistingEndpoints += len(existingSlice.Endpoints)
+        } else {
+            slicesToDelete = append(slicesToDelete, existingSlice)
+        }
     }

     // Build data structures for desired state.
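To make the partitioning above concrete, here is a small self-contained sketch of what the new loop does; epSlice is a hypothetical stand-in for discovery.EndpointSlice, not the real type:

```go
package main

import "fmt"

// epSlice is a hypothetical stand-in for discovery.EndpointSlice.
type epSlice struct {
	Name        string
	AddressType string
}

// partitionByAddressType mirrors the new loop in reconcile(): slices already
// matching the Service's address type remain managed; everything else,
// including slices with the deprecated "IP" type, is queued for deletion.
func partitionByAddressType(existing []epSlice, want string) (keep, toDelete []epSlice) {
	for _, s := range existing {
		if s.AddressType == want {
			keep = append(keep, s)
		} else {
			toDelete = append(toDelete, s)
		}
	}
	return keep, toDelete
}

func main() {
	existing := []epSlice{
		{Name: "svc-abc", AddressType: "IPv4"},
		{Name: "svc-def", AddressType: "IP"}, // deprecated type: replaced, not reused
	}
	keep, toDelete := partitionByAddressType(existing, "IPv4")
	fmt.Println(len(keep), len(toDelete)) // 1 1
}
```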
@@ -78,10 +91,8 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis
         }

         if _, ok := desiredMetaByPortMap[epHash]; !ok {
-            // TODO: Support multiple backend types
-            ipAddressType := discovery.AddressTypeIP
             desiredMetaByPortMap[epHash] = &endpointMeta{
-                AddressType: &ipAddressType,
+                AddressType: addressType,
                 Ports:       endpointPorts,
             }
         }
@@ -90,16 +101,14 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis
         if err != nil {
             return err
         }

         endpoint := podToEndpoint(pod, node, service)
-        desiredEndpointsByPortMap[epHash].Insert(&endpoint)
-        numDesiredEndpoints++
+        if len(endpoint.Addresses) > 0 {
+            desiredEndpointsByPortMap[epHash].Insert(&endpoint)
+            numDesiredEndpoints++
+        }
     }
 }

-slicesToCreate := []*discovery.EndpointSlice{}
-slicesToUpdate := []*discovery.EndpointSlice{}
-sliceNamesToDelete := sets.String{}
 spMetrics := metrics.NewServicePortCache()
 totalAdded := 0
 totalRemoved := 0
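The guard added above means an endpoint whose filtered address list came back empty (for example, an IPv4-only pod behind an IPv6 Service) is simply not inserted, so it no longer counts toward numDesiredEndpoints. A tiny sketch, with an illustrative endpoint type rather than the real discovery.Endpoint:

```go
package main

import "fmt"

// endpoint is an illustrative type, not the real discovery.Endpoint.
type endpoint struct {
	Addresses []string
}

func main() {
	candidates := []endpoint{
		{Addresses: []string{"1.2.3.4"}}, // matches the Service's family
		{Addresses: nil},                 // fully filtered out, e.g. IPv4-only pod, IPv6 Service
	}
	desired := []endpoint{}
	for _, ep := range candidates {
		if len(ep.Addresses) > 0 { // the guard added in the hunk above
			desired = append(desired, ep)
		}
	}
	fmt.Println(len(desired)) // 1: the address-less endpoint is not published
}
```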
@@ -107,7 +116,7 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis
     // Determine changes necessary for each group of slices by port map.
     for portMap, desiredEndpoints := range desiredEndpointsByPortMap {
         numEndpoints := len(desiredEndpoints)
-        pmSlicesToCreate, pmSlicesToUpdate, pmSliceNamesToDelete, added, removed := r.reconcileByPortMapping(
+        pmSlicesToCreate, pmSlicesToUpdate, pmSlicesToDelete, added, removed := r.reconcileByPortMapping(
             service, existingSlicesByPortMap[portMap], desiredEndpoints, desiredMetaByPortMap[portMap])

         totalAdded += added
@@ -115,7 +124,7 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis

         spMetrics.Set(portMap, metrics.EfficiencyInfo{
             Endpoints: numEndpoints,
-            Slices:    len(existingSlicesByPortMap[portMap]) + len(pmSlicesToCreate) - len(pmSliceNamesToDelete),
+            Slices:    len(existingSlicesByPortMap[portMap]) + len(pmSlicesToCreate) - len(pmSlicesToDelete),
         })

         if len(pmSlicesToCreate) > 0 {
@@ -124,8 +133,8 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis
         if len(pmSlicesToUpdate) > 0 {
             slicesToUpdate = append(slicesToUpdate, pmSlicesToUpdate...)
         }
-        if pmSliceNamesToDelete.Len() > 0 {
-            sliceNamesToDelete = sliceNamesToDelete.Union(pmSliceNamesToDelete)
+        if len(pmSlicesToDelete) > 0 {
+            slicesToDelete = append(slicesToDelete, pmSlicesToDelete...)
         }
     }

@@ -134,14 +143,14 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis
     for portMap, existingSlices := range existingSlicesByPortMap {
         if _, ok := desiredEndpointsByPortMap[portMap]; !ok {
             for _, existingSlice := range existingSlices {
-                sliceNamesToDelete.Insert(existingSlice.Name)
+                slicesToDelete = append(slicesToDelete, existingSlice)
             }
         }
     }

     // When no endpoint slices would usually exist, we need to add a placeholder.
-    if len(existingSlices) == sliceNamesToDelete.Len() && len(slicesToCreate) < 1 {
-        placeholderSlice := newEndpointSlice(service, &endpointMeta{Ports: []discovery.EndpointPort{}})
+    if len(existingSlices) == len(slicesToDelete) && len(slicesToCreate) < 1 {
+        placeholderSlice := newEndpointSlice(service, &endpointMeta{Ports: []discovery.EndpointPort{}, AddressType: addressType})
         slicesToCreate = append(slicesToCreate, placeholderSlice)
         spMetrics.Set(endpointutil.NewPortMapKey(placeholderSlice.Ports), metrics.EfficiencyInfo{
             Endpoints: 0,
@@ -155,27 +164,41 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis
     serviceNN := types.NamespacedName{Name: service.Name, Namespace: service.Namespace}
     r.metricsCache.UpdateServicePortCache(serviceNN, spMetrics)

-    return r.finalize(service, slicesToCreate, slicesToUpdate, sliceNamesToDelete, triggerTime)
+    return r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)
 }

 // finalize creates, updates, and deletes slices as specified
 func (r *reconciler) finalize(
     service *corev1.Service,
     slicesToCreate,
-    slicesToUpdate []*discovery.EndpointSlice,
-    sliceNamesToDelete sets.String,
+    slicesToUpdate,
+    slicesToDelete []*discovery.EndpointSlice,
     triggerTime time.Time,
 ) error {
     errs := []error{}

     // If there are slices to create and delete, change the creates to updates
     // of the slices that would otherwise be deleted.
-    for len(slicesToCreate) > 0 && sliceNamesToDelete.Len() > 0 {
-        sliceName, _ := sliceNamesToDelete.PopAny()
+    for i := 0; i < len(slicesToDelete); {
+        if len(slicesToCreate) == 0 {
+            break
+        }
+        sliceToDelete := slicesToDelete[i]
         slice := slicesToCreate[len(slicesToCreate)-1]
-        slicesToCreate = slicesToCreate[:len(slicesToCreate)-1]
-        slice.Name = sliceName
-        slicesToUpdate = append(slicesToUpdate, slice)
+        // Only update EndpointSlices that have the same AddressType as this
+        // field is considered immutable. Since Services also consider IPFamily
+        // immutable, the only case where this should matter will be the
+        // migration from IP to IPv4 and IPv6 AddressTypes, where there's a
+        // chance EndpointSlices with an IP AddressType would otherwise be
+        // updated to IPv4 or IPv6 without this check.
+        if sliceToDelete.AddressType == slice.AddressType {
+            slice.Name = sliceToDelete.Name
+            slicesToCreate = slicesToCreate[:len(slicesToCreate)-1]
+            slicesToUpdate = append(slicesToUpdate, slice)
+            slicesToDelete = append(slicesToDelete[:i], slicesToDelete[i+1:]...)
+        } else {
+            i++
+        }
     }

     for _, endpointSlice := range slicesToCreate {
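The rewritten loop above recycles the names of slices pending deletion into slices pending creation, but only across matching address types, since AddressType is immutable. A runnable sketch of that behavior under a hypothetical epSlice type:

```go
package main

import "fmt"

// epSlice is a hypothetical minimal stand-in for discovery.EndpointSlice.
type epSlice struct {
	Name        string
	AddressType string
}

// recycle mirrors the finalize() loop: a slice about to be created can
// instead reuse (update) the name of a slice pending deletion, but only
// when both share the same immutable AddressType.
func recycle(toCreate, toUpdate, toDelete []*epSlice) (c, u, d []*epSlice) {
	for i := 0; i < len(toDelete); {
		if len(toCreate) == 0 {
			break
		}
		candidate := toDelete[i]
		slice := toCreate[len(toCreate)-1]
		if candidate.AddressType == slice.AddressType {
			slice.Name = candidate.Name
			toCreate = toCreate[:len(toCreate)-1]
			toUpdate = append(toUpdate, slice)
			toDelete = append(toDelete[:i], toDelete[i+1:]...)
		} else {
			i++ // e.g. a deprecated "IP" slice cannot be recycled into "IPv4"
		}
	}
	return toCreate, toUpdate, toDelete
}

func main() {
	c, u, d := recycle(
		[]*epSlice{{Name: "new-1", AddressType: "IPv4"}},
		nil,
		[]*epSlice{{Name: "old-1", AddressType: "IP"}, {Name: "old-2", AddressType: "IPv4"}},
	)
	fmt.Println(len(c), len(u), len(d)) // 0 1 1: "new-1" became an update of "old-2"
}
```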
@@ -202,11 +225,10 @@ func (r *reconciler) finalize(
         }
     }

-    for sliceNamesToDelete.Len() > 0 {
-        sliceName, _ := sliceNamesToDelete.PopAny()
-        err := r.client.DiscoveryV1alpha1().EndpointSlices(service.Namespace).Delete(sliceName, &metav1.DeleteOptions{})
+    for _, endpointSlice := range slicesToDelete {
+        err := r.client.DiscoveryV1alpha1().EndpointSlices(service.Namespace).Delete(endpointSlice.Name, &metav1.DeleteOptions{})
         if err != nil {
-            errs = append(errs, fmt.Errorf("Error deleting %s EndpointSlice for Service %s/%s: %v", sliceName, service.Namespace, service.Name, err))
+            errs = append(errs, fmt.Errorf("Error deleting %s EndpointSlice for Service %s/%s: %v", endpointSlice.Name, service.Namespace, service.Name, err))
         } else {
             metrics.EndpointSliceChanges.WithLabelValues("delete").Inc()
         }
@@ -229,7 +251,7 @@ func (r *reconciler) reconcileByPortMapping(
     existingSlices []*discovery.EndpointSlice,
     desiredSet endpointSet,
     endpointMeta *endpointMeta,
-) ([]*discovery.EndpointSlice, []*discovery.EndpointSlice, sets.String, int, int) {
+) ([]*discovery.EndpointSlice, []*discovery.EndpointSlice, []*discovery.EndpointSlice, int, int) {
     slicesByName := map[string]*discovery.EndpointSlice{}
     sliceNamesUnchanged := sets.String{}
     sliceNamesToUpdate := sets.String{}
@@ -345,7 +367,13 @@ func (r *reconciler) reconcileByPortMapping(
         slicesToUpdate = append(slicesToUpdate, slicesByName[sliceName])
     }

-    return slicesToCreate, slicesToUpdate, sliceNamesToDelete, numAdded, numRemoved
+    // Build slicesToDelete from slice names.
+    slicesToDelete := []*discovery.EndpointSlice{}
+    for _, sliceName := range sliceNamesToDelete.UnsortedList() {
+        slicesToDelete = append(slicesToDelete, slicesByName[sliceName])
+    }
+
+    return slicesToCreate, slicesToUpdate, slicesToDelete, numAdded, numRemoved
 }

 func (r *reconciler) deleteService(namespace, name string) {
@@ -18,6 +18,8 @@ package endpointslice

 import (
     "fmt"
+    "reflect"
+    "strings"
     "testing"
     "time"

@@ -65,11 +67,14 @@ func TestReconcileEmpty(t *testing.T) {
 // Given a single pod matching a service selector and no existing endpoint slices,
 // a slice should be created
 func TestReconcile1Pod(t *testing.T) {
-    client := newClientset()
-    setupMetrics()
     namespace := "test"
-    svc, _ := newServiceAndEndpointMeta("foo", namespace)
+    ipv6Family := corev1.IPv6Protocol
+    svcv4, _ := newServiceAndEndpointMeta("foo", namespace)
+    svcv6, _ := newServiceAndEndpointMeta("foo", namespace)
+    svcv6.Spec.IPFamily = &ipv6Family

     pod1 := newPod(1, namespace, true, 1)
+    pod1.Status.PodIPs = []corev1.PodIP{{IP: "1.2.3.4"}, {IP: "1234::5678:0000:0000:9abc:def0"}}
     pod1.Spec.Hostname = "example-hostname"
     node1 := &corev1.Node{
         ObjectMeta: metav1.ObjectMeta{
@@ -81,33 +86,92 @@ func TestReconcile1Pod(t *testing.T) {
         },
     }

-    triggerTime := time.Now()
-    r := newReconciler(client, []*corev1.Node{node1}, defaultMaxEndpointsPerSlice)
-    reconcileHelper(t, r, &svc, []*corev1.Pod{pod1}, []*discovery.EndpointSlice{}, triggerTime)
-    assert.Len(t, client.Actions(), 1, "Expected 1 additional clientset action")
+    testCases := map[string]struct {
+        service             corev1.Service
+        expectedAddressType discovery.AddressType
+        expectedEndpoint    discovery.Endpoint
+    }{
+        "ipv4": {
+            service:             svcv4,
+            expectedAddressType: discovery.AddressTypeIPv4,
+            expectedEndpoint: discovery.Endpoint{
+                Addresses:  []string{"1.2.3.4"},
+                Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
+                Topology: map[string]string{
+                    "kubernetes.io/hostname":        "node-1",
+                    "topology.kubernetes.io/zone":   "us-central1-a",
+                    "topology.kubernetes.io/region": "us-central1",
+                },
+                TargetRef: &corev1.ObjectReference{
+                    Kind:      "Pod",
+                    Namespace: namespace,
+                    Name:      "pod1",
+                },
+            },
+        },
+        "ipv6": {
+            service:             svcv6,
+            expectedAddressType: discovery.AddressTypeIPv6,
+            expectedEndpoint: discovery.Endpoint{
+                Addresses:  []string{"1234::5678:0000:0000:9abc:def0"},
+                Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
+                Topology: map[string]string{
+                    "kubernetes.io/hostname":        "node-1",
+                    "topology.kubernetes.io/zone":   "us-central1-a",
+                    "topology.kubernetes.io/region": "us-central1",
+                },
+                TargetRef: &corev1.ObjectReference{
+                    Kind:      "Pod",
+                    Namespace: namespace,
+                    Name:      "pod1",
+                },
+            },
+        },
+    }

-    slices := fetchEndpointSlices(t, client, namespace)
-    assert.Len(t, slices, 1, "Expected 1 endpoint slices")
-    assert.Regexp(t, "^"+svc.Name, slices[0].Name)
-    assert.Equal(t, svc.Name, slices[0].Labels[discovery.LabelServiceName])
-    assert.Equal(t, slices[0].Annotations, map[string]string{
-        "endpoints.kubernetes.io/last-change-trigger-time": triggerTime.Format(time.RFC3339Nano),
-    })
-    assert.EqualValues(t, []discovery.Endpoint{{
-        Addresses:  []string{"1.2.3.5"},
-        Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
-        Topology: map[string]string{
-            "kubernetes.io/hostname":        "node-1",
-            "topology.kubernetes.io/zone":   "us-central1-a",
-            "topology.kubernetes.io/region": "us-central1",
-        },
-        TargetRef: &corev1.ObjectReference{
-            Kind:      "Pod",
-            Namespace: namespace,
-            Name:      "pod1",
-        },
-    }}, slices[0].Endpoints)
-    expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 1, addedPerSync: 1, removedPerSync: 0, numCreated: 1, numUpdated: 0, numDeleted: 0})
+    for name, testCase := range testCases {
+        t.Run(name, func(t *testing.T) {
+            client := newClientset()
+            setupMetrics()
+            triggerTime := time.Now()
+            r := newReconciler(client, []*corev1.Node{node1}, defaultMaxEndpointsPerSlice)
+            reconcileHelper(t, r, &testCase.service, []*corev1.Pod{pod1}, []*discovery.EndpointSlice{}, triggerTime)
+
+            if len(client.Actions()) != 1 {
+                t.Errorf("Expected 1 clientset action, got %d", len(client.Actions()))
+            }
+
+            slices := fetchEndpointSlices(t, client, namespace)
+
+            if len(slices) != 1 {
+                t.Fatalf("Expected 1 EndpointSlice, got %d", len(slices))
+            }
+
+            slice := slices[0]
+            if !strings.HasPrefix(slice.Name, testCase.service.Name) {
+                t.Errorf("Expected EndpointSlice name to start with %s, got %s", testCase.service.Name, slice.Name)
+            }
+
+            if slice.Labels[discovery.LabelServiceName] != testCase.service.Name {
+                t.Errorf("Expected EndpointSlice to have label set with %s value, got %s", testCase.service.Name, slice.Labels[discovery.LabelServiceName])
+            }
+
+            if slice.Annotations[corev1.EndpointsLastChangeTriggerTime] != triggerTime.Format(time.RFC3339Nano) {
+                t.Errorf("Expected EndpointSlice trigger time annotation to be %s, got %s", triggerTime.Format(time.RFC3339Nano), slice.Annotations[corev1.EndpointsLastChangeTriggerTime])
+            }
+
+            if len(slice.Endpoints) != 1 {
+                t.Fatalf("Expected 1 Endpoint, got %d", len(slice.Endpoints))
+            }
+
+            endpoint := slice.Endpoints[0]
+            if !reflect.DeepEqual(endpoint, testCase.expectedEndpoint) {
+                t.Errorf("Expected endpoint: %+v, got: %+v", testCase.expectedEndpoint, endpoint)
+            }
+
+            expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 1, addedPerSync: 1, removedPerSync: 0, numCreated: 1, numUpdated: 0, numDeleted: 0})
+        })
+    }
 }

 // given an existing endpoint slice and no pods matching the service, the existing
@@ -437,6 +501,56 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) {
     expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{95, 20})
 }

+// In this test, we want to verify that old EndpointSlices with a deprecated IP
+// address type will be replaced with a newer IPv4 type.
+func TestReconcileEndpointSlicesReplaceDeprecated(t *testing.T) {
+    client := newClientset()
+    setupMetrics()
+    namespace := "test"
+
+    svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace)
+    endpointMeta.AddressType = discovery.AddressTypeIP
+
+    existingSlices := []*discovery.EndpointSlice{}
+    pods := []*corev1.Pod{}
+
+    slice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
+    for i := 0; i < 80; i++ {
+        pod := newPod(i, namespace, true, 1)
+        slice1.Endpoints = append(slice1.Endpoints, podToEndpoint(pod, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}}))
+        pods = append(pods, pod)
+    }
+    existingSlices = append(existingSlices, slice1)
+
+    slice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc)
+    for i := 100; i < 150; i++ {
+        pod := newPod(i, namespace, true, 1)
+        slice2.Endpoints = append(slice2.Endpoints, podToEndpoint(pod, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}}))
+        pods = append(pods, pod)
+    }
+    existingSlices = append(existingSlices, slice2)
+
+    createEndpointSlices(t, client, namespace, existingSlices)
+
+    r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
+    reconcileHelper(t, r, &svc, pods, existingSlices, time.Now())
+
+    // ensure that both original endpoint slices have been deleted
+    expectActions(t, client.Actions(), 2, "delete", "endpointslices")
+
+    endpointSlices := fetchEndpointSlices(t, client, namespace)
+
+    // since this involved replacing both EndpointSlices, the result should be
+    // perfectly packed.
+    expectUnorderedSlicesWithLengths(t, endpointSlices, []int{100, 30})
+
+    for _, endpointSlice := range endpointSlices {
+        if endpointSlice.AddressType != discovery.AddressTypeIPv4 {
+            t.Errorf("Expected address type to be IPv4, got %s", endpointSlice.AddressType)
+        }
+    }
+}
+
 // Named ports can map to different port numbers on different pods.
 // This test ensures that EndpointSlices are grouped correctly in that case.
 func TestReconcileEndpointSlicesNamedPorts(t *testing.T) {
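On the expected {100, 30} packing: both original slices are deleted, so all 130 endpoints (80 + 50) are repacked from scratch; assuming defaultMaxEndpointsPerSlice is 100, as the expected lengths imply, that yields one full slice plus one slice of 30:

```go
package main

import "fmt"

func main() {
	total := 80 + 50 // endpoints carried over from slice1 and slice2
	max := 100       // assumes defaultMaxEndpointsPerSlice is 100, as {100, 30} implies
	fmt.Println(total/max, total%max) // 1 full slice of 100, plus a slice of 30
}
```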
@@ -489,7 +603,6 @@ func TestReconcileEndpointSlicesNamedPorts(t *testing.T) {

     // generate data structures for expected slice ports and address types
     protoTCP := corev1.ProtocolTCP
-    ipAddressType := discovery.AddressTypeIP
     expectedSlices := []discovery.EndpointSlice{}
     for i := range fetchedSlices {
         expectedSlices = append(expectedSlices, discovery.EndpointSlice{
@@ -498,7 +611,7 @@ func TestReconcileEndpointSlicesNamedPorts(t *testing.T) {
             Protocol: &protoTCP,
             Port:     utilpointer.Int32Ptr(int32(8080 + i)),
         }},
-        AddressType: &ipAddressType,
+        AddressType: discovery.AddressTypeIPv4,
     })
 }
@@ -31,6 +31,7 @@ import (
     api "k8s.io/kubernetes/pkg/apis/core"
     "k8s.io/kubernetes/pkg/apis/discovery/validation"
     endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
+    utilnet "k8s.io/utils/net"
 )

 // podEndpointChanged returns true if the results of podToEndpoint are different
@@ -70,7 +71,7 @@ func podToEndpoint(pod *corev1.Pod, node *corev1.Node, service *corev1.Service)

     ready := service.Spec.PublishNotReadyAddresses || podutil.IsPodReady(pod)
     ep := discovery.Endpoint{
-        Addresses: getEndpointAddresses(pod.Status),
+        Addresses: getEndpointAddresses(pod.Status, service.Spec.IPFamily),
         Conditions: discovery.EndpointConditions{
             Ready: &ready,
         },
@@ -124,16 +125,18 @@ func getEndpointPorts(service *corev1.Service, pod *corev1.Pod) []discovery.Endp
 }

 // getEndpointAddresses returns a list of addresses generated from a pod status.
-func getEndpointAddresses(podStatus corev1.PodStatus) []string {
-    if len(podStatus.PodIPs) > 1 {
-        addresss := []string{}
-        for _, podIP := range podStatus.PodIPs {
-            addresss = append(addresss, podIP.IP)
+func getEndpointAddresses(podStatus corev1.PodStatus, ipFamily *corev1.IPFamily) []string {
+    isIPv6Family := ipFamily != nil && *ipFamily == corev1.IPv6Protocol
+    addresses := []string{}
+
+    for _, podIP := range podStatus.PodIPs {
+        isIPv6PodIP := utilnet.IsIPv6String(podIP.IP)
+        if isIPv6Family == isIPv6PodIP {
+            addresses = append(addresses, podIP.IP)
         }
-        return addresss
     }

-    return []string{podStatus.PodIP}
+    return addresses
 }

 // endpointsEqualBeyondHash returns true if endpoints have equal attributes
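The new getEndpointAddresses keeps only the pod IPs that match the Service's family. A standalone approximation, using the standard library in place of utilnet.IsIPv6String (same semantics for well-formed addresses):

```go
package main

import (
	"fmt"
	"net"
)

// isIPv6String approximates utilnet.IsIPv6String with the standard library:
// true only for addresses that parse and have no IPv4 form.
func isIPv6String(s string) bool {
	ip := net.ParseIP(s)
	return ip != nil && ip.To4() == nil
}

func main() {
	podIPs := []string{"1.2.3.5", "1234::5678:0000:0000:9abc:def0"}
	for _, wantIPv6 := range []bool{false, true} {
		kept := []string{}
		for _, ip := range podIPs {
			if isIPv6String(ip) == wantIPv6 { // same comparison as the new code
				kept = append(kept, ip)
			}
		}
		fmt.Println(wantIPv6, kept) // false [1.2.3.5], then true [1234::5678:0000:0000:9abc:def0]
	}
}
```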
@@ -38,12 +38,12 @@ import (
 )

 func TestNewEndpointSlice(t *testing.T) {
-    ipAddressType := discovery.AddressTypeIP
+    ipAddressType := discovery.AddressTypeIPv4
     portName := "foo"
     protocol := v1.ProtocolTCP
     endpointMeta := endpointMeta{
         Ports:       []discovery.EndpointPort{{Name: &portName, Protocol: &protocol}},
-        AddressType: &ipAddressType,
+        AddressType: ipAddressType,
     }
     service := v1.Service{
         ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test"},
@@ -204,7 +204,7 @@ func TestPodToEndpoint(t *testing.T) {
             node: node1,
             svc:  &svc,
             expectedEndpoint: discovery.Endpoint{
-                Addresses:  []string{"1.2.3.4", "1234::5678:0000:0000:9abc:def0"},
+                Addresses:  []string{"1.2.3.4"},
                 Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
                 Topology: map[string]string{
                     "kubernetes.io/hostname": "node-1",
@@ -283,11 +283,11 @@ func TestPodChangedWithPodEndpointChanged(t *testing.T) {
     }
     newPod.ObjectMeta.ResourceVersion = oldPod.ObjectMeta.ResourceVersion

-    newPod.Status.PodIP = "1.2.3.1"
+    newPod.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.1"}}
     if !podChangedHelper(oldPod, newPod, podEndpointChanged) {
         t.Errorf("Expected pod to be changed with pod IP address change")
     }
-    newPod.Status.PodIP = oldPod.Status.PodIP
+    newPod.Status.PodIPs = oldPod.Status.PodIPs

     newPod.ObjectMeta.Name = "wrong-name"
     if !podChangedHelper(oldPod, newPod, podEndpointChanged) {
@@ -333,6 +333,9 @@ func newPod(n int, namespace string, ready bool, nPorts int) *v1.Pod {
         },
         Status: v1.PodStatus{
             PodIP: fmt.Sprintf("1.2.3.%d", 4+n),
+            PodIPs: []v1.PodIP{{
+                IP: fmt.Sprintf("1.2.3.%d", 4+n),
+            }},
             Conditions: []v1.PodCondition{
                 {
                     Type: v1.PodReady,
@@ -381,10 +384,10 @@ func newServiceAndEndpointMeta(name, namespace string) (v1.Service, endpointMeta
         },
     }

-    ipAddressType := discovery.AddressTypeIP
+    addressType := discovery.AddressTypeIPv4
     protocol := v1.ProtocolTCP
     endpointMeta := endpointMeta{
-        AddressType: &ipAddressType,
+        AddressType: addressType,
         Ports:       []discovery.EndpointPort{{Name: &name, Port: &portNum, Protocol: &protocol}},
     }