test-framework: fix code style

Signed-off-by: Kamil Gierszewski <kamil.gierszewski@huawei.com>
This commit is contained in:
Kamil Gierszewski 2024-05-15 23:12:10 +02:00
parent 5051ef1f1a
commit e0b7c56b16
No known key found for this signature in database
2 changed files with 105 additions and 59 deletions

View File

@ -19,14 +19,24 @@ class LocalExecutor(BaseExecutor):
executable="/bin/bash",
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=timeout.total_seconds())
timeout=timeout.total_seconds(),
)
return Output(completed_process.stdout,
completed_process.stderr,
completed_process.returncode)
return Output(
completed_process.stdout, completed_process.stderr, completed_process.returncode
)
def _rsync(self, src, dst, delete=False, symlinks=False, checksum=False, exclude_list=[],
timeout: timedelta = timedelta(seconds=90), dut_to_controller=False):
def _rsync(
self,
src,
dst,
delete=False,
symlinks=False,
checksum=False,
exclude_list=[],
timeout: timedelta = timedelta(seconds=90),
dut_to_controller=False,
):
options = []
if delete:
@ -45,7 +55,8 @@ class LocalExecutor(BaseExecutor):
executable="/bin/bash",
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=timeout.total_seconds())
timeout=timeout.total_seconds(),
)
if completed_process.returncode:
raise Exception(f"rsync failed:\n{completed_process}")

View File

@ -28,8 +28,7 @@ class SshExecutor(BaseExecutor):
def __del__(self):
self.ssh.close()
def connect(self, user=None, port=None,
timeout: timedelta = timedelta(seconds=30)):
def connect(self, user=None, port=None, timeout: timedelta = timedelta(seconds=30)):
hostname = self.host
user = user or self.user
port = port or self.port
@ -37,48 +36,62 @@ class SshExecutor(BaseExecutor):
config, sock, key_filename = None, None, None
# search for 'host' in SSH config
try:
path = os.path.expanduser('~/.ssh/config')
path = os.path.expanduser("~/.ssh/config")
config = paramiko.SSHConfig.from_path(path)
except FileNotFoundError:
pass
if config is not None:
target = config.lookup(self.host)
hostname = target['hostname']
key_filename = target.get('identityfile', None)
user = target.get('user', user)
port = target.get('port', port)
if target.get('proxyjump', None) is not None:
proxy = config.lookup(target['proxyjump'])
hostname = target["hostname"]
key_filename = target.get("identityfile", None)
user = target.get("user", user)
port = target.get("port", port)
if target.get("proxyjump", None) is not None:
proxy = config.lookup(target["proxyjump"])
jump = paramiko.SSHClient()
jump.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
jump.connect(proxy['hostname'], username=proxy['user'],
port=int(proxy.get('port', 22)), key_filename=proxy.get('identityfile', None))
jump.connect(
proxy["hostname"],
username=proxy["user"],
port=int(proxy.get("port", 22)),
key_filename=proxy.get("identityfile", None),
)
transport = jump.get_transport()
local_addr = (proxy['hostname'], int(proxy.get('port', 22)))
local_addr = (proxy["hostname"], int(proxy.get("port", 22)))
dest_addr = (hostname, port)
sock = transport.open_channel("direct-tcpip", dest_addr, local_addr)
except Exception as e:
raise ConnectionError(f"An exception of type '{type(e)}' occurred while trying to "
f"connect to proxy '{proxy['hostname']}'.\n {e}")
raise ConnectionError(
f"An exception of type '{type(e)}' occurred while trying to "
f"connect to proxy '{proxy['hostname']}'.\n {e}"
)
if user is None:
TestRun.block("There is no user given in config.")
try:
self.ssh.connect(hostname, username=user,
port=port, timeout=timeout.total_seconds(),
banner_timeout=timeout.total_seconds(),
sock=sock, key_filename=key_filename)
self.ssh.connect(
hostname,
username=user,
port=port,
timeout=timeout.total_seconds(),
banner_timeout=timeout.total_seconds(),
sock=sock,
key_filename=key_filename,
)
self.ssh_config = config
except paramiko.AuthenticationException as e:
raise paramiko.AuthenticationException(
f"Authentication exception occurred while trying to connect to DUT. "
f"Please check your SSH key-based authentication.\n{e}")
f"Please check your SSH key-based authentication.\n{e}"
)
except (paramiko.SSHException, socket.timeout) as e:
raise ConnectionError(f"An exception of type '{type(e)}' occurred while trying to "
f"connect to {hostname}.\n {e}")
raise ConnectionError(
f"An exception of type '{type(e)}' occurred while trying to "
f"connect to {hostname}.\n {e}"
)
def disconnect(self):
try:
@ -88,16 +101,27 @@ class SshExecutor(BaseExecutor):
def _execute(self, command, timeout):
try:
(stdin, stdout, stderr) = self.ssh.exec_command(command,
timeout=timeout.total_seconds())
(stdin, stdout, stderr) = self.ssh.exec_command(
command, timeout=timeout.total_seconds()
)
except paramiko.SSHException as e:
raise ConnectionError(f"An exception occurred while executing command '{command}' on"
f" {self.host}\n{e}")
raise ConnectionError(
f"An exception occurred while executing command '{command}' on" f" {self.host}\n{e}"
)
return Output(stdout.read(), stderr.read(), stdout.channel.recv_exit_status())
def _rsync(self, src, dst, delete=False, symlinks=False, checksum=False, exclude_list=[],
timeout: timedelta = timedelta(seconds=90), dut_to_controller=False):
def _rsync(
self,
src,
dst,
delete=False,
symlinks=False,
checksum=False,
exclude_list=[],
timeout: timedelta = timedelta(seconds=90),
dut_to_controller=False,
):
options = []
if delete:
@ -110,22 +134,29 @@ class SshExecutor(BaseExecutor):
for exclude in exclude_list:
options.append(f"--exclude {exclude}")
src_to_dst = f"{self.user}@{self.host}:{src} {dst} " if dut_to_controller else\
f"{src} {self.user}@{self.host}:{dst} "
src_to_dst = (
f"{self.user}@{self.host}:{src} {dst} "
if dut_to_controller
else f"{src} {self.user}@{self.host}:{dst} "
)
try:
completed_process = subprocess.run(
f'rsync -r -e "ssh -p {self.port} '
f'-o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no" '
+ src_to_dst + f'{" ".join(options)}',
+ src_to_dst
+ f'{" ".join(options)}',
shell=True,
executable="/bin/bash",
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=timeout.total_seconds())
timeout=timeout.total_seconds(),
)
except Exception as e:
TestRun.LOGGER.exception(f"Exception occurred during rsync process. "
f"Please check your SSH key-based authentication.\n{e}")
TestRun.LOGGER.exception(
f"Exception occurred during rsync process. "
f"Please check your SSH key-based authentication.\n{e}"
)
if completed_process.returncode:
raise Exception(f"rsync failed:\n{completed_process}")
@ -147,12 +178,13 @@ class SshExecutor(BaseExecutor):
def reboot(self):
self.run("reboot")
self.wait_for_connection_loss()
self.wait_for_connection(timedelta(seconds=self.reboot_timeout)) \
if self.reboot_timeout is not None else self.wait_for_connection()
self.wait_for_connection(
timedelta(seconds=self.reboot_timeout)
) if self.reboot_timeout is not None else self.wait_for_connection()
def is_active(self):
try:
self.ssh.exec_command('', timeout=5)
self.ssh.exec_command("", timeout=5)
return True
except Exception:
return False
@ -184,28 +216,28 @@ class SshExecutor(BaseExecutor):
def resolve_ip_address(self):
user, hostname, port = self.user, self.host, self.port
key_file = None
pattern = br"^Authenticated to.+\[(\d+\.\d+\.\d+\.\d+)].*$"
pattern = rb"^Authenticated to.+\[(\d+\.\d+\.\d+\.\d+)].*$"
param, command = " -v", "''"
try:
if self.ssh_config:
host = self.ssh_config.lookup(self.host)
if re.fullmatch(r"^\d+\.\d+\.\d+\.\d+$", host['hostname']):
return host['hostname']
if re.fullmatch(r"^\d+\.\d+\.\d+\.\d+$", host["hostname"]):
return host["hostname"]
if host.get('proxyjump', None) is not None:
proxy = self.ssh_config.lookup(host['proxyjump'])
if host.get("proxyjump", None) is not None:
proxy = self.ssh_config.lookup(host["proxyjump"])
user = proxy.get('user', user)
hostname = proxy['hostname']
port = proxy.get('port', port)
key_file = proxy.get('identityfile', key_file)
user = proxy.get("user", user)
hostname = proxy["hostname"]
port = proxy.get("port", port)
key_file = proxy.get("identityfile", key_file)
command = f"nslookup {host['hostname']}"
pattern = br"^Address:\s+(\d+\.\d+\.\d+\.\d+)\s*$"
pattern = rb"^Address:\s+(\d+\.\d+\.\d+\.\d+)\s*$"
param = ""
else:
user = host.get('user', user)
port = host.get('port', port)
key_file = host.get('identityfile', key_file)
user = host.get("user", user)
port = host.get("port", port)
key_file = host.get("identityfile", key_file)
user_str = f"{user}@"
identity_str = f" -i {os.path.abspath(key_file[0])}" if key_file else ""
@ -215,8 +247,11 @@ class SshExecutor(BaseExecutor):
executable="/bin/bash",
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=30)
matches = re.findall(pattern, completed_process.stdout + completed_process.stderr, re.MULTILINE)
return matches[-1].decode('utf-8')
timeout=30,
)
matches = re.findall(
pattern, completed_process.stdout + completed_process.stderr, re.MULTILINE
)
return matches[-1].decode("utf-8")
except:
return None