diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index 0df7cc5a7d4..b1cfcd32fd8 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -50,6 +50,7 @@ go_test( "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/types", "//vendor:k8s.io/apimachinery/pkg/util/intstr", + "//vendor:k8s.io/apimachinery/pkg/util/sets", ], ) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 284e5370d1a..d0cac007516 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -211,6 +211,7 @@ type Proxier struct { nodeIP net.IP portMapper portOpener recorder record.EventRecorder + healthChecker healthChecker } type localPort struct { @@ -242,6 +243,17 @@ func (l *listenPortOpener) OpenLocalPort(lp *localPort) (closeable, error) { return openLocalPort(lp) } +type healthChecker interface { + UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String) +} + +// TODO: the healthcheck pkg should offer a type +type globalHealthChecker struct{} + +func (globalHealthChecker) UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String) { + healthcheck.UpdateEndpoints(serviceName, endpointUIDs) +} + // Proxier implements ProxyProvider var _ proxy.ProxyProvider = &Proxier{} @@ -295,6 +307,7 @@ func NewProxier(ipt utiliptables.Interface, glog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic") } + healthChecker := globalHealthChecker{} go healthcheck.Run() var throttle flowcontrol.RateLimiter @@ -321,6 +334,7 @@ func NewProxier(ipt utiliptables.Interface, nodeIP: nodeIP, portMapper: &listenPortOpener{}, recorder: recorder, + healthChecker: healthChecker, }, nil } @@ -588,7 +602,7 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { defer proxier.mu.Unlock() proxier.haveReceivedEndpointsUpdate = true - newMap, staleConnections := updateEndpoints(allEndpoints, proxier.endpointsMap, proxier.hostname, updateHealthCheckEntries) + newMap, staleConnections := updateEndpoints(allEndpoints, proxier.endpointsMap, proxier.hostname, proxier.healthChecker) if len(newMap) != len(proxier.endpointsMap) || !reflect.DeepEqual(newMap, proxier.endpointsMap) { proxier.endpointsMap = newMap proxier.syncProxyRules() @@ -600,9 +614,8 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { } // Convert a slice of api.Endpoints objects into a map of service-port -> endpoints. -// TODO: the hcUpdater should be a method on an interface, but it is a global pkg for now func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortName][]*endpointsInfo, hostname string, - hcUpdater func(types.NamespacedName, []hostPortInfo)) (newMap map[proxy.ServicePortName][]*endpointsInfo, stale map[endpointServicePair]bool) { + healthChecker healthChecker) (newMap map[proxy.ServicePortName][]*endpointsInfo, stale map[endpointServicePair]bool) { // return values newMap = make(map[proxy.ServicePortName][]*endpointsInfo) @@ -637,7 +650,7 @@ func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortN allSvcPorts[svcPort] = true } for svcPort := range allSvcPorts { - hcUpdater(svcPort.NamespacedName, svcPortToInfoMap[svcPort]) + updateHealthCheckEntries(svcPort.NamespacedName, svcPortToInfoMap[svcPort], healthChecker) } return newMap, stale @@ -699,7 +712,7 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, } // updateHealthCheckEntries - send the new set of local endpoints to the health checker -func updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInfo) { +func updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInfo, healthChecker healthChecker) { if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) { return } @@ -712,7 +725,7 @@ func updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInf endpoints.Insert(fmt.Sprintf("%s/%s", name.Namespace, name.Name)) } } - healthcheck.UpdateEndpoints(name, endpoints) + healthChecker.UpdateEndpoints(name, endpoints) } // used in OnEndpointsUpdate diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 1ebd9af31eb..48e87c10093 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -28,6 +28,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/service" "k8s.io/kubernetes/pkg/proxy" @@ -493,6 +494,10 @@ func (f *fakePortOpener) OpenLocalPort(lp *localPort) (closeable, error) { return nil, nil } +type fakeHealthChecker struct{} + +func (fakeHealthChecker) UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String) {} + func NewFakeProxier(ipt utiliptables.Interface) *Proxier { // TODO: Call NewProxier after refactoring out the goroutine // invocation into a Run() method. @@ -507,6 +512,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { hostname: "test-hostname", portsMap: make(map[localPort]closeable), portMapper: &fakePortOpener{[]*localPort{}}, + healthChecker: fakeHealthChecker{}, } } @@ -2050,7 +2056,7 @@ func Test_updateEndpoints(t *testing.T) { }} for tci, tc := range testCases { - newMap, stale := updateEndpoints(tc.newEndpoints, tc.oldEndpoints, "host", func(types.NamespacedName, []hostPortInfo) {}) + newMap, stale := updateEndpoints(tc.newEndpoints, tc.oldEndpoints, "host", fakeHealthChecker{}) if len(newMap) != len(tc.expectedResult) { t.Errorf("[%d] expected %d results, got %d: %v", tci, len(tc.expectedResult), len(newMap), newMap) }