Revert "Remove references to openstack and cinder"

This reverts commit 9bbf01bae9.
This commit is contained in:
Jan Safranek
2022-11-10 18:22:07 +01:00
parent 77b027936a
commit 5b284a50b7
216 changed files with 25733 additions and 22 deletions

View File

@@ -4097,6 +4097,20 @@ func TestValidateVolumes(t *testing.T) {
field: "rbd.image",
}},
},
// Cinder
{
name: "valid Cinder",
vol: core.Volume{
Name: "cinder",
VolumeSource: core.VolumeSource{
Cinder: &core.CinderVolumeSource{
VolumeID: "29ea5088-4f60-4757-962e-dba678767887",
FSType: "ext4",
ReadOnly: false,
},
},
},
},
// CephFS
{
name: "valid CephFS",

View File

@@ -24,5 +24,6 @@ import (
_ "k8s.io/legacy-cloud-providers/aws"
_ "k8s.io/legacy-cloud-providers/azure"
_ "k8s.io/legacy-cloud-providers/gce"
_ "k8s.io/legacy-cloud-providers/openstack"
_ "k8s.io/legacy-cloud-providers/vsphere"
)

View File

@@ -148,6 +148,13 @@ const (
// Enables the GCE PD in-tree driver to GCE CSI Driver migration feature.
CSIMigrationGCE featuregate.Feature = "CSIMigrationGCE"
// owner: @adisky
// alpha: v1.14
// beta: v1.18
//
// Enables the OpenStack Cinder in-tree driver to OpenStack Cinder CSI Driver migration feature.
CSIMigrationOpenStack featuregate.Feature = "CSIMigrationOpenStack"
// owner: @trierra
// alpha: v1.23
//
@@ -408,6 +415,12 @@ const (
// Disables the GCE PD in-tree driver.
InTreePluginGCEUnregister featuregate.Feature = "InTreePluginGCEUnregister"
// owner: @adisky
// alpha: v1.21
//
// Disables the OpenStack Cinder in-tree driver.
InTreePluginOpenStackUnregister featuregate.Feature = "InTreePluginOpenStackUnregister"
// owner: @trierra
// alpha: v1.23
//
@@ -921,6 +934,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
CSIMigrationGCE: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.27
CSIMigrationOpenStack: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.26
CSIMigrationPortworx: {Default: false, PreRelease: featuregate.Beta}, // Off by default (requires Portworx CSI driver)
CSIMigrationRBD: {Default: false, PreRelease: featuregate.Alpha}, // Off by default (requires RBD CSI driver)
@@ -993,6 +1008,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
InTreePluginGCEUnregister: {Default: false, PreRelease: featuregate.Alpha},
InTreePluginOpenStackUnregister: {Default: false, PreRelease: featuregate.Alpha},
InTreePluginPortworxUnregister: {Default: false, PreRelease: featuregate.Alpha},
InTreePluginRBDUnregister: {Default: false, PreRelease: featuregate.Alpha},

View File

@@ -30,6 +30,7 @@ const (
NodeUnschedulable = "NodeUnschedulable"
NodeVolumeLimits = "NodeVolumeLimits"
AzureDiskLimits = "AzureDiskLimits"
CinderLimits = "CinderLimits"
EBSLimits = "EBSLimits"
GCEPDLimits = "GCEPDLimits"
PodTopologySpread = "PodTopologySpread"

View File

@@ -58,6 +58,8 @@ func getVolumeLimitKey(filterType string) v1.ResourceName {
return v1.ResourceName(volumeutil.GCEVolumeLimitKey)
case azureDiskVolumeFilterType:
return v1.ResourceName(volumeutil.AzureVolumeLimitKey)
case cinderVolumeFilterType:
return v1.ResourceName(volumeutil.CinderVolumeLimitKey)
default:
return v1.ResourceName(volumeutil.GetCSIAttachLimitKey(filterType))
}

View File

@@ -56,6 +56,8 @@ const (
gcePDVolumeFilterType = "GCE"
// azureDiskVolumeFilterType defines the filter name for azureDiskVolumeFilter.
azureDiskVolumeFilterType = "AzureDisk"
// cinderVolumeFilterType defines the filter name for cinderVolumeFilter.
cinderVolumeFilterType = "Cinder"
// ErrReasonMaxVolumeCountExceeded is used for MaxVolumeCount predicate error.
ErrReasonMaxVolumeCountExceeded = "node(s) exceed max volume count"
@@ -73,6 +75,15 @@ func NewAzureDisk(_ runtime.Object, handle framework.Handle, fts feature.Feature
return newNonCSILimitsWithInformerFactory(azureDiskVolumeFilterType, informerFactory, fts), nil
}
// CinderName is the name of the plugin used in the plugin registry and configurations.
const CinderName = names.CinderLimits
// NewCinder returns function that initializes a new plugin and returns it.
func NewCinder(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
informerFactory := handle.SharedInformerFactory()
return newNonCSILimitsWithInformerFactory(cinderVolumeFilterType, informerFactory, fts), nil
}
// EBSName is the name of the plugin used in the plugin registry and configurations.
const EBSName = names.EBSLimits
@@ -160,6 +171,10 @@ func newNonCSILimits(
name = AzureDiskName
filter = azureDiskVolumeFilter
volumeLimitKey = v1.ResourceName(volumeutil.AzureVolumeLimitKey)
case cinderVolumeFilterType:
name = CinderName
filter = cinderVolumeFilter
volumeLimitKey = v1.ResourceName(volumeutil.CinderVolumeLimitKey)
default:
klog.ErrorS(errors.New("wrong filterName"), "Cannot create nonCSILimits plugin")
return nil
@@ -460,6 +475,32 @@ var azureDiskVolumeFilter = VolumeFilter{
},
}
// cinderVolumeFilter is a VolumeFilter for filtering cinder Volumes.
// It will be deprecated once Openstack cloudprovider has been removed from in-tree.
var cinderVolumeFilter = VolumeFilter{
FilterVolume: func(vol *v1.Volume) (string, bool) {
if vol.Cinder != nil {
return vol.Cinder.VolumeID, true
}
return "", false
},
FilterPersistentVolume: func(pv *v1.PersistentVolume) (string, bool) {
if pv.Spec.Cinder != nil {
return pv.Spec.Cinder.VolumeID, true
}
return "", false
},
MatchProvisioner: func(sc *storage.StorageClass) bool {
return sc.Provisioner == csilibplugins.CinderInTreePluginName
},
IsMigrated: func(csiNode *storage.CSINode) bool {
return isCSIMigrationOn(csiNode, csilibplugins.CinderInTreePluginName)
},
}
func getMaxVolumeFunc(filterName string) func(node *v1.Node) int {
return func(node *v1.Node) int {
maxVolumesFromEnv := getMaxVolLimitFromEnv()
@@ -481,6 +522,8 @@ func getMaxVolumeFunc(filterName string) func(node *v1.Node) int {
return defaultMaxGCEPDVolumes
case azureDiskVolumeFilterType:
return defaultMaxAzureDiskVolumes
case cinderVolumeFilterType:
return volumeutil.DefaultMaxCinderVolumes
default:
return -1
}

View File

@@ -55,6 +55,8 @@ func isCSIMigrationOn(csiNode *storagev1.CSINode, pluginName string) bool {
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk) {
return false
}
case csilibplugins.CinderInTreePluginName:
return true
case csilibplugins.RBDVolumePluginName:
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationRBD) {
return false

View File

@@ -72,6 +72,7 @@ func NewInTreeRegistry() runtime.Registry {
nodevolumelimits.EBSName: runtime.FactoryAdapter(fts, nodevolumelimits.NewEBS),
nodevolumelimits.GCEPDName: runtime.FactoryAdapter(fts, nodevolumelimits.NewGCEPD),
nodevolumelimits.AzureDiskName: runtime.FactoryAdapter(fts, nodevolumelimits.NewAzureDisk),
nodevolumelimits.CinderName: runtime.FactoryAdapter(fts, nodevolumelimits.NewCinder),
interpodaffinity.Name: interpodaffinity.New,
queuesort.Name: queuesort.New,
defaultbinder.Name: defaultbinder.New,

View File

@@ -1010,6 +1010,8 @@ func isCSIMigrationOnForPlugin(pluginName string) bool {
return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationGCE)
case csiplugins.AzureDiskInTreePluginName:
return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk)
case csiplugins.CinderInTreePluginName:
return true
case csiplugins.PortworxVolumePluginName:
return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationPortworx)
case csiplugins.RBDVolumePluginName:

14
pkg/volume/cinder/OWNERS Normal file
View File

@@ -0,0 +1,14 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- jsafrane
- anguslees
- dims
reviewers:
- anguslees
- saad-ali
- jsafrane
- jingxu97
- msau42
emeritus_approvers:
- FengyunPan2

View File

@@ -0,0 +1,434 @@
//go:build !providerless
// +build !providerless
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cinder
import (
"context"
"fmt"
"os"
"path"
"strings"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"k8s.io/mount-utils"
"k8s.io/kubernetes/pkg/volume"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
type cinderDiskAttacher struct {
host volume.VolumeHost
cinderProvider BlockStorageProvider
}
var _ volume.Attacher = &cinderDiskAttacher{}
var _ volume.DeviceMounter = &cinderDiskAttacher{}
var _ volume.AttachableVolumePlugin = &cinderPlugin{}
var _ volume.DeviceMountableVolumePlugin = &cinderPlugin{}
const (
probeVolumeInitDelay = 1 * time.Second
probeVolumeFactor = 2.0
operationFinishInitDelay = 1 * time.Second
operationFinishFactor = 1.1
operationFinishSteps = 10
diskAttachInitDelay = 1 * time.Second
diskAttachFactor = 1.2
diskAttachSteps = 15
diskDetachInitDelay = 1 * time.Second
diskDetachFactor = 1.2
diskDetachSteps = 13
)
func (plugin *cinderPlugin) NewAttacher() (volume.Attacher, error) {
cinder, err := plugin.getCloudProvider()
if err != nil {
return nil, err
}
return &cinderDiskAttacher{
host: plugin.host,
cinderProvider: cinder,
}, nil
}
func (plugin *cinderPlugin) NewDeviceMounter() (volume.DeviceMounter, error) {
return plugin.NewAttacher()
}
func (plugin *cinderPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
mounter := plugin.host.GetMounter(plugin.GetPluginName())
return mounter.GetMountRefs(deviceMountPath)
}
func (attacher *cinderDiskAttacher) waitOperationFinished(volumeID string) error {
backoff := wait.Backoff{
Duration: operationFinishInitDelay,
Factor: operationFinishFactor,
Steps: operationFinishSteps,
}
var volumeStatus string
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
var pending bool
var err error
pending, volumeStatus, err = attacher.cinderProvider.OperationPending(volumeID)
if err != nil {
return false, err
}
return !pending, nil
})
if err == wait.ErrWaitTimeout {
err = fmt.Errorf("volume %q is %s, can't finish within the alloted time", volumeID, volumeStatus)
}
return err
}
func (attacher *cinderDiskAttacher) waitDiskAttached(instanceID, volumeID string) error {
backoff := wait.Backoff{
Duration: diskAttachInitDelay,
Factor: diskAttachFactor,
Steps: diskAttachSteps,
}
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
attached, err := attacher.cinderProvider.DiskIsAttached(instanceID, volumeID)
if err != nil {
return false, err
}
return attached, nil
})
if err == wait.ErrWaitTimeout {
err = fmt.Errorf("volume %q failed to be attached within the alloted time", volumeID)
}
return err
}
func (attacher *cinderDiskAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) {
volumeID, _, _, err := getVolumeInfo(spec)
if err != nil {
return "", err
}
instanceID, err := attacher.nodeInstanceID(nodeName)
if err != nil {
return "", err
}
if err := attacher.waitOperationFinished(volumeID); err != nil {
return "", err
}
attached, err := attacher.cinderProvider.DiskIsAttached(instanceID, volumeID)
if err != nil {
// Log error and continue with attach
klog.Warningf(
"Error checking if volume (%q) is already attached to current instance (%q). Will continue and try attach anyway. err=%v",
volumeID, instanceID, err)
}
if err == nil && attached {
// Volume is already attached to instance.
klog.Infof("Attach operation is successful. volume %q is already attached to instance %q.", volumeID, instanceID)
} else {
_, err = attacher.cinderProvider.AttachDisk(instanceID, volumeID)
if err == nil {
if err = attacher.waitDiskAttached(instanceID, volumeID); err != nil {
klog.Errorf("Error waiting for volume %q to be attached from node %q: %v", volumeID, nodeName, err)
return "", err
}
klog.Infof("Attach operation successful: volume %q attached to instance %q.", volumeID, instanceID)
} else {
klog.Infof("Attach volume %q to instance %q failed with: %v", volumeID, instanceID, err)
return "", err
}
}
devicePath, err := attacher.cinderProvider.GetAttachmentDiskPath(instanceID, volumeID)
if err != nil {
klog.Infof("Can not get device path of volume %q which be attached to instance %q, failed with: %v", volumeID, instanceID, err)
return "", err
}
return devicePath, nil
}
func (attacher *cinderDiskAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
volumesAttachedCheck := make(map[*volume.Spec]bool)
volumeSpecMap := make(map[string]*volume.Spec)
volumeIDList := []string{}
for _, spec := range specs {
volumeID, _, _, err := getVolumeInfo(spec)
if err != nil {
klog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err)
continue
}
volumeIDList = append(volumeIDList, volumeID)
volumesAttachedCheck[spec] = true
volumeSpecMap[volumeID] = spec
}
attachedResult, err := attacher.cinderProvider.DisksAreAttachedByName(nodeName, volumeIDList)
if err != nil {
// Log error and continue with attach
klog.Errorf(
"Error checking if Volumes (%v) are already attached to current node (%q). Will continue and try attach anyway. err=%v",
volumeIDList, nodeName, err)
return volumesAttachedCheck, err
}
for volumeID, attached := range attachedResult {
if !attached {
spec := volumeSpecMap[volumeID]
volumesAttachedCheck[spec] = false
klog.V(2).Infof("VolumesAreAttached: check volume %q (specName: %q) is no longer attached", volumeID, spec.Name())
}
}
return volumesAttachedCheck, nil
}
func (attacher *cinderDiskAttacher) WaitForAttach(spec *volume.Spec, devicePath string, _ *v1.Pod, timeout time.Duration) (string, error) {
// NOTE: devicePath is path as reported by Cinder, which may be incorrect and should not be used. See Issue #33128
volumeID, _, _, err := getVolumeInfo(spec)
if err != nil {
return "", err
}
if devicePath == "" {
return "", fmt.Errorf("WaitForAttach failed for Cinder disk %q: devicePath is empty", volumeID)
}
ticker := time.NewTicker(probeVolumeInitDelay)
defer ticker.Stop()
timer := time.NewTimer(timeout)
defer timer.Stop()
duration := probeVolumeInitDelay
for {
select {
case <-ticker.C:
klog.V(5).Infof("Checking Cinder disk %q is attached.", volumeID)
probeAttachedVolume()
if !attacher.cinderProvider.ShouldTrustDevicePath() {
// Using the Cinder volume ID, find the real device path (See Issue #33128)
devicePath = attacher.cinderProvider.GetDevicePath(volumeID)
}
exists, err := mount.PathExists(devicePath)
if exists && err == nil {
klog.Infof("Successfully found attached Cinder disk %q at %v.", volumeID, devicePath)
return devicePath, nil
}
// Log an error, and continue checking periodically
klog.Errorf("Error: could not find attached Cinder disk %q (path: %q): %v", volumeID, devicePath, err)
// Using exponential backoff instead of linear
ticker.Stop()
duration = time.Duration(float64(duration) * probeVolumeFactor)
ticker = time.NewTicker(duration)
case <-timer.C:
return "", fmt.Errorf("could not find attached Cinder disk %q. Timeout waiting for mount paths to be created", volumeID)
}
}
}
func (attacher *cinderDiskAttacher) GetDeviceMountPath(
spec *volume.Spec) (string, error) {
volumeID, _, _, err := getVolumeInfo(spec)
if err != nil {
return "", err
}
return makeGlobalPDName(attacher.host, volumeID), nil
}
// FIXME: this method can be further pruned.
func (attacher *cinderDiskAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string, _ volume.DeviceMounterArgs) error {
mounter := attacher.host.GetMounter(cinderVolumePluginName)
notMnt, err := mounter.IsLikelyNotMountPoint(deviceMountPath)
if err != nil {
if os.IsNotExist(err) {
if err := os.MkdirAll(deviceMountPath, 0750); err != nil {
return err
}
notMnt = true
} else {
return err
}
}
_, volumeFSType, readOnly, err := getVolumeInfo(spec)
if err != nil {
return err
}
options := []string{}
if readOnly {
options = append(options, "ro")
}
if notMnt {
diskMounter := volumeutil.NewSafeFormatAndMountFromHost(cinderVolumePluginName, attacher.host)
mountOptions := volumeutil.MountOptionFromSpec(spec, options...)
err = diskMounter.FormatAndMount(devicePath, deviceMountPath, volumeFSType, mountOptions)
if err != nil {
os.Remove(deviceMountPath)
return err
}
}
return nil
}
type cinderDiskDetacher struct {
mounter mount.Interface
cinderProvider BlockStorageProvider
}
var _ volume.Detacher = &cinderDiskDetacher{}
var _ volume.DeviceUnmounter = &cinderDiskDetacher{}
func (plugin *cinderPlugin) NewDetacher() (volume.Detacher, error) {
cinder, err := plugin.getCloudProvider()
if err != nil {
return nil, err
}
return &cinderDiskDetacher{
mounter: plugin.host.GetMounter(plugin.GetPluginName()),
cinderProvider: cinder,
}, nil
}
func (plugin *cinderPlugin) NewDeviceUnmounter() (volume.DeviceUnmounter, error) {
return plugin.NewDetacher()
}
func (detacher *cinderDiskDetacher) waitOperationFinished(volumeID string) error {
backoff := wait.Backoff{
Duration: operationFinishInitDelay,
Factor: operationFinishFactor,
Steps: operationFinishSteps,
}
var volumeStatus string
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
var pending bool
var err error
pending, volumeStatus, err = detacher.cinderProvider.OperationPending(volumeID)
if err != nil {
return false, err
}
return !pending, nil
})
if err == wait.ErrWaitTimeout {
err = fmt.Errorf("volume %q is %s, can't finish within the alloted time", volumeID, volumeStatus)
}
return err
}
func (detacher *cinderDiskDetacher) waitDiskDetached(instanceID, volumeID string) error {
backoff := wait.Backoff{
Duration: diskDetachInitDelay,
Factor: diskDetachFactor,
Steps: diskDetachSteps,
}
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
attached, err := detacher.cinderProvider.DiskIsAttached(instanceID, volumeID)
if err != nil {
return false, err
}
return !attached, nil
})
if err == wait.ErrWaitTimeout {
err = fmt.Errorf("volume %q failed to detach within the alloted time", volumeID)
}
return err
}
func (detacher *cinderDiskDetacher) Detach(volumeName string, nodeName types.NodeName) error {
volumeID := path.Base(volumeName)
if err := detacher.waitOperationFinished(volumeID); err != nil {
return err
}
attached, instanceID, err := detacher.cinderProvider.DiskIsAttachedByName(nodeName, volumeID)
if err != nil {
// Log error and continue with detach
klog.Errorf(
"Error checking if volume (%q) is already attached to current node (%q). Will continue and try detach anyway. err=%v",
volumeID, nodeName, err)
}
if err == nil && !attached {
// Volume is already detached from node.
klog.Infof("detach operation was successful. volume %q is already detached from node %q.", volumeID, nodeName)
return nil
}
if err = detacher.cinderProvider.DetachDisk(instanceID, volumeID); err != nil {
klog.Errorf("Error detaching volume %q from node %q: %v", volumeID, nodeName, err)
return err
}
if err = detacher.waitDiskDetached(instanceID, volumeID); err != nil {
klog.Errorf("Error waiting for volume %q to detach from node %q: %v", volumeID, nodeName, err)
return err
}
klog.Infof("detached volume %q from node %q", volumeID, nodeName)
return nil
}
func (detacher *cinderDiskDetacher) UnmountDevice(deviceMountPath string) error {
return mount.CleanupMountPoint(deviceMountPath, detacher.mounter, false)
}
func (plugin *cinderPlugin) CanAttach(spec *volume.Spec) (bool, error) {
return true, nil
}
func (plugin *cinderPlugin) CanDeviceMount(spec *volume.Spec) (bool, error) {
return true, nil
}
func (attacher *cinderDiskAttacher) nodeInstanceID(nodeName types.NodeName) (string, error) {
instances, res := attacher.cinderProvider.Instances()
if !res {
return "", fmt.Errorf("failed to list openstack instances")
}
instanceID, err := instances.InstanceID(context.TODO(), nodeName)
if err != nil {
return "", err
}
if ind := strings.LastIndex(instanceID, "/"); ind >= 0 {
instanceID = instanceID[(ind + 1):]
}
return instanceID, nil
}

View File

@@ -0,0 +1,758 @@
//go:build !providerless
// +build !providerless
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cinder
import (
"context"
"errors"
"os"
"path/filepath"
"reflect"
"testing"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
"fmt"
"sort"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
)
const (
VolumeStatusPending = "pending"
VolumeStatusDone = "done"
)
var attachStatus = "Attach"
var detachStatus = "Detach"
func TestGetDeviceName_Volume(t *testing.T) {
plugin := newPlugin(t)
name := "my-cinder-volume"
spec := createVolSpec(name, false)
deviceName, err := plugin.GetVolumeName(spec)
if err != nil {
t.Errorf("GetDeviceName error: %v", err)
}
if deviceName != name {
t.Errorf("GetDeviceName error: expected %s, got %s", name, deviceName)
}
}
func TestGetDeviceName_PersistentVolume(t *testing.T) {
plugin := newPlugin(t)
name := "my-cinder-pv"
spec := createPVSpec(name, true)
deviceName, err := plugin.GetVolumeName(spec)
if err != nil {
t.Errorf("GetDeviceName error: %v", err)
}
if deviceName != name {
t.Errorf("GetDeviceName error: expected %s, got %s", name, deviceName)
}
}
func TestGetDeviceMountPath(t *testing.T) {
name := "cinder-volume-id"
spec := createVolSpec(name, false)
rootDir := "/var/lib/kubelet/"
host := volumetest.NewFakeVolumeHost(t, rootDir, nil, nil)
attacher := &cinderDiskAttacher{
host: host,
}
//test the path
path, err := attacher.GetDeviceMountPath(spec)
if err != nil {
t.Errorf("Get device mount path error")
}
expectedPath := filepath.Join(rootDir, "plugins/kubernetes.io/cinder/mounts", name)
if path != expectedPath {
t.Errorf("Device mount path error: expected %s, got %s ", expectedPath, path)
}
}
// One testcase for TestAttachDetach table test below
type testcase struct {
name string
// For fake GCE:
attach attachCall
detach detachCall
operationPending operationPendingCall
diskIsAttached diskIsAttachedCall
disksAreAttached disksAreAttachedCall
diskPath diskPathCall
t *testing.T
attachOrDetach *string
instanceID string
// Actual test to run
test func(test *testcase) (string, error)
// Expected return of the test
expectedResult string
expectedError error
}
func TestAttachDetach(t *testing.T) {
volumeID := "disk"
instanceID := "instance"
pending := VolumeStatusPending
done := VolumeStatusDone
nodeName := types.NodeName("nodeName")
readOnly := false
spec := createVolSpec(volumeID, readOnly)
attachError := errors.New("fake attach error")
detachError := errors.New("fake detach error")
diskCheckError := errors.New("fake DiskIsAttached error")
diskPathError := errors.New("fake GetAttachmentDiskPath error")
disksCheckError := errors.New("fake DisksAreAttached error")
operationFinishTimeout := errors.New("fake waitOperationFinished error")
tests := []testcase{
// Successful Attach call
{
name: "Attach_Positive",
instanceID: instanceID,
operationPending: operationPendingCall{volumeID, false, done, nil},
diskIsAttached: diskIsAttachedCall{instanceID, nodeName, volumeID, false, nil},
attach: attachCall{instanceID, volumeID, "", nil},
diskPath: diskPathCall{instanceID, volumeID, "/dev/sda", nil},
test: func(testcase *testcase) (string, error) {
attacher := newAttacher(testcase)
return attacher.Attach(spec, nodeName)
},
expectedResult: "/dev/sda",
},
// Disk is already attached
{
name: "Attach_Positive_AlreadyAttached",
instanceID: instanceID,
operationPending: operationPendingCall{volumeID, false, done, nil},
diskIsAttached: diskIsAttachedCall{instanceID, nodeName, volumeID, true, nil},
diskPath: diskPathCall{instanceID, volumeID, "/dev/sda", nil},
test: func(testcase *testcase) (string, error) {
attacher := newAttacher(testcase)
return attacher.Attach(spec, nodeName)
},
expectedResult: "/dev/sda",
},
// Disk is attaching
{
name: "Attach_is_attaching",
instanceID: instanceID,
operationPending: operationPendingCall{volumeID, true, pending, operationFinishTimeout},
test: func(testcase *testcase) (string, error) {
attacher := newAttacher(testcase)
return attacher.Attach(spec, nodeName)
},
expectedError: operationFinishTimeout,
},
// Attach call fails
{
name: "Attach_Negative",
instanceID: instanceID,
operationPending: operationPendingCall{volumeID, false, done, nil},
diskIsAttached: diskIsAttachedCall{instanceID, nodeName, volumeID, false, diskCheckError},
attach: attachCall{instanceID, volumeID, "/dev/sda", attachError},
test: func(testcase *testcase) (string, error) {
attacher := newAttacher(testcase)
return attacher.Attach(spec, nodeName)
},
expectedError: attachError,
},
// GetAttachmentDiskPath call fails
{
name: "Attach_Negative_DiskPatchFails",
instanceID: instanceID,
operationPending: operationPendingCall{volumeID, false, done, nil},
diskIsAttached: diskIsAttachedCall{instanceID, nodeName, volumeID, false, nil},
attach: attachCall{instanceID, volumeID, "", nil},
diskPath: diskPathCall{instanceID, volumeID, "", diskPathError},
test: func(testcase *testcase) (string, error) {
attacher := newAttacher(testcase)
return attacher.Attach(spec, nodeName)
},
expectedError: diskPathError,
},
// Successful VolumesAreAttached call, attached
{
name: "VolumesAreAttached_Positive",
instanceID: instanceID,
disksAreAttached: disksAreAttachedCall{instanceID, nodeName, []string{volumeID}, map[string]bool{volumeID: true}, nil},
test: func(testcase *testcase) (string, error) {
attacher := newAttacher(testcase)
attachments, err := attacher.VolumesAreAttached([]*volume.Spec{spec}, nodeName)
return serializeAttachments(attachments), err
},
expectedResult: serializeAttachments(map[*volume.Spec]bool{spec: true}),
},
// Successful VolumesAreAttached call, not attached
{
name: "VolumesAreAttached_Negative",
instanceID: instanceID,
disksAreAttached: disksAreAttachedCall{instanceID, nodeName, []string{volumeID}, map[string]bool{volumeID: false}, nil},
test: func(testcase *testcase) (string, error) {
attacher := newAttacher(testcase)
attachments, err := attacher.VolumesAreAttached([]*volume.Spec{spec}, nodeName)
return serializeAttachments(attachments), err
},
expectedResult: serializeAttachments(map[*volume.Spec]bool{spec: false}),
},
// Treat as attached when DisksAreAttached call fails
{
name: "VolumesAreAttached_CinderFailed",
instanceID: instanceID,
disksAreAttached: disksAreAttachedCall{instanceID, nodeName, []string{volumeID}, nil, disksCheckError},
test: func(testcase *testcase) (string, error) {
attacher := newAttacher(testcase)
attachments, err := attacher.VolumesAreAttached([]*volume.Spec{spec}, nodeName)
return serializeAttachments(attachments), err
},
expectedResult: serializeAttachments(map[*volume.Spec]bool{spec: true}),
expectedError: disksCheckError,
},
// Detach succeeds
{
name: "Detach_Positive",
instanceID: instanceID,
operationPending: operationPendingCall{volumeID, false, done, nil},
diskIsAttached: diskIsAttachedCall{instanceID, nodeName, volumeID, true, nil},
detach: detachCall{instanceID, volumeID, nil},
test: func(testcase *testcase) (string, error) {
detacher := newDetacher(testcase)
return "", detacher.Detach(volumeID, nodeName)
},
},
// Disk is already detached
{
name: "Detach_Positive_AlreadyDetached",
instanceID: instanceID,
operationPending: operationPendingCall{volumeID, false, done, nil},
diskIsAttached: diskIsAttachedCall{instanceID, nodeName, volumeID, false, nil},
test: func(testcase *testcase) (string, error) {
detacher := newDetacher(testcase)
return "", detacher.Detach(volumeID, nodeName)
},
},
// Detach succeeds when DiskIsAttached fails
{
name: "Detach_Positive_CheckFails",
instanceID: instanceID,
operationPending: operationPendingCall{volumeID, false, done, nil},
diskIsAttached: diskIsAttachedCall{instanceID, nodeName, volumeID, false, diskCheckError},
detach: detachCall{instanceID, volumeID, nil},
test: func(testcase *testcase) (string, error) {
detacher := newDetacher(testcase)
return "", detacher.Detach(volumeID, nodeName)
},
},
// Detach fails
{
name: "Detach_Negative",
instanceID: instanceID,
operationPending: operationPendingCall{volumeID, false, done, nil},
diskIsAttached: diskIsAttachedCall{instanceID, nodeName, volumeID, false, diskCheckError},
detach: detachCall{instanceID, volumeID, detachError},
test: func(testcase *testcase) (string, error) {
detacher := newDetacher(testcase)
return "", detacher.Detach(volumeID, nodeName)
},
expectedError: detachError,
},
// // Disk is detaching
{
name: "Detach_Is_Detaching",
instanceID: instanceID,
operationPending: operationPendingCall{volumeID, true, pending, operationFinishTimeout},
test: func(testcase *testcase) (string, error) {
detacher := newDetacher(testcase)
return "", detacher.Detach(volumeID, nodeName)
},
expectedError: operationFinishTimeout,
},
}
for _, testcase := range tests {
testcase.t = t
attachOrDetach := ""
testcase.attachOrDetach = &attachOrDetach
result, err := testcase.test(&testcase)
if err != testcase.expectedError {
t.Errorf("%s failed: expected err=%q, got %q", testcase.name, testcase.expectedError, err)
}
if result != testcase.expectedResult {
t.Errorf("%s failed: expected result=%q, got %q", testcase.name, testcase.expectedResult, result)
}
}
}
type volumeAttachmentFlag struct {
volumeID string
attached bool
}
type volumeAttachmentFlags []volumeAttachmentFlag
func (va volumeAttachmentFlags) Len() int {
return len(va)
}
func (va volumeAttachmentFlags) Swap(i, j int) {
va[i], va[j] = va[j], va[i]
}
func (va volumeAttachmentFlags) Less(i, j int) bool {
if va[i].volumeID < va[j].volumeID {
return true
}
if va[i].volumeID > va[j].volumeID {
return false
}
return va[j].attached
}
func serializeAttachments(attachments map[*volume.Spec]bool) string {
var attachmentFlags volumeAttachmentFlags
for spec, attached := range attachments {
attachmentFlags = append(attachmentFlags, volumeAttachmentFlag{spec.Name(), attached})
}
sort.Sort(attachmentFlags)
return fmt.Sprint(attachmentFlags)
}
// newPlugin creates a new gcePersistentDiskPlugin with fake cloud, NewAttacher
// and NewDetacher won't work.
func newPlugin(t *testing.T) *cinderPlugin {
host := volumetest.NewFakeVolumeHost(t, os.TempDir(), nil, nil)
plugins := ProbeVolumePlugins()
plugin := plugins[0]
plugin.Init(host)
return plugin.(*cinderPlugin)
}
func newAttacher(testcase *testcase) *cinderDiskAttacher {
return &cinderDiskAttacher{
host: nil,
cinderProvider: testcase,
}
}
func newDetacher(testcase *testcase) *cinderDiskDetacher {
return &cinderDiskDetacher{
cinderProvider: testcase,
}
}
func createVolSpec(name string, readOnly bool) *volume.Spec {
return &volume.Spec{
Volume: &v1.Volume{
Name: name,
VolumeSource: v1.VolumeSource{
Cinder: &v1.CinderVolumeSource{
VolumeID: name,
ReadOnly: readOnly,
},
},
},
}
}
func createPVSpec(name string, readOnly bool) *volume.Spec {
return &volume.Spec{
PersistentVolume: &v1.PersistentVolume{
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
Cinder: &v1.CinderPersistentVolumeSource{
VolumeID: name,
ReadOnly: readOnly,
},
},
},
},
}
}
// Fake GCE implementation
type attachCall struct {
instanceID string
volumeID string
retDeviceName string
ret error
}
type detachCall struct {
instanceID string
devicePath string
ret error
}
type operationPendingCall struct {
diskName string
pending bool
volumeStatus string
ret error
}
type diskIsAttachedCall struct {
instanceID string
nodeName types.NodeName
volumeID string
isAttached bool
ret error
}
type diskPathCall struct {
instanceID string
volumeID string
retPath string
ret error
}
type disksAreAttachedCall struct {
instanceID string
nodeName types.NodeName
volumeIDs []string
areAttached map[string]bool
ret error
}
func (testcase *testcase) AttachDisk(instanceID, volumeID string) (string, error) {
expected := &testcase.attach
if expected.volumeID == "" && expected.instanceID == "" {
// testcase.attach looks uninitialized, test did not expect to call
// AttachDisk
testcase.t.Errorf("unexpected AttachDisk call")
return "", errors.New("unexpected AttachDisk call")
}
if expected.volumeID != volumeID {
testcase.t.Errorf("unexpected AttachDisk call: expected volumeID %s, got %s", expected.volumeID, volumeID)
return "", errors.New("unexpected AttachDisk call: wrong volumeID")
}
if expected.instanceID != instanceID {
testcase.t.Errorf("unexpected AttachDisk call: expected instanceID %s, got %s", expected.instanceID, instanceID)
return "", errors.New("unexpected AttachDisk call: wrong instanceID")
}
klog.V(4).Infof("AttachDisk call: %s, %s, returning %q, %v", volumeID, instanceID, expected.retDeviceName, expected.ret)
testcase.attachOrDetach = &attachStatus
return expected.retDeviceName, expected.ret
}
func (testcase *testcase) DetachDisk(instanceID, volumeID string) error {
expected := &testcase.detach
if expected.devicePath == "" && expected.instanceID == "" {
// testcase.detach looks uninitialized, test did not expect to call
// DetachDisk
testcase.t.Errorf("unexpected DetachDisk call")
return errors.New("unexpected DetachDisk call")
}
if expected.devicePath != volumeID {
testcase.t.Errorf("unexpected DetachDisk call: expected volumeID %s, got %s", expected.devicePath, volumeID)
return errors.New("unexpected DetachDisk call: wrong volumeID")
}
if expected.instanceID != instanceID {
testcase.t.Errorf("unexpected DetachDisk call: expected instanceID %s, got %s", expected.instanceID, instanceID)
return errors.New("unexpected DetachDisk call: wrong instanceID")
}
klog.V(4).Infof("DetachDisk call: %s, %s, returning %v", volumeID, instanceID, expected.ret)
testcase.attachOrDetach = &detachStatus
return expected.ret
}
func (testcase *testcase) OperationPending(diskName string) (bool, string, error) {
expected := &testcase.operationPending
if expected.volumeStatus == VolumeStatusPending {
klog.V(4).Infof("OperationPending call: %s, returning %v, %v, %v", diskName, expected.pending, expected.volumeStatus, expected.ret)
return true, expected.volumeStatus, expected.ret
}
klog.V(4).Infof("OperationPending call: %s, returning %v, %v, %v", diskName, expected.pending, expected.volumeStatus, expected.ret)
return false, expected.volumeStatus, expected.ret
}
func (testcase *testcase) DiskIsAttached(instanceID, volumeID string) (bool, error) {
expected := &testcase.diskIsAttached
// If testcase call DetachDisk*, return false
if *testcase.attachOrDetach == detachStatus {
return false, nil
}
// If testcase call AttachDisk*, return true
if *testcase.attachOrDetach == attachStatus {
return true, nil
}
if expected.volumeID == "" && expected.instanceID == "" {
// testcase.diskIsAttached looks uninitialized, test did not expect to
// call DiskIsAttached
testcase.t.Errorf("unexpected DiskIsAttached call")
return false, errors.New("unexpected DiskIsAttached call")
}
if expected.volumeID != volumeID {
testcase.t.Errorf("unexpected DiskIsAttached call: expected volumeID %s, got %s", expected.volumeID, volumeID)
return false, errors.New("unexpected DiskIsAttached call: wrong volumeID")
}
if expected.instanceID != instanceID {
testcase.t.Errorf("unexpected DiskIsAttached call: expected instanceID %s, got %s", expected.instanceID, instanceID)
return false, errors.New("unexpected DiskIsAttached call: wrong instanceID")
}
klog.V(4).Infof("DiskIsAttached call: %s, %s, returning %v, %v", volumeID, instanceID, expected.isAttached, expected.ret)
return expected.isAttached, expected.ret
}
func (testcase *testcase) GetAttachmentDiskPath(instanceID, volumeID string) (string, error) {
expected := &testcase.diskPath
if expected.volumeID == "" && expected.instanceID == "" {
// testcase.diskPath looks uninitialized, test did not expect to
// call GetAttachmentDiskPath
testcase.t.Errorf("unexpected GetAttachmentDiskPath call")
return "", errors.New("unexpected GetAttachmentDiskPath call")
}
if expected.volumeID != volumeID {
testcase.t.Errorf("unexpected GetAttachmentDiskPath call: expected volumeID %s, got %s", expected.volumeID, volumeID)
return "", errors.New("unexpected GetAttachmentDiskPath call: wrong volumeID")
}
if expected.instanceID != instanceID {
testcase.t.Errorf("unexpected GetAttachmentDiskPath call: expected instanceID %s, got %s", expected.instanceID, instanceID)
return "", errors.New("unexpected GetAttachmentDiskPath call: wrong instanceID")
}
klog.V(4).Infof("GetAttachmentDiskPath call: %s, %s, returning %v, %v", volumeID, instanceID, expected.retPath, expected.ret)
return expected.retPath, expected.ret
}
func (testcase *testcase) ShouldTrustDevicePath() bool {
return true
}
func (testcase *testcase) DiskIsAttachedByName(nodeName types.NodeName, volumeID string) (bool, string, error) {
expected := &testcase.diskIsAttached
instanceID := expected.instanceID
// If testcase call DetachDisk*, return false
if *testcase.attachOrDetach == detachStatus {
return false, instanceID, nil
}
// If testcase call AttachDisk*, return true
if *testcase.attachOrDetach == attachStatus {
return true, instanceID, nil
}
if expected.nodeName != nodeName {
testcase.t.Errorf("unexpected DiskIsAttachedByName call: expected nodename %s, got %s", expected.nodeName, nodeName)
return false, instanceID, errors.New("unexpected DiskIsAttachedByName call: wrong nodename")
}
if expected.volumeID == "" && expected.instanceID == "" {
// testcase.diskIsAttached looks uninitialized, test did not expect to
// call DiskIsAttached
testcase.t.Errorf("unexpected DiskIsAttachedByName call")
return false, instanceID, errors.New("unexpected DiskIsAttachedByName call")
}
if expected.volumeID != volumeID {
testcase.t.Errorf("unexpected DiskIsAttachedByName call: expected volumeID %s, got %s", expected.volumeID, volumeID)
return false, instanceID, errors.New("unexpected DiskIsAttachedByName call: wrong volumeID")
}
if expected.instanceID != instanceID {
testcase.t.Errorf("unexpected DiskIsAttachedByName call: expected instanceID %s, got %s", expected.instanceID, instanceID)
return false, instanceID, errors.New("unexpected DiskIsAttachedByName call: wrong instanceID")
}
klog.V(4).Infof("DiskIsAttachedByName call: %s, %s, returning %v, %v, %v", volumeID, nodeName, expected.isAttached, expected.instanceID, expected.ret)
return expected.isAttached, expected.instanceID, expected.ret
}
func (testcase *testcase) CreateVolume(name string, size int, vtype, availability string, tags *map[string]string) (string, string, string, bool, error) {
return "", "", "", false, errors.New("not implemented")
}
func (testcase *testcase) GetDevicePath(volumeID string) string {
return ""
}
func (testcase *testcase) InstanceID() (string, error) {
return testcase.instanceID, nil
}
func (testcase *testcase) ExpandVolume(volumeID string, oldSize resource.Quantity, newSize resource.Quantity) (resource.Quantity, error) {
return resource.Quantity{}, nil
}
func (testcase *testcase) DeleteVolume(volumeID string) error {
return errors.New("not implemented")
}
func (testcase *testcase) GetAutoLabelsForPD(name string) (map[string]string, error) {
return map[string]string{}, errors.New("not implemented")
}
func (testcase *testcase) Instances() (cloudprovider.Instances, bool) {
return &instances{testcase.instanceID}, true
}
func (testcase *testcase) InstancesV2() (cloudprovider.InstancesV2, bool) {
return nil, false
}
func (testcase *testcase) DisksAreAttached(instanceID string, volumeIDs []string) (map[string]bool, error) {
expected := &testcase.disksAreAttached
areAttached := make(map[string]bool)
if len(expected.volumeIDs) == 0 && expected.instanceID == "" {
// testcase.volumeIDs looks uninitialized, test did not expect to call DisksAreAttached
testcase.t.Errorf("Unexpected DisksAreAttached call!")
return areAttached, errors.New("unexpected DisksAreAttached call")
}
if !reflect.DeepEqual(expected.volumeIDs, volumeIDs) {
testcase.t.Errorf("Unexpected DisksAreAttached call: expected volumeIDs %v, got %v", expected.volumeIDs, volumeIDs)
return areAttached, errors.New("unexpected DisksAreAttached call: wrong volumeID")
}
if expected.instanceID != instanceID {
testcase.t.Errorf("Unexpected DisksAreAttached call: expected instanceID %s, got %s", expected.instanceID, instanceID)
return areAttached, errors.New("unexpected DisksAreAttached call: wrong instanceID")
}
klog.V(4).Infof("DisksAreAttached call: %v, %s, returning %v, %v", volumeIDs, instanceID, expected.areAttached, expected.ret)
return expected.areAttached, expected.ret
}
func (testcase *testcase) DisksAreAttachedByName(nodeName types.NodeName, volumeIDs []string) (map[string]bool, error) {
expected := &testcase.disksAreAttached
areAttached := make(map[string]bool)
instanceID := expected.instanceID
if expected.nodeName != nodeName {
testcase.t.Errorf("Unexpected DisksAreAttachedByName call: expected nodeName %s, got %s", expected.nodeName, nodeName)
return areAttached, errors.New("unexpected DisksAreAttachedByName call: wrong nodename")
}
if len(expected.volumeIDs) == 0 && expected.instanceID == "" {
// testcase.volumeIDs looks uninitialized, test did not expect to call DisksAreAttached
testcase.t.Errorf("Unexpected DisksAreAttachedByName call!")
return areAttached, errors.New("unexpected DisksAreAttachedByName call")
}
if !reflect.DeepEqual(expected.volumeIDs, volumeIDs) {
testcase.t.Errorf("Unexpected DisksAreAttachedByName call: expected volumeIDs %v, got %v", expected.volumeIDs, volumeIDs)
return areAttached, errors.New("unexpected DisksAreAttachedByName call: wrong volumeID")
}
if expected.instanceID != instanceID {
testcase.t.Errorf("Unexpected DisksAreAttachedByName call: expected instanceID %s, got %s", expected.instanceID, instanceID)
return areAttached, errors.New("unexpected DisksAreAttachedByName call: wrong instanceID")
}
klog.V(4).Infof("DisksAreAttachedByName call: %v, %s, returning %v, %v", volumeIDs, nodeName, expected.areAttached, expected.ret)
return expected.areAttached, expected.ret
}
// Implementation of fake cloudprovider.Instances
type instances struct {
instanceID string
}
func (instances *instances) NodeAddresses(ctx context.Context, name types.NodeName) ([]v1.NodeAddress, error) {
return []v1.NodeAddress{}, errors.New("not implemented")
}
func (instances *instances) NodeAddressesByProviderID(ctx context.Context, providerID string) ([]v1.NodeAddress, error) {
return []v1.NodeAddress{}, errors.New("not implemented")
}
func (instances *instances) InstanceID(ctx context.Context, name types.NodeName) (string, error) {
return instances.instanceID, nil
}
func (instances *instances) InstanceType(ctx context.Context, name types.NodeName) (string, error) {
return "", errors.New("not implemented")
}
func (instances *instances) InstanceTypeByProviderID(ctx context.Context, providerID string) (string, error) {
return "", errors.New("not implemented")
}
func (instances *instances) InstanceExistsByProviderID(ctx context.Context, providerID string) (bool, error) {
return false, errors.New("unimplemented")
}
func (instances *instances) InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) {
return false, errors.New("unimplemented")
}
func (instances *instances) InstanceMetadataByProviderID(ctx context.Context, providerID string) (*cloudprovider.InstanceMetadata, error) {
return nil, errors.New("unimplemented")
}
func (instances *instances) List(filter string) ([]types.NodeName, error) {
return []types.NodeName{}, errors.New("not implemented")
}
func (instances *instances) AddSSHKeyToAllInstances(ctx context.Context, user string, keyData []byte) error {
return cloudprovider.NotImplemented
}
func (instances *instances) CurrentNodeName(ctx context.Context, hostname string) (types.NodeName, error) {
return "", errors.New("not implemented")
}

635
pkg/volume/cinder/cinder.go Normal file
View File

@@ -0,0 +1,635 @@
//go:build !providerless
// +build !providerless
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cinder
import (
"errors"
"fmt"
"os"
"path"
"path/filepath"
"k8s.io/klog/v2"
"k8s.io/mount-utils"
"k8s.io/utils/keymutex"
utilstrings "k8s.io/utils/strings"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/legacy-cloud-providers/openstack"
)
const (
// DefaultCloudConfigPath is the default path for cloud configuration
DefaultCloudConfigPath = "/etc/kubernetes/cloud-config"
)
// ProbeVolumePlugins is the primary entrypoint for volume plugins.
func ProbeVolumePlugins() []volume.VolumePlugin {
return []volume.VolumePlugin{&cinderPlugin{}}
}
// BlockStorageProvider is the interface for accessing cinder functionality.
type BlockStorageProvider interface {
AttachDisk(instanceID, volumeID string) (string, error)
DetachDisk(instanceID, volumeID string) error
DeleteVolume(volumeID string) error
CreateVolume(name string, size int, vtype, availability string, tags *map[string]string) (string, string, string, bool, error)
GetDevicePath(volumeID string) string
InstanceID() (string, error)
GetAttachmentDiskPath(instanceID, volumeID string) (string, error)
OperationPending(diskName string) (bool, string, error)
DiskIsAttached(instanceID, volumeID string) (bool, error)
DiskIsAttachedByName(nodeName types.NodeName, volumeID string) (bool, string, error)
DisksAreAttachedByName(nodeName types.NodeName, volumeIDs []string) (map[string]bool, error)
ShouldTrustDevicePath() bool
Instances() (cloudprovider.Instances, bool)
ExpandVolume(volumeID string, oldSize resource.Quantity, newSize resource.Quantity) (resource.Quantity, error)
}
type cinderPlugin struct {
host volume.VolumeHost
// Guarding SetUp and TearDown operations
volumeLocks keymutex.KeyMutex
}
var _ volume.VolumePlugin = &cinderPlugin{}
var _ volume.PersistentVolumePlugin = &cinderPlugin{}
var _ volume.DeletableVolumePlugin = &cinderPlugin{}
var _ volume.ProvisionableVolumePlugin = &cinderPlugin{}
const (
cinderVolumePluginName = "kubernetes.io/cinder"
)
func getPath(uid types.UID, volName string, host volume.VolumeHost) string {
return host.GetPodVolumeDir(uid, utilstrings.EscapeQualifiedName(cinderVolumePluginName), volName)
}
func (plugin *cinderPlugin) Init(host volume.VolumeHost) error {
plugin.host = host
plugin.volumeLocks = keymutex.NewHashed(0)
return nil
}
func (plugin *cinderPlugin) GetPluginName() string {
return cinderVolumePluginName
}
func (plugin *cinderPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
volumeID, _, _, err := getVolumeInfo(spec)
if err != nil {
return "", err
}
return volumeID, nil
}
func (plugin *cinderPlugin) CanSupport(spec *volume.Spec) bool {
return (spec.Volume != nil && spec.Volume.Cinder != nil) || (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Cinder != nil)
}
func (plugin *cinderPlugin) RequiresRemount(spec *volume.Spec) bool {
return false
}
func (plugin *cinderPlugin) SupportsMountOption() bool {
return true
}
func (plugin *cinderPlugin) SupportsBulkVolumeVerification() bool {
return false
}
func (plugin *cinderPlugin) SupportsSELinuxContextMount(spec *volume.Spec) (bool, error) {
return false, nil
}
var _ volume.VolumePluginWithAttachLimits = &cinderPlugin{}
func (plugin *cinderPlugin) GetVolumeLimits() (map[string]int64, error) {
volumeLimits := map[string]int64{
util.CinderVolumeLimitKey: util.DefaultMaxCinderVolumes,
}
cloud := plugin.host.GetCloudProvider()
// if we can't fetch cloudprovider we return an error
// hoping external CCM or admin can set it. Returning
// default values from here will mean, no one can
// override them.
if cloud == nil {
return nil, fmt.Errorf("no cloudprovider present")
}
if cloud.ProviderName() != openstack.ProviderName {
return nil, fmt.Errorf("expected Openstack cloud, found %s", cloud.ProviderName())
}
openstackCloud, ok := cloud.(*openstack.OpenStack)
if ok && openstackCloud.NodeVolumeAttachLimit() > 0 {
volumeLimits[util.CinderVolumeLimitKey] = int64(openstackCloud.NodeVolumeAttachLimit())
}
return volumeLimits, nil
}
func (plugin *cinderPlugin) VolumeLimitKey(spec *volume.Spec) string {
return util.CinderVolumeLimitKey
}
func (plugin *cinderPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
return []v1.PersistentVolumeAccessMode{
v1.ReadWriteOnce,
}
}
func (plugin *cinderPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
return plugin.newMounterInternal(spec, pod.UID, &DiskUtil{}, plugin.host.GetMounter(plugin.GetPluginName()))
}
func (plugin *cinderPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager cdManager, mounter mount.Interface) (volume.Mounter, error) {
pdName, fsType, readOnly, err := getVolumeInfo(spec)
if err != nil {
return nil, err
}
return &cinderVolumeMounter{
cinderVolume: &cinderVolume{
podUID: podUID,
volName: spec.Name(),
pdName: pdName,
mounter: mounter,
manager: manager,
plugin: plugin,
MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, spec.Name(), plugin.host)),
},
fsType: fsType,
readOnly: readOnly,
blockDeviceMounter: util.NewSafeFormatAndMountFromHost(plugin.GetPluginName(), plugin.host),
mountOptions: util.MountOptionFromSpec(spec),
}, nil
}
func (plugin *cinderPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
return plugin.newUnmounterInternal(volName, podUID, &DiskUtil{}, plugin.host.GetMounter(plugin.GetPluginName()))
}
func (plugin *cinderPlugin) newUnmounterInternal(volName string, podUID types.UID, manager cdManager, mounter mount.Interface) (volume.Unmounter, error) {
return &cinderVolumeUnmounter{
&cinderVolume{
podUID: podUID,
volName: volName,
manager: manager,
mounter: mounter,
plugin: plugin,
MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, volName, plugin.host)),
}}, nil
}
func (plugin *cinderPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) {
return plugin.newDeleterInternal(spec, &DiskUtil{})
}
func (plugin *cinderPlugin) newDeleterInternal(spec *volume.Spec, manager cdManager) (volume.Deleter, error) {
if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Cinder == nil {
return nil, fmt.Errorf("spec.PersistentVolumeSource.Cinder is nil")
}
return &cinderVolumeDeleter{
&cinderVolume{
volName: spec.Name(),
pdName: spec.PersistentVolume.Spec.Cinder.VolumeID,
manager: manager,
plugin: plugin,
}}, nil
}
func (plugin *cinderPlugin) NewProvisioner(options volume.VolumeOptions) (volume.Provisioner, error) {
return plugin.newProvisionerInternal(options, &DiskUtil{})
}
func (plugin *cinderPlugin) newProvisionerInternal(options volume.VolumeOptions, manager cdManager) (volume.Provisioner, error) {
return &cinderVolumeProvisioner{
cinderVolume: &cinderVolume{
manager: manager,
plugin: plugin,
},
options: options,
}, nil
}
func (plugin *cinderPlugin) getCloudProvider() (BlockStorageProvider, error) {
cloud := plugin.host.GetCloudProvider()
if cloud == nil {
if _, err := os.Stat(DefaultCloudConfigPath); err == nil {
var config *os.File
config, err = os.Open(DefaultCloudConfigPath)
if err != nil {
return nil, fmt.Errorf("unable to load OpenStack configuration from default path : %v", err)
}
defer config.Close()
cloud, err = cloudprovider.GetCloudProvider(openstack.ProviderName, config)
if err != nil {
return nil, fmt.Errorf("unable to create OpenStack cloud provider from default path : %v", err)
}
} else {
return nil, fmt.Errorf("OpenStack cloud provider was not initialized properly : %v", err)
}
}
switch cloud := cloud.(type) {
case *openstack.OpenStack:
return cloud, nil
default:
return nil, errors.New("invalid cloud provider: expected OpenStack")
}
}
func (plugin *cinderPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
mounter := plugin.host.GetMounter(plugin.GetPluginName())
kvh, ok := plugin.host.(volume.KubeletVolumeHost)
if !ok {
return nil, fmt.Errorf("plugin volume host does not implement KubeletVolumeHost interface")
}
hu := kvh.GetHostUtil()
pluginMntDir := util.GetPluginMountDir(plugin.host, plugin.GetPluginName())
sourceName, err := hu.GetDeviceNameFromMount(mounter, mountPath, pluginMntDir)
if err != nil {
return nil, err
}
klog.V(4).Infof("Found volume %s mounted to %s", sourceName, mountPath)
cinderVolume := &v1.Volume{
Name: volumeName,
VolumeSource: v1.VolumeSource{
Cinder: &v1.CinderVolumeSource{
VolumeID: sourceName,
},
},
}
return volume.NewSpecFromVolume(cinderVolume), nil
}
var _ volume.ExpandableVolumePlugin = &cinderPlugin{}
func (plugin *cinderPlugin) ExpandVolumeDevice(spec *volume.Spec, newSize resource.Quantity, oldSize resource.Quantity) (resource.Quantity, error) {
volumeID, _, _, err := getVolumeInfo(spec)
if err != nil {
return oldSize, err
}
cloud, err := plugin.getCloudProvider()
if err != nil {
return oldSize, err
}
expandedSize, err := cloud.ExpandVolume(volumeID, oldSize, newSize)
if err != nil {
return oldSize, err
}
klog.V(2).Infof("volume %s expanded to new size %d successfully", volumeID, int(newSize.Value()))
return expandedSize, nil
}
func (plugin *cinderPlugin) NodeExpand(resizeOptions volume.NodeResizeOptions) (bool, error) {
fsVolume, err := util.CheckVolumeModeFilesystem(resizeOptions.VolumeSpec)
if err != nil {
return false, fmt.Errorf("error checking VolumeMode: %v", err)
}
// if volume is not a fs file system, there is nothing for us to do here.
if !fsVolume {
return true, nil
}
_, err = util.GenericResizeFS(plugin.host, plugin.GetPluginName(), resizeOptions.DevicePath, resizeOptions.DeviceMountPath)
if err != nil {
return false, err
}
return true, nil
}
var _ volume.NodeExpandableVolumePlugin = &cinderPlugin{}
func (plugin *cinderPlugin) RequiresFSResize() bool {
return true
}
// Abstract interface to PD operations.
type cdManager interface {
// Attaches the disk to the kubelet's host machine.
AttachDisk(mounter *cinderVolumeMounter, globalPDPath string) error
// Detaches the disk from the kubelet's host machine.
DetachDisk(unmounter *cinderVolumeUnmounter) error
// Creates a volume
CreateVolume(provisioner *cinderVolumeProvisioner, node *v1.Node, allowedTopologies []v1.TopologySelectorTerm) (volumeID string, volumeSizeGB int, labels map[string]string, fstype string, err error)
// Deletes a volume
DeleteVolume(deleter *cinderVolumeDeleter) error
}
var _ volume.Mounter = &cinderVolumeMounter{}
type cinderVolumeMounter struct {
*cinderVolume
fsType string
readOnly bool
blockDeviceMounter *mount.SafeFormatAndMount
mountOptions []string
}
// cinderPersistentDisk volumes are disk resources provided by C3
// that are attached to the kubelet's host machine and exposed to the pod.
type cinderVolume struct {
volName string
podUID types.UID
// Unique identifier of the volume, used to find the disk resource in the provider.
pdName string
// Filesystem type, optional.
fsType string
// Utility interface that provides API calls to the provider to attach/detach disks.
manager cdManager
// Mounter interface that provides system calls to mount the global path to the pod local path.
mounter mount.Interface
plugin *cinderPlugin
volume.MetricsProvider
}
func (b *cinderVolumeMounter) GetAttributes() volume.Attributes {
return volume.Attributes{
ReadOnly: b.readOnly,
Managed: !b.readOnly,
SELinuxRelabel: true,
}
}
func (b *cinderVolumeMounter) SetUp(mounterArgs volume.MounterArgs) error {
return b.SetUpAt(b.GetPath(), mounterArgs)
}
// SetUp bind mounts to the volume path.
func (b *cinderVolumeMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
klog.V(5).Infof("Cinder SetUp %s to %s", b.pdName, dir)
b.plugin.volumeLocks.LockKey(b.pdName)
defer b.plugin.volumeLocks.UnlockKey(b.pdName)
notmnt, err := b.mounter.IsLikelyNotMountPoint(dir)
if err != nil && !os.IsNotExist(err) {
klog.Errorf("Cannot validate mount point: %s %v", dir, err)
return err
}
if !notmnt {
klog.V(4).Infof("Something is already mounted to target %s", dir)
return nil
}
globalPDPath := makeGlobalPDName(b.plugin.host, b.pdName)
options := []string{"bind"}
if b.readOnly {
options = append(options, "ro")
}
if err := os.MkdirAll(dir, 0750); err != nil {
klog.V(4).Infof("Could not create directory %s: %v", dir, err)
return err
}
mountOptions := util.JoinMountOptions(options, b.mountOptions)
// Perform a bind mount to the full path to allow duplicate mounts of the same PD.
klog.V(4).Infof("Attempting to mount cinder volume %s to %s with options %v", b.pdName, dir, mountOptions)
err = b.mounter.MountSensitiveWithoutSystemd(globalPDPath, dir, "", options, nil)
if err != nil {
klog.V(4).Infof("Mount failed: %v", err)
notmnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir)
if mntErr != nil {
klog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)
return err
}
if !notmnt {
if mntErr = b.mounter.Unmount(dir); mntErr != nil {
klog.Errorf("Failed to unmount: %v", mntErr)
return err
}
notmnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir)
if mntErr != nil {
klog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)
return err
}
if !notmnt {
// This is very odd, we don't expect it. We'll try again next sync loop.
klog.Errorf("%s is still mounted, despite call to unmount(). Will try again next sync loop.", b.GetPath())
return err
}
}
os.Remove(dir)
klog.Errorf("Failed to mount %s: %v", dir, err)
return err
}
if !b.readOnly {
volume.SetVolumeOwnership(b, mounterArgs.FsGroup, mounterArgs.FSGroupChangePolicy, util.FSGroupCompleteHook(b.plugin, nil))
}
klog.V(3).Infof("Cinder volume %s mounted to %s", b.pdName, dir)
return nil
}
func makeGlobalPDName(host volume.VolumeHost, devName string) string {
return filepath.Join(host.GetPluginDir(cinderVolumePluginName), util.MountsInGlobalPDPath, devName)
}
func (cd *cinderVolume) GetPath() string {
return getPath(cd.podUID, cd.volName, cd.plugin.host)
}
type cinderVolumeUnmounter struct {
*cinderVolume
}
var _ volume.Unmounter = &cinderVolumeUnmounter{}
func (c *cinderVolumeUnmounter) TearDown() error {
return c.TearDownAt(c.GetPath())
}
// Unmounts the bind mount, and detaches the disk only if the PD
// resource was the last reference to that disk on the kubelet.
func (c *cinderVolumeUnmounter) TearDownAt(dir string) error {
if pathExists, pathErr := mount.PathExists(dir); pathErr != nil {
return fmt.Errorf("error checking if path exists: %v", pathErr)
} else if !pathExists {
klog.Warningf("Warning: Unmount skipped because path does not exist: %w", dir)
return nil
}
klog.V(5).Infof("Cinder TearDown of %s", dir)
notmnt, err := c.mounter.IsLikelyNotMountPoint(dir)
if err != nil {
klog.V(4).Infof("IsLikelyNotMountPoint check failed: %v", err)
return err
}
if notmnt {
klog.V(4).Infof("Nothing is mounted to %s, ignoring", dir)
return os.Remove(dir)
}
// Find Cinder volumeID to lock the right volume
// TODO: refactor VolumePlugin.NewUnmounter to get full volume.Spec just like
// NewMounter. We could then find volumeID there without probing MountRefs.
refs, err := c.mounter.GetMountRefs(dir)
if err != nil {
klog.V(4).Infof("GetMountRefs failed: %v", err)
return err
}
if len(refs) == 0 {
klog.V(4).Infof("Directory %s is not mounted", dir)
return fmt.Errorf("directory %s is not mounted", dir)
}
c.pdName = path.Base(refs[0])
klog.V(4).Infof("Found volume %s mounted to %s", c.pdName, dir)
// lock the volume (and thus wait for any concurrent SetUpAt to finish)
c.plugin.volumeLocks.LockKey(c.pdName)
defer c.plugin.volumeLocks.UnlockKey(c.pdName)
// Reload list of references, there might be SetUpAt finished in the meantime
_, err = c.mounter.GetMountRefs(dir)
if err != nil {
klog.V(4).Infof("GetMountRefs failed: %v", err)
return err
}
if err := c.mounter.Unmount(dir); err != nil {
klog.V(4).Infof("Unmount failed: %v", err)
return err
}
klog.V(3).Infof("Successfully unmounted: %s\n", dir)
notmnt, mntErr := c.mounter.IsLikelyNotMountPoint(dir)
if mntErr != nil {
klog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)
return err
}
if notmnt {
if err := os.Remove(dir); err != nil {
klog.V(4).Infof("Failed to remove directory after unmount: %v", err)
return err
}
}
return nil
}
type cinderVolumeDeleter struct {
*cinderVolume
}
var _ volume.Deleter = &cinderVolumeDeleter{}
func (r *cinderVolumeDeleter) GetPath() string {
return getPath(r.podUID, r.volName, r.plugin.host)
}
func (r *cinderVolumeDeleter) Delete() error {
return r.manager.DeleteVolume(r)
}
type cinderVolumeProvisioner struct {
*cinderVolume
options volume.VolumeOptions
}
var _ volume.Provisioner = &cinderVolumeProvisioner{}
func (c *cinderVolumeProvisioner) Provision(selectedNode *v1.Node, allowedTopologies []v1.TopologySelectorTerm) (*v1.PersistentVolume, error) {
if !util.ContainsAllAccessModes(c.plugin.GetAccessModes(), c.options.PVC.Spec.AccessModes) {
return nil, fmt.Errorf("invalid AccessModes %v: only AccessModes %v are supported", c.options.PVC.Spec.AccessModes, c.plugin.GetAccessModes())
}
volumeID, sizeGB, labels, fstype, err := c.manager.CreateVolume(c, selectedNode, allowedTopologies)
if err != nil {
return nil, err
}
if fstype == "" {
fstype = "ext4"
}
volumeMode := c.options.PVC.Spec.VolumeMode
if volumeMode != nil && *volumeMode == v1.PersistentVolumeBlock {
// Block volumes should not have any FSType
fstype = ""
}
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: c.options.PVName,
Labels: labels,
Annotations: map[string]string{
util.VolumeDynamicallyCreatedByKey: "cinder-dynamic-provisioner",
},
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeReclaimPolicy: c.options.PersistentVolumeReclaimPolicy,
AccessModes: c.options.PVC.Spec.AccessModes,
Capacity: v1.ResourceList{
v1.ResourceName(v1.ResourceStorage): resource.MustParse(fmt.Sprintf("%dGi", sizeGB)),
},
VolumeMode: volumeMode,
PersistentVolumeSource: v1.PersistentVolumeSource{
Cinder: &v1.CinderPersistentVolumeSource{
VolumeID: volumeID,
FSType: fstype,
ReadOnly: false,
},
},
MountOptions: c.options.MountOptions,
},
}
if len(c.options.PVC.Spec.AccessModes) == 0 {
pv.Spec.AccessModes = c.plugin.GetAccessModes()
}
requirements := make([]v1.NodeSelectorRequirement, 0)
for k, v := range labels {
if v != "" {
requirements = append(requirements, v1.NodeSelectorRequirement{Key: k, Operator: v1.NodeSelectorOpIn, Values: []string{v}})
}
}
if len(requirements) > 0 {
pv.Spec.NodeAffinity = new(v1.VolumeNodeAffinity)
pv.Spec.NodeAffinity.Required = new(v1.NodeSelector)
pv.Spec.NodeAffinity.Required.NodeSelectorTerms = make([]v1.NodeSelectorTerm, 1)
pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions = requirements
}
return pv, nil
}
func getVolumeInfo(spec *volume.Spec) (string, string, bool, error) {
if spec.Volume != nil && spec.Volume.Cinder != nil {
return spec.Volume.Cinder.VolumeID, spec.Volume.Cinder.FSType, spec.Volume.Cinder.ReadOnly, nil
} else if spec.PersistentVolume != nil &&
spec.PersistentVolume.Spec.Cinder != nil {
return spec.PersistentVolume.Spec.Cinder.VolumeID, spec.PersistentVolume.Spec.Cinder.FSType, spec.ReadOnly, nil
}
return "", "", false, fmt.Errorf("Spec does not reference a Cinder volume type")
}

View File

@@ -0,0 +1,179 @@
//go:build !providerless
// +build !providerless
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cinder
import (
"fmt"
"path/filepath"
"k8s.io/klog/v2"
"k8s.io/mount-utils"
utilstrings "k8s.io/utils/strings"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
)
var _ volume.VolumePlugin = &cinderPlugin{}
var _ volume.PersistentVolumePlugin = &cinderPlugin{}
var _ volume.BlockVolumePlugin = &cinderPlugin{}
var _ volume.DeletableVolumePlugin = &cinderPlugin{}
var _ volume.ProvisionableVolumePlugin = &cinderPlugin{}
var _ volume.ExpandableVolumePlugin = &cinderPlugin{}
func (plugin *cinderPlugin) ConstructBlockVolumeSpec(podUID types.UID, volumeName, mapPath string) (*volume.Spec, error) {
pluginDir := plugin.host.GetVolumeDevicePluginDir(cinderVolumePluginName)
blkutil := volumepathhandler.NewBlockVolumePathHandler()
globalMapPathUUID, err := blkutil.FindGlobalMapPathUUIDFromPod(pluginDir, mapPath, podUID)
if err != nil {
return nil, err
}
klog.V(5).Infof("globalMapPathUUID: %v, err: %v", globalMapPathUUID, err)
globalMapPath := filepath.Dir(globalMapPathUUID)
if len(globalMapPath) <= 1 {
return nil, fmt.Errorf("failed to get volume plugin information from globalMapPathUUID: %v", globalMapPathUUID)
}
return getVolumeSpecFromGlobalMapPath(volumeName, globalMapPath)
}
func getVolumeSpecFromGlobalMapPath(volumeName, globalMapPath string) (*volume.Spec, error) {
// Get volume spec information from globalMapPath
// globalMapPath example:
// plugins/kubernetes.io/{PluginName}/{DefaultKubeletVolumeDevicesDirName}/{volumeID}
// plugins/kubernetes.io/cinder/volumeDevices/vol-XXXXXX
vID := filepath.Base(globalMapPath)
if len(vID) <= 1 {
return nil, fmt.Errorf("failed to get volumeID from global path=%s", globalMapPath)
}
block := v1.PersistentVolumeBlock
cinderVolume := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: volumeName,
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
Cinder: &v1.CinderPersistentVolumeSource{
VolumeID: vID,
},
},
VolumeMode: &block,
},
}
return volume.NewSpecFromPersistentVolume(cinderVolume, true), nil
}
// NewBlockVolumeMapper creates a new volume.BlockVolumeMapper from an API specification.
func (plugin *cinderPlugin) NewBlockVolumeMapper(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.BlockVolumeMapper, error) {
// If this is called via GenerateUnmapDeviceFunc(), pod is nil.
// Pass empty string as dummy uid since uid isn't used in the case.
var uid types.UID
if pod != nil {
uid = pod.UID
}
return plugin.newBlockVolumeMapperInternal(spec, uid, &DiskUtil{}, plugin.host.GetMounter(plugin.GetPluginName()))
}
func (plugin *cinderPlugin) newBlockVolumeMapperInternal(spec *volume.Spec, podUID types.UID, manager cdManager, mounter mount.Interface) (volume.BlockVolumeMapper, error) {
pdName, fsType, readOnly, err := getVolumeInfo(spec)
if err != nil {
return nil, err
}
mapper := &cinderVolumeMapper{
cinderVolume: &cinderVolume{
podUID: podUID,
volName: spec.Name(),
pdName: pdName,
fsType: fsType,
manager: manager,
mounter: mounter,
plugin: plugin,
},
readOnly: readOnly,
}
blockPath, err := mapper.GetGlobalMapPath(spec)
if err != nil {
return nil, fmt.Errorf("failed to get device path: %v", err)
}
mapper.MetricsProvider = volume.NewMetricsBlock(filepath.Join(blockPath, string(podUID)))
return mapper, nil
}
func (plugin *cinderPlugin) NewBlockVolumeUnmapper(volName string, podUID types.UID) (volume.BlockVolumeUnmapper, error) {
return plugin.newUnmapperInternal(volName, podUID, &DiskUtil{}, plugin.host.GetMounter(plugin.GetPluginName()))
}
func (plugin *cinderPlugin) newUnmapperInternal(volName string, podUID types.UID, manager cdManager, mounter mount.Interface) (volume.BlockVolumeUnmapper, error) {
return &cinderPluginUnmapper{
cinderVolume: &cinderVolume{
podUID: podUID,
volName: volName,
manager: manager,
mounter: mounter,
plugin: plugin,
}}, nil
}
type cinderPluginUnmapper struct {
*cinderVolume
volume.MetricsNil
}
var _ volume.BlockVolumeUnmapper = &cinderPluginUnmapper{}
type cinderVolumeMapper struct {
*cinderVolume
readOnly bool
}
var _ volume.BlockVolumeMapper = &cinderVolumeMapper{}
// GetGlobalMapPath returns global map path and error
// path: plugins/kubernetes.io/{PluginName}/volumeDevices/volumeID
//
// plugins/kubernetes.io/cinder/volumeDevices/vol-XXXXXX
func (cd *cinderVolume) GetGlobalMapPath(spec *volume.Spec) (string, error) {
pdName, _, _, err := getVolumeInfo(spec)
if err != nil {
return "", err
}
return filepath.Join(cd.plugin.host.GetVolumeDevicePluginDir(cinderVolumePluginName), pdName), nil
}
// GetPodDeviceMapPath returns pod device map path and volume name
// path: pods/{podUid}/volumeDevices/kubernetes.io~cinder
func (cd *cinderVolume) GetPodDeviceMapPath() (string, string) {
name := cinderVolumePluginName
return cd.plugin.host.GetPodVolumeDeviceDir(cd.podUID, utilstrings.EscapeQualifiedName(name)), cd.volName
}
// SupportsMetrics returns true for cinderVolumeMapper as it initializes the
// MetricsProvider.
func (cvm *cinderVolumeMapper) SupportsMetrics() bool {
return true
}

View File

@@ -0,0 +1,151 @@
//go:build !providerless
// +build !providerless
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cinder
import (
"os"
"path/filepath"
"testing"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utiltesting "k8s.io/client-go/util/testing"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
)
const (
testVolName = "vol-1234"
testPVName = "pv1"
testGlobalPath = "plugins/kubernetes.io/cinder/volumeDevices/vol-1234"
testPodPath = "pods/poduid/volumeDevices/kubernetes.io~cinder"
)
func TestGetVolumeSpecFromGlobalMapPath(t *testing.T) {
// make our test path for fake GlobalMapPath
// /tmp symbolized our pluginDir
// /tmp/testGlobalPathXXXXX/plugins/kubernetes.io/cinder/volumeDevices/pdVol1
tmpVDir, err := utiltesting.MkTmpdir("cinderBlockTest")
if err != nil {
t.Fatalf("can't make a temp dir: %v", err)
}
//deferred clean up
defer os.RemoveAll(tmpVDir)
expectedGlobalPath := filepath.Join(tmpVDir, testGlobalPath)
//Bad Path
badspec, err := getVolumeSpecFromGlobalMapPath("", "")
if badspec != nil || err == nil {
t.Errorf("Expected not to get spec from GlobalMapPath but did")
}
// Good Path
spec, err := getVolumeSpecFromGlobalMapPath("myVolume", expectedGlobalPath)
if spec == nil || err != nil {
t.Fatalf("Failed to get spec from GlobalMapPath: %v", err)
}
if spec.PersistentVolume.Name != "myVolume" {
t.Errorf("Invalid PV name from GlobalMapPath spec: %s", spec.PersistentVolume.Name)
}
if spec.PersistentVolume.Spec.Cinder.VolumeID != testVolName {
t.Errorf("Invalid volumeID from GlobalMapPath spec: %s", spec.PersistentVolume.Spec.Cinder.VolumeID)
}
block := v1.PersistentVolumeBlock
specMode := spec.PersistentVolume.Spec.VolumeMode
if specMode == nil {
t.Fatalf("Failed to get volumeMode from PersistentVolumeBlock")
}
if *specMode != block {
t.Errorf("Invalid volumeMode from GlobalMapPath spec: %v expected: %v", *specMode, block)
}
}
func getTestVolume(readOnly bool, isBlock bool) *volume.Spec {
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: testPVName,
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
Cinder: &v1.CinderPersistentVolumeSource{
VolumeID: testVolName,
},
},
},
}
if isBlock {
blockMode := v1.PersistentVolumeBlock
pv.Spec.VolumeMode = &blockMode
}
return volume.NewSpecFromPersistentVolume(pv, readOnly)
}
func TestGetPodAndPluginMapPaths(t *testing.T) {
tmpVDir, err := utiltesting.MkTmpdir("cinderBlockTest")
if err != nil {
t.Fatalf("can't make a temp dir: %v", err)
}
//deferred clean up
defer os.RemoveAll(tmpVDir)
expectedGlobalPath := filepath.Join(tmpVDir, testGlobalPath)
expectedPodPath := filepath.Join(tmpVDir, testPodPath)
spec := getTestVolume(false, true /*isBlock*/)
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(t, tmpVDir, nil, nil))
plug, err := plugMgr.FindMapperPluginByName(cinderVolumePluginName)
if err != nil {
os.RemoveAll(tmpVDir)
t.Fatalf("Can't find the plugin by name: %q", cinderVolumePluginName)
}
if plug.GetPluginName() != cinderVolumePluginName {
t.Fatalf("Wrong name: %s", plug.GetPluginName())
}
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: types.UID("poduid")}}
mapper, err := plug.NewBlockVolumeMapper(spec, pod, volume.VolumeOptions{})
if err != nil {
t.Fatalf("Failed to make a new Mounter: %v", err)
}
if mapper == nil {
t.Fatalf("Got a nil Mounter")
}
//GetGlobalMapPath
gMapPath, err := mapper.GetGlobalMapPath(spec)
if err != nil || len(gMapPath) == 0 {
t.Fatalf("Invalid GlobalMapPath from spec: %s", spec.PersistentVolume.Spec.Cinder.VolumeID)
}
if gMapPath != expectedGlobalPath {
t.Errorf("Failed to get GlobalMapPath: %s %s", gMapPath, expectedGlobalPath)
}
//GetPodDeviceMapPath
gDevicePath, gVolName := mapper.GetPodDeviceMapPath()
if gDevicePath != expectedPodPath {
t.Errorf("Got unexpected pod path: %s, expected %s", gDevicePath, expectedPodPath)
}
if gVolName != testPVName {
t.Errorf("Got unexpected volNamne: %s, expected %s", gVolName, testPVName)
}
}

View File

@@ -0,0 +1,365 @@
//go:build !providerless
// +build !providerless
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cinder
import (
"fmt"
"os"
"path/filepath"
"testing"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
utiltesting "k8s.io/client-go/util/testing"
"k8s.io/mount-utils"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/legacy-cloud-providers/openstack"
)
func TestCanSupport(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("cinderTest")
if err != nil {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeKubeletVolumeHost(t, tmpDir, nil, nil))
plug, err := plugMgr.FindPluginByName("kubernetes.io/cinder")
if err != nil {
t.Fatal("Can't find the plugin by name")
}
if plug.GetPluginName() != "kubernetes.io/cinder" {
t.Errorf("Wrong name: %s", plug.GetPluginName())
}
if !plug.CanSupport(&volume.Spec{Volume: &v1.Volume{VolumeSource: v1.VolumeSource{Cinder: &v1.CinderVolumeSource{}}}}) {
t.Errorf("Expected true")
}
if !plug.CanSupport(&volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{PersistentVolumeSource: v1.PersistentVolumeSource{Cinder: &v1.CinderPersistentVolumeSource{}}}}}) {
t.Errorf("Expected true")
}
}
type fakePDManager struct {
// How long should AttachDisk/DetachDisk take - we need slower AttachDisk in a test.
attachDetachDuration time.Duration
}
func getFakeDeviceName(host volume.VolumeHost, pdName string) string {
return filepath.Join(host.GetPluginDir(cinderVolumePluginName), "device", pdName)
}
// Real Cinder AttachDisk attaches a cinder volume. If it is not yet mounted,
// it mounts it to globalPDPath.
// We create a dummy directory (="device") and bind-mount it to globalPDPath
func (fake *fakePDManager) AttachDisk(b *cinderVolumeMounter, globalPDPath string) error {
globalPath := makeGlobalPDName(b.plugin.host, b.pdName)
fakeDeviceName := getFakeDeviceName(b.plugin.host, b.pdName)
err := os.MkdirAll(fakeDeviceName, 0750)
if err != nil {
return err
}
// Attaching a Cinder volume can be slow...
time.Sleep(fake.attachDetachDuration)
// The volume is "attached", bind-mount it if it's not mounted yet.
notmnt, err := b.mounter.IsLikelyNotMountPoint(globalPath)
if err != nil {
if os.IsNotExist(err) {
if err := os.MkdirAll(globalPath, 0750); err != nil {
return err
}
notmnt = true
} else {
return err
}
}
if notmnt {
err = b.mounter.MountSensitiveWithoutSystemd(fakeDeviceName, globalPath, "", []string{"bind"}, nil)
if err != nil {
return err
}
}
return nil
}
func (fake *fakePDManager) DetachDisk(c *cinderVolumeUnmounter) error {
globalPath := makeGlobalPDName(c.plugin.host, c.pdName)
fakeDeviceName := getFakeDeviceName(c.plugin.host, c.pdName)
// unmount the bind-mount - should be fast
err := c.mounter.Unmount(globalPath)
if err != nil {
return err
}
// "Detach" the fake "device"
err = os.RemoveAll(fakeDeviceName)
if err != nil {
return err
}
return nil
}
func (fake *fakePDManager) CreateVolume(c *cinderVolumeProvisioner, node *v1.Node, allowedTopologies []v1.TopologySelectorTerm) (volumeID string, volumeSizeGB int, labels map[string]string, fstype string, err error) {
labels = make(map[string]string)
labels[v1.LabelTopologyZone] = "nova"
return "test-volume-name", 1, labels, "", nil
}
func (fake *fakePDManager) DeleteVolume(cd *cinderVolumeDeleter) error {
if cd.pdName != "test-volume-name" {
return fmt.Errorf("Deleter got unexpected volume name: %s", cd.pdName)
}
return nil
}
func TestPlugin(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("cinderTest")
if err != nil {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeKubeletVolumeHost(t, tmpDir, nil, nil))
plug, err := plugMgr.FindPluginByName("kubernetes.io/cinder")
if err != nil {
t.Errorf("Can't find the plugin by name")
}
spec := &v1.Volume{
Name: "vol1",
VolumeSource: v1.VolumeSource{
Cinder: &v1.CinderVolumeSource{
VolumeID: "pd",
FSType: "ext4",
},
},
}
mounter, err := plug.(*cinderPlugin).newMounterInternal(volume.NewSpecFromVolume(spec), types.UID("poduid"), &fakePDManager{0}, mount.NewFakeMounter(nil))
if err != nil {
t.Errorf("Failed to make a new Mounter: %v", err)
}
if mounter == nil {
t.Errorf("Got a nil Mounter")
}
volPath := filepath.Join(tmpDir, "pods/poduid/volumes/kubernetes.io~cinder/vol1")
path := mounter.GetPath()
if path != volPath {
t.Errorf("Got unexpected path: %s", path)
}
if err := mounter.SetUp(volume.MounterArgs{}); err != nil {
t.Errorf("Expected success, got: %v", err)
}
if _, err := os.Stat(path); err != nil {
if os.IsNotExist(err) {
t.Errorf("SetUp() failed, volume path not created: %s", path)
} else {
t.Errorf("SetUp() failed: %v", err)
}
}
unmounter, err := plug.(*cinderPlugin).newUnmounterInternal("vol1", types.UID("poduid"), &fakePDManager{0}, mount.NewFakeMounter(nil))
if err != nil {
t.Errorf("Failed to make a new Unmounter: %v", err)
}
if unmounter == nil {
t.Errorf("Got a nil Unmounter")
}
if err := unmounter.TearDown(); err != nil {
t.Errorf("Expected success, got: %v", err)
}
if _, err := os.Stat(path); err == nil {
t.Errorf("TearDown() failed, volume path still exists: %s", path)
} else if !os.IsNotExist(err) {
t.Errorf("TearDown() failed: %v", err)
}
// Test Provisioner
options := volume.VolumeOptions{
PVC: volumetest.CreateTestPVC("100Mi", []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}),
PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimDelete,
}
provisioner, err := plug.(*cinderPlugin).newProvisionerInternal(options, &fakePDManager{0})
if err != nil {
t.Errorf("ProvisionerInternal() failed: %v", err)
}
persistentSpec, err := provisioner.Provision(nil, nil)
if err != nil {
t.Errorf("Provision() failed: %v", err)
}
if persistentSpec.Spec.PersistentVolumeSource.Cinder.VolumeID != "test-volume-name" {
t.Errorf("Provision() returned unexpected volume ID: %s", persistentSpec.Spec.PersistentVolumeSource.Cinder.VolumeID)
}
cap := persistentSpec.Spec.Capacity[v1.ResourceStorage]
size := cap.Value()
if size != 1024*1024*1024 {
t.Errorf("Provision() returned unexpected volume size: %v", size)
}
// check nodeaffinity members
if persistentSpec.Spec.NodeAffinity == nil {
t.Errorf("Provision() returned unexpected nil NodeAffinity")
}
if persistentSpec.Spec.NodeAffinity.Required == nil {
t.Errorf("Provision() returned unexpected nil NodeAffinity.Required")
}
n := len(persistentSpec.Spec.NodeAffinity.Required.NodeSelectorTerms)
if n != 1 {
t.Errorf("Provision() returned unexpected number of NodeSelectorTerms %d. Expected %d", n, 1)
}
n = len(persistentSpec.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions)
if n != 1 {
t.Errorf("Provision() returned unexpected number of MatchExpressions %d. Expected %d", n, 1)
}
req := persistentSpec.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions[0]
if req.Key != v1.LabelTopologyZone {
t.Errorf("Provision() returned unexpected requirement key in NodeAffinity %v", req.Key)
}
if req.Operator != v1.NodeSelectorOpIn {
t.Errorf("Provision() returned unexpected requirement operator in NodeAffinity %v", req.Operator)
}
if len(req.Values) != 1 || req.Values[0] != "nova" {
t.Errorf("Provision() returned unexpected requirement value in NodeAffinity %v", req.Values)
}
// Test Deleter
volSpec := &volume.Spec{
PersistentVolume: persistentSpec,
}
deleter, err := plug.(*cinderPlugin).newDeleterInternal(volSpec, &fakePDManager{0})
if err != nil {
t.Errorf("DeleterInternal() failed: %v", err)
}
err = deleter.Delete()
if err != nil {
t.Errorf("Deleter() failed: %v", err)
}
}
func TestGetVolumeLimit(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("cinderTest")
if err != nil {
t.Fatalf("can't make a temp dir: %v", err)
}
cloud, err := getOpenstackCloudProvider()
if err != nil {
t.Fatalf("can not instantiate openstack cloudprovider : %v", err)
}
defer os.RemoveAll(tmpDir)
plugMgr := volume.VolumePluginMgr{}
volumeHost := volumetest.NewFakeKubeletVolumeHostWithCloudProvider(t, tmpDir, nil, nil, cloud)
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumeHost)
plug, err := plugMgr.FindPluginByName("kubernetes.io/cinder")
if err != nil {
t.Fatalf("Can't find the plugin by name")
}
attachablePlugin, ok := plug.(volume.VolumePluginWithAttachLimits)
if !ok {
t.Fatalf("plugin %s is not of attachable type", plug.GetPluginName())
}
limits, err := attachablePlugin.GetVolumeLimits()
if err != nil {
t.Errorf("error fetching limits : %v", err)
}
if len(limits) == 0 {
t.Fatalf("expecting limit from openstack got none")
}
limit, _ := limits[util.CinderVolumeLimitKey]
if limit != 10 {
t.Fatalf("expected volume limit to be 10 got %d", limit)
}
}
func getOpenstackCloudProvider() (*openstack.OpenStack, error) {
cfg := getOpenstackConfig()
return openstack.NewFakeOpenStackCloud(cfg)
}
func getOpenstackConfig() openstack.Config {
cfg := openstack.Config{
Global: struct {
AuthURL string `gcfg:"auth-url"`
Username string
UserID string `gcfg:"user-id"`
Password string `datapolicy:"password"`
TenantID string `gcfg:"tenant-id"`
TenantName string `gcfg:"tenant-name"`
TrustID string `gcfg:"trust-id"`
DomainID string `gcfg:"domain-id"`
DomainName string `gcfg:"domain-name"`
Region string
CAFile string `gcfg:"ca-file"`
SecretName string `gcfg:"secret-name"`
SecretNamespace string `gcfg:"secret-namespace"`
KubeconfigPath string `gcfg:"kubeconfig-path"`
}{
Username: "user",
Password: "pass",
TenantID: "foobar",
DomainID: "2a73b8f597c04551a0fdc8e95544be8a",
DomainName: "local",
AuthURL: "http://auth.url",
UserID: "user",
},
BlockStorage: openstack.BlockStorageOpts{
NodeVolumeAttachLimit: 10,
},
}
return cfg
}
func TestUnsupportedVolumeHost(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("cinderTest")
if err != nil {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(t, tmpDir, nil, nil))
plug, err := plugMgr.FindPluginByName("kubernetes.io/cinder")
if err != nil {
t.Fatal("Can't find the plugin by name")
}
_, err = plug.ConstructVolumeSpec("", "")
if err == nil {
t.Errorf("Expected failure constructing volume spec with unsupported VolumeHost")
}
}

View File

@@ -0,0 +1,278 @@
//go:build !providerless
// +build !providerless
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cinder
import (
"context"
"errors"
"fmt"
"io/ioutil"
"os"
"strings"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
volumehelpers "k8s.io/cloud-provider/volume/helpers"
"k8s.io/kubernetes/pkg/volume"
volutil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/utils/exec"
)
// DiskUtil has utility/helper methods
type DiskUtil struct{}
// AttachDisk attaches a disk specified by a volume.CinderPersistenDisk to the current kubelet.
// Mounts the disk to its global path.
func (util *DiskUtil) AttachDisk(b *cinderVolumeMounter, globalPDPath string) error {
options := []string{}
if b.readOnly {
options = append(options, "ro")
}
cloud, err := b.plugin.getCloudProvider()
if err != nil {
return err
}
instanceid, err := cloud.InstanceID()
if err != nil {
return err
}
diskid, err := cloud.AttachDisk(instanceid, b.pdName)
if err != nil {
return err
}
var devicePath string
numTries := 0
for {
devicePath = cloud.GetDevicePath(diskid)
probeAttachedVolume()
_, err := os.Stat(devicePath)
if err == nil {
break
}
if err != nil && !os.IsNotExist(err) {
return err
}
numTries++
if numTries == 10 {
return errors.New("could not attach disk: Timeout after 60s")
}
time.Sleep(time.Second * 6)
}
notmnt, err := b.mounter.IsLikelyNotMountPoint(globalPDPath)
if err != nil {
if os.IsNotExist(err) {
if err := os.MkdirAll(globalPDPath, 0750); err != nil {
return err
}
notmnt = true
} else {
return err
}
}
if notmnt {
err = b.blockDeviceMounter.FormatAndMount(devicePath, globalPDPath, b.fsType, options)
if err != nil {
os.Remove(globalPDPath)
return err
}
klog.V(2).Infof("Safe mount successful: %q\n", devicePath)
}
return nil
}
// DetachDisk unmounts the device and detaches the disk from the kubelet's host machine.
func (util *DiskUtil) DetachDisk(cd *cinderVolumeUnmounter) error {
globalPDPath := makeGlobalPDName(cd.plugin.host, cd.pdName)
if err := cd.mounter.Unmount(globalPDPath); err != nil {
return err
}
if err := os.Remove(globalPDPath); err != nil {
return err
}
klog.V(2).Infof("Successfully unmounted main device: %s\n", globalPDPath)
cloud, err := cd.plugin.getCloudProvider()
if err != nil {
return err
}
instanceid, err := cloud.InstanceID()
if err != nil {
return err
}
if err = cloud.DetachDisk(instanceid, cd.pdName); err != nil {
return err
}
klog.V(2).Infof("Successfully detached cinder volume %s", cd.pdName)
return nil
}
// DeleteVolume uses the cloud entrypoint to delete specified volume
func (util *DiskUtil) DeleteVolume(cd *cinderVolumeDeleter) error {
cloud, err := cd.plugin.getCloudProvider()
if err != nil {
return err
}
if err = cloud.DeleteVolume(cd.pdName); err != nil {
// OpenStack cloud provider returns volume.tryAgainError when necessary,
// no handling needed here.
klog.V(2).Infof("Error deleting cinder volume %s: %v", cd.pdName, err)
return err
}
klog.V(2).Infof("Successfully deleted cinder volume %s", cd.pdName)
return nil
}
func getZonesFromNodes(kubeClient clientset.Interface) (sets.String, error) {
// TODO: caching, currently it is overkill because it calls this function
// only when it creates dynamic PV
zones := make(sets.String)
nodes, err := kubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
if err != nil {
klog.V(2).Infof("Error listing nodes")
return zones, err
}
for _, node := range nodes.Items {
if zone, ok := node.Labels[v1.LabelTopologyZone]; ok {
zones.Insert(zone)
}
}
klog.V(4).Infof("zones found: %v", zones)
return zones, nil
}
// CreateVolume uses the cloud provider entrypoint for creating a volume
func (util *DiskUtil) CreateVolume(c *cinderVolumeProvisioner, node *v1.Node, allowedTopologies []v1.TopologySelectorTerm) (volumeID string, volumeSizeGB int, volumeLabels map[string]string, fstype string, err error) {
cloud, err := c.plugin.getCloudProvider()
if err != nil {
return "", 0, nil, "", err
}
capacity := c.options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
// Cinder works with gigabytes, convert to GiB with rounding up
volSizeGiB, err := volumehelpers.RoundUpToGiBInt(capacity)
if err != nil {
return "", 0, nil, "", err
}
name := volutil.GenerateVolumeName(c.options.ClusterName, c.options.PVName, 255) // Cinder volume name can have up to 255 characters
vtype := ""
availability := ""
// Apply ProvisionerParameters (case-insensitive). We leave validation of
// the values to the cloud provider.
for k, v := range c.options.Parameters {
switch strings.ToLower(k) {
case "type":
vtype = v
case "availability":
availability = v
case volume.VolumeParameterFSType:
fstype = v
default:
return "", 0, nil, "", fmt.Errorf("invalid option %q for volume plugin %s", k, c.plugin.GetPluginName())
}
}
// TODO: implement PVC.Selector parsing
if c.options.PVC.Spec.Selector != nil {
return "", 0, nil, "", fmt.Errorf("claim.Spec.Selector is not supported for dynamic provisioning on Cinder")
}
if availability == "" {
// No zone specified, choose one randomly in the same region
zones, err := getZonesFromNodes(c.plugin.host.GetKubeClient())
if err != nil {
klog.V(2).Infof("error getting zone information: %v", err)
return "", 0, nil, "", err
}
// if we did not get any zones, lets leave it blank and gophercloud will
// use zone "nova" as default
if len(zones) > 0 {
availability, err = volumehelpers.SelectZoneForVolume(false, false, "", nil, zones, node, allowedTopologies, c.options.PVC.Name)
if err != nil {
klog.V(2).Infof("error selecting zone for volume: %v", err)
return "", 0, nil, "", err
}
}
}
volumeID, volumeAZ, volumeRegion, IgnoreVolumeAZ, err := cloud.CreateVolume(name, volSizeGiB, vtype, availability, c.options.CloudTags)
if err != nil {
klog.V(2).Infof("Error creating cinder volume: %v", err)
return "", 0, nil, "", err
}
klog.V(2).Infof("Successfully created cinder volume %s", volumeID)
// these are needed that pod is spawning to same AZ
volumeLabels = make(map[string]string)
if IgnoreVolumeAZ == false {
if volumeAZ != "" {
volumeLabels[v1.LabelTopologyZone] = volumeAZ
}
if volumeRegion != "" {
volumeLabels[v1.LabelTopologyRegion] = volumeRegion
}
}
return volumeID, volSizeGiB, volumeLabels, fstype, nil
}
func probeAttachedVolume() error {
// rescan scsi bus
scsiHostRescan()
executor := exec.New()
// udevadm settle waits for udevd to process the device creation
// events for all hardware devices, thus ensuring that any device
// nodes have been created successfully before proceeding.
argsSettle := []string{"settle"}
cmdSettle := executor.Command("udevadm", argsSettle...)
_, errSettle := cmdSettle.CombinedOutput()
if errSettle != nil {
klog.Errorf("error running udevadm settle %v\n", errSettle)
}
args := []string{"trigger"}
cmd := executor.Command("udevadm", args...)
_, err := cmd.CombinedOutput()
if err != nil {
klog.Errorf("error running udevadm trigger %v\n", err)
return err
}
klog.V(4).Infof("Successfully probed all attachments")
return nil
}
func scsiHostRescan() {
scsiPath := "/sys/class/scsi_host/"
if dirs, err := ioutil.ReadDir(scsiPath); err == nil {
for _, f := range dirs {
name := scsiPath + f.Name() + "/scan"
data := []byte("- - -")
ioutil.WriteFile(name, data, 0666)
}
}
}

18
pkg/volume/cinder/doc.go Normal file
View File

@@ -0,0 +1,18 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package cinder contains the internal representation of cinder volumes.
package cinder // import "k8s.io/kubernetes/pkg/volume/cinder"

View File

@@ -222,6 +222,9 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error {
csitranslationplugins.AWSEBSInTreePluginName: func() bool {
return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAWS)
},
csitranslationplugins.CinderInTreePluginName: func() bool {
return true
},
csitranslationplugins.AzureDiskInTreePluginName: func() bool {
return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk)
},

View File

@@ -68,6 +68,8 @@ func (pm PluginManager) IsMigrationCompleteForPlugin(pluginName string) bool {
return pm.featureGate.Enabled(features.InTreePluginAzureFileUnregister)
case csilibplugins.AzureDiskInTreePluginName:
return pm.featureGate.Enabled(features.InTreePluginAzureDiskUnregister)
case csilibplugins.CinderInTreePluginName:
return pm.featureGate.Enabled(features.InTreePluginOpenStackUnregister)
case csilibplugins.VSphereInTreePluginName:
return pm.featureGate.Enabled(features.InTreePluginvSphereUnregister)
case csilibplugins.PortworxVolumePluginName:
@@ -94,6 +96,8 @@ func (pm PluginManager) IsMigrationEnabledForPlugin(pluginName string) bool {
return pm.featureGate.Enabled(features.CSIMigrationAzureFile)
case csilibplugins.AzureDiskInTreePluginName:
return pm.featureGate.Enabled(features.CSIMigrationAzureDisk)
case csilibplugins.CinderInTreePluginName:
return true
case csilibplugins.VSphereInTreePluginName:
return pm.featureGate.Enabled(features.CSIMigrationvSphere)
case csilibplugins.PortworxVolumePluginName:

View File

@@ -40,6 +40,13 @@ const (
// GCEVolumeLimitKey stores resource name that will store volume limits for GCE node
GCEVolumeLimitKey = "attachable-volumes-gce-pd"
// CinderVolumeLimitKey contains Volume limit key for Cinder
CinderVolumeLimitKey = "attachable-volumes-cinder"
// DefaultMaxCinderVolumes defines the maximum number of PD Volumes for Cinder
// For Openstack we are keeping this to a high enough value so as depending on backend
// cluster admins can configure it.
DefaultMaxCinderVolumes = 256
// CSIAttachLimitPrefix defines prefix used for CSI volumes
CSIAttachLimitPrefix = "attachable-volumes-csi-"

View File

@@ -21,6 +21,7 @@ import (
"os"
"reflect"
"runtime"
"strings"
"testing"
v1 "k8s.io/api/core/v1"
@@ -260,6 +261,30 @@ func TestFsUserFrom(t *testing.T) {
}
}
func TestGenerateVolumeName(t *testing.T) {
// Normal operation, no truncate
v1 := GenerateVolumeName("kubernetes", "pv-cinder-abcde", 255)
if v1 != "kubernetes-dynamic-pv-cinder-abcde" {
t.Errorf("Expected kubernetes-dynamic-pv-cinder-abcde, got %s", v1)
}
// Truncate trailing "6789-dynamic"
prefix := strings.Repeat("0123456789", 9) // 90 characters prefix + 8 chars. of "-dynamic"
v2 := GenerateVolumeName(prefix, "pv-cinder-abcde", 100)
expect := prefix[:84] + "-pv-cinder-abcde"
if v2 != expect {
t.Errorf("Expected %s, got %s", expect, v2)
}
// Truncate really long cluster name
prefix = strings.Repeat("0123456789", 1000) // 10000 characters prefix
v3 := GenerateVolumeName(prefix, "pv-cinder-abcde", 100)
if v3 != expect {
t.Errorf("Expected %s, got %s", expect, v3)
}
}
func TestHasMountRefs(t *testing.T) {
testCases := map[string]struct {
mountPath string