
/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package endpoint

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"reflect"
	"strconv"
	"testing"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/diff"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/apimachinery/pkg/util/wait"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/client-go/informers"
	clientset "k8s.io/client-go/kubernetes"
	clientscheme "k8s.io/client-go/kubernetes/scheme"
	restclient "k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	utiltesting "k8s.io/client-go/util/testing"
	featuregatetesting "k8s.io/component-base/featuregate/testing"
	endptspkg "k8s.io/kubernetes/pkg/api/v1/endpoints"
	api "k8s.io/kubernetes/pkg/apis/core"
	"k8s.io/kubernetes/pkg/controller"
	endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
	"k8s.io/kubernetes/pkg/features"
)

var alwaysReady = func() bool { return true }
var neverReady = func() bool { return false }
var emptyNodeName string
var triggerTime = time.Date(2018, 01, 01, 0, 0, 0, 0, time.UTC)
var triggerTimeString = triggerTime.Format(time.RFC3339Nano)
var oldTriggerTimeString = triggerTime.Add(-time.Hour).Format(time.RFC3339Nano)

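// testPod builds a pod in the given namespace whose name and IP are derived
// from id, with nPorts container ports, a PodReady condition matching isReady,
// and a secondary IPv6 pod IP when makeDualstack is set.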
func testPod(namespace string, id int, nPorts int, isReady bool, makeDualstack bool) *v1.Pod {
	p := &v1.Pod{
		TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
		ObjectMeta: metav1.ObjectMeta{
			Namespace: namespace,
			Name:      fmt.Sprintf("pod%d", id),
			Labels:    map[string]string{"foo": "bar"},
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{Ports: []v1.ContainerPort{}}},
		},
		Status: v1.PodStatus{
			PodIP: fmt.Sprintf("1.2.3.%d", 4+id),
			Conditions: []v1.PodCondition{
				{
					Type:   v1.PodReady,
					Status: v1.ConditionTrue,
				},
			},
		},
	}
	if !isReady {
		p.Status.Conditions[0].Status = v1.ConditionFalse
	}
	for j := 0; j < nPorts; j++ {
		p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
			v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)})
	}
	if makeDualstack {
		p.Status.PodIPs = []v1.PodIP{
			{
				IP: p.Status.PodIP,
			},
			{
				IP: fmt.Sprintf("2000::%d", id),
			},
		}
	}

	return p
}

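// addPods adds nPods ready pods and nNotReady not-ready pods to the store,
// each with nPorts container ports.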
func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int, makeDualstack bool) {
	for i := 0; i < nPods+nNotReady; i++ {
		isReady := i < nPods
		pod := testPod(namespace, i, nPorts, isReady, makeDualstack)
		store.Add(pod)
	}
}

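// addNotReadyPodsWithSpecifiedRestartPolicyAndPhase adds nPods not-ready pods
// to the store with the given restart policy and pod phase, so tests can
// exercise how terminal pods are filtered out of endpoints.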
func addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(store cache.Store, namespace string, nPods int, nPorts int, restartPolicy v1.RestartPolicy, podPhase v1.PodPhase) {
	for i := 0; i < nPods; i++ {
		p := &v1.Pod{
			TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
			ObjectMeta: metav1.ObjectMeta{
				Namespace: namespace,
				Name:      fmt.Sprintf("pod%d", i),
				Labels:    map[string]string{"foo": "bar"},
			},
			Spec: v1.PodSpec{
				RestartPolicy: restartPolicy,
				Containers:    []v1.Container{{Ports: []v1.ContainerPort{}}},
			},
			Status: v1.PodStatus{
				PodIP: fmt.Sprintf("1.2.3.%d", 4+i),
				Phase: podPhase,
				Conditions: []v1.PodCondition{
					{
						Type:   v1.PodReady,
						Status: v1.ConditionFalse,
					},
				},
			},
		}
		for j := 0; j < nPorts; j++ {
			p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
				v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)})
		}
		store.Add(p)
	}
}

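// makeTestServer starts an httptest server that accepts endpoints requests for
// the given namespace and fails the test on any other request. It returns the
// server and the fake handler so callers can validate the requests received.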
func makeTestServer(t *testing.T, namespace string) (*httptest.Server, *utiltesting.FakeHandler) {
	fakeEndpointsHandler := utiltesting.FakeHandler{
		StatusCode:   http.StatusOK,
		ResponseBody: runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{}),
	}
	mux := http.NewServeMux()
	if namespace == "" {
		t.Fatal("namespace cannot be empty")
	}
	mux.Handle("/api/v1/namespaces/"+namespace+"/endpoints", &fakeEndpointsHandler)
	mux.Handle("/api/v1/namespaces/"+namespace+"/endpoints/", &fakeEndpointsHandler)
	mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
		t.Errorf("unexpected request: %v", req.RequestURI)
		http.Error(res, "", http.StatusNotFound)
	})
	return httptest.NewServer(mux), &fakeEndpointsHandler
}

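// endpointController wraps an EndpointController together with direct access
// to the informer stores it reads from, so tests can seed state without
// running the informers.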
type endpointController struct {
	*EndpointController
	podStore       cache.Store
	serviceStore   cache.Store
	endpointsStore cache.Store
}

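// newController returns an endpointController backed by a client pointed at
// url, with all informer caches reported as already synced.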
func newController(url string, batchPeriod time.Duration) *endpointController {
	client := clientset.NewForConfigOrDie(&restclient.Config{Host: url, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
	informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
	endpoints := NewEndpointController(informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Services(),
		informerFactory.Core().V1().Endpoints(), client, batchPeriod)
	endpoints.podsSynced = alwaysReady
	endpoints.servicesSynced = alwaysReady
	endpoints.endpointsSynced = alwaysReady
	return &endpointController{
		endpoints,
		informerFactory.Core().V1().Pods().Informer().GetStore(),
		informerFactory.Core().V1().Services().Informer().GetStore(),
		informerFactory.Core().V1().Endpoints().Informer().GetStore(),
	}
}

func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
	ns := metav1.NamespaceDefault
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()
	endpoints := newController(testServer.URL, 0*time.Second)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
			Ports:     []v1.EndpointPort{{Port: 1000}},
		}},
	})
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
		Spec:       v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 80}}},
	})
	endpoints.syncService(ns + "/foo")
	endpointsHandler.ValidateRequestCount(t, 0)
}

func TestSyncEndpointsExistingNilSubsets(t *testing.T) {
	ns := metav1.NamespaceDefault
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()
	endpoints := newController(testServer.URL, 0*time.Second)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
		},
		Subsets: nil,
	})
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{"foo": "bar"},
			Ports:    []v1.ServicePort{{Port: 80}},
		},
	})
	endpoints.syncService(ns + "/foo")
	endpointsHandler.ValidateRequestCount(t, 0)
}

func TestSyncEndpointsExistingEmptySubsets(t *testing.T) {
	ns := metav1.NamespaceDefault
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()
	endpoints := newController(testServer.URL, 0*time.Second)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
		},
		Subsets: []v1.EndpointSubset{},
	})
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{"foo": "bar"},
			Ports:    []v1.ServicePort{{Port: 80}},
		},
	})
	endpoints.syncService(ns + "/foo")
	endpointsHandler.ValidateRequestCount(t, 0)
}

func TestSyncEndpointsNewNoSubsets(t *testing.T) {
	ns := metav1.NamespaceDefault
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()
	endpoints := newController(testServer.URL, 0*time.Second)
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{"foo": "bar"},
			Ports:    []v1.ServicePort{{Port: 80}},
		},
	})
	endpoints.syncService(ns + "/foo")
	endpointsHandler.ValidateRequestCount(t, 1)
}

func TestCheckLeftoverEndpoints(t *testing.T) {
	ns := metav1.NamespaceDefault
	testServer, _ := makeTestServer(t, ns)
	defer testServer.Close()
	endpoints := newController(testServer.URL, 0*time.Second)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
			Ports:     []v1.EndpointPort{{Port: 1000}},
		}},
	})
	endpoints.checkLeftoverEndpoints()
	if e, a := 1, endpoints.queue.Len(); e != a {
		t.Fatalf("Expected %v, got %v", e, a)
	}
	got, _ := endpoints.queue.Get()
	if e, a := ns+"/foo", got; e != a {
		t.Errorf("Expected %v, got %v", e, a)
	}
}

func TestSyncEndpointsProtocolTCP(t *testing.T) {
	ns := "other"
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()
	endpoints := newController(testServer.URL, 0*time.Second)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
			Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
		}},
	})
	addPods(endpoints.podStore, ns, 1, 1, 0, false)
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{},
			Ports:    []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
		},
	})
	endpoints.syncService(ns + "/foo")

	endpointsHandler.ValidateRequestCount(t, 1)
	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Labels: map[string]string{
				v1.IsHeadlessService: "",
			},
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
		}},
	})
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}

func TestSyncEndpointsProtocolUDP(t *testing.T) {
	ns := "other"
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()
	endpoints := newController(testServer.URL, 0*time.Second)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
			Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "UDP"}},
		}},
	})
	addPods(endpoints.podStore, ns, 1, 1, 0, false)
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{},
			Ports:    []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "UDP"}},
		},
	})
	endpoints.syncService(ns + "/foo")

	endpointsHandler.ValidateRequestCount(t, 1)
	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Labels: map[string]string{
				v1.IsHeadlessService: "",
			},
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "UDP"}},
		}},
	})
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}

func TestSyncEndpointsProtocolSCTP(t *testing.T) {
	ns := "other"
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()
	endpoints := newController(testServer.URL, 0*time.Second)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
			Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "SCTP"}},
		}},
	})
	addPods(endpoints.podStore, ns, 1, 1, 0, false)
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{},
			Ports:    []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "SCTP"}},
		},
	})
	endpoints.syncService(ns + "/foo")

	endpointsHandler.ValidateRequestCount(t, 1)
	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Labels: map[string]string{
				v1.IsHeadlessService: "",
			},
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "SCTP"}},
		}},
	})
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}

func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
	ns := "other"
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()
	endpoints := newController(testServer.URL, 0*time.Second)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
		},
		Subsets: []v1.EndpointSubset{},
	})
	addPods(endpoints.podStore, ns, 1, 1, 0, false)
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{},
			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
		},
	})
	endpoints.syncService(ns + "/foo")

	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Labels: map[string]string{
				v1.IsHeadlessService: "",
			},
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
		}},
	})
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}

func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
	ns := "other"
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()
	endpoints := newController(testServer.URL, 0*time.Second)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
		},
		Subsets: []v1.EndpointSubset{},
	})
	addPods(endpoints.podStore, ns, 0, 1, 1, false)
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{},
			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
		},
	})
	endpoints.syncService(ns + "/foo")

	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Labels: map[string]string{
				v1.IsHeadlessService: "",
			},
		},
		Subsets: []v1.EndpointSubset{{
			NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
			Ports:             []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
		}},
	})
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}

func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
	ns := "other"
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()
	endpoints := newController(testServer.URL, 0*time.Second)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
		},
		Subsets: []v1.EndpointSubset{},
	})
	addPods(endpoints.podStore, ns, 1, 1, 1, false)
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{},
			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
		},
	})
	endpoints.syncService(ns + "/foo")

	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Labels: map[string]string{
				v1.IsHeadlessService: "",
			},
		},
		Subsets: []v1.EndpointSubset{{
			Addresses:         []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
			NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}}},
			Ports:             []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
		}},
	})
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}

func TestSyncEndpointsItemsPreexisting(t *testing.T) {
	ns := "bar"
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()
	endpoints := newController(testServer.URL, 0*time.Second)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
			Ports:     []v1.EndpointPort{{Port: 1000}},
		}},
	})
	addPods(endpoints.podStore, ns, 1, 1, 0, false)
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{"foo": "bar"},
			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
		},
	})
	endpoints.syncService(ns + "/foo")

	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Labels: map[string]string{
				v1.IsHeadlessService: "",
			},
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
		}},
	})
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}

func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
	ns := metav1.NamespaceDefault
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()
	endpoints := newController(testServer.URL, 0*time.Second)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			ResourceVersion: "1",
			Name:            "foo",
			Namespace:       ns,
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
		}},
	})
	addPods(endpoints.podStore, metav1.NamespaceDefault, 1, 1, 0, false)
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: metav1.NamespaceDefault},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{"foo": "bar"},
			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
		},
	})
	endpoints.syncService(ns + "/foo")
	endpointsHandler.ValidateRequestCount(t, 0)
}

func TestSyncEndpointsItems(t *testing.T) {
	ns := "other"
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()
	endpoints := newController(testServer.URL, 0*time.Second)
	addPods(endpoints.podStore, ns, 3, 2, 0, false)
	addPods(endpoints.podStore, "blah", 5, 2, 0, false) // make sure these aren't found!

	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{"foo": "bar"},
			Ports: []v1.ServicePort{
				{Name: "port0", Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)},
				{Name: "port1", Port: 88, Protocol: "TCP", TargetPort: intstr.FromInt(8088)},
			},
		},
	})
	endpoints.syncService("other/foo")

	expectedSubsets := []v1.EndpointSubset{{
		Addresses: []v1.EndpointAddress{
			{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
			{IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}},
			{IP: "1.2.3.6", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}},
		},
		Ports: []v1.EndpointPort{
			{Name: "port0", Port: 8080, Protocol: "TCP"},
			{Name: "port1", Port: 8088, Protocol: "TCP"},
		},
	}}
	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			ResourceVersion: "",
			Name:            "foo",
			Labels: map[string]string{
				v1.IsHeadlessService: "",
			},
		},
		Subsets: endptspkg.SortSubsets(expectedSubsets),
	})
	endpointsHandler.ValidateRequestCount(t, 1)
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints", "POST", &data)
}

func TestSyncEndpointsItemsWithLabels(t *testing.T) {
	ns := "other"
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()
	endpoints := newController(testServer.URL, 0*time.Second)
	addPods(endpoints.podStore, ns, 3, 2, 0, false)
	serviceLabels := map[string]string{"foo": "bar"}
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "foo",
			Namespace: ns,
			Labels:    serviceLabels,
		},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{"foo": "bar"},
			Ports: []v1.ServicePort{
				{Name: "port0", Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)},
				{Name: "port1", Port: 88, Protocol: "TCP", TargetPort: intstr.FromInt(8088)},
			},
		},
	})
	endpoints.syncService(ns + "/foo")

	expectedSubsets := []v1.EndpointSubset{{
		Addresses: []v1.EndpointAddress{
			{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
			{IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}},
			{IP: "1.2.3.6", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}},
		},
		Ports: []v1.EndpointPort{
			{Name: "port0", Port: 8080, Protocol: "TCP"},
			{Name: "port1", Port: 8088, Protocol: "TCP"},
		},
	}}

	serviceLabels[v1.IsHeadlessService] = ""
	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			ResourceVersion: "",
			Name:            "foo",
			Labels:          serviceLabels,
		},
		Subsets: endptspkg.SortSubsets(expectedSubsets),
	})
	endpointsHandler.ValidateRequestCount(t, 1)
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints", "POST", &data)
}

func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
	ns := "bar"
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()
	endpoints := newController(testServer.URL, 0*time.Second)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Labels: map[string]string{
				"foo": "bar",
			},
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
			Ports:     []v1.EndpointPort{{Port: 1000}},
		}},
	})
	addPods(endpoints.podStore, ns, 1, 1, 0, false)
	serviceLabels := map[string]string{"baz": "blah"}
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "foo",
			Namespace: ns,
			Labels:    serviceLabels,
		},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{"foo": "bar"},
			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
		},
	})
	endpoints.syncService(ns + "/foo")

	serviceLabels[v1.IsHeadlessService] = ""
	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Labels:          serviceLabels,
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
		}},
	})
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}

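// TestWaitsForAllInformersToBeSynced2 verifies that the controller writes no
// endpoints until the pod, service, and endpoints informer caches have all
// reported synced.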
func TestWaitsForAllInformersToBeSynced2(t *testing.T) {
	var tests = []struct {
		podsSynced            func() bool
		servicesSynced        func() bool
		endpointsSynced       func() bool
		shouldUpdateEndpoints bool
	}{
		{neverReady, alwaysReady, alwaysReady, false},
		{alwaysReady, neverReady, alwaysReady, false},
		{alwaysReady, alwaysReady, neverReady, false},
		{alwaysReady, alwaysReady, alwaysReady, true},
	}

	for _, test := range tests {
		func() {
			ns := "other"
			testServer, endpointsHandler := makeTestServer(t, ns)
			defer testServer.Close()
			endpoints := newController(testServer.URL, 0*time.Second)
			addPods(endpoints.podStore, ns, 1, 1, 0, false)

			service := &v1.Service{
				ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
				Spec: v1.ServiceSpec{
					Selector: map[string]string{},
					Ports:    []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
				},
			}
			endpoints.serviceStore.Add(service)
			endpoints.onServiceUpdate(service)
			endpoints.podsSynced = test.podsSynced
			endpoints.servicesSynced = test.servicesSynced
			endpoints.endpointsSynced = test.endpointsSynced
			endpoints.workerLoopPeriod = 10 * time.Millisecond
			stopCh := make(chan struct{})
			defer close(stopCh)
			go endpoints.Run(1, stopCh)

			// cache.WaitForNamedCacheSync has a 100ms poll period, and the endpoints worker has a 10ms period.
			// To ensure we get all updates, including unexpected ones, we need to wait at least as long as
			// a single cache sync period and worker period, with some fudge room.
			time.Sleep(150 * time.Millisecond)
			if test.shouldUpdateEndpoints {
				// Ensure the work queue has been processed by looping for up to a second to prevent flakes.
				wait.PollImmediate(50*time.Millisecond, 1*time.Second, func() (bool, error) {
					return endpoints.queue.Len() == 0, nil
				})
				endpointsHandler.ValidateRequestCount(t, 1)
			} else {
				endpointsHandler.ValidateRequestCount(t, 0)
			}
		}()
	}
}

func TestSyncEndpointsHeadlessService(t *testing.T) {
	ns := "headless"
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()
	endpoints := newController(testServer.URL, 0*time.Second)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
			Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
		}},
	})
	addPods(endpoints.podStore, ns, 1, 1, 0, false)
	service := &v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, Labels: map[string]string{"a": "b"}},
		Spec: v1.ServiceSpec{
			Selector:  map[string]string{},
			ClusterIP: api.ClusterIPNone,
			Ports:     []v1.ServicePort{},
		},
	}
	originalService := service.DeepCopy()
	endpoints.serviceStore.Add(service)
	endpoints.syncService(ns + "/foo")
	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Labels: map[string]string{
				"a":                  "b",
				v1.IsHeadlessService: "",
			},
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
			Ports:     []v1.EndpointPort{},
		}},
	})
	if !reflect.DeepEqual(originalService, service) {
		t.Fatalf("syncing endpoints changed service: %s", diff.ObjectReflectDiff(service, originalService))
	}
	endpointsHandler.ValidateRequestCount(t, 1)
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}

func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseFailed(t *testing.T) {
	ns := "other"
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()
	endpoints := newController(testServer.URL, 0*time.Second)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Labels: map[string]string{
				"foo": "bar",
			},
		},
		Subsets: []v1.EndpointSubset{},
	})
	addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyNever, v1.PodFailed)
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{"foo": "bar"},
			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
		},
	})
	endpoints.syncService(ns + "/foo")
	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Labels: map[string]string{
				v1.IsHeadlessService: "",
			},
		},
		Subsets: []v1.EndpointSubset{},
	})
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}

func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseSucceeded(t *testing.T) {
	ns := "other"
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()
	endpoints := newController(testServer.URL, 0*time.Second)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Labels: map[string]string{
				"foo": "bar",
			},
		},
		Subsets: []v1.EndpointSubset{},
	})
	addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyNever, v1.PodSucceeded)
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{"foo": "bar"},
			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
		},
	})
	endpoints.syncService(ns + "/foo")
	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Labels: map[string]string{
				v1.IsHeadlessService: "",
			},
		},
		Subsets: []v1.EndpointSubset{},
	})
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}

func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyOnFailureAndPhaseSucceeded(t *testing.T) {
	ns := "other"
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()
	endpoints := newController(testServer.URL, 0*time.Second)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Labels: map[string]string{
				"foo": "bar",
			},
		},
		Subsets: []v1.EndpointSubset{},
	})
	addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyOnFailure, v1.PodSucceeded)
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{"foo": "bar"},
			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
		},
	})
	endpoints.syncService(ns + "/foo")
	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Labels: map[string]string{
				v1.IsHeadlessService: "",
			},
		},
		Subsets: []v1.EndpointSubset{},
	})
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}

func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) {
	ns := metav1.NamespaceDefault
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()
	endpoints := newController(testServer.URL, 0*time.Second)
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
		Spec: v1.ServiceSpec{
			Selector:  map[string]string{"foo": "bar"},
			ClusterIP: "None",
			Ports:     nil,
		},
	})
	addPods(endpoints.podStore, ns, 1, 1, 0, false)
	endpoints.syncService(ns + "/foo")
	endpointsHandler.ValidateRequestCount(t, 1)
	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name: "foo",
			Labels: map[string]string{
				v1.IsHeadlessService: "",
			},
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
			Ports:     nil,
		}},
	})
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints", "POST", &data)
}

// There are 3*5 = 15 possible combinations (3 RestartPolicy values by 5 PodPhase
// values). Rather than listing them all, the cases below cover all 3 false
// cases and 3 of the 12 true cases.
func TestShouldPodBeInEndpoints(t *testing.T) {
	testCases := []struct {
		name     string
		pod      *v1.Pod
		expected bool
	}{
		// Pod should not be in endpoints cases:
		{
			name: "Failed pod with Never RestartPolicy",
			pod: &v1.Pod{
				Spec: v1.PodSpec{
					RestartPolicy: v1.RestartPolicyNever,
				},
				Status: v1.PodStatus{
					Phase: v1.PodFailed,
				},
			},
			expected: false,
		},
		{
			name: "Succeeded pod with Never RestartPolicy",
			pod: &v1.Pod{
				Spec: v1.PodSpec{
					RestartPolicy: v1.RestartPolicyNever,
				},
				Status: v1.PodStatus{
					Phase: v1.PodSucceeded,
				},
			},
			expected: false,
		},
		{
			name: "Succeeded pod with OnFailure RestartPolicy",
			pod: &v1.Pod{
				Spec: v1.PodSpec{
					RestartPolicy: v1.RestartPolicyOnFailure,
				},
				Status: v1.PodStatus{
					Phase: v1.PodSucceeded,
				},
			},
			expected: false,
		},
		// Pod should be in endpoints cases:
		{
			name: "Failed pod with Always RestartPolicy",
			pod: &v1.Pod{
				Spec: v1.PodSpec{
					RestartPolicy: v1.RestartPolicyAlways,
				},
				Status: v1.PodStatus{
					Phase: v1.PodFailed,
				},
			},
			expected: true,
		},
		{
			name: "Pending pod with Never RestartPolicy",
			pod: &v1.Pod{
				Spec: v1.PodSpec{
					RestartPolicy: v1.RestartPolicyNever,
				},
				Status: v1.PodStatus{
					Phase: v1.PodPending,
				},
			},
			expected: true,
		},
		{
			name: "Unknown pod with OnFailure RestartPolicy",
			pod: &v1.Pod{
				Spec: v1.PodSpec{
					RestartPolicy: v1.RestartPolicyOnFailure,
				},
				Status: v1.PodStatus{
					Phase: v1.PodUnknown,
				},
			},
			expected: true,
		},
	}
	for _, test := range testCases {
		result := shouldPodBeInEndpoints(test.pod)
		if result != test.expected {
			t.Errorf("%s: expected: %t, got: %t", test.name, test.expected, result)
		}
	}
}

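// TestPodToEndpointAddressForService verifies that the endpoint address chosen
// for a pod matches the IP family of the service, across single-stack and
// dual-stack configurations.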
func TestPodToEndpointAddressForService(t *testing.T) {
	testCases := []struct {
		name               string
		expectedEndPointIP string
		enableDualStack    bool
		expectError        bool
		enableDualStackPod bool

		service v1.Service
	}{
		{
			name:               "v4 service, in a single stack cluster",
			expectedEndPointIP: "1.2.3.4",

			enableDualStack:    false,
			expectError:        false,
			enableDualStackPod: false,

			service: v1.Service{
				Spec: v1.ServiceSpec{
					ClusterIP: "10.0.0.1",
				},
			},
		},
		{
			name: "v4 service, in a dual stack cluster",

			expectedEndPointIP: "1.2.3.4",
			enableDualStack:    true,
			expectError:        false,
			enableDualStackPod: true,

			service: v1.Service{
				Spec: v1.ServiceSpec{
					ClusterIP: "10.0.0.1",
				},
			},
		},
		{
			name:               "v6 service, in a dual stack cluster. dual stack enabled",
			expectedEndPointIP: "2000::0",

			enableDualStack:    true,
			expectError:        false,
			enableDualStackPod: true,

			service: v1.Service{
				Spec: v1.ServiceSpec{
					ClusterIP: "3000::1",
				},
			},
		},

		// In reality this is a misconfigured cluster, i.e. the user is not
		// using dual stack and has PodIP == v4 while ServiceIP == v6. We are
		// testing that we keep producing the expected behavior.
		{
			name:               "v6 service, in a v4 only cluster. dual stack disabled",
			expectedEndPointIP: "1.2.3.4",

			enableDualStack:    false,
			expectError:        false,
			enableDualStackPod: false,

			service: v1.Service{
				Spec: v1.ServiceSpec{
					ClusterIP: "3000::1",
				},
			},
		},
		{
			name:               "v6 service, in a v4 only cluster - dual stack enabled",
			expectedEndPointIP: "1.2.3.4",

			enableDualStack:    true,
			expectError:        true,
			enableDualStackPod: false,

			service: v1.Service{
				Spec: v1.ServiceSpec{
					ClusterIP: "3000::1",
				},
			},
		},
	}
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, tc.enableDualStack)()
			podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
			ns := "test"
			addPods(podStore, ns, 1, 1, 0, tc.enableDualStackPod)
			pods := podStore.List()
			if len(pods) != 1 {
				t.Fatalf("podStore size: expected: %d, got: %d", 1, len(pods))
			}
			pod := pods[0].(*v1.Pod)
			epa, err := podToEndpointAddressForService(&tc.service, pod)

			if err != nil && !tc.expectError {
				t.Fatalf("podToEndpointAddressForService returned unexpected error %v", err)
			}

			if err == nil && tc.expectError {
				t.Fatalf("podToEndpointAddressForService should have returned error but it did not")
			}

			if err != nil && tc.expectError {
				return
			}

			if epa.IP != tc.expectedEndPointIP {
				t.Fatalf("IP: expected: %s, got: %s", tc.expectedEndPointIP, epa.IP)
			}
			if *(epa.NodeName) != pod.Spec.NodeName {
				t.Fatalf("NodeName: expected: %s, got: %s", pod.Spec.NodeName, *(epa.NodeName))
			}
			if epa.TargetRef.Kind != "Pod" {
				t.Fatalf("TargetRef.Kind: expected: %s, got: %s", "Pod", epa.TargetRef.Kind)
			}
			if epa.TargetRef.Namespace != pod.ObjectMeta.Namespace {
				t.Fatalf("TargetRef.Namespace: expected: %s, got: %s", pod.ObjectMeta.Namespace, epa.TargetRef.Namespace)
			}
			if epa.TargetRef.Name != pod.ObjectMeta.Name {
				t.Fatalf("TargetRef.Name: expected: %s, got: %s", pod.ObjectMeta.Name, epa.TargetRef.Name)
			}
			if epa.TargetRef.UID != pod.ObjectMeta.UID {
				t.Fatalf("TargetRef.UID: expected: %s, got: %s", pod.ObjectMeta.UID, epa.TargetRef.UID)
			}
			if epa.TargetRef.ResourceVersion != pod.ObjectMeta.ResourceVersion {
				t.Fatalf("TargetRef.ResourceVersion: expected: %s, got: %s", pod.ObjectMeta.ResourceVersion, epa.TargetRef.ResourceVersion)
			}
		})
	}
}

func TestPodToEndpointAddress(t *testing.T) {
	podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
	ns := "test"
	addPods(podStore, ns, 1, 1, 0, false)
	pods := podStore.List()
	if len(pods) != 1 {
		t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods))
		return
	}
	pod := pods[0].(*v1.Pod)
	epa := podToEndpointAddress(pod)
	if epa.IP != pod.Status.PodIP {
		t.Errorf("IP: expected: %s, got: %s", pod.Status.PodIP, epa.IP)
	}
	if *(epa.NodeName) != pod.Spec.NodeName {
		t.Errorf("NodeName: expected: %s, got: %s", pod.Spec.NodeName, *(epa.NodeName))
	}
	if epa.TargetRef.Kind != "Pod" {
		t.Errorf("TargetRef.Kind: expected: %s, got: %s", "Pod", epa.TargetRef.Kind)
	}
	if epa.TargetRef.Namespace != pod.ObjectMeta.Namespace {
		t.Errorf("TargetRef.Namespace: expected: %s, got: %s", pod.ObjectMeta.Namespace, epa.TargetRef.Namespace)
	}
	if epa.TargetRef.Name != pod.ObjectMeta.Name {
		t.Errorf("TargetRef.Name: expected: %s, got: %s", pod.ObjectMeta.Name, epa.TargetRef.Name)
	}
	if epa.TargetRef.UID != pod.ObjectMeta.UID {
		t.Errorf("TargetRef.UID: expected: %s, got: %s", pod.ObjectMeta.UID, epa.TargetRef.UID)
	}
	if epa.TargetRef.ResourceVersion != pod.ObjectMeta.ResourceVersion {
		t.Errorf("TargetRef.ResourceVersion: expected: %s, got: %s", pod.ObjectMeta.ResourceVersion, epa.TargetRef.ResourceVersion)
	}
}

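// TestPodChanged verifies which pod mutations are treated as endpoint-relevant
// changes, including the dual-stack secondary-IP cases.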
func TestPodChanged(t *testing.T) {
	podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
	ns := "test"
	addPods(podStore, ns, 1, 1, 0, false)
	pods := podStore.List()
	if len(pods) != 1 {
		t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods))
		return
	}
	oldPod := pods[0].(*v1.Pod)
	newPod := oldPod.DeepCopy()

	if podChangedHelper(oldPod, newPod, endpointChanged) {
		t.Errorf("Expected pod to be unchanged for copied pod")
	}

	newPod.Spec.NodeName = "changed"
	if !podChangedHelper(oldPod, newPod, endpointChanged) {
		t.Errorf("Expected pod to be changed for pod with NodeName changed")
	}
	newPod.Spec.NodeName = oldPod.Spec.NodeName

	newPod.ObjectMeta.ResourceVersion = "changed"
	if podChangedHelper(oldPod, newPod, endpointChanged) {
		t.Errorf("Expected pod to be unchanged for pod with only ResourceVersion changed")
	}
	newPod.ObjectMeta.ResourceVersion = oldPod.ObjectMeta.ResourceVersion

	newPod.Status.PodIP = "1.2.3.1"
	if !podChangedHelper(oldPod, newPod, endpointChanged) {
		t.Errorf("Expected pod to be changed with pod IP address change")
	}
	newPod.Status.PodIP = oldPod.Status.PodIP

	/* dual stack tests */
	// The primary IP changes in each case, because changing IPs is done by changing the sandbox.
	// case 1: add new secondary IP
	newPod.Status.PodIP = "1.1.3.1"
	newPod.Status.PodIPs = []v1.PodIP{
		{
			IP: "1.1.3.1",
		},
		{
			IP: "2000::1",
		},
	}
	if !podChangedHelper(oldPod, newPod, endpointChanged) {
		t.Errorf("Expected pod to be changed with adding secondary IP")
	}
	// reset
	newPod.Status.PodIPs = nil
	newPod.Status.PodIP = oldPod.Status.PodIP

	// case 2: removing a secondary IP
	saved := oldPod.Status.PodIP
	oldPod.Status.PodIP = "1.1.3.1"
	oldPod.Status.PodIPs = []v1.PodIP{
		{
			IP: "1.1.3.1",
		},
		{
			IP: "2000::1",
		},
	}

	newPod.Status.PodIP = "1.2.3.4"
	newPod.Status.PodIPs = []v1.PodIP{
		{
			IP: "1.2.3.4",
		},
	}
	if !podChangedHelper(oldPod, newPod, endpointChanged) {
		t.Errorf("Expected pod to be changed with removal of secondary IP")
	}

	// reset
	oldPod.Status.PodIPs = nil
	newPod.Status.PodIPs = nil
	oldPod.Status.PodIP = saved
	newPod.Status.PodIP = saved

	// case 3: changing a secondary IP
	saved = oldPod.Status.PodIP
	oldPod.Status.PodIP = "1.1.3.1"
	oldPod.Status.PodIPs = []v1.PodIP{
		{
			IP: "1.1.3.1",
		},
		{
			IP: "2000::1",
		},
	}

	newPod.Status.PodIP = "1.2.3.4"
	newPod.Status.PodIPs = []v1.PodIP{
		{
			IP: "1.2.3.4",
		},
		{
			IP: "2000::2",
		},
	}
	if !podChangedHelper(oldPod, newPod, endpointChanged) {
		t.Errorf("Expected pod to be changed with a changed secondary IP")
	}

	// reset
	oldPod.Status.PodIPs = nil
	newPod.Status.PodIPs = nil
	oldPod.Status.PodIP = saved
	newPod.Status.PodIP = saved

	/* end dual stack testing */

	newPod.ObjectMeta.Name = "wrong-name"
	if !podChangedHelper(oldPod, newPod, endpointChanged) {
		t.Errorf("Expected pod to be changed with pod name change")
	}
	newPod.ObjectMeta.Name = oldPod.ObjectMeta.Name

	saveConditions := oldPod.Status.Conditions
	oldPod.Status.Conditions = nil
	if !podChangedHelper(oldPod, newPod, endpointChanged) {
		t.Errorf("Expected pod to be changed with pod readiness change")
	}
	oldPod.Status.Conditions = saveConditions

	now := metav1.NewTime(time.Now().UTC())
	newPod.ObjectMeta.DeletionTimestamp = &now
	if !podChangedHelper(oldPod, newPod, endpointChanged) {
		t.Errorf("Expected pod to be changed with DeletionTimestamp change")
	}
	newPod.ObjectMeta.DeletionTimestamp = oldPod.ObjectMeta.DeletionTimestamp.DeepCopy()
}

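// TestLastTriggerChangeTimeAnnotation verifies that the
// EndpointsLastChangeTriggerTime annotation is set from the trigger time of
// the change that caused the endpoints update.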
func TestLastTriggerChangeTimeAnnotation(t *testing.T) {
	ns := "other"
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()
	endpoints := newController(testServer.URL, 0*time.Second)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
			Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
		}},
	})
	addPods(endpoints.podStore, ns, 1, 1, 0, false)
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{},
			Ports:    []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
		},
	})
	endpoints.syncService(ns + "/foo")

	endpointsHandler.ValidateRequestCount(t, 1)
	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Annotations: map[string]string{
				v1.EndpointsLastChangeTriggerTime: triggerTimeString,
			},
			Labels: map[string]string{
				v1.IsHeadlessService: "",
			},
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
		}},
	})
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}

func TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) {
	ns := "other"
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()
	endpoints := newController(testServer.URL, 0*time.Second)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Annotations: map[string]string{
				v1.EndpointsLastChangeTriggerTime: oldTriggerTimeString,
			},
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
			Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
		}},
	})
	addPods(endpoints.podStore, ns, 1, 1, 0, false)
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{},
			Ports:    []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
		},
	})
	endpoints.syncService(ns + "/foo")

	endpointsHandler.ValidateRequestCount(t, 1)
	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Annotations: map[string]string{
				v1.EndpointsLastChangeTriggerTime: triggerTimeString,
			},
			Labels: map[string]string{
				v1.IsHeadlessService: "",
			},
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
		}},
	})
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}

func TestLastTriggerChangeTimeAnnotation_AnnotationCleared(t *testing.T) {
	ns := "other"
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()
	endpoints := newController(testServer.URL, 0*time.Second)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Annotations: map[string]string{
				v1.EndpointsLastChangeTriggerTime: triggerTimeString,
			},
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
			Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
		}},
	})
	// Neither the pod nor the service has a trigger time; this should cause the annotation to be cleared.
	addPods(endpoints.podStore, ns, 1, 1, 0, false)
	endpoints.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{},
			Ports:    []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
		},
	})
	endpoints.syncService(ns + "/foo")

	endpointsHandler.ValidateRequestCount(t, 1)
	data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
			Labels: map[string]string{
				v1.IsHeadlessService: "",
			}, // Annotation not set anymore.
		},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
		}},
	})
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
}

// TestPodUpdatesBatching verifies that endpoint updates caused by pod updates are batched together.
|
|
// This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
|
|
// TODO(mborsz): Migrate this test to mock clock when possible.
|
|
func TestPodUpdatesBatching(t *testing.T) {
|
|
type podUpdate struct {
|
|
delay time.Duration
|
|
podName string
|
|
podIP string
|
|
}
|
|
|
|
tests := []struct {
|
|
name string
|
|
batchPeriod time.Duration
|
|
podsCount int
|
|
updates []podUpdate
|
|
finalDelay time.Duration
|
|
wantRequestCount int
|
|
}{
|
|
{
|
|
name: "three updates with no batching",
|
|
batchPeriod: 0 * time.Second,
|
|
podsCount: 10,
|
|
updates: []podUpdate{
|
|
{
|
|
// endpoints.Run needs ~100 ms to start processing updates.
|
|
delay: 200 * time.Millisecond,
|
|
podName: "pod0",
|
|
podIP: "10.0.0.0",
|
|
},
|
|
{
|
|
delay: 100 * time.Millisecond,
|
|
podName: "pod1",
|
|
podIP: "10.0.0.1",
|
|
},
|
|
{
|
|
delay: 100 * time.Millisecond,
|
|
podName: "pod2",
|
|
podIP: "10.0.0.2",
|
|
},
|
|
},
|
|
finalDelay: 3 * time.Second,
|
|
wantRequestCount: 3,
|
|
},
|
|
{
|
|
name: "three updates in one batch",
|
|
batchPeriod: 1 * time.Second,
|
|
podsCount: 10,
|
|
updates: []podUpdate{
|
|
{
|
|
// endpoints.Run needs ~100 ms to start processing updates.
|
|
delay: 200 * time.Millisecond,
|
|
podName: "pod0",
|
|
podIP: "10.0.0.0",
|
|
},
|
|
{
|
|
delay: 100 * time.Millisecond,
|
|
podName: "pod1",
|
|
podIP: "10.0.0.1",
|
|
},
|
|
{
|
|
delay: 100 * time.Millisecond,
|
|
podName: "pod2",
|
|
podIP: "10.0.0.2",
|
|
},
|
|
},
|
|
finalDelay: 3 * time.Second,
|
|
wantRequestCount: 1,
|
|
},
|
|
{
|
|
name: "three updates in two batches",
|
|
batchPeriod: 1 * time.Second,
|
|
podsCount: 10,
|
|
updates: []podUpdate{
|
|
{
|
|
// endpoints.Run needs ~100 ms to start processing updates.
|
|
delay: 200 * time.Millisecond,
|
|
podName: "pod0",
|
|
podIP: "10.0.0.0",
|
|
},
|
|
{
|
|
delay: 100 * time.Millisecond,
|
|
podName: "pod1",
|
|
podIP: "10.0.0.1",
|
|
},
|
|
{
|
|
delay: 1 * time.Second,
|
|
podName: "pod2",
|
|
podIP: "10.0.0.2",
|
|
},
|
|
},
|
|
finalDelay: 3 * time.Second,
|
|
wantRequestCount: 2,
|
|
},
|
|
}
|
|
|
|
for _, tc := range tests {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
ns := "other"
|
|
resourceVersion := 1
|
|
testServer, endpointsHandler := makeTestServer(t, ns)
|
|
defer testServer.Close()
|
|
endpoints := newController(testServer.URL, tc.batchPeriod)
|
|
stopCh := make(chan struct{})
|
|
defer close(stopCh)
|
|
endpoints.podsSynced = alwaysReady
|
|
endpoints.servicesSynced = alwaysReady
|
|
endpoints.endpointsSynced = alwaysReady
|
|
endpoints.workerLoopPeriod = 10 * time.Millisecond
|
|
|
|
go endpoints.Run(1, stopCh)
|
|
|
|
addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, false)
|
|
|
|
endpoints.serviceStore.Add(&v1.Service{
|
|
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
|
|
Spec: v1.ServiceSpec{
|
|
Selector: map[string]string{"foo": "bar"},
|
|
Ports: []v1.ServicePort{{Port: 80}},
|
|
},
|
|
})
|
|
|
|
for _, update := range tc.updates {
|
|
time.Sleep(update.delay)
|
|
|
|
old, exists, err := endpoints.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName))
|
|
if err != nil {
|
|
t.Fatalf("Error while retrieving old value of %q: %v", update.podName, err)
|
|
}
|
|
if !exists {
|
|
t.Fatalf("Pod %q doesn't exist", update.podName)
|
|
}
|
|
oldPod := old.(*v1.Pod)
|
|
newPod := oldPod.DeepCopy()
|
|
newPod.Status.PodIP = update.podIP
|
|
newPod.ResourceVersion = strconv.Itoa(resourceVersion)
|
|
resourceVersion++
|
|
|
|
endpoints.podStore.Update(newPod)
|
|
endpoints.updatePod(oldPod, newPod)
|
|
}
|
|
|
|
time.Sleep(tc.finalDelay)
|
|
endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount)
|
|
})
|
|
}
|
|
}
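// Note on the pattern above: the test drives the controller by hand, pairing
// each podStore mutation with the matching handler call (updatePod) so the
// cache and the handler observe the same objects, much as a pod informer
// would deliver them. ResourceVersion is bumped on every simulated write
// because the API server assigns a fresh resourceVersion per update; keeping
// the fixtures consistent with that behavior is a choice of this test rather
// than something the controller is known to require.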
// TestPodAddsBatching verifies that endpoint updates caused by pod additions are batched together.
// This test uses real time.Sleep, as there is currently no easy way to mock time in the endpoints controller.
// TODO(mborsz): Migrate this test to a mock clock when possible.
func TestPodAddsBatching(t *testing.T) {
	type podAdd struct {
		delay time.Duration
	}

	tests := []struct {
		name             string
		batchPeriod      time.Duration
		adds             []podAdd
		finalDelay       time.Duration
		wantRequestCount int
	}{
		{
			name:        "three adds with no batching",
			batchPeriod: 0 * time.Second,
			adds: []podAdd{
				{
					// endpoints.Run needs ~100 ms to start processing updates.
					delay: 200 * time.Millisecond,
				},
				{
					delay: 100 * time.Millisecond,
				},
				{
					delay: 100 * time.Millisecond,
				},
			},
			finalDelay:       3 * time.Second,
			wantRequestCount: 3,
		},
		{
			name:        "three adds in one batch",
			batchPeriod: 1 * time.Second,
			adds: []podAdd{
				{
					// endpoints.Run needs ~100 ms to start processing updates.
					delay: 200 * time.Millisecond,
				},
				{
					delay: 100 * time.Millisecond,
				},
				{
					delay: 100 * time.Millisecond,
				},
			},
			finalDelay:       3 * time.Second,
			wantRequestCount: 1,
		},
		{
			name:        "three adds in two batches",
			batchPeriod: 1 * time.Second,
			adds: []podAdd{
				{
					// endpoints.Run needs ~100 ms to start processing updates.
					delay: 200 * time.Millisecond,
				},
				{
					delay: 100 * time.Millisecond,
				},
				{
					delay: 1 * time.Second,
				},
			},
			finalDelay:       3 * time.Second,
			wantRequestCount: 2,
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			ns := "other"
			testServer, endpointsHandler := makeTestServer(t, ns)
			defer testServer.Close()
			endpoints := newController(testServer.URL, tc.batchPeriod)
			stopCh := make(chan struct{})
			defer close(stopCh)
			endpoints.podsSynced = alwaysReady
			endpoints.servicesSynced = alwaysReady
			endpoints.endpointsSynced = alwaysReady
			endpoints.workerLoopPeriod = 10 * time.Millisecond

			go endpoints.Run(1, stopCh)

			endpoints.serviceStore.Add(&v1.Service{
				ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
				Spec: v1.ServiceSpec{
					Selector: map[string]string{"foo": "bar"},
					Ports:    []v1.ServicePort{{Port: 80}},
				},
			})

			for i, add := range tc.adds {
				time.Sleep(add.delay)

				p := testPod(ns, i, 1, true, false)
				endpoints.podStore.Add(p)
				endpoints.addPod(p)
			}

			time.Sleep(tc.finalDelay)
			endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount)
		})
	}
}
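// testPod(ns, i, 1, true, false) produces a ready, single-stack pod (one
// port) whose name and IP are derived from its index, so every add introduces
// a distinct endpoint address; Add on the store plus the explicit addPod call
// again stand in for an informer's AddFunc delivery.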
// TestPodDeleteBatching verifies that endpoint updates caused by pod deletions are batched together.
// This test uses real time.Sleep, as there is currently no easy way to mock time in the endpoints controller.
// TODO(mborsz): Migrate this test to a mock clock when possible.
func TestPodDeleteBatching(t *testing.T) {
	type podDelete struct {
		delay   time.Duration
		podName string
	}

	tests := []struct {
		name             string
		batchPeriod      time.Duration
		podsCount        int
		deletes          []podDelete
		finalDelay       time.Duration
		wantRequestCount int
	}{
		{
			name:        "three deletes with no batching",
			batchPeriod: 0 * time.Second,
			podsCount:   10,
			deletes: []podDelete{
				{
					// endpoints.Run needs ~100 ms to start processing updates.
					delay:   200 * time.Millisecond,
					podName: "pod0",
				},
				{
					delay:   100 * time.Millisecond,
					podName: "pod1",
				},
				{
					delay:   100 * time.Millisecond,
					podName: "pod2",
				},
			},
			finalDelay:       3 * time.Second,
			wantRequestCount: 3,
		},
		{
			name:        "three deletes in one batch",
			batchPeriod: 1 * time.Second,
			podsCount:   10,
			deletes: []podDelete{
				{
					// endpoints.Run needs ~100 ms to start processing updates.
					delay:   200 * time.Millisecond,
					podName: "pod0",
				},
				{
					delay:   100 * time.Millisecond,
					podName: "pod1",
				},
				{
					delay:   100 * time.Millisecond,
					podName: "pod2",
				},
			},
			finalDelay:       3 * time.Second,
			wantRequestCount: 1,
		},
		{
			name:        "three deletes in two batches",
			batchPeriod: 1 * time.Second,
			podsCount:   10,
			deletes: []podDelete{
				{
					// endpoints.Run needs ~100 ms to start processing updates.
					delay:   200 * time.Millisecond,
					podName: "pod0",
				},
				{
					delay:   100 * time.Millisecond,
					podName: "pod1",
				},
				{
					delay:   1 * time.Second,
					podName: "pod2",
				},
			},
			finalDelay:       3 * time.Second,
			wantRequestCount: 2,
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			ns := "other"
			testServer, endpointsHandler := makeTestServer(t, ns)
			defer testServer.Close()
			endpoints := newController(testServer.URL, tc.batchPeriod)
			stopCh := make(chan struct{})
			defer close(stopCh)
			endpoints.podsSynced = alwaysReady
			endpoints.servicesSynced = alwaysReady
			endpoints.endpointsSynced = alwaysReady
			endpoints.workerLoopPeriod = 10 * time.Millisecond

			go endpoints.Run(1, stopCh)

			addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, false)

			endpoints.serviceStore.Add(&v1.Service{
				ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
				Spec: v1.ServiceSpec{
					Selector: map[string]string{"foo": "bar"},
					Ports:    []v1.ServicePort{{Port: 80}},
				},
			})

			for _, del := range tc.deletes {
				time.Sleep(del.delay)

				old, exists, err := endpoints.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, del.podName))
				if err != nil {
					t.Fatalf("Error while retrieving old value of %q: %v", del.podName, err)
				}
				if !exists {
					t.Fatalf("Pod %q doesn't exist", del.podName)
				}
				endpoints.podStore.Delete(old)
				endpoints.deletePod(old)
			}

			time.Sleep(tc.finalDelay)
			endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount)
		})
	}
}
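// The delete loop above hands deletePod the live pod object. In a real
// cluster the informer's DeleteFunc can also receive a
// cache.DeletedFinalStateUnknown tombstone when a watch event was missed;
// that path is not exercised here.

// TestSyncEndpointsServiceNotFound verifies that syncService garbage-collects
// an Endpoints object whose owning Service no longer exists, issuing exactly
// one DELETE request.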
func TestSyncEndpointsServiceNotFound(t *testing.T) {
	ns := metav1.NamespaceDefault
	testServer, endpointsHandler := makeTestServer(t, ns)
	defer testServer.Close()
	endpoints := newController(testServer.URL, 0)
	endpoints.endpointsStore.Add(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "foo",
			Namespace:       ns,
			ResourceVersion: "1",
		},
	})
	endpoints.syncService(ns + "/foo")
	endpointsHandler.ValidateRequestCount(t, 1)
	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "DELETE", nil)
}
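// podChangedHelper adapts endpointutil.PodChanged for tests that only care
// whether a pod changed in a way that affects endpoints, discarding the
// secondary (labels-changed) result.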
func podChangedHelper(oldPod, newPod *v1.Pod, endpointChanged endpointutil.EndpointsMatch) bool {
	podChanged, _ := endpointutil.PodChanged(oldPod, newPod, endpointChanged)
	return podChanged
}
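// A minimal sketch of how podChangedHelper is meant to be driven; the pod
// fixtures and the endpointsMatch predicate name are hypothetical, standing
// in for whatever endpointutil.EndpointsMatch func a caller supplies:
//
//	oldPod := testPod("ns", 0, 1, true, false)
//	newPod := oldPod.DeepCopy()
//	newPod.Status.PodIP = "1.2.3.99" // change something endpoint-relevant
//	changed := podChangedHelper(oldPod, newPod, endpointsMatch)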