Adding new logging, event, and metric to better capture when mirroring addresses is skipped
This commit is contained in:
@@ -161,6 +161,7 @@ type Controller struct {
|
|||||||
// has been synced at least once. Added as a member to the struct to allow
|
// has been synced at least once. Added as a member to the struct to allow
|
||||||
// injection for testing.
|
// injection for testing.
|
||||||
endpointSlicesSynced cache.InformerSynced
|
endpointSlicesSynced cache.InformerSynced
|
||||||
|
|
||||||
// endpointSliceTracker tracks the list of EndpointSlices and associated
|
// endpointSliceTracker tracks the list of EndpointSlices and associated
|
||||||
// resource versions expected for each Endpoints resource. It can help
|
// resource versions expected for each Endpoints resource. It can help
|
||||||
// determine if a cached EndpointSlice is out of date.
|
// determine if a cached EndpointSlice is out of date.
|
||||||
|
@@ -172,7 +172,7 @@ func TestSyncEndpoints(t *testing.T) {
|
|||||||
}},
|
}},
|
||||||
},
|
},
|
||||||
endpointSlices: []*discovery.EndpointSlice{},
|
endpointSlices: []*discovery.EndpointSlice{},
|
||||||
expectedNumActions: 1,
|
expectedNumActions: 2, // extra action for creating warning event
|
||||||
expectedNumSlices: 1,
|
expectedNumSlices: 1,
|
||||||
}}
|
}}
|
||||||
|
|
||||||
|
@@ -26,4 +26,8 @@ const (
|
|||||||
// InvalidIPAddress indicates that an IP address found in an Endpoints
|
// InvalidIPAddress indicates that an IP address found in an Endpoints
|
||||||
// resource is invalid.
|
// resource is invalid.
|
||||||
InvalidIPAddress = "InvalidIPAddress"
|
InvalidIPAddress = "InvalidIPAddress"
|
||||||
|
// TooManyAddressesToMirror indicates that some addresses were not mirrored
|
||||||
|
// due to an EndpointSubset containing more addresses to mirror than
|
||||||
|
// MaxEndpointsPerSubset allows.
|
||||||
|
TooManyAddressesToMirror = "TooManyAddressesToMirror"
|
||||||
)
|
)
|
||||||
|
@@ -64,6 +64,18 @@ var (
|
|||||||
},
|
},
|
||||||
[]string{},
|
[]string{},
|
||||||
)
|
)
|
||||||
|
// AddressesSkippedPerSync tracks the number of addresses skipped on each
|
||||||
|
// Endpoints sync due to being invalid or exceeding MaxEndpointsPerSubset.
|
||||||
|
AddressesSkippedPerSync = metrics.NewHistogramVec(
|
||||||
|
&metrics.HistogramOpts{
|
||||||
|
Subsystem: EndpointSliceMirroringSubsystem,
|
||||||
|
Name: "addresses_skipped_per_sync",
|
||||||
|
Help: "Number of addresses skipped on each Endpoints sync due to being invalid or exceeding MaxEndpointsPerSubset",
|
||||||
|
StabilityLevel: metrics.ALPHA,
|
||||||
|
Buckets: metrics.ExponentialBuckets(2, 2, 15),
|
||||||
|
},
|
||||||
|
[]string{},
|
||||||
|
)
|
||||||
// EndpointsSyncDuration tracks how long syncEndpoints() takes in a number
|
// EndpointsSyncDuration tracks how long syncEndpoints() takes in a number
|
||||||
// of Seconds.
|
// of Seconds.
|
||||||
EndpointsSyncDuration = metrics.NewHistogramVec(
|
EndpointsSyncDuration = metrics.NewHistogramVec(
|
||||||
@@ -127,6 +139,7 @@ func RegisterMetrics() {
|
|||||||
legacyregistry.MustRegister(EndpointsAddedPerSync)
|
legacyregistry.MustRegister(EndpointsAddedPerSync)
|
||||||
legacyregistry.MustRegister(EndpointsUpdatedPerSync)
|
legacyregistry.MustRegister(EndpointsUpdatedPerSync)
|
||||||
legacyregistry.MustRegister(EndpointsRemovedPerSync)
|
legacyregistry.MustRegister(EndpointsRemovedPerSync)
|
||||||
|
legacyregistry.MustRegister(AddressesSkippedPerSync)
|
||||||
legacyregistry.MustRegister(EndpointsSyncDuration)
|
legacyregistry.MustRegister(EndpointsSyncDuration)
|
||||||
legacyregistry.MustRegister(EndpointsDesired)
|
legacyregistry.MustRegister(EndpointsDesired)
|
||||||
legacyregistry.MustRegister(NumEndpointSlices)
|
legacyregistry.MustRegister(NumEndpointSlices)
|
||||||
|
@@ -37,10 +37,24 @@ import (
|
|||||||
// desired state
|
// desired state
|
||||||
type reconciler struct {
|
type reconciler struct {
|
||||||
client clientset.Interface
|
client clientset.Interface
|
||||||
maxEndpointsPerSubset int32
|
|
||||||
|
// endpointSliceTracker tracks the list of EndpointSlices and associated
|
||||||
|
// resource versions expected for each Endpoints resource. It can help
|
||||||
|
// determine if a cached EndpointSlice is out of date.
|
||||||
endpointSliceTracker *endpointSliceTracker
|
endpointSliceTracker *endpointSliceTracker
|
||||||
metricsCache *metrics.Cache
|
|
||||||
|
// eventRecorder allows reconciler to record an event if it finds an invalid
|
||||||
|
// IP address in an Endpoints resource.
|
||||||
eventRecorder record.EventRecorder
|
eventRecorder record.EventRecorder
|
||||||
|
|
||||||
|
// maxEndpointsPerSubset references the maximum number of endpoints that
|
||||||
|
// should be added to an EndpointSlice for an EndpointSubset. This allows
|
||||||
|
// for a simple 1:1 mapping between EndpointSubset and EndpointSlice.
|
||||||
|
maxEndpointsPerSubset int32
|
||||||
|
|
||||||
|
// metricsCache tracks values for total numbers of desired endpoints as well
|
||||||
|
// as the efficiency of EndpointSlice endpoints distribution
|
||||||
|
metricsCache *metrics.Cache
|
||||||
}
|
}
|
||||||
|
|
||||||
// reconcile takes an Endpoints resource and ensures that corresponding
|
// reconcile takes an Endpoints resource and ensures that corresponding
|
||||||
@@ -50,38 +64,65 @@ func (r *reconciler) reconcile(endpoints *corev1.Endpoints, existingSlices []*di
|
|||||||
// Calculate desired state.
|
// Calculate desired state.
|
||||||
d := newDesiredCalc()
|
d := newDesiredCalc()
|
||||||
|
|
||||||
|
numInvalidAddresses := 0
|
||||||
|
addressesSkipped := 0
|
||||||
|
|
||||||
for _, subset := range endpoints.Subsets {
|
for _, subset := range endpoints.Subsets {
|
||||||
multiKey := d.initPorts(subset.Ports)
|
multiKey := d.initPorts(subset.Ports)
|
||||||
|
|
||||||
totalAddresses := 0
|
totalAddresses := len(subset.Addresses) + len(subset.NotReadyAddresses)
|
||||||
numInvalidAddresses := 0
|
totalAddressesAdded := 0
|
||||||
|
|
||||||
for _, address := range subset.Addresses {
|
for _, address := range subset.Addresses {
|
||||||
totalAddresses++
|
// Break if we've reached the max number of addresses to mirror
|
||||||
if totalAddresses > int(r.maxEndpointsPerSubset) {
|
// per EndpointSubset. This allows for a simple 1:1 mapping between
|
||||||
|
// EndpointSubset and EndpointSlice.
|
||||||
|
if totalAddressesAdded >= int(r.maxEndpointsPerSubset) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if ok := d.addAddress(address, multiKey, true); !ok {
|
if ok := d.addAddress(address, multiKey, true); ok {
|
||||||
|
totalAddressesAdded++
|
||||||
|
} else {
|
||||||
numInvalidAddresses++
|
numInvalidAddresses++
|
||||||
klog.Warningf("Address in %s/%s Endpoints is not a valid IP, it will not be mirrored to an EndpointSlice: %s", endpoints.Namespace, endpoints.Name, address.IP)
|
klog.Warningf("Address in %s/%s Endpoints is not a valid IP, it will not be mirrored to an EndpointSlice: %s", endpoints.Namespace, endpoints.Name, address.IP)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, address := range subset.NotReadyAddresses {
|
for _, address := range subset.NotReadyAddresses {
|
||||||
totalAddresses++
|
// Break if we've reached the max number of addresses to mirror
|
||||||
if totalAddresses > int(r.maxEndpointsPerSubset) {
|
// per EndpointSubset. This allows for a simple 1:1 mapping between
|
||||||
|
// EndpointSubset and EndpointSlice.
|
||||||
|
if totalAddressesAdded >= int(r.maxEndpointsPerSubset) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if ok := d.addAddress(address, multiKey, false); !ok {
|
if ok := d.addAddress(address, multiKey, true); ok {
|
||||||
|
totalAddressesAdded++
|
||||||
|
} else {
|
||||||
numInvalidAddresses++
|
numInvalidAddresses++
|
||||||
klog.Warningf("Address in %s/%s Endpoints is not a valid IP, it will not be mirrored to an EndpointSlice: %s", endpoints.Namespace, endpoints.Name, address.IP)
|
klog.Warningf("Address in %s/%s Endpoints is not a valid IP, it will not be mirrored to an EndpointSlice: %s", endpoints.Namespace, endpoints.Name, address.IP)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
addressesSkipped += totalAddresses - totalAddressesAdded
|
||||||
|
}
|
||||||
|
|
||||||
|
// This metric includes addresses skipped for being invalid or exceeding
|
||||||
|
// MaxEndpointsPerSubset.
|
||||||
|
metrics.AddressesSkippedPerSync.WithLabelValues().Observe(float64(addressesSkipped))
|
||||||
|
|
||||||
|
// Record an event on the Endpoints resource if we skipped mirroring for any
|
||||||
|
// invalid IP addresses.
|
||||||
if numInvalidAddresses > 0 {
|
if numInvalidAddresses > 0 {
|
||||||
r.eventRecorder.Eventf(endpoints, corev1.EventTypeWarning, InvalidIPAddress,
|
r.eventRecorder.Eventf(endpoints, corev1.EventTypeWarning, InvalidIPAddress,
|
||||||
"Skipped %d invalid IP addresses when mirroring to EndpointSlices", numInvalidAddresses)
|
"Skipped %d invalid IP addresses when mirroring to EndpointSlices", numInvalidAddresses)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Record a separate event if we skipped mirroring due to the number of
|
||||||
|
// addresses exceeding MaxEndpointsPerSubset.
|
||||||
|
if addressesSkipped > numInvalidAddresses {
|
||||||
|
klog.Warningf("%d addresses in %s/%s Endpoints were skipped due to exceeding MaxEndpointsPerSubset", addressesSkipped, endpoints.Namespace, endpoints.Name)
|
||||||
|
r.eventRecorder.Eventf(endpoints, corev1.EventTypeWarning, TooManyAddressesToMirror,
|
||||||
|
"A max of %d addresses can be mirrored to EndpointSlices per Endpoints subset. %d addresses were skipped", r.maxEndpointsPerSubset, addressesSkipped)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Build data structures for existing state.
|
// Build data structures for existing state.
|
||||||
|
@@ -533,7 +533,7 @@ func TestReconcile(t *testing.T) {
|
|||||||
existingEndpointSlices: []*discovery.EndpointSlice{},
|
existingEndpointSlices: []*discovery.EndpointSlice{},
|
||||||
expectedNumSlices: 2,
|
expectedNumSlices: 2,
|
||||||
expectedClientActions: 2,
|
expectedClientActions: 2,
|
||||||
expectedMetrics: &expectedMetrics{desiredSlices: 2, actualSlices: 2, desiredEndpoints: 3, addedPerSync: 3, numCreated: 2},
|
expectedMetrics: &expectedMetrics{desiredSlices: 2, actualSlices: 2, desiredEndpoints: 3, addedPerSync: 3, skippedPerSync: 2, numCreated: 2},
|
||||||
}, {
|
}, {
|
||||||
testName: "Endpoints with 2 subsets, multiple ports, all invalid addresses",
|
testName: "Endpoints with 2 subsets, multiple ports, all invalid addresses",
|
||||||
subsets: []corev1.EndpointSubset{{
|
subsets: []corev1.EndpointSubset{{
|
||||||
@@ -582,9 +582,9 @@ func TestReconcile(t *testing.T) {
|
|||||||
existingEndpointSlices: []*discovery.EndpointSlice{},
|
existingEndpointSlices: []*discovery.EndpointSlice{},
|
||||||
expectedNumSlices: 0,
|
expectedNumSlices: 0,
|
||||||
expectedClientActions: 0,
|
expectedClientActions: 0,
|
||||||
expectedMetrics: &expectedMetrics{desiredSlices: 0, actualSlices: 0, desiredEndpoints: 0, addedPerSync: 0, numCreated: 0},
|
expectedMetrics: &expectedMetrics{desiredSlices: 0, actualSlices: 0, desiredEndpoints: 0, addedPerSync: 0, skippedPerSync: 5, numCreated: 0},
|
||||||
}, {
|
}, {
|
||||||
testName: "Endpoints with 2 subsets, multiple ports and addresses, existing EndpointSlice with some addresses",
|
testName: "Endpoints with 2 subsets, 1 exceeding maxEndpointsPerSubset",
|
||||||
subsets: []corev1.EndpointSubset{{
|
subsets: []corev1.EndpointSubset{{
|
||||||
Ports: []corev1.EndpointPort{{
|
Ports: []corev1.EndpointPort{{
|
||||||
Name: "http",
|
Name: "http",
|
||||||
@@ -632,7 +632,7 @@ func TestReconcile(t *testing.T) {
|
|||||||
expectedNumSlices: 2,
|
expectedNumSlices: 2,
|
||||||
expectedClientActions: 2,
|
expectedClientActions: 2,
|
||||||
maxEndpointsPerSubset: 2,
|
maxEndpointsPerSubset: 2,
|
||||||
expectedMetrics: &expectedMetrics{desiredSlices: 2, actualSlices: 2, desiredEndpoints: 4, addedPerSync: 4, updatedPerSync: 0, removedPerSync: 0, numCreated: 2, numUpdated: 0},
|
expectedMetrics: &expectedMetrics{desiredSlices: 2, actualSlices: 2, desiredEndpoints: 4, addedPerSync: 4, updatedPerSync: 0, removedPerSync: 0, skippedPerSync: 1, numCreated: 2, numUpdated: 0},
|
||||||
}}
|
}}
|
||||||
|
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
@@ -885,6 +885,7 @@ type expectedMetrics struct {
|
|||||||
addedPerSync int
|
addedPerSync int
|
||||||
updatedPerSync int
|
updatedPerSync int
|
||||||
removedPerSync int
|
removedPerSync int
|
||||||
|
skippedPerSync int
|
||||||
numCreated int
|
numCreated int
|
||||||
numUpdated int
|
numUpdated int
|
||||||
numDeleted int
|
numDeleted int
|
||||||
@@ -929,6 +930,12 @@ func expectMetrics(t *testing.T, em expectedMetrics) {
|
|||||||
t.Errorf("Expected endpointsRemovedPerSync to be %d, got %v", em.removedPerSync, actualRemovedPerSync)
|
t.Errorf("Expected endpointsRemovedPerSync to be %d, got %v", em.removedPerSync, actualRemovedPerSync)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
actualSkippedPerSync, err := testutil.GetHistogramMetricValue(metrics.AddressesSkippedPerSync.WithLabelValues())
|
||||||
|
handleErr(t, err, "addressesSkippedPerSync")
|
||||||
|
if actualSkippedPerSync != float64(em.skippedPerSync) {
|
||||||
|
t.Errorf("Expected addressesSkippedPerSync to be %d, got %v", em.skippedPerSync, actualSkippedPerSync)
|
||||||
|
}
|
||||||
|
|
||||||
actualCreated, err := testutil.GetCounterMetricValue(metrics.EndpointSliceChanges.WithLabelValues("create"))
|
actualCreated, err := testutil.GetCounterMetricValue(metrics.EndpointSliceChanges.WithLabelValues("create"))
|
||||||
handleErr(t, err, "endpointSliceChangesCreated")
|
handleErr(t, err, "endpointSliceChangesCreated")
|
||||||
if actualCreated != float64(em.numCreated) {
|
if actualCreated != float64(em.numCreated) {
|
||||||
@@ -962,6 +969,7 @@ func setupMetrics() {
|
|||||||
metrics.EndpointsAddedPerSync.Delete(map[string]string{})
|
metrics.EndpointsAddedPerSync.Delete(map[string]string{})
|
||||||
metrics.EndpointsUpdatedPerSync.Delete(map[string]string{})
|
metrics.EndpointsUpdatedPerSync.Delete(map[string]string{})
|
||||||
metrics.EndpointsRemovedPerSync.Delete(map[string]string{})
|
metrics.EndpointsRemovedPerSync.Delete(map[string]string{})
|
||||||
|
metrics.AddressesSkippedPerSync.Delete(map[string]string{})
|
||||||
metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "create"})
|
metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "create"})
|
||||||
metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "update"})
|
metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "update"})
|
||||||
metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "delete"})
|
metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "delete"})
|
||||||
|
Reference in New Issue
Block a user