Simplify stale-connection detection in kube-proxy
This commit is contained in:
@@ -48,7 +48,6 @@ import (
|
||||
"k8s.io/kubernetes/pkg/proxy/healthcheck"
|
||||
utilexec "k8s.io/kubernetes/pkg/util/exec"
|
||||
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
|
||||
"k8s.io/kubernetes/pkg/util/slice"
|
||||
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
|
||||
utilversion "k8s.io/kubernetes/pkg/util/version"
|
||||
)
|
||||
@@ -556,15 +555,6 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
|
||||
proxier.deleteServiceConnections(staleUDPServices.List())
|
||||
}
|
||||
|
||||
// Generate a list of ip strings from the list of endpoint infos
|
||||
func flattenEndpointsInfo(endPoints []*endpointsInfo) []string {
|
||||
var endpointIPs []string
|
||||
for _, ep := range endPoints {
|
||||
endpointIPs = append(endpointIPs, ep.endpoint)
|
||||
}
|
||||
return endpointIPs
|
||||
}
|
||||
|
||||
// Reconstruct the list of endpoint infos from the endpointIP list
|
||||
// Use the slice of endpointIPs to rebuild a slice of corresponding {endpointIP, localEndpointOnly} infos
|
||||
// from the full []hostPortInfo slice.
|
||||
@@ -614,28 +604,34 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
|
||||
|
||||
// Convert a slice of api.Endpoints objects into a map of service-port -> endpoints.
|
||||
func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortName][]*endpointsInfo, hostname string,
|
||||
healthChecker healthChecker) (newMap map[proxy.ServicePortName][]*endpointsInfo, stale map[endpointServicePair]bool) {
|
||||
healthChecker healthChecker) (newMap map[proxy.ServicePortName][]*endpointsInfo, staleSet map[endpointServicePair]bool) {
|
||||
|
||||
// return values
|
||||
newMap = make(map[proxy.ServicePortName][]*endpointsInfo)
|
||||
stale = make(map[endpointServicePair]bool)
|
||||
staleSet = make(map[endpointServicePair]bool)
|
||||
|
||||
// local
|
||||
svcPortToInfoMap := make(map[proxy.ServicePortName][]hostPortInfo)
|
||||
|
||||
// Update endpoints for services.
|
||||
for i := range allEndpoints {
|
||||
accumulateEndpointsMap(&allEndpoints[i], hostname, curMap, &newMap, &svcPortToInfoMap, &stale)
|
||||
accumulateEndpointsMap(&allEndpoints[i], hostname, curMap, &newMap, &svcPortToInfoMap)
|
||||
}
|
||||
// Check stale connections against endpoints missing from the update.
|
||||
// TODO: we should really only mark a connection stale if the proto was UDP
|
||||
// and the (ip, port, proto) was removed from the endpoints.
|
||||
for svcPort := range curMap {
|
||||
if _, found := newMap[svcPort]; !found {
|
||||
glog.V(3).Infof("Removing endpoints for %q", svcPort)
|
||||
// record endpoints of unactive service to stale connections
|
||||
for _, ep := range curMap[svcPort] {
|
||||
stale[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPort}] = true
|
||||
for svcPort, epList := range curMap {
|
||||
for _, ep := range epList {
|
||||
stale := true
|
||||
for i := range newMap[svcPort] {
|
||||
if *newMap[svcPort][i] == *ep {
|
||||
stale = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if stale {
|
||||
glog.V(4).Infof("Stale endpoint %v -> %v", svcPort, ep.endpoint)
|
||||
staleSet[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPort}] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -652,7 +648,7 @@ func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortN
|
||||
updateHealthCheckEntries(svcPort.NamespacedName, svcPortToInfoMap[svcPort], healthChecker)
|
||||
}
|
||||
|
||||
return newMap, stale
|
||||
return newMap, staleSet
|
||||
}
|
||||
|
||||
// Gather information about all the endpoint state for a given api.Endpoints.
|
||||
@@ -667,8 +663,7 @@ func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortN
|
||||
func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string,
|
||||
curEndpoints map[proxy.ServicePortName][]*endpointsInfo,
|
||||
newEndpoints *map[proxy.ServicePortName][]*endpointsInfo,
|
||||
svcPortToInfoMap *map[proxy.ServicePortName][]hostPortInfo,
|
||||
staleConnections *map[endpointServicePair]bool) {
|
||||
svcPortToInfoMap *map[proxy.ServicePortName][]hostPortInfo) {
|
||||
|
||||
// We need to build a map of portname -> all ip:ports for that
|
||||
// portname. Explode Endpoints.Subsets[*] into this structure.
|
||||
@@ -694,16 +689,6 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string,
|
||||
// Decompose the lists of endpoints into details of what was changed for the caller.
|
||||
for svcPort, hostPortInfos := range *svcPortToInfoMap {
|
||||
newEPList := flattenValidEndpoints(hostPortInfos)
|
||||
// Flatten the list of current endpoint infos to just a list of ips as strings
|
||||
curEndpointIPs := flattenEndpointsInfo(curEndpoints[svcPort])
|
||||
if len(curEndpointIPs) != len(newEPList) || !slicesEquiv(slice.CopyStrings(curEndpointIPs), newEPList) {
|
||||
glog.V(3).Infof("Setting endpoints for %q to %+v", svcPort, newEndpoints)
|
||||
// Gather stale connections to removed endpoints
|
||||
removedEndpoints := getRemovedEndpoints(curEndpointIPs, newEPList)
|
||||
for _, ep := range removedEndpoints {
|
||||
(*staleConnections)[endpointServicePair{endpoint: ep, servicePortName: svcPort}] = true
|
||||
}
|
||||
}
|
||||
glog.V(3).Infof("Setting endpoints for %q to %+v", svcPort, newEPList)
|
||||
// Once the set operations using the list of ips are complete, build the list of endpoint infos
|
||||
(*newEndpoints)[svcPort] = buildEndpointInfoList(hostPortInfos, newEPList)
|
||||
@@ -738,17 +723,6 @@ func isValidEndpoint(hpp *hostPortInfo) bool {
|
||||
return hpp.host != "" && hpp.port > 0
|
||||
}
|
||||
|
||||
// Tests whether two slices are equivalent. This sorts both slices in-place.
|
||||
func slicesEquiv(lhs, rhs []string) bool {
|
||||
if len(lhs) != len(rhs) {
|
||||
return false
|
||||
}
|
||||
if reflect.DeepEqual(slice.SortStrings(lhs), slice.SortStrings(rhs)) {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func flattenValidEndpoints(endpoints []hostPortInfo) []string {
|
||||
// Convert Endpoint objects into strings for easier use later.
|
||||
var result []string
|
||||
@@ -803,11 +777,6 @@ func servicePortEndpointChainName(s proxy.ServicePortName, protocol string, endp
|
||||
return utiliptables.Chain("KUBE-SEP-" + encoded[:16])
|
||||
}
|
||||
|
||||
// getRemovedEndpoints returns the endpoint IPs that are missing in the new endpoints
|
||||
func getRemovedEndpoints(curEndpoints, newEndpoints []string) []string {
|
||||
return sets.NewString(curEndpoints...).Difference(sets.NewString(newEndpoints...)).List()
|
||||
}
|
||||
|
||||
type endpointServicePair struct {
|
||||
endpoint string
|
||||
servicePortName proxy.ServicePortName
|
||||
|
||||
Reference in New Issue
Block a user