From 236089f5c89804007b3a9dfd9879d07a7d961d0e Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 2 Mar 2026 17:43:41 +0000 Subject: [PATCH 01/16] Add long-running agents anomaly monitoring end-to-end Co-authored-by: leor --- docs/agents_monitoring_handoff.md | 128 +++ electron-ui/index.html | 93 +++ electron-ui/main.js | 316 ++++++- electron-ui/preload.js | 29 + electron-ui/renderer.js | 379 +++++++++ electron-ui/styles.css | 275 +++++++ scripts/anomaly_monitor.py | 912 +++++++++++++++++++++ scripts/anomaly_rules.py | 217 +++++ scripts/fixtures/anomaly_replay_cases.json | 32 + scripts/graph_api.py | 11 +- scripts/ignition_api_client.py | 69 +- scripts/neo4j_ontology.py | 148 +++- 12 files changed, 2565 insertions(+), 44 deletions(-) create mode 100644 docs/agents_monitoring_handoff.md create mode 100644 scripts/anomaly_monitor.py create mode 100644 scripts/anomaly_rules.py create mode 100644 scripts/fixtures/anomaly_replay_cases.json diff --git a/docs/agents_monitoring_handoff.md b/docs/agents_monitoring_handoff.md new file mode 100644 index 0000000..a5368fb --- /dev/null +++ b/docs/agents_monitoring_handoff.md @@ -0,0 +1,128 @@ +# Long-Running Agents Monitoring Handoff + +## Summary + +This handoff documents the implemented V1 monitoring capability: + +- New **Agents** tab in Electron UI for starting/stopping long-running monitoring. +- Continuous Python worker (`anomaly_monitor.py`) with: + - deterministic historical-deviation scoring, + - quality/staleness gates, + - optional LLM triage, + - Neo4j persistence for `AgentRun` and `AnomalyEvent`, + - event dedup and retention cleanup. +- IPC surface and stream channels from Electron main to renderer: + - `agents:start`, `agents:status`, `agents:stop`, + - `agents:list-events`, `agents:get-event`, `agents:ack-event`, `agents:cleanup`, + - channels: `agent-status`, `agent-event`, `agent-error`, `agent-complete`. +- Graph drill-down integration with anomaly node support. + +## Files Changed + +### Electron + +- `electron-ui/index.html` + - Added **Agents** nav button. + - Added `tab-agents` page shell with controls, filters, feed, and detail panel. + - Added graph filter option for anomaly layer. + +- `electron-ui/styles.css` + - Added Agents tab styles (`agents-*`, `status-chip`, feed cards, detail panel). + +- `electron-ui/preload.js` + - Added `agents*` API bridge methods. + - Added event listeners for `agent-status/event/error/complete`. + +- `electron-ui/main.js` + - Added background agent runtime management (`activeAgentRun`). + - Added stream parser for monitor stdout markers (`[AGENT_STATUS]`, etc.). + - Added full `agents:*` IPC handlers. + - Added graceful stop handling on app shutdown. + +- `electron-ui/renderer.js` + - Added Agents tab state management. + - Added start/stop/refresh/cleanup/ack handlers. + - Added realtime feed updates from agent channels. + - Added event detail rendering and graph drill-down action. + +### Python backend + +- `scripts/anomaly_rules.py` (new) + - Deterministic scoring logic (`z`, `MAD`, rate, drift trend, flatline). + - Quality/staleness helpers and dedup key generator. + +- `scripts/anomaly_monitor.py` (new) + - Long-running monitoring worker with CLI subcommands: + - `run`, `status`, `list-events`, `get-event`, `ack-event`, `cleanup`, `replay-fixtures`. + - Neo4j persistence + dedup + retention cleanup. + - Optional LLM triage with structured JSON fallback. + +- `scripts/ignition_api_client.py` + - Added `query_tag_history(...)` and local-time-to-UTC conversion helper. + +- `scripts/neo4j_ontology.py` + - Added monitoring schema constraints/indexes for `AgentRun` / `AnomalyEvent`. + - Added helper methods: list/get/cleanup anomaly events. + - Added CLI commands: + - `init-agent-schema` + - `list-anomaly-events` + - `get-anomaly-event` + - `cleanup-anomaly-events` + +- `scripts/graph_api.py` + - Added node groups/colors for `AgentRun` and `AnomalyEvent`. + - Extended neighbor center-node lookup to support `event_id` and `run_id`. + +### Fixtures + +- `scripts/fixtures/anomaly_replay_cases.json` (new) + - Deterministic replay cases: + - normal baseline, + - sudden spike, + - slow drift, + - flatline/stuck. + +## Runtime Commands + +### Deterministic replay validation + +```bash +python3 scripts/anomaly_monitor.py replay-fixtures --fixture-file scripts/fixtures/anomaly_replay_cases.json +``` + +### Monitor worker manual run + +```bash +python3 scripts/anomaly_monitor.py run --run-id demo-run --config-json '{"pollIntervalMs":15000}' +``` + +### Event operations + +```bash +python3 scripts/anomaly_monitor.py list-events --limit 50 +python3 scripts/anomaly_monitor.py get-event --event-id +python3 scripts/anomaly_monitor.py ack-event --event-id --note "Reviewed by operator" +python3 scripts/anomaly_monitor.py cleanup --retention-days 14 +``` + +## Known Environment Requirements + +The Python environment must include packages from `requirements.txt`: + +- `neo4j` +- `anthropic` (for LLM triage; deterministic fallback works without API key) +- `python-dotenv` +- `requests` + +If `ANTHROPIC_API_KEY` is absent, triage automatically falls back to deterministic explanations. + +## Validation Status + +- Syntax checks passed: + - Python (`py_compile`) for all modified scripts. + - JS syntax checks (`node --check`) for Electron files. +- Fixture replay passed: + - `4/4` deterministic scenarios. + +Live end-to-end validation against actual Ignition + Neo4j + Anthropic requires connected runtime services. + diff --git a/electron-ui/index.html b/electron-ui/index.html index 03b808e..7e5e8a7 100644 --- a/electron-ui/index.html +++ b/electron-ui/index.html @@ -36,6 +36,13 @@ Assist + + + + + +
+ Idle + No active run +
+ + +
+ + + + + + + + + + + + + + +
+ +
+
Cycle (ms)0
+
Candidates0
+
Triaged0
+
Emitted0
+
Last heartbeatn/a
+
+ +
+ + +
+
+

Event Details

+
+ + +
+
+
+

Select an anomaly event from the feed.

+
+
+
+ +
@@ -630,6 +722,7 @@

Ontology Graph

+ diff --git a/electron-ui/main.js b/electron-ui/main.js index b5cdb4d..e215fb4 100644 --- a/electron-ui/main.js +++ b/electron-ui/main.js @@ -4,6 +4,7 @@ const fs = require('fs'); const { spawn } = require('child_process'); let mainWindow; +let activeAgentRun = null; // --------------------------------------------------------------------------- // Python backend configuration (works in both dev and packaged modes) @@ -103,6 +104,16 @@ app.on('window-all-closed', () => { } }); +app.on('before-quit', () => { + if (activeAgentRun && activeAgentRun.process && !activeAgentRun.process.killed) { + try { + activeAgentRun.process.kill('SIGTERM'); + } catch (err) { + // Ignore termination errors during shutdown. + } + } +}); + app.on('activate', () => { if (BrowserWindow.getAllWindows().length === 0) { createWindow(); @@ -185,6 +196,132 @@ function runPythonScript(scriptName, args = [], options = {}) { }); } +function normalizeAgentConfig(config = {}) { + const thresholds = (config && typeof config.thresholds === 'object' && config.thresholds) || {}; + const scope = (config && typeof config.scope === 'object' && config.scope) || {}; + return { + pollIntervalMs: Math.max(5000, Number(config.pollIntervalMs || 15000)), + historyWindowMinutes: Math.max(10, Number(config.historyWindowMinutes || 360)), + minHistoryPoints: Math.max(10, Number(config.minHistoryPoints || 30)), + maxMonitoredTags: Math.max(10, Number(config.maxMonitoredTags || 200)), + maxCandidatesPerCycle: Math.max(1, Number(config.maxCandidatesPerCycle || 25)), + maxLlmTriagesPerCycle: Math.max(0, Number(config.maxLlmTriagesPerCycle || 5)), + dedupCooldownMinutes: Math.max(1, Number(config.dedupCooldownMinutes || 10)), + retentionDays: Math.max(1, Number(config.retentionDays || 14)), + cleanupEveryCycles: Math.max(1, Number(config.cleanupEveryCycles || 40)), + thresholds: { + z: Number(thresholds.z ?? 3.0), + mad: Number(thresholds.mad ?? 3.5), + rate: Number(thresholds.rate ?? 0.0), + stalenessSec: Number(thresholds.stalenessSec ?? 120), + flatline_std_epsilon: Number(thresholds.flatline_std_epsilon ?? 1e-6), + stuck_window_size: Number(thresholds.stuck_window_size ?? 20), + }, + scope: { + project: scope.project || null, + equipmentTags: Array.isArray(scope.equipmentTags) ? scope.equipmentTags : [], + tagRegex: scope.tagRegex || null, + }, + }; +} + +function routeAgentMessage(channel, payload) { + if (mainWindow) { + mainWindow.webContents.send(channel, payload); + } +} + +function parseAgentLine(line) { + const trimmed = (line || '').trim(); + if (!trimmed) return null; + const prefixes = [ + { key: '[AGENT_STATUS]', channel: 'agent-status' }, + { key: '[AGENT_EVENT]', channel: 'agent-event' }, + { key: '[AGENT_ERROR]', channel: 'agent-error' }, + { key: '[AGENT_COMPLETE]', channel: 'agent-complete' }, + ]; + for (const prefix of prefixes) { + if (!trimmed.startsWith(prefix.key)) continue; + const jsonText = trimmed.slice(prefix.key.length).trim(); + try { + const payload = JSON.parse(jsonText); + return { channel: prefix.channel, payload }; + } catch (err) { + return { + channel: 'agent-error', + payload: { + runId: activeAgentRun ? activeAgentRun.runId : null, + code: 'invalid_agent_json', + message: `Failed to parse agent stream line: ${trimmed.slice(0, 200)}`, + recoverable: true, + timestamp: new Date().toISOString(), + }, + }; + } + } + return null; +} + +function handleAgentStdoutChunk(text) { + if (!activeAgentRun) return; + activeAgentRun.stdoutBuffer += text; + const lines = activeAgentRun.stdoutBuffer.split(/\r?\n/); + activeAgentRun.stdoutBuffer = lines.pop() || ''; + for (const line of lines) { + const parsed = parseAgentLine(line); + if (!parsed) continue; + if (parsed.channel === 'agent-status' && parsed.payload) { + activeAgentRun.status = parsed.payload.state || activeAgentRun.status; + activeAgentRun.metrics = { + cycleMs: parsed.payload.cycleMs || 0, + candidates: parsed.payload.candidates || 0, + triaged: parsed.payload.triaged || 0, + emitted: parsed.payload.emitted || 0, + timestamp: parsed.payload.timestamp || new Date().toISOString(), + }; + } + routeAgentMessage(parsed.channel, parsed.payload); + } +} + +async function stopActiveAgent(reason = 'stopped_by_user') { + if (!activeAgentRun || !activeAgentRun.process || activeAgentRun.process.killed) { + return { success: false, error: 'No active agent run' }; + } + const runId = activeAgentRun.runId; + activeAgentRun.status = 'stopping'; + + return new Promise((resolve) => { + const proc = activeAgentRun.process; + let settled = false; + const done = (result) => { + if (settled) return; + settled = true; + resolve(result); + }; + + proc.once('close', () => { + done({ success: true, runId, stoppedAt: new Date().toISOString(), reason }); + }); + + try { + proc.kill('SIGTERM'); + } catch (err) { + done({ success: false, error: err.message }); + return; + } + + setTimeout(() => { + if (proc.killed) return; + try { + proc.kill('SIGKILL'); + } catch (err) { + // Ignore forced termination errors. + } + }, 5000); + }); +} + // IPC Handlers // Select file dialog @@ -1304,7 +1441,9 @@ function readDbCredentials() { if (!fs.existsSync(credPath)) return {}; try { return JSON.parse(fs.readFileSync(credPath, 'utf-8')); - } catch { return {}; } + } catch { + return {}; + } } // Get database connections from Neo4j + credential status from db_credentials.json @@ -1314,10 +1453,8 @@ ipcMain.handle('get-db-connections', async () => { const proc = spawnPythonProcess('neo4j_ontology.py', ['db-connections', '--json']); let stdout = ''; - let stderr = ''; proc.stdout.on('data', (data) => { stdout += data.toString(); }); - proc.stderr.on('data', (data) => { stderr += data.toString(); }); proc.on('close', (code) => { if (code !== 0) { @@ -1335,7 +1472,7 @@ ipcMain.handle('get-db-connections', async () => { })); resolve({ success: true, connections: enriched }); - } catch (e) { + } catch { resolve({ success: true, connections: [] }); } }); @@ -1349,7 +1486,7 @@ ipcMain.handle('get-db-connections', async () => { ipcMain.handle('save-db-credentials', async (event, credentials) => { try { const credPath = getDbCredentialsPath(); - let existing = readDbCredentials(); + const existing = readDbCredentials(); for (const [name, cred] of Object.entries(credentials)) { existing[name] = { @@ -1392,4 +1529,173 @@ ipcMain.handle('test-db-connection', async (event, connectionName) => { } catch (error) { return { success: false, error: error.message }; } +}); + +// ============================================ +// Long-running Agent Monitoring IPC Handlers +// ============================================ + +ipcMain.handle('agents:start', async (event, rawConfig = {}) => { + if (activeAgentRun && activeAgentRun.process && !activeAgentRun.process.killed) { + return { success: false, error: `Agent run already active: ${activeAgentRun.runId}`, runId: activeAgentRun.runId }; + } + + const runId = `agent-${Date.now()}`; + const config = normalizeAgentConfig(rawConfig); + + try { + const proc = spawnPythonProcess('anomaly_monitor.py', [ + 'run', + '--run-id', + runId, + '--config-json', + JSON.stringify(config), + ]); + + activeAgentRun = { + runId, + process: proc, + status: 'starting', + startedAt: new Date().toISOString(), + metrics: { + cycleMs: 0, + candidates: 0, + triaged: 0, + emitted: 0, + timestamp: new Date().toISOString(), + }, + stdoutBuffer: '', + config, + }; + + proc.stdout.on('data', (data) => { + handleAgentStdoutChunk(data.toString()); + }); + + proc.stderr.on('data', (data) => { + const text = data.toString().trim(); + if (!text) return; + routeAgentMessage('agent-error', { + runId, + code: 'worker_stderr', + message: text, + recoverable: true, + timestamp: new Date().toISOString(), + }); + }); + + proc.on('close', (code) => { + const hadActive = activeAgentRun && activeAgentRun.runId === runId; + if (hadActive) { + routeAgentMessage('agent-complete', { + runId, + success: code === 0, + reason: code === 0 ? 'completed' : 'worker_exit_error', + stoppedAt: new Date().toISOString(), + }); + activeAgentRun = null; + } + }); + + proc.on('error', (err) => { + routeAgentMessage('agent-error', { + runId, + code: 'worker_spawn_error', + message: err.message, + recoverable: false, + timestamp: new Date().toISOString(), + }); + activeAgentRun = null; + }); + + return { success: true, runId, startedAt: activeAgentRun.startedAt, config }; + } catch (error) { + activeAgentRun = null; + return { success: false, error: error.message, runId }; + } +}); + +ipcMain.handle('agents:status', async (event, runId) => { + if (activeAgentRun && (!runId || runId === activeAgentRun.runId)) { + return { + success: true, + runId: activeAgentRun.runId, + status: activeAgentRun.status, + metrics: activeAgentRun.metrics, + lastHeartbeatAt: activeAgentRun.metrics.timestamp, + startedAt: activeAgentRun.startedAt, + config: activeAgentRun.config, + active: true, + }; + } + + if (!runId) { + return { success: true, active: false, status: 'idle' }; + } + + try { + const output = await runPythonScript('anomaly_monitor.py', ['status', '--run-id', runId]); + const parsed = JSON.parse(output || '{}'); + return parsed; + } catch (error) { + return { success: false, error: error.message }; + } +}); + +ipcMain.handle('agents:stop', async (event, runId = null) => { + if (!activeAgentRun) { + return { success: false, error: 'No active agent run' }; + } + if (runId && runId !== activeAgentRun.runId) { + return { success: false, error: `Requested run ${runId} does not match active run ${activeAgentRun.runId}` }; + } + return stopActiveAgent('stopped_by_user'); +}); + +ipcMain.handle('agents:list-events', async (event, filters = {}) => { + const args = ['list-events']; + if (filters.limit) args.push('--limit', String(filters.limit)); + if (filters.state) args.push('--state', String(filters.state)); + if (filters.severity) args.push('--severity', String(filters.severity)); + if (filters.runId) args.push('--run-id', String(filters.runId)); + + try { + const output = await runPythonScript('anomaly_monitor.py', args); + return JSON.parse(output || '{"success":true,"events":[]}'); + } catch (error) { + return { success: false, error: error.message, events: [] }; + } +}); + +ipcMain.handle('agents:get-event', async (event, eventId) => { + try { + const output = await runPythonScript('anomaly_monitor.py', ['get-event', '--event-id', String(eventId)]); + return JSON.parse(output || '{}'); + } catch (error) { + return { success: false, error: error.message }; + } +}); + +ipcMain.handle('agents:ack-event', async (event, eventId, note = '') => { + try { + const args = ['ack-event', '--event-id', String(eventId)]; + if (note) args.push('--note', String(note)); + const output = await runPythonScript('anomaly_monitor.py', args); + return JSON.parse(output || '{}'); + } catch (error) { + return { success: false, error: error.message }; + } +}); + +ipcMain.handle('agents:cleanup', async (event, retentionDays = 14) => { + try { + const output = await runPythonScript('anomaly_monitor.py', [ + 'cleanup', + '--retention-days', + String(retentionDays), + ]); + return JSON.parse(output || '{}'); + } catch (error) { + return { success: false, error: error.message }; + } }); \ No newline at end of file diff --git a/electron-ui/preload.js b/electron-ui/preload.js index d3c8171..1e0930c 100644 --- a/electron-ui/preload.js +++ b/electron-ui/preload.js @@ -70,6 +70,15 @@ contextBridge.exposeInMainWorld('api', { getSettings: () => ipcRenderer.invoke('get-settings'), saveSettings: (settings) => ipcRenderer.invoke('save-settings', settings), testIgnitionConnection: (options) => ipcRenderer.invoke('test-ignition-connection', options), + + // Long-running agents monitoring + agentsStart: (config) => ipcRenderer.invoke('agents:start', config), + agentsStatus: (runId) => ipcRenderer.invoke('agents:status', runId), + agentsStop: (runId) => ipcRenderer.invoke('agents:stop', runId), + agentsListEvents: (filters) => ipcRenderer.invoke('agents:list-events', filters), + agentsGetEvent: (eventId) => ipcRenderer.invoke('agents:get-event', eventId), + agentsAckEvent: (eventId, note) => ipcRenderer.invoke('agents:ack-event', eventId, note), + agentsCleanup: (retentionDays) => ipcRenderer.invoke('agents:cleanup', retentionDays), // Database connections getDbConnections: () => ipcRenderer.invoke('get-db-connections'), @@ -91,6 +100,26 @@ contextBridge.exposeInMainWorld('api', { const handler = (event, data) => callback(data); ipcRenderer.on('stream-complete', handler); return () => ipcRenderer.removeListener('stream-complete', handler); + }, + onAgentStatus: (callback) => { + const handler = (event, data) => callback(data); + ipcRenderer.on('agent-status', handler); + return () => ipcRenderer.removeListener('agent-status', handler); + }, + onAgentEvent: (callback) => { + const handler = (event, data) => callback(data); + ipcRenderer.on('agent-event', handler); + return () => ipcRenderer.removeListener('agent-event', handler); + }, + onAgentError: (callback) => { + const handler = (event, data) => callback(data); + ipcRenderer.on('agent-error', handler); + return () => ipcRenderer.removeListener('agent-error', handler); + }, + onAgentComplete: (callback) => { + const handler = (event, data) => callback(data); + ipcRenderer.on('agent-complete', handler); + return () => ipcRenderer.removeListener('agent-complete', handler); } }); diff --git a/electron-ui/renderer.js b/electron-ui/renderer.js index 53974f5..cab7e8b 100644 --- a/electron-ui/renderer.js +++ b/electron-ui/renderer.js @@ -3536,6 +3536,381 @@ btnSaveDbCreds?.addEventListener('click', async () => { btnSaveDbCreds.disabled = false; } }); +// Agents Tab - Long-running monitoring +// ============================================ + +const agentsState = { + runId: null, + status: 'idle', + events: [], + selectedEventId: null, + listenersReady: false, +}; + +function getAgentsElements() { + return { + btnStart: document.getElementById('btn-agents-start'), + btnStop: document.getElementById('btn-agents-stop'), + btnRefresh: document.getElementById('btn-agents-refresh'), + btnCleanup: document.getElementById('btn-agents-cleanup'), + btnOpenGraph: document.getElementById('btn-agents-open-graph'), + btnAck: document.getElementById('btn-agents-ack'), + statusChip: document.getElementById('agents-status-chip'), + statusText: document.getElementById('agents-status-text'), + list: document.getElementById('agents-event-list'), + detail: document.getElementById('agents-event-detail'), + filterState: document.getElementById('agents-filter-state'), + filterSeverity: document.getElementById('agents-filter-severity'), + filterSearch: document.getElementById('agents-filter-search'), + metricCycle: document.getElementById('agents-metric-cycle'), + metricCandidates: document.getElementById('agents-metric-candidates'), + metricTriaged: document.getElementById('agents-metric-triaged'), + metricEmitted: document.getElementById('agents-metric-emitted'), + metricHeartbeat: document.getElementById('agents-metric-heartbeat'), + cfgPoll: document.getElementById('agents-config-poll-ms'), + cfgHist: document.getElementById('agents-config-history-min'), + cfgPoints: document.getElementById('agents-config-min-points'), + cfgMaxLlm: document.getElementById('agents-config-max-llm'), + cfgZ: document.getElementById('agents-config-threshold-z'), + cfgMad: document.getElementById('agents-config-threshold-mad'), + cfgStale: document.getElementById('agents-config-staleness-sec'), + }; +} + +function getAgentsConfigFromUI() { + const el = getAgentsElements(); + return { + pollIntervalMs: Number(el.cfgPoll?.value || 15000), + historyWindowMinutes: Number(el.cfgHist?.value || 360), + minHistoryPoints: Number(el.cfgPoints?.value || 30), + maxLlmTriagesPerCycle: Number(el.cfgMaxLlm?.value || 5), + thresholds: { + z: Number(el.cfgZ?.value || 3), + mad: Number(el.cfgMad?.value || 3.5), + stalenessSec: Number(el.cfgStale?.value || 120), + }, + }; +} + +function formatAgentTime(ts) { + if (!ts) return 'n/a'; + const d = new Date(ts); + if (Number.isNaN(d.getTime())) return String(ts); + return d.toLocaleString(); +} + +function updateAgentStatusUi(status, text) { + const el = getAgentsElements(); + if (!el.statusChip || !el.statusText) return; + el.statusChip.className = 'status-chip'; + const normalized = (status || 'idle').toLowerCase(); + if (normalized === 'running') el.statusChip.classList.add('running'); + if (normalized === 'failed' || normalized === 'error') el.statusChip.classList.add('error'); + el.statusChip.textContent = normalized; + el.statusText.textContent = text || normalized; + if (el.btnStart) el.btnStart.disabled = normalized === 'running' || normalized === 'starting'; + if (el.btnStop) el.btnStop.disabled = !(normalized === 'running' || normalized === 'starting' || normalized === 'stopping'); +} + +function updateAgentMetrics(metrics = {}, heartbeatTs = null) { + const el = getAgentsElements(); + if (el.metricCycle) el.metricCycle.textContent = String(metrics.cycleMs ?? metrics.lastCycleMs ?? 0); + if (el.metricCandidates) el.metricCandidates.textContent = String(metrics.candidates ?? metrics.lastCandidates ?? 0); + if (el.metricTriaged) el.metricTriaged.textContent = String(metrics.triaged ?? metrics.lastTriaged ?? 0); + if (el.metricEmitted) el.metricEmitted.textContent = String(metrics.emitted ?? metrics.lastEmitted ?? 0); + if (el.metricHeartbeat) el.metricHeartbeat.textContent = formatAgentTime(heartbeatTs || metrics.timestamp); +} + +function getFilteredAgentEvents() { + const el = getAgentsElements(); + const state = (el.filterState?.value || '').toLowerCase(); + const severity = (el.filterSeverity?.value || '').toLowerCase(); + const search = (el.filterSearch?.value || '').trim().toLowerCase(); + return agentsState.events.filter((event) => { + if (state && String(event.state || '').toLowerCase() !== state) return false; + if (severity && String(event.severity || '').toLowerCase() !== severity) return false; + if (search) { + const haystack = [ + event.summary, + event.source_tag, + event.tag_name, + ...(event.equipment || []), + ...(event.tags || []), + ] + .filter(Boolean) + .join(' ') + .toLowerCase(); + if (!haystack.includes(search)) return false; + } + return true; + }); +} + +function renderAgentEventList() { + const el = getAgentsElements(); + if (!el.list) return; + const events = getFilteredAgentEvents(); + if (!events.length) { + el.list.innerHTML = '
No anomaly events match the current filters.
'; + return; + } + el.list.innerHTML = events + .map((event) => { + const active = event.event_id === agentsState.selectedEventId ? ' active' : ''; + const sev = String(event.severity || 'low').toLowerCase(); + const equipment = (event.equipment || []).slice(0, 2).join(', '); + return ` +
+
+ ${escapeHtml(sev)} + ${escapeHtml(formatAgentTime(event.created_at))} +
+
${escapeHtml(event.summary || 'Untitled anomaly')}
+
${escapeHtml(event.tag_name || event.source_tag || '')}${equipment ? ` • ${escapeHtml(equipment)}` : ''}
+
+ `; + }) + .join(''); + + el.list.querySelectorAll('.agents-event-card').forEach((card) => { + card.addEventListener('click', () => { + const eventId = card.getAttribute('data-event-id'); + if (!eventId) return; + selectAgentEvent(eventId); + }); + }); +} + +function resolveAgentGraphTarget(event) { + const equipment = (event.equipment || []).find(Boolean); + if (equipment) return { name: equipment, type: 'Equipment' }; + const tagName = event.tag_name || (event.tags || []).find(Boolean) || event.source_tag; + if (tagName) return { name: tagName, type: 'ScadaTag' }; + return null; +} + +function renderAgentEventDetails(event) { + const el = getAgentsElements(); + if (!el.detail) return; + if (!event) { + el.detail.innerHTML = '

Select an anomaly event from the feed.

'; + if (el.btnOpenGraph) el.btnOpenGraph.disabled = true; + if (el.btnAck) el.btnAck.disabled = true; + return; + } + + let checks = []; + let causes = []; + let safety = []; + try { checks = JSON.parse(event.recommended_checks_json || '[]'); } catch (e) {} + try { causes = JSON.parse(event.probable_causes_json || '[]'); } catch (e) {} + try { safety = JSON.parse(event.safety_notes_json || '[]'); } catch (e) {} + + el.detail.innerHTML = ` +
+
Event ID${escapeHtml(event.event_id || '')}
+
State${escapeHtml(event.state || '')}
+
Severity${escapeHtml(event.severity || '')}
+
Confidence${escapeHtml(String(event.confidence ?? ''))}
+
Category${escapeHtml(event.category || '')}
+
Timestamp${escapeHtml(formatAgentTime(event.created_at))}
+
Source Tag${escapeHtml(event.source_tag || '')}
+
Tag Name${escapeHtml(event.tag_name || '')}
+
z-score${escapeHtml(String(event.z_score ?? '0'))}
+
MAD score${escapeHtml(String(event.mad_score ?? '0'))}
+
+
+
Summary
+
${escapeHtml(event.summary || '')}
+
+
+
Explanation
+
${escapeHtml(event.explanation || '')}
+
+
+
Probable Causes
+
    ${(causes || []).map((x) => `
  • ${escapeHtml(String(x))}
  • `).join('') || '
  • n/a
  • '}
+
+
+
Verification Checks
+
    ${(checks || []).map((x) => `
  • ${escapeHtml(String(x))}
  • `).join('') || '
  • n/a
  • '}
+
+
+
Safety Notes
+
    ${(safety || []).map((x) => `
  • ${escapeHtml(String(x))}
  • `).join('') || '
  • n/a
  • '}
+
+ `; + + if (el.btnOpenGraph) el.btnOpenGraph.disabled = !resolveAgentGraphTarget(event); + if (el.btnAck) el.btnAck.disabled = event.state === 'acknowledged'; +} + +async function selectAgentEvent(eventId) { + agentsState.selectedEventId = eventId; + const existing = agentsState.events.find((e) => e.event_id === eventId); + if (existing && existing.explanation && existing.recommended_checks_json) { + renderAgentEventList(); + renderAgentEventDetails(existing); + return; + } + const detailResult = await window.api.agentsGetEvent(eventId); + if (detailResult.success && detailResult.event) { + const idx = agentsState.events.findIndex((e) => e.event_id === eventId); + if (idx >= 0) { + agentsState.events[idx] = { ...agentsState.events[idx], ...detailResult.event }; + } else { + agentsState.events.unshift(detailResult.event); + } + renderAgentEventList(); + renderAgentEventDetails(detailResult.event); + } +} + +async function loadAgentEvents() { + const el = getAgentsElements(); + const result = await window.api.agentsListEvents({ + limit: 200, + state: el.filterState?.value || undefined, + severity: el.filterSeverity?.value || undefined, + runId: agentsState.runId || undefined, + }); + if (!result.success) return; + agentsState.events = Array.isArray(result.events) ? result.events : []; + renderAgentEventList(); + + if (agentsState.selectedEventId) { + const selected = agentsState.events.find((e) => e.event_id === agentsState.selectedEventId); + renderAgentEventDetails(selected || null); + } +} + +async function refreshAgentStatus() { + const status = await window.api.agentsStatus(agentsState.runId || undefined); + if (!status.success) { + updateAgentStatusUi('error', status.error || 'Failed to fetch status'); + return; + } + if (status.active) { + agentsState.runId = status.runId || agentsState.runId; + agentsState.status = status.status || 'running'; + updateAgentStatusUi(agentsState.status, `Run ${agentsState.runId}`); + updateAgentMetrics(status.metrics || {}, status.lastHeartbeatAt); + } else { + agentsState.status = 'idle'; + updateAgentStatusUi('idle', 'No active run'); + } +} + +async function startAgentsMonitoring() { + const config = getAgentsConfigFromUI(); + const result = await window.api.agentsStart(config); + if (!result.success) { + updateAgentStatusUi('error', result.error || 'Failed to start monitoring'); + return; + } + agentsState.runId = result.runId; + agentsState.status = 'running'; + updateAgentStatusUi('running', `Run ${result.runId}`); + await loadAgentEvents(); +} + +async function stopAgentsMonitoring() { + const result = await window.api.agentsStop(agentsState.runId || undefined); + if (!result.success) { + updateAgentStatusUi('error', result.error || 'Failed to stop monitoring'); + return; + } + agentsState.status = 'stopped'; + updateAgentStatusUi('stopped', 'Monitoring stopped'); +} + +async function acknowledgeSelectedAgentEvent() { + if (!agentsState.selectedEventId) return; + const result = await window.api.agentsAckEvent(agentsState.selectedEventId, ''); + if (!result.success) return; + await loadAgentEvents(); + const selected = agentsState.events.find((e) => e.event_id === agentsState.selectedEventId); + renderAgentEventDetails(selected || null); +} + +function upsertRealtimeAgentEvent(payload) { + if (!payload || !payload.eventId) return; + const idx = agentsState.events.findIndex((e) => e.event_id === payload.eventId); + const next = { + event_id: payload.eventId, + severity: payload.severity || 'medium', + summary: payload.summary || 'Anomaly detected', + category: payload.category || 'deviation', + created_at: payload.createdAt || new Date().toISOString(), + source_tag: payload.entityRefs?.sourceTag || payload.entityRefs?.tag || '', + tag_name: payload.entityRefs?.tag || '', + state: 'open', + }; + if (idx >= 0) { + agentsState.events[idx] = { ...agentsState.events[idx], ...next }; + } else { + agentsState.events.unshift(next); + } + renderAgentEventList(); +} + +function ensureAgentListeners() { + if (agentsState.listenersReady) return; + agentsState.listenersReady = true; + + window.api.onAgentStatus((payload) => { + if (!payload) return; + if (payload.runId) agentsState.runId = payload.runId; + agentsState.status = payload.state || agentsState.status; + updateAgentStatusUi(agentsState.status, `Run ${agentsState.runId || 'n/a'}`); + updateAgentMetrics(payload, payload.timestamp); + }); + + window.api.onAgentEvent((payload) => { + upsertRealtimeAgentEvent(payload); + }); + + window.api.onAgentError((payload) => { + if (!payload) return; + updateAgentStatusUi('error', payload.message || 'Agent runtime error'); + }); + + window.api.onAgentComplete((payload) => { + if (!payload) return; + agentsState.status = payload.success ? 'stopped' : 'failed'; + updateAgentStatusUi(agentsState.status, payload.reason || 'Run complete'); + refreshAgentStatus(); + }); +} + +function initAgentsTab() { + ensureAgentListeners(); + const el = getAgentsElements(); + if (!el.btnStart) return; + if (!el.btnStart.dataset.bound) { + el.btnStart.dataset.bound = '1'; + el.btnStart.addEventListener('click', startAgentsMonitoring); + el.btnStop?.addEventListener('click', stopAgentsMonitoring); + el.btnRefresh?.addEventListener('click', loadAgentEvents); + el.btnCleanup?.addEventListener('click', async () => { + await window.api.agentsCleanup(14); + await loadAgentEvents(); + }); + el.btnAck?.addEventListener('click', acknowledgeSelectedAgentEvent); + el.btnOpenGraph?.addEventListener('click', () => { + const event = agentsState.events.find((e) => e.event_id === agentsState.selectedEventId); + if (!event) return; + const target = resolveAgentGraphTarget(event); + if (!target) return; + openGraphModal(target.name, target.type, event.summary || target.name); + }); + el.filterState?.addEventListener('change', loadAgentEvents); + el.filterSeverity?.addEventListener('change', loadAgentEvents); + el.filterSearch?.addEventListener('input', renderAgentEventList); + } + refreshAgentStatus(); + loadAgentEvents(); +} // Initialize graph tab when it's first shown navButtons.forEach(btn => { @@ -3558,6 +3933,9 @@ navButtons.forEach(btn => { loadSettings(); loadDbConnections(); } + if (btn.dataset.tab === 'agents') { + setTimeout(initAgentsTab, 100); + } }); }); @@ -3569,5 +3947,6 @@ setTimeout(() => { loadTiaProjects(); loadSettings(); loadDbConnections(); + ensureAgentListeners(); }, 500); diff --git a/electron-ui/styles.css b/electron-ui/styles.css index 5ba9186..f1e066e 100644 --- a/electron-ui/styles.css +++ b/electron-ui/styles.css @@ -2979,3 +2979,278 @@ select.input, .connection-status .status-dot { flex-shrink: 0; } + +/* ============================================ + AGENTS TAB + ============================================ */ + +.agents-topbar { + display: flex; + justify-content: space-between; + align-items: center; + gap: var(--space-4); + margin-bottom: var(--space-3); + flex-wrap: wrap; +} + +.agents-run-controls { + display: flex; + gap: var(--space-2); + flex-wrap: wrap; +} + +.agents-run-status { + display: flex; + align-items: center; + gap: var(--space-2); + color: var(--color-text-secondary); + font-size: var(--text-sm); +} + +.status-chip { + display: inline-flex; + align-items: center; + justify-content: center; + padding: 2px 8px; + border-radius: 999px; + border: 1px solid var(--color-border); + background: var(--color-bg-panel-2); + color: var(--color-text-secondary); + font-size: var(--text-xs); + font-weight: 600; + text-transform: uppercase; + letter-spacing: 0.4px; +} + +.status-chip.running { + color: var(--color-success); + border-color: rgba(34, 197, 94, 0.35); + background: rgba(34, 197, 94, 0.12); +} + +.status-chip.error { + color: var(--color-danger); + border-color: rgba(239, 68, 68, 0.35); + background: rgba(239, 68, 68, 0.12); +} + +.agents-config-row { + display: grid; + grid-template-columns: repeat(14, minmax(0, 1fr)); + gap: var(--space-2); + margin-bottom: var(--space-4); + align-items: center; +} + +.agents-config-row label { + font-size: var(--text-xs); + color: var(--color-text-secondary); + text-transform: uppercase; + letter-spacing: 0.35px; +} + +.agents-config-row .input { + min-width: 0; +} + +.agents-metrics-row { + display: grid; + grid-template-columns: repeat(5, minmax(0, 1fr)); + gap: var(--space-2); + margin-bottom: var(--space-4); +} + +.metric-card { + border: 1px solid var(--color-border); + background: var(--color-bg-panel); + border-radius: var(--radius-md); + padding: var(--space-2) var(--space-3); + display: flex; + flex-direction: column; + gap: 2px; +} + +.metric-label { + font-size: var(--text-xs); + color: var(--color-text-muted); +} + +.metric-value { + font-family: var(--font-mono); + font-size: var(--text-sm); + color: var(--color-text); +} + +.agents-main { + display: grid; + grid-template-columns: minmax(300px, 38%) minmax(0, 1fr); + gap: var(--space-3); + min-height: 480px; +} + +.agents-feed-panel, +.agents-detail-panel { + border: 1px solid var(--color-border); + background: var(--color-bg-panel); + border-radius: var(--radius-lg); + overflow: hidden; + display: flex; + flex-direction: column; +} + +.agents-feed-header, +.agents-detail-header { + padding: var(--space-3); + border-bottom: 1px solid var(--color-border-subtle); + display: flex; + justify-content: space-between; + align-items: center; + gap: var(--space-2); +} + +.agents-feed-header h3, +.agents-detail-header h3 { + font-size: var(--text-md); + font-weight: 600; +} + +.agents-feed-filters { + display: flex; + gap: var(--space-2); + flex-wrap: wrap; +} + +.agents-feed-filters .input { + min-width: 120px; +} + +.agents-event-list { + overflow-y: auto; + padding: var(--space-2); + display: flex; + flex-direction: column; + gap: var(--space-2); + flex: 1; +} + +.agents-empty { + color: var(--color-text-muted); + font-size: var(--text-sm); + padding: var(--space-4); + text-align: center; +} + +.agents-event-card { + border: 1px solid var(--color-border); + background: var(--color-bg-panel-2); + border-radius: var(--radius-md); + padding: var(--space-2) var(--space-3); + cursor: pointer; + transition: border-color var(--transition-fast), transform var(--transition-fast); +} + +.agents-event-card:hover { + border-color: var(--color-border-active); + transform: translateY(-1px); +} + +.agents-event-card.active { + border-color: var(--color-accent); + box-shadow: 0 0 0 1px rgba(34, 211, 238, 0.35) inset; +} + +.agents-event-line-top { + display: flex; + justify-content: space-between; + align-items: center; + margin-bottom: 4px; + gap: var(--space-2); +} + +.agents-severity { + font-size: var(--text-xs); + text-transform: uppercase; + letter-spacing: 0.4px; + padding: 2px 6px; + border-radius: 999px; + border: 1px solid transparent; +} + +.agents-severity.sev-critical { + color: #fecaca; + background: rgba(239, 68, 68, 0.2); + border-color: rgba(239, 68, 68, 0.4); +} + +.agents-severity.sev-high { + color: #fdba74; + background: rgba(249, 115, 22, 0.18); + border-color: rgba(249, 115, 22, 0.35); +} + +.agents-severity.sev-medium { + color: #fde68a; + background: rgba(245, 158, 11, 0.15); + border-color: rgba(245, 158, 11, 0.35); +} + +.agents-severity.sev-low { + color: #bfdbfe; + background: rgba(59, 130, 246, 0.15); + border-color: rgba(59, 130, 246, 0.35); +} + +.agents-event-time { + font-size: var(--text-xs); + color: var(--color-text-muted); + font-family: var(--font-mono); +} + +.agents-event-summary { + font-size: var(--text-sm); + color: var(--color-text); + margin-bottom: 4px; +} + +.agents-event-meta { + font-size: var(--text-xs); + color: var(--color-text-muted); +} + +.agents-detail-content { + padding: var(--space-3); + overflow-y: auto; + font-size: var(--text-sm); + display: flex; + flex-direction: column; + gap: var(--space-3); +} + +.agents-detail-grid { + display: grid; + grid-template-columns: 1fr 1fr; + gap: var(--space-2) var(--space-3); +} + +.agents-detail-item { + display: flex; + flex-direction: column; + gap: 2px; +} + +.agents-detail-label { + font-size: var(--text-xs); + color: var(--color-text-muted); + text-transform: uppercase; + letter-spacing: 0.3px; +} + +.agents-detail-value { + font-family: var(--font-mono); + color: var(--color-text); +} + +.agents-list { + margin-left: var(--space-4); + color: var(--color-text-secondary); +} diff --git a/scripts/anomaly_monitor.py b/scripts/anomaly_monitor.py new file mode 100644 index 0000000..70a0f4b --- /dev/null +++ b/scripts/anomaly_monitor.py @@ -0,0 +1,912 @@ +#!/usr/bin/env python3 +""" +Long-running anomaly monitor worker. + +Modes: + - run: start continuous monitoring loop + - status: get run status + - list-events: list persisted anomaly events + - get-event: fetch one anomaly event + - ack-event: mark event as acknowledged + - cleanup: delete old events by retention policy + - replay-fixtures: run deterministic fixture validation +""" + +from __future__ import annotations + +import argparse +import json +import os +import signal +import sys +import time +import uuid +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Any, Dict, List, Optional + +try: + from dotenv import load_dotenv +except ImportError: # pragma: no cover - optional fallback for minimal environments + def load_dotenv(*_args, **_kwargs): + return False + +from anomaly_rules import ( + compute_deviation_scores, + dedup_key, + is_quality_good, + is_stale, + parse_timestamp, + safe_float, +) + + +load_dotenv() + + +def utc_now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +def emit(prefix: str, payload: Dict[str, Any]) -> None: + """Emit machine-parseable messages for Electron main process.""" + print(f"[{prefix}] {json.dumps(payload, default=str)}", flush=True) + + +def merge_defaults(config: Optional[Dict[str, Any]]) -> Dict[str, Any]: + raw = dict(config or {}) + thresholds = raw.get("thresholds", {}) if isinstance(raw.get("thresholds"), dict) else {} + defaults = { + "pollIntervalMs": 15000, + "historyWindowMinutes": 360, + "minHistoryPoints": 30, + "maxMonitoredTags": 200, + "maxCandidatesPerCycle": 25, + "maxLlmTriagesPerCycle": 5, + "dedupCooldownMinutes": 10, + "retentionDays": 14, + "cleanupEveryCycles": 40, + "runMode": "live", + "scope": { + "project": None, + "equipmentTags": [], + "tagRegex": None, + }, + "thresholds": { + "z": 3.0, + "mad": 3.5, + "rate": 0.0, + "stalenessSec": 120, + "flatline_std_epsilon": 1e-6, + "stuck_window_size": 20, + }, + } + cfg = defaults + cfg.update({k: v for k, v in raw.items() if k in defaults and k != "thresholds"}) + cfg["thresholds"].update({k: v for k, v in thresholds.items() if v is not None}) + if isinstance(raw.get("scope"), dict): + cfg["scope"].update(raw["scope"]) + return cfg + + +class AnomalyMonitor: + def __init__(self, config: Dict[str, Any], run_id: Optional[str] = None): + self.config = merge_defaults(config) + self.run_id = run_id or f"agent-run-{uuid.uuid4()}" + from ignition_api_client import IgnitionApiClient + from neo4j_ontology import get_ontology_graph + + self.graph = get_ontology_graph() + + self.api = IgnitionApiClient( + base_url=self.config.get("ignitionApiUrl") or os.getenv("IGNITION_API_URL"), + api_token=self.config.get("ignitionApiToken") or os.getenv("IGNITION_API_TOKEN"), + timeout=15.0, + ) + + self.llm = None + self._llm_enabled = bool(os.getenv("ANTHROPIC_API_KEY")) + if self._llm_enabled: + try: + from claude_client import ClaudeClient + + self.llm = ClaudeClient( + enable_tools=False, + ignition_api_url=self.config.get("ignitionApiUrl"), + ignition_api_token=self.config.get("ignitionApiToken"), + ) + except Exception as exc: + self._llm_enabled = False + emit("AGENT_ERROR", { + "runId": self.run_id, + "code": "llm_init_failed", + "message": str(exc), + "recoverable": True, + "timestamp": utc_now_iso(), + }) + + self._running = True + self._cycle_count = 0 + self._prev_values: Dict[str, float] = {} + + # ----------------------------- + # Schema / run lifecycle + # ----------------------------- + def init_schema(self) -> None: + self.graph.init_agent_monitoring_schema() + + def upsert_run(self, status: str, reason: Optional[str] = None) -> None: + with self.graph.session() as session: + session.run( + """ + MERGE (r:AgentRun {run_id: $run_id}) + SET r.status = $status, + r.updated_at = datetime(), + r.last_heartbeat_at = datetime(), + r.config_json = $config_json, + r.cycle_count = $cycle_count, + r.started_at = coalesce(r.started_at, datetime()), + r.stopped_at = CASE WHEN $status IN ['stopped', 'failed'] THEN datetime() ELSE r.stopped_at END, + r.stop_reason = CASE WHEN $reason IS NULL THEN r.stop_reason ELSE $reason END + """, + run_id=self.run_id, + status=status, + config_json=json.dumps(self.config, default=str), + cycle_count=self._cycle_count, + reason=reason, + ) + + def heartbeat(self, metrics: Dict[str, Any]) -> None: + with self.graph.session() as session: + session.run( + """ + MATCH (r:AgentRun {run_id: $run_id}) + SET r.last_heartbeat_at = datetime(), + r.cycle_count = $cycle_count, + r.last_cycle_ms = $cycle_ms, + r.last_candidates = $candidates, + r.last_triaged = $triaged, + r.last_emitted = $emitted + """, + run_id=self.run_id, + cycle_count=self._cycle_count, + cycle_ms=metrics.get("cycleMs", 0), + candidates=metrics.get("candidates", 0), + triaged=metrics.get("triaged", 0), + emitted=metrics.get("emitted", 0), + ) + + # ----------------------------- + # Tag and context collection + # ----------------------------- + def get_monitored_tags(self) -> List[Dict[str, str]]: + max_tags = int(self.config.get("maxMonitoredTags", 200)) + scope = self.config.get("scope", {}) + tag_regex = scope.get("tagRegex") + equipment_tags = set(scope.get("equipmentTags") or []) + + with self.graph.session() as session: + result = session.run( + """ + MATCH (t:ScadaTag) + WHERE coalesce(t.opc_item_path, t.name) IS NOT NULL + AND coalesce(t.opc_item_path, t.name) <> '' + RETURN DISTINCT coalesce(t.opc_item_path, t.name) AS tag_path, + coalesce(t.name, t.opc_item_path) AS tag_name + LIMIT $limit + """, + limit=max_tags * 3, + ) + tags = [{"path": r["tag_path"], "name": r["tag_name"]} for r in result if r["tag_path"]] + + if tag_regex: + import re + try: + pattern = re.compile(tag_regex, re.IGNORECASE) + tags = [t for t in tags if pattern.search(t["path"]) or pattern.search(t["name"])] + except re.error: + emit("AGENT_ERROR", { + "runId": self.run_id, + "code": "invalid_tag_regex", + "message": f"Invalid regex: {tag_regex}", + "recoverable": True, + "timestamp": utc_now_iso(), + }) + + if equipment_tags: + tags = [t for t in tags if t["name"] in equipment_tags or t["path"] in equipment_tags] + + return tags[:max_tags] + + def _extract_history_values(self, history_data: Any, tag_path: str) -> List[float]: + """Normalize multiple gateway response shapes to numeric values list.""" + values: List[float] = [] + if history_data is None: + return values + if isinstance(history_data, dict) and history_data.get("error"): + return values + + rows: List[Any] = [] + if isinstance(history_data, list): + rows = history_data + elif isinstance(history_data, dict): + for key in ("rows", "data", "results", "values", "history"): + chunk = history_data.get(key) + if isinstance(chunk, list): + rows = chunk + break + if not rows and "tagHistory" in history_data and isinstance(history_data["tagHistory"], list): + rows = history_data["tagHistory"] + + for row in rows: + if isinstance(row, (int, float, str)): + val = safe_float(row) + if val is not None: + values.append(val) + continue + if not isinstance(row, dict): + continue + candidate = None + if "value" in row: + candidate = row.get("value") + elif tag_path in row: + candidate = row.get(tag_path) + else: + # Wide format often has timestamp + one tag column. + for k, v in row.items(): + if k.lower() in {"timestamp", "ts", "t", "time"}: + continue + candidate = v + break + val = safe_float(candidate) + if val is not None: + values.append(val) + return values + + def fetch_history_values(self, tag_path: str) -> List[float]: + minutes = int(self.config.get("historyWindowMinutes", 360)) + end_dt = datetime.now(timezone.utc) + start_dt = end_dt - timedelta(minutes=minutes) + data = self.api.query_tag_history( + [tag_path], + start_dt.isoformat(), + end_dt.isoformat(), + return_size=max(100, int(self.config.get("minHistoryPoints", 30)) * 4), + aggregation_mode="Average", + return_format="Wide", + ) + return self._extract_history_values(data, tag_path) + + def get_context(self, tag_path: str) -> Dict[str, Any]: + with self.graph.session() as session: + result = session.run( + """ + MATCH (t:ScadaTag) + WHERE t.name = $tag OR t.opc_item_path = $tag + OPTIONAL MATCH (eq:Equipment)-[*1..2]-(t) + OPTIONAL MATCH (eq)-[:HAS_SYMPTOM]->(s:FaultSymptom) + OPTIONAL MATCH (s)-[:CAUSED_BY]->(c:FaultCause) + OPTIONAL MATCH (eq)-[:HAS_PATTERN]->(p:ControlPattern) + OPTIONAL MATCH (eq)-[:SAFETY_CRITICAL]->(se:SafetyElement) + RETURN t, + collect(DISTINCT eq.name) AS equipment, + collect(DISTINCT s.symptom) AS symptoms, + collect(DISTINCT c.cause) AS causes, + collect(DISTINCT p.pattern_name) AS patterns, + collect(DISTINCT se.name) AS safety + LIMIT 1 + """, + tag=tag_path, + ) + record = result.single() + if not record: + return { + "tag_path": tag_path, + "equipment": [], + "symptoms": [], + "causes": [], + "patterns": [], + "safety": [], + } + node = record["t"] + return { + "tag_path": tag_path, + "tag_name": node.get("name") if node else tag_path, + "equipment": [x for x in record["equipment"] if x], + "symptoms": [x for x in record["symptoms"] if x], + "causes": [x for x in record["causes"] if x], + "patterns": [x for x in record["patterns"] if x], + "safety": [x for x in record["safety"] if x], + } + + # ----------------------------- + # Triage and persistence + # ----------------------------- + def run_llm_triage( + self, + context: Dict[str, Any], + deterministic: Dict[str, Any], + live_sample: Dict[str, Any], + ) -> Dict[str, Any]: + fallback = { + "summary": f"Deterministic anomaly on {context.get('tag_name', context['tag_path'])}", + "category": deterministic.get("category", "deviation"), + "severity": "medium", + "confidence": 0.55, + "probable_causes": ["Signal deviates from historical baseline."], + "verification_checks": [ + f"Check live quality/timestamp for {context.get('tag_path')}", + "Inspect upstream interlocks and communication health.", + ], + "safety_notes": context.get("safety", []), + "rationale": "LLM triage unavailable; using deterministic fallback.", + "related_entities": [ + {"label": "Equipment", "name": e} for e in context.get("equipment", [])[:3] + ], + } + if not self.llm: + return fallback + + system_prompt = ( + "You are an industrial anomaly triage assistant. " + "Return ONLY valid JSON with keys: summary, category, severity, confidence, " + "probable_causes, verification_checks, safety_notes, rationale, related_entities. " + "Severity must be one of critical/high/medium/low. " + "Category must be one of spike/drift/stuck/state-conflict/quality-issue/deviation. " + "related_entities is a list of objects: {label,name}." + ) + user_prompt = json.dumps( + { + "context": context, + "deterministic": deterministic, + "live_sample": live_sample, + }, + default=str, + ) + try: + result = self.llm.query_json( + system_prompt=system_prompt, + user_prompt=user_prompt, + max_tokens=900, + use_tools=False, + ) + data = result.get("data") + if not isinstance(data, dict): + return fallback + merged = dict(fallback) + merged.update({k: v for k, v in data.items() if v is not None}) + return merged + except Exception as exc: + emit("AGENT_ERROR", { + "runId": self.run_id, + "code": "llm_triage_failed", + "message": str(exc), + "recoverable": True, + "timestamp": utc_now_iso(), + }) + return fallback + + def _severity_from_scores(self, deterministic: Dict[str, Any], llm_out: Dict[str, Any]) -> str: + sev = str(llm_out.get("severity", "")).lower() + if sev in {"critical", "high", "medium", "low"}: + return sev + z = abs(float(deterministic.get("z_score", 0.0))) + if z >= 8: + return "critical" + if z >= 5: + return "high" + if z >= 3: + return "medium" + return "low" + + def is_duplicate_recent(self, dedup_sig: str) -> bool: + cooldown = max(1, int(self.config.get("dedupCooldownMinutes", 10))) + with self.graph.session() as session: + result = session.run( + """ + MATCH (e:AnomalyEvent {dedup_key: $dedup_key}) + WHERE e.created_at IS NOT NULL + AND datetime(e.created_at) > datetime() - duration({minutes: $minutes}) + RETURN count(e) AS cnt + """, + dedup_key=dedup_sig, + minutes=cooldown, + ) + row = result.single() + return bool(row and row["cnt"] > 0) + + def persist_event( + self, + context: Dict[str, Any], + deterministic: Dict[str, Any], + live_sample: Dict[str, Any], + triage: Dict[str, Any], + ) -> Optional[Dict[str, Any]]: + category = triage.get("category") or deterministic.get("category", "deviation") + dedup_sig = dedup_key(context["tag_path"], category, int(self.config.get("dedupCooldownMinutes", 10))) + if self.is_duplicate_recent(dedup_sig): + return None + + event_id = f"ae-{uuid.uuid4()}" + severity = self._severity_from_scores(deterministic, triage) + confidence = float(max(0.0, min(1.0, triage.get("confidence", 0.5)))) + event_data = { + "event_id": event_id, + "run_id": self.run_id, + "event_schema_version": 1, + "state": "open", + "severity": severity, + "confidence": confidence, + "category": category, + "summary": triage.get("summary", f"Anomaly on {context['tag_path']}"), + "explanation": triage.get("rationale", ""), + "recommended_checks_json": json.dumps(triage.get("verification_checks", []), default=str), + "probable_causes_json": json.dumps(triage.get("probable_causes", []), default=str), + "safety_notes_json": json.dumps(triage.get("safety_notes", []), default=str), + "deterministic_reasons_json": json.dumps(deterministic.get("reasons", []), default=str), + "z_score": float(deterministic.get("z_score", 0.0)), + "mad_score": float(deterministic.get("mad_score", 0.0)), + "delta_rate": float(deterministic.get("delta_rate", 0.0)), + "window_volatility": float(deterministic.get("window_volatility", 0.0)), + "source_tag": context["tag_path"], + "tag_name": context.get("tag_name") or context["tag_path"], + "live_quality": live_sample.get("quality"), + "live_timestamp": live_sample.get("timestamp"), + "live_value": str(live_sample.get("value")), + "dedup_key": dedup_sig, + "created_at": utc_now_iso(), + "updated_at": utc_now_iso(), + } + + with self.graph.session() as session: + session.run( + """ + MATCH (r:AgentRun {run_id: $run_id}) + CREATE (e:AnomalyEvent $props) + MERGE (r)-[:EMITTED]->(e) + """, + run_id=self.run_id, + props=event_data, + ) + + session.run( + """ + MATCH (e:AnomalyEvent {event_id: $event_id}) + MATCH (t:ScadaTag) + WHERE t.name = $tag OR t.opc_item_path = $tag + MERGE (e)-[:OBSERVED_ON]->(t) + """, + event_id=event_id, + tag=context["tag_path"], + ) + + for equipment_name in context.get("equipment", [])[:5]: + session.run( + """ + MATCH (e:AnomalyEvent {event_id: $event_id}) + MATCH (eq:Equipment {name: $name}) + MERGE (e)-[:AFFECTS]->(eq) + """, + event_id=event_id, + name=equipment_name, + ) + + related_inputs: List[Dict[str, str]] = [] + for item in triage.get("related_entities", []) or []: + if isinstance(item, dict) and item.get("label") and item.get("name"): + related_inputs.append({"label": str(item["label"]), "name": str(item["name"])}) + for name in context.get("symptoms", [])[:3]: + related_inputs.append({"label": "FaultSymptom", "name": name}) + for name in context.get("causes", [])[:3]: + related_inputs.append({"label": "FaultCause", "name": name}) + + for rel in related_inputs[:8]: + label = rel["label"] + if label not in {"FaultSymptom", "FaultCause", "ControlPattern", "SafetyElement", "Equipment", "ScadaTag"}: + continue + session.run( + f""" + MATCH (e:AnomalyEvent {{event_id: $event_id}}) + MATCH (n:{label}) + WHERE n.name = $name OR n.symptom = $name OR n.cause = $name + MERGE (e)-[:RELATED_TO]->(n) + """, + event_id=event_id, + name=rel["name"], + ) + + return event_data + + # ----------------------------- + # Monitoring loop + # ----------------------------- + def run_cycle(self) -> Dict[str, Any]: + cycle_start = time.time() + metrics = {"candidates": 0, "triaged": 0, "emitted": 0, "cycleMs": 0} + thresholds = self.config.get("thresholds", {}) + min_history = int(self.config.get("minHistoryPoints", 30)) + + if not self.api.is_configured: + emit("AGENT_ERROR", { + "runId": self.run_id, + "code": "ignition_not_configured", + "message": "Ignition API URL/token not configured.", + "recoverable": True, + "timestamp": utc_now_iso(), + }) + metrics["cycleMs"] = int((time.time() - cycle_start) * 1000) + return metrics + + tags = self.get_monitored_tags() + if not tags: + emit("AGENT_ERROR", { + "runId": self.run_id, + "code": "no_tags_found", + "message": "No ScadaTag nodes with readable tag paths found.", + "recoverable": True, + "timestamp": utc_now_iso(), + }) + metrics["cycleMs"] = int((time.time() - cycle_start) * 1000) + return metrics + + tag_paths = [t["path"] for t in tags] + live_values = self.api.read_tags(tag_paths) + candidates: List[Dict[str, Any]] = [] + now = datetime.now(timezone.utc) + + for tv in live_values: + if tv.error: + continue + if not is_quality_good(tv.quality): + # quality gate: only emit quality anomalies if this persists via triage. + continue + if is_stale(tv.timestamp, int(thresholds.get("stalenessSec", 120)), now=now): + continue + + history = self.fetch_history_values(tv.path) + if len(history) < min_history: + continue + + prev_val = self._prev_values.get(tv.path) + deterministic = compute_deviation_scores( + current_value=tv.value, + history_values=history, + prev_value=prev_val, + thresholds=thresholds, + ) + curr_num = safe_float(tv.value) + if curr_num is not None: + self._prev_values[tv.path] = curr_num + + if deterministic.get("candidate"): + context = self.get_context(tv.path) + candidates.append( + { + "context": context, + "deterministic": deterministic, + "live_sample": { + "path": tv.path, + "value": tv.value, + "quality": tv.quality, + "timestamp": tv.timestamp, + "data_type": tv.data_type, + }, + } + ) + + metrics["candidates"] = len(candidates) + max_candidates = int(self.config.get("maxCandidatesPerCycle", 25)) + max_triage = int(self.config.get("maxLlmTriagesPerCycle", 5)) + shortlisted = candidates[:max_candidates] + + for idx, candidate in enumerate(shortlisted): + use_llm = idx < max_triage + triage = ( + self.run_llm_triage( + candidate["context"], + candidate["deterministic"], + candidate["live_sample"], + ) + if use_llm + else { + "summary": f"Deviation on {candidate['context'].get('tag_name', candidate['context']['tag_path'])}", + "category": candidate["deterministic"].get("category", "deviation"), + "severity": "medium", + "confidence": 0.5, + "verification_checks": [], + "probable_causes": [], + "safety_notes": [], + "rationale": "Triaged in deterministic-only mode due per-cycle LLM cap.", + "related_entities": [], + } + ) + metrics["triaged"] += 1 + persisted = self.persist_event( + candidate["context"], + candidate["deterministic"], + candidate["live_sample"], + triage, + ) + if persisted: + metrics["emitted"] += 1 + emit("AGENT_EVENT", { + "runId": self.run_id, + "eventId": persisted["event_id"], + "severity": persisted["severity"], + "summary": persisted["summary"], + "category": persisted.get("category"), + "entityRefs": { + "tag": persisted.get("tag_name") or persisted.get("source_tag"), + "sourceTag": persisted.get("source_tag"), + }, + "createdAt": persisted.get("created_at"), + }) + + metrics["cycleMs"] = int((time.time() - cycle_start) * 1000) + return metrics + + def cleanup_retention(self) -> int: + retention_days = int(self.config.get("retentionDays", 14)) + return self.graph.cleanup_anomaly_events(retention_days=retention_days) + + def run_forever(self) -> int: + self.init_schema() + self.upsert_run("running") + emit("AGENT_STATUS", { + "runId": self.run_id, + "state": "running", + "cycleMs": 0, + "candidates": 0, + "triaged": 0, + "emitted": 0, + "timestamp": utc_now_iso(), + }) + + poll_ms = int(self.config.get("pollIntervalMs", 15000)) + cleanup_every = max(1, int(self.config.get("cleanupEveryCycles", 40))) + exit_code = 0 + reason = "stopped" + + while self._running: + self._cycle_count += 1 + cycle_started = time.time() + try: + metrics = self.run_cycle() + self.heartbeat(metrics) + emit("AGENT_STATUS", { + "runId": self.run_id, + "state": "running", + "cycleMs": metrics["cycleMs"], + "candidates": metrics["candidates"], + "triaged": metrics["triaged"], + "emitted": metrics["emitted"], + "timestamp": utc_now_iso(), + }) + if self._cycle_count % cleanup_every == 0: + deleted = self.cleanup_retention() + if deleted > 0: + emit("AGENT_STATUS", { + "runId": self.run_id, + "state": "retention_cleanup", + "cycleMs": 0, + "candidates": 0, + "triaged": 0, + "emitted": deleted, + "timestamp": utc_now_iso(), + }) + except Exception as exc: + reason = "failed" + exit_code = 1 + emit("AGENT_ERROR", { + "runId": self.run_id, + "code": "cycle_error", + "message": str(exc), + "recoverable": True, + "timestamp": utc_now_iso(), + }) + + elapsed_ms = int((time.time() - cycle_started) * 1000) + remaining = max(0, poll_ms - elapsed_ms) / 1000.0 + if remaining > 0: + time.sleep(remaining) + + self.upsert_run("stopped" if reason != "failed" else "failed", reason=reason) + emit("AGENT_COMPLETE", { + "runId": self.run_id, + "success": exit_code == 0, + "reason": reason, + "stoppedAt": utc_now_iso(), + }) + return exit_code + + # ----------------------------- + # Single-operation helpers + # ----------------------------- + def list_events(self, limit: int, state: Optional[str], severity: Optional[str], run_id: Optional[str]) -> Dict[str, Any]: + events = self.graph.list_anomaly_events(limit=limit, state=state, severity=severity, run_id=run_id) + return {"success": True, "events": events} + + def get_event(self, event_id: str) -> Dict[str, Any]: + event = self.graph.get_anomaly_event(event_id) + if not event: + return {"success": False, "error": f"Event not found: {event_id}"} + return {"success": True, "event": event} + + def ack_event(self, event_id: str, note: Optional[str]) -> Dict[str, Any]: + with self.graph.session() as session: + result = session.run( + """ + MATCH (e:AnomalyEvent {event_id: $event_id}) + SET e.state = 'acknowledged', + e.acknowledged_at = datetime(), + e.ack_note = $note, + e.updated_at = datetime() + RETURN count(e) AS cnt + """, + event_id=event_id, + note=note or "", + ) + record = result.single() + if not record or record["cnt"] == 0: + return {"success": False, "error": f"Event not found: {event_id}"} + return {"success": True, "eventId": event_id} + + def get_status(self, run_id: str) -> Dict[str, Any]: + with self.graph.session() as session: + result = session.run( + """ + MATCH (r:AgentRun {run_id: $run_id}) + RETURN r + LIMIT 1 + """, + run_id=run_id, + ) + row = result.single() + if not row: + return {"success": False, "error": f"Run not found: {run_id}"} + props = dict(row["r"]) + return { + "success": True, + "status": props.get("status"), + "metrics": { + "cycleCount": props.get("cycle_count", 0), + "lastCycleMs": props.get("last_cycle_ms", 0), + "lastCandidates": props.get("last_candidates", 0), + "lastTriaged": props.get("last_triaged", 0), + "lastEmitted": props.get("last_emitted", 0), + }, + "lastHeartbeatAt": props.get("last_heartbeat_at"), + "run": props, + } + + +def _load_fixture_cases(path: Path) -> List[Dict[str, Any]]: + data = json.loads(path.read_text(encoding="utf-8")) + if isinstance(data, dict): + return data.get("cases", []) + if isinstance(data, list): + return data + return [] + + +def replay_fixtures(config_json: Optional[str], fixture_path: str) -> Dict[str, Any]: + config = merge_defaults(json.loads(config_json) if config_json else {}) + path = Path(fixture_path) + cases = _load_fixture_cases(path) + thresholds = config.get("thresholds", {}) + passed = 0 + failures: List[Dict[str, Any]] = [] + + for case in cases: + result = compute_deviation_scores( + current_value=case.get("current_value"), + history_values=case.get("history_values", []), + prev_value=case.get("prev_value"), + thresholds=thresholds, + ) + expected = bool(case.get("expected_candidate", False)) + if result.get("candidate") == expected: + passed += 1 + else: + failures.append( + { + "id": case.get("id"), + "expected_candidate": expected, + "actual_candidate": result.get("candidate"), + "category": result.get("category"), + "reasons": result.get("reasons", []), + } + ) + + return { + "success": len(failures) == 0, + "total": len(cases), + "passed": passed, + "failed": len(failures), + "failures": failures, + } + + +def main() -> int: + parser = argparse.ArgumentParser(description="Anomaly monitor worker") + sub = parser.add_subparsers(dest="command", required=True) + + p_run = sub.add_parser("run", help="Run continuous anomaly monitoring") + p_run.add_argument("--run-id", help="Optional run id") + p_run.add_argument("--config-json", default="{}", help="JSON config string") + + p_status = sub.add_parser("status", help="Get status for one run") + p_status.add_argument("--run-id", required=True) + + p_list = sub.add_parser("list-events", help="List anomaly events") + p_list.add_argument("--limit", type=int, default=100) + p_list.add_argument("--state") + p_list.add_argument("--severity") + p_list.add_argument("--run-id") + + p_get = sub.add_parser("get-event", help="Get one anomaly event") + p_get.add_argument("--event-id", required=True) + + p_ack = sub.add_parser("ack-event", help="Acknowledge one anomaly event") + p_ack.add_argument("--event-id", required=True) + p_ack.add_argument("--note") + + p_cleanup = sub.add_parser("cleanup", help="Delete old anomaly events") + p_cleanup.add_argument("--retention-days", type=int, default=14) + + p_replay = sub.add_parser("replay-fixtures", help="Validate deterministic scoring against fixtures") + p_replay.add_argument("--fixture-file", required=True) + p_replay.add_argument("--config-json", default="{}") + + args = parser.parse_args() + + if args.command == "replay-fixtures": + result = replay_fixtures(args.config_json, args.fixture_file) + print(json.dumps(result)) + return 0 if result["success"] else 1 + + try: + monitor = AnomalyMonitor( + config=json.loads(getattr(args, "config_json", "{}") or "{}"), + run_id=getattr(args, "run_id", None), + ) + except Exception as exc: + print(json.dumps({"success": False, "error": str(exc)})) + return 1 + + if args.command == "run": + def _signal_handler(_signum, _frame): + monitor._running = False + + signal.signal(signal.SIGTERM, _signal_handler) + if hasattr(signal, "SIGINT"): + signal.signal(signal.SIGINT, _signal_handler) + return monitor.run_forever() + + if args.command == "status": + print(json.dumps(monitor.get_status(args.run_id), default=str)) + return 0 + + if args.command == "list-events": + print(json.dumps(monitor.list_events(args.limit, args.state, args.severity, args.run_id), default=str)) + return 0 + + if args.command == "get-event": + print(json.dumps(monitor.get_event(args.event_id), default=str)) + return 0 + + if args.command == "ack-event": + print(json.dumps(monitor.ack_event(args.event_id, args.note), default=str)) + return 0 + + if args.command == "cleanup": + deleted = monitor.graph.cleanup_anomaly_events(args.retention_days) + print(json.dumps({"success": True, "deleted": deleted})) + return 0 + + return 1 + + +if __name__ == "__main__": + sys.exit(main()) + diff --git a/scripts/anomaly_rules.py b/scripts/anomaly_rules.py new file mode 100644 index 0000000..2aa274d --- /dev/null +++ b/scripts/anomaly_rules.py @@ -0,0 +1,217 @@ +#!/usr/bin/env python3 +""" +Deterministic anomaly scoring primitives for monitoring agents. + +This module intentionally avoids external dependencies so it can run in +packaged/offline environments. +""" + +from __future__ import annotations + +import hashlib +import math +from datetime import datetime, timezone +from statistics import mean, median, pstdev +from typing import Any, Dict, List, Optional + + +def safe_float(value: Any) -> Optional[float]: + """Best-effort conversion to float.""" + if value is None: + return None + if isinstance(value, bool): + return float(value) + if isinstance(value, (int, float)): + if math.isnan(value) or math.isinf(value): + return None + return float(value) + text = str(value).strip() + if not text: + return None + try: + result = float(text) + except ValueError: + return None + if math.isnan(result) or math.isinf(result): + return None + return result + + +def parse_timestamp(ts: Optional[str]) -> Optional[datetime]: + """Parse an ISO-like timestamp to UTC-aware datetime.""" + if not ts: + return None + text = str(ts).strip() + if not text: + return None + if text.endswith("Z"): + text = text[:-1] + "+00:00" + try: + dt = datetime.fromisoformat(text) + except ValueError: + return None + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.astimezone(timezone.utc) + + +def is_quality_good(quality: Optional[str]) -> bool: + """Conservative quality gate.""" + if quality is None: + return False + q = str(quality).strip().lower() + if not q: + return False + if "good" in q or "ok" in q or q in {"192"}: + return True + return False + + +def is_stale(timestamp: Optional[str], staleness_sec: int, now: Optional[datetime] = None) -> bool: + """Return True if sample timestamp is stale or invalid.""" + if staleness_sec <= 0: + return False + parsed = parse_timestamp(timestamp) + if parsed is None: + return True + baseline = now or datetime.now(timezone.utc) + age = (baseline - parsed).total_seconds() + return age > staleness_sec + + +def _mad(values: List[float]) -> float: + """Median absolute deviation.""" + if not values: + return 0.0 + med = median(values) + abs_dev = [abs(v - med) for v in values] + return median(abs_dev) if abs_dev else 0.0 + + +def _percentile_rank(values: List[float], current: float) -> float: + """Approximate percentile rank of current within values.""" + if not values: + return 0.0 + less_or_equal = sum(1 for v in values if v <= current) + return less_or_equal / len(values) + + +def compute_deviation_scores( + current_value: Any, + history_values: List[Any], + prev_value: Any = None, + thresholds: Optional[Dict[str, float]] = None, +) -> Dict[str, Any]: + """ + Compute deterministic anomaly scores and candidate flags. + + Threshold defaults are intentionally conservative and should be configured + per process during rollout. + """ + cfg = { + "z": 3.0, + "mad": 3.5, + "rate": 0.0, + "flatline_std_epsilon": 1e-6, + "stuck_window_size": 20, + } + if thresholds: + cfg.update({k: v for k, v in thresholds.items() if v is not None}) + + current = safe_float(current_value) + hist = [v for v in (safe_float(x) for x in history_values) if v is not None] + previous = safe_float(prev_value) + + result: Dict[str, Any] = { + "candidate": False, + "reasons": [], + "category": "normal", + "z_score": 0.0, + "mad_score": 0.0, + "delta_rate": 0.0, + "window_volatility": 0.0, + "percentile_rank": 0.0, + "drift_score": 0.0, + "history_points": len(hist), + } + + if current is None: + result["category"] = "invalid_value" + result["reasons"].append("current_value_not_numeric") + return result + if not hist: + result["category"] = "insufficient_history" + result["reasons"].append("history_empty") + return result + + mu = mean(hist) + sigma = pstdev(hist) if len(hist) > 1 else 0.0 + sigma = max(sigma, 1e-9) + z_score = (current - mu) / sigma + result["z_score"] = z_score + result["window_volatility"] = sigma + result["percentile_rank"] = _percentile_rank(hist, current) + + mad = _mad(hist) + mad_denom = max(mad * 1.4826, 1e-9) + mad_score = abs(current - median(hist)) / mad_denom + result["mad_score"] = mad_score + + if previous is not None: + result["delta_rate"] = abs(current - previous) + + if abs(z_score) >= float(cfg["z"]): + result["candidate"] = True + result["reasons"].append("z_score_threshold") + if mad_score >= float(cfg["mad"]): + result["candidate"] = True + result["reasons"].append("mad_score_threshold") + if float(cfg["rate"]) > 0 and result["delta_rate"] >= float(cfg["rate"]): + result["candidate"] = True + result["reasons"].append("delta_rate_threshold") + + if len(hist) >= 20: + midpoint = len(hist) // 2 + first_half = hist[:midpoint] + second_half = hist[midpoint:] + trend_delta = abs(mean(second_half) - mean(first_half)) + trend_score = trend_delta / sigma + result["drift_score"] = trend_score + if trend_score >= 1.25 and (result["percentile_rank"] >= 0.85 or result["percentile_rank"] <= 0.15): + result["candidate"] = True + result["reasons"].append("drift_trend") + + recent = hist[-int(max(3, cfg["stuck_window_size"])) :] + recent_std = pstdev(recent) if len(recent) > 1 else 0.0 + if recent_std <= float(cfg["flatline_std_epsilon"]): + if previous is not None and abs(current - previous) <= float(cfg["flatline_std_epsilon"]): + result["candidate"] = True + result["reasons"].append("flatline_detected") + result["category"] = "stuck" + + if result["category"] == "normal" and result["candidate"]: + if "flatline_detected" in result["reasons"]: + result["category"] = "stuck" + elif result["delta_rate"] > 0 and "delta_rate_threshold" in result["reasons"]: + result["category"] = "spike" + elif "drift_trend" in result["reasons"]: + result["category"] = "drift" + elif abs(z_score) > 0 and len(hist) >= 20: + # Drift-like heuristic for sustained tail position with moderate rate + if result["percentile_rank"] >= 0.95 or result["percentile_rank"] <= 0.05: + result["category"] = "drift" + else: + result["category"] = "spike" + else: + result["category"] = "deviation" + + return result + + +def dedup_key(tag_path: str, category: str, bucket_minutes: int = 10) -> str: + """Create a deterministic dedup signature for event cooldown windows.""" + now = datetime.now(timezone.utc) + bucket = int(now.timestamp() // max(1, bucket_minutes * 60)) + raw = f"{tag_path}|{category}|{bucket}" + return hashlib.sha1(raw.encode("utf-8")).hexdigest() + diff --git a/scripts/fixtures/anomaly_replay_cases.json b/scripts/fixtures/anomaly_replay_cases.json new file mode 100644 index 0000000..544cd3f --- /dev/null +++ b/scripts/fixtures/anomaly_replay_cases.json @@ -0,0 +1,32 @@ +{ + "cases": [ + { + "id": "normal-baseline", + "current_value": 50.3, + "prev_value": 50.1, + "history_values": [49.9, 50.1, 50.0, 50.2, 50.1, 49.8, 50.3, 50.0, 49.9, 50.2, 50.1, 50.0, 49.9, 50.2, 50.1, 50.0, 50.2, 49.8, 50.0, 50.1, 50.0, 49.9, 50.1, 50.2, 50.0, 50.1, 49.9, 50.0, 50.1, 50.0], + "expected_candidate": false + }, + { + "id": "sudden-spike", + "current_value": 91.0, + "prev_value": 49.8, + "history_values": [49.9, 50.1, 50.0, 50.2, 50.1, 49.8, 50.3, 50.0, 49.9, 50.2, 50.1, 50.0, 49.9, 50.2, 50.1, 50.0, 50.2, 49.8, 50.0, 50.1, 50.0, 49.9, 50.1, 50.2, 50.0, 50.1, 49.9, 50.0, 50.1, 50.0], + "expected_candidate": true + }, + { + "id": "slow-drift-tail", + "current_value": 61.5, + "prev_value": 61.0, + "history_values": [50.0, 50.2, 50.3, 50.5, 50.7, 50.9, 51.1, 51.4, 51.8, 52.1, 52.6, 53.0, 53.5, 54.0, 54.5, 55.1, 55.6, 56.0, 56.6, 57.0, 57.5, 58.0, 58.4, 58.9, 59.4, 59.9, 60.2, 60.6, 60.9, 61.2], + "expected_candidate": true + }, + { + "id": "flatline-stuck", + "current_value": 72.0, + "prev_value": 72.0, + "history_values": [72.0, 72.0, 72.0, 72.0, 72.0, 72.0, 72.0, 72.0, 72.0, 72.0, 72.0, 72.0, 72.0, 72.0, 72.0, 72.0, 72.0, 72.0, 72.0, 72.0, 72.0, 72.0, 72.0, 72.0, 72.0, 72.0, 72.0, 72.0, 72.0, 72.0], + "expected_candidate": true + } + ] +} diff --git a/scripts/graph_api.py b/scripts/graph_api.py index 8e36e7c..e3bff45 100644 --- a/scripts/graph_api.py +++ b/scripts/graph_api.py @@ -77,6 +77,8 @@ class GraphAPI: "processdeviation": "mes", "functionallocation": "mes", "vendor": "mes", + "agentrun": "anomaly", + "anomalyevent": "anomaly", } # Color palette for node types @@ -91,6 +93,7 @@ class GraphAPI: "flows": "#E91E63", "overview": "#607D8B", "mes": "#00897B", + "anomaly": "#F44336", "other": "#9E9E9E", } @@ -252,9 +255,11 @@ def get_neighbors( WHERE center.name = $node_id OR center.name ENDS WITH $node_id OR center.name CONTAINS $node_id + OR center.event_id = $node_id + OR center.run_id = $node_id RETURN elementId(center) as id, labels(center)[0] as type, - center.name as label, + coalesce(center.name, center.event_id, center.run_id, center.symptom, center.phrase, 'unknown') as label, properties(center) as props LIMIT 1 """ @@ -264,9 +269,11 @@ def get_neighbors( WHERE center.name = $node_id OR center.name ENDS WITH $node_id OR center.name CONTAINS $node_id + OR center.event_id = $node_id + OR center.run_id = $node_id RETURN elementId(center) as id, labels(center)[0] as type, - center.name as label, + coalesce(center.name, center.event_id, center.run_id, center.symptom, center.phrase, 'unknown') as label, properties(center) as props LIMIT 1 """ diff --git a/scripts/ignition_api_client.py b/scripts/ignition_api_client.py index d0d7e41..e8fbccf 100644 --- a/scripts/ignition_api_client.py +++ b/scripts/ignition_api_client.py @@ -22,7 +22,11 @@ from urllib.parse import urljoin, quote import requests -from dotenv import load_dotenv +try: + from dotenv import load_dotenv +except ImportError: # pragma: no cover - optional fallback for minimal envs + def load_dotenv(*_args, **_kwargs): + return False load_dotenv() @@ -243,60 +247,55 @@ def read_tags(self, paths: List[str]) -> List[TagValue]: @staticmethod def _local_iso_to_utc(dt_str: str) -> str: - """Convert a bare ISO datetime string (assumed local) to UTC. + """ + Convert a bare ISO datetime string (assumed local time) to UTC. - If the string already has a timezone indicator (Z, +, -) - or looks like epoch milliseconds, it is returned unchanged. + If the input already contains timezone info or appears to be epoch + milliseconds, it is returned unchanged. """ from datetime import datetime, timezone - s = str(dt_str).strip() + text = str(dt_str).strip() + if not text: + return text - # Epoch ms – pass through - if s.isdigit(): - return s + # Epoch millis (or seconds) should pass through unchanged. + if text.isdigit(): + return text - # Already has TZ info – pass through - if s.endswith("Z") or "+" in s[10:] or s[10:].count("-") > 0: - return s + # Already timezone-aware. + if text.endswith("Z") or "+" in text[10:] or text[10:].count("-") > 0: + return text try: - naive = datetime.fromisoformat(s) - local_dt = naive.astimezone() # attach local TZ + naive = datetime.fromisoformat(text) + local_dt = naive.astimezone() utc_dt = local_dt.astimezone(timezone.utc) return utc_dt.strftime("%Y-%m-%dT%H:%M:%S") except (ValueError, TypeError): - return s + return text def query_tag_history( self, tag_paths: List[str], start_date: str, end_date: str, - return_size: int = 100, + return_size: int = 200, aggregation_mode: str = "Average", return_format: str = "Wide", interval_minutes: Optional[int] = None, include_bounding_values: bool = False, ) -> Optional[Any]: - """Query historical tag values via the WebDev queryTagHistory endpoint. - - Bare ISO datetime strings (no timezone suffix) are assumed to be in - the server's local timezone and are converted to UTC before sending - to the gateway (which interprets all times as UTC). - - Args: - tag_paths: Tag paths with provider prefix, e.g. ['[default]Folder/Tag']. - start_date: ISO datetime string (local) or epoch ms. - end_date: ISO datetime string (local) or epoch ms. - return_size: Max rows to return (default 100). - aggregation_mode: Average, MinMax, LastValue, Sum, Minimum, Maximum. - return_format: Wide or Tall. - interval_minutes: Aggregation interval in minutes. - include_bounding_values: Include values at boundaries. """ - normalised = [self._ensure_provider_prefix(p) for p in tag_paths] + Query historical tag values from the WebDev queryTagHistory endpoint. + + Dates may be passed as local ISO strings; they are converted to UTC + to match Ignition endpoint expectations. + """ + if not tag_paths: + return {"error": "No tag paths provided", "tagPaths": []} + normalised = [self._ensure_provider_prefix(p) for p in tag_paths] utc_start = self._local_iso_to_utc(start_date) utc_end = self._local_iso_to_utc(end_date) @@ -304,19 +303,17 @@ def query_tag_history( "tagPaths": ",".join(normalised), "startDate": utc_start, "endDate": utc_end, - "returnSize": return_size, + "returnSize": int(return_size), "aggregationMode": aggregation_mode, "returnFormat": return_format, - "includeBoundingValues": str(include_bounding_values).lower(), + "includeBoundingValues": str(bool(include_bounding_values)).lower(), } if interval_minutes is not None: - params["intervalMinutes"] = interval_minutes + params["intervalMinutes"] = int(interval_minutes) data = self._get("system/webdev/Axilon/queryTagHistory", params=params) - if data is None: return {"error": "API request failed or not configured", "tagPaths": normalised} - return data # --------------------------------------------------------------------- # diff --git a/scripts/neo4j_ontology.py b/scripts/neo4j_ontology.py index 110719f..380e3cb 100644 --- a/scripts/neo4j_ontology.py +++ b/scripts/neo4j_ontology.py @@ -9,7 +9,11 @@ from typing import Dict, List, Optional, Any, Union from dataclasses import dataclass, field from contextlib import contextmanager -from dotenv import load_dotenv +try: + from dotenv import load_dotenv +except ImportError: # pragma: no cover - optional fallback for minimal envs + def load_dotenv(*_args, **_kwargs): + return False from neo4j import GraphDatabase, Driver, Session @@ -147,6 +151,8 @@ def create_indexes(self) -> None: "CREATE CONSTRAINT project_name IF NOT EXISTS FOR (p:Project) REQUIRE p.name IS UNIQUE", "CREATE CONSTRAINT script_name IF NOT EXISTS FOR (s:Script) REQUIRE s.name IS UNIQUE", "CREATE CONSTRAINT namedquery_name IF NOT EXISTS FOR (q:NamedQuery) REQUIRE q.name IS UNIQUE", + "CREATE CONSTRAINT agentrun_id IF NOT EXISTS FOR (r:AgentRun) REQUIRE r.run_id IS UNIQUE", + "CREATE CONSTRAINT anomalyevent_id IF NOT EXISTS FOR (e:AnomalyEvent) REQUIRE e.event_id IS UNIQUE", ] # Regular indexes @@ -186,6 +192,11 @@ def create_indexes(self) -> None: "CREATE INDEX hmitextlist_name IF NOT EXISTS FOR (htl:HMITextList) ON (htl.name)", "CREATE INDEX plctagtable_name IF NOT EXISTS FOR (pt:PLCTagTable) ON (pt.name)", "CREATE INDEX plctag_name IF NOT EXISTS FOR (ptg:PLCTag) ON (ptg.name)", + # Agent monitoring indexes + "CREATE INDEX anomalyevent_created IF NOT EXISTS FOR (e:AnomalyEvent) ON (e.created_at)", + "CREATE INDEX anomalyevent_state IF NOT EXISTS FOR (e:AnomalyEvent) ON (e.state)", + "CREATE INDEX anomalyevent_severity IF NOT EXISTS FOR (e:AnomalyEvent) ON (e.severity)", + "CREATE INDEX anomalyevent_dedup_key IF NOT EXISTS FOR (e:AnomalyEvent) ON (e.dedup_key)", ] for constraint in constraints: @@ -202,6 +213,95 @@ def create_indexes(self) -> None: if "already exists" not in str(e).lower(): print(f"[WARNING] Index error: {e}") + def init_agent_monitoring_schema(self) -> None: + """Ensure agent monitoring labels and indexes exist.""" + self.create_indexes() + + def list_anomaly_events( + self, + limit: int = 100, + state: Optional[str] = None, + severity: Optional[str] = None, + run_id: Optional[str] = None, + ) -> List[Dict[str, Any]]: + """List persisted anomaly events for UI feeds.""" + with self.session() as session: + clauses = [] + params: Dict[str, Any] = {"limit": max(1, min(limit, 500))} + if state: + clauses.append("e.state = $state") + params["state"] = state + if severity: + clauses.append("e.severity = $severity") + params["severity"] = severity + if run_id: + clauses.append("e.run_id = $run_id") + params["run_id"] = run_id + where = f"WHERE {' AND '.join(clauses)}" if clauses else "" + query = f""" + MATCH (e:AnomalyEvent) + {where} + OPTIONAL MATCH (e)-[:OBSERVED_ON]->(t:ScadaTag) + OPTIONAL MATCH (e)-[:AFFECTS]->(eq:Equipment) + RETURN e, collect(DISTINCT t.name) AS tags, collect(DISTINCT eq.name) AS equipment + ORDER BY e.created_at DESC + LIMIT $limit + """ + result = session.run(query, **params) + events: List[Dict[str, Any]] = [] + for record in result: + node = record["e"] + props = dict(node) + props["tags"] = [x for x in record["tags"] if x] + props["equipment"] = [x for x in record["equipment"] if x] + events.append(props) + return events + + def get_anomaly_event(self, event_id: str) -> Optional[Dict[str, Any]]: + """Get one anomaly event with linked context labels.""" + with self.session() as session: + result = session.run( + """ + MATCH (e:AnomalyEvent {event_id: $event_id}) + OPTIONAL MATCH (e)-[:OBSERVED_ON]->(t:ScadaTag) + OPTIONAL MATCH (e)-[:AFFECTS]->(eq:Equipment) + OPTIONAL MATCH (e)-[r:RELATED_TO]->(n) + RETURN e, + collect(DISTINCT t.name) AS tags, + collect(DISTINCT eq.name) AS equipment, + collect(DISTINCT {type: type(r), label: labels(n)[0], name: coalesce(n.name, n.symptom, n.phrase)}) AS related + LIMIT 1 + """, + event_id=event_id, + ) + record = result.single() + if not record: + return None + data = dict(record["e"]) + data["tags"] = [x for x in record["tags"] if x] + data["equipment"] = [x for x in record["equipment"] if x] + data["related"] = [ + x for x in record["related"] if x and x.get("name") + ] + return data + + def cleanup_anomaly_events(self, retention_days: int = 14) -> int: + """Delete old anomaly events outside retention window.""" + with self.session() as session: + result = session.run( + """ + MATCH (e:AnomalyEvent) + WHERE e.created_at IS NOT NULL + AND datetime(e.created_at) < datetime() - duration({days: $days}) + WITH collect(e) AS old_events + FOREACH (n IN old_events | DETACH DELETE n) + RETURN size(old_events) AS deleted + """, + days=max(1, retention_days), + ) + record = result.single() + return int(record["deleted"]) if record else 0 + def clear_all(self) -> None: """Clear all nodes and relationships. USE WITH CAUTION.""" with self.session() as session: @@ -4192,12 +4292,22 @@ def main(): "tia-projects", "tia-project-resources", "db-connections", + "init-agent-schema", + "list-anomaly-events", + "get-anomaly-event", + "cleanup-anomaly-events", ], help="Command to execute", ) parser.add_argument("--file", "-f", help="JSON file for import/export") parser.add_argument("--query", "-q", help="Query string for search") parser.add_argument("--project", "-p", help="Project name for project-resources") + parser.add_argument("--event-id", help="Event ID for get-anomaly-event") + parser.add_argument("--state", help="Filter anomaly events by state") + parser.add_argument("--severity", help="Filter anomaly events by severity") + parser.add_argument("--run-id", help="Filter anomaly events by run_id") + parser.add_argument("--limit", type=int, default=100, help="Limit results for list commands") + parser.add_argument("--retention-days", type=int, default=14, help="Retention window in days") parser.add_argument("--json", action="store_true", help="Output in JSON format") parser.add_argument( "--enrichment-status", @@ -4437,7 +4547,43 @@ def main(): f" {c['name']} ({c['database_type']}) " f"- {c['url']} [{enabled}]" ) + elif args.command == "init-agent-schema": + graph.init_agent_monitoring_schema() + print("[OK] Initialized agent monitoring schema") + + elif args.command == "list-anomaly-events": + events = graph.list_anomaly_events( + limit=args.limit, + state=args.state, + severity=args.severity, + run_id=args.run_id, + ) + if args.json: + print(json_module.dumps(events)) + else: + print(f"Anomaly events: {len(events)}") + for event in events: + print( + f"- {event.get('event_id')} {event.get('severity')} " + f"{event.get('summary', '')[:80]}" + ) + + elif args.command == "get-anomaly-event": + if not args.event_id: + print("[ERROR] --event-id required for get-anomaly-event") + return + event = graph.get_anomaly_event(args.event_id) + if args.json: + print(json_module.dumps(event or {})) + else: + if not event: + print(f"[ERROR] Event not found: {args.event_id}") + return + print(json_module.dumps(event, indent=2)) + elif args.command == "cleanup-anomaly-events": + deleted = graph.cleanup_anomaly_events(args.retention_days) + print(f"[OK] Deleted {deleted} anomaly events older than {args.retention_days} days") if __name__ == "__main__": main() From 1f1f6b4dc1f7005d8d144d21a10ff2dd77cce070 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 2 Mar 2026 18:06:40 +0000 Subject: [PATCH 02/16] Emit provider failures as anomaly feed events Co-authored-by: leor --- scripts/anomaly_monitor.py | 183 ++++++++++++++++++++++++++++++++----- 1 file changed, 161 insertions(+), 22 deletions(-) diff --git a/scripts/anomaly_monitor.py b/scripts/anomaly_monitor.py index 70a0f4b..9049108 100644 --- a/scripts/anomaly_monitor.py +++ b/scripts/anomaly_monitor.py @@ -263,7 +263,7 @@ def _extract_history_values(self, history_data: Any, tag_path: str) -> List[floa values.append(val) return values - def fetch_history_values(self, tag_path: str) -> List[float]: + def fetch_history_values(self, tag_path: str) -> tuple[List[float], Optional[str]]: minutes = int(self.config.get("historyWindowMinutes", 360)) end_dt = datetime.now(timezone.utc) start_dt = end_dt - timedelta(minutes=minutes) @@ -275,7 +275,9 @@ def fetch_history_values(self, tag_path: str) -> List[float]: aggregation_mode="Average", return_format="Wide", ) - return self._extract_history_values(data, tag_path) + if isinstance(data, dict) and data.get("error"): + return [], str(data.get("error")) + return self._extract_history_values(data, tag_path), None def get_context(self, tag_path: str) -> Dict[str, Any]: with self.graph.session() as session: @@ -517,6 +519,97 @@ def persist_event( return event_data + def _emit_persisted_event(self, persisted: Dict[str, Any]) -> None: + """Emit normalized AGENT_EVENT payload for UI stream.""" + emit("AGENT_EVENT", { + "runId": self.run_id, + "eventId": persisted["event_id"], + "severity": persisted["severity"], + "summary": persisted["summary"], + "category": persisted.get("category"), + "entityRefs": { + "tag": persisted.get("tag_name") or persisted.get("source_tag"), + "sourceTag": persisted.get("source_tag"), + }, + "createdAt": persisted.get("created_at"), + }) + + def emit_provider_failure_event( + self, + code: str, + message: str, + *, + severity: str = "high", + category: str = "quality-issue", + source_tag: Optional[str] = None, + details: Optional[Dict[str, Any]] = None, + ) -> bool: + """ + Persist and stream provider-health anomalies so failures appear in feed. + + Returns: + True if a new event was persisted (false if deduped). + """ + emit("AGENT_ERROR", { + "runId": self.run_id, + "code": code, + "message": message, + "recoverable": True, + "timestamp": utc_now_iso(), + }) + + tag = source_tag or f"provider://{code}" + detail_blob = json.dumps(details or {}, default=str) + context = { + "tag_path": tag, + "tag_name": source_tag or "ProviderHealth", + "equipment": [], + "symptoms": [], + "causes": [], + "patterns": [], + "safety": [], + } + deterministic = { + "candidate": True, + "reasons": [code], + "category": category, + "z_score": 0.0, + "mad_score": 0.0, + "delta_rate": 0.0, + "window_volatility": 0.0, + "history_points": 0, + } + triage = { + "summary": message, + "category": category, + "severity": severity, + "confidence": 0.9, + "probable_causes": [message], + "verification_checks": [ + "Check Ignition gateway connectivity and credentials.", + "Validate tag provider availability and endpoint health.", + ], + "safety_notes": [], + "rationale": f"Provider health event ({code}). Details: {detail_blob}", + "related_entities": [], + } + persisted = self.persist_event( + context=context, + deterministic=deterministic, + live_sample={ + "path": tag, + "value": "", + "quality": "Bad", + "timestamp": utc_now_iso(), + "data_type": "provider_health", + }, + triage=triage, + ) + if persisted: + self._emit_persisted_event(persisted) + return True + return False + # ----------------------------- # Monitoring loop # ----------------------------- @@ -527,13 +620,14 @@ def run_cycle(self) -> Dict[str, Any]: min_history = int(self.config.get("minHistoryPoints", 30)) if not self.api.is_configured: - emit("AGENT_ERROR", { - "runId": self.run_id, - "code": "ignition_not_configured", - "message": "Ignition API URL/token not configured.", - "recoverable": True, - "timestamp": utc_now_iso(), - }) + emitted = self.emit_provider_failure_event( + "ignition_not_configured", + "Ignition API URL/token not configured.", + severity="critical", + category="state-conflict", + ) + if emitted: + metrics["emitted"] += 1 metrics["cycleMs"] = int((time.time() - cycle_start) * 1000) return metrics @@ -553,17 +647,31 @@ def run_cycle(self) -> Dict[str, Any]: live_values = self.api.read_tags(tag_paths) candidates: List[Dict[str, Any]] = [] now = datetime.now(timezone.utc) + live_error_count = 0 + live_error_samples: List[str] = [] + history_error_count = 0 + history_error_samples: List[str] = [] + valid_live_count = 0 for tv in live_values: if tv.error: + live_error_count += 1 + if len(live_error_samples) < 5: + live_error_samples.append(f"{tv.path}: {tv.error}") continue + valid_live_count += 1 if not is_quality_good(tv.quality): # quality gate: only emit quality anomalies if this persists via triage. continue if is_stale(tv.timestamp, int(thresholds.get("stalenessSec", 120)), now=now): continue - history = self.fetch_history_values(tv.path) + history, history_error = self.fetch_history_values(tv.path) + if history_error: + history_error_count += 1 + if len(history_error_samples) < 5: + history_error_samples.append(f"{tv.path}: {history_error}") + continue if len(history) < min_history: continue @@ -594,6 +702,48 @@ def run_cycle(self) -> Dict[str, Any]: } ) + if live_values and live_error_count == len(live_values): + emitted = self.emit_provider_failure_event( + "live_tag_provider_failed", + f"Live tag provider failed for all reads ({live_error_count}/{len(live_values)}).", + severity="high", + category="quality-issue", + details={"samples": live_error_samples}, + ) + if emitted: + metrics["emitted"] += 1 + elif live_error_count > 0: + emitted = self.emit_provider_failure_event( + "live_tag_provider_partial_failure", + f"Live tag provider partially failed ({live_error_count}/{len(live_values)} reads).", + severity="medium", + category="quality-issue", + details={"samples": live_error_samples}, + ) + if emitted: + metrics["emitted"] += 1 + + if valid_live_count > 0 and history_error_count >= max(1, int(valid_live_count * 0.8)): + emitted = self.emit_provider_failure_event( + "history_provider_failed", + f"History provider failed for most queries ({history_error_count}/{valid_live_count}).", + severity="high", + category="quality-issue", + details={"samples": history_error_samples}, + ) + if emitted: + metrics["emitted"] += 1 + elif history_error_count > 0: + emitted = self.emit_provider_failure_event( + "history_provider_partial_failure", + f"History provider partially failed ({history_error_count}/{valid_live_count}).", + severity="medium", + category="quality-issue", + details={"samples": history_error_samples}, + ) + if emitted: + metrics["emitted"] += 1 + metrics["candidates"] = len(candidates) max_candidates = int(self.config.get("maxCandidatesPerCycle", 25)) max_triage = int(self.config.get("maxLlmTriagesPerCycle", 5)) @@ -629,18 +779,7 @@ def run_cycle(self) -> Dict[str, Any]: ) if persisted: metrics["emitted"] += 1 - emit("AGENT_EVENT", { - "runId": self.run_id, - "eventId": persisted["event_id"], - "severity": persisted["severity"], - "summary": persisted["summary"], - "category": persisted.get("category"), - "entityRefs": { - "tag": persisted.get("tag_name") or persisted.get("source_tag"), - "sourceTag": persisted.get("source_tag"), - }, - "createdAt": persisted.get("created_at"), - }) + self._emit_persisted_event(persisted) metrics["cycleMs"] = int((time.time() - cycle_start) * 1000) return metrics From 1a17e651432ab28c7109ef46bc5fc05987e4be53 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 2 Mar 2026 18:19:17 +0000 Subject: [PATCH 03/16] Migrate tests to pytest with ingest coverage Co-authored-by: leor --- .gitignore | 3 +- pytest.ini | 4 + requirements-dev.txt | 1 + tests/README.md | 46 +++++ tests/conftest.py | 35 ++++ .../integration/simulated_ignition_server.py | 170 ++++++++++++++++++ .../integration/test_live_value_sim_server.py | 75 ++++++++ tests/unit/test_anomaly_rules.py | 64 +++++++ tests/unit/test_ingest_siemens_parser.py | 72 ++++++++ tests/unit/test_ingest_workbench_parser.py | 119 ++++++++++++ 10 files changed, 587 insertions(+), 2 deletions(-) create mode 100644 pytest.ini create mode 100644 requirements-dev.txt create mode 100644 tests/README.md create mode 100644 tests/conftest.py create mode 100644 tests/integration/simulated_ignition_server.py create mode 100644 tests/integration/test_live_value_sim_server.py create mode 100644 tests/unit/test_anomaly_rules.py create mode 100644 tests/unit/test_ingest_siemens_parser.py create mode 100644 tests/unit/test_ingest_workbench_parser.py diff --git a/.gitignore b/.gitignore index 085a6d7..28f5878 100644 --- a/.gitignore +++ b/.gitignore @@ -9,8 +9,7 @@ venv/ ENV/ .venv -# Test files and outputs -tests/ +# Test outputs *_updated*.xml *_applied*.xml *_diffs/ diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..3b2c446 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,4 @@ +[pytest] +testpaths = tests +python_files = test_*.py +addopts = -q diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..e079f8a --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1 @@ +pytest diff --git a/tests/README.md b/tests/README.md new file mode 100644 index 0000000..350a8d4 --- /dev/null +++ b/tests/README.md @@ -0,0 +1,46 @@ +# Test Flow: Agents Monitoring + Ingest + +This repository now includes a lightweight test scaffold using `pytest`. + +## Layout + +- `tests/unit/` + - `test_anomaly_rules.py` + Unit tests for deterministic anomaly scoring and quality/staleness gates. + - `test_ingest_workbench_parser.py` + Unit tests for workbench ingest parsing. + - `test_ingest_siemens_parser.py` + Unit tests for Siemens `.st` ingest parsing. + +- `tests/integration/` + - `simulated_ignition_server.py` + Local simulated live/history webserver implementing: + - `/system/webdev/Axilon/getTags` + - `/system/webdev/Axilon/queryTagHistory` + - `test_live_value_sim_server.py` + Integration tests for `IgnitionApiClient` + anomaly scoring with simulated live values. + +## Run all tests + +```bash +python3 -m pytest +``` + +## Run only unit tests + +```bash +python3 -m pytest tests/unit +``` + +## Run only integration tests + +```bash +python3 -m pytest tests/integration +``` + +## Notes + +- Integration tests are fully local and do **not** require a real Ignition gateway. +- LLM services are not required for these tests. +- Neo4j is not required for this test suite. + diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..5b51088 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +import sys +from pathlib import Path + +import pytest + + +REPO_ROOT = Path(__file__).resolve().parents[1] +SCRIPTS_DIR = REPO_ROOT / "scripts" +INTEGRATION_DIR = REPO_ROOT / "tests" / "integration" + +for path in (SCRIPTS_DIR, INTEGRATION_DIR): + path_str = str(path) + if path_str not in sys.path: + sys.path.insert(0, path_str) + + +@pytest.fixture +def sim_ignition(): + from simulated_ignition_server import ( + start_simulated_ignition_server, + stop_simulated_ignition_server, + ) + + server, thread, state, base_url = start_simulated_ignition_server() + try: + yield { + "server": server, + "thread": thread, + "state": state, + "base_url": base_url, + } + finally: + stop_simulated_ignition_server(server, thread) diff --git a/tests/integration/simulated_ignition_server.py b/tests/integration/simulated_ignition_server.py new file mode 100644 index 0000000..607f316 --- /dev/null +++ b/tests/integration/simulated_ignition_server.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python3 +""" +Simulated Ignition WebDev endpoints for local integration tests. +""" + +from __future__ import annotations + +import json +import threading +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone +from http.server import BaseHTTPRequestHandler, HTTPServer +from typing import Dict, List, Tuple +from urllib.parse import parse_qs, urlparse + + +def _utc_iso(offset_minutes: int = 0) -> str: + return (datetime.now(timezone.utc) + timedelta(minutes=offset_minutes)).isoformat() + + +@dataclass +class SimulatedIgnitionState: + fail_live_reads: bool = False + fail_history_reads: bool = False + live_tags: Dict[str, Dict] = field(default_factory=dict) + tag_history: Dict[str, List[Tuple[str, float]]] = field(default_factory=dict) + + def __post_init__(self) -> None: + if not self.live_tags: + self.live_tags = { + "[default]Line/Throughput": { + "value": 95.0, + "quality": "Good", + "timestamp": _utc_iso(), + "dataType": "Float8", + }, + "[default]Line/Temperature": { + "value": 42.0, + "quality": "Good", + "timestamp": _utc_iso(), + "dataType": "Float8", + }, + } + if not self.tag_history: + base = [49.9, 50.1, 50.0, 50.2, 50.1, 49.8, 50.3, 50.0, 49.9, 50.2] + self.tag_history = { + "[default]Line/Throughput": [ + (_utc_iso(offset_minutes=-(len(base) - i)), value) + for i, value in enumerate(base) + ], + "[default]Line/Temperature": [ + (_utc_iso(offset_minutes=-(len(base) - i)), 41.5 + (i * 0.1)) + for i in range(len(base)) + ], + } + + +class _IgnitionHandler(BaseHTTPRequestHandler): + state: SimulatedIgnitionState + + def _send_json(self, payload, status: int = 200) -> None: + body = json.dumps(payload).encode("utf-8") + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def do_GET(self): # noqa: N802 - BaseHTTPRequestHandler naming + parsed = urlparse(self.path) + path = parsed.path + query = parse_qs(parsed.query) + + if path == "/system/webdev/Axilon/getTags": + if self.state.fail_live_reads: + self._send_json({"error": "simulated live provider failure"}, status=503) + return + + raw = query.get("tagPaths", [""])[0] + tag_paths = [p.strip() for p in raw.split(",") if p.strip()] + tags = [] + for tag_path in tag_paths: + data = self.state.live_tags.get(tag_path) + if not data: + tags.append( + { + "tagPath": tag_path, + "value": None, + "quality": "Bad", + "isGood": False, + "timestamp": _utc_iso(), + "dataType": "Unknown", + } + ) + continue + tags.append( + { + "tagPath": tag_path, + "value": data.get("value"), + "quality": data.get("quality", "Good"), + "isGood": str(data.get("quality", "Good")).lower() == "good", + "timestamp": data.get("timestamp", _utc_iso()), + "dataType": data.get("dataType", "Unknown"), + } + ) + self._send_json({"success": True, "count": len(tags), "tags": tags}) + return + + if path == "/system/webdev/Axilon/queryTagHistory": + if self.state.fail_history_reads: + self._send_json({"error": "simulated history provider failure"}, status=503) + return + + raw = query.get("tagPaths", [""])[0] + tag_paths = [p.strip() for p in raw.split(",") if p.strip()] + rows = [] + + primary_path = tag_paths[0] if tag_paths else "[default]Line/Throughput" + primary_hist = self.state.tag_history.get(primary_path, []) + for ts, _ in primary_hist: + row = {"timestamp": ts} + for tag_path in tag_paths: + values = self.state.tag_history.get(tag_path, []) + match_val = None + for hist_ts, hist_val in values: + if hist_ts == ts: + match_val = hist_val + break + if match_val is None and values: + match_val = values[-1][1] + row[tag_path] = match_val + rows.append(row) + + self._send_json( + { + "success": True, + "rows": rows, + "tagPaths": tag_paths, + "returnFormat": "Wide", + } + ) + return + + self._send_json({"error": f"unsupported endpoint: {path}"}, status=404) + + def log_message(self, format, *args): # noqa: A003 - stdlib signature + # Silence default HTTP request logs during tests. + return + + +def start_simulated_ignition_server() -> tuple[HTTPServer, threading.Thread, SimulatedIgnitionState, str]: + state = SimulatedIgnitionState() + handler_cls = type( + "IgnitionHandlerWithState", + (_IgnitionHandler,), + {"state": state}, + ) + server = HTTPServer(("127.0.0.1", 0), handler_cls) + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + host, port = server.server_address + base_url = f"http://{host}:{port}" + return server, thread, state, base_url + + +def stop_simulated_ignition_server(server: HTTPServer, thread: threading.Thread) -> None: + server.shutdown() + server.server_close() + thread.join(timeout=3) + diff --git a/tests/integration/test_live_value_sim_server.py b/tests/integration/test_live_value_sim_server.py new file mode 100644 index 0000000..d6feeea --- /dev/null +++ b/tests/integration/test_live_value_sim_server.py @@ -0,0 +1,75 @@ +from datetime import datetime, timedelta, timezone + +from anomaly_rules import compute_deviation_scores +from ignition_api_client import IgnitionApiClient + +def test_read_tags_history_and_detect_spike(sim_ignition): + state = sim_ignition["state"] + state.fail_live_reads = False + state.fail_history_reads = False + + client = IgnitionApiClient(base_url=sim_ignition["base_url"], api_token="token") + try: + tag_path = "[default]Line/Throughput" + tv = client.read_tag(tag_path) + assert tv.error is None + assert tv.quality == "Good" + assert float(tv.value) == 95.0 + + start = (datetime.now(timezone.utc) - timedelta(hours=1)).replace(microsecond=0).isoformat() + end = datetime.now(timezone.utc).replace(microsecond=0).isoformat() + history = client.query_tag_history([tag_path], start, end, return_size=100) + assert isinstance(history, dict) + assert "rows" in history + + history_values = [ + row[tag_path] + for row in history["rows"] + if isinstance(row, dict) and tag_path in row and row[tag_path] is not None + ] + assert len(history_values) > 5 + + score = compute_deviation_scores( + current_value=tv.value, + history_values=history_values, + prev_value=55.0, + thresholds={"z": 3.0, "mad": 3.5, "rate": 10.0}, + ) + assert score["candidate"] + assert score["category"] in {"spike", "deviation", "drift"} + finally: + client.close() + + +def test_live_provider_failure_surfaces_as_read_error(sim_ignition): + state = sim_ignition["state"] + state.fail_live_reads = True + + client = IgnitionApiClient(base_url=sim_ignition["base_url"], api_token="token") + try: + tv = client.read_tag("[default]Line/Throughput") + assert tv.error is not None + assert "failed" in tv.error.lower() + finally: + client.close() + + +def test_history_provider_failure_surfaces_error_payload(sim_ignition): + state = sim_ignition["state"] + state.fail_history_reads = True + + client = IgnitionApiClient(base_url=sim_ignition["base_url"], api_token="token") + try: + start = (datetime.now(timezone.utc) - timedelta(hours=1)).replace(microsecond=0).isoformat() + end = datetime.now(timezone.utc).replace(microsecond=0).isoformat() + history = client.query_tag_history( + ["[default]Line/Throughput"], + start, + end, + return_size=100, + ) + assert isinstance(history, dict) + assert "error" in history + finally: + client.close() + diff --git a/tests/unit/test_anomaly_rules.py b/tests/unit/test_anomaly_rules.py new file mode 100644 index 0000000..e5f2af1 --- /dev/null +++ b/tests/unit/test_anomaly_rules.py @@ -0,0 +1,64 @@ +from datetime import datetime, timedelta, timezone + +import pytest + +from anomaly_rules import compute_deviation_scores, is_quality_good, is_stale + + +def test_detects_sharp_rise_and_sharp_drop(): + baseline = [50.0, 49.9, 50.1, 50.2, 49.8, 50.0, 50.1, 49.9, 50.0, 50.2] * 3 + + rise = compute_deviation_scores( + current_value=95.0, + history_values=baseline, + prev_value=52.0, + thresholds={"z": 3.0, "mad": 3.5, "rate": 10.0}, + ) + drop = compute_deviation_scores( + current_value=12.0, + history_values=baseline, + prev_value=49.0, + thresholds={"z": 3.0, "mad": 3.5, "rate": 10.0}, + ) + + assert rise["candidate"] + assert drop["candidate"] + + +def test_detects_flatline_stuck_pattern(): + flat = [72.0] * 30 + result = compute_deviation_scores( + current_value=72.0, + history_values=flat, + prev_value=72.0, + thresholds={"z": 3.0, "mad": 3.5, "rate": 1.0, "stuck_window_size": 20}, + ) + assert result["candidate"] + assert "flatline_detected" in result["reasons"] + assert result["category"] == "stuck" + + +@pytest.mark.parametrize( + "quality,expected", + [("Good", True), ("OK", True), ("Bad", False), (None, False)], +) +def test_quality_helper(quality, expected): + assert is_quality_good(quality) is expected + + +def test_staleness_helper(): + recent_ts = datetime.now(timezone.utc).isoformat() + old_ts = (datetime.now(timezone.utc) - timedelta(minutes=15)).isoformat() + assert not is_stale(recent_ts, staleness_sec=300) + assert is_stale(old_ts, staleness_sec=300) + + +def test_non_numeric_current_value_is_rejected(): + result = compute_deviation_scores( + current_value="not-a-number", + history_values=[1, 2, 3, 4, 5], + prev_value=3, + ) + assert not result["candidate"] + assert result["category"] == "invalid_value" + diff --git a/tests/unit/test_ingest_siemens_parser.py b/tests/unit/test_ingest_siemens_parser.py new file mode 100644 index 0000000..935bf71 --- /dev/null +++ b/tests/unit/test_ingest_siemens_parser.py @@ -0,0 +1,72 @@ +from pathlib import Path + +from siemens_parser import SiemensSTParser + + +SAMPLE_ST = """ +NAMESPACE Plant.Process + +TYPE MotorData : STRUCT + Speed : REAL; +END_STRUCT +END_TYPE + +CLASS MotorFB +VAR_INPUT + StartCmd : BOOL; // start command +END_VAR +VAR_OUTPUT + Running : BOOL; +END_VAR +METHOD PUBLIC Execute : BOOL +VAR + tempVar : INT := 1; +END_VAR +Running := StartCmd; +END_METHOD +END_CLASS + +PROGRAM MainProgram +VAR + Counter : INT := 0; +END_VAR +Counter := Counter + 1; +END_PROGRAM + +CONFIGURATION Config1 +TASK MainTask(INTERVAL := T#100MS, PRIORITY := 1); +PROGRAM PLC_PRG WITH MainTask: MainProgram; +END_CONFIGURATION + +END_NAMESPACE +""" + + +def test_parse_structured_text_blocks(tmp_path): + st_path = Path(tmp_path) / "sample.st" + st_path.write_text(SAMPLE_ST, encoding="utf-8") + + parser = SiemensSTParser() + blocks = parser.parse_file(str(st_path)) + assert len(blocks) >= 4 + + by_name = {b.name: b for b in blocks} + assert "MotorData" in by_name + assert by_name["MotorData"].type == "UDT" + assert by_name["MotorData"].local_tags[0].name == "Speed" + + assert "MotorFB" in by_name + fb = by_name["MotorFB"] + assert fb.type == "FB" + assert any(t.name == "StartCmd" and t.direction == "INPUT" for t in fb.input_tags) + assert any(t.name == "Running" and t.direction == "OUTPUT" for t in fb.output_tags) + assert any(r["name"] == "Execute" for r in fb.routines) + + assert "MainProgram" in by_name + assert by_name["MainProgram"].type == "PROGRAM" + assert "Counter := Counter + 1" in by_name["MainProgram"].raw_implementation + + assert "Config1" in by_name + assert by_name["Config1"].type == "CONFIGURATION" + assert "MainTask" in by_name["Config1"].description + diff --git a/tests/unit/test_ingest_workbench_parser.py b/tests/unit/test_ingest_workbench_parser.py new file mode 100644 index 0000000..7609490 --- /dev/null +++ b/tests/unit/test_ingest_workbench_parser.py @@ -0,0 +1,119 @@ +import json +from pathlib import Path + +from workbench_parser import WorkbenchParser + + +def test_parse_workbench_project_json_with_inline_resources(tmp_path): + root = Path(tmp_path) + + # Script file expected by WorkbenchParser._read_script_file + script_file = root / "scripts" / "PlantA" / "utility" / "tags" / "code.py" + script_file.parent.mkdir(parents=True, exist_ok=True) + script_file.write_text("def read_tag():\n return 42\n", encoding="utf-8") + + data = { + "__typeName": "WorkbenchState", + "version": "1.2.3", + "root": { + "windows": [ + { + "projectName": "PlantA", + "title": "MainView", + "path": "main/view", + "windowType": "perspective", + "rootContainer": { + "meta": {"name": "Root"}, + "type": "ia.container", + "propConfig": { + "props.value": { + "binding": { + "type": "tag", + "config": { + "tagPath": "[default]Line/Speed", + "bidirectional": True, + }, + } + } + }, + "children": [], + }, + } + ], + "namedQueries": [ + { + "projectName": "PlantA", + "queryName": "GetBatches", + "folderPath": "Prod\\Ops", + "query": "SELECT * FROM batches", + } + ], + "scripts": [ + { + "projectName": "PlantA", + "path": ["utility", "tags"], + "scope": "A", + } + ], + "tags": [ + { + "name": "LineSpeed", + "type": "Opc", + "dataType": "Float8", + "opcItemPath": "[default]Line/Speed", + }, + { + "name": "BatchCount", + "type": "Memory", + "dataType": "Int4", + "value": 7, + }, + ], + "udtDefinitions": [ + { + "name": "MotorUDT", + "id": "MotorUDT", + "parameters": { + "area": {"dataType": "String", "value": "A1"} + }, + "members": [ + { + "name": "Run", + "type": "opc", + "dataType": "Boolean", + "opcItemPath": "[default]Motor/Run", + "serverName": {"binding": "default"}, + } + ], + } + ], + }, + } + + project_json = root / "project.json" + project_json.write_text(json.dumps(data), encoding="utf-8") + + parser = WorkbenchParser() + backup = parser.parse_file(str(project_json)) + + assert "PlantA" in backup.projects + assert len(backup.windows) == 1 + assert backup.windows[0].name == "MainView" + assert backup.windows[0].components[0].bindings[0].target == "[default]Line/Speed" + + assert len(backup.named_queries) == 1 + assert backup.named_queries[0].id == "Prod/Ops/GetBatches" + assert "SELECT" in backup.named_queries[0].query_text + + assert len(backup.scripts) == 1 + assert "return 42" in backup.scripts[0].script_text + + tag_types = {t.name: t.tag_type for t in backup.tags} + assert tag_types["LineSpeed"] == "opc" + assert tag_types["BatchCount"] == "memory" + + assert len(backup.udt_definitions) == 1 + udt = backup.udt_definitions[0] + assert "area" in udt.parameters + assert udt.members[0].server_name == "default" + From e9ca37d4bbd907fd258af0ebca7e3122dcaa1d1c Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 2 Mar 2026 22:16:19 +0000 Subject: [PATCH 04/16] Improve anomaly visibility and clear acknowledged events Co-authored-by: leor --- electron-ui/index.html | 1 + electron-ui/main.js | 11 ++++ electron-ui/preload.js | 1 + electron-ui/renderer.js | 27 ++++++++-- scripts/anomaly_monitor.py | 101 ++++++++++++++++++++++++++++++++++++- 5 files changed, 136 insertions(+), 5 deletions(-) diff --git a/electron-ui/index.html b/electron-ui/index.html index 7e5e8a7..99ba9a1 100644 --- a/electron-ui/index.html +++ b/electron-ui/index.html @@ -593,6 +593,7 @@

Anomaly Feed

+ + diff --git a/electron-ui/main.js b/electron-ui/main.js index 43eb3fb..5b6a081 100644 --- a/electron-ui/main.js +++ b/electron-ui/main.js @@ -209,7 +209,7 @@ function normalizeAgentConfig(config = {}) { const thresholds = (config && typeof config.thresholds === 'object' && config.thresholds) || {}; const scope = (config && typeof config.scope === 'object' && config.scope) || {}; return { - pollIntervalMs: Math.max(5000, Number(config.pollIntervalMs || 15000)), + pollIntervalMs: Math.max(1000, Number(config.pollIntervalMs || 1000)), historyWindowMinutes: Math.max(10, Number(config.historyWindowMinutes || 360)), minHistoryPoints: Math.max(10, Number(config.minHistoryPoints || 30)), maxMonitoredTags: Math.max(10, Number(config.maxMonitoredTags || 200)), diff --git a/electron-ui/renderer.js b/electron-ui/renderer.js index 9b5da03..8479580 100644 --- a/electron-ui/renderer.js +++ b/electron-ui/renderer.js @@ -3580,7 +3580,7 @@ function getAgentsElements() { function getAgentsConfigFromUI() { const el = getAgentsElements(); return { - pollIntervalMs: Number(el.cfgPoll?.value || 15000), + pollIntervalMs: Number(el.cfgPoll?.value || 1000), historyWindowMinutes: Number(el.cfgHist?.value || 360), minHistoryPoints: Number(el.cfgPoints?.value || 30), maxCandidatesPerSubsystem: 8, diff --git a/scripts/anomaly_monitor.py b/scripts/anomaly_monitor.py index 2fa7f73..ff033dc 100644 --- a/scripts/anomaly_monitor.py +++ b/scripts/anomaly_monitor.py @@ -238,7 +238,7 @@ def merge_defaults(config: Optional[Dict[str, Any]]) -> Dict[str, Any]: raw = dict(config or {}) thresholds = raw.get("thresholds", {}) if isinstance(raw.get("thresholds"), dict) else {} defaults = { - "pollIntervalMs": 15000, + "pollIntervalMs": 1000, "historyWindowMinutes": 360, "minHistoryPoints": 30, "maxMonitoredTags": 200, @@ -1607,7 +1607,7 @@ def run_forever(self) -> int: "timestamp": utc_now_iso(), }) - poll_ms = int(self.config.get("pollIntervalMs", 15000)) + poll_ms = int(self.config.get("pollIntervalMs", 1000)) cleanup_every = max(1, int(self.config.get("cleanupEveryCycles", 40))) exit_code = 0 reason = "stopped" From 6dcfd15f81912b3ce743e6e5eab1507c2f6f093e Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 2 Mar 2026 23:18:37 +0000 Subject: [PATCH 15/16] Emit in-cycle agent status progress updates Co-authored-by: leor --- scripts/anomaly_monitor.py | 47 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/scripts/anomaly_monitor.py b/scripts/anomaly_monitor.py index ff033dc..54065c8 100644 --- a/scripts/anomaly_monitor.py +++ b/scripts/anomaly_monitor.py @@ -1024,6 +1024,8 @@ def run_cycle(self) -> Dict[str, Any]: cycle_start = time.time() thresholds = self.config.get("thresholds", {}) stale_threshold_sec = int(thresholds.get("stalenessSec", 120)) + progress_emit_interval_tags = max(5, int(self.config.get("progressEveryTags", 10))) + progress_emit_interval_sec = max(1, int(self.config.get("progressEverySec", 2))) metrics = { "candidates": 0, "triaged": 0, @@ -1148,6 +1150,42 @@ def run_cycle(self) -> Dict[str, Any]: near_shift_unlinked = 0 stale_samples: List[Dict[str, Any]] = [] subsystem_shift_signals: Dict[str, Dict[str, Any]] = {} + processed_live_count = 0 + total_live_count = len(live_values) + last_progress_emit = 0.0 + + def emit_cycle_progress(reason: str, current_tag: str = "") -> None: + nonlocal last_progress_emit + diag = make_default_diagnostics( + staleness_threshold_sec=stale_threshold_sec, + phase="cycle_in_progress", + reason=reason, + ) + diag.update({ + "processedLiveCount": processed_live_count, + "totalLiveCount": total_live_count, + "currentTag": current_tag, + "candidatesSoFar": len(candidates), + "liveErrorCount": live_error_count, + "qualityFilteredCount": quality_filtered_count, + "staleFilteredCount": stale_filtered_count, + "historyErrorCount": history_error_count, + "linkedTags": linked_tag_count, + "unlinkedTags": unlinked_tag_count, + }) + emit("AGENT_STATUS", { + "runId": self.run_id, + "state": "running", + "cycleMs": int((time.time() - cycle_start) * 1000), + "candidates": len(candidates), + "triaged": 0, + "emitted": metrics.get("emitted", 0), + "diagnostics": diag, + "timestamp": utc_now_iso(), + }) + last_progress_emit = time.time() + + emit_cycle_progress("cycle_started") def _update_subsystem_signal( subsystem_ref: Dict[str, str], deterministic: Dict[str, Any], tag_path: str @@ -1180,10 +1218,19 @@ def _update_subsystem_signal( bucket["sampleTag"] = tag_path for tv in live_values: + processed_live_count += 1 tag_meta = tag_lookup.get(tv.path, {"path": tv.path, "name": tv.path}) subsystem = tag_meta.get("primary_subsystem") or _subsystem_ref("global", "all") is_linked = bool(tag_meta.get("views") or tag_meta.get("equipment")) + now_progress = time.time() + if ( + processed_live_count == 1 + or processed_live_count % progress_emit_interval_tags == 0 + or (now_progress - last_progress_emit) >= progress_emit_interval_sec + ): + emit_cycle_progress("processing_live_tags", current_tag=tv.path) + if tv.error: live_error_count += 1 if is_linked: From 5157857bb4ff28e2d4b42f16027b5c7de52f3115 Mon Sep 17 00:00:00 2001 From: Leor Barak Fishman Date: Mon, 2 Mar 2026 17:27:50 -0800 Subject: [PATCH 16/16] more agentics fixing --- electron-ui/index.html | 20 +- electron-ui/main.js | 13 ++ electron-ui/preload.js | 1 + electron-ui/renderer.js | 353 +++++++++++++++++++++++++++++++--- electron-ui/styles.css | 385 ++++++++++++++++++++++++++++++++++++- scripts/anomaly_monitor.py | 316 ++++++++++++++++++++++++++---- tests/quick_import_test.py | 76 ++++++++ 7 files changed, 1100 insertions(+), 64 deletions(-) create mode 100644 tests/quick_import_test.py diff --git a/electron-ui/index.html b/electron-ui/index.html index a4a7a9b..08adc5d 100644 --- a/electron-ui/index.html +++ b/electron-ui/index.html @@ -566,8 +566,11 @@

Long-Running Agents

- - + + + @@ -584,6 +587,18 @@

Long-Running Agents

Last heartbeatn/a
+
+
+

Subsystem Health

+
+ +
+
+
+
Start monitoring to see subsystem health.
+
+
+