1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 #
5 # Copyright (c) 2023 Huawei Device Co., Ltd.
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #     http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 
18 
19 import os
20 import re
21 import shutil
22 import sys
23 import subprocess
24 import time
25 import copy
26 import queue
27 import select
28 import pty
29 import pytest
30 
31 from datetime import datetime, timedelta
32 
33 sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "mylogger.py"))
34 from mylogger import get_logger, parse_json
35 
36 Log = get_logger("build_option")
37 current_file_path = os.path.abspath(__file__)
38 script_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
39 log_info = Log.info
40 log_error = Log.error
41 
42 config = parse_json()
43 if not config:
44     log_error("config file: build_example.json not exist")
45     sys.exit(0)
46 
47 out_dir = os.path.join(script_path, "out")
48 exclude = config.get("build_option").get("exclude")
49 try:
50     if os.path.exists(out_dir):
51         for tmp_dir in os.listdir(out_dir):
52             if tmp_dir in exclude:
53                 continue
54             if os.path.isdir(os.path.join(out_dir, tmp_dir)):
55                 shutil.rmtree(os.path.join(out_dir, tmp_dir))
56             else:
57                 os.remove(os.path.join(out_dir, tmp_dir))
58 except Exception as err:
59     log_error(err)
60 
61 
62 @pytest.fixture()
63 def init_build_env():
64     def find_top_dir():
65         cur_dir = os.getcwd()
66         while cur_dir != "/":
67             build_scripts = os.path.join(
68                 cur_dir, 'build/scripts/build_package_list.json')
69             if os.path.exists(build_scripts):
70                 return cur_dir
71             cur_dir = os.path.dirname(cur_dir)
72 
73     os.chdir(find_top_dir())
74     subprocess.run(['repo', 'forall', '-c', 'git reset --hard'],
75                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
76     subprocess.run(['repo', 'forall', '-c', 'git clean -dfx'],
77                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
78 
79 
80 class TestBuildOption:
81     FLAGS = {"gn": {"pattern": r"Excuting gn command", "flag": False},
82              "done": {"pattern": r"Done\. Made \d+ targets from \d+ files in \d+ms", "flag": False},
83              "ninja": {"pattern": r"Excuting ninja command", "flag": False},
84              "success": {"pattern": r"=====build  successful=====", "flag": False}
85              }
86 
87     try:
88         LOG_PATH = script_path + config.get("build_option").get("log_path")
89         CMD = script_path + config.get("build_option").get("common_cmd")
90         NINJIA_CMD = script_path + config.get("build_option").get("ninjia_cmd")
91         TIMEOUT = int(config.get("build_option").get("exec_timeout"))
92         TIME_OVER = int(config.get("build_option").get("file_time_intever"))
93         COMMAND_TYPE = config.get("build_option").get("cmd_type")
94         PTYFLAG = True if config.get("build_option").get("ptyflag").lower() == "true" else False
95         select_timeout = float(config.get("build_option").get("select_timeout"))
96         log_info("TIMEOUT:{}".format(TIMEOUT))
97         log_info("COMMAND_TYPE:{}".format(COMMAND_TYPE))
98         log_info("TIME_OVER:{}".format(TIME_OVER))
99         log_info("PTYFLAG:{}".format(PTYFLAG))
100         log_info("select_timeout:{}".format(select_timeout))
101     except Exception as err:
102         log_error("build_example.json has error")
103         log_error(err)
104         raise err
105 
106     @staticmethod
107     def exec_command_communicate(cmd, timeout=60):
108         try:
109             log_info("communicate_exec cmd is :{}".format(" ".join(cmd)))
110             proc = subprocess.Popen(
111                 cmd,
112                 stdout=subprocess.PIPE,
113                 stderr=subprocess.PIPE,
114                 encoding="utf-8",
115                 universal_newlines=True,
116                 errors='ignore',
117                 cwd=script_path
118             )
119             out, err_ = proc.communicate(timeout=timeout)
120             out_res = out.splitlines() + err_.splitlines()
121             return out_res, proc.returncode
122         except Exception as errs:
123             log_error("An error occurred: {}".format(errs))
124             raise Exception("exec cmd time out,communicate")
125 
126     @staticmethod
127     def resolve_res(cmd_res, flag_res):
128         for line_count, line in enumerate(cmd_res):
129             for flag_name, value in flag_res.items():
130                 re_match = re.search(value["pattern"], line)
131                 if re_match:
132                     log_info("【match success {}】:{}\n".format(line_count, line))  # 输出命令终端的显示
133                     if len(re_match.groups()) > 0:
134                         if isinstance(flag_res[flag_name]["flag"], bool):
135                             flag_res[flag_name]["flag"] = [re_match.group(1)]
136                         else:
137                             data = flag_res[flag_name]["flag"]
138                             data.append(re_match.group(1))
139                             flag_res[flag_name]["flag"] = data
140                     else:
141                         flag_res[flag_name]["flag"] = True
142         return flag_res
143 
144     @staticmethod
145     def check_flags(flags, expect_dict=None, returncode=0):
146         new_dict = copy.deepcopy(flags)
147         if returncode != 0:
148             log_error("returncode != 0")
149             return 1
150         if expect_dict:
151             error_count = 0
152             for k in expect_dict.keys():
153                 flags.pop(k)
154                 if k in new_dict and new_dict[k]["flag"] != expect_dict[k]:
155                     error_count += 1
156             if error_count != 0:
157                 log_error("【actual_result】:{}\n".format(new_dict))
158                 return 1
159         check_li = [item for item in flags.values() if not item["flag"]]
160         log_info("【expect_result】:{}\n".format(expect_dict))
161         log_info("【actual_result】:{}\n".format(new_dict))
162         if len(check_li) > 0:
163             return 1
164         return 0
165 
166     @staticmethod
167     def is_exist(path):
168         if os.path.exists(path):
169             return True
170         return False
171 
172     @staticmethod
173     def same_element(list1, list2):
174         for el in list1:
175             if el not in list2:
176                 return False
177         return True
178 
179     @staticmethod
180     def print_error_line(cmd_res, is_success=False):
181         if is_success:
182             for ind, line in enumerate(cmd_res):
183                 log_info("【{}】:{}".format(ind, line))
184         else:
185             for ind, line in enumerate(cmd_res):
186                 log_error("【{}】:{}".format(ind, line))
187 
188     @staticmethod
189     def get_build_only_gn_flags(para_value):
190         flags = copy.deepcopy(TestBuildOption.FLAGS)
191         expect_dict = {}
192 
193         if para_value.lower() == "true":
194             expect_dict["ninja"] = False
195         else:
196             expect_dict["ninja"] = True
197 
198         return flags, expect_dict
199 
200     @staticmethod
201     def get_ccache_flags(para_value):
202         flags = copy.deepcopy(TestBuildOption.FLAGS)
203         expect_dict = {}
204         flags["ccache"] = {"pattern": r"Excuting gn command.*ohos_build_enable_ccache=true", "flag": False}
205 
206         if para_value.lower() == "true":
207             expect_dict["ccache"] = True
208         else:
209             expect_dict["ccache"] = False
210 
211         return flags, expect_dict
212 
213     @staticmethod
214     def get_target_cpu_flags(para_value):
215         flags = copy.deepcopy(TestBuildOption.FLAGS)
216         expect_dict = {"loader": True}
217 
218         flags["loader"] = {"pattern": r"loader args.*'target_cpu={}".format(para_value), "flag": False}
219 
220         return flags, expect_dict
221 
222     @staticmethod
223     def get_rename_last_log_flags(para_value):
224         flags = copy.deepcopy(TestBuildOption.FLAGS)
225         expect_dict = {}
226         return flags, expect_dict
227 
228     @staticmethod
229     def get_enable_pycache_flags(para_value):
230         flags = copy.deepcopy(TestBuildOption.FLAGS)
231         expect_dict = {}
232         if para_value.lower() == "true":
233             expect_dict["pycache"] = True
234         else:
235             expect_dict["pycache"] = False
236         flags["pycache"] = {"pattern": r"Starting pycache daemon at", "flag": False}
237         flags["os_level"] = {"pattern": r"loader args.*os_level=([a-zA-Z]+)\'", "flag": False}
238         flags["root_dir"] = {"pattern": r"""loader args.*source_root_dir="([a-zA-Z\d/\\_]+)""""", "flag": False}
239         flags["gn_dir"] = {"pattern": r"""loader args.*gn_root_out_dir="([a-zA-Z\d/\\_]+)""""", "flag": False}
240         flags["start_end_time"] = {"pattern": r"(\d+-\d+-\d+ \d+:\d+:\d+)", "flag": False}
241         flags["cost_time"] = {"pattern": r"Cost time:.*(\d+:\d+:\d+)", "flag": False}
242         return flags, expect_dict
243 
244     @staticmethod
245     def get_build_target_flags(para_value):
246         flags = copy.deepcopy(TestBuildOption.FLAGS)
247         flags["use_thin"] = {"pattern": r"Excuting gn command.*use_thin_lto=false.*", "flag": False}
248         flags["ninja_build_target"] = {"pattern": r"Excuting ninja command.*{}$".format(para_value), "flag": False}
249         expect_dict = {}
250         test_target_list = ['build_all_test_pkg', 'package_testcase', 'package_testcase_mlf']
251 
252         if para_value.endswith('make_test') or para_value.split(':')[-1] in test_target_list:
253             expect_dict["use_thin"] = True
254             expect_dict["ninja_build_target"] = True
255         else:
256             expect_dict["use_thin"] = False
257             expect_dict["ninja_build_target"] = True
258         return flags, expect_dict
259 
260     @staticmethod
261     def get_ninja_args_flags(para_value):
262         flags = copy.deepcopy(TestBuildOption.FLAGS)
263         expect_dict = {"ninja_args": True}
264         flags["ninja_args"] = {"pattern": r"Excuting ninja command.*{}".format(para_value), "flag": False}
265 
266         return flags, expect_dict
267 
268     @staticmethod
269     def get_full_compilation_flags(para_value):
270         flags = copy.deepcopy(TestBuildOption.FLAGS)
271         flags["full_compilation_gn"] = {"pattern": r"Excuting gn command.*use_thin_lto=false.*", "flag": False}
272         flags["full_compilation_ninja"] = {"pattern": r"Excuting ninja command.*make_all make_test$", "flag": False}
273         expect_dict = {}
274 
275         if para_value in ["", "True"]:
276             expect_dict["full_compilation_gn"] = True
277             expect_dict["full_compilation_ninja"] = True
278         else:
279             expect_dict["full_compilation_gn"] = False
280             expect_dict["full_compilation_ninja"] = False
281 
282         return flags, expect_dict
283 
284     @staticmethod
285     def get_strict_mode_flags(para_value):
286         flags = copy.deepcopy(TestBuildOption.FLAGS)
287         expect_dict = {}
288 
289         return flags, expect_dict
290 
291     @staticmethod
292     def get_scalable_build_flags(para_value):
293         flags = copy.deepcopy(TestBuildOption.FLAGS)
294         expect_dict = {}
295 
296         return flags, expect_dict
297 
298     @staticmethod
299     def get_build_example_flags(para_value):
300         flags = copy.deepcopy(TestBuildOption.FLAGS)
301         build_example_file_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(
302             os.path.abspath(__file__)))), "subsystem_config_example.json")
303         flags["build_example"] = {
304             "pattern": r"loader args.*example_subsystem_file=.*{}.*".format(build_example_file_path), "flag": False}
305         expect_dict = {}
306 
307         if para_value.lower() == "true":
308             expect_dict["build_example"] = True
309         else:
310             expect_dict["build_example"] = False
311 
312         return flags, expect_dict
313 
314     @staticmethod
315     def get_build_platform_name_flags(para_value):
316         flags = copy.deepcopy(TestBuildOption.FLAGS)
317         expect_dict = {}
318 
319         if para_value == "phone":
320             flags["build_platform"] = {
321                 "pattern": r"loader args.*build_platform_name=phone", "flag": False}
322             expect_dict["build_platform"] = True
323 
324         return flags, expect_dict
325 
326     @staticmethod
327     def get_build_xts_flags(para_value):
328         flags = copy.deepcopy(TestBuildOption.FLAGS)
329         expect_dict = {}
330 
331         flags["build_xts"] = {"pattern": r"loader args.*build_xts={}.*".format(para_value.capitalize()), "flag": False}
332 
333         return flags, expect_dict
334 
335     @staticmethod
336     def get_ignore_api_check_flags(para_value):
337         flags = copy.deepcopy(TestBuildOption.FLAGS)
338         expect_dict = {}
339 
340         if para_value == "":
341             flags["ignore_api_check"] = {"pattern": r"loader args.*ignore_api_check=\['xts', 'common', 'testfwk'\]",
342                                          "flag": False}
343         else:
344             flags["ignore_api_check"] = {
345                 "pattern": r"loader args.*ignore_api_check=(.*)\",",
346                 "flag": False}
347 
348         return flags, expect_dict
349 
350     @staticmethod
351     def get_load_test_config_flags(para_value):
352         flags = copy.deepcopy(TestBuildOption.FLAGS)
353         expect_dict = {}
354 
355         flags["load_test"] = {"pattern": r"loader args.*load_test_config={}.*".format(para_value.capitalize()),
356                               "flag": False}
357 
358         return flags, expect_dict
359 
360     @staticmethod
361     def get_build_type_flags(para_value):
362         flags = copy.deepcopy(TestBuildOption.FLAGS)
363         expect_dict = {}
364         flags["build_type_debug"] = {"pattern": r"Excuting gn command.*is_debug=true",
365                                      "flag": False}
366         flags["build_type_profile"] = {"pattern": r"Excuting gn command.*is_profile=true",
367                                        "flag": False}
368         flags["build_type_none"] = {
369             "pattern": r'Excuting gn command.*ohos_build_type=\\"debug\\"',
370             "flag": False}
371 
372         if para_value == "debug":
373             expect_dict["build_type_debug"] = True
374             expect_dict["build_type_profile"] = False
375             expect_dict["build_type_none"] = True
376         elif para_value == "profile":
377             expect_dict["build_type_debug"] = False
378             expect_dict["build_type_profile"] = True
379             expect_dict["build_type_none"] = True
380         else:
381             expect_dict["build_type_debug"] = False
382             expect_dict["build_type_profile"] = False
383             expect_dict["build_type_none"] = True
384 
385         return flags, expect_dict
386 
387     @staticmethod
388     def get_log_level_flags(para_value):
389         flags = copy.deepcopy(TestBuildOption.FLAGS)
390 
391         flags["tracelog"] = {"pattern": r"Excuting gn command.*--tracelog=.*/gn_trace.log.*--ide=json", "flag": False}
392         flags["ninja_v"] = {"pattern": r"Excuting ninja command.*-v.*", "flag": False}
393         expect_dict = {}
394 
395         if para_value == "info":
396             expect_dict["tracelog"] = False
397             expect_dict["ninja_v"] = False
398         elif para_value == "debug":
399             expect_dict["tracelog"] = True
400             expect_dict["ninja_v"] = True
401 
402         return flags, expect_dict
403 
404     @staticmethod
405     def get_test_flags(para_value):
406         flags = copy.deepcopy(TestBuildOption.FLAGS)
407         expect_dict = {}
408         flags["notest"] = {"pattern": r'Excuting gn command.*ohos_test_args=\\"notest\\"',
409                            "flag": False}
410         flags["xts"] = {"pattern": r'Excuting gn command.*ohos_xts_test_args=\\"xxx\\"',
411                         "flag": False}
412 
413         if para_value == "":
414             expect_dict["notest"] = False
415             expect_dict["xts"] = False
416         elif para_value == "notest xxx":
417             expect_dict["notest"] = True
418             expect_dict["xts"] = False
419         elif para_value in ["xts xxx", "xxx xts"]:
420             expect_dict["notest"] = False
421             expect_dict["xts"] = True
422         elif para_value == "xxx ccc":
423             expect_dict["notest"] = False
424             expect_dict["xts"] = False
425 
426         return flags, expect_dict
427 
428     @staticmethod
429     def get_gn_args_flags(para_value):
430         flags = copy.deepcopy(TestBuildOption.FLAGS)
431         expect_dict = {}
432 
433         flags["device_type"] = {
434             "pattern": r'Excuting gn command.*device_type=\\"default\\"', "flag": False}
435         flags["build_variant"] = {
436             "pattern": r'Excuting gn command.*build_variant=\\"root\\"', "flag": False}
437         flags["para"] = {
438             "pattern": r'Excuting gn command.*{}'.format(para_value), "flag": False}
439 
440         return flags, expect_dict
441 
442     @staticmethod
443     def get_fast_rebuild_flags(para_value):
444         flags = copy.deepcopy(TestBuildOption.FLAGS)
445         expect_dict = {}
446 
447         if para_value.lower() == "true" or para_value == "":
448             expect_dict["gn"] = False
449             expect_dict["done"] = False
450         return flags, expect_dict
451 
452     @staticmethod
453     def get_skip_partlist_check_flags(para_value):
454         flags = copy.deepcopy(TestBuildOption.FLAGS)
455         expect_dict = {}
456         partlist_flag = True if para_value.lower() == "true" else False
457         flags["partlist"] = {"pattern": r"loader args.*skip_partlist_check={}".format(partlist_flag), "flag": False}
458         return flags, expect_dict
459 
460     @staticmethod
461     def get_deps_guard_flags(para_value):
462         flags = copy.deepcopy(TestBuildOption.FLAGS)
463         expect_dict = {}
464         flags["os_level"] = {"pattern": r"loader args.*os_level=([a-zA-Z]+)\'", "flag": False}
465         return flags, expect_dict
466 
467     @staticmethod
468     def get_compute_overlap_rate_flags(para_value):
469         flags = copy.deepcopy(TestBuildOption.FLAGS)
470         flags["c_targets"] = {"pattern": r"c targets overlap rate statistics", "flag": False}
471         flags["c_overall"] = {"pattern": r"c overall build overlap rate", "flag": False}
472         expect_dict = {}
473 
474         if para_value.lower() in ("true", ""):
475             expect_dict["c_targets"] = True
476             expect_dict["c_overall"] = True
477         else:
478             expect_dict["c_targets"] = False
479             expect_dict["c_overall"] = False
480         return flags, expect_dict
481 
482     @staticmethod
483     def get_stat_ccache_flags(para_value):
484         flags = copy.deepcopy(TestBuildOption.FLAGS)
485         expect_dict = {}
486         flags["ccache_dir"] = {"pattern": r"ccache_dir =.*, ccache_exec =.*", "flag": False}
487         flags["ccache_summary"] = {"pattern": r"ccache summary", "flag": False}
488 
489         if para_value.lower() in ("true", ""):
490             expect_dict["ccache_dir"] = True
491             expect_dict["ccache_summary"] = True
492         else:
493             expect_dict["ccache_dir"] = False
494             expect_dict["ccache_summary"] = False
495 
496         return flags, expect_dict
497 
498     @staticmethod
499     def get_keep_ninja_going_flags(para_value):
500         flags = copy.deepcopy(TestBuildOption.FLAGS)
501         expect_dict = {}
502         if para_value.lower() == "true":
503             flags.setdefault("ninja", {}).setdefault("pattern", r"Excuting ninja command.*-k1000000.*")
504 
505         return flags, expect_dict
506 
507     @staticmethod
508     def get_common_flags(para_value, check_file=False):
509         flags = copy.deepcopy(TestBuildOption.FLAGS)
510         expect_dict = {}
511         if check_file:
512             flags["os_level"] = {"pattern": r"loader args.*os_level=([a-zA-Z]+)\'", "flag": False}
513             flags["root_dir"] = {"pattern": r"""loader args.*source_root_dir="([a-zA-Z\d/\\_]+)""""", "flag": False}
514             flags["gn_dir"] = {"pattern": r"""loader args.*gn_root_out_dir="([a-zA-Z\d/\\_]+)""""", "flag": False}
515             flags["start_end_time"] = {"pattern": r"(\d+-\d+-\d+ \d+:\d+:\d+)", "flag": False}
516             flags["cost_time"] = {"pattern": r"Cost time:.*(\d+:\d+:\d+)", "flag": False}
517         return flags, expect_dict
518 
519     @staticmethod
520     def check_file_res(resolve_result, file_list, is_real_path=False, time_over=TIME_OVER):
521         root_dir = resolve_result["root_dir"]["flag"][0]
522         gn_dir = resolve_result["gn_dir"]["flag"][0]
523         start_time_str = resolve_result["start_end_time"]["flag"][0]
524         end_time_str = resolve_result["start_end_time"]["flag"][-1]
525 
526         start_time = datetime.strptime(start_time_str, "%Y-%m-%d %H:%M:%S")
527         end_time = datetime.strptime(end_time_str, "%Y-%m-%d %H:%M:%S")
528 
529         start_timestamp = int(datetime.timestamp(start_time))
530         end_timestamp = int(datetime.timestamp(end_time))
531 
532         file_list_new = []
533         for tmp_file in file_list:
534             real_path = tmp_file if is_real_path else os.path.join(root_dir, gn_dir, tmp_file)
535             if os.path.exists(real_path):
536                 file_list_new.append(real_path)
537         if not file_list_new:
538             log_info("all file {} not exist".format(file_list))
539             return True
540         file_timestamp_li = {tmp_file: int(os.stat(tmp_file).st_mtime) for tmp_file in file_list_new}
541 
542         cost_time_str = resolve_result["cost_time"]["flag"][0]
543         cost_time = datetime.strptime(cost_time_str, "%H:%M:%S")
544         cost_time_int = timedelta(hours=cost_time.hour, minutes=cost_time.minute, seconds=cost_time.second)
545         total_seconds = int(cost_time_int.total_seconds())
546         new_start_timestamp = end_timestamp - total_seconds
547         log_info("log_cost_time:{}s".format(total_seconds))
548         log_info("start_timestamp:{}".format(start_timestamp))
549         log_info("new_start_timestamp:{}".format(new_start_timestamp))
550         log_info("end_timestamp:{}".format(end_timestamp))
551         file_flag = False
552         file_tmp_flag_li = []
553 
554         for file_tmp, file_timestamp in file_timestamp_li.items():
555             log_info("{}:{}".format(file_tmp, file_timestamp))
556             file_tmp_flag = new_start_timestamp - time_over <= file_timestamp <= end_timestamp + time_over
557             file_tmp_flag_li.append(file_tmp_flag)
558 
559         if all(file_tmp_flag_li):
560             file_flag = True
561 
562         return file_flag
563 
564     @pytest.mark.parametrize('cpu_para', ['arm', 'arm64', 'x86_64'])
565     def test_target_cpu(self, cpu_para):
566         """
567         test target_cpu parameter
568         """
569         cmd = self.CMD.format('--target-cpu', cpu_para).split()
570 
571         result = self.get_match_result(cmd, "target_cpu", cpu_para)
572 
573         assert result == 0, "target cpu para {} failed".format(cpu_para)
574 
575     @pytest.mark.parametrize('ccache_para', ['True', 'False'])
576     def test_ccache(self, ccache_para):
577         """
578         test ccache_para parameter
579         """
580         cmd = self.CMD.format('--ccache', ccache_para).split()
581 
582         result = self.get_match_result(cmd, "ccache", ccache_para)
583 
584         assert result == 0, "ccache para {} failed".format(ccache_para)
585 
586     @pytest.mark.parametrize('rename_last_log_para', ['True', 'False'])
587     def test_rename_last_log(self, rename_last_log_para):
588         """
589         test rename_last_log parameter
590         """
591         cmd = self.CMD.format('--rename-last-log', rename_last_log_para).split()
592         mtime = ""
593         file_name = ""
594 
595         if self.is_exist(self.LOG_PATH):
596             mtime = os.stat(self.LOG_PATH).st_mtime
597             file_name = '{}/build.{}.log'.format(self.LOG_PATH, mtime)
598         log_info("test_rename_last_log,file name is {}".format(file_name))
599         result = self.get_match_result(cmd, "rename_last_log", rename_last_log_para)
600         new_path = os.path.join(os.path.dirname(self.LOG_PATH), "build.{}.log".format(mtime))
601         log_info("test_rename_last_log,new path is {}".format(new_path))
602 
603         if rename_last_log_para == 'True':
604             assert self.is_exist(new_path) and result == 0, "rename_last_log para {} failed".format(
605                 rename_last_log_para)
606         elif rename_last_log_para == 'False':
607             assert not self.is_exist(new_path) and result == 0, "rename_last_log para {} failed".format(
608                 rename_last_log_para)
609 
610     @pytest.mark.parametrize('build_target', ['', 'package_testcase'])
611     def test_build_target(self, build_target):
612         """
613         test build_target parameter
614         """
615         cmd = self.CMD.format('--build-target', build_target).split()
616 
617         result = self.get_match_result(cmd, "build_target", build_target)
618 
619         assert result == 0, "build target para {} failed".format(build_target)
620 
621     @pytest.mark.parametrize('ninja_args', ['-dkeeprsp'])
622     def test_ninja_args(self, ninja_args):
623         """
624         test ninja_args parameter
625         """
626         cmd = self.NINJIA_CMD.format(ninja_args).split()
627 
628         result = self.get_match_result(cmd, "ninja_args", ninja_args)
629 
630         assert result == 0, "ninja args para {} failed".format(ninja_args)
631 
632     @pytest.mark.parametrize('full_compilation', ['True', 'False', ''])
633     def test_full_compilation(self, full_compilation):
634         """
635         test full_compilation parameter
636         """
637         cmd = self.CMD.format('--full-compilation', full_compilation).split()
638 
639         result = self.get_match_result(cmd, "full_compilation", full_compilation)
640 
641         assert result == 0, "full compilation para {} failed".format(full_compilation)
642 
643     @pytest.mark.parametrize('strict_mode', ['True', 'False', 'false'])
644     def test_strict_mode(self, strict_mode):
645         """
646         test strict_mode parameter
647         """
648         cmd = self.CMD.format('--strict-mode', strict_mode).split()
649 
650         result = self.get_match_result(cmd, "strict_mode", strict_mode)
651 
652         assert result == 0, "strict mode para {} failed".format(strict_mode)
653 
654     @pytest.mark.parametrize('scalable_build', ['True', 'False', 'false'])
655     def test_scalable_build(self, scalable_build):
656         """
657         test scalable_build parameter
658         """
659         cmd = self.CMD.format('--scalable-build', scalable_build).split()
660 
661         result = self.get_match_result(cmd, "scalable_build", scalable_build)
662 
663         assert result == 0, "scalable build para {} failed".format(scalable_build)
664 
665     @pytest.mark.parametrize('build_example', ['True', 'False', 'true', 'false'])
666     def test_build_example(self, build_example):
667         """
668         test build_example parameter
669         """
670         cmd = self.CMD.format('--build-example', build_example).split()
671 
672         result = self.get_match_result(cmd, "build_example", build_example)
673 
674         assert result == 0, "build example para {} failed".format(build_example)
675 
676     @pytest.mark.parametrize('build_platform_name', ['phone'])
677     def test_build_platform_name(self, build_platform_name):
678         """
679         test build_platform_name parameter
680         """
681         cmd = self.CMD.format('--build-platform-name', build_platform_name).split()
682 
683         result = self.get_match_result(cmd, "build_platform_name", build_platform_name)
684 
685         assert result == 0, "build platform name para {} failed".format(build_platform_name)
686 
687     @pytest.mark.parametrize('build_xts', ['True', 'False', 'true', 'false'])
688     def test_build_xts(self, build_xts):
689         """
690         test build_xts parameter
691         """
692         cmd = self.CMD.format('--build-xts', build_xts).split()
693 
694         result = self.get_match_result(cmd, "build_xts", build_xts)
695 
696         assert result == 0, "build xts para {} failed".format(build_xts)
697 
698     @pytest.mark.parametrize('ignore_api_check', ['common xts', ''])
699     def test_ignore_api_check(self, ignore_api_check):
700         """
701         test ignore_api_check parameter
702         """
703         para_list = ignore_api_check.split()
704         cmd = self.CMD.format('--ignore-api-check', ignore_api_check).split()
705         resolve_result, result, _ = self.get_common_spec_result(ignore_api_check, cmd,
706                                                                 para_type="ignore_api_check")
707         if result != 0:
708             assert result == 0, "ignore api check para {} failed".format(ignore_api_check)
709         else:
710             if ignore_api_check:
711                 ignore_str = resolve_result.get("ignore_api_check").get("flag")[0]  # ['xts', 'common']
712                 log_info("ignore_str is {}".format(ignore_str))
713                 ignor_li = eval(ignore_str)
714                 log_info("ignor_li is {0},type is {1}".format(ignor_li, type(ignor_li)))
715                 assert self.same_element(para_list, ignor_li) and result == 0, "ignore api check para {} failed".format(
716                     ignore_api_check)
717 
718     @pytest.mark.parametrize('load_test_config', ['True', 'False', 'true', 'false'])
719     def test_load_test_config(self, load_test_config):
720         """
721         test load_test_config parameter
722         """
723         cmd = self.CMD.format('--load-test-config', load_test_config).split()
724 
725         result = self.get_match_result(cmd, "load_test_config", load_test_config)
726 
727         assert result == 0, "load test config para {} failed".format(load_test_config)
728 
729     @pytest.mark.parametrize('build_type', ['debug', 'release', 'profile'])
730     def test_build_type(self, build_type):
731         """
732         test build_type parameter
733         """
734         cmd = self.CMD.format('--build-type', build_type).split()
735         result = self.get_match_result(cmd, "build_type", build_type)
736 
737         assert result == 0, "build type para {} failed".format(build_type)
738 
739     @pytest.mark.parametrize('log_level', ['info', 'debug'])
740     def test_log_level(self, log_level):
741         """
742         test log_level parameter
743         """
744         cmd = self.CMD.format('--log-level', log_level).split()
745 
746         result = self.get_match_result(cmd, "log_level", log_level)
747 
748         assert result == 0, "log level para {} failed".format(log_level)
749 
750     @pytest.mark.parametrize('build_only_gn', ['True', 'False'])
751     def test_build_only_gn(self, build_only_gn):
752         """
753         test build_only_gn parameter
754         """
755         cmd = self.CMD.format('--build-only-gn', build_only_gn).split()
756 
757         result = self.get_match_result(cmd, "build_only_gn", build_only_gn)
758 
759         assert result == 0, "build only gn para {} failed".format(build_only_gn)
760 
761     @pytest.mark.parametrize('test', ['', 'notest xxx', 'xts xxx', 'xxx xts'])
762     def test_test(self, test):
763         """
764         test test parameter
765         """
766         cmd = self.CMD.format('--test', test).split()
767 
768         result = self.get_match_result(cmd, "test", test)
769 
770         assert result == 0, "test para {} failed".format(test)
771 
772     @pytest.mark.parametrize('gn_args', ['', 'is_debug=true'])
773     def test_gn_args(self, gn_args):
774         """
775         test gn_args parameter
776         """
777         cmd = self.CMD.format('--gn-args', gn_args).split()
778 
779         result = self.get_match_result(cmd, "gn_args", gn_args)
780 
781         assert result == 0, "gn args para {} failed".format(gn_args)
782 
783     @pytest.mark.parametrize('fast_rebuild', ['True', 'False', ''])
784     def test_fast_rebuild(self, fast_rebuild):
785         """
786         test fast_rebuild parameter
787         """
788         cmd = self.CMD.format('--fast-rebuild', fast_rebuild).split()
789 
790         result = self.get_match_result(cmd, "fast_rebuild", fast_rebuild)
791 
792         assert result == 0, "fast rebuild para {} failed".format(fast_rebuild)
793 
794     @pytest.mark.parametrize('going_option', ['True', 'False'])
795     def test_keep_ninja_going(self, going_option):
796         """
797         test keep_ninja_going parameter
798         """
799         cmd = self.CMD.format('--keep-ninja-going', going_option).split()
800 
801         result = self.get_match_result(cmd, "keep_ninja_going", going_option)
802 
803         assert result == 0, "keep_ninja_going para {} failed".format(going_option)
804 
805     @pytest.mark.parametrize('variant_option', ['user', 'root'])
806     def test_build_variant(self, variant_option):
807         """
808         test build_variant parameter
809         """
810         cmd = self.CMD.format('--build-variant', variant_option).split()
811 
812         resolve_result, result, _ = self.get_common_spec_result(variant_option, cmd)
813         if result != 0:
814             assert result == 0, "build_variant para {} failed".format(variant_option)
815         else:
816             root_dir = resolve_result.get("root_dir").get("flag")[0]
817             gn_dir = resolve_result.get("gn_dir").get("flag")[0]
818 
819             ohos_para_path = "packages/phone/system/etc/param/ohos.para"
820             if os.path.exists(os.path.join(root_dir, gn_dir, ohos_para_path)):
821                 check_file_li = [ohos_para_path]
822                 check_file_flag = self.check_file_res(resolve_result, check_file_li)
823                 assert result == 0 and check_file_flag, "build_variant para {} failed".format(variant_option)
824             else:
825                 assert result == 0, "build_variant para {} failed".format(variant_option)
826 
827     @pytest.mark.parametrize('device_option', ['default', 'unkown'])
828     def test_device_type(self, device_option):
829         """
830         test device_type parameter
831         """
832         cmd = self.CMD.format('--device-type', device_option).split()
833 
834         resolve_result, result, _ = self.get_common_spec_result(device_option, cmd)
835         if result != 0:
836             if device_option == "unkown":
837                 assert result == 1, "device_type para {} failed".format(device_option)
838             else:
839                 assert result == 0, "device_type para {} failed".format(device_option)
840 
841         else:
842             if device_option == "default":
843                 assert result == 0, "device_type para {} failed".format(device_option)
844             else:
845                 check_file_li = ["packages/phone/system/etc/param/ohos.para"]
846                 check_file_flag = self.check_file_res(resolve_result, check_file_li)
847                 assert result == 0 and check_file_flag, "device_type para {} failed".format(device_option)
848 
849     @pytest.mark.parametrize('archive_option', ['True', 'False'])
850     def test_archive_image(self, archive_option):
851         """
852         test archive_image parameter
853         """
854         cmd = self.CMD.format('--archive-image', archive_option).split()
855 
856         resolve_result, result, cmd_res = self.get_common_spec_result(archive_option, cmd)
857         if result != 0:
858             assert result == 0, "archive_image para {} failed".format(archive_option)
859         else:
860             root_dir = resolve_result.get("root_dir").get("flag")[0]
861             gn_dir = resolve_result.get("gn_dir").get("flag")[0]
862             image_path = os.path.join("packages", "phone", "images")
863             if archive_option.lower() == "true":
864                 if os.path.exists(os.path.join(root_dir, gn_dir, image_path)):
865                     check_file_li = ["images.tar.gz"]
866                     check_file_flag = self.check_file_res(resolve_result, check_file_li)
867                     assert result == 0 and check_file_flag, "archive_image para {} failed".format(
868                         archive_option)
869                 else:
870                     archive_flags = {"archive_image": {"pattern": r'"--archive-image" option not work', "flag": False}}
871                     archive_resolve_result = self.resolve_res(cmd_res, archive_flags)
872                     archive_result = self.check_flags(archive_resolve_result)
873                     assert result == 0 and archive_result == 0, "archive_image para {} failed".format(archive_option)
874             else:
875                 assert result == 0, "archive_image para {} failed".format(archive_option)
876 
877     @pytest.mark.parametrize('rom_option', ['True', 'False'])
878     def test_rom_size_statistics(self, rom_option):
879         """
880         test rom_size_statistics parameter
881         """
882         cmd = self.CMD.format('--rom-size-statistics', rom_option).split()
883 
884         resolve_result, result, _ = self.get_common_spec_result(rom_option, cmd, ptyflag=True)
885         if result != 0:
886             assert result == 0, "rom_size_statistics para {} failed".format(rom_option)
887         else:
888             os_level = resolve_result.get("os_level").get("flag")[0]
889             log_info("os_level:{}".format(os_level))
890             if os_level in ("mini", "small"):
891                 assert result == 0, "rom_size_statistics para {} failed".format(rom_option)
892             else:
893                 check_file_li = ["rom_statistics_table.json"]
894                 check_file_flag = self.check_file_res(resolve_result, check_file_li)
895                 if rom_option.lower() == "false":
896                     assert result == 0 and not check_file_flag, "rom_option para {} failed".format(
897                         rom_option)
898                 else:
899                     assert result == 0 and check_file_flag, "rom_option para {} failed".format(rom_option)
900 
901     @pytest.mark.parametrize('ccache_option', ['True', 'False'])
902     def test_stat_ccache(self, ccache_option):
903         """
904         test stat_ccache parameter
905         """
906         cmd = self.CMD.format('--stat-ccache', ccache_option).split()
907 
908         result = self.get_match_result(cmd, "stat_ccache", ccache_option)
909 
910         assert result == 0, "stat_ccache para {} failed".format(ccache_option)
911 
912     @pytest.mark.parametrize('warning_option', ['True', 'False'])
913     def test_get_warning_list(self, warning_option):
914         """
915         test get_warning_list parameter
916         """
917         cmd = self.CMD.format('--get-warning-list', warning_option).split()
918         resolve_result, result, _ = self.get_common_spec_result(warning_option, cmd)
919         if result != 0:
920             assert result == 0, "get_warning_list para {} failed".format(warning_option)
921         else:
922             check_file_li = ["packages/WarningList.txt"]
923             check_file_flag = self.check_file_res(resolve_result, check_file_li)
924             if warning_option.lower() == "false":
925                 assert result == 0 and not check_file_flag, "get_warning_list para {} failed".format(
926                     warning_option)
927             else:
928                 assert result == 0 and check_file_flag, "get_warning_list para {} failed".format(warning_option)
929 
930     @pytest.mark.parametrize('ninja_option', ["True", "False", "true", "false"])
931     def test_generate_ninja_trace(self, ninja_option):
932         """
933         test generate_ninja_trace parameter
934         """
935         cmd = self.CMD.format('--generate-ninja-trace', ninja_option).split()
936         resolve_result, result, _ = self.get_common_spec_result(ninja_option, cmd)
937         if result != 0:
938             assert result == 0, "generate_ninja_trace para {} failed".format(ninja_option)
939         else:
940             check_file_li = ["build.trace.gz", "sorted_action_duration.txt"]
941             check_file_flag = self.check_file_res(resolve_result, check_file_li)
942             if ninja_option.lower() == "false":
943                 assert result == 0 and not check_file_flag, "generate_ninja_trace para {} failed".format(
944                     ninja_option)
945             else:
946                 assert result == 0 and check_file_flag, "generate_ninja_trace para {} failed".format(
947                     ninja_option)
948 
949     @pytest.mark.parametrize('overlap_option', ['True', 'False'])
950     def test_compute_overlap_rate(self, overlap_option):
951         """
952         test compute_overlap_rate parameter
953         """
954         cmd = self.CMD.format('--compute-overlap-rate', overlap_option).split()
955         result = self.get_match_result(cmd, "compute_overlap_rate", overlap_option)
956 
957         assert result == 0, "compute_overlap_rate para {} failed".format(overlap_option)
958 
959     @pytest.mark.parametrize('clean_option', ['True', 'False'])
960     def test_clean_args(self, clean_option):
961         """
962         test clean-args parameter
963         """
964         cmd = self.CMD.format('--clean-args', clean_option).split()
965         resolve_result, result, _ = self.get_common_spec_result(clean_option, cmd)
966         if result != 0:
967             assert result == 0, "clean_args para {} failed".format(clean_option)
968         else:
969             root_dir = resolve_result.get("root_dir").get("flag")[0]
970             json_path = os.path.join(root_dir, "out", "hb_args")
971             json_file_li = [file for file in os.listdir(json_path) if os.path.splitext(file)[-1] == ".json"]
972             log_info("test_clean_args, json_file_li:{}".format(json_file_li))
973             if clean_option.lower() == "false":
974                 exist_flag = bool(json_file_li)
975             else:
976                 exist_flag = not json_file_li
977 
978             assert result == 0 and exist_flag, "clean_args para {} failed".format(clean_option)
979 
980     @pytest.mark.parametrize('deps_guard_option', ['True', 'False'])
981     def test_deps_guard(self, deps_guard_option):
982         """
983         test deps-guard parameter
984         """
985         cmd = self.CMD.format('--deps-guard', deps_guard_option).split()
986         resolve_result, result, cmd_res = self.get_common_spec_result(deps_guard_option, cmd,
987                                                                       para_type="deps_guard")
988         if result != 0:
989             assert result == 0, "deps_guard para {}failed.".format(deps_guard_option)
990         else:
991             os_level = resolve_result.get("os_level").get("flag")[0]
992             log_info("test_deps_guard,os_level:{}".format(os_level))
993             if deps_guard_option.lower() == "false" and os_level == "standard":
994                 standard_flags = {"Scanning": {"pattern": r"Scanning.*ELF files now", "flag": False},
995                                   "rules": {"pattern": r"All rules passed", "flag": False}}
996                 standard_resolve_result = self.resolve_res(cmd_res, standard_flags)
997                 log_info("continue match Scanning and rules ...")
998                 standard_result = self.check_flags(standard_resolve_result)
999                 assert result == 0 and standard_result == 0, "deps_guard para {},os_level {} failed.".format(
1000                     deps_guard_option, os_level)
1001             else:
1002                 assert result == 0, "deps_guard para {},os_level {} failed.".format(deps_guard_option, os_level)
1003 
1004     @pytest.mark.parametrize('partlist_option', ['True', 'False'])
1005     def test_skip_partlist_check(self, partlist_option):
1006         """
1007         test skip-partlist-check parameter
1008         """
1009         cmd = self.CMD.format('--skip-partlist-check', partlist_option).split()
1010         result = self.get_match_result(cmd, "skip_partlist_check", partlist_option)
1011         assert result == 0, "skip_partlist_check para {} failed".format(partlist_option)
1012 
1013     @pytest.mark.parametrize('enable_pycache', ['True', 'true', 'False', 'false'])
1014     def test_enable_pycache(self, enable_pycache):
1015         """
1016         test enable_pycache parameter
1017         """
1018         cmd = self.CMD.format('--enable-pycache', enable_pycache).split()
1019 
1020         pycache_dir = os.environ.get('CCACHE_BASE')
1021         if not pycache_dir:
1022             pycache_dir = os.environ.get('HOME')
1023         pycache_config = os.path.join(pycache_dir, '.pycache', '.config')
1024         resolve_result, result, _ = self.get_common_spec_result(enable_pycache, cmd,
1025                                                                 para_type="enable_pycache", ptyflag=True)
1026         if result != 0:
1027             assert result == 0, "enable pycache para {} failed".format(enable_pycache)
1028         else:
1029             check_file_li = [pycache_config]
1030             check_file_flag = self.check_file_res(resolve_result, check_file_li, is_real_path=True)
1031 
1032             if enable_pycache.lower() == "true":
1033                 assert result == 0 and check_file_flag, "enable pycache para {} failed".format(enable_pycache)
1034             else:
1035                 assert result == 0 and not check_file_flag, "enable pycache para {} failed".format(enable_pycache)
1036 
1037     def exec_command_select(self, cmd, timeout=60, ptyflag=False):
1038         out_queue = queue.Queue()
1039         log_info("select_exec cmd is :{}".format(" ".join(cmd)))
1040         if not ptyflag:
1041             try:
1042                 proc = subprocess.Popen(
1043                     cmd,
1044                     stdout=subprocess.PIPE,
1045                     stderr=subprocess.PIPE,
1046                     encoding="utf-8",
1047                     universal_newlines=True,
1048                     errors='ignore'
1049                 )
1050                 start_time = time.time()
1051                 while True:
1052                     if timeout and time.time() - start_time > timeout:
1053                         raise Exception("exec cmd time out,select")
1054                     ready_to_read, _, _ = select.select([proc.stdout, proc.stderr], [], [], self.select_timeout)
1055                     for stream in ready_to_read:
1056                         output = stream.readline().strip()
1057                         if output:
1058                             out_queue.put(output)
1059                     if proc.poll() is not None:
1060                         break
1061                 returncode = proc.wait()
1062                 out_res = list(out_queue.queue)
1063                 return out_res, returncode
1064             except Exception as err_:
1065                 log_error("An error occurred: {}".format(err_))
1066                 raise Exception(err_)
1067         else:
1068             try:
1069                 master, slave = pty.openpty()
1070                 proc = subprocess.Popen(
1071                     cmd,
1072                     stdin=slave,
1073                     stdout=slave,
1074                     stderr=slave,
1075                     encoding="utf-8",
1076                     universal_newlines=True,
1077                     errors='ignore'
1078                 )
1079                 start_time = time.time()
1080                 incomplete_line = ""
1081                 while True:
1082                     if timeout and time.time() - start_time > timeout:
1083                         raise Exception("exec cmd time out,select")
1084                     ready_to_read, _, _ = select.select([master, ], [], [], self.select_timeout)
1085                     for stream in ready_to_read:
1086                         output_bytes = os.read(stream, 1024)
1087                         output = output_bytes.decode('utf-8')
1088                         lines = (incomplete_line + output).split("\n")
1089                         for line in lines[:-1]:
1090                             line = line.strip()
1091                             if line:
1092                                 out_queue.put(line)
1093                         incomplete_line = lines[-1]
1094                     if proc.poll() is not None:
1095                         break
1096                 returncode = proc.wait()
1097                 out_res = list(out_queue.queue)
1098                 return out_res, returncode
1099             except Exception as err_:
1100                 log_error("An error occurred: {}".format(err_))
1101                 raise Exception(err_)
1102 
1103     def get_match_result(self, cmd, para_type, para_value, ptyflag=PTYFLAG):
1104         cmd_res, returncode = self.exec_command(cmd, ptyflag=ptyflag)
1105         before_flags, expect_dict = self.get_match_flags(para_type, para_value)
1106         flag_res = self.resolve_res(cmd_res, before_flags)
1107         result = self.check_flags(flag_res, expect_dict, returncode)
1108         if result == 1:
1109             self.print_error_line(cmd_res)
1110         else:
1111             self.print_error_line(cmd_res, is_success=True)
1112         return result
1113 
1114     def get_match_flags(self, para_type, para_value):
1115         method_name = "get_{}_flags".format(para_type)
1116         if hasattr(self, method_name):
1117             method = self.__getattribute__(method_name)
1118             flags, expect_dict = method(para_value)
1119             return flags, expect_dict
1120         return None, None
1121 
1122     def get_common_spec_result(self, option, cmd, para_type=None, ptyflag=PTYFLAG):
1123         if not para_type:
1124             flag_res, expect_dict = self.get_common_flags(option, check_file=True)
1125         else:
1126             flag_res, expect_dict = self.get_match_flags(para_type, option)
1127         cmd_res, returncode = self.exec_command(cmd, ptyflag=ptyflag)
1128         resolve_result = self.resolve_res(cmd_res, flag_res)
1129         result = self.check_flags(resolve_result, expect_dict, returncode)
1130         if result == 1:
1131             self.print_error_line(cmd_res)
1132         else:
1133             self.print_error_line(cmd_res, is_success=True)
1134         return resolve_result, result, cmd_res
1135 
1136     def exec_command(self, cmd, ptyflag=PTYFLAG, timeout=TIMEOUT):
1137         if TestBuildOption.COMMAND_TYPE == "select":
1138             return self.exec_command_select(cmd, timeout=timeout, ptyflag=ptyflag)
1139         else:
1140             return self.exec_command_communicate(cmd, timeout=timeout)