diff --git a/storage_devices/disk.py b/storage_devices/disk.py index 9def9f6..8e2cbbc 100644 --- a/storage_devices/disk.py +++ b/storage_devices/disk.py @@ -1,9 +1,12 @@ # # Copyright(c) 2019-2022 Intel Corporation +# Copyright(c) 2024 Huawei Technologies Co., Ltd. # SPDX-License-Identifier: BSD-3-Clause # + import json import re + from datetime import timedelta from enum import IntEnum @@ -12,8 +15,8 @@ from storage_devices.device import Device from test_tools import disk_utils, fs_utils, nvme_cli from test_utils import disk_finder from test_utils.os_utils import wait +from test_utils.output import Output from test_utils.size import Unit -from test_tools.disk_utils import get_pci_address class DiskType(IntEnum): @@ -24,6 +27,12 @@ class DiskType(IntEnum): optane = 4 +def static_init(cls): + if getattr(cls, "static_init", None): + cls.static_init() + return cls + + class DiskTypeSetBase: def resolved(self): raise NotImplementedError() @@ -32,10 +41,12 @@ class DiskTypeSetBase: raise NotImplementedError() def json(self): - return json.dumps({ - "type": "set", - "values": [t.name for t in self.types()] - }) + return json.dumps( + { + "type": "set", + "values": [t.name for t in self.types()], + } + ) def __lt__(self, other): return min(self.types()) < min(other.types()) @@ -81,14 +92,18 @@ class DiskTypeLowerThan(DiskTypeSetBase): return set(filter(lambda d: d < disk_type, [*DiskType])) def json(self): - return json.dumps({ - "type": "operator", - "name": "lt", - "args": [self.__disk_name] - }) + return json.dumps( + { + "type": "operator", + "name": "lt", + "args": [self.__disk_name], + } + ) class Disk(Device): + types_registry = [] + def __init__( self, path, @@ -97,16 +112,32 @@ class Disk(Device): block_size, ): Device.__init__(self, path) + self.disk_type = disk_type self.serial_number = serial_number self.block_size = Unit(block_size) - self.disk_type = disk_type + self.device_id = self.get_device_id() self.partitions = [] self.pci_address = None - def create_partitions( - self, - sizes: [], - partition_table_type=disk_utils.PartitionTable.gpt): + @classmethod + def register_type(cls, new_type): + cls.types_registry.append(new_type) + + @classmethod + def resolve_type(cls, disk_path): + recognized_types = [ + disk_type for disk_type in cls.types_registry if disk_type.identify(disk_path) + ] + if len(recognized_types) == 0: + raise TypeError(f"Framework is not able to recognise disk type for disk {disk_path}") + if len(recognized_types) > 1: + raise TypeError( + f"Disk {disk_path} recognized as at least 2 disk types.\n" + f"Recognized disk types are {recognized_types}" + ) + return recognized_types[0] + + def create_partitions(self, sizes: [], partition_table_type=disk_utils.PartitionTable.gpt): disk_utils.create_partitions(self, sizes, partition_table_type) def remove_partition(self, part): @@ -115,9 +146,8 @@ class Disk(Device): self.partitions.remove(part) def umount_all_partitions(self): - TestRun.LOGGER.info( - f"Umounting all partitions from: {self.path}") - cmd = f'umount -l {fs_utils.readlink(self.path)}*?' + TestRun.LOGGER.info(f"Unmounting all partitions from: {self.path}") + cmd = f"umount -l {fs_utils.readlink(self.path)}*?" TestRun.executor.run(cmd) def remove_partitions(self): @@ -137,70 +167,81 @@ class Disk(Device): raise Exception("Couldn't check if device is detected by the system") def wait_for_plug_status(self, should_be_visible): - if not wait(lambda: should_be_visible == self.is_detected(), - timedelta(minutes=1), - timedelta(seconds=1)): - raise Exception(f"Timeout occurred while trying to " - f"{'plug' if should_be_visible else 'unplug'} disk.") + if not wait( + lambda: should_be_visible == self.is_detected(), + timedelta(minutes=1), + timedelta(seconds=1), + ): + raise Exception( + f"Timeout occurred while trying to " + f"{'plug' if should_be_visible else 'unplug'} disk." + ) + + @classmethod + def plug_all(cls): + raise NotImplementedError def plug(self): - if self.is_detected(): - return - TestRun.executor.run_expect_success(self.plug_command) - self.wait_for_plug_status(True) + raise NotImplementedError def unplug(self): if not self.is_detected(): return - TestRun.executor.run_expect_success(self.unplug_command) + self.__unplug() + self.device_id = None self.wait_for_plug_status(False) - @staticmethod - def plug_all_disks(): - TestRun.executor.run_expect_success(NvmeDisk.plug_all_command) - TestRun.executor.run_expect_success(SataDisk.plug_all_command) + def __unplug(self): + raise NotImplementedError def __str__(self): - disk_str = f'system path: {self.path}, type: {self.disk_type.name}, ' \ - f'serial: {self.serial_number}, size: {self.size}, ' \ - f'block size: {self.block_size}, pci address: {self.pci_address}, partitions:\n' + disk_str = ( + f"system path: {self.path}, type: {self.disk_type.name}, " + f"serial: {self.serial_number}, size: {self.size}, " + f"block size: {self.block_size}, pci address: {self.pci_address}, partitions:\n" + ) for part in self.partitions: - disk_str += f'\t{part}' + disk_str += f"\t{part}" return disk_str @staticmethod - def create_disk(path, - disk_type: DiskType, - serial_number, - block_size): + def create_disk(disk_path: str, disk_type: DiskType, serial_number: str, block_size: Unit): + resolved_disk_type = Disk.resolve_type(disk_path=disk_path) + return resolved_disk_type(disk_path, disk_type, serial_number, block_size) - resolved_disk_type = None - for checked_type in [NvmeDisk, SataDisk, VirtioDisk]: - try: - checked_type.get_unplug_path(fs_utils.readlink(path).split('/')[-1]) - resolved_disk_type = checked_type - break - except Exception: - continue - - if resolved_disk_type is None: - raise Exception(f"Unrecognized device type for {path}") - - return resolved_disk_type(path, disk_type, serial_number, block_size) + @classmethod + def plug_all_disks(cls): + for disk_type in cls.types_registry: + disk_type.plug_all() +@static_init class NvmeDisk(Disk): - plug_all_command = "echo 1 > /sys/bus/pci/rescan" - def __init__(self, path, disk_type, serial_number, block_size): - Disk.__init__(self, path, disk_type, serial_number, block_size) - self.plug_command = NvmeDisk.plug_all_command - self.unplug_command = f"echo 1 > /sys/block/{self.get_device_id()}/device/remove || " \ - f"echo 1 > /sys/block/{self.get_device_id()}/device/device/remove" - self.pci_address = NvmeDisk.get_pci_address(self.get_device_id()) + super().__init__(path, disk_type, serial_number, block_size) + self.__pci_address = self.get_pci_address(device_id=self.device_id) - def format_disk(self, metadata_size=None, block_size=None, - force=True, format_params=None, reset=True): + @classmethod + def static_init(cls): + Disk.register_type(new_type=cls) + + @classmethod + def plug_all(cls) -> Output: + command = "echo 1 > /sys/bus/pci/rescan" + output = TestRun.executor.run_expect_success(command) + return output + + def unplug(self) -> Output: + command = ( + f"echo 1 > /sys/block/{self.device_id}/device/remove || echo 1 > /sys/block/" + f"{self.device_id}/device/device/remove" + ) + output = TestRun.executor.run(command) + return output + + def format_disk( + self, metadata_size=None, block_size=None, force=True, format_params=None, reset=True + ): nvme_cli.format_disk(self, metadata_size, block_size, force, format_params, reset) def get_lba_formats(self): @@ -209,59 +250,76 @@ class NvmeDisk(Disk): def get_lba_format_in_use(self): return nvme_cli.get_lba_format_in_use(self) - @classmethod - def get_unplug_path(cls, device_id): + @staticmethod + def get_unplug_path(device_id) -> str: base = f"/sys/block/{device_id}/device" for suffix in ["/remove", "/device/remove"]: try: output = fs_utils.ls_item(base + suffix) fs_utils.parse_ls_output(output)[0] - except: + except TypeError: continue - return base + suffix - raise Exception(f"Couldn't create unplug path for {device_id}") - @classmethod - def get_pci_address(cls, device_id): + @staticmethod + def get_pci_address(device_id) -> str: return TestRun.executor.run(f"cat /sys/block/{device_id}/device/address").stdout + @staticmethod + def identify(device_path: str) -> bool: + device_name = TestRun.executor.run(f"realpath {device_path}").stdout.split("/")[2] + output = TestRun.executor.run( + f"realpath /sys/block/{device_name}/device/driver | grep nvme" + ) + return output.exit_code == 0 + +@static_init class SataDisk(Disk): - plug_all_command = "for i in $(find -H /sys/devices/ -path '*/scsi_host/*/scan' -type f); " \ - "do echo '- - -' > $i; done;" - def __init__(self, path, disk_type, serial_number, block_size): - Disk.__init__(self, path, disk_type, serial_number, block_size) - self.plug_command = SataDisk.plug_all_command - self.unplug_command = \ - f"echo 1 > {self.get_unplug_path(self.get_device_id())}" - self.pci_address = SataDisk.get_pci_address(self.get_device_id()) + super().__init__(path, disk_type, serial_number, block_size) + self.__pci_address = self.get_pci_address(device_id=self.device_id) @classmethod - def get_unplug_path(cls, device_id): - sysfs_addr = cls.get_sysfs_addr(device_id) + def static_init(cls): + Disk.register_type(new_type=cls) + + @classmethod + def plug_all(cls) -> Output: + cmd = ( + f"for i in $(find -H /sys/devices/ -path '*/scsi_host/*/scan' -type f); do echo " + f"'- - -' > $i; done;" + ) + output = TestRun.executor.run_expect_success(cmd) + return output + + def unplug(self) -> Output: + cmd = f"echo 1 > {self.get_unplug_path(device_id=self.device_id)}" + output = TestRun.executor.run(cmd) + return output + + def get_unplug_path(self, device_id) -> str: + sysfs_addr = self.get_sysfs_addr(device_id) try: - cls.get_pci_address(device_id) + self.get_pci_address(device_id) except Exception as e: raise Exception(f"Failed to find controller for {device_id}.\n{e}") return sysfs_addr + "/device/delete" - @classmethod - def get_sysfs_addr(cls, device_id): + @staticmethod + def get_sysfs_addr(device_id): ls_command = f"$(find -H /sys/devices/ -name {device_id} -type d)" output = fs_utils.ls_item(f"{ls_command}") sysfs_addr = fs_utils.parse_ls_output(output)[0] if not sysfs_addr: raise Exception(f"Failed to find sysfs address: ls -l {ls_command}") - return sysfs_addr.full_path - @classmethod - def get_pci_address(cls, device_id): - sysfs_addr = cls.get_sysfs_addr(device_id) + @staticmethod + def get_pci_address(device_id): + sysfs_addr = SataDisk.get_sysfs_addr(device_id) pci_address = re.findall(r"\d+:\d+:\d+.\d+", sysfs_addr) if not pci_address: @@ -269,40 +327,56 @@ class SataDisk(Disk): return pci_address[-1] + @staticmethod + def identify(device_path: str) -> bool: + device_name = TestRun.executor.run(f"realpath {device_path}").stdout.split("/")[2] + output = TestRun.executor.run( + f"realpath /sys/block/{device_name}/device/driver | grep scsi" + ) + return output.exit_code == 0 + +@static_init class VirtioDisk(Disk): - plug_all_command = "echo 1 > /sys/bus/pci/rescan" - def __init__(self, path, disk_type, serial_number, block_size): - Disk.__init__(self, path, disk_type, serial_number, block_size) - self.plug_command = VirtioDisk.plug_all_command - self.unplug_command = \ - f"echo 1 > {self.get_unplug_path(self.get_device_id())}" - self.pci_address = VirtioDisk.get_pci_address(self.get_device_id()) + super().__init__(path, disk_type, serial_number, block_size) + self.__pci_address = self.get_pci_address(device_id=self.device_id) @classmethod - def get_unplug_path(cls, device_id): + def static_init(cls) -> None: + Disk.register_type(new_type=cls) + + @classmethod + def plug_all(cls) -> Output: + cmd = "echo 1 > /sys/bus/pci/rescan" + output = TestRun.executor.run_expect_success(cmd) + return output + + def unplug(self) -> Output: + cmd = f"echo 1 > {self.get_unplug_path(device_id=self.device_id)}" + output = TestRun.executor.run(cmd) + return output + + @staticmethod + def get_unplug_path(device_id) -> str: sysfs_path = VirtioDisk.get_sysfs_addr(device_id) pci_addr = VirtioDisk.get_pci_address(device_id) - unplug_path = re.search(f".*{pci_addr}", sysfs_path) if not unplug_path: raise Exception(f"Failed to find controller for {device_id}") - - return unplug_path.group(0) + return unplug_path.group(0) + "/remove" @classmethod - def get_pci_address(cls, device_id): - sysfs_addr = VirtioDisk.get_sysfs_addr(device_id) + def get_pci_address(cls, device_id) -> str: + sysfs_addr = cls.get_sysfs_addr(device_id) pci_address = re.findall(r"\d+:[\da-f]+:[\da-f]+.\d+", sysfs_addr) - if not pci_address: raise Exception(f"Failed to get the pci address of {device_id} device.") return pci_address[-1] - @classmethod - def get_sysfs_addr(cls, device_id): + @staticmethod + def get_sysfs_addr(device_id: str) -> str: ls_command = f"$(find -H /sys/devices/ -name {device_id} -type d)" output = fs_utils.ls_item(f"{ls_command}") sysfs_addr = fs_utils.parse_ls_output(output)[0] @@ -311,4 +385,10 @@ class VirtioDisk(Disk): return sysfs_addr.full_path - + @staticmethod + def identify(device_path: str) -> bool: + device_name = TestRun.executor.run(f"realpath {device_path}").stdout.split("/")[2] + output = TestRun.executor.run( + f"realpath /sys/block/{device_name}/device/driver | grep virtio" + ) + return output.exit_code == 0