-
-
Notifications
You must be signed in to change notification settings - Fork 1.4k
feat: 支持 MCPRouter 平台 MCP 服务器同步 #5690
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
cb78531
892bfcd
921878a
d565877
6a11367
a1552da
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,10 +19,34 @@ | |
| from .run_context import TContext | ||
| from .tool import FunctionTool | ||
|
|
||
|
|
||
| class _McpSseNoiseFilter(logging.Filter): | ||
| def filter(self, record: logging.LogRecord) -> bool: | ||
| try: | ||
| msg = record.getMessage().strip() | ||
| except Exception: | ||
| return True | ||
| if msg.startswith("Unknown SSE event:"): | ||
| event_name = msg.split(":", 1)[1].strip() | ||
| if event_name in {"stream", "connection"}: | ||
| return False | ||
| return True | ||
|
|
||
|
|
||
| def _install_mcp_noise_filters() -> None: | ||
| for logger_name in ("mcp.client.streamable_http", "mcp.client.sse"): | ||
| log = logging.getLogger(logger_name) | ||
| if any(isinstance(f, _McpSseNoiseFilter) for f in log.filters): | ||
| continue | ||
| log.addFilter(_McpSseNoiseFilter()) | ||
|
|
||
|
|
||
| try: | ||
| import anyio | ||
| import mcp | ||
| from mcp.client.sse import sse_client | ||
|
|
||
| _install_mcp_noise_filters() | ||
| except (ModuleNotFoundError, ImportError): | ||
| logger.warning( | ||
| "Warning: Missing 'mcp' dependency, MCP services will be unavailable." | ||
|
|
@@ -47,6 +71,8 @@ def _prepare_config(config: dict) -> dict: | |
|
|
||
| async def _quick_test_mcp_connection(config: dict) -> tuple[bool, str]: | ||
| """Quick test MCP server connectivity""" | ||
| import json | ||
|
|
||
| import aiohttp | ||
|
|
||
| cfg = _prepare_config(config.copy()) | ||
|
|
@@ -55,6 +81,40 @@ async def _quick_test_mcp_connection(config: dict) -> tuple[bool, str]: | |
| headers = cfg.get("headers", {}) | ||
| timeout = cfg.get("timeout", 10) | ||
|
|
||
| async def _format_http_error(response: aiohttp.ClientResponse) -> str: | ||
| reason = response.reason or "" | ||
| detail = "" | ||
| try: | ||
| raw = await response.content.read(2048) | ||
| if raw: | ||
| text = raw.decode(errors="replace").strip() | ||
| if text: | ||
| try: | ||
| data = json.loads(text) | ||
| except Exception: | ||
| detail = text | ||
| else: | ||
| if isinstance(data, dict): | ||
| msg = ( | ||
| data.get("message") | ||
| or data.get("error") | ||
| or data.get("detail") | ||
| ) | ||
| code = data.get("code") | ||
| if msg is not None: | ||
| detail = ( | ||
| f"{code}: {msg}" if code is not None else str(msg) | ||
| ) | ||
| else: | ||
| detail = text | ||
| else: | ||
| detail = text | ||
| except Exception: | ||
| detail = "" | ||
| if detail: | ||
| return f"HTTP {response.status}: {reason} ({detail})" | ||
| return f"HTTP {response.status}: {reason}" | ||
|
|
||
| try: | ||
| if "transport" in cfg: | ||
| transport_type = cfg["transport"] | ||
|
|
@@ -70,7 +130,7 @@ async def _quick_test_mcp_connection(config: dict) -> tuple[bool, str]: | |
| "method": "initialize", | ||
| "id": 0, | ||
| "params": { | ||
| "protocolVersion": "2024-11-05", | ||
| "protocolVersion": mcp.types.LATEST_PROTOCOL_VERSION, | ||
| "capabilities": {}, | ||
| "clientInfo": {"name": "test-client", "version": "1.2.3"}, | ||
| }, | ||
|
|
@@ -87,7 +147,7 @@ async def _quick_test_mcp_connection(config: dict) -> tuple[bool, str]: | |
| ) as response: | ||
| if response.status == 200: | ||
| return True, "" | ||
| return False, f"HTTP {response.status}: {response.reason}" | ||
| return False, await _format_http_error(response) | ||
| else: | ||
| async with session.get( | ||
| url, | ||
|
|
@@ -99,7 +159,7 @@ async def _quick_test_mcp_connection(config: dict) -> tuple[bool, str]: | |
| ) as response: | ||
| if response.status == 200: | ||
| return True, "" | ||
| return False, f"HTTP {response.status}: {response.reason}" | ||
| return False, await _format_http_error(response) | ||
|
|
||
| except asyncio.TimeoutError: | ||
| return False, f"Connection timeout: {timeout} seconds" | ||
|
|
@@ -146,6 +206,11 @@ async def connect_to_server(self, mcp_server_config: dict, name: str) -> None: | |
|
|
||
| def logging_callback(msg: str) -> None: | ||
| # Handle MCP service error logs | ||
| normalized = msg.strip() | ||
| if normalized.startswith("Unknown SSE event:"): | ||
| event_name = normalized.split(":", 1)[1].strip() | ||
| if event_name in {"stream", "connection"}: | ||
| return | ||
| print(f"MCP Server {name} Error: {msg}") | ||
| self.server_errlogs.append(msg) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这段过滤SSE噪音事件的逻辑与文件顶部的
_McpSseNoiseFilter类中的逻辑重复了。为了提高代码的可维护性和复用性,建议将这部分逻辑提取到一个独立的辅助函数中,供两处调用。另外,当前的
split操作没有处理不包含':'的字符串,可能会导致IndexError。建议增加对split结果的长度检查以增强代码的健壮性,并可以简化逻辑。