opencas-test-framework/connection/base_executor.py
Katarzyna Treder 161cc7957a Move output to connection utils
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-12-10 12:29:44 +01:00

99 lines
3.7 KiB
Python

#
# Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2023-2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import time
from datetime import timedelta
from core.test_run import TestRun
from connection.utils.output import CmdException
class BaseExecutor:
    """Common command-execution interface shared by concrete executors.

    Subclasses provide the transport by overriding ``_execute``, ``_rsync``
    and ``_copy``; the public methods defined here add command logging,
    default timeouts and success/failure checking on top of those hooks.
    """

    def _execute(self, command, timeout):
        """Run ``command`` within ``timeout``; must be implemented by subclasses."""
        raise NotImplementedError()

    def _rsync(self, src, dst, delete, symlinks, checksum, exclude_list, timeout,
               dut_to_controller):
        """Synchronize ``src`` to ``dst``; must be implemented by subclasses."""
        raise NotImplementedError()

    def _copy(self, src, dst, dut_to_controller: bool):
        """Copy ``src`` to ``dst``; must be implemented by subclasses."""
        raise NotImplementedError()

    def rsync_to(self, src, dst, delete=False, symlinks=False, checksum=False, exclude_list=None,
                 timeout: timedelta = timedelta(seconds=90)):
        """Call ``_rsync`` with ``dut_to_controller=False`` and return its result.

        :param exclude_list: entries forwarded to ``_rsync``; ``None`` (the default)
            is replaced by a new empty list on every call.
        """
        # A fresh list per call prevents state from leaking between calls through
        # a shared default object if an implementation mutates the list.
        exclude_list = [] if exclude_list is None else exclude_list
        return self._rsync(src, dst, delete, symlinks, checksum, exclude_list, timeout, False)

    def rsync_from(self, src, dst, delete=False, symlinks=False, checksum=False,
                   exclude_list=None, timeout: timedelta = timedelta(seconds=90)):
        """Call ``_rsync`` with ``dut_to_controller=True`` and return its result.

        :param exclude_list: entries forwarded to ``_rsync``; ``None`` (the default)
            is replaced by a new empty list on every call.
        """
        exclude_list = [] if exclude_list is None else exclude_list
        return self._rsync(src, dst, delete, symlinks, checksum, exclude_list, timeout, True)

    def copy_to(self, src, dst):
        """Call ``_copy`` with ``dut_to_controller=True`` and return its result."""
        # NOTE(review): rsync_to passes dut_to_controller=False, whereas this "to"
        # variant passes True - confirm against the _copy implementations whether
        # the flag is inverted here.
        return self._copy(src, dst, True)

    def copy_from(self, src, dst):
        """Call ``_copy`` with ``dut_to_controller=False`` and return its result."""
        # NOTE(review): rsync_from passes dut_to_controller=True, whereas this "from"
        # variant passes False - confirm against the _copy implementations.
        return self._copy(src, dst, False)

    def is_remote(self):
        """Return ``False``; the base executor does not report itself as remote."""
        return False

    def is_active(self):
        """Return ``True``; the base executor always reports itself as active."""
        return True

    def wait_for_connection(self, timeout: timedelta = None):
        """Do nothing in the base class; subclasses may override this hook."""
        pass

    def resolve_ip_address(self):
        """Return the loopback address ``"127.0.0.1"``."""
        return "127.0.0.1"

    def run(self, command, timeout: timedelta = timedelta(minutes=30)):
        """Execute ``command`` through ``_execute`` and log it with its output.

        When ``TestRun.dut.env`` is set, the command is prefixed with it using
        ``&&``. The command and its output are written to the command log under
        a shared command id.

        :return: whatever ``_execute`` returns for the command.
        """
        if TestRun.dut and TestRun.dut.env:
            command = f"{TestRun.dut.env} && {command}"

        command_id = TestRun.LOGGER.get_new_command_id()
        # The DUT address is only attached to the log entry when several DUTs are in use.
        ip_info = TestRun.dut.ip if len(TestRun.duts) > 1 else ""
        TestRun.LOGGER.write_command_to_command_log(command, command_id, info=ip_info)

        output = self._execute(command, timeout)
        TestRun.LOGGER.write_output_to_command_log(output, command_id)
        return output

    def run_in_background(self,
                          command,
                          stdout_redirect_path="/dev/null",
                          stderr_redirect_path="/dev/null"):
        """Start ``command`` in the background and return the PID it reports.

        stdout and stderr of the command are redirected to the given paths, and
        ``echo $!`` makes the shell print the PID of the background job.

        :return: the PID parsed from the output, or ``None`` if ``run`` returned ``None``.
        """
        command += f" > {stdout_redirect_path} 2> {stderr_redirect_path} &echo $!"
        output = self.run(command)
        if output is None:
            return None
        return int(output.stdout)

    def wait_cmd_finish(self, pid: int, timeout: timedelta = timedelta(minutes=30)):
        """Block until the process ``pid`` ends by running ``tail --pid`` on it."""
        self.run(f"tail --pid={pid} -f /dev/null", timeout)

    def check_if_process_exists(self, pid: int):
        """Return ``True`` when ``pid`` appears in the second column of ``ps aux``."""
        output = self.run(f"ps aux | awk '{{print $2 }}' | grep ^{pid}$", timedelta(seconds=10))
        # grep exits with 0 only when a line matched.
        return output.exit_code == 0

    def kill_process(self, pid: int):
        """Send SIGTERM to ``pid``, wait three seconds, then send SIGKILL."""
        # TERM signal should be used in preference to the KILL signal, since a
        # process may install a handler for the TERM signal in order to perform
        # clean-up steps before terminating in an orderly fashion.
        self.run(f"kill -s SIGTERM {pid} &> /dev/null")
        time.sleep(3)
        # SIGKILL is sent unconditionally; its output is discarded in case the
        # process has already exited.
        self.run(f"kill -s SIGKILL {pid} &> /dev/null")

    def run_expect_success(self, command, timeout: timedelta = timedelta(minutes=30)):
        """Run ``command`` and return its output.

        :raises CmdException: if the exit code is non-zero.
        """
        output = self.run(command, timeout)
        if output.exit_code != 0:
            raise CmdException(f"Exception occurred while trying to execute '{command}' command.",
                               output)
        return output

    def run_expect_fail(self, command, timeout: timedelta = timedelta(minutes=30)):
        """Run ``command`` and return its output.

        :raises CmdException: if the exit code is zero.
        """
        output = self.run(command, timeout)
        if output.exit_code == 0:
            raise CmdException(f"Command '{command}' executed properly but error was expected.",
                               output)
        return output