1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3# Copyright (c) 2021 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import sys
17import os
18import argparse
19import shutil
20import xml.etree.ElementTree as ET
21
22sys.path.append(
23    os.path.dirname(os.path.dirname(os.path.dirname(
24        os.path.abspath(__file__)))))
25from scripts.util.file_utils import read_json_file, write_json_file  # noqa: E402
26from scripts.util import build_utils  # noqa: E402
27
28
def copy_dir(src: str, dest: str) -> list:
    """Copy every non-symlink file found under ``src`` into ``dest``.

    The source tree is fully enumerated with ``os.walk`` before the first
    file is copied. Each file keeps its path relative to ``src`` below
    ``dest``; missing destination directories are created on demand and the
    copy itself is done with ``shutil.copy2``.

    Args:
        src: Directory to copy from.
        dest: Directory to copy into.

    Returns:
        A flat list that holds, for every copied file, its source path
        immediately followed by its destination path.

    Raises:
        Exception: If ``src`` does not exist.
    """
    if not os.path.exists(src):
        raise Exception("src dir '{}' doesn't exist.".format(src))
    if not os.path.exists(dest):
        os.makedirs(dest, exist_ok=True)

    # Snapshot the file list first so files created by the copy below are
    # never picked up by the walk.
    candidates = [
        os.path.join(parent, name)
        for parent, _, names in os.walk(src)
        for name in names
    ]

    copied = []
    for source in candidates:
        if os.path.islink(source):
            continue
        target = os.path.join(dest, os.path.relpath(source, src))
        target_dir = os.path.dirname(target)
        if not os.path.exists(target_dir):
            os.makedirs(target_dir, exist_ok=True)
        shutil.copy2(source, target)
        copied.extend((source, target))
    return copied
52
53
def _resources_with_xml_v1(root, testcase_target_name: str, test_resource_path: str,
                           part_build_out_path: str, resource_output_path: str) -> list:
    """Collect resource copy entries from a config tree that is not ver 2.0.

    Only children of ``root`` whose ``name`` attribute equals
    ``testcase_target_name`` are considered. Each of their child elements is
    read for a ``findpath`` and a ``resource`` attribute:

    * ``findpath == 'res'``: the source is the resource under
      ``test_resource_path``.
    * ``findpath == 'out'``: if the resource path exists as given (checked
      with ``os.path.exists`` on the bare value), the source is the resource
      under ``part_build_out_path``. Otherwise the resource is retried under
      ``part_build_out_path`` with its own first path component prepended,
      and silently skipped when that does not exist either.

    Args:
        root: Root element of the parsed config file.
        testcase_target_name: Value matched against each target's ``name``.
        test_resource_path: Base directory for ``res`` resources.
        part_build_out_path: Base directory for ``out`` resources.
        resource_output_path: Base directory used to build each ``dest``.

    Returns:
        A list of ``{"src": ..., "dest": ...}`` dicts whose paths went
        through ``os.path.relpath``.

    Raises:
        Exception: If a ``findpath`` value is neither ``res`` nor ``out``.
    """
    collected = []
    for target in root:
        if target.attrib.get('name') != testcase_target_name:
            continue
        for depend in target:
            find_kind = depend.attrib.get('findpath')
            res_name = depend.attrib.get('resource')
            if find_kind not in ('res', 'out'):
                raise Exception(
                    "resource findpath type '{}' not support.".format(
                        find_kind))

            # Empty strings mean "nothing to copy for this entry".
            source = ''
            destination = ''
            if find_kind == 'res':
                source = os.path.join(test_resource_path, res_name)
                destination = os.path.join(resource_output_path, res_name)
            elif os.path.exists(res_name):
                source = os.path.join(part_build_out_path, res_name)
                destination = os.path.join(resource_output_path, res_name)
            else:
                # Fallback: look below a directory named after the
                # resource's first path component.
                prefixed_name = os.path.join(res_name.split('/')[0], res_name)
                fallback = os.path.join(part_build_out_path, prefixed_name)
                if os.path.exists(fallback):
                    source = fallback
                    destination = os.path.join(resource_output_path, res_name)

            if source:
                collected.append({
                    "src": os.path.relpath(source),
                    "dest": os.path.relpath(destination),
                })
    return collected
98
99
def _parse_res_value(value) -> str:
    """Return the part of ``value`` before the first ``->``, stripped.

    When ``value`` contains no ``->`` the whole string is returned with
    surrounding whitespace removed.
    """
    head, _, _ = value.partition('->')
    return head.strip()
103
104
def _resources_with_xml_v2(root, testcase_target_name: str, test_resource_path: str,
                           part_build_out_path: str, resource_output_path: str) -> list:
    """Collect resource copy entries from a ``ver="2.0"`` config tree.

    Only children of ``root`` whose ``name`` attribute equals
    ``testcase_target_name`` are considered. Inside them, every child of a
    ``preparer`` element whose ``name`` attribute is ``push`` contributes one
    entry. The resource name is its ``value`` attribute as reduced by
    ``_parse_res_value``, and its ``src`` attribute selects the base
    directory: ``res`` -> ``test_resource_path``, ``out`` ->
    ``part_build_out_path``.

    Args:
        root: Root element of the parsed config file.
        testcase_target_name: Value matched against each target's ``name``.
        test_resource_path: Base directory for ``res`` resources.
        part_build_out_path: Base directory for ``out`` resources.
        resource_output_path: Base directory used to build each ``dest``.

    Returns:
        A list of ``{"src": ..., "dest": ...}`` dicts whose paths went
        through ``os.path.relpath``.

    Raises:
        Exception: If a push option's ``src`` is neither ``res`` nor ``out``.
    """
    source_roots = {
        'res': test_resource_path,
        'out': part_build_out_path,
    }
    collected = []
    for target in root:
        if target.attrib.get('name') != testcase_target_name:
            continue
        for preparer in (node for node in target if node.tag == 'preparer'):
            for option in preparer:
                if option.attrib.get('name') != 'push':
                    continue
                src_kind = option.attrib.get('src')
                # The value is parsed before the kind is validated, so a
                # malformed value is reported first.
                res_name = _parse_res_value(option.attrib.get('value'))
                if src_kind not in source_roots:
                    raise Exception(
                        "resource src type '{}' not support.".format(
                            src_kind))
                source = os.path.join(source_roots[src_kind], res_name)
                destination = os.path.join(resource_output_path, res_name)
                if source:
                    collected.append({
                        "src": os.path.relpath(source),
                        "dest": os.path.relpath(destination),
                    })
    return collected
142
143
def find_testcase_resources(resource_config_file: str, testcase_target_name: str,
                            test_resource_path: str, part_build_out_path: str,
                            resource_output_path: str) -> list:
    """List the resources a test case needs according to its config file.

    The XML file is parsed and handed to ``_resources_with_xml_v2`` when the
    root element has ``ver="2.0"``, otherwise to ``_resources_with_xml_v1``.
    The config file itself is always appended as the last entry so that it
    is copied into ``resource_output_path`` as well.

    Args:
        resource_config_file: Path of the XML resource config.
        testcase_target_name: Name of the test target to look up.
        test_resource_path: Base directory for ``res`` resources.
        part_build_out_path: Base directory for ``out`` resources.
        resource_output_path: Directory the resources will be copied into.

    Returns:
        A list of ``{"src": ..., "dest": ...}`` dicts, or an empty list when
        ``resource_config_file`` does not exist.
    """
    if not os.path.exists(resource_config_file):
        return []

    root = ET.parse(resource_config_file).getroot()
    if root.attrib.get('ver') == '2.0':
        collect = _resources_with_xml_v2
    else:
        collect = _resources_with_xml_v1
    resources = collect(root, testcase_target_name, test_resource_path,
                        part_build_out_path, resource_output_path)

    # The config file travels with the resources it describes.
    config_copy = os.path.join(resource_output_path,
                               os.path.basename(resource_config_file))
    resources.append({"src": resource_config_file, "dest": config_copy})
    return resources
170
171
def copy_testcase_resources(resource_infos: list) -> list:
    """Copy every listed resource from its ``src`` to its ``dest``.

    Directories are copied through ``copy_dir``; single files are copied
    with ``shutil.copy2`` after the destination directory has been created.
    A resource whose source is missing is reported with a warning and
    skipped, and the remaining resources are still processed.

    Args:
        resource_infos: Dicts carrying a ``src`` and a ``dest`` path.

    Returns:
        A flat list of the paths involved in the copies: for a file its
        source followed by its destination, for a directory whatever
        ``copy_dir`` returned.
    """
    result_dest_list = []
    for resource_info in resource_infos:
        src_file = resource_info.get('src')
        if not src_file or not os.path.exists(src_file):
            print("warning: testcase resource {} doesn't exist.".format(
                src_file))
            # One missing resource must not drop the ones listed after it.
            continue
        dest_file = resource_info.get('dest')
        if os.path.isdir(src_file):
            result_dest_list.extend(copy_dir(src_file, dest_file))
            continue
        dest_dir = os.path.dirname(dest_file)
        # dest_dir is '' for a bare file name; os.makedirs('') would raise.
        if dest_dir and not os.path.exists(dest_dir):
            os.makedirs(dest_dir, exist_ok=True)
        shutil.copy2(src_file, dest_file)
        result_dest_list.extend((src_file, dest_file))
    return result_dest_list
193
194
def _get_subsystem_name(part_name: str):
    """Look up the subsystem a part belongs to.

    Reads ``build_configs/parts_info/components.json`` (relative to the
    current working directory) and returns the ``subsystem`` value stored
    under the entry keyed by ``part_name``.

    Args:
        part_name: Name of the part to look up.

    Returns:
        The entry's ``subsystem`` value, or ``None`` when the part is not
        listed.

    Raises:
        Exception: If the components file could not be read.
    """
    components_file = 'build_configs/parts_info/components.json'
    components = read_json_file(components_file)
    if components is None:
        raise Exception("read file '{}' failed.".format(components_file))
    matching = [
        info for name, info in components.items() if name == part_name
    ]
    if not matching:
        return None
    return matching[0].get("subsystem")
204
205
def _get_subsystem_path(part_name: str) -> str:
    """Return the ``path`` entry configured for the subsystem of a part.

    The subsystem name comes from ``_get_subsystem_name``; its settings are
    read from ``../../build/subsystem_config.json`` (relative to the current
    working directory).

    Args:
        part_name: Name of the part whose subsystem is looked up.

    Returns:
        The value stored under ``path`` for the subsystem, or ``None`` when
        the part has no known subsystem. NOTE(review): the only caller in
        this file iterates over the result, so it is presumably a list of
        paths rather than the annotated ``str`` - confirm against the
        config schema.

    Raises:
        Exception: If the config file could not be read or has no entry
            for the subsystem.
    """
    subsystem = _get_subsystem_name(part_name)
    if subsystem is None:
        return None

    config_file = '../../build/subsystem_config.json'
    all_subsystems = read_json_file(config_file)
    if all_subsystems is None:
        raise Exception("read file '{}' failed.".format(config_file))

    entry = all_subsystems.get(subsystem)
    if entry is None:
        raise Exception(
            "subsystem '{}' info doesn't exist.".format(subsystem))
    return entry.get('path')
221
222
def _parse_module_out_path(module_out_path: str):
    """Split ``module_out_path`` at its first ``/``.

    Args:
        module_out_path: Path of the form ``<part_name>/<module_name>``.

    Returns:
        A ``(part_name, module_name)`` tuple; ``module_name`` keeps any
        further slashes.

    Raises:
        IndexError: If ``module_out_path`` contains no ``/``.
    """
    pieces = module_out_path.split('/', maxsplit=1)
    return pieces[0], pieces[1]
228
229
def _find_resource_config_file(config_file_name: str, subsystem_path: str, module_name: str) -> str:
    """Locate a resource config file for a module, searching upwards.

    The first candidate is
    ``../../<subsystem_path>/test/resource/<module_name>/<config_file_name>``.
    For compatibility, when it does not exist the parent directories are
    tried one level at a time; the number of levels climbed is at most the
    number of ``/``-separated components in ``module_name`` minus one.

    Args:
        config_file_name: File name to look for.
        subsystem_path: Subsystem directory, joined below ``../../``.
        module_name: Module path below ``test/resource``.

    Returns:
        The first candidate that exists, otherwise the last candidate that
        was tried (which does not exist).
    """
    candidate = os.path.join('../../', subsystem_path, 'test/resource',
                             module_name, config_file_name)
    if os.path.exists(candidate):
        return candidate

    # compatibility: climb towards 'test/resource', one module level per step
    search_dir = os.path.dirname(candidate)
    levels_to_climb = len(module_name.split('/')) - 1
    for _ in range(levels_to_climb):
        search_dir = os.path.dirname(search_dir)
        candidate = os.path.join(search_dir, config_file_name)
        if os.path.exists(candidate):
            break
    return candidate
246
247
def _get_res_config_file(module_out_path: str) -> str:
    """Find the candidate resource config file for each subsystem path.

    ``module_out_path`` is split into part and module name, the part's
    subsystem paths are looked up, and for each path ``ohos_test.xml`` is
    searched first with ``harmony_test.xml`` as the fallback name.

    Args:
        module_out_path: Path of the form ``<part_name>/<module_name>``.

    Returns:
        A list with one config file path per subsystem path (despite the
        ``str`` annotation). A listed path is not guaranteed to exist. The
        list is empty when no subsystem paths are known.
    """
    part_name, module_name = _parse_module_out_path(module_out_path)
    subsystem_paths = _get_subsystem_path(part_name)
    if not subsystem_paths:
        return []

    found = []
    for base_path in subsystem_paths:
        config_file = _find_resource_config_file(
            'ohos_test.xml', base_path, module_name)
        if not os.path.exists(config_file):
            # Fall back to the alternative config file name.
            config_file = _find_resource_config_file(
                'harmony_test.xml', base_path, module_name)
        found.append(config_file)
    return found
262
263
def _get_resources_list(resource_config_file: str, testcase_target_name: str,
                        part_build_out_path: str, resource_output_path: str) -> list:
    """List a test case's resources from an explicitly given config file.

    The directory holding ``resource_config_file`` is used as the base
    directory for ``res`` resources.

    Args:
        resource_config_file: Path of the XML resource config.
        testcase_target_name: Name of the test target to look up.
        part_build_out_path: Base directory for ``out`` resources.
        resource_output_path: Directory the resources will be copied into.

    Returns:
        The list produced by ``find_testcase_resources``.

    Raises:
        Exception: If ``resource_config_file`` does not exist.
    """
    if os.path.exists(resource_config_file):
        return find_testcase_resources(
            resource_config_file,
            testcase_target_name,
            os.path.dirname(resource_config_file),
            part_build_out_path,
            resource_output_path)
    raise Exception(
        "testcase '{}' resource_config_file config incorrect.".format(
            testcase_target_name))
277
278
def _get_resources_list_auto_match(module_out_path: str, testcase_target_name: str,
                                   part_build_out_path: str, resource_output_path: str) -> list:
    """List a test case's resources using auto-discovered config files.

    Config files are located through ``_get_res_config_file``; candidates
    that are ``None`` or do not exist are ignored. For every remaining file
    its own directory serves as the base directory for ``res`` resources.

    Args:
        module_out_path: Path of the form ``<part_name>/<module_name>``.
        testcase_target_name: Name of the test target to look up.
        part_build_out_path: Base directory for ``out`` resources.
        resource_output_path: Directory the resources will be copied into.

    Returns:
        The concatenated results of ``find_testcase_resources`` for all
        usable config files; empty when none was usable.
    """
    merged = []
    for config_file in _get_res_config_file(module_out_path):
        if config_file is not None and os.path.exists(config_file):
            merged.extend(
                find_testcase_resources(config_file,
                                        testcase_target_name,
                                        os.path.dirname(config_file),
                                        part_build_out_path,
                                        resource_output_path))
    return merged
295
296
def main() -> int:
    """Command-line entry point: resolve, record and copy test resources.

    The resource list is built from ``--resource-config-file`` when given,
    otherwise from the config files discovered via ``--module-out-path``.
    A non-empty list is written to ``--output-file`` as JSON and the
    resources are copied. When ``--depfile`` is given and something was
    copied, the sorted list of involved paths is written as a depfile for
    the output file.

    Returns:
        Always ``0``; when no resources were found nothing is written.

    Raises:
        Exception: If neither ``--resource-config-file`` nor
            ``--module-out-path`` is supplied.
    """
    option_specs = (
        ('--resource-config-file', False),
        ('--testcase-target-name', True),
        ('--part-build-out-path', True),
        ('--resource-output-path', True),
        ('--module-out-path', False),
        ('--output-file', True),
        ('--depfile', False),
    )
    parser = argparse.ArgumentParser()
    for flag, is_required in option_specs:
        parser.add_argument(flag, required=is_required)
    args = parser.parse_args()

    if args.resource_config_file:
        resources_list = _get_resources_list(args.resource_config_file,
                                             args.testcase_target_name,
                                             args.part_build_out_path,
                                             args.resource_output_path)
    elif args.module_out_path:
        resources_list = _get_resources_list_auto_match(
            args.module_out_path, args.testcase_target_name,
            args.part_build_out_path, args.resource_output_path)
    else:
        raise Exception('Missing parameter module_out_path.')

    if not resources_list:
        return 0

    write_json_file(args.output_file, resources_list)
    copied_paths = copy_testcase_resources(resources_list)
    if args.depfile and copied_paths:
        build_utils.write_depfile(args.depfile,
                                  args.output_file,
                                  sorted(copied_paths),
                                  add_pydeps=False)
    return 0
329
330
# Script entry point: run main() and use its return value as the exit status.
if __name__ == '__main__':
    sys.exit(main())
333