5 changes: 3 additions & 2 deletions fastdeploy/engine/async_llm.py
@@ -37,6 +37,7 @@
from fastdeploy.input.preprocess import InputPreprocessor
from fastdeploy.inter_communicator import IPCSignal
from fastdeploy.inter_communicator.zmq_client import ZmqIpcClient
+ from fastdeploy.logger.request_logger import log_request_error
from fastdeploy.metrics.metrics import main_process_metrics
from fastdeploy.utils import EngineError, envs, llm_logger

@@ -562,7 +563,7 @@ async def generate(
llm_logger.info(f"Request {conn_request_id} generator exit (outer)")
return
except Exception as e:
llm_logger.error(f"Request {conn_request_id} failed: {e}")
log_request_error(message="Request {request_id} failed: {error}", request_id=conn_request_id, error=e)
raise EngineError(str(e), error_code=500) from e
finally:
# Ensure request_map/request_num are cleaned up
@@ -584,7 +585,7 @@ async def abort_request(self, request_id: str) -> None:
await self.connection_manager.cleanup_request(request_id)
llm_logger.info(f"Aborted request {request_id}")
except Exception as e:
llm_logger.error(f"Failed to abort request {request_id}: {e}")
log_request_error(message="Failed to abort request {request_id}: {error}", request_id=request_id, error=e)

async def shutdown(self):
"""
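The hunks above and the files below all rely on two helpers from the new `fastdeploy.logger.request_logger` module, whose implementation is not included in this diff. A minimal sketch of what the call sites imply, assuming the helpers render `{placeholder}` templates with `str.format(**kwargs)` and treat the integer `level` as a verbosity tier (1 = most important); the `FD_REQUEST_LOG_VERBOSITY` knob below is hypothetical:

```python
# Hypothetical sketch of fastdeploy/logger/request_logger.py -- the real
# module is not part of this diff. Assumes {placeholder}-style templates
# rendered via str.format and a verbosity threshold for log_request.
import logging
import os

_logger = logging.getLogger("fastdeploy.request")

# Hypothetical knob: messages whose level exceeds this are dropped.
_VERBOSITY = int(os.getenv("FD_REQUEST_LOG_VERBOSITY", "2"))


def log_request(level: int, message: str, **kwargs) -> None:
    """Render the template with kwargs; skip if level is above the threshold."""
    if level <= _VERBOSITY:
        _logger.info(message.format(**kwargs))


def log_request_error(message: str, **kwargs) -> None:
    """Render the template with kwargs and log it at ERROR severity."""
    _logger.error(message.format(**kwargs))
```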
47 changes: 36 additions & 11 deletions fastdeploy/engine/engine.py
@@ -44,6 +44,7 @@
from fastdeploy.engine.expert_service import start_data_parallel_service
from fastdeploy.engine.request import Request
from fastdeploy.inter_communicator import EngineWorkerQueue, IPCSignal
+ from fastdeploy.logger.request_logger import log_request, log_request_error
from fastdeploy.metrics.metrics import main_process_metrics
from fastdeploy.platforms import current_platform
from fastdeploy.utils import EngineError, console_logger, envs, llm_logger
@@ -285,7 +286,7 @@ def add_requests(self, task, sampling_params=None, **kwargs):
# Create Request struct after processing
request = Request.from_dict(task)
request.metrics.scheduler_recv_req_time = time.time()
llm_logger.info(f"Receive request {request}")
log_request(level=2, message="Receive request {request}", request=request)
request.metrics.preprocess_start_time = time.time()

request.prompt_token_ids_len = len(request.prompt_token_ids)
@@ -304,12 +305,20 @@ def add_requests(self, task, sampling_params=None, **kwargs):
f"Input text is too long, length of prompt token({input_ids_len}) "
f"+ min_dec_len ({min_tokens}) >= max_model_len "
)
- llm_logger.error(error_msg)
+ log_request_error(
+     message="Input text is too long, length of prompt token({input_ids_len}) + min_dec_len ({min_tokens}) >= max_model_len",
+     input_ids_len=input_ids_len,
+     min_tokens=min_tokens,
+ )
raise EngineError(error_msg, error_code=400)

if input_ids_len > self.cfg.model_config.max_model_len:
error_msg = f"Length of input token({input_ids_len}) exceeds the limit max_model_len({self.cfg.model_config.max_model_len})."
- llm_logger.error(error_msg)
+ log_request_error(
+     message="Length of input token({input_ids_len}) exceeds the limit max_model_len({max_model_len}).",
+     input_ids_len=input_ids_len,
+     max_model_len=self.cfg.model_config.max_model_len,
+ )
raise EngineError(error_msg, error_code=400)

if request.get("stop_seqs_len") is not None:
@@ -320,7 +329,11 @@ def add_requests(self, task, sampling_params=None, **kwargs):
f"Length of stop ({stop_seqs_len}) exceeds the limit max_stop_seqs_num({max_stop_seqs_num})."
"Please reduce the number of stop or set a lager max_stop_seqs_num by `FD_MAX_STOP_SEQS_NUM`"
)
- llm_logger.error(error_msg)
+ log_request_error(
+     message="Length of stop ({stop_seqs_len}) exceeds the limit max_stop_seqs_num({max_stop_seqs_num}).",
+     stop_seqs_len=stop_seqs_len,
+     max_stop_seqs_num=max_stop_seqs_num,
+ )
raise EngineError(error_msg, error_code=400)
stop_seqs_max_len = envs.FD_STOP_SEQS_MAX_LEN
for single_stop_seq_len in stop_seqs_len:
@@ -329,7 +342,11 @@ def add_requests(self, task, sampling_params=None, **kwargs):
f"Length of stop_seqs({single_stop_seq_len}) exceeds the limit stop_seqs_max_len({stop_seqs_max_len})."
"Please reduce the length of stop sequences or set a larger stop_seqs_max_len by `FD_STOP_SEQS_MAX_LEN`"
)
- llm_logger.error(error_msg)
+ log_request_error(
+     message="Length of stop_seqs({single_stop_seq_len}) exceeds the limit stop_seqs_max_len({stop_seqs_max_len}).",
+     single_stop_seq_len=single_stop_seq_len,
+     stop_seqs_max_len=stop_seqs_max_len,
+ )
raise EngineError(error_msg, error_code=400)

if self._has_guided_input(request):
@@ -342,14 +359,18 @@ def add_requests(self, task, sampling_params=None, **kwargs):
request, err_msg = self.guided_decoding_checker.schema_format(request)

if err_msg is not None:
- llm_logger.error(err_msg)
+ log_request_error(message="guided decoding error: {err_msg}", err_msg=err_msg)
raise EngineError(err_msg, error_code=400)

request.metrics.preprocess_end_time = time.time()
request.metrics.scheduler_recv_req_time = time.time()
self.engine.scheduler.put_requests([request])
llm_logger.info(f"Cache task with request_id ({request.get('request_id')})")
llm_logger.debug(f"cache task: {request}")
log_request(
level=1,
message="Cache task with request_id ({request_id})",
request_id=request.get("request_id"),
)
log_request(level=3, message="cache task: {request}", request=request)

def _worker_processes_ready(self):
"""
@@ -715,11 +736,15 @@ def generate(self, prompts, stream):
Yields:
dict: The generated response.
"""
llm_logger.info(f"Starting generation for prompt: {prompts}")
log_request(level=2, message="Starting generation for prompt: {prompts}", prompts=prompts)
try:
req_id = self._format_and_add_data(prompts)
except Exception as e:
llm_logger.error(f"Error happened while adding request, details={e}, {str(traceback.format_exc())}")
log_request_error(
message="Error happened while adding request, details={error}, {traceback}",
error=str(e),
traceback=traceback.format_exc(),
)
raise EngineError(str(e), error_code=400)

# Get the result of the current request
@@ -738,7 +763,7 @@ def generate(self, prompts, stream):
output = self.engine.data_processor.process_response_dict(
result.to_dict(), stream=False, include_stop_str_in_output=False, direct_decode=not stream
)
llm_logger.debug(f"Generate result: {output}")
log_request(level=3, message="Generate result: {output}", output=output)
if not stream:
yield output
else:
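Assuming the `str.format`-based templating sketched earlier, a call site like the `max_model_len` check above renders its keyword arguments directly into the message. The values here are illustrative, not taken from a real run:

```python
# Illustrative values; the output shape assumes message.format(**kwargs)
# semantics as sketched above.
log_request_error(
    message="Length of input token({input_ids_len}) exceeds the limit max_model_len({max_model_len}).",
    input_ids_len=8192,
    max_model_len=4096,
)
# Would log: "Length of input token(8192) exceeds the limit max_model_len(4096)."
```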
14 changes: 7 additions & 7 deletions fastdeploy/engine/request.py
@@ -39,7 +39,7 @@
StructuralTagResponseFormat,
ToolCall,
)
- from fastdeploy.utils import data_processor_logger
+ from fastdeploy.logger.request_logger import log_request, log_request_error
from fastdeploy.worker.output import (
LogprobsLists,
PromptLogprobs,
@@ -313,15 +313,13 @@ def from_generic_request(
), "The parameter `raw_request` is not supported now, please use completion api instead."
for key, value in req.metadata.items():
setattr(request, key, value)
- from fastdeploy.utils import api_server_logger
-
- api_server_logger.warning("The parameter metadata is obsolete.")
+ log_request(level=1, message="The parameter metadata is obsolete.")

return request

@classmethod
def from_dict(cls, d: dict):
- data_processor_logger.debug(f"{d}")
+ log_request(level=3, message="{request}", request=d)
sampling_params: SamplingParams = None
pooling_params: PoolingParams = None
metrics: RequestMetrics = None
@@ -352,8 +350,10 @@ def from_dict(cls, d: dict):
ImagePosition(**mm_pos) if not isinstance(mm_pos, ImagePosition) else mm_pos
)
except Exception as e:
- data_processor_logger.error(
-     f"Convert mm_positions to ImagePosition error: {e}, {str(traceback.format_exc())}"
+ log_request_error(
+     message="Convert mm_positions to ImagePosition error: {error}, {traceback}",
+     error=str(e),
+     traceback=traceback.format_exc(),
)
return cls(
request_id=d["request_id"],
7 changes: 4 additions & 3 deletions fastdeploy/entrypoints/api_server.py
@@ -23,6 +23,7 @@

from fastdeploy.engine.args_utils import EngineArgs
from fastdeploy.engine.engine import LLMEngine
+ from fastdeploy.logger.request_logger import log_request, log_request_error
from fastdeploy.utils import (
FlexibleArgumentParser,
api_server_logger,
@@ -61,7 +62,7 @@ async def generate(request: dict):
"""
generate stream api
"""
- api_server_logger.info(f"Receive request: {request}")
+ log_request(level=3, message="Receive request: {request}", request=request)
stream = request.get("stream", 0)

if not stream:
@@ -72,7 +73,7 @@ async def generate(request: dict):
output = result
except Exception as e:
# Log the full exception with stack trace
- api_server_logger.error(f"Error during generation: {e!s}", exc_info=True)
+ log_request_error(message="Error during generation: {error}", error=str(e))
# Return a structured error message and end the stream
output = {"error": str(e), "error_type": e.__class__.__name__}
return output
@@ -84,7 +85,7 @@ async def event_generator():
yield f"data: {json.dumps(result)}\n\n"
except Exception as e:
# Log the full exception with stack trace
- api_server_logger.error(f"Error during generation: {e!s}", exc_info=True)
+ log_request_error(message="Error during generation: {error}", error=str(e))
# Return a structured error message and end the stream
error_msg = {"error": str(e), "error_type": e.__class__.__name__}
yield f"data: {json.dumps(error_msg)}\n\n"
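One detail of the api_server.py hunks: the old `api_server_logger.error(..., exc_info=True)` calls captured the stack trace automatically, while `log_request_error` as called here receives only `str(e)`. Where the traceback matters, the explicit pattern already used in the engine.py hunks carries it through; a sketch, not part of this diff:

```python
# Sketch: passing the traceback explicitly, mirroring the engine.py hunks.
import traceback

try:
    1 / 0  # stand-in for the generation call
except Exception as e:
    log_request_error(
        message="Error during generation: {error}, {traceback}",
        error=str(e),
        traceback=traceback.format_exc(),
    )
```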