proxy/iptables: split out service map creation and add testcases

This commit is contained in:
Dan Williams 2016-12-19 17:08:24 -06:00
parent 12e8271cd3
commit eae2b8e9ba
3 changed files with 237 additions and 33 deletions

View File

@ -40,8 +40,10 @@ go_test(
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/service:go_default_library",
"//pkg/proxy:go_default_library",
"//pkg/util/exec:go_default_library",
"//pkg/util/intstr:go_default_library",
"//pkg/util/iptables:go_default_library",
"//pkg/util/iptables/testing:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/types",

View File

@ -160,11 +160,13 @@ func newServiceInfo(service proxy.ServicePortName) *serviceInfo {
}
}
type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
// Proxier is an iptables based proxy for connections between a localhost:lport
// and services that provide the actual backends.
type Proxier struct {
mu sync.Mutex // protects the following fields
serviceMap map[proxy.ServicePortName]*serviceInfo
serviceMap proxyServiceMap
endpointsMap map[proxy.ServicePortName][]*endpointsInfo
portsMap map[localPort]closeable
haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event
@ -278,7 +280,7 @@ func NewProxier(ipt utiliptables.Interface,
}
return &Proxier{
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
serviceMap: make(proxyServiceMap),
endpointsMap: make(map[proxy.ServicePortName][]*endpointsInfo),
portsMap: make(map[localPort]closeable),
syncPeriod: syncPeriod,
@ -382,7 +384,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
return encounteredError
}
func (proxier *Proxier) sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool {
func sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool {
if info.protocol != port.Protocol || info.port != int(port.Port) || info.nodePort != int(port.NodePort) {
return false
}
@ -438,16 +440,22 @@ func (proxier *Proxier) SyncLoop() {
}
}
// OnServiceUpdate tracks the active set of service proxies.
// They will be synchronized using syncProxyRules()
func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
start := time.Now()
defer func() {
glog.V(4).Infof("OnServiceUpdate took %v for %d services", time.Since(start), len(allServices))
}()
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.haveReceivedServiceUpdate = true
type healthCheckPort struct {
namespace types.NamespacedName
nodeport int
}
// Accepts a list of Services and the existing service map. Returns the new
// service map, a list of healthcheck ports to add to or remove from the health
// checking listener service, and a set of stale UDP services.
func buildServiceMap(allServices []api.Service, oldServiceMap proxyServiceMap) (proxyServiceMap, []healthCheckPort, []healthCheckPort, sets.String) {
healthCheckAdd := make([]healthCheckPort, 0)
healthCheckDel := make([]healthCheckPort, 0)
newServiceMap := make(proxyServiceMap)
for key, value := range oldServiceMap {
newServiceMap[key] = value
}
activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set
@ -472,15 +480,15 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
Port: servicePort.Name,
}
activeServices[serviceName] = true
info, exists := proxier.serviceMap[serviceName]
if exists && proxier.sameConfig(info, service, servicePort) {
info, exists := newServiceMap[serviceName]
if exists {
if sameConfig(info, service, servicePort) {
// Nothing changed.
continue
}
if exists {
// Something changed.
glog.V(3).Infof("Something changed for service %q: removing it", serviceName)
delete(proxier.serviceMap, serviceName)
delete(newServiceMap, serviceName)
}
serviceIP := net.ParseIP(service.Spec.ClusterIP)
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
@ -501,17 +509,13 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
glog.Errorf("Service does not contain necessary annotation %v",
apiservice.BetaAnnotationHealthCheckNodePort)
} else {
glog.V(4).Infof("Adding health check for %+v, port %v", serviceName.NamespacedName, p)
info.healthCheckNodePort = int(p)
// Turn on healthcheck responder to listen on the health check nodePort
healthcheck.AddServiceListener(serviceName.NamespacedName, info.healthCheckNodePort)
healthCheckAdd = append(healthCheckAdd, healthCheckPort{serviceName.NamespacedName, info.healthCheckNodePort})
}
} else {
glog.V(4).Infof("Deleting health check for %+v", serviceName.NamespacedName)
// Delete healthcheck responders, if any, previously listening for this service
healthcheck.DeleteServiceListener(serviceName.NamespacedName, 0)
healthCheckDel = append(healthCheckDel, healthCheckPort{serviceName.NamespacedName, 0})
}
proxier.serviceMap[serviceName] = info
newServiceMap[serviceName] = info
glog.V(4).Infof("added serviceInfo(%s): %s", serviceName, spew.Sdump(info))
}
@ -519,24 +523,52 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
staleUDPServices := sets.NewString()
// Remove serviceports missing from the update.
for name, info := range proxier.serviceMap {
for name, info := range newServiceMap {
if !activeServices[name] {
glog.V(1).Infof("Removing service %q", name)
if info.protocol == api.ProtocolUDP {
staleUDPServices.Insert(info.clusterIP.String())
}
delete(proxier.serviceMap, name)
delete(newServiceMap, name)
if info.onlyNodeLocalEndpoints && info.healthCheckNodePort > 0 {
healthCheckDel = append(healthCheckDel, healthCheckPort{name.NamespacedName, info.healthCheckNodePort})
}
}
}
return newServiceMap, healthCheckAdd, healthCheckDel, staleUDPServices
}
// OnServiceUpdate tracks the active set of service proxies.
// They will be synchronized using syncProxyRules()
func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
start := time.Now()
defer func() {
glog.V(4).Infof("OnServiceUpdate took %v for %d services", time.Since(start), len(allServices))
}()
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.haveReceivedServiceUpdate = true
newServiceMap, hcAdd, hcDel, staleUDPServices := buildServiceMap(allServices, proxier.serviceMap)
proxier.serviceMap = newServiceMap
for _, hc := range hcAdd {
glog.V(4).Infof("Adding health check for %+v, port %v", hc.namespace, hc.nodeport)
// Turn on healthcheck responder to listen on the health check nodePort
// FIXME: handle failures from adding the service
healthcheck.AddServiceListener(hc.namespace, hc.nodeport)
}
for _, hc := range hcDel {
// Remove ServiceListener health check nodePorts from the health checker
// TODO - Stats
glog.V(4).Infof("Deleting health check for %+v, port %v", name.NamespacedName, info.healthCheckNodePort)
healthcheck.DeleteServiceListener(name.NamespacedName, info.healthCheckNodePort)
}
}
glog.V(4).Infof("Deleting health check for %+v, port %v", hc.namespace, hc.nodeport)
// FIXME: handle failures from deleting the service
healthcheck.DeleteServiceListener(hc.namespace, hc.nodeport)
}
proxier.syncProxyRules()
proxier.deleteServiceConnections(staleUDPServices.List())
}
// Generate a list of ip strings from the list of endpoint infos

View File

@ -25,8 +25,10 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/service"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/intstr"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
)
@ -883,4 +885,172 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable
}
}
func makeTestService(namespace, name string, svcFunc func(*api.Service)) api.Service {
svc := api.Service{
ObjectMeta: api.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: api.ServiceSpec{},
Status: api.ServiceStatus{},
}
svcFunc(&svc)
return svc
}
func addTestPort(array []api.ServicePort, name string, protocol api.Protocol, port, nodeport int32, targetPort int) []api.ServicePort {
svcPort := api.ServicePort{
Name: name,
Protocol: protocol,
Port: port,
NodePort: nodeport,
TargetPort: intstr.FromInt(targetPort),
}
return append(array, svcPort)
}
func TestBuildServiceMapAddRemove(t *testing.T) {
services := []api.Service{
makeTestService("somewhere-else", "cluster-ip", func(svc *api.Service) {
svc.Spec.Type = api.ServiceTypeClusterIP
svc.Spec.ClusterIP = "172.16.55.4"
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0)
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "UDP", 1235, 5321, 0)
}),
makeTestService("somewhere-else", "node-port", func(svc *api.Service) {
svc.Spec.Type = api.ServiceTypeNodePort
svc.Spec.ClusterIP = "172.16.55.10"
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blahblah", "UDP", 345, 678, 0)
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "moreblahblah", "TCP", 344, 677, 0)
}),
makeTestService("somewhere", "load-balancer", func(svc *api.Service) {
svc.Spec.Type = api.ServiceTypeLoadBalancer
svc.Spec.ClusterIP = "172.16.55.11"
svc.Spec.LoadBalancerIP = "5.6.7.8"
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar", "UDP", 8675, 30061, 7000)
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8676, 30062, 7001)
svc.Status.LoadBalancer = api.LoadBalancerStatus{
Ingress: []api.LoadBalancerIngress{
{IP: "10.1.2.4"},
},
}
}),
makeTestService("somewhere", "only-local-load-balancer", func(svc *api.Service) {
svc.ObjectMeta.Annotations = map[string]string{
service.BetaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal,
service.BetaAnnotationHealthCheckNodePort: "345",
}
svc.Spec.Type = api.ServiceTypeLoadBalancer
svc.Spec.ClusterIP = "172.16.55.12"
svc.Spec.LoadBalancerIP = "5.6.7.8"
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar2", "UDP", 8677, 30063, 7002)
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8678, 30064, 7003)
svc.Status.LoadBalancer = api.LoadBalancerStatus{
Ingress: []api.LoadBalancerIngress{
{IP: "10.1.2.3"},
},
}
}),
}
serviceMap, hcAdd, hcDel, staleUDPServices := buildServiceMap(services, make(proxyServiceMap))
if len(serviceMap) != 8 {
t.Errorf("expected service map length 8, got %v", serviceMap)
}
// The only-local-loadbalancer ones get added
if len(hcAdd) != 2 {
t.Errorf("expected healthcheck add length 2, got %v", hcAdd)
} else {
for _, hc := range hcAdd {
if hc.namespace.Namespace != "somewhere" || hc.namespace.Name != "only-local-load-balancer" {
t.Errorf("unexpected healthcheck listener added: %v", hc)
}
}
}
// All the rest get deleted
if len(hcDel) != 6 {
t.Errorf("expected healthcheck del length 6, got %v", hcDel)
} else {
for _, hc := range hcDel {
if hc.namespace.Namespace == "somewhere" && hc.namespace.Name == "only-local-load-balancer" {
t.Errorf("unexpected healthcheck listener deleted: %v", hc)
}
}
}
if len(staleUDPServices) != 0 {
// Services only added, so nothing stale yet
t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices))
}
// Remove some stuff
services = []api.Service{services[0]}
services[0].Spec.Ports = []api.ServicePort{services[0].Spec.Ports[1]}
serviceMap, hcAdd, hcDel, staleUDPServices = buildServiceMap(services, serviceMap)
if len(serviceMap) != 1 {
t.Errorf("expected service map length 1, got %v", serviceMap)
}
if len(hcAdd) != 0 {
t.Errorf("expected healthcheck add length 1, got %v", hcAdd)
}
// The only OnlyLocal annotation was removed above, so we expect a delete now.
// FIXME: Since the BetaAnnotationHealthCheckNodePort is the same for all
// ServicePorts, we'll get one delete per ServicePort, even though they all
// contain the same information
if len(hcDel) != 2 {
t.Errorf("expected healthcheck del length 2, got %v", hcDel)
} else {
for _, hc := range hcDel {
if hc.namespace.Namespace != "somewhere" || hc.namespace.Name != "only-local-load-balancer" {
t.Errorf("unexpected healthcheck listener deleted: %v", hc)
}
}
}
// All services but one were deleted. While you'd expect only the ClusterIPs
// from the three deleted services here, we still have the ClusterIP for
// the not-deleted service, because one of it's ServicePorts was deleted.
expectedStaleUDPServices := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"}
if len(staleUDPServices) != len(expectedStaleUDPServices) {
t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), staleUDPServices.List())
}
for _, ip := range expectedStaleUDPServices {
if !staleUDPServices.Has(ip) {
t.Errorf("expected stale UDP service service %s", ip)
}
}
}
func TestBuildServiceMapServiceHeadless(t *testing.T) {
services := []api.Service{
makeTestService("somewhere-else", "headless", func(svc *api.Service) {
svc.Spec.Type = api.ServiceTypeClusterIP
svc.Spec.ClusterIP = api.ClusterIPNone
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0)
}),
}
// Headless service should be ignored
serviceMap, hcAdd, hcDel, staleUDPServices := buildServiceMap(services, make(proxyServiceMap))
if len(serviceMap) != 0 {
t.Errorf("expected service map length 0, got %d", len(serviceMap))
}
// No proxied services, so no healthchecks
if len(hcAdd) != 0 {
t.Errorf("expected healthcheck add length 0, got %d", len(hcAdd))
}
if len(hcDel) != 0 {
t.Errorf("expected healthcheck del length 0, got %d", len(hcDel))
}
if len(staleUDPServices) != 0 {
t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices))
}
}
// TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces.