Merge pull request #25457 from saad-ali/expectedStateOfWorldDataStructure

Automatic merge from submit-queue

Attach Detach Controller Business Logic

This PR adds the meat of the attach/detach controller proposed in #20262.

The PR splits the in-memory cache into a desired and actual state of the world.
This commit is contained in:
k8s-merge-robot
2016-05-26 00:41:54 -07:00
26 changed files with 3905 additions and 1074 deletions

View File

@@ -50,6 +50,24 @@ func (plugin *gcePersistentDiskPlugin) NewAttacher() (volume.Attacher, error) {
return &gcePersistentDiskAttacher{host: plugin.host}, nil
}
func (plugin *gcePersistentDiskPlugin) GetUniqueVolumeName(spec *volume.Spec) (string, error) {
volumeSource, _ := getVolumeSource(spec)
if volumeSource == nil {
return "", fmt.Errorf("Spec does not reference a GCE volume type")
}
return fmt.Sprintf("%s/%s:%v", gcePersistentDiskPluginName, volumeSource.PDName, volumeSource.ReadOnly), nil
}
func (plugin *gcePersistentDiskPlugin) GetDeviceName(spec *volume.Spec) (string, error) {
volumeSource, _ := getVolumeSource(spec)
if volumeSource == nil {
return "", fmt.Errorf("Spec does not reference a GCE volume type")
}
return volumeSource.PDName, nil
}
func (attacher *gcePersistentDiskAttacher) Attach(spec *volume.Spec, hostName string) error {
volumeSource, readOnly := getVolumeSource(spec)
pdName := volumeSource.PDName

View File

@@ -133,6 +133,18 @@ type AttachableVolumePlugin interface {
VolumePlugin
NewAttacher() (Attacher, error)
NewDetacher() (Detacher, error)
// GetUniqueVolumeName returns a unique name representing the volume
// defined in spec. e.g. pluginname-deviceName-readwrite
// This helps ensures that the same operation (attach/detach) is never
// started on the same volume.
// If the plugin does not support the given spec, this returns an error.
GetUniqueVolumeName(spec *Spec) (string, error)
// GetDeviceName returns the name or ID of the device referenced in the
// specified volume spec. This is passed by callers to the Deatch method.
// If the plugin does not support the given spec, this returns an error.
GetDeviceName(spec *Spec) (string, error)
}
// VolumeHost is an interface that plugins can use to access the kubelet.

View File

@@ -23,6 +23,7 @@ import (
"os/exec"
"path"
"strings"
"sync"
"time"
"k8s.io/kubernetes/pkg/api"
@@ -132,6 +133,7 @@ func ProbeVolumePlugins(config VolumeConfig) []VolumePlugin {
// Use as:
// volume.RegisterPlugin(&FakePlugin{"fake-name"})
type FakeVolumePlugin struct {
sync.RWMutex
PluginName string
Host VolumeHost
Config VolumeConfig
@@ -158,11 +160,15 @@ func (plugin *FakeVolumePlugin) getFakeVolume(list *[]*FakeVolume) *FakeVolume {
}
func (plugin *FakeVolumePlugin) Init(host VolumeHost) error {
plugin.Lock()
defer plugin.Unlock()
plugin.Host = host
return nil
}
func (plugin *FakeVolumePlugin) Name() string {
plugin.RLock()
defer plugin.RUnlock()
return plugin.PluginName
}
@@ -172,6 +178,8 @@ func (plugin *FakeVolumePlugin) CanSupport(spec *Spec) bool {
}
func (plugin *FakeVolumePlugin) NewMounter(spec *Spec, pod *api.Pod, opts VolumeOptions) (Mounter, error) {
plugin.Lock()
defer plugin.Unlock()
volume := plugin.getFakeVolume(&plugin.Mounters)
volume.PodUID = pod.UID
volume.VolName = spec.Name()
@@ -181,6 +189,8 @@ func (plugin *FakeVolumePlugin) NewMounter(spec *Spec, pod *api.Pod, opts Volume
}
func (plugin *FakeVolumePlugin) NewUnmounter(volName string, podUID types.UID) (Unmounter, error) {
plugin.Lock()
defer plugin.Unlock()
volume := plugin.getFakeVolume(&plugin.Unmounters)
volume.PodUID = podUID
volume.VolName = volName
@@ -190,15 +200,41 @@ func (plugin *FakeVolumePlugin) NewUnmounter(volName string, podUID types.UID) (
}
func (plugin *FakeVolumePlugin) NewAttacher() (Attacher, error) {
plugin.Lock()
defer plugin.Unlock()
plugin.NewAttacherCallCount = plugin.NewAttacherCallCount + 1
return plugin.getFakeVolume(&plugin.Attachers), nil
}
func (plugin *FakeVolumePlugin) GetNewAttacherCallCount() int {
plugin.RLock()
defer plugin.RUnlock()
return plugin.NewAttacherCallCount
}
func (plugin *FakeVolumePlugin) NewDetacher() (Detacher, error) {
plugin.Lock()
defer plugin.Unlock()
plugin.NewDetacherCallCount = plugin.NewDetacherCallCount + 1
return plugin.getFakeVolume(&plugin.Detachers), nil
}
func (plugin *FakeVolumePlugin) GetNewDetacherCallCount() int {
plugin.RLock()
defer plugin.RUnlock()
return plugin.NewDetacherCallCount
}
func (plugin *FakeVolumePlugin) GetUniqueVolumeName(spec *Spec) (string, error) {
plugin.RLock()
defer plugin.RUnlock()
return plugin.Name() + "/" + spec.Name(), nil
}
func (plugin *FakeVolumePlugin) GetDeviceName(spec *Spec) (string, error) {
return spec.Name(), nil
}
func (plugin *FakeVolumePlugin) NewRecycler(pvName string, spec *Spec) (Recycler, error) {
return &fakeRecycler{"/attributesTransferredFromSpec", MetricsNil{}}, nil
}
@@ -208,6 +244,8 @@ func (plugin *FakeVolumePlugin) NewDeleter(spec *Spec) (Deleter, error) {
}
func (plugin *FakeVolumePlugin) NewProvisioner(options VolumeOptions) (Provisioner, error) {
plugin.Lock()
defer plugin.Unlock()
plugin.LastProvisionerOptions = options
return &FakeProvisioner{options, plugin.Host}, nil
}
@@ -217,6 +255,7 @@ func (plugin *FakeVolumePlugin) GetAccessModes() []api.PersistentVolumeAccessMod
}
type FakeVolume struct {
sync.RWMutex
PodUID types.UID
VolName string
Plugin *FakeVolumePlugin
@@ -242,8 +281,10 @@ func (_ *FakeVolume) GetAttributes() Attributes {
}
func (fv *FakeVolume) SetUp(fsGroup *int64) error {
fv.Lock()
defer fv.Unlock()
fv.SetUpCallCount++
return fv.SetUpAt(fv.GetPath(), fsGroup)
return fv.SetUpAt(fv.getPath(), fsGroup)
}
func (fv *FakeVolume) SetUpAt(dir string, fsGroup *int64) error {
@@ -251,12 +292,20 @@ func (fv *FakeVolume) SetUpAt(dir string, fsGroup *int64) error {
}
func (fv *FakeVolume) GetPath() string {
fv.RLock()
defer fv.RUnlock()
return fv.getPath()
}
func (fv *FakeVolume) getPath() string {
return path.Join(fv.Plugin.Host.GetPodVolumeDir(fv.PodUID, utilstrings.EscapeQualifiedNameForDisk(fv.Plugin.PluginName), fv.VolName))
}
func (fv *FakeVolume) TearDown() error {
fv.Lock()
defer fv.Unlock()
fv.TearDownCallCount++
return fv.TearDownAt(fv.GetPath())
return fv.TearDownAt(fv.getPath())
}
func (fv *FakeVolume) TearDownAt(dir string) error {
@@ -264,36 +313,62 @@ func (fv *FakeVolume) TearDownAt(dir string) error {
}
func (fv *FakeVolume) Attach(spec *Spec, hostName string) error {
fv.Lock()
defer fv.Unlock()
fv.AttachCallCount++
return nil
}
func (fv *FakeVolume) GetAttachCallCount() int {
fv.RLock()
defer fv.RUnlock()
return fv.AttachCallCount
}
func (fv *FakeVolume) WaitForAttach(spec *Spec, spectimeout time.Duration) (string, error) {
fv.Lock()
defer fv.Unlock()
fv.WaitForAttachCallCount++
return "", nil
}
func (fv *FakeVolume) GetDeviceMountPath(spec *Spec) string {
fv.Lock()
defer fv.Unlock()
fv.GetDeviceMountPathCallCount++
return ""
}
func (fv *FakeVolume) MountDevice(spec *Spec, devicePath string, deviceMountPath string, mounter mount.Interface) error {
fv.Lock()
defer fv.Unlock()
fv.MountDeviceCallCount++
return nil
}
func (fv *FakeVolume) Detach(deviceMountPath string, hostName string) error {
fv.Lock()
defer fv.Unlock()
fv.DetachCallCount++
return nil
}
func (fv *FakeVolume) GetDetachCallCount() int {
fv.RLock()
defer fv.RUnlock()
return fv.DetachCallCount
}
func (fv *FakeVolume) WaitForDetach(devicePath string, timeout time.Duration) error {
fv.Lock()
defer fv.Unlock()
fv.WaitForDetachCallCount++
return nil
}
func (fv *FakeVolume) UnmountDevice(globalMountPath string, mounter mount.Interface) error {
fv.Lock()
defer fv.Unlock()
fv.UnmountDeviceCallCount++
return nil
}