Skip to content

Commit 16e0810

Browse files
committed
DPL MCP: allow client to control signposts
1 parent db1bd8a commit 16e0810

File tree

3 files changed

+115
-0
lines changed

3 files changed

+115
-0
lines changed

Framework/Core/scripts/dpl-mcp-server/dpl_mcp_server.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,37 @@ async def get_logs(max_lines: int = 100) -> str:
237237
return "\n".join(lines)
238238

239239

240+
@mcp.tool()
241+
async def enable_signpost(device: str, streams: list[str]) -> str:
242+
"""Enable one or more signpost log streams for a DPL device.
243+
244+
Signpost streams produce detailed trace output visible in the device logs.
245+
Use get_logs() after subscribing to see the output.
246+
247+
Known stream names (full form): ch.cern.aliceo2.device,
248+
ch.cern.aliceo2.completion, ch.cern.aliceo2.monitoring_service,
249+
ch.cern.aliceo2.data_processor_context, ch.cern.aliceo2.stream_context.
250+
251+
Args:
252+
device: Device name as shown by list_devices, or "" for the driver.
253+
streams: List of full signpost log names to enable.
254+
"""
255+
await _send({"cmd": "enable_signpost", "device": device, "streams": streams})
256+
return f"Enabled {len(streams)} signpost stream(s) for '{device or 'driver'}': {', '.join(streams)}"
257+
258+
259+
@mcp.tool()
260+
async def disable_signpost(device: str, streams: list[str]) -> str:
261+
"""Disable one or more signpost log streams for a DPL device.
262+
263+
Args:
264+
device: Device name as shown by list_devices, or "" for the driver.
265+
streams: List of full signpost log names to disable.
266+
"""
267+
await _send({"cmd": "disable_signpost", "device": device, "streams": streams})
268+
return f"Disabled {len(streams)} signpost stream(s) for '{device or 'driver'}': {', '.join(streams)}"
269+
270+
240271
@mcp.tool()
241272
async def get_updates(max_updates: int = 50) -> str:
242273
"""Drain and return buffered metric update frames received since the last call.

Framework/Core/src/StatusWebSocketHandler.cxx

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,15 @@
1212
#include "StatusWebSocketHandler.h"
1313
#include "DPLWebSocket.h"
1414
#include "DriverServerContext.h"
15+
#include "Framework/DeviceControl.h"
16+
#include "Framework/DeviceController.h"
1517
#include "Framework/DeviceInfo.h"
1618
#include "Framework/DeviceMetricsInfo.h"
1719
#include "Framework/DeviceSpec.h"
20+
#include "Framework/DeviceState.h"
1821
#include "Framework/DeviceStateEnums.h"
1922
#include "Framework/LogParsingHelpers.h"
23+
#include "Framework/Signpost.h"
2024
#include <algorithm>
2125
#include <cstdio>
2226
#include <string>
@@ -250,6 +254,10 @@ void StatusWebSocketHandler::frame(char const* data, size_t s)
250254
handleSubscribeLogs(deviceName);
251255
} else if (cmd == "unsubscribe_logs") {
252256
handleUnsubscribeLogs(deviceName);
257+
} else if (cmd == "enable_signpost") {
258+
handleEnableSignpost(deviceName, extractArrayField(msg, "streams"));
259+
} else if (cmd == "disable_signpost") {
260+
handleDisableSignpost(deviceName, extractArrayField(msg, "streams"));
253261
}
254262
}
255263

@@ -433,6 +441,69 @@ size_t StatusWebSocketHandler::findDeviceIndex(std::string_view name) const
433441
return SIZE_MAX;
434442
}
435443

444+
void StatusWebSocketHandler::handleEnableSignpost(std::string_view deviceName, std::string_view streamsArr)
445+
{
446+
if (streamsArr.empty()) {
447+
return;
448+
}
449+
if (deviceName.empty()) {
450+
// Driver process — toggle in-process via o2_walk_logs.
451+
forEachStringInArray(streamsArr, [](std::string_view streamName) {
452+
std::string target(streamName);
453+
o2_walk_logs([](char const* name, void* l, void* context) -> bool {
454+
auto* log = static_cast<_o2_log_t*>(l);
455+
if (static_cast<std::string*>(context)->compare(name) == 0) {
456+
_o2_log_set_stacktrace(log, log->defaultStacktrace);
457+
return false;
458+
}
459+
return true;
460+
}, &target);
461+
});
462+
} else {
463+
size_t di = findDeviceIndex(deviceName);
464+
if (di == SIZE_MAX || di >= mContext.controls->size() || !(*mContext.controls)[di].controller) {
465+
return;
466+
}
467+
auto* controller = (*mContext.controls)[di].controller;
468+
forEachStringInArray(streamsArr, [controller](std::string_view name) {
469+
std::string cmd = "/signpost:enable ";
470+
cmd += name;
471+
controller->write(cmd.c_str(), cmd.size());
472+
});
473+
}
474+
}
475+
476+
void StatusWebSocketHandler::handleDisableSignpost(std::string_view deviceName, std::string_view streamsArr)
477+
{
478+
if (streamsArr.empty()) {
479+
return;
480+
}
481+
if (deviceName.empty()) {
482+
forEachStringInArray(streamsArr, [](std::string_view streamName) {
483+
std::string target(streamName);
484+
o2_walk_logs([](char const* name, void* l, void* context) -> bool {
485+
auto* log = static_cast<_o2_log_t*>(l);
486+
if (static_cast<std::string*>(context)->compare(name) == 0) {
487+
_o2_log_set_stacktrace(log, 0);
488+
return false;
489+
}
490+
return true;
491+
}, &target);
492+
});
493+
} else {
494+
size_t di = findDeviceIndex(deviceName);
495+
if (di == SIZE_MAX || di >= mContext.controls->size() || !(*mContext.controls)[di].controller) {
496+
return;
497+
}
498+
auto* controller = (*mContext.controls)[di].controller;
499+
forEachStringInArray(streamsArr, [controller](std::string_view name) {
500+
std::string cmd = "/signpost:disable ";
501+
cmd += name;
502+
controller->write(cmd.c_str(), cmd.size());
503+
});
504+
}
505+
}
506+
436507
void StatusWebSocketHandler::handleSubscribeLogs(std::string_view deviceName)
437508
{
438509
size_t di = findDeviceIndex(deviceName);

Framework/Core/src/StatusWebSocketHandler.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,17 @@ struct WSDPLHandler;
4141
/// {"cmd":"unsubscribe_logs","device":"<name>"}
4242
/// → driver stops pushing log lines for the device
4343
///
44+
/// {"cmd":"enable_signpost","device":"<name>","streams":["device","completion",...]}
45+
/// → enable the named signpost log streams for a device (or the driver if device=="")
46+
/// → known streams: "device","completion","monitoring_service","data_processor_context","stream_context"
47+
///
48+
/// {"cmd":"disable_signpost","device":"<name>","streams":["device","completion",...]}
49+
/// → disable the named signpost log streams for a device
50+
///
51+
/// {"cmd":"list_signposts"}
52+
/// → driver replies with {"type":"signposts_list","streams":["device","completion",...]}
53+
/// → lists the known stream names
54+
///
4455
/// Protocol (driver → client):
4556
/// {"type":"snapshot","devices":[{"name","pid","active","streamingState","deviceState"},...]}
4657
/// → sent once on connect; contains no metrics or logs
@@ -84,6 +95,8 @@ struct StatusWebSocketHandler : public WebSocketHandler {
8495
void handleUnsubscribe(std::string_view deviceName, std::string_view metricsJson);
8596
void handleSubscribeLogs(std::string_view deviceName);
8697
void handleUnsubscribeLogs(std::string_view deviceName);
98+
void handleEnableSignpost(std::string_view deviceName, std::string_view streamsArr);
99+
void handleDisableSignpost(std::string_view deviceName, std::string_view streamsArr);
87100
size_t findDeviceIndex(std::string_view name) const;
88101

89102
DriverServerContext& mContext;

0 commit comments

Comments
 (0)