feat(scheduling): implement azure, cinder, ebs and gce as filter plugin

This commit is contained in:
draveness
2019-12-27 16:53:28 +08:00
parent f1cbbda291
commit 320ac4e277
17 changed files with 1387 additions and 2873 deletions

View File

@@ -21,17 +21,11 @@ go_library(
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/rand:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library",
"//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
@@ -39,7 +33,6 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"max_attachable_volume_predicate_test.go",
"predicates_test.go",
"utils_test.go",
],
@@ -48,19 +41,14 @@ go_test(
"//pkg/apis/core:go_default_library",
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/features:go_default_library",
"//pkg/scheduler/listers/fake:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
"//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
],
)

View File

@@ -18,28 +18,19 @@ package predicates
import (
"fmt"
"os"
"regexp"
"strconv"
"k8s.io/klog"
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
corelisters "k8s.io/client-go/listers/core/v1"
storagelisters "k8s.io/client-go/listers/storage/v1"
csilibplugins "k8s.io/csi-translation-lib/plugins"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
const (
@@ -92,25 +83,8 @@ const (
// EvenPodsSpreadPred defines the name of predicate EvenPodsSpread.
EvenPodsSpreadPred = "EvenPodsSpread"
// DefaultMaxGCEPDVolumes defines the maximum number of PD Volumes for GCE.
// GCE instances can have up to 16 PD volumes attached.
DefaultMaxGCEPDVolumes = 16
// DefaultMaxAzureDiskVolumes defines the maximum number of PD Volumes for Azure.
// Larger Azure VMs can actually have much more disks attached.
// TODO We should determine the max based on VM size
DefaultMaxAzureDiskVolumes = 16
// KubeMaxPDVols defines the maximum number of PD Volumes per kubelet.
KubeMaxPDVols = "KUBE_MAX_PD_VOLS"
// EBSVolumeFilterType defines the filter name for EBSVolumeFilter.
EBSVolumeFilterType = "EBS"
// GCEPDVolumeFilterType defines the filter name for GCEPDVolumeFilter.
GCEPDVolumeFilterType = "GCE"
// AzureDiskVolumeFilterType defines the filter name for AzureDiskVolumeFilter.
AzureDiskVolumeFilterType = "AzureDisk"
// CinderVolumeFilterType defines the filter name for CinderVolumeFilter.
CinderVolumeFilterType = "Cinder"
)
// IMPORTANT NOTE for predicate developers:
@@ -145,394 +119,6 @@ func Ordering() []string {
// The failure information is given by the error.
type FitPredicate func(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error)
// MaxPDVolumeCountChecker contains information to check the max number of volumes for a predicate.
// It is populated by NewMaxPDVolumeCountPredicate, which returns its predicate method.
type MaxPDVolumeCountChecker struct {
// filter decides which volumes and persistent volumes are counted and
// whether the plugin is handled by a CSI driver instead.
filter VolumeFilter
// volumeLimitKey is the key looked up in nodeInfo.VolumeLimits(); when the
// key is present its value overrides the result of maxVolumeFunc.
volumeLimitKey v1.ResourceName
// maxVolumeFunc returns the attach limit used for a node when the node's
// volume limits do not contain volumeLimitKey.
maxVolumeFunc func(node *v1.Node) int
// csiNodeLister fetches the CSINode passed to filter.IsMigrated; it may be
// nil, in which case a nil CSINode is passed.
csiNodeLister storagelisters.CSINodeLister
// pvLister resolves the PersistentVolume a claim is bound to.
pvLister corelisters.PersistentVolumeLister
// pvcLister resolves the PersistentVolumeClaims referenced by pod volumes.
pvcLister corelisters.PersistentVolumeClaimLister
// scLister resolves a claim's StorageClass for filter.MatchProvisioner.
scLister storagelisters.StorageClassLister
// The string below is generated randomly during the struct's initialization.
// It is used to prefix volumeID generated inside the predicate() method to
// avoid conflicts with any real volume.
randomVolumeIDPrefix string
}
// VolumeFilter contains information on how to filter PD Volumes when checking PD Volume caps.
// Every function field is called unconditionally by MaxPDVolumeCountChecker, so all of them
// must be set.
type VolumeFilter struct {
// Filter normal volumes
// FilterVolume returns the volume's ID and true when the inline pod volume
// is of the type this filter counts.
FilterVolume func(vol *v1.Volume) (id string, relevant bool)
// FilterPersistentVolume returns the volume's ID and true when the
// persistent volume is of the type this filter counts.
FilterPersistentVolume func(pv *v1.PersistentVolume) (id string, relevant bool)
// MatchProvisioner evaluates if the StorageClass provisioner matches the running predicate
MatchProvisioner func(sc *storage.StorageClass) (relevant bool)
// IsMigrated returns a boolean specifying whether the plugin is migrated to a CSI driver
// The CSINode argument may be nil when no CSINode could be fetched for the node.
IsMigrated func(csiNode *storage.CSINode) bool
}
// NewMaxPDVolumeCountPredicate creates a predicate which evaluates whether a pod can fit based on the
// number of volumes which match a filter that it requests, and those that are already present.
//
// The predicate looks for both volumes used directly, as well as PVC volumes that are backed by relevant volume
// types, counts the number of unique volumes, and rejects the new pod if it would place the total count over
// the maximum.
//
// filterName must be one of EBSVolumeFilterType, GCEPDVolumeFilterType, AzureDiskVolumeFilterType or
// CinderVolumeFilterType; any other value is reported through klog.Fatalf. csiNodeLister may be nil, in
// which case the CSI migration check is performed without a CSINode object.
//
// Deprecated: All cloudprovider specific predicates defined here are deprecated in favour of the CSI
// volume limit predicate - MaxCSIVolumeCountPred.
func NewMaxPDVolumeCountPredicate(filterName string, csiNodeLister storagelisters.CSINodeLister, scLister storagelisters.StorageClassLister,
	pvLister corelisters.PersistentVolumeLister, pvcLister corelisters.PersistentVolumeClaimLister) FitPredicate {
	var filter VolumeFilter
	var volumeLimitKey v1.ResourceName

	switch filterName {
	case EBSVolumeFilterType:
		filter = EBSVolumeFilter
		volumeLimitKey = v1.ResourceName(volumeutil.EBSVolumeLimitKey)
	case GCEPDVolumeFilterType:
		filter = GCEPDVolumeFilter
		volumeLimitKey = v1.ResourceName(volumeutil.GCEVolumeLimitKey)
	case AzureDiskVolumeFilterType:
		filter = AzureDiskVolumeFilter
		volumeLimitKey = v1.ResourceName(volumeutil.AzureVolumeLimitKey)
	case CinderVolumeFilterType:
		filter = CinderVolumeFilter
		volumeLimitKey = v1.ResourceName(volumeutil.CinderVolumeLimitKey)
	default:
		// List every supported filter type, including Cinder, so the message
		// matches the cases handled above.
		klog.Fatalf("Wrong filterName, Only Support %v %v %v %v", EBSVolumeFilterType,
			GCEPDVolumeFilterType, AzureDiskVolumeFilterType, CinderVolumeFilterType)
		return nil
	}

	c := &MaxPDVolumeCountChecker{
		filter:         filter,
		volumeLimitKey: volumeLimitKey,
		maxVolumeFunc:  getMaxVolumeFunc(filterName),
		csiNodeLister:  csiNodeLister,
		pvLister:       pvLister,
		pvcLister:      pvcLister,
		scLister:       scLister,
		// Random prefix for placeholder volume IDs so they cannot collide
		// with IDs of real volumes.
		randomVolumeIDPrefix: rand.String(32),
	}

	return c.predicate
}
// getMaxVolumeFunc returns a function that computes the default attach limit
// for a node and the given filter type.
//
// The returned function first honours a positive limit from the environment
// (see getMaxVolLimitFromEnv). Otherwise it returns the per-filter default;
// for EBS the default depends on the node's instance-type label. An unknown
// filterName yields -1.
func getMaxVolumeFunc(filterName string) func(node *v1.Node) int {
	return func(node *v1.Node) int {
		if maxVolumesFromEnv := getMaxVolLimitFromEnv(); maxVolumesFromEnv > 0 {
			return maxVolumesFromEnv
		}

		// Look the labels up by key instead of ranging over the map: map
		// iteration order is random, so ranging could pick either label when
		// both are set. The stable label wins when present.
		nodeLabels := node.ObjectMeta.Labels
		nodeInstanceType, found := nodeLabels[v1.LabelInstanceTypeStable]
		if !found {
			nodeInstanceType = nodeLabels[v1.LabelInstanceType]
		}

		switch filterName {
		case EBSVolumeFilterType:
			return getMaxEBSVolume(nodeInstanceType)
		case GCEPDVolumeFilterType:
			return DefaultMaxGCEPDVolumes
		case AzureDiskVolumeFilterType:
			return DefaultMaxAzureDiskVolumes
		case CinderVolumeFilterType:
			return volumeutil.DefaultMaxCinderVolumes
		default:
			return -1
		}
	}
}
// ebsNitroLimitRegexp is compiled once at package initialisation so that
// getMaxEBSVolume does not recompile the pattern on every call.
var ebsNitroLimitRegexp = regexp.MustCompile(volumeutil.EBSNitroLimitRegex)

// getMaxEBSVolume returns the default EBS attach limit for the given node
// instance type: volumeutil.DefaultMaxEBSNitroVolumeLimit when the type
// matches volumeutil.EBSNitroLimitRegex, volumeutil.DefaultMaxEBSVolumes
// otherwise (including for an empty instance type that does not match).
func getMaxEBSVolume(nodeInstanceType string) int {
	if ebsNitroLimitRegexp.MatchString(nodeInstanceType) {
		return volumeutil.DefaultMaxEBSNitroVolumeLimit
	}
	return volumeutil.DefaultMaxEBSVolumes
}
// getMaxVolLimitFromEnv reads the KubeMaxPDVols environment variable and
// returns its value when it parses as a positive integer. It returns -1 when
// the variable is empty or unset, cannot be parsed, or is not positive; the
// latter two cases are logged as errors.
func getMaxVolLimitFromEnv() int {
	raw := os.Getenv(KubeMaxPDVols)
	if raw == "" {
		return -1
	}

	limit, err := strconv.Atoi(raw)
	switch {
	case err != nil:
		klog.Errorf("Unable to parse maximum PD volumes value, using default: %v", err)
	case limit <= 0:
		klog.Errorf("Maximum PD volumes must be a positive value, using default")
	default:
		return limit
	}
	return -1
}
// filterVolumes records in filteredVolumes the ID of every volume in volumes
// that is relevant to this checker's filter. Inline volumes are matched with
// filter.FilterVolume; PVC-backed volumes are resolved through the PVC and PV
// listers and matched with filter.FilterPersistentVolume. It returns an error
// only when a PVC volume source has an empty claim name.
func (c *MaxPDVolumeCountChecker) filterVolumes(volumes []v1.Volume, namespace string, filteredVolumes map[string]bool) error {
	for idx := range volumes {
		volume := &volumes[idx]

		// Inline volume of the relevant type: count it by its real ID.
		if volumeID, relevant := c.filter.FilterVolume(volume); relevant {
			filteredVolumes[volumeID] = true
			continue
		}

		claimSource := volume.PersistentVolumeClaim
		if claimSource == nil {
			continue
		}

		claimName := claimSource.ClaimName
		if claimName == "" {
			return fmt.Errorf("PersistentVolumeClaim had no name")
		}

		// Placeholder used while the real volume ID is unknown: the
		// namespace/claim name with the checker's random prefix, so it cannot
		// clash with the ID of a real volume.
		placeholderID := fmt.Sprintf("%s-%s/%s", c.randomVolumeIDPrefix, namespace, claimName)

		claim, err := c.pvcLister.PersistentVolumeClaims(namespace).Get(claimName)
		if err != nil || claim == nil {
			// An unresolvable PVC is not counted: nothing shows that it
			// belongs to the running predicate.
			klog.V(4).Infof("Unable to look up PVC info for %s/%s, assuming PVC doesn't match predicate when counting limits: %v", namespace, claimName, err)
			continue
		}

		boundPVName := claim.Spec.VolumeName
		if boundPVName == "" {
			// The claim is unbound. The pod may still be using the PV it was
			// originally bound to, so count it when the claim's storage class
			// belongs to the running predicate.
			if c.matchProvisioner(claim) {
				klog.V(4).Infof("PVC %s/%s is not bound, assuming PVC matches predicate when counting limits", namespace, claimName)
				filteredVolumes[placeholderID] = true
			}
			continue
		}

		boundPV, err := c.pvLister.Get(boundPVName)
		if err != nil || boundPV == nil {
			// The PV cannot be resolved; count it against the limit when the
			// claim's storage class belongs to the running predicate.
			if c.matchProvisioner(claim) {
				klog.V(4).Infof("Unable to look up PV info for %s/%s/%s, assuming PV matches predicate when counting limits: %v", namespace, claimName, boundPVName, err)
				filteredVolumes[placeholderID] = true
			}
			continue
		}

		if volumeID, relevant := c.filter.FilterPersistentVolume(boundPV); relevant {
			filteredVolumes[volumeID] = true
		}
	}

	return nil
}
// matchProvisioner reports whether the PVC's storage class is accepted by the
// filter's MatchProvisioner. It returns false when the PVC names no storage
// class or when the storage class cannot be fetched from the lister.
func (c *MaxPDVolumeCountChecker) matchProvisioner(pvc *v1.PersistentVolumeClaim) bool {
	className := pvc.Spec.StorageClassName
	if className == nil {
		return false
	}

	if sc, err := c.scLister.Get(*className); err == nil && sc != nil {
		return c.filter.MatchProvisioner(sc)
	}
	return false
}
// predicate reports whether pod fits on the node described by nodeInfo without
// exceeding the attach limit for the volume type selected by c.filter.
//
// It returns (true, nil, nil) when the pod requests no relevant volumes, when
// the plugin is migrated to CSI on the node, or when the unique relevant
// volumes already on the node plus the pod's new ones stay within the limit.
// It returns ErrMaxVolumeCountExceeded as the failure reason otherwise, and an
// error when volumes cannot be filtered or the node is missing.
func (c *MaxPDVolumeCountChecker) predicate(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	// Fast path: a pod without volumes always fits.
	podVolumes := pod.Spec.Volumes
	if len(podVolumes) == 0 {
		return true, nil, nil
	}

	requested := make(map[string]bool)
	if err := c.filterVolumes(podVolumes, pod.Namespace, requested); err != nil {
		return false, nil, err
	}
	// None of the pod's volumes are relevant to this filter.
	if len(requested) == 0 {
		return true, nil, nil
	}

	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}

	var csiNode *storage.CSINode
	if c.csiNodeLister != nil {
		var err error
		if csiNode, err = c.csiNodeLister.Get(node.Name); err != nil {
			// Not fatal: the CSINode is only needed to determine whether
			// migration is enabled.
			klog.V(5).Infof("Could not get a CSINode object for the node: %v", err)
		}
	}

	// A plugin migrated to a CSI driver is left to the CSI predicate.
	if c.filter.IsMigrated(csiNode) {
		return true, nil, nil
	}

	// Collect the unique relevant volumes of the pods already on the node.
	attached := make(map[string]bool)
	for _, scheduledPod := range nodeInfo.Pods() {
		if err := c.filterVolumes(scheduledPod.Spec.Volumes, scheduledPod.Namespace, attached); err != nil {
			return false, nil, err
		}
	}

	// Volumes already present on the node are not new; deleting a missing key
	// is a no-op, so no membership check is needed.
	for volumeID := range attached {
		delete(requested, volumeID)
	}
	attachedCount, requestedCount := len(attached), len(requested)

	// Start from the computed default and let the node's reported volume
	// limit, when present, take precedence.
	limit := c.maxVolumeFunc(node)
	if limitFromAllocatable, found := nodeInfo.VolumeLimits()[c.volumeLimitKey]; found {
		limit = int(limitFromAllocatable)
	}

	if attachedCount+requestedCount > limit {
		// The pod would push the node over its attach limit for this volume type.
		return false, []PredicateFailureReason{ErrMaxVolumeCountExceeded}, nil
	}

	if nodeInfo != nil && nodeInfo.TransientInfo != nil && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) {
		transient := nodeInfo.TransientInfo
		transient.TransientLock.Lock()
		defer transient.TransientLock.Unlock()
		transient.TransNodeInfo.AllocatableVolumesCount = limit - attachedCount
		transient.TransNodeInfo.RequestedVolumes = requestedCount
	}

	return true, nil, nil
}
// EBSVolumeFilter is the VolumeFilter that selects AWS ElasticBlockStore
// volumes, identifying each by its VolumeID.
var EBSVolumeFilter = VolumeFilter{
	FilterVolume: func(vol *v1.Volume) (string, bool) {
		ebs := vol.AWSElasticBlockStore
		if ebs == nil {
			return "", false
		}
		return ebs.VolumeID, true
	},

	FilterPersistentVolume: func(pv *v1.PersistentVolume) (string, bool) {
		ebs := pv.Spec.AWSElasticBlockStore
		if ebs == nil {
			return "", false
		}
		return ebs.VolumeID, true
	},

	MatchProvisioner: func(sc *storage.StorageClass) bool {
		return sc.Provisioner == csilibplugins.AWSEBSInTreePluginName
	},

	IsMigrated: func(csiNode *storage.CSINode) bool {
		return isCSIMigrationOn(csiNode, csilibplugins.AWSEBSInTreePluginName)
	},
}
// GCEPDVolumeFilter is the VolumeFilter that selects GCE PersistentDisk
// volumes, identifying each by its PDName.
var GCEPDVolumeFilter = VolumeFilter{
	FilterVolume: func(vol *v1.Volume) (string, bool) {
		pd := vol.GCEPersistentDisk
		if pd == nil {
			return "", false
		}
		return pd.PDName, true
	},

	FilterPersistentVolume: func(pv *v1.PersistentVolume) (string, bool) {
		pd := pv.Spec.GCEPersistentDisk
		if pd == nil {
			return "", false
		}
		return pd.PDName, true
	},

	MatchProvisioner: func(sc *storage.StorageClass) bool {
		return sc.Provisioner == csilibplugins.GCEPDInTreePluginName
	},

	IsMigrated: func(csiNode *storage.CSINode) bool {
		return isCSIMigrationOn(csiNode, csilibplugins.GCEPDInTreePluginName)
	},
}
// AzureDiskVolumeFilter is the VolumeFilter that selects Azure Disk volumes,
// identifying each by its DiskName.
var AzureDiskVolumeFilter = VolumeFilter{
	FilterVolume: func(vol *v1.Volume) (string, bool) {
		disk := vol.AzureDisk
		if disk == nil {
			return "", false
		}
		return disk.DiskName, true
	},

	FilterPersistentVolume: func(pv *v1.PersistentVolume) (string, bool) {
		disk := pv.Spec.AzureDisk
		if disk == nil {
			return "", false
		}
		return disk.DiskName, true
	},

	MatchProvisioner: func(sc *storage.StorageClass) bool {
		return sc.Provisioner == csilibplugins.AzureDiskInTreePluginName
	},

	IsMigrated: func(csiNode *storage.CSINode) bool {
		return isCSIMigrationOn(csiNode, csilibplugins.AzureDiskInTreePluginName)
	},
}
// CinderVolumeFilter is the VolumeFilter that selects Cinder volumes,
// identifying each by its VolumeID.
// It will be deprecated once Openstack cloudprovider has been removed from in-tree.
var CinderVolumeFilter = VolumeFilter{
	FilterVolume: func(vol *v1.Volume) (string, bool) {
		cinder := vol.Cinder
		if cinder == nil {
			return "", false
		}
		return cinder.VolumeID, true
	},

	FilterPersistentVolume: func(pv *v1.PersistentVolume) (string, bool) {
		cinder := pv.Spec.Cinder
		if cinder == nil {
			return "", false
		}
		return cinder.VolumeID, true
	},

	MatchProvisioner: func(sc *storage.StorageClass) bool {
		return sc.Provisioner == csilibplugins.CinderInTreePluginName
	},

	IsMigrated: func(csiNode *storage.CSINode) bool {
		return isCSIMigrationOn(csiNode, csilibplugins.CinderInTreePluginName)
	},
}
// GetResourceRequest returns a *schedulernodeinfo.Resource that covers the largest
// width in each resource dimension. Because init-containers run sequentially, we collect
// the max in each dimension iteratively. In contrast, we sum the resource vectors for

View File

@@ -17,7 +17,6 @@ limitations under the License.
package predicates
import (
"os"
"reflect"
"strconv"
"strings"
@@ -1719,44 +1718,3 @@ func TestPodToleratesTaints(t *testing.T) {
})
}
}
// TestGetMaxVols verifies that getMaxVolLimitFromEnv returns the value of the
// KubeMaxPDVols environment variable when it is a positive integer and -1
// when it is unparsable or not positive.
func TestGetMaxVols(t *testing.T) {
	// Use LookupEnv so "unset" and "set to empty" are told apart, and restore
	// in a defer so the environment is put back even if the test panics.
	previousValue, wasSet := os.LookupEnv(KubeMaxPDVols)
	defer func() {
		var err error
		if wasSet {
			err = os.Setenv(KubeMaxPDVols, previousValue)
		} else {
			err = os.Unsetenv(KubeMaxPDVols)
		}
		if err != nil {
			t.Errorf("restoring %s: %v", KubeMaxPDVols, err)
		}
	}()

	tests := []struct {
		rawMaxVols string
		expected   int
		name       string
	}{
		{
			rawMaxVols: "invalid",
			expected:   -1,
			name:       "Unable to parse maximum PD volumes value, using default value",
		},
		{
			rawMaxVols: "-2",
			expected:   -1,
			name:       "Maximum PD volumes must be a positive value, using default value",
		},
		{
			rawMaxVols: "40",
			expected:   40,
			name:       "Parse maximum PD volumes value from env",
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			if err := os.Setenv(KubeMaxPDVols, test.rawMaxVols); err != nil {
				t.Fatalf("setting %s: %v", KubeMaxPDVols, err)
			}
			result := getMaxVolLimitFromEnv()
			if result != test.expected {
				t.Errorf("expected %v got %v", test.expected, result)
			}
		})
	}
}

View File

@@ -17,15 +17,8 @@ limitations under the License.
package predicates
import (
"strings"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
csilibplugins "k8s.io/csi-translation-lib/plugins"
"k8s.io/kubernetes/pkg/features"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
@@ -94,56 +87,3 @@ func SetPredicatesOrderingDuringTest(value []string) func() {
predicatesOrdering = origVal
}
}
// isCSIMigrationOn reports whether CSI migration is in effect for the in-tree
// plugin pluginName on the node described by csiNode. That requires all of:
// a non-nil csiNode and non-empty pluginName, the CSIMigration feature gate,
// the plugin-specific migration feature gate, and the plugin name appearing in
// the comma-separated v1.MigratedPluginsAnnotationKey annotation of csiNode.
// Unknown plugin names always yield false.
func isCSIMigrationOn(csiNode *storagev1.CSINode, pluginName string) bool {
	if csiNode == nil || len(pluginName) == 0 {
		return false
	}

	// The general in-tree-to-CSI migration gate must be on in addition to the
	// plugin-specific one.
	if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) {
		return false
	}

	var pluginGateEnabled bool
	switch pluginName {
	case csilibplugins.AWSEBSInTreePluginName:
		pluginGateEnabled = utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAWS)
	case csilibplugins.GCEPDInTreePluginName:
		pluginGateEnabled = utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationGCE)
	case csilibplugins.AzureDiskInTreePluginName:
		pluginGateEnabled = utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk)
	case csilibplugins.CinderInTreePluginName:
		pluginGateEnabled = utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationOpenStack)
	}
	if !pluginGateEnabled {
		return false
	}

	// The plugin must be listed in the CSINode annotation. Indexing a nil
	// annotation map yields the empty string, which is handled below.
	migratedPlugins := csiNode.GetAnnotations()[v1.MigratedPluginsAnnotationKey]
	if len(migratedPlugins) == 0 {
		return false
	}

	return sets.NewString(strings.Split(migratedPlugins, ",")...).Has(pluginName)
}