Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,30 @@ make validate-config
- `scripts/host-egress-owner.sh`: upstream proxy and DoH IPs
- `scripts/host-nftables.sh`: bridge interface name and infra CIDRs

## Chain visual

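Set `logging.chain_visual: true` in your config to get a terminal-friendly
proxychains-style display on stderr. It prints once on startup (topology only,
ending in `...` because no probe has run yet), again after the first health
probe, and again whenever the per-hop health state changes. After a probe the
output looks like this: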

```
[egressd] |S-chain|-<>-proxy1:3128-<>-proxy2:3128-<>-OK
[egressd] hop_0: proxy1:3128 OK 42ms
[egressd] hop_1: proxy2:3128 OK 38ms
```

When a hop is unreachable the connector immediately before that hop flips to
`-XX-` and the chain line ends with `FAIL`:

```
[egressd] |S-chain|-<>-proxy1:3128-XX-proxy2:3128-<>-FAIL
[egressd] hop_0: proxy1:3128 OK 42ms
[egressd] hop_1: proxy2:3128 FAIL Connection refused
```

The visual is disabled by default (`logging.chain_visual: false`) so it does not
interfere with JSON log pipelines.

## Maintenance and cleanup

Run repository maintenance checks (unfinished markers, backup files, stale artifacts, stray cache dirs, and unexpected embedded git repos) for first-party code:
Expand Down
3 changes: 3 additions & 0 deletions egressd/config.host.example.json5
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
logging: {
level: "INFO",
json: true,
// Set chain_visual to true for a terminal-friendly proxychains-style
// display showing each hop and its health status.
chain_visual: false,
},

supervisor: {
Expand Down
3 changes: 3 additions & 0 deletions egressd/config.json5
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
logging: {
level: "INFO",
json: true,
// Set chain_visual to true for a terminal-friendly proxychains-style
// display showing each hop and its health status.
chain_visual: false,
},

supervisor: {
Expand Down
107 changes: 107 additions & 0 deletions egressd/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,14 +510,120 @@ def wait_for_chain_ready(cfg: Dict[str, Any]) -> None:
raise RuntimeError("shutdown requested")


def _extract_hop_label(hop: Any) -> str:
"""Return a sanitized ``host[:port]`` label for a hop config entry."""
raw_url = hop.get("url", "") if isinstance(hop, dict) else ""
if not raw_url:
return ""

try:
parsed = urlparse(raw_url)
except (ValueError, AttributeError):
# Never return the raw URL to avoid leaking credentials.
return ""

host = parsed.hostname or ""
port = parsed.port

# Derive an effective port similar to connectivity probing defaults:
# - 80 for HTTP/WS when no explicit port is provided
# - 443 for HTTPS/WSS when no explicit port is provided
if port is None:
if parsed.scheme in ("https", "wss"):
port = 443
elif parsed.scheme in ("http", "ws"):
port = 80

if host and port:
return f"{host}:{port}"
if host:
return host

# Fall back to an empty label rather than exposing raw URL/userinfo.
return ""
def _all_hops_ok(hops: List[Any], hop_statuses: Dict[str, Any]) -> bool:
"""Return True only when every hop in *hops* has a passing status entry."""
return bool(hops) and all(
bool(hop_statuses.get(f"hop_{idx}", {}).get("ok", False))
for idx in range(len(hops))
)


def format_chain_visual(cfg: Dict[str, Any], hop_statuses: Optional[Dict[str, Any]] = None) -> str:
    """Return a terminal-friendly proxychains-style ASCII chain visualization.

    When *hop_statuses* is None the output shows the configured topology with
    a trailing ``...`` to indicate that probing has not run yet. When
    *hop_statuses* is provided (including an empty dict) the connectors and
    final token reflect the current probe results, and a per-hop detail line
    is appended for each hop.

    Args:
        cfg: Configuration mapping; hops are read from ``cfg["chain"]["hops"]``.
        hop_statuses: Optional mapping of ``hop_<index>`` to a status dict.
            The keys read from each status dict are ``ok``, ``elapsed_ms``,
            ``error`` and ``status_line``.

    Returns:
        One or more newline-joined lines, each prefixed with ``[egressd]``.
        With no hops configured a single placeholder line is returned.
    """
    # ``or`` also covers an explicit null ``chain`` / ``hops`` in the config.
    chain_cfg = cfg.get("chain") or {}
    hops = chain_cfg.get("hops") or []

    if not hops:
        return "[egressd] chain: (no hops configured)"

    # Decide "probed" once so the chain line and the detail lines always
    # agree; an empty status dict still means a probe ran and found nothing.
    probed = hop_statuses is not None
    labels = [_extract_hop_label(hop) for hop in hops]

    segments: List[str] = ["|S-chain|"]
    for idx, label in enumerate(labels):
        if probed:
            ok = bool(hop_statuses.get(f"hop_{idx}", {}).get("ok", False))
            connector = "-<>-" if ok else "-XX-"
        else:
            connector = "-<>-"
        segments.append(f"{connector}{label}")

    if probed:
        final = "-<>-OK" if _all_hops_ok(hops, hop_statuses) else "-<>-FAIL"
    else:
        final = "-<>-..."

    lines = [f"[egressd] {''.join(segments)}{final}"]

    if probed:
        for idx, label in enumerate(labels):
            status = hop_statuses.get(f"hop_{idx}", {})
            if status.get("ok", False):
                elapsed_ms = status.get("elapsed_ms")
                timing = f"{elapsed_ms}ms" if elapsed_ms is not None else "ok"
                lines.append(f"[egressd] hop_{idx}: {label:<30} OK {timing}")
            else:
                err_msg = status.get("error") or status.get("status_line") or "unreachable"
                # Keep the detail to one line; an error object that
                # stringifies to "" has no lines at all.
                err_lines = str(err_msg).splitlines()
                reason = err_lines[0] if err_lines else "unreachable"
                lines.append(f"[egressd] hop_{idx}: {label:<30} FAIL {reason}")

    return "\n".join(lines)


def print_chain_visual(cfg: Dict[str, Any], hop_statuses: Optional[Dict[str, Any]] = None) -> None:
    """Write the chain visual to stderr if ``logging.chain_visual`` is enabled.

    The flag is read from ``cfg["logging"]["chain_visual"]`` and interpreted
    by ``_as_bool`` with a default of ``False``; when it is off nothing is
    written. The output is flushed immediately.
    """
    logging_cfg = cfg.get("logging", {})
    enabled = _as_bool(logging_cfg.get("chain_visual"), default=False)
    if enabled:
        rendered = format_chain_visual(cfg, hop_statuses)
        print(rendered, file=sys.stderr, flush=True)


def hop_health_loop(cfg: Dict[str, Any]) -> None:
    """Probe hop health periodically until ``STOP_EVENT`` is set.

    Each pass collects per-hop statuses for the configured canary target,
    publishes them with ``set_hop_statuses``, refreshes the ready state, and
    prints the chain visual when the per-hop picture has changed.

    Args:
        cfg: Configuration mapping. ``supervisor.hop_check_interval_s``
            (default 5) sets the delay between passes and
            ``chain.canary_target`` is handed to the status collector.
    """
    interval_s = int(cfg.get("supervisor", {}).get("hop_check_interval_s", 5))
    target = str(cfg.get("chain", {}).get("canary_target", ""))
    # Per-hop ok flags from the previous pass. ``None`` never equals a list,
    # so the first pass always prints.
    last_hop_oks: Optional[List[bool]] = None
    while not STOP_EVENT.is_set():
        checked_at = int(time.time())
        statuses = collect_hop_statuses(cfg, target)
        set_hop_statuses(statuses, checked_at=checked_at)
        refresh_ready_state(cfg, now=checked_at)
        hops = cfg.get("chain", {}).get("hops", [])
        # Compare every hop rather than one aggregate flag: otherwise a
        # change in which hop is failing is hidden while the chain as a
        # whole stays unhealthy.
        current_hop_oks = [
            bool(statuses.get(f"hop_{idx}", {}).get("ok", False))
            for idx in range(len(hops))
        ]
        if current_hop_oks != last_hop_oks:
            print_chain_visual(cfg, statuses)
        last_hop_oks = current_hop_oks
        STOP_EVENT.wait(interval_s)


Expand Down Expand Up @@ -640,6 +746,7 @@ def main(argv: Optional[List[str]] = None) -> int:

STOP_EVENT.clear()
reset_state(cfg)
print_chain_visual(cfg)
server = run_health_server(
cfg.get("supervisor", {}).get("health_bind", "0.0.0.0"),
int(cfg.get("supervisor", {}).get("health_port", 9191)),
Expand Down
106 changes: 106 additions & 0 deletions tests/test_supervisor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import io
import json
import sys
import types
Expand Down Expand Up @@ -65,5 +66,110 @@ def test_compute_readiness_relaxed_mode_requires_at_least_one_healthy_hop(self):
self.assertEqual([], readiness["reasons"])


class ChainVisualTests(unittest.TestCase):
    """Tests for ``supervisor.format_chain_visual`` and ``print_chain_visual``."""

    def _cfg(self, hops=None, canary="exitserver:9999"):
        """Build a minimal config; a two-hop chain is used when *hops* is None."""
        # ``is None`` rather than ``or`` so an explicit empty list is kept.
        if hops is None:
            hops = [{"url": "http://proxy1:3128"}, {"url": "http://proxy2:3128"}]
        return {
            "chain": {
                "hops": hops,
                "canary_target": canary,
            },
        }

    def test_topology_only_uses_pending_suffix(self):
        """Without hop_statuses the visual ends with '...' to signal no probe yet."""
        visual = supervisor.format_chain_visual(self._cfg())
        self.assertIn("[egressd]", visual)
        self.assertIn("|S-chain|", visual)
        self.assertIn("...", visual)
        self.assertNotIn("OK", visual)
        self.assertNotIn("FAIL", visual)

    def test_all_hops_ok_produces_ok_suffix(self):
        """When all hops are healthy the final token is 'OK'."""
        statuses = {
            "hop_0": {"ok": True, "elapsed_ms": 42},
            "hop_1": {"ok": True, "elapsed_ms": 38},
        }
        visual = supervisor.format_chain_visual(self._cfg(), statuses)
        self.assertIn("-<>-OK", visual)
        self.assertNotIn("-XX-", visual)
        self.assertNotIn("FAIL", visual)

    def test_failed_hop_produces_fail_suffix_and_xx_connector(self):
        """A failed hop uses '-XX-' connector and the line ends with 'FAIL'."""
        statuses = {
            "hop_0": {"ok": True, "elapsed_ms": 42},
            "hop_1": {"ok": False, "error": "Connection refused"},
        }
        visual = supervisor.format_chain_visual(self._cfg(), statuses)
        self.assertIn("-XX-", visual)
        self.assertIn("FAIL", visual)
        self.assertNotIn("-<>-OK", visual)

    def test_hop_labels_appear_in_chain_line(self):
        """Each hop hostname:port must appear in the main chain line."""
        visual = supervisor.format_chain_visual(self._cfg())
        lines = visual.splitlines()
        chain_line = lines[0]
        self.assertIn("proxy1:3128", chain_line)
        self.assertIn("proxy2:3128", chain_line)

    def test_hop_labels_never_include_credentials(self):
        """Userinfo embedded in a hop URL must not appear in the visual."""
        cfg = self._cfg(hops=[{"url": "http://user:secret@proxy1:3128"}])
        visual = supervisor.format_chain_visual(cfg)
        self.assertIn("proxy1:3128", visual)
        self.assertNotIn("user", visual)
        self.assertNotIn("secret", visual)

    def test_per_hop_detail_lines_present_when_statuses_provided(self):
        """After the chain line there is one detail line per hop."""
        statuses = {
            "hop_0": {"ok": True, "elapsed_ms": 42},
            "hop_1": {"ok": False, "error": "timeout"},
        }
        visual = supervisor.format_chain_visual(self._cfg(), statuses)
        lines = visual.splitlines()
        # chain line + 2 detail lines
        self.assertEqual(len(lines), 3)
        self.assertIn("hop_0", lines[1])
        self.assertIn("hop_1", lines[2])
        self.assertIn("OK", lines[1])
        self.assertIn("FAIL", lines[2])

    def test_no_hops_returns_safe_message(self):
        """An empty hops list must not raise; it returns a safe message."""
        cfg = self._cfg(hops=[], canary="x:9")
        visual = supervisor.format_chain_visual(cfg)
        self.assertIn("no hops", visual)

    def test_single_hop_chain(self):
        """A single-hop chain produces exactly one hop label and no '-XX-'."""
        cfg = {"chain": {"hops": [{"url": "http://solo:3128"}], "canary_target": "t:80"}}
        statuses = {"hop_0": {"ok": True, "elapsed_ms": 10}}
        visual = supervisor.format_chain_visual(cfg, statuses)
        self.assertIn("solo:3128", visual)
        self.assertIn("-<>-OK", visual)
        self.assertNotIn("-XX-", visual)

    def _capture_stderr(self, fn, *args, **kwargs) -> str:
        """Call *fn* with redirected stderr and return whatever was written."""
        # Imported locally so this helper does not depend on the module's
        # top-level import list.
        import contextlib

        buf = io.StringIO()
        # redirect_stderr restores the previous stream on exit, even if
        # *fn* raises, and keeps the redirected scope to this single call.
        with contextlib.redirect_stderr(buf):
            fn(*args, **kwargs)
        return buf.getvalue()

    def test_print_chain_visual_no_output_when_disabled(self):
        """print_chain_visual must produce no output when chain_visual is false."""
        cfg = self._cfg()
        cfg["logging"] = {"chain_visual": False}
        output = self._capture_stderr(supervisor.print_chain_visual, cfg)
        self.assertEqual(output, "")

    def test_print_chain_visual_writes_to_stderr_when_enabled(self):
        """print_chain_visual must write the visual to stderr when enabled."""
        cfg = self._cfg()
        cfg["logging"] = {"chain_visual": True}
        output = self._capture_stderr(supervisor.print_chain_visual, cfg)
        self.assertIn("[egressd]", output)
        self.assertIn("|S-chain|", output)


if __name__ == "__main__":
unittest.main()
Loading