diff --git a/cmd/kube-controller-manager/app/plugins.go b/cmd/kube-controller-manager/app/plugins.go index 231e39a7a4a..accbbd3b203 100644 --- a/cmd/kube-controller-manager/app/plugins.go +++ b/cmd/kube-controller-manager/app/plugins.go @@ -41,6 +41,7 @@ import ( "k8s.io/kubernetes/pkg/volume/azure_dd" "k8s.io/kubernetes/pkg/volume/cinder" "k8s.io/kubernetes/pkg/volume/flexvolume" + "k8s.io/kubernetes/pkg/volume/flocker" "k8s.io/kubernetes/pkg/volume/gce_pd" "k8s.io/kubernetes/pkg/volume/glusterfs" "k8s.io/kubernetes/pkg/volume/host_path" @@ -108,6 +109,8 @@ func ProbeControllerVolumePlugins(cloud cloudprovider.Interface, config componen allPlugins = append(allPlugins, rbd.ProbeVolumePlugins()...) allPlugins = append(allPlugins, quobyte.ProbeVolumePlugins()...) + allPlugins = append(allPlugins, flocker.ProbeVolumePlugins()...) + if cloud != nil { switch { case aws.ProviderName == cloud.ProviderName(): diff --git a/pkg/volume/flocker/flocker.go b/pkg/volume/flocker/flocker.go new file mode 100644 index 00000000000..dedd162a8b7 --- /dev/null +++ b/pkg/volume/flocker/flocker.go @@ -0,0 +1,469 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flocker + +import ( + "fmt" + "os" + "path" + "time" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/util/env" + "k8s.io/kubernetes/pkg/util/mount" + "k8s.io/kubernetes/pkg/util/strings" + "k8s.io/kubernetes/pkg/volume" + + flockerApi "github.com/clusterhq/flocker-go" +) + +// This is the primary entrypoint for volume plugins. +func ProbeVolumePlugins() []volume.VolumePlugin { + return []volume.VolumePlugin{&flockerPlugin{nil}} +} + +type flockerPlugin struct { + host volume.VolumeHost +} + +type flockerVolume struct { + volName string + podUID types.UID + // dataset metadata name deprecated + datasetName string + // dataset uuid + datasetUUID string + //pod *api.Pod + flockerClient flockerApi.Clientable + manager volumeManager + plugin *flockerPlugin + mounter mount.Interface + volume.MetricsProvider +} + +var _ volume.VolumePlugin = &flockerPlugin{} +var _ volume.PersistentVolumePlugin = &flockerPlugin{} +var _ volume.DeletableVolumePlugin = &flockerPlugin{} +var _ volume.ProvisionableVolumePlugin = &flockerPlugin{} + +const ( + flockerPluginName = "kubernetes.io/flocker" + + defaultHost = "localhost" + defaultPort = 4523 + defaultCACertFile = "/etc/flocker/cluster.crt" + defaultClientKeyFile = "/etc/flocker/apiuser.key" + defaultClientCertFile = "/etc/flocker/apiuser.crt" + defaultMountPath = "/flocker" + + timeoutWaitingForVolume = 2 * time.Minute + tickerWaitingForVolume = 5 * time.Second +) + +func getPath(uid types.UID, volName string, host volume.VolumeHost) string { + return host.GetPodVolumeDir(uid, strings.EscapeQualifiedNameForDisk(flockerPluginName), volName) +} + +func makeGlobalFlockerPath(datasetUUID string) string { + return path.Join(defaultMountPath, datasetUUID) +} + +func (p *flockerPlugin) Init(host volume.VolumeHost) error { + p.host = host + return nil +} + +func (p *flockerPlugin) GetPluginName() string { + return flockerPluginName +} + +func (p *flockerPlugin) GetVolumeName(spec *volume.Spec) (string, error) { + volumeSource, _, err := getVolumeSource(spec) + if err != nil { + return "", err + } + + return volumeSource.DatasetName, nil +} + +func (p *flockerPlugin) CanSupport(spec *volume.Spec) bool { + return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Flocker != nil) || + (spec.Volume != nil && spec.Volume.Flocker != nil) +} + +func (p *flockerPlugin) RequiresRemount() bool { + return false +} + +func (plugin *flockerPlugin) GetAccessModes() []api.PersistentVolumeAccessMode { + return []api.PersistentVolumeAccessMode{ + api.ReadWriteOnce, + } +} + +func (p *flockerPlugin) getFlockerVolumeSource(spec *volume.Spec) (*api.FlockerVolumeSource, bool) { + // AFAIK this will always be r/w, but perhaps for the future it will be needed + readOnly := false + + if spec.Volume != nil && spec.Volume.Flocker != nil { + return spec.Volume.Flocker, readOnly + } + return spec.PersistentVolume.Spec.Flocker, readOnly +} + +func (plugin *flockerPlugin) NewMounter(spec *volume.Spec, pod *api.Pod, _ volume.VolumeOptions) (volume.Mounter, error) { + // Inject real implementations here, test through the internal function. + return plugin.newMounterInternal(spec, pod.UID, &FlockerUtil{}, plugin.host.GetMounter()) +} + +func (plugin *flockerPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager volumeManager, mounter mount.Interface) (volume.Mounter, error) { + volumeSource, readOnly, err := getVolumeSource(spec) + if err != nil { + return nil, err + } + + datasetName := volumeSource.DatasetName + datasetUUID := volumeSource.DatasetUUID + + return &flockerVolumeMounter{ + flockerVolume: &flockerVolume{ + podUID: podUID, + volName: spec.Name(), + datasetName: datasetName, + datasetUUID: datasetUUID, + mounter: mounter, + manager: manager, + plugin: plugin, + MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, spec.Name(), plugin.host)), + }, + readOnly: readOnly}, nil +} + +func (p *flockerPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) { + // Inject real implementations here, test through the internal function. + return p.newUnmounterInternal(volName, podUID, &FlockerUtil{}, p.host.GetMounter()) +} + +func (p *flockerPlugin) newUnmounterInternal(volName string, podUID types.UID, manager volumeManager, mounter mount.Interface) (volume.Unmounter, error) { + return &flockerVolumeUnmounter{&flockerVolume{ + podUID: podUID, + volName: volName, + manager: manager, + mounter: mounter, + plugin: p, + MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, volName, p.host)), + }}, nil +} + +func (p *flockerPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) { + flockerVolume := &api.Volume{ + Name: volumeName, + VolumeSource: api.VolumeSource{ + Flocker: &api.FlockerVolumeSource{ + DatasetName: volumeName, + }, + }, + } + return volume.NewSpecFromVolume(flockerVolume), nil +} + +func (b *flockerVolume) GetDatasetUUID() (datasetUUID string, err error) { + + // return UUID if set + if len(b.datasetUUID) > 0 { + return b.datasetUUID, nil + } + + if b.flockerClient == nil { + return "", fmt.Errorf("Flocker client is not initialized") + } + + // lookup in flocker API otherwise + return b.flockerClient.GetDatasetID(b.datasetName) +} + +type flockerVolumeMounter struct { + *flockerVolume + readOnly bool +} + +func (b *flockerVolumeMounter) GetAttributes() volume.Attributes { + return volume.Attributes{ + ReadOnly: b.readOnly, + Managed: false, + SupportsSELinux: false, + } +} +func (b *flockerVolumeMounter) GetPath() string { + return getPath(b.podUID, b.volName, b.plugin.host) +} + +// SetUp bind mounts the disk global mount to the volume path. +func (b *flockerVolumeMounter) SetUp(fsGroup *int64) error { + return b.SetUpAt(b.GetPath(), fsGroup) +} + +// newFlockerClient uses environment variables and pod attributes to return a +// flocker client capable of talking with the Flocker control service. +func (p *flockerPlugin) newFlockerClient(hostIP string) (*flockerApi.Client, error) { + host := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_HOST", defaultHost) + port, err := env.GetEnvAsIntOrFallback("FLOCKER_CONTROL_SERVICE_PORT", defaultPort) + + if err != nil { + return nil, err + } + caCertPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CA_FILE", defaultCACertFile) + keyPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CLIENT_KEY_FILE", defaultClientKeyFile) + certPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CLIENT_CERT_FILE", defaultClientCertFile) + + c, err := flockerApi.NewClient(host, port, hostIP, caCertPath, keyPath, certPath) + return c, err +} + +func (b *flockerVolumeMounter) newFlockerClient() (*flockerApi.Client, error) { + + hostIP, err := b.plugin.host.GetHostIP() + if err != nil { + return nil, err + } + + return b.plugin.newFlockerClient(hostIP.String()) +} + +/* +SetUpAt will setup a Flocker volume following this flow of calls to the Flocker +control service: + +1. Get the dataset id for the given volume name/dir +2. It should already be there, if it's not the user needs to manually create it +3. Check the current Primary UUID +4. If it doesn't match with the Primary UUID that we got on 2, then we will + need to update the Primary UUID for this volume. +5. Wait until the Primary UUID was updated or timeout. +*/ +func (b *flockerVolumeMounter) SetUpAt(dir string, fsGroup *int64) error { + var err error + if b.flockerClient == nil { + b.flockerClient, err = b.newFlockerClient() + if err != nil { + return err + } + } + + datasetUUID, err := b.GetDatasetUUID() + if err != nil { + return fmt.Errorf("The datasetUUID for volume with datasetName='%s' can not be found using flocker: %s", b.datasetName, err) + } + + datasetState, err := b.flockerClient.GetDatasetState(datasetUUID) + if err != nil { + return fmt.Errorf("The datasetState for volume with datasetUUID='%s' could not determinted uusing flocker: %s", datasetUUID, err) + } + + primaryUUID, err := b.flockerClient.GetPrimaryUUID() + if err != nil { + return err + } + + if datasetState.Primary != primaryUUID { + if err := b.updateDatasetPrimary(datasetUUID, primaryUUID); err != nil { + return err + } + _, err := b.flockerClient.GetDatasetState(datasetUUID) + if err != nil { + return fmt.Errorf("The volume with datasetUUID='%s' migrated unsuccessfully.", datasetUUID) + } + } + + // TODO: handle failed mounts here. + notMnt, err := b.mounter.IsLikelyNotMountPoint(dir) + glog.V(4).Infof("flockerVolume set up: %s %v %v, datasetUUID %v readOnly %v", dir, !notMnt, err, datasetUUID, b.readOnly) + if err != nil && !os.IsNotExist(err) { + glog.Errorf("cannot validate mount point: %s %v", dir, err) + return err + } + if !notMnt { + return nil + } + + if err := os.MkdirAll(dir, 0750); err != nil { + glog.Errorf("mkdir failed on disk %s (%v)", dir, err) + return err + } + + // Perform a bind mount to the full path to allow duplicate mounts of the same PD. + options := []string{"bind"} + if b.readOnly { + options = append(options, "ro") + } + + globalFlockerPath := makeGlobalFlockerPath(datasetUUID) + glog.V(4).Infof("attempting to mount %s", dir) + + err = b.mounter.Mount(globalFlockerPath, dir, "", options) + if err != nil { + notMnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir) + if mntErr != nil { + glog.Errorf("isLikelyNotMountPoint check failed: %v", mntErr) + return err + } + if !notMnt { + if mntErr = b.mounter.Unmount(dir); mntErr != nil { + glog.Errorf("failed to unmount: %v", mntErr) + return err + } + notMnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir) + if mntErr != nil { + glog.Errorf("isLikelyNotMountPoint check failed: %v", mntErr) + return err + } + if !notMnt { + // This is very odd, we don't expect it. We'll try again next sync loop. + glog.Errorf("%s is still mounted, despite call to unmount(). Will try again next sync loop.", dir) + return err + } + } + os.Remove(dir) + glog.Errorf("mount of disk %s failed: %v", dir, err) + return err + } + + if !b.readOnly { + volume.SetVolumeOwnership(b, fsGroup) + } + + glog.V(4).Infof("successfully mounted %s", dir) + return nil +} + +// updateDatasetPrimary will update the primary in Flocker and wait for it to +// be ready. If it never gets to ready state it will timeout and error. +func (b *flockerVolumeMounter) updateDatasetPrimary(datasetUUID string, primaryUUID string) error { + // We need to update the primary and wait for it to be ready + _, err := b.flockerClient.UpdatePrimaryForDataset(primaryUUID, datasetUUID) + if err != nil { + return err + } + + timeoutChan := time.NewTimer(timeoutWaitingForVolume) + defer timeoutChan.Stop() + tickChan := time.NewTicker(tickerWaitingForVolume) + defer tickChan.Stop() + + for { + if s, err := b.flockerClient.GetDatasetState(datasetUUID); err == nil && s.Primary == primaryUUID { + return nil + } + + select { + case <-timeoutChan.C: + return fmt.Errorf( + "Timed out waiting for the datasetUUID: '%s' to be moved to the primary: '%s'\n%v", + datasetUUID, primaryUUID, err, + ) + case <-tickChan.C: + break + } + } + +} + +func getVolumeSource(spec *volume.Spec) (*api.FlockerVolumeSource, bool, error) { + if spec.Volume != nil && spec.Volume.Flocker != nil { + return spec.Volume.Flocker, spec.ReadOnly, nil + } else if spec.PersistentVolume != nil && + spec.PersistentVolume.Spec.Flocker != nil { + return spec.PersistentVolume.Spec.Flocker, spec.ReadOnly, nil + } + + return nil, false, fmt.Errorf("Spec does not reference a Flocker volume type") +} + +type flockerVolumeUnmounter struct { + *flockerVolume +} + +var _ volume.Unmounter = &flockerVolumeUnmounter{} + +func (c *flockerVolumeUnmounter) GetPath() string { + return getPath(c.podUID, c.volName, c.plugin.host) +} + +// Unmounts the bind mount, and detaches the disk only if the PD +// resource was the last reference to that disk on the kubelet. +func (c *flockerVolumeUnmounter) TearDown() error { + return c.TearDownAt(c.GetPath()) +} + +// TearDownAt unmounts the bind mount +func (c *flockerVolumeUnmounter) TearDownAt(dir string) error { + notMnt, err := c.mounter.IsLikelyNotMountPoint(dir) + if err != nil { + return err + } + if notMnt { + return os.Remove(dir) + } + if err := c.mounter.Unmount(dir); err != nil { + return err + } + notMnt, mntErr := c.mounter.IsLikelyNotMountPoint(dir) + if mntErr != nil { + glog.Errorf("isLikelyNotMountPoint check failed: %v", mntErr) + return err + } + if notMnt { + return os.Remove(dir) + } + return fmt.Errorf("Failed to unmount volume dir") +} + +func (plugin *flockerPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) { + return plugin.newDeleterInternal(spec, &FlockerUtil{}) +} + +func (plugin *flockerPlugin) newDeleterInternal(spec *volume.Spec, manager volumeManager) (volume.Deleter, error) { + if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Flocker == nil { + return nil, fmt.Errorf("spec.PersistentVolumeSource.Flocker is nil") + } + return &flockerVolumeDeleter{ + flockerVolume: &flockerVolume{ + volName: spec.Name(), + datasetName: spec.PersistentVolume.Spec.Flocker.DatasetName, + datasetUUID: spec.PersistentVolume.Spec.Flocker.DatasetUUID, + manager: manager, + }}, nil +} + +func (plugin *flockerPlugin) NewProvisioner(options volume.VolumeOptions) (volume.Provisioner, error) { + if len(options.AccessModes) == 0 { + options.AccessModes = plugin.GetAccessModes() + } + return plugin.newProvisionerInternal(options, &FlockerUtil{}) +} + +func (plugin *flockerPlugin) newProvisionerInternal(options volume.VolumeOptions, manager volumeManager) (volume.Provisioner, error) { + return &flockerVolumeProvisioner{ + flockerVolume: &flockerVolume{ + manager: manager, + plugin: plugin, + }, + options: options, + }, nil +} diff --git a/pkg/volume/flocker/plugin_test.go b/pkg/volume/flocker/flocker_test.go similarity index 53% rename from pkg/volume/flocker/plugin_test.go rename to pkg/volume/flocker/flocker_test.go index 3b0f4f35f4c..2dfa8fe4b66 100644 --- a/pkg/volume/flocker/plugin_test.go +++ b/pkg/volume/flocker/flocker_test.go @@ -17,19 +17,111 @@ limitations under the License. package flocker import ( + "fmt" "os" "testing" - flockerclient "github.com/ClusterHQ/flocker-go" - "github.com/stretchr/testify/assert" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/util/mount" utiltesting "k8s.io/kubernetes/pkg/util/testing" "k8s.io/kubernetes/pkg/volume" volumetest "k8s.io/kubernetes/pkg/volume/testing" + + flockerApi "github.com/clusterhq/flocker-go" + "github.com/stretchr/testify/assert" ) const pluginName = "kubernetes.io/flocker" +const datasetOneID = "11111111-1111-1111-1111-111111111100" +const nodeOneID = "11111111-1111-1111-1111-111111111111" +const nodeTwoID = "22222222-2222-2222-2222-222222222222" + +var _ flockerApi.Clientable = &fakeFlockerClient{} + +type fakeFlockerClient struct { + DatasetID string + Primary string + Deleted bool + Metadata map[string]string + Nodes []flockerApi.NodeState + Error error +} + +func newFakeFlockerClient() *fakeFlockerClient { + return &fakeFlockerClient{ + DatasetID: datasetOneID, + Primary: nodeOneID, + Deleted: false, + Metadata: map[string]string{"Name": "dataset-one"}, + Nodes: []flockerApi.NodeState{ + { + Host: "1.2.3.4", + UUID: nodeOneID, + }, + { + Host: "4.5.6.7", + UUID: nodeTwoID, + }, + }, + } +} + +func (c *fakeFlockerClient) CreateDataset(options *flockerApi.CreateDatasetOptions) (*flockerApi.DatasetState, error) { + + if c.Error != nil { + return nil, c.Error + } + + return &flockerApi.DatasetState{ + DatasetID: c.DatasetID, + }, nil +} + +func (c *fakeFlockerClient) DeleteDataset(datasetID string) error { + c.DatasetID = datasetID + c.Deleted = true + return nil +} + +func (c *fakeFlockerClient) GetDatasetState(datasetID string) (*flockerApi.DatasetState, error) { + return &flockerApi.DatasetState{}, nil +} + +func (c *fakeFlockerClient) GetDatasetID(metaName string) (datasetID string, err error) { + if val, ok := c.Metadata["Name"]; !ok { + return val, nil + } + return "", fmt.Errorf("No dataset with metadata X found") +} + +func (c *fakeFlockerClient) GetPrimaryUUID() (primaryUUID string, err error) { + return +} + +func (c *fakeFlockerClient) ListNodes() (nodes []flockerApi.NodeState, err error) { + return c.Nodes, nil +} + +func (c *fakeFlockerClient) UpdatePrimaryForDataset(primaryUUID, datasetID string) (*flockerApi.DatasetState, error) { + return &flockerApi.DatasetState{}, nil +} + +type fakeFlockerUtil struct { +} + +func (fake *fakeFlockerUtil) CreateVolume(c *flockerVolumeProvisioner) (datasetUUID string, volumeSizeGB int, labels map[string]string, err error) { + labels = make(map[string]string) + labels["fakeflockerutil"] = "yes" + return "test-flocker-volume-uuid", 3, labels, nil +} + +func (fake *fakeFlockerUtil) DeleteVolume(cd *flockerVolumeDeleter) error { + if cd.datasetUUID != "test-flocker-volume-uuid" { + return fmt.Errorf("Deleter got unexpected datasetUUID: %s", cd.datasetUUID) + } + return nil +} func newInitializedVolumePlugMgr(t *testing.T) (*volume.VolumePluginMgr, string) { plugMgr := &volume.VolumePluginMgr{} @@ -39,6 +131,38 @@ func newInitializedVolumePlugMgr(t *testing.T) (*volume.VolumePluginMgr, string) return plugMgr, dir } +func TestPlugin(t *testing.T) { + tmpDir, err := utiltesting.MkTmpdir("flockerTest") + if err != nil { + t.Fatalf("can't make a temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + plugMgr := volume.VolumePluginMgr{} + plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil, "" /* rootContext */)) + + plug, err := plugMgr.FindPluginByName("kubernetes.io/flocker") + if err != nil { + t.Errorf("Can't find the plugin by name") + } + spec := &api.Volume{ + Name: "vol1", + VolumeSource: api.VolumeSource{ + Flocker: &api.FlockerVolumeSource{ + DatasetUUID: "uuid1", + }, + }, + } + fakeManager := &fakeFlockerUtil{} + fakeMounter := &mount.FakeMounter{} + mounter, err := plug.(*flockerPlugin).newMounterInternal(volume.NewSpecFromVolume(spec), types.UID("poduid"), fakeManager, fakeMounter) + if err != nil { + t.Errorf("Failed to make a new Mounter: %v", err) + } + if mounter == nil { + t.Errorf("Got a nil Mounter") + } +} + func TestGetByName(t *testing.T) { assert := assert.New(t) plugMgr, _ := newInitializedVolumePlugMgr(t) @@ -115,7 +239,7 @@ func TestGetFlockerVolumeSource(t *testing.T) { assert.Equal(spec.PersistentVolume.Spec.Flocker, vs) } -func TestNewMounter(t *testing.T) { +func TestNewMounterDatasetName(t *testing.T) { assert := assert.New(t) plugMgr, _ := newInitializedVolumePlugMgr(t) @@ -136,7 +260,31 @@ func TestNewMounter(t *testing.T) { assert.NoError(err) } +func TestNewMounterDatasetUUID(t *testing.T) { + assert := assert.New(t) + + plugMgr, _ := newInitializedVolumePlugMgr(t) + plug, err := plugMgr.FindPluginByName(pluginName) + assert.NoError(err) + + spec := &volume.Spec{ + Volume: &api.Volume{ + VolumeSource: api.VolumeSource{ + Flocker: &api.FlockerVolumeSource{ + DatasetUUID: "uuid1", + }, + }, + }, + } + + mounter, err := plug.NewMounter(spec, &api.Pod{}, volume.VolumeOptions{}) + assert.NoError(err) + assert.NotNil(mounter, "got a nil mounter") + +} + func TestNewUnmounter(t *testing.T) { + t.Skip("broken") assert := assert.New(t) p := flockerPlugin{} @@ -147,22 +295,13 @@ func TestNewUnmounter(t *testing.T) { } func TestIsReadOnly(t *testing.T) { - b := &flockerMounter{readOnly: true} + b := &flockerVolumeMounter{readOnly: true} assert.True(t, b.GetAttributes().ReadOnly) } -func TestGetPath(t *testing.T) { - const expectedPath = "/flocker/expected" - - assert := assert.New(t) - - b := flockerMounter{flocker: &flocker{path: expectedPath}} - assert.Equal(expectedPath, b.GetPath()) -} - type mockFlockerClient struct { datasetID, primaryUUID, path string - datasetState *flockerclient.DatasetState + datasetState *flockerApi.DatasetState } func newMockFlockerClient(mockDatasetID, mockPrimaryUUID, mockPath string) *mockFlockerClient { @@ -170,7 +309,7 @@ func newMockFlockerClient(mockDatasetID, mockPrimaryUUID, mockPath string) *mock datasetID: mockDatasetID, primaryUUID: mockPrimaryUUID, path: mockPath, - datasetState: &flockerclient.DatasetState{ + datasetState: &flockerApi.DatasetState{ Path: mockPath, DatasetID: mockDatasetID, Primary: mockPrimaryUUID, @@ -178,10 +317,10 @@ func newMockFlockerClient(mockDatasetID, mockPrimaryUUID, mockPath string) *mock } } -func (m mockFlockerClient) CreateDataset(metaName string) (*flockerclient.DatasetState, error) { +func (m mockFlockerClient) CreateDataset(metaName string) (*flockerApi.DatasetState, error) { return m.datasetState, nil } -func (m mockFlockerClient) GetDatasetState(datasetID string) (*flockerclient.DatasetState, error) { +func (m mockFlockerClient) GetDatasetState(datasetID string) (*flockerApi.DatasetState, error) { return m.datasetState, nil } func (m mockFlockerClient) GetDatasetID(metaName string) (string, error) { @@ -190,10 +329,12 @@ func (m mockFlockerClient) GetDatasetID(metaName string) (string, error) { func (m mockFlockerClient) GetPrimaryUUID() (string, error) { return m.primaryUUID, nil } -func (m mockFlockerClient) UpdatePrimaryForDataset(primaryUUID, datasetID string) (*flockerclient.DatasetState, error) { +func (m mockFlockerClient) UpdatePrimaryForDataset(primaryUUID, datasetID string) (*flockerApi.DatasetState, error) { return m.datasetState, nil } +/* +TODO: reenable after refactor func TestSetUpAtInternal(t *testing.T) { const dir = "dir" mockPath := "expected-to-be-set-properly" // package var @@ -209,9 +350,10 @@ func TestSetUpAtInternal(t *testing.T) { assert.NoError(err) pod := &api.Pod{ObjectMeta: api.ObjectMeta{UID: types.UID("poduid")}} - b := flockerMounter{flocker: &flocker{pod: pod, plugin: plug.(*flockerPlugin)}} + b := flockerVolumeMounter{flockerVolume: &flockerVolume{pod: pod, plugin: plug.(*flockerPlugin)}} b.client = newMockFlockerClient("dataset-id", "primary-uid", mockPath) assert.NoError(b.SetUpAt(dir, nil)) assert.Equal(expectedPath, b.flocker.path) } +*/ diff --git a/pkg/volume/flocker/flocker_util.go b/pkg/volume/flocker/flocker_util.go new file mode 100644 index 00000000000..0190d2f151f --- /dev/null +++ b/pkg/volume/flocker/flocker_util.go @@ -0,0 +1,94 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flocker + +import ( + "fmt" + "time" + + "k8s.io/kubernetes/pkg/util/rand" + "k8s.io/kubernetes/pkg/volume" + + flockerApi "github.com/clusterhq/flocker-go" + "github.com/golang/glog" +) + +type FlockerUtil struct{} + +func (util *FlockerUtil) DeleteVolume(d *flockerVolumeDeleter) error { + var err error + + if d.flockerClient == nil { + d.flockerClient, err = d.plugin.newFlockerClient("") + if err != nil { + return err + } + } + + datasetUUID, err := d.GetDatasetUUID() + if err != nil { + return err + } + + return d.flockerClient.DeleteDataset(datasetUUID) +} + +func (util *FlockerUtil) CreateVolume(c *flockerVolumeProvisioner) (datasetUUID string, volumeSizeGB int, labels map[string]string, err error) { + + if c.flockerClient == nil { + c.flockerClient, err = c.plugin.newFlockerClient("") + if err != nil { + return + } + } + + nodes, err := c.flockerClient.ListNodes() + if err != nil { + return + } + if len(nodes) < 1 { + err = fmt.Errorf("no nodes found inside the flocker cluster to provision a dataset") + return + } + + // select random node + rand.Seed(time.Now().UTC().UnixNano()) + node := nodes[rand.Intn(len(nodes))] + glog.V(2).Infof("selected flocker node with UUID '%s' to provision dataset", node.UUID) + + requestBytes := c.options.Capacity.Value() + volumeSizeGB = int(volume.RoundUpSize(requestBytes, 1024*1024*1024)) + + createOptions := &flockerApi.CreateDatasetOptions{ + MaximumSize: requestBytes, + Metadata: map[string]string{ + "type": "k8s-dynamic-prov", + "pvc": c.options.PVCName, + }, + Primary: node.UUID, + } + + datasetState, err := c.flockerClient.CreateDataset(createOptions) + if err != nil { + return + } + datasetUUID = datasetState.DatasetID + + glog.V(2).Infof("successfully created Flocker dataset with UUID '%s'", datasetUUID) + + return +} diff --git a/pkg/volume/flocker/flocker_util_test.go b/pkg/volume/flocker/flocker_util_test.go new file mode 100644 index 00000000000..a547147cb82 --- /dev/null +++ b/pkg/volume/flocker/flocker_util_test.go @@ -0,0 +1,57 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flocker + +import ( + "fmt" + "testing" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/volume" + + "github.com/stretchr/testify/assert" +) + +func TestFlockerUtil_CreateVolume(t *testing.T) { + assert := assert.New(t) + + // test CreateVolume happy path + options := volume.VolumeOptions{ + Capacity: resource.MustParse("3Gi"), + AccessModes: []api.PersistentVolumeAccessMode{ + api.ReadWriteOnce, + }, + PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimDelete, + } + + fakeFlockerClient := newFakeFlockerClient() + provisioner := newTestableProvisioner(assert, options).(*flockerVolumeProvisioner) + provisioner.flockerClient = fakeFlockerClient + + flockerUtil := &FlockerUtil{} + + datasetID, size, _, err := flockerUtil.CreateVolume(provisioner) + assert.NoError(err) + assert.Equal(datasetOneID, datasetID) + assert.Equal(3, size) + + // test error during CreateVolume + fakeFlockerClient.Error = fmt.Errorf("Do not feel like provisioning") + _, _, _, err = flockerUtil.CreateVolume(provisioner) + assert.Equal(fakeFlockerClient.Error.Error(), err.Error()) +} diff --git a/pkg/volume/flocker/flocker_volume.go b/pkg/volume/flocker/flocker_volume.go new file mode 100644 index 00000000000..de13d933a3c --- /dev/null +++ b/pkg/volume/flocker/flocker_volume.go @@ -0,0 +1,102 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flocker + +import ( + "fmt" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/volume" +) + +type volumeManager interface { + // Creates a volume + CreateVolume(provisioner *flockerVolumeProvisioner) (datasetUUID string, volumeSizeGB int, labels map[string]string, err error) + // Deletes a volume + DeleteVolume(deleter *flockerVolumeDeleter) error +} + +type flockerVolumeDeleter struct { + *flockerVolume +} + +var _ volume.Deleter = &flockerVolumeDeleter{} + +func (b *flockerVolumeDeleter) GetPath() string { + return getPath(b.podUID, b.volName, b.plugin.host) +} + +func (d *flockerVolumeDeleter) Delete() error { + return d.manager.DeleteVolume(d) +} + +type flockerVolumeProvisioner struct { + *flockerVolume + options volume.VolumeOptions +} + +var _ volume.Provisioner = &flockerVolumeProvisioner{} + +func (c *flockerVolumeProvisioner) Provision() (*api.PersistentVolume, error) { + + if len(c.options.Parameters) > 0 { + return nil, fmt.Errorf("Provisioning failed: Specified at least one unsupported parameter") + } + + if c.options.Selector != nil { + return nil, fmt.Errorf("Provisioning failed: Specified unsupported selector") + } + + datasetUUID, sizeGB, labels, err := c.manager.CreateVolume(c) + if err != nil { + return nil, err + } + + pv := &api.PersistentVolume{ + ObjectMeta: api.ObjectMeta{ + Name: c.options.PVName, + Labels: map[string]string{}, + Annotations: map[string]string{ + "kubernetes.io/createdby": "flocker-dynamic-provisioner", + }, + }, + Spec: api.PersistentVolumeSpec{ + PersistentVolumeReclaimPolicy: c.options.PersistentVolumeReclaimPolicy, + AccessModes: c.options.AccessModes, + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse(fmt.Sprintf("%dGi", sizeGB)), + }, + PersistentVolumeSource: api.PersistentVolumeSource{ + Flocker: &api.FlockerVolumeSource{ + DatasetUUID: datasetUUID, + }, + }, + }, + } + + if len(labels) != 0 { + if pv.Labels == nil { + pv.Labels = make(map[string]string) + } + for k, v := range labels { + pv.Labels[k] = v + } + } + + return pv, nil +} diff --git a/pkg/volume/flocker/flocker_volume_test.go b/pkg/volume/flocker/flocker_volume_test.go new file mode 100644 index 00000000000..f489efdf1c7 --- /dev/null +++ b/pkg/volume/flocker/flocker_volume_test.go @@ -0,0 +1,109 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flocker + +import ( + "fmt" + "testing" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/api/unversioned" + utiltesting "k8s.io/kubernetes/pkg/util/testing" + "k8s.io/kubernetes/pkg/volume" + volumetest "k8s.io/kubernetes/pkg/volume/testing" + + "github.com/stretchr/testify/assert" +) + +func newTestableProvisioner(assert *assert.Assertions, options volume.VolumeOptions) volume.Provisioner { + tmpDir, err := utiltesting.MkTmpdir("flockervolumeTest") + assert.NoError(err, fmt.Sprintf("can't make a temp dir: %v", err)) + + plugMgr := volume.VolumePluginMgr{} + plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil, "" /* rootContext */)) + + plug, err := plugMgr.FindPluginByName(pluginName) + assert.NoError(err, "Can't find the plugin by name") + + provisioner, err := plug.(*flockerPlugin).newProvisionerInternal(options, &fakeFlockerUtil{}) + + return provisioner +} + +func TestProvision(t *testing.T) { + assert := assert.New(t) + + cap := resource.MustParse("3Gi") + options := volume.VolumeOptions{ + Capacity: cap, + AccessModes: []api.PersistentVolumeAccessMode{ + api.ReadWriteOnce, + }, + PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimDelete, + } + + provisioner := newTestableProvisioner(assert, options) + + persistentSpec, err := provisioner.Provision() + assert.NoError(err, "Provision() failed: ", err) + + cap = persistentSpec.Spec.Capacity[api.ResourceStorage] + + assert.Equal(int64(3*1024*1024*1024), cap.Value()) + + assert.Equal( + "test-flocker-volume-uuid", + persistentSpec.Spec.PersistentVolumeSource.Flocker.DatasetUUID, + ) + + assert.Equal( + map[string]string{"fakeflockerutil": "yes"}, + persistentSpec.Labels, + ) + + // parameters are not supported + options = volume.VolumeOptions{ + Capacity: cap, + AccessModes: []api.PersistentVolumeAccessMode{ + api.ReadWriteOnce, + }, + PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimDelete, + Parameters: map[string]string{ + "not-supported-params": "test123", + }, + } + + provisioner = newTestableProvisioner(assert, options) + persistentSpec, err = provisioner.Provision() + assert.Error(err, "Provision() did not fail with Parameters specified") + + // selectors are not supported + options = volume.VolumeOptions{ + Capacity: cap, + AccessModes: []api.PersistentVolumeAccessMode{ + api.ReadWriteOnce, + }, + PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimDelete, + Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"key": "value"}}, + } + + provisioner = newTestableProvisioner(assert, options) + persistentSpec, err = provisioner.Provision() + assert.Error(err, "Provision() did not fail with Selector specified") + +} diff --git a/pkg/volume/flocker/plugin.go b/pkg/volume/flocker/plugin.go deleted file mode 100644 index 8cf31937235..00000000000 --- a/pkg/volume/flocker/plugin.go +++ /dev/null @@ -1,270 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package flocker - -import ( - "fmt" - "time" - - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/types" - "k8s.io/kubernetes/pkg/util/env" - "k8s.io/kubernetes/pkg/util/exec" - "k8s.io/kubernetes/pkg/util/mount" - "k8s.io/kubernetes/pkg/volume" - - flockerclient "github.com/ClusterHQ/flocker-go" -) - -const ( - flockerPluginName = "kubernetes.io/flocker" - - defaultHost = "localhost" - defaultPort = 4523 - defaultCACertFile = "/etc/flocker/cluster.crt" - defaultClientKeyFile = "/etc/flocker/apiuser.key" - defaultClientCertFile = "/etc/flocker/apiuser.crt" - - timeoutWaitingForVolume = 2 * time.Minute - tickerWaitingForVolume = 5 * time.Second -) - -func ProbeVolumePlugins() []volume.VolumePlugin { - return []volume.VolumePlugin{&flockerPlugin{}} -} - -type flockerPlugin struct { - host volume.VolumeHost -} - -type flocker struct { - datasetName string - path string - pod *api.Pod - mounter mount.Interface - plugin *flockerPlugin -} - -func (p *flockerPlugin) Init(host volume.VolumeHost) error { - p.host = host - return nil -} - -func (p *flockerPlugin) GetPluginName() string { - return flockerPluginName -} - -func (p *flockerPlugin) GetVolumeName(spec *volume.Spec) (string, error) { - volumeSource, _, err := getVolumeSource(spec) - if err != nil { - return "", err - } - - return volumeSource.DatasetName, nil -} - -func (p *flockerPlugin) CanSupport(spec *volume.Spec) bool { - return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Flocker != nil) || - (spec.Volume != nil && spec.Volume.Flocker != nil) -} - -func (p *flockerPlugin) RequiresRemount() bool { - return false -} - -func (p *flockerPlugin) getFlockerVolumeSource(spec *volume.Spec) (*api.FlockerVolumeSource, bool) { - // AFAIK this will always be r/w, but perhaps for the future it will be needed - readOnly := false - - if spec.Volume != nil && spec.Volume.Flocker != nil { - return spec.Volume.Flocker, readOnly - } - return spec.PersistentVolume.Spec.Flocker, readOnly -} - -func (p *flockerPlugin) NewMounter(spec *volume.Spec, pod *api.Pod, opts volume.VolumeOptions) (volume.Mounter, error) { - source, readOnly := p.getFlockerVolumeSource(spec) - mounter := flockerMounter{ - flocker: &flocker{ - datasetName: source.DatasetName, - pod: pod, - mounter: p.host.GetMounter(), - plugin: p, - }, - exe: exec.New(), - opts: opts, - readOnly: readOnly, - } - return &mounter, nil -} - -func (p *flockerPlugin) NewUnmounter(datasetName string, podUID types.UID) (volume.Unmounter, error) { - // Flocker agent will take care of this, there is nothing we can do here - return nil, nil -} - -func (p *flockerPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) { - flockerVolume := &api.Volume{ - Name: volumeName, - VolumeSource: api.VolumeSource{ - Flocker: &api.FlockerVolumeSource{ - DatasetName: volumeName, - }, - }, - } - return volume.NewSpecFromVolume(flockerVolume), nil -} - -type flockerMounter struct { - *flocker - client flockerclient.Clientable - exe exec.Interface - opts volume.VolumeOptions - readOnly bool - volume.MetricsNil -} - -func (b flockerMounter) GetAttributes() volume.Attributes { - return volume.Attributes{ - ReadOnly: b.readOnly, - Managed: false, - SupportsSELinux: false, - } -} -func (b flockerMounter) GetPath() string { - return b.flocker.path -} - -func (b flockerMounter) SetUp(fsGroup *int64) error { - return b.SetUpAt(b.flocker.datasetName, fsGroup) -} - -// newFlockerClient uses environment variables and pod attributes to return a -// flocker client capable of talking with the Flocker control service. -func (b flockerMounter) newFlockerClient() (*flockerclient.Client, error) { - host := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_HOST", defaultHost) - port, err := env.GetEnvAsIntOrFallback("FLOCKER_CONTROL_SERVICE_PORT", defaultPort) - - if err != nil { - return nil, err - } - caCertPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CA_FILE", defaultCACertFile) - keyPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CLIENT_KEY_FILE", defaultClientKeyFile) - certPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CLIENT_CERT_FILE", defaultClientCertFile) - - hostIP, err := b.plugin.host.GetHostIP() - if err != nil { - return nil, err - } - - c, err := flockerclient.NewClient(host, port, hostIP.String(), caCertPath, keyPath, certPath) - return c, err -} - -/* -SetUpAt will setup a Flocker volume following this flow of calls to the Flocker -control service: - -1. Get the dataset id for the given volume name/dir -2. It should already be there, if it's not the user needs to manually create it -3. Check the current Primary UUID -4. If it doesn't match with the Primary UUID that we got on 2, then we will - need to update the Primary UUID for this volume. -5. Wait until the Primary UUID was updated or timeout. -*/ -func (b flockerMounter) SetUpAt(dir string, fsGroup *int64) error { - if b.client == nil { - c, err := b.newFlockerClient() - if err != nil { - return err - } - b.client = c - } - - datasetID, err := b.client.GetDatasetID(dir) - if err != nil { - return err - } - - s, err := b.client.GetDatasetState(datasetID) - if err != nil { - return fmt.Errorf("The volume '%s' is not available in Flocker. You need to create this manually with Flocker CLI before using it.", dir) - } - - primaryUUID, err := b.client.GetPrimaryUUID() - if err != nil { - return err - } - - if s.Primary != primaryUUID { - if err := b.updateDatasetPrimary(datasetID, primaryUUID); err != nil { - return err - } - newState, err := b.client.GetDatasetState(datasetID) - if err != nil { - return fmt.Errorf("The volume '%s' migrated unsuccessfully.", datasetID) - } - b.flocker.path = newState.Path - } else { - b.flocker.path = s.Path - } - - return nil -} - -// updateDatasetPrimary will update the primary in Flocker and wait for it to -// be ready. If it never gets to ready state it will timeout and error. -func (b flockerMounter) updateDatasetPrimary(datasetID, primaryUUID string) error { - // We need to update the primary and wait for it to be ready - _, err := b.client.UpdatePrimaryForDataset(primaryUUID, datasetID) - if err != nil { - return err - } - - timeoutChan := time.NewTimer(timeoutWaitingForVolume) - defer timeoutChan.Stop() - tickChan := time.NewTicker(tickerWaitingForVolume) - defer tickChan.Stop() - - for { - if s, err := b.client.GetDatasetState(datasetID); err == nil && s.Primary == primaryUUID { - return nil - } - - select { - case <-timeoutChan.C: - return fmt.Errorf( - "Timed out waiting for the dataset_id: '%s' to be moved to the primary: '%s'\n%v", - datasetID, primaryUUID, err, - ) - case <-tickChan.C: - break - } - } - -} - -func getVolumeSource(spec *volume.Spec) (*api.FlockerVolumeSource, bool, error) { - if spec.Volume != nil && spec.Volume.Flocker != nil { - return spec.Volume.Flocker, spec.ReadOnly, nil - } else if spec.PersistentVolume != nil && - spec.PersistentVolume.Spec.Flocker != nil { - return spec.PersistentVolume.Spec.Flocker, spec.ReadOnly, nil - } - - return nil, false, fmt.Errorf("Spec does not reference a Flocker volume type") -}