Merge pull request #118729 from danwinship/endpoint-naming
Fix endpoint-related names to use consistent singular/plural
This commit is contained in:
commit
6bc2f2ec22
@ -27,23 +27,23 @@ import (
|
||||
|
||||
// CleanStaleEntries takes care of flushing stale conntrack entries for services and endpoints.
|
||||
func CleanStaleEntries(isIPv6 bool, exec utilexec.Interface, svcPortMap proxy.ServicePortMap,
|
||||
serviceUpdateResult proxy.UpdateServiceMapResult, endpointUpdateResult proxy.UpdateEndpointMapResult) {
|
||||
serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) {
|
||||
|
||||
deleteStaleServiceConntrackEntries(isIPv6, exec, svcPortMap, serviceUpdateResult, endpointUpdateResult)
|
||||
deleteStaleEndpointConntrackEntries(exec, svcPortMap, endpointUpdateResult)
|
||||
deleteStaleServiceConntrackEntries(isIPv6, exec, svcPortMap, serviceUpdateResult, endpointsUpdateResult)
|
||||
deleteStaleEndpointConntrackEntries(exec, svcPortMap, endpointsUpdateResult)
|
||||
}
|
||||
|
||||
// deleteStaleServiceConntrackEntries takes care of flushing stale conntrack entries related
|
||||
// to UDP Service IPs. When a service has no endpoints and we drop traffic to it, conntrack
|
||||
// may create "black hole" entries for that IP+port. When the service gets endpoints we
|
||||
// need to delete those entries so further traffic doesn't get dropped.
|
||||
func deleteStaleServiceConntrackEntries(isIPv6 bool, exec utilexec.Interface, svcPortMap proxy.ServicePortMap, serviceUpdateResult proxy.UpdateServiceMapResult, endpointUpdateResult proxy.UpdateEndpointMapResult) {
|
||||
func deleteStaleServiceConntrackEntries(isIPv6 bool, exec utilexec.Interface, svcPortMap proxy.ServicePortMap, serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) {
|
||||
conntrackCleanupServiceIPs := serviceUpdateResult.DeletedUDPClusterIPs
|
||||
conntrackCleanupServiceNodePorts := sets.New[int]()
|
||||
|
||||
// merge newly active services gathered from updateEndpointsMap
|
||||
// merge newly active services gathered from endpointsUpdateResult
|
||||
// a UDP service that changes from 0 to non-0 endpoints is newly active.
|
||||
for _, svcPortName := range endpointUpdateResult.NewlyActiveUDPServices {
|
||||
for _, svcPortName := range endpointsUpdateResult.NewlyActiveUDPServices {
|
||||
if svcInfo, ok := svcPortMap[svcPortName]; ok {
|
||||
klog.V(4).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName)
|
||||
conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String())
|
||||
@ -78,8 +78,8 @@ func deleteStaleServiceConntrackEntries(isIPv6 bool, exec utilexec.Interface, sv
|
||||
// deleteStaleEndpointConntrackEntries takes care of flushing stale conntrack entries related
|
||||
// to UDP endpoints. After a UDP endpoint is removed we must flush any conntrack entries
|
||||
// for it so that if the same client keeps sending, the packets will get routed to a new endpoint.
|
||||
func deleteStaleEndpointConntrackEntries(exec utilexec.Interface, svcPortMap proxy.ServicePortMap, endpointUpdateResult proxy.UpdateEndpointMapResult) {
|
||||
for _, epSvcPair := range endpointUpdateResult.DeletedUDPEndpoints {
|
||||
func deleteStaleEndpointConntrackEntries(exec utilexec.Interface, svcPortMap proxy.ServicePortMap, endpointsUpdateResult proxy.UpdateEndpointsMapResult) {
|
||||
for _, epSvcPair := range endpointsUpdateResult.DeletedUDPEndpoints {
|
||||
if svcInfo, ok := svcPortMap[epSvcPair.ServicePortName]; ok {
|
||||
endpointIP := proxyutil.IPPart(epSvcPair.Endpoint)
|
||||
nodePort := svcInfo.NodePort()
|
||||
|
@ -147,9 +147,9 @@ type makeEndpointFunc func(info *BaseEndpointInfo, svcPortName *ServicePortName)
|
||||
// EndpointsMap's but just use the changes for any Proxier specific cleanup.
|
||||
type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap EndpointsMap)
|
||||
|
||||
// EndpointChangeTracker carries state about uncommitted changes to an arbitrary number of
|
||||
// EndpointsChangeTracker carries state about uncommitted changes to an arbitrary number of
|
||||
// Endpoints, keyed by their namespace and name.
|
||||
type EndpointChangeTracker struct {
|
||||
type EndpointsChangeTracker struct {
|
||||
// lock protects lastChangeTriggerTimes
|
||||
lock sync.Mutex
|
||||
|
||||
@ -159,16 +159,16 @@ type EndpointChangeTracker struct {
|
||||
// Map from the Endpoints namespaced-name to the times of the triggers that caused the endpoints
|
||||
// object to change. Used to calculate the network-programming-latency.
|
||||
lastChangeTriggerTimes map[types.NamespacedName][]time.Time
|
||||
// record the time when the endpointChangeTracker was created so we can ignore the endpoints
|
||||
// record the time when the endpointsChangeTracker was created so we can ignore the endpoints
|
||||
// that were generated before, because we can't estimate the network-programming-latency on those.
|
||||
// This is specially problematic on restarts, because we process all the endpoints that may have been
|
||||
// created hours or days before.
|
||||
trackerStartTime time.Time
|
||||
}
|
||||
|
||||
// NewEndpointChangeTracker initializes an EndpointsChangeMap
|
||||
func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointChangeTracker {
|
||||
return &EndpointChangeTracker{
|
||||
// NewEndpointsChangeTracker initializes an EndpointsChangeTracker
|
||||
func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointsChangeTracker {
|
||||
return &EndpointsChangeTracker{
|
||||
lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
|
||||
trackerStartTime: time.Now(),
|
||||
processEndpointsMapChange: processEndpointsMapChange,
|
||||
@ -177,9 +177,9 @@ func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc
|
||||
}
|
||||
|
||||
// EndpointSliceUpdate updates given service's endpoints change map based on the <previous, current> endpoints pair.
|
||||
// It returns true if items changed, otherwise return false. Will add/update/delete items of EndpointsChangeMap.
|
||||
// It returns true if items changed, otherwise return false. Will add/update/delete items of EndpointsChangeTracker.
|
||||
// If removeSlice is true, slice will be removed, otherwise it will be added or updated.
|
||||
func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool {
|
||||
func (ect *EndpointsChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool {
|
||||
if !supportedEndpointSliceAddressTypes.Has(string(endpointSlice.AddressType)) {
|
||||
klog.V(4).InfoS("EndpointSlice address type not supported by kube-proxy", "addressType", endpointSlice.AddressType)
|
||||
return false
|
||||
@ -225,13 +225,13 @@ func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.E
|
||||
// PendingChanges returns a set whose keys are the names of the services whose endpoints
|
||||
// have changed since the last time ect was used to update an EndpointsMap. (You must call
|
||||
// this _before_ calling em.Update(ect).)
|
||||
func (ect *EndpointChangeTracker) PendingChanges() sets.Set[string] {
|
||||
func (ect *EndpointsChangeTracker) PendingChanges() sets.Set[string] {
|
||||
return ect.endpointSliceCache.pendingChanges()
|
||||
}
|
||||
|
||||
// checkoutChanges returns a list of pending endpointsChanges and marks them as
|
||||
// applied.
|
||||
func (ect *EndpointChangeTracker) checkoutChanges() []*endpointsChange {
|
||||
func (ect *EndpointsChangeTracker) checkoutChanges() []*endpointsChange {
|
||||
metrics.EndpointChangesPending.Set(0)
|
||||
|
||||
return ect.endpointSliceCache.checkoutChanges()
|
||||
@ -239,7 +239,7 @@ func (ect *EndpointChangeTracker) checkoutChanges() []*endpointsChange {
|
||||
|
||||
// checkoutTriggerTimes applies the locally cached trigger times to a map of
|
||||
// trigger times that have been passed in and empties the local cache.
|
||||
func (ect *EndpointChangeTracker) checkoutTriggerTimes(lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) {
|
||||
func (ect *EndpointsChangeTracker) checkoutTriggerTimes(lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) {
|
||||
ect.lock.Lock()
|
||||
defer ect.lock.Unlock()
|
||||
|
||||
@ -284,8 +284,8 @@ type endpointsChange struct {
|
||||
current EndpointsMap
|
||||
}
|
||||
|
||||
// UpdateEndpointMapResult is the updated results after applying endpoints changes.
|
||||
type UpdateEndpointMapResult struct {
|
||||
// UpdateEndpointsMapResult is the updated results after applying endpoints changes.
|
||||
type UpdateEndpointsMapResult struct {
|
||||
// DeletedUDPEndpoints identifies UDP endpoints that have just been deleted.
|
||||
// Existing conntrack NAT entries pointing to these endpoints must be deleted to
|
||||
// ensure that no further traffic for the Service gets delivered to them.
|
||||
@ -304,7 +304,7 @@ type UpdateEndpointMapResult struct {
|
||||
}
|
||||
|
||||
// Update updates endpointsMap base on the given changes.
|
||||
func (em EndpointsMap) Update(changes *EndpointChangeTracker) (result UpdateEndpointMapResult) {
|
||||
func (em EndpointsMap) Update(changes *EndpointsChangeTracker) (result UpdateEndpointsMapResult) {
|
||||
result.DeletedUDPEndpoints = make([]ServiceEndpoint, 0)
|
||||
result.NewlyActiveUDPServices = make([]ServicePortName, 0)
|
||||
result.LastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time)
|
||||
@ -321,7 +321,7 @@ type EndpointsMap map[ServicePortName][]Endpoint
|
||||
// and clear the changes map. In addition it returns (via argument) and resets the
|
||||
// lastChangeTriggerTimes for all endpoints that were changed and will result in syncing
|
||||
// the proxy rules. apply triggers processEndpointsMapChange on every change.
|
||||
func (em EndpointsMap) apply(ect *EndpointChangeTracker, deletedUDPEndpoints *[]ServiceEndpoint,
|
||||
func (em EndpointsMap) apply(ect *EndpointsChangeTracker, deletedUDPEndpoints *[]ServiceEndpoint,
|
||||
newlyActiveUDPServices *[]ServicePortName, lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) {
|
||||
if ect == nil {
|
||||
return
|
||||
@ -396,7 +396,7 @@ func (em EndpointsMap) LocalReadyEndpoints() map[types.NamespacedName]int {
|
||||
}
|
||||
|
||||
// detectStaleConntrackEntries detects services that may be associated with stale conntrack entries.
|
||||
// (See UpdateEndpointMapResult.DeletedUDPEndpoints and .NewlyActiveUDPServices.)
|
||||
// (See UpdateEndpointsMapResult.DeletedUDPEndpoints and .NewlyActiveUDPServices.)
|
||||
func detectStaleConntrackEntries(oldEndpointsMap, newEndpointsMap EndpointsMap, deletedUDPEndpoints *[]ServiceEndpoint, newlyActiveUDPServices *[]ServicePortName) {
|
||||
// Find the UDP endpoints that we were sending traffic to in oldEndpointsMap, but
|
||||
// are no longer sending to newEndpointsMap. The proxier should make sure that
|
||||
|
@ -1340,7 +1340,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
|
||||
|
||||
testCases := map[string]struct {
|
||||
startingSlices []*discovery.EndpointSlice
|
||||
endpointChangeTracker *EndpointChangeTracker
|
||||
endpointsChangeTracker *EndpointsChangeTracker
|
||||
namespacedName types.NamespacedName
|
||||
paramEndpointSlice *discovery.EndpointSlice
|
||||
paramRemoveSlice bool
|
||||
@ -1350,12 +1350,12 @@ func TestEndpointSliceUpdate(t *testing.T) {
|
||||
}{
|
||||
// test starting from an empty state
|
||||
"add a simple slice that doesn't already exist": {
|
||||
startingSlices: []*discovery.EndpointSlice{},
|
||||
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
|
||||
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
|
||||
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
paramRemoveSlice: false,
|
||||
expectedReturnVal: true,
|
||||
startingSlices: []*discovery.EndpointSlice{},
|
||||
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
|
||||
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
|
||||
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
paramRemoveSlice: false,
|
||||
expectedReturnVal: true,
|
||||
expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{
|
||||
makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): {
|
||||
&BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
||||
@ -1375,7 +1375,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
|
||||
startingSlices: []*discovery.EndpointSlice{
|
||||
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
},
|
||||
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
|
||||
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
|
||||
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
|
||||
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
paramRemoveSlice: false,
|
||||
@ -1388,7 +1388,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
|
||||
startingSlices: []*discovery.EndpointSlice{
|
||||
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
},
|
||||
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
|
||||
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
|
||||
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
|
||||
paramEndpointSlice: fqdnSlice,
|
||||
paramRemoveSlice: false,
|
||||
@ -1402,11 +1402,11 @@ func TestEndpointSliceUpdate(t *testing.T) {
|
||||
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
},
|
||||
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
|
||||
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
|
||||
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
paramRemoveSlice: false,
|
||||
expectedReturnVal: true,
|
||||
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
|
||||
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
|
||||
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
paramRemoveSlice: false,
|
||||
expectedReturnVal: true,
|
||||
expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{
|
||||
makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): {
|
||||
&BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
||||
@ -1435,11 +1435,11 @@ func TestEndpointSliceUpdate(t *testing.T) {
|
||||
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
},
|
||||
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
|
||||
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
|
||||
paramEndpointSlice: generateEndpointSliceWithOffset("svc1", "ns1", 3, 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80)}),
|
||||
paramRemoveSlice: false,
|
||||
expectedReturnVal: true,
|
||||
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
|
||||
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
|
||||
paramEndpointSlice: generateEndpointSliceWithOffset("svc1", "ns1", 3, 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80)}),
|
||||
paramRemoveSlice: false,
|
||||
expectedReturnVal: true,
|
||||
expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{
|
||||
makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): {
|
||||
&BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
||||
@ -1466,11 +1466,11 @@ func TestEndpointSliceUpdate(t *testing.T) {
|
||||
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
},
|
||||
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
|
||||
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
|
||||
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
paramRemoveSlice: true,
|
||||
expectedReturnVal: true,
|
||||
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
|
||||
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
|
||||
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
paramRemoveSlice: true,
|
||||
expectedReturnVal: true,
|
||||
expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{
|
||||
makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): {
|
||||
&BaseEndpointInfo{Endpoint: "10.0.2.1:80", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
||||
@ -1489,7 +1489,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
|
||||
generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
},
|
||||
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
|
||||
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
|
||||
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
|
||||
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
paramRemoveSlice: true,
|
||||
@ -1502,11 +1502,11 @@ func TestEndpointSliceUpdate(t *testing.T) {
|
||||
startingSlices: []*discovery.EndpointSlice{
|
||||
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
},
|
||||
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
|
||||
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
|
||||
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 1, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
paramRemoveSlice: false,
|
||||
expectedReturnVal: true,
|
||||
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
|
||||
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
|
||||
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 1, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
paramRemoveSlice: false,
|
||||
expectedReturnVal: true,
|
||||
expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{
|
||||
makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): {
|
||||
&BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: true, Ready: false, Serving: false, Terminating: false},
|
||||
@ -1526,11 +1526,11 @@ func TestEndpointSliceUpdate(t *testing.T) {
|
||||
startingSlices: []*discovery.EndpointSlice{
|
||||
generateEndpointSlice("svc1", "ns1", 1, 2, 1, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
},
|
||||
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
|
||||
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
|
||||
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 2, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
paramRemoveSlice: false,
|
||||
expectedReturnVal: true,
|
||||
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
|
||||
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
|
||||
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 2, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
paramRemoveSlice: false,
|
||||
expectedReturnVal: true,
|
||||
expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{
|
||||
makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): {
|
||||
&BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
||||
@ -1549,11 +1549,11 @@ func TestEndpointSliceUpdate(t *testing.T) {
|
||||
generateEndpointSlice("svc1", "ns1", 1, 3, 2, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
generateEndpointSlice("svc1", "ns1", 2, 2, 2, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
},
|
||||
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
|
||||
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
|
||||
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
paramRemoveSlice: false,
|
||||
expectedReturnVal: true,
|
||||
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
|
||||
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
|
||||
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
paramRemoveSlice: false,
|
||||
expectedReturnVal: true,
|
||||
expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{
|
||||
makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): {
|
||||
&BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
||||
@ -1578,11 +1578,11 @@ func TestEndpointSliceUpdate(t *testing.T) {
|
||||
generateEndpointSlice("svc1", "ns1", 1, 3, 2, 2, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
generateEndpointSlice("svc1", "ns1", 2, 2, 2, 2, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
},
|
||||
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
|
||||
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
|
||||
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, 2, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
paramRemoveSlice: false,
|
||||
expectedReturnVal: true,
|
||||
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
|
||||
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
|
||||
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, 2, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
|
||||
paramRemoveSlice: false,
|
||||
expectedReturnVal: true,
|
||||
expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{
|
||||
makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): {
|
||||
&BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
||||
@ -1605,19 +1605,19 @@ func TestEndpointSliceUpdate(t *testing.T) {
|
||||
|
||||
for name, tc := range testCases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
initializeCache(tc.endpointChangeTracker.endpointSliceCache, tc.startingSlices)
|
||||
initializeCache(tc.endpointsChangeTracker.endpointSliceCache, tc.startingSlices)
|
||||
|
||||
got := tc.endpointChangeTracker.EndpointSliceUpdate(tc.paramEndpointSlice, tc.paramRemoveSlice)
|
||||
got := tc.endpointsChangeTracker.EndpointSliceUpdate(tc.paramEndpointSlice, tc.paramRemoveSlice)
|
||||
if !reflect.DeepEqual(got, tc.expectedReturnVal) {
|
||||
t.Errorf("EndpointSliceUpdate return value got: %v, want %v", got, tc.expectedReturnVal)
|
||||
}
|
||||
|
||||
pendingChanges := tc.endpointChangeTracker.PendingChanges()
|
||||
pendingChanges := tc.endpointsChangeTracker.PendingChanges()
|
||||
if !pendingChanges.Equal(tc.expectedChangedEndpoints) {
|
||||
t.Errorf("expected changed endpoints %q, got %q", tc.expectedChangedEndpoints.UnsortedList(), pendingChanges.UnsortedList())
|
||||
}
|
||||
|
||||
changes := tc.endpointChangeTracker.checkoutChanges()
|
||||
changes := tc.endpointsChangeTracker.checkoutChanges()
|
||||
if tc.expectedCurrentChange == nil {
|
||||
if len(changes) != 0 {
|
||||
t.Errorf("Expected %s to have no changes", tc.namespacedName)
|
||||
@ -1637,20 +1637,20 @@ func TestCheckoutChanges(t *testing.T) {
|
||||
svcPortName1 := ServicePortName{types.NamespacedName{Namespace: "ns1", Name: "svc1"}, "port-1", v1.ProtocolTCP}
|
||||
|
||||
testCases := map[string]struct {
|
||||
endpointChangeTracker *EndpointChangeTracker
|
||||
expectedChanges []*endpointsChange
|
||||
items map[types.NamespacedName]*endpointsChange
|
||||
appliedSlices []*discovery.EndpointSlice
|
||||
pendingSlices []*discovery.EndpointSlice
|
||||
endpointsChangeTracker *EndpointsChangeTracker
|
||||
expectedChanges []*endpointsChange
|
||||
items map[types.NamespacedName]*endpointsChange
|
||||
appliedSlices []*discovery.EndpointSlice
|
||||
pendingSlices []*discovery.EndpointSlice
|
||||
}{
|
||||
"empty slices": {
|
||||
endpointChangeTracker: NewEndpointChangeTracker("", nil, v1.IPv4Protocol, nil, nil),
|
||||
expectedChanges: []*endpointsChange{},
|
||||
appliedSlices: []*discovery.EndpointSlice{},
|
||||
pendingSlices: []*discovery.EndpointSlice{},
|
||||
endpointsChangeTracker: NewEndpointsChangeTracker("", nil, v1.IPv4Protocol, nil, nil),
|
||||
expectedChanges: []*endpointsChange{},
|
||||
appliedSlices: []*discovery.EndpointSlice{},
|
||||
pendingSlices: []*discovery.EndpointSlice{},
|
||||
},
|
||||
"adding initial slice": {
|
||||
endpointChangeTracker: NewEndpointChangeTracker("", nil, v1.IPv4Protocol, nil, nil),
|
||||
endpointsChangeTracker: NewEndpointsChangeTracker("", nil, v1.IPv4Protocol, nil, nil),
|
||||
expectedChanges: []*endpointsChange{{
|
||||
previous: EndpointsMap{},
|
||||
current: EndpointsMap{
|
||||
@ -1663,7 +1663,7 @@ func TestCheckoutChanges(t *testing.T) {
|
||||
},
|
||||
},
|
||||
"removing port in update": {
|
||||
endpointChangeTracker: NewEndpointChangeTracker("", nil, v1.IPv4Protocol, nil, nil),
|
||||
endpointsChangeTracker: NewEndpointsChangeTracker("", nil, v1.IPv4Protocol, nil, nil),
|
||||
expectedChanges: []*endpointsChange{{
|
||||
previous: EndpointsMap{
|
||||
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", "host1", true, true, false), newTestEp("10.0.1.2:80", "host1", true, true, false), newTestEp("10.0.1.3:80", "host1", false, false, false)},
|
||||
@ -1685,13 +1685,13 @@ func TestCheckoutChanges(t *testing.T) {
|
||||
for name, tc := range testCases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
for _, slice := range tc.appliedSlices {
|
||||
tc.endpointChangeTracker.EndpointSliceUpdate(slice, false)
|
||||
tc.endpointsChangeTracker.EndpointSliceUpdate(slice, false)
|
||||
}
|
||||
tc.endpointChangeTracker.checkoutChanges()
|
||||
tc.endpointsChangeTracker.checkoutChanges()
|
||||
for _, slice := range tc.pendingSlices {
|
||||
tc.endpointChangeTracker.EndpointSliceUpdate(slice, false)
|
||||
tc.endpointsChangeTracker.EndpointSliceUpdate(slice, false)
|
||||
}
|
||||
changes := tc.endpointChangeTracker.checkoutChanges()
|
||||
changes := tc.endpointsChangeTracker.checkoutChanges()
|
||||
|
||||
if len(tc.expectedChanges) != len(changes) {
|
||||
t.Fatalf("Expected %d changes, got %d", len(tc.expectedChanges), len(changes))
|
||||
@ -1730,7 +1730,7 @@ func compareEndpointsMapsStr(t *testing.T, newMap EndpointsMap, expected map[Ser
|
||||
for i := range expected[x] {
|
||||
newEp, ok := newMap[x][i].(*BaseEndpointInfo)
|
||||
if !ok {
|
||||
t.Fatalf("Failed to cast endpointsInfo")
|
||||
t.Fatalf("Failed to cast endpointInfo")
|
||||
}
|
||||
if !endpointEqual(newEp, expected[x][i]) {
|
||||
t.Fatalf("expected new[%v][%d] to be %v, got %v"+
|
||||
|
@ -122,15 +122,15 @@ func newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *pro
|
||||
}
|
||||
|
||||
// internal struct for endpoints information
|
||||
type endpointsInfo struct {
|
||||
type endpointInfo struct {
|
||||
*proxy.BaseEndpointInfo
|
||||
|
||||
ChainName utiliptables.Chain
|
||||
}
|
||||
|
||||
// returns a new proxy.Endpoint which abstracts a endpointsInfo
|
||||
// returns a new proxy.Endpoint which abstracts a endpointInfo
|
||||
func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, svcPortName *proxy.ServicePortName) proxy.Endpoint {
|
||||
return &endpointsInfo{
|
||||
return &endpointInfo{
|
||||
BaseEndpointInfo: baseInfo,
|
||||
ChainName: servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(svcPortName.Protocol)), baseInfo.Endpoint),
|
||||
}
|
||||
@ -143,7 +143,7 @@ type Proxier struct {
|
||||
// services that happened since iptables was synced. For a single object,
|
||||
// changes are accumulated, i.e. previous is state from before all of them,
|
||||
// current is state after applying all of those.
|
||||
endpointsChanges *proxy.EndpointChangeTracker
|
||||
endpointsChanges *proxy.EndpointsChangeTracker
|
||||
serviceChanges *proxy.ServiceChangeTracker
|
||||
|
||||
mu sync.Mutex // protects the following fields
|
||||
@ -265,7 +265,7 @@ func NewProxier(ipFamily v1.IPFamily,
|
||||
svcPortMap: make(proxy.ServicePortMap),
|
||||
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
|
||||
endpointsMap: make(proxy.EndpointsMap),
|
||||
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil),
|
||||
endpointsChanges: proxy.NewEndpointsChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil),
|
||||
needFullSync: true,
|
||||
syncPeriod: syncPeriod,
|
||||
iptables: ipt,
|
||||
@ -952,7 +952,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
|
||||
// Note the endpoint chains that will be used
|
||||
for _, ep := range allLocallyReachableEndpoints {
|
||||
if epInfo, ok := ep.(*endpointsInfo); ok {
|
||||
if epInfo, ok := ep.(*endpointInfo); ok {
|
||||
activeNATChains[epInfo.ChainName] = true
|
||||
}
|
||||
}
|
||||
@ -1345,9 +1345,9 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
|
||||
// Generate the per-endpoint chains.
|
||||
for _, ep := range allLocallyReachableEndpoints {
|
||||
epInfo, ok := ep.(*endpointsInfo)
|
||||
epInfo, ok := ep.(*endpointInfo)
|
||||
if !ok {
|
||||
klog.ErrorS(nil, "Failed to cast endpointsInfo", "endpointsInfo", ep)
|
||||
klog.ErrorS(nil, "Failed to cast endpointInfo", "endpointInfo", ep)
|
||||
continue
|
||||
}
|
||||
|
||||
@ -1556,7 +1556,7 @@ func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffe
|
||||
// First write session affinity rules, if applicable.
|
||||
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
|
||||
for _, ep := range endpoints {
|
||||
epInfo, ok := ep.(*endpointsInfo)
|
||||
epInfo, ok := ep.(*endpointInfo)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
@ -1578,7 +1578,7 @@ func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffe
|
||||
// Now write loadbalancing rules.
|
||||
numEndpoints := len(endpoints)
|
||||
for i, ep := range endpoints {
|
||||
epInfo, ok := ep.(*endpointsInfo)
|
||||
epInfo, ok := ep.(*endpointInfo)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
@ -330,7 +330,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
|
||||
svcPortMap: make(proxy.ServicePortMap),
|
||||
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipfamily, nil, nil),
|
||||
endpointsMap: make(proxy.EndpointsMap),
|
||||
endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, newEndpointInfo, ipfamily, nil, nil),
|
||||
endpointsChanges: proxy.NewEndpointsChangeTracker(testHostname, newEndpointInfo, ipfamily, nil, nil),
|
||||
needFullSync: true,
|
||||
iptables: ipt,
|
||||
masqueradeMark: "0x4000",
|
||||
@ -3371,7 +3371,7 @@ func makeServiceMap(proxier *Proxier, allServices ...*v1.Service) {
|
||||
proxier.servicesSynced = true
|
||||
}
|
||||
|
||||
func compareEndpointsMapsExceptChainName(t *testing.T, tci int, newMap proxy.EndpointsMap, expected map[proxy.ServicePortName][]*endpointsInfo) {
|
||||
func compareEndpointsMapsExceptChainName(t *testing.T, tci int, newMap proxy.EndpointsMap, expected map[proxy.ServicePortName][]*endpointInfo) {
|
||||
if len(newMap) != len(expected) {
|
||||
t.Errorf("[%d] expected %d results, got %d: %v", tci, len(expected), len(newMap), newMap)
|
||||
}
|
||||
@ -3380,9 +3380,9 @@ func compareEndpointsMapsExceptChainName(t *testing.T, tci int, newMap proxy.End
|
||||
t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(expected[x]), x, len(newMap[x]))
|
||||
} else {
|
||||
for i := range expected[x] {
|
||||
newEp, ok := newMap[x][i].(*endpointsInfo)
|
||||
newEp, ok := newMap[x][i].(*endpointInfo)
|
||||
if !ok {
|
||||
t.Errorf("Failed to cast endpointsInfo")
|
||||
t.Errorf("Failed to cast endpointInfo")
|
||||
continue
|
||||
}
|
||||
if newEp.Endpoint != expected[x][i].Endpoint ||
|
||||
@ -3731,16 +3731,16 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
name string
|
||||
previousEndpoints []*discovery.EndpointSlice
|
||||
currentEndpoints []*discovery.EndpointSlice
|
||||
oldEndpoints map[proxy.ServicePortName][]*endpointsInfo
|
||||
expectedResult map[proxy.ServicePortName][]*endpointsInfo
|
||||
oldEndpoints map[proxy.ServicePortName][]*endpointInfo
|
||||
expectedResult map[proxy.ServicePortName][]*endpointInfo
|
||||
expectedDeletedUDPEndpoints []proxy.ServiceEndpoint
|
||||
expectedNewlyActiveUDPServices map[proxy.ServicePortName]bool
|
||||
expectedLocalEndpoints map[types.NamespacedName]int
|
||||
}{{
|
||||
// Case[0]: nothing
|
||||
name: "nothing",
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{},
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointInfo{},
|
||||
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
|
||||
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
@ -3749,12 +3749,12 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
name: "no change, named port, local",
|
||||
previousEndpoints: namedPortLocal,
|
||||
currentEndpoints: namedPortLocal,
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
|
||||
},
|
||||
},
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointInfo{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
|
||||
},
|
||||
@ -3769,7 +3769,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
name: "no change, multiple subsets",
|
||||
previousEndpoints: multipleSubsets,
|
||||
currentEndpoints: multipleSubsets,
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||
},
|
||||
@ -3777,7 +3777,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||
},
|
||||
},
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointInfo{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||
},
|
||||
@ -3793,7 +3793,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
name: "no change, multiple subsets, multiple ports, local",
|
||||
previousEndpoints: multipleSubsetsMultiplePortsLocal,
|
||||
currentEndpoints: multipleSubsetsMultiplePortsLocal,
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
|
||||
},
|
||||
@ -3804,7 +3804,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.3:13", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||
},
|
||||
},
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointInfo{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
|
||||
},
|
||||
@ -3825,7 +3825,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
name: "no change, multiple endpoints, subsets, IPs, and ports",
|
||||
previousEndpoints: multipleSubsetsIPsPorts,
|
||||
currentEndpoints: multipleSubsetsIPsPorts,
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
|
||||
@ -3851,7 +3851,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.2.2.2:22", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
|
||||
},
|
||||
},
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointInfo{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
|
||||
@ -3888,8 +3888,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
name: "add an Endpoints",
|
||||
previousEndpoints: []*discovery.EndpointSlice{nil},
|
||||
currentEndpoints: namedPortLocal,
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{},
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointInfo{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
|
||||
},
|
||||
@ -3906,12 +3906,12 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
name: "remove an Endpoints",
|
||||
previousEndpoints: namedPortLocal,
|
||||
currentEndpoints: []*discovery.EndpointSlice{nil},
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
|
||||
},
|
||||
},
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointInfo{},
|
||||
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
|
||||
Endpoint: "10.1.1.1:11",
|
||||
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
||||
@ -3923,12 +3923,12 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
name: "add an IP and port",
|
||||
previousEndpoints: namedPort,
|
||||
currentEndpoints: namedPortsLocalNoLocal,
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||
},
|
||||
},
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointInfo{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
|
||||
@ -3950,7 +3950,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
name: "remove an IP and port",
|
||||
previousEndpoints: namedPortsLocalNoLocal,
|
||||
currentEndpoints: namedPort,
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
|
||||
@ -3960,7 +3960,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
|
||||
},
|
||||
},
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointInfo{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||
},
|
||||
@ -3982,12 +3982,12 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
name: "add a subset",
|
||||
previousEndpoints: []*discovery.EndpointSlice{namedPort[0], nil},
|
||||
currentEndpoints: multipleSubsetsWithLocal,
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||
},
|
||||
},
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointInfo{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||
},
|
||||
@ -4007,7 +4007,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
name: "remove a subset",
|
||||
previousEndpoints: multipleSubsets,
|
||||
currentEndpoints: []*discovery.EndpointSlice{namedPort[0], nil},
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||
},
|
||||
@ -4015,7 +4015,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||
},
|
||||
},
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointInfo{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||
},
|
||||
@ -4031,12 +4031,12 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
name: "rename a port",
|
||||
previousEndpoints: namedPort,
|
||||
currentEndpoints: namedPortRenamed,
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||
},
|
||||
},
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointInfo{
|
||||
makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||
},
|
||||
@ -4054,12 +4054,12 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
name: "renumber a port",
|
||||
previousEndpoints: namedPort,
|
||||
currentEndpoints: namedPortRenumbered,
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||
},
|
||||
},
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointInfo{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||
},
|
||||
@ -4075,7 +4075,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
name: "complex add and remove",
|
||||
previousEndpoints: complexBefore,
|
||||
currentEndpoints: complexAfter,
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||
},
|
||||
@ -4094,7 +4094,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.4.4.6:45", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
|
||||
},
|
||||
},
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointInfo{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.11:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||
@ -4141,8 +4141,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
name: "change from 0 endpoint address to 1 unnamed port",
|
||||
previousEndpoints: emptyEndpointSlices,
|
||||
currentEndpoints: namedPort,
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{},
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointInfo{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
|
||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||
},
|
||||
|
@ -226,7 +226,7 @@ type Proxier struct {
|
||||
// services that happened since last syncProxyRules call. For a single object,
|
||||
// changes are accumulated, i.e. previous is state from before all of them,
|
||||
// current is state after applying all of those.
|
||||
endpointsChanges *proxy.EndpointChangeTracker
|
||||
endpointsChanges *proxy.EndpointsChangeTracker
|
||||
serviceChanges *proxy.ServiceChangeTracker
|
||||
|
||||
mu sync.Mutex // protects the following fields
|
||||
@ -425,7 +425,7 @@ func NewProxier(ipFamily v1.IPFamily,
|
||||
svcPortMap: make(proxy.ServicePortMap),
|
||||
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
|
||||
endpointsMap: make(proxy.EndpointsMap),
|
||||
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, ipFamily, recorder, nil),
|
||||
endpointsChanges: proxy.NewEndpointsChangeTracker(hostname, nil, ipFamily, recorder, nil),
|
||||
initialSync: true,
|
||||
syncPeriod: syncPeriod,
|
||||
minSyncPeriod: minSyncPeriod,
|
||||
|
@ -154,7 +154,7 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
|
||||
svcPortMap: make(proxy.ServicePortMap),
|
||||
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, nil, nil),
|
||||
endpointsMap: make(proxy.EndpointsMap),
|
||||
endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, ipFamily, nil, nil),
|
||||
endpointsChanges: proxy.NewEndpointsChangeTracker(testHostname, nil, ipFamily, nil, nil),
|
||||
excludeCIDRs: excludeCIDRs,
|
||||
iptables: ipt,
|
||||
ipvs: ipvs,
|
||||
@ -4358,7 +4358,7 @@ raid10 57344 0 - Live 0xffffffffc0597000`,
|
||||
}
|
||||
|
||||
// The majority of EndpointSlice specific tests are not ipvs specific and focus on
|
||||
// the shared EndpointChangeTracker and EndpointSliceCache. This test ensures that the
|
||||
// the shared EndpointsChangeTracker and EndpointSliceCache. This test ensures that the
|
||||
// ipvs proxier supports translating EndpointSlices to ipvs output.
|
||||
func TestEndpointSliceE2E(t *testing.T) {
|
||||
ipt := iptablestest.NewFake()
|
||||
|
@ -595,7 +595,7 @@ func TestServiceToServiceMap(t *testing.T) {
|
||||
}
|
||||
|
||||
type FakeProxier struct {
|
||||
endpointsChanges *EndpointChangeTracker
|
||||
endpointsChanges *EndpointsChangeTracker
|
||||
serviceChanges *ServiceChangeTracker
|
||||
svcPortMap ServicePortMap
|
||||
endpointsMap EndpointsMap
|
||||
@ -607,7 +607,7 @@ func newFakeProxier(ipFamily v1.IPFamily, t time.Time) *FakeProxier {
|
||||
svcPortMap: make(ServicePortMap),
|
||||
serviceChanges: NewServiceChangeTracker(nil, ipFamily, nil, nil),
|
||||
endpointsMap: make(EndpointsMap),
|
||||
endpointsChanges: &EndpointChangeTracker{
|
||||
endpointsChanges: &EndpointsChangeTracker{
|
||||
lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
|
||||
trackerStartTime: t,
|
||||
processEndpointsMapChange: nil,
|
||||
|
@ -32,13 +32,13 @@ import (
|
||||
|
||||
type HostNetworkService interface {
|
||||
getNetworkByName(name string) (*hnsNetworkInfo, error)
|
||||
getAllEndpointsByNetwork(networkName string) (map[string]*endpointsInfo, error)
|
||||
getEndpointByID(id string) (*endpointsInfo, error)
|
||||
getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error)
|
||||
getEndpointByName(id string) (*endpointsInfo, error)
|
||||
createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error)
|
||||
getAllEndpointsByNetwork(networkName string) (map[string]*endpointInfo, error)
|
||||
getEndpointByID(id string) (*endpointInfo, error)
|
||||
getEndpointByIpAddress(ip string, networkName string) (*endpointInfo, error)
|
||||
getEndpointByName(id string) (*endpointInfo, error)
|
||||
createEndpoint(ep *endpointInfo, networkName string) (*endpointInfo, error)
|
||||
deleteEndpoint(hnsID string) error
|
||||
getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error)
|
||||
getLoadBalancer(endpoints []endpointInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error)
|
||||
getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerInfo, error)
|
||||
deleteLoadBalancer(hnsID string) error
|
||||
}
|
||||
@ -87,7 +87,7 @@ func (hns hns) getNetworkByName(name string) (*hnsNetworkInfo, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (hns hns) getAllEndpointsByNetwork(networkName string) (map[string]*(endpointsInfo), error) {
|
||||
func (hns hns) getAllEndpointsByNetwork(networkName string) (map[string]*(endpointInfo), error) {
|
||||
hcnnetwork, err := hns.hcn.GetNetworkByName(networkName)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "failed to get HNS network by name", "name", networkName)
|
||||
@ -97,7 +97,7 @@ func (hns hns) getAllEndpointsByNetwork(networkName string) (map[string]*(endpoi
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to list endpoints: %w", err)
|
||||
}
|
||||
endpointInfos := make(map[string]*(endpointsInfo))
|
||||
endpointInfos := make(map[string]*(endpointInfo))
|
||||
for _, ep := range endpoints {
|
||||
|
||||
if len(ep.IpConfigurations) == 0 {
|
||||
@ -108,7 +108,7 @@ func (hns hns) getAllEndpointsByNetwork(networkName string) (map[string]*(endpoi
|
||||
// Add to map with key endpoint ID or IP address
|
||||
// Storing this is expensive in terms of memory, however there is a bug in Windows Server 2019 that can cause two endpoints to be created with the same IP address.
|
||||
// TODO: Store by IP only and remove any lookups by endpoint ID.
|
||||
endpointInfos[ep.Id] = &endpointsInfo{
|
||||
endpointInfos[ep.Id] = &endpointInfo{
|
||||
ip: ep.IpConfigurations[0].IpAddress,
|
||||
isLocal: uint32(ep.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0,
|
||||
macAddress: ep.MacAddress,
|
||||
@ -127,7 +127,7 @@ func (hns hns) getAllEndpointsByNetwork(networkName string) (map[string]*(endpoi
|
||||
|
||||
// If ipFamilyPolicy is RequireDualStack or PreferDualStack, then there will be 2 IPS (iPV4 and IPV6)
|
||||
// in the endpoint list
|
||||
endpointDualstack := &endpointsInfo{
|
||||
endpointDualstack := &endpointInfo{
|
||||
ip: ep.IpConfigurations[1].IpAddress,
|
||||
isLocal: uint32(ep.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0,
|
||||
macAddress: ep.MacAddress,
|
||||
@ -145,12 +145,12 @@ func (hns hns) getAllEndpointsByNetwork(networkName string) (map[string]*(endpoi
|
||||
return endpointInfos, nil
|
||||
}
|
||||
|
||||
func (hns hns) getEndpointByID(id string) (*endpointsInfo, error) {
|
||||
func (hns hns) getEndpointByID(id string) (*endpointInfo, error) {
|
||||
hnsendpoint, err := hns.hcn.GetEndpointByID(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &endpointsInfo{ //TODO: fill out PA
|
||||
return &endpointInfo{ //TODO: fill out PA
|
||||
ip: hnsendpoint.IpConfigurations[0].IpAddress,
|
||||
isLocal: uint32(hnsendpoint.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0, //TODO: Change isLocal to isRemote
|
||||
macAddress: hnsendpoint.MacAddress,
|
||||
@ -158,7 +158,7 @@ func (hns hns) getEndpointByID(id string) (*endpointsInfo, error) {
|
||||
hns: hns,
|
||||
}, nil
|
||||
}
|
||||
func (hns hns) getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error) {
|
||||
func (hns hns) getEndpointByIpAddress(ip string, networkName string) (*endpointInfo, error) {
|
||||
hnsnetwork, err := hns.hcn.GetNetworkByName(networkName)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Error getting network by name")
|
||||
@ -179,7 +179,7 @@ func (hns hns) getEndpointByIpAddress(ip string, networkName string) (*endpoints
|
||||
}
|
||||
}
|
||||
if equal && strings.EqualFold(endpoint.HostComputeNetwork, hnsnetwork.Id) {
|
||||
return &endpointsInfo{
|
||||
return &endpointInfo{
|
||||
ip: ip,
|
||||
isLocal: uint32(endpoint.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0, //TODO: Change isLocal to isRemote
|
||||
macAddress: endpoint.MacAddress,
|
||||
@ -190,12 +190,12 @@ func (hns hns) getEndpointByIpAddress(ip string, networkName string) (*endpoints
|
||||
}
|
||||
return nil, fmt.Errorf("Endpoint %v not found on network %s", ip, networkName)
|
||||
}
|
||||
func (hns hns) getEndpointByName(name string) (*endpointsInfo, error) {
|
||||
func (hns hns) getEndpointByName(name string) (*endpointInfo, error) {
|
||||
hnsendpoint, err := hns.hcn.GetEndpointByName(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &endpointsInfo{ //TODO: fill out PA
|
||||
return &endpointInfo{ //TODO: fill out PA
|
||||
ip: hnsendpoint.IpConfigurations[0].IpAddress,
|
||||
isLocal: uint32(hnsendpoint.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0, //TODO: Change isLocal to isRemote
|
||||
macAddress: hnsendpoint.MacAddress,
|
||||
@ -203,7 +203,7 @@ func (hns hns) getEndpointByName(name string) (*endpointsInfo, error) {
|
||||
hns: hns,
|
||||
}, nil
|
||||
}
|
||||
func (hns hns) createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error) {
|
||||
func (hns hns) createEndpoint(ep *endpointInfo, networkName string) (*endpointInfo, error) {
|
||||
hnsNetwork, err := hns.hcn.GetNetworkByName(networkName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -251,7 +251,7 @@ func (hns hns) createEndpoint(ep *endpointsInfo, networkName string) (*endpoints
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return &endpointsInfo{
|
||||
return &endpointInfo{
|
||||
ip: createdEndpoint.IpConfigurations[0].IpAddress,
|
||||
isLocal: uint32(createdEndpoint.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0,
|
||||
macAddress: createdEndpoint.MacAddress,
|
||||
@ -273,7 +273,7 @@ func (hns hns) deleteEndpoint(hnsID string) error {
|
||||
}
|
||||
|
||||
// findLoadBalancerID will construct a id from the provided loadbalancer fields
|
||||
func findLoadBalancerID(endpoints []endpointsInfo, vip string, protocol, internalPort, externalPort uint16) (loadBalancerIdentifier, error) {
|
||||
func findLoadBalancerID(endpoints []endpointInfo, vip string, protocol, internalPort, externalPort uint16) (loadBalancerIdentifier, error) {
|
||||
// Compute hash from backends (endpoint IDs)
|
||||
hash, err := hashEndpoints(endpoints)
|
||||
if err != nil {
|
||||
@ -315,7 +315,7 @@ func (hns hns) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerIn
|
||||
return loadBalancers, nil
|
||||
}
|
||||
|
||||
func (hns hns) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) {
|
||||
func (hns hns) getLoadBalancer(endpoints []endpointInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) {
|
||||
var id loadBalancerIdentifier
|
||||
vips := []string{}
|
||||
// Compute hash from backends (endpoint IDs)
|
||||
@ -424,7 +424,7 @@ func (hns hns) deleteLoadBalancer(hnsID string) error {
|
||||
}
|
||||
|
||||
// Calculates a hash from the given endpoint IDs.
|
||||
func hashEndpoints[T string | endpointsInfo](endpoints []T) (hash [20]byte, err error) {
|
||||
func hashEndpoints[T string | endpointInfo](endpoints []T) (hash [20]byte, err error) {
|
||||
var id string
|
||||
// Recover in case something goes wrong. Return error and null byte array.
|
||||
defer func() {
|
||||
@ -437,7 +437,7 @@ func hashEndpoints[T string | endpointsInfo](endpoints []T) (hash [20]byte, err
|
||||
// Iterate over endpoints, compute hash
|
||||
for _, ep := range endpoints {
|
||||
switch x := any(ep).(type) {
|
||||
case endpointsInfo:
|
||||
case endpointInfo:
|
||||
id = strings.ToUpper(x.hnsID)
|
||||
case string:
|
||||
id = x
|
||||
|
@ -203,7 +203,7 @@ func TestCreateEndpointLocal(t *testing.T) {
|
||||
hns := hns{hcn: newHcnImpl()}
|
||||
Network := mustTestNetwork(t)
|
||||
|
||||
endpoint := &endpointsInfo{
|
||||
endpoint := &endpointInfo{
|
||||
ip: epIpAddress,
|
||||
macAddress: epMacAddress,
|
||||
isLocal: true,
|
||||
@ -242,7 +242,7 @@ func TestCreateEndpointRemote(t *testing.T) {
|
||||
Network := mustTestNetwork(t)
|
||||
providerAddress := epPaAddress
|
||||
|
||||
endpoint := &endpointsInfo{
|
||||
endpoint := &endpointInfo{
|
||||
ip: epIpAddressRemote,
|
||||
macAddress: epMacAddress,
|
||||
isLocal: false,
|
||||
@ -350,11 +350,11 @@ func TestGetLoadBalancerExisting(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
endpoint := &endpointsInfo{
|
||||
endpoint := &endpointInfo{
|
||||
ip: Endpoint.IpConfigurations[0].IpAddress,
|
||||
hnsID: Endpoint.Id,
|
||||
}
|
||||
endpoints := []endpointsInfo{*endpoint}
|
||||
endpoints := []endpointInfo{*endpoint}
|
||||
hash, err := hashEndpoints(endpoints)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
@ -409,11 +409,11 @@ func TestGetLoadBalancerNew(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
endpoint := &endpointsInfo{
|
||||
endpoint := &endpointInfo{
|
||||
ip: Endpoint.IpConfigurations[0].IpAddress,
|
||||
hnsID: Endpoint.Id,
|
||||
}
|
||||
endpoints := []endpointsInfo{*endpoint}
|
||||
endpoints := []endpointInfo{*endpoint}
|
||||
lb, err := hns.getLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort, lbs)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
@ -523,7 +523,7 @@ func TestHashEndpoints(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
endpointInfoA := &endpointsInfo{
|
||||
endpointInfoA := &endpointInfo{
|
||||
ip: endpointA.IpConfigurations[0].IpAddress,
|
||||
hnsID: endpointA.Id,
|
||||
}
|
||||
@ -543,12 +543,12 @@ func TestHashEndpoints(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
endpointInfoB := &endpointsInfo{
|
||||
endpointInfoB := &endpointInfo{
|
||||
ip: endpointB.IpConfigurations[0].IpAddress,
|
||||
hnsID: endpointB.Id,
|
||||
}
|
||||
endpoints := []endpointsInfo{*endpointInfoA, *endpointInfoB}
|
||||
endpointsReverse := []endpointsInfo{*endpointInfoB, *endpointInfoA}
|
||||
endpoints := []endpointInfo{*endpointInfoA, *endpointInfoB}
|
||||
endpointsReverse := []endpointInfo{*endpointInfoB, *endpointInfoA}
|
||||
h1, err := hashEndpoints(endpoints)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
|
@ -123,7 +123,7 @@ type serviceInfo struct {
|
||||
hnsID string
|
||||
nodePorthnsID string
|
||||
policyApplied bool
|
||||
remoteEndpoint *endpointsInfo
|
||||
remoteEndpoint *endpointInfo
|
||||
hns HostNetworkService
|
||||
preserveDIP bool
|
||||
localTrafficDSR bool
|
||||
@ -273,7 +273,7 @@ func (t DualStackCompatTester) DualStackCompatible(networkName string) bool {
|
||||
}
|
||||
|
||||
// internal struct for endpoints information
|
||||
type endpointsInfo struct {
|
||||
type endpointInfo struct {
|
||||
ip string
|
||||
port uint16
|
||||
isLocal bool
|
||||
@ -290,57 +290,57 @@ type endpointsInfo struct {
|
||||
}
|
||||
|
||||
// String is part of proxy.Endpoint interface.
|
||||
func (info *endpointsInfo) String() string {
|
||||
func (info *endpointInfo) String() string {
|
||||
return net.JoinHostPort(info.ip, strconv.Itoa(int(info.port)))
|
||||
}
|
||||
|
||||
// GetIsLocal is part of proxy.Endpoint interface.
|
||||
func (info *endpointsInfo) GetIsLocal() bool {
|
||||
func (info *endpointInfo) GetIsLocal() bool {
|
||||
return info.isLocal
|
||||
}
|
||||
|
||||
// IsReady returns true if an endpoint is ready and not terminating.
|
||||
func (info *endpointsInfo) IsReady() bool {
|
||||
func (info *endpointInfo) IsReady() bool {
|
||||
return info.ready
|
||||
}
|
||||
|
||||
// IsServing returns true if an endpoint is ready, regardless of it's terminating state.
|
||||
func (info *endpointsInfo) IsServing() bool {
|
||||
func (info *endpointInfo) IsServing() bool {
|
||||
return info.serving
|
||||
}
|
||||
|
||||
// IsTerminating returns true if an endpoint is terminating.
|
||||
func (info *endpointsInfo) IsTerminating() bool {
|
||||
func (info *endpointInfo) IsTerminating() bool {
|
||||
return info.terminating
|
||||
}
|
||||
|
||||
// GetZoneHint returns the zone hint for the endpoint.
|
||||
func (info *endpointsInfo) GetZoneHints() sets.Set[string] {
|
||||
func (info *endpointInfo) GetZoneHints() sets.Set[string] {
|
||||
return sets.Set[string]{}
|
||||
}
|
||||
|
||||
// IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface.
|
||||
func (info *endpointsInfo) IP() string {
|
||||
func (info *endpointInfo) IP() string {
|
||||
return info.ip
|
||||
}
|
||||
|
||||
// Port returns just the Port part of the endpoint.
|
||||
func (info *endpointsInfo) Port() (int, error) {
|
||||
func (info *endpointInfo) Port() (int, error) {
|
||||
return int(info.port), nil
|
||||
}
|
||||
|
||||
// Equal is part of proxy.Endpoint interface.
|
||||
func (info *endpointsInfo) Equal(other proxy.Endpoint) bool {
|
||||
func (info *endpointInfo) Equal(other proxy.Endpoint) bool {
|
||||
return info.String() == other.String() && info.GetIsLocal() == other.GetIsLocal()
|
||||
}
|
||||
|
||||
// GetNodeName returns the NodeName for this endpoint.
|
||||
func (info *endpointsInfo) GetNodeName() string {
|
||||
func (info *endpointInfo) GetNodeName() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
// GetZone returns the Zone for this endpoint.
|
||||
func (info *endpointsInfo) GetZone() string {
|
||||
func (info *endpointInfo) GetZone() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
@ -407,7 +407,7 @@ func (proxier *Proxier) onEndpointsMapChange(svcPortName *proxy.ServicePortName,
|
||||
if exists {
|
||||
// Cleanup Endpoints references
|
||||
for _, ep := range epInfos {
|
||||
epInfo, ok := ep.(*endpointsInfo)
|
||||
epInfo, ok := ep.(*endpointInfo)
|
||||
|
||||
if ok {
|
||||
epInfo.Cleanup()
|
||||
@ -448,7 +448,7 @@ func (proxier *Proxier) onServiceMapChange(svcPortName *proxy.ServicePortName) {
|
||||
}
|
||||
}
|
||||
|
||||
// returns a new proxy.Endpoint which abstracts a endpointsInfo
|
||||
// returns a new proxy.Endpoint which abstracts a endpointInfo
|
||||
func (proxier *Proxier) newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, _ *proxy.ServicePortName) proxy.Endpoint {
|
||||
|
||||
portNumber, err := baseInfo.Port()
|
||||
@ -457,7 +457,7 @@ func (proxier *Proxier) newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, _ *pro
|
||||
portNumber = 0
|
||||
}
|
||||
|
||||
info := &endpointsInfo{
|
||||
info := &endpointInfo{
|
||||
ip: baseInfo.IP(),
|
||||
port: uint16(portNumber),
|
||||
isLocal: baseInfo.GetIsLocal(),
|
||||
@ -474,8 +474,8 @@ func (proxier *Proxier) newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, _ *pro
|
||||
return info
|
||||
}
|
||||
|
||||
func newSourceVIP(hns HostNetworkService, network string, ip string, mac string, providerAddress string) (*endpointsInfo, error) {
|
||||
hnsEndpoint := &endpointsInfo{
|
||||
func newSourceVIP(hns HostNetworkService, network string, ip string, mac string, providerAddress string) (*endpointInfo, error) {
|
||||
hnsEndpoint := &endpointInfo{
|
||||
ip: ip,
|
||||
isLocal: true,
|
||||
macAddress: mac,
|
||||
@ -489,15 +489,15 @@ func newSourceVIP(hns HostNetworkService, network string, ip string, mac string,
|
||||
return ep, err
|
||||
}
|
||||
|
||||
func (ep *endpointsInfo) DecrementRefCount() {
|
||||
klog.V(3).InfoS("Decrementing Endpoint RefCount", "endpointsInfo", ep)
|
||||
func (ep *endpointInfo) DecrementRefCount() {
|
||||
klog.V(3).InfoS("Decrementing Endpoint RefCount", "endpointInfo", ep)
|
||||
if !ep.GetIsLocal() && ep.refCount != nil && *ep.refCount > 0 {
|
||||
*ep.refCount--
|
||||
}
|
||||
}
|
||||
|
||||
func (ep *endpointsInfo) Cleanup() {
|
||||
klog.V(3).InfoS("Endpoint cleanup", "endpointsInfo", ep)
|
||||
func (ep *endpointInfo) Cleanup() {
|
||||
klog.V(3).InfoS("Endpoint cleanup", "endpointInfo", ep)
|
||||
if !ep.GetIsLocal() && ep.refCount != nil {
|
||||
*ep.refCount--
|
||||
|
||||
@ -601,7 +601,7 @@ type Proxier struct {
|
||||
// services that happened since policies were synced. For a single object,
|
||||
// changes are accumulated, i.e. previous is state from before all of them,
|
||||
// current is state after applying all of those.
|
||||
endpointsChanges *proxy.EndpointChangeTracker
|
||||
endpointsChanges *proxy.EndpointsChangeTracker
|
||||
serviceChanges *proxy.ServiceChangeTracker
|
||||
endPointsRefCount endPointsReferenceCountMap
|
||||
mu sync.Mutex // protects the following fields
|
||||
@ -802,7 +802,7 @@ func NewProxier(
|
||||
}
|
||||
|
||||
serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, ipFamily, recorder, proxier.serviceMapChange)
|
||||
endPointChangeTracker := proxy.NewEndpointChangeTracker(hostname, proxier.newEndpointInfo, ipFamily, recorder, proxier.endpointsMapChange)
|
||||
endPointChangeTracker := proxy.NewEndpointsChangeTracker(hostname, proxier.newEndpointInfo, ipFamily, recorder, proxier.endpointsMapChange)
|
||||
proxier.endpointsChanges = endPointChangeTracker
|
||||
proxier.serviceChanges = serviceChanges
|
||||
|
||||
@ -868,7 +868,7 @@ func (svcInfo *serviceInfo) cleanupAllPolicies(endpoints []proxy.Endpoint, mapSt
|
||||
}
|
||||
// Cleanup Endpoints references
|
||||
for _, ep := range endpoints {
|
||||
epInfo, ok := ep.(*endpointsInfo)
|
||||
epInfo, ok := ep.(*endpointInfo)
|
||||
if ok {
|
||||
if winProxyOptimization {
|
||||
epInfo.DecrementRefCount()
|
||||
@ -1062,7 +1062,7 @@ func isNetworkNotFoundError(err error) bool {
|
||||
// If atleast one is not terminating, then return false
|
||||
func (proxier *Proxier) isAllEndpointsTerminating(svcName proxy.ServicePortName, isLocalTrafficDSR bool) bool {
|
||||
for _, epInfo := range proxier.endpointsMap[svcName] {
|
||||
ep, ok := epInfo.(*endpointsInfo)
|
||||
ep, ok := epInfo.(*endpointInfo)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
@ -1087,7 +1087,7 @@ func (proxier *Proxier) isAllEndpointsTerminating(svcName proxy.ServicePortName,
|
||||
// If atleast one is serving, then return false
|
||||
func (proxier *Proxier) isAllEndpointsNonServing(svcName proxy.ServicePortName, isLocalTrafficDSR bool) bool {
|
||||
for _, epInfo := range proxier.endpointsMap[svcName] {
|
||||
ep, ok := epInfo.(*endpointsInfo)
|
||||
ep, ok := epInfo.(*endpointInfo)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
@ -1102,7 +1102,7 @@ func (proxier *Proxier) isAllEndpointsNonServing(svcName proxy.ServicePortName,
|
||||
}
|
||||
|
||||
// updateQueriedEndpoints updates the queriedEndpoints map with newly created endpoint details
|
||||
func updateQueriedEndpoints(newHnsEndpoint *endpointsInfo, queriedEndpoints map[string]*endpointsInfo) {
|
||||
func updateQueriedEndpoints(newHnsEndpoint *endpointInfo, queriedEndpoints map[string]*endpointInfo) {
|
||||
// store newly created endpoints in queriedEndpoints
|
||||
queriedEndpoints[newHnsEndpoint.hnsID] = newHnsEndpoint
|
||||
queriedEndpoints[newHnsEndpoint.ip] = newHnsEndpoint
|
||||
@ -1130,7 +1130,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
hnsNetworkName := proxier.network.name
|
||||
hns := proxier.hns
|
||||
|
||||
var gatewayHnsendpoint *endpointsInfo
|
||||
var gatewayHnsendpoint *endpointInfo
|
||||
if proxier.forwardHealthCheckVip {
|
||||
gatewayHnsendpoint, _ = hns.getEndpointByName(proxier.rootHnsEndpointName)
|
||||
}
|
||||
@ -1153,7 +1153,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
|
||||
|
||||
deletedUDPClusterIPs := serviceUpdateResult.DeletedUDPClusterIPs
|
||||
// merge stale services gathered from updateEndpointsMap
|
||||
// merge stale services gathered from EndpointsMap.Update
|
||||
for _, svcPortName := range endpointUpdateResult.NewlyActiveUDPServices {
|
||||
if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
|
||||
klog.V(2).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName)
|
||||
@ -1168,7 +1168,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
if queriedEndpoints == nil {
|
||||
klog.V(4).InfoS("No existing endpoints found in HNS")
|
||||
queriedEndpoints = make(map[string]*(endpointsInfo))
|
||||
queriedEndpoints = make(map[string]*(endpointInfo))
|
||||
}
|
||||
queriedLoadBalancers, err := hns.getAllLoadBalancers()
|
||||
if queriedLoadBalancers == nil {
|
||||
@ -1208,7 +1208,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
serviceVipEndpoint := queriedEndpoints[svcInfo.ClusterIP().String()]
|
||||
if serviceVipEndpoint == nil {
|
||||
klog.V(4).InfoS("No existing remote endpoint", "IP", svcInfo.ClusterIP())
|
||||
hnsEndpoint := &endpointsInfo{
|
||||
hnsEndpoint := &endpointInfo{
|
||||
ip: svcInfo.ClusterIP().String(),
|
||||
isLocal: false,
|
||||
macAddress: proxier.hostMac,
|
||||
@ -1228,8 +1228,8 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
}
|
||||
|
||||
var hnsEndpoints []endpointsInfo
|
||||
var hnsLocalEndpoints []endpointsInfo
|
||||
var hnsEndpoints []endpointInfo
|
||||
var hnsLocalEndpoints []endpointInfo
|
||||
klog.V(4).InfoS("Applying Policy", "serviceInfo", svcName)
|
||||
// Create Remote endpoints for every endpoint, corresponding to the service
|
||||
containsPublicIP := false
|
||||
@ -1249,9 +1249,9 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
|
||||
for _, epInfo := range proxier.endpointsMap[svcName] {
|
||||
ep, ok := epInfo.(*endpointsInfo)
|
||||
ep, ok := epInfo.(*endpointInfo)
|
||||
if !ok {
|
||||
klog.ErrorS(nil, "Failed to cast endpointsInfo", "serviceName", svcName)
|
||||
klog.ErrorS(nil, "Failed to cast endpointInfo", "serviceName", svcName)
|
||||
continue
|
||||
}
|
||||
|
||||
@ -1274,7 +1274,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
|
||||
}
|
||||
|
||||
var newHnsEndpoint *endpointsInfo
|
||||
var newHnsEndpoint *endpointInfo
|
||||
hnsNetworkName := proxier.network.name
|
||||
var err error
|
||||
|
||||
@ -1319,7 +1319,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
providerAddress = proxier.nodeIP.String()
|
||||
}
|
||||
|
||||
hnsEndpoint := &endpointsInfo{
|
||||
hnsEndpoint := &endpointInfo{
|
||||
ip: ep.ip,
|
||||
isLocal: false,
|
||||
macAddress: conjureMac("02-11", netutils.ParseIPSloppy(ep.ip)),
|
||||
@ -1328,13 +1328,13 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
|
||||
newHnsEndpoint, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Remote endpoint creation failed", "endpointsInfo", hnsEndpoint)
|
||||
klog.ErrorS(err, "Remote endpoint creation failed", "endpointInfo", hnsEndpoint)
|
||||
continue
|
||||
}
|
||||
updateQueriedEndpoints(newHnsEndpoint, queriedEndpoints)
|
||||
} else {
|
||||
|
||||
hnsEndpoint := &endpointsInfo{
|
||||
hnsEndpoint := &endpointInfo{
|
||||
ip: ep.ip,
|
||||
isLocal: false,
|
||||
macAddress: ep.macAddress,
|
||||
@ -1371,7 +1371,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
|
||||
// Save the hnsId for reference
|
||||
klog.V(1).InfoS("Hns endpoint resource", "endpointsInfo", newHnsEndpoint)
|
||||
klog.V(1).InfoS("Hns endpoint resource", "endpointInfo", newHnsEndpoint)
|
||||
|
||||
hnsEndpoints = append(hnsEndpoints, *newHnsEndpoint)
|
||||
if newHnsEndpoint.GetIsLocal() {
|
||||
@ -1384,10 +1384,10 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
|
||||
ep.hnsID = newHnsEndpoint.hnsID
|
||||
|
||||
klog.V(3).InfoS("Endpoint resource found", "endpointsInfo", ep)
|
||||
klog.V(3).InfoS("Endpoint resource found", "endpointInfo", ep)
|
||||
}
|
||||
|
||||
klog.V(3).InfoS("Associated endpoints for service", "endpointsInfo", hnsEndpoints, "serviceName", svcName)
|
||||
klog.V(3).InfoS("Associated endpoints for service", "endpointInfo", hnsEndpoints, "serviceName", svcName)
|
||||
|
||||
if len(svcInfo.hnsID) > 0 {
|
||||
// This should not happen
|
||||
@ -1399,7 +1399,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
if len(hnsEndpoints) == 0 {
|
||||
if svcInfo.winProxyOptimization {
|
||||
// Deleting loadbalancers when there are no endpoints to serve.
|
||||
klog.V(3).InfoS("Cleanup existing ", "endpointsInfo", hnsEndpoints, "serviceName", svcName)
|
||||
klog.V(3).InfoS("Cleanup existing ", "endpointInfo", hnsEndpoints, "serviceName", svcName)
|
||||
svcInfo.deleteLoadBalancerPolicy(proxier.mapStaleLoadbalancers)
|
||||
}
|
||||
klog.ErrorS(nil, "Endpoint information not available for service, not applying any policy", "serviceName", svcName)
|
||||
@ -1560,10 +1560,10 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
nodeport = svcInfo.HealthCheckNodePort()
|
||||
}
|
||||
|
||||
proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.healthCheckHnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), []endpointsInfo{*gatewayHnsendpoint}, queriedLoadBalancers)
|
||||
proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.healthCheckHnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), []endpointInfo{*gatewayHnsendpoint}, queriedLoadBalancers)
|
||||
|
||||
hnsHealthCheckLoadBalancer, err := hns.getLoadBalancer(
|
||||
[]endpointsInfo{*gatewayHnsendpoint},
|
||||
[]endpointInfo{*gatewayHnsendpoint},
|
||||
loadBalancerFlags{isDSR: false, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP},
|
||||
sourceVip,
|
||||
lbIngressIP.ip,
|
||||
@ -1623,7 +1623,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
|
||||
// deleteExistingLoadBalancer checks whether loadbalancer delete is needed or not.
|
||||
// If it is needed, the function will delete the existing loadbalancer and return true, else false.
|
||||
func (proxier *Proxier) deleteExistingLoadBalancer(hns HostNetworkService, winProxyOptimization bool, lbHnsID *string, sourceVip string, protocol, intPort, extPort uint16, endpoints []endpointsInfo, queriedLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) bool {
|
||||
func (proxier *Proxier) deleteExistingLoadBalancer(hns HostNetworkService, winProxyOptimization bool, lbHnsID *string, sourceVip string, protocol, intPort, extPort uint16, endpoints []endpointInfo, queriedLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) bool {
|
||||
|
||||
if !winProxyOptimization || *lbHnsID == "" {
|
||||
// Loadbalancer delete not needed
|
||||
|
@ -123,8 +123,8 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clust
|
||||
}
|
||||
|
||||
serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, v1.IPv4Protocol, nil, proxier.serviceMapChange)
|
||||
endpointChangeTracker := proxy.NewEndpointChangeTracker(hostname, proxier.newEndpointInfo, v1.IPv4Protocol, nil, proxier.endpointsMapChange)
|
||||
proxier.endpointsChanges = endpointChangeTracker
|
||||
endpointsChangeTracker := proxy.NewEndpointsChangeTracker(hostname, proxier.newEndpointInfo, v1.IPv4Protocol, nil, proxier.endpointsMapChange)
|
||||
proxier.endpointsChanges = endpointsChangeTracker
|
||||
proxier.serviceChanges = serviceChanges
|
||||
|
||||
return proxier
|
||||
@ -232,9 +232,9 @@ func TestCreateRemoteEndpointOverlay(t *testing.T) {
|
||||
proxier.syncProxyRules()
|
||||
|
||||
ep := proxier.endpointsMap[svcPortName][0]
|
||||
epInfo, ok := ep.(*endpointsInfo)
|
||||
epInfo, ok := ep.(*endpointInfo)
|
||||
if !ok {
|
||||
t.Errorf("Failed to cast endpointsInfo %q", svcPortName.String())
|
||||
t.Errorf("Failed to cast endpointInfo %q", svcPortName.String())
|
||||
|
||||
} else {
|
||||
if epInfo.hnsID != "EPID-3" {
|
||||
@ -296,9 +296,9 @@ func TestCreateRemoteEndpointL2Bridge(t *testing.T) {
|
||||
proxier.setInitialized(true)
|
||||
proxier.syncProxyRules()
|
||||
ep := proxier.endpointsMap[svcPortName][0]
|
||||
epInfo, ok := ep.(*endpointsInfo)
|
||||
epInfo, ok := ep.(*endpointInfo)
|
||||
if !ok {
|
||||
t.Errorf("Failed to cast endpointsInfo %q", svcPortName.String())
|
||||
t.Errorf("Failed to cast endpointInfo %q", svcPortName.String())
|
||||
|
||||
} else {
|
||||
if epInfo.hnsID != endpointGuid1 {
|
||||
@ -389,9 +389,9 @@ func TestSharedRemoteEndpointDelete(t *testing.T) {
|
||||
proxier.setInitialized(true)
|
||||
proxier.syncProxyRules()
|
||||
ep := proxier.endpointsMap[svcPortName1][0]
|
||||
epInfo, ok := ep.(*endpointsInfo)
|
||||
epInfo, ok := ep.(*endpointInfo)
|
||||
if !ok {
|
||||
t.Errorf("Failed to cast endpointsInfo %q", svcPortName1.String())
|
||||
t.Errorf("Failed to cast endpointInfo %q", svcPortName1.String())
|
||||
|
||||
} else {
|
||||
if epInfo.hnsID != endpointGuid1 {
|
||||
@ -439,9 +439,9 @@ func TestSharedRemoteEndpointDelete(t *testing.T) {
|
||||
proxier.syncProxyRules()
|
||||
|
||||
ep = proxier.endpointsMap[svcPortName1][0]
|
||||
epInfo, ok = ep.(*endpointsInfo)
|
||||
epInfo, ok = ep.(*endpointInfo)
|
||||
if !ok {
|
||||
t.Errorf("Failed to cast endpointsInfo %q", svcPortName1.String())
|
||||
t.Errorf("Failed to cast endpointInfo %q", svcPortName1.String())
|
||||
|
||||
} else {
|
||||
if epInfo.hnsID != endpointGuid1 {
|
||||
@ -534,9 +534,9 @@ func TestSharedRemoteEndpointUpdate(t *testing.T) {
|
||||
proxier.setInitialized(true)
|
||||
proxier.syncProxyRules()
|
||||
ep := proxier.endpointsMap[svcPortName1][0]
|
||||
epInfo, ok := ep.(*endpointsInfo)
|
||||
epInfo, ok := ep.(*endpointInfo)
|
||||
if !ok {
|
||||
t.Errorf("Failed to cast endpointsInfo %q", svcPortName1.String())
|
||||
t.Errorf("Failed to cast endpointInfo %q", svcPortName1.String())
|
||||
|
||||
} else {
|
||||
if epInfo.hnsID != endpointGuid1 {
|
||||
@ -613,10 +613,10 @@ func TestSharedRemoteEndpointUpdate(t *testing.T) {
|
||||
proxier.syncProxyRules()
|
||||
|
||||
ep = proxier.endpointsMap[svcPortName1][0]
|
||||
epInfo, ok = ep.(*endpointsInfo)
|
||||
epInfo, ok = ep.(*endpointInfo)
|
||||
|
||||
if !ok {
|
||||
t.Errorf("Failed to cast endpointsInfo %q", svcPortName1.String())
|
||||
t.Errorf("Failed to cast endpointInfo %q", svcPortName1.String())
|
||||
|
||||
} else {
|
||||
if epInfo.hnsID != endpointGuid1 {
|
||||
@ -913,9 +913,9 @@ func TestEndpointSlice(t *testing.T) {
|
||||
}
|
||||
|
||||
ep := proxier.endpointsMap[svcPortName][0]
|
||||
epInfo, ok := ep.(*endpointsInfo)
|
||||
epInfo, ok := ep.(*endpointInfo)
|
||||
if !ok {
|
||||
t.Errorf("Failed to cast endpointsInfo %q", svcPortName.String())
|
||||
t.Errorf("Failed to cast endpointInfo %q", svcPortName.String())
|
||||
|
||||
} else {
|
||||
if epInfo.hnsID != "EPID-3" {
|
||||
|
Loading…
Reference in New Issue
Block a user