Revised comments about f.knownObjects and added tests for Replace
This commit is contained in:
		@@ -68,8 +68,14 @@ func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// DeltaFIFO is like FIFO, but allows the PopProcessFunc to process
 | 
			
		||||
// deletes.  The accumulator associated with a given object's key is a
 | 
			
		||||
// slice of Delta values for that object.
 | 
			
		||||
// deletes and adds Sync to the ways an object can be applied to an
 | 
			
		||||
// acumulator.  The accumulator associated with a given object's key
 | 
			
		||||
// is a Deltas, which is a slice of Delta values for that object.
 | 
			
		||||
// Applying an object to a Deltas means to append a Delta except when
 | 
			
		||||
// the potentially appended Delta is a Delete and the Deltas already
 | 
			
		||||
// ends with a Delete.  In that case the Deltas does not grow,
 | 
			
		||||
// although the terminal Delete will be replaced by the new Delete if
 | 
			
		||||
// the older Delete's object is a DeletedFinalStateUnknown.
 | 
			
		||||
//
 | 
			
		||||
// DeltaFIFO is a producer-consumer queue, where a Reflector is
 | 
			
		||||
// intended to be the producer, and the consumer is whatever calls
 | 
			
		||||
@@ -83,17 +89,14 @@ func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
 | 
			
		||||
//  * You might want to periodically reprocess objects.
 | 
			
		||||
//
 | 
			
		||||
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
 | 
			
		||||
// interface{} to satisfy the Store/Queue interfaces, but it
 | 
			
		||||
// interface{} to satisfy the Store/Queue interfaces, but they
 | 
			
		||||
// will always return an object of type Deltas.
 | 
			
		||||
//
 | 
			
		||||
// A DeltaFIFO's knownObjects KeyListerGetter provides get/list access
 | 
			
		||||
// to a set of "known objects" that is used for two purposes.  One is
 | 
			
		||||
// to conditionalize delete operations: it is only for a known object
 | 
			
		||||
// that a Delete Delta is recorded (this applies to both Delete and
 | 
			
		||||
// Replace).  The deleted object will be included in the
 | 
			
		||||
// DeleteFinalStateUnknown markers, and those objects could be stale.
 | 
			
		||||
// The other purpose is in the Resync operation, which adds a Sync
 | 
			
		||||
// Delta for every known object.
 | 
			
		||||
// A DeltaFIFO's knownObjects KeyListerGetter provides the abilities
 | 
			
		||||
// to list Store keys and to get objects by Store key.  The objects in
 | 
			
		||||
// question are called "known objects" and this set of objects
 | 
			
		||||
// modifies the behavior of the Delete, Replace, and Resync methods
 | 
			
		||||
// (each in a different way).
 | 
			
		||||
//
 | 
			
		||||
// A note on threading: If you call Pop() in parallel from multiple
 | 
			
		||||
// threads, you could end up with multiple threads processing slightly
 | 
			
		||||
@@ -119,9 +122,8 @@ type DeltaFIFO struct {
 | 
			
		||||
	// insertion and retrieval, and should be deterministic.
 | 
			
		||||
	keyFunc KeyFunc
 | 
			
		||||
 | 
			
		||||
	// knownObjects list keys that are "known", for the
 | 
			
		||||
	// purpose of figuring out which items have been deleted
 | 
			
		||||
	// when Replace() or Delete() is called.
 | 
			
		||||
	// knownObjects list keys that are "known" --- affecting Delete(),
 | 
			
		||||
	// Replace(), and Resync()
 | 
			
		||||
	knownObjects KeyListerGetter
 | 
			
		||||
 | 
			
		||||
	// Indication the queue is closed.
 | 
			
		||||
@@ -190,9 +192,11 @@ func (f *DeltaFIFO) Update(obj interface{}) error {
 | 
			
		||||
	return f.queueActionLocked(Updated, obj)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Delete is just like Add, but makes an Deleted Delta. If the item does not
 | 
			
		||||
// already exist, it will be ignored. (It may have already been deleted by a
 | 
			
		||||
// Replace (re-list), for example.)
 | 
			
		||||
// Delete is just like Add, but makes a Deleted Delta. If the given
 | 
			
		||||
// object does not already exist, it will be ignored. (It may have
 | 
			
		||||
// already been deleted by a Replace (re-list), for example.)  In this
 | 
			
		||||
// method `f.knownObjects`, if not nil, provides (via GetByKey)
 | 
			
		||||
// _additional_ objects that are considered to already exist.
 | 
			
		||||
func (f *DeltaFIFO) Delete(obj interface{}) error {
 | 
			
		||||
	id, err := f.KeyOf(obj)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -438,10 +442,15 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Replace will delete the contents of 'f', using instead the given map.
 | 
			
		||||
// 'f' takes ownership of the map, you should not reference the map again
 | 
			
		||||
// after calling this function. f's queue is reset, too; upon return, it
 | 
			
		||||
// will contain the items in the map, in no particular order.
 | 
			
		||||
// Replace atomically adds the given objects using the Sync type of
 | 
			
		||||
// Delta and does some deletions.  In particular: for every
 | 
			
		||||
// pre-existing key K that is not the key of an object in `list` there
 | 
			
		||||
// is the effect of `Delete(DeletedFinalStateUnknown{K, O})` where O
 | 
			
		||||
// is current object of K.  If `f.knownObjects == nil` then the
 | 
			
		||||
// pre-existing keys are those in `f.items` and the current object of
 | 
			
		||||
// K is the `.Newest()` of the Deltas associated with K.  Otherwise
 | 
			
		||||
// the pre-existing keys are those listed by `f.knownObjects` and the
 | 
			
		||||
// current object of K is what `f.knownObjects.GetByKey(K)` returns.
 | 
			
		||||
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
 | 
			
		||||
	f.lock.Lock()
 | 
			
		||||
	defer f.lock.Unlock()
 | 
			
		||||
@@ -515,7 +524,9 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Resync will send a sync event for each item
 | 
			
		||||
// Resync adds, with a Sync type of Delta, every object listed by
 | 
			
		||||
// `f.knownObjects` whose key is not already queued for processing.
 | 
			
		||||
// If `f.knownObjects` is `nil` then Resync does nothing.
 | 
			
		||||
func (f *DeltaFIFO) Resync() error {
 | 
			
		||||
	f.lock.Lock()
 | 
			
		||||
	defer f.lock.Unlock()
 | 
			
		||||
 
 | 
			
		||||
@@ -28,11 +28,15 @@ func testPop(f *DeltaFIFO) testFifoObject {
 | 
			
		||||
	return Pop(f).(Deltas).Newest().Object.(testFifoObject)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// keyLookupFunc adapts a raw function to be a KeyLookup.
 | 
			
		||||
type keyLookupFunc func() []testFifoObject
 | 
			
		||||
// literalListerGetter is a KeyListerGetter that is based on a
 | 
			
		||||
// function that returns a slice of objects to list and get.
 | 
			
		||||
// The function must list the same objects every time.
 | 
			
		||||
type literalListerGetter func() []testFifoObject
 | 
			
		||||
 | 
			
		||||
var _ KeyListerGetter = literalListerGetter(nil)
 | 
			
		||||
 | 
			
		||||
// ListKeys just calls kl.
 | 
			
		||||
func (kl keyLookupFunc) ListKeys() []string {
 | 
			
		||||
func (kl literalListerGetter) ListKeys() []string {
 | 
			
		||||
	result := []string{}
 | 
			
		||||
	for _, fifoObj := range kl() {
 | 
			
		||||
		result = append(result, fifoObj.name)
 | 
			
		||||
@@ -41,7 +45,7 @@ func (kl keyLookupFunc) ListKeys() []string {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetByKey returns the key if it exists in the list returned by kl.
 | 
			
		||||
func (kl keyLookupFunc) GetByKey(key string) (interface{}, bool, error) {
 | 
			
		||||
func (kl literalListerGetter) GetByKey(key string) (interface{}, bool, error) {
 | 
			
		||||
	for _, v := range kl() {
 | 
			
		||||
		if v.name == key {
 | 
			
		||||
			return v, true, nil
 | 
			
		||||
@@ -95,7 +99,7 @@ func TestDeltaFIFO_replaceWithDeleteDeltaIn(t *testing.T) {
 | 
			
		||||
	oldObj := mkFifoObj("foo", 1)
 | 
			
		||||
	newObj := mkFifoObj("foo", 2)
 | 
			
		||||
 | 
			
		||||
	f := NewDeltaFIFO(testFifoObjectKeyFunc, keyLookupFunc(func() []testFifoObject {
 | 
			
		||||
	f := NewDeltaFIFO(testFifoObjectKeyFunc, literalListerGetter(func() []testFifoObject {
 | 
			
		||||
		return []testFifoObject{oldObj}
 | 
			
		||||
	}))
 | 
			
		||||
 | 
			
		||||
@@ -218,7 +222,7 @@ func TestDeltaFIFO_enqueueingNoLister(t *testing.T) {
 | 
			
		||||
func TestDeltaFIFO_enqueueingWithLister(t *testing.T) {
 | 
			
		||||
	f := NewDeltaFIFO(
 | 
			
		||||
		testFifoObjectKeyFunc,
 | 
			
		||||
		keyLookupFunc(func() []testFifoObject {
 | 
			
		||||
		literalListerGetter(func() []testFifoObject {
 | 
			
		||||
			return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
 | 
			
		||||
		}),
 | 
			
		||||
	)
 | 
			
		||||
@@ -268,7 +272,7 @@ func TestDeltaFIFO_addReplace(t *testing.T) {
 | 
			
		||||
func TestDeltaFIFO_ResyncNonExisting(t *testing.T) {
 | 
			
		||||
	f := NewDeltaFIFO(
 | 
			
		||||
		testFifoObjectKeyFunc,
 | 
			
		||||
		keyLookupFunc(func() []testFifoObject {
 | 
			
		||||
		literalListerGetter(func() []testFifoObject {
 | 
			
		||||
			return []testFifoObject{mkFifoObj("foo", 5)}
 | 
			
		||||
		}),
 | 
			
		||||
	)
 | 
			
		||||
@@ -287,7 +291,7 @@ func TestDeltaFIFO_ResyncNonExisting(t *testing.T) {
 | 
			
		||||
func TestDeltaFIFO_DeleteExistingNonPropagated(t *testing.T) {
 | 
			
		||||
	f := NewDeltaFIFO(
 | 
			
		||||
		testFifoObjectKeyFunc,
 | 
			
		||||
		keyLookupFunc(func() []testFifoObject {
 | 
			
		||||
		literalListerGetter(func() []testFifoObject {
 | 
			
		||||
			return []testFifoObject{}
 | 
			
		||||
		}),
 | 
			
		||||
	)
 | 
			
		||||
@@ -304,9 +308,13 @@ func TestDeltaFIFO_DeleteExistingNonPropagated(t *testing.T) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
 | 
			
		||||
	// We test with only one pre-existing object because there is no
 | 
			
		||||
	// promise about how their deletes are ordered.
 | 
			
		||||
 | 
			
		||||
	// Try it with a pre-existing Delete
 | 
			
		||||
	f := NewDeltaFIFO(
 | 
			
		||||
		testFifoObjectKeyFunc,
 | 
			
		||||
		keyLookupFunc(func() []testFifoObject {
 | 
			
		||||
		literalListerGetter(func() []testFifoObject {
 | 
			
		||||
			return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
 | 
			
		||||
		}),
 | 
			
		||||
	)
 | 
			
		||||
@@ -327,12 +335,57 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
 | 
			
		||||
			t.Errorf("Expected %#v, got %#v", e, a)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Now try starting with an Add instead of a Delete
 | 
			
		||||
	f = NewDeltaFIFO(
 | 
			
		||||
		testFifoObjectKeyFunc,
 | 
			
		||||
		literalListerGetter(func() []testFifoObject {
 | 
			
		||||
			return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
 | 
			
		||||
		}),
 | 
			
		||||
	)
 | 
			
		||||
	f.Add(mkFifoObj("baz", 10))
 | 
			
		||||
	f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0")
 | 
			
		||||
 | 
			
		||||
	expectedList := []Deltas{
 | 
			
		||||
		{{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 10)}}},
 | 
			
		||||
		{{Sync, mkFifoObj("foo", 5)}},
 | 
			
		||||
		// Since "bar" didn't have a delete event and wasn't in the Replace list
 | 
			
		||||
		// it should get a tombstone key with the right Obj.
 | 
			
		||||
		{{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, expected := range expectedList {
 | 
			
		||||
		cur := Pop(f).(Deltas)
 | 
			
		||||
		if e, a := expected, cur; !reflect.DeepEqual(e, a) {
 | 
			
		||||
			t.Errorf("Expected %#v, got %#v", e, a)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Now try starting without an explicit KeyListerGetter
 | 
			
		||||
	f = NewDeltaFIFO(
 | 
			
		||||
		testFifoObjectKeyFunc,
 | 
			
		||||
		nil,
 | 
			
		||||
	)
 | 
			
		||||
	f.Add(mkFifoObj("baz", 10))
 | 
			
		||||
	f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0")
 | 
			
		||||
 | 
			
		||||
	expectedList := []Deltas{
 | 
			
		||||
		{{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 10)}}},
 | 
			
		||||
		{{Sync, mkFifoObj("foo", 5)}},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, expected := range expectedList {
 | 
			
		||||
		cur := Pop(f).(Deltas)
 | 
			
		||||
		if e, a := expected, cur; !reflect.DeepEqual(e, a) {
 | 
			
		||||
			t.Errorf("Expected %#v, got %#v", e, a)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestDeltaFIFO_UpdateResyncRace(t *testing.T) {
 | 
			
		||||
	f := NewDeltaFIFO(
 | 
			
		||||
		testFifoObjectKeyFunc,
 | 
			
		||||
		keyLookupFunc(func() []testFifoObject {
 | 
			
		||||
		literalListerGetter(func() []testFifoObject {
 | 
			
		||||
			return []testFifoObject{mkFifoObj("foo", 5)}
 | 
			
		||||
		}),
 | 
			
		||||
	)
 | 
			
		||||
@@ -354,7 +407,7 @@ func TestDeltaFIFO_UpdateResyncRace(t *testing.T) {
 | 
			
		||||
func TestDeltaFIFO_HasSyncedCorrectOnDeletion(t *testing.T) {
 | 
			
		||||
	f := NewDeltaFIFO(
 | 
			
		||||
		testFifoObjectKeyFunc,
 | 
			
		||||
		keyLookupFunc(func() []testFifoObject {
 | 
			
		||||
		literalListerGetter(func() []testFifoObject {
 | 
			
		||||
			return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
 | 
			
		||||
		}),
 | 
			
		||||
	)
 | 
			
		||||
 
 | 
			
		||||
@@ -44,7 +44,7 @@ func (e ErrRequeue) Error() string {
 | 
			
		||||
	return e.Err.Error()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Queue extends Store with a collection of keys to "process".
 | 
			
		||||
// Queue extends Store with a collection of Store keys to "process".
 | 
			
		||||
// Every Add, Update, or Delete may put the object's key in that collection.
 | 
			
		||||
// A Queue has a way to derive the corresponding key given an accumulator.
 | 
			
		||||
// A Queue can be accessed concurrently from multiple goroutines.
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user