Skip to content

Commit 8332d02

Browse files
author
bedogni@unimore.it
committed
Discovery
1 parent a56472e commit 8332d02

14 files changed

Lines changed: 717 additions & 94 deletions

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ source .venv/bin/activate # On macOS/Linux
136136
communication:
137137
http:
138138
host: 0.0.0.0
139-
port: 8000
139+
port: 23456
140140
endpoints:
141141
registration: /api/registration
142142
device_input: /api/device_input
@@ -168,7 +168,7 @@ client:
168168
169169
http:
170170
server_host: "0.0.0.0"
171-
server_port: 8000
171+
server_port: 23456
172172
173173
model:
174174
last_offloading_layer: 58

USER_GUIDE.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ communication:
6969

7070
http:
7171
host: "0.0.0.0"
72-
port: 8000
72+
port: 23456
7373
model: "fomo_96x96"
7474
ntp_server: "pool.ntp.org"
7575
endpoints:
@@ -117,7 +117,7 @@ variance_detection:
117117
#### HTTP Client (`server_client_light/client/http_config.yaml`)
118118
```yaml
119119
server:
120-
base_url: "http://localhost:8000"
120+
base_url: "http://localhost:23456"
121121
inference_endpoint: "/inference"
122122
timeout: 5
123123
@@ -278,7 +278,7 @@ python install_tensorflow.py
278278
**2. Server connection refused**
279279
```bash
280280
# Make sure server is running and port is correct
281-
netstat -an | grep 8000 # Check if port 8000 is open
281+
netstat -an | grep 23456 # Check if port 23456 is open
282282
```
283283

284284
**3. Model file not found**

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ dependencies = [
3333
"websockets",
3434
"tensorflow==2.15.0; sys_platform != 'darwin' or platform_machine != 'arm64'",
3535
"tensorflow-macos==2.15.0; sys_platform == 'darwin' and platform_machine == 'arm64'",
36+
"ipykernel>=7.1.0",
37+
"seaborn>=0.13.2",
3638
]
3739

3840
# Define optional dependencies like testing and CUDA support

server_client_light/client/http_client.py

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
import yaml
1010
from pathlib import Path
1111
from delay_simulator import DelaySimulator
12+
import socket
13+
import ipaddress
14+
from concurrent.futures import ThreadPoolExecutor, as_completed
1215

1316
# Get the directory where this script is located
1417
SCRIPT_DIR = Path(__file__).resolve().parent
@@ -22,6 +25,145 @@
2225
DEVICE_ID = config["client"]["device_id"]
2326
SERVER_HOST = config["http"]["server_host"]
2427
SERVER_PORT = config["http"]["server_port"]
28+
29+
# Discovery functions
30+
def get_local_network():
31+
"""Get the local network IP range for scanning."""
32+
try:
33+
# Get local IP address
34+
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
35+
s.connect(("8.8.8.8", 80))
36+
local_ip = s.getsockname()[0]
37+
s.close()
38+
39+
# Convert to network/24 (assuming typical home network)
40+
network = ipaddress.IPv4Network(f"{local_ip}/24", strict=False)
41+
return network, local_ip
42+
except Exception as e:
43+
print(f"Error getting local network: {e}")
44+
return None, None
45+
46+
def check_server_at_ip(ip, port, timeout=1.0):
47+
"""Check if a server is responding at the given IP:port."""
48+
try:
49+
# First try TCP connection
50+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
51+
sock.settimeout(timeout)
52+
result = sock.connect_ex((str(ip), port))
53+
sock.close()
54+
55+
if result == 0:
56+
# Port is open, verify it's our server by trying multiple methods
57+
try:
58+
# Try 1: POST to registration endpoint (proper way)
59+
test_url = f"http://{ip}:{port}{config['http']['endpoints']['registration']}"
60+
response = requests.post(test_url, json={"device_id": "discovery_test"}, timeout=2)
61+
if response.status_code in [200, 201, 400, 409]: # Any reasonable server response
62+
print(f" Found server at {ip}:{port} (registration: {response.status_code})")
63+
return str(ip)
64+
except requests.exceptions.RequestException:
65+
pass
66+
67+
try:
68+
# Try 2: GET request to root or any endpoint
69+
response = requests.get(f"http://{ip}:{port}/", timeout=2)
70+
if response.status_code < 500: # Any response except server error
71+
print(f" Found server at {ip}:{port} (root: {response.status_code})")
72+
return str(ip)
73+
except requests.exceptions.RequestException:
74+
pass
75+
76+
try:
77+
# Try 3: Check offloading_layer endpoint
78+
test_url = f"http://{ip}:{port}{config['http']['endpoints']['offloading_layer']}"
79+
response = requests.get(test_url, timeout=2)
80+
if response.status_code in [200, 400, 404, 405]:
81+
print(f" Found server at {ip}:{port} (offload endpoint: {response.status_code})")
82+
return str(ip)
83+
except requests.exceptions.RequestException:
84+
pass
85+
86+
return None
87+
except Exception as e:
88+
return None
89+
90+
def discover_server(port):
91+
"""Scan local network to discover server."""
92+
print("="*60)
93+
print("SERVER DISCOVERY MODE")
94+
print("="*60)
95+
96+
# First, try common local addresses
97+
print(f"Checking localhost and common addresses on port {port}...\n")
98+
common_hosts = ["127.0.0.1", "localhost", "0.0.0.0"]
99+
100+
for host in common_hosts:
101+
print(f" Trying {host}:{port}...")
102+
result = check_server_at_ip(host, port, timeout=2.0)
103+
if result:
104+
print(f"\nServer found at {result}:{port}")
105+
return result
106+
107+
print("\nNo server on localhost, scanning local network...\n")
108+
109+
network, local_ip = get_local_network()
110+
if not network:
111+
print("ERROR: Could not determine local network")
112+
return None
113+
114+
print(f"Local IP: {local_ip}")
115+
print(f"Scanning network: {network}")
116+
print(f"This may take 10-30 seconds...\n")
117+
118+
# Scan network with threading for speed
119+
found_servers = []
120+
total_hosts = network.num_addresses - 2 # Exclude network and broadcast
121+
checked = 0
122+
123+
with ThreadPoolExecutor(max_workers=50) as executor:
124+
futures = {executor.submit(check_server_at_ip, ip, port): ip
125+
for ip in network.hosts() if str(ip) != local_ip}
126+
127+
for future in as_completed(futures):
128+
checked += 1
129+
if checked % 25 == 0:
130+
print(f"Progress: {checked}/{total_hosts} hosts checked...")
131+
132+
result = future.result()
133+
if result:
134+
found_servers.append(result)
135+
print(f"\nSERVER FOUND: {result}:{port}")
136+
# Cancel remaining futures for faster completion
137+
for f in futures:
138+
f.cancel()
139+
break
140+
141+
print(f"\nScan complete: {checked}/{total_hosts} hosts checked")
142+
143+
if found_servers:
144+
selected = found_servers[0]
145+
print(f"\nSelected server: {selected}:{port}")
146+
return selected
147+
else:
148+
print("\nNo server found on local network")
149+
print(" Please check:")
150+
print(" 1. Server is running")
151+
print(" 2. Server is on the same network")
152+
print(f" 3. Server is listening on port {port}")
153+
return None
154+
155+
# Run discovery if SERVER_HOST is None
156+
if SERVER_HOST is None or SERVER_HOST == "None" or SERVER_HOST == "":
157+
print("\nSERVER_HOST is not configured - starting discovery...\n")
158+
discovered_host = discover_server(SERVER_PORT)
159+
160+
if discovered_host:
161+
SERVER_HOST = discovered_host
162+
print(f"\nUsing discovered server: {SERVER_HOST}:{SERVER_PORT}\n")
163+
else:
164+
print("\nWARNING: No server discovered - will run in LOCAL-ONLY mode")
165+
SERVER_HOST = "localhost" # Fallback to avoid errors
166+
25167
SERVER = f"http://{SERVER_HOST}:{SERVER_PORT}"
26168

27169
MODEL_CONFIG = config["model"]

server_client_light/client/http_config.yaml

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,7 @@ client:
33
device_id: device_01
44
delay_simulation:
55
device_computation:
6-
enabled: true
7-
mean: 0.015
8-
std_dev: 0.003
9-
type: gaussian
6+
enabled: false
107
network:
118
enabled: false
129
http:
@@ -15,8 +12,8 @@ http:
1512
device_input: /api/device_input
1613
offloading_layer: /api/offloading_layer
1714
registration: /api/registration
18-
server_host: 0.0.0.0
19-
server_port: 8000
15+
server_host: None
16+
server_port: 23456
2017
local_inference_mode:
2118
enabled: false
2219
probability: 0.0

server_client_light/client/http_config.yaml.backup

Lines changed: 0 additions & 29 deletions
This file was deleted.

simulation_runner.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
"device_computation_delay": {"enabled": False},
5151
"edge_computation_delay": {"enabled": False},
5252
"network_delay": {"enabled": False},
53-
"duration_seconds": 600,
53+
"duration_seconds": 180,
5454
"num_clients": 1
5555
},
5656

@@ -65,7 +65,7 @@
6565
"std_dev": 0.002 # Low variance for stable high delay
6666
},
6767
"network_delay": {"enabled": False},
68-
"duration_seconds": 600,
68+
"duration_seconds": 180,
6969
"num_clients": 1
7070
},
7171

@@ -80,7 +80,7 @@
8080
},
8181
"edge_computation_delay": {"enabled": False},
8282
"network_delay": {"enabled": False},
83-
"duration_seconds": 600,
83+
"duration_seconds": 180,
8484
"num_clients": 1
8585
},
8686

@@ -95,7 +95,7 @@
9595
"mean": 0.050, # 50ms average
9696
"std_dev": 0.025 # High variance for variable conditions
9797
},
98-
"duration_seconds": 600,
98+
"duration_seconds": 180,
9999
"num_clients": 1
100100
},
101101

@@ -120,7 +120,7 @@
120120
"mean": 0.040, # 40ms average
121121
"std_dev": 0.020 # High variance
122122
},
123-
"duration_seconds": 600,
123+
"duration_seconds": 180,
124124
"num_clients": 1
125125
},
126126
]
@@ -209,6 +209,7 @@ def start_server(self):
209209
"""Start the edge server"""
210210
print("\nStarting server...")
211211
server_script = SERVER_DIR / "run_edge.py"
212+
print(f" Launching server script: {PYTHON_EXECUTABLE}{server_script}")
212213

213214
self.server_process = subprocess.Popen(
214215
[PYTHON_EXECUTABLE, str(server_script)],
@@ -228,7 +229,7 @@ def print_server_output():
228229
server_output_thread.start()
229230

230231
# Wait for server to be ready
231-
max_wait = 15
232+
max_wait = 30
232233
for i in range(max_wait):
233234
try:
234235
response = requests.get(f"http://{self.server_host}:{self.server_port}/docs", timeout=1)

src/server/client_config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
server:
55
http:
66
host: "0.0.0.0"
7-
port: 8000
7+
port: 23456
88
ntp_server: "0.it.pool.ntp.org"
99

1010
websocket:

src/server/edge/edge_initialization.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,19 @@ def run_inference(offloading_layer_index: int, offloading_layer_output: np.array
124124

125125
prediction_data = []
126126
if layer_index == start_layer_index: # if it's the starting layer
127+
# Check if this layer has multiple inputs (skip connection)
128+
if hasattr(layer, '_inbound_nodes') and layer._inbound_nodes:
129+
inbound_node = layer._inbound_nodes[0]
130+
# Check if this layer expects multiple inputs
131+
if isinstance(inbound_node.inbound_layers, list) and len(inbound_node.inbound_layers) > 1:
132+
# This layer has skip connections - we can't process it because we don't have
133+
# all the required inputs from previous layers computed on device
134+
from server.logger.log import logger
135+
logger.warning(f"Cannot start edge processing at layer {layer_index}: layer {layer.name} has skip connections requiring {len(inbound_node.inbound_layers)} inputs from device-side layers")
136+
logger.warning(f"Offloading point {offloading_layer_index} is before a skip connection merge. Returning device output.")
137+
# Return the input as-is since we can't process further
138+
return input_data, []
139+
127140
prediction_data.append(input_data)
128141
else:
129142
# Get the previous layers' output tensor
@@ -158,6 +171,10 @@ def run_inference(offloading_layer_index: int, offloading_layer_output: np.array
158171
if not has_all_inputs:
159172
break
160173

174+
# Check again if we should stop (before calling predict)
175+
if edge_processing_stopped:
176+
break
177+
161178
# Measure inference time for this layer
162179
t0 = time.time()
163180
prediction = model_manager.predict_single_layer(layer_index, start_layer_offset, prediction_data)

src/server/models/model_manager.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,18 @@ def predict_single_layer(self, layer_id: int, layer_offset: int, layer_input_dat
181181
output_details = cached['output_details']
182182

183183
# set input tensor
184+
# Handle both single input and multiple inputs (e.g., from skip connections)
185+
if not isinstance(layer_input_data, list):
186+
# If layer_input_data is not a list, wrap it in one
187+
layer_input_data = [layer_input_data]
188+
189+
# Verify we have the correct number of inputs
190+
if len(layer_input_data) != len(input_details):
191+
logger.error(f"Input mismatch: model expects {len(input_details)} inputs but received {len(layer_input_data)}")
192+
logger.error(f"Input shapes expected: {[detail['shape'] for detail in input_details]}")
193+
logger.error(f"Input data shapes received: {[data.shape if hasattr(data, 'shape') else type(data) for data in layer_input_data]}")
194+
raise ValueError(f"Model expects {len(input_details)} inputs but received {len(layer_input_data)}")
195+
184196
for i, input_detail in enumerate(input_details):
185197
interpreter.set_tensor(input_detail['index'], layer_input_data[i].reshape(input_detail['shape']))
186198

0 commit comments

Comments (0)