Switch namespace controller to shared informer
This commit is contained in:
@@ -17,18 +17,20 @@ limitations under the License.
|
||||
package namespace
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
|
||||
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/namespace/deletion"
|
||||
"k8s.io/kubernetes/pkg/util/metrics"
|
||||
@@ -52,10 +54,10 @@ type NamespaceController struct {
|
||||
kubeClient clientset.Interface
|
||||
// clientPool manages a pool of dynamic clients
|
||||
clientPool dynamic.ClientPool
|
||||
// store that holds the namespaces
|
||||
store cache.Store
|
||||
// controller that observes the namespaces
|
||||
controller cache.Controller
|
||||
// lister that can list namespaces from a shared cache
|
||||
lister corelisters.NamespaceLister
|
||||
// returns true when the namespace cache is ready
|
||||
listerSynced cache.InformerSynced
|
||||
// namespaces that have been queued up for processing by workers
|
||||
queue workqueue.RateLimitingInterface
|
||||
// function to list of preferred resources for namespace deletion
|
||||
@@ -71,6 +73,7 @@ func NewNamespaceController(
|
||||
kubeClient clientset.Interface,
|
||||
clientPool dynamic.ClientPool,
|
||||
discoverResourcesFn func() ([]*metav1.APIResourceList, error),
|
||||
namespaceInformer coreinformers.NamespaceInformer,
|
||||
resyncPeriod time.Duration,
|
||||
finalizerToken v1.FinalizerName) *NamespaceController {
|
||||
|
||||
@@ -88,18 +91,8 @@ func NewNamespaceController(
|
||||
metrics.RegisterMetricAndTrackRateLimiterUsage("namespace_controller", kubeClient.Core().RESTClient().GetRateLimiter())
|
||||
}
|
||||
|
||||
// configure the backing store/controller
|
||||
store, controller := cache.NewInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
return kubeClient.Core().Namespaces().List(options)
|
||||
},
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
return kubeClient.Core().Namespaces().Watch(options)
|
||||
},
|
||||
},
|
||||
&v1.Namespace{},
|
||||
resyncPeriod,
|
||||
// configure the namespace informer event handlers
|
||||
namespaceInformer.Informer().AddEventHandlerWithResyncPeriod(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
namespace := obj.(*v1.Namespace)
|
||||
@@ -110,10 +103,11 @@ func NewNamespaceController(
|
||||
namespaceController.enqueueNamespace(namespace)
|
||||
},
|
||||
},
|
||||
resyncPeriod,
|
||||
)
|
||||
namespaceController.lister = namespaceInformer.Lister()
|
||||
namespaceController.listerSynced = namespaceInformer.Informer().HasSynced
|
||||
|
||||
namespaceController.store = store
|
||||
namespaceController.controller = controller
|
||||
return namespaceController
|
||||
}
|
||||
|
||||
@@ -122,7 +116,7 @@ func NewNamespaceController(
|
||||
func (nm *NamespaceController) enqueueNamespace(obj interface{}) {
|
||||
key, err := controller.KeyFunc(obj)
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
|
||||
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
|
||||
return
|
||||
}
|
||||
// delay processing namespace events to allow HA api servers to observe namespace deletion,
|
||||
@@ -175,28 +169,35 @@ func (nm *NamespaceController) syncNamespaceFromKey(key string) (err error) {
|
||||
startTime := time.Now()
|
||||
defer glog.V(4).Infof("Finished syncing namespace %q (%v)", key, time.Now().Sub(startTime))
|
||||
|
||||
obj, exists, err := nm.store.GetByKey(key)
|
||||
if !exists {
|
||||
namespace, err := nm.lister.Get(key)
|
||||
if errors.IsNotFound(err) {
|
||||
glog.Infof("Namespace has been deleted %v", key)
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
glog.Errorf("Unable to retrieve namespace %v from store: %v", key, err)
|
||||
nm.queue.Add(key)
|
||||
utilruntime.HandleError(fmt.Errorf("Unable to retrieve namespace %v from store: %v", key, err))
|
||||
return err
|
||||
}
|
||||
namespace := obj.(*v1.Namespace)
|
||||
return nm.namespacedResourcesDeleter.Delete(namespace.Name)
|
||||
}
|
||||
|
||||
// Run starts observing the system with the specified number of workers.
|
||||
func (nm *NamespaceController) Run(workers int, stopCh <-chan struct{}) {
|
||||
defer utilruntime.HandleCrash()
|
||||
go nm.controller.Run(stopCh)
|
||||
defer nm.queue.ShutDown()
|
||||
|
||||
glog.Info("Starting the NamespaceController")
|
||||
|
||||
if !cache.WaitForCacheSync(stopCh, nm.listerSynced) {
|
||||
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
|
||||
return
|
||||
}
|
||||
|
||||
for i := 0; i < workers; i++ {
|
||||
go wait.Until(nm.worker, time.Second, stopCh)
|
||||
}
|
||||
|
||||
<-stopCh
|
||||
glog.Infof("Shutting down NamespaceController")
|
||||
nm.queue.ShutDown()
|
||||
|
||||
glog.Info("Shutting down NamespaceController")
|
||||
}
|
||||
|
Reference in New Issue
Block a user