Merge pull request #59871 from wojtek-t/cache_fields_and_labels
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Store labels and fields with object We are already computing labels and fields before putting objects in watchcache. And my tests show this is `PodToSelectableFields` is responsible for ~10% of memory allocations. This PR is supposed to fix that - let's double check by running kubemark-big on it.
This commit is contained in:
		| @@ -130,7 +130,7 @@ func (i *indexedWatchers) terminateAll(objectType reflect.Type) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| type watchFilterFunc func(key string, l labels.Set, f fields.Set, uninitialized bool) bool | ||||
| type filterWithAttrsFunc func(key string, l labels.Set, f fields.Set, uninitialized bool) bool | ||||
|  | ||||
| // Cacher is responsible for serving WATCH and LIST requests for a given | ||||
| // resource from its internal cache and updating its cache in the background | ||||
| @@ -337,7 +337,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, | ||||
| 	c.Lock() | ||||
| 	defer c.Unlock() | ||||
| 	forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported) | ||||
| 	watcher := newCacheWatcher(watchRV, chanSize, initEvents, watchFilterFunction(key, pred), forget, c.versioner) | ||||
| 	watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterWithAttrsFunction(key, pred), forget, c.versioner) | ||||
|  | ||||
| 	c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported) | ||||
| 	c.watcherIdx++ | ||||
| @@ -439,7 +439,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri | ||||
| 	if err != nil || listVal.Kind() != reflect.Slice { | ||||
| 		return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind()) | ||||
| 	} | ||||
| 	filter := filterFunction(key, pred) | ||||
| 	filter := filterWithAttrsFunction(key, pred) | ||||
|  | ||||
| 	obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace) | ||||
| 	if err != nil { | ||||
| @@ -452,7 +452,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri | ||||
| 		if !ok { | ||||
| 			return fmt.Errorf("non *storeElement returned from storage: %v", obj) | ||||
| 		} | ||||
| 		if filter(elem.Key, elem.Object) { | ||||
| 		if filter(elem.Key, elem.Labels, elem.Fields, elem.Uninitialized) { | ||||
| 			listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem())) | ||||
| 		} | ||||
| 	} | ||||
| @@ -508,7 +508,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p | ||||
| 	if err != nil || listVal.Kind() != reflect.Slice { | ||||
| 		return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind()) | ||||
| 	} | ||||
| 	filter := filterFunction(key, pred) | ||||
| 	filter := filterWithAttrsFunction(key, pred) | ||||
|  | ||||
| 	objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace) | ||||
| 	if err != nil { | ||||
| @@ -526,7 +526,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p | ||||
| 		if !ok { | ||||
| 			return fmt.Errorf("non *storeElement returned from storage: %v", obj) | ||||
| 		} | ||||
| 		if filter(elem.Key, elem.Object) { | ||||
| 		if filter(elem.Key, elem.Labels, elem.Fields, elem.Uninitialized) { | ||||
| 			listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem())) | ||||
| 		} | ||||
| 	} | ||||
| @@ -680,22 +680,7 @@ func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported b | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func filterFunction(key string, p SelectionPredicate) func(string, runtime.Object) bool { | ||||
| 	filterFunc := func(objKey string, obj runtime.Object) bool { | ||||
| 		if !hasPathPrefix(objKey, key) { | ||||
| 			return false | ||||
| 		} | ||||
| 		matches, err := p.Matches(obj) | ||||
| 		if err != nil { | ||||
| 			glog.Errorf("invalid object for matching. Obj: %v. Err: %v", obj, err) | ||||
| 			return false | ||||
| 		} | ||||
| 		return matches | ||||
| 	} | ||||
| 	return filterFunc | ||||
| } | ||||
|  | ||||
| func watchFilterFunction(key string, p SelectionPredicate) watchFilterFunc { | ||||
| func filterWithAttrsFunction(key string, p SelectionPredicate) filterWithAttrsFunc { | ||||
| 	filterFunc := func(objKey string, label labels.Set, field fields.Set, uninitialized bool) bool { | ||||
| 		if !hasPathPrefix(objKey, key) { | ||||
| 			return false | ||||
| @@ -788,13 +773,13 @@ type cacheWatcher struct { | ||||
| 	input     chan *watchCacheEvent | ||||
| 	result    chan watch.Event | ||||
| 	done      chan struct{} | ||||
| 	filter    watchFilterFunc | ||||
| 	filter    filterWithAttrsFunc | ||||
| 	stopped   bool | ||||
| 	forget    func(bool) | ||||
| 	versioner Versioner | ||||
| } | ||||
|  | ||||
| func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter watchFilterFunc, forget func(bool), versioner Versioner) *cacheWatcher { | ||||
| func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter filterWithAttrsFunc, forget func(bool), versioner Versioner) *cacheWatcher { | ||||
| 	watcher := &cacheWatcher{ | ||||
| 		input:     make(chan *watchCacheEvent, chanSize), | ||||
| 		result:    make(chan watch.Event, chanSize), | ||||
|   | ||||
| @@ -61,12 +61,16 @@ type watchCacheEvent struct { | ||||
| } | ||||
|  | ||||
| // Computing a key of an object is generally non-trivial (it performs | ||||
| // e.g. validation underneath). To avoid computing it multiple times | ||||
| // (to serve the event in different List/Watch requests), in the | ||||
| // underlying store we are keeping pair (key, object). | ||||
| // e.g. validation underneath). Similarly computing object fields and | ||||
| // labels. To avoid computing them multiple times (to serve the event | ||||
| // in different List/Watch requests), in the underlying store we are | ||||
| // keeping structs (key, object, labels, fields, uninitialized). | ||||
| type storeElement struct { | ||||
| 	Key    string | ||||
| 	Object runtime.Object | ||||
| 	Key           string | ||||
| 	Object        runtime.Object | ||||
| 	Labels        labels.Set | ||||
| 	Fields        fields.Set | ||||
| 	Uninitialized bool | ||||
| } | ||||
|  | ||||
| func storeElementKey(obj interface{}) (string, error) { | ||||
| @@ -220,6 +224,20 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd | ||||
| 		return fmt.Errorf("couldn't compute key: %v", err) | ||||
| 	} | ||||
| 	elem := &storeElement{Key: key, Object: event.Object} | ||||
| 	elem.Labels, elem.Fields, elem.Uninitialized, err = w.getAttrsFunc(event.Object) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	watchCacheEvent := &watchCacheEvent{ | ||||
| 		Type:             event.Type, | ||||
| 		Object:           elem.Object, | ||||
| 		ObjLabels:        elem.Labels, | ||||
| 		ObjFields:        elem.Fields, | ||||
| 		ObjUninitialized: elem.Uninitialized, | ||||
| 		Key:              key, | ||||
| 		ResourceVersion:  resourceVersion, | ||||
| 	} | ||||
|  | ||||
| 	// TODO: We should consider moving this lock below after the watchCacheEvent | ||||
| 	// is created. In such situation, the only problematic scenario is Replace( | ||||
| @@ -231,34 +249,14 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	objLabels, objFields, objUninitialized, err := w.getAttrsFunc(event.Object) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	var prevObject runtime.Object | ||||
| 	var prevObjLabels labels.Set | ||||
| 	var prevObjFields fields.Set | ||||
| 	var prevObjUninitialized bool | ||||
| 	if exists { | ||||
| 		prevObject = previous.(*storeElement).Object | ||||
| 		prevObjLabels, prevObjFields, prevObjUninitialized, err = w.getAttrsFunc(prevObject) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| 	watchCacheEvent := &watchCacheEvent{ | ||||
| 		Type:                 event.Type, | ||||
| 		Object:               event.Object, | ||||
| 		ObjLabels:            objLabels, | ||||
| 		ObjFields:            objFields, | ||||
| 		ObjUninitialized:     objUninitialized, | ||||
| 		PrevObject:           prevObject, | ||||
| 		PrevObjLabels:        prevObjLabels, | ||||
| 		PrevObjFields:        prevObjFields, | ||||
| 		PrevObjUninitialized: prevObjUninitialized, | ||||
| 		Key:                  key, | ||||
| 		ResourceVersion:      resourceVersion, | ||||
| 		previousElem := previous.(*storeElement) | ||||
| 		watchCacheEvent.PrevObject = previousElem.Object | ||||
| 		watchCacheEvent.PrevObjLabels = previousElem.Labels | ||||
| 		watchCacheEvent.PrevObjFields = previousElem.Fields | ||||
| 		watchCacheEvent.PrevObjUninitialized = previousElem.Uninitialized | ||||
| 	} | ||||
|  | ||||
| 	if w.onEvent != nil { | ||||
| 		w.onEvent(watchCacheEvent) | ||||
| 	} | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Submit Queue
					Kubernetes Submit Queue