Merge pull request #27 from katlapinka/kasiat/lvm-resolve-os-disk

Prevent removing OS partition logical volume
This commit is contained in:
Katarzyna Treder 2024-11-26 12:40:51 +01:00 committed by GitHub
commit 6dc07dad3e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 33 additions and 19 deletions

View File

@ -165,7 +165,7 @@ class LvmConfiguration:
TestRun.executor.run(cmd) TestRun.executor.run(cmd)
@staticmethod @staticmethod
def set_use_devices_file(use_devices_file=False): def set_use_devices_file(use_devices_file=True):
cmd = (fr"sed -i 's/^\s*#*\s*\(use_devicesfile\).*/\t\1 = " cmd = (fr"sed -i 's/^\s*#*\s*\(use_devicesfile\).*/\t\1 = "
fr"{1 if use_devices_file else 0}/' {lvm_config_path}") fr"{1 if use_devices_file else 0}/' {lvm_config_path}")
TestRun.executor.run(cmd) TestRun.executor.run(cmd)
@ -460,13 +460,8 @@ class Lvm(Disk):
return cls.discover_logical_volumes() return cls.discover_logical_volumes()
@staticmethod @staticmethod
def remove(lv_name: str, vg_name: str): def remove(lv_path: str):
if not lv_name: cmd = f"lvremove -f {lv_path}"
raise ValueError("LV name needed for LV remove operation.")
if not vg_name:
raise ValueError("VG name needed for LV remove operation.")
cmd = f"lvremove -f {vg_name}/{lv_name}"
return TestRun.executor.run(cmd) return TestRun.executor.run(cmd)
@staticmethod @staticmethod
@ -477,22 +472,41 @@ class Lvm(Disk):
cmd = f"pvremove {pv_name}" cmd = f"pvremove {pv_name}"
return TestRun.executor.run(cmd) return TestRun.executor.run(cmd)
@staticmethod
def get_os_vg():
disks = get_system_disks()
cmd = (f"pvdisplay -c | grep -e /dev/{' -e /dev/'.join(disks)} | "
"awk -F':' '$2 != \"\" {print $2}'") # display non-empty groups
os_vg_names = TestRun.executor.run(cmd).stdout
if os_vg_names:
return set(os_vg_names.split("\n")) # remove duplicates
return []
@staticmethod
def get_non_os_vg():
disks = get_system_disks()
cmd = (f"pvdisplay -c | grep -Ev \'/dev/{'|/dev'.join(disks)}' | "
"awk -F':' '$2 != \"\" {print $2}\'") # display non-empty groups
non_os_vg_names = TestRun.executor.run(cmd).stdout
if non_os_vg_names:
return set(non_os_vg_names.split("\n")) # remove duplicates
return []
@classmethod @classmethod
def remove_all(cls): def remove_all(cls):
cmd = f"lvdisplay | grep 'LV Path' | awk '{{print $3}}'" non_os_vg_names = Lvm.get_non_os_vg()
lvm_paths = TestRun.executor.run(cmd).stdout.splitlines() cmd = "lvdisplay -c | awk -F':' '{{print $1,$2}}'" # prints lv_path vg_name
for lvm_path in lvm_paths: lvs = [tuple(lv.strip().split(' ')) for lv in TestRun.executor.run(cmd).stdout.splitlines()]
lv_name = lvm_path.split('/')[-1] [cls.remove(lv[0]) for lv in lvs if lv[1] in non_os_vg_names]
vg_name = lvm_path.split('/')[-2]
cls.remove(lv_name, vg_name)
cmd = f"vgdisplay | grep 'VG Name' | awk '{{print $3}}'" for vg_name in non_os_vg_names:
vg_names = TestRun.executor.run(cmd).stdout.splitlines()
for vg_name in vg_names:
TestRun.executor.run(f"vgchange -an {vg_name}") TestRun.executor.run(f"vgchange -an {vg_name}")
VolumeGroup.remove(vg_name) VolumeGroup.remove(vg_name)
cmd = f"pvdisplay | grep 'PV Name' | awk '{{print $3}}'" cmd = f"pvdisplay | grep 'PV Name' | awk '{{print $3}}'"
os_disks = get_system_disks()
# invert grep to make sure os_disks won`t be wiped during lvms cleanup
cmd += "".join([f" | grep -v {os_disk}" for os_disk in os_disks])
pv_names = TestRun.executor.run(cmd).stdout.splitlines() pv_names = TestRun.executor.run(cmd).stdout.splitlines()
for pv_name in pv_names: for pv_name in pv_names:
cls.remove_pv(pv_name) cls.remove_pv(pv_name)

View File

@ -107,11 +107,11 @@ def get_disk_serial_number(dev_path):
commands = [ commands = [
f"(udevadm info --query=all --name={dev_path} | grep 'SCSI.*_SERIAL' || " f"(udevadm info --query=all --name={dev_path} | grep 'SCSI.*_SERIAL' || "
f"udevadm info --query=all --name={dev_path} | grep 'ID_SERIAL_SHORT') | " f"udevadm info --query=all --name={dev_path} | grep 'ID_SERIAL_SHORT') | "
"awk --field-separator '=' '{print $NF}'", "awk -F '=' '{print $NF}'",
f"sg_inq {dev_path} 2> /dev/null | grep '[Ss]erial number:' | " f"sg_inq {dev_path} 2> /dev/null | grep '[Ss]erial number:' | "
"awk '{print $NF}'", "awk '{print $NF}'",
f"udevadm info --query=all --name={dev_path} | grep 'ID_SERIAL' | " f"udevadm info --query=all --name={dev_path} | grep 'ID_SERIAL' | "
"awk --field-separator '=' '{print $NF}'" "awk -F '=' '{print $NF}'"
] ]
for command in commands: for command in commands:
serial = TestRun.executor.run(command).stdout serial = TestRun.executor.run(command).stdout