test-framework: reformat disk.py code

Signed-off-by: Kamil Gierszewski <kamil.gierszewski@huawei.com>
This commit is contained in:
Kamil Gierszewski 2024-09-06 04:27:08 +02:00 committed by Katarzyna Treder
parent a7c7cd3d84
commit 42ebe34da3

View File

@ -1,9 +1,12 @@
# #
# Copyright(c) 2019-2022 Intel Corporation # Copyright(c) 2019-2022 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #
import json import json
import re import re
from datetime import timedelta from datetime import timedelta
from enum import IntEnum from enum import IntEnum
@ -12,8 +15,8 @@ from storage_devices.device import Device
from test_tools import disk_utils, fs_utils, nvme_cli from test_tools import disk_utils, fs_utils, nvme_cli
from test_utils import disk_finder from test_utils import disk_finder
from test_utils.os_utils import wait from test_utils.os_utils import wait
from test_utils.output import Output
from test_utils.size import Unit from test_utils.size import Unit
from test_tools.disk_utils import get_pci_address
class DiskType(IntEnum): class DiskType(IntEnum):
@ -24,6 +27,12 @@ class DiskType(IntEnum):
optane = 4 optane = 4
def static_init(cls):
if getattr(cls, "static_init", None):
cls.static_init()
return cls
class DiskTypeSetBase: class DiskTypeSetBase:
def resolved(self): def resolved(self):
raise NotImplementedError() raise NotImplementedError()
@ -32,10 +41,12 @@ class DiskTypeSetBase:
raise NotImplementedError() raise NotImplementedError()
def json(self): def json(self):
return json.dumps({ return json.dumps(
"type": "set", {
"values": [t.name for t in self.types()] "type": "set",
}) "values": [t.name for t in self.types()],
}
)
def __lt__(self, other): def __lt__(self, other):
return min(self.types()) < min(other.types()) return min(self.types()) < min(other.types())
@ -81,14 +92,18 @@ class DiskTypeLowerThan(DiskTypeSetBase):
return set(filter(lambda d: d < disk_type, [*DiskType])) return set(filter(lambda d: d < disk_type, [*DiskType]))
def json(self): def json(self):
return json.dumps({ return json.dumps(
"type": "operator", {
"name": "lt", "type": "operator",
"args": [self.__disk_name] "name": "lt",
}) "args": [self.__disk_name],
}
)
class Disk(Device): class Disk(Device):
types_registry = []
def __init__( def __init__(
self, self,
path, path,
@ -97,16 +112,32 @@ class Disk(Device):
block_size, block_size,
): ):
Device.__init__(self, path) Device.__init__(self, path)
self.disk_type = disk_type
self.serial_number = serial_number self.serial_number = serial_number
self.block_size = Unit(block_size) self.block_size = Unit(block_size)
self.disk_type = disk_type self.device_id = self.get_device_id()
self.partitions = [] self.partitions = []
self.pci_address = None self.pci_address = None
def create_partitions( @classmethod
self, def register_type(cls, new_type):
sizes: [], cls.types_registry.append(new_type)
partition_table_type=disk_utils.PartitionTable.gpt):
@classmethod
def resolve_type(cls, disk_path):
recognized_types = [
disk_type for disk_type in cls.types_registry if disk_type.identify(disk_path)
]
if len(recognized_types) == 0:
raise TypeError(f"Framework is not able to recognise disk type for disk {disk_path}")
if len(recognized_types) > 1:
raise TypeError(
f"Disk {disk_path} recognized as at least 2 disk types.\n"
f"Recognized disk types are {recognized_types}"
)
return recognized_types[0]
def create_partitions(self, sizes: [], partition_table_type=disk_utils.PartitionTable.gpt):
disk_utils.create_partitions(self, sizes, partition_table_type) disk_utils.create_partitions(self, sizes, partition_table_type)
def remove_partition(self, part): def remove_partition(self, part):
@ -115,9 +146,8 @@ class Disk(Device):
self.partitions.remove(part) self.partitions.remove(part)
def umount_all_partitions(self): def umount_all_partitions(self):
TestRun.LOGGER.info( TestRun.LOGGER.info(f"Unmounting all partitions from: {self.path}")
f"Umounting all partitions from: {self.path}") cmd = f"umount -l {fs_utils.readlink(self.path)}*?"
cmd = f'umount -l {fs_utils.readlink(self.path)}*?'
TestRun.executor.run(cmd) TestRun.executor.run(cmd)
def remove_partitions(self): def remove_partitions(self):
@ -137,70 +167,81 @@ class Disk(Device):
raise Exception("Couldn't check if device is detected by the system") raise Exception("Couldn't check if device is detected by the system")
def wait_for_plug_status(self, should_be_visible): def wait_for_plug_status(self, should_be_visible):
if not wait(lambda: should_be_visible == self.is_detected(), if not wait(
timedelta(minutes=1), lambda: should_be_visible == self.is_detected(),
timedelta(seconds=1)): timedelta(minutes=1),
raise Exception(f"Timeout occurred while trying to " timedelta(seconds=1),
f"{'plug' if should_be_visible else 'unplug'} disk.") ):
raise Exception(
f"Timeout occurred while trying to "
f"{'plug' if should_be_visible else 'unplug'} disk."
)
@classmethod
def plug_all(cls):
raise NotImplementedError
def plug(self): def plug(self):
if self.is_detected(): raise NotImplementedError
return
TestRun.executor.run_expect_success(self.plug_command)
self.wait_for_plug_status(True)
def unplug(self): def unplug(self):
if not self.is_detected(): if not self.is_detected():
return return
TestRun.executor.run_expect_success(self.unplug_command) self.__unplug()
self.device_id = None
self.wait_for_plug_status(False) self.wait_for_plug_status(False)
@staticmethod def __unplug(self):
def plug_all_disks(): raise NotImplementedError
TestRun.executor.run_expect_success(NvmeDisk.plug_all_command)
TestRun.executor.run_expect_success(SataDisk.plug_all_command)
def __str__(self): def __str__(self):
disk_str = f'system path: {self.path}, type: {self.disk_type.name}, ' \ disk_str = (
f'serial: {self.serial_number}, size: {self.size}, ' \ f"system path: {self.path}, type: {self.disk_type.name}, "
f'block size: {self.block_size}, pci address: {self.pci_address}, partitions:\n' f"serial: {self.serial_number}, size: {self.size}, "
f"block size: {self.block_size}, pci address: {self.pci_address}, partitions:\n"
)
for part in self.partitions: for part in self.partitions:
disk_str += f'\t{part}' disk_str += f"\t{part}"
return disk_str return disk_str
@staticmethod @staticmethod
def create_disk(path, def create_disk(disk_path: str, disk_type: DiskType, serial_number: str, block_size: Unit):
disk_type: DiskType, resolved_disk_type = Disk.resolve_type(disk_path=disk_path)
serial_number, return resolved_disk_type(disk_path, disk_type, serial_number, block_size)
block_size):
resolved_disk_type = None @classmethod
for checked_type in [NvmeDisk, SataDisk, VirtioDisk]: def plug_all_disks(cls):
try: for disk_type in cls.types_registry:
checked_type.get_unplug_path(fs_utils.readlink(path).split('/')[-1]) disk_type.plug_all()
resolved_disk_type = checked_type
break
except Exception:
continue
if resolved_disk_type is None:
raise Exception(f"Unrecognized device type for {path}")
return resolved_disk_type(path, disk_type, serial_number, block_size)
@static_init
class NvmeDisk(Disk): class NvmeDisk(Disk):
plug_all_command = "echo 1 > /sys/bus/pci/rescan"
def __init__(self, path, disk_type, serial_number, block_size): def __init__(self, path, disk_type, serial_number, block_size):
Disk.__init__(self, path, disk_type, serial_number, block_size) super().__init__(path, disk_type, serial_number, block_size)
self.plug_command = NvmeDisk.plug_all_command self.__pci_address = self.get_pci_address(device_id=self.device_id)
self.unplug_command = f"echo 1 > /sys/block/{self.get_device_id()}/device/remove || " \
f"echo 1 > /sys/block/{self.get_device_id()}/device/device/remove"
self.pci_address = NvmeDisk.get_pci_address(self.get_device_id())
def format_disk(self, metadata_size=None, block_size=None, @classmethod
force=True, format_params=None, reset=True): def static_init(cls):
Disk.register_type(new_type=cls)
@classmethod
def plug_all(cls) -> Output:
command = "echo 1 > /sys/bus/pci/rescan"
output = TestRun.executor.run_expect_success(command)
return output
def unplug(self) -> Output:
command = (
f"echo 1 > /sys/block/{self.device_id}/device/remove || echo 1 > /sys/block/"
f"{self.device_id}/device/device/remove"
)
output = TestRun.executor.run(command)
return output
def format_disk(
self, metadata_size=None, block_size=None, force=True, format_params=None, reset=True
):
nvme_cli.format_disk(self, metadata_size, block_size, force, format_params, reset) nvme_cli.format_disk(self, metadata_size, block_size, force, format_params, reset)
def get_lba_formats(self): def get_lba_formats(self):
@ -209,59 +250,76 @@ class NvmeDisk(Disk):
def get_lba_format_in_use(self): def get_lba_format_in_use(self):
return nvme_cli.get_lba_format_in_use(self) return nvme_cli.get_lba_format_in_use(self)
@classmethod @staticmethod
def get_unplug_path(cls, device_id): def get_unplug_path(device_id) -> str:
base = f"/sys/block/{device_id}/device" base = f"/sys/block/{device_id}/device"
for suffix in ["/remove", "/device/remove"]: for suffix in ["/remove", "/device/remove"]:
try: try:
output = fs_utils.ls_item(base + suffix) output = fs_utils.ls_item(base + suffix)
fs_utils.parse_ls_output(output)[0] fs_utils.parse_ls_output(output)[0]
except: except TypeError:
continue continue
return base + suffix return base + suffix
raise Exception(f"Couldn't create unplug path for {device_id}") raise Exception(f"Couldn't create unplug path for {device_id}")
@classmethod @staticmethod
def get_pci_address(cls, device_id): def get_pci_address(device_id) -> str:
return TestRun.executor.run(f"cat /sys/block/{device_id}/device/address").stdout return TestRun.executor.run(f"cat /sys/block/{device_id}/device/address").stdout
@staticmethod
def identify(device_path: str) -> bool:
device_name = TestRun.executor.run(f"realpath {device_path}").stdout.split("/")[2]
output = TestRun.executor.run(
f"realpath /sys/block/{device_name}/device/driver | grep nvme"
)
return output.exit_code == 0
@static_init
class SataDisk(Disk): class SataDisk(Disk):
plug_all_command = "for i in $(find -H /sys/devices/ -path '*/scsi_host/*/scan' -type f); " \
"do echo '- - -' > $i; done;"
def __init__(self, path, disk_type, serial_number, block_size): def __init__(self, path, disk_type, serial_number, block_size):
Disk.__init__(self, path, disk_type, serial_number, block_size) super().__init__(path, disk_type, serial_number, block_size)
self.plug_command = SataDisk.plug_all_command self.__pci_address = self.get_pci_address(device_id=self.device_id)
self.unplug_command = \
f"echo 1 > {self.get_unplug_path(self.get_device_id())}"
self.pci_address = SataDisk.get_pci_address(self.get_device_id())
@classmethod @classmethod
def get_unplug_path(cls, device_id): def static_init(cls):
sysfs_addr = cls.get_sysfs_addr(device_id) Disk.register_type(new_type=cls)
@classmethod
def plug_all(cls) -> Output:
cmd = (
f"for i in $(find -H /sys/devices/ -path '*/scsi_host/*/scan' -type f); do echo "
f"'- - -' > $i; done;"
)
output = TestRun.executor.run_expect_success(cmd)
return output
def unplug(self) -> Output:
cmd = f"echo 1 > {self.get_unplug_path(device_id=self.device_id)}"
output = TestRun.executor.run(cmd)
return output
def get_unplug_path(self, device_id) -> str:
sysfs_addr = self.get_sysfs_addr(device_id)
try: try:
cls.get_pci_address(device_id) self.get_pci_address(device_id)
except Exception as e: except Exception as e:
raise Exception(f"Failed to find controller for {device_id}.\n{e}") raise Exception(f"Failed to find controller for {device_id}.\n{e}")
return sysfs_addr + "/device/delete" return sysfs_addr + "/device/delete"
@classmethod @staticmethod
def get_sysfs_addr(cls, device_id): def get_sysfs_addr(device_id):
ls_command = f"$(find -H /sys/devices/ -name {device_id} -type d)" ls_command = f"$(find -H /sys/devices/ -name {device_id} -type d)"
output = fs_utils.ls_item(f"{ls_command}") output = fs_utils.ls_item(f"{ls_command}")
sysfs_addr = fs_utils.parse_ls_output(output)[0] sysfs_addr = fs_utils.parse_ls_output(output)[0]
if not sysfs_addr: if not sysfs_addr:
raise Exception(f"Failed to find sysfs address: ls -l {ls_command}") raise Exception(f"Failed to find sysfs address: ls -l {ls_command}")
return sysfs_addr.full_path return sysfs_addr.full_path
@classmethod @staticmethod
def get_pci_address(cls, device_id): def get_pci_address(device_id):
sysfs_addr = cls.get_sysfs_addr(device_id) sysfs_addr = SataDisk.get_sysfs_addr(device_id)
pci_address = re.findall(r"\d+:\d+:\d+.\d+", sysfs_addr) pci_address = re.findall(r"\d+:\d+:\d+.\d+", sysfs_addr)
if not pci_address: if not pci_address:
@ -269,40 +327,56 @@ class SataDisk(Disk):
return pci_address[-1] return pci_address[-1]
@staticmethod
def identify(device_path: str) -> bool:
device_name = TestRun.executor.run(f"realpath {device_path}").stdout.split("/")[2]
output = TestRun.executor.run(
f"realpath /sys/block/{device_name}/device/driver | grep scsi"
)
return output.exit_code == 0
@static_init
class VirtioDisk(Disk): class VirtioDisk(Disk):
plug_all_command = "echo 1 > /sys/bus/pci/rescan"
def __init__(self, path, disk_type, serial_number, block_size): def __init__(self, path, disk_type, serial_number, block_size):
Disk.__init__(self, path, disk_type, serial_number, block_size) super().__init__(path, disk_type, serial_number, block_size)
self.plug_command = VirtioDisk.plug_all_command self.__pci_address = self.get_pci_address(device_id=self.device_id)
self.unplug_command = \
f"echo 1 > {self.get_unplug_path(self.get_device_id())}"
self.pci_address = VirtioDisk.get_pci_address(self.get_device_id())
@classmethod @classmethod
def get_unplug_path(cls, device_id): def static_init(cls) -> None:
Disk.register_type(new_type=cls)
@classmethod
def plug_all(cls) -> Output:
cmd = "echo 1 > /sys/bus/pci/rescan"
output = TestRun.executor.run_expect_success(cmd)
return output
def unplug(self) -> Output:
cmd = f"echo 1 > {self.get_unplug_path(device_id=self.device_id)}"
output = TestRun.executor.run(cmd)
return output
@staticmethod
def get_unplug_path(device_id) -> str:
sysfs_path = VirtioDisk.get_sysfs_addr(device_id) sysfs_path = VirtioDisk.get_sysfs_addr(device_id)
pci_addr = VirtioDisk.get_pci_address(device_id) pci_addr = VirtioDisk.get_pci_address(device_id)
unplug_path = re.search(f".*{pci_addr}", sysfs_path) unplug_path = re.search(f".*{pci_addr}", sysfs_path)
if not unplug_path: if not unplug_path:
raise Exception(f"Failed to find controller for {device_id}") raise Exception(f"Failed to find controller for {device_id}")
return unplug_path.group(0) + "/remove"
return unplug_path.group(0)
@classmethod @classmethod
def get_pci_address(cls, device_id): def get_pci_address(cls, device_id) -> str:
sysfs_addr = VirtioDisk.get_sysfs_addr(device_id) sysfs_addr = cls.get_sysfs_addr(device_id)
pci_address = re.findall(r"\d+:[\da-f]+:[\da-f]+.\d+", sysfs_addr) pci_address = re.findall(r"\d+:[\da-f]+:[\da-f]+.\d+", sysfs_addr)
if not pci_address: if not pci_address:
raise Exception(f"Failed to get the pci address of {device_id} device.") raise Exception(f"Failed to get the pci address of {device_id} device.")
return pci_address[-1] return pci_address[-1]
@classmethod @staticmethod
def get_sysfs_addr(cls, device_id): def get_sysfs_addr(device_id: str) -> str:
ls_command = f"$(find -H /sys/devices/ -name {device_id} -type d)" ls_command = f"$(find -H /sys/devices/ -name {device_id} -type d)"
output = fs_utils.ls_item(f"{ls_command}") output = fs_utils.ls_item(f"{ls_command}")
sysfs_addr = fs_utils.parse_ls_output(output)[0] sysfs_addr = fs_utils.parse_ls_output(output)[0]
@ -311,4 +385,10 @@ class VirtioDisk(Disk):
return sysfs_addr.full_path return sysfs_addr.full_path
@staticmethod
def identify(device_path: str) -> bool:
device_name = TestRun.executor.run(f"realpath {device_path}").stdout.split("/")[2]
output = TestRun.executor.run(
f"realpath /sys/block/{device_name}/device/driver | grep virtio"
)
return output.exit_code == 0