-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
142 lines (115 loc) · 4.42 KB
/
main.py
File metadata and controls
142 lines (115 loc) · 4.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""
Main FastAPI application
"""
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.routes import router
from app.config import settings
from app.middleware import LoggingMiddleware, SecurityHeadersMiddleware
from app.services.mongodb import mongodb_service
from app.services.telegram import telegram_service
from app.utils.session_manager import session_manager
# Configure logging: DEBUG verbosity when the debug setting is truthy,
# INFO otherwise.
_log_level = logging.DEBUG if settings.debug else logging.INFO
logging.basicConfig(
    level=_log_level,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

# Module-level logger used by the lifespan hooks below.
logger = logging.getLogger(__name__)
async def _deactivate_session_safely(session_id):
    """Deactivate *session_id*, logging (not raising) if that fails.

    Keeping the failure local means one session that cannot be deactivated
    does not stop the remaining sessions from being processed.
    """
    try:
        await session_manager.deactivate_session(session_id)
    except Exception:  # pragma: no cover - defensive logging
        logger.exception("Failed to deactivate session %s", session_id)


async def _reconnect_session(session_id, agent_id):
    """Reconnect one stored session and register its message handler.

    The session is deactivated when the reconnect reports failure or when
    reconnecting / registering the handler raises.
    """
    try:
        success = await telegram_service.reconnect_client(session_id)
        if success:
            logger.info("Reconnected session: %s", session_id)
            await telegram_service.setup_message_handler(
                session_id=session_id,
                agent_id=agent_id,
            )
            return
        logger.warning("Failed to reconnect session: %s", session_id)
    except Exception as exc:  # pragma: no cover - defensive logging
        # logger.exception keeps the traceback alongside the message.
        logger.exception("Error reconnecting session %s: %s", session_id, exc)
    await _deactivate_session_safely(session_id)


async def _reconnect_active_sessions():
    """Best-effort reconnect of every active session at startup.

    Any error is logged and swallowed so that startup can continue.
    """
    try:
        active_sessions = await session_manager.get_all_active_sessions()
        logger.info(
            "Found %s active sessions to reconnect", len(active_sessions))
        for session in active_sessions:
            session_id = session.get("session_id")
            if not session_id:
                # Nothing to reconnect without an identifier.
                continue
            await _reconnect_session(session_id, session.get("agent_id"))
    except Exception as exc:  # pragma: no cover - defensive logging
        logger.exception("Error during startup reconnection: %s", exc)


async def _shutdown_connections():
    """Disconnect all Telegram clients, then MongoDB, logging any errors."""
    # Iterate over a snapshot so the mapping may change while we await.
    for session_id, client in list(telegram_service.clients.items()):
        try:
            if client.is_connected():
                await client.disconnect()
                logger.info("Disconnected session: %s", session_id)
        except Exception as exc:  # pragma: no cover - defensive logging
            logger.exception(
                "Error disconnecting session %s: %s", session_id, exc)

    try:
        await mongodb_service.disconnect()
        logger.info("Disconnected from MongoDB")
    except Exception as exc:  # pragma: no cover - defensive logging
        logger.exception("Error disconnecting from MongoDB: %s", exc)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifespan context manager for startup and shutdown events.

    Startup: connect to MongoDB, hand the connection to the session manager,
    then try to reconnect every active session. A MongoDB connection failure
    is logged and re-raised; reconnection problems are only logged.

    Shutdown: disconnect all Telegram clients and MongoDB. This runs in a
    ``finally`` block so it also happens when an exception or cancellation
    is raised at the ``yield`` or during session reconnection.
    """
    logger.info("Starting Telegram Service...")

    # Connect to MongoDB and initialize session manager
    try:
        await mongodb_service.connect()
        session_manager.set_mongodb_service(mongodb_service)
        logger.info("Connected to MongoDB and initialized session manager")
    except Exception as exc:
        logger.error("Failed to connect to MongoDB: %s", exc)
        raise

    try:
        # Reconnect existing sessions
        await _reconnect_active_sessions()
        logger.info("Telegram Service started successfully")

        # Application is running
        yield
    finally:
        # Shutdown sequence
        logger.info("Shutting down Telegram Service...")
        await _shutdown_connections()
        logger.info("Telegram Service shut down successfully")
# Origins allowed to make cross-origin requests. "*" admits every origin;
# replace it with explicit origins (e.g. "https://example.com") to tighten.
CORS_ALLOWED_ORIGINS = ["*"]

# Create FastAPI app
app = FastAPI(
    title="Telegram Service API",
    description="A Python service for managing Telegram connections and messages",
    version="1.0.0",
    lifespan=lifespan,
)

# CORS. Credentialed requests are only enabled when the origin list is
# explicit: the FastAPI CORS documentation states that allow_origins cannot
# be ["*"] when allow_credentials is True.
app.add_middleware(
    CORSMiddleware,
    allow_origins=CORS_ALLOWED_ORIGINS,
    allow_credentials="*" not in CORS_ALLOWED_ORIGINS,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Custom middlewares
app.add_middleware(LoggingMiddleware)
app.add_middleware(SecurityHeadersMiddleware)

# Routes
app.include_router(router)
@app.get("/")
async def root():
    """Return a static descriptor with the service name, version and status."""
    service_info = dict(
        service="Telegram Service",
        version="1.0.0",
        status="running",
    )
    return service_info
@app.get("/health")
async def health_check():
    """Report a healthy status and the size of ``telegram_service.clients``."""
    return {
        "status": "healthy",
        # Number of entries currently held in the Telegram service's client map.
        "active_sessions": len(telegram_service.clients),
    }
if __name__ == "__main__":
    # Run the app under uvicorn when this module is executed as a script.
    import uvicorn

    # Host, port and reload flag all come from the application settings.
    server_options = {
        "host": settings.host,
        "port": settings.port,
        "reload": settings.debug,
    }
    uvicorn.run("app.main:app", **server_options)