Merge pull request #13988 from thockin/kube-proxy-startup-clobber
Auto commit by PR queue bot
This commit is contained in:
commit
bf990acefa
@ -127,7 +127,6 @@ type serviceInfo struct {
|
|||||||
loadBalancerStatus api.LoadBalancerStatus
|
loadBalancerStatus api.LoadBalancerStatus
|
||||||
sessionAffinityType api.ServiceAffinity
|
sessionAffinityType api.ServiceAffinity
|
||||||
stickyMaxAgeSeconds int
|
stickyMaxAgeSeconds int
|
||||||
endpoints []string
|
|
||||||
// Deprecated, but required for back-compat (including e2e)
|
// Deprecated, but required for back-compat (including e2e)
|
||||||
externalIPs []string
|
externalIPs []string
|
||||||
}
|
}
|
||||||
@ -145,6 +144,7 @@ func newServiceInfo(service proxy.ServicePortName) *serviceInfo {
|
|||||||
type Proxier struct {
|
type Proxier struct {
|
||||||
mu sync.Mutex // protects the following fields
|
mu sync.Mutex // protects the following fields
|
||||||
serviceMap map[proxy.ServicePortName]*serviceInfo
|
serviceMap map[proxy.ServicePortName]*serviceInfo
|
||||||
|
endpointsMap map[proxy.ServicePortName][]string
|
||||||
portsMap map[localPort]closeable
|
portsMap map[localPort]closeable
|
||||||
haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event
|
haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event
|
||||||
haveReceivedEndpointsUpdate bool // true once we've seen an OnEndpointsUpdate event
|
haveReceivedEndpointsUpdate bool // true once we've seen an OnEndpointsUpdate event
|
||||||
@ -194,6 +194,7 @@ func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod
|
|||||||
|
|
||||||
return &Proxier{
|
return &Proxier{
|
||||||
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
|
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
|
||||||
|
endpointsMap: make(map[proxy.ServicePortName][]string),
|
||||||
portsMap: make(map[localPort]closeable),
|
portsMap: make(map[localPort]closeable),
|
||||||
syncPeriod: syncPeriod,
|
syncPeriod: syncPeriod,
|
||||||
iptables: ipt,
|
iptables: ipt,
|
||||||
@ -324,10 +325,9 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for name, info := range proxier.serviceMap {
|
// Remove services missing from the update.
|
||||||
// Check for servicePorts that were not in this update and have no endpoints.
|
for name := range proxier.serviceMap {
|
||||||
// This helps prevent unnecessarily removing and adding services.
|
if !activeServices[name] {
|
||||||
if !activeServices[name] && info.endpoints == nil {
|
|
||||||
glog.V(1).Infof("Removing service %q", name)
|
glog.V(1).Infof("Removing service %q", name)
|
||||||
delete(proxier.serviceMap, name)
|
delete(proxier.serviceMap, name)
|
||||||
}
|
}
|
||||||
@ -342,7 +342,7 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
|
|||||||
defer proxier.mu.Unlock()
|
defer proxier.mu.Unlock()
|
||||||
proxier.haveReceivedEndpointsUpdate = true
|
proxier.haveReceivedEndpointsUpdate = true
|
||||||
|
|
||||||
registeredEndpoints := make(map[proxy.ServicePortName]bool) // use a map as a set
|
activeEndpoints := make(map[proxy.ServicePortName]bool) // use a map as a set
|
||||||
|
|
||||||
// Update endpoints for services.
|
// Update endpoints for services.
|
||||||
for i := range allEndpoints {
|
for i := range allEndpoints {
|
||||||
@ -364,33 +364,21 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
|
|||||||
|
|
||||||
for portname := range portsToEndpoints {
|
for portname := range portsToEndpoints {
|
||||||
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: svcEndpoints.Namespace, Name: svcEndpoints.Name}, Port: portname}
|
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: svcEndpoints.Namespace, Name: svcEndpoints.Name}, Port: portname}
|
||||||
state, exists := proxier.serviceMap[svcPort]
|
curEndpoints := proxier.endpointsMap[svcPort]
|
||||||
if !exists || state == nil {
|
|
||||||
state = newServiceInfo(svcPort)
|
|
||||||
proxier.serviceMap[svcPort] = state
|
|
||||||
}
|
|
||||||
curEndpoints := []string{}
|
|
||||||
if state != nil {
|
|
||||||
curEndpoints = state.endpoints
|
|
||||||
}
|
|
||||||
newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
|
newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
|
||||||
|
|
||||||
if len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) {
|
if len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) {
|
||||||
glog.V(1).Infof("Setting endpoints for %s to %+v", svcPort, newEndpoints)
|
glog.V(1).Infof("Setting endpoints for %q to %+v", svcPort, newEndpoints)
|
||||||
state.endpoints = newEndpoints
|
proxier.endpointsMap[svcPort] = newEndpoints
|
||||||
}
|
}
|
||||||
registeredEndpoints[svcPort] = true
|
activeEndpoints[svcPort] = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove endpoints missing from the update.
|
// Remove endpoints missing from the update.
|
||||||
for service, info := range proxier.serviceMap {
|
for name := range proxier.endpointsMap {
|
||||||
// if missing from update and not already set by previous endpoints event
|
if !activeEndpoints[name] {
|
||||||
if _, exists := registeredEndpoints[service]; !exists && info.endpoints != nil {
|
glog.V(2).Infof("Removing endpoints for %q", name)
|
||||||
glog.V(2).Infof("Removing endpoints for %s", service)
|
delete(proxier.endpointsMap, name)
|
||||||
// Set the endpoints to nil, we will check for this in OnServiceUpdate so that we
|
|
||||||
// only remove ServicePorts that have no endpoints and were not in the service update,
|
|
||||||
// that way we only remove ServicePorts that were not in both.
|
|
||||||
proxier.serviceMap[service].endpoints = nil
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -661,7 +649,7 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
// can group rules together.
|
// can group rules together.
|
||||||
endpoints := make([]string, 0)
|
endpoints := make([]string, 0)
|
||||||
endpointChains := make([]utiliptables.Chain, 0)
|
endpointChains := make([]utiliptables.Chain, 0)
|
||||||
for _, ep := range svcInfo.endpoints {
|
for _, ep := range proxier.endpointsMap[svcName] {
|
||||||
endpoints = append(endpoints, ep)
|
endpoints = append(endpoints, ep)
|
||||||
endpointChain := servicePortEndpointChainName(svcName, protocol, ep)
|
endpointChain := servicePortEndpointChainName(svcName, protocol, ep)
|
||||||
endpointChains = append(endpointChains, endpointChain)
|
endpointChains = append(endpointChains, endpointChain)
|
||||||
|
Loading…
Reference in New Issue
Block a user