add open posix test cases on qemu and sim on CI

Signed-off-by: vela-mib <vela-mib@xiaomi.com>
This commit is contained in:
vela-mib
2024-02-20 10:54:27 +08:00
committed by Xiang Xiao
parent 306c1c0b7d
commit 8ff3b90742
10 changed files with 11684 additions and 48 deletions
+159 -35
View File
@@ -4,13 +4,43 @@ import os
import re
import subprocess
import time
from enum import Enum
import pexpect
import pexpect.fdpexpect
import pexpect.spawnbase
import serial
rootPath = os.path.dirname(os.path.abspath(__file__))
tmp_read_nonblocking = pexpect.spawnbase.SpawnBase.read_nonblocking
def enhanced_read_nonblocking(self, size=1, timeout=None):
return re.sub(
r"(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]",
"",
tmp_read_nonblocking(self, size, timeout).decode(errors="ignore"),
).encode()
pexpect.spawnbase.SpawnBase.read_nonblocking = enhanced_read_nonblocking
class StatusCodeEnum(Enum):
NORMAL = (0, "Normal")
TIMEOUT_ERR = (-1, "Timeout")
EOF_ERR = (-2, "EOF")
CRASH_ERR = (-3, "Crash happened")
BUSYLOOP_ERR = (-4, "Busy loop happened")
UNKNOWN_ERR = (-5, "Unknown")
@staticmethod
def get_enum_msg_by_code(status_code):
for status in StatusCodeEnum:
if status.value[0] == status_code:
return status.value[1]
class connectNuttx(object):
def __init__(
@@ -40,6 +70,7 @@ class connectNuttx(object):
self.target = target
self.enter = "\r"
self.debug_flag = 0
self.format_str_len = 105
# get PROMPT value and rate value
self.PROMPT = getConfigValue(
self.path, self.board, core=self.core, flag="NSH_PROMPT_STRING"
@@ -107,38 +138,130 @@ class connectNuttx(object):
self.process.send(byte)
time.sleep(1)
self.process.send(byte)
time.sleep(1)
self.process.sendline("\n")
ret = self.process.expect_exact(expect)
return ret
def print_format_str(self, string, type="text"):
str_prefix = "+"
str_suffix = "+"
if type == "head":
rest_char_len = self.format_str_len - 2 - len(string)
half_len = int(rest_char_len / 2)
print(
str_prefix
+ "-" * half_len
+ string
+ "-" * (rest_char_len - half_len)
+ str_suffix
)
elif type == "tail":
rest_char_len = self.format_str_len - 2
print(str_prefix + "-" * rest_char_len + str_suffix)
elif type == "text":
str_prefix = "| "
str_suffix = " |"
rest_char_len = (
self.format_str_len - len(str_prefix) - len(str_suffix) - len(string)
)
print(
str_prefix
+ string
+ " " * (1 if rest_char_len < 1 else rest_char_len)
+ str_suffix
)
else:
print(string)
def clean_buffer(self):
i = -1
while True:
if (
(
self.process.before is not None
and self.process.before.decode(errors="ignore")
.replace("\r", "")
.replace("\n", "")
!= ""
)
or (
self.process.after is not None
and self.process.after != pexpect.TIMEOUT
and self.process.after.decode(errors="ignore")
.replace("\r", "")
.replace("\n", "")
!= ""
)
or i == 0
):
i = self.process.expect(
[re.compile(b".+"), pexpect.TIMEOUT, pexpect.EOF], timeout=0.1
)
else:
while True:
try:
self.process.read_nonblocking(
size=self.process.maxread, timeout=0.1
)
except Exception:
break
self.process.before = b""
self.process.after = b""
break
# send command to nsh
def sendCommand(self, cmd, expect="", timeout=10, flag=""):
def sendCommand(self, cmd, *argc, **argv):
expect = []
timeout = 10
ret = StatusCodeEnum.NORMAL.value[0]
length = len(argc)
if length == 0:
expect.append(self.PROMPT)
else:
for i in argc:
expect.append(i)
length = len(argv)
if length != 0:
for key, value in argv.items():
if key == "timeout":
timeout = value
if self.method != "minicom":
time.sleep(0.5)
if not expect:
expect = self.PROMPT
self.process.buffer = b""
self.process.sendline(cmd)
if self.target == "qemu":
self.clean_buffer()
self.process.sendline(cmd)
else:
self.clean_buffer()
self.process.sendline(cmd)
time.sleep(0.1)
self.process.send("\r\n\r\n")
try:
ret = self.process.expect(expect, timeout=timeout)
except pexpect.TIMEOUT:
print("Debug: TIMEOUT '%s' exist and run next test case" % cmd)
ret = -1
except pexpect.EOF:
print("Debug: EOF raise exception")
ret = -2
finally:
if self.debug_flag:
self.debug(cmd, ret)
self.process.buffer = b""
self.process.sendline("\n")
if flag:
is_newline = self.process.expect_exact(flag, timeout=timeout)
for i in expect:
ret = self.process.expect(i, timeout=timeout)
except Exception as e:
self.print_format_str(" Catch Exception ", type="head")
if isinstance(e, pexpect.TIMEOUT):
ret = StatusCodeEnum.TIMEOUT_ERR.value[0]
elif isinstance(e, pexpect.EOF):
ret = StatusCodeEnum.EOF_ERR.value[0]
self.print_format_str(f"An pexpect.EOF error occurred: {str(e)}")
else:
is_newline = self.process.expect_exact(self.PROMPT, timeout=timeout)
if self.debug_flag:
self.debug("NEWLINE", is_newline)
ret = StatusCodeEnum.UNKNOWN_ERR.value[0]
self.print_format_str(f"An unexpected error occurred: {str(e)}")
self.print_format_str(" Result ", type="head")
self.print_format_str(f"Command : '{cmd}'")
self.print_format_str(f"Expect value: {str(expect)}")
self.print_format_str(f"Timeout : {timeout}s")
self.print_format_str(
f"Test result : {StatusCodeEnum.get_enum_msg_by_code(ret)}"
)
self.print_format_str("", type="tail")
finally:
self.debug(cmd, ret)
if self.method != "minicom":
time.sleep(0.5)
return ret
@@ -151,15 +274,16 @@ class connectNuttx(object):
self.process.expect_exact(self.PROMPT)
def debug(self, cmd, ret):
print("********************* DEBUG START ********************")
if cmd == "\n":
cmd = r"\n"
print("cmd: %s\n" % cmd)
print("ret: %s\n" % str(ret))
print("before: %s\n" % repr(self.process.before))
print("after: %s\n" % repr(self.process.after))
print("buffer: %s\n" % repr(self.process.buffer))
print("********************** DEBUG END **********************")
if self.debug_flag:
print("********************* DEBUG START ********************")
if cmd == "\n":
cmd = r"\n"
print("cmd: {}".format(cmd))
print("ret: {}".format(ret))
print("before: {}".format(self.process.before.decode(errors="ignore")))
print("after: {}".format(self.process.after.decode(errors="ignore")))
print("buffer: {}".format(self.process.buffer.decode(errors="ignore")))
print("********************** DEBUG END **********************")
def cleanup(self):
if self.target == "sim":
+235
View File
@@ -0,0 +1,235 @@
import re
from datetime import datetime
from typing import Dict, List
"""
cmocka.json
"""
Passed = "Passed"
Failed = "Failed"
Unexecuted = "Unexecuted"
class CaseInfo:
def __init__(self, test_suite_name, test_case_name, status, log=None):
self.test_suite_name = test_suite_name
# case namee.g. "TestNuttxMm01"
self.test_case_name = test_case_name
# result: Passed or Failed
self.status = status
# log
self.log: List = [""] if log is None else log
class SuiteInfo:
def __init__(self, test_suite_name):
# suite name, e.g. "NuttxMmTestSuites"
self.test_suite_name = test_suite_name
# all test cases in the current test suite
self.test_cases: Dict[str, CaseInfo] = dict()
# number of cases passed in the current test suites
self.passed_count = 0
# number of cases failed in the current test suites
self.failed_count = 0
# case run count
self.run_count = 0
# unexecuted count
self.unexecuted_count = 0
# number of cases in the current test suites
self.cases_count = 0
# suite run flag
self.is_suite_run = False
class CmockaSummary:
def __init__(self, duration=0):
# number of all test suites
self.total_suites_count = 0
# all test cases number
self.total_cases_count = 0
# number of all passed cases
self.total_passed_count = 0
# number of all failed cases
self.total_failed_count = 0
# number of all unknown cases
self.total_unexecuted_count = 0
# duration
self.duration = duration
class CmockaSingleCoreRecord:
def __init__(self, lines, core="", board="", log="", duration=0):
# create time
self.create_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# core
self.core = "" if core is None else core
# board
self.board = "" if board is None else board
# cmocka info
self.test_suites: Dict[str, SuiteInfo] = dict()
# summary
self.summary = CmockaSummary(duration)
# log path
self.log = "" if log is None else log
# bad_case
self.bad_case_tip = ""
suite_pattern = r"\] (?P<test_suite_name>[a-zA-Z]*TestSuites)"
case_pattern = r"\]\s+(?P<test_case_name>TestNuttx\w+)"
lines_iter = iter(lines)
current_suite = None
while True:
try:
line = next(lines_iter)
if (suite_match := re.search(suite_pattern, line)) is not None:
current_suite = suite_match.group("test_suite_name")
elif (
current_suite is not None
and (case_match := re.search(case_pattern, line)) is not None
):
current_case = case_match.group("test_case_name")
self.append(CaseInfo(current_suite, current_case, Unexecuted))
except StopIteration:
break
def append(self, object: CaseInfo):
suite: SuiteInfo = self.test_suites.get(object.test_suite_name)
if suite is None:
suite = SuiteInfo(object.test_suite_name)
self.test_suites.update({object.test_suite_name: suite})
suite.test_cases.update({object.test_case_name: object})
passed_count = 0
failed_count = 0
unexecuted_count = 0
test_case: CaseInfo
for test_case in suite.test_cases.values():
if test_case.status == Passed:
passed_count += 1
elif test_case.status == Failed:
failed_count += 1
else:
unexecuted_count += 1
suite.passed_count = passed_count
suite.failed_count = failed_count
suite.unexecuted_count = unexecuted_count
suite.run_count = passed_count + failed_count
suite.cases_count = passed_count + failed_count + unexecuted_count
if passed_count + failed_count != 0:
suite.is_suite_run = True
total_passed_count = 0
total_failed_count = 0
total_unexecuted_count = 0
total_cases_count = 0
suite: SuiteInfo
for suite in self.test_suites.values():
total_passed_count += suite.passed_count
total_failed_count += suite.failed_count
total_unexecuted_count += suite.unexecuted_count
total_cases_count += suite.cases_count
self.summary.total_passed_count = total_passed_count
self.summary.total_failed_count = total_failed_count
self.summary.total_unexecuted_count = total_unexecuted_count
self.summary.total_cases_count = total_cases_count
self.summary.total_suites_count = len(self.test_suites)
def process(self, lines, err_code):
# regular expression
suite_start_pattern = r"\] (?P<test_suite_name>[a-zA-Z]*TestSuites): Running (?P<cases_count>\d+) test\(s\)"
case_run_pattern = r"\[\s+RUN\s+\] (?P<test_case_name>TestNuttx\w+)"
case_pass_pattern = r"\[\s+OK\s+\] (?P<test_case_name>TestNuttx\w+)"
case_fail_pattern = r"\[\s+FAILED\s+\] (?P<test_case_name>TestNuttx\w+)"
lines_iter = iter(lines)
line = next(lines_iter)
while True:
try:
interrupt_flag = False
# matching new test suites
if (
suite_start_match := re.search(suite_start_pattern, line)
) is not None:
test_suite_name = suite_start_match.group("test_suite_name")
cases_count = int(suite_start_match.group("cases_count"))
suite_end_pattern = r"{}: {} test(s) run.".format(
test_suite_name, cases_count
)
line = next(lines_iter)
while True:
if (
case_run_match := re.search(case_run_pattern, line)
) is not None:
test_case_name = case_run_match.group("test_case_name")
log = [line]
while True:
try:
line = next(lines_iter)
log.append(line)
except StopIteration:
self.append(
CaseInfo(
test_suite_name, test_case_name, Failed, log
)
)
if err_code == -3:
self.bad_case_tip = f"This case was not executed, \
because crash occurred after running '{test_case_name}'."
elif err_code == -4:
self.bad_case_tip = f"This case was not executed, \
because no response for a long time after running '{test_case_name}'."
elif err_code == -1:
self.bad_case_tip = f"This case was not executed, \
because the maximum waiting time has been exceeded \
while running '{test_case_name}'."
else:
self.bad_case_tip = "This case was not executed due to unknown reasons."
raise StopIteration
if re.search(case_pass_pattern, line) is not None:
self.append(
CaseInfo(
test_suite_name, test_case_name, Passed, log
)
)
break
elif re.search(case_fail_pattern, line) is not None:
self.append(
CaseInfo(
test_suite_name, test_case_name, Failed, log
)
)
break
elif re.search(suite_start_pattern, line) is not None:
self.append(
CaseInfo(
test_suite_name, test_case_name, Failed, log
)
)
interrupt_flag = True
break
elif suite_end_pattern in line:
break
if interrupt_flag:
break
line = next(lines_iter)
if interrupt_flag:
continue
line = next(lines_iter)
except StopIteration:
break