#!/usr/bin/env python
# coding: utf-8

"""
Copyright (c) 2023 Huawei Device Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

"""

import os
import re
import shutil
import subprocess
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed
import copy

SYSTEM_CIL_HASH = "system.cil.sha256"
PREBUILD_SEPOLICY_SYSTEM_CIL_HASH = "prebuild_sepolicy.system.cil.sha256"

# list of all macros and te for sepolicy build
SEPOLICY_TYPE_LIST = ["security_classes",
                      "initial_sids",
                      "access_vectors",
                      "glb_perm_def.spt",
                      "glb_never_def.spt",
                      "mls",
                      "policy_cap",
                      "glb_te_def.spt",
                      "attributes",
                      ".te",
                      "glb_roles.spt",
                      "users",
                      "initial_sid_contexts",
                      "fs_use",
                      "virtfs_contexts",
                      ]

POLICY_TYPE_LIST = ["allow", "auditallow", "dontaudit",
                    "allowx", "auditallowx", "dontauditx",
                    "neverallow", "neverallowx", ]


class PolicyDirList(object):
    """
    Plain holder for the min / system / vendor / public policy directory lists.
    """

    def __init__(self, min_policy_dir_list, system_policy_dir_list, vendor_policy_dir_list, public_policy_dir_list):
        self.min_policy_dir_list = min_policy_dir_list
        self.system_policy_dir_list = system_policy_dir_list
        self.vendor_policy_dir_list = vendor_policy_dir_list
        self.public_policy_dir_list = public_policy_dir_list


class PolicyFileList(object):
    """
    Plain holder for the min / system / vendor / public policy file lists.
    """

    def __init__(self, min_policy_file_list, system_policy_file_list, vendor_policy_file_list, public_policy_file_list):
        self.min_policy_file_list = min_policy_file_list
        self.system_policy_file_list = system_policy_file_list
        self.vendor_policy_file_list = vendor_policy_file_list
        self.public_policy_file_list = public_policy_file_list


def traverse_folder_in_dir_name(search_dir, folder_suffix):
    """
    Collect every directory below search_dir whose name equals folder_suffix.
    :param search_dir: directory to walk recursively
    :param folder_suffix: exact directory name to look for
    :return: list of matching directory paths, ordered by parent directory
    """
    folder_list = []
    for root, dirs, _ in sorted(os.walk(search_dir)):
        for dir_i in dirs:
            if dir_i == folder_suffix:
                folder_list.append(os.path.join(root, dir_i))
    return folder_list


def traverse_folder_in_type(search_dir, file_suffix, build_root):
    """
    Collect the files below search_dir whose name ends with file_suffix.
    Every matching file is also checked for a trailing newline.
    :param search_dir: directory to walk recursively
    :param file_suffix: file name suffix to match
    :param build_root: directory the returned paths are made relative to
    :return: (sorted list of paths relative to build_root, flag); flag is
             non-zero when at least one file fails check_empty_row
    """
    policy_file_list = []
    flag = 0
    for root, _, files in sorted(os.walk(search_dir)):
        for each_file in files:
            if each_file.endswith(file_suffix):
                path = os.path.join(root, each_file)
                rel_path = os.path.relpath(path, build_root)
                # The relative path is opened as-is, so this relies on the
                # current working directory being build_root.
                flag |= check_empty_row(rel_path)
                policy_file_list.append(rel_path)
    policy_file_list.sort()
    return policy_file_list, flag


def traverse_file_in_each_type(folder_list, sepolicy_type_list, build_root):
    """
    Build the ordered policy file list: for each suffix in sepolicy_type_list
    (outer order), the matching files of each folder in folder_list.
    :param folder_list: directories to search
    :param sepolicy_type_list: file name suffixes, in the order they must appear
    :param build_root: directory the returned paths are made relative to
    :return: list of file paths relative to build_root
    :raises Exception: when any file fails the trailing-newline check
    """
    policy_files_list = []
    err = 0
    for policy_type in sepolicy_type_list:
        for folder in folder_list:
            type_file_list, flag = traverse_folder_in_type(
                folder, policy_type, build_root)
            err |= flag
            policy_files_list.extend(type_file_list)
    if err:
        raise Exception(err)
    return policy_files_list


def check_empty_row(policy_file):
    """
    Check whether the last line of te file is terminated by a line break.
    :param policy_file: te file
    :return: 0 when the file is empty or ends with a line break, 1 otherwise
    """
    # Only the final byte matters, so read it in binary mode instead of
    # decoding the whole file; this also avoids locale-dependent decode errors.
    with open(policy_file, 'rb') as fp:
        if fp.seek(0, os.SEEK_END) == 0:
            return 0
        fp.seek(-1, os.SEEK_END)
        last_byte = fp.read(1)
    # A lone '\r' is accepted too, matching text-mode universal newlines.
    if last_byte in (b'\n', b'\r'):
        return 0
    print("".join([policy_file, " : need an empty line at end\n"]))
    return 1


def run_command(in_cmd):
    """
    Join in_cmd with spaces and run the result through the shell.
    Callers may therefore pass shell syntax (pipes, redirections) as elements.
    :param in_cmd: list of command fragments
    :raises Exception: carrying the return code when the command fails
    """
    cmdstr = " ".join(in_cmd)
    ret = subprocess.run(cmdstr, shell=True).returncode
    if ret != 0:
        raise Exception(ret)


def build_conf(args, output_conf, file_list, with_developer=False):
    """
    Run m4 over file_list and write its output to output_conf.
    :param args: build arguments; reads debug_version, updater_version and the
                 optional product_args
    :param output_conf: path of the generated .conf file
    :param file_list: policy source files passed to m4, in order
    :param with_developer: when True, also define build_with_developer=enable
    :raises Exception: carrying the return code when m4 fails
    """
    m4_args = ["-D", "build_with_debug=" + args.debug_version]
    m4_args += ["-D", "build_with_updater=" + args.updater_version]
    if with_developer:
        m4_args += ["-D", "build_with_developer=enable"]
    if getattr(args, "product_args", None):
        m4_args += ["-D", args.product_args]
    build_conf_cmd = ["m4", "-s", "--fatal-warnings"] + m4_args + file_list
    with open(output_conf, 'w') as fd:
        ret = subprocess.run(build_conf_cmd, shell=False, stdout=fd).returncode
    if ret != 0:
        raise Exception(ret)


def build_cil(args, output_cil, input_conf):
    """
    Run checkpolicy from args.tool_path on input_conf to produce output_cil.
    :param args: build arguments; reads tool_path
    :param output_cil: path of the generated .cil file
    :param input_conf: path of the .conf file to compile
    :raises Exception: carrying the return code when checkpolicy fails
    """
    # Pass an argument vector without a shell (like build_conf) so paths that
    # contain spaces or shell metacharacters are handed over verbatim.
    check_policy_cmd = [os.path.join(args.tool_path, "checkpolicy"),
                        input_conf,
                        "-M", "-C", "-c", "31",
                        "-o", output_cil]
    ret = subprocess.run(check_policy_cmd, shell=False).returncode
    if ret != 0:
        raise Exception(ret)


def add_version(version, string):
    """
    Return string with "_<version>" appended.
    """
    return "".join([string, "_", version])


def simplify_string(string):
    """
    Return string with every '(', ')' and newline character removed.
    """
    return string.replace('(', '').replace(')', '').replace('\n', '')


def deal_with_roletype(version, cil_write, elem_list, type_set, file, line):
    """
    Write a roletype statement, versioning its type when it is in type_set.
    :param version: version suffix to append
    :param cil_write: writable text stream receiving the output
    :param elem_list: the statement split on single spaces; modified in place
    :param type_set: names that must be versioned
    :param file: file name used in the error message
    :param line: line number used in the error message
    :raises Exception: when the statement has fewer than three elements
    """
    if len(elem_list) < 3:
        print('Error: invalid roletype in %s:%d' % (file, line))
        raise Exception(1)

    sub_string = simplify_string(elem_list[2])
    if sub_string in type_set:
        # Declare the versioned attribute before the statement that uses it.
        cil_write.write('(typeattribute ' + add_version(version, sub_string) + ')\n')
        elem_list[2] = elem_list[2].replace(
            sub_string, add_version(version, sub_string))
    cil_write.write(" ".join(elem_list))
def deal_with_typeattribute(version, cil_write, elem_list, type_set, file, line):
    """
    Write a typeattribute statement, versioning "base_typeattr_*" names.
    :param version: version suffix to append
    :param cil_write: writable text stream receiving the output
    :param elem_list: the statement split on single spaces; modified in place
    :param type_set: unused here; kept so all handlers share one signature
    :param file: file name used in the error message
    :param line: line number used in the error message
    :raises Exception: when the statement has fewer than two elements
    """
    if len(elem_list) < 2:
        print('Error: invalid typeattribute in %s:%d' % (file, line))
        raise Exception(1)

    sub_string = simplify_string(elem_list[1])
    if sub_string.startswith("base_typeattr_"):
        elem_list[1] = elem_list[1].replace(
            sub_string, add_version(version, sub_string))
    cil_write.write(" ".join(elem_list))


def deal_with_typeattributeset(version, cil_write, elem_list, type_set, file, line):
    """
    Write a typeattributeset statement, versioning every element that is a
    "base_typeattr_*" name or a member of type_set.
    :param version: version suffix to append
    :param cil_write: writable text stream receiving the output
    :param elem_list: the statement split on single spaces; modified in place
    :param type_set: names that must be versioned
    :param file: file name used in the error message
    :param line: line number used in the error message
    :raises Exception: when the statement has fewer than two elements
    """
    if len(elem_list) < 2:
        print('Error: invalid typeattributeset in %s:%d' % (file, line))
        raise Exception(1)

    for index, elem in enumerate(elem_list[1:], start=1):
        sub_string = simplify_string(elem)
        if sub_string.startswith("base_typeattr_") or sub_string in type_set:
            elem_list[index] = elem.replace(sub_string, add_version(version, sub_string))
    cil_write.write(" ".join(elem_list))


def deal_with_policy(version, cil_write, elem_list, type_set, file, line):
    """
    Write an access rule (see POLICY_TYPE_LIST), versioning its second and
    third elements when they are "base_typeattr_*" names or in type_set.
    :param version: version suffix to append
    :param cil_write: writable text stream receiving the output
    :param elem_list: the statement split on single spaces; modified in place
    :param type_set: names that must be versioned
    :param file: file name used in the error message
    :param line: line number used in the error message
    :raises Exception: when the statement has fewer than four elements
    """
    if len(elem_list) < 4:
        print('Error: invalid policy in %s:%d' % (file, line))
        raise Exception(1)

    # Only elements 1 and 2 are rewritten; the remainder is copied verbatim.
    for index, elem in enumerate(elem_list[1:3], start=1):
        sub_string = simplify_string(elem)
        if sub_string.startswith("base_typeattr_") or sub_string in type_set:
            elem_list[index] = elem.replace(sub_string, add_version(version, sub_string))
    cil_write.write(" ".join(elem_list))


def deal_with_type(version, cil_write, elem_list, file, line):
    """
    For a "(type NAME)" statement, write the three statements that bind NAME
    to its versioned attribute: typeattributeset, expandtypeattribute and
    typeattribute.
    :param version: version suffix to append
    :param cil_write: writable text stream receiving the output
    :param elem_list: the statement split on single spaces
    :param file: file name used in the error message
    :param line: line number used in the error message
    :raises Exception: when the statement has fewer than two elements
    """
    if len(elem_list) < 2:
        print('Error: invalid type in %s:%d' % (file, line))
        raise Exception(1)

    sub_string = simplify_string(elem_list[1])
    versioned = add_version(version, sub_string)
    cil_write.write(" ".join(['(typeattributeset', versioned, '(', sub_string, '))\n']))
    cil_write.write(" ".join(['(expandtypeattribute', '(', versioned, ') true)\n']))
    cil_write.write(" ".join(['(typeattribute', versioned, ')\n']))


def build_version_cil(version, cil_file_input, cil_file_output, type_set):
    """
    Copy cil_file_input to cil_file_output, rewriting roletype, typeattribute,
    typeattributeset and access-rule statements through the deal_with_*
    handlers. Lines that do not start with '(' are dropped; all other
    statements are copied unchanged.
    :param version: version suffix to append
    :param cil_file_input: path of the CIL file to read
    :param cil_file_output: path of the CIL file to write
    :param type_set: names that must be versioned
    """
    handlers = {
        '(roletype': deal_with_roletype,
        '(typeattribute': deal_with_typeattribute,
        '(typeattributeset': deal_with_typeattributeset,
    }
    with open(cil_file_input, 'r') as cil_read, open(cil_file_output, 'w') as cil_write:
        for index, line in enumerate(cil_read, start=1):
            if not line.startswith('('):
                continue

            elem_list = line.split(' ')
            keyword = elem_list[0]
            handler = handlers.get(keyword)
            if handler is None and simplify_string(keyword) in POLICY_TYPE_LIST:
                handler = deal_with_policy

            if handler is None:
                cil_write.write(line)
            else:
                # Pass the line number (not the line text): the handlers
                # format it with %d in their error messages.
                handler(version, cil_write, elem_list, type_set, cil_file_input, index)


def build_type_version_cil(version, cil_file_input, cil_file_output):
    """
    Write the versioned-attribute statements (see deal_with_type) for every
    "(type ...)" statement found in cil_file_input.
    :param version: version suffix to append
    :param cil_file_input: path of the CIL file to read
    :param cil_file_output: path of the CIL file to write
    """
    with open(cil_file_input, 'r') as cil_read, open(cil_file_output, 'w') as cil_write:
        for index, line in enumerate(cil_read, start=1):
            if not line.startswith('('):
                continue

            elem_list = line.split(' ')
            if elem_list[0] == '(type':
                # Report the input file path and line number on errors.
                deal_with_type(version, cil_write, elem_list, cil_file_input, index)


def get_type_set(cil_file_input):
    """
    Collect the names declared by "(type NAME)" lines of cil_file_input.
    :param cil_file_input: path of the CIL file to read
    :return: set of collected names
    """
    pattern_type = re.compile(r'^\(type (.*)\)$')
    # NOTE(review): this pattern looks for "type_attribute", while the rest of
    # this file handles "(typeattribute ...)" statements, so it presumably
    # never matches. Left unchanged because fixing it alters the build output;
    # confirm the intent.
    pattern_typeattribute = re.compile(r'^\(type_attribute (base_typeattr_[0-9]+)\)$')
    type_set = set()
    with open(cil_file_input, 'r') as cil_read:
        for line in cil_read:
            match_type = pattern_type.match(line)
            match_typeattribute = pattern_typeattribute.match(line)
            if match_type:
                type_set.add(match_type.group(1))
            elif match_typeattribute:
                type_set.add(match_typeattribute.group(1))
    return type_set


def add_base_typeattr_prefix(cil_file, prefix):
    """
    Rewrite cil_file in place, replacing every "base_typeattr_" occurrence
    with "<prefix>_base_typeattr_".
    :param cil_file: path of the CIL file to rewrite
    :param prefix: prefix to prepend
    """
    with open(cil_file, 'r') as cil_read:
        data = cil_read.read()
    data = data.replace('base_typeattr_', '_'.join([prefix, 'base_typeattr_']))
    with open(cil_file, 'w') as cil_write:
        cil_write.write(data)


def build_binary_policy(tool_path, output_policy, check_neverallow, cil_list):
    """
    Run secilc from tool_path on cil_list to produce output_policy.
    :param tool_path: directory containing the secilc executable
    :param output_policy: path of the binary policy to write
    :param check_neverallow: when false, "-N" is appended to the command
    :param cil_list: CIL files to compile
    :raises Exception: carrying the return code when secilc fails
    """
    # Argument vector without a shell, so paths are handed over verbatim.
    build_policy_cmd = [os.path.join(tool_path, "secilc")]
    build_policy_cmd += cil_list
    build_policy_cmd += ["-m", "-M", "true", "-G", "-c", "31", "-O",
                         "-f", "/dev/null",
                         "-o", output_policy]
    if not check_neverallow:
        build_policy_cmd.append("-N")
    ret = subprocess.run(build_policy_cmd, shell=False).returncode
    if ret != 0:
        raise Exception(ret)


def prepare_build_path(dir_list, root_dir, build_dir_list, sepolicy_path):
    """
    Append the policy directories to build_dir_list: "<sepolicy_path>/base",
    "<sepolicy_path>/ohos_policy" and every entry of dir_list, each joined
    onto root_dir. Empty entries and "default" are skipped.
    :param dir_list: ':'-separated directory names
    :param root_dir: directory the entries are joined onto
    :param build_dir_list: output list, extended in place
    :param sepolicy_path: sepolicy directory
    :raises SystemExit: with code -1 when a directory does not exist
    """
    build_policy_list = [os.path.join(sepolicy_path, "base"), os.path.join(sepolicy_path, "ohos_policy")]
    build_policy_list += dir_list.split(":")

    for i in build_policy_list:
        if i == "" or i == "default":
            continue
        path = os.path.join(root_dir, i)
        if not os.path.exists(path):
            print("following path not exists!! {}".format(path))
            # Same effect as exit(-1) without depending on the site module.
            raise SystemExit(-1)
        build_dir_list.append(path)


def get_policy_dir_list(args):
    """
    Build the policy directory lists for the min, system, vendor and public
    policies.
    :param args: build arguments; reads source_root_dir and policy_dir_list
    :return: PolicyDirList
    """
    sepolicy_path = os.path.join(args.source_root_dir, "base/security/selinux_adapter/sepolicy/")
    dir_list = []
    prepare_build_path(args.policy_dir_list, args.source_root_dir, dir_list, sepolicy_path)
    min_policy_dir_list = [os.path.join(sepolicy_path, "min")]
    system_policy = []
    public_policy = []
    vendor_policy = []

    for item in dir_list:
        public_policy += traverse_folder_in_dir_name(item, "public")
        system_policy += traverse_folder_in_dir_name(item, "system")
        vendor_policy += traverse_folder_in_dir_name(item, "vendor")

    # list of all policy folders
    system_policy_dir_list = public_policy + system_policy
    vendor_policy_dir_list = public_policy + vendor_policy + min_policy_dir_list
    public_policy_dir_list = public_policy + min_policy_dir_list

    # add temp dirs base/te folders
    base_te_dir = os.path.join(sepolicy_path, "base/te")
    system_policy_dir_list.append(base_te_dir)
    vendor_policy_dir_list.append(base_te_dir)
    public_policy_dir_list.append(base_te_dir)

    return PolicyDirList(min_policy_dir_list, system_policy_dir_list, vendor_policy_dir_list, public_policy_dir_list)


def get_policy_file_list(args, dir_list_object):
    """
    Resolve the directory lists of dir_list_object into ordered file lists.
    :param args: build arguments; reads tool_path (the build root is taken to
                 be three levels above it)
    :param dir_list_object: PolicyDirList
    :return: PolicyFileList
    """
    build_root = os.path.abspath(os.path.join(args.tool_path, "../../.."))
    # list of all policy files
    system_policy_file_list = traverse_file_in_each_type(
        dir_list_object.system_policy_dir_list, SEPOLICY_TYPE_LIST, build_root)
    vendor_policy_file_list = traverse_file_in_each_type(
        dir_list_object.vendor_policy_dir_list, SEPOLICY_TYPE_LIST, build_root)
    public_policy_file_list = traverse_file_in_each_type(
        dir_list_object.public_policy_dir_list, SEPOLICY_TYPE_LIST, build_root)
    min_policy_file_list = traverse_file_in_each_type(
        dir_list_object.min_policy_dir_list, SEPOLICY_TYPE_LIST, build_root)

    return PolicyFileList(min_policy_file_list, system_policy_file_list, vendor_policy_file_list,
                          public_policy_file_list)


def filter_out(pattern_file, input_file):
    """
    Rewrite input_file in place without the lines that also occur (as whole
    lines) in pattern_file; one extra newline is appended at the end.
    :param pattern_file: file whose lines are removed from input_file
    :param input_file: file to filter in place
    """
    with open(pattern_file, 'r', encoding='utf-8') as pat_file:
        patterns = set(pat_file)
    # The context manager closes (and thereby deletes) the temporary file.
    with tempfile.NamedTemporaryFile() as tmp_output:
        with open(input_file, 'r', encoding='utf-8') as in_file:
            tmp_output.writelines(line.encode(encoding='utf-8') for line in in_file
                                  if line not in patterns)
        tmp_output.write("\n".encode(encoding='utf-8'))
        tmp_output.flush()
        shutil.copyfile(tmp_output.name, input_file)


def generate_hash_file(input_file_list, output_file):
    """
    Write the SHA-256 hex digest of the concatenated input files, followed by
    a newline, to output_file.
    :param input_file_list: files to hash, in order
    :param output_file: path of the hash file to write
    :raises OSError: when an input file cannot be read
    """
    import hashlib

    digest = hashlib.sha256()
    for input_file in input_file_list:
        with open(input_file, 'rb') as in_file:
            for chunk in iter(lambda: in_file.read(65536), b''):
                digest.update(chunk)
    with open(output_file, 'w') as out_file:
        out_file.write(digest.hexdigest() + "\n")


def generate_version_file(args, output_file):
    """
    Write args.vendor_policy_version, followed by a newline, to output_file.
    :param args: build arguments; reads vendor_policy_version
    :param output_file: path of the version file to write
    """
    with open(output_file, 'w') as out_file:
        out_file.write(args.vendor_policy_version + "\n")


def _prepare_output_path(args, with_developer):
    """
    Return the absolute directory of args.dst_file, or its "developer/"
    subdirectory (created on demand) when with_developer is set.
    """
    output_path = os.path.abspath(os.path.dirname(args.dst_file))
    if with_developer:
        output_path = os.path.join(output_path, "developer/")
        os.makedirs(output_path, exist_ok=True)
    return output_path


def _wait_for_all(futures):
    """
    Wait for the futures in completion order and re-raise the first failure.
    """
    for future in as_completed(futures):
        future.result()


def generate_default_policy(args, policy, cil_list, with_developer=False):
    """
    Build system.cil, vendor.cil and min.cil next to args.dst_file (or in its
    "developer/" subdirectory) and append vendor.cil and system.cil to
    cil_list.
    :param args: build arguments
    :param policy: PolicyFileList
    :param cil_list: output list, extended in place
    :param with_developer: build the developer variant
    """
    output_path = _prepare_output_path(args, with_developer)
    system_output_conf = os.path.join(output_path, "system.conf")
    vendor_output_conf = os.path.join(output_path, "vendor.conf")
    min_output_conf = os.path.join(output_path, "min.conf")

    system_cil_path = os.path.join(output_path, "system.cil")
    vendor_cil_path = os.path.join(output_path, "vendor.cil")
    min_cil_path = os.path.join(output_path, "min.cil")

    # build system.conf
    build_conf(args, system_output_conf, policy.system_policy_file_list, with_developer)
    # build system.cil
    build_cil(args, system_cil_path, system_output_conf)

    # build vendor.conf
    build_conf(args, vendor_output_conf, policy.vendor_policy_file_list, with_developer)
    # build vendor.cil
    build_cil(args, vendor_cil_path, vendor_output_conf)
    add_base_typeattr_prefix(vendor_cil_path, 'vendor')

    # build min.conf (never built with the developer define)
    build_conf(args, min_output_conf, policy.min_policy_file_list)
    # build min.cil
    build_cil(args, min_cil_path, min_output_conf)

    # drop from vendor.cil every line that min.cil already contains
    filter_out(min_cil_path, vendor_cil_path)

    cil_list += [vendor_cil_path, system_cil_path]


def generate_special_policy(args, policy, cil_list, with_developer=False):
    """
    Build the split policy: system.cil, a versioned public.cil, a versioned
    vendor.cil and "compatible/<vendor_policy_version>.cil", plus the hash and
    version files, and append the resulting CIL files to cil_list.
    :param args: build arguments
    :param policy: PolicyFileList
    :param cil_list: output list, extended in place
    :param with_developer: build the developer variant
    """
    output_path = _prepare_output_path(args, with_developer)

    compatible_dir = os.path.join(output_path, "compatible/")
    os.makedirs(compatible_dir, exist_ok=True)

    system_output_conf = os.path.join(output_path, "system.conf")
    vendor_output_conf = os.path.join(output_path, "vendor.conf")
    public_output_conf = os.path.join(output_path, "public.conf")
    min_output_conf = os.path.join(output_path, "min.conf")

    vendor_origin_cil_path = os.path.join(output_path, "vendor_origin.cil")
    public_origin_cil_path = os.path.join(output_path, "public_origin.cil")
    min_cil_path = os.path.join(output_path, "min.cil")

    # output file
    system_cil_path = os.path.join(output_path, "system.cil")
    vendor_cil_path = os.path.join(output_path, "vendor.cil")
    public_version_cil_path = os.path.join(output_path, "public.cil")
    type_version_cil_path = os.path.join(output_path, "".join(["compatible/", args.vendor_policy_version, ".cil"]))

    # build system.conf
    build_conf(args, system_output_conf, policy.system_policy_file_list, with_developer)
    # build system.cil
    build_cil(args, system_cil_path, system_output_conf)

    # build min.cil (never built with the developer define)
    build_conf(args, min_output_conf, policy.min_policy_file_list)
    build_cil(args, min_cil_path, min_output_conf)

    # build public.cil
    build_conf(args, public_output_conf, policy.public_policy_file_list, with_developer)
    build_cil(args, public_origin_cil_path, public_output_conf)
    # the type set is collected before the min.cil lines are filtered out
    type_set = get_type_set(public_origin_cil_path)
    filter_out(min_cil_path, public_origin_cil_path)
    build_version_cil(args.vendor_policy_version, public_origin_cil_path, public_version_cil_path, type_set)

    # build vendor.cil
    build_conf(args, vendor_output_conf, policy.vendor_policy_file_list, with_developer)
    build_cil(args, vendor_origin_cil_path, vendor_output_conf)
    filter_out(min_cil_path, vendor_origin_cil_path)
    build_version_cil(args.vendor_policy_version, vendor_origin_cil_path, vendor_cil_path, type_set)
    filter_out(public_version_cil_path, vendor_cil_path)

    build_type_version_cil(args.vendor_policy_version, public_origin_cil_path, type_version_cil_path)

    if args.components == "system":
        generate_hash_file([system_cil_path, type_version_cil_path],
                           os.path.join(output_path, SYSTEM_CIL_HASH))
    elif args.components == "vendor":
        generate_hash_file([system_cil_path, type_version_cil_path], os.path.join(
            output_path, PREBUILD_SEPOLICY_SYSTEM_CIL_HASH))

    # NOTE(review): written for every component here; confirm it was not meant
    # to be limited to the "vendor" branch above.
    version_file = os.path.join(output_path, "version")
    generate_version_file(args, version_file)

    cil_list += [vendor_cil_path, system_cil_path, type_version_cil_path, public_version_cil_path]


def compile_sepolicy(args):
    """
    Build the binary policy at args.dst_file. Unless args.updater_version is
    "enable", the developer variant is built as well, at
    "developer/policy.31" next to args.dst_file.
    :param args: build arguments
    """
    dir_list_object = get_policy_dir_list(args)
    file_list_object = get_policy_file_list(args, dir_list_object)
    cil_list = []
    developer_cil_list = []

    if args.updater_version == "enable":
        generate_default_policy(args, file_list_object, cil_list, False)
        build_binary_policy(args.tool_path, args.dst_file, True, cil_list)
        return

    if args.components == "default":
        generate_policy = generate_default_policy
    else:
        generate_policy = generate_special_policy

    # The normal and developer variants are generated concurrently.
    with ThreadPoolExecutor(max_workers=2) as executor:
        normal_thread = executor.submit(generate_policy, args, file_list_object, cil_list, False)
        developer_thread = executor.submit(generate_policy, args, file_list_object,
                                           developer_cil_list, True)
        _wait_for_all([normal_thread, developer_thread])

    with ThreadPoolExecutor(max_workers=2) as executor:
        build_normal_thread = executor.submit(build_binary_policy, args.tool_path, args.dst_file, True, cil_list)
        developer_policy_path = os.path.join(os.path.abspath(os.path.dirname(args.dst_file)), "developer/policy.31")
        build_developer_thread = executor.submit(build_binary_policy, args.tool_path, developer_policy_path,
                                                 True, developer_cil_list)
        _wait_for_all([build_normal_thread, build_developer_thread])


def copy_user_policy(args):
    """
    Copy policy.31 to "user_policy" and developer/policy.31 to
    "developer/developer_policy" in the directory of args.dst_file. When
    args.debug_version is "enable" the sources are taken from the "pre_check/"
    subdirectory instead. Does nothing when args.updater_version is "enable".
    :param args: build arguments
    """
    if args.updater_version == "enable":
        return

    output_path = os.path.abspath(os.path.dirname(args.dst_file))
    if args.debug_version == "enable":
        src_dir = os.path.join(output_path, "pre_check/")
    else:
        src_dir = output_path

    src_policy_path = os.path.join(src_dir, "developer/policy.31")
    dest_policy_path = os.path.join(output_path, "developer/developer_policy")
    shutil.copyfile(src_policy_path, dest_policy_path)

    src_policy_path = os.path.join(src_dir, "policy.31")
    dest_policy_path = os.path.join(output_path, "user_policy")
    shutil.copyfile(src_policy_path, dest_policy_path)


def pre_check(args):
    """
    Compile the policy a second time with debug_version flipped, into the
    "pre_check/" subdirectory next to args.dst_file.
    :param args: build arguments; dst_file and debug_version are modified in
                 place, so pass a copy
    """
    output_path = os.path.join(os.path.abspath(os.path.dirname(args.dst_file)), "pre_check/")
    os.makedirs(output_path, exist_ok=True)
    args.dst_file = os.path.join(output_path, "policy.31")

    if args.debug_version == "enable":
        args.debug_version = "disable"
    else:
        args.debug_version = "enable"

    compile_sepolicy(args)


def main(args):
    """
    Run the regular build and the pre-check build concurrently, each on its
    own deep copy of args, then copy the user policies.
    :param args: build arguments
    """
    with ThreadPoolExecutor(max_workers=2) as executor:
        pre_check_args = copy.deepcopy(args)
        pre_check_thread = executor.submit(pre_check, pre_check_args)

        compile_args = copy.deepcopy(args)
        compile_thread = executor.submit(compile_sepolicy, compile_args)

        _wait_for_all([pre_check_thread, compile_thread])

    copy_user_policy(args)