Modify code to use new interface functions
This commit is contained in:
parent
2e54686f1b
commit
e4f62d6c41
@ -23,6 +23,7 @@ package cache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -581,6 +582,10 @@ func (asw *actualStateOfWorld) GetAttachState(
|
||||
return AttachStateDetached
|
||||
}
|
||||
|
||||
func (asw *actualStateOfWorld) SetVolumeClaimSize(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) {
|
||||
klog.V(5).Infof("doing nothing")
|
||||
}
|
||||
|
||||
func (asw *actualStateOfWorld) GetAttachedVolumes() []AttachedVolume {
|
||||
asw.RLock()
|
||||
defer asw.RUnlock()
|
||||
|
@ -107,7 +107,7 @@ type ActualStateOfWorld interface {
|
||||
// volumes, depend on this to update the contents of the volume.
|
||||
// All volume mounting calls should be idempotent so a second mount call for
|
||||
// volumes that do not need to update contents should not fail.
|
||||
PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) (bool, string, error)
|
||||
PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName, desiredVolumeSize *resource.Quantity) (bool, string, error)
|
||||
|
||||
// PodRemovedFromVolume returns true if the given pod does not exist in the list of
|
||||
// mountedPods for the given volume in the cache, indicating that the pod has
|
||||
@ -287,7 +287,7 @@ type attachedVolume struct {
|
||||
volumeInUseErrorForExpansion bool
|
||||
|
||||
// persistentVolumeSize records size of the volume when pod was started.
|
||||
persistentVolumeSize resource.Quantity
|
||||
persistentVolumeSize *resource.Quantity
|
||||
}
|
||||
|
||||
// The mountedPod object represents a pod for which the kubelet volume manager
|
||||
@ -655,14 +655,14 @@ func (asw *actualStateOfWorld) SetDeviceMountState(
|
||||
return nil
|
||||
}
|
||||
|
||||
func (asw *actualStateOfWorld) SetVolumeClaimSize(volumeName v1.UniqueVolumeName, claimSize resource.Quantity) {
|
||||
func (asw *actualStateOfWorld) SetVolumeClaimSize(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) {
|
||||
asw.Lock()
|
||||
defer asw.Unlock()
|
||||
|
||||
volumeObj, ok := asw.attachedVolumes[volumeName]
|
||||
// only set volume claim size if claimStatusSize is zero
|
||||
// this can happen when volume was rebuilt after kubelet startup
|
||||
if ok && volumeObj.persistentVolumeSize.IsZero() {
|
||||
if ok && volumeObj.persistentVolumeSize == nil {
|
||||
volumeObj.persistentVolumeSize = claimSize
|
||||
asw.attachedVolumes[volumeName] = volumeObj
|
||||
}
|
||||
@ -710,7 +710,8 @@ func (asw *actualStateOfWorld) DeleteVolume(volumeName v1.UniqueVolumeName) erro
|
||||
|
||||
func (asw *actualStateOfWorld) PodExistsInVolume(
|
||||
podName volumetypes.UniquePodName,
|
||||
volumeName v1.UniqueVolumeName) (bool, string, error) {
|
||||
volumeName v1.UniqueVolumeName,
|
||||
desiredVolumeSize *resource.Quantity) (bool, string, error) {
|
||||
asw.RLock()
|
||||
defer asw.RUnlock()
|
||||
|
||||
@ -728,8 +729,7 @@ func (asw *actualStateOfWorld) PodExistsInVolume(
|
||||
if podObj.remountRequired {
|
||||
return true, volumeObj.devicePath, newRemountRequiredError(volumeObj.volumeName, podObj.podName)
|
||||
}
|
||||
if podObj.fsResizeRequired &&
|
||||
!volumeObj.volumeInUseErrorForExpansion {
|
||||
if asw.volumeNeedsExpansion(volumeObj, desiredVolumeSize) {
|
||||
return true, volumeObj.devicePath, newFsResizeRequiredError(volumeObj.volumeName, podObj.podName)
|
||||
}
|
||||
}
|
||||
@ -737,6 +737,30 @@ func (asw *actualStateOfWorld) PodExistsInVolume(
|
||||
return podExists, volumeObj.devicePath, nil
|
||||
}
|
||||
|
||||
func (asw *actualStateOfWorld) volumeNeedsExpansion(volumeObj attachedVolume, desiredVolumeSize *resource.Quantity) bool {
|
||||
if volumeObj.volumeInUseErrorForExpansion {
|
||||
return false
|
||||
}
|
||||
if volumeObj.persistentVolumeSize == nil || desiredVolumeSize == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if desiredVolumeSize.Cmp(*volumeObj.persistentVolumeSize) > 0 {
|
||||
volumePlugin, err := asw.volumePluginMgr.FindNodeExpandablePluginBySpec(volumeObj.spec)
|
||||
if err != nil || volumePlugin == nil {
|
||||
// Log and continue processing
|
||||
klog.Errorf(
|
||||
"PodExistsInVolume failed to find expandable plugin volume: %q (volSpecName: %q)",
|
||||
volumeObj.volumeName, volumeObj.spec.Name())
|
||||
return false
|
||||
}
|
||||
if volumePlugin.RequiresFSResize() {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (asw *actualStateOfWorld) PodRemovedFromVolume(
|
||||
podName volumetypes.UniquePodName,
|
||||
volumeName v1.UniqueVolumeName) bool {
|
||||
|
@ -676,7 +676,7 @@ func TestUncertainVolumeMounts(t *testing.T) {
|
||||
t.Fatalf("expected volume %s to be found in aws.GetPossiblyMountedVolumesForPod", volumeSpec1.Name())
|
||||
}
|
||||
|
||||
volExists, _, _ := asw.PodExistsInVolume(podName1, generatedVolumeName1)
|
||||
volExists, _, _ := asw.PodExistsInVolume(podName1, generatedVolumeName1, nil)
|
||||
if volExists {
|
||||
t.Fatalf("expected volume %s to not exist in asw", generatedVolumeName1)
|
||||
}
|
||||
@ -762,7 +762,7 @@ func verifyPodExistsInVolumeAsw(
|
||||
expectedDevicePath string,
|
||||
asw ActualStateOfWorld) {
|
||||
podExistsInVolume, devicePath, err :=
|
||||
asw.PodExistsInVolume(expectedPodName, expectedVolumeName)
|
||||
asw.PodExistsInVolume(expectedPodName, expectedVolumeName, nil)
|
||||
if err != nil {
|
||||
t.Fatalf(
|
||||
"ASW PodExistsInVolume failed. Expected: <no error> Actual: <%v>", err)
|
||||
@ -804,7 +804,7 @@ func verifyPodDoesntExistInVolumeAsw(
|
||||
expectVolumeToExist bool,
|
||||
asw ActualStateOfWorld) {
|
||||
podExistsInVolume, devicePath, err :=
|
||||
asw.PodExistsInVolume(podToCheck, volumeToCheck)
|
||||
asw.PodExistsInVolume(podToCheck, volumeToCheck, nil)
|
||||
if !expectVolumeToExist && err == nil {
|
||||
t.Fatalf(
|
||||
"ASW PodExistsInVolume did not return error. Expected: <error indicating volume does not exist> Actual: <%v>", err)
|
||||
|
@ -128,13 +128,14 @@ type DesiredStateOfWorld interface {
|
||||
// UpdatePersistentVolumeSize updates persistentVolumeSize in desired state of the world
|
||||
// so as it can be compared against actual size and volume expansion performed
|
||||
// if necessary
|
||||
UpdatePersistentVolumeSize(volumeName v1.UniqueVolumeName, size resource.Quantity)
|
||||
UpdatePersistentVolumeSize(volumeName v1.UniqueVolumeName, size *resource.Quantity)
|
||||
}
|
||||
|
||||
// VolumeToMount represents a volume that is attached to this node and needs to
|
||||
// be mounted to PodName.
|
||||
type VolumeToMount struct {
|
||||
operationexecutor.VolumeToMount
|
||||
PersistentVolumeSize *resource.Quantity
|
||||
}
|
||||
|
||||
// NewDesiredStateOfWorld returns a new instance of DesiredStateOfWorld.
|
||||
@ -193,7 +194,7 @@ type volumeToMount struct {
|
||||
desiredSizeLimit *resource.Quantity
|
||||
|
||||
// persistentVolumeSize records desired size of a persistent volume.
|
||||
persistentVolumeSize resource.Quantity
|
||||
persistentVolumeSize *resource.Quantity
|
||||
}
|
||||
|
||||
// The pod object represents a pod that references the underlying volume and
|
||||
@ -295,7 +296,7 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
|
||||
if volumeSpec.PersistentVolume != nil {
|
||||
pvCap := volumeSpec.PersistentVolume.Spec.Capacity.Storage()
|
||||
if pvCap != nil {
|
||||
vmt.persistentVolumeSize = *pvCap
|
||||
vmt.persistentVolumeSize = pvCap
|
||||
}
|
||||
}
|
||||
|
||||
@ -366,7 +367,7 @@ func (dsw *desiredStateOfWorld) DeletePodFromVolume(
|
||||
|
||||
// UpdatePersistentVolumeSize updates last known PV size. This is used for volume expansion and
|
||||
// should be only used for persistent volumes.
|
||||
func (dsw *desiredStateOfWorld) UpdatePersistentVolumeSize(volumeName v1.UniqueVolumeName, size resource.Quantity) {
|
||||
func (dsw *desiredStateOfWorld) UpdatePersistentVolumeSize(volumeName v1.UniqueVolumeName, size *resource.Quantity) {
|
||||
dsw.Lock()
|
||||
defer dsw.Unlock()
|
||||
|
||||
@ -447,7 +448,9 @@ func (dsw *desiredStateOfWorld) GetVolumesToMount() []VolumeToMount {
|
||||
VolumeGidValue: volumeObj.volumeGidValue,
|
||||
ReportedInUse: volumeObj.reportedInUse,
|
||||
MountRequestTime: podObj.mountRequestTime,
|
||||
DesiredSizeLimit: volumeObj.desiredSizeLimit}})
|
||||
DesiredSizeLimit: volumeObj.desiredSizeLimit},
|
||||
PersistentVolumeSize: volumeObj.persistentVolumeSize,
|
||||
})
|
||||
}
|
||||
}
|
||||
return volumesToMount
|
||||
|
@ -1108,7 +1108,7 @@ func reprocess(dswp *desiredStateOfWorldPopulator, uniquePodName types.UniquePod
|
||||
func getResizeRequiredVolumes(dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld) []v1.UniqueVolumeName {
|
||||
resizeRequiredVolumes := []v1.UniqueVolumeName{}
|
||||
for _, volumeToMount := range dsw.GetVolumesToMount() {
|
||||
_, _, err := asw.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
|
||||
_, _, err := asw.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName, nil)
|
||||
if cache.IsFSResizeRequiredError(err) {
|
||||
resizeRequiredVolumes = append(resizeRequiredVolumes, volumeToMount.VolumeName)
|
||||
}
|
||||
|
@ -196,7 +196,7 @@ func (rc *reconciler) unmountVolumes() {
|
||||
func (rc *reconciler) mountOrAttachVolumes() {
|
||||
// Ensure volumes that should be attached/mounted are attached/mounted.
|
||||
for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
|
||||
volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
|
||||
volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName, volumeToMount.PersistentVolumeSize)
|
||||
volumeToMount.DevicePath = devicePath
|
||||
if cache.IsVolumeNotAttachedError(err) {
|
||||
rc.waitForVolumeAttach(volumeToMount)
|
||||
|
@ -1287,7 +1287,7 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
|
||||
// mark volume as resize required
|
||||
asw.MarkFSResizeRequired(volumeName, podName)
|
||||
|
||||
_, _, podExistErr := asw.PodExistsInVolume(podName, volumeName)
|
||||
_, _, podExistErr := asw.PodExistsInVolume(podName, volumeName, nil)
|
||||
if tc.expansionFailed {
|
||||
if cache.IsFSResizeRequiredError(podExistErr) {
|
||||
t.Fatalf("volume %s should not throw fsResizeRequired error: %v", volumeName, podExistErr)
|
||||
@ -1299,7 +1299,7 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
|
||||
go reconciler.Run(wait.NeverStop)
|
||||
|
||||
waitErr := retryWithExponentialBackOff(testOperationBackOffDuration, func() (done bool, err error) {
|
||||
mounted, _, err := asw.PodExistsInVolume(podName, volumeName)
|
||||
mounted, _, err := asw.PodExistsInVolume(podName, volumeName, nil)
|
||||
return mounted && err == nil, nil
|
||||
})
|
||||
if waitErr != nil {
|
||||
@ -1791,7 +1791,7 @@ func waitForUncertainPodMount(t *testing.T, volumeName v1.UniqueVolumeName, podN
|
||||
err := retryWithExponentialBackOff(
|
||||
testOperationBackOffDuration,
|
||||
func() (bool, error) {
|
||||
mounted, _, err := asw.PodExistsInVolume(podName, volumeName)
|
||||
mounted, _, err := asw.PodExistsInVolume(podName, volumeName, nil)
|
||||
if mounted || err != nil {
|
||||
return false, nil
|
||||
}
|
||||
|
@ -247,7 +247,7 @@ type ActualStateOfWorldAttacherUpdater interface {
|
||||
AddVolumeToReportAsAttached(volumeName v1.UniqueVolumeName, nodeName types.NodeName)
|
||||
|
||||
// SetVolumeClaimSize sets pvc claim size by reading pvc.Status.Capacity
|
||||
SetVolumeClaimSize(volumeName v1.UniqueVolumeName, claimSize resource.Quantity)
|
||||
SetVolumeClaimSize(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity)
|
||||
}
|
||||
|
||||
// VolumeLogger defines a set of operations for generating volume-related logging and error msgs
|
||||
|
@ -1491,7 +1491,7 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(
|
||||
|
||||
verifyControllerAttachedVolumeFunc := func() volumetypes.OperationContext {
|
||||
migrated := getMigratedStatusBySpec(volumeToMount.VolumeSpec)
|
||||
var claimSize resource.Quantity
|
||||
var claimSize *resource.Quantity
|
||||
|
||||
if volumeToMount.VolumeSpec.PersistentVolume != nil {
|
||||
pv := volumeToMount.VolumeSpec.PersistentVolume
|
||||
@ -1502,7 +1502,7 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(
|
||||
}
|
||||
pvcStatusSize := pvc.Status.Capacity.Storage()
|
||||
if pvcStatusSize != nil {
|
||||
claimSize = *pvcStatusSize
|
||||
claimSize = pvcStatusSize
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user