Merge pull request #4 from katlapinka/kasiat/pci-address

Add methods getting pci disks address
This commit is contained in:
Daniel Madej 2024-07-24 13:43:25 +02:00 committed by GitHub
commit 521d1dd4c9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -2,7 +2,6 @@
# Copyright(c) 2019-2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
import itertools
import json
import re
from datetime import timedelta
@ -102,6 +101,7 @@ class Disk(Device):
self.block_size = Unit(block_size)
self.disk_type = disk_type
self.partitions = []
self.pci_address = None
def create_partitions(
self,
@ -163,7 +163,7 @@ class Disk(Device):
def __str__(self):
disk_str = f'system path: {self.path}, type: {self.disk_type.name}, ' \
f'serial: {self.serial_number}, size: {self.size}, ' \
f'block size: {self.block_size}, partitions:\n'
f'block size: {self.block_size}, pci address: {self.pci_address}, partitions:\n'
for part in self.partitions:
disk_str += f'\t{part}'
return disk_str
@ -175,10 +175,12 @@ class Disk(Device):
block_size):
resolved_disk_type = None
for resolved_disk_type in [NvmeDisk, SataDisk, VirtioDisk]:
for checked_type in [NvmeDisk, SataDisk, VirtioDisk]:
try:
resolved_disk_type.get_unplug_path()
except:
checked_type.get_unplug_path(fs_utils.readlink(path).split('/')[-1])
resolved_disk_type = checked_type
break
except Exception:
continue
if resolved_disk_type is None:
@ -195,12 +197,7 @@ class NvmeDisk(Disk):
self.plug_command = NvmeDisk.plug_all_command
self.unplug_command = f"echo 1 > /sys/block/{self.get_device_id()}/device/remove || " \
f"echo 1 > /sys/block/{self.get_device_id()}/device/device/remove"
self.pci_address = get_pci_address(self.get_device_id())
def __str__(self):
disk_str = super().__str__()
disk_str = f"pci address: {self.pci_address}, " + disk_str
return disk_str
self.pci_address = NvmeDisk.get_pci_address(self.get_device_id())
def format_disk(self, metadata_size=None, block_size=None,
force=True, format_params=None, reset=True):
@ -226,6 +223,10 @@ class NvmeDisk(Disk):
raise Exception(f"Couldn't create unplug path for {device_id}")
@classmethod
def get_pci_address(cls, device_id):
return TestRun.executor.run(f"cat /sys/block/{device_id}/device/address").stdout
class SataDisk(Disk):
plug_all_command = "for i in $(find -H /sys/devices/ -path '*/scsi_host/*/scan' -type f); " \
@ -236,24 +237,37 @@ class SataDisk(Disk):
self.plug_command = SataDisk.plug_all_command
self.unplug_command = \
f"echo 1 > {self.get_unplug_path(self.get_device_id())}"
self.pci_address = SataDisk.get_pci_address(self.get_device_id())
@classmethod
def get_unplug_path(cls, device_id):
sysfs_addr = cls.get_sysfs_addr(device_id)
try:
cls.get_pci_address(device_id)
except Exception as e:
raise Exception(f"Failed to find controller for {device_id}.\n{e}")
return sysfs_addr + "/device/delete"
@classmethod
def get_sysfs_addr(cls, device_id):
ls_command = f"$(find -H /sys/devices/ -name {device_id} -type d)"
output = fs_utils.ls_item(f"{ls_command}")
sysfs_addr = fs_utils.parse_ls_output(output)[0]
if not sysfs_addr:
raise Exception(f"Failed to find sysfs address: ls -l {ls_command}")
dirs = sysfs_addr.full_path.split('/')
scsi_address = dirs[-3]
try:
re.search(
r"^\d+[-:]\d+[-:]\d+[-:]\d+$",
scsi_address)
except:
raise Exception(f"Failed to find controller for {device_id}")
return sysfs_addr.full_path + "/device/delete"
return sysfs_addr.full_path
@classmethod
def get_pci_address(cls, device_id):
sysfs_addr = cls.get_sysfs_addr(device_id)
pci_address = re.findall(r"\d+:\d+:\d+.\d+", sysfs_addr)
if not pci_address:
raise Exception(f"Failed to get the pci address of {device_id} device.")
return pci_address[-1]
class VirtioDisk(Disk):
@ -264,26 +278,37 @@ class VirtioDisk(Disk):
self.plug_command = VirtioDisk.plug_all_command
self.unplug_command = \
f"echo 1 > {self.get_unplug_path(self.get_device_id())}"
self.pci_address = VirtioDisk.get_pci_address(self.get_device_id())
@classmethod
def get_unplug_path(cls, device_id):
sysfs_path = VirtioDisk.get_sysfs_addr(device_id)
pci_addr = VirtioDisk.get_pci_address(device_id)
unplug_path = re.search(f".*{pci_addr}", sysfs_path)
if not unplug_path:
raise Exception(f"Failed to find controller for {device_id}")
return unplug_path.group(0)
@classmethod
def get_pci_address(cls, device_id):
sysfs_addr = VirtioDisk.get_sysfs_addr(device_id)
pci_address = re.findall(r"\d+:[\da-f]+:[\da-f]+.\d+", sysfs_addr)
if not pci_address:
raise Exception(f"Failed to get the pci address of {device_id} device.")
return pci_address[-1]
@classmethod
def get_sysfs_addr(cls, device_id):
ls_command = f"$(find -H /sys/devices/ -name {device_id} -type d)"
output = fs_utils.ls_item(f"{ls_command}")
sysfs_addr = fs_utils.parse_ls_output(output)[0]
if not sysfs_addr:
raise Exception(f"Failed to find sysfs address: ls -l {ls_command}")
dirs = sysfs_addr.full_path.split("/")
return sysfs_addr.full_path
for i, path_component in enumerate(dirs[::-1]):
# Search for scsi address in sysfs path
matches = re.search(
r"^\d+:\d+:\d+.\d+$",
path_component)
if matches:
break
else:
raise Exception(f"Failed to find controller for {device_id}")
return "/".join(dirs[:-i]) + "/remove"