diff --git a/pkg/proxy/healthcheck/BUILD b/pkg/proxy/healthcheck/BUILD index 5eea7679c26..c7503ac898f 100644 --- a/pkg/proxy/healthcheck/BUILD +++ b/pkg/proxy/healthcheck/BUILD @@ -11,20 +11,17 @@ load( go_library( name = "go_default_library", srcs = [ - "api.go", "doc.go", "healthcheck.go", - "http.go", - "listener.go", - "worker.go", ], tags = ["automanaged"], deps = [ "//vendor:github.com/golang/glog", + "//vendor:github.com/renstrom/dedent", "//vendor:k8s.io/apimachinery/pkg/types", - "//vendor:k8s.io/apimachinery/pkg/util/sets", - "//vendor:k8s.io/apimachinery/pkg/util/wait", - "//vendor:k8s.io/client-go/tools/cache", + "//vendor:k8s.io/client-go/pkg/api", + "//vendor:k8s.io/client-go/pkg/api/v1", + "//vendor:k8s.io/client-go/tools/record", ], ) @@ -34,6 +31,7 @@ go_test( library = ":go_default_library", tags = ["automanaged"], deps = [ + "//vendor:github.com/davecgh/go-spew/spew", "//vendor:k8s.io/apimachinery/pkg/types", "//vendor:k8s.io/apimachinery/pkg/util/sets", ], diff --git a/pkg/proxy/healthcheck/api.go b/pkg/proxy/healthcheck/api.go deleted file mode 100644 index 91aa3bd7255..00000000000 --- a/pkg/proxy/healthcheck/api.go +++ /dev/null @@ -1,65 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package healthcheck - -import ( - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" -) - -// All public API Methods for this package - -// UpdateEndpoints Update the set of local endpoints for a service -func UpdateEndpoints(serviceName types.NamespacedName, endpointUids sets.String) { - req := &proxyMutationRequest{ - serviceName: serviceName, - endpointUids: &endpointUids, - } - healthchecker.mutationRequestChannel <- req -} - -func updateServiceListener(serviceName types.NamespacedName, listenPort int, add bool) bool { - responseChannel := make(chan bool) - req := &proxyListenerRequest{ - serviceName: serviceName, - listenPort: uint16(listenPort), - add: add, - responseChannel: responseChannel, - } - healthchecker.listenerRequestChannel <- req - return <-responseChannel -} - -// AddServiceListener Request addition of a listener for a service's health check -func AddServiceListener(serviceName types.NamespacedName, listenPort int) bool { - return updateServiceListener(serviceName, listenPort, true) -} - -// DeleteServiceListener Request deletion of a listener for a service's health check -func DeleteServiceListener(serviceName types.NamespacedName, listenPort int) bool { - return updateServiceListener(serviceName, listenPort, false) -} - -// Run Start the healthchecker main loop -func Run() { - healthchecker = proxyHealthCheckFactory() - // Wrap with a wait.Forever to handle panics. - go wait.Forever(func() { - healthchecker.handlerLoop() - }, 0) -} diff --git a/pkg/proxy/healthcheck/doc.go b/pkg/proxy/healthcheck/doc.go index 56ecc11e9a7..0a9ea0944bc 100644 --- a/pkg/proxy/healthcheck/doc.go +++ b/pkg/proxy/healthcheck/doc.go @@ -14,5 +14,5 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package healthcheck LoadBalancer Healthcheck responder library for kubernetes network proxies +// Package healthcheck provides tools for serving kube-proxy healthchecks. package healthcheck // import "k8s.io/kubernetes/pkg/proxy/healthcheck" diff --git a/pkg/proxy/healthcheck/healthcheck.go b/pkg/proxy/healthcheck/healthcheck.go index e9dfe86e75a..faaf001fadf 100644 --- a/pkg/proxy/healthcheck/healthcheck.go +++ b/pkg/proxy/healthcheck/healthcheck.go @@ -20,108 +20,210 @@ import ( "fmt" "net" "net/http" + "strings" + "sync" "github.com/golang/glog" + "github.com/renstrom/dedent" + "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/tools/cache" + "k8s.io/client-go/pkg/api" + clientv1 "k8s.io/client-go/pkg/api/v1" + "k8s.io/client-go/tools/record" ) -// proxyMutationRequest: Message to request addition/deletion of endpoints for a service -type proxyMutationRequest struct { - serviceName types.NamespacedName - endpointUids *sets.String +// Server serves HTTP endpoints for each service name, with results +// based on the endpoints. If there are 0 endpoints for a service, it returns a +// 503 "Service Unavailable" error (telling LBs not to use this node). If there +// are 1 or more endpoints, it returns a 200 "OK". +type Server interface { + // Make the new set of services be active. Services that were open before + // will be closed. Services that are new will be opened. Service that + // existed and are in the new set will be left alone. The value of the map + // is the healthcheck-port to listen on. + SyncServices(newServices map[types.NamespacedName]uint16) error + // Make the new set of endpoints be active. Endpoints for services that do + // not exist will be dropped. The value of the map is the number of + // endpoints the service has on this node. + SyncEndpoints(newEndpoints map[types.NamespacedName]int) error } -// proxyListenerRequest: Message to request addition/deletion of a service responder on a listening port -type proxyListenerRequest struct { - serviceName types.NamespacedName - listenPort uint16 - add bool - responseChannel chan bool +// Listener allows for testing of Server. If the Listener argument +// to NewServer() is nil, the real net.Listen function will be used. +type Listener interface { + // Listen is very much like net.Listen, except the first arg (network) is + // fixed to be "tcp". + Listen(addr string) (net.Listener, error) } -// serviceEndpointsList: A list of endpoints for a service -type serviceEndpointsList struct { - serviceName types.NamespacedName - endpoints *sets.String +// HTTPServerFactory allows for testing of Server. If the +// HTTPServerFactory argument to NewServer() is nil, the real +// http.Server type will be used. +type HTTPServerFactory interface { + // New creates an instance of a type satisfying HTTPServer. This is + // designed to include http.Server. + New(addr string, handler http.Handler) HTTPServer } -// serviceResponder: Contains net/http datastructures necessary for responding to each Service's health check on its aux nodePort -type serviceResponder struct { - serviceName types.NamespacedName - listenPort uint16 - listener *net.Listener - server *http.Server +// HTTPServer allows for testing of Server. +type HTTPServer interface { + // Server is designed so that http.Server satifies this interface, + Serve(listener net.Listener) error } -// proxyHC: Handler structure for health check, endpoint add/delete and service listener add/delete requests -type proxyHC struct { - serviceEndpointsMap cache.ThreadSafeStore - serviceResponderMap map[types.NamespacedName]serviceResponder - mutationRequestChannel chan *proxyMutationRequest - listenerRequestChannel chan *proxyListenerRequest -} - -// handleHealthCheckRequest - received a health check request - lookup and respond to HC. -func (h *proxyHC) handleHealthCheckRequest(rw http.ResponseWriter, serviceName string) { - s, ok := h.serviceEndpointsMap.Get(serviceName) - if !ok { - glog.V(4).Infof("Service %s not found or has no local endpoints", serviceName) - sendHealthCheckResponse(rw, http.StatusServiceUnavailable, "No Service Endpoints Found") - return +// NewServer allocates a new healthcheck server manager. If either +// of the injected arguments are nil, defaults will be used. +func NewServer(hostname string, recorder record.EventRecorder, listener Listener, httpServerFactory HTTPServerFactory) Server { + if listener == nil { + listener = stdNetListener{} } - numEndpoints := len(*s.(*serviceEndpointsList).endpoints) - if numEndpoints > 0 { - sendHealthCheckResponse(rw, http.StatusOK, fmt.Sprintf("%d Service Endpoints found", numEndpoints)) - return + if httpServerFactory == nil { + httpServerFactory = stdHTTPServerFactory{} + } + return &server{ + hostname: hostname, + recorder: recorder, + listener: listener, + httpFactory: httpServerFactory, + services: map[types.NamespacedName]*hcInstance{}, } - sendHealthCheckResponse(rw, http.StatusServiceUnavailable, "0 local Endpoints are alive") } -// handleMutationRequest - receive requests to mutate the table entry for a service -func (h *proxyHC) handleMutationRequest(req *proxyMutationRequest) { - numEndpoints := len(*req.endpointUids) - glog.V(4).Infof("LB service health check mutation request Service: %s - %d Endpoints %v", - req.serviceName, numEndpoints, (*req.endpointUids).List()) - if numEndpoints == 0 { - if _, ok := h.serviceEndpointsMap.Get(req.serviceName.String()); ok { - glog.V(4).Infof("Deleting endpoints map for service %s, all local endpoints gone", req.serviceName.String()) - h.serviceEndpointsMap.Delete(req.serviceName.String()) - } - return +// Implement Listener in terms of net.Listen. +type stdNetListener struct{} + +func (stdNetListener) Listen(addr string) (net.Listener, error) { + return net.Listen("tcp", addr) +} + +var _ Listener = stdNetListener{} + +// Implement HTTPServerFactory in terms of http.Server. +type stdHTTPServerFactory struct{} + +func (stdHTTPServerFactory) New(addr string, handler http.Handler) HTTPServer { + return &http.Server{ + Addr: addr, + Handler: handler, } - var entry *serviceEndpointsList - e, exists := h.serviceEndpointsMap.Get(req.serviceName.String()) - if exists { - entry = e.(*serviceEndpointsList) - if entry.endpoints.Equal(*req.endpointUids) { - return - } - // Compute differences just for printing logs about additions and removals - deletedEndpoints := entry.endpoints.Difference(*req.endpointUids) - newEndpoints := req.endpointUids.Difference(*entry.endpoints) - for _, e := range newEndpoints.List() { - glog.V(4).Infof("Adding local endpoint %s to LB health check for service %s", - e, req.serviceName.String()) - } - for _, d := range deletedEndpoints.List() { - glog.V(4).Infof("Deleted endpoint %s from service %s LB health check (%d endpoints left)", - d, req.serviceName.String(), len(*entry.endpoints)) +} + +var _ HTTPServerFactory = stdHTTPServerFactory{} + +type server struct { + hostname string + recorder record.EventRecorder // can be nil + listener Listener + httpFactory HTTPServerFactory + + lock sync.Mutex + services map[types.NamespacedName]*hcInstance +} + +func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) error { + hcs.lock.Lock() + defer hcs.lock.Unlock() + + // Remove any that are not needed any more. + for nsn, svc := range hcs.services { + if port, found := newServices[nsn]; !found || port != svc.port { + glog.V(2).Infof("Closing healthcheck %q on port %d", nsn.String(), svc.port) + if err := svc.listener.Close(); err != nil { + glog.Errorf("Close(%v): %v", svc.listener.Addr(), err) + } + delete(hcs.services, nsn) } } - entry = &serviceEndpointsList{serviceName: req.serviceName, endpoints: req.endpointUids} - h.serviceEndpointsMap.Add(req.serviceName.String(), entry) + + // Add any that are needed. + for nsn, port := range newServices { + if hcs.services[nsn] != nil { + glog.V(3).Infof("Existing healthcheck %q on port %d", nsn.String(), port) + continue + } + + glog.V(2).Infof("Opening healthcheck %q on port %d", nsn.String(), port) + svc := &hcInstance{port: port} + addr := fmt.Sprintf(":%d", port) + svc.server = hcs.httpFactory.New(addr, hcHandler{name: nsn, hcs: hcs}) + var err error + svc.listener, err = hcs.listener.Listen(addr) + if err != nil { + msg := fmt.Sprintf("node %s failed to start healthcheck %q on port %d: %v", hcs.hostname, nsn.String(), port, err) + + if hcs.recorder != nil { + hcs.recorder.Eventf( + &clientv1.ObjectReference{ + Kind: "Service", + Namespace: nsn.Namespace, + Name: nsn.Name, + UID: types.UID(nsn.String()), + }, api.EventTypeWarning, "FailedToStartHealthcheck", msg) + } + glog.Error(msg) + continue + } + hcs.services[nsn] = svc + + go func(nsn types.NamespacedName, svc *hcInstance) { + // Serve() will exit when the listener is closed. + glog.V(3).Infof("Starting goroutine for healthcheck %q on port %d", nsn.String(), svc.port) + if err := svc.server.Serve(svc.listener); err != nil { + glog.V(3).Infof("Healthcheck %q closed: %v", nsn.String(), err) + return + } + glog.V(3).Infof("Healthcheck %q closed", nsn.String()) + }(nsn, svc) + } + return nil } -// proxyHealthCheckRequest - Factory method to instantiate the health check handler -func proxyHealthCheckFactory() *proxyHC { - glog.V(2).Infof("Initializing kube-proxy health checker") - phc := &proxyHC{ - serviceEndpointsMap: cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{}), - serviceResponderMap: make(map[types.NamespacedName]serviceResponder), - mutationRequestChannel: make(chan *proxyMutationRequest, 1024), - listenerRequestChannel: make(chan *proxyListenerRequest, 1024), - } - return phc +type hcInstance struct { + port uint16 + listener net.Listener + server HTTPServer + endpoints int // number of local endpoints for a service +} + +type hcHandler struct { + name types.NamespacedName + hcs *server +} + +var _ http.Handler = hcHandler{} + +func (h hcHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + h.hcs.lock.Lock() + count := h.hcs.services[h.name].endpoints + h.hcs.lock.Unlock() + resp.Header().Set("Content-Type", "application/json") + if count == 0 { + resp.WriteHeader(http.StatusServiceUnavailable) + } else { + resp.WriteHeader(http.StatusOK) + } + fmt.Fprintf(resp, strings.Trim(dedent.Dedent(fmt.Sprintf(` + { + "service": { + "namespace": %q, + "name": %q + }, + "localEndpoints": %d + } + `, h.name.Namespace, h.name.Name, count)), "\n")) +} + +func (hcs *server) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error { + hcs.lock.Lock() + defer hcs.lock.Unlock() + + for nsn, count := range newEndpoints { + if hcs.services[nsn] == nil { + glog.V(3).Infof("Not saving endpoints for unknown healthcheck %q", nsn.String()) + continue + } + glog.V(3).Infof("Reporting %d endpoints for healthcheck %q", count, nsn.String()) + hcs.services[nsn].endpoints = count + } + return nil } diff --git a/pkg/proxy/healthcheck/healthcheck_test.go b/pkg/proxy/healthcheck/healthcheck_test.go index 72615589799..208ffb18adf 100644 --- a/pkg/proxy/healthcheck/healthcheck_test.go +++ b/pkg/proxy/healthcheck/healthcheck_test.go @@ -17,142 +17,310 @@ limitations under the License. package healthcheck import ( - "fmt" - "io/ioutil" - "math/rand" + "encoding/json" + "net" "net/http" + "net/http/httptest" "testing" - "time" + + "github.com/davecgh/go-spew/spew" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" ) -type TestCaseData struct { - nodePorts int - numEndpoints int - nodePortList []int - svcNames []types.NamespacedName +type fakeListener struct { + openPorts sets.String } -const ( - startPort = 20000 - endPort = 40000 -) - -var ( - choices = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") -) - -func generateRandomString(n int) string { - - b := make([]byte, n) - l := len(choices) - for i := range b { - b[i] = choices[rand.Intn(l)] - } - return string(b) -} - -func chooseServiceName(tc int, hint int) types.NamespacedName { - var svc types.NamespacedName - svc.Namespace = fmt.Sprintf("ns_%d", tc) - svc.Name = fmt.Sprintf("name_%d", hint) - return svc -} - -func generateEndpointSet(max int) sets.String { - s := sets.NewString() - for i := 0; i < max; i++ { - s.Insert(fmt.Sprintf("%d%s", i, generateRandomString(8))) - } - return s -} - -func verifyHealthChecks(tc *TestCaseData, t *testing.T) bool { - var success = true - time.Sleep(100 * time.Millisecond) - for i := 0; i < tc.nodePorts; i++ { - t.Logf("Validating HealthCheck works for svc %s nodePort %d\n", tc.svcNames[i], tc.nodePortList[i]) - res, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/", tc.nodePortList[i])) - if err != nil { - t.Logf("ERROR: Failed to connect to listening port") - success = false - continue - } - robots, err := ioutil.ReadAll(res.Body) - if res.StatusCode == http.StatusServiceUnavailable { - t.Logf("ERROR: HealthCheck returned %s: %s", res.Status, string(robots)) - success = false - continue - } - res.Body.Close() - if err != nil { - t.Logf("Error: reading body of response (%s)", err) - success = false - continue - } - } - if success { - t.Logf("Success: All nodePorts found active") - } - return success -} - -func TestHealthChecker(t *testing.T) { - testcases := []TestCaseData{ - { - nodePorts: 1, - numEndpoints: 2, - }, - { - nodePorts: 10, - numEndpoints: 6, - }, - { - nodePorts: 100, - numEndpoints: 1, - }, - } - - Run() - - ports := startPort - for n, tc := range testcases { - tc.nodePortList = make([]int, tc.nodePorts) - tc.svcNames = make([]types.NamespacedName, tc.nodePorts) - for i := 0; i < tc.nodePorts; i++ { - tc.svcNames[i] = chooseServiceName(n, i) - t.Logf("Updating endpoints map for %s %d", tc.svcNames[i], tc.numEndpoints) - for { - UpdateEndpoints(tc.svcNames[i], generateEndpointSet(tc.numEndpoints)) - tc.nodePortList[i] = ports - ports++ - if AddServiceListener(tc.svcNames[i], tc.nodePortList[i]) { - break - } - DeleteServiceListener(tc.svcNames[i], tc.nodePortList[i]) - // Keep searching for a port that works - t.Logf("Failed to bind/listen on port %d...trying next port", ports-1) - if ports > endPort { - t.Errorf("Exhausted range of ports available for tests") - return - } - } - } - t.Logf("Validating if all nodePorts for tc %d work", n) - if !verifyHealthChecks(&tc, t) { - t.Errorf("Healthcheck validation failed") - } - - for i := 0; i < tc.nodePorts; i++ { - DeleteServiceListener(tc.svcNames[i], tc.nodePortList[i]) - UpdateEndpoints(tc.svcNames[i], sets.NewString()) - } - - // Ensure that all listeners have been shutdown - if verifyHealthChecks(&tc, t) { - t.Errorf("Healthcheck validation failed") - } +func newFakeListener() *fakeListener { + return &fakeListener{ + openPorts: sets.String{}, + } +} + +func (fake *fakeListener) hasPort(addr string) bool { + return fake.openPorts.Has(addr) +} + +func (fake *fakeListener) Listen(addr string) (net.Listener, error) { + fake.openPorts.Insert(addr) + return &fakeNetListener{ + parent: fake, + addr: addr, + }, nil +} + +type fakeNetListener struct { + parent *fakeListener + addr string +} + +func (fake *fakeNetListener) Accept() (net.Conn, error) { + // Not implemented + return nil, nil +} + +func (fake *fakeNetListener) Close() error { + fake.parent.openPorts.Delete(fake.addr) + return nil +} + +func (fake *fakeNetListener) Addr() net.Addr { + // Not implemented + return nil +} + +type fakeHTTPServerFactory struct{} + +func newFakeHTTPServerFactory() *fakeHTTPServerFactory { + return &fakeHTTPServerFactory{} +} + +func (fake *fakeHTTPServerFactory) New(addr string, handler http.Handler) HTTPServer { + return &fakeHTTPServer{ + addr: addr, + handler: handler, + } +} + +type fakeHTTPServer struct { + addr string + handler http.Handler +} + +func (fake *fakeHTTPServer) Serve(listener net.Listener) error { + return nil // Cause the goroutine to return +} + +func mknsn(ns, name string) types.NamespacedName { + return types.NamespacedName{ + Namespace: ns, + Name: name, + } +} + +type hcPayload struct { + Service struct { + Namespace string + Name string + } + LocalEndpoints int +} + +func TestServer(t *testing.T) { + listener := newFakeListener() + httpFactory := newFakeHTTPServerFactory() + + hcsi := NewServer("hostname", nil, listener, httpFactory) + hcs := hcsi.(*server) + if len(hcs.services) != 0 { + t.Errorf("expected 0 services, got %d", len(hcs.services)) + } + + // sync nothing + hcs.SyncServices(nil) + if len(hcs.services) != 0 { + t.Errorf("expected 0 services, got %d", len(hcs.services)) + } + hcs.SyncEndpoints(nil) + if len(hcs.services) != 0 { + t.Errorf("expected 0 services, got %d", len(hcs.services)) + } + + // sync unknown endpoints, should be dropped + hcs.SyncEndpoints(map[types.NamespacedName]int{mknsn("a", "b"): 93}) + if len(hcs.services) != 0 { + t.Errorf("expected 0 services, got %d", len(hcs.services)) + } + + // sync a real service + nsn := mknsn("a", "b") + hcs.SyncServices(map[types.NamespacedName]uint16{nsn: 9376}) + if len(hcs.services) != 1 { + t.Errorf("expected 1 service, got %d", len(hcs.services)) + } + if hcs.services[nsn].endpoints != 0 { + t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn].endpoints) + } + if len(listener.openPorts) != 1 { + t.Errorf("expected 1 open port, got %d\n%s", len(listener.openPorts), spew.Sdump(listener.openPorts)) + } + if !listener.hasPort(":9376") { + t.Errorf("expected port :9376 to be open\n%s", spew.Sdump(listener.openPorts)) + } + // test the handler + testHandler(hcs, nsn, http.StatusServiceUnavailable, 0, t) + + // sync an endpoint + hcs.SyncEndpoints(map[types.NamespacedName]int{nsn: 18}) + if len(hcs.services) != 1 { + t.Errorf("expected 1 service, got %d", len(hcs.services)) + } + if hcs.services[nsn].endpoints != 18 { + t.Errorf("expected 18 endpoints, got %d", hcs.services[nsn].endpoints) + } + // test the handler + testHandler(hcs, nsn, http.StatusOK, 18, t) + + // sync zero endpoints + hcs.SyncEndpoints(map[types.NamespacedName]int{nsn: 0}) + if len(hcs.services) != 1 { + t.Errorf("expected 1 service, got %d", len(hcs.services)) + } + if hcs.services[nsn].endpoints != 0 { + t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn].endpoints) + } + // test the handler + testHandler(hcs, nsn, http.StatusServiceUnavailable, 0, t) + + // sync nil endpoints + hcs.SyncEndpoints(nil) + if len(hcs.services) != 1 { + t.Errorf("expected 1 service, got %d", len(hcs.services)) + } + if hcs.services[nsn].endpoints != 0 { + t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn].endpoints) + } + // test the handler + testHandler(hcs, nsn, http.StatusServiceUnavailable, 0, t) + + // put the endpoint back + hcs.SyncEndpoints(map[types.NamespacedName]int{nsn: 18}) + if len(hcs.services) != 1 { + t.Errorf("expected 1 service, got %d", len(hcs.services)) + } + if hcs.services[nsn].endpoints != 18 { + t.Errorf("expected 18 endpoints, got %d", hcs.services[nsn].endpoints) + } + // delete the service + hcs.SyncServices(nil) + if len(hcs.services) != 0 { + t.Errorf("expected 0 services, got %d", len(hcs.services)) + } + + // sync multiple services + nsn1 := mknsn("a", "b") + nsn2 := mknsn("c", "d") + nsn3 := mknsn("e", "f") + nsn4 := mknsn("g", "h") + hcs.SyncServices(map[types.NamespacedName]uint16{ + nsn1: 9376, + nsn2: 12909, + nsn3: 11113, + }) + if len(hcs.services) != 3 { + t.Errorf("expected 3 service, got %d", len(hcs.services)) + } + if hcs.services[nsn1].endpoints != 0 { + t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn1].endpoints) + } + if hcs.services[nsn2].endpoints != 0 { + t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn2].endpoints) + } + if hcs.services[nsn3].endpoints != 0 { + t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn3].endpoints) + } + if len(listener.openPorts) != 3 { + t.Errorf("expected 3 open ports, got %d\n%s", len(listener.openPorts), spew.Sdump(listener.openPorts)) + } + // test the handlers + testHandler(hcs, nsn1, http.StatusServiceUnavailable, 0, t) + testHandler(hcs, nsn2, http.StatusServiceUnavailable, 0, t) + + // sync endpoints + hcs.SyncEndpoints(map[types.NamespacedName]int{ + nsn1: 9, + nsn2: 3, + nsn3: 7, + }) + if len(hcs.services) != 3 { + t.Errorf("expected 3 services, got %d", len(hcs.services)) + } + if hcs.services[nsn1].endpoints != 9 { + t.Errorf("expected 9 endpoints, got %d", hcs.services[nsn1].endpoints) + } + if hcs.services[nsn2].endpoints != 3 { + t.Errorf("expected 3 endpoints, got %d", hcs.services[nsn2].endpoints) + } + if hcs.services[nsn3].endpoints != 7 { + t.Errorf("expected 7 endpoints, got %d", hcs.services[nsn3].endpoints) + } + // test the handlers + testHandler(hcs, nsn1, http.StatusOK, 9, t) + testHandler(hcs, nsn2, http.StatusOK, 3, t) + testHandler(hcs, nsn3, http.StatusOK, 7, t) + + // sync new services + hcs.SyncServices(map[types.NamespacedName]uint16{ + //nsn1: 9376, // remove it + nsn2: 12909, // leave it + nsn3: 11114, // change it + nsn4: 11878, // add it + }) + if len(hcs.services) != 3 { + t.Errorf("expected 3 service, got %d", len(hcs.services)) + } + if hcs.services[nsn2].endpoints != 3 { + t.Errorf("expected 3 endpoints, got %d", hcs.services[nsn2].endpoints) + } + if hcs.services[nsn3].endpoints != 0 { + t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn3].endpoints) + } + if hcs.services[nsn4].endpoints != 0 { + t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn4].endpoints) + } + // test the handlers + testHandler(hcs, nsn2, http.StatusOK, 3, t) + testHandler(hcs, nsn3, http.StatusServiceUnavailable, 0, t) + testHandler(hcs, nsn4, http.StatusServiceUnavailable, 0, t) + + // sync endpoints + hcs.SyncEndpoints(map[types.NamespacedName]int{ + nsn1: 9, + nsn2: 3, + nsn3: 7, + nsn4: 6, + }) + if len(hcs.services) != 3 { + t.Errorf("expected 3 services, got %d", len(hcs.services)) + } + if hcs.services[nsn2].endpoints != 3 { + t.Errorf("expected 3 endpoints, got %d", hcs.services[nsn2].endpoints) + } + if hcs.services[nsn3].endpoints != 7 { + t.Errorf("expected 7 endpoints, got %d", hcs.services[nsn3].endpoints) + } + if hcs.services[nsn4].endpoints != 6 { + t.Errorf("expected 6 endpoints, got %d", hcs.services[nsn4].endpoints) + } + // test the handlers + testHandler(hcs, nsn2, http.StatusOK, 3, t) + testHandler(hcs, nsn3, http.StatusOK, 7, t) + testHandler(hcs, nsn4, http.StatusOK, 6, t) +} + +func testHandler(hcs *server, nsn types.NamespacedName, status int, endpoints int, t *testing.T) { + handler := hcs.services[nsn].server.(*fakeHTTPServer).handler + req, err := http.NewRequest("GET", "/healthz", nil) + if err != nil { + t.Fatal(err) + } + resp := httptest.NewRecorder() + + handler.ServeHTTP(resp, req) + + if resp.Code != status { + t.Errorf("expected status code %v, got %v", status, resp.Code) + } + var payload hcPayload + if err := json.Unmarshal(resp.Body.Bytes(), &payload); err != nil { + t.Fatal(err) + } + if payload.Service.Name != nsn.Name || payload.Service.Namespace != nsn.Namespace { + t.Errorf("expected payload name %q, got %v", nsn.String(), payload.Service) + } + if payload.LocalEndpoints != endpoints { + t.Errorf("expected %d endpoints, got %d", endpoints, payload.LocalEndpoints) } } diff --git a/pkg/proxy/healthcheck/http.go b/pkg/proxy/healthcheck/http.go deleted file mode 100644 index dd3dcf3a750..00000000000 --- a/pkg/proxy/healthcheck/http.go +++ /dev/null @@ -1,46 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package healthcheck - -import ( - "fmt" - "net/http" - - "github.com/golang/glog" -) - -// A healthCheckHandler serves http requests on /healthz on the service health check node port, -// and responds to every request with either: -// 200 OK and the count of endpoints for the given service that are local to this node. -// or -// 503 Service Unavailable If the count is zero or the service does not exist -type healthCheckHandler struct { - svcNsName string -} - -// HTTP Utility function to send the required statusCode and error text to a http.ResponseWriter object -func sendHealthCheckResponse(rw http.ResponseWriter, statusCode int, error string) { - rw.Header().Set("Content-Type", "text/plain") - rw.WriteHeader(statusCode) - fmt.Fprint(rw, error) -} - -// ServeHTTP: Interface callback method for net.Listener Handlers -func (h healthCheckHandler) ServeHTTP(response http.ResponseWriter, req *http.Request) { - glog.V(4).Infof("Received HC Request Service %s from Cloud Load Balancer", h.svcNsName) - healthchecker.handleHealthCheckRequest(response, h.svcNsName) -} diff --git a/pkg/proxy/healthcheck/listener.go b/pkg/proxy/healthcheck/listener.go deleted file mode 100644 index d61e741cc7e..00000000000 --- a/pkg/proxy/healthcheck/listener.go +++ /dev/null @@ -1,77 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package healthcheck - -// Create/Delete dynamic listeners on the required nodePorts - -import ( - "fmt" - "net" - "net/http" - - "github.com/golang/glog" -) - -// handleServiceListenerRequest: receive requests to add/remove service health check listening ports -func (h *proxyHC) handleServiceListenerRequest(req *proxyListenerRequest) bool { - sr, serviceFound := h.serviceResponderMap[req.serviceName] - if !req.add { - if !serviceFound { - return false - } - glog.Infof("Deleting HealthCheckListenPort for service %s port %d", - req.serviceName, req.listenPort) - delete(h.serviceResponderMap, req.serviceName) - (*sr.listener).Close() - return true - } else if serviceFound { - if req.listenPort == sr.listenPort { - // Addition requested but responder for service already exists and port is unchanged - return true - } - // Addition requested but responder for service already exists but the listen port has changed - glog.Infof("HealthCheckListenPort for service %s changed from %d to %d - closing old listening port", - req.serviceName, sr.listenPort, req.listenPort) - delete(h.serviceResponderMap, req.serviceName) - (*sr.listener).Close() - } - // Create a service responder object and start listening and serving on the provided port - glog.V(2).Infof("Adding health check listener for service %s on nodePort %d", req.serviceName, req.listenPort) - server := http.Server{ - Addr: fmt.Sprintf(":%d", req.listenPort), - Handler: healthCheckHandler{svcNsName: req.serviceName.String()}, - } - listener, err := net.Listen("tcp", server.Addr) - if err != nil { - glog.Warningf("FAILED to listen on address %s (%s)\n", server.Addr, err) - return false - } - h.serviceResponderMap[req.serviceName] = serviceResponder{serviceName: req.serviceName, - listenPort: req.listenPort, - listener: &listener, - server: &server} - go func() { - // Anonymous goroutine to block on Serve for this listen port - Serve will exit when the listener is closed - glog.V(3).Infof("Goroutine blocking on serving health checks for %s on port %d", req.serviceName, req.listenPort) - if err := server.Serve(listener); err != nil { - glog.V(3).Infof("Proxy HealthCheck listen socket %d for service %s closed with error %s\n", req.listenPort, req.serviceName, err) - return - } - glog.V(3).Infof("Proxy HealthCheck listen socket %d for service %s closed\n", req.listenPort, req.serviceName) - }() - return true -} diff --git a/pkg/proxy/healthcheck/worker.go b/pkg/proxy/healthcheck/worker.go deleted file mode 100644 index 1c1d60a09e7..00000000000 --- a/pkg/proxy/healthcheck/worker.go +++ /dev/null @@ -1,53 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package healthcheck LoadBalancer Healthcheck responder library for kubernetes network proxies -package healthcheck // import "k8s.io/kubernetes/pkg/proxy/healthcheck" - -import ( - "time" - - "github.com/golang/glog" -) - -var healthchecker *proxyHC - -// handlerLoop Serializes all requests to prevent concurrent access to the maps -func (h *proxyHC) handlerLoop() { - ticker := time.NewTicker(1 * time.Minute) - defer ticker.Stop() - for { - select { - case req := <-h.mutationRequestChannel: - h.handleMutationRequest(req) - case req := <-h.listenerRequestChannel: - req.responseChannel <- h.handleServiceListenerRequest(req) - case <-ticker.C: - go h.sync() - } - } -} - -func (h *proxyHC) sync() { - glog.V(4).Infof("%d Health Check Listeners", len(h.serviceResponderMap)) - glog.V(4).Infof("%d Services registered for health checking", len(h.serviceEndpointsMap.List())) - for _, svc := range h.serviceEndpointsMap.ListKeys() { - if e, ok := h.serviceEndpointsMap.Get(svc); ok { - endpointList := e.(*serviceEndpointsList) - glog.V(4).Infof("Service %s has %d local endpoints", svc, endpointList.endpoints.Len()) - } - } -} diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index 4b64995719f..e31f28b86be 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -50,7 +50,6 @@ go_test( "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/types", "//vendor:k8s.io/apimachinery/pkg/util/intstr", - "//vendor:k8s.io/apimachinery/pkg/util/sets", ], ) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 310f3acec42..10bae3294af 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -219,7 +219,7 @@ type Proxier struct { nodeIP net.IP portMapper portOpener recorder record.EventRecorder - healthChecker healthChecker + healthChecker healthcheck.Server } type localPort struct { @@ -251,17 +251,6 @@ func (l *listenPortOpener) OpenLocalPort(lp *localPort) (closeable, error) { return openLocalPort(lp) } -type healthChecker interface { - UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String) -} - -// TODO: the healthcheck pkg should offer a type -type globalHealthChecker struct{} - -func (globalHealthChecker) UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String) { - healthcheck.UpdateEndpoints(serviceName, endpointUIDs) -} - // Proxier implements ProxyProvider var _ proxy.ProxyProvider = &Proxier{} @@ -315,8 +304,7 @@ func NewProxier(ipt utiliptables.Interface, glog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic") } - healthChecker := globalHealthChecker{} - go healthcheck.Run() + healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps var throttle flowcontrol.RateLimiter // Defaulting back to not limit sync rate when minSyncPeriod is 0. @@ -451,18 +439,12 @@ func (proxier *Proxier) SyncLoop() { } } -type healthCheckPort struct { - namespace types.NamespacedName - nodeport int -} - // Accepts a list of Services and the existing service map. Returns the new -// service map, a list of healthcheck ports to add to or remove from the health -// checking listener service, and a set of stale UDP services. -func buildNewServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMap) (proxyServiceMap, []healthCheckPort, []healthCheckPort, sets.String) { +// service map, a map of healthcheck ports, and a set of stale UDP +// services. +func buildNewServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMap) (proxyServiceMap, map[types.NamespacedName]uint16, sets.String) { newServiceMap := make(proxyServiceMap) - healthCheckAdd := make([]healthCheckPort, 0) - healthCheckDel := make([]healthCheckPort, 0) + hcPorts := make(map[types.NamespacedName]uint16) for _, service := range allServices { svcName := types.NamespacedName{ @@ -498,12 +480,8 @@ func buildNewServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMa glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol) } - if !exists || !equal { - if info.onlyNodeLocalEndpoints && info.healthCheckNodePort > 0 { - healthCheckAdd = append(healthCheckAdd, healthCheckPort{serviceName.NamespacedName, info.healthCheckNodePort}) - } else { - healthCheckDel = append(healthCheckDel, healthCheckPort{serviceName.NamespacedName, 0}) - } + if info.onlyNodeLocalEndpoints { + hcPorts[svcName] = uint16(info.healthCheckNodePort) } newServiceMap[serviceName] = info @@ -511,6 +489,13 @@ func buildNewServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMa } } + for nsn, port := range hcPorts { + if port == 0 { + glog.Errorf("Service %q has no healthcheck nodeport", nsn) + delete(hcPorts, nsn) + } + } + staleUDPServices := sets.NewString() // Remove serviceports missing from the update. for name, info := range oldServiceMap { @@ -519,13 +504,10 @@ func buildNewServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMa if info.protocol == api.ProtocolUDP { staleUDPServices.Insert(info.clusterIP.String()) } - if info.onlyNodeLocalEndpoints && info.healthCheckNodePort > 0 { - healthCheckDel = append(healthCheckDel, healthCheckPort{name.NamespacedName, info.healthCheckNodePort}) - } } } - return newServiceMap, healthCheckAdd, healthCheckDel, staleUDPServices + return newServiceMap, hcPorts, staleUDPServices } // OnServiceUpdate tracks the active set of service proxies. @@ -538,19 +520,11 @@ func (proxier *Proxier) OnServiceUpdate(allServices []*api.Service) { } proxier.allServices = allServices - newServiceMap, hcAdd, hcDel, staleUDPServices := buildNewServiceMap(allServices, proxier.serviceMap) - for _, hc := range hcAdd { - glog.V(4).Infof("Adding health check for %+v, port %v", hc.namespace, hc.nodeport) - // Turn on healthcheck responder to listen on the health check nodePort - // FIXME: handle failures from adding the service - healthcheck.AddServiceListener(hc.namespace, hc.nodeport) - } - for _, hc := range hcDel { - // Remove ServiceListener health check nodePorts from the health checker - // TODO - Stats - glog.V(4).Infof("Deleting health check for %+v, port %v", hc.namespace, hc.nodeport) - // FIXME: handle failures from deleting the service - healthcheck.DeleteServiceListener(hc.namespace, hc.nodeport) + newServiceMap, hcPorts, staleUDPServices := buildNewServiceMap(allServices, proxier.serviceMap) + + // update healthcheck ports + if err := proxier.healthChecker.SyncServices(hcPorts); err != nil { + glog.Errorf("Error syncing healtcheck ports: %v", err) } if len(newServiceMap) != len(proxier.serviceMap) || !reflect.DeepEqual(newServiceMap, proxier.serviceMap) { @@ -573,7 +547,13 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []*api.Endpoints) { proxier.allEndpoints = allEndpoints // TODO: once service has made this same transform, move this into proxier.syncProxyRules() - newMap, staleConnections := buildNewEndpointsMap(proxier.allEndpoints, proxier.endpointsMap, proxier.hostname, proxier.healthChecker) + newMap, hcEndpoints, staleConnections := buildNewEndpointsMap(proxier.allEndpoints, proxier.endpointsMap, proxier.hostname) + + // update healthcheck endpoints + if err := proxier.healthChecker.SyncEndpoints(hcEndpoints); err != nil { + glog.Errorf("Error syncing healthcheck endoints: %v", err) + } + if len(newMap) != len(proxier.endpointsMap) || !reflect.DeepEqual(newMap, proxier.endpointsMap) { proxier.endpointsMap = newMap proxier.syncProxyRules(syncReasonEndpoints) @@ -585,11 +565,11 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []*api.Endpoints) { } // Convert a slice of api.Endpoints objects into a map of service-port -> endpoints. -func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap, hostname string, - healthChecker healthChecker) (newMap proxyEndpointMap, staleSet map[endpointServicePair]bool) { +func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap, hostname string) (newMap proxyEndpointMap, hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) { // return values newMap = make(proxyEndpointMap) + hcEndpoints = make(map[types.NamespacedName]int) staleSet = make(map[endpointServicePair]bool) // Update endpoints for services. @@ -615,19 +595,30 @@ func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap } } - // Update service health check - allSvcPorts := make(map[proxy.ServicePortName]bool) - for svcPort := range curMap { - allSvcPorts[svcPort] = true - } - for svcPort := range newMap { - allSvcPorts[svcPort] = true - } - for svcPort := range allSvcPorts { - updateHealthCheckEntries(svcPort.NamespacedName, newMap[svcPort], healthChecker) + if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) { + return } - return newMap, staleSet + // accumulate local IPs per service, ignoring ports + localIPs := map[types.NamespacedName]sets.String{} + for svcPort := range newMap { + for _, ep := range newMap[svcPort] { + if ep.isLocal { + nsn := svcPort.NamespacedName + if localIPs[nsn] == nil { + localIPs[nsn] = sets.NewString() + } + ip := strings.Split(ep.endpoint, ":")[0] // just the IP part + localIPs[nsn].Insert(ip) + } + } + } + // produce a count per service + for nsn, ips := range localIPs { + hcEndpoints[nsn] = len(ips) + } + + return newMap, hcEndpoints, staleSet } // Gather information about all the endpoint state for a given api.Endpoints. @@ -675,23 +666,6 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, } } -// updateHealthCheckEntries - send the new set of local endpoints to the health checker -func updateHealthCheckEntries(name types.NamespacedName, endpoints []*endpointsInfo, healthChecker healthChecker) { - if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) { - return - } - - // Use a set instead of a slice to provide deduplication - epSet := sets.NewString() - for _, portInfo := range endpoints { - if portInfo.isLocal { - // kube-proxy health check only needs local endpoints - epSet.Insert(fmt.Sprintf("%s/%s", name.Namespace, name.Name)) - } - } - healthChecker.UpdateEndpoints(name, epSet) -} - // portProtoHash takes the ServicePortName and protocol for a service // returns the associated 16 character hash. This is computed by hashing (sha256) // then encoding to base32 and truncating to 16 chars. We do this because IPTables diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 016c7dba445..8a637519616 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -17,6 +17,7 @@ limitations under the License. package iptables import ( + "reflect" "strconv" "testing" @@ -29,7 +30,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/service" "k8s.io/kubernetes/pkg/proxy" @@ -355,9 +355,27 @@ func (f *fakePortOpener) OpenLocalPort(lp *localPort) (closeable, error) { return nil, nil } -type fakeHealthChecker struct{} +type fakeHealthChecker struct { + services map[types.NamespacedName]uint16 + endpoints map[types.NamespacedName]int +} -func (fakeHealthChecker) UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String) {} +func newFakeHealthChecker() *fakeHealthChecker { + return &fakeHealthChecker{ + services: map[types.NamespacedName]uint16{}, + endpoints: map[types.NamespacedName]int{}, + } +} + +func (fake *fakeHealthChecker) SyncServices(newServices map[types.NamespacedName]uint16) error { + fake.services = newServices + return nil +} + +func (fake *fakeHealthChecker) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error { + fake.endpoints = newEndpoints + return nil +} const testHostname = "test-hostname" @@ -374,7 +392,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { hostname: testHostname, portsMap: make(map[localPort]closeable), portMapper: &fakePortOpener{[]*localPort{}}, - healthChecker: fakeHealthChecker{}, + healthChecker: newFakeHealthChecker(), } } @@ -926,30 +944,18 @@ func TestBuildServiceMapAddRemove(t *testing.T) { }), } - serviceMap, hcAdd, hcDel, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap)) + serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap)) if len(serviceMap) != 8 { t.Errorf("expected service map length 8, got %v", serviceMap) } // The only-local-loadbalancer ones get added - if len(hcAdd) != 2 { - t.Errorf("expected healthcheck add length 2, got %v", hcAdd) + if len(hcPorts) != 1 { + t.Errorf("expected 1 healthcheck port, got %v", hcPorts) } else { - for _, hc := range hcAdd { - if hc.namespace.Namespace != "somewhere" || hc.namespace.Name != "only-local-load-balancer" { - t.Errorf("unexpected healthcheck listener added: %v", hc) - } - } - } - - // All the rest get deleted - if len(hcDel) != 6 { - t.Errorf("expected healthcheck del length 6, got %v", hcDel) - } else { - for _, hc := range hcDel { - if hc.namespace.Namespace == "somewhere" && hc.namespace.Name == "only-local-load-balancer" { - t.Errorf("unexpected healthcheck listener deleted: %v", hc) - } + nsn := makeNSN("somewhere", "only-local-load-balancer") + if port, found := hcPorts[nsn]; !found || port != 345 { + t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, hcPorts) } } @@ -961,27 +967,13 @@ func TestBuildServiceMapAddRemove(t *testing.T) { // Remove some stuff services = []*api.Service{services[0]} services[0].Spec.Ports = []api.ServicePort{services[0].Spec.Ports[1]} - serviceMap, hcAdd, hcDel, staleUDPServices = buildNewServiceMap(services, serviceMap) + serviceMap, hcPorts, staleUDPServices = buildNewServiceMap(services, serviceMap) if len(serviceMap) != 1 { t.Errorf("expected service map length 1, got %v", serviceMap) } - if len(hcAdd) != 0 { - t.Errorf("expected healthcheck add length 1, got %v", hcAdd) - } - - // The only OnlyLocal annotation was removed above, so we expect a delete now. - // FIXME: Since the BetaAnnotationHealthCheckNodePort is the same for all - // ServicePorts, we'll get one delete per ServicePort, even though they all - // contain the same information - if len(hcDel) != 2 { - t.Errorf("expected healthcheck del length 2, got %v", hcDel) - } else { - for _, hc := range hcDel { - if hc.namespace.Namespace != "somewhere" || hc.namespace.Name != "only-local-load-balancer" { - t.Errorf("unexpected healthcheck listener deleted: %v", hc) - } - } + if len(hcPorts) != 0 { + t.Errorf("expected healthcheck ports length 1, got %v", hcPorts) } // All services but one were deleted. While you'd expect only the ClusterIPs @@ -1008,17 +1000,14 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) { } // Headless service should be ignored - serviceMap, hcAdd, hcDel, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap)) + serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap)) if len(serviceMap) != 0 { t.Errorf("expected service map length 0, got %d", len(serviceMap)) } // No proxied services, so no healthchecks - if len(hcAdd) != 0 { - t.Errorf("expected healthcheck add length 0, got %d", len(hcAdd)) - } - if len(hcDel) != 0 { - t.Errorf("expected healthcheck del length 0, got %d", len(hcDel)) + if len(hcPorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %d", len(hcPorts)) } if len(staleUDPServices) != 0 { @@ -1036,16 +1025,13 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { }), } - serviceMap, hcAdd, hcDel, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap)) + serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap)) if len(serviceMap) != 0 { t.Errorf("expected service map length 0, got %v", serviceMap) } // No proxied services, so no healthchecks - if len(hcAdd) != 0 { - t.Errorf("expected healthcheck add length 0, got %v", hcAdd) - } - if len(hcDel) != 0 { - t.Errorf("expected healthcheck del length 0, got %v", hcDel) + if len(hcPorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", hcPorts) } if len(staleUDPServices) != 0 { t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices) @@ -1081,15 +1067,12 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { }), } - serviceMap, hcAdd, hcDel, staleUDPServices := buildNewServiceMap(first, make(proxyServiceMap)) + serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(first, make(proxyServiceMap)) if len(serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", serviceMap) } - if len(hcAdd) != 0 { - t.Errorf("expected healthcheck add length 0, got %v", hcAdd) - } - if len(hcDel) != 2 { - t.Errorf("expected healthcheck del length 2, got %v", hcDel) + if len(hcPorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", hcPorts) } if len(staleUDPServices) != 0 { // Services only added, so nothing stale yet @@ -1097,15 +1080,12 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { } // Change service to load-balancer - serviceMap, hcAdd, hcDel, staleUDPServices = buildNewServiceMap(second, serviceMap) + serviceMap, hcPorts, staleUDPServices = buildNewServiceMap(second, serviceMap) if len(serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", serviceMap) } - if len(hcAdd) != 2 { - t.Errorf("expected healthcheck add length 2, got %v", hcAdd) - } - if len(hcDel) != 0 { - t.Errorf("expected healthcheck add length 2, got %v", hcDel) + if len(hcPorts) != 1 { + t.Errorf("expected healthcheck ports length 1, got %v", hcPorts) } if len(staleUDPServices) != 0 { t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices.List()) @@ -1113,30 +1093,24 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { // No change; make sure the service map stays the same and there are // no health-check changes - serviceMap, hcAdd, hcDel, staleUDPServices = buildNewServiceMap(second, serviceMap) + serviceMap, hcPorts, staleUDPServices = buildNewServiceMap(second, serviceMap) if len(serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", serviceMap) } - if len(hcAdd) != 0 { - t.Errorf("expected healthcheck add length 0, got %v", hcAdd) - } - if len(hcDel) != 0 { - t.Errorf("expected healthcheck add length 2, got %v", hcDel) + if len(hcPorts) != 1 { + t.Errorf("expected healthcheck ports length 1, got %v", hcPorts) } if len(staleUDPServices) != 0 { t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices.List()) } // And back to ClusterIP - serviceMap, hcAdd, hcDel, staleUDPServices = buildNewServiceMap(first, serviceMap) + serviceMap, hcPorts, staleUDPServices = buildNewServiceMap(first, serviceMap) if len(serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", serviceMap) } - if len(hcAdd) != 0 { - t.Errorf("expected healthcheck add length 0, got %v", hcAdd) - } - if len(hcDel) != 2 { - t.Errorf("expected healthcheck del length 2, got %v", hcDel) + if len(hcPorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", hcPorts) } if len(staleUDPServices) != 0 { // Services only added, so nothing stale yet @@ -1386,28 +1360,33 @@ func makeTestEndpoints(namespace, name string, eptFunc func(*api.Endpoints)) *ap return ept } +func makeNSN(namespace, name string) types.NamespacedName { + return types.NamespacedName{Namespace: namespace, Name: name} +} + func makeServicePortName(ns, name, port string) proxy.ServicePortName { return proxy.ServicePortName{ - NamespacedName: types.NamespacedName{ - Namespace: ns, - Name: name, - }, - Port: port, + NamespacedName: makeNSN(ns, name), + Port: port, } } func Test_buildNewEndpointsMap(t *testing.T) { + var nodeName = "host" + testCases := []struct { - newEndpoints []*api.Endpoints - oldEndpoints map[proxy.ServicePortName][]*endpointsInfo - expectedResult map[proxy.ServicePortName][]*endpointsInfo - expectedStale []endpointServicePair + newEndpoints []*api.Endpoints + oldEndpoints map[proxy.ServicePortName][]*endpointsInfo + expectedResult map[proxy.ServicePortName][]*endpointsInfo + expectedStale []endpointServicePair + expectedHealthchecks map[types.NamespacedName]int }{{ // Case[0]: nothing - newEndpoints: []*api.Endpoints{}, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedStale: []endpointServicePair{}, + newEndpoints: []*api.Endpoints{}, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedStale: []endpointServicePair{}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[1]: no change, unnamed port newEndpoints: []*api.Endpoints{ @@ -1432,14 +1411,16 @@ func Test_buildNewEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, }, - expectedStale: []endpointServicePair{}, + expectedStale: []endpointServicePair{}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { - // Case[2]: no change, named port + // Case[2]: no change, named port, local newEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", + IP: "1.1.1.1", + NodeName: &nodeName, }}, Ports: []api.EndpointPort{{ Name: "p11", @@ -1450,15 +1431,18 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", false}, + {"1.1.1.1:11", true}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", false}, + {"1.1.1.1:11", true}, }, }, expectedStale: []endpointServicePair{}, + expectedHealthchecks: map[types.NamespacedName]int{ + makeNSN("ns1", "ep1"): 1, + }, }, { // Case[3]: no change, multiple subsets newEndpoints: []*api.Endpoints{ @@ -1498,14 +1482,16 @@ func Test_buildNewEndpointsMap(t *testing.T) { {"1.1.1.2:12", false}, }, }, - expectedStale: []endpointServicePair{}, + expectedStale: []endpointServicePair{}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { - // Case[4]: no change, multiple subsets, multiple ports + // Case[4]: no change, multiple subsets, multiple ports, local newEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", + IP: "1.1.1.1", + NodeName: &nodeName, }}, Ports: []api.EndpointPort{{ Name: "p11", @@ -1527,10 +1513,10 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", false}, + {"1.1.1.1:11", true}, }, makeServicePortName("ns1", "ep1", "p12"): { - {"1.1.1.1:12", false}, + {"1.1.1.1:12", true}, }, makeServicePortName("ns1", "ep1", "p13"): { {"1.1.1.3:13", false}, @@ -1538,16 +1524,19 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", false}, + {"1.1.1.1:11", true}, }, makeServicePortName("ns1", "ep1", "p12"): { - {"1.1.1.1:12", false}, + {"1.1.1.1:12", true}, }, makeServicePortName("ns1", "ep1", "p13"): { {"1.1.1.3:13", false}, }, }, expectedStale: []endpointServicePair{}, + expectedHealthchecks: map[types.NamespacedName]int{ + makeNSN("ns1", "ep1"): 1, + }, }, { // Case[5]: no change, multiple endpoints, subsets, IPs, and ports newEndpoints: []*api.Endpoints{ @@ -1556,7 +1545,8 @@ func Test_buildNewEndpointsMap(t *testing.T) { Addresses: []api.EndpointAddress{{ IP: "1.1.1.1", }, { - IP: "1.1.1.2", + IP: "1.1.1.2", + NodeName: &nodeName, }}, Ports: []api.EndpointPort{{ Name: "p11", @@ -1569,7 +1559,8 @@ func Test_buildNewEndpointsMap(t *testing.T) { Addresses: []api.EndpointAddress{{ IP: "1.1.1.3", }, { - IP: "1.1.1.4", + IP: "1.1.1.4", + NodeName: &nodeName, }}, Ports: []api.EndpointPort{{ Name: "p13", @@ -1585,7 +1576,8 @@ func Test_buildNewEndpointsMap(t *testing.T) { Addresses: []api.EndpointAddress{{ IP: "2.2.2.1", }, { - IP: "2.2.2.2", + IP: "2.2.2.2", + NodeName: &nodeName, }}, Ports: []api.EndpointPort{{ Name: "p21", @@ -1600,63 +1592,68 @@ func Test_buildNewEndpointsMap(t *testing.T) { oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, - {"1.1.1.2:11", false}, + {"1.1.1.2:11", true}, }, makeServicePortName("ns1", "ep1", "p12"): { {"1.1.1.1:12", false}, - {"1.1.1.2:12", false}, + {"1.1.1.2:12", true}, }, makeServicePortName("ns1", "ep1", "p13"): { {"1.1.1.3:13", false}, - {"1.1.1.4:13", false}, + {"1.1.1.4:13", true}, }, makeServicePortName("ns1", "ep1", "p14"): { {"1.1.1.3:14", false}, - {"1.1.1.4:14", false}, + {"1.1.1.4:14", true}, }, makeServicePortName("ns2", "ep2", "p21"): { {"2.2.2.1:21", false}, - {"2.2.2.2:21", false}, + {"2.2.2.2:21", true}, }, makeServicePortName("ns2", "ep2", "p22"): { {"2.2.2.1:22", false}, - {"2.2.2.2:22", false}, + {"2.2.2.2:22", true}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, - {"1.1.1.2:11", false}, + {"1.1.1.2:11", true}, }, makeServicePortName("ns1", "ep1", "p12"): { {"1.1.1.1:12", false}, - {"1.1.1.2:12", false}, + {"1.1.1.2:12", true}, }, makeServicePortName("ns1", "ep1", "p13"): { {"1.1.1.3:13", false}, - {"1.1.1.4:13", false}, + {"1.1.1.4:13", true}, }, makeServicePortName("ns1", "ep1", "p14"): { {"1.1.1.3:14", false}, - {"1.1.1.4:14", false}, + {"1.1.1.4:14", true}, }, makeServicePortName("ns2", "ep2", "p21"): { {"2.2.2.1:21", false}, - {"2.2.2.2:21", false}, + {"2.2.2.2:21", true}, }, makeServicePortName("ns2", "ep2", "p22"): { {"2.2.2.1:22", false}, - {"2.2.2.2:22", false}, + {"2.2.2.2:22", true}, }, }, expectedStale: []endpointServicePair{}, + expectedHealthchecks: map[types.NamespacedName]int{ + makeNSN("ns1", "ep1"): 2, + makeNSN("ns2", "ep2"): 1, + }, }, { // Case[6]: add an Endpoints newEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", + IP: "1.1.1.1", + NodeName: &nodeName, }}, Ports: []api.EndpointPort{{ Port: 11, @@ -1667,16 +1664,19 @@ func Test_buildNewEndpointsMap(t *testing.T) { oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ /* empty */ }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", ""): { - {"1.1.1.1:11", false}, + {"1.1.1.1:11", true}, }, }, expectedStale: []endpointServicePair{}, + expectedHealthchecks: map[types.NamespacedName]int{ + makeNSN("ns1", "ep1"): 1, + }, }, { // Case[7]: remove an Endpoints newEndpoints: []*api.Endpoints{ /* empty */ }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", ""): { - {"1.1.1.1:11", false}, + {"1.1.1.1:11", true}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, @@ -1684,6 +1684,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { endpoint: "1.1.1.1:11", servicePortName: makeServicePortName("ns1", "ep1", ""), }}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[8]: add an IP and port newEndpoints: []*api.Endpoints{ @@ -1692,7 +1693,8 @@ func Test_buildNewEndpointsMap(t *testing.T) { Addresses: []api.EndpointAddress{{ IP: "1.1.1.1", }, { - IP: "1.1.1.2", + IP: "1.1.1.2", + NodeName: &nodeName, }}, Ports: []api.EndpointPort{{ Name: "p11", @@ -1712,14 +1714,17 @@ func Test_buildNewEndpointsMap(t *testing.T) { expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, - {"1.1.1.2:11", false}, + {"1.1.1.2:11", true}, }, makeServicePortName("ns1", "ep1", "p12"): { {"1.1.1.1:12", false}, - {"1.1.1.2:12", false}, + {"1.1.1.2:12", true}, }, }, expectedStale: []endpointServicePair{}, + expectedHealthchecks: map[types.NamespacedName]int{ + makeNSN("ns1", "ep1"): 1, + }, }, { // Case[9]: remove an IP and port newEndpoints: []*api.Endpoints{ @@ -1760,6 +1765,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { endpoint: "1.1.1.2:12", servicePortName: makeServicePortName("ns1", "ep1", "p12"), }}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[10]: add a subset newEndpoints: []*api.Endpoints{ @@ -1774,7 +1780,8 @@ func Test_buildNewEndpointsMap(t *testing.T) { }}, }, { Addresses: []api.EndpointAddress{{ - IP: "2.2.2.2", + IP: "2.2.2.2", + NodeName: &nodeName, }}, Ports: []api.EndpointPort{{ Name: "p22", @@ -1793,10 +1800,13 @@ func Test_buildNewEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, makeServicePortName("ns1", "ep1", "p22"): { - {"2.2.2.2:22", false}, + {"2.2.2.2:22", true}, }, }, expectedStale: []endpointServicePair{}, + expectedHealthchecks: map[types.NamespacedName]int{ + makeNSN("ns1", "ep1"): 1, + }, }, { // Case[11]: remove a subset newEndpoints: []*api.Endpoints{ @@ -1829,6 +1839,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { endpoint: "2.2.2.2:22", servicePortName: makeServicePortName("ns1", "ep1", "p22"), }}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[12]: rename a port newEndpoints: []*api.Endpoints{ @@ -1858,6 +1869,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { endpoint: "1.1.1.1:11", servicePortName: makeServicePortName("ns1", "ep1", "p11"), }}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[13]: renumber a port newEndpoints: []*api.Endpoints{ @@ -1887,6 +1899,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { endpoint: "1.1.1.1:11", servicePortName: makeServicePortName("ns1", "ep1", "p11"), }}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[14]: complex add and remove newEndpoints: []*api.Endpoints{ @@ -1928,7 +1941,8 @@ func Test_buildNewEndpointsMap(t *testing.T) { makeTestEndpoints("ns4", "ep4", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ - IP: "4.4.4.4", + IP: "4.4.4.4", + NodeName: &nodeName, }}, Ports: []api.EndpointPort{{ Name: "p44", @@ -1942,18 +1956,18 @@ func Test_buildNewEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, makeServicePortName("ns2", "ep2", "p22"): { - {"2.2.2.2:22", false}, - {"2.2.2.22:22", false}, + {"2.2.2.2:22", true}, + {"2.2.2.22:22", true}, }, makeServicePortName("ns2", "ep2", "p23"): { - {"2.2.2.3:23", false}, + {"2.2.2.3:23", true}, }, makeServicePortName("ns4", "ep4", "p44"): { - {"4.4.4.4:44", false}, - {"4.4.4.5:44", false}, + {"4.4.4.4:44", true}, + {"4.4.4.5:44", true}, }, makeServicePortName("ns4", "ep4", "p45"): { - {"4.4.4.6:45", false}, + {"4.4.4.6:45", true}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ @@ -1971,7 +1985,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { {"3.3.3.3:33", false}, }, makeServicePortName("ns4", "ep4", "p44"): { - {"4.4.4.4:44", false}, + {"4.4.4.4:44", true}, }, }, expectedStale: []endpointServicePair{{ @@ -1990,10 +2004,13 @@ func Test_buildNewEndpointsMap(t *testing.T) { endpoint: "4.4.4.6:45", servicePortName: makeServicePortName("ns4", "ep4", "p45"), }}, + expectedHealthchecks: map[types.NamespacedName]int{ + makeNSN("ns4", "ep4"): 1, + }, }} for tci, tc := range testCases { - newMap, stale := buildNewEndpointsMap(tc.newEndpoints, tc.oldEndpoints, "host", fakeHealthChecker{}) + newMap, hcEndpoints, stale := buildNewEndpointsMap(tc.newEndpoints, tc.oldEndpoints, nodeName) if len(newMap) != len(tc.expectedResult) { t.Errorf("[%d] expected %d results, got %d: %v", tci, len(tc.expectedResult), len(newMap), newMap) } @@ -2016,6 +2033,9 @@ func Test_buildNewEndpointsMap(t *testing.T) { t.Errorf("[%d] expected stale[%v], but didn't find it: %v", tci, x, stale) } } + if !reflect.DeepEqual(hcEndpoints, tc.expectedHealthchecks) { + t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, hcEndpoints) + } } }