diff --git a/astrbot/core/platform/sources/telegram/tg_event.py b/astrbot/core/platform/sources/telegram/tg_event.py index ade72f110..1fdf3c944 100644 --- a/astrbot/core/platform/sources/telegram/tg_event.py +++ b/astrbot/core/platform/sources/telegram/tg_event.py @@ -22,6 +22,19 @@ ) from astrbot.api.platform import AstrBotMessage, MessageType, PlatformMetadata +# sendMessageDraft 的 draft_id 模块级递增计数器(溢出时归 1) +_TELEGRAM_DRAFT_ID_MAX = 2_147_483_647 +_next_draft_id = 0 + + +def _allocate_draft_id() -> int: + """分配一个全局递增的 draft_id,溢出时归 1。""" + global _next_draft_id + _next_draft_id = ( + 1 if _next_draft_id >= _TELEGRAM_DRAFT_ID_MAX else _next_draft_id + 1 + ) + return _next_draft_id + class TelegramPlatformEvent(AstrMessageEvent): # Telegram 的最大消息长度限制 @@ -339,6 +352,44 @@ async def react(self, emoji: str | None, big: bool = False) -> None: except Exception as e: logger.error(f"[Telegram] 添加反应失败: {e}") + async def _send_message_draft( + self, + chat_id: str, + draft_id: int, + text: str, + message_thread_id: str | None = None, + parse_mode: str | None = None, + ) -> None: + """通过 Bot.send_message_draft 发送草稿消息(流式推送部分消息)。 + + 该 API 仅支持私聊。 + + Args: + chat_id: 目标私聊的 chat_id + draft_id: 草稿唯一标识,非零整数;相同 draft_id 的变更会以动画展示 + text: 消息文本,1-4096 字符 + message_thread_id: 可选,目标消息线程 ID + parse_mode: 可选,消息文本的解析模式 + """ + kwargs: dict[str, Any] = {} + if message_thread_id: + kwargs["message_thread_id"] = int(message_thread_id) + if parse_mode: + kwargs["parse_mode"] = parse_mode + + try: + logger.debug( + f"[Telegram] sendMessageDraft: chat_id={chat_id}, draft_id={draft_id}, text_len={len(text)}" + ) + await self.client.send_message_draft( + chat_id=int(chat_id), + draft_id=draft_id, + text=text, + **kwargs, + ) + except Exception as e: + logger.warning(f"[Telegram] sendMessageDraft 失败: {e!s}") + async def send_streaming(self, generator, use_fallback: bool = False): message_thread_id = None @@ -356,6 +407,179 @@ async def send_streaming(self, generator, use_fallback: bool = False): if message_thread_id: payload["message_thread_id"] = message_thread_id + # sendMessageDraft 仅支持私聊 + is_private = self.get_message_type() != MessageType.GROUP_MESSAGE + + if is_private: + logger.info("[Telegram] 流式输出: 使用 sendMessageDraft (私聊)") + await self._send_streaming_draft( + user_name, message_thread_id, payload, generator + ) + else: + logger.info("[Telegram] 流式输出: 使用 edit_message_text fallback (群聊)") + await self._send_streaming_edit( + user_name, message_thread_id, payload, generator + ) + + return await super().send_streaming(generator, use_fallback) + + async def _send_streaming_draft( + self, + user_name: str, + message_thread_id: str | None, + payload: dict[str, Any], + generator, + ) -> None: + """使用 sendMessageDraft API 进行流式推送(私聊专用)。 + + 流式过程中使用 sendMessageDraft 推送草稿动画, + 流式结束后发送一条真实消息保留最终内容(draft 是临时的,会消失)。 + 使用独立的异步发送循环,按固定间隔发送最新缓冲区内容, + 完全解耦 token 到达速度与 API 网络延迟。 + """ + draft_id = _allocate_draft_id() + delta = "" + last_sent_text = "" + send_interval = 0.5 # 独立发送循环间隔 (秒) + streaming_done = False # 信号:生成器已结束 + + async def _draft_sender_loop() -> None: + """独立的草稿发送循环,按固定间隔发送最新内容。""" + nonlocal last_sent_text + while not streaming_done: + await asyncio.sleep(send_interval) + if delta and delta != last_sent_text: + draft_text = delta[: self.MAX_MESSAGE_LENGTH] + if draft_text != last_sent_text: + try: + await self._send_message_draft( + user_name, + draft_id, + draft_text, + message_thread_id, + ) + last_sent_text = draft_text + except Exception: + pass # 草稿发送失败不影响流式 + + # 启动独立发送循环 + sender_task = asyncio.create_task(_draft_sender_loop()) + + try: + async for chain in generator: + if isinstance(chain, MessageChain): + if chain.type == "break": + # 分割符:停止发送循环,发送真实消息,重置状态 + streaming_done = True + await sender_task + if delta: + try: + markdown_text = telegramify_markdown.markdownify( + delta, + normalize_whitespace=False, + ) + await self.client.send_message( + text=markdown_text, + parse_mode="MarkdownV2", + **cast(Any, payload), + ) + except Exception as e: + logger.warning(f"Markdown转换失败,使用普通文本: {e!s}") + await self.client.send_message( + text=delta, **cast(Any, payload) + ) + # 重置并启动新的发送循环 + delta = "" + last_sent_text = "" + draft_id = _allocate_draft_id() + streaming_done = False + sender_task = asyncio.create_task(_draft_sender_loop()) + continue + + # 处理消息链中的每个组件 + for i in chain.chain: + if isinstance(i, Plain): + delta += i.text + elif isinstance(i, Image): + image_path = await i.convert_to_file_path() + await self._send_media_with_action( + self.client, + ChatAction.UPLOAD_PHOTO, + self.client.send_photo, + user_name=user_name, + photo=image_path, + **cast(Any, payload), + ) + continue + elif isinstance(i, File): + path = await i.get_file() + name = i.name or os.path.basename(path) + await self._send_media_with_action( + self.client, + ChatAction.UPLOAD_DOCUMENT, + self.client.send_document, + user_name=user_name, + document=path, + filename=name, + **cast(Any, payload), + ) + continue + elif isinstance(i, Record): + path = await i.convert_to_file_path() + await self._send_voice_with_fallback( + self.client, + path, + payload, + caption=i.text or delta or None, + user_name=user_name, + message_thread_id=message_thread_id, + use_media_action=True, + ) + continue + elif isinstance(i, Video): + path = await i.convert_to_file_path() + await self._send_media_with_action( + self.client, + ChatAction.UPLOAD_VIDEO, + self.client.send_video, + user_name=user_name, + video=path, + **cast(Any, payload), + ) + continue + else: + logger.warning(f"不支持的消息类型: {type(i)}") + continue + finally: + # 停止发送循环 + streaming_done = True + if not sender_task.done(): + await sender_task + + # 流式结束:发送真实消息保留最终内容 + if delta: + try: + markdown_text = telegramify_markdown.markdownify( + delta, + normalize_whitespace=False, + ) + await self.client.send_message( + text=markdown_text, + parse_mode="MarkdownV2", + **cast(Any, payload), + ) + except Exception as e: + logger.warning(f"Markdown转换失败,使用普通文本: {e!s}") + await self.client.send_message(text=delta, **cast(Any, payload)) + + async def _send_streaming_edit( + self, + user_name: str, + message_thread_id: str | None, + payload: dict[str, Any], + generator, + ) -> None: + """使用 send_message + edit_message_text 进行流式推送(群聊 fallback)。""" delta = "" current_content = "" message_id = None @@ -506,5 +730,3 @@ async def send_streaming(self, generator, use_fallback: bool = False): ) except Exception as e: logger.warning(f"编辑消息失败(streaming): {e!s}") - - return await super().send_streaming(generator, use_fallback) diff --git a/pyproject.toml b/pyproject.toml index e57a0216c..d981c2470 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,7 +39,7 @@ dependencies = [ "pydantic>=2.12.5", "pydub>=0.25.1", "pyjwt>=2.10.1", - "python-telegram-bot>=22.0", + "python-telegram-bot>=22.6", "qq-botpy>=1.2.1", "quart>=0.20.0", "readability-lxml>=0.8.4.1", diff --git a/requirements.txt b/requirements.txt index c06c1d0f2..d76a11dde 100644 --- a/requirements.txt +++ b/requirements.txt @@ -32,7 +32,7 @@ py-cord>=2.6.1 pydantic>=2.12.5 pydub>=0.25.1 pyjwt>=2.10.1 -python-telegram-bot>=22.0 +python-telegram-bot>=22.6 qq-botpy>=1.2.1 quart>=0.20.0 readability-lxml>=0.8.4.1 diff --git a/tests/fixtures/mocks/telegram.py b/tests/fixtures/mocks/telegram.py index fbe4d0436..904ec4d09 100644 --- a/tests/fixtures/mocks/telegram.py +++ b/tests/fixtures/mocks/telegram.py @@ -110,6 +110,7 @@ def create_bot(): bot.set_my_commands = AsyncMock() bot.set_message_reaction = AsyncMock() bot.edit_message_text = AsyncMock() + bot.send_message_draft = AsyncMock() return bot @staticmethod