Update factory.go informers to update equivalence cache
Fix tombstone Add e2e to verify equivalence cache Addressing nits in factory,go and e2e Update build files
This commit is contained in:
parent
4103f40fc2
commit
0e8517875e
@ -120,6 +120,12 @@ const (
|
||||
//
|
||||
// Add priority to pods. Priority affects scheduling and preemption of pods.
|
||||
PodPriority utilfeature.Feature = "PodPriority"
|
||||
|
||||
// owner: @resouer
|
||||
// alpha: v1.8
|
||||
//
|
||||
// Enable equivalence class cache for scheduler.
|
||||
EnableEquivalenceClassCache utilfeature.Feature = "EnableEquivalenceClassCache"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -144,6 +150,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
|
||||
LocalStorageCapacityIsolation: {Default: false, PreRelease: utilfeature.Alpha},
|
||||
DebugContainers: {Default: false, PreRelease: utilfeature.Alpha},
|
||||
PodPriority: {Default: false, PreRelease: utilfeature.Alpha},
|
||||
EnableEquivalenceClassCache: {Default: false, PreRelease: utilfeature.Alpha},
|
||||
|
||||
// inherited features from generic apiserver, relisted here to get a conflict if it is changed
|
||||
// unintentionally on either side:
|
||||
|
@ -28,6 +28,7 @@ import (
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
|
||||
@ -39,6 +40,7 @@ import (
|
||||
|
||||
clientv1 "k8s.io/api/core/v1"
|
||||
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler"
|
||||
_ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"
|
||||
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
|
||||
@ -104,6 +106,7 @@ func CreateScheduler(
|
||||
statefulSetInformer,
|
||||
serviceInformer,
|
||||
s.HardPodAffinitySymmetricWeight,
|
||||
utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
|
||||
)
|
||||
|
||||
// Rebuild the configurator with a default Create(...) method.
|
||||
|
@ -37,6 +37,8 @@ import (
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
|
||||
)
|
||||
|
||||
const enableEquivalenceCache = true
|
||||
|
||||
func TestCompatibility_v1_Scheduler(t *testing.T) {
|
||||
// Add serialized versions of scheduler config that exercise available options to ensure compatibility between releases
|
||||
schedulerFiles := map[string]struct {
|
||||
@ -432,6 +434,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
|
||||
informerFactory.Apps().V1beta1().StatefulSets(),
|
||||
informerFactory.Core().V1().Services(),
|
||||
v1.DefaultHardPodAffinitySymmetricWeight,
|
||||
enableEquivalenceCache,
|
||||
).CreateFromConfig(policy); err != nil {
|
||||
t.Errorf("%s: Error constructing: %v", v, err)
|
||||
continue
|
||||
|
@ -158,7 +158,8 @@ func (ec *EquivalenceCache) InvalidateAllCachedPredicateItemOfNode(nodeName stri
|
||||
glog.V(5).Infof("Done invalidating all cached predicates on node: %s", nodeName)
|
||||
}
|
||||
|
||||
// InvalidateCachedPredicateItemForPod marks item of given predicateKeys, of given pod, on the given node as invalid
|
||||
// InvalidateCachedPredicateItemForPod marks item of given predicateKeys, of given pod (i.e. equivalenceHash),
|
||||
// on the given node as invalid
|
||||
func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPod(nodeName string, predicateKeys sets.String, pod *v1.Pod) {
|
||||
if len(predicateKeys) == 0 {
|
||||
return
|
||||
|
@ -20,6 +20,7 @@ package factory
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
@ -40,7 +41,9 @@ import (
|
||||
corelisters "k8s.io/client-go/listers/core/v1"
|
||||
extensionslisters "k8s.io/client-go/listers/extensions/v1beta1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/kubernetes/pkg/api/helper"
|
||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
|
||||
@ -56,6 +59,14 @@ const (
|
||||
maximalGetBackoff = time.Minute
|
||||
)
|
||||
|
||||
var (
|
||||
serviceAffinitySet = sets.NewString("ServiceAffinity")
|
||||
maxPDVolumeCountPredicateSet = sets.NewString("MaxPDVolumeCountPredicate")
|
||||
matchInterPodAffinitySet = sets.NewString("MatchInterPodAffinity")
|
||||
generalPredicatesSets = sets.NewString("GeneralPredicates")
|
||||
noDiskConflictSet = sets.NewString("NoDiskConflict")
|
||||
)
|
||||
|
||||
// ConfigFactory is the default implementation of the scheduler.Configurator interface.
|
||||
// TODO make this private if possible, so that only its interface is externally used.
|
||||
type ConfigFactory struct {
|
||||
@ -99,6 +110,9 @@ type ConfigFactory struct {
|
||||
|
||||
// Equivalence class cache
|
||||
equivalencePodCache *core.EquivalenceCache
|
||||
|
||||
// Enable equivalence class cache
|
||||
enableEquivalenceClassCache bool
|
||||
}
|
||||
|
||||
// NewConfigFactory initializes the default implementation of a Configurator To encourage eventual privatization of the struct type, we only
|
||||
@ -115,6 +129,7 @@ func NewConfigFactory(
|
||||
statefulSetInformer appsinformers.StatefulSetInformer,
|
||||
serviceInformer coreinformers.ServiceInformer,
|
||||
hardPodAffinitySymmetricWeight int,
|
||||
enableEquivalenceClassCache bool,
|
||||
) scheduler.Configurator {
|
||||
stopEverything := make(chan struct{})
|
||||
schedulerCache := schedulercache.New(30*time.Second, stopEverything)
|
||||
@ -133,6 +148,7 @@ func NewConfigFactory(
|
||||
StopEverything: stopEverything,
|
||||
schedulerName: schedulerName,
|
||||
hardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight,
|
||||
enableEquivalenceClassCache: enableEquivalenceClassCache,
|
||||
}
|
||||
|
||||
c.scheduledPodsHasSynced = podInformer.Informer().HasSynced
|
||||
@ -201,11 +217,154 @@ func NewConfigFactory(
|
||||
)
|
||||
c.nodeLister = nodeInformer.Lister()
|
||||
|
||||
// TODO(harryz) need to fill all the handlers here and below for equivalence cache
|
||||
// On add and delete of PVs, it will affect equivalence cache items
|
||||
// related to persistent volume
|
||||
pvInformer.Informer().AddEventHandlerWithResyncPeriod(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
// MaxPDVolumeCountPredicate: since it relies on the counts of PV.
|
||||
AddFunc: c.onPvAdd,
|
||||
DeleteFunc: c.onPvDelete,
|
||||
},
|
||||
0,
|
||||
)
|
||||
c.pVLister = pvInformer.Lister()
|
||||
|
||||
// This is for MaxPDVolumeCountPredicate: add/delete PVC will affect counts of PV when it is bound.
|
||||
pvcInformer.Informer().AddEventHandlerWithResyncPeriod(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.onPvcAdd,
|
||||
DeleteFunc: c.onPvcDelete,
|
||||
},
|
||||
0,
|
||||
)
|
||||
c.pVCLister = pvcInformer.Lister()
|
||||
|
||||
// This is for ServiceAffinity: affected by the selector of the service is updated.
|
||||
// Also, if new service is added, equivalence cache will also become invalid since
|
||||
// existing pods may be "captured" by this service and change this predicate result.
|
||||
serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.onServiceAdd,
|
||||
UpdateFunc: c.onServiceUpdate,
|
||||
DeleteFunc: c.onServiceDelete,
|
||||
},
|
||||
0,
|
||||
)
|
||||
c.serviceLister = serviceInformer.Lister()
|
||||
|
||||
// Existing equivalence cache should not be affected by add/delete RC/Deployment etc,
|
||||
// it only make sense when pod is scheduled or deleted
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) onPvAdd(obj interface{}) {
|
||||
if c.enableEquivalenceClassCache {
|
||||
pv, ok := obj.(*v1.PersistentVolume)
|
||||
if !ok {
|
||||
glog.Errorf("cannot convert to *v1.PersistentVolume: %v", obj)
|
||||
return
|
||||
}
|
||||
c.invalidatePredicatesForPv(pv)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) onPvDelete(obj interface{}) {
|
||||
if c.enableEquivalenceClassCache {
|
||||
var pv *v1.PersistentVolume
|
||||
switch t := obj.(type) {
|
||||
case *v1.PersistentVolume:
|
||||
pv = t
|
||||
case cache.DeletedFinalStateUnknown:
|
||||
var ok bool
|
||||
pv, ok = t.Obj.(*v1.PersistentVolume)
|
||||
if !ok {
|
||||
glog.Errorf("cannot convert to *v1.PersistentVolume: %v", t.Obj)
|
||||
return
|
||||
}
|
||||
default:
|
||||
glog.Errorf("cannot convert to *v1.PersistentVolume: %v", t)
|
||||
return
|
||||
}
|
||||
c.invalidatePredicatesForPv(pv)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) {
|
||||
invalidPredicates := sets.NewString("MaxPDVolumeCountPredicate")
|
||||
if pv.Spec.AWSElasticBlockStore != nil {
|
||||
invalidPredicates.Insert("MaxEBSVolumeCount")
|
||||
}
|
||||
if pv.Spec.GCEPersistentDisk != nil {
|
||||
invalidPredicates.Insert("MaxGCEPDVolumeCount")
|
||||
}
|
||||
if pv.Spec.AzureDisk != nil {
|
||||
invalidPredicates.Insert("MaxAzureDiskVolumeCount")
|
||||
}
|
||||
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) onPvcAdd(obj interface{}) {
|
||||
if c.enableEquivalenceClassCache {
|
||||
pvc, ok := obj.(*v1.PersistentVolumeClaim)
|
||||
if !ok {
|
||||
glog.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", obj)
|
||||
return
|
||||
}
|
||||
c.invalidatePredicatesForPvc(pvc)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) onPvcDelete(obj interface{}) {
|
||||
if c.enableEquivalenceClassCache {
|
||||
var pvc *v1.PersistentVolumeClaim
|
||||
switch t := obj.(type) {
|
||||
case *v1.PersistentVolumeClaim:
|
||||
pvc = t
|
||||
case cache.DeletedFinalStateUnknown:
|
||||
var ok bool
|
||||
pvc, ok = t.Obj.(*v1.PersistentVolumeClaim)
|
||||
if !ok {
|
||||
glog.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", t.Obj)
|
||||
return
|
||||
}
|
||||
default:
|
||||
glog.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", t)
|
||||
return
|
||||
}
|
||||
c.invalidatePredicatesForPvc(pvc)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim) {
|
||||
if pvc.Spec.VolumeName != "" {
|
||||
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(maxPDVolumeCountPredicateSet)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) onServiceAdd(obj interface{}) {
|
||||
if c.enableEquivalenceClassCache {
|
||||
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) {
|
||||
if c.enableEquivalenceClassCache {
|
||||
// TODO(resouer) We may need to invalidate this for specified group of pods only
|
||||
oldService := oldObj.(*v1.Service)
|
||||
newService := newObj.(*v1.Service)
|
||||
if !reflect.DeepEqual(oldService.Spec.Selector, newService.Spec.Selector) {
|
||||
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) onServiceDelete(obj interface{}) {
|
||||
if c.enableEquivalenceClassCache {
|
||||
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
|
||||
}
|
||||
}
|
||||
|
||||
// GetNodeStore provides the cache to the nodes, mostly internal use, but may also be called by mock-tests.
|
||||
func (c *ConfigFactory) GetNodeLister() corelisters.NodeLister {
|
||||
return c.nodeLister
|
||||
@ -229,7 +388,6 @@ func (c *ConfigFactory) GetScheduledPodLister() corelisters.PodLister {
|
||||
return c.scheduledPodLister
|
||||
}
|
||||
|
||||
// TODO(resouer) need to update all the handlers here and below for equivalence cache
|
||||
func (c *ConfigFactory) addPodToCache(obj interface{}) {
|
||||
pod, ok := obj.(*v1.Pod)
|
||||
if !ok {
|
||||
@ -240,6 +398,8 @@ func (c *ConfigFactory) addPodToCache(obj interface{}) {
|
||||
if err := c.schedulerCache.AddPod(pod); err != nil {
|
||||
glog.Errorf("scheduler cache AddPod failed: %v", err)
|
||||
}
|
||||
// NOTE: Updating equivalence cache of addPodToCache has been
|
||||
// handled optimistically in InvalidateCachedPredicateItemForPodAdd.
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) updatePodInCache(oldObj, newObj interface{}) {
|
||||
@ -257,6 +417,29 @@ func (c *ConfigFactory) updatePodInCache(oldObj, newObj interface{}) {
|
||||
if err := c.schedulerCache.UpdatePod(oldPod, newPod); err != nil {
|
||||
glog.Errorf("scheduler cache UpdatePod failed: %v", err)
|
||||
}
|
||||
|
||||
c.invalidateCachedPredicatesOnUpdatePod(newPod, oldPod)
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, oldPod *v1.Pod) {
|
||||
if c.enableEquivalenceClassCache {
|
||||
// if the pod does not have binded node, updating equivalence cache is meaningless;
|
||||
// if pod's binded node has been changed, that case should be handled by pod add & delete.
|
||||
if len(newPod.Spec.NodeName) != 0 && newPod.Spec.NodeName == oldPod.Spec.NodeName {
|
||||
if !reflect.DeepEqual(oldPod.GetLabels(), newPod.GetLabels()) {
|
||||
// MatchInterPodAffinity need to be reconsidered for this node,
|
||||
// as well as all nodes in its same failure domain.
|
||||
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(
|
||||
matchInterPodAffinitySet)
|
||||
}
|
||||
// if requested container resource changed, invalidate GeneralPredicates of this node
|
||||
if !reflect.DeepEqual(predicates.GetResourceRequest(newPod),
|
||||
predicates.GetResourceRequest(oldPod)) {
|
||||
c.equivalencePodCache.InvalidateCachedPredicateItem(
|
||||
newPod.Spec.NodeName, generalPredicatesSets)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) deletePodFromCache(obj interface{}) {
|
||||
@ -278,6 +461,29 @@ func (c *ConfigFactory) deletePodFromCache(obj interface{}) {
|
||||
if err := c.schedulerCache.RemovePod(pod); err != nil {
|
||||
glog.Errorf("scheduler cache RemovePod failed: %v", err)
|
||||
}
|
||||
|
||||
c.invalidateCachedPredicatesOnDeletePod(pod)
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) invalidateCachedPredicatesOnDeletePod(pod *v1.Pod) {
|
||||
if c.enableEquivalenceClassCache {
|
||||
// part of this case is the same as pod add.
|
||||
c.equivalencePodCache.InvalidateCachedPredicateItemForPodAdd(pod, pod.Spec.NodeName)
|
||||
// MatchInterPodAffinity need to be reconsidered for this node,
|
||||
// as well as all nodes in its same failure domain.
|
||||
// TODO(resouer) can we just do this for nodes in the same failure domain
|
||||
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(
|
||||
matchInterPodAffinitySet)
|
||||
|
||||
// if this pod have these PV, cached result of disk conflict will become invalid.
|
||||
for _, volume := range pod.Spec.Volumes {
|
||||
if volume.GCEPersistentDisk != nil || volume.AWSElasticBlockStore != nil ||
|
||||
volume.RBD != nil || volume.ISCSI != nil {
|
||||
c.equivalencePodCache.InvalidateCachedPredicateItem(
|
||||
pod.Spec.NodeName, noDiskConflictSet)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) addNodeToCache(obj interface{}) {
|
||||
@ -290,6 +496,8 @@ func (c *ConfigFactory) addNodeToCache(obj interface{}) {
|
||||
if err := c.schedulerCache.AddNode(node); err != nil {
|
||||
glog.Errorf("scheduler cache AddNode failed: %v", err)
|
||||
}
|
||||
|
||||
// NOTE: add a new node does not affect existing predicates in equivalence cache
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) updateNodeInCache(oldObj, newObj interface{}) {
|
||||
@ -307,6 +515,64 @@ func (c *ConfigFactory) updateNodeInCache(oldObj, newObj interface{}) {
|
||||
if err := c.schedulerCache.UpdateNode(oldNode, newNode); err != nil {
|
||||
glog.Errorf("scheduler cache UpdateNode failed: %v", err)
|
||||
}
|
||||
|
||||
c.invalidateCachedPredicatesOnNodeUpdate(newNode, oldNode)
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node, oldNode *v1.Node) {
|
||||
if c.enableEquivalenceClassCache {
|
||||
// Begin to update equivalence cache based on node update
|
||||
// TODO(resouer): think about lazily initialize this set
|
||||
invalidPredicates := sets.NewString()
|
||||
|
||||
oldTaints, oldErr := helper.GetTaintsFromNodeAnnotations(oldNode.GetAnnotations())
|
||||
if oldErr != nil {
|
||||
glog.Errorf("Failed to get taints from old node annotation for equivalence cache")
|
||||
}
|
||||
newTaints, newErr := helper.GetTaintsFromNodeAnnotations(newNode.GetAnnotations())
|
||||
if newErr != nil {
|
||||
glog.Errorf("Failed to get taints from new node annotation for equivalence cache")
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable) {
|
||||
invalidPredicates.Insert("GeneralPredicates") // "PodFitsResources"
|
||||
}
|
||||
if !reflect.DeepEqual(oldNode.GetLabels(), newNode.GetLabels()) {
|
||||
invalidPredicates.Insert("GeneralPredicates", "ServiceAffinity") // "PodSelectorMatches"
|
||||
for k, v := range oldNode.GetLabels() {
|
||||
// any label can be topology key of pod, we have to invalidate in all cases
|
||||
if v != newNode.GetLabels()[k] {
|
||||
invalidPredicates.Insert("MatchInterPodAffinity")
|
||||
}
|
||||
// NoVolumeZoneConflict will only be affected by zone related label change
|
||||
if k == kubeletapis.LabelZoneFailureDomain || k == kubeletapis.LabelZoneRegion {
|
||||
if v != newNode.GetLabels()[k] {
|
||||
invalidPredicates.Insert("NoVolumeZoneConflict")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if !reflect.DeepEqual(oldTaints, newTaints) {
|
||||
invalidPredicates.Insert("PodToleratesNodeTaints")
|
||||
}
|
||||
if !reflect.DeepEqual(oldNode.Status.Conditions, newNode.Status.Conditions) {
|
||||
oldConditions := make(map[v1.NodeConditionType]v1.ConditionStatus)
|
||||
newConditions := make(map[v1.NodeConditionType]v1.ConditionStatus)
|
||||
for _, cond := range oldNode.Status.Conditions {
|
||||
oldConditions[cond.Type] = cond.Status
|
||||
}
|
||||
for _, cond := range newNode.Status.Conditions {
|
||||
newConditions[cond.Type] = cond.Status
|
||||
}
|
||||
if oldConditions[v1.NodeMemoryPressure] != newConditions[v1.NodeMemoryPressure] {
|
||||
invalidPredicates.Insert("CheckNodeMemoryPressure")
|
||||
}
|
||||
if oldConditions[v1.NodeDiskPressure] != newConditions[v1.NodeDiskPressure] {
|
||||
invalidPredicates.Insert("CheckNodeDiskPressure")
|
||||
}
|
||||
}
|
||||
c.equivalencePodCache.InvalidateCachedPredicateItem(newNode.GetName(), invalidPredicates)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) deleteNodeFromCache(obj interface{}) {
|
||||
@ -328,6 +594,9 @@ func (c *ConfigFactory) deleteNodeFromCache(obj interface{}) {
|
||||
if err := c.schedulerCache.RemoveNode(node); err != nil {
|
||||
glog.Errorf("scheduler cache RemoveNode failed: %v", err)
|
||||
}
|
||||
if c.enableEquivalenceClassCache {
|
||||
c.equivalencePodCache.InvalidateAllCachedPredicateItemOfNode(node.GetName())
|
||||
}
|
||||
}
|
||||
|
||||
// Create creates a scheduler with the default algorithm provider.
|
||||
@ -424,11 +693,17 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// TODO(resouer) use equivalence cache instead of nil here when #36238 get merged
|
||||
algo := core.NewGenericScheduler(f.schedulerCache, nil, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders)
|
||||
// Init equivalence class cache
|
||||
if f.enableEquivalenceClassCache && getEquivalencePodFunc != nil {
|
||||
f.equivalencePodCache = core.NewEquivalenceCache(getEquivalencePodFunc)
|
||||
glog.Info("Created equivalence class cache")
|
||||
}
|
||||
algo := core.NewGenericScheduler(f.schedulerCache, f.equivalencePodCache, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders)
|
||||
|
||||
podBackoff := util.CreateDefaultPodBackoff()
|
||||
return &scheduler.Config{
|
||||
SchedulerCache: f.schedulerCache,
|
||||
Ecache: f.equivalencePodCache,
|
||||
// The scheduler only needs to consider schedulable nodes.
|
||||
NodeLister: &nodePredicateLister{f.nodeLister},
|
||||
Algorithm: algo,
|
||||
|
@ -40,6 +40,8 @@ import (
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/util"
|
||||
)
|
||||
|
||||
const enableEquivalenceCache = true
|
||||
|
||||
func TestCreate(t *testing.T) {
|
||||
handler := utiltesting.FakeHandler{
|
||||
StatusCode: 500,
|
||||
@ -62,6 +64,7 @@ func TestCreate(t *testing.T) {
|
||||
informerFactory.Apps().V1beta1().StatefulSets(),
|
||||
informerFactory.Core().V1().Services(),
|
||||
v1.DefaultHardPodAffinitySymmetricWeight,
|
||||
enableEquivalenceCache,
|
||||
)
|
||||
factory.Create()
|
||||
}
|
||||
@ -93,6 +96,7 @@ func TestCreateFromConfig(t *testing.T) {
|
||||
informerFactory.Apps().V1beta1().StatefulSets(),
|
||||
informerFactory.Core().V1().Services(),
|
||||
v1.DefaultHardPodAffinitySymmetricWeight,
|
||||
enableEquivalenceCache,
|
||||
)
|
||||
|
||||
// Pre-register some predicate and priority functions
|
||||
@ -151,6 +155,7 @@ func TestCreateFromConfigWithHardPodAffinitySymmetricWeight(t *testing.T) {
|
||||
informerFactory.Apps().V1beta1().StatefulSets(),
|
||||
informerFactory.Core().V1().Services(),
|
||||
v1.DefaultHardPodAffinitySymmetricWeight,
|
||||
enableEquivalenceCache,
|
||||
)
|
||||
|
||||
// Pre-register some predicate and priority functions
|
||||
@ -210,6 +215,7 @@ func TestCreateFromEmptyConfig(t *testing.T) {
|
||||
informerFactory.Apps().V1beta1().StatefulSets(),
|
||||
informerFactory.Core().V1().Services(),
|
||||
v1.DefaultHardPodAffinitySymmetricWeight,
|
||||
enableEquivalenceCache,
|
||||
)
|
||||
|
||||
configData = []byte(`{}`)
|
||||
@ -266,6 +272,7 @@ func TestDefaultErrorFunc(t *testing.T) {
|
||||
informerFactory.Apps().V1beta1().StatefulSets(),
|
||||
informerFactory.Core().V1().Services(),
|
||||
v1.DefaultHardPodAffinitySymmetricWeight,
|
||||
enableEquivalenceCache,
|
||||
)
|
||||
queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
|
||||
podBackoff := util.CreatePodBackoff(1*time.Millisecond, 1*time.Second)
|
||||
@ -378,6 +385,7 @@ func TestResponsibleForPod(t *testing.T) {
|
||||
informerFactory.Apps().V1beta1().StatefulSets(),
|
||||
informerFactory.Core().V1().Services(),
|
||||
v1.DefaultHardPodAffinitySymmetricWeight,
|
||||
enableEquivalenceCache,
|
||||
)
|
||||
// factory of "foo-scheduler"
|
||||
factoryFooScheduler := NewConfigFactory(
|
||||
@ -392,6 +400,7 @@ func TestResponsibleForPod(t *testing.T) {
|
||||
informerFactory.Apps().V1beta1().StatefulSets(),
|
||||
informerFactory.Core().V1().Services(),
|
||||
v1.DefaultHardPodAffinitySymmetricWeight,
|
||||
enableEquivalenceCache,
|
||||
)
|
||||
// scheduler annotations to be tested
|
||||
schedulerFitsDefault := "default-scheduler"
|
||||
@ -461,6 +470,7 @@ func TestInvalidHardPodAffinitySymmetricWeight(t *testing.T) {
|
||||
informerFactory.Apps().V1beta1().StatefulSets(),
|
||||
informerFactory.Core().V1().Services(),
|
||||
-1,
|
||||
enableEquivalenceCache,
|
||||
)
|
||||
_, err := factory.Create()
|
||||
if err == nil {
|
||||
@ -506,6 +516,7 @@ func TestInvalidFactoryArgs(t *testing.T) {
|
||||
informerFactory.Apps().V1beta1().StatefulSets(),
|
||||
informerFactory.Core().V1().Services(),
|
||||
test.hardPodAffinitySymmetricWeight,
|
||||
enableEquivalenceCache,
|
||||
)
|
||||
_, err := factory.Create()
|
||||
if err == nil {
|
||||
|
@ -411,7 +411,7 @@ var _ = framework.KubeDescribe("Cluster size autoscaling [Slow]", func() {
|
||||
framework.AddOrUpdateLabelOnNode(c, node, labelKey, labelValue)
|
||||
}
|
||||
|
||||
CreateNodeSelectorPods(f, "node-selector", minSize+1, map[string]string{labelKey: labelValue}, false)
|
||||
scheduling.CreateNodeSelectorPods(f, "node-selector", minSize+1, map[string]string{labelKey: labelValue}, false)
|
||||
|
||||
By("Waiting for new node to appear and annotating it")
|
||||
framework.WaitForGroupSize(minMig, int32(minSize+1))
|
||||
@ -907,26 +907,6 @@ func doPut(url, content string) (string, error) {
|
||||
return strBody, nil
|
||||
}
|
||||
|
||||
func CreateNodeSelectorPods(f *framework.Framework, id string, replicas int, nodeSelector map[string]string, expectRunning bool) {
|
||||
By(fmt.Sprintf("Running RC which reserves host port and defines node selector"))
|
||||
|
||||
config := &testutils.RCConfig{
|
||||
Client: f.ClientSet,
|
||||
InternalClient: f.InternalClientset,
|
||||
Name: id,
|
||||
Namespace: f.Namespace.Name,
|
||||
Timeout: defaultTimeout,
|
||||
Image: framework.GetPauseImageName(f.ClientSet),
|
||||
Replicas: replicas,
|
||||
HostPorts: map[string]int{"port1": 4321},
|
||||
NodeSelector: nodeSelector,
|
||||
}
|
||||
err := framework.RunRC(*config)
|
||||
if expectRunning {
|
||||
framework.ExpectNoError(err)
|
||||
}
|
||||
}
|
||||
|
||||
func ReserveMemory(f *framework.Framework, id string, replicas, megabytes int, expectRunning bool, timeout time.Duration) {
|
||||
By(fmt.Sprintf("Running RC which reserves %v MB of memory", megabytes))
|
||||
request := int64(1024 * 1024 * megabytes / replicas)
|
||||
|
@ -2462,6 +2462,15 @@ func AddOrUpdateLabelOnNode(c clientset.Interface, nodeName string, labelKey, la
|
||||
ExpectNoError(testutil.AddLabelsToNode(c, nodeName, map[string]string{labelKey: labelValue}))
|
||||
}
|
||||
|
||||
func AddOrUpdateLabelOnNodeAndReturnOldValue(c clientset.Interface, nodeName string, labelKey, labelValue string) string {
|
||||
var oldValue string
|
||||
node, err := c.Core().Nodes().Get(nodeName, metav1.GetOptions{})
|
||||
ExpectNoError(err)
|
||||
oldValue = node.Labels[labelKey]
|
||||
ExpectNoError(testutil.AddLabelsToNode(c, nodeName, map[string]string{labelKey: labelValue}))
|
||||
return oldValue
|
||||
}
|
||||
|
||||
func ExpectNodeHasLabel(c clientset.Interface, nodeName string, labelKey string, labelValue string) {
|
||||
By("verifying the node has the label " + labelKey + " " + labelValue)
|
||||
node, err := c.Core().Nodes().Get(nodeName, metav1.GetOptions{})
|
||||
|
286
test/e2e/scheduling/equivalence_cache_predicates.go
Normal file
286
test/e2e/scheduling/equivalence_cache_predicates.go
Normal file
@ -0,0 +1,286 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package scheduling
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
testutils "k8s.io/kubernetes/test/utils"
|
||||
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
_ "github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
var _ = framework.KubeDescribe("EquivalenceCache [Serial]", func() {
|
||||
var cs clientset.Interface
|
||||
var nodeList *v1.NodeList
|
||||
var masterNodes sets.String
|
||||
var systemPodsNo int
|
||||
var ns string
|
||||
f := framework.NewDefaultFramework("equivalence-cache")
|
||||
ignoreLabels := framework.ImagePullerLabels
|
||||
|
||||
BeforeEach(func() {
|
||||
cs = f.ClientSet
|
||||
ns = f.Namespace.Name
|
||||
|
||||
framework.WaitForAllNodesHealthy(cs, time.Minute)
|
||||
masterNodes, nodeList = framework.GetMasterAndWorkerNodesOrDie(cs)
|
||||
|
||||
framework.ExpectNoError(framework.CheckTestingNSDeletedExcept(cs, ns))
|
||||
|
||||
// Every test case in this suite assumes that cluster add-on pods stay stable and
|
||||
// cannot be run in parallel with any other test that touches Nodes or Pods.
|
||||
// It is so because we need to have precise control on what's running in the cluster.
|
||||
systemPods, err := framework.GetPodsInNamespace(cs, ns, ignoreLabels)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
systemPodsNo = 0
|
||||
for _, pod := range systemPods {
|
||||
if !masterNodes.Has(pod.Spec.NodeName) && pod.DeletionTimestamp == nil {
|
||||
systemPodsNo++
|
||||
}
|
||||
}
|
||||
|
||||
err = framework.WaitForPodsRunningReady(cs, api.NamespaceSystem, int32(systemPodsNo), int32(systemPodsNo), framework.PodReadyBeforeTimeout, ignoreLabels)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
for _, node := range nodeList.Items {
|
||||
framework.Logf("\nLogging pods the kubelet thinks is on node %v before test", node.Name)
|
||||
framework.PrintAllKubeletPods(cs, node.Name)
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
// This test verifies that GeneralPredicates works as expected:
|
||||
// When a replica pod (with HostPorts) is scheduled to a node, it will invalidate GeneralPredicates cache on this node,
|
||||
// so that subsequent replica pods with same host port claim will be rejected.
|
||||
// We enforce all replica pods bind to the same node so there will always be conflicts.
|
||||
It("validates GeneralPredicates is properly invalidated when a pod is scheduled [Slow]", func() {
|
||||
By("Launching a RC with two replica pods with HostPorts")
|
||||
nodeName := getNodeThatCanRunPodWithoutToleration(f)
|
||||
rcName := "host-port"
|
||||
|
||||
// bind all replicas to same node
|
||||
nodeSelector := map[string]string{"kubernetes.io/hostname": nodeName}
|
||||
|
||||
By("One pod should be scheduled, the other should be rejected")
|
||||
// CreateNodeSelectorPods creates RC with host port 4312
|
||||
WaitForSchedulerAfterAction(f, func() error {
|
||||
err := CreateNodeSelectorPods(f, rcName, 2, nodeSelector, false)
|
||||
return err
|
||||
}, rcName, false)
|
||||
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, ns, rcName)
|
||||
// the first replica pod is scheduled, and the second pod will be rejected.
|
||||
verifyResult(cs, 1, 1, ns)
|
||||
})
|
||||
|
||||
// This test verifies that MatchInterPodAffinity works as expected.
|
||||
// In equivalence cache, it does not handle inter pod affinity (anti-affinity) specially (unless node label changed),
|
||||
// because current predicates algorithm will ensure newly scheduled pod does not break existing affinity in cluster.
|
||||
It("validates pod affinity works properly when new replica pod is scheduled", func() {
|
||||
// create a pod running with label {security: S1}, and choose this node
|
||||
nodeName, _ := runAndKeepPodWithLabelAndGetNodeName(f)
|
||||
|
||||
By("Trying to apply a random label on the found node.")
|
||||
// we need to use real failure domains, since scheduler only know them
|
||||
k := "failure-domain.beta.kubernetes.io/zone"
|
||||
v := "equivalence-e2e-test"
|
||||
oldValue := framework.AddOrUpdateLabelOnNodeAndReturnOldValue(cs, nodeName, k, v)
|
||||
framework.ExpectNodeHasLabel(cs, nodeName, k, v)
|
||||
// restore the node label
|
||||
defer framework.AddOrUpdateLabelOnNode(cs, nodeName, k, oldValue)
|
||||
|
||||
By("Trying to schedule RC with Pod Affinity should success.")
|
||||
framework.WaitForStableCluster(cs, masterNodes)
|
||||
affinityRCName := "with-pod-affinity-" + string(uuid.NewUUID())
|
||||
replica := 2
|
||||
labelsMap := map[string]string{
|
||||
"name": affinityRCName,
|
||||
}
|
||||
affinity := &v1.Affinity{
|
||||
PodAffinity: &v1.PodAffinity{
|
||||
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
|
||||
{
|
||||
LabelSelector: &metav1.LabelSelector{
|
||||
MatchExpressions: []metav1.LabelSelectorRequirement{
|
||||
{
|
||||
Key: "security",
|
||||
Operator: metav1.LabelSelectorOpIn,
|
||||
Values: []string{"S1"},
|
||||
},
|
||||
},
|
||||
},
|
||||
TopologyKey: k,
|
||||
Namespaces: []string{ns},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
rc := getRCWithInterPodAffinity(affinityRCName, labelsMap, replica, affinity, framework.GetPauseImageName(f.ClientSet))
|
||||
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, ns, affinityRCName)
|
||||
|
||||
// RC should be running successfully
|
||||
// TODO: WaitForSchedulerAfterAction() can on be used to wait for failure event,
|
||||
// not for successful RC, since no specific pod name can be provided.
|
||||
_, err := cs.Core().ReplicationControllers(ns).Create(rc)
|
||||
framework.ExpectNoError(err)
|
||||
framework.ExpectNoError(framework.WaitForControlledPodsRunning(cs, ns, affinityRCName, api.Kind("ReplicationController")))
|
||||
|
||||
By("Remove node failure domain label")
|
||||
framework.RemoveLabelOffNode(cs, nodeName, k)
|
||||
|
||||
By("Trying to schedule another equivalent Pod should fail due to node label has been removed.")
|
||||
// use scale to create another equivalent pod and wait for failure event
|
||||
WaitForSchedulerAfterAction(f, func() error {
|
||||
err := framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, affinityRCName, uint(replica+1), false)
|
||||
return err
|
||||
}, affinityRCName, false)
|
||||
// and this new pod should be rejected since node label has been updated
|
||||
verifyReplicasResult(cs, replica, 1, ns, affinityRCName)
|
||||
})
|
||||
|
||||
// This test verifies that MatchInterPodAffinity (anti-affinity) is respected as expected.
|
||||
It("validates pod anti-affinity works properly when new replica pod is scheduled", func() {
|
||||
By("Launching two pods on two distinct nodes to get two node names")
|
||||
CreateHostPortPods(f, "host-port", 2, true)
|
||||
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, ns, "host-port")
|
||||
podList, err := cs.Core().Pods(ns).List(metav1.ListOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
Expect(len(podList.Items)).To(Equal(2))
|
||||
nodeNames := []string{podList.Items[0].Spec.NodeName, podList.Items[1].Spec.NodeName}
|
||||
Expect(nodeNames[0]).ToNot(Equal(nodeNames[1]))
|
||||
|
||||
By("Applying a random label to both nodes.")
|
||||
k := "e2e.inter-pod-affinity.kubernetes.io/zone"
|
||||
v := "equivalence-e2etest"
|
||||
for _, nodeName := range nodeNames {
|
||||
framework.AddOrUpdateLabelOnNode(cs, nodeName, k, v)
|
||||
framework.ExpectNodeHasLabel(cs, nodeName, k, v)
|
||||
defer framework.RemoveLabelOffNode(cs, nodeName, k)
|
||||
}
|
||||
|
||||
By("Trying to launch a pod with the service label on the selected nodes.")
|
||||
// run a pod with label {"service": "S1"} and expect it to be running
|
||||
runPausePod(f, pausePodConfig{
|
||||
Name: "with-label-" + string(uuid.NewUUID()),
|
||||
Labels: map[string]string{"service": "S1"},
|
||||
NodeSelector: map[string]string{k: v}, // only launch on our two nodes
|
||||
})
|
||||
|
||||
By("Trying to launch RC with podAntiAffinity on these two nodes should be rejected.")
|
||||
labelRCName := "with-podantiaffinity-" + string(uuid.NewUUID())
|
||||
replica := 2
|
||||
labelsMap := map[string]string{
|
||||
"name": labelRCName,
|
||||
}
|
||||
affinity := &v1.Affinity{
|
||||
PodAntiAffinity: &v1.PodAntiAffinity{
|
||||
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
|
||||
{
|
||||
LabelSelector: &metav1.LabelSelector{
|
||||
MatchExpressions: []metav1.LabelSelectorRequirement{
|
||||
{
|
||||
Key: "service",
|
||||
Operator: metav1.LabelSelectorOpIn,
|
||||
Values: []string{"S1"},
|
||||
},
|
||||
},
|
||||
},
|
||||
TopologyKey: k,
|
||||
Namespaces: []string{ns},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
rc := getRCWithInterPodAffinityNodeSelector(labelRCName, labelsMap, replica, affinity,
|
||||
framework.GetPauseImageName(f.ClientSet), map[string]string{k: v})
|
||||
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, ns, labelRCName)
|
||||
|
||||
WaitForSchedulerAfterAction(f, func() error {
|
||||
_, err := cs.Core().ReplicationControllers(ns).Create(rc)
|
||||
return err
|
||||
}, labelRCName, false)
|
||||
|
||||
// these two replicas should all be rejected since podAntiAffinity says it they anit-affinity with pod {"service": "S1"}
|
||||
verifyReplicasResult(cs, 0, replica, ns, labelRCName)
|
||||
})
|
||||
})
|
||||
|
||||
// getRCWithInterPodAffinity returns RC with given affinity rules.
|
||||
func getRCWithInterPodAffinity(name string, labelsMap map[string]string, replica int, affinity *v1.Affinity, image string) *v1.ReplicationController {
|
||||
return getRCWithInterPodAffinityNodeSelector(name, labelsMap, replica, affinity, image, map[string]string{})
|
||||
}
|
||||
|
||||
// getRCWithInterPodAffinity returns RC with given affinity rules and node selector.
|
||||
func getRCWithInterPodAffinityNodeSelector(name string, labelsMap map[string]string, replica int, affinity *v1.Affinity, image string, nodeSelector map[string]string) *v1.ReplicationController {
|
||||
replicaInt32 := int32(replica)
|
||||
return &v1.ReplicationController{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
},
|
||||
Spec: v1.ReplicationControllerSpec{
|
||||
Replicas: &replicaInt32,
|
||||
Selector: labelsMap,
|
||||
Template: &v1.PodTemplateSpec{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Labels: labelsMap,
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Affinity: affinity,
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: name,
|
||||
Image: image,
|
||||
},
|
||||
},
|
||||
DNSPolicy: v1.DNSDefault,
|
||||
NodeSelector: nodeSelector,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func CreateNodeSelectorPods(f *framework.Framework, id string, replicas int, nodeSelector map[string]string, expectRunning bool) error {
|
||||
By(fmt.Sprintf("Running RC which reserves host port and defines node selector"))
|
||||
|
||||
config := &testutils.RCConfig{
|
||||
Client: f.ClientSet,
|
||||
InternalClient: f.InternalClientset,
|
||||
Name: id,
|
||||
Namespace: f.Namespace.Name,
|
||||
Timeout: defaultTimeout,
|
||||
Image: framework.GetPauseImageName(f.ClientSet),
|
||||
Replicas: replicas,
|
||||
HostPorts: map[string]int{"port1": 4321},
|
||||
NodeSelector: nodeSelector,
|
||||
}
|
||||
err := framework.RunRC(*config)
|
||||
if expectRunning {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
@ -24,6 +24,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
@ -51,6 +52,8 @@ type pausePodConfig struct {
|
||||
Resources *v1.ResourceRequirements
|
||||
Tolerations []v1.Toleration
|
||||
NodeName string
|
||||
Ports []v1.ContainerPort
|
||||
OwnerReferences []metav1.OwnerReference
|
||||
}
|
||||
|
||||
var _ = SIGDescribe("SchedulerPredicates [Serial]", func() {
|
||||
@ -752,6 +755,7 @@ func initPausePod(f *framework.Framework, conf pausePodConfig) *v1.Pod {
|
||||
Name: conf.Name,
|
||||
Labels: conf.Labels,
|
||||
Annotations: conf.Annotations,
|
||||
OwnerReferences: conf.OwnerReferences,
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
NodeSelector: conf.NodeSelector,
|
||||
@ -760,6 +764,7 @@ func initPausePod(f *framework.Framework, conf pausePodConfig) *v1.Pod {
|
||||
{
|
||||
Name: conf.Name,
|
||||
Image: framework.GetPauseImageName(f.ClientSet),
|
||||
Ports: conf.Ports,
|
||||
},
|
||||
},
|
||||
Tolerations: conf.Tolerations,
|
||||
@ -948,6 +953,32 @@ func verifyResult(c clientset.Interface, expectedScheduled int, expectedNotSched
|
||||
Expect(len(scheduledPods)).To(Equal(expectedScheduled), printOnce(fmt.Sprintf("Scheduled Pods: %#v", scheduledPods)))
|
||||
}
|
||||
|
||||
// verifyReplicasResult is wrapper of verifyResult for a group pods with same "name: labelName" label, which means they belong to same RC
|
||||
func verifyReplicasResult(c clientset.Interface, expectedScheduled int, expectedNotScheduled int, ns string, labelName string) {
|
||||
allPods := getPodsByLabels(c, ns, map[string]string{"name": labelName})
|
||||
scheduledPods, notScheduledPods := framework.GetPodsScheduled(masterNodes, allPods)
|
||||
|
||||
printed := false
|
||||
printOnce := func(msg string) string {
|
||||
if !printed {
|
||||
printed = true
|
||||
return msg
|
||||
} else {
|
||||
return ""
|
||||
}
|
||||
}
|
||||
|
||||
Expect(len(notScheduledPods)).To(Equal(expectedNotScheduled), printOnce(fmt.Sprintf("Not scheduled Pods: %#v", notScheduledPods)))
|
||||
Expect(len(scheduledPods)).To(Equal(expectedScheduled), printOnce(fmt.Sprintf("Scheduled Pods: %#v", scheduledPods)))
|
||||
}
|
||||
|
||||
func getPodsByLabels(c clientset.Interface, ns string, labelsMap map[string]string) *v1.PodList {
|
||||
selector := labels.SelectorFromSet(labels.Set(labelsMap))
|
||||
allPods, err := c.Core().Pods(ns).List(metav1.ListOptions{LabelSelector: selector.String()})
|
||||
framework.ExpectNoError(err)
|
||||
return allPods
|
||||
}
|
||||
|
||||
func runAndKeepPodWithLabelAndGetNodeName(f *framework.Framework) (string, string) {
|
||||
// launch a pod to find a node which can launch a pod. We intentionally do
|
||||
// not just take the node list and choose the first of them. Depending on the
|
||||
|
@ -370,6 +370,7 @@ func TestSchedulerExtender(t *testing.T) {
|
||||
informerFactory.Apps().V1beta1().StatefulSets(),
|
||||
informerFactory.Core().V1().Services(),
|
||||
v1.DefaultHardPodAffinitySymmetricWeight,
|
||||
enableEquivalenceCache,
|
||||
)
|
||||
schedulerConfig, err := schedulerConfigFactory.CreateFromConfig(policy)
|
||||
if err != nil {
|
||||
|
@ -50,6 +50,8 @@ import (
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
)
|
||||
|
||||
const enableEquivalenceCache = true
|
||||
|
||||
type nodeMutationFunc func(t *testing.T, n *v1.Node, nodeLister corelisters.NodeLister, c clientset.Interface)
|
||||
|
||||
type nodeStateManager struct {
|
||||
@ -257,6 +259,7 @@ func TestUnschedulableNodes(t *testing.T) {
|
||||
informerFactory.Apps().V1beta1().StatefulSets(),
|
||||
informerFactory.Core().V1().Services(),
|
||||
v1.DefaultHardPodAffinitySymmetricWeight,
|
||||
enableEquivalenceCache,
|
||||
)
|
||||
schedulerConfig, err := schedulerConfigFactory.Create()
|
||||
if err != nil {
|
||||
@ -540,6 +543,7 @@ func TestMultiScheduler(t *testing.T) {
|
||||
informerFactory.Apps().V1beta1().StatefulSets(),
|
||||
informerFactory.Core().V1().Services(),
|
||||
v1.DefaultHardPodAffinitySymmetricWeight,
|
||||
enableEquivalenceCache,
|
||||
)
|
||||
schedulerConfig, err := schedulerConfigFactory.Create()
|
||||
if err != nil {
|
||||
@ -626,6 +630,7 @@ func TestMultiScheduler(t *testing.T) {
|
||||
informerFactory.Apps().V1beta1().StatefulSets(),
|
||||
informerFactory.Core().V1().Services(),
|
||||
v1.DefaultHardPodAffinitySymmetricWeight,
|
||||
enableEquivalenceCache,
|
||||
)
|
||||
schedulerConfig2, err := schedulerConfigFactory2.Create()
|
||||
if err != nil {
|
||||
@ -736,6 +741,7 @@ func TestAllocatable(t *testing.T) {
|
||||
informerFactory.Apps().V1beta1().StatefulSets(),
|
||||
informerFactory.Core().V1().Services(),
|
||||
v1.DefaultHardPodAffinitySymmetricWeight,
|
||||
enableEquivalenceCache,
|
||||
)
|
||||
schedulerConfig, err := schedulerConfigFactory.Create()
|
||||
if err != nil {
|
||||
|
@ -36,6 +36,8 @@ import (
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
)
|
||||
|
||||
const enableEquivalenceCache = true
|
||||
|
||||
// mustSetupScheduler starts the following components:
|
||||
// - k8s api server (a.k.a. master)
|
||||
// - scheduler
|
||||
@ -74,6 +76,7 @@ func mustSetupScheduler() (schedulerConfigurator scheduler.Configurator, destroy
|
||||
informerFactory.Apps().V1beta1().StatefulSets(),
|
||||
informerFactory.Core().V1().Services(),
|
||||
v1.DefaultHardPodAffinitySymmetricWeight,
|
||||
enableEquivalenceCache,
|
||||
)
|
||||
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
|
Loading…
Reference in New Issue
Block a user