|
| 1 | +# SPDX-License-Identifier: PMPL-1.0-or-later |
| 2 | +# Copyright (c) 2026 Jonathan D.A. Jewell (hyperpolymath) <j.d.a.jewell@open.ac.uk> |
| 3 | +# |
| 4 | +# HttpCapabilityGateway.VeriSimDB — GenServer-backed VeriSimDB persistence client. |
| 5 | +# |
| 6 | +# Provides async audit log persistence to VeriSimDB for every gateway decision |
| 7 | +# (allow, deny, circuit-open, rate-limit-exceeded). ETS remains the hot path |
| 8 | +# for O(1) circuit-breaker and rate-limiter lookups; VeriSimDB receives |
| 9 | +# a durable append-only stream via cast for forensic replay and Hypatia analysis. |
| 10 | +# |
| 11 | +# Collection: capgw:audit |
| 12 | +# Document schema: |
| 13 | +# id — "ts:<unix_ms>:<request_id>" (sortable, unique) |
| 14 | +# timestamp — ISO-8601 UTC |
| 15 | +# action — :allow | :deny | :circuit_open | :rate_limited |
| 16 | +# backend — backend atom or nil |
| 17 | +# path — request path |
| 18 | +# verb — HTTP method |
| 19 | +# trust — trust level atom |
| 20 | +# latency_us — upstream latency in microseconds (nil for denied) |
| 21 | +# policy_ref — policy rule matched (nil for rate-limit) |
| 22 | + |
| 23 | +defmodule HttpCapabilityGateway.VeriSimDB do |
| 24 | + use GenServer |
| 25 | + require Logger |
| 26 | + |
| 27 | + @moduledoc """ |
| 28 | + Async VeriSimDB client for gateway audit-log persistence. |
| 29 | +
|
| 30 | + Writes are fire-and-forget (GenServer.cast) so the hot request path is |
| 31 | + never blocked by VeriSimDB availability. A local ETS buffer holds entries |
| 32 | + when VeriSimDB is unreachable and flushes on reconnect. |
| 33 | +
|
| 34 | + ## Usage |
| 35 | +
|
| 36 | + VeriSimDB.audit_allow(request, backend, policy_ref, latency_us) |
| 37 | + VeriSimDB.audit_deny(request, policy_ref) |
| 38 | + VeriSimDB.audit_circuit_open(backend) |
| 39 | + VeriSimDB.audit_rate_limited(request) |
| 40 | +
|
| 41 | + ## Configuration |
| 42 | +
|
| 43 | + Set VERISIMDB_URL environment variable (default: http://localhost:8080). |
| 44 | + """ |
| 45 | + |
| 46 | + @collection "capgw:audit" |
| 47 | + @buffer_table :capgw_verisimdb_buffer |
| 48 | + @flush_interval_ms 5_000 |
| 49 | + @max_buffer 1_000 |
| 50 | + |
| 51 | + # --------------------------------------------------------------------------- |
| 52 | + # Public API |
| 53 | + # --------------------------------------------------------------------------- |
| 54 | + |
| 55 | + @doc "Start the VeriSimDB client GenServer." |
| 56 | + def start_link(opts \\ []) do |
| 57 | + GenServer.start_link(__MODULE__, opts, name: __MODULE__) |
| 58 | + end |
| 59 | + |
| 60 | + @doc "Append an allow decision to the audit log." |
| 61 | + def audit_allow(path, verb, trust, backend, policy_ref, latency_us) do |
| 62 | + entry = build_entry(:allow, path, verb, trust, backend: backend, policy_ref: policy_ref, latency_us: latency_us) |
| 63 | + GenServer.cast(__MODULE__, {:audit, entry}) |
| 64 | + end |
| 65 | + |
| 66 | + @doc "Append a deny decision to the audit log." |
| 67 | + def audit_deny(path, verb, trust, policy_ref) do |
| 68 | + entry = build_entry(:deny, path, verb, trust, policy_ref: policy_ref) |
| 69 | + GenServer.cast(__MODULE__, {:audit, entry}) |
| 70 | + end |
| 71 | + |
| 72 | + @doc "Append a circuit-open event to the audit log." |
| 73 | + def audit_circuit_open(backend) do |
| 74 | + entry = build_entry(:circuit_open, nil, nil, nil, backend: backend) |
| 75 | + GenServer.cast(__MODULE__, {:audit, entry}) |
| 76 | + end |
| 77 | + |
| 78 | + @doc "Append a rate-limit event to the audit log." |
| 79 | + def audit_rate_limited(path, verb, trust) do |
| 80 | + entry = build_entry(:rate_limited, path, verb, trust, []) |
| 81 | + GenServer.cast(__MODULE__, {:audit, entry}) |
| 82 | + end |
| 83 | + |
| 84 | + @doc "Retrieve recent audit entries for a given time range (ISO-8601 strings)." |
| 85 | + def get_range(from_iso, to_iso) do |
| 86 | + GenServer.call(__MODULE__, {:get_range, from_iso, to_iso}) |
| 87 | + end |
| 88 | + |
| 89 | + # --------------------------------------------------------------------------- |
| 90 | + # GenServer callbacks |
| 91 | + # --------------------------------------------------------------------------- |
| 92 | + |
| 93 | + @impl true |
| 94 | + def init(_opts) do |
| 95 | + :ets.new(@buffer_table, [:named_table, :ordered_set, :public]) |
| 96 | + schedule_flush() |
| 97 | + {:ok, %{base_url: base_url(), healthy: false, buffer_size: 0}} |
| 98 | + end |
| 99 | + |
| 100 | + @impl true |
| 101 | + def handle_cast({:audit, entry}, state) do |
| 102 | + doc_id = Map.fetch!(entry, :id) |
| 103 | + case put_entry(state.base_url, doc_id, entry) do |
| 104 | + :ok -> |
| 105 | + {:noreply, %{state | healthy: true}} |
| 106 | + |
| 107 | + {:error, reason} -> |
| 108 | + Logger.warning("VeriSimDB unavailable (#{inspect(reason)}); buffering audit entry #{doc_id}") |
| 109 | + buffer_entry(doc_id, entry, state.buffer_size) |
| 110 | + {:noreply, %{state | healthy: false, buffer_size: min(state.buffer_size + 1, @max_buffer)}} |
| 111 | + end |
| 112 | + end |
| 113 | + |
| 114 | + @impl true |
| 115 | + def handle_call({:get_range, _from_iso, _to_iso}, _from, state) do |
| 116 | + # Phase 2: prefix-scan by timestamp key not yet available in VeriSimDB v1. |
| 117 | + # Return empty list for now; Hypatia scans the collection directly. |
| 118 | + {:reply, {:ok, []}, state} |
| 119 | + end |
| 120 | + |
| 121 | + @impl true |
| 122 | + def handle_info(:flush_buffer, state) do |
| 123 | + new_size = flush_buffer(state.base_url) |
| 124 | + schedule_flush() |
| 125 | + {:noreply, %{state | buffer_size: new_size}} |
| 126 | + end |
| 127 | + |
| 128 | + # --------------------------------------------------------------------------- |
| 129 | + # Internal helpers |
| 130 | + # --------------------------------------------------------------------------- |
| 131 | + |
| 132 | + defp base_url do |
| 133 | + System.get_env("VERISIMDB_URL", "http://localhost:8080") |
| 134 | + end |
| 135 | + |
| 136 | + defp schedule_flush do |
| 137 | + Process.send_after(self(), :flush_buffer, @flush_interval_ms) |
| 138 | + end |
| 139 | + |
| 140 | + defp build_entry(action, path, verb, trust, extras) do |
| 141 | + ts_ms = System.system_time(:millisecond) |
| 142 | + request_id = :crypto.strong_rand_bytes(6) |> Base.url_encode64(padding: false) |
| 143 | + id = "ts:#{ts_ms}:#{request_id}" |
| 144 | + |
| 145 | + base = %{ |
| 146 | + id: id, |
| 147 | + timestamp: DateTime.utc_now() |> DateTime.to_iso8601(), |
| 148 | + action: action, |
| 149 | + path: path, |
| 150 | + verb: verb, |
| 151 | + trust: trust |
| 152 | + } |
| 153 | + |
| 154 | + Enum.reduce(extras, base, fn {k, v}, acc -> Map.put(acc, k, v) end) |
| 155 | + end |
| 156 | + |
| 157 | + defp put_entry(base_url, doc_id, entry) do |
| 158 | + safe_id = URI.encode_www_form(doc_id) |
| 159 | + url = "#{base_url}/v1/#{@collection}/#{safe_id}" |
| 160 | + case Req.put(url, json: entry) do |
| 161 | + {:ok, %{status: s}} when s in 200..299 -> :ok |
| 162 | + {:ok, %{status: s}} -> {:error, {:http_status, s}} |
| 163 | + {:error, reason} -> {:error, reason} |
| 164 | + end |
| 165 | + end |
| 166 | + |
| 167 | + defp buffer_entry(doc_id, entry, buffer_size) when buffer_size < @max_buffer do |
| 168 | + :ets.insert(@buffer_table, {doc_id, entry}) |
| 169 | + end |
| 170 | + defp buffer_entry(_doc_id, _entry, _full), do: :ok |
| 171 | + |
| 172 | + defp flush_buffer(base_url) do |
| 173 | + entries = :ets.tab2list(@buffer_table) |
| 174 | + Enum.each(entries, fn {doc_id, entry} -> |
| 175 | + case put_entry(base_url, doc_id, entry) do |
| 176 | + :ok -> :ets.delete(@buffer_table, doc_id) |
| 177 | + {:error, _} -> :ok # leave in buffer for next flush |
| 178 | + end |
| 179 | + end) |
| 180 | + :ets.info(@buffer_table, :size) |
| 181 | + end |
| 182 | +end |
0 commit comments