Adding new EndpointsOverCapacity annotation for Endpoints controller

Now that the EndpointSlice API and controllers are GA, the Endpoints
controller will use this annotation to warn when Endpoints are over
capacity. In a future release, this warning will be replaced with
truncation.
This commit is contained in:
Rob Scott
2021-03-08 17:54:18 -08:00
parent 3f09686958
commit 8a3f72074e
4 changed files with 175 additions and 1 deletions

View File

@@ -60,6 +60,11 @@ const (
// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
maxRetries = 15
// maxCapacity represents the maximum number of addresses that should be
// stored in an Endpoints resource. In a future release, this controller
// may truncate endpoints exceeding this length.
maxCapacity = 1000
// TolerateUnreadyEndpointsAnnotation is an annotation on the Service denoting if the endpoints
// controller should go ahead and create endpoints for unready pods. This annotation is
// currently only used by StatefulSets, where we need the pod to be DNS
@@ -510,7 +515,8 @@ func (e *Controller) syncService(key string) error {
}
if !createEndpoints &&
apiequality.Semantic.DeepEqual(currentEndpoints.Subsets, subsets) &&
apiequality.Semantic.DeepEqual(compareLabels, service.Labels) {
apiequality.Semantic.DeepEqual(compareLabels, service.Labels) &&
capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) {
klog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
return nil
}
@@ -528,6 +534,12 @@ func (e *Controller) syncService(key string) error {
delete(newEndpoints.Annotations, v1.EndpointsLastChangeTriggerTime)
}
if overCapacity(newEndpoints.Subsets) {
newEndpoints.Annotations[v1.EndpointsOverCapacity] = "warning"
} else {
delete(newEndpoints.Annotations, v1.EndpointsOverCapacity)
}
if newEndpoints.Labels == nil {
newEndpoints.Labels = make(map[string]string)
}
@@ -646,3 +658,24 @@ func endpointPortFromServicePort(servicePort *v1.ServicePort, portNum int) *v1.E
}
return epp
}
// overCapacity returns true if there are more addresses in the provided subsets
// than the maxCapacity.
func overCapacity(subsets []v1.EndpointSubset) bool {
numEndpoints := 0
for _, subset := range subsets {
numEndpoints += len(subset.Addresses) + len(subset.NotReadyAddresses)
}
return numEndpoints > maxCapacity
}
// capacityAnnotationSetCorrectly returns true if overCapacity() is true and the
// EndpointsOverCapacity annotation is set to "warning" or if overCapacity()
// is false and the annotation is not set.
func capacityAnnotationSetCorrectly(annotations map[string]string, subsets []v1.EndpointSubset) bool {
val, ok := annotations[v1.EndpointsOverCapacity]
if overCapacity(subsets) {
return ok && val == "warning"
}
return !ok
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package endpoint
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
@@ -35,6 +36,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
clientscheme "k8s.io/client-go/kubernetes/scheme"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
@@ -225,6 +227,29 @@ func newController(url string, batchPeriod time.Duration) *endpointController {
}
}
func newFakeController(batchPeriod time.Duration) (*fake.Clientset, *endpointController) {
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, controllerpkg.NoResyncPeriodFunc())
eController := NewEndpointController(
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().Services(),
informerFactory.Core().V1().Endpoints(),
client,
batchPeriod)
eController.podsSynced = alwaysReady
eController.servicesSynced = alwaysReady
eController.endpointsSynced = alwaysReady
return client, &endpointController{
eController,
informerFactory.Core().V1().Pods().Informer().GetStore(),
informerFactory.Core().V1().Services().Informer().GetStore(),
informerFactory.Core().V1().Endpoints().Informer().GetStore(),
}
}
func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
ns := metav1.NamespaceDefault
testServer, endpointsHandler := makeTestServer(t, ns)
@@ -1981,6 +2006,106 @@ func TestSyncEndpointsServiceNotFound(t *testing.T) {
endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "DELETE", nil)
}
func TestSyncServiceOverCapacity(t *testing.T) {
testCases := []struct {
name string
startingAnnotation *string
numExisting int
numDesired int
expectedAnnotation bool
}{{
name: "empty",
startingAnnotation: nil,
numExisting: 0,
numDesired: 0,
expectedAnnotation: false,
}, {
name: "annotation added past capacity",
startingAnnotation: nil,
numExisting: maxCapacity - 1,
numDesired: maxCapacity + 1,
expectedAnnotation: true,
}, {
name: "annotation removed below capacity",
startingAnnotation: utilpointer.StringPtr("warning"),
numExisting: maxCapacity - 1,
numDesired: maxCapacity - 1,
expectedAnnotation: false,
}, {
name: "annotation removed at capacity",
startingAnnotation: utilpointer.StringPtr("warning"),
numExisting: maxCapacity,
numDesired: maxCapacity,
expectedAnnotation: false,
}, {
name: "no endpoints change, annotation value corrected",
startingAnnotation: utilpointer.StringPtr("invalid"),
numExisting: maxCapacity + 1,
numDesired: maxCapacity + 1,
expectedAnnotation: true,
}}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ns := "test"
client, c := newFakeController(0 * time.Second)
addPods(c.podStore, ns, tc.numDesired, 1, 0, ipv4only)
pods := c.podStore.List()
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80}},
},
}
c.serviceStore.Add(svc)
subset := v1.EndpointSubset{}
for i := 0; i < tc.numExisting; i++ {
pod := pods[i].(*v1.Pod)
epa, _ := podToEndpointAddressForService(svc, pod)
subset.Addresses = append(subset.Addresses, *epa)
}
endpoints := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: svc.Name,
Namespace: ns,
ResourceVersion: "1",
Annotations: map[string]string{},
},
Subsets: []v1.EndpointSubset{subset},
}
if tc.startingAnnotation != nil {
endpoints.Annotations[v1.EndpointsOverCapacity] = *tc.startingAnnotation
}
c.endpointsStore.Add(endpoints)
client.CoreV1().Endpoints(ns).Create(context.TODO(), endpoints, metav1.CreateOptions{})
c.syncService(fmt.Sprintf("%s/%s", ns, svc.Name))
actualEndpoints, err := client.CoreV1().Endpoints(ns).Get(context.TODO(), endpoints.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("unexpected error getting endpoints: %v", err)
}
actualAnnotation, ok := actualEndpoints.Annotations[v1.EndpointsOverCapacity]
if tc.expectedAnnotation {
if !ok {
t.Errorf("Expected EndpointsOverCapacity annotation to be set")
} else if actualAnnotation != "warning" {
t.Errorf("Expected EndpointsOverCapacity annotation to be 'warning', got %s", actualAnnotation)
}
} else {
if ok {
t.Errorf("Expected EndpointsOverCapacity annotation not to be set, got %s", actualAnnotation)
}
}
})
}
}
func TestEndpointPortFromServicePort(t *testing.T) {
http := utilpointer.StringPtr("http")
testCases := map[string]struct {