Updated comments on internal abstractions in client-go/tools/cache
The comments on Store and Queue and the FIFOs ceased being accurate long ago.
@@ -26,7 +26,7 @@ import (
 	"k8s.io/klog"
 )
 
-// NewDeltaFIFO returns a Store which can be used process changes to items.
+// NewDeltaFIFO returns a Queue which can be used to process changes to items.
 //
 // keyFunc is used to figure out what key an object should have. (It's
 // exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.)
@@ -67,7 +67,9 @@ func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
 	return f
 }
 
-// DeltaFIFO is like FIFO, but allows you to process deletes.
+// DeltaFIFO is like FIFO, but allows the PopProcessFunc to process
+// deletes.  The accumulator associated with a given object's key is a
+// slice of Delta values for that object.
 //
 // DeltaFIFO is a producer-consumer queue, where a Reflector is
 // intended to be the producer, and the consumer is whatever calls
@@ -77,22 +79,25 @@ func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
 //  * You want to process every object change (delta) at most once.
 //  * When you process an object, you want to see everything
 //    that's happened to it since you last processed it.
-//  * You want to process the deletion of objects.
+//  * You want to process the deletion of some of the objects.
 //  * You might want to periodically reprocess objects.
 //
 // DeltaFIFO's Pop(), Get(), and GetByKey() methods return
 // interface{} to satisfy the Store/Queue interfaces, but it
 // will always return an object of type Deltas.
 //
+// A DeltaFIFO's knownObjects KeyListerGetter provides get/list access
+// to a set of "known objects" that is used for two purposes.  One is
+// to conditionalize delete operations: it is only for a known object
+// that a Delete Delta is recorded (this applies to both Delete and
+// Replace).  The deleted object will be included in the
+// DeleteFinalStateUnknown markers, and those objects could be stale.
+// The other purpose is in the Resync operation, which adds a Sync
+// Delta for every known object.
+//
 // A note on threading: If you call Pop() in parallel from multiple
 // threads, you could end up with multiple threads processing slightly
 // different versions of the same object.
-//
-// A note on the KeyLister used by the DeltaFIFO: It's main purpose is
-// to list keys that are "known", for the purpose of figuring out which
-// items have been deleted when Replace() or Delete() are called. The deleted
-// object will be included in the DeleteFinalStateUnknown markers. These objects
-// could be stale.
 type DeltaFIFO struct {
 	// lock/cond protects access to 'items' and 'queue'.
 	lock sync.RWMutex
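
To make the knownObjects semantics above concrete, here is a minimal consumer sketch (not part of this commit). It assumes MetaNamespaceKeyFunc, a hand-built Pod, and a nil knownObjects, so no DeletedFinalStateUnknown markers are produced and Resync has nothing to enqueue; a real informer passes its Indexer as knownObjects instead.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// nil knownObjects: deletes are conditionalized only on the FIFO's own
	// contents, and Resync is a no-op.
	fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil)

	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "example"}}
	fifo.Add(pod)
	fifo.Update(pod)
	fifo.Delete(pod)

	// Pop hands the consumer the accumulated Deltas slice for one key.
	fifo.Pop(func(obj interface{}) error {
		for _, d := range obj.(cache.Deltas) {
			key, _ := fifo.KeyOf(d.Object)
			fmt.Println(d.Type, key) // Added, Updated, Deleted for default/example
		}
		return nil
	})
}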
@@ -187,7 +192,7 @@ func (f *DeltaFIFO) Update(obj interface{}) error {
 
 // Delete is just like Add, but makes an Deleted Delta. If the item does not
 // already exist, it will be ignored. (It may have already been deleted by a
-// Replace (re-list), for example.
+// Replace (re-list), for example.)
 func (f *DeltaFIFO) Delete(obj interface{}) error {
 	id, err := f.KeyOf(obj)
 	if err != nil {
@@ -313,6 +318,9 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
 		f.items[id] = newDeltas
 		f.cond.Broadcast()
 	} else {
+		// This never happens, because dedupDeltas never returns an empty list
+		// when given a non-empty list (as it is here).
+		// But if somehow it ever does return an empty list, then
 		// We need to remove this from our map (extra items in the queue are
 		// ignored if they are not in the map).
 		delete(f.items, id)

56  staging/src/k8s.io/client-go/tools/cache/fifo.go (vendored)
@@ -24,7 +24,7 @@ import (
 )
 
 // PopProcessFunc is passed to Pop() method of Queue interface.
-// It is supposed to process the element popped from the queue.
+// It is supposed to process the accumulator popped from the queue.
 type PopProcessFunc func(interface{}) error
 
 // ErrRequeue may be returned by a PopProcessFunc to safely requeue
@@ -44,26 +44,38 @@ func (e ErrRequeue) Error() string {
 	return e.Err.Error()
 }
 
-// Queue is exactly like a Store, but has a Pop() method too.
+// Queue extends Store with a collection of keys to "process".
+// Every Add, Update, or Delete may put the object's key in that collection.
+// A Queue has a way to derive the corresponding key given an accumulator.
+// A Queue can be accessed concurrently from multiple goroutines.
+// A Queue can be "closed", after which Pop operations return an error.
 type Queue interface {
 	Store
 
-	// Pop blocks until it has something to process.
-	// It returns the object that was process and the result of processing.
-	// The PopProcessFunc may return an ErrRequeue{...} to indicate the item
-	// should be requeued before releasing the lock on the queue.
+	// Pop blocks until there is at least one key to process or the
+	// Queue is closed.  In the latter case Pop returns with an error.
+	// In the former case Pop atomically picks one key to process,
+	// removes that (key, accumulator) association from the Store, and
+	// processes the accumulator.  Pop returns the accumulator that
+	// was processed and the result of processing.  The PopProcessFunc
+	// may return an ErrRequeue{inner} and in this case Pop will (a)
+	// return that (key, accumulator) association to the Queue as part
+	// of the atomic processing and (b) return the inner error from
+	// Pop.
 	Pop(PopProcessFunc) (interface{}, error)
 
-	// AddIfNotPresent adds a value previously
-	// returned by Pop back into the queue as long
-	// as nothing else (presumably more recent)
-	// has since been added.
+	// AddIfNotPresent puts the given accumulator into the Queue (in
+	// association with the accumulator's key) if and only if that key
+	// is not already associated with a non-empty accumulator.
 	AddIfNotPresent(interface{}) error
 
-	// HasSynced returns true if the first batch of items has been popped
+	// HasSynced returns true if the first batch of keys have all been
+	// popped.  The first batch of keys are those of the first Replace
+	// operation if that happened before any Add, Update, or Delete;
+	// otherwise the first batch is empty.
 	HasSynced() bool
 
-	// Close queue
+	// Close the queue
 	Close()
 }
 
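
A small sketch of the Pop/ErrRequeue contract described above, using a FIFO as the Queue (the key function, string item, and failure logic are assumptions chosen for brevity, not part of this commit):

package main

import (
	"errors"
	"fmt"

	"k8s.io/client-go/tools/cache"
)

func main() {
	// Each string item is its own key in this sketch.
	q := cache.NewFIFO(func(obj interface{}) (string, error) { return obj.(string), nil })
	q.Add("alpha")

	tries := 0
	process := func(obj interface{}) error {
		tries++
		if tries == 1 {
			// Pop returns the (key, accumulator) association to the queue
			// and surfaces only the inner error.
			return cache.ErrRequeue{Err: errors.New("transient failure")}
		}
		fmt.Println("processed", obj, "on try", tries)
		return nil
	}

	if _, err := q.Pop(process); err != nil {
		fmt.Println("pop error:", err) // "transient failure"; "alpha" is back in the queue
	}
	q.Pop(process) // succeeds on the second attempt
}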
@@ -79,11 +91,16 @@ func Pop(queue Queue) interface{} {
 	return result
 }
 
-// FIFO receives adds and updates from a Reflector, and puts them in a queue for
-// FIFO order processing. If multiple adds/updates of a single item happen while
-// an item is in the queue before it has been processed, it will only be
-// processed once, and when it is processed, the most recent version will be
-// processed. This can't be done with a channel.
+// FIFO is a Queue in which (a) each accumulator is simply the most
+// recently provided object and (b) the collection of keys to process
+// is a FIFO.  The accumulators all start out empty, and deleting an
+// object from its accumulator empties the accumulator.  The Resync
+// operation is a no-op.
+//
+// Thus: if multiple adds/updates of a single object happen while that
+// object's key is in the queue before it has been processed then it
+// will only be processed once, and when it is processed the most
+// recent version will be processed. This can't be done with a channel
 //
 // FIFO solves this use case:
 //  * You want to process every object (exactly) once.
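
The collapsing behavior the rewritten comment describes fits in a few lines; the constant key function and string items below are assumptions of the example, not part of the commit:

package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

func main() {
	// Every item maps to the same key, so each update overwrites one accumulator.
	q := cache.NewFIFO(func(obj interface{}) (string, error) { return "the-key", nil })

	q.Add("version-1")
	q.Update("version-2") // replaces the accumulator; no second queue entry

	item, _ := q.Pop(func(obj interface{}) error { return nil })
	fmt.Println(item) // version-2; the queue is now empty
}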
@@ -94,7 +111,7 @@ func Pop(queue Queue) interface{} {
 type FIFO struct {
 	lock sync.RWMutex
 	cond sync.Cond
-	// We depend on the property that items in the set are in the queue and vice versa.
+	// We depend on the property that every key in `items` is also in `queue`
 	items map[string]interface{}
 	queue []string
 
@@ -326,7 +343,8 @@ func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
 	return nil
 }
 
-// Resync will touch all objects to put them into the processing queue
+// Resync will ensure that every object in the Store has its key in the queue.
+// This should be a no-op, because that property is maintained by all operations.
 func (f *FIFO) Resync() error {
 	f.lock.Lock()
 	defer f.lock.Unlock()
@@ -36,9 +36,12 @@ func init() {
 	mutationDetectionEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_CACHE_MUTATION_DETECTOR"))
 }
 
-// MutationDetector is able to monitor if the object be modified outside.
+// MutationDetector is able to monitor objects for mutation within a limited window of time
 type MutationDetector interface {
+	// AddObject adds the given object to the set being monitored for a while from now
 	AddObject(obj interface{})
+
+	// Run starts the monitoring and does not return until the monitoring is stopped.
 	Run(stopCh <-chan struct{})
 }
 
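
For context, a sketch of how this interface is typically driven (the names and timings are illustrative assumptions; the detector only does real work when the KUBE_CACHE_MUTATION_DETECTOR environment variable mentioned above is set to true):

package main

import (
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	detector := cache.NewCacheMutationDetector("example-informer")

	stopCh := make(chan struct{})
	// When detection is enabled, Run compares cached objects against their
	// snapshots periodically until stopCh is closed; otherwise it is a no-op.
	go detector.Run(stopCh)

	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "cached"}}
	detector.AddObject(pod) // snapshot the object as it enters the cache

	// Mutating pod from here on would be reported on a later comparison.
	time.Sleep(2 * time.Second)
	close(stopCh)
}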
@@ -159,21 +159,21 @@ type SharedIndexInformer interface {
 }
 
 // NewSharedInformer creates a new instance for the listwatcher.
-func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
-	return NewSharedIndexInformer(lw, objType, resyncPeriod, Indexers{})
+func NewSharedInformer(lw ListerWatcher, objOfWatchedType runtime.Object, resyncPeriod time.Duration) SharedInformer {
+	return NewSharedIndexInformer(lw, objOfWatchedType, resyncPeriod, Indexers{})
 }
 
 // NewSharedIndexInformer creates a new instance for the listwatcher.
-func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
+func NewSharedIndexInformer(lw ListerWatcher, objOfWatchedType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
 	realClock := &clock.RealClock{}
 	sharedIndexInformer := &sharedIndexInformer{
 		processor:                       &sharedProcessor{clock: realClock},
 		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
 		listerWatcher:                   lw,
-		objectType:                      objType,
+		objectType:                      objOfWatchedType,
 		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
 		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
-		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
+		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objOfWatchedType)),
 		clock:                           realClock,
 	}
 	return sharedIndexInformer
@@ -235,7 +235,6 @@ type sharedIndexInformer struct {
 	processor             *sharedProcessor
 	cacheMutationDetector MutationDetector
 
-	// This block is tracked to handle late initialization of the controller
 	listerWatcher ListerWatcher
 	objectType    runtime.Object
 
@@ -23,27 +23,52 @@ import (
 	"k8s.io/apimachinery/pkg/api/meta"
 )
 
-// Store is a generic object storage interface. Reflector knows how to watch a server
-// and update a store. A generic store is provided, which allows Reflector to be used
-// as a local caching system, and an LRU store, which allows Reflector to work like a
-// queue of items yet to be processed.
+// Store is a generic object storage and processing interface.  A
+// Store holds a map from string keys to accumulators, and has
+// operations to add, update, and delete a given object to/from the
+// accumulator currently associated with a given key.  A Store also
+// knows how to extract the key from a given object, so many operations
+// are given only the object.
 //
-// Store makes no assumptions about stored object identity; it is the responsibility
-// of a Store implementation to provide a mechanism to correctly key objects and to
-// define the contract for obtaining objects by some arbitrary key type.
+// In the simplest Store implementations each accumulator is simply
+// the last given object and thus the Store's behavior is simple
+// storage.
+//
+// Reflector knows how to watch a server and update a Store.  This
+// package provides a variety of implementations of Store.
 type Store interface {
+
+	// Add adds the given object to the accumulator associated with the given object's key
 	Add(obj interface{}) error
+
+	// Update updates the given object in the accumulator associated with the given object's key
 	Update(obj interface{}) error
+
+	// Delete deletes the given object from the accumulator associated with the given object's key
 	Delete(obj interface{}) error
+
+	// List returns a list of all the currently non-empty accumulators
 	List() []interface{}
+
+	// ListKeys returns a list of all the keys currently associated with non-empty accumulators
 	ListKeys() []string
+
+	// Get returns the accumulator associated with the given object's key
 	Get(obj interface{}) (item interface{}, exists bool, err error)
+
+	// GetByKey returns the accumulator associated with the given key
 	GetByKey(key string) (item interface{}, exists bool, err error)
 
 	// Replace will delete the contents of the store, using instead the
 	// given list. Store takes ownership of the list, you should not reference
 	// it after calling this function.
 	Replace([]interface{}, string) error
+
+	// Resync is meaningless in the terms appearing here but has
+	// meaning in some implementations that have non-trivial
+	// additional behavior.  In general the idea is to tee up the
+	// current non-empty accumulators or their keys for
+	// reconsideration (whatever that means).
 	Resync() error
 }
 
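
A sketch of the "simplest Store implementations" case the new comment describes, where each accumulator is just the last object given for its key (the Pod objects and MetaNamespaceKeyFunc are assumptions of the example):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	store := cache.NewStore(cache.MetaNamespaceKeyFunc) // key is "namespace/name"

	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "web"}}
	store.Add(pod)

	updated := pod.DeepCopy()
	updated.Labels = map[string]string{"tier": "frontend"}
	store.Update(updated) // replaces the accumulator for "default/web"

	if obj, exists, _ := store.GetByKey("default/web"); exists {
		fmt.Println(obj.(*v1.Pod).Labels) // map[tier:frontend]
	}
	fmt.Println(store.ListKeys()) // [default/web]
}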
@@ -131,8 +131,8 @@ func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion st
 	}
 }
 
-// Index returns a list of items that match on the index function
-// Index is thread-safe so long as you treat all items as immutable
+// Index returns a list of items that match the given object on the index function.
+// Index is thread-safe so long as you treat all items as immutable.
 func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
 	c.lock.RLock()
 	defer c.lock.RUnlock()
@@ -142,37 +142,37 @@ func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{},
 		return nil, fmt.Errorf("Index with name %s does not exist", indexName)
 	}
 
-	indexKeys, err := indexFunc(obj)
+	indexedValues, err := indexFunc(obj)
 	if err != nil {
 		return nil, err
 	}
 	index := c.indices[indexName]
 
-	var returnKeySet sets.String
-	if len(indexKeys) == 1 {
+	var storeKeySet sets.String
+	if len(indexedValues) == 1 {
 		// In majority of cases, there is exactly one value matching.
 		// Optimize the most common path - deduping is not needed here.
-		returnKeySet = index[indexKeys[0]]
+		storeKeySet = index[indexedValues[0]]
 	} else {
 		// Need to de-dupe the return list.
 		// Since multiple keys are allowed, this can happen.
-		returnKeySet = sets.String{}
-		for _, indexKey := range indexKeys {
-			for key := range index[indexKey] {
-				returnKeySet.Insert(key)
+		storeKeySet = sets.String{}
+		for _, indexedValue := range indexedValues {
+			for key := range index[indexedValue] {
+				storeKeySet.Insert(key)
 			}
 		}
 	}
 
-	list := make([]interface{}, 0, returnKeySet.Len())
-	for absoluteKey := range returnKeySet {
-		list = append(list, c.items[absoluteKey])
+	list := make([]interface{}, 0, storeKeySet.Len())
+	for storeKey := range storeKeySet {
+		list = append(list, c.items[storeKey])
 	}
 	return list, nil
 }
 
-// ByIndex returns a list of items that match an exact value on the index function
-func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) {
+// ByIndex returns a list of the items whose indexed values in the given index include the given indexed value
+func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
 	c.lock.RLock()
 	defer c.lock.RUnlock()
 
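
A sketch of the distinction these renames draw between indexed values (what an IndexFunc returns) and Store keys (what identifies items in the map); the "app"-label IndexFunc and Pods are assumptions of the example:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// byApp maps each object to its indexed values under the "app" index;
	// a single object may yield several indexed values.
	byApp := func(obj interface{}) ([]string, error) {
		return []string{obj.(*v1.Pod).Labels["app"]}, nil
	}
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"app": byApp})

	indexer.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "web-1", Labels: map[string]string{"app": "web"}}})
	indexer.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "web-2", Labels: map[string]string{"app": "web"}}})

	// ByIndex takes an indexed value and returns the matching items;
	// IndexKeys returns their Store keys instead.
	items, _ := indexer.ByIndex("app", "web")
	keys, _ := indexer.IndexKeys("app", "web")
	fmt.Println(len(items), keys) // 2 [default/web-1 default/web-2]
}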
@@ -183,7 +183,7 @@ func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, erro
 
 	index := c.indices[indexName]
 
-	set := index[indexKey]
+	set := index[indexedValue]
 	list := make([]interface{}, 0, set.Len())
 	for key := range set {
 		list = append(list, c.items[key])
@@ -192,9 +192,9 @@ func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, erro
 	return list, nil
 }
 
-// IndexKeys returns a list of keys that match on the index function.
+// IndexKeys returns a list of the Store keys of the objects whose indexed values in the given index include the given indexed value.
 // IndexKeys is thread-safe so long as you treat all items as immutable.
-func (c *threadSafeMap) IndexKeys(indexName, indexKey string) ([]string, error) {
+func (c *threadSafeMap) IndexKeys(indexName, indexedValue string) ([]string, error) {
 	c.lock.RLock()
 	defer c.lock.RUnlock()
 
@@ -205,7 +205,7 @@ func (c *threadSafeMap) IndexKeys(indexName, indexKey string) ([]string, error)
 
 	index := c.indices[indexName]
 
-	set := index[indexKey]
+	set := index[indexedValue]
 	return set.List(), nil
 }
 