opencas-test-framework/test_utils/disk_finder.py
Katarzyna Treder 161cc7957a Move output to connection utils
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-12-10 12:29:44 +01:00

191 lines
6.9 KiB
Python

#
# Copyright(c) 2019-2021 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
import posixpath
from core.test_run import TestRun
from test_tools import disk_utils
from test_tools.fs_utils import check_if_file_exists, readlink
from test_utils import os_utils
from connection.utils.output import CmdException
def find_disks():
devices_result = []
TestRun.LOGGER.info("Finding platform's disks.")
# TODO: intelmas should be implemented as a separate tool in the future.
# There will be intelmas installer in case, when it is not installed
output = TestRun.executor.run('intelmas')
if output.exit_code != 0:
raise Exception(f"Error while executing command: 'intelmas'.\n"
f"stdout: {output.stdout}\nstderr: {output.stderr}")
block_devices = get_block_devices_list()
try:
discover_ssd_devices(block_devices, devices_result)
discover_hdd_devices(block_devices, devices_result)
except Exception as e:
raise Exception(f"Exception occurred while looking for disks: {str(e)}")
return devices_result
def get_block_devices_list():
devices = TestRun.executor.run_expect_success("ls /sys/block -1").stdout.splitlines()
os_disks = get_system_disks()
block_devices = []
for dev in devices:
if any([prefix in dev for prefix in ["sd", "nvme", "vd"]]) and dev not in os_disks:
block_devices.append(dev)
return block_devices
def discover_hdd_devices(block_devices, devices_res):
for dev in block_devices:
if TestRun.executor.run_expect_success(f"cat /sys/block/{dev}/removable").stdout == "1":
continue # skip removable drives
block_size = disk_utils.get_block_size(dev)
if int(block_size) == 4096:
disk_type = 'hdd4k'
else:
disk_type = 'hdd'
devices_res.append({
"type": disk_type,
"path": f"{resolve_to_by_id_link(dev)}",
"serial": TestRun.executor.run_expect_success(
f"sg_inq /dev/{dev} | grep -i 'serial number'"
).stdout.split(': ')[1].strip(),
"blocksize": block_size,
"size": disk_utils.get_size(dev)})
block_devices.clear()
# This method discovers only Intel SSD devices
def discover_ssd_devices(block_devices, devices_res):
ssd_count = int(TestRun.executor.run_expect_success(
'intelmas show -intelssd | grep DevicePath | wc -l').stdout)
for i in range(0, ssd_count):
# Workaround for intelmas bug that lists all of the devices (non intel included)
# with -intelssd flag
if TestRun.executor.run(
f"intelmas show -display index -intelssd {i} | grep -w Intel").exit_code == 0:
device_path = TestRun.executor.run_expect_success(
f"intelmas show -intelssd {i} | grep DevicePath").stdout.split()[2]
dev = device_path.replace("/dev/", "")
if "sg" in dev:
sata_dev = TestRun.executor.run_expect_success(
f"sg_map | grep {dev}").stdout.split()[1]
dev = sata_dev.replace("/dev/", "")
if dev not in block_devices:
continue
serial_number = TestRun.executor.run_expect_success(
f"intelmas show -intelssd {i} | grep SerialNumber").stdout.split()[2].strip()
if 'nvme' not in device_path:
disk_type = 'sata'
device_path = dev
elif TestRun.executor.run(
f"intelmas show -intelssd {i} | grep Optane").exit_code == 0:
disk_type = 'optane'
else:
disk_type = 'nand'
devices_res.append({
"type": disk_type,
"path": resolve_to_by_id_link(device_path),
"serial": serial_number,
"blocksize": disk_utils.get_block_size(dev),
"size": disk_utils.get_size(dev)})
block_devices.remove(dev)
def get_disk_serial_number(dev_path):
commands = [
f"(udevadm info --query=all --name={dev_path} | grep 'SCSI.*_SERIAL' || "
f"udevadm info --query=all --name={dev_path} | grep 'ID_SERIAL_SHORT') | "
"awk -F '=' '{print $NF}'",
f"sg_inq {dev_path} 2> /dev/null | grep '[Ss]erial number:' | "
"awk '{print $NF}'",
f"udevadm info --query=all --name={dev_path} | grep 'ID_SERIAL' | "
"awk -F '=' '{print $NF}'"
]
for command in commands:
serial = TestRun.executor.run(command).stdout
if serial:
return serial.split('\n')[0]
return None
def get_all_serial_numbers():
serial_numbers = {}
block_devices = get_block_devices_list()
for dev in block_devices:
serial = get_disk_serial_number(dev)
try:
path = resolve_to_by_id_link(dev)
except Exception:
continue
if serial:
serial_numbers[serial] = path
else:
TestRun.LOGGER.warning(f"Device {path} ({dev}) does not have a serial number.")
serial_numbers[path] = path
return serial_numbers
def get_system_disks():
system_device = TestRun.executor.run_expect_success('mount | grep " / "').stdout.split()[0]
readlink_output = readlink(system_device)
device_name = readlink_output.split('/')[-1]
sys_block_path = os_utils.get_sys_block_path()
used_device_names = __get_slaves(device_name)
if not used_device_names:
used_device_names = [device_name]
disk_names = []
for device_name in used_device_names:
if check_if_file_exists(f'{sys_block_path}/{device_name}/partition'):
parent_device = readlink(f'{sys_block_path}/{device_name}/..').split('/')[-1]
disk_names.append(parent_device)
else:
disk_names.append(device_name)
return disk_names
def __get_slaves(device_name: str):
try:
device_names = TestRun.executor.run_expect_success(
f'ls {os_utils.get_sys_block_path()}/{device_name}/slaves').stdout.splitlines()
except CmdException as e:
if "No such file or directory" not in e.output.stderr:
raise
return None
device_list = []
for device_name in device_names:
slaves = __get_slaves(device_name)
if slaves:
for slave in slaves:
device_list.append(slave)
else:
device_list.append(device_name)
return device_list
def resolve_to_by_id_link(path):
by_id_paths = TestRun.executor.run_expect_success("ls /dev/disk/by-id -1").stdout.splitlines()
dev_full_paths = [posixpath.join("/dev/disk/by-id", by_id_path) for by_id_path in by_id_paths]
for full_path in dev_full_paths:
# handle exception for broken links
try:
if readlink(full_path) == readlink(posixpath.join("/dev", path)):
return full_path
except CmdException:
continue
raise ValueError(f'By-id device link not found for device {path}')