Avoid race condition when updating equivalence cache.
This commit is contained in:
@@ -75,9 +75,12 @@ func (ec *EquivalenceCache) UpdateCachedPredicateItem(
|
||||
fit bool,
|
||||
reasons []algorithm.PredicateFailureReason,
|
||||
equivalenceHash uint64,
|
||||
needLock bool,
|
||||
) {
|
||||
ec.Lock()
|
||||
defer ec.Unlock()
|
||||
if needLock {
|
||||
ec.Lock()
|
||||
defer ec.Unlock()
|
||||
}
|
||||
if _, exist := ec.algorithmCache[nodeName]; !exist {
|
||||
ec.algorithmCache[nodeName] = newAlgorithmCache()
|
||||
}
|
||||
@@ -106,10 +109,12 @@ func (ec *EquivalenceCache) UpdateCachedPredicateItem(
|
||||
// based on cached predicate results
|
||||
func (ec *EquivalenceCache) PredicateWithECache(
|
||||
podName, nodeName, predicateKey string,
|
||||
equivalenceHash uint64,
|
||||
equivalenceHash uint64, needLock bool,
|
||||
) (bool, []algorithm.PredicateFailureReason, bool) {
|
||||
ec.RLock()
|
||||
defer ec.RUnlock()
|
||||
if needLock {
|
||||
ec.RLock()
|
||||
defer ec.RUnlock()
|
||||
}
|
||||
glog.V(5).Infof("Begin to calculate predicate: %v for pod: %s on node: %s based on equivalence cache",
|
||||
predicateKey, podName, nodeName)
|
||||
if algorithmCache, exist := ec.algorithmCache[nodeName]; exist {
|
||||
|
@@ -90,6 +90,7 @@ func TestUpdateCachedPredicateItem(t *testing.T) {
|
||||
test.fit,
|
||||
test.reasons,
|
||||
test.equivalenceHash,
|
||||
true,
|
||||
)
|
||||
|
||||
value, ok := ecache.algorithmCache[test.nodeName].predicatesCache.Get(test.predicateKey)
|
||||
@@ -201,6 +202,7 @@ func TestPredicateWithECache(t *testing.T) {
|
||||
test.cachedItem.fit,
|
||||
test.cachedItem.reasons,
|
||||
test.equivalenceHashForUpdatePredicate,
|
||||
true,
|
||||
)
|
||||
// if we want to do invalid, invalid the cached item
|
||||
if test.expectedInvalidPredicateKey {
|
||||
@@ -213,6 +215,7 @@ func TestPredicateWithECache(t *testing.T) {
|
||||
test.nodeName,
|
||||
test.predicateKey,
|
||||
test.equivalenceHashForCalPredicate,
|
||||
true,
|
||||
)
|
||||
// returned invalid should match expectedInvalidPredicateKey or expectedInvalidEquivalenceHash
|
||||
if test.equivalenceHashForUpdatePredicate != test.equivalenceHashForCalPredicate {
|
||||
@@ -564,6 +567,7 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
|
||||
test.cachedItem.fit,
|
||||
test.cachedItem.reasons,
|
||||
test.equivalenceHashForUpdatePredicate,
|
||||
true,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -632,6 +636,7 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
|
||||
test.cachedItem.fit,
|
||||
test.cachedItem.reasons,
|
||||
test.equivalenceHashForUpdatePredicate,
|
||||
true,
|
||||
)
|
||||
}
|
||||
|
||||
|
@@ -108,7 +108,7 @@ type genericScheduler struct {
|
||||
|
||||
// Schedule tries to schedule the given pod to one of node in the node list.
|
||||
// If it succeeds, it will return the name of the node.
|
||||
// If it fails, it will return a Fiterror error with reasons.
|
||||
// If it fails, it will return a FitError error with reasons.
|
||||
func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
|
||||
trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name))
|
||||
defer trace.LogIfLong(100 * time.Millisecond)
|
||||
@@ -469,8 +469,11 @@ func podFitsOnNode(
|
||||
//TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
|
||||
if predicate, exist := predicateFuncs[predicateKey]; exist {
|
||||
if eCacheAvailable {
|
||||
// Lock ecache here to avoid a race condition against cache invalidation invoked
|
||||
// in event handlers. This race has existed despite locks in eCache implementation.
|
||||
ecache.Lock()
|
||||
// PredicateWithECache will return its cached predicate results.
|
||||
fit, reasons, invalid = ecache.PredicateWithECache(pod.GetName(), info.Node().GetName(), predicateKey, equivCacheInfo.hash)
|
||||
fit, reasons, invalid = ecache.PredicateWithECache(pod.GetName(), info.Node().GetName(), predicateKey, equivCacheInfo.hash, false)
|
||||
}
|
||||
|
||||
if !eCacheAvailable || invalid {
|
||||
@@ -488,8 +491,15 @@ func podFitsOnNode(
|
||||
} else {
|
||||
predicateResults[predicateKey] = HostPredicate{Fit: fit, FailReasons: reasons}
|
||||
}
|
||||
result := predicateResults[predicateKey]
|
||||
ecache.UpdateCachedPredicateItem(pod.GetName(), info.Node().GetName(), predicateKey, result.Fit, result.FailReasons, equivCacheInfo.hash, false)
|
||||
}
|
||||
}
|
||||
|
||||
if eCacheAvailable {
|
||||
ecache.Unlock()
|
||||
}
|
||||
|
||||
if !fit {
|
||||
// eCache is available and valid, and predicates result is unfit, record the fail reasons
|
||||
failedPredicates = append(failedPredicates, reasons...)
|
||||
@@ -503,18 +513,6 @@ func podFitsOnNode(
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(bsalamat): This way of updating equiv. cache has a race condition against
|
||||
// cache invalidations invoked in event handlers. This race has existed despite locks
|
||||
// in eCache implementation. If cache is invalidated after a predicate is executed
|
||||
// and before we update the cache, the updates should not be written to the cache.
|
||||
if eCacheAvailable {
|
||||
nodeName := info.Node().GetName()
|
||||
for predKey, result := range predicateResults {
|
||||
// update equivalence cache with newly computed fit & reasons
|
||||
// TODO(resouer) should we do this in another thread? any race?
|
||||
ecache.UpdateCachedPredicateItem(pod.GetName(), nodeName, predKey, result.Fit, result.FailReasons, equivCacheInfo.hash)
|
||||
}
|
||||
}
|
||||
return len(failedPredicates) == 0, failedPredicates, nil
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user