Attach/Detach Controller Kubelet Changes

This PR contains Kubelet changes to enable attach/detach controller control.
* It introduces a new "enable-controller-attach-detach" kubelet flag to
  enable control by controller. Default enabled.
* It removes all references "SafeToDetach" annoation from controller.
* It adds the new VolumesInUse field to the Node Status API object.
* It modifies the controller to use VolumesInUse instead of SafeToDetach
  annotation to gate detachment.
* There is a bug in node-problem-detector that causes VolumesInUse to
  get reset every 30 seconds. Issue https://github.com/kubernetes/node-problem-detector/issues/9
  opened to fix that.
This commit is contained in:
Saad Ali
2016-05-23 13:37:30 -07:00
committed by saadali
parent 24ddec1cbf
commit 9dbe943491
35 changed files with 2346 additions and 1217 deletions

View File

@@ -20,7 +20,6 @@ package volume
import (
"fmt"
"strings"
"time"
"github.com/golang/glog"
@@ -36,28 +35,19 @@ import (
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/attachdetach"
)
const (
// ControllerManagedAnnotation is the key of the annotation on Node objects
// that indicates attach/detach operations for the node should be managed
// by the attach/detach controller
ControllerManagedAnnotation string = "volumes.kubernetes.io/controller-managed-attach"
// SafeToDetachAnnotation is the annotation added to the Node object by
// kubelet in the format "volumes.kubernetes.io/safetodetach/{volumename}"
// to indicate the volume has been unmounted and is safe to detach.
SafeToDetachAnnotation string = "volumes.kubernetes.io/safetodetach-"
// loopPeriod is the ammount of time the reconciler loop waits between
// successive executions
reconcilerLoopPeriod time.Duration = 100 * time.Millisecond
// reconcilerMaxSafeToDetachDuration is the maximum amount of time the
// attach detach controller will wait for a volume to be safely detached
// reconcilerMaxWaitForUnmountDuration is the maximum amount of time the
// attach detach controller will wait for a volume to be safely unmounted
// from its node. Once this time has expired, the controller will assume the
// node or kubelet are unresponsive and will detach the volume anyway.
reconcilerMaxSafeToDetachDuration time.Duration = 10 * time.Minute
reconcilerMaxWaitForUnmountDuration time.Duration = 3 * time.Minute
)
// AttachDetachController defines the operations supported by this controller.
@@ -116,7 +106,7 @@ func NewAttachDetachController(
adc.attacherDetacher = attacherdetacher.NewAttacherDetacher(&adc.volumePluginMgr)
adc.reconciler = reconciler.NewReconciler(
reconcilerLoopPeriod,
reconcilerMaxSafeToDetachDuration,
reconcilerMaxWaitForUnmountDuration,
adc.desiredStateOfWorld,
adc.actualStateOfWorld,
adc.attacherDetacher)
@@ -215,13 +205,13 @@ func (adc *attachDetachController) nodeAdd(obj interface{}) {
}
nodeName := node.Name
if _, exists := node.Annotations[ControllerManagedAnnotation]; exists {
if _, exists := node.Annotations[attachdetach.ControllerManagedAnnotation]; exists {
// Node specifies annotation indicating it should be managed by attach
// detach controller. Add it to desired state of world.
adc.desiredStateOfWorld.AddNode(nodeName)
}
adc.processSafeToDetachAnnotations(nodeName, node.Annotations)
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
}
func (adc *attachDetachController) nodeUpdate(oldObj, newObj interface{}) {
@@ -240,7 +230,7 @@ func (adc *attachDetachController) nodeDelete(obj interface{}) {
glog.V(10).Infof("%v", err)
}
adc.processSafeToDetachAnnotations(nodeName, node.Annotations)
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
}
// processPodVolumes processes the volumes in the given pod and adds them to the
@@ -309,10 +299,11 @@ func (adc *attachDetachController) processPodVolumes(
} else {
// Remove volume from desired state of world
uniqueVolumeName, err := attachableVolumePlugin.GetUniqueVolumeName(volumeSpec)
uniqueVolumeName, err := attachdetach.GetUniqueDeviceNameFromSpec(
attachableVolumePlugin, volumeSpec)
if err != nil {
glog.V(10).Infof(
"Failed to delete volume %q for pod %q/%q from desiredStateOfWorld. GetUniqueVolumeName failed with %v",
"Failed to delete volume %q for pod %q/%q from desiredStateOfWorld. GenerateUniqueDeviceName failed with %v",
podVolume.Name,
pod.Namespace,
pod.Name,
@@ -332,20 +323,48 @@ func (adc *attachDetachController) processPodVolumes(
func (adc *attachDetachController) createVolumeSpec(
podVolume api.Volume, podNamespace string) (*volume.Spec, error) {
if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
glog.V(10).Infof(
"Found PVC, ClaimName: %q/%q",
podNamespace,
pvcSource.ClaimName)
// If podVolume is a PVC, fetch the real PV behind the claim
pvName, pvcUID, err := adc.getPVCFromCacheExtractPV(
podNamespace, pvcSource.ClaimName)
if err != nil {
return nil, fmt.Errorf("error processing PVC %q: %v", pvcSource.ClaimName, err)
return nil, fmt.Errorf(
"error processing PVC %q/%q: %v",
podNamespace,
pvcSource.ClaimName,
err)
}
glog.V(10).Infof(
"Found bound PV for PVC (ClaimName %q/%q pvcUID %v): pvName=%q",
podNamespace,
pvcSource.ClaimName,
pvcUID,
pvName)
// Fetch actual PV object
volumeSpec, err := adc.getPVSpecFromCache(
pvName, pvcSource.ReadOnly, pvcUID)
if err != nil {
return nil, fmt.Errorf("error processing PVC %q: %v", pvcSource.ClaimName, err)
return nil, fmt.Errorf(
"error processing PVC %q/%q: %v",
podNamespace,
pvcSource.ClaimName,
err)
}
glog.V(10).Infof(
"Extracted volumeSpec (%v) from bound PV (pvName %q) and PVC (ClaimName %q/%q pvcUID %v)",
volumeSpec.Name,
pvName,
podNamespace,
pvcSource.ClaimName,
pvcUID)
return volumeSpec, nil
}
@@ -353,7 +372,8 @@ func (adc *attachDetachController) createVolumeSpec(
// informer it may be mutated by another consumer.
clonedPodVolumeObj, err := api.Scheme.DeepCopy(podVolume)
if err != nil || clonedPodVolumeObj == nil {
return nil, fmt.Errorf("failed to deep copy %q volume object", podVolume.Name)
return nil, fmt.Errorf(
"failed to deep copy %q volume object. err=%v", podVolume.Name, err)
}
clonedPodVolume, ok := clonedPodVolumeObj.(api.Volume)
@@ -377,7 +397,7 @@ func (adc *attachDetachController) getPVCFromCacheExtractPV(
key = namespace + "/" + name
}
pvcObj, exists, err := adc.pvcInformer.GetStore().Get(key)
pvcObj, exists, err := adc.pvcInformer.GetStore().GetByKey(key)
if pvcObj == nil || !exists || err != nil {
return "", "", fmt.Errorf(
"failed to find PVC %q in PVCInformer cache. %v",
@@ -386,7 +406,7 @@ func (adc *attachDetachController) getPVCFromCacheExtractPV(
}
pvc, ok := pvcObj.(*api.PersistentVolumeClaim)
if ok || pvc == nil {
if !ok || pvc == nil {
return "", "", fmt.Errorf(
"failed to cast %q object %#v to PersistentVolumeClaim",
key,
@@ -414,14 +434,14 @@ func (adc *attachDetachController) getPVSpecFromCache(
name string,
pvcReadOnly bool,
expectedClaimUID types.UID) (*volume.Spec, error) {
pvObj, exists, err := adc.pvInformer.GetStore().Get(name)
pvObj, exists, err := adc.pvInformer.GetStore().GetByKey(name)
if pvObj == nil || !exists || err != nil {
return nil, fmt.Errorf(
"failed to find PV %q in PVInformer cache. %v", name, err)
}
pv, ok := pvObj.(*api.PersistentVolume)
if ok || pv == nil {
if !ok || pv == nil {
return nil, fmt.Errorf(
"failed to cast %q object %#v to PersistentVolume", name, pvObj)
}
@@ -442,9 +462,10 @@ func (adc *attachDetachController) getPVSpecFromCache(
// Do not return the object from the informer, since the store is shared it
// may be mutated by another consumer.
clonedPVObj, err := api.Scheme.DeepCopy(pv)
clonedPVObj, err := api.Scheme.DeepCopy(*pv)
if err != nil || clonedPVObj == nil {
return nil, fmt.Errorf("failed to deep copy %q PV object", name)
return nil, fmt.Errorf(
"failed to deep copy %q PV object. err=%v", name, err)
}
clonedPV, ok := clonedPVObj.(api.PersistentVolume)
@@ -456,28 +477,28 @@ func (adc *attachDetachController) getPVSpecFromCache(
return volume.NewSpecFromPersistentVolume(&clonedPV, pvcReadOnly), nil
}
// processSafeToDetachAnnotations processes the "safe to detach" annotations for
// the given node. It makes calls to delete any annotations referring to volumes
// it is not aware of. For volumes it is aware of, it marks them safe to detach
// in the "actual state of world" data structure.
func (adc *attachDetachController) processSafeToDetachAnnotations(
nodeName string, annotations map[string]string) {
var annotationsToRemove []string
for annotation := range annotations {
// Check annotations for "safe to detach" volumes
annotation = strings.ToLower(annotation)
if strings.HasPrefix(annotation, SafeToDetachAnnotation) {
// If volume exists in "actual state of world" mark it as safe to detach
safeToAttachVolume := strings.TrimPrefix(annotation, SafeToDetachAnnotation)
if err := adc.actualStateOfWorld.MarkVolumeNodeSafeToDetach(safeToAttachVolume, nodeName); err != nil {
// If volume doesn't exist in "actual state of world" remove
// the "safe to detach" annotation from the node
annotationsToRemove = append(annotationsToRemove, annotation)
// processVolumesInUse processes the list of volumes marked as "in-use"
// according to the specified Node's Status.VolumesInUse and updates the
// corresponding volume in the actual state of the world to indicate that it is
// mounted.
func (adc *attachDetachController) processVolumesInUse(
nodeName string, volumesInUse []api.UniqueDeviceName) {
for _, attachedVolume := range adc.actualStateOfWorld.GetAttachedVolumesForNode(nodeName) {
mounted := false
for _, volumeInUse := range volumesInUse {
if attachedVolume.VolumeName == volumeInUse {
mounted = true
break
}
}
err := adc.actualStateOfWorld.SetVolumeMountedByNode(
attachedVolume.VolumeName, nodeName, mounted)
if err != nil {
glog.Warningf(
"SetVolumeMountedByNode(%q, %q, %q) returned an error: %v",
attachedVolume.VolumeName, nodeName, mounted, err)
}
}
// TODO: Call out to API server to delete annotationsToRemove from Node
}
// getUniquePodName returns a unique name to reference pod by in memory caches

View File

@@ -40,7 +40,7 @@ type AttacherDetacher interface {
// responsible for implmenting this behavior).
// All other errors are logged and the goroutine terminates without updating
// actualStateOfWorld (caller is responsible for retrying as needed).
AttachVolume(volumeToAttach *cache.VolumeToAttach, actualStateOfWorld cache.ActualStateOfWorld) error
AttachVolume(volumeToAttach cache.VolumeToAttach, actualStateOfWorld cache.ActualStateOfWorld) error
// Spawns a new goroutine to execute volume-specific logic to detach the
// volume from the node specified in volumeToDetach.
@@ -51,7 +51,7 @@ type AttacherDetacher interface {
// responsible for implmenting this behavior).
// All other errors are logged and the goroutine terminates without updating
// actualStateOfWorld (caller is responsible for retrying as needed).
DetachVolume(volumeToDetach *cache.AttachedVolume, actualStateOfWorld cache.ActualStateOfWorld) error
DetachVolume(volumeToDetach cache.AttachedVolume, actualStateOfWorld cache.ActualStateOfWorld) error
}
// NewAttacherDetacher returns a new instance of AttacherDetacher.
@@ -72,29 +72,29 @@ type attacherDetacher struct {
}
func (ad *attacherDetacher) AttachVolume(
volumeToAttach *cache.VolumeToAttach,
volumeToAttach cache.VolumeToAttach,
actualStateOfWorld cache.ActualStateOfWorld) error {
attachFunc, err := ad.generateAttachVolumeFunc(volumeToAttach, actualStateOfWorld)
if err != nil {
return err
}
return ad.pendingOperations.Run(volumeToAttach.VolumeName, attachFunc)
return ad.pendingOperations.Run(string(volumeToAttach.VolumeName), attachFunc)
}
func (ad *attacherDetacher) DetachVolume(
volumeToDetach *cache.AttachedVolume,
volumeToDetach cache.AttachedVolume,
actualStateOfWorld cache.ActualStateOfWorld) error {
detachFunc, err := ad.generateDetachVolumeFunc(volumeToDetach, actualStateOfWorld)
if err != nil {
return err
}
return ad.pendingOperations.Run(volumeToDetach.VolumeName, detachFunc)
return ad.pendingOperations.Run(string(volumeToDetach.VolumeName), detachFunc)
}
func (ad *attacherDetacher) generateAttachVolumeFunc(
volumeToAttach *cache.VolumeToAttach,
volumeToAttach cache.VolumeToAttach,
actualStateOfWorld cache.ActualStateOfWorld) (func() error, error) {
// Get attacher plugin
attachableVolumePlugin, err := ad.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec)
@@ -119,15 +119,23 @@ func (ad *attacherDetacher) generateAttachVolumeFunc(
if attachErr != nil {
// On failure, just log and exit. The controller will retry
glog.Errorf("Attach operation for %q failed with: %v", volumeToAttach.VolumeName, attachErr)
glog.Errorf(
"Attach operation for device %q to node %q failed with: %v",
volumeToAttach.VolumeName, volumeToAttach.NodeName, attachErr)
return attachErr
}
glog.Infof(
"Successfully attached device %q to node %q. Will update actual state of world.",
volumeToAttach.VolumeName, volumeToAttach.NodeName)
// Update actual state of world
_, addVolumeNodeErr := actualStateOfWorld.AddVolumeNode(volumeToAttach.VolumeSpec, volumeToAttach.NodeName)
if addVolumeNodeErr != nil {
// On failure, just log and exit. The controller will retry
glog.Errorf("Attach operation for %q succeeded but updating actualStateOfWorld failed with: %v", volumeToAttach.VolumeName, addVolumeNodeErr)
glog.Errorf(
"Attach operation for device %q to node %q succeeded, but updating actualStateOfWorld failed with: %v",
volumeToAttach.VolumeName, volumeToAttach.NodeName, addVolumeNodeErr)
return addVolumeNodeErr
}
@@ -136,7 +144,7 @@ func (ad *attacherDetacher) generateAttachVolumeFunc(
}
func (ad *attacherDetacher) generateDetachVolumeFunc(
volumeToDetach *cache.AttachedVolume,
volumeToDetach cache.AttachedVolume,
actualStateOfWorld cache.ActualStateOfWorld) (func() error, error) {
// Get attacher plugin
attachableVolumePlugin, err := ad.volumePluginMgr.FindAttachablePluginBySpec(volumeToDetach.VolumeSpec)
@@ -150,7 +158,7 @@ func (ad *attacherDetacher) generateDetachVolumeFunc(
deviceName, err := attachableVolumePlugin.GetDeviceName(volumeToDetach.VolumeSpec)
if err != nil {
return nil, fmt.Errorf(
"failed to GetUniqueVolumeName from AttachablePlugin for volumeSpec %q err=%v",
"failed to GetDeviceName from AttachablePlugin for volumeSpec %q err=%v",
volumeToDetach.VolumeSpec.Name(),
err)
}
@@ -169,11 +177,15 @@ func (ad *attacherDetacher) generateDetachVolumeFunc(
if detachErr != nil {
// On failure, just log and exit. The controller will retry
glog.Errorf("Detach operation for %q failed with: %v", volumeToDetach.VolumeName, detachErr)
glog.Errorf(
"Detach operation for device %q from node %q failed with: %v",
volumeToDetach.VolumeName, volumeToDetach.NodeName, detachErr)
return detachErr
}
// TODO: Reset "safe to detach" annotation on Node
glog.Infof(
"Successfully detached device %q from node %q. Will update actual state of world.",
volumeToDetach.VolumeName, volumeToDetach.NodeName)
// Update actual state of world
actualStateOfWorld.DeleteVolumeNode(volumeToDetach.VolumeName, volumeToDetach.NodeName)

View File

@@ -26,7 +26,9 @@ import (
"sync"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/attachdetach"
)
// ActualStateOfWorld defines a set of thread-safe operations supported on
@@ -38,21 +40,23 @@ type ActualStateOfWorld interface {
// indicating the specified volume is attached to the specified node.
// A unique volumeName is generated from the volumeSpec and returned on
// success.
// If the volume/node combo already exists, this is a no-op.
// If the volume/node combo already exists, the detachRequestedTime is reset
// to zero.
// If volumeSpec is not an attachable volume plugin, an error is returned.
// If no volume with the name volumeName exists in the store, the volume is
// added.
// If no node with the name nodeName exists in list of attached nodes for
// the specified volume, the node is added.
AddVolumeNode(volumeSpec *volume.Spec, nodeName string) (string, error)
AddVolumeNode(volumeSpec *volume.Spec, nodeName string) (api.UniqueDeviceName, error)
// MarkVolumeNodeSafeToDetach marks the given volume as safe to detach from
// the given node.
// SetVolumeMountedByNode sets the MountedByNode value for the given volume
// and node. When set to true this value indicates the volume is mounted by
// the given node, indicating it may not be safe to detach.
// If no volume with the name volumeName exists in the store, an error is
// returned.
// If no node with the name nodeName exists in list of attached nodes for
// the specified volume, an error is returned.
MarkVolumeNodeSafeToDetach(volumeName, nodeName string) error
SetVolumeMountedByNode(volumeName api.UniqueDeviceName, nodeName string, mounted bool) error
// MarkDesireToDetach returns the difference between the current time and
// the DetachRequestedTime for the given volume/node combo. If the
@@ -61,7 +65,7 @@ type ActualStateOfWorld interface {
// returned.
// If no node with the name nodeName exists in list of attached nodes for
// the specified volume, an error is returned.
MarkDesireToDetach(volumeName, nodeName string) (time.Duration, error)
MarkDesireToDetach(volumeName api.UniqueDeviceName, nodeName string) (time.Duration, error)
// DeleteVolumeNode removes the given volume and node from the underlying
// store indicating the specified volume is no longer attached to the
@@ -69,23 +73,28 @@ type ActualStateOfWorld interface {
// If the volume/node combo does not exist, this is a no-op.
// If after deleting the node, the specified volume contains no other child
// nodes, the volume is also deleted.
DeleteVolumeNode(volumeName, nodeName string)
DeleteVolumeNode(volumeName api.UniqueDeviceName, nodeName string)
// VolumeNodeExists returns true if the specified volume/node combo exists
// in the underlying store indicating the specified volume is attached to
// the specified node.
VolumeNodeExists(volumeName, nodeName string) bool
VolumeNodeExists(volumeName api.UniqueDeviceName, nodeName string) bool
// GetAttachedVolumes generates and returns a list of volumes/node pairs
// reflecting which volumes are attached to which nodes based on the
// current actual state of the world.
GetAttachedVolumes() []AttachedVolume
// GetAttachedVolumes generates and returns a list of volumes attached to
// the specified node reflecting which volumes are attached to that node
// based on the current actual state of the world.
GetAttachedVolumesForNode(nodeName string) []AttachedVolume
}
// AttachedVolume represents a volume that is attached to a node.
type AttachedVolume struct {
// VolumeName is the unique identifier for the volume that is attached.
VolumeName string
VolumeName api.UniqueDeviceName
// VolumeSpec is the volume spec containing the specification for the
// volume that is attached.
@@ -94,11 +103,10 @@ type AttachedVolume struct {
// NodeName is the identifier for the node that the volume is attached to.
NodeName string
// SafeToDetach indicates that this volume has been been unmounted from the
// node and is safe to detach.
// The value is set by MarkVolumeNodeSafeToDetach(...) and reset on
// AddVolumeNode(...) calls.
SafeToDetach bool
// MountedByNode indicates that this volume has been been mounted by the
// node and is unsafe to detach.
// The value is set and unset by SetVolumeMountedByNode(...).
MountedByNode bool
// DetachRequestedTime is used to capture the desire to detach this volume.
// When the volume is newly created this value is set to time zero.
@@ -111,7 +119,7 @@ type AttachedVolume struct {
// NewActualStateOfWorld returns a new instance of ActualStateOfWorld.
func NewActualStateOfWorld(volumePluginMgr *volume.VolumePluginMgr) ActualStateOfWorld {
return &actualStateOfWorld{
attachedVolumes: make(map[string]attachedVolume),
attachedVolumes: make(map[api.UniqueDeviceName]attachedVolume),
volumePluginMgr: volumePluginMgr,
}
}
@@ -121,7 +129,7 @@ type actualStateOfWorld struct {
// controller believes to be successfully attached to the nodes it is
// managing. The key in this map is the name of the volume and the value is
// an object containing more information about the attached volume.
attachedVolumes map[string]attachedVolume
attachedVolumes map[api.UniqueDeviceName]attachedVolume
// volumePluginMgr is the volume plugin manager used to create volume
// plugin objects.
volumePluginMgr *volume.VolumePluginMgr
@@ -132,7 +140,7 @@ type actualStateOfWorld struct {
// believes to be succesfully attached to a node it is managing.
type attachedVolume struct {
// volumeName contains the unique identifier for this volume.
volumeName string
volumeName api.UniqueDeviceName
// spec is the volume spec containing the specification for this volume.
// Used to generate the volume plugin object, and passed to attach/detach
@@ -151,15 +159,22 @@ type nodeAttachedTo struct {
// nodeName contains the name of this node.
nodeName string
// safeToDetach indicates that this node/volume combo has been unmounted
// by the node and is safe to detach
safeToDetach bool
// mountedByNode indicates that this node/volume combo is mounted by the
// node and is unsafe to detach
mountedByNode bool
// number of times SetVolumeMountedByNode has been called to set the value
// of mountedByNode to true. This is used to prevent mountedByNode from
// being reset during the period between attach and mount when volumesInUse
// status for the node may not be set.
mountedByNodeSetCount uint
// detachRequestedTime used to capture the desire to detach this volume
detachRequestedTime time.Time
}
func (asw *actualStateOfWorld) AddVolumeNode(volumeSpec *volume.Spec, nodeName string) (string, error) {
func (asw *actualStateOfWorld) AddVolumeNode(
volumeSpec *volume.Spec, nodeName string) (api.UniqueDeviceName, error) {
asw.Lock()
defer asw.Unlock()
@@ -171,10 +186,11 @@ func (asw *actualStateOfWorld) AddVolumeNode(volumeSpec *volume.Spec, nodeName s
err)
}
volumeName, err := attachableVolumePlugin.GetUniqueVolumeName(volumeSpec)
volumeName, err := attachdetach.GetUniqueDeviceNameFromSpec(
attachableVolumePlugin, volumeSpec)
if err != nil {
return "", fmt.Errorf(
"failed to GetUniqueVolumeName from AttachablePlugin for volumeSpec %q err=%v",
"failed to GetUniqueDeviceNameFromSpec for volumeSpec %q err=%v",
volumeSpec.Name(),
err)
}
@@ -190,55 +206,70 @@ func (asw *actualStateOfWorld) AddVolumeNode(volumeSpec *volume.Spec, nodeName s
}
nodeObj, nodeExists := volumeObj.nodesAttachedTo[nodeName]
if !nodeExists || nodeObj.safeToDetach || !nodeObj.detachRequestedTime.IsZero() {
if !nodeExists {
// Create object if it doesn't exist.
// Reset safeToDeatch and detachRequestedTime values if it does.
volumeObj.nodesAttachedTo[nodeName] = nodeAttachedTo{
nodeName: nodeName,
safeToDetach: false,
detachRequestedTime: time.Time{},
nodeName: nodeName,
mountedByNode: true, // Assume mounted, until proven otherwise
mountedByNodeSetCount: 0,
detachRequestedTime: time.Time{},
}
} else if !nodeObj.detachRequestedTime.IsZero() {
// Reset detachRequestedTime values if object exists and time is non-zero
nodeObj.detachRequestedTime = time.Time{}
volumeObj.nodesAttachedTo[nodeName] = nodeObj
}
return volumeName, nil
}
func (asw *actualStateOfWorld) MarkVolumeNodeSafeToDetach(
volumeName, nodeName string) error {
func (asw *actualStateOfWorld) SetVolumeMountedByNode(
volumeName api.UniqueDeviceName, nodeName string, mounted bool) error {
asw.Lock()
defer asw.Unlock()
volumeObj, volumeExists := asw.attachedVolumes[volumeName]
if !volumeExists {
return fmt.Errorf(
"failed to MarkVolumeNodeSafeToDetach(volumeName=%q, nodeName=%q) volumeName does not exist",
"failed to SetVolumeMountedByNode(volumeName=%v, nodeName=%q, mounted=%v) volumeName does not exist",
volumeName,
nodeName)
nodeName,
mounted)
}
nodeObj, nodeExists := volumeObj.nodesAttachedTo[nodeName]
if !nodeExists {
return fmt.Errorf(
"failed to MarkVolumeNodeSafeToDetach(volumeName=%q, nodeName=%q) nodeName does not exist",
"failed to SetVolumeMountedByNode(volumeName=%v, nodeName=%q, mounted=%v) nodeName does not exist",
volumeName,
nodeName)
nodeName,
mounted)
}
// Reset safe to detach
nodeObj.safeToDetach = true
if mounted {
// Increment set count
nodeObj.mountedByNodeSetCount = nodeObj.mountedByNodeSetCount + 1
} else {
// Do not allow value to be reset unless it has been set at least once
if nodeObj.mountedByNodeSetCount == 0 {
return nil
}
}
nodeObj.mountedByNode = mounted
volumeObj.nodesAttachedTo[nodeName] = nodeObj
return nil
}
func (asw *actualStateOfWorld) MarkDesireToDetach(
volumeName, nodeName string) (time.Duration, error) {
volumeName api.UniqueDeviceName, nodeName string) (time.Duration, error) {
asw.Lock()
defer asw.Unlock()
volumeObj, volumeExists := asw.attachedVolumes[volumeName]
if !volumeExists {
return time.Millisecond * 0, fmt.Errorf(
"failed to MarkVolumeNodeSafeToDetach(volumeName=%q, nodeName=%q) volumeName does not exist",
"failed to MarkDesireToDetach(volumeName=%v, nodeName=%q) volumeName does not exist",
volumeName,
nodeName)
}
@@ -246,7 +277,7 @@ func (asw *actualStateOfWorld) MarkDesireToDetach(
nodeObj, nodeExists := volumeObj.nodesAttachedTo[nodeName]
if !nodeExists {
return time.Millisecond * 0, fmt.Errorf(
"failed to MarkVolumeNodeSafeToDetach(volumeName=%q, nodeName=%q) nodeName does not exist",
"failed to MarkDesireToDetach(volumeName=%v, nodeName=%q) nodeName does not exist",
volumeName,
nodeName)
}
@@ -259,7 +290,8 @@ func (asw *actualStateOfWorld) MarkDesireToDetach(
return time.Since(volumeObj.nodesAttachedTo[nodeName].detachRequestedTime), nil
}
func (asw *actualStateOfWorld) DeleteVolumeNode(volumeName, nodeName string) {
func (asw *actualStateOfWorld) DeleteVolumeNode(
volumeName api.UniqueDeviceName, nodeName string) {
asw.Lock()
defer asw.Unlock()
@@ -278,7 +310,8 @@ func (asw *actualStateOfWorld) DeleteVolumeNode(volumeName, nodeName string) {
}
}
func (asw *actualStateOfWorld) VolumeNodeExists(volumeName, nodeName string) bool {
func (asw *actualStateOfWorld) VolumeNodeExists(
volumeName api.UniqueDeviceName, nodeName string) bool {
asw.RLock()
defer asw.RUnlock()
@@ -305,10 +338,35 @@ func (asw *actualStateOfWorld) GetAttachedVolumes() []AttachedVolume {
NodeName: nodeName,
VolumeName: volumeName,
VolumeSpec: volumeObj.spec,
SafeToDetach: nodeObj.safeToDetach,
MountedByNode: nodeObj.mountedByNode,
DetachRequestedTime: nodeObj.detachRequestedTime})
}
}
return attachedVolumes
}
func (asw *actualStateOfWorld) GetAttachedVolumesForNode(
nodeName string) []AttachedVolume {
asw.RLock()
defer asw.RUnlock()
attachedVolumes := make(
[]AttachedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */)
for volumeName, volumeObj := range asw.attachedVolumes {
for actualNodeName, nodeObj := range volumeObj.nodesAttachedTo {
if actualNodeName == nodeName {
attachedVolumes = append(
attachedVolumes,
AttachedVolume{
NodeName: nodeName,
VolumeName: volumeName,
VolumeSpec: volumeObj.spec,
MountedByNode: nodeObj.mountedByNode,
DetachRequestedTime: nodeObj.detachRequestedTime})
}
}
}
return attachedVolumes
}

View File

@@ -19,15 +19,18 @@ package cache
import (
"testing"
"k8s.io/kubernetes/pkg/api"
controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/testing"
)
// Calls AddVolumeNode() once.
// Verifies a single volume/node entry exists.
func Test_AddVolumeNode_Positive_NewVolumeNewNode(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
asw := NewActualStateOfWorld(volumePluginMgr)
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
@@ -49,15 +52,17 @@ func Test_AddVolumeNode_Positive_NewVolumeNewNode(t *testing.T) {
t.Fatalf("len(attachedVolumes) Expected: <1> Actual: <%v>", len(attachedVolumes))
}
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, volumeName, nodeName, false /* expectedSafeToDetach */, false /* expectNonZeroDetachRequestedTime */)
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, string(volumeName), nodeName, true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */)
}
// Calls AddVolumeNode() twice. Second time use a different node name.
// Verifies two volume/node entries exist with the same volumeSpec.
func Test_AddVolumeNode_Positive_ExistingVolumeNewNode(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
asw := NewActualStateOfWorld(volumePluginMgr)
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
node1Name := "node1-name"
node2Name := "node2-name"
@@ -95,16 +100,18 @@ func Test_AddVolumeNode_Positive_ExistingVolumeNewNode(t *testing.T) {
t.Fatalf("len(attachedVolumes) Expected: <2> Actual: <%v>", len(attachedVolumes))
}
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName1, volumeName, node1Name, false /* expectedSafeToDetach */, false /* expectNonZeroDetachRequestedTime */)
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName1, volumeName, node2Name, false /* expectedSafeToDetach */, false /* expectNonZeroDetachRequestedTime */)
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName1, string(volumeName), node1Name, true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */)
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName1, string(volumeName), node2Name, true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */)
}
// Calls AddVolumeNode() twice. Uses the same volume and node both times.
// Verifies a single volume/node entry exists.
func Test_AddVolumeNode_Positive_ExistingVolumeExistingNode(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
asw := NewActualStateOfWorld(volumePluginMgr)
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
// Act
@@ -136,15 +143,18 @@ func Test_AddVolumeNode_Positive_ExistingVolumeExistingNode(t *testing.T) {
t.Fatalf("len(attachedVolumes) Expected: <1> Actual: <%v>", len(attachedVolumes))
}
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName1, volumeName, nodeName, false /* expectedSafeToDetach */, false /* expectNonZeroDetachRequestedTime */)
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName1, string(volumeName), nodeName, true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */)
}
// Populates data struct with one volume/node entry.
// Calls DeleteVolumeNode() to delete volume/node.
// Verifies no volume/node entries exists.
func Test_DeleteVolumeNode_Positive_VolumeExistsNodeExists(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
asw := NewActualStateOfWorld(volumePluginMgr)
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName)
if addErr != nil {
@@ -166,11 +176,13 @@ func Test_DeleteVolumeNode_Positive_VolumeExistsNodeExists(t *testing.T) {
}
}
// Calls DeleteVolumeNode() to delete volume/node on empty data stcut
// Verifies no volume/node entries exists.
func Test_DeleteVolumeNode_Positive_VolumeDoesntExistNodeDoesntExist(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
asw := NewActualStateOfWorld(volumePluginMgr)
volumeName := "volume-name"
volumeName := api.UniqueDeviceName("volume-name")
nodeName := "node-name"
// Act
@@ -188,12 +200,16 @@ func Test_DeleteVolumeNode_Positive_VolumeDoesntExistNodeDoesntExist(t *testing.
}
}
// Populates data struct with two volume/node entries the second one using a
// different node.
// Calls DeleteVolumeNode() to delete first volume/node.
// Verifies only second volume/node entry exists.
func Test_DeleteVolumeNode_Positive_TwoNodesOneDeleted(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
asw := NewActualStateOfWorld(volumePluginMgr)
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
node1Name := "node1-name"
node2Name := "node2-name"
generatedVolumeName1, add1Err := asw.AddVolumeNode(volumeSpec, node1Name)
@@ -230,15 +246,18 @@ func Test_DeleteVolumeNode_Positive_TwoNodesOneDeleted(t *testing.T) {
t.Fatalf("len(attachedVolumes) Expected: <1> Actual: <%v>", len(attachedVolumes))
}
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName1, volumeName, node2Name, false /* expectedSafeToDetach */, false /* expectNonZeroDetachRequestedTime */)
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName1, string(volumeName), node2Name, true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */)
}
// Populates data struct with one volume/node entry.
// Calls VolumeNodeExists() to verify entry.
// Verifies the populated volume/node entry exists.
func Test_VolumeNodeExists_Positive_VolumeExistsNodeExists(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
asw := NewActualStateOfWorld(volumePluginMgr)
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName)
if addErr != nil {
@@ -258,15 +277,18 @@ func Test_VolumeNodeExists_Positive_VolumeExistsNodeExists(t *testing.T) {
t.Fatalf("len(attachedVolumes) Expected: <1> Actual: <%v>", len(attachedVolumes))
}
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, volumeName, nodeName, false /* expectedSafeToDetach */, false /* expectNonZeroDetachRequestedTime */)
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, string(volumeName), nodeName, true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */)
}
// Populates data struct with one volume1/node1 entry.
// Calls VolumeNodeExists() with volume1/node2.
// Verifies requested entry does not exist, but populated entry does.
func Test_VolumeNodeExists_Positive_VolumeExistsNodeDoesntExist(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
asw := NewActualStateOfWorld(volumePluginMgr)
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
node1Name := "node1-name"
node2Name := "node2-name"
generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, node1Name)
@@ -287,14 +309,16 @@ func Test_VolumeNodeExists_Positive_VolumeExistsNodeDoesntExist(t *testing.T) {
t.Fatalf("len(attachedVolumes) Expected: <1> Actual: <%v>", len(attachedVolumes))
}
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, volumeName, node1Name, false /* expectedSafeToDetach */, false /* expectNonZeroDetachRequestedTime */)
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, string(volumeName), node1Name, true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */)
}
// Calls VolumeNodeExists() on empty data struct.
// Verifies requested entry does not exist.
func Test_VolumeNodeExists_Positive_VolumeAndNodeDontExist(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
asw := NewActualStateOfWorld(volumePluginMgr)
volumeName := "volume-name"
volumeName := api.UniqueDeviceName("volume-name")
nodeName := "node-name"
// Act
@@ -311,6 +335,8 @@ func Test_VolumeNodeExists_Positive_VolumeAndNodeDontExist(t *testing.T) {
}
}
// Calls GetAttachedVolumes() on empty data struct.
// Verifies no volume/node entries are returned.
func Test_GetAttachedVolumes_Positive_NoVolumesOrNodes(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
@@ -325,12 +351,15 @@ func Test_GetAttachedVolumes_Positive_NoVolumesOrNodes(t *testing.T) {
}
}
// Populates data struct with one volume/node entry.
// Calls GetAttachedVolumes() to get list of entries.
// Verifies one volume/node entry is returned.
func Test_GetAttachedVolumes_Positive_OneVolumeOneNode(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
asw := NewActualStateOfWorld(volumePluginMgr)
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName)
if addErr != nil {
@@ -345,22 +374,25 @@ func Test_GetAttachedVolumes_Positive_OneVolumeOneNode(t *testing.T) {
t.Fatalf("len(attachedVolumes) Expected: <1> Actual: <%v>", len(attachedVolumes))
}
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, volumeName, nodeName, false /* expectedSafeToDetach */, false /* expectNonZeroDetachRequestedTime */)
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, string(volumeName), nodeName, true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */)
}
// Populates data struct with two volume/node entries (different node and volume).
// Calls GetAttachedVolumes() to get list of entries.
// Verifies both volume/node entries are returned.
func Test_GetAttachedVolumes_Positive_TwoVolumeTwoNodes(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
asw := NewActualStateOfWorld(volumePluginMgr)
volume1Name := "volume1-name"
volume1Spec := controllervolumetesting.GetTestVolumeSpec(volume1Name, volume1Name)
volume1Name := api.UniqueDeviceName("volume1-name")
volume1Spec := controllervolumetesting.GetTestVolumeSpec(string(volume1Name), volume1Name)
node1Name := "node1-name"
generatedVolumeName1, add1Err := asw.AddVolumeNode(volume1Spec, node1Name)
if add1Err != nil {
t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", add1Err)
}
volume2Name := "volume2-name"
volume2Spec := controllervolumetesting.GetTestVolumeSpec(volume2Name, volume2Name)
volume2Name := api.UniqueDeviceName("volume2-name")
volume2Spec := controllervolumetesting.GetTestVolumeSpec(string(volume2Name), volume2Name)
node2Name := "node2-name"
generatedVolumeName2, add2Err := asw.AddVolumeNode(volume2Spec, node2Name)
if add2Err != nil {
@@ -375,16 +407,19 @@ func Test_GetAttachedVolumes_Positive_TwoVolumeTwoNodes(t *testing.T) {
t.Fatalf("len(attachedVolumes) Expected: <2> Actual: <%v>", len(attachedVolumes))
}
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName1, volume1Name, node1Name, false /* expectedSafeToDetach */, false /* expectNonZeroDetachRequestedTime */)
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName2, volume2Name, node2Name, false /* expectedSafeToDetach */, false /* expectNonZeroDetachRequestedTime */)
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName1, string(volume1Name), node1Name, true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */)
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName2, string(volume2Name), node2Name, true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */)
}
// Populates data struct with two volume/node entries (same volume different node).
// Calls GetAttachedVolumes() to get list of entries.
// Verifies both volume/node entries are returned.
func Test_GetAttachedVolumes_Positive_OneVolumeTwoNodes(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
asw := NewActualStateOfWorld(volumePluginMgr)
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
node1Name := "node1-name"
generatedVolumeName1, add1Err := asw.AddVolumeNode(volumeSpec, node1Name)
if add1Err != nil {
@@ -411,16 +446,18 @@ func Test_GetAttachedVolumes_Positive_OneVolumeTwoNodes(t *testing.T) {
t.Fatalf("len(attachedVolumes) Expected: <2> Actual: <%v>", len(attachedVolumes))
}
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName1, volumeName, node1Name, false /* expectedSafeToDetach */, false /* expectNonZeroDetachRequestedTime */)
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName1, volumeName, node2Name, false /* expectedSafeToDetach */, false /* expectNonZeroDetachRequestedTime */)
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName1, string(volumeName), node1Name, true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */)
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName1, string(volumeName), node2Name, true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */)
}
func Test_MarkVolumeNodeSafeToDetach_Positive_NotMarked(t *testing.T) {
// Populates data struct with one volume/node entry.
// Verifies mountedByNode is true and DetachRequestedTime is zero.
func Test_SetVolumeMountedByNode_Positive_Set(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
asw := NewActualStateOfWorld(volumePluginMgr)
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName)
if addErr != nil {
@@ -435,15 +472,18 @@ func Test_MarkVolumeNodeSafeToDetach_Positive_NotMarked(t *testing.T) {
t.Fatalf("len(attachedVolumes) Expected: <1> Actual: <%v>", len(attachedVolumes))
}
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, volumeName, nodeName, false /* expectedSafeToDetach */, false /* expectNonZeroDetachRequestedTime */)
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, string(volumeName), nodeName, true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */)
}
func Test_MarkVolumeNodeSafeToDetach_Positive_Marked(t *testing.T) {
// Populates data struct with one volume/node entry.
// Calls SetVolumeMountedByNode twice, first setting mounted to true then false.
// Verifies mountedByNode is false.
func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSet(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
asw := NewActualStateOfWorld(volumePluginMgr)
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName)
if addErr != nil {
@@ -451,11 +491,15 @@ func Test_MarkVolumeNodeSafeToDetach_Positive_Marked(t *testing.T) {
}
// Act
markSafeToDetachErr := asw.MarkVolumeNodeSafeToDetach(generatedVolumeName, nodeName)
setVolumeMountedErr1 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */)
setVolumeMountedErr2 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */)
// Assert
if markSafeToDetachErr != nil {
t.Fatalf("MarkVolumeNodeSafeToDetach failed. Expected <no error> Actual: <%v>", markSafeToDetachErr)
if setVolumeMountedErr1 != nil {
t.Fatalf("SetVolumeMountedByNode1 failed. Expected <no error> Actual: <%v>", setVolumeMountedErr1)
}
if setVolumeMountedErr2 != nil {
t.Fatalf("SetVolumeMountedByNode2 failed. Expected <no error> Actual: <%v>", setVolumeMountedErr2)
}
attachedVolumes := asw.GetAttachedVolumes()
@@ -463,15 +507,18 @@ func Test_MarkVolumeNodeSafeToDetach_Positive_Marked(t *testing.T) {
t.Fatalf("len(attachedVolumes) Expected: <1> Actual: <%v>", len(attachedVolumes))
}
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, volumeName, nodeName, true /* expectedSafeToDetach */, false /* expectNonZeroDetachRequestedTime */)
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, string(volumeName), nodeName, false /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */)
}
func Test_MarkVolumeNodeSafeToDetach_Positive_MarkedAddVolumeNodeReset(t *testing.T) {
// Populates data struct with one volume/node entry.
// Calls SetVolumeMountedByNode once, setting mounted to false.
// Verifies mountedByNode is still true (since there was no SetVolumeMountedByNode to true call first)
func Test_SetVolumeMountedByNode_Positive_UnsetWithoutInitialSet(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
asw := NewActualStateOfWorld(volumePluginMgr)
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName)
if addErr != nil {
@@ -479,12 +526,48 @@ func Test_MarkVolumeNodeSafeToDetach_Positive_MarkedAddVolumeNodeReset(t *testin
}
// Act
markSafeToDetachErr := asw.MarkVolumeNodeSafeToDetach(generatedVolumeName, nodeName)
setVolumeMountedErr := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */)
// Assert
if setVolumeMountedErr != nil {
t.Fatalf("SetVolumeMountedByNode failed. Expected <no error> Actual: <%v>", setVolumeMountedErr)
}
attachedVolumes := asw.GetAttachedVolumes()
if len(attachedVolumes) != 1 {
t.Fatalf("len(attachedVolumes) Expected: <1> Actual: <%v>", len(attachedVolumes))
}
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, string(volumeName), nodeName, true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */)
}
// Populates data struct with one volume/node entry.
// Calls SetVolumeMountedByNode twice, first setting mounted to true then false.
// Calls AddVolumeNode to readd the same volume/node.
// Verifies mountedByNode is false and detachRequestedTime is zero.
func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSetAddVolumeNodeNotReset(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
asw := NewActualStateOfWorld(volumePluginMgr)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName)
if addErr != nil {
t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", addErr)
}
// Act
setVolumeMountedErr1 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */)
setVolumeMountedErr2 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */)
generatedVolumeName, addErr = asw.AddVolumeNode(volumeSpec, nodeName)
// Assert
if markSafeToDetachErr != nil {
t.Fatalf("MarkVolumeNodeSafeToDetach failed. Expected <no error> Actual: <%v>", markSafeToDetachErr)
if setVolumeMountedErr1 != nil {
t.Fatalf("SetVolumeMountedByNode1 failed. Expected <no error> Actual: <%v>", setVolumeMountedErr1)
}
if setVolumeMountedErr2 != nil {
t.Fatalf("SetVolumeMountedByNode2 failed. Expected <no error> Actual: <%v>", setVolumeMountedErr2)
}
if addErr != nil {
t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", addErr)
@@ -495,15 +578,19 @@ func Test_MarkVolumeNodeSafeToDetach_Positive_MarkedAddVolumeNodeReset(t *testin
t.Fatalf("len(attachedVolumes) Expected: <1> Actual: <%v>", len(attachedVolumes))
}
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, volumeName, nodeName, false /* expectedSafeToDetach */, false /* expectNonZeroDetachRequestedTime */)
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, string(volumeName), nodeName, false /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */)
}
func Test_MarkVolumeNodeSafeToDetach_Positive_MarkedVerifyDetachRequestedTimePerserved(t *testing.T) {
// Populates data struct with one volume/node entry.
// Calls MarkDesireToDetach() once on volume/node entry.
// Calls SetVolumeMountedByNode() twice, first setting mounted to true then false.
// Verifies mountedByNode is false and detachRequestedTime is NOT zero.
func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSetVerifyDetachRequestedTimePerserved(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
asw := NewActualStateOfWorld(volumePluginMgr)
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName)
if addErr != nil {
@@ -516,11 +603,15 @@ func Test_MarkVolumeNodeSafeToDetach_Positive_MarkedVerifyDetachRequestedTimePer
expectedDetachRequestedTime := asw.GetAttachedVolumes()[0].DetachRequestedTime
// Act
markSafeToDetachErr := asw.MarkVolumeNodeSafeToDetach(generatedVolumeName, nodeName)
setVolumeMountedErr1 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */)
setVolumeMountedErr2 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */)
// Assert
if markSafeToDetachErr != nil {
t.Fatalf("MarkVolumeNodeSafeToDetach failed. Expected <no error> Actual: <%v>", markSafeToDetachErr)
if setVolumeMountedErr1 != nil {
t.Fatalf("SetVolumeMountedByNode1 failed. Expected <no error> Actual: <%v>", setVolumeMountedErr1)
}
if setVolumeMountedErr2 != nil {
t.Fatalf("SetVolumeMountedByNode2 failed. Expected <no error> Actual: <%v>", setVolumeMountedErr2)
}
attachedVolumes := asw.GetAttachedVolumes()
@@ -528,18 +619,20 @@ func Test_MarkVolumeNodeSafeToDetach_Positive_MarkedVerifyDetachRequestedTimePer
t.Fatalf("len(attachedVolumes) Expected: <1> Actual: <%v>", len(attachedVolumes))
}
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, volumeName, nodeName, true /* expectedSafeToDetach */, true /* expectNonZeroDetachRequestedTime */)
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, string(volumeName), nodeName, false /* expectedMountedByNode */, true /* expectNonZeroDetachRequestedTime */)
if !expectedDetachRequestedTime.Equal(attachedVolumes[0].DetachRequestedTime) {
t.Fatalf("DetachRequestedTime changed. Expected: <%v> Actual: <%v>", expectedDetachRequestedTime, attachedVolumes[0].DetachRequestedTime)
}
}
func Test_MarkDesireToDetach_Positive_NotMarked(t *testing.T) {
// Populates data struct with one volume/node entry.
// Verifies mountedByNode is true and detachRequestedTime is zero (default values).
func Test_MarkDesireToDetach_Positive_Set(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
asw := NewActualStateOfWorld(volumePluginMgr)
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName)
if addErr != nil {
@@ -554,15 +647,18 @@ func Test_MarkDesireToDetach_Positive_NotMarked(t *testing.T) {
t.Fatalf("len(attachedVolumes) Expected: <1> Actual: <%v>", len(attachedVolumes))
}
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, volumeName, nodeName, false /* expectedSafeToDetach */, false /* expectNonZeroDetachRequestedTime */)
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, string(volumeName), nodeName, true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */)
}
// Populates data struct with one volume/node entry.
// Calls MarkDesireToDetach() once on volume/node entry.
// Verifies mountedByNode is true and detachRequestedTime is NOT zero.
func Test_MarkDesireToDetach_Positive_Marked(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
asw := NewActualStateOfWorld(volumePluginMgr)
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName)
if addErr != nil {
@@ -583,15 +679,19 @@ func Test_MarkDesireToDetach_Positive_Marked(t *testing.T) {
t.Fatalf("len(attachedVolumes) Expected: <1> Actual: <%v>", len(attachedVolumes))
}
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, volumeName, nodeName, false /* expectedSafeToDetach */, true /* expectNonZeroDetachRequestedTime */)
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, string(volumeName), nodeName, true /* expectedMountedByNode */, true /* expectNonZeroDetachRequestedTime */)
}
// Populates data struct with one volume/node entry.
// Calls MarkDesireToDetach() once on volume/node entry.
// Calls AddVolumeNode() to re-add the same volume/node entry.
// Verifies mountedByNode is true and detachRequestedTime is reset to zero.
func Test_MarkDesireToDetach_Positive_MarkedAddVolumeNodeReset(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
asw := NewActualStateOfWorld(volumePluginMgr)
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName)
if addErr != nil {
@@ -616,23 +716,31 @@ func Test_MarkDesireToDetach_Positive_MarkedAddVolumeNodeReset(t *testing.T) {
t.Fatalf("len(attachedVolumes) Expected: <1> Actual: <%v>", len(attachedVolumes))
}
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, volumeName, nodeName, false /* expectedSafeToDetach */, false /* expectNonZeroDetachRequestedTime */)
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, string(volumeName), nodeName, true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */)
}
func Test_MarkDesireToDetach_Positive_MarkedVerifySafeToDetachPreserved(t *testing.T) {
// Populates data struct with one volume/node entry.
// Calls SetVolumeMountedByNode() twice, first setting mounted to true then false.
// Calls MarkDesireToDetach() once on volume/node entry.
// Verifies mountedByNode is false and detachRequestedTime is NOT zero.
func Test_MarkDesireToDetach_Positive_UnsetWithInitialSetVolumeMountedByNodePreserved(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
asw := NewActualStateOfWorld(volumePluginMgr)
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName)
if addErr != nil {
t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", addErr)
}
markSafeToDetachErr := asw.MarkVolumeNodeSafeToDetach(generatedVolumeName, nodeName)
if markSafeToDetachErr != nil {
t.Fatalf("MarkVolumeNodeSafeToDetach failed. Expected <no error> Actual: <%v>", markSafeToDetachErr)
setVolumeMountedErr1 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */)
setVolumeMountedErr2 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */)
if setVolumeMountedErr1 != nil {
t.Fatalf("SetVolumeMountedByNode1 failed. Expected <no error> Actual: <%v>", setVolumeMountedErr1)
}
if setVolumeMountedErr2 != nil {
t.Fatalf("SetVolumeMountedByNode2 failed. Expected <no error> Actual: <%v>", setVolumeMountedErr2)
}
// Act
@@ -649,34 +757,135 @@ func Test_MarkDesireToDetach_Positive_MarkedVerifySafeToDetachPreserved(t *testi
t.Fatalf("len(attachedVolumes) Expected: <1> Actual: <%v>", len(attachedVolumes))
}
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, volumeName, nodeName, true /* expectedSafeToDetach */, true /* expectNonZeroDetachRequestedTime */)
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, string(volumeName), nodeName, false /* expectedMountedByNode */, true /* expectNonZeroDetachRequestedTime */)
}
func verifyAttachedVolume(
t *testing.T,
attachedVolumes []AttachedVolume,
expectedVolumeName,
expectedVolumeSpecName,
expectedVolumeName api.UniqueDeviceName,
expectedVolumeSpecName string,
expectedNodeName string,
expectedSafeToDetach,
expectedMountedByNode,
expectNonZeroDetachRequestedTime bool) {
for _, attachedVolume := range attachedVolumes {
if attachedVolume.VolumeName == expectedVolumeName &&
attachedVolume.VolumeSpec.Name() == expectedVolumeSpecName &&
attachedVolume.NodeName == expectedNodeName &&
attachedVolume.SafeToDetach == expectedSafeToDetach &&
attachedVolume.MountedByNode == expectedMountedByNode &&
attachedVolume.DetachRequestedTime.IsZero() == !expectNonZeroDetachRequestedTime {
return
}
}
t.Fatalf(
"attachedVolumes (%v) should contain the volume/node combo %q/%q with SafeToDetach=%v and NonZeroDetachRequestedTime=%v. It does not.",
"attachedVolumes (%v) should contain the volume/node combo %q/%q with MountedByNode=%v and NonZeroDetachRequestedTime=%v. It does not.",
attachedVolumes,
expectedVolumeName,
expectedNodeName,
expectedSafeToDetach,
expectedMountedByNode,
expectNonZeroDetachRequestedTime)
}
// t.Logf("attachedVolumes: %v", asw.GetAttachedVolumes()) // TEMP
func Test_GetAttachedVolumesForNode_Positive_NoVolumesOrNodes(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
asw := NewActualStateOfWorld(volumePluginMgr)
node := "random"
// Act
attachedVolumes := asw.GetAttachedVolumesForNode(node)
// Assert
if len(attachedVolumes) != 0 {
t.Fatalf("len(attachedVolumes) Expected: <0> Actual: <%v>", len(attachedVolumes))
}
}
func Test_GetAttachedVolumesForNode_Positive_OneVolumeOneNode(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
asw := NewActualStateOfWorld(volumePluginMgr)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName)
if addErr != nil {
t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", addErr)
}
// Act
attachedVolumes := asw.GetAttachedVolumesForNode(nodeName)
// Assert
if len(attachedVolumes) != 1 {
t.Fatalf("len(attachedVolumes) Expected: <1> Actual: <%v>", len(attachedVolumes))
}
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, string(volumeName), nodeName, true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */)
}
func Test_GetAttachedVolumesForNode_Positive_TwoVolumeTwoNodes(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
asw := NewActualStateOfWorld(volumePluginMgr)
volume1Name := api.UniqueDeviceName("volume1-name")
volume1Spec := controllervolumetesting.GetTestVolumeSpec(string(volume1Name), volume1Name)
node1Name := "node1-name"
_, add1Err := asw.AddVolumeNode(volume1Spec, node1Name)
if add1Err != nil {
t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", add1Err)
}
volume2Name := api.UniqueDeviceName("volume2-name")
volume2Spec := controllervolumetesting.GetTestVolumeSpec(string(volume2Name), volume2Name)
node2Name := "node2-name"
generatedVolumeName2, add2Err := asw.AddVolumeNode(volume2Spec, node2Name)
if add2Err != nil {
t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", add2Err)
}
// Act
attachedVolumes := asw.GetAttachedVolumesForNode(node2Name)
// Assert
if len(attachedVolumes) != 1 {
t.Fatalf("len(attachedVolumes) Expected: <1> Actual: <%v>", len(attachedVolumes))
}
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName2, string(volume2Name), node2Name, true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */)
}
func Test_GetAttachedVolumesForNode_Positive_OneVolumeTwoNodes(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
asw := NewActualStateOfWorld(volumePluginMgr)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
node1Name := "node1-name"
generatedVolumeName1, add1Err := asw.AddVolumeNode(volumeSpec, node1Name)
if add1Err != nil {
t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", add1Err)
}
node2Name := "node2-name"
generatedVolumeName2, add2Err := asw.AddVolumeNode(volumeSpec, node2Name)
if add2Err != nil {
t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", add2Err)
}
if generatedVolumeName1 != generatedVolumeName2 {
t.Fatalf(
"Generated volume names for the same volume should be the same but they are not: %q and %q",
generatedVolumeName1,
generatedVolumeName2)
}
// Act
attachedVolumes := asw.GetAttachedVolumesForNode(node1Name)
// Assert
if len(attachedVolumes) != 1 {
t.Fatalf("len(attachedVolumes) Expected: <1> Actual: <%v>", len(attachedVolumes))
}
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName1, string(volumeName), node1Name, true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */)
}

View File

@@ -25,7 +25,9 @@ import (
"fmt"
"sync"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/attachdetach"
)
// DesiredStateOfWorld defines a set of thread-safe operations supported on
@@ -50,7 +52,7 @@ type DesiredStateOfWorld interface {
// should be attached to the specified node, the volume is implicitly added.
// If no node with the name nodeName exists in list of nodes managed by the
// attach/detach attached controller, an error is returned.
AddPod(podName string, volumeSpec *volume.Spec, nodeName string) (string, error)
AddPod(podName string, volumeSpec *volume.Spec, nodeName string) (api.UniqueDeviceName, error)
// DeleteNode removes the given node from the list of nodes managed by the
// attach/detach controller.
@@ -68,7 +70,7 @@ type DesiredStateOfWorld interface {
// volumes under the specified node, this is a no-op.
// If after deleting the pod, the specified volume contains no other child
// pods, the volume is also deleted.
DeletePod(podName, volumeName, nodeName string)
DeletePod(podName string, volumeName api.UniqueDeviceName, nodeName string)
// NodeExists returns true if the node with the specified name exists in
// the list of nodes managed by the attach/detach controller.
@@ -77,7 +79,7 @@ type DesiredStateOfWorld interface {
// VolumeExists returns true if the volume with the specified name exists
// in the list of volumes that should be attached to the specified node by
// the attach detach controller.
VolumeExists(volumeName, nodeName string) bool
VolumeExists(volumeName api.UniqueDeviceName, nodeName string) bool
// GetVolumesToAttach generates and returns a list of volumes to attach
// and the nodes they should be attached to based on the current desired
@@ -89,7 +91,7 @@ type DesiredStateOfWorld interface {
type VolumeToAttach struct {
// VolumeName is the unique identifier for the volume that should be
// attached.
VolumeName string
VolumeName api.UniqueDeviceName
// VolumeSpec is a volume spec containing the specification for the volume
// that should be attached.
@@ -128,13 +130,13 @@ type nodeManaged struct {
// volumesToAttach is a map containing the set of volumes that should be
// attached to this node. The key in the map is the name of the volume and
// the value is a pod object containing more information about the volume.
volumesToAttach map[string]volumeToAttach
volumesToAttach map[api.UniqueDeviceName]volumeToAttach
}
// The volume object represents a volume that should be attached to a node.
type volumeToAttach struct {
// volumeName contains the unique identifier for this volume.
volumeName string
volumeName api.UniqueDeviceName
// spec is the volume spec containing the specification for this volume.
// Used to generate the volume plugin object, and passed to attach/detach
@@ -162,12 +164,15 @@ func (dsw *desiredStateOfWorld) AddNode(nodeName string) {
if _, nodeExists := dsw.nodesManaged[nodeName]; !nodeExists {
dsw.nodesManaged[nodeName] = nodeManaged{
nodeName: nodeName,
volumesToAttach: make(map[string]volumeToAttach),
volumesToAttach: make(map[api.UniqueDeviceName]volumeToAttach),
}
}
}
func (dsw *desiredStateOfWorld) AddPod(podName string, volumeSpec *volume.Spec, nodeName string) (string, error) {
func (dsw *desiredStateOfWorld) AddPod(
podName string,
volumeSpec *volume.Spec,
nodeName string) (api.UniqueDeviceName, error) {
dsw.Lock()
defer dsw.Unlock()
@@ -186,10 +191,11 @@ func (dsw *desiredStateOfWorld) AddPod(podName string, volumeSpec *volume.Spec,
err)
}
volumeName, err := attachableVolumePlugin.GetUniqueVolumeName(volumeSpec)
volumeName, err := attachdetach.GetUniqueDeviceNameFromSpec(
attachableVolumePlugin, volumeSpec)
if err != nil {
return "", fmt.Errorf(
"failed to GetUniqueVolumeName from AttachablePlugin for volumeSpec %q err=%v",
"failed to GenerateUniqueDeviceName for volumeSpec %q err=%v",
volumeSpec.Name(),
err)
}
@@ -236,7 +242,10 @@ func (dsw *desiredStateOfWorld) DeleteNode(nodeName string) error {
return nil
}
func (dsw *desiredStateOfWorld) DeletePod(podName, volumeName, nodeName string) {
func (dsw *desiredStateOfWorld) DeletePod(
podName string,
volumeName api.UniqueDeviceName,
nodeName string) {
dsw.Lock()
defer dsw.Unlock()
@@ -272,7 +281,8 @@ func (dsw *desiredStateOfWorld) NodeExists(nodeName string) bool {
return nodeExists
}
func (dsw *desiredStateOfWorld) VolumeExists(volumeName, nodeName string) bool {
func (dsw *desiredStateOfWorld) VolumeExists(
volumeName api.UniqueDeviceName, nodeName string) bool {
dsw.RLock()
defer dsw.RUnlock()

View File

@@ -19,9 +19,12 @@ package cache
import (
"testing"
"k8s.io/kubernetes/pkg/api"
controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/testing"
)
// Calls AddNode() once.
// Verifies node exists, and zero volumes to attach.
func Test_AddNode_Positive_NewNode(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
@@ -43,27 +46,10 @@ func Test_AddNode_Positive_NewNode(t *testing.T) {
}
}
func Test_AddNode_Positive_ExistingVolume(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
dsw := NewDesiredStateOfWorld(volumePluginMgr)
nodeName := "node-name"
dsw.AddNode(nodeName)
// Act
dsw.AddNode(nodeName)
// Assert
nodeExists := dsw.NodeExists(nodeName)
if !nodeExists {
t.Fatalf("Added node %q does not exist, it should.", nodeName)
}
volumesToAttach := dsw.GetVolumesToAttach()
if len(volumesToAttach) != 0 {
t.Fatalf("len(volumesToAttach) Expected: <0> Actual: <%v>", len(volumesToAttach))
}
}
// Calls AddNode() once.
// Verifies node exists.
// Calls AddNode() again with the same node.
// Verifies node exists, and zero volumes to attach.
func Test_AddNode_Positive_ExistingNode(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
@@ -94,13 +80,16 @@ func Test_AddNode_Positive_ExistingNode(t *testing.T) {
}
}
// Populates data struct with a single node no volume.
// Calls AddPod() with the same node and new pod/volume.
// Verifies node/volume exists, and 1 volumes to attach.
func Test_AddPod_Positive_NewPodNodeExistsVolumeDoesntExist(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
dsw := NewDesiredStateOfWorld(volumePluginMgr)
podName := "pod-name"
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
dsw.AddNode(nodeName)
volumeExists := dsw.VolumeExists(volumeName, nodeName)
@@ -133,17 +122,22 @@ func Test_AddPod_Positive_NewPodNodeExistsVolumeDoesntExist(t *testing.T) {
t.Fatalf("len(volumesToAttach) Expected: <1> Actual: <%v>", len(volumesToAttach))
}
verifyVolumeToAttach(t, volumesToAttach, nodeName, generatedVolumeName, volumeName)
verifyVolumeToAttach(t, volumesToAttach, nodeName, generatedVolumeName, string(volumeName))
}
// Populates data struct with a single node no volume.
// Calls AddPod() with the same node and new pod/volume.
// Verifies node/volume exists.
// Calls AddPod() with the same node and volume different pod.
// Verifies the same node/volume exists, and 1 volumes to attach.
func Test_AddPod_Positive_NewPodNodeExistsVolumeExists(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
dsw := NewDesiredStateOfWorld(volumePluginMgr)
pod1Name := "pod1-name"
pod2Name := "pod2-name"
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
dsw.AddNode(nodeName)
volumeExists := dsw.VolumeExists(volumeName, nodeName)
@@ -198,16 +192,21 @@ func Test_AddPod_Positive_NewPodNodeExistsVolumeExists(t *testing.T) {
t.Fatalf("len(volumesToAttach) Expected: <1> Actual: <%v>", len(volumesToAttach))
}
verifyVolumeToAttach(t, volumesToAttach, nodeName, generatedVolumeName, volumeName)
verifyVolumeToAttach(t, volumesToAttach, nodeName, generatedVolumeName, string(volumeName))
}
// Populates data struct with a single node no volume.
// Calls AddPod() with the same node and new pod/volume.
// Verifies node/volume exists.
// Calls AddPod() with the same node, volume, and pod.
// Verifies the same node/volume exists, and 1 volumes to attach.
func Test_AddPod_Positive_PodExistsNodeExistsVolumeExists(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
dsw := NewDesiredStateOfWorld(volumePluginMgr)
podName := "pod-name"
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
dsw.AddNode(nodeName)
volumeExists := dsw.VolumeExists(volumeName, nodeName)
@@ -262,16 +261,18 @@ func Test_AddPod_Positive_PodExistsNodeExistsVolumeExists(t *testing.T) {
t.Fatalf("len(volumesToAttach) Expected: <1> Actual: <%v>", len(volumesToAttach))
}
verifyVolumeToAttach(t, volumesToAttach, nodeName, generatedVolumeName, volumeName)
verifyVolumeToAttach(t, volumesToAttach, nodeName, generatedVolumeName, string(volumeName))
}
// Calls AddPod() with new pod/volume/node on empty data struct.
// Verifies call fails because node does not exist.
func Test_AddPod_Negative_NewPodNodeDoesntExistVolumeDoesntExist(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
dsw := NewDesiredStateOfWorld(volumePluginMgr)
podName := "pod-name"
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
volumeExists := dsw.VolumeExists(volumeName, nodeName)
if volumeExists {
@@ -303,6 +304,9 @@ func Test_AddPod_Negative_NewPodNodeDoesntExistVolumeDoesntExist(t *testing.T) {
}
}
// Populates data struct with a single node.
// Calls DeleteNode() to delete the node.
// Verifies node no longer exists, and zero volumes to attach.
func Test_DeleteNode_Positive_NodeExists(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
@@ -329,6 +333,8 @@ func Test_DeleteNode_Positive_NodeExists(t *testing.T) {
}
}
// Calls DeleteNode() to delete node on empty data struct.
// Verifies no error is returned, and zero volumes to attach.
func Test_DeleteNode_Positive_NodeDoesntExist(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
@@ -354,6 +360,9 @@ func Test_DeleteNode_Positive_NodeDoesntExist(t *testing.T) {
}
}
// Populates data struct with new pod/volume/node.
// Calls DeleteNode() to delete the node.
// Verifies call fails because node still contains child volumes.
func Test_DeleteNode_Negative_NodeExistsHasChildVolumes(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
@@ -361,8 +370,8 @@ func Test_DeleteNode_Negative_NodeExistsHasChildVolumes(t *testing.T) {
nodeName := "node-name"
dsw.AddNode(nodeName)
podName := "pod-name"
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
generatedVolumeName, podAddErr := dsw.AddPod(podName, volumeSpec, nodeName)
if podAddErr != nil {
t.Fatalf(
@@ -376,7 +385,7 @@ func Test_DeleteNode_Negative_NodeExistsHasChildVolumes(t *testing.T) {
// Assert
if err == nil {
t.Fatalf("DeleteNode did not fail. Expected: <\"\"> Actual: <no error>")
t.Fatalf("DeleteNode did not fail. Expected: <\"failed to delete node...the node still contains volumes in its list of volumes to attach\"> Actual: <no error>")
}
nodeExists := dsw.NodeExists(nodeName)
@@ -389,16 +398,19 @@ func Test_DeleteNode_Negative_NodeExistsHasChildVolumes(t *testing.T) {
t.Fatalf("len(volumesToAttach) Expected: <1> Actual: <%v>", len(volumesToAttach))
}
verifyVolumeToAttach(t, volumesToAttach, nodeName, generatedVolumeName, volumeName)
verifyVolumeToAttach(t, volumesToAttach, nodeName, generatedVolumeName, string(volumeName))
}
// Populates data struct with new pod/volume/node.
// Calls DeleteNode() to delete the pod/volume/node.
// Verifies volume no longer exists, and zero volumes to attach.
func Test_DeletePod_Positive_PodExistsNodeExistsVolumeExists(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
dsw := NewDesiredStateOfWorld(volumePluginMgr)
podName := "pod-name"
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
dsw.AddNode(nodeName)
generatedVolumeName, podAddErr := dsw.AddPod(podName, volumeSpec, nodeName)
@@ -436,14 +448,17 @@ func Test_DeletePod_Positive_PodExistsNodeExistsVolumeExists(t *testing.T) {
}
}
// Populates data struct with pod1/volume/node and pod2/volume/node.
// Calls DeleteNode() to delete the pod1/volume/node.
// Verifies volume still exists, and one volumes to attach.
func Test_DeletePod_Positive_2PodsExistNodeExistsVolumesExist(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
dsw := NewDesiredStateOfWorld(volumePluginMgr)
pod1Name := "pod1-name"
pod2Name := "pod2-name"
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
dsw.AddNode(nodeName)
generatedVolumeName1, pod1AddErr := dsw.AddPod(pod1Name, volumeSpec, nodeName)
@@ -491,17 +506,20 @@ func Test_DeletePod_Positive_2PodsExistNodeExistsVolumesExist(t *testing.T) {
t.Fatalf("len(volumesToAttach) Expected: <1> Actual: <%v>", len(volumesToAttach))
}
verifyVolumeToAttach(t, volumesToAttach, nodeName, generatedVolumeName1, volumeName)
verifyVolumeToAttach(t, volumesToAttach, nodeName, generatedVolumeName1, string(volumeName))
}
// Populates data struct with pod1/volume/node.
// Calls DeleteNode() to delete the pod2/volume/node.
// Verifies volume still exists, and one volumes to attach.
func Test_DeletePod_Positive_PodDoesNotExist(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
dsw := NewDesiredStateOfWorld(volumePluginMgr)
pod1Name := "pod1-name"
pod2Name := "pod2-name"
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
dsw.AddNode(nodeName)
generatedVolumeName, pod1AddErr := dsw.AddPod(pod1Name, volumeSpec, nodeName)
@@ -537,16 +555,19 @@ func Test_DeletePod_Positive_PodDoesNotExist(t *testing.T) {
t.Fatalf("len(volumesToAttach) Expected: <1> Actual: <%v>", len(volumesToAttach))
}
verifyVolumeToAttach(t, volumesToAttach, nodeName, generatedVolumeName, volumeName)
verifyVolumeToAttach(t, volumesToAttach, nodeName, generatedVolumeName, string(volumeName))
}
// Populates data struct with pod/volume/node1.
// Calls DeleteNode() to delete the pod/volume/node2.
// Verifies volume still exists, and one volumes to attach.
func Test_DeletePod_Positive_NodeDoesNotExist(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
dsw := NewDesiredStateOfWorld(volumePluginMgr)
podName := "pod-name"
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
node1Name := "node1-name"
dsw.AddNode(node1Name)
generatedVolumeName, podAddErr := dsw.AddPod(podName, volumeSpec, node1Name)
@@ -589,16 +610,19 @@ func Test_DeletePod_Positive_NodeDoesNotExist(t *testing.T) {
t.Fatalf("len(volumesToAttach) Expected: <1> Actual: <%v>", len(volumesToAttach))
}
verifyVolumeToAttach(t, volumesToAttach, node1Name, generatedVolumeName, volumeName)
verifyVolumeToAttach(t, volumesToAttach, node1Name, generatedVolumeName, string(volumeName))
}
// Populates data struct with pod/volume1/node.
// Calls DeleteNode() to delete the pod/volume2/node.
// Verifies volume still exists, and one volumes to attach.
func Test_DeletePod_Positive_VolumeDoesNotExist(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
dsw := NewDesiredStateOfWorld(volumePluginMgr)
podName := "pod-name"
volume1Name := "volume1-name"
volume1Spec := controllervolumetesting.GetTestVolumeSpec(volume1Name, volume1Name)
volume1Name := api.UniqueDeviceName("volume1-name")
volume1Spec := controllervolumetesting.GetTestVolumeSpec(string(volume1Name), volume1Name)
nodeName := "node-name"
dsw.AddNode(nodeName)
generatedVolume1Name, podAddErr := dsw.AddPod(podName, volume1Spec, nodeName)
@@ -616,7 +640,7 @@ func Test_DeletePod_Positive_VolumeDoesNotExist(t *testing.T) {
generatedVolume1Name,
nodeName)
}
volume2Name := "volume2-name"
volume2Name := api.UniqueDeviceName("volume2-name")
// Act
dsw.DeletePod(podName, volume2Name, nodeName)
@@ -641,9 +665,11 @@ func Test_DeletePod_Positive_VolumeDoesNotExist(t *testing.T) {
t.Fatalf("len(volumesToAttach) Expected: <1> Actual: <%v>", len(volumesToAttach))
}
verifyVolumeToAttach(t, volumesToAttach, nodeName, generatedVolume1Name, volume1Name)
verifyVolumeToAttach(t, volumesToAttach, nodeName, generatedVolume1Name, string(volume1Name))
}
// Calls NodeExists() on random node.
// Verifies node does not exist, and no volumes to attach.
func Test_NodeExists_Positive_NodeExists(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
@@ -664,6 +690,9 @@ func Test_NodeExists_Positive_NodeExists(t *testing.T) {
}
}
// Populates data struct with a single node.
// Calls NodeExists() on that node.
// Verifies node exists, and no volumes to attach.
func Test_NodeExists_Positive_NodeDoesntExist(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
@@ -685,6 +714,9 @@ func Test_NodeExists_Positive_NodeDoesntExist(t *testing.T) {
}
}
// Populates data struct with new pod/volume/node.
// Calls VolumeExists() on that volume/node.
// Verifies volume/node exists, and one volume to attach.
func Test_VolumeExists_Positive_VolumeExistsNodeExists(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
@@ -692,8 +724,8 @@ func Test_VolumeExists_Positive_VolumeExistsNodeExists(t *testing.T) {
nodeName := "node-name"
dsw.AddNode(nodeName)
podName := "pod-name"
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
generatedVolumeName, _ := dsw.AddPod(podName, volumeSpec, nodeName)
// Act
@@ -709,9 +741,12 @@ func Test_VolumeExists_Positive_VolumeExistsNodeExists(t *testing.T) {
t.Fatalf("len(volumesToAttach) Expected: <1> Actual: <%v>", len(volumesToAttach))
}
verifyVolumeToAttach(t, volumesToAttach, nodeName, generatedVolumeName, volumeName)
verifyVolumeToAttach(t, volumesToAttach, nodeName, generatedVolumeName, string(volumeName))
}
// Populates data struct with new pod/volume1/node.
// Calls VolumeExists() on that volume2/node.
// Verifies volume2/node does not exist, and one volume to attach.
func Test_VolumeExists_Positive_VolumeDoesntExistNodeExists(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
@@ -719,8 +754,8 @@ func Test_VolumeExists_Positive_VolumeDoesntExistNodeExists(t *testing.T) {
nodeName := "node-name"
dsw.AddNode(nodeName)
podName := "pod-name"
volume1Name := "volume1-name"
volume1Spec := controllervolumetesting.GetTestVolumeSpec(volume1Name, volume1Name)
volume1Name := api.UniqueDeviceName("volume1-name")
volume1Spec := controllervolumetesting.GetTestVolumeSpec(string(volume1Name), volume1Name)
generatedVolume1Name, podAddErr := dsw.AddPod(podName, volume1Spec, nodeName)
if podAddErr != nil {
t.Fatalf(
@@ -728,7 +763,7 @@ func Test_VolumeExists_Positive_VolumeDoesntExistNodeExists(t *testing.T) {
podName,
podAddErr)
}
volume2Name := "volume2-name"
volume2Name := api.UniqueDeviceName("volume2-name")
// Act
volumeExists := dsw.VolumeExists(volume2Name, nodeName)
@@ -743,15 +778,17 @@ func Test_VolumeExists_Positive_VolumeDoesntExistNodeExists(t *testing.T) {
t.Fatalf("len(volumesToAttach) Expected: <1> Actual: <%v>", len(volumesToAttach))
}
verifyVolumeToAttach(t, volumesToAttach, nodeName, generatedVolume1Name, volume1Name)
verifyVolumeToAttach(t, volumesToAttach, nodeName, generatedVolume1Name, string(volume1Name))
}
// Calls VolumeExists() on some volume/node.
// Verifies volume/node do not exist, and zero volumes to attach.
func Test_VolumeExists_Positive_VolumeDoesntExistNodeDoesntExists(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
dsw := NewDesiredStateOfWorld(volumePluginMgr)
nodeName := "node-name"
volumeName := "volume-name"
volumeName := api.UniqueDeviceName("volume-name")
// Act
volumeExists := dsw.VolumeExists(volumeName, nodeName)
@@ -767,6 +804,8 @@ func Test_VolumeExists_Positive_VolumeDoesntExistNodeDoesntExists(t *testing.T)
}
}
// Calls GetVolumesToAttach()
// Verifies zero volumes to attach.
func Test_GetVolumesToAttach_Positive_NoNodes(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
@@ -781,6 +820,9 @@ func Test_GetVolumesToAttach_Positive_NoNodes(t *testing.T) {
}
}
// Populates data struct with two nodes.
// Calls GetVolumesToAttach()
// Verifies zero volumes to attach.
func Test_GetVolumesToAttach_Positive_TwoNodes(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
@@ -799,14 +841,17 @@ func Test_GetVolumesToAttach_Positive_TwoNodes(t *testing.T) {
}
}
// Populates data struct with two nodes with one volume/pod each.
// Calls GetVolumesToAttach()
// Verifies two volumes to attach.
func Test_GetVolumesToAttach_Positive_TwoNodesOneVolumeEach(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
dsw := NewDesiredStateOfWorld(volumePluginMgr)
node1Name := "node1-name"
pod1Name := "pod1-name"
volume1Name := "volume1-name"
volume1Spec := controllervolumetesting.GetTestVolumeSpec(volume1Name, volume1Name)
volume1Name := api.UniqueDeviceName("volume1-name")
volume1Spec := controllervolumetesting.GetTestVolumeSpec(string(volume1Name), volume1Name)
dsw.AddNode(node1Name)
generatedVolume1Name, podAddErr := dsw.AddPod(pod1Name, volume1Spec, node1Name)
if podAddErr != nil {
@@ -817,8 +862,8 @@ func Test_GetVolumesToAttach_Positive_TwoNodesOneVolumeEach(t *testing.T) {
}
node2Name := "node2-name"
pod2Name := "pod2-name"
volume2Name := "volume2-name"
volume2Spec := controllervolumetesting.GetTestVolumeSpec(volume2Name, volume2Name)
volume2Name := api.UniqueDeviceName("volume2-name")
volume2Spec := controllervolumetesting.GetTestVolumeSpec(string(volume2Name), volume2Name)
dsw.AddNode(node2Name)
generatedVolume2Name, podAddErr := dsw.AddPod(pod2Name, volume2Spec, node2Name)
if podAddErr != nil {
@@ -836,18 +881,22 @@ func Test_GetVolumesToAttach_Positive_TwoNodesOneVolumeEach(t *testing.T) {
t.Fatalf("len(volumesToAttach) Expected: <2> Actual: <%v>", len(volumesToAttach))
}
verifyVolumeToAttach(t, volumesToAttach, node1Name, generatedVolume1Name, volume1Name)
verifyVolumeToAttach(t, volumesToAttach, node2Name, generatedVolume2Name, volume2Name)
verifyVolumeToAttach(t, volumesToAttach, node1Name, generatedVolume1Name, string(volume1Name))
verifyVolumeToAttach(t, volumesToAttach, node2Name, generatedVolume2Name, string(volume2Name))
}
// Populates data struct with two nodes with one volume/pod each and an extra
// pod for the second node/volume pair.
// Calls GetVolumesToAttach()
// Verifies two volumes to attach.
func Test_GetVolumesToAttach_Positive_TwoNodesOneVolumeEachExtraPod(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
dsw := NewDesiredStateOfWorld(volumePluginMgr)
node1Name := "node1-name"
pod1Name := "pod1-name"
volume1Name := "volume1-name"
volume1Spec := controllervolumetesting.GetTestVolumeSpec(volume1Name, volume1Name)
volume1Name := api.UniqueDeviceName("volume1-name")
volume1Spec := controllervolumetesting.GetTestVolumeSpec(string(volume1Name), volume1Name)
dsw.AddNode(node1Name)
generatedVolume1Name, podAddErr := dsw.AddPod(pod1Name, volume1Spec, node1Name)
if podAddErr != nil {
@@ -858,8 +907,8 @@ func Test_GetVolumesToAttach_Positive_TwoNodesOneVolumeEachExtraPod(t *testing.T
}
node2Name := "node2-name"
pod2Name := "pod2-name"
volume2Name := "volume2-name"
volume2Spec := controllervolumetesting.GetTestVolumeSpec(volume2Name, volume2Name)
volume2Name := api.UniqueDeviceName("volume2-name")
volume2Spec := controllervolumetesting.GetTestVolumeSpec(string(volume2Name), volume2Name)
dsw.AddNode(node2Name)
generatedVolume2Name, podAddErr := dsw.AddPod(pod2Name, volume2Spec, node2Name)
if podAddErr != nil {
@@ -886,18 +935,22 @@ func Test_GetVolumesToAttach_Positive_TwoNodesOneVolumeEachExtraPod(t *testing.T
t.Fatalf("len(volumesToAttach) Expected: <2> Actual: <%v>", len(volumesToAttach))
}
verifyVolumeToAttach(t, volumesToAttach, node1Name, generatedVolume1Name, volume1Name)
verifyVolumeToAttach(t, volumesToAttach, node2Name, generatedVolume2Name, volume2Name)
verifyVolumeToAttach(t, volumesToAttach, node1Name, generatedVolume1Name, string(volume1Name))
verifyVolumeToAttach(t, volumesToAttach, node2Name, generatedVolume2Name, string(volume2Name))
}
// Populates data struct with two nodes with one volume/pod on one node and two
// volume/pod pairs on the other node.
// Calls GetVolumesToAttach()
// Verifies three volumes to attach.
func Test_GetVolumesToAttach_Positive_TwoNodesThreeVolumes(t *testing.T) {
// Arrange
volumePluginMgr, _ := controllervolumetesting.GetTestVolumePluginMgr((t))
dsw := NewDesiredStateOfWorld(volumePluginMgr)
node1Name := "node1-name"
pod1Name := "pod1-name"
volume1Name := "volume1-name"
volume1Spec := controllervolumetesting.GetTestVolumeSpec(volume1Name, volume1Name)
volume1Name := api.UniqueDeviceName("volume1-name")
volume1Spec := controllervolumetesting.GetTestVolumeSpec(string(volume1Name), volume1Name)
dsw.AddNode(node1Name)
generatedVolume1Name, podAddErr := dsw.AddPod(pod1Name, volume1Spec, node1Name)
if podAddErr != nil {
@@ -908,8 +961,8 @@ func Test_GetVolumesToAttach_Positive_TwoNodesThreeVolumes(t *testing.T) {
}
node2Name := "node2-name"
pod2aName := "pod2a-name"
volume2Name := "volume2-name"
volume2Spec := controllervolumetesting.GetTestVolumeSpec(volume2Name, volume2Name)
volume2Name := api.UniqueDeviceName("volume2-name")
volume2Spec := controllervolumetesting.GetTestVolumeSpec(string(volume2Name), volume2Name)
dsw.AddNode(node2Name)
generatedVolume2Name1, podAddErr := dsw.AddPod(pod2aName, volume2Spec, node2Name)
if podAddErr != nil {
@@ -933,8 +986,8 @@ func Test_GetVolumesToAttach_Positive_TwoNodesThreeVolumes(t *testing.T) {
generatedVolume2Name2)
}
pod3Name := "pod3-name"
volume3Name := "volume3-name"
volume3Spec := controllervolumetesting.GetTestVolumeSpec(volume3Name, volume3Name)
volume3Name := api.UniqueDeviceName("volume3-name")
volume3Spec := controllervolumetesting.GetTestVolumeSpec(string(volume3Name), volume3Name)
generatedVolume3Name, podAddErr := dsw.AddPod(pod3Name, volume3Spec, node1Name)
if podAddErr != nil {
t.Fatalf(
@@ -951,16 +1004,16 @@ func Test_GetVolumesToAttach_Positive_TwoNodesThreeVolumes(t *testing.T) {
t.Fatalf("len(volumesToAttach) Expected: <3> Actual: <%v>", len(volumesToAttach))
}
verifyVolumeToAttach(t, volumesToAttach, node1Name, generatedVolume1Name, volume1Name)
verifyVolumeToAttach(t, volumesToAttach, node2Name, generatedVolume2Name1, volume2Name)
verifyVolumeToAttach(t, volumesToAttach, node1Name, generatedVolume3Name, volume3Name)
verifyVolumeToAttach(t, volumesToAttach, node1Name, generatedVolume1Name, string(volume1Name))
verifyVolumeToAttach(t, volumesToAttach, node2Name, generatedVolume2Name1, string(volume2Name))
verifyVolumeToAttach(t, volumesToAttach, node1Name, generatedVolume3Name, string(volume3Name))
}
func verifyVolumeToAttach(
t *testing.T,
volumesToAttach []VolumeToAttach,
expectedNodeName,
expectedVolumeName,
expectedNodeName string,
expectedVolumeName api.UniqueDeviceName,
expectedVolumeSpecName string) {
for _, volumeToAttach := range volumesToAttach {
if volumeToAttach.NodeName == expectedNodeName &&

View File

@@ -31,7 +31,7 @@ import (
// Reconciler runs a periodic loop to reconcile the desired state of the with
// the actual state of the world by triggering attach detach operations.
type Reconciler interface {
// Starts running the reconcilation loop which executes periodically, checks
// Starts running the reconciliation loop which executes periodically, checks
// if volumes that should be attached are attached and volumes that should
// be detached are detached. If not, it will trigger attach/detach
// operations to rectify.
@@ -40,33 +40,34 @@ type Reconciler interface {
// NewReconciler returns a new instance of Reconciler that waits loopPeriod
// between successive executions.
// loopPeriod is the ammount of time the reconciler loop waits between
// loopPeriod is the amount of time the reconciler loop waits between
// successive executions.
// maxSafeToDetachDuration is the max ammount of time the reconciler will wait
// for the volume to deatch, after this it will detach the volume anyway
// assuming the node is unavilable. If during this time the volume becomes used
// by a new pod, the detach request will be aborted and the timer cleared.
// maxWaitForUnmountDuration is the max amount of time the reconciler will wait
// for the volume to be safely unmounted, after this it will detach the volume
// anyway (to handle crashed/unavailable nodes). If during this time the volume
// becomes used by a new pod, the detach request will be aborted and the timer
// cleared.
func NewReconciler(
loopPeriod time.Duration,
maxSafeToDetachDuration time.Duration,
maxWaitForUnmountDuration time.Duration,
desiredStateOfWorld cache.DesiredStateOfWorld,
actualStateOfWorld cache.ActualStateOfWorld,
attacherDetacher attacherdetacher.AttacherDetacher) Reconciler {
return &reconciler{
loopPeriod: loopPeriod,
maxSafeToDetachDuration: maxSafeToDetachDuration,
desiredStateOfWorld: desiredStateOfWorld,
actualStateOfWorld: actualStateOfWorld,
attacherDetacher: attacherDetacher,
loopPeriod: loopPeriod,
maxWaitForUnmountDuration: maxWaitForUnmountDuration,
desiredStateOfWorld: desiredStateOfWorld,
actualStateOfWorld: actualStateOfWorld,
attacherDetacher: attacherDetacher,
}
}
type reconciler struct {
loopPeriod time.Duration
maxSafeToDetachDuration time.Duration
desiredStateOfWorld cache.DesiredStateOfWorld
actualStateOfWorld cache.ActualStateOfWorld
attacherDetacher attacherdetacher.AttacherDetacher
loopPeriod time.Duration
maxWaitForUnmountDuration time.Duration
desiredStateOfWorld cache.DesiredStateOfWorld
actualStateOfWorld cache.ActualStateOfWorld
attacherDetacher attacherdetacher.AttacherDetacher
}
func (rc *reconciler) Run(stopCh <-chan struct{}) {
@@ -75,11 +76,42 @@ func (rc *reconciler) Run(stopCh <-chan struct{}) {
func (rc *reconciler) reconciliationLoopFunc() func() {
return func() {
// Detaches are triggered before attaches so that volumes referenced by
// pods that are rescheduled to a different node are detached first.
// Ensure volumes that should be detached are detached.
for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes() {
if !rc.desiredStateOfWorld.VolumeExists(
attachedVolume.VolumeName, attachedVolume.NodeName) {
// Volume exists in actual state of world but not desired
if !attachedVolume.MountedByNode {
glog.V(5).Infof("Attempting to start DetachVolume for volume %q to node %q", attachedVolume.VolumeName, attachedVolume.NodeName)
err := rc.attacherDetacher.DetachVolume(attachedVolume, rc.actualStateOfWorld)
if err == nil {
glog.Infof("Started DetachVolume for volume %q to node %q", attachedVolume.VolumeName, attachedVolume.NodeName)
}
} else {
// If volume is not safe to detach (is mounted) wait a max amount of time before detaching any way.
timeElapsed, err := rc.actualStateOfWorld.MarkDesireToDetach(attachedVolume.VolumeName, attachedVolume.NodeName)
if err != nil {
glog.Errorf("Unexpected error actualStateOfWorld.MarkDesireToDetach(): %v", err)
}
if timeElapsed > rc.maxWaitForUnmountDuration {
glog.V(5).Infof("Attempting to start DetachVolume for volume %q to node %q. Volume is not safe to detach, but maxWaitForUnmountDuration expired.", attachedVolume.VolumeName, attachedVolume.NodeName)
err := rc.attacherDetacher.DetachVolume(attachedVolume, rc.actualStateOfWorld)
if err == nil {
glog.Infof("Started DetachVolume for volume %q to node %q due to maxWaitForUnmountDuration expiry.", attachedVolume.VolumeName, attachedVolume.NodeName)
}
}
}
}
}
// Ensure volumes that should be attached are attached.
for _, volumeToAttach := range rc.desiredStateOfWorld.GetVolumesToAttach() {
if rc.actualStateOfWorld.VolumeNodeExists(
volumeToAttach.VolumeName, volumeToAttach.NodeName) {
// Volume/Node exists, touch it to reset "safe to detach"
// Volume/Node exists, touch it to reset detachRequestedTime
glog.V(12).Infof("Volume %q/Node %q is attached--touching.", volumeToAttach.VolumeName, volumeToAttach.NodeName)
_, err := rc.actualStateOfWorld.AddVolumeNode(
volumeToAttach.VolumeSpec, volumeToAttach.NodeName)
@@ -88,29 +120,10 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
}
} else {
// Volume/Node doesn't exist, spawn a goroutine to attach it
glog.V(5).Infof("Triggering AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
rc.attacherDetacher.AttachVolume(&volumeToAttach, rc.actualStateOfWorld)
}
}
// Ensure volumes that should be detached are detached.
for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes() {
if !rc.desiredStateOfWorld.VolumeExists(
attachedVolume.VolumeName, attachedVolume.NodeName) {
// Volume exists in actual state of world but not desired
if attachedVolume.SafeToDetach {
glog.V(5).Infof("Triggering DetachVolume for volume %q to node %q", attachedVolume.VolumeName, attachedVolume.NodeName)
rc.attacherDetacher.DetachVolume(&attachedVolume, rc.actualStateOfWorld)
} else {
// If volume is not safe to detach wait a max amount of time before detaching any way.
timeElapsed, err := rc.actualStateOfWorld.MarkDesireToDetach(attachedVolume.VolumeName, attachedVolume.NodeName)
if err != nil {
glog.Errorf("Unexpected error actualStateOfWorld.MarkDesireToDetach(): %v", err)
}
if timeElapsed > rc.maxSafeToDetachDuration {
glog.V(5).Infof("Triggering DetachVolume for volume %q to node %q. Volume is not safe to detach, but max wait time expired.", attachedVolume.VolumeName, attachedVolume.NodeName)
rc.attacherDetacher.DetachVolume(&attachedVolume, rc.actualStateOfWorld)
}
glog.V(5).Infof("Attempting to start AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
err := rc.attacherDetacher.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
if err == nil {
glog.Infof("Started AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
}
}
}

View File

@@ -20,6 +20,7 @@ import (
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/controller/volume/attacherdetacher"
"k8s.io/kubernetes/pkg/controller/volume/cache"
controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/testing"
@@ -28,10 +29,12 @@ import (
)
const (
reconcilerLoopPeriod time.Duration = 0 * time.Millisecond
maxSafeToDetachDuration time.Duration = 50 * time.Millisecond
reconcilerLoopPeriod time.Duration = 0 * time.Millisecond
maxWaitForUnmountDuration time.Duration = 50 * time.Millisecond
)
// Calls Run()
// Verifies there are no calls to attach or detach.
func Test_Run_Positive_DoNothing(t *testing.T) {
// Arrange
volumePluginMgr, fakePlugin := controllervolumetesting.GetTestVolumePluginMgr((t))
@@ -39,7 +42,7 @@ func Test_Run_Positive_DoNothing(t *testing.T) {
asw := cache.NewActualStateOfWorld(volumePluginMgr)
ad := attacherdetacher.NewAttacherDetacher(volumePluginMgr)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxSafeToDetachDuration, dsw, asw, ad)
reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad)
// Act
go reconciler.Run(wait.NeverStop)
@@ -52,6 +55,9 @@ func Test_Run_Positive_DoNothing(t *testing.T) {
waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
}
// Populates desiredStateOfWorld cache with one node/volume/pod tuple.
// Calls Run()
// Verifies there is one attach call and no detach calls.
func Test_Run_Positive_OneDesiredVolumeAttach(t *testing.T) {
// Arrange
volumePluginMgr, fakePlugin := controllervolumetesting.GetTestVolumePluginMgr((t))
@@ -59,10 +65,10 @@ func Test_Run_Positive_OneDesiredVolumeAttach(t *testing.T) {
asw := cache.NewActualStateOfWorld(volumePluginMgr)
ad := attacherdetacher.NewAttacherDetacher(volumePluginMgr)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxSafeToDetachDuration, dsw, asw, ad)
reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad)
podName := "pod-name"
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
dsw.AddNode(nodeName)
volumeExists := dsw.VolumeExists(volumeName, nodeName)
@@ -87,17 +93,23 @@ func Test_Run_Positive_OneDesiredVolumeAttach(t *testing.T) {
verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
}
func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMarkVolume(t *testing.T) {
// Populates desiredStateOfWorld cache with one node/volume/pod tuple.
// Calls Run()
// Verifies there is one attach call and no detach calls.
// Marks the node/volume as unmounted.
// Deletes the node/volume/pod tuple from desiredStateOfWorld cache.
// Verifies there is one detach call and no (new) attach calls.
func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *testing.T) {
// Arrange
volumePluginMgr, fakePlugin := controllervolumetesting.GetTestVolumePluginMgr((t))
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(volumePluginMgr)
ad := attacherdetacher.NewAttacherDetacher(volumePluginMgr)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxSafeToDetachDuration, dsw, asw, ad)
reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad)
podName := "pod-name"
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
dsw.AddNode(nodeName)
volumeExists := dsw.VolumeExists(volumeName, nodeName)
@@ -133,9 +145,10 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMarkVolume(t *testing
generatedVolumeName,
nodeName)
}
asw.MarkVolumeNodeSafeToDetach(generatedVolumeName, nodeName)
asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */)
asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */)
// Assert -- Marked SafeToDetach
// Assert
waitForNewDetacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
@@ -143,17 +156,22 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMarkVolume(t *testing
waitForDetachCallCount(t, 1 /* expectedDetachCallCount */, fakePlugin)
}
func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithoutMarkVolume(t *testing.T) {
// Populates desiredStateOfWorld cache with one node/volume/pod tuple.
// Calls Run()
// Verifies there is one attach call and no detach calls.
// Deletes the node/volume/pod tuple from desiredStateOfWorld cache without first marking the node/volume as unmounted.
// Verifies there is one detach call and no (new) attach calls.
func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *testing.T) {
// Arrange
volumePluginMgr, fakePlugin := controllervolumetesting.GetTestVolumePluginMgr((t))
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(volumePluginMgr)
ad := attacherdetacher.NewAttacherDetacher(volumePluginMgr)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxSafeToDetachDuration, dsw, asw, ad)
reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad)
podName := "pod-name"
volumeName := "volume-name"
volumeSpec := controllervolumetesting.GetTestVolumeSpec(volumeName, volumeName)
volumeName := api.UniqueDeviceName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
dsw.AddNode(nodeName)
volumeExists := dsw.VolumeExists(volumeName, nodeName)

View File

@@ -22,13 +22,13 @@ import (
)
// GetTestVolumeSpec returns a test volume spec
func GetTestVolumeSpec(volumeName, diskName string) *volume.Spec {
func GetTestVolumeSpec(volumeName string, diskName api.UniqueDeviceName) *volume.Spec {
return &volume.Spec{
Volume: &api.Volume{
Name: volumeName,
VolumeSource: api.VolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
PDName: diskName,
PDName: string(diskName),
FSType: "fake",
ReadOnly: false,
},