forked from ShaerWare/AI_Secretary_System
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbridge_manager.py
More file actions
356 lines (302 loc) · 12.3 KB
/
bridge_manager.py
File metadata and controls
356 lines (302 loc) · 12.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
#!/usr/bin/env python3
"""
Bridge Process Manager for CLI-OpenAI Bridge.
Manages the bridge as a subprocess — start, stop, health check.
The bridge wraps claude CLI into an OpenAI-compatible API on a local port.
In Docker mode (DOCKER_CONTAINER=1), tries subprocess first (if claude CLI
is mounted into the container), then falls back to checking if the bridge
is already running on the host via host.docker.internal.
Usage:
from bridge_manager import bridge_manager
await bridge_manager.start(port=8787, permission_level="chat")
status = bridge_manager.get_status()
await bridge_manager.stop()
"""
import asyncio
import logging
import os
import signal
import subprocess
import sys
import time
from pathlib import Path
from typing import Optional
import httpx
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Directory containing this file; used as the base directory of the project.
BASE_DIR = Path(__file__).parent

# Docker mode is signalled by the environment variable DOCKER_CONTAINER=1.
IS_DOCKER = os.getenv("DOCKER_CONTAINER") == "1"
class BridgeProcessManager:
    """Manage the lifecycle of the CLI-OpenAI Bridge.

    The manager operates in one of two modes:

    * local mode: the bridge runs as a child process (uvicorn) started by
      this manager and tracked through ``self._process``;
    * remote mode (``self._docker_remote`` is True): no child process is
      owned; the manager only points at a bridge that answered a health
      check on ``host.docker.internal``.
    """

    def __init__(self) -> None:
        """Initialise manager state. No process is started here."""
        self._process: Optional[subprocess.Popen] = None
        self._port: int = 8787
        self._started_at: Optional[float] = None
        self._log_file: Path = BASE_DIR / "logs" / "bridge.log"
        self._bridge_dir: Path = BASE_DIR / "services" / "bridge"
        self._permission_level: str = "chat"
        self._docker_remote: bool = False  # True = connected to host bridge

    @property
    def _bridge_host(self) -> str:
        """Bridge host: host.docker.internal only when using remote host bridge."""
        if self._docker_remote:
            return "host.docker.internal"
        return "127.0.0.1"

    @property
    def is_running(self) -> bool:
        """Return True if the bridge is considered alive.

        In remote mode this is always True: the host bridge was verified when
        the connection was made and is not re-checked here. In local mode the
        child process is polled.
        """
        if self._docker_remote:
            return True  # Assumed running (verified on connect)
        process = self._process
        return process is not None and process.poll() is None

    @property
    def pid(self) -> Optional[int]:
        """Return the PID of the local bridge process, or None.

        None is returned in remote mode and whenever no live child exists.
        """
        process = self._process
        if self._docker_remote or process is None or process.poll() is not None:
            return None
        return process.pid

    async def start(
        self,
        port: int = 8787,
        permission_level: str = "chat",
    ) -> dict:
        """
        Start the bridge subprocess.

        Tries subprocess mode first (works locally and in Docker if claude CLI
        is mounted). In Docker, falls back to checking host.docker.internal.

        If a bridge is already running (local child or attached host bridge),
        the current state is reported and nothing is changed: the requested
        port and permission level are NOT applied, and a host connection is
        not dropped.

        Args:
            port: Port to run the bridge on.
            permission_level: Claude permission level (chat, readonly, edit, full).

        Returns:
            Dict with status, message, pid, url.
        """
        # Check before touching any state: overwriting _port for a bridge that
        # keeps listening on the old port would make get_base_url() lie, and
        # resetting _docker_remote would silently drop the host connection.
        if self.is_running:
            return {
                "status": "ok",
                "message": "Bridge already running",
                "pid": self.pid,
                "url": self.get_base_url(),
            }

        self._port = port
        self._permission_level = permission_level

        # Try subprocess first (works locally and in Docker with mounted CLI)
        result = await self._start_local_mode(port, permission_level)
        if result.get("status") == "ok":
            self._docker_remote = False
            return result

        # In Docker: fall back to checking if bridge runs on host
        if IS_DOCKER:
            host_result = await self._start_docker_mode(port)
            if host_result.get("status") == "ok":
                return host_result
            # Return combined error with both failure reasons
            return {
                "status": "error",
                "error": (
                    f"Bridge auto-start failed: {result.get('error')}.\n"
                    f"Host bridge also not found: {host_result.get('error')}.\n"
                    "Mount claude CLI into container (see docker-compose.yml) "
                    "or start bridge on host manually."
                ),
            }

        return result

    async def _start_docker_mode(self, port: int) -> dict:
        """In Docker, check if bridge is reachable on host.

        The health probe targets ``self._port``; ``port`` is used for the
        error message. On success the manager stays in remote mode.
        """
        # Temporarily set remote mode so _bridge_host returns host.docker.internal
        self._docker_remote = True
        healthy = await self._wait_for_health(timeout=5)
        if healthy:
            self._started_at = time.time()
            logger.info("🌉 Bridge detected on host at %s", self.get_base_url())
            return {
                "status": "ok",
                "message": "Bridge detected on host",
                "url": self.get_base_url(),
            }

        # Probe failed: leave remote mode again.
        self._docker_remote = False
        return {
            "status": "error",
            "error": (
                f"Bridge not reachable at http://host.docker.internal:{port}/v1. "
                "Start the bridge on the host machine:\n"
                f" cd services/bridge && python -m uvicorn src.server.main:app "
                f"--host 0.0.0.0 --port {port}"
            ),
        }

    async def _start_local_mode(self, port: int, permission_level: str) -> dict:
        """Start bridge as a local subprocess.

        Args:
            port: Port passed to uvicorn and exported as BRIDGE_PORT.
            permission_level: Exported as CLAUDE_PERMISSION_LEVEL.

        Returns:
            Dict with ``status`` ("ok" or "error") plus ``message``/``pid``/
            ``url`` on success or ``error`` on failure. Failures are reported
            through the dict rather than raised.
        """
        if self.is_running:
            return {
                "status": "ok",
                "message": "Bridge already running",
                "pid": self.pid,
                "url": self.get_base_url(),
            }

        # Verify bridge directory exists
        if not self._bridge_dir.exists():
            return {
                "status": "error",
                "error": f"Bridge directory not found: {self._bridge_dir}",
            }

        # Write .env with current settings
        self._write_env(port, permission_level)

        try:
            # Ensure log directory exists. Done inside the try so that a
            # read-only filesystem yields an error dict instead of raising.
            self._log_file.parent.mkdir(parents=True, exist_ok=True)

            # Build command — use same Python interpreter
            cmd = [
                sys.executable,
                "-m",
                "uvicorn",
                "src.server.main:app",
                "--host",
                "127.0.0.1",
                "--port",
                str(port),
            ]

            # Set environment
            env = os.environ.copy()
            env["BRIDGE_HOST"] = "127.0.0.1"
            env["BRIDGE_PORT"] = str(port)
            env["CLAUDE_PERMISSION_LEVEL"] = permission_level
            env["PYTHONUNBUFFERED"] = "1"
            # Disable Gemini and GPT providers
            env["GEMINI_CLI_PATH"] = ""
            env["GPT_CLI_PATH"] = ""
            # Clear proxy env vars — bridge talks to localhost (Claude CLI),
            # should not inherit VLESS/HTTP proxy from GeminiProvider
            for proxy_key in ("HTTP_PROXY", "HTTPS_PROXY", "http_proxy", "https_proxy"):
                env.pop(proxy_key, None)

            # The child receives its own copy of the log descriptor, so the
            # parent's handle is closed as soon as the process is spawned
            # (otherwise one descriptor leaks per start).
            with open(self._log_file, "a", encoding="utf-8") as log_fd:
                separator = "=" * 60
                log_fd.write(f"\n{separator}\n")
                log_fd.write(f"Bridge starting at {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
                log_fd.write(f"Port: {port}, Permission: {permission_level}\n")
                log_fd.write(f"{separator}\n")
                # Flush the banner before the child starts writing.
                log_fd.flush()

                self._process = subprocess.Popen(
                    cmd,
                    cwd=str(self._bridge_dir),
                    env=env,
                    stdout=log_fd,
                    stderr=subprocess.STDOUT,
                    start_new_session=True,
                )

            self._started_at = time.time()
            logger.info(
                "🌉 Bridge process started (PID: %s, port: %s)", self._process.pid, port
            )

            # Wait for health check
            healthy = await self._wait_for_health(timeout=20)
            if not healthy:
                # Process may have crashed
                if self._process.poll() is not None:
                    return {
                        "status": "error",
                        "error": "Bridge process crashed on startup. Check logs/bridge.log",
                    }
                # NOTE: the child is intentionally left running here; a slow
                # bridge may still come up, and a later start() reports it as
                # already running.
                return {
                    "status": "error",
                    "error": f"Bridge not responding on port {port} after 20s",
                }

            logger.info("✅ Bridge is healthy on http://127.0.0.1:%s", port)
            return {
                "status": "ok",
                "message": "Bridge started successfully",
                "pid": self._process.pid,
                "url": self.get_base_url(),
            }
        except FileNotFoundError:
            return {
                "status": "error",
                "error": "Python interpreter not found for bridge",
            }
        except Exception as e:
            # Boundary handler: report the failure to the caller as a dict.
            logger.exception("❌ Failed to start bridge: %s", e)
            return {"status": "error", "error": str(e)}

    async def stop(self) -> dict:
        """Stop the bridge process gracefully.

        In remote mode this only detaches from the host bridge. In local mode
        the child is asked to terminate, given up to 5 seconds, then killed.

        Returns:
            Dict with ``status`` "ok" and a ``message`` describing what happened.
        """
        if self._docker_remote:
            self._docker_remote = False
            self._started_at = None
            return {
                "status": "ok",
                "message": "Disconnected from host bridge (stop it on the host)",
            }

        process = self._process
        if process is None or process.poll() is not None:
            self._process = None
            self._started_at = None
            return {"status": "ok", "message": "Bridge was not running"}

        pid = process.pid
        logger.info("🛑 Stopping bridge (PID: %s)...", pid)
        try:
            # SIGTERM for graceful shutdown. Popen.terminate() is used instead
            # of os.kill(pid, ...) so an already-reaped PID is never signalled.
            process.terminate()
            # Wait up to 5 seconds
            for _ in range(50):
                if process.poll() is not None:
                    break
                await asyncio.sleep(0.1)
            else:
                # Force kill if still running
                logger.warning("⚠️ Bridge didn't stop gracefully, sending SIGKILL")
                process.kill()
                process.wait(timeout=3)
        except ProcessLookupError:
            pass  # Already dead
        except Exception as e:
            logger.error("Error stopping bridge: %s", e)

        self._process = None
        self._started_at = None
        logger.info("🛑 Bridge stopped")
        return {"status": "ok", "message": "Bridge stopped"}

    def get_status(self) -> dict:
        """Return bridge process status.

        ``url`` is None when the bridge is not running; ``uptime_seconds`` is
        only present while it is running and a start time was recorded.
        """
        running = self.is_running
        status = {
            "is_running": running,
            "port": self._port,
            "url": self.get_base_url() if running else None,
            "pid": self.pid,
            "permission_level": self._permission_level,
            "log_file": str(self._log_file),
            "docker_mode": IS_DOCKER,
        }
        if running and self._started_at:
            status["uptime_seconds"] = int(time.time() - self._started_at)
        return status

    def get_base_url(self) -> str:
        """Return the bridge base URL (OpenAI-compatible /v1 prefix)."""
        return f"http://{self._bridge_host}:{self._port}/v1"

    def _write_env(self, port: int, permission_level: str) -> None:
        """Write/update .env file for the bridge.

        Best-effort: an OSError while writing is ignored because the same
        settings are also passed to the child through environment variables.
        """
        env_path = self._bridge_dir / ".env"
        env_content = (
            "BRIDGE_HOST=127.0.0.1\n"
            f"BRIDGE_PORT={port}\n"
            "BRIDGE_DEBUG=false\n"
            f"CLAUDE_PERMISSION_LEVEL={permission_level}\n"
            "QUEUE_ENABLED=true\n"
            "HEALTH_CHECK_ON_STARTUP=true\n"
            "HIDE_UNAVAILABLE_MODELS=true\n"
            "GEMINI_CLI_PATH=\n"
            "GPT_CLI_PATH=\n"
        )
        try:
            env_path.write_text(env_content, encoding="utf-8")
        except OSError:
            pass  # Read-only filesystem in Docker; config passed via env vars

    async def _wait_for_health(self, timeout: int = 20) -> bool:
        """Wait for bridge /health endpoint to respond.

        Polls every 0.5 s until the endpoint returns HTTP 200, the local
        child process exits, or ``timeout`` seconds have elapsed.

        Args:
            timeout: Overall time budget in seconds.

        Returns:
            True once a 200 response is received, False otherwise.
        """
        url = f"http://{self._bridge_host}:{self._port}/health"
        # Monotonic clock: the deadline must not move if the wall clock is adjusted.
        deadline = time.monotonic() + timeout

        # One client for the whole polling loop. trust_env=False keeps the
        # probe from being routed through HTTP(S)_PROXY settings of this
        # process; the target is always a local/host address.
        async with httpx.AsyncClient(timeout=3.0, trust_env=False) as client:
            while time.monotonic() < deadline:
                try:
                    resp = await client.get(url)
                    if resp.status_code == 200:
                        return True
                except Exception as exc:
                    # Best-effort probe: any failure just means "not ready yet".
                    logger.debug("Bridge health probe failed: %s", exc)

                # Check if local process died
                process = self._process
                if (
                    not self._docker_remote
                    and process is not None
                    and process.poll() is not None
                ):
                    return False

                await asyncio.sleep(0.5)

        return False
# Global singleton: one shared manager instance, created when this module is
# imported. Use it via `from bridge_manager import bridge_manager`.
bridge_manager = BridgeProcessManager()