opencas-test-framework/connection/local_executor.py
Katarzyna Treder fd869a0afc Refactor disk tools and fs tools
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-12-11 19:13:39 +01:00

65 lines
1.8 KiB
Python

#
# Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import os
import subprocess
from datetime import timedelta
from connection.base_executor import BaseExecutor
from core.test_run import TestRun
from test_tools.fs_tools import copy
from connection.utils.output import Output, CmdException
class LocalExecutor(BaseExecutor):
    """Executor that runs commands on the local machine via ``subprocess``."""

    def __init__(self):
        # Use bash on POSIX systems; on any other OS pass ``None`` so that
        # subprocess falls back to its own default shell. The test
        # configuration may override the choice with "executable_path".
        default_executable_path = "/bin/bash" if os.name == "posix" else None
        self._executable_path = TestRun.config.get("executable_path", default_executable_path)

    def _execute(self, command, timeout):
        """Run ``command`` in a local shell and capture its result.

        Args:
            command: Command line, interpreted by the shell (``shell=True``).
            timeout: ``timedelta`` limiting the run time, or ``None`` to wait
                without a limit.

        Returns:
            ``Output`` built from the captured stdout, stderr and return code.

        Raises:
            subprocess.TimeoutExpired: If the command does not finish within
                ``timeout``.
        """
        # subprocess.run expects seconds (or None for "no limit"), not a timedelta.
        timeout_seconds = None if timeout is None else timeout.total_seconds()
        completed_process = subprocess.run(
            command,
            shell=True,
            executable=self._executable_path,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=timeout_seconds,
        )

        return Output(
            completed_process.stdout, completed_process.stderr, completed_process.returncode
        )

    def _rsync(
        self,
        src,
        dst,
        delete=False,
        symlinks=False,
        checksum=False,
        exclude_list=None,
        timeout: timedelta = timedelta(seconds=90),
        dut_to_controller=False,
    ):
        """Synchronize ``src`` to ``dst`` with a recursive ``rsync`` call.

        Args:
            src: Source path handed to rsync.
            dst: Destination path handed to rsync.
            delete: Add ``--delete`` to the rsync command.
            symlinks: Add ``--links`` to the rsync command.
            checksum: Add ``--checksum`` to the rsync command.
            exclude_list: Iterable of patterns, each passed as
                ``--exclude <pattern>``. ``None`` (the default) or an empty
                iterable means nothing is excluded.
            timeout: Maximum run time of the rsync command.
            dut_to_controller: Accepted for interface compatibility; not used
                by this implementation.

        Raises:
            CmdException: If rsync exits with a non-zero code.
        """
        options = []

        if delete:
            options.append("--delete")
        if symlinks:
            options.append("--links")
        if checksum:
            options.append("--checksum")

        # ``None`` replaces the former mutable ``[]`` default; treat it as "no excludes".
        options.extend(f"--exclude {exclude}" for exclude in (exclude_list or ()))

        output = self._execute(f'rsync -r {src} {dst} {" ".join(options)}', timeout)

        if output.exit_code:
            raise CmdException("rsync failed", output)

    def _copy(self, src, dst, dut_to_controller: bool):
        """Recursively copy ``src`` to ``dst`` using ``fs_tools.copy``.

        ``dut_to_controller`` is accepted for interface compatibility and is
        not used by this implementation.
        """
        copy(src, dst, recursive=True)