overhaul proxy healthchecks

The existing healthcheck lib was overly complicated and was hiding some
bugs (like the local endpoint count always being 1). This is a reboot of the
interface and implementation to be significantly simpler and better tested.
Tim Hockin 2017-04-01 21:14:30 -07:00
parent 7664b97ed2
commit 87d3f2c622
11 changed files with 595 additions and 648 deletions
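
At a glance, the new surface is the two-method Server interface in pkg/proxy/healthcheck. A minimal sketch of a caller follows (not part of this commit; the node name, namespace/name, and port numbers are invented for illustration, while NewServer, SyncServices, and SyncEndpoints come straight from the new healthcheck.go):

package main

import (
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/kubernetes/pkg/proxy/healthcheck"
)

func main() {
    // nil recorder, listener, and HTTP-server factory fall back to the
    // default net.Listen / http.Server implementations.
    hcs := healthcheck.NewServer("my-node", nil, nil, nil)

    svc := types.NamespacedName{Namespace: "ns", Name: "svc"}

    // Open one listener per service on its healthcheck nodePort; services
    // missing from the map get their listeners closed.
    if err := hcs.SyncServices(map[types.NamespacedName]uint16{svc: 30101}); err != nil {
        // handle the error
    }

    // Report the number of local endpoints per service; 0 makes the
    // handler answer 503, anything greater answers 200.
    if err := hcs.SyncEndpoints(map[types.NamespacedName]int{svc: 2}); err != nil {
        // handle the error
    }
}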


@@ -11,20 +11,17 @@ load(
 go_library(
     name = "go_default_library",
     srcs = [
-        "api.go",
         "doc.go",
         "healthcheck.go",
-        "http.go",
-        "listener.go",
-        "worker.go",
     ],
     tags = ["automanaged"],
     deps = [
         "//vendor:github.com/golang/glog",
+        "//vendor:github.com/renstrom/dedent",
         "//vendor:k8s.io/apimachinery/pkg/types",
-        "//vendor:k8s.io/apimachinery/pkg/util/sets",
-        "//vendor:k8s.io/apimachinery/pkg/util/wait",
-        "//vendor:k8s.io/client-go/tools/cache",
+        "//vendor:k8s.io/client-go/pkg/api",
+        "//vendor:k8s.io/client-go/pkg/api/v1",
+        "//vendor:k8s.io/client-go/tools/record",
     ],
 )
@@ -34,6 +31,7 @@ go_test(
     library = ":go_default_library",
     tags = ["automanaged"],
     deps = [
+        "//vendor:github.com/davecgh/go-spew/spew",
         "//vendor:k8s.io/apimachinery/pkg/types",
         "//vendor:k8s.io/apimachinery/pkg/util/sets",
     ],


@ -1,65 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package healthcheck
import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
)
// All public API Methods for this package
// UpdateEndpoints Update the set of local endpoints for a service
func UpdateEndpoints(serviceName types.NamespacedName, endpointUids sets.String) {
req := &proxyMutationRequest{
serviceName: serviceName,
endpointUids: &endpointUids,
}
healthchecker.mutationRequestChannel <- req
}
func updateServiceListener(serviceName types.NamespacedName, listenPort int, add bool) bool {
responseChannel := make(chan bool)
req := &proxyListenerRequest{
serviceName: serviceName,
listenPort: uint16(listenPort),
add: add,
responseChannel: responseChannel,
}
healthchecker.listenerRequestChannel <- req
return <-responseChannel
}
// AddServiceListener Request addition of a listener for a service's health check
func AddServiceListener(serviceName types.NamespacedName, listenPort int) bool {
return updateServiceListener(serviceName, listenPort, true)
}
// DeleteServiceListener Request deletion of a listener for a service's health check
func DeleteServiceListener(serviceName types.NamespacedName, listenPort int) bool {
return updateServiceListener(serviceName, listenPort, false)
}
// Run Start the healthchecker main loop
func Run() {
healthchecker = proxyHealthCheckFactory()
// Wrap with a wait.Forever to handle panics.
go wait.Forever(func() {
healthchecker.handlerLoop()
}, 0)
}


@@ -14,5 +14,5 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-// Package healthcheck LoadBalancer Healthcheck responder library for kubernetes network proxies
+// Package healthcheck provides tools for serving kube-proxy healthchecks.
 package healthcheck // import "k8s.io/kubernetes/pkg/proxy/healthcheck"


@ -20,108 +20,210 @@ import (
"fmt" "fmt"
"net" "net"
"net/http" "net/http"
"strings"
"sync"
"github.com/golang/glog" "github.com/golang/glog"
"github.com/renstrom/dedent"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/pkg/api"
"k8s.io/client-go/tools/cache" clientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/record"
) )
// proxyMutationRequest: Message to request addition/deletion of endpoints for a service // Server serves HTTP endpoints for each service name, with results
type proxyMutationRequest struct { // based on the endpoints. If there are 0 endpoints for a service, it returns a
serviceName types.NamespacedName // 503 "Service Unavailable" error (telling LBs not to use this node). If there
endpointUids *sets.String // are 1 or more endpoints, it returns a 200 "OK".
type Server interface {
// Make the new set of services be active. Services that were open before
// will be closed. Services that are new will be opened. Service that
// existed and are in the new set will be left alone. The value of the map
// is the healthcheck-port to listen on.
SyncServices(newServices map[types.NamespacedName]uint16) error
// Make the new set of endpoints be active. Endpoints for services that do
// not exist will be dropped. The value of the map is the number of
// endpoints the service has on this node.
SyncEndpoints(newEndpoints map[types.NamespacedName]int) error
} }
// proxyListenerRequest: Message to request addition/deletion of a service responder on a listening port // Listener allows for testing of Server. If the Listener argument
type proxyListenerRequest struct { // to NewServer() is nil, the real net.Listen function will be used.
serviceName types.NamespacedName type Listener interface {
listenPort uint16 // Listen is very much like net.Listen, except the first arg (network) is
add bool // fixed to be "tcp".
responseChannel chan bool Listen(addr string) (net.Listener, error)
} }
// serviceEndpointsList: A list of endpoints for a service // HTTPServerFactory allows for testing of Server. If the
type serviceEndpointsList struct { // HTTPServerFactory argument to NewServer() is nil, the real
serviceName types.NamespacedName // http.Server type will be used.
endpoints *sets.String type HTTPServerFactory interface {
// New creates an instance of a type satisfying HTTPServer. This is
// designed to include http.Server.
New(addr string, handler http.Handler) HTTPServer
} }
// serviceResponder: Contains net/http datastructures necessary for responding to each Service's health check on its aux nodePort // HTTPServer allows for testing of Server.
type serviceResponder struct { type HTTPServer interface {
serviceName types.NamespacedName // Server is designed so that http.Server satifies this interface,
listenPort uint16 Serve(listener net.Listener) error
listener *net.Listener
server *http.Server
} }
// proxyHC: Handler structure for health check, endpoint add/delete and service listener add/delete requests // NewServer allocates a new healthcheck server manager. If either
type proxyHC struct { // of the injected arguments are nil, defaults will be used.
serviceEndpointsMap cache.ThreadSafeStore func NewServer(hostname string, recorder record.EventRecorder, listener Listener, httpServerFactory HTTPServerFactory) Server {
serviceResponderMap map[types.NamespacedName]serviceResponder if listener == nil {
mutationRequestChannel chan *proxyMutationRequest listener = stdNetListener{}
listenerRequestChannel chan *proxyListenerRequest
}
// handleHealthCheckRequest - received a health check request - lookup and respond to HC.
func (h *proxyHC) handleHealthCheckRequest(rw http.ResponseWriter, serviceName string) {
s, ok := h.serviceEndpointsMap.Get(serviceName)
if !ok {
glog.V(4).Infof("Service %s not found or has no local endpoints", serviceName)
sendHealthCheckResponse(rw, http.StatusServiceUnavailable, "No Service Endpoints Found")
return
} }
numEndpoints := len(*s.(*serviceEndpointsList).endpoints) if httpServerFactory == nil {
if numEndpoints > 0 { httpServerFactory = stdHTTPServerFactory{}
sendHealthCheckResponse(rw, http.StatusOK, fmt.Sprintf("%d Service Endpoints found", numEndpoints)) }
return return &server{
hostname: hostname,
recorder: recorder,
listener: listener,
httpFactory: httpServerFactory,
services: map[types.NamespacedName]*hcInstance{},
} }
sendHealthCheckResponse(rw, http.StatusServiceUnavailable, "0 local Endpoints are alive")
} }
// handleMutationRequest - receive requests to mutate the table entry for a service // Implement Listener in terms of net.Listen.
func (h *proxyHC) handleMutationRequest(req *proxyMutationRequest) { type stdNetListener struct{}
numEndpoints := len(*req.endpointUids)
glog.V(4).Infof("LB service health check mutation request Service: %s - %d Endpoints %v", func (stdNetListener) Listen(addr string) (net.Listener, error) {
req.serviceName, numEndpoints, (*req.endpointUids).List()) return net.Listen("tcp", addr)
if numEndpoints == 0 { }
if _, ok := h.serviceEndpointsMap.Get(req.serviceName.String()); ok {
glog.V(4).Infof("Deleting endpoints map for service %s, all local endpoints gone", req.serviceName.String()) var _ Listener = stdNetListener{}
h.serviceEndpointsMap.Delete(req.serviceName.String())
} // Implement HTTPServerFactory in terms of http.Server.
return type stdHTTPServerFactory struct{}
func (stdHTTPServerFactory) New(addr string, handler http.Handler) HTTPServer {
return &http.Server{
Addr: addr,
Handler: handler,
} }
var entry *serviceEndpointsList }
e, exists := h.serviceEndpointsMap.Get(req.serviceName.String())
if exists { var _ HTTPServerFactory = stdHTTPServerFactory{}
entry = e.(*serviceEndpointsList)
if entry.endpoints.Equal(*req.endpointUids) { type server struct {
return hostname string
} recorder record.EventRecorder // can be nil
// Compute differences just for printing logs about additions and removals listener Listener
deletedEndpoints := entry.endpoints.Difference(*req.endpointUids) httpFactory HTTPServerFactory
newEndpoints := req.endpointUids.Difference(*entry.endpoints)
for _, e := range newEndpoints.List() { lock sync.Mutex
glog.V(4).Infof("Adding local endpoint %s to LB health check for service %s", services map[types.NamespacedName]*hcInstance
e, req.serviceName.String()) }
}
for _, d := range deletedEndpoints.List() { func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) error {
glog.V(4).Infof("Deleted endpoint %s from service %s LB health check (%d endpoints left)", hcs.lock.Lock()
d, req.serviceName.String(), len(*entry.endpoints)) defer hcs.lock.Unlock()
// Remove any that are not needed any more.
for nsn, svc := range hcs.services {
if port, found := newServices[nsn]; !found || port != svc.port {
glog.V(2).Infof("Closing healthcheck %q on port %d", nsn.String(), svc.port)
if err := svc.listener.Close(); err != nil {
glog.Errorf("Close(%v): %v", svc.listener.Addr(), err)
}
delete(hcs.services, nsn)
} }
} }
entry = &serviceEndpointsList{serviceName: req.serviceName, endpoints: req.endpointUids}
h.serviceEndpointsMap.Add(req.serviceName.String(), entry) // Add any that are needed.
for nsn, port := range newServices {
if hcs.services[nsn] != nil {
glog.V(3).Infof("Existing healthcheck %q on port %d", nsn.String(), port)
continue
}
glog.V(2).Infof("Opening healthcheck %q on port %d", nsn.String(), port)
svc := &hcInstance{port: port}
addr := fmt.Sprintf(":%d", port)
svc.server = hcs.httpFactory.New(addr, hcHandler{name: nsn, hcs: hcs})
var err error
svc.listener, err = hcs.listener.Listen(addr)
if err != nil {
msg := fmt.Sprintf("node %s failed to start healthcheck %q on port %d: %v", hcs.hostname, nsn.String(), port, err)
if hcs.recorder != nil {
hcs.recorder.Eventf(
&clientv1.ObjectReference{
Kind: "Service",
Namespace: nsn.Namespace,
Name: nsn.Name,
UID: types.UID(nsn.String()),
}, api.EventTypeWarning, "FailedToStartHealthcheck", msg)
}
glog.Error(msg)
continue
}
hcs.services[nsn] = svc
go func(nsn types.NamespacedName, svc *hcInstance) {
// Serve() will exit when the listener is closed.
glog.V(3).Infof("Starting goroutine for healthcheck %q on port %d", nsn.String(), svc.port)
if err := svc.server.Serve(svc.listener); err != nil {
glog.V(3).Infof("Healthcheck %q closed: %v", nsn.String(), err)
return
}
glog.V(3).Infof("Healthcheck %q closed", nsn.String())
}(nsn, svc)
}
return nil
} }
// proxyHealthCheckRequest - Factory method to instantiate the health check handler type hcInstance struct {
func proxyHealthCheckFactory() *proxyHC { port uint16
glog.V(2).Infof("Initializing kube-proxy health checker") listener net.Listener
phc := &proxyHC{ server HTTPServer
serviceEndpointsMap: cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{}), endpoints int // number of local endpoints for a service
serviceResponderMap: make(map[types.NamespacedName]serviceResponder), }
mutationRequestChannel: make(chan *proxyMutationRequest, 1024),
listenerRequestChannel: make(chan *proxyListenerRequest, 1024), type hcHandler struct {
} name types.NamespacedName
return phc hcs *server
}
var _ http.Handler = hcHandler{}
func (h hcHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
h.hcs.lock.Lock()
count := h.hcs.services[h.name].endpoints
h.hcs.lock.Unlock()
resp.Header().Set("Content-Type", "application/json")
if count == 0 {
resp.WriteHeader(http.StatusServiceUnavailable)
} else {
resp.WriteHeader(http.StatusOK)
}
fmt.Fprintf(resp, strings.Trim(dedent.Dedent(fmt.Sprintf(`
{
"service": {
"namespace": %q,
"name": %q
},
"localEndpoints": %d
}
`, h.name.Namespace, h.name.Name, count)), "\n"))
}
func (hcs *server) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error {
hcs.lock.Lock()
defer hcs.lock.Unlock()
for nsn, count := range newEndpoints {
if hcs.services[nsn] == nil {
glog.V(3).Infof("Not saving endpoints for unknown healthcheck %q", nsn.String())
continue
}
glog.V(3).Infof("Reporting %d endpoints for healthcheck %q", count, nsn.String())
hcs.services[nsn].endpoints = count
}
return nil
} }
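
On the wire, each per-service listener answers with a small JSON body and either 200 or 503. A hedged client-side sketch follows (the node IP and port are illustrative; the decode struct mirrors the hcPayload helper in the test file below):

package main

import (
    "encoding/json"
    "fmt"
    "net/http"
)

// Same shape the handler emits: {"service": {"namespace", "name"}, "localEndpoints": N}.
type hcPayload struct {
    Service struct {
        Namespace string
        Name      string
    }
    LocalEndpoints int
}

func main() {
    // 10.0.0.1:30101 stands in for a node IP and a healthcheck nodePort.
    resp, err := http.Get("http://10.0.0.1:30101/")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    // 200 means at least one local endpoint, 503 means none.
    var p hcPayload
    if err := json.NewDecoder(resp.Body).Decode(&p); err != nil {
        panic(err)
    }
    fmt.Println(resp.StatusCode, p.Service.Namespace, p.Service.Name, p.LocalEndpoints)
}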


@ -17,142 +17,310 @@ limitations under the License.
package healthcheck package healthcheck
import ( import (
"fmt" "encoding/json"
"io/ioutil" "net"
"math/rand"
"net/http" "net/http"
"net/http/httptest"
"testing" "testing"
"time"
"github.com/davecgh/go-spew/spew"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
) )
type TestCaseData struct { type fakeListener struct {
nodePorts int openPorts sets.String
numEndpoints int
nodePortList []int
svcNames []types.NamespacedName
} }
const ( func newFakeListener() *fakeListener {
startPort = 20000 return &fakeListener{
endPort = 40000 openPorts: sets.String{},
) }
}
var (
choices = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") func (fake *fakeListener) hasPort(addr string) bool {
) return fake.openPorts.Has(addr)
}
func generateRandomString(n int) string {
func (fake *fakeListener) Listen(addr string) (net.Listener, error) {
b := make([]byte, n) fake.openPorts.Insert(addr)
l := len(choices) return &fakeNetListener{
for i := range b { parent: fake,
b[i] = choices[rand.Intn(l)] addr: addr,
} }, nil
return string(b) }
}
type fakeNetListener struct {
func chooseServiceName(tc int, hint int) types.NamespacedName { parent *fakeListener
var svc types.NamespacedName addr string
svc.Namespace = fmt.Sprintf("ns_%d", tc) }
svc.Name = fmt.Sprintf("name_%d", hint)
return svc func (fake *fakeNetListener) Accept() (net.Conn, error) {
} // Not implemented
return nil, nil
func generateEndpointSet(max int) sets.String { }
s := sets.NewString()
for i := 0; i < max; i++ { func (fake *fakeNetListener) Close() error {
s.Insert(fmt.Sprintf("%d%s", i, generateRandomString(8))) fake.parent.openPorts.Delete(fake.addr)
} return nil
return s }
}
func (fake *fakeNetListener) Addr() net.Addr {
func verifyHealthChecks(tc *TestCaseData, t *testing.T) bool { // Not implemented
var success = true return nil
time.Sleep(100 * time.Millisecond) }
for i := 0; i < tc.nodePorts; i++ {
t.Logf("Validating HealthCheck works for svc %s nodePort %d\n", tc.svcNames[i], tc.nodePortList[i]) type fakeHTTPServerFactory struct{}
res, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/", tc.nodePortList[i]))
if err != nil { func newFakeHTTPServerFactory() *fakeHTTPServerFactory {
t.Logf("ERROR: Failed to connect to listening port") return &fakeHTTPServerFactory{}
success = false }
continue
} func (fake *fakeHTTPServerFactory) New(addr string, handler http.Handler) HTTPServer {
robots, err := ioutil.ReadAll(res.Body) return &fakeHTTPServer{
if res.StatusCode == http.StatusServiceUnavailable { addr: addr,
t.Logf("ERROR: HealthCheck returned %s: %s", res.Status, string(robots)) handler: handler,
success = false }
continue }
}
res.Body.Close() type fakeHTTPServer struct {
if err != nil { addr string
t.Logf("Error: reading body of response (%s)", err) handler http.Handler
success = false }
continue
} func (fake *fakeHTTPServer) Serve(listener net.Listener) error {
} return nil // Cause the goroutine to return
if success { }
t.Logf("Success: All nodePorts found active")
} func mknsn(ns, name string) types.NamespacedName {
return success return types.NamespacedName{
} Namespace: ns,
Name: name,
func TestHealthChecker(t *testing.T) { }
testcases := []TestCaseData{ }
{
nodePorts: 1, type hcPayload struct {
numEndpoints: 2, Service struct {
}, Namespace string
{ Name string
nodePorts: 10, }
numEndpoints: 6, LocalEndpoints int
}, }
{
nodePorts: 100, func TestServer(t *testing.T) {
numEndpoints: 1, listener := newFakeListener()
}, httpFactory := newFakeHTTPServerFactory()
}
hcsi := NewServer("hostname", nil, listener, httpFactory)
Run() hcs := hcsi.(*server)
if len(hcs.services) != 0 {
ports := startPort t.Errorf("expected 0 services, got %d", len(hcs.services))
for n, tc := range testcases { }
tc.nodePortList = make([]int, tc.nodePorts)
tc.svcNames = make([]types.NamespacedName, tc.nodePorts) // sync nothing
for i := 0; i < tc.nodePorts; i++ { hcs.SyncServices(nil)
tc.svcNames[i] = chooseServiceName(n, i) if len(hcs.services) != 0 {
t.Logf("Updating endpoints map for %s %d", tc.svcNames[i], tc.numEndpoints) t.Errorf("expected 0 services, got %d", len(hcs.services))
for { }
UpdateEndpoints(tc.svcNames[i], generateEndpointSet(tc.numEndpoints)) hcs.SyncEndpoints(nil)
tc.nodePortList[i] = ports if len(hcs.services) != 0 {
ports++ t.Errorf("expected 0 services, got %d", len(hcs.services))
if AddServiceListener(tc.svcNames[i], tc.nodePortList[i]) { }
break
} // sync unknown endpoints, should be dropped
DeleteServiceListener(tc.svcNames[i], tc.nodePortList[i]) hcs.SyncEndpoints(map[types.NamespacedName]int{mknsn("a", "b"): 93})
// Keep searching for a port that works if len(hcs.services) != 0 {
t.Logf("Failed to bind/listen on port %d...trying next port", ports-1) t.Errorf("expected 0 services, got %d", len(hcs.services))
if ports > endPort { }
t.Errorf("Exhausted range of ports available for tests")
return // sync a real service
} nsn := mknsn("a", "b")
} hcs.SyncServices(map[types.NamespacedName]uint16{nsn: 9376})
} if len(hcs.services) != 1 {
t.Logf("Validating if all nodePorts for tc %d work", n) t.Errorf("expected 1 service, got %d", len(hcs.services))
if !verifyHealthChecks(&tc, t) { }
t.Errorf("Healthcheck validation failed") if hcs.services[nsn].endpoints != 0 {
} t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn].endpoints)
}
for i := 0; i < tc.nodePorts; i++ { if len(listener.openPorts) != 1 {
DeleteServiceListener(tc.svcNames[i], tc.nodePortList[i]) t.Errorf("expected 1 open port, got %d\n%s", len(listener.openPorts), spew.Sdump(listener.openPorts))
UpdateEndpoints(tc.svcNames[i], sets.NewString()) }
} if !listener.hasPort(":9376") {
t.Errorf("expected port :9376 to be open\n%s", spew.Sdump(listener.openPorts))
// Ensure that all listeners have been shutdown }
if verifyHealthChecks(&tc, t) { // test the handler
t.Errorf("Healthcheck validation failed") testHandler(hcs, nsn, http.StatusServiceUnavailable, 0, t)
}
// sync an endpoint
hcs.SyncEndpoints(map[types.NamespacedName]int{nsn: 18})
if len(hcs.services) != 1 {
t.Errorf("expected 1 service, got %d", len(hcs.services))
}
if hcs.services[nsn].endpoints != 18 {
t.Errorf("expected 18 endpoints, got %d", hcs.services[nsn].endpoints)
}
// test the handler
testHandler(hcs, nsn, http.StatusOK, 18, t)
// sync zero endpoints
hcs.SyncEndpoints(map[types.NamespacedName]int{nsn: 0})
if len(hcs.services) != 1 {
t.Errorf("expected 1 service, got %d", len(hcs.services))
}
if hcs.services[nsn].endpoints != 0 {
t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn].endpoints)
}
// test the handler
testHandler(hcs, nsn, http.StatusServiceUnavailable, 0, t)
// sync nil endpoints
hcs.SyncEndpoints(nil)
if len(hcs.services) != 1 {
t.Errorf("expected 1 service, got %d", len(hcs.services))
}
if hcs.services[nsn].endpoints != 0 {
t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn].endpoints)
}
// test the handler
testHandler(hcs, nsn, http.StatusServiceUnavailable, 0, t)
// put the endpoint back
hcs.SyncEndpoints(map[types.NamespacedName]int{nsn: 18})
if len(hcs.services) != 1 {
t.Errorf("expected 1 service, got %d", len(hcs.services))
}
if hcs.services[nsn].endpoints != 18 {
t.Errorf("expected 18 endpoints, got %d", hcs.services[nsn].endpoints)
}
// delete the service
hcs.SyncServices(nil)
if len(hcs.services) != 0 {
t.Errorf("expected 0 services, got %d", len(hcs.services))
}
// sync multiple services
nsn1 := mknsn("a", "b")
nsn2 := mknsn("c", "d")
nsn3 := mknsn("e", "f")
nsn4 := mknsn("g", "h")
hcs.SyncServices(map[types.NamespacedName]uint16{
nsn1: 9376,
nsn2: 12909,
nsn3: 11113,
})
if len(hcs.services) != 3 {
t.Errorf("expected 3 service, got %d", len(hcs.services))
}
if hcs.services[nsn1].endpoints != 0 {
t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn1].endpoints)
}
if hcs.services[nsn2].endpoints != 0 {
t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn2].endpoints)
}
if hcs.services[nsn3].endpoints != 0 {
t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn3].endpoints)
}
if len(listener.openPorts) != 3 {
t.Errorf("expected 3 open ports, got %d\n%s", len(listener.openPorts), spew.Sdump(listener.openPorts))
}
// test the handlers
testHandler(hcs, nsn1, http.StatusServiceUnavailable, 0, t)
testHandler(hcs, nsn2, http.StatusServiceUnavailable, 0, t)
// sync endpoints
hcs.SyncEndpoints(map[types.NamespacedName]int{
nsn1: 9,
nsn2: 3,
nsn3: 7,
})
if len(hcs.services) != 3 {
t.Errorf("expected 3 services, got %d", len(hcs.services))
}
if hcs.services[nsn1].endpoints != 9 {
t.Errorf("expected 9 endpoints, got %d", hcs.services[nsn1].endpoints)
}
if hcs.services[nsn2].endpoints != 3 {
t.Errorf("expected 3 endpoints, got %d", hcs.services[nsn2].endpoints)
}
if hcs.services[nsn3].endpoints != 7 {
t.Errorf("expected 7 endpoints, got %d", hcs.services[nsn3].endpoints)
}
// test the handlers
testHandler(hcs, nsn1, http.StatusOK, 9, t)
testHandler(hcs, nsn2, http.StatusOK, 3, t)
testHandler(hcs, nsn3, http.StatusOK, 7, t)
// sync new services
hcs.SyncServices(map[types.NamespacedName]uint16{
//nsn1: 9376, // remove it
nsn2: 12909, // leave it
nsn3: 11114, // change it
nsn4: 11878, // add it
})
if len(hcs.services) != 3 {
t.Errorf("expected 3 service, got %d", len(hcs.services))
}
if hcs.services[nsn2].endpoints != 3 {
t.Errorf("expected 3 endpoints, got %d", hcs.services[nsn2].endpoints)
}
if hcs.services[nsn3].endpoints != 0 {
t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn3].endpoints)
}
if hcs.services[nsn4].endpoints != 0 {
t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn4].endpoints)
}
// test the handlers
testHandler(hcs, nsn2, http.StatusOK, 3, t)
testHandler(hcs, nsn3, http.StatusServiceUnavailable, 0, t)
testHandler(hcs, nsn4, http.StatusServiceUnavailable, 0, t)
// sync endpoints
hcs.SyncEndpoints(map[types.NamespacedName]int{
nsn1: 9,
nsn2: 3,
nsn3: 7,
nsn4: 6,
})
if len(hcs.services) != 3 {
t.Errorf("expected 3 services, got %d", len(hcs.services))
}
if hcs.services[nsn2].endpoints != 3 {
t.Errorf("expected 3 endpoints, got %d", hcs.services[nsn2].endpoints)
}
if hcs.services[nsn3].endpoints != 7 {
t.Errorf("expected 7 endpoints, got %d", hcs.services[nsn3].endpoints)
}
if hcs.services[nsn4].endpoints != 6 {
t.Errorf("expected 6 endpoints, got %d", hcs.services[nsn4].endpoints)
}
// test the handlers
testHandler(hcs, nsn2, http.StatusOK, 3, t)
testHandler(hcs, nsn3, http.StatusOK, 7, t)
testHandler(hcs, nsn4, http.StatusOK, 6, t)
}
func testHandler(hcs *server, nsn types.NamespacedName, status int, endpoints int, t *testing.T) {
handler := hcs.services[nsn].server.(*fakeHTTPServer).handler
req, err := http.NewRequest("GET", "/healthz", nil)
if err != nil {
t.Fatal(err)
}
resp := httptest.NewRecorder()
handler.ServeHTTP(resp, req)
if resp.Code != status {
t.Errorf("expected status code %v, got %v", status, resp.Code)
}
var payload hcPayload
if err := json.Unmarshal(resp.Body.Bytes(), &payload); err != nil {
t.Fatal(err)
}
if payload.Service.Name != nsn.Name || payload.Service.Namespace != nsn.Namespace {
t.Errorf("expected payload name %q, got %v", nsn.String(), payload.Service)
}
if payload.LocalEndpoints != endpoints {
t.Errorf("expected %d endpoints, got %d", endpoints, payload.LocalEndpoints)
} }
} }


@ -1,46 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package healthcheck
import (
"fmt"
"net/http"
"github.com/golang/glog"
)
// A healthCheckHandler serves http requests on /healthz on the service health check node port,
// and responds to every request with either:
// 200 OK and the count of endpoints for the given service that are local to this node.
// or
// 503 Service Unavailable If the count is zero or the service does not exist
type healthCheckHandler struct {
svcNsName string
}
// HTTP Utility function to send the required statusCode and error text to a http.ResponseWriter object
func sendHealthCheckResponse(rw http.ResponseWriter, statusCode int, error string) {
rw.Header().Set("Content-Type", "text/plain")
rw.WriteHeader(statusCode)
fmt.Fprint(rw, error)
}
// ServeHTTP: Interface callback method for net.Listener Handlers
func (h healthCheckHandler) ServeHTTP(response http.ResponseWriter, req *http.Request) {
glog.V(4).Infof("Received HC Request Service %s from Cloud Load Balancer", h.svcNsName)
healthchecker.handleHealthCheckRequest(response, h.svcNsName)
}


@ -1,77 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package healthcheck
// Create/Delete dynamic listeners on the required nodePorts
import (
"fmt"
"net"
"net/http"
"github.com/golang/glog"
)
// handleServiceListenerRequest: receive requests to add/remove service health check listening ports
func (h *proxyHC) handleServiceListenerRequest(req *proxyListenerRequest) bool {
sr, serviceFound := h.serviceResponderMap[req.serviceName]
if !req.add {
if !serviceFound {
return false
}
glog.Infof("Deleting HealthCheckListenPort for service %s port %d",
req.serviceName, req.listenPort)
delete(h.serviceResponderMap, req.serviceName)
(*sr.listener).Close()
return true
} else if serviceFound {
if req.listenPort == sr.listenPort {
// Addition requested but responder for service already exists and port is unchanged
return true
}
// Addition requested but responder for service already exists but the listen port has changed
glog.Infof("HealthCheckListenPort for service %s changed from %d to %d - closing old listening port",
req.serviceName, sr.listenPort, req.listenPort)
delete(h.serviceResponderMap, req.serviceName)
(*sr.listener).Close()
}
// Create a service responder object and start listening and serving on the provided port
glog.V(2).Infof("Adding health check listener for service %s on nodePort %d", req.serviceName, req.listenPort)
server := http.Server{
Addr: fmt.Sprintf(":%d", req.listenPort),
Handler: healthCheckHandler{svcNsName: req.serviceName.String()},
}
listener, err := net.Listen("tcp", server.Addr)
if err != nil {
glog.Warningf("FAILED to listen on address %s (%s)\n", server.Addr, err)
return false
}
h.serviceResponderMap[req.serviceName] = serviceResponder{serviceName: req.serviceName,
listenPort: req.listenPort,
listener: &listener,
server: &server}
go func() {
// Anonymous goroutine to block on Serve for this listen port - Serve will exit when the listener is closed
glog.V(3).Infof("Goroutine blocking on serving health checks for %s on port %d", req.serviceName, req.listenPort)
if err := server.Serve(listener); err != nil {
glog.V(3).Infof("Proxy HealthCheck listen socket %d for service %s closed with error %s\n", req.listenPort, req.serviceName, err)
return
}
glog.V(3).Infof("Proxy HealthCheck listen socket %d for service %s closed\n", req.listenPort, req.serviceName)
}()
return true
}


@ -1,53 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package healthcheck LoadBalancer Healthcheck responder library for kubernetes network proxies
package healthcheck // import "k8s.io/kubernetes/pkg/proxy/healthcheck"
import (
"time"
"github.com/golang/glog"
)
var healthchecker *proxyHC
// handlerLoop Serializes all requests to prevent concurrent access to the maps
func (h *proxyHC) handlerLoop() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case req := <-h.mutationRequestChannel:
h.handleMutationRequest(req)
case req := <-h.listenerRequestChannel:
req.responseChannel <- h.handleServiceListenerRequest(req)
case <-ticker.C:
go h.sync()
}
}
}
func (h *proxyHC) sync() {
glog.V(4).Infof("%d Health Check Listeners", len(h.serviceResponderMap))
glog.V(4).Infof("%d Services registered for health checking", len(h.serviceEndpointsMap.List()))
for _, svc := range h.serviceEndpointsMap.ListKeys() {
if e, ok := h.serviceEndpointsMap.Get(svc); ok {
endpointList := e.(*serviceEndpointsList)
glog.V(4).Infof("Service %s has %d local endpoints", svc, endpointList.endpoints.Len())
}
}
}


@@ -50,7 +50,6 @@ go_test(
         "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
         "//vendor:k8s.io/apimachinery/pkg/types",
         "//vendor:k8s.io/apimachinery/pkg/util/intstr",
-        "//vendor:k8s.io/apimachinery/pkg/util/sets",
     ],
 )


@@ -219,7 +219,7 @@ type Proxier struct {
     nodeIP        net.IP
     portMapper    portOpener
     recorder      record.EventRecorder
-    healthChecker healthChecker
+    healthChecker healthcheck.Server
 }
 
 type localPort struct {
@@ -251,17 +251,6 @@ func (l *listenPortOpener) OpenLocalPort(lp *localPort) (closeable, error) {
     return openLocalPort(lp)
 }
 
-type healthChecker interface {
-    UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String)
-}
-
-// TODO: the healthcheck pkg should offer a type
-type globalHealthChecker struct{}
-
-func (globalHealthChecker) UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String) {
-    healthcheck.UpdateEndpoints(serviceName, endpointUIDs)
-}
-
 // Proxier implements ProxyProvider
 var _ proxy.ProxyProvider = &Proxier{}
@@ -315,8 +304,7 @@ func NewProxier(ipt utiliptables.Interface,
         glog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic")
     }
 
-    healthChecker := globalHealthChecker{}
-    go healthcheck.Run()
+    healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
 
     var throttle flowcontrol.RateLimiter
     // Defaulting back to not limit sync rate when minSyncPeriod is 0.
@@ -450,18 +438,12 @@ func (proxier *Proxier) SyncLoop() {
     }
 }
 
-type healthCheckPort struct {
-    namespace types.NamespacedName
-    nodeport  int
-}
-
 // Accepts a list of Services and the existing service map. Returns the new
-// service map, a list of healthcheck ports to add to or remove from the health
-// checking listener service, and a set of stale UDP services.
-func buildNewServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMap) (proxyServiceMap, []healthCheckPort, []healthCheckPort, sets.String) {
+// service map, a map of healthcheck ports, and a set of stale UDP
+// services.
+func buildNewServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMap) (proxyServiceMap, map[types.NamespacedName]uint16, sets.String) {
     newServiceMap := make(proxyServiceMap)
-    healthCheckAdd := make([]healthCheckPort, 0)
-    healthCheckDel := make([]healthCheckPort, 0)
+    hcPorts := make(map[types.NamespacedName]uint16)
 
     for _, service := range allServices {
         svcName := types.NamespacedName{
@@ -497,12 +479,8 @@ func buildNewServiceMap(allServices []*api.Service, oldServiceMa
             glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
         }
 
-        if !exists || !equal {
-            if info.onlyNodeLocalEndpoints && info.healthCheckNodePort > 0 {
-                healthCheckAdd = append(healthCheckAdd, healthCheckPort{serviceName.NamespacedName, info.healthCheckNodePort})
-            } else {
-                healthCheckDel = append(healthCheckDel, healthCheckPort{serviceName.NamespacedName, 0})
-            }
+        if info.onlyNodeLocalEndpoints {
+            hcPorts[svcName] = uint16(info.healthCheckNodePort)
         }
 
         newServiceMap[serviceName] = info
@@ -510,6 +488,13 @@ func buildNewServiceMap(allServices []*api.Service, oldServiceMa
         }
     }
 
+    for nsn, port := range hcPorts {
+        if port == 0 {
+            glog.Errorf("Service %q has no healthcheck nodeport", nsn)
+            delete(hcPorts, nsn)
+        }
+    }
+
     staleUDPServices := sets.NewString()
     // Remove serviceports missing from the update.
     for name, info := range oldServiceMap {
@@ -518,13 +503,10 @@ func buildNewServiceMap(allServices []*api.Service, oldServiceMa
             if info.protocol == api.ProtocolUDP {
                 staleUDPServices.Insert(info.clusterIP.String())
             }
-            if info.onlyNodeLocalEndpoints && info.healthCheckNodePort > 0 {
-                healthCheckDel = append(healthCheckDel, healthCheckPort{name.NamespacedName, info.healthCheckNodePort})
-            }
         }
     }
 
-    return newServiceMap, healthCheckAdd, healthCheckDel, staleUDPServices
+    return newServiceMap, hcPorts, staleUDPServices
 }
 
 // OnServiceUpdate tracks the active set of service proxies.
@@ -537,19 +519,11 @@ func (proxier *Proxier) OnServiceUpdate(allServices []*api.Service) {
     }
     proxier.allServices = allServices
 
-    newServiceMap, hcAdd, hcDel, staleUDPServices := buildNewServiceMap(allServices, proxier.serviceMap)
-    for _, hc := range hcAdd {
-        glog.V(4).Infof("Adding health check for %+v, port %v", hc.namespace, hc.nodeport)
-        // Turn on healthcheck responder to listen on the health check nodePort
-        // FIXME: handle failures from adding the service
-        healthcheck.AddServiceListener(hc.namespace, hc.nodeport)
-    }
-    for _, hc := range hcDel {
-        // Remove ServiceListener health check nodePorts from the health checker
-        // TODO - Stats
-        glog.V(4).Infof("Deleting health check for %+v, port %v", hc.namespace, hc.nodeport)
-        // FIXME: handle failures from deleting the service
-        healthcheck.DeleteServiceListener(hc.namespace, hc.nodeport)
+    newServiceMap, hcPorts, staleUDPServices := buildNewServiceMap(allServices, proxier.serviceMap)
+
+    // update healthcheck ports
+    if err := proxier.healthChecker.SyncServices(hcPorts); err != nil {
+        glog.Errorf("Error syncing healtcheck ports: %v", err)
     }
 
     if len(newServiceMap) != len(proxier.serviceMap) || !reflect.DeepEqual(newServiceMap, proxier.serviceMap) {
@@ -585,7 +559,7 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []*api.Endpoints) {
 
 // Convert a slice of api.Endpoints objects into a map of service-port -> endpoints.
 func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap, hostname string,
-    healthChecker healthChecker) (newMap proxyEndpointMap, staleSet map[endpointServicePair]bool) {
+    healthChecker healthcheck.Server) (newMap proxyEndpointMap, staleSet map[endpointServicePair]bool) {
 
     // return values
     newMap = make(proxyEndpointMap)
@@ -618,41 +592,28 @@ func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap
         return
     }
 
-    // Update service health check. We include entries from the current map,
-    // with zero-length value, to trigger the healthchecker to stop reporting
-    // health for that service.
-    //
-    // This whole mechanism may be over-designed. It builds a list of endpoints
-    // per service, filters for local endpoints, builds a string that is the
-    // same as the name, and then passes each (name, list) pair over a channel.
-    //
-    // I am pretty sure that there's no way there can be more than one entry in
-    // the final list, and passing an empty list as a delete signal is weird.
-    // It could probably be simplified to a synchronous function call of a set
-    // of NamespacedNames. I am not making that simplification at this time.
-    //
-    // ServicePortName includes the port name, which doesn't matter for
-    // healthchecks. It's possible that a single update both added and removed
-    // ports on the same IP, so we need to make sure that removals are counted,
-    // with additions overriding them. Track all endpoints so we can find local
-    // ones.
-    epsBySvcName := map[types.NamespacedName][]*endpointsInfo{}
-    for svcPort := range curMap {
-        epsBySvcName[svcPort.NamespacedName] = nil
-    }
+    // accumulate local IPs per service, ignoring ports
+    localIPs := map[types.NamespacedName]sets.String{}
     for svcPort := range newMap {
-        epsBySvcName[svcPort.NamespacedName] = append(epsBySvcName[svcPort.NamespacedName], newMap[svcPort]...)
-    }
-    for nsn, eps := range epsBySvcName {
-        // Use a set instead of a slice to provide deduplication
-        epSet := sets.NewString()
-        for _, ep := range eps {
+        for _, ep := range newMap[svcPort] {
             if ep.isLocal {
-                // kube-proxy health check only needs local endpoints
-                epSet.Insert(fmt.Sprintf("%s/%s", nsn.Namespace, nsn.Name))
+                nsn := svcPort.NamespacedName
+                if localIPs[nsn] == nil {
+                    localIPs[nsn] = sets.NewString()
+                }
+                ip := strings.Split(ep.endpoint, ":")[0] // just the IP part
+                localIPs[nsn].Insert(ip)
             }
         }
-        healthChecker.UpdateEndpoints(nsn, epSet)
+    }
+    // produce a count per service
+    localEndpointCounts := map[types.NamespacedName]int{}
+    for nsn, ips := range localIPs {
+        localEndpointCounts[nsn] = len(ips)
+    }
+
+    // update healthcheck endpoints
+    if err := healthChecker.SyncEndpoints(localEndpointCounts); err != nil {
+        glog.Errorf("Error syncing healthcheck endoints: %v", err)
     }
 
     return newMap, staleSet

@ -30,7 +30,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/service" "k8s.io/kubernetes/pkg/api/service"
"k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy"
@ -357,21 +356,25 @@ func (f *fakePortOpener) OpenLocalPort(lp *localPort) (closeable, error) {
} }
type fakeHealthChecker struct { type fakeHealthChecker struct {
endpoints map[types.NamespacedName]sets.String services map[types.NamespacedName]uint16
endpoints map[types.NamespacedName]int
} }
func newFakeHealthChecker() *fakeHealthChecker { func newFakeHealthChecker() *fakeHealthChecker {
return &fakeHealthChecker{ return &fakeHealthChecker{
endpoints: map[types.NamespacedName]sets.String{}, services: map[types.NamespacedName]uint16{},
endpoints: map[types.NamespacedName]int{},
} }
} }
func (fake *fakeHealthChecker) UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String) { func (fake *fakeHealthChecker) SyncServices(newServices map[types.NamespacedName]uint16) error {
if len(endpointUIDs) == 0 { fake.services = newServices
delete(fake.endpoints, serviceName) return nil
} else { }
fake.endpoints[serviceName] = endpointUIDs
} func (fake *fakeHealthChecker) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error {
fake.endpoints = newEndpoints
return nil
} }
const testHostname = "test-hostname" const testHostname = "test-hostname"
@ -941,30 +944,18 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
}), }),
} }
serviceMap, hcAdd, hcDel, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap)) serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap))
if len(serviceMap) != 8 { if len(serviceMap) != 8 {
t.Errorf("expected service map length 8, got %v", serviceMap) t.Errorf("expected service map length 8, got %v", serviceMap)
} }
// The only-local-loadbalancer ones get added // The only-local-loadbalancer ones get added
if len(hcAdd) != 2 { if len(hcPorts) != 1 {
t.Errorf("expected healthcheck add length 2, got %v", hcAdd) t.Errorf("expected 1 healthcheck port, got %v", hcPorts)
} else { } else {
for _, hc := range hcAdd { nsn := makeNSN("somewhere", "only-local-load-balancer")
if hc.namespace.Namespace != "somewhere" || hc.namespace.Name != "only-local-load-balancer" { if port, found := hcPorts[nsn]; !found || port != 345 {
t.Errorf("unexpected healthcheck listener added: %v", hc) t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, hcPorts)
}
}
}
// All the rest get deleted
if len(hcDel) != 6 {
t.Errorf("expected healthcheck del length 6, got %v", hcDel)
} else {
for _, hc := range hcDel {
if hc.namespace.Namespace == "somewhere" && hc.namespace.Name == "only-local-load-balancer" {
t.Errorf("unexpected healthcheck listener deleted: %v", hc)
}
} }
} }
@ -976,27 +967,13 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
// Remove some stuff // Remove some stuff
services = []*api.Service{services[0]} services = []*api.Service{services[0]}
services[0].Spec.Ports = []api.ServicePort{services[0].Spec.Ports[1]} services[0].Spec.Ports = []api.ServicePort{services[0].Spec.Ports[1]}
serviceMap, hcAdd, hcDel, staleUDPServices = buildNewServiceMap(services, serviceMap) serviceMap, hcPorts, staleUDPServices = buildNewServiceMap(services, serviceMap)
if len(serviceMap) != 1 { if len(serviceMap) != 1 {
t.Errorf("expected service map length 1, got %v", serviceMap) t.Errorf("expected service map length 1, got %v", serviceMap)
} }
if len(hcAdd) != 0 { if len(hcPorts) != 0 {
t.Errorf("expected healthcheck add length 1, got %v", hcAdd) t.Errorf("expected healthcheck ports length 1, got %v", hcPorts)
}
// The only OnlyLocal annotation was removed above, so we expect a delete now.
// FIXME: Since the BetaAnnotationHealthCheckNodePort is the same for all
// ServicePorts, we'll get one delete per ServicePort, even though they all
// contain the same information
if len(hcDel) != 2 {
t.Errorf("expected healthcheck del length 2, got %v", hcDel)
} else {
for _, hc := range hcDel {
if hc.namespace.Namespace != "somewhere" || hc.namespace.Name != "only-local-load-balancer" {
t.Errorf("unexpected healthcheck listener deleted: %v", hc)
}
}
} }
// All services but one were deleted. While you'd expect only the ClusterIPs // All services but one were deleted. While you'd expect only the ClusterIPs
@ -1023,17 +1000,14 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
} }
// Headless service should be ignored // Headless service should be ignored
serviceMap, hcAdd, hcDel, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap)) serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap))
if len(serviceMap) != 0 { if len(serviceMap) != 0 {
t.Errorf("expected service map length 0, got %d", len(serviceMap)) t.Errorf("expected service map length 0, got %d", len(serviceMap))
} }
// No proxied services, so no healthchecks // No proxied services, so no healthchecks
if len(hcAdd) != 0 { if len(hcPorts) != 0 {
t.Errorf("expected healthcheck add length 0, got %d", len(hcAdd)) t.Errorf("expected healthcheck ports length 0, got %d", len(hcPorts))
}
if len(hcDel) != 0 {
t.Errorf("expected healthcheck del length 0, got %d", len(hcDel))
} }
if len(staleUDPServices) != 0 { if len(staleUDPServices) != 0 {
@ -1051,16 +1025,13 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
}), }),
} }
serviceMap, hcAdd, hcDel, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap)) serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap))
if len(serviceMap) != 0 { if len(serviceMap) != 0 {
t.Errorf("expected service map length 0, got %v", serviceMap) t.Errorf("expected service map length 0, got %v", serviceMap)
} }
// No proxied services, so no healthchecks // No proxied services, so no healthchecks
if len(hcAdd) != 0 { if len(hcPorts) != 0 {
t.Errorf("expected healthcheck add length 0, got %v", hcAdd) t.Errorf("expected healthcheck ports length 0, got %v", hcPorts)
}
if len(hcDel) != 0 {
t.Errorf("expected healthcheck del length 0, got %v", hcDel)
} }
if len(staleUDPServices) != 0 { if len(staleUDPServices) != 0 {
t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices) t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices)
@ -1096,15 +1067,12 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
}), }),
} }
serviceMap, hcAdd, hcDel, staleUDPServices := buildNewServiceMap(first, make(proxyServiceMap)) serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(first, make(proxyServiceMap))
if len(serviceMap) != 2 { if len(serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", serviceMap) t.Errorf("expected service map length 2, got %v", serviceMap)
} }
if len(hcAdd) != 0 { if len(hcPorts) != 0 {
t.Errorf("expected healthcheck add length 0, got %v", hcAdd) t.Errorf("expected healthcheck ports length 0, got %v", hcPorts)
}
if len(hcDel) != 2 {
t.Errorf("expected healthcheck del length 2, got %v", hcDel)
} }
if len(staleUDPServices) != 0 { if len(staleUDPServices) != 0 {
// Services only added, so nothing stale yet // Services only added, so nothing stale yet
@ -1112,15 +1080,12 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
} }
// Change service to load-balancer // Change service to load-balancer
serviceMap, hcAdd, hcDel, staleUDPServices = buildNewServiceMap(second, serviceMap) serviceMap, hcPorts, staleUDPServices = buildNewServiceMap(second, serviceMap)
if len(serviceMap) != 2 { if len(serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", serviceMap) t.Errorf("expected service map length 2, got %v", serviceMap)
} }
if len(hcAdd) != 2 { if len(hcPorts) != 1 {
t.Errorf("expected healthcheck add length 2, got %v", hcAdd) t.Errorf("expected healthcheck ports length 1, got %v", hcPorts)
}
if len(hcDel) != 0 {
t.Errorf("expected healthcheck add length 2, got %v", hcDel)
} }
if len(staleUDPServices) != 0 { if len(staleUDPServices) != 0 {
t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices.List()) t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices.List())
@ -1128,30 +1093,24 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
// No change; make sure the service map stays the same and there are // No change; make sure the service map stays the same and there are
// no health-check changes // no health-check changes
serviceMap, hcAdd, hcDel, staleUDPServices = buildNewServiceMap(second, serviceMap) serviceMap, hcPorts, staleUDPServices = buildNewServiceMap(second, serviceMap)
if len(serviceMap) != 2 { if len(serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", serviceMap) t.Errorf("expected service map length 2, got %v", serviceMap)
} }
if len(hcAdd) != 0 { if len(hcPorts) != 1 {
t.Errorf("expected healthcheck add length 0, got %v", hcAdd) t.Errorf("expected healthcheck ports length 1, got %v", hcPorts)
}
if len(hcDel) != 0 {
t.Errorf("expected healthcheck add length 2, got %v", hcDel)
} }
if len(staleUDPServices) != 0 { if len(staleUDPServices) != 0 {
t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices.List()) t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices.List())
} }
// And back to ClusterIP // And back to ClusterIP
serviceMap, hcAdd, hcDel, staleUDPServices = buildNewServiceMap(first, serviceMap) serviceMap, hcPorts, staleUDPServices = buildNewServiceMap(first, serviceMap)
if len(serviceMap) != 2 { if len(serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", serviceMap) t.Errorf("expected service map length 2, got %v", serviceMap)
} }
if len(hcAdd) != 0 { if len(hcPorts) != 0 {
t.Errorf("expected healthcheck add length 0, got %v", hcAdd) t.Errorf("expected healthcheck ports length 0, got %v", hcPorts)
}
if len(hcDel) != 2 {
t.Errorf("expected healthcheck del length 2, got %v", hcDel)
} }
if len(staleUDPServices) != 0 { if len(staleUDPServices) != 0 {
// Services only added, so nothing stale yet // Services only added, so nothing stale yet
@ -1401,13 +1360,14 @@ func makeTestEndpoints(namespace, name string, eptFunc func(*api.Endpoints)) *ap
return ept return ept
} }
func makeNSN(namespace, name string) types.NamespacedName {
return types.NamespacedName{Namespace: namespace, Name: name}
}
func makeServicePortName(ns, name, port string) proxy.ServicePortName { func makeServicePortName(ns, name, port string) proxy.ServicePortName {
return proxy.ServicePortName{ return proxy.ServicePortName{
NamespacedName: types.NamespacedName{ NamespacedName: makeNSN(ns, name),
Namespace: ns, Port: port,
Name: name,
},
Port: port,
} }
} }
@ -1419,14 +1379,14 @@ func Test_buildNewEndpointsMap(t *testing.T) {
oldEndpoints map[proxy.ServicePortName][]*endpointsInfo oldEndpoints map[proxy.ServicePortName][]*endpointsInfo
expectedResult map[proxy.ServicePortName][]*endpointsInfo expectedResult map[proxy.ServicePortName][]*endpointsInfo
expectedStale []endpointServicePair expectedStale []endpointServicePair
expectedHealthchecks map[types.NamespacedName]sets.String expectedHealthchecks map[types.NamespacedName]int
}{{ }{{
// Case[0]: nothing // Case[0]: nothing
newEndpoints: []*api.Endpoints{}, newEndpoints: []*api.Endpoints{},
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
expectedStale: []endpointServicePair{}, expectedStale: []endpointServicePair{},
expectedHealthchecks: map[types.NamespacedName]sets.String{}, expectedHealthchecks: map[types.NamespacedName]int{},
}, { }, {
// Case[1]: no change, unnamed port // Case[1]: no change, unnamed port
newEndpoints: []*api.Endpoints{ newEndpoints: []*api.Endpoints{
@ -1452,7 +1412,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}, },
}, },
expectedStale: []endpointServicePair{}, expectedStale: []endpointServicePair{},
expectedHealthchecks: map[types.NamespacedName]sets.String{}, expectedHealthchecks: map[types.NamespacedName]int{},
}, { }, {
// Case[2]: no change, named port, local // Case[2]: no change, named port, local
newEndpoints: []*api.Endpoints{ newEndpoints: []*api.Endpoints{
@ -1480,8 +1440,8 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}, },
}, },
expectedStale: []endpointServicePair{}, expectedStale: []endpointServicePair{},
expectedHealthchecks: map[types.NamespacedName]sets.String{ expectedHealthchecks: map[types.NamespacedName]int{
types.NamespacedName{Namespace: "ns1", Name: "ep1"}: sets.NewString("ns1/ep1"), makeNSN("ns1", "ep1"): 1,
}, },
}, { }, {
// Case[3]: no change, multiple subsets // Case[3]: no change, multiple subsets
@ -1523,7 +1483,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}, },
}, },
expectedStale: []endpointServicePair{}, expectedStale: []endpointServicePair{},
expectedHealthchecks: map[types.NamespacedName]sets.String{}, expectedHealthchecks: map[types.NamespacedName]int{},
}, { }, {
// Case[4]: no change, multiple subsets, multiple ports, local // Case[4]: no change, multiple subsets, multiple ports, local
newEndpoints: []*api.Endpoints{ newEndpoints: []*api.Endpoints{
@ -1574,8 +1534,8 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}, },
}, },
expectedStale: []endpointServicePair{}, expectedStale: []endpointServicePair{},
expectedHealthchecks: map[types.NamespacedName]sets.String{ expectedHealthchecks: map[types.NamespacedName]int{
types.NamespacedName{Namespace: "ns1", Name: "ep1"}: sets.NewString("ns1/ep1"), makeNSN("ns1", "ep1"): 1,
}, },
}, { }, {
// Case[5]: no change, multiple endpoints, subsets, IPs, and ports // Case[5]: no change, multiple endpoints, subsets, IPs, and ports
@ -1682,9 +1642,9 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}, },
}, },
expectedStale: []endpointServicePair{}, expectedStale: []endpointServicePair{},
expectedHealthchecks: map[types.NamespacedName]sets.String{ expectedHealthchecks: map[types.NamespacedName]int{
types.NamespacedName{Namespace: "ns1", Name: "ep1"}: sets.NewString("ns1/ep1"), makeNSN("ns1", "ep1"): 2,
types.NamespacedName{Namespace: "ns2", Name: "ep2"}: sets.NewString("ns2/ep2"), makeNSN("ns2", "ep2"): 1,
}, },
}, { }, {
// Case[6]: add an Endpoints // Case[6]: add an Endpoints
@ -1708,8 +1668,8 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}, },
}, },
expectedStale: []endpointServicePair{}, expectedStale: []endpointServicePair{},
expectedHealthchecks: map[types.NamespacedName]sets.String{ expectedHealthchecks: map[types.NamespacedName]int{
types.NamespacedName{Namespace: "ns1", Name: "ep1"}: sets.NewString("ns1/ep1"), makeNSN("ns1", "ep1"): 1,
}, },
}, { }, {
// Case[7]: remove an Endpoints // Case[7]: remove an Endpoints
@ -1724,7 +1684,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
endpoint: "1.1.1.1:11", endpoint: "1.1.1.1:11",
servicePortName: makeServicePortName("ns1", "ep1", ""), servicePortName: makeServicePortName("ns1", "ep1", ""),
}}, }},
expectedHealthchecks: map[types.NamespacedName]sets.String{}, expectedHealthchecks: map[types.NamespacedName]int{},
}, { }, {
// Case[8]: add an IP and port // Case[8]: add an IP and port
newEndpoints: []*api.Endpoints{ newEndpoints: []*api.Endpoints{
@ -1762,8 +1722,8 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}, },
}, },
expectedStale: []endpointServicePair{}, expectedStale: []endpointServicePair{},
expectedHealthchecks: map[types.NamespacedName]sets.String{ expectedHealthchecks: map[types.NamespacedName]int{
types.NamespacedName{Namespace: "ns1", Name: "ep1"}: sets.NewString("ns1/ep1"), makeNSN("ns1", "ep1"): 1,
}, },
}, { }, {
// Case[9]: remove an IP and port // Case[9]: remove an IP and port
@ -1805,7 +1765,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
endpoint: "1.1.1.2:12", endpoint: "1.1.1.2:12",
servicePortName: makeServicePortName("ns1", "ep1", "p12"), servicePortName: makeServicePortName("ns1", "ep1", "p12"),
}}, }},
expectedHealthchecks: map[types.NamespacedName]sets.String{}, expectedHealthchecks: map[types.NamespacedName]int{},
}, { }, {
// Case[10]: add a subset // Case[10]: add a subset
newEndpoints: []*api.Endpoints{ newEndpoints: []*api.Endpoints{
@ -1844,8 +1804,8 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}, },
}, },
expectedStale: []endpointServicePair{}, expectedStale: []endpointServicePair{},
expectedHealthchecks: map[types.NamespacedName]sets.String{ expectedHealthchecks: map[types.NamespacedName]int{
types.NamespacedName{Namespace: "ns1", Name: "ep1"}: sets.NewString("ns1/ep1"), makeNSN("ns1", "ep1"): 1,
}, },
}, { }, {
// Case[11]: remove a subset // Case[11]: remove a subset
@ -1879,7 +1839,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
endpoint: "2.2.2.2:22", endpoint: "2.2.2.2:22",
servicePortName: makeServicePortName("ns1", "ep1", "p22"), servicePortName: makeServicePortName("ns1", "ep1", "p22"),
}}, }},
expectedHealthchecks: map[types.NamespacedName]sets.String{}, expectedHealthchecks: map[types.NamespacedName]int{},
}, { }, {
// Case[12]: rename a port // Case[12]: rename a port
newEndpoints: []*api.Endpoints{ newEndpoints: []*api.Endpoints{
@ -1909,7 +1869,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
endpoint: "1.1.1.1:11", endpoint: "1.1.1.1:11",
servicePortName: makeServicePortName("ns1", "ep1", "p11"), servicePortName: makeServicePortName("ns1", "ep1", "p11"),
}}, }},
expectedHealthchecks: map[types.NamespacedName]sets.String{}, expectedHealthchecks: map[types.NamespacedName]int{},
}, { }, {
// Case[13]: renumber a port // Case[13]: renumber a port
newEndpoints: []*api.Endpoints{ newEndpoints: []*api.Endpoints{
@ -1939,7 +1899,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
endpoint: "1.1.1.1:11", endpoint: "1.1.1.1:11",
servicePortName: makeServicePortName("ns1", "ep1", "p11"), servicePortName: makeServicePortName("ns1", "ep1", "p11"),
}}, }},
expectedHealthchecks: map[types.NamespacedName]sets.String{}, expectedHealthchecks: map[types.NamespacedName]int{},
}, { }, {
// Case[14]: complex add and remove // Case[14]: complex add and remove
newEndpoints: []*api.Endpoints{ newEndpoints: []*api.Endpoints{
@ -2044,8 +2004,8 @@ func Test_buildNewEndpointsMap(t *testing.T) {
endpoint: "4.4.4.6:45", endpoint: "4.4.4.6:45",
servicePortName: makeServicePortName("ns4", "ep4", "p45"), servicePortName: makeServicePortName("ns4", "ep4", "p45"),
}}, }},
expectedHealthchecks: map[types.NamespacedName]sets.String{ expectedHealthchecks: map[types.NamespacedName]int{
types.NamespacedName{Namespace: "ns4", Name: "ep4"}: sets.NewString("ns4/ep4"), makeNSN("ns4", "ep4"): 1,
}, },
}} }}