Add predicate to find volume matches
@@ -50,7 +50,8 @@ var (
 	ErrNodeNetworkUnavailable = newPredicateFailureError("NodeNetworkUnavailable")
 	ErrNodeUnschedulable      = newPredicateFailureError("NodeUnschedulable")
 	ErrNodeUnknownCondition   = newPredicateFailureError("NodeUnknownCondition")
-	ErrVolumeNodeConflict     = newPredicateFailureError("NoVolumeNodeConflict")
+	ErrVolumeNodeConflict     = newPredicateFailureError("VolumeNodeAffinityConflict")
+	ErrVolumeBindConflict     = newPredicateFailureError("VolumeBindingNoMatch")
 	// ErrFakePredicate is used for test only. The fake predicates returning false also returns error
 	// as ErrFakePredicate.
 	ErrFakePredicate = newPredicateFailureError("FakePredicateError")
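The two reasons introduced here are what the binding predicate added later in this commit reports back to the scheduler. A minimal self-contained sketch of the failure-reason pattern follows; the type below is illustrative only, the real PredicateFailureError lives elsewhere in this package.

package main

import "fmt"

// predicateFailureError is an illustrative stand-in for the package's real
// failure-reason type: it simply carries a stable reason string such as
// "VolumeNodeAffinityConflict" or "VolumeBindingNoMatch".
type predicateFailureError struct {
	reason string
}

func (e *predicateFailureError) GetReason() string { return e.reason }
func (e *predicateFailureError) Error() string     { return e.reason }

func newPredicateFailureError(reason string) *predicateFailureError {
	return &predicateFailureError{reason: reason}
}

func main() {
	errVolumeNodeConflict := newPredicateFailureError("VolumeNodeAffinityConflict")
	errVolumeBindConflict := newPredicateFailureError("VolumeBindingNoMatch")
	fmt.Println(errVolumeNodeConflict.GetReason(), errVolumeBindConflict.GetReason())
}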
@@ -24,12 +24,14 @@ import (
 	"sync"
 
 	"k8s.io/api/core/v1"
+	storagev1 "k8s.io/api/storage/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/rand"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	corelisters "k8s.io/client-go/listers/core/v1"
+	storagelisters "k8s.io/client-go/listers/storage/v1"
 	"k8s.io/client-go/util/workqueue"
 	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
 	v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
@@ -41,13 +43,14 @@ import (
 	priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 	schedutil "k8s.io/kubernetes/plugin/pkg/scheduler/util"
-	"k8s.io/metrics/pkg/client/clientset_generated/clientset"
 
 	"github.com/golang/glog"
+	"k8s.io/kubernetes/plugin/pkg/scheduler/volumebinder"
 )
 
 const (
 	MatchInterPodAffinity = "MatchInterPodAffinity"
+	CheckVolumeBinding    = "CheckVolumeBinding"
 
 	// DefaultMaxGCEPDVolumes defines the maximum number of PD Volumes for GCE
 	// GCE instances can have up to 16 PD volumes attached.
@@ -127,6 +130,19 @@ func (c *CachedNodeInfo) GetNodeInfo(id string) (*v1.Node, error) {
 	return node, nil
 }
 
+type StorageClassInfo interface {
+	GetStorageClassInfo(className string) (*storagev1.StorageClass, error)
+}
+
+// CachedStorageClassInfo implements StorageClassInfo
+type CachedStorageClassInfo struct {
+	storagelisters.StorageClassLister
+}
+
+func (c *CachedStorageClassInfo) GetStorageClassInfo(className string) (*storagev1.StorageClass, error) {
+	return c.Get(className)
+}
+
 func isVolumeConflict(volume v1.Volume, pod *v1.Pod) bool {
 	// fast path if there is no conflict checking targets.
 	if volume.GCEPersistentDisk == nil && volume.AWSElasticBlockStore == nil && volume.RBD == nil && volume.ISCSI == nil {
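CachedStorageClassInfo just delegates to a client-go StorageClassLister, so GetStorageClassInfo is a read from the shared informer cache. A rough sketch of how such a lister could be obtained is shown below; the kubeconfig path and resync period are placeholders, and the wiring into the scheduler's configurator is assumed rather than shown in this diff.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumed setup: build a clientset from a local kubeconfig (placeholder path).
	config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// The scheduler would hand a lister like this to CachedStorageClassInfo.
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	scLister := factory.Storage().V1().StorageClasses().Lister()

	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh)
	factory.WaitForCacheSync(stopCh)

	// Equivalent of CachedStorageClassInfo.GetStorageClassInfo("standard").
	sc, err := scLister.Get("standard")
	fmt.Println(sc, err)
}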
@@ -416,8 +432,9 @@ var AzureDiskVolumeFilter VolumeFilter = VolumeFilter{
 }
 
 type VolumeZoneChecker struct {
-	pvInfo  PersistentVolumeInfo
-	pvcInfo PersistentVolumeClaimInfo
+	pvInfo    PersistentVolumeInfo
+	pvcInfo   PersistentVolumeClaimInfo
+	classInfo StorageClassInfo
 }
 
 // NewVolumeZonePredicate evaluates if a pod can fit due to the volumes it requests, given
@@ -434,10 +451,11 @@ type VolumeZoneChecker struct {
 // determining the zone of a volume during scheduling, and that is likely to
 // require calling out to the cloud provider. It seems that we are moving away
 // from inline volume declarations anyway.
-func NewVolumeZonePredicate(pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo) algorithm.FitPredicate {
+func NewVolumeZonePredicate(pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo, classInfo StorageClassInfo) algorithm.FitPredicate {
 	c := &VolumeZoneChecker{
-		pvInfo:  pvInfo,
-		pvcInfo: pvcInfo,
+		pvInfo:    pvInfo,
+		pvcInfo:   pvcInfo,
+		classInfo: classInfo,
 	}
 	return c.predicate
 }
@@ -489,6 +507,21 @@ func (c *VolumeZoneChecker) predicate(pod *v1.Pod, meta algorithm.PredicateMetad
 
 		pvName := pvc.Spec.VolumeName
 		if pvName == "" {
+			if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
+				scName := pvc.Spec.StorageClassName
+				if scName != nil && len(*scName) > 0 {
+					class, _ := c.classInfo.GetStorageClassInfo(*scName)
+					if class != nil {
+						if class.VolumeBindingMode == nil {
+							return false, nil, fmt.Errorf("VolumeBindingMode not set for StorageClass %q", scName)
+						}
+						if *class.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
+							// Skip unbound volumes
+							continue
+						}
+					}
+				}
+			}
 			return false, nil, fmt.Errorf("PersistentVolumeClaim is not bound: %q", pvcName)
 		}
 
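The branch added above only skips zone-checking for unbound claims whose StorageClass delays binding until a consumer is scheduled. For reference, a storagev1.StorageClass with that mode set looks roughly like the sketch below; the class name and provisioner are illustrative, not taken from this commit.

package main

import (
	"fmt"

	storagev1 "k8s.io/api/storage/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// A class like this is what the new branch treats as "skip for now":
	// the PVC stays unbound until a pod is scheduled, so the zone check is deferred.
	mode := storagev1.VolumeBindingWaitForFirstConsumer
	sc := storagev1.StorageClass{
		ObjectMeta:        metav1.ObjectMeta{Name: "local-storage"}, // illustrative name
		Provisioner:       "kubernetes.io/no-provisioner",           // illustrative provisioner
		VolumeBindingMode: &mode,
	}
	fmt.Printf("%s binds with mode %s\n", sc.Name, *sc.VolumeBindingMode)
}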
@@ -1403,33 +1436,30 @@ func CheckNodeConditionPredicate(pod *v1.Pod, meta algorithm.PredicateMetadata,
 	return len(reasons) == 0, reasons, nil
 }
 
-type VolumeNodeChecker struct {
-	pvInfo  PersistentVolumeInfo
-	pvcInfo PersistentVolumeClaimInfo
-	client  clientset.Interface
+type VolumeBindingChecker struct {
+	binder *volumebinder.VolumeBinder
 }
 
-// NewVolumeNodePredicate evaluates if a pod can fit due to the volumes it requests, given
-// that some volumes have node topology constraints, particularly when using Local PVs.
-// The requirement is that any pod that uses a PVC that is bound to a PV with topology constraints
-// must be scheduled to a node that satisfies the PV's topology labels.
-func NewVolumeNodePredicate(pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo, client clientset.Interface) algorithm.FitPredicate {
-	c := &VolumeNodeChecker{
-		pvInfo:  pvInfo,
-		pvcInfo: pvcInfo,
-		client:  client,
+// NewVolumeBindingPredicate evaluates if a pod can fit due to the volumes it requests,
+// for both bound and unbound PVCs.
+//
+// For PVCs that are bound, then it checks that the corresponding PV's node affinity is
+// satisfied by the given node.
+//
+// For PVCs that are unbound, it tries to find available PVs that can satisfy the PVC requirements
+// and that the PV node affinity is satisfied by the given node.
+//
+// The predicate returns true if all bound PVCs have compatible PVs with the node, and if all unbound
+// PVCs can be matched with an available and node-compatible PV.
+func NewVolumeBindingPredicate(binder *volumebinder.VolumeBinder) algorithm.FitPredicate {
+	c := &VolumeBindingChecker{
+		binder: binder,
 	}
 	return c.predicate
 }
 
-func (c *VolumeNodeChecker) predicate(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
-	if !utilfeature.DefaultFeatureGate.Enabled(features.PersistentLocalVolumes) {
-		return true, nil, nil
-	}
-
-	// If a pod doesn't have any volume attached to it, the predicate will always be true.
-	// Thus we make a fast path for it, to avoid unnecessary computations in this case.
-	if len(pod.Spec.Volumes) == 0 {
+func (c *VolumeBindingChecker) predicate(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
+	if !utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
+		return true, nil, nil
+	}
+
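Elsewhere, the new predicate is presumably registered under the CheckVolumeBinding name defined earlier in this commit. A sketch of what that registration could look like follows; the factory wiring and the args.VolumeBinder field are assumptions, not shown in this diff.

package defaults // assumed location; shown only as a sketch

import (
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
	"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
)

func init() {
	// Register the fit predicate under the CheckVolumeBinding name so scheduler
	// policies can reference it. args.VolumeBinder is assumed to be plumbed
	// through PluginFactoryArgs elsewhere in this change set.
	factory.RegisterFitPredicateFactory(
		predicates.CheckVolumeBinding,
		func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
			return predicates.NewVolumeBindingPredicate(args.VolumeBinder)
		},
	)
}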
@@ -1438,45 +1468,27 @@ func (c *VolumeNodeChecker) predicate(pod *v1.Pod, meta algorithm.PredicateMetad
 		return false, nil, fmt.Errorf("node not found")
 	}
 
-	glog.V(2).Infof("Checking for prebound volumes with node affinity")
-	namespace := pod.Namespace
-	manifest := &(pod.Spec)
-	for i := range manifest.Volumes {
-		volume := &manifest.Volumes[i]
-		if volume.PersistentVolumeClaim == nil {
-			continue
-		}
-		pvcName := volume.PersistentVolumeClaim.ClaimName
-		if pvcName == "" {
-			return false, nil, fmt.Errorf("PersistentVolumeClaim had no name")
-		}
-		pvc, err := c.pvcInfo.GetPersistentVolumeClaimInfo(namespace, pvcName)
-		if err != nil {
-			return false, nil, err
-		}
-
-		if pvc == nil {
-			return false, nil, fmt.Errorf("PersistentVolumeClaim was not found: %q", pvcName)
-		}
-		pvName := pvc.Spec.VolumeName
-		if pvName == "" {
-			return false, nil, fmt.Errorf("PersistentVolumeClaim is not bound: %q", pvcName)
-		}
-
-		pv, err := c.pvInfo.GetPersistentVolumeInfo(pvName)
-		if err != nil {
-			return false, nil, err
-		}
-		if pv == nil {
-			return false, nil, fmt.Errorf("PersistentVolume not found: %q", pvName)
-		}
-
-		err = volumeutil.CheckNodeAffinity(pv, node.Labels)
-		if err != nil {
-			glog.V(2).Infof("Won't schedule pod %q onto node %q due to volume %q node mismatch: %v", pod.Name, node.Name, pvName, err.Error())
-			return false, []algorithm.PredicateFailureReason{ErrVolumeNodeConflict}, nil
-		}
-		glog.V(4).Infof("VolumeNode predicate allows node %q for pod %q due to volume %q", node.Name, pod.Name, pvName)
+	unboundSatisfied, boundSatisfied, err := c.binder.Binder.FindPodVolumes(pod, node.Name)
+	if err != nil {
+		return false, nil, err
+	}
+
+	failReasons := []algorithm.PredicateFailureReason{}
+	if !boundSatisfied {
+		glog.V(5).Infof("Bound PVs not satisfied for pod %v/%v, node %q", pod.Namespace, pod.Name, node.Name)
+		failReasons = append(failReasons, ErrVolumeNodeConflict)
+	}
+
+	if !unboundSatisfied {
+		glog.V(5).Infof("Couldn't find matching PVs for pod %v/%v, node %q", pod.Namespace, pod.Name, node.Name)
+		failReasons = append(failReasons, ErrVolumeBindConflict)
+	}
+
+	if len(failReasons) > 0 {
+		return false, failReasons, nil
+	}
+
+	// All volumes bound or matching PVs found for all unbound PVCs
+	glog.V(5).Infof("All PVCs found matches for pod %v/%v, node %q", pod.Namespace, pod.Name, node.Name)
 	return true, nil, nil
 }
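The predicate above delegates all volume matching to the volume binder via FindPodVolumes. A small self-contained sketch of the contract it relies on is given below; the interface and fake are illustrative stand-ins, not the real volumebinder types.

package main

import "fmt"

// podVolumeFinder is an illustrative stand-in for the contract the predicate
// uses through c.binder.Binder.FindPodVolumes: for a given node it reports
// whether (a) every unbound PVC could be matched to a node-compatible PV and
// (b) every already-bound PVC points at a PV whose node affinity fits the node.
type podVolumeFinder interface {
	FindPodVolumes(podName, nodeName string) (unboundSatisfied, boundSatisfied bool, err error)
}

type fakeFinder struct {
	compatibleNodes map[string]bool // nodeName -> all volumes fit
}

func (f fakeFinder) FindPodVolumes(podName, nodeName string) (bool, bool, error) {
	ok := f.compatibleNodes[nodeName]
	return ok, ok, nil
}

func main() {
	var finder podVolumeFinder = fakeFinder{compatibleNodes: map[string]bool{"node-a": true}}
	for _, node := range []string{"node-a", "node-b"} {
		unbound, bound, _ := finder.FindPodVolumes("default/mypod", node)
		fmt.Printf("%s: unboundSatisfied=%v boundSatisfied=%v\n", node, unbound, bound)
	}
}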
@@ -24,8 +24,10 @@ import (
 	"testing"
 
 	"k8s.io/api/core/v1"
+	storagev1 "k8s.io/api/storage/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
 	kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
@@ -74,6 +76,17 @@ func (pvs FakePersistentVolumeInfo) GetPersistentVolumeInfo(pvID string) (*v1.Pe
 	return nil, fmt.Errorf("Unable to find persistent volume: %s", pvID)
 }
 
+type FakeStorageClassInfo []storagev1.StorageClass
+
+func (classes FakeStorageClassInfo) GetStorageClassInfo(name string) (*storagev1.StorageClass, error) {
+	for _, sc := range classes {
+		if sc.Name == name {
+			return &sc, nil
+		}
+	}
+	return nil, fmt.Errorf("Unable to find storage class: %s", name)
+}
+
 var (
 	opaqueResourceA = v1helper.OpaqueIntResourceName("AAA")
 	opaqueResourceB = v1helper.OpaqueIntResourceName("BBB")
@@ -3834,7 +3847,7 @@ func TestVolumeZonePredicate(t *testing.T) {
 	expectedFailureReasons := []algorithm.PredicateFailureReason{ErrVolumeZoneConflict}
 
 	for _, test := range tests {
-		fit := NewVolumeZonePredicate(pvInfo, pvcInfo)
+		fit := NewVolumeZonePredicate(pvInfo, pvcInfo, nil)
 		node := &schedulercache.NodeInfo{}
 		node.SetNode(test.Node)
 
@@ -3927,7 +3940,7 @@ func TestVolumeZonePredicateMultiZone(t *testing.T) {
 	expectedFailureReasons := []algorithm.PredicateFailureReason{ErrVolumeZoneConflict}
 
 	for _, test := range tests {
-		fit := NewVolumeZonePredicate(pvInfo, pvcInfo)
+		fit := NewVolumeZonePredicate(pvInfo, pvcInfo, nil)
 		node := &schedulercache.NodeInfo{}
 		node.SetNode(test.Node)
 
@@ -3945,6 +3958,130 @@ func TestVolumeZonePredicateMultiZone(t *testing.T) {
 	}
 }
 
+func TestVolumeZonePredicateWithVolumeBinding(t *testing.T) {
+	var (
+		modeWait = storagev1.VolumeBindingWaitForFirstConsumer
+
+		class0         = "Class_0"
+		classWait      = "Class_Wait"
+		classImmediate = "Class_Immediate"
+	)
+
+	classInfo := FakeStorageClassInfo{
+		{
+			ObjectMeta: metav1.ObjectMeta{Name: classImmediate},
+		},
+		{
+			ObjectMeta:        metav1.ObjectMeta{Name: classWait},
+			VolumeBindingMode: &modeWait,
+		},
+	}
+
+	pvInfo := FakePersistentVolumeInfo{
+		{
+			ObjectMeta: metav1.ObjectMeta{Name: "Vol_1", Labels: map[string]string{kubeletapis.LabelZoneFailureDomain: "us-west1-a"}},
+		},
+	}
+
+	pvcInfo := FakePersistentVolumeClaimInfo{
+		{
+			ObjectMeta: metav1.ObjectMeta{Name: "PVC_1", Namespace: "default"},
+			Spec:       v1.PersistentVolumeClaimSpec{VolumeName: "Vol_1"},
+		},
+		{
+			ObjectMeta: metav1.ObjectMeta{Name: "PVC_NoSC", Namespace: "default"},
+			Spec:       v1.PersistentVolumeClaimSpec{StorageClassName: &class0},
+		},
+		{
+			ObjectMeta: metav1.ObjectMeta{Name: "PVC_EmptySC", Namespace: "default"},
+		},
+		{
+			ObjectMeta: metav1.ObjectMeta{Name: "PVC_WaitSC", Namespace: "default"},
+			Spec:       v1.PersistentVolumeClaimSpec{StorageClassName: &classWait},
+		},
+		{
+			ObjectMeta: metav1.ObjectMeta{Name: "PVC_ImmediateSC", Namespace: "default"},
+			Spec:       v1.PersistentVolumeClaimSpec{StorageClassName: &classImmediate},
+		},
+	}
+
+	testNode := &v1.Node{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:   "host1",
+			Labels: map[string]string{kubeletapis.LabelZoneFailureDomain: "us-west1-a", "uselessLabel": "none"},
+		},
+	}
+
+	tests := []struct {
+		Name          string
+		Pod           *v1.Pod
+		Fits          bool
+		Node          *v1.Node
+		ExpectFailure bool
+	}{
+		{
+			Name: "label zone failure domain matched",
+			Pod:  createPodWithVolume("pod_1", "vol_1", "PVC_1"),
+			Node: testNode,
+			Fits: true,
+		},
+		{
+			Name:          "unbound volume empty storage class",
+			Pod:           createPodWithVolume("pod_1", "vol_1", "PVC_EmptySC"),
+			Node:          testNode,
+			Fits:          false,
+			ExpectFailure: true,
+		},
+		{
+			Name:          "unbound volume no storage class",
+			Pod:           createPodWithVolume("pod_1", "vol_1", "PVC_NoSC"),
+			Node:          testNode,
+			Fits:          false,
+			ExpectFailure: true,
+		},
+		{
+			Name:          "unbound volume immediate binding mode",
+			Pod:           createPodWithVolume("pod_1", "vol_1", "PVC_ImmediateSC"),
+			Node:          testNode,
+			Fits:          false,
+			ExpectFailure: true,
+		},
+		{
+			Name: "unbound volume wait binding mode",
+			Pod:  createPodWithVolume("pod_1", "vol_1", "PVC_WaitSC"),
+			Node: testNode,
+			Fits: true,
+		},
+	}
+
+	err := utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true")
+	if err != nil {
+		t.Fatalf("Failed to enable feature gate for VolumeScheduling: %v", err)
+	}
+
+	for _, test := range tests {
+		fit := NewVolumeZonePredicate(pvInfo, pvcInfo, classInfo)
+		node := &schedulercache.NodeInfo{}
+		node.SetNode(test.Node)
+
+		fits, _, err := fit(test.Pod, nil, node)
+		if !test.ExpectFailure && err != nil {
+			t.Errorf("%s: unexpected error: %v", test.Name, err)
+		}
+		if test.ExpectFailure && err == nil {
+			t.Errorf("%s: expected error, got success", test.Name)
+		}
+		if fits != test.Fits {
+			t.Errorf("%s: expected %v got %v", test.Name, test.Fits, fits)
+		}
+	}
+
+	err = utilfeature.DefaultFeatureGate.Set("VolumeScheduling=false")
+	if err != nil {
+		t.Fatalf("Failed to disable feature gate for VolumeScheduling: %v", err)
+	}
+}
+
 func TestGetMaxVols(t *testing.T) {
 	previousValue := os.Getenv(KubeMaxPDVols)
 	defaultValue := 39