Fix bug in iptables proxy that clobbered endpoints

There is a race at startup where the two watch operations might clobber state
if the initial message comes in the wrong order.
This commit is contained in:
Tim Hockin 2015-09-15 13:07:33 -07:00
parent 9ae90d2149
commit 84a9b0a37a

View File

@ -127,7 +127,6 @@ type serviceInfo struct {
loadBalancerStatus api.LoadBalancerStatus loadBalancerStatus api.LoadBalancerStatus
sessionAffinityType api.ServiceAffinity sessionAffinityType api.ServiceAffinity
stickyMaxAgeSeconds int stickyMaxAgeSeconds int
endpoints []string
// Deprecated, but required for back-compat (including e2e) // Deprecated, but required for back-compat (including e2e)
externalIPs []string externalIPs []string
} }
@ -145,6 +144,7 @@ func newServiceInfo(service proxy.ServicePortName) *serviceInfo {
type Proxier struct { type Proxier struct {
mu sync.Mutex // protects the following fields mu sync.Mutex // protects the following fields
serviceMap map[proxy.ServicePortName]*serviceInfo serviceMap map[proxy.ServicePortName]*serviceInfo
endpointsMap map[proxy.ServicePortName][]string
portsMap map[localPort]closeable portsMap map[localPort]closeable
haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event
haveReceivedEndpointsUpdate bool // true once we've seen an OnEndpointsUpdate event haveReceivedEndpointsUpdate bool // true once we've seen an OnEndpointsUpdate event
@ -194,6 +194,7 @@ func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod
return &Proxier{ return &Proxier{
serviceMap: make(map[proxy.ServicePortName]*serviceInfo), serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
endpointsMap: make(map[proxy.ServicePortName][]string),
portsMap: make(map[localPort]closeable), portsMap: make(map[localPort]closeable),
syncPeriod: syncPeriod, syncPeriod: syncPeriod,
iptables: ipt, iptables: ipt,
@ -299,7 +300,7 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
continue continue
} }
if exists { if exists {
//Something changed. // Something changed.
glog.V(3).Infof("Something changed for service %q: removing it", serviceName) glog.V(3).Infof("Something changed for service %q: removing it", serviceName)
delete(proxier.serviceMap, serviceName) delete(proxier.serviceMap, serviceName)
} }
@ -321,10 +322,9 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
} }
} }
for name, info := range proxier.serviceMap { // Remove services missing from the update.
// Check for servicePorts that were not in this update and have no endpoints. for name := range proxier.serviceMap {
// This helps prevent unnecessarily removing and adding services. if !activeServices[name] {
if !activeServices[name] && info.endpoints == nil {
glog.V(1).Infof("Removing service %q", name) glog.V(1).Infof("Removing service %q", name)
delete(proxier.serviceMap, name) delete(proxier.serviceMap, name)
} }
@ -339,7 +339,7 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
defer proxier.mu.Unlock() defer proxier.mu.Unlock()
proxier.haveReceivedEndpointsUpdate = true proxier.haveReceivedEndpointsUpdate = true
registeredEndpoints := make(map[proxy.ServicePortName]bool) // use a map as a set activeEndpoints := make(map[proxy.ServicePortName]bool) // use a map as a set
// Update endpoints for services. // Update endpoints for services.
for i := range allEndpoints { for i := range allEndpoints {
@ -361,33 +361,21 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
for portname := range portsToEndpoints { for portname := range portsToEndpoints {
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: svcEndpoints.Namespace, Name: svcEndpoints.Name}, Port: portname} svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: svcEndpoints.Namespace, Name: svcEndpoints.Name}, Port: portname}
state, exists := proxier.serviceMap[svcPort] curEndpoints := proxier.endpointsMap[svcPort]
if !exists || state == nil {
state = newServiceInfo(svcPort)
proxier.serviceMap[svcPort] = state
}
curEndpoints := []string{}
if state != nil {
curEndpoints = state.endpoints
}
newEndpoints := flattenValidEndpoints(portsToEndpoints[portname]) newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
if len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) { if len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) {
glog.V(1).Infof("Setting endpoints for %s to %+v", svcPort, newEndpoints) glog.V(1).Infof("Setting endpoints for %q to %+v", svcPort, newEndpoints)
state.endpoints = newEndpoints proxier.endpointsMap[svcPort] = newEndpoints
} }
registeredEndpoints[svcPort] = true activeEndpoints[svcPort] = true
} }
} }
// Remove endpoints missing from the update. // Remove endpoints missing from the update.
for service, info := range proxier.serviceMap { for name := range proxier.endpointsMap {
// if missing from update and not already set by previous endpoints event if !activeEndpoints[name] {
if _, exists := registeredEndpoints[service]; !exists && info.endpoints != nil { glog.V(2).Infof("Removing endpoints for %q", name)
glog.V(2).Infof("Removing endpoints for %s", service) delete(proxier.endpointsMap, name)
// Set the endpoints to nil, we will check for this in OnServiceUpdate so that we
// only remove ServicePorts that have no endpoints and were not in the service update,
// that way we only remove ServicePorts that were not in both.
proxier.serviceMap[service].endpoints = nil
} }
} }
@ -658,7 +646,7 @@ func (proxier *Proxier) syncProxyRules() {
// can group rules together. // can group rules together.
endpoints := make([]string, 0) endpoints := make([]string, 0)
endpointChains := make([]utiliptables.Chain, 0) endpointChains := make([]utiliptables.Chain, 0)
for _, ep := range svcInfo.endpoints { for _, ep := range proxier.endpointsMap[svcName] {
endpoints = append(endpoints, ep) endpoints = append(endpoints, ep)
endpointChain := servicePortEndpointChainName(svcName, protocol, ep) endpointChain := servicePortEndpointChainName(svcName, protocol, ep)
endpointChains = append(endpointChains, endpointChain) endpointChains = append(endpointChains, endpointChain)