open-cas-linux/utils/opencas.py
Jan Musial aaedfb35dd Change startup procedure
The current startup procedure relies on the assumption that devices
appearing asynchronously can be handled in an asynchronous way
(udev rules) without interfering with synchronous events in the
system (systemd units). If we broke anything (e.g. mounts), we would
simply take the affected units and restart them. This tactic worked
as long as restarting systemd units took a reasonable amount of time.

As hackish as it sounds it worked in all systems that the software
has been validated on. Unfortunately it stopped working because
of *.mount units taking MUCH longer time to restart even on
mainstream OSes, so it's time to change.

This change implements open-cas systemd service which will wait
synchronously with systemd bootup process for all required Open CAS
devices to start. If they don't we fail the boot process just as
failing mounts would. We also make sure that this process takes place
before any mounts (aside from root FS and other critical FS's) are
even attempted. Now opencas-mount-utility can be discarded.

To override this behaviour on per-core basis you can specify
lazy_startup=true option in opencas.conf.

Signed-off-by: Jan Musial <jan.musial@intel.com>
2019-11-14 12:20:08 +01:00

799 lines
27 KiB
Python

#
# Copyright(c) 2012-2019 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause-Clear
#
import subprocess
import csv
import re
import os
import stat
import time
# Casadm functionality
class casadm:
    """Command builder for the ``casadm`` administration tool.

    Each public classmethod assembles an argument vector for one casadm
    sub-command and passes it to :meth:`run_cmd`, which executes it and
    raises :class:`casadm.CasadmError` on a non-zero exit code.
    """

    casadm_path = '/sbin/casadm'

    class result:
        """Exit code and captured text output of one executed command."""

        def __init__(self, cmd):
            # The command runs synchronously; both streams are captured as text.
            completed = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
            )
            self.exit_code = completed.returncode
            self.stdout = completed.stdout
            self.stderr = completed.stderr

    class CasadmError(Exception):
        """Raised by ``run_cmd`` when a command exits with a non-zero code.

        The failed command's ``result`` object is kept in ``self.result``.
        """

        def __init__(self, result):
            super().__init__('casadm error')
            self.result = result

    @classmethod
    def run_cmd(cls, cmd):
        """Execute ``cmd`` (an argument list) and return its ``result``.

        Raises ``CasadmError`` carrying the result when the exit code is
        not zero.
        """
        outcome = cls.result(cmd)
        if outcome.exit_code == 0:
            return outcome
        raise cls.CasadmError(outcome)

    @classmethod
    def get_version(cls):
        """Run ``casadm --version`` with CSV output."""
        return cls.run_cmd(
            [cls.casadm_path, '--version', '--output-format', 'csv']
        )

    @classmethod
    def list_caches(cls):
        """Run ``casadm --list-caches`` with CSV output."""
        return cls.run_cmd(
            [cls.casadm_path, '--list-caches', '--output-format', 'csv']
        )

    @classmethod
    def check_cache_device(cls, device):
        """Run ``casadm --script --check-cache-device`` for ``device``."""
        return cls.run_cmd(
            [cls.casadm_path, '--script', '--check-cache-device',
             '--cache-device', device]
        )

    @classmethod
    def start_cache(cls, device, cache_id=None, cache_mode=None,
                    cache_line_size=None, load=False, force=False):
        """Run ``casadm --start-cache`` on ``device``.

        Optional arguments are appended only when they are truthy.
        """
        args = [cls.casadm_path, '--start-cache', '--cache-device', device]
        if cache_id:
            args.extend(['--cache-id', str(cache_id)])
        if cache_mode:
            args.extend(['--cache-mode', cache_mode])
        if cache_line_size:
            args.extend(['--cache-line-size', str(cache_line_size)])
        if load:
            args.append('--load')
        if force:
            args.append('--force')
        return cls.run_cmd(args)

    @classmethod
    def add_core(cls, device, cache_id, core_id=None, try_add=False):
        """Run ``casadm --script --add-core`` for ``device`` on ``cache_id``.

        ``core_id`` is passed whenever it is not None (0 is a valid id).
        """
        args = [cls.casadm_path, '--script', '--add-core',
                '--core-device', device,
                '--cache-id', str(cache_id)]
        if core_id is not None:
            args.extend(['--core-id', str(core_id)])
        if try_add:
            args.append('--try-add')
        return cls.run_cmd(args)

    @classmethod
    def stop_cache(cls, cache_id, no_flush=False):
        """Run ``casadm --stop-cache``; ``no_flush`` adds ``--no-data-flush``."""
        args = [cls.casadm_path, '--stop-cache', '--cache-id', str(cache_id)]
        if no_flush:
            args.append('--no-data-flush')
        return cls.run_cmd(args)

    @classmethod
    def remove_core(cls, cache_id, core_id, detach=False, force=False):
        """Run ``casadm --script --remove-core``.

        ``detach`` adds ``--detach`` and ``force`` adds ``--no-flush``.
        """
        args = [cls.casadm_path, '--script', '--remove-core',
                '--cache-id', str(cache_id),
                '--core-id', str(core_id)]
        if detach:
            args.append('--detach')
        if force:
            args.append('--no-flush')
        return cls.run_cmd(args)

    @classmethod
    def set_param(cls, namespace, cache_id, **kwargs):
        """Run ``casadm --set-param`` in ``namespace`` for ``cache_id``.

        Every keyword becomes a ``--option value`` pair, with underscores
        in the keyword replaced by dashes.
        """
        args = [cls.casadm_path, '--set-param', '--name', namespace,
                '--cache-id', str(cache_id)]
        for option, value in kwargs.items():
            args.extend(['--{0}'.format(option.replace('_', '-')), str(value)])
        return cls.run_cmd(args)

    @classmethod
    def get_params(cls, namespace, cache_id, **kwargs):
        """Run ``casadm --get-param`` in ``namespace`` with CSV output.

        Keywords are translated to options the same way as in ``set_param``.
        """
        args = [cls.casadm_path, '--get-param', '--name', namespace,
                '--cache-id', str(cache_id)]
        for option, value in kwargs.items():
            args.extend(['--{0}'.format(option.replace('_', '-')), str(value)])
        args.extend(['-o', 'csv'])
        return cls.run_cmd(args)

    @classmethod
    def flush_parameters(cls, cache_id, policy_type):
        """Run ``casadm --flush-parameters`` with the given cleaning policy type."""
        return cls.run_cmd(
            [cls.casadm_path, '--flush-parameters',
             '--cache-id', str(cache_id),
             '--cleaning-policy-type', policy_type]
        )

    @classmethod
    def io_class_load_config(cls, cache_id, ioclass_file):
        """Run ``casadm --io-class --load-config`` with ``ioclass_file``."""
        return cls.run_cmd(
            [cls.casadm_path, '--io-class', '--load-config',
             '--cache-id', str(cache_id),
             '--file', ioclass_file]
        )
# Configuration file parser
class cas_config(object):
    """In-memory model of the Open CAS configuration file.

    Attributes:
        caches: dict mapping cache id (int) to ``cache_config``.
        cores: list of ``core_config`` objects in insertion order.
        version_tag: the first line of the config file as read (or the
            value passed to the constructor).
    """

    default_location = '/etc/opencas/opencas.conf'

    class ConflictingConfigException(ValueError):
        """A new entry clashes with a different, already configured entry."""
        pass

    class AlreadyConfiguredException(ValueError):
        """The very same entry (same id, same device) is already configured."""
        pass

    @staticmethod
    def get_by_id_path(path):
        """Return the ``/dev/disk/by-id`` link resolving to the same file as ``path``.

        Raises ValueError when no link matches. Errors from listing
        ``/dev/disk/by-id`` (e.g. the directory is missing) propagate as
        OSError.
        """
        target = os.path.realpath(path)
        for id_path in os.listdir('/dev/disk/by-id'):
            full_path = '/dev/disk/by-id/{0}'.format(id_path)
            if os.path.realpath(full_path) == target:
                return full_path
        raise ValueError('By-id device link not found for {0}'.format(path))

    @staticmethod
    def check_block_device(path):
        """Raise ValueError unless ``path`` is an existing block device.

        A path starting with ``/dev/cas`` that does not exist is accepted
        without further checks.
        """
        if not os.path.exists(path) and path.startswith('/dev/cas'):
            return
        try:
            mode = os.stat(path).st_mode
        except (OSError, ValueError):
            raise ValueError('{0} not found'.format(path))
        if not stat.S_ISBLK(mode):
            raise ValueError('{0} is not block device'.format(path))

    class cache_config(object):
        """One line of the ``[caches]`` section."""

        def __init__(self, cache_id, device, cache_mode, **params):
            self.cache_id = int(cache_id)
            self.device = device
            self.cache_mode = cache_mode
            self.params = params
            # core id -> core_config, filled in by cas_config.insert_core
            self.cores = dict()

        @classmethod
        def from_line(cls, line, allow_incomplete=False):
            """Parse and validate a whitespace-separated cache line.

            Expected columns: cache id, device, cache mode and an optional
            comma-separated ``name=value`` parameter list. Raises ValueError
            on malformed or invalid input.
            """
            values = line.split()
            if len(values) < 3:
                raise ValueError('Invalid cache configuration (too few columns)')
            elif len(values) > 4:
                raise ValueError('Invalid cache configuration (too many columns)')
            cache_id = int(values[0])
            device = values[1]
            cache_mode = values[2].lower()
            params = dict()
            if len(values) > 3:
                for param in values[3].split(','):
                    # A malformed pair makes the unpacking raise ValueError.
                    param_name, param_value = param.split('=')
                    if param_name in params:
                        raise ValueError(
                            'Invalid cache configuration (repeated parameter)')
                    params[param_name] = param_value
            cache_config = cls(cache_id, device, cache_mode, **params)
            cache_config.validate_config(False, allow_incomplete)
            return cache_config

        def validate_config(self, force, allow_incomplete=False):
            """Validate id, device, mode and parameters; raise ValueError if bad.

            Device checks are skipped when ``allow_incomplete`` is true; the
            partition check is additionally skipped when ``force`` is true.
            """
            type(self).check_cache_id_valid(self.cache_id)
            self.check_recursive()
            self.check_cache_mode_valid(self.cache_mode)
            for param_name, param_value in self.params.items():
                self.validate_parameter(param_name, param_value)
            if not allow_incomplete:
                cas_config.check_block_device(self.device)
                if not force:
                    self.check_cache_device_empty()

        def validate_parameter(self, param_name, param_value):
            """Raise ValueError for an unknown parameter or an invalid value."""
            if param_name == 'ioclass_file':
                if not os.path.exists(param_value):
                    raise ValueError('Invalid path to io_class file')
            elif param_name == 'cleaning_policy':
                self.check_cleaning_policy_valid(param_value)
            elif param_name == 'promotion_policy':
                self.check_promotion_policy_valid(param_value)
            elif param_name == 'cache_line_size':
                self.check_cache_line_size_valid(param_value)
            else:
                raise ValueError('{0} is invalid parameter name'.format(param_name))

        @staticmethod
        def check_cache_id_valid(cache_id):
            """Raise ValueError unless ``cache_id`` is within 1..16384."""
            if not 1 <= int(cache_id) <= 16384:
                raise ValueError('{0} is invalid cache id'.format(cache_id))

        def check_cache_device_empty(self):
            """Raise ValueError when ``lsblk`` lists more than one name for the device."""
            try:
                result = casadm.run_cmd(['lsblk', '-o', 'NAME', '-l', '-n', self.device])
            except Exception:
                # lsblk returns non-0 if it can't probe for partitions
                # this means that we're probably dealing with atomic device
                # let it through
                return
            names = [name for name in result.stdout.split('\n') if name != '']
            if len(names) > 1:
                raise ValueError(
                    'Partitions found on device {0}. Use force option to ignore'.
                    format(self.device))

        def check_cache_mode_valid(self, cache_mode):
            """Raise ValueError unless the mode is one of wt/pt/wa/wb/wo."""
            if cache_mode.lower() not in ['wt', 'pt', 'wa', 'wb', 'wo']:
                raise ValueError('Invalid cache mode {0}'.format(cache_mode))

        def check_cleaning_policy_valid(self, cleaning_policy):
            """Raise ValueError unless the policy is one of acp/alru/nop."""
            if cleaning_policy.lower() not in ['acp', 'alru', 'nop']:
                raise ValueError('{0} is invalid cleaning policy name'.format(
                    cleaning_policy))

        def check_promotion_policy_valid(self, promotion_policy):
            """Raise ValueError unless the policy is one of always/nhit."""
            if promotion_policy.lower() not in ['always', 'nhit']:
                raise ValueError('{0} is invalid promotion policy name'.format(
                    promotion_policy))

        def check_cache_line_size_valid(self, cache_line_size):
            """Raise ValueError unless the size is the string 4, 8, 16, 32 or 64."""
            if cache_line_size not in ['4', '8', '16', '32', '64']:
                raise ValueError('{0} is invalid cache line size'.format(
                    cache_line_size))

        def check_recursive(self):
            """Reject a cache whose device is a CAS volume of the same cache id."""
            if not self.device.startswith('/dev/cas'):
                return
            ids = self.device.split('/dev/cas')[1]
            device_cache_id, _ = ids.split('-')
            if int(device_cache_id) == self.cache_id:
                raise ValueError('Recursive configuration detected')

        def to_line(self):
            """Serialize to a tab-separated, newline-terminated config line."""
            ret = '{0}\t{1}\t{2}'.format(self.cache_id, self.device, self.cache_mode)
            if self.params:
                ret += '\t' + ','.join(
                    '{0}={1}'.format(param, value)
                    for param, value in self.params.items())
            ret += '\n'
            return ret

    class core_config(object):
        """One line of the ``[cores]`` section."""

        def __init__(self, cache_id, core_id, path, **params):
            self.cache_id = int(cache_id)
            self.core_id = int(core_id)
            self.device = path
            self.params = params

        @classmethod
        def from_line(cls, line, allow_incomplete=False):
            """Parse and validate a whitespace-separated core line.

            Expected columns: cache id, core id, device and an optional
            comma-separated ``name=value`` parameter list (lower-cased
            before parsing). Raises ValueError on malformed or invalid input.
            """
            values = line.split()
            if len(values) > 4:
                raise ValueError("Invalid core configuration (too many columns)")
            elif len(values) < 3:
                raise ValueError("Invalid core configuration (too few columns)")
            cache_id = int(values[0])
            core_id = int(values[1])
            device = values[2]
            params = dict()
            if len(values) > 3:
                for param in values[3].lower().split(","):
                    # A malformed pair makes the unpacking raise ValueError.
                    param_name, param_value = param.split("=")
                    if param_name in params:
                        raise ValueError(
                            "Invalid core configuration (repeated parameter)"
                        )
                    params[param_name] = param_value
            core_config = cls(cache_id, core_id, device, **params)
            core_config.validate_config(allow_incomplete)
            return core_config

        def validate_config(self, allow_incomplete=False):
            """Validate ids, device and parameters; raise ValueError if bad.

            The block-device check is skipped when ``allow_incomplete`` is true.
            """
            self.check_core_id_valid()
            self.check_recursive()
            cas_config.cache_config.check_cache_id_valid(self.cache_id)
            for param_name, param_value in self.params.items():
                self.validate_parameter(param_name, param_value)
            if not allow_incomplete:
                cas_config.check_block_device(self.device)

        def validate_parameter(self, param_name, param_value):
            """Accept only ``lazy_startup`` with value true/false (any case)."""
            if param_name == "lazy_startup":
                if param_value.lower() not in ["true", "false"]:
                    raise ValueError(
                        "{} is invalid value for '{}' core param".format(
                            param_value, param_name
                        )
                    )
            else:
                raise ValueError("'{}' is invalid core param name".format(param_name))

        def check_core_id_valid(self):
            """Raise ValueError unless the core id is within 0..4095."""
            if not 0 <= int(self.core_id) <= 4095:
                raise ValueError('{0} is invalid core id'.format(self.core_id))

        def check_recursive(self):
            """Reject a core whose device is a CAS volume of the same cache id."""
            if not self.device.startswith('/dev/cas'):
                return
            ids = self.device.split('/dev/cas')[1]
            device_cache_id, _ = ids.split('-')
            if int(device_cache_id) == self.cache_id:
                raise ValueError('Recursive configuration detected')

        def to_line(self):
            """Serialize to a tab-separated, newline-terminated config line."""
            ret = "{0}\t{1}\t{2}".format(self.cache_id, self.core_id, self.device)
            if self.params:
                ret += "\t" + ",".join(
                    "{0}={1}".format(param, value)
                    for param, value in self.params.items())
            ret += "\n"
            return ret

    def __init__(self, caches=None, cores=None, version_tag=None):
        self.caches = caches if caches else dict()
        self.cores = cores if cores else list()
        self.version_tag = version_tag

    @classmethod
    def from_file(cls, config_file, allow_incomplete=False):
        """Parse ``config_file`` and return a populated ``cas_config``.

        The first line must be a ``version=...`` tag. Text after ``#`` is
        ignored, as are empty lines. Lines following ``[caches]`` are
        parsed as caches and lines following ``[cores]`` as cores; lines
        before any section header are ignored.

        Raises:
            ValueError: missing version tag or an invalid/conflicting entry.
            KeyError: a core refers to a cache id that was not configured.
            Exception: the file could not be opened or read (IOError).
        """
        section = None
        try:
            with open(config_file, 'r') as conf:
                version_tag = conf.readline()
                if not re.findall(r'^version=.*$', version_tag):
                    raise ValueError('No version tag found!')
                config = cls(version_tag=version_tag)
                for line in conf:
                    line = line.split('#')[0].rstrip()
                    if not line:
                        continue
                    if line == '[caches]':
                        section = 'caches'
                        continue
                    if line == '[cores]':
                        section = 'cores'
                        continue
                    if section == 'caches':
                        cache = cas_config.cache_config.from_line(line, allow_incomplete)
                        config.insert_cache(cache)
                    elif section == 'cores':
                        core = cas_config.core_config.from_line(line, allow_incomplete)
                        config.insert_core(core)
        except IOError:
            raise Exception('Couldn\'t open config file')
        return config

    def insert_cache(self, new_cache_config):
        """Register a cache after checking it against existing entries.

        Raises ``AlreadyConfiguredException`` when the same id and device
        are already present and ``ConflictingConfigException`` when the id
        or the device is already used differently. The device path is
        replaced by its ``/dev/disk/by-id`` link when one can be found.
        """
        new_id = new_cache_config.cache_id
        new_device = os.path.realpath(new_cache_config.device)
        if new_id in self.caches:
            if os.path.realpath(self.caches[new_id].device) != new_device:
                raise cas_config.ConflictingConfigException(
                    'Other cache device configured under this id')
            raise cas_config.AlreadyConfiguredException(
                'Cache already configured')
        # From here on new_id is known not to be among the configured ids.
        for cache in self.caches.values():
            if os.path.realpath(cache.device) == new_device:
                raise cas_config.ConflictingConfigException(
                    'This cache device is already configured as a cache')
            for core in cache.cores.values():
                if os.path.realpath(core.device) == new_device:
                    raise cas_config.ConflictingConfigException(
                        'This cache device is already configured as a core')
        try:
            new_cache_config.device = cas_config.get_by_id_path(new_cache_config.device)
        except (ValueError, OSError):
            # Best effort: keep the path as given when no by-id link exists.
            pass
        self.caches[new_id] = new_cache_config

    def insert_core(self, new_core_config):
        """Register a core after checking it against existing entries.

        Raises KeyError when the core's cache id is not configured,
        ``AlreadyConfiguredException`` when the same ids and device are
        already present and ``ConflictingConfigException`` for any other
        clash. The device path is replaced by its ``/dev/disk/by-id`` link
        when one can be found.
        """
        if new_core_config.cache_id not in self.caches:
            raise KeyError('Cache id {0} doesn\'t exist'.format(new_core_config.cache_id))
        new_device = os.path.realpath(new_core_config.device)
        for cache_id, cache in self.caches.items():
            if os.path.realpath(cache.device) == new_device:
                raise cas_config.ConflictingConfigException(
                    'Core device already configured as a cache')
            for core_id, core in cache.cores.items():
                same_device = os.path.realpath(core.device) == new_device
                if (cache_id == new_core_config.cache_id
                        and core_id == new_core_config.core_id):
                    if same_device:
                        raise cas_config.AlreadyConfiguredException(
                            'Core already configured')
                    raise cas_config.ConflictingConfigException(
                        'Other core device configured under this id')
                if same_device:
                    raise cas_config.ConflictingConfigException(
                        'This core device is already configured as a core')
        try:
            new_core_config.device = cas_config.get_by_id_path(new_core_config.device)
        except (ValueError, OSError):
            # Best effort: keep the path as given when no by-id link exists.
            pass
        self.caches[new_core_config.cache_id].cores[new_core_config.core_id] = new_core_config
        self.cores += [new_core_config]

    def is_empty(self):
        """Return True when neither caches nor cores are configured."""
        return not (len(self.caches) > 0 or len(self.cores) > 0)

    def write(self, config_file):
        """Write the configuration to ``config_file``, overwriting it.

        Any failure is reported as a generic ``Exception`` with the
        original error attached as its cause.
        """
        try:
            with open(config_file, 'w') as conf:
                conf.write('{0}\n'.format(self.version_tag))
                conf.write('# This config was automatically generated\n')
                conf.write('[caches]\n')
                for cache in self.caches.values():
                    conf.write(cache.to_line())
                conf.write('\n[cores]\n')
                for core in self.cores:
                    conf.write(core.to_line())
        except Exception as err:
            raise Exception('Couldn\'t write config file') from err

    def get_startup_cores(self):
        """Return cores whose ``lazy_startup`` param is absent or "false"."""
        return [
            core
            for core in self.cores
            if core.params.get("lazy_startup", "false") == "false"
        ]
# Config helper functions
def start_cache(cache, load, force=False):
    """Start the cache described by ``cache`` through casadm.

    ``cache`` supplies ``device``, ``cache_id``, ``cache_mode`` and an
    optional ``cache_line_size`` entry in ``params``; ``load`` and ``force``
    are forwarded unchanged.
    """
    line_size = cache.params.get('cache_line_size')
    casadm.start_cache(
        cache.device,
        cache.cache_id,
        cache.cache_mode,
        line_size,
        load,
        force,
    )
def configure_cache(cache):
    """Apply the optional settings of ``cache`` to a running cache instance.

    Handles the ``cleaning_policy``, ``promotion_policy`` and
    ``ioclass_file`` entries of ``cache.params``; entries that are absent
    are skipped. Errors raised by the casadm calls propagate to the caller.
    """
    params = cache.params
    if "cleaning_policy" in params:
        casadm.set_param(
            "cleaning", cache_id=cache.cache_id, policy=params["cleaning_policy"]
        )
    if "promotion_policy" in params:
        # The value lives under the same key that was just tested; reading
        # "promotion" here raised KeyError for every configured policy.
        casadm.set_param(
            "promotion", cache_id=cache.cache_id, policy=params["promotion_policy"]
        )
    if "ioclass_file" in params:
        casadm.io_class_load_config(
            cache_id=cache.cache_id, ioclass_file=params["ioclass_file"]
        )
def add_core(core, attach):
    """Add the core described by ``core`` to its cache through casadm.

    ``attach`` is forwarded as casadm's ``try_add`` flag.
    """
    casadm.add_core(core.device, core.cache_id, core.core_id, attach)
# Other helper functions
def is_cache_started(cache_config):
    """Return True when a cache with ``cache_config.cache_id`` is listed by casadm."""
    wanted = cache_config.cache_id
    return any(
        entry['type'] == 'cache' and int(entry['id']) == wanted
        for entry in get_caches_list()
    )
def is_core_added(core_config):
    """Return True when the core is listed under its cache by casadm.

    The listing is flat, so each core row is attributed to the most
    recently seen cache row.
    """
    current_cache_id = 0
    for entry in get_caches_list():
        kind = entry['type']
        if kind == 'cache':
            current_cache_id = int(entry['id'])
            continue
        if kind != 'core':
            continue
        if (current_cache_id == core_config.cache_id
                and int(entry['id']) == core_config.core_id):
            return True
    return False
def get_caches_list():
    """Return the rows of ``casadm --list-caches`` as a list of dicts.

    The CSV header line supplies the dictionary keys.
    """
    listing = casadm.list_caches().stdout
    reader = csv.DictReader(listing.split('\n'))
    return list(reader)
def check_cache_device(device):
    """Return the first CSV row reported by casadm's cache device check.

    Raises IndexError when the output holds no data row.
    """
    report = casadm.check_cache_device(device).stdout
    rows = list(csv.DictReader(report.split('\n')))
    return rows[0]
def get_cas_version():
    """Return a dict mapping component name to version string.

    Parses the CSV output of ``casadm --version``: the first line (header)
    is skipped and every remaining line that splits into exactly two
    comma-separated fields contributes one entry. Other lines are ignored.
    """
    output = casadm.get_version().stdout
    ret = {}
    for line in output.split('\n')[1:]:
        try:
            component, component_version = line.split(',')
        except ValueError:
            # Not a "component,version" pair (e.g. the trailing empty line).
            continue
        ret[component] = component_version
    return ret
class CompoundException(Exception):
    """Exception that aggregates several exceptions into one.

    Collected exceptions are kept in ``exception_list``; adding another
    ``CompoundException`` merges its entries instead of nesting it.
    """

    def __init__(self):
        super().__init__()
        self.exception_list = []

    def __str__(self):
        # The header is only shown when there is more than one entry.
        header = ""
        if len(self.exception_list) > 1:
            header = "Multiple exceptions occured:\n"
        body = "".join('{0}\n'.format(str(item)) for item in self.exception_list)
        return header + body

    def add_exception(self, e):
        """Append ``e``, flattening it when it is itself a CompoundException."""
        if type(e) is CompoundException:
            self.exception_list.extend(e.exception_list)
        else:
            self.exception_list.append(e)

    def is_empty(self):
        """Return True when no exception has been collected."""
        return not len(self.exception_list)

    def raise_nonempty(self):
        """Raise this exception if at least one exception was collected."""
        if not self.is_empty():
            raise self
def detach_core_recursive(cache_id, core_id, flush):
    """Detach a core after detaching every active core stacked on top of it.

    Any active core whose disk path refers to the exported volume
    ``/dev/cas<cache_id>-<core_id>`` is detached first (recursively). If
    the listing shows the requested core itself as not active, nothing is
    removed. ``flush`` false makes the removal skip flushing.
    """
    # Catching exceptions is left to uppermost caller of detach_core_recursive
    # as the immediate caller that made a recursive call depends on the callee
    # to remove core and thus release reference to lower level cache volume.

    # The volume name must not be followed by another digit, otherwise
    # /dev/cas1-1 would also match the unrelated volume /dev/cas1-10.
    exported_volume = re.compile(
        re.escape('/dev/cas{0}-{1}'.format(cache_id, core_id)) + r'(?!\d)'
    )
    l_cache_id = ''
    for dev in get_caches_list():
        if dev['type'] == 'cache':
            l_cache_id = dev['id']
        elif dev['type'] == 'core' and dev['status'] == 'Active':
            if exported_volume.search(dev['disk']):
                detach_core_recursive(l_cache_id, dev['id'], flush)
        elif (str(l_cache_id) == str(cache_id)
                and str(dev['id']) == str(core_id)
                and dev['status'] != 'Active'):
            # Ids are compared as text because the listing yields strings
            # while callers may pass integers.
            return
    casadm.remove_core(cache_id, core_id, detach=True, force=not flush)
def detach_all_cores(flush):
    """Detach every active core reported by casadm.

    Failures for individual cores are collected and raised together as a
    ``CompoundException`` after all cores have been attempted. Failure to
    obtain the listing raises a plain ``Exception`` immediately.
    """
    error = CompoundException()
    try:
        dev_list = get_caches_list()
    except casadm.CasadmError as e:
        raise Exception('Unable to list caches. Reason:\n{0}'.format(
            e.result.stderr))
    except Exception:
        raise Exception('Unable to list caches.')
    cache_id = None
    for dev in dev_list:
        if dev['type'] == 'cache':
            cache_id = dev['id']
        elif dev['type'] == 'core' and dev['status'] == "Active":
            if cache_id is None:
                # An active core listed before any cache cannot be attributed
                # to a cache; report it instead of failing on an unset id.
                error.add_exception(Exception(
                    'Unable to detach core {0}.'.format(dev['disk'])))
                continue
            # In case of exception we proceed with detaching remaining core instances
            # to gracefully shutdown as many cache instances as possible.
            try:
                detach_core_recursive(cache_id, dev['id'], flush)
            except casadm.CasadmError as e:
                error.add_exception(Exception(
                    'Unable to detach core {0}. Reason:\n{1}'.format(
                        dev['disk'], e.result.stderr)))
            except Exception:
                error.add_exception(Exception(
                    'Unable to detach core {0}.'.format(dev['disk'])))
    error.raise_nonempty()
def stop_all_caches(flush):
    """Stop every cache reported by casadm.

    ``flush`` false stops the caches without flushing data. Failures for
    individual caches are collected and raised together as a
    ``CompoundException`` after all caches have been attempted. Failure to
    obtain the listing raises a plain ``Exception`` immediately.
    """
    error = CompoundException()
    try:
        dev_list = get_caches_list()
    except casadm.CasadmError as e:
        raise Exception('Unable to list caches. Reason:\n{0}'.format(
            e.result.stderr))
    except Exception:
        raise Exception('Unable to list caches.')
    for dev in dev_list:
        if dev['type'] != 'cache':
            continue
        # In case of exception we proceed with stopping subsequent cache instances
        # to gracefully shutdown as many cache instances as possible.
        try:
            casadm.stop_cache(dev['id'], not flush)
        except casadm.CasadmError as e:
            error.add_exception(Exception(
                'Unable to stop cache {0}. Reason:\n{1}'.format(
                    dev['disk'], e.result.stderr)))
        except Exception:
            error.add_exception(Exception(
                'Unable to stop cache {0}.'.format(dev['disk'])))
    error.raise_nonempty()
def stop(flush):
    """Detach all cores, then stop all caches, reporting every failure.

    Cores are detached honouring ``flush``; the caches are then always
    stopped with ``flush`` set to False. Errors from both phases are merged
    into one ``CompoundException`` raised at the end.
    """
    error = CompoundException()
    phases = ((detach_all_cores, flush), (stop_all_caches, False))
    for phase, phase_flush in phases:
        try:
            phase(phase_flush)
        except Exception as e:
            error.add_exception(e)
    error.raise_nonempty()
def get_devices_state():
    """Summarize the casadm listing as a dict of caches, cores and core pool.

    Returns a dict with three keys:
      * ``"core_pool"``: list of ``{"device", "status"}`` dicts for cores
        listed after a "core pool" row;
      * ``"caches"``: cache id -> ``{"device", "status"}``;
      * ``"cores"``: ``(cache id, core id)`` -> ``{"device", "status",
        "cache_id"}``, each core attributed to the last cache row seen.
    """
    pool = []
    caches = {}
    cores = {}
    in_core_pool = False
    current_cache_id = -1
    for entry in get_caches_list():
        kind = entry["type"]
        if kind == "core pool":
            in_core_pool = True
        elif kind == "cache":
            in_core_pool = False
            current_cache_id = int(entry["id"])
            caches[current_cache_id] = {
                "device": entry["disk"],
                "status": entry["status"],
            }
        elif kind == "core":
            core = {"device": entry["disk"], "status": entry["status"]}
            if in_core_pool:
                pool.append(core)
            else:
                core["cache_id"] = current_cache_id
                cores[(current_cache_id, int(entry["id"]))] = core
    return {"core_pool": pool, "caches": caches, "cores": cores}
def wait_for_startup(timeout=300, interval=5):
    """Wait until all non-lazy cores from the default config are active.

    The runtime state is polled every ``interval`` seconds until every core
    returned by ``get_startup_cores()`` is reported with status "Active" or
    until ``timeout`` seconds have elapsed. The state is always checked at
    least once, even for a zero or negative timeout, and once more right at
    the deadline, so the result never predates the end of the wait.

    Returns:
        The list of core configs that are still not active (empty on
        success).

    Raises:
        Exception: the configuration file could not be loaded.
    """
    try:
        config = cas_config.from_file(
            cas_config.default_location, allow_incomplete=True
        )
    except Exception as e:
        raise Exception(
            "Unable to load opencas config. Reason: {0}".format(str(e))
        ) from e
    # A monotonic clock keeps the deadline stable if the wall clock is
    # adjusted while waiting.
    deadline = time.monotonic() + int(timeout)
    target_core_state = config.get_startup_cores()
    while True:
        runtime_core_state = get_devices_state()["cores"]
        not_initialized = []
        for core in target_core_state:
            runtime_state = runtime_core_state.get((core.cache_id, core.core_id), None)
            if not runtime_state or runtime_state["status"] != "Active":
                not_initialized.append(core)
        remaining = deadline - time.monotonic()
        if not not_initialized or remaining <= 0:
            return not_initialized
        # Never sleep past the deadline.
        time.sleep(min(interval, remaining))