#!/usr/local/sbin/charm-env python3

# Copyright 2015 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from charmhelpers.core.templating import render
from charms.reactive import is_state
from charmhelpers.core.hookenv import action_get
from charmhelpers.core.hookenv import action_set
from charmhelpers.core.hookenv import action_fail
from subprocess import check_call
from subprocess import check_output
from subprocess import CalledProcessError
from tempfile import TemporaryDirectory
import json
import re
import os
import sys


# Snap-installed binaries (kubectl, kube-apiserver, ...) live in /snap/bin,
# which is not necessarily on the PATH of the action environment.
os.environ['PATH'] += os.pathsep + os.path.join(os.sep, 'snap', 'bin')


def main():
    ''' Control logic to enlist Ceph RBD volumes as PersistentVolumes in
    Kubernetes. This will invoke the validation steps, and only execute if
    this script thinks the environment is 'sane' enough to provision volumes.

    Every failure path reports through action_fail and returns early; nothing
    is enlisted in Kubernetes unless the RBD volume was created and formatted.
    '''

    # k8s >= 1.10 uses CSI and doesn't directly create persistent volumes
    if get_version('kube-apiserver') >= (1, 10):
        print('This action is deprecated in favor of CSI creation of '
              'persistent volumes')
        print('in Kubernetes >= 1.10. Just create the PVC and a PV will be '
              'created')
        print('for you.')
        action_fail('Deprecated, just create PVC.')
        return

    # validate relationship pre-reqs before additional steps can be taken
    if not validate_relation():
        print('Failed ceph relationship check')
        action_fail('Failed ceph relationship check')
        return

    if not is_ceph_healthy():
        print('Ceph was not healthy.')
        action_fail('Ceph was not healthy.')
        return

    # str() guards against the action framework handing back a non-string
    # (e.g. a purely numeric volume name parsed as an integer).
    context = {
        'RBD_NAME': str(action_get_or_default('name')).strip(),
        'RBD_SIZE': action_get_or_default('size'),
        'RBD_FS': str(action_get_or_default('filesystem')).strip(),
        'PV_MODE': str(action_get_or_default('mode')).strip(),
    }

    # Ensure we're not exceeding available space in the pool
    if not validate_space(context['RBD_SIZE']):
        return

    # Ensure our parameters match
    param_validation = validate_parameters(context['RBD_NAME'],
                                           context['RBD_FS'],
                                           context['PV_MODE'])
    if param_validation != 0:
        return

    if not validate_unique_volume_name(context['RBD_NAME']):
        action_fail('Volume name collision detected. Volume creation aborted.')
        return

    context['monitors'] = get_monitors()

    # Invoke creation and format the mount device. On failure the helper has
    # already called action_fail, so the volume must not be enlisted.
    if not create_rbd_volume(context['RBD_NAME'],
                             context['RBD_SIZE'],
                             context['RBD_FS']):
        return

    # Create a temporary workspace to render our persistentVolume template,
    # and enlist the RBD based PV we've just created
    with TemporaryDirectory() as active_working_path:
        temp_template = '{}/pv.yaml'.format(active_working_path)
        render('rbd-persistent-volume.yaml', temp_template, context)

        cmd = ['kubectl', 'create', '-f', temp_template]
        debug_command(cmd)
        check_call(cmd)


def get_version(bin_name):
    """Get the version of an installed Kubernetes binary.

    :param str bin_name: Name of binary
    :return: tuple of up to three ints (maj, min, patch), taken from the
        first three numbers found in the ``--version`` output

    Example::

        >>> get_version('kubelet')
        (1, 6, 0)

    """
    cmd = '{} --version'.format(bin_name).split()
    version_string = check_output(cmd).decode('utf-8')
    return tuple(int(q) for q in re.findall(r"[0-9]+", version_string)[:3])


def action_get_or_default(key):
    ''' Convenience method to manage defaults since actions don't appear to
    properly support defaults

    :param key - The action parameter name to look up
    :return: the supplied value when it is truthy, otherwise the built-in
        default for that key ('' for unknown keys)
    '''
    value = action_get(key)
    if value:
        return value

    defaults = {
        'filesystem': 'xfs',
        'size': 0,
        'mode': "ReadWriteOnce",
        'skip-size-check': False,
    }
    return defaults.get(key, '')


def create_rbd_volume(name, size, filesystem):
    ''' Create the RBD volume in Ceph. Then mount it locally to format it for
    the requested filesystem.

    :param name - The name of the RBD volume
    :param size - The size in MB of the volume
    :param filesystem - The type of filesystem to format the block device
    :return: True when the volume was created, formatted and unmapped; False
        when any of those steps failed (action_fail has then been called)
    '''

    # Create the rbd volume
    # $ rbd create foo --size 50 --image-feature layering
    command = ['rbd', 'create', '--size', '{}'.format(size),
               '--image-feature', 'layering', name]
    debug_command(command)
    check_call(command)

    # Lift the validation sequence to determine if we actually created the
    # rbd volume
    if validate_unique_volume_name(name):
        # we failed to create the RBD volume. whoops
        action_fail('RBD Volume not listed after creation.')
        print('Ceph RBD volume {} not found in rbd list'.format(name))
        return False

    mount = ['rbd', 'map', name]
    debug_command(mount)
    # check_output returns bytes; decode so the path is a plain string both in
    # the debug output and in the mkfs argument list.
    device_path = check_output(mount).decode('utf-8').strip()

    format_command = ['mkfs.{}'.format(filesystem), device_path]
    unmount = ['rbd', 'unmap', name]
    try:
        try:
            debug_command(format_command)
            check_call(format_command)
        finally:
            # Always release the mapping, even when mkfs failed, so the
            # device is not left attached to this host.
            debug_command(unmount)
            check_call(unmount)
    except CalledProcessError:
        print('Failed to format filesystem and unmount. RBD created but not'
              ' enlisted.')
        action_fail('Failed to format filesystem and unmount.'
                    ' RBD created but not enlisted.')
        return False

    return True


def is_ceph_healthy():
    ''' Probe the remote ceph cluster for health status

    :return: True when the output of `ceph health` contains HEALTH_OK
    '''
    command = ['ceph', 'health']
    debug_command(command)
    health_output = check_output(command)
    return b'HEALTH_OK' in health_output


def get_monitors():
    ''' Parse the monitors out of /etc/ceph/ceph.conf

    :return: list of monitor host strings, in file order
    '''
    found_hosts = []
    # This is kind of hacky. We should be piping this in from juju relations
    with open('/etc/ceph/ceph.conf', 'r') as ceph_conf:
        for line in ceph_conf:
            key, separator, value = line.partition('=')
            if not separator:
                continue
            # Accept both the 'mon host' and 'mon_host' spellings of the key.
            if key.strip().replace('_', ' ') != 'mon host':
                continue
            # split() without arguments drops the trailing newline and any
            # empty entries produced by repeated whitespace.
            found_hosts.extend(value.split())
    return found_hosts


def get_available_space():
    ''' Determine the space available in the RBD pool. Throw an exception if
    the RBD pool ('rbd') isn't found.

    :return: available space in MB, as an int
    :raises UnknownAvailableSpaceException: no pool named 'rbd' was reported
    '''
    command = 'ceph df -f json'.split()
    debug_command(command)
    out = check_output(command).decode('utf-8')
    data = json.loads(out)
    for pool in data['pools']:
        if pool['name'] == 'rbd':
            # max_avail is divided by 1024 * 1024 to express it in MB
            return int(pool['stats']['max_avail'] / (1024 * 1024))
    raise UnknownAvailableSpaceException('Unable to determine available space.')  # noqa


def validate_unique_volume_name(name):
    ''' Poll the CEPH-MON services to determine if we have a unique rbd volume
    name to use. If there is naming collisions, block the request for volume
    provisioning.

    :param name - The name of the RBD volume
    :return: True when no existing volume has this name, False otherwise
    '''

    command = ['rbd', 'list']
    debug_command(command)
    raw_out = check_output(command)

    # Split the output on newlines
    # output spec:
    # $ rbd list
    # foo
    # foobar
    volume_list = raw_out.decode('utf-8').splitlines()

    return all(volume.strip() != name for volume in volume_list)


def validate_relation():
    ''' Determine if we are related to ceph. If we are not, we should
    note this in the action output and fail this action run. We are relying
    on specific files in specific paths to be placed in order for this function
    to work. This method verifies those files are placed.

    :return: True when the ceph-storage.available state is set and
        /etc/ceph/ceph.conf exists, False otherwise
    '''

    # TODO: Validate that the ceph-common package is installed
    if not is_state('ceph-storage.available'):
        message = 'Failed to detect connected ceph-mon'
        print(message)
        action_set({'pre-req.ceph-relation': message})
        return False

    if not os.path.isfile('/etc/ceph/ceph.conf'):
        message = 'No Ceph configuration found in /etc/ceph/ceph.conf'
        print(message)
        action_set({'pre-req.ceph-configuration': message})
        return False

    # TODO: Validate ceph key

    return True


def validate_space(size):
    ''' Check that the rbd pool has room for a volume of the requested size.

    The check is skipped when the 'skip-size-check' action parameter is set.

    :param size - The requested size in MB of the volume
    :return: True when the request fits (or the check is skipped), False
        otherwise (action_fail has then been called)
    '''
    if action_get_or_default('skip-size-check'):
        return True

    try:
        available_space = get_available_space()
    except UnknownAvailableSpaceException as err:
        # Report a clean action failure instead of an unhandled traceback.
        action_fail(str(err))
        return False

    if available_space < size:
        msg = 'Unable to allocate RBD of size {}MB, only {}MB are available.'
        action_fail(msg.format(size, available_space))
        return False
    return True


def validate_parameters(name, fs, mode):
    ''' Validate the user inputs to ensure they conform to what the
    action expects. This method will check the naming characters used
    for the rbd volume, ensure they have selected a fstype we are expecting
    and the mode against our whitelist

    :param name - The name of the RBD volume
    :param fs - The requested filesystem type
    :param mode - The requested PersistentVolume access mode
    :return: the number of failed validations (0 means everything passed)
    '''
    # The whole name must be alphanumerics and dashes, starting with an
    # alphanumeric. fullmatch is used so that trailing characters (including
    # newlines) cannot slip through into the rendered PV template.
    name_regex = r'[a-zA-Z0-9][a-zA-Z0-9-]*'
    fs_whitelist = ['xfs', 'ext4']

    # see http://kubernetes.io/docs/user-guide/persistent-volumes/#access-modes
    # for supported operations on RBD volumes.
    mode_whitelist = ['ReadWriteOnce', 'ReadOnlyMany']

    fails = 0

    if not re.fullmatch(name_regex, name):
        message = 'Validation failed for RBD volume-name'
        action_fail(message)
        fails = fails + 1
        action_set({'validation.name': message})

    if fs not in fs_whitelist:
        message = 'Validation failed for file system'
        action_fail(message)
        fails = fails + 1
        action_set({'validation.filesystem': message})

    if mode not in mode_whitelist:
        message = "Validation failed for mode"
        action_fail(message)
        fails = fails + 1
        action_set({'validation.mode': message})

    return fails


def debug_command(cmd):
    ''' Print a debug statement of the command invoked '''
    print("Invoking {}".format(cmd))


class UnknownAvailableSpaceException(Exception):
    ''' Raised when the free space of the rbd pool cannot be determined '''
    pass


if __name__ == '__main__':
    # main() returns None, so the exit status stays 0; failures are reported
    # to Juju through action_fail rather than through the exit code.
    sys.exit(main())