Merge pull request #124245 from wojtek-t/informer_options
Allow for configuring MinWatchTimeout in reflector
This commit is contained in:
		@@ -224,12 +224,16 @@ func (c *objectCache) newReflectorLocked(namespace, name string) *objectCacheIte
 | 
				
			|||||||
		return c.watchObject(namespace, options)
 | 
							return c.watchObject(namespace, options)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	store := c.newStore()
 | 
						store := c.newStore()
 | 
				
			||||||
	reflector := cache.NewNamedReflector(
 | 
						reflector := cache.NewReflectorWithOptions(
 | 
				
			||||||
		fmt.Sprintf("object-%q/%q", namespace, name),
 | 
					 | 
				
			||||||
		&cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc},
 | 
							&cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc},
 | 
				
			||||||
		c.newObject(),
 | 
							c.newObject(),
 | 
				
			||||||
		store,
 | 
							store,
 | 
				
			||||||
		0,
 | 
							cache.ReflectorOptions{
 | 
				
			||||||
 | 
								Name: fmt.Sprintf("object-%q/%q", namespace, name),
 | 
				
			||||||
 | 
								// Bump default 5m MinWatchTimeout to avoid recreating
 | 
				
			||||||
 | 
								// watches too often.
 | 
				
			||||||
 | 
								MinWatchTimeout: 30 * time.Minute,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
	)
 | 
						)
 | 
				
			||||||
	item := &objectCacheItem{
 | 
						item := &objectCacheItem{
 | 
				
			||||||
		refMap:    make(map[types.UID]int),
 | 
							refMap:    make(map[types.UID]int),
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -59,6 +59,12 @@ type Config struct {
 | 
				
			|||||||
	// FullResyncPeriod is the period at which ShouldResync is considered.
 | 
						// FullResyncPeriod is the period at which ShouldResync is considered.
 | 
				
			||||||
	FullResyncPeriod time.Duration
 | 
						FullResyncPeriod time.Duration
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// MinWatchTimeout, if set, will define the minimum timeout for watch requests send
 | 
				
			||||||
 | 
						// to kube-apiserver. However, values lower than 5m will not be honored to avoid
 | 
				
			||||||
 | 
						// negative performance impact on controlplane.
 | 
				
			||||||
 | 
						// Optional - if unset a default value of 5m will be used.
 | 
				
			||||||
 | 
						MinWatchTimeout time.Duration
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// ShouldResync is periodically used by the reflector to determine
 | 
						// ShouldResync is periodically used by the reflector to determine
 | 
				
			||||||
	// whether to Resync the Queue. If ShouldResync is `nil` or
 | 
						// whether to Resync the Queue. If ShouldResync is `nil` or
 | 
				
			||||||
	// returns true, it means the reflector should proceed with the
 | 
						// returns true, it means the reflector should proceed with the
 | 
				
			||||||
@@ -138,6 +144,7 @@ func (c *controller) Run(stopCh <-chan struct{}) {
 | 
				
			|||||||
		c.config.Queue,
 | 
							c.config.Queue,
 | 
				
			||||||
		ReflectorOptions{
 | 
							ReflectorOptions{
 | 
				
			||||||
			ResyncPeriod:    c.config.FullResyncPeriod,
 | 
								ResyncPeriod:    c.config.FullResyncPeriod,
 | 
				
			||||||
 | 
								MinWatchTimeout: c.config.MinWatchTimeout,
 | 
				
			||||||
			TypeDescription: c.config.ObjectDescription,
 | 
								TypeDescription: c.config.ObjectDescription,
 | 
				
			||||||
			Clock:           c.clock,
 | 
								Clock:           c.clock,
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
@@ -346,6 +353,58 @@ func DeletionHandlingObjectToName(obj interface{}) (ObjectName, error) {
 | 
				
			|||||||
	return ObjectToName(obj)
 | 
						return ObjectToName(obj)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// InformerOptions configure a Reflector.
 | 
				
			||||||
 | 
					type InformerOptions struct {
 | 
				
			||||||
 | 
						// ListerWatcher implements List and Watch functions for the source of the resource
 | 
				
			||||||
 | 
						// the informer will be informing about.
 | 
				
			||||||
 | 
						ListerWatcher ListerWatcher
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// ObjectType is an object of the type that informer is expected to receive.
 | 
				
			||||||
 | 
						ObjectType runtime.Object
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Handler defines functions that should called on object mutations.
 | 
				
			||||||
 | 
						Handler ResourceEventHandler
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// ResyncPeriod is the underlying Reflector's resync period. If non-zero, the store
 | 
				
			||||||
 | 
						// is re-synced with that frequency - Modify events are delivered even if objects
 | 
				
			||||||
 | 
						// didn't change.
 | 
				
			||||||
 | 
						// This is useful for synchronizing objects that configure external resources
 | 
				
			||||||
 | 
						// (e.g. configure cloud provider functionalities).
 | 
				
			||||||
 | 
						// Optional - if unset, store resyncing is not happening periodically.
 | 
				
			||||||
 | 
						ResyncPeriod time.Duration
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// MinWatchTimeout, if set, will define the minimum timeout for watch requests send
 | 
				
			||||||
 | 
						// to kube-apiserver. However, values lower than 5m will not be honored to avoid
 | 
				
			||||||
 | 
						// negative performance impact on controlplane.
 | 
				
			||||||
 | 
						// Optional - if unset a default value of 5m will be used.
 | 
				
			||||||
 | 
						MinWatchTimeout time.Duration
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Indexers, if set, are the indexers for the received objects to optimize
 | 
				
			||||||
 | 
						// certain queries.
 | 
				
			||||||
 | 
						// Optional - if unset no indexes are maintained.
 | 
				
			||||||
 | 
						Indexers Indexers
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Transform function, if set, will be called on all objects before they will be
 | 
				
			||||||
 | 
						// put into the Store and corresponding Add/Modify/Delete handlers will be invoked
 | 
				
			||||||
 | 
						// for them.
 | 
				
			||||||
 | 
						// Optional - if unset no additional transforming is happening.
 | 
				
			||||||
 | 
						Transform TransformFunc
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// NewInformerWithOptions returns a Store and a controller for populating the store
 | 
				
			||||||
 | 
					// while also providing event notifications. You should only used the returned
 | 
				
			||||||
 | 
					// Store for Get/List operations; Add/Modify/Deletes will cause the event
 | 
				
			||||||
 | 
					// notifications to be faulty.
 | 
				
			||||||
 | 
					func NewInformerWithOptions(options InformerOptions) (Store, Controller) {
 | 
				
			||||||
 | 
						var clientState Store
 | 
				
			||||||
 | 
						if options.Indexers == nil {
 | 
				
			||||||
 | 
							clientState = NewStore(DeletionHandlingMetaNamespaceKeyFunc)
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							clientState = NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return clientState, newInformer(clientState, options)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewInformer returns a Store and a controller for populating the store
 | 
					// NewInformer returns a Store and a controller for populating the store
 | 
				
			||||||
// while also providing event notifications. You should only used the returned
 | 
					// while also providing event notifications. You should only used the returned
 | 
				
			||||||
// Store for Get/List operations; Add/Modify/Deletes will cause the event
 | 
					// Store for Get/List operations; Add/Modify/Deletes will cause the event
 | 
				
			||||||
@@ -360,6 +419,8 @@ func DeletionHandlingObjectToName(obj interface{}) (ObjectName, error) {
 | 
				
			|||||||
//     long as possible (until the upstream source closes the watch or times out,
 | 
					//     long as possible (until the upstream source closes the watch or times out,
 | 
				
			||||||
//     or you stop the controller).
 | 
					//     or you stop the controller).
 | 
				
			||||||
//   - h is the object you want notifications sent to.
 | 
					//   - h is the object you want notifications sent to.
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					// Deprecated: Use NewInformerWithOptions instead.
 | 
				
			||||||
func NewInformer(
 | 
					func NewInformer(
 | 
				
			||||||
	lw ListerWatcher,
 | 
						lw ListerWatcher,
 | 
				
			||||||
	objType runtime.Object,
 | 
						objType runtime.Object,
 | 
				
			||||||
@@ -369,7 +430,13 @@ func NewInformer(
 | 
				
			|||||||
	// This will hold the client state, as we know it.
 | 
						// This will hold the client state, as we know it.
 | 
				
			||||||
	clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
 | 
						clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil)
 | 
						options := InformerOptions{
 | 
				
			||||||
 | 
							ListerWatcher: lw,
 | 
				
			||||||
 | 
							ObjectType:    objType,
 | 
				
			||||||
 | 
							Handler:       h,
 | 
				
			||||||
 | 
							ResyncPeriod:  resyncPeriod,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return clientState, newInformer(clientState, options)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewIndexerInformer returns an Indexer and a Controller for populating the index
 | 
					// NewIndexerInformer returns an Indexer and a Controller for populating the index
 | 
				
			||||||
@@ -387,6 +454,8 @@ func NewInformer(
 | 
				
			|||||||
//     or you stop the controller).
 | 
					//     or you stop the controller).
 | 
				
			||||||
//   - h is the object you want notifications sent to.
 | 
					//   - h is the object you want notifications sent to.
 | 
				
			||||||
//   - indexers is the indexer for the received object type.
 | 
					//   - indexers is the indexer for the received object type.
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					// Deprecated: Use NewInformerWithOptions instead.
 | 
				
			||||||
func NewIndexerInformer(
 | 
					func NewIndexerInformer(
 | 
				
			||||||
	lw ListerWatcher,
 | 
						lw ListerWatcher,
 | 
				
			||||||
	objType runtime.Object,
 | 
						objType runtime.Object,
 | 
				
			||||||
@@ -397,7 +466,14 @@ func NewIndexerInformer(
 | 
				
			|||||||
	// This will hold the client state, as we know it.
 | 
						// This will hold the client state, as we know it.
 | 
				
			||||||
	clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
 | 
						clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil)
 | 
						options := InformerOptions{
 | 
				
			||||||
 | 
							ListerWatcher: lw,
 | 
				
			||||||
 | 
							ObjectType:    objType,
 | 
				
			||||||
 | 
							Handler:       h,
 | 
				
			||||||
 | 
							ResyncPeriod:  resyncPeriod,
 | 
				
			||||||
 | 
							Indexers:      indexers,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return clientState, newInformer(clientState, options)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewTransformingInformer returns a Store and a controller for populating
 | 
					// NewTransformingInformer returns a Store and a controller for populating
 | 
				
			||||||
@@ -407,6 +483,8 @@ func NewIndexerInformer(
 | 
				
			|||||||
// The given transform function will be called on all objects before they will
 | 
					// The given transform function will be called on all objects before they will
 | 
				
			||||||
// put into the Store and corresponding Add/Modify/Delete handlers will
 | 
					// put into the Store and corresponding Add/Modify/Delete handlers will
 | 
				
			||||||
// be invoked for them.
 | 
					// be invoked for them.
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					// Deprecated: Use NewInformerWithOptions instead.
 | 
				
			||||||
func NewTransformingInformer(
 | 
					func NewTransformingInformer(
 | 
				
			||||||
	lw ListerWatcher,
 | 
						lw ListerWatcher,
 | 
				
			||||||
	objType runtime.Object,
 | 
						objType runtime.Object,
 | 
				
			||||||
@@ -417,7 +495,14 @@ func NewTransformingInformer(
 | 
				
			|||||||
	// This will hold the client state, as we know it.
 | 
						// This will hold the client state, as we know it.
 | 
				
			||||||
	clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
 | 
						clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, transformer)
 | 
						options := InformerOptions{
 | 
				
			||||||
 | 
							ListerWatcher: lw,
 | 
				
			||||||
 | 
							ObjectType:    objType,
 | 
				
			||||||
 | 
							Handler:       h,
 | 
				
			||||||
 | 
							ResyncPeriod:  resyncPeriod,
 | 
				
			||||||
 | 
							Transform:     transformer,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return clientState, newInformer(clientState, options)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewTransformingIndexerInformer returns an Indexer and a controller for
 | 
					// NewTransformingIndexerInformer returns an Indexer and a controller for
 | 
				
			||||||
@@ -427,6 +512,8 @@ func NewTransformingInformer(
 | 
				
			|||||||
// The given transform function will be called on all objects before they will
 | 
					// The given transform function will be called on all objects before they will
 | 
				
			||||||
// be put into the Index and corresponding Add/Modify/Delete handlers will
 | 
					// be put into the Index and corresponding Add/Modify/Delete handlers will
 | 
				
			||||||
// be invoked for them.
 | 
					// be invoked for them.
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					// Deprecated: Use NewInformerWithOptions instead.
 | 
				
			||||||
func NewTransformingIndexerInformer(
 | 
					func NewTransformingIndexerInformer(
 | 
				
			||||||
	lw ListerWatcher,
 | 
						lw ListerWatcher,
 | 
				
			||||||
	objType runtime.Object,
 | 
						objType runtime.Object,
 | 
				
			||||||
@@ -438,7 +525,15 @@ func NewTransformingIndexerInformer(
 | 
				
			|||||||
	// This will hold the client state, as we know it.
 | 
						// This will hold the client state, as we know it.
 | 
				
			||||||
	clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
 | 
						clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, transformer)
 | 
						options := InformerOptions{
 | 
				
			||||||
 | 
							ListerWatcher: lw,
 | 
				
			||||||
 | 
							ObjectType:    objType,
 | 
				
			||||||
 | 
							Handler:       h,
 | 
				
			||||||
 | 
							ResyncPeriod:  resyncPeriod,
 | 
				
			||||||
 | 
							Indexers:      indexers,
 | 
				
			||||||
 | 
							Transform:     transformer,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return clientState, newInformer(clientState, options)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Multiplexes updates in the form of a list of Deltas into a Store, and informs
 | 
					// Multiplexes updates in the form of a list of Deltas into a Store, and informs
 | 
				
			||||||
@@ -481,42 +576,29 @@ func processDeltas(
 | 
				
			|||||||
// providing event notifications.
 | 
					// providing event notifications.
 | 
				
			||||||
//
 | 
					//
 | 
				
			||||||
// Parameters
 | 
					// Parameters
 | 
				
			||||||
//   - lw is list and watch functions for the source of the resource you want to
 | 
					 | 
				
			||||||
//     be informed of.
 | 
					 | 
				
			||||||
//   - objType is an object of the type that you expect to receive.
 | 
					 | 
				
			||||||
//   - resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
 | 
					 | 
				
			||||||
//     calls, even if nothing changed). Otherwise, re-list will be delayed as
 | 
					 | 
				
			||||||
//     long as possible (until the upstream source closes the watch or times out,
 | 
					 | 
				
			||||||
//     or you stop the controller).
 | 
					 | 
				
			||||||
//   - h is the object you want notifications sent to.
 | 
					 | 
				
			||||||
//   - clientState is the store you want to populate
 | 
					//   - clientState is the store you want to populate
 | 
				
			||||||
func newInformer(
 | 
					//   - options contain the options to configure the controller
 | 
				
			||||||
	lw ListerWatcher,
 | 
					func newInformer(clientState Store, options InformerOptions) Controller {
 | 
				
			||||||
	objType runtime.Object,
 | 
					 | 
				
			||||||
	resyncPeriod time.Duration,
 | 
					 | 
				
			||||||
	h ResourceEventHandler,
 | 
					 | 
				
			||||||
	clientState Store,
 | 
					 | 
				
			||||||
	transformer TransformFunc,
 | 
					 | 
				
			||||||
) Controller {
 | 
					 | 
				
			||||||
	// This will hold incoming changes. Note how we pass clientState in as a
 | 
						// This will hold incoming changes. Note how we pass clientState in as a
 | 
				
			||||||
	// KeyLister, that way resync operations will result in the correct set
 | 
						// KeyLister, that way resync operations will result in the correct set
 | 
				
			||||||
	// of update/delete deltas.
 | 
						// of update/delete deltas.
 | 
				
			||||||
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
 | 
						fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
 | 
				
			||||||
		KnownObjects:          clientState,
 | 
							KnownObjects:          clientState,
 | 
				
			||||||
		EmitDeltaTypeReplaced: true,
 | 
							EmitDeltaTypeReplaced: true,
 | 
				
			||||||
		Transformer:           transformer,
 | 
							Transformer:           options.Transform,
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	cfg := &Config{
 | 
						cfg := &Config{
 | 
				
			||||||
		Queue:            fifo,
 | 
							Queue:            fifo,
 | 
				
			||||||
		ListerWatcher:    lw,
 | 
							ListerWatcher:    options.ListerWatcher,
 | 
				
			||||||
		ObjectType:       objType,
 | 
							ObjectType:       options.ObjectType,
 | 
				
			||||||
		FullResyncPeriod: resyncPeriod,
 | 
							FullResyncPeriod: options.ResyncPeriod,
 | 
				
			||||||
 | 
							MinWatchTimeout:  options.MinWatchTimeout,
 | 
				
			||||||
		RetryOnError:     false,
 | 
							RetryOnError:     false,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		Process: func(obj interface{}, isInInitialList bool) error {
 | 
							Process: func(obj interface{}, isInInitialList bool) error {
 | 
				
			||||||
			if deltas, ok := obj.(Deltas); ok {
 | 
								if deltas, ok := obj.(Deltas); ok {
 | 
				
			||||||
				return processDeltas(h, clientState, deltas, isInInitialList)
 | 
									return processDeltas(options.Handler, clientState, deltas, isInInitialList)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			return errors.New("object given as Process argument is not Deltas")
 | 
								return errors.New("object given as Process argument is not Deltas")
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -49,6 +49,12 @@ import (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
const defaultExpectedTypeName = "<unspecified>"
 | 
					const defaultExpectedTypeName = "<unspecified>"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var (
 | 
				
			||||||
 | 
						// We try to spread the load on apiserver by setting timeouts for
 | 
				
			||||||
 | 
						// watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout].
 | 
				
			||||||
 | 
						defaultMinWatchTimeout = 5 * time.Minute
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
 | 
					// Reflector watches a specified resource and causes all changes to be reflected in the given store.
 | 
				
			||||||
type Reflector struct {
 | 
					type Reflector struct {
 | 
				
			||||||
	// name identifies this reflector. By default it will be a file:line if possible.
 | 
						// name identifies this reflector. By default it will be a file:line if possible.
 | 
				
			||||||
@@ -72,6 +78,8 @@ type Reflector struct {
 | 
				
			|||||||
	// backoff manages backoff of ListWatch
 | 
						// backoff manages backoff of ListWatch
 | 
				
			||||||
	backoffManager wait.BackoffManager
 | 
						backoffManager wait.BackoffManager
 | 
				
			||||||
	resyncPeriod   time.Duration
 | 
						resyncPeriod   time.Duration
 | 
				
			||||||
 | 
						// minWatchTimeout defines the minimum timeout for watch requests.
 | 
				
			||||||
 | 
						minWatchTimeout time.Duration
 | 
				
			||||||
	// clock allows tests to manipulate time
 | 
						// clock allows tests to manipulate time
 | 
				
			||||||
	clock clock.Clock
 | 
						clock clock.Clock
 | 
				
			||||||
	// paginatedResult defines whether pagination should be forced for list calls.
 | 
						// paginatedResult defines whether pagination should be forced for list calls.
 | 
				
			||||||
@@ -151,12 +159,6 @@ func DefaultWatchErrorHandler(r *Reflector, err error) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
var (
 | 
					 | 
				
			||||||
	// We try to spread the load on apiserver by setting timeouts for
 | 
					 | 
				
			||||||
	// watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout].
 | 
					 | 
				
			||||||
	minWatchTimeout = 5 * time.Minute
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
 | 
					// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
 | 
				
			||||||
// The indexer is configured to key on namespace
 | 
					// The indexer is configured to key on namespace
 | 
				
			||||||
func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
 | 
					func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
 | 
				
			||||||
@@ -194,6 +196,10 @@ type ReflectorOptions struct {
 | 
				
			|||||||
	// (do not resync).
 | 
						// (do not resync).
 | 
				
			||||||
	ResyncPeriod time.Duration
 | 
						ResyncPeriod time.Duration
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// MinWatchTimeout, if non-zero, defines the minimum timeout for watch requests send to kube-apiserver.
 | 
				
			||||||
 | 
						// However, values lower than 5m will not be honored to avoid negative performance impact on controlplane.
 | 
				
			||||||
 | 
						MinWatchTimeout time.Duration
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Clock allows tests to control time. If unset defaults to clock.RealClock{}
 | 
						// Clock allows tests to control time. If unset defaults to clock.RealClock{}
 | 
				
			||||||
	Clock clock.Clock
 | 
						Clock clock.Clock
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -213,9 +219,14 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store S
 | 
				
			|||||||
	if reflectorClock == nil {
 | 
						if reflectorClock == nil {
 | 
				
			||||||
		reflectorClock = clock.RealClock{}
 | 
							reflectorClock = clock.RealClock{}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						minWatchTimeout := defaultMinWatchTimeout
 | 
				
			||||||
 | 
						if options.MinWatchTimeout > defaultMinWatchTimeout {
 | 
				
			||||||
 | 
							minWatchTimeout = options.MinWatchTimeout
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	r := &Reflector{
 | 
						r := &Reflector{
 | 
				
			||||||
		name:            options.Name,
 | 
							name:            options.Name,
 | 
				
			||||||
		resyncPeriod:    options.ResyncPeriod,
 | 
							resyncPeriod:    options.ResyncPeriod,
 | 
				
			||||||
 | 
							minWatchTimeout: minWatchTimeout,
 | 
				
			||||||
		typeDescription: options.TypeDescription,
 | 
							typeDescription: options.TypeDescription,
 | 
				
			||||||
		listerWatcher:   lw,
 | 
							listerWatcher:   lw,
 | 
				
			||||||
		store:           store,
 | 
							store:           store,
 | 
				
			||||||
@@ -415,7 +426,7 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc
 | 
				
			|||||||
		start := r.clock.Now()
 | 
							start := r.clock.Now()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if w == nil {
 | 
							if w == nil {
 | 
				
			||||||
			timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
 | 
								timeoutSeconds := int64(r.minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
 | 
				
			||||||
			options := metav1.ListOptions{
 | 
								options := metav1.ListOptions{
 | 
				
			||||||
				ResourceVersion: r.LastSyncResourceVersion(),
 | 
									ResourceVersion: r.LastSyncResourceVersion(),
 | 
				
			||||||
				// We want to avoid situations of hanging watchers. Stop any watchers that do not
 | 
									// We want to avoid situations of hanging watchers. Stop any watchers that do not
 | 
				
			||||||
@@ -642,7 +653,7 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
 | 
				
			|||||||
		// TODO(#115478): large "list", slow clients, slow network, p&f
 | 
							// TODO(#115478): large "list", slow clients, slow network, p&f
 | 
				
			||||||
		//  might slow down streaming and eventually fail.
 | 
							//  might slow down streaming and eventually fail.
 | 
				
			||||||
		//  maybe in such a case we should retry with an increased timeout?
 | 
							//  maybe in such a case we should retry with an increased timeout?
 | 
				
			||||||
		timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
 | 
							timeoutSeconds := int64(r.minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
 | 
				
			||||||
		options := metav1.ListOptions{
 | 
							options := metav1.ListOptions{
 | 
				
			||||||
			ResourceVersion:      lastKnownRV,
 | 
								ResourceVersion:      lastKnownRV,
 | 
				
			||||||
			AllowWatchBookmarks:  true,
 | 
								AllowWatchBookmarks:  true,
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1091,6 +1091,67 @@ func TestGetExpectedGVKFromObject(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestWatchTimeout(t *testing.T) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						testCases := []struct {
 | 
				
			||||||
 | 
							name                      string
 | 
				
			||||||
 | 
							minWatchTimeout           time.Duration
 | 
				
			||||||
 | 
							expectedMinTimeoutSeconds int64
 | 
				
			||||||
 | 
						}{
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:                      "no timeout",
 | 
				
			||||||
 | 
								expectedMinTimeoutSeconds: 5 * 60,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:                      "small timeout not honored",
 | 
				
			||||||
 | 
								minWatchTimeout:           time.Second,
 | 
				
			||||||
 | 
								expectedMinTimeoutSeconds: 5 * 60,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:                      "30m timeout",
 | 
				
			||||||
 | 
								minWatchTimeout:           30 * time.Minute,
 | 
				
			||||||
 | 
								expectedMinTimeoutSeconds: 30 * 60,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for _, tc := range testCases {
 | 
				
			||||||
 | 
							t.Run(tc.name, func(t *testing.T) {
 | 
				
			||||||
 | 
								stopCh := make(chan struct{})
 | 
				
			||||||
 | 
								s := NewStore(MetaNamespaceKeyFunc)
 | 
				
			||||||
 | 
								var gotTimeoutSeconds int64
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								lw := &testLW{
 | 
				
			||||||
 | 
									ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
 | 
				
			||||||
 | 
										return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}}, nil
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
									WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
 | 
				
			||||||
 | 
										if options.TimeoutSeconds != nil {
 | 
				
			||||||
 | 
											gotTimeoutSeconds = *options.TimeoutSeconds
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
										// Stop once the reflector begins watching since we're only interested in the list.
 | 
				
			||||||
 | 
										close(stopCh)
 | 
				
			||||||
 | 
										return watch.NewFake(), nil
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								opts := ReflectorOptions{
 | 
				
			||||||
 | 
									MinWatchTimeout: tc.minWatchTimeout,
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								r := NewReflectorWithOptions(lw, &v1.Pod{}, s, opts)
 | 
				
			||||||
 | 
								if err := r.ListAndWatch(stopCh); err != nil {
 | 
				
			||||||
 | 
									t.Fatal(err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								minExpected := tc.expectedMinTimeoutSeconds
 | 
				
			||||||
 | 
								maxExpected := 2 * tc.expectedMinTimeoutSeconds
 | 
				
			||||||
 | 
								if gotTimeoutSeconds < minExpected || gotTimeoutSeconds > maxExpected {
 | 
				
			||||||
 | 
									t.Errorf("unexpected TimeoutSecond, got %v, expected in [%v, %v]", gotTimeoutSeconds, minExpected, maxExpected)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type storeWithRV struct {
 | 
					type storeWithRV struct {
 | 
				
			||||||
	Store
 | 
						Store
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user