implement kubelet side online file system resize for volume
This commit is contained in:
		| @@ -99,6 +99,11 @@ const ( | ||||
| 	// Ability to Expand persistent volumes | ||||
| 	ExpandPersistentVolumes utilfeature.Feature = "ExpandPersistentVolumes" | ||||
|  | ||||
| 	// owner: @mlmhl | ||||
| 	// alpha: v1.11 | ||||
| 	// Ability to expand persistent volumes' file system without unmounting volumes. | ||||
| 	ExpandPersistentVolumesFSWithoutUnmounting utilfeature.Feature = "ExpandPersistentVolumesFSWithoutUnmounting" | ||||
|  | ||||
| 	// owner: @verb | ||||
| 	// alpha: v1.10 | ||||
| 	// | ||||
| @@ -328,28 +333,29 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS | ||||
| 	MountPropagation:                            {Default: true, PreRelease: utilfeature.Beta}, | ||||
| 	QOSReserved:                                 {Default: false, PreRelease: utilfeature.Alpha}, | ||||
| 	ExpandPersistentVolumes:                     {Default: true, PreRelease: utilfeature.Beta}, | ||||
| 	CPUManager:                                  {Default: true, PreRelease: utilfeature.Beta}, | ||||
| 	ServiceNodeExclusion:                        {Default: false, PreRelease: utilfeature.Alpha}, | ||||
| 	MountContainers:                             {Default: false, PreRelease: utilfeature.Alpha}, | ||||
| 	VolumeScheduling:                            {Default: true, PreRelease: utilfeature.Beta}, | ||||
| 	CSIPersistentVolume:                         {Default: true, PreRelease: utilfeature.Beta}, | ||||
| 	CustomPodDNS:                                {Default: true, PreRelease: utilfeature.Beta}, | ||||
| 	BlockVolume:                                 {Default: false, PreRelease: utilfeature.Alpha}, | ||||
| 	StorageObjectInUseProtection:                {Default: true, PreRelease: utilfeature.GA}, | ||||
| 	ResourceLimitsPriorityFunction:              {Default: false, PreRelease: utilfeature.Alpha}, | ||||
| 	SupportIPVSProxyMode:                        {Default: true, PreRelease: utilfeature.GA}, | ||||
| 	SupportPodPidsLimit:                         {Default: false, PreRelease: utilfeature.Alpha}, | ||||
| 	HyperVContainer:                             {Default: false, PreRelease: utilfeature.Alpha}, | ||||
| 	ScheduleDaemonSetPods:                       {Default: false, PreRelease: utilfeature.Alpha}, | ||||
| 	TokenRequest:                                {Default: false, PreRelease: utilfeature.Alpha}, | ||||
| 	TokenRequestProjection:                      {Default: false, PreRelease: utilfeature.Alpha}, | ||||
| 	CRIContainerLogRotation:                     {Default: true, PreRelease: utilfeature.Beta}, | ||||
| 	GCERegionalPersistentDisk:                   {Default: true, PreRelease: utilfeature.Beta}, | ||||
| 	RunAsGroup:                                  {Default: false, PreRelease: utilfeature.Alpha}, | ||||
| 	VolumeSubpath:                               {Default: true, PreRelease: utilfeature.GA}, | ||||
| 	BalanceAttachedNodeVolumes:                  {Default: false, PreRelease: utilfeature.Alpha}, | ||||
| 	DynamicProvisioningScheduling:               {Default: false, PreRelease: utilfeature.Alpha}, | ||||
| 	VolumeSubpathEnvExpansion:                   {Default: false, PreRelease: utilfeature.Alpha}, | ||||
| 	ExpandPersistentVolumesFSWithoutUnmounting:  {Default: false, PreRelease: utilfeature.Alpha}, | ||||
| 	CPUManager:                     {Default: true, PreRelease: utilfeature.Beta}, | ||||
| 	ServiceNodeExclusion:           {Default: false, PreRelease: utilfeature.Alpha}, | ||||
| 	MountContainers:                {Default: false, PreRelease: utilfeature.Alpha}, | ||||
| 	VolumeScheduling:               {Default: true, PreRelease: utilfeature.Beta}, | ||||
| 	CSIPersistentVolume:            {Default: true, PreRelease: utilfeature.Beta}, | ||||
| 	CustomPodDNS:                   {Default: true, PreRelease: utilfeature.Beta}, | ||||
| 	BlockVolume:                    {Default: false, PreRelease: utilfeature.Alpha}, | ||||
| 	StorageObjectInUseProtection:   {Default: true, PreRelease: utilfeature.GA}, | ||||
| 	ResourceLimitsPriorityFunction: {Default: false, PreRelease: utilfeature.Alpha}, | ||||
| 	SupportIPVSProxyMode:           {Default: true, PreRelease: utilfeature.GA}, | ||||
| 	SupportPodPidsLimit:            {Default: false, PreRelease: utilfeature.Alpha}, | ||||
| 	HyperVContainer:                {Default: false, PreRelease: utilfeature.Alpha}, | ||||
| 	ScheduleDaemonSetPods:          {Default: false, PreRelease: utilfeature.Alpha}, | ||||
| 	TokenRequest:                   {Default: false, PreRelease: utilfeature.Alpha}, | ||||
| 	TokenRequestProjection:         {Default: false, PreRelease: utilfeature.Alpha}, | ||||
| 	CRIContainerLogRotation:        {Default: true, PreRelease: utilfeature.Beta}, | ||||
| 	GCERegionalPersistentDisk:      {Default: true, PreRelease: utilfeature.Beta}, | ||||
| 	RunAsGroup:                     {Default: false, PreRelease: utilfeature.Alpha}, | ||||
| 	VolumeSubpath:                  {Default: true, PreRelease: utilfeature.GA}, | ||||
| 	BalanceAttachedNodeVolumes:     {Default: false, PreRelease: utilfeature.Alpha}, | ||||
| 	DynamicProvisioningScheduling:  {Default: false, PreRelease: utilfeature.Alpha}, | ||||
| 	VolumeSubpathEnvExpansion:      {Default: false, PreRelease: utilfeature.Alpha}, | ||||
|  | ||||
| 	// inherited features from generic apiserver, relisted here to get a conflict if it is changed | ||||
| 	// unintentionally on either side: | ||||
|   | ||||
							
								
								
									
										2
									
								
								pkg/kubelet/volumemanager/cache/BUILD
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										2
									
								
								pkg/kubelet/volumemanager/cache/BUILD
									
									
									
									
										vendored
									
									
								
							| @@ -14,6 +14,7 @@ go_library( | ||||
|     ], | ||||
|     importpath = "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache", | ||||
|     deps = [ | ||||
|         "//pkg/features:go_default_library", | ||||
|         "//pkg/volume:go_default_library", | ||||
|         "//pkg/volume/util:go_default_library", | ||||
|         "//pkg/volume/util/operationexecutor:go_default_library", | ||||
| @@ -21,6 +22,7 @@ go_library( | ||||
|         "//vendor/github.com/golang/glog:go_default_library", | ||||
|         "//vendor/k8s.io/api/core/v1:go_default_library", | ||||
|         "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", | ||||
|         "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", | ||||
|     ], | ||||
| ) | ||||
|  | ||||
|   | ||||
| @@ -28,6 +28,8 @@ import ( | ||||
|  | ||||
| 	"k8s.io/api/core/v1" | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
| 	utilfeature "k8s.io/apiserver/pkg/util/feature" | ||||
| 	"k8s.io/kubernetes/pkg/features" | ||||
| 	"k8s.io/kubernetes/pkg/volume" | ||||
| 	"k8s.io/kubernetes/pkg/volume/util" | ||||
| 	"k8s.io/kubernetes/pkg/volume/util/operationexecutor" | ||||
| @@ -148,6 +150,11 @@ type ActualStateOfWorld interface { | ||||
| 	// with pod's unique name. This map can be used to determine which pod is currently | ||||
| 	// in actual state of world. | ||||
| 	GetPods() map[volumetypes.UniquePodName]bool | ||||
|  | ||||
| 	// MarkFSResizeRequired marks each volume that is successfully attached and | ||||
| 	// mounted for the specified pod as requiring file system resize (if the plugin for the | ||||
| 	// volume indicates it requires file system resize). | ||||
| 	MarkFSResizeRequired(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) | ||||
| } | ||||
|  | ||||
| // MountedVolume represents a volume that has successfully been mounted to a pod. | ||||
| @@ -291,6 +298,10 @@ type mountedPod struct { | ||||
|  | ||||
| 	// volumeGidValue contains the value of the GID annotation, if present. | ||||
| 	volumeGidValue string | ||||
|  | ||||
| 	// fsResizeRequired indicates the underlying volume has been successfully | ||||
| 	// mounted to this pod but its size has been expanded after that. | ||||
| 	fsResizeRequired bool | ||||
| } | ||||
|  | ||||
| func (asw *actualStateOfWorld) MarkVolumeAsAttached( | ||||
| @@ -444,6 +455,34 @@ func (asw *actualStateOfWorld) AddPodToVolume( | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (asw *actualStateOfWorld) MarkVolumeAsResized( | ||||
| 	podName volumetypes.UniquePodName, | ||||
| 	volumeName v1.UniqueVolumeName) error { | ||||
| 	asw.Lock() | ||||
| 	defer asw.Unlock() | ||||
|  | ||||
| 	volumeObj, volumeExists := asw.attachedVolumes[volumeName] | ||||
| 	if !volumeExists { | ||||
| 		return fmt.Errorf( | ||||
| 			"no volume with the name %q exists in the list of attached volumes", | ||||
| 			volumeName) | ||||
| 	} | ||||
|  | ||||
| 	podObj, podExists := volumeObj.mountedPods[podName] | ||||
| 	if !podExists { | ||||
| 		return fmt.Errorf( | ||||
| 			"no pod with the name %q exists in the mounted pods list of volume %s", | ||||
| 			podName, | ||||
| 			volumeName) | ||||
| 	} | ||||
|  | ||||
| 	glog.V(5).Infof("Volume %s(OuterVolumeSpecName %s) of pod %s has been resized", | ||||
| 		volumeName, podObj.outerVolumeSpecName, podName) | ||||
| 	podObj.fsResizeRequired = false | ||||
| 	asw.attachedVolumes[volumeName].mountedPods[podName] = podObj | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (asw *actualStateOfWorld) MarkRemountRequired( | ||||
| 	podName volumetypes.UniquePodName) { | ||||
| 	asw.Lock() | ||||
| @@ -475,6 +514,46 @@ func (asw *actualStateOfWorld) MarkRemountRequired( | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (asw *actualStateOfWorld) MarkFSResizeRequired( | ||||
| 	volumeName v1.UniqueVolumeName, | ||||
| 	podName volumetypes.UniquePodName) { | ||||
| 	asw.Lock() | ||||
| 	defer asw.Unlock() | ||||
| 	volumeObj, exist := asw.attachedVolumes[volumeName] | ||||
| 	if !exist { | ||||
| 		glog.Warningf("MarkFSResizeRequired for volume %s failed as volume not exist", volumeName) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	podObj, exist := volumeObj.mountedPods[podName] | ||||
| 	if !exist { | ||||
| 		glog.Warningf("MarkFSResizeRequired for volume %s failed "+ | ||||
| 			"as pod(%s) not exist", volumeName, podName) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	volumePlugin, err := | ||||
| 		asw.volumePluginMgr.FindExpandablePluginBySpec(podObj.volumeSpec) | ||||
| 	if err != nil || volumePlugin == nil { | ||||
| 		// Log and continue processing | ||||
| 		glog.Errorf( | ||||
| 			"MarkFSResizeRequired failed to find expandable plugin for pod %q volume: %q (volSpecName: %q)", | ||||
| 			podObj.podName, | ||||
| 			volumeObj.volumeName, | ||||
| 			podObj.volumeSpec.Name()) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	if volumePlugin.RequiresFSResize() { | ||||
| 		if !podObj.fsResizeRequired { | ||||
| 			glog.V(3).Infof("PVC volume %s(OuterVolumeSpecName %s) of pod %s requires file system resize", | ||||
| 				volumeName, podObj.outerVolumeSpecName, podName) | ||||
| 			podObj.fsResizeRequired = true | ||||
| 		} | ||||
| 		asw.attachedVolumes[volumeName].mountedPods[podName] = podObj | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (asw *actualStateOfWorld) SetVolumeGloballyMounted( | ||||
| 	volumeName v1.UniqueVolumeName, globallyMounted bool, devicePath, deviceMountPath string) error { | ||||
| 	asw.Lock() | ||||
| @@ -546,8 +625,14 @@ func (asw *actualStateOfWorld) PodExistsInVolume( | ||||
| 	} | ||||
|  | ||||
| 	podObj, podExists := volumeObj.mountedPods[podName] | ||||
| 	if podExists && podObj.remountRequired { | ||||
| 		return true, volumeObj.devicePath, newRemountRequiredError(volumeObj.volumeName, podObj.podName) | ||||
| 	if podExists { | ||||
| 		if podObj.remountRequired { | ||||
| 			return true, volumeObj.devicePath, newRemountRequiredError(volumeObj.volumeName, podObj.podName) | ||||
| 		} | ||||
| 		if podObj.fsResizeRequired && | ||||
| 			utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumesFSWithoutUnmounting) { | ||||
| 			return true, volumeObj.devicePath, newFsResizeRequiredError(volumeObj.volumeName, podObj.podName) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return podExists, volumeObj.devicePath, nil | ||||
| @@ -716,6 +801,35 @@ func newRemountRequiredError( | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // fsResizeRequiredError is an error returned when PodExistsInVolume() found | ||||
| // volume/pod attached/mounted but fsResizeRequired was true, indicating the | ||||
| // given volume receives an resize request after attached/mounted. | ||||
| type fsResizeRequiredError struct { | ||||
| 	volumeName v1.UniqueVolumeName | ||||
| 	podName    volumetypes.UniquePodName | ||||
| } | ||||
|  | ||||
| func (err fsResizeRequiredError) Error() string { | ||||
| 	return fmt.Sprintf( | ||||
| 		"volumeName %q mounted to %q needs to resize file system", | ||||
| 		err.volumeName, err.podName) | ||||
| } | ||||
|  | ||||
| func newFsResizeRequiredError( | ||||
| 	volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) error { | ||||
| 	return fsResizeRequiredError{ | ||||
| 		volumeName: volumeName, | ||||
| 		podName:    podName, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // IsFSResizeRequiredError returns true if the specified error is a | ||||
| // fsResizeRequiredError. | ||||
| func IsFSResizeRequiredError(err error) bool { | ||||
| 	_, ok := err.(fsResizeRequiredError) | ||||
| 	return ok | ||||
| } | ||||
|  | ||||
| // getMountedVolume constructs and returns a MountedVolume object from the given | ||||
| // mountedPod and attachedVolume objects. | ||||
| func getMountedVolume( | ||||
|   | ||||
| @@ -25,6 +25,7 @@ go_library( | ||||
|         "//vendor/k8s.io/api/core/v1:go_default_library", | ||||
|         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", | ||||
|         "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", | ||||
|         "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", | ||||
|         "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", | ||||
|         "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", | ||||
|         "//vendor/k8s.io/client-go/kubernetes:go_default_library", | ||||
| @@ -49,6 +50,7 @@ go_test( | ||||
|     srcs = ["desired_state_of_world_populator_test.go"], | ||||
|     embed = [":go_default_library"], | ||||
|     deps = [ | ||||
|         "//pkg/features:go_default_library", | ||||
|         "//pkg/kubelet/configmap:go_default_library", | ||||
|         "//pkg/kubelet/container/testing:go_default_library", | ||||
|         "//pkg/kubelet/pod:go_default_library", | ||||
| @@ -61,6 +63,7 @@ go_test( | ||||
|         "//pkg/volume/util:go_default_library", | ||||
|         "//pkg/volume/util/types:go_default_library", | ||||
|         "//vendor/k8s.io/api/core/v1:go_default_library", | ||||
|         "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", | ||||
|         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", | ||||
|         "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", | ||||
|         "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", | ||||
|   | ||||
| @@ -30,6 +30,7 @@ import ( | ||||
| 	"k8s.io/api/core/v1" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
| 	"k8s.io/apimachinery/pkg/util/sets" | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
| 	utilfeature "k8s.io/apiserver/pkg/util/feature" | ||||
| 	clientset "k8s.io/client-go/kubernetes" | ||||
| @@ -182,12 +183,26 @@ func (dswp *desiredStateOfWorldPopulator) isPodTerminated(pod *v1.Pod) bool { | ||||
| // Iterate through all pods and add to desired state of world if they don't | ||||
| // exist but should | ||||
| func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() { | ||||
| 	// Map unique pod name to outer volume name to MountedVolume. | ||||
| 	mountedVolumesForPod := make(map[volumetypes.UniquePodName]map[string]cache.MountedVolume) | ||||
| 	if utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumesFSWithoutUnmounting) { | ||||
| 		for _, mountedVolume := range dswp.actualStateOfWorld.GetMountedVolumes() { | ||||
| 			mountedVolumes, exist := mountedVolumesForPod[mountedVolume.PodName] | ||||
| 			if !exist { | ||||
| 				mountedVolumes = make(map[string]cache.MountedVolume) | ||||
| 				mountedVolumesForPod[mountedVolume.PodName] = mountedVolumes | ||||
| 			} | ||||
| 			mountedVolumes[mountedVolume.OuterVolumeSpecName] = mountedVolume | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	processedVolumesForFSResize := sets.NewString() | ||||
| 	for _, pod := range dswp.podManager.GetPods() { | ||||
| 		if dswp.isPodTerminated(pod) { | ||||
| 			// Do not (re)add volumes for terminated pods | ||||
| 			continue | ||||
| 		} | ||||
| 		dswp.processPodVolumes(pod) | ||||
| 		dswp.processPodVolumes(pod, mountedVolumesForPod, processedVolumesForFSResize) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -259,7 +274,10 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() { | ||||
|  | ||||
| // processPodVolumes processes the volumes in the given pod and adds them to the | ||||
| // desired state of the world. | ||||
| func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) { | ||||
| func (dswp *desiredStateOfWorldPopulator) processPodVolumes( | ||||
| 	pod *v1.Pod, | ||||
| 	mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume, | ||||
| 	processedVolumesForFSResize sets.String) { | ||||
| 	if pod == nil { | ||||
| 		return | ||||
| 	} | ||||
| @@ -274,7 +292,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) { | ||||
|  | ||||
| 	// Process volume spec for each volume defined in pod | ||||
| 	for _, podVolume := range pod.Spec.Volumes { | ||||
| 		volumeSpec, volumeGidValue, err := | ||||
| 		pvc, volumeSpec, volumeGidValue, err := | ||||
| 			dswp.createVolumeSpec(podVolume, pod.Name, pod.Namespace, mountsMap, devicesMap) | ||||
| 		if err != nil { | ||||
| 			glog.Errorf( | ||||
| @@ -304,6 +322,11 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) { | ||||
| 			podVolume.Name, | ||||
| 			volumeSpec.Name(), | ||||
| 			uniquePodName) | ||||
|  | ||||
| 		if utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumesFSWithoutUnmounting) { | ||||
| 			dswp.checkVolumeFSResize(pod, podVolume, pvc, volumeSpec, | ||||
| 				uniquePodName, mountedVolumesForPod, processedVolumesForFSResize) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// some of the volume additions may have failed, should not mark this pod as fully processed | ||||
| @@ -316,6 +339,106 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) { | ||||
|  | ||||
| } | ||||
|  | ||||
| // checkVolumeFSResize checks whether a PVC mounted by the pod requires file | ||||
| // system resize or not. If so, marks this volume as fsResizeRequired in ASW. | ||||
| // - mountedVolumesForPod stores all mounted volumes in ASW, because online | ||||
| //   volume resize only considers mounted volumes. | ||||
| // - processedVolumesForFSResize stores all volumes we have checked in current loop, | ||||
| //   because file system resize operation is a global operation for volume, so | ||||
| //   we only need to check it once if more than one pod use it. | ||||
| func (dswp *desiredStateOfWorldPopulator) checkVolumeFSResize( | ||||
| 	pod *v1.Pod, | ||||
| 	podVolume v1.Volume, | ||||
| 	pvc *v1.PersistentVolumeClaim, | ||||
| 	volumeSpec *volume.Spec, | ||||
| 	uniquePodName volumetypes.UniquePodName, | ||||
| 	mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume, | ||||
| 	processedVolumesForFSResize sets.String) { | ||||
| 	if podVolume.PersistentVolumeClaim == nil { | ||||
| 		// Only PVC supports resize operation. | ||||
| 		return | ||||
| 	} | ||||
| 	uniqueVolumeName, exist := getUniqueVolumeName(uniquePodName, podVolume.Name, mountedVolumesForPod) | ||||
| 	if !exist { | ||||
| 		// Volume not exist in ASW, we assume it hasn't been mounted yet. If it needs resize, | ||||
| 		// it will be handled as offline resize(if it indeed hasn't been mounted yet), | ||||
| 		// or online resize in subsequent loop(after we confirm it has been mounted). | ||||
| 		return | ||||
| 	} | ||||
| 	fsVolume, err := util.CheckVolumeModeFilesystem(volumeSpec) | ||||
| 	if err != nil { | ||||
| 		glog.Errorf("Check volume mode failed for volume %s(OuterVolumeSpecName %s): %v", | ||||
| 			uniqueVolumeName, podVolume.Name, err) | ||||
| 		return | ||||
| 	} | ||||
| 	if !fsVolume { | ||||
| 		glog.V(5).Infof("Block mode volume needn't to check file system resize request") | ||||
| 		return | ||||
| 	} | ||||
| 	if processedVolumesForFSResize.Has(string(uniqueVolumeName)) { | ||||
| 		// File system resize operation is a global operation for volume, | ||||
| 		// so we only need to check it once if more than one pod use it. | ||||
| 		return | ||||
| 	} | ||||
| 	if mountedReadOnlyByPod(podVolume, pod) { | ||||
| 		// This volume is used as read only by this pod, we don't perform resize for read only volumes. | ||||
| 		glog.V(5).Infof("Skip file system resize check for volume %s in pod %s/%s "+ | ||||
| 			"as the volume is mounted as readonly", podVolume.Name, pod.Namespace, pod.Name) | ||||
| 		return | ||||
| 	} | ||||
| 	if volumeRequiresFSResize(pvc, volumeSpec.PersistentVolume) { | ||||
| 		dswp.actualStateOfWorld.MarkFSResizeRequired(uniqueVolumeName, uniquePodName) | ||||
| 	} | ||||
| 	processedVolumesForFSResize.Insert(string(uniqueVolumeName)) | ||||
| } | ||||
|  | ||||
| func mountedReadOnlyByPod(podVolume v1.Volume, pod *v1.Pod) bool { | ||||
| 	if podVolume.PersistentVolumeClaim.ReadOnly { | ||||
| 		return true | ||||
| 	} | ||||
| 	for _, container := range pod.Spec.InitContainers { | ||||
| 		if !mountedReadOnlyByContainer(podVolume.Name, &container) { | ||||
| 			return false | ||||
| 		} | ||||
| 	} | ||||
| 	for _, container := range pod.Spec.Containers { | ||||
| 		if !mountedReadOnlyByContainer(podVolume.Name, &container) { | ||||
| 			return false | ||||
| 		} | ||||
| 	} | ||||
| 	return true | ||||
| } | ||||
|  | ||||
| func mountedReadOnlyByContainer(volumeName string, container *v1.Container) bool { | ||||
| 	for _, volumeMount := range container.VolumeMounts { | ||||
| 		if volumeMount.Name == volumeName && !volumeMount.ReadOnly { | ||||
| 			return false | ||||
| 		} | ||||
| 	} | ||||
| 	return true | ||||
| } | ||||
|  | ||||
| func getUniqueVolumeName( | ||||
| 	podName volumetypes.UniquePodName, | ||||
| 	outerVolumeSpecName string, | ||||
| 	mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume) (v1.UniqueVolumeName, bool) { | ||||
| 	mountedVolumes, exist := mountedVolumesForPod[podName] | ||||
| 	if !exist { | ||||
| 		return "", false | ||||
| 	} | ||||
| 	mountedVolume, exist := mountedVolumes[outerVolumeSpecName] | ||||
| 	if !exist { | ||||
| 		return "", false | ||||
| 	} | ||||
| 	return mountedVolume.VolumeName, true | ||||
| } | ||||
|  | ||||
| func volumeRequiresFSResize(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) bool { | ||||
| 	capacity := pvc.Status.Capacity[v1.ResourceStorage] | ||||
| 	requested := pv.Spec.Capacity[v1.ResourceStorage] | ||||
| 	return requested.Cmp(capacity) > 0 | ||||
| } | ||||
|  | ||||
| // podPreviouslyProcessed returns true if the volumes for this pod have already | ||||
| // been processed by the populator | ||||
| func (dswp *desiredStateOfWorldPopulator) podPreviouslyProcessed( | ||||
| @@ -350,7 +473,7 @@ func (dswp *desiredStateOfWorldPopulator) deleteProcessedPod( | ||||
| // specified volume. It dereference any PVC to get PV objects, if needed. | ||||
| // Returns an error if unable to obtain the volume at this time. | ||||
| func (dswp *desiredStateOfWorldPopulator) createVolumeSpec( | ||||
| 	podVolume v1.Volume, podName string, podNamespace string, mountsMap map[string]bool, devicesMap map[string]bool) (*volume.Spec, string, error) { | ||||
| 	podVolume v1.Volume, podName string, podNamespace string, mountsMap map[string]bool, devicesMap map[string]bool) (*v1.PersistentVolumeClaim, *volume.Spec, string, error) { | ||||
| 	if pvcSource := | ||||
| 		podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil { | ||||
| 		glog.V(5).Infof( | ||||
| @@ -359,15 +482,16 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec( | ||||
| 			pvcSource.ClaimName) | ||||
|  | ||||
| 		// If podVolume is a PVC, fetch the real PV behind the claim | ||||
| 		pvName, pvcUID, err := dswp.getPVCExtractPV( | ||||
| 		pvc, err := dswp.getPVCExtractPV( | ||||
| 			podNamespace, pvcSource.ClaimName) | ||||
| 		if err != nil { | ||||
| 			return nil, "", fmt.Errorf( | ||||
| 			return nil, nil, "", fmt.Errorf( | ||||
| 				"error processing PVC %q/%q: %v", | ||||
| 				podNamespace, | ||||
| 				pvcSource.ClaimName, | ||||
| 				err) | ||||
| 		} | ||||
| 		pvName, pvcUID := pvc.Spec.VolumeName, pvc.UID | ||||
|  | ||||
| 		glog.V(5).Infof( | ||||
| 			"Found bound PV for PVC (ClaimName %q/%q pvcUID %v): pvName=%q", | ||||
| @@ -380,7 +504,7 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec( | ||||
| 		volumeSpec, volumeGidValue, err := | ||||
| 			dswp.getPVSpec(pvName, pvcSource.ReadOnly, pvcUID) | ||||
| 		if err != nil { | ||||
| 			return nil, "", fmt.Errorf( | ||||
| 			return nil, nil, "", fmt.Errorf( | ||||
| 				"error processing PVC %q/%q: %v", | ||||
| 				podNamespace, | ||||
| 				pvcSource.ClaimName, | ||||
| @@ -399,11 +523,11 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec( | ||||
| 		if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) { | ||||
| 			volumeMode, err := util.GetVolumeMode(volumeSpec) | ||||
| 			if err != nil { | ||||
| 				return nil, "", err | ||||
| 				return nil, nil, "", err | ||||
| 			} | ||||
| 			// Error if a container has volumeMounts but the volumeMode of PVC isn't Filesystem | ||||
| 			if mountsMap[podVolume.Name] && volumeMode != v1.PersistentVolumeFilesystem { | ||||
| 				return nil, "", fmt.Errorf( | ||||
| 				return nil, nil, "", fmt.Errorf( | ||||
| 					"Volume %q has volumeMode %q, but is specified in volumeMounts for pod %q/%q", | ||||
| 					podVolume.Name, | ||||
| 					volumeMode, | ||||
| @@ -412,7 +536,7 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec( | ||||
| 			} | ||||
| 			// Error if a container has volumeDevices but the volumeMode of PVC isn't Block | ||||
| 			if devicesMap[podVolume.Name] && volumeMode != v1.PersistentVolumeBlock { | ||||
| 				return nil, "", fmt.Errorf( | ||||
| 				return nil, nil, "", fmt.Errorf( | ||||
| 					"Volume %q has volumeMode %q, but is specified in volumeDevices for pod %q/%q", | ||||
| 					podVolume.Name, | ||||
| 					volumeMode, | ||||
| @@ -420,13 +544,13 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec( | ||||
| 					podName) | ||||
| 			} | ||||
| 		} | ||||
| 		return volumeSpec, volumeGidValue, nil | ||||
| 		return pvc, volumeSpec, volumeGidValue, nil | ||||
| 	} | ||||
|  | ||||
| 	// Do not return the original volume object, since the source could mutate it | ||||
| 	clonedPodVolume := podVolume.DeepCopy() | ||||
|  | ||||
| 	return volume.NewSpecFromVolume(clonedPodVolume), "", nil | ||||
| 	return nil, volume.NewSpecFromVolume(clonedPodVolume), "", nil | ||||
| } | ||||
|  | ||||
| // getPVCExtractPV fetches the PVC object with the given namespace and name from | ||||
| @@ -434,11 +558,11 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec( | ||||
| // it is pointing to and returns it. | ||||
| // An error is returned if the PVC object's phase is not "Bound". | ||||
| func (dswp *desiredStateOfWorldPopulator) getPVCExtractPV( | ||||
| 	namespace string, claimName string) (string, types.UID, error) { | ||||
| 	namespace string, claimName string) (*v1.PersistentVolumeClaim, error) { | ||||
| 	pvc, err := | ||||
| 		dswp.kubeClient.CoreV1().PersistentVolumeClaims(namespace).Get(claimName, metav1.GetOptions{}) | ||||
| 	if err != nil || pvc == nil { | ||||
| 		return "", "", fmt.Errorf( | ||||
| 		return nil, fmt.Errorf( | ||||
| 			"failed to fetch PVC %s/%s from API server. err=%v", | ||||
| 			namespace, | ||||
| 			claimName, | ||||
| @@ -455,7 +579,7 @@ func (dswp *desiredStateOfWorldPopulator) getPVCExtractPV( | ||||
| 		// It should happen only in very rare case when scheduler schedules | ||||
| 		// a pod and user deletes a PVC that's used by it at the same time. | ||||
| 		if pvc.ObjectMeta.DeletionTimestamp != nil { | ||||
| 			return "", "", fmt.Errorf( | ||||
| 			return nil, fmt.Errorf( | ||||
| 				"can't start pod because PVC %s/%s is being deleted", | ||||
| 				namespace, | ||||
| 				claimName) | ||||
| @@ -464,7 +588,7 @@ func (dswp *desiredStateOfWorldPopulator) getPVCExtractPV( | ||||
|  | ||||
| 	if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" { | ||||
|  | ||||
| 		return "", "", fmt.Errorf( | ||||
| 		return nil, fmt.Errorf( | ||||
| 			"PVC %s/%s has non-bound phase (%q) or empty pvc.Spec.VolumeName (%q)", | ||||
| 			namespace, | ||||
| 			claimName, | ||||
| @@ -472,7 +596,7 @@ func (dswp *desiredStateOfWorldPopulator) getPVCExtractPV( | ||||
| 			pvc.Spec.VolumeName) | ||||
| 	} | ||||
|  | ||||
| 	return pvc.Spec.VolumeName, pvc.UID, nil | ||||
| 	return pvc, nil | ||||
| } | ||||
|  | ||||
| // getPVSpec fetches the PV object with the given name from the API server | ||||
|   | ||||
| @@ -20,12 +20,16 @@ import ( | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| 	"fmt" | ||||
|  | ||||
| 	"k8s.io/api/core/v1" | ||||
| 	"k8s.io/apimachinery/pkg/api/resource" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/runtime" | ||||
| 	utilfeature "k8s.io/apiserver/pkg/util/feature" | ||||
| 	"k8s.io/client-go/kubernetes/fake" | ||||
| 	core "k8s.io/client-go/testing" | ||||
| 	"k8s.io/kubernetes/pkg/features" | ||||
| 	"k8s.io/kubernetes/pkg/kubelet/configmap" | ||||
| 	containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" | ||||
| 	kubepod "k8s.io/kubernetes/pkg/kubelet/pod" | ||||
| @@ -294,7 +298,7 @@ func TestCreateVolumeSpec_Valid_File_VolumeMounts(t *testing.T) { | ||||
|  | ||||
| 	fakePodManager.AddPod(pod) | ||||
| 	mountsMap, devicesMap := dswp.makeVolumeMap(pod.Spec.Containers) | ||||
| 	volumeSpec, _, err := | ||||
| 	_, volumeSpec, _, err := | ||||
| 		dswp.createVolumeSpec(pod.Spec.Volumes[0], pod.Name, pod.Namespace, mountsMap, devicesMap) | ||||
|  | ||||
| 	// Assert | ||||
| @@ -343,7 +347,7 @@ func TestCreateVolumeSpec_Valid_Block_VolumeDevices(t *testing.T) { | ||||
|  | ||||
| 	fakePodManager.AddPod(pod) | ||||
| 	mountsMap, devicesMap := dswp.makeVolumeMap(pod.Spec.Containers) | ||||
| 	volumeSpec, _, err := | ||||
| 	_, volumeSpec, _, err := | ||||
| 		dswp.createVolumeSpec(pod.Spec.Volumes[0], pod.Name, pod.Namespace, mountsMap, devicesMap) | ||||
|  | ||||
| 	// Assert | ||||
| @@ -395,7 +399,7 @@ func TestCreateVolumeSpec_Invalid_File_VolumeDevices(t *testing.T) { | ||||
|  | ||||
| 	fakePodManager.AddPod(pod) | ||||
| 	mountsMap, devicesMap := dswp.makeVolumeMap(pod.Spec.Containers) | ||||
| 	volumeSpec, _, err := | ||||
| 	_, volumeSpec, _, err := | ||||
| 		dswp.createVolumeSpec(pod.Spec.Volumes[0], pod.Name, pod.Namespace, mountsMap, devicesMap) | ||||
|  | ||||
| 	// Assert | ||||
| @@ -447,7 +451,7 @@ func TestCreateVolumeSpec_Invalid_Block_VolumeMounts(t *testing.T) { | ||||
|  | ||||
| 	fakePodManager.AddPod(pod) | ||||
| 	mountsMap, devicesMap := dswp.makeVolumeMap(pod.Spec.Containers) | ||||
| 	volumeSpec, _, err := | ||||
| 	_, volumeSpec, _, err := | ||||
| 		dswp.createVolumeSpec(pod.Spec.Volumes[0], pod.Name, pod.Namespace, mountsMap, devicesMap) | ||||
|  | ||||
| 	// Assert | ||||
| @@ -459,6 +463,155 @@ func TestCreateVolumeSpec_Invalid_Block_VolumeMounts(t *testing.T) { | ||||
| 	utilfeature.DefaultFeatureGate.Set("BlockVolume=false") | ||||
| } | ||||
|  | ||||
| func TestCheckVolumeFSResize(t *testing.T) { | ||||
| 	pv := &v1.PersistentVolume{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| 			Name: "dswp-test-volume-name", | ||||
| 		}, | ||||
| 		Spec: v1.PersistentVolumeSpec{ | ||||
| 			PersistentVolumeSource: v1.PersistentVolumeSource{RBD: &v1.RBDPersistentVolumeSource{}}, | ||||
| 			Capacity:               volumeCapacity(1), | ||||
| 			ClaimRef:               &v1.ObjectReference{Namespace: "ns", Name: "file-bound"}, | ||||
| 		}, | ||||
| 	} | ||||
| 	pvc := &v1.PersistentVolumeClaim{ | ||||
| 		Spec: v1.PersistentVolumeClaimSpec{ | ||||
| 			VolumeName: "dswp-test-volume-name", | ||||
| 			Resources: v1.ResourceRequirements{ | ||||
| 				Requests: volumeCapacity(1), | ||||
| 			}, | ||||
| 		}, | ||||
| 		Status: v1.PersistentVolumeClaimStatus{ | ||||
| 			Phase:    v1.ClaimBound, | ||||
| 			Capacity: volumeCapacity(1), | ||||
| 		}, | ||||
| 	} | ||||
| 	dswp, fakePodManager, fakeDSW := createDswpWithVolume(t, pv, pvc) | ||||
| 	fakeASW := dswp.actualStateOfWorld | ||||
|  | ||||
| 	// create pod | ||||
| 	containers := []v1.Container{ | ||||
| 		{ | ||||
| 			VolumeMounts: []v1.VolumeMount{ | ||||
| 				{ | ||||
| 					Name:      "dswp-test-volume-name", | ||||
| 					MountPath: "/mnt", | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	pod := createPodWithVolume("dswp-test-pod", "dswp-test-volume-name", "file-bound", containers) | ||||
| 	uniquePodName := types.UniquePodName(pod.UID) | ||||
| 	uniqueVolumeName := v1.UniqueVolumeName("fake-plugin/" + pod.Spec.Volumes[0].Name) | ||||
|  | ||||
| 	fakePodManager.AddPod(pod) | ||||
| 	// Fill the dsw to contains volumes and pods. | ||||
| 	dswp.findAndAddNewPods() | ||||
| 	reconcileASW(fakeASW, fakeDSW, t) | ||||
|  | ||||
| 	// No resize request for volume, volumes in ASW shouldn't be marked as fsResizeRequired. | ||||
| 	setExpandOnlinePersistentVolumesFeatureGate("true", t) | ||||
| 	resizeRequiredVolumes := reprocess(dswp, uniquePodName, fakeDSW, fakeASW) | ||||
| 	if len(resizeRequiredVolumes) > 0 { | ||||
| 		t.Fatalf("No resize request for any volumes, but found resize required volumes in ASW: %v", resizeRequiredVolumes) | ||||
| 	} | ||||
|  | ||||
| 	// Add a resize request to volume. | ||||
| 	pv.Spec.Capacity = volumeCapacity(2) | ||||
| 	pvc.Spec.Resources.Requests = volumeCapacity(2) | ||||
|  | ||||
| 	// Disable the feature gate, so volume shouldn't be marked as fsResizeRequired. | ||||
| 	setExpandOnlinePersistentVolumesFeatureGate("false", t) | ||||
| 	resizeRequiredVolumes = reprocess(dswp, uniquePodName, fakeDSW, fakeASW) | ||||
| 	if len(resizeRequiredVolumes) > 0 { | ||||
| 		t.Fatalf("Feature gate disabled, but found resize required volumes in ASW: %v", resizeRequiredVolumes) | ||||
| 	} | ||||
|  | ||||
| 	// Make volume used as ReadOnly, so volume shouldn't be marked as fsResizeRequired. | ||||
| 	setExpandOnlinePersistentVolumesFeatureGate("true", t) | ||||
| 	pod.Spec.Containers[0].VolumeMounts[0].ReadOnly = true | ||||
| 	resizeRequiredVolumes = reprocess(dswp, uniquePodName, fakeDSW, fakeASW) | ||||
| 	if len(resizeRequiredVolumes) > 0 { | ||||
| 		t.Fatalf("volume mounted as ReadOnly, but found resize required volumes in ASW: %v", resizeRequiredVolumes) | ||||
| 	} | ||||
|  | ||||
| 	// Clear ASW, so volume shouldn't be marked as fsResizeRequired because they are not mounted. | ||||
| 	pod.Spec.Containers[0].VolumeMounts[0].ReadOnly = false | ||||
| 	clearASW(fakeASW, fakeDSW, t) | ||||
| 	resizeRequiredVolumes = reprocess(dswp, uniquePodName, fakeDSW, fakeASW) | ||||
| 	if len(resizeRequiredVolumes) > 0 { | ||||
| 		t.Fatalf("volume hasn't been mounted, but found resize required volumes in ASW: %v", resizeRequiredVolumes) | ||||
| 	} | ||||
|  | ||||
| 	// volume in ASW should be marked as fsResizeRequired. | ||||
| 	reconcileASW(fakeASW, fakeDSW, t) | ||||
| 	resizeRequiredVolumes = reprocess(dswp, uniquePodName, fakeDSW, fakeASW) | ||||
| 	if len(resizeRequiredVolumes) == 0 { | ||||
| 		t.Fatalf("Request resize for volume, but volume in ASW hasn't been marked as fsResizeRequired") | ||||
| 	} | ||||
| 	if len(resizeRequiredVolumes) != 1 { | ||||
| 		t.Fatalf("Some unexpected volumes are marked as fsResizeRequired: %v", resizeRequiredVolumes) | ||||
| 	} | ||||
| 	if resizeRequiredVolumes[0] != uniqueVolumeName { | ||||
| 		t.Fatalf("Mark wrong volume as fsResizeRequired: %s", resizeRequiredVolumes[0]) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func volumeCapacity(size int) v1.ResourceList { | ||||
| 	return v1.ResourceList{v1.ResourceStorage: resource.MustParse(fmt.Sprintf("%dGi", size))} | ||||
| } | ||||
|  | ||||
| func setExpandOnlinePersistentVolumesFeatureGate(value string, t *testing.T) { | ||||
| 	err := utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=%s", features.ExpandPersistentVolumesFSWithoutUnmounting, value)) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Set ExpandPersistentVolumesFSWithoutUnmounting feature gate to %s failed: %v", value, err) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func reconcileASW(asw cache.ActualStateOfWorld, dsw cache.DesiredStateOfWorld, t *testing.T) { | ||||
| 	for _, volumeToMount := range dsw.GetVolumesToMount() { | ||||
| 		err := asw.MarkVolumeAsAttached(volumeToMount.VolumeName, volumeToMount.VolumeSpec, "", "") | ||||
| 		if err != nil { | ||||
| 			t.Fatalf("Unexpected error when MarkVolumeAsAttached: %v", err) | ||||
| 		} | ||||
| 		err = asw.MarkVolumeAsMounted(volumeToMount.PodName, volumeToMount.Pod.UID, | ||||
| 			volumeToMount.VolumeName, nil, nil, volumeToMount.OuterVolumeSpecName, volumeToMount.VolumeGidValue, volumeToMount.VolumeSpec) | ||||
| 		if err != nil { | ||||
| 			t.Fatalf("Unexpected error when MarkVolumeAsMounted: %v", err) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func clearASW(asw cache.ActualStateOfWorld, dsw cache.DesiredStateOfWorld, t *testing.T) { | ||||
| 	for _, volumeToMount := range dsw.GetVolumesToMount() { | ||||
| 		err := asw.MarkVolumeAsUnmounted(volumeToMount.PodName, volumeToMount.VolumeName) | ||||
| 		if err != nil { | ||||
| 			t.Fatalf("Unexpected error when MarkVolumeAsUnmounted: %v", err) | ||||
| 		} | ||||
| 	} | ||||
| 	for _, volumeToMount := range dsw.GetVolumesToMount() { | ||||
| 		asw.MarkVolumeAsDetached(volumeToMount.VolumeName, "") | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func reprocess(dswp *desiredStateOfWorldPopulator, uniquePodName types.UniquePodName, | ||||
| 	dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld) []v1.UniqueVolumeName { | ||||
| 	dswp.ReprocessPod(uniquePodName) | ||||
| 	dswp.findAndAddNewPods() | ||||
| 	return getResizeRequiredVolumes(dsw, asw) | ||||
| } | ||||
|  | ||||
| func getResizeRequiredVolumes(dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld) []v1.UniqueVolumeName { | ||||
| 	resizeRequiredVolumes := []v1.UniqueVolumeName{} | ||||
| 	for _, volumeToMount := range dsw.GetVolumesToMount() { | ||||
| 		_, _, err := asw.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName) | ||||
| 		if cache.IsFSResizeRequiredError(err) { | ||||
| 			resizeRequiredVolumes = append(resizeRequiredVolumes, volumeToMount.VolumeName) | ||||
| 		} | ||||
| 	} | ||||
| 	return resizeRequiredVolumes | ||||
| } | ||||
|  | ||||
| func verifyVolumeExistsInVolumesToMount(t *testing.T, expectedVolumeName v1.UniqueVolumeName, expectReportedInUse bool, dsw cache.DesiredStateOfWorld) { | ||||
| 	volumesToMount := dsw.GetVolumesToMount() | ||||
| 	for _, volume := range volumesToMount { | ||||
|   | ||||
| @@ -38,6 +38,7 @@ go_test( | ||||
|     srcs = ["reconciler_test.go"], | ||||
|     embed = [":go_default_library"], | ||||
|     deps = [ | ||||
|         "//pkg/features:go_default_library", | ||||
|         "//pkg/kubelet/volumemanager/cache:go_default_library", | ||||
|         "//pkg/util/mount:go_default_library", | ||||
|         "//pkg/volume:go_default_library", | ||||
|   | ||||
| @@ -254,6 +254,22 @@ func (rc *reconciler) reconcile() { | ||||
| 					glog.V(5).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr)) | ||||
| 				} | ||||
| 			} | ||||
| 		} else if cache.IsFSResizeRequiredError(err) && | ||||
| 			utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumesFSWithoutUnmounting) { | ||||
| 			glog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandVolumeFSWithoutUnmounting", "")) | ||||
| 			err := rc.operationExecutor.ExpandVolumeFSWithoutUnmounting( | ||||
| 				volumeToMount.VolumeToMount, | ||||
| 				rc.actualStateOfWorld) | ||||
| 			if err != nil && | ||||
| 				!nestedpendingoperations.IsAlreadyExists(err) && | ||||
| 				!exponentialbackoff.IsExponentialBackoff(err) { | ||||
| 				// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. | ||||
| 				// Log all other errors. | ||||
| 				glog.Errorf(volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandVolumeFSWithoutUnmounting failed", err).Error()) | ||||
| 			} | ||||
| 			if err == nil { | ||||
| 				glog.V(4).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.ExpandVolumeFSWithoutUnmounting started", "")) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
|   | ||||
| @@ -32,6 +32,7 @@ import ( | ||||
| 	"k8s.io/client-go/kubernetes/fake" | ||||
| 	core "k8s.io/client-go/testing" | ||||
| 	"k8s.io/client-go/tools/record" | ||||
| 	"k8s.io/kubernetes/pkg/features" | ||||
| 	"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache" | ||||
| 	"k8s.io/kubernetes/pkg/util/mount" | ||||
| 	"k8s.io/kubernetes/pkg/volume" | ||||
| @@ -965,6 +966,120 @@ func Test_GenerateUnmapDeviceFunc_Plugin_Not_Found(t *testing.T) { | ||||
| 	utilfeature.DefaultFeatureGate.Set("BlockVolume=false") | ||||
| } | ||||
|  | ||||
| // Populates desiredStateOfWorld cache with one volume/pod. | ||||
| // Enables controllerAttachDetachEnabled. | ||||
| // Calls Run() | ||||
| // Wait for volume mounted. | ||||
| // Mark volume as fsResizeRequired in ASW. | ||||
| // Verifies volume's fsResizeRequired flag is cleared later. | ||||
| func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) { | ||||
| 	utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.ExpandPersistentVolumesFSWithoutUnmounting)) | ||||
| 	pv := &v1.PersistentVolume{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| 			Name: "pv", | ||||
| 			UID:  "pvuid", | ||||
| 		}, | ||||
| 		Spec: v1.PersistentVolumeSpec{ | ||||
| 			ClaimRef: &v1.ObjectReference{Name: "pvc"}, | ||||
| 		}, | ||||
| 	} | ||||
| 	pvc := &v1.PersistentVolumeClaim{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| 			Name: "pvc", | ||||
| 			UID:  "pvcuid", | ||||
| 		}, | ||||
| 		Spec: v1.PersistentVolumeClaimSpec{ | ||||
| 			VolumeName: "pv", | ||||
| 		}, | ||||
| 	} | ||||
| 	pod := &v1.Pod{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| 			Name: "pod1", | ||||
| 			UID:  "pod1uid", | ||||
| 		}, | ||||
| 		Spec: v1.PodSpec{ | ||||
| 			Volumes: []v1.Volume{ | ||||
| 				{ | ||||
| 					Name: "volume-name", | ||||
| 					VolumeSource: v1.VolumeSource{ | ||||
| 						PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ | ||||
| 							ClaimName: pvc.Name, | ||||
| 						}, | ||||
| 					}, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t) | ||||
| 	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) | ||||
| 	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) | ||||
| 	kubeClient := createtestClientWithPVPVC(pv, pvc) | ||||
| 	fakeRecorder := &record.FakeRecorder{} | ||||
| 	fakeHandler := volumetesting.NewBlockVolumePathHandler() | ||||
| 	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator( | ||||
| 		kubeClient, | ||||
| 		volumePluginMgr, | ||||
| 		fakeRecorder, | ||||
| 		false, /* checkNodeCapabilitiesBeforeMount */ | ||||
| 		fakeHandler)) | ||||
|  | ||||
| 	reconciler := NewReconciler( | ||||
| 		kubeClient, | ||||
| 		true, /* controllerAttachDetachEnabled */ | ||||
| 		reconcilerLoopSleepDuration, | ||||
| 		reconcilerSyncStatesSleepPeriod, | ||||
| 		waitForAttachTimeout, | ||||
| 		nodeName, | ||||
| 		dsw, | ||||
| 		asw, | ||||
| 		hasAddedPods, | ||||
| 		oex, | ||||
| 		&mount.FakeMounter{}, | ||||
| 		volumePluginMgr, | ||||
| 		kubeletPodsDir) | ||||
|  | ||||
| 	volumeSpec := &volume.Spec{PersistentVolume: pv} | ||||
| 	podName := util.GetUniquePodName(pod) | ||||
| 	volumeName, err := dsw.AddPodToVolume( | ||||
| 		podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */) | ||||
| 	// Assert | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err) | ||||
| 	} | ||||
| 	dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName}) | ||||
|  | ||||
| 	// Start the reconciler to fill ASW. | ||||
| 	stopChan, stoppedChan := make(chan struct{}), make(chan struct{}) | ||||
| 	go func() { | ||||
| 		reconciler.Run(stopChan) | ||||
| 		close(stoppedChan) | ||||
| 	}() | ||||
| 	waitForMount(t, fakePlugin, volumeName, asw) | ||||
| 	// Stop the reconciler. | ||||
| 	close(stopChan) | ||||
| 	<-stoppedChan | ||||
|  | ||||
| 	// Mark volume as fsResizeRequired. | ||||
| 	asw.MarkFSResizeRequired(volumeName, podName) | ||||
| 	_, _, podExistErr := asw.PodExistsInVolume(podName, volumeName) | ||||
| 	if !cache.IsFSResizeRequiredError(podExistErr) { | ||||
| 		t.Fatalf("Volume should be marked as fsResizeRequired, but receive unexpected error: %v", podExistErr) | ||||
| 	} | ||||
|  | ||||
| 	// Start the reconciler again, we hope reconciler will perform the | ||||
| 	// resize operation and clear the fsResizeRequired flag for volume. | ||||
| 	go reconciler.Run(wait.NeverStop) | ||||
|  | ||||
| 	waitErr := retryWithExponentialBackOff(500*time.Millisecond, func() (done bool, err error) { | ||||
| 		mounted, _, err := asw.PodExistsInVolume(podName, volumeName) | ||||
| 		return mounted && err == nil, nil | ||||
| 	}) | ||||
| 	if waitErr != nil { | ||||
| 		t.Fatal("Volume resize should succeeded") | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func waitForMount( | ||||
| 	t *testing.T, | ||||
| 	fakePlugin *volumetesting.FakeVolumePlugin, | ||||
| @@ -1044,3 +1159,30 @@ func createTestClient() *fake.Clientset { | ||||
| func runReconciler(reconciler Reconciler) { | ||||
| 	go reconciler.Run(wait.NeverStop) | ||||
| } | ||||
|  | ||||
| func createtestClientWithPVPVC(pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) *fake.Clientset { | ||||
| 	fakeClient := &fake.Clientset{} | ||||
| 	fakeClient.AddReactor("get", "nodes", | ||||
| 		func(action core.Action) (bool, runtime.Object, error) { | ||||
| 			return true, &v1.Node{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{Name: string(nodeName)}, | ||||
| 				Status: v1.NodeStatus{ | ||||
| 					VolumesAttached: []v1.AttachedVolume{ | ||||
| 						{ | ||||
| 							Name:       "fake-plugin/pv", | ||||
| 							DevicePath: "fake/path", | ||||
| 						}, | ||||
| 					}}, | ||||
| 			}, nil | ||||
| 		}) | ||||
| 	fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) { | ||||
| 		return true, pvc, nil | ||||
| 	}) | ||||
| 	fakeClient.AddReactor("get", "persistentvolumes", func(action core.Action) (bool, runtime.Object, error) { | ||||
| 		return true, pv, nil | ||||
| 	}) | ||||
| 	fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) { | ||||
| 		return true, nil, fmt.Errorf("no reaction implemented for %s", action) | ||||
| 	}) | ||||
| 	return fakeClient | ||||
| } | ||||
|   | ||||
| @@ -439,6 +439,15 @@ func (plugin *FakeVolumePlugin) GetDeviceMountRefs(deviceMountPath string) ([]st | ||||
| 	return []string{}, nil | ||||
| } | ||||
|  | ||||
| // Expandable volume support | ||||
| func (plugin *FakeVolumePlugin) ExpandVolumeDevice(spec *Spec, newSize resource.Quantity, oldSize resource.Quantity) (resource.Quantity, error) { | ||||
| 	return resource.Quantity{}, nil | ||||
| } | ||||
|  | ||||
| func (plugin *FakeVolumePlugin) RequiresFSResize() bool { | ||||
| 	return true | ||||
| } | ||||
|  | ||||
| type FakeFileVolumePlugin struct { | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -142,6 +142,8 @@ type OperationExecutor interface { | ||||
| 	IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool | ||||
| 	// Expand Volume will grow size available to PVC | ||||
| 	ExpandVolume(*expandcache.PVCWithResizeRequest, expandcache.VolumeResizeMap) error | ||||
| 	// ExpandVolumeFSWithoutUnmounting will resize volume's file system to expected size without unmounting the volume. | ||||
| 	ExpandVolumeFSWithoutUnmounting(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error | ||||
| 	// ReconstructVolumeOperation construct a new volumeSpec and returns it created by plugin | ||||
| 	ReconstructVolumeOperation(volumeMode v1.PersistentVolumeMode, plugin volume.VolumePlugin, mapperPlugin volume.BlockVolumePlugin, uid types.UID, podName volumetypes.UniquePodName, volumeSpecName string, mountPath string, pluginName string) (*volume.Spec, error) | ||||
| 	// CheckVolumeExistenceOperation checks volume existence | ||||
| @@ -173,6 +175,9 @@ type ActualStateOfWorldMounterUpdater interface { | ||||
|  | ||||
| 	// Marks the specified volume as having its global mount unmounted. | ||||
| 	MarkDeviceAsUnmounted(volumeName v1.UniqueVolumeName) error | ||||
|  | ||||
| 	// Marks the specified volume's file system resize request is finished. | ||||
| 	MarkVolumeAsResized(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error | ||||
| } | ||||
|  | ||||
| // ActualStateOfWorldAttacherUpdater defines a set of operations updating the | ||||
| @@ -817,6 +822,14 @@ func (oe *operationExecutor) ExpandVolume(pvcWithResizeRequest *expandcache.PVCW | ||||
| 	return oe.pendingOperations.Run(uniqueVolumeKey, "", generatedOperations) | ||||
| } | ||||
|  | ||||
| func (oe *operationExecutor) ExpandVolumeFSWithoutUnmounting(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error { | ||||
| 	generatedOperations, err := oe.operationGenerator.GenerateExpandVolumeFSWithoutUnmountingFunc(volumeToMount, actualStateOfWorld) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	return oe.pendingOperations.Run(volumeToMount.VolumeName, "", generatedOperations) | ||||
| } | ||||
|  | ||||
| func (oe *operationExecutor) VerifyControllerAttachedVolume( | ||||
| 	volumeToMount VolumeToMount, | ||||
| 	nodeName types.NodeName, | ||||
|   | ||||
| @@ -438,6 +438,16 @@ func (fopg *fakeOperationGenerator) GenerateExpandVolumeFunc(pvcWithResizeReques | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func (fopg *fakeOperationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { | ||||
| 	opFunc := func() (error, error) { | ||||
| 		startOperationAndBlock(fopg.ch, fopg.quit) | ||||
| 		return nil, nil | ||||
| 	} | ||||
| 	return volumetypes.GeneratedOperations{ | ||||
| 		OperationFunc: opFunc, | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func (fopg *fakeOperationGenerator) GenerateBulkVolumeVerifyFunc( | ||||
| 	pluginNodeVolumes map[types.NodeName][]*volume.Spec, | ||||
| 	pluginNane string, | ||||
|   | ||||
| @@ -121,6 +121,9 @@ type OperationGenerator interface { | ||||
| 		map[*volume.Spec]v1.UniqueVolumeName, ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) | ||||
|  | ||||
| 	GenerateExpandVolumeFunc(*expandcache.PVCWithResizeRequest, expandcache.VolumeResizeMap) (volumetypes.GeneratedOperations, error) | ||||
|  | ||||
| 	// Generates the volume file system resize function, which can resize volume's file system to expected size without unmounting the volume. | ||||
| 	GenerateExpandVolumeFSWithoutUnmountingFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) | ||||
| } | ||||
|  | ||||
| func (og *operationGenerator) GenerateVolumesAreAttachedFunc( | ||||
| @@ -1306,6 +1309,62 @@ func (og *operationGenerator) GenerateExpandVolumeFunc( | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func (og *operationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc( | ||||
| 	volumeToMount VolumeToMount, | ||||
| 	actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { | ||||
| 	volumePlugin, err := | ||||
| 		og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) | ||||
| 	if err != nil || volumePlugin == nil { | ||||
| 		return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VolumeFSResize.FindPluginBySpec failed", err) | ||||
| 	} | ||||
|  | ||||
| 	attachableVolumePlugin, err := | ||||
| 		og.volumePluginMgr.FindAttachablePluginBySpec(volumeToMount.VolumeSpec) | ||||
| 	if err != nil || attachableVolumePlugin == nil { | ||||
| 		if attachableVolumePlugin == nil { | ||||
| 			err = fmt.Errorf("AttachableVolumePlugin is nil") | ||||
| 		} | ||||
| 		return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VolumeFSResize.FindAttachablePluginBySpec failed", err) | ||||
| 	} | ||||
|  | ||||
| 	volumeAttacher, err := attachableVolumePlugin.NewAttacher() | ||||
| 	if err != nil || volumeAttacher == nil { | ||||
| 		if volumeAttacher == nil { | ||||
| 			err = fmt.Errorf("VolumeAttacher is nil") | ||||
| 		} | ||||
| 		return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VolumeFSResize.NewAttacher failed", err) | ||||
| 	} | ||||
|  | ||||
| 	deviceMountPath, err := volumeAttacher.GetDeviceMountPath(volumeToMount.VolumeSpec) | ||||
| 	if err != nil { | ||||
| 		return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VolumeFSResize.GetDeviceMountPath failed", err) | ||||
| 	} | ||||
|  | ||||
| 	fsResizeFunc := func() (error, error) { | ||||
| 		resizeSimpleError, resizeDetailedError := og.resizeFileSystem(volumeToMount, volumeToMount.DevicePath, deviceMountPath, volumePlugin.GetPluginName()) | ||||
| 		if resizeSimpleError != nil || resizeDetailedError != nil { | ||||
| 			return resizeSimpleError, resizeDetailedError | ||||
| 		} | ||||
| 		markFSResizedErr := actualStateOfWorld.MarkVolumeAsResized(volumeToMount.PodName, volumeToMount.VolumeName) | ||||
| 		if markFSResizedErr != nil { | ||||
| 			// On failure, return error. Caller will log and retry. | ||||
| 			return volumeToMount.GenerateError("VolumeFSResize.MarkVolumeAsResized failed", markFSResizedErr) | ||||
| 		} | ||||
| 		return nil, nil | ||||
| 	} | ||||
| 	eventRecorderFunc := func(err *error) { | ||||
| 		if *err != nil { | ||||
| 			og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.VolumeResizeFailed, (*err).Error()) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return volumetypes.GeneratedOperations{ | ||||
| 		OperationFunc:     fsResizeFunc, | ||||
| 		EventRecorderFunc: eventRecorderFunc, | ||||
| 		CompleteFunc:      util.OperationCompleteHook(volumePlugin.GetPluginName(), "volume_fs_resize"), | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func checkMountOptionSupport(og *operationGenerator, volumeToMount VolumeToMount, plugin volume.VolumePlugin) error { | ||||
| 	mountOptions := util.MountOptionFromSpec(volumeToMount.VolumeSpec) | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 mlmhl
					mlmhl