1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3
4#
5# Copyright (c) 2024 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import shutil
21import re
22
23from util import exec_command, compare_target_files, file_exists
24from util import get_time_stamp, print_success, print_failure
25from util import get_idl
26
27
class Test:
    """Base class for idl code-generation test cases.

    An instance builds a command line for the idl tool, runs it through
    ``exec_command`` and checks the outcome against reference data stored in
    a per-test working directory. Subclasses are expected to override
    ``get_file_name`` (to locate their working directory) and ``run``.
    """

    # Diagnostic line: "<prefix>: [<func>:<line>] [<tag>error:<message>".
    # The numeric line number is matched but not captured, so two diagnostics
    # that differ only in that number compare as equal.
    _FAIL_LINE_PATTERN = re.compile(r"(.*): \[(\S+):\d+\] \[?(.*)error:(.*)")

    # Class-level default so set_cmd_test_env() does not raise AttributeError
    # when set_command_attr() has not been called yet.
    _command_attr = ""

    def __init__(self):
        """Set up the default (``--gen-cpp``) paths and command line."""
        self.name = self.__class__.__name__
        self._file_name = self.get_file_name()
        self.working_dir = self.get_working_dir()
        self.idl_file = os.path.join(self.working_dir, "foo", "IFoo.idl")
        self.output_dir = os.path.join(self.working_dir, "out")
        self.target_dir = os.path.join(self.working_dir, "target")
        self._gen_langauge = "--gen-cpp"
        # Value returned by util.get_idl(); used as the first token of the
        # command line (presumably the idl executable path -- confirm in util).
        self._idl = get_idl()
        self._command_format = "{} {} --intf-type sa -c {} -d {}"
        self.command = self._command_format.format(self._idl, self._gen_langauge, self.idl_file, self.output_dir)

    def get_file_name(self):
        """Return the test's file name; to be implemented by subclasses."""
        return ""

    def get_working_dir(self):
        """Return ``<directory of this module>/<stem of self._file_name>``."""
        current_path = os.path.dirname(os.path.abspath(__file__))
        return os.path.join(current_path, os.path.splitext(os.path.basename(self._file_name))[0])

    def set_command_attr(self, attr):
        """Store the extra command-line text used by ``set_cmd_test_env``."""
        self._command_attr = attr

    def set_command_gen_langauge(self, langauge):
        """Store the generator flag (e.g. ``--gen-cpp``) for ``update_command``."""
        self._gen_langauge = langauge

    def set_output_dir(self, output_dir):
        """Set the output directory, given relative to the working directory."""
        self.output_dir = os.path.join(self.working_dir, output_dir)

    def set_target_dir(self, target_dir):
        """Set the reference directory, given relative to the working directory."""
        self.target_dir = os.path.join(self.working_dir, target_dir)

    def set_idl_file(self, idl_file):
        """Select the input idl file inside ``<working_dir>/foo``."""
        self.idl_file = os.path.join(self.working_dir, "foo", idl_file)

    def update_command(self):
        """Rebuild ``self.command`` from the current flag, idl file and output dir."""
        self.command = self._command_format.format(self._idl, self._gen_langauge, self.idl_file, self.output_dir)

    def set_gen_c_env(self):
        """Switch to ``--gen-c`` with the ``out_c`` / ``target_c`` directories."""
        self.set_command_gen_langauge("--gen-c")
        self.set_output_dir("out_c")
        self.set_target_dir("target_c")
        self.update_command()

    def set_gen_cpp_env(self):
        """Switch to ``--gen-cpp`` with the ``out_cpp`` / ``target_cpp`` directories."""
        self.set_command_gen_langauge("--gen-cpp")
        self.set_output_dir("out_cpp")
        self.set_target_dir("target_cpp")
        self.update_command()

    def set_gen_rust_env(self):
        """Switch to ``--gen-rust`` with the ``out_rust`` / ``target_rust`` directories."""
        self.set_command_gen_langauge("--gen-rust")
        self.set_output_dir("out_rust")
        self.set_target_dir("target_rust")
        self.update_command()

    def set_gen_ts_env(self):
        """Switch to ``--gen-ts``; this mode also uses ``IFooTs.idl`` as input."""
        self.set_command_gen_langauge("--gen-ts")
        self.set_output_dir("out_ts")
        self.set_target_dir("target_ts")
        self.set_idl_file("IFooTs.idl")
        self.update_command()

    def set_cmd_test_env(self):
        """Switch to the command-option layout: no generator flag, no ``-d``.

        The command becomes ``<idl> --intf-type sa -c <idl_file> <attr>``,
        where ``<attr>`` is the value given to ``set_command_attr`` (empty
        string if it was never set).
        """
        self._command_format = "{} --intf-type sa -c {} {}"
        self.command = self._command_format.format(self._idl, self.idl_file, self._command_attr)

    def run(self):
        """Execute the test case; subclasses add the test code here.

        Returns:
            bool: the base implementation always returns False.
        """
        return False

    def run_choose(self, choose, no_output=False):
        """Run the success or failure scenario, then delete the output dir.

        Args:
            choose: truthy to call ``run_success``, falsy to call ``run_fail``.
            no_output: forwarded to ``run_success``.

        Returns:
            bool: result of the selected scenario.
        """
        try:
            if choose:
                return self.run_success(no_output)
            return self.run_fail()
        finally:
            # Clean up even when the scenario raises, so a crashed run does
            # not leave generated files behind for the next test.
            self.remove_output()

    def deal_result(self, result):
        """Post-process command output before comparison; subclasses may override."""
        return result

    def run_success(self, no_output=False):
        """Run the command and check that it succeeds with the expected result.

        Args:
            no_output: when exactly ``True``, only the exit status is checked.

        Returns:
            bool: False if the exit status is non-zero. Otherwise, if
            ``<target_dir>/output.txt`` exists, whether the (post-processed)
            command output equals that file's content; else the result of
            ``compare_target_files(output_dir, target_dir)``.
        """
        status, result = exec_command(self.command)
        if status != 0:
            print_failure(f"[ERROR] command:{self.command} run err")
            return False
        if no_output is True:
            return True

        expected_path = os.path.join(self.target_dir, "output.txt")
        if file_exists(expected_path):
            with open(expected_path, 'r') as target_output:
                expected_output = target_output.read()
            if self.deal_result(result) == expected_output:
                return True
            print_failure(f"[ERROR] command:{self.command} not meet expectations")
            return False
        return compare_target_files(self.output_dir, self.target_dir)

    def hdi_gen_fail_check_ignore_line(self, result: str, target: str):
        """Compare two failure outputs line by line, ignoring source line numbers.

        Lines matching ``_FAIL_LINE_PATTERN`` are compared by their captured
        parts only; all other lines must be identical.

        Args:
            result: actual command output.
            target: expected output.

        Returns:
            bool: True when every line pair matches, False otherwise (the
            first mismatch is reported through ``print_failure``).
        """
        result_lines = result.split("\n")
        target_lines = target.split("\n")
        if len(result_lines) != len(target_lines):
            print_failure(
                f"[ERROR] result line({len(result_lines)}) != target line({len(target_lines)})")
            return False

        for result_line, target_line in zip(result_lines, target_lines):
            lhd_obj = self._FAIL_LINE_PATTERN.search(result_line)
            rhd_obj = self._FAIL_LINE_PATTERN.search(target_line)
            if lhd_obj and rhd_obj:
                # Both are diagnostics: compare all captured groups at once.
                matched = lhd_obj.groups() == rhd_obj.groups()
            elif lhd_obj or rhd_obj:
                # Only one side is a diagnostic line.
                matched = False
            else:
                matched = result_line == target_line

            if not matched:
                print_failure(f"[ERROR] actual: {result_line}")
                print_failure(f"[ERROR] expect: {target_line}")
                return False

        return True

    def run_fail(self):
        """Run the command and check that it fails with the expected output.

        Returns:
            bool: True when the exit status is non-zero and the output matches
            ``<target_dir>/fail_output.txt`` (line numbers ignored).

        Raises:
            OSError: if ``fail_output.txt`` cannot be opened.
        """
        status, result = exec_command(self.command)
        with open(os.path.join(self.target_dir, "fail_output.txt"), 'r') as target_output:
            expected_fail_output = target_output.read()

        return status != 0 and self.hdi_gen_fail_check_ignore_line(result, expected_fail_output)

    def remove_output(self):
        """Delete the output directory tree if it exists."""
        if os.path.exists(self.output_dir):
            shutil.rmtree(self.output_dir)

    def test(self):
        """Run the test case with timing and print a RUN / OK / FAILED report.

        Returns:
            The value returned by ``run``.
        """
        print_success("[ RUN       ] {}".format(self.name))
        start_time = get_time_stamp()
        result = self.run()
        end_time = get_time_stamp()

        if result:
            print_success("[        OK ] {} ({}ms)".format(self.name, end_time - start_time))
        else:
            print_failure("[    FAILED ] {} ({}ms)".format(self.name, end_time - start_time))
        return result
195