refactor: rename SchedulerCache to Cache in Scheduler

Signed-off-by: kerthcet <kerthcet@gmail.com>
This commit is contained in:
kerthcet
2022-02-24 09:47:21 +08:00
parent 09623be0b1
commit eafbaad9f7
9 changed files with 60 additions and 62 deletions

View File

@@ -67,7 +67,7 @@ func (sched *Scheduler) addNodeToCache(obj interface{}) {
return return
} }
nodeInfo := sched.SchedulerCache.AddNode(node) nodeInfo := sched.Cache.AddNode(node)
klog.V(3).InfoS("Add event for node", "node", klog.KObj(node)) klog.V(3).InfoS("Add event for node", "node", klog.KObj(node))
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.NodeAdd, preCheckForNode(nodeInfo)) sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.NodeAdd, preCheckForNode(nodeInfo))
} }
@@ -84,7 +84,7 @@ func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
return return
} }
nodeInfo := sched.SchedulerCache.UpdateNode(oldNode, newNode) nodeInfo := sched.Cache.UpdateNode(oldNode, newNode)
// Only requeue unschedulable pods if the node became more schedulable. // Only requeue unschedulable pods if the node became more schedulable.
if event := nodeSchedulingPropertiesChange(newNode, oldNode); event != nil { if event := nodeSchedulingPropertiesChange(newNode, oldNode); event != nil {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(*event, preCheckForNode(nodeInfo)) sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(*event, preCheckForNode(nodeInfo))
@@ -108,7 +108,7 @@ func (sched *Scheduler) deleteNodeFromCache(obj interface{}) {
return return
} }
klog.V(3).InfoS("Delete event for node", "node", klog.KObj(node)) klog.V(3).InfoS("Delete event for node", "node", klog.KObj(node))
if err := sched.SchedulerCache.RemoveNode(node); err != nil { if err := sched.Cache.RemoveNode(node); err != nil {
klog.ErrorS(err, "Scheduler cache RemoveNode failed") klog.ErrorS(err, "Scheduler cache RemoveNode failed")
} }
} }
@@ -129,7 +129,7 @@ func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
return return
} }
isAssumed, err := sched.SchedulerCache.IsAssumedPod(newPod) isAssumed, err := sched.Cache.IsAssumedPod(newPod)
if err != nil { if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", newPod.Namespace, newPod.Name, err)) utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", newPod.Namespace, newPod.Name, err))
} }
@@ -185,7 +185,7 @@ func (sched *Scheduler) addPodToCache(obj interface{}) {
} }
klog.V(3).InfoS("Add event for scheduled pod", "pod", klog.KObj(pod)) klog.V(3).InfoS("Add event for scheduled pod", "pod", klog.KObj(pod))
if err := sched.SchedulerCache.AddPod(pod); err != nil { if err := sched.Cache.AddPod(pod); err != nil {
klog.ErrorS(err, "Scheduler cache AddPod failed", "pod", klog.KObj(pod)) klog.ErrorS(err, "Scheduler cache AddPod failed", "pod", klog.KObj(pod))
} }
@@ -205,7 +205,7 @@ func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
} }
klog.V(4).InfoS("Update event for scheduled pod", "pod", klog.KObj(oldPod)) klog.V(4).InfoS("Update event for scheduled pod", "pod", klog.KObj(oldPod))
if err := sched.SchedulerCache.UpdatePod(oldPod, newPod); err != nil { if err := sched.Cache.UpdatePod(oldPod, newPod); err != nil {
klog.ErrorS(err, "Scheduler cache UpdatePod failed", "pod", klog.KObj(oldPod)) klog.ErrorS(err, "Scheduler cache UpdatePod failed", "pod", klog.KObj(oldPod))
} }
@@ -229,7 +229,7 @@ func (sched *Scheduler) deletePodFromCache(obj interface{}) {
return return
} }
klog.V(3).InfoS("Delete event for scheduled pod", "pod", klog.KObj(pod)) klog.V(3).InfoS("Delete event for scheduled pod", "pod", klog.KObj(pod))
if err := sched.SchedulerCache.RemovePod(pod); err != nil { if err := sched.Cache.RemovePod(pod); err != nil {
klog.ErrorS(err, "Scheduler cache RemovePod failed", "pod", klog.KObj(pod)) klog.ErrorS(err, "Scheduler cache RemovePod failed", "pod", klog.KObj(pod))
} }

View File

@@ -225,21 +225,19 @@ func TestUpdatePodInCache(t *testing.T) {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
schedulerCache := cache.New(ttl, ctx.Done())
schedulerQueue := queue.NewTestQueue(ctx, nil)
sched := &Scheduler{ sched := &Scheduler{
SchedulerCache: schedulerCache, Cache: cache.New(ttl, ctx.Done()),
SchedulingQueue: schedulerQueue, SchedulingQueue: queue.NewTestQueue(ctx, nil),
} }
sched.addPodToCache(tt.oldObj) sched.addPodToCache(tt.oldObj)
sched.updatePodInCache(tt.oldObj, tt.newObj) sched.updatePodInCache(tt.oldObj, tt.newObj)
if tt.oldObj.(*v1.Pod).UID != tt.newObj.(*v1.Pod).UID { if tt.oldObj.(*v1.Pod).UID != tt.newObj.(*v1.Pod).UID {
if pod, err := sched.SchedulerCache.GetPod(tt.oldObj.(*v1.Pod)); err == nil { if pod, err := sched.Cache.GetPod(tt.oldObj.(*v1.Pod)); err == nil {
t.Errorf("Get pod UID %v from SchedulerCache but it should not happen", pod.UID) t.Errorf("Get pod UID %v from cache but it should not happen", pod.UID)
} }
} }
pod, err := sched.SchedulerCache.GetPod(tt.newObj.(*v1.Pod)) pod, err := sched.Cache.GetPod(tt.newObj.(*v1.Pod))
if err != nil { if err != nil {
t.Errorf("Failed to get pod from scheduler: %v", err) t.Errorf("Failed to get pod from scheduler: %v", err)
} }

View File

@@ -186,7 +186,7 @@ func (c *Configurator) create() (*Scheduler, error) {
) )
return &Scheduler{ return &Scheduler{
SchedulerCache: c.schedulerCache, Cache: c.schedulerCache,
Algorithm: algo, Algorithm: algo,
Extenders: extenders, Extenders: extenders,
Profiles: profiles, Profiles: profiles,

View File

@@ -39,7 +39,7 @@ var (
// "ttl" is how long the assumed pod will get expired. // "ttl" is how long the assumed pod will get expired.
// "stop" is the channel that would close the background goroutine. // "stop" is the channel that would close the background goroutine.
func New(ttl time.Duration, stop <-chan struct{}) Cache { func New(ttl time.Duration, stop <-chan struct{}) Cache {
cache := newSchedulerCache(ttl, cleanAssumedPeriod, stop) cache := newCache(ttl, cleanAssumedPeriod, stop)
cache.run() cache.run()
return cache return cache
} }
@@ -97,7 +97,7 @@ func (cache *cacheImpl) createImageStateSummary(state *imageState) *framework.Im
} }
} }
func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *cacheImpl { func newCache(ttl, period time.Duration, stop <-chan struct{}) *cacheImpl {
return &cacheImpl{ return &cacheImpl{
ttl: ttl, ttl: ttl,
period: period, period: period,

View File

@@ -204,7 +204,7 @@ func TestAssumePodScheduled(t *testing.T) {
for i, tt := range tests { for i, tt := range tests {
t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) { t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
cache := newSchedulerCache(time.Second, time.Second, nil) cache := newCache(time.Second, time.Second, nil)
for _, pod := range tt.pods { for _, pod := range tt.pods {
if err := cache.AssumePod(pod); err != nil { if err := cache.AssumePod(pod); err != nil {
t.Fatalf("AssumePod failed: %v", err) t.Fatalf("AssumePod failed: %v", err)
@@ -287,7 +287,7 @@ func TestExpirePod(t *testing.T) {
for i, tt := range tests { for i, tt := range tests {
t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) { t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
cache := newSchedulerCache(ttl, time.Second, nil) cache := newCache(ttl, time.Second, nil)
for _, pod := range tt.pods { for _, pod := range tt.pods {
if err := cache.AssumePod(pod.pod); err != nil { if err := cache.AssumePod(pod.pod); err != nil {
@@ -347,7 +347,7 @@ func TestAddPodWillConfirm(t *testing.T) {
for i, tt := range tests { for i, tt := range tests {
t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) { t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
cache := newSchedulerCache(ttl, time.Second, nil) cache := newCache(ttl, time.Second, nil)
for _, podToAssume := range tt.podsToAssume { for _, podToAssume := range tt.podsToAssume {
if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil { if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
t.Fatalf("assumePod failed: %v", err) t.Fatalf("assumePod failed: %v", err)
@@ -387,7 +387,7 @@ func TestDump(t *testing.T) {
for i, tt := range tests { for i, tt := range tests {
t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) { t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
cache := newSchedulerCache(ttl, time.Second, nil) cache := newCache(ttl, time.Second, nil)
for _, podToAssume := range tt.podsToAssume { for _, podToAssume := range tt.podsToAssume {
if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil { if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
t.Errorf("assumePod failed: %v", err) t.Errorf("assumePod failed: %v", err)
@@ -455,7 +455,7 @@ func TestAddPodWillReplaceAssumed(t *testing.T) {
for i, tt := range tests { for i, tt := range tests {
t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) { t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
cache := newSchedulerCache(ttl, time.Second, nil) cache := newCache(ttl, time.Second, nil)
for _, podToAssume := range tt.podsToAssume { for _, podToAssume := range tt.podsToAssume {
if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil { if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
t.Fatalf("assumePod failed: %v", err) t.Fatalf("assumePod failed: %v", err)
@@ -510,7 +510,7 @@ func TestAddPodAfterExpiration(t *testing.T) {
for i, tt := range tests { for i, tt := range tests {
t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) { t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
now := time.Now() now := time.Now()
cache := newSchedulerCache(ttl, time.Second, nil) cache := newCache(ttl, time.Second, nil)
if err := assumeAndFinishBinding(cache, tt.pod, now); err != nil { if err := assumeAndFinishBinding(cache, tt.pod, now); err != nil {
t.Fatalf("assumePod failed: %v", err) t.Fatalf("assumePod failed: %v", err)
} }
@@ -576,7 +576,7 @@ func TestUpdatePod(t *testing.T) {
for i, tt := range tests { for i, tt := range tests {
t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) { t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
cache := newSchedulerCache(ttl, time.Second, nil) cache := newCache(ttl, time.Second, nil)
for _, podToAdd := range tt.podsToAdd { for _, podToAdd := range tt.podsToAdd {
if err := cache.AddPod(podToAdd); err != nil { if err := cache.AddPod(podToAdd); err != nil {
t.Fatalf("AddPod failed: %v", err) t.Fatalf("AddPod failed: %v", err)
@@ -638,7 +638,7 @@ func TestUpdatePodAndGet(t *testing.T) {
for i, tt := range tests { for i, tt := range tests {
t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) { t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
cache := newSchedulerCache(ttl, time.Second, nil) cache := newCache(ttl, time.Second, nil)
if err := tt.handler(cache, tt.pod); err != nil { if err := tt.handler(cache, tt.pod); err != nil {
t.Fatalf("unexpected err: %v", err) t.Fatalf("unexpected err: %v", err)
@@ -709,7 +709,7 @@ func TestExpireAddUpdatePod(t *testing.T) {
for i, tt := range tests { for i, tt := range tests {
t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) { t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
now := time.Now() now := time.Now()
cache := newSchedulerCache(ttl, time.Second, nil) cache := newCache(ttl, time.Second, nil)
for _, podToAssume := range tt.podsToAssume { for _, podToAssume := range tt.podsToAssume {
if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil { if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
t.Fatalf("assumePod failed: %v", err) t.Fatalf("assumePod failed: %v", err)
@@ -786,7 +786,7 @@ func TestEphemeralStorageResource(t *testing.T) {
} }
for i, tt := range tests { for i, tt := range tests {
t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) { t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
cache := newSchedulerCache(time.Second, time.Second, nil) cache := newCache(time.Second, time.Second, nil)
if err := cache.AddPod(tt.pod); err != nil { if err := cache.AddPod(tt.pod); err != nil {
t.Fatalf("AddPod failed: %v", err) t.Fatalf("AddPod failed: %v", err)
} }
@@ -839,7 +839,7 @@ func TestRemovePod(t *testing.T) {
for name, tt := range tests { for name, tt := range tests {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
nodeName := pod.Spec.NodeName nodeName := pod.Spec.NodeName
cache := newSchedulerCache(time.Second, time.Second, nil) cache := newCache(time.Second, time.Second, nil)
// Add/Assume pod succeeds even before adding the nodes. // Add/Assume pod succeeds even before adding the nodes.
if tt.assume { if tt.assume {
if err := cache.AddPod(pod); err != nil { if err := cache.AddPod(pod); err != nil {
@@ -881,7 +881,7 @@ func TestForgetPod(t *testing.T) {
now := time.Now() now := time.Now()
ttl := 10 * time.Second ttl := 10 * time.Second
cache := newSchedulerCache(ttl, time.Second, nil) cache := newCache(ttl, time.Second, nil)
for _, pod := range pods { for _, pod := range pods {
if err := assumeAndFinishBinding(cache, pod, now); err != nil { if err := assumeAndFinishBinding(cache, pod, now); err != nil {
t.Fatalf("assumePod failed: %v", err) t.Fatalf("assumePod failed: %v", err)
@@ -1063,7 +1063,7 @@ func TestNodeOperators(t *testing.T) {
expected := buildNodeInfo(test.node, test.pods) expected := buildNodeInfo(test.node, test.pods)
node := test.node node := test.node
cache := newSchedulerCache(time.Second, time.Second, nil) cache := newCache(time.Second, time.Second, nil)
cache.AddNode(node) cache.AddNode(node)
for _, pod := range test.pods { for _, pod := range test.pods {
if err := cache.AddPod(pod); err != nil { if err := cache.AddPod(pod); err != nil {
@@ -1448,7 +1448,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
cache = newSchedulerCache(time.Second, time.Second, nil) cache = newCache(time.Second, time.Second, nil)
snapshot = NewEmptySnapshot() snapshot = NewEmptySnapshot()
for _, op := range test.operations { for _, op := range test.operations {
@@ -1663,7 +1663,7 @@ func TestSchedulerCache_updateNodeInfoSnapshotList(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
cache = newSchedulerCache(time.Second, time.Second, nil) cache = newCache(time.Second, time.Second, nil)
snapshot = NewEmptySnapshot() snapshot = NewEmptySnapshot()
test.operations(t) test.operations(t)
@@ -1755,7 +1755,7 @@ func makeBasePod(t testingMode, nodeName, objName, cpu, mem, extended string, po
} }
func setupCacheOf1kNodes30kPods(b *testing.B) Cache { func setupCacheOf1kNodes30kPods(b *testing.B) Cache {
cache := newSchedulerCache(time.Second, time.Second, nil) cache := newCache(time.Second, time.Second, nil)
for i := 0; i < 1000; i++ { for i := 0; i < 1000; i++ {
nodeName := fmt.Sprintf("node-%d", i) nodeName := fmt.Sprintf("node-%d", i)
for j := 0; j < 30; j++ { for j := 0; j < 30; j++ {
@@ -1771,7 +1771,7 @@ func setupCacheOf1kNodes30kPods(b *testing.B) Cache {
} }
func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time) *cacheImpl { func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time) *cacheImpl {
cache := newSchedulerCache(time.Second, time.Second, nil) cache := newCache(time.Second, time.Second, nil)
for i := 0; i < podNum; i++ { for i := 0; i < podNum; i++ {
nodeName := fmt.Sprintf("node-%d", i/10) nodeName := fmt.Sprintf("node-%d", i/10)
objName := fmt.Sprintf("%s-pod-%d", nodeName, i%10) objName := fmt.Sprintf("%s-pod-%d", nodeName, i%10)

View File

@@ -65,9 +65,9 @@ const (
// Scheduler watches for new unscheduled pods. It attempts to find // Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server. // nodes that they fit on and writes bindings back to the api server.
type Scheduler struct { type Scheduler struct {
// It is expected that changes made via SchedulerCache will be observed // It is expected that changes made via Cache will be observed
// by NodeLister and Algorithm. // by NodeLister and Algorithm.
SchedulerCache internalcache.Cache Cache internalcache.Cache
Algorithm ScheduleAlgorithm Algorithm ScheduleAlgorithm
@@ -357,7 +357,7 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
// immediately. // immediately.
assumed.Spec.NodeName = host assumed.Spec.NodeName = host
if err := sched.SchedulerCache.AssumePod(assumed); err != nil { if err := sched.Cache.AssumePod(assumed); err != nil {
klog.ErrorS(err, "Scheduler cache AssumePod failed") klog.ErrorS(err, "Scheduler cache AssumePod failed")
return err return err
} }
@@ -406,7 +406,7 @@ func (sched *Scheduler) extendersBinding(pod *v1.Pod, node string) (bool, error)
} }
func (sched *Scheduler) finishBinding(fwk framework.Framework, assumed *v1.Pod, targetNode string, err error) { func (sched *Scheduler) finishBinding(fwk framework.Framework, assumed *v1.Pod, targetNode string, err error) {
if finErr := sched.SchedulerCache.FinishBinding(assumed); finErr != nil { if finErr := sched.Cache.FinishBinding(assumed); finErr != nil {
klog.ErrorS(finErr, "Scheduler cache FinishBinding failed") klog.ErrorS(finErr, "Scheduler cache FinishBinding failed")
} }
if err != nil { if err != nil {
@@ -514,7 +514,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start)) metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
// trigger un-reserve to clean up state associated with the reserved Pod // trigger un-reserve to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil { if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed") klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
} }
sched.handleSchedulingFailure(fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode) sched.handleSchedulingFailure(fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode)
@@ -534,7 +534,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
} }
// One of the plugins returned status different than success or wait. // One of the plugins returned status different than success or wait.
fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil { if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed") klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
} }
sched.handleSchedulingFailure(fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode) sched.handleSchedulingFailure(fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode)
@@ -567,7 +567,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
} }
// trigger un-reserve plugins to clean up state associated with the reserved Pod // trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil { if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed") klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
} else { } else {
// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event, // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
@@ -590,7 +590,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start)) metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
// trigger un-reserve plugins to clean up state associated with the reserved Pod // trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil { if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed") klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
} else { } else {
// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event, // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
@@ -607,7 +607,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start)) metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
// trigger un-reserve plugins to clean up state associated with the reserved Pod // trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if err := sched.SchedulerCache.ForgetPod(assumedPod); err != nil { if err := sched.Cache.ForgetPod(assumedPod); err != nil {
klog.ErrorS(err, "scheduler cache ForgetPod failed") klog.ErrorS(err, "scheduler cache ForgetPod failed")
} else { } else {
// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event, // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
@@ -665,7 +665,7 @@ func (sched *Scheduler) skipPodSchedule(fwk framework.Framework, pod *v1.Pod) bo
// Case 2: pod that has been assumed could be skipped. // Case 2: pod that has been assumed could be skipped.
// An assumed pod can be added again to the scheduling queue if it got an update event // An assumed pod can be added again to the scheduling queue if it got an update event
// during its previous scheduling cycle but before getting assumed. // during its previous scheduling cycle but before getting assumed.
isAssumed, err := sched.SchedulerCache.IsAssumedPod(pod) isAssumed, err := sched.Cache.IsAssumedPod(pod)
if err != nil { if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err)) utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err))
return false return false

View File

@@ -398,7 +398,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
var gotForgetPod *v1.Pod var gotForgetPod *v1.Pod
var gotAssumedPod *v1.Pod var gotAssumedPod *v1.Pod
var gotBinding *v1.Binding var gotBinding *v1.Binding
sCache := &fakecache.Cache{ cache := &fakecache.Cache{
ForgetFunc: func(pod *v1.Pod) { ForgetFunc: func(pod *v1.Pod) {
gotForgetPod = pod gotForgetPod = pod
}, },
@@ -436,9 +436,9 @@ func TestSchedulerScheduleOne(t *testing.T) {
defer cancel() defer cancel()
s := &Scheduler{ s := &Scheduler{
SchedulerCache: sCache, Cache: cache,
Algorithm: item.algo, Algorithm: item.algo,
client: client, client: client,
Error: func(p *framework.QueuedPodInfo, err error) { Error: func(p *framework.QueuedPodInfo, err error) {
gotPod = p.Pod gotPod = p.Pod
gotError = err gotError = err
@@ -881,7 +881,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
// queuedPodStore: pods queued before processing. // queuedPodStore: pods queued before processing.
// scache: scheduler cache that might contain assumed pods. // scache: scheduler cache that might contain assumed pods.
func setupTestScheduler(ctx context.Context, queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, broadcaster events.EventBroadcaster, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) { func setupTestScheduler(ctx context.Context, queuedPodStore *clientcache.FIFO, cache internalcache.Cache, informerFactory informers.SharedInformerFactory, broadcaster events.EventBroadcaster, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
bindingChan := make(chan *v1.Binding, 1) bindingChan := make(chan *v1.Binding, 1)
client := clientsetfake.NewSimpleClientset() client := clientsetfake.NewSimpleClientset()
client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
@@ -915,15 +915,15 @@ func setupTestScheduler(ctx context.Context, queuedPodStore *clientcache.FIFO, s
) )
algo := NewGenericScheduler( algo := NewGenericScheduler(
scache, cache,
internalcache.NewEmptySnapshot(), internalcache.NewEmptySnapshot(),
schedulerapi.DefaultPercentageOfNodesToScore, schedulerapi.DefaultPercentageOfNodesToScore,
) )
errChan := make(chan error, 1) errChan := make(chan error, 1)
sched := &Scheduler{ sched := &Scheduler{
SchedulerCache: scache, Cache: cache,
Algorithm: algo, Algorithm: algo,
NextPod: func() *framework.QueuedPodInfo { NextPod: func() *framework.QueuedPodInfo {
return &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(clientcache.Pop(queuedPodStore).(*v1.Pod))} return &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(clientcache.Pop(queuedPodStore).(*v1.Pod))}
}, },
@@ -1180,16 +1180,16 @@ func TestSchedulerBinding(t *testing.T) {
} }
stop := make(chan struct{}) stop := make(chan struct{})
defer close(stop) defer close(stop)
scache := internalcache.New(100*time.Millisecond, stop) cache := internalcache.New(100*time.Millisecond, stop)
algo := NewGenericScheduler( algo := NewGenericScheduler(
scache, cache,
nil, nil,
0, 0,
) )
sched := Scheduler{ sched := Scheduler{
Algorithm: algo, Algorithm: algo,
Extenders: test.extenders, Extenders: test.extenders,
SchedulerCache: scache, Cache: cache,
} }
err = sched.bind(context.Background(), fwk, pod, "node", nil) err = sched.bind(context.Background(), fwk, pod, "node", nil)
if err != nil { if err != nil {

View File

@@ -168,7 +168,7 @@ func createNodes(cs clientset.Interface, prefix string, wrapper *st.NodeWrapper,
// createAndWaitForNodesInCache calls createNodes(), and wait for the created // createAndWaitForNodesInCache calls createNodes(), and wait for the created
// nodes to be present in scheduler cache. // nodes to be present in scheduler cache.
func createAndWaitForNodesInCache(testCtx *testutils.TestContext, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) { func createAndWaitForNodesInCache(testCtx *testutils.TestContext, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) {
existingNodes := testCtx.Scheduler.SchedulerCache.NodeCount() existingNodes := testCtx.Scheduler.Cache.NodeCount()
nodes, err := createNodes(testCtx.ClientSet, prefix, wrapper, numNodes) nodes, err := createNodes(testCtx.ClientSet, prefix, wrapper, numNodes)
if err != nil { if err != nil {
return nodes, fmt.Errorf("cannot create nodes: %v", err) return nodes, fmt.Errorf("cannot create nodes: %v", err)
@@ -180,7 +180,7 @@ func createAndWaitForNodesInCache(testCtx *testutils.TestContext, prefix string,
// within 30 seconds; otherwise returns false. // within 30 seconds; otherwise returns false.
func waitForNodesInCache(sched *scheduler.Scheduler, nodeCount int) error { func waitForNodesInCache(sched *scheduler.Scheduler, nodeCount int) error {
err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
return sched.SchedulerCache.NodeCount() >= nodeCount, nil return sched.Cache.NodeCount() >= nodeCount, nil
}) })
if err != nil { if err != nil {
return fmt.Errorf("cannot obtain available nodes in scheduler cache: %v", err) return fmt.Errorf("cannot obtain available nodes in scheduler cache: %v", err)
@@ -432,7 +432,7 @@ func waitForPDBsStable(testCtx *testutils.TestContext, pdbs []*policy.PodDisrupt
// waitCachedPodsStable waits until scheduler cache has the given pods. // waitCachedPodsStable waits until scheduler cache has the given pods.
func waitCachedPodsStable(testCtx *testutils.TestContext, pods []*v1.Pod) error { func waitCachedPodsStable(testCtx *testutils.TestContext, pods []*v1.Pod) error {
return wait.Poll(time.Second, 30*time.Second, func() (bool, error) { return wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
cachedPods, err := testCtx.Scheduler.SchedulerCache.PodCount() cachedPods, err := testCtx.Scheduler.Cache.PodCount()
if err != nil { if err != nil {
return false, err return false, err
} }
@@ -444,7 +444,7 @@ func waitCachedPodsStable(testCtx *testutils.TestContext, pods []*v1.Pod) error
if err1 != nil { if err1 != nil {
return false, err1 return false, err1
} }
cachedPod, err2 := testCtx.Scheduler.SchedulerCache.GetPod(actualPod) cachedPod, err2 := testCtx.Scheduler.Cache.GetPod(actualPod)
if err2 != nil || cachedPod == nil { if err2 != nil || cachedPod == nil {
return false, err2 return false, err2
} }

View File

@@ -370,7 +370,7 @@ func InitTestAPIServer(t *testing.T, nsPrefix string, admission admission.Interf
// WaitForSchedulerCacheCleanup waits for cleanup of scheduler's cache to complete // WaitForSchedulerCacheCleanup waits for cleanup of scheduler's cache to complete
func WaitForSchedulerCacheCleanup(sched *scheduler.Scheduler, t *testing.T) { func WaitForSchedulerCacheCleanup(sched *scheduler.Scheduler, t *testing.T) {
schedulerCacheIsEmpty := func() (bool, error) { schedulerCacheIsEmpty := func() (bool, error) {
dump := sched.SchedulerCache.Dump() dump := sched.Cache.Dump()
return len(dump.Nodes) == 0 && len(dump.AssumedPods) == 0, nil return len(dump.Nodes) == 0 && len(dump.AssumedPods) == 0, nil
} }