Address remaining TODOs in kube-proxy.
@@ -30,6 +30,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/golang/glog"
@@ -237,14 +238,8 @@ func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, prev
 	change.current = endpointsToEndpointsMap(current, ecm.hostname)
 	if reflect.DeepEqual(change.previous, change.current) {
 		delete(ecm.items, *namespacedName)
-		return false
 	}
-	// TODO: Instead of returning true/false, we should consider returning whether
-	// the map contains some element or not. Currently, if the change is
-	// "reverting" some previous endpoints update, but there are still some other
-	// modified endpoints, we will return false, even though there are some change
-	// to apply.
-	return true
+	return len(ecm.items) > 0
 }
 
 func newServiceChangeMap() serviceChangeMap {
@@ -266,14 +261,8 @@ func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previo
 	change.current = serviceToServiceMap(current)
 	if reflect.DeepEqual(change.previous, change.current) {
 		delete(scm.items, *namespacedName)
-		return false
 	}
-	// TODO: Instead of returning true/false, we should consider returning whether
-	// the map contains some element or not. Currently, if the change is
-	// "reverting" some previous endpoints update, but there are still some other
-	// modified endpoints, we will return false, even though there are some change
-	// to apply.
-	return true
+	return len(scm.items) > 0
 }
 
 func (sm *proxyServiceMap) merge(other proxyServiceMap) sets.String {
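The two hunks above implement what the removed TODOs asked for: update now reports whether the change map still holds any pending change at all, rather than whether this particular call modified something. A minimal, self-contained sketch of that semantics follows; the changeTracker type and its string-valued entries are illustrative stand-ins, not the kube-proxy types.

// Sketch of the "does the map still hold pending changes?" return semantics.
package main

import (
	"fmt"
	"sync"
)

type changeTracker struct {
	mu    sync.Mutex
	items map[string][2]string // name -> {previous, current}
}

// update records a transition and returns whether ANY change is still pending,
// not whether this particular call changed anything. A call that reverts an
// earlier update drops its own entry, but the result stays true as long as
// other entries remain.
func (t *changeTracker) update(name, previous, current string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	entry, exists := t.items[name]
	if !exists {
		entry[0] = previous // previous is captured only when the entry is created
	}
	entry[1] = current
	t.items[name] = entry
	// A change that ends up back at its starting point is dropped entirely
	// (kube-proxy compares its richer map types with reflect.DeepEqual).
	if entry[0] == entry[1] {
		delete(t.items, name)
	}
	return len(t.items) > 0
}

func main() {
	t := &changeTracker{items: map[string][2]string{}}
	fmt.Println(t.update("svc-a", "v1", "v2")) // true: svc-a pending
	fmt.Println(t.update("svc-b", "v1", "v2")) // true: two entries pending
	fmt.Println(t.update("svc-b", "", "v1"))   // true: svc-b reverted, but svc-a remains
	fmt.Println(t.update("svc-a", "", "v1"))   // false: nothing left to apply
}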
@@ -340,6 +329,7 @@ type Proxier struct {
 	// with some partial data after kube-proxy restart.
 	endpointsSynced bool
 	servicesSynced  bool
+	initialized     int32
 
 	throttle flowcontrol.RateLimiter
 
@@ -595,41 +585,35 @@ func (proxier *Proxier) SyncLoop() {
 	}
 }
 
+func (proxier *Proxier) setInitialized(value bool) {
+	var initialized int32
+	if value {
+		initialized = 1
+	}
+	atomic.StoreInt32(&proxier.initialized, initialized)
+}
+
+func (proxier *Proxier) isInitialized() bool {
+	return atomic.LoadInt32(&proxier.initialized) > 0
+}
+
 func (proxier *Proxier) OnServiceAdd(service *api.Service) {
 	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
-	if proxier.serviceChanges.update(&namespacedName, nil, service) {
-		// TODO(wojtek-t): If the initial sync of informer either for endpoints or
-		// services is not finished, it doesn't make sense to call syncProxyRules
-		// because it will early-return (to avoid resyncing iptables with partial
-		// state right after kube-proxy restart). This can eat a token for calling
-		// syncProxyRules, but is not that critical since it can happen only
-		// after kube-proxy was (re)started.
+	if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() {
 		proxier.syncProxyRules()
 	}
 }
 
 func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) {
 	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
-	if proxier.serviceChanges.update(&namespacedName, oldService, service) {
-		// TODO(wojtek-t): If the initial sync of informer either for endpoints or
-		// services is not finished, it doesn't make sense to call syncProxyRules
-		// because it will early-return (to avoid resyncing iptables with partial
-		// state right after kube-proxy restart). This can eat a token for calling
-		// syncProxyRules, but is not that critical since it can happen only
-		// after kube-proxy was (re)started.
+	if proxier.serviceChanges.update(&namespacedName, oldService, service) && proxier.isInitialized() {
 		proxier.syncProxyRules()
 	}
 }
 
 func (proxier *Proxier) OnServiceDelete(service *api.Service) {
 	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
-	if proxier.serviceChanges.update(&namespacedName, service, nil) {
-		// TODO(wojtek-t): If the initial sync of informer either for endpoints or
-		// services is not finished, it doesn't make sense to call syncProxyRules
-		// because it will early-return (to avoid resyncing iptables with partial
-		// state right after kube-proxy restart). This can eat a token for calling
-		// syncProxyRules, but is not that critical since it can happen only
-		// after kube-proxy was (re)started.
+	if proxier.serviceChanges.update(&namespacedName, service, nil) && proxier.isInitialized() {
 		proxier.syncProxyRules()
 	}
 }
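The handlers above record every change unconditionally but only call syncProxyRules once isInitialized reports that both informers have finished their initial sync; anything recorded earlier is picked up by the unconditional sync in the *Synced callbacks below. A minimal sketch of that gating pattern, under illustrative names (proxy, onUpdate, pending are not kube-proxy identifiers):

// Sketch: record changes always, do the expensive work only once initialized.
package main

import (
	"fmt"
	"sync/atomic"
)

type proxy struct {
	initialized int32
	pending     []string // stand-in for the endpoints/service change maps
}

func (p *proxy) isInitialized() bool {
	return atomic.LoadInt32(&p.initialized) > 0
}

func (p *proxy) onUpdate(change string) {
	p.pending = append(p.pending, change) // always record the change
	if p.isInitialized() {
		p.sync() // only resync once the initial state is complete
	}
}

func (p *proxy) sync() {
	fmt.Printf("syncing %d pending change(s)\n", len(p.pending))
	p.pending = nil
}

func main() {
	p := &proxy{}
	p.onUpdate("svc-a")                  // recorded, but no sync yet
	atomic.StoreInt32(&p.initialized, 1) // both informers have synced
	p.sync()                             // the unconditional post-sync call applies it
	p.onUpdate("svc-b")                  // now triggers a sync immediately
}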
@@ -637,6 +621,7 @@ func (proxier *Proxier) OnServiceDelete(service *api.Service) {
 func (proxier *Proxier) OnServiceSynced() {
 	proxier.mu.Lock()
 	proxier.servicesSynced = true
+	proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
 	proxier.mu.Unlock()
 	// Call it unconditionally - this is called once per lifetime.
 	proxier.syncProxyRules()
@@ -687,39 +672,21 @@ func updateServiceMap(
 
 func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
 	namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
-	if proxier.endpointsChanges.update(&namespacedName, nil, endpoints) {
-		// TODO(wojtek-t): If the initial sync of informer either for endpoints or
-		// services is not finished, it doesn't make sense to call syncProxyRules
-		// because it will early-return (to avoid resyncing iptables with partial
-		// state right after kube-proxy restart). This can eat a token for calling
-		// syncProxyRules, but is not that critical since it can happen only
-		// after kube-proxy was (re)started.
+	if proxier.endpointsChanges.update(&namespacedName, nil, endpoints) && proxier.isInitialized() {
 		proxier.syncProxyRules()
 	}
 }
 
 func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {
 	namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
-	if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) {
-		// TODO(wojtek-t): If the initial sync of informer either for endpoints or
-		// services is not finished, it doesn't make sense to call syncProxyRules
-		// because it will early-return (to avoid resyncing iptables with partial
-		// state right after kube-proxy restart). This can eat a token for calling
-		// syncProxyRules, but is not that critical since it can happen only
-		// after kube-proxy was (re)started.
+	if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) && proxier.isInitialized() {
 		proxier.syncProxyRules()
 	}
 }
 
 func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) {
 	namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
-	if proxier.endpointsChanges.update(&namespacedName, endpoints, nil) {
-		// TODO(wojtek-t): If the initial sync of informer either for endpoints or
-		// services is not finished, it doesn't make sense to call syncProxyRules
-		// because it will early-return (to avoid resyncing iptables with partial
-		// state right after kube-proxy restart). This can eat a token for calling
-		// syncProxyRules, but is not that critical since it can happen only
-		// after kube-proxy was (re)started.
+	if proxier.endpointsChanges.update(&namespacedName, endpoints, nil) && proxier.isInitialized() {
 		proxier.syncProxyRules()
 	}
 }
@@ -727,6 +694,7 @@ func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) {
 func (proxier *Proxier) OnEndpointsSynced() {
 	proxier.mu.Lock()
 	proxier.endpointsSynced = true
+	proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
 	proxier.mu.Unlock()
 	// Call it unconditionally - this is called once per lifetime.
 	proxier.syncProxyRules()
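Both *Synced callbacks recompute the atomic flag while holding proxier.mu, so initialized flips to 1 exactly once both booleans are true, regardless of which informer finishes first, and readers never need the mutex. A small sketch of that handshake with illustrative names (readiness, onServicesSynced, onEndpointsSynced are not the kube-proxy identifiers):

// Sketch: two "synced" callbacks, one atomic gate, order-independent.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type readiness struct {
	mu              sync.Mutex
	servicesSynced  bool
	endpointsSynced bool
	initialized     int32
}

func (r *readiness) setInitialized(value bool) {
	var v int32
	if value {
		v = 1
	}
	atomic.StoreInt32(&r.initialized, v)
}

func (r *readiness) onServicesSynced() {
	r.mu.Lock()
	r.servicesSynced = true
	r.setInitialized(r.servicesSynced && r.endpointsSynced)
	r.mu.Unlock()
}

func (r *readiness) onEndpointsSynced() {
	r.mu.Lock()
	r.endpointsSynced = true
	r.setInitialized(r.servicesSynced && r.endpointsSynced)
	r.mu.Unlock()
}

func main() {
	r := &readiness{}
	r.onEndpointsSynced()
	fmt.Println(atomic.LoadInt32(&r.initialized)) // 0: services not yet synced
	r.onServicesSynced()
	fmt.Println(atomic.LoadInt32(&r.initialized)) // 1: both synced
}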
@@ -1530,9 +1498,7 @@ func (proxier *Proxier) syncProxyRules() {
 	proxier.iptablesData.Write(proxier.natChains.Bytes())
 	proxier.iptablesData.Write(proxier.natRules.Bytes())
 
-	if glog.V(5) {
-		glog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
-	}
+	glog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
 	err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
 	if err != nil {
 		glog.Errorf("Failed to execute iptables-restore: %v\nRules:\n%s", err, proxier.iptablesData.Bytes())
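The dropped `if glog.V(5)` guard was redundant: glog.V(5) returns a glog.Verbose value whose Infof already does nothing unless the process runs at verbosity 5 or higher, so the guard only saved evaluating the format arguments. A minimal usage sketch (the payload variable is an illustrative stand-in for the iptables-restore input):

// Sketch of glog's leveled logging as used above.
package main

import (
	"flag"

	"github.com/golang/glog"
)

func main() {
	flag.Parse() // glog registers its flags (-v, -logtostderr, ...) via package flag

	payload := []byte("-A KUBE-SERVICES ...") // stand-in for the generated rules

	// Emits only when run with -v=5 or higher; otherwise a cheap no-op.
	glog.V(5).Infof("Restoring iptables rules: %s", payload)
}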