Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions repeater/data_acquisition/mqtt_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@ def __init__(self, mqtt_config: dict, node_name: str = "unknown", node_id: str =
self.available = MQTT_AVAILABLE
self._init_client()

def _get_status_topic(self) -> str:
base_topic = self.mqtt_config.get("base_topic", "meshcore/repeater")
return f"{base_topic}/{self.node_name}/status"

def _build_offline_message(self) -> str:
return json.dumps({
"status": "offline",
"origin": self.node_name,
"origin_id": self.node_id,
})

def _init_client(self):
if not self.available or not self.mqtt_config.get("enabled", False):
logger.info("MQTT disabled or not available")
Expand All @@ -34,6 +45,15 @@ def _init_client(self):
password = self.mqtt_config.get("password")
if username:
self.client.username_pw_set(username, password)

# Set Last Will and Testament so status goes "offline" on disconnect
status_topic = self._get_status_topic()
self.client.will_set(
status_topic,
payload=self._build_offline_message(),
qos=1,
retain=True,
)

broker = self.mqtt_config.get("broker", "localhost")
port = self.mqtt_config.get("port", 1883)
Expand Down Expand Up @@ -85,6 +105,41 @@ def publish(self, record: dict, record_type: str):
except Exception as e:
logger.error(f"Failed to publish to MQTT: {e}")

def publish_status(self, status: str = "online", stats: dict = None):
"""Publish a status message with optional radio/device stats.

The message format matches meshcoretomqtt so downstream tools
like CoreScope can parse it:

{
"status": "online",
"origin": "<node_name>",
"origin_id": "<node_id>",
"stats": { ... }
}
"""
if not self.client:
return

try:
topic = self._get_status_topic()

payload = {
"status": status,
"origin": self.node_name,
"origin_id": self.node_id,
}

if stats:
payload["stats"] = stats

message = json.dumps(payload, default=str)
self.client.publish(topic, message, qos=1, retain=True)
logger.debug(f"Published status '{status}' to {topic}")

except Exception as e:
logger.error(f"Failed to publish status to MQTT: {e}")

def close(self):
if self.client:
self.client.loop_stop()
Expand Down
46 changes: 46 additions & 0 deletions repeater/data_acquisition/storage_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,52 @@ def record_noise_floor(self, noise_floor_dbm: float):
self.sqlite_handler.store_noise_floor(noise_record)
self.mqtt_handler.publish(noise_record, "noise_floor")

def get_device_stats(self) -> dict:
"""Gather radio and device stats for MQTT status messages.

Returns a dict compatible with the meshcoretomqtt stats format::

{
"battery_mv": ...,
"uptime_secs": ...,
"noise_floor": ...,
"tx_air_secs": ...,
"rx_air_secs": ...,
"recv_errors": ...
}

Fields that cannot be determined are omitted.
"""
stats: Dict[str, Any] = {}

if self.repeater_handler:
stats["uptime_secs"] = int(time.time() - self.repeater_handler.start_time)

# Noise floor from radio
noise_floor = self.repeater_handler.get_noise_floor()
if noise_floor is not None:
stats["noise_floor"] = noise_floor

# Airtime stats (convert ms -> secs)
airtime = self.repeater_handler.airtime_mgr.get_stats()
total_tx_ms = airtime.get("total_airtime_ms", 0)
stats["tx_air_secs"] = int(total_tx_ms / 1000)

# rx_air_secs and recv_errors are not tracked by pyMC_Repeater
# today, but we include the keys so consumers can detect their
# absence vs. zero.
stats["rx_air_secs"] = 0
stats["recv_errors"] = 0

# battery_mv is only available on embedded boards; omit when unknown.

return stats

def publish_status(self, status: str = "online"):
"""Publish an MQTT status message including radio stats."""
stats = self.get_device_stats()
self.mqtt_handler.publish_status(status=status, stats=stats)

def get_packet_stats(self, hours: int = 24) -> dict:
return self.sqlite_handler.get_packet_stats(hours)

Expand Down
22 changes: 22 additions & 0 deletions repeater/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ def __init__(self, config: dict, dispatcher, local_hash: int, send_advert_func=N
self.noise_floor_interval = NOISE_FLOOR_INTERVAL # 30 seconds
self._background_task = None

# Status publish interval (seconds) — default 5 minutes
self.status_publish_interval = config.get("mqtt", {}).get(
"status_interval", 300
)
self.last_status_publish = 0 # force immediate publish on start

# Cache transport keys for efficient lookup
self._transport_keys_cache = None
self._transport_keys_cache_time = 0
Expand Down Expand Up @@ -734,6 +740,12 @@ async def _background_timer_loop(self):
await self._send_periodic_advert_async()
self.last_advert_time = current_time

# Publish MQTT status with radio stats periodically
if (self.status_publish_interval > 0 and
current_time - self.last_status_publish >= self.status_publish_interval):
await self._publish_status_async()
self.last_status_publish = current_time

# Sleep for 5 seconds before next check
await asyncio.sleep(5.0)

Expand Down Expand Up @@ -776,6 +788,16 @@ async def _send_periodic_advert_async(self):
except Exception as e:
logger.error(f"Error sending periodic advert: {e}")

async def _publish_status_async(self):
"""Publish an MQTT status message with radio stats."""
if not self.storage:
return
try:
self.storage.publish_status("online")
logger.debug("Published MQTT status with radio stats")
except Exception as e:
logger.error(f"Error publishing MQTT status: {e}")

def cleanup(self):
if self._background_task and not self._background_task.done():
self._background_task.cancel()
Expand Down