351 lines
13 KiB
Go
351 lines
13 KiB
Go
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
|
|
|
|
package proxy
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
|
|
v1 "k8s.io/api/core/v1"
|
|
discovery "k8s.io/api/discovery/v1"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/apimachinery/pkg/util/sets"
|
|
"k8s.io/client-go/tools/events"
|
|
"k8s.io/klog/v2"
|
|
"k8s.io/kubernetes/pkg/proxy/metrics"
|
|
)
|
|
|
|
var supportedEndpointSliceAddressTypes = sets.New[discovery.AddressType](
|
|
discovery.AddressTypeIPv4,
|
|
discovery.AddressTypeIPv6,
|
|
)
|
|
|
|
// EndpointsChangeTracker carries state about uncommitted changes to an arbitrary number of
|
|
// Endpoints, keyed by their namespace and name.
|
|
type EndpointsChangeTracker struct {
|
|
// lock protects lastChangeTriggerTimes
|
|
lock sync.Mutex
|
|
|
|
// processEndpointsMapChange is invoked by the apply function on every change.
|
|
// This function should not modify the EndpointsMaps, but just use the changes for
|
|
// any Proxier-specific cleanup.
|
|
processEndpointsMapChange processEndpointsMapChangeFunc
|
|
|
|
// endpointSliceCache holds a simplified version of endpoint slices.
|
|
endpointSliceCache *EndpointSliceCache
|
|
|
|
// lastChangeTriggerTimes maps from the Service's NamespacedName to the times of
|
|
// the triggers that caused its EndpointSlice objects to change. Used to calculate
|
|
// the network-programming-latency metric.
|
|
lastChangeTriggerTimes map[types.NamespacedName][]time.Time
|
|
// trackerStartTime is the time when the EndpointsChangeTracker was created, so
|
|
// we can avoid generating network-programming-latency metrics for changes that
|
|
// occurred before that.
|
|
trackerStartTime time.Time
|
|
}
|
|
|
|
type makeEndpointFunc func(info *BaseEndpointInfo, svcPortName *ServicePortName) Endpoint
|
|
type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap EndpointsMap)
|
|
|
|
// NewEndpointsChangeTracker initializes an EndpointsChangeTracker
|
|
func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointsChangeTracker {
|
|
return &EndpointsChangeTracker{
|
|
lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
|
|
trackerStartTime: time.Now(),
|
|
processEndpointsMapChange: processEndpointsMapChange,
|
|
endpointSliceCache: NewEndpointSliceCache(hostname, ipFamily, recorder, makeEndpointInfo),
|
|
}
|
|
}
|
|
|
|
// EndpointSliceUpdate updates the EndpointsChangeTracker by adding/updating or removing
|
|
// endpointSlice (depending on removeSlice). It returns true if this update contained a
|
|
// change that needs to be synced; note that this is different from the return value of
|
|
// ServiceChangeTracker.Update().
|
|
func (ect *EndpointsChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool {
|
|
if !supportedEndpointSliceAddressTypes.Has(endpointSlice.AddressType) {
|
|
klog.V(4).InfoS("EndpointSlice address type not supported by kube-proxy", "addressType", endpointSlice.AddressType)
|
|
return false
|
|
}
|
|
|
|
// This should never happen
|
|
if endpointSlice == nil {
|
|
klog.ErrorS(nil, "Nil endpointSlice passed to EndpointSliceUpdate")
|
|
return false
|
|
}
|
|
|
|
namespacedName, _, err := endpointSliceCacheKeys(endpointSlice)
|
|
if err != nil {
|
|
klog.InfoS("Error getting endpoint slice cache keys", "err", err)
|
|
return false
|
|
}
|
|
|
|
metrics.EndpointChangesTotal.Inc()
|
|
|
|
ect.lock.Lock()
|
|
defer ect.lock.Unlock()
|
|
|
|
changeNeeded := ect.endpointSliceCache.updatePending(endpointSlice, removeSlice)
|
|
|
|
if changeNeeded {
|
|
metrics.EndpointChangesPending.Inc()
|
|
// In case of Endpoints deletion, the LastChangeTriggerTime annotation is
|
|
// by-definition coming from the time of last update, which is not what
|
|
// we want to measure. So we simply ignore it in this cases.
|
|
// TODO(wojtek-t, robscott): Address the problem for EndpointSlice deletion
|
|
// when other EndpointSlice for that service still exist.
|
|
if removeSlice {
|
|
delete(ect.lastChangeTriggerTimes, namespacedName)
|
|
} else if t := getLastChangeTriggerTime(endpointSlice.Annotations); !t.IsZero() && t.After(ect.trackerStartTime) {
|
|
ect.lastChangeTriggerTimes[namespacedName] =
|
|
append(ect.lastChangeTriggerTimes[namespacedName], t)
|
|
}
|
|
}
|
|
|
|
return changeNeeded
|
|
}
|
|
|
|
// checkoutChanges returns a map of pending endpointsChanges and marks them as
|
|
// applied.
|
|
func (ect *EndpointsChangeTracker) checkoutChanges() map[types.NamespacedName]*endpointsChange {
|
|
metrics.EndpointChangesPending.Set(0)
|
|
|
|
return ect.endpointSliceCache.checkoutChanges()
|
|
}
|
|
|
|
// checkoutTriggerTimes applies the locally cached trigger times to a map of
|
|
// trigger times that have been passed in and empties the local cache.
|
|
func (ect *EndpointsChangeTracker) checkoutTriggerTimes(lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) {
|
|
ect.lock.Lock()
|
|
defer ect.lock.Unlock()
|
|
|
|
for k, v := range ect.lastChangeTriggerTimes {
|
|
prev, ok := (*lastChangeTriggerTimes)[k]
|
|
if !ok {
|
|
(*lastChangeTriggerTimes)[k] = v
|
|
} else {
|
|
(*lastChangeTriggerTimes)[k] = append(prev, v...)
|
|
}
|
|
}
|
|
ect.lastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time)
|
|
}
|
|
|
|
// getLastChangeTriggerTime returns the time.Time value of the
|
|
// EndpointsLastChangeTriggerTime annotation stored in the given endpoints
|
|
// object or the "zero" time if the annotation wasn't set or was set
|
|
// incorrectly.
|
|
func getLastChangeTriggerTime(annotations map[string]string) time.Time {
|
|
// TODO(#81360): ignore case when Endpoint is deleted.
|
|
if _, ok := annotations[v1.EndpointsLastChangeTriggerTime]; !ok {
|
|
// It's possible that the Endpoints object won't have the
|
|
// EndpointsLastChangeTriggerTime annotation set. In that case return
|
|
// the 'zero value', which is ignored in the upstream code.
|
|
return time.Time{}
|
|
}
|
|
val, err := time.Parse(time.RFC3339Nano, annotations[v1.EndpointsLastChangeTriggerTime])
|
|
if err != nil {
|
|
klog.ErrorS(err, "Error while parsing EndpointsLastChangeTriggerTimeAnnotation",
|
|
"value", annotations[v1.EndpointsLastChangeTriggerTime])
|
|
// In case of error val = time.Zero, which is ignored in the upstream code.
|
|
}
|
|
return val
|
|
}
|
|
|
|
// endpointsChange contains all changes to endpoints that happened since proxy
|
|
// rules were synced. For a single object, changes are accumulated, i.e.
|
|
// previous is state from before applying the changes, current is state after
|
|
// applying the changes.
|
|
type endpointsChange struct {
|
|
previous EndpointsMap
|
|
current EndpointsMap
|
|
}
|
|
|
|
// UpdateEndpointsMapResult is the updated results after applying endpoints changes.
|
|
type UpdateEndpointsMapResult struct {
|
|
// UpdatedServices lists the names of all services with added/updated/deleted
|
|
// endpoints since the last Update.
|
|
UpdatedServices sets.Set[types.NamespacedName]
|
|
|
|
// DeletedUDPEndpoints identifies UDP endpoints that have just been deleted.
|
|
// Existing conntrack NAT entries pointing to these endpoints must be deleted to
|
|
// ensure that no further traffic for the Service gets delivered to them.
|
|
DeletedUDPEndpoints []ServiceEndpoint
|
|
|
|
// NewlyActiveUDPServices identifies UDP Services that have just gone from 0 to
|
|
// non-0 endpoints. Existing conntrack entries caching the fact that these
|
|
// services are black holes must be deleted to ensure that traffic can immediately
|
|
// begin flowing to the new endpoints.
|
|
NewlyActiveUDPServices []ServicePortName
|
|
|
|
// List of the trigger times for all endpoints objects that changed. It's used to export the
|
|
// network programming latency.
|
|
// NOTE(oxddr): this can be simplified to []time.Time if memory consumption becomes an issue.
|
|
LastChangeTriggerTimes map[types.NamespacedName][]time.Time
|
|
}
|
|
|
|
// EndpointsMap maps a service name to a list of all its Endpoints.
|
|
type EndpointsMap map[ServicePortName][]Endpoint
|
|
|
|
// Update updates em based on the changes in ect, returns information about the diff since
|
|
// the last Update, triggers processEndpointsMapChange on every change, and clears the
|
|
// changes map.
|
|
func (em EndpointsMap) Update(ect *EndpointsChangeTracker) UpdateEndpointsMapResult {
|
|
result := UpdateEndpointsMapResult{
|
|
UpdatedServices: sets.New[types.NamespacedName](),
|
|
DeletedUDPEndpoints: make([]ServiceEndpoint, 0),
|
|
NewlyActiveUDPServices: make([]ServicePortName, 0),
|
|
LastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
|
|
}
|
|
if ect == nil {
|
|
return result
|
|
}
|
|
|
|
changes := ect.checkoutChanges()
|
|
for nn, change := range changes {
|
|
if ect.processEndpointsMapChange != nil {
|
|
ect.processEndpointsMapChange(change.previous, change.current)
|
|
}
|
|
result.UpdatedServices.Insert(nn)
|
|
|
|
em.unmerge(change.previous)
|
|
em.merge(change.current)
|
|
detectStaleConntrackEntries(change.previous, change.current, &result.DeletedUDPEndpoints, &result.NewlyActiveUDPServices)
|
|
}
|
|
ect.checkoutTriggerTimes(&result.LastChangeTriggerTimes)
|
|
|
|
return result
|
|
}
|
|
|
|
// Merge ensures that the current EndpointsMap contains all <service, endpoints> pairs from the EndpointsMap passed in.
|
|
func (em EndpointsMap) merge(other EndpointsMap) {
|
|
for svcPortName := range other {
|
|
em[svcPortName] = other[svcPortName]
|
|
}
|
|
}
|
|
|
|
// Unmerge removes the <service, endpoints> pairs from the current EndpointsMap which are contained in the EndpointsMap passed in.
|
|
func (em EndpointsMap) unmerge(other EndpointsMap) {
|
|
for svcPortName := range other {
|
|
delete(em, svcPortName)
|
|
}
|
|
}
|
|
|
|
// getLocalEndpointIPs returns endpoints IPs if given endpoint is local - local means the endpoint is running in same host as kube-proxy.
|
|
func (em EndpointsMap) getLocalReadyEndpointIPs() map[types.NamespacedName]sets.Set[string] {
|
|
localIPs := make(map[types.NamespacedName]sets.Set[string])
|
|
for svcPortName, epList := range em {
|
|
for _, ep := range epList {
|
|
// Only add ready endpoints for health checking. Terminating endpoints may still serve traffic
|
|
// but the health check signal should fail if there are only terminating endpoints on a node.
|
|
if !ep.IsReady() {
|
|
continue
|
|
}
|
|
|
|
if ep.IsLocal() {
|
|
nsn := svcPortName.NamespacedName
|
|
if localIPs[nsn] == nil {
|
|
localIPs[nsn] = sets.New[string]()
|
|
}
|
|
localIPs[nsn].Insert(ep.IP())
|
|
}
|
|
}
|
|
}
|
|
return localIPs
|
|
}
|
|
|
|
// LocalReadyEndpoints returns a map of Service names to the number of local ready
|
|
// endpoints for that service.
|
|
func (em EndpointsMap) LocalReadyEndpoints() map[types.NamespacedName]int {
|
|
// TODO: If this will appear to be computationally expensive, consider
|
|
// computing this incrementally similarly to endpointsMap.
|
|
|
|
// (Note that we need to call getLocalEndpointIPs first to squash the data by IP,
|
|
// because the EndpointsMap is sorted by IP+port, not just IP, and we want to
|
|
// consider a Service pointing to 10.0.0.1:80 and 10.0.0.1:443 to have 1 endpoint,
|
|
// not 2.)
|
|
|
|
eps := make(map[types.NamespacedName]int)
|
|
localIPs := em.getLocalReadyEndpointIPs()
|
|
for nsn, ips := range localIPs {
|
|
eps[nsn] = len(ips)
|
|
}
|
|
return eps
|
|
}
|
|
|
|
// detectStaleConntrackEntries detects services that may be associated with stale conntrack entries.
|
|
// (See UpdateEndpointsMapResult.DeletedUDPEndpoints and .NewlyActiveUDPServices.)
|
|
func detectStaleConntrackEntries(oldEndpointsMap, newEndpointsMap EndpointsMap, deletedUDPEndpoints *[]ServiceEndpoint, newlyActiveUDPServices *[]ServicePortName) {
|
|
// Find the UDP endpoints that we were sending traffic to in oldEndpointsMap, but
|
|
// are no longer sending to newEndpointsMap. The proxier should make sure that
|
|
// conntrack does not accidentally route any new connections to them.
|
|
for svcPortName, epList := range oldEndpointsMap {
|
|
if svcPortName.Protocol != v1.ProtocolUDP {
|
|
continue
|
|
}
|
|
|
|
for _, ep := range epList {
|
|
// If the old endpoint wasn't Serving then there can't be stale
|
|
// conntrack entries since there was no traffic sent to it.
|
|
if !ep.IsServing() {
|
|
continue
|
|
}
|
|
|
|
deleted := true
|
|
// Check if the endpoint has changed, including if it went from
|
|
// serving to not serving. If it did change stale entries for the old
|
|
// endpoint have to be cleared.
|
|
for i := range newEndpointsMap[svcPortName] {
|
|
if newEndpointsMap[svcPortName][i].String() == ep.String() {
|
|
deleted = false
|
|
break
|
|
}
|
|
}
|
|
if deleted {
|
|
klog.V(4).InfoS("Deleted endpoint may have stale conntrack entries", "portName", svcPortName, "endpoint", ep)
|
|
*deletedUDPEndpoints = append(*deletedUDPEndpoints, ServiceEndpoint{Endpoint: ep.String(), ServicePortName: svcPortName})
|
|
}
|
|
}
|
|
}
|
|
|
|
// Detect services that have gone from 0 to non-0 ready endpoints. If there were
|
|
// previously 0 endpoints, but someone tried to connect to it, then a conntrack
|
|
// entry may have been created blackholing traffic to that IP, which should be
|
|
// deleted now.
|
|
for svcPortName, epList := range newEndpointsMap {
|
|
if svcPortName.Protocol != v1.ProtocolUDP {
|
|
continue
|
|
}
|
|
|
|
epServing := 0
|
|
for _, ep := range epList {
|
|
if ep.IsServing() {
|
|
epServing++
|
|
}
|
|
}
|
|
|
|
oldEpServing := 0
|
|
for _, ep := range oldEndpointsMap[svcPortName] {
|
|
if ep.IsServing() {
|
|
oldEpServing++
|
|
}
|
|
}
|
|
|
|
if epServing > 0 && oldEpServing == 0 {
|
|
*newlyActiveUDPServices = append(*newlyActiveUDPServices, svcPortName)
|
|
}
|
|
}
|
|
}
|