add service selector cache in endpoint controller and endpointSlice controller

This commit is contained in:
louisgong
2019-10-22 19:57:28 +08:00
parent 74cbf0dc33
commit 2133f9e9ff
6 changed files with 309 additions and 41 deletions

View File

@@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
@@ -46,8 +47,6 @@ import (
helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/controller"
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
utilnet "k8s.io/utils/net"
)
@@ -93,11 +92,11 @@ func NewEndpointController(podInformer coreinformers.PodInformer, serviceInforme
}
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: e.enqueueService,
AddFunc: e.onServiceUpdate,
UpdateFunc: func(old, cur interface{}) {
e.enqueueService(cur)
e.onServiceUpdate(cur)
},
DeleteFunc: e.enqueueService,
DeleteFunc: e.onServiceDelete,
})
e.serviceLister = serviceInformer.Lister()
e.servicesSynced = serviceInformer.Informer().HasSynced
@@ -119,6 +118,8 @@ func NewEndpointController(podInformer coreinformers.PodInformer, serviceInforme
e.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod
e.serviceSelectorCache = endpointutil.NewServiceSelectorCache()
return e
}
@@ -164,6 +165,10 @@ type EndpointController struct {
triggerTimeTracker *endpointutil.TriggerTimeTracker
endpointUpdatesBatchPeriod time.Duration
// serviceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls
// to AsSelectorPreValidated (see #73527)
serviceSelectorCache *endpointutil.ServiceSelectorCache
}
// Run will not return until stopCh is closed. workers determines how many
@@ -195,7 +200,7 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
// enqueue them. obj must have *v1.Pod type.
func (e *EndpointController) addPod(obj interface{}) {
pod := obj.(*v1.Pod)
services, err := endpointutil.GetPodServiceMemberships(e.serviceLister, pod)
services, err := e.serviceSelectorCache.GetPodServiceMemberships(e.serviceLister, pod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err))
return
@@ -263,7 +268,7 @@ func endpointChanged(pod1, pod2 *v1.Pod) bool {
// and what services it will be a member of, and enqueue the union of these.
// old and cur must be *v1.Pod types.
func (e *EndpointController) updatePod(old, cur interface{}) {
services := endpointutil.GetServicesToUpdateOnPodChange(e.serviceLister, old, cur, endpointChanged)
services := endpointutil.GetServicesToUpdateOnPodChange(e.serviceLister, e.serviceSelectorCache, old, cur, endpointChanged)
for key := range services {
e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod)
}
@@ -278,14 +283,27 @@ func (e *EndpointController) deletePod(obj interface{}) {
}
}
// obj could be an *v1.Service, or a DeletionFinalStateUnknown marker item.
func (e *EndpointController) enqueueService(obj interface{}) {
// onServiceUpdate updates the Service Selector in the cache and queues the Service for processing.
func (e *EndpointController) onServiceUpdate(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
return
}
_ = e.serviceSelectorCache.Update(key, obj.(*v1.Service).Spec.Selector)
e.queue.Add(key)
}
// onServiceDelete removes the Service Selector from the cache and queues the Service for processing.
func (e *EndpointController) onServiceDelete(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
return
}
e.serviceSelectorCache.Delete(key)
e.queue.Add(key)
}