#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

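"""
Run the ace_engine gtest unit tests.

With no arguments, every test binary found under the test output
directory is executed in parallel and an aggregated test_result.json is
written. Pass -t/--target with one or more test suite names to run only
those suites.
"""
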
import os
import time
import json
import stat
import argparse
import subprocess
import multiprocessing
import xml.etree.ElementTree as ET
from datetime import datetime


def parse_xml(xml_file_path):
    """
    Parse the XML report produced by a test binary run and collect its
    failure information.
    """
    tree = ET.parse(xml_file_path)
    root = tree.getroot()
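    # The root of a gtest XML report is <testsuites>; its "tests" and
    # "failures" attributes aggregate the counts across all suites.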
    tests = root.attrib.get("tests")
    failures = root.attrib.get("failures")
    failed_info = {
        "test_suite_name": [],
        "total_count": tests,
        "failed_count": failures,
        "failed_testcase_name": []
    }
    for testsuite in root.findall(".//testsuite"):
        testsuite_name = testsuite.attrib.get("name")
        testsuite_failures = testsuite.attrib.get("failures")
        if int(testsuite_failures or 0):
            failed_info["test_suite_name"].append(testsuite_name)
        for testcase in testsuite.findall(".//testcase"):
            testcase_name = testcase.attrib.get("name")
            # A <failure> child element marks a failed test case.
            failure = testcase.find("failure")
            if failure is not None:
                failed_info["failed_testcase_name"].append(testcase_name)
    return failed_info


def run_command(test_binary_path: str, alter_cmds: list = None):
    """
    Run a gtest test binary.
    """
    default_cmds = [
        test_binary_path,
        # Write the XML report next to the binary as <binary>.xml.
        "--gtest_output=xml:{}.xml".format(test_binary_path),
        "--gtest_print_time=0",
        "--gtest_brief=1",
    ]
    if alter_cmds is not None:
        default_cmds.extend(alter_cmds)
    subprocess.run(default_cmds)


def run_single_test(tests_path, test_suite_name):
    """
    Run a single gtest test suite by name.
    """
    test_suite_path = None
    for root, _, files in os.walk(tests_path):
        for file in files:
            if file.endswith(test_suite_name):
                test_suite_path = os.path.join(root, file)
    if test_suite_path is not None:
        run_command(test_suite_path)
    else:
        print("TestSuite {} did not compile successfully.".format(test_suite_name))


def run_tests_parallel(test_directory):
    """
    Run all gtest test binaries in parallel.
    """
    test_binaries = []
    for root, _, files in os.walk(test_directory):
        for file in files:
            test_suite_path = os.path.join(root, file)
            # Test binaries are the files without a file extension.
            _, ext = os.path.splitext(file)
            if ext == "":
                test_binaries.append(test_suite_path)
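    # Run every binary through a fixed pool of 64 worker processes;
    # the pool simply queues the work when fewer CPUs are available.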
    start = time.time()
    with multiprocessing.Pool(processes=64) as pool:
        pool.map(run_command, test_binaries)
    end = time.time()
    test_result = {
        "time_stamp": str(datetime.now()),
        "execute_time": 0,
        "total_execute_tests": 0,
        "failed_tests_count": 0,
        "unavailable": [],
        "failed": []
    }
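    # Aggregate the per-binary XML reports; a binary with no report is
    # recorded as unavailable (e.g. it crashed before gtest could write one).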
    total_tests_count = 0
    failed_tests_count = 0
    for test_binary in test_binaries:
        xml_file_path = "{}.xml".format(test_binary)
        if os.path.exists(xml_file_path):
            failed_info = parse_xml(xml_file_path)
            total_tests_count += int(failed_info.get('total_count', '0'))
            failed_tests_count += int(failed_info.get('failed_count', '0'))
            if int(failed_info.get('failed_count', '0')):
                test_result['failed'].append(failed_info)
        else:
            test_result["unavailable"].append(os.path.basename(test_binary))
    test_result["execute_time"] = "{} seconds".format(round(end - start, 2))
    test_result['total_execute_tests'] = total_tests_count
    test_result['failed_tests_count'] = failed_tests_count
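    # Write the summary with owner-only read/write permissions (0o600).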
    json_file_path = os.path.join(test_directory, "test_result.json")
    flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC
    mode = stat.S_IRUSR | stat.S_IWUSR
    with os.fdopen(os.open(json_file_path, flags, mode), 'w') as json_file:
        json.dump(test_result, json_file, indent=2)

    print("The test results have been generated, path is {}".format(json_file_path))


def get_tests_out_path():
    """
    Obtain the output directory of test cases
    """
    code_path = os.getcwd()
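    # Assumes the current working directory sits six levels below the
    # repository root; walk up to the root, then into the rk3568 out dir.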
    for _ in range(6):
        code_path = os.path.dirname(code_path)
    code_path = os.path.join(code_path, "out/rk3568/clang_x64/tests/unittest/ace_engine")
    return code_path


def main():
    """
    Parse the unittest execution arguments and run the requested tests.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-t", "--target", nargs='+', type=str, default=None,
                        help="names of the test suites to run")
    tests_out_path = get_tests_out_path()
    args = parser.parse_args()
    targets = args.target
    if targets is not None:
        for target in targets:
            run_single_test(tests_out_path, target)
    else:
        run_tests_parallel(tests_out_path)


if __name__ == "__main__":
    main()