proxy/iptables: clean up service map creation

Instead of copying the map, like OnServicesUpdate() used to do and which
was copied into buildServiceMap() to preserve semantics while creating
testcases, start with a new empty map and do deletion checking later.
This commit is contained in:
Dan Williams
2016-12-20 12:24:43 -06:00
parent 6aa784e6f2
commit 5907639140
2 changed files with 141 additions and 89 deletions

View File

@@ -153,11 +153,35 @@ type endpointsInfo struct {
} }
// returns a new serviceInfo struct // returns a new serviceInfo struct
func newServiceInfo(service proxy.ServicePortName) *serviceInfo { func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, service *api.Service) *serviceInfo {
return &serviceInfo{ onlyNodeLocalEndpoints := apiservice.NeedsHealthCheck(service) && featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly() && (service.Spec.Type == api.ServiceTypeLoadBalancer || service.Spec.Type == api.ServiceTypeNodePort)
sessionAffinityType: api.ServiceAffinityNone, // default info := &serviceInfo{
clusterIP: net.ParseIP(service.Spec.ClusterIP),
port: int(port.Port),
protocol: port.Protocol,
nodePort: int(port.NodePort),
// Deep-copy in case the service instance changes
loadBalancerStatus: *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer),
sessionAffinityType: service.Spec.SessionAffinity,
stickyMaxAgeMinutes: 180, // TODO: paramaterize this in the API. stickyMaxAgeMinutes: 180, // TODO: paramaterize this in the API.
externalIPs: make([]string, len(service.Spec.ExternalIPs)),
loadBalancerSourceRanges: make([]string, len(service.Spec.LoadBalancerSourceRanges)),
onlyNodeLocalEndpoints: onlyNodeLocalEndpoints,
} }
copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges)
copy(info.externalIPs, service.Spec.ExternalIPs)
if info.onlyNodeLocalEndpoints {
p := apiservice.GetServiceHealthCheckNodePort(service)
if p == 0 {
glog.Errorf("Service does not contain necessary annotation %v",
apiservice.BetaAnnotationHealthCheckNodePort)
} else {
info.healthCheckNodePort = int(p)
}
}
return info
} }
type proxyServiceMap map[proxy.ServicePortName]*serviceInfo type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
@@ -384,44 +408,6 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
return encounteredError return encounteredError
} }
func sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool {
if info.protocol != port.Protocol || info.port != int(port.Port) || info.nodePort != int(port.NodePort) {
return false
}
if !info.clusterIP.Equal(net.ParseIP(service.Spec.ClusterIP)) {
return false
}
if !ipsEqual(info.externalIPs, service.Spec.ExternalIPs) {
return false
}
if !api.LoadBalancerStatusEqual(&info.loadBalancerStatus, &service.Status.LoadBalancer) {
return false
}
if info.sessionAffinityType != service.Spec.SessionAffinity {
return false
}
onlyNodeLocalEndpoints := apiservice.NeedsHealthCheck(service) && featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly()
if info.onlyNodeLocalEndpoints != onlyNodeLocalEndpoints {
return false
}
if !reflect.DeepEqual(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges) {
return false
}
return true
}
func ipsEqual(lhs, rhs []string) bool {
if len(lhs) != len(rhs) {
return false
}
for i := range lhs {
if lhs[i] != rhs[i] {
return false
}
}
return true
}
// Sync is called to immediately synchronize the proxier state to iptables // Sync is called to immediately synchronize the proxier state to iptables
func (proxier *Proxier) Sync() { func (proxier *Proxier) Sync() {
proxier.mu.Lock() proxier.mu.Lock()
@@ -449,16 +435,10 @@ type healthCheckPort struct {
// service map, a list of healthcheck ports to add to or remove from the health // service map, a list of healthcheck ports to add to or remove from the health
// checking listener service, and a set of stale UDP services. // checking listener service, and a set of stale UDP services.
func buildServiceMap(allServices []api.Service, oldServiceMap proxyServiceMap) (proxyServiceMap, []healthCheckPort, []healthCheckPort, sets.String) { func buildServiceMap(allServices []api.Service, oldServiceMap proxyServiceMap) (proxyServiceMap, []healthCheckPort, []healthCheckPort, sets.String) {
newServiceMap := make(proxyServiceMap)
healthCheckAdd := make([]healthCheckPort, 0) healthCheckAdd := make([]healthCheckPort, 0)
healthCheckDel := make([]healthCheckPort, 0) healthCheckDel := make([]healthCheckPort, 0)
newServiceMap := make(proxyServiceMap)
for key, value := range oldServiceMap {
newServiceMap[key] = value
}
activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set
for i := range allServices { for i := range allServices {
service := &allServices[i] service := &allServices[i]
svcName := types.NamespacedName{ svcName := types.NamespacedName{
@@ -484,57 +464,37 @@ func buildServiceMap(allServices []api.Service, oldServiceMap proxyServiceMap) (
NamespacedName: svcName, NamespacedName: svcName,
Port: servicePort.Name, Port: servicePort.Name,
} }
activeServices[serviceName] = true
info, exists := newServiceMap[serviceName] info := newServiceInfo(serviceName, servicePort, service)
if exists { oldInfo, exists := oldServiceMap[serviceName]
if sameConfig(info, service, servicePort) { equal := reflect.DeepEqual(info, oldInfo)
// Nothing changed. if !exists {
continue glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
} else if !equal {
glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
} }
// Something changed.
glog.V(3).Infof("Something changed for service %q: removing it", serviceName) if !exists || !equal {
delete(newServiceMap, serviceName) if info.onlyNodeLocalEndpoints && info.healthCheckNodePort > 0 {
}
serviceIP := net.ParseIP(service.Spec.ClusterIP)
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
info = newServiceInfo(serviceName)
info.clusterIP = serviceIP
info.port = int(servicePort.Port)
info.protocol = servicePort.Protocol
info.nodePort = int(servicePort.NodePort)
info.externalIPs = service.Spec.ExternalIPs
// Deep-copy in case the service instance changes
info.loadBalancerStatus = *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
info.sessionAffinityType = service.Spec.SessionAffinity
info.loadBalancerSourceRanges = service.Spec.LoadBalancerSourceRanges
info.onlyNodeLocalEndpoints = apiservice.NeedsHealthCheck(service) && featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly() && (service.Spec.Type == api.ServiceTypeLoadBalancer || service.Spec.Type == api.ServiceTypeNodePort)
if info.onlyNodeLocalEndpoints {
p := apiservice.GetServiceHealthCheckNodePort(service)
if p == 0 {
glog.Errorf("Service does not contain necessary annotation %v",
apiservice.BetaAnnotationHealthCheckNodePort)
} else {
info.healthCheckNodePort = int(p)
healthCheckAdd = append(healthCheckAdd, healthCheckPort{serviceName.NamespacedName, info.healthCheckNodePort}) healthCheckAdd = append(healthCheckAdd, healthCheckPort{serviceName.NamespacedName, info.healthCheckNodePort})
}
} else { } else {
healthCheckDel = append(healthCheckDel, healthCheckPort{serviceName.NamespacedName, 0}) healthCheckDel = append(healthCheckDel, healthCheckPort{serviceName.NamespacedName, 0})
} }
newServiceMap[serviceName] = info }
newServiceMap[serviceName] = info
glog.V(4).Infof("added serviceInfo(%s): %s", serviceName, spew.Sdump(info)) glog.V(4).Infof("added serviceInfo(%s): %s", serviceName, spew.Sdump(info))
} }
} }
staleUDPServices := sets.NewString() staleUDPServices := sets.NewString()
// Remove serviceports missing from the update. // Remove serviceports missing from the update.
for name, info := range newServiceMap { for name, info := range oldServiceMap {
if !activeServices[name] { if _, exists := newServiceMap[name]; !exists {
glog.V(1).Infof("Removing service %q", name) glog.V(1).Infof("Removing service %q", name)
if info.protocol == api.ProtocolUDP { if info.protocol == api.ProtocolUDP {
staleUDPServices.Insert(info.clusterIP.String()) staleUDPServices.Insert(info.clusterIP.String())
} }
delete(newServiceMap, name)
if info.onlyNodeLocalEndpoints && info.healthCheckNodePort > 0 { if info.onlyNodeLocalEndpoints && info.healthCheckNodePort > 0 {
healthCheckDel = append(healthCheckDel, healthCheckPort{name.NamespacedName, info.healthCheckNodePort}) healthCheckDel = append(healthCheckDel, healthCheckPort{name.NamespacedName, info.healthCheckNodePort})
} }

View File

@@ -1079,4 +1079,96 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
} }
} }
func TestBuildServiceMapServiceUpdate(t *testing.T) {
first := []api.Service{
makeTestService("somewhere", "some-service", func(svc *api.Service) {
svc.Spec.Type = api.ServiceTypeClusterIP
svc.Spec.ClusterIP = "172.16.55.4"
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0)
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 0)
}),
}
second := []api.Service{
makeTestService("somewhere", "some-service", func(svc *api.Service) {
svc.ObjectMeta.Annotations = map[string]string{
service.BetaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal,
service.BetaAnnotationHealthCheckNodePort: "345",
}
svc.Spec.Type = api.ServiceTypeLoadBalancer
svc.Spec.ClusterIP = "172.16.55.4"
svc.Spec.LoadBalancerIP = "5.6.7.8"
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 7002)
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 7003)
svc.Status.LoadBalancer = api.LoadBalancerStatus{
Ingress: []api.LoadBalancerIngress{
{IP: "10.1.2.3"},
},
}
}),
}
serviceMap, hcAdd, hcDel, staleUDPServices := buildServiceMap(first, make(proxyServiceMap))
if len(serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", serviceMap)
}
if len(hcAdd) != 0 {
t.Errorf("expected healthcheck add length 0, got %v", hcAdd)
}
if len(hcDel) != 2 {
t.Errorf("expected healthcheck del length 2, got %v", hcDel)
}
if len(staleUDPServices) != 0 {
// Services only added, so nothing stale yet
t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices))
}
// Change service to load-balancer
serviceMap, hcAdd, hcDel, staleUDPServices = buildServiceMap(second, serviceMap)
if len(serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", serviceMap)
}
if len(hcAdd) != 2 {
t.Errorf("expected healthcheck add length 2, got %v", hcAdd)
}
if len(hcDel) != 0 {
t.Errorf("expected healthcheck add length 2, got %v", hcDel)
}
if len(staleUDPServices) != 0 {
t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices.List())
}
// No change; make sure the service map stays the same and there are
// no health-check changes
serviceMap, hcAdd, hcDel, staleUDPServices = buildServiceMap(second, serviceMap)
if len(serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", serviceMap)
}
if len(hcAdd) != 0 {
t.Errorf("expected healthcheck add length 0, got %v", hcAdd)
}
if len(hcDel) != 0 {
t.Errorf("expected healthcheck add length 2, got %v", hcDel)
}
if len(staleUDPServices) != 0 {
t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices.List())
}
// And back to ClusterIP
serviceMap, hcAdd, hcDel, staleUDPServices = buildServiceMap(first, serviceMap)
if len(serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", serviceMap)
}
if len(hcAdd) != 0 {
t.Errorf("expected healthcheck add length 0, got %v", hcAdd)
}
if len(hcDel) != 2 {
t.Errorf("expected healthcheck del length 2, got %v", hcDel)
}
if len(staleUDPServices) != 0 {
// Services only added, so nothing stale yet
t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices))
}
}
// TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces. // TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces.