From 1eb13c9953cba4c2113ba9174a5eab2784311765 Mon Sep 17 00:00:00 2001 From: Sergio Conde Date: Wed, 28 Jan 2026 15:43:41 +0100 Subject: [PATCH 1/4] feat: Add ESP32 WiFi Unified OTA update support --- meshtastic/__main__.py | 47 +++++++++++++++- meshtastic/node.py | 18 +++++- meshtastic/ota.py | 122 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 184 insertions(+), 3 deletions(-) create mode 100644 meshtastic/ota.py diff --git a/meshtastic/__main__.py b/meshtastic/__main__.py index 16854002..533ec67a 100644 --- a/meshtastic/__main__.py +++ b/meshtastic/__main__.py @@ -37,6 +37,7 @@ except ImportError as e: have_test = False +import meshtastic.ota import meshtastic.util import meshtastic.serial_interface import meshtastic.tcp_interface @@ -60,7 +61,7 @@ have_powermon = False powermon_exception = e meter = None -from meshtastic.protobuf import channel_pb2, config_pb2, portnums_pb2, mesh_pb2 +from meshtastic.protobuf import admin_pb2, channel_pb2, config_pb2, portnums_pb2, mesh_pb2 from meshtastic.version import get_active_version logger = logging.getLogger(__name__) @@ -452,6 +453,41 @@ def onConnected(interface): waitForAckNak = True interface.getNode(args.dest, False, **getNode_kwargs).rebootOTA() + if args.ota_update: + closeNow = True + waitForAckNak = True + + if not isinstance(interface, meshtastic.tcp_interface.TCPInterface): + meshtastic.util.our_exit( + "Error: OTA update currently requires a TCP connection to the node (use --host)." + ) + + ota = meshtastic.ota.ESP32WiFiOTA(args.ota_update, interface.hostname) + + print(f"Triggering OTA update on {interface.hostname}...") + interface.getNode(args.dest, False, **getNode_kwargs).startOTA( + mode=admin_pb2.OTA_WIFI, + hash=ota.hash_bytes() + ) + + print("Waiting for device to reboot into OTA mode...") + time.sleep(5) + + retries = 5 + while retries > 0: + try: + ota.update() + break + + except Exception as e: + retries -= 1 + if retries == 0: + meshtastic.util.our_exit(f"\nOTA update failed: {e}") + + time.sleep(2) + + print("\nOTA update completed successfully!") + if args.enter_dfu: closeNow = True waitForAckNak = True @@ -1904,10 +1940,17 @@ def addRemoteAdminArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentPars group.add_argument( "--reboot-ota", - help="Tell the destination node to reboot into factory firmware (ESP32)", + help="Tell the destination node to reboot into factory firmware (ESP32, firmware version <2.7.18)", action="store_true", ) + group.add_argument( + "--ota-update", + help="Perform a OTA update on the destination node (ESP32, firmware version >=2.7.18, WiFi/TCP only for now). Specify the path to the firmware file.", + metavar="FIRMWARE_FILE", + action="store", + ) + group.add_argument( "--enter-dfu", help="Tell the destination node to enter DFU mode (NRF52)", diff --git a/meshtastic/node.py b/meshtastic/node.py index afb5611a..4df5acb4 100644 --- a/meshtastic/node.py +++ b/meshtastic/node.py @@ -654,7 +654,7 @@ def commitSettingsTransaction(self): return self._sendAdmin(p, onResponse=onResponse) def rebootOTA(self, secs: int = 10): - """Tell the node to reboot into factory firmware.""" + """Tell the node to reboot into factory firmware (firmware < 2.7.18).""" self.ensureSessionKey() p = admin_pb2.AdminMessage() p.reboot_ota_seconds = secs @@ -667,6 +667,22 @@ def rebootOTA(self, secs: int = 10): onResponse = self.onAckNak return self._sendAdmin(p, onResponse=onResponse) + def startOTA( + self, + mode: admin_pb2.OTAMode.ValueType, + hash: bytes, + ): + """Tell the node to start OTA mode (firmware >= 2.7.18).""" + if self != self.iface.localNode: + raise Exception("startOTA only possible in local node") + + self.ensureSessionKey() + p = admin_pb2.AdminMessage() + p.ota_request.reboot_ota_mode=mode + p.ota_request.ota_hash=hash + + return self._sendAdmin(p) + def enterDFUMode(self): """Tell the node to enter DFU mode (NRF52).""" self.ensureSessionKey() diff --git a/meshtastic/ota.py b/meshtastic/ota.py new file mode 100644 index 00000000..b2545a28 --- /dev/null +++ b/meshtastic/ota.py @@ -0,0 +1,122 @@ +import os +import hashlib +import socket +import logging +from typing import Optional, Callable + + +logger = logging.getLogger(__name__) + + +def _file_sha256(filename: str): + """Calculate SHA256 hash of a file.""" + sha256_hash = hashlib.sha256() + + with open(filename, "rb") as f: + for byte_block in iter(lambda: f.read(4096), b""): + sha256_hash.update(byte_block) + + return sha256_hash + +class ESP32WiFiOTA: + """ESP32 WiFi Unified OTA updates.""" + + def __init__(self, filename: str, hostname: str, port: int = 3232): + self._filename = filename + self._hostname = hostname + self._port = port + self._socket: Optional[socket.socket] = None + + if not os.path.exists(self._filename): + raise Exception(f"File {self._filename} does not exist") + + self._file_hash = _file_sha256(self._filename) + + def _read_line(self) -> str: + """Read a line from the socket.""" + if not self._socket: + raise Exception("Socket not connected") + + line = b"" + while not line.endswith(b"\n"): + char = self._socket.recv(1) + + if not char: + raise Exception("Connection closed while waiting for response") + + line += char + + return line.decode("utf-8").strip() + + def hash_bytes(self) -> bytes: + """Return the hash as bytes.""" + return self._file_hash.digest() + + def hash_hex(self) -> str: + """Return the hash as a hex string.""" + return self._file_hash.hexdigest() + + def update(self, progress_callback: Optional[Callable[[int, int], None]] = None): + """Perform the OTA update.""" + with open(self._filename, "rb") as f: + data = f.read() + size = len(data) + + logger.info(f"Starting OTA update with {self._filename} ({size} bytes, hash {self.hash_hex()})") + + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._socket.settimeout(15) + try: + self._socket.connect((self._hostname, self._port)) + logger.debug(f"Connected to {self._hostname}:{self._port}") + + # Send start command + self._socket.sendall(f"OTA {size} {self.hash_hex()}\n".encode("utf-8")) + + # Wait for OK from the device + while True: + response = self._read_line() + if response == "OK": + break + elif response == "ERASING": + logger.info("Device is erasing flash...") + elif response.startswith("ERR "): + raise Exception(f"Device reported error: {response}") + else: + logger.warning(f"Unexpected response: {response}") + + # Stream firmware + sent_bytes = 0 + chunk_size = 1024 + while sent_bytes < size: + chunk = data[sent_bytes : sent_bytes + chunk_size] + self._socket.sendall(chunk) + sent_bytes += len(chunk) + + if progress_callback: + progress_callback(sent_bytes, size) + else: + print(f"[{sent_bytes / size * 100:5.1f}%] Sent {sent_bytes} of {size} bytes...", end="\r") + + if not progress_callback: + print() + + # Wait for OK from device + logger.info("Firmware sent, waiting for verification...") + while True: + response = self._read_line() + + if response == "OK": + logger.info("OTA update completed successfully!") + break + elif response == "ACK": + continue + elif response.startswith("ERR "): + raise Exception(f"OTA update failed: {response}") + else: + logger.warning(f"Unexpected final response: {response}") + + finally: + if self._socket: + self._socket.close() + self._socket = None \ No newline at end of file From bf580c36aeb013921eec1452ad73d5de25f360ed Mon Sep 17 00:00:00 2001 From: Ben Meadors Date: Sat, 31 Jan 2026 18:15:25 -0600 Subject: [PATCH 2/4] Update meshtastic/__main__.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- meshtastic/__main__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/meshtastic/__main__.py b/meshtastic/__main__.py index 533ec67a..82f2a566 100644 --- a/meshtastic/__main__.py +++ b/meshtastic/__main__.py @@ -1946,7 +1946,7 @@ def addRemoteAdminArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentPars group.add_argument( "--ota-update", - help="Perform a OTA update on the destination node (ESP32, firmware version >=2.7.18, WiFi/TCP only for now). Specify the path to the firmware file.", + help="Perform an OTA update on the destination node (ESP32, firmware version >=2.7.18, WiFi/TCP only for now). Specify the path to the firmware file.", metavar="FIRMWARE_FILE", action="store", ) From 4d8430d36094ed30524c14a3746f4af85aa27bf8 Mon Sep 17 00:00:00 2001 From: Sergio Conde Date: Sun, 1 Feb 2026 11:39:09 +0100 Subject: [PATCH 3/4] fix: throw propper exceptions and cleanup code --- meshtastic/__main__.py | 9 +++++---- meshtastic/node.py | 10 +++++----- meshtastic/ota.py | 36 +++++++++++++++++++++--------------- 3 files changed, 31 insertions(+), 24 deletions(-) diff --git a/meshtastic/__main__.py b/meshtastic/__main__.py index 82f2a566..8d4e38bb 100644 --- a/meshtastic/__main__.py +++ b/meshtastic/__main__.py @@ -466,8 +466,8 @@ def onConnected(interface): print(f"Triggering OTA update on {interface.hostname}...") interface.getNode(args.dest, False, **getNode_kwargs).startOTA( - mode=admin_pb2.OTA_WIFI, - hash=ota.hash_bytes() + ota_mode=admin_pb2.OTAMode.OTA_WIFI, + ota_file_hash=ota.hash_bytes() ) print("Waiting for device to reboot into OTA mode...") @@ -487,7 +487,7 @@ def onConnected(interface): time.sleep(2) print("\nOTA update completed successfully!") - + if args.enter_dfu: closeNow = True waitForAckNak = True @@ -1946,7 +1946,8 @@ def addRemoteAdminArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentPars group.add_argument( "--ota-update", - help="Perform an OTA update on the destination node (ESP32, firmware version >=2.7.18, WiFi/TCP only for now). Specify the path to the firmware file.", + help="Perform an OTA update on the local node (ESP32, firmware version >=2.7.18, WiFi/TCP only for now). " + "Specify the path to the firmware file.", metavar="FIRMWARE_FILE", action="store", ) diff --git a/meshtastic/node.py b/meshtastic/node.py index 4df5acb4..a6356430 100644 --- a/meshtastic/node.py +++ b/meshtastic/node.py @@ -669,17 +669,17 @@ def rebootOTA(self, secs: int = 10): def startOTA( self, - mode: admin_pb2.OTAMode.ValueType, - hash: bytes, + ota_mode: admin_pb2.OTAMode.ValueType, + ota_file_hash: bytes, ): """Tell the node to start OTA mode (firmware >= 2.7.18).""" if self != self.iface.localNode: - raise Exception("startOTA only possible in local node") + raise ValueError("startOTA only possible in local node") self.ensureSessionKey() p = admin_pb2.AdminMessage() - p.ota_request.reboot_ota_mode=mode - p.ota_request.ota_hash=hash + p.ota_request.reboot_ota_mode = ota_mode + p.ota_request.ota_hash = ota_file_hash return self._sendAdmin(p) diff --git a/meshtastic/ota.py b/meshtastic/ota.py index b2545a28..af9feae4 100644 --- a/meshtastic/ota.py +++ b/meshtastic/ota.py @@ -1,4 +1,6 @@ -import os +"""Meshtastic ESP32 Unified OTA +""" +import os import hashlib import socket import logging @@ -11,13 +13,18 @@ def _file_sha256(filename: str): """Calculate SHA256 hash of a file.""" sha256_hash = hashlib.sha256() - + with open(filename, "rb") as f: for byte_block in iter(lambda: f.read(4096), b""): sha256_hash.update(byte_block) return sha256_hash + +class OTAError(Exception): + """Exception for OTA errors.""" + + class ESP32WiFiOTA: """ESP32 WiFi Unified OTA updates.""" @@ -28,21 +35,21 @@ def __init__(self, filename: str, hostname: str, port: int = 3232): self._socket: Optional[socket.socket] = None if not os.path.exists(self._filename): - raise Exception(f"File {self._filename} does not exist") + raise FileNotFoundError(f"File {self._filename} does not exist") self._file_hash = _file_sha256(self._filename) def _read_line(self) -> str: """Read a line from the socket.""" if not self._socket: - raise Exception("Socket not connected") + raise ConnectionError("Socket not connected") line = b"" while not line.endswith(b"\n"): char = self._socket.recv(1) - + if not char: - raise Exception("Connection closed while waiting for response") + raise ConnectionError("Connection closed while waiting for response") line += char @@ -78,10 +85,11 @@ def update(self, progress_callback: Optional[Callable[[int, int], None]] = None) response = self._read_line() if response == "OK": break - elif response == "ERASING": + + if response == "ERASING": logger.info("Device is erasing flash...") elif response.startswith("ERR "): - raise Exception(f"Device reported error: {response}") + raise OTAError(f"Device reported error: {response}") else: logger.warning(f"Unexpected response: {response}") @@ -105,18 +113,16 @@ def update(self, progress_callback: Optional[Callable[[int, int], None]] = None) logger.info("Firmware sent, waiting for verification...") while True: response = self._read_line() - if response == "OK": logger.info("OTA update completed successfully!") break - elif response == "ACK": - continue - elif response.startswith("ERR "): - raise Exception(f"OTA update failed: {response}") - else: + + if response.startswith("ERR "): + raise OTAError(f"OTA update failed: {response}") + elif response != "ACK": logger.warning(f"Unexpected final response: {response}") finally: if self._socket: self._socket.close() - self._socket = None \ No newline at end of file + self._socket = None From 4de19f50d98abc6b96c50bf008fbbc6baf30bb33 Mon Sep 17 00:00:00 2001 From: Sergio Conde Date: Sun, 1 Feb 2026 21:13:58 +0100 Subject: [PATCH 4/4] fix: add tests --- meshtastic/tests/test_main.py | 66 +++++ meshtastic/tests/test_node.py | 35 +++ meshtastic/tests/test_ota.py | 458 ++++++++++++++++++++++++++++++++++ 3 files changed, 559 insertions(+) create mode 100644 meshtastic/tests/test_ota.py diff --git a/meshtastic/tests/test_main.py b/meshtastic/tests/test_main.py index 251de98f..048614e5 100644 --- a/meshtastic/tests/test_main.py +++ b/meshtastic/tests/test_main.py @@ -6,6 +6,7 @@ import platform import re import sys +import tempfile from unittest.mock import mock_open, MagicMock, patch import pytest @@ -2900,3 +2901,68 @@ def test_main_set_ham_empty_string(capsys): out, _ = capsys.readouterr() assert "ERROR: Ham radio callsign cannot be empty or contain only whitespace characters" in out assert excinfo.value.code == 1 + + +# OTA-related tests +@pytest.mark.unit +@pytest.mark.usefixtures("reset_mt_config") +def test_main_ota_update_file_not_found(capsys): + """Test --ota-update with non-existent file""" + sys.argv = [ + "", + "--ota-update", + "/nonexistent/firmware.bin", + "--host", + "192.168.1.100", + ] + mt_config.args = sys.argv + + with pytest.raises(SystemExit) as pytest_wrapped_e: + main() + + assert pytest_wrapped_e.type == SystemExit + assert pytest_wrapped_e.value.code == 1 + + +@pytest.mark.unit +@pytest.mark.usefixtures("reset_mt_config") +@patch("meshtastic.ota.ESP32WiFiOTA") +@patch("meshtastic.__main__.meshtastic.util.our_exit") +def test_main_ota_update_retries(mock_our_exit, mock_ota_class, capsys): + """Test --ota-update retries on failure""" + # Create a temporary firmware file + with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f: + f.write(b"fake firmware data") + firmware_file = f.name + + try: + sys.argv = ["", "--ota-update", firmware_file, "--host", "192.168.1.100"] + mt_config.args = sys.argv + + # Mock the OTA class to fail all 5 retries + mock_ota = MagicMock() + mock_ota_class.return_value = mock_ota + mock_ota.hash_bytes.return_value = b"\x00" * 32 + mock_ota.hash_hex.return_value = "a" * 64 + mock_ota.update.side_effect = Exception("Connection failed") + + # Mock isinstance to return True + with patch("meshtastic.__main__.isinstance", return_value=True): + with patch("meshtastic.tcp_interface.TCPInterface") as mock_tcp: + mock_iface = MagicMock() + mock_iface.hostname = "192.168.1.100" + mock_iface.localNode = MagicMock(autospec=Node) + mock_tcp.return_value = mock_iface + + with patch("time.sleep"): + main() + + # Should have exhausted all retries and called our_exit + # Note: our_exit might be called twice - once for TCP check, once for failure + assert mock_our_exit.call_count >= 1 + # Check the last call was for OTA failure + last_call_args = mock_our_exit.call_args[0][0] + assert "OTA update failed" in last_call_args + + finally: + os.unlink(firmware_file) diff --git a/meshtastic/tests/test_node.py b/meshtastic/tests/test_node.py index c5cb6b3f..3083fd61 100644 --- a/meshtastic/tests/test_node.py +++ b/meshtastic/tests/test_node.py @@ -1550,6 +1550,41 @@ def test_setOwner_valid_names(caplog): assert re.search(r'p.set_owner.short_name:VN:', caplog.text, re.MULTILINE) +@pytest.mark.unit +def test_start_ota_local_node(): + """Test startOTA on local node""" + iface = MagicMock(autospec=MeshInterface) + anode = Node(iface, 1234567890, noProto=True) + # Set up as local node + iface.localNode = anode + + amesg = admin_pb2.AdminMessage() + with patch("meshtastic.admin_pb2.AdminMessage", return_value=amesg): + with patch.object(anode, "_sendAdmin") as mock_send_admin: + test_hash = b"\x01\x02\x03" * 8 # 24 bytes hash + anode.startOTA(ota_mode=admin_pb2.OTAMode.OTA_WIFI, ota_file_hash=test_hash) + + # Verify the OTA request was set correctly + assert amesg.ota_request.reboot_ota_mode == admin_pb2.OTAMode.OTA_WIFI + assert amesg.ota_request.ota_hash == test_hash + mock_send_admin.assert_called_once_with(amesg) + + +@pytest.mark.unit +def test_start_ota_remote_node_raises_error(): + """Test startOTA on remote node raises ValueError""" + iface = MagicMock(autospec=MeshInterface) + local_node = Node(iface, 1234567890, noProto=True) + remote_node = Node(iface, 9876543210, noProto=True) + iface.localNode = local_node + + test_hash = b"\x01\x02\x03" * 8 + with pytest.raises(ValueError, match="startOTA only possible in local node"): + remote_node.startOTA( + ota_mode=admin_pb2.OTAMode.OTA_WIFI, ota_file_hash=test_hash + ) + + # TODO # @pytest.mark.unitslow # def test_waitForConfig(): diff --git a/meshtastic/tests/test_ota.py b/meshtastic/tests/test_ota.py new file mode 100644 index 00000000..21863880 --- /dev/null +++ b/meshtastic/tests/test_ota.py @@ -0,0 +1,458 @@ +"""Meshtastic unit tests for ota.py""" + +import hashlib +import os +import socket +import tempfile +from unittest.mock import MagicMock, mock_open, patch, call + +import pytest + +from meshtastic.ota import ( + _file_sha256, + ESP32WiFiOTA, + OTAError, +) + + +@pytest.mark.unit +def test_file_sha256(): + """Test _file_sha256 calculates correct hash""" + # Create a temporary file with known content + with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f: + test_data = b"Hello, World!" + f.write(test_data) + temp_file = f.name + + try: + result = _file_sha256(temp_file) + expected_hash = hashlib.sha256(test_data).hexdigest() + assert result.hexdigest() == expected_hash + finally: + os.unlink(temp_file) + + +@pytest.mark.unit +def test_file_sha256_large_file(): + """Test _file_sha256 handles files larger than chunk size""" + # Create a temporary file with more than 4096 bytes + with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f: + test_data = b"A" * 8192 # More than 4096 bytes + f.write(test_data) + temp_file = f.name + + try: + result = _file_sha256(temp_file) + expected_hash = hashlib.sha256(test_data).hexdigest() + assert result.hexdigest() == expected_hash + finally: + os.unlink(temp_file) + + +@pytest.mark.unit +def test_esp32_wifi_ota_init_file_not_found(): + """Test ESP32WiFiOTA raises FileNotFoundError for non-existent file""" + with pytest.raises(FileNotFoundError, match="does not exist"): + ESP32WiFiOTA("/nonexistent/firmware.bin", "192.168.1.1") + + +@pytest.mark.unit +def test_esp32_wifi_ota_init_success(): + """Test ESP32WiFiOTA initializes correctly with valid file""" + with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f: + f.write(b"fake firmware data") + temp_file = f.name + + try: + ota = ESP32WiFiOTA(temp_file, "192.168.1.1", 3232) + assert ota._filename == temp_file + assert ota._hostname == "192.168.1.1" + assert ota._port == 3232 + assert ota._socket is None + # Verify hash is calculated + assert ota._file_hash is not None + assert len(ota.hash_hex()) == 64 # SHA256 hex is 64 chars + finally: + os.unlink(temp_file) + + +@pytest.mark.unit +def test_esp32_wifi_ota_init_default_port(): + """Test ESP32WiFiOTA uses default port 3232""" + with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f: + f.write(b"fake firmware data") + temp_file = f.name + + try: + ota = ESP32WiFiOTA(temp_file, "192.168.1.1") + assert ota._port == 3232 + finally: + os.unlink(temp_file) + + +@pytest.mark.unit +def test_esp32_wifi_ota_hash_bytes(): + """Test hash_bytes returns correct bytes""" + with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f: + test_data = b"firmware data" + f.write(test_data) + temp_file = f.name + + try: + ota = ESP32WiFiOTA(temp_file, "192.168.1.1") + hash_bytes = ota.hash_bytes() + expected_bytes = hashlib.sha256(test_data).digest() + assert hash_bytes == expected_bytes + assert len(hash_bytes) == 32 # SHA256 is 32 bytes + finally: + os.unlink(temp_file) + + +@pytest.mark.unit +def test_esp32_wifi_ota_hash_hex(): + """Test hash_hex returns correct hex string""" + with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f: + test_data = b"firmware data" + f.write(test_data) + temp_file = f.name + + try: + ota = ESP32WiFiOTA(temp_file, "192.168.1.1") + hash_hex = ota.hash_hex() + expected_hex = hashlib.sha256(test_data).hexdigest() + assert hash_hex == expected_hex + assert len(hash_hex) == 64 # SHA256 hex is 64 chars + finally: + os.unlink(temp_file) + + +@pytest.mark.unit +def test_esp32_wifi_ota_read_line_not_connected(): + """Test _read_line raises ConnectionError when not connected""" + with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f: + f.write(b"firmware") + temp_file = f.name + + try: + ota = ESP32WiFiOTA(temp_file, "192.168.1.1") + with pytest.raises(ConnectionError, match="Socket not connected"): + ota._read_line() + finally: + os.unlink(temp_file) + + +@pytest.mark.unit +def test_esp32_wifi_ota_read_line_connection_closed(): + """Test _read_line raises ConnectionError when connection closed""" + with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f: + f.write(b"firmware") + temp_file = f.name + + try: + ota = ESP32WiFiOTA(temp_file, "192.168.1.1") + mock_socket = MagicMock() + # Simulate connection closed + mock_socket.recv.return_value = b"" + ota._socket = mock_socket + + with pytest.raises(ConnectionError, match="Connection closed"): + ota._read_line() + finally: + os.unlink(temp_file) + + +@pytest.mark.unit +def test_esp32_wifi_ota_read_line_success(): + """Test _read_line successfully reads a line""" + with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f: + f.write(b"firmware") + temp_file = f.name + + try: + ota = ESP32WiFiOTA(temp_file, "192.168.1.1") + mock_socket = MagicMock() + # Simulate receiving "OK\n" + mock_socket.recv.side_effect = [b"O", b"K", b"\n"] + ota._socket = mock_socket + + result = ota._read_line() + assert result == "OK" + finally: + os.unlink(temp_file) + + +@pytest.mark.unit +@patch("meshtastic.ota.socket.socket") +def test_esp32_wifi_ota_update_success(mock_socket_class): + """Test update() with successful OTA""" + with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f: + test_data = b"A" * 1024 # 1KB of data + f.write(test_data) + temp_file = f.name + + try: + # Setup mock socket + mock_socket = MagicMock() + mock_socket_class.return_value = mock_socket + + ota = ESP32WiFiOTA(temp_file, "192.168.1.1") + + # Mock _read_line to return appropriate responses + # First call: ERASING, Second call: OK (ready), Third call: OK (complete) + with patch.object(ota, "_read_line") as mock_read_line: + mock_read_line.side_effect = [ + "ERASING", # Device is erasing flash + "OK", # Device ready for firmware + "OK", # Device finished successfully + ] + + ota.update() + + # Verify socket was created and connected + mock_socket_class.assert_called_once_with( + socket.AF_INET, socket.SOCK_STREAM + ) + mock_socket.settimeout.assert_called_once_with(15) + mock_socket.connect.assert_called_once_with(("192.168.1.1", 3232)) + + # Verify start command was sent + start_cmd = f"OTA {len(test_data)} {ota.hash_hex()}\n".encode("utf-8") + mock_socket.sendall.assert_any_call(start_cmd) + + # Verify firmware was sent (at least one chunk) + assert mock_socket.sendall.call_count >= 2 + + # Verify socket was closed + mock_socket.close.assert_called_once() + + finally: + os.unlink(temp_file) + + +@pytest.mark.unit +@patch("meshtastic.ota.socket.socket") +def test_esp32_wifi_ota_update_with_progress_callback(mock_socket_class): + """Test update() with progress callback""" + with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f: + test_data = b"A" * 1024 # 1KB of data + f.write(test_data) + temp_file = f.name + + try: + # Setup mock socket + mock_socket = MagicMock() + mock_socket_class.return_value = mock_socket + + ota = ESP32WiFiOTA(temp_file, "192.168.1.1") + + # Track progress callback calls + progress_calls = [] + + def progress_callback(sent, total): + progress_calls.append((sent, total)) + + # Mock _read_line + with patch.object(ota, "_read_line") as mock_read_line: + mock_read_line.side_effect = [ + "OK", # Device ready + "OK", # Device finished + ] + + ota.update(progress_callback=progress_callback) + + # Verify progress callback was called + assert len(progress_calls) > 0 + # First call should show some progress + assert progress_calls[0][0] > 0 + # Total should be the firmware size + assert progress_calls[0][1] == len(test_data) + # Last call should show all bytes sent + assert progress_calls[-1][0] == len(test_data) + + finally: + os.unlink(temp_file) + + +@pytest.mark.unit +@patch("meshtastic.ota.socket.socket") +def test_esp32_wifi_ota_update_device_error_on_start(mock_socket_class): + """Test update() raises OTAError when device reports error during start""" + with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f: + f.write(b"firmware") + temp_file = f.name + + try: + mock_socket = MagicMock() + mock_socket_class.return_value = mock_socket + + ota = ESP32WiFiOTA(temp_file, "192.168.1.1") + + with patch.object(ota, "_read_line") as mock_read_line: + mock_read_line.return_value = "ERR BAD_HASH" + + with pytest.raises(OTAError, match="Device reported error"): + ota.update() + + finally: + os.unlink(temp_file) + + +@pytest.mark.unit +@patch("meshtastic.ota.socket.socket") +def test_esp32_wifi_ota_update_device_error_on_finish(mock_socket_class): + """Test update() raises OTAError when device reports error after firmware sent""" + with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f: + f.write(b"firmware") + temp_file = f.name + + try: + mock_socket = MagicMock() + mock_socket_class.return_value = mock_socket + + ota = ESP32WiFiOTA(temp_file, "192.168.1.1") + + with patch.object(ota, "_read_line") as mock_read_line: + mock_read_line.side_effect = [ + "OK", # Device ready + "ERR FLASH_ERR", # Error after firmware sent + ] + + with pytest.raises(OTAError, match="OTA update failed"): + ota.update() + + finally: + os.unlink(temp_file) + + +@pytest.mark.unit +@patch("meshtastic.ota.socket.socket") +def test_esp32_wifi_ota_update_socket_cleanup_on_error(mock_socket_class): + """Test that socket is properly cleaned up on error""" + with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f: + f.write(b"firmware") + temp_file = f.name + + try: + mock_socket = MagicMock() + mock_socket_class.return_value = mock_socket + + ota = ESP32WiFiOTA(temp_file, "192.168.1.1") + + # Simulate connection error + mock_socket.connect.side_effect = ConnectionRefusedError("Connection refused") + + with pytest.raises(ConnectionRefusedError): + ota.update() + + # Verify socket was closed even on error + mock_socket.close.assert_called_once() + assert ota._socket is None + + finally: + os.unlink(temp_file) + + +@pytest.mark.unit +@patch("meshtastic.ota.socket.socket") +def test_esp32_wifi_ota_update_large_firmware(mock_socket_class): + """Test update() correctly chunks large firmware files""" + with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f: + # Create a file larger than chunk_size (1024) + test_data = b"B" * 3000 + f.write(test_data) + temp_file = f.name + + try: + mock_socket = MagicMock() + mock_socket_class.return_value = mock_socket + + ota = ESP32WiFiOTA(temp_file, "192.168.1.1") + + with patch.object(ota, "_read_line") as mock_read_line: + mock_read_line.side_effect = [ + "OK", # Device ready + "OK", # Device finished + ] + + ota.update() + + # Verify that all data was sent in chunks + # 3000 bytes should be sent in ~3 chunks of 1024 bytes + sendall_calls = [ + call + for call in mock_socket.sendall.call_args_list + if call[0][0] + != f"OTA {len(test_data)} {ota.hash_hex()}\n".encode("utf-8") + ] + # Calculate total data sent (excluding the start command) + total_sent = sum(len(call[0][0]) for call in sendall_calls) + assert total_sent == len(test_data) + + finally: + os.unlink(temp_file) + + +@pytest.mark.unit +@patch("meshtastic.ota.socket.socket") +def test_esp32_wifi_ota_update_unexpected_response_warning(mock_socket_class, caplog): + """Test update() logs warning on unexpected response during startup""" + import logging + + with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f: + f.write(b"firmware") + temp_file = f.name + + try: + mock_socket = MagicMock() + mock_socket_class.return_value = mock_socket + + ota = ESP32WiFiOTA(temp_file, "192.168.1.1") + + with patch.object(ota, "_read_line") as mock_read_line: + mock_read_line.side_effect = [ + "UNKNOWN", # Unexpected response + "OK", # Then proceed + "OK", # Device finished + ] + + with caplog.at_level(logging.WARNING): + ota.update() + + # Check that warning was logged for unexpected response + assert "Unexpected response" in caplog.text + + finally: + os.unlink(temp_file) + + +@pytest.mark.unit +@patch("meshtastic.ota.socket.socket") +def test_esp32_wifi_ota_update_unexpected_final_response(mock_socket_class, caplog): + """Test update() logs warning on unexpected final response after firmware upload""" + import logging + + with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f: + f.write(b"firmware") + temp_file = f.name + + try: + mock_socket = MagicMock() + mock_socket_class.return_value = mock_socket + + ota = ESP32WiFiOTA(temp_file, "192.168.1.1") + + with patch.object(ota, "_read_line") as mock_read_line: + mock_read_line.side_effect = [ + "OK", # Device ready for firmware + "UNKNOWN", # Unexpected final response (not OK, not ERR, not ACK) + "OK", # Then succeed + ] + + with caplog.at_level(logging.WARNING): + ota.update() + + # Check that warning was logged for unexpected final response + assert "Unexpected final response" in caplog.text + + finally: + os.unlink(temp_file)