From ee505677f70439ff8f5515cc899eab7c18cdb1dd Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Mon, 22 May 2017 09:01:17 +0200 Subject: [PATCH] Address remaining TODOs in kube-proxy. --- pkg/proxy/iptables/proxier.go | 84 +++++++++++------------------------ 1 file changed, 25 insertions(+), 59 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index b1015add671..41837bc775e 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -30,6 +30,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/golang/glog" @@ -237,14 +238,8 @@ func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, prev change.current = endpointsToEndpointsMap(current, ecm.hostname) if reflect.DeepEqual(change.previous, change.current) { delete(ecm.items, *namespacedName) - return false } - // TODO: Instead of returning true/false, we should consider returning whether - // the map contains some element or not. Currently, if the change is - // "reverting" some previous endpoints update, but there are still some other - // modified endpoints, we will return false, even though there are some change - // to apply. - return true + return len(ecm.items) > 0 } func newServiceChangeMap() serviceChangeMap { @@ -266,14 +261,8 @@ func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previo change.current = serviceToServiceMap(current) if reflect.DeepEqual(change.previous, change.current) { delete(scm.items, *namespacedName) - return false } - // TODO: Instead of returning true/false, we should consider returning whether - // the map contains some element or not. Currently, if the change is - // "reverting" some previous endpoints update, but there are still some other - // modified endpoints, we will return false, even though there are some change - // to apply. - return true + return len(scm.items) > 0 } func (sm *proxyServiceMap) merge(other proxyServiceMap) sets.String { @@ -340,6 +329,7 @@ type Proxier struct { // with some partial data after kube-proxy restart. endpointsSynced bool servicesSynced bool + initialized int32 throttle flowcontrol.RateLimiter @@ -595,41 +585,35 @@ func (proxier *Proxier) SyncLoop() { } } +func (proxier *Proxier) setInitialized(value bool) { + var initialized int32 + if value { + initialized = 1 + } + atomic.StoreInt32(&proxier.initialized, initialized) +} + +func (proxier *Proxier) isInitialized() bool { + return atomic.LoadInt32(&proxier.initialized) > 0 +} + func (proxier *Proxier) OnServiceAdd(service *api.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if proxier.serviceChanges.update(&namespacedName, nil, service) { - // TODO(wojtek-t): If the initial sync of informer either for endpoints or - // services is not finished, it doesn't make sense to call syncProxyRules - // because it will early-return (to avoid resyncing iptables with partial - // state right after kube-proxy restart). This can eat a token for calling - // syncProxyRules, but is not that critical since it can happen only - // after kube-proxy was (re)started. + if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() { proxier.syncProxyRules() } } func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if proxier.serviceChanges.update(&namespacedName, oldService, service) { - // TODO(wojtek-t): If the initial sync of informer either for endpoints or - // services is not finished, it doesn't make sense to call syncProxyRules - // because it will early-return (to avoid resyncing iptables with partial - // state right after kube-proxy restart). This can eat a token for calling - // syncProxyRules, but is not that critical since it can happen only - // after kube-proxy was (re)started. + if proxier.serviceChanges.update(&namespacedName, oldService, service) && proxier.isInitialized() { proxier.syncProxyRules() } } func (proxier *Proxier) OnServiceDelete(service *api.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if proxier.serviceChanges.update(&namespacedName, service, nil) { - // TODO(wojtek-t): If the initial sync of informer either for endpoints or - // services is not finished, it doesn't make sense to call syncProxyRules - // because it will early-return (to avoid resyncing iptables with partial - // state right after kube-proxy restart). This can eat a token for calling - // syncProxyRules, but is not that critical since it can happen only - // after kube-proxy was (re)started. + if proxier.serviceChanges.update(&namespacedName, service, nil) && proxier.isInitialized() { proxier.syncProxyRules() } } @@ -637,6 +621,7 @@ func (proxier *Proxier) OnServiceDelete(service *api.Service) { func (proxier *Proxier) OnServiceSynced() { proxier.mu.Lock() proxier.servicesSynced = true + proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced) proxier.mu.Unlock() // Call it unconditionally - this is called once per lifetime. proxier.syncProxyRules() @@ -687,39 +672,21 @@ func updateServiceMap( func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - if proxier.endpointsChanges.update(&namespacedName, nil, endpoints) { - // TODO(wojtek-t): If the initial sync of informer either for endpoints or - // services is not finished, it doesn't make sense to call syncProxyRules - // because it will early-return (to avoid resyncing iptables with partial - // state right after kube-proxy restart). This can eat a token for calling - // syncProxyRules, but is not that critical since it can happen only - // after kube-proxy was (re)started. + if proxier.endpointsChanges.update(&namespacedName, nil, endpoints) && proxier.isInitialized() { proxier.syncProxyRules() } } func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) { - // TODO(wojtek-t): If the initial sync of informer either for endpoints or - // services is not finished, it doesn't make sense to call syncProxyRules - // because it will early-return (to avoid resyncing iptables with partial - // state right after kube-proxy restart). This can eat a token for calling - // syncProxyRules, but is not that critical since it can happen only - // after kube-proxy was (re)started. + if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) && proxier.isInitialized() { proxier.syncProxyRules() } } func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - if proxier.endpointsChanges.update(&namespacedName, endpoints, nil) { - // TODO(wojtek-t): If the initial sync of informer either for endpoints or - // services is not finished, it doesn't make sense to call syncProxyRules - // because it will early-return (to avoid resyncing iptables with partial - // state right after kube-proxy restart). This can eat a token for calling - // syncProxyRules, but is not that critical since it can happen only - // after kube-proxy was (re)started. + if proxier.endpointsChanges.update(&namespacedName, endpoints, nil) && proxier.isInitialized() { proxier.syncProxyRules() } } @@ -727,6 +694,7 @@ func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) { func (proxier *Proxier) OnEndpointsSynced() { proxier.mu.Lock() proxier.endpointsSynced = true + proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced) proxier.mu.Unlock() // Call it unconditionally - this is called once per lifetime. proxier.syncProxyRules() @@ -1530,9 +1498,7 @@ func (proxier *Proxier) syncProxyRules() { proxier.iptablesData.Write(proxier.natChains.Bytes()) proxier.iptablesData.Write(proxier.natRules.Bytes()) - if glog.V(5) { - glog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes()) - } + glog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes()) err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters) if err != nil { glog.Errorf("Failed to execute iptables-restore: %v\nRules:\n%s", err, proxier.iptablesData.Bytes())