#!/usr/bin/env python
# coding: utf-8

"""
Copyright (c) 2021-2022 Huawei Device Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

"""

import os
import argparse
import re
import shutil
import subprocess
import sys
import platform
from collections import defaultdict


# One entry of a file/service/parameter contexts file: "<key> u:object_r:<type>:s0".
_CONTEXT_LINE_PATTERN = re.compile(r'(\S+)\s+u:object_r:\S+:s0')

# One entry of a sehap_contexts file.
# Groups: 1 = apl value, 5 = domain value, 6 = type value.
_SEHAP_LINE_PATTERN = re.compile(
    r'apl=(system_core|system_basic|normal)\s+'
    r'((name|debuggable)=\S+\s+)?'
    r'(extra=\S+\s+)?'
    r'domain=(\S+)\s+'
    r'type=(\S+)\s*\n'
)


def parse_args():
    """
    Parse the command line options of this script.
    :return: argparse namespace holding dst_dir, tool_path, policy_file,
             source_root_dir, policy_dir_list and components
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--dst-dir', help='the output dest path', required=True)
    parser.add_argument('--tool-path',
                        help='the sefcontext_compile bin path', required=True)
    parser.add_argument('--policy-file',
                        help='the policy.31 file', required=True)
    parser.add_argument('--source-root-dir',
                        help='prj root path', required=True)
    parser.add_argument('--policy_dir_list',
                        help='policy dirs need to be included', required=True)
    parser.add_argument('--components',
                        help='system or vendor or default', required=True)
    return parser.parse_args()


def run_command(in_cmd):
    """
    Join the command pieces with spaces and run the result through the shell.
    :param in_cmd: list of command fragments
    :return: None
    :raises Exception: carrying the exit status when the command fails
    """
    # NOTE: the shell is required here because combine_contexts_file passes
    # redirections ('>') and pipes ('|') as fragments. Fragments are not
    # quoted, so paths containing spaces or shell metacharacters will break.
    cmdstr = " ".join(in_cmd)
    ret = subprocess.run(cmdstr, shell=True).returncode
    if ret != 0:
        raise Exception(ret)


def traverse_folder_in_type(search_dir_list, file_suffix):
    """
    Walk every folder in search_dir_list and collect the files whose name
    ends with file_suffix. Each collected file is format checked with
    check_contexts_file.
    :param search_dir_list: list of paths to search
    :param file_suffix: postfix of file name
    :return: the sorted file paths joined by single spaces, as one string
    :raises Exception: if any collected file fails the format check
    """
    # NOTE(review): matching is done with endswith, so the suffix
    # "service_contexts" also selects files named "hdf_service_contexts".
    flag = 0
    policy_file_list = []

    for item in search_dir_list:
        for root, _, files in sorted(os.walk(item)):
            for each_file in files:
                if not each_file.endswith(file_suffix):
                    continue
                file_list_path = os.path.join(root, each_file)
                # Keep going after a failure so every bad file is reported.
                flag |= check_contexts_file(file_list_path)
                policy_file_list.append(file_list_path)

    if flag:
        raise Exception(flag)
    policy_file_list.sort()
    return " ".join(str(x) for x in policy_file_list)


def check_contexts_file(contexts_file):
    """
    Check the format of one contexts file: the last line must end with a
    newline and no line may end with a carriage return. Problems are printed.
    :param contexts_file: path of the contexts file
    :return: 0 if the file is empty or well formed, 1 otherwise
    """
    with open(contexts_file, 'rb') as fp:
        lines = fp.readlines()
    if not lines:
        return 0

    err = 0
    if b'\n' not in lines[-1]:
        print("".join((contexts_file, " : need an empty line at end \n")))
        err = 1
    # Only the first offending line is reported for the line ending check.
    if any(line.endswith((b'\r\n', b'\r')) for line in lines):
        print("".join((contexts_file, " : must be unix format\n")))
        err = 1
    return err


def combine_contexts_file(file_contexts_list, combined_file_contexts):
    """
    Concatenate the given files, then drop the lines starting with '#' and
    the empty lines. Both steps run as shell commands (cat / grep).
    :param file_contexts_list: space separated file paths, as one string
    :param combined_file_contexts: path of the filtered output file; the
           unfiltered concatenation is left next to it with a "_tmp" suffix
    :return: None
    :raises Exception: if one of the shell commands exits with non-zero status
    """
    cat_cmd = ["cat",
               file_contexts_list,
               ">", combined_file_contexts + "_tmp"]
    run_command(cat_cmd)

    grep_cmd = ["grep -v ^#",
                combined_file_contexts + "_tmp",
                "| grep -v ^$",
                ">", combined_file_contexts]
    run_command(grep_cmd)


def check_redefinition(contexts_file):
    """
    Check that every entry of contexts_file matches the required format and
    that no key is defined on more than one line. Problems are printed.
    :param contexts_file: path of contexts file
    :return: None
    :raises Exception: if a line is malformed or a key is defined twice
    """
    type_hash = defaultdict(list)
    err = 0
    with open(contexts_file, 'r') as contexts_read:
        for line_index, line in enumerate(contexts_read, start=1):
            line_ = line.lstrip()
            if line_.startswith('#') or line_.strip() == '':
                continue
            match = _CONTEXT_LINE_PATTERN.match(line_)
            if match:
                type_hash[match.group(1)].append(line_index)
            else:
                print(contexts_file + ":" +
                      str(line_index) + " format check fail")
                err = 1
    if err:
        print("***********************************************************")
        print("please check whether the format meets the following rules:")
        print("[required format]: * u:object_r:*:s0")
        print("***********************************************************")
        raise Exception(err)

    for type_key, line_numbers in type_hash.items():
        if len(line_numbers) > 1:
            err = 1
            # Message layout: <file>:<line>:<line>:...'type <key> is redefinition'
            line_refs = "".join("{}:".format(num) for num in line_numbers)
            print("{}:{}'type {} is redefinition'".format(
                contexts_file, line_refs, type_key))
    if err:
        raise Exception(err)


def check_common_contexts(args, contexts_file):
    """
    check whether context used in contexts_file is defined in policy.31.
    The file is first checked for redefinitions, then compiled with
    sefcontext_compile; the compiled ".bin" output is removed afterwards.
    :param args: parsed arguments, tool_path and policy_file are used
    :param contexts_file: path of contexts file
    :return: None
    :raises Exception: if the redefinition check or the compile step fails
    """
    check_redefinition(contexts_file)

    check_cmd = [os.path.join(args.tool_path, "sefcontext_compile"),
                 "-o", contexts_file + ".bin",
                 "-p", args.policy_file,
                 contexts_file]
    run_command(check_cmd)
    if os.path.exists(contexts_file + ".bin"):
        os.unlink(contexts_file + ".bin")


def echo_error():
    """
    Print the format rules of a sehap_contexts entry.
    :return: None
    """
    print("***********************************************************")
    print("please check whether the format meets the following rules:")
    print("[required format]: apl=* name=* domain=* type=*")
    print("apl=*, apl should be one of system_core|system_basic|normal")
    print("name=*, name is 'optional'")
    print("domain=*, hapdomain selinux type")
    print("type=*, hapdatafile selinux type")
    print("***********************************************************")


def sehap_process_line(line, line_index, contexts_write, domain, contexts_file):
    """
    Convert one sehap_contexts line and write the result to contexts_write.
    Comment and blank lines are written unchanged. An entry line becomes
    "<apl> u:r:<domain>:s0" when domain is true and
    "<apl> u:object_r:<type>:s0" otherwise.
    :param line: the raw line, including its trailing newline
    :param line_index: 1-based line number, used in the error message
    :param contexts_write: writable text file object
    :param domain: true for domain, false for type
    :param contexts_file: path of contexts file, used in the error message
    :return: None
    :raises Exception: if the line does not match the required format
    """
    line_ = line.lstrip()
    if line_.startswith('#') or line_.strip() == '':
        contexts_write.write(line)
        return

    match = _SEHAP_LINE_PATTERN.match(line_)
    if not match:
        print(contexts_file + ":" + str(line_index) + " format check fail")
        raise Exception(1)

    if domain:
        converted = match.group(1) + " u:r:" + match.group(5) + ":s0\n"
    else:
        converted = match.group(1) + " u:object_r:" + match.group(6) + ":s0\n"
    contexts_write.write(converted)


def check_sehap_contexts(args, contexts_file, domain):
    """
    check domain or type defined in sehap_contexts.
    contexts_file is temporarily rewritten into the plain contexts format,
    compiled with sefcontext_compile, and then restored from a "_bk" copy.
    :param args: parsed arguments, tool_path and policy_file are used
    :param contexts_file: path of contexts file
    :param domain: true for domain, false for type
    :return: None
    :raises Exception: if a line is malformed or the compile step fails
    """
    backup_file = contexts_file + "_bk"
    bin_file = contexts_file + ".bin"
    shutil.copyfile(contexts_file, backup_file)
    try:
        try:
            with open(backup_file, 'r') as contexts_read, \
                    open(contexts_file, 'w') as contexts_write:
                for line_index, line in enumerate(contexts_read, start=1):
                    sehap_process_line(line, line_index, contexts_write,
                                       domain, contexts_file)
        except Exception:
            echo_error()
            raise

        check_cmd = [os.path.join(args.tool_path, "sefcontext_compile"),
                     "-o", bin_file,
                     "-p", args.policy_file,
                     contexts_file]
        run_command(check_cmd)
    finally:
        # Always put the original file back, whatever happened above, and
        # never leave the temporary compile output behind.
        shutil.move(backup_file, contexts_file)
        if os.path.exists(bin_file):
            os.unlink(bin_file)


def build_file_contexts(args, output_path, policy_path, all_policy_path):
    """
    Combine the file_contexts files and compile them to file_contexts.bin.
    :param args: parsed arguments; components, tool_path, dst_dir and
           policy_file are used
    :param output_path: folder receiving the combined files
    :param policy_path: folders searched for the component's file_contexts
    :param all_policy_path: folders searched for the complete file_contexts
    :return: None
    :raises Exception: if a check or a command fails
    """
    # For "default" both combined files share one path, so the complete
    # version written second overwrites the component version.
    if args.components == "default":
        all_combined_file_contexts = os.path.join(output_path, "file_contexts")
    else:
        all_combined_file_contexts = os.path.join(output_path, "all_file_contexts")
    file_contexts_list = traverse_folder_in_type(policy_path, "file_contexts")
    combined_file_contexts = os.path.join(output_path, "file_contexts")
    combine_contexts_file(file_contexts_list, combined_file_contexts)

    all_file_contexts_list = traverse_folder_in_type(
        all_policy_path, "file_contexts")
    combine_contexts_file(all_file_contexts_list, all_combined_file_contexts)

    check_redefinition(all_combined_file_contexts)

    build_bin_cmd = [os.path.join(args.tool_path, "sefcontext_compile"),
                     "-o", os.path.join(args.dst_dir, "file_contexts.bin"),
                     "-p", args.policy_file,
                     all_combined_file_contexts]
    run_command(build_bin_cmd)


def build_common_contexts(args, output_path, contexts_file_name, policy_path, all_policy_path):
    """
    Combine the contexts files named contexts_file_name and check the
    complete version against the policy.
    :param args: parsed arguments; components, tool_path and policy_file
           are used
    :param output_path: folder receiving the combined files
    :param contexts_file_name: file name suffix to collect
    :param policy_path: folders searched for the component's files
    :param all_policy_path: folders searched for the complete set of files
    :return: None
    :raises Exception: if a check or a command fails
    """
    # os.path.join gives the same path as plain concatenation when
    # output_path already ends with a separator, and a correct path when it
    # does not; it also matches build_file_contexts.
    if args.components == "default":
        all_combined_contexts = os.path.join(output_path, contexts_file_name)
    else:
        all_combined_contexts = os.path.join(
            output_path, "all_" + contexts_file_name)
    contexts_list = traverse_folder_in_type(policy_path, contexts_file_name)
    combined_contexts = os.path.join(output_path, contexts_file_name)
    combine_contexts_file(contexts_list, combined_contexts)

    all_contexts_list = traverse_folder_in_type(all_policy_path, contexts_file_name)
    combine_contexts_file(all_contexts_list, all_combined_contexts)
    check_common_contexts(args, all_combined_contexts)


def build_sehap_contexts(args, output_path, policy_path):
    """
    Combine the sehap_contexts files and check both their domain and their
    type definitions against the policy.
    :param args: parsed arguments, tool_path and policy_file are used
    :param output_path: folder receiving the combined file
    :param policy_path: folders searched for sehap_contexts files
    :return: None
    :raises Exception: if a check or a command fails
    """
    contexts_list = traverse_folder_in_type(
        policy_path, "sehap_contexts")

    combined_contexts = os.path.join(output_path, "sehap_contexts")
    combine_contexts_file(contexts_list, combined_contexts)

    check_sehap_contexts(args, combined_contexts, 1)
    check_sehap_contexts(args, combined_contexts, 0)


def prepare_build_path(dir_list, root_dir, build_dir_list):
    """
    Resolve the policy folders against root_dir and append them to
    build_dir_list. The two selinux_adapter base folders are always included.
    Entries that are empty or equal to "default" are skipped.
    :param dir_list: extra policy folders, separated by ':'
    :param root_dir: folder the policy folders are relative to
    :param build_dir_list: list extended in place with the existing paths
    :return: None
    :raises SystemExit: with code -1 if a folder does not exist
    """
    build_contexts_list = \
        ["base/security/selinux_adapter/sepolicy/base", "base/security/selinux_adapter/sepolicy/ohos_policy"]
    build_contexts_list += dir_list.split(":")

    for i in build_contexts_list:
        if i == "" or i == "default":
            continue
        path = os.path.join(root_dir, i)
        if os.path.exists(path):
            build_dir_list.append(path)
        else:
            print("following path not exists!! {}".format(path))
            # sys.exit instead of the interactive-only exit() helper, which
            # is injected by the site module and may be unavailable.
            sys.exit(-1)


def traverse_folder_in_dir_name(search_dir, folder_suffix):
    """
    Find every folder below search_dir whose name equals folder_suffix.
    :param search_dir: path to search
    :param folder_suffix: exact folder name to look for
    :return: list of matching folder paths
    """
    return [
        os.path.join(root, dir_i)
        for root, dirs, _ in sorted(os.walk(search_dir))
        for dir_i in dirs
        if dir_i == folder_suffix
    ]


def main(args):
    """
    Build and check all contexts files for the requested components.
    :param args: parsed arguments returned by parse_args
    :return: None
    """
    # The pcre2 folder is resolved relative to the current working directory.
    # NOTE(review): presumably needed so sefcontext_compile can load
    # libpcre2 - confirm. Any existing LD_LIBRARY_PATH value is replaced.
    if sys.platform == "linux" and platform.machine().lower() == "aarch64":
        libpcre2_path = os.path.realpath("./clang_arm64/thirdparty/pcre2/")
    else:
        libpcre2_path = os.path.realpath("./clang_x64/thirdparty/pcre2/")
    os.environ['LD_LIBRARY_PATH'] = libpcre2_path
    output_path = args.dst_dir
    policy_path = []
    prepare_build_path(args.policy_dir_list, args.source_root_dir, policy_path)

    public_policy = []
    system_policy = []
    vendor_policy = []

    for item in policy_path:
        public_policy += traverse_folder_in_dir_name(item, "public")
        system_policy += traverse_folder_in_dir_name(item, "system")
        vendor_policy += traverse_folder_in_dir_name(item, "vendor")

    system_folder_list = public_policy + system_policy
    vendor_folder_list = public_policy + vendor_policy
    all_folder_list = public_policy + system_policy + vendor_policy

    if args.components == "system":
        folder_list = system_folder_list
    elif args.components == "vendor":
        folder_list = vendor_folder_list
    else:
        folder_list = all_folder_list

    build_file_contexts(args, output_path, folder_list, all_folder_list)
    build_common_contexts(args, output_path, "service_contexts", folder_list, all_folder_list)
    build_common_contexts(args, output_path, "hdf_service_contexts", folder_list, all_folder_list)
    build_common_contexts(args, output_path, "parameter_contexts", folder_list, all_folder_list)
    build_sehap_contexts(args, output_path, all_folder_list)


if __name__ == "__main__":
    input_args = parse_args()
    main(input_args)