Adding EndpointSlice controller

This commit is contained in:
Rob Scott
2019-07-30 15:42:01 -07:00
parent 550fb1bfc3
commit 75f6c24923
52 changed files with 3852 additions and 562 deletions

View File

@@ -45,6 +45,7 @@ import (
api "k8s.io/kubernetes/pkg/apis/core"
helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/controller"
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
"k8s.io/kubernetes/pkg/util/metrics"
utilfeature "k8s.io/apiserver/pkg/util/feature"
@@ -113,7 +114,7 @@ func NewEndpointController(podInformer coreinformers.PodInformer, serviceInforme
e.endpointsLister = endpointsInformer.Lister()
e.endpointsSynced = endpointsInformer.Informer().HasSynced
e.triggerTimeTracker = NewTriggerTimeTracker()
e.triggerTimeTracker = endpointutil.NewTriggerTimeTracker()
e.eventBroadcaster = broadcaster
e.eventRecorder = recorder
@@ -161,7 +162,7 @@ type EndpointController struct {
// triggerTimeTracker is an util used to compute and export the EndpointsLastChangeTriggerTime
// annotation.
triggerTimeTracker *TriggerTimeTracker
triggerTimeTracker *endpointutil.TriggerTimeTracker
endpointUpdatesBatchPeriod time.Duration
}
@@ -267,124 +268,33 @@ func podToEndpointAddress(pod *v1.Pod) *v1.EndpointAddress {
}}
}
func podChanged(oldPod, newPod *v1.Pod) bool {
// If the pod's deletion timestamp is set, remove endpoint from ready address.
if newPod.DeletionTimestamp != oldPod.DeletionTimestamp {
return true
}
// If the pod's readiness has changed, the associated endpoint address
// will move from the unready endpoints set to the ready endpoints.
// So for the purposes of an endpoint, a readiness change on a pod
// means we have a changed pod.
if podutil.IsPodReady(oldPod) != podutil.IsPodReady(newPod) {
return true
}
// Convert the pod to an EndpointAddress, clear inert fields,
// and see if they are the same. Even in a dual stack (multi pod IP) a pod
// will never change just one of its IPs, it will always change all. the below
// comparison to check if a pod has changed will still work
newEndpointAddress := podToEndpointAddress(newPod)
oldEndpointAddress := podToEndpointAddress(oldPod)
// Ignore the ResourceVersion because it changes
// with every pod update. This allows the comparison to
// show equality if all other relevant fields match.
newEndpointAddress.TargetRef.ResourceVersion = ""
oldEndpointAddress.TargetRef.ResourceVersion = ""
if reflect.DeepEqual(newEndpointAddress, oldEndpointAddress) {
// The pod has not changed in any way that impacts the endpoints
return false
}
return true
}
func endpointChanged(pod1, pod2 *v1.Pod) bool {
endpointAddress1 := podToEndpointAddress(pod1)
endpointAddress2 := podToEndpointAddress(pod2)
func determineNeededServiceUpdates(oldServices, services sets.String, podChanged bool) sets.String {
if podChanged {
// if the labels and pod changed, all services need to be updated
services = services.Union(oldServices)
} else {
// if only the labels changed, services not common to
// both the new and old service set (i.e the disjunctive union)
// need to be updated
services = services.Difference(oldServices).Union(oldServices.Difference(services))
}
return services
endpointAddress1.TargetRef.ResourceVersion = ""
endpointAddress2.TargetRef.ResourceVersion = ""
return !reflect.DeepEqual(endpointAddress1, endpointAddress2)
}
// When a pod is updated, figure out what services it used to be a member of
// and what services it will be a member of, and enqueue the union of these.
// old and cur must be *v1.Pod types.
func (e *EndpointController) updatePod(old, cur interface{}) {
newPod := cur.(*v1.Pod)
oldPod := old.(*v1.Pod)
if newPod.ResourceVersion == oldPod.ResourceVersion {
// Periodic resync will send update events for all known pods.
// Two different versions of the same pod will always have different RVs.
return
}
podChangedFlag := podChanged(oldPod, newPod)
// Check if the pod labels have changed, indicating a possible
// change in the service membership
labelsChanged := false
if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) ||
!hostNameAndDomainAreEqual(newPod, oldPod) {
labelsChanged = true
}
// If both the pod and labels are unchanged, no update is needed
if !podChangedFlag && !labelsChanged {
return
}
services, err := e.getPodServiceMemberships(newPod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err))
return
}
if labelsChanged {
oldServices, err := e.getPodServiceMemberships(oldPod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err))
return
}
services = determineNeededServiceUpdates(oldServices, services, podChangedFlag)
}
services := endpointutil.GetServicesToUpdateOnPodChange(e.serviceLister, old, cur, endpointChanged)
for key := range services {
e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod)
}
}
func hostNameAndDomainAreEqual(pod1, pod2 *v1.Pod) bool {
return pod1.Spec.Hostname == pod2.Spec.Hostname &&
pod1.Spec.Subdomain == pod2.Spec.Subdomain
}
// When a pod is deleted, enqueue the services the pod used to be a member of.
// obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item.
func (e *EndpointController) deletePod(obj interface{}) {
if _, ok := obj.(*v1.Pod); ok {
// Enqueue all the services that the pod used to be a member
// of. This happens to be exactly the same thing we do when a
// pod is added.
e.addPod(obj)
return
pod := endpointutil.GetPodFromDeleteAction(obj)
if pod != nil {
e.addPod(pod)
}
// If we reached here it means the pod was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
return
}
pod, ok := tombstone.Obj.(*v1.Pod)
if !ok {
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Pod: %#v", obj))
return
}
klog.V(4).Infof("Enqueuing services of deleted pod %s/%s having final state unrecorded", pod.Namespace, pod.Name)
e.addPod(pod)
}
// obj could be an *v1.Service, or a DeletionFinalStateUnknown marker item.
@@ -462,7 +372,7 @@ func (e *EndpointController) syncService(key string) error {
if err != nil && !errors.IsNotFound(err) {
return err
}
e.triggerTimeTracker.DeleteEndpoints(namespace, name)
e.triggerTimeTracker.DeleteService(namespace, name)
return nil
}
@@ -491,11 +401,11 @@ func (e *EndpointController) syncService(key string) error {
}
}
// We call ComputeEndpointsLastChangeTriggerTime here to make sure that the state of the trigger
// time tracker gets updated even if the sync turns out to be no-op and we don't update the
// endpoints object.
// We call ComputeEndpointLastChangeTriggerTime here to make sure that the
// state of the trigger time tracker gets updated even if the sync turns out
// to be no-op and we don't update the endpoints object.
endpointsLastChangeTriggerTime := e.triggerTimeTracker.
ComputeEndpointsLastChangeTriggerTime(namespace, name, service, pods)
ComputeEndpointLastChangeTriggerTime(namespace, service, pods)
subsets := []v1.EndpointSubset{}
var totalReadyEps int