Skip to content

Commit c04a597

Browse files
committed
DPL: add MCP server for DPL
Allows debugging a DPL workflow using Claude, ChatGPT or similar tools.
1 parent 78dd799 commit c04a597

File tree

3 files changed

+349
-0
lines changed

3 files changed

+349
-0
lines changed
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# DPL Status MCP Server
2+
3+
An MCP server that connects to a running DPL driver's `/status` WebSocket endpoint and exposes its device state and metrics as tools for an AI assistant (e.g. Claude).
4+
5+
## Requirements
6+
7+
```bash
8+
pip install mcp websockets
9+
# or install the package directly:
10+
pip install ./Framework/Core/scripts/dpl-mcp-server/
11+
```
12+
13+
## Running
14+
15+
The driver port defaults to `8080`. Override with `--port`, `--pid`, or `DPL_STATUS_PORT`:
16+
17+
```bash
18+
python3 dpl_mcp_server.py --port 8080
19+
python3 dpl_mcp_server.py --pid 12345 # port = 8080 + pid % 30000
20+
DPL_STATUS_PORT=8080 python3 dpl_mcp_server.py
21+
```
22+
23+
If installed as a package:
24+
25+
```bash
26+
dpl-mcp-server --pid $(pgrep -f diamond-workflow | head -1)
27+
```
28+
29+
## Claude Code integration
30+
31+
Add to `.mcp.json` in your project (or `~/.claude.json` for global use):
32+
33+
```json
34+
{
35+
"mcpServers": {
36+
"dpl": {
37+
"command": "dpl-mcp-server",
38+
"args": ["--pid", "12345"]
39+
}
40+
}
41+
}
42+
```
43+
44+
Or with `claude mcp add`:
45+
46+
```bash
47+
claude mcp add dpl -- dpl-mcp-server --pid 12345
48+
```
49+
50+
## Available tools
51+
52+
| Tool | Description |
53+
|------|-------------|
54+
| `list_devices` | List all devices with pid, active flag, streaming and device state |
55+
| `list_metrics(device)` | List numeric metrics available for a device |
56+
| `subscribe(device, metrics)` | Subscribe to metrics; driver will push updates when they change |
57+
| `unsubscribe(device, metrics)` | Stop receiving updates for specific metrics |
58+
| `get_updates(max_updates)` | Drain buffered update frames (default: up to 50) |
59+
60+
## Protocol
61+
62+
The driver sends a snapshot on connect, then pushes updates only for subscribed metrics that changed each processing cycle. There is no polling — updates arrive in real time as the workflow runs.
63+
64+
```
65+
connect → {"type":"snapshot","devices":[{"name","pid","active","streamingState","deviceState"},...]}
66+
67+
client → {"cmd":"list_metrics","device":"producer"}
68+
driver → {"type":"metrics_list","device":"producer","metrics":["input-parts","output-bytes",...]}
69+
70+
client → {"cmd":"subscribe","device":"producer","metrics":["output-bytes"]}
71+
driver → {"type":"update","device":0,"name":"producer","metrics":{"output-bytes":1048576}}
72+
(pushed every cycle in which output-bytes changed)
73+
74+
client → {"cmd":"unsubscribe","device":"producer","metrics":["output-bytes"]}
75+
```
Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
#!/usr/bin/env python3
2+
# Copyright 2019-2026 CERN and copyright holders of ALICE O2.
3+
# See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
4+
# All rights not expressly granted are reserved.
5+
#
6+
# This software is distributed under the terms of the GNU General Public
7+
# License v3 (GPL Version 3), copied verbatim in the file "COPYING".
8+
#
9+
# In applying this license CERN does not waive the privileges and immunities
10+
# granted to it by virtue of its status as an Intergovernmental Organization
11+
# or submit itself to any jurisdiction.
12+
"""DPL status MCP server.
13+
14+
Bridges the DPL driver /status WebSocket endpoint to MCP tools so that an
15+
AI assistant (e.g. Claude) can inspect and monitor a running DPL workflow.
16+
17+
Usage
18+
-----
19+
python3 dpl_mcp_server.py --port 8080
20+
python3 dpl_mcp_server.py --pid 12345 # port derived as 8080 + pid % 30000
21+
DPL_STATUS_PORT=8080 python3 dpl_mcp_server.py
22+
23+
Wire protocol (client → driver)
24+
--------------------------------
25+
{"cmd":"list_metrics","device":"<name>"}
26+
{"cmd":"subscribe","device":"<name>","metrics":["m1","m2"]}
27+
{"cmd":"unsubscribe","device":"<name>","metrics":["m1"]}
28+
29+
Wire protocol (driver → client)
30+
--------------------------------
31+
{"type":"snapshot","devices":[{"name","pid","active","streamingState","deviceState"},...]}
32+
{"type":"update","device":<idx>,"name":"<name>","metrics":{<name:value,...>}}
33+
{"type":"metrics_list","device":"<name>","metrics":["m1","m2",...]}
34+
"""
35+
36+
from __future__ import annotations
37+
38+
import argparse
39+
import asyncio
40+
import json
41+
import os
42+
import sys
43+
from typing import Any
44+
45+
import websockets
46+
from mcp.server.fastmcp import FastMCP
47+
48+
# ---------------------------------------------------------------------------
49+
# Global connection state (all access from the single asyncio event loop)
50+
# ---------------------------------------------------------------------------
51+
_port: int = 8080
52+
_ws: Any = None
53+
_reader_task: asyncio.Task | None = None
54+
_snapshot: dict = {}
55+
_updates: list[dict] = []
56+
_metrics_lists: dict[str, list[str]] = {}
57+
58+
59+
async def _ensure_connected() -> None:
60+
"""Connect (or reconnect) to the driver's /status WebSocket."""
61+
global _ws, _reader_task
62+
63+
# Check liveness of existing connection.
64+
if _ws is not None:
65+
try:
66+
pong = await asyncio.wait_for(_ws.ping(), timeout=2.0)
67+
await pong
68+
return
69+
except Exception:
70+
_ws = None
71+
if _reader_task is not None and not _reader_task.done():
72+
_reader_task.cancel()
73+
_reader_task = None
74+
75+
url = f"ws://localhost:{_port}/status"
76+
_ws = await websockets.connect(url, subprotocols=["dpl"])
77+
if _reader_task is None or _reader_task.done():
78+
_reader_task = asyncio.create_task(_reader())
79+
80+
81+
async def _reader() -> None:
82+
"""Background task: read frames from the driver and buffer them."""
83+
global _ws, _snapshot, _updates, _metrics_lists
84+
try:
85+
async for raw in _ws:
86+
try:
87+
msg = json.loads(raw)
88+
except json.JSONDecodeError:
89+
continue
90+
t = msg.get("type")
91+
if t == "snapshot":
92+
_snapshot = msg
93+
# Clear stale metric lists from a previous driver instance.
94+
_metrics_lists.clear()
95+
elif t == "update":
96+
_updates.append(msg)
97+
elif t == "metrics_list":
98+
device = msg.get("device", "")
99+
_metrics_lists[device] = msg.get("metrics", [])
100+
except Exception:
101+
pass
102+
finally:
103+
_ws = None
104+
105+
106+
async def _send(obj: dict) -> None:
107+
await _ensure_connected()
108+
await _ws.send(json.dumps(obj, separators=(",", ":")))
109+
110+
111+
# ---------------------------------------------------------------------------
112+
# MCP server definition
113+
# ---------------------------------------------------------------------------
114+
mcp = FastMCP("DPL Status")
115+
116+
117+
@mcp.tool()
118+
async def list_devices() -> str:
119+
"""List all DPL devices with their current status.
120+
121+
Returns each device's name, PID, active flag, streaming state, and device
122+
state as reported by the driver snapshot.
123+
"""
124+
await _ensure_connected()
125+
if not _snapshot:
126+
return "No snapshot received yet — the driver may still be starting."
127+
devices = _snapshot.get("devices", [])
128+
if not devices:
129+
return "No devices in snapshot."
130+
lines = []
131+
for d in devices:
132+
lines.append(
133+
f"{d['name']}: pid={d['pid']} active={d['active']} "
134+
f"streaming={d['streamingState']} state={d['deviceState']}"
135+
)
136+
return "\n".join(lines)
137+
138+
139+
@mcp.tool()
140+
async def list_metrics(device: str) -> str:
141+
"""List the available numeric metrics for a DPL device.
142+
143+
Sends a list_metrics command to the driver and waits up to 3 seconds for
144+
the reply. Only numeric metrics (int, float, uint64) are included; string
145+
and enum metrics are excluded.
146+
147+
Args:
148+
device: Device name exactly as shown by list_devices.
149+
"""
150+
# Remove any stale cached result so we can detect the fresh reply.
151+
_metrics_lists.pop(device, None)
152+
await _send({"cmd": "list_metrics", "device": device})
153+
for _ in range(60): # up to 3 s
154+
await asyncio.sleep(0.05)
155+
if device in _metrics_lists:
156+
names = _metrics_lists[device]
157+
if not names:
158+
return f"Device '{device}' has no numeric metrics yet."
159+
return f"{len(names)} metric(s): " + ", ".join(names)
160+
return f"No reply from driver for device '{device}' (timeout)."
161+
162+
163+
@mcp.tool()
164+
async def subscribe(device: str, metrics: list[str]) -> str:
165+
"""Subscribe to one or more metrics for a DPL device.
166+
167+
After subscribing, the driver will push update frames for the device
168+
whenever any of the subscribed metrics change. Use get_updates to drain
169+
the buffer.
170+
171+
Args:
172+
device: Device name exactly as shown by list_devices.
173+
metrics: List of metric names to subscribe to (from list_metrics).
174+
"""
175+
await _send({"cmd": "subscribe", "device": device, "metrics": metrics})
176+
return f"Subscribed to {len(metrics)} metric(s) for '{device}': {', '.join(metrics)}"
177+
178+
179+
@mcp.tool()
180+
async def unsubscribe(device: str, metrics: list[str]) -> str:
181+
"""Stop receiving updates for specific metrics of a DPL device.
182+
183+
Args:
184+
device: Device name exactly as shown by list_devices.
185+
metrics: List of metric names to unsubscribe from.
186+
"""
187+
await _send({"cmd": "unsubscribe", "device": device, "metrics": metrics})
188+
return f"Unsubscribed from {len(metrics)} metric(s) for '{device}'."
189+
190+
191+
@mcp.tool()
192+
async def get_updates(max_updates: int = 50) -> str:
193+
"""Drain and return buffered metric update frames received since the last call.
194+
195+
Each frame contains the latest values of all subscribed metrics that
196+
changed during that processing cycle. Calling this repeatedly gives a
197+
time-ordered view of metric evolution.
198+
199+
Args:
200+
max_updates: Maximum number of update frames to return (default 50).
201+
"""
202+
await _ensure_connected()
203+
batch = _updates[:max_updates]
204+
del _updates[:max_updates]
205+
if not batch:
206+
return "No buffered updates."
207+
lines = []
208+
for upd in batch:
209+
name = upd.get("name") or f"device[{upd.get('device', '?')}]"
210+
metrics = upd.get("metrics", {})
211+
if metrics:
212+
parts = ", ".join(f"{k}={v}" for k, v in metrics.items())
213+
lines.append(f"{name}: {parts}")
214+
else:
215+
lines.append(f"{name}: (empty update)")
216+
return "\n".join(lines)
217+
218+
219+
# ---------------------------------------------------------------------------
220+
# Entry point
221+
# ---------------------------------------------------------------------------
222+
def main() -> None:
223+
global _port
224+
225+
parser = argparse.ArgumentParser(
226+
description="DPL status MCP server — expose DPL driver metrics via MCP tools"
227+
)
228+
group = parser.add_mutually_exclusive_group()
229+
group.add_argument(
230+
"--port",
231+
type=int,
232+
default=None,
233+
help="TCP port of the DPL driver status WebSocket (default: 8080 or DPL_STATUS_PORT env var)",
234+
)
235+
group.add_argument(
236+
"--pid",
237+
type=int,
238+
default=None,
239+
help="PID of the DPL driver process; port is derived as 8080 + pid %% 30000",
240+
)
241+
args = parser.parse_args()
242+
243+
if args.pid is not None:
244+
_port = 8080 + args.pid % 30000
245+
elif args.port is not None:
246+
_port = args.port
247+
elif "DPL_STATUS_PORT" in os.environ:
248+
_port = int(os.environ["DPL_STATUS_PORT"])
249+
# else leave _port at the default 8080
250+
251+
mcp.run()
252+
253+
254+
if __name__ == "__main__":
255+
main()
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
[build-system]
2+
requires = ["hatchling"]
3+
build-backend = "hatchling.build"
4+
5+
[project]
6+
name = "dpl-mcp-server"
7+
version = "0.1.0"
8+
description = "MCP server for monitoring DPL (Data Processing Layer) workflows"
9+
requires-python = ">=3.11"
10+
dependencies = [
11+
"mcp>=1.0.0",
12+
"websockets>=12.0",
13+
]
14+
15+
[project.scripts]
16+
dpl-mcp-server = "dpl_mcp_server:main"
17+
18+
[tool.hatch.build.targets.wheel]
19+
include = ["dpl_mcp_server.py"]

0 commit comments

Comments
 (0)