From 882f9e44aac7220430978c84935259adc3dc957c Mon Sep 17 00:00:00 2001 From: zhouchong Date: Wed, 8 Apr 2026 14:31:59 +0800 Subject: [PATCH] feat: implement log channel separation and request log level system --- fastdeploy/engine/async_llm.py | 5 +- fastdeploy/engine/engine.py | 47 +++-- fastdeploy/engine/request.py | 14 +- fastdeploy/entrypoints/api_server.py | 7 +- fastdeploy/entrypoints/engine_client.py | 137 ++++++++++---- fastdeploy/entrypoints/llm.py | 13 +- fastdeploy/entrypoints/openai/api_server.py | 12 +- fastdeploy/entrypoints/openai/protocol.py | 5 +- fastdeploy/entrypoints/openai/serving_chat.py | 62 +++++-- .../entrypoints/openai/serving_completion.py | 76 +++++--- .../entrypoints/openai/serving_embedding.py | 9 +- .../entrypoints/openai/serving_engine.py | 42 +++-- .../entrypoints/openai/serving_models.py | 5 +- .../entrypoints/openai/serving_reward.py | 11 +- .../ernie_45_vl_thinking_tool_parser.py | 9 +- .../tool_parsers/ernie_x1_tool_parser.py | 5 +- .../entrypoints/openai/v1/serving_chat.py | 18 +- .../openai/v1/serving_completion.py | 40 ++++- fastdeploy/input/base_processor.py | 33 +++- .../ernie4_5_vl_processor.py | 3 +- .../qwen3_vl_processor/qwen3_vl_processor.py | 3 +- .../qwen_vl_processor/qwen_vl_processor.py | 3 +- fastdeploy/input/tokenizer_client.py | 19 +- fastdeploy/logger/config.py | 47 +++++ fastdeploy/logger/logger.py | 35 +++- fastdeploy/logger/request_logger.py | 90 ++++++++++ fastdeploy/logger/setup_logging.py | 167 ++++++++++-------- fastdeploy/output/token_processor.py | 154 ++++++++++++---- fastdeploy/scheduler/dp_scheduler.py | 19 +- fastdeploy/scheduler/global_scheduler.py | 32 +++- fastdeploy/scheduler/local_scheduler.py | 26 ++- fastdeploy/scheduler/splitwise_scheduler.py | 22 ++- fastdeploy/utils.py | 28 +-- .../openai/v1/test_serving_completion_v1.py | 13 +- tests/entrypoints/test_abort.py | 18 +- tests/entrypoints/test_engine_client.py | 13 +- tests/entrypoints/test_llm.py | 15 +- tests/input/test_video_utils.py | 2 +- tests/logger/test_logger.py | 8 +- tests/logger/test_logging_config.py | 91 ++++++++++ tests/logger/test_request_logger.py | 155 ++++++++++++++++ tests/logger/test_setup_logging.py | 58 +++++- tests/output/test_process_batch_output.py | 10 +- .../test_process_batch_output_use_zmq.py | 12 +- tests/scheduler/test_dp_scheduler.py | 20 ++- tests/scheduler/test_local_scheduler.py | 8 +- 46 files changed, 1270 insertions(+), 351 deletions(-) create mode 100644 fastdeploy/logger/config.py create mode 100644 fastdeploy/logger/request_logger.py create mode 100644 tests/logger/test_logging_config.py create mode 100644 tests/logger/test_request_logger.py diff --git a/fastdeploy/engine/async_llm.py b/fastdeploy/engine/async_llm.py index c06292ec981..75892a60267 100644 --- a/fastdeploy/engine/async_llm.py +++ b/fastdeploy/engine/async_llm.py @@ -37,6 +37,7 @@ from fastdeploy.input.preprocess import InputPreprocessor from fastdeploy.inter_communicator import IPCSignal from fastdeploy.inter_communicator.zmq_client import ZmqIpcClient +from fastdeploy.logger.request_logger import log_request_error from fastdeploy.metrics.metrics import main_process_metrics from fastdeploy.utils import EngineError, envs, llm_logger @@ -562,7 +563,7 @@ async def generate( llm_logger.info(f"Request {conn_request_id} generator exit (outer)") return except Exception as e: - llm_logger.error(f"Request {conn_request_id} failed: {e}") + log_request_error(message="Request {request_id} failed: {error}", request_id=conn_request_id, error=e) raise EngineError(str(e), 
error_code=500) from e finally: # Ensure request_map/request_num are cleaned up @@ -584,7 +585,7 @@ async def abort_request(self, request_id: str) -> None: await self.connection_manager.cleanup_request(request_id) llm_logger.info(f"Aborted request {request_id}") except Exception as e: - llm_logger.error(f"Failed to abort request {request_id}: {e}") + log_request_error(message="Failed to abort request {request_id}: {error}", request_id=request_id, error=e) async def shutdown(self): """ diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index 283693fae8c..9a85c17bb98 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -44,6 +44,7 @@ from fastdeploy.engine.expert_service import start_data_parallel_service from fastdeploy.engine.request import Request from fastdeploy.inter_communicator import EngineWorkerQueue, IPCSignal +from fastdeploy.logger.request_logger import log_request, log_request_error from fastdeploy.metrics.metrics import main_process_metrics from fastdeploy.platforms import current_platform from fastdeploy.utils import EngineError, console_logger, envs, llm_logger @@ -285,7 +286,7 @@ def add_requests(self, task, sampling_params=None, **kwargs): # Create Request struct after processing request = Request.from_dict(task) request.metrics.scheduler_recv_req_time = time.time() - llm_logger.info(f"Receive request {request}") + log_request(level=2, message="Receive request {request}", request=request) request.metrics.preprocess_start_time = time.time() request.prompt_token_ids_len = len(request.prompt_token_ids) @@ -304,12 +305,20 @@ def add_requests(self, task, sampling_params=None, **kwargs): f"Input text is too long, length of prompt token({input_ids_len}) " f"+ min_dec_len ({min_tokens}) >= max_model_len " ) - llm_logger.error(error_msg) + log_request_error( + message="Input text is too long, length of prompt token({input_ids_len}) + min_dec_len ({min_tokens}) >= max_model_len", + input_ids_len=input_ids_len, + min_tokens=min_tokens, + ) raise EngineError(error_msg, error_code=400) if input_ids_len > self.cfg.model_config.max_model_len: error_msg = f"Length of input token({input_ids_len}) exceeds the limit max_model_len({self.cfg.model_config.max_model_len})." - llm_logger.error(error_msg) + log_request_error( + message="Length of input token({input_ids_len}) exceeds the limit max_model_len({max_model_len}).", + input_ids_len=input_ids_len, + max_model_len=self.cfg.model_config.max_model_len, + ) raise EngineError(error_msg, error_code=400) if request.get("stop_seqs_len") is not None: @@ -320,7 +329,11 @@ def add_requests(self, task, sampling_params=None, **kwargs): f"Length of stop ({stop_seqs_len}) exceeds the limit max_stop_seqs_num({max_stop_seqs_num})." "Please reduce the number of stop or set a lager max_stop_seqs_num by `FD_MAX_STOP_SEQS_NUM`" ) - llm_logger.error(error_msg) + log_request_error( + message="Length of stop ({stop_seqs_len}) exceeds the limit max_stop_seqs_num({max_stop_seqs_num}).", + stop_seqs_len=stop_seqs_len, + max_stop_seqs_num=max_stop_seqs_num, + ) raise EngineError(error_msg, error_code=400) stop_seqs_max_len = envs.FD_STOP_SEQS_MAX_LEN for single_stop_seq_len in stop_seqs_len: @@ -329,7 +342,11 @@ def add_requests(self, task, sampling_params=None, **kwargs): f"Length of stop_seqs({single_stop_seq_len}) exceeds the limit stop_seqs_max_len({stop_seqs_max_len})." 
"Please reduce the length of stop sequences or set a larger stop_seqs_max_len by `FD_STOP_SEQS_MAX_LEN`" ) - llm_logger.error(error_msg) + log_request_error( + message="Length of stop_seqs({single_stop_seq_len}) exceeds the limit stop_seqs_max_len({stop_seqs_max_len}).", + single_stop_seq_len=single_stop_seq_len, + stop_seqs_max_len=stop_seqs_max_len, + ) raise EngineError(error_msg, error_code=400) if self._has_guided_input(request): @@ -342,14 +359,18 @@ def add_requests(self, task, sampling_params=None, **kwargs): request, err_msg = self.guided_decoding_checker.schema_format(request) if err_msg is not None: - llm_logger.error(err_msg) + log_request_error(message="guided decoding error: {err_msg}", err_msg=err_msg) raise EngineError(err_msg, error_code=400) request.metrics.preprocess_end_time = time.time() request.metrics.scheduler_recv_req_time = time.time() self.engine.scheduler.put_requests([request]) - llm_logger.info(f"Cache task with request_id ({request.get('request_id')})") - llm_logger.debug(f"cache task: {request}") + log_request( + level=1, + message="Cache task with request_id ({request_id})", + request_id=request.get("request_id"), + ) + log_request(level=3, message="cache task: {request}", request=request) def _worker_processes_ready(self): """ @@ -715,11 +736,15 @@ def generate(self, prompts, stream): Yields: dict: The generated response. """ - llm_logger.info(f"Starting generation for prompt: {prompts}") + log_request(level=2, message="Starting generation for prompt: {prompts}", prompts=prompts) try: req_id = self._format_and_add_data(prompts) except Exception as e: - llm_logger.error(f"Error happened while adding request, details={e}, {str(traceback.format_exc())}") + log_request_error( + message="Error happened while adding request, details={error}, {traceback}", + error=str(e), + traceback=traceback.format_exc(), + ) raise EngineError(str(e), error_code=400) # Get the result of the current request @@ -738,7 +763,7 @@ def generate(self, prompts, stream): output = self.engine.data_processor.process_response_dict( result.to_dict(), stream=False, include_stop_str_in_output=False, direct_decode=not stream ) - llm_logger.debug(f"Generate result: {output}") + log_request(level=3, message="Generate result: {output}", output=output) if not stream: yield output else: diff --git a/fastdeploy/engine/request.py b/fastdeploy/engine/request.py index 0e95cd5e1fb..aea3674be4d 100644 --- a/fastdeploy/engine/request.py +++ b/fastdeploy/engine/request.py @@ -39,7 +39,7 @@ StructuralTagResponseFormat, ToolCall, ) -from fastdeploy.utils import data_processor_logger +from fastdeploy.logger.request_logger import log_request, log_request_error from fastdeploy.worker.output import ( LogprobsLists, PromptLogprobs, @@ -313,15 +313,13 @@ def from_generic_request( ), "The parameter `raw_request` is not supported now, please use completion api instead." 
for key, value in req.metadata.items(): setattr(request, key, value) - from fastdeploy.utils import api_server_logger - - api_server_logger.warning("The parameter metadata is obsolete.") + log_request(level=1, message="The parameter metadata is obsolete.") return request @classmethod def from_dict(cls, d: dict): - data_processor_logger.debug(f"{d}") + log_request(level=3, message="{request}", request=d) sampling_params: SamplingParams = None pooling_params: PoolingParams = None metrics: RequestMetrics = None @@ -352,8 +350,10 @@ def from_dict(cls, d: dict): ImagePosition(**mm_pos) if not isinstance(mm_pos, ImagePosition) else mm_pos ) except Exception as e: - data_processor_logger.error( - f"Convert mm_positions to ImagePosition error: {e}, {str(traceback.format_exc())}" + log_request_error( + message="Convert mm_positions to ImagePosition error: {error}, {traceback}", + error=str(e), + traceback=traceback.format_exc(), ) return cls( request_id=d["request_id"], diff --git a/fastdeploy/entrypoints/api_server.py b/fastdeploy/entrypoints/api_server.py index 4f4d7f2250c..0e0b59da9f7 100644 --- a/fastdeploy/entrypoints/api_server.py +++ b/fastdeploy/entrypoints/api_server.py @@ -23,6 +23,7 @@ from fastdeploy.engine.args_utils import EngineArgs from fastdeploy.engine.engine import LLMEngine +from fastdeploy.logger.request_logger import log_request, log_request_error from fastdeploy.utils import ( FlexibleArgumentParser, api_server_logger, @@ -61,7 +62,7 @@ async def generate(request: dict): """ generate stream api """ - api_server_logger.info(f"Receive request: {request}") + log_request(level=3, message="Receive request: {request}", request=request) stream = request.get("stream", 0) if not stream: @@ -72,7 +73,7 @@ async def generate(request: dict): output = result except Exception as e: # 记录完整的异常堆栈信息 - api_server_logger.error(f"Error during generation: {e!s}", exc_info=True) + log_request_error(message="Error during generation: {error}", error=str(e)) # 返回结构化的错误消息并终止流 output = {"error": str(e), "error_type": e.__class__.__name__} return output @@ -84,7 +85,7 @@ async def event_generator(): yield f"data: {json.dumps(result)}\n\n" except Exception as e: # 记录完整的异常堆栈信息 - api_server_logger.error(f"Error during generation: {e!s}", exc_info=True) + log_request_error(message="Error during generation: {error}", error=str(e)) # 返回结构化的错误消息并终止流 error_msg = {"error": str(e), "error_type": e.__class__.__name__} yield f"data: {json.dumps(error_msg)}\n\n" diff --git a/fastdeploy/entrypoints/engine_client.py b/fastdeploy/entrypoints/engine_client.py index 4c56e9bcd76..4b8c47caa9a 100644 --- a/fastdeploy/entrypoints/engine_client.py +++ b/fastdeploy/entrypoints/engine_client.py @@ -49,6 +49,7 @@ RearrangeExpertStatus, ZmqIpcClient, ) +from fastdeploy.logger.request_logger import log_request, log_request_error from fastdeploy.metrics.metrics import main_process_metrics from fastdeploy.platforms import current_platform from fastdeploy.trace.constants import LoggingEventName @@ -362,12 +363,13 @@ async def add_requests(self, task): if "messages" in task: task["messages"] = None - api_server_logger.info(f"task['max_tokens']:{task['max_tokens']}") main_process_metrics.request_params_max_tokens.observe(task["max_tokens"]) main_process_metrics.prompt_tokens_total.inc(input_ids_len) main_process_metrics.request_prompt_tokens.observe(input_ids_len) except Exception as e: - api_server_logger.error(f"add_requests error: {e}, {str(traceback.format_exc())}") + log_request_error( + message="add_requests error: {error}, 
{traceback}", error=e, traceback=traceback.format_exc() + ) raise EngineError(str(e), error_code=400) if input_ids_len + min_tokens >= self.max_model_len: @@ -375,14 +377,14 @@ async def add_requests(self, task): f"Input text is too long, input_ids_len ({input_ids_len}) " f"+ min_tokens({min_tokens}) >= max_model_len({self.max_model_len})" ) - api_server_logger.error(error_msg) + log_request_error(message="{error_msg}", error_msg=error_msg) raise EngineError(error_msg, error_code=400) if input_ids_len > self.max_model_len: error_msg = ( f"Length of input token({input_ids_len}) exceeds the limit max_model_len({self.max_model_len})." ) - api_server_logger.error(error_msg) + log_request_error(message="{error_msg}", error_msg=error_msg) raise EngineError(error_msg, error_code=400) if "stop_seqs_len" in task and task["stop_seqs_len"]: @@ -393,7 +395,7 @@ async def add_requests(self, task): f"Length of stop ({stop_seqs_len}) exceeds the limit max_stop_seqs_num({max_stop_seqs_num})." "Please reduce the number of stop or set a lager max_stop_seqs_num by `FD_MAX_STOP_SEQS_NUM`" ) - api_server_logger.error(error_msg) + log_request_error(message="{error_msg}", error_msg=error_msg) raise EngineError(error_msg, error_code=400) stop_seqs_max_len = envs.FD_STOP_SEQS_MAX_LEN for single_stop_seq_len in stop_seqs_len: @@ -402,18 +404,24 @@ async def add_requests(self, task): f"Length of stop_seqs({single_stop_seq_len}) exceeds the limit stop_seqs_max_len({stop_seqs_max_len})." "Please reduce the length of stop sequences or set a larger stop_seqs_max_len by `FD_STOP_SEQS_MAX_LEN`" ) - api_server_logger.error(error_msg) + log_request_error(message="{error_msg}", error_msg=error_msg) raise EngineError(error_msg, error_code=400) task["metrics"]["preprocess_end_time"] = time.time() preprocess_cost_time = task["metrics"]["preprocess_end_time"] - task["metrics"]["preprocess_start_time"] - api_server_logger.info( - f"Cache request with request_id ({task.get('request_id')}), " - f"preprocess time cost {preprocess_cost_time}" + log_request( + level=1, + message="Cache request with request_id ({request_id}), preprocess time cost {preprocess_cost_time}", + request_id=task.get("request_id"), + preprocess_cost_time=preprocess_cost_time, ) self.valid_parameters(task) - api_server_logger.debug(f"Receive task: {task}") + log_request( + level=3, + message="Receive task: {task}", + task=task, + ) n = task.get("n", 1) try: request_id_idx = task.get("request_id") @@ -433,7 +441,9 @@ async def add_requests(self, task): tracing.TraceSpanName.PREPROCESSING, task.get("request_id").split("_")[0], thread_finish_flag=True ) except Exception as e: - api_server_logger.error(f"zmq_client send task error: {e}, {str(traceback.format_exc())}") + log_request_error( + message="zmq_client send task error: {error}, {traceback}", error=e, traceback=traceback.format_exc() + ) raise EngineError(str(e), error_code=400) def _send_task(self, task): @@ -455,8 +465,11 @@ def valid_parameters(self, data): if data.get("max_tokens") is not None: if data["max_tokens"] < 1 or data["max_tokens"] >= self.max_model_len: - api_server_logger.error( - f"req_id:{data['request_id']}, max_tokens must be defined [1, {self.max_model_len}), but now it's {data['max_tokens']}." 
+ log_request_error( + message="req_id:{request_id}, max_tokens must be defined [1, {max_model_len}), but now it's {max_tokens}.", + request_id=data["request_id"], + max_model_len=self.max_model_len, + max_tokens=data["max_tokens"], ) raise ValueError( f"max_tokens can be defined [1, {self.max_model_len}), but now it's {data['max_tokens']}." @@ -467,14 +480,18 @@ def valid_parameters(self, data): raise ParameterError("reasoning_max_tokens", "reasoning_max_tokens must be greater than 0") if data["reasoning_max_tokens"] > data["max_tokens"]: data["reasoning_max_tokens"] = data["max_tokens"] - api_server_logger.warning( - f"req_id: {data['request_id']}, reasoning_max_tokens exceeds max_tokens, the value of reasoning_max_tokens will be adjusted to {data['max_tokens']}" + log_request( + level=1, + message="req_id: {request_id}, reasoning_max_tokens exceeds max_tokens, the value of reasoning_max_tokens will be adjusted to {max_tokens}", + request_id=data["request_id"], + max_tokens=data["max_tokens"], ) if data.get("reasoning_effort") is not None: data["reasoning_max_tokens"] = None - api_server_logger.warning( - f"req_id: {data['request_id']}, reasoning_max_tokens and reasoning_effort are both set, " - f"enable_thinking will be disabled." + log_request( + level=1, + message="req_id: {request_id}, reasoning_max_tokens and reasoning_effort are both set, enable_thinking will be disabled.", + request_id=data["request_id"], ) if data.get("response_max_tokens") is not None: @@ -493,7 +510,7 @@ def valid_parameters(self, data): is_chat = True if not self.enable_logprob: err_msg = "Logprobs is disabled, please enable it in startup config." - api_server_logger.error(err_msg) + log_request_error(message="{err_msg}", err_msg=err_msg) raise ParameterError("logprobs", err_msg) top_logprobs = data.get("top_logprobs") elif isinstance(logprobs, int): @@ -506,11 +523,17 @@ def valid_parameters(self, data): max_logprobs = self.ori_vocab_size if max_logprobs < -1: err_msg = f"Invalid 'max_logprobs': must be >= -1, got {max_logprobs}." - api_server_logger.error(err_msg) + log_request_error( + message="Invalid 'max_logprobs': must be >= -1, got {max_logprobs}.", max_logprobs=max_logprobs + ) raise ValueError("max_logprobs", err_msg) if max_logprobs > self.ori_vocab_size: err_msg = f"Invalid 'max_logprobs': must be <= vocab_size {self.ori_vocab_size}, got {max_logprobs}." - api_server_logger.error(err_msg) + log_request_error( + message="Invalid 'max_logprobs': must be <= vocab_size {ori_vocab_size}, got {max_logprobs}.", + ori_vocab_size=self.ori_vocab_size, + max_logprobs=max_logprobs, + ) raise ValueError("max_logprobs", err_msg) prompt_logprobs = data.get("prompt_logprobs", None) @@ -518,41 +541,52 @@ def valid_parameters(self, data): if prompt_logprobs is not None: if not self.enable_logprob: err_msg = "`enable_logprob` is disabled, please enable it in startup config." - api_server_logger.error(err_msg) + log_request_error(message="{err_msg}", err_msg=err_msg) raise ParameterError("prompt_logprobs", err_msg) if not envs.FD_USE_GET_SAVE_OUTPUT_V1: err_msg = "prompt_logprobs is not support when FD_USE_GET_SAVE_OUTPUT_V1 is disabled." - api_server_logger.error(err_msg) + log_request_error(message="{err_msg}", err_msg=err_msg) raise ParameterError("prompt_logprobs", err_msg) if self.enable_prefix_caching: err_msg = "prompt_logprobs is not support when prefix caching is enabled." 
- api_server_logger.error(err_msg) + log_request_error(message="{err_msg}", err_msg=err_msg) raise ParameterError("prompt_logprobs", err_msg) if prompt_logprobs == -1 and self.ori_vocab_size > max_logprobs: err_msg = f"The requested value of ({self.ori_vocab_size}) for prompt_logprobs (-1) exceeds the maximum allowed value of ({max_logprobs})" - api_server_logger.error(err_msg) + log_request_error( + message="The requested value of ({ori_vocab_size}) for prompt_logprobs (-1) exceeds the maximum allowed value of ({max_logprobs})", + ori_vocab_size=self.ori_vocab_size, + max_logprobs=max_logprobs, + ) raise ValueError("prompt_logprobs", err_msg) if prompt_logprobs < -1: err_msg = ( f"prompt_logprobs must be a non-negative value or -1; the current value is {prompt_logprobs}." ) - api_server_logger.error(err_msg) + log_request_error( + message="prompt_logprobs must be a non-negative value or -1; the current value is {prompt_logprobs}.", + prompt_logprobs=prompt_logprobs, + ) raise ValueError("prompt_logprobs", err_msg) if prompt_logprobs > max_logprobs: err_msg = f"Number of prompt_logprobs requested ({prompt_logprobs}) exceeds maximum allowed value ({max_logprobs})." - api_server_logger.error(err_msg) + log_request_error( + message="Number of prompt_logprobs requested ({prompt_logprobs}) exceeds maximum allowed value ({max_logprobs}).", + prompt_logprobs=prompt_logprobs, + max_logprobs=max_logprobs, + ) raise ValueError("prompt_logprobs", err_msg) # enable_logprob if top_logprobs is not None: if not self.enable_logprob: err_msg = "Logprobs is disabled, please enable it in startup config." - api_server_logger.error(err_msg) + log_request_error(message="{err_msg}", err_msg=err_msg) raise ParameterError("top_logprobs" if is_chat else "logprobs", err_msg) if not isinstance(top_logprobs, int): @@ -560,28 +594,45 @@ def valid_parameters(self, data): err_msg = ( f"Invalid type for {'top_logprobs' if is_chat else 'logprobs'}: expected int but got {err_type}." ) - api_server_logger.error(err_msg) + log_request_error( + message="Invalid type for logprobs: expected int but got {err_type}.", err_type=err_type + ) raise ParameterError("top_logprobs" if is_chat else "logprobs", err_msg) if top_logprobs > max_logprobs: err_msg = f"Number of {'top_logprobs' if is_chat else 'logprobs'} requested ({top_logprobs}) exceeds maximum allowed value ({max_logprobs})." - api_server_logger.error(err_msg) + log_request_error( + message="Number of logprobs requested ({top_logprobs}) exceeds maximum allowed value ({max_logprobs}).", + top_logprobs=top_logprobs, + max_logprobs=max_logprobs, + ) raise ValueError("top_logprobs" if is_chat else "logprobs", err_msg) if not envs.FD_USE_GET_SAVE_OUTPUT_V1: if top_logprobs < 0 or top_logprobs > max_logprobs: err_msg = f"{'top_logprobs' if is_chat else 'logprobs'} must be between 0 and {max_logprobs}; the current value is {top_logprobs}." 
- api_server_logger.error(err_msg) + log_request_error( + message="logprobs must be between 0 and {max_logprobs}; the current value is {top_logprobs}.", + max_logprobs=max_logprobs, + top_logprobs=top_logprobs, + ) raise ValueError("top_logprobs" if is_chat else "logprobs", err_msg) else: if top_logprobs == -1 and self.ori_vocab_size > max_logprobs: err_msg = f"The requested value of ({self.ori_vocab_size}) for {'top_logprobs' if is_chat else 'logprobs'} (-1) exceeds the maximum allowed value of ({max_logprobs})" - api_server_logger.error(err_msg) + log_request_error( + message="The requested value of ({ori_vocab_size}) for logprobs (-1) exceeds the maximum allowed value of ({max_logprobs})", + ori_vocab_size=self.ori_vocab_size, + max_logprobs=max_logprobs, + ) raise ValueError("top_logprobs" if is_chat else "logprobs", err_msg) if top_logprobs < -1: err_msg = f"{'top_logprobs' if is_chat else 'logprobs'} must be a non-negative value or -1; the current value is {top_logprobs}." - api_server_logger.error(err_msg) + log_request_error( + message="logprobs must be a non-negative value or -1; the current value is {top_logprobs}.", + top_logprobs=top_logprobs, + ) raise ValueError("top_logprobs" if is_chat else "logprobs", err_msg) def check_health(self, time_interval_threashold=30): @@ -617,12 +668,16 @@ async def run_control_method(self, request: ControlRequest): return response except asyncio.TimeoutError: error_response = ControlResponse(request_id, 500, "Timeout waiting for control method response") - api_server_logger.error(f"Control request timed out: {error_response}") + log_request_error(message="Control request timed out: {error_response}", error_response=error_response) return error_response except Exception as e: import traceback - api_server_logger.error(f"Unknown error in control method: {str(e)}\n{traceback.format_exc()}") + log_request_error( + message="Unknown error in control method: {error}\n{traceback}", + error=str(e), + traceback=traceback.format_exc(), + ) error_response = ControlResponse(request_id, 500, str(e)) return error_response @@ -1024,7 +1079,11 @@ async def check_redundant(self, request_dict: dict): async def abort(self, request_id, n=1) -> None: if envs.FD_ENABLE_REQUEST_DISCONNECT_STOP_INFERENCE: - api_server_logger.info(f"abort request_id:{request_id}") + log_request( + level=0, + message="abort request_id: {request_id}", + request_id=request_id, + ) if n <= 0: api_server_logger.warning("Abort function called with non-positive n: %d. 
No requests aborted.", n) return @@ -1044,7 +1103,11 @@ async def abort(self, request_id, n=1) -> None: } self._send_task(data) - api_server_logger.info("Aborted request(s) %s.", ",".join(request_ids)) + log_request( + level=0, + message="Aborted request(s) {request_ids}.", + request_ids=",".join(request_ids), + ) def process_messages(self, messages): for message in messages: diff --git a/fastdeploy/entrypoints/llm.py b/fastdeploy/entrypoints/llm.py index c452c84c44f..1d18421a55d 100644 --- a/fastdeploy/entrypoints/llm.py +++ b/fastdeploy/entrypoints/llm.py @@ -36,6 +36,7 @@ from fastdeploy.entrypoints.openai.protocol import ChatCompletionToolsParam from fastdeploy.entrypoints.openai.tool_parsers import ToolParserManager from fastdeploy.input.utils import validate_model_path +from fastdeploy.logger.request_logger import log_request_error from fastdeploy.utils import ( deprecated_kwargs_warning, llm_logger, @@ -139,7 +140,11 @@ def _receive_output(self): continue self.req_output[request_id].add(result) except Exception as e: - llm_logger.error(f"Unexcepted error happened: {e}, {traceback.format_exc()!s}") + log_request_error( + message="Unexpected error happened: {error}, {traceback}", + error=e, + traceback=traceback.format_exc(), + ) def generate( self, @@ -437,7 +442,11 @@ def _build_sample_logprobs(self, logprobs_lists: LogprobsLists, topk_logprobs: i return result except Exception as e: - llm_logger.error(f"Error building sample logprobs from LogprobsLists: {e}, {str(traceback.format_exc())}") + log_request_error( + message="Error building sample logprobs from LogprobsLists: {error}, {traceback}", + error=e, + traceback=traceback.format_exc(), + ) def _build_prompt_logprobs( self, diff --git a/fastdeploy/entrypoints/openai/api_server.py b/fastdeploy/entrypoints/openai/api_server.py index b96d93ab312..3f69d5cf364 100644 --- a/fastdeploy/entrypoints/openai/api_server.py +++ b/fastdeploy/entrypoints/openai/api_server.py @@ -73,6 +73,7 @@ OpenAIServingCompletion as OpenAIServingCompletionV1, ) from fastdeploy.envs import environment_variables +from fastdeploy.logger.request_logger import log_request, log_request_error from fastdeploy.metrics.metrics import get_filtered_metrics from fastdeploy.utils import ( ExceptionHandler, @@ -324,7 +325,10 @@ async def connection_manager(): await asyncio.wait_for(connection_semaphore.acquire(), timeout=0.001) yield except asyncio.TimeoutError: - api_server_logger.info(f"Reach max request concurrency, semaphore status: {connection_semaphore.status()}") + log_request( + level=0, + message=f"Reach max request concurrency, semaphore status: {connection_semaphore.status()}", + ) raise HTTPException( status_code=429, detail=f"Too many requests,current max concurrency is {args.max_concurrency}" ) @@ -544,7 +548,7 @@ async def create_chat_completion(request: ChatCompletionRequest, req: Request): """ Create a chat completion for the provided prompt and parameters. 
""" - api_server_logger.debug(f"Chat Received request: {request.model_dump_json()}") + log_request(level=3, message="Chat Received request: {request}", request=request.model_dump_json()) if envs.TRACES_ENABLE: if req.headers: headers = dict(req.headers) @@ -571,7 +575,7 @@ async def create_chat_completion(request: ChatCompletionRequest, req: Request): return StreamingResponse(content=wrapped_generator(), media_type="text/event-stream") except HTTPException as e: - api_server_logger.error(f"Error in chat completion: {str(e)}") + log_request_error(message="Error in chat completion: {error}", error=str(e)) return JSONResponse(status_code=e.status_code, content={"detail": e.detail}) @@ -581,7 +585,7 @@ async def create_completion(request: CompletionRequest, req: Request): """ Create a completion for the provided prompt and parameters. """ - api_server_logger.info(f"Completion Received request: {request.model_dump_json()}") + log_request(level=3, message="Completion Received request: {request}", request=request.model_dump_json()) if envs.TRACES_ENABLE: if req.headers: headers = dict(req.headers) diff --git a/fastdeploy/entrypoints/openai/protocol.py b/fastdeploy/entrypoints/openai/protocol.py index 3560f3a8aef..8af54c4a744 100644 --- a/fastdeploy/entrypoints/openai/protocol.py +++ b/fastdeploy/entrypoints/openai/protocol.py @@ -31,6 +31,7 @@ ) from fastdeploy.engine.pooling_params import PoolingParams +from fastdeploy.logger.request_logger import log_request from fastdeploy.worker.output import PromptLogprobs, SpeculateMetrics @@ -763,9 +764,7 @@ def to_dict_for_infer(self, request_id=None): ), "The parameter `raw_request` is not supported now, please use completion api instead." for key, value in self.metadata.items(): req_dict[key] = value - from fastdeploy.utils import api_server_logger - - api_server_logger.warning("The parameter metadata is obsolete.") + log_request(level=1, message="The parameter metadata is obsolete.") for key, value in self.dict().items(): if value is not None: req_dict[key] = value diff --git a/fastdeploy/entrypoints/openai/serving_chat.py b/fastdeploy/entrypoints/openai/serving_chat.py index eb106f6550f..021f3dfba16 100644 --- a/fastdeploy/entrypoints/openai/serving_chat.py +++ b/fastdeploy/entrypoints/openai/serving_chat.py @@ -44,6 +44,7 @@ UsageInfo, ) from fastdeploy.entrypoints.openai.response_processors import ChatResponseProcessor +from fastdeploy.logger.request_logger import log_request, log_request_error from fastdeploy.metrics.metrics import main_process_metrics from fastdeploy.trace.constants import LoggingEventName from fastdeploy.trace.trace_logger import print as trace_print @@ -112,14 +113,14 @@ async def create_chat_completion(self, request: ChatCompletionRequest): err_msg = ( f"Only master node can accept completion request, please send request to master node: {self.master_ip}" ) - api_server_logger.error(err_msg) + log_request_error(message="{err_msg}", err_msg=err_msg) return ErrorResponse(error=ErrorInfo(message=err_msg, type=ErrorType.INTERNAL_ERROR)) if self.models: is_supported, request.model = self.models.is_supported_model(request.model) if not is_supported: err_msg = f"Unsupported model: [{request.model}], support [{', '.join([x.name for x in self.models.model_paths])}] or default" - api_server_logger.error(err_msg) + log_request_error(message="{err_msg}", err_msg=err_msg) return ErrorResponse( error=ErrorInfo(message=err_msg, type=ErrorType.INTERNAL_ERROR, code=ErrorCode.MODEL_NOT_SUPPORT) ) @@ -129,7 +130,7 @@ async def 
create_chat_completion(self, request: ChatCompletionRequest): await self.engine_client.semaphore.acquire() else: await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time) - api_server_logger.info(f"current {self.engine_client.semaphore.status()}") + log_request(level=2, message="semaphore status: {status}", status=self.engine_client.semaphore.status()) if request.request_id is not None: request_id = request.request_id @@ -141,7 +142,11 @@ async def create_chat_completion(self, request: ChatCompletionRequest): request_id = f"chatcmpl-{uuid.uuid4()}" tracing.trace_req_start(rid=request_id, trace_content=request.trace_context, role="FastDeploy") del request.trace_context - api_server_logger.info(f"create chat completion request: {request_id}") + log_request( + level=0, + message="create chat completion request: {request_id}", + request_id=request_id, + ) prompt_tokens = None max_tokens = None try: @@ -156,14 +161,19 @@ async def create_chat_completion(self, request: ChatCompletionRequest): if isinstance(prompt_token_ids, np.ndarray): prompt_token_ids = prompt_token_ids.tolist() except ParameterError as e: - api_server_logger.error(f"request[{request_id}] generator error: {str(e)}, {e.message}") + log_request_error( + message="request[{request_id}] generator error: {error}, {error_message}", + request_id=request_id, + error=str(e), + error_message=e.message, + ) self.engine_client.semaphore.release() return ErrorResponse( error=ErrorInfo(message=str(e.message), type=ErrorType.INVALID_REQUEST_ERROR, param=e.param) ) except Exception as e: error_msg = f"request[{request_id}] generator error: {str(e)}, {str(traceback.format_exc())}" - api_server_logger.error(error_msg) + log_request_error(message="{error_msg}", error_msg=error_msg) self.engine_client.semaphore.release() return ErrorResponse(error=ErrorInfo(message=error_msg, type=ErrorType.INVALID_REQUEST_ERROR)) @@ -178,12 +188,12 @@ async def create_chat_completion(self, request: ChatCompletionRequest): ) except Exception as e: error_msg = f"request[{request_id}]full generator error: {str(e)}, {str(traceback.format_exc())}" - api_server_logger.error(error_msg) + log_request_error(message="{error_msg}", error_msg=error_msg) return ErrorResponse(error=ErrorInfo(message=error_msg, type=ErrorType.INTERNAL_ERROR)) except asyncio.CancelledError as e: await self.engine_client.abort(f"{request_id}_0", 1 if request.n is None else request.n) error_msg = f"request[{request_id}_0] client disconnected: {str(e)}, {str(traceback.format_exc())}" - api_server_logger.error(error_msg) + log_request_error(message="{error_msg}", error_msg=error_msg) return ErrorResponse( error=ErrorInfo(message=error_msg, type=ErrorType.INVALID_REQUEST_ERROR, code=ErrorCode.CLIENT_ABORTED) ) @@ -192,13 +202,13 @@ async def create_chat_completion(self, request: ChatCompletionRequest): f"request[{request_id}] waiting error: {str(e)}, {str(traceback.format_exc())}, " f"max waiting time: {self.max_waiting_time}" ) - api_server_logger.error(error_msg) + log_request_error(message="{error_msg}", error_msg=error_msg) return ErrorResponse( error=ErrorInfo(message=error_msg, type=ErrorType.TIMEOUT_ERROR, code=ErrorCode.TIMEOUT) ) def _create_streaming_error_response(self, message: str) -> str: - api_server_logger.error(message) + log_request_error(message="{error_message}", error_message=message) error_response = ErrorResponse(error=ErrorInfo(message=message, type=ErrorType.INTERNAL_ERROR)) return error_response.model_dump_json() @@ -249,7 +259,7 @@ async 
def chat_completion_stream_generator( choices=[], model=model_name, ) - api_server_logger.info(f"create chat completion request: {request_id}") + log_request(level=0, message="create chat completion request: {request_id}", request_id=request_id) try: dealer, response_queue = await self.engine_client.connection_manager.get_connection( @@ -372,7 +382,12 @@ async def chat_completion_stream_generator( completion_tokens_details=CompletionTokenUsageInfo(reasoning_tokens=0), ) yield f"data: {chunk.model_dump_json(exclude_unset=True)} \n\n" - api_server_logger.info(f"Chat Streaming response send_idx 0: {chunk.model_dump_json()}") + log_request( + level=0, + message="Chat Streaming response send_idx 0: request_id={request_id}, completion_tokens={completion_tokens}", + request_id=request_id, + completion_tokens=0, + ) first_iteration = False output = res["outputs"] @@ -497,7 +512,13 @@ async def chat_completion_stream_generator( chunk.choices = choices yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n" if res["finished"]: - api_server_logger.info(f"Chat Streaming response last send: {chunk.model_dump_json()}") + log_request( + level=0, + message="Chat Streaming response last send: request_id={request_id}, finish_reason={finish_reason}, completion_tokens={completion_tokens}", + request_id=request_id, + finish_reason=choice.finish_reason, + completion_tokens=previous_num_tokens[idx], + ) choices = [] if include_usage: @@ -525,7 +546,7 @@ async def chat_completion_stream_generator( except asyncio.CancelledError as e: await self.engine_client.abort(f"{request_id}_0", 1 if request.n is None else request.n) error_msg = f"request[{request_id}_0] client disconnected: {str(e)}, {str(traceback.format_exc())}" - api_server_logger.error(error_msg) + log_request_error(message="{error_msg}", error_msg=error_msg) except Exception as e: error_data = self._create_streaming_error_response( f"request[{request_id}] generate stream error: {str(e)}, {str(traceback.format_exc())}" @@ -536,7 +557,12 @@ async def chat_completion_stream_generator( tracing.trace_req_finish(request_id) await self.engine_client.connection_manager.cleanup_request(request_id) self.engine_client.semaphore.release() - api_server_logger.info(f"release {request_id} {self.engine_client.semaphore.status()}") + log_request( + level=2, + message="release {request_id} {status}", + request_id=request_id, + status=self.engine_client.semaphore.status(), + ) yield "data: [DONE]\n\n" async def chat_completion_full_generator( @@ -704,7 +730,7 @@ async def chat_completion_full_generator( tracing.trace_req_finish(request_id) await self.engine_client.connection_manager.cleanup_request(request_id) self.engine_client.semaphore.release() - api_server_logger.info(f"release {self.engine_client.semaphore.status()}") + log_request(level=2, message="release {status}", status=self.engine_client.semaphore.status()) num_prompt_tokens = len(prompt_token_ids) num_generated_tokens = sum(previous_num_tokens) @@ -731,7 +757,7 @@ async def chat_completion_full_generator( choices=choices, usage=usage, ) - api_server_logger.info(f"Chat response: {res.model_dump_json()}") + log_request(level=3, message="Chat response: {response}", response=res.model_dump_json()) return res async def _create_chat_completion_choice( @@ -904,7 +930,7 @@ def _build_logprobs_response( except Exception as e: error_msg = f"Error in _build_logprobs_response: {e}, {str(traceback.format_exc())}" - api_server_logger.error(error_msg) + log_request_error(message="{error_msg}", error_msg=error_msg) 
return None def _build_prompt_logprobs( diff --git a/fastdeploy/entrypoints/openai/serving_completion.py b/fastdeploy/entrypoints/openai/serving_completion.py index b277576a1fc..44480439b59 100644 --- a/fastdeploy/entrypoints/openai/serving_completion.py +++ b/fastdeploy/entrypoints/openai/serving_completion.py @@ -41,6 +41,7 @@ PromptTokenUsageInfo, UsageInfo, ) +from fastdeploy.logger.request_logger import log_request, log_request_error from fastdeploy.trace.constants import LoggingEventName from fastdeploy.trace.trace_logger import print as trace_print from fastdeploy.utils import ( @@ -91,13 +92,13 @@ async def create_completion(self, request: CompletionRequest): err_msg = ( f"Only master node can accept completion request, please send request to master node: {self.master_ip}" ) - api_server_logger.error(err_msg) + log_request_error(message="{err_msg}", err_msg=err_msg) return ErrorResponse(error=ErrorInfo(message=err_msg, type=ErrorType.INTERNAL_ERROR)) if self.models: is_supported, request.model = self.models.is_supported_model(request.model) if not is_supported: err_msg = f"Unsupported model: [{request.model}], support [{', '.join([x.name for x in self.models.model_paths])}] or default" - api_server_logger.error(err_msg) + log_request_error(message="{err_msg}", err_msg=err_msg) return ErrorResponse( error=ErrorInfo(message=err_msg, type=ErrorType.INTERNAL_ERROR, code=ErrorCode.MODEL_NOT_SUPPORT) ) @@ -110,7 +111,11 @@ async def create_completion(self, request: CompletionRequest): request_id = f"cmpl-{request.user}-{uuid.uuid4()}" else: request_id = f"cmpl-{uuid.uuid4()}" - api_server_logger.info(f"Initialize request {request_id}: {request}") + log_request( + level=0, + message="Initialize request {request_id}", + request_id=request_id, + ) tracing.trace_req_start(rid=request_id, trace_content=request.trace_context, role="FastDeploy") del request.trace_context request_prompt_ids = None @@ -148,14 +153,19 @@ async def create_completion(self, request: CompletionRequest): raise ValueError("Prompt type must be one of: str, list[str], list[int], list[list[int]]") except Exception as e: error_msg = f"OpenAIServingCompletion create_completion: {e}, {str(traceback.format_exc())}" - api_server_logger.error(error_msg) + log_request_error(message="{error_msg}", error_msg=error_msg) return ErrorResponse(error=ErrorInfo(message=error_msg, type=ErrorType.INTERNAL_ERROR)) if request_prompt_ids is not None: request_prompts = request_prompt_ids num_choices = len(request_prompts) * (1 if request.n is None else request.n) - api_server_logger.info(f"Start preprocessing request: req_id={request_id}), num_choices={num_choices}") + log_request( + level=1, + message="Start preprocessing request: req_id={request_id}), num_choices={num_choices}", + request_id=request_id, + num_choices=num_choices, + ) prompt_batched_token_ids = [] prompt_tokens_list = [] max_tokens_list = [] @@ -169,7 +179,7 @@ async def create_completion(self, request: CompletionRequest): f"OpenAIServingCompletion waiting error: {e}, {str(traceback.format_exc())}, " f"max waiting time: {self.max_waiting_time}" ) - api_server_logger.error(error_msg) + log_request_error(message="{error_msg}", error_msg=error_msg) return ErrorResponse( error=ErrorInfo(message=error_msg, code=ErrorCode.TIMEOUT, type=ErrorType.TIMEOUT_ERROR) ) @@ -188,14 +198,18 @@ async def create_completion(self, request: CompletionRequest): max_tokens_list.append(current_req_dict.get("max_tokens")) del current_req_dict except ParameterError as e: - 
api_server_logger.error(f"OpenAIServingCompletion format error: {e}, {e.message}") + log_request_error( + message="OpenAIServingCompletion format error: {error}, {error_message}", + error=e, + error_message=e.message, + ) self.engine_client.semaphore.release() return ErrorResponse( error=ErrorInfo(code="400", message=str(e.message), type="invalid_request", param=e.param) ) except Exception as e: error_msg = f"OpenAIServingCompletion format error: {e}, {str(traceback.format_exc())}" - api_server_logger.error(error_msg) + log_request_error(message="{error_msg}", error_msg=error_msg) self.engine_client.semaphore.release() return ErrorResponse( error=ErrorInfo(message=str(e), code=ErrorCode.INVALID_VALUE, type=ErrorType.INVALID_REQUEST_ERROR) @@ -228,18 +242,18 @@ async def create_completion(self, request: CompletionRequest): error_msg = ( f"OpenAIServingCompletion completion_full_generator error: {e}, {str(traceback.format_exc())}" ) - api_server_logger.error(error_msg) + log_request_error(message="{error_msg}", error_msg=error_msg) return ErrorResponse(error=ErrorInfo(message=error_msg, type=ErrorType.INTERNAL_ERROR)) except asyncio.CancelledError as e: await self.engine_client.abort(f"{request_id}_0", num_choices) error_msg = f"request[{request_id}_0] client disconnected: {str(e)}, {str(traceback.format_exc())}" - api_server_logger.error(error_msg) + log_request_error(message="{error_msg}", error_msg=error_msg) return ErrorResponse( error=ErrorInfo(message=error_msg, type=ErrorType.INVALID_REQUEST_ERROR, code=ErrorCode.CLIENT_ABORTED) ) except Exception as e: error_msg = f"OpenAIServingCompletion create_completion error: {e}, {str(traceback.format_exc())}" - api_server_logger.error(error_msg) + log_request_error(message="{error_msg}", error_msg=error_msg) return ErrorResponse(error=ErrorInfo(message=error_msg, type=ErrorType.INTERNAL_ERROR)) async def completion_full_generator( @@ -368,10 +382,10 @@ async def completion_full_generator( prompt_tokens_list=prompt_tokens_list, max_tokens_list=max_tokens_list, ) - api_server_logger.info(f"Completion response: {res.model_dump_json()}") + log_request(level=3, message="Completion response: {response}", response=res.model_dump_json()) return res except Exception as e: - api_server_logger.error(f"Error in completion_full_generator: {e}", exc_info=True) + log_request_error(message="Error in completion_full_generator: {error}", error=e) finally: trace_print(LoggingEventName.POSTPROCESSING_END, request_id, getattr(request, "user", "")) tracing.trace_req_finish(request_id) @@ -514,8 +528,11 @@ async def completion_stream_generator( ], ) yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n" - api_server_logger.info( - f"Completion Streaming response send_idx 0: {chunk.model_dump_json()}" + log_request( + level=0, + message="Completion Streaming response send_idx 0: request_id={request_id}, completion_tokens={completion_tokens}", + request_id=request_id, + completion_tokens=0, ) first_iteration[idx] = False @@ -592,8 +609,11 @@ async def completion_stream_generator( if send_idx == 0 and not request.return_token_ids: chunk_temp = chunk chunk_temp.choices = choices - api_server_logger.info( - f"Completion Streaming response send_idx 0: {chunk_temp.model_dump_json()}" + log_request( + level=0, + message="Completion Streaming response send_idx 0: request_id={request_id}, completion_tokens={completion_tokens}", + request_id=request_id, + completion_tokens=output_tokens[idx], ) del chunk_temp @@ -646,14 +666,24 @@ async def completion_stream_generator( 
metrics=res["metrics"] if request.collect_metrics else None, ) yield f"data: {usage_chunk.model_dump_json(exclude_unset=True)}\n\n" - api_server_logger.info(f"Completion Streaming response last send: {chunk.model_dump_json()}") + log_request( + level=0, + message="Completion Streaming response last send: request_id={request_id}, finish_reason={finish_reason}, completion_tokens={completion_tokens}", + request_id=request_id, + finish_reason=chunk.choices[-1].finish_reason if chunk.choices else None, + completion_tokens=output_tokens[idx], + ) except asyncio.CancelledError as e: await self.engine_client.abort(f"{request_id}_0", num_choices) error_msg = f"request[{request_id}_0] client disconnected: {str(e)}, {str(traceback.format_exc())}" - api_server_logger.error(error_msg) + log_request_error(message="{error_msg}", error_msg=error_msg) except Exception as e: - api_server_logger.error(f"Error in completion_stream_generator: {e}, {str(traceback.format_exc())}") + log_request_error( + message="Error in completion_stream_generator: {error}, {traceback}", + error=e, + traceback=traceback.format_exc(), + ) yield f"data: {ErrorResponse(error=ErrorInfo(message=str(e), code='400', type=ErrorType.INTERNAL_ERROR)).model_dump_json(exclude_unset=True)}\n\n" finally: trace_print(LoggingEventName.POSTPROCESSING_END, request_id, getattr(request, "user", "")) @@ -887,7 +917,11 @@ def _build_logprobs_response( ) except Exception as e: - api_server_logger.error(f"Error in _build_logprobs_response: {str(e)}, {str(traceback.format_exc())}") + log_request_error( + message="Error in _build_logprobs_response: {error}, {traceback}", + error=str(e), + traceback=traceback.format_exc(), + ) return None def _build_prompt_logprobs( diff --git a/fastdeploy/entrypoints/openai/serving_embedding.py b/fastdeploy/entrypoints/openai/serving_embedding.py index 25f3f630510..1ca69d14bca 100644 --- a/fastdeploy/entrypoints/openai/serving_embedding.py +++ b/fastdeploy/entrypoints/openai/serving_embedding.py @@ -35,7 +35,7 @@ UsageInfo, ) from fastdeploy.entrypoints.openai.serving_engine import ServeContext, ZmqOpenAIServing -from fastdeploy.utils import api_server_logger +from fastdeploy.logger.request_logger import log_request def _get_embedding( @@ -140,7 +140,12 @@ async def create_embedding(self, request: EmbeddingRequest): @override def _build_response(self, ctx: ServeContext, request_output: dict): """Generate final embedding response""" - api_server_logger.info(f"[{ctx.request_id}] Embedding RequestOutput received:{request_output}") + log_request( + level=2, + message="[{request_id}] Embedding RequestOutput received:{request_output}", + request_id=ctx.request_id, + request_output=request_output, + ) base = PoolingRequestOutput.from_dict(request_output) embedding_res = EmbeddingRequestOutput.from_base(base) diff --git a/fastdeploy/entrypoints/openai/serving_engine.py b/fastdeploy/entrypoints/openai/serving_engine.py index d83f8da1ab4..166bedb4429 100644 --- a/fastdeploy/entrypoints/openai/serving_engine.py +++ b/fastdeploy/entrypoints/openai/serving_engine.py @@ -33,6 +33,7 @@ InvalidParameterException, ) from fastdeploy.envs import FD_SUPPORT_MAX_CONNECTIONS +from fastdeploy.logger.request_logger import log_request, log_request_error from fastdeploy.utils import ErrorCode, ErrorType, StatefulSemaphore, api_server_logger RequestT = TypeVar("RequestT") @@ -96,13 +97,19 @@ def _check_supported_model(self, model_name: str) -> tuple[bool, str]: is_supported, adjusted_name = self.models.is_supported_model(model_name) if not 
is_supported: err_msg = f"Unsupported model: [{model_name}]" - api_server_logger.error(err_msg) + log_request_error(message="{err_msg}", err_msg=err_msg) return is_supported, adjusted_name + def _log_request_status(self, action: str, request_id: str) -> None: + if action == "Acquire": + log_request(level=2, message="Request schedule start: {request_id}", request_id=request_id) + elif action == "Release": + log_request(level=2, message="Postprocessing end: {request_id}", request_id=request_id) + async def _acquire_semaphore(self, request_id: str) -> bool: """Acquire engine client semaphore with timeout""" try: - api_server_logger.info(f"Acquire request:{request_id} status:{self._get_semaphore().status()}") + self._log_request_status("Acquire", request_id) if self.max_waiting_time < 0: await self._get_semaphore().acquire() else: @@ -111,13 +118,13 @@ async def _acquire_semaphore(self, request_id: str) -> bool: except asyncio.TimeoutError: self._release_semaphore(request_id) error_msg = f"Request waiting timeout, request:{request_id} max waiting time:{self.max_waiting_time}" - api_server_logger.error(error_msg) + log_request_error(message="{error_msg}", error_msg=error_msg) return False def _release_semaphore(self, request_id: str) -> None: """Release engine client semaphore""" self._get_semaphore().release() - api_server_logger.info(f"Release request:{request_id} status:{self._get_semaphore().status()}") + self._log_request_status("Release", request_id) def _create_error_response( self, @@ -128,7 +135,7 @@ def _create_error_response( ) -> ErrorResponse: """Create standardized error response""" traceback.print_exc() - api_server_logger.error(message) + log_request_error(message="{error_message}", error_message=message) return ErrorResponse(error=ErrorInfo(message=message, type=error_type, code=code, param=param)) def _generate_request_id(self, request: RequestT) -> str: @@ -193,7 +200,11 @@ async def _pipeline(self, ctx: ServeContext) -> Union[Any, ErrorResponse]: request_id = self._generate_request_id(request) ctx.request_id = request_id - api_server_logger.info(f"Initialize request {request_id}: {request}") + log_request( + level=0, + message="Initialize request {request_id}", + request_id=request_id, + ) # Step 2: Semaphore acquisition if not await self._acquire_semaphore(request_id): @@ -252,7 +263,12 @@ async def _preprocess(self, ctx: ServeContext): request_dicts = self._request_to_batch_dicts(ctx) ctx.preprocess_requests = request_dicts for request_dict in request_dicts: - api_server_logger.info(f"batch add request_id: {request_dict['request_id']}, request: {request_dict}") + log_request( + level=2, + message="batch add request_id: {request_id}, request: {request}", + request_id=request_dict["request_id"], + request=request_dict, + ) await self.engine_client.format_and_add_data(request_dict) def _process_chat_template_kwargs(self, request_dict): @@ -283,7 +299,11 @@ async def _prepare_generators(self, ctx: ServeContext) -> AsyncGenerator[dict]: while num_choices > 0: request_output_dicts = await asyncio.wait_for(request_output_queue.get(), timeout=60) for request_output_dict in request_output_dicts: - api_server_logger.debug(f"Received RequestOutput: {request_output_dict}") + log_request( + level=2, + message="Received RequestOutput: {request_output}", + request_output=request_output_dict, + ) if request_output_dict["finished"] is True: num_choices -= 1 yield request_output_dict @@ -301,7 +321,7 @@ def _get_semaphore(self): async def _acquire_semaphore(self, request_id: str) -> bool: 
"""Acquire engine client semaphore with timeout""" try: - api_server_logger.info(f"Acquire request:{request_id} status:{self._get_semaphore().status()}") + self._log_request_status("Acquire", request_id) if self.max_waiting_time < 0: await self._get_semaphore().acquire() else: @@ -310,14 +330,14 @@ async def _acquire_semaphore(self, request_id: str) -> bool: except asyncio.TimeoutError: self._release_semaphore(request_id) error_msg = f"Request waiting timeout, request:{request_id} max waiting time:{self.max_waiting_time}" - api_server_logger.error(error_msg) + log_request_error(message="{error_msg}", error_msg=error_msg) return False @override def _release_semaphore(self, request_id: str) -> None: """Release engine client semaphore""" self._get_semaphore().release() - api_server_logger.info(f"Release request:{request_id} status:{self._get_semaphore().status()}") + self._log_request_status("Release", request_id) @override def _check_master(self) -> bool: diff --git a/fastdeploy/entrypoints/openai/serving_models.py b/fastdeploy/entrypoints/openai/serving_models.py index fc1507e5f35..3af83214686 100644 --- a/fastdeploy/entrypoints/openai/serving_models.py +++ b/fastdeploy/entrypoints/openai/serving_models.py @@ -24,7 +24,8 @@ ModelList, ModelPermission, ) -from fastdeploy.utils import ErrorType, api_server_logger, get_host_ip +from fastdeploy.logger.request_logger import log_request_error +from fastdeploy.utils import ErrorType, get_host_ip @dataclass @@ -86,7 +87,7 @@ async def list_models(self) -> ModelList: err_msg = ( f"Only master node can accept models request, please send request to master node: {self.master_ip}" ) - api_server_logger.error(err_msg) + log_request_error(message="{err_msg}", err_msg=err_msg) return ErrorResponse(error=ErrorInfo(message=err_msg, type=ErrorType.INTERNAL_ERROR)) model_infos = [ ModelInfo( diff --git a/fastdeploy/entrypoints/openai/serving_reward.py b/fastdeploy/entrypoints/openai/serving_reward.py index cc3ed8a4729..4425a61fa5b 100644 --- a/fastdeploy/entrypoints/openai/serving_reward.py +++ b/fastdeploy/entrypoints/openai/serving_reward.py @@ -28,7 +28,7 @@ UsageInfo, ) from fastdeploy.entrypoints.openai.serving_engine import ServeContext, ZmqOpenAIServing -from fastdeploy.utils import api_server_logger +from fastdeploy.logger.request_logger import log_request class OpenAIServingReward(ZmqOpenAIServing): @@ -77,7 +77,7 @@ async def create_reward(self, request: ChatRewardRequest): response: ChatRewardResponse = None generators: AsyncGenerator[ChatRewardResponse, None] = self.handle(ctx) async for r in generators: - api_server_logger.info(f"engine pooling result:{r}") + log_request(level=2, message="engine pooling result: {result}", result=r) r.data[0].index = idx idx += 1 if response is None or isinstance(r, ErrorResponse): @@ -93,7 +93,12 @@ async def create_reward(self, request: ChatRewardRequest): @override def _build_response(self, ctx: ServeContext, request_output: dict): """Generate final reward response""" - api_server_logger.info(f"[{ctx.request_id}] Reward RequestOutput received:{request_output}") + log_request( + level=2, + message="Reward RequestOutput received: request_id={request_id}, output={request_output}", + request_id=ctx.request_id, + request_output=request_output, + ) base = PoolingRequestOutput.from_dict(request_output) reward_res = RewardRequestOutput.from_base(base) diff --git a/fastdeploy/entrypoints/openai/tool_parsers/ernie_45_vl_thinking_tool_parser.py 
b/fastdeploy/entrypoints/openai/tool_parsers/ernie_45_vl_thinking_tool_parser.py index 4e6cc93cbaa..4cd35d52dd2 100644 --- a/fastdeploy/entrypoints/openai/tool_parsers/ernie_45_vl_thinking_tool_parser.py +++ b/fastdeploy/entrypoints/openai/tool_parsers/ernie_45_vl_thinking_tool_parser.py @@ -41,6 +41,7 @@ def random_tool_call_id() -> str: ToolParser, ToolParserManager, ) +from fastdeploy.logger.request_logger import log_request_error from fastdeploy.utils import data_processor_logger @@ -184,7 +185,7 @@ def extract_tool_calls(self, model_output: str, request: ChatCompletionRequest) continue if not function_call_arr: - data_processor_logger.error("No valid tool calls found") + log_request_error(message="No valid tool calls found") return ExtractedToolCallInformation(tools_called=False, content=model_output) tool_calls = [] @@ -226,7 +227,7 @@ def extract_tool_calls(self, model_output: str, request: ChatCompletionRequest) ) except Exception as e: - data_processor_logger.error(f"Error in extracting tool call from response: {str(e)}") + log_request_error(message="Error in extracting tool call from response: {error}", error=str(e)) return ExtractedToolCallInformation(tools_called=False, tool_calls=None, content=model_output) def extract_tool_calls_streaming( @@ -343,7 +344,7 @@ def extract_tool_calls_streaming( ) return delta except Exception as e: - data_processor_logger.error(f"Error in streaming tool call extraction: {str(e)}") + log_request_error(message="Error in streaming tool call extraction: {error}", error=str(e)) return None if "" in self.buffer: end_pos = self.buffer.find("") @@ -354,5 +355,5 @@ def extract_tool_calls_streaming( return delta except Exception as e: - data_processor_logger.error(f"Error in streaming tool call extraction: {str(e)}") + log_request_error(message="Error in streaming tool call extraction: {error}", error=str(e)) return None diff --git a/fastdeploy/entrypoints/openai/tool_parsers/ernie_x1_tool_parser.py b/fastdeploy/entrypoints/openai/tool_parsers/ernie_x1_tool_parser.py index f4556a3679f..b5f3727de36 100644 --- a/fastdeploy/entrypoints/openai/tool_parsers/ernie_x1_tool_parser.py +++ b/fastdeploy/entrypoints/openai/tool_parsers/ernie_x1_tool_parser.py @@ -41,6 +41,7 @@ def random_tool_call_id() -> str: ToolParser, ToolParserManager, ) +from fastdeploy.logger.request_logger import log_request_error from fastdeploy.utils import data_processor_logger as logger @@ -253,7 +254,9 @@ def extract_tool_calls_streaming( delta = None elif not cur_arguments and prev_arguments: - logger.error("should be impossible to have arguments reset " "mid-call. skipping streaming anything.") + log_request_error( + message="should be impossible to have arguments reset mid-call. skipping streaming anything." 
+ ) delta = None elif cur_arguments and not prev_arguments: diff --git a/fastdeploy/entrypoints/openai/v1/serving_chat.py b/fastdeploy/entrypoints/openai/v1/serving_chat.py index 4337feea6ab..6606d88ee32 100644 --- a/fastdeploy/entrypoints/openai/v1/serving_chat.py +++ b/fastdeploy/entrypoints/openai/v1/serving_chat.py @@ -45,8 +45,8 @@ ServingResponseContext, ) from fastdeploy.input.tokenizer_client import AsyncTokenizerClient, ImageDecodeRequest +from fastdeploy.logger.request_logger import log_request, log_request_error from fastdeploy.metrics.metrics import main_process_metrics -from fastdeploy.utils import api_server_logger from fastdeploy.worker.output import LogprobsLists @@ -178,7 +178,7 @@ def _build_logprobs_response( except Exception as e: error_msg = f"Error in _build_logprobs_response: {e}, {str(traceback.format_exc())}" - api_server_logger.error(error_msg) + log_request_error(message="{error_msg}", error_msg=error_msg) return None @override @@ -302,7 +302,13 @@ async def _build_stream_response( max_tokens = request.max_completion_tokens or request.max_tokens choice_completion_tokens = response_ctx.choice_completion_tokens_dict[output.index] choice.finish_reason = self._calc_finish_reason(request_output, max_tokens, choice_completion_tokens) - api_server_logger.info(f"Chat Streaming response last send: {chunk.model_dump_json()}") + log_request( + level=0, + message="Chat Streaming response last send: request_id={request_id}, finish_reason={finish_reason}, completion_tokens={completion_tokens}", + request_id=request_id, + finish_reason=choice.finish_reason, + completion_tokens=choice_completion_tokens, + ) yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n" if request_output.finished and response_ctx.remain_choices == 0: @@ -339,7 +345,11 @@ async def _build_full_response( res = ChatCompletionResponse( id=ctx.request_id, model=request.model, choices=choices, created=ctx.created_time, usage=response_ctx.usage ) - api_server_logger.info(f"Chat response: {res.model_dump_json()}") + log_request( + level=3, + message="Chat response: {response}", + response=res.model_dump_json(), + ) return res async def _create_chat_completion_choice( diff --git a/fastdeploy/entrypoints/openai/v1/serving_completion.py b/fastdeploy/entrypoints/openai/v1/serving_completion.py index c8e82eed8a8..7e6e9addf0e 100644 --- a/fastdeploy/entrypoints/openai/v1/serving_completion.py +++ b/fastdeploy/entrypoints/openai/v1/serving_completion.py @@ -38,7 +38,8 @@ ServeContext, ServingResponseContext, ) -from fastdeploy.utils import ErrorType, api_server_logger +from fastdeploy.logger.request_logger import log_request, log_request_error +from fastdeploy.utils import ErrorType from fastdeploy.worker.output import LogprobsLists @@ -94,7 +95,7 @@ async def _preprocess(self, ctx: ServeContext[CompletionRequest]) -> None: raise ValueError("Prompt type must be one of: str, list[str], list[int], list[list[int]]") except Exception as e: error_msg = f"OpenAIServingCompletion create_completion: {e}, {str(traceback.format_exc())}" - api_server_logger.error(error_msg) + log_request_error(message="{error_msg}", error_msg=error_msg) return ErrorResponse(error=ErrorInfo(message=error_msg, type=ErrorType.INTERNAL_ERROR)) if request_prompt_ids is not None: @@ -199,7 +200,11 @@ def _build_logprobs_response( ) except Exception as e: - api_server_logger.error(f"Error in _build_logprobs_response: {str(e)}, {str(traceback.format_exc())}") + log_request_error( + message="Error in _build_logprobs_response: {error}, 
{traceback}", + error=str(e), + traceback=traceback.format_exc(), + ) return None async def _build_stream_response( @@ -271,9 +276,20 @@ async def _build_stream_response( choice.finish_reason = self._calc_finish_reason( request_output, request.max_tokens, choice_completion_tokens ) - api_server_logger.info(f"Completion Streaming response last send: {chunk.model_dump_json()}") + log_request( + level=0, + message="Completion Streaming response last send: request_id={request_id}, finish_reason={finish_reason}, completion_tokens={completion_tokens}", + request_id=request_id, + finish_reason=choice.finish_reason, + completion_tokens=choice_completion_tokens, + ) if send_idx == 0 and not request.return_token_ids: - api_server_logger.info(f"Completion Streaming response send_idx 0: {chunk.model_dump_json()}") + log_request( + level=0, + message="Completion Streaming response send_idx 0: request_id={request_id}, completion_tokens={completion_tokens}", + request_id=request_id, + completion_tokens=response_ctx.choice_completion_tokens_dict[output.index], + ) yield f"data: {chunk.model_dump_json()}\n\n" if request_output.finished and response_ctx.remain_choices == 0: if include_usage: @@ -287,7 +303,11 @@ async def _build_stream_response( yield f"data: {usage_chunk.model_dump_json(exclude_unset=True)}\n\n" yield "data: [DONE]\n\n" except Exception as e: - api_server_logger.error(f"Error in completion_stream_generator: {e}, {str(traceback.format_exc())}") + log_request_error( + message="Error in completion_stream_generator: {error}, {traceback}", + error=e, + traceback=traceback.format_exc(), + ) yield f"data: {ErrorResponse(error=ErrorInfo(message=str(e), code='400', type=ErrorType.INTERNAL_ERROR)).model_dump_json(exclude_unset=True)}\n\n" async def _build_full_response( @@ -321,10 +341,14 @@ async def _build_full_response( choices=choices, usage=response_ctx.usage, ) - api_server_logger.info(f"Completion response: {res.model_dump_json()}") + log_request( + level=3, + message="Completion response: {response}", + response=res.model_dump_json(), + ) return res except Exception as e: - api_server_logger.error(f"Error in completion_full_generator: {e}", exc_info=True) + log_request_error(message="Error in completion_full_generator: {error}", error=e) return self._create_error_response(str(e)) def build_completion_choice( diff --git a/fastdeploy/input/base_processor.py b/fastdeploy/input/base_processor.py index 357339be766..d991f05868f 100644 --- a/fastdeploy/input/base_processor.py +++ b/fastdeploy/input/base_processor.py @@ -55,6 +55,7 @@ from fastdeploy import envs from fastdeploy.input.utils import process_stop_token_ids +from fastdeploy.logger.request_logger import log_request from fastdeploy.utils import data_processor_logger _SAMPLING_EPS = 1e-5 @@ -164,7 +165,12 @@ def messages2ids(self, request, **kwargs): req_id = request.get("request_id", None) if isinstance(request, dict) else None tokens = self.tokenizer.tokenize(spliced_message) token_ids = self.tokenizer.convert_tokens_to_ids(tokens) - data_processor_logger.info(f"req_id:{req_id}, tokens:{tokens}, token_ids: {token_ids}") + log_request( + level=2, + message="req_id:{req_id}, token_ids: {token_ids}", + req_id=req_id, + token_ids=token_ids, + ) return token_ids # ------------------------------------------------------------------ @@ -359,7 +365,7 @@ def process_response_dict_streaming(self, response_dict, **kwargs): def process_request_dict(self, request, max_model_len=None, **kwargs): """Unified request pre-processing shared by all 
processors.""" - data_processor_logger.info(f"Start processing request dict: {request}") + log_request(level=2, message="Start processing request dict: {request}", request=request) request = self._apply_default_parameters(request) if not request.get("eos_token_ids"): request["eos_token_ids"] = self.eos_token_ids @@ -450,7 +456,7 @@ def process_request_dict(self, request, max_model_len=None, **kwargs): if request.get("response_max_tokens") is not None and request.get("enable_thinking") is False: request["max_tokens"] = min(request["response_max_tokens"], request["max_tokens"]) - data_processor_logger.info(f"Processed request dict: {request}") + log_request(level=2, message="Processed request dict: {request}", request=request) return request def clear_request_status(self, task_id): @@ -477,7 +483,12 @@ def update_stop_seq(self, stop_sequences): if seq != self.tokenizer.eos_token_id: stop_seqs.append(self.tokenizer.convert_tokens_to_ids(self.tokenizer.tokenize(seq))) stop_seqs, stop_seqs_len = self.pad_batch_data(stop_seqs, pad_id=-1, return_seq_len=True, return_array=False) - data_processor_logger.debug(f"processed stop_seqs: {stop_seqs}, {stop_seqs_len}") + log_request( + level=3, + message="processed stop_seqs: {stop_seqs}, {stop_seqs_len}", + stop_seqs=stop_seqs, + stop_seqs_len=stop_seqs_len, + ) return stop_seqs, stop_seqs_len # ------------------------------------------------------------------ @@ -603,14 +614,20 @@ def update_bad_words(self, bad_words, bad_words_token_ids): prompt_token_ids = self.tokenizer.convert_tokens_to_ids(self.tokenizer.tokenize(prompt)) if len(prompt_token_ids) != 1: if not add_prefix_space: - data_processor_logger.warning( - f"bad_words: '{prompt}' tokenises to {len(prompt_token_ids)} tokens, skipping" + log_request( + level=1, + message="bad_words: '{prompt}' tokenises to {num_tokens} tokens, skipping", + prompt=prompt, + num_tokens=len(prompt_token_ids), ) continue if prompt_token_ids[0] > self.tokenizer.vocab_size: if not add_prefix_space: - data_processor_logger.warning( - f"bad_words: '{prompt}' token id {prompt_token_ids[0]} > vocab_size, skipping" + log_request( + level=1, + message="bad_words: '{prompt}' token id {token_id} > vocab_size, skipping", + prompt=prompt, + token_id=prompt_token_ids[0], ) continue if prompt_token_ids not in token_ids: diff --git a/fastdeploy/input/ernie4_5_vl_processor/ernie4_5_vl_processor.py b/fastdeploy/input/ernie4_5_vl_processor/ernie4_5_vl_processor.py index cfc394d463c..f675927fb10 100644 --- a/fastdeploy/input/ernie4_5_vl_processor/ernie4_5_vl_processor.py +++ b/fastdeploy/input/ernie4_5_vl_processor/ernie4_5_vl_processor.py @@ -22,6 +22,7 @@ from fastdeploy.input.ernie4_5_processor import Ernie4_5Processor from fastdeploy.input.utils import IDS_TYPE_FLAG, process_stop_token_ids +from fastdeploy.logger.request_logger import log_request from fastdeploy.utils import data_processor_logger from .process import DataProcessor @@ -294,7 +295,7 @@ def process_request_dict(self, request, max_model_len=None): if request.get("response_max_tokens") is not None and request.get("enable_thinking") is False: request["max_tokens"] = min(request["response_max_tokens"], request["max_tokens"]) - data_processor_logger.info(f"Processed request {request}") + log_request(level=2, message="Processed request: {request}", request=request) return request def append_completion_tokens(self, multimodal_inputs, completion_token_ids): diff --git a/fastdeploy/input/qwen3_vl_processor/qwen3_vl_processor.py 
b/fastdeploy/input/qwen3_vl_processor/qwen3_vl_processor.py index cc0110e1e1d..2a9d629770d 100644 --- a/fastdeploy/input/qwen3_vl_processor/qwen3_vl_processor.py +++ b/fastdeploy/input/qwen3_vl_processor/qwen3_vl_processor.py @@ -17,6 +17,7 @@ import numpy as np from fastdeploy.input.text_processor import DataProcessor as TextProcessor +from fastdeploy.logger.request_logger import log_request from fastdeploy.utils import data_processor_logger from .process import DataProcessor @@ -261,7 +262,7 @@ def process_request_dict(self, request, max_model_len=None): request["max_tokens"] = max(1, max_tokens) else: request["max_tokens"] = min(max_tokens, request["max_tokens"]) - data_processor_logger.info(f"Processed request {request}") + log_request(level=2, message="Processed request: {request}", request=request) return request diff --git a/fastdeploy/input/qwen_vl_processor/qwen_vl_processor.py b/fastdeploy/input/qwen_vl_processor/qwen_vl_processor.py index 88bc5c76938..98723aa55b5 100644 --- a/fastdeploy/input/qwen_vl_processor/qwen_vl_processor.py +++ b/fastdeploy/input/qwen_vl_processor/qwen_vl_processor.py @@ -18,6 +18,7 @@ from fastdeploy.input.text_processor import DataProcessor as TextProcessor from fastdeploy.input.utils import process_stop_token_ids +from fastdeploy.logger.request_logger import log_request from fastdeploy.utils import data_processor_logger from .process import DataProcessor @@ -262,7 +263,7 @@ def process_request_dict(self, request, max_model_len=None): else: self.model_status_dict[request["request_id"]] = model_status request["enable_thinking"] = model_status == "think_start" - data_processor_logger.info(f"Processed request {request}") + log_request(level=2, message="Processed request: {request}", request=request) return request diff --git a/fastdeploy/input/tokenizer_client.py b/fastdeploy/input/tokenizer_client.py index 1bee41cbc0f..97f62a63f2a 100644 --- a/fastdeploy/input/tokenizer_client.py +++ b/fastdeploy/input/tokenizer_client.py @@ -20,6 +20,7 @@ import httpx from pydantic import BaseModel, HttpUrl +from fastdeploy.logger.request_logger import log_request, log_request_error from fastdeploy.utils import data_processor_logger @@ -161,7 +162,12 @@ async def _async_encode_request(self, type: str, request: dict): except httpx.RequestError as e: # Network error, keep polling - data_processor_logger.debug(f"Request error while polling tokenize task {task_tag}: {e}") + log_request( + level=3, + message="Request error while polling tokenize task {task_tag}: {error}", + task_tag=task_tag, + error=str(e), + ) # 超时检测 if asyncio.get_event_loop().time() - start_time > self.max_wait: @@ -193,10 +199,15 @@ async def _async_decode_request(self, type: str, request: dict): raise RuntimeError(f"Tokenize task creation failed, {resp.json().get('message')}") return resp.json().get("result") except Exception as e: - data_processor_logger.error(f"Attempt to decode_request {attempt + 1} failed: {e}") + log_request_error( + message="Attempt to decode_request {attempt} failed: {error}", + attempt=attempt + 1, + error=str(e), + ) if attempt == self.max_retries - 1: - data_processor_logger.error( - f"Max retries of decode_request reached. Giving up. request is {request}" + log_request_error( + message="Max retries of decode_request reached. Giving up. 
request is {request}", + request=str(request), ) await asyncio.sleep(1) except httpx.RequestError as e: diff --git a/fastdeploy/logger/config.py b/fastdeploy/logger/config.py new file mode 100644 index 00000000000..caa8515dfd4 --- /dev/null +++ b/fastdeploy/logger/config.py @@ -0,0 +1,47 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +日志配置解析模块 +""" + +import os + + +def resolve_log_level(raw_level=None, debug_enabled=None) -> str: + """ + 解析日志级别配置 + + 优先级: FD_LOG_LEVEL > FD_DEBUG + """ + raw = os.getenv("FD_LOG_LEVEL") if raw_level is None else raw_level + # 处理 None 或字符串 "None" 的情况 + if raw and str(raw).upper() != "NONE": + level = raw.upper() + if level not in {"INFO", "DEBUG"}: + raise ValueError(f"Unsupported FD_LOG_LEVEL: {raw}") + return level + debug = os.getenv("FD_DEBUG", "0") if debug_enabled is None else str(debug_enabled) + return "DEBUG" if debug == "1" else "INFO" + + +def resolve_request_logging_defaults() -> dict[str, int]: + """ + 解析 request 日志的默认配置 + """ + return { + "enabled": int(os.getenv("FD_LOG_REQUESTS", "1")), + "level": int(os.getenv("FD_LOG_REQUESTS_LEVEL", "0")), + "max_len": int(os.getenv("FD_LOG_MAX_LEN", "2048")), + } diff --git a/fastdeploy/logger/logger.py b/fastdeploy/logger/logger.py index e50b484b90c..e1d159a6a59 100644 --- a/fastdeploy/logger/logger.py +++ b/fastdeploy/logger/logger.py @@ -50,7 +50,7 @@ def _initialize(self): setup_logging() self._initialized = True - def get_logger(self, name, file_name=None, without_formater=False, print_to_console=False): + def get_logger(self, name, file_name=None, without_formater=False, print_to_console=False, channel=None): """ Get logger (compatible with the original interface) @@ -59,7 +59,14 @@ def get_logger(self, name, file_name=None, without_formater=False, print_to_cons file_name: Log file name (for compatibility) without_formater: Whether to not use a formatter print_to_console: Whether to print to console + channel: Log channel (main, request, console) """ + # 如果指定了 channel,使用通道化日志 + if channel is not None: + if not self._initialized: + self._initialize() + return self._get_channel_logger(name, channel) + # If only one parameter is provided, use the new unified naming convention if file_name is None and not without_formater and not print_to_console: # Lazy initialization @@ -74,8 +81,21 @@ def _get_unified_logger(self, name): """ New unified way to get logger """ + return self._get_channel_logger(name, "main") + + def _get_channel_logger(self, name, channel): + """ + 通过通道获取 logger + + Args: + name: logger 名称 + channel: 日志通道 (main, request, console) + """ if name is None: - return logging.getLogger("fastdeploy") + return logging.getLogger(f"fastdeploy.{channel}") + + if name == "fastdeploy": + return logging.getLogger(f"fastdeploy.{channel}") # Handle __main__ special case if name == "__main__": @@ -86,15 +106,14 @@ def _get_unified_logger(self, name): # Get the main module file name base_name = Path(__main__.__file__).stem # Create 
logger with prefix - return logging.getLogger(f"fastdeploy.main.{base_name}") - return logging.getLogger("fastdeploy.main") + return logging.getLogger(f"fastdeploy.{channel}.{base_name}") + return logging.getLogger(f"fastdeploy.{channel}") # If already in fastdeploy namespace, use directly - if name.startswith("fastdeploy.") or name == "fastdeploy": + if name.startswith("fastdeploy."): return logging.getLogger(name) - else: - # Add fastdeploy prefix for other cases - return logging.getLogger(f"fastdeploy.{name}") + + return logging.getLogger(f"fastdeploy.{channel}.{name}") def get_trace_logger(self, name, file_name, without_formater=False, print_to_console=False): """ diff --git a/fastdeploy/logger/request_logger.py b/fastdeploy/logger/request_logger.py new file mode 100644 index 00000000000..ebeada65425 --- /dev/null +++ b/fastdeploy/logger/request_logger.py @@ -0,0 +1,90 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Request 日志模块 + +提供分级的 request 日志记录功能,支持 L0-L3 四个级别: +- L0: 关键请求事件(创建、完成、释放) +- L1: 请求处理细节 +- L2: 请求内容(会被截断) +- L3: 完整响应内容 +""" + +from enum import IntEnum + +from fastdeploy import envs +from fastdeploy.logger.logger import FastDeployLogger + + +class RequestLogLevel(IntEnum): + """Request 日志级别""" + + L0 = 0 # 关键事件 + L1 = 1 # 处理细节 + L2 = 2 # 请求内容(截断) + L3 = 3 # 完整响应 + + +_request_logger = FastDeployLogger().get_logger("request", channel="request") + + +def _should_log(level: int) -> bool: + """判断是否应该记录该级别的日志""" + if int(envs.FD_LOG_REQUESTS) == 0: + return False + return int(level) <= int(envs.FD_LOG_REQUESTS_LEVEL) + + +def _truncate(value): + """截断超长内容""" + text = str(value) + max_len = int(envs.FD_LOG_MAX_LEN) + if len(text) <= max_len: + return text + return text[:max_len] + + +def log_request(level: int, message: str, **fields): + """ + 记录 request 日志 + + Args: + level: 日志级别 (0-3) + message: 日志消息模板,支持 {field} 格式化 + **fields: 消息字段 + """ + if not _should_log(level): + return + + payload = fields + # L2 级别的内容需要截断 + if int(level) == int(RequestLogLevel.L2): + payload = {key: _truncate(value) for key, value in fields.items()} + + _request_logger.info(message.format(**payload), stacklevel=2) + + +def log_request_error(message: str, **fields): + """ + 记录 request 错误日志 + + Args: + message: 日志消息模板,支持 {field} 格式化 + **fields: 消息字段 + """ + if int(envs.FD_LOG_REQUESTS) == 0: + return + + _request_logger.error(message.format(**fields), stacklevel=2) diff --git a/fastdeploy/logger/setup_logging.py b/fastdeploy/logger/setup_logging.py index e90f215fc90..36ffeae4fc3 100644 --- a/fastdeploy/logger/setup_logging.py +++ b/fastdeploy/logger/setup_logging.py @@ -15,6 +15,11 @@ """ 配置日志系统 + +日志通道划分: +- fastdeploy.main: 主日志,输出到 fastdeploy.log 和 console.log +- fastdeploy.request: 请求日志,输出到 request.log +- fastdeploy.console: 控制台日志,输出到 console.log 和终端 """ import json @@ -24,6 +29,7 @@ from pathlib import Path from fastdeploy import envs +from fastdeploy.logger.config import resolve_log_level class 
MaxLevelFilter(logging.Filter): @@ -40,36 +46,14 @@ def filter(self, record): return record.levelno < self.level -def setup_logging(log_dir=None, config_file=None): - """ - 设置FastDeploy的日志配置 +def _build_default_config(log_dir, log_level, backup_count): + """构建默认的日志配置""" + fmt = "%(levelname)-8s %(asctime)s %(process)-5s %(filename)s[line:%(lineno)d] %(message)s" - Args: - log_dir: 日志文件存储目录,如果不提供则使用环境变量 - config_file: JSON配置文件路径,如果不提供则使用默认配置 - """ + # main 通道的 handlers + main_handlers = ["main_file", "console_file", "console_stdout", "error_file", "console_stderr"] - # 避免重复配置 - if getattr(setup_logging, "_configured", False): - return logging.getLogger("fastdeploy") - - # 使用环境变量中的日志目录,如果没有则使用传入的参数或默认值 - if log_dir is None: - log_dir = getattr(envs, "FD_LOG_DIR", "log") - - # 确保日志目录存在 - Path(log_dir).mkdir(parents=True, exist_ok=True) - - # 从环境变量获取日志级别和备份数量 - is_debug = int(getattr(envs, "FD_DEBUG", 0)) - FASTDEPLOY_LOGGING_LEVEL = "DEBUG" if is_debug else "INFO" - backup_count = int(getattr(envs, "FD_LOG_BACKUP_COUNT", 7)) - - # 定义日志输出格式 - _FORMAT = "%(levelname)-8s %(asctime)s %(process)-5s %(filename)s[line:%(lineno)d] %(message)s" - - # 默认配置 - default_config = { + return { "version": 1, "disable_existing_loggers": False, "filters": { @@ -81,12 +65,12 @@ def setup_logging(log_dir=None, config_file=None): "formatters": { "standard": { "class": "logging.Formatter", - "format": _FORMAT, + "format": fmt, "datefmt": "%Y-%m-%d %H:%M:%S", }, "colored": { "class": "fastdeploy.logger.formatters.ColoredFormatter", - "format": _FORMAT, + "format": fmt, "datefmt": "%Y-%m-%d %H:%M:%S", }, }, @@ -94,7 +78,7 @@ def setup_logging(log_dir=None, config_file=None): # 控制台标准输出,用于 INFO/DEBUG(低于 ERROR 级别) "console_stdout": { "class": "logging.StreamHandler", - "level": FASTDEPLOY_LOGGING_LEVEL, + "level": log_level, "filters": ["below_error"], "formatter": "colored", "stream": "ext://sys.stdout", @@ -106,75 +90,106 @@ def setup_logging(log_dir=None, config_file=None): "formatter": "colored", "stream": "ext://sys.stderr", }, - # 默认错误日志,保留最新1个小时的日志,位置在log/error.log - "error_file": { - "class": "logging.handlers.TimedRotatingFileHandler", - "level": "ERROR", + # 主日志文件 + "main_file": { + "class": "fastdeploy.logger.handlers.LazyFileHandler", + "level": log_level, "formatter": "standard", - "filename": os.path.join(log_dir, "error.log"), - "when": "H", - "interval": 1, - "backupCount": 1, + "filename": os.path.join(log_dir, "fastdeploy.log"), + "backupCount": backup_count, }, - # 全量日志,保留最新1小时的日志,位置在log/default.log - "default_file": { - "class": "logging.handlers.TimedRotatingFileHandler", - "level": FASTDEPLOY_LOGGING_LEVEL, + # 控制台日志文件 + "console_file": { + "class": "fastdeploy.logger.handlers.LazyFileHandler", + "level": log_level, "formatter": "standard", - "filename": os.path.join(log_dir, "default.log"), - "when": "H", - "interval": 1, - "backupCount": 1, + "filename": os.path.join(log_dir, "console.log"), + "backupCount": backup_count, }, - # 错误日志归档,保留7天内的日志,每隔1小时一个文件,形式如:FastDeploy/log/2025-08-14/error_2025-08-14-18.log - "error_archive": { - "class": "fastdeploy.logger.handlers.IntervalRotatingFileHandler", - "level": "ERROR", + # 请求日志文件 + "request_file": { + "class": "fastdeploy.logger.handlers.LazyFileHandler", + "level": log_level, "formatter": "standard", - "filename": os.path.join(log_dir, "error.log"), - "backupDays": 7, - "interval": 1, - "encoding": "utf-8", + "filename": os.path.join(log_dir, "request.log"), + "backupCount": backup_count, }, - # 
全量日志归档,保留7天内的日志,每隔1小时一个文件,形式如:FastDeploy/log/2025-08-14/default_2025-08-14-18.log - "default_archive": { - "class": "fastdeploy.logger.handlers.IntervalRotatingFileHandler", - "level": FASTDEPLOY_LOGGING_LEVEL, + # 错误日志文件 + "error_file": { + "class": "fastdeploy.logger.handlers.LazyFileHandler", + "level": "ERROR", "formatter": "standard", - "filename": os.path.join(log_dir, "default.log"), - "backupDays": 7, - "interval": 1, - "encoding": "utf-8", + "filename": os.path.join(log_dir, "error.log"), + "backupCount": backup_count, }, }, "loggers": { - # 默认日志记录器,全局共享 + # 默认日志记录器 "fastdeploy": { "level": "DEBUG", - "handlers": [ - "console_stdout", - "console_stderr", - "error_file", - "default_file", - "error_archive", - "default_archive", - ], + "handlers": main_handlers, "propagate": False, - } + }, + # main 通道 + "fastdeploy.main": { + "level": "DEBUG", + "handlers": main_handlers, + "propagate": False, + }, + # request 通道 - 只输出到 request.log 和 error.log + "fastdeploy.request": { + "level": "DEBUG", + "handlers": ["request_file", "error_file", "console_stderr"], + "propagate": False, + }, + # console 通道 - 输出到 console.log 和终端 + "fastdeploy.console": { + "level": "DEBUG", + "handlers": ["console_file", "console_stdout", "error_file", "console_stderr"], + "propagate": False, + }, }, } + +def setup_logging(log_dir=None, config_file=None): + """ + 设置FastDeploy的日志配置 + + Args: + log_dir: 日志文件存储目录,如果不提供则使用环境变量 + config_file: JSON配置文件路径,如果不提供则使用默认配置 + """ + + # 避免重复配置 + if getattr(setup_logging, "_configured", False): + return logging.getLogger("fastdeploy") + + # 使用环境变量中的日志目录,如果没有则使用传入的参数或默认值 + if log_dir is None: + log_dir = getattr(envs, "FD_LOG_DIR", "log") + + # 确保日志目录存在 + Path(log_dir).mkdir(parents=True, exist_ok=True) + + # 解析日志级别 + log_level = resolve_log_level(getattr(envs, "FD_LOG_LEVEL", None), getattr(envs, "FD_DEBUG", 0)) + backup_count = int(getattr(envs, "FD_LOG_BACKUP_COUNT", 7)) + + # 构建默认配置 + default_config = _build_default_config(log_dir, log_level, backup_count) + # 如果提供了配置文件,则加载配置文件 if config_file and os.path.exists(config_file): with open(config_file, "r", encoding="utf-8") as f: config = json.load(f) - # 合并环境变量配置到用户配置中,环境变量的优先级高于自定义的优先级 + # 合并环境变量配置到用户配置中 if "handlers" in config: - for handler_name, handler_config in config["handlers"].items(): + for handler_config in config["handlers"].values(): if "backupCount" not in handler_config and "DailyRotating" in handler_config.get("class", ""): handler_config["backupCount"] = backup_count - if handler_config.get("level") == "INFO" and is_debug: + if handler_config.get("level") == "INFO" and log_level == "DEBUG": handler_config["level"] = "DEBUG" else: config = default_config diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py index 85e54647b7e..a740fdad28a 100644 --- a/fastdeploy/output/token_processor.py +++ b/fastdeploy/output/token_processor.py @@ -40,6 +40,7 @@ SpeculateMetrics, ) from fastdeploy.inter_communicator import ZmqIpcServer +from fastdeploy.logger.request_logger import log_request, log_request_error from fastdeploy.metrics.metrics import main_process_metrics from fastdeploy.platforms import current_platform from fastdeploy.spec_decode import SpecMethod @@ -224,7 +225,11 @@ def _process_per_token(self, task, batch_id: int, token_ids: np.ndarray, result: for token_id in token_id_list: recovery_stop = token_id == RECOVERY_STOP_SIGNAL if recovery_stop: - llm_logger.info(f"recovery stop signal found at task {task_id}") + log_request( + level=1, + message="recovery stop signal found at 
task {request_id}", + request_id=task_id, + ) self.tokens_counter[task_id] += 1 if token_id != RECOVERY_STOP_SIGNAL: result.outputs.token_ids.append(token_id) @@ -252,12 +257,25 @@ def _process_per_token(self, task, batch_id: int, token_ids: np.ndarray, result: # Print combined log with all required information ttft = task.metrics.first_token_time if task.metrics.first_token_time else 0 - llm_logger.info( - f"Request={task_id}, InputToken={task.prompt_token_ids_len}, " - f"CachedDetail={cached_detail}, OutputToken={self.tokens_counter[task_id]}, " - f"TokenRatio={token_ratio:.2f}, TTFT={ttft:.2f}, " - f"E2E={e2e_time:.2f}, IsPrefill={is_prefill}, RecoveryStop={recovery_stop}, " - f"PreemptedCount={getattr(task.metrics, 'preempted_count', 0)}" + log_request( + level=0, + message=( + "Request={request_id}, InputToken={input_tokens}, " + "CachedDetail={cached_detail}, OutputToken={output_tokens}, " + "TokenRatio={token_ratio}, TTFT={ttft}, " + "E2E={e2e_time}, IsPrefill={is_prefill}, RecoveryStop={recovery_stop}, " + "PreemptedCount={preempted_count}" + ), + request_id=task_id, + input_tokens=task.prompt_token_ids_len, + cached_detail=cached_detail, + output_tokens=self.tokens_counter[task_id], + token_ratio=f"{token_ratio:.2f}", + ttft=f"{ttft:.2f}", + e2e_time=f"{e2e_time:.2f}", + is_prefill=is_prefill, + recovery_stop=recovery_stop, + preempted_count=getattr(task.metrics, "preempted_count", 0), ) main_process_metrics.request_token_ratio.observe(token_ratio) @@ -289,13 +307,21 @@ def _process_batch_output_use_zmq(self, receive_datas): task_id in self.resource_manager.to_be_aborted_req_id_set and token_ids[-1] == PREEMPTED_TOKEN_ID ): - llm_logger.info(f"start to recycle abort request_id {task_id}") + log_request( + level=0, + message="start to recycle abort request_id {request_id}", + request_id=task_id, + ) self.resource_manager.recycle_abort_task(task_id) if ( task_id in self.resource_manager.to_be_rescheduled_request_id_set and token_ids[-1] == PREEMPTED_TOKEN_ID ): - llm_logger.info(f"sync preemption for request_id {task_id} done.") + log_request( + level=0, + message="sync preemption for request_id {request_id} done.", + request_id=task_id, + ) self.resource_manager.reschedule_preempt_task(task_id) continue if self.cfg.scheduler_config.splitwise_role == "decode": @@ -351,12 +377,20 @@ def _process_batch_output_use_zmq(self, receive_datas): result.outputs.logprob = float(logprobs_list.logprobs[0][0]) result.outputs.top_logprobs = logprobs_list except Exception as e: - llm_logger.warning(f"Failed to parse logprobs from StreamTransferData: {e}") + log_request( + level=1, + message="Failed to parse logprobs from StreamTransferData: {error}", + error=str(e), + ) if getattr(stream_data, "prompt_logprobs", None) is not None: try: result.prompt_logprobs = stream_data.prompt_logprobs except Exception as e: - llm_logger.warning(f"Failed to parse prompt_logprobs from StreamTransferData: {e}") + log_request( + level=1, + message="Failed to parse prompt_logprobs from StreamTransferData: {error}", + error=str(e), + ) if self.tokens_counter[task_id] == 0: if task.messages is not None: result.prompt = task.messages @@ -394,7 +428,9 @@ def process_sampling_results_use_zmq(self): batch_result = self._process_batch_output_use_zmq(receive_datas) self.postprocess(batch_result) except Exception as e: - llm_logger.error(f"Receive message:{receive_datas}, error:{e}") + log_request_error( + message="Receive message:{receive_datas}, error:{error}", receive_datas=receive_datas, error=e + ) continue def 
process_sampling_results(self): @@ -511,7 +547,11 @@ def postprocess(self, batch_result: List[RequestOutput], mtype=3): else: self.cached_generated_tokens.put_results(batch_result) except Exception as e: - llm_logger.error(f"Error in TokenProcessor's postprocess: {e}, {str(traceback.format_exc())}") + log_request_error( + message="Error in TokenProcessor's postprocess: {error}, {traceback}", + error=e, + traceback=traceback.format_exc(), + ) def _recycle_resources(self, task_id, index, task, result=None, is_prefill=False): """ @@ -524,14 +564,21 @@ def _recycle_resources(self, task_id, index, task, result=None, is_prefill=False finished_task_ids = self.engine_worker_queue.get_finished_req() if len(finished_task_ids) > 0: for finished_task_id in finished_task_ids: - llm_logger.info(f"finished_task_id: {finished_task_id}") + log_request( + level=0, + message="finished_task_id: {finished_task_id}", + finished_task_id=finished_task_id, + ) self.prefill_result_status[finished_task_id[0]] = finished_task_id[1] if task_id in self.prefill_result_status: if self.prefill_result_status[task_id] != "finished": result.error_code = 400 result.error_message = f"{task_id} failed to {self.prefill_result_status[task_id]}" - llm_logger.info( - f"wait for sending cache, request_id: {task_id}, cost seconds: {time.time()-start_time:.5f}" + log_request( + level=1, + message="wait for sending cache, request_id: {request_id}, cost seconds: {cost_seconds}", + request_id=task_id, + cost_seconds=f"{time.time()-start_time:.5f}", ) result.metrics.send_request_output_to_decode_time = time.time() self.split_connector.send_first_token(task.disaggregate_info, [result]) @@ -755,7 +802,11 @@ def _process_batch_output(self): if self.cfg.speculative_config.method: self._record_speculative_decoding_accept_num_per_request(task_id, accept_num[i]) if accept_num[i] == PREEMPTED_TOKEN_ID: # in MTP, means preemption has happened in worker - llm_logger.info(f"sync preemption for request_id {task_id} done.") + log_request( + level=0, + message="sync preemption for request_id {request_id} done.", + request_id=task_id, + ) if envs.ENABLE_V1_KVCACHE_SCHEDULER: if task_id in self.resource_manager.to_be_aborted_req_id_set: self.resource_manager.recycle_abort_task(task_id) @@ -765,7 +816,11 @@ def _process_batch_output(self): if accept_num[i] == -3: recovery_stop = True if recovery_stop: - llm_logger.info(f"recovery stop signal found at task {task_id}") + log_request( + level=0, + message="recovery stop signal found at task {request_id}", + request_id=task_id, + ) token_ids = [RECOVERY_STOP_SIGNAL] elif self.use_logprobs: token_ids = tokens[i][:, 0].tolist()[: accept_num[i]] @@ -785,7 +840,11 @@ def _process_batch_output(self): token_ids = [token_id] recovery_stop = token_id == RECOVERY_STOP_SIGNAL if recovery_stop: - llm_logger.info(f"recovery stop signal found at task {task_id}") + log_request( + level=0, + message="recovery stop signal found at task {request_id}", + request_id=task_id, + ) if not recovery_stop and token_id < 0: if envs.ENABLE_V1_KVCACHE_SCHEDULER: if ( @@ -793,12 +852,20 @@ def _process_batch_output(self): and token_id == PREEMPTED_TOKEN_ID ): self.resource_manager.recycle_abort_task(task_id) - llm_logger.info(f"sync abortion for request_id {task_id} done.") + log_request( + level=0, + message="sync abortion for request_id {request_id} done.", + request_id=task_id, + ) if ( task_id in self.resource_manager.to_be_rescheduled_request_id_set and token_id == PREEMPTED_TOKEN_ID ): - llm_logger.info(f"sync preemption for 
request_id {task_id} done.") + log_request( + level=0, + message="sync preemption for request_id {request_id} done.", + request_id=task_id, + ) self.resource_manager.reschedule_preempt_task(task_id) continue if self.cfg.scheduler_config.splitwise_role == "decode": @@ -826,7 +893,11 @@ def _process_batch_output(self): task.metrics.record_recv_first_token() task.metrics.cal_cost_time() metrics = copy.copy(task.metrics) - llm_logger.info(f"task:{task.request_id} start recode first token") + log_request( + level=0, + message="task:{request_id} start recode first token", + request_id=task.request_id, + ) self._record_first_token_metrics(task, current_time) tracing.trace_report_span( @@ -932,12 +1003,26 @@ def _process_batch_output(self): # Print combined log with all required information ttft = task.metrics.first_token_time if task.metrics.first_token_time else 0 ttft_s = ttft + task.metrics.time_in_queue - llm_logger.info( - f"Request={task_id}, InputToken={task.prompt_token_ids_len}, " - f"CachedDetail={cached_detail}, OutputToken={self.tokens_counter[task_id]}, " - f"TokenRatio={token_ratio:.2f}, TTFT={ttft:.2f}, TTFT_S={ttft_s:.2f}, " - f"E2E={e2e_time:.2f}, IsPrefill={is_prefill}, RecoveryStop={recovery_stop}, " - f"PreemptedCount={getattr(task.metrics, 'preempted_count', 0)}" + log_request( + level=0, + message=( + "Request={request_id}, InputToken={input_tokens}, " + "CachedDetail={cached_detail}, OutputToken={output_tokens}, " + "TokenRatio={token_ratio}, TTFT={ttft}, TTFT_S={ttft_s}, " + "E2E={e2e_time}, IsPrefill={is_prefill}, RecoveryStop={recovery_stop}, " + "PreemptedCount={preempted_count}" + ), + request_id=task_id, + input_tokens=task.prompt_token_ids_len, + cached_detail=cached_detail, + output_tokens=self.tokens_counter[task_id], + token_ratio=f"{token_ratio:.2f}", + ttft=f"{ttft:.2f}", + ttft_s=f"{ttft_s:.2f}", + e2e_time=f"{e2e_time:.2f}", + is_prefill=is_prefill, + recovery_stop=recovery_stop, + preempted_count=getattr(task.metrics, "preempted_count", 0), ) main_process_metrics.request_token_ratio.observe(token_ratio) @@ -946,8 +1031,11 @@ def _process_batch_output(self): self._compute_speculative_status(result) if not is_prefill: self._record_completion_metrics(task, current_time) - llm_logger.info(f"task {task_id} received eos token. Recycling.") - + log_request( + level=0, + message="task {request_id} received eos token. 
Recycling.", + request_id=task_id, + ) if ( envs.ENABLE_V1_KVCACHE_SCHEDULER and self.cfg.cache_config.enable_prefix_caching @@ -957,7 +1045,11 @@ def _process_batch_output(self): task ) # when enable prefix caching, cache kv cache for output tokens self._recycle_resources(task_id, i, task, result, is_prefill) - llm_logger.info(f"eos token {task_id} Recycle end.") + log_request( + level=0, + message="eos token {request_id} Recycle end.", + request_id=task_id, + ) break llm_logger.debug(f"get response from infer: {result}") diff --git a/fastdeploy/scheduler/dp_scheduler.py b/fastdeploy/scheduler/dp_scheduler.py index 2339a077c96..4d0f53539c3 100644 --- a/fastdeploy/scheduler/dp_scheduler.py +++ b/fastdeploy/scheduler/dp_scheduler.py @@ -21,6 +21,7 @@ from typing import Dict, List, Optional from fastdeploy.engine.request import Request, RequestOutput +from fastdeploy.logger.request_logger import log_request from fastdeploy.scheduler.data import ScheduledResponse from fastdeploy.scheduler.local_scheduler import LocalScheduler from fastdeploy.utils import get_logger @@ -58,7 +59,11 @@ def put_results(self, results: List[RequestOutput]): finished_responses = [response.request_id for response in responses if response.finished] if len(finished_responses) > 0: - self.scheduler_logger.info(f"Scheduler has received some finished responses: {finished_responses}") + log_request( + level=2, + message="Scheduler has received some finished responses: {request_ids}", + request_ids=finished_responses, + ) with self.mutex: self.batch_responses_per_step.append([response.raw for response in responses]) @@ -146,8 +151,10 @@ def get_requests( self.ids_read_cursor += 1 if len(requests) > 0: - self.scheduler_logger.info( - f"Scheduler has pulled some request: {[request.request_id for request in requests]}" + log_request( + level=2, + message="Scheduler has pulled some request: {request_ids}", + request_ids=[request.request_id for request in requests], ) return requests @@ -195,7 +202,11 @@ def put_requests(self, requests: List[Dict]): def _put_requests_to_local(self): while True: request = self.request_queues.get() - self.scheduler_logger.info(f"Receive request from puller, request_id: {request.request_id}") + log_request( + level=2, + message="Receive request from puller, request_id: {request_id}", + request_id=request.request_id, + ) self._scheduler.put_requests([request]) def _get_response_from_local(self): diff --git a/fastdeploy/scheduler/global_scheduler.py b/fastdeploy/scheduler/global_scheduler.py index 9f6d0644613..ca3dc9011ec 100644 --- a/fastdeploy/scheduler/global_scheduler.py +++ b/fastdeploy/scheduler/global_scheduler.py @@ -25,6 +25,7 @@ from redis import ConnectionPool from fastdeploy.engine.request import Request, RequestOutput +from fastdeploy.logger.request_logger import log_request, log_request_error from fastdeploy.scheduler import utils from fastdeploy.scheduler.data import ScheduledRequest, ScheduledResponse from fastdeploy.scheduler.storage import AdaptedRedis @@ -371,6 +372,11 @@ def _put_requests_worker(self, tasks: List[Task]) -> List[Task]: ttl=self.ttl, ) scheduler_logger.info(f"Scheduler has enqueued some requests: {requests}") + log_request( + level=2, + message="Scheduler has enqueued some requests: {request_ids}", + request_ids=[request.request_id for request in requests], + ) if duplicate: scheduler_logger.warning( @@ -573,7 +579,9 @@ def get_requests( self.stolen_requests[request.request_id] = request continue - scheduler_logger.error(f"Scheduler has received a duplicate request 
from others: {request}") + log_request_error( + message="Scheduler has received a duplicate request from others: {request}", request=request + ) requests: List[Request] = [request.raw for request in scheduled_requests] if len(remaining_request) > 0: @@ -603,7 +611,11 @@ def get_requests( ) if len(requests) > 0: - scheduler_logger.info(f"Scheduler has pulled some request: {[request.request_id for request in requests]}") + log_request( + level=2, + message="Scheduler has pulled some request: {request_ids}", + request_ids=[request.request_id for request in requests], + ) return requests def _put_results_worker(self, tasks: List[Task]): @@ -649,7 +661,9 @@ def _put_results_worker(self, tasks: List[Task]): stolen_responses[response_queue_name].append(response.serialize()) continue - scheduler_logger.error(f"Scheduler has received a non-existent response from engine: {[response]}") + log_request_error( + message="Scheduler has received a non-existent response from engine: {response}", response=[response] + ) with self.mutex: for request_id, responses in local_responses.items(): @@ -664,7 +678,11 @@ def _put_results_worker(self, tasks: List[Task]): self.local_response_not_empty.notify_all() if len(finished_request_ids) > 0: - scheduler_logger.info(f"Scheduler has received some finished responses: {finished_request_ids}") + log_request( + level=2, + message="Scheduler has received some finished responses: {request_ids}", + request_ids=finished_request_ids, + ) for response_queue_name, responses in stolen_responses.items(): self.client.rpush(response_queue_name, *responses, ttl=self.ttl) @@ -793,7 +811,11 @@ def _get_results() -> Dict[str, List[ScheduledResponse]]: if finished: del self.local_responses[request_id] - scheduler_logger.info(f"Scheduler has pulled a finished response: {[request_id]}") + log_request( + level=2, + message="Scheduler has pulled a finished response: {request_ids}", + request_ids=[request_id], + ) return results def reset(self): diff --git a/fastdeploy/scheduler/local_scheduler.py b/fastdeploy/scheduler/local_scheduler.py index fc4a64686b5..703fa8dcbd0 100644 --- a/fastdeploy/scheduler/local_scheduler.py +++ b/fastdeploy/scheduler/local_scheduler.py @@ -19,6 +19,7 @@ from typing import Dict, List, Optional, Tuple from fastdeploy.engine.request import Request, RequestOutput +from fastdeploy.logger.request_logger import log_request from fastdeploy.scheduler.data import ScheduledRequest, ScheduledResponse from fastdeploy.utils import envs, scheduler_logger @@ -191,7 +192,12 @@ def put_requests(self, requests: List[Request]) -> List[Tuple[str, Optional[str] self.ids += valid_ids self.requests_not_empty.notify_all() - scheduler_logger.info(f"Scheduler has enqueued some requests: {valid_ids}") + if len(valid_ids) > 0: + log_request( + level=2, + message="Scheduler has enqueued some requests: {request_ids}", + request_ids=valid_ids, + ) if len(duplicated_ids) > 0: scheduler_logger.warning(f"Scheduler has received some duplicated requests: {duplicated_ids}") @@ -300,7 +306,11 @@ def get_requests( scheduler_logger.debug(f"Scheduler has put all just-pulled request into the queue: {len(batch_ids)}") if len(requests) > 0: - scheduler_logger.info(f"Scheduler has pulled some request: {[request.request_id for request in requests]}") + log_request( + level=2, + message="Scheduler has pulled some request: {request_ids}", + request_ids=[request.request_id for request in requests], + ) return requests @@ -316,7 +326,11 @@ def put_results(self, results: List[RequestOutput]): finished_responses 
= [response.request_id for response in responses if response.finished] if len(finished_responses) > 0: - scheduler_logger.info(f"Scheduler has received some finished responses: {finished_responses}") + log_request( + level=2, + message="Scheduler has received some finished responses: {request_ids}", + request_ids=finished_responses, + ) with self.mutex: self.batch_responses_per_step.append([response.raw for response in responses]) @@ -381,7 +395,11 @@ def _get_results(): if finished: self._recycle(request_id) - scheduler_logger.info(f"Scheduler has pulled a finished response: {[request_id]}") + log_request( + level=2, + message="Scheduler has pulled a finished response: {request_ids}", + request_ids=[request_id], + ) if results: scheduler_logger.debug(f"get responses, {results}") diff --git a/fastdeploy/scheduler/splitwise_scheduler.py b/fastdeploy/scheduler/splitwise_scheduler.py index cd5e0736436..8116a29cf94 100644 --- a/fastdeploy/scheduler/splitwise_scheduler.py +++ b/fastdeploy/scheduler/splitwise_scheduler.py @@ -33,6 +33,7 @@ RequestMetrics, RequestOutput, ) +from fastdeploy.logger.request_logger import log_request_error from fastdeploy.utils import scheduler_logger as logger @@ -240,7 +241,12 @@ def expire_reqs(self, ttl): for req_id, pairs in self.reqs.items(): load, arrival_time = pairs if cur_time - arrival_time > ttl: - logger.error(f"InferScheduler Expire Reqs({req_id}), arrival({arrival_time}), ttl({ttl})") + log_request_error( + message="InferScheduler Expire Reqs({req_id}), arrival({arrival_time}), ttl({ttl})", + req_id=req_id, + arrival_time=arrival_time, + ttl=ttl, + ) expire_reqs.add((req_id, load)) for req_id, load in expire_reqs: if req_id in self.reqs: @@ -378,7 +384,7 @@ def run(self): ) self.data.appendleft(result) - logger.error(f"Req({req_id}) is expired({self.ttl})") + log_request_error(message="Req({req_id}) is expired({ttl})", req_id=req_id, ttl=self.ttl) expired_reqs.add(req_id) continue keys.append(req_id) @@ -511,7 +517,11 @@ def loop_schedule(self): except IndexError: continue except Exception as e: - logger.error(f"APIScheduler Schedule req error: {e!s}, {str(traceback.format_exc())}") + log_request_error( + message="APIScheduler Schedule req error: {error}, {traceback}", + error=str(e), + traceback=traceback.format_exc(), + ) def schedule(self, req, pnodes, dnodes, mnodes, group=""): """ @@ -841,7 +851,11 @@ def get_requests( req = self.reqs_queue.popleft() if cur_time - req.metrics.arrival_time > self.ttl: - logger.error(f"req({req.request_id}) is expired({self.ttl}) when InferScheduler Get Requests") + log_request_error( + message="req({request_id}) is expired({ttl}) when InferScheduler Get Requests", + request_id=req.request_id, + ttl=self.ttl, + ) self.node.finish_req(req.request_id) continue current_prefill_tokens += req.prompt_token_ids_len diff --git a/fastdeploy/utils.py b/fastdeploy/utils.py index 0a591dc2777..fe5b51f00b9 100644 --- a/fastdeploy/utils.py +++ b/fastdeploy/utils.py @@ -54,6 +54,7 @@ from fastdeploy import envs from fastdeploy.entrypoints.openai.protocol import ErrorInfo, ErrorResponse from fastdeploy.logger.logger import FastDeployLogger +from fastdeploy.logger.request_logger import log_request_error from fastdeploy.worker.output import PromptLogprobs T = TypeVar("T") @@ -201,7 +202,12 @@ async def handle_request_validation_exception(request: Request, exc: RequestVali param=param, ) ) - api_server_logger.error(f"invalid_request_error: {request.url} {param} {message}") + log_request_error( + message="invalid_request_error: {url} 
{param} {msg}", + url=str(request.url), + param=param, + msg=message, + ) return JSONResponse(content=err.model_dump(), status_code=HTTPStatus.BAD_REQUEST) @@ -1055,9 +1061,9 @@ def parse_quantization(value: str): # 日志使用全局访问点(兼容原有使用方式) -def get_logger(name, file_name=None, without_formater=False, print_to_console=False): +def get_logger(name, file_name=None, without_formater=False, print_to_console=False, channel=None): """全局函数包装器,保持向后兼容""" - return FastDeployLogger().get_logger(name, file_name, without_formater, print_to_console) + return FastDeployLogger().get_logger(name, file_name, without_formater, print_to_console, channel=channel) def check_download_links(bos_client, links, timeout=1): @@ -1156,16 +1162,16 @@ def _bos_download(bos_client, link): break -llm_logger = get_logger("fastdeploy", "fastdeploy.log") -data_processor_logger = get_logger("data_processor", "data_processor.log") -scheduler_logger = get_logger("scheduler", "scheduler.log") -api_server_logger = get_logger("api_server", "api_server.log") -console_logger = get_logger("console", "console.log", print_to_console=True) +llm_logger = get_logger("fastdeploy", channel="main") +data_processor_logger = get_logger("data_processor", channel="main") +scheduler_logger = get_logger("scheduler", channel="main") +api_server_logger = get_logger("api_server", channel="main") +console_logger = get_logger(None, channel="console") spec_logger = get_logger("speculate", "speculate.log") -zmq_client_logger = get_logger("zmq_client", "zmq_client.log") +zmq_client_logger = get_logger("zmq_client", "comm.log") trace_logger = FastDeployLogger().get_trace_logger("trace", "trace.log") -router_logger = get_logger("router", "router.log") -fmq_logger = get_logger("fmq", "fmq.log") +router_logger = get_logger("router", "comm.log") +fmq_logger = get_logger("fmq", "comm.log") obj_logger = get_logger("obj", "obj.log") # debug内存问题 register_manager_logger = get_logger("register_manager", "register_manager.log") diff --git a/tests/entrypoints/openai/v1/test_serving_completion_v1.py b/tests/entrypoints/openai/v1/test_serving_completion_v1.py index 5b5982f99b5..9699cd25cf3 100644 --- a/tests/entrypoints/openai/v1/test_serving_completion_v1.py +++ b/tests/entrypoints/openai/v1/test_serving_completion_v1.py @@ -756,8 +756,8 @@ async def test_preprocess_request_id_generation(self): expected_id = f"{request_id}_{i}" if request_id else f"_{i}" self.assertEqual(ctx.preprocess_requests[i]["request_id"], expected_id) - @patch("fastdeploy.entrypoints.openai.v1.serving_completion.api_server_logger") - async def test_preprocess_exception_logging(self, mock_logger): + @patch("fastdeploy.entrypoints.openai.v1.serving_completion.log_request_error") + async def test_preprocess_exception_logging(self, mock_log_request_error): """Test _preprocess logs exceptions properly""" # Setup - create a request that will cause an exception request = CompletionRequest(model="test_model", prompt="dummy", max_tokens=50) @@ -771,11 +771,10 @@ async def test_preprocess_exception_logging(self, mock_logger): # Assert self.assertIsInstance(result, ErrorResponse) - mock_logger.error.assert_called_once() - error_log = mock_logger.error.call_args[0][0] - self.assertIn("OpenAIServingCompletion create_completion", error_log) - self.assertIn("ValueError", error_log) - self.assertIn("Traceback", error_log) # Changed from "traceback" to "Traceback" + mock_log_request_error.assert_called_once() + error_msg = mock_log_request_error.call_args[1].get("error_msg", "") + 
self.assertIn("OpenAIServingCompletion create_completion", error_msg) + self.assertIn("ValueError", error_msg) if __name__ == "__main__": diff --git a/tests/entrypoints/test_abort.py b/tests/entrypoints/test_abort.py index e378ff814ed..598a3023f75 100644 --- a/tests/entrypoints/test_abort.py +++ b/tests/entrypoints/test_abort.py @@ -206,9 +206,9 @@ def test_abort_request_id_regex_parsing(self, mock_send_task): mock_send_task.assert_called_once_with(expected_data) @patch("fastdeploy.entrypoints.engine_client.envs.FD_ENABLE_REQUEST_DISCONNECT_STOP_INFERENCE", True) - @patch("fastdeploy.entrypoints.engine_client.api_server_logger") + @patch("fastdeploy.entrypoints.engine_client.log_request") @patch.object(EngineClient, "_send_task") - def test_abort_logging(self, mock_send_task, mock_logger): + def test_abort_logging(self, mock_send_task, mock_log_request): """Test that abort method logs correctly""" request_id = "test_request" n = 2 @@ -216,18 +216,16 @@ def test_abort_logging(self, mock_send_task, mock_logger): # Run the abort method self.loop.run_until_complete(self.engine_client.abort(request_id, n=n)) - # Verify info log was called twice - self.assertEqual(mock_logger.info.call_count, 2) + # Verify log_request was called twice + self.assertEqual(mock_log_request.call_count, 2) # Verify the first log message (abort start) - first_call = mock_logger.info.call_args_list[0] - self.assertEqual(first_call[0][0], "abort request_id:test_request") + first_call = mock_log_request.call_args_list[0] + self.assertIn("abort request_id", first_call[1].get("message", "")) # Verify the second log message (abort completion with request IDs) - second_call = mock_logger.info.call_args_list[1] - expected_log_message = "Aborted request(s) %s." - self.assertEqual(second_call[0][0], expected_log_message) - self.assertEqual(second_call[0][1], "test_request_0,test_request_1") + second_call = mock_log_request.call_args_list[1] + self.assertIn("Aborted request(s)", second_call[1].get("message", "")) @patch("fastdeploy.entrypoints.engine_client.envs.FD_ENABLE_REQUEST_DISCONNECT_STOP_INFERENCE", True) @patch("fastdeploy.entrypoints.engine_client.api_server_logger") diff --git a/tests/entrypoints/test_engine_client.py b/tests/entrypoints/test_engine_client.py index 71ad4b29db9..bbbd42d795f 100644 --- a/tests/entrypoints/test_engine_client.py +++ b/tests/entrypoints/test_engine_client.py @@ -843,11 +843,11 @@ def test_valid_parameters_reasoning_max_tokens_adjustment(self): """Test valid_parameters adjusts reasoning_max_tokens when needed.""" data = {"max_tokens": 50, "reasoning_max_tokens": 100, "request_id": "test-id"} # Larger than max_tokens - with patch("fastdeploy.entrypoints.engine_client.api_server_logger") as mock_logger: + with patch("fastdeploy.entrypoints.engine_client.log_request") as mock_log_request: self.engine_client.valid_parameters(data) self.assertEqual(data["reasoning_max_tokens"], 50) - mock_logger.warning.assert_called_once() + mock_log_request.assert_called_once() def test_valid_parameters_reasoning_max_tokens_with_reasoning_effort(self): """Test valid_parameters when both reasoning_max_tokens and reasoning_effort are set.""" @@ -858,14 +858,13 @@ def test_valid_parameters_reasoning_max_tokens_with_reasoning_effort(self): "request_id": "test-id", } - with patch("fastdeploy.entrypoints.engine_client.api_server_logger") as mock_logger: + with patch("fastdeploy.entrypoints.engine_client.log_request") as mock_log_request: self.engine_client.valid_parameters(data) # When reasoning_effort is set, 
reasoning_max_tokens should be set to None
             self.assertIsNone(data["reasoning_max_tokens"])
-            mock_logger.warning.assert_called_once()
-            warning_call = mock_logger.warning.call_args[0][0]
-            self.assertIn("reasoning_max_tokens and reasoning_effort are both set", warning_call)
+            # log_request is called once: for reasoning_effort conflict (reasoning_max_tokens=50 < max_tokens=100)
+            mock_log_request.assert_called_once()
 
     def test_valid_parameters_temperature_zero_adjustment(self):
         """Test valid_parameters adjusts zero temperature."""
@@ -1846,7 +1845,7 @@ def process_request_dict(self, *_args, **_kwargs):
         with (
             patch(
                 "fastdeploy.entrypoints.engine_client.os.getenv",
-                side_effect=lambda k: "1" if k == "FD_ENABLE_OBJGRAPH_DEBUG" else None,
+                side_effect=lambda k, default=None: "1" if k == "FD_ENABLE_OBJGRAPH_DEBUG" else default,
             ),
             patch("fastdeploy.entrypoints.engine_client._has_objgraph", True),
             patch("fastdeploy.entrypoints.engine_client._has_psutil", False),
diff --git a/tests/entrypoints/test_llm.py b/tests/entrypoints/test_llm.py
index 2944235e525..9c1baa53663 100644
--- a/tests/entrypoints/test_llm.py
+++ b/tests/entrypoints/test_llm.py
@@ -148,7 +148,9 @@ def _get_generated_result():
     assert first.added is True
 
 
-def test_receive_output_logs_exception(caplog):
+def test_receive_output_logs_exception():
+    from unittest.mock import patch
+
     llm = _make_llm(_make_engine())
 
     calls = iter([RuntimeError("boom"), SystemExit()])
@@ -159,9 +161,14 @@ def _get_generated_result():
         return nxt
 
     llm.llm_engine._get_generated_result = _get_generated_result
-    with pytest.raises(SystemExit):
-        llm._receive_output()
-    assert "Unexcepted error happened" in caplog.text
+    with patch("fastdeploy.entrypoints.llm.log_request_error") as mock_log:
+        with pytest.raises(SystemExit):
+            llm._receive_output()
+        mock_log.assert_called_once()
+        call_kwargs = mock_log.call_args[1]
+        assert "Unexpected error happened" in call_kwargs.get(
+            "message", mock_log.call_args[0][0] if mock_log.call_args[0] else ""
+        )
 
 
 def test_generate_and_chat_branches():
diff --git a/tests/input/test_video_utils.py b/tests/input/test_video_utils.py
index 28e6f97e3d9..af9a8ad57c5 100644
--- a/tests/input/test_video_utils.py
+++ b/tests/input/test_video_utils.py
@@ -264,7 +264,7 @@ def test_num_frames_exceeds_total_raises(self):
 
     def test_fps_warning_when_nframes_exceeds_total(self):
         """fps so high that computed num_frames > total → warning logged."""
-        with self.assertLogs(level="WARNING"):
+        with self.assertLogs(logger="fastdeploy.main", level="WARNING"):
             sample_frames_qwen(2, 4, 100, {"num_of_frame": 10, "fps": 1.0}, fps=100.0)
 
     def test_divisible_by_4_correction(self):
diff --git a/tests/logger/test_logger.py b/tests/logger/test_logger.py
index 4939356a620..da708db6f9d 100644
--- a/tests/logger/test_logger.py
+++ b/tests/logger/test_logger.py
@@ -46,8 +46,12 @@ def tearDown(self):
         shutil.rmtree(self.tmp_dir, ignore_errors=True)
 
     def test_unified_logger(self):
-        """Test _get_unified_logger through instance"""
-        test_cases = [(None, "fastdeploy"), ("module", "fastdeploy.module"), ("fastdeploy.utils", "fastdeploy.utils")]
+        """Test _get_unified_logger through instance (uses main channel)"""
+        test_cases = [
+            (None, "fastdeploy.main"),
+            ("module", "fastdeploy.main.module"),
+            ("fastdeploy.utils", "fastdeploy.utils"),  # names already prefixed with fastdeploy. stay unchanged
+        ]
 
         for name, expected in test_cases:
             with self.subTest(name=name):
diff --git a/tests/logger/test_logging_config.py b/tests/logger/test_logging_config.py
new file mode 100644
index 00000000000..63a451d9923
--- /dev/null
+++ b/tests/logger/test_logging_config.py
@@ -0,0 +1,91 @@
+# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from unittest.mock import patch
+
+from fastdeploy.logger.config import resolve_log_level, resolve_request_logging_defaults
+
+
+class TestResolveLogLevel(unittest.TestCase):
+    """Tests for the resolve_log_level function"""
+
+    def test_explicit_info_level(self):
+        """Explicitly setting the INFO level"""
+        result = resolve_log_level(raw_level="INFO")
+        self.assertEqual(result, "INFO")
+
+    def test_explicit_debug_level(self):
+        """Explicitly setting the DEBUG level"""
+        result = resolve_log_level(raw_level="DEBUG")
+        self.assertEqual(result, "DEBUG")
+
+    def test_case_insensitive(self):
+        """Level names should be case-insensitive"""
+        self.assertEqual(resolve_log_level(raw_level="info"), "INFO")
+        self.assertEqual(resolve_log_level(raw_level="debug"), "DEBUG")
+
+    def test_invalid_level_raises(self):
+        """An invalid level should raise ValueError"""
+        with self.assertRaises(ValueError) as ctx:
+            resolve_log_level(raw_level="INVALID")
+        self.assertIn("Unsupported FD_LOG_LEVEL", str(ctx.exception))
+
+    def test_debug_enabled_fallback(self):
+        """FD_DEBUG=1 should return DEBUG"""
+        result = resolve_log_level(raw_level=None, debug_enabled=1)
+        self.assertEqual(result, "DEBUG")
+
+    def test_debug_disabled_fallback(self):
+        """FD_DEBUG=0 should return INFO"""
+        result = resolve_log_level(raw_level=None, debug_enabled=0)
+        self.assertEqual(result, "INFO")
+
+    def test_env_fd_log_level_priority(self):
+        """The FD_LOG_LEVEL environment variable takes priority over FD_DEBUG"""
+        with patch.dict("os.environ", {"FD_LOG_LEVEL": "INFO", "FD_DEBUG": "1"}):
+            result = resolve_log_level()
+            self.assertEqual(result, "INFO")
+
+    def test_env_fd_debug_fallback(self):
+        """Fall back to FD_DEBUG when FD_LOG_LEVEL is not set"""
+        with patch.dict("os.environ", {"FD_DEBUG": "1"}, clear=True):
+            result = resolve_log_level()
+            self.assertEqual(result, "DEBUG")
+
+
+class TestResolveRequestLoggingDefaults(unittest.TestCase):
+    """Tests for the resolve_request_logging_defaults function"""
+
+    def test_default_values(self):
+        """Default values"""
+        with patch.dict("os.environ", {}, clear=True):
+            result = resolve_request_logging_defaults()
+            self.assertEqual(result["enabled"], 1)
+            self.assertEqual(result["level"], 0)
+            self.assertEqual(result["max_len"], 2048)
+
+    def test_custom_values(self):
+        """Custom values"""
+        with patch.dict(
+            "os.environ", {"FD_LOG_REQUESTS": "0", "FD_LOG_REQUESTS_LEVEL": "2", "FD_LOG_MAX_LEN": "1024"}
+        ):
+            result = resolve_request_logging_defaults()
+            self.assertEqual(result["enabled"], 0)
+            self.assertEqual(result["level"], 2)
+            self.assertEqual(result["max_len"], 1024)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/tests/logger/test_request_logger.py b/tests/logger/test_request_logger.py
new file mode 100644
index 00000000000..1df1d2cc364
--- /dev/null
+++ b/tests/logger/test_request_logger.py
@@ -0,0 +1,155 @@
+# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from unittest.mock import patch
+
+from fastdeploy.logger.request_logger import (
+    RequestLogLevel,
+    _should_log,
+    _truncate,
+    log_request,
+)
+
+
+class TestRequestLogLevel(unittest.TestCase):
+    """Tests for the RequestLogLevel enum"""
+
+    def test_level_values(self):
+        """Test the level values"""
+        self.assertEqual(int(RequestLogLevel.L0), 0)
+        self.assertEqual(int(RequestLogLevel.L1), 1)
+        self.assertEqual(int(RequestLogLevel.L2), 2)
+        self.assertEqual(int(RequestLogLevel.L3), 3)
+
+
+class TestShouldLog(unittest.TestCase):
+    """Tests for the _should_log function"""
+
+    def test_disabled_returns_false(self):
+        """FD_LOG_REQUESTS=0 should return False"""
+        with patch("fastdeploy.logger.request_logger.envs") as mock_envs:
+            mock_envs.FD_LOG_REQUESTS = 0
+            mock_envs.FD_LOG_REQUESTS_LEVEL = 3
+            self.assertFalse(_should_log(0))
+
+    def test_level_within_threshold(self):
+        """Levels within the threshold should return True"""
+        with patch("fastdeploy.logger.request_logger.envs") as mock_envs:
+            mock_envs.FD_LOG_REQUESTS = 1
+            mock_envs.FD_LOG_REQUESTS_LEVEL = 2
+            self.assertTrue(_should_log(0))
+            self.assertTrue(_should_log(1))
+            self.assertTrue(_should_log(2))
+
+    def test_level_above_threshold(self):
+        """Levels above the threshold should return False"""
+        with patch("fastdeploy.logger.request_logger.envs") as mock_envs:
+            mock_envs.FD_LOG_REQUESTS = 1
+            mock_envs.FD_LOG_REQUESTS_LEVEL = 1
+            self.assertFalse(_should_log(2))
+            self.assertFalse(_should_log(3))
+
+
+class TestTruncate(unittest.TestCase):
+    """Tests for the _truncate function"""
+
+    def test_short_text_unchanged(self):
+        """Short text should be left unchanged"""
+        with patch("fastdeploy.logger.request_logger.envs") as mock_envs:
+            mock_envs.FD_LOG_MAX_LEN = 100
+            result = _truncate("short text")
+            self.assertEqual(result, "short text")
+
+    def test_long_text_truncated(self):
+        """Long text should be truncated"""
+        with patch("fastdeploy.logger.request_logger.envs") as mock_envs:
+            mock_envs.FD_LOG_MAX_LEN = 10
+            result = _truncate("this is a very long text")
+            self.assertEqual(result, "this is a ")
+            self.assertEqual(len(result), 10)
+
+    def test_non_string_converted(self):
+        """Non-string values should be converted to strings"""
+        with patch("fastdeploy.logger.request_logger.envs") as mock_envs:
+            mock_envs.FD_LOG_MAX_LEN = 100
+            result = _truncate(12345)
+            self.assertEqual(result, "12345")
+
+
+class TestLogRequest(unittest.TestCase):
+    """Tests for the log_request function"""
+
+    @patch("fastdeploy.logger.request_logger._request_logger")
+    def test_log_when_enabled(self, mock_logger):
+        """Should write a log entry when enabled"""
+        with patch("fastdeploy.logger.request_logger.envs") as mock_envs:
+            mock_envs.FD_LOG_REQUESTS = 1
+            mock_envs.FD_LOG_REQUESTS_LEVEL = 0
+            mock_envs.FD_LOG_MAX_LEN = 2048
+
+            log_request(level=0, message="test {value}", value="hello")
+            mock_logger.info.assert_called_once()
+            call_args = mock_logger.info.call_args[0][0]
+            self.assertEqual(call_args, "test hello")
+
+    @patch("fastdeploy.logger.request_logger._request_logger")
+    def test_no_log_when_disabled(self, mock_logger):
+        """Should not write a log entry when disabled"""
+        with patch("fastdeploy.logger.request_logger.envs") as mock_envs:
+            mock_envs.FD_LOG_REQUESTS = 0
+            mock_envs.FD_LOG_REQUESTS_LEVEL = 3
+
+            log_request(level=0, message="test {value}", value="hello")
+            mock_logger.info.assert_not_called()
+
+    @patch("fastdeploy.logger.request_logger._request_logger")
+    def test_no_log_when_level_too_high(self, mock_logger):
+        """Should not write a log entry when the level is too high"""
+        with patch("fastdeploy.logger.request_logger.envs") as mock_envs:
+            mock_envs.FD_LOG_REQUESTS = 1
+            mock_envs.FD_LOG_REQUESTS_LEVEL = 0
+
+            log_request(level=2, message="test {value}", value="hello")
+            mock_logger.info.assert_not_called()
+
+    @patch("fastdeploy.logger.request_logger._request_logger")
+    def test_l2_level_truncates_content(self, mock_logger):
+        """L2-level content should be truncated"""
+        with patch("fastdeploy.logger.request_logger.envs") as mock_envs:
+            mock_envs.FD_LOG_REQUESTS = 1
+            mock_envs.FD_LOG_REQUESTS_LEVEL = 3
+            mock_envs.FD_LOG_MAX_LEN = 5
+
+            log_request(level=2, message="content: {data}", data="very long data")
+            mock_logger.info.assert_called_once()
+            call_args = mock_logger.info.call_args[0][0]
+            self.assertEqual(call_args, "content: very ")
+
+    @patch("fastdeploy.logger.request_logger._request_logger")
+    def test_l0_level_no_truncation(self, mock_logger):
+        """L0-level content should not be truncated"""
+        with patch("fastdeploy.logger.request_logger.envs") as mock_envs:
+            mock_envs.FD_LOG_REQUESTS = 1
+            mock_envs.FD_LOG_REQUESTS_LEVEL = 3
+            mock_envs.FD_LOG_MAX_LEN = 5
+
+            log_request(level=0, message="content: {data}", data="very long data")
+            mock_logger.info.assert_called_once()
+            call_args = mock_logger.info.call_args[0][0]
+            self.assertEqual(call_args, "content: very long data")
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/tests/logger/test_setup_logging.py b/tests/logger/test_setup_logging.py
index 0bf02c14e88..4e45b013f45 100644
--- a/tests/logger/test_setup_logging.py
+++ b/tests/logger/test_setup_logging.py
@@ -54,7 +54,7 @@ def test_default_config_fallback(self):
         logger = logging.getLogger("fastdeploy")
         self.assertTrue(logger.handlers)
         handler_classes = [h.__class__.__name__ for h in logger.handlers]
-        self.assertIn("TimedRotatingFileHandler", handler_classes)
+        self.assertIn("LazyFileHandler", handler_classes)
 
     def test_debug_level_affects_handlers(self):
         """FD_DEBUG=1 should force DEBUG level"""
@@ -181,5 +181,61 @@ def test_filter_with_numeric_level(self):
         self.assertFalse(filter.filter(warning_record))
 
 
+class TestChannelLoggers(unittest.TestCase):
+    """Tests for the log channel configuration"""
+
+    def setUp(self):
+        self.temp_dir = tempfile.mkdtemp(prefix="logger_channel_test_")
+        if hasattr(setup_logging, "_configured"):
+            delattr(setup_logging, "_configured")
+        self.patches = [
+            patch("fastdeploy.envs.FD_LOG_DIR", self.temp_dir),
+            patch("fastdeploy.envs.FD_DEBUG", 0),
+            patch("fastdeploy.envs.FD_LOG_BACKUP_COUNT", "3"),
+            patch("fastdeploy.envs.FD_LOG_LEVEL", None),
+        ]
+        [p.start() for p in self.patches]
+
+    def tearDown(self):
+        [p.stop() for p in self.patches]
+        shutil.rmtree(self.temp_dir, ignore_errors=True)
+        if hasattr(setup_logging, "_configured"):
+            delattr(setup_logging, "_configured")
+
+    @patch("logging.config.dictConfig")
+    def test_request_channel_configured(self, mock_dict):
+        """The request channel should be configured with the request_file handler"""
+        setup_logging()
+        config_used = mock_dict.call_args[0][0]
+        self.assertIn("fastdeploy.request", config_used["loggers"])
+        self.assertIn("request_file", config_used["loggers"]["fastdeploy.request"]["handlers"])
+
+    @patch("logging.config.dictConfig")
+    def test_main_channel_configured(self, mock_dict):
+        """The main channel should be configured with the main_file and console_file handlers"""
+        setup_logging()
+        config_used = mock_dict.call_args[0][0]
+        self.assertIn("fastdeploy.main", config_used["loggers"])
+        self.assertIn("main_file", config_used["loggers"]["fastdeploy.main"]["handlers"])
+        self.assertIn("console_file", config_used["loggers"]["fastdeploy.main"]["handlers"])
+
+    @patch("logging.config.dictConfig")
+    def test_console_channel_configured(self, mock_dict):
+        """The console channel should be configured with the console_file and console_stdout handlers"""
+        setup_logging()
+        config_used = mock_dict.call_args[0][0]
+        self.assertIn("fastdeploy.console", config_used["loggers"])
+        self.assertIn("console_file", config_used["loggers"]["fastdeploy.console"]["handlers"])
+        self.assertIn("console_stdout", config_used["loggers"]["fastdeploy.console"]["handlers"])
+
+    @patch("logging.config.dictConfig")
+    def test_request_file_handler_configured(self, mock_dict):
+        """The request_file handler should write to request.log"""
+        setup_logging()
+        config_used = mock_dict.call_args[0][0]
+        self.assertIn("request_file", config_used["handlers"])
+        self.assertTrue(config_used["handlers"]["request_file"]["filename"].endswith("request.log"))
+
+
 if __name__ == "__main__":
     unittest.main()
diff --git a/tests/output/test_process_batch_output.py b/tests/output/test_process_batch_output.py
index 46282cd386a..b44eb489dbc 100644
--- a/tests/output/test_process_batch_output.py
+++ b/tests/output/test_process_batch_output.py
@@ -295,9 +295,9 @@ def test_process_batch_output_aborted_task_negative_token_speculative_decoding(s
         processor.tokens_counter[task_id] = 0
         processor.tokens_counter[task2.request_id] = 0
 
-        # Mock llm_logger to capture the log message and envs.ENABLE_V1_KVCACHE_SCHEDULER
+        # Mock log_request to capture the log message and envs.ENABLE_V1_KVCACHE_SCHEDULER
         with (
-            patch("fastdeploy.output.token_processor.llm_logger") as mock_logger,
+            patch("fastdeploy.output.token_processor.log_request") as mock_log_request,
             patch("fastdeploy.output.token_processor.envs.ENABLE_V1_KVCACHE_SCHEDULER", 0),
         ):
             # Call the method
@@ -306,7 +306,11 @@ def test_process_batch_output_aborted_task_negative_token_speculative_decoding(s
             # In speculative decoding mode, when accept_num[i] == PREEMPTED_TOKEN_ID,
             # the code logs "sync preemption" and continues without triggering abort recycling
             # This is the expected behavior for speculative decoding mode
-            mock_logger.info.assert_any_call(f"sync preemption for request_id {task_id} done.")
+            mock_log_request.assert_any_call(
+                level=0,
+                message="sync preemption for request_id {request_id} done.",
+                request_id=task_id,
+            )
             # Verify that _recycle_resources was NOT called for the aborted task
             # (it may be called for other tasks like test_request_2 if they receive EOS tokens)
             for call in processor._recycle_resources.call_args_list:
diff --git a/tests/output/test_process_batch_output_use_zmq.py b/tests/output/test_process_batch_output_use_zmq.py
index 07826e6f0eb..50ec77ccb69 100644
--- a/tests/output/test_process_batch_output_use_zmq.py
+++ b/tests/output/test_process_batch_output_use_zmq.py
@@ -171,16 +171,20 @@ def test_process_batch_output_use_zmq_aborted_task_negative_token(self):
         # Mock _recycle_resources to track if it's called
         self.processor._recycle_resources = MagicMock()
 
-        # Mock the llm_logger module and envs.ENABLE_V1_KVCACHE_SCHEDULER
+        # Mock the log_request function and envs.ENABLE_V1_KVCACHE_SCHEDULER
         with (
-            patch("fastdeploy.output.token_processor.llm_logger") as mock_logger,
+            
patch("fastdeploy.output.token_processor.log_request") as mock_log_request, patch("fastdeploy.output.token_processor.envs.ENABLE_V1_KVCACHE_SCHEDULER", 1), ): # Call the method result = self.processor._process_batch_output_use_zmq([stream_data]) - # Verify the recycling logic was triggered - mock_logger.info.assert_any_call(f"start to recycle abort request_id {task_id}") + # Verify the recycling logic was triggered via log_request + mock_log_request.assert_any_call( + level=0, + message="start to recycle abort request_id {request_id}", + request_id=task_id, + ) self.processor.resource_manager.recycle_abort_task.assert_called_once_with(task_id) self.assertNotIn(task_id, self.processor.resource_manager.to_be_aborted_req_id_set) self.assertEqual(len(result), 0) # Aborted task is skipped (continue) diff --git a/tests/scheduler/test_dp_scheduler.py b/tests/scheduler/test_dp_scheduler.py index 0e42c4491f3..e67d2571494 100644 --- a/tests/scheduler/test_dp_scheduler.py +++ b/tests/scheduler/test_dp_scheduler.py @@ -47,9 +47,13 @@ class MockEnv: sys.modules["fastdeploy.scheduler"] = Mock() sys.modules["fastdeploy.scheduler.local_scheduler"] = Mock() sys.modules["fastdeploy.scheduler.data"] = Mock() +sys.modules["fastdeploy.logger"] = Mock() +sys.modules["fastdeploy.logger.request_logger"] = Mock() # Mock the get_logger function sys.modules["fastdeploy.utils"].get_logger = Mock(return_value=mock_logger) +# Mock the log_request function +sys.modules["fastdeploy.logger.request_logger"].log_request = Mock() # Mock the Request, RequestOutput, and ScheduledResponse classes @@ -240,8 +244,9 @@ def test_initialization_with_custom_role(self): def test_put_results_with_finished_requests(self): """Test putting results with finished requests.""" - # Reset mock logger - mock_logger.reset_mock() + # Get the mock log_request function + mock_log_request = sys.modules["fastdeploy.logger.request_logger"].log_request + mock_log_request.reset_mock() # Create mock request outputs results = [ @@ -254,13 +259,12 @@ def test_put_results_with_finished_requests(self): with patch.object(self.scheduler, "responses_not_empty"): self.scheduler.put_results(results) - # Check that finished requests were logged - the logger should have been called - self.assertTrue(mock_logger.info.called) + # Check that finished requests were logged via log_request + self.assertTrue(mock_log_request.called) # Get the actual call arguments to verify the message format - call_args = mock_logger.info.call_args[0][0] - self.assertIn("finished responses", call_args) - self.assertIn("req1", call_args) - self.assertIn("req3", call_args) + call_kwargs = mock_log_request.call_args[1] + self.assertIn("finished responses", call_kwargs.get("message", "")) + self.assertIn("req1", str(call_kwargs.get("request_ids", []))) def test_put_results_with_new_responses(self): """Test putting results with new responses.""" diff --git a/tests/scheduler/test_local_scheduler.py b/tests/scheduler/test_local_scheduler.py index 48ef2844a09..1d55fe23b0d 100644 --- a/tests/scheduler/test_local_scheduler.py +++ b/tests/scheduler/test_local_scheduler.py @@ -449,12 +449,14 @@ def test_logging_put_results_finished(self): # Add request first self.scheduler.put_requests([self.mock_request_1]) - with patch.object(scheduler_logger, "info") as mock_info: + with patch("fastdeploy.scheduler.local_scheduler.log_request") as mock_log_request: mock_output = self._create_test_request_output("req_1", finished=True) self.scheduler.put_results([mock_output]) - # Should log finished response - 
self._assert_log_contains(mock_info, "finished responses") + # Should log finished response via log_request + mock_log_request.assert_called_once() + call_kwargs = mock_log_request.call_args[1] + self.assertIn("finished responses", call_kwargs.get("message", "")) if __name__ == "__main__":