#!/usr/bin/env python3
# -*- coding: utf-8 -*-

#
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import codecs
import sys
import time
import os
import json
import logging.config
import stat
from logging import FileHandler


# Level names accepted by get_logger(), mapped to the logging constants.
_LOG_LEVELS = {
    "debug": logging.DEBUG,
    "info": logging.INFO,
    "warning": logging.WARNING,
    "error": logging.ERROR,
    "critical": logging.CRITICAL,
}


class SafeFileHandler(FileHandler):
    """File handler that writes to ``<filename>.<time suffix>``.

    The suffix is produced by ``time.strftime(suffix)``. Before each record is
    written the handler checks whether the formatted time has changed or the
    current log file has disappeared; in either case it switches to (and, if
    needed, creates) the file for the current time.

    Files created by this handler are requested with owner read/write
    permission only.
    """

    def __init__(self, filename, mode="a", encoding="utf-8", delay=0, suffix="%Y-%m-%d_%H"):
        """Open ``<filename>.<strftime(suffix)>`` for logging.

        Args:
            filename: Base path of the log file (``str`` or path-like).
            mode: File mode passed to ``open``.
            encoding: Text encoding of the log file.
            delay: If true, the file is opened on first emit instead of now.
            suffix: ``time.strftime`` format appended to ``filename``.
        """
        # Convert first so path-like objects can be concatenated with the suffix.
        filename = os.fspath(filename)
        current_time = time.strftime(suffix, time.localtime())
        FileHandler.__init__(self, filename + "." + current_time, mode, encoding, delay)

        self.filename = filename
        self.mode = mode
        self.encoding = encoding
        self.suffix = suffix
        self.suftime = current_time

    @staticmethod
    def _owner_only_opener(path, flags):
        """Opener for ``open`` that creates files with mode 0600."""
        return os.open(path, flags, stat.S_IWUSR | stat.S_IRUSR)

    def _open(self):
        """Open ``self.baseFilename`` and return the stream.

        Uses ``open`` with a custom opener so the flags are derived from
        ``self.mode`` (append, create, ...) while newly created files get
        owner-only permissions.
        """
        return open(self.baseFilename, self.mode, encoding=self.encoding,
                    errors=getattr(self, "errors", None),
                    opener=self._owner_only_opener)

    def emit(self, record):
        """Write ``record``, switching to a new file first when required."""
        try:
            if self.parse_file_name():
                self.gen_file_name()
            FileHandler.emit(self, record)
        except Exception as e:
            print(e)
            self.handleError(record)

    def parse_file_name(self):
        """Return True when the handler must switch to a new log file.

        That is the case when the formatted time suffix has changed since the
        current file was selected, or when the current file no longer exists.
        """
        current_suffix = time.strftime(self.suffix, time.localtime())
        if self.suftime != current_suffix:
            return True
        current_file = os.path.abspath(self.filename) + "." + self.suftime
        return not os.path.exists(current_file)

    def gen_file_name(self):
        """Close the current stream and point the handler at the current file.

        Unless ``delay`` is set, the new file is opened immediately and kept
        open as ``self.stream``; otherwise ``FileHandler.emit`` opens it on
        the next write.
        """
        if self.stream:
            try:
                self.stream.close()
            finally:
                self.stream = None

        self.suftime = time.strftime(self.suffix, time.localtime())
        self.baseFilename = os.path.abspath(self.filename) + "." + self.suftime

        if not self.delay:
            # The stream must stay open after this method returns, so it is
            # deliberately not wrapped in a ``with`` block.
            self.stream = self._open()


def get_logger(class_name, level="info"):
    """Return the logger named ``class_name`` with file and console output.

    Log files are written to a ``log`` directory located next to the parent
    directory of this file, as ``<class_name>.log.<time suffix>``. Handlers
    are attached only when the logger has none yet, so repeated calls with the
    same name do not duplicate output.

    Args:
        class_name: Logger name, also used as the log file base name.
        level: Case-insensitive level name: ``debug``, ``info``, ``warning``,
            ``error`` or ``critical``. Any other value leaves the logger's
            level unchanged.

    Returns:
        The configured ``logging.Logger``.
    """
    logger = logging.getLogger(class_name)

    if not logger.handlers:
        log_format = "%(asctime)s -%(levelname)s - %(name)s - %(message)s"
        formatter = logging.Formatter(log_format)
        log_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "log")
        os.makedirs(log_path, exist_ok=True)

        tfr_handler = SafeFileHandler(os.path.join(log_path, class_name + ".log"))
        tfr_handler.setFormatter(formatter)

        sh = logging.StreamHandler()
        sh.setFormatter(formatter)

        logger.addHandler(tfr_handler)
        logger.addHandler(sh)

    log_level = _LOG_LEVELS.get(str(level).lower())
    if log_level is not None:
        logger.setLevel(log_level)
    return logger


def parse_json():
    """Load ``build_example.json`` from the directory containing this file.

    Returns:
        The decoded JSON data, or ``None`` when the file cannot be opened or
        does not contain valid JSON (the error is printed).
    """
    config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "build_example.json")
    try:
        # Read-only: the config file must never be created or modified here.
        with os.fdopen(os.open(config_path, os.O_RDONLY), "r", encoding="utf-8") as json_file:
            return json.load(json_file)
    except (OSError, ValueError) as e:
        print(e)
        return None