-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsimple_usage.py
More file actions
314 lines (248 loc) · 9.68 KB
/
simple_usage.py
File metadata and controls
314 lines (248 loc) · 9.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
"""
Exemple simple d'utilisation de la librairie python-pubsub-devtools-consumers.
Ce script démontre l'API simplifiée où toute la logique Flask/HTTP/JSON
est encapsulée dans la librairie.
"""
import time
from python_pubsub_devtools_consumers import (
DevToolsPlayerProxy,
DevToolsRecorderProxy,
find_free_port
)
def example_recorder():
    """Record two sample events in a DevTools recording session.

    Builds a ``DevToolsRecorderProxy`` pointed at ``http://localhost:5556``,
    starts the session ``"example-session"``, records two ``user.*`` events
    attributed to ``"example-service"``, then stops the session.

    Progress and failures are reported on stdout; nothing is returned.
    Both a failed session start and a failed session stop are reported
    explicitly so that neither outcome passes silently.
    """
    print("=== RECORDER EXAMPLE ===\n")

    # Minimal configuration: only the DevTools URL is supplied.
    recorder = DevToolsRecorderProxy(
        devtools_url="http://localhost:5556"
    )

    print("Starting recording session...")
    if not recorder.start_session("example-session"):
        print("✗ Failed to start recording (DevTools not running?)\n")
        return
    print("✓ Recording started\n")

    # NOTE(review): the results of record_event() are not checked here, so
    # the confirmation below is printed regardless of what the calls return.
    print("Recording events...")
    recorder.record_event(
        "user.created",
        {"id": 1, "name": "Alice"},
        "example-service"
    )
    recorder.record_event(
        "user.updated",
        {"id": 1, "email": "alice@example.com"},
        "example-service"
    )
    print("✓ Events recorded\n")

    # Short pause before stopping -- presumably gives the recorder time to
    # flush the events; TODO confirm it is actually required.
    time.sleep(1)
    print("Stopping recording...")
    if recorder.stop_session():
        print("✓ Recording saved\n")
    else:
        # Previously a falsy result here produced no output at all.
        print("✗ Failed to save recording\n")
def example_player():
    """Start a player whose handler prints every event it is given.

    A ``DevToolsPlayerProxy`` named ``"example-consumer"`` is created with a
    local handler, then started. If ``start()`` reports success the function
    idles in a one-second sleep loop until Ctrl+C, at which point it calls
    ``player.unregister()``. If ``start()`` reports failure a message is
    printed and the function returns immediately.
    """
    print("=== PLAYER EXAMPLE (Simplified API) ===\n")

    def handle_event(event_name: str, event_data: dict, source: str) -> bool:
        """Print the received event and report success.

        Args:
            event_name: Name of the event.
            event_data: Event payload (annotated as ``dict``).
            source: Origin of the event.

        Returns:
            Always ``True``; real business logic (persisting, forwarding,
            ...) would go here and return ``False`` on failure.
        """
        print(f"[REPLAY] Event: {event_name}")
        print(f"[REPLAY] From: {source}")
        print(f"[REPLAY] Data: {event_data}")
        print("-" * 50)
        return True

    # Only the consumer name and the handler are passed to the proxy.
    player = DevToolsPlayerProxy(
        consumer_name="example-consumer",
        event_handler=handle_event,
    )

    print(f"Starting player on {player.player_url}...")
    if not player.start():
        print("✗ Failed to start player (DevTools not running?)\n")
        return

    print("✓ Player started and registered\n")
    print(f"Player listening on: {player.player_url}")
    print(f"Is registered: {player.is_registered}")
    for message in (
        "\nPlayer is now ready to receive events.",
        "Use DevTools UI to send events.",
        "\nPress Ctrl+C to stop...\n",
    ):
        print(message)

    try:
        # Keep the process alive until the user interrupts it.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\n\nStopping player...")
        player.unregister()
        print("✓ Player stopped\n")
def example_custom_business_logic():
    """Show a bound method of a stateful object used as the event handler.

    Defines a small ``EventProcessor`` class, passes its ``process`` method
    to a ``DevToolsPlayerProxy`` named ``"business-consumer"``, and prints
    the player's URL. The player is only constructed here; ``start()`` is
    not called.
    """
    print("=== CUSTOM BUSINESS LOGIC EXAMPLE ===\n")

    class EventProcessor:
        """Counts the events it sees and validates ``user.*`` payloads."""

        def __init__(self):
            # Number of events passed to process(), including rejected ones.
            self.events_processed = 0

        def process(self, event_name: str, event_data: dict, source: str) -> bool:
            """Validate and "process" one event.

            Args:
                event_name: Name of the event.
                event_data: Event payload; must contain ``"id"`` when the
                    event name starts with ``"user."``.
                source: Origin of the event (unused).

            Returns:
                ``False`` when validation fails, ``True`` otherwise.
            """
            self.events_processed += 1
            print(f"Processing event #{self.events_processed}: {event_name}")

            # Business validation: user events must carry an identifier.
            is_user_event = event_name.startswith("user.")
            if is_user_event and "id" not in event_data:
                print(" ✗ Validation failed: missing 'id' field")
                return False

            # Placeholder for real work: persist, forward to another
            # system, call other services, transform the data, ...
            print(" ✓ Event processed successfully")
            return True

    processor = EventProcessor()

    # The bound method carries the processor's state with it.
    player = DevToolsPlayerProxy(
        consumer_name="business-consumer",
        event_handler=processor.process,
    )

    print("Player configured with custom business logic")
    print(f"Player URL: {player.player_url}\n")
def example_error_handling():
    """Show a handler that reports failures by returning ``False``.

    The handler is passed to a ``DevToolsPlayerProxy`` named
    ``"error-handler-consumer"`` and the player's URL is printed. The player
    is only constructed here; ``start()`` is not called.
    """
    print("=== ERROR HANDLING EXAMPLE ===\n")

    def handle_with_errors(event_name: str, event_data: dict, source: str) -> bool:
        """Validate an event and convert any exception into a failure.

        Args:
            event_name: Name of the event; ``"invalid.event"`` is rejected.
            event_data: Event payload; rejected unless it is a ``dict``.
            source: Origin of the event (unused).

        Returns:
            ``True`` on success, ``False`` when the event is rejected or an
            exception is raised while handling it.
        """
        try:
            print(f"Processing: {event_name}")

            # Work out the first rejection reason, if there is one.
            rejection = None
            if event_name == "invalid.event":
                rejection = " ✗ Invalid event type"
            elif not isinstance(event_data, dict):
                rejection = " ✗ Invalid data format"

            if rejection is not None:
                print(rejection)
                return False

            # Normal processing path.
            print(" ✓ Success")
            return True
        except Exception as e:
            # Deliberately broad: the handler signals failure through its
            # return value rather than by raising.
            print(f" ✗ Exception: {e}")
            return False

    player = DevToolsPlayerProxy(
        consumer_name="error-handler-consumer",
        event_handler=handle_with_errors,
    )

    print(f"Player with error handling: {player.player_url}\n")
def example_custom_config():
    """Show a player built with every configuration option spelled out.

    Asks ``find_free_port`` for a port between 20000 and 21000, then
    constructs a ``DevToolsPlayerProxy`` with an explicit DevTools URL,
    host and port, and with ``auto_find_port`` disabled. The resulting URL,
    endpoint and DevTools URL are printed. The player is only constructed
    here; ``start()`` is not called.
    """
    print("=== CUSTOM CONFIGURATION EXAMPLE ===\n")

    # Look for a free port inside a specific range.
    port = find_free_port(start_port=20000, end_port=21000)
    print(f"Found free port: {port}\n")

    def simple_handler(event_name, event_data, source):
        """Print the event name and report success."""
        print(f"Received: {event_name}")
        return True

    # Advanced configuration, gathered in one place before construction.
    settings = {
        "consumer_name": "custom-consumer",
        "event_handler": simple_handler,
        "devtools_url": "http://localhost:5556",
        "player_port": port,
        "player_host": "localhost",
        "auto_find_port": False,
    }
    player = DevToolsPlayerProxy(**settings)

    print("Player configured with custom settings:")
    print(f" - URL: {player.player_url}")
    print(f" - Endpoint: {player.player_endpoint} (auto-generated)")
    print(f" - DevTools: {player.devtools_url}")
    print()
def example_sequential_processing():
    """Run a player created with ``sequential_processing=True``.

    The handler mutates shared state (a counter and a list) after a short
    sleep. The player is started and, on success, the function idles until
    Ctrl+C, then calls ``player.stop()`` and prints the totals gathered by
    the handler. If ``start()`` reports failure a message is printed and
    the function returns.

    NOTE(review): the ordering and race-freedom promised by the printed
    messages are properties of the library's ``sequential_processing``
    option; nothing in this file enforces them.
    """
    print("=== SEQUENTIAL PROCESSING EXAMPLE ===\n")

    class EventCounter:
        """Shared state mutated by the handler on every event."""

        def __init__(self):
            self.count = 0
            self.processed_events = []

        def handle_event(self, event_name: str, event_data: dict, source: str) -> bool:
            """Record one event in the shared state and report success.

            Args:
                event_name: Name of the event; appended to the history.
                event_data: Event payload (unused).
                source: Origin of the event; only printed.

            Returns:
                Always ``True``.
            """
            # Simulate slow processing so overlapping calls would be likely
            # if handlers were invoked concurrently.
            time.sleep(0.1)

            # Read-modify-write on shared state.
            self.count += 1
            self.processed_events.append(event_name)

            print(f"[{self.count}] Processed: {event_name} from {source}")
            return True

    counter = EventCounter()

    # Create the player with sequential processing switched on.
    player = DevToolsPlayerProxy(
        consumer_name="sequential-consumer",
        event_handler=counter.handle_event,
        sequential_processing=True,
    )

    print("Player configured with sequential processing:")
    print(f" - URL: {player.player_url}")
    print(f" - Sequential: {player.sequential_processing}")
    print("\nStarting player...\n")

    if not player.start():
        print("✗ Failed to start player (DevTools not running?)\n")
        return

    for message in (
        "✓ Player started\n",
        "Player is now processing events SEQUENTIALLY.",
        "Events are queued and processed one at a time.",
        "This eliminates race conditions!",
        "\nSend multiple events simultaneously from DevTools",
        "and they will be processed in order.\n",
        "Press Ctrl+C to stop...\n",
    ):
        print(message)

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\n\nStopping player...")
        # The original comment describes stop() as a clean shutdown that
        # waits for the queue -- presumably the library's behaviour.
        player.stop()
        print("✓ Player stopped")
        print(f"\nTotal events processed: {counter.count}")
        print(f"Events: {counter.processed_events}\n")
if __name__ == "__main__":
    # Interactive entry point: show a menu, read one choice from stdin and
    # run the matching example function.
    print("Python PubSub DevTools Consumers - Examples (Simplified API)")
    print("=" * 70)
    print()

    print("Choose an example:")
    for menu_line in (
        "1. Recorder example",
        "2. Player example (Simple handler)",
        "3. Custom business logic",
        "4. Error handling",
        "5. Custom configuration",
        "6. Sequential processing (no race conditions)",
    ):
        print(menu_line)
    print()

    choice = input("Enter choice (1/2/3/4/5/6): ").strip()
    print("\n" + "=" * 70 + "\n")

    # Menu key -> example function.
    examples = {
        "1": example_recorder,
        "2": example_player,
        "3": example_custom_business_logic,
        "4": example_error_handling,
        "5": example_custom_config,
        "6": example_sequential_processing,
    }
    selected = examples.get(choice)
    if selected is None:
        print("Invalid choice!")
    else:
        selected()

    print("=" * 70)