From cb78531367419cbb64fd5e0dc8537df3e6d709b0 Mon Sep 17 00:00:00 2001
From: xiaoxi68 <3520824673@qq.com>
Date: Mon, 2 Mar 2026 21:30:18 +0800
Subject: [PATCH 1/5] feat(mcp): add sync providers and mcprouter support
(cherry picked from commit 466580c16d335fcea34c5e98579754c992f9bc6f)
(cherry picked from commit e6f78948c4e042b0a6b4c13e9f8be69961434023)
---
astrbot/core/provider/func_tool_manager.py | 175 +++++---
astrbot/core/provider/mcp_sync_providers.py | 466 ++++++++++++++++++++
astrbot/dashboard/routes/tools.py | 78 +++-
3 files changed, 646 insertions(+), 73 deletions(-)
create mode 100644 astrbot/core/provider/mcp_sync_providers.py
diff --git a/astrbot/core/provider/func_tool_manager.py b/astrbot/core/provider/func_tool_manager.py
index 106b42cc5..ddcb0755a 100644
--- a/astrbot/core/provider/func_tool_manager.py
+++ b/astrbot/core/provider/func_tool_manager.py
@@ -7,14 +7,14 @@
from collections.abc import AsyncGenerator, Awaitable, Callable
from typing import Any
-import aiohttp
-
from astrbot import logger
from astrbot.core import sp
from astrbot.core.agent.mcp_client import MCPClient, MCPTool
from astrbot.core.agent.tool import FunctionTool, ToolSet
from astrbot.core.utils.astrbot_path import get_astrbot_data_path
+from .mcp_sync_providers import SyncedMcpServer, get_mcp_sync_provider
+
DEFAULT_MCP_CONFIG = {"mcpServers": {}}
SUPPORTED_TYPES = [
@@ -509,71 +509,116 @@ def save_mcp_config(self, config: dict) -> bool:
logger.error(f"保存 MCP 配置失败: {e}")
return False
- async def sync_modelscope_mcp_servers(self, access_token: str) -> None:
- """从 ModelScope 平台同步 MCP 服务器配置"""
- base_url = "https://www.modelscope.cn/openapi/v1"
- url = f"{base_url}/mcp/servers/operational"
- headers = {
- "Authorization": f"Bearer {access_token.strip()}",
- "Content-Type": "application/json",
+ async def _enable_mcp_servers_with_concurrency_limit(
+ self,
+ server_names: list[str],
+ config: dict,
+ *,
+ max_concurrency: int = 5,
+ timeout: int = 30,
+ ) -> tuple[int, dict[str, str]]:
+ sem = asyncio.Semaphore(max_concurrency)
+ failures: dict[str, str] = {}
+
+ async def _enable_one(name: str) -> bool:
+ async with sem:
+ try:
+ if name in self.mcp_client_dict:
+ await self.disable_mcp_server(name, timeout=10)
+ await self.enable_mcp_server(
+ name=name,
+ config=config["mcpServers"][name],
+ timeout=timeout,
+ )
+ return True
+ except Exception as e:
+ failures[name] = str(e)
+ logger.warning(f"启用 MCP 服务器失败: {name}, err={e!s}")
+ return False
+
+ results = await asyncio.gather(*[_enable_one(n) for n in server_names])
+ enabled_count = sum(1 for ok in results if ok)
+ return enabled_count, failures
+
+ async def sync_mcp_servers_from_provider(
+ self,
+ provider_name: str,
+ payload: dict[str, Any],
+ *,
+ max_concurrency: int = 5,
+ ) -> dict[str, Any]:
+ provider = get_mcp_sync_provider(provider_name)
+ servers: list[SyncedMcpServer] = await provider.fetch(payload)
+ if not servers:
+ return {
+ "provider": provider_name,
+ "synced": 0,
+ "enabled": 0,
+ "failed": 0,
+ "failed_servers": [],
+ }
+
+ local_mcp_config = self.load_mcp_config()
+ local_mcp_config.setdefault("mcpServers", {})
+
+ for item in servers:
+ local_mcp_config["mcpServers"][item.name] = item.config
+
+ self.save_mcp_config(local_mcp_config)
+
+ enabled_count, failures = await self._enable_mcp_servers_with_concurrency_limit(
+ [item.name for item in servers],
+ local_mcp_config,
+ max_concurrency=max_concurrency,
+ )
+
+ return {
+ "provider": provider_name,
+ "synced": len(servers),
+ "enabled": enabled_count,
+ "failed": len(failures),
+ "failed_servers": sorted(failures.keys()),
}
- try:
- async with aiohttp.ClientSession() as session:
- async with session.get(url, headers=headers) as response:
- if response.status == 200:
- data = await response.json()
- mcp_server_list = data.get("data", {}).get(
- "mcp_server_list",
- [],
- )
- local_mcp_config = self.load_mcp_config()
-
- synced_count = 0
- for server in mcp_server_list:
- server_name = server["name"]
- operational_urls = server.get("operational_urls", [])
- if not operational_urls:
- continue
- url_info = operational_urls[0]
- server_url = url_info.get("url")
- if not server_url:
- continue
- # 添加到配置中(同名会覆盖)
- local_mcp_config["mcpServers"][server_name] = {
- "url": server_url,
- "transport": "sse",
- "active": True,
- "provider": "modelscope",
- }
- synced_count += 1
-
- if synced_count > 0:
- self.save_mcp_config(local_mcp_config)
- tasks = []
- for server in mcp_server_list:
- name = server["name"]
- tasks.append(
- self.enable_mcp_server(
- name=name,
- config=local_mcp_config["mcpServers"][name],
- ),
- )
- await asyncio.gather(*tasks)
- logger.info(
- f"从 ModelScope 同步了 {synced_count} 个 MCP 服务器",
- )
- else:
- logger.warning("没有找到可用的 ModelScope MCP 服务器")
- else:
- raise Exception(
- f"ModelScope API 请求失败: HTTP {response.status}",
- )
-
- except aiohttp.ClientError as e:
- raise Exception(f"网络连接错误: {e!s}")
- except Exception as e:
- raise Exception(f"同步 ModelScope MCP 服务器时发生错误: {e!s}")
+ async def list_mcp_servers_from_provider(
+ self,
+ provider_name: str,
+ payload: dict[str, Any],
+ ) -> list[dict[str, Any]]:
+ provider = get_mcp_sync_provider(provider_name)
+ return await provider.list_servers(payload)
+
+ async def sync_modelscope_mcp_servers(self, access_token: str) -> None:
+ await self.sync_mcp_servers_from_provider(
+ "modelscope",
+ {"access_token": access_token},
+ )
+
+ async def sync_mcprouter_mcp_servers(
+ self,
+ api_key: str,
+ *,
+ app_url: str = "",
+ app_name: str = "AstrBot",
+ api_base: str = "https://api.mcprouter.to/v1",
+ limit: int = 100,
+ max_servers: int = 30,
+ query: str = "",
+ server_keys: str | list[str] | None = None,
+ ) -> dict[str, Any]:
+ return await self.sync_mcp_servers_from_provider(
+ "mcprouter",
+ {
+ "api_key": api_key,
+ "app_url": app_url,
+ "app_name": app_name,
+ "api_base": api_base,
+ "limit": limit,
+ "max_servers": max_servers,
+ "query": query,
+ "server_keys": server_keys,
+ },
+ )
def __str__(self) -> str:
return str(self.func_list)
diff --git a/astrbot/core/provider/mcp_sync_providers.py b/astrbot/core/provider/mcp_sync_providers.py
new file mode 100644
index 000000000..e40818d62
--- /dev/null
+++ b/astrbot/core/provider/mcp_sync_providers.py
@@ -0,0 +1,466 @@
+from __future__ import annotations
+
+import abc
+from dataclasses import dataclass
+from typing import Any, ClassVar
+
+import aiohttp
+
+
+@dataclass(frozen=True, slots=True)
+class SyncedMcpServer:
+ name: str
+ config: dict[str, Any]
+
+
+class McpServerSyncProvider(abc.ABC):
+ provider: ClassVar[str]
+
+ @abc.abstractmethod
+ async def fetch(self, payload: dict[str, Any]) -> list[SyncedMcpServer]:
+ raise NotImplementedError
+
+ async def list_servers(self, payload: dict[str, Any]) -> list[dict[str, Any]]:
+ raise NotImplementedError
+
+
+_provider_registry: dict[str, type[McpServerSyncProvider]] = {}
+
+
+def register_mcp_sync_provider(provider: str):
+ def decorator(cls: type[McpServerSyncProvider]) -> type[McpServerSyncProvider]:
+ if provider in _provider_registry:
+ raise ValueError(f"MCP sync provider already registered: {provider}")
+ cls.provider = provider # type: ignore[attr-defined]
+ _provider_registry[provider] = cls
+ return cls
+
+ return decorator
+
+
+def get_mcp_sync_provider(provider: str) -> McpServerSyncProvider:
+ cls = _provider_registry.get(provider)
+ if not cls:
+ raise ValueError(f"Unknown MCP sync provider: {provider}")
+ return cls()
+
+
+@register_mcp_sync_provider("modelscope")
+class ModelscopeMcpServerSyncProvider(McpServerSyncProvider):
+ async def fetch(self, payload: dict[str, Any]) -> list[SyncedMcpServer]:
+ access_token = str(payload.get("access_token", "")).strip()
+ if not access_token:
+ raise ValueError("Missing required field: access_token")
+
+ base_url = "https://www.modelscope.cn/openapi/v1"
+ url = f"{base_url}/mcp/servers/operational"
+ headers = {
+ "Authorization": f"Bearer {access_token}",
+ "Content-Type": "application/json",
+ }
+
+ async with aiohttp.ClientSession() as session:
+ async with session.get(url, headers=headers) as response:
+ if response.status != 200:
+ raise RuntimeError(
+ f"ModelScope API request failed: HTTP {response.status}"
+ )
+ data = await response.json()
+
+ mcp_server_list = data.get("data", {}).get("mcp_server_list", []) or []
+ items: list[SyncedMcpServer] = []
+ for server in mcp_server_list:
+ server_name = server.get("name")
+ operational_urls = server.get("operational_urls") or []
+ if not server_name or not operational_urls:
+ continue
+ server_url = (operational_urls[0] or {}).get("url")
+ if not server_url:
+ continue
+ items.append(
+ SyncedMcpServer(
+ name=server_name,
+ config={
+ "url": server_url,
+ "transport": "sse",
+ "active": True,
+ "provider": "modelscope",
+ },
+ )
+ )
+ return items
+
+
+@register_mcp_sync_provider("mcprouter")
+class McpRouterMcpServerSyncProvider(McpServerSyncProvider):
+ def _normalize_api_key(self, value: str) -> str:
+ raw = value.strip()
+ if not raw:
+ return raw
+ lower = raw.lower()
+ if lower.startswith("bearer "):
+ return raw[7:].strip()
+ if lower.startswith("authorization:"):
+ after = raw.split(":", 1)[1].strip()
+ if after.lower().startswith("bearer "):
+ return after[7:].strip()
+ return after
+ return raw
+
+ def _build_api_headers(
+ self,
+ *,
+ api_key: str,
+ app_url: str,
+ app_name: str,
+ ) -> dict[str, str]:
+ headers: dict[str, str] = {
+ "Authorization": f"Bearer {api_key}",
+ "Content-Type": "application/json",
+ }
+ if app_url:
+ headers["HTTP-Referer"] = app_url
+ if app_name:
+ headers["X-Title"] = app_name
+ return headers
+
+ async def _validate_api_key(
+ self,
+ *,
+ api_key: str,
+ app_url: str,
+ app_name: str,
+ base_url: str,
+ server_key: str,
+ ) -> None:
+ url = f"{base_url}/list-tools"
+ headers = self._build_api_headers(
+ api_key=api_key,
+ app_url=app_url,
+ app_name=app_name,
+ )
+ timeout = aiohttp.ClientTimeout(total=20)
+ async with aiohttp.ClientSession(timeout=timeout) as session:
+ async with session.post(
+ url,
+ json={"server": server_key},
+ headers=headers,
+ ) as response:
+ data = await response.json()
+ if response.status != 200:
+ raise RuntimeError(
+ f"MCPRouter API request failed: HTTP {response.status}"
+ )
+ if data.get("code") != 0:
+ raise ValueError(
+ f"MCPRouter API key validation failed: {data.get('message', 'unknown')}"
+ )
+
+ async def fetch(self, payload: dict[str, Any]) -> list[SyncedMcpServer]:
+ api_key = self._normalize_api_key(str(payload.get("api_key", "")))
+ if not api_key:
+ raise ValueError("Missing required field: api_key")
+
+ app_url = str(payload.get("app_url", "")).strip()
+ app_name = str(payload.get("app_name", "")).strip()
+
+ base_url = str(payload.get("api_base", "https://api.mcprouter.to/v1")).rstrip(
+ "/"
+ )
+ list_url = f"{base_url}/list-servers"
+ get_url = f"{base_url}/get-server"
+
+ validation_server_key = "time"
+ provided_servers = payload.get("servers")
+ if isinstance(provided_servers, list) and provided_servers:
+ for item in provided_servers:
+ if not isinstance(item, dict):
+ continue
+ server_key = str(item.get("server_key") or "").strip()
+ config_name = str(item.get("config_name") or "").strip()
+ if server_key == "time" or config_name == "time":
+ validation_server_key = "time"
+ break
+ if server_key:
+ validation_server_key = server_key
+ break
+
+ await self._validate_api_key(
+ api_key=api_key,
+ app_url=app_url,
+ app_name=app_name,
+ base_url=base_url,
+ server_key=validation_server_key,
+ )
+
+ api_headers = self._build_api_headers(
+ api_key=api_key,
+ app_url=app_url,
+ app_name=app_name,
+ )
+
+ query = str(payload.get("query", "")).strip().lower()
+ raw_max_servers = payload.get("max_servers")
+ max_servers = int(raw_max_servers or 30)
+ max_servers = max(1, min(max_servers, 500))
+
+ limit = int(payload.get("limit", 30) or 30)
+ limit = max(1, min(limit, 100))
+
+ max_pages = int(payload.get("max_pages", 10) or 10)
+ max_pages = max(1, min(max_pages, 50))
+
+ mcp_headers: dict[str, str] = {
+ "Authorization": f"Bearer {api_key}",
+ "Content-Type": "application/json",
+ }
+ if app_url:
+ mcp_headers["HTTP-Referer"] = app_url
+ if app_name:
+ mcp_headers["X-Title"] = app_name
+
+ def parse_server_keys(value: Any) -> list[str]:
+ if isinstance(value, list):
+ parts = [str(item).strip() for item in value]
+ elif isinstance(value, str):
+ raw = value.replace(",", "\n").replace(";", "\n")
+ parts = [line.strip() for line in raw.splitlines()]
+ else:
+ return []
+ keys = [item for item in parts if item]
+ seen: set[str] = set()
+ result: list[str] = []
+ for item in keys:
+ if item in seen:
+ continue
+ seen.add(item)
+ result.append(item)
+ return result
+
+ def matches(server: dict[str, Any], q: str) -> bool:
+ if not q:
+ return True
+ haystacks = [
+ server.get("config_name"),
+ server.get("server_key"),
+ server.get("name"),
+ server.get("title"),
+ server.get("description"),
+ server.get("author_name"),
+ ]
+ combined = " ".join(str(v) for v in haystacks if v)
+ return q in combined.lower()
+
+ def make_item(
+ *,
+ name: str,
+ url: str,
+ used_names: set[str],
+ server_key: str | None = None,
+ ) -> SyncedMcpServer:
+ final_name = name
+ if final_name in used_names:
+ suffix = server_key or "dup"
+ final_name = f"{final_name}-{suffix}"
+ i = 2
+ while final_name in used_names:
+ final_name = f"{name}-{i}"
+ i += 1
+ used_names.add(final_name)
+ return SyncedMcpServer(
+ name=final_name,
+ config={
+ "url": url,
+ "transport": "streamable_http",
+ "headers": mcp_headers,
+ "active": True,
+ "provider": "mcprouter",
+ },
+ )
+
+ timeout = aiohttp.ClientTimeout(total=30)
+ used_names: set[str] = set()
+ items: list[SyncedMcpServer] = []
+
+ provided_servers = payload.get("servers")
+ if isinstance(provided_servers, list) and provided_servers:
+ selected_servers = (
+ provided_servers[:max_servers]
+ if raw_max_servers is not None
+ else provided_servers
+ )
+ for server in selected_servers:
+ if not isinstance(server, dict):
+ continue
+ server_key = server.get("server_key")
+ server_name = (
+ server.get("config_name")
+ or server_key
+ or server.get("name")
+ or server.get("title")
+ )
+ server_url = server.get("server_url")
+ if not server_name or not server_url:
+ continue
+ items.append(
+ make_item(
+ name=str(server_name),
+ url=str(server_url),
+ used_names=used_names,
+ server_key=str(server_key) if server_key else None,
+ )
+ )
+ return items
+
+ server_keys = parse_server_keys(payload.get("server_keys"))
+ if server_keys:
+ async with aiohttp.ClientSession(timeout=timeout) as session:
+ for server_key in server_keys[:max_servers]:
+ async with session.post(
+ get_url,
+ json={"server": server_key},
+ headers=api_headers,
+ ) as response:
+ if response.status != 200:
+ raise RuntimeError(
+ f"MCPRouter API request failed: HTTP {response.status}"
+ )
+ data = await response.json()
+ if data.get("code") != 0:
+ raise RuntimeError(
+ f"MCPRouter API error: {data.get('message', 'unknown')}"
+ )
+ server = data.get("data") or {}
+ server_url = server.get("server_url")
+ if not server_url:
+ continue
+ server_name = (
+ server.get("config_name")
+ or server.get("server_key")
+ or server.get("name")
+ or server_key
+ )
+ items.append(
+ make_item(
+ name=str(server_name),
+ url=str(server_url),
+ used_names=used_names,
+ server_key=server_key,
+ )
+ )
+ return items
+
+ async with aiohttp.ClientSession(timeout=timeout) as session:
+ for page in range(1, max_pages + 1):
+ async with session.post(
+ list_url,
+ json={"page": page, "limit": limit},
+ headers=api_headers,
+ ) as response:
+ if response.status != 200:
+ raise RuntimeError(
+ f"MCPRouter API request failed: HTTP {response.status}"
+ )
+ data = await response.json()
+ if data.get("code") != 0:
+ raise RuntimeError(
+ f"MCPRouter API error: {data.get('message', 'unknown')}"
+ )
+ batch = data.get("data", {}).get("servers", []) or []
+ if not batch:
+ break
+ for server in batch:
+ if not matches(server, query):
+ continue
+ server_url = server.get("server_url")
+ if not server_url:
+ continue
+ server_key = server.get("server_key")
+ server_name = (
+ server.get("config_name")
+ or server_key
+ or server.get("name")
+ )
+ if not server_name:
+ continue
+ items.append(
+ make_item(
+ name=str(server_name),
+ url=str(server_url),
+ used_names=used_names,
+ server_key=str(server_key) if server_key else None,
+ )
+ )
+ if len(items) >= max_servers:
+ return items
+ if len(batch) < limit:
+ break
+
+ return items
+
+ async def list_servers(self, payload: dict[str, Any]) -> list[dict[str, Any]]:
+ api_key = self._normalize_api_key(str(payload.get("api_key", "")))
+ if not api_key:
+ raise ValueError("Missing required field: api_key")
+
+ app_url = str(payload.get("app_url", "")).strip()
+ app_name = str(payload.get("app_name", "")).strip()
+ base_url = str(payload.get("api_base", "https://api.mcprouter.to/v1")).rstrip(
+ "/"
+ )
+ list_url = f"{base_url}/list-servers"
+
+ await self._validate_api_key(
+ api_key=api_key,
+ app_url=app_url,
+ app_name=app_name,
+ base_url=base_url,
+ server_key="time",
+ )
+
+ api_headers = self._build_api_headers(
+ api_key=api_key,
+ app_url=app_url,
+ app_name=app_name,
+ )
+
+ limit = 100
+ max_pages = 20
+ max_items = 2000
+
+ servers: list[dict[str, Any]] = []
+ timeout = aiohttp.ClientTimeout(total=30)
+ async with aiohttp.ClientSession(timeout=timeout) as session:
+ for page in range(1, max_pages + 1):
+ async with session.post(
+ list_url,
+ json={"page": page, "limit": limit},
+ headers=api_headers,
+ ) as response:
+ if response.status != 200:
+ raise RuntimeError(
+ f"MCPRouter API request failed: HTTP {response.status}"
+ )
+ data = await response.json()
+ if data.get("code") != 0:
+ raise RuntimeError(
+ f"MCPRouter API error: {data.get('message', 'unknown')}"
+ )
+ batch = data.get("data", {}).get("servers", []) or []
+ if not batch:
+ break
+ for item in batch:
+ if not isinstance(item, dict):
+ continue
+ server_url = item.get("server_url")
+ server_key = item.get("server_key")
+ config_name = item.get("config_name")
+ if not server_url or not (server_key or config_name):
+ continue
+ servers.append(item)
+ if len(servers) >= max_items:
+ return servers
+ if len(batch) < limit:
+ break
+
+ return servers
diff --git a/astrbot/dashboard/routes/tools.py b/astrbot/dashboard/routes/tools.py
index 333700410..ba8643fb1 100644
--- a/astrbot/dashboard/routes/tools.py
+++ b/astrbot/dashboard/routes/tools.py
@@ -26,6 +26,10 @@ def __init__(
"/tools/mcp/update": ("POST", self.update_mcp_server),
"/tools/mcp/delete": ("POST", self.delete_mcp_server),
"/tools/mcp/test": ("POST", self.test_mcp_connection),
+ "/tools/mcp/providers/mcprouter/list-servers": (
+ "POST",
+ self.list_mcprouter_servers,
+ ),
"/tools/list": ("GET", self.get_tool_list),
"/tools/toggle-tool": ("POST", self.toggle_tool),
"/tools/mcp/sync-provider": ("POST", self.sync_provider),
@@ -379,19 +383,77 @@ async def toggle_tool(self):
logger.error(traceback.format_exc())
return Response().error(f"操作工具失败: {e!s}").__dict__
+ async def list_mcprouter_servers(self):
+ """List MCP servers from MCPRouter."""
+ try:
+ data = await request.json
+ api_key = str((data or {}).get("api_key", "")).strip()
+ if not api_key:
+ return Response().error("缺少必要参数: api_key").__dict__
+
+ app_url = str((data or {}).get("app_url", "")).strip()
+ if not app_url:
+ app_url = (
+ request.headers.get("Origin")
+ or request.headers.get("Referer")
+ or ""
+ )
+ app_name = str((data or {}).get("app_name", "")).strip() or "AstrBot"
+ api_base = (
+ str((data or {}).get("api_base", "https://api.mcprouter.to/v1")).strip()
+ or "https://api.mcprouter.to/v1"
+ )
+
+ servers = await self.tool_mgr.list_mcp_servers_from_provider(
+ "mcprouter",
+ {
+ "api_key": api_key,
+ "app_url": app_url,
+ "app_name": app_name,
+ "api_base": api_base,
+ },
+ )
+ return (
+ Response()
+ .ok(data=servers, message=f"已获取 {len(servers)} 个服务器")
+ .__dict__
+ )
+ except Exception as e:
+ logger.error(traceback.format_exc())
+ return Response().error(f"获取 MCPRouter 服务器列表失败: {e!s}").__dict__
+
async def sync_provider(self):
"""同步 MCP 提供者配置"""
try:
data = await request.json
provider_name = data.get("name") # modelscope, or others
- match provider_name:
- case "modelscope":
- access_token = data.get("access_token", "")
- await self.tool_mgr.sync_modelscope_mcp_servers(access_token)
- case _:
- return Response().error(f"未知: {provider_name}").__dict__
-
- return Response().ok(message="同步成功").__dict__
+ if not provider_name:
+ return Response().error("缺少必要参数: name").__dict__
+
+ if provider_name == "mcprouter":
+ data.setdefault(
+ "app_url",
+ request.headers.get("Origin")
+ or request.headers.get("Referer")
+ or "",
+ )
+ data.setdefault("app_name", "AstrBot")
+
+ result = await self.tool_mgr.sync_mcp_servers_from_provider(
+ provider_name,
+ data,
+ )
+ synced = int(result.get("synced", 0) or 0)
+ enabled = int(result.get("enabled", 0) or 0)
+ failed = int(result.get("failed", 0) or 0)
+
+ if synced == 0:
+ return Response().ok(message="未找到可同步的 MCP 服务器").__dict__
+
+ msg = f"同步完成:同步 {synced} 个,启用 {enabled} 个"
+ if failed:
+ msg += f",失败 {failed} 个"
+ return Response().ok(message=msg).__dict__
except Exception as e:
logger.error(traceback.format_exc())
return Response().error(f"同步失败: {e!s}").__dict__
From 892bfcdc20c6b9a42bed651d1918fd9f89ee2f41 Mon Sep 17 00:00:00 2001
From: xiaoxi68 <3520824673@qq.com>
Date: Mon, 2 Mar 2026 21:31:57 +0800
Subject: [PATCH 2/5] feat(dashboard): fetch and filter mcprouter servers
before sync
(cherry picked from commit 3bbf33dd3d8c05210767bb40babca31f5363ae3f)
(cherry picked from commit 30392fb2bf2c11031ef991a9da7aa49795e13d71)
---
.../extension/McpServersSection.vue | 130 +++++++++++++++++-
.../i18n/locales/en-US/features/tool-use.json | 8 +-
.../i18n/locales/zh-CN/features/tool-use.json | 20 ++-
3 files changed, 155 insertions(+), 3 deletions(-)
diff --git a/dashboard/src/components/extension/McpServersSection.vue b/dashboard/src/components/extension/McpServersSection.vue
index 95b679580..073f9d15e 100644
--- a/dashboard/src/components/extension/McpServersSection.vue
+++ b/dashboard/src/components/extension/McpServersSection.vue
@@ -192,6 +192,68 @@
+
+
+
+
+
创建 API Key
+
+ 访问 MCPRouter 创建并复制您的 API Key。
+
+
+
+
+
+
+
输入 API Key
+
+ 输入 API Key 以同步 MCP 服务器。
+
+
+
+ {{ tm('syncProvider.buttons.fetchServers') }}
+
+
+
+
+ {{ tm('syncProvider.status.fetchedServers', { count: mcprouterServers.length }) }}
+
+
+
+
+
+ {{ server.title || server.config_name || server.server_key || server.name }}
+ ({{ server.server_key }})
+
+
+ {{ server.description || server.author_name || server.config_name || '' }}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
可选:应用标识
+
+ 部分 MCPRouter 服务可能会校验以下标识(可留空)。
+
+
+
+
+
+
+
@@ -241,8 +303,13 @@ export default {
mcpServers: [],
showMcpServerDialog: false,
selectedMcpServerProvider: 'modelscope',
- mcpServerProviderList: ['modelscope'],
+ mcpServerProviderList: ['modelscope', 'mcprouter'],
mcpProviderToken: '',
+ mcprouterApiKey: '',
+ mcprouterAppUrl: '',
+ mcprouterAppName: 'AstrBot',
+ mcprouterServersLoading: false,
+ mcprouterServers: [],
showSyncMcpServerDialog: false,
addServerDialogMessage: '',
loading: false,
@@ -283,6 +350,7 @@ export default {
},
mounted() {
this.getServers();
+ this.mcprouterAppUrl = window.location.origin;
this.refreshInterval = setInterval(() => {
this.getServers();
}, 5000);
@@ -486,6 +554,40 @@ export default {
this.save_message_success = 'error';
this.save_message_snack = true;
},
+ async fetchMcpRouterServers() {
+ if (!this.mcprouterApiKey.trim()) {
+ this.showError(this.tm('syncProvider.status.enterApiKey'));
+ return;
+ }
+ this.mcprouterServersLoading = true;
+ try {
+ const requestData = {
+ api_key: this.mcprouterApiKey.trim()
+ };
+ if (this.mcprouterAppUrl.trim()) {
+ requestData.app_url = this.mcprouterAppUrl.trim();
+ }
+ if (this.mcprouterAppName.trim()) {
+ requestData.app_name = this.mcprouterAppName.trim();
+ }
+ const response = await axios.post('/api/tools/mcp/providers/mcprouter/list-servers', requestData);
+ if (response.data.status === 'ok') {
+ this.mcprouterServers = response.data.data || [];
+ this.showSuccess(response.data.message || this.tm('syncProvider.messages.fetchServersSuccess', { count: this.mcprouterServers.length }));
+ } else {
+ this.showError(response.data.message || this.tm('syncProvider.messages.fetchServersError', { error: 'Unknown error' }));
+ }
+ } catch (error) {
+ this.showError(this.tm('syncProvider.messages.fetchServersError', {
+ error: error.response?.data?.message || error.message || '网络连接或 API Key 问题'
+ }));
+ } finally {
+ this.mcprouterServersLoading = false;
+ }
+ },
+ removeMcpRouterServer(index) {
+ this.mcprouterServers.splice(index, 1);
+ },
async syncMcpServers() {
if (!this.selectedMcpServerProvider) {
this.showError(this.tm('syncProvider.status.selectProvider'));
@@ -503,12 +605,33 @@ export default {
return;
}
requestData.access_token = this.mcpProviderToken.trim();
+ } else if (this.selectedMcpServerProvider === 'mcprouter') {
+ if (!this.mcprouterApiKey.trim()) {
+ this.showError(this.tm('syncProvider.status.enterApiKey'));
+ this.loading = false;
+ return;
+ }
+ if (!this.mcprouterServers.length) {
+ this.showError(this.tm('syncProvider.status.fetchServersFirst'));
+ this.loading = false;
+ return;
+ }
+ requestData.api_key = this.mcprouterApiKey.trim();
+ if (this.mcprouterAppUrl.trim()) {
+ requestData.app_url = this.mcprouterAppUrl.trim();
+ }
+ if (this.mcprouterAppName.trim()) {
+ requestData.app_name = this.mcprouterAppName.trim();
+ }
+ requestData.servers = this.mcprouterServers;
}
const response = await axios.post('/api/tools/mcp/sync-provider', requestData);
if (response.data.status === 'ok') {
this.showSuccess(response.data.message || this.tm('syncProvider.messages.syncSuccess'));
this.showSyncMcpServerDialog = false;
this.mcpProviderToken = '';
+ this.mcprouterApiKey = '';
+ this.mcprouterServers = [];
this.getServers();
} else {
this.showError(response.data.message || this.tm('syncProvider.messages.syncError', { error: 'Unknown error' }));
@@ -538,4 +661,9 @@ export default {
margin-top: 4px;
overflow: hidden;
}
+
+.mcprouter-server-list {
+ max-height: 260px;
+ overflow-y: auto;
+}
diff --git a/dashboard/src/i18n/locales/en-US/features/tool-use.json b/dashboard/src/i18n/locales/en-US/features/tool-use.json
index 2c68b8243..42c1f69f1 100644
--- a/dashboard/src/i18n/locales/en-US/features/tool-use.json
+++ b/dashboard/src/i18n/locales/en-US/features/tool-use.json
@@ -122,16 +122,22 @@
"previous": "Previous",
"next": "Next",
"sync": "Start Sync",
- "getToken": "Get Token"
+ "getToken": "Get Token",
+ "fetchServers": "Fetch Servers"
},
"status": {
"selectProvider": "Please select an MCP server provider",
"enterToken": "Please enter the access token to continue",
+ "enterApiKey": "Please enter the API key to continue",
+ "fetchServersFirst": "Please fetch the server list first",
+ "fetchedServers": "Fetched servers ({count})",
"readyToSync": "Ready to sync server configurations"
},
"messages": {
"syncSuccess": "MCP servers synced successfully!",
"syncError": "Sync failed: {error}",
+ "fetchServersSuccess": "Fetched {count} servers",
+ "fetchServersError": "Failed to fetch server list: {error}",
"tokenHelp": "How to get a ModelScope access token? Click the button on the right for instructions"
}
},
diff --git a/dashboard/src/i18n/locales/zh-CN/features/tool-use.json b/dashboard/src/i18n/locales/zh-CN/features/tool-use.json
index f6e6c4407..296b31f73 100644
--- a/dashboard/src/i18n/locales/zh-CN/features/tool-use.json
+++ b/dashboard/src/i18n/locales/zh-CN/features/tool-use.json
@@ -135,6 +135,24 @@
}
}
},
+ "syncProvider": {
+ "buttons": {
+ "fetchServers": "获取服务器列表"
+ },
+ "status": {
+ "selectProvider": "请选择一个 MCP 服务器提供商",
+ "enterToken": "请输入访问令牌以继续",
+ "enterApiKey": "请输入 API Key 以继续",
+ "fetchServersFirst": "请先获取服务器列表",
+ "fetchedServers": "已获取服务器 ({count})"
+ },
+ "messages": {
+ "syncSuccess": "MCP 服务器同步成功!",
+ "syncError": "同步失败: {error}",
+ "fetchServersSuccess": "已获取 {count} 个服务器",
+ "fetchServersError": "获取服务器列表失败: {error}"
+ }
+ },
"messages": {
"getServersError": "获取 MCP 服务器列表失败: {error}",
"getToolsError": "获取函数工具列表失败: {error}",
@@ -156,4 +174,4 @@
"toggleToolError": "工具状态切换失败: {error}",
"testError": "测试连接失败: {error}"
}
-}
\ No newline at end of file
+}
From 921878a509dd4c7ad5cece2bf9de26745aaf277a Mon Sep 17 00:00:00 2001
From: xiaoxi68 <3520824673@qq.com>
Date: Mon, 2 Mar 2026 21:33:09 +0800
Subject: [PATCH 3/5] fix(mcp): improve connection errors and reduce noisy logs
(cherry picked from commit 447c221963fdbca66453e9031d6133c010cc4b4b)
(cherry picked from commit e068be838cf0f9e41638777bb23b0bc44e7d1bb7)
---
astrbot/core/agent/mcp_client.py | 71 +++++++++++++++++++++-
astrbot/core/provider/func_tool_manager.py | 58 ++++++++++++++++--
2 files changed, 122 insertions(+), 7 deletions(-)
diff --git a/astrbot/core/agent/mcp_client.py b/astrbot/core/agent/mcp_client.py
index 18f4d47e0..d65459298 100644
--- a/astrbot/core/agent/mcp_client.py
+++ b/astrbot/core/agent/mcp_client.py
@@ -19,10 +19,34 @@
from .run_context import TContext
from .tool import FunctionTool
+
+class _McpSseNoiseFilter(logging.Filter):
+ def filter(self, record: logging.LogRecord) -> bool:
+ try:
+ msg = record.getMessage().strip()
+ except Exception:
+ return True
+ if msg.startswith("Unknown SSE event:"):
+ event_name = msg.split(":", 1)[1].strip()
+ if event_name in {"stream", "connection"}:
+ return False
+ return True
+
+
+def _install_mcp_noise_filters() -> None:
+ for logger_name in ("mcp.client.streamable_http", "mcp.client.sse"):
+ log = logging.getLogger(logger_name)
+ if any(isinstance(f, _McpSseNoiseFilter) for f in log.filters):
+ continue
+ log.addFilter(_McpSseNoiseFilter())
+
+
try:
import anyio
import mcp
from mcp.client.sse import sse_client
+
+ _install_mcp_noise_filters()
except (ModuleNotFoundError, ImportError):
logger.warning(
"Warning: Missing 'mcp' dependency, MCP services will be unavailable."
@@ -47,6 +71,8 @@ def _prepare_config(config: dict) -> dict:
async def _quick_test_mcp_connection(config: dict) -> tuple[bool, str]:
"""Quick test MCP server connectivity"""
+ import json
+
import aiohttp
cfg = _prepare_config(config.copy())
@@ -55,6 +81,40 @@ async def _quick_test_mcp_connection(config: dict) -> tuple[bool, str]:
headers = cfg.get("headers", {})
timeout = cfg.get("timeout", 10)
+ async def _format_http_error(response: aiohttp.ClientResponse) -> str:
+ reason = response.reason or ""
+ detail = ""
+ try:
+ raw = await response.content.read(2048)
+ if raw:
+ text = raw.decode(errors="replace").strip()
+ if text:
+ try:
+ data = json.loads(text)
+ except Exception:
+ detail = text
+ else:
+ if isinstance(data, dict):
+ msg = (
+ data.get("message")
+ or data.get("error")
+ or data.get("detail")
+ )
+ code = data.get("code")
+ if msg is not None:
+ detail = (
+ f"{code}: {msg}" if code is not None else str(msg)
+ )
+ else:
+ detail = text
+ else:
+ detail = text
+ except Exception:
+ detail = ""
+ if detail:
+ return f"HTTP {response.status}: {reason} ({detail})"
+ return f"HTTP {response.status}: {reason}"
+
try:
if "transport" in cfg:
transport_type = cfg["transport"]
@@ -70,7 +130,7 @@ async def _quick_test_mcp_connection(config: dict) -> tuple[bool, str]:
"method": "initialize",
"id": 0,
"params": {
- "protocolVersion": "2024-11-05",
+ "protocolVersion": mcp.types.LATEST_PROTOCOL_VERSION,
"capabilities": {},
"clientInfo": {"name": "test-client", "version": "1.2.3"},
},
@@ -87,7 +147,7 @@ async def _quick_test_mcp_connection(config: dict) -> tuple[bool, str]:
) as response:
if response.status == 200:
return True, ""
- return False, f"HTTP {response.status}: {response.reason}"
+ return False, await _format_http_error(response)
else:
async with session.get(
url,
@@ -99,7 +159,7 @@ async def _quick_test_mcp_connection(config: dict) -> tuple[bool, str]:
) as response:
if response.status == 200:
return True, ""
- return False, f"HTTP {response.status}: {response.reason}"
+ return False, await _format_http_error(response)
except asyncio.TimeoutError:
return False, f"Connection timeout: {timeout} seconds"
@@ -146,6 +206,11 @@ async def connect_to_server(self, mcp_server_config: dict, name: str) -> None:
def logging_callback(msg: str) -> None:
# Handle MCP service error logs
+ normalized = msg.strip()
+ if normalized.startswith("Unknown SSE event:"):
+ event_name = normalized.split(":", 1)[1].strip()
+ if event_name in {"stream", "connection"}:
+ return
print(f"MCP Server {name} Error: {msg}")
self.server_errlogs.append(msg)
diff --git a/astrbot/core/provider/func_tool_manager.py b/astrbot/core/provider/func_tool_manager.py
index ddcb0755a..39c3e2a15 100644
--- a/astrbot/core/provider/func_tool_manager.py
+++ b/astrbot/core/provider/func_tool_manager.py
@@ -50,7 +50,10 @@ def _prepare_config(config: dict) -> dict:
async def _quick_test_mcp_connection(config: dict) -> tuple[bool, str]:
"""快速测试 MCP 服务器可达性"""
+ import json
+
import aiohttp
+ import mcp
cfg = _prepare_config(config.copy())
@@ -58,6 +61,41 @@ async def _quick_test_mcp_connection(config: dict) -> tuple[bool, str]:
headers = cfg.get("headers", {})
timeout = cfg.get("timeout", 10)
+ async def _format_http_error(response: aiohttp.ClientResponse) -> str:
+ reason = response.reason or ""
+ detail = ""
+ try:
+ raw = await response.content.read(2048)
+ if raw:
+ text = raw.decode(errors="replace").strip()
+ if text:
+ try:
+ data = json.loads(text)
+ except Exception:
+ detail = text
+ else:
+ if isinstance(data, dict):
+ msg = (
+ data.get("message")
+ or data.get("error")
+ or data.get("detail")
+ )
+ code = data.get("code")
+ if msg is not None:
+ detail = (
+ f"{code}: {msg}" if code is not None else str(msg)
+ )
+ else:
+ detail = text
+ else:
+ detail = text
+ except Exception:
+ detail = ""
+
+ if detail:
+ return f"HTTP {response.status}: {reason} ({detail})"
+ return f"HTTP {response.status}: {reason}"
+
try:
async with aiohttp.ClientSession() as session:
if cfg.get("transport") == "streamable_http":
@@ -66,7 +104,7 @@ async def _quick_test_mcp_connection(config: dict) -> tuple[bool, str]:
"method": "initialize",
"id": 0,
"params": {
- "protocolVersion": "2024-11-05",
+ "protocolVersion": mcp.types.LATEST_PROTOCOL_VERSION,
"capabilities": {},
"clientInfo": {"name": "test-client", "version": "1.2.3"},
},
@@ -83,7 +121,7 @@ async def _quick_test_mcp_connection(config: dict) -> tuple[bool, str]:
) as response:
if response.status == 200:
return True, ""
- return False, f"HTTP {response.status}: {response.reason}"
+ return False, await _format_http_error(response)
else:
async with session.get(
url,
@@ -95,7 +133,7 @@ async def _quick_test_mcp_connection(config: dict) -> tuple[bool, str]:
) as response:
if response.status == 200:
return True, ""
- return False, f"HTTP {response.status}: {response.reason}"
+ return False, await _format_http_error(response)
except asyncio.TimeoutError:
return False, f"连接超时: {timeout}秒"
@@ -238,7 +276,19 @@ async def _init_mcp_client_task_wrapper(
await event.wait()
logger.info(f"收到 MCP 客户端 {name} 终止信号")
except Exception as e:
- logger.error(f"初始化 MCP 客户端 {name} 失败", exc_info=True)
+ msg = str(e).lower()
+ is_invalid_key = (
+ "invalid apikey" in msg or "invalid authorization key" in msg
+ )
+ if is_invalid_key:
+ if str(cfg.get("provider", "")).lower() == "mcprouter":
+ logger.warning(
+ f"初始化 MCP 客户端 {name} 失败:MCPRouter API Key 无效(请重新同步/更新 API Key): {e!s}",
+ )
+ else:
+ logger.warning(f"初始化 MCP 客户端 {name} 失败: {e!s}")
+ else:
+ logger.error(f"初始化 MCP 客户端 {name} 失败", exc_info=True)
if ready_future and not ready_future.done():
ready_future.set_exception(e)
finally:
From d56587784e6191f702f8b8a636cbda0b39df2557 Mon Sep 17 00:00:00 2001
From: xiaoxi68 <3520824673@qq.com>
Date: Mon, 2 Mar 2026 23:17:01 +0800
Subject: [PATCH 4/5] fix(mcprouter): harden sync flow and i18n polish
- validate API key via list-servers instead of fixed server keys
- centralize MCPRouter HTTP/JSON error handling for clearer failures
- stop sync when mcp config persistence fails
- add null-safe request body handling in sync-provider route
- move new MCPRouter UI strings to i18n and clean zh-CN duplicate syncProvider block
---
astrbot/core/provider/func_tool_manager.py | 3 +-
astrbot/core/provider/mcp_sync_providers.py | 274 +++++++++---------
astrbot/dashboard/routes/tools.py | 2 +-
.../extension/McpServersSection.vue | 40 ++-
.../i18n/locales/en-US/features/tool-use.json | 17 ++
.../i18n/locales/zh-CN/features/tool-use.json | 83 +++---
6 files changed, 231 insertions(+), 188 deletions(-)
diff --git a/astrbot/core/provider/func_tool_manager.py b/astrbot/core/provider/func_tool_manager.py
index 39c3e2a15..5d32e6761 100644
--- a/astrbot/core/provider/func_tool_manager.py
+++ b/astrbot/core/provider/func_tool_manager.py
@@ -614,7 +614,8 @@ async def sync_mcp_servers_from_provider(
for item in servers:
local_mcp_config["mcpServers"][item.name] = item.config
- self.save_mcp_config(local_mcp_config)
+ if not self.save_mcp_config(local_mcp_config):
+ raise RuntimeError("保存 MCP 配置失败,已取消同步启用")
enabled_count, failures = await self._enable_mcp_servers_with_concurrency_limit(
[item.name for item in servers],
diff --git a/astrbot/core/provider/mcp_sync_providers.py b/astrbot/core/provider/mcp_sync_providers.py
index e40818d62..0a8ad9053 100644
--- a/astrbot/core/provider/mcp_sync_providers.py
+++ b/astrbot/core/provider/mcp_sync_providers.py
@@ -93,6 +93,59 @@ async def fetch(self, payload: dict[str, Any]) -> list[SyncedMcpServer]:
@register_mcp_sync_provider("mcprouter")
class McpRouterMcpServerSyncProvider(McpServerSyncProvider):
+ @staticmethod
+ def _build_error_detail(data: dict[str, Any]) -> str:
+ message = data.get("message") or data.get("error") or data.get("detail")
+ code = data.get("code")
+ if message is None:
+ return ""
+ message_text = str(message).strip()
+ if not message_text:
+ return ""
+ if code is None:
+ return message_text
+ return f"{code}: {message_text}"
+
+ async def _post_json(
+ self,
+ *,
+ session: aiohttp.ClientSession,
+ url: str,
+ payload: dict[str, Any],
+ headers: dict[str, str],
+ action: str,
+ ) -> dict[str, Any]:
+ async with session.post(url, json=payload, headers=headers) as response:
+ body_text = ""
+ data: dict[str, Any] = {}
+ try:
+ parsed = await response.json(content_type=None)
+ if isinstance(parsed, dict):
+ data = parsed
+ except Exception:
+ body_text = (await response.text()).strip()
+
+ if response.status != 200:
+ reason = response.reason or ""
+ detail = self._build_error_detail(data) or body_text[:300]
+ if detail:
+ raise RuntimeError(
+ f"{action} failed: HTTP {response.status} {reason} ({detail})"
+ )
+ raise RuntimeError(f"{action} failed: HTTP {response.status} {reason}")
+
+ if not data:
+ detail = body_text[:300] if body_text else "empty or non-json response"
+ raise RuntimeError(f"{action} failed: invalid response ({detail})")
+
+ return data
+
+ def _ensure_api_success(self, data: dict[str, Any], *, action: str) -> None:
+ if data.get("code") == 0:
+ return
+ detail = self._build_error_detail(data) or "unknown error"
+ raise RuntimeError(f"{action} failed: {detail}")
+
def _normalize_api_key(self, value: str) -> str:
raw = value.strip()
if not raw:
@@ -131,9 +184,8 @@ async def _validate_api_key(
app_url: str,
app_name: str,
base_url: str,
- server_key: str,
) -> None:
- url = f"{base_url}/list-tools"
+ url = f"{base_url}/list-servers"
headers = self._build_api_headers(
api_key=api_key,
app_url=app_url,
@@ -141,21 +193,18 @@ async def _validate_api_key(
)
timeout = aiohttp.ClientTimeout(total=20)
async with aiohttp.ClientSession(timeout=timeout) as session:
- async with session.post(
- url,
- json={"server": server_key},
+ data = await self._post_json(
+ session=session,
+ url=url,
+ payload={"page": 1, "limit": 1},
headers=headers,
- ) as response:
- data = await response.json()
- if response.status != 200:
- raise RuntimeError(
- f"MCPRouter API request failed: HTTP {response.status}"
- )
- if data.get("code") != 0:
- raise ValueError(
- f"MCPRouter API key validation failed: {data.get('message', 'unknown')}"
+ action="MCPRouter API key validation",
)
+ if data.get("code") != 0:
+ detail = self._build_error_detail(data) or "unknown"
+ raise ValueError(f"MCPRouter API key validation failed: {detail}")
+
async def fetch(self, payload: dict[str, Any]) -> list[SyncedMcpServer]:
api_key = self._normalize_api_key(str(payload.get("api_key", "")))
if not api_key:
@@ -170,27 +219,11 @@ async def fetch(self, payload: dict[str, Any]) -> list[SyncedMcpServer]:
list_url = f"{base_url}/list-servers"
get_url = f"{base_url}/get-server"
- validation_server_key = "time"
- provided_servers = payload.get("servers")
- if isinstance(provided_servers, list) and provided_servers:
- for item in provided_servers:
- if not isinstance(item, dict):
- continue
- server_key = str(item.get("server_key") or "").strip()
- config_name = str(item.get("config_name") or "").strip()
- if server_key == "time" or config_name == "time":
- validation_server_key = "time"
- break
- if server_key:
- validation_server_key = server_key
- break
-
await self._validate_api_key(
api_key=api_key,
app_url=app_url,
app_name=app_name,
base_url=base_url,
- server_key=validation_server_key,
)
api_headers = self._build_api_headers(
@@ -316,85 +349,71 @@ def make_item(
if server_keys:
async with aiohttp.ClientSession(timeout=timeout) as session:
for server_key in server_keys[:max_servers]:
- async with session.post(
- get_url,
- json={"server": server_key},
+ data = await self._post_json(
+ session=session,
+ url=get_url,
+ payload={"server": server_key},
headers=api_headers,
- ) as response:
- if response.status != 200:
- raise RuntimeError(
- f"MCPRouter API request failed: HTTP {response.status}"
- )
- data = await response.json()
- if data.get("code") != 0:
- raise RuntimeError(
- f"MCPRouter API error: {data.get('message', 'unknown')}"
- )
- server = data.get("data") or {}
- server_url = server.get("server_url")
- if not server_url:
- continue
- server_name = (
- server.get("config_name")
- or server.get("server_key")
- or server.get("name")
- or server_key
- )
- items.append(
- make_item(
- name=str(server_name),
- url=str(server_url),
- used_names=used_names,
- server_key=server_key,
- )
+ action="MCPRouter get-server",
+ )
+ self._ensure_api_success(data, action="MCPRouter get-server")
+ server = data.get("data") or {}
+ server_url = server.get("server_url")
+ if not server_url:
+ continue
+ server_name = (
+ server.get("config_name")
+ or server.get("server_key")
+ or server.get("name")
+ or server_key
+ )
+ items.append(
+ make_item(
+ name=str(server_name),
+ url=str(server_url),
+ used_names=used_names,
+ server_key=server_key,
)
+ )
return items
async with aiohttp.ClientSession(timeout=timeout) as session:
for page in range(1, max_pages + 1):
- async with session.post(
- list_url,
- json={"page": page, "limit": limit},
+ data = await self._post_json(
+ session=session,
+ url=list_url,
+ payload={"page": page, "limit": limit},
headers=api_headers,
- ) as response:
- if response.status != 200:
- raise RuntimeError(
- f"MCPRouter API request failed: HTTP {response.status}"
- )
- data = await response.json()
- if data.get("code") != 0:
- raise RuntimeError(
- f"MCPRouter API error: {data.get('message', 'unknown')}"
- )
- batch = data.get("data", {}).get("servers", []) or []
- if not batch:
- break
- for server in batch:
- if not matches(server, query):
- continue
- server_url = server.get("server_url")
- if not server_url:
- continue
- server_key = server.get("server_key")
- server_name = (
- server.get("config_name")
- or server_key
- or server.get("name")
- )
- if not server_name:
- continue
- items.append(
- make_item(
- name=str(server_name),
- url=str(server_url),
- used_names=used_names,
- server_key=str(server_key) if server_key else None,
- )
+ action="MCPRouter list-servers",
+ )
+ self._ensure_api_success(data, action="MCPRouter list-servers")
+ batch = data.get("data", {}).get("servers", []) or []
+ if not batch:
+ break
+ for server in batch:
+ if not matches(server, query):
+ continue
+ server_url = server.get("server_url")
+ if not server_url:
+ continue
+ server_key = server.get("server_key")
+ server_name = (
+ server.get("config_name") or server_key or server.get("name")
+ )
+ if not server_name:
+ continue
+ items.append(
+ make_item(
+ name=str(server_name),
+ url=str(server_url),
+ used_names=used_names,
+ server_key=str(server_key) if server_key else None,
)
- if len(items) >= max_servers:
- return items
- if len(batch) < limit:
- break
+ )
+ if len(items) >= max_servers:
+ return items
+ if len(batch) < limit:
+ break
return items
@@ -415,7 +434,6 @@ async def list_servers(self, payload: dict[str, Any]) -> list[dict[str, Any]]:
app_url=app_url,
app_name=app_name,
base_url=base_url,
- server_key="time",
)
api_headers = self._build_api_headers(
@@ -432,35 +450,29 @@ async def list_servers(self, payload: dict[str, Any]) -> list[dict[str, Any]]:
timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(timeout=timeout) as session:
for page in range(1, max_pages + 1):
- async with session.post(
- list_url,
- json={"page": page, "limit": limit},
+ data = await self._post_json(
+ session=session,
+ url=list_url,
+ payload={"page": page, "limit": limit},
headers=api_headers,
- ) as response:
- if response.status != 200:
- raise RuntimeError(
- f"MCPRouter API request failed: HTTP {response.status}"
- )
- data = await response.json()
- if data.get("code") != 0:
- raise RuntimeError(
- f"MCPRouter API error: {data.get('message', 'unknown')}"
- )
- batch = data.get("data", {}).get("servers", []) or []
- if not batch:
- break
- for item in batch:
- if not isinstance(item, dict):
- continue
- server_url = item.get("server_url")
- server_key = item.get("server_key")
- config_name = item.get("config_name")
- if not server_url or not (server_key or config_name):
- continue
- servers.append(item)
- if len(servers) >= max_items:
- return servers
- if len(batch) < limit:
- break
+ action="MCPRouter list-servers",
+ )
+ self._ensure_api_success(data, action="MCPRouter list-servers")
+ batch = data.get("data", {}).get("servers", []) or []
+ if not batch:
+ break
+ for item in batch:
+ if not isinstance(item, dict):
+ continue
+ server_url = item.get("server_url")
+ server_key = item.get("server_key")
+ config_name = item.get("config_name")
+ if not server_url or not (server_key or config_name):
+ continue
+ servers.append(item)
+ if len(servers) >= max_items:
+ return servers
+ if len(batch) < limit:
+ break
return servers
diff --git a/astrbot/dashboard/routes/tools.py b/astrbot/dashboard/routes/tools.py
index ba8643fb1..3c7769e55 100644
--- a/astrbot/dashboard/routes/tools.py
+++ b/astrbot/dashboard/routes/tools.py
@@ -425,7 +425,7 @@ async def list_mcprouter_servers(self):
async def sync_provider(self):
"""同步 MCP 提供者配置"""
try:
- data = await request.json
+ data = (await request.json) or {}
provider_name = data.get("name") # modelscope, or others
if not provider_name:
return Response().error("缺少必要参数: name").__dict__
diff --git a/dashboard/src/components/extension/McpServersSection.vue b/dashboard/src/components/extension/McpServersSection.vue
index 073f9d15e..69aab7961 100644
--- a/dashboard/src/components/extension/McpServersSection.vue
+++ b/dashboard/src/components/extension/McpServersSection.vue
@@ -152,14 +152,14 @@
-
-
- 同步外部平台 MCP 服务器
-
+
+
+ {{ tm('syncProvider.title') }}
+
+ :label="tm('syncProvider.fields.provider')" variant="outlined" required>
@@ -196,21 +196,23 @@
-
创建 API Key
+
{{ tm('syncProvider.timeline.mcprouter.createApiKeyTitle') }}
- 访问 MCPRouter 创建并复制您的 API Key。
+ {{ tm('syncProvider.timeline.mcprouter.createApiKeyDescPrefix') }}
+ MCPRouter
+ {{ tm('syncProvider.timeline.mcprouter.createApiKeyDescSuffix') }}
-
输入 API Key
+
{{ tm('syncProvider.timeline.mcprouter.inputApiKeyTitle') }}
- 输入 API Key 以同步 MCP 服务器。
+ {{ tm('syncProvider.timeline.mcprouter.inputApiKeyDesc') }}
+ :label="tm('syncProvider.fields.apiKey')" class="mt-2" hide-details/>
{{ tm('syncProvider.buttons.fetchServers') }}
@@ -242,14 +244,14 @@
-
可选:应用标识
+
{{ tm('syncProvider.timeline.mcprouter.optionalAppInfoTitle') }}
- 部分 MCPRouter 服务可能会校验以下标识(可留空)。
+ {{ tm('syncProvider.timeline.mcprouter.optionalAppInfoDesc') }}
+ :label="tm('syncProvider.fields.appUrl')" class="mt-2" hide-details/>
+ :label="tm('syncProvider.fields.appName')" class="mt-2" hide-details/>
@@ -579,7 +581,10 @@ export default {
}
} catch (error) {
this.showError(this.tm('syncProvider.messages.fetchServersError', {
- error: error.response?.data?.message || error.message || '网络连接或 API Key 问题'
+ error:
+ error.response?.data?.message ||
+ error.message ||
+ this.tm('syncProvider.messages.networkOrApiKeyIssue')
}));
} finally {
this.mcprouterServersLoading = false;
@@ -638,7 +643,10 @@ export default {
}
} catch (error) {
this.showError(this.tm('syncProvider.messages.syncError', {
- error: error.response?.data?.message || error.message || '网络连接或访问令牌问题'
+ error:
+ error.response?.data?.message ||
+ error.message ||
+ this.tm('syncProvider.messages.networkOrTokenIssue')
}));
} finally {
this.loading = false;
diff --git a/dashboard/src/i18n/locales/en-US/features/tool-use.json b/dashboard/src/i18n/locales/en-US/features/tool-use.json
index 42c1f69f1..6d548ca7a 100644
--- a/dashboard/src/i18n/locales/en-US/features/tool-use.json
+++ b/dashboard/src/i18n/locales/en-US/features/tool-use.json
@@ -109,14 +109,29 @@
},
"providers": {
"modelscope": "ModelScope",
+ "mcprouter": "MCPRouter",
"description": "ModelScope is an open model community providing MCP servers for various machine learning and AI services"
},
"fields": {
"provider": "Select Provider",
"accessToken": "Access Token",
+ "apiKey": "API Key",
+ "appUrl": "HTTP-Referer",
+ "appName": "X-Title",
"tokenRequired": "Access token is required",
"tokenHint": "Please enter your ModelScope access token"
},
+ "timeline": {
+ "mcprouter": {
+ "createApiKeyTitle": "Create API Key",
+ "createApiKeyDescPrefix": "Visit ",
+ "createApiKeyDescSuffix": " to create and copy your API key.",
+ "inputApiKeyTitle": "Enter API Key",
+ "inputApiKeyDesc": "Enter your API key to sync MCP servers.",
+ "optionalAppInfoTitle": "Optional: App Identity",
+ "optionalAppInfoDesc": "Some MCPRouter services may validate the following headers (optional)."
+ }
+ },
"buttons": {
"cancel": "Cancel",
"previous": "Previous",
@@ -138,6 +153,8 @@
"syncError": "Sync failed: {error}",
"fetchServersSuccess": "Fetched {count} servers",
"fetchServersError": "Failed to fetch server list: {error}",
+ "networkOrApiKeyIssue": "Network connection issue or invalid API key",
+ "networkOrTokenIssue": "Network connection issue or invalid access token",
"tokenHelp": "How to get a ModelScope access token? Click the button on the right for instructions"
}
},
diff --git a/dashboard/src/i18n/locales/zh-CN/features/tool-use.json b/dashboard/src/i18n/locales/zh-CN/features/tool-use.json
index 296b31f73..db2c152d3 100644
--- a/dashboard/src/i18n/locales/zh-CN/features/tool-use.json
+++ b/dashboard/src/i18n/locales/zh-CN/features/tool-use.json
@@ -97,46 +97,47 @@
"importConfig": "导入配置"
}
},
- "confirmDelete": "确定要删除服务器 {name} 吗?",
- "syncProvider": {
- "title": "同步 MCP 服务器",
- "subtitle": "从提供商同步 MCP 服务器配置到本地",
- "steps": {
- "selectProvider": "步骤 1: 选择提供商",
- "configureAuth": "步骤 2: 配置认证",
- "syncServers": "步骤 3: 同步服务器"
- },
- "providers": {
- "modelscope": "ModelScope",
- "description": "ModelScope 是一个开源的模型社区,提供各种机器学习和AI服务的MCP服务器"
- },
- "fields": {
- "provider": "选择提供商",
- "accessToken": "访问令牌",
- "tokenRequired": "访问令牌是必填项",
- "tokenHint": "请输入您的 ModelScope 访问令牌"
- },
- "buttons": {
- "cancel": "取消",
- "previous": "上一步",
- "next": "下一步",
- "sync": "开始同步",
- "getToken": "获取令牌"
- },
- "status": {
- "selectProvider": "请选择一个 MCP 服务器提供商",
- "enterToken": "请输入访问令牌以继续",
- "readyToSync": "准备同步服务器配置"
- },
- "messages": {
- "syncSuccess": "MCP 服务器同步成功!",
- "syncError": "同步失败: {error}",
- "tokenHelp": "如何获取 ModelScope 访问令牌?点击右侧按钮查看说明"
- }
- }
+ "confirmDelete": "确定要删除服务器 {name} 吗?"
},
"syncProvider": {
+ "title": "同步外部平台 MCP 服务器",
+ "subtitle": "从提供商同步 MCP 服务器配置到本地",
+ "steps": {
+ "selectProvider": "步骤 1: 选择提供商",
+ "configureAuth": "步骤 2: 配置认证",
+ "syncServers": "步骤 3: 同步服务器"
+ },
+ "providers": {
+ "modelscope": "ModelScope",
+ "mcprouter": "MCPRouter",
+ "description": "ModelScope 是一个开源的模型社区,提供各种机器学习和AI服务的MCP服务器"
+ },
+ "fields": {
+ "provider": "选择提供商",
+ "accessToken": "访问令牌",
+ "apiKey": "API Key",
+ "appUrl": "HTTP-Referer",
+ "appName": "X-Title",
+ "tokenRequired": "访问令牌是必填项",
+ "tokenHint": "请输入您的 ModelScope 访问令牌"
+ },
+ "timeline": {
+ "mcprouter": {
+ "createApiKeyTitle": "创建 API Key",
+ "createApiKeyDescPrefix": "访问 ",
+ "createApiKeyDescSuffix": " 创建并复制您的 API Key。",
+ "inputApiKeyTitle": "输入 API Key",
+ "inputApiKeyDesc": "输入 API Key 以同步 MCP 服务器。",
+ "optionalAppInfoTitle": "可选:应用标识",
+ "optionalAppInfoDesc": "部分 MCPRouter 服务可能会校验以下标识(可留空)。"
+ }
+ },
"buttons": {
+ "cancel": "取消",
+ "previous": "上一步",
+ "next": "下一步",
+ "sync": "开始同步",
+ "getToken": "获取令牌",
"fetchServers": "获取服务器列表"
},
"status": {
@@ -144,13 +145,17 @@
"enterToken": "请输入访问令牌以继续",
"enterApiKey": "请输入 API Key 以继续",
"fetchServersFirst": "请先获取服务器列表",
- "fetchedServers": "已获取服务器 ({count})"
+ "fetchedServers": "已获取服务器 ({count})",
+ "readyToSync": "准备同步服务器配置"
},
"messages": {
"syncSuccess": "MCP 服务器同步成功!",
"syncError": "同步失败: {error}",
"fetchServersSuccess": "已获取 {count} 个服务器",
- "fetchServersError": "获取服务器列表失败: {error}"
+ "fetchServersError": "获取服务器列表失败: {error}",
+ "networkOrApiKeyIssue": "网络连接异常或 API Key 无效",
+ "networkOrTokenIssue": "网络连接异常或访问令牌无效",
+ "tokenHelp": "如何获取 ModelScope 访问令牌?点击右侧按钮查看说明"
}
},
"messages": {
From 6a11367c67ec824ab50b3fa51b2d1dbdc7f71b6c Mon Sep 17 00:00:00 2001
From: xiaoxi68 <3520824673@qq.com>
Date: Mon, 2 Mar 2026 23:32:06 +0800
Subject: [PATCH 5/5] 1
---
astrbot/core/provider/func_tool_manager.py | 108 +----
astrbot/core/provider/mcp_sync_providers.py | 472 ++++++++++++--------
2 files changed, 282 insertions(+), 298 deletions(-)
diff --git a/astrbot/core/provider/func_tool_manager.py b/astrbot/core/provider/func_tool_manager.py
index 5d32e6761..e160d1402 100644
--- a/astrbot/core/provider/func_tool_manager.py
+++ b/astrbot/core/provider/func_tool_manager.py
@@ -9,7 +9,11 @@
from astrbot import logger
from astrbot.core import sp
-from astrbot.core.agent.mcp_client import MCPClient, MCPTool
+from astrbot.core.agent.mcp_client import (
+ MCPClient,
+ MCPTool,
+ _quick_test_mcp_connection,
+)
from astrbot.core.agent.tool import FunctionTool, ToolSet
from astrbot.core.utils.astrbot_path import get_astrbot_data_path
@@ -39,108 +43,6 @@
FuncTool = FunctionTool
-def _prepare_config(config: dict) -> dict:
- """准备配置,处理嵌套格式"""
- if config.get("mcpServers"):
- first_key = next(iter(config["mcpServers"]))
- config = config["mcpServers"][first_key]
- config.pop("active", None)
- return config
-
-
-async def _quick_test_mcp_connection(config: dict) -> tuple[bool, str]:
- """快速测试 MCP 服务器可达性"""
- import json
-
- import aiohttp
- import mcp
-
- cfg = _prepare_config(config.copy())
-
- url = cfg["url"]
- headers = cfg.get("headers", {})
- timeout = cfg.get("timeout", 10)
-
- async def _format_http_error(response: aiohttp.ClientResponse) -> str:
- reason = response.reason or ""
- detail = ""
- try:
- raw = await response.content.read(2048)
- if raw:
- text = raw.decode(errors="replace").strip()
- if text:
- try:
- data = json.loads(text)
- except Exception:
- detail = text
- else:
- if isinstance(data, dict):
- msg = (
- data.get("message")
- or data.get("error")
- or data.get("detail")
- )
- code = data.get("code")
- if msg is not None:
- detail = (
- f"{code}: {msg}" if code is not None else str(msg)
- )
- else:
- detail = text
- else:
- detail = text
- except Exception:
- detail = ""
-
- if detail:
- return f"HTTP {response.status}: {reason} ({detail})"
- return f"HTTP {response.status}: {reason}"
-
- try:
- async with aiohttp.ClientSession() as session:
- if cfg.get("transport") == "streamable_http":
- test_payload = {
- "jsonrpc": "2.0",
- "method": "initialize",
- "id": 0,
- "params": {
- "protocolVersion": mcp.types.LATEST_PROTOCOL_VERSION,
- "capabilities": {},
- "clientInfo": {"name": "test-client", "version": "1.2.3"},
- },
- }
- async with session.post(
- url,
- headers={
- **headers,
- "Content-Type": "application/json",
- "Accept": "application/json, text/event-stream",
- },
- json=test_payload,
- timeout=aiohttp.ClientTimeout(total=timeout),
- ) as response:
- if response.status == 200:
- return True, ""
- return False, await _format_http_error(response)
- else:
- async with session.get(
- url,
- headers={
- **headers,
- "Accept": "application/json, text/event-stream",
- },
- timeout=aiohttp.ClientTimeout(total=timeout),
- ) as response:
- if response.status == 200:
- return True, ""
- return False, await _format_http_error(response)
-
- except asyncio.TimeoutError:
- return False, f"连接超时: {timeout}秒"
- except Exception as e:
- return False, f"{e!s}"
-
-
class FunctionToolManager:
def __init__(self) -> None:
self.func_list: list[FuncTool] = []
diff --git a/astrbot/core/provider/mcp_sync_providers.py b/astrbot/core/provider/mcp_sync_providers.py
index 0a8ad9053..57f76334e 100644
--- a/astrbot/core/provider/mcp_sync_providers.py
+++ b/astrbot/core/provider/mcp_sync_providers.py
@@ -1,6 +1,7 @@
from __future__ import annotations
import abc
+from collections.abc import AsyncGenerator
from dataclasses import dataclass
from typing import Any, ClassVar
@@ -21,7 +22,7 @@ async def fetch(self, payload: dict[str, Any]) -> list[SyncedMcpServer]:
raise NotImplementedError
async def list_servers(self, payload: dict[str, Any]) -> list[dict[str, Any]]:
- raise NotImplementedError
+ return []
_provider_registry: dict[str, type[McpServerSyncProvider]] = {}
@@ -177,6 +178,239 @@ def _build_api_headers(
headers["X-Title"] = app_name
return headers
+ def _build_mcp_headers(
+ self,
+ *,
+ api_key: str,
+ app_url: str,
+ app_name: str,
+ ) -> dict[str, str]:
+ return self._build_api_headers(
+ api_key=api_key,
+ app_url=app_url,
+ app_name=app_name,
+ )
+
+ @staticmethod
+ def _parse_server_keys(value: Any) -> list[str]:
+ if isinstance(value, list):
+ parts = [str(item).strip() for item in value]
+ elif isinstance(value, str):
+ raw = value.replace(",", "\n").replace(";", "\n")
+ parts = [line.strip() for line in raw.splitlines()]
+ else:
+ return []
+
+ keys = [item for item in parts if item]
+ seen: set[str] = set()
+ result: list[str] = []
+ for item in keys:
+ if item in seen:
+ continue
+ seen.add(item)
+ result.append(item)
+ return result
+
+ @staticmethod
+ def _matches(server: dict[str, Any], q: str) -> bool:
+ if not q:
+ return True
+ haystacks = [
+ server.get("config_name"),
+ server.get("server_key"),
+ server.get("name"),
+ server.get("title"),
+ server.get("description"),
+ server.get("author_name"),
+ ]
+ combined = " ".join(str(v) for v in haystacks if v)
+ return q in combined.lower()
+
+ @staticmethod
+ def _resolve_server_name(
+ server: dict[str, Any],
+ *,
+ fallback: str | None = None,
+ ) -> str | None:
+ return (
+ server.get("config_name")
+ or server.get("server_key")
+ or server.get("name")
+ or server.get("title")
+ or fallback
+ )
+
+ def _make_item(
+ self,
+ *,
+ name: str,
+ url: str,
+ used_names: set[str],
+ headers: dict[str, str],
+ server_key: str | None = None,
+ ) -> SyncedMcpServer:
+ final_name = name
+ if final_name in used_names:
+ suffix = server_key or "dup"
+ final_name = f"{final_name}-{suffix}"
+ i = 2
+ while final_name in used_names:
+ final_name = f"{name}-{i}"
+ i += 1
+ used_names.add(final_name)
+ return SyncedMcpServer(
+ name=final_name,
+ config={
+ "url": url,
+ "transport": "streamable_http",
+ "headers": headers,
+ "active": True,
+ "provider": "mcprouter",
+ },
+ )
+
+ async def _iter_list_servers_batches(
+ self,
+ *,
+ list_url: str,
+ api_headers: dict[str, str],
+ limit: int,
+ max_pages: int,
+ ) -> AsyncGenerator[list[dict[str, Any]], None]:
+ timeout = aiohttp.ClientTimeout(total=30)
+ async with aiohttp.ClientSession(timeout=timeout) as session:
+ for page in range(1, max_pages + 1):
+ data = await self._post_json(
+ session=session,
+ url=list_url,
+ payload={"page": page, "limit": limit},
+ headers=api_headers,
+ action="MCPRouter list-servers",
+ )
+ self._ensure_api_success(data, action="MCPRouter list-servers")
+ raw_batch = data.get("data", {}).get("servers", []) or []
+ if not raw_batch:
+ break
+ batch = [item for item in raw_batch if isinstance(item, dict)]
+ if batch:
+ yield batch
+ if len(raw_batch) < limit:
+ break
+
+ async def _fetch_from_provided_servers(
+ self,
+ *,
+ provided_servers: list[Any],
+ raw_max_servers: Any,
+ max_servers: int,
+ mcp_headers: dict[str, str],
+ ) -> list[SyncedMcpServer]:
+ used_names: set[str] = set()
+ items: list[SyncedMcpServer] = []
+ selected_servers = (
+ provided_servers[:max_servers]
+ if raw_max_servers is not None
+ else provided_servers
+ )
+ for server in selected_servers:
+ if not isinstance(server, dict):
+ continue
+ server_name = self._resolve_server_name(server)
+ server_url = server.get("server_url")
+ if not server_name or not server_url:
+ continue
+ server_key = server.get("server_key")
+ items.append(
+ self._make_item(
+ name=str(server_name),
+ url=str(server_url),
+ used_names=used_names,
+ headers=mcp_headers,
+ server_key=str(server_key) if server_key else None,
+ )
+ )
+ return items
+
+ async def _fetch_from_server_keys(
+ self,
+ *,
+ server_keys: list[str],
+ max_servers: int,
+ get_url: str,
+ api_headers: dict[str, str],
+ mcp_headers: dict[str, str],
+ ) -> list[SyncedMcpServer]:
+ timeout = aiohttp.ClientTimeout(total=30)
+ used_names: set[str] = set()
+ items: list[SyncedMcpServer] = []
+ async with aiohttp.ClientSession(timeout=timeout) as session:
+ for server_key in server_keys[:max_servers]:
+ data = await self._post_json(
+ session=session,
+ url=get_url,
+ payload={"server": server_key},
+ headers=api_headers,
+ action="MCPRouter get-server",
+ )
+ self._ensure_api_success(data, action="MCPRouter get-server")
+ server = data.get("data") or {}
+ if not isinstance(server, dict):
+ continue
+ server_url = server.get("server_url")
+ server_name = self._resolve_server_name(server, fallback=server_key)
+ if not server_url or not server_name:
+ continue
+ items.append(
+ self._make_item(
+ name=str(server_name),
+ url=str(server_url),
+ used_names=used_names,
+ headers=mcp_headers,
+ server_key=server_key,
+ )
+ )
+ return items
+
+ async def _fetch_from_listing(
+ self,
+ *,
+ list_url: str,
+ api_headers: dict[str, str],
+ mcp_headers: dict[str, str],
+ query: str,
+ max_servers: int,
+ limit: int,
+ max_pages: int,
+ ) -> list[SyncedMcpServer]:
+ used_names: set[str] = set()
+ items: list[SyncedMcpServer] = []
+ async for batch in self._iter_list_servers_batches(
+ list_url=list_url,
+ api_headers=api_headers,
+ limit=limit,
+ max_pages=max_pages,
+ ):
+ for server in batch:
+ if not self._matches(server, query):
+ continue
+ server_url = server.get("server_url")
+ server_name = self._resolve_server_name(server)
+ if not server_url or not server_name:
+ continue
+ server_key = server.get("server_key")
+ items.append(
+ self._make_item(
+ name=str(server_name),
+ url=str(server_url),
+ used_names=used_names,
+ headers=mcp_headers,
+ server_key=str(server_key) if server_key else None,
+ )
+ )
+ if len(items) >= max_servers:
+ return items
+ return items
+
async def _validate_api_key(
self,
*,
@@ -231,6 +465,11 @@ async def fetch(self, payload: dict[str, Any]) -> list[SyncedMcpServer]:
app_url=app_url,
app_name=app_name,
)
+ mcp_headers = self._build_mcp_headers(
+ api_key=api_key,
+ app_url=app_url,
+ app_name=app_name,
+ )
query = str(payload.get("query", "")).strip().lower()
raw_max_servers = payload.get("max_servers")
@@ -243,179 +482,34 @@ async def fetch(self, payload: dict[str, Any]) -> list[SyncedMcpServer]:
max_pages = int(payload.get("max_pages", 10) or 10)
max_pages = max(1, min(max_pages, 50))
- mcp_headers: dict[str, str] = {
- "Authorization": f"Bearer {api_key}",
- "Content-Type": "application/json",
- }
- if app_url:
- mcp_headers["HTTP-Referer"] = app_url
- if app_name:
- mcp_headers["X-Title"] = app_name
-
- def parse_server_keys(value: Any) -> list[str]:
- if isinstance(value, list):
- parts = [str(item).strip() for item in value]
- elif isinstance(value, str):
- raw = value.replace(",", "\n").replace(";", "\n")
- parts = [line.strip() for line in raw.splitlines()]
- else:
- return []
- keys = [item for item in parts if item]
- seen: set[str] = set()
- result: list[str] = []
- for item in keys:
- if item in seen:
- continue
- seen.add(item)
- result.append(item)
- return result
-
- def matches(server: dict[str, Any], q: str) -> bool:
- if not q:
- return True
- haystacks = [
- server.get("config_name"),
- server.get("server_key"),
- server.get("name"),
- server.get("title"),
- server.get("description"),
- server.get("author_name"),
- ]
- combined = " ".join(str(v) for v in haystacks if v)
- return q in combined.lower()
-
- def make_item(
- *,
- name: str,
- url: str,
- used_names: set[str],
- server_key: str | None = None,
- ) -> SyncedMcpServer:
- final_name = name
- if final_name in used_names:
- suffix = server_key or "dup"
- final_name = f"{final_name}-{suffix}"
- i = 2
- while final_name in used_names:
- final_name = f"{name}-{i}"
- i += 1
- used_names.add(final_name)
- return SyncedMcpServer(
- name=final_name,
- config={
- "url": url,
- "transport": "streamable_http",
- "headers": mcp_headers,
- "active": True,
- "provider": "mcprouter",
- },
- )
-
- timeout = aiohttp.ClientTimeout(total=30)
- used_names: set[str] = set()
- items: list[SyncedMcpServer] = []
-
provided_servers = payload.get("servers")
if isinstance(provided_servers, list) and provided_servers:
- selected_servers = (
- provided_servers[:max_servers]
- if raw_max_servers is not None
- else provided_servers
+ return await self._fetch_from_provided_servers(
+ provided_servers=provided_servers,
+ raw_max_servers=raw_max_servers,
+ max_servers=max_servers,
+ mcp_headers=mcp_headers,
)
- for server in selected_servers:
- if not isinstance(server, dict):
- continue
- server_key = server.get("server_key")
- server_name = (
- server.get("config_name")
- or server_key
- or server.get("name")
- or server.get("title")
- )
- server_url = server.get("server_url")
- if not server_name or not server_url:
- continue
- items.append(
- make_item(
- name=str(server_name),
- url=str(server_url),
- used_names=used_names,
- server_key=str(server_key) if server_key else None,
- )
- )
- return items
- server_keys = parse_server_keys(payload.get("server_keys"))
+ server_keys = self._parse_server_keys(payload.get("server_keys"))
if server_keys:
- async with aiohttp.ClientSession(timeout=timeout) as session:
- for server_key in server_keys[:max_servers]:
- data = await self._post_json(
- session=session,
- url=get_url,
- payload={"server": server_key},
- headers=api_headers,
- action="MCPRouter get-server",
- )
- self._ensure_api_success(data, action="MCPRouter get-server")
- server = data.get("data") or {}
- server_url = server.get("server_url")
- if not server_url:
- continue
- server_name = (
- server.get("config_name")
- or server.get("server_key")
- or server.get("name")
- or server_key
- )
- items.append(
- make_item(
- name=str(server_name),
- url=str(server_url),
- used_names=used_names,
- server_key=server_key,
- )
- )
- return items
-
- async with aiohttp.ClientSession(timeout=timeout) as session:
- for page in range(1, max_pages + 1):
- data = await self._post_json(
- session=session,
- url=list_url,
- payload={"page": page, "limit": limit},
- headers=api_headers,
- action="MCPRouter list-servers",
- )
- self._ensure_api_success(data, action="MCPRouter list-servers")
- batch = data.get("data", {}).get("servers", []) or []
- if not batch:
- break
- for server in batch:
- if not matches(server, query):
- continue
- server_url = server.get("server_url")
- if not server_url:
- continue
- server_key = server.get("server_key")
- server_name = (
- server.get("config_name") or server_key or server.get("name")
- )
- if not server_name:
- continue
- items.append(
- make_item(
- name=str(server_name),
- url=str(server_url),
- used_names=used_names,
- server_key=str(server_key) if server_key else None,
- )
- )
- if len(items) >= max_servers:
- return items
- if len(batch) < limit:
- break
+ return await self._fetch_from_server_keys(
+ server_keys=server_keys,
+ max_servers=max_servers,
+ get_url=get_url,
+ api_headers=api_headers,
+ mcp_headers=mcp_headers,
+ )
- return items
+ return await self._fetch_from_listing(
+ list_url=list_url,
+ api_headers=api_headers,
+ mcp_headers=mcp_headers,
+ query=query,
+ max_servers=max_servers,
+ limit=limit,
+ max_pages=max_pages,
+ )
async def list_servers(self, payload: dict[str, Any]) -> list[dict[str, Any]]:
api_key = self._normalize_api_key(str(payload.get("api_key", "")))
@@ -447,32 +541,20 @@ async def list_servers(self, payload: dict[str, Any]) -> list[dict[str, Any]]:
max_items = 2000
servers: list[dict[str, Any]] = []
- timeout = aiohttp.ClientTimeout(total=30)
- async with aiohttp.ClientSession(timeout=timeout) as session:
- for page in range(1, max_pages + 1):
- data = await self._post_json(
- session=session,
- url=list_url,
- payload={"page": page, "limit": limit},
- headers=api_headers,
- action="MCPRouter list-servers",
- )
- self._ensure_api_success(data, action="MCPRouter list-servers")
- batch = data.get("data", {}).get("servers", []) or []
- if not batch:
- break
- for item in batch:
- if not isinstance(item, dict):
- continue
- server_url = item.get("server_url")
- server_key = item.get("server_key")
- config_name = item.get("config_name")
- if not server_url or not (server_key or config_name):
- continue
- servers.append(item)
- if len(servers) >= max_items:
- return servers
- if len(batch) < limit:
- break
+ async for batch in self._iter_list_servers_batches(
+ list_url=list_url,
+ api_headers=api_headers,
+ limit=limit,
+ max_pages=max_pages,
+ ):
+ for item in batch:
+ server_url = item.get("server_url")
+ server_key = item.get("server_key")
+ config_name = item.get("config_name")
+ if not server_url or not (server_key or config_name):
+ continue
+ servers.append(item)
+ if len(servers) >= max_items:
+ return servers
return servers