opencas-test-framework/test_tools/drbdadm.py
Robert Baldyga 40f08a369a Move test-framework to its own repository
Signed-off-by: Robert Baldyga <baldyga.r@gmail.com>
2023-05-01 18:55:34 +02:00

68 lines
2.1 KiB
Python

#
# Copyright(c) 2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause-Clear
#
from core.test_run import TestRun
class Drbdadm:
    """Thin static wrappers that build `drbdadm` command lines.

    Every method formats a shell command string and hands it to
    `TestRun.executor`; nothing is parsed or cached here. Methods that use
    `run_expect_success` return whatever that call returns, while `in_sync`
    and `dump_config` use plain `run`.
    """

    @staticmethod
    def create_metadata(resource_name: str, force: bool):
        """Create metadata for a resource (`drbdadm create-md`).

        Args:
            resource_name: Name of the resource to pass to drbdadm.
            force: When true, `--force` is inserted before the resource name.

        Returns:
            The result of `TestRun.executor.run_expect_success`.
        """
        force_flag = " --force" if force else ""
        cmd = f"drbdadm create-md{force_flag} {resource_name}"
        return TestRun.executor.run_expect_success(cmd)

    @staticmethod
    def up(resource_name: str):
        """Enable a resource (`drbdadm up`).

        Returns:
            The result of `TestRun.executor.run_expect_success`.
        """
        cmd = f"drbdadm up {resource_name}"
        return TestRun.executor.run_expect_success(cmd)

    @staticmethod
    def down_all():
        """Disable all resources (`drbdadm down all`).

        Returns:
            The result of `TestRun.executor.run_expect_success`.
        """
        # Plain literal: there is nothing to interpolate in this command.
        cmd = "drbdadm down all"
        return TestRun.executor.run_expect_success(cmd)

    @staticmethod
    def down(resource_name: str):
        """Disable a single resource (`drbdadm down`).

        Returns:
            The result of `TestRun.executor.run_expect_success`.
        """
        cmd = f"drbdadm down {resource_name}"
        return TestRun.executor.run_expect_success(cmd)

    @staticmethod
    def set_node_primary(resource_name: str, force: bool = False):
        """Promote a resource to primary (`drbdadm primary`).

        Args:
            resource_name: Name of the resource to pass to drbdadm.
            force: When true, `--force` is appended after the resource name.

        Returns:
            The result of `TestRun.executor.run_expect_success`.
        """
        cmd = f"drbdadm primary {resource_name}"
        if force:
            cmd += " --force"
        return TestRun.executor.run_expect_success(cmd)

    @staticmethod
    def set_node_secondary(resource_name: str):
        """Demote a resource to secondary (`drbdadm secondary`).

        Returns:
            The result of `TestRun.executor.run_expect_success`.
        """
        cmd = f"drbdadm secondary {resource_name}"
        return TestRun.executor.run_expect_success(cmd)

    @staticmethod
    def get_status(resource_name: str = ""):
        """Query status for one resource, or for all when no name is given.

        With the default empty `resource_name` the command is
        `drbdadm status ` (no resource argument).

        Returns:
            The result of `TestRun.executor.run_expect_success`.
        """
        cmd = f"drbdadm status {resource_name}"
        return TestRun.executor.run_expect_success(cmd)

    @staticmethod
    def in_sync(resource_name: str) -> bool:
        """Report whether the status output contains no "Inconsistent" line.

        Pipes `drbdadm status` into `grep Inconsistent` and returns True when
        the command's exit code is exactly 1 (grep found no match).

        NOTE(review): the exit code checked here is presumably that of `grep`,
        the last command in the pipeline, so a failing `drbdadm status` that
        prints nothing to stdout would also be reported as in sync - confirm
        whether callers rely on this.
        """
        cmd = f"drbdadm status {resource_name} | grep Inconsistent"
        result = TestRun.executor.run(cmd)
        return result.exit_code == 1

    @staticmethod
    def wait_for_sync(resource_name: str = ""):
        """Wait for synchronization to finish (`drbdadm wait-sync`).

        Returns:
            The result of `TestRun.executor.run_expect_success`.
        """
        # The ssh connection might time out in case of a long sync.
        cmd = f"drbdadm wait-sync {resource_name}"
        return TestRun.executor.run_expect_success(cmd)

    @staticmethod
    def dump_config(resource_name: str):
        """Dump the configuration of a resource (`drbdadm dump`).

        Uses `TestRun.executor.run` rather than `run_expect_success`, so the
        raw result is returned for the caller to inspect.
        """
        cmd = f"drbdadm dump {resource_name}"
        return TestRun.executor.run(cmd)