Merge pull request #63975 from godliness/lock-optimization

Automatic merge from submit-queue (batch tested with PRs 63434, 64172, 63975, 64180, 63755). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Optimize the lock which in the RunPredicate

**What this PR does / why we need it**:


Enhance the performance of scheduler

-  Change the lock in the RunPredicate from lock to rlock




**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:

Could solve part of #63784 

**Special notes for your reviewer**:

_Run benchmark test by scheduler_perf_:
`Before`  BenchmarkScheduling/1000Nodes/0Pods-32     1000    11689758 ns/op
`After`     BenchmarkScheduling/1000Nodes/0Pods-32     1000    5951510 ns/op

_Run integration (density) test by scheduler_perf_:
Schedule 3000 Pods On 3000 Nodes
`Before`  rate  19 per second on average
`After`     rate  58 per second on average

_Cpu profile test result_:
`Before`  [click](https://cdn.rawgit.com/godliness/files/master/63784_before.svg)
`After`     [click](https://cdn.rawgit.com/godliness/files/master/63784_after.svg)

**Release note**:

```release-note
`None`
```

/sig scheduling

/cc @misterikkit 
/cc @bsalamat
/cc @ravisantoshgudimetla 
/cc @resouer
This commit is contained in:
Kubernetes Submit Queue
2018-05-24 12:18:17 -07:00
committed by GitHub
2 changed files with 53 additions and 27 deletions

View File

@@ -35,7 +35,7 @@ import (
// 1. a map of AlgorithmCache with node name as key // 1. a map of AlgorithmCache with node name as key
// 2. function to get equivalence pod // 2. function to get equivalence pod
type EquivalenceCache struct { type EquivalenceCache struct {
mu sync.Mutex mu sync.RWMutex
algorithmCache map[string]AlgorithmCache algorithmCache map[string]AlgorithmCache
} }
@@ -72,9 +72,6 @@ func (ec *EquivalenceCache) RunPredicate(
equivClassInfo *equivalenceClassInfo, equivClassInfo *equivalenceClassInfo,
cache schedulercache.Cache, cache schedulercache.Cache,
) (bool, []algorithm.PredicateFailureReason, error) { ) (bool, []algorithm.PredicateFailureReason, error) {
ec.mu.Lock()
defer ec.mu.Unlock()
if nodeInfo == nil || nodeInfo.Node() == nil { if nodeInfo == nil || nodeInfo.Node() == nil {
// This may happen during tests. // This may happen during tests.
return false, []algorithm.PredicateFailureReason{}, fmt.Errorf("nodeInfo is nil or node is invalid") return false, []algorithm.PredicateFailureReason{}, fmt.Errorf("nodeInfo is nil or node is invalid")
@@ -88,20 +85,32 @@ func (ec *EquivalenceCache) RunPredicate(
if err != nil { if err != nil {
return fit, reasons, err return fit, reasons, err
} }
// Skip update if NodeInfo is stale. if cache != nil {
if cache != nil && cache.IsUpToDate(nodeInfo) { ec.updateResult(pod.GetName(), predicateKey, fit, reasons, equivClassInfo.hash, cache, nodeInfo)
ec.updateResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, fit, reasons, equivClassInfo.hash)
} }
return fit, reasons, nil return fit, reasons, nil
} }
// updateResult updates the cached result of a predicate. // updateResult updates the cached result of a predicate.
func (ec *EquivalenceCache) updateResult( func (ec *EquivalenceCache) updateResult(
podName, nodeName, predicateKey string, podName, predicateKey string,
fit bool, fit bool,
reasons []algorithm.PredicateFailureReason, reasons []algorithm.PredicateFailureReason,
equivalenceHash uint64, equivalenceHash uint64,
cache schedulercache.Cache,
nodeInfo *schedulercache.NodeInfo,
) { ) {
ec.mu.Lock()
defer ec.mu.Unlock()
if nodeInfo == nil || nodeInfo.Node() == nil {
// This may happen during tests.
return
}
// Skip update if NodeInfo is stale.
if !cache.IsUpToDate(nodeInfo) {
return
}
nodeName := nodeInfo.Node().GetName()
if _, exist := ec.algorithmCache[nodeName]; !exist { if _, exist := ec.algorithmCache[nodeName]; !exist {
ec.algorithmCache[nodeName] = AlgorithmCache{} ec.algorithmCache[nodeName] = AlgorithmCache{}
} }
@@ -130,6 +139,8 @@ func (ec *EquivalenceCache) lookupResult(
podName, nodeName, predicateKey string, podName, nodeName, predicateKey string,
equivalenceHash uint64, equivalenceHash uint64,
) (bool, []algorithm.PredicateFailureReason, bool) { ) (bool, []algorithm.PredicateFailureReason, bool) {
ec.mu.RLock()
defer ec.mu.RUnlock()
glog.V(5).Infof("Begin to calculate predicate: %v for pod: %s on node: %s based on equivalence cache", glog.V(5).Infof("Begin to calculate predicate: %v for pod: %s on node: %s based on equivalence cache",
predicateKey, podName, nodeName) predicateKey, podName, nodeName)
if hostPredicate, exist := ec.algorithmCache[nodeName][predicateKey][equivalenceHash]; exist { if hostPredicate, exist := ec.algorithmCache[nodeName][predicateKey][equivalenceHash]; exist {

View File

@@ -253,9 +253,7 @@ func TestRunPredicate(t *testing.T) {
ecache := NewEquivalenceCache() ecache := NewEquivalenceCache()
equivClass := ecache.getEquivalenceClassInfo(pod) equivClass := ecache.getEquivalenceClassInfo(pod)
if test.expectCacheHit { if test.expectCacheHit {
ecache.mu.Lock() ecache.updateResult(pod.Name, "testPredicate", test.expectFit, test.expectedReasons, equivClass.hash, test.cache, node)
ecache.updateResult(pod.Name, node.Node().Name, "testPredicate", test.expectFit, test.expectedReasons, equivClass.hash)
ecache.mu.Unlock()
} }
fit, reasons, err := ecache.RunPredicate(test.pred.predicate, "testPredicate", pod, meta, node, equivClass, test.cache) fit, reasons, err := ecache.RunPredicate(test.pred.predicate, "testPredicate", pod, meta, node, equivClass, test.cache)
@@ -289,9 +287,7 @@ func TestRunPredicate(t *testing.T) {
if !test.expectCacheHit && test.pred.callCount == 0 { if !test.expectCacheHit && test.pred.callCount == 0 {
t.Errorf("Predicate should be called") t.Errorf("Predicate should be called")
} }
ecache.mu.Lock()
_, _, invalid := ecache.lookupResult(pod.Name, node.Node().Name, "testPredicate", equivClass.hash) _, _, invalid := ecache.lookupResult(pod.Name, node.Node().Name, "testPredicate", equivClass.hash)
ecache.mu.Unlock()
if invalid && test.expectCacheWrite { if invalid && test.expectCacheWrite {
t.Errorf("Cache write should happen") t.Errorf("Cache write should happen")
} }
@@ -316,6 +312,7 @@ func TestUpdateResult(t *testing.T) {
equivalenceHash uint64 equivalenceHash uint64
expectPredicateMap bool expectPredicateMap bool
expectCacheItem HostPredicate expectCacheItem HostPredicate
cache schedulercache.Cache
}{ }{
{ {
name: "test 1", name: "test 1",
@@ -328,6 +325,7 @@ func TestUpdateResult(t *testing.T) {
expectCacheItem: HostPredicate{ expectCacheItem: HostPredicate{
Fit: true, Fit: true,
}, },
cache: &upToDateCache{},
}, },
{ {
name: "test 2", name: "test 2",
@@ -340,6 +338,7 @@ func TestUpdateResult(t *testing.T) {
expectCacheItem: HostPredicate{ expectCacheItem: HostPredicate{
Fit: false, Fit: false,
}, },
cache: &upToDateCache{},
}, },
} }
for _, test := range tests { for _, test := range tests {
@@ -354,16 +353,18 @@ func TestUpdateResult(t *testing.T) {
test.equivalenceHash: predicateItem, test.equivalenceHash: predicateItem,
} }
} }
ecache.mu.Lock()
node := schedulercache.NewNodeInfo()
node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}})
ecache.updateResult( ecache.updateResult(
test.pod, test.pod,
test.nodeName,
test.predicateKey, test.predicateKey,
test.fit, test.fit,
test.reasons, test.reasons,
test.equivalenceHash, test.equivalenceHash,
test.cache,
node,
) )
ecache.mu.Unlock()
cachedMapItem, ok := ecache.algorithmCache[test.nodeName][test.predicateKey] cachedMapItem, ok := ecache.algorithmCache[test.nodeName][test.predicateKey]
if !ok { if !ok {
@@ -398,6 +399,7 @@ func TestLookupResult(t *testing.T) {
expectedInvalidPredicateKey bool expectedInvalidPredicateKey bool
expectedInvalidEquivalenceHash bool expectedInvalidEquivalenceHash bool
expectedPredicateItem predicateItemType expectedPredicateItem predicateItemType
cache schedulercache.Cache
}{ }{
{ {
name: "test 1", name: "test 1",
@@ -415,6 +417,7 @@ func TestLookupResult(t *testing.T) {
fit: false, fit: false,
reasons: []algorithm.PredicateFailureReason{}, reasons: []algorithm.PredicateFailureReason{},
}, },
cache: &upToDateCache{},
}, },
{ {
name: "test 2", name: "test 2",
@@ -431,6 +434,7 @@ func TestLookupResult(t *testing.T) {
fit: true, fit: true,
reasons: []algorithm.PredicateFailureReason{}, reasons: []algorithm.PredicateFailureReason{},
}, },
cache: &upToDateCache{},
}, },
{ {
name: "test 3", name: "test 3",
@@ -448,6 +452,7 @@ func TestLookupResult(t *testing.T) {
fit: false, fit: false,
reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
}, },
cache: &upToDateCache{},
}, },
{ {
name: "test 4", name: "test 4",
@@ -466,22 +471,24 @@ func TestLookupResult(t *testing.T) {
fit: false, fit: false,
reasons: []algorithm.PredicateFailureReason{}, reasons: []algorithm.PredicateFailureReason{},
}, },
cache: &upToDateCache{},
}, },
} }
for _, test := range tests { for _, test := range tests {
ecache := NewEquivalenceCache() ecache := NewEquivalenceCache()
node := schedulercache.NewNodeInfo()
node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}})
// set cached item to equivalence cache // set cached item to equivalence cache
ecache.mu.Lock()
ecache.updateResult( ecache.updateResult(
test.podName, test.podName,
test.nodeName,
test.predicateKey, test.predicateKey,
test.cachedItem.fit, test.cachedItem.fit,
test.cachedItem.reasons, test.cachedItem.reasons,
test.equivalenceHashForUpdatePredicate, test.equivalenceHashForUpdatePredicate,
test.cache,
node,
) )
ecache.mu.Unlock()
// if we want to do invalid, invalid the cached item // if we want to do invalid, invalid the cached item
if test.expectedInvalidPredicateKey { if test.expectedInvalidPredicateKey {
predicateKeys := sets.NewString() predicateKeys := sets.NewString()
@@ -489,13 +496,11 @@ func TestLookupResult(t *testing.T) {
ecache.InvalidateCachedPredicateItem(test.nodeName, predicateKeys) ecache.InvalidateCachedPredicateItem(test.nodeName, predicateKeys)
} }
// calculate predicate with equivalence cache // calculate predicate with equivalence cache
ecache.mu.Lock()
fit, reasons, invalid := ecache.lookupResult(test.podName, fit, reasons, invalid := ecache.lookupResult(test.podName,
test.nodeName, test.nodeName,
test.predicateKey, test.predicateKey,
test.equivalenceHashForCalPredicate, test.equivalenceHashForCalPredicate,
) )
ecache.mu.Unlock()
// returned invalid should match expectedInvalidPredicateKey or expectedInvalidEquivalenceHash // returned invalid should match expectedInvalidPredicateKey or expectedInvalidEquivalenceHash
if test.equivalenceHashForUpdatePredicate != test.equivalenceHashForCalPredicate { if test.equivalenceHashForUpdatePredicate != test.equivalenceHashForCalPredicate {
if invalid != test.expectedInvalidEquivalenceHash { if invalid != test.expectedInvalidEquivalenceHash {
@@ -645,6 +650,7 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
nodeName string nodeName string
equivalenceHashForUpdatePredicate uint64 equivalenceHashForUpdatePredicate uint64
cachedItem predicateItemType cachedItem predicateItemType
cache schedulercache.Cache
}{ }{
{ {
podName: "testPod", podName: "testPod",
@@ -656,6 +662,7 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
predicates.ErrPodNotFitsHostPorts, predicates.ErrPodNotFitsHostPorts,
}, },
}, },
cache: &upToDateCache{},
}, },
{ {
podName: "testPod", podName: "testPod",
@@ -667,6 +674,7 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
predicates.ErrPodNotFitsHostPorts, predicates.ErrPodNotFitsHostPorts,
}, },
}, },
cache: &upToDateCache{},
}, },
{ {
podName: "testPod", podName: "testPod",
@@ -675,22 +683,24 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
cachedItem: predicateItemType{ cachedItem: predicateItemType{
fit: true, fit: true,
}, },
cache: &upToDateCache{},
}, },
} }
ecache := NewEquivalenceCache() ecache := NewEquivalenceCache()
for _, test := range tests { for _, test := range tests {
node := schedulercache.NewNodeInfo()
node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}})
// set cached item to equivalence cache // set cached item to equivalence cache
ecache.mu.Lock()
ecache.updateResult( ecache.updateResult(
test.podName, test.podName,
test.nodeName,
testPredicate, testPredicate,
test.cachedItem.fit, test.cachedItem.fit,
test.cachedItem.reasons, test.cachedItem.reasons,
test.equivalenceHashForUpdatePredicate, test.equivalenceHashForUpdatePredicate,
test.cache,
node,
) )
ecache.mu.Unlock()
} }
// invalidate cached predicate for all nodes // invalidate cached predicate for all nodes
@@ -716,6 +726,7 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
nodeName string nodeName string
equivalenceHashForUpdatePredicate uint64 equivalenceHashForUpdatePredicate uint64
cachedItem predicateItemType cachedItem predicateItemType
cache schedulercache.Cache
}{ }{
{ {
podName: "testPod", podName: "testPod",
@@ -725,6 +736,7 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
fit: false, fit: false,
reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
}, },
cache: &upToDateCache{},
}, },
{ {
podName: "testPod", podName: "testPod",
@@ -734,6 +746,7 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
fit: false, fit: false,
reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
}, },
cache: &upToDateCache{},
}, },
{ {
podName: "testPod", podName: "testPod",
@@ -742,22 +755,24 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
cachedItem: predicateItemType{ cachedItem: predicateItemType{
fit: true, fit: true,
}, },
cache: &upToDateCache{},
}, },
} }
ecache := NewEquivalenceCache() ecache := NewEquivalenceCache()
for _, test := range tests { for _, test := range tests {
node := schedulercache.NewNodeInfo()
node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}})
// set cached item to equivalence cache // set cached item to equivalence cache
ecache.mu.Lock()
ecache.updateResult( ecache.updateResult(
test.podName, test.podName,
test.nodeName,
testPredicate, testPredicate,
test.cachedItem.fit, test.cachedItem.fit,
test.cachedItem.reasons, test.cachedItem.reasons,
test.equivalenceHashForUpdatePredicate, test.equivalenceHashForUpdatePredicate,
test.cache,
node,
) )
ecache.mu.Unlock()
} }
for _, test := range tests { for _, test := range tests {