265 lines
9.1 KiB
Go
265 lines
9.1 KiB
Go
/*
|
|
Copyright 2019 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package endpoint
|
|
|
|
import (
|
|
"crypto/md5"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"reflect"
|
|
"sort"
|
|
"sync"
|
|
|
|
v1 "k8s.io/api/core/v1"
|
|
discovery "k8s.io/api/discovery/v1beta1"
|
|
"k8s.io/apimachinery/pkg/labels"
|
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
"k8s.io/apimachinery/pkg/util/sets"
|
|
v1listers "k8s.io/client-go/listers/core/v1"
|
|
"k8s.io/client-go/tools/cache"
|
|
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
|
"k8s.io/kubernetes/pkg/controller"
|
|
"k8s.io/kubernetes/pkg/util/hash"
|
|
)
|
|
|
|
// ServiceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls to AsSelectorPreValidated (see #73527)
type ServiceSelectorCache struct {
	// lock guards cache against concurrent reads and writes.
	lock sync.RWMutex
	// cache maps a service key (as produced by controller.KeyFunc) to the
	// selector parsed from that service's spec.selector.
	cache map[string]labels.Selector
}
|
|
|
|
// NewServiceSelectorCache init ServiceSelectorCache for both endpoint controller and endpointSlice controller.
|
|
func NewServiceSelectorCache() *ServiceSelectorCache {
|
|
return &ServiceSelectorCache{
|
|
cache: map[string]labels.Selector{},
|
|
}
|
|
}
|
|
|
|
// Get return selector and existence in ServiceSelectorCache by key.
|
|
func (sc *ServiceSelectorCache) Get(key string) (labels.Selector, bool) {
|
|
sc.lock.RLock()
|
|
selector, ok := sc.cache[key]
|
|
// fine-grained lock improves GetPodServiceMemberships performance(16.5%) than defer measured by BenchmarkGetPodServiceMemberships
|
|
sc.lock.RUnlock()
|
|
return selector, ok
|
|
}
|
|
|
|
// Update can update or add a selector in ServiceSelectorCache while service's selector changed.
|
|
func (sc *ServiceSelectorCache) Update(key string, rawSelector map[string]string) labels.Selector {
|
|
sc.lock.Lock()
|
|
defer sc.lock.Unlock()
|
|
selector := labels.Set(rawSelector).AsSelectorPreValidated()
|
|
sc.cache[key] = selector
|
|
return selector
|
|
}
|
|
|
|
// Delete can delete selector which exist in ServiceSelectorCache.
|
|
func (sc *ServiceSelectorCache) Delete(key string) {
|
|
sc.lock.Lock()
|
|
defer sc.lock.Unlock()
|
|
delete(sc.cache, key)
|
|
}
|
|
|
|
// GetPodServiceMemberships returns a set of Service keys for Services that have
|
|
// a selector matching the given pod.
|
|
func (sc *ServiceSelectorCache) GetPodServiceMemberships(serviceLister v1listers.ServiceLister, pod *v1.Pod) (sets.String, error) {
|
|
set := sets.String{}
|
|
services, err := serviceLister.Services(pod.Namespace).List(labels.Everything())
|
|
if err != nil {
|
|
return set, err
|
|
}
|
|
|
|
var selector labels.Selector
|
|
for _, service := range services {
|
|
if service.Spec.Selector == nil {
|
|
// if the service has a nil selector this means selectors match nothing, not everything.
|
|
continue
|
|
}
|
|
key, err := controller.KeyFunc(service)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if v, ok := sc.Get(key); ok {
|
|
selector = v
|
|
} else {
|
|
selector = sc.Update(key, service.Spec.Selector)
|
|
}
|
|
|
|
if selector.Matches(labels.Set(pod.Labels)) {
|
|
set.Insert(key)
|
|
}
|
|
}
|
|
return set, nil
|
|
}
|
|
|
|
// EndpointsMatch is a type of function that returns true if pod endpoints match.
// Implementations compare two pods and report whether they produce equivalent
// endpoints; PodChanged uses this to detect endpoint-relevant pod changes.
type EndpointsMatch func(*v1.Pod, *v1.Pod) bool
|
|
|
|
// PortMapKey is used to uniquely identify groups of endpoint ports.
// It is a hash string, produced by NewPortMapKey from a sorted port list.
type PortMapKey string
|
|
|
|
// NewPortMapKey generates a PortMapKey from endpoint ports.
|
|
func NewPortMapKey(endpointPorts []discovery.EndpointPort) PortMapKey {
|
|
sort.Sort(portsInOrder(endpointPorts))
|
|
return PortMapKey(DeepHashObjectToString(endpointPorts))
|
|
}
|
|
|
|
// DeepHashObjectToString creates a unique hash string from a go object.
|
|
func DeepHashObjectToString(objectToWrite interface{}) string {
|
|
hasher := md5.New()
|
|
hash.DeepHashObject(hasher, objectToWrite)
|
|
return hex.EncodeToString(hasher.Sum(nil)[0:])
|
|
}
|
|
|
|
// ShouldPodBeInEndpoints returns true if a specified pod should be in an
|
|
// endpoints object.
|
|
func ShouldPodBeInEndpoints(pod *v1.Pod) bool {
|
|
if len(pod.Status.PodIP) == 0 && len(pod.Status.PodIPs) == 0 {
|
|
return false
|
|
}
|
|
|
|
if pod.Spec.RestartPolicy == v1.RestartPolicyNever {
|
|
return pod.Status.Phase != v1.PodFailed && pod.Status.Phase != v1.PodSucceeded
|
|
}
|
|
|
|
if pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure {
|
|
return pod.Status.Phase != v1.PodSucceeded
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// ShouldSetHostname returns true if the Hostname attribute should be set on an
|
|
// Endpoints Address or EndpointSlice Endpoint.
|
|
func ShouldSetHostname(pod *v1.Pod, svc *v1.Service) bool {
|
|
return len(pod.Spec.Hostname) > 0 && pod.Spec.Subdomain == svc.Name && svc.Namespace == pod.Namespace
|
|
}
|
|
|
|
// PodChanged returns two boolean values, the first returns true if the pod.
|
|
// has changed, the second value returns true if the pod labels have changed.
|
|
func PodChanged(oldPod, newPod *v1.Pod, endpointChanged EndpointsMatch) (bool, bool) {
|
|
// Check if the pod labels have changed, indicating a possible
|
|
// change in the service membership
|
|
labelsChanged := false
|
|
if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) ||
|
|
!hostNameAndDomainAreEqual(newPod, oldPod) {
|
|
labelsChanged = true
|
|
}
|
|
|
|
// If the pod's deletion timestamp is set, remove endpoint from ready address.
|
|
if newPod.DeletionTimestamp != oldPod.DeletionTimestamp {
|
|
return true, labelsChanged
|
|
}
|
|
// If the pod's readiness has changed, the associated endpoint address
|
|
// will move from the unready endpoints set to the ready endpoints.
|
|
// So for the purposes of an endpoint, a readiness change on a pod
|
|
// means we have a changed pod.
|
|
if podutil.IsPodReady(oldPod) != podutil.IsPodReady(newPod) {
|
|
return true, labelsChanged
|
|
}
|
|
// Convert the pod to an Endpoint, clear inert fields,
|
|
// and see if they are the same.
|
|
// TODO: Add a watcher for node changes separate from this
|
|
// We don't want to trigger multiple syncs at a pod level when a node changes
|
|
return endpointChanged(newPod, oldPod), labelsChanged
|
|
}
|
|
|
|
// GetServicesToUpdateOnPodChange returns a set of Service keys for Services
|
|
// that have potentially been affected by a change to this pod.
|
|
func GetServicesToUpdateOnPodChange(serviceLister v1listers.ServiceLister, selectorCache *ServiceSelectorCache, old, cur interface{}, endpointChanged EndpointsMatch) sets.String {
|
|
newPod := cur.(*v1.Pod)
|
|
oldPod := old.(*v1.Pod)
|
|
if newPod.ResourceVersion == oldPod.ResourceVersion {
|
|
// Periodic resync will send update events for all known pods.
|
|
// Two different versions of the same pod will always have different RVs
|
|
return sets.String{}
|
|
}
|
|
|
|
podChanged, labelsChanged := PodChanged(oldPod, newPod, endpointChanged)
|
|
|
|
// If both the pod and labels are unchanged, no update is needed
|
|
if !podChanged && !labelsChanged {
|
|
return sets.String{}
|
|
}
|
|
|
|
services, err := selectorCache.GetPodServiceMemberships(serviceLister, newPod)
|
|
if err != nil {
|
|
utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", newPod.Namespace, newPod.Name, err))
|
|
return sets.String{}
|
|
}
|
|
|
|
if labelsChanged {
|
|
oldServices, err := selectorCache.GetPodServiceMemberships(serviceLister, oldPod)
|
|
if err != nil {
|
|
utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", newPod.Namespace, newPod.Name, err))
|
|
}
|
|
services = determineNeededServiceUpdates(oldServices, services, podChanged)
|
|
}
|
|
|
|
return services
|
|
}
|
|
|
|
// GetPodFromDeleteAction returns a pointer to a pod if one can be derived from
|
|
// obj (could be a *v1.Pod, or a DeletionFinalStateUnknown marker item).
|
|
func GetPodFromDeleteAction(obj interface{}) *v1.Pod {
|
|
if pod, ok := obj.(*v1.Pod); ok {
|
|
// Enqueue all the services that the pod used to be a member of.
|
|
// This is the same thing we do when we add a pod.
|
|
return pod
|
|
}
|
|
// If we reached here it means the pod was deleted but its final state is unrecorded.
|
|
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
|
if !ok {
|
|
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
|
|
return nil
|
|
}
|
|
pod, ok := tombstone.Obj.(*v1.Pod)
|
|
if !ok {
|
|
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Pod: %#v", obj))
|
|
return nil
|
|
}
|
|
return pod
|
|
}
|
|
|
|
func hostNameAndDomainAreEqual(pod1, pod2 *v1.Pod) bool {
|
|
return pod1.Spec.Hostname == pod2.Spec.Hostname &&
|
|
pod1.Spec.Subdomain == pod2.Spec.Subdomain
|
|
}
|
|
|
|
func determineNeededServiceUpdates(oldServices, services sets.String, podChanged bool) sets.String {
|
|
if podChanged {
|
|
// if the labels and pod changed, all services need to be updated
|
|
services = services.Union(oldServices)
|
|
} else {
|
|
// if only the labels changed, services not common to both the new
|
|
// and old service set (the disjuntive union) need to be updated
|
|
services = services.Difference(oldServices).Union(oldServices.Difference(services))
|
|
}
|
|
return services
|
|
}
|
|
|
|
// portsInOrder helps sort endpoint ports in a consistent way for hashing.
type portsInOrder []discovery.EndpointPort

// Len implements sort.Interface.
func (sl portsInOrder) Len() int { return len(sl) }

// Swap implements sort.Interface.
func (sl portsInOrder) Swap(i, j int) { sl[i], sl[j] = sl[j], sl[i] }

// Less orders ports by the hash of their full contents, giving a stable
// (if arbitrary) total order so equivalent port sets hash identically.
func (sl portsInOrder) Less(i, j int) bool {
	h1 := DeepHashObjectToString(sl[i])
	h2 := DeepHashObjectToString(sl[j])
	return h1 < h2
}
|