Merge pull request #19365 from jsafrane/devel/retry-delete
Auto commit by PR queue bot
This commit is contained in:
@@ -306,7 +306,9 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
|
|||||||
|
|
||||||
pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(
|
pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(
|
||||||
clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "persistent-volume-recycler")),
|
clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "persistent-volume-recycler")),
|
||||||
s.PersistentVolumeControllerOptions.PVClaimBinderSyncPeriod, ProbeRecyclableVolumePlugins(s.PersistentVolumeControllerOptions.VolumeConfigFlags),
|
s.PersistentVolumeControllerOptions.PVClaimBinderSyncPeriod,
|
||||||
|
s.PersistentVolumeControllerOptions.VolumeConfigFlags.PersistentVolumeRecyclerMaximumRetry,
|
||||||
|
ProbeRecyclableVolumePlugins(s.PersistentVolumeControllerOptions.VolumeConfigFlags),
|
||||||
cloud,
|
cloud,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@@ -256,6 +256,7 @@ func (s *CMServer) Run(_ []string) error {
|
|||||||
pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(
|
pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(
|
||||||
clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "persistent-volume-recycler")),
|
clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "persistent-volume-recycler")),
|
||||||
s.PersistentVolumeControllerOptions.PVClaimBinderSyncPeriod,
|
s.PersistentVolumeControllerOptions.PVClaimBinderSyncPeriod,
|
||||||
|
s.PersistentVolumeControllerOptions.VolumeConfigFlags.PersistentVolumeRecyclerMaximumRetry,
|
||||||
kubecontrollermanager.ProbeRecyclableVolumePlugins(s.PersistentVolumeControllerOptions.VolumeConfigFlags), cloud)
|
kubecontrollermanager.ProbeRecyclableVolumePlugins(s.PersistentVolumeControllerOptions.VolumeConfigFlags), cloud)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Fatalf("Failed to start persistent volume recycler: %+v", err)
|
glog.Fatalf("Failed to start persistent volume recycler: %+v", err)
|
||||||
|
@@ -91,6 +91,7 @@ kube-controller-manager
|
|||||||
--port=10252: The port that the controller-manager's http service runs on
|
--port=10252: The port that the controller-manager's http service runs on
|
||||||
--profiling[=true]: Enable profiling via web interface host:port/debug/pprof/
|
--profiling[=true]: Enable profiling via web interface host:port/debug/pprof/
|
||||||
--pv-recycler-increment-timeout-nfs=30: the increment of time added per Gi to ActiveDeadlineSeconds for an NFS scrubber pod
|
--pv-recycler-increment-timeout-nfs=30: the increment of time added per Gi to ActiveDeadlineSeconds for an NFS scrubber pod
|
||||||
|
--pv-recycler-maximum-retry=3: Maximum number of attempts to recycle or delete a persistent volume
|
||||||
--pv-recycler-minimum-timeout-hostpath=60: The minimum ActiveDeadlineSeconds to use for a HostPath Recycler pod. This is for development and testing only and will not work in a multi-node cluster.
|
--pv-recycler-minimum-timeout-hostpath=60: The minimum ActiveDeadlineSeconds to use for a HostPath Recycler pod. This is for development and testing only and will not work in a multi-node cluster.
|
||||||
--pv-recycler-minimum-timeout-nfs=300: The minimum ActiveDeadlineSeconds to use for an NFS Recycler pod
|
--pv-recycler-minimum-timeout-nfs=300: The minimum ActiveDeadlineSeconds to use for an NFS Recycler pod
|
||||||
--pv-recycler-pod-template-filepath-hostpath="": The file path to a pod definition used as a template for HostPath persistent volume recycling. This is for development and testing only and will not work in a multi-node cluster.
|
--pv-recycler-pod-template-filepath-hostpath="": The file path to a pod definition used as a template for HostPath persistent volume recycling. This is for development and testing only and will not work in a multi-node cluster.
|
||||||
|
@@ -279,6 +279,7 @@ pv-recycler-minimum-timeout-hostpath
|
|||||||
pv-recycler-minimum-timeout-nfs
|
pv-recycler-minimum-timeout-nfs
|
||||||
pv-recycler-pod-template-filepath-hostpath
|
pv-recycler-pod-template-filepath-hostpath
|
||||||
pv-recycler-pod-template-filepath-nfs
|
pv-recycler-pod-template-filepath-nfs
|
||||||
|
pv-recycler-maximum-retry
|
||||||
pv-recycler-timeout-increment-hostpath
|
pv-recycler-timeout-increment-hostpath
|
||||||
pvclaimbinder-sync-period
|
pvclaimbinder-sync-period
|
||||||
read-only-port
|
read-only-port
|
||||||
|
@@ -27,6 +27,7 @@ import (
|
|||||||
// of volume.VolumeConfig which are then passed to the appropriate plugin. The ControllerManager binary is the only
|
// of volume.VolumeConfig which are then passed to the appropriate plugin. The ControllerManager binary is the only
|
||||||
// part of the code which knows what plugins are supported and which CLI flags correspond to each plugin.
|
// part of the code which knows what plugins are supported and which CLI flags correspond to each plugin.
|
||||||
type VolumeConfigFlags struct {
|
type VolumeConfigFlags struct {
|
||||||
|
PersistentVolumeRecyclerMaximumRetry int
|
||||||
PersistentVolumeRecyclerMinimumTimeoutNFS int
|
PersistentVolumeRecyclerMinimumTimeoutNFS int
|
||||||
PersistentVolumeRecyclerPodTemplateFilePathNFS string
|
PersistentVolumeRecyclerPodTemplateFilePathNFS string
|
||||||
PersistentVolumeRecyclerIncrementTimeoutNFS int
|
PersistentVolumeRecyclerIncrementTimeoutNFS int
|
||||||
@@ -46,6 +47,7 @@ func NewPersistentVolumeControllerOptions() PersistentVolumeControllerOptions {
|
|||||||
PVClaimBinderSyncPeriod: 10 * time.Minute,
|
PVClaimBinderSyncPeriod: 10 * time.Minute,
|
||||||
VolumeConfigFlags: VolumeConfigFlags{
|
VolumeConfigFlags: VolumeConfigFlags{
|
||||||
// default values here
|
// default values here
|
||||||
|
PersistentVolumeRecyclerMaximumRetry: 3,
|
||||||
PersistentVolumeRecyclerMinimumTimeoutNFS: 300,
|
PersistentVolumeRecyclerMinimumTimeoutNFS: 300,
|
||||||
PersistentVolumeRecyclerIncrementTimeoutNFS: 30,
|
PersistentVolumeRecyclerIncrementTimeoutNFS: 30,
|
||||||
PersistentVolumeRecyclerMinimumTimeoutHostPath: 60,
|
PersistentVolumeRecyclerMinimumTimeoutHostPath: 60,
|
||||||
@@ -76,6 +78,9 @@ func (o *PersistentVolumeControllerOptions) AddFlags(fs *pflag.FlagSet) {
|
|||||||
o.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutHostPath,
|
o.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutHostPath,
|
||||||
"the increment of time added per Gi to ActiveDeadlineSeconds for a HostPath scrubber pod. "+
|
"the increment of time added per Gi to ActiveDeadlineSeconds for a HostPath scrubber pod. "+
|
||||||
"This is for development and testing only and will not work in a multi-node cluster.")
|
"This is for development and testing only and will not work in a multi-node cluster.")
|
||||||
|
fs.IntVar(&o.VolumeConfigFlags.PersistentVolumeRecyclerMaximumRetry, "pv-recycler-maximum-retry",
|
||||||
|
o.VolumeConfigFlags.PersistentVolumeRecyclerMaximumRetry,
|
||||||
|
"Maximum number of attempts to recycle or delete a persistent volume")
|
||||||
fs.BoolVar(&o.VolumeConfigFlags.EnableHostPathProvisioning, "enable-hostpath-provisioner", o.VolumeConfigFlags.EnableHostPathProvisioning,
|
fs.BoolVar(&o.VolumeConfigFlags.EnableHostPathProvisioning, "enable-hostpath-provisioner", o.VolumeConfigFlags.EnableHostPathProvisioning,
|
||||||
"Enable HostPath PV provisioning when running without a cloud provider. This allows testing and development of provisioning features. "+
|
"Enable HostPath PV provisioning when running without a cloud provider. This allows testing and development of provisioning features. "+
|
||||||
"HostPath provisioning is not supported in any way, won't work in a multi-node cluster, and should not be used for anything other than testing or development.")
|
"HostPath provisioning is not supported in any way, won't work in a multi-node cluster, and should not be used for anything other than testing or development.")
|
||||||
|
@@ -46,15 +46,33 @@ type PersistentVolumeRecycler struct {
|
|||||||
kubeClient clientset.Interface
|
kubeClient clientset.Interface
|
||||||
pluginMgr volume.VolumePluginMgr
|
pluginMgr volume.VolumePluginMgr
|
||||||
cloud cloudprovider.Interface
|
cloud cloudprovider.Interface
|
||||||
|
maximumRetry int
|
||||||
|
syncPeriod time.Duration
|
||||||
|
// Local cache of failed recycle / delete operations. Map volume.Name -> status of the volume.
|
||||||
|
// Only PVs in Released state have an entry here.
|
||||||
|
releasedVolumes map[string]releasedVolumeStatus
|
||||||
}
|
}
|
||||||
|
|
||||||
// PersistentVolumeRecycler creates a new PersistentVolumeRecycler
|
// releasedVolumeStatus holds state of failed delete/recycle operation on a
|
||||||
func NewPersistentVolumeRecycler(kubeClient clientset.Interface, syncPeriod time.Duration, plugins []volume.VolumePlugin, cloud cloudprovider.Interface) (*PersistentVolumeRecycler, error) {
|
// volume. The controller re-tries the operation several times and it stores
|
||||||
|
// retry count + timestamp of the last attempt here.
|
||||||
|
type releasedVolumeStatus struct {
|
||||||
|
// How many recycle/delete operations failed.
|
||||||
|
retryCount int
|
||||||
|
// Timestamp of the last attempt.
|
||||||
|
lastAttempt time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPersistentVolumeRecycler creates a new PersistentVolumeRecycler
|
||||||
|
func NewPersistentVolumeRecycler(kubeClient clientset.Interface, syncPeriod time.Duration, maximumRetry int, plugins []volume.VolumePlugin, cloud cloudprovider.Interface) (*PersistentVolumeRecycler, error) {
|
||||||
recyclerClient := NewRecyclerClient(kubeClient)
|
recyclerClient := NewRecyclerClient(kubeClient)
|
||||||
recycler := &PersistentVolumeRecycler{
|
recycler := &PersistentVolumeRecycler{
|
||||||
client: recyclerClient,
|
client: recyclerClient,
|
||||||
kubeClient: kubeClient,
|
kubeClient: kubeClient,
|
||||||
cloud: cloud,
|
cloud: cloud,
|
||||||
|
maximumRetry: maximumRetry,
|
||||||
|
syncPeriod: syncPeriod,
|
||||||
|
releasedVolumes: make(map[string]releasedVolumeStatus),
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := recycler.pluginMgr.InitPlugins(plugins, recycler); err != nil {
|
if err := recycler.pluginMgr.InitPlugins(plugins, recycler); err != nil {
|
||||||
@@ -89,6 +107,14 @@ func NewPersistentVolumeRecycler(kubeClient clientset.Interface, syncPeriod time
|
|||||||
}
|
}
|
||||||
recycler.reclaimVolume(pv)
|
recycler.reclaimVolume(pv)
|
||||||
},
|
},
|
||||||
|
DeleteFunc: func(obj interface{}) {
|
||||||
|
pv, ok := obj.(*api.PersistentVolume)
|
||||||
|
if !ok {
|
||||||
|
glog.Errorf("Error casting object to PersistentVolume: %v", obj)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
recycler.removeReleasedVolume(pv)
|
||||||
|
},
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -96,17 +122,50 @@ func NewPersistentVolumeRecycler(kubeClient clientset.Interface, syncPeriod time
|
|||||||
return recycler, nil
|
return recycler, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (recycler *PersistentVolumeRecycler) reclaimVolume(pv *api.PersistentVolume) error {
|
// shouldRecycle checks a volume and returns nil, if the volume should be
|
||||||
if pv.Status.Phase == api.VolumeReleased && pv.Spec.ClaimRef != nil {
|
// recycled right now. Otherwise it returns an error with reason why it should
|
||||||
glog.V(5).Infof("Reclaiming PersistentVolume[%s]\n", pv.Name)
|
// not be recycled.
|
||||||
|
func (recycler *PersistentVolumeRecycler) shouldRecycle(pv *api.PersistentVolume) error {
|
||||||
|
if pv.Spec.ClaimRef == nil {
|
||||||
|
return fmt.Errorf("Volume does not have a reference to claim")
|
||||||
|
}
|
||||||
|
if pv.Status.Phase != api.VolumeReleased {
|
||||||
|
return fmt.Errorf("The volume is not in 'Released' phase")
|
||||||
|
}
|
||||||
|
|
||||||
latest, err := recycler.client.GetPersistentVolume(pv.Name)
|
// The volume is Released, should we retry recycling?
|
||||||
if err != nil {
|
status, found := recycler.releasedVolumes[pv.Name]
|
||||||
return fmt.Errorf("Could not find PersistentVolume %s", pv.Name)
|
if !found {
|
||||||
}
|
// We don't know anything about this volume. The controller has been
|
||||||
if latest.Status.Phase != api.VolumeReleased {
|
// restarted or the volume has been marked as Released by another
|
||||||
return fmt.Errorf("PersistentVolume[%s] phase is %s, expected %s. Skipping.", pv.Name, latest.Status.Phase, api.VolumeReleased)
|
// controller. Recycle/delete this volume as if it was just Released.
|
||||||
}
|
glog.V(5).Infof("PersistentVolume[%s] not found in local cache, recycling", pv.Name)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check the timestamp
|
||||||
|
expectedRetry := status.lastAttempt.Add(recycler.syncPeriod)
|
||||||
|
if time.Now().After(expectedRetry) {
|
||||||
|
glog.V(5).Infof("PersistentVolume[%s] retrying recycle after timeout", pv.Name)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// It's too early
|
||||||
|
glog.V(5).Infof("PersistentVolume[%s] skipping recycle, it's too early: now: %v, next retry: %v", pv.Name, time.Now(), expectedRetry)
|
||||||
|
return fmt.Errorf("Too early after previous failure")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (recycler *PersistentVolumeRecycler) reclaimVolume(pv *api.PersistentVolume) error {
|
||||||
|
glog.V(5).Infof("Recycler: checking PersistentVolume[%s]\n", pv.Name)
|
||||||
|
// Always load the latest version of the volume
|
||||||
|
newPV, err := recycler.client.GetPersistentVolume(pv.Name)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Could not find PersistentVolume %s", pv.Name)
|
||||||
|
}
|
||||||
|
pv = newPV
|
||||||
|
|
||||||
|
err = recycler.shouldRecycle(pv)
|
||||||
|
if err == nil {
|
||||||
|
glog.V(5).Infof("Reclaiming PersistentVolume[%s]\n", pv.Name)
|
||||||
|
|
||||||
// both handleRecycle and handleDelete block until completion
|
// both handleRecycle and handleDelete block until completion
|
||||||
// TODO: allow parallel recycling operations to increase throughput
|
// TODO: allow parallel recycling operations to increase throughput
|
||||||
@@ -125,10 +184,41 @@ func (recycler *PersistentVolumeRecycler) reclaimVolume(pv *api.PersistentVolume
|
|||||||
glog.Errorf(errMsg)
|
glog.Errorf(errMsg)
|
||||||
return fmt.Errorf(errMsg)
|
return fmt.Errorf(errMsg)
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
glog.V(3).Infof("PersistentVolume[%s] phase %s - skipping: %v", pv.Name, pv.Status.Phase, err)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// handleReleaseFailure evaluates a failed Recycle/Delete operation, updates
|
||||||
|
// internal controller state with new nr. of attempts and timestamp of the last
|
||||||
|
// attempt. Based on the number of failures it returns the next state of the
|
||||||
|
// volume (Released / Failed).
|
||||||
|
func (recycler *PersistentVolumeRecycler) handleReleaseFailure(pv *api.PersistentVolume) api.PersistentVolumePhase {
|
||||||
|
status, found := recycler.releasedVolumes[pv.Name]
|
||||||
|
if !found {
|
||||||
|
// First failure, set retryCount to 0 (will be inceremented few lines below)
|
||||||
|
status = releasedVolumeStatus{}
|
||||||
|
}
|
||||||
|
status.retryCount += 1
|
||||||
|
|
||||||
|
if status.retryCount > recycler.maximumRetry {
|
||||||
|
// This was the last attempt. Remove any internal state and mark the
|
||||||
|
// volume as Failed.
|
||||||
|
glog.V(3).Infof("PersistentVolume[%s] failed %d times - marking Failed", pv.Name, status.retryCount)
|
||||||
|
recycler.removeReleasedVolume(pv)
|
||||||
|
return api.VolumeFailed
|
||||||
|
}
|
||||||
|
|
||||||
|
status.lastAttempt = time.Now()
|
||||||
|
recycler.releasedVolumes[pv.Name] = status
|
||||||
|
return api.VolumeReleased
|
||||||
|
}
|
||||||
|
|
||||||
|
func (recycler *PersistentVolumeRecycler) removeReleasedVolume(pv *api.PersistentVolume) {
|
||||||
|
delete(recycler.releasedVolumes, pv.Name)
|
||||||
|
}
|
||||||
|
|
||||||
func (recycler *PersistentVolumeRecycler) handleRecycle(pv *api.PersistentVolume) error {
|
func (recycler *PersistentVolumeRecycler) handleRecycle(pv *api.PersistentVolume) error {
|
||||||
glog.V(5).Infof("Recycling PersistentVolume[%s]\n", pv.Name)
|
glog.V(5).Infof("Recycling PersistentVolume[%s]\n", pv.Name)
|
||||||
|
|
||||||
@@ -154,9 +244,12 @@ func (recycler *PersistentVolumeRecycler) handleRecycle(pv *api.PersistentVolume
|
|||||||
if err := volRecycler.Recycle(); err != nil {
|
if err := volRecycler.Recycle(); err != nil {
|
||||||
glog.Errorf("PersistentVolume[%s] failed recycling: %+v", pv.Name, err)
|
glog.Errorf("PersistentVolume[%s] failed recycling: %+v", pv.Name, err)
|
||||||
pv.Status.Message = fmt.Sprintf("Recycling error: %s", err)
|
pv.Status.Message = fmt.Sprintf("Recycling error: %s", err)
|
||||||
nextPhase = api.VolumeFailed
|
nextPhase = recycler.handleReleaseFailure(pv)
|
||||||
} else {
|
} else {
|
||||||
glog.V(5).Infof("PersistentVolume[%s] successfully recycled\n", pv.Name)
|
glog.V(5).Infof("PersistentVolume[%s] successfully recycled\n", pv.Name)
|
||||||
|
// The volume has been recycled. Remove any internal state to make
|
||||||
|
// any subsequent bind+recycle cycle working.
|
||||||
|
recycler.removeReleasedVolume(pv)
|
||||||
nextPhase = api.VolumePending
|
nextPhase = api.VolumePending
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -200,9 +293,10 @@ func (recycler *PersistentVolumeRecycler) handleDelete(pv *api.PersistentVolume)
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("PersistentVolume[%s] failed deletion: %+v", pv.Name, err)
|
glog.Errorf("PersistentVolume[%s] failed deletion: %+v", pv.Name, err)
|
||||||
pv.Status.Message = fmt.Sprintf("Deletion error: %s", err)
|
pv.Status.Message = fmt.Sprintf("Deletion error: %s", err)
|
||||||
nextPhase = api.VolumeFailed
|
nextPhase = recycler.handleReleaseFailure(pv)
|
||||||
} else {
|
} else {
|
||||||
glog.V(5).Infof("PersistentVolume[%s] successfully deleted through plugin\n", pv.Name)
|
glog.V(5).Infof("PersistentVolume[%s] successfully deleted through plugin\n", pv.Name)
|
||||||
|
recycler.removeReleasedVolume(pv)
|
||||||
// after successful deletion through the plugin, we can also remove the PV from the cluster
|
// after successful deletion through the plugin, we can also remove the PV from the cluster
|
||||||
if err := recycler.client.DeletePersistentVolume(pv); err != nil {
|
if err := recycler.client.DeletePersistentVolume(pv); err != nil {
|
||||||
return fmt.Errorf("error deleting persistent volume: %+v", err)
|
return fmt.Errorf("error deleting persistent volume: %+v", err)
|
||||||
|
@@ -17,16 +17,149 @@ limitations under the License.
|
|||||||
package persistentvolume
|
package persistentvolume
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/resource"
|
"k8s.io/kubernetes/pkg/api/resource"
|
||||||
"k8s.io/kubernetes/pkg/client/testing/fake"
|
"k8s.io/kubernetes/pkg/client/testing/fake"
|
||||||
"k8s.io/kubernetes/pkg/volume"
|
"k8s.io/kubernetes/pkg/volume"
|
||||||
|
"k8s.io/kubernetes/pkg/volume/host_path"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
mySyncPeriod = 2 * time.Second
|
||||||
|
myMaximumRetry = 3
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestFailedRecycling(t *testing.T) {
|
func TestFailedRecycling(t *testing.T) {
|
||||||
pv := &api.PersistentVolume{
|
pv := preparePV()
|
||||||
|
|
||||||
|
mockClient := &mockBinderClient{
|
||||||
|
volume: pv,
|
||||||
|
}
|
||||||
|
|
||||||
|
// no Init called for pluginMgr and no plugins are available. Volume should fail recycling.
|
||||||
|
plugMgr := volume.VolumePluginMgr{}
|
||||||
|
|
||||||
|
recycler := &PersistentVolumeRecycler{
|
||||||
|
kubeClient: fake.NewSimpleClientset(),
|
||||||
|
client: mockClient,
|
||||||
|
pluginMgr: plugMgr,
|
||||||
|
releasedVolumes: make(map[string]releasedVolumeStatus),
|
||||||
|
}
|
||||||
|
|
||||||
|
err := recycler.reclaimVolume(pv)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected non-nil error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if mockClient.volume.Status.Phase != api.VolumeFailed {
|
||||||
|
t.Errorf("Expected %s but got %s", api.VolumeFailed, mockClient.volume.Status.Phase)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use a new volume for the next test
|
||||||
|
pv = preparePV()
|
||||||
|
mockClient.volume = pv
|
||||||
|
|
||||||
|
pv.Spec.PersistentVolumeReclaimPolicy = api.PersistentVolumeReclaimDelete
|
||||||
|
err = recycler.reclaimVolume(pv)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected non-nil error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if mockClient.volume.Status.Phase != api.VolumeFailed {
|
||||||
|
t.Errorf("Expected %s but got %s", api.VolumeFailed, mockClient.volume.Status.Phase)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRecyclingRetry(t *testing.T) {
|
||||||
|
// Test that recycler controller retries to recycle a volume several times, which succeeds eventually
|
||||||
|
pv := preparePV()
|
||||||
|
|
||||||
|
mockClient := &mockBinderClient{
|
||||||
|
volume: pv,
|
||||||
|
}
|
||||||
|
|
||||||
|
plugMgr := volume.VolumePluginMgr{}
|
||||||
|
// Use a fake NewRecycler function
|
||||||
|
plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newFailingMockRecycler, volume.VolumeConfig{}), volume.NewFakeVolumeHost("/tmp/fake", nil, nil))
|
||||||
|
// Reset a global call counter
|
||||||
|
failedCallCount = 0
|
||||||
|
|
||||||
|
recycler := &PersistentVolumeRecycler{
|
||||||
|
kubeClient: fake.NewSimpleClientset(),
|
||||||
|
client: mockClient,
|
||||||
|
pluginMgr: plugMgr,
|
||||||
|
syncPeriod: mySyncPeriod,
|
||||||
|
maximumRetry: myMaximumRetry,
|
||||||
|
releasedVolumes: make(map[string]releasedVolumeStatus),
|
||||||
|
}
|
||||||
|
|
||||||
|
// All but the last attempt will fail
|
||||||
|
testRecycleFailures(t, recycler, mockClient, pv, myMaximumRetry-1)
|
||||||
|
|
||||||
|
// The last attempt should succeed
|
||||||
|
err := recycler.reclaimVolume(pv)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Last step: Recycler failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if mockClient.volume.Status.Phase != api.VolumePending {
|
||||||
|
t.Errorf("Last step: The volume should be Pending, but is %s instead", mockClient.volume.Status.Phase)
|
||||||
|
}
|
||||||
|
// Check the cache, it should not have any entry
|
||||||
|
status, found := recycler.releasedVolumes[pv.Name]
|
||||||
|
if found {
|
||||||
|
t.Errorf("Last step: Expected PV to be removed from cache, got %v", status)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRecyclingRetryAlwaysFail(t *testing.T) {
|
||||||
|
// Test that recycler controller retries to recycle a volume several times, which always fails.
|
||||||
|
pv := preparePV()
|
||||||
|
|
||||||
|
mockClient := &mockBinderClient{
|
||||||
|
volume: pv,
|
||||||
|
}
|
||||||
|
|
||||||
|
plugMgr := volume.VolumePluginMgr{}
|
||||||
|
// Use a fake NewRecycler function
|
||||||
|
plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newAlwaysFailingMockRecycler, volume.VolumeConfig{}), volume.NewFakeVolumeHost("/tmp/fake", nil, nil))
|
||||||
|
// Reset a global call counter
|
||||||
|
failedCallCount = 0
|
||||||
|
|
||||||
|
recycler := &PersistentVolumeRecycler{
|
||||||
|
kubeClient: fake.NewSimpleClientset(),
|
||||||
|
client: mockClient,
|
||||||
|
pluginMgr: plugMgr,
|
||||||
|
syncPeriod: mySyncPeriod,
|
||||||
|
maximumRetry: myMaximumRetry,
|
||||||
|
releasedVolumes: make(map[string]releasedVolumeStatus),
|
||||||
|
}
|
||||||
|
|
||||||
|
// myMaximumRetry recycle attempts will fail
|
||||||
|
testRecycleFailures(t, recycler, mockClient, pv, myMaximumRetry)
|
||||||
|
|
||||||
|
// The volume should be failed after myMaximumRetry attempts
|
||||||
|
err := recycler.reclaimVolume(pv)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Last step: Recycler failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if mockClient.volume.Status.Phase != api.VolumeFailed {
|
||||||
|
t.Errorf("Last step: The volume should be Failed, but is %s instead", mockClient.volume.Status.Phase)
|
||||||
|
}
|
||||||
|
// Check the cache, it should not have any entry
|
||||||
|
status, found := recycler.releasedVolumes[pv.Name]
|
||||||
|
if found {
|
||||||
|
t.Errorf("Last step: Expected PV to be removed from cache, got %v", status)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func preparePV() *api.PersistentVolume {
|
||||||
|
return &api.PersistentVolume{
|
||||||
Spec: api.PersistentVolumeSpec{
|
Spec: api.PersistentVolumeSpec{
|
||||||
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
|
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
|
||||||
Capacity: api.ResourceList{
|
Capacity: api.ResourceList{
|
||||||
@@ -34,7 +167,7 @@ func TestFailedRecycling(t *testing.T) {
|
|||||||
},
|
},
|
||||||
PersistentVolumeSource: api.PersistentVolumeSource{
|
PersistentVolumeSource: api.PersistentVolumeSource{
|
||||||
HostPath: &api.HostPathVolumeSource{
|
HostPath: &api.HostPathVolumeSource{
|
||||||
Path: "/somepath/data02",
|
Path: "/tmp/data02",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimRecycle,
|
PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimRecycle,
|
||||||
@@ -47,36 +180,85 @@ func TestFailedRecycling(t *testing.T) {
|
|||||||
Phase: api.VolumeReleased,
|
Phase: api.VolumeReleased,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
mockClient := &mockBinderClient{
|
// Test that `count` attempts to recycle a PV fails.
|
||||||
volume: pv,
|
func testRecycleFailures(t *testing.T, recycler *PersistentVolumeRecycler, mockClient *mockBinderClient, pv *api.PersistentVolume, count int) {
|
||||||
}
|
for i := 1; i <= count; i++ {
|
||||||
|
err := recycler.reclaimVolume(pv)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("STEP %d: Recycler faled: %v", i, err)
|
||||||
|
}
|
||||||
|
|
||||||
// no Init called for pluginMgr and no plugins are available. Volume should fail recycling.
|
// Check the status, it should be failed
|
||||||
plugMgr := volume.VolumePluginMgr{}
|
if mockClient.volume.Status.Phase != api.VolumeReleased {
|
||||||
|
t.Errorf("STEP %d: The volume should be Released, but is %s instead", i, mockClient.volume.Status.Phase)
|
||||||
|
}
|
||||||
|
|
||||||
recycler := &PersistentVolumeRecycler{
|
// Check the failed volume cache
|
||||||
kubeClient: fake.NewSimpleClientset(),
|
status, found := recycler.releasedVolumes[pv.Name]
|
||||||
client: mockClient,
|
if !found {
|
||||||
pluginMgr: plugMgr,
|
t.Errorf("STEP %d: cannot find released volume status", i)
|
||||||
}
|
}
|
||||||
|
if status.retryCount != i {
|
||||||
|
t.Errorf("STEP %d: Expected nr. of attempts to be %d, got %d", i, i, status.retryCount)
|
||||||
|
}
|
||||||
|
|
||||||
err := recycler.reclaimVolume(pv)
|
// call reclaimVolume too early, it should not increment the retryCount
|
||||||
if err != nil {
|
time.Sleep(mySyncPeriod / 2)
|
||||||
t.Errorf("Unexpected non-nil error: %v", err)
|
err = recycler.reclaimVolume(pv)
|
||||||
}
|
if err != nil {
|
||||||
|
t.Errorf("STEP %d: Recycler failed: %v", i, err)
|
||||||
|
}
|
||||||
|
|
||||||
if mockClient.volume.Status.Phase != api.VolumeFailed {
|
status, found = recycler.releasedVolumes[pv.Name]
|
||||||
t.Errorf("Expected %s but got %s", api.VolumeFailed, mockClient.volume.Status.Phase)
|
if !found {
|
||||||
}
|
t.Errorf("STEP %d: cannot find released volume status", i)
|
||||||
|
}
|
||||||
|
if status.retryCount != i {
|
||||||
|
t.Errorf("STEP %d: Expected nr. of attempts to be %d, got %d", i, i, status.retryCount)
|
||||||
|
}
|
||||||
|
|
||||||
pv.Spec.PersistentVolumeReclaimPolicy = api.PersistentVolumeReclaimDelete
|
// Call the next reclaimVolume() after full pvRecycleRetryPeriod
|
||||||
err = recycler.reclaimVolume(pv)
|
time.Sleep(mySyncPeriod / 2)
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Unexpected non-nil error: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if mockClient.volume.Status.Phase != api.VolumeFailed {
|
|
||||||
t.Errorf("Expected %s but got %s", api.VolumeFailed, mockClient.volume.Status.Phase)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newFailingMockRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) {
|
||||||
|
return &failingMockRecycler{
|
||||||
|
path: spec.PersistentVolume.Spec.HostPath.Path,
|
||||||
|
errorCount: myMaximumRetry - 1, // fail two times and then successfuly recycle the volume
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newAlwaysFailingMockRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) {
|
||||||
|
return &failingMockRecycler{
|
||||||
|
path: spec.PersistentVolume.Spec.HostPath.Path,
|
||||||
|
errorCount: 1000, // always fail
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type failingMockRecycler struct {
|
||||||
|
path string
|
||||||
|
// How many times should the recycler fail before returning success.
|
||||||
|
errorCount int
|
||||||
|
volume.MetricsNil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Counter of failingMockRecycler.Recycle() calls. Global variable just for
|
||||||
|
// testing. It's too much code to create a custom volume plugin, which would
|
||||||
|
// hold this variable.
|
||||||
|
var failedCallCount = 0
|
||||||
|
|
||||||
|
func (r *failingMockRecycler) GetPath() string {
|
||||||
|
return r.path
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *failingMockRecycler) Recycle() error {
|
||||||
|
failedCallCount += 1
|
||||||
|
if failedCallCount <= r.errorCount {
|
||||||
|
return fmt.Errorf("Failing for %d. time", failedCallCount)
|
||||||
|
}
|
||||||
|
// return nil means recycle passed
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@@ -58,7 +58,7 @@ func TestPersistentVolumeRecycler(t *testing.T) {
|
|||||||
binder.Run()
|
binder.Run()
|
||||||
defer binder.Stop()
|
defer binder.Stop()
|
||||||
|
|
||||||
recycler, _ := persistentvolumecontroller.NewPersistentVolumeRecycler(recyclerClient, 30*time.Second, plugins, cloud)
|
recycler, _ := persistentvolumecontroller.NewPersistentVolumeRecycler(recyclerClient, 30*time.Second, 3, plugins, cloud)
|
||||||
recycler.Run()
|
recycler.Run()
|
||||||
defer recycler.Stop()
|
defer recycler.Stop()
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user