Refactor flocker volume plugin

* Support provisioning
* Support deletion
* Use bind mounts instead of /flocker in containers
* support ownership management or SELinux relabeling.
This commit is contained in:
Christian Simon 2016-09-27 13:19:45 +00:00
parent 6f2af39021
commit cd0897801b
8 changed files with 995 additions and 289 deletions

View File

@ -41,6 +41,7 @@ import (
"k8s.io/kubernetes/pkg/volume/azure_dd"
"k8s.io/kubernetes/pkg/volume/cinder"
"k8s.io/kubernetes/pkg/volume/flexvolume"
"k8s.io/kubernetes/pkg/volume/flocker"
"k8s.io/kubernetes/pkg/volume/gce_pd"
"k8s.io/kubernetes/pkg/volume/glusterfs"
"k8s.io/kubernetes/pkg/volume/host_path"
@ -108,6 +109,8 @@ func ProbeControllerVolumePlugins(cloud cloudprovider.Interface, config componen
allPlugins = append(allPlugins, rbd.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, quobyte.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, flocker.ProbeVolumePlugins()...)
if cloud != nil {
switch {
case aws.ProviderName == cloud.ProviderName():

View File

@ -0,0 +1,469 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flocker
import (
"fmt"
"os"
"path"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/env"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/strings"
"k8s.io/kubernetes/pkg/volume"
flockerApi "github.com/clusterhq/flocker-go"
)
// This is the primary entrypoint for volume plugins.
func ProbeVolumePlugins() []volume.VolumePlugin {
return []volume.VolumePlugin{&flockerPlugin{nil}}
}
type flockerPlugin struct {
host volume.VolumeHost
}
type flockerVolume struct {
volName string
podUID types.UID
// dataset metadata name deprecated
datasetName string
// dataset uuid
datasetUUID string
//pod *api.Pod
flockerClient flockerApi.Clientable
manager volumeManager
plugin *flockerPlugin
mounter mount.Interface
volume.MetricsProvider
}
var _ volume.VolumePlugin = &flockerPlugin{}
var _ volume.PersistentVolumePlugin = &flockerPlugin{}
var _ volume.DeletableVolumePlugin = &flockerPlugin{}
var _ volume.ProvisionableVolumePlugin = &flockerPlugin{}
const (
flockerPluginName = "kubernetes.io/flocker"
defaultHost = "localhost"
defaultPort = 4523
defaultCACertFile = "/etc/flocker/cluster.crt"
defaultClientKeyFile = "/etc/flocker/apiuser.key"
defaultClientCertFile = "/etc/flocker/apiuser.crt"
defaultMountPath = "/flocker"
timeoutWaitingForVolume = 2 * time.Minute
tickerWaitingForVolume = 5 * time.Second
)
func getPath(uid types.UID, volName string, host volume.VolumeHost) string {
return host.GetPodVolumeDir(uid, strings.EscapeQualifiedNameForDisk(flockerPluginName), volName)
}
func makeGlobalFlockerPath(datasetUUID string) string {
return path.Join(defaultMountPath, datasetUUID)
}
func (p *flockerPlugin) Init(host volume.VolumeHost) error {
p.host = host
return nil
}
func (p *flockerPlugin) GetPluginName() string {
return flockerPluginName
}
func (p *flockerPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
volumeSource, _, err := getVolumeSource(spec)
if err != nil {
return "", err
}
return volumeSource.DatasetName, nil
}
func (p *flockerPlugin) CanSupport(spec *volume.Spec) bool {
return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Flocker != nil) ||
(spec.Volume != nil && spec.Volume.Flocker != nil)
}
func (p *flockerPlugin) RequiresRemount() bool {
return false
}
func (plugin *flockerPlugin) GetAccessModes() []api.PersistentVolumeAccessMode {
return []api.PersistentVolumeAccessMode{
api.ReadWriteOnce,
}
}
func (p *flockerPlugin) getFlockerVolumeSource(spec *volume.Spec) (*api.FlockerVolumeSource, bool) {
// AFAIK this will always be r/w, but perhaps for the future it will be needed
readOnly := false
if spec.Volume != nil && spec.Volume.Flocker != nil {
return spec.Volume.Flocker, readOnly
}
return spec.PersistentVolume.Spec.Flocker, readOnly
}
func (plugin *flockerPlugin) NewMounter(spec *volume.Spec, pod *api.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
// Inject real implementations here, test through the internal function.
return plugin.newMounterInternal(spec, pod.UID, &FlockerUtil{}, plugin.host.GetMounter())
}
func (plugin *flockerPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager volumeManager, mounter mount.Interface) (volume.Mounter, error) {
volumeSource, readOnly, err := getVolumeSource(spec)
if err != nil {
return nil, err
}
datasetName := volumeSource.DatasetName
datasetUUID := volumeSource.DatasetUUID
return &flockerVolumeMounter{
flockerVolume: &flockerVolume{
podUID: podUID,
volName: spec.Name(),
datasetName: datasetName,
datasetUUID: datasetUUID,
mounter: mounter,
manager: manager,
plugin: plugin,
MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, spec.Name(), plugin.host)),
},
readOnly: readOnly}, nil
}
func (p *flockerPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
// Inject real implementations here, test through the internal function.
return p.newUnmounterInternal(volName, podUID, &FlockerUtil{}, p.host.GetMounter())
}
func (p *flockerPlugin) newUnmounterInternal(volName string, podUID types.UID, manager volumeManager, mounter mount.Interface) (volume.Unmounter, error) {
return &flockerVolumeUnmounter{&flockerVolume{
podUID: podUID,
volName: volName,
manager: manager,
mounter: mounter,
plugin: p,
MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, volName, p.host)),
}}, nil
}
func (p *flockerPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
flockerVolume := &api.Volume{
Name: volumeName,
VolumeSource: api.VolumeSource{
Flocker: &api.FlockerVolumeSource{
DatasetName: volumeName,
},
},
}
return volume.NewSpecFromVolume(flockerVolume), nil
}
func (b *flockerVolume) GetDatasetUUID() (datasetUUID string, err error) {
// return UUID if set
if len(b.datasetUUID) > 0 {
return b.datasetUUID, nil
}
if b.flockerClient == nil {
return "", fmt.Errorf("Flocker client is not initialized")
}
// lookup in flocker API otherwise
return b.flockerClient.GetDatasetID(b.datasetName)
}
type flockerVolumeMounter struct {
*flockerVolume
readOnly bool
}
func (b *flockerVolumeMounter) GetAttributes() volume.Attributes {
return volume.Attributes{
ReadOnly: b.readOnly,
Managed: false,
SupportsSELinux: false,
}
}
func (b *flockerVolumeMounter) GetPath() string {
return getPath(b.podUID, b.volName, b.plugin.host)
}
// SetUp bind mounts the disk global mount to the volume path.
func (b *flockerVolumeMounter) SetUp(fsGroup *int64) error {
return b.SetUpAt(b.GetPath(), fsGroup)
}
// newFlockerClient uses environment variables and pod attributes to return a
// flocker client capable of talking with the Flocker control service.
func (p *flockerPlugin) newFlockerClient(hostIP string) (*flockerApi.Client, error) {
host := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_HOST", defaultHost)
port, err := env.GetEnvAsIntOrFallback("FLOCKER_CONTROL_SERVICE_PORT", defaultPort)
if err != nil {
return nil, err
}
caCertPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CA_FILE", defaultCACertFile)
keyPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CLIENT_KEY_FILE", defaultClientKeyFile)
certPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CLIENT_CERT_FILE", defaultClientCertFile)
c, err := flockerApi.NewClient(host, port, hostIP, caCertPath, keyPath, certPath)
return c, err
}
func (b *flockerVolumeMounter) newFlockerClient() (*flockerApi.Client, error) {
hostIP, err := b.plugin.host.GetHostIP()
if err != nil {
return nil, err
}
return b.plugin.newFlockerClient(hostIP.String())
}
/*
SetUpAt will setup a Flocker volume following this flow of calls to the Flocker
control service:
1. Get the dataset id for the given volume name/dir
2. It should already be there, if it's not the user needs to manually create it
3. Check the current Primary UUID
4. If it doesn't match with the Primary UUID that we got on 2, then we will
need to update the Primary UUID for this volume.
5. Wait until the Primary UUID was updated or timeout.
*/
func (b *flockerVolumeMounter) SetUpAt(dir string, fsGroup *int64) error {
var err error
if b.flockerClient == nil {
b.flockerClient, err = b.newFlockerClient()
if err != nil {
return err
}
}
datasetUUID, err := b.GetDatasetUUID()
if err != nil {
return fmt.Errorf("The datasetUUID for volume with datasetName='%s' can not be found using flocker: %s", b.datasetName, err)
}
datasetState, err := b.flockerClient.GetDatasetState(datasetUUID)
if err != nil {
return fmt.Errorf("The datasetState for volume with datasetUUID='%s' could not determinted uusing flocker: %s", datasetUUID, err)
}
primaryUUID, err := b.flockerClient.GetPrimaryUUID()
if err != nil {
return err
}
if datasetState.Primary != primaryUUID {
if err := b.updateDatasetPrimary(datasetUUID, primaryUUID); err != nil {
return err
}
_, err := b.flockerClient.GetDatasetState(datasetUUID)
if err != nil {
return fmt.Errorf("The volume with datasetUUID='%s' migrated unsuccessfully.", datasetUUID)
}
}
// TODO: handle failed mounts here.
notMnt, err := b.mounter.IsLikelyNotMountPoint(dir)
glog.V(4).Infof("flockerVolume set up: %s %v %v, datasetUUID %v readOnly %v", dir, !notMnt, err, datasetUUID, b.readOnly)
if err != nil && !os.IsNotExist(err) {
glog.Errorf("cannot validate mount point: %s %v", dir, err)
return err
}
if !notMnt {
return nil
}
if err := os.MkdirAll(dir, 0750); err != nil {
glog.Errorf("mkdir failed on disk %s (%v)", dir, err)
return err
}
// Perform a bind mount to the full path to allow duplicate mounts of the same PD.
options := []string{"bind"}
if b.readOnly {
options = append(options, "ro")
}
globalFlockerPath := makeGlobalFlockerPath(datasetUUID)
glog.V(4).Infof("attempting to mount %s", dir)
err = b.mounter.Mount(globalFlockerPath, dir, "", options)
if err != nil {
notMnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir)
if mntErr != nil {
glog.Errorf("isLikelyNotMountPoint check failed: %v", mntErr)
return err
}
if !notMnt {
if mntErr = b.mounter.Unmount(dir); mntErr != nil {
glog.Errorf("failed to unmount: %v", mntErr)
return err
}
notMnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir)
if mntErr != nil {
glog.Errorf("isLikelyNotMountPoint check failed: %v", mntErr)
return err
}
if !notMnt {
// This is very odd, we don't expect it. We'll try again next sync loop.
glog.Errorf("%s is still mounted, despite call to unmount(). Will try again next sync loop.", dir)
return err
}
}
os.Remove(dir)
glog.Errorf("mount of disk %s failed: %v", dir, err)
return err
}
if !b.readOnly {
volume.SetVolumeOwnership(b, fsGroup)
}
glog.V(4).Infof("successfully mounted %s", dir)
return nil
}
// updateDatasetPrimary will update the primary in Flocker and wait for it to
// be ready. If it never gets to ready state it will timeout and error.
func (b *flockerVolumeMounter) updateDatasetPrimary(datasetUUID string, primaryUUID string) error {
// We need to update the primary and wait for it to be ready
_, err := b.flockerClient.UpdatePrimaryForDataset(primaryUUID, datasetUUID)
if err != nil {
return err
}
timeoutChan := time.NewTimer(timeoutWaitingForVolume)
defer timeoutChan.Stop()
tickChan := time.NewTicker(tickerWaitingForVolume)
defer tickChan.Stop()
for {
if s, err := b.flockerClient.GetDatasetState(datasetUUID); err == nil && s.Primary == primaryUUID {
return nil
}
select {
case <-timeoutChan.C:
return fmt.Errorf(
"Timed out waiting for the datasetUUID: '%s' to be moved to the primary: '%s'\n%v",
datasetUUID, primaryUUID, err,
)
case <-tickChan.C:
break
}
}
}
func getVolumeSource(spec *volume.Spec) (*api.FlockerVolumeSource, bool, error) {
if spec.Volume != nil && spec.Volume.Flocker != nil {
return spec.Volume.Flocker, spec.ReadOnly, nil
} else if spec.PersistentVolume != nil &&
spec.PersistentVolume.Spec.Flocker != nil {
return spec.PersistentVolume.Spec.Flocker, spec.ReadOnly, nil
}
return nil, false, fmt.Errorf("Spec does not reference a Flocker volume type")
}
type flockerVolumeUnmounter struct {
*flockerVolume
}
var _ volume.Unmounter = &flockerVolumeUnmounter{}
func (c *flockerVolumeUnmounter) GetPath() string {
return getPath(c.podUID, c.volName, c.plugin.host)
}
// Unmounts the bind mount, and detaches the disk only if the PD
// resource was the last reference to that disk on the kubelet.
func (c *flockerVolumeUnmounter) TearDown() error {
return c.TearDownAt(c.GetPath())
}
// TearDownAt unmounts the bind mount
func (c *flockerVolumeUnmounter) TearDownAt(dir string) error {
notMnt, err := c.mounter.IsLikelyNotMountPoint(dir)
if err != nil {
return err
}
if notMnt {
return os.Remove(dir)
}
if err := c.mounter.Unmount(dir); err != nil {
return err
}
notMnt, mntErr := c.mounter.IsLikelyNotMountPoint(dir)
if mntErr != nil {
glog.Errorf("isLikelyNotMountPoint check failed: %v", mntErr)
return err
}
if notMnt {
return os.Remove(dir)
}
return fmt.Errorf("Failed to unmount volume dir")
}
func (plugin *flockerPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) {
return plugin.newDeleterInternal(spec, &FlockerUtil{})
}
func (plugin *flockerPlugin) newDeleterInternal(spec *volume.Spec, manager volumeManager) (volume.Deleter, error) {
if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Flocker == nil {
return nil, fmt.Errorf("spec.PersistentVolumeSource.Flocker is nil")
}
return &flockerVolumeDeleter{
flockerVolume: &flockerVolume{
volName: spec.Name(),
datasetName: spec.PersistentVolume.Spec.Flocker.DatasetName,
datasetUUID: spec.PersistentVolume.Spec.Flocker.DatasetUUID,
manager: manager,
}}, nil
}
func (plugin *flockerPlugin) NewProvisioner(options volume.VolumeOptions) (volume.Provisioner, error) {
if len(options.AccessModes) == 0 {
options.AccessModes = plugin.GetAccessModes()
}
return plugin.newProvisionerInternal(options, &FlockerUtil{})
}
func (plugin *flockerPlugin) newProvisionerInternal(options volume.VolumeOptions, manager volumeManager) (volume.Provisioner, error) {
return &flockerVolumeProvisioner{
flockerVolume: &flockerVolume{
manager: manager,
plugin: plugin,
},
options: options,
}, nil
}

View File

@ -17,19 +17,111 @@ limitations under the License.
package flocker
import (
"fmt"
"os"
"testing"
flockerclient "github.com/ClusterHQ/flocker-go"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/mount"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
flockerApi "github.com/clusterhq/flocker-go"
"github.com/stretchr/testify/assert"
)
const pluginName = "kubernetes.io/flocker"
const datasetOneID = "11111111-1111-1111-1111-111111111100"
const nodeOneID = "11111111-1111-1111-1111-111111111111"
const nodeTwoID = "22222222-2222-2222-2222-222222222222"
var _ flockerApi.Clientable = &fakeFlockerClient{}
type fakeFlockerClient struct {
DatasetID string
Primary string
Deleted bool
Metadata map[string]string
Nodes []flockerApi.NodeState
Error error
}
func newFakeFlockerClient() *fakeFlockerClient {
return &fakeFlockerClient{
DatasetID: datasetOneID,
Primary: nodeOneID,
Deleted: false,
Metadata: map[string]string{"Name": "dataset-one"},
Nodes: []flockerApi.NodeState{
{
Host: "1.2.3.4",
UUID: nodeOneID,
},
{
Host: "4.5.6.7",
UUID: nodeTwoID,
},
},
}
}
func (c *fakeFlockerClient) CreateDataset(options *flockerApi.CreateDatasetOptions) (*flockerApi.DatasetState, error) {
if c.Error != nil {
return nil, c.Error
}
return &flockerApi.DatasetState{
DatasetID: c.DatasetID,
}, nil
}
func (c *fakeFlockerClient) DeleteDataset(datasetID string) error {
c.DatasetID = datasetID
c.Deleted = true
return nil
}
func (c *fakeFlockerClient) GetDatasetState(datasetID string) (*flockerApi.DatasetState, error) {
return &flockerApi.DatasetState{}, nil
}
func (c *fakeFlockerClient) GetDatasetID(metaName string) (datasetID string, err error) {
if val, ok := c.Metadata["Name"]; !ok {
return val, nil
}
return "", fmt.Errorf("No dataset with metadata X found")
}
func (c *fakeFlockerClient) GetPrimaryUUID() (primaryUUID string, err error) {
return
}
func (c *fakeFlockerClient) ListNodes() (nodes []flockerApi.NodeState, err error) {
return c.Nodes, nil
}
func (c *fakeFlockerClient) UpdatePrimaryForDataset(primaryUUID, datasetID string) (*flockerApi.DatasetState, error) {
return &flockerApi.DatasetState{}, nil
}
type fakeFlockerUtil struct {
}
func (fake *fakeFlockerUtil) CreateVolume(c *flockerVolumeProvisioner) (datasetUUID string, volumeSizeGB int, labels map[string]string, err error) {
labels = make(map[string]string)
labels["fakeflockerutil"] = "yes"
return "test-flocker-volume-uuid", 3, labels, nil
}
func (fake *fakeFlockerUtil) DeleteVolume(cd *flockerVolumeDeleter) error {
if cd.datasetUUID != "test-flocker-volume-uuid" {
return fmt.Errorf("Deleter got unexpected datasetUUID: %s", cd.datasetUUID)
}
return nil
}
func newInitializedVolumePlugMgr(t *testing.T) (*volume.VolumePluginMgr, string) {
plugMgr := &volume.VolumePluginMgr{}
@ -39,6 +131,38 @@ func newInitializedVolumePlugMgr(t *testing.T) (*volume.VolumePluginMgr, string)
return plugMgr, dir
}
func TestPlugin(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("flockerTest")
if err != nil {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil, "" /* rootContext */))
plug, err := plugMgr.FindPluginByName("kubernetes.io/flocker")
if err != nil {
t.Errorf("Can't find the plugin by name")
}
spec := &api.Volume{
Name: "vol1",
VolumeSource: api.VolumeSource{
Flocker: &api.FlockerVolumeSource{
DatasetUUID: "uuid1",
},
},
}
fakeManager := &fakeFlockerUtil{}
fakeMounter := &mount.FakeMounter{}
mounter, err := plug.(*flockerPlugin).newMounterInternal(volume.NewSpecFromVolume(spec), types.UID("poduid"), fakeManager, fakeMounter)
if err != nil {
t.Errorf("Failed to make a new Mounter: %v", err)
}
if mounter == nil {
t.Errorf("Got a nil Mounter")
}
}
func TestGetByName(t *testing.T) {
assert := assert.New(t)
plugMgr, _ := newInitializedVolumePlugMgr(t)
@ -115,7 +239,7 @@ func TestGetFlockerVolumeSource(t *testing.T) {
assert.Equal(spec.PersistentVolume.Spec.Flocker, vs)
}
func TestNewMounter(t *testing.T) {
func TestNewMounterDatasetName(t *testing.T) {
assert := assert.New(t)
plugMgr, _ := newInitializedVolumePlugMgr(t)
@ -136,7 +260,31 @@ func TestNewMounter(t *testing.T) {
assert.NoError(err)
}
func TestNewMounterDatasetUUID(t *testing.T) {
assert := assert.New(t)
plugMgr, _ := newInitializedVolumePlugMgr(t)
plug, err := plugMgr.FindPluginByName(pluginName)
assert.NoError(err)
spec := &volume.Spec{
Volume: &api.Volume{
VolumeSource: api.VolumeSource{
Flocker: &api.FlockerVolumeSource{
DatasetUUID: "uuid1",
},
},
},
}
mounter, err := plug.NewMounter(spec, &api.Pod{}, volume.VolumeOptions{})
assert.NoError(err)
assert.NotNil(mounter, "got a nil mounter")
}
func TestNewUnmounter(t *testing.T) {
t.Skip("broken")
assert := assert.New(t)
p := flockerPlugin{}
@ -147,22 +295,13 @@ func TestNewUnmounter(t *testing.T) {
}
func TestIsReadOnly(t *testing.T) {
b := &flockerMounter{readOnly: true}
b := &flockerVolumeMounter{readOnly: true}
assert.True(t, b.GetAttributes().ReadOnly)
}
func TestGetPath(t *testing.T) {
const expectedPath = "/flocker/expected"
assert := assert.New(t)
b := flockerMounter{flocker: &flocker{path: expectedPath}}
assert.Equal(expectedPath, b.GetPath())
}
type mockFlockerClient struct {
datasetID, primaryUUID, path string
datasetState *flockerclient.DatasetState
datasetState *flockerApi.DatasetState
}
func newMockFlockerClient(mockDatasetID, mockPrimaryUUID, mockPath string) *mockFlockerClient {
@ -170,7 +309,7 @@ func newMockFlockerClient(mockDatasetID, mockPrimaryUUID, mockPath string) *mock
datasetID: mockDatasetID,
primaryUUID: mockPrimaryUUID,
path: mockPath,
datasetState: &flockerclient.DatasetState{
datasetState: &flockerApi.DatasetState{
Path: mockPath,
DatasetID: mockDatasetID,
Primary: mockPrimaryUUID,
@ -178,10 +317,10 @@ func newMockFlockerClient(mockDatasetID, mockPrimaryUUID, mockPath string) *mock
}
}
func (m mockFlockerClient) CreateDataset(metaName string) (*flockerclient.DatasetState, error) {
func (m mockFlockerClient) CreateDataset(metaName string) (*flockerApi.DatasetState, error) {
return m.datasetState, nil
}
func (m mockFlockerClient) GetDatasetState(datasetID string) (*flockerclient.DatasetState, error) {
func (m mockFlockerClient) GetDatasetState(datasetID string) (*flockerApi.DatasetState, error) {
return m.datasetState, nil
}
func (m mockFlockerClient) GetDatasetID(metaName string) (string, error) {
@ -190,10 +329,12 @@ func (m mockFlockerClient) GetDatasetID(metaName string) (string, error) {
func (m mockFlockerClient) GetPrimaryUUID() (string, error) {
return m.primaryUUID, nil
}
func (m mockFlockerClient) UpdatePrimaryForDataset(primaryUUID, datasetID string) (*flockerclient.DatasetState, error) {
func (m mockFlockerClient) UpdatePrimaryForDataset(primaryUUID, datasetID string) (*flockerApi.DatasetState, error) {
return m.datasetState, nil
}
/*
TODO: reenable after refactor
func TestSetUpAtInternal(t *testing.T) {
const dir = "dir"
mockPath := "expected-to-be-set-properly" // package var
@ -209,9 +350,10 @@ func TestSetUpAtInternal(t *testing.T) {
assert.NoError(err)
pod := &api.Pod{ObjectMeta: api.ObjectMeta{UID: types.UID("poduid")}}
b := flockerMounter{flocker: &flocker{pod: pod, plugin: plug.(*flockerPlugin)}}
b := flockerVolumeMounter{flockerVolume: &flockerVolume{pod: pod, plugin: plug.(*flockerPlugin)}}
b.client = newMockFlockerClient("dataset-id", "primary-uid", mockPath)
assert.NoError(b.SetUpAt(dir, nil))
assert.Equal(expectedPath, b.flocker.path)
}
*/

View File

@ -0,0 +1,94 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flocker
import (
"fmt"
"time"
"k8s.io/kubernetes/pkg/util/rand"
"k8s.io/kubernetes/pkg/volume"
flockerApi "github.com/clusterhq/flocker-go"
"github.com/golang/glog"
)
type FlockerUtil struct{}
func (util *FlockerUtil) DeleteVolume(d *flockerVolumeDeleter) error {
var err error
if d.flockerClient == nil {
d.flockerClient, err = d.plugin.newFlockerClient("")
if err != nil {
return err
}
}
datasetUUID, err := d.GetDatasetUUID()
if err != nil {
return err
}
return d.flockerClient.DeleteDataset(datasetUUID)
}
func (util *FlockerUtil) CreateVolume(c *flockerVolumeProvisioner) (datasetUUID string, volumeSizeGB int, labels map[string]string, err error) {
if c.flockerClient == nil {
c.flockerClient, err = c.plugin.newFlockerClient("")
if err != nil {
return
}
}
nodes, err := c.flockerClient.ListNodes()
if err != nil {
return
}
if len(nodes) < 1 {
err = fmt.Errorf("no nodes found inside the flocker cluster to provision a dataset")
return
}
// select random node
rand.Seed(time.Now().UTC().UnixNano())
node := nodes[rand.Intn(len(nodes))]
glog.V(2).Infof("selected flocker node with UUID '%s' to provision dataset", node.UUID)
requestBytes := c.options.Capacity.Value()
volumeSizeGB = int(volume.RoundUpSize(requestBytes, 1024*1024*1024))
createOptions := &flockerApi.CreateDatasetOptions{
MaximumSize: requestBytes,
Metadata: map[string]string{
"type": "k8s-dynamic-prov",
"pvc": c.options.PVCName,
},
Primary: node.UUID,
}
datasetState, err := c.flockerClient.CreateDataset(createOptions)
if err != nil {
return
}
datasetUUID = datasetState.DatasetID
glog.V(2).Infof("successfully created Flocker dataset with UUID '%s'", datasetUUID)
return
}

View File

@ -0,0 +1,57 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flocker
import (
"fmt"
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/volume"
"github.com/stretchr/testify/assert"
)
func TestFlockerUtil_CreateVolume(t *testing.T) {
assert := assert.New(t)
// test CreateVolume happy path
options := volume.VolumeOptions{
Capacity: resource.MustParse("3Gi"),
AccessModes: []api.PersistentVolumeAccessMode{
api.ReadWriteOnce,
},
PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimDelete,
}
fakeFlockerClient := newFakeFlockerClient()
provisioner := newTestableProvisioner(assert, options).(*flockerVolumeProvisioner)
provisioner.flockerClient = fakeFlockerClient
flockerUtil := &FlockerUtil{}
datasetID, size, _, err := flockerUtil.CreateVolume(provisioner)
assert.NoError(err)
assert.Equal(datasetOneID, datasetID)
assert.Equal(3, size)
// test error during CreateVolume
fakeFlockerClient.Error = fmt.Errorf("Do not feel like provisioning")
_, _, _, err = flockerUtil.CreateVolume(provisioner)
assert.Equal(fakeFlockerClient.Error.Error(), err.Error())
}

View File

@ -0,0 +1,102 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flocker
import (
"fmt"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/volume"
)
type volumeManager interface {
// Creates a volume
CreateVolume(provisioner *flockerVolumeProvisioner) (datasetUUID string, volumeSizeGB int, labels map[string]string, err error)
// Deletes a volume
DeleteVolume(deleter *flockerVolumeDeleter) error
}
type flockerVolumeDeleter struct {
*flockerVolume
}
var _ volume.Deleter = &flockerVolumeDeleter{}
func (b *flockerVolumeDeleter) GetPath() string {
return getPath(b.podUID, b.volName, b.plugin.host)
}
func (d *flockerVolumeDeleter) Delete() error {
return d.manager.DeleteVolume(d)
}
type flockerVolumeProvisioner struct {
*flockerVolume
options volume.VolumeOptions
}
var _ volume.Provisioner = &flockerVolumeProvisioner{}
func (c *flockerVolumeProvisioner) Provision() (*api.PersistentVolume, error) {
if len(c.options.Parameters) > 0 {
return nil, fmt.Errorf("Provisioning failed: Specified at least one unsupported parameter")
}
if c.options.Selector != nil {
return nil, fmt.Errorf("Provisioning failed: Specified unsupported selector")
}
datasetUUID, sizeGB, labels, err := c.manager.CreateVolume(c)
if err != nil {
return nil, err
}
pv := &api.PersistentVolume{
ObjectMeta: api.ObjectMeta{
Name: c.options.PVName,
Labels: map[string]string{},
Annotations: map[string]string{
"kubernetes.io/createdby": "flocker-dynamic-provisioner",
},
},
Spec: api.PersistentVolumeSpec{
PersistentVolumeReclaimPolicy: c.options.PersistentVolumeReclaimPolicy,
AccessModes: c.options.AccessModes,
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse(fmt.Sprintf("%dGi", sizeGB)),
},
PersistentVolumeSource: api.PersistentVolumeSource{
Flocker: &api.FlockerVolumeSource{
DatasetUUID: datasetUUID,
},
},
},
}
if len(labels) != 0 {
if pv.Labels == nil {
pv.Labels = make(map[string]string)
}
for k, v := range labels {
pv.Labels[k] = v
}
}
return pv, nil
}

View File

@ -0,0 +1,109 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flocker
import (
"fmt"
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
"github.com/stretchr/testify/assert"
)
func newTestableProvisioner(assert *assert.Assertions, options volume.VolumeOptions) volume.Provisioner {
tmpDir, err := utiltesting.MkTmpdir("flockervolumeTest")
assert.NoError(err, fmt.Sprintf("can't make a temp dir: %v", err))
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil, "" /* rootContext */))
plug, err := plugMgr.FindPluginByName(pluginName)
assert.NoError(err, "Can't find the plugin by name")
provisioner, err := plug.(*flockerPlugin).newProvisionerInternal(options, &fakeFlockerUtil{})
return provisioner
}
func TestProvision(t *testing.T) {
assert := assert.New(t)
cap := resource.MustParse("3Gi")
options := volume.VolumeOptions{
Capacity: cap,
AccessModes: []api.PersistentVolumeAccessMode{
api.ReadWriteOnce,
},
PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimDelete,
}
provisioner := newTestableProvisioner(assert, options)
persistentSpec, err := provisioner.Provision()
assert.NoError(err, "Provision() failed: ", err)
cap = persistentSpec.Spec.Capacity[api.ResourceStorage]
assert.Equal(int64(3*1024*1024*1024), cap.Value())
assert.Equal(
"test-flocker-volume-uuid",
persistentSpec.Spec.PersistentVolumeSource.Flocker.DatasetUUID,
)
assert.Equal(
map[string]string{"fakeflockerutil": "yes"},
persistentSpec.Labels,
)
// parameters are not supported
options = volume.VolumeOptions{
Capacity: cap,
AccessModes: []api.PersistentVolumeAccessMode{
api.ReadWriteOnce,
},
PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimDelete,
Parameters: map[string]string{
"not-supported-params": "test123",
},
}
provisioner = newTestableProvisioner(assert, options)
persistentSpec, err = provisioner.Provision()
assert.Error(err, "Provision() did not fail with Parameters specified")
// selectors are not supported
options = volume.VolumeOptions{
Capacity: cap,
AccessModes: []api.PersistentVolumeAccessMode{
api.ReadWriteOnce,
},
PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimDelete,
Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"key": "value"}},
}
provisioner = newTestableProvisioner(assert, options)
persistentSpec, err = provisioner.Provision()
assert.Error(err, "Provision() did not fail with Selector specified")
}

View File

@ -1,270 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flocker
import (
"fmt"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/env"
"k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
flockerclient "github.com/ClusterHQ/flocker-go"
)
const (
flockerPluginName = "kubernetes.io/flocker"
defaultHost = "localhost"
defaultPort = 4523
defaultCACertFile = "/etc/flocker/cluster.crt"
defaultClientKeyFile = "/etc/flocker/apiuser.key"
defaultClientCertFile = "/etc/flocker/apiuser.crt"
timeoutWaitingForVolume = 2 * time.Minute
tickerWaitingForVolume = 5 * time.Second
)
func ProbeVolumePlugins() []volume.VolumePlugin {
return []volume.VolumePlugin{&flockerPlugin{}}
}
type flockerPlugin struct {
host volume.VolumeHost
}
type flocker struct {
datasetName string
path string
pod *api.Pod
mounter mount.Interface
plugin *flockerPlugin
}
func (p *flockerPlugin) Init(host volume.VolumeHost) error {
p.host = host
return nil
}
func (p *flockerPlugin) GetPluginName() string {
return flockerPluginName
}
func (p *flockerPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
volumeSource, _, err := getVolumeSource(spec)
if err != nil {
return "", err
}
return volumeSource.DatasetName, nil
}
func (p *flockerPlugin) CanSupport(spec *volume.Spec) bool {
return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Flocker != nil) ||
(spec.Volume != nil && spec.Volume.Flocker != nil)
}
func (p *flockerPlugin) RequiresRemount() bool {
return false
}
func (p *flockerPlugin) getFlockerVolumeSource(spec *volume.Spec) (*api.FlockerVolumeSource, bool) {
// AFAIK this will always be r/w, but perhaps for the future it will be needed
readOnly := false
if spec.Volume != nil && spec.Volume.Flocker != nil {
return spec.Volume.Flocker, readOnly
}
return spec.PersistentVolume.Spec.Flocker, readOnly
}
func (p *flockerPlugin) NewMounter(spec *volume.Spec, pod *api.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
source, readOnly := p.getFlockerVolumeSource(spec)
mounter := flockerMounter{
flocker: &flocker{
datasetName: source.DatasetName,
pod: pod,
mounter: p.host.GetMounter(),
plugin: p,
},
exe: exec.New(),
opts: opts,
readOnly: readOnly,
}
return &mounter, nil
}
func (p *flockerPlugin) NewUnmounter(datasetName string, podUID types.UID) (volume.Unmounter, error) {
// Flocker agent will take care of this, there is nothing we can do here
return nil, nil
}
func (p *flockerPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
flockerVolume := &api.Volume{
Name: volumeName,
VolumeSource: api.VolumeSource{
Flocker: &api.FlockerVolumeSource{
DatasetName: volumeName,
},
},
}
return volume.NewSpecFromVolume(flockerVolume), nil
}
type flockerMounter struct {
*flocker
client flockerclient.Clientable
exe exec.Interface
opts volume.VolumeOptions
readOnly bool
volume.MetricsNil
}
func (b flockerMounter) GetAttributes() volume.Attributes {
return volume.Attributes{
ReadOnly: b.readOnly,
Managed: false,
SupportsSELinux: false,
}
}
func (b flockerMounter) GetPath() string {
return b.flocker.path
}
func (b flockerMounter) SetUp(fsGroup *int64) error {
return b.SetUpAt(b.flocker.datasetName, fsGroup)
}
// newFlockerClient uses environment variables and pod attributes to return a
// flocker client capable of talking with the Flocker control service.
func (b flockerMounter) newFlockerClient() (*flockerclient.Client, error) {
host := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_HOST", defaultHost)
port, err := env.GetEnvAsIntOrFallback("FLOCKER_CONTROL_SERVICE_PORT", defaultPort)
if err != nil {
return nil, err
}
caCertPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CA_FILE", defaultCACertFile)
keyPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CLIENT_KEY_FILE", defaultClientKeyFile)
certPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CLIENT_CERT_FILE", defaultClientCertFile)
hostIP, err := b.plugin.host.GetHostIP()
if err != nil {
return nil, err
}
c, err := flockerclient.NewClient(host, port, hostIP.String(), caCertPath, keyPath, certPath)
return c, err
}
/*
SetUpAt will setup a Flocker volume following this flow of calls to the Flocker
control service:
1. Get the dataset id for the given volume name/dir
2. It should already be there, if it's not the user needs to manually create it
3. Check the current Primary UUID
4. If it doesn't match with the Primary UUID that we got on 2, then we will
need to update the Primary UUID for this volume.
5. Wait until the Primary UUID was updated or timeout.
*/
func (b flockerMounter) SetUpAt(dir string, fsGroup *int64) error {
if b.client == nil {
c, err := b.newFlockerClient()
if err != nil {
return err
}
b.client = c
}
datasetID, err := b.client.GetDatasetID(dir)
if err != nil {
return err
}
s, err := b.client.GetDatasetState(datasetID)
if err != nil {
return fmt.Errorf("The volume '%s' is not available in Flocker. You need to create this manually with Flocker CLI before using it.", dir)
}
primaryUUID, err := b.client.GetPrimaryUUID()
if err != nil {
return err
}
if s.Primary != primaryUUID {
if err := b.updateDatasetPrimary(datasetID, primaryUUID); err != nil {
return err
}
newState, err := b.client.GetDatasetState(datasetID)
if err != nil {
return fmt.Errorf("The volume '%s' migrated unsuccessfully.", datasetID)
}
b.flocker.path = newState.Path
} else {
b.flocker.path = s.Path
}
return nil
}
// updateDatasetPrimary will update the primary in Flocker and wait for it to
// be ready. If it never gets to ready state it will timeout and error.
func (b flockerMounter) updateDatasetPrimary(datasetID, primaryUUID string) error {
// We need to update the primary and wait for it to be ready
_, err := b.client.UpdatePrimaryForDataset(primaryUUID, datasetID)
if err != nil {
return err
}
timeoutChan := time.NewTimer(timeoutWaitingForVolume)
defer timeoutChan.Stop()
tickChan := time.NewTicker(tickerWaitingForVolume)
defer tickChan.Stop()
for {
if s, err := b.client.GetDatasetState(datasetID); err == nil && s.Primary == primaryUUID {
return nil
}
select {
case <-timeoutChan.C:
return fmt.Errorf(
"Timed out waiting for the dataset_id: '%s' to be moved to the primary: '%s'\n%v",
datasetID, primaryUUID, err,
)
case <-tickChan.C:
break
}
}
}
func getVolumeSource(spec *volume.Spec) (*api.FlockerVolumeSource, bool, error) {
if spec.Volume != nil && spec.Volume.Flocker != nil {
return spec.Volume.Flocker, spec.ReadOnly, nil
} else if spec.PersistentVolume != nil &&
spec.PersistentVolume.Spec.Flocker != nil {
return spec.PersistentVolume.Spec.Flocker, spec.ReadOnly, nil
}
return nil, false, fmt.Errorf("Spec does not reference a Flocker volume type")
}