Clean shutdown of endpoints/endpointslice integration tests

This commit is contained in:
Wojciech Tyczyński 2022-05-28 21:51:14 +02:00
parent 3af4c74f37
commit c20f7cc4e1
4 changed files with 25 additions and 6 deletions

View File

@ -73,8 +73,6 @@ const (
func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer, func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer,
endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface, endpointUpdatesBatchPeriod time.Duration) *Controller { endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface, endpointUpdatesBatchPeriod time.Duration) *Controller {
broadcaster := record.NewBroadcaster() broadcaster := record.NewBroadcaster()
broadcaster.StartStructuredLogging(0)
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-controller"}) recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-controller"})
if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil { if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
@ -173,6 +171,12 @@ type Controller struct {
// endpoints will be handled in parallel. // endpoints will be handled in parallel.
func (e *Controller) Run(ctx context.Context, workers int) { func (e *Controller) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
// Start events processing pipeline.
e.eventBroadcaster.StartStructuredLogging(0)
e.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: e.client.CoreV1().Events("")})
defer e.eventBroadcaster.Shutdown()
defer e.queue.ShutDown() defer e.queue.ShutDown()
klog.Infof("Starting endpoint controller") klog.Infof("Starting endpoint controller")

View File

@ -86,8 +86,6 @@ func NewController(podInformer coreinformers.PodInformer,
endpointUpdatesBatchPeriod time.Duration, endpointUpdatesBatchPeriod time.Duration,
) *Controller { ) *Controller {
broadcaster := record.NewBroadcaster() broadcaster := record.NewBroadcaster()
broadcaster.StartStructuredLogging(0)
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-slice-controller"}) recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-slice-controller"})
if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil { if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
@ -252,6 +250,12 @@ type Controller struct {
// Run will not return until stopCh is closed. // Run will not return until stopCh is closed.
func (c *Controller) Run(workers int, stopCh <-chan struct{}) { func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
// Start events processing pipeline.
c.eventBroadcaster.StartLogging(klog.Infof)
c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")})
defer c.eventBroadcaster.Shutdown()
defer c.queue.ShutDown() defer c.queue.ShutDown()
klog.Infof("Starting endpoint slice controller") klog.Infof("Starting endpoint slice controller")

View File

@ -75,8 +75,6 @@ func NewController(endpointsInformer coreinformers.EndpointsInformer,
endpointUpdatesBatchPeriod time.Duration, endpointUpdatesBatchPeriod time.Duration,
) *Controller { ) *Controller {
broadcaster := record.NewBroadcaster() broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(klog.Infof)
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-slice-mirroring-controller"}) recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-slice-mirroring-controller"})
if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil { if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
@ -207,6 +205,12 @@ type Controller struct {
// Run will not return until stopCh is closed. // Run will not return until stopCh is closed.
func (c *Controller) Run(workers int, stopCh <-chan struct{}) { func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
// Start events processing pipeline.
c.eventBroadcaster.StartLogging(klog.Infof)
c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")})
defer c.eventBroadcaster.Shutdown()
defer c.queue.ShutDown() defer c.queue.ShutDown()
klog.Infof("Starting EndpointSliceMirroring controller") klog.Infof("Starting EndpointSliceMirroring controller")

View File

@ -28,9 +28,11 @@ import (
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
) )
@ -58,6 +60,11 @@ func newController(batchPeriod time.Duration) (*fake.Clientset, *endpointSliceMi
client, client,
batchPeriod) batchPeriod)
// The event processing pipeline is normally started via Run() method.
// However, since we don't start it in unit tests, we explicitly start it here.
esController.eventBroadcaster.StartLogging(klog.Infof)
esController.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
esController.endpointsSynced = alwaysReady esController.endpointsSynced = alwaysReady
esController.endpointSlicesSynced = alwaysReady esController.endpointSlicesSynced = alwaysReady
esController.servicesSynced = alwaysReady esController.servicesSynced = alwaysReady