Ce document explique les subtilités de l'architecture multi-threadée de l'application, en se concentrant sur les agents (workers) et leur fonctionnement.
L'application utilise une architecture événementielle basée sur des threads multiples qui communiquent via un ServiceBus. Cette approche découple les composants et permet un traitement parallèle efficace.
L'application s'articule autour de plusieurs threads qui s'exécutent en parallèle :
- Thread Principal : Orchestrateur (CryptoAnalyzer) qui coordonne le workflow et maintient l'état global
- ServiceBus Thread : Gestionnaire central des événements qui reçoit et distribue tous les messages
- Worker Threads : Agents spécialisés qui traitent les tâches en parallèle
- DataFetcher : Récupère les données depuis les APIs externes (CoinGecko, Binance)
- RSICalculator : Effectue les calculs mathématiques du RSI
- DatabaseManager : Gère toutes les interactions avec SQLite
- DisplayAgent : Affiche les résultats finaux
- StatusServer : Serveur HTTP optionnel pour monitorer l'état des workers
Tous ces threads communiquent exclusivement via le ServiceBus, sans jamais s'appeler directement.
Tous les agents héritent de QueueWorkerThread, qui implémente le pattern Producer-Consumer :
class QueueWorkerThread(threading.Thread):
def __init__(self, service_bus, name):
super().__init__(name=name)
self.service_bus = service_bus
self.work_queue = queue.Queue() # ← File thread-safe
self._running = TrueAvantages de cette approche :
- Découplage : Les producteurs (qui ajoutent des tâches) ne bloquent jamais
- Thread-safe :
queue.Queuegère automatiquement la synchronisation - Backpressure naturel : Si un agent est surchargé, sa queue s'allonge mais n'impacte pas les autres
Chaque agent exécute une boucle dans son propre thread :
def run(self):
logger.info(f"Thread '{self.name}' started.")
while self._running:
try:
# Bloque jusqu'à 1 seconde en attendant une tâche
task = self.work_queue.get(timeout=1)
if task is None: # Signal d'arrêt
break
# Décompose et exécute la tâche
method_name, args, kwargs = task
method = getattr(self, method_name)
method(*args, **kwargs)
self.work_queue.task_done() # ← CRUCIAL pour queue.join()
except queue.Empty:
continue # Timeout écoulé, rebouclePoints clés :
timeout=1: Permet de vérifier régulièrement_runningsans bloquer indéfinimenttask_done(): Essentiel pour synchroniser avecqueue.join()si utilisé- Gestion d'erreur : Les exceptions ne tuent pas le thread
Les tâches sont ajoutées via la méthode add_task() :
def add_task(self, method_name: str, *args, **kwargs):
self.work_queue.put((method_name, args, kwargs))Exemple d'utilisation dans DataFetcher :
def _handle_fetch_top_coins_requested(self, event):
# Cette méthode est appelée par le ServiceBus
# Elle délègue le travail lourd au thread du worker
self.add_task("_fetch_top_coins_task", event.n)Le ServiceBus est lui-même un thread qui :
- Reçoit les événements publiés par les composants
- Sérialise/désérialise les payloads (dataclasses → dict → dataclasses)
- Distribue les événements aux handlers abonnés
class ServiceBusBase(threading.Thread):
def __init__(self, url, consumer_name):
super().__init__(name=f"ServiceBus-{consumer_name}")
self._handlers: Dict[str, list[HandlerInfo]] = defaultdict(list)
self._event_schemas: Dict[str, type] = {}Abonnement à un événement :
def subscribe(self, event_name: str, subscriber: Callable):
self._topics.add(event_name)
handler_info = HandlerInfo(handler=subscriber)
self._handlers[event_name].append(handler_info)Publication d'un événement :
def publish(self, event_name: str, payload: Any, producer_name: str):
message = self._prepare_payload(payload, event_name)
self.client.publish(
topic=event_name,
message=message,
producer=producer_name,
message_id=str(uuid.uuid4())
)Le flux de communication suit toujours le même pattern en 4 étapes :
Étape 1 : Publication d'un événement
- Un worker (ex: DataFetcher) termine une tâche et publie un événement
- Appel à
service_bus.publish("TopCoinsFetched", payload, producer_name)
Étape 2 : Réception par le ServiceBus
- Le ServiceBus reçoit l'événement dans son thread dédié
- Sérialise le payload (dataclass → dict)
- Recherche tous les handlers abonnés à cet événement
Étape 3 : Invocation des handlers
- Le ServiceBus appelle chaque handler abonné (ex:
CryptoAnalyzer._handle_top_coins_fetched()) - Important : Les handlers s'exécutent dans le thread du ServiceBus, ils doivent être rapides
Étape 4 : Délégation aux workers
- Le handler ajoute une tâche à la queue d'un worker via
add_task() - Le worker traite la tâche dans son propre thread, de manière asynchrone
Responsabilité : Effectuer des appels réseau (API CoinGecko, Binance)
class DataFetcher(QueueWorkerThread):
def __init__(self, service_bus):
super().__init__(service_bus=service_bus, name="DataFetcher")
self.cg = CoinGeckoAPI()
self.binance = ccxt.binance({"enableRateLimit": True})Gestion des erreurs réseau avec retry :
@retry(
stop=stop_after_attempt(4),
wait=wait_exponential(multiplier=2, min=5, max=30),
retry=retry_if_exception_type(requests.exceptions.RequestException)
)
def fetch_one_page(page_num: int) -> List[dict]:
return self.cg.get_coins_markets(...)Point important : Les opérations I/O sont exécutées dans le thread du DataFetcher, ne bloquant jamais les autres composants.
Responsabilité : Calculs mathématiques intensifs
class RSICalculator(QueueWorkerThread):
def _calculate_rsi_task(self, coin_id_symbol, data, timeframe):
# Calculs pandas/numpy intensifs
delta = valid_data.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=self.periods).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=self.periods).mean()
rs = gain / loss
rsi_series = 100 - (100 / (1 + rs))
# Publication du résultat
self.service_bus.publish("RSICalculated", event, self.__class__.__name__)Isolation CPU : Les calculs se font dans un thread dédié, permettant à d'autres agents de continuer à travailler en parallèle.
Particularité : SQLite nécessite une connexion par thread
class DatabaseManager(QueueWorkerThread):
def __init__(self, db_name="crypto_data.db", service_bus=None):
super().__init__(service_bus=service_bus, name="DatabaseManager")
self.db_name = db_name
self.conn = None # Sera créé dans run()
self._initialized_event = threading.Event() # Synchronisation
def run(self):
# Connexion créée dans le thread du worker
self.conn = sqlite3.connect(self.db_name, check_same_thread=False)
self.cursor = self.conn.cursor()
self._initialize_tables()
self._initialized_event.set() # Signale que la DB est prête
# Appel de la boucle de traitement parente
super().run()
self._close()Pattern de synchronisation :
_initialized_event: Permet aux autres composants d'attendre que la DB soit prête- Toutes les opérations SQL se font dans le thread du DatabaseManager
- Évite les problèmes de concurrence avec SQLite
Responsabilité : Affichage final des résultats
class DisplayAgent(QueueWorkerThread):
def _display_results_and_publish(self, event: FinalResultsReady):
self._display_results(event)
# Signale la fin du traitement
self.service_bus.publish("DisplayCompleted", DisplayCompleted(), self.__class__.__name__)Simplicité : Peu de tâches, mais isolation importante pour ne pas bloquer l'orchestrateur.
def _start_services(self):
all_services = self.services + [self.service_bus]
for service in all_services:
if not service.is_alive():
service.start() # ← Lance le thread
time.sleep(0.5) # Laisse le temps aux threads de s'initialiserdef stop(self, timeout: Optional[float] = 30):
logger.info(f"Stop requested for '{self.name}'.")
# Attend que la queue se vide (avec timeout)
start_time = time.time()
while not self.work_queue.empty():
if timeout and (time.time() - start_time) > timeout:
logger.warning(f"Timeout reached. Force stopping...")
break
time.sleep(0.1)
self._running = False
self.work_queue.put(None) # Signal d'arrêt
if self.is_alive():
self.join(timeout=5) # Attend la fin du threadOrdre d'arrêt : Les services sont arrêtés dans l'ordre inverse de leur démarrage.
class CryptoAnalyzer:
def __init__(self):
self._processing_completed = threading.Event()
def _on_display_completed(self, event):
# Signale au thread principal que tout est terminé
self._processing_completed.set()
def run(self):
self._start_services()
self.start_workflow()
self._processing_completed.wait() # Bloque jusqu'au signal
self._stop_services()Problème potentiel : Handler du ServiceBus qui bloque en attendant une réponse
Solution : Toujours déléguer le travail lourd via add_task()
# ❌ MAUVAIS : Bloque le ServiceBus
def _handle_event(self, event):
result = self.do_heavy_work() # Opération longue
self.service_bus.publish("Result", result)
# ✅ BON : Délègue au thread du worker
def _handle_event(self, event):
self.add_task("_do_heavy_work_task", event)
def _do_heavy_work_task(self, event):
result = self.do_heavy_work()
self.service_bus.publish("Result", result)Si un agent accumule trop de tâches dans sa queue :
def get_status(self) -> dict:
return {
"name": self.name,
"tasks_in_queue": self.work_queue.qsize(), # Monitoring
"last_activity_time": self._last_activity_time
}Solutions possibles :
- Ajouter des threads supplémentaires pour l'agent surchargé
- Implémenter une limite de taille de queue avec blocage
- Ajuster les priorités de traitement
Les handlers d'événements doivent être ultra-rapides car ils s'exécutent dans le thread du ServiceBus :
def _handle_fetch_requested(self, event):
# ✅ Rapide : juste ajouter à la queue
self.add_task("_fetch_task", event.url)
def _fetch_task(self, url):
# ✅ Lent mais isolé : s'exécute dans le thread du worker
response = requests.get(url, timeout=30)Les événements passent par le ServiceBus sous forme de dictionnaires :
@dataclass
class HistoricalPricesFetched:
coin_id_symbol: Tuple[str, str]
prices_df_json: Optional[str] # ← Pandas DataFrame sérialisé en JSON
timeframe: str
# Sérialisation
prices_json = prices_df.to_json(orient="split")
# Désérialisation
prices_df = pd.read_json(StringIO(prices_json), orient="split")Pourquoi ? Les objets volumineux (DataFrames) ne peuvent pas être passés directement entre threads via le bus réseau.
Toujours publier un événement d'échec pour débloquer les compteurs :
try:
result = self.process_coin(coin)
self.service_bus.publish("Success", result)
except Exception as e:
logger.error(f"Failed: {e}")
self.service_bus.publish("CoinProcessingFailed",
CoinProcessingFailed(coin_id_symbol=coin))Problème : État partagé entre threads
# ❌ DANGEREUX
class Agent:
def __init__(self):
self.counter = 0 # État partagé non protégé
def task(self):
self.counter += 1 # Race condition!Solution : Utiliser des locks ou éviter l'état partagé
# ✅ Option 1 : Lock
class Agent:
def __init__(self):
self.counter = 0
self._lock = threading.Lock()
def task(self):
with self._lock:
self.counter += 1
# ✅ Option 2 : État local (préféré)
class Agent:
def task(self):
# Pas d'état partagé, tout passe par événements
result = self.compute()
self.service_bus.publish("Result", result)Voici une séquence typique d'exécution pour analyser une cryptomonnaie :
Phase 1 : Récupération des données de marché
- CryptoAnalyzer publie
FetchTopCoinsRequestedsur le ServiceBus - ServiceBus notifie DataFetcher via son handler
_handle_fetch_top_coins_requested - DataFetcher ajoute la tâche
_fetch_top_coins_taskà sa queue - DataFetcher exécute la tâche dans son thread et appelle l'API CoinGecko
- Une fois terminé, DataFetcher publie
TopCoinsFetchedavec les résultats - ServiceBus notifie CryptoAnalyzer qui traite la liste des cryptos
Phase 2 : Récupération des prix historiques
- CryptoAnalyzer publie
FetchHistoricalPricesRequestedpour chaque crypto et timeframe - DataFetcher récupère les données OHLCV depuis Binance
- DataFetcher publie
HistoricalPricesFetchedavec les données JSON sérialisées - ServiceBus notifie deux handlers en parallèle :
- CryptoAnalyzer pour continuer le workflow
- DatabaseManager pour sauvegarder les prix (via
_db_save_prices)
Phase 3 : Calcul du RSI
- CryptoAnalyzer publie
CalculateRSIRequestedavec les prix - RSICalculator reçoit l'événement et ajoute
_calculate_rsi_taskà sa queue - RSICalculator effectue les calculs pandas/numpy dans son thread
- RSICalculator publie
RSICalculatedavec les résultats sérialisés - Deux handlers sont notifiés :
- CryptoAnalyzer pour analyser la corrélation
- DatabaseManager pour sauvegarder les RSI (via
_db_save_rsi)
Phase 4 : Analyse et affichage
- CryptoAnalyzer calcule les corrélations et publie
CorrelationAnalyzed - DatabaseManager sauvegarde les corrélations dans SQLite
- Quand tous les timeframes sont terminés, CryptoAnalyzer publie
FinalResultsReady - DisplayAgent affiche les résultats et publie
DisplayCompleted - CryptoAnalyzer reçoit le signal et arrête gracieusement tous les services
L'architecture multi-threadée de cette application offre :
- Performances : Traitement parallèle des tâches I/O et CPU-bound
- Résilience : Isolation des erreurs par thread
- Modularité : Ajout/suppression de workers sans impact sur les autres
- Clarté : Communication explicite via événements typés
Le pattern QueueWorkerThread + ServiceBus est un excellent choix pour des applications événementielles nécessitant du traitement asynchrone distribué sur plusieurs workers.