1#!/usr/bin/env python
2# coding: utf-8
3
4"""
5Copyright (c) 2023 Huawei Device Co., Ltd.
6Licensed under the Apache License, Version 2.0 (the "License");
7you may not use this file except in compliance with the License.
8You may obtain a copy of the License at
9
10    http://www.apache.org/licenses/LICENSE-2.0
11
12Unless required by applicable law or agreed to in writing, software
13distributed under the License is distributed on an "AS IS" BASIS,
14WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15See the License for the specific language governing permissions and
16limitations under the License.
17
18"""
19
20import os
21import re
22import shutil
23import subprocess
24import tempfile
25from concurrent.futures import ThreadPoolExecutor, as_completed
26import copy
27
28SYSTEM_CIL_HASH = "system.cil.sha256"
29PREBUILD_SEPOLICY_SYSTEM_CIL_HASH = "prebuild_sepolicy.system.cil.sha256"
30
31# list of all macros and te for sepolicy build
32SEPOLICY_TYPE_LIST = ["security_classes",
33                      "initial_sids",
34                      "access_vectors",
35                      "glb_perm_def.spt",
36                      "glb_never_def.spt",
37                      "mls",
38                      "policy_cap",
39                      "glb_te_def.spt",
40                      "attributes",
41                      ".te",
42                      "glb_roles.spt",
43                      "users",
44                      "initial_sid_contexts",
45                      "fs_use",
46                      "virtfs_contexts",
47                      ]
48
49POLICY_TYPE_LIST = ["allow", "auditallow", "dontaudit",
50                    "allowx", "auditallowx", "dontauditx",
51                    "neverallow", "neverallowx", ]
52
53
54class PolicyDirList(object):
55    def __init__(self, min_policy_dir_list, system_policy_dir_list, vendor_policy_dir_list, public_policy_dir_list):
56        self.min_policy_dir_list = min_policy_dir_list
57        self.system_policy_dir_list = system_policy_dir_list
58        self.vendor_policy_dir_list = vendor_policy_dir_list
59        self.public_policy_dir_list = public_policy_dir_list
60
61
62class PolicyFileList(object):
63    def __init__(self, min_policy_file_list, system_policy_file_list, vendor_policy_file_list, public_policy_file_list):
64        self.min_policy_file_list = min_policy_file_list
65        self.system_policy_file_list = system_policy_file_list
66        self.vendor_policy_file_list = vendor_policy_file_list
67        self.public_policy_file_list = public_policy_file_list
68
69
70def traverse_folder_in_dir_name(search_dir, folder_suffix):
71    folder_list = []
72    for root, dirs, _ in sorted(os.walk(search_dir)):
73        for dir_i in dirs:
74            if dir_i == folder_suffix:
75                folder_list.append(os.path.join(root, dir_i))
76    return folder_list
77
78
79def traverse_folder_in_type(search_dir, file_suffix, build_root):
80    policy_file_list = []
81    flag = 0
82    for root, _, files in sorted(os.walk(search_dir)):
83        for each_file in files:
84            if each_file.endswith(file_suffix):
85                path = os.path.join(root, each_file)
86                rel_path = os.path.relpath(path, build_root)
87                flag |= check_empty_row(rel_path)
88                policy_file_list.append(rel_path)
89    policy_file_list.sort()
90    return policy_file_list, flag
91
92
93def traverse_file_in_each_type(folder_list, sepolicy_type_list, build_root):
94    policy_files_list = []
95    err = 0
96    for policy_type in sepolicy_type_list:
97        for folder in folder_list:
98            type_file_list, flag = traverse_folder_in_type(
99                folder, policy_type, build_root)
100            err |= flag
101            if len(type_file_list) == 0:
102                continue
103            policy_files_list.extend(type_file_list)
104    if err:
105        raise Exception(err)
106    return policy_files_list
107
108
109def check_empty_row(policy_file):
110    """
111    Check whether the last line of te file is empty.
112    :param policy_file: te file
113    :return:
114    """
115    err = 0
116    with open(policy_file, 'r') as fp:
117        lines = fp.readlines()
118        if len(lines) == 0:
119            return 0
120        last_line = lines[-1]
121        if '\n' not in last_line:
122            print("".join([policy_file, " : need an empty line at end\n"]))
123            err = 1
124    return err
125
126
127def run_command(in_cmd):
128    cmdstr = " ".join(in_cmd)
129    ret = subprocess.run(cmdstr, shell=True).returncode
130    if ret != 0:
131        raise Exception(ret)
132
133
134def build_conf(args, output_conf, file_list, with_developer=False):
135    m4_args = ["-D", "build_with_debug=" + args.debug_version]
136    m4_args += ["-D", "build_with_updater=" + args.updater_version]
137    if with_developer:
138        m4_args += ["-D", "build_with_developer=enable"]
139    if getattr(args, "product_args", None):
140        m4_args += ["-D", args.product_args]
141    build_conf_cmd = ["m4", "-s", "--fatal-warnings"] + m4_args + file_list
142    with open(output_conf, 'w') as fd:
143        ret = subprocess.run(build_conf_cmd, shell=False, stdout=fd).returncode
144        if ret != 0:
145            raise Exception(ret)
146
147
148def build_cil(args, output_cil, input_conf):
149    check_policy_cmd = [os.path.join(args.tool_path, "checkpolicy"),
150                        input_conf,
151                        "-M -C -c 31",
152                        "-o " + output_cil]
153    run_command(check_policy_cmd)
154
155
156def add_version(version, string):
157    return "".join([string, "_", version])
158
159
160def simplify_string(string):
161    return string.replace('(', '').replace(')', '').replace('\n', '')
162
163
164def deal_with_roletype(version, cil_write, elem_list, type_set, file, line):
165    if len(elem_list) < 3:
166        print('Error: invalid roletype in %s:%d' % (file, line))
167        raise Exception(1)
168
169    sub_string = simplify_string(elem_list[2])
170    if sub_string in type_set:
171        cil_write.write('(typeattribute ' + add_version(version, sub_string) + ')\n')
172        elem_list[2] = elem_list[2].replace(
173            sub_string, add_version(version, sub_string))
174    cil_write.write(" ".join(elem_list))
175
176
177def deal_with_typeattribute(version, cil_write, elem_list, type_set, file, line):
178    if len(elem_list) < 2:
179        print('Error: invalid typeattribute in %s:%d' % (file, line))
180        raise Exception(1)
181
182    sub_string = simplify_string(elem_list[1])
183    if sub_string.startswith("base_typeattr_"):
184        elem_list[1] = elem_list[1].replace(
185            sub_string, add_version(version, sub_string))
186    cil_write.write(" ".join(elem_list))
187
188
189def deal_with_typeattributeset(version, cil_write, elem_list, type_set, file, line):
190    if len(elem_list) < 2:
191        print('Error: invalid typeattributeset in %s:%d' % (file, line))
192        raise Exception(1)
193
194    for index, elem in enumerate(elem_list[1:]):
195        sub_string = simplify_string(elem)
196        if sub_string.startswith("base_typeattr_") or sub_string in type_set:
197            elem_list[index + 1] = elem.replace(sub_string, add_version(version, sub_string))
198    cil_write.write(" ".join(elem_list))
199
200
201def deal_with_policy(version, cil_write, elem_list, type_set, file, line):
202    if len(elem_list) < 4:
203        print('Error: invalid policy in %s:%d' % (file, line))
204        raise Exception(1)
205
206    for index, elem in enumerate(elem_list[1:3]):
207        sub_string = simplify_string(elem)
208        if sub_string.startswith("base_typeattr_") or sub_string in type_set:
209            elem_list[index + 1] = elem.replace(sub_string, add_version(version, sub_string))
210    cil_write.write(" ".join(elem_list))
211
212
213def deal_with_type(version, cil_write, elem_list, file, line):
214    if len(elem_list) < 2:
215        print('Error: invalid type in %s:%d' % (file, line))
216        raise Exception(1)
217
218    sub_string = simplify_string(elem_list[1])
219    cil_write.write(" ".join(['(typeattributeset', add_version(version, sub_string), '(', sub_string, '))\n']))
220    cil_write.write(" ".join(['(expandtypeattribute', '(', add_version(version, sub_string), ') true)\n']))
221    cil_write.write(" ".join(['(typeattribute', add_version(version, sub_string), ')\n']))
222
223
224def build_version_cil(version, cil_file_input, cil_file_output, type_set):
225    index = 0
226    with open(cil_file_input, 'r') as cil_read, open(cil_file_output, 'w') as cil_write:
227        for line in cil_read:
228            index += 1
229            if not line.startswith('('):
230                continue
231
232            elem_list = line.split(' ')
233            if not elem_list:
234                continue
235
236            if elem_list[0] == '(type':
237                cil_write.write(line)
238            elif elem_list[0] == '(roletype':
239                deal_with_roletype(version, cil_write, elem_list, type_set, cil_file_input, line)
240            elif elem_list[0] == '(typeattribute':
241                deal_with_typeattribute(version, cil_write, elem_list, type_set, cil_file_input, line)
242            elif elem_list[0] == '(typeattributeset':
243                deal_with_typeattributeset(version, cil_write, elem_list, type_set, cil_file_input, line)
244            elif simplify_string(elem_list[0]) in POLICY_TYPE_LIST:
245                deal_with_policy(version, cil_write, elem_list, type_set, cil_file_input, line)
246            else:
247                cil_write.write(line)
248
249
250def build_type_version_cil(version, cil_file_input, cil_file_output):
251    index = 0
252    with open(cil_file_input, 'r') as cil_read, open(cil_file_output, 'w') as cil_write:
253        for line in cil_read:
254            index += 1
255            if not line.startswith('('):
256                continue
257
258            elem_list = line.split(' ')
259            if not elem_list:
260                continue
261
262            if elem_list[0] == '(type':
263                deal_with_type(version, cil_write, elem_list, line, index)
264
265
266def get_type_set(cil_file_input):
267    pattern_type = re.compile(r'^\(type (.*)\)$')
268    pattern_typeattribute = re.compile(r'^\(type_attribute (base_typeattr_[0-9]+)\)$')
269    type_set = set()
270    with open(cil_file_input, 'r') as cil_read:
271        for line in cil_read:
272            match_type = pattern_type.match(line)
273            match_typeattribute = pattern_typeattribute.match(line)
274            if match_type:
275                type_set.add(match_type.group(1))
276            elif match_typeattribute:
277                type_set.add(match_typeattribute.group(1))
278    return type_set
279
280
281def add_base_typeattr_prefix(cil_file, prefix):
282    with open(cil_file, 'r') as cil_read:
283        data = cil_read.read()
284        data = data.replace('base_typeattr_', '_'.join([prefix, 'base_typeattr_']))
285        with open(cil_file, 'w') as cil_write:
286            cil_write.write(data)
287
288
289def build_binary_policy(tool_path, output_policy, check_neverallow, cil_list):
290    build_policy_cmd = [os.path.join(tool_path, "secilc"),
291                        " ".join(cil_list),
292                        "-m -M true -G -c 31 -O",
293                        "-f /dev/null",
294                        "-o " + output_policy]
295    if not check_neverallow:
296        build_policy_cmd.append("-N")
297    run_command(build_policy_cmd)
298
299
300def prepare_build_path(dir_list, root_dir, build_dir_list, sepolicy_path):
301    build_policy_list = [os.path.join(sepolicy_path, "base"), os.path.join(sepolicy_path, "ohos_policy")]
302    build_policy_list += dir_list.split(":")
303
304    for i in build_policy_list:
305        if i == "" or i == "default":
306            continue
307        path = os.path.join(root_dir, i)
308        if (os.path.exists(path)):
309            build_dir_list.append(path)
310        else:
311            print("following path not exists!! {}".format(path))
312            exit(-1)
313
314
315def get_policy_dir_list(args):
316    sepolicy_path = os.path.join(args.source_root_dir, "base/security/selinux_adapter/sepolicy/")
317    dir_list = []
318    prepare_build_path(args.policy_dir_list, args.source_root_dir, dir_list, sepolicy_path)
319    min_policy_dir_list = [os.path.join(sepolicy_path, "min")]
320    system_policy = []
321    public_policy = []
322    vendor_policy = []
323
324    for item in dir_list:
325        public_policy += traverse_folder_in_dir_name(item, "public")
326        system_policy += traverse_folder_in_dir_name(item, "system")
327        vendor_policy += traverse_folder_in_dir_name(item, "vendor")
328
329    # list of all policy folders
330    system_policy_dir_list = public_policy + system_policy
331    vendor_policy_dir_list = public_policy + vendor_policy + min_policy_dir_list
332    public_policy_dir_list = public_policy + min_policy_dir_list
333
334    # add temp dirs base/te folders
335    system_policy_dir_list.append(os.path.join(sepolicy_path, "base/te"))
336    vendor_policy_dir_list.append(os.path.join(sepolicy_path, "base/te"))
337    public_policy_dir_list.append(os.path.join(sepolicy_path, "base/te"))
338
339    return PolicyDirList(min_policy_dir_list, system_policy_dir_list, vendor_policy_dir_list, public_policy_dir_list)
340
341
342def get_policy_file_list(args, dir_list_object):
343    build_root = os.path.abspath(os.path.join(args.tool_path, "../../.."))
344    # list of all policy files
345    system_policy_file_list = traverse_file_in_each_type(
346        dir_list_object.system_policy_dir_list, SEPOLICY_TYPE_LIST, build_root)
347    vendor_policy_file_list = traverse_file_in_each_type(
348        dir_list_object.vendor_policy_dir_list, SEPOLICY_TYPE_LIST, build_root)
349    public_policy_file_list = traverse_file_in_each_type(
350        dir_list_object.public_policy_dir_list, SEPOLICY_TYPE_LIST, build_root)
351    min_policy_file_list = traverse_file_in_each_type(
352        dir_list_object.min_policy_dir_list, SEPOLICY_TYPE_LIST, build_root)
353
354    return PolicyFileList(min_policy_file_list, system_policy_file_list, vendor_policy_file_list,
355                          public_policy_file_list)
356
357
358def filter_out(pattern_file, input_file):
359    with open(pattern_file, 'r', encoding='utf-8') as pat_file:
360        patterns = set(pat_file)
361        tmp_output = tempfile.NamedTemporaryFile()
362        with open(input_file, 'r', encoding='utf-8') as in_file:
363            tmp_output.writelines(line.encode(encoding='utf-8') for line in in_file.readlines()
364                                if line not in patterns)
365            tmp_output.write("\n".encode(encoding='utf-8'))
366            tmp_output.flush()
367        shutil.copyfile(tmp_output.name, input_file)
368
369
370def generate_hash_file(input_file_list, output_file):
371    build_policy_cmd = ["cat",
372                        " ".join(input_file_list),
373                        "| sha256sum",
374                        "| cut -d' ' -f1",
375                        ">",
376                        output_file]
377    run_command(build_policy_cmd)
378
379
380def generate_version_file(args, output_file):
381    cmd = ["echo", args.vendor_policy_version,
382           ">", output_file]
383    run_command(cmd)
384
385
386def generate_default_policy(args, policy, cil_list, with_developer=False):
387    if with_developer:
388        output_path = os.path.join(os.path.abspath(os.path.dirname(args.dst_file)), "developer/")
389        if not os.path.exists(output_path):
390            os.makedirs(output_path)
391    else:
392        output_path = os.path.abspath(os.path.dirname(args.dst_file))
393    system_output_conf = os.path.join(output_path, "system.conf")
394    vendor_output_conf = os.path.join(output_path, "vendor.conf")
395    min_output_conf = os.path.join(output_path, "min.conf")
396
397    system_cil_path = os.path.join(output_path, "system.cil")
398    vendor_cil_path = os.path.join(output_path, "vendor.cil")
399    min_cil_path = os.path.join(output_path, "min.cil")
400
401    # build system.conf
402    build_conf(args, system_output_conf, policy.system_policy_file_list, with_developer)
403    # build system.cil
404    build_cil(args, system_cil_path, system_output_conf)
405
406    # build vendor.conf
407    build_conf(args, vendor_output_conf, policy.vendor_policy_file_list, with_developer)
408    # build vendor.cil
409    build_cil(args, vendor_cil_path, vendor_output_conf)
410    add_base_typeattr_prefix(vendor_cil_path, 'vendor')
411
412    # build min.conf
413    build_conf(args, min_output_conf, policy.min_policy_file_list)
414    # build min.cil
415    build_cil(args, min_cil_path, min_output_conf)
416
417    filter_out(min_cil_path, vendor_cil_path)
418
419    cil_list += [vendor_cil_path, system_cil_path]
420
421
422def generate_special_policy(args, policy, cil_list, with_developer=False):
423    if with_developer:
424        output_path = os.path.join(os.path.abspath(os.path.dirname(args.dst_file)), "developer/")
425        if not os.path.exists(output_path):
426            os.makedirs(output_path)
427    else:
428        output_path = os.path.abspath(os.path.dirname(args.dst_file))
429
430    compatible_dir = os.path.join(output_path, "compatible/")
431    if not os.path.exists(compatible_dir):
432        os.makedirs(compatible_dir)
433
434    system_output_conf = os.path.join(output_path, "system.conf")
435    vendor_output_conf = os.path.join(output_path, "vendor.conf")
436    public_output_conf = os.path.join(output_path, "public.conf")
437    min_output_conf = os.path.join(output_path, "min.conf")
438
439    vendor_origin_cil_path = os.path.join(output_path, "vendor_origin.cil")
440    public_origin_cil_path = os.path.join(output_path, "public_origin.cil")
441    min_cil_path = os.path.join(output_path, "min.cil")
442
443    # output file
444    system_cil_path = os.path.join(output_path, "system.cil")
445    vendor_cil_path = os.path.join(output_path, "vendor.cil")
446    public_version_cil_path = os.path.join(output_path, "public.cil")
447    type_version_cil_path = os.path.join(output_path, "".join(["compatible/", args.vendor_policy_version, ".cil"]))
448
449    # build system.conf
450    build_conf(args, system_output_conf, policy.system_policy_file_list, with_developer)
451    # build system.cil
452    build_cil(args, system_cil_path, system_output_conf)
453
454    # build min.cil
455    build_conf(args, min_output_conf, policy.min_policy_file_list)
456    build_cil(args, min_cil_path, min_output_conf)
457
458    # build public.cil
459    build_conf(args, public_output_conf, policy.public_policy_file_list, with_developer)
460    build_cil(args, public_origin_cil_path, public_output_conf)
461    type_set = get_type_set(public_origin_cil_path)
462    filter_out(min_cil_path, public_origin_cil_path)
463    build_version_cil(args.vendor_policy_version, public_origin_cil_path, public_version_cil_path, type_set)
464
465    # build vendor.cil
466    build_conf(args, vendor_output_conf, policy.vendor_policy_file_list, with_developer)
467    build_cil(args, vendor_origin_cil_path, vendor_output_conf)
468    filter_out(min_cil_path, vendor_origin_cil_path)
469    build_version_cil(args.vendor_policy_version, vendor_origin_cil_path, vendor_cil_path, type_set)
470    filter_out(public_version_cil_path, vendor_cil_path)
471
472    build_type_version_cil(args.vendor_policy_version, public_origin_cil_path, type_version_cil_path)
473
474    if args.components == "system":
475        generate_hash_file([system_cil_path, type_version_cil_path],
476                           os.path.join(output_path, SYSTEM_CIL_HASH))
477
478    elif args.components == "vendor":
479        generate_hash_file([system_cil_path, type_version_cil_path], os.path.join(
480            output_path, PREBUILD_SEPOLICY_SYSTEM_CIL_HASH))
481
482    version_file = os.path.join(output_path, "version")
483    generate_version_file(args, version_file)
484
485    cil_list += [vendor_cil_path, system_cil_path, type_version_cil_path, public_version_cil_path]
486
487
488def compile_sepolicy(args):
489    dir_list_object = get_policy_dir_list(args)
490    file_list_object = get_policy_file_list(args, dir_list_object)
491    cil_list = []
492    developer_cil_list = []
493
494    if args.updater_version == "enable":
495        generate_default_policy(args, file_list_object, cil_list, False)
496        build_binary_policy(args.tool_path, args.dst_file, True, cil_list)
497        return
498
499    with ThreadPoolExecutor(max_workers=2) as executor:
500        if args.components == "default":
501            normal_thread = executor.submit(generate_default_policy, args, file_list_object, cil_list, False)
502            developer_thread = executor.submit(generate_default_policy, args, file_list_object,
503                                                developer_cil_list, True)
504        else:
505            normal_thread = executor.submit(generate_special_policy, args, file_list_object, cil_list, False)
506            developer_thread = executor.submit(generate_special_policy, args, file_list_object,
507                                                developer_cil_list, True)
508
509        for future in as_completed([normal_thread, developer_thread]):
510            try:
511                future.result()
512            except Exception as e:
513                raise e
514
515    with ThreadPoolExecutor(max_workers=2) as executor:
516        build_normal_thread = executor.submit(build_binary_policy, args.tool_path, args.dst_file, True, cil_list)
517        developer_policy_path = os.path.join(os.path.abspath(os.path.dirname(args.dst_file)), "developer/policy.31")
518        build_developer_thread = executor.submit(build_binary_policy, args.tool_path, developer_policy_path,
519                                                    True, developer_cil_list)
520
521        for future in as_completed([build_normal_thread, build_developer_thread]):
522            try:
523                future.result()
524            except Exception as e:
525                raise e
526
527
528def copy_user_policy(args):
529    if args.updater_version == "enable":
530        return
531
532    output_path = os.path.abspath(os.path.dirname(args.dst_file))
533    if args.debug_version == "enable":
534        src_dir = os.path.join(output_path, "pre_check/")
535    else:
536        src_dir = output_path
537
538    src_policy_path = os.path.join(src_dir, "developer/policy.31")
539    dest_policy_path = os.path.join(output_path, "developer/developer_policy")
540    shutil.copyfile(src_policy_path, dest_policy_path)
541
542    src_policy_path = os.path.join(src_dir, "policy.31")
543    dest_policy_path = os.path.join(output_path, "user_policy")
544    shutil.copyfile(src_policy_path, dest_policy_path)
545
546
547def pre_check(args):
548    output_path = os.path.join(os.path.abspath(os.path.dirname(args.dst_file)), "pre_check/")
549    if not os.path.exists(output_path):
550        os.makedirs(output_path)
551    args.dst_file = os.path.join(output_path, "policy.31")
552
553    if args.debug_version == "enable":
554        args.debug_version = "disable"
555    else:
556        args.debug_version = "enable"
557
558    compile_sepolicy(args)
559
560
561def main(args):
562    with ThreadPoolExecutor(max_workers=2) as executor:
563        pre_check_args = copy.deepcopy(args)
564        pre_check_thread = executor.submit(pre_check, pre_check_args)
565
566        compile_args = copy.deepcopy(args)
567        compile_thread = executor.submit(compile_sepolicy, compile_args)
568
569        for future in as_completed([pre_check_thread, compile_thread]):
570            try:
571                future.result()
572            except Exception as e:
573                raise e
574
575    copy_user_policy(args)
576