Use CSI translation lib in VolumeBinderPredicate

This commit is contained in:
Fabio Bertinatto
2019-11-01 12:06:40 +01:00
parent 4e77a27908
commit 7cba40fb09
6 changed files with 438 additions and 19 deletions

View File

@@ -19,24 +19,40 @@ package scheduling
import (
"fmt"
"sort"
"strings"
"time"
v1 "k8s.io/api/core/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage/etcd3"
utilfeature "k8s.io/apiserver/pkg/util/feature"
coreinformers "k8s.io/client-go/informers/core/v1"
storageinformers "k8s.io/client-go/informers/storage/v1"
storagev1beta1informers "k8s.io/client-go/informers/storage/v1beta1"
clientset "k8s.io/client-go/kubernetes"
storagelisters "k8s.io/client-go/listers/storage/v1"
csitrans "k8s.io/csi-translation-lib"
csiplugins "k8s.io/csi-translation-lib/plugins"
"k8s.io/klog"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
"k8s.io/kubernetes/pkg/controller/volume/scheduling/metrics"
"k8s.io/kubernetes/pkg/features"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
// InTreeToCSITranslator contains the methods the volume binder needs to check
// the migratable status of a PV and to translate in-tree PVs to CSI.
// It is consumed by volumeBinder.tryTranslatePVToCSI.
type InTreeToCSITranslator interface {
// IsPVMigratable is the first check made by tryTranslatePVToCSI; when it
// returns false the PV is used as-is and no translation is attempted.
IsPVMigratable(pv *v1.PersistentVolume) bool
// GetInTreePluginNameFromSpec returns the in-tree plugin name for the given
// spec, or an error. The binder calls it with a PV and a nil vol, and matches
// the returned name against the csiplugins *InTreePluginName constants.
GetInTreePluginNameFromSpec(pv *v1.PersistentVolume, vol *v1.Volume) (string, error)
// TranslateInTreePVToCSI returns the CSI form of pv, or an error.
// NOTE(review): whether the input PV is left unmodified is not visible
// here; confirm against the implementation before relying on it.
TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error)
}
// SchedulerVolumeBinder is used by the scheduler to handle PVC/PV binding
// and dynamic provisioning. The binding decisions are integrated into the pod scheduling
// workflow so that the PV NodeAffinity is also considered along with the pod's other
@@ -103,9 +119,10 @@ type volumeBinder struct {
kubeClient clientset.Interface
classLister storagelisters.StorageClassLister
nodeInformer coreinformers.NodeInformer
pvcCache PVCAssumeCache
pvCache PVAssumeCache
nodeInformer coreinformers.NodeInformer
csiNodeInformer storagev1beta1informers.CSINodeInformer
pvcCache PVCAssumeCache
pvCache PVAssumeCache
// Stores binding decisions that were made in FindPodVolumes for use in AssumePodVolumes.
// AssumePodVolumes modifies the bindings again for use in BindPodVolumes.
@@ -113,12 +130,15 @@ type volumeBinder struct {
// Amount of time to wait for the bind operation to succeed
bindTimeout time.Duration
translator InTreeToCSITranslator
}
// NewVolumeBinder sets up all the caches needed for the scheduler to make volume binding decisions.
func NewVolumeBinder(
kubeClient clientset.Interface,
nodeInformer coreinformers.NodeInformer,
csiNodeInformer storagev1beta1informers.CSINodeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
pvInformer coreinformers.PersistentVolumeInformer,
storageClassInformer storageinformers.StorageClassInformer,
@@ -128,10 +148,12 @@ func NewVolumeBinder(
kubeClient: kubeClient,
classLister: storageClassInformer.Lister(),
nodeInformer: nodeInformer,
csiNodeInformer: csiNodeInformer,
pvcCache: NewPVCAssumeCache(pvcInformer.Informer()),
pvCache: NewPVAssumeCache(pvInformer.Informer()),
podBindingCache: NewPodBindingCache(),
bindTimeout: bindTimeout,
translator: csitrans.New(),
}
return b
@@ -457,6 +479,12 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claim
return false, fmt.Errorf("failed to get node %q: %v", pod.Spec.NodeName, err)
}
csiNode, err := b.csiNodeInformer.Lister().Get(node.Name)
if err != nil {
// TODO: return the error once CSINode is created by default
klog.V(4).Infof("Could not get a CSINode object for the node %q: %v", node.Name, err)
}
// Check for any conditions that might require scheduling retry
// When pod is removed from scheduling queue because of deletion or any
@@ -485,6 +513,11 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claim
return false, nil
}
pv, err = b.tryTranslatePVToCSI(pv, csiNode)
if err != nil {
return false, fmt.Errorf("failed to translate pv to csi: %v", err)
}
// Check PV's node affinity (the node might not have the proper label)
if err := volumeutil.CheckNodeAffinity(pv, node.Labels); err != nil {
return false, fmt.Errorf("pv %q node affinity doesn't match node %q: %v", pv.Name, node.Name, err)
@@ -538,6 +571,12 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claim
}
return false, fmt.Errorf("failed to get pv %q from cache: %v", pvc.Spec.VolumeName, err)
}
pv, err = b.tryTranslatePVToCSI(pv, csiNode)
if err != nil {
return false, err
}
if err := volumeutil.CheckNodeAffinity(pv, node.Labels); err != nil {
return false, fmt.Errorf("pv %q node affinity doesn't match node %q: %v", pv.Name, node.Name, err)
}
@@ -641,6 +680,12 @@ func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentV
}
func (b *volumeBinder) checkBoundClaims(claims []*v1.PersistentVolumeClaim, node *v1.Node, podName string) (bool, error) {
csiNode, err := b.csiNodeInformer.Lister().Get(node.Name)
if err != nil {
// TODO: return the error once CSINode is created by default
klog.V(4).Infof("Could not get a CSINode object for the node %q: %v", node.Name, err)
}
for _, pvc := range claims {
pvName := pvc.Spec.VolumeName
pv, err := b.pvCache.GetPV(pvName)
@@ -648,6 +693,11 @@ func (b *volumeBinder) checkBoundClaims(claims []*v1.PersistentVolumeClaim, node
return false, err
}
pv, err = b.tryTranslatePVToCSI(pv, csiNode)
if err != nil {
return false, err
}
err = volumeutil.CheckNodeAffinity(pv, node.Labels)
if err != nil {
klog.V(4).Infof("PersistentVolume %q, Node %q mismatch for Pod %q: %v", pvName, node.Name, podName, err)
@@ -783,3 +833,72 @@ func (a byPVCSize) Less(i, j int) bool {
// claimToClaimKey builds the "<namespace>/<name>" key that identifies claim.
func claimToClaimKey(claim *v1.PersistentVolumeClaim) string {
	return claim.Namespace + "/" + claim.Name
}
// isCSIMigrationOnForPlugin checks if CSI migrartion is enabled for a given plugin.
func isCSIMigrationOnForPlugin(pluginName string) bool {
switch pluginName {
case csiplugins.AWSEBSInTreePluginName:
return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAWS)
case csiplugins.GCEPDInTreePluginName:
return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationGCE)
case csiplugins.AzureDiskInTreePluginName:
return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk)
case csiplugins.CinderInTreePluginName:
return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationOpenStack)
}
return false
}
// isPluginMigratedToCSIOnNode reports whether pluginName is listed in the
// comma-separated v1.MigratedPluginsAnnotationKey annotation of csiNode.
// A nil csiNode, or a missing or empty annotation, yields false.
func isPluginMigratedToCSIOnNode(pluginName string, csiNode *storagev1beta1.CSINode) bool {
	if csiNode == nil {
		return false
	}

	// Indexing a nil annotation map is safe and yields "", so the nil-map
	// and missing-key cases are both covered by the emptiness check below.
	migrated := csiNode.GetAnnotations()[v1.MigratedPluginsAnnotationKey]
	if migrated == "" {
		// Must bail out here: splitting "" would produce a single empty
		// entry, which would wrongly match an empty pluginName.
		return false
	}

	return sets.NewString(strings.Split(migrated, ",")...).Has(pluginName)
}
// tryTranslatePVToCSI returns the CSI translation of pv when all of the
// following hold, checked in this order:
//   - the translator considers pv migratable,
//   - the CSIMigration feature gate is enabled,
//   - the plugin-specific migration gate is enabled for pv's in-tree plugin,
//   - csiNode's annotations list that plugin as migrated.
//
// If any check fails, the original pv is returned untouched. A nil PV and a
// non-nil error are returned when the plugin name cannot be determined or the
// translation itself fails.
func (b *volumeBinder) tryTranslatePVToCSI(pv *v1.PersistentVolume, csiNode *storagev1beta1.CSINode) (*v1.PersistentVolume, error) {
	// Short-circuit keeps the original evaluation order: the translator is
	// asked first, the global gate is only read for migratable PVs.
	if !b.translator.IsPVMigratable(pv) || !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) {
		return pv, nil
	}

	inTreePlugin, nameErr := b.translator.GetInTreePluginNameFromSpec(pv, nil)
	if nameErr != nil {
		return nil, fmt.Errorf("could not get plugin name from pv: %v", nameErr)
	}

	// Migration must be switched on for the plugin and reported by the node.
	if !isCSIMigrationOnForPlugin(inTreePlugin) || !isPluginMigratedToCSIOnNode(inTreePlugin, csiNode) {
		return pv, nil
	}

	csiPV, transErr := b.translator.TranslateInTreePVToCSI(pv)
	if transErr != nil {
		return nil, fmt.Errorf("could not translate pv: %v", transErr)
	}
	return csiPV, nil
}