1#!/usr/bin/env python
2# coding: utf-8
3
4"""
5Copyright (c) 2021-2022 Huawei Device Co., Ltd.
6Licensed under the Apache License, Version 2.0 (the "License");
7you may not use this file except in compliance with the License.
8You may obtain a copy of the License at
9
10    http://www.apache.org/licenses/LICENSE-2.0
11
12Unless required by applicable law or agreed to in writing, software
13distributed under the License is distributed on an "AS IS" BASIS,
14WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15See the License for the specific language governing permissions and
16limitations under the License.
17
18"""
19
20import os
21import argparse
22import re
23import shutil
24import subprocess
25import sys
26import platform
27from collections import defaultdict
28
29
def parse_args():
    """
    Parse the command line options of this script.

    All six options are declared as required, so argparse terminates the
    process with a usage message when any of them is missing.
    :return: namespace with dst_dir, tool_path, policy_file,
             source_root_dir, policy_dir_list and components
    """
    # (flag, help text) pairs, registered in this order.
    option_table = (
        ('--dst-dir', 'the output dest path'),
        ('--tool-path', 'the sefcontext_compile bin path'),
        ('--policy-file', 'the policy.31 file'),
        ('--source-root-dir', 'prj root path'),
        ('--policy_dir_list', 'policy dirs need to be included'),
        ('--components', 'system or vendor or default'),
    )
    cli = argparse.ArgumentParser()
    for flag, description in option_table:
        cli.add_argument(flag, help=description, required=True)
    return cli.parse_args()
45
46
def run_command(in_cmd):
    """
    Run a command through the shell and fail on a non-zero exit status.

    The items are joined with single spaces and handed to the shell as one
    string, so entries such as ">" or "| grep ..." act as shell syntax.
    :param in_cmd: list of strings forming the command line
    :raise Exception: carrying the return code when it is not 0
    """
    completed = subprocess.run(" ".join(in_cmd), shell=True)
    if completed.returncode:
        raise Exception(completed.returncode)
52
53
def traverse_folder_in_type(search_dir_list, file_suffix):
    """
    Walk every directory in search_dir_list and gather the files whose name
    ends with file_suffix, running check_contexts_file on each of them.

    Matching is a plain suffix test, so a file named hdf_service_contexts is
    also selected when file_suffix is service_contexts.
    :param search_dir_list: list of directories to walk recursively
    :param file_suffix: postfix of file name
    :return: the sorted file paths joined by single spaces
    :raise Exception: when at least one collected file fails the check
    """
    found = []
    failed = 0

    for top_dir in search_dir_list:
        for dir_path, _, names in sorted(os.walk(top_dir)):
            for name in names:
                if not name.endswith(file_suffix):
                    continue
                candidate = os.path.join(dir_path, name)
                # Keep going after a failure so every bad file is reported.
                failed |= check_contexts_file(candidate)
                found.append(candidate)

    if failed:
        raise Exception(failed)
    return " ".join(str(path) for path in sorted(found))
76
77
def check_contexts_file(contexts_file):
    """
    Check the line-ending format of one contexts file.

    A problem is printed when the last line lacks a terminating newline or
    when any line ends with a carriage return. An empty file passes.
    :param contexts_file: path of the file to check
    :return: 0 when the file is fine, 1 when a problem was reported
    """
    # Binary mode keeps the original line terminators visible.
    with open(contexts_file, 'rb') as stream:
        raw_lines = stream.readlines()
    if not raw_lines:
        return 0

    status = 0
    if not raw_lines[-1].endswith(b'\n'):
        print(contexts_file + " : need an empty line at end \n")
        status = 1
    # Report the carriage-return problem once per file.
    if any(row.endswith((b'\r\n', b'\r')) for row in raw_lines):
        print(contexts_file + " : must be unix format\n")
        status = 1
    return status
100
101
def combine_contexts_file(file_contexts_list, combined_file_contexts):
    """
    Concatenate contexts files and drop comment and empty lines.

    The raw concatenation is stored in combined_file_contexts + "_tmp" and
    the filtered result in combined_file_contexts. The work is done with
    plain file I/O instead of a shell "cat | grep" pipeline, so an empty
    input list no longer leaves a process waiting on stdin and no external
    tools are needed.
    :param file_contexts_list: whitespace separated string of input paths
    :param combined_file_contexts: path of the filtered output file
    :raise Exception: Exception(1) when no line is left after filtering
                      (the status the grep pipeline used to report);
                      OSError when a file cannot be read or written
    """
    tmp_path = combined_file_contexts + "_tmp"

    # Step 1: raw concatenation, byte for byte, in the given order.
    with open(tmp_path, 'wb') as tmp_out:
        for src_path in file_contexts_list.split():
            with open(src_path, 'rb') as src_in:
                tmp_out.write(src_in.read())

    # Step 2: keep everything except lines that start with '#' in the first
    # column and lines that are completely empty.
    kept = 0
    with open(tmp_path, 'rb') as tmp_in, \
            open(combined_file_contexts, 'wb') as filtered_out:
        for raw_line in tmp_in:
            body = raw_line[:-1] if raw_line.endswith(b'\n') else raw_line
            if not body or body.startswith(b'#'):
                continue
            filtered_out.write(body + b'\n')
            kept += 1

    if not kept:
        raise Exception(1)
113
114
def check_redefinition(contexts_file):
    """
    Validate the entries of a contexts file and reject duplicated keys.

    Every line that is neither blank nor a comment must start with
    "<key> u:object_r:<type>:s0". All malformed lines are reported first;
    only if none exist are keys defined on more than one line reported.
    :param contexts_file: path of contexts file
    :raise Exception: Exception(1) on a malformed line or a duplicated key
    """
    entry_re = re.compile(r'(\S+)\s+u:object_r:\S+:s0')
    seen_at = defaultdict(list)
    malformed = False

    with open(contexts_file, 'r') as reader:
        for number, raw in enumerate(reader, start=1):
            text = raw.lstrip()
            if not text.strip() or text.startswith('#'):
                continue
            found = entry_re.match(text)
            if found is None:
                print(contexts_file + ":" + str(number) + " format check fail")
                malformed = True
                continue
            seen_at[found.group(1)].append(number)

    if malformed:
        for banner_line in (
                "***********************************************************",
                "please check whether the format meets the following rules:",
                "[required format]: * u:object_r:*:s0",
                "***********************************************************"):
            print(banner_line)
        raise Exception(1)

    duplicated = False
    for key, numbers in seen_at.items():
        if len(numbers) < 2:
            continue
        duplicated = True
        # Message layout: <file>:<line>:<line>:...'type <key> is redefinition'
        where = "".join(str(num) + ":" for num in numbers)
        print("".join((contexts_file, ":", where,
                       "'type ", str(key), " is redefinition'")))
    if duplicated:
        raise Exception(1)
154
155
def check_common_contexts(args, contexts_file):
    """
    Validate a combined contexts file.

    First runs check_redefinition on it, then compiles it with
    sefcontext_compile against args.policy_file. The compiled output is
    only a by-product of the check and is removed afterwards.
    :param args: parsed options; tool_path and policy_file are used
    :param contexts_file: path of contexts file
    :return:
    """
    check_redefinition(contexts_file)

    compiled = contexts_file + ".bin"
    compiler = os.path.join(args.tool_path, "sefcontext_compile")
    run_command([compiler, "-o", compiled, "-p", args.policy_file, contexts_file])

    if os.path.exists(compiled):
        os.unlink(compiled)
172
173
def echo_error():
    """
    Print the format rules expected for sehap_contexts lines.
    """
    frame = "***********************************************************"
    rules = (
        frame,
        "please check whether the format meets the following rules:",
        "[required format]: apl=* name=* domain=* type=*",
        "apl=*, apl should be one of system_core|system_basic|normal",
        "name=*, name is 'optional'",
        "domain=*, hapdomain selinux type",
        "type=*, hapdatafile selinux type",
        frame,
    )
    for rule in rules:
        print(rule)
183
184
# Layout of one sehap_contexts entry. Compiled once at import time because
# sehap_process_line is called for every line of the file.
_SEHAP_LINE_PATTERN = re.compile(
    r'apl=(?P<apl>system_core|system_basic|normal)\s+'
    r'(?:(?:name|debuggable)=\S+\s+)?'
    r'(?:extra=\S+\s+)?'
    r'domain=(?P<domain>\S+)\s+'
    r'type=(?P<type>\S+)\s*\n'
)


def sehap_process_line(line, line_index, contexts_write, domain, contexts_file):
    """
    Convert one sehap_contexts line into a "<apl> <context>" line.

    Blank lines and comment lines are copied unchanged. An entry becomes
    "<apl> u:r:<domain>:s0" when domain is truthy, otherwise
    "<apl> u:object_r:<type>:s0".
    :param line: raw input line; an entry must end with a newline
    :param line_index: 1-based line number, used in the error message
    :param contexts_write: writable text stream receiving the output line
    :param domain: truthy to emit the domain, falsy to emit the type
    :param contexts_file: file name shown in the error message
    :raise Exception: Exception(1) when the line does not match the format
    """
    stripped = line.lstrip()
    if not stripped or stripped.startswith('#'):
        contexts_write.write(line)
        return

    match = _SEHAP_LINE_PATTERN.match(stripped)
    if match is None:
        print(contexts_file + ":" + str(line_index) + " format check fail")
        raise Exception(1)

    if domain:
        context = "u:r:" + match.group('domain') + ":s0"
    else:
        context = "u:object_r:" + match.group('type') + ":s0"
    contexts_write.write(match.group('apl') + " " + context + "\n")
208
209
def check_sehap_contexts(args, contexts_file, domain):
    """
    check domain or type defined in sehap_contexts.

    The file is temporarily rewritten in place into "<apl> <context>" lines
    and compiled with sefcontext_compile against args.policy_file. The
    original content is saved to contexts_file + "_bk" beforehand and is
    always moved back, whether the check succeeds, fails or is interrupted.
    The compiled contexts_file + ".bin" is removed on every path as well.
    :param args: parsed options; tool_path and policy_file are used
    :param contexts_file: path of contexts file
    :param domain: true for domain, false for type
    :raise Exception: when a line has a bad format or the compile command
                      returns a non-zero status
    :return:
    """
    backup_file = contexts_file + "_bk"
    bin_file = contexts_file + ".bin"

    shutil.copyfile(contexts_file, backup_file)
    try:
        try:
            with open(backup_file, 'r') as contexts_read, \
                    open(contexts_file, 'w') as contexts_write:
                for line_index, line in enumerate(contexts_read, start=1):
                    sehap_process_line(line, line_index, contexts_write,
                                       domain, contexts_file)
        except Exception:
            # Show the expected format, then let the error propagate.
            echo_error()
            raise

        check_cmd = [os.path.join(args.tool_path, "sefcontext_compile"),
                     "-o", bin_file,
                     "-p", args.policy_file,
                     contexts_file]
        run_command(check_cmd)
    finally:
        # Single restore point: put the original file back and drop the
        # temporary compile output regardless of the outcome.
        shutil.move(backup_file, contexts_file)
        if os.path.exists(bin_file):
            os.unlink(bin_file)
244
245
def build_file_contexts(args, output_path, policy_path, all_policy_path):
    """
    Build the combined file_contexts and compile file_contexts.bin.

    For the "default" component only the full set is combined, into
    output_path/file_contexts. Otherwise the component's own files go to
    output_path/file_contexts and the full set to
    output_path/all_file_contexts. The full set is checked for duplicated
    keys and compiled to args.dst_dir/file_contexts.bin.
    :param args: parsed options; components, tool_path, policy_file and
                 dst_dir are used
    :param output_path: directory receiving the combined files
    :param policy_path: policy folders of the selected component
    :param all_policy_path: policy folders of all components
    """
    suffix = "file_contexts"

    if args.components == "default":
        merged_all = os.path.join(output_path, suffix)
    else:
        merged_all = os.path.join(output_path, "all_file_contexts")
        component_files = traverse_folder_in_type(policy_path, suffix)
        combine_contexts_file(component_files,
                              os.path.join(output_path, suffix))

    combine_contexts_file(traverse_folder_in_type(all_policy_path, suffix),
                          merged_all)

    check_redefinition(merged_all)

    compiler = os.path.join(args.tool_path, "sefcontext_compile")
    bin_target = os.path.join(args.dst_dir, "file_contexts.bin")
    run_command([compiler, "-o", bin_target, "-p", args.policy_file, merged_all])
266
267
def build_common_contexts(args, output_path, contexts_file_name, policy_path, all_policy_path):
    """
    Build and check one kind of combined contexts file.

    For the "default" component only the full set is combined, into
    output_path/<contexts_file_name>. Otherwise the component's own files
    go there and the full set goes to output_path/all_<contexts_file_name>.
    The full set is then validated with check_common_contexts.

    Paths are built with os.path.join, so output_path works with or
    without a trailing separator.
    :param args: parsed options; components is read here
    :param output_path: directory receiving the combined files
    :param contexts_file_name: file name suffix to collect
    :param policy_path: policy folders of the selected component
    :param all_policy_path: policy folders of all components
    """
    if args.components == "default":
        all_combined_contexts = os.path.join(output_path, contexts_file_name)
    else:
        all_combined_contexts = os.path.join(
            output_path, "all_" + contexts_file_name)
        contexts_list = traverse_folder_in_type(policy_path, contexts_file_name)
        combined_contexts = os.path.join(output_path, contexts_file_name)
        combine_contexts_file(contexts_list, combined_contexts)

    all_contexts_list = traverse_folder_in_type(all_policy_path, contexts_file_name)
    combine_contexts_file(all_contexts_list, all_combined_contexts)
    check_common_contexts(args, all_combined_contexts)
280
281
def build_sehap_contexts(args, output_path, policy_path):
    """
    Combine all sehap_contexts files into output_path/sehap_contexts and
    check the result twice: once for its domains, once for its types.
    :param args: parsed options, forwarded to check_sehap_contexts
    :param output_path: directory receiving the combined file
    :param policy_path: policy folders to search
    """
    suffix = "sehap_contexts"
    merged = os.path.join(output_path, suffix)
    combine_contexts_file(traverse_folder_in_type(policy_path, suffix), merged)

    # 1 selects the domain check, 0 the type check.
    for as_domain in (1, 0):
        check_sehap_contexts(args, merged, as_domain)
291
292
def prepare_build_path(dir_list, root_dir, build_dir_list):
    """
    Resolve the policy directories to build and append them to
    build_dir_list.

    The two built-in selinux_adapter policy directories always come first,
    followed by the entries of dir_list. Empty entries and the literal
    "default" are skipped. The process exits with status -1 on the first
    directory that does not exist under root_dir.
    :param dir_list: ":" separated directories, relative to root_dir
    :param root_dir: project root path
    :param build_dir_list: list extended in place with the resolved paths
    """
    builtin_dirs = [
        "base/security/selinux_adapter/sepolicy/base",
        "base/security/selinux_adapter/sepolicy/ohos_policy",
    ]

    for rel_dir in builtin_dirs + dir_list.split(":"):
        if rel_dir in ("", "default"):
            continue
        path = os.path.join(root_dir, rel_dir)
        if not os.path.exists(path):
            print("following path not exists!! {}".format(path))
            # sys.exit instead of the site-module exit() helper, which is
            # meant for interactive use and is not always available.
            sys.exit(-1)
        build_dir_list.append(path)
307
308
def traverse_folder_in_dir_name(search_dir, folder_suffix):
    """
    Find every sub-directory below search_dir whose name equals
    folder_suffix exactly.
    :param search_dir: directory to walk recursively
    :param folder_suffix: directory name to look for
    :return: list of matching directory paths
    """
    return [
        os.path.join(parent, child)
        for parent, children, _ in sorted(os.walk(search_dir))
        for child in children
        if child == folder_suffix
    ]
316
317
def main(args):
    """
    Build every contexts file for the requested component.

    Sets LD_LIBRARY_PATH to the pcre2 directory of the host toolchain
    (clang_arm64 on linux/aarch64, clang_x64 otherwise), resolves the
    policy directories, sorts their "public", "system" and "vendor"
    sub-folders by component and runs the individual builders.
    :param args: parsed options returned by parse_args
    """
    on_linux_arm64 = (sys.platform == "linux"
                      and platform.machine().lower() == "aarch64")
    if on_linux_arm64:
        pcre2_dir = "./clang_arm64/thirdparty/pcre2/"
    else:
        pcre2_dir = "./clang_x64/thirdparty/pcre2/"
    os.environ['LD_LIBRARY_PATH'] = os.path.realpath(pcre2_dir)

    output_path = args.dst_dir
    policy_path = []
    prepare_build_path(args.policy_dir_list, args.source_root_dir, policy_path)

    # Folders grouped by the directory name they were found under.
    by_kind = {"public": [], "system": [], "vendor": []}
    for item in policy_path:
        for kind in ("public", "system", "vendor"):
            by_kind[kind] += traverse_folder_in_dir_name(item, kind)

    all_folder_list = by_kind["public"] + by_kind["system"] + by_kind["vendor"]
    per_component = {
        "system": by_kind["public"] + by_kind["system"],
        "vendor": by_kind["public"] + by_kind["vendor"],
    }
    # Any other component value builds from the full set.
    folder_list = per_component.get(args.components, all_folder_list)

    build_file_contexts(args, output_path, folder_list, all_folder_list)
    for contexts_name in ("service_contexts",
                          "hdf_service_contexts",
                          "parameter_contexts"):
        build_common_contexts(args, output_path, contexts_name,
                              folder_list, all_folder_list)
    build_sehap_contexts(args, output_path, all_folder_list)
355
356
if __name__ == "__main__":
    # Script entry point: parse the command line, then run the build.
    main(parse_args())
360