Score plugin for the scheduling framework.

This commit is contained in:
Abdullah Gharaibeh
2019-07-16 09:19:20 -04:00
parent 8e3a2f2a5b
commit c54c4d1962
8 changed files with 255 additions and 32 deletions

View File

@@ -17,12 +17,15 @@ limitations under the License.
package v1alpha1
import (
"context"
"fmt"
"sync"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
@@ -31,18 +34,19 @@ import (
// framework is the component responsible for initializing and running scheduler
// plugins.
type framework struct {
registry Registry
nodeInfoSnapshot *cache.NodeInfoSnapshot
waitingPods *waitingPodsMap
plugins map[string]Plugin // a map of initialized plugins. Plugin name:plugin instance.
queueSortPlugins []QueueSortPlugin
prefilterPlugins []PrefilterPlugin
reservePlugins []ReservePlugin
prebindPlugins []PrebindPlugin
bindPlugins []BindPlugin
postbindPlugins []PostbindPlugin
unreservePlugins []UnreservePlugin
permitPlugins []PermitPlugin
registry Registry
nodeInfoSnapshot *cache.NodeInfoSnapshot
waitingPods *waitingPodsMap
pluginNameToWeightMap map[string]int
queueSortPlugins []QueueSortPlugin
prefilterPlugins []PrefilterPlugin
scorePlugins []ScorePlugin
reservePlugins []ReservePlugin
prebindPlugins []PrebindPlugin
bindPlugins []BindPlugin
postbindPlugins []PostbindPlugin
unreservePlugins []UnreservePlugin
permitPlugins []PermitPlugin
}
const (
@@ -55,10 +59,10 @@ var _ = Framework(&framework{})
// NewFramework initializes plugins given the configuration and the registry.
func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfig) (Framework, error) {
f := &framework{
registry: r,
nodeInfoSnapshot: cache.NewNodeInfoSnapshot(),
plugins: make(map[string]Plugin),
waitingPods: newWaitingPodsMap(),
registry: r,
nodeInfoSnapshot: cache.NewNodeInfoSnapshot(),
pluginNameToWeightMap: make(map[string]int),
waitingPods: newWaitingPodsMap(),
}
if plugins == nil {
return f, nil
@@ -71,6 +75,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
}
pluginConfig := pluginNameToConfig(args)
pluginsMap := make(map[string]Plugin)
for name, factory := range r {
// initialize only needed plugins
if _, ok := pg[name]; !ok {
@@ -84,12 +89,19 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
if err != nil {
return nil, fmt.Errorf("error initializing plugin %v: %v", name, err)
}
f.plugins[name] = p
pluginsMap[name] = p
// A weight of zero is not permitted, plugins can be disabled explicitly
// when configured.
f.pluginNameToWeightMap[name] = int(pg[name].Weight)
if f.pluginNameToWeightMap[name] == 0 {
f.pluginNameToWeightMap[name] = 1
}
}
if plugins.PreFilter != nil {
for _, pf := range plugins.PreFilter.Enabled {
if pg, ok := f.plugins[pf.Name]; ok {
if pg, ok := pluginsMap[pf.Name]; ok {
p, ok := pg.(PrefilterPlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend prefilter plugin", pf.Name)
@@ -101,9 +113,23 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
}
}
if plugins.Score != nil {
for _, sc := range plugins.Score.Enabled {
if pg, ok := pluginsMap[sc.Name]; ok {
p, ok := pg.(ScorePlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend score plugin", sc.Name)
}
f.scorePlugins = append(f.scorePlugins, p)
} else {
return nil, fmt.Errorf("score plugin %v does not exist", sc.Name)
}
}
}
if plugins.Reserve != nil {
for _, r := range plugins.Reserve.Enabled {
if pg, ok := f.plugins[r.Name]; ok {
if pg, ok := pluginsMap[r.Name]; ok {
p, ok := pg.(ReservePlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend reserve plugin", r.Name)
@@ -117,7 +143,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
if plugins.PreBind != nil {
for _, pb := range plugins.PreBind.Enabled {
if pg, ok := f.plugins[pb.Name]; ok {
if pg, ok := pluginsMap[pb.Name]; ok {
p, ok := pg.(PrebindPlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend prebind plugin", pb.Name)
@@ -131,7 +157,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
if plugins.Bind != nil {
for _, pb := range plugins.Bind.Enabled {
if pg, ok := f.plugins[pb.Name]; ok {
if pg, ok := pluginsMap[pb.Name]; ok {
p, ok := pg.(BindPlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend bind plugin", pb.Name)
@@ -145,7 +171,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
if plugins.PostBind != nil {
for _, pb := range plugins.PostBind.Enabled {
if pg, ok := f.plugins[pb.Name]; ok {
if pg, ok := pluginsMap[pb.Name]; ok {
p, ok := pg.(PostbindPlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend postbind plugin", pb.Name)
@@ -159,7 +185,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
if plugins.Unreserve != nil {
for _, ur := range plugins.Unreserve.Enabled {
if pg, ok := f.plugins[ur.Name]; ok {
if pg, ok := pluginsMap[ur.Name]; ok {
p, ok := pg.(UnreservePlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend unreserve plugin", ur.Name)
@@ -173,7 +199,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
if plugins.Permit != nil {
for _, pr := range plugins.Permit.Enabled {
if pg, ok := f.plugins[pr.Name]; ok {
if pg, ok := pluginsMap[pr.Name]; ok {
p, ok := pg.(PermitPlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend permit plugin", pr.Name)
@@ -187,7 +213,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
if plugins.QueueSort != nil {
for _, qs := range plugins.QueueSort.Enabled {
if pg, ok := f.plugins[qs.Name]; ok {
if pg, ok := pluginsMap[qs.Name]; ok {
p, ok := pg.(QueueSortPlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend queue sort plugin", qs.Name)
@@ -237,6 +263,52 @@ func (f *framework) RunPrefilterPlugins(
return nil
}
// RunScorePlugins runs the set of configured scoring plugins. It returns a map that
// stores for each scoring plugin name the corresponding NodeScoreList(s).
// It also returns *Status, which is set to non-success if any of the plugins returns
// a non-success status.
func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.Node) (PluginToNodeScoreMap, *Status) {
pluginToNodeScoreMap := make(PluginToNodeScoreMap, len(f.scorePlugins))
for _, pl := range f.scorePlugins {
pluginToNodeScoreMap[pl.Name()] = make(NodeScoreList, len(nodes))
}
ctx, cancel := context.WithCancel(context.Background())
var firstErr *Status
var mu = sync.Mutex{}
catchError := func(err *Status) {
mu.Lock()
defer mu.Unlock()
if firstErr == nil {
firstErr = err
}
cancel()
}
workqueue.ParallelizeUntil(ctx, 16, len(nodes), func(index int) {
for _, pl := range f.scorePlugins {
weight, weightExists := f.pluginNameToWeightMap[pl.Name()]
if !weightExists {
errMsg := fmt.Sprintf("weight does not exist for plugin %v", pl.Name())
catchError(NewStatus(Error, errMsg))
return
}
score, status := pl.Score(pc, pod, nodes[index].Name)
if !status.IsSuccess() {
catchError(status)
return
}
pluginToNodeScoreMap[pl.Name()][index] = score * weight
}
})
if firstErr != nil {
msg := fmt.Sprintf("error while running score plugin for pod %v: %v", pod.Name, firstErr.Message())
klog.Error(msg)
return nil, NewStatus(Error, msg)
}
return pluginToNodeScoreMap, nil
}
// RunPrebindPlugins runs the set of configured prebind plugins. It returns a
// failure (bool) if any of the plugins returns an error. It also returns an
// error containing the rejection message or the error occurred in the plugin.
@@ -400,8 +472,8 @@ func pluginNameToConfig(args []config.PluginConfig) map[string]*runtime.Unknown
return pc
}
func pluginsNeeded(plugins *config.Plugins) map[string]struct{} {
pgMap := make(map[string]struct{}, 0)
func pluginsNeeded(plugins *config.Plugins) map[string]config.Plugin {
pgMap := make(map[string]config.Plugin, 0)
if plugins == nil {
return pgMap
@@ -412,7 +484,7 @@ func pluginsNeeded(plugins *config.Plugins) map[string]struct{} {
return
}
for _, pg := range pgs.Enabled {
pgMap[pg.Name] = struct{}{}
pgMap[pg.Name] = pg
}
}
find(plugins.QueueSort)