Add code to handle in-tree to CSI migration for resizing

This commit is contained in:
Hemant Kumar
2019-05-16 17:42:16 -04:00
parent 46a80259f6
commit 405d33eae4
6 changed files with 166 additions and 11 deletions

View File

@@ -1,6 +1,6 @@
package(default_visibility = ["//visibility:public"])
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
@@ -10,6 +10,7 @@ go_library(
],
importpath = "k8s.io/kubernetes/pkg/controller/volume/expand",
deps = [
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/volume/events:go_default_library",
"//pkg/controller/volume/expand/cache:go_default_library",
@@ -27,14 +28,17 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/csi-translation-lib:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
@@ -54,3 +58,14 @@ filegroup(
],
tags = ["automanaged"],
)
go_test(
name = "go_default_test",
srcs = ["expand_controller_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/controller:go_default_library",
"//pkg/controller/volume/attachdetach/testing:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
],
)

View File

@@ -30,14 +30,19 @@ import (
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
storageclassinformer "k8s.io/client-go/informers/storage/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
storagelisters "k8s.io/client-go/listers/storage/v1"
"k8s.io/client-go/tools/cache"
kcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
cloudprovider "k8s.io/cloud-provider"
csitranslation "k8s.io/csi-translation-lib"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/events"
"k8s.io/kubernetes/pkg/util/mount"
@@ -72,6 +77,10 @@ type expandController struct {
pvLister corelisters.PersistentVolumeLister
pvSynced kcache.InformerSynced
// storageClass lister for fetching provisioner name
classLister storagelisters.StorageClassLister
classListerSynced cache.InformerSynced
// cloud provider used by volume host
cloud cloudprovider.Interface
@@ -90,17 +99,20 @@ func NewExpandController(
kubeClient clientset.Interface,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
pvInformer coreinformers.PersistentVolumeInformer,
scInformer storageclassinformer.StorageClassInformer,
cloud cloudprovider.Interface,
plugins []volume.VolumePlugin) (ExpandController, error) {
expc := &expandController{
kubeClient: kubeClient,
cloud: cloud,
pvcLister: pvcInformer.Lister(),
pvcsSynced: pvcInformer.Informer().HasSynced,
pvLister: pvInformer.Lister(),
pvSynced: pvInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "volume_expand"),
kubeClient: kubeClient,
cloud: cloud,
pvcLister: pvcInformer.Lister(),
pvcsSynced: pvcInformer.Informer().HasSynced,
pvLister: pvInformer.Lister(),
pvSynced: pvInformer.Informer().HasSynced,
classLister: scInformer.Lister(),
classListerSynced: scInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "volume_expand"),
}
if err := expc.volumePluginMgr.InitPlugins(plugins, nil, expc); err != nil {
@@ -201,14 +213,29 @@ func (expc *expandController) syncHandler(key string) error {
klog.V(5).Infof("Error getting Persistent Volume for PVC %q (uid: %q) from informer : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, err)
return err
}
if pv.Spec.ClaimRef == nil || pvc.Namespace != pv.Spec.ClaimRef.Namespace || pvc.UID != pv.Spec.ClaimRef.UID {
err := fmt.Errorf("persistent Volume is not bound to PVC being updated : %s", util.ClaimToClaimKey(pvc))
klog.V(4).Infof("%v", err)
return err
}
claimClass := v1helper.GetPersistentVolumeClaimClass(pvc)
if claimClass == "" {
klog.V(4).Infof("volume expansion is disabled for PVC without StorageClasses: %s", util.ClaimToClaimKey(pvc))
return nil
}
class, err := expc.classLister.Get(claimClass)
if err != nil {
klog.V(4).Infof("volume expansion is not supported for PVC: %s, can not find StorageClass %s", util.ClaimToClaimKey(pvc), claimClass)
return nil
}
volumeSpec := volume.NewSpecFromPersistentVolume(pv, false)
volumePlugin, err := expc.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec)
volumeResizerName := class.Provisioner
if err != nil || volumePlugin == nil {
msg := fmt.Errorf("didn't find a plugin capable of expanding the volume; " +
"waiting for an external controller to process this PVC")
@@ -221,11 +248,31 @@ func (expc *expandController) syncHandler(key string) error {
return err
}
return expc.expand(pvc, pv)
if volumePlugin.IsMigratedToCSI() {
msg := fmt.Sprintf("CSI migration enabled for %s; waiting for external resizer to expand the pvc", volumeResizerName)
expc.recorder.Event(pvc, v1.EventTypeNormal, events.ExternalExpanding, msg)
csiResizerName, err := csitranslation.GetCSINameFromInTreeName(class.Provisioner)
if err != nil {
errorMsg := fmt.Sprintf("error getting CSI driver name for pvc %s, with error %v", util.ClaimToClaimKey(pvc), err)
expc.recorder.Event(pvc, v1.EventTypeWarning, events.ExternalExpanding, errorMsg)
return fmt.Errorf(errorMsg)
}
pvc, err := util.SetClaimResizer(pvc, csiResizerName, expc.kubeClient)
if err != nil {
errorMsg := fmt.Sprintf("error setting resizer annotation to pvc %s, with error %v", util.ClaimToClaimKey(pvc), err)
expc.recorder.Event(pvc, v1.EventTypeWarning, events.ExternalExpanding, errorMsg)
return fmt.Errorf(errorMsg)
}
return nil
}
return expc.expand(pvc, pv, volumeResizerName)
}
func (expc *expandController) expand(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) error {
pvc, err := util.MarkResizeInProgress(pvc, expc.kubeClient)
func (expc *expandController) expand(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, resizerName string) error {
pvc, err := util.MarkResizeInProgressWithResizer(pvc, resizerName, expc.kubeClient)
if err != nil {
klog.V(5).Infof("Error setting PVC %s in progress with error : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
return err

View File

@@ -0,0 +1,47 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package expand
import (
"testing"
"k8s.io/client-go/informers"
"k8s.io/kubernetes/pkg/controller"
controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
)
func TestSyncHandler(t *testing.T) {
fakeKubeClient := controllervolumetesting.CreateTestClient()
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
pvInformer := informerFactory.Core().V1().PersistentVolumes()
storageClassInformer := informerFactory.Storage().V1().StorageClasses()
expc, err := NewExpandController(fakeKubeClient, pvcInformer, pvInformer, storageClassInformer, nil, nil)
if err != nil {
t.Fatalf("error creating expand controller : %v", err)
}
var expController *expandController
expController, _ = expc.(*expandController)
err = expController.syncHandler("default/foo")
if err != nil {
t.Fatalf("error running sync handler : %v", err)
}
}