
In particular, fix the description of ServiceChangeTracker.Update's return value, and point out that it's different from EndpointsChangeTracker.EndpointSliceUpdate's (though fortunately, in a way that doesn't matter for any existing code).
245 lines
8.7 KiB
Go
245 lines
8.7 KiB
Go
/*
|
|
Copyright 2017 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package proxy
|
|
|
|
import (
|
|
"reflect"
|
|
"sync"
|
|
|
|
v1 "k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/apimachinery/pkg/util/sets"
|
|
"k8s.io/client-go/tools/events"
|
|
"k8s.io/klog/v2"
|
|
"k8s.io/kubernetes/pkg/proxy/metrics"
|
|
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
|
|
)
|
|
|
|
// ServiceChangeTracker carries state about uncommitted changes to an arbitrary number of
|
|
// Services, keyed by their namespace and name.
|
|
type ServiceChangeTracker struct {
|
|
// lock protects items.
|
|
lock sync.Mutex
|
|
// items maps a service to its serviceChange.
|
|
items map[types.NamespacedName]*serviceChange
|
|
|
|
// makeServiceInfo allows the proxier to inject customized information when
|
|
// processing services.
|
|
makeServiceInfo makeServicePortFunc
|
|
// processServiceMapChange is invoked by the apply function on every change. This
|
|
// function should not modify the ServicePortMaps, but just use the changes for
|
|
// any Proxier-specific cleanup.
|
|
processServiceMapChange processServiceMapChangeFunc
|
|
|
|
ipFamily v1.IPFamily
|
|
recorder events.EventRecorder
|
|
}
|
|
|
|
type makeServicePortFunc func(*v1.ServicePort, *v1.Service, *BaseServicePortInfo) ServicePort
|
|
type processServiceMapChangeFunc func(previous, current ServicePortMap)
|
|
|
|
// serviceChange contains all changes to services that happened since proxy rules were synced. For a single object,
|
|
// changes are accumulated, i.e. previous is state from before applying the changes,
|
|
// current is state after applying all of the changes.
|
|
type serviceChange struct {
|
|
previous ServicePortMap
|
|
current ServicePortMap
|
|
}
|
|
|
|
// NewServiceChangeTracker initializes a ServiceChangeTracker
|
|
func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processServiceMapChange processServiceMapChangeFunc) *ServiceChangeTracker {
|
|
return &ServiceChangeTracker{
|
|
items: make(map[types.NamespacedName]*serviceChange),
|
|
makeServiceInfo: makeServiceInfo,
|
|
recorder: recorder,
|
|
ipFamily: ipFamily,
|
|
processServiceMapChange: processServiceMapChange,
|
|
}
|
|
}
|
|
|
|
// Update updates the ServiceChangeTracker based on the <previous, current> service pair
|
|
// (where either previous or current, but not both, can be nil). It returns true if sct
|
|
// contains changes that need to be synced (whether or not those changes were caused by
|
|
// this update); note that this is different from the return value of
|
|
// EndpointChangeTracker.EndpointSliceUpdate().
|
|
func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool {
|
|
// This is unexpected, we should return false directly.
|
|
if previous == nil && current == nil {
|
|
return false
|
|
}
|
|
|
|
svc := current
|
|
if svc == nil {
|
|
svc = previous
|
|
}
|
|
metrics.ServiceChangesTotal.Inc()
|
|
namespacedName := types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}
|
|
|
|
sct.lock.Lock()
|
|
defer sct.lock.Unlock()
|
|
|
|
change, exists := sct.items[namespacedName]
|
|
if !exists {
|
|
change = &serviceChange{}
|
|
change.previous = sct.serviceToServiceMap(previous)
|
|
sct.items[namespacedName] = change
|
|
}
|
|
change.current = sct.serviceToServiceMap(current)
|
|
// if change.previous equal to change.current, it means no change
|
|
if reflect.DeepEqual(change.previous, change.current) {
|
|
delete(sct.items, namespacedName)
|
|
} else {
|
|
klog.V(4).InfoS("Service updated ports", "service", klog.KObj(svc), "portCount", len(change.current))
|
|
}
|
|
metrics.ServiceChangesPending.Set(float64(len(sct.items)))
|
|
return len(sct.items) > 0
|
|
}
|
|
|
|
// ServicePortMap maps a service to its ServicePort.
|
|
type ServicePortMap map[ServicePortName]ServicePort
|
|
|
|
// UpdateServiceMapResult is the updated results after applying service changes.
|
|
type UpdateServiceMapResult struct {
|
|
// UpdatedServices lists the names of all services added/updated/deleted since the
|
|
// last Update.
|
|
UpdatedServices sets.Set[types.NamespacedName]
|
|
|
|
// DeletedUDPClusterIPs holds stale (no longer assigned to a Service) Service IPs
|
|
// that had UDP ports. Callers can use this to abort timeout-waits or clear
|
|
// connection-tracking information.
|
|
DeletedUDPClusterIPs sets.Set[string]
|
|
}
|
|
|
|
// HealthCheckNodePorts returns a map of Service names to HealthCheckNodePort values
|
|
// for all Services in sm with non-zero HealthCheckNodePort.
|
|
func (sm ServicePortMap) HealthCheckNodePorts() map[types.NamespacedName]uint16 {
|
|
// TODO: If this will appear to be computationally expensive, consider
|
|
// computing this incrementally similarly to svcPortMap.
|
|
ports := make(map[types.NamespacedName]uint16)
|
|
for svcPortName, info := range sm {
|
|
if info.HealthCheckNodePort() != 0 {
|
|
ports[svcPortName.NamespacedName] = uint16(info.HealthCheckNodePort())
|
|
}
|
|
}
|
|
return ports
|
|
}
|
|
|
|
// serviceToServiceMap translates a single Service object to a ServicePortMap.
|
|
//
|
|
// NOTE: service object should NOT be modified.
|
|
func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) ServicePortMap {
|
|
if service == nil {
|
|
return nil
|
|
}
|
|
|
|
if proxyutil.ShouldSkipService(service) {
|
|
return nil
|
|
}
|
|
|
|
clusterIP := proxyutil.GetClusterIPByFamily(sct.ipFamily, service)
|
|
if clusterIP == "" {
|
|
return nil
|
|
}
|
|
|
|
svcPortMap := make(ServicePortMap)
|
|
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
|
for i := range service.Spec.Ports {
|
|
servicePort := &service.Spec.Ports[i]
|
|
svcPortName := ServicePortName{NamespacedName: svcName, Port: servicePort.Name, Protocol: servicePort.Protocol}
|
|
baseSvcInfo := newBaseServiceInfo(service, sct.ipFamily, servicePort)
|
|
if sct.makeServiceInfo != nil {
|
|
svcPortMap[svcPortName] = sct.makeServiceInfo(servicePort, service, baseSvcInfo)
|
|
} else {
|
|
svcPortMap[svcPortName] = baseSvcInfo
|
|
}
|
|
}
|
|
return svcPortMap
|
|
}
|
|
|
|
// Update updates ServicePortMap base on the given changes, returns information about the
|
|
// diff since the last Update, triggers processServiceMapChange on every change, and
|
|
// clears the changes map.
|
|
func (sm ServicePortMap) Update(sct *ServiceChangeTracker) UpdateServiceMapResult {
|
|
sct.lock.Lock()
|
|
defer sct.lock.Unlock()
|
|
|
|
result := UpdateServiceMapResult{
|
|
UpdatedServices: sets.New[types.NamespacedName](),
|
|
DeletedUDPClusterIPs: sets.New[string](),
|
|
}
|
|
|
|
for nn, change := range sct.items {
|
|
if sct.processServiceMapChange != nil {
|
|
sct.processServiceMapChange(change.previous, change.current)
|
|
}
|
|
result.UpdatedServices.Insert(nn)
|
|
|
|
sm.merge(change.current)
|
|
// filter out the Update event of current changes from previous changes
|
|
// before calling unmerge() so that can skip deleting the Update events.
|
|
change.previous.filter(change.current)
|
|
sm.unmerge(change.previous, result.DeletedUDPClusterIPs)
|
|
}
|
|
// clear changes after applying them to ServicePortMap.
|
|
sct.items = make(map[types.NamespacedName]*serviceChange)
|
|
metrics.ServiceChangesPending.Set(0)
|
|
|
|
return result
|
|
}
|
|
|
|
// merge adds other ServicePortMap's elements to current ServicePortMap.
|
|
// If collision, other ALWAYS win. Otherwise add the other to current.
|
|
// In other words, if some elements in current collisions with other, update the current by other.
|
|
func (sm *ServicePortMap) merge(other ServicePortMap) {
|
|
for svcPortName, info := range other {
|
|
_, exists := (*sm)[svcPortName]
|
|
if !exists {
|
|
klog.V(4).InfoS("Adding new service port", "portName", svcPortName, "servicePort", info)
|
|
} else {
|
|
klog.V(4).InfoS("Updating existing service port", "portName", svcPortName, "servicePort", info)
|
|
}
|
|
(*sm)[svcPortName] = info
|
|
}
|
|
}
|
|
|
|
// filter filters out elements from ServicePortMap base on given ports string sets.
|
|
func (sm *ServicePortMap) filter(other ServicePortMap) {
|
|
for svcPortName := range *sm {
|
|
// skip the delete for Update event.
|
|
if _, ok := other[svcPortName]; ok {
|
|
delete(*sm, svcPortName)
|
|
}
|
|
}
|
|
}
|
|
|
|
// unmerge deletes all other ServicePortMap's elements from current ServicePortMap and
|
|
// updates deletedUDPClusterIPs with all of the newly-deleted UDP cluster IPs.
|
|
func (sm *ServicePortMap) unmerge(other ServicePortMap, deletedUDPClusterIPs sets.Set[string]) {
|
|
for svcPortName := range other {
|
|
info, exists := (*sm)[svcPortName]
|
|
if exists {
|
|
klog.V(4).InfoS("Removing service port", "portName", svcPortName)
|
|
if info.Protocol() == v1.ProtocolUDP {
|
|
deletedUDPClusterIPs.Insert(info.ClusterIP().String())
|
|
}
|
|
delete(*sm, svcPortName)
|
|
} else {
|
|
klog.ErrorS(nil, "Service port does not exists", "portName", svcPortName)
|
|
}
|
|
}
|
|
}
|