feat: Support syncing MCP servers from the MCPRouter platform #5690
Conversation
(cherry picked from commit 466580c16d335fcea34c5e98579754c992f9bc6f) (cherry picked from commit e6f7894)
(cherry picked from commit 3bbf33dd3d8c05210767bb40babca31f5363ae3f) (cherry picked from commit 30392fb)
(cherry picked from commit 447c221963fdbca66453e9031d6133c010cc4b4b) (cherry picked from commit e068be8)
Summary of Changes
Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a new MCP server sync provider abstraction layer, greatly strengthening the system's ability to manage and integrate different MCP server platforms. By modularizing the sync logic, support for new platforms (such as MCPRouter) can now be added easily, and the existing ModelScope sync flow has been improved. In addition, error handling and logging in the MCP client have been improved, and the UI has been updated to offer a more intuitive MCPRouter server sync experience, improving overall usability and maintainability.
Highlights
Changelog
Hey - I've found 1 issue, and left some high-level feedback:
- The HTTP error formatting and `_quick_test_mcp_connection` logic are now duplicated between `astrbot/core/agent/mcp_client.py` and `astrbot/core/provider/func_tool_manager.py`; consider extracting a shared helper to avoid drift and keep error handling consistent.
- `McpServerSyncProvider.list_servers` currently raises `NotImplementedError` but is not marked abstract; either make it `@abc.abstractmethod` or provide a no-op default implementation (e.g., returning an empty list) so that subclasses don't accidentally inherit a method that will fail at runtime.
- In `McpRouterMcpServerSyncProvider.fetch`, headers for MCP calls (`mcp_headers`) are constructed separately from `_build_api_headers`, leading to duplicated logic; reusing `_build_api_headers` here would simplify the implementation and reduce the chance of header divergence.
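The `list_servers` point can be illustrated with a minimal, hypothetical reduction of the base class (the names mirror the PR, but the bodies are placeholders): marking the method with `@abc.abstractmethod` turns a late runtime failure into an instantiation-time `TypeError`:

```python
import abc
from typing import Any, ClassVar


class McpServerSyncProvider(abc.ABC):
    # Hypothetical reduction of the class under review.
    provider: ClassVar[str]

    @abc.abstractmethod
    async def fetch(self, payload: dict[str, Any]) -> list[Any]: ...

    @abc.abstractmethod
    async def list_servers(self, payload: dict[str, Any]) -> list[Any]: ...


class IncompleteProvider(McpServerSyncProvider):
    provider = "demo"

    async def fetch(self, payload: dict[str, Any]) -> list[Any]:
        return []
    # list_servers is missing, so instantiation raises TypeError
    # instead of the method silently failing when first awaited.


try:
    IncompleteProvider()
except TypeError as exc:
    print(f"rejected: {exc}")
```

This way a subclass that forgets to implement `list_servers` is rejected as soon as it is constructed, rather than when the sync path is first exercised.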
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The HTTP error formatting and `_quick_test_mcp_connection` logic are now duplicated between `astrbot/core/agent/mcp_client.py` and `astrbot/core/provider/func_tool_manager.py`; consider extracting a shared helper to avoid drift and keep error handling consistent.
- `McpServerSyncProvider.list_servers` currently raises `NotImplementedError` but is not marked abstract; either make it `@abc.abstractmethod` or provide a no-op default implementation (e.g., returning an empty list) so that subclasses don’t accidentally inherit a method that will fail at runtime.
- In `McpRouterMcpServerSyncProvider.fetch`, headers for MCP calls (`mcp_headers`) are constructed separately from `_build_api_headers`, leading to duplicated logic; reusing `_build_api_headers` here would simplify the implementation and reduce the chance of header divergence.
## Individual Comments
### Comment 1
<location path="astrbot/core/provider/mcp_sync_providers.py" line_range="20" />
<code_context>
+ provider: ClassVar[str]
+
+ @abc.abstractmethod
+ async def fetch(self, payload: dict[str, Any]) -> list[SyncedMcpServer]:
+ raise NotImplementedError
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring `McpRouterMcpServerSyncProvider` by extracting helpers for the different `fetch` modes, shared parsing/matching logic, header construction, pagination, and validation key selection to shorten the methods and reduce duplication.
You can keep behavior identical while reducing complexity by:
1. **Splitting the multi‑mode `fetch` into helpers**
`fetch` currently handles three distinct modes and validation. You can keep the top‑level orchestration in `fetch` and move the heavy logic into small private methods:
```python
async def fetch(self, payload: dict[str, Any]) -> list[SyncedMcpServer]:
api_key = self._normalize_api_key(str(payload.get("api_key", "")))
if not api_key:
raise ValueError("Missing required field: api_key")
app_url = str(payload.get("app_url", "")).strip()
app_name = str(payload.get("app_name", "")).strip()
base_url = str(payload.get("api_base", "https://api.mcprouter.to/v1")).rstrip("/")
list_url = f"{base_url}/list-servers"
get_url = f"{base_url}/get-server"
# existing validation logic...
validation_server_key = self._pick_validation_server_key(payload.get("servers"))
await self._validate_api_key(
api_key=api_key,
app_url=app_url,
app_name=app_name,
base_url=base_url,
server_key=validation_server_key,
)
api_headers = self._build_api_headers(
api_key=api_key,
app_url=app_url,
app_name=app_name,
)
mcp_headers = self._build_mcp_headers(api_key=api_key, app_url=app_url, app_name=app_name)
# shared params
query = str(payload.get("query", "")).strip().lower()
raw_max_servers = payload.get("max_servers")
max_servers = max(1, min(int(raw_max_servers or 30), 500))
limit = max(1, min(int(payload.get("limit", 30) or 30), 100))
max_pages = max(1, min(int(payload.get("max_pages", 10) or 10), 50))
provided_servers = payload.get("servers")
server_keys = self._parse_server_keys(payload.get("server_keys"))
if isinstance(provided_servers, list) and provided_servers:
return await self._fetch_from_provided_servers(
provided_servers, max_servers, raw_max_servers, mcp_headers
)
if server_keys:
return await self._fetch_from_server_keys(
server_keys, max_servers, get_url, api_headers, mcp_headers
)
return await self._fetch_from_listing(
list_url, api_headers, mcp_headers, query, max_servers, limit, max_pages
)
```
Then implement the helpers using your existing inner logic, without changing behavior:
```python
async def _fetch_from_provided_servers(
self,
provided_servers: list[Any],
max_servers: int,
raw_max_servers: Any,
mcp_headers: dict[str, str],
) -> list[SyncedMcpServer]:
timeout = aiohttp.ClientTimeout(total=30)
used_names: set[str] = set()
items: list[SyncedMcpServer] = []
selected_servers = (
provided_servers[:max_servers] if raw_max_servers is not None else provided_servers
)
for server in selected_servers:
if not isinstance(server, dict):
continue
server_key = server.get("server_key")
server_name = (
server.get("config_name")
or server_key
or server.get("name")
or server.get("title")
)
server_url = server.get("server_url")
if not server_name or not server_url:
continue
items.append(
self._make_item(
name=str(server_name),
url=str(server_url),
used_names=used_names,
headers=mcp_headers,
server_key=str(server_key) if server_key else None,
)
)
return items
```
(You can similarly move the `/get-server` and `/list-servers` branches into `_fetch_from_server_keys` and `_fetch_from_listing`.)
2. **Lifting nested helpers to methods**
`parse_server_keys`, `matches`, and `make_item` can be private methods. This shortens `fetch` and keeps them reusable:
```python
def _parse_server_keys(self, value: Any) -> list[str]:
if isinstance(value, list):
parts = [str(item).strip() for item in value]
elif isinstance(value, str):
raw = value.replace(",", "\n").replace(";", "\n")
parts = [line.strip() for line in raw.splitlines()]
else:
return []
keys = [item for item in parts if item]
seen: set[str] = set()
result: list[str] = []
for item in keys:
if item in seen:
continue
seen.add(item)
result.append(item)
return result
def _matches(self, server: dict[str, Any], q: str) -> bool:
if not q:
return True
haystacks = [
server.get("config_name"),
server.get("server_key"),
server.get("name"),
server.get("title"),
server.get("description"),
server.get("author_name"),
]
combined = " ".join(str(v) for v in haystacks if v)
return q in combined.lower()
def _make_item(
self,
*,
name: str,
url: str,
used_names: set[str],
headers: dict[str, str],
server_key: str | None = None,
) -> SyncedMcpServer:
final_name = name
if final_name in used_names:
suffix = server_key or "dup"
final_name = f"{final_name}-{suffix}"
i = 2
while final_name in used_names:
final_name = f"{name}-{i}"
i += 1
used_names.add(final_name)
return SyncedMcpServer(
name=final_name,
config={
"url": url,
"transport": "streamable_http",
"headers": headers,
"active": True,
"provider": "mcprouter",
},
)
```
Then replace uses of the nested functions with these methods (`self._parse_server_keys`, `self._matches`, `self._make_item`).
3. **Consolidating header‑building**
`mcp_headers` is currently built manually but is effectively the same as `_build_api_headers`. You can introduce a small wrapper to clarify intent without duplicating logic:
```python
def _build_mcp_headers(
self, *, api_key: str, app_url: str, app_name: str
) -> dict[str, str]:
# currently identical, but keeps semantic distinction
return self._build_api_headers(api_key=api_key, app_url=app_url, app_name=app_name)
```
Then in `fetch`:
```python
api_headers = self._build_api_headers(
api_key=api_key, app_url=app_url, app_name=app_name
)
mcp_headers = self._build_mcp_headers(
api_key=api_key, app_url=app_url, app_name=app_name
)
```
4. **Sharing pagination logic between `fetch` and `list_servers`**
The pagination loop for `/list-servers` is duplicated. You can introduce a generator that yields batches and reuse it:
```python
async def _iter_list_servers_pages(
self,
*,
list_url: str,
api_headers: dict[str, str],
limit: int,
max_pages: int,
max_items: int | None = None,
) -> list[dict[str, Any]]:
timeout = aiohttp.ClientTimeout(total=30)
collected: list[dict[str, Any]] = []
async with aiohttp.ClientSession(timeout=timeout) as session:
for page in range(1, max_pages + 1):
async with session.post(
list_url,
json={"page": page, "limit": limit},
headers=api_headers,
) as response:
if response.status != 200:
raise RuntimeError(
f"MCPRouter API request failed: HTTP {response.status}"
)
data = await response.json()
if data.get("code") != 0:
raise RuntimeError(
f"MCPRouter API error: {data.get('message', 'unknown')}"
)
batch = data.get("data", {}).get("servers", []) or []
if not batch:
break
for server in batch:
collected.append(server)
if max_items is not None and len(collected) >= max_items:
return collected
if len(batch) < limit:
break
return collected
```
Then:
- In `fetch`’s listing mode, call `_iter_list_servers_pages(..., max_items=None)` and apply `self._matches` / `self._make_item` over the combined list with your `max_servers` cut‑off.
- In `list_servers`, call `_iter_list_servers_pages(..., max_items=max_items)` and apply the existing validation filter (`server_url` and key/name presence) only.
5. **Small helper for validation server key selection**
The “pick which server to use for API key validation” logic can also be extracted to clarify `fetch`:
```python
def _pick_validation_server_key(self, provided_servers: Any) -> str:
validation_server_key = "time"
if isinstance(provided_servers, list) and provided_servers:
for item in provided_servers:
if not isinstance(item, dict):
continue
server_key = str(item.get("server_key") or "").strip()
config_name = str(item.get("config_name") or "").strip()
if server_key == "time" or config_name == "time":
return "time"
if server_key:
validation_server_key = server_key
break
return validation_server_key
```
Then `fetch` just calls `_pick_validation_server_key(payload.get("servers"))`.
These changes break up the long method, remove duplicated control flow and header construction, and keep behavior intact.
</issue_to_address>
Code Review
This PR introduces support for syncing MCP servers from the MCPRouter platform, a valuable functional enhancement. However, several security concerns were identified, primarily related to Server-Side Request Forgery (SSRF) and potential Denial of Service (DoS) vectors. The SSRF vulnerability allows an authenticated user to make the server send requests to arbitrary internal or external URLs via the api_base and server_url parameters, while DoS vulnerabilities arise from unbounded memory usage when logging error messages and parsing large JSON responses. Addressing these by implementing URL validation and resource limits is crucial. Furthermore, general code improvements are needed, specifically regarding code duplication and handling of edge cases, to enhance overall robustness and maintainability.
```python
app_url = str(payload.get("app_url", "")).strip()
app_name = str(payload.get("app_name", "")).strip()

base_url = str(payload.get("api_base", "https://api.mcprouter.to/v1")).rstrip(
```
The api_base parameter is taken directly from user input and used to construct URLs for network requests without any validation. This allows an authenticated attacker to perform Server-Side Request Forgery (SSRF) by pointing the bot to internal services or other restricted resources. Additionally, the server_url retrieved from the provider (or provided in the servers payload at line 302) is used to establish MCP connections, which is another SSRF sink. Implement strict validation or an allow-list for these URLs.
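One way to realize the suggested validation - a minimal sketch assuming an HTTPS-only, host allow-list policy; the host set here is illustrative, not taken from the PR:

```python
from urllib.parse import urlparse

# Illustrative allow-list; a real deployment would make this configurable.
ALLOWED_API_HOSTS = {"api.mcprouter.to"}


def validate_api_base(api_base: str) -> str:
    """Reject api_base values that could redirect server-side requests
    to internal services (a common SSRF mitigation)."""
    parsed = urlparse(api_base)
    if parsed.scheme != "https":
        raise ValueError(f"api_base must use https, got {parsed.scheme!r}")
    if parsed.hostname not in ALLOWED_API_HOSTS:
        raise ValueError(f"api_base host not allowed: {parsed.hostname!r}")
    return api_base.rstrip("/")
```

A similar check on `server_url` (with the gateway hosts allow-listed) would cover the second sink the comment mentions.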
```python
# @@ -50,14 +50,52 @@ def _prepare_config(config: dict) -> dict:
# async def _quick_test_mcp_connection(config: dict) -> tuple[bool, str]:

if event_name in {"stream", "connection"}:
    return
print(f"MCP Server {name} Error: {msg}")
self.server_errlogs.append(msg)
```
The server_errlogs list grows indefinitely as error messages are received from the MCP server. A malicious or malfunctioning server could flood the bot with messages, leading to memory exhaustion and a Denial of Service (DoS). Consider using a fixed-size buffer like collections.deque(maxlen=100) to store error logs.
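A minimal sketch of that bounded buffer - the cap of 100 entries is an assumption, not a value from the PR:

```python
from collections import deque

# Bounded error-log buffer: once full, appending drops the oldest entry.
server_errlogs: deque[str] = deque(maxlen=100)

for i in range(1000):
    server_errlogs.append(f"error {i}")

print(len(server_errlogs))   # 100 entries retained
print(server_errlogs[0])     # oldest surviving entry: "error 900"
```

Appends stay O(1) and memory stays constant regardless of how many messages a misbehaving server emits.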
```python
raise RuntimeError(
    f"ModelScope API request failed: HTTP {response.status}"
)
data = await response.json()
```
Calling await response.json() on responses from external APIs without a size limit can lead to Denial of Service (DoS) via memory exhaustion if the API returns an extremely large payload. This pattern is repeated on lines 149, 328, 364, and 444. Consider reading the response content with a limit before parsing, similar to the implementation in _format_http_error.
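A transport-agnostic sketch of such a limit - the 1 MiB cap is an assumed value; with aiohttp, the raw bytes could come from `await response.content.read(limit + 1)` instead of calling `response.json()` directly:

```python
import json
from typing import Any

MAX_RESPONSE_BYTES = 1024 * 1024  # assumed 1 MiB cap


def parse_bounded_json(raw: bytes, limit: int = MAX_RESPONSE_BYTES) -> Any:
    """Parse a JSON body only if it fits within the size limit.

    Reading at most limit + 1 bytes upstream means an oversized body is
    detected here without ever buffering the full payload.
    """
    if len(raw) > limit:
        raise RuntimeError(f"response too large: >{limit} bytes")
    return json.loads(raw)
```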
```python
if normalized.startswith("Unknown SSE event:"):
    event_name = normalized.split(":", 1)[1].strip()
    if event_name in {"stream", "connection"}:
        return
```
This SSE-noise filtering logic duplicates the logic in the _McpSseNoiseFilter class at the top of the file. To improve maintainability and reuse, consider extracting it into a standalone helper called from both places.
Also, the current split does not handle strings that lack a ':', which could raise an IndexError. Checking the length of the split result makes the code more robust and simplifies the logic.
Current:
```python
if normalized.startswith("Unknown SSE event:"):
    event_name = normalized.split(":", 1)[1].strip()
    if event_name in {"stream", "connection"}:
        return
```
Suggested:
```python
if normalized.startswith("Unknown SSE event:"):
    parts = normalized.split(":", 1)
    if len(parts) > 1 and parts[1].strip() in {"stream", "connection"}:
        return
```
```python
raw_max_servers = payload.get("max_servers")
max_servers = int(raw_max_servers or 30)
max_servers = max(1, min(max_servers, 500))
```
The logic for parsing max_servers has a potential bug. If the value is 0, the expression 0 or 30 evaluates to 30, preventing the user from setting it to 0. Additionally, if the value is a non-integer string (e.g., 'abc'), int() will raise an unhandled ValueError. A similar issue exists for limit on line 207 and max_pages on line 210.
Using a try-except block is a more robust way to handle type conversion and default values.
Current:
```python
raw_max_servers = payload.get("max_servers")
max_servers = int(raw_max_servers or 30)
max_servers = max(1, min(max_servers, 500))
```
Suggested:
```python
try:
    max_servers = int(payload.get("max_servers", 30))
except (ValueError, TypeError):
    max_servers = 30
max_servers = max(1, min(max_servers, 500))
```
```json
"syncProvider": {
  "buttons": {
    "fetchServers": "获取服务器列表"
  },
  "status": {
    "selectProvider": "请选择一个 MCP 服务器提供商",
    "enterToken": "请输入访问令牌以继续",
    "enterApiKey": "请输入 API Key 以继续",
    "fetchServersFirst": "请先获取服务器列表",
    "fetchedServers": "已获取服务器 ({count})"
  },
  "messages": {
    "syncSuccess": "MCP 服务器同步成功!",
    "syncError": "同步失败: {error}",
    "fetchServersSuccess": "已获取 {count} 个服务器",
    "fetchServersError": "获取服务器列表失败: {error}"
  }
},
```
- validate API key via list-servers instead of fixed server keys
- centralize MCPRouter HTTP/JSON error handling for clearer failures
- stop sync when mcp config persistence fails
- add null-safe request body handling in sync-provider route
- move new MCPRouter UI strings to i18n and clean zh-CN duplicate syncProvider block
# Conflicts: # astrbot/core/provider/func_tool_manager.py
Modifications
- astrbot/core/provider/mcp_sync_providers.py: new provider abstraction/registry plus mcprouter and modelscope implementations
- astrbot/core/provider/func_tool_manager.py: sync/list servers per provider
- astrbot/dashboard/routes/tools.py: new MCPRouter list-servers endpoint; sync-provider switched to generic provider dispatch
- dashboard/src/components/extension/McpServersSection.vue: MCPRouter UI reworked into a "fetch list + remove filter + sync" flow
- dashboard/src/i18n/locales/*/features/tool-use.json: fill in the new UI copy
- astrbot/core/agent/mcp_client.py: improved quick-test error reporting plus MCP log noise filtering
Screenshots or Test Results
Checklist
- I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in `requirements.txt` and `pyproject.toml`.
Summary by Sourcery
Add a pluggable MCP server sync provider framework and integrate MCPRouter-based server synchronization and listing into both backend and dashboard UI.
New Features:
Enhancements:
Documentation: