Merge pull request #24947 from hpcloud/hpe/vsphere-volume
Automatic merge from submit-queue.

vSphere Volume Plugin Implementation: this PR implements vSphere volume plugin support in Kubernetes (ref. issue #23932).
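As context for reviewers, a pod-level volume of this kind is declared through the new VsphereVirtualDiskVolumeSource API type. A minimal sketch, mirroring the fixture used in the unit tests below (the datastore and VMDK path are illustrative):

	package example

	import "k8s.io/kubernetes/pkg/api"

	// vol declares a vSphere VMDK-backed volume as a pod would reference it.
	var vol = api.Volume{
		Name: "vol1",
		VolumeSource: api.VolumeSource{
			VsphereVolume: &api.VsphereVirtualDiskVolumeSource{
				VolumePath: "[local] test-volume-name.vmdk",
				FSType:     "ext4",
			},
		},
	}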
pkg/volume/vsphere_volume/vsphere_volume.go (new file, 419 lines)
@@ -0,0 +1,419 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vsphere_volume

import (
	"errors"
	"fmt"
	"os"
	"path"

	"github.com/golang/glog"
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/resource"
	"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere"
	"k8s.io/kubernetes/pkg/types"
	"k8s.io/kubernetes/pkg/util/exec"
	"k8s.io/kubernetes/pkg/util/mount"
	utilstrings "k8s.io/kubernetes/pkg/util/strings"
	"k8s.io/kubernetes/pkg/volume"
)
// ProbeVolumePlugins is the primary entrypoint for volume plugins.
func ProbeVolumePlugins() []volume.VolumePlugin {
	return []volume.VolumePlugin{&vsphereVolumePlugin{}}
}

type vsphereVolumePlugin struct {
	host volume.VolumeHost
}

var _ volume.VolumePlugin = &vsphereVolumePlugin{}
var _ volume.PersistentVolumePlugin = &vsphereVolumePlugin{}
var _ volume.DeletableVolumePlugin = &vsphereVolumePlugin{}
var _ volume.ProvisionableVolumePlugin = &vsphereVolumePlugin{}

const (
	vsphereVolumePluginName = "kubernetes.io/vsphere-volume"
)

// vSphere Volume Plugin
func (plugin *vsphereVolumePlugin) Init(host volume.VolumeHost) error {
	plugin.host = host
	return nil
}

func (plugin *vsphereVolumePlugin) Name() string {
	return vsphereVolumePluginName
}

func (plugin *vsphereVolumePlugin) CanSupport(spec *volume.Spec) bool {
	return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.VsphereVolume != nil) ||
		(spec.Volume != nil && spec.Volume.VsphereVolume != nil)
}

func (plugin *vsphereVolumePlugin) NewMounter(spec *volume.Spec, pod *api.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
	return plugin.newMounterInternal(spec, pod.UID, &VsphereDiskUtil{}, plugin.host.GetMounter())
}

func (plugin *vsphereVolumePlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
	return plugin.newUnmounterInternal(volName, podUID, &VsphereDiskUtil{}, plugin.host.GetMounter())
}

func (plugin *vsphereVolumePlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager vdManager, mounter mount.Interface) (volume.Mounter, error) {
	var vvol *api.VsphereVirtualDiskVolumeSource
	if spec.Volume != nil && spec.Volume.VsphereVolume != nil {
		vvol = spec.Volume.VsphereVolume
	} else {
		vvol = spec.PersistentVolume.Spec.VsphereVolume
	}

	volPath := vvol.VolumePath
	fsType := vvol.FSType

	return &vsphereVolumeMounter{
		vsphereVolume: &vsphereVolume{
			podUID:  podUID,
			volName: spec.Name(),
			volPath: volPath,
			manager: manager,
			mounter: mounter,
			plugin:  plugin,
		},
		fsType:      fsType,
		diskMounter: &mount.SafeFormatAndMount{Interface: mounter, Runner: exec.New()}}, nil
}

func (plugin *vsphereVolumePlugin) newUnmounterInternal(volName string, podUID types.UID, manager vdManager, mounter mount.Interface) (volume.Unmounter, error) {
	return &vsphereVolumeUnmounter{
		&vsphereVolume{
			podUID:  podUID,
			volName: volName,
			manager: manager,
			mounter: mounter,
			plugin:  plugin,
		}}, nil
}

func (plugin *vsphereVolumePlugin) getCloudProvider() (*vsphere.VSphere, error) {
	cloud := plugin.host.GetCloudProvider()
	if cloud == nil {
		glog.Errorf("Cloud provider not initialized properly")
		return nil, errors.New("cloud provider not initialized properly")
	}

	// Use a checked type assertion: a bare assertion would panic if the
	// configured cloud provider is not vSphere.
	vs, ok := cloud.(*vsphere.VSphere)
	if !ok {
		return nil, errors.New("invalid cloud provider: expected vSphere")
	}
	return vs, nil
}

// vdManager is an abstract interface to the disk operations.
type vdManager interface {
	// AttachDisk attaches the disk to the kubelet's host machine.
	AttachDisk(mounter *vsphereVolumeMounter, globalPDPath string) error
	// DetachDisk detaches the disk from the kubelet's host machine.
	DetachDisk(unmounter *vsphereVolumeUnmounter) error
	// CreateVolume creates a volume.
	CreateVolume(provisioner *vsphereVolumeProvisioner) (vmDiskPath string, volumeSizeKB int, err error)
	// DeleteVolume deletes a volume.
	DeleteVolume(deleter *vsphereVolumeDeleter) error
}
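Two implementations of vdManager appear in this PR: the production VsphereDiskUtil in vsphere_volume_util.go below, and the fakePDManager used by the unit tests.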
// vspherePersistentDisk volumes are disk resources attached to the kubelet's
// host machine and exposed to the pod.
type vsphereVolume struct {
	volName string
	podUID  types.UID
	// Unique identifier of the volume, used to find the disk resource in the provider.
	volPath string
	// Filesystem type, optional.
	fsType string
	// diskID used when detaching the disk.
	diskID string
	// Utility interface that provides API calls to the provider to attach/detach disks.
	manager vdManager
	// Mounter interface that provides system calls to mount the global path to the pod local path.
	mounter mount.Interface
	// diskMounter provides the interface that is used to mount the actual block device.
	diskMounter mount.Interface
	plugin      *vsphereVolumePlugin
	volume.MetricsNil
}

func detachDiskLogError(vv *vsphereVolume) {
	err := vv.manager.DetachDisk(&vsphereVolumeUnmounter{vv})
	if err != nil {
		glog.Warningf("Failed to detach disk: %v (%v)", vv, err)
	}
}

var _ volume.Mounter = &vsphereVolumeMounter{}

type vsphereVolumeMounter struct {
	*vsphereVolume
	fsType      string
	diskMounter *mount.SafeFormatAndMount
}

func (b *vsphereVolumeMounter) GetAttributes() volume.Attributes {
	return volume.Attributes{
		SupportsSELinux: true,
	}
}

// SetUp attaches the disk and bind mounts to the volume path.
func (b *vsphereVolumeMounter) SetUp(fsGroup *int64) error {
	return b.SetUpAt(b.GetPath(), fsGroup)
}

// SetUpAt attaches the disk and bind mounts to the given directory.
func (b *vsphereVolumeMounter) SetUpAt(dir string, fsGroup *int64) error {
	glog.V(5).Infof("vSphere volume setup %s to %s", b.volPath, dir)

	// TODO: handle failed mounts here.
	notmnt, err := b.mounter.IsLikelyNotMountPoint(dir)
	if err != nil && !os.IsNotExist(err) {
		glog.V(4).Infof("IsLikelyNotMountPoint failed: %v", err)
		return err
	}
	if !notmnt {
		glog.V(4).Infof("Something is already mounted to target %s", dir)
		return nil
	}
	globalPDPath := makeGlobalPDPath(b.plugin.host, b.volPath)
	if err := b.manager.AttachDisk(b, globalPDPath); err != nil {
		glog.V(4).Infof("AttachDisk failed: %v", err)
		return err
	}
	glog.V(3).Infof("vSphere volume %s attached", b.volPath)

	options := []string{"bind"}

	if err := os.MkdirAll(dir, 0750); err != nil {
		// TODO: we should really eject the attach/detach out into its own control loop.
		glog.V(4).Infof("Could not create directory %s: %v", dir, err)
		detachDiskLogError(b.vsphereVolume)
		return err
	}

	// Perform a bind mount to the full path to allow duplicate mounts of the same PD.
	err = b.mounter.Mount(globalPDPath, dir, "", options)
	if err != nil {
		notmnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir)
		if mntErr != nil {
			glog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)
			return err
		}
		if !notmnt {
			if mntErr = b.mounter.Unmount(dir); mntErr != nil {
				glog.Errorf("Failed to unmount: %v", mntErr)
				return err
			}
			notmnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir)
			if mntErr != nil {
				glog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)
				return err
			}
			if !notmnt {
				glog.Errorf("%s is still mounted, despite call to unmount(). Will try again next sync loop.", b.GetPath())
				return err
			}
		}
		os.Remove(dir)
		detachDiskLogError(b.vsphereVolume)
		return err
	}
	glog.V(3).Infof("vSphere volume %s mounted to %s", b.volPath, dir)

	return nil
}

var _ volume.Unmounter = &vsphereVolumeUnmounter{}

type vsphereVolumeUnmounter struct {
	*vsphereVolume
}

// TearDown unmounts the bind mount, and detaches the disk only if the PD
// resource was the last reference to that disk on the kubelet.
func (v *vsphereVolumeUnmounter) TearDown() error {
	return v.TearDownAt(v.GetPath())
}

// TearDownAt unmounts the bind mount, and detaches the disk only if the PD
// resource was the last reference to that disk on the kubelet.
func (v *vsphereVolumeUnmounter) TearDownAt(dir string) error {
	glog.V(5).Infof("vSphere Volume TearDown of %s", dir)
	notmnt, err := v.mounter.IsLikelyNotMountPoint(dir)
	if err != nil {
		glog.V(4).Infof("Error checking if %s is a mount point: %v", dir, err)
		return err
	}
	if notmnt {
		glog.V(4).Infof("Not a mount point, deleting")
		return os.Remove(dir)
	}

	// Find the vSphere volume path so we lock the right volume.
	refs, err := mount.GetMountRefs(v.mounter, dir)
	if err != nil {
		glog.V(4).Infof("Error getting mount refs for %s: %v", dir, err)
		return err
	}
	if len(refs) == 0 {
		glog.V(4).Infof("Directory %s is not mounted", dir)
		return fmt.Errorf("directory %s is not mounted", dir)
	}

	v.volPath = path.Base(refs[0])
	glog.V(4).Infof("Found volume %s mounted to %s", v.volPath, dir)

	// Reload the list of references; a concurrent SetUpAt may have finished
	// in the meantime.
	refs, err = mount.GetMountRefs(v.mounter, dir)
	if err != nil {
		glog.V(4).Infof("GetMountRefs failed: %v", err)
		return err
	}
	if err := v.mounter.Unmount(dir); err != nil {
		glog.V(4).Infof("Unmount failed: %v", err)
		return err
	}
	glog.V(3).Infof("Successfully unmounted: %s\n", dir)

	// If refCount is 1, then all bind mounts have been removed, and the
	// remaining reference is the global mount. It is safe to detach.
	if len(refs) == 1 {
		if err := v.manager.DetachDisk(v); err != nil {
			glog.V(4).Infof("DetachDisk failed: %v", err)
			return err
		}
		glog.V(3).Infof("Volume %s detached", v.volPath)
	}
	notmnt, mntErr := v.mounter.IsLikelyNotMountPoint(dir)
	if mntErr != nil {
		glog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)
		return mntErr
	}
	if notmnt {
		if err := os.Remove(dir); err != nil {
			glog.V(4).Infof("Failed to remove directory after unmount: %v", err)
			return err
		}
	}
	return nil
}

// makeGlobalPDPath returns the kubelet-global mount point for the given disk.
func makeGlobalPDPath(host volume.VolumeHost, devName string) string {
	return path.Join(host.GetPluginDir(vsphereVolumePluginName), "mounts", devName)
}
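For a disk named vol1.vmdk this yields <kubelet-plugin-dir>/kubernetes.io/vsphere-volume/mounts/vol1.vmdk; the exact prefix comes from the VolumeHost (on a stock kubelet, typically under /var/lib/kubelet/plugins — an assumption based on the standard kubelet layout, not shown in this diff).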
func (vv *vsphereVolume) GetPath() string {
	name := vsphereVolumePluginName
	return vv.plugin.host.GetPodVolumeDir(vv.podUID, utilstrings.EscapeQualifiedNameForDisk(name), vv.volName)
}

// vSphere Persistent Volume Plugin
func (plugin *vsphereVolumePlugin) GetAccessModes() []api.PersistentVolumeAccessMode {
	return []api.PersistentVolumeAccessMode{
		api.ReadWriteOnce,
	}
}

// vSphere Deletable Volume Plugin
type vsphereVolumeDeleter struct {
	*vsphereVolume
}

var _ volume.Deleter = &vsphereVolumeDeleter{}

func (plugin *vsphereVolumePlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) {
	return plugin.newDeleterInternal(spec, &VsphereDiskUtil{})
}

func (plugin *vsphereVolumePlugin) newDeleterInternal(spec *volume.Spec, manager vdManager) (volume.Deleter, error) {
	// Guard against a nil PersistentVolume as well; the volume path below
	// dereferences spec.PersistentVolume.
	if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.VsphereVolume == nil {
		return nil, fmt.Errorf("spec.PersistentVolumeSource.VsphereVolume is nil")
	}
	return &vsphereVolumeDeleter{
		&vsphereVolume{
			volName: spec.Name(),
			volPath: spec.PersistentVolume.Spec.VsphereVolume.VolumePath,
			manager: manager,
			plugin:  plugin,
		}}, nil
}

func (r *vsphereVolumeDeleter) Delete() error {
	return r.manager.DeleteVolume(r)
}

// vSphere Provisionable Volume Plugin
type vsphereVolumeProvisioner struct {
	*vsphereVolume
	options volume.VolumeOptions
}

var _ volume.Provisioner = &vsphereVolumeProvisioner{}

func (plugin *vsphereVolumePlugin) NewProvisioner(options volume.VolumeOptions) (volume.Provisioner, error) {
	if len(options.AccessModes) == 0 {
		options.AccessModes = plugin.GetAccessModes()
	}
	return plugin.newProvisionerInternal(options, &VsphereDiskUtil{})
}

func (plugin *vsphereVolumePlugin) newProvisionerInternal(options volume.VolumeOptions, manager vdManager) (volume.Provisioner, error) {
	return &vsphereVolumeProvisioner{
		vsphereVolume: &vsphereVolume{
			manager: manager,
			plugin:  plugin,
		},
		options: options,
	}, nil
}

func (v *vsphereVolumeProvisioner) Provision() (*api.PersistentVolume, error) {
	vmDiskPath, sizeKB, err := v.manager.CreateVolume(v)
	if err != nil {
		return nil, err
	}

	pv := &api.PersistentVolume{
		ObjectMeta: api.ObjectMeta{
			Name:   v.options.PVName,
			Labels: map[string]string{},
			Annotations: map[string]string{
				"kubernetes.io/createdby": "vsphere-volume-dynamic-provisioner",
			},
		},
		Spec: api.PersistentVolumeSpec{
			PersistentVolumeReclaimPolicy: v.options.PersistentVolumeReclaimPolicy,
			AccessModes:                   v.options.AccessModes,
			Capacity: api.ResourceList{
				api.ResourceName(api.ResourceStorage): resource.MustParse(fmt.Sprintf("%dKi", sizeKB)),
			},
			PersistentVolumeSource: api.PersistentVolumeSource{
				VsphereVolume: &api.VsphereVirtualDiskVolumeSource{
					VolumePath: vmDiskPath,
					FSType:     "ext4",
				},
			},
		},
	}
	return pv, nil
}
pkg/volume/vsphere_volume/vsphere_volume_test.go (new file, 219 lines)
@@ -0,0 +1,219 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vsphere_volume

import (
	"fmt"
	"os"
	"path"
	"testing"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/resource"
	"k8s.io/kubernetes/pkg/types"
	"k8s.io/kubernetes/pkg/util/mount"
	utiltesting "k8s.io/kubernetes/pkg/util/testing"
	"k8s.io/kubernetes/pkg/volume"
	volumetest "k8s.io/kubernetes/pkg/volume/testing"
)

func TestCanSupport(t *testing.T) {
	tmpDir, err := utiltesting.MkTmpdir("vsphereVolumeTest")
	if err != nil {
		t.Fatalf("can't make a temp dir: %v", err)
	}
	defer os.RemoveAll(tmpDir)
	plugMgr := volume.VolumePluginMgr{}
	plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil))

	plug, err := plugMgr.FindPluginByName("kubernetes.io/vsphere-volume")
	if err != nil {
		t.Errorf("Can't find the plugin by name")
	}
	if plug.Name() != "kubernetes.io/vsphere-volume" {
		t.Errorf("Wrong name: %s", plug.Name())
	}

	if !plug.CanSupport(&volume.Spec{Volume: &api.Volume{VolumeSource: api.VolumeSource{VsphereVolume: &api.VsphereVirtualDiskVolumeSource{}}}}) {
		t.Errorf("Expected true")
	}

	if !plug.CanSupport(&volume.Spec{PersistentVolume: &api.PersistentVolume{Spec: api.PersistentVolumeSpec{PersistentVolumeSource: api.PersistentVolumeSource{VsphereVolume: &api.VsphereVirtualDiskVolumeSource{}}}}}) {
		t.Errorf("Expected true")
	}
}
type fakePDManager struct {
	attachCalled bool
	detachCalled bool
}

func getFakeDeviceName(host volume.VolumeHost, volPath string) string {
	return path.Join(host.GetPluginDir(vsphereVolumePluginName), "device", volPath)
}

func (fake *fakePDManager) AttachDisk(b *vsphereVolumeMounter, globalPDPath string) error {
	fakeDeviceName := getFakeDeviceName(b.plugin.host, b.volPath)
	err := os.MkdirAll(fakeDeviceName, 0750)
	if err != nil {
		return err
	}
	fake.attachCalled = true
	// Simulate the global mount so that the fakeMounter returns the
	// expected number of mounts for the attached disk.
	err = b.mounter.Mount(fakeDeviceName, globalPDPath, "", []string{"bind"})
	if err != nil {
		return err
	}
	return nil
}

func (fake *fakePDManager) DetachDisk(v *vsphereVolumeUnmounter) error {
	globalPath := makeGlobalPDPath(v.plugin.host, v.volPath)
	fakeDeviceName := getFakeDeviceName(v.plugin.host, v.volPath)
	err := v.mounter.Unmount(globalPath)
	if err != nil {
		return err
	}
	// "Detach" the fake "device"
	err = os.RemoveAll(fakeDeviceName)
	if err != nil {
		return err
	}
	fake.detachCalled = true
	return nil
}

func (fake *fakePDManager) CreateVolume(v *vsphereVolumeProvisioner) (vmDiskPath string, volumeSizeKB int, err error) {
	return "[local] test-volume-name.vmdk", 100, nil
}

func (fake *fakePDManager) DeleteVolume(vd *vsphereVolumeDeleter) error {
	if vd.volPath != "[local] test-volume-name.vmdk" {
		return fmt.Errorf("Deleter got unexpected volume path: %s", vd.volPath)
	}
	return nil
}
func TestPlugin(t *testing.T) {
	// Initial setup to test volume plugin
	tmpDir, err := utiltesting.MkTmpdir("vsphereVolumeTest")
	if err != nil {
		t.Fatalf("can't make a temp dir: %v", err)
	}
	defer os.RemoveAll(tmpDir)

	plugMgr := volume.VolumePluginMgr{}
	plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil))

	plug, err := plugMgr.FindPluginByName("kubernetes.io/vsphere-volume")
	if err != nil {
		t.Errorf("Can't find the plugin by name")
	}

	spec := &api.Volume{
		Name: "vol1",
		VolumeSource: api.VolumeSource{
			VsphereVolume: &api.VsphereVirtualDiskVolumeSource{
				VolumePath: "[local] test-volume-name.vmdk",
				FSType:     "ext4",
			},
		},
	}

	// Test Mounter
	fakeManager := &fakePDManager{}
	fakeMounter := &mount.FakeMounter{}
	mounter, err := plug.(*vsphereVolumePlugin).newMounterInternal(volume.NewSpecFromVolume(spec), types.UID("poduid"), fakeManager, fakeMounter)
	if err != nil {
		t.Errorf("Failed to make a new Mounter: %v", err)
	}
	if mounter == nil {
		t.Errorf("Got a nil Mounter")
	}

	mntPath := path.Join(tmpDir, "pods/poduid/volumes/kubernetes.io~vsphere-volume/vol1")
	// Named volumePath rather than path so the path package is not shadowed.
	volumePath := mounter.GetPath()
	if volumePath != mntPath {
		t.Errorf("Got unexpected path: %s", volumePath)
	}

	if err := mounter.SetUp(nil); err != nil {
		t.Errorf("Expected success, got: %v", err)
	}

	if !fakeManager.attachCalled {
		t.Errorf("AttachDisk not called")
	}

	// Test Unmounter
	fakeManager = &fakePDManager{}
	unmounter, err := plug.(*vsphereVolumePlugin).newUnmounterInternal("vol1", types.UID("poduid"), fakeManager, fakeMounter)
	if err != nil {
		t.Errorf("Failed to make a new Unmounter: %v", err)
	}
	if unmounter == nil {
		t.Errorf("Got a nil Unmounter")
	}

	if err := unmounter.TearDown(); err != nil {
		t.Errorf("Expected success, got: %v", err)
	}
	if _, err := os.Stat(volumePath); err == nil {
		t.Errorf("TearDown() failed, volume path still exists: %s", volumePath)
	} else if !os.IsNotExist(err) {
		t.Errorf("TearDown() failed: %v", err)
	}
	if !fakeManager.detachCalled {
		t.Errorf("DetachDisk not called")
	}

	// Test Provisioner
	cap := resource.MustParse("100Mi")
	options := volume.VolumeOptions{
		Capacity: cap,
		AccessModes: []api.PersistentVolumeAccessMode{
			api.ReadWriteOnce,
		},
		PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimDelete,
	}
	provisioner, err := plug.(*vsphereVolumePlugin).newProvisionerInternal(options, &fakePDManager{})
	if err != nil {
		t.Errorf("newProvisionerInternal() failed: %v", err)
	}
	persistentSpec, err := provisioner.Provision()
	if err != nil {
		t.Errorf("Provision() failed: %v", err)
	}

	if persistentSpec.Spec.PersistentVolumeSource.VsphereVolume.VolumePath != "[local] test-volume-name.vmdk" {
		t.Errorf("Provision() returned unexpected path %s", persistentSpec.Spec.PersistentVolumeSource.VsphereVolume.VolumePath)
	}

	// The fake manager reports a 100KB volume, so the PV capacity should be
	// 100Ki = 102400 bytes.
	cap = persistentSpec.Spec.Capacity[api.ResourceStorage]
	size := cap.Value()
	if size != 100*1024 {
		t.Errorf("Provision() returned unexpected volume size: %v", size)
	}

	// Test Deleter
	volSpec := &volume.Spec{
		PersistentVolume: persistentSpec,
	}
	deleter, err := plug.(*vsphereVolumePlugin).newDeleterInternal(volSpec, &fakePDManager{})
	if err != nil {
		t.Errorf("newDeleterInternal() failed: %v", err)
	}
	err = deleter.Delete()
	if err != nil {
		t.Errorf("Deleter() failed: %v", err)
	}
}
pkg/volume/vsphere_volume/vsphere_volume_util.go (new file, 199 lines)
@@ -0,0 +1,199 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vsphere_volume

import (
	"errors"
	"io/ioutil"
	"os"
	"path"
	"strings"
	"time"

	"github.com/golang/glog"
	"k8s.io/kubernetes/pkg/util/exec"
	"k8s.io/kubernetes/pkg/util/keymutex"
	"k8s.io/kubernetes/pkg/volume"
)

const (
	maxRetries = 10
)

// Singleton key mutex for keeping attach/detach operations for the same PD atomic.
var attachDetachMutex = keymutex.NewKeyMutex()

// VsphereDiskUtil implements vdManager against the vSphere cloud provider.
type VsphereDiskUtil struct{}

// AttachDisk attaches a disk to the current kubelet and mounts it to its
// global path.
func (util *VsphereDiskUtil) AttachDisk(vm *vsphereVolumeMounter, globalPDPath string) error {
	options := []string{}

	// Block execution until any pending attach/detach operations for this PD have completed.
	attachDetachMutex.LockKey(vm.volPath)
	defer attachDetachMutex.UnlockKey(vm.volPath)

	cloud, err := vm.plugin.getCloudProvider()
	if err != nil {
		return err
	}

	diskID, diskUUID, attachError := cloud.AttachDisk(vm.volPath, "")
	if attachError != nil {
		// Return the attach error itself; the earlier err is nil at this point.
		return attachError
	} else if diskUUID == "" {
		return errors.New("disk UUID has no value")
	}

	// Remember the diskID for DetachDisk.
	vm.diskID = diskID

	var devicePath string
	numTries := 0
	for {
		devicePath = verifyDevicePath(diskUUID)
		// Probe the attached volume so that the symlink in /dev/disk/by-id is created.
		probeAttachedVolume()

		_, err := os.Stat(devicePath)
		if err == nil {
			break
		}
		if err != nil && !os.IsNotExist(err) {
			return err
		}
		numTries++
		if numTries == maxRetries {
			// 10 retries with a 60s sleep between attempts.
			return errors.New("could not attach disk: timed out after 600s")
		}
		time.Sleep(time.Second * 60)
	}

	notMnt, err := vm.mounter.IsLikelyNotMountPoint(globalPDPath)
	if err != nil {
		if os.IsNotExist(err) {
			if err := os.MkdirAll(globalPDPath, 0750); err != nil {
				return err
			}
			notMnt = true
		} else {
			return err
		}
	}
	if notMnt {
		err = vm.diskMounter.FormatAndMount(devicePath, globalPDPath, vm.fsType, options)
		if err != nil {
			os.Remove(globalPDPath)
			return err
		}
		glog.V(2).Infof("Safe mount successful: %q\n", devicePath)
	}
	return nil
}
func verifyDevicePath(diskUUID string) string {
	files, _ := ioutil.ReadDir("/dev/disk/by-id/")
	for _, f := range files {
		// TODO: should support other controllers
		if strings.Contains(f.Name(), "scsi-") {
			devID := f.Name()[len("scsi-"):]
			if strings.Contains(diskUUID, devID) {
				glog.V(4).Infof("Found disk attached as %q; full devicepath: %s\n", f.Name(), path.Join("/dev/disk/by-id/", f.Name()))
				return path.Join("/dev/disk/by-id/", f.Name())
			}
		}
	}
	glog.Warningf("Failed to find device for the diskid: %q\n", diskUUID)
	return ""
}

func probeAttachedVolume() error {
	executor := exec.New()
	args := []string{"trigger"}
	cmd := executor.Command("/usr/bin/udevadm", args...)
	_, err := cmd.CombinedOutput()
	if err != nil {
		glog.Errorf("error running udevadm trigger %v\n", err)
		return err
	}
	glog.V(4).Infof("Successfully probed all attachments")
	return nil
}
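udevadm trigger replays kernel device events so that the /dev/disk/by-id symlink for a newly attached disk appears; note that the hard-coded /usr/bin/udevadm path assumes the tool is installed there on every node.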
// DetachDisk unmounts the device and detaches the disk from the kubelet's host machine.
func (util *VsphereDiskUtil) DetachDisk(vu *vsphereVolumeUnmounter) error {
	// Block execution until any pending attach/detach operations for this PD have completed.
	attachDetachMutex.LockKey(vu.volPath)
	defer attachDetachMutex.UnlockKey(vu.volPath)

	globalPDPath := makeGlobalPDPath(vu.plugin.host, vu.volPath)
	if err := vu.mounter.Unmount(globalPDPath); err != nil {
		return err
	}
	if err := os.Remove(globalPDPath); err != nil {
		return err
	}
	glog.V(2).Infof("Successfully unmounted main device: %s\n", globalPDPath)

	cloud, err := vu.plugin.getCloudProvider()
	if err != nil {
		return err
	}

	if err = cloud.DetachDisk(vu.diskID, ""); err != nil {
		return err
	}
	glog.V(2).Infof("Successfully detached vSphere volume %s", vu.volPath)
	return nil
}
// CreateVolume creates a vSphere volume.
func (util *VsphereDiskUtil) CreateVolume(v *vsphereVolumeProvisioner) (vmDiskPath string, volumeSizeKB int, err error) {
	cloud, err := v.plugin.getCloudProvider()
	if err != nil {
		return "", 0, err
	}

	volSizeBytes := v.options.Capacity.Value()
	// vSphere works in KiB; convert the requested capacity to KiB, rounding up.
	volSizeKB := int(volume.RoundUpSize(volSizeBytes, 1024))
	name := volume.GenerateVolumeName(v.options.ClusterName, v.options.PVName, 255)
	vmDiskPath, err = cloud.CreateVolume(name, volSizeKB, v.options.CloudTags)
	if err != nil {
		glog.V(2).Infof("Error creating vsphere volume: %v", err)
		return "", 0, err
	}
	glog.V(2).Infof("Successfully created vsphere volume %s", name)
	return vmDiskPath, volSizeKB, nil
}
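As a worked example of the size conversion above (a sketch using the same RoundUpSize helper; the 100Mi figure matches the provisioning unit test):

	// 100Mi = 104857600 bytes; RoundUpSize(104857600, 1024) = 102400 KiB.
	// Provision then renders this as resource.MustParse("102400Ki"), which equals 100Mi.
	volSizeBytes := int64(100 * 1024 * 1024)
	volSizeKB := int(volume.RoundUpSize(volSizeBytes, 1024)) // == 102400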
// DeleteVolume deletes a vSphere volume.
func (util *VsphereDiskUtil) DeleteVolume(vd *vsphereVolumeDeleter) error {
	cloud, err := vd.plugin.getCloudProvider()
	if err != nil {
		return err
	}

	if err = cloud.DeleteVolume(vd.volPath); err != nil {
		glog.V(2).Infof("Error deleting vsphere volume %s: %v", vd.volPath, err)
		return err
	}
	glog.V(2).Infof("Successfully deleted vsphere volume %s", vd.volPath)
	return nil
}