
feat: Support syncing MCP servers from the MCPRouter platform #5690

Open
exynos967 wants to merge 6 commits into AstrBotDevs:master from exynos967:dev2632

Conversation


@exynos967 exynos967 commented Mar 2, 2026

Modifications

astrbot/core/provider/mcp_sync_providers.py: new provider abstraction/registry plus mcprouter/modelscope implementations
astrbot/core/provider/func_tool_manager.py: sync/list servers per provider
astrbot/dashboard/routes/tools.py: new MCPRouter list-servers endpoint; sync-provider changed to a generic provider dispatch
dashboard/src/components/extension/McpServersSection.vue: MCPRouter UI reworked into "fetch list + remove to filter + sync"
dashboard/src/i18n/locales/*/features/tool-use.json: added the new UI strings
astrbot/core/agent/mcp_client.py: richer quick-test error reporting + mcp log filtering
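The provider abstraction and registry added in mcp_sync_providers.py can be sketched roughly as follows. The names McpServerSyncProvider, SyncedMcpServer, and get_mcp_sync_provider come from this PR's diff; the dataclass fields and the decorator-based registry are assumptions for illustration, not the actual implementation:

```python
import abc
from dataclasses import dataclass, field
from typing import Any, ClassVar


@dataclass
class SyncedMcpServer:
    # A server entry produced by a sync provider: a display name plus
    # the MCP client config that will be written into the tool manager.
    name: str
    config: dict[str, Any] = field(default_factory=dict)


class McpServerSyncProvider(abc.ABC):
    # Each platform (modelscope, mcprouter, ...) subclasses this and
    # sets `provider` to its registry key.
    provider: ClassVar[str]

    @abc.abstractmethod
    async def fetch(self, payload: dict[str, Any]) -> list[SyncedMcpServer]:
        # Return the servers to sync for the given request payload.
        raise NotImplementedError


_REGISTRY: dict[str, McpServerSyncProvider] = {}


def register_provider(cls: type) -> type:
    # Class decorator: instantiate the provider and file it under its name.
    _REGISTRY[cls.provider] = cls()
    return cls


def get_mcp_sync_provider(name: str) -> McpServerSyncProvider:
    if name not in _REGISTRY:
        raise KeyError(f"Unknown MCP sync provider: {name}")
    return _REGISTRY[name]
```

With this shape, adding a new platform is a matter of subclassing and registering; the tools route only ever calls get_mcp_sync_provider(name).fetch(payload).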

  • This is NOT a breaking change.

Screenshots or Test Results

[3 screenshots]

Checklist

  • 😊 If there are new features added in the PR, I have discussed them with the authors through issues/emails, etc.
  • 👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
  • 🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.
  • 😮 My changes do not introduce malicious code.


Summary by Sourcery

Add a pluggable MCP server sync provider framework and integrate MCPRouter-based server synchronization and listing into both backend and dashboard UI.

New Features:

  • Introduce a generic MCP server sync provider abstraction with a registry and concrete implementations for ModelScope and MCPRouter.
  • Add backend support to sync MCP servers from MCPRouter, including listing available servers and enabling them with concurrency control.
  • Extend the dashboard MCP section to support MCPRouter, allowing users to fetch, review, filter, and sync selected servers using an API key and optional app identity headers.
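The "enabling them with concurrency control" step above can be sketched with an asyncio.Semaphore. This is a minimal illustration of the pattern, not the PR's actual _enable_mcp_servers_with_concurrency_limit; the callback signature and the returned name→success mapping are assumptions:

```python
import asyncio
from typing import Awaitable, Callable


async def enable_servers_with_limit(
    names: list[str],
    enable_one: Callable[[str], Awaitable[None]],
    max_concurrency: int = 5,
) -> dict[str, bool]:
    # Run enable_one() for every server, allowing at most
    # `max_concurrency` enables to be in flight at the same time.
    sem = asyncio.Semaphore(max_concurrency)

    async def _guarded(name: str) -> tuple[str, bool]:
        async with sem:
            try:
                await enable_one(name)
                return name, True
            except Exception:
                # A single failing server must not abort the whole sync.
                return name, False

    results = await asyncio.gather(*(_guarded(n) for n in names))
    return dict(results)
```

The semaphore keeps a burst of newly synced servers from opening dozens of MCP connections at once, while per-server failures are collected instead of raised.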

Enhancements:

  • Improve MCP connectivity tests to use the latest protocol version and return richer HTTP error details for easier troubleshooting.
  • Refine MCP client initialization logging to better surface invalid API key issues, especially for MCPRouter, and filter noisy SSE-related log messages.
  • Unify provider-based MCP sync handling in the tools API, returning structured sync results and clearer user-facing status messages.
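The unified, provider-based sync handling in the tools API might look roughly like this. sync_mcp_servers_from_provider is a method this PR adds, but its (added, skipped) return shape and the response fields here are assumptions for illustration:

```python
import asyncio
from typing import Any


async def sync_from_provider(
    provider_name: str, payload: dict[str, Any], manager
) -> dict[str, Any]:
    # Dispatch a sync request to the named provider via the tool manager
    # and translate the outcome into a structured API response.
    try:
        added, skipped = await manager.sync_mcp_servers_from_provider(
            provider_name, payload
        )
    except ValueError as exc:   # bad or missing payload fields
        return {"status": "error", "message": str(exc)}
    except RuntimeError as exc: # upstream platform API failures
        return {"status": "error", "message": str(exc)}
    return {
        "status": "ok",
        "provider": provider_name,
        "added": added,
        "skipped": skipped,
        "message": f"Synced {added} server(s) from {provider_name} ({skipped} skipped)",
    }
```

Routing every platform through one entry point like this is what lets the dashboard show the same structured result and status message regardless of which provider was selected.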

Documentation:

  • Update i18n strings for the tool-use UI to cover the new MCPRouter workflow and messages.

(cherry picked from commit 466580c16d335fcea34c5e98579754c992f9bc6f)
(cherry picked from commit e6f7894)
(cherry picked from commit 3bbf33dd3d8c05210767bb40babca31f5363ae3f)
(cherry picked from commit 30392fb)
(cherry picked from commit 447c221963fdbca66453e9031d6133c010cc4b4b)
(cherry picked from commit e068be8)
@auto-assign auto-assign bot requested review from Soulter and advent259141 March 2, 2026 14:41
@dosubot dosubot bot added the size:XXL This PR changes 1000+ lines, ignoring generated files. label Mar 2, 2026
@gemini-code-assist

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a new MCP server sync provider abstraction layer, greatly improving the system's ability to manage and integrate different MCP server platforms. By modularizing the sync logic, support for new platforms such as MCPRouter can now be added easily, and the existing ModelScope sync flow is improved as well. In addition, MCP client error handling and logging have been refined, and the UI has been updated to provide a more intuitive MCPRouter server sync experience, improving overall usability and maintainability.

Highlights

  • MCP server sync provider abstraction: adds an abstraction layer and registration mechanism for MCP server sync providers, allowing different MCP server platforms to be integrated more flexibly.
  • MCPRouter platform support: implements syncing of MCP servers from the MCPRouter platform, including API key validation, server list fetching, and config synchronization.
  • MCP client error reporting and log cleanup: improves the MCP client's error reporting with more detailed HTTP error information, and adds filtering for noisy SSE event logs.
  • Frontend UI updates: updates the dashboard with a dedicated fetch, filter, and sync flow for MCPRouter servers.
Changelog
  • astrbot/core/agent/mcp_client.py
    • Added a _McpSseNoiseFilter class to filter out noisy SSE event logs
    • Improved _quick_test_mcp_connection to report more detailed HTTP error information
    • Updated the protocol version to mcp.types.LATEST_PROTOCOL_VERSION
    • Added SSE noise filtering in logging_callback
  • astrbot/core/provider/func_tool_manager.py
    • Removed the aiohttp import; networking is now handled by the new provider module
    • Introduced SyncedMcpServer and get_mcp_sync_provider
    • Refactored _quick_test_mcp_connection to keep its error handling consistent with mcp_client.py
    • Updated the protocol version to mcp.types.LATEST_PROTOCOL_VERSION
    • Improved error logging when MCP client initialization fails, especially for MCPRouter invalid API key cases
    • Replaced sync_modelscope_mcp_servers with a generic sync_mcp_servers_from_provider method
    • Added _enable_mcp_servers_with_concurrency_limit to enable MCP servers under a concurrency cap
    • Added list_mcp_servers_from_provider to list servers from a provider
    • Added sync_mcprouter_mcp_servers to sync MCPRouter servers
  • astrbot/core/provider/mcp_sync_providers.py
    • New file defining the McpServerSyncProvider abstract base class and the provider registration mechanism
    • Implemented ModelscopeMcpServerSyncProvider for syncing MCP servers from the ModelScope platform
    • Implemented McpRouterMcpServerSyncProvider with API key normalization, request header construction, API key validation, and server fetch/list support for the MCPRouter platform
  • astrbot/dashboard/routes/tools.py
    • Added the /tools/mcp/providers/mcprouter/list-servers API route for fetching the MCPRouter server list
    • Changed the sync_provider route to dispatch sync requests by provider name, with MCPRouter support
    • Auto-fills app_url and app_name defaults for MCPRouter sync requests
    • Improved the response messages for sync operations
  • dashboard/src/components/extension/McpServersSection.vue
    • Added a new UI section for the MCPRouter platform, including an API key input plus app URL and app name fields
    • Implemented fetching the MCPRouter server list, displaying it in the UI, and removing individual servers
    • Updated the provider list to include the 'mcprouter' option
    • Added the data properties mcprouterApiKey, mcprouterAppUrl, mcprouterAppName, mcprouterServersLoading, and mcprouterServers
    • Initializes mcprouterAppUrl to the current page's origin URL on mount
    • Modified syncMcpServers to support MCPRouter-specific payload and validation logic
    • Added the fetchMcpRouterServers and removeMcpRouterServer methods
    • Added .mcprouter-server-list CSS styles to cap the server list height and enable scrolling
  • dashboard/src/i18n/locales/en-US/features/tool-use.json
    • Added English translation keys for the MCPRouter UI, including the fetch-servers button, API key hints, server list states, and error messages
  • dashboard/src/i18n/locales/zh-CN/features/tool-use.json
    • Added matching Chinese translation keys for the MCPRouter UI
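The SSE log filtering mentioned for mcp_client.py can be illustrated with a standard logging.Filter. This is a sketch, not the PR's actual _McpSseNoiseFilter; the matched message fragments are assumptions:

```python
import logging


class McpSseNoiseFilter(logging.Filter):
    # Drop known-noisy SSE keepalive/reconnect messages from MCP client
    # logs while letting real errors (e.g. invalid API key) through.
    NOISY_FRAGMENTS = ("sse", "stream disconnected", "reconnecting")  # assumed patterns

    def filter(self, record: logging.LogRecord) -> bool:
        msg = record.getMessage().lower()
        # Returning False suppresses the record; keep anything that
        # does not match a noisy fragment.
        return not any(fragment in msg for fragment in self.NOISY_FRAGMENTS)
```

Attached with logging.getLogger("mcp").addFilter(McpSseNoiseFilter()), this silences the repetitive transport chatter without touching the logger's level.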

@dosubot dosubot bot added area:provider The bug / feature is about AI Provider, Models, LLM Agent, LLM Agent Runner. area:webui The bug / feature is about webui(dashboard) of astrbot. labels Mar 2, 2026

dosubot bot commented Mar 2, 2026

Related Documentation

Checked 1 published document(s) in 1 knowledge base(s). No updates required.



@sourcery-ai sourcery-ai bot left a comment



Hey - I've found 1 issue, and left some high level feedback:

  • The HTTP error formatting and _quick_test_mcp_connection logic are now duplicated between astrbot/core/agent/mcp_client.py and astrbot/core/provider/func_tool_manager.py; consider extracting a shared helper to avoid drift and keep error handling consistent.
  • McpServerSyncProvider.list_servers currently raises NotImplementedError but is not marked abstract; either make it @abc.abstractmethod or provide a no-op default implementation (e.g., returning an empty list) so that subclasses don’t accidentally inherit a method that will fail at runtime.
  • In McpRouterMcpServerSyncProvider.fetch, headers for MCP calls (mcp_headers) are constructed separately from _build_api_headers, leading to duplicated logic; reusing _build_api_headers here would simplify the implementation and reduce the chance of header divergence.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The HTTP error formatting and `_quick_test_mcp_connection` logic are now duplicated between `astrbot/core/agent/mcp_client.py` and `astrbot/core/provider/func_tool_manager.py`; consider extracting a shared helper to avoid drift and keep error handling consistent.
- `McpServerSyncProvider.list_servers` currently raises `NotImplementedError` but is not marked abstract; either make it `@abc.abstractmethod` or provide a no-op default implementation (e.g., returning an empty list) so that subclasses don’t accidentally inherit a method that will fail at runtime.
- In `McpRouterMcpServerSyncProvider.fetch`, headers for MCP calls (`mcp_headers`) are constructed separately from `_build_api_headers`, leading to duplicated logic; reusing `_build_api_headers` here would simplify the implementation and reduce the chance of header divergence.

## Individual Comments

### Comment 1
<location path="astrbot/core/provider/mcp_sync_providers.py" line_range="20" />
<code_context>
+    provider: ClassVar[str]
+
+    @abc.abstractmethod
+    async def fetch(self, payload: dict[str, Any]) -> list[SyncedMcpServer]:
+        raise NotImplementedError
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring `McpRouterMcpServerSyncProvider` by extracting helpers for the different `fetch` modes, shared parsing/matching logic, header construction, pagination, and validation key selection to shorten the methods and reduce duplication.

You can keep behavior identical while reducing complexity by:

1. **Splitting the multi‑mode `fetch` into helpers**

`fetch` currently handles three distinct modes and validation. You can keep the top‑level orchestration in `fetch` and move the heavy logic into small private methods:

```python
async def fetch(self, payload: dict[str, Any]) -> list[SyncedMcpServer]:
    api_key = self._normalize_api_key(str(payload.get("api_key", "")))
    if not api_key:
        raise ValueError("Missing required field: api_key")

    app_url = str(payload.get("app_url", "")).strip()
    app_name = str(payload.get("app_name", "")).strip()

    base_url = str(payload.get("api_base", "https://api.mcprouter.to/v1")).rstrip("/")
    list_url = f"{base_url}/list-servers"
    get_url = f"{base_url}/get-server"

    # existing validation logic...
    validation_server_key = self._pick_validation_server_key(payload.get("servers"))
    await self._validate_api_key(
        api_key=api_key,
        app_url=app_url,
        app_name=app_name,
        base_url=base_url,
        server_key=validation_server_key,
    )

    api_headers = self._build_api_headers(
        api_key=api_key,
        app_url=app_url,
        app_name=app_name,
    )
    mcp_headers = self._build_mcp_headers(api_key=api_key, app_url=app_url, app_name=app_name)

    # shared params
    query = str(payload.get("query", "")).strip().lower()
    raw_max_servers = payload.get("max_servers")
    max_servers = max(1, min(int(raw_max_servers or 30), 500))
    limit = max(1, min(int(payload.get("limit", 30) or 30), 100))
    max_pages = max(1, min(int(payload.get("max_pages", 10) or 10), 50))

    provided_servers = payload.get("servers")
    server_keys = self._parse_server_keys(payload.get("server_keys"))

    if isinstance(provided_servers, list) and provided_servers:
        return await self._fetch_from_provided_servers(
            provided_servers, max_servers, raw_max_servers, mcp_headers
        )

    if server_keys:
        return await self._fetch_from_server_keys(
            server_keys, max_servers, get_url, api_headers, mcp_headers
        )

    return await self._fetch_from_listing(
        list_url, api_headers, mcp_headers, query, max_servers, limit, max_pages
    )
```

Then implement the helpers using your existing inner logic, without changing behavior:

```python
async def _fetch_from_provided_servers(
    self,
    provided_servers: list[Any],
    max_servers: int,
    raw_max_servers: Any,
    mcp_headers: dict[str, str],
) -> list[SyncedMcpServer]:
    timeout = aiohttp.ClientTimeout(total=30)
    used_names: set[str] = set()
    items: list[SyncedMcpServer] = []

    selected_servers = (
        provided_servers[:max_servers] if raw_max_servers is not None else provided_servers
    )

    for server in selected_servers:
        if not isinstance(server, dict):
            continue
        server_key = server.get("server_key")
        server_name = (
            server.get("config_name")
            or server_key
            or server.get("name")
            or server.get("title")
        )
        server_url = server.get("server_url")
        if not server_name or not server_url:
            continue
        items.append(
            self._make_item(
                name=str(server_name),
                url=str(server_url),
                used_names=used_names,
                headers=mcp_headers,
                server_key=str(server_key) if server_key else None,
            )
        )
    return items
```

(You can similarly move the `/get-server` and `/list-servers` branches into `_fetch_from_server_keys` and `_fetch_from_listing`.)

2. **Lifting nested helpers to methods**

`parse_server_keys`, `matches`, and `make_item` can be private methods. This shortens `fetch` and keeps them reusable:

```python
def _parse_server_keys(self, value: Any) -> list[str]:
    if isinstance(value, list):
        parts = [str(item).strip() for item in value]
    elif isinstance(value, str):
        raw = value.replace(",", "\n").replace(";", "\n")
        parts = [line.strip() for line in raw.splitlines()]
    else:
        return []
    keys = [item for item in parts if item]
    seen: set[str] = set()
    result: list[str] = []
    for item in keys:
        if item in seen:
            continue
        seen.add(item)
        result.append(item)
    return result

def _matches(self, server: dict[str, Any], q: str) -> bool:
    if not q:
        return True
    haystacks = [
        server.get("config_name"),
        server.get("server_key"),
        server.get("name"),
        server.get("title"),
        server.get("description"),
        server.get("author_name"),
    ]
    combined = " ".join(str(v) for v in haystacks if v)
    return q in combined.lower()

def _make_item(
    self,
    *,
    name: str,
    url: str,
    used_names: set[str],
    headers: dict[str, str],
    server_key: str | None = None,
) -> SyncedMcpServer:
    final_name = name
    if final_name in used_names:
        suffix = server_key or "dup"
        final_name = f"{final_name}-{suffix}"
    i = 2
    while final_name in used_names:
        final_name = f"{name}-{i}"
        i += 1
    used_names.add(final_name)
    return SyncedMcpServer(
        name=final_name,
        config={
            "url": url,
            "transport": "streamable_http",
            "headers": headers,
            "active": True,
            "provider": "mcprouter",
        },
    )
```

Then replace uses of the nested functions with these methods (`self._parse_server_keys`, `self._matches`, `self._make_item`).

3. **Consolidating header‑building**

`mcp_headers` is currently built manually but is effectively the same as `_build_api_headers`. You can introduce a small wrapper to clarify intent without duplicating logic:

```python
def _build_mcp_headers(
    self, *, api_key: str, app_url: str, app_name: str
) -> dict[str, str]:
    # currently identical, but keeps semantic distinction
    return self._build_api_headers(api_key=api_key, app_url=app_url, app_name=app_name)
```

Then in `fetch`:

```python
api_headers = self._build_api_headers(
    api_key=api_key, app_url=app_url, app_name=app_name
)
mcp_headers = self._build_mcp_headers(
    api_key=api_key, app_url=app_url, app_name=app_name
)
```

4. **Sharing pagination logic between `fetch` and `list_servers`**

The pagination loop for `/list-servers` is duplicated. You can introduce a generator that yields batches and reuse it:

```python
async def _iter_list_servers_pages(
    self,
    *,
    list_url: str,
    api_headers: dict[str, str],
    limit: int,
    max_pages: int,
    max_items: int | None = None,
) -> list[dict[str, Any]]:
    timeout = aiohttp.ClientTimeout(total=30)
    collected: list[dict[str, Any]] = []
    async with aiohttp.ClientSession(timeout=timeout) as session:
        for page in range(1, max_pages + 1):
            async with session.post(
                list_url,
                json={"page": page, "limit": limit},
                headers=api_headers,
            ) as response:
                if response.status != 200:
                    raise RuntimeError(
                        f"MCPRouter API request failed: HTTP {response.status}"
                    )
                data = await response.json()
                if data.get("code") != 0:
                    raise RuntimeError(
                        f"MCPRouter API error: {data.get('message', 'unknown')}"
                    )
                batch = data.get("data", {}).get("servers", []) or []
                if not batch:
                    break
                for server in batch:
                    collected.append(server)
                    if max_items is not None and len(collected) >= max_items:
                        return collected
                if len(batch) < limit:
                    break
    return collected
```

Then:

- In `fetch`’s listing mode, call `_iter_list_servers_pages(..., max_items=None)` and apply `self._matches` / `self._make_item` over the combined list with your `max_servers` cut‑off.
- In `list_servers`, call `_iter_list_servers_pages(..., max_items=max_items)` and apply the existing validation filter (`server_url` and key/name presence) only.

5. **Small helper for validation server key selection**

The “pick which server to use for API key validation” logic can also be extracted to clarify `fetch`:

```python
def _pick_validation_server_key(self, provided_servers: Any) -> str:
    validation_server_key = "time"
    if isinstance(provided_servers, list) and provided_servers:
        for item in provided_servers:
            if not isinstance(item, dict):
                continue
            server_key = str(item.get("server_key") or "").strip()
            config_name = str(item.get("config_name") or "").strip()
            if server_key == "time" or config_name == "time":
                return "time"
            if server_key:
                validation_server_key = server_key
                break
    return validation_server_key
```

Then `fetch` just calls `_pick_validation_server_key(payload.get("servers"))`.

These changes break up the long method, remove duplicated control flow and header construction, and keep behavior intact.
</issue_to_address>



@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This PR introduces support for syncing MCP servers from the MCPRouter platform, a valuable functional enhancement. However, several security concerns were identified, primarily related to Server-Side Request Forgery (SSRF) and potential Denial of Service (DoS) vectors. The SSRF vulnerability allows an authenticated user to make the server send requests to arbitrary internal or external URLs via the api_base and server_url parameters, while DoS vulnerabilities arise from unbounded memory usage when logging error messages and parsing large JSON responses. Addressing these by implementing URL validation and resource limits is crucial. Furthermore, general code improvements are needed, specifically regarding code duplication and handling of edge cases, to enhance overall robustness and maintainability.

```python
app_url = str(payload.get("app_url", "")).strip()
app_name = str(payload.get("app_name", "")).strip()

base_url = str(payload.get("api_base", "https://api.mcprouter.to/v1")).rstrip(
```
security-high

The `api_base` parameter is taken directly from user input and used to construct URLs for network requests without any validation. This allows an authenticated attacker to perform Server-Side Request Forgery (SSRF) by pointing the bot to internal services or other restricted resources. Additionally, the `server_url` retrieved from the provider (or provided in the `servers` payload at line 302) is used to establish MCP connections, which is another SSRF sink. Implement strict validation or an allow-list for these URLs.
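A minimal sketch of what such an allow-list could look like. The function name and the contents of the allow-list are illustrative assumptions; only `api.mcprouter.to` appears as a default in this PR.

```python
from urllib.parse import urlparse

# Assumed allow-list; extend with whatever upstream hosts the bot must reach.
ALLOWED_API_HOSTS = {"api.mcprouter.to"}

def validate_api_base(api_base: str) -> str:
    """Reject user-supplied api_base values that point outside the allow-list."""
    parsed = urlparse(api_base)
    if parsed.scheme != "https":
        raise ValueError(f"api_base must use https, got {parsed.scheme!r}")
    if parsed.hostname not in ALLOWED_API_HOSTS:
        raise ValueError(f"api_base host not allowed: {parsed.hostname!r}")
    return api_base.rstrip("/")

# The platform default passes; an internal metadata endpoint does not.
assert validate_api_base("https://api.mcprouter.to/v1") == "https://api.mcprouter.to/v1"
```

The same check would apply to each `server_url` before an MCP connection is opened.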

```diff
@@ -50,14 +50,52 @@ def _prepare_config(config: dict) -> dict:

 async def _quick_test_mcp_connection(config: dict) -> tuple[bool, str]:
```
high

This `_quick_test_mcp_connection` function is a near-exact duplicate of the function with the same name in `astrbot/core/agent/mcp_client.py`. The duplication raises maintenance cost: any change to the connection-test logic must be made in both places in lockstep, which is error-prone.

Consider moving the function into a shared module, or making it public in `mcp_client.py` (for example by dropping the leading underscore `_`) and importing it here, to eliminate the duplicated code.

```python
if event_name in {"stream", "connection"}:
    return
print(f"MCP Server {name} Error: {msg}")
self.server_errlogs.append(msg)
```
security-medium

The `server_errlogs` list grows indefinitely as error messages are received from the MCP server. A malicious or malfunctioning server could flood the bot with messages, leading to memory exhaustion and a Denial of Service (DoS). Consider using a fixed-size buffer like `collections.deque(maxlen=100)` to store error logs.
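A quick demonstration of the suggested bounded buffer; `maxlen=100` is the value the comment proposes, not one taken from the codebase:

```python
from collections import deque

# Fixed-size error-log buffer: appending past maxlen silently drops
# the oldest entries, so a message flood cannot exhaust memory.
server_errlogs = deque(maxlen=100)
for i in range(1000):
    server_errlogs.append(f"error {i}")

assert len(server_errlogs) == 100
assert server_errlogs[0] == "error 900"   # entries 0..899 were evicted
assert server_errlogs[-1] == "error 999"
```

`deque` is a drop-in replacement for the list here as long as only `append` and iteration are used.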

```python
raise RuntimeError(
    f"ModelScope API request failed: HTTP {response.status}"
)
data = await response.json()
```
security-medium

Calling `await response.json()` on responses from external APIs without a size limit can lead to Denial of Service (DoS) via memory exhaustion if the API returns an extremely large payload. This pattern is repeated on lines 149, 328, 364, and 444. Consider reading the response content with a limit before parsing, similar to the implementation in `_format_http_error`.
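One possible shape for such a capped read, sketched against an abstract async reader (anything exposing `async read(n)`, which aiohttp's `response.content` does). The helper name, the 1 MiB cap, and the fake reader are illustrative assumptions, not code from this PR:

```python
import asyncio
import json

MAX_BODY_BYTES = 1 * 1024 * 1024  # assumed 1 MiB cap

async def read_json_limited(reader, max_bytes: int = MAX_BODY_BYTES):
    # Read one byte past the cap so "exactly at the limit" can be told
    # apart from "over the limit".
    raw = await reader.read(max_bytes + 1)
    if len(raw) > max_bytes:
        raise RuntimeError(f"response body exceeds {max_bytes} bytes")
    return json.loads(raw)

class FakeReader:
    """Stand-in for an HTTP body stream in this demo."""
    def __init__(self, data: bytes):
        self._data = data

    async def read(self, n: int) -> bytes:
        chunk, self._data = self._data[:n], self._data[n:]
        return chunk

assert asyncio.run(read_json_limited(FakeReader(b'{"servers": []}'))) == {"servers": []}
```

An oversized body raises before `json.loads` ever sees it, which is the property the review asks for.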

Comment on lines +210 to +213
```python
if normalized.startswith("Unknown SSE event:"):
    event_name = normalized.split(":", 1)[1].strip()
    if event_name in {"stream", "connection"}:
        return
```
medium

This SSE-noise filtering logic duplicates the logic already in the `_McpSseNoiseFilter` class at the top of the file. To improve maintainability and reuse, consider extracting it into a standalone helper function that both call sites use.

Also, the current `split` does not handle strings that lack a ':', which could raise an `IndexError`. Adding a length check on the `split` result makes the code more robust and also simplifies the logic.

Suggested change:

```diff
-if normalized.startswith("Unknown SSE event:"):
-    event_name = normalized.split(":", 1)[1].strip()
-    if event_name in {"stream", "connection"}:
-        return
+if normalized.startswith("Unknown SSE event:"):
+    parts = normalized.split(":", 1)
+    if len(parts) > 1 and parts[1].strip() in {"stream", "connection"}:
+        return
```
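Extracted as a shared predicate, the check could look like this (the helper name is illustrative; both `_McpSseNoiseFilter` and the error callback could then call it):

```python
def is_sse_noise(message: str) -> bool:
    """True for the 'Unknown SSE event: stream/connection' noise messages."""
    normalized = message.strip()
    if not normalized.startswith("Unknown SSE event:"):
        return False
    parts = normalized.split(":", 1)
    # Length guard keeps this safe even for inputs with no ':' at all.
    return len(parts) > 1 and parts[1].strip() in {"stream", "connection"}

assert is_sse_noise("Unknown SSE event: stream")
assert is_sse_noise("Unknown SSE event: connection")
assert not is_sse_noise("Unknown SSE event: message")
assert not is_sse_noise("some other error")
```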

Comment on lines +203 to +205
```python
raw_max_servers = payload.get("max_servers")
max_servers = int(raw_max_servers or 30)
max_servers = max(1, min(max_servers, 500))
```
medium

The logic for parsing `max_servers` has a potential bug. If the value is 0, the expression `0 or 30` evaluates to 30, preventing the user from setting it to 0. Additionally, if the value is a non-integer string (e.g. `'abc'`), `int()` will raise an unhandled `ValueError`. A similar issue exists for `limit` on line 207 and `max_pages` on line 210.

Using a try-except block is a more robust way to handle type conversion and default values.

Suggested change:

```diff
-raw_max_servers = payload.get("max_servers")
-max_servers = int(raw_max_servers or 30)
-max_servers = max(1, min(max_servers, 500))
+try:
+    max_servers = int(payload.get("max_servers", 30))
+except (ValueError, TypeError):
+    max_servers = 30
+max_servers = max(1, min(max_servers, 500))
```
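The suggested parsing behaves like this (wrapped in a hypothetical helper for the demo):

```python
def parse_max_servers(payload: dict) -> int:
    try:
        max_servers = int(payload.get("max_servers", 30))
    except (ValueError, TypeError):
        max_servers = 30
    return max(1, min(max_servers, 500))

assert parse_max_servers({}) == 30                      # missing key -> default
assert parse_max_servers({"max_servers": 0}) == 1       # clamped, not silently 30
assert parse_max_servers({"max_servers": "abc"}) == 30  # bad string -> default
assert parse_max_servers({"max_servers": 9999}) == 500  # upper clamp
```

Note the lower clamp still maps 0 to 1; if 0 must mean "sync none", the `max(1, ...)` bound would also need to change.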

Comment on lines +138 to +155
```json
"syncProvider": {
  "buttons": {
    "fetchServers": "获取服务器列表"
  },
  "status": {
    "selectProvider": "请选择一个 MCP 服务器提供商",
    "enterToken": "请输入访问令牌以继续",
    "enterApiKey": "请输入 API Key 以继续",
    "fetchServersFirst": "请先获取服务器列表",
    "fetchedServers": "已获取服务器 ({count})"
  },
  "messages": {
    "syncSuccess": "MCP 服务器同步成功!",
    "syncError": "同步失败: {error}",
    "fetchServersSuccess": "已获取 {count} 个服务器",
    "fetchServersError": "获取服务器列表失败: {error}"
  }
},
```
medium

This change adds a `syncProvider` key at the root level, but a `syncProvider` key already exists inside `dialogs`. The result is an inconsistent translation structure, with some keys under `dialogs.syncProvider` and others under the root-level `syncProvider`.

To stay consistent with the en-US file and improve maintainability, consider moving everything from `dialogs.syncProvider` into the root-level `syncProvider` object and deleting `dialogs.syncProvider`.
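The recommended restructuring amounts to a shallow merge. A hypothetical one-off sketch (the `"title"` key and the merge precedence are assumptions for illustration; real locale files would be loaded from disk):

```python
locale = {
    "dialogs": {"syncProvider": {"title": "同步 MCP 服务器"}},
    "syncProvider": {"buttons": {"fetchServers": "获取服务器列表"}},
}

# Hoist dialogs.syncProvider into the root-level object; root keys win on conflict.
nested = locale.get("dialogs", {}).pop("syncProvider", {})
locale["syncProvider"] = {**nested, **locale.get("syncProvider", {})}

assert "syncProvider" not in locale["dialogs"]
assert locale["syncProvider"]["title"] == "同步 MCP 服务器"
assert locale["syncProvider"]["buttons"]["fetchServers"] == "获取服务器列表"
```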

- validate API key via list-servers instead of fixed server keys
- centralize MCPRouter HTTP/JSON error handling for clearer failures
- stop sync when mcp config persistence fails
- add null-safe request body handling in sync-provider route
- move new MCPRouter UI strings to i18n and clean zh-CN duplicate syncProvider block

# Conflicts:
#	astrbot/core/provider/func_tool_manager.py

Labels

- area:provider — The bug / feature is about AI Provider, Models, LLM Agent, LLM Agent Runner.
- area:webui — The bug / feature is about webui (dashboard) of astrbot.
- size:XXL — This PR changes 1000+ lines, ignoring generated files.
