Unroll PreemptHandle to Handle
This commit is contained in:
@@ -1173,7 +1173,7 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
|
|||||||
if err := scheduler.cache.UpdateSnapshot(scheduler.nodeInfoSnapshot); err != nil {
|
if err := scheduler.cache.UpdateSnapshot(scheduler.nodeInfoSnapshot); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
fwk.PreemptHandle().AddNominatedPod(framework.NewPodInfo(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}), "1")
|
fwk.AddNominatedPod(framework.NewPodInfo(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}), "1")
|
||||||
|
|
||||||
_, _, err = scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), test.pod)
|
_, _, err = scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), test.pod)
|
||||||
|
|
||||||
|
@@ -552,6 +552,10 @@ type Framework interface {
|
|||||||
// passed to the plugin factories at the time of plugin initialization. Plugins
|
// passed to the plugin factories at the time of plugin initialization. Plugins
|
||||||
// must store and use this handle to call framework functions.
|
// must store and use this handle to call framework functions.
|
||||||
type Handle interface {
|
type Handle interface {
|
||||||
|
// PodNominator abstracts operations to maintain nominated Pods.
|
||||||
|
PodNominator
|
||||||
|
// PluginsRunner abstracts operations to run some plugins.
|
||||||
|
PluginsRunner
|
||||||
// SnapshotSharedLister returns listers from the latest NodeInfo Snapshot. The snapshot
|
// SnapshotSharedLister returns listers from the latest NodeInfo Snapshot. The snapshot
|
||||||
// is taken at the beginning of a scheduling cycle and remains unchanged until
|
// is taken at the beginning of a scheduling cycle and remains unchanged until
|
||||||
// a pod finishes "Permit" point. There is no guarantee that the information
|
// a pod finishes "Permit" point. There is no guarantee that the information
|
||||||
@@ -581,8 +585,8 @@ type Handle interface {
|
|||||||
// RunFilterPluginsWithNominatedPods runs the set of configured filter plugins for nominated pod on the given node.
|
// RunFilterPluginsWithNominatedPods runs the set of configured filter plugins for nominated pod on the given node.
|
||||||
RunFilterPluginsWithNominatedPods(ctx context.Context, state *CycleState, pod *v1.Pod, info *NodeInfo) *Status
|
RunFilterPluginsWithNominatedPods(ctx context.Context, state *CycleState, pod *v1.Pod, info *NodeInfo) *Status
|
||||||
|
|
||||||
// TODO: unroll the wrapped interfaces to Handle.
|
// Extenders returns registered scheduler extenders.
|
||||||
PreemptHandle() PreemptHandle
|
Extenders() []Extender
|
||||||
}
|
}
|
||||||
|
|
||||||
// PostFilterResult wraps needed info for scheduler framework to act upon PostFilter phase.
|
// PostFilterResult wraps needed info for scheduler framework to act upon PostFilter phase.
|
||||||
@@ -590,16 +594,6 @@ type PostFilterResult struct {
|
|||||||
NominatedNodeName string
|
NominatedNodeName string
|
||||||
}
|
}
|
||||||
|
|
||||||
// PreemptHandle incorporates all needed logic to run preemption logic.
|
|
||||||
type PreemptHandle interface {
|
|
||||||
// PodNominator abstracts operations to maintain nominated Pods.
|
|
||||||
PodNominator
|
|
||||||
// PluginsRunner abstracts operations to run some plugins.
|
|
||||||
PluginsRunner
|
|
||||||
// Extenders returns registered scheduler extenders.
|
|
||||||
Extenders() []Extender
|
|
||||||
}
|
|
||||||
|
|
||||||
// PodNominator abstracts operations to maintain nominated Pods.
|
// PodNominator abstracts operations to maintain nominated Pods.
|
||||||
type PodNominator interface {
|
type PodNominator interface {
|
||||||
// AddNominatedPod adds the given pod to the nominated pod map or
|
// AddNominatedPod adds the given pod to the nominated pod map or
|
||||||
|
@@ -116,7 +116,6 @@ func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.Cy
|
|||||||
// before it is retried after many other pending pods.
|
// before it is retried after many other pending pods.
|
||||||
func (pl *DefaultPreemption) preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, *framework.Status) {
|
func (pl *DefaultPreemption) preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, *framework.Status) {
|
||||||
cs := pl.fh.ClientSet()
|
cs := pl.fh.ClientSet()
|
||||||
ph := pl.fh.PreemptHandle()
|
|
||||||
nodeLister := pl.fh.SnapshotSharedLister().NodeInfos()
|
nodeLister := pl.fh.SnapshotSharedLister().NodeInfos()
|
||||||
|
|
||||||
// 0) Fetch the latest version of <pod>.
|
// 0) Fetch the latest version of <pod>.
|
||||||
@@ -156,7 +155,7 @@ func (pl *DefaultPreemption) preempt(ctx context.Context, state *framework.Cycle
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 3) Interact with registered Extenders to filter out some candidates if needed.
|
// 3) Interact with registered Extenders to filter out some candidates if needed.
|
||||||
candidates, status = CallExtenders(ph.Extenders(), pod, nodeLister, candidates)
|
candidates, status = CallExtenders(pl.fh.Extenders(), pod, nodeLister, candidates)
|
||||||
if !status.IsSuccess() {
|
if !status.IsSuccess() {
|
||||||
return "", status
|
return "", status
|
||||||
}
|
}
|
||||||
@@ -606,12 +605,11 @@ func selectVictimsOnNode(
|
|||||||
pdbs []*policy.PodDisruptionBudget,
|
pdbs []*policy.PodDisruptionBudget,
|
||||||
) ([]*v1.Pod, int, *framework.Status) {
|
) ([]*v1.Pod, int, *framework.Status) {
|
||||||
var potentialVictims []*framework.PodInfo
|
var potentialVictims []*framework.PodInfo
|
||||||
ph := fh.PreemptHandle()
|
|
||||||
removePod := func(rpi *framework.PodInfo) error {
|
removePod := func(rpi *framework.PodInfo) error {
|
||||||
if err := nodeInfo.RemovePod(rpi.Pod); err != nil {
|
if err := nodeInfo.RemovePod(rpi.Pod); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
status := ph.RunPreFilterExtensionRemovePod(ctx, state, pod, rpi, nodeInfo)
|
status := fh.RunPreFilterExtensionRemovePod(ctx, state, pod, rpi, nodeInfo)
|
||||||
if !status.IsSuccess() {
|
if !status.IsSuccess() {
|
||||||
return status.AsError()
|
return status.AsError()
|
||||||
}
|
}
|
||||||
@@ -619,7 +617,7 @@ func selectVictimsOnNode(
|
|||||||
}
|
}
|
||||||
addPod := func(api *framework.PodInfo) error {
|
addPod := func(api *framework.PodInfo) error {
|
||||||
nodeInfo.AddPodInfo(api)
|
nodeInfo.AddPodInfo(api)
|
||||||
status := ph.RunPreFilterExtensionAddPod(ctx, state, pod, api, nodeInfo)
|
status := fh.RunPreFilterExtensionAddPod(ctx, state, pod, api, nodeInfo)
|
||||||
if !status.IsSuccess() {
|
if !status.IsSuccess() {
|
||||||
return status.AsError()
|
return status.AsError()
|
||||||
}
|
}
|
||||||
@@ -714,7 +712,7 @@ func PrepareCandidate(c Candidate, fh framework.Handle, cs kubernetes.Interface,
|
|||||||
// this node. So, we should remove their nomination. Removing their
|
// this node. So, we should remove their nomination. Removing their
|
||||||
// nomination updates these pods and moves them to the active queue. It
|
// nomination updates these pods and moves them to the active queue. It
|
||||||
// lets scheduler find another place for them.
|
// lets scheduler find another place for them.
|
||||||
nominatedPods := getLowerPriorityNominatedPods(fh.PreemptHandle(), pod, c.Name())
|
nominatedPods := getLowerPriorityNominatedPods(fh, pod, c.Name())
|
||||||
if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil {
|
if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil {
|
||||||
klog.ErrorS(err, "cannot clear 'NominatedNodeName' field")
|
klog.ErrorS(err, "cannot clear 'NominatedNodeName' field")
|
||||||
// We do not return as this error is not critical.
|
// We do not return as this error is not critical.
|
||||||
|
@@ -88,7 +88,8 @@ type frameworkImpl struct {
|
|||||||
metricsRecorder *metricsRecorder
|
metricsRecorder *metricsRecorder
|
||||||
profileName string
|
profileName string
|
||||||
|
|
||||||
preemptHandle framework.PreemptHandle
|
extenders []framework.Extender
|
||||||
|
framework.PodNominator
|
||||||
|
|
||||||
// Indicates that RunFilterPlugins should accumulate all failed statuses and not return
|
// Indicates that RunFilterPlugins should accumulate all failed statuses and not return
|
||||||
// after the first failure.
|
// after the first failure.
|
||||||
@@ -122,6 +123,11 @@ func (f *frameworkImpl) getExtensionPoints(plugins *config.Plugins) []extensionP
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Extenders returns the registered extenders.
|
||||||
|
func (f *frameworkImpl) Extenders() []framework.Extender {
|
||||||
|
return f.extenders
|
||||||
|
}
|
||||||
|
|
||||||
type frameworkOptions struct {
|
type frameworkOptions struct {
|
||||||
clientSet clientset.Interface
|
clientSet clientset.Interface
|
||||||
eventRecorder events.EventRecorder
|
eventRecorder events.EventRecorder
|
||||||
@@ -218,19 +224,6 @@ func defaultFrameworkOptions() frameworkOptions {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ framework.PreemptHandle = &preemptHandle{}
|
|
||||||
|
|
||||||
type preemptHandle struct {
|
|
||||||
extenders []framework.Extender
|
|
||||||
framework.PodNominator
|
|
||||||
framework.PluginsRunner
|
|
||||||
}
|
|
||||||
|
|
||||||
// Extenders returns the registered extenders.
|
|
||||||
func (ph *preemptHandle) Extenders() []framework.Extender {
|
|
||||||
return ph.extenders
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ framework.Framework = &frameworkImpl{}
|
var _ framework.Framework = &frameworkImpl{}
|
||||||
|
|
||||||
// NewFramework initializes plugins given the configuration and the registry.
|
// NewFramework initializes plugins given the configuration and the registry.
|
||||||
@@ -251,11 +244,8 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
|
|||||||
metricsRecorder: options.metricsRecorder,
|
metricsRecorder: options.metricsRecorder,
|
||||||
profileName: options.profileName,
|
profileName: options.profileName,
|
||||||
runAllFilters: options.runAllFilters,
|
runAllFilters: options.runAllFilters,
|
||||||
}
|
|
||||||
f.preemptHandle = &preemptHandle{
|
|
||||||
extenders: options.extenders,
|
extenders: options.extenders,
|
||||||
PodNominator: options.podNominator,
|
PodNominator: options.podNominator,
|
||||||
PluginsRunner: f,
|
|
||||||
}
|
}
|
||||||
if plugins == nil {
|
if plugins == nil {
|
||||||
return f, nil
|
return f, nil
|
||||||
@@ -605,7 +595,6 @@ func (f *frameworkImpl) runPostFilterPlugin(ctx context.Context, pl framework.Po
|
|||||||
func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, state *framework.CycleState, pod *v1.Pod, info *framework.NodeInfo) *framework.Status {
|
func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, state *framework.CycleState, pod *v1.Pod, info *framework.NodeInfo) *framework.Status {
|
||||||
var status *framework.Status
|
var status *framework.Status
|
||||||
|
|
||||||
ph := f.PreemptHandle()
|
|
||||||
podsAdded := false
|
podsAdded := false
|
||||||
// We run filters twice in some cases. If the node has greater or equal priority
|
// We run filters twice in some cases. If the node has greater or equal priority
|
||||||
// nominated pods, we run them when those pods are added to PreFilter state and nodeInfo.
|
// nominated pods, we run them when those pods are added to PreFilter state and nodeInfo.
|
||||||
@@ -630,7 +619,7 @@ func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, s
|
|||||||
nodeInfoToUse := info
|
nodeInfoToUse := info
|
||||||
if i == 0 {
|
if i == 0 {
|
||||||
var err error
|
var err error
|
||||||
podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, ph, pod, state, info)
|
podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, f, pod, state, info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return framework.AsStatus(err)
|
return framework.AsStatus(err)
|
||||||
}
|
}
|
||||||
@@ -638,7 +627,7 @@ func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, s
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
statusMap := ph.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
|
statusMap := f.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
|
||||||
status = statusMap.Merge()
|
status = statusMap.Merge()
|
||||||
if !status.IsSuccess() && !status.IsUnschedulable() {
|
if !status.IsSuccess() && !status.IsUnschedulable() {
|
||||||
return status
|
return status
|
||||||
@@ -651,12 +640,12 @@ func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, s
|
|||||||
// addNominatedPods adds pods with equal or greater priority which are nominated
|
// addNominatedPods adds pods with equal or greater priority which are nominated
|
||||||
// to run on the node. It returns 1) whether any pod was added, 2) augmented cycleState,
|
// to run on the node. It returns 1) whether any pod was added, 2) augmented cycleState,
|
||||||
// 3) augmented nodeInfo.
|
// 3) augmented nodeInfo.
|
||||||
func addNominatedPods(ctx context.Context, ph framework.PreemptHandle, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) {
|
func addNominatedPods(ctx context.Context, fh framework.Handle, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) {
|
||||||
if ph == nil || nodeInfo.Node() == nil {
|
if fh == nil || nodeInfo.Node() == nil {
|
||||||
// This may happen only in tests.
|
// This may happen only in tests.
|
||||||
return false, state, nodeInfo, nil
|
return false, state, nodeInfo, nil
|
||||||
}
|
}
|
||||||
nominatedPodInfos := ph.NominatedPodsForNode(nodeInfo.Node().Name)
|
nominatedPodInfos := fh.NominatedPodsForNode(nodeInfo.Node().Name)
|
||||||
if len(nominatedPodInfos) == 0 {
|
if len(nominatedPodInfos) == 0 {
|
||||||
return false, state, nodeInfo, nil
|
return false, state, nodeInfo, nil
|
||||||
}
|
}
|
||||||
@@ -666,7 +655,7 @@ func addNominatedPods(ctx context.Context, ph framework.PreemptHandle, pod *v1.P
|
|||||||
for _, pi := range nominatedPodInfos {
|
for _, pi := range nominatedPodInfos {
|
||||||
if corev1.PodPriority(pi.Pod) >= corev1.PodPriority(pod) && pi.Pod.UID != pod.UID {
|
if corev1.PodPriority(pi.Pod) >= corev1.PodPriority(pod) && pi.Pod.UID != pod.UID {
|
||||||
nodeInfoOut.AddPodInfo(pi)
|
nodeInfoOut.AddPodInfo(pi)
|
||||||
status := ph.RunPreFilterExtensionAddPod(ctx, stateOut, pod, pi, nodeInfoOut)
|
status := fh.RunPreFilterExtensionAddPod(ctx, stateOut, pod, pi, nodeInfoOut)
|
||||||
if !status.IsSuccess() {
|
if !status.IsSuccess() {
|
||||||
return false, state, nodeInfo, status.AsError()
|
return false, state, nodeInfo, status.AsError()
|
||||||
}
|
}
|
||||||
@@ -1133,11 +1122,6 @@ func (f *frameworkImpl) pluginsNeeded(plugins *config.Plugins) map[string]config
|
|||||||
return pgMap
|
return pgMap
|
||||||
}
|
}
|
||||||
|
|
||||||
// PreemptHandle returns the internal preemptHandle object.
|
|
||||||
func (f *frameworkImpl) PreemptHandle() framework.PreemptHandle {
|
|
||||||
return f.preemptHandle
|
|
||||||
}
|
|
||||||
|
|
||||||
// ProfileName returns the profile name associated to this framework.
|
// ProfileName returns the profile name associated to this framework.
|
||||||
func (f *frameworkImpl) ProfileName() string {
|
func (f *frameworkImpl) ProfileName() string {
|
||||||
return f.profileName
|
return f.profileName
|
||||||
|
@@ -409,15 +409,14 @@ func (pp *PostFilterPlugin) PostFilter(ctx context.Context, state *framework.Cyc
|
|||||||
return nil, framework.NewStatus(framework.Error, err.Error())
|
return nil, framework.NewStatus(framework.Error, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
ph := pp.fh.PreemptHandle()
|
|
||||||
for _, nodeInfo := range nodeInfos {
|
for _, nodeInfo := range nodeInfos {
|
||||||
ph.RunFilterPlugins(ctx, state, pod, nodeInfo)
|
pp.fh.RunFilterPlugins(ctx, state, pod, nodeInfo)
|
||||||
}
|
}
|
||||||
var nodes []*v1.Node
|
var nodes []*v1.Node
|
||||||
for _, nodeInfo := range nodeInfos {
|
for _, nodeInfo := range nodeInfos {
|
||||||
nodes = append(nodes, nodeInfo.Node())
|
nodes = append(nodes, nodeInfo.Node())
|
||||||
}
|
}
|
||||||
ph.RunScorePlugins(ctx, state, pod, nodes)
|
pp.fh.RunScorePlugins(ctx, state, pod, nodes)
|
||||||
|
|
||||||
if pp.failPostFilter {
|
if pp.failPostFilter {
|
||||||
return nil, framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
|
return nil, framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
|
||||||
|
Reference in New Issue
Block a user