diff --git a/Framework/Core/scripts/dpl-mcp-server/dpl_mcp_server.py b/Framework/Core/scripts/dpl-mcp-server/dpl_mcp_server.py index bc04acf026188..febb1278f6045 100644 --- a/Framework/Core/scripts/dpl-mcp-server/dpl_mcp_server.py +++ b/Framework/Core/scripts/dpl-mcp-server/dpl_mcp_server.py @@ -237,6 +237,37 @@ async def get_logs(max_lines: int = 100) -> str: return "\n".join(lines) +@mcp.tool() +async def enable_signpost(device: str, streams: list[str]) -> str: + """Enable one or more signpost log streams for a DPL device. + + Signpost streams produce detailed trace output visible in the device logs. + Use get_logs() after subscribing to see the output. + + Known stream names (full form): ch.cern.aliceo2.device, + ch.cern.aliceo2.completion, ch.cern.aliceo2.monitoring_service, + ch.cern.aliceo2.data_processor_context, ch.cern.aliceo2.stream_context. + + Args: + device: Device name as shown by list_devices, or "" for the driver. + streams: List of full signpost log names to enable. + """ + await _send({"cmd": "enable_signpost", "device": device, "streams": streams}) + return f"Enabled {len(streams)} signpost stream(s) for '{device or 'driver'}': {', '.join(streams)}" + + +@mcp.tool() +async def disable_signpost(device: str, streams: list[str]) -> str: + """Disable one or more signpost log streams for a DPL device. + + Args: + device: Device name as shown by list_devices, or "" for the driver. + streams: List of full signpost log names to disable. + """ + await _send({"cmd": "disable_signpost", "device": device, "streams": streams}) + return f"Disabled {len(streams)} signpost stream(s) for '{device or 'driver'}': {', '.join(streams)}" + + @mcp.tool() async def get_updates(max_updates: int = 50) -> str: """Drain and return buffered metric update frames received since the last call. diff --git a/Framework/Core/src/StatusWebSocketHandler.cxx b/Framework/Core/src/StatusWebSocketHandler.cxx index db715eff6592d..cdf08c4f2f349 100644 --- a/Framework/Core/src/StatusWebSocketHandler.cxx +++ b/Framework/Core/src/StatusWebSocketHandler.cxx @@ -12,11 +12,15 @@ #include "StatusWebSocketHandler.h" #include "DPLWebSocket.h" #include "DriverServerContext.h" +#include "Framework/DeviceControl.h" +#include "Framework/DeviceController.h" #include "Framework/DeviceInfo.h" #include "Framework/DeviceMetricsInfo.h" #include "Framework/DeviceSpec.h" +#include "Framework/DeviceState.h" #include "Framework/DeviceStateEnums.h" #include "Framework/LogParsingHelpers.h" +#include "Framework/Signpost.h" #include #include #include @@ -250,6 +254,10 @@ void StatusWebSocketHandler::frame(char const* data, size_t s) handleSubscribeLogs(deviceName); } else if (cmd == "unsubscribe_logs") { handleUnsubscribeLogs(deviceName); + } else if (cmd == "enable_signpost") { + handleEnableSignpost(deviceName, extractArrayField(msg, "streams")); + } else if (cmd == "disable_signpost") { + handleDisableSignpost(deviceName, extractArrayField(msg, "streams")); } } @@ -433,6 +441,69 @@ size_t StatusWebSocketHandler::findDeviceIndex(std::string_view name) const return SIZE_MAX; } +void StatusWebSocketHandler::handleEnableSignpost(std::string_view deviceName, std::string_view streamsArr) +{ + if (streamsArr.empty()) { + return; + } + if (deviceName.empty()) { + // Driver process — toggle in-process via o2_walk_logs. + forEachStringInArray(streamsArr, [](std::string_view streamName) { + std::string target(streamName); + o2_walk_logs([](char const* name, void* l, void* context) -> bool { + auto* log = static_cast<_o2_log_t*>(l); + if (static_cast(context)->compare(name) == 0) { + _o2_log_set_stacktrace(log, log->defaultStacktrace); + return false; + } + return true; + }, &target); + }); + } else { + size_t di = findDeviceIndex(deviceName); + if (di == SIZE_MAX || di >= mContext.controls->size() || !(*mContext.controls)[di].controller) { + return; + } + auto* controller = (*mContext.controls)[di].controller; + forEachStringInArray(streamsArr, [controller](std::string_view name) { + std::string cmd = "/signpost:enable "; + cmd += name; + controller->write(cmd.c_str(), cmd.size()); + }); + } +} + +void StatusWebSocketHandler::handleDisableSignpost(std::string_view deviceName, std::string_view streamsArr) +{ + if (streamsArr.empty()) { + return; + } + if (deviceName.empty()) { + forEachStringInArray(streamsArr, [](std::string_view streamName) { + std::string target(streamName); + o2_walk_logs([](char const* name, void* l, void* context) -> bool { + auto* log = static_cast<_o2_log_t*>(l); + if (static_cast(context)->compare(name) == 0) { + _o2_log_set_stacktrace(log, 0); + return false; + } + return true; + }, &target); + }); + } else { + size_t di = findDeviceIndex(deviceName); + if (di == SIZE_MAX || di >= mContext.controls->size() || !(*mContext.controls)[di].controller) { + return; + } + auto* controller = (*mContext.controls)[di].controller; + forEachStringInArray(streamsArr, [controller](std::string_view name) { + std::string cmd = "/signpost:disable "; + cmd += name; + controller->write(cmd.c_str(), cmd.size()); + }); + } +} + void StatusWebSocketHandler::handleSubscribeLogs(std::string_view deviceName) { size_t di = findDeviceIndex(deviceName); diff --git a/Framework/Core/src/StatusWebSocketHandler.h b/Framework/Core/src/StatusWebSocketHandler.h index 86a460e289440..3b040d68e26f0 100644 --- a/Framework/Core/src/StatusWebSocketHandler.h +++ b/Framework/Core/src/StatusWebSocketHandler.h @@ -41,6 +41,17 @@ struct WSDPLHandler; /// {"cmd":"unsubscribe_logs","device":""} /// → driver stops pushing log lines for the device /// +/// {"cmd":"enable_signpost","device":"","streams":["device","completion",...]} +/// → enable the named signpost log streams for a device (or the driver if device=="") +/// → known streams: "device","completion","monitoring_service","data_processor_context","stream_context" +/// +/// {"cmd":"disable_signpost","device":"","streams":["device","completion",...]} +/// → disable the named signpost log streams for a device +/// +/// {"cmd":"list_signposts"} +/// → driver replies with {"type":"signposts_list","streams":["device","completion",...]} +/// → lists the known stream names +/// /// Protocol (driver → client): /// {"type":"snapshot","devices":[{"name","pid","active","streamingState","deviceState"},...]} /// → sent once on connect; contains no metrics or logs @@ -84,6 +95,8 @@ struct StatusWebSocketHandler : public WebSocketHandler { void handleUnsubscribe(std::string_view deviceName, std::string_view metricsJson); void handleSubscribeLogs(std::string_view deviceName); void handleUnsubscribeLogs(std::string_view deviceName); + void handleEnableSignpost(std::string_view deviceName, std::string_view streamsArr); + void handleDisableSignpost(std::string_view deviceName, std::string_view streamsArr); size_t findDeviceIndex(std::string_view name) const; DriverServerContext& mContext; diff --git a/Framework/Core/src/WSDriverClient.cxx b/Framework/Core/src/WSDriverClient.cxx index 43a407536cb59..97ea1b3dbf66a 100644 --- a/Framework/Core/src/WSDriverClient.cxx +++ b/Framework/Core/src/WSDriverClient.cxx @@ -188,48 +188,40 @@ void on_connect(uv_connect_t* connection, int status) state.tracingFlags = tracingFlags; }); - client->observe("/log-streams", [ref = context->ref](std::string_view cmd) { - auto& state = ref.get(); - static constexpr int prefixSize = std::string_view{"/log-streams "}.size(); - if (prefixSize > cmd.size()) { - LOG(error) << "Malformed log-streams request"; + client->observe("/signpost:enable", [](std::string_view cmd) { + static constexpr int prefixSize = std::string_view{"/signpost:enable "}.size(); + if (cmd.size() <= prefixSize) { + LOG(error) << "Malformed /signpost:enable request"; return; } - cmd.remove_prefix(prefixSize); - int logStreams = 0; + std::string name(cmd.substr(prefixSize)); + o2_walk_logs([](char const* logName, void* l, void* context) -> bool { + auto* log = static_cast<_o2_log_t*>(l); + auto* target = static_cast(context); + if (*target == logName) { + _o2_log_set_stacktrace(log, log->defaultStacktrace); + return false; + } + return true; + }, &name); + }); - auto error = std::from_chars(cmd.data(), cmd.data() + cmd.size(), logStreams); - if (error.ec != std::errc()) { - LOG(error) << "Malformed log-streams mask"; + client->observe("/signpost:disable", [](std::string_view cmd) { + static constexpr int prefixSize = std::string_view{"/signpost:disable "}.size(); + if (cmd.size() <= prefixSize) { + LOG(error) << "Malformed /signpost:disable request"; return; } - LOGP(info, "Logstreams flags set to {}", logStreams); - state.logStreams = logStreams; - if ((state.logStreams & DeviceState::LogStreams::DEVICE_LOG) != 0) { - O2_LOG_ENABLE(device); - } else { - O2_LOG_DISABLE(device); - } - if ((state.logStreams & DeviceState::LogStreams::COMPLETION_LOG) != 0) { - O2_LOG_ENABLE(completion); - } else { - O2_LOG_DISABLE(completion); - } - if ((state.logStreams & DeviceState::LogStreams::MONITORING_SERVICE_LOG) != 0) { - O2_LOG_ENABLE(monitoring_service); - } else { - O2_LOG_DISABLE(monitoring_service); - } - if ((state.logStreams & DeviceState::LogStreams::DATA_PROCESSOR_CONTEXT_LOG) != 0) { - O2_LOG_ENABLE(data_processor_context); - } else { - O2_LOG_DISABLE(data_processor_context); - } - if ((state.logStreams & DeviceState::LogStreams::STREAM_CONTEXT_LOG) != 0) { - O2_LOG_ENABLE(stream_context); - } else { - O2_LOG_DISABLE(stream_context); - } + std::string name(cmd.substr(prefixSize)); + o2_walk_logs([](char const* logName, void* l, void* context) -> bool { + auto* log = static_cast<_o2_log_t*>(l); + auto* target = static_cast(context); + if (*target == logName) { + _o2_log_set_stacktrace(log, 0); + return false; + } + return true; + }, &name); }); // Client will be filled in the line after. I can probably have a single diff --git a/Framework/GUISupport/src/FrameworkGUIDeviceInspector.cxx b/Framework/GUISupport/src/FrameworkGUIDeviceInspector.cxx index aa546b8a9ab49..b29e024ec641e 100644 --- a/Framework/GUISupport/src/FrameworkGUIDeviceInspector.cxx +++ b/Framework/GUISupport/src/FrameworkGUIDeviceInspector.cxx @@ -400,16 +400,25 @@ void displayDeviceInspector(DeviceSpec const& spec, } } - bool logsChanged = false; if (ImGui::CollapsingHeader("Signposts", ImGuiTreeNodeFlags_DefaultOpen)) { - logsChanged = ImGui::CheckboxFlags("Device", &control.logStreams, DeviceState::LogStreams::DEVICE_LOG); - logsChanged = ImGui::CheckboxFlags("Completion", &control.logStreams, DeviceState::LogStreams::COMPLETION_LOG); - logsChanged = ImGui::CheckboxFlags("Monitoring", &control.logStreams, DeviceState::LogStreams::MONITORING_SERVICE_LOG); - logsChanged = ImGui::CheckboxFlags("DataProcessorContext", &control.logStreams, DeviceState::LogStreams::DATA_PROCESSOR_CONTEXT_LOG); - logsChanged = ImGui::CheckboxFlags("StreamContext", &control.logStreams, DeviceState::LogStreams::STREAM_CONTEXT_LOG); - if (logsChanged && control.controller) { - std::string cmd = fmt::format("/log-streams {}", control.logStreams); - control.controller->write(cmd.c_str(), cmd.size()); + static const struct { + const char* label; + int bit; + const char* fullName; + } kStreams[] = { + {"Device", DeviceState::LogStreams::DEVICE_LOG, "ch.cern.aliceo2.device"}, + {"Completion", DeviceState::LogStreams::COMPLETION_LOG, "ch.cern.aliceo2.completion"}, + {"Monitoring", DeviceState::LogStreams::MONITORING_SERVICE_LOG, "ch.cern.aliceo2.monitoring_service"}, + {"DataProcessorContext", DeviceState::LogStreams::DATA_PROCESSOR_CONTEXT_LOG, "ch.cern.aliceo2.data_processor_context"}, + {"StreamContext", DeviceState::LogStreams::STREAM_CONTEXT_LOG, "ch.cern.aliceo2.stream_context"}, + }; + for (auto const& s : kStreams) { + if (ImGui::CheckboxFlags(s.label, &control.logStreams, s.bit) && control.controller) { + bool enabled = (control.logStreams & s.bit) != 0; + std::string cmd = enabled ? fmt::format("/signpost:enable {}", s.fullName) + : fmt::format("/signpost:disable {}", s.fullName); + control.controller->write(cmd.c_str(), cmd.size()); + } } }