began turning attention to cache.Controller
This commit is contained in:
		| @@ -26,7 +26,16 @@ import ( | |||||||
| 	"k8s.io/apimachinery/pkg/util/wait" | 	"k8s.io/apimachinery/pkg/util/wait" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // Config contains all the settings for a Controller. | // This file implements a low-level controller that is used in | ||||||
|  | // sharedIndexInformer, which is an implementation of | ||||||
|  | // SharedIndexInformer.  Such informers, in turn, are key components | ||||||
|  | // in the high level controllers that form the backbone of the | ||||||
|  | // Kubernetes control plane.  Look at those for examples, or the | ||||||
|  | // example in | ||||||
|  | // https://github.com/kubernetes/client-go/tree/master/examples/workqueue | ||||||
|  | // . | ||||||
|  |  | ||||||
|  | // Config contains all the settings for one of these low-level controllers. | ||||||
| type Config struct { | type Config struct { | ||||||
| 	// The queue for your objects - has to be a DeltaFIFO due to | 	// The queue for your objects - has to be a DeltaFIFO due to | ||||||
| 	// assumptions in the implementation. Your Process() function | 	// assumptions in the implementation. Your Process() function | ||||||
| @@ -36,19 +45,16 @@ type Config struct { | |||||||
| 	// Something that can list and watch your objects. | 	// Something that can list and watch your objects. | ||||||
| 	ListerWatcher | 	ListerWatcher | ||||||
|  |  | ||||||
| 	// Something that can process your objects. | 	// Something that can process a popped Deltas. | ||||||
| 	Process ProcessFunc | 	Process ProcessFunc | ||||||
|  |  | ||||||
| 	// The type of your objects. | 	// ObjectType is an example object of the type this controller is | ||||||
|  | 	// expected to handle.  Only the type needs to be right, except | ||||||
|  | 	// that when that is `unstructured.Unstructured` the object's | ||||||
|  | 	// `"apiVersion"` must also be right. | ||||||
| 	ObjectType runtime.Object | 	ObjectType runtime.Object | ||||||
|  |  | ||||||
| 	// Reprocess everything at least this often. | 	// FullResyncPeriod is the period at which ShouldResync is invoked. | ||||||
| 	// Note that if it takes longer for you to clear the queue than this |  | ||||||
| 	// period, you will end up processing items in the order determined |  | ||||||
| 	// by FIFO.Replace(). Currently, this is random. If this is a |  | ||||||
| 	// problem, we can change that replacement policy to append new |  | ||||||
| 	// things to the end of the queue instead of replacing the entire |  | ||||||
| 	// queue. |  | ||||||
| 	FullResyncPeriod time.Duration | 	FullResyncPeriod time.Duration | ||||||
|  |  | ||||||
| 	// ShouldResync, if specified, is invoked when the controller's reflector determines the next | 	// ShouldResync, if specified, is invoked when the controller's reflector determines the next | ||||||
| @@ -71,7 +77,7 @@ type ShouldResyncFunc func() bool | |||||||
| // ProcessFunc processes a single object. | // ProcessFunc processes a single object. | ||||||
| type ProcessFunc func(obj interface{}) error | type ProcessFunc func(obj interface{}) error | ||||||
|  |  | ||||||
| // Controller is a generic controller framework. | // `*controller` implements Controller | ||||||
| type controller struct { | type controller struct { | ||||||
| 	config         Config | 	config         Config | ||||||
| 	reflector      *Reflector | 	reflector      *Reflector | ||||||
| @@ -79,7 +85,7 @@ type controller struct { | |||||||
| 	clock          clock.Clock | 	clock          clock.Clock | ||||||
| } | } | ||||||
|  |  | ||||||
| // Controller is a generic controller framework. | // Controller is a low-level controller used in sharedIndexInformer. | ||||||
| type Controller interface { | type Controller interface { | ||||||
| 	Run(stopCh <-chan struct{}) | 	Run(stopCh <-chan struct{}) | ||||||
| 	HasSynced() bool | 	HasSynced() bool | ||||||
| @@ -95,7 +101,7 @@ func New(c *Config) Controller { | |||||||
| 	return ctlr | 	return ctlr | ||||||
| } | } | ||||||
|  |  | ||||||
| // Run begins processing items, and will continue until a value is sent down stopCh. | // Run begins processing items, and will continue until a value is sent down stopCh or it is closed. | ||||||
| // It's an error to call Run more than once. | // It's an error to call Run more than once. | ||||||
| // Run blocks; call via go. | // Run blocks; call via go. | ||||||
| func (c *controller) Run(stopCh <-chan struct{}) { | func (c *controller) Run(stopCh <-chan struct{}) { | ||||||
|   | |||||||
| @@ -55,7 +55,10 @@ type Reflector struct { | |||||||
| 	// stringification of expectedType otherwise. It is for display | 	// stringification of expectedType otherwise. It is for display | ||||||
| 	// only, and should not be used for parsing or comparison. | 	// only, and should not be used for parsing or comparison. | ||||||
| 	expectedTypeName string | 	expectedTypeName string | ||||||
| 	// The type of object we expect to place in the store. | 	// An example object of the type we expect to place in the store. | ||||||
|  | 	// Only the type needs to be right, except that when that is | ||||||
|  | 	// `unstructured.Unstructured` the object's `"apiVersion"` must | ||||||
|  | 	// also be right. | ||||||
| 	expectedType reflect.Type | 	expectedType reflect.Type | ||||||
| 	// The GVK of the object we expect to place in the store if unstructured. | 	// The GVK of the object we expect to place in the store if unstructured. | ||||||
| 	expectedGVK *schema.GroupVersionKind | 	expectedGVK *schema.GroupVersionKind | ||||||
| @@ -63,10 +66,12 @@ type Reflector struct { | |||||||
| 	store Store | 	store Store | ||||||
| 	// listerWatcher is used to perform lists and watches. | 	// listerWatcher is used to perform lists and watches. | ||||||
| 	listerWatcher ListerWatcher | 	listerWatcher ListerWatcher | ||||||
| 	// period controls timing between one watch ending and | 	// period controls timing between an unsuccessful watch ending and | ||||||
| 	// the beginning of the next one. | 	// the beginning of the next list. | ||||||
| 	period       time.Duration | 	period       time.Duration | ||||||
|  | 	// The period at which ShouldResync is invoked | ||||||
| 	resyncPeriod time.Duration | 	resyncPeriod time.Duration | ||||||
|  | 	// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked | ||||||
| 	ShouldResync func() bool | 	ShouldResync func() bool | ||||||
| 	// clock allows tests to manipulate time | 	// clock allows tests to manipulate time | ||||||
| 	clock clock.Clock | 	clock clock.Clock | ||||||
| @@ -98,12 +103,16 @@ func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interfa | |||||||
| 	return indexer, reflector | 	return indexer, reflector | ||||||
| } | } | ||||||
|  |  | ||||||
| // NewReflector creates a new Reflector object which will keep the given store up to | // NewReflector creates a new Reflector object which will keep the | ||||||
| // date with the server's contents for the given resource. Reflector promises to | // given store up to date with the server's contents for the given | ||||||
| // only put things in the store that have the type of expectedType, unless expectedType | // resource. Reflector promises to only put things in the store that | ||||||
| // is nil. If resyncPeriod is non-zero, then lists will be executed after every | // have the type of expectedType, unless expectedType is nil. If | ||||||
| // resyncPeriod, so that you can use reflectors to periodically process everything as | // resyncPeriod is non-zero, then the reflector will periodically | ||||||
| // well as incrementally processing the things that change. | // consult its ShouldResync function to determine whether to invoke | ||||||
|  | // the Store's Resync operation; `ShouldResync==nil` means always | ||||||
|  | // "yes".  This enables you to use reflectors to periodically process | ||||||
|  | // everything as well as incrementally processing the things that | ||||||
|  | // change. | ||||||
| func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { | func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { | ||||||
| 	return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod) | 	return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod) | ||||||
| } | } | ||||||
| @@ -147,7 +156,8 @@ func (r *Reflector) setExpectedType(expectedType interface{}) { | |||||||
| // call chains to NewReflector, so they'd be low entropy names for reflectors | // call chains to NewReflector, so they'd be low entropy names for reflectors | ||||||
| var internalPackages = []string{"client-go/tools/cache/"} | var internalPackages = []string{"client-go/tools/cache/"} | ||||||
|  |  | ||||||
| // Run starts a watch and handles watch events. Will restart the watch if it is closed. | // Run repeatedly uses the reflector's ListAndWatch to fetch all the | ||||||
|  | // objects and subsequent deltas. | ||||||
| // Run will exit when stopCh is closed. | // Run will exit when stopCh is closed. | ||||||
| func (r *Reflector) Run(stopCh <-chan struct{}) { | func (r *Reflector) Run(stopCh <-chan struct{}) { | ||||||
| 	klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) | 	klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) | ||||||
|   | |||||||
| @@ -159,21 +159,21 @@ type SharedIndexInformer interface { | |||||||
| } | } | ||||||
|  |  | ||||||
| // NewSharedInformer creates a new instance for the listwatcher. | // NewSharedInformer creates a new instance for the listwatcher. | ||||||
| func NewSharedInformer(lw ListerWatcher, objOfWatchedType runtime.Object, resyncPeriod time.Duration) SharedInformer { | func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, resyncPeriod time.Duration) SharedInformer { | ||||||
| 	return NewSharedIndexInformer(lw, objOfWatchedType, resyncPeriod, Indexers{}) | 	return NewSharedIndexInformer(lw, exampleObject, resyncPeriod, Indexers{}) | ||||||
| } | } | ||||||
|  |  | ||||||
| // NewSharedIndexInformer creates a new instance for the listwatcher. | // NewSharedIndexInformer creates a new instance for the listwatcher. | ||||||
| func NewSharedIndexInformer(lw ListerWatcher, objOfWatchedType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { | func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { | ||||||
| 	realClock := &clock.RealClock{} | 	realClock := &clock.RealClock{} | ||||||
| 	sharedIndexInformer := &sharedIndexInformer{ | 	sharedIndexInformer := &sharedIndexInformer{ | ||||||
| 		processor:                       &sharedProcessor{clock: realClock}, | 		processor:                       &sharedProcessor{clock: realClock}, | ||||||
| 		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), | 		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), | ||||||
| 		listerWatcher:                   lw, | 		listerWatcher:                   lw, | ||||||
| 		objectType:                      objOfWatchedType, | 		objectType:                      exampleObject, | ||||||
| 		resyncCheckPeriod:               defaultEventHandlerResyncPeriod, | 		resyncCheckPeriod:               defaultEventHandlerResyncPeriod, | ||||||
| 		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, | 		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, | ||||||
| 		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objOfWatchedType)), | 		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)), | ||||||
| 		clock:                           realClock, | 		clock:                           realClock, | ||||||
| 	} | 	} | ||||||
| 	return sharedIndexInformer | 	return sharedIndexInformer | ||||||
| @@ -236,6 +236,11 @@ type sharedIndexInformer struct { | |||||||
| 	cacheMutationDetector MutationDetector | 	cacheMutationDetector MutationDetector | ||||||
|  |  | ||||||
| 	listerWatcher ListerWatcher | 	listerWatcher ListerWatcher | ||||||
|  |  | ||||||
|  | 	// objectType is an example object of the type this informer is | ||||||
|  | 	// expected to handle.  Only the type needs to be right, except | ||||||
|  | 	// that when that is `unstructured.Unstructured` the object's | ||||||
|  | 	// `"apiVersion"` must also be right. | ||||||
| 	objectType    runtime.Object | 	objectType    runtime.Object | ||||||
|  |  | ||||||
| 	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call | 	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call | ||||||
| @@ -600,9 +605,10 @@ type processorListener struct { | |||||||
| 	// full resync from the shared informer, but modified by two | 	// full resync from the shared informer, but modified by two | ||||||
| 	// adjustments.  One is imposing a lower bound, | 	// adjustments.  One is imposing a lower bound, | ||||||
| 	// `minimumResyncPeriod`.  The other is another lower bound, the | 	// `minimumResyncPeriod`.  The other is another lower bound, the | ||||||
| 	// sharedProcessor's `resyncCheckPeriod`, that is imposed in | 	// sharedProcessor's `resyncCheckPeriod`, that is imposed (a) only | ||||||
| 	// AddEventHandlerWithResyncPeriod invocations made after the | 	// in AddEventHandlerWithResyncPeriod invocations made after the | ||||||
| 	// sharedProcessor starts. | 	// sharedProcessor starts and (b) only if the informer does | ||||||
|  | 	// resyncs at all. | ||||||
| 	requestedResyncPeriod time.Duration | 	requestedResyncPeriod time.Duration | ||||||
| 	// resyncPeriod is the threshold that will be used in the logic | 	// resyncPeriod is the threshold that will be used in the logic | ||||||
| 	// for this listener.  This value differs from | 	// for this listener.  This value differs from | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Mike Spreitzer
					Mike Spreitzer