
For DUTs without an IP defined, use the host name when creating the log dir (an IP set to None caused an exception). Use the paramiko method for copying log files (works when running tests from Windows).

Signed-off-by: Daniel Madej <daniel.madej@huawei.com>

231 lines | 7.5 KiB | Python
#
|
|
# Copyright(c) 2019-2021 Intel Corporation
|
|
# SPDX-License-Identifier: BSD-3-Clause
|
|
#
|
|
|
|
import logging
|
|
import os
|
|
import sys
|
|
from contextlib import contextmanager
|
|
from datetime import datetime
|
|
from threading import Lock
|
|
|
|
import portalocker
|
|
|
|
from log.html_log_config import HtmlLogConfig
|
|
from log.html_log_manager import HtmlLogManager
|
|
from log.html_presentation_policy import html_policy
|
|
from test_utils.output import Output
|
|
from test_utils.singleton import Singleton
|
|
|
|
|
|
def create_log(log_base_path, test_module, additional_args=None):
    """Set up logging and open a new HTML log for the given test module.

    Returns the singleton Log instance with the test already begun and a
    file logger attached to the log's base directory. If computing the test
    name fails, the log is begun under a placeholder name and the problem is
    reported into the log itself.
    """
    Log.setup()
    config = HtmlLogConfig(base_dir=log_base_path,
                           presentation_policy=html_policy)
    log = Log(log_config=config)

    error_msg = None
    test_name = 'TestNameError'
    try:
        test_name = test_module
        if additional_args:
            test_name += f"__{'_'.join(additional_args)}"
    except Exception as ex:
        error_msg = f'Detected some problems during calculating test name: {ex}'
    finally:
        # Begin the log regardless of whether name calculation succeeded.
        log.begin(test_name)
    # Emit the log location so external tooling can pick it up from stdout.
    print(f"\n<LogFile>{os.path.join(log.base_dir, 'main.html')}</LogFile>")
    Log.add_file_logger(log.base_dir)
    if error_msg:
        log.exception(error_msg)
    return log
|
|
|
|
|
|
class Log(HtmlLogManager, metaclass=Singleton):
    """Singleton test-framework logger.

    Combines the HTML log output of HtmlLogManager with a standard-library
    root logger (stdout and, optionally, a per-test file), and provides
    helpers for writing an inter-process, file-locked command log.
    """

    # Root logging.Logger handle; populated by setup().
    logger = None
    LOG_FORMAT = '%(asctime)s %(levelname)s:\t%(message)s'
    DATE_FORMAT = "%Y/%m/%d %H:%M:%S"
    # Monotonically increasing id handed out by get_new_command_id().
    command_id = 0
    # Guards command_id against concurrent increments.
    lock = Lock()

    @classmethod
    def destroy(cls):
        """Drop the singleton instance so the next Log(...) builds a fresh one."""
        del cls._instances[cls]

    @classmethod
    def setup(cls):
        """Configure the root logger to write DEBUG-level records to stdout."""
        # Get handle to root logger.
        logger = logging.getLogger()
        logger.setLevel(logging.DEBUG)

        # Set paramiko log level to warning to keep SSH chatter out of the log.
        logging.getLogger('paramiko').setLevel(logging.WARNING)

        # Create handlers.
        stdout_handler = logging.StreamHandler(sys.stdout)

        # Set logging level on handlers.
        stdout_handler.setLevel(logging.DEBUG)

        # Set log formatting on each handler.
        formatter = logging.Formatter(Log.LOG_FORMAT, Log.DATE_FORMAT)
        stdout_handler.setFormatter(formatter)

        # Attach handlers to root logger, replacing any pre-existing ones.
        logger.handlers = []
        logger.addHandler(stdout_handler)
        cls.logger = logger
        logger.info("Logger successfully initialized.")

    @classmethod
    def add_file_logger(cls, log_path):
        """Additionally mirror all log records to <log_path>/stdout.log."""
        file_handler = logging.FileHandler(os.path.join(log_path, 'stdout.log'))
        file_handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter(Log.LOG_FORMAT, Log.DATE_FORMAT)
        file_handler.setFormatter(formatter)
        cls.logger.addHandler(file_handler)

    @contextmanager
    def step(self, message):
        """Context manager grouping all log entries of one test step.

        The HTML group is now closed even if the step body raises, so a
        failing step no longer corrupts the structure of the HTML report.
        """
        self.step_info(message)
        super(Log, self).start_group(message)
        if Log.logger:
            Log.logger.info(message)
        try:
            yield
        finally:
            # Fix: previously skipped when the body raised an exception.
            super(Log, self).end_group()

    @contextmanager
    def group(self, message):
        """Context manager grouping log entries; always closes the group."""
        self.start_group(message)
        try:
            yield
        finally:
            # Fix: previously skipped when the body raised an exception.
            self.end_group()

    def add_build_info(self, msg):
        """Log a build-info entry to both the HTML log and the root logger."""
        super(Log, self).add_build_info(msg)
        if Log.logger:
            Log.logger.info(msg)

    def info(self, msg):
        super(Log, self).info(msg)
        if Log.logger:
            Log.logger.info(msg)

    def debug(self, msg):
        super(Log, self).debug(msg)
        if Log.logger:
            Log.logger.debug(msg)

    def error(self, msg):
        super(Log, self).error(msg)
        if Log.logger:
            Log.logger.error(msg)

    def blocked(self, msg):
        # 'blocked' results are reported at critical severity in the text log.
        # (logging.Logger.fatal is a deprecated alias of critical.)
        super(Log, self).blocked(msg)
        if Log.logger:
            Log.logger.critical(msg)

    def exception(self, msg):
        super(Log, self).exception(msg)
        if Log.logger:
            Log.logger.exception(msg)

    def critical(self, msg):
        super(Log, self).critical(msg)
        if Log.logger:
            Log.logger.critical(msg)

    def workaround(self, msg):
        super(Log, self).workaround(msg)
        if Log.logger:
            Log.logger.warning(msg)

    def warning(self, msg):
        super(Log, self).warning(msg)
        if Log.logger:
            Log.logger.warning(msg)

    def get_new_command_id(self):
        """Return a unique, monotonically increasing command id (thread-safe)."""
        # 'with' releases the lock even if the increment raises, unlike the
        # previous bare acquire()/release() pair.
        with self.lock:
            command_id = self.command_id
            self.command_id += 1
        return command_id

    def write_to_command_log(self, message):
        """Append a timestamped message to the shared commands.log file.

        portalocker provides an inter-process file lock so parallel test
        processes do not interleave partial lines.
        """
        super(Log, self).debug(message)
        command_log_path = os.path.join(self.base_dir, "dut_info", 'commands.log')
        timestamp = datetime.now().strftime('%Y-%m-%d_%H:%M:%S:%f')
        with portalocker.Lock(command_log_path, "ab+") as command_log:
            line_to_write = f"[{timestamp}] {message}\n"
            command_log.write(line_to_write.encode())

    def write_command_to_command_log(self, command, command_id, info=None):
        """Record an executed command, tagged with its id and optional info."""
        added_info = "" if info is None else f"[{info}] "
        self.write_to_command_log(f"{added_info}Command id: {command_id}\n{command}")

    def write_output_to_command_log(self, output: Output, command_id):
        """Record the exit code, stdout and stderr of a finished command."""
        if output is not None:
            line_to_write = f"Command id: {command_id}\n\texit code: {output.exit_code}\n" \
                            f"\tstdout: {output.stdout}\n" \
                            f"\tstderr: {output.stderr}\n\n\n"
            self.write_to_command_log(line_to_write)
        else:
            self.write_to_command_log(f"Command id: {command_id}\n\tNone output.")

    def step_info(self, step_name):
        """Emit a visually separated step banner to serial console and command log."""
        from core.test_run import TestRun
        decorator = "// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //\n\n"
        message = f"\n\n\n{decorator}{step_name}\n\n{decorator}\n"

        try:
            serial_monitor = TestRun.plugin_manager.get_plugin("serial_monitor")
            serial_monitor.send_to_serial(message)
        except (KeyError, AttributeError):
            # Serial monitor plugin not configured - banner goes to file only.
            pass
        self.write_to_command_log(message)

    def get_additional_logs(self):
        """Collect system logs (messages/syslog, dmesg, extras) from the DUT."""
        from core.test_run import TestRun
        from test_tools.fs_utils import check_if_file_exists
        messages_log = "/var/log/messages"
        if not check_if_file_exists(messages_log):
            # Debian-family systems use syslog instead of messages.
            messages_log = "/var/log/syslog"
        log_files = {"messages.log": messages_log,
                     "dmesg.log": "/tmp/dmesg"}
        extra_logs = TestRun.config.get("extra_logs", {})
        log_files.update(extra_logs)

        TestRun.executor.run(f"dmesg > {log_files['dmesg.log']}")

        # Fall back to the configured host name when the DUT has no IP defined.
        dut_identifier = TestRun.dut.ip if TestRun.dut.ip else TestRun.dut.config["host"]
        for log_name, log_source_path in log_files.items():
            try:
                log_destination_path = os.path.join(
                    self.base_dir, "dut_info", dut_identifier, log_name
                )
                TestRun.executor.copy_from(log_source_path, log_destination_path)
            except Exception as e:
                # Best-effort gathering: a missing log must not fail the test run.
                TestRun.LOGGER.warning(
                    f"There was a problem during gathering {log_name} log.\n{str(e)}"
                )

    def generate_summary(self, item, meta):
        """Write info.json describing the test item, its result and stage statuses."""
        import json
        summary_path = os.path.join(self.base_dir, 'info.json')
        with open(summary_path, "w+") as summary:
            data = {
                'module': os.path.relpath(item.fspath, os.getcwd()),
                'function': item.name,
                'meta': meta,
                'status': self.get_result().name,
                'path': os.path.normpath(self.base_dir),
                'stage_status': {
                    'setup': getattr(item, "rep_setup", {}),
                    'call': getattr(item, "rep_call", {}),
                    'teardown': getattr(item, "rep_teardown", {})
                }
            }
            json.dump(data, summary)