1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 # Copyright (c) 2021 Huawei Device Co., Ltd.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 import binascii
17 import copy
18 import os
19 import subprocess
20 import tempfile
21 import time
22 import collections as collect
23 import enum
24 import ctypes
25 import zipfile
26 
27 from log_exception import UPDATE_LOGGER
28 from script_generator import create_script
29 from utils import sign_package
30 from utils import HASH_CONTENT_LEN_DICT
31 from utils import OPTIONS_MANAGER
32 from utils import REGISTER_SCRIPT_FILE_NAME
33 from utils import ON_SERVER
34 from utils import SCRIPT_KEY_LIST
35 from utils import EXTEND_OPTIONAL_COMPONENT_LIST
36 from utils import COMPONENT_INFO_INNIT
37 from utils import UPDATE_EXE_FILE_NAME
38 from utils import TOTAL_SCRIPT_FILE_NAME
39 from utils import EXTEND_PATH_EVENT
40 from utils import LINUX_HASH_ALGORITHM_DICT
41 from utils import UPDATE_BIN_FILE_NAME
42 from utils import BUILD_TOOLS_FILE_NAME
43 from utils import SIGN_PACKAGE_EVENT
44 from utils import CHECK_BINARY_EVENT
45 from utils import ZIP_EVENT
46 from utils import GENERATE_SIGNED_DATA_EVENT
47 from utils import DECOUPLED_EVENT
48 from utils import get_extend_path_list
49 from create_update_package import CreatePackage
50 from create_update_package import SIGN_ALGO_RSA
51 from create_update_package import SIGN_ALGO_PSS
52 from create_signed_data import generate_signed_data_default
53 
54 IS_DEL = 0  # value written to the "flags" field of every PkgComponent by get_component_list
55 SIGNING_LENGTH_256 = 256  # signing length for which get_head_list selects digest method 2 (sha256)
56 DIGEST_LEN = 32  # number of bytes in the PkgComponent "digest" array
57 HASH_VALUE_MAX_LEN = 128  # SignInfo "hash_code" holds this many bytes plus one
58 
59 
@enum.unique
class SignMethod(enum.Enum):
    """Numeric identifiers of the signing algorithms stored in a package header."""

    # RSA signing
    RSA = 1
    # ECC signing
    ECC = 2
63 
64 
class PkgHeader(ctypes.Structure):
    """
    ctypes structure describing the header of an update package.

    The field order is part of the binary layout and must not change:
    four one-byte fields, two C ints, then five C string pointers.
    """
    _fields_ = (
        # one-byte selectors / flags
        [(field_name, ctypes.c_ubyte) for field_name in (
            "digest_method", "sign_method", "pkg_type", "pkg_flags")]
        # integer counters / versions
        + [(field_name, ctypes.c_int) for field_name in (
            "entry_count", "update_file_version")]
        # NUL-terminated byte strings
        + [(field_name, ctypes.c_char_p) for field_name in (
            "product_update_id", "software_version", "date", "time",
            "describe_package_id")]
    )
77 
78 
class PkgComponent(ctypes.Structure):
    """
    ctypes structure describing one component (file) of an update package.

    The field order is part of the binary layout and must not change:
    the digest byte array, three C string pointers, three C ints and
    three one-byte fields.
    """
    _fields_ = (
        # raw digest bytes of the component file
        [("digest", ctypes.c_ubyte * DIGEST_LEN)]
        # NUL-terminated byte strings
        + [(field_name, ctypes.c_char_p) for field_name in (
            "file_path", "component_addr", "version")]
        # integer sizes / identifier
        + [(field_name, ctypes.c_int) for field_name in (
            "size", "id", "original_size")]
        # one-byte type / flag fields
        + [(field_name, ctypes.c_ubyte) for field_name in (
            "res_type", "type", "flags")]
    )
90 
91 
class SignInfo(ctypes.Structure):
    """
    ctypes structure carrying signature information.

    Layout: two C ints followed by a byte buffer of
    HASH_VALUE_MAX_LEN + 1 bytes.
    """
    _fields_ = (
        [(field_name, ctypes.c_int) for field_name in (
            "sign_offset", "hash_len")]
        # NOTE(review): the extra byte is presumably room for a terminator
        # -- confirm against the consumer of this structure.
        + [("hash_code", ctypes.c_ubyte * (HASH_VALUE_MAX_LEN + 1))]
    )
96 
97 
def create_update_bin():
    """
    Call the interface to generate the update.bin file.

    The components are written in this order: the extended components
    (plus the optional extended components when a partition file is
    present) followed by the full images. For not_l2 packages only the
    full images are used.

    :return update_bin_obj: Update file object (an open NamedTemporaryFile
                            created in OPTIONS_MANAGER.update_package).
                            If the component list or the package cannot
                            be created, return False.
    """
    update_bin_obj = tempfile.NamedTemporaryFile(
        dir=OPTIONS_MANAGER.update_package, prefix="update_bin-")

    succeeded = False
    try:
        head_value_list = OPTIONS_MANAGER.head_info_list
        component_dict = OPTIONS_MANAGER.component_info_dict
        full_image_file_obj_list = OPTIONS_MANAGER.full_image_file_obj_list
        full_img_list = OPTIONS_MANAGER.full_img_list

        extend_component_list = get_extend_path_list()
        if OPTIONS_MANAGER.not_l2:
            all_image_name = full_img_list
        elif OPTIONS_MANAGER.partition_file_obj is not None:
            all_image_name = \
                extend_component_list + EXTEND_OPTIONAL_COMPONENT_LIST + full_img_list
        else:
            all_image_name = extend_component_list + full_img_list

        # Re-order the component info to follow all_image_name; names without
        # an entry map to None and are filled in by get_component_list.
        sort_component_dict = collect.OrderedDict(
            (each_image_name, component_dict.get(each_image_name))
            for each_image_name in all_image_name)
        component_dict = copy.deepcopy(sort_component_dict)
        head_list = get_head_list(len(component_dict), head_value_list)

        component_list = get_component_list(
            full_image_file_obj_list, component_dict)
        # get_component_list signals failure with None or False; an empty
        # ctypes array is also falsy, so test for the sentinels explicitly.
        if component_list is None or component_list is False:
            UPDATE_LOGGER.print_log(
                "Get component list failed!", UPDATE_LOGGER.ERROR_LOG)
            return False

        save_patch = update_bin_obj.name.encode("utf-8")

        # create bin
        package = CreatePackage(
            head_list, component_list, save_patch, OPTIONS_MANAGER.private_key)
        if not package.create_package():
            UPDATE_LOGGER.print_log(
                "Create update package .bin failed!", UPDATE_LOGGER.ERROR_LOG)
            return False
        succeeded = True
    finally:
        if not succeeded:
            # Closing a NamedTemporaryFile also removes it from disk.
            update_bin_obj.close()

    UPDATE_LOGGER.print_log(
        "Create update package .bin complete! path: %s" % update_bin_obj.name)
    return update_bin_obj
148 
149 
def get_component_list(all_image_file_obj_list, component_dict):
    """
    Get the list of component information according to
    the component information structure.

    The first entries of component_dict are matched by position with the
    extended file paths; the remaining entries are matched by position with
    all_image_file_obj_list. If an EXTEND_PATH_EVENT handler is registered,
    its return value replaces the extended file path list.

    :param all_image_file_obj_list: all image object file list
    :param component_dict: Component information content dict
    :return component_list: ctypes array of PkgComponent.
                            If the hash of a file cannot be obtained,
                            return False.
    """
    component_list = (PkgComponent * len(component_dict))()
    extend_list = get_extend_path_list()
    not_l2 = OPTIONS_MANAGER.not_l2
    if not_l2:
        extend_component_list = []
        extend_path_list = []
    else:
        extend_component_list = extend_list
        extend_path_list = [OPTIONS_MANAGER.version_mbn_file_path,
                            OPTIONS_MANAGER.board_list_file_path]
        if OPTIONS_MANAGER.partition_file_obj is not None:
            extend_component_list = extend_list + EXTEND_OPTIONAL_COMPONENT_LIST
            extend_path_list.append(OPTIONS_MANAGER.partition_file_obj.name)
    get_path_list = OPTIONS_MANAGER.init.invoke_event(EXTEND_PATH_EVENT)
    if get_path_list:
        extend_path_list = get_path_list()

    extend_count = len(extend_component_list)
    digest_array_type = ctypes.c_ubyte * DIGEST_LEN
    # component_addr is prefixed with '/' except for not_l2 packages
    addr_format = '%s' if not_l2 else '/%s'

    for idx, (key, component) in enumerate(component_dict.items()):
        if idx < extend_count:
            file_path = extend_path_list[idx]
        else:
            file_path = all_image_file_obj_list[idx - extend_count].name
        digest = get_hash_content(file_path, OPTIONS_MANAGER.hash_algorithm)
        if not digest:
            UPDATE_LOGGER.print_log(
                "Get hash content failed! path: %s" % file_path,
                UPDATE_LOGGER.ERROR_LOG)
            return False
        if component is None:
            # No info supplied for this name: start from the default template.
            component = copy.copy(COMPONENT_INFO_INNIT)
            component[0] = key
        file_size = os.path.getsize(file_path)

        entry = component_list[idx]
        # from_buffer_copy only reads the first DIGEST_LEN bytes of the
        # decoded hex digest.
        entry.digest = digest_array_type.from_buffer_copy(
            binascii.a2b_hex(digest.encode('utf-8')))
        entry.file_path = file_path.encode("utf-8")
        entry.component_addr = (addr_format % component[0]).encode("utf-8")
        entry.version = component[4].encode("utf-8")
        entry.size = file_size
        entry.id = int(component[1])
        # NOTE(review): component[3] is compared with the int 1 here but is
        # passed through int() below; if the field is a string this branch
        # never fires -- confirm the intended type.
        if component[3] == 1:
            entry.original_size = file_size
        else:
            entry.original_size = 0
        entry.res_type = int(component[2])
        entry.type = int(component[3])
        entry.flags = IS_DEL
    return component_list
210 
211 
def get_head_list(component_count, head_value_list):
    """
    Build the PkgHeader for update.bin.

    :param component_count: number of components, stored in entry_count
    :param head_value_list: header values in the order
                            [update_file_version, product_update_id,
                             software_version, date, time]
    :return head_list: populated PkgHeader
    """
    header = PkgHeader()

    # PKG_DIGEST_TYPE_SHA384 is 3, PKG_DIGEST_TYPE_SHA256 is 2
    header.digest_method = \
        3 if OPTIONS_MANAGER.signing_length != SIGNING_LENGTH_256 else 2

    # 0 when the private key lives on the server, otherwise the
    # SignMethod value of the chosen algorithm (RSA unless "ECC").
    if OPTIONS_MANAGER.private_key == ON_SERVER:
        sign_method = 0
    elif OPTIONS_MANAGER.signing_algorithm == "ECC":
        sign_method = SignMethod.ECC.value
    else:
        sign_method = SignMethod.RSA.value
    header.sign_method = sign_method

    header.pkg_type = 1
    header.pkg_flags = 1 if OPTIONS_MANAGER.not_l2 else 0
    header.entry_count = component_count

    header.update_file_version = int(head_value_list[0])
    text_fields = ("product_update_id", "software_version", "date", "time")
    for position, field_name in enumerate(text_fields, start=1):
        setattr(header, field_name, head_value_list[position].encode("utf-8"))

    header.describe_package_id = ctypes.c_char_p("update/info.bin".encode())
    return header
248 
249 
def get_tools_component_list(count, opera_script_dict):
    """
    Build a PkgComponent array from the script dict.

    Each key of opera_script_dict is stored as file_path and its value as
    component_addr; the other PkgComponent fields keep their zero defaults.

    :param count: number of components (length of the returned array)
    :param opera_script_dict: script file name and path dict
    :return: tuple (component_list, component_num) where component_num is
             the number of entries filled from opera_script_dict
    """
    component_list = (PkgComponent * count)()
    component_num = 0
    for script_key, script_value in opera_script_dict.items():
        entry = component_list[component_num]
        entry.file_path = script_key.encode("utf-8")
        entry.component_addr = script_value.encode("utf-8")
        component_num += 1
    return component_list, component_num
269 
270 
def get_tools_head_list(component_count):
    """
    Build the PkgHeader used for the build tools package.

    digest_method, sign_method and pkg_flags are 0 and pkg_type is 2.
    :param component_count: number of components, stored in entry_count
    :return head_list: populated PkgHeader
    """
    return PkgHeader(
        digest_method=0,
        sign_method=0,
        pkg_type=2,
        pkg_flags=0,
        entry_count=component_count)
284 
285 
def get_signing_from_server(package_path, hash_algorithm, hash_code=None):
    """
    Placeholder for signing an update package on a server.

    This implementation only logs its arguments and returns empty bytes.
    A vendor is expected to provide its own signing service call here,
    for example:
    ip = ""
    user_name = ""
    pass_word = ""
    signe_jar = ""
    signing_config = [signe_jar, ip, user_name, pass_word,
                      hash_code, hash_algorithm]
    cmd = ' '.join(signing_config)
    subprocess.Popen(
        cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    :param package_path: update package file path
    :param hash_algorithm: hash algorithm
    :param hash_code: hash code
    :return: signing content as bytes (empty in this implementation)
    """
    log_args = (package_path, hash_algorithm, hash_code)
    UPDATE_LOGGER.print_log(
        "Signing %s, hash algorithm is: %s, Signing hash code: %s" % log_args)
    placeholder_signature = ""
    return placeholder_signature.encode()
309 
310 
def create_build_tools_zip():
    """
    Create the build tools zip in a temporary file.

    The archive receives every operation script, the updater binary
    (unless a CHECK_BINARY_EVENT handler is registered and does not return
    False), the total script, the register script when one exists, and
    finally the hash signed data of all those files.

    :return: the NamedTemporaryFile object holding the zip.
             Return False if the script dict is empty, the updater binary
             is missing, or the hash signed data cannot be generated.
    """
    opera_script_file_name_dict = OPTIONS_MANAGER.opera_script_file_name_dict
    if opera_script_file_name_dict == {each: [] for each in SCRIPT_KEY_LIST}:
        UPDATE_LOGGER.print_log(
            "Script dict is null!",
            log_type=UPDATE_LOGGER.ERROR_LOG)
        return False

    # maps the script file on disk to its name inside the archive
    opera_script_dict = {}
    for each_value in opera_script_file_name_dict.values():
        for each in each_value:
            opera_script_dict[each[1].name] = each[0]

    total_script_file_obj = OPTIONS_MANAGER.total_script_file_obj
    register_script_file_obj = OPTIONS_MANAGER.register_script_file_obj
    update_exe_path = os.path.join(
        OPTIONS_MANAGER.target_package_dir, UPDATE_EXE_FILE_NAME)

    file_obj = tempfile.NamedTemporaryFile(
        dir=OPTIONS_MANAGER.update_package, prefix="build_tools-")
    # file name will be prefixed by build_tools in hash signed data
    name_format_str = "build_tools/{}"
    files_to_sign = []
    succeeded = False
    try:
        # The context manager closes the archive on every exit path,
        # including early returns and exceptions.
        with zipfile.ZipFile(file_obj.name, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            # add opera_script to build_tools.zip
            for key, value in opera_script_dict.items():
                zip_file.write(key, value)
                files_to_sign.append((key, name_format_str.format(value)))

            binary_check = OPTIONS_MANAGER.init.invoke_event(CHECK_BINARY_EVENT)
            if not callable(binary_check) or binary_check() is False:
                if not os.path.exists(update_exe_path):
                    UPDATE_LOGGER.print_log(
                        "updater_binary file does not exist!path: %s" % update_exe_path,
                        log_type=UPDATE_LOGGER.ERROR_LOG)
                    return False
                # add update_binary to build_tools.zip
                zip_file.write(update_exe_path, UPDATE_EXE_FILE_NAME)
                files_to_sign.append(
                    (update_exe_path, name_format_str.format(UPDATE_EXE_FILE_NAME)))

            # add loadScript to build_tools.zip
            zip_file.write(total_script_file_obj.name, TOTAL_SCRIPT_FILE_NAME)
            files_to_sign.append(
                (total_script_file_obj.name,
                 name_format_str.format(TOTAL_SCRIPT_FILE_NAME)))
            if register_script_file_obj is not None:
                zip_file.write(register_script_file_obj.name, REGISTER_SCRIPT_FILE_NAME)
                files_to_sign.append(
                    (register_script_file_obj.name,
                     name_format_str.format(REGISTER_SCRIPT_FILE_NAME)))

            if create_hsd_for_build_tools(zip_file, files_to_sign) is False:
                return False
        succeeded = True
    finally:
        if not succeeded:
            # Closing a NamedTemporaryFile also removes it from disk.
            file_obj.close()
    return file_obj
374 
375 
def do_sign_package(update_package, update_file_name):
    """
    Sign the update package.

    Records the target path "<update_package>/<update_file_name>.zip" in
    OPTIONS_MANAGER.signed_package, removes an existing file at that path,
    then signs with the SIGN_PACKAGE_EVENT handler if one is registered,
    otherwise with sign_package().
    :param update_package: update package directory
    :param update_file_name: package file name without extension
    :return: result of the signing call
    """
    target_path = os.path.join(update_package, "%s.zip" % update_file_name)
    OPTIONS_MANAGER.signed_package = target_path
    if os.path.exists(target_path):
        os.remove(target_path)

    custom_signer = OPTIONS_MANAGER.init.invoke_event(SIGN_PACKAGE_EVENT)
    signer = custom_signer if custom_signer else sign_package
    return signer()
389 
390 
def get_update_file_name():
    """
    Build the base name of the update package.

    For not_l2 packages the name is "updater_" plus the target package
    version with spaces replaced by underscores; otherwise it is
    "updater_" plus the package type ("sd", "diff" or "full").
    :return update_file_name: package base name without extension
    """
    if OPTIONS_MANAGER.sd_card:
        package_type = "sd"
    else:
        package_type = "diff" if OPTIONS_MANAGER.source_package else "full"

    if OPTIONS_MANAGER.not_l2:
        name_suffix = OPTIONS_MANAGER.target_package_version.replace(" ", "_")
    else:
        name_suffix = package_type
    return "updater_" + name_suffix
405 
406 
def do_zip_update_package():
    """
    Write the unsigned update package zip to
    OPTIONS_MANAGER.update_package_file_path.

    The archive contains whatever the ZIP_EVENT handler adds (if one is
    registered), update.bin, the build tools zip, the board list, the
    version list (only when the DECOUPLED_EVENT result is False), the
    maximum stash size (when non-zero) and all incremental patches.

    :return: True on success, False if the ZIP_EVENT handler returns False.
    """
    # The context manager closes the archive on every exit path,
    # including the early return and exceptions raised while writing.
    with zipfile.ZipFile(OPTIONS_MANAGER.update_package_file_path,
                         'w', zipfile.ZIP_DEFLATED, allowZip64=True) as zip_file:
        # add files to update package
        do_add_files = OPTIONS_MANAGER.init.invoke_event(ZIP_EVENT)
        if callable(do_add_files) and do_add_files(zip_file) is False:
            UPDATE_LOGGER.print_log("add files fail", UPDATE_LOGGER.ERROR_LOG)
            return False
        # add update.bin to update package
        zip_file.write(OPTIONS_MANAGER.update_bin_obj.name, "update.bin")
        # add build_tools.zip to update package
        zip_file.write(OPTIONS_MANAGER.build_tools_zip_obj.name, BUILD_TOOLS_FILE_NAME)

        zip_file.write(OPTIONS_MANAGER.board_list_file_path, "board_list")
        decouple_res = OPTIONS_MANAGER.init.invoke_event(DECOUPLED_EVENT)
        if decouple_res is False:
            zip_file.write(OPTIONS_MANAGER.version_mbn_file_path, "version_list")

        if OPTIONS_MANAGER.max_stash_size != 0:
            # The temp file only has to live until it has been copied
            # into the archive.
            with tempfile.NamedTemporaryFile(mode="w+") as max_stash_file_obj:
                max_stash_file_obj.write(str(OPTIONS_MANAGER.max_stash_size))
                max_stash_file_obj.flush()
                zip_file.write(max_stash_file_obj.name, "all_max_stash")

        for package_patch_zip in OPTIONS_MANAGER.incremental_block_file_obj_dict.values():
            package_patch_zip.package_block_patch(zip_file)

        for partition, patch_obj in OPTIONS_MANAGER.incremental_image_file_obj_dict.items():
            zip_file.write(patch_obj.name, "%s.patch.dat" % partition)

    return True
440 
441 
def create_hsd_for_build_tools(zip_file, files_to_sign):
    """
    Generate the hash signed data for build_tools.zip and store it in the
    archive under the name "hash_signed_data".

    The GENERATE_SIGNED_DATA_EVENT handler is used when the event lookup
    does not return False; otherwise generate_signed_data_default is used.
    :param zip_file: open ZipFile to add the signed data to; it is closed
                     here when generation fails
    :param files_to_sign: list of (file path, name in signed data) tuples
    :return: True on success, False if the generated data is empty
    """
    generate_signed_data_ext = \
        OPTIONS_MANAGER.init.invoke_event(GENERATE_SIGNED_DATA_EVENT)
    if generate_signed_data_ext is False:
        generator = generate_signed_data_default
    else:
        generator = generate_signed_data_ext
    signed_data = generator(files_to_sign)

    if signed_data != "":
        # add hash signed data to build_tools.zip
        zip_file.writestr("hash_signed_data", signed_data)
        return True

    UPDATE_LOGGER.print_log("generate_signed_data failed", log_type=UPDATE_LOGGER.ERROR_LOG)
    zip_file.close()
    return False
459 
460 
def build_update_package(no_zip, update_package, prelude_script,
                         verse_script, refrain_script, ending_script):
    """
    Create the update package file.

    With no_zip set, update.bin is copied to
    "<update_package>/<name>.bin". Otherwise the scripts and the build
    tools zip are created, everything is zipped into
    "<update_package>/<name>_unsigned.zip", the package is signed and the
    unsigned zip is removed.

    :param no_zip: no zip
    :param update_package: update package path
    :param prelude_script: prelude object
    :param verse_script: verse object
    :param refrain_script: refrain object
    :param ending_script: ending object
    :return: True on success. If any step fails, return False.
    """
    update_bin_obj = create_update_bin()
    if not update_bin_obj:
        return False
    OPTIONS_MANAGER.update_bin_obj = update_bin_obj

    update_file_name = get_update_file_name()

    if no_zip:
        update_package_path = os.path.join(
            update_package, '%s.bin' % update_file_name)
        if os.path.exists(update_package_path):
            os.remove(update_package_path)
        OPTIONS_MANAGER.update_package_file_path = update_package_path
        # Copy in chunks so the whole update.bin is never held in memory.
        chunk_size = 1024 * 1024
        with open(OPTIONS_MANAGER.update_bin_obj.name, 'rb') as r_f, \
                open(update_package_path, 'wb') as w_f:
            for chunk in iter(lambda: r_f.read(chunk_size), b""):
                w_f.write(chunk)
        return True

    update_package_path = os.path.join(
        update_package, '%s_unsigned.zip' % update_file_name)
    OPTIONS_MANAGER.update_package_file_path = update_package_path

    create_script(prelude_script, verse_script,
                  refrain_script, ending_script)

    build_tools_zip_obj = create_build_tools_zip()
    if build_tools_zip_obj is False:
        UPDATE_LOGGER.print_log(
            "Create build tools zip failed!",
            log_type=UPDATE_LOGGER.ERROR_LOG)
        return False
    OPTIONS_MANAGER.build_tools_zip_obj = build_tools_zip_obj

    if not do_zip_update_package():
        UPDATE_LOGGER.print_log("Zip update package fail", UPDATE_LOGGER.ERROR_LOG)
        return False

    if not do_sign_package(update_package, update_file_name):
        UPDATE_LOGGER.print_log("Sign ota package fail", UPDATE_LOGGER.ERROR_LOG)
        return False

    # The unsigned archive is only an intermediate product.
    if os.path.exists(update_package_path):
        os.remove(update_package_path)
    return True
519 
520 
def get_hash_content(file_path, hash_algorithm):
    """
    Get the hash value of a file by running the external command that
    LINUX_HASH_ALGORITHM_DICT maps to hash_algorithm.

    :param file_path : file path
    :param hash_algorithm: hash algorithm
    :return hash_content: hash value (the text before the first space of
                          the command output).
                          Return False if the algorithm is unsupported.
    :raise RuntimeError: if file_path does not exist, or the length of the
                         hash value differs from
                         HASH_CONTENT_LEN_DICT[hash_algorithm].
    """
    try:
        hash_command = LINUX_HASH_ALGORITHM_DICT[hash_algorithm]
    except KeyError:
        UPDATE_LOGGER.print_log(
            "Unsupported hash algorithm! %s" % hash_algorithm,
            log_type=UPDATE_LOGGER.ERROR_LOG)
        return False
    if not os.path.exists(file_path):
        UPDATE_LOGGER.print_log(
            "%s failed!" % hash_command,
            UPDATE_LOGGER.ERROR_LOG)
        raise RuntimeError

    # communicate() drains the pipe while waiting, which avoids the
    # deadlock that wait() followed by read() can hit on a full pipe;
    # the context manager closes the pipe afterwards.
    with subprocess.Popen(
            [hash_command, file_path], shell=False,
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT) as process_obj:
        output, _ = process_obj.communicate()

    hash_content = output.decode(encoding='gbk').split(' ')[0]
    if len(hash_content) != HASH_CONTENT_LEN_DICT.get(hash_algorithm):
        UPDATE_LOGGER.print_log(
            "Get hash content failed! The length of the hash_content is 0!",
            UPDATE_LOGGER.ERROR_LOG)
        raise RuntimeError
    if process_obj.returncode == 0:
        UPDATE_LOGGER.print_log(
            "Get hash content success! path: %s" % file_path)
    return hash_content
554