Clean shutdown of volume integration tests

This commit is contained in:
Wojciech Tyczyński 2022-06-16 05:00:09 +02:00
parent 08f9125cb0
commit 13e4f2b554
5 changed files with 73 additions and 53 deletions

View File

@ -150,10 +150,8 @@ func NewAttachDetachController(
return nil, fmt.Errorf("could not initialize volume plugins for Attach/Detach Controller: %w", err) return nil, fmt.Errorf("could not initialize volume plugins for Attach/Detach Controller: %w", err)
} }
eventBroadcaster := record.NewBroadcaster() adc.broadcaster = record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0) recorder := adc.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "attachdetach-controller"})
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "attachdetach-controller"})
blkutil := volumepathhandler.NewBlockVolumePathHandler() blkutil := volumepathhandler.NewBlockVolumePathHandler()
adc.desiredStateOfWorld = cache.NewDesiredStateOfWorld(&adc.volumePluginMgr) adc.desiredStateOfWorld = cache.NewDesiredStateOfWorld(&adc.volumePluginMgr)
@ -302,8 +300,8 @@ type attachDetachController struct {
// populate the current pods using podInformer. // populate the current pods using podInformer.
desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator
// recorder is used to record events in the API server // broadcaster is broadcasting events
recorder record.EventRecorder broadcaster record.EventBroadcaster
// pvcQueue is used to queue pvc objects // pvcQueue is used to queue pvc objects
pvcQueue workqueue.RateLimitingInterface pvcQueue workqueue.RateLimitingInterface
@ -322,6 +320,11 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
defer runtime.HandleCrash() defer runtime.HandleCrash()
defer adc.pvcQueue.ShutDown() defer adc.pvcQueue.ShutDown()
// Start events processing pipeline.
adc.broadcaster.StartStructuredLogging(0)
adc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: adc.kubeClient.CoreV1().Events("")})
defer adc.broadcaster.Shutdown()
klog.Infof("Starting attach detach controller") klog.Infof("Starting attach detach controller")
defer klog.Infof("Shutting down attach detach controller") defer klog.Infof("Shutting down attach detach controller")
@ -910,7 +913,7 @@ func (adc *attachDetachController) GetNodeName() types.NodeName {
} }
func (adc *attachDetachController) GetEventRecorder() record.EventRecorder { func (adc *attachDetachController) GetEventRecorder() record.EventRecorder {
return adc.recorder return nil
} }
func (adc *attachDetachController) GetSubpather() subpath.Interface { func (adc *attachDetachController) GetSubpather() subpath.Interface {

View File

@ -166,6 +166,7 @@ type PersistentVolumeController struct {
NodeListerSynced cache.InformerSynced NodeListerSynced cache.InformerSynced
kubeClient clientset.Interface kubeClient clientset.Interface
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder eventRecorder record.EventRecorder
cloud cloudprovider.Interface cloud cloudprovider.Interface
volumePluginMgr vol.VolumePluginMgr volumePluginMgr vol.VolumePluginMgr

View File

@ -81,17 +81,17 @@ type ControllerParameters struct {
// NewController creates a new PersistentVolume controller // NewController creates a new PersistentVolume controller
func NewController(p ControllerParameters) (*PersistentVolumeController, error) { func NewController(p ControllerParameters) (*PersistentVolumeController, error) {
eventRecorder := p.EventRecorder eventRecorder := p.EventRecorder
var eventBroadcaster record.EventBroadcaster
if eventRecorder == nil { if eventRecorder == nil {
broadcaster := record.NewBroadcaster() eventBroadcaster = record.NewBroadcaster()
broadcaster.StartStructuredLogging(0) eventRecorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "persistentvolume-controller"})
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: p.KubeClient.CoreV1().Events("")})
eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "persistentvolume-controller"})
} }
controller := &PersistentVolumeController{ controller := &PersistentVolumeController{
volumes: newPersistentVolumeOrderedIndex(), volumes: newPersistentVolumeOrderedIndex(),
claims: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc), claims: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
kubeClient: p.KubeClient, kubeClient: p.KubeClient,
eventBroadcaster: eventBroadcaster,
eventRecorder: eventRecorder, eventRecorder: eventRecorder,
runningOperations: goroutinemap.NewGoRoutineMap(true /* exponentialBackOffOnError */), runningOperations: goroutinemap.NewGoRoutineMap(true /* exponentialBackOffOnError */),
cloud: p.Cloud, cloud: p.Cloud,
@ -308,6 +308,13 @@ func (ctrl *PersistentVolumeController) Run(ctx context.Context) {
defer ctrl.claimQueue.ShutDown() defer ctrl.claimQueue.ShutDown()
defer ctrl.volumeQueue.ShutDown() defer ctrl.volumeQueue.ShutDown()
// Start events processing pipeline.
if ctrl.eventBroadcaster != nil {
ctrl.eventBroadcaster.StartStructuredLogging(0)
ctrl.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: ctrl.kubeClient.CoreV1().Events("")})
defer ctrl.eventBroadcaster.Shutdown()
}
klog.Infof("Starting persistent volume controller") klog.Infof("Starting persistent volume controller")
defer klog.Infof("Shutting down persistent volume controller") defer klog.Infof("Shutting down persistent volume controller")

View File

@ -155,7 +155,7 @@ func TestPodDeletionWithDswp(t *testing.T) {
}, },
} }
testClient, ctrl, _, informers := createAdClients(t, server, defaultSyncPeriod, defaultTimerConfig) testClient, ctrl, pvCtrl, informers := createAdClients(t, server, defaultSyncPeriod, defaultTimerConfig)
ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t) ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
defer framework.DeleteNamespaceOrDie(testClient, ns, t) defer framework.DeleteNamespaceOrDie(testClient, ns, t)
@ -167,23 +167,25 @@ func TestPodDeletionWithDswp(t *testing.T) {
t.Fatalf("Failed to created node : %v", err) t.Fatalf("Failed to created node : %v", err)
} }
stopCh := make(chan struct{}) // start controller loop
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go informers.Core().V1().Nodes().Informer().Run(stopCh) go informers.Core().V1().Nodes().Informer().Run(ctx.Done())
if _, err := testClient.CoreV1().Pods(ns.Name).Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil { if _, err := testClient.CoreV1().Pods(ns.Name).Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
t.Errorf("Failed to create pod : %v", err) t.Errorf("Failed to create pod : %v", err)
} }
podInformer := informers.Core().V1().Pods().Informer() podInformer := informers.Core().V1().Pods().Informer()
go podInformer.Run(podStopCh) go podInformer.Run(ctx.Done())
// start controller loop go informers.Core().V1().PersistentVolumeClaims().Informer().Run(ctx.Done())
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh) go informers.Core().V1().PersistentVolumes().Informer().Run(ctx.Done())
go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh) go informers.Storage().V1().VolumeAttachments().Informer().Run(ctx.Done())
go informers.Storage().V1().VolumeAttachments().Informer().Run(stopCh) initCSIObjects(ctx.Done(), informers)
initCSIObjects(stopCh, informers) go ctrl.Run(ctx.Done())
go ctrl.Run(stopCh) // Run pvCtrl to avoid leaking goroutines started during its creation.
defer close(stopCh) go pvCtrl.Run(ctx)
waitToObservePods(t, podInformer, 1) waitToObservePods(t, podInformer, 1)
podKey, err := cache.MetaNamespaceKeyFunc(pod) podKey, err := cache.MetaNamespaceKeyFunc(pod)
@ -230,13 +232,14 @@ func TestPodUpdateWithWithADC(t *testing.T) {
}, },
} }
testClient, ctrl, _, informers := createAdClients(t, server, defaultSyncPeriod, defaultTimerConfig) testClient, ctrl, pvCtrl, informers := createAdClients(t, server, defaultSyncPeriod, defaultTimerConfig)
ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t) ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
defer framework.DeleteNamespaceOrDie(testClient, ns, t) defer framework.DeleteNamespaceOrDie(testClient, ns, t)
pod := fakePodWithVol(namespaceName) pod := fakePodWithVol(namespaceName)
podStopCh := make(chan struct{}) podStopCh := make(chan struct{})
defer close(podStopCh)
if _, err := testClient.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}); err != nil { if _, err := testClient.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to created node : %v", err) t.Fatalf("Failed to created node : %v", err)
@ -252,12 +255,16 @@ func TestPodUpdateWithWithADC(t *testing.T) {
go podInformer.Run(podStopCh) go podInformer.Run(podStopCh)
// start controller loop // start controller loop
stopCh := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background())
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh) defer cancel()
go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
go informers.Storage().V1().VolumeAttachments().Informer().Run(stopCh) go informers.Core().V1().PersistentVolumeClaims().Informer().Run(ctx.Done())
initCSIObjects(stopCh, informers) go informers.Core().V1().PersistentVolumes().Informer().Run(ctx.Done())
go ctrl.Run(stopCh) go informers.Storage().V1().VolumeAttachments().Informer().Run(ctx.Done())
initCSIObjects(ctx.Done(), informers)
go ctrl.Run(ctx.Done())
// Run pvCtrl to avoid leaking goroutines started during its creation.
go pvCtrl.Run(ctx)
waitToObservePods(t, podInformer, 1) waitToObservePods(t, podInformer, 1)
podKey, err := cache.MetaNamespaceKeyFunc(pod) podKey, err := cache.MetaNamespaceKeyFunc(pod)
@ -280,9 +287,6 @@ func TestPodUpdateWithWithADC(t *testing.T) {
} }
waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 20*time.Second, "expected 0 pods in dsw after pod completion", 0) waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 20*time.Second, "expected 0 pods in dsw after pod completion", 0)
close(podStopCh)
close(stopCh)
} }
func TestPodUpdateWithKeepTerminatedPodVolumes(t *testing.T) { func TestPodUpdateWithKeepTerminatedPodVolumes(t *testing.T) {
@ -301,13 +305,14 @@ func TestPodUpdateWithKeepTerminatedPodVolumes(t *testing.T) {
}, },
} }
testClient, ctrl, _, informers := createAdClients(t, server, defaultSyncPeriod, defaultTimerConfig) testClient, ctrl, pvCtrl, informers := createAdClients(t, server, defaultSyncPeriod, defaultTimerConfig)
ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t) ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
defer framework.DeleteNamespaceOrDie(testClient, ns, t) defer framework.DeleteNamespaceOrDie(testClient, ns, t)
pod := fakePodWithVol(namespaceName) pod := fakePodWithVol(namespaceName)
podStopCh := make(chan struct{}) podStopCh := make(chan struct{})
defer close(podStopCh)
if _, err := testClient.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}); err != nil { if _, err := testClient.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to created node : %v", err) t.Fatalf("Failed to created node : %v", err)
@ -323,12 +328,16 @@ func TestPodUpdateWithKeepTerminatedPodVolumes(t *testing.T) {
go podInformer.Run(podStopCh) go podInformer.Run(podStopCh)
// start controller loop // start controller loop
stopCh := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background())
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh) defer cancel()
go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
go informers.Storage().V1().VolumeAttachments().Informer().Run(stopCh) go informers.Core().V1().PersistentVolumeClaims().Informer().Run(ctx.Done())
initCSIObjects(stopCh, informers) go informers.Core().V1().PersistentVolumes().Informer().Run(ctx.Done())
go ctrl.Run(stopCh) go informers.Storage().V1().VolumeAttachments().Informer().Run(ctx.Done())
initCSIObjects(ctx.Done(), informers)
go ctrl.Run(ctx.Done())
// Run pvCtrl to avoid leaking goroutines started during its creation.
go pvCtrl.Run(ctx)
waitToObservePods(t, podInformer, 1) waitToObservePods(t, podInformer, 1)
podKey, err := cache.MetaNamespaceKeyFunc(pod) podKey, err := cache.MetaNamespaceKeyFunc(pod)
@ -351,9 +360,6 @@ func TestPodUpdateWithKeepTerminatedPodVolumes(t *testing.T) {
} }
waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 20*time.Second, "expected non-zero pods in dsw if KeepTerminatedPodVolumesAnnotation is set", 1) waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 20*time.Second, "expected non-zero pods in dsw if KeepTerminatedPodVolumesAnnotation is set", 1)
close(podStopCh)
close(stopCh)
} }
// wait for the podInformer to observe the pods. Call this function before // wait for the podInformer to observe the pods. Call this function before
@ -481,7 +487,7 @@ func TestPodAddedByDswp(t *testing.T) {
}, },
}, },
} }
testClient, ctrl, _, informers := createAdClients(t, server, defaultSyncPeriod, defaultTimerConfig) testClient, ctrl, pvCtrl, informers := createAdClients(t, server, defaultSyncPeriod, defaultTimerConfig)
ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t) ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
defer framework.DeleteNamespaceOrDie(testClient, ns, t) defer framework.DeleteNamespaceOrDie(testClient, ns, t)
@ -503,12 +509,16 @@ func TestPodAddedByDswp(t *testing.T) {
go podInformer.Run(podStopCh) go podInformer.Run(podStopCh)
// start controller loop // start controller loop
stopCh := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background())
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh) defer cancel()
go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
go informers.Storage().V1().VolumeAttachments().Informer().Run(stopCh) go informers.Core().V1().PersistentVolumeClaims().Informer().Run(ctx.Done())
initCSIObjects(stopCh, informers) go informers.Core().V1().PersistentVolumes().Informer().Run(ctx.Done())
go ctrl.Run(stopCh) go informers.Storage().V1().VolumeAttachments().Informer().Run(ctx.Done())
initCSIObjects(ctx.Done(), informers)
go ctrl.Run(ctx.Done())
// Run pvCtrl to avoid leaking goroutines started during its creation.
go pvCtrl.Run(ctx)
waitToObservePods(t, podInformer, 1) waitToObservePods(t, podInformer, 1)
podKey, err := cache.MetaNamespaceKeyFunc(pod) podKey, err := cache.MetaNamespaceKeyFunc(pod)
@ -538,8 +548,6 @@ func TestPodAddedByDswp(t *testing.T) {
// the findAndAddActivePods loop turns every 3 minute // the findAndAddActivePods loop turns every 3 minute
waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 200*time.Second, "expected 2 pods in dsw after pod addition", 2) waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 200*time.Second, "expected 2 pods in dsw after pod addition", 2)
close(stopCh)
} }
func TestPVCBoundWithADC(t *testing.T) { func TestPVCBoundWithADC(t *testing.T) {
@ -592,6 +600,8 @@ func TestPVCBoundWithADC(t *testing.T) {
// start controller loop // start controller loop
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informers.Start(ctx.Done()) informers.Start(ctx.Done())
informers.WaitForCacheSync(ctx.Done()) informers.WaitForCacheSync(ctx.Done())
initCSIObjects(ctx.Done(), informers) initCSIObjects(ctx.Done(), informers)
@ -606,7 +616,6 @@ func TestPVCBoundWithADC(t *testing.T) {
createPVForPVC(t, testClient, pvc) createPVForPVC(t, testClient, pvc)
} }
waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 60*time.Second, "expected 4 pods in dsw after PVCs are bound", 4) waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 60*time.Second, "expected 4 pods in dsw after PVCs are bound", 4)
cancel()
} }
// Create PV for PVC, pv controller will bind them together. // Create PV for PVC, pv controller will bind them together.

View File

@ -699,7 +699,7 @@ func TestPersistentVolumeMultiPVsPVCs(t *testing.T) {
} }
klog.V(2).Infof("TestPersistentVolumeMultiPVsPVCs: claims are bound") klog.V(2).Infof("TestPersistentVolumeMultiPVsPVCs: claims are bound")
stopCh <- struct{}{} close(stopCh)
// check that everything is bound to something // check that everything is bound to something
for i := 0; i < objCount; i++ { for i := 0; i < objCount; i++ {