Merge pull request #83098 from ddebroy/disable-intree
CSI Migration phase 2: disable probing of in-tree plugins
This commit is contained in:
@@ -20,6 +20,7 @@ go_library(
|
||||
"//pkg/features:go_default_library",
|
||||
"//pkg/util/mount:go_default_library",
|
||||
"//pkg/volume:go_default_library",
|
||||
"//pkg/volume/csimigration:go_default_library",
|
||||
"//pkg/volume/util:go_default_library",
|
||||
"//pkg/volume/util/operationexecutor:go_default_library",
|
||||
"//pkg/volume/util/subpath:go_default_library",
|
||||
@@ -45,6 +46,7 @@ go_library(
|
||||
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
|
||||
"//staging/src/k8s.io/cloud-provider:go_default_library",
|
||||
"//staging/src/k8s.io/csi-translation-lib:go_default_library",
|
||||
"//vendor/k8s.io/klog:go_default_library",
|
||||
"//vendor/k8s.io/utils/exec:go_default_library",
|
||||
],
|
||||
|
@@ -44,6 +44,7 @@ import (
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
cloudprovider "k8s.io/cloud-provider"
|
||||
csitrans "k8s.io/csi-translation-lib"
|
||||
"k8s.io/klog"
|
||||
utilexec "k8s.io/utils/exec"
|
||||
|
||||
@@ -56,6 +57,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/util/mount"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
"k8s.io/kubernetes/pkg/volume/csimigration"
|
||||
volumeutil "k8s.io/kubernetes/pkg/volume/util"
|
||||
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
|
||||
"k8s.io/kubernetes/pkg/volume/util/subpath"
|
||||
@@ -116,6 +118,7 @@ func NewAttachDetachController(
|
||||
disableReconciliationSync bool,
|
||||
reconcilerSyncDuration time.Duration,
|
||||
timerConfig TimerConfig) (AttachDetachController, error) {
|
||||
|
||||
adc := &attachDetachController{
|
||||
kubeClient: kubeClient,
|
||||
pvcLister: pvcInformer.Lister(),
|
||||
@@ -176,6 +179,10 @@ func NewAttachDetachController(
|
||||
adc.nodeStatusUpdater,
|
||||
recorder)
|
||||
|
||||
csiTranslator := csitrans.New()
|
||||
adc.intreeToCSITranslator = csiTranslator
|
||||
adc.csiMigratedPluginManager = csimigration.NewPluginManager(csiTranslator)
|
||||
|
||||
adc.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
|
||||
timerConfig.DesiredStateOfWorldPopulatorLoopSleepPeriod,
|
||||
timerConfig.DesiredStateOfWorldPopulatorListPodsRetryDuration,
|
||||
@@ -183,7 +190,9 @@ func NewAttachDetachController(
|
||||
adc.desiredStateOfWorld,
|
||||
&adc.volumePluginMgr,
|
||||
pvcInformer.Lister(),
|
||||
pvInformer.Lister())
|
||||
pvInformer.Lister(),
|
||||
adc.csiMigratedPluginManager,
|
||||
adc.intreeToCSITranslator)
|
||||
|
||||
podInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
|
||||
AddFunc: adc.podAdd,
|
||||
@@ -318,6 +327,12 @@ type attachDetachController struct {
|
||||
|
||||
// pvcQueue is used to queue pvc objects
|
||||
pvcQueue workqueue.RateLimitingInterface
|
||||
|
||||
// csiMigratedPluginManager detects in-tree plugins that have been migrated to CSI
|
||||
csiMigratedPluginManager csimigration.PluginManager
|
||||
|
||||
// intreeToCSITranslator translates from in-tree volume specs to CSI
|
||||
intreeToCSITranslator csimigration.InTreeToCSITranslator
|
||||
}
|
||||
|
||||
func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
|
||||
@@ -355,7 +370,9 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
|
||||
adc.podLister,
|
||||
adc.actualStateOfWorld,
|
||||
adc.desiredStateOfWorld,
|
||||
&adc.volumePluginMgr)
|
||||
&adc.volumePluginMgr,
|
||||
adc.csiMigratedPluginManager,
|
||||
adc.intreeToCSITranslator)
|
||||
|
||||
<-stopCh
|
||||
}
|
||||
@@ -421,10 +438,11 @@ func (adc *attachDetachController) populateDesiredStateOfWorld() error {
|
||||
podToAdd := pod
|
||||
adc.podAdd(podToAdd)
|
||||
for _, podVolume := range podToAdd.Spec.Volumes {
|
||||
nodeName := types.NodeName(podToAdd.Spec.NodeName)
|
||||
// The volume specs present in the ActualStateOfWorld are nil, let's replace those
|
||||
// with the correct ones found on pods. The present in the ASW with no corresponding
|
||||
// pod will be detached and the spec is irrelevant.
|
||||
volumeSpec, err := util.CreateVolumeSpec(podVolume, podToAdd.Namespace, adc.pvcLister, adc.pvLister)
|
||||
volumeSpec, err := util.CreateVolumeSpec(podVolume, podToAdd.Namespace, nodeName, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister, adc.csiMigratedPluginManager, adc.intreeToCSITranslator)
|
||||
if err != nil {
|
||||
klog.Errorf(
|
||||
"Error creating spec for volume %q, pod %q/%q: %v",
|
||||
@@ -434,7 +452,6 @@ func (adc *attachDetachController) populateDesiredStateOfWorld() error {
|
||||
err)
|
||||
continue
|
||||
}
|
||||
nodeName := types.NodeName(podToAdd.Spec.NodeName)
|
||||
plugin, err := adc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
|
||||
if err != nil || plugin == nil {
|
||||
klog.V(10).Infof(
|
||||
@@ -488,7 +505,7 @@ func (adc *attachDetachController) podAdd(obj interface{}) {
|
||||
true /* default volume action */)
|
||||
|
||||
util.ProcessPodVolumes(pod, volumeActionFlag, /* addVolumes */
|
||||
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
|
||||
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister, adc.csiMigratedPluginManager, adc.intreeToCSITranslator)
|
||||
}
|
||||
|
||||
// GetDesiredStateOfWorld returns desired state of world associated with controller
|
||||
@@ -512,7 +529,7 @@ func (adc *attachDetachController) podUpdate(oldObj, newObj interface{}) {
|
||||
true /* default volume action */)
|
||||
|
||||
util.ProcessPodVolumes(pod, volumeActionFlag, /* addVolumes */
|
||||
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
|
||||
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister, adc.csiMigratedPluginManager, adc.intreeToCSITranslator)
|
||||
}
|
||||
|
||||
func (adc *attachDetachController) podDelete(obj interface{}) {
|
||||
@@ -522,7 +539,7 @@ func (adc *attachDetachController) podDelete(obj interface{}) {
|
||||
}
|
||||
|
||||
util.ProcessPodVolumes(pod, false, /* addVolumes */
|
||||
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
|
||||
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister, adc.csiMigratedPluginManager, adc.intreeToCSITranslator)
|
||||
}
|
||||
|
||||
func (adc *attachDetachController) nodeAdd(obj interface{}) {
|
||||
@@ -640,7 +657,7 @@ func (adc *attachDetachController) syncPVCByKey(key string) error {
|
||||
true /* default volume action */)
|
||||
|
||||
util.ProcessPodVolumes(pod, volumeActionFlag, /* addVolumes */
|
||||
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
|
||||
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister, adc.csiMigratedPluginManager, adc.intreeToCSITranslator)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@@ -52,7 +52,8 @@ func Test_NewAttachDetachController_Positive(t *testing.T) {
|
||||
nil, /* prober */
|
||||
false,
|
||||
5*time.Second,
|
||||
DefaultTimerConfig)
|
||||
DefaultTimerConfig,
|
||||
)
|
||||
|
||||
// Assert
|
||||
if err != nil {
|
||||
|
@@ -9,8 +9,10 @@ go_library(
|
||||
"//pkg/controller/volume/attachdetach/cache:go_default_library",
|
||||
"//pkg/controller/volume/attachdetach/util:go_default_library",
|
||||
"//pkg/volume:go_default_library",
|
||||
"//pkg/volume/csimigration:go_default_library",
|
||||
"//pkg/volume/util:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/component-base/metrics:go_default_library",
|
||||
"//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
|
||||
@@ -26,6 +28,7 @@ go_test(
|
||||
"//pkg/controller:go_default_library",
|
||||
"//pkg/controller/volume/attachdetach/cache:go_default_library",
|
||||
"//pkg/controller/volume/attachdetach/testing:go_default_library",
|
||||
"//pkg/volume/csimigration:go_default_library",
|
||||
"//pkg/volume/testing:go_default_library",
|
||||
"//pkg/volume/util/types:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
@@ -34,6 +37,7 @@ go_test(
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||
"//staging/src/k8s.io/csi-translation-lib:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
|
@@ -20,6 +20,7 @@ import (
|
||||
"sync"
|
||||
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
corelisters "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/component-base/metrics"
|
||||
"k8s.io/component-base/metrics/legacyregistry"
|
||||
@@ -27,6 +28,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/util"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
"k8s.io/kubernetes/pkg/volume/csimigration"
|
||||
volumeutil "k8s.io/kubernetes/pkg/volume/util"
|
||||
)
|
||||
|
||||
@@ -60,14 +62,18 @@ func Register(pvcLister corelisters.PersistentVolumeClaimLister,
|
||||
podLister corelisters.PodLister,
|
||||
asw cache.ActualStateOfWorld,
|
||||
dsw cache.DesiredStateOfWorld,
|
||||
pluginMgr *volume.VolumePluginMgr) {
|
||||
pluginMgr *volume.VolumePluginMgr,
|
||||
csiMigratedPluginManager csimigration.PluginManager,
|
||||
intreeToCSITranslator csimigration.InTreeToCSITranslator) {
|
||||
registerMetrics.Do(func() {
|
||||
legacyregistry.CustomMustRegister(newAttachDetachStateCollector(pvcLister,
|
||||
podLister,
|
||||
pvLister,
|
||||
asw,
|
||||
dsw,
|
||||
pluginMgr))
|
||||
pluginMgr,
|
||||
csiMigratedPluginManager,
|
||||
intreeToCSITranslator))
|
||||
legacyregistry.MustRegister(forcedDetachMetricCounter)
|
||||
})
|
||||
}
|
||||
@@ -75,12 +81,14 @@ func Register(pvcLister corelisters.PersistentVolumeClaimLister,
|
||||
type attachDetachStateCollector struct {
|
||||
metrics.BaseStableCollector
|
||||
|
||||
pvcLister corelisters.PersistentVolumeClaimLister
|
||||
podLister corelisters.PodLister
|
||||
pvLister corelisters.PersistentVolumeLister
|
||||
asw cache.ActualStateOfWorld
|
||||
dsw cache.DesiredStateOfWorld
|
||||
volumePluginMgr *volume.VolumePluginMgr
|
||||
pvcLister corelisters.PersistentVolumeClaimLister
|
||||
podLister corelisters.PodLister
|
||||
pvLister corelisters.PersistentVolumeLister
|
||||
asw cache.ActualStateOfWorld
|
||||
dsw cache.DesiredStateOfWorld
|
||||
volumePluginMgr *volume.VolumePluginMgr
|
||||
csiMigratedPluginManager csimigration.PluginManager
|
||||
intreeToCSITranslator csimigration.InTreeToCSITranslator
|
||||
}
|
||||
|
||||
// volumeCount is a map of maps used as a counter, e.g.:
|
||||
@@ -105,8 +113,10 @@ func newAttachDetachStateCollector(
|
||||
pvLister corelisters.PersistentVolumeLister,
|
||||
asw cache.ActualStateOfWorld,
|
||||
dsw cache.DesiredStateOfWorld,
|
||||
pluginMgr *volume.VolumePluginMgr) *attachDetachStateCollector {
|
||||
return &attachDetachStateCollector{pvcLister: pvcLister, podLister: podLister, pvLister: pvLister, asw: asw, dsw: dsw, volumePluginMgr: pluginMgr}
|
||||
pluginMgr *volume.VolumePluginMgr,
|
||||
csiMigratedPluginManager csimigration.PluginManager,
|
||||
intreeToCSITranslator csimigration.InTreeToCSITranslator) *attachDetachStateCollector {
|
||||
return &attachDetachStateCollector{pvcLister: pvcLister, podLister: podLister, pvLister: pvLister, asw: asw, dsw: dsw, volumePluginMgr: pluginMgr, csiMigratedPluginManager: csiMigratedPluginManager, intreeToCSITranslator: intreeToCSITranslator}
|
||||
}
|
||||
|
||||
// Check if our collector implements necessary collector interface
|
||||
@@ -158,7 +168,7 @@ func (collector *attachDetachStateCollector) getVolumeInUseCount() volumeCount {
|
||||
continue
|
||||
}
|
||||
for _, podVolume := range pod.Spec.Volumes {
|
||||
volumeSpec, err := util.CreateVolumeSpec(podVolume, pod.Namespace, collector.pvcLister, collector.pvLister)
|
||||
volumeSpec, err := util.CreateVolumeSpec(podVolume, pod.Namespace, types.NodeName(pod.Spec.NodeName), collector.volumePluginMgr, collector.pvcLister, collector.pvLister, collector.csiMigratedPluginManager, collector.intreeToCSITranslator)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
@@ -25,9 +25,11 @@ import (
|
||||
k8stypes "k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
csitrans "k8s.io/csi-translation-lib"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
|
||||
controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
|
||||
"k8s.io/kubernetes/pkg/volume/csimigration"
|
||||
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
|
||||
"k8s.io/kubernetes/pkg/volume/util/types"
|
||||
)
|
||||
@@ -107,13 +109,16 @@ func TestVolumesInUseMetricCollection(t *testing.T) {
|
||||
pvcLister := pvcInformer.Lister()
|
||||
pvLister := pvInformer.Lister()
|
||||
|
||||
csiTranslator := csitrans.New()
|
||||
metricCollector := newAttachDetachStateCollector(
|
||||
pvcLister,
|
||||
fakePodInformer.Lister(),
|
||||
pvLister,
|
||||
nil,
|
||||
nil,
|
||||
fakeVolumePluginMgr)
|
||||
fakeVolumePluginMgr,
|
||||
csimigration.NewPluginManager(csiTranslator),
|
||||
csiTranslator)
|
||||
nodeUseMap := metricCollector.getVolumeInUseCount()
|
||||
if len(nodeUseMap) < 1 {
|
||||
t.Errorf("Expected one volume in use got %d", len(nodeUseMap))
|
||||
@@ -145,13 +150,16 @@ func TestTotalVolumesMetricCollection(t *testing.T) {
|
||||
}
|
||||
asw.AddVolumeNode(volumeName, volumeSpec, nodeName, "", true)
|
||||
|
||||
csiTranslator := csitrans.New()
|
||||
metricCollector := newAttachDetachStateCollector(
|
||||
nil,
|
||||
nil,
|
||||
nil,
|
||||
asw,
|
||||
dsw,
|
||||
fakeVolumePluginMgr)
|
||||
fakeVolumePluginMgr,
|
||||
csimigration.NewPluginManager(csiTranslator),
|
||||
csiTranslator)
|
||||
|
||||
totalVolumesMap := metricCollector.getTotalVolumesCount()
|
||||
if len(totalVolumesMap) != 2 {
|
||||
|
@@ -14,6 +14,7 @@ go_library(
|
||||
"//pkg/controller/volume/attachdetach/cache:go_default_library",
|
||||
"//pkg/controller/volume/attachdetach/util:go_default_library",
|
||||
"//pkg/volume:go_default_library",
|
||||
"//pkg/volume/csimigration:go_default_library",
|
||||
"//pkg/volume/util:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||
@@ -45,6 +46,7 @@ go_test(
|
||||
deps = [
|
||||
"//pkg/controller:go_default_library",
|
||||
"//pkg/controller/volume/attachdetach/cache:go_default_library",
|
||||
"//pkg/volume/csimigration:go_default_library",
|
||||
"//pkg/volume/testing:go_default_library",
|
||||
"//pkg/volume/util:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
@@ -52,5 +54,6 @@ go_test(
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||
"//staging/src/k8s.io/csi-translation-lib:go_default_library",
|
||||
],
|
||||
)
|
||||
|
@@ -33,6 +33,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/util"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
"k8s.io/kubernetes/pkg/volume/csimigration"
|
||||
volutil "k8s.io/kubernetes/pkg/volume/util"
|
||||
)
|
||||
|
||||
@@ -58,27 +59,33 @@ func NewDesiredStateOfWorldPopulator(
|
||||
desiredStateOfWorld cache.DesiredStateOfWorld,
|
||||
volumePluginMgr *volume.VolumePluginMgr,
|
||||
pvcLister corelisters.PersistentVolumeClaimLister,
|
||||
pvLister corelisters.PersistentVolumeLister) DesiredStateOfWorldPopulator {
|
||||
pvLister corelisters.PersistentVolumeLister,
|
||||
csiMigratedPluginManager csimigration.PluginManager,
|
||||
intreeToCSITranslator csimigration.InTreeToCSITranslator) DesiredStateOfWorldPopulator {
|
||||
return &desiredStateOfWorldPopulator{
|
||||
loopSleepDuration: loopSleepDuration,
|
||||
listPodsRetryDuration: listPodsRetryDuration,
|
||||
podLister: podLister,
|
||||
desiredStateOfWorld: desiredStateOfWorld,
|
||||
volumePluginMgr: volumePluginMgr,
|
||||
pvcLister: pvcLister,
|
||||
pvLister: pvLister,
|
||||
loopSleepDuration: loopSleepDuration,
|
||||
listPodsRetryDuration: listPodsRetryDuration,
|
||||
podLister: podLister,
|
||||
desiredStateOfWorld: desiredStateOfWorld,
|
||||
volumePluginMgr: volumePluginMgr,
|
||||
pvcLister: pvcLister,
|
||||
pvLister: pvLister,
|
||||
csiMigratedPluginManager: csiMigratedPluginManager,
|
||||
intreeToCSITranslator: intreeToCSITranslator,
|
||||
}
|
||||
}
|
||||
|
||||
type desiredStateOfWorldPopulator struct {
|
||||
loopSleepDuration time.Duration
|
||||
podLister corelisters.PodLister
|
||||
desiredStateOfWorld cache.DesiredStateOfWorld
|
||||
volumePluginMgr *volume.VolumePluginMgr
|
||||
pvcLister corelisters.PersistentVolumeClaimLister
|
||||
pvLister corelisters.PersistentVolumeLister
|
||||
listPodsRetryDuration time.Duration
|
||||
timeOfLastListPods time.Time
|
||||
loopSleepDuration time.Duration
|
||||
podLister corelisters.PodLister
|
||||
desiredStateOfWorld cache.DesiredStateOfWorld
|
||||
volumePluginMgr *volume.VolumePluginMgr
|
||||
pvcLister corelisters.PersistentVolumeClaimLister
|
||||
pvLister corelisters.PersistentVolumeLister
|
||||
listPodsRetryDuration time.Duration
|
||||
timeOfLastListPods time.Time
|
||||
csiMigratedPluginManager csimigration.PluginManager
|
||||
intreeToCSITranslator csimigration.InTreeToCSITranslator
|
||||
}
|
||||
|
||||
func (dswp *desiredStateOfWorldPopulator) Run(stopCh <-chan struct{}) {
|
||||
@@ -163,7 +170,7 @@ func (dswp *desiredStateOfWorldPopulator) findAndAddActivePods() {
|
||||
continue
|
||||
}
|
||||
util.ProcessPodVolumes(pod, true,
|
||||
dswp.desiredStateOfWorld, dswp.volumePluginMgr, dswp.pvcLister, dswp.pvLister)
|
||||
dswp.desiredStateOfWorld, dswp.volumePluginMgr, dswp.pvcLister, dswp.pvLister, dswp.csiMigratedPluginManager, dswp.intreeToCSITranslator)
|
||||
|
||||
}
|
||||
|
||||
|
@@ -25,8 +25,10 @@ import (
|
||||
k8stypes "k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
csitrans "k8s.io/csi-translation-lib"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
|
||||
"k8s.io/kubernetes/pkg/volume/csimigration"
|
||||
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
|
||||
"k8s.io/kubernetes/pkg/volume/util"
|
||||
)
|
||||
@@ -73,14 +75,17 @@ func TestFindAndAddActivePods_FindAndRemoveDeletedPods(t *testing.T) {
|
||||
pvcLister := fakeInformerFactory.Core().V1().PersistentVolumeClaims().Lister()
|
||||
pvLister := fakeInformerFactory.Core().V1().PersistentVolumes().Lister()
|
||||
|
||||
csiTranslator := csitrans.New()
|
||||
dswp := &desiredStateOfWorldPopulator{
|
||||
loopSleepDuration: 100 * time.Millisecond,
|
||||
listPodsRetryDuration: 3 * time.Second,
|
||||
desiredStateOfWorld: fakesDSW,
|
||||
volumePluginMgr: fakeVolumePluginMgr,
|
||||
podLister: fakePodInformer.Lister(),
|
||||
pvcLister: pvcLister,
|
||||
pvLister: pvLister,
|
||||
loopSleepDuration: 100 * time.Millisecond,
|
||||
listPodsRetryDuration: 3 * time.Second,
|
||||
desiredStateOfWorld: fakesDSW,
|
||||
volumePluginMgr: fakeVolumePluginMgr,
|
||||
podLister: fakePodInformer.Lister(),
|
||||
pvcLister: pvcLister,
|
||||
pvLister: pvLister,
|
||||
csiMigratedPluginManager: csimigration.NewPluginManager(csiTranslator),
|
||||
intreeToCSITranslator: csiTranslator,
|
||||
}
|
||||
|
||||
//add the given node to the list of nodes managed by dsw
|
||||
|
@@ -251,10 +251,6 @@ func (plugin *TestPlugin) CanSupport(spec *volume.Spec) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (plugin *TestPlugin) IsMigratedToCSI() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (plugin *TestPlugin) RequiresRemount() bool {
|
||||
return false
|
||||
}
|
||||
|
@@ -11,10 +11,14 @@ go_library(
|
||||
importpath = "k8s.io/kubernetes/pkg/controller/volume/attachdetach/util",
|
||||
deps = [
|
||||
"//pkg/controller/volume/attachdetach/cache:go_default_library",
|
||||
"//pkg/features:go_default_library",
|
||||
"//pkg/volume:go_default_library",
|
||||
"//pkg/volume/csimigration:go_default_library",
|
||||
"//pkg/volume/util:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/klog:go_default_library",
|
||||
],
|
||||
|
@@ -17,20 +17,29 @@ limitations under the License.
|
||||
package util
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
corelisters "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/klog"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
"k8s.io/kubernetes/pkg/volume/csimigration"
|
||||
"k8s.io/kubernetes/pkg/volume/util"
|
||||
)
|
||||
|
||||
// CreateVolumeSpec creates and returns a mutatable volume.Spec object for the
|
||||
// specified volume. It dereference any PVC to get PV objects, if needed.
|
||||
func CreateVolumeSpec(podVolume v1.Volume, podNamespace string, pvcLister corelisters.PersistentVolumeClaimLister, pvLister corelisters.PersistentVolumeLister) (*volume.Spec, error) {
|
||||
// A volume.Spec that refers to an in-tree plugin spec is translated to refer
|
||||
// to a migrated CSI plugin spec if all conditions for CSI migration on a node
|
||||
// for the in-tree plugin is satisfied.
|
||||
func CreateVolumeSpec(podVolume v1.Volume, podNamespace string, nodeName types.NodeName, vpm *volume.VolumePluginMgr, pvcLister corelisters.PersistentVolumeClaimLister, pvLister corelisters.PersistentVolumeLister, csiMigratedPluginManager csimigration.PluginManager, csiTranslator csimigration.InTreeToCSITranslator) (*volume.Spec, error) {
|
||||
if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
|
||||
klog.V(10).Infof(
|
||||
"Found PVC, ClaimName: %q/%q",
|
||||
@@ -66,6 +75,15 @@ func CreateVolumeSpec(podVolume v1.Volume, podNamespace string, pvcLister coreli
|
||||
err)
|
||||
}
|
||||
|
||||
volumeSpec, err = translateInTreeSpecToCSIIfNeeded(volumeSpec, nodeName, vpm, csiMigratedPluginManager, csiTranslator)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"error performing CSI migration checks and translation for PVC %q/%q: %v",
|
||||
podNamespace,
|
||||
pvcSource.ClaimName,
|
||||
err)
|
||||
}
|
||||
|
||||
klog.V(10).Infof(
|
||||
"Extracted volumeSpec (%v) from bound PV (pvName %q) and PVC (ClaimName %q/%q pvcUID %v)",
|
||||
volumeSpec.Name(),
|
||||
@@ -81,7 +99,15 @@ func CreateVolumeSpec(podVolume v1.Volume, podNamespace string, pvcLister coreli
|
||||
// informer it may be mutated by another consumer.
|
||||
clonedPodVolume := podVolume.DeepCopy()
|
||||
|
||||
return volume.NewSpecFromVolume(clonedPodVolume), nil
|
||||
origspec := volume.NewSpecFromVolume(clonedPodVolume)
|
||||
spec, err := translateInTreeSpecToCSIIfNeeded(origspec, nodeName, vpm, csiMigratedPluginManager, csiTranslator)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"error performing CSI migration checks and translation for inline volume %q: %v",
|
||||
podVolume.Name,
|
||||
err)
|
||||
}
|
||||
return spec, nil
|
||||
}
|
||||
|
||||
// getPVCFromCacheExtractPV fetches the PVC object with the given namespace and
|
||||
@@ -160,7 +186,7 @@ func DetermineVolumeAction(pod *v1.Pod, desiredStateOfWorld cache.DesiredStateOf
|
||||
|
||||
// ProcessPodVolumes processes the volumes in the given pod and adds them to the
|
||||
// desired state of the world if addVolumes is true, otherwise it removes them.
|
||||
func ProcessPodVolumes(pod *v1.Pod, addVolumes bool, desiredStateOfWorld cache.DesiredStateOfWorld, volumePluginMgr *volume.VolumePluginMgr, pvcLister corelisters.PersistentVolumeClaimLister, pvLister corelisters.PersistentVolumeLister) {
|
||||
func ProcessPodVolumes(pod *v1.Pod, addVolumes bool, desiredStateOfWorld cache.DesiredStateOfWorld, volumePluginMgr *volume.VolumePluginMgr, pvcLister corelisters.PersistentVolumeClaimLister, pvLister corelisters.PersistentVolumeLister, csiMigratedPluginManager csimigration.PluginManager, csiTranslator csimigration.InTreeToCSITranslator) {
|
||||
if pod == nil {
|
||||
return
|
||||
}
|
||||
@@ -193,7 +219,7 @@ func ProcessPodVolumes(pod *v1.Pod, addVolumes bool, desiredStateOfWorld cache.D
|
||||
|
||||
// Process volume spec for each volume defined in pod
|
||||
for _, podVolume := range pod.Spec.Volumes {
|
||||
volumeSpec, err := CreateVolumeSpec(podVolume, pod.Namespace, pvcLister, pvLister)
|
||||
volumeSpec, err := CreateVolumeSpec(podVolume, pod.Namespace, nodeName, volumePluginMgr, pvcLister, pvLister, csiMigratedPluginManager, csiTranslator)
|
||||
if err != nil {
|
||||
klog.V(10).Infof(
|
||||
"Error processing volume %q for pod %q/%q: %v",
|
||||
@@ -249,3 +275,110 @@ func ProcessPodVolumes(pod *v1.Pod, addVolumes bool, desiredStateOfWorld cache.D
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func translateInTreeSpecToCSIIfNeeded(spec *volume.Spec, nodeName types.NodeName, vpm *volume.VolumePluginMgr, csiMigratedPluginManager csimigration.PluginManager, csiTranslator csimigration.InTreeToCSITranslator) (*volume.Spec, error) {
|
||||
translatedSpec := spec
|
||||
migratable, err := csiMigratedPluginManager.IsMigratable(spec)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
migrationSupportedOnNode, err := isCSIMigrationSupportedOnNode(nodeName, spec, vpm, csiMigratedPluginManager)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if migratable && migrationSupportedOnNode {
|
||||
translatedSpec, err = csimigration.TranslateInTreeSpecToCSI(spec, csiTranslator)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return translatedSpec, nil
|
||||
}
|
||||
|
||||
func isCSIMigrationSupportedOnNode(nodeName types.NodeName, spec *volume.Spec, vpm *volume.VolumePluginMgr, csiMigratedPluginManager csimigration.PluginManager) (bool, error) {
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) ||
|
||||
!utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
|
||||
// If CSIMigration is disabled, CSI migration paths will not be taken for
|
||||
// the node. If CSINodeInfo is disabled, checking of installation status
|
||||
// of a migrated CSI plugin cannot be performed. Therefore stick to
|
||||
// in-tree plugins.
|
||||
return false, nil
|
||||
}
|
||||
|
||||
pluginName, err := csiMigratedPluginManager.GetInTreePluginNameFromSpec(spec.PersistentVolume, spec.Volume)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if len(pluginName) == 0 {
|
||||
// Could not find a plugin name from translation directory, assume not translated
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if csiMigratedPluginManager.IsMigrationCompleteForPlugin(pluginName) {
|
||||
// All nodes are expected to have migrated CSI plugin installed and
|
||||
// configured when CSI Migration Complete flag is enabled for a plugin.
|
||||
// CSI migration is supported even if there is version skew between
|
||||
// managers and node.
|
||||
return true, nil
|
||||
}
|
||||
|
||||
if len(nodeName) == 0 {
|
||||
return false, errors.New("nodeName is empty")
|
||||
}
|
||||
|
||||
kubeClient := vpm.Host.GetKubeClient()
|
||||
if kubeClient == nil {
|
||||
// Don't handle the controller/kubelet version skew check and fallback
|
||||
// to just checking the feature gates. This can happen if
|
||||
// we are in a standalone (headless) Kubelet
|
||||
return true, nil
|
||||
}
|
||||
|
||||
adcHost, ok := vpm.Host.(volume.AttachDetachVolumeHost)
|
||||
if !ok {
|
||||
// Don't handle the controller/kubelet version skew check and fallback
|
||||
// to just checking the feature gates. This can happen if
|
||||
// "enableControllerAttachDetach" is set to true on kubelet
|
||||
return true, nil
|
||||
}
|
||||
|
||||
if adcHost.CSINodeLister() == nil {
|
||||
return false, errors.New("could not find CSINodeLister in attachDetachController")
|
||||
}
|
||||
|
||||
csiNode, err := adcHost.CSINodeLister().Get(string(nodeName))
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
ann := csiNode.GetAnnotations()
|
||||
if ann == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
mpa := ann[v1.MigratedPluginsAnnotationKey]
|
||||
tok := strings.Split(mpa, ",")
|
||||
mpaSet := sets.NewString(tok...)
|
||||
|
||||
isMigratedOnNode := mpaSet.Has(pluginName)
|
||||
|
||||
if isMigratedOnNode {
|
||||
installed := false
|
||||
driverName, err := csiMigratedPluginManager.GetCSINameFromInTreeName(pluginName)
|
||||
if err != nil {
|
||||
return isMigratedOnNode, err
|
||||
}
|
||||
for _, driver := range csiNode.Spec.Drivers {
|
||||
if driver.Name == driverName {
|
||||
installed = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !installed {
|
||||
return true, fmt.Errorf("in-tree plugin %s is migrated on node %s but driver %s is not installed", pluginName, string(nodeName), driverName)
|
||||
}
|
||||
}
|
||||
|
||||
return isMigratedOnNode, nil
|
||||
}
|
||||
|
@@ -11,6 +11,7 @@ go_library(
|
||||
"//pkg/controller/volume/events:go_default_library",
|
||||
"//pkg/util/mount:go_default_library",
|
||||
"//pkg/volume:go_default_library",
|
||||
"//pkg/volume/csimigration:go_default_library",
|
||||
"//pkg/volume/util:go_default_library",
|
||||
"//pkg/volume/util/operationexecutor:go_default_library",
|
||||
"//pkg/volume/util/subpath:go_default_library",
|
||||
@@ -63,6 +64,7 @@ go_test(
|
||||
"//pkg/features:go_default_library",
|
||||
"//pkg/volume:go_default_library",
|
||||
"//pkg/volume/awsebs:go_default_library",
|
||||
"//pkg/volume/csimigration:go_default_library",
|
||||
"//pkg/volume/util/operationexecutor:go_default_library",
|
||||
"//pkg/volume/util/types:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
|
@@ -47,6 +47,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/controller/volume/events"
|
||||
"k8s.io/kubernetes/pkg/util/mount"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
"k8s.io/kubernetes/pkg/volume/csimigration"
|
||||
"k8s.io/kubernetes/pkg/volume/util"
|
||||
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
|
||||
"k8s.io/kubernetes/pkg/volume/util/subpath"
|
||||
@@ -100,6 +101,8 @@ type expandController struct {
|
||||
queue workqueue.RateLimitingInterface
|
||||
|
||||
translator CSINameTranslator
|
||||
|
||||
csiMigratedPluginManager csimigration.PluginManager
|
||||
}
|
||||
|
||||
// NewExpandController expands the pvs
|
||||
@@ -110,19 +113,21 @@ func NewExpandController(
|
||||
scInformer storageclassinformer.StorageClassInformer,
|
||||
cloud cloudprovider.Interface,
|
||||
plugins []volume.VolumePlugin,
|
||||
translator CSINameTranslator) (ExpandController, error) {
|
||||
translator CSINameTranslator,
|
||||
csiMigratedPluginManager csimigration.PluginManager) (ExpandController, error) {
|
||||
|
||||
expc := &expandController{
|
||||
kubeClient: kubeClient,
|
||||
cloud: cloud,
|
||||
pvcLister: pvcInformer.Lister(),
|
||||
pvcsSynced: pvcInformer.Informer().HasSynced,
|
||||
pvLister: pvInformer.Lister(),
|
||||
pvSynced: pvInformer.Informer().HasSynced,
|
||||
classLister: scInformer.Lister(),
|
||||
classListerSynced: scInformer.Informer().HasSynced,
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "volume_expand"),
|
||||
translator: translator,
|
||||
kubeClient: kubeClient,
|
||||
cloud: cloud,
|
||||
pvcLister: pvcInformer.Lister(),
|
||||
pvcsSynced: pvcInformer.Informer().HasSynced,
|
||||
pvLister: pvInformer.Lister(),
|
||||
pvSynced: pvInformer.Informer().HasSynced,
|
||||
classLister: scInformer.Lister(),
|
||||
classListerSynced: scInformer.Informer().HasSynced,
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "volume_expand"),
|
||||
translator: translator,
|
||||
csiMigratedPluginManager: csiMigratedPluginManager,
|
||||
}
|
||||
|
||||
if err := expc.volumePluginMgr.InitPlugins(plugins, nil, expc); err != nil {
|
||||
@@ -244,25 +249,15 @@ func (expc *expandController) syncHandler(key string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
volumeSpec := volume.NewSpecFromPersistentVolume(pv, false)
|
||||
volumePlugin, err := expc.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec)
|
||||
volumeResizerName := class.Provisioner
|
||||
|
||||
if err != nil || volumePlugin == nil {
|
||||
msg := fmt.Errorf("didn't find a plugin capable of expanding the volume; " +
|
||||
"waiting for an external controller to process this PVC")
|
||||
eventType := v1.EventTypeNormal
|
||||
if err != nil {
|
||||
eventType = v1.EventTypeWarning
|
||||
}
|
||||
expc.recorder.Event(pvc, eventType, events.ExternalExpanding, fmt.Sprintf("Ignoring the PVC: %v.", msg))
|
||||
klog.Infof("Ignoring the PVC %q (uid: %q) : %v.", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, msg)
|
||||
// If we are expecting that an external plugin will handle resizing this volume then
|
||||
// is no point in requeuing this PVC.
|
||||
volumeSpec := volume.NewSpecFromPersistentVolume(pv, false)
|
||||
migratable, err := expc.csiMigratedPluginManager.IsMigratable(volumeSpec)
|
||||
if err != nil {
|
||||
klog.V(4).Infof("failed to check CSI migration status for PVC: %s with error: %v", util.ClaimToClaimKey(pvc), err)
|
||||
return nil
|
||||
}
|
||||
|
||||
if volumePlugin.IsMigratedToCSI() {
|
||||
// handle CSI migration scenarios before invoking FindExpandablePluginBySpec for in-tree
|
||||
if migratable {
|
||||
msg := fmt.Sprintf("CSI migration enabled for %s; waiting for external resizer to expand the pvc", volumeResizerName)
|
||||
expc.recorder.Event(pvc, v1.EventTypeNormal, events.ExternalExpanding, msg)
|
||||
csiResizerName, err := expc.translator.GetCSINameFromInTreeName(class.Provisioner)
|
||||
@@ -281,6 +276,21 @@ func (expc *expandController) syncHandler(key string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
volumePlugin, err := expc.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec)
|
||||
if err != nil || volumePlugin == nil {
|
||||
msg := fmt.Errorf("didn't find a plugin capable of expanding the volume; " +
|
||||
"waiting for an external controller to process this PVC")
|
||||
eventType := v1.EventTypeNormal
|
||||
if err != nil {
|
||||
eventType = v1.EventTypeWarning
|
||||
}
|
||||
expc.recorder.Event(pvc, eventType, events.ExternalExpanding, fmt.Sprintf("Ignoring the PVC: %v.", msg))
|
||||
klog.Infof("Ignoring the PVC %q (uid: %q) : %v.", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, msg)
|
||||
// If we are expecting that an external plugin will handle resizing this volume then
|
||||
// is no point in requeuing this PVC.
|
||||
return nil
|
||||
}
|
||||
|
||||
return expc.expand(pvc, pv, volumeResizerName)
|
||||
}
|
||||
|
||||
|
@@ -40,6 +40,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
"k8s.io/kubernetes/pkg/volume/awsebs"
|
||||
"k8s.io/kubernetes/pkg/volume/csimigration"
|
||||
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
|
||||
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
|
||||
)
|
||||
@@ -124,7 +125,8 @@ func TestSyncHandler(t *testing.T) {
|
||||
if tc.storageClass != nil {
|
||||
informerFactory.Storage().V1().StorageClasses().Informer().GetIndexer().Add(tc.storageClass)
|
||||
}
|
||||
expc, err := NewExpandController(fakeKubeClient, pvcInformer, pvInformer, storageClassInformer, nil, allPlugins, csitrans.New())
|
||||
translator := csitrans.New()
|
||||
expc, err := NewExpandController(fakeKubeClient, pvcInformer, pvInformer, storageClassInformer, nil, allPlugins, translator, csimigration.NewPluginManager(translator))
|
||||
if err != nil {
|
||||
t.Fatalf("error creating expand controller : %v", err)
|
||||
}
|
||||
|
@@ -26,6 +26,7 @@ go_library(
|
||||
"//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
|
||||
"//pkg/util/mount:go_default_library",
|
||||
"//pkg/volume:go_default_library",
|
||||
"//pkg/volume/csimigration:go_default_library",
|
||||
"//pkg/volume/util:go_default_library",
|
||||
"//pkg/volume/util/recyclerclient:go_default_library",
|
||||
"//pkg/volume/util/subpath:go_default_library",
|
||||
|
@@ -527,16 +527,21 @@ func (t fakeCSINameTranslator) GetCSINameFromInTreeName(pluginName string) (stri
|
||||
return "vendor.com/MockCSIPlugin", nil
|
||||
}
|
||||
|
||||
type fakeCSIMigratedPluginManager struct{}
|
||||
|
||||
func (t fakeCSIMigratedPluginManager) IsMigrationEnabledForPlugin(pluginName string) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// wrapTestWithCSIMigrationProvisionCalls returns a testCall that:
|
||||
// - configures controller with a volume plugin that emulates CSI migration
|
||||
// - calls given testCall
|
||||
func wrapTestWithCSIMigrationProvisionCalls(toWrap testCall) testCall {
|
||||
plugin := &mockVolumePlugin{}
|
||||
return func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
|
||||
plugin := &mockVolumePlugin{
|
||||
isMigratedToCSI: true,
|
||||
}
|
||||
ctrl.volumePluginMgr.InitPlugins([]vol.VolumePlugin{plugin}, nil /* prober */, ctrl)
|
||||
ctrl.translator = fakeCSINameTranslator{}
|
||||
ctrl.csiMigratedPluginManager = fakeCSIMigratedPluginManager{}
|
||||
return toWrap(ctrl, reactor, test)
|
||||
}
|
||||
}
|
||||
@@ -782,7 +787,6 @@ type mockVolumePlugin struct {
|
||||
deleteCallCounter int
|
||||
recycleCalls []error
|
||||
recycleCallCounter int
|
||||
isMigratedToCSI bool
|
||||
provisionOptions vol.VolumeOptions
|
||||
}
|
||||
|
||||
@@ -812,10 +816,6 @@ func (plugin *mockVolumePlugin) CanSupport(spec *vol.Spec) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (plugin *mockVolumePlugin) IsMigratedToCSI() bool {
|
||||
return plugin.isMigratedToCSI
|
||||
}
|
||||
|
||||
func (plugin *mockVolumePlugin) RequiresRemount() bool {
|
||||
return false
|
||||
}
|
||||
|
@@ -138,6 +138,11 @@ type CSINameTranslator interface {
|
||||
GetCSINameFromInTreeName(pluginName string) (string, error)
|
||||
}
|
||||
|
||||
// CSIMigratedPluginManager keeps track of CSI migration status of a plugin
|
||||
type CSIMigratedPluginManager interface {
|
||||
IsMigrationEnabledForPlugin(pluginName string) bool
|
||||
}
|
||||
|
||||
// PersistentVolumeController is a controller that synchronizes
|
||||
// PersistentVolumeClaims and PersistentVolumes. It starts two
|
||||
// cache.Controllers that watch PersistentVolume and PersistentVolumeClaim
|
||||
@@ -226,7 +231,8 @@ type PersistentVolumeController struct {
|
||||
// abort: N.A.
|
||||
operationTimestamps metrics.OperationStartTimeCache
|
||||
|
||||
translator CSINameTranslator
|
||||
translator CSINameTranslator
|
||||
csiMigratedPluginManager CSIMigratedPluginManager
|
||||
}
|
||||
|
||||
// syncClaim is the main controller method to decide what to do with a claim.
|
||||
@@ -1324,6 +1330,7 @@ func (ctrl *PersistentVolumeController) provisionClaim(claim *v1.PersistentVolum
|
||||
klog.V(4).Infof("provisionClaim[%s]: started", claimToClaimKey(claim))
|
||||
opName := fmt.Sprintf("provision-%s[%s]", claimToClaimKey(claim), string(claim.UID))
|
||||
plugin, storageClass, err := ctrl.findProvisionablePlugin(claim)
|
||||
// findProvisionablePlugin does not return err for external provisioners
|
||||
if err != nil {
|
||||
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, err.Error())
|
||||
klog.Errorf("error finding provisioning plugin for claim %s: %v", claimToClaimKey(claim), err)
|
||||
@@ -1338,8 +1345,8 @@ func (ctrl *PersistentVolumeController) provisionClaim(claim *v1.PersistentVolum
|
||||
claimKey := claimToClaimKey(claim)
|
||||
ctrl.operationTimestamps.AddIfNotExist(claimKey, ctrl.getProvisionerName(plugin, storageClass), "provision")
|
||||
var err error
|
||||
if plugin == nil || plugin.IsMigratedToCSI() {
|
||||
_, err = ctrl.provisionClaimOperationExternal(claim, plugin, storageClass)
|
||||
if plugin == nil {
|
||||
_, err = ctrl.provisionClaimOperationExternal(claim, storageClass)
|
||||
} else {
|
||||
_, err = ctrl.provisionClaimOperation(claim, plugin, storageClass)
|
||||
}
|
||||
@@ -1362,8 +1369,7 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(
|
||||
claimClass := v1helper.GetPersistentVolumeClaimClass(claim)
|
||||
klog.V(4).Infof("provisionClaimOperation [%s] started, class: %q", claimToClaimKey(claim), claimClass)
|
||||
|
||||
// called from provisionClaim(), in this case, plugin MUST NOT be nil and
|
||||
// plugin.IsMigratedToCSI() MUST return FALSE
|
||||
// called from provisionClaim(), in this case, plugin MUST NOT be nil
|
||||
// NOTE: checks on plugin/storageClass has been saved
|
||||
pluginName := plugin.GetPluginName()
|
||||
provisionerName := storageClass.Provisioner
|
||||
@@ -1553,15 +1559,14 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(
|
||||
// This method will be running in a standalone go-routine scheduled in "provisionClaim"
|
||||
func (ctrl *PersistentVolumeController) provisionClaimOperationExternal(
|
||||
claim *v1.PersistentVolumeClaim,
|
||||
plugin vol.ProvisionableVolumePlugin,
|
||||
storageClass *storage.StorageClass) (string, error) {
|
||||
claimClass := v1helper.GetPersistentVolumeClaimClass(claim)
|
||||
klog.V(4).Infof("provisionClaimOperationExternal [%s] started, class: %q", claimToClaimKey(claim), claimClass)
|
||||
// Set provisionerName to external provisioner name by setClaimProvisioner
|
||||
var err error
|
||||
provisionerName := storageClass.Provisioner
|
||||
if plugin != nil {
|
||||
// update the provisioner name to use the CSI in-tree name
|
||||
if ctrl.csiMigratedPluginManager.IsMigrationEnabledForPlugin(storageClass.Provisioner) {
|
||||
// update the provisioner name to use the migrated CSI plugin name
|
||||
provisionerName, err = ctrl.translator.GetCSINameFromInTreeName(storageClass.Provisioner)
|
||||
if err != nil {
|
||||
strerr := fmt.Sprintf("error getting CSI name for In tree plugin %s: %v", storageClass.Provisioner, err)
|
||||
@@ -1660,6 +1665,10 @@ func (ctrl *PersistentVolumeController) findProvisionablePlugin(claim *v1.Persis
|
||||
}
|
||||
|
||||
// Find a plugin for the class
|
||||
if ctrl.csiMigratedPluginManager.IsMigrationEnabledForPlugin(class.Provisioner) {
|
||||
// CSI migration scenario - do not depend on in-tree plugin
|
||||
return nil, class, nil
|
||||
}
|
||||
plugin, err := ctrl.volumePluginMgr.FindProvisionablePluginByName(class.Provisioner)
|
||||
if err != nil {
|
||||
if !strings.HasPrefix(class.Provisioner, "kubernetes.io/") {
|
||||
@@ -1708,7 +1717,7 @@ func (ctrl *PersistentVolumeController) getProvisionerNameFromVolume(volume *v1.
|
||||
if err != nil {
|
||||
return "N/A"
|
||||
}
|
||||
if plugin != nil && !plugin.IsMigratedToCSI() {
|
||||
if plugin != nil {
|
||||
return plugin.GetPluginName()
|
||||
}
|
||||
// If reached here, Either an external provisioner was used for provisioning
|
||||
@@ -1722,22 +1731,25 @@ func (ctrl *PersistentVolumeController) getProvisionerNameFromVolume(volume *v1.
|
||||
if err != nil {
|
||||
return "N/A"
|
||||
}
|
||||
if plugin != nil {
|
||||
if ctrl.csiMigratedPluginManager.IsMigrationEnabledForPlugin(class.Provisioner) {
|
||||
provisionerName, err := ctrl.translator.GetCSINameFromInTreeName(class.Provisioner)
|
||||
if err == nil {
|
||||
return provisionerName
|
||||
if err != nil {
|
||||
return "N/A"
|
||||
}
|
||||
return provisionerName
|
||||
}
|
||||
return class.Provisioner
|
||||
}
|
||||
|
||||
// obtain plugin/external provisioner name from plugin and storage class
|
||||
// obtain plugin/external provisioner name from plugin and storage class for timestamp logging purposes
|
||||
func (ctrl *PersistentVolumeController) getProvisionerName(plugin vol.ProvisionableVolumePlugin, storageClass *storage.StorageClass) string {
|
||||
// intree plugin, returns the plugin's name
|
||||
if plugin != nil && !plugin.IsMigratedToCSI() {
|
||||
// non CSI-migrated in-tree plugin, returns the plugin's name
|
||||
if plugin != nil {
|
||||
return plugin.GetPluginName()
|
||||
} else if plugin != nil {
|
||||
// get the CSI in-tree name from storage class provisioner name
|
||||
}
|
||||
if ctrl.csiMigratedPluginManager.IsMigrationEnabledForPlugin(storageClass.Provisioner) {
|
||||
// get the name of the CSI plugin that the in-tree storage class
|
||||
// provisioner has migrated to
|
||||
provisionerName, err := ctrl.translator.GetCSINameFromInTreeName(storageClass.Provisioner)
|
||||
if err != nil {
|
||||
return "N/A"
|
||||
|
@@ -44,6 +44,7 @@ import (
|
||||
pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
|
||||
"k8s.io/kubernetes/pkg/util/goroutinemap"
|
||||
vol "k8s.io/kubernetes/pkg/volume"
|
||||
"k8s.io/kubernetes/pkg/volume/csimigration"
|
||||
|
||||
"k8s.io/klog"
|
||||
)
|
||||
@@ -94,7 +95,6 @@ func NewController(p ControllerParameters) (*PersistentVolumeController, error)
|
||||
volumeQueue: workqueue.NewNamed("volumes"),
|
||||
resyncPeriod: p.SyncPeriod,
|
||||
operationTimestamps: metrics.NewOperationStartTimeCache(),
|
||||
translator: csitrans.New(),
|
||||
}
|
||||
|
||||
// Prober is nil because PV is not aware of Flexvolume.
|
||||
@@ -128,6 +128,11 @@ func NewController(p ControllerParameters) (*PersistentVolumeController, error)
|
||||
controller.podListerSynced = p.PodInformer.Informer().HasSynced
|
||||
controller.NodeLister = p.NodeInformer.Lister()
|
||||
controller.NodeListerSynced = p.NodeInformer.Informer().HasSynced
|
||||
|
||||
csiTranslator := csitrans.New()
|
||||
controller.translator = csiTranslator
|
||||
controller.csiMigratedPluginManager = csimigration.NewPluginManager(csiTranslator)
|
||||
|
||||
return controller, nil
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user