Updating EndpointSlice controller to wait for cache to be updated

This updates the EndpointSlice controller to make use of the
EndpointSlice tracker to identify when expected changes are not present
in the cache yet. If this is detected, the controller will wait to sync
until all expected updates have been received. This should help avoid
race conditions that would result in duplicate EndpointSlices or failed
attempts to update stale EndpointSlices. To simplify this logic, this
also moves the EndpointSlice tracker from relying on resource versions
to generations.
This commit is contained in:
Rob Scott
2020-07-28 18:14:40 -07:00
parent f79795d718
commit e1542606c2
8 changed files with 474 additions and 183 deletions

View File

@@ -1426,6 +1426,81 @@ func TestPodDeleteBatching(t *testing.T) {
}
}
func TestSyncServiceStaleInformer(t *testing.T) {
testcases := []struct {
name string
informerGenerationNumber int64
trackerGenerationNumber int64
expectError bool
}{
{
name: "informer cache outdated",
informerGenerationNumber: 10,
trackerGenerationNumber: 12,
expectError: true,
},
{
name: "cache and tracker synced",
informerGenerationNumber: 10,
trackerGenerationNumber: 10,
expectError: false,
},
{
name: "tracker outdated",
informerGenerationNumber: 10,
trackerGenerationNumber: 1,
expectError: false,
},
}
for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
_, esController := newController([]string{"node-1"}, time.Duration(0))
ns := metav1.NamespaceDefault
serviceName := "testing-1"
// Store Service in the cache
esController.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{TargetPort: intstr.FromInt(80)}},
},
})
// Create EndpointSlice in the informer cache with informerGenerationNumber
epSlice1 := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: "matching-1",
Namespace: ns,
Generation: testcase.informerGenerationNumber,
Labels: map[string]string{
discovery.LabelServiceName: serviceName,
discovery.LabelManagedBy: controllerName,
},
},
AddressType: discovery.AddressTypeIPv4,
}
err := esController.endpointSliceStore.Add(epSlice1)
if err != nil {
t.Fatalf("Expected no error adding EndpointSlice: %v", err)
}
// Create EndpointSlice in the tracker with trackerGenerationNumber
epSlice2 := epSlice1.DeepCopy()
epSlice2.Generation = testcase.trackerGenerationNumber
esController.endpointSliceTracker.Update(epSlice2)
err = esController.syncService(fmt.Sprintf("%s/%s", ns, serviceName))
// Check if we got a StaleInformerCache error
if isStaleInformerCacheErr(err) != testcase.expectError {
t.Fatalf("Expected error because informer cache is outdated")
}
})
}
}
// Test helpers
func addPods(t *testing.T, esController *endpointSliceController, namespace string, podsCount int) {
t.Helper()