Merge pull request #30825 from wongma7/pv-controller-informer
Automatic merge from submit-queue Use PV shared informer in PV controller Use the PV shared informer, addressing (partially) https://github.com/kubernetes/kubernetes/issues/26247 . Using the PVC shared informer is not so simple because sometimes the controller wants to `Requeue` and...
This commit is contained in:
		@@ -430,7 +430,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 | 
				
			|||||||
		ProbeControllerVolumePlugins(cloud, s.VolumeConfiguration),
 | 
							ProbeControllerVolumePlugins(cloud, s.VolumeConfiguration),
 | 
				
			||||||
		cloud,
 | 
							cloud,
 | 
				
			||||||
		s.ClusterName,
 | 
							s.ClusterName,
 | 
				
			||||||
		nil, // volumeSource
 | 
							sharedInformers.PersistentVolumes().Informer(),
 | 
				
			||||||
		nil, // claimSource
 | 
							nil, // claimSource
 | 
				
			||||||
		nil, // classSource
 | 
							nil, // classSource
 | 
				
			||||||
		nil, // eventRecorder
 | 
							nil, // eventRecorder
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -289,7 +289,7 @@ func (s *CMServer) Run(_ []string) error {
 | 
				
			|||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		glog.Fatalf("An backward-compatible provisioner could not be created: %v, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.", err)
 | 
							glog.Fatalf("An backward-compatible provisioner could not be created: %v, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	volumeController := persistentvolumecontroller.NewPersistentVolumeController(
 | 
						volumeController := persistentvolumecontroller.NewPersistentVolumeControllerFromClient(
 | 
				
			||||||
		clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "persistent-volume-binder")),
 | 
							clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "persistent-volume-binder")),
 | 
				
			||||||
		s.PVClaimBinderSyncPeriod.Duration,
 | 
							s.PVClaimBinderSyncPeriod.Duration,
 | 
				
			||||||
		alphaProvisioner,
 | 
							alphaProvisioner,
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -154,8 +154,8 @@ const createProvisionedPVInterval = 10 * time.Second
 | 
				
			|||||||
// framework.Controllers that watch PersistentVolume and PersistentVolumeClaim
 | 
					// framework.Controllers that watch PersistentVolume and PersistentVolumeClaim
 | 
				
			||||||
// changes.
 | 
					// changes.
 | 
				
			||||||
type PersistentVolumeController struct {
 | 
					type PersistentVolumeController struct {
 | 
				
			||||||
	volumeController          *framework.Controller
 | 
						volumeController          framework.ControllerInterface
 | 
				
			||||||
	volumeSource              cache.ListerWatcher
 | 
						pvInformer                framework.SharedIndexInformer
 | 
				
			||||||
	claimController           *framework.Controller
 | 
						claimController           *framework.Controller
 | 
				
			||||||
	claimSource               cache.ListerWatcher
 | 
						claimSource               cache.ListerWatcher
 | 
				
			||||||
	classReflector            *cache.Reflector
 | 
						classReflector            *cache.Reflector
 | 
				
			||||||
@@ -176,6 +176,13 @@ type PersistentVolumeController struct {
 | 
				
			|||||||
	claims  cache.Store
 | 
						claims  cache.Store
 | 
				
			||||||
	classes cache.Store
 | 
						classes cache.Store
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// isInformerInternal is true if the informer we hold is a personal informer,
 | 
				
			||||||
 | 
						// false if it is a shared informer. If we're using a normal shared informer,
 | 
				
			||||||
 | 
						// then the informer will be started for us. If we have a personal informer,
 | 
				
			||||||
 | 
						// we must start it ourselves. If you start the controller using
 | 
				
			||||||
 | 
						// NewPersistentVolumeController(passing SharedInformer), this will be false.
 | 
				
			||||||
 | 
						isInformerInternal bool
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Map of scheduled/running operations.
 | 
						// Map of scheduled/running operations.
 | 
				
			||||||
	runningOperations goroutinemap.GoRoutineMap
 | 
						runningOperations goroutinemap.GoRoutineMap
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -38,6 +38,8 @@ import (
 | 
				
			|||||||
	"k8s.io/kubernetes/pkg/watch"
 | 
						"k8s.io/kubernetes/pkg/watch"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/golang/glog"
 | 
						"github.com/golang/glog"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/controller/framework/informers"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/util/wait"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// This file contains the controller base functionality, i.e. framework to
 | 
					// This file contains the controller base functionality, i.e. framework to
 | 
				
			||||||
@@ -52,7 +54,8 @@ func NewPersistentVolumeController(
 | 
				
			|||||||
	volumePlugins []vol.VolumePlugin,
 | 
						volumePlugins []vol.VolumePlugin,
 | 
				
			||||||
	cloud cloudprovider.Interface,
 | 
						cloud cloudprovider.Interface,
 | 
				
			||||||
	clusterName string,
 | 
						clusterName string,
 | 
				
			||||||
	volumeSource, claimSource, classSource cache.ListerWatcher,
 | 
						pvInformer framework.SharedIndexInformer,
 | 
				
			||||||
 | 
						claimSource, classSource cache.ListerWatcher,
 | 
				
			||||||
	eventRecorder record.EventRecorder,
 | 
						eventRecorder record.EventRecorder,
 | 
				
			||||||
	enableDynamicProvisioning bool,
 | 
						enableDynamicProvisioning bool,
 | 
				
			||||||
) *PersistentVolumeController {
 | 
					) *PersistentVolumeController {
 | 
				
			||||||
@@ -84,17 +87,8 @@ func NewPersistentVolumeController(
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if volumeSource == nil {
 | 
						controller.pvInformer = pvInformer
 | 
				
			||||||
		volumeSource = &cache.ListWatch{
 | 
						controller.isInformerInternal = false
 | 
				
			||||||
			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
 | 
					 | 
				
			||||||
				return kubeClient.Core().PersistentVolumes().List(options)
 | 
					 | 
				
			||||||
			},
 | 
					 | 
				
			||||||
			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
 | 
					 | 
				
			||||||
				return kubeClient.Core().PersistentVolumes().Watch(options)
 | 
					 | 
				
			||||||
			},
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	controller.volumeSource = volumeSource
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if claimSource == nil {
 | 
						if claimSource == nil {
 | 
				
			||||||
		claimSource = &cache.ListWatch{
 | 
							claimSource = &cache.ListWatch{
 | 
				
			||||||
@@ -120,17 +114,8 @@ func NewPersistentVolumeController(
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	controller.classSource = classSource
 | 
						controller.classSource = classSource
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	_, controller.volumeController = framework.NewIndexerInformer(
 | 
						controller.volumeController = pvInformer.GetController()
 | 
				
			||||||
		volumeSource,
 | 
					
 | 
				
			||||||
		&api.PersistentVolume{},
 | 
					 | 
				
			||||||
		syncPeriod,
 | 
					 | 
				
			||||||
		framework.ResourceEventHandlerFuncs{
 | 
					 | 
				
			||||||
			AddFunc:    controller.addVolume,
 | 
					 | 
				
			||||||
			UpdateFunc: controller.updateVolume,
 | 
					 | 
				
			||||||
			DeleteFunc: controller.deleteVolume,
 | 
					 | 
				
			||||||
		},
 | 
					 | 
				
			||||||
		cache.Indexers{"accessmodes": accessModesIndexFunc},
 | 
					 | 
				
			||||||
	)
 | 
					 | 
				
			||||||
	_, controller.claimController = framework.NewInformer(
 | 
						_, controller.claimController = framework.NewInformer(
 | 
				
			||||||
		claimSource,
 | 
							claimSource,
 | 
				
			||||||
		&api.PersistentVolumeClaim{},
 | 
							&api.PersistentVolumeClaim{},
 | 
				
			||||||
@@ -154,25 +139,55 @@ func NewPersistentVolumeController(
 | 
				
			|||||||
	return controller
 | 
						return controller
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// NewPersistentVolumeControllerFromClient returns a new
 | 
				
			||||||
 | 
					// *PersistentVolumeController that runs its own informer.
 | 
				
			||||||
 | 
					func NewPersistentVolumeControllerFromClient(
 | 
				
			||||||
 | 
						kubeClient clientset.Interface,
 | 
				
			||||||
 | 
						syncPeriod time.Duration,
 | 
				
			||||||
 | 
						alphaProvisioner vol.ProvisionableVolumePlugin,
 | 
				
			||||||
 | 
						volumePlugins []vol.VolumePlugin,
 | 
				
			||||||
 | 
						cloud cloudprovider.Interface,
 | 
				
			||||||
 | 
						clusterName string,
 | 
				
			||||||
 | 
						volumeSource, claimSource, classSource cache.ListerWatcher,
 | 
				
			||||||
 | 
						eventRecorder record.EventRecorder,
 | 
				
			||||||
 | 
						enableDynamicProvisioning bool,
 | 
				
			||||||
 | 
					) *PersistentVolumeController {
 | 
				
			||||||
 | 
						pvInformer := informers.NewPVInformer(kubeClient, syncPeriod)
 | 
				
			||||||
 | 
						if volumeSource != nil {
 | 
				
			||||||
 | 
							pvInformer = framework.NewSharedIndexInformer(volumeSource, &api.PersistentVolume{}, syncPeriod, cache.Indexers{"accessmodes": accessModesIndexFunc})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						ctrl := NewPersistentVolumeController(
 | 
				
			||||||
 | 
							kubeClient,
 | 
				
			||||||
 | 
							syncPeriod,
 | 
				
			||||||
 | 
							alphaProvisioner,
 | 
				
			||||||
 | 
							volumePlugins,
 | 
				
			||||||
 | 
							cloud,
 | 
				
			||||||
 | 
							clusterName,
 | 
				
			||||||
 | 
							pvInformer,
 | 
				
			||||||
 | 
							claimSource,
 | 
				
			||||||
 | 
							classSource,
 | 
				
			||||||
 | 
							eventRecorder,
 | 
				
			||||||
 | 
							enableDynamicProvisioning,
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
						ctrl.isInformerInternal = true
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return ctrl
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// initializeCaches fills all controller caches with initial data from etcd in
 | 
					// initializeCaches fills all controller caches with initial data from etcd in
 | 
				
			||||||
// order to have the caches already filled when first addClaim/addVolume to
 | 
					// order to have the caches already filled when first addClaim/addVolume to
 | 
				
			||||||
// perform initial synchronization of the controller.
 | 
					// perform initial synchronization of the controller.
 | 
				
			||||||
func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSource cache.ListerWatcher) {
 | 
					func (ctrl *PersistentVolumeController) initializeCaches(volumeStore cache.Store, claimSource cache.ListerWatcher) {
 | 
				
			||||||
	volumeListObj, err := volumeSource.List(api.ListOptions{})
 | 
						volumeList := volumeStore.List()
 | 
				
			||||||
	if err != nil {
 | 
						for _, obj := range volumeList {
 | 
				
			||||||
		glog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
 | 
							volume, ok := obj.(*api.PersistentVolume)
 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	volumeList, ok := volumeListObj.(*api.PersistentVolumeList)
 | 
					 | 
				
			||||||
		if !ok {
 | 
							if !ok {
 | 
				
			||||||
		glog.Errorf("PersistentVolumeController can't initialize caches, expected list of volumes, got: %#v", volumeListObj)
 | 
								glog.Errorf("PersistentVolumeController can't initialize caches, expected list of volumes, got: %#v", obj)
 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	for _, volume := range volumeList.Items {
 | 
					 | 
				
			||||||
		// Ignore template volumes from kubernetes 1.2
 | 
							// Ignore template volumes from kubernetes 1.2
 | 
				
			||||||
		deleted := ctrl.upgradeVolumeFrom1_2(&volume)
 | 
							deleted := ctrl.upgradeVolumeFrom1_2(volume)
 | 
				
			||||||
		if !deleted {
 | 
							if !deleted {
 | 
				
			||||||
			clone, err := conversion.NewCloner().DeepCopy(&volume)
 | 
								clone, err := conversion.NewCloner().DeepCopy(volume)
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				glog.Errorf("error cloning volume %q: %v", volume.Name, err)
 | 
									glog.Errorf("error cloning volume %q: %v", volume.Name, err)
 | 
				
			||||||
				continue
 | 
									continue
 | 
				
			||||||
@@ -444,7 +459,21 @@ func (ctrl *PersistentVolumeController) deleteClaim(obj interface{}) {
 | 
				
			|||||||
// Run starts all of this controller's control loops
 | 
					// Run starts all of this controller's control loops
 | 
				
			||||||
func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
 | 
					func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
 | 
				
			||||||
	glog.V(4).Infof("starting PersistentVolumeController")
 | 
						glog.V(4).Infof("starting PersistentVolumeController")
 | 
				
			||||||
	ctrl.initializeCaches(ctrl.volumeSource, ctrl.claimSource)
 | 
						if ctrl.isInformerInternal {
 | 
				
			||||||
 | 
							go ctrl.pvInformer.Run(stopCh)
 | 
				
			||||||
 | 
							// Wait to avoid data race between Run and AddEventHandler in tests
 | 
				
			||||||
 | 
							wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
 | 
				
			||||||
 | 
								return ctrl.pvInformer.HasSynced(), nil
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						ctrl.initializeCaches(ctrl.pvInformer.GetStore(), ctrl.claimSource)
 | 
				
			||||||
 | 
						// AddEventHandler will send synthetic add events which we don't want until
 | 
				
			||||||
 | 
						// we have initialized the caches
 | 
				
			||||||
 | 
						ctrl.pvInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
 | 
				
			||||||
 | 
							AddFunc:    ctrl.addVolume,
 | 
				
			||||||
 | 
							UpdateFunc: ctrl.updateVolume,
 | 
				
			||||||
 | 
							DeleteFunc: ctrl.deleteVolume,
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
	go ctrl.volumeController.Run(stopCh)
 | 
						go ctrl.volumeController.Run(stopCh)
 | 
				
			||||||
	go ctrl.claimController.Run(stopCh)
 | 
						go ctrl.claimController.Run(stopCh)
 | 
				
			||||||
	go ctrl.classReflector.RunUntil(stopCh)
 | 
						go ctrl.classReflector.RunUntil(stopCh)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -594,7 +594,7 @@ func newTestController(kubeClient clientset.Interface, volumeSource, claimSource
 | 
				
			|||||||
	if classSource == nil {
 | 
						if classSource == nil {
 | 
				
			||||||
		classSource = framework.NewFakeControllerSource()
 | 
							classSource = framework.NewFakeControllerSource()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	ctrl := NewPersistentVolumeController(
 | 
						ctrl := NewPersistentVolumeControllerFromClient(
 | 
				
			||||||
		kubeClient,
 | 
							kubeClient,
 | 
				
			||||||
		5*time.Second,        // sync period
 | 
							5*time.Second,        // sync period
 | 
				
			||||||
		nil,                  // alpha provisioner
 | 
							nil,                  // alpha provisioner
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1126,7 +1126,7 @@ func createClients(ns *api.Namespace, t *testing.T, s *httptest.Server, syncPeri
 | 
				
			|||||||
	cloud := &fake_cloud.FakeCloud{}
 | 
						cloud := &fake_cloud.FakeCloud{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	syncPeriod = getSyncPeriod(syncPeriod)
 | 
						syncPeriod = getSyncPeriod(syncPeriod)
 | 
				
			||||||
	ctrl := persistentvolumecontroller.NewPersistentVolumeController(
 | 
						ctrl := persistentvolumecontroller.NewPersistentVolumeControllerFromClient(
 | 
				
			||||||
		binderClient,
 | 
							binderClient,
 | 
				
			||||||
		syncPeriod,
 | 
							syncPeriod,
 | 
				
			||||||
		nil, // alpha provisioner
 | 
							nil, // alpha provisioner
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user