Make healthcheck an interface

This commit is contained in:
Tim Hockin 2017-02-03 18:45:52 -08:00
parent 6069d49d49
commit cddda17d42
3 changed files with 27 additions and 7 deletions

View File

@ -50,6 +50,7 @@ go_test(
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/types", "//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/intstr", "//vendor:k8s.io/apimachinery/pkg/util/intstr",
"//vendor:k8s.io/apimachinery/pkg/util/sets",
], ],
) )

View File

@ -211,6 +211,7 @@ type Proxier struct {
nodeIP net.IP nodeIP net.IP
portMapper portOpener portMapper portOpener
recorder record.EventRecorder recorder record.EventRecorder
healthChecker healthChecker
} }
type localPort struct { type localPort struct {
@ -242,6 +243,17 @@ func (l *listenPortOpener) OpenLocalPort(lp *localPort) (closeable, error) {
return openLocalPort(lp) return openLocalPort(lp)
} }
type healthChecker interface {
UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String)
}
// TODO: the healthcheck pkg should offer a type
type globalHealthChecker struct{}
func (globalHealthChecker) UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String) {
healthcheck.UpdateEndpoints(serviceName, endpointUIDs)
}
// Proxier implements ProxyProvider // Proxier implements ProxyProvider
var _ proxy.ProxyProvider = &Proxier{} var _ proxy.ProxyProvider = &Proxier{}
@ -295,6 +307,7 @@ func NewProxier(ipt utiliptables.Interface,
glog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic") glog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic")
} }
healthChecker := globalHealthChecker{}
go healthcheck.Run() go healthcheck.Run()
var throttle flowcontrol.RateLimiter var throttle flowcontrol.RateLimiter
@ -321,6 +334,7 @@ func NewProxier(ipt utiliptables.Interface,
nodeIP: nodeIP, nodeIP: nodeIP,
portMapper: &listenPortOpener{}, portMapper: &listenPortOpener{},
recorder: recorder, recorder: recorder,
healthChecker: healthChecker,
}, nil }, nil
} }
@ -588,7 +602,7 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
defer proxier.mu.Unlock() defer proxier.mu.Unlock()
proxier.haveReceivedEndpointsUpdate = true proxier.haveReceivedEndpointsUpdate = true
newMap, staleConnections := updateEndpoints(allEndpoints, proxier.endpointsMap, proxier.hostname, updateHealthCheckEntries) newMap, staleConnections := updateEndpoints(allEndpoints, proxier.endpointsMap, proxier.hostname, proxier.healthChecker)
if len(newMap) != len(proxier.endpointsMap) || !reflect.DeepEqual(newMap, proxier.endpointsMap) { if len(newMap) != len(proxier.endpointsMap) || !reflect.DeepEqual(newMap, proxier.endpointsMap) {
proxier.endpointsMap = newMap proxier.endpointsMap = newMap
proxier.syncProxyRules() proxier.syncProxyRules()
@ -600,9 +614,8 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
} }
// Convert a slice of api.Endpoints objects into a map of service-port -> endpoints. // Convert a slice of api.Endpoints objects into a map of service-port -> endpoints.
// TODO: the hcUpdater should be a method on an interface, but it is a global pkg for now
func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortName][]*endpointsInfo, hostname string, func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortName][]*endpointsInfo, hostname string,
hcUpdater func(types.NamespacedName, []hostPortInfo)) (newMap map[proxy.ServicePortName][]*endpointsInfo, stale map[endpointServicePair]bool) { healthChecker healthChecker) (newMap map[proxy.ServicePortName][]*endpointsInfo, stale map[endpointServicePair]bool) {
// return values // return values
newMap = make(map[proxy.ServicePortName][]*endpointsInfo) newMap = make(map[proxy.ServicePortName][]*endpointsInfo)
@ -637,7 +650,7 @@ func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortN
allSvcPorts[svcPort] = true allSvcPorts[svcPort] = true
} }
for svcPort := range allSvcPorts { for svcPort := range allSvcPorts {
hcUpdater(svcPort.NamespacedName, svcPortToInfoMap[svcPort]) updateHealthCheckEntries(svcPort.NamespacedName, svcPortToInfoMap[svcPort], healthChecker)
} }
return newMap, stale return newMap, stale
@ -699,7 +712,7 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string,
} }
// updateHealthCheckEntries - send the new set of local endpoints to the health checker // updateHealthCheckEntries - send the new set of local endpoints to the health checker
func updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInfo) { func updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInfo, healthChecker healthChecker) {
if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) { if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) {
return return
} }
@ -712,7 +725,7 @@ func updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInf
endpoints.Insert(fmt.Sprintf("%s/%s", name.Namespace, name.Name)) endpoints.Insert(fmt.Sprintf("%s/%s", name.Namespace, name.Name))
} }
} }
healthcheck.UpdateEndpoints(name, endpoints) healthChecker.UpdateEndpoints(name, endpoints)
} }
// used in OnEndpointsUpdate // used in OnEndpointsUpdate

View File

@ -28,6 +28,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/service" "k8s.io/kubernetes/pkg/api/service"
"k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy"
@ -493,6 +494,10 @@ func (f *fakePortOpener) OpenLocalPort(lp *localPort) (closeable, error) {
return nil, nil return nil, nil
} }
type fakeHealthChecker struct{}
func (fakeHealthChecker) UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String) {}
func NewFakeProxier(ipt utiliptables.Interface) *Proxier { func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
// TODO: Call NewProxier after refactoring out the goroutine // TODO: Call NewProxier after refactoring out the goroutine
// invocation into a Run() method. // invocation into a Run() method.
@ -507,6 +512,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
hostname: "test-hostname", hostname: "test-hostname",
portsMap: make(map[localPort]closeable), portsMap: make(map[localPort]closeable),
portMapper: &fakePortOpener{[]*localPort{}}, portMapper: &fakePortOpener{[]*localPort{}},
healthChecker: fakeHealthChecker{},
} }
} }
@ -2050,7 +2056,7 @@ func Test_updateEndpoints(t *testing.T) {
}} }}
for tci, tc := range testCases { for tci, tc := range testCases {
newMap, stale := updateEndpoints(tc.newEndpoints, tc.oldEndpoints, "host", func(types.NamespacedName, []hostPortInfo) {}) newMap, stale := updateEndpoints(tc.newEndpoints, tc.oldEndpoints, "host", fakeHealthChecker{})
if len(newMap) != len(tc.expectedResult) { if len(newMap) != len(tc.expectedResult) {
t.Errorf("[%d] expected %d results, got %d: %v", tci, len(tc.expectedResult), len(newMap), newMap) t.Errorf("[%d] expected %d results, got %d: %v", tci, len(tc.expectedResult), len(newMap), newMap)
} }