Support namespacing in cache.Store implementations
Support namespacing in cache.Store by framing the interface functions around interface{} and providing a key function to each Store implementation. Implementation of a fix for #2294.
This commit is contained in:
68
pkg/client/cache/fifo.go
vendored
68
pkg/client/cache/fifo.go
vendored
@@ -17,9 +17,8 @@ limitations under the License.
|
||||
package cache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
)
|
||||
|
||||
// FIFO receives adds and updates from a Reflector, and puts them in a queue for
|
||||
@@ -33,11 +32,18 @@ type FIFO struct {
|
||||
// We depend on the property that items in the set are in the queue and vice versa.
|
||||
items map[string]interface{}
|
||||
queue []string
|
||||
// keyFunc is used to make the key used for queued item insertion and retrieval, and
|
||||
// should be deterministic.
|
||||
keyFunc KeyFunc
|
||||
}
|
||||
|
||||
// Add inserts an item, and puts it in the queue. The item is only enqueued
|
||||
// if it doesn't already exist in the set.
|
||||
func (f *FIFO) Add(id string, obj interface{}) {
|
||||
func (f *FIFO) Add(obj interface{}) error {
|
||||
id, err := f.keyFunc(obj)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't create key for object: %v", err)
|
||||
}
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
if _, exists := f.items[id]; !exists {
|
||||
@@ -45,20 +51,26 @@ func (f *FIFO) Add(id string, obj interface{}) {
|
||||
}
|
||||
f.items[id] = obj
|
||||
f.cond.Broadcast()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Update is the same as Add in this implementation.
|
||||
func (f *FIFO) Update(id string, obj interface{}) {
|
||||
f.Add(id, obj)
|
||||
func (f *FIFO) Update(obj interface{}) error {
|
||||
return f.Add(obj)
|
||||
}
|
||||
|
||||
// Delete removes an item. It doesn't add it to the queue, because
|
||||
// this implementation assumes the consumer only cares about the objects,
|
||||
// not the order in which they were created/added.
|
||||
func (f *FIFO) Delete(id string) {
|
||||
func (f *FIFO) Delete(obj interface{}) error {
|
||||
id, err := f.keyFunc(obj)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't create key for object: %v", err)
|
||||
}
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
delete(f.items, id)
|
||||
return err
|
||||
}
|
||||
|
||||
// List returns a list of all the items.
|
||||
@@ -72,25 +84,16 @@ func (f *FIFO) List() []interface{} {
|
||||
return list
|
||||
}
|
||||
|
||||
// ContainedIDs returns a util.StringSet containing all IDs of the stored items.
|
||||
// This is a snapshot of a moment in time, and one should keep in mind that
|
||||
// other go routines can add or remove items after you call this.
|
||||
func (c *FIFO) ContainedIDs() util.StringSet {
|
||||
c.lock.RLock()
|
||||
defer c.lock.RUnlock()
|
||||
set := util.StringSet{}
|
||||
for id := range c.items {
|
||||
set.Insert(id)
|
||||
}
|
||||
return set
|
||||
}
|
||||
|
||||
// Get returns the requested item, or sets exists=false.
|
||||
func (f *FIFO) Get(id string) (item interface{}, exists bool) {
|
||||
func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
|
||||
id, err := f.keyFunc(obj)
|
||||
if err != nil {
|
||||
return nil, false, fmt.Errorf("couldn't create key for object: %v", err)
|
||||
}
|
||||
f.lock.RLock()
|
||||
defer f.lock.RUnlock()
|
||||
item, exists = f.items[id]
|
||||
return item, exists
|
||||
return item, exists, nil
|
||||
}
|
||||
|
||||
// Pop waits until an item is ready and returns it. If multiple items are
|
||||
@@ -120,25 +123,36 @@ func (f *FIFO) Pop() interface{} {
|
||||
// 'f' takes ownersip of the map, you should not reference the map again
|
||||
// after calling this function. f's queue is reset, too; upon return, it
|
||||
// will contain the items in the map, in no particular order.
|
||||
func (f *FIFO) Replace(idToObj map[string]interface{}) {
|
||||
func (f *FIFO) Replace(list []interface{}) error {
|
||||
items := map[string]interface{}{}
|
||||
for _, item := range list {
|
||||
key, err := f.keyFunc(item)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't create key for object: %v", err)
|
||||
}
|
||||
items[key] = item
|
||||
}
|
||||
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
f.items = idToObj
|
||||
f.items = items
|
||||
f.queue = f.queue[:0]
|
||||
for id := range idToObj {
|
||||
for id := range items {
|
||||
f.queue = append(f.queue, id)
|
||||
}
|
||||
if len(f.queue) > 0 {
|
||||
f.cond.Broadcast()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewFIFO returns a Store which can be used to queue up items to
|
||||
// process.
|
||||
func NewFIFO() *FIFO {
|
||||
func NewFIFO(keyFunc KeyFunc) *FIFO {
|
||||
f := &FIFO{
|
||||
items: map[string]interface{}{},
|
||||
queue: []string{},
|
||||
items: map[string]interface{}{},
|
||||
queue: []string{},
|
||||
keyFunc: keyFunc,
|
||||
}
|
||||
f.cond.L = &f.lock
|
||||
return f
|
||||
|
Reference in New Issue
Block a user