From 819554f514ab697db982fae355ef93180ef1e82c Mon Sep 17 00:00:00 2001 From: Harry Zhang Date: Wed, 15 Feb 2017 16:55:02 +0800 Subject: [PATCH] Update equivalence cache to use predicate as key Remove Invalid field from host predicate --- .../algorithmprovider/defaults/defaults.go | 11 +- .../pkg/scheduler/core/equivalence_cache.go | 197 ++++++++++++------ .../scheduler/core/equivalence_cache_test.go | 131 ++++++++++++ 3 files changed, 275 insertions(+), 64 deletions(-) create mode 100644 plugin/pkg/scheduler/core/equivalence_cache_test.go diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go index 6b1f6550936..5acb2f6b66c 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -247,22 +247,21 @@ func copyAndReplace(set sets.String, replaceWhat, replaceWith string) sets.Strin // GetEquivalencePod returns a EquivalencePod which contains a group of pod attributes which can be reused. func GetEquivalencePod(pod *v1.Pod) interface{} { - equivalencePod := EquivalencePod{} // For now we only consider pods: // 1. OwnerReferences is Controller - // 2. OwnerReferences kind is in valid controller kinds - // 3. with same OwnerReferences + // 2. with same OwnerReferences // to be equivalent if len(pod.OwnerReferences) != 0 { for _, ref := range pod.OwnerReferences { if *ref.Controller { - equivalencePod.ControllerRef = ref // a pod can only belongs to one controller - break + return &EquivalencePod{ + ControllerRef: ref, + } } } } - return &equivalencePod + return nil } // EquivalencePod is a group of pod attributes which can be reused as equivalence to schedule other pods. diff --git a/plugin/pkg/scheduler/core/equivalence_cache.go b/plugin/pkg/scheduler/core/equivalence_cache.go index 7e4ebfd660f..27e43b1f9c6 100644 --- a/plugin/pkg/scheduler/core/equivalence_cache.go +++ b/plugin/pkg/scheduler/core/equivalence_cache.go @@ -18,18 +18,19 @@ package core import ( "hash/fnv" - - "github.com/golang/groupcache/lru" - "sync" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/api/v1" hashutil "k8s.io/kubernetes/pkg/util/hash" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" + + "github.com/golang/glog" + "github.com/golang/groupcache/lru" ) -// TODO(harryz) figure out the right number for this, 4096 may be too big -const maxCacheEntries = 4096 +// we use predicate names as cache's key, its count is limited +const maxCacheEntries = 100 type HostPredicate struct { Fit bool @@ -41,6 +42,9 @@ type AlgorithmCache struct { predicatesCache *lru.Cache } +// PredicateMap use equivalence hash as key +type PredicateMap map[uint64]HostPredicate + func newAlgorithmCache() AlgorithmCache { return AlgorithmCache{ predicatesCache: lru.New(maxCacheEntries), @@ -61,74 +65,151 @@ func NewEquivalenceCache(getEquivalencePodFunc algorithm.GetEquivalencePodFunc) } } -// addPodPredicate adds pod predicate for equivalence class -func (ec *EquivalenceCache) addPodPredicate(podKey uint64, nodeName string, fit bool, failReasons []algorithm.PredicateFailureReason) { +// UpdateCachedPredicateItem updates pod predicate for equivalence class +func (ec *EquivalenceCache) UpdateCachedPredicateItem(pod *v1.Pod, nodeName, predicateKey string, fit bool, reasons []algorithm.PredicateFailureReason, equivalenceHash uint64) { + ec.Lock() + defer ec.Unlock() if _, exist := ec.algorithmCache[nodeName]; !exist { ec.algorithmCache[nodeName] = newAlgorithmCache() } - ec.algorithmCache[nodeName].predicatesCache.Add(podKey, HostPredicate{Fit: fit, FailReasons: failReasons}) + predicateItem := HostPredicate{ + Fit: fit, + FailReasons: reasons, + } + // if cached predicate map already exists, just update the predicate by key + if v, ok := ec.algorithmCache[nodeName].predicatesCache.Get(predicateKey); ok { + predicateMap := v.(PredicateMap) + // maps in golang are references, no need to add them back + predicateMap[equivalenceHash] = predicateItem + } else { + ec.algorithmCache[nodeName].predicatesCache.Add(predicateKey, + PredicateMap{ + equivalenceHash: predicateItem, + }) + } + glog.V(5).Infof("Updated cached predicate: %v for pod: %v on node: %s, with item %v", predicateKey, pod.GetName(), nodeName, predicateItem) } -// AddPodPredicatesCache cache pod predicate for equivalence class -func (ec *EquivalenceCache) AddPodPredicatesCache(pod *v1.Pod, fitNodeList []*v1.Node, failedPredicates *FailedPredicateMap) { - equivalenceHash := ec.hashEquivalencePod(pod) - - for _, fitNode := range fitNodeList { - ec.addPodPredicate(equivalenceHash, fitNode.Name, true, nil) - } - for failNodeName, failReasons := range *failedPredicates { - ec.addPodPredicate(equivalenceHash, failNodeName, false, failReasons) - } -} - -// GetCachedPredicates gets cached predicates for equivalence class -func (ec *EquivalenceCache) GetCachedPredicates(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, []*v1.Node) { - fitNodeList := []*v1.Node{} - failedPredicates := FailedPredicateMap{} - noCacheNodeList := []*v1.Node{} - equivalenceHash := ec.hashEquivalencePod(pod) - for _, node := range nodes { - findCache := false - if algorithmCache, exist := ec.algorithmCache[node.Name]; exist { - if cachePredicate, exist := algorithmCache.predicatesCache.Get(equivalenceHash); exist { - hostPredicate := cachePredicate.(HostPredicate) +// PredicateWithECache returns: +// 1. if fit +// 2. reasons if not fit +// 3. if this cache is invalid +// based on cached predicate results +func (ec *EquivalenceCache) PredicateWithECache(pod *v1.Pod, nodeName, predicateKey string, equivalenceHash uint64) (bool, []algorithm.PredicateFailureReason, bool) { + ec.RLock() + defer ec.RUnlock() + glog.V(5).Infof("Begin to calculate predicate: %v for pod: %s on node: %s based on equivalence cache", predicateKey, pod.GetName(), nodeName) + if algorithmCache, exist := ec.algorithmCache[nodeName]; exist { + if cachePredicate, exist := algorithmCache.predicatesCache.Get(predicateKey); exist { + predicateMap := cachePredicate.(PredicateMap) + // TODO(resouer) Is it possible a race that cache failed to update immediately? + if hostPredicate, ok := predicateMap[equivalenceHash]; ok { if hostPredicate.Fit { - fitNodeList = append(fitNodeList, node) + return true, []algorithm.PredicateFailureReason{}, false } else { - failedPredicates[node.Name] = hostPredicate.FailReasons + return false, hostPredicate.FailReasons, false } - findCache = true + } else { + // is invalid + return false, []algorithm.PredicateFailureReason{}, true } } - if !findCache { - noCacheNodeList = append(noCacheNodeList, node) + } + return false, []algorithm.PredicateFailureReason{}, true +} + +// InvalidateCachedPredicateItem marks all items of given predicateKeys, of all pods, on the given node as invalid +func (ec *EquivalenceCache) InvalidateCachedPredicateItem(nodeName string, predicateKeys sets.String) { + if len(predicateKeys) == 0 { + return + } + ec.Lock() + defer ec.Unlock() + if algorithmCache, exist := ec.algorithmCache[nodeName]; exist { + for predicateKey := range predicateKeys { + algorithmCache.predicatesCache.Remove(predicateKey) } } - return fitNodeList, failedPredicates, noCacheNodeList + glog.V(5).Infof("Done invalidating cached predicates: %v on node: %s", predicateKeys, nodeName) } -// SendInvalidAlgorithmCacheReq marks AlgorithmCache item as invalid -func (ec *EquivalenceCache) SendInvalidAlgorithmCacheReq(nodeName string) { - ec.Lock() - defer ec.Unlock() - // clear the cache of this node - delete(ec.algorithmCache, nodeName) -} - -// SendClearAllCacheReq marks all cached item as invalid -func (ec *EquivalenceCache) SendClearAllCacheReq() { - ec.Lock() - defer ec.Unlock() - // clear cache of all nodes - for nodeName := range ec.algorithmCache { - delete(ec.algorithmCache, nodeName) +// InvalidateCachedPredicateItemOfAllNodes marks all items of given predicateKeys, of all pods, on all node as invalid +func (ec *EquivalenceCache) InvalidateCachedPredicateItemOfAllNodes(predicateKeys sets.String) { + if len(predicateKeys) == 0 { + return } + ec.Lock() + defer ec.Unlock() + // algorithmCache uses nodeName as key, so we just iterate it and invalid given predicates + for _, algorithmCache := range ec.algorithmCache { + for predicateKey := range predicateKeys { + // just use keys is enough + algorithmCache.predicatesCache.Remove(predicateKey) + } + } + glog.V(5).Infof("Done invalidating cached predicates: %v on all node", predicateKeys) } -// hashEquivalencePod returns the hash of equivalence pod. -func (ec *EquivalenceCache) hashEquivalencePod(pod *v1.Pod) uint64 { - equivalencePod := ec.getEquivalencePod(pod) - hash := fnv.New32a() - hashutil.DeepHashObject(hash, equivalencePod) - return uint64(hash.Sum32()) +// InvalidateAllCachedPredicateItemOfNode marks all cached items on given node as invalid +func (ec *EquivalenceCache) InvalidateAllCachedPredicateItemOfNode(nodeName string) { + ec.Lock() + defer ec.Unlock() + delete(ec.algorithmCache, nodeName) + glog.V(5).Infof("Done invalidating all cached predicates on node: %s", nodeName) +} + +// InvalidateCachedPredicateItemForPod marks item of given predicateKeys, of given pod, on the given node as invalid +func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPod(nodeName string, predicateKeys sets.String, pod *v1.Pod) { + if len(predicateKeys) == 0 { + return + } + equivalenceHash := ec.getHashEquivalencePod(pod) + if equivalenceHash == 0 { + // no equivalence pod found, just return + return + } + ec.Lock() + defer ec.Unlock() + if algorithmCache, exist := ec.algorithmCache[nodeName]; exist { + for predicateKey := range predicateKeys { + if cachePredicate, exist := algorithmCache.predicatesCache.Get(predicateKey); exist { + // got the cached item of by predicateKey & pod + predicateMap := cachePredicate.(PredicateMap) + delete(predicateMap, equivalenceHash) + } + } + } + glog.V(5).Infof("Done invalidating cached predicates %v on node %s, for pod %v", predicateKeys, nodeName, pod.GetName()) +} + +// InvalidateCachedPredicateItemForPodAdd is a wrapper of InvalidateCachedPredicateItem for pod add case +func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) { + // MatchInterPodAffinity: we assume scheduler can make sure newly binded pod + // will not break the existing inter pod affinity. So we does not need to invalidate + // MatchInterPodAffinity when pod added. + // + // But when a pod is deleted, existing inter pod affinity may become invalid. + // (e.g. this pod was preferred by some else, or vice versa) + // + // NOTE: assumptions above will not stand when we implemented features like + // RequiredDuringSchedulingRequiredDuringExecution. + + // NoDiskConflict: the newly scheduled pod fits to existing pods on this node, + // it will also fits to equivalence class of existing pods + + // GeneralPredicates: will always be affected by adding a new pod + invalidPredicates := sets.NewString("GeneralPredicates") + ec.InvalidateCachedPredicateItem(nodeName, invalidPredicates) +} + +// getHashEquivalencePod returns the hash of equivalence pod. +// if no equivalence pod found, return 0 +func (ec *EquivalenceCache) getHashEquivalencePod(pod *v1.Pod) uint64 { + equivalencePod := ec.getEquivalencePod(pod) + if equivalencePod != nil { + hash := fnv.New32a() + hashutil.DeepHashObject(hash, equivalencePod) + return uint64(hash.Sum32()) + } + return 0 } diff --git a/plugin/pkg/scheduler/core/equivalence_cache_test.go b/plugin/pkg/scheduler/core/equivalence_cache_test.go new file mode 100644 index 00000000000..f19b7ce7080 --- /dev/null +++ b/plugin/pkg/scheduler/core/equivalence_cache_test.go @@ -0,0 +1,131 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package core + +import ( + "reflect" + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" + "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" +) + +func TestUpdateCachedPredicateItem(t *testing.T) { + tests := []struct { + name string + pod *v1.Pod + predicateKey string + nodeName string + fit bool + reasons []algorithm.PredicateFailureReason + equivalenceHash uint64 + expectCacheItem HostPredicate + }{ + { + name: "test 1", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "testPod"}}, + predicateKey: "GeneralPredicates", + nodeName: "node1", + fit: true, + equivalenceHash: 123, + expectCacheItem: HostPredicate{ + Fit: true, + }, + }, + } + for _, test := range tests { + // this case does not need to calculate equivalence hash, just pass an empty function + fakeGetEquivalencePodFunc := func(pod *v1.Pod) interface{} { return nil } + ecache := NewEquivalenceCache(fakeGetEquivalencePodFunc) + ecache.UpdateCachedPredicateItem(test.pod, test.nodeName, test.predicateKey, test.fit, test.reasons, test.equivalenceHash) + + value, ok := ecache.algorithmCache[test.nodeName].predicatesCache.Get(test.predicateKey) + if !ok { + t.Errorf("Failed : %s, can't find expected cache item: %v", test.name, test.expectCacheItem) + } else { + cachedMapItem := value.(PredicateMap) + if !reflect.DeepEqual(cachedMapItem[test.equivalenceHash], test.expectCacheItem) { + t.Errorf("Failed : %s, expected cached item: %v, but got: %v", test.name, test.expectCacheItem, cachedMapItem[test.equivalenceHash]) + } + } + } +} + +type predicateItemType struct { + fit bool + reasons []algorithm.PredicateFailureReason +} + +func TestInvalidateCachedPredicateItem(t *testing.T) { + tests := []struct { + name string + pod *v1.Pod + nodeName string + predicateKey string + equivalenceHash uint64 + cachedItem predicateItemType + expectedInvalid bool + expectedPredicateItem predicateItemType + }{ + { + name: "test 1", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "testPod"}}, + nodeName: "node1", + equivalenceHash: 123, + predicateKey: "GeneralPredicates", + cachedItem: predicateItemType{ + fit: false, + reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, + }, + expectedInvalid: true, + expectedPredicateItem: predicateItemType{ + fit: false, + reasons: []algorithm.PredicateFailureReason{}, + }, + }, + } + + for _, test := range tests { + // this case does not need to calculate equivalence hash, just pass an empty function + fakeGetEquivalencePodFunc := func(pod *v1.Pod) interface{} { return nil } + ecache := NewEquivalenceCache(fakeGetEquivalencePodFunc) + // set cached item to equivalence cache + ecache.UpdateCachedPredicateItem(test.pod, test.nodeName, test.predicateKey, test.cachedItem.fit, test.cachedItem.reasons, test.equivalenceHash) + // if we want to do invalid, invalid the cached item + if test.expectedInvalid { + predicateKeys := sets.NewString() + predicateKeys.Insert(test.predicateKey) + ecache.InvalidateCachedPredicateItem(test.nodeName, predicateKeys) + } + // calculate predicate with equivalence cache + fit, reasons, invalid := ecache.PredicateWithECache(test.pod, test.nodeName, test.predicateKey, test.equivalenceHash) + // returned invalid should match expectedInvalid + if invalid != test.expectedInvalid { + t.Errorf("Failed : %s, expected invalid: %v, but got: %v", test.name, test.expectedInvalid, invalid) + } + // returned predicate result should match expected predicate item + if fit != test.expectedPredicateItem.fit { + t.Errorf("Failed : %s, expected fit: %v, but got: %v", test.name, test.cachedItem.fit, fit) + } + if !reflect.DeepEqual(reasons, test.expectedPredicateItem.reasons) { + t.Errorf("Failed : %s, expected reasons: %v, but got: %v", test.name, test.cachedItem.reasons, reasons) + } + } +}