
The field actually says that the container runtime should relabel a volume when running a container with it; it does not say that the volume supports SELinux. For example, NFS can support SELinux, but we don't want NFS volumes relabeled, because they can be shared among several Pods.
465 lines
14 KiB
Go
465 lines
14 KiB
Go
/*
|
|
Copyright 2015 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package flocker
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"time"
|
|
|
|
flockerapi "github.com/clusterhq/flocker-go"
|
|
"k8s.io/klog/v2"
|
|
"k8s.io/mount-utils"
|
|
utilstrings "k8s.io/utils/strings"
|
|
|
|
v1 "k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/kubernetes/pkg/util/env"
|
|
"k8s.io/kubernetes/pkg/volume"
|
|
"k8s.io/kubernetes/pkg/volume/util"
|
|
)
|
|
|
|
// ProbeVolumePlugins is the primary entrypoint for volume plugins.
|
|
func ProbeVolumePlugins() []volume.VolumePlugin {
|
|
return []volume.VolumePlugin{&flockerPlugin{nil}}
|
|
}
|
|
|
|
type flockerPlugin struct {
|
|
host volume.VolumeHost
|
|
}
|
|
|
|
type flockerVolume struct {
|
|
volName string
|
|
podUID types.UID
|
|
// dataset metadata name deprecated
|
|
datasetName string
|
|
// dataset uuid
|
|
datasetUUID string
|
|
//pod *v1.Pod
|
|
flockerClient flockerapi.Clientable
|
|
manager volumeManager
|
|
plugin *flockerPlugin
|
|
mounter mount.Interface
|
|
volume.MetricsProvider
|
|
}
|
|
|
|
var _ volume.VolumePlugin = &flockerPlugin{}
|
|
var _ volume.PersistentVolumePlugin = &flockerPlugin{}
|
|
var _ volume.DeletableVolumePlugin = &flockerPlugin{}
|
|
var _ volume.ProvisionableVolumePlugin = &flockerPlugin{}
|
|
|
|
const (
	// flockerPluginName is the name reported by GetPluginName.
	flockerPluginName = "kubernetes.io/flocker"

	// Fallbacks used by newFlockerClient when the corresponding
	// FLOCKER_CONTROL_SERVICE_* environment variable is not set.
	defaultHost           = "localhost"
	defaultPort           = 4523
	defaultCACertFile     = "/etc/flocker/cluster.crt"
	defaultClientKeyFile  = "/etc/flocker/apiuser.key"
	defaultClientCertFile = "/etc/flocker/apiuser.crt"

	// defaultMountPath is the directory under which makeGlobalFlockerPath
	// places each dataset, keyed by dataset UUID.
	defaultMountPath = "/flocker"

	// Overall timeout and polling interval used by updateDatasetPrimary.
	timeoutWaitingForVolume = 2 * time.Minute
	tickerWaitingForVolume  = 5 * time.Second
)
|
|
|
|
func getPath(uid types.UID, volName string, host volume.VolumeHost) string {
|
|
return host.GetPodVolumeDir(uid, utilstrings.EscapeQualifiedName(flockerPluginName), volName)
|
|
}
|
|
|
|
func makeGlobalFlockerPath(datasetUUID string) string {
|
|
return filepath.Join(defaultMountPath, datasetUUID)
|
|
}
|
|
|
|
func (p *flockerPlugin) Init(host volume.VolumeHost) error {
|
|
p.host = host
|
|
return nil
|
|
}
|
|
|
|
func (p *flockerPlugin) GetPluginName() string {
|
|
return flockerPluginName
|
|
}
|
|
|
|
func (p *flockerPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
|
|
volumeSource, _, err := getVolumeSource(spec)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return volumeSource.DatasetName, nil
|
|
}
|
|
|
|
func (p *flockerPlugin) CanSupport(spec *volume.Spec) bool {
|
|
return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Flocker != nil) ||
|
|
(spec.Volume != nil && spec.Volume.Flocker != nil)
|
|
}
|
|
|
|
func (p *flockerPlugin) RequiresRemount(spec *volume.Spec) bool {
|
|
return false
|
|
}
|
|
|
|
func (p *flockerPlugin) SupportsMountOption() bool {
|
|
return false
|
|
}
|
|
|
|
func (p *flockerPlugin) SupportsBulkVolumeVerification() bool {
|
|
return false
|
|
}
|
|
|
|
func (p *flockerPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
|
|
return []v1.PersistentVolumeAccessMode{
|
|
v1.ReadWriteOnce,
|
|
}
|
|
}
|
|
|
|
func (p *flockerPlugin) getFlockerVolumeSource(spec *volume.Spec) (*v1.FlockerVolumeSource, bool) {
|
|
// AFAIK this will always be r/w, but perhaps for the future it will be needed
|
|
readOnly := false
|
|
|
|
if spec.Volume != nil && spec.Volume.Flocker != nil {
|
|
return spec.Volume.Flocker, readOnly
|
|
}
|
|
return spec.PersistentVolume.Spec.Flocker, readOnly
|
|
}
|
|
|
|
func (p *flockerPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
|
|
// Inject real implementations here, test through the internal function.
|
|
return p.newMounterInternal(spec, pod.UID, &flockerUtil{}, p.host.GetMounter(p.GetPluginName()))
|
|
}
|
|
|
|
func (p *flockerPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager volumeManager, mounter mount.Interface) (volume.Mounter, error) {
|
|
volumeSource, readOnly, err := getVolumeSource(spec)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
datasetName := volumeSource.DatasetName
|
|
datasetUUID := volumeSource.DatasetUUID
|
|
|
|
return &flockerVolumeMounter{
|
|
flockerVolume: &flockerVolume{
|
|
podUID: podUID,
|
|
volName: spec.Name(),
|
|
datasetName: datasetName,
|
|
datasetUUID: datasetUUID,
|
|
mounter: mounter,
|
|
manager: manager,
|
|
plugin: p,
|
|
MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, spec.Name(), p.host)),
|
|
},
|
|
readOnly: readOnly}, nil
|
|
}
|
|
|
|
func (p *flockerPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
|
|
// Inject real implementations here, test through the internal function.
|
|
return p.newUnmounterInternal(volName, podUID, &flockerUtil{}, p.host.GetMounter(p.GetPluginName()))
|
|
}
|
|
|
|
func (p *flockerPlugin) newUnmounterInternal(volName string, podUID types.UID, manager volumeManager, mounter mount.Interface) (volume.Unmounter, error) {
|
|
return &flockerVolumeUnmounter{&flockerVolume{
|
|
podUID: podUID,
|
|
volName: volName,
|
|
manager: manager,
|
|
mounter: mounter,
|
|
plugin: p,
|
|
MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, volName, p.host)),
|
|
}}, nil
|
|
}
|
|
|
|
func (p *flockerPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
|
|
flockerVolume := &v1.Volume{
|
|
Name: volumeName,
|
|
VolumeSource: v1.VolumeSource{
|
|
Flocker: &v1.FlockerVolumeSource{
|
|
DatasetName: volumeName,
|
|
},
|
|
},
|
|
}
|
|
return volume.NewSpecFromVolume(flockerVolume), nil
|
|
}
|
|
|
|
func (b *flockerVolume) GetDatasetUUID() (datasetUUID string, err error) {
|
|
|
|
// return UUID if set
|
|
if len(b.datasetUUID) > 0 {
|
|
return b.datasetUUID, nil
|
|
}
|
|
|
|
if b.flockerClient == nil {
|
|
return "", fmt.Errorf("flocker client is not initialized")
|
|
}
|
|
|
|
// lookup in flocker API otherwise
|
|
return b.flockerClient.GetDatasetID(b.datasetName)
|
|
}
|
|
|
|
type flockerVolumeMounter struct {
|
|
*flockerVolume
|
|
readOnly bool
|
|
}
|
|
|
|
func (b *flockerVolumeMounter) GetAttributes() volume.Attributes {
|
|
return volume.Attributes{
|
|
ReadOnly: b.readOnly,
|
|
Managed: false,
|
|
SELinuxRelabel: false,
|
|
}
|
|
}
|
|
|
|
// Checks prior to mount operations to verify that the required components (binaries, etc.)
|
|
// to mount the volume are available on the underlying node.
|
|
// If not, it returns an error
|
|
func (b *flockerVolumeMounter) CanMount() error {
|
|
return nil
|
|
}
|
|
|
|
func (b *flockerVolumeMounter) GetPath() string {
|
|
return getPath(b.podUID, b.volName, b.plugin.host)
|
|
}
|
|
|
|
// SetUp bind mounts the disk global mount to the volume path.
|
|
func (b *flockerVolumeMounter) SetUp(mounterArgs volume.MounterArgs) error {
|
|
return b.SetUpAt(b.GetPath(), mounterArgs)
|
|
}
|
|
|
|
// newFlockerClient uses environment variables and pod attributes to return a
|
|
// flocker client capable of talking with the Flocker control service.
|
|
func (p *flockerPlugin) newFlockerClient(hostIP string) (*flockerapi.Client, error) {
|
|
host := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_HOST", defaultHost)
|
|
port, err := env.GetEnvAsIntOrFallback("FLOCKER_CONTROL_SERVICE_PORT", defaultPort)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
caCertPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CA_FILE", defaultCACertFile)
|
|
keyPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CLIENT_KEY_FILE", defaultClientKeyFile)
|
|
certPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CLIENT_CERT_FILE", defaultClientCertFile)
|
|
|
|
c, err := flockerapi.NewClient(host, port, hostIP, caCertPath, keyPath, certPath)
|
|
return c, err
|
|
}
|
|
|
|
func (b *flockerVolumeMounter) newFlockerClient() (*flockerapi.Client, error) {
|
|
|
|
hostIP, err := b.plugin.host.GetHostIP()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return b.plugin.newFlockerClient(hostIP.String())
|
|
}
|
|
|
|
/*
|
|
SetUpAt will setup a Flocker volume following this flow of calls to the Flocker
|
|
control service:
|
|
|
|
1. Get the dataset id for the given volume name/dir
|
|
2. It should already be there, if it's not the user needs to manually create it
|
|
3. Check the current Primary UUID
|
|
4. If it doesn't match with the Primary UUID that we got on 2, then we will
|
|
need to update the Primary UUID for this volume.
|
|
5. Wait until the Primary UUID was updated or timeout.
|
|
*/
|
|
func (b *flockerVolumeMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
|
|
var err error
|
|
if b.flockerClient == nil {
|
|
b.flockerClient, err = b.newFlockerClient()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
datasetUUID, err := b.GetDatasetUUID()
|
|
if err != nil {
|
|
return fmt.Errorf("the datasetUUID for volume with datasetName='%s' can not be found using flocker: %s", b.datasetName, err)
|
|
}
|
|
|
|
datasetState, err := b.flockerClient.GetDatasetState(datasetUUID)
|
|
if err != nil {
|
|
return fmt.Errorf("the datasetState for volume with datasetUUID='%s' could not determinted uusing flocker: %s", datasetUUID, err)
|
|
}
|
|
|
|
primaryUUID, err := b.flockerClient.GetPrimaryUUID()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if datasetState.Primary != primaryUUID {
|
|
if err := b.updateDatasetPrimary(datasetUUID, primaryUUID); err != nil {
|
|
return err
|
|
}
|
|
_, err := b.flockerClient.GetDatasetState(datasetUUID)
|
|
if err != nil {
|
|
return fmt.Errorf("the volume with datasetUUID='%s' migrated unsuccessfully", datasetUUID)
|
|
}
|
|
}
|
|
|
|
// TODO: handle failed mounts here.
|
|
notMnt, err := b.mounter.IsLikelyNotMountPoint(dir)
|
|
klog.V(4).Infof("flockerVolume set up: %s %v %v, datasetUUID %v readOnly %v", dir, !notMnt, err, datasetUUID, b.readOnly)
|
|
if err != nil && !os.IsNotExist(err) {
|
|
klog.Errorf("cannot validate mount point: %s %v", dir, err)
|
|
return err
|
|
}
|
|
if !notMnt {
|
|
return nil
|
|
}
|
|
|
|
if err := os.MkdirAll(dir, 0750); err != nil {
|
|
klog.Errorf("mkdir failed on disk %s (%v)", dir, err)
|
|
return err
|
|
}
|
|
|
|
// Perform a bind mount to the full path to allow duplicate mounts of the same PD.
|
|
options := []string{"bind"}
|
|
if b.readOnly {
|
|
options = append(options, "ro")
|
|
}
|
|
|
|
globalFlockerPath := makeGlobalFlockerPath(datasetUUID)
|
|
klog.V(4).Infof("attempting to mount %s", dir)
|
|
|
|
err = b.mounter.MountSensitiveWithoutSystemd(globalFlockerPath, dir, "", options, nil)
|
|
if err != nil {
|
|
notMnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir)
|
|
if mntErr != nil {
|
|
klog.Errorf("isLikelyNotMountPoint check failed: %v", mntErr)
|
|
return err
|
|
}
|
|
if !notMnt {
|
|
if mntErr = b.mounter.Unmount(dir); mntErr != nil {
|
|
klog.Errorf("failed to unmount: %v", mntErr)
|
|
return err
|
|
}
|
|
notMnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir)
|
|
if mntErr != nil {
|
|
klog.Errorf("isLikelyNotMountPoint check failed: %v", mntErr)
|
|
return err
|
|
}
|
|
if !notMnt {
|
|
// This is very odd, we don't expect it. We'll try again next sync loop.
|
|
klog.Errorf("%s is still mounted, despite call to unmount(). Will try again next sync loop.", dir)
|
|
return err
|
|
}
|
|
}
|
|
os.Remove(dir)
|
|
klog.Errorf("mount of disk %s failed: %v", dir, err)
|
|
return err
|
|
}
|
|
|
|
if !b.readOnly {
|
|
volume.SetVolumeOwnership(b, mounterArgs.FsGroup, mounterArgs.FSGroupChangePolicy, util.FSGroupCompleteHook(b.plugin, nil))
|
|
}
|
|
|
|
klog.V(4).Infof("successfully mounted %s", dir)
|
|
return nil
|
|
}
|
|
|
|
// updateDatasetPrimary will update the primary in Flocker and wait for it to
|
|
// be ready. If it never gets to ready state it will timeout and error.
|
|
func (b *flockerVolumeMounter) updateDatasetPrimary(datasetUUID string, primaryUUID string) error {
|
|
// We need to update the primary and wait for it to be ready
|
|
_, err := b.flockerClient.UpdatePrimaryForDataset(primaryUUID, datasetUUID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
timeoutChan := time.NewTimer(timeoutWaitingForVolume)
|
|
defer timeoutChan.Stop()
|
|
tickChan := time.NewTicker(tickerWaitingForVolume)
|
|
defer tickChan.Stop()
|
|
|
|
for {
|
|
if s, err := b.flockerClient.GetDatasetState(datasetUUID); err == nil && s.Primary == primaryUUID {
|
|
return nil
|
|
}
|
|
|
|
select {
|
|
case <-timeoutChan.C:
|
|
return fmt.Errorf(
|
|
"Timed out waiting for the datasetUUID: '%s' to be moved to the primary: '%s'\n%v",
|
|
datasetUUID, primaryUUID, err,
|
|
)
|
|
case <-tickChan.C:
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
func getVolumeSource(spec *volume.Spec) (*v1.FlockerVolumeSource, bool, error) {
|
|
if spec.Volume != nil && spec.Volume.Flocker != nil {
|
|
return spec.Volume.Flocker, spec.ReadOnly, nil
|
|
} else if spec.PersistentVolume != nil &&
|
|
spec.PersistentVolume.Spec.Flocker != nil {
|
|
return spec.PersistentVolume.Spec.Flocker, spec.ReadOnly, nil
|
|
}
|
|
|
|
return nil, false, fmt.Errorf("Spec does not reference a Flocker volume type")
|
|
}
|
|
|
|
type flockerVolumeUnmounter struct {
|
|
*flockerVolume
|
|
}
|
|
|
|
// Compile-time check that *flockerVolumeUnmounter satisfies volume.Unmounter.
var _ volume.Unmounter = (*flockerVolumeUnmounter)(nil)
|
|
|
|
func (c *flockerVolumeUnmounter) GetPath() string {
|
|
return getPath(c.podUID, c.volName, c.plugin.host)
|
|
}
|
|
|
|
// Unmounts the bind mount, and detaches the disk only if the PD
|
|
// resource was the last reference to that disk on the kubelet.
|
|
func (c *flockerVolumeUnmounter) TearDown() error {
|
|
return c.TearDownAt(c.GetPath())
|
|
}
|
|
|
|
// TearDownAt unmounts the bind mount
|
|
func (c *flockerVolumeUnmounter) TearDownAt(dir string) error {
|
|
return mount.CleanupMountPoint(dir, c.mounter, false)
|
|
}
|
|
|
|
func (p *flockerPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) {
|
|
return p.newDeleterInternal(spec, &flockerUtil{})
|
|
}
|
|
|
|
func (p *flockerPlugin) newDeleterInternal(spec *volume.Spec, manager volumeManager) (volume.Deleter, error) {
|
|
if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Flocker == nil {
|
|
return nil, fmt.Errorf("spec.PersistentVolumeSource.Flocker is nil")
|
|
}
|
|
return &flockerVolumeDeleter{
|
|
flockerVolume: &flockerVolume{
|
|
volName: spec.Name(),
|
|
datasetName: spec.PersistentVolume.Spec.Flocker.DatasetName,
|
|
datasetUUID: spec.PersistentVolume.Spec.Flocker.DatasetUUID,
|
|
manager: manager,
|
|
}}, nil
|
|
}
|
|
|
|
func (p *flockerPlugin) NewProvisioner(options volume.VolumeOptions) (volume.Provisioner, error) {
|
|
return p.newProvisionerInternal(options, &flockerUtil{})
|
|
}
|
|
|
|
func (p *flockerPlugin) newProvisionerInternal(options volume.VolumeOptions, manager volumeManager) (volume.Provisioner, error) {
|
|
return &flockerVolumeProvisioner{
|
|
flockerVolume: &flockerVolume{
|
|
manager: manager,
|
|
plugin: p,
|
|
},
|
|
options: options,
|
|
}, nil
|
|
}
|