From 3e7560e22c2e01af4d1af65dba733528a3c519fc Mon Sep 17 00:00:00 2001 From: Jan Musial Date: Wed, 15 Jan 2020 09:05:31 +0100 Subject: [PATCH] Create upgrade script for upgrade-in-flight of OpenCAS Signed-off-by: Jan Musial --- utils/upgrade | 378 +++++++++++++++++++++++++++++++++++++++++ utils/upgrade_utils.py | 218 ++++++++++++++++++++++++ 2 files changed, 596 insertions(+) create mode 100755 utils/upgrade create mode 100644 utils/upgrade_utils.py diff --git a/utils/upgrade b/utils/upgrade new file mode 100755 index 0000000..e4bb3e7 --- /dev/null +++ b/utils/upgrade @@ -0,0 +1,378 @@ +#!/usr/bin/python3 +# +# Copyright(c) 2019 Intel Corporation +# SPDX-License-Identifier: BSD-3-Clause-Clear +# + +import logging +import sys +import argparse +import subprocess +import os +from pathlib import Path + +import opencas +from upgrade_utils import ( + yn_prompt, + Failure, + Success, + Warn, + Abort, + StateMachine, + UpgradeState, + insert_module, + remove_module, + get_device_sysfs_path, + get_device_schedulers, + set_device_scheduler, + drop_os_caches, +) + +LOG_FILE = "/var/log/opencas-upgrade/upgrade.log" +COMPILATION_LOG = "/var/log/opencas-upgrade/build.log" + +CAS_CACHE_KEY = "CAS Cache Kernel Module" +CAS_DISK_KEY = "CAS Disk Kernel Module" +CAS_CLI_KEY = "CAS CLI Utility" + +OCL_BUILD_ROOT = f"{os.path.dirname(__file__)}/.." 
class InitUpgrade(UpgradeState):
    """Initial state: confirm with the user and validate the current CAS setup.

    Possible results of ``do_work``:
        Abort: the user answered "n" to the confirmation prompt.
        NotInstalled: ``opencas.get_cas_version()`` raised ``FileNotFoundError``.
        Failure: the version query failed for another reason, the CLI and
            cas_cache versions differ, some cache is not in "Running" status,
            or the core pool is not empty.
        Success: every check passed.
    """

    log = "Performing initial checks"
    will_prompt = True

    class NotInstalled(Warn):
        """Result signalling that the current CAS version could not be detected."""

    def do_work(self):
        if self.state_machine.params["force"]:
            # No prompt is shown in forced mode, so the state banner does not
            # have to be re-printed by exit_state().
            self.will_prompt = False
        elif yn_prompt("Proceed with upgrade procedure?") == "n":
            return Abort("User aborted")

        try:
            version = opencas.get_cas_version()
        except FileNotFoundError:
            return self.NotInstalled("Couldn't detect current CAS version")
        except Exception as e:
            return Failure(f"Failed to get current version of CAS {e}")

        if version[CAS_CLI_KEY] != version[CAS_CACHE_KEY]:
            return Failure("Mismatch between CLI and cas_cache version")

        active_devices = opencas.get_devices_state()

        # Either a cache that is not running or a leftover entry in the core
        # pool means the configuration is incomplete.
        cache_not_running = any(
            device["status"] != "Running" for device in active_devices["caches"].values()
        )
        if cache_not_running or len(active_devices["core_pool"]) != 0:
            return Failure("Incomplete configuration. Run casadm -L to review CAS state!")

        return Success()


class BuildCas(UpgradeState):
    """Run ``./configure`` and ``make -j`` in OCL_BUILD_ROOT.

    Output of both commands is written to COMPILATION_LOG (truncated first).
    Returns Failure as soon as one of the commands exits with a non-zero
    status, Success otherwise.
    """

    log = "Compiling Open CAS"

    def do_work(self):
        # (log message, command, stage name used in the failure message)
        steps = (
            ("Running ./configure for CAS", ["./configure"], "Configuration"),
            ("Compiling CAS", ["make", "-j"], "Compilation"),
        )

        with open(COMPILATION_LOG, "w") as build_log:
            for message, command, stage in steps:
                logging.info(message)
                p = subprocess.run(
                    command, cwd=OCL_BUILD_ROOT, stdout=build_log, stderr=build_log
                )
                if p.returncode:
                    return Failure(f"{stage} of Open CAS failed. Build log: {COMPILATION_LOG}")

        return Success()
class InstallCas(UpgradeState):
    """Run ``make install`` in OCL_BUILD_ROOT, appending output to COMPILATION_LOG."""

    log = "Installing new Open CAS files"

    def do_work(self):
        with open(COMPILATION_LOG, "a") as install_log:
            p = subprocess.run(
                ["make", "install"], cwd=OCL_BUILD_ROOT, stdout=install_log, stderr=install_log
            )

        if p.returncode:
            return Failure(
                f"Installation of Open CAS failed. Installation log: {COMPILATION_LOG}"
            )

        return Success()


class InsertModule(UpgradeState):
    """Base state that loads a kernel module through ``insert_module``.

    Subclasses customise ``module_path`` and ``options``; ``options`` is
    forwarded to ``insert_module`` as keyword arguments.
    """

    module_path = f"{OCL_BUILD_ROOT}/modules/cas_cache/cas_cache.ko"
    options = {"installed": False}

    def do_work(self):
        try:
            insert_module(self.module_path, **self.options)
        except Exception as e:
            return Failure(f"Couldn't load module {self.module_path}. Reason: {e}")

        return Success()


class InsertNewModule(InsertModule):
    """Load the freshly built cas_cache module from the build tree."""

    log = "Try to insert new caching module"


class DryRun(InsertModule):
    """Load the new module with ``dry_run=1`` and unload it again.

    Returns Failure if either the insertion or the removal fails. The failure
    message carries the underlying reason so it is not lost from the log.
    """

    log = "Perform dry run to check upgrade data integrity"
    options = {"installed": False, "dry_run": 1}

    def do_work(self):
        result = super().do_work()
        if not isinstance(result, Success):
            # Keep the reason reported by the failed insertion; a bare
            # "Dry run failed" gives the operator nothing to act on.
            return Failure(f"Dry run failed. {result.msg}")

        try:
            logging.info(
                "Insert with dry_run set succeeded. Removing module to attempt final insertion."
            )
            remove_module("cas_cache")
        except Exception as e:
            return Failure(f"Could not unload module. Reason: {e}")

        return result
class InsertInstalledModule(InsertModule):
    """Fallback state: load the cas_cache module that is installed on the system."""

    log = "Restore old cas_cache module"
    module_path = "cas_cache"
    options = {"installed": True}


class PrepareForUpgrade(UpgradeState):
    """Ask casadm to switch CAS into upgrade mode."""

    log = "Preparing Open CAS for upgrade"

    def do_work(self):
        logging.info("Switching CAS to upgrade mode")
        try:
            opencas.casadm.start_upgrade()
        except opencas.casadm.CasadmError as error:
            return Failure(error)
        else:
            return Success()


class RemoveModule(UpgradeState):
    """Unload the currently loaded cas_cache module."""

    log = "Removing cas_cache module"

    def do_work(self):
        try:
            logging.info("Removing cas_cache module")
            remove_module("cas_cache")
        except Exception as error:
            return Failure(f"Failed to remove cas_cache module. Reason: {error}")
        else:
            return Success()


class SetSchedulersToNoop(UpgradeState):
    """Switch the I/O scheduler of every core's block device to noop/none.

    The scheduler that was active before the switch is stored per sysfs path
    in ``state_machine.schedulers``.
    """

    log = "Setting core devices schedulers to noop"

    def do_work(self):
        cores = opencas.get_devices_state()["cores"].values()
        # A set, because several cores may live on the same block device.
        sysfs_paths = {get_device_sysfs_path(core["device"]) for core in cores}

        saved = {}
        self.state_machine.schedulers = saved

        for path in sysfs_paths:
            current, available = get_device_schedulers(path)
            saved[path] = current

            if current in ("noop", "none"):
                logging.info(f"Core {path} already uses {current} scheduler. Skipping.")
                continue

            # Prefer "noop"; fall back to "none" when only that one is offered.
            target = next((name for name in ("noop", "none") if name in available), None)
            if target is None:
                logging.info(f"No appropriate scheduler available for {path}. Skipping.")
                continue

            logging.info(f"Switching scheduler for {path}: {current} => {target}")
            set_device_scheduler(path, target)

        return Success()
class RestoreCoreSchedulers(UpgradeState):
    """Write back the schedulers recorded in ``state_machine.schedulers``."""

    log = "Restoring core devices schedulers"

    def do_work(self):
        for core_sysfs_path, scheduler in self.state_machine.schedulers.items():
            current = get_device_schedulers(core_sysfs_path)[0]
            # Decide from what the device uses *now*: the log line then tells
            # the truth and devices that were never switched are not rewritten.
            if current == scheduler:
                logging.info(
                    f"Device {core_sysfs_path} already uses {scheduler} scheduler. Skipping."
                )
            else:
                logging.info(f"Switching scheduler for {core_sysfs_path}: {current} => {scheduler}")
                set_device_scheduler(core_sysfs_path, scheduler)

        return Success()


class DropCaches(UpgradeState):
    """Drop OS caches through ``drop_os_caches`` before loading the new module."""

    log = "Drop OS caches to ensure memory availability"

    def do_work(self):
        logging.info("Dropping slab and page caches using procfs")
        drop_os_caches()

        return Success()


class InstallStateMachine(StateMachine):
    """Plain installation: BuildCas, then InstallCas on success, then stop."""

    transition_map = {
        BuildCas: {Success: InstallCas},
        InstallCas: {"default": None},
        "default": None,
    }


class RegularInstall(UpgradeState):
    """Install CAS from scratch when no previous installation was detected.

    Unless the parent state machine was started with ``force``, the user is
    asked for confirmation first. Returns Abort when the user declines,
    otherwise the result of running InstallStateMachine.
    """

    log = "Installing Open CAS"
    will_prompt = True

    def do_work(self):
        if self.state_machine.params.get("force"):
            # "--force" means "skip prompts"; honour it here as InitUpgrade does.
            self.will_prompt = False
        elif (
            yn_prompt("Previous CAS installation not detected. Perform regular installation?")
            == "n"
        ):
            return Abort("User aborted")

        sm = InstallStateMachine(BuildCas)
        result = sm.run()

        return result


class UpgradeStateMachine(StateMachine):
    """
    This class implements the whole CAS in-flight upgrade procedure.

    Transitions (END means the machine stops):

        State                  on Success             on Failure
        ---------------------  ---------------------  ---------------------
        InitUpgrade            BuildCas               END
        RegularInstall         END                    END
        BuildCas               SetSchedulersToNoop    END
        SetSchedulersToNoop    PrepareForUpgrade      END
        PrepareForUpgrade      RemoveModule           RestoreCoreSchedulers
        RemoveModule           DropCaches             RestoreCoreSchedulers
        DropCaches             DryRun                 RestoreCoreSchedulers
        DryRun                 InsertNewModule        InsertInstalledModule
        InsertNewModule        InstallCas             InsertInstalledModule
        InstallCas             RestoreCoreSchedulers  InsertInstalledModule
        InsertInstalledModule  RestoreCoreSchedulers  RestoreCoreSchedulers
        RestoreCoreSchedulers  END                    END

    InitUpgrade additionally maps its NotInstalled result to RegularInstall.
    Any result type not listed for a state falls back to that state's
    "default" entry if present, otherwise to the map-wide "default" (END).
    """

    transition_map = {
        InitUpgrade: {Success: BuildCas, InitUpgrade.NotInstalled: RegularInstall},
        RegularInstall: {"default": None},
        BuildCas: {Success: SetSchedulersToNoop},
        SetSchedulersToNoop: {Success: PrepareForUpgrade},
        PrepareForUpgrade: {Success: RemoveModule, Failure: RestoreCoreSchedulers},
        RemoveModule: {Success: DropCaches, Failure: RestoreCoreSchedulers},
        DropCaches: {Success: DryRun, Failure: RestoreCoreSchedulers},
        DryRun: {Success: InsertNewModule, Failure: InsertInstalledModule},
        InsertNewModule: {Success: InstallCas, Failure: InsertInstalledModule},
        InstallCas: {Success: RestoreCoreSchedulers, Failure: InsertInstalledModule},
        InsertInstalledModule: {"default": RestoreCoreSchedulers},
        RestoreCoreSchedulers: {"default": None},
        "default": None,
    }
def start(args):
    """Run the in-flight upgrade and report the outcome on stdout.

    Args:
        args: parsed command line namespace; only ``args.force`` is read.

    Returns:
        0 when the state machine finished with a Success result, 1 otherwise,
        so that the caller can turn it into the process exit status.
    """
    logging.info(">>> Starting OpenCAS upgrade procedure")
    s = UpgradeStateMachine(InitUpgrade, force=args.force)
    result = s.run()

    succeeded = isinstance(result, Success)
    if not succeeded:
        print(f"Upgrade failed. Reason: {result}")
    else:
        print("Upgrade completed successfully!")

    print(f"Full upgrade log: {LOG_FILE}")

    return 0 if succeeded else 1


def main():
    """Configure file logging, parse the command line and dispatch the action.

    Without arguments the help text is printed. Otherwise the selected action
    runs and its return value becomes the exit status, so a failed upgrade no
    longer exits with status 0.
    """
    Path(LOG_FILE).parent.mkdir(mode=0o700, parents=True, exist_ok=True)
    # Only the log file receives logging output; console feedback is produced
    # with print() by the states themselves.
    logging.basicConfig(
        level=logging.DEBUG,
        format="%(asctime)s [%(levelname)-5.5s] %(message)s",
        handlers=[logging.FileHandler(LOG_FILE)],
    )

    parser = argparse.ArgumentParser(prog=__file__)
    subparsers = parser.add_subparsers(title="actions")

    parser_start = subparsers.add_parser(
        "start", help="Upgrade Open CAS in flight to current version"
    )
    parser_start.add_argument("--force", action="store_true", help="Skip prompts")
    parser_start.set_defaults(func=start)

    if not sys.argv[1:]:
        parser.print_help()
    else:
        args = parser.parse_args(sys.argv[1:])
        sys.exit(args.func(args))


if __name__ == "__main__":
    main()


# ---------------------------------------------------------------------------
# utils/upgrade_utils.py
# ---------------------------------------------------------------------------
#
# Copyright(c) 2020 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause-Clear
#

import logging
import subprocess
import os
import re


def user_prompt(message, choices, default):
    """Ask the user to pick one of ``choices`` and return the answer.

    The question is repeated until the answer is one of ``choices``. An empty
    answer selects ``default``. The prompt and every answer are logged.

    Args:
        message: question shown to the user.
        choices: accepted answers.
        default: value used when the user just presses Enter.

    Returns:
        The chosen answer, or ``default`` for an empty answer.
    """
    result = None
    prompt = f"{message} ({'/'.join(choices)})[{default}]: "
    logging.info(f"Prompting user: {prompt}")
    while result not in choices:
        result = input(f"\n{prompt}")
        if not result:
            logging.info(f"User chose default: {default}")
            result = default
        else:
            logging.info(f"User chose: {result}")

    return result


def yn_prompt(message, default="n"):
    """Ask a yes/no question; return "y" or "n" (``default`` on empty input)."""
    return user_prompt(message, choices=["y", "n"], default=default)
class Result:
    """Outcome of a single state or of a whole state machine run."""

    def __init__(self, msg=""):
        self.msg = msg

    def __str__(self):
        return f"{type(self).__name__}: {self.msg}"


class Failure(Result):
    """The operation failed."""

    def result_mark(self):
        return "[\u001b[31mX\u001b[0m]"


class Success(Result):
    """The operation succeeded."""

    def result_mark(self):
        return "[\u001b[32mv\u001b[0m]"


class Warn(Result):
    """The operation finished with a warning."""

    def result_mark(self):
        return "[\u001b[33m!\u001b[0m]"


class Except(Failure):
    """Failure caused by an unexpected exception."""

    def result_mark(self):
        return "[\u001b[31mE\u001b[0m]"


class Abort(Failure):
    """Failure caused by the user aborting the procedure."""

    def result_mark(self):
        return "[\u001b[31mA\u001b[0m]"


class StateMachine:
    """Runs state classes one after another according to ``transition_map``.

    ``transition_map`` maps a state class to a dict keyed by the *exact*
    result type (subclasses are not matched) or by "default". A map-wide
    "default" entry is used when a state has no matching entry. A next state
    of ``None`` stops the machine.

    Keyword arguments given to the constructor are available to the states
    as ``self.params``.
    """

    transition_map = {}

    def __init__(self, initial_state, **args):
        self.initial_state = initial_state
        self.params = args

    def _next_state(self, state, result):
        """Return the state that follows ``state`` for the given ``result``."""
        per_state = self.transition_map.get(state, {})
        for key in (type(result), "default"):
            if key in per_state:
                return per_state[key]

        return self.transition_map["default"]

    def run(self):
        """Execute states until a transition yields ``None``.

        Returns:
            The most recent Failure seen during the run (including an Abort
            caused by Ctrl-C or an Except caused by an unexpected exception),
            or the result of the last state when nothing failed.
        """
        s = self.initial_state
        result = Success()
        self.last_fail = None
        try:
            while s is not None:
                self.current_state = s(self)

                result = self.current_state.start()
                if isinstance(result, Failure):
                    self.last_fail = result

                s = self._next_state(s, result)
        except KeyboardInterrupt:
            # Record the abort as the failure to report; otherwise an
            # interrupted run could be reported with the previous state's
            # (possibly successful) result.
            self.result = self.last_fail = self.abort()
        except Exception as e:
            self.result = self.exception(f"{type(e).__name__}({e})")

        if self.last_fail is not None:
            result = self.last_fail

        logging.info(f"Finishing {type(self).__name__} with result {result}")
        return result

    def abort(self):
        """Report a user interruption and return the matching Abort result."""
        log = "User interrupted"
        print(log)
        logging.warning(log)

        return Abort(log)

    def exception(self, e):
        """Report an unexpected error and remember it as the last failure.

        Meant to be called from an ``except`` block, since
        ``logging.exception`` records the exception being handled.
        """
        log = f"Stopping {type(self).__name__}. Reason: {e}"
        print(log)
        self.last_fail = Except(e)
        logging.exception(log)

        return self.last_fail
class UpgradeState:
    """Base class of a single step executed by a state machine.

    Subclasses implement ``do_work`` and return a result object. ``log`` is
    the banner printed while the state runs. ``will_prompt`` tells
    ``exit_state`` to print the banner again before the result mark.
    """

    will_prompt = False
    log = ""

    def __init__(self, sm):
        self.state_machine = sm

    def do_work(self):
        raise NotImplementedError()

    def start(self):
        """Run the state and return its result.

        A KeyboardInterrupt raised by ``do_work`` becomes an Abort result. Any
        other exception is logged, stored as an Except result in
        ``self.result`` and re-raised.
        """
        self.enter_state()
        try:
            self.result = self.do_work()
        except KeyboardInterrupt:
            self.result = Abort("User aborted")
        except Exception as e:
            log = f"State {type(self).__name__} failed unexpectedly. Reason: {e}"
            self.result = Except(log)
            logging.exception(log)
            # Bare raise keeps the original traceback untouched.
            raise

        self.exit_state()

        return self.result

    def enter_state(self):
        logging.debug(f"Entering state {type(self).__name__}")
        print(f"{self.log+'...':60}", end="", flush=True)

    def exit_state(self):
        if isinstance(self.result, Success):
            log = logging.debug
        elif isinstance(self.result, Warn):
            log = logging.warning
        else:
            log = logging.error

        log(f"Exiting state {type(self).__name__} with result '{self.result}'")
        if self.will_prompt:
            # The prompt broke the banner line, so print the banner again.
            print(f"\n{self.log+'...':60}", end="", flush=True)

        print(self.result.result_mark())


def insert_module(name, installed=True, **params):
    """Load a kernel module.

    Args:
        name: module name (for modprobe) or path to a ``.ko`` file (for insmod).
        installed: use ``modprobe --first-time`` when true, ``insmod`` otherwise.
        **params: module parameters, passed as ``key=value`` arguments.

    Raises:
        Exception: with the command's stderr when it exits with non-zero status.
    """
    cmd_params = [f"{param}={val}" for param, val in params.items()]

    # Every argv element has to be a separate list item; a single
    # "modprobe --first-time" string would be looked up as the executable name.
    cmd = ["modprobe", "--first-time"] if installed else ["insmod"]

    p = subprocess.run(
        cmd + [name] + cmd_params, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )

    if p.returncode:
        raise Exception(p.stderr.decode("ascii", errors="replace").rstrip("\n"))


def remove_module(name):
    """Unload a kernel module with ``rmmod``.

    Raises:
        Exception: with the command's stderr when it exits with non-zero status.
    """
    p = subprocess.run(["rmmod", name], stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    if p.returncode:
        raise Exception(p.stderr.decode("ascii", errors="replace").rstrip("\n"))


def get_device_sysfs_path(device):
    """Return the ``/sys/block/<disk>`` path related to ``device``.

    Runs ``find -L /sys/block -maxdepth 2``, keeps the lines ending with the
    basename of ``device`` and cuts each of them down to ``/sys/block/<disk>``.

    NOTE(review): the basename is used as an unescaped grep pattern and
    several lines may match, in which case the result contains newlines.
    An empty string is returned when nothing matches.
    """
    basename = os.path.basename(device)

    p1 = subprocess.Popen(["find", "-L", "/sys/block", "-maxdepth", "2"], stdout=subprocess.PIPE)
    p2 = subprocess.Popen(["grep", f"{basename}$"], stdin=p1.stdout, stdout=subprocess.PIPE)
    p3 = subprocess.Popen(
        ["sed", "-r", r"s/(\/sys\/block\/[^/]+).*/\1/"], stdin=p2.stdout, stdout=subprocess.PIPE
    )
    # Close our copies of the pipe ends so that the children see EOF/SIGPIPE.
    p1.stdout.close()
    p2.stdout.close()

    output = p3.communicate()[0]
    # Reap the upstream processes as well.
    p2.wait()
    p1.wait()

    return output.decode("ascii").rstrip("\n")


def get_device_schedulers(sysfs_path):
    """Read ``<sysfs_path>/queue/scheduler``.

    Returns:
        Tuple ``(current, available)``: the scheduler name enclosed in square
        brackets ("none" when no name is bracketed) and the list of all names
        found in the first line of the file.
    """
    with open(f"{sysfs_path}/queue/scheduler", "r") as f:
        schedulers = f.readline().rstrip("\n")

    # re.match() returns None when nothing is bracketed, so test the match
    # object instead of relying on an exception from subscripting it.
    match = re.match(r".*\[(.*)\].*", schedulers)
    current = match[1] if match else "none"

    available = schedulers.replace("[", "").replace("]", "").split()

    return current, available


def set_device_scheduler(sysfs_path, scheduler):
    """Write ``scheduler`` to ``<sysfs_path>/queue/scheduler``."""
    with open(f"{sysfs_path}/queue/scheduler", "w") as f:
        f.write(f"{scheduler}\n")


def drop_os_caches():
    """Write "3" to ``/proc/sys/vm/drop_caches``."""
    with open("/proc/sys/vm/drop_caches", "w") as f:
        f.write("3")