#
# Copyright(c) 2012-2019 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause-Clear
#

import subprocess
import csv
import re
import os
import stat
import time


# Casadm functionality

class casadm:
    """Helpers that build and run `casadm` command lines.

    Every public classmethod assembles an argument list, runs it through
    `run_cmd` and returns the resulting `casadm.result`. A non-zero exit
    code is reported by raising `casadm.CasadmError`.
    """

    casadm_path = '/sbin/casadm'

    class result:
        """Outcome of a finished command: exit code, stdout and stderr."""

        def __init__(self, cmd):
            """Run `cmd` (a list of arguments) and capture its text output."""
            p = subprocess.run(cmd, universal_newlines=True,
                               stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            self.exit_code = p.returncode
            self.stdout = p.stdout
            self.stderr = p.stderr

    class CasadmError(Exception):
        """Raised when a command exits with a non-zero code.

        The failed `casadm.result` is available as the `result` attribute.
        """

        def __init__(self, result):
            super().__init__('casadm error')
            self.result = result

    @classmethod
    def run_cmd(cls, cmd):
        """Run `cmd` and return its result.

        Raises:
            casadm.CasadmError: if the command exit code is not 0.
        """
        result = cls.result(cmd)
        if result.exit_code != 0:
            raise cls.CasadmError(result)
        return result

    @classmethod
    def get_version(cls):
        """Run `casadm --version` with CSV output."""
        cmd = [cls.casadm_path,
               '--version',
               '--output-format', 'csv']
        return cls.run_cmd(cmd)

    @classmethod
    def list_caches(cls):
        """Run `casadm --list-caches` with CSV output."""
        cmd = [cls.casadm_path,
               '--list-caches',
               '--output-format', 'csv']
        return cls.run_cmd(cmd)

    @classmethod
    def check_cache_device(cls, device):
        """Run `casadm --script --check-cache-device` for `device`."""
        cmd = [cls.casadm_path,
               '--script',
               '--check-cache-device',
               '--cache-device', device]
        return cls.run_cmd(cmd)

    @classmethod
    def start_cache(cls, device, cache_id=None, cache_mode=None,
                    cache_line_size=None, load=False, force=False):
        """Run `casadm --start-cache` on `device`.

        Optional arguments are appended to the command line only when they
        are truthy.
        """
        cmd = [cls.casadm_path,
               '--start-cache',
               '--cache-device', device]
        if cache_id:
            cmd += ['--cache-id', str(cache_id)]
        if cache_mode:
            cmd += ['--cache-mode', cache_mode]
        if cache_line_size:
            cmd += ['--cache-line-size', str(cache_line_size)]
        if load:
            cmd += ['--load']
        if force:
            cmd += ['--force']
        return cls.run_cmd(cmd)

    @classmethod
    def add_core(cls, device, cache_id, core_id=None, try_add=False):
        """Run `casadm --script --add-core` for `device` on cache `cache_id`."""
        cmd = [cls.casadm_path,
               '--script',
               '--add-core',
               '--core-device', device,
               '--cache-id', str(cache_id)]
        # Compared with None (not truthiness) because 0 is a valid core id.
        if core_id is not None:
            cmd += ['--core-id', str(core_id)]
        if try_add:
            cmd += ['--try-add']
        return cls.run_cmd(cmd)

    @classmethod
    def stop_cache(cls, cache_id, no_flush=False):
        """Run `casadm --stop-cache`; `no_flush` adds `--no-data-flush`."""
        cmd = [cls.casadm_path,
               '--stop-cache',
               '--cache-id', str(cache_id)]
        if no_flush:
            cmd += ['--no-data-flush']
        return cls.run_cmd(cmd)

    @classmethod
    def remove_core(cls, cache_id, core_id, detach=False, force=False):
        """Run `casadm --script --remove-core`.

        `detach` adds `--detach`; `force` adds `--no-flush`.
        """
        cmd = [cls.casadm_path,
               '--script',
               '--remove-core',
               '--cache-id', str(cache_id),
               '--core-id', str(core_id)]
        if detach:
            cmd += ['--detach']
        if force:
            cmd += ['--no-flush']
        return cls.run_cmd(cmd)

    @classmethod
    def set_param(cls, namespace, cache_id, **kwargs):
        """Run `casadm --set-param --name <namespace>`.

        Each keyword argument `some_name=value` becomes
        `--some-name value` on the command line.
        """
        cmd = [cls.casadm_path,
               '--set-param', '--name', namespace,
               '--cache-id', str(cache_id)]
        for param, value in kwargs.items():
            cmd += ['--' + param.replace('_', '-'), str(value)]
        return cls.run_cmd(cmd)

    @classmethod
    def get_params(cls, namespace, cache_id, **kwargs):
        """Run `casadm --get-param --name <namespace>` with CSV output.

        Keyword arguments are translated the same way as in `set_param`.
        """
        cmd = [cls.casadm_path,
               '--get-param', '--name', namespace,
               '--cache-id', str(cache_id)]
        for param, value in kwargs.items():
            cmd += ['--' + param.replace('_', '-'), str(value)]
        cmd += ['-o', 'csv']
        return cls.run_cmd(cmd)

    @classmethod
    def flush_parameters(cls, cache_id, policy_type):
        """Run `casadm --flush-parameters` for the given cleaning policy."""
        cmd = [cls.casadm_path,
               '--flush-parameters',
               '--cache-id', str(cache_id),
               '--cleaning-policy-type', policy_type]
        return cls.run_cmd(cmd)

    @classmethod
    def io_class_load_config(cls, cache_id, ioclass_file):
        """Run `casadm --io-class --load-config` with `ioclass_file`."""
        cmd = [cls.casadm_path,
               '--io-class', '--load-config',
               '--cache-id', str(cache_id),
               '--file', ioclass_file]
        return cls.run_cmd(cmd)

    @classmethod
    def start_upgrade(cls):
        """Run `casadm --script --upgrade-in-flight`."""
        cmd = [cls.casadm_path,
               '--script',
               '--upgrade-in-flight']
        return cls.run_cmd(cmd)


# Configuration file parser

class cas_config(object):
    """In-memory model of the opencas configuration file.

    `caches` maps cache id -> `cache_config`; `cores` is the list of
    `core_config` objects in insertion order. Each core is also stored in
    the `cores` dict of the cache it belongs to.
    """

    default_location = '/etc/opencas/opencas.conf'

    class ConflictingConfigException(ValueError):
        """The new entry clashes with an already configured device or id."""
        pass

    class AlreadyConfiguredException(ValueError):
        """The very same entry (same id, same device) is already present."""
        pass

    @staticmethod
    def get_by_id_path(path):
        """Return the `/dev/disk/by-id` link resolving to the same file as `path`.

        Raises:
            ValueError: if no link in `/dev/disk/by-id` matches.
            OSError: if `/dev/disk/by-id` cannot be listed.
        """
        target = os.path.realpath(path)
        for id_path in os.listdir('/dev/disk/by-id'):
            full_path = '/dev/disk/by-id/{0}'.format(id_path)
            if os.path.realpath(full_path) == target:
                return full_path

        raise ValueError('By-id device link not found for {0}'.format(path))

    @staticmethod
    def check_block_device(path):
        """Verify that `path` is a block device.

        A missing path that starts with `/dev/cas` is accepted without
        further checks.

        Raises:
            ValueError: if `path` cannot be stat-ed or is not a block device.
        """
        if not os.path.exists(path) and path.startswith('/dev/cas'):
            return

        try:
            mode = os.stat(path).st_mode
        except OSError as e:
            raise ValueError('{0} not found'.format(path)) from e

        if not stat.S_ISBLK(mode):
            raise ValueError('{0} is not block device'.format(path))

    class cache_config(object):
        """Single entry of the `[caches]` section."""

        def __init__(self, cache_id, device, cache_mode, **params):
            self.cache_id = int(cache_id)
            self.device = device
            self.cache_mode = cache_mode
            self.params = params
            # core id -> core_config, filled in by cas_config.insert_core()
            self.cores = dict()

        @classmethod
        def from_line(cls, line, allow_incomplete=False):
            """Parse and validate one whitespace-separated `[caches]` line.

            Expected columns: cache id, device, cache mode and, optionally,
            a comma-separated list of `name=value` parameters.

            Raises:
                ValueError: on a wrong column count, a repeated parameter or
                    any validation failure.
            """
            values = line.split()
            if len(values) < 3:
                raise ValueError('Invalid cache configuration (too few columns)')
            elif len(values) > 4:
                raise ValueError('Invalid cache configuration (too many columns)')

            cache_id = int(values[0])
            device = values[1]
            cache_mode = values[2].lower()

            params = dict()
            if len(values) > 3:
                for param in values[3].split(','):
                    param_name, param_value = param.split('=')
                    if param_name in params:
                        raise ValueError(
                            'Invalid cache configuration (repeated parameter)')
                    params[param_name] = param_value

            cache_config = cls(cache_id, device, cache_mode, **params)
            cache_config.validate_config(False, allow_incomplete)

            return cache_config

        def validate_config(self, force, allow_incomplete=False):
            """Validate id, mode, parameters and (optionally) the device.

            Device checks are skipped when `allow_incomplete` is set; the
            partition check is additionally skipped when `force` is set.

            Raises:
                ValueError: on the first failed check.
            """
            type(self).check_cache_id_valid(self.cache_id)
            self.check_recursive()
            self.check_cache_mode_valid(self.cache_mode)
            for param_name, param_value in self.params.items():
                self.validate_parameter(param_name, param_value)

            if not allow_incomplete:
                cas_config.check_block_device(self.device)
                if not force:
                    self.check_cache_device_empty()

        def validate_parameter(self, param_name, param_value):
            """Validate one optional cache parameter.

            Raises:
                ValueError: for an unknown name or an invalid value.
            """
            if param_name == 'ioclass_file':
                if not os.path.exists(param_value):
                    raise ValueError('Invalid path to io_class file')
            elif param_name == 'cleaning_policy':
                self.check_cleaning_policy_valid(param_value)
            elif param_name == 'promotion_policy':
                self.check_promotion_policy_valid(param_value)
            elif param_name == 'cache_line_size':
                self.check_cache_line_size_valid(param_value)
            else:
                raise ValueError('{0} is invalid parameter name'.format(param_name))

        @staticmethod
        def check_cache_id_valid(cache_id):
            """Raise ValueError unless 1 <= cache_id <= 16384."""
            if not 1 <= int(cache_id) <= 16384:
                raise ValueError('{0} is invalid cache id'.format(cache_id))

        def check_cache_device_empty(self):
            """Raise ValueError if `lsblk` lists more than one name for the device."""
            try:
                result = casadm.run_cmd(['lsblk', '-o', 'NAME', '-l', '-n', self.device])
            except Exception:
                # lsblk returns non-0 if it can't probe for partitions
                # this means that we're probably dealing with atomic device
                # let it through
                return

            listed = [name for name in result.stdout.split('\n') if name != '']
            if len(listed) > 1:
                raise ValueError(
                    'Partitions found on device {0}. Use force option to ignore'.
                    format(self.device))

        def check_cache_mode_valid(self, cache_mode):
            """Raise ValueError unless `cache_mode` is wt/pt/wa/wb/wo (any case)."""
            if cache_mode.lower() not in ['wt', 'pt', 'wa', 'wb', 'wo']:
                raise ValueError('Invalid cache mode {0}'.format(cache_mode))

        def check_cleaning_policy_valid(self, cleaning_policy):
            """Raise ValueError unless the policy is acp/alru/nop (any case)."""
            if cleaning_policy.lower() not in ['acp', 'alru', 'nop']:
                raise ValueError('{0} is invalid cleaning policy name'.format(
                    cleaning_policy))

        def check_promotion_policy_valid(self, promotion_policy):
            """Raise ValueError unless the policy is always/nhit (any case)."""
            if promotion_policy.lower() not in ['always', 'nhit']:
                raise ValueError('{0} is invalid promotion policy name'.format(
                    promotion_policy))

        def check_cache_line_size_valid(self, cache_line_size):
            """Raise ValueError unless the size is one of the strings 4/8/16/32/64."""
            if cache_line_size not in ['4', '8', '16', '32', '64']:
                raise ValueError('{0} is invalid cache line size'.format(
                    cache_line_size))

        def check_recursive(self):
            """Raise ValueError if the device is a `/dev/cas<id>-<n>` of this cache."""
            if not self.device.startswith('/dev/cas'):
                return

            ids = self.device.split('/dev/cas')[1]
            device_cache_id, _ = ids.split('-')

            if int(device_cache_id) == self.cache_id:
                raise ValueError('Recursive configuration detected')

        def to_line(self):
            """Serialize to a tab-separated config line ending with a newline."""
            ret = '{0}\t{1}\t{2}'.format(self.cache_id, self.device, self.cache_mode)
            if self.params:
                ret += '\t' + ','.join(
                    '{0}={1}'.format(param, value)
                    for param, value in self.params.items())
            return ret + '\n'

    class core_config(object):
        """Single entry of the `[cores]` section."""

        def __init__(self, cache_id, core_id, path, **params):
            self.cache_id = int(cache_id)
            self.core_id = int(core_id)
            self.device = path
            self.params = params

        @classmethod
        def from_line(cls, line, allow_incomplete=False):
            """Parse and validate one whitespace-separated `[cores]` line.

            Expected columns: cache id, core id, device and, optionally, a
            comma-separated list of `name=value` parameters (lower-cased
            before parsing).

            Raises:
                ValueError: on a wrong column count, a repeated parameter or
                    any validation failure.
            """
            values = line.split()
            if len(values) > 4:
                raise ValueError("Invalid core configuration (too many columns)")
            elif len(values) < 3:
                raise ValueError("Invalid core configuration (too few columns)")

            cache_id = int(values[0])
            core_id = int(values[1])
            device = values[2]

            params = dict()
            if len(values) > 3:
                for param in values[3].lower().split(","):
                    param_name, param_value = param.split("=")
                    if param_name in params:
                        raise ValueError(
                            "Invalid core configuration (repeated parameter)"
                        )
                    params[param_name] = param_value

            core_config = cls(cache_id, core_id, device, **params)
            core_config.validate_config(allow_incomplete)

            return core_config

        def validate_config(self, allow_incomplete=False):
            """Validate ids and parameters; check the device unless incomplete.

            Raises:
                ValueError: on the first failed check.
            """
            self.check_core_id_valid()
            self.check_recursive()
            cas_config.cache_config.check_cache_id_valid(self.cache_id)
            for param_name, param_value in self.params.items():
                self.validate_parameter(param_name, param_value)

            if not allow_incomplete:
                cas_config.check_block_device(self.device)

        def validate_parameter(self, param_name, param_value):
            """Validate one optional core parameter (only `lazy_startup` exists).

            Raises:
                ValueError: for an unknown name or a non-boolean value.
            """
            if param_name == "lazy_startup":
                if param_value.lower() not in ["true", "false"]:
                    raise ValueError(
                        "{} is invalid value for '{}' core param".format(
                            param_value, param_name
                        )
                    )
            else:
                raise ValueError("'{}' is invalid core param name".format(param_name))

        def check_core_id_valid(self):
            """Raise ValueError unless 0 <= core_id <= 4095."""
            if not 0 <= int(self.core_id) <= 4095:
                raise ValueError('{0} is invalid core id'.format(self.core_id))

        def check_recursive(self):
            """Raise ValueError if the device is a `/dev/cas<id>-<n>` of its own cache."""
            if not self.device.startswith('/dev/cas'):
                return

            ids = self.device.split('/dev/cas')[1]
            device_cache_id, _ = ids.split('-')

            if int(device_cache_id) == self.cache_id:
                raise ValueError('Recursive configuration detected')

        def to_line(self):
            """Serialize to a tab-separated config line ending with a newline."""
            ret = "{0}\t{1}\t{2}".format(self.cache_id, self.core_id, self.device)
            if self.params:
                ret += "\t" + ",".join(
                    "{0}={1}".format(param, value)
                    for param, value in self.params.items())
            return ret + "\n"

    def __init__(self, caches=None, cores=None, version_tag=None):
        self.caches = caches if caches else dict()
        self.cores = cores if cores else list()
        self.version_tag = version_tag

    @classmethod
    def from_file(cls, config_file, allow_incomplete=False):
        """Load and validate a configuration file.

        The first line must match `version=...`. Text after `#` is ignored.
        Lines following `[caches]` are parsed as caches and lines following
        `[cores]` as cores.

        Raises:
            ValueError: missing version tag or an invalid/conflicting entry.
            KeyError: a core refers to a cache id that was not configured.
            Exception: the file could not be opened or read.
        """
        section_caches = False
        section_cores = False

        try:
            with open(config_file, 'r') as conf:
                # Stored exactly as read, i.e. including the line terminator.
                version_tag = conf.readline()
                if not re.findall(r'^version=.*$', version_tag):
                    raise ValueError('No version tag found!')

                config = cls(version_tag=version_tag)

                for line in conf:
                    line = line.split('#')[0].rstrip()
                    if not line:
                        continue

                    if line == '[caches]':
                        section_caches = True
                        continue

                    if line == '[cores]':
                        section_caches = False
                        section_cores = True
                        continue

                    if section_caches:
                        cache = cas_config.cache_config.from_line(line, allow_incomplete)
                        config.insert_cache(cache)
                    elif section_cores:
                        core = cas_config.core_config.from_line(line, allow_incomplete)
                        config.insert_core(core)
        except IOError as e:
            # Every other exception type propagates to the caller unchanged.
            raise Exception('Couldn\'t open config file') from e

        return config

    def insert_cache(self, new_cache_config):
        """Add a cache entry after checking it against existing entries.

        On success the device path is replaced by its `/dev/disk/by-id`
        link when one can be found.

        Raises:
            cas_config.AlreadyConfiguredException: same id and same device.
            cas_config.ConflictingConfigException: id or device already used
                by a different entry.
        """
        new_id = new_cache_config.cache_id
        new_device = os.path.realpath(new_cache_config.device)

        if new_id in self.caches:
            if os.path.realpath(self.caches[new_id].device) != new_device:
                raise cas_config.ConflictingConfigException(
                    'Other cache device configured under this id')
            else:
                raise cas_config.AlreadyConfiguredException(
                    'Cache already configured')

        # From here on no existing cache shares the new id.
        for cache in self.caches.values():
            if new_device == os.path.realpath(cache.device):
                raise cas_config.ConflictingConfigException(
                    'This cache device is already configured as a cache')

            for core in cache.cores.values():
                if os.path.realpath(core.device) == new_device:
                    raise cas_config.ConflictingConfigException(
                        'This cache device is already configured as a core')

        try:
            new_cache_config.device = cas_config.get_by_id_path(new_cache_config.device)
        except Exception:
            # Best effort: keep the path as given when no by-id link exists.
            pass

        self.caches[new_id] = new_cache_config

    def insert_core(self, new_core_config):
        """Add a core entry after checking it against existing entries.

        On success the device path is replaced by its `/dev/disk/by-id`
        link when one can be found.

        Raises:
            KeyError: the referenced cache id is not configured.
            cas_config.AlreadyConfiguredException: same ids and same device.
            cas_config.ConflictingConfigException: ids or device already
                used by a different entry.
        """
        if new_core_config.cache_id not in self.caches:
            raise KeyError('Cache id {0} doesn\'t exist'.format(new_core_config.cache_id))

        new_device = os.path.realpath(new_core_config.device)

        for cache_id, cache in self.caches.items():
            if os.path.realpath(cache.device) == new_device:
                raise cas_config.ConflictingConfigException(
                    'Core device already configured as a cache')

            for core_id, core in cache.cores.items():
                same_device = os.path.realpath(core.device) == new_device
                if (cache_id == new_core_config.cache_id and
                        core_id == new_core_config.core_id):
                    if same_device:
                        raise cas_config.AlreadyConfiguredException(
                            'Core already configured')
                    else:
                        raise cas_config.ConflictingConfigException(
                            'Other core device configured under this id')
                elif same_device:
                    raise cas_config.ConflictingConfigException(
                        'This core device is already configured as a core')

        try:
            new_core_config.device = cas_config.get_by_id_path(new_core_config.device)
        except Exception:
            # Best effort: keep the path as given when no by-id link exists.
            pass

        self.caches[new_core_config.cache_id].cores[new_core_config.core_id] = new_core_config
        self.cores += [new_core_config]

    def is_empty(self):
        """Return True when neither caches nor cores are configured."""
        return not self.caches and not self.cores

    def write(self, config_file):
        """Write the configuration to `config_file`, overwriting it.

        Raises:
            Exception: if the file cannot be written.
        """
        try:
            with open(config_file, 'w') as conf:
                conf.write('{0}\n'.format(self.version_tag))
                conf.write('# This config was automatically generated\n')

                conf.write('[caches]\n')
                for cache in self.caches.values():
                    conf.write(cache.to_line())

                conf.write('\n[cores]\n')
                for core in self.cores:
                    conf.write(core.to_line())
        except Exception as e:
            raise Exception('Couldn\'t write config file') from e

    def get_startup_cores(self):
        """Return cores whose `lazy_startup` param is absent or "false"."""
        return [
            core
            for core in self.cores
            if core.params.get("lazy_startup", "false") == "false"
        ]


# Config helper functions

def start_cache(cache, load, force=False):
    """Start the cache described by the `cache_config` object `cache`."""
    casadm.start_cache(
        device=cache.device,
        cache_id=cache.cache_id,
        cache_mode=cache.cache_mode,
        cache_line_size=cache.params.get('cache_line_size'),
        load=load,
        force=force)


def configure_cache(cache):
    """Apply the optional cleaning/promotion/ioclass params of `cache`."""
    if "cleaning_policy" in cache.params:
        casadm.set_param(
            "cleaning", cache_id=cache.cache_id, policy=cache.params["cleaning_policy"]
        )
    if "promotion_policy" in cache.params:
        casadm.set_param(
            "promotion", cache_id=cache.cache_id, policy=cache.params["promotion_policy"]
        )
    if "ioclass_file" in cache.params:
        casadm.io_class_load_config(
            cache_id=cache.cache_id, ioclass_file=cache.params["ioclass_file"]
        )


def add_core(core, attach):
    """Add the core described by `core`; `attach` maps to `--try-add`."""
    casadm.add_core(
        device=core.device,
        cache_id=core.cache_id,
        core_id=core.core_id,
        try_add=attach)


# Other helper functions

def is_cache_started(cache_config):
    """Return True if `casadm --list-caches` reports this cache id."""
    for dev in get_caches_list():
        if dev['type'] == 'cache' and int(dev['id']) == cache_config.cache_id:
            return True

    return False


def is_core_added(core_config):
    """Return True if the listing shows this core id under its cache id."""
    cache_id = 0
    for dev in get_caches_list():
        # A core row belongs to the most recent cache row above it.
        if dev['type'] == 'cache':
            cache_id = int(dev['id'])

        if (dev['type'] == 'core' and
                cache_id == core_config.cache_id and
                int(dev['id']) == core_config.core_id):
            return True

    return False


def get_caches_list():
    """Return the CSV output of `casadm --list-caches` as a list of dicts."""
    result = casadm.list_caches()
    return list(csv.DictReader(result.stdout.split('\n')))


def check_cache_device(device):
    """Return the first CSV row of `casadm --check-cache-device` as a dict."""
    result = casadm.check_cache_device(device)
    return list(csv.DictReader(result.stdout.split('\n')))[0]


def get_cas_version():
    """Return a dict mapping component name -> version string.

    The first output line (CSV header) is skipped, as is every line that
    does not consist of exactly two comma-separated fields.
    """
    output = casadm.get_version()

    ret = {}
    for line in output.stdout.split('\n')[1:]:
        fields = line.split(',')
        if len(fields) != 2:
            continue
        component, component_version = fields
        ret[component] = component_version

    return ret


class CompoundException(Exception):
    """Exception aggregating several exceptions raised during a batch operation."""

    def __init__(self):
        super(CompoundException, self).__init__()
        self.exception_list = list()

    def __str__(self):
        s = "Multiple exceptions occured:\n" if len(self.exception_list) > 1 else ""

        for e in self.exception_list:
            s += '{0}\n'.format(str(e))

        return s

    def add_exception(self, e):
        """Record `e`; a nested CompoundException is flattened into this one."""
        if isinstance(e, CompoundException):
            self.exception_list += e.exception_list
        else:
            self.exception_list += [e]

    def is_empty(self):
        """Return True when no exception has been recorded."""
        return len(self.exception_list) == 0

    def raise_nonempty(self):
        """Raise self if at least one exception has been recorded."""
        if not self.is_empty():
            raise self


def _is_stacked_on(disk, cache_id, core_id):
    """Tell whether path `disk` refers to exported device `/dev/cas<cache>-<core>`.

    A suffix is accepted as long as it does not start with a digit, so
    `/dev/cas1-1p1` matches cache 1 / core 1 while `/dev/cas1-10` does not.
    """
    pattern = re.escape('/dev/cas{0}-{1}'.format(cache_id, core_id)) + r'(?!\d)'
    return re.search(pattern, disk) is not None


def detach_core_recursive(cache_id, core_id, flush):
    """Detach a core after detaching every active core stacked on top of it.

    Returns without calling casadm when the listing shows the core as
    already not 'Active'.

    Raises:
        casadm.CasadmError: if listing caches or removing a core fails.
    """
    # Catching exceptions is left to uppermost caller of detach_core_recursive
    # as the immediate caller that made a recursive call depends on the callee
    # to remove core and thus release reference to lower level cache volume.
    wanted_cache_id = str(cache_id)
    wanted_core_id = str(core_id)

    l_cache_id = ''
    for dev in get_caches_list():
        if dev['type'] == 'cache':
            l_cache_id = dev['id']
        elif dev['type'] == 'core' and dev['status'] == 'Active':
            if _is_stacked_on(dev['disk'], cache_id, core_id):
                detach_core_recursive(l_cache_id, dev['id'], flush)
        elif (l_cache_id == wanted_cache_id and dev['id'] == wanted_core_id
                and dev['status'] != 'Active'):
            return

    casadm.remove_core(cache_id, core_id, detach=True, force=not flush)


def detach_all_cores(flush):
    """Detach every active core reported by `casadm --list-caches`.

    Failures for individual cores are collected and raised together as a
    CompoundException once all cores have been tried.

    Raises:
        Exception: if the cache list cannot be obtained.
        CompoundException: if detaching at least one core failed.
    """
    error = CompoundException()

    try:
        dev_list = get_caches_list()
    except casadm.CasadmError as e:
        raise Exception('Unable to list caches. Reason:\n{0}'.format(
            e.result.stderr))
    except Exception:
        raise Exception('Unable to list caches.')

    cache_id = None
    for dev in dev_list:
        if dev['type'] == 'cache':
            cache_id = dev['id']
        elif dev['type'] == 'core' and dev['status'] == "Active":
            # In case of exception we proceed with detaching remaining core instances
            # to gracefully shutdown as many cache instances as possible.
            try:
                detach_core_recursive(cache_id, dev['id'], flush)
            except casadm.CasadmError as e:
                error.add_exception(Exception(
                    'Unable to detach core {0}. Reason:\n{1}'.format(
                        dev['disk'], e.result.stderr)))
            except Exception:
                error.add_exception(Exception(
                    'Unable to detach core {0}.'.format(dev['disk'])))

    error.raise_nonempty()


def stop_all_caches(flush):
    """Stop every cache reported by `casadm --list-caches`.

    `flush=False` makes each stop use `--no-data-flush`. Failures for
    individual caches are collected and raised together.

    Raises:
        Exception: if the cache list cannot be obtained.
        CompoundException: if stopping at least one cache failed.
    """
    error = CompoundException()

    try:
        dev_list = get_caches_list()
    except casadm.CasadmError as e:
        raise Exception('Unable to list caches. Reason:\n{0}'.format(
            e.result.stderr))
    except Exception:
        raise Exception('Unable to list caches.')

    for dev in dev_list:
        if dev['type'] == 'cache':
            # In case of exception we proceed with stopping subsequent cache instances
            # to gracefully shutdown as many cache instances as possible.
            try:
                casadm.stop_cache(dev['id'], not flush)
            except casadm.CasadmError as e:
                error.add_exception(Exception(
                    'Unable to stop cache {0}. Reason:\n{1}'.format(
                        dev['disk'], e.result.stderr)))
            except Exception:
                error.add_exception(Exception(
                    'Unable to stop cache {0}.'.format(dev['disk'])))

    error.raise_nonempty()


def stop(flush):
    """Detach all cores, then stop all caches, collecting every failure.

    `flush` is forwarded to `detach_all_cores` only; caches are always
    stopped with flush=False.

    Raises:
        CompoundException: if any step failed.
    """
    error = CompoundException()

    try:
        detach_all_cores(flush)
    except Exception as e:
        error.add_exception(e)

    try:
        # NOTE(review): presumably no flush is needed here because dirty data
        # was already handled while detaching cores -- confirm.
        stop_all_caches(False)
    except Exception as e:
        error.add_exception(e)

    error.raise_nonempty()


def get_devices_state():
    """Summarize `casadm --list-caches` output.

    Returns a dict with three keys:
        "core_pool": list of {"device", "status"} for cores listed after a
            "core pool" row,
        "caches": cache id -> {"device", "status"},
        "cores": (cache id, core id) -> {"device", "status", "cache_id"}.
    """
    device_list = get_caches_list()

    devices = {"core_pool": [], "caches": {}, "cores": {}}

    core_pool = False
    prev_cache_id = -1

    for device in device_list:
        if device["type"] == "core pool":
            core_pool = True
            continue

        if device["type"] == "cache":
            core_pool = False
            prev_cache_id = int(device["id"])
            devices["caches"][prev_cache_id] = {
                "device": device["disk"],
                "status": device["status"],
            }
        elif device["type"] == "core":
            core = {"device": device["disk"], "status": device["status"]}
            if core_pool:
                devices["core_pool"].append(core)
            else:
                core["cache_id"] = prev_cache_id
                devices["cores"][(prev_cache_id, int(device["id"]))] = core

    return devices


def wait_for_cas_ctrl():
    """Wait up to about 30 seconds for `/dev/cas_ctrl` to appear.

    Returns None both when the path shows up and when the wait times out.
    """
    for _ in range(30):  # timeout 30s
        if os.path.exists('/dev/cas_ctrl'):
            return
        time.sleep(1)


def wait_for_startup(timeout=300, interval=5):
    """Poll until every non-lazy configured core is reported 'Active'.

    Args:
        timeout: maximum time to wait, in seconds.
        interval: delay between polls, in seconds.

    Returns:
        The cores (core_config objects) still not active at the last poll;
        an empty list on success, or None if no poll was performed before
        the timeout expired.

    Raises:
        Exception: if the default configuration file cannot be loaded.
    """
    try:
        config = cas_config.from_file(
            cas_config.default_location, allow_incomplete=True
        )
    except Exception as e:
        raise Exception("Unable to load opencas config. Reason: {0}".format(str(e)))

    stop_time = time.time() + int(timeout)

    not_initialized = None
    target_core_state = config.get_startup_cores()

    while stop_time > time.time():
        not_initialized = []
        runtime_core_state = get_devices_state()["cores"]

        for core in target_core_state:
            runtime_state = runtime_core_state.get((core.cache_id, core.core_id), None)

            if not runtime_state or runtime_state["status"] != "Active":
                not_initialized.append(core)

        if not not_initialized:
            break

        time.sleep(interval)

    return not_initialized