import logging # 导入 logging 模块,用于记录日志
def getConsoleLogger(name="logger"):
"""
创建并配置一个自定义的日志记录器,用于将日志输出到控制台
参数:
name (str): 日志记录器的名称,默认为 "logger"
返回:
logging.Logger: 配置好的日志记录器对象
"""
logger = logging.getLogger(name) # 获取一个指定名称的日志记录器
logger.setLevel(logging.DEBUG) # 设置日志记录器的日志级别为 DEBUG,即记录所有级别的日志
console_handler = logging.StreamHandler() # 创建一个流处理器,用于将日志输出到控制台
console_handler.setLevel(logging.DEBUG) # 设置流处理器的日志级别为 DEBUG
formatter = logging.Formatter('[%(asctime)s - %(name)s - %(levelname)s ] - [ %(message)s]') # 创建一个日志格式器
console_handler.setFormatter(formatter) # 将格式器应用到流处理器
logger.addHandler(console_handler) # 将流处理器添加到日志记录器
return logger # 返回配置好的日志记录器
def getFileLogger(name="logger", filename="app.log"):
"""
创建并配置一个自定义的日志记录器,用于将日志输出到文件
参数:
name (str): 日志记录器的名称,默认为 "logger"
filename (str): 日志文件的名称,默认为 "app.log"
返回:
logging.Logger: 配置好的日志记录器对象
"""
logger = logging.getLogger(name) # 获取一个指定名称的日志记录器
logger.setLevel(logging.DEBUG) # 设置日志记录器的日志级别为 DEBUG,即记录所有级别的日志
file_handler = logging.FileHandler(filename) # 创建一个文件处理器,用于将日志输出到文件
file_handler.setLevel(logging.DEBUG) # 设置文件处理器的日志级别为 DEBUG
formatter = logging.Formatter('[%(asctime)s - %(name)s - %(levelname)s ] - [ %(message)s]') # 创建一个日志格式器
file_handler.setFormatter(formatter) # 将格式器应用到文件处理器
logger.addHandler(file_handler) # 将文件处理器添加到日志记录器
return logger # 返回配置好的日志记录器
# 创建一个名为 "cLogger" 的控制台日志记录器
cLogger = getConsoleLogger("cLogger")
# 记录不同级别的日志到控制台
cLogger.debug("这是 debug 级别日志") # 记录 DEBUG 级别的日志
cLogger.info("这是 info 级别日志") # 记录 INFO 级别的日志
cLogger.warning("这是 warning 级别日志") # 记录 WARNING 级别的日志
cLogger.error("这是 error 级别日志") # 记录 ERROR 级别的日志
cLogger.critical("这是 critical 级别日志") # 记录 CRITICAL 级别的日志
# 创建一个名为 "fLogger" 的文件日志记录器
fLogger = getFileLogger("fLogger", "app.log")
# 记录不同级别的日志到文件
fLogger.debug("这是 debug 级别日志") # 记录 DEBUG 级别的日志
fLogger.info("这是 info 级别日志") # 记录 INFO 级别的日志
fLogger.warning("这是 warning 级别日志") # 记录 WARNING 级别的日志
fLogger.error("这是 error 级别日志") # 记录 ERROR 级别的日志
fLogger.critical("这是 critical 级别日志") # 记录 CRITICAL 级别的日志
自定义日志模块实现 ilogger
import os
import inspect
import logging
import logging.handlers
# main_path = os.path.dirname(os.path.abspath(__file__))
logger=None
ilogger=None
_hasPrint=False
def get_logger(main_path=None):
global logger
global ilogger
global _hasPrint
if logger is not None:
return logger
if(main_path is None):
main_path=_get_main_path();
(server_name,server_path)=_get_server_name_and_path(main_path)
log_formatter=logging.Formatter('[%(name)s][%(asctime)s][%(levelname)s][%(filename)s line:%(lineno)s]:%(message)s', datefmt='%Y-%m-%d %H:%M:%S')
log_folder_path=os.path.join(server_path,"logs")
log_file_path=os.path.join(log_folder_path,"log")
if not os.path.exists(log_folder_path):
os.makedirs(log_folder_path)
max_file_size = 2 * 1024 * 1024 # 2 MB
backup_count = 3 # 最多保留5个备份文件
file_handler=logging.handlers.RotatingFileHandler(log_file_path, maxBytes=max_file_size, backupCount=backup_count,encoding='utf-8')
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(log_formatter)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(log_formatter)
inner_logger=logging.getLogger(server_name)
# 禁用日志传播
inner_logger.propagate = False
inner_logger.setLevel(logging.INFO)
inner_logger.addHandler(console_handler)
inner_logger.addHandler(file_handler)
raw_error=inner_logger.error
def error(msg, *args, **kwargs):
# 将 stack_info 参数设为 True
if 'stack_info' not in kwargs:
kwargs['stack_info'] = True
if 'stacklevel' not in kwargs:
kwargs['stacklevel'] = 2
raw_error(msg, *args, **kwargs)
inner_logger.error=error
logger=inner_logger
ilogger=inner_logger
if not _hasPrint:
_hasPrint=True
inner_logger.info(f"-----------------------------")
inner_logger.info(f"-----------------------------")
inner_logger.info(f"--------servcie start--------")
inner_logger.info(f"-----------------------------")
inner_logger.info(f"-----------------------------")
return inner_logger
def _get_server_name_and_path(main_path):
path_parts = main_path.split(os.sep)
paths=[]
path_len=len(path_parts)
server_name_idx=path_len-2
for i in range(path_len):
if i<=server_name_idx:
paths.append(path_parts[i])
if(i==server_name_idx):
server_name=path_parts[i]
return (server_name,os.sep.join(paths))
def _get_main_path():
# 获取当前栈帧
current_frame = inspect.currentframe()
try:
# 回溯到调用者的栈帧
caller_frame = current_frame.f_back.f_back
if caller_frame:
module = inspect.getmodule(caller_frame)
if module and hasattr(module, '__file__'):
return os.path.abspath(module.__file__)
finally:
del current_frame # 删除对帧对象的引用以避免循环引用
return None
if __name__ == '__main__':
raise Exception("This is a module, not a script,can not run directly.")
call
from ilogger import get_logger
ilogger=get_logger()
flask 日志挂载
import time
import json
from flask import request,g
from ilogger import logger
def mount_flask_app_hook(app):
@app.before_request
def before_request():
g.start_time = time.time()
g.method = request.method
g.url = request.url
g.query_params = request.args.to_dict() or {}
g.form_data = request.form.to_dict() or {}
g.json_data = request.get_json(silent=True) or {}
log_message = (
f"###[START]开始处理请求 :[Method:{g.method}]"
f"[Url:{g.url}]"
)
logger.info(log_message)
@app.after_request
def after_request(response):
elapsed_time_ms = (time.time() - g.start_time) * 1000
status_code = response.status_code if response else 500
input_params = {}
# 聚合所有类型的参数到 input_params
for key, value in g.query_params.items():
input_params[key] = _process_param_value(value)
for key, value in g.form_data.items():
input_params[key] = _process_param_value(value)
if isinstance(g.json_data, dict):
for key, value in g.json_data.items():
input_params[key] = _process_param_value(value)
input_params_str=""
for key in input_params:
input_params_str += f"- {key}: {input_params[key]}\n"
# 尝试解析 JSON 响应内容
response_data={}
try:
response_data = response.get_json(force=True, silent=True) or {}
except Exception:
response_data = {}
if len(response_data) == 0:
try:
res_data=_process_param_value(response.get_data())
response_data = {"res_data":res_data}
except Exception:
response_data = {"res_data":response.get_data()}
output_params_str=""
for key in response_data:
response_data[key]=_process_param_value(response_data[key])
output_params_str += f"- {key}: {response_data[key]}\n"
log_message = (
f"###[END]完成处理请求 :[Method:{g.method}]"
f"[Url:{g.url}]"
f"[Http Status:{status_code}]"
f"[Cost Time:{elapsed_time_ms:.2f} ms]\n"
f"Input Params:\n{input_params_str}"
f"Output Params:\n{output_params_str}"
)
logger.info(log_message)
return response
@app.errorhandler(404)
def page_not_found(e):
return 'Page not found', 404
@app.errorhandler(500)
def internal_server_error(e):
return 'Internal server error', 500
def _process_param_value(param_value):
if isinstance(param_value, bytes):
str_value = param_value.decode('utf-8', errors='ignore')
else:
str_value = str(param_value)
if len(str_value) > 200:
return f"{str_value[:200]}..."
return str_value
if __name__ == '__main__':
raise Exception("This is a module, not a script,can not run directly.")
call
app = Flask(__name__)
mount_flask_app_hook(app)
发表评论