kubernetes/pkg/controller/endpoint/endpoints_controller_test.go
Abhishek Kr Srivastav 9d10ddb060 Fix Go vet errors for master golang
Co-authored-by: Rajalakshmi-Girish <rajalakshmi.girish1@ibm.com>
Co-authored-by: Abhishek Kr Srivastav <Abhishek.kr.srivastav@ibm.com>
2025-01-08 15:11:34 +05:30

2887 lines
91 KiB
Go

/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package endpoint
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"reflect"
"strconv"
"testing"
"time"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
clientscheme "k8s.io/client-go/kubernetes/scheme"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
utiltesting "k8s.io/client-go/util/testing"
endptspkg "k8s.io/kubernetes/pkg/api/v1/endpoints"
api "k8s.io/kubernetes/pkg/apis/core"
controllerpkg "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/test/utils/ktesting"
utilnet "k8s.io/utils/net"
"k8s.io/utils/pointer"
)
var alwaysReady = func() bool { return true }
var neverReady = func() bool { return false }
var emptyNodeName string
var triggerTime = time.Date(2018, 01, 01, 0, 0, 0, 0, time.UTC)
var triggerTimeString = triggerTime.Format(time.RFC3339Nano)
var oldTriggerTimeString = triggerTime.Add(-time.Hour).Format(time.RFC3339Nano)
var ipv4only = []v1.IPFamily{v1.IPv4Protocol}
var ipv6only = []v1.IPFamily{v1.IPv6Protocol}
var ipv4ipv6 = []v1.IPFamily{v1.IPv4Protocol, v1.IPv6Protocol}
var ipv6ipv4 = []v1.IPFamily{v1.IPv6Protocol, v1.IPv4Protocol}
func testPod(namespace string, id int, nPorts int, isReady bool, ipFamilies []v1.IPFamily) *v1.Pod {
p := &v1.Pod{
TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: fmt.Sprintf("pod%d", id),
Labels: map[string]string{"foo": "bar"},
ResourceVersion: fmt.Sprint(id),
},
Spec: v1.PodSpec{
Containers: []v1.Container{{Ports: []v1.ContainerPort{}}},
},
Status: v1.PodStatus{
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
},
},
}
if !isReady {
p.Status.Conditions[0].Status = v1.ConditionFalse
}
for j := 0; j < nPorts; j++ {
p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)})
}
for _, family := range ipFamilies {
var ip string
if family == v1.IPv4Protocol {
ip = fmt.Sprintf("1.2.3.%d", 4+id)
} else {
ip = fmt.Sprintf("2000::%d", 4+id)
}
p.Status.PodIPs = append(p.Status.PodIPs, v1.PodIP{IP: ip})
}
p.Status.PodIP = p.Status.PodIPs[0].IP
return p
}
func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int, ipFamilies []v1.IPFamily) {
for i := 0; i < nPods+nNotReady; i++ {
isReady := i < nPods
pod := testPod(namespace, i, nPorts, isReady, ipFamilies)
store.Add(pod)
}
}
func addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(store cache.Store, namespace string, nPods int, nPorts int, restartPolicy v1.RestartPolicy, podPhase v1.PodPhase) {
for i := 0; i < nPods; i++ {
p := &v1.Pod{
TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: fmt.Sprintf("pod%d", i),
Labels: map[string]string{"foo": "bar"},
},
Spec: v1.PodSpec{
RestartPolicy: restartPolicy,
Containers: []v1.Container{{Ports: []v1.ContainerPort{}}},
},
Status: v1.PodStatus{
PodIP: fmt.Sprintf("1.2.3.%d", 4+i),
Phase: podPhase,
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionFalse,
},
},
},
}
for j := 0; j < nPorts; j++ {
p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)})
}
store.Add(p)
}
}
func makeTestServer(t *testing.T, namespace string) (*httptest.Server, *utiltesting.FakeHandler) {
fakeEndpointsHandler := utiltesting.FakeHandler{
StatusCode: http.StatusOK,
ResponseBody: runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{}),
}
mux := http.NewServeMux()
if namespace == "" {
t.Fatal("namespace cannot be empty")
}
mux.Handle("/api/v1/namespaces/"+namespace+"/endpoints", &fakeEndpointsHandler)
mux.Handle("/api/v1/namespaces/"+namespace+"/endpoints/", &fakeEndpointsHandler)
mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
t.Errorf("unexpected request: %v", req.RequestURI)
http.Error(res, "", http.StatusNotFound)
})
return httptest.NewServer(mux), &fakeEndpointsHandler
}
// makeBlockingEndpointTestServer will signal the blockNextAction channel on endpoint "POST", "PUT", and "DELETE"
// requests. "POST" and "PUT" requests will wait on a blockUpdate signal if provided, while "DELETE" requests will wait
// on a blockDelete signal if provided. If controller is nil, an error will be sent in the response.
func makeBlockingEndpointTestServer(t *testing.T, controller *endpointController, endpoint *v1.Endpoints, blockUpdate, blockDelete, blockNextAction chan struct{}, namespace string) *httptest.Server {
handlerFunc := func(res http.ResponseWriter, req *http.Request) {
if controller == nil {
res.WriteHeader(http.StatusInternalServerError)
res.Write([]byte("controller has not been set yet"))
return
}
if req.Method == "POST" || req.Method == "PUT" {
if blockUpdate != nil {
go func() {
// Delay the update of endpoints to make endpoints cache out of sync
<-blockUpdate
_ = controller.endpointsStore.Add(endpoint)
}()
} else {
_ = controller.endpointsStore.Add(endpoint)
}
blockNextAction <- struct{}{}
}
if req.Method == "DELETE" {
if blockDelete != nil {
go func() {
// Delay the deletion of endpoints to make endpoints cache out of sync
<-blockDelete
_ = controller.endpointsStore.Delete(endpoint)
controller.onEndpointsDelete(endpoint)
}()
} else {
_ = controller.endpointsStore.Delete(endpoint)
controller.onEndpointsDelete(endpoint)
}
blockNextAction <- struct{}{}
}
res.Header().Set("Content-Type", "application/json")
res.WriteHeader(http.StatusOK)
_, _ = res.Write([]byte(runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), endpoint)))
}
mux := http.NewServeMux()
mux.HandleFunc("/api/v1/namespaces/"+namespace+"/endpoints", handlerFunc)
mux.HandleFunc("/api/v1/namespaces/"+namespace+"/endpoints/", handlerFunc)
mux.HandleFunc("/api/v1/namespaces/"+namespace+"/events", func(res http.ResponseWriter, req *http.Request) {})
mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
t.Errorf("unexpected request: %v", req.RequestURI)
http.Error(res, "", http.StatusNotFound)
})
return httptest.NewServer(mux)
}
type endpointController struct {
*Controller
podStore cache.Store
serviceStore cache.Store
endpointsStore cache.Store
}
func newController(ctx context.Context, url string, batchPeriod time.Duration) *endpointController {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: url, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
informerFactory := informers.NewSharedInformerFactory(client, controllerpkg.NoResyncPeriodFunc())
endpoints := NewEndpointController(ctx, informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Services(),
informerFactory.Core().V1().Endpoints(), client, batchPeriod)
endpoints.podsSynced = alwaysReady
endpoints.servicesSynced = alwaysReady
endpoints.endpointsSynced = alwaysReady
return &endpointController{
endpoints,
informerFactory.Core().V1().Pods().Informer().GetStore(),
informerFactory.Core().V1().Services().Informer().GetStore(),
informerFactory.Core().V1().Endpoints().Informer().GetStore(),
}
}
func newFakeController(ctx context.Context, batchPeriod time.Duration) (*fake.Clientset, *endpointController) {
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, controllerpkg.NoResyncPeriodFunc())
eController := NewEndpointController(
ctx,
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().Services(),
informerFactory.Core().V1().Endpoints(),
client,
batchPeriod)
eController.podsSynced = alwaysReady
eController.servicesSynced = alwaysReady
eController.endpointsSynced = alwaysReady
return client, &endpointController{
eController,
informerFactory.Core().V1().Pods().Informer().GetStore(),
informerFactory.Core().V1().Services().Informer().GetStore(),
informerFactory.Core().V1().Endpoints().Informer().GetStore(),
}
}
func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
ns := metav1.NamespaceDefault
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
Ports: []v1.EndpointPort{{Port: 1000}},
}},
})
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 80}}},
})
err := endpoints.syncService(tCtx, ns+"/foo")
if err != nil {
t.Errorf("Unexpected error syncing service %v", err)
}
endpointsHandler.ValidateRequestCount(t, 0)
}
func TestSyncEndpointsExistingNilSubsets(t *testing.T) {
ns := metav1.NamespaceDefault
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: nil,
})
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80}},
},
})
err := endpoints.syncService(tCtx, ns+"/foo")
if err != nil {
t.Errorf("Unexpected error syncing service %v", err)
}
endpointsHandler.ValidateRequestCount(t, 0)
}
func TestSyncEndpointsExistingEmptySubsets(t *testing.T) {
ns := metav1.NamespaceDefault
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: []v1.EndpointSubset{},
})
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80}},
},
})
err := endpoints.syncService(tCtx, ns+"/foo")
if err != nil {
t.Errorf("Unexpected error syncing service %v", err)
}
endpointsHandler.ValidateRequestCount(t, 0)
}
func TestSyncEndpointsWithPodResourceVersionUpdateOnly(t *testing.T) {
ns := metav1.NamespaceDefault
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
pod0 := testPod(ns, 0, 1, true, ipv4only)
pod1 := testPod(ns, 1, 1, false, ipv4only)
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{
{
IP: pod0.Status.PodIPs[0].IP,
NodeName: &emptyNodeName,
TargetRef: &v1.ObjectReference{Kind: "Pod", Name: pod0.Name, Namespace: ns, ResourceVersion: "1"},
},
},
NotReadyAddresses: []v1.EndpointAddress{
{
IP: pod1.Status.PodIPs[0].IP,
NodeName: &emptyNodeName,
TargetRef: &v1.ObjectReference{Kind: "Pod", Name: pod1.Name, Namespace: ns, ResourceVersion: "2"},
},
},
Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
}},
})
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
},
})
pod0.ResourceVersion = "3"
pod1.ResourceVersion = "4"
endpoints.podStore.Add(pod0)
endpoints.podStore.Add(pod1)
err := endpoints.syncService(tCtx, ns+"/foo")
if err != nil {
t.Errorf("Unexpected error syncing service %v", err)
}
endpointsHandler.ValidateRequestCount(t, 0)
}
func TestSyncEndpointsNewNoSubsets(t *testing.T) {
ns := metav1.NamespaceDefault
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0*time.Second)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80}},
},
})
err := endpoints.syncService(tCtx, ns+"/foo")
if err != nil {
t.Errorf("Unexpected error syncing service %v", err)
}
endpointsHandler.ValidateRequestCount(t, 1)
}
func TestCheckLeftoverEndpoints(t *testing.T) {
ns := metav1.NamespaceDefault
testServer, _ := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
Ports: []v1.EndpointPort{{Port: 1000}},
}},
})
endpoints.checkLeftoverEndpoints()
if e, a := 1, endpoints.queue.Len(); e != a {
t.Fatalf("Expected %v, got %v", e, a)
}
got, _ := endpoints.queue.Get()
if e, a := ns+"/foo", got; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
}
func TestSyncEndpointsProtocolTCP(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
}},
})
addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{},
Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: "TCP"}},
},
})
err := endpoints.syncService(tCtx, ns+"/foo")
if err != nil {
t.Errorf("Unexpected error syncing service %v", err)
}
endpointsHandler.ValidateRequestCount(t, 1)
data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
v1.IsHeadlessService: "",
},
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
}},
})
endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}
func TestSyncEndpointsHeadlessServiceLabel(t *testing.T) {
ns := metav1.NamespaceDefault
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
v1.IsHeadlessService: "",
},
},
Subsets: []v1.EndpointSubset{},
})
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80}},
},
})
err := endpoints.syncService(tCtx, ns+"/foo")
if err != nil {
t.Errorf("Unexpected error syncing service %v", err)
}
endpointsHandler.ValidateRequestCount(t, 0)
}
func TestSyncServiceExternalNameType(t *testing.T) {
serviceName := "testing-1"
namespace := metav1.NamespaceDefault
testCases := []struct {
desc string
service *v1.Service
}{
{
desc: "External name with selector and ports should not receive endpoints",
service: &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespace},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80}},
Type: v1.ServiceTypeExternalName,
},
},
},
{
desc: "External name with ports should not receive endpoints",
service: &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespace},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{{Port: 80}},
Type: v1.ServiceTypeExternalName,
},
},
},
{
desc: "External name with selector should not receive endpoints",
service: &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespace},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Type: v1.ServiceTypeExternalName,
},
},
},
{
desc: "External name without selector and ports should not receive endpoints",
service: &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespace},
Spec: v1.ServiceSpec{
Type: v1.ServiceTypeExternalName,
},
},
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
testServer, endpointsHandler := makeTestServer(t, namespace)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0*time.Second)
err := endpoints.serviceStore.Add(tc.service)
if err != nil {
t.Fatalf("Error adding service to service store: %v", err)
}
err = endpoints.syncService(tCtx, namespace+"/"+serviceName)
if err != nil {
t.Fatalf("Error syncing service: %v", err)
}
endpointsHandler.ValidateRequestCount(t, 0)
})
}
}
func TestSyncEndpointsProtocolUDP(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "UDP"}},
}},
})
addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{},
Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: "UDP"}},
},
})
err := endpoints.syncService(tCtx, ns+"/foo")
if err != nil {
t.Errorf("Unexpected error syncing service %v", err)
}
endpointsHandler.ValidateRequestCount(t, 1)
data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
v1.IsHeadlessService: "",
},
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
Ports: []v1.EndpointPort{{Port: 8080, Protocol: "UDP"}},
}},
})
endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}
func TestSyncEndpointsProtocolSCTP(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "SCTP"}},
}},
})
addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{},
Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: "SCTP"}},
},
})
err := endpoints.syncService(tCtx, ns+"/foo")
if err != nil {
t.Errorf("Unexpected error syncing service %v", err)
}
endpointsHandler.ValidateRequestCount(t, 1)
data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
v1.IsHeadlessService: "",
},
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
Ports: []v1.EndpointPort{{Port: 8080, Protocol: "SCTP"}},
}},
})
endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}
func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: []v1.EndpointSubset{},
})
addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{},
Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
},
})
err := endpoints.syncService(tCtx, ns+"/foo")
if err != nil {
t.Errorf("Unexpected error syncing service %v", err)
}
data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
v1.IsHeadlessService: "",
},
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
}},
})
endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}
func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: []v1.EndpointSubset{},
})
addPods(endpoints.podStore, ns, 0, 1, 1, ipv4only)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{},
Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
},
})
err := endpoints.syncService(tCtx, ns+"/foo")
if err != nil {
t.Errorf("Unexpected error syncing service %v", err)
}
data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
v1.IsHeadlessService: "",
},
},
Subsets: []v1.EndpointSubset{{
NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
}},
})
endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}
func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: []v1.EndpointSubset{},
})
addPods(endpoints.podStore, ns, 1, 1, 1, ipv4only)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{},
Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
},
})
err := endpoints.syncService(tCtx, ns+"/foo")
if err != nil {
t.Errorf("Unexpected error syncing service %v", err)
}
data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
v1.IsHeadlessService: "",
},
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}}},
Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
}},
})
endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}
func TestSyncEndpointsItemsPreexisting(t *testing.T) {
ns := "bar"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
Ports: []v1.EndpointPort{{Port: 1000}},
}},
})
addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
},
})
err := endpoints.syncService(tCtx, ns+"/foo")
if err != nil {
t.Errorf("Unexpected error syncing service %v", err)
}
data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
v1.IsHeadlessService: "",
},
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
}},
})
endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}
func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
ns := metav1.NamespaceDefault
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "1",
Name: "foo",
Namespace: ns,
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
}},
})
addPods(endpoints.podStore, metav1.NamespaceDefault, 1, 1, 0, ipv4only)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: metav1.NamespaceDefault},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
},
})
err := endpoints.syncService(tCtx, ns+"/foo")
if err != nil {
t.Errorf("Unexpected error syncing service %v", err)
}
endpointsHandler.ValidateRequestCount(t, 0)
}
func TestSyncEndpointsItems(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0*time.Second)
addPods(endpoints.podStore, ns, 3, 2, 0, ipv4only)
addPods(endpoints.podStore, "blah", 5, 2, 0, ipv4only) // make sure these aren't found!
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{
{Name: "port0", Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
{Name: "port1", Port: 88, Protocol: "TCP", TargetPort: intstr.FromInt32(8088)},
},
},
})
err := endpoints.syncService(tCtx, "other/foo")
if err != nil {
t.Errorf("Unexpected error syncing service %v", err)
}
expectedSubsets := []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{
{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
{IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}},
{IP: "1.2.3.6", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}},
},
Ports: []v1.EndpointPort{
{Name: "port0", Port: 8080, Protocol: "TCP"},
{Name: "port1", Port: 8088, Protocol: "TCP"},
},
}}
data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "",
Name: "foo",
Labels: map[string]string{
v1.IsHeadlessService: "",
},
},
Subsets: endptspkg.SortSubsets(expectedSubsets),
})
endpointsHandler.ValidateRequestCount(t, 1)
endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints", "POST", &data)
}
func TestSyncEndpointsItemsWithLabels(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0*time.Second)
addPods(endpoints.podStore, ns, 3, 2, 0, ipv4only)
serviceLabels := map[string]string{"foo": "bar"}
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
Labels: serviceLabels,
},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{
{Name: "port0", Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
{Name: "port1", Port: 88, Protocol: "TCP", TargetPort: intstr.FromInt32(8088)},
},
},
})
err := endpoints.syncService(tCtx, ns+"/foo")
if err != nil {
t.Errorf("Unexpected error syncing service %v", err)
}
expectedSubsets := []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{
{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
{IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}},
{IP: "1.2.3.6", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}},
},
Ports: []v1.EndpointPort{
{Name: "port0", Port: 8080, Protocol: "TCP"},
{Name: "port1", Port: 8088, Protocol: "TCP"},
},
}}
serviceLabels[v1.IsHeadlessService] = ""
data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "",
Name: "foo",
Labels: serviceLabels,
},
Subsets: endptspkg.SortSubsets(expectedSubsets),
})
endpointsHandler.ValidateRequestCount(t, 1)
endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints", "POST", &data)
}
func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
ns := "bar"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
"foo": "bar",
},
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
Ports: []v1.EndpointPort{{Port: 1000}},
}},
})
addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
serviceLabels := map[string]string{"baz": "blah"}
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
Labels: serviceLabels,
},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
},
})
err := endpoints.syncService(tCtx, ns+"/foo")
if err != nil {
t.Errorf("Unexpected error syncing service %v", err)
}
serviceLabels[v1.IsHeadlessService] = ""
data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Labels: serviceLabels,
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
}},
})
endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}
func TestWaitsForAllInformersToBeSynced2(t *testing.T) {
var tests = []struct {
podsSynced func() bool
servicesSynced func() bool
endpointsSynced func() bool
shouldUpdateEndpoints bool
}{
{neverReady, alwaysReady, alwaysReady, false},
{alwaysReady, neverReady, alwaysReady, false},
{alwaysReady, alwaysReady, neverReady, false},
{alwaysReady, alwaysReady, alwaysReady, true},
}
for _, test := range tests {
func() {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0*time.Second)
addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{},
Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: "TCP"}},
},
}
endpoints.serviceStore.Add(service)
endpoints.onServiceUpdate(service)
endpoints.podsSynced = test.podsSynced
endpoints.servicesSynced = test.servicesSynced
endpoints.endpointsSynced = test.endpointsSynced
endpoints.workerLoopPeriod = 10 * time.Millisecond
go endpoints.Run(tCtx, 1)
// cache.WaitForNamedCacheSync has a 100ms poll period, and the endpoints worker has a 10ms period.
// To ensure we get all updates, including unexpected ones, we need to wait at least as long as
// a single cache sync period and worker period, with some fudge room.
time.Sleep(150 * time.Millisecond)
if test.shouldUpdateEndpoints {
// Ensure the work queue has been processed by looping for up to a second to prevent flakes.
wait.PollImmediate(50*time.Millisecond, 1*time.Second, func() (bool, error) {
return endpoints.queue.Len() == 0, nil
})
endpointsHandler.ValidateRequestCount(t, 1)
} else {
endpointsHandler.ValidateRequestCount(t, 0)
}
}()
}
}
func TestSyncEndpointsHeadlessService(t *testing.T) {
ns := "headless"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
}},
})
addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, Labels: map[string]string{"a": "b"}},
Spec: v1.ServiceSpec{
Selector: map[string]string{},
ClusterIP: api.ClusterIPNone,
Ports: []v1.ServicePort{},
},
}
originalService := service.DeepCopy()
endpoints.serviceStore.Add(service)
err := endpoints.syncService(tCtx, ns+"/foo")
if err != nil {
t.Errorf("Unexpected error syncing service %v", err)
}
data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
"a": "b",
v1.IsHeadlessService: "",
},
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
Ports: []v1.EndpointPort{},
}},
})
if !reflect.DeepEqual(originalService, service) {
t.Fatalf("syncing endpoints changed service: %s", cmp.Diff(service, originalService))
}
endpointsHandler.ValidateRequestCount(t, 1)
endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}
func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseFailed(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
"foo": "bar",
},
},
Subsets: []v1.EndpointSubset{},
})
addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyNever, v1.PodFailed)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
},
})
err := endpoints.syncService(tCtx, ns+"/foo")
if err != nil {
t.Errorf("Unexpected error syncing service %v", err)
}
data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
v1.IsHeadlessService: "",
},
},
Subsets: []v1.EndpointSubset{},
})
endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}
func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseSucceeded(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
"foo": "bar",
},
},
Subsets: []v1.EndpointSubset{},
})
addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyNever, v1.PodSucceeded)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
},
})
err := endpoints.syncService(tCtx, ns+"/foo")
if err != nil {
t.Errorf("Unexpected error syncing service %v", err)
}
data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
v1.IsHeadlessService: "",
},
},
Subsets: []v1.EndpointSubset{},
})
endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}
func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyOnFailureAndPhaseSucceeded(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
"foo": "bar",
},
},
Subsets: []v1.EndpointSubset{},
})
addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyOnFailure, v1.PodSucceeded)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
},
})
err := endpoints.syncService(tCtx, ns+"/foo")
if err != nil {
t.Errorf("Unexpected error syncing service %v", err)
}
data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
v1.IsHeadlessService: "",
},
},
Subsets: []v1.EndpointSubset{},
})
endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}
func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) {
ns := metav1.NamespaceDefault
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0*time.Second)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
ClusterIP: "None",
Ports: nil,
},
})
addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
err := endpoints.syncService(tCtx, ns+"/foo")
if err != nil {
t.Errorf("Unexpected error syncing service %v", err)
}
endpointsHandler.ValidateRequestCount(t, 1)
data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Labels: map[string]string{
v1.IsHeadlessService: "",
},
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
Ports: nil,
}},
})
endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints", "POST", &data)
}
func TestPodToEndpointAddressForService(t *testing.T) {
ipv4 := v1.IPv4Protocol
ipv6 := v1.IPv6Protocol
testCases := []struct {
name string
ipFamilies []v1.IPFamily
service v1.Service
expectedEndpointFamily v1.IPFamily
expectError bool
}{
{
name: "v4 service, in a single stack cluster",
ipFamilies: ipv4only,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: "10.0.0.1",
},
},
expectedEndpointFamily: ipv4,
},
{
name: "v4 service, in a dual stack cluster",
ipFamilies: ipv4ipv6,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: "10.0.0.1",
},
},
expectedEndpointFamily: ipv4,
},
{
name: "v4 service, in a dual stack ipv6-primary cluster",
ipFamilies: ipv6ipv4,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: "10.0.0.1",
},
},
expectedEndpointFamily: ipv4,
},
{
name: "v4 headless service, in a single stack cluster",
ipFamilies: ipv4only,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: v1.ClusterIPNone,
},
},
expectedEndpointFamily: ipv4,
},
{
name: "v4 headless service, in a dual stack cluster",
ipFamilies: ipv4ipv6,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: v1.ClusterIPNone,
IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
},
},
expectedEndpointFamily: ipv4,
},
{
name: "v4 legacy headless service, in a dual stack cluster",
ipFamilies: ipv4ipv6,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: v1.ClusterIPNone,
},
},
expectedEndpointFamily: ipv4,
},
{
name: "v4 legacy headless service, in a dual stack ipv6-primary cluster",
ipFamilies: ipv6ipv4,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: v1.ClusterIPNone,
},
},
expectedEndpointFamily: ipv6,
},
{
name: "v6 service, in a dual stack cluster",
ipFamilies: ipv4ipv6,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: "3000::1",
},
},
expectedEndpointFamily: ipv6,
},
{
name: "v6 headless service, in a single stack cluster",
ipFamilies: ipv6only,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: v1.ClusterIPNone,
},
},
expectedEndpointFamily: ipv6,
},
{
name: "v6 headless service, in a dual stack cluster (connected to a new api-server)",
ipFamilies: ipv4ipv6,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: v1.ClusterIPNone,
IPFamilies: []v1.IPFamily{v1.IPv6Protocol}, // <- set by a api-server defaulting logic
},
},
expectedEndpointFamily: ipv6,
},
{
name: "v6 legacy headless service, in a dual stack cluster (connected to a old api-server)",
ipFamilies: ipv4ipv6,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: v1.ClusterIPNone, // <- families are not set by api-server
},
},
expectedEndpointFamily: ipv4,
},
// in reality this is a misconfigured cluster
// i.e user is not using dual stack and have PodIP == v4 and ServiceIP==v6
// previously controller could assign wrong ip to endpoint address
// with gate removed. this is no longer the case. this is *not* behavior change
// because previously things would have failed in kube-proxy anyway (due to editing wrong iptables).
{
name: "v6 service, in a v4 only cluster.",
ipFamilies: ipv4only,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: "3000::1",
},
},
expectError: true,
expectedEndpointFamily: ipv4,
},
// but this will actually give an error
{
name: "v6 service, in a v4 only cluster",
ipFamilies: ipv4only,
service: v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: "3000::1",
},
},
expectError: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
ns := "test"
addPods(podStore, ns, 1, 1, 0, tc.ipFamilies)
pods := podStore.List()
if len(pods) != 1 {
t.Fatalf("podStore size: expected: %d, got: %d", 1, len(pods))
}
pod := pods[0].(*v1.Pod)
epa, err := podToEndpointAddressForService(&tc.service, pod)
if err != nil && !tc.expectError {
t.Fatalf("podToEndpointAddressForService returned unexpected error %v", err)
}
if err == nil && tc.expectError {
t.Fatalf("podToEndpointAddressForService should have returned error but it did not")
}
if err != nil && tc.expectError {
return
}
if utilnet.IsIPv6String(epa.IP) != (tc.expectedEndpointFamily == ipv6) {
t.Fatalf("IP: expected %s, got: %s", tc.expectedEndpointFamily, epa.IP)
}
if *(epa.NodeName) != pod.Spec.NodeName {
t.Fatalf("NodeName: expected: %s, got: %s", pod.Spec.NodeName, *(epa.NodeName))
}
if epa.TargetRef.Kind != "Pod" {
t.Fatalf("TargetRef.Kind: expected: %s, got: %s", "Pod", epa.TargetRef.Kind)
}
if epa.TargetRef.Namespace != pod.ObjectMeta.Namespace {
t.Fatalf("TargetRef.Namespace: expected: %s, got: %s", pod.ObjectMeta.Namespace, epa.TargetRef.Namespace)
}
if epa.TargetRef.Name != pod.ObjectMeta.Name {
t.Fatalf("TargetRef.Name: expected: %s, got: %s", pod.ObjectMeta.Name, epa.TargetRef.Name)
}
if epa.TargetRef.UID != pod.ObjectMeta.UID {
t.Fatalf("TargetRef.UID: expected: %s, got: %s", pod.ObjectMeta.UID, epa.TargetRef.UID)
}
if epa.TargetRef.ResourceVersion != "" {
t.Fatalf("TargetRef.ResourceVersion: expected empty, got: %s", epa.TargetRef.ResourceVersion)
}
})
}
}
func TestLastTriggerChangeTimeAnnotation(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
}},
})
addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
Spec: v1.ServiceSpec{
Selector: map[string]string{},
Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: "TCP"}},
},
})
err := endpoints.syncService(tCtx, ns+"/foo")
if err != nil {
t.Errorf("Unexpected error syncing service %v", err)
}
endpointsHandler.ValidateRequestCount(t, 1)
data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Annotations: map[string]string{
v1.EndpointsLastChangeTriggerTime: triggerTimeString,
},
Labels: map[string]string{
v1.IsHeadlessService: "",
},
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
}},
})
endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}
func TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Annotations: map[string]string{
v1.EndpointsLastChangeTriggerTime: oldTriggerTimeString,
},
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
}},
})
addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
Spec: v1.ServiceSpec{
Selector: map[string]string{},
Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: "TCP"}},
},
})
err := endpoints.syncService(tCtx, ns+"/foo")
if err != nil {
t.Errorf("Unexpected error syncing service %v", err)
}
endpointsHandler.ValidateRequestCount(t, 1)
data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Annotations: map[string]string{
v1.EndpointsLastChangeTriggerTime: triggerTimeString,
},
Labels: map[string]string{
v1.IsHeadlessService: "",
},
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
}},
})
endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}
func TestLastTriggerChangeTimeAnnotation_AnnotationCleared(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0*time.Second)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Annotations: map[string]string{
v1.EndpointsLastChangeTriggerTime: triggerTimeString,
},
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
}},
})
// Neither pod nor service has trigger time, this should cause annotation to be cleared.
addPods(endpoints.podStore, ns, 1, 1, 0, ipv4only)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{},
Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080), Protocol: "TCP"}},
},
})
err := endpoints.syncService(tCtx, ns+"/foo")
if err != nil {
t.Errorf("Unexpected error syncing service %v", err)
}
endpointsHandler.ValidateRequestCount(t, 1)
data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
v1.IsHeadlessService: "",
}, // Annotation not set anymore.
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
}},
})
endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}
// TestPodUpdatesBatching verifies that endpoint updates caused by pod updates are batched together.
// This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
// TODO(mborsz): Migrate this test to mock clock when possible.
func TestPodUpdatesBatching(t *testing.T) {
type podUpdate struct {
delay time.Duration
podName string
podIP string
}
tests := []struct {
name string
batchPeriod time.Duration
podsCount int
updates []podUpdate
finalDelay time.Duration
wantRequestCount int
}{
{
name: "three updates with no batching",
batchPeriod: 0 * time.Second,
podsCount: 10,
updates: []podUpdate{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
podName: "pod0",
podIP: "10.0.0.0",
},
{
delay: 100 * time.Millisecond,
podName: "pod1",
podIP: "10.0.0.1",
},
{
delay: 100 * time.Millisecond,
podName: "pod2",
podIP: "10.0.0.2",
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 3,
},
{
name: "three updates in one batch",
batchPeriod: 1 * time.Second,
podsCount: 10,
updates: []podUpdate{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
podName: "pod0",
podIP: "10.0.0.0",
},
{
delay: 100 * time.Millisecond,
podName: "pod1",
podIP: "10.0.0.1",
},
{
delay: 100 * time.Millisecond,
podName: "pod2",
podIP: "10.0.0.2",
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 1,
},
{
name: "three updates in two batches",
batchPeriod: 1 * time.Second,
podsCount: 10,
updates: []podUpdate{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
podName: "pod0",
podIP: "10.0.0.0",
},
{
delay: 100 * time.Millisecond,
podName: "pod1",
podIP: "10.0.0.1",
},
{
delay: 1 * time.Second,
podName: "pod2",
podIP: "10.0.0.2",
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 2,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ns := "other"
resourceVersion := 1
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, tc.batchPeriod)
endpoints.podsSynced = alwaysReady
endpoints.servicesSynced = alwaysReady
endpoints.endpointsSynced = alwaysReady
endpoints.workerLoopPeriod = 10 * time.Millisecond
go endpoints.Run(tCtx, 1)
addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, ipv4only)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80}},
},
})
for _, update := range tc.updates {
time.Sleep(update.delay)
old, exists, err := endpoints.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName))
if err != nil {
t.Fatalf("Error while retrieving old value of %q: %v", update.podName, err)
}
if !exists {
t.Fatalf("Pod %q doesn't exist", update.podName)
}
oldPod := old.(*v1.Pod)
newPod := oldPod.DeepCopy()
newPod.Status.PodIP = update.podIP
newPod.Status.PodIPs[0].IP = update.podIP
newPod.ResourceVersion = strconv.Itoa(resourceVersion)
resourceVersion++
endpoints.podStore.Update(newPod)
endpoints.updatePod(oldPod, newPod)
}
time.Sleep(tc.finalDelay)
endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount)
})
}
}
// TestPodAddsBatching verifies that endpoint updates caused by pod addition are batched together.
// This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
// TODO(mborsz): Migrate this test to mock clock when possible.
func TestPodAddsBatching(t *testing.T) {
type podAdd struct {
delay time.Duration
}
tests := []struct {
name string
batchPeriod time.Duration
adds []podAdd
finalDelay time.Duration
wantRequestCount int
}{
{
name: "three adds with no batching",
batchPeriod: 0 * time.Second,
adds: []podAdd{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
},
{
delay: 100 * time.Millisecond,
},
{
delay: 100 * time.Millisecond,
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 3,
},
{
name: "three adds in one batch",
batchPeriod: 1 * time.Second,
adds: []podAdd{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
},
{
delay: 100 * time.Millisecond,
},
{
delay: 100 * time.Millisecond,
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 1,
},
{
name: "three adds in two batches",
batchPeriod: 1 * time.Second,
adds: []podAdd{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
},
{
delay: 100 * time.Millisecond,
},
{
delay: 1 * time.Second,
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 2,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, tc.batchPeriod)
endpoints.podsSynced = alwaysReady
endpoints.servicesSynced = alwaysReady
endpoints.endpointsSynced = alwaysReady
endpoints.workerLoopPeriod = 10 * time.Millisecond
go endpoints.Run(tCtx, 1)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80}},
},
})
for i, add := range tc.adds {
time.Sleep(add.delay)
p := testPod(ns, i, 1, true, ipv4only)
endpoints.podStore.Add(p)
endpoints.addPod(p)
}
time.Sleep(tc.finalDelay)
endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount)
})
}
}
// TestPodDeleteBatching verifies that endpoint updates caused by pod deletion are batched together.
// This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
// TODO(mborsz): Migrate this test to mock clock when possible.
func TestPodDeleteBatching(t *testing.T) {
type podDelete struct {
delay time.Duration
podName string
}
tests := []struct {
name string
batchPeriod time.Duration
podsCount int
deletes []podDelete
finalDelay time.Duration
wantRequestCount int
}{
{
name: "three deletes with no batching",
batchPeriod: 0 * time.Second,
podsCount: 10,
deletes: []podDelete{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
podName: "pod0",
},
{
delay: 100 * time.Millisecond,
podName: "pod1",
},
{
delay: 100 * time.Millisecond,
podName: "pod2",
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 3,
},
{
name: "three deletes in one batch",
batchPeriod: 1 * time.Second,
podsCount: 10,
deletes: []podDelete{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
podName: "pod0",
},
{
delay: 100 * time.Millisecond,
podName: "pod1",
},
{
delay: 100 * time.Millisecond,
podName: "pod2",
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 1,
},
{
name: "three deletes in two batches",
batchPeriod: 1 * time.Second,
podsCount: 10,
deletes: []podDelete{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
podName: "pod0",
},
{
delay: 100 * time.Millisecond,
podName: "pod1",
},
{
delay: 1 * time.Second,
podName: "pod2",
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 2,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, tc.batchPeriod)
endpoints.podsSynced = alwaysReady
endpoints.servicesSynced = alwaysReady
endpoints.endpointsSynced = alwaysReady
endpoints.workerLoopPeriod = 10 * time.Millisecond
go endpoints.Run(tCtx, 1)
addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, ipv4only)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80}},
},
})
for _, update := range tc.deletes {
time.Sleep(update.delay)
old, exists, err := endpoints.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName))
if err != nil {
t.Fatalf("Error while retrieving old value of %q: %v", update.podName, err)
}
if !exists {
t.Fatalf("Pod %q doesn't exist", update.podName)
}
endpoints.podStore.Delete(old)
endpoints.deletePod(old)
}
time.Sleep(tc.finalDelay)
endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount)
})
}
}
func TestSyncEndpointsServiceNotFound(t *testing.T) {
ns := metav1.NamespaceDefault
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
endpoints := newController(tCtx, testServer.URL, 0)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
})
err := endpoints.syncService(tCtx, ns+"/foo")
if err != nil {
t.Errorf("Unexpected error syncing service %v", err)
}
endpointsHandler.ValidateRequestCount(t, 1)
endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "DELETE", nil)
}
func TestSyncServiceOverCapacity(t *testing.T) {
testCases := []struct {
name string
startingAnnotation *string
numExisting int
numDesired int
numDesiredNotReady int
numExpectedReady int
numExpectedNotReady int
expectedAnnotation bool
}{{
name: "empty",
startingAnnotation: nil,
numExisting: 0,
numDesired: 0,
numExpectedReady: 0,
numExpectedNotReady: 0,
expectedAnnotation: false,
}, {
name: "annotation added past capacity, < than maxCapacity of Ready Addresses",
startingAnnotation: nil,
numExisting: maxCapacity - 1,
numDesired: maxCapacity - 3,
numDesiredNotReady: 4,
numExpectedReady: maxCapacity - 3,
numExpectedNotReady: 3,
expectedAnnotation: true,
}, {
name: "annotation added past capacity, maxCapacity of Ready Addresses ",
startingAnnotation: nil,
numExisting: maxCapacity - 1,
numDesired: maxCapacity,
numDesiredNotReady: 10,
numExpectedReady: maxCapacity,
numExpectedNotReady: 0,
expectedAnnotation: true,
}, {
name: "annotation removed below capacity",
startingAnnotation: pointer.String("truncated"),
numExisting: maxCapacity - 1,
numDesired: maxCapacity - 1,
numDesiredNotReady: 0,
numExpectedReady: maxCapacity - 1,
numExpectedNotReady: 0,
expectedAnnotation: false,
}, {
name: "annotation was set to warning previously, annotation removed at capacity",
startingAnnotation: pointer.String("warning"),
numExisting: maxCapacity,
numDesired: maxCapacity,
numDesiredNotReady: 0,
numExpectedReady: maxCapacity,
numExpectedNotReady: 0,
expectedAnnotation: false,
}, {
name: "annotation was set to warning previously but still over capacity",
startingAnnotation: pointer.String("warning"),
numExisting: maxCapacity + 1,
numDesired: maxCapacity + 1,
numDesiredNotReady: 0,
numExpectedReady: maxCapacity,
numExpectedNotReady: 0,
expectedAnnotation: true,
}, {
name: "annotation removed at capacity",
startingAnnotation: pointer.String("truncated"),
numExisting: maxCapacity,
numDesired: maxCapacity,
numDesiredNotReady: 0,
numExpectedReady: maxCapacity,
numExpectedNotReady: 0,
expectedAnnotation: false,
}, {
name: "no endpoints change, annotation value corrected",
startingAnnotation: pointer.String("invalid"),
numExisting: maxCapacity + 1,
numDesired: maxCapacity + 1,
numDesiredNotReady: 0,
numExpectedReady: maxCapacity,
numExpectedNotReady: 0,
expectedAnnotation: true,
}}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
tCtx := ktesting.Init(t)
ns := "test"
client, c := newFakeController(tCtx, 0*time.Second)
addPods(c.podStore, ns, tc.numDesired, 1, tc.numDesiredNotReady, ipv4only)
pods := c.podStore.List()
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80}},
},
}
c.serviceStore.Add(svc)
subset := v1.EndpointSubset{}
for i := 0; i < tc.numExisting; i++ {
pod := pods[i].(*v1.Pod)
epa, _ := podToEndpointAddressForService(svc, pod)
subset.Addresses = append(subset.Addresses, *epa)
}
endpoints := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: svc.Name,
Namespace: ns,
ResourceVersion: "1",
Annotations: map[string]string{},
},
Subsets: []v1.EndpointSubset{subset},
}
if tc.startingAnnotation != nil {
endpoints.Annotations[v1.EndpointsOverCapacity] = *tc.startingAnnotation
}
c.endpointsStore.Add(endpoints)
_, err := client.CoreV1().Endpoints(ns).Create(tCtx, endpoints, metav1.CreateOptions{})
if err != nil {
t.Fatalf("unexpected error creating endpoints: %v", err)
}
err = c.syncService(tCtx, fmt.Sprintf("%s/%s", ns, svc.Name))
if err != nil {
t.Errorf("Unexpected error syncing service %v", err)
}
actualEndpoints, err := client.CoreV1().Endpoints(ns).Get(tCtx, endpoints.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("unexpected error getting endpoints: %v", err)
}
actualAnnotation, ok := actualEndpoints.Annotations[v1.EndpointsOverCapacity]
if tc.expectedAnnotation {
if !ok {
t.Errorf("Expected EndpointsOverCapacity annotation to be set")
} else if actualAnnotation != "truncated" {
t.Errorf("Expected EndpointsOverCapacity annotation to be 'truncated', got %s", actualAnnotation)
}
} else {
if ok {
t.Errorf("Expected EndpointsOverCapacity annotation not to be set, got %s", actualAnnotation)
}
}
numActualReady := 0
numActualNotReady := 0
for _, subset := range actualEndpoints.Subsets {
numActualReady += len(subset.Addresses)
numActualNotReady += len(subset.NotReadyAddresses)
}
if numActualReady != tc.numExpectedReady {
t.Errorf("Unexpected number of actual ready Endpoints: got %d endpoints, want %d endpoints", numActualReady, tc.numExpectedReady)
}
if numActualNotReady != tc.numExpectedNotReady {
t.Errorf("Unexpected number of actual not ready Endpoints: got %d endpoints, want %d endpoints", numActualNotReady, tc.numExpectedNotReady)
}
})
}
}
func TestTruncateEndpoints(t *testing.T) {
testCases := []struct {
desc string
// subsetsReady, subsetsNotReady, expectedReady, expectedNotReady
// must all be the same length
subsetsReady []int
subsetsNotReady []int
expectedReady []int
expectedNotReady []int
}{{
desc: "empty",
subsetsReady: []int{},
subsetsNotReady: []int{},
expectedReady: []int{},
expectedNotReady: []int{},
}, {
desc: "total endpoints < max capacity",
subsetsReady: []int{50, 100, 100, 100, 100},
subsetsNotReady: []int{50, 100, 100, 100, 100},
expectedReady: []int{50, 100, 100, 100, 100},
expectedNotReady: []int{50, 100, 100, 100, 100},
}, {
desc: "total endpoints = max capacity",
subsetsReady: []int{100, 100, 100, 100, 100},
subsetsNotReady: []int{100, 100, 100, 100, 100},
expectedReady: []int{100, 100, 100, 100, 100},
expectedNotReady: []int{100, 100, 100, 100, 100},
}, {
desc: "total ready endpoints < max capacity, but total endpoints > max capacity",
subsetsReady: []int{90, 110, 50, 10, 20},
subsetsNotReady: []int{101, 200, 200, 201, 298},
expectedReady: []int{90, 110, 50, 10, 20},
expectedNotReady: []int{73, 144, 144, 145, 214},
}, {
desc: "total ready endpoints > max capacity",
subsetsReady: []int{205, 400, 402, 400, 693},
subsetsNotReady: []int{100, 200, 200, 200, 300},
expectedReady: []int{98, 191, 192, 191, 328},
expectedNotReady: []int{0, 0, 0, 0, 0},
}}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
var subsets []v1.EndpointSubset
for subsetIndex, numReady := range tc.subsetsReady {
subset := v1.EndpointSubset{}
for i := 0; i < numReady; i++ {
subset.Addresses = append(subset.Addresses, v1.EndpointAddress{})
}
numNotReady := tc.subsetsNotReady[subsetIndex]
for i := 0; i < numNotReady; i++ {
subset.NotReadyAddresses = append(subset.NotReadyAddresses, v1.EndpointAddress{})
}
subsets = append(subsets, subset)
}
endpoints := &v1.Endpoints{Subsets: subsets}
truncateEndpoints(endpoints)
for i, subset := range endpoints.Subsets {
if len(subset.Addresses) != tc.expectedReady[i] {
t.Errorf("Unexpected number of actual ready Endpoints for subset %d: got %d endpoints, want %d endpoints", i, len(subset.Addresses), tc.expectedReady[i])
}
if len(subset.NotReadyAddresses) != tc.expectedNotReady[i] {
t.Errorf("Unexpected number of actual not ready Endpoints for subset %d: got %d endpoints, want %d endpoints", i, len(subset.NotReadyAddresses), tc.expectedNotReady[i])
}
}
})
}
}
func TestEndpointPortFromServicePort(t *testing.T) {
http := pointer.String("http")
testCases := map[string]struct {
serviceAppProtocol *string
expectedEndpointsAppProtocol *string
}{
"empty app protocol": {
serviceAppProtocol: nil,
expectedEndpointsAppProtocol: nil,
},
"http app protocol": {
serviceAppProtocol: http,
expectedEndpointsAppProtocol: http,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
epp := endpointPortFromServicePort(&v1.ServicePort{Name: "test", AppProtocol: tc.serviceAppProtocol}, 80)
if epp.AppProtocol != tc.expectedEndpointsAppProtocol {
t.Errorf("Expected Endpoints AppProtocol to be %s, got %s", stringVal(tc.expectedEndpointsAppProtocol), stringVal(epp.AppProtocol))
}
})
}
}
// TestMultipleServiceChanges tests that endpoints that are not created because of an out of sync endpoints cache are eventually recreated
// A service will be created. After the endpoints exist, the service will be deleted and the endpoints will not be deleted from the cache immediately.
// After the service is recreated, the endpoints will be deleted replicating an out of sync cache. Expect that eventually the endpoints will be recreated.
func TestMultipleServiceChanges(t *testing.T) {
ns := metav1.NamespaceDefault
expectedSubsets := []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{
{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
},
}}
endpoint := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, ResourceVersion: "1"},
Subsets: expectedSubsets,
}
controller := &endpointController{}
blockDelete := make(chan struct{})
blockNextAction := make(chan struct{})
stopChan := make(chan struct{})
testServer := makeBlockingEndpointTestServer(t, controller, endpoint, nil, blockDelete, blockNextAction, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
*controller = *newController(tCtx, testServer.URL, 0*time.Second)
addPods(controller.podStore, ns, 1, 1, 0, ipv4only)
go func() { controller.Run(tCtx, 1) }()
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
ClusterIP: "None",
Ports: nil,
},
}
controller.serviceStore.Add(svc)
controller.onServiceUpdate(svc)
// blockNextAction should eventually unblock once server gets endpoint request.
waitForChanReceive(t, 1*time.Second, blockNextAction, "Service Add should have caused a request to be sent to the test server")
controller.serviceStore.Delete(svc)
controller.onServiceDelete(svc)
waitForChanReceive(t, 1*time.Second, blockNextAction, "Service Delete should have caused a request to be sent to the test server")
// If endpoints cache has not updated before service update is registered
// Services add will not trigger a Create endpoint request.
controller.serviceStore.Add(svc)
controller.onServiceUpdate(svc)
// Ensure the work queue has been processed by looping for up to a second to prevent flakes.
wait.PollImmediate(50*time.Millisecond, 1*time.Second, func() (bool, error) {
return controller.queue.Len() == 0, nil
})
// Cause test server to delete endpoints
close(blockDelete)
waitForChanReceive(t, 1*time.Second, blockNextAction, "Endpoint should have been recreated")
close(blockNextAction)
close(stopChan)
}
// TestMultiplePodChanges tests that endpoints that are not updated because of an out of sync endpoints cache are
// eventually resynced after multiple Pod changes.
func TestMultiplePodChanges(t *testing.T) {
ns := metav1.NamespaceDefault
readyEndpoints := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, ResourceVersion: "1"},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{
{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
},
Ports: []v1.EndpointPort{{Port: 8080, Protocol: v1.ProtocolTCP}},
}},
}
notReadyEndpoints := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, ResourceVersion: "2"},
Subsets: []v1.EndpointSubset{{
NotReadyAddresses: []v1.EndpointAddress{
{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
},
Ports: []v1.EndpointPort{{Port: 8080, Protocol: v1.ProtocolTCP}},
}},
}
controller := &endpointController{}
blockUpdate := make(chan struct{})
blockNextAction := make(chan struct{})
stopChan := make(chan struct{})
testServer := makeBlockingEndpointTestServer(t, controller, notReadyEndpoints, blockUpdate, nil, blockNextAction, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
*controller = *newController(tCtx, testServer.URL, 0*time.Second)
pod := testPod(ns, 0, 1, true, ipv4only)
_ = controller.podStore.Add(pod)
_ = controller.endpointsStore.Add(readyEndpoints)
_ = controller.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
ClusterIP: "10.0.0.1",
Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
},
})
go func() { controller.Run(tCtx, 1) }()
// Rapidly update the Pod: Ready -> NotReady -> Ready.
pod2 := pod.DeepCopy()
pod2.ResourceVersion = "2"
pod2.Status.Conditions[0].Status = v1.ConditionFalse
_ = controller.podStore.Update(pod2)
controller.updatePod(pod, pod2)
// blockNextAction should eventually unblock once server gets endpoints request.
waitForChanReceive(t, 1*time.Second, blockNextAction, "Pod Update should have caused a request to be sent to the test server")
// The endpoints update hasn't been applied to the cache yet.
pod3 := pod.DeepCopy()
pod3.ResourceVersion = "3"
pod3.Status.Conditions[0].Status = v1.ConditionTrue
_ = controller.podStore.Update(pod3)
controller.updatePod(pod2, pod3)
// It shouldn't get endpoints request as the endpoints in the cache is out-of-date.
timer := time.NewTimer(100 * time.Millisecond)
select {
case <-timer.C:
case <-blockNextAction:
t.Errorf("Pod Update shouldn't have caused a request to be sent to the test server")
}
// Applying the endpoints update to the cache should cause test server to update endpoints.
close(blockUpdate)
waitForChanReceive(t, 1*time.Second, blockNextAction, "Endpoints should have been updated")
close(blockNextAction)
close(stopChan)
}
func TestSyncServiceAddresses(t *testing.T) {
makeService := func(tolerateUnready bool) *v1.Service {
return &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
PublishNotReadyAddresses: tolerateUnready,
Type: v1.ServiceTypeClusterIP,
ClusterIP: "1.1.1.1",
Ports: []v1.ServicePort{{Port: 80}},
},
}
}
makePod := func(phase v1.PodPhase, isReady bool, terminating bool) *v1.Pod {
statusCondition := v1.ConditionFalse
if isReady {
statusCondition = v1.ConditionTrue
}
now := metav1.Now()
deletionTimestamp := &now
if !terminating {
deletionTimestamp = nil
}
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "fakepod",
DeletionTimestamp: deletionTimestamp,
Labels: map[string]string{"foo": "bar"},
},
Spec: v1.PodSpec{
Containers: []v1.Container{{Ports: []v1.ContainerPort{
{Name: "port1", ContainerPort: int32(8080)},
}}},
},
Status: v1.PodStatus{
Phase: phase,
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: statusCondition,
},
},
PodIP: "10.1.1.1",
PodIPs: []v1.PodIP{
{IP: "10.1.1.1"},
},
},
}
}
testCases := []struct {
name string
pod *v1.Pod
service *v1.Service
expectedReady int
expectedUnready int
}{
{
name: "pod running phase",
pod: makePod(v1.PodRunning, true, false),
service: makeService(false),
expectedReady: 1,
expectedUnready: 0,
},
{
name: "pod running phase being deleted",
pod: makePod(v1.PodRunning, true, true),
service: makeService(false),
expectedReady: 0,
expectedUnready: 0,
},
{
name: "pod unknown phase container ready",
pod: makePod(v1.PodUnknown, true, false),
service: makeService(false),
expectedReady: 1,
expectedUnready: 0,
},
{
name: "pod unknown phase container ready being deleted",
pod: makePod(v1.PodUnknown, true, true),
service: makeService(false),
expectedReady: 0,
expectedUnready: 0,
},
{
name: "pod pending phase container ready",
pod: makePod(v1.PodPending, true, false),
service: makeService(false),
expectedReady: 1,
expectedUnready: 0,
},
{
name: "pod pending phase container ready being deleted",
pod: makePod(v1.PodPending, true, true),
service: makeService(false),
expectedReady: 0,
expectedUnready: 0,
},
{
name: "pod unknown phase container not ready",
pod: makePod(v1.PodUnknown, false, false),
service: makeService(false),
expectedReady: 0,
expectedUnready: 1,
},
{
name: "pod pending phase container not ready",
pod: makePod(v1.PodPending, false, false),
service: makeService(false),
expectedReady: 0,
expectedUnready: 1,
},
{
name: "pod failed phase",
pod: makePod(v1.PodFailed, false, false),
service: makeService(false),
expectedReady: 0,
expectedUnready: 0,
},
{
name: "pod succeeded phase",
pod: makePod(v1.PodSucceeded, false, false),
service: makeService(false),
expectedReady: 0,
expectedUnready: 0,
},
{
name: "pod running phase and tolerate unready",
pod: makePod(v1.PodRunning, false, false),
service: makeService(true),
expectedReady: 1,
expectedUnready: 0,
},
{
name: "pod running phase and tolerate unready being deleted",
pod: makePod(v1.PodRunning, false, true),
service: makeService(true),
expectedReady: 1,
expectedUnready: 0,
},
{
name: "pod unknown phase and tolerate unready",
pod: makePod(v1.PodUnknown, false, false),
service: makeService(true),
expectedReady: 1,
expectedUnready: 0,
},
{
name: "pod unknown phase and tolerate unready being deleted",
pod: makePod(v1.PodUnknown, false, true),
service: makeService(true),
expectedReady: 1,
expectedUnready: 0,
},
{
name: "pod pending phase and tolerate unready",
pod: makePod(v1.PodPending, false, false),
service: makeService(true),
expectedReady: 1,
expectedUnready: 0,
},
{
name: "pod pending phase and tolerate unready being deleted",
pod: makePod(v1.PodPending, false, true),
service: makeService(true),
expectedReady: 1,
expectedUnready: 0,
},
{
name: "pod failed phase and tolerate unready",
pod: makePod(v1.PodFailed, false, false),
service: makeService(true),
expectedReady: 0,
expectedUnready: 0,
},
{
name: "pod succeeded phase and tolerate unready endpoints",
pod: makePod(v1.PodSucceeded, false, false),
service: makeService(true),
expectedReady: 0,
expectedUnready: 0,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
tCtx := ktesting.Init(t)
ns := tc.service.Namespace
client, c := newFakeController(tCtx, 0*time.Second)
err := c.podStore.Add(tc.pod)
if err != nil {
t.Errorf("Unexpected error adding pod %v", err)
}
err = c.serviceStore.Add(tc.service)
if err != nil {
t.Errorf("Unexpected error adding service %v", err)
}
err = c.syncService(tCtx, fmt.Sprintf("%s/%s", ns, tc.service.Name))
if err != nil {
t.Errorf("Unexpected error syncing service %v", err)
}
endpoints, err := client.CoreV1().Endpoints(ns).Get(tCtx, tc.service.Name, metav1.GetOptions{})
if err != nil {
t.Errorf("Unexpected error %v", err)
}
readyEndpoints := 0
unreadyEndpoints := 0
for _, subset := range endpoints.Subsets {
readyEndpoints += len(subset.Addresses)
unreadyEndpoints += len(subset.NotReadyAddresses)
}
if tc.expectedReady != readyEndpoints {
t.Errorf("Expected %d ready endpoints, got %d", tc.expectedReady, readyEndpoints)
}
if tc.expectedUnready != unreadyEndpoints {
t.Errorf("Expected %d ready endpoints, got %d", tc.expectedUnready, unreadyEndpoints)
}
})
}
}
func TestEndpointsDeletionEvents(t *testing.T) {
ns := metav1.NamespaceDefault
testServer, _ := makeTestServer(t, ns)
defer testServer.Close()
tCtx := ktesting.Init(t)
controller := newController(tCtx, testServer.URL, 0)
store := controller.endpointsStore
ep1 := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "rv1",
},
}
// Test Unexpected and Expected Deletes
store.Delete(ep1)
controller.onEndpointsDelete(ep1)
if controller.queue.Len() != 1 {
t.Errorf("Expected one service to be in the queue, found %d", controller.queue.Len())
}
}
func stringVal(str *string) string {
if str == nil {
return "nil"
}
return *str
}
// waitForChanReceive blocks up to the timeout waiting for the receivingChan to receive
func waitForChanReceive(t *testing.T, timeout time.Duration, receivingChan chan struct{}, errorMsg string) {
timer := time.NewTimer(timeout)
select {
case <-timer.C:
t.Error(errorMsg)
case <-receivingChan:
}
}
func TestEndpointSubsetsEqualIgnoreResourceVersion(t *testing.T) {
copyAndMutateEndpointSubset := func(orig *v1.EndpointSubset, mutator func(*v1.EndpointSubset)) *v1.EndpointSubset {
newSubSet := orig.DeepCopy()
mutator(newSubSet)
return newSubSet
}
es1 := &v1.EndpointSubset{
Addresses: []v1.EndpointAddress{
{
IP: "1.1.1.1",
TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1-1", Namespace: "ns", ResourceVersion: "1"},
},
},
NotReadyAddresses: []v1.EndpointAddress{
{
IP: "1.1.1.2",
TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1-2", Namespace: "ns2", ResourceVersion: "2"},
},
},
Ports: []v1.EndpointPort{{Port: 8081, Protocol: "TCP"}},
}
es2 := &v1.EndpointSubset{
Addresses: []v1.EndpointAddress{
{
IP: "2.2.2.1",
TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2-1", Namespace: "ns", ResourceVersion: "3"},
},
},
NotReadyAddresses: []v1.EndpointAddress{
{
IP: "2.2.2.2",
TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2-2", Namespace: "ns2", ResourceVersion: "4"},
},
},
Ports: []v1.EndpointPort{{Port: 8082, Protocol: "TCP"}},
}
tests := []struct {
name string
subsets1 []v1.EndpointSubset
subsets2 []v1.EndpointSubset
expected bool
}{
{
name: "Subsets removed",
subsets1: []v1.EndpointSubset{*es1, *es2},
subsets2: []v1.EndpointSubset{*es1},
expected: false,
},
{
name: "Ready Pod IP changed",
subsets1: []v1.EndpointSubset{*es1, *es2},
subsets2: []v1.EndpointSubset{*copyAndMutateEndpointSubset(es1, func(es *v1.EndpointSubset) {
es.Addresses[0].IP = "1.1.1.10"
}), *es2},
expected: false,
},
{
name: "NotReady Pod IP changed",
subsets1: []v1.EndpointSubset{*es1, *es2},
subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es2, func(es *v1.EndpointSubset) {
es.NotReadyAddresses[0].IP = "2.2.2.10"
})},
expected: false,
},
{
name: "Pod ResourceVersion changed",
subsets1: []v1.EndpointSubset{*es1, *es2},
subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es2, func(es *v1.EndpointSubset) {
es.Addresses[0].TargetRef.ResourceVersion = "100"
})},
expected: true,
},
{
name: "Pod ResourceVersion removed",
subsets1: []v1.EndpointSubset{*es1, *es2},
subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es2, func(es *v1.EndpointSubset) {
es.Addresses[0].TargetRef.ResourceVersion = ""
})},
expected: true,
},
{
name: "Ports changed",
subsets1: []v1.EndpointSubset{*es1, *es2},
subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es1, func(es *v1.EndpointSubset) {
es.Ports[0].Port = 8082
})},
expected: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := endpointSubsetsEqualIgnoreResourceVersion(tt.subsets1, tt.subsets2); got != tt.expected {
t.Errorf("semanticIgnoreResourceVersion.DeepEqual() = %v, expected %v", got, tt.expected)
}
})
}
}