|
|
|
@@ -17,14 +17,17 @@ limitations under the License.
|
|
|
|
|
package scheduler
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"fmt"
|
|
|
|
|
"reflect"
|
|
|
|
|
"strings"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
v1 "k8s.io/api/core/v1"
|
|
|
|
|
storagev1 "k8s.io/api/storage/v1"
|
|
|
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
|
|
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
|
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
|
|
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
|
|
|
|
"k8s.io/client-go/dynamic/dynamicinformer"
|
|
|
|
|
"k8s.io/client-go/informers"
|
|
|
|
@@ -256,6 +259,24 @@ func responsibleForPod(pod *v1.Pod, profiles profile.Map) bool {
|
|
|
|
|
return profiles.HandlesSchedulerName(pod.Spec.SchedulerName)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
// syncedPollPeriod controls how often you look at the status of your sync funcs
|
|
|
|
|
syncedPollPeriod = 100 * time.Millisecond
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// WaitForHandlersSync waits for EventHandlers to sync.
|
|
|
|
|
// It returns true if it was successful, false if the controller should shut down
|
|
|
|
|
func (sched *Scheduler) WaitForHandlersSync(ctx context.Context) error {
|
|
|
|
|
return wait.PollUntilContextCancel(ctx, syncedPollPeriod, true, func(ctx context.Context) (done bool, err error) {
|
|
|
|
|
for _, handler := range sched.registeredHandlers {
|
|
|
|
|
if !handler.HasSynced() {
|
|
|
|
|
return false, nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return true, nil
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// addAllEventHandlers is a helper function used in tests and in Scheduler
|
|
|
|
|
// to add event handlers for various informers.
|
|
|
|
|
func addAllEventHandlers(
|
|
|
|
@@ -263,9 +284,14 @@ func addAllEventHandlers(
|
|
|
|
|
informerFactory informers.SharedInformerFactory,
|
|
|
|
|
dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
|
|
|
|
|
gvkMap map[framework.GVK]framework.ActionType,
|
|
|
|
|
) {
|
|
|
|
|
) error {
|
|
|
|
|
var (
|
|
|
|
|
handlerRegistration cache.ResourceEventHandlerRegistration
|
|
|
|
|
err error
|
|
|
|
|
handlers []cache.ResourceEventHandlerRegistration
|
|
|
|
|
)
|
|
|
|
|
// scheduled pod cache
|
|
|
|
|
informerFactory.Core().V1().Pods().Informer().AddEventHandler(
|
|
|
|
|
if handlerRegistration, err = informerFactory.Core().V1().Pods().Informer().AddEventHandler(
|
|
|
|
|
cache.FilteringResourceEventHandler{
|
|
|
|
|
FilterFunc: func(obj interface{}) bool {
|
|
|
|
|
switch t := obj.(type) {
|
|
|
|
@@ -290,9 +316,13 @@ func addAllEventHandlers(
|
|
|
|
|
DeleteFunc: sched.deletePodFromCache,
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
)
|
|
|
|
|
); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
handlers = append(handlers, handlerRegistration)
|
|
|
|
|
|
|
|
|
|
// unscheduled pod queue
|
|
|
|
|
informerFactory.Core().V1().Pods().Informer().AddEventHandler(
|
|
|
|
|
if handlerRegistration, err = informerFactory.Core().V1().Pods().Informer().AddEventHandler(
|
|
|
|
|
cache.FilteringResourceEventHandler{
|
|
|
|
|
FilterFunc: func(obj interface{}) bool {
|
|
|
|
|
switch t := obj.(type) {
|
|
|
|
@@ -317,15 +347,21 @@ func addAllEventHandlers(
|
|
|
|
|
DeleteFunc: sched.deletePodFromSchedulingQueue,
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
)
|
|
|
|
|
); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
handlers = append(handlers, handlerRegistration)
|
|
|
|
|
|
|
|
|
|
informerFactory.Core().V1().Nodes().Informer().AddEventHandler(
|
|
|
|
|
if handlerRegistration, err = informerFactory.Core().V1().Nodes().Informer().AddEventHandler(
|
|
|
|
|
cache.ResourceEventHandlerFuncs{
|
|
|
|
|
AddFunc: sched.addNodeToCache,
|
|
|
|
|
UpdateFunc: sched.updateNodeInCache,
|
|
|
|
|
DeleteFunc: sched.deleteNodeFromCache,
|
|
|
|
|
},
|
|
|
|
|
)
|
|
|
|
|
); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
handlers = append(handlers, handlerRegistration)
|
|
|
|
|
|
|
|
|
|
logger := sched.logger
|
|
|
|
|
buildEvtResHandler := func(at framework.ActionType, gvk framework.GVK, shortGVK string) cache.ResourceEventHandlerFuncs {
|
|
|
|
@@ -356,17 +392,26 @@ func addAllEventHandlers(
|
|
|
|
|
case framework.Node, framework.Pod:
|
|
|
|
|
// Do nothing.
|
|
|
|
|
case framework.CSINode:
|
|
|
|
|
informerFactory.Storage().V1().CSINodes().Informer().AddEventHandler(
|
|
|
|
|
if handlerRegistration, err = informerFactory.Storage().V1().CSINodes().Informer().AddEventHandler(
|
|
|
|
|
buildEvtResHandler(at, framework.CSINode, "CSINode"),
|
|
|
|
|
)
|
|
|
|
|
); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
handlers = append(handlers, handlerRegistration)
|
|
|
|
|
case framework.CSIDriver:
|
|
|
|
|
informerFactory.Storage().V1().CSIDrivers().Informer().AddEventHandler(
|
|
|
|
|
if handlerRegistration, err = informerFactory.Storage().V1().CSIDrivers().Informer().AddEventHandler(
|
|
|
|
|
buildEvtResHandler(at, framework.CSIDriver, "CSIDriver"),
|
|
|
|
|
)
|
|
|
|
|
); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
handlers = append(handlers, handlerRegistration)
|
|
|
|
|
case framework.CSIStorageCapacity:
|
|
|
|
|
informerFactory.Storage().V1().CSIStorageCapacities().Informer().AddEventHandler(
|
|
|
|
|
if handlerRegistration, err = informerFactory.Storage().V1().CSIStorageCapacities().Informer().AddEventHandler(
|
|
|
|
|
buildEvtResHandler(at, framework.CSIStorageCapacity, "CSIStorageCapacity"),
|
|
|
|
|
)
|
|
|
|
|
); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
handlers = append(handlers, handlerRegistration)
|
|
|
|
|
case framework.PersistentVolume:
|
|
|
|
|
// MaxPDVolumeCountPredicate: since it relies on the counts of PV.
|
|
|
|
|
//
|
|
|
|
@@ -381,42 +426,60 @@ func addAllEventHandlers(
|
|
|
|
|
// bindings due to conflicts if PVs are updated by PV controller or other
|
|
|
|
|
// parties, then scheduler will add pod back to unschedulable queue. We
|
|
|
|
|
// need to move pods to active queue on PV update for this scenario.
|
|
|
|
|
informerFactory.Core().V1().PersistentVolumes().Informer().AddEventHandler(
|
|
|
|
|
if handlerRegistration, err = informerFactory.Core().V1().PersistentVolumes().Informer().AddEventHandler(
|
|
|
|
|
buildEvtResHandler(at, framework.PersistentVolume, "Pv"),
|
|
|
|
|
)
|
|
|
|
|
); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
handlers = append(handlers, handlerRegistration)
|
|
|
|
|
case framework.PersistentVolumeClaim:
|
|
|
|
|
// MaxPDVolumeCountPredicate: add/update PVC will affect counts of PV when it is bound.
|
|
|
|
|
informerFactory.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler(
|
|
|
|
|
if handlerRegistration, err = informerFactory.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler(
|
|
|
|
|
buildEvtResHandler(at, framework.PersistentVolumeClaim, "Pvc"),
|
|
|
|
|
)
|
|
|
|
|
); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
handlers = append(handlers, handlerRegistration)
|
|
|
|
|
case framework.PodSchedulingContext:
|
|
|
|
|
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
|
|
|
|
|
_, _ = informerFactory.Resource().V1alpha2().PodSchedulingContexts().Informer().AddEventHandler(
|
|
|
|
|
if handlerRegistration, err = informerFactory.Resource().V1alpha2().PodSchedulingContexts().Informer().AddEventHandler(
|
|
|
|
|
buildEvtResHandler(at, framework.PodSchedulingContext, "PodSchedulingContext"),
|
|
|
|
|
)
|
|
|
|
|
); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
handlers = append(handlers, handlerRegistration)
|
|
|
|
|
case framework.ResourceClaim:
|
|
|
|
|
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
|
|
|
|
|
_, _ = informerFactory.Resource().V1alpha2().ResourceClaims().Informer().AddEventHandler(
|
|
|
|
|
if handlerRegistration, err = informerFactory.Resource().V1alpha2().ResourceClaims().Informer().AddEventHandler(
|
|
|
|
|
buildEvtResHandler(at, framework.ResourceClaim, "ResourceClaim"),
|
|
|
|
|
)
|
|
|
|
|
); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
handlers = append(handlers, handlerRegistration)
|
|
|
|
|
case framework.StorageClass:
|
|
|
|
|
if at&framework.Add != 0 {
|
|
|
|
|
informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
|
|
|
|
|
if handlerRegistration, err = informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
|
|
|
|
|
cache.ResourceEventHandlerFuncs{
|
|
|
|
|
AddFunc: sched.onStorageClassAdd,
|
|
|
|
|
},
|
|
|
|
|
)
|
|
|
|
|
); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
handlers = append(handlers, handlerRegistration)
|
|
|
|
|
}
|
|
|
|
|
if at&framework.Update != 0 {
|
|
|
|
|
informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
|
|
|
|
|
if handlerRegistration, err = informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
|
|
|
|
|
cache.ResourceEventHandlerFuncs{
|
|
|
|
|
UpdateFunc: func(old, obj interface{}) {
|
|
|
|
|
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.StorageClassUpdate, old, obj, nil)
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
)
|
|
|
|
|
); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
handlers = append(handlers, handlerRegistration)
|
|
|
|
|
}
|
|
|
|
|
default:
|
|
|
|
|
// Tests may not instantiate dynInformerFactory.
|
|
|
|
@@ -438,11 +501,16 @@ func addAllEventHandlers(
|
|
|
|
|
// Fall back to try dynamic informers.
|
|
|
|
|
gvr, _ := schema.ParseResourceArg(string(gvk))
|
|
|
|
|
dynInformer := dynInformerFactory.ForResource(*gvr).Informer()
|
|
|
|
|
dynInformer.AddEventHandler(
|
|
|
|
|
if handlerRegistration, err = dynInformer.AddEventHandler(
|
|
|
|
|
buildEvtResHandler(at, gvk, strings.Title(gvr.Resource)),
|
|
|
|
|
)
|
|
|
|
|
); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
handlers = append(handlers, handlerRegistration)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
sched.registeredHandlers = handlers
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func nodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) *framework.ClusterEvent {
|
|
|
|
|