diff --git a/pkg/controller/endpointslice/BUILD b/pkg/controller/endpointslice/BUILD index 38b71d92328..11a722fe89b 100644 --- a/pkg/controller/endpointslice/BUILD +++ b/pkg/controller/endpointslice/BUILD @@ -5,6 +5,7 @@ go_library( srcs = [ "endpointset.go", "endpointslice_controller.go", + "endpointslice_tracker.go", "reconciler.go", "utils.go", ], @@ -49,6 +50,7 @@ go_test( name = "go_default_test", srcs = [ "endpointslice_controller_test.go", + "endpointslice_tracker_test.go", "reconciler_test.go", "utils_test.go", ], diff --git a/pkg/controller/endpointslice/endpointslice_controller.go b/pkg/controller/endpointslice/endpointslice_controller.go index b0d6aeeb06a..33918860084 100644 --- a/pkg/controller/endpointslice/endpointslice_controller.go +++ b/pkg/controller/endpointslice/endpointslice_controller.go @@ -63,7 +63,7 @@ const ( func NewController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer, nodeInformer coreinformers.NodeInformer, - esInformer discoveryinformers.EndpointSliceInformer, + endpointSliceInformer discoveryinformers.EndpointSliceInformer, maxEndpointsPerSlice int32, client clientset.Interface, ) *Controller { @@ -105,8 +105,15 @@ func NewController(podInformer coreinformers.PodInformer, c.nodeLister = nodeInformer.Lister() c.nodesSynced = nodeInformer.Informer().HasSynced - c.endpointSliceLister = esInformer.Lister() - c.endpointSlicesSynced = esInformer.Informer().HasSynced + endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: c.onEndpointSliceAdd, + UpdateFunc: c.onEndpointSliceUpdate, + DeleteFunc: c.onEndpointSliceDelete, + }) + + c.endpointSliceLister = endpointSliceInformer.Lister() + c.endpointSlicesSynced = endpointSliceInformer.Informer().HasSynced + c.endpointSliceTracker = newEndpointSliceTracker() c.maxEndpointsPerSlice = maxEndpointsPerSlice @@ -114,6 +121,7 @@ func NewController(podInformer coreinformers.PodInformer, client: c.client, nodeLister: c.nodeLister, maxEndpointsPerSlice: c.maxEndpointsPerSlice, + endpointSliceTracker: c.endpointSliceTracker, metricsCache: endpointslicemetrics.NewCache(maxEndpointsPerSlice), } c.triggerTimeTracker = endpointutil.NewTriggerTimeTracker() @@ -152,6 +160,10 @@ type Controller struct { // endpointSlicesSynced returns true if the endpoint slice shared informer has been synced at least once. // Added as a member to the struct to allow injection for testing. endpointSlicesSynced cache.InformerSynced + // endpointSliceTracker tracks the list of EndpointSlices and associated + // resource versions expected for each Service. It can help determine if a + // cached EndpointSlice is out of date. + endpointSliceTracker *endpointSliceTracker // nodeLister is able to list/get nodes and is populated by the // shared informer passed to NewController @@ -343,6 +355,57 @@ func (c *Controller) onServiceDelete(obj interface{}) { c.queue.Add(key) } +// onEndpointSliceAdd queues a sync for the relevant Service for a sync if the +// EndpointSlice resource version does not match the expected version in the +// endpointSliceTracker. +func (c *Controller) onEndpointSliceAdd(obj interface{}) { + endpointSlice := obj.(*discovery.EndpointSlice) + if endpointSlice == nil { + utilruntime.HandleError(fmt.Errorf("Invalid EndpointSlice provided to onEndpointSliceAdd()")) + return + } + if managedByController(endpointSlice) && c.endpointSliceTracker.Stale(endpointSlice) { + c.queueServiceForEndpointSlice(endpointSlice) + } +} + +// onEndpointSliceUpdate queues a sync for the relevant Service for a sync if +// the EndpointSlice resource version does not match the expected version in the +// endpointSliceTracker or the managed-by value of the EndpointSlice has changed +// from or to this controller. +func (c *Controller) onEndpointSliceUpdate(prevObj, obj interface{}) { + prevEndpointSlice := obj.(*discovery.EndpointSlice) + endpointSlice := obj.(*discovery.EndpointSlice) + if endpointSlice == nil || prevEndpointSlice == nil { + utilruntime.HandleError(fmt.Errorf("Invalid EndpointSlice provided to onEndpointSliceUpdate()")) + return + } + if managedByChanged(prevEndpointSlice, endpointSlice) || (managedByController(endpointSlice) && c.endpointSliceTracker.Stale(endpointSlice)) { + c.queueServiceForEndpointSlice(endpointSlice) + } +} + +// onEndpointSliceDelete queues a sync for the relevant Service for a sync if the +// EndpointSlice resource version does not match the expected version in the +// endpointSliceTracker. +func (c *Controller) onEndpointSliceDelete(obj interface{}) { + endpointSlice := getEndpointSliceFromDeleteAction(obj) + if endpointSlice != nil && managedByController(endpointSlice) && c.endpointSliceTracker.Has(endpointSlice) { + c.queueServiceForEndpointSlice(endpointSlice) + } +} + +// queueServiceForEndpointSlice attempts to queue the corresponding Service for +// the provided EndpointSlice. +func (c *Controller) queueServiceForEndpointSlice(endpointSlice *discovery.EndpointSlice) { + key, err := serviceControllerKey(endpointSlice) + if err != nil { + utilruntime.HandleError(fmt.Errorf("Couldn't get key for EndpointSlice %+v: %v", endpointSlice, err)) + return + } + c.queue.Add(key) +} + func (c *Controller) addPod(obj interface{}) { pod := obj.(*v1.Pod) services, err := c.serviceSelectorCache.GetPodServiceMemberships(c.serviceLister, pod) diff --git a/pkg/controller/endpointslice/endpointslice_tracker.go b/pkg/controller/endpointslice/endpointslice_tracker.go new file mode 100644 index 00000000000..7a0680ab739 --- /dev/null +++ b/pkg/controller/endpointslice/endpointslice_tracker.go @@ -0,0 +1,123 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package endpointslice + +import ( + "sync" + + discovery "k8s.io/api/discovery/v1beta1" + "k8s.io/apimachinery/pkg/types" +) + +// endpointSliceResourceVersions tracks expected EndpointSlice resource versions +// by EndpointSlice name. +type endpointSliceResourceVersions map[string]string + +// endpointSliceTracker tracks EndpointSlices and their associated resource +// versions to help determine if a change to an EndpointSlice has been processed +// by the EndpointSlice controller. +type endpointSliceTracker struct { + // lock protects resourceVersionsByService. + lock sync.Mutex + // resourceVersionsByService tracks the list of EndpointSlices and + // associated resource versions expected for a given Service. + resourceVersionsByService map[types.NamespacedName]endpointSliceResourceVersions +} + +// newEndpointSliceTracker creates and initializes a new endpointSliceTracker. +func newEndpointSliceTracker() *endpointSliceTracker { + return &endpointSliceTracker{ + resourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{}, + } +} + +// Has returns true if the endpointSliceTracker has a resource version for the +// provided EndpointSlice. +func (est *endpointSliceTracker) Has(endpointSlice *discovery.EndpointSlice) bool { + est.lock.Lock() + defer est.lock.Unlock() + + rrv := est.relatedResourceVersions(endpointSlice) + _, ok := rrv[endpointSlice.Name] + return ok +} + +// Stale returns true if this endpointSliceTracker does not have a resource +// version for the provided EndpointSlice or it does not match the resource +// version of the provided EndpointSlice. +func (est *endpointSliceTracker) Stale(endpointSlice *discovery.EndpointSlice) bool { + est.lock.Lock() + defer est.lock.Unlock() + + rrv := est.relatedResourceVersions(endpointSlice) + return rrv[endpointSlice.Name] != endpointSlice.ResourceVersion +} + +// Update adds or updates the resource version in this endpointSliceTracker for +// the provided EndpointSlice. +func (est *endpointSliceTracker) Update(endpointSlice *discovery.EndpointSlice) { + est.lock.Lock() + defer est.lock.Unlock() + + rrv := est.relatedResourceVersions(endpointSlice) + rrv[endpointSlice.Name] = endpointSlice.ResourceVersion +} + +// Delete removes the resource version in this endpointSliceTracker for the +// provided EndpointSlice. +func (est *endpointSliceTracker) Delete(endpointSlice *discovery.EndpointSlice) { + est.lock.Lock() + defer est.lock.Unlock() + + rrv := est.relatedResourceVersions(endpointSlice) + delete(rrv, endpointSlice.Name) +} + +// relatedResourceVersions returns the set of resource versions tracked for the +// Service corresponding to the provided EndpointSlice. If no resource versions +// are currently tracked for this service, an empty set is initialized. +func (est *endpointSliceTracker) relatedResourceVersions(endpointSlice *discovery.EndpointSlice) endpointSliceResourceVersions { + serviceNN := getServiceNN(endpointSlice) + vers, ok := est.resourceVersionsByService[serviceNN] + + if !ok { + vers = endpointSliceResourceVersions{} + est.resourceVersionsByService[serviceNN] = vers + } + + return vers +} + +// getServiceNN returns a namespaced name for the Service corresponding to the +// provided EndpointSlice. +func getServiceNN(endpointSlice *discovery.EndpointSlice) types.NamespacedName { + serviceName, _ := endpointSlice.Labels[discovery.LabelServiceName] + return types.NamespacedName{Name: serviceName, Namespace: endpointSlice.Namespace} +} + +// managedByChanged returns true if one of the provided EndpointSlices is +// managed by the EndpointSlice controller while the other is not. +func managedByChanged(endpointSlice1, endpointSlice2 *discovery.EndpointSlice) bool { + return managedByController(endpointSlice1) != managedByController(endpointSlice2) +} + +// managedByController returns true if the controller of the provided +// EndpointSlices is the EndpointSlice controller. +func managedByController(endpointSlice *discovery.EndpointSlice) bool { + managedBy, _ := endpointSlice.Labels[discovery.LabelManagedBy] + return managedBy == controllerName +} diff --git a/pkg/controller/endpointslice/endpointslice_tracker_test.go b/pkg/controller/endpointslice/endpointslice_tracker_test.go new file mode 100644 index 00000000000..0ad704e2d11 --- /dev/null +++ b/pkg/controller/endpointslice/endpointslice_tracker_test.go @@ -0,0 +1,174 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package endpointslice + +import ( + "testing" + + discovery "k8s.io/api/discovery/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestEndpointSliceTrackerUpdate(t *testing.T) { + epSlice1 := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "example-1", + Namespace: "ns1", + ResourceVersion: "rv1", + Labels: map[string]string{discovery.LabelServiceName: "svc1"}, + }, + } + + epSlice1DifferentNS := epSlice1.DeepCopy() + epSlice1DifferentNS.Namespace = "ns2" + + epSlice1DifferentService := epSlice1.DeepCopy() + epSlice1DifferentService.Labels[discovery.LabelServiceName] = "svc2" + + epSlice1DifferentRV := epSlice1.DeepCopy() + epSlice1DifferentRV.ResourceVersion = "rv2" + + testCases := map[string]struct { + updateParam *discovery.EndpointSlice + checksParam *discovery.EndpointSlice + expectHas bool + expectStale bool + }{ + "same slice": { + updateParam: epSlice1, + checksParam: epSlice1, + expectHas: true, + expectStale: false, + }, + "different namespace": { + updateParam: epSlice1, + checksParam: epSlice1DifferentNS, + expectHas: false, + expectStale: true, + }, + "different service": { + updateParam: epSlice1, + checksParam: epSlice1DifferentService, + expectHas: false, + expectStale: true, + }, + "different resource version": { + updateParam: epSlice1, + checksParam: epSlice1DifferentRV, + expectHas: true, + expectStale: true, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + esTracker := newEndpointSliceTracker() + esTracker.Update(tc.updateParam) + if esTracker.Has(tc.checksParam) != tc.expectHas { + t.Errorf("tc.tracker.Has(%+v) == %t, expected %t", tc.checksParam, esTracker.Has(tc.checksParam), tc.expectHas) + } + if esTracker.Stale(tc.checksParam) != tc.expectStale { + t.Errorf("tc.tracker.Stale(%+v) == %t, expected %t", tc.checksParam, esTracker.Stale(tc.checksParam), tc.expectStale) + } + }) + } +} + +func TestEndpointSliceTrackerDelete(t *testing.T) { + epSlice1 := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "example-1", + Namespace: "ns1", + ResourceVersion: "rv1", + Labels: map[string]string{discovery.LabelServiceName: "svc1"}, + }, + } + + epSlice1DifferentNS := epSlice1.DeepCopy() + epSlice1DifferentNS.Namespace = "ns2" + + epSlice1DifferentService := epSlice1.DeepCopy() + epSlice1DifferentService.Labels[discovery.LabelServiceName] = "svc2" + + epSlice1DifferentRV := epSlice1.DeepCopy() + epSlice1DifferentRV.ResourceVersion = "rv2" + + testCases := map[string]struct { + deleteParam *discovery.EndpointSlice + checksParam *discovery.EndpointSlice + expectHas bool + expectStale bool + }{ + "same slice": { + deleteParam: epSlice1, + checksParam: epSlice1, + expectHas: false, + expectStale: true, + }, + "different namespace": { + deleteParam: epSlice1DifferentNS, + checksParam: epSlice1DifferentNS, + expectHas: false, + expectStale: true, + }, + "different namespace, check original ep slice": { + deleteParam: epSlice1DifferentNS, + checksParam: epSlice1, + expectHas: true, + expectStale: false, + }, + "different service": { + deleteParam: epSlice1DifferentService, + checksParam: epSlice1DifferentService, + expectHas: false, + expectStale: true, + }, + "different service, check original ep slice": { + deleteParam: epSlice1DifferentService, + checksParam: epSlice1, + expectHas: true, + expectStale: false, + }, + "different resource version": { + deleteParam: epSlice1DifferentRV, + checksParam: epSlice1DifferentRV, + expectHas: false, + expectStale: true, + }, + "different resource version, check original ep slice": { + deleteParam: epSlice1DifferentRV, + checksParam: epSlice1, + expectHas: false, + expectStale: true, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + esTracker := newEndpointSliceTracker() + esTracker.Update(epSlice1) + + esTracker.Delete(tc.deleteParam) + if esTracker.Has(tc.checksParam) != tc.expectHas { + t.Errorf("esTracker.Has(%+v) == %t, expected %t", tc.checksParam, esTracker.Has(tc.checksParam), tc.expectHas) + } + if esTracker.Stale(tc.checksParam) != tc.expectStale { + t.Errorf("esTracker.Stale(%+v) == %t, expected %t", tc.checksParam, esTracker.Stale(tc.checksParam), tc.expectStale) + } + }) + } +} diff --git a/pkg/controller/endpointslice/reconciler.go b/pkg/controller/endpointslice/reconciler.go index ff0193058d5..24496e41c80 100644 --- a/pkg/controller/endpointslice/reconciler.go +++ b/pkg/controller/endpointslice/reconciler.go @@ -40,6 +40,7 @@ type reconciler struct { client clientset.Interface nodeLister corelisters.NodeLister maxEndpointsPerSlice int32 + endpointSliceTracker *endpointSliceTracker metricsCache *metrics.Cache } @@ -212,6 +213,7 @@ func (r *reconciler) finalize( } errs = append(errs, fmt.Errorf("Error creating EndpointSlice for Service %s/%s: %v", service.Namespace, service.Name, err)) } else { + r.endpointSliceTracker.Update(endpointSlice) metrics.EndpointSliceChanges.WithLabelValues("create").Inc() } } @@ -222,6 +224,7 @@ func (r *reconciler) finalize( if err != nil { errs = append(errs, fmt.Errorf("Error updating %s EndpointSlice for Service %s/%s: %v", endpointSlice.Name, service.Namespace, service.Name, err)) } else { + r.endpointSliceTracker.Update(endpointSlice) metrics.EndpointSliceChanges.WithLabelValues("update").Inc() } } @@ -231,6 +234,7 @@ func (r *reconciler) finalize( if err != nil { errs = append(errs, fmt.Errorf("Error deleting %s EndpointSlice for Service %s/%s: %v", endpointSlice.Name, service.Namespace, service.Name, err)) } else { + r.endpointSliceTracker.Delete(endpointSlice) metrics.EndpointSliceChanges.WithLabelValues("delete").Inc() } } diff --git a/pkg/controller/endpointslice/reconciler_test.go b/pkg/controller/endpointslice/reconciler_test.go index a746f8b42e1..18331b4accc 100644 --- a/pkg/controller/endpointslice/reconciler_test.go +++ b/pkg/controller/endpointslice/reconciler_test.go @@ -752,6 +752,7 @@ func newReconciler(client *fake.Clientset, nodes []*corev1.Node, maxEndpointsPer client: client, nodeLister: corelisters.NewNodeLister(indexer), maxEndpointsPerSlice: maxEndpointsPerSlice, + endpointSliceTracker: newEndpointSliceTracker(), metricsCache: metrics.NewCache(maxEndpointsPerSlice), } } diff --git a/pkg/controller/endpointslice/utils.go b/pkg/controller/endpointslice/utils.go index 88fedf15b6a..9cd89d8d145 100644 --- a/pkg/controller/endpointslice/utils.go +++ b/pkg/controller/endpointslice/utils.go @@ -26,6 +26,8 @@ import ( apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/tools/cache" "k8s.io/klog" podutil "k8s.io/kubernetes/pkg/api/v1/pod" api "k8s.io/kubernetes/pkg/apis/core" @@ -236,6 +238,27 @@ func getSliceToFill(endpointSlices []*discovery.EndpointSlice, numEndpoints, max return closestSlice } +// getEndpointSliceFromDeleteAction parses an EndpointSlice from a delete action. +func getEndpointSliceFromDeleteAction(obj interface{}) *discovery.EndpointSlice { + if endpointSlice, ok := obj.(*discovery.EndpointSlice); ok { + // Enqueue all the services that the pod used to be a member of. + // This is the same thing we do when we add a pod. + return endpointSlice + } + // If we reached here it means the pod was deleted but its final state is unrecorded. + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) + return nil + } + endpointSlice, ok := tombstone.Obj.(*discovery.EndpointSlice) + if !ok { + utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a EndpointSlice: %#v", obj)) + return nil + } + return endpointSlice +} + // addTriggerTimeAnnotation adds a triggerTime annotation to an EndpointSlice func addTriggerTimeAnnotation(endpointSlice *discovery.EndpointSlice, triggerTime time.Time) { if endpointSlice.Annotations == nil { @@ -249,6 +272,19 @@ func addTriggerTimeAnnotation(endpointSlice *discovery.EndpointSlice, triggerTim } } +// serviceControllerKey returns a controller key for a Service but derived from +// an EndpointSlice. +func serviceControllerKey(endpointSlice *discovery.EndpointSlice) (string, error) { + if endpointSlice == nil { + return "", fmt.Errorf("nil EndpointSlice passed to serviceControllerKey()") + } + serviceName, ok := endpointSlice.Labels[discovery.LabelServiceName] + if !ok || serviceName == "" { + return "", fmt.Errorf("EndpointSlice missing %s label", discovery.LabelServiceName) + } + return fmt.Sprintf("%s/%s", endpointSlice.Namespace, serviceName), nil +} + // endpointSliceEndpointLen helps sort endpoint slices by the number of // endpoints they contain. type endpointSliceEndpointLen []*discovery.EndpointSlice diff --git a/pkg/controller/endpointslice/utils_test.go b/pkg/controller/endpointslice/utils_test.go index a9c60026ef2..4d9b19e815e 100644 --- a/pkg/controller/endpointslice/utils_test.go +++ b/pkg/controller/endpointslice/utils_test.go @@ -310,6 +310,49 @@ func TestPodChangedWithPodEndpointChanged(t *testing.T) { newPod.ObjectMeta.DeletionTimestamp = oldPod.ObjectMeta.DeletionTimestamp.DeepCopy() } +func TestServiceControllerKey(t *testing.T) { + testCases := map[string]struct { + endpointSlice *discovery.EndpointSlice + expectedKey string + expectedErr error + }{ + "nil EndpointSlice": { + endpointSlice: nil, + expectedKey: "", + expectedErr: fmt.Errorf("nil EndpointSlice passed to serviceControllerKey()"), + }, + "empty EndpointSlice": { + endpointSlice: &discovery.EndpointSlice{}, + expectedKey: "", + expectedErr: fmt.Errorf("EndpointSlice missing kubernetes.io/service-name label"), + }, + "valid EndpointSlice": { + endpointSlice: &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Labels: map[string]string{ + discovery.LabelServiceName: "svc", + }, + }, + }, + expectedKey: "ns/svc", + expectedErr: nil, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + actualKey, actualErr := serviceControllerKey(tc.endpointSlice) + if !reflect.DeepEqual(actualErr, tc.expectedErr) { + t.Errorf("Expected %s, got %s", tc.expectedErr, actualErr) + } + if actualKey != tc.expectedKey { + t.Errorf("Expected %s, got %s", tc.expectedKey, actualKey) + } + }) + } +} + // Test helpers func newPod(n int, namespace string, ready bool, nPorts int) *v1.Pod {