|
|
|
@@ -26,6 +26,7 @@ import (
|
|
|
|
|
gcli "github.com/heketi/heketi/client/api/go-client"
|
|
|
|
|
gapi "github.com/heketi/heketi/pkg/glusterfs/api"
|
|
|
|
|
"k8s.io/kubernetes/pkg/api"
|
|
|
|
|
"k8s.io/kubernetes/pkg/api/errors"
|
|
|
|
|
"k8s.io/kubernetes/pkg/api/resource"
|
|
|
|
|
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
|
|
|
|
"k8s.io/kubernetes/pkg/types"
|
|
|
|
@@ -55,9 +56,10 @@ var _ volume.Deleter = &glusterfsVolumeDeleter{}
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
glusterfsPluginName = "kubernetes.io/glusterfs"
|
|
|
|
|
volprefix = "vol_"
|
|
|
|
|
replicacount = 3
|
|
|
|
|
durabilitytype = "replicate"
|
|
|
|
|
volPrefix = "vol_"
|
|
|
|
|
dynamicEpSvcPrefix = "gluster-dynamic-"
|
|
|
|
|
replicaCount = 3
|
|
|
|
|
durabilityType = "replicate"
|
|
|
|
|
secretKeyName = "key" // key name used in secret
|
|
|
|
|
annGlusterURL = "glusterfs.kubernetes.io/url"
|
|
|
|
|
annGlusterSecretName = "glusterfs.kubernetes.io/secretname"
|
|
|
|
@@ -111,6 +113,7 @@ func (plugin *glusterfsPlugin) GetAccessModes() []api.PersistentVolumeAccessMode
|
|
|
|
|
func (plugin *glusterfsPlugin) NewMounter(spec *volume.Spec, pod *api.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
|
|
|
|
|
source, _ := plugin.getGlusterVolumeSource(spec)
|
|
|
|
|
ep_name := source.EndpointsName
|
|
|
|
|
// PVC/POD is in same ns.
|
|
|
|
|
ns := pod.Namespace
|
|
|
|
|
ep, err := plugin.host.GetKubeClient().Core().Endpoints(ns).Get(ep_name)
|
|
|
|
|
if err != nil {
|
|
|
|
@@ -295,20 +298,29 @@ func (b *glusterfsMounter) setUpAtInternal(dir string) error {
|
|
|
|
|
options = append(options, "log-level=ERROR")
|
|
|
|
|
options = append(options, "log-file="+log)
|
|
|
|
|
|
|
|
|
|
addr := make(map[string]struct{})
|
|
|
|
|
for _, s := range b.hosts.Subsets {
|
|
|
|
|
for _, a := range s.Addresses {
|
|
|
|
|
addr[a.IP] = struct{}{}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
var addrlist []string
|
|
|
|
|
if b.hosts == nil {
|
|
|
|
|
return fmt.Errorf("glusterfs: endpoint is nil")
|
|
|
|
|
} else {
|
|
|
|
|
addr := make(map[string]struct{})
|
|
|
|
|
if b.hosts.Subsets != nil {
|
|
|
|
|
for _, s := range b.hosts.Subsets {
|
|
|
|
|
for _, a := range s.Addresses {
|
|
|
|
|
addr[a.IP] = struct{}{}
|
|
|
|
|
addrlist = append(addrlist, a.IP)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Avoid mount storm, pick a host randomly.
|
|
|
|
|
// Iterate all hosts until mount succeeds.
|
|
|
|
|
for hostIP := range addr {
|
|
|
|
|
errs = b.mounter.Mount(hostIP+":"+b.path, dir, "glusterfs", options)
|
|
|
|
|
if errs == nil {
|
|
|
|
|
glog.Infof("glusterfs: successfully mounted %s", dir)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Avoid mount storm, pick a host randomly.
|
|
|
|
|
// Iterate all hosts until mount succeeds.
|
|
|
|
|
for _, ip := range addrlist {
|
|
|
|
|
errs = b.mounter.Mount(ip+":"+b.path, dir, "glusterfs", options)
|
|
|
|
|
if errs == nil {
|
|
|
|
|
glog.Infof("glusterfs: successfully mounted %s", dir)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -351,7 +363,6 @@ func (plugin *glusterfsPlugin) newProvisionerInternal(options volume.VolumeOptio
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type provisioningConfig struct {
|
|
|
|
|
endpoint string
|
|
|
|
|
url string
|
|
|
|
|
user string
|
|
|
|
|
userKey string
|
|
|
|
@@ -401,8 +412,7 @@ func (d *glusterfsVolumeDeleter) Delete() error {
|
|
|
|
|
var err error
|
|
|
|
|
glog.V(2).Infof("glusterfs: delete volume: %s ", d.glusterfsMounter.path)
|
|
|
|
|
volumeName := d.glusterfsMounter.path
|
|
|
|
|
volumeId := dstrings.TrimPrefix(volumeName, volprefix)
|
|
|
|
|
|
|
|
|
|
volumeId := dstrings.TrimPrefix(volumeName, volPrefix)
|
|
|
|
|
class, err := volutil.GetClassForVolume(d.plugin.host.GetKubeClient(), d.spec)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
@@ -423,12 +433,34 @@ func (d *glusterfsVolumeDeleter) Delete() error {
|
|
|
|
|
}
|
|
|
|
|
err = cli.VolumeDelete(volumeId)
|
|
|
|
|
if err != nil {
|
|
|
|
|
glog.V(4).Infof("glusterfs: error when deleting the volume :%s", err)
|
|
|
|
|
glog.Errorf("glusterfs: error when deleting the volume :%v", err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
glog.V(2).Infof("glusterfs: volume %s deleted successfully", volumeName)
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
|
|
//Deleter takes endpoint and endpointnamespace from pv spec.
|
|
|
|
|
pvSpec := d.spec.Spec
|
|
|
|
|
var dynamicEndpoint, dynamicNamespace string
|
|
|
|
|
if pvSpec.ClaimRef == nil {
|
|
|
|
|
glog.Errorf("glusterfs: ClaimRef is nil")
|
|
|
|
|
return fmt.Errorf("glusterfs: ClaimRef is nil")
|
|
|
|
|
}
|
|
|
|
|
if pvSpec.ClaimRef.Namespace == "" {
|
|
|
|
|
glog.Errorf("glusterfs: namespace is nil")
|
|
|
|
|
return fmt.Errorf("glusterfs: namespace is nil")
|
|
|
|
|
}
|
|
|
|
|
dynamicNamespace = pvSpec.ClaimRef.Namespace
|
|
|
|
|
if pvSpec.Glusterfs.EndpointsName != "" {
|
|
|
|
|
dynamicEndpoint = pvSpec.Glusterfs.EndpointsName
|
|
|
|
|
}
|
|
|
|
|
glog.V(3).Infof("glusterfs: dynamic namespace and endpoint : [%v/%v]", dynamicNamespace, dynamicEndpoint)
|
|
|
|
|
err = d.deleteEndpointService(dynamicNamespace, dynamicEndpoint)
|
|
|
|
|
if err != nil {
|
|
|
|
|
glog.Errorf("glusterfs: error when deleting endpoint/service :%v", err)
|
|
|
|
|
} else {
|
|
|
|
|
glog.V(1).Infof("glusterfs: [%v/%v] deleted successfully ", dynamicNamespace, dynamicEndpoint)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *glusterfsVolumeProvisioner) Provision() (*api.PersistentVolume, error) {
|
|
|
|
@@ -448,8 +480,8 @@ func (r *glusterfsVolumeProvisioner) Provision() (*api.PersistentVolume, error)
|
|
|
|
|
glog.V(4).Infof("glusterfs: creating volume with configuration %+v", r.provisioningConfig)
|
|
|
|
|
glusterfs, sizeGB, err := r.CreateVolume()
|
|
|
|
|
if err != nil {
|
|
|
|
|
glog.Errorf("glusterfs: create volume err: %s.", err)
|
|
|
|
|
return nil, fmt.Errorf("glusterfs: create volume err: %s.", err)
|
|
|
|
|
glog.Errorf("glusterfs: create volume err: %v.", err)
|
|
|
|
|
return nil, fmt.Errorf("glusterfs: create volume err: %v.", err)
|
|
|
|
|
}
|
|
|
|
|
pv := new(api.PersistentVolume)
|
|
|
|
|
pv.Spec.PersistentVolumeSource.Glusterfs = glusterfs
|
|
|
|
@@ -478,20 +510,117 @@ func (p *glusterfsVolumeProvisioner) CreateVolume() (r *api.GlusterfsVolumeSourc
|
|
|
|
|
glog.Errorf("glusterfs: failed to create gluster rest client")
|
|
|
|
|
return nil, 0, fmt.Errorf("failed to create gluster REST client, REST server authentication failed")
|
|
|
|
|
}
|
|
|
|
|
volumeReq := &gapi.VolumeCreateRequest{Size: sz, Durability: gapi.VolumeDurabilityInfo{Type: durabilitytype, Replicate: gapi.ReplicaDurability{Replica: replicacount}}}
|
|
|
|
|
volumeReq := &gapi.VolumeCreateRequest{Size: sz, Durability: gapi.VolumeDurabilityInfo{Type: durabilityType, Replicate: gapi.ReplicaDurability{Replica: replicaCount}}}
|
|
|
|
|
volume, err := cli.VolumeCreate(volumeReq)
|
|
|
|
|
if err != nil {
|
|
|
|
|
glog.Errorf("glusterfs: error creating volume %s ", err)
|
|
|
|
|
glog.Errorf("glusterfs: error creating volume %v ", err)
|
|
|
|
|
return nil, 0, fmt.Errorf("error creating volume %v", err)
|
|
|
|
|
}
|
|
|
|
|
glog.V(1).Infof("glusterfs: volume with size: %d and name: %s created", volume.Size, volume.Name)
|
|
|
|
|
clusterinfo, err := cli.ClusterInfo(volume.Cluster)
|
|
|
|
|
if err != nil {
|
|
|
|
|
glog.Errorf("glusterfs: failed to get cluster details")
|
|
|
|
|
return nil, 0, fmt.Errorf("failed to get cluster details %v", err)
|
|
|
|
|
}
|
|
|
|
|
// For the above dynamically provisioned volume, we gather the list of node IPs
|
|
|
|
|
// of the cluster on which provisioned volume belongs to, as there can be multiple
|
|
|
|
|
// clusters.
|
|
|
|
|
var dynamicHostIps []string
|
|
|
|
|
for _, node := range clusterinfo.Nodes {
|
|
|
|
|
nodei, err := cli.NodeInfo(string(node))
|
|
|
|
|
if err != nil {
|
|
|
|
|
glog.Errorf("glusterfs: failed to get hostip %v ", err)
|
|
|
|
|
return nil, 0, fmt.Errorf("failed to get hostip %v", err)
|
|
|
|
|
}
|
|
|
|
|
ipaddr := dstrings.Join(nodei.NodeAddRequest.Hostnames.Storage, "")
|
|
|
|
|
dynamicHostIps = append(dynamicHostIps, ipaddr)
|
|
|
|
|
}
|
|
|
|
|
glog.V(3).Infof("glusterfs: hostlist :%v", dynamicHostIps)
|
|
|
|
|
if len(dynamicHostIps) == 0 {
|
|
|
|
|
glog.Errorf("glusterfs: no hosts found %v ", err)
|
|
|
|
|
return nil, 0, fmt.Errorf("no hosts found %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// The 'endpointname' is created in form of 'gluster-dynamic-<claimname>'.
|
|
|
|
|
// createEndpointService() checks for this 'endpoint' existence in PVC's namespace and
|
|
|
|
|
// If not found, it create an endpoint and svc using the IPs we dynamically picked at time
|
|
|
|
|
// of volume creation.
|
|
|
|
|
epServiceName := dynamicEpSvcPrefix + p.options.PVC.Name
|
|
|
|
|
epNamespace := p.options.PVC.Namespace
|
|
|
|
|
endpoint, service, err := p.createEndpointService(epNamespace, epServiceName, dynamicHostIps, p.options.PVC.Name)
|
|
|
|
|
if err != nil {
|
|
|
|
|
glog.Errorf("glusterfs: failed to create endpoint/service")
|
|
|
|
|
return nil, 0, fmt.Errorf("failed to create endpoint/service %v", err)
|
|
|
|
|
}
|
|
|
|
|
glog.V(3).Infof("glusterfs: dynamic ep %v and svc : %v ", endpoint, service)
|
|
|
|
|
return &api.GlusterfsVolumeSource{
|
|
|
|
|
EndpointsName: p.endpoint,
|
|
|
|
|
EndpointsName: endpoint.Name,
|
|
|
|
|
Path: volume.Name,
|
|
|
|
|
ReadOnly: false,
|
|
|
|
|
}, sz, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *glusterfsVolumeProvisioner) createEndpointService(namespace string, epServiceName string, hostips []string, pvcname string) (endpoint *api.Endpoints, service *api.Service, err error) {
|
|
|
|
|
|
|
|
|
|
addrlist := make([]api.EndpointAddress, len(hostips))
|
|
|
|
|
for i, v := range hostips {
|
|
|
|
|
addrlist[i].IP = v
|
|
|
|
|
}
|
|
|
|
|
endpoint = &api.Endpoints{
|
|
|
|
|
ObjectMeta: api.ObjectMeta{
|
|
|
|
|
Namespace: namespace,
|
|
|
|
|
Name: epServiceName,
|
|
|
|
|
Labels: map[string]string{
|
|
|
|
|
"gluster.kubernetes.io/provisioned-for-pvc": pvcname,
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
Subsets: []api.EndpointSubset{{
|
|
|
|
|
Addresses: addrlist,
|
|
|
|
|
Ports: []api.EndpointPort{{Port: 1, Protocol: "TCP"}},
|
|
|
|
|
}},
|
|
|
|
|
}
|
|
|
|
|
_, err = p.plugin.host.GetKubeClient().Core().Endpoints(namespace).Create(endpoint)
|
|
|
|
|
if err != nil && errors.IsAlreadyExists(err) {
|
|
|
|
|
glog.V(1).Infof("glusterfs: endpoint [%s] already exist in namespace [%s]", endpoint, namespace)
|
|
|
|
|
err = nil
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
glog.Errorf("glusterfs: failed to create endpoint %v", err)
|
|
|
|
|
return nil, nil, fmt.Errorf("error creating endpoint %v", err)
|
|
|
|
|
}
|
|
|
|
|
service = &api.Service{
|
|
|
|
|
ObjectMeta: api.ObjectMeta{
|
|
|
|
|
Name: epServiceName,
|
|
|
|
|
Namespace: namespace,
|
|
|
|
|
Labels: map[string]string{
|
|
|
|
|
"gluster.kubernetes.io/provisioned-for-pvc": pvcname,
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
Spec: api.ServiceSpec{
|
|
|
|
|
Ports: []api.ServicePort{
|
|
|
|
|
{Protocol: "TCP", Port: 1}}}}
|
|
|
|
|
_, err = p.plugin.host.GetKubeClient().Core().Services(namespace).Create(service)
|
|
|
|
|
if err != nil && errors.IsAlreadyExists(err) {
|
|
|
|
|
glog.V(1).Infof("glusterfs: service [%s] already exist in namespace [%s]", service, namespace)
|
|
|
|
|
err = nil
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
glog.Errorf("glusterfs: failed to create service %v", err)
|
|
|
|
|
return nil, nil, fmt.Errorf("error creating service %v", err)
|
|
|
|
|
}
|
|
|
|
|
return endpoint, service, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (d *glusterfsVolumeDeleter) deleteEndpointService(namespace string, epServiceName string) (err error) {
|
|
|
|
|
err = d.plugin.host.GetKubeClient().Core().Services(namespace).Delete(epServiceName, nil)
|
|
|
|
|
if err != nil {
|
|
|
|
|
glog.Errorf("glusterfs: failed to delete service %s in namespace %s error %v", epServiceName, namespace, err)
|
|
|
|
|
return fmt.Errorf("error deleting service %v in namespace [%s]", err, namespace)
|
|
|
|
|
}
|
|
|
|
|
glog.V(1).Infof("glusterfs: service/endpoint [%s] in namespace [%s] deleted successfully", epServiceName, namespace)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// parseSecret finds a given Secret instance and reads user password from it.
|
|
|
|
|
func parseSecret(namespace, secretName string, kubeClient clientset.Interface) (string, error) {
|
|
|
|
|
secretMap, err := volutil.GetSecret(namespace, secretName, kubeClient)
|
|
|
|
@@ -521,8 +650,6 @@ func parseClassParameters(params map[string]string, kubeClient clientset.Interfa
|
|
|
|
|
authEnabled := true
|
|
|
|
|
for k, v := range params {
|
|
|
|
|
switch dstrings.ToLower(k) {
|
|
|
|
|
case "endpoint":
|
|
|
|
|
cfg.endpoint = v
|
|
|
|
|
case "resturl":
|
|
|
|
|
cfg.url = v
|
|
|
|
|
case "restuser":
|
|
|
|
@@ -543,9 +670,6 @@ func parseClassParameters(params map[string]string, kubeClient clientset.Interfa
|
|
|
|
|
if len(cfg.url) == 0 {
|
|
|
|
|
return nil, fmt.Errorf("StorageClass for provisioner %s must contain 'resturl' parameter", glusterfsPluginName)
|
|
|
|
|
}
|
|
|
|
|
if len(cfg.endpoint) == 0 {
|
|
|
|
|
return nil, fmt.Errorf("StorageClass for provisioner %s must contain 'endpoint' parameter", glusterfsPluginName)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !authEnabled {
|
|
|
|
|
cfg.user = ""
|
|
|
|
|