feat(scheduler): create metadata and addPod/removePod for migration

This commit is contained in:
draveness
2019-09-29 17:41:59 +08:00
parent 2dc5add8b1
commit 9554276d17
3 changed files with 414 additions and 297 deletions

View File

@@ -24,7 +24,7 @@ import (
"k8s.io/klog"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
@@ -110,11 +110,11 @@ func (paths *criticalPaths) update(tpVal string, num int32) {
}
}
// podSpreadCache combines tpKeyToCriticalPaths and tpPairToMatchNum
// evenPodsSpreadMetadata combines tpKeyToCriticalPaths and tpPairToMatchNum
// to represent:
// (1) critical paths where the least pods are matched on each spread constraint.
// (2) number of pods matched on each spread constraint.
type podSpreadCache struct {
type evenPodsSpreadMetadata struct {
// We record 2 critical paths instead of all critical paths here.
// criticalPaths[0].matchNum always holds the minimum matching number.
// criticalPaths[1].matchNum is always greater or equal to criticalPaths[0].matchNum, but
@@ -124,14 +124,57 @@ type podSpreadCache struct {
tpPairToMatchNum map[topologyPair]int32
}
// NOTE: When new fields are added/removed or logic is changed, please make sure that
// RemovePod, AddPod, and ShallowCopy functions are updated to work with the new changes.
type predicateMetadata struct {
pod *v1.Pod
podBestEffort bool
podRequest *schedulernodeinfo.Resource
podPorts []*v1.ContainerPort
// serviceAffinityMetadata bundles the pod and service lists kept for service affinity.
// A nil *serviceAffinityMetadata is accepted by addPod, removePod and clone.
type serviceAffinityMetadata struct {
// matchingPodList is appended to by addPod (same-namespace pods whose labels
// match a selector built from the evaluated pod's labels) and pruned by removePod.
matchingPodList []*v1.Pod
// matchingPodServices is copied by clone; addPod and removePod never modify it.
// NOTE(review): presumably the services that select the evaluated pod — confirm against the code that populates it.
matchingPodServices []*v1.Service
}
// addPod appends addedPod to matchingPodList when it is in the same namespace
// as pod and its labels satisfy a selector built from pod's labels.
// It does nothing on a nil receiver. The node argument is not used.
func (m *serviceAffinityMetadata) addPod(addedPod *v1.Pod, pod *v1.Pod, node *v1.Node) {
	if m == nil {
		return
	}
	// Only pods living in pod's namespace are candidates for the list.
	if addedPod.Namespace != pod.Namespace {
		return
	}
	podSelector := CreateSelectorFromLabels(pod.Labels)
	if !podSelector.Matches(labels.Set(addedPod.Labels)) {
		return
	}
	m.matchingPodList = append(m.matchingPodList, addedPod)
}
// removePod deletes the first entry of matchingPodList whose full name equals
// that of deletedPod. It does nothing on a nil receiver or an empty list.
// The node argument is not used.
func (m *serviceAffinityMetadata) removePod(deletedPod *v1.Pod, node *v1.Node) {
	targetName := schedutil.GetPodFullName(deletedPod)
	if m == nil || len(m.matchingPodList) == 0 {
		return
	}
	// addPod only admits pods from a single namespace, so comparing against the
	// first entry is enough to tell that deletedPod cannot be in the list.
	if deletedPod.Namespace != m.matchingPodList[0].Namespace {
		return
	}
	for idx := range m.matchingPodList {
		if schedutil.GetPodFullName(m.matchingPodList[idx]) != targetName {
			continue
		}
		// Remove in place; at most one entry is dropped.
		m.matchingPodList = append(m.matchingPodList[:idx], m.matchingPodList[idx+1:]...)
		return
	}
}
// clone returns a new serviceAffinityMetadata whose slices are fresh copies of
// the receiver's slices; the pointed-to pods and services are shared.
// A nil receiver yields nil.
func (m *serviceAffinityMetadata) clone() *serviceAffinityMetadata {
	if m == nil {
		return nil
	}
	return &serviceAffinityMetadata{
		matchingPodServices: append([]*v1.Service(nil), m.matchingPodServices...),
		matchingPodList:     append([]*v1.Pod(nil), m.matchingPodList...),
	}
}
type podAffinityMetadata struct {
topologyPairsAntiAffinityPodsMap *topologyPairsMaps
// A map of topology pairs to a list of Pods that can potentially match
// the affinity terms of the "pod" and its inverse.
@@ -139,9 +182,70 @@ type predicateMetadata struct {
// A map of topology pairs to a list of Pods that can potentially match
// the anti-affinity terms of the "pod" and its inverse.
topologyPairsPotentialAntiAffinityPods *topologyPairsMaps
serviceAffinityInUse bool
serviceAffinityMatchingPodList []*v1.Pod
serviceAffinityMatchingPodServices []*v1.Service
}
// addPod updates the three topology-pair maps to account for addedPod being
// placed on node, relative to the pod being evaluated ("pod").
//
// It (1) merges the anti-affinity topology pairs of addedPod that match pod
// into topologyPairsAntiAffinityPodsMap, and (2) when pod has affinity rules
// and addedPod is already bound to a node, registers addedPod under every
// affinity / anti-affinity term of pod whose topology key is a label on node.
//
// A nil receiver is a no-op and returns nil, matching removePod and clone.
// Any error from getMatchingAntiAffinityTopologyPairsOfPod is returned as-is.
// node must be non-nil.
func (m *podAffinityMetadata) addPod(addedPod *v1.Pod, pod *v1.Pod, node *v1.Node) error {
	// No affinity metadata is tracked; nothing to update.
	if m == nil {
		return nil
	}

	// Add matching anti-affinity terms of the addedPod to the map.
	matchingPairs, err := getMatchingAntiAffinityTopologyPairsOfPod(pod, addedPod, node)
	if err != nil {
		return err
	}
	m.topologyPairsAntiAffinityPodsMap.appendMaps(matchingPairs)

	// Add the pod to topologyPairsPotentialAffinityPods and
	// topologyPairsPotentialAntiAffinityPods if needed. This only applies when
	// pod declares affinity and addedPod has a node name set.
	affinity := pod.Spec.Affinity
	if affinity == nil || len(addedPod.Spec.NodeName) == 0 {
		return nil
	}

	// It is assumed that when the added pod matches affinity of the pod, all the terms must match,
	// this should be changed when the implementation of targetPodMatchesAffinityOfPod/podMatchesAffinityTermProperties
	// is changed
	if targetPodMatchesAffinityOfPod(pod, addedPod) {
		for _, term := range GetPodAffinityTerms(affinity.PodAffinity) {
			// Terms whose topology key is absent from the node's labels are skipped.
			if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
				pair := topologyPair{key: term.TopologyKey, value: topologyValue}
				m.topologyPairsPotentialAffinityPods.addTopologyPair(pair, addedPod)
			}
		}
	}
	if targetPodMatchesAntiAffinityOfPod(pod, addedPod) {
		for _, term := range GetPodAntiAffinityTerms(affinity.PodAntiAffinity) {
			if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
				pair := topologyPair{key: term.TopologyKey, value: topologyValue}
				m.topologyPairsPotentialAntiAffinityPods.addTopologyPair(pair, addedPod)
			}
		}
	}
	return nil
}
// removePod deletes deletedPod from every topology-pair map held by the
// receiver: the existing-pods anti-affinity map first, then the potential
// affinity and potential anti-affinity maps. A nil receiver is a no-op.
func (m *podAffinityMetadata) removePod(deletedPod *v1.Pod) {
	if m == nil {
		return
	}
	tracked := []*topologyPairsMaps{
		m.topologyPairsAntiAffinityPodsMap,
		m.topologyPairsPotentialAffinityPods,
		m.topologyPairsPotentialAntiAffinityPods,
	}
	for _, pairs := range tracked {
		pairs.removePod(deletedPod)
	}
}
// clone returns a new podAffinityMetadata in which each topology-pair map is
// the result of calling clone on the receiver's corresponding map.
// A nil receiver yields nil.
func (m *podAffinityMetadata) clone() *podAffinityMetadata {
	if m == nil {
		return nil
	}
	return &podAffinityMetadata{
		topologyPairsPotentialAffinityPods:     m.topologyPairsPotentialAffinityPods.clone(),
		topologyPairsPotentialAntiAffinityPods: m.topologyPairsPotentialAntiAffinityPods.clone(),
		topologyPairsAntiAffinityPodsMap:       m.topologyPairsAntiAffinityPodsMap.clone(),
	}
}
type podFitsResourcesMetadata struct {
// ignoredExtendedResources is a set of extended resource names that will
// be ignored in the PodFitsResources predicate.
//
@@ -149,9 +253,50 @@ type predicateMetadata struct {
// which should be accounted only by the extenders. This set is synthesized
// from scheduler extender configuration and does not change per pod.
ignoredExtendedResources sets.String
// podSpreadCache holds info of the minimum match number on each topology spread constraint,
podRequest *schedulernodeinfo.Resource
}
// clone returns a new podFitsResourcesMetadata carrying the same
// ignoredExtendedResources set and the same podRequest pointer as the
// receiver; neither is deep-copied. A nil receiver yields nil.
func (m *podFitsResourcesMetadata) clone() *podFitsResourcesMetadata {
	if m == nil {
		return nil
	}
	return &podFitsResourcesMetadata{
		ignoredExtendedResources: m.ignoredExtendedResources,
		podRequest:               m.podRequest,
	}
}
// podFitsHostPortsMetadata holds the container ports of the pod being evaluated.
type podFitsHostPortsMetadata struct {
// podPorts is filled from schedutil.GetContainerPorts(pod) by getPodFitsHostPortsMetadata.
podPorts []*v1.ContainerPort
}
// clone returns a new podFitsHostPortsMetadata with a fresh podPorts slice
// holding the same *v1.ContainerPort elements. A nil receiver yields nil.
func (m *podFitsHostPortsMetadata) clone() *podFitsHostPortsMetadata {
	if m == nil {
		return nil
	}
	return &podFitsHostPortsMetadata{
		podPorts: append([]*v1.ContainerPort(nil), m.podPorts...),
	}
}
// NOTE: When new fields are added/removed or logic is changed, please make sure that
// RemovePod, AddPod, and ShallowCopy functions are updated to work with the new changes.
type predicateMetadata struct {
pod *v1.Pod
podBestEffort bool
// evenPodsSpreadMetadata holds info of the minimum match number on each topology spread constraint,
// and the match number of all valid topology pairs.
podSpreadCache *podSpreadCache
evenPodsSpreadMetadata *evenPodsSpreadMetadata
serviceAffinityMetadata *serviceAffinityMetadata
podAffinityMetadata *podAffinityMetadata
podFitsResourcesMetadata *podFitsResourcesMetadata
podFitsHostPortsMetadata *podFitsHostPortsMetadata
}
// Ensure that predicateMetadata implements algorithm.PredicateMetadata.
@@ -180,7 +325,7 @@ func EmptyPredicateMetadataProducer(pod *v1.Pod, nodeNameToInfo map[string]*sche
// See the comments in "predicateMetadata" for the explanation of the options.
func RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtendedResources sets.String) {
RegisterPredicateMetadataProducer("PredicateWithExtendedResourceOptions", func(pm *predicateMetadata) {
pm.ignoredExtendedResources = ignoredExtendedResources
pm.podFitsResourcesMetadata.ignoredExtendedResources = ignoredExtendedResources
})
}
@@ -190,35 +335,27 @@ func GetPredicateMetadata(pod *v1.Pod, nodeNameToInfoMap map[string]*schedulerno
if pod == nil {
return nil
}
// existingPodSpreadCache represents how existing pods match "pod"
// evenPodsSpreadMetadata represents how existing pods match "pod"
// on its spread constraints
existingPodSpreadCache, err := getExistingPodSpreadCache(pod, nodeNameToInfoMap)
evenPodsSpreadMetadata, err := getEvenPodsSpreadMetadata(pod, nodeNameToInfoMap)
if err != nil {
klog.Errorf("Error calculating spreadConstraintsMap: %v", err)
return nil
}
// existingPodAntiAffinityMap will be used later for efficient check on existing pods' anti-affinity
existingPodAntiAffinityMap, err := getTPMapMatchingExistingAntiAffinity(pod, nodeNameToInfoMap)
podAffinityMetadata, err := getPodAffinityMetadata(pod, nodeNameToInfoMap)
if err != nil {
klog.Errorf("Error calculating existingPodAntiAffinityMap: %v", err)
return nil
}
// incomingPodAffinityMap will be used later for efficient check on incoming pod's affinity
// incomingPodAntiAffinityMap will be used later for efficient check on incoming pod's anti-affinity
incomingPodAffinityMap, incomingPodAntiAffinityMap, err := getTPMapMatchingIncomingAffinityAntiAffinity(pod, nodeNameToInfoMap)
if err != nil {
klog.Errorf("Error calculating incomingPod(Anti)AffinityMap: %v", err)
klog.Errorf("Error calculating podAffinityMetadata: %v", err)
return nil
}
predicateMetadata := &predicateMetadata{
pod: pod,
podBestEffort: isPodBestEffort(pod),
podRequest: GetResourceRequest(pod),
podPorts: schedutil.GetContainerPorts(pod),
topologyPairsPotentialAffinityPods: incomingPodAffinityMap,
topologyPairsPotentialAntiAffinityPods: incomingPodAntiAffinityMap,
topologyPairsAntiAffinityPodsMap: existingPodAntiAffinityMap,
podSpreadCache: existingPodSpreadCache,
pod: pod,
podBestEffort: isPodBestEffort(pod),
evenPodsSpreadMetadata: evenPodsSpreadMetadata,
podAffinityMetadata: podAffinityMetadata,
podFitsResourcesMetadata: getPodFitsResourcesMetedata(pod),
podFitsHostPortsMetadata: getPodFitsHostPortsMetadata(pod),
}
for predicateName, precomputeFunc := range predicateMetadataProducers {
klog.V(10).Infof("Precompute: %v", predicateName)
@@ -227,7 +364,39 @@ func GetPredicateMetadata(pod *v1.Pod, nodeNameToInfoMap map[string]*schedulerno
return predicateMetadata
}
func getExistingPodSpreadCache(pod *v1.Pod, nodeInfoMap map[string]*schedulernodeinfo.NodeInfo) (*podSpreadCache, error) {
// getPodFitsHostPortsMetadata builds the host-ports metadata for pod from the
// ports returned by schedutil.GetContainerPorts.
func getPodFitsHostPortsMetadata(pod *v1.Pod) *podFitsHostPortsMetadata {
	ports := schedutil.GetContainerPorts(pod)
	return &podFitsHostPortsMetadata{podPorts: ports}
}
// getPodFitsResourcesMetedata builds the resource-fit metadata for pod from
// GetResourceRequest(pod). ignoredExtendedResources is left at its zero value
// here.
func getPodFitsResourcesMetedata(pod *v1.Pod) *podFitsResourcesMetadata {
	request := GetResourceRequest(pod)
	return &podFitsResourcesMetadata{podRequest: request}
}
// getPodAffinityMetadata computes the three topology-pair maps that make up
// podAffinityMetadata for pod over the given nodes. The first error returned
// by either helper aborts the computation and is returned with a nil result.
func getPodAffinityMetadata(pod *v1.Pod, nodeNameToInfoMap map[string]*schedulernodeinfo.NodeInfo) (*podAffinityMetadata, error) {
	// Used later for an efficient check on existing pods' anti-affinity.
	existingAntiAffinity, err := getTPMapMatchingExistingAntiAffinity(pod, nodeNameToInfoMap)
	if err != nil {
		return nil, err
	}

	// Used later for efficient checks on the incoming pod's affinity and
	// anti-affinity, respectively.
	potentialAffinity, potentialAntiAffinity, err := getTPMapMatchingIncomingAffinityAntiAffinity(pod, nodeNameToInfoMap)
	if err != nil {
		return nil, err
	}

	meta := &podAffinityMetadata{}
	meta.topologyPairsAntiAffinityPodsMap = existingAntiAffinity
	meta.topologyPairsPotentialAffinityPods = potentialAffinity
	meta.topologyPairsPotentialAntiAffinityPods = potentialAntiAffinity
	return meta, nil
}
func getEvenPodsSpreadMetadata(pod *v1.Pod, nodeInfoMap map[string]*schedulernodeinfo.NodeInfo) (*evenPodsSpreadMetadata, error) {
// We have feature gating in APIServer to strip the spec
// so don't need to re-check feature gate, just check length of constraints.
constraints := getHardTopologySpreadConstraints(pod)
@@ -245,7 +414,7 @@ func getExistingPodSpreadCache(pod *v1.Pod, nodeInfoMap map[string]*schedulernod
// TODO(Huang-Wei): It might be possible to use "make(map[topologyPair]*int32)".
// In that case, need to consider how to init each tpPairToCount[pair] in an atomic fashion.
m := podSpreadCache{
m := evenPodsSpreadMetadata{
tpKeyToCriticalPaths: make(map[string]*criticalPaths, len(constraints)),
tpPairToMatchNum: make(map[topologyPair]int32),
}
@@ -396,15 +565,15 @@ func (m *topologyPairsMaps) clone() *topologyPairsMaps {
return copy
}
func (c *podSpreadCache) addPod(addedPod, preemptorPod *v1.Pod, node *v1.Node) error {
func (c *evenPodsSpreadMetadata) addPod(addedPod, preemptorPod *v1.Pod, node *v1.Node) error {
return c.updatePod(addedPod, preemptorPod, node, 1)
}
func (c *podSpreadCache) removePod(deletedPod, preemptorPod *v1.Pod, node *v1.Node) error {
func (c *evenPodsSpreadMetadata) removePod(deletedPod, preemptorPod *v1.Pod, node *v1.Node) error {
return c.updatePod(deletedPod, preemptorPod, node, -1)
}
func (c *podSpreadCache) updatePod(updatedPod, preemptorPod *v1.Pod, node *v1.Node, delta int32) error {
func (c *evenPodsSpreadMetadata) updatePod(updatedPod, preemptorPod *v1.Pod, node *v1.Node, delta int32) error {
if updatedPod.Namespace != preemptorPod.Namespace || node == nil {
return nil
}
@@ -430,12 +599,12 @@ func (c *podSpreadCache) updatePod(updatedPod, preemptorPod *v1.Pod, node *v1.No
return nil
}
func (c *podSpreadCache) clone() *podSpreadCache {
func (c *evenPodsSpreadMetadata) clone() *evenPodsSpreadMetadata {
// c could be nil when EvenPodsSpread feature is disabled
if c == nil {
return nil
}
copy := podSpreadCache{
copy := evenPodsSpreadMetadata{
tpKeyToCriticalPaths: make(map[string]*criticalPaths),
tpPairToMatchNum: make(map[topologyPair]int32),
}
@@ -456,29 +625,13 @@ func (meta *predicateMetadata) RemovePod(deletedPod *v1.Pod, node *v1.Node) erro
if deletedPodFullName == schedutil.GetPodFullName(meta.pod) {
return fmt.Errorf("deletedPod and meta.pod must not be the same")
}
meta.topologyPairsAntiAffinityPodsMap.removePod(deletedPod)
// Delete pod from the matching affinity or anti-affinity topology pairs maps.
meta.topologyPairsPotentialAffinityPods.removePod(deletedPod)
meta.topologyPairsPotentialAntiAffinityPods.removePod(deletedPod)
meta.podAffinityMetadata.removePod(deletedPod)
// Delete pod from the pod spread topology maps.
if err := meta.podSpreadCache.removePod(deletedPod, meta.pod, node); err != nil {
if err := meta.evenPodsSpreadMetadata.removePod(deletedPod, meta.pod, node); err != nil {
return err
}
// All pods in the serviceAffinityMatchingPodList are in the same namespace.
// So, if the namespace of the first one is not the same as the namespace of the
// deletedPod, we don't need to check the list, as deletedPod isn't in the list.
if meta.serviceAffinityInUse &&
len(meta.serviceAffinityMatchingPodList) > 0 &&
deletedPod.Namespace == meta.serviceAffinityMatchingPodList[0].Namespace {
for i, pod := range meta.serviceAffinityMatchingPodList {
if schedutil.GetPodFullName(pod) == deletedPodFullName {
meta.serviceAffinityMatchingPodList = append(
meta.serviceAffinityMatchingPodList[:i],
meta.serviceAffinityMatchingPodList[i+1:]...)
break
}
}
}
meta.serviceAffinityMetadata.removePod(deletedPod, node)
return nil
}
@@ -492,53 +645,18 @@ func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, node *v1.Node) error {
if node == nil {
return fmt.Errorf("node not found")
}
// Add matching anti-affinity terms of the addedPod to the map.
topologyPairsMaps, err := getMatchingAntiAffinityTopologyPairsOfPod(meta.pod, addedPod, node)
if err != nil {
if err := meta.podAffinityMetadata.addPod(addedPod, meta.pod, node); err != nil {
return err
}
meta.topologyPairsAntiAffinityPodsMap.appendMaps(topologyPairsMaps)
// Add the pod to nodeNameToMatchingAffinityPods and nodeNameToMatchingAntiAffinityPods if needed.
affinity := meta.pod.Spec.Affinity
podNodeName := addedPod.Spec.NodeName
if affinity != nil && len(podNodeName) > 0 {
// It is assumed that when the added pod matches affinity of the meta.pod, all the terms must match,
// this should be changed when the implementation of targetPodMatchesAffinityOfPod/podMatchesAffinityTermProperties
// is changed
if targetPodMatchesAffinityOfPod(meta.pod, addedPod) {
affinityTerms := GetPodAffinityTerms(affinity.PodAffinity)
for _, term := range affinityTerms {
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
pair := topologyPair{key: term.TopologyKey, value: topologyValue}
meta.topologyPairsPotentialAffinityPods.addTopologyPair(pair, addedPod)
}
}
}
if targetPodMatchesAntiAffinityOfPod(meta.pod, addedPod) {
antiAffinityTerms := GetPodAntiAffinityTerms(affinity.PodAntiAffinity)
for _, term := range antiAffinityTerms {
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
pair := topologyPair{key: term.TopologyKey, value: topologyValue}
meta.topologyPairsPotentialAntiAffinityPods.addTopologyPair(pair, addedPod)
}
}
}
}
// Update meta.podSpreadCache if meta.pod has hard spread constraints
// Update meta.evenPodsSpreadMetadata if meta.pod has hard spread constraints
// and addedPod matches that
if err := meta.podSpreadCache.addPod(addedPod, meta.pod, node); err != nil {
if err := meta.evenPodsSpreadMetadata.addPod(addedPod, meta.pod, node); err != nil {
return err
}
// If addedPod is in the same namespace as the meta.pod, update the list
// of matching pods if applicable.
if meta.serviceAffinityInUse && addedPod.Namespace == meta.pod.Namespace {
selector := CreateSelectorFromLabels(meta.pod.Labels)
if selector.Matches(labels.Set(addedPod.Labels)) {
meta.serviceAffinityMatchingPodList = append(meta.serviceAffinityMatchingPodList,
addedPod)
}
}
meta.serviceAffinityMetadata.addPod(addedPod, meta.pod, node)
return nil
}
@@ -546,21 +664,14 @@ func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, node *v1.Node) error {
// its maps and slices, but it does not copy the contents of pointer values.
func (meta *predicateMetadata) ShallowCopy() PredicateMetadata {
newPredMeta := &predicateMetadata{
pod: meta.pod,
podBestEffort: meta.podBestEffort,
podRequest: meta.podRequest,
serviceAffinityInUse: meta.serviceAffinityInUse,
ignoredExtendedResources: meta.ignoredExtendedResources,
pod: meta.pod,
podBestEffort: meta.podBestEffort,
}
newPredMeta.podPorts = append([]*v1.ContainerPort(nil), meta.podPorts...)
newPredMeta.topologyPairsPotentialAffinityPods = meta.topologyPairsPotentialAffinityPods.clone()
newPredMeta.topologyPairsPotentialAntiAffinityPods = meta.topologyPairsPotentialAntiAffinityPods.clone()
newPredMeta.topologyPairsAntiAffinityPodsMap = meta.topologyPairsAntiAffinityPodsMap.clone()
newPredMeta.podSpreadCache = meta.podSpreadCache.clone()
newPredMeta.serviceAffinityMatchingPodServices = append([]*v1.Service(nil),
meta.serviceAffinityMatchingPodServices...)
newPredMeta.serviceAffinityMatchingPodList = append([]*v1.Pod(nil),
meta.serviceAffinityMatchingPodList...)
newPredMeta.podFitsHostPortsMetadata = meta.podFitsHostPortsMetadata.clone()
newPredMeta.podAffinityMetadata = meta.podAffinityMetadata.clone()
newPredMeta.evenPodsSpreadMetadata = meta.evenPodsSpreadMetadata.clone()
newPredMeta.serviceAffinityMetadata = meta.serviceAffinityMetadata.clone()
newPredMeta.podFitsResourcesMetadata = meta.podFitsResourcesMetadata.clone()
return (PredicateMetadata)(newPredMeta)
}