1 #!/usr/bin/env python3 2 # -*- coding: utf-8 -*- 3 4 # Copyright (c) 2021 Huawei Device Co., Ltd. 5 # Licensed under the Apache License, Version 2.0 (the "License"); 6 # you may not use this file except in compliance with the License. 7 # You may obtain a copy of the License at 8 # 9 # http://www.apache.org/licenses/LICENSE-2.0 10 # 11 # Unless required by applicable law or agreed to in writing, software 12 # distributed under the License is distributed on an "AS IS" BASIS, 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 # See the License for the specific language governing permissions and 15 # limitations under the License. 16 import binascii 17 import copy 18 import os 19 import subprocess 20 import tempfile 21 import time 22 import collections as collect 23 import enum 24 import ctypes 25 import zipfile 26 27 from log_exception import UPDATE_LOGGER 28 from script_generator import create_script 29 from utils import sign_package 30 from utils import HASH_CONTENT_LEN_DICT 31 from utils import OPTIONS_MANAGER 32 from utils import REGISTER_SCRIPT_FILE_NAME 33 from utils import ON_SERVER 34 from utils import SCRIPT_KEY_LIST 35 from utils import EXTEND_OPTIONAL_COMPONENT_LIST 36 from utils import COMPONENT_INFO_INNIT 37 from utils import UPDATE_EXE_FILE_NAME 38 from utils import TOTAL_SCRIPT_FILE_NAME 39 from utils import EXTEND_PATH_EVENT 40 from utils import LINUX_HASH_ALGORITHM_DICT 41 from utils import UPDATE_BIN_FILE_NAME 42 from utils import BUILD_TOOLS_FILE_NAME 43 from utils import SIGN_PACKAGE_EVENT 44 from utils import CHECK_BINARY_EVENT 45 from utils import ZIP_EVENT 46 from utils import GENERATE_SIGNED_DATA_EVENT 47 from utils import DECOUPLED_EVENT 48 from utils import get_extend_path_list 49 from create_update_package import CreatePackage 50 from create_update_package import SIGN_ALGO_RSA 51 from create_update_package import SIGN_ALGO_PSS 52 from create_signed_data import generate_signed_data_default 53 54 IS_DEL = 0 55 
SIGNING_LENGTH_256 = 256
DIGEST_LEN = 32
HASH_VALUE_MAX_LEN = 128


class SignMethod(enum.Enum):
    """Signing algorithm identifiers written to ``PkgHeader.sign_method``."""
    RSA = 1
    ECC = 2


class PkgHeader(ctypes.Structure):
    """ctypes layout of the package header filled in by the head-list builders."""
    _fields_ = [("digest_method", ctypes.c_ubyte),
                ("sign_method", ctypes.c_ubyte),
                ("pkg_type", ctypes.c_ubyte),
                ("pkg_flags", ctypes.c_ubyte),
                ("entry_count", ctypes.c_int),
                ("update_file_version", ctypes.c_int),
                ("product_update_id", ctypes.c_char_p),
                ("software_version", ctypes.c_char_p),
                ("date", ctypes.c_char_p),
                ("time", ctypes.c_char_p),
                ("describe_package_id", ctypes.c_char_p)]


class PkgComponent(ctypes.Structure):
    """ctypes layout of one component entry of the update package."""
    _fields_ = [("digest", ctypes.c_ubyte * DIGEST_LEN),
                ("file_path", ctypes.c_char_p),
                ("component_addr", ctypes.c_char_p),
                ("version", ctypes.c_char_p),
                ("size", ctypes.c_int),
                ("id", ctypes.c_int),
                ("original_size", ctypes.c_int),
                ("res_type", ctypes.c_ubyte),
                ("type", ctypes.c_ubyte),
                ("flags", ctypes.c_ubyte)]


class SignInfo(ctypes.Structure):
    """ctypes layout holding a signature offset and a hash value buffer."""
    _fields_ = [("sign_offset", ctypes.c_int),
                ("hash_len", ctypes.c_int),
                ("hash_code", ctypes.c_ubyte * (HASH_VALUE_MAX_LEN + 1))]


def create_update_bin():
    """
    Call the interface to generate the update.bin file.

    The components are ordered as: extended components (only when
    OPTIONS_MANAGER.not_l2 is falsy), then the full images.
    :return update_bin_obj: Update file object (a NamedTemporaryFile).
    If the component list or the package cannot be created, return False.
    """
    update_bin_obj = tempfile.NamedTemporaryFile(
        dir=OPTIONS_MANAGER.update_package, prefix="update_bin-")

    head_value_list = OPTIONS_MANAGER.head_info_list
    component_dict = OPTIONS_MANAGER.component_info_dict
    full_image_file_obj_list = OPTIONS_MANAGER.full_image_file_obj_list
    full_img_list = OPTIONS_MANAGER.full_img_list

    extend_component_list = get_extend_path_list()
    if not OPTIONS_MANAGER.not_l2:
        if OPTIONS_MANAGER.partition_file_obj is not None:
            all_image_name = \
                extend_component_list + EXTEND_OPTIONAL_COMPONENT_LIST + full_img_list
        else:
            all_image_name = extend_component_list + full_img_list
    else:
        all_image_name = full_img_list

    # Rebuild the dict in package order; names without an entry map to None
    # and are filled with defaults by get_component_list.
    sort_component_dict = collect.OrderedDict()
    for each_image_name in all_image_name:
        sort_component_dict[each_image_name] = component_dict.get(each_image_name)
    component_dict = copy.deepcopy(sort_component_dict)
    head_list = get_head_list(len(component_dict), head_value_list)

    component_list = get_component_list(
        full_image_file_obj_list, component_dict)
    # An empty ctypes array is falsy, so compare against False explicitly.
    if component_list is False:
        UPDATE_LOGGER.print_log("Get component list failed!", UPDATE_LOGGER.ERROR_LOG)
        return False

    save_patch = update_bin_obj.name.encode("utf-8")

    # create bin
    package = CreatePackage(head_list, component_list, save_patch, OPTIONS_MANAGER.private_key)
    if not package.create_package():
        UPDATE_LOGGER.print_log("Create update package .bin failed!", UPDATE_LOGGER.ERROR_LOG)
        return False

    UPDATE_LOGGER.print_log("Create update package .bin complete! path: %s" % update_bin_obj.name)
    return update_bin_obj


def get_component_list(all_image_file_obj_list, component_dict):
    """
    Get the list of component information according to
    the component information structure.
    :param all_image_file_obj_list: all image object file list
    :param component_dict: Component information content dict
    :return component_list: ctypes array of PkgComponent.
    If the digest of a component cannot be computed, return False.
    """
    pkg_components = PkgComponent * len(component_dict)
    component_list = pkg_components()
    extend_list = get_extend_path_list()
    if not OPTIONS_MANAGER.not_l2:
        if OPTIONS_MANAGER.partition_file_obj is not None:
            extend_component_list = extend_list + EXTEND_OPTIONAL_COMPONENT_LIST
            extend_path_list = [OPTIONS_MANAGER.version_mbn_file_path,
                                OPTIONS_MANAGER.board_list_file_path,
                                OPTIONS_MANAGER.partition_file_obj.name]
        else:
            extend_component_list = extend_list
            extend_path_list = [OPTIONS_MANAGER.version_mbn_file_path,
                                OPTIONS_MANAGER.board_list_file_path]
    else:
        extend_component_list = []
        extend_path_list = []
    # A registered extension may replace the extended path list.
    get_path_list = OPTIONS_MANAGER.init.invoke_event(EXTEND_PATH_EVENT)
    if get_path_list:
        extend_path_list = get_path_list()

    extend_count = len(extend_component_list)
    for idx, (key, component) in enumerate(component_dict.items()):
        # The leading entries are extended components; the rest are images.
        if idx < extend_count:
            file_path = extend_path_list[idx]
        else:
            file_path = all_image_file_obj_list[idx - extend_count].name
        digest = get_hash_content(file_path, OPTIONS_MANAGER.hash_algorithm)
        if not digest:
            return False
        if component is None:
            component = copy.copy(COMPONENT_INFO_INNIT)
            component[0] = key

        entry = component_list[idx]
        entry.digest = (ctypes.c_ubyte * DIGEST_LEN).from_buffer_copy(
            binascii.a2b_hex(digest.encode('utf-8')))
        entry.file_path = file_path.encode("utf-8")
        if not OPTIONS_MANAGER.not_l2:
            entry.component_addr = ('/%s' % component[0]).encode("utf-8")
        else:
            entry.component_addr = ('%s' % component[0]).encode("utf-8")
        entry.version = component[4].encode("utf-8")
        entry.size = os.path.getsize(file_path)
        entry.id = int(component[1])
        # NOTE(review): component[3] is compared with the int 1 here but is
        # converted with int() below - confirm the element type.
        if component[3] == 1:
            entry.original_size = os.path.getsize(file_path)
        else:
            entry.original_size = 0
        entry.res_type = int(component[2])
        entry.type = int(component[3])
        entry.flags = IS_DEL
    return component_list


def get_head_list(component_count, head_value_list):
    """
    According to the header structure, get the list of HEAD headers.
    :param component_count: number of components
    :param head_value_list: list of header values
    :return head_list: header list
    """
    head_list = PkgHeader()
    if OPTIONS_MANAGER.signing_length != SIGNING_LENGTH_256:
        # PKG_DIGEST_TYPE_SHA384 3,use sha384
        head_list.digest_method = 3
    else:
        # PKG_DIGEST_TYPE_SHA256 2,use sha256
        head_list.digest_method = 2
    if OPTIONS_MANAGER.private_key == ON_SERVER:
        head_list.sign_method = 0
    elif OPTIONS_MANAGER.signing_algorithm == "ECC":
        # signing algorithm use ECC
        head_list.sign_method = SignMethod.ECC.value
    else:
        # signing algorithm use RSA
        head_list.sign_method = SignMethod.RSA.value
    head_list.pkg_type = 1
    head_list.pkg_flags = 1 if OPTIONS_MANAGER.not_l2 else 0
    head_list.entry_count = component_count
    head_list.update_file_version = int(head_value_list[0])
    head_list.product_update_id = head_value_list[1].encode("utf-8")
    head_list.software_version = head_value_list[2].encode("utf-8")
    head_list.date = head_value_list[3].encode("utf-8")
    head_list.time = head_value_list[4].encode("utf-8")
    head_list.describe_package_id = ctypes.c_char_p("update/info.bin".encode())
    return head_list


def get_tools_component_list(count, opera_script_dict):
    """
    Get the list of component information according to
    the component information structure.
    :param count: number of components (size of the returned array)
    :param opera_script_dict: script file name and path dict
    :return: tuple (component_list, component_num); component_num is the
    number of entries that were filled from opera_script_dict.
    """
    pkg_components = PkgComponent * count
    component_list = pkg_components()
    component_num = 0
    for i, component in enumerate(opera_script_dict):
        component_list[i].file_path = component.encode("utf-8")
        component_list[i].component_addr = \
            (opera_script_dict[component]).encode("utf-8")
        component_num += 1
    return component_list, component_num


def get_tools_head_list(component_count):
    """
    According to the header structure, get the list of HEAD headers.
    :param component_count: number of components
    :return head_list: header list
    """
    head_list = PkgHeader()
    head_list.digest_method = 0
    head_list.sign_method = 0
    head_list.pkg_type = 2
    head_list.pkg_flags = 0
    head_list.entry_count = component_count
    return head_list


def get_signing_from_server(package_path, hash_algorithm, hash_code=None):
    """
    Server update package signature requires the vendor to
    implement its own service signature interface, as shown below:
    ip = ""
    user_name = ""
    pass_word = ""
    signe_jar = ""
    signing_config = [signe_jar, ip, user_name, pass_word,
                      hash_code, hash_algorithm]
    cmd = ' '.join(signing_config)
    subprocess.Popen(
        cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    :param package_path: update package file path
    :param hash_algorithm: hash algorithm
    :param hash_code: hash code
    :return: signing content as bytes (empty in this placeholder)
    """
    UPDATE_LOGGER.print_log("Signing %s, hash algorithm is: %s, "
                            "Signing hash code: %s" %
                            (package_path, hash_algorithm, hash_code))
    signing_content = ""
    return signing_content.encode()


def create_build_tools_zip():
    """
    Create the build_tools zip file of the update package.

    The zip receives the operation scripts, optionally the updater binary,
    the total script, optionally the register script, and the hash signed
    data of all those files.
    :return: the NamedTemporaryFile holding the zip, or False on failure.
    """
    opera_script_file_name_dict = OPTIONS_MANAGER.opera_script_file_name_dict
    tmp_dict = {}
    for each in SCRIPT_KEY_LIST:
        tmp_dict[each] = []
    if opera_script_file_name_dict == tmp_dict:
        UPDATE_LOGGER.print_log(
            "Script dict is null!",
            log_type=UPDATE_LOGGER.ERROR_LOG)
        return False

    # Map script file path -> name inside the archive.
    opera_script_dict = {}
    for each_value in opera_script_file_name_dict.values():
        for each in each_value:
            opera_script_dict[each[1].name] = each[0]

    total_script_file_obj = OPTIONS_MANAGER.total_script_file_obj
    register_script_file_obj = OPTIONS_MANAGER.register_script_file_obj
    update_exe_path = os.path.join(OPTIONS_MANAGER.target_package_dir, UPDATE_EXE_FILE_NAME)

    file_obj = tempfile.NamedTemporaryFile(dir=OPTIONS_MANAGER.update_package, prefix="build_tools-")
    files_to_sign = []
    # file name will be prefixed by build_tools in hash signed data
    name_format_str = "build_tools/{}"
    # The with-block guarantees the archive is closed on every exit path.
    with zipfile.ZipFile(file_obj.name, 'w', zipfile.ZIP_DEFLATED) as zip_file:
        # add opera_script to build_tools.zip
        for key, value in opera_script_dict.items():
            zip_file.write(key, value)
            files_to_sign += [(key, name_format_str.format(value))]

        binary_check = OPTIONS_MANAGER.init.invoke_event(CHECK_BINARY_EVENT)
        if not callable(binary_check) or binary_check() is False:
            if not os.path.exists(update_exe_path):
                UPDATE_LOGGER.print_log("updater_binary file does not exist!path: %s" % update_exe_path,
                                        log_type=UPDATE_LOGGER.ERROR_LOG)
                return False
            # add update_binary to build_tools.zip
            zip_file.write(update_exe_path, UPDATE_EXE_FILE_NAME)
            files_to_sign += [(update_exe_path, name_format_str.format(UPDATE_EXE_FILE_NAME))]

        # add loadScript to build_tools.zip
        zip_file.write(total_script_file_obj.name, TOTAL_SCRIPT_FILE_NAME)
        files_to_sign += [(total_script_file_obj.name, name_format_str.format(TOTAL_SCRIPT_FILE_NAME))]
        if register_script_file_obj is not None:
            zip_file.write(register_script_file_obj.name, REGISTER_SCRIPT_FILE_NAME)
            files_to_sign += [(register_script_file_obj.name, name_format_str.format(REGISTER_SCRIPT_FILE_NAME))]

        if create_hsd_for_build_tools(zip_file, files_to_sign) is False:
            return False
    return file_obj


def do_sign_package(update_package, update_file_name):
    """
    Sign the update package, removing a stale signed package first.
    :param update_package: update package directory
    :param update_file_name: base name of the signed zip (without ".zip")
    :return: result of the registered signing event if there is one,
    otherwise the result of sign_package()
    """
    signed_package = os.path.join(
        update_package, "%s.zip" % update_file_name)
    OPTIONS_MANAGER.signed_package = signed_package
    if os.path.exists(signed_package):
        os.remove(signed_package)

    sign_ota_package = \
        OPTIONS_MANAGER.init.invoke_event(SIGN_PACKAGE_EVENT)
    if sign_ota_package:
        return sign_ota_package()
    return sign_package()


def get_update_file_name():
    """
    Build the base file name of the update package.
    :return: "updater_<target version>" when OPTIONS_MANAGER.not_l2 is set,
    otherwise "updater_<sd|diff|full>"
    """
    if OPTIONS_MANAGER.sd_card:
        package_type = "sd"
    elif OPTIONS_MANAGER.source_package:
        package_type = "diff"
    else:
        package_type = "full"
    if OPTIONS_MANAGER.not_l2:
        update_file_name = ''.join(
            ["updater_", OPTIONS_MANAGER.target_package_version.replace(" ", "_")])
    else:
        update_file_name = ''.join(
            ["updater_", package_type])
    return update_file_name


def do_zip_update_package():
    """
    Write the unsigned update package zip at
    OPTIONS_MANAGER.update_package_file_path.
    :return: True on success, False if the registered ZIP_EVENT handler fails.
    """
    # The with-block closes the archive on every exit path, errors included.
    with zipfile.ZipFile(OPTIONS_MANAGER.update_package_file_path,
                         'w', zipfile.ZIP_DEFLATED, allowZip64=True) as zip_file:
        # add files to update package
        do_add_files = OPTIONS_MANAGER.init.invoke_event(ZIP_EVENT)
        if callable(do_add_files) and do_add_files(zip_file) is False:
            UPDATE_LOGGER.print_log("add files fail", UPDATE_LOGGER.ERROR_LOG)
            return False
        # add update.bin to update package
        zip_file.write(OPTIONS_MANAGER.update_bin_obj.name, "update.bin")
        # add build_tools.zip to update package
        zip_file.write(OPTIONS_MANAGER.build_tools_zip_obj.name, BUILD_TOOLS_FILE_NAME)

        zip_file.write(OPTIONS_MANAGER.board_list_file_path, "board_list")
        decouple_res = OPTIONS_MANAGER.init.invoke_event(DECOUPLED_EVENT)
        if decouple_res is False:
            zip_file.write(OPTIONS_MANAGER.version_mbn_file_path, "version_list")

        if OPTIONS_MANAGER.max_stash_size != 0:
            # The temp file only has to live until it has been added to the zip.
            with tempfile.NamedTemporaryFile(mode="w+") as max_stash_file_obj:
                max_stash_file_obj.write(str(OPTIONS_MANAGER.max_stash_size))
                max_stash_file_obj.flush()
                zip_file.write(max_stash_file_obj.name, "all_max_stash")

        for package_patch_zip in OPTIONS_MANAGER.incremental_block_file_obj_dict.values():
            package_patch_zip.package_block_patch(zip_file)

        for partition, patch_obj in OPTIONS_MANAGER.incremental_image_file_obj_dict.items():
            zip_file.write(patch_obj.name, "%s.patch.dat" % partition)
    return True


def create_hsd_for_build_tools(zip_file, files_to_sign):
    """
    generate hash signed data for build_tools.zip
    :param zip_file: open ZipFile the signed data is added to
    :param files_to_sign: list of (file path, name in signed data) tuples
    :return: True on success; False (with zip_file closed) when the
    generated signed data is empty.
    """
    generate_signed_data_ext = OPTIONS_MANAGER.init.invoke_event(GENERATE_SIGNED_DATA_EVENT)
    # Use the registered generator when there is one, else the default.
    if callable(generate_signed_data_ext):
        signed_data = generate_signed_data_ext(files_to_sign)
    else:
        signed_data = generate_signed_data_default(files_to_sign)
    if signed_data == "":
        UPDATE_LOGGER.print_log("generate_signed_data failed", log_type=UPDATE_LOGGER.ERROR_LOG)
        zip_file.close()
        return False
    # add hash signed data to build_tools.zip
    zip_file.writestr("hash_signed_data", signed_data)
    return True


def build_update_package(no_zip, update_package, prelude_script,
                         verse_script, refrain_script, ending_script):
    """
    Create the update package file.
    :param no_zip: no zip
    :param update_package: update package path
    :param prelude_script: prelude object
    :param verse_script: verse object
    :param refrain_script: refrain object
    :param ending_script: ending object
    :return: True on success. If exception occurs, return False.
    """
    update_bin_obj = create_update_bin()
    if not update_bin_obj:
        return False
    OPTIONS_MANAGER.update_bin_obj = update_bin_obj

    update_file_name = get_update_file_name()

    if not no_zip:
        update_package_path = os.path.join(
            update_package, '%s_unsigned.zip' % update_file_name)
        OPTIONS_MANAGER.update_package_file_path = update_package_path

        create_script(prelude_script, verse_script,
                      refrain_script, ending_script)

        build_tools_zip_obj = create_build_tools_zip()
        if build_tools_zip_obj is False:
            UPDATE_LOGGER.print_log(
                "Create build tools zip failed!",
                log_type=UPDATE_LOGGER.ERROR_LOG)
            return False
        OPTIONS_MANAGER.build_tools_zip_obj = build_tools_zip_obj

        if not do_zip_update_package():
            UPDATE_LOGGER.print_log("Zip update package fail", UPDATE_LOGGER.ERROR_LOG)
            return False

        sign_result = do_sign_package(update_package, update_file_name)

        if not sign_result:
            UPDATE_LOGGER.print_log("Sign ota package fail", UPDATE_LOGGER.ERROR_LOG)
            return False
        # The unsigned intermediate zip is no longer needed once signed.
        if os.path.exists(update_package_path):
            os.remove(update_package_path)
    else:
        import shutil

        update_package_path = os.path.join(
            update_package, '%s.bin' % update_file_name)
        if os.path.exists(update_package_path):
            os.remove(update_package_path)
        OPTIONS_MANAGER.update_package_file_path = update_package_path
        # Stream the copy so a large update.bin is never held in memory.
        with open(OPTIONS_MANAGER.update_bin_obj.name, 'rb') as r_f, \
                open(update_package_path, 'wb') as w_f:
            shutil.copyfileobj(r_f, w_f)
    return True


def get_hash_content(file_path, hash_algorithm):
    """
    Run the external hash tool configured for hash_algorithm on the file
    and return the hash value it prints.
    :param file_path : file path
    :param hash_algorithm: hash algorithm (key of LINUX_HASH_ALGORITHM_DICT)
    :return hash_content: hash value; False if the algorithm is unsupported
    :raise RuntimeError: if the file does not exist or the tool output does
    not have the expected hash length
    """
    try:
        cmd = [LINUX_HASH_ALGORITHM_DICT[hash_algorithm], file_path]
    except KeyError:
        UPDATE_LOGGER.print_log(
            "Unsupported hash algorithm! %s" % hash_algorithm,
            log_type=UPDATE_LOGGER.ERROR_LOG)
        return False
    if not os.path.exists(file_path):
        UPDATE_LOGGER.print_log(
            "%s failed!" % LINUX_HASH_ALGORITHM_DICT[hash_algorithm],
            UPDATE_LOGGER.ERROR_LOG)
        raise RuntimeError
    process_obj = subprocess.Popen(
        cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # communicate() drains the pipe while waiting, so the child can never
    # block on a full pipe the way wait()-then-read can.
    output, _ = process_obj.communicate()
    # The tool prints "<hash> <file name>"; keep only the hash.
    hash_content = output.decode(encoding='gbk').split(' ')[0]
    if len(hash_content) != HASH_CONTENT_LEN_DICT.get(hash_algorithm):
        UPDATE_LOGGER.print_log(
            "Get hash content failed! The length of the hash_content is 0!",
            UPDATE_LOGGER.ERROR_LOG)
        raise RuntimeError
    if process_obj.returncode == 0:
        UPDATE_LOGGER.print_log(
            "Get hash content success! path: %s" % file_path)
    return hash_content