# Where the edge server reads its settings from.
SERVER_SETTINGS_FILE = SCRIPT_DIR / "src" / "server" / "settings.yaml"

# All simulation output lands here; create it up front if missing.
RESULTS_DIR = SCRIPT_DIR / "simulated_results"
RESULTS_DIR.mkdir(exist_ok=True)

# Use the same python interpreter that's running this script
PYTHON_EXECUTABLE = sys.executable
4043
4144# Simulation parameters
4245SIMULATION_SCENARIOS = [
4346
44- # Scenario 8: Multiple clients (no delay )
47+ # Scenario 8: Heavy device delay test (device slow, edge fast )
4548 {
4649 "name" : "heavy_delay" ,
4750 "edge_computation_delay" : {"enabled" : False },
4851 "device_computation_delay" : {
49- "enabled" : False ,
50- "type" : "gaussian" ,
51- "mean" : 0.01 ,
52- "std_dev" : 0.0005
52+ "enabled" : True ,
53+ "type" : "static" ,
54+ "value" : 0.01 # 10ms per layer = 590ms total for 59 layers
5355 },
54- "network_delay" : {"enabled" : True ,
55- "type " : "gaussian" ,
56- "mean " : 1 ,
57- "std_dev " : 0.02
56+ "network_delay" : {
57+ "enabled " : True ,
58+ "type " : "static" ,
59+ "value " : 0.02 # 20ms network delay
5860 },
5961 "duration_seconds" : 300 ,
6062 "num_clients" : 1
@@ -282,17 +284,21 @@ def start_server(self):
282284 server_script = SERVER_DIR / "run_edge.py"
283285
284286 self .server_process = subprocess .Popen (
285- [sys . executable , str (server_script )],
287+ [PYTHON_EXECUTABLE , str (server_script )],
286288 stdout = subprocess .PIPE ,
287289 stderr = subprocess .STDOUT ,
288290 text = True ,
289291 bufsize = 1 ,
290292 cwd = str (SCRIPT_DIR )
291293 )
292294
293- # Start thread to monitor server output
294- self .server_monitor_thread = threading .Thread (target = self ._monitor_server_output , daemon = True )
295- self .server_monitor_thread .start ()
295+ # Start thread to print server output
296+ def print_server_output ():
297+ for line in self .server_process .stdout :
298+ print (f"[SERVER] { line .rstrip ()} " )
299+
300+ server_output_thread = threading .Thread (target = print_server_output , daemon = True )
301+ server_output_thread .start ()
296302
297303 # Wait for server to be ready
298304 max_wait = 15
@@ -319,96 +325,48 @@ def stop_server(self):
319325 self .server_process .kill ()
320326 self .server_process = None
321327
322- def _monitor_server_output (self ):
323- """Monitor server output for inference events"""
324- if not self .server_process :
325- return
326-
327- for line in self .server_process .stdout :
328- # Look for inference completion patterns
329- # Adjust these patterns based on actual server output
330- if "inference" in line .lower () or "layer" in line .lower ():
331- # Try to extract inference data from logs
332- self ._try_capture_inference_event (line )
333-
334- def _try_capture_inference_event (self , log_line ):
335- """Try to capture inference event from log line"""
336- # This is called when we detect potential inference activity
337- # We'll also check files for more reliable data
338- pass
339-
340- def _check_and_record_new_inferences (self ):
341- """Check server data files and record any new inferences"""
328+ def setup_simulation_csv (self , csv_file_path ):
329+ """Setup CSV file in the request handler for direct recording"""
342330 try :
343- # Read the latest inference result directly from the server
344- latest_inference_file = SCRIPT_DIR / "src" / "server" / "latest_inference_result.json"
331+ # Import here to avoid circular imports
332+ import requests
345333
346- if latest_inference_file .exists ():
347- with open (latest_inference_file , 'r' ) as f :
348- inference_data = json .load (f )
349-
350- # Check if this is a new inference (different timestamp)
351- current_timestamp = inference_data .get ('timestamp' )
352- if current_timestamp != self .last_inference_timestamp :
353- self .last_inference_timestamp = current_timestamp
354-
355- # Get the actual measured times from this inference
356- device_layer_times = inference_data .get ('device_layers_inference_time' , [])
357- edge_layer_times = inference_data .get ('edge_layers_inference_time' , [])
358- offloading_layer = inference_data .get ('offloading_layer_index' , - 1 )
359-
360- # Build device_times dict from the actual measurements
361- device_times = {}
362- for i , time_val in enumerate (device_layer_times ):
363- device_times [f'layer_{ i } ' ] = time_val
364-
365- # Build edge_times dict from the actual measurements
366- edge_times = {}
367- if edge_layer_times :
368- # Edge layers start from offloading_layer + 1
369- start_edge_layer = offloading_layer + 1
370- for i , time_val in enumerate (edge_layer_times ):
371- edge_times [f'layer_{ start_edge_layer + i } ' ] = time_val
372-
373- # Record this inference
374- self ._record_inference (device_times , edge_times )
334+ # Call the server endpoint to set up CSV recording
335+ response = requests .post (
336+ f"http://{ self .server_host } :{ self .server_port } /api/set_simulation_csv" ,
337+ json = {"csv_file_path" : str (csv_file_path )},
338+ timeout = 5
339+ )
340+ if response .status_code == 200 :
341+ print (f"✓ Server CSV recording enabled: { csv_file_path } " )
342+ return True
343+ else :
344+ print (f"⚠ Failed to enable server CSV recording: { response .status_code } " )
345+ return False
375346 except Exception as e :
376- pass # Silently ignore errors during monitoring
347+ print (f"⚠ Could not setup server CSV recording: { e } " )
348+ return False
377349
378- def _record_inference (self , device_times , edge_times ):
379- """Record a single inference to CSV"""
380- if not self .csv_writer :
381- return
382-
383- self .inference_count += 1
384-
385- # Calculate statistics
386- device_values = [v for k , v in device_times .items () if k .startswith ('layer_' ) and isinstance (v , (int , float ))]
387- edge_values = [v for k , v in edge_times .items () if k .startswith ('layer_' ) and isinstance (v , (int , float ))]
388-
389- row = {
390- 'inference_id' : self .inference_count ,
391- 'timestamp' : datetime .now ().isoformat (),
392- 'avg_device_time' : sum (device_values ) / len (device_values ) if device_values else 0 ,
393- 'min_device_time' : min (device_values ) if device_values else 0 ,
394- 'max_device_time' : max (device_values ) if device_values else 0 ,
395- 'avg_edge_time' : sum (edge_values ) / len (edge_values ) if edge_values else 0 ,
396- 'min_edge_time' : min (edge_values ) if edge_values else 0 ,
397- 'max_edge_time' : max (edge_values ) if edge_values else 0 ,
398- 'num_device_layers' : len (device_values ),
399- 'num_edge_layers' : len (edge_values ),
400- }
401-
402- self .csv_writer .writerow (row )
403- self .csv_file .flush () # Ensure data is written immediately
350+ def close_simulation_csv (self ):
351+ """Close CSV file in the request handler"""
352+ try :
353+ import requests
354+ response = requests .post (
355+ f"http://{ self .server_host } :{ self .server_port } /api/close_simulation_csv" ,
356+ timeout = 5
357+ )
358+ return response .status_code == 200
359+ except Exception as e :
360+ print (f"⚠ Could not close server CSV: { e } " )
361+ return False
404362
405363 def start_client (self , client_index = 0 ):
406364 """Start a client and monitor its output"""
407365 print (f" Starting client { client_index } ..." )
408366 client_script = CLIENT_DIR / "http_client.py"
409367
410368 process = subprocess .Popen (
411- [sys . executable , str (client_script )],
369+ [PYTHON_EXECUTABLE , str (client_script )],
412370 stdout = subprocess .PIPE ,
413371 stderr = subprocess .STDOUT ,
414372 text = True ,
@@ -428,11 +386,11 @@ def start_client(self, client_index=0):
428386 return process
429387
430388 def _monitor_client_output (self , process , client_index ):
431- """Monitor client output for inference completion """
389+ """Monitor client output ( for logging purposes) """
432390 for line in process .stdout :
433- if "Inference complete" in line or "✓" in line :
434- # Inference completed, check and record
435- self . _check_and_record_new_inferences ()
391+ # Just consume the output to prevent buffer overflow
392+ # Actual recording happens on the server side
393+ pass
436394
437395 def stop_clients (self ):
438396 """Stop all client processes"""
@@ -526,7 +484,7 @@ def run_scenario(self, scenario):
526484
527485 try :
528486 # Create folder for this scenario
529- self .create_scenario_folder (scenario )
487+ csv_file_path = self .create_scenario_folder (scenario )
530488
531489 # Update configurations
532490 self .update_server_config (scenario )
@@ -535,18 +493,18 @@ def run_scenario(self, scenario):
535493 self .start_server ()
536494 time .sleep (3 ) # Extra time for server initialization
537495
496+ # Setup CSV recording on the server side
497+ self .setup_simulation_csv (csv_file_path )
498+
538499 # Start clients
539500 for i in range (scenario ['num_clients' ]):
540501 self .update_client_config (scenario , i )
541502 time .sleep (0.5 ) # Small delay between client starts
542503 self .start_client (i )
543504
544- # Monitor and record inferences during simulation
505+ # Wait for simulation duration
545506 print (f"\n ⏳ Running for { scenario ['duration_seconds' ]} seconds..." )
546- start_time = time .time ()
547- while time .time () - start_time < scenario ['duration_seconds' ]:
548- time .sleep (2 ) # Check every 2 seconds
549- self ._check_and_record_new_inferences ()
507+ time .sleep (scenario ['duration_seconds' ])
550508
551509 # Stop clients
552510 self .stop_clients ()
0 commit comments