#!/usr/bin/env python3
# -*- coding: utf-8 -*-

#
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import ast
import os
import re
import shutil
import sys
import subprocess
import time
import copy
import queue
import select
import pty
import pytest

from datetime import datetime, timedelta

sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from mylogger import get_logger, parse_json

Log = get_logger("build_option")
current_file_path = os.path.abspath(__file__)
script_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
log_info = Log.info
log_error = Log.error

config = parse_json()
if not config:
    log_error("config file build_example.json does not exist")
    sys.exit(1)

out_dir = os.path.join(script_path, "out")
exclude = config.get("build_option").get("exclude")
try:
    if os.path.exists(out_dir):
        for tmp_dir in os.listdir(out_dir):
            if tmp_dir in exclude:
                continue
            if os.path.isdir(os.path.join(out_dir, tmp_dir)):
                shutil.rmtree(os.path.join(out_dir, tmp_dir))
            else:
                os.remove(os.path.join(out_dir, tmp_dir))
except Exception as err:
    log_error(err)


@pytest.fixture()
def init_build_env():
    def find_top_dir():
        cur_dir = os.getcwd()
        while cur_dir != "/":
            build_scripts = os.path.join(
                cur_dir, 'build/scripts/build_package_list.json')
            if os.path.exists(build_scripts):
                return cur_dir
            cur_dir = os.path.dirname(cur_dir)
        raise RuntimeError("build/scripts/build_package_list.json not found in any parent directory")

    os.chdir(find_top_dir())
    subprocess.run(['repo', 'forall', '-c', 'git reset --hard'],
                   stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    subprocess.run(['repo', 'forall', '-c', 'git clean -dfx'],
                   stdout=subprocess.PIPE, stderr=subprocess.PIPE)


class TestBuildOption:
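    # FLAGS maps a short name to a regex and a result slot: resolve_res() flips
    # "flag" to True (or to a list of captured groups) once the pattern matches
    # a line of build output. "Excuting" is not a typo to fix here: the
    # patterns must match the build scripts' own (misspelled) log text.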
    FLAGS = {"gn": {"pattern": r"Excuting gn command", "flag": False},
             "done": {"pattern": r"Done\. Made \d+ targets from \d+ files in \d+ms", "flag": False},
             "ninja": {"pattern": r"Excuting ninja command", "flag": False},
             "success": {"pattern": r"=====build  successful=====", "flag": False}
             }

    try:
        LOG_PATH = script_path + config.get("build_option").get("log_path")
        CMD = script_path + config.get("build_option").get("common_cmd")
        NINJIA_CMD = script_path + config.get("build_option").get("ninjia_cmd")
        TIMEOUT = int(config.get("build_option").get("exec_timeout"))
        TIME_OVER = int(config.get("build_option").get("file_time_intever"))
        COMMAND_TYPE = config.get("build_option").get("cmd_type")
        PTYFLAG = config.get("build_option").get("ptyflag").lower() == "true"
        select_timeout = float(config.get("build_option").get("select_timeout"))
        log_info("TIMEOUT:{}".format(TIMEOUT))
        log_info("COMMAND_TYPE:{}".format(COMMAND_TYPE))
        log_info("TIME_OVER:{}".format(TIME_OVER))
        log_info("PTYFLAG:{}".format(PTYFLAG))
        log_info("select_timeout:{}".format(select_timeout))
    except Exception as err:
        log_error("build_example.json has errors")
        log_error(err)
        raise

    @staticmethod
    def exec_command_communicate(cmd, timeout=60):
        try:
            log_info("communicate_exec cmd is :{}".format(" ".join(cmd)))
            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                encoding="utf-8",
                universal_newlines=True,
                errors='ignore',
                cwd=script_path
            )
            out, err_ = proc.communicate(timeout=timeout)
            out_res = out.splitlines() + err_.splitlines()
            return out_res, proc.returncode
        except Exception as errs:
            log_error("An error occurred: {}".format(errs))
            raise Exception("exec cmd failed or timed out (communicate): {}".format(errs))

    @staticmethod
    def resolve_res(cmd_res, flag_res):
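        # A minimal sketch of what this resolver does (names are illustrative):
        #   cmd_res = ["loader args: ... os_level=standard' ..."]
        #   flags = {"os_level": {"pattern": r"os_level=([a-zA-Z]+)", "flag": False}}
        #   resolve_res(cmd_res, flags)["os_level"]["flag"]  # -> ["standard"]
        # Patterns without capture groups simply flip "flag" from False to True;
        # patterns with groups accumulate every captured value in a list.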
        for line_count, line in enumerate(cmd_res):
            for flag_name, value in flag_res.items():
                re_match = re.search(value["pattern"], line)
                if re_match:
                    log_info("【match success {}】:{}\n".format(line_count, line))  # echo the matched terminal line
                    if len(re_match.groups()) > 0:
                        if isinstance(flag_res[flag_name]["flag"], bool):
                            flag_res[flag_name]["flag"] = [re_match.group(1)]
                        else:
                            data = flag_res[flag_name]["flag"]
                            data.append(re_match.group(1))
                            flag_res[flag_name]["flag"] = data
                    else:
                        flag_res[flag_name]["flag"] = True
        return flag_res

    @staticmethod
    def check_flags(flags, expect_dict=None, returncode=0):
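        # Returns shell-style status: 0 when every expected flag agrees with
        # expect_dict and every remaining flag matched at least once, 1 on any
        # mismatch or a non-zero returncode. Callers assert the result is 0.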
        new_dict = copy.deepcopy(flags)
        if returncode != 0:
            log_error("returncode != 0")
            return 1
        if expect_dict:
            error_count = 0
            for k in expect_dict.keys():
                flags.pop(k)
                if k in new_dict and new_dict[k]["flag"] != expect_dict[k]:
                    error_count += 1
            if error_count != 0:
                log_error("【actual_result】:{}\n".format(new_dict))
                return 1
        check_li = [item for item in flags.values() if not item["flag"]]
        log_info("【expect_result】:{}\n".format(expect_dict))
        log_info("【actual_result】:{}\n".format(new_dict))
        if len(check_li) > 0:
            return 1
        return 0

    @staticmethod
    def is_exist(path):
        return os.path.exists(path)

    @staticmethod
    def same_element(list1, list2):
        return all(el in list2 for el in list1)

    @staticmethod
    def print_error_line(cmd_res, is_success=False):
        if is_success:
            for ind, line in enumerate(cmd_res):
                log_info("【{}】:{}".format(ind, line))
        else:
            for ind, line in enumerate(cmd_res):
                log_error("【{}】:{}".format(ind, line))

    @staticmethod
    def get_build_only_gn_flags(para_value):
        flags = copy.deepcopy(TestBuildOption.FLAGS)
        expect_dict = {}

        if para_value.lower() == "true":
            expect_dict["ninja"] = False
        else:
            expect_dict["ninja"] = True

        return flags, expect_dict

    @staticmethod
    def get_ccache_flags(para_value):
        flags = copy.deepcopy(TestBuildOption.FLAGS)
        expect_dict = {}
        flags["ccache"] = {"pattern": r"Excuting gn command.*ohos_build_enable_ccache=true", "flag": False}

        if para_value.lower() == "true":
            expect_dict["ccache"] = True
        else:
            expect_dict["ccache"] = False

        return flags, expect_dict

    @staticmethod
    def get_target_cpu_flags(para_value):
        flags = copy.deepcopy(TestBuildOption.FLAGS)
        expect_dict = {"loader": True}

        flags["loader"] = {"pattern": r"loader args.*'target_cpu={}".format(para_value), "flag": False}

        return flags, expect_dict

    @staticmethod
    def get_rename_last_log_flags(para_value):
        flags = copy.deepcopy(TestBuildOption.FLAGS)
        expect_dict = {}
        return flags, expect_dict

    @staticmethod
    def get_enable_pycache_flags(para_value):
        flags = copy.deepcopy(TestBuildOption.FLAGS)
        expect_dict = {}
        if para_value.lower() == "true":
            expect_dict["pycache"] = True
        else:
            expect_dict["pycache"] = False
        flags["pycache"] = {"pattern": r"Starting pycache daemon at", "flag": False}
        flags["os_level"] = {"pattern": r"loader args.*os_level=([a-zA-Z]+)\'", "flag": False}
        flags["root_dir"] = {"pattern": r'loader args.*source_root_dir="([a-zA-Z\d/\\_]+)', "flag": False}
        flags["gn_dir"] = {"pattern": r'loader args.*gn_root_out_dir="([a-zA-Z\d/\\_]+)', "flag": False}
        flags["start_end_time"] = {"pattern": r"(\d+-\d+-\d+ \d+:\d+:\d+)", "flag": False}
        flags["cost_time"] = {"pattern": r"Cost time:.*(\d+:\d+:\d+)", "flag": False}
        return flags, expect_dict

    @staticmethod
    def get_build_target_flags(para_value):
        flags = copy.deepcopy(TestBuildOption.FLAGS)
        flags["use_thin"] = {"pattern": r"Excuting gn command.*use_thin_lto=false.*", "flag": False}
        flags["ninja_build_target"] = {"pattern": r"Excuting ninja command.*{}$".format(para_value), "flag": False}
        expect_dict = {}
        test_target_list = ['build_all_test_pkg', 'package_testcase', 'package_testcase_mlf']

        if para_value.endswith('make_test') or para_value.split(':')[-1] in test_target_list:
            expect_dict["use_thin"] = True
            expect_dict["ninja_build_target"] = True
        else:
            expect_dict["use_thin"] = False
            expect_dict["ninja_build_target"] = True
        return flags, expect_dict

    @staticmethod
    def get_ninja_args_flags(para_value):
        flags = copy.deepcopy(TestBuildOption.FLAGS)
        expect_dict = {"ninja_args": True}
        flags["ninja_args"] = {"pattern": r"Excuting ninja command.*{}".format(para_value), "flag": False}

        return flags, expect_dict

    @staticmethod
    def get_full_compilation_flags(para_value):
        flags = copy.deepcopy(TestBuildOption.FLAGS)
        flags["full_compilation_gn"] = {"pattern": r"Excuting gn command.*use_thin_lto=false.*", "flag": False}
        flags["full_compilation_ninja"] = {"pattern": r"Excuting ninja command.*make_all make_test$", "flag": False}
        expect_dict = {}

        if para_value in ["", "True"]:
            expect_dict["full_compilation_gn"] = True
            expect_dict["full_compilation_ninja"] = True
        else:
            expect_dict["full_compilation_gn"] = False
            expect_dict["full_compilation_ninja"] = False

        return flags, expect_dict

    @staticmethod
    def get_strict_mode_flags(para_value):
        flags = copy.deepcopy(TestBuildOption.FLAGS)
        expect_dict = {}

        return flags, expect_dict

    @staticmethod
    def get_scalable_build_flags(para_value):
        flags = copy.deepcopy(TestBuildOption.FLAGS)
        expect_dict = {}

        return flags, expect_dict

    @staticmethod
    def get_build_example_flags(para_value):
        flags = copy.deepcopy(TestBuildOption.FLAGS)
        build_example_file_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(
            os.path.abspath(__file__)))), "subsystem_config_example.json")
        flags["build_example"] = {
            "pattern": r"loader args.*example_subsystem_file=.*{}.*".format(build_example_file_path), "flag": False}
        expect_dict = {}

        if para_value.lower() == "true":
            expect_dict["build_example"] = True
        else:
            expect_dict["build_example"] = False

        return flags, expect_dict

    @staticmethod
    def get_build_platform_name_flags(para_value):
        flags = copy.deepcopy(TestBuildOption.FLAGS)
        expect_dict = {}

        if para_value == "phone":
            flags["build_platform"] = {
                "pattern": r"loader args.*build_platform_name=phone", "flag": False}
            expect_dict["build_platform"] = True

        return flags, expect_dict

    @staticmethod
    def get_build_xts_flags(para_value):
        flags = copy.deepcopy(TestBuildOption.FLAGS)
        expect_dict = {}

        flags["build_xts"] = {"pattern": r"loader args.*build_xts={}.*".format(para_value.capitalize()), "flag": False}

        return flags, expect_dict

    @staticmethod
    def get_ignore_api_check_flags(para_value):
        flags = copy.deepcopy(TestBuildOption.FLAGS)
        expect_dict = {}

        if para_value == "":
            flags["ignore_api_check"] = {"pattern": r"loader args.*ignore_api_check=\['xts', 'common', 'testfwk'\]",
                                         "flag": False}
        else:
            flags["ignore_api_check"] = {
                "pattern": r"loader args.*ignore_api_check=(.*)\",",
                "flag": False}

        return flags, expect_dict

    @staticmethod
    def get_load_test_config_flags(para_value):
        flags = copy.deepcopy(TestBuildOption.FLAGS)
        expect_dict = {}

        flags["load_test"] = {"pattern": r"loader args.*load_test_config={}.*".format(para_value.capitalize()),
                              "flag": False}

        return flags, expect_dict

    @staticmethod
    def get_build_type_flags(para_value):
        flags = copy.deepcopy(TestBuildOption.FLAGS)
        expect_dict = {}
        flags["build_type_debug"] = {"pattern": r"Excuting gn command.*is_debug=true",
                                     "flag": False}
        flags["build_type_profile"] = {"pattern": r"Excuting gn command.*is_profile=true",
                                       "flag": False}
        flags["build_type_none"] = {
            "pattern": r'Excuting gn command.*ohos_build_type=\\"debug\\"',
            "flag": False}

        if para_value == "debug":
            expect_dict["build_type_debug"] = True
            expect_dict["build_type_profile"] = False
            expect_dict["build_type_none"] = True
        elif para_value == "profile":
            expect_dict["build_type_debug"] = False
            expect_dict["build_type_profile"] = True
            expect_dict["build_type_none"] = True
        else:
            expect_dict["build_type_debug"] = False
            expect_dict["build_type_profile"] = False
            expect_dict["build_type_none"] = True

        return flags, expect_dict

    @staticmethod
    def get_log_level_flags(para_value):
        flags = copy.deepcopy(TestBuildOption.FLAGS)

        flags["tracelog"] = {"pattern": r"Excuting gn command.*--tracelog=.*/gn_trace.log.*--ide=json", "flag": False}
        flags["ninja_v"] = {"pattern": r"Excuting ninja command.*-v.*", "flag": False}
        expect_dict = {}

        if para_value == "info":
            expect_dict["tracelog"] = False
            expect_dict["ninja_v"] = False
        elif para_value == "debug":
            expect_dict["tracelog"] = True
            expect_dict["ninja_v"] = True

        return flags, expect_dict

    @staticmethod
    def get_test_flags(para_value):
        flags = copy.deepcopy(TestBuildOption.FLAGS)
        expect_dict = {}
        flags["notest"] = {"pattern": r'Excuting gn command.*ohos_test_args=\\"notest\\"',
                           "flag": False}
        flags["xts"] = {"pattern": r'Excuting gn command.*ohos_xts_test_args=\\"xxx\\"',
                        "flag": False}

        if para_value == "":
            expect_dict["notest"] = False
            expect_dict["xts"] = False
        elif para_value == "notest xxx":
            expect_dict["notest"] = True
            expect_dict["xts"] = False
        elif para_value in ["xts xxx", "xxx xts"]:
            expect_dict["notest"] = False
            expect_dict["xts"] = True
        elif para_value == "xxx ccc":
            expect_dict["notest"] = False
            expect_dict["xts"] = False

        return flags, expect_dict

    @staticmethod
    def get_gn_args_flags(para_value):
        flags = copy.deepcopy(TestBuildOption.FLAGS)
        expect_dict = {}

        flags["device_type"] = {
            "pattern": r'Excuting gn command.*device_type=\\"default\\"', "flag": False}
        flags["build_variant"] = {
            "pattern": r'Excuting gn command.*build_variant=\\"root\\"', "flag": False}
        flags["para"] = {
            "pattern": r'Excuting gn command.*{}'.format(para_value), "flag": False}

        return flags, expect_dict

    @staticmethod
    def get_fast_rebuild_flags(para_value):
        flags = copy.deepcopy(TestBuildOption.FLAGS)
        expect_dict = {}

        if para_value.lower() == "true" or para_value == "":
            expect_dict["gn"] = False
            expect_dict["done"] = False
        return flags, expect_dict

    @staticmethod
    def get_skip_partlist_check_flags(para_value):
        flags = copy.deepcopy(TestBuildOption.FLAGS)
        expect_dict = {}
        partlist_flag = para_value.lower() == "true"
        flags["partlist"] = {"pattern": r"loader args.*skip_partlist_check={}".format(partlist_flag), "flag": False}
        return flags, expect_dict

    @staticmethod
    def get_deps_guard_flags(para_value):
        flags = copy.deepcopy(TestBuildOption.FLAGS)
        expect_dict = {}
        flags["os_level"] = {"pattern": r"loader args.*os_level=([a-zA-Z]+)\'", "flag": False}
        return flags, expect_dict

    @staticmethod
    def get_compute_overlap_rate_flags(para_value):
        flags = copy.deepcopy(TestBuildOption.FLAGS)
        flags["c_targets"] = {"pattern": r"c targets overlap rate statistics", "flag": False}
        flags["c_overall"] = {"pattern": r"c overall build overlap rate", "flag": False}
        expect_dict = {}

        if para_value.lower() in ("true", ""):
            expect_dict["c_targets"] = True
            expect_dict["c_overall"] = True
        else:
            expect_dict["c_targets"] = False
            expect_dict["c_overall"] = False
        return flags, expect_dict

    @staticmethod
    def get_stat_ccache_flags(para_value):
        flags = copy.deepcopy(TestBuildOption.FLAGS)
        expect_dict = {}
        flags["ccache_dir"] = {"pattern": r"ccache_dir =.*, ccache_exec =.*", "flag": False}
        flags["ccache_summary"] = {"pattern": r"ccache summary", "flag": False}

        if para_value.lower() in ("true", ""):
            expect_dict["ccache_dir"] = True
            expect_dict["ccache_summary"] = True
        else:
            expect_dict["ccache_dir"] = False
            expect_dict["ccache_summary"] = False

        return flags, expect_dict

    @staticmethod
    def get_keep_ninja_going_flags(para_value):
        flags = copy.deepcopy(TestBuildOption.FLAGS)
        expect_dict = {}
        if para_value.lower() == "true":
            # setdefault() on an existing key is a no-op, so assign directly to
            # make the stricter -k1000000 pattern actually take effect
            flags["ninja"]["pattern"] = r"Excuting ninja command.*-k1000000.*"

        return flags, expect_dict

    @staticmethod
    def get_common_flags(para_value, check_file=False):
        flags = copy.deepcopy(TestBuildOption.FLAGS)
        expect_dict = {}
        if check_file:
            flags["os_level"] = {"pattern": r"loader args.*os_level=([a-zA-Z]+)\'", "flag": False}
            flags["root_dir"] = {"pattern": r'loader args.*source_root_dir="([a-zA-Z\d/\\_]+)', "flag": False}
            flags["gn_dir"] = {"pattern": r'loader args.*gn_root_out_dir="([a-zA-Z\d/\\_]+)', "flag": False}
            flags["start_end_time"] = {"pattern": r"(\d+-\d+-\d+ \d+:\d+:\d+)", "flag": False}
            flags["cost_time"] = {"pattern": r"Cost time:.*(\d+:\d+:\d+)", "flag": False}
        return flags, expect_dict

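    # check_file_res() consumes the capture-group flags collected by
    # get_common_flags(check_file=True): root_dir and gn_dir rebuild the output
    # paths, while start_end_time and cost_time bound the build's time window.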
    @staticmethod
    def check_file_res(resolve_result, file_list, is_real_path=False, time_over=TIME_OVER):
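        # Window check sketch (illustrative numbers): with end_timestamp
        # 1700000100 and a logged cost time of 00:01:00, new_start_timestamp is
        # 1700000040, and every checked file's mtime must fall inside
        # [new_start_timestamp - time_over, end_timestamp + time_over].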
        root_dir = resolve_result["root_dir"]["flag"][0]
        gn_dir = resolve_result["gn_dir"]["flag"][0]
        start_time_str = resolve_result["start_end_time"]["flag"][0]
        end_time_str = resolve_result["start_end_time"]["flag"][-1]

        start_time = datetime.strptime(start_time_str, "%Y-%m-%d %H:%M:%S")
        end_time = datetime.strptime(end_time_str, "%Y-%m-%d %H:%M:%S")

        start_timestamp = int(datetime.timestamp(start_time))
        end_timestamp = int(datetime.timestamp(end_time))

        file_list_new = []
        for tmp_file in file_list:
            real_path = tmp_file if is_real_path else os.path.join(root_dir, gn_dir, tmp_file)
            if os.path.exists(real_path):
                file_list_new.append(real_path)
        if not file_list_new:
            log_info("none of the files {} exist".format(file_list))
            return True
        file_timestamp_li = {tmp_file: int(os.stat(tmp_file).st_mtime) for tmp_file in file_list_new}

        cost_time_str = resolve_result["cost_time"]["flag"][0]
        cost_time = datetime.strptime(cost_time_str, "%H:%M:%S")
        cost_time_int = timedelta(hours=cost_time.hour, minutes=cost_time.minute, seconds=cost_time.second)
        total_seconds = int(cost_time_int.total_seconds())
        new_start_timestamp = end_timestamp - total_seconds
        log_info("log_cost_time:{}s".format(total_seconds))
        log_info("start_timestamp:{}".format(start_timestamp))
        log_info("new_start_timestamp:{}".format(new_start_timestamp))
        log_info("end_timestamp:{}".format(end_timestamp))
        file_tmp_flag_li = []

        for file_tmp, file_timestamp in file_timestamp_li.items():
            log_info("{}:{}".format(file_tmp, file_timestamp))
            file_tmp_flag = new_start_timestamp - time_over <= file_timestamp <= end_timestamp + time_over
            file_tmp_flag_li.append(file_tmp_flag)

        return all(file_tmp_flag_li)

    @pytest.mark.parametrize('cpu_para', ['arm', 'arm64', 'x86_64'])
    def test_target_cpu(self, cpu_para):
        """
        test target_cpu parameter
        """
        cmd = self.CMD.format('--target-cpu', cpu_para).split()

        result = self.get_match_result(cmd, "target_cpu", cpu_para)

        assert result == 0, "target cpu para {} failed".format(cpu_para)

    @pytest.mark.parametrize('ccache_para', ['True', 'False'])
    def test_ccache(self, ccache_para):
        """
        test ccache parameter
        """
        cmd = self.CMD.format('--ccache', ccache_para).split()

        result = self.get_match_result(cmd, "ccache", ccache_para)

        assert result == 0, "ccache para {} failed".format(ccache_para)

    @pytest.mark.parametrize('rename_last_log_para', ['True', 'False'])
    def test_rename_last_log(self, rename_last_log_para):
        """
        test rename_last_log parameter
        """
        cmd = self.CMD.format('--rename-last-log', rename_last_log_para).split()
        mtime = ""
        file_name = ""

        if self.is_exist(self.LOG_PATH):
            mtime = os.stat(self.LOG_PATH).st_mtime
            file_name = '{}/build.{}.log'.format(self.LOG_PATH, mtime)
        log_info("test_rename_last_log,file name is {}".format(file_name))
        result = self.get_match_result(cmd, "rename_last_log", rename_last_log_para)
        new_path = os.path.join(os.path.dirname(self.LOG_PATH), "build.{}.log".format(mtime))
        log_info("test_rename_last_log,new path is {}".format(new_path))

        if rename_last_log_para == 'True':
            assert self.is_exist(new_path) and result == 0, "rename_last_log para {} failed".format(
                rename_last_log_para)
        elif rename_last_log_para == 'False':
            assert not self.is_exist(new_path) and result == 0, "rename_last_log para {} failed".format(
                rename_last_log_para)

    @pytest.mark.parametrize('build_target', ['', 'package_testcase'])
    def test_build_target(self, build_target):
        """
        test build_target parameter
        """
        cmd = self.CMD.format('--build-target', build_target).split()

        result = self.get_match_result(cmd, "build_target", build_target)

        assert result == 0, "build target para {} failed".format(build_target)

    @pytest.mark.parametrize('ninja_args', ['-dkeeprsp'])
    def test_ninja_args(self, ninja_args):
        """
        test ninja_args parameter
        """
        cmd = self.NINJIA_CMD.format(ninja_args).split()

        result = self.get_match_result(cmd, "ninja_args", ninja_args)

        assert result == 0, "ninja args para {} failed".format(ninja_args)

    @pytest.mark.parametrize('full_compilation', ['True', 'False', ''])
    def test_full_compilation(self, full_compilation):
        """
        test full_compilation parameter
        """
        cmd = self.CMD.format('--full-compilation', full_compilation).split()

        result = self.get_match_result(cmd, "full_compilation", full_compilation)

        assert result == 0, "full compilation para {} failed".format(full_compilation)

    @pytest.mark.parametrize('strict_mode', ['True', 'False', 'false'])
    def test_strict_mode(self, strict_mode):
        """
        test strict_mode parameter
        """
        cmd = self.CMD.format('--strict-mode', strict_mode).split()

        result = self.get_match_result(cmd, "strict_mode", strict_mode)

        assert result == 0, "strict mode para {} failed".format(strict_mode)

    @pytest.mark.parametrize('scalable_build', ['True', 'False', 'false'])
    def test_scalable_build(self, scalable_build):
        """
        test scalable_build parameter
        """
        cmd = self.CMD.format('--scalable-build', scalable_build).split()

        result = self.get_match_result(cmd, "scalable_build", scalable_build)

        assert result == 0, "scalable build para {} failed".format(scalable_build)

    @pytest.mark.parametrize('build_example', ['True', 'False', 'true', 'false'])
    def test_build_example(self, build_example):
        """
        test build_example parameter
        """
        cmd = self.CMD.format('--build-example', build_example).split()

        result = self.get_match_result(cmd, "build_example", build_example)

        assert result == 0, "build example para {} failed".format(build_example)

    @pytest.mark.parametrize('build_platform_name', ['phone'])
    def test_build_platform_name(self, build_platform_name):
        """
        test build_platform_name parameter
        """
        cmd = self.CMD.format('--build-platform-name', build_platform_name).split()

        result = self.get_match_result(cmd, "build_platform_name", build_platform_name)

        assert result == 0, "build platform name para {} failed".format(build_platform_name)

    @pytest.mark.parametrize('build_xts', ['True', 'False', 'true', 'false'])
    def test_build_xts(self, build_xts):
        """
        test build_xts parameter
        """
        cmd = self.CMD.format('--build-xts', build_xts).split()

        result = self.get_match_result(cmd, "build_xts", build_xts)

        assert result == 0, "build xts para {} failed".format(build_xts)

    @pytest.mark.parametrize('ignore_api_check', ['common xts', ''])
    def test_ignore_api_check(self, ignore_api_check):
        """
        test ignore_api_check parameter
        """
        para_list = ignore_api_check.split()
        cmd = self.CMD.format('--ignore-api-check', ignore_api_check).split()
        resolve_result, result, _ = self.get_common_spec_result(ignore_api_check, cmd,
                                                                para_type="ignore_api_check")
        if result != 0:
            assert result == 0, "ignore api check para {} failed".format(ignore_api_check)
        else:
            if ignore_api_check:
                ignore_str = resolve_result.get("ignore_api_check").get("flag")[0]  # e.g. "['xts', 'common']"
                log_info("ignore_str is {}".format(ignore_str))
                ignore_li = ast.literal_eval(ignore_str)  # safer than eval() for parsing the printed list
                log_info("ignore_li is {0},type is {1}".format(ignore_li, type(ignore_li)))
                assert self.same_element(para_list, ignore_li) and result == 0, "ignore api check para {} failed".format(
                    ignore_api_check)

    @pytest.mark.parametrize('load_test_config', ['True', 'False', 'true', 'false'])
    def test_load_test_config(self, load_test_config):
        """
        test load_test_config parameter
        """
        cmd = self.CMD.format('--load-test-config', load_test_config).split()

        result = self.get_match_result(cmd, "load_test_config", load_test_config)

        assert result == 0, "load test config para {} failed".format(load_test_config)

    @pytest.mark.parametrize('build_type', ['debug', 'release', 'profile'])
    def test_build_type(self, build_type):
        """
        test build_type parameter
        """
        cmd = self.CMD.format('--build-type', build_type).split()
        result = self.get_match_result(cmd, "build_type", build_type)

        assert result == 0, "build type para {} failed".format(build_type)

    @pytest.mark.parametrize('log_level', ['info', 'debug'])
    def test_log_level(self, log_level):
        """
        test log_level parameter
        """
        cmd = self.CMD.format('--log-level', log_level).split()

        result = self.get_match_result(cmd, "log_level", log_level)

        assert result == 0, "log level para {} failed".format(log_level)

    @pytest.mark.parametrize('build_only_gn', ['True', 'False'])
    def test_build_only_gn(self, build_only_gn):
        """
        test build_only_gn parameter
        """
        cmd = self.CMD.format('--build-only-gn', build_only_gn).split()

        result = self.get_match_result(cmd, "build_only_gn", build_only_gn)

        assert result == 0, "build only gn para {} failed".format(build_only_gn)

    @pytest.mark.parametrize('test', ['', 'notest xxx', 'xts xxx', 'xxx xts'])
    def test_test(self, test):
        """
        test test parameter
        """
        cmd = self.CMD.format('--test', test).split()

        result = self.get_match_result(cmd, "test", test)

        assert result == 0, "test para {} failed".format(test)

    @pytest.mark.parametrize('gn_args', ['', 'is_debug=true'])
    def test_gn_args(self, gn_args):
        """
        test gn_args parameter
        """
        cmd = self.CMD.format('--gn-args', gn_args).split()

        result = self.get_match_result(cmd, "gn_args", gn_args)

        assert result == 0, "gn args para {} failed".format(gn_args)

    @pytest.mark.parametrize('fast_rebuild', ['True', 'False', ''])
    def test_fast_rebuild(self, fast_rebuild):
        """
        test fast_rebuild parameter
        """
        cmd = self.CMD.format('--fast-rebuild', fast_rebuild).split()

        result = self.get_match_result(cmd, "fast_rebuild", fast_rebuild)

        assert result == 0, "fast rebuild para {} failed".format(fast_rebuild)

    @pytest.mark.parametrize('going_option', ['True', 'False'])
    def test_keep_ninja_going(self, going_option):
        """
        test keep_ninja_going parameter
        """
        cmd = self.CMD.format('--keep-ninja-going', going_option).split()

        result = self.get_match_result(cmd, "keep_ninja_going", going_option)

        assert result == 0, "keep_ninja_going para {} failed".format(going_option)

    @pytest.mark.parametrize('variant_option', ['user', 'root'])
    def test_build_variant(self, variant_option):
        """
        test build_variant parameter
        """
        cmd = self.CMD.format('--build-variant', variant_option).split()

        resolve_result, result, _ = self.get_common_spec_result(variant_option, cmd)
        if result != 0:
            assert result == 0, "build_variant para {} failed".format(variant_option)
        else:
            root_dir = resolve_result.get("root_dir").get("flag")[0]
            gn_dir = resolve_result.get("gn_dir").get("flag")[0]

            ohos_para_path = "packages/phone/system/etc/param/ohos.para"
            if os.path.exists(os.path.join(root_dir, gn_dir, ohos_para_path)):
                check_file_li = [ohos_para_path]
                check_file_flag = self.check_file_res(resolve_result, check_file_li)
                assert result == 0 and check_file_flag, "build_variant para {} failed".format(variant_option)
            else:
                assert result == 0, "build_variant para {} failed".format(variant_option)

    @pytest.mark.parametrize('device_option', ['default', 'unknown'])
    def test_device_type(self, device_option):
        """
        test device_type parameter
        """
        cmd = self.CMD.format('--device-type', device_option).split()

        resolve_result, result, _ = self.get_common_spec_result(device_option, cmd)
        if result != 0:
            if device_option == "unknown":
                assert result == 1, "device_type para {} failed".format(device_option)
            else:
                assert result == 0, "device_type para {} failed".format(device_option)

        else:
            if device_option == "default":
                assert result == 0, "device_type para {} failed".format(device_option)
            else:
                check_file_li = ["packages/phone/system/etc/param/ohos.para"]
                check_file_flag = self.check_file_res(resolve_result, check_file_li)
                assert result == 0 and check_file_flag, "device_type para {} failed".format(device_option)

    @pytest.mark.parametrize('archive_option', ['True', 'False'])
    def test_archive_image(self, archive_option):
        """
        test archive_image parameter
        """
        cmd = self.CMD.format('--archive-image', archive_option).split()

        resolve_result, result, cmd_res = self.get_common_spec_result(archive_option, cmd)
        if result != 0:
            assert result == 0, "archive_image para {} failed".format(archive_option)
        else:
            root_dir = resolve_result.get("root_dir").get("flag")[0]
            gn_dir = resolve_result.get("gn_dir").get("flag")[0]
            image_path = os.path.join("packages", "phone", "images")
            if archive_option.lower() == "true":
                if os.path.exists(os.path.join(root_dir, gn_dir, image_path)):
                    check_file_li = ["images.tar.gz"]
                    check_file_flag = self.check_file_res(resolve_result, check_file_li)
                    assert result == 0 and check_file_flag, "archive_image para {} failed".format(
                        archive_option)
                else:
                    archive_flags = {"archive_image": {"pattern": r'"--archive-image" option not work', "flag": False}}
                    archive_resolve_result = self.resolve_res(cmd_res, archive_flags)
                    archive_result = self.check_flags(archive_resolve_result)
                    assert result == 0 and archive_result == 0, "archive_image para {} failed".format(archive_option)
            else:
                assert result == 0, "archive_image para {} failed".format(archive_option)

    @pytest.mark.parametrize('rom_option', ['True', 'False'])
    def test_rom_size_statistics(self, rom_option):
        """
        test rom_size_statistics parameter
        """
        cmd = self.CMD.format('--rom-size-statistics', rom_option).split()

        resolve_result, result, _ = self.get_common_spec_result(rom_option, cmd, ptyflag=True)
        if result != 0:
            assert result == 0, "rom_size_statistics para {} failed".format(rom_option)
        else:
            os_level = resolve_result.get("os_level").get("flag")[0]
            log_info("os_level:{}".format(os_level))
            if os_level in ("mini", "small"):
                assert result == 0, "rom_size_statistics para {} failed".format(rom_option)
            else:
                check_file_li = ["rom_statistics_table.json"]
                check_file_flag = self.check_file_res(resolve_result, check_file_li)
                if rom_option.lower() == "false":
                    assert result == 0 and not check_file_flag, "rom_option para {} failed".format(
                        rom_option)
                else:
                    assert result == 0 and check_file_flag, "rom_option para {} failed".format(rom_option)

    @pytest.mark.parametrize('ccache_option', ['True', 'False'])
    def test_stat_ccache(self, ccache_option):
        """
        test stat_ccache parameter
        """
        cmd = self.CMD.format('--stat-ccache', ccache_option).split()

        result = self.get_match_result(cmd, "stat_ccache", ccache_option)

        assert result == 0, "stat_ccache para {} failed".format(ccache_option)

    @pytest.mark.parametrize('warning_option', ['True', 'False'])
    def test_get_warning_list(self, warning_option):
        """
        test get_warning_list parameter
        """
        cmd = self.CMD.format('--get-warning-list', warning_option).split()
        resolve_result, result, _ = self.get_common_spec_result(warning_option, cmd)
        if result != 0:
            assert result == 0, "get_warning_list para {} failed".format(warning_option)
        else:
            check_file_li = ["packages/WarningList.txt"]
            check_file_flag = self.check_file_res(resolve_result, check_file_li)
            if warning_option.lower() == "false":
                assert result == 0 and not check_file_flag, "get_warning_list para {} failed".format(
                    warning_option)
            else:
                assert result == 0 and check_file_flag, "get_warning_list para {} failed".format(warning_option)

    @pytest.mark.parametrize('ninja_option', ["True", "False", "true", "false"])
    def test_generate_ninja_trace(self, ninja_option):
        """
        test generate_ninja_trace parameter
        """
        cmd = self.CMD.format('--generate-ninja-trace', ninja_option).split()
        resolve_result, result, _ = self.get_common_spec_result(ninja_option, cmd)
        if result != 0:
            assert result == 0, "generate_ninja_trace para {} failed".format(ninja_option)
        else:
            check_file_li = ["build.trace.gz", "sorted_action_duration.txt"]
            check_file_flag = self.check_file_res(resolve_result, check_file_li)
            if ninja_option.lower() == "false":
                assert result == 0 and not check_file_flag, "generate_ninja_trace para {} failed".format(
                    ninja_option)
            else:
                assert result == 0 and check_file_flag, "generate_ninja_trace para {} failed".format(
                    ninja_option)

    @pytest.mark.parametrize('overlap_option', ['True', 'False'])
    def test_compute_overlap_rate(self, overlap_option):
        """
        test compute_overlap_rate parameter
        """
        cmd = self.CMD.format('--compute-overlap-rate', overlap_option).split()
        result = self.get_match_result(cmd, "compute_overlap_rate", overlap_option)

        assert result == 0, "compute_overlap_rate para {} failed".format(overlap_option)

    @pytest.mark.parametrize('clean_option', ['True', 'False'])
    def test_clean_args(self, clean_option):
        """
        test clean-args parameter
        """
        cmd = self.CMD.format('--clean-args', clean_option).split()
        resolve_result, result, _ = self.get_common_spec_result(clean_option, cmd)
        if result != 0:
            assert result == 0, "clean_args para {} failed".format(clean_option)
        else:
            root_dir = resolve_result.get("root_dir").get("flag")[0]
            json_path = os.path.join(root_dir, "out", "hb_args")
            json_file_li = [file for file in os.listdir(json_path) if os.path.splitext(file)[-1] == ".json"]
            log_info("test_clean_args, json_file_li:{}".format(json_file_li))
            if clean_option.lower() == "false":
                exist_flag = bool(json_file_li)
            else:
                exist_flag = not json_file_li

            assert result == 0 and exist_flag, "clean_args para {} failed".format(clean_option)

    @pytest.mark.parametrize('deps_guard_option', ['True', 'False'])
    def test_deps_guard(self, deps_guard_option):
        """
        test deps-guard parameter
        """
        cmd = self.CMD.format('--deps-guard', deps_guard_option).split()
        resolve_result, result, cmd_res = self.get_common_spec_result(deps_guard_option, cmd,
                                                                      para_type="deps_guard")
        if result != 0:
            assert result == 0, "deps_guard para {} failed.".format(deps_guard_option)
        else:
            os_level = resolve_result.get("os_level").get("flag")[0]
            log_info("test_deps_guard,os_level:{}".format(os_level))
            if deps_guard_option.lower() == "false" and os_level == "standard":
                standard_flags = {"Scanning": {"pattern": r"Scanning.*ELF files now", "flag": False},
                                  "rules": {"pattern": r"All rules passed", "flag": False}}
                standard_resolve_result = self.resolve_res(cmd_res, standard_flags)
                log_info("continue match Scanning and rules ...")
                standard_result = self.check_flags(standard_resolve_result)
                assert result == 0 and standard_result == 0, "deps_guard para {},os_level {} failed.".format(
                    deps_guard_option, os_level)
            else:
                assert result == 0, "deps_guard para {},os_level {} failed.".format(deps_guard_option, os_level)

    @pytest.mark.parametrize('partlist_option', ['True', 'False'])
    def test_skip_partlist_check(self, partlist_option):
        """
        test skip-partlist-check parameter
        """
        cmd = self.CMD.format('--skip-partlist-check', partlist_option).split()
        result = self.get_match_result(cmd, "skip_partlist_check", partlist_option)
        assert result == 0, "skip_partlist_check para {} failed".format(partlist_option)

    @pytest.mark.parametrize('enable_pycache', ['True', 'true', 'False', 'false'])
    def test_enable_pycache(self, enable_pycache):
        """
        test enable_pycache parameter
        """
        cmd = self.CMD.format('--enable-pycache', enable_pycache).split()

        pycache_dir = os.environ.get('CCACHE_BASE')
        if not pycache_dir:
            pycache_dir = os.environ.get('HOME')
        pycache_config = os.path.join(pycache_dir, '.pycache', '.config')
        resolve_result, result, _ = self.get_common_spec_result(enable_pycache, cmd,
                                                                para_type="enable_pycache", ptyflag=True)
        if result != 0:
            assert result == 0, "enable pycache para {} failed".format(enable_pycache)
        else:
            check_file_li = [pycache_config]
            check_file_flag = self.check_file_res(resolve_result, check_file_li, is_real_path=True)

            if enable_pycache.lower() == "true":
                assert result == 0 and check_file_flag, "enable pycache para {} failed".format(enable_pycache)
            else:
                assert result == 0 and not check_file_flag, "enable pycache para {} failed".format(enable_pycache)

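    # exec_command_select() is the streaming counterpart to
    # exec_command_communicate(): it polls the child with select() so output is
    # collected line by line while the build runs, and with ptyflag=True it
    # attaches the child to a pseudo-terminal so tools that only print progress
    # to a tty still produce matchable output.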
    def exec_command_select(self, cmd, timeout=60, ptyflag=False):
        out_queue = queue.Queue()
        log_info("select_exec cmd is :{}".format(" ".join(cmd)))
        if not ptyflag:
            try:
                proc = subprocess.Popen(
                    cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    encoding="utf-8",
                    universal_newlines=True,
                    errors='ignore'
                )
                start_time = time.time()
                while True:
                    if timeout and time.time() - start_time > timeout:
                        raise Exception("exec cmd timed out (select)")
                    ready_to_read, _, _ = select.select([proc.stdout, proc.stderr], [], [], self.select_timeout)
                    for stream in ready_to_read:
                        output = stream.readline().strip()
                        if output:
                            out_queue.put(output)
                    if proc.poll() is not None:
                        break
                returncode = proc.wait()
                out_res = list(out_queue.queue)
                return out_res, returncode
            except Exception as err_:
                log_error("An error occurred: {}".format(err_))
                raise
        else:
            try:
                master, slave = pty.openpty()
                proc = subprocess.Popen(
                    cmd,
                    stdin=slave,
                    stdout=slave,
                    stderr=slave,
                    encoding="utf-8",
                    universal_newlines=True,
                    errors='ignore'
                )
                start_time = time.time()
                incomplete_line = ""
                while True:
                    if timeout and time.time() - start_time > timeout:
                        raise Exception("exec cmd timed out (select)")
                    ready_to_read, _, _ = select.select([master, ], [], [], self.select_timeout)
                    for stream in ready_to_read:
                        output_bytes = os.read(stream, 1024)
                        output = output_bytes.decode('utf-8')
                        lines = (incomplete_line + output).split("\n")
                        for line in lines[:-1]:
                            line = line.strip()
                            if line:
                                out_queue.put(line)
                        incomplete_line = lines[-1]
                    if proc.poll() is not None:
                        break
                returncode = proc.wait()
                # close both pty ends so file descriptors are not leaked
                os.close(master)
                os.close(slave)
                out_res = list(out_queue.queue)
                return out_res, returncode
            except Exception as err_:
                log_error("An error occurred: {}".format(err_))
                raise

    def get_match_result(self, cmd, para_type, para_value, ptyflag=PTYFLAG):
        cmd_res, returncode = self.exec_command(cmd, ptyflag=ptyflag)
        before_flags, expect_dict = self.get_match_flags(para_type, para_value)
        flag_res = self.resolve_res(cmd_res, before_flags)
        result = self.check_flags(flag_res, expect_dict, returncode)
        if result == 1:
            self.print_error_line(cmd_res)
        else:
            self.print_error_line(cmd_res, is_success=True)
        return result

    def get_match_flags(self, para_type, para_value):
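        # Dispatch sketch: para_type "ccache" resolves to get_ccache_flags, so
        # this returns that staticmethod's (flags, expect_dict); a para_type
        # without a matching get_<para_type>_flags method yields (None, None).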
        method_name = "get_{}_flags".format(para_type)
        if hasattr(self, method_name):
            method = getattr(self, method_name)
            flags, expect_dict = method(para_value)
            return flags, expect_dict
        return None, None

    def get_common_spec_result(self, option, cmd, para_type=None, ptyflag=PTYFLAG):
        if not para_type:
            flag_res, expect_dict = self.get_common_flags(option, check_file=True)
        else:
            flag_res, expect_dict = self.get_match_flags(para_type, option)
        cmd_res, returncode = self.exec_command(cmd, ptyflag=ptyflag)
        resolve_result = self.resolve_res(cmd_res, flag_res)
        result = self.check_flags(resolve_result, expect_dict, returncode)
        if result == 1:
            self.print_error_line(cmd_res)
        else:
            self.print_error_line(cmd_res, is_success=True)
        return resolve_result, result, cmd_res

    def exec_command(self, cmd, ptyflag=PTYFLAG, timeout=TIMEOUT):
        if TestBuildOption.COMMAND_TYPE == "select":
            return self.exec_command_select(cmd, timeout=timeout, ptyflag=ptyflag)
        else:
            return self.exec_command_communicate(cmd, timeout=timeout)