Skip to content

Commit 84a0335

Browse files
committed
DPL: add MCP server for DPL
Allows debugging a DPL workflow using Claude, ChatGPT or similar tools.
1 parent 060a5fa commit 84a0335

File tree

3 files changed

+398
-0
lines changed

3 files changed

+398
-0
lines changed
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# DPL Status MCP Server
2+
3+
An MCP server that connects to a running DPL driver's `/status` WebSocket endpoint and exposes its device state and metrics as tools for an AI assistant (e.g. Claude).
4+
5+
## Requirements
6+
7+
```bash
8+
pip install mcp websockets
9+
# or install the package directly:
10+
pip install ./Framework/Core/scripts/dpl-mcp-server/
11+
```
12+
13+
## Running
14+
15+
The driver port defaults to `8080`. Override with `--port`, `--pid`, or `DPL_STATUS_PORT`:
16+
17+
```bash
18+
python3 dpl_mcp_server.py --port 8080
19+
python3 dpl_mcp_server.py --pid 12345 # port = 8080 + pid % 30000
20+
DPL_STATUS_PORT=8080 python3 dpl_mcp_server.py
21+
```
22+
23+
If installed as a package:
24+
25+
```bash
26+
dpl-mcp-server --pid $(pgrep -f diamond-workflow | head -1)
27+
```
28+
29+
## Claude Code integration
30+
31+
Add to `.mcp.json` in your project (or `~/.claude.json` for global use):
32+
33+
```json
34+
{
35+
"mcpServers": {
36+
"dpl": {
37+
"command": "dpl-mcp-server",
38+
"args": ["--pid", "12345"]
39+
}
40+
}
41+
}
42+
```
43+
44+
Or with `claude mcp add`:
45+
46+
```bash
47+
claude mcp add dpl -- dpl-mcp-server --pid 12345
48+
```
49+
50+
## Available tools
51+
52+
| Tool | Description |
53+
|------|-------------|
54+
| `list_devices` | List all devices with pid, active flag, streaming and device state |
55+
| `list_metrics(device)` | List numeric metrics available for a device |
56+
| `subscribe(device, metrics)` | Subscribe to metrics; driver will push updates when they change |
57+
| `unsubscribe(device, metrics)` | Stop receiving updates for specific metrics |
58+
| `get_updates(max_updates)` | Drain buffered update frames (default: up to 50) |
59+
60+
## Protocol
61+
62+
The driver sends a snapshot on connect, then pushes updates only for subscribed metrics that changed each processing cycle. There is no polling — updates arrive in real time as the workflow runs.
63+
64+
```
65+
connect → {"type":"snapshot","devices":[{"name","pid","active","streamingState","deviceState"},...]}
66+
67+
client → {"cmd":"list_metrics","device":"producer"}
68+
driver → {"type":"metrics_list","device":"producer","metrics":["input-parts","output-bytes",...]}
69+
70+
client → {"cmd":"subscribe","device":"producer","metrics":["output-bytes"]}
71+
driver → {"type":"update","device":0,"name":"producer","metrics":{"output-bytes":1048576}}
72+
(pushed every cycle in which output-bytes changed)
73+
74+
client → {"cmd":"unsubscribe","device":"producer","metrics":["output-bytes"]}
75+
```
Lines changed: 304 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,304 @@
1+
#!/usr/bin/env python3
2+
# Copyright 2019-2026 CERN and copyright holders of ALICE O2.
3+
# See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
4+
# All rights not expressly granted are reserved.
5+
#
6+
# This software is distributed under the terms of the GNU General Public
7+
# License v3 (GPL Version 3), copied verbatim in the file "COPYING".
8+
#
9+
# In applying this license CERN does not waive the privileges and immunities
10+
# granted to it by virtue of its status as an Intergovernmental Organization
11+
# or submit itself to any jurisdiction.
12+
"""DPL status MCP server.
13+
14+
Bridges the DPL driver /status WebSocket endpoint to MCP tools so that an
15+
AI assistant (e.g. Claude) can inspect and monitor a running DPL workflow.
16+
17+
Usage
18+
-----
19+
python3 dpl_mcp_server.py --port 8080
20+
python3 dpl_mcp_server.py --pid 12345 # port derived as 8080 + pid % 30000
21+
DPL_STATUS_PORT=8080 python3 dpl_mcp_server.py
22+
23+
Wire protocol (client → driver)
24+
--------------------------------
25+
{"cmd":"list_metrics","device":"<name>"}
26+
{"cmd":"subscribe","device":"<name>","metrics":["m1","m2"]}
27+
{"cmd":"unsubscribe","device":"<name>","metrics":["m1"]}
28+
29+
Wire protocol (driver → client)
30+
--------------------------------
31+
{"type":"snapshot","devices":[{"name","pid","active","streamingState","deviceState"},...]}
32+
{"type":"update","device":<idx>,"name":"<name>","metrics":{<name:value,...>}}
33+
{"type":"metrics_list","device":"<name>","metrics":["m1","m2",...]}
34+
"""
35+
36+
from __future__ import annotations
37+
38+
import argparse
39+
import asyncio
40+
import json
41+
import os
42+
import sys
43+
from typing import Any
44+
45+
import websockets
46+
from mcp.server.fastmcp import FastMCP
47+
48+
# ---------------------------------------------------------------------------
49+
# Global connection state (all access from the single asyncio event loop)
50+
# ---------------------------------------------------------------------------
51+
_port: int = 8080
52+
_ws: Any = None
53+
_reader_task: asyncio.Task | None = None
54+
_snapshot: dict = {}
55+
_updates: list[dict] = []
56+
_logs: list[dict] = []
57+
_metrics_lists: dict[str, list[str]] = {}
58+
59+
60+
async def _ensure_connected() -> None:
61+
"""Connect (or reconnect) to the driver's /status WebSocket."""
62+
global _ws, _reader_task
63+
64+
# Check liveness of existing connection.
65+
if _ws is not None:
66+
try:
67+
pong = await asyncio.wait_for(_ws.ping(), timeout=2.0)
68+
await pong
69+
return
70+
except Exception:
71+
_ws = None
72+
if _reader_task is not None and not _reader_task.done():
73+
_reader_task.cancel()
74+
_reader_task = None
75+
76+
url = f"ws://localhost:{_port}/status"
77+
_ws = await websockets.connect(url, subprotocols=["dpl"])
78+
if _reader_task is None or _reader_task.done():
79+
_reader_task = asyncio.create_task(_reader())
80+
81+
82+
async def _reader() -> None:
83+
"""Background task: read frames from the driver and buffer them."""
84+
global _ws, _snapshot, _updates, _logs, _metrics_lists
85+
try:
86+
async for raw in _ws:
87+
try:
88+
msg = json.loads(raw)
89+
except json.JSONDecodeError:
90+
continue
91+
t = msg.get("type")
92+
if t == "snapshot":
93+
_snapshot = msg
94+
# Clear stale metric lists from a previous driver instance.
95+
_metrics_lists.clear()
96+
elif t == "update":
97+
_updates.append(msg)
98+
elif t == "log":
99+
_logs.append(msg)
100+
elif t == "metrics_list":
101+
device = msg.get("device", "")
102+
_metrics_lists[device] = msg.get("metrics", [])
103+
except Exception:
104+
pass
105+
finally:
106+
_ws = None
107+
108+
109+
async def _send(obj: dict) -> None:
110+
await _ensure_connected()
111+
await _ws.send(json.dumps(obj, separators=(",", ":")))
112+
113+
114+
# ---------------------------------------------------------------------------
115+
# MCP server definition
116+
# ---------------------------------------------------------------------------
117+
mcp = FastMCP("DPL Status")
118+
119+
120+
@mcp.tool()
121+
async def list_devices() -> str:
122+
"""List all DPL devices with their current status.
123+
124+
Returns each device's name, PID, active flag, streaming state, and device
125+
state as reported by the driver snapshot.
126+
"""
127+
await _ensure_connected()
128+
if not _snapshot:
129+
return "No snapshot received yet — the driver may still be starting."
130+
devices = _snapshot.get("devices", [])
131+
if not devices:
132+
return "No devices in snapshot."
133+
lines = []
134+
for d in devices:
135+
lines.append(
136+
f"{d['name']}: pid={d['pid']} active={d['active']} "
137+
f"streaming={d['streamingState']} state={d['deviceState']}"
138+
)
139+
return "\n".join(lines)
140+
141+
142+
@mcp.tool()
143+
async def list_metrics(device: str) -> str:
144+
"""List the available numeric metrics for a DPL device.
145+
146+
Sends a list_metrics command to the driver and waits up to 3 seconds for
147+
the reply. Only numeric metrics (int, float, uint64) are included; string
148+
and enum metrics are excluded.
149+
150+
Args:
151+
device: Device name exactly as shown by list_devices.
152+
"""
153+
# Remove any stale cached result so we can detect the fresh reply.
154+
_metrics_lists.pop(device, None)
155+
await _send({"cmd": "list_metrics", "device": device})
156+
for _ in range(60): # up to 3 s
157+
await asyncio.sleep(0.05)
158+
if device in _metrics_lists:
159+
names = _metrics_lists[device]
160+
if not names:
161+
return f"Device '{device}' has no numeric metrics yet."
162+
return f"{len(names)} metric(s): " + ", ".join(names)
163+
return f"No reply from driver for device '{device}' (timeout)."
164+
165+
166+
@mcp.tool()
167+
async def subscribe(device: str, metrics: list[str]) -> str:
168+
"""Subscribe to one or more metrics for a DPL device.
169+
170+
After subscribing, the driver will push update frames for the device
171+
whenever any of the subscribed metrics change. Use get_updates to drain
172+
the buffer.
173+
174+
Args:
175+
device: Device name exactly as shown by list_devices.
176+
metrics: List of metric names to subscribe to (from list_metrics).
177+
"""
178+
await _send({"cmd": "subscribe", "device": device, "metrics": metrics})
179+
return f"Subscribed to {len(metrics)} metric(s) for '{device}': {', '.join(metrics)}"
180+
181+
182+
@mcp.tool()
183+
async def unsubscribe(device: str, metrics: list[str]) -> str:
184+
"""Stop receiving updates for specific metrics of a DPL device.
185+
186+
Args:
187+
device: Device name exactly as shown by list_devices.
188+
metrics: List of metric names to unsubscribe from.
189+
"""
190+
await _send({"cmd": "unsubscribe", "device": device, "metrics": metrics})
191+
return f"Unsubscribed from {len(metrics)} metric(s) for '{device}'."
192+
193+
194+
@mcp.tool()
195+
async def subscribe_logs(device: str) -> str:
196+
"""Subscribe to log output for a DPL device.
197+
198+
After subscribing, new log lines from the device will be buffered and
199+
can be retrieved with get_logs().
200+
201+
Args:
202+
device: Device name exactly as shown by list_devices.
203+
"""
204+
await _send({"cmd": "subscribe_logs", "device": device})
205+
return f"Subscribed to logs for '{device}'."
206+
207+
208+
@mcp.tool()
209+
async def unsubscribe_logs(device: str) -> str:
210+
"""Stop receiving log output for a DPL device.
211+
212+
Args:
213+
device: Device name exactly as shown by list_devices.
214+
"""
215+
await _send({"cmd": "unsubscribe_logs", "device": device})
216+
return f"Unsubscribed from logs for '{device}'."
217+
218+
219+
@mcp.tool()
220+
async def get_logs(max_lines: int = 100) -> str:
221+
"""Drain and return buffered log lines received since the last call.
222+
223+
Args:
224+
max_lines: Maximum number of log lines to return (default 100).
225+
"""
226+
await _ensure_connected()
227+
batch = _logs[:max_lines]
228+
del _logs[:max_lines]
229+
if not batch:
230+
return "No buffered log lines."
231+
lines = []
232+
for entry in batch:
233+
device = entry.get("device", "?")
234+
level = entry.get("level", "?")
235+
line = entry.get("line", "")
236+
lines.append(f"[{device}][{level}] {line}")
237+
return "\n".join(lines)
238+
239+
240+
@mcp.tool()
241+
async def get_updates(max_updates: int = 50) -> str:
242+
"""Drain and return buffered metric update frames received since the last call.
243+
244+
Each frame contains the latest values of all subscribed metrics that
245+
changed during that processing cycle. Calling this repeatedly gives a
246+
time-ordered view of metric evolution.
247+
248+
Args:
249+
max_updates: Maximum number of update frames to return (default 50).
250+
"""
251+
await _ensure_connected()
252+
batch = _updates[:max_updates]
253+
del _updates[:max_updates]
254+
if not batch:
255+
return "No buffered updates."
256+
lines = []
257+
for upd in batch:
258+
name = upd.get("name") or f"device[{upd.get('device', '?')}]"
259+
metrics = upd.get("metrics", {})
260+
if metrics:
261+
parts = ", ".join(f"{k}={v}" for k, v in metrics.items())
262+
lines.append(f"{name}: {parts}")
263+
else:
264+
lines.append(f"{name}: (empty update)")
265+
return "\n".join(lines)
266+
267+
268+
# ---------------------------------------------------------------------------
269+
# Entry point
270+
# ---------------------------------------------------------------------------
271+
def main() -> None:
272+
global _port
273+
274+
parser = argparse.ArgumentParser(
275+
description="DPL status MCP server — expose DPL driver metrics via MCP tools"
276+
)
277+
group = parser.add_mutually_exclusive_group()
278+
group.add_argument(
279+
"--port",
280+
type=int,
281+
default=None,
282+
help="TCP port of the DPL driver status WebSocket (default: 8080 or DPL_STATUS_PORT env var)",
283+
)
284+
group.add_argument(
285+
"--pid",
286+
type=int,
287+
default=None,
288+
help="PID of the DPL driver process; port is derived as 8080 + pid %% 30000",
289+
)
290+
args = parser.parse_args()
291+
292+
if args.pid is not None:
293+
_port = 8080 + args.pid % 30000
294+
elif args.port is not None:
295+
_port = args.port
296+
elif "DPL_STATUS_PORT" in os.environ:
297+
_port = int(os.environ["DPL_STATUS_PORT"])
298+
# else leave _port at the default 8080
299+
300+
mcp.run()
301+
302+
303+
if __name__ == "__main__":
304+
main()

0 commit comments

Comments
 (0)