feat(telegram): Use the sendMessageDraft API for streaming output in private chats #5726
camera-2018 wants to merge 1 commit into AstrBotDevs:master from
Conversation
- Add a `_send_message_draft` method wrapping the Telegram Bot API `sendMessageDraft`
- Use `sendMessageDraft` to push draft animations for private-chat streaming; group chats keep the `edit_message_text` fallback
- Use a dedicated async sender loop (`_draft_sender_loop`) that pushes the latest buffer contents at a fixed interval, fully decoupling token arrival speed from API network latency
- Send a real message after streaming ends to preserve the final content (drafts are ephemeral)
- Replace random generation with a module-level incrementing `draft_id` to keep the draft animation continuous on the Telegram side
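The sender-loop design in the last three bullets can be illustrated with a minimal asyncio sketch (all names here, such as `stream_with_sampling` and `fake_send`, are invented for illustration and are not the PR's actual code): a producer appends tokens to a shared buffer while an independent task samples that buffer at a fixed interval, so a slow send never blocks token arrival.

```python
import asyncio

async def stream_with_sampling(tokens, send, interval=0.05):
    """Tokens accumulate in a shared buffer while an independent task
    sends the latest snapshot at a fixed interval, so a slow `send`
    never blocks token arrival."""
    buffer = []
    done = False

    async def sender_loop():
        last = ""
        while not done:
            await asyncio.sleep(interval)
            snapshot = "".join(buffer)
            if snapshot and snapshot != last:
                await send(snapshot)  # may be slow; the producer keeps running
                last = snapshot

    task = asyncio.create_task(sender_loop())
    try:
        for t in tokens:  # stands in for the LLM token stream
            buffer.append(t)
            await asyncio.sleep(0.01)  # simulated token arrival
    finally:
        done = True
        await task
    final = "".join(buffer)
    await send(final)  # a real final message preserves the content
    return final

pushed = []

async def fake_send(text):
    pushed.append(text)

final = asyncio.run(stream_with_sampling(["Hel", "lo ", "wor", "ld"], fake_send))
print(final)  # Hello world
```

The last `send` call mirrors the PR's "send a real message at the end" step, since the periodic draft pushes are ephemeral.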
Summary of Changes
Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request aims to significantly improve the streaming-output user experience in Telegram private chats by leveraging the newly added
Highlights
Changelog
Activity
Code Review
Thanks for your contribution! This PR greatly improves the streaming-output experience in Telegram private chats by using the sendMessageDraft API; the design is clear and the implementation is complete.
I reviewed the code and have the following suggestions to help further improve its quality:

- In the `_send_streaming_draft` method, the logic for sending the final message is duplicated.
- In `_draft_sender_loop`, the exception handling for failed draft sends could be slightly improved by adding a log line to ease debugging.

See the inline comments for specific suggestions. Overall, this is a great update!
```python
except Exception:
    pass  # A failed draft send must not interrupt streaming
```
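As a concrete version of that suggestion (a hedged sketch only: `push_draft_safely` and `flaky_send` are invented names, and choosing debug level for the log is my assumption, not the reviewer's), the silent `except` can keep its don't-break-streaming behavior while still leaving a trace:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

async def push_draft_safely(send_draft, text):
    """Best-effort draft update: failures must not interrupt streaming,
    but they should leave a trace for debugging."""
    try:
        await send_draft(text)
        return True
    except Exception as e:
        # Drafts are ephemeral, so a lost update is harmless; log and move on.
        logger.debug("draft send failed, stream continues: %s", e)
        return False

async def _demo():
    async def flaky_send(_text):
        raise RuntimeError("network blip")
    return await push_draft_safely(flaky_send, "partial text")

ok = asyncio.run(_demo())
print(ok)  # False
```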
```python
if delta:
    try:
        markdown_text = telegramify_markdown.markdownify(
            delta,
            normalize_whitespace=False,
        )
        await self.client.send_message(
            text=markdown_text,
            parse_mode="MarkdownV2",
            **cast(Any, payload),
        )
    except Exception as e:
        logger.warning(f"Markdown conversion failed, falling back to plain text: {e!s}")
        await self.client.send_message(text=delta, **cast(Any, payload))
```
Hey - I've found 1 issue, and left some high level feedback:

- In `send_streaming`, you now fully handle streaming via `_send_streaming_draft` / `_send_streaming_edit` but still `return await super().send_streaming(generator, use_fallback)` afterward, which will run with an already-consumed generator; consider removing the `super()` call or restructuring to avoid double-handling.
- The global `_next_draft_id` counter in `_allocate_draft_id` is a simple module-level mutable state; if this code can be executed concurrently across threads or processes, you may want to make draft ID allocation explicitly process- or instance-scoped to avoid collisions.
- The `is_private` check uses `self.get_message_type() != MessageType.GROUP_MESSAGE` to decide whether to call `sendMessageDraft`; if `MessageType` can represent other non-private contexts (e.g., channels/supergroups), it might be safer to explicitly check for a private/DM type instead of relying on `!= GROUP_MESSAGE`.
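The first point can be sketched as follows (hedged: only `_send_streaming_draft` / `_send_streaming_edit` and the double-consumption concern come from the review; `Base`, `TelegramLike`, and `gen` are invented stand-ins). The fix is to return directly from whichever branch consumed the generator rather than falling through to `super().send_streaming(...)`:

```python
import asyncio

class Base:
    async def send_streaming(self, generator, use_fallback=False):
        # Falling through to this would re-consume an exhausted generator,
        # which is the bug the review warns about.
        return [chunk async for chunk in generator]

class TelegramLike(Base):
    def __init__(self, is_private):
        self.is_private = is_private

    async def send_streaming(self, generator, use_fallback=False):
        # Return from the branch that consumed the generator; do NOT also
        # call super().send_streaming() afterward.
        if self.is_private:
            return await self._send_streaming_draft(generator)
        return await self._send_streaming_edit(generator)

    async def _send_streaming_draft(self, generator):
        return "".join([c async for c in generator])

    async def _send_streaming_edit(self, generator):
        return "".join([c async for c in generator])

async def gen():
    for c in ("a", "b", "c"):
        yield c

result = asyncio.run(TelegramLike(is_private=True).send_streaming(gen()))
print(result)  # abc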
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `send_streaming`, you now fully handle streaming via `_send_streaming_draft` / `_send_streaming_edit` but still `return await super().send_streaming(generator, use_fallback)` afterward, which will run with an already-consumed generator; consider removing the `super()` call or restructuring to avoid double-handling.
- The global `_next_draft_id` counter in `_allocate_draft_id` is a simple module-level mutable state; if this code can be executed concurrently across threads or processes, you may want to make draft ID allocation explicitly process- or instance-scoped to avoid collisions.
- The `is_private` check uses `self.get_message_type() != MessageType.GROUP_MESSAGE` to decide whether to call `sendMessageDraft`; if `MessageType` can represent other non-private contexts (e.g., channels/supergroups), it might be safer to explicitly check for a private/DM type instead of relying on `!= GROUP_MESSAGE`.
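The last overall comment can be made concrete like this (a sketch only: the enum values below are illustrative stand-ins, not necessarily AstrBot's real `MessageType` members). Opting in explicitly for the private type means unknown or non-private contexts never hit `sendMessageDraft`:

```python
from enum import Enum, auto

class MessageType(Enum):
    # Illustrative values only; the real enum lives in AstrBot.
    FRIEND_MESSAGE = auto()   # private / DM
    GROUP_MESSAGE = auto()
    OTHER_MESSAGE = auto()    # e.g. channel posts

def should_use_draft(message_type: MessageType) -> bool:
    # Check for the private type explicitly instead of excluding groups,
    # so any future non-private context stays on the safe fallback path.
    return message_type is MessageType.FRIEND_MESSAGE

print(should_use_draft(MessageType.FRIEND_MESSAGE))  # True
print(should_use_draft(MessageType.OTHER_MESSAGE))   # False
```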
## Individual Comments
### Comment 1
<location path="astrbot/core/platform/sources/telegram/tg_event.py" line_range="426" />
<code_context>
+
+ return await super().send_streaming(generator, use_fallback)
+
+ async def _send_streaming_draft(
+ self,
+ user_name: str,
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the shared MessageChain item handling, simplifying the draft sender loop, and localizing draft ID allocation to reduce duplication and make the streaming logic easier to follow and maintain.
You can reduce complexity and duplication without changing behavior by:
### 1. Extracting shared `MessageChain` processing
The inner `for i in chain.chain` loop is essentially duplicated between `_send_streaming_draft` and `_send_streaming_edit`. You can factor it out into a small helper that appends text and sends media; the caller just passes how to accumulate text and any extra context.
```python
async def _process_message_chain_items(
self,
chain: MessageChain,
payload: dict[str, Any],
user_name: str,
message_thread_id: str | None,
append_text: Callable[[str], None],
) -> None:
for i in chain.chain:
if isinstance(i, Plain):
append_text(i.text)
elif isinstance(i, Image):
image_path = await i.convert_to_file_path()
await self._send_media_with_action(
self.client,
ChatAction.UPLOAD_PHOTO,
self.client.send_photo,
user_name=user_name,
photo=image_path,
**cast(Any, payload),
)
elif isinstance(i, File):
path = await i.get_file()
name = i.name or os.path.basename(path)
await self._send_media_with_action(
self.client,
ChatAction.UPLOAD_DOCUMENT,
self.client.send_document,
user_name=user_name,
document=path,
filename=name,
**cast(Any, payload),
)
elif isinstance(i, Record):
path = await i.convert_to_file_path()
await self._send_voice_with_fallback(
self.client,
path,
payload,
caption=i.text or None,
user_name=user_name,
message_thread_id=message_thread_id,
use_media_action=True,
)
elif isinstance(i, Video):
path = await i.convert_to_file_path()
await self._send_media_with_action(
self.client,
ChatAction.UPLOAD_VIDEO,
self.client.send_video,
user_name=user_name,
video=path,
**cast(Any, payload),
)
else:
            logger.warning(f"Unsupported message type: {type(i)}")
```
Then in both streaming functions:
```python
# in _send_streaming_draft
async for chain in generator:
if isinstance(chain, MessageChain):
if chain.type == "break":
# existing break handling...
...
continue
await self._process_message_chain_items(
chain,
payload,
user_name,
message_thread_id,
        append_text=_append_delta,  # a small closure doing `nonlocal delta; delta += t` (str has no __iadd__, so a bare lambda cannot grow `delta`)
)
```
```python
# in _send_streaming_edit
async for chain in generator:
if isinstance(chain, MessageChain):
if chain.type == "break":
# existing break handling...
...
continue
await self._process_message_chain_items(
chain,
payload,
user_name,
message_thread_id,
        append_text=_append_delta,  # same closure as above; str has no __iadd__
)
```
This keeps all behavior but removes the large duplicated `Plain/Image/File/Record/Video` branches, making both methods focused on how text is streamed (draft vs edit).
### 2. Simplifying the draft sender loop (no restart per segment)
You can keep a single sender loop for the entire `_send_streaming_draft` call and avoid cancelling/restarting the task on each `break`. Let the loop:
- watch a `current_draft_id`
- send whatever `delta` contains at fixed intervals
- stop only once at the end of the generator
On `break`, you only need to send the final real message, clear `delta`, and update `current_draft_id`; the loop picks up the new state automatically.
```python
async def _send_streaming_draft(...):
draft_id = _allocate_draft_id()
delta = ""
last_sent_text = ""
send_interval = 0.5
done = False # generator finished
async def _draft_sender_loop() -> None:
nonlocal last_sent_text, draft_id
while not done:
await asyncio.sleep(send_interval)
if delta and delta != last_sent_text:
draft_text = delta[: self.MAX_MESSAGE_LENGTH]
if draft_text != last_sent_text:
try:
await self._send_message_draft(
user_name,
draft_id, # always use latest draft_id
draft_text,
message_thread_id,
)
last_sent_text = draft_text
except Exception:
pass
sender_task = asyncio.create_task(_draft_sender_loop())
try:
async for chain in generator:
if not isinstance(chain, MessageChain):
continue
if chain.type == "break":
# flush current segment as real message
if delta:
await self._send_final_segment(delta, payload)
# reset state for next segment; loop keeps running
delta = ""
last_sent_text = ""
draft_id = _allocate_draft_id()
continue
await self._process_message_chain_items(
chain,
payload,
user_name,
message_thread_id,
                append_text=_append_delta,  # closure doing `nonlocal delta; delta += t`; str has no __iadd__
)
finally:
done = True
await sender_task
if delta:
await self._send_final_segment(delta, payload)
async def _send_final_segment(self, delta: str, payload: dict[str, Any]) -> None:
try:
markdown_text = telegramify_markdown.markdownify(
delta,
normalize_whitespace=False,
)
await self.client.send_message(
text=markdown_text,
parse_mode="MarkdownV2",
**cast(Any, payload),
)
except Exception as e:
        logger.warning(f"Markdown conversion failed, falling back to plain text: {e!s}")
await self.client.send_message(text=delta, **cast(Any, payload))
```
This keeps the “periodically send latest buffer” semantics and segment-by-segment `draft_id` updates, but removes:
- `streaming_done` toggling
- task cancellation / recreation on each `break`
- duplicated “send final markdown vs plain text” logic (moved to `_send_final_segment`).
### 3. Localizing draft ID state
If possible in your object model, you can avoid the module‑level `global` counter by attaching it to the class/instance (still preserves wraparound behavior):
```python
class TelegramPlatformEvent(AstrMessageEvent):
_TELEGRAM_DRAFT_ID_MAX = 2_147_483_647
_next_draft_id: int = 0 # class-level or move to __init__ as self._next_draft_id
@classmethod
def _allocate_draft_id(cls) -> int:
cls._next_draft_id = (
1 if cls._next_draft_id >= cls._TELEGRAM_DRAFT_ID_MAX else cls._next_draft_id + 1
)
return cls._next_draft_id
```
Then in `_send_streaming_draft`:
```python
draft_id = self._allocate_draft_id()
...
draft_id = self._allocate_draft_id()
```
This removes `global` and makes the draft ID evolution easier to reason about (and to override/mock in tests) while keeping the same integer behavior.
</issue_to_address>
Telegram's existing streaming approach uses `send_message` + `edit_message_text`, which suffers from message flicker, push-notification noise, and API limits on edit frequency. Telegram Bot API v9.3 adds `sendMessageDraft`, which pushes a draft preview animation in a private chat without creating a real message, giving an experience much closer to a "typewriter" effect. (Requires enabling the streaming-output switch under AI Settings > Other Settings.)
Demo on mobile
9ac9334fbbc8ef5c8bca0ab929b4d756.mp4
Demo on desktop
a80aa613f787f2f2dfc3f52d5c0e7794.mp4
Modifications / 改动点
`astrbot/core/platform/sources/telegram/tg_event.py`

- Add a module-level `_allocate_draft_id()` function that generates a globally incrementing `draft_id`, so draft changes on the Telegram side render as an animation
- Add a `_send_message_draft()` method wrapping the `Bot.send_message_draft` API call
- Add a `_send_streaming_draft()` method for private-chat streaming: a dedicated sender loop (`_draft_sender_loop`) takes the latest buffer contents every 0.5s and sends a draft, fully decoupling LLM token arrival speed from Telegram API network latency; the final content is preserved with `send_message` (MarkdownV2) since drafts are ephemeral; `break` separators are handled correctly
- Modify the `send_streaming()` entry point: private chats automatically use `_send_streaming_draft`, group chats fall back to `_send_streaming_edit`

This is NOT a breaking change. / 这不是一个破坏性变更。
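The module-level allocator described above can be sketched like this (the wraparound cap mirrors the 2_147_483_647 constant in the review's suggested refactor; everything else is illustrative). A bare global is adequate for single-process asyncio use, with the review's caveat about multi-process deployments noted:

```python
# Draft ids stay within the signed 32-bit range and wrap back to 1 at the cap.
_TELEGRAM_DRAFT_ID_MAX = 2_147_483_647
_next_draft_id = 0

def _allocate_draft_id() -> int:
    """Monotonically increasing draft id. Reusing increasing ids (instead of
    random ones) keeps Telegram's draft animation continuous across updates."""
    global _next_draft_id
    _next_draft_id = 1 if _next_draft_id >= _TELEGRAM_DRAFT_ID_MAX else _next_draft_id + 1
    return _next_draft_id

first = _allocate_draft_id()   # first id issued
second = _allocate_draft_id()  # increments by one

_next_draft_id = _TELEGRAM_DRAFT_ID_MAX  # simulate reaching the cap
wrapped = _allocate_draft_id()           # wraps back to 1
print(first, second, wrapped)  # 1 2 1
```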
Screenshots or Test Results / 运行截图或测试结果
Checklist / 检查清单
- I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in `requirements.txt` and `pyproject.toml`.