1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3
4#
5# Copyright (c) 2023 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import sys
21import re
22import subprocess
23
24from datetime import datetime
25
26from util.log_util import LogUtil
27from hb.helper.no_instance import NoInstance
28from exceptions.ohos_exception import OHOSException
29from containers.status import throw_exception
30
31
32class SystemUtil(metaclass=NoInstance):
33
34    @staticmethod
35    def exec_command(cmd: list, log_path='out/build.log', exec_env=None, log_mode='normal', **kwargs):
36        useful_info_pattern = re.compile(r'\[\d+/\d+\].+')
37        is_log_filter = kwargs.pop('log_filter', False)
38        if log_mode == 'silent':
39            is_log_filter = True
40        if '' in cmd:
41            cmd.remove('')
42        if not os.path.exists(os.path.dirname(log_path)):
43            os.makedirs(os.path.dirname(log_path), exist_ok=True)
44        if (sys.argv[1] == 'build' and
45            len(sys.argv) == 5 and
46            '-i' in sys.argv[3:] and
47            {'-t', '-test'} & set(sys.argv[3:])):
48            cmd.extend(['-t', 'both'])
49        if (sys.argv[1] == 'build' and
50            len(sys.argv) == 4 and
51            sys.argv[-1] == '-t'):
52            cmd.extend(['-t', 'onlytest'])
53        with open(log_path, 'at', encoding='utf-8') as log_file:
54            LogUtil.hb_info("start run hpm command")
55            process = subprocess.Popen(cmd,
56                                       stdout=subprocess.PIPE,
57                                       stderr=subprocess.STDOUT,
58                                       encoding='utf-8',
59                                       env=exec_env,
60                                       **kwargs)
61            for line in iter(process.stdout.readline, ''):
62                log_file.write(line)
63                if is_log_filter:
64                    info = re.findall(useful_info_pattern, line)
65                    if len(info):
66                        LogUtil.hb_info(info[0], mode=log_mode)
67                else:
68                    LogUtil.hb_info(line)
69
70        process.wait()
71        LogUtil.hb_info("end hpm command")
72        ret_code = process.returncode
73
74        if ret_code != 0:
75            LogUtil.get_failed_log(log_path)
76
77    @staticmethod
78    def get_current_time(time_type: str = 'default'):
79        if time_type == 'timestamp':
80            return int(datetime.utcnow().timestamp() * 1000)
81        if time_type == 'datetime':
82            return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
83        return datetime.now().replace(microsecond=0)
84
85
86class ExecEnviron:
87    def __init__(self):
88        self._env = None
89
90    @property
91    def allenv(self):
92        return self._env
93
94    @property
95    def allkeys(self):
96        if self._env is None:
97            return []
98        return list(self._env.keys())
99
100    def initenv(self):
101        self._env = os.environ.copy()
102
103    def allow(self, allowed_vars: list):
104        if self._env is not None:
105            allowed_env = {k: v for k, v in self._env.items() if k in allowed_vars}
106            self._env = allowed_env
107