Implement simple endpoint slice batching

Maciej Borsz
2020-03-02 21:00:06 +01:00
parent b6b494b448
commit 49b11b5431
11 changed files with 426 additions and 10 deletions

View File

@@ -9,6 +9,7 @@ go_library(
],
importpath = "k8s.io/kubernetes/pkg/controller/endpointslice/config",
visibility = ["//visibility:public"],
deps = ["//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library"],
)
filegroup(

View File

@@ -16,6 +16,10 @@ limitations under the License.
package config
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// EndpointSliceControllerConfiguration contains elements describing
// EndpointSliceController.
type EndpointSliceControllerConfiguration struct {
@@ -28,4 +32,11 @@ type EndpointSliceControllerConfiguration struct {
// added to an EndpointSlice. More endpoints per slice will result in fewer
// and larger endpoint slices, but larger resources.
MaxEndpointsPerSlice int32
// EndpointUpdatesBatchPeriod can be used to batch endpoint updates.
// All endpoint updates triggered by pod changes will be delayed by up to
// 'EndpointUpdatesBatchPeriod'. If other pods in the same endpoint change
// during that period, they will be batched into a single endpoint update.
// The default value of 0 means that each pod update triggers an endpoint update.
EndpointUpdatesBatchPeriod metav1.Duration
}
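
For reference, here is a minimal sketch (not part of this diff) of how the new field might be consumed: the metav1.Duration wrapper from component config is unwrapped into the plain time.Duration that the controller constructor expects. The package paths match this commit; the main function and the literal values are assumptions.

package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/controller/endpointslice/config"
)

func main() {
	cfg := config.EndpointSliceControllerConfiguration{
		ConcurrentServiceEndpointSyncs: 5,
		MaxEndpointsPerSlice:           100,
		// Batch pod-triggered syncs for up to 500ms; 0 keeps one sync per pod update.
		EndpointUpdatesBatchPeriod: metav1.Duration{Duration: 500 * time.Millisecond},
	}
	// The controller constructor takes a plain time.Duration, so the
	// metav1.Duration wrapper is unwrapped before being passed in.
	fmt.Println("batch period:", cfg.EndpointUpdatesBatchPeriod.Duration)
}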

View File

@@ -61,12 +61,14 @@ func RegisterConversions(s *runtime.Scheme) error {
func autoConvert_v1alpha1_EndpointSliceControllerConfiguration_To_config_EndpointSliceControllerConfiguration(in *v1alpha1.EndpointSliceControllerConfiguration, out *config.EndpointSliceControllerConfiguration, s conversion.Scope) error {
out.ConcurrentServiceEndpointSyncs = in.ConcurrentServiceEndpointSyncs
out.MaxEndpointsPerSlice = in.MaxEndpointsPerSlice
out.EndpointUpdatesBatchPeriod = in.EndpointUpdatesBatchPeriod
return nil
}
func autoConvert_config_EndpointSliceControllerConfiguration_To_v1alpha1_EndpointSliceControllerConfiguration(in *config.EndpointSliceControllerConfiguration, out *v1alpha1.EndpointSliceControllerConfiguration, s conversion.Scope) error {
out.ConcurrentServiceEndpointSyncs = in.ConcurrentServiceEndpointSyncs
out.MaxEndpointsPerSlice = in.MaxEndpointsPerSlice
out.EndpointUpdatesBatchPeriod = in.EndpointUpdatesBatchPeriod
return nil
}

View File

@@ -23,6 +23,7 @@ package config
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *EndpointSliceControllerConfiguration) DeepCopyInto(out *EndpointSliceControllerConfiguration) {
*out = *in
out.EndpointUpdatesBatchPeriod = in.EndpointUpdatesBatchPeriod
return
}

View File

@@ -66,6 +66,7 @@ func NewController(podInformer coreinformers.PodInformer,
endpointSliceInformer discoveryinformers.EndpointSliceInformer,
maxEndpointsPerSlice int32,
client clientset.Interface,
endpointUpdatesBatchPeriod time.Duration,
) *Controller {
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(klog.Infof)
@@ -129,6 +130,7 @@ func NewController(podInformer coreinformers.PodInformer,
c.eventBroadcaster = broadcaster
c.eventRecorder = recorder
c.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod
c.serviceSelectorCache = endpointutil.NewServiceSelectorCache()
return c
@@ -194,6 +196,10 @@ type Controller struct {
// process the queue of service and pod changes
workerLoopPeriod time.Duration
// endpointUpdatesBatchPeriod is an artificial delay added to all service syncs triggered by pod changes.
// This can be used to reduce the overall number of endpoint slice updates.
endpointUpdatesBatchPeriod time.Duration
// serviceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls
// to AsSelectorPreValidated (see #73527)
serviceSelectorCache *endpointutil.ServiceSelectorCache
@@ -414,14 +420,14 @@ func (c *Controller) addPod(obj interface{}) {
return
}
for key := range services {
c.queue.Add(key)
c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod)
}
}
func (c *Controller) updatePod(old, cur interface{}) {
services := endpointutil.GetServicesToUpdateOnPodChange(c.serviceLister, c.serviceSelectorCache, old, cur, podEndpointChanged)
for key := range services {
c.queue.Add(key)
c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod)
}
}
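
The batching comes from the delaying-workqueue semantics rather than any new bookkeeping: identical keys scheduled with AddAfter while an earlier entry is still waiting (or still unprocessed) collapse into a single work item, so one sync covers all pod events that arrived during the batch period. Below is a self-contained sketch of that behavior; it is illustrative only and not code from this commit, though the workqueue package and its AddAfter/Get/Done calls are the real client-go API.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	const batchPeriod = 1 * time.Second

	queue := workqueue.NewNamedRateLimitingQueue(
		workqueue.DefaultControllerRateLimiter(), "endpoint_slice")

	// Three pod events for the same Service arrive within the batch period.
	// Each one schedules the same service key with the same delay.
	for i := 0; i < 3; i++ {
		queue.AddAfter("default/foo", batchPeriod)
		time.Sleep(100 * time.Millisecond)
	}

	// After the batch period elapses, the queue holds a single "default/foo"
	// item: one EndpointSlice sync instead of three.
	time.Sleep(batchPeriod)
	fmt.Println("queue length:", queue.Len()) // 1
	key, _ := queue.Get()
	fmt.Println("sync:", key)
	queue.Done(key)
}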

View File

@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"reflect"
"strconv"
"testing"
"time"
@@ -51,7 +52,7 @@ type endpointSliceController struct {
serviceStore cache.Store
}
func newController(nodeNames []string) (*fake.Clientset, *endpointSliceController) {
func newController(nodeNames []string, batchPeriod time.Duration) (*fake.Clientset, *endpointSliceController) {
client := newClientset()
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
nodeInformer := informerFactory.Core().V1().Nodes()
@@ -66,7 +67,8 @@ func newController(nodeNames []string) (*fake.Clientset, *endpointSliceControlle
nodeInformer,
informerFactory.Discovery().V1beta1().EndpointSlices(),
int32(100),
client)
client,
batchPeriod)
esController.nodesSynced = alwaysReady
esController.podsSynced = alwaysReady
@@ -86,7 +88,7 @@ func newController(nodeNames []string) (*fake.Clientset, *endpointSliceControlle
func TestSyncServiceNoSelector(t *testing.T) {
ns := metav1.NamespaceDefault
serviceName := "testing-1"
client, esController := newController([]string{"node-1"})
client, esController := newController([]string{"node-1"}, time.Duration(0))
esController.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: ns},
Spec: v1.ServiceSpec{
@@ -103,7 +105,7 @@ func TestSyncServiceNoSelector(t *testing.T) {
func TestSyncServiceWithSelector(t *testing.T) {
ns := metav1.NamespaceDefault
serviceName := "testing-1"
client, esController := newController([]string{"node-1"})
client, esController := newController([]string{"node-1"}, time.Duration(0))
standardSyncService(t, esController, ns, serviceName, "true")
expectActions(t, client.Actions(), 1, "create", "endpointslices")
@@ -123,7 +125,7 @@ func TestSyncServiceWithSelector(t *testing.T) {
// remove too much.
func TestSyncServiceMissing(t *testing.T) {
namespace := metav1.NamespaceDefault
client, esController := newController([]string{"node-1"})
client, esController := newController([]string{"node-1"}, time.Duration(0))
// Build up existing service
existingServiceName := "stillthere"
@@ -159,7 +161,7 @@ func TestSyncServiceMissing(t *testing.T) {
// Ensure SyncService correctly selects Pods.
func TestSyncServicePodSelection(t *testing.T) {
client, esController := newController([]string{"node-1"})
client, esController := newController([]string{"node-1"}, time.Duration(0))
ns := metav1.NamespaceDefault
pod1 := newPod(1, ns, true, 0)
@@ -186,7 +188,7 @@ func TestSyncServicePodSelection(t *testing.T) {
// Ensure SyncService correctly selects and labels EndpointSlices.
func TestSyncServiceEndpointSliceLabelSelection(t *testing.T) {
client, esController := newController([]string{"node-1"})
client, esController := newController([]string{"node-1"}, time.Duration(0))
ns := metav1.NamespaceDefault
serviceName := "testing-1"
@@ -274,7 +276,7 @@ func TestSyncServiceEndpointSliceLabelSelection(t *testing.T) {
// Ensure SyncService handles a variety of protocols and IPs appropriately.
func TestSyncServiceFull(t *testing.T) {
client, esController := newController([]string{"node-1"})
client, esController := newController([]string{"node-1"}, time.Duration(0))
namespace := metav1.NamespaceDefault
serviceName := "all-the-protocols"
ipv6Family := v1.IPv6Protocol
@@ -345,7 +347,389 @@ func TestSyncServiceFull(t *testing.T) {
}}, slice.Endpoints)
}
// TestPodAddsBatching verifies that endpoint updates caused by pod addition are batched together.
// This test uses real time.Sleep, as there is currently no easy way to mock time in the endpoint slice controller.
// TODO(mborsz): Migrate this test to mock clock when possible.
func TestPodAddsBatching(t *testing.T) {
type podAdd struct {
delay time.Duration
}
tests := []struct {
name string
batchPeriod time.Duration
adds []podAdd
finalDelay time.Duration
wantRequestCount int
}{
{
name: "three adds with no batching",
batchPeriod: 0 * time.Second,
adds: []podAdd{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
},
{
delay: 100 * time.Millisecond,
},
{
delay: 100 * time.Millisecond,
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 3,
},
{
name: "three adds in one batch",
batchPeriod: 1 * time.Second,
adds: []podAdd{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
},
{
delay: 100 * time.Millisecond,
},
{
delay: 100 * time.Millisecond,
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 1,
},
{
name: "three adds in two batches",
batchPeriod: 1 * time.Second,
adds: []podAdd{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
},
{
delay: 100 * time.Millisecond,
},
{
delay: 1 * time.Second,
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 2,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ns := metav1.NamespaceDefault
client, esController := newController([]string{"node-1"}, tc.batchPeriod)
stopCh := make(chan struct{})
defer close(stopCh)
go esController.Run(1, stopCh)
esController.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80}},
},
})
for i, add := range tc.adds {
time.Sleep(add.delay)
p := newPod(i, ns, true, 0)
esController.podStore.Add(p)
esController.addPod(p)
}
time.Sleep(tc.finalDelay)
assert.Len(t, client.Actions(), tc.wantRequestCount)
// In case of error, make debugging easier.
for _, action := range client.Actions() {
t.Logf("action: %v %v", action.GetVerb(), action.GetResource())
}
})
}
}
// TestPodUpdatesBatching verifies that endpoint updates caused by pod updates are batched together.
// This test uses real time.Sleep, as there is currently no easy way to mock time in the endpoint slice controller.
// TODO(mborsz): Migrate this test to mock clock when possible.
func TestPodUpdatesBatching(t *testing.T) {
resourceVersion := 1
type podUpdate struct {
delay time.Duration
podName string
podIP string
}
tests := []struct {
name string
batchPeriod time.Duration
podsCount int
updates []podUpdate
finalDelay time.Duration
wantRequestCount int
}{
{
name: "three updates with no batching",
batchPeriod: 0 * time.Second,
podsCount: 10,
updates: []podUpdate{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
podName: "pod0",
podIP: "10.0.0.0",
},
{
delay: 100 * time.Millisecond,
podName: "pod1",
podIP: "10.0.0.1",
},
{
delay: 100 * time.Millisecond,
podName: "pod2",
podIP: "10.0.0.2",
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 3,
},
{
name: "three updates in one batch",
batchPeriod: 1 * time.Second,
podsCount: 10,
updates: []podUpdate{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
podName: "pod0",
podIP: "10.0.0.0",
},
{
delay: 100 * time.Millisecond,
podName: "pod1",
podIP: "10.0.0.1",
},
{
delay: 100 * time.Millisecond,
podName: "pod2",
podIP: "10.0.0.2",
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 1,
},
{
name: "three updates in two batches",
batchPeriod: 1 * time.Second,
podsCount: 10,
updates: []podUpdate{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
podName: "pod0",
podIP: "10.0.0.0",
},
{
delay: 100 * time.Millisecond,
podName: "pod1",
podIP: "10.0.0.1",
},
{
delay: 1 * time.Second,
podName: "pod2",
podIP: "10.0.0.2",
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 2,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ns := metav1.NamespaceDefault
client, esController := newController([]string{"node-1"}, tc.batchPeriod)
stopCh := make(chan struct{})
defer close(stopCh)
go esController.Run(1, stopCh)
addPods(t, esController, ns, tc.podsCount)
esController.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80}},
},
})
for _, update := range tc.updates {
time.Sleep(update.delay)
old, exists, err := esController.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName))
if err != nil {
t.Fatalf("Error while retrieving old value of %q: %v", update.podName, err)
}
if !exists {
t.Fatalf("Pod %q doesn't exist", update.podName)
}
oldPod := old.(*v1.Pod)
newPod := oldPod.DeepCopy()
newPod.Status.PodIPs[0].IP = update.podIP
newPod.ResourceVersion = strconv.Itoa(resourceVersion)
resourceVersion++
esController.podStore.Update(newPod)
esController.updatePod(oldPod, newPod)
}
time.Sleep(tc.finalDelay)
assert.Len(t, client.Actions(), tc.wantRequestCount)
// In case of error, make debugging easier.
for _, action := range client.Actions() {
t.Logf("action: %v %v", action.GetVerb(), action.GetResource())
}
})
}
}
// TestPodDeleteBatching verifies that endpoint updates caused by pod deletion are batched together.
// This test uses real time.Sleep, as there is currently no easy way to mock time in the endpoint slice controller.
// TODO(mborsz): Migrate this test to mock clock when possible.
func TestPodDeleteBatching(t *testing.T) {
type podDelete struct {
delay time.Duration
podName string
}
tests := []struct {
name string
batchPeriod time.Duration
podsCount int
deletes []podDelete
finalDelay time.Duration
wantRequestCount int
}{
{
name: "three deletes with no batching",
batchPeriod: 0 * time.Second,
podsCount: 10,
deletes: []podDelete{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
podName: "pod0",
},
{
delay: 100 * time.Millisecond,
podName: "pod1",
},
{
delay: 100 * time.Millisecond,
podName: "pod2",
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 3,
},
{
name: "three deletes in one batch",
batchPeriod: 1 * time.Second,
podsCount: 10,
deletes: []podDelete{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
podName: "pod0",
},
{
delay: 100 * time.Millisecond,
podName: "pod1",
},
{
delay: 100 * time.Millisecond,
podName: "pod2",
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 1,
},
{
name: "three deletes in two batches",
batchPeriod: 1 * time.Second,
podsCount: 10,
deletes: []podDelete{
{
// endpoints.Run needs ~100 ms to start processing updates.
delay: 200 * time.Millisecond,
podName: "pod0",
},
{
delay: 100 * time.Millisecond,
podName: "pod1",
},
{
delay: 1 * time.Second,
podName: "pod2",
},
},
finalDelay: 3 * time.Second,
wantRequestCount: 2,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ns := metav1.NamespaceDefault
client, esController := newController([]string{"node-1"}, tc.batchPeriod)
stopCh := make(chan struct{})
defer close(stopCh)
go esController.Run(1, stopCh)
addPods(t, esController, ns, tc.podsCount)
esController.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80}},
},
})
for _, update := range tc.deletes {
time.Sleep(update.delay)
old, exists, err := esController.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName))
assert.Nil(t, err, "error while retrieving old value of %q: %v", update.podName, err)
assert.Equal(t, true, exists, "pod should exist")
esController.podStore.Delete(old)
esController.deletePod(old)
}
time.Sleep(tc.finalDelay)
assert.Len(t, client.Actions(), tc.wantRequestCount)
// In case of error, make debugging easier.
for _, action := range client.Actions() {
t.Logf("action: %v %v", action.GetVerb(), action.GetResource())
}
})
}
}
// Test helpers
func addPods(t *testing.T, esController *endpointSliceController, namespace string, podsCount int) {
t.Helper()
for i := 0; i < podsCount; i++ {
pod := newPod(i, namespace, true, 0)
esController.podStore.Add(pod)
}
}
func standardSyncService(t *testing.T, esController *endpointSliceController, namespace, serviceName, managedBySetup string) {
t.Helper()