From a39504efedd81156a54db18e81fe73389af871b9 Mon Sep 17 00:00:00 2001 From: whywilson Date: Fri, 8 Aug 2025 14:43:47 +0800 Subject: [PATCH 1/9] Add TCP and UDP connection --- PN532.py | 425 ------------------------------- examples/pn532_udp_fw_version.py | 48 ++++ examples/pn532_usb_fw_version.py | 31 +++ script/pn532_cli_unit.py | 19 +- script/pn532_com.py | 327 ++++++++++++++---------- script/pn532_communication.py | 236 +++++++++++++++++ 6 files changed, 522 insertions(+), 564 deletions(-) delete mode 100755 PN532.py create mode 100644 examples/pn532_udp_fw_version.py create mode 100644 examples/pn532_usb_fw_version.py create mode 100644 script/pn532_communication.py diff --git a/PN532.py b/PN532.py deleted file mode 100755 index a04e382..0000000 --- a/PN532.py +++ /dev/null @@ -1,425 +0,0 @@ -# Example of detecting and reading a block from a MiFare NFC card. -# Author: Manuel Fernando Galindo (mfg90@live.com) -# -# Copyright (c) 2016 Manuel Fernando Galindo -# -# MIT License -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. - -from functools import reduce -import time -import serial - -PN532_PREAMBLE = 0x00 -PN532_STARTCODE1 = 0x00 -PN532_STARTCODE2 = 0xFF -PN532_POSTAMBLE = 0x00 - -PN532_HOSTTOPN532 = 0xD4 -PN532_PN532TOHOST = 0xD5 - -# PN532 Commands -PN532_COMMAND_DIAGNOSE = 0x00 -PN532_COMMAND_GETFIRMWAREVERSION = 0x02 -PN532_COMMAND_GETGENERALSTATUS = 0x04 -PN532_COMMAND_READREGISTER = 0x06 -PN532_COMMAND_WRITEREGISTER = 0x08 -PN532_COMMAND_READGPIO = 0x0C -PN532_COMMAND_WRITEGPIO = 0x0E -PN532_COMMAND_SETSERIALBAUDRATE = 0x10 -PN532_COMMAND_SETPARAMETERS = 0x12 -PN532_COMMAND_SAMCONFIGURATION = 0x14 -PN532_COMMAND_POWERDOWN = 0x16 -PN532_COMMAND_RFCONFIGURATION = 0x32 -PN532_COMMAND_RFREGULATIONTEST = 0x58 -PN532_COMMAND_INJUMPFORDEP = 0x56 -PN532_COMMAND_INJUMPFORPSL = 0x46 -PN532_COMMAND_INLISTPASSIVETARGET = 0x4A -PN532_COMMAND_INATR = 0x50 -PN532_COMMAND_INPSL = 0x4E -PN532_COMMAND_INDATAEXCHANGE = 0x40 -PN532_COMMAND_INCOMMUNICATETHRU = 0x42 -PN532_COMMAND_INDESELECT = 0x44 -PN532_COMMAND_INRELEASE = 0x52 -PN532_COMMAND_INSELECT = 0x54 -PN532_COMMAND_INAUTOPOLL = 0x60 -PN532_COMMAND_TGINITASTARGET = 0x8C -PN532_COMMAND_TGSETGENERALBYTES = 0x92 -PN532_COMMAND_TGGETDATA = 0x86 -PN532_COMMAND_TGSETDATA = 0x8E -PN532_COMMAND_TGSETMETADATA = 0x94 -PN532_COMMAND_TGGETINITIATORCOMMAND = 0x88 -PN532_COMMAND_TGRESPONSETOINITIATOR = 0x90 -PN532_COMMAND_TGGETTARGETSTATUS = 0x8A - -PN532_RESPONSE_INDATAEXCHANGE = 0x41 -PN532_RESPONSE_INLISTPASSIVETARGET = 0x4B - -PN532_WAKEUP = 0x55 - -PN532_SPI_STATREAD = 0x02 -PN532_SPI_DATAWRITE = 0x01 -PN532_SPI_DATAREAD = 0x03 -PN532_SPI_READY = 0x01 - -PN532_MIFARE_ISO14443A = 0x00 - -# Mifare Commands -MIFARE_CMD_AUTH_A = 0x60 -MIFARE_CMD_AUTH_B = 0x61 -MIFARE_CMD_READ = 0x30 -MIFARE_CMD_WRITE = 0xA0 -MIFARE_CMD_TRANSFER = 0xB0 -MIFARE_CMD_DECREMENT = 0xC0 -MIFARE_CMD_INCREMENT = 0xC1 -MIFARE_CMD_STORE = 0xC2 -MIFARE_ULTRALIGHT_CMD_WRITE = 0xA2 - -# Prefixes for NDEF Records (to identify record type) -NDEF_URIPREFIX_NONE = 0x00 -NDEF_URIPREFIX_HTTP_WWWDOT = 0x01 -NDEF_URIPREFIX_HTTPS_WWWDOT = 0x02 -NDEF_URIPREFIX_HTTP = 0x03 -NDEF_URIPREFIX_HTTPS = 0x04 -NDEF_URIPREFIX_TEL = 0x05 -NDEF_URIPREFIX_MAILTO = 0x06 -NDEF_URIPREFIX_FTP_ANONAT = 0x07 -NDEF_URIPREFIX_FTP_FTPDOT = 0x08 -NDEF_URIPREFIX_FTPS = 0x09 -NDEF_URIPREFIX_SFTP = 0x0A -NDEF_URIPREFIX_SMB = 0x0B -NDEF_URIPREFIX_NFS = 0x0C -NDEF_URIPREFIX_FTP = 0x0D -NDEF_URIPREFIX_DAV = 0x0E -NDEF_URIPREFIX_NEWS = 0x0F -NDEF_URIPREFIX_TELNET = 0x10 -NDEF_URIPREFIX_IMAP = 0x11 -NDEF_URIPREFIX_RTSP = 0x12 -NDEF_URIPREFIX_URN = 0x13 -NDEF_URIPREFIX_POP = 0x14 -NDEF_URIPREFIX_SIP = 0x15 -NDEF_URIPREFIX_SIPS = 0x16 -NDEF_URIPREFIX_TFTP = 0x17 -NDEF_URIPREFIX_BTSPP = 0x18 -NDEF_URIPREFIX_BTL2CAP = 0x19 -NDEF_URIPREFIX_BTGOEP = 0x1A -NDEF_URIPREFIX_TCPOBEX = 0x1B -NDEF_URIPREFIX_IRDAOBEX = 0x1C -NDEF_URIPREFIX_FILE = 0x1D -NDEF_URIPREFIX_URN_EPC_ID = 0x1E -NDEF_URIPREFIX_URN_EPC_TAG = 0x1F -NDEF_URIPREFIX_URN_EPC_PAT = 0x20 -NDEF_URIPREFIX_URN_EPC_RAW = 0x21 -NDEF_URIPREFIX_URN_EPC = 0x22 -NDEF_URIPREFIX_URN_NFC = 0x23 - -PN532_GPIO_VALIDATIONBIT = 0x80 -PN532_GPIO_P30 = 0 -PN532_GPIO_P31 = 1 -PN532_GPIO_P32 = 2 -PN532_GPIO_P33 = 3 -PN532_GPIO_P34 = 4 -PN532_GPIO_P35 = 5 - -PN532_ACK_STRING = "0000ff00ff00" -PN532_ACK_FRAME = "\x00\x00\xFF\x00\xFF\x00" - - -def millis(): - return int(round(time.time() * 1000)) - -class PN532(object): - - def __init__(self, uart_port = "COM5", uart_baudrate = 115200): - self.status = False - self.message = "" - - print("Port:"+uart_port) - try: - self.ser = serial.Serial(uart_port, uart_baudrate) - self.ser.timeout=2; - self.status = True - except serial.SerialException: - print("Opening port error.") - self.status = False - - def _uint8_add(self, a, b): - """Add add two values as unsigned 8-bit values.""" - return ((a & 0xFF) + (b & 0xFF)) & 0xFF - - def _busy_wait_ms(self, ms): - """Busy wait for the specified number of milliseconds.""" - start = time.time() - delta = ms/1000.0 - while (time.time() - start) <= delta: - pass - - def _write_frame(self, data): - ack = False - """Write a frame to the PN532 with the specified data bytearray.""" - assert data is not None and 0 < len(data) < 255, 'Data must be array of 1 to 255 bytes.' - # Build frame to send as: - # - Preamble (0x00) - # - Start code (0x00, 0xFF) - # - Command length (1 byte) - # - Command length checksum - # - Command bytes - # - Checksum - # - Postamble (0x00) - length = len(data) - frame = bytearray(length+7) - frame[0] = PN532_PREAMBLE - frame[1] = PN532_STARTCODE1 - frame[2] = PN532_STARTCODE2 - frame[3] = length & 0xFF - frame[4] = self._uint8_add(~length, 1) - frame[5:-2] = data - checksum = reduce(self._uint8_add, data, 0xFF) - frame[-2] = ~checksum & 0xFF - frame[-1] = PN532_POSTAMBLE - # Send frame. - # print("sendData: ", frame.hex()) - self.ser.flushInput() - while(not ack): - self.ser.write(frame) - ack = self._ack_wait(1000) - time.sleep(0.3) - return ack - - - def _ack_wait(self, timeout): - ack=False - rx_info=b"" - start_time = millis() - current_time = start_time - while((current_time - start_time) < timeout and not ack): - time.sleep(0.12) - rx_info += self.ser.read(self.ser.inWaiting()) - current_time = millis() - # print("rx_info: ", rx_info.hex()) - if (PN532_ACK_STRING in rx_info.hex()): - ack = True - if(ack): - if(len(rx_info)>6): - # found index of pn322_ack_frame in rx_info and return the remaining bytes - index = rx_info.hex().index(PN532_ACK_STRING) - self.message = rx_info[index+6:] - else: - self.message = rx_info - self.ser.flush() - return ack - else: - self.message = "" - return ack - - def _read_data(self, count): - timeout = 1000 - rx_info="" - if(self.message == ""): - self._ack_wait(timeout) - else: - rx_info = self.message - # print("_read_data rx_info: ", rx_info) - return rx_info - - def _read_frame(self, length): - """Read a response frame from the PN532 of at most length bytes in size. - Returns the data inside the frame if found, otherwise raises an exception - if there is an error parsing the frame. Note that less than length bytes - might be returned! - """ - # Read frame with expected length of data. - response = self._read_data(length+8) - # Check frame starts with 0x01 and then has 0x00FF (preceeded by optional - # zeros). - # print("_read_frame response: ", response.hex()) - if not (PN532_ACK_STRING == response.hex()): - if response[0] != 0x00: - raise RuntimeError('Response frame does not start with 0x01!') - # Swallow all the 0x00 values that preceed 0xFF. - offset = 1 - while response[offset] == 0x00: - offset += 1 - if offset >= len(response): - raise RuntimeError('Response frame preamble does not contain 0x00FF!') - if response[offset] != 0xFF: - raise RuntimeError('Response frame preamble does not contain 0x00FF!') - offset += 1 - if offset >= len(response): - raise RuntimeError('Response contains no data!') - # Check length & length checksum match. - frame_len = response[offset] - if (frame_len + response[offset+1]) & 0xFF != 0: - raise RuntimeError('Response length checksum did not match length!') - # Check frame checksum value matches bytes. - checksum = reduce(self._uint8_add, response[offset+2:offset+2+frame_len+1], 0) - if checksum != 0: - raise RuntimeError('Response checksum did not match expected value!') - # Return frame data. - return response[offset+2:offset+2+frame_len] - else: - return "no_card" - - def wakeup(self): - # write 555500000000000000000000000000000000FF03FDD414011700 - msg = bytearray([0x55, 0x55, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, 0x03, 0xFD, 0xD4, 0x14, 0x01, 0x17, 0x00]) - self.ser.write(msg) - buff = self.ser.readline() - # print(buff.hex()) - - def call_function(self, command, response_length=0, params=[], timeout_sec=1): - """Send specified command to the PN532 and expect up to response_length - bytes back in a response. Note that less than the expected bytes might - be returned! Params can optionally specify an array of bytes to send as - parameters to the function call. Will wait up to timeout_secs seconds - for a response and return a bytearray of response bytes, or None if no - response is available within the timeout. - """ - # Build frame data with command and parameters. - data = bytearray(2+len(params)) - data[0] = PN532_HOSTTOPN532 - data[1] = command & 0xFF - data[2:] = params - # Send frame and wait for response. - if not self._write_frame(data): - return None - # Read response bytes. - response = self._read_frame(response_length+2) - # Check that response is for the called function. - # print("call_function response", response) - if not (response == "no_card"): - if not (response[0] == PN532_PN532TOHOST and response[1] == (command+1)): - raise RuntimeError('Received unexpected command response!') - # Return response data. - return response[2:] - else: - return response - - def begin(self): - """Initialize communication with the PN532. Must be called before any - other calls are made against the PN532. - """ - self.wakeup() - - def get_firmware_version(self): - """Call PN532 GetFirmwareVersion function and return a tuple with the IC, - Ver, Rev, and Support values. - """ - response = self.call_function(PN532_COMMAND_GETFIRMWAREVERSION, 4) - # print(response.hex()) - if response is None: - raise RuntimeError('Failed to detect the PN532! Make sure there is sufficient power (use a 1 amp or greater power supply), the PN532 is wired correctly to the device, and the solder joints on the PN532 headers are solidly connected.') - return (response[0], response[1], response[2], response[3]) - - def SAM_configuration(self): - """Configure the PN532 to read MiFare cards.""" - # Send SAM configuration command with configuration for: - # - 0x01, normal mode - # - 0x14, timeout 50ms * 20 = 1 second - # - 0x01, use IRQ pin - # Note that no other verification is necessary as call_function will - # check the command was executed as expected. - self.call_function(PN532_COMMAND_SAMCONFIGURATION, params=[0x01, 0x14, 0x01]) - - def read_passive_target(self, card_baud=PN532_MIFARE_ISO14443A, timeout_sec=1): - """Wait for a MiFare card to be available and return its UID when found. - Will wait up to timeout_sec seconds and return None if no card is found, - otherwise a bytearray with the UID of the found card is returned. - """ - # Send passive read command for 1 card. Expect at most a 7 byte UUID. - response = self.call_function(PN532_COMMAND_INLISTPASSIVETARGET, - params=[0x01, card_baud], - response_length=17) - # print("read_passive_target response", response) - # If no response is available return None to indicate no card is present. - if response is None: - return None - if not (response == "no_card"): - # Check only 1 card with up to a 7 byte UID is present. - - if response[0] == 0x00: - return None - if response[0] != 0x01: - raise RuntimeError('More than one card detected!') - if response[5] > 7: - raise RuntimeError('Found card with unexpectedly long UID!') - # Return UID of card. - return response[6:6+response[5]] - else: - return response - - - def mifare_classic_authenticate_block(self, uid, block_number, key_number, key): - """Authenticate specified block number for a MiFare classic card. Uid - should be a byte array with the UID of the card, block number should be - the block to authenticate, key number should be the key type (like - MIFARE_CMD_AUTH_A or MIFARE_CMD_AUTH_B), and key should be a byte array - with the key data. Returns True if the block was authenticated, or False - if not authenticated. - """ - # Build parameters for InDataExchange command to authenticate MiFare card. - uidlen = len(uid) - keylen = len(key) - params = bytearray(3+uidlen+keylen) - params[0] = 0x01 # Max card numbers - params[1] = key_number & 0xFF - params[2] = block_number & 0xFF - params[3:3+keylen] = key - params[3+keylen:] = uid - # Send InDataExchange request and verify response is 0x00. - response = self.call_function(PN532_COMMAND_INDATAEXCHANGE, - params=params, - response_length=1) - return response[0] == 0x00 - - def mifare_classic_read_block(self, block_number): - """Read a block of data from the card. Block number should be the block - to read. If the block is successfully read a bytearray of length 16 with - data starting at the specified block will be returned. If the block is - not read then None will be returned. - """ - # Send InDataExchange request to read block of MiFare data. - response = self.call_function(PN532_COMMAND_INDATAEXCHANGE, - params=[0x01, MIFARE_CMD_READ, block_number & 0xFF], - response_length=17) - # Check first response is 0x00 to show success. - if response[0] != 0x00: - return None - # Return first 4 bytes since 16 bytes are always returned. - return response[1:] - - def mifare_classic_write_block(self, block_number, data): - """Write a block of data to the card. Block number should be the block - to write and data should be a byte array of length 16 with the data to - write. If the data is successfully written then True is returned, - otherwise False is returned. - """ - assert data is not None and len(data) == 16, 'Data must be an array of 16 bytes!' - # Build parameters for InDataExchange command to do MiFare classic write. - params = bytearray(19) - params[0] = 0x01 # Max card numbers - params[1] = MIFARE_CMD_WRITE - params[2] = block_number & 0xFF - params[3:] = data - # Send InDataExchange request. - response = self.call_function(PN532_COMMAND_INDATAEXCHANGE, - params=params, - response_length=1) - return response[0] == 0x00 diff --git a/examples/pn532_udp_fw_version.py b/examples/pn532_udp_fw_version.py new file mode 100644 index 0000000..21f4520 --- /dev/null +++ b/examples/pn532_udp_fw_version.py @@ -0,0 +1,48 @@ +import pn532_com +from pn532_cmd import Pn532CMD + +import os +import subprocess +from platform import uname +import sys +import select +import serial.tools.list_ports + +def test_fn(): + dev = pn532_com.Pn532Com() + # Connect via UDP to local test server + try: + dev.open("udp:192.168.0.109:18888") + if not dev.isOpen(): + print("Failed to connect to udp:192.168.0.109:18888") + return + print(f"Connected to udp:192.168.0.109:18888") + cml = Pn532CMD(dev) + except Exception as e: + print(f"Connection failed: {e}") + return + try: + print("Getting firmware version...") + fw_version = cml.get_firmware_version() + print("FW Version:", fw_version) + hf_14a_scan_result = cml.hf_14a_scan() + if hf_14a_scan_result is not None: + for data_tag in hf_14a_scan_result: + print(f"- UID: {data_tag['uid'].hex().upper()}") + print( + f"- ATQA: {data_tag['atqa'].hex().upper()} " + f"(0x{int.from_bytes(data_tag['atqa'], byteorder='little'):04x})" + ) + print(f"- SAK: {data_tag['sak'].hex().upper()}") + if "ats" in data_tag and len(data_tag["ats"]) > 0: + print(f"- ATS: {data_tag['ats'].hex().upper()}") + else: + print("ISO14443-A Tag not found") + except Exception as e: + print("Error:", e) + import traceback + traceback.print_exc() + dev.close() + +if __name__ == "__main__": + test_fn() diff --git a/examples/pn532_usb_fw_version.py b/examples/pn532_usb_fw_version.py new file mode 100644 index 0000000..48c8dc3 --- /dev/null +++ b/examples/pn532_usb_fw_version.py @@ -0,0 +1,31 @@ +import pn532_com +from pn532_cmd import Pn532CMD + +import os +import sys + +def test_fn(): + dev = pn532_com.Pn532Com() + # Connect via UDP to local test server + try: + dev.open("/dev/tty.wchusbserial210") + if not dev.isOpen(): + print("Failed to connect to /dev/tty.wchusbserial210") + return + print(f"Connected to /dev/tty.wchusbserial210") + cml = Pn532CMD(dev) + except Exception as e: + print(f"Connection failed: {e}") + return + try: + print("Getting firmware version...") + fw_version = cml.get_firmware_version() + print("FW Version:", fw_version) + except Exception as e: + print("Error:", e) + import traceback + traceback.print_exc() + dev.close() + +if __name__ == "__main__": + test_fn() diff --git a/script/pn532_cli_unit.py b/script/pn532_cli_unit.py index 6ac601d..256ee50 100644 --- a/script/pn532_cli_unit.py +++ b/script/pn532_cli_unit.py @@ -1165,8 +1165,9 @@ def on_exec(self, args: argparse.Namespace): class HWConnect(BaseCLIUnit): def args_parser(self) -> ArgumentParserNoExit: parser = ArgumentParserNoExit() - parser.description = "Connect to pn532 by serial port" - parser.add_argument("-p", "--port", type=str, required=False) + parser.description = "Connect to pn532 by serial port, TCP or UDP" + parser.add_argument("-p", "--port", type=str, required=False, + help="Connection string: /dev/ttyUSB0, COM3, tcp:192.168.1.100:1234, udp:192.168.1.100:2345") return parser def on_exec(self, args: argparse.Namespace): @@ -1215,9 +1216,21 @@ def on_exec(self, args: argparse.Namespace): print( "PN532 not found, please connect the device or try connecting manually with the -p flag." ) + print("Examples:") + print(" hw connect -p /dev/ttyUSB0 # Serial connection") + print(" hw connect -p COM3 # Windows serial connection") + print(" hw connect -p tcp:192.168.1.100:1234 # TCP connection") + print(" hw connect -p udp:192.168.1.100:2345 # UDP connection") return # print connecting to device name - print(f"Connecting to device on port {args.port}") + + if args.port.startswith('tcp:'): + print(f"Connecting to device via TCP: {args.port[4:]}") + elif args.port.startswith('udp:'): + print(f"Connecting to device via UDP: {args.port[4:]}") + else: + print(f"Connecting to device on serial port: {args.port}") + self.device_com.open(args.port) print("Device:", self.device_com.get_device_name()) except Exception as e: diff --git a/script/pn532_com.py b/script/pn532_com.py index 8f8a90c..d5b98c6 100644 --- a/script/pn532_com.py +++ b/script/pn532_com.py @@ -8,6 +8,7 @@ from pn532_enum import Command, Pn532KillerCommand, Status, PN532KillerMode, PN532KillerTagType from pn532_enum import BasicCapabilities, PN532Capabilities, PN532KillerCapabilities from pn532_utils import CC, CB, CG, C0, CY, CR +from pn532_communication import CommunicationInterface, CommunicationFactory DEBUG = False THREAD_BLOCKING_TIMEOUT = 0.1 @@ -39,7 +40,7 @@ def __init__(self): """ Create a PN532 device instance """ - self.serial_instance: Union[serial.Serial, None] = None + self.communication: Union[CommunicationInterface, None] = None self.send_data_queue = queue.Queue() self.wait_response_map = {} self.event_closing = threading.Event() @@ -54,28 +55,30 @@ def get_device_name(self) -> str: return self.device_name def isOpen(self) -> bool: - return self.serial_instance is not None and self.serial_instance.is_open + return self.communication is not None and self.communication.is_open() def open(self, port) -> "Pn532Com": if not self.isOpen(): error = None try: - # open serial port - self.serial_instance = serial.Serial(port=port, baudrate=115200) - # print device name + # 创建通信接口 + self.communication = CommunicationFactory.create_communication(port) + protocol_type, actual_address = CommunicationFactory.parse_address(port) + + # 打开连接 + if not self.communication.open(actual_address): + raise Exception(f"Failed to open {protocol_type} connection to {actual_address}") + + print(f"Opened {protocol_type} connection to {actual_address}") except Exception as e: error = e finally: if error is not None: raise OpenFailException(error) - assert self.serial_instance is not None - try: - self.serial_instance.dtr = False - self.serial_instance.rts = False - except Exception: - print("Failed to set DTR/RTS") - pass - self.serial_instance.timeout = THREAD_BLOCKING_TIMEOUT + + assert self.communication is not None + self.communication.set_timeout(THREAD_BLOCKING_TIMEOUT) + # clear variable self.send_data_queue.queue.clear() self.wait_response_map.clear() @@ -104,17 +107,18 @@ def dcs(self, array: bytearray) -> int: def close(self): self.event_closing.set() try: - assert self.serial_instance is not None - self.serial_instance.close() + if self.communication is not None: + self.communication.close() except Exception: pass finally: - self.serial_instance = None + self.communication = None self.wait_response_map.clear() self.send_data_queue.queue.clear() def set_normal_mode(self) -> response: - self.serial_instance.write(bytes.fromhex("5500000000000000000000000000")) + self.communication.write(bytes.fromhex("5500000000000000000000000000")) + time.sleep(0.1) response = self.send_cmd_sync(Command.SAMConfiguration, bytes.fromhex("01")) return response @@ -192,136 +196,187 @@ def thread_data_receive(self): skip_pattern = bytearray.fromhex("0000ff00ff00") skip_pattern_length = len(skip_pattern) + def reset_frame_parsing(): + nonlocal data_position + data_position = 0 + + def clear_buffer(): + nonlocal data_buffer, data_position, data_length + data_buffer.clear() + data_position = 0 + data_length = 0x0000 + + def check_for_ack_frame(): + nonlocal data_buffer + if len(data_buffer) >= skip_pattern_length: + if data_buffer[:skip_pattern_length] == skip_pattern: + if DEBUG: + print(f"Received ACK frame at start, removing from buffer. Buffer before: {data_buffer.hex()}") + data_buffer = data_buffer[skip_pattern_length:] + if DEBUG: + print(f"Buffer after ACK removal: {data_buffer.hex()}") + return True + if data_buffer[-skip_pattern_length:] == skip_pattern: + if DEBUG: + print("Received ACK frame at end, removing from buffer") + data_buffer = data_buffer[:-skip_pattern_length] + return True + return False + while self.isOpen(): try: - assert self.serial_instance is not None - data_bytes = self.serial_instance.read() + assert self.communication is not None + data_bytes = self.communication.read(32) except Exception as e: if not self.event_closing.is_set(): - print(f"Serial Error {e}, thread for receiver exit.") + print(f"Communication Error {e}, thread for receiver exit.") self.close() break if len(data_bytes) > 0: - data_byte = data_bytes[0] - data_buffer.append(data_byte) - # print(data_position, data_buffer.hex(), len(data_buffer), "data_length", data_length) - - # 检查是否需要跳过模式 - if len(data_buffer) >= skip_pattern_length: - if data_buffer[-skip_pattern_length:] == skip_pattern: - # print(" Skipping pattern:", skip_pattern.hex()) - data_buffer.clear() - data_position = 0 - continue - - # 验证前导码和起始码 - if data_position == 0: - if len(data_buffer) < 1: - print( - "Waiting for more bytes at position 0" - ) # Wait for more bytes - if data_buffer[0] != self.data_preamble[0]: - print("Data frame no preamble byte.") - data_position = 0 - data_buffer.clear() - continue - elif data_position == 1: - if len(data_buffer) < 2: - print( - "Waiting for more bytes at position 1" - ) # Wait for more bytes - # if data_buffer[1] != 0x00: - # print("Data frame start code error.") - # continue - elif data_position == 2: - if len(data_buffer) < 3: - print( - "Waiting for more bytes at position 2" - ) # Wait for more bytes - # if data_buffer[2] != 0xFF: - # print("Data frame start code error.") - # continue - elif data_position == 3: - if len(data_buffer) < 4: - print( - "Waiting for more bytes at position 3" - ) # Wait for more bytes - data_length = data_buffer[3] # Get the data length byte - elif data_position == 4 + data_length and data_length > 0: - if data_buffer[3] != self.dcs(data_buffer[4:5]): - print("Data frame LCS error.") - data_position = 0 - data_buffer.clear() - continue - # print("lengh checksum ok") - elif data_position == 4 + data_length + 1: - # Check DCS - if data_buffer[4 + data_length + 1] != self.dcs( - data_buffer[5 : 4 + data_length + 1] - ): - print("Data frame DCS error.") - data_position = 0 - data_buffer.clear() - continue - # print("data checksum ok") - elif data_position == 4 + data_length + 2: - if len(data_buffer) < 4 + data_length + 3: - print("len(data_buffer) < 4 + data_length + 3:") - continue - # Check POSTAMBLE - if data_buffer[4 + data_length + 2] != 0x00: - print("Data frame POSTAMBLE error.") - data_position = 0 - data_buffer.clear() - continue - # Process complete frame - data_response = bytes(data_buffer[5 : 5 + data_length]) - if data_response[0] != self.data_tfi_receive: - # print("Data frame TFI error.") - data_position = 0 - data_buffer.clear() + if DEBUG: + print(f"Received bytes: {data_bytes.hex()}") + data_buffer.extend(data_bytes) + + if DEBUG: + print(f"Buffer after extend: {data_buffer.hex()}") + + while len(data_buffer) > 0: + if check_for_ack_frame(): + reset_frame_parsing() + if DEBUG: + print(f"After ACK removal, continuing with buffer: {data_buffer.hex()}") continue - if len(data_response) < 2: - print("Data frame length error.") - data_position = 0 - data_buffer.clear() - continue - # get cmd - data_cmd = data_response[1] - 1 - if data_cmd in self.wait_response_map: + if len(data_buffer) > 300: if DEBUG: - print(f"<= {CY}{data_buffer.hex().upper()}{C0}") - # update wait_response_map - response = Response(data_cmd, Status.SUCCESS, data_response[2:]) - if ( - data_cmd == Command.InCommunicateThru or data_cmd == Command.InDataExchange - and len(data_response) > 2 + print("Buffer too long, resetting frame parsing") + clear_buffer() + break + + if data_position == 0 and len(data_buffer) < 1: + break + elif data_position == 1 and len(data_buffer) < 2: + break + elif data_position == 2 and len(data_buffer) < 3: + break + elif data_position == 3 and len(data_buffer) < 4: + break + elif data_position == 4 and len(data_buffer) < 5: + break + elif data_position >= 5 and len(data_buffer) < 6 + data_length: + break + + if data_position == 0: + if data_buffer[0] != self.data_preamble[0]: + if DEBUG: + print(f"Data frame no preamble byte: {data_buffer[0]:02x}") + preamble_pos = -1 + for i in range(len(data_buffer)): + if data_buffer[i] == self.data_preamble[0]: + preamble_pos = i + break + if preamble_pos > 0: + data_buffer = data_buffer[preamble_pos:] + data_position = 0 + continue + else: + clear_buffer() + break + elif data_position == 1: + if data_buffer[1] != self.data_start_code[0]: + if DEBUG: + print(f"Data frame start code error at position 1: {data_buffer[1]:02x}") + clear_buffer() + break + elif data_position == 2: + if data_buffer[2] != self.data_start_code[1]: + if DEBUG: + print(f"Data frame start code error at position 2: {data_buffer[2]:02x}") + clear_buffer() + break + elif data_position == 3: + data_length = data_buffer[3] # Get the data length byte + elif data_position == 4: + # Check length checksum (LCS) + length_checksum = data_buffer[4] + if (data_length + length_checksum) & 0xFF != 0: + if DEBUG: + print(f"Data frame LCS error: len={data_length:02x} lcs={length_checksum:02x}") + clear_buffer() + break + # print("length checksum ok") + elif data_position == 5 + data_length: + # Check DCS (Data Checksum) + if data_buffer[5 + data_length] != self.dcs( + data_buffer[5 : 5 + data_length] ): - response = Response( - data_cmd, data_response[2], data_response[2:] - ) - if data_response[2] == 0 and len(data_response) > 16: + if DEBUG: + print("Data frame DCS error.") + clear_buffer() + break + # print("data checksum ok") + elif data_position == 6 + data_length: + # Check POSTAMBLE + if data_buffer[6 + data_length] != 0x00: + if DEBUG: + print("Data frame POSTAMBLE error.") + clear_buffer() + break + # Process complete frame + data_response = bytes(data_buffer[5 : 5 + data_length]) + if len(data_response) == 0: + if DEBUG: + print("Data frame is empty.") + clear_buffer() + break + if data_response[0] != self.data_tfi_receive: + if DEBUG: + print("Data frame TFI error.") + clear_buffer() + break + + if len(data_response) < 2: + if DEBUG: + print("Data frame length error.") + clear_buffer() + break + # get cmd + data_cmd = data_response[1] - 1 + if DEBUG: + print(f"Parsed command: {data_cmd}, waiting for: {list(self.wait_response_map.keys())}") + if data_cmd in self.wait_response_map: + if DEBUG: + print(f"<= {CY}{data_buffer[:7+data_length].hex().upper()}{C0}") + # update wait_response_map + response = Response(data_cmd, Status.SUCCESS, data_response[2:]) + if ( + data_cmd == Command.InCommunicateThru or data_cmd == Command.InDataExchange + and len(data_response) > 2 + ): response = Response( - data_cmd, - data_response[2], - data_response[3: data_length], + data_cmd, data_response[2], data_response[2:] ) - self.wait_response_map[data_cmd]["response"] = response - fn_call = self.wait_response_map[data_cmd].get("callback") - if callable(fn_call): - print("run callback") - del self.wait_response_map[data_cmd] - fn_call(data_cmd, data_status, data_response) - else: - if DEBUG: - print(f"No task waiting for process: {data_cmd}") - pass - data_position = 0 - data_buffer.clear() - continue - data_position += 1 + if data_response[2] == 0 and len(data_response) > 16: + response = Response( + data_cmd, + data_response[2], + data_response[3: 3 + data_length - 3], # 修复数据长度计算 + ) + self.wait_response_map[data_cmd]["response"] = response + fn_call = self.wait_response_map[data_cmd].get("callback") + if callable(fn_call): + print("run callback") + del self.wait_response_map[data_cmd] + fn_call(data_cmd, data_status, data_response) + else: + if DEBUG: + print(f"No task waiting for process: {data_cmd}") + pass + clear_buffer() + break + + data_position += 1 def thread_data_transfer(self): while self.isOpen(): @@ -348,12 +403,12 @@ def thread_data_transfer(self): self.wait_response_map[task_cmd]["end_time"] = start_time + task_timeout self.wait_response_map[task_cmd]["is_timeout"] = False try: - assert self.serial_instance is not None + assert self.communication is not None if DEBUG: print(f'=> {CY}{task["frame"].hex().upper()}{C0}') - self.serial_instance.write(task["frame"]) + self.communication.write(task["frame"]) except Exception as e: - print(f"Serial Error {e}, thread for transfer exit.") + print(f"Communication Error {e}, thread for transfer exit.") self.close() break diff --git a/script/pn532_communication.py b/script/pn532_communication.py new file mode 100644 index 0000000..9f8c3d2 --- /dev/null +++ b/script/pn532_communication.py @@ -0,0 +1,236 @@ +""" +PN532 Communication Interface +Abstract communication interface, supports serial, TCP, UDP, etc. +""" +import socket +import threading +import time +import queue +from abc import ABC, abstractmethod +from typing import Union, Optional +import serial +import re + +class CommunicationInterface(ABC): + """Abstract communication interface""" + + @abstractmethod + def is_open(self) -> bool: + """Check if the connection is open""" + pass + + @abstractmethod + def open(self, address: str) -> bool: + """Open the connection""" + pass + + @abstractmethod + def close(self) -> None: + """Close the connection""" + pass + + @abstractmethod + def write(self, data: bytes) -> int: + """Write data""" + pass + + @abstractmethod + def read(self, size: int = 1) -> bytes: + """Read data""" + pass + + @abstractmethod + def set_timeout(self, timeout: float) -> None: + """Set timeout""" + pass + +class SerialCommunication(CommunicationInterface): + """Serial communication implementation""" + + def __init__(self): + self.serial_instance: Optional[serial.Serial] = None + + def is_open(self) -> bool: + return self.serial_instance is not None and self.serial_instance.is_open + + def open(self, address: str) -> bool: + try: + self.serial_instance = serial.Serial(port=address, baudrate=115200) + self.serial_instance.dtr = False + self.serial_instance.rts = False + return True + except Exception as e: + print(f"Serial connection failed: {e}") + return False + + def close(self) -> None: + if self.serial_instance: + self.serial_instance.close() + self.serial_instance = None + + def write(self, data: bytes) -> int: + if self.serial_instance: + return self.serial_instance.write(data) + return 0 + + def read(self, size: int = 1) -> bytes: + if self.serial_instance: + return self.serial_instance.read(size) + return b'' + + def set_timeout(self, timeout: float) -> None: + if self.serial_instance: + self.serial_instance.timeout = timeout + +class TCPCommunication(CommunicationInterface): + """TCP communication implementation""" + + def __init__(self): + self.socket_instance: Optional[socket.socket] = None + self.timeout = 5.0 # Increase timeout to 5 seconds + + def is_open(self) -> bool: + return self.socket_instance is not None + + def open(self, address: str) -> bool: + try: + # Parse address format: host:port + host, port = address.rsplit(':', 1) + port = int(port) + + self.socket_instance = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket_instance.settimeout(self.timeout) + self.socket_instance.connect((host, port)) + return True + except Exception as e: + print(f"TCP connection failed: {e}") + return False + + def close(self) -> None: + if self.socket_instance: + self.socket_instance.close() + self.socket_instance = None + + def write(self, data: bytes) -> int: + if self.socket_instance: + try: + self.socket_instance.send(data) + return len(data) + except Exception as e: + print(f"TCP write failed: {e}") + return 0 + return 0 + + def read(self, size: int = 1) -> bytes: + if self.socket_instance: + try: + return self.socket_instance.recv(size) + except socket.timeout: + return b'' + except Exception as e: + print(f"TCP read failed: {e}") + return b'' + return b'' + + def set_timeout(self, timeout: float) -> None: + self.timeout = timeout + if self.socket_instance: + self.socket_instance.settimeout(timeout) + +class UDPCommunication(CommunicationInterface): + """UDP communication implementation""" + + def __init__(self): + self.socket_instance: Optional[socket.socket] = None + self.server_address: Optional[tuple] = None + self.timeout = 1.0 + + def is_open(self) -> bool: + return self.socket_instance is not None + + def open(self, address: str) -> bool: + try: + # Parse address format: host:port + host, port = address.rsplit(':', 1) + port = int(port) + + self.socket_instance = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.socket_instance.settimeout(self.timeout) + self.server_address = (host, port) + + # UDP is connectionless, but we can send a test packet to verify connection + test_data = b'\x00\x00\xFF\x00\xFF\x00' # PN532 test frame + self.socket_instance.sendto(test_data, self.server_address) + + return True + except Exception as e: + print(f"UDP connection failed: {e}") + return False + + def close(self) -> None: + if self.socket_instance: + self.socket_instance.close() + self.socket_instance = None + self.server_address = None + + def write(self, data: bytes) -> int: + if self.socket_instance and self.server_address: + try: + self.socket_instance.sendto(data, self.server_address) + return len(data) + except Exception as e: + print(f"UDP write failed: {e}") + return 0 + return 0 + + def read(self, size: int = 1) -> bytes: + if self.socket_instance: + try: + data, addr = self.socket_instance.recvfrom(size) + return data + except socket.timeout: + return b'' + except Exception as e: + print(f"UDP read failed: {e}") + return b'' + return b'' + + def set_timeout(self, timeout: float) -> None: + self.timeout = timeout + if self.socket_instance: + self.socket_instance.settimeout(timeout) + +class CommunicationFactory: + """Communication interface factory class""" + + @staticmethod + def create_communication(address: str) -> CommunicationInterface: + """ + Create corresponding communication interface according to address format + + Supported formats: + - tcp:192.168.1.100:1234 - TCP connection + - udp:192.168.1.100:2345 - UDP connection + - /dev/ttyUSB0 or COM3 - Serial connection + """ + if address.startswith('tcp:'): + tcp_address = address[4:] # Remove 'tcp:' prefix + return TCPCommunication() + elif address.startswith('udp:'): + udp_address = address[4:] # Remove 'udp:' prefix + return UDPCommunication() + else: + # Default to serial connection + return SerialCommunication() + + @staticmethod + def parse_address(address: str) -> tuple[str, str]: + """ + Parse address, return (protocol type, actual address) + """ + if address.startswith('tcp:'): + return 'tcp', address[4:] + elif address.startswith('udp:'): + return 'udp', address[4:] + else: + return 'serial', address From 377e1ae29c0af5f912b0a264d2c641804d6a8811 Mon Sep 17 00:00:00 2001 From: whywilson Date: Fri, 8 Aug 2025 17:51:47 +0800 Subject: [PATCH 2/9] Show USB, TCP or UPD label after connected. --- script/pn532_cli_main.py | 31 +++++++++++++++---- script/pn532_com.py | 19 ------------ .../pn532_tcp_testing.py | 13 +++----- .../pn532_usb_testing.py | 1 - 4 files changed, 29 insertions(+), 35 deletions(-) rename examples/pn532_udp_fw_version.py => script/pn532_tcp_testing.py (81%) rename examples/pn532_usb_fw_version.py => script/pn532_usb_testing.py (94%) diff --git a/script/pn532_cli_main.py b/script/pn532_cli_main.py index 0eb5770..53082db 100644 --- a/script/pn532_cli_main.py +++ b/script/pn532_cli_main.py @@ -123,12 +123,31 @@ def set_device_name(self, device_name): self.device_com.set_device_name(device_name) def get_prompt(self): - """ - Retrieve the cli prompt - - :return: current cmd prompt - """ - device_string = f"{CG}USB" if self.device_com.isOpen() else f"{CR}Offline" + # Retrieve the cli prompt + # :return: current cmd prompt + if self.device_com.isOpen(): + # 判断连接类型 + port = getattr(self.device_com, 'port', None) + if port is None and hasattr(self.device_com, 'communication') and hasattr(self.device_com.communication, 'serial_instance'): + # Serial connection + port = getattr(self.device_com.communication.serial_instance, 'port', None) + conn_type = "USB" + if port: + if isinstance(port, str): + if port.startswith('tcp:'): + conn_type = "TCP" + elif port.startswith('udp:'): + conn_type = "UDP" + # 兼容 CommunicationFactory 连接 + if hasattr(self.device_com, 'communication'): + comm = self.device_com.communication + if comm.__class__.__name__ == "TCPCommunication": + conn_type = "TCP" + elif comm.__class__.__name__ == "UDPCommunication": + conn_type = "UDP" + device_string = f"{CG}{conn_type}" + else: + device_string = f"{CR}Offline" device_name = self.device_com.get_device_name() status = f"[{device_string}{C0}] {device_name} --> " return status diff --git a/script/pn532_com.py b/script/pn532_com.py index d5b98c6..c18996e 100644 --- a/script/pn532_com.py +++ b/script/pn532_com.py @@ -210,15 +210,9 @@ def check_for_ack_frame(): nonlocal data_buffer if len(data_buffer) >= skip_pattern_length: if data_buffer[:skip_pattern_length] == skip_pattern: - if DEBUG: - print(f"Received ACK frame at start, removing from buffer. Buffer before: {data_buffer.hex()}") data_buffer = data_buffer[skip_pattern_length:] - if DEBUG: - print(f"Buffer after ACK removal: {data_buffer.hex()}") return True if data_buffer[-skip_pattern_length:] == skip_pattern: - if DEBUG: - print("Received ACK frame at end, removing from buffer") data_buffer = data_buffer[:-skip_pattern_length] return True return False @@ -234,23 +228,14 @@ def check_for_ack_frame(): break if len(data_bytes) > 0: - if DEBUG: - print(f"Received bytes: {data_bytes.hex()}") data_buffer.extend(data_bytes) - if DEBUG: - print(f"Buffer after extend: {data_buffer.hex()}") - while len(data_buffer) > 0: if check_for_ack_frame(): reset_frame_parsing() - if DEBUG: - print(f"After ACK removal, continuing with buffer: {data_buffer.hex()}") continue if len(data_buffer) > 300: - if DEBUG: - print("Buffer too long, resetting frame parsing") clear_buffer() break @@ -269,8 +254,6 @@ def check_for_ack_frame(): if data_position == 0: if data_buffer[0] != self.data_preamble[0]: - if DEBUG: - print(f"Data frame no preamble byte: {data_buffer[0]:02x}") preamble_pos = -1 for i in range(len(data_buffer)): if data_buffer[i] == self.data_preamble[0]: @@ -285,8 +268,6 @@ def check_for_ack_frame(): break elif data_position == 1: if data_buffer[1] != self.data_start_code[0]: - if DEBUG: - print(f"Data frame start code error at position 1: {data_buffer[1]:02x}") clear_buffer() break elif data_position == 2: diff --git a/examples/pn532_udp_fw_version.py b/script/pn532_tcp_testing.py similarity index 81% rename from examples/pn532_udp_fw_version.py rename to script/pn532_tcp_testing.py index 21f4520..82debc2 100644 --- a/examples/pn532_udp_fw_version.py +++ b/script/pn532_tcp_testing.py @@ -1,22 +1,17 @@ import pn532_com from pn532_cmd import Pn532CMD -import os -import subprocess from platform import uname -import sys -import select -import serial.tools.list_ports def test_fn(): dev = pn532_com.Pn532Com() - # Connect via UDP to local test server + # Connect via TCP to local test server try: - dev.open("udp:192.168.0.109:18888") + dev.open("tcp:192.168.20.32:18889") if not dev.isOpen(): - print("Failed to connect to udp:192.168.0.109:18888") + print("Failed to connect to tcp:192.168.20.32:18889") return - print(f"Connected to udp:192.168.0.109:18888") + print(f"Connected to tcp:192.168.20.32:18889") cml = Pn532CMD(dev) except Exception as e: print(f"Connection failed: {e}") diff --git a/examples/pn532_usb_fw_version.py b/script/pn532_usb_testing.py similarity index 94% rename from examples/pn532_usb_fw_version.py rename to script/pn532_usb_testing.py index 48c8dc3..2f7cca4 100644 --- a/examples/pn532_usb_fw_version.py +++ b/script/pn532_usb_testing.py @@ -6,7 +6,6 @@ def test_fn(): dev = pn532_com.Pn532Com() - # Connect via UDP to local test server try: dev.open("/dev/tty.wchusbserial210") if not dev.isOpen(): From 1f15e032189422417582b9e54ca1f8605543978d Mon Sep 17 00:00:00 2001 From: whywilson Date: Sat, 9 Aug 2025 07:07:48 +0800 Subject: [PATCH 3/9] Add delay for PN532 wakeup. --- script/pn532_com.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/script/pn532_com.py b/script/pn532_com.py index c18996e..28296c6 100644 --- a/script/pn532_com.py +++ b/script/pn532_com.py @@ -88,8 +88,9 @@ def open(self, port) -> "Pn532Com": threading.Thread(target=self.thread_data_transfer).start() threading.Thread(target=self.thread_check_timeout).start() + time.sleep(0.5) self.set_normal_mode() - time.sleep(0.01) + time.sleep(0.5) is_pn532killer = self.is_pn532killer() if is_pn532killer: self.device_name = "PN532Killer" From fc7bc4ab60bf9dbb169d6d0ba60b3efac07c0b3b Mon Sep 17 00:00:00 2001 From: whywilson Date: Mon, 11 Aug 2025 20:01:06 +0800 Subject: [PATCH 4/9] Improve the data flow control. --- script/pn532_cli_main.py | 14 +- script/pn532_cli_unit.py | 333 +------------- script/pn532_cmd.py | 109 ++--- script/pn532_com.py | 461 ++++++++++--------- script/pn532_communication.py | 110 ++++- script/pn532_ntag_reader.py | 4 +- script/pn532_tag_scanner.py | 4 +- script/pn532_tcp_testing.py | 43 -- script/pn532killer_mf1_emulator.py | 4 +- script/test_tcp_udp.py | 87 ++++ script/{pn532_usb_testing.py => test_usb.py} | 7 +- 11 files changed, 522 insertions(+), 654 deletions(-) delete mode 100644 script/pn532_tcp_testing.py create mode 100644 script/test_tcp_udp.py rename script/{pn532_usb_testing.py => test_usb.py} (77%) diff --git a/script/pn532_cli_main.py b/script/pn532_cli_main.py index 53082db..7c9426a 100644 --- a/script/pn532_cli_main.py +++ b/script/pn532_cli_main.py @@ -1,4 +1,5 @@ #!/usr/bin/env python3 +# -*- coding: utf-8 -*- import argparse import sys import traceback @@ -127,10 +128,7 @@ def get_prompt(self): # :return: current cmd prompt if self.device_com.isOpen(): # 判断连接类型 - port = getattr(self.device_com, 'port', None) - if port is None and hasattr(self.device_com, 'communication') and hasattr(self.device_com.communication, 'serial_instance'): - # Serial connection - port = getattr(self.device_com.communication.serial_instance, 'port', None) + port = getattr(self.device_com, 'port_string', None) conn_type = "USB" if port: if isinstance(port, str): @@ -174,6 +172,14 @@ def startCLI(self): cmd_strs = [] cmd_str = '' while True: + # Check connection status before prompting + if self.device_com.isOpen(): + # Double check if communication is still valid + if hasattr(self.device_com, 'communication') and self.device_com.communication: + if not self.device_com.communication.is_open(): + print(f"{colorama.Fore.RED}Connection lost! Device disconnected.{colorama.Style.RESET_ALL}") + self.device_com.close() + if cmd_strs: print(f"{colorama.Fore.GREEN}>>> {cmd_strs[-1]}{colorama.Style.RESET_ALL}") # cmd_str = cmd_strs.pop(0) diff --git a/script/pn532_cli_unit.py b/script/pn532_cli_unit.py index 256ee50..9d75854 100644 --- a/script/pn532_cli_unit.py +++ b/script/pn532_cli_unit.py @@ -364,6 +364,12 @@ def on_exec(self, args: argparse.Namespace): if len(data) % 2 != 0: data = "0" + data data_bytes = bytes.fromhex(data) + # 如果用户输入的是完整帧 (以 0000FF 开头) 或 唤醒序列(55000...),直接发送不封装 + if data_bytes.startswith(b"\x00\x00\xFF") or data_bytes.startswith(b"\x55\x00"): + self.device_com.send_raw_frame(data_bytes) + print("Frame sent (raw, no wrapping)") + return + # 否则按命令格式: 第一个字节是cmd,其余是data resp = self.device_com.send_raw(data_bytes) print(f"Response: {' '.join(f'{byte:02X}' for byte in resp)}") @@ -1950,19 +1956,19 @@ def on_exec(self, args: argparse.Namespace): resp = self.cmd.hf14a_raw( options=options, resp_timeout_ms=1000, - data=[MifareCommand.MfWriteBlock, block], + data=[MifareCommand.MfWriteBlock, 0], ) + print(f"Writing block 0: {block0.hex().upper()}") options["keep_rf_field"] = 0 resp = self.cmd.hf14a_raw( options=options, resp_timeout_ms=1000, - data=bytes.fromhex(block_data), + data=block0, ) - if resp and len(resp) > 0 and resp[0] == 0x00: - print(f"Write block {block}: {CG}Success{C0}") + if resp.length > 0 and resp[0] == 0x00: + print(f"Write {block_data} to block {block}: {CG}Success{C0}") else: - print(f"Write block {block}: {CR}Failed{C0}") - + print(f"Write failed on block {block}") elif gen == 2: # Gen2 for block, block_data in dump_data.items(): resp = self.cmd.mf1_write_block( @@ -2126,49 +2132,8 @@ def on_exec(self, args: argparse.Namespace): self.cmd.hf_mf_load(dump_map, args.slot) -@hf_mf.command("eRead") -class HfMfEread(DeviceRequiredUnit): - def args_parser(self) -> ArgumentParserNoExit: - parser = ArgumentParserNoExit() - parser.description = "Get Mifare Classic dump from PN532Killer Slot" - parser.add_argument( - "-s", "--slot", default=1, type=int, help="Emulator slot(1-8)" - ) - parser.add_argument("--file", action="store_true", help="Save to json file") - parser.add_argument("--bin", action="store_true", help="Save to bin file") - return parser - - def on_exec(self, args: argparse.Namespace): - self.device_com.set_work_mode(PN532KillerMode.EMULATOR, PN532KillerTagType.MFC, args.slot - 1) - dump_map = self.cmd.hf_mf_eread(args.slot) - # {"0": "11223344556677889900AABBCCDDEEFF", "1": "11223344556677889900AABBCCDDEEFF", ...} - if not dump_map: - print("Get dump failed") - return - file_name = "mf_dump_{args.slot}" - file_index = 0 - if args.file: - while True: - if os.path.exists(f"{file_name}_{file_index}.json"): - file_index += 1 - else: - file_name = f"{file_name}_{file_index}.json" - break - with open(file_name, "w") as json_file: - json.dump({"blocks": dump_map}, json_file) - if args.bin: - while True: - if os.path.exists(f"{file_name}_{file_index}.bin"): - file_index += 1 - else: - file_name = f"{file_name}_{file_index}.bin" - break - with open(file_name, "wb") as bin_file: - for block_index, block_data in dump_map.items(): - bin_file.write(bytes.fromhex(block_data)) - @hf_mfu.command("rdbl") -class HfMfRdbl(DeviceRequiredUnit): +class HfMfuRdbl(DeviceRequiredUnit): def args_parser(self) -> ArgumentParserNoExit: parser = ArgumentParserNoExit() parser.description = "Read Mifare Ultralight block" @@ -2205,7 +2170,7 @@ def on_exec(self, args: argparse.Namespace): print(f"Block {block} Failed to read") @hf_mfu.command("wrbl") -class HfMfWrbl(DeviceRequiredUnit): +class HfMfuWrbl(DeviceRequiredUnit): def args_parser(self) -> ArgumentParserNoExit: parser = ArgumentParserNoExit() parser.description = "Write Mifare Ultralight block" @@ -2420,272 +2385,4 @@ def on_exec(self, args: argparse.Namespace): self.cmd.mf0_write_one_block(i, pages[i]) print(f"Updated UID: {CG}{uid.hex().upper()}{C0}") else: - print("Failed to read original Block0") - -@hf_mfu.command("eRead") -class HfMfuEread(DeviceRequiredUnit): - def args_parser(self) -> ArgumentParserNoExit: - parser = ArgumentParserNoExit() - parser.description = "Get Mifare Ultralight dump from PN532Killer Slot" - parser.add_argument( - "-s", "--slot", default=1, type=int, help="Emulator slot(1-8)" - ) - parser.add_argument("--file", action="store_true", help="Save to json file") - parser.add_argument("--bin", action="store_true", help="Save to bin file") - return parser - - def on_exec(self, args: argparse.Namespace): - self.device_com.set_work_mode(PN532KillerMode.EMULATOR, PN532KillerTagType.MFU, args.slot - 1) - dump_map = self.cmd.hf_mfu_eread(args.slot) - # {"0": "11223344", "1": "55667788", ...} - if not dump_map: - print("Get dump failed") - return - file_name = f"mfu_dump_{args.slot}" - file_index = 0 - if args.file: - while True: - if os.path.exists(f"{file_name}_{file_index}.json"): - file_index += 1 - else: - file_name = f"{file_name}_{file_index}.json" - break - # Convert bytes to hex strings for JSON serialization - json_dump_map = {} - for page_index, page_data in dump_map.items(): - json_dump_map[str(page_index)] = page_data.hex().upper() - with open(file_name, "w") as json_file: - json.dump({"pages": json_dump_map}, json_file) - if args.bin: - file_name_bin = f"mfu_dump_{args.slot}" - file_index = 0 - while True: - if os.path.exists(f"{file_name_bin}_{file_index}.bin"): - file_index += 1 - else: - file_name_bin = f"{file_name_bin}_{file_index}.bin" - break - with open(file_name_bin, "wb") as bin_file: - for page_index, page_data in dump_map.items(): - bin_file.write(page_data) - -@lf.command("scan") -class LfScan(DeviceRequiredUnit): - def args_parser(self) -> ArgumentParserNoExit: - parser = ArgumentParserNoExit() - parser.description = "Scan LF tag, and print basic information" - return parser - - def on_exec(self, args: argparse.Namespace): - resp = self.cmd.lf_scan() - if resp is not None: - for data_tag in resp: - if "dec" in data_tag: - print(f"- ID : {data_tag['id'].upper()}") - print(f" DEC : {data_tag['dec']}") - else: - print("LF Tag no found") - - -@lf_em_410x.command("eSetid") -class LfEm410xESetId(DeviceRequiredUnit): - def args_parser(self) -> ArgumentParserNoExit: - parser = ArgumentParserNoExit() - parser.description = "Set ID of EM410x Emulation" - parser.add_argument( - "-i", - type=str, - metavar="", - required=False, - help="ID to set (10 bytes)", - ) - parser.add_argument( - "-s", "--slot", default=1, type=int, help="Emulator slot(1-8)" - ) - return parser - - def on_exec(self, args: argparse.Namespace): - if args.i is None: - print("usage: lf em410x esetid [-h] -i [-s SLOT]") - print("lf em410x esetid: error: the following arguments are required: -i") - return - id = args.i - if not re.match(r"^[a-fA-F0-9]{20}$", id): - print("ID must be 10 bytes hex") - return - resp = self.cmd.lf_em4100_eset_id(args.slot - 1, bytes.fromhex(id)) - print( - f"Set Slot {args.slot} ID to {id} {CY}{'Success' if resp else 'Fail'}{C0}" - ) - - -@ntag.command("emulate") -class NtagEmulate(DeviceRequiredUnit): - def args_parser(self) -> ArgumentParserNoExit: - parser = ArgumentParserNoExit() - parser.description = "Start NTAG emulating" - parser.add_argument( - "--uri", - type=str, - required=False, - help="URI to emulate", - default="https://pn532killer.com", - ) - parser.epilog = ( - parser.epilog - ) = """ -examples: - ntag emulate --uri https://pn532killer.com -""" - - return parser - - def on_exec(self, args: argparse.Namespace): - self.device_com.set_normal_mode() - self.cmd.ntag_emulator(url=args.uri) - -@ntag.command("reader") -class NtagReader(DeviceRequiredUnit): - def args_parser(self) -> ArgumentParserNoExit: - parser = ArgumentParserNoExit() - parser.description = "Read NTAG data and open URI in browser" - return parser - - def on_exec(self, args: argparse.Namespace): - self.cmd.ntag_reader() - # Read dump file - dump_data = {} - if args.f.endswith('.mfd'): - with open(args.f, 'r') as f: - for line in f: - if ':' in line: - block_num, data = line.strip().split(':') - dump_data[int(block_num)] = data.strip() - elif args.f.endswith('.bin'): - with open(args.f, 'rb') as f: - data = f.read() - if len(data) != 1024: # 1KB - print(f"{CR}Error: Bin file must be 1KB{C0}") - return - for i in range(64): # 64 blocks - dump_data[i] = data[i*16:(i+1)*16].hex() - else: - print(f"{CR}Error: Unsupported file format{C0}") - return - - resp = self.cmd.hf_14a_scan() - if resp is None: - print("No tag found") - return - - self.sak_info(resp[0]) - uid = resp[0]["uid"] - key = bytes.fromhex(args.k) - - # Write tag depending on generation - gen = args.g - if gen == 1: # Gen1A - if not self.cmd.isGen1a(): - print(f"{CR}Tag is not Gen1A{C0}") - return - print("Found Gen1A:", f"{uid.hex().upper()}") - for block, block_data in dump_data.items(): - options = { - "activate_rf_field": 0, - "wait_response": 1, - "append_crc": 1, - "auto_select": 0, - "keep_rf_field": 1, - "check_response_crc": 0, - } - resp = self.cmd.hf14a_raw( - options=options, - resp_timeout_ms=1000, - data=[MifareCommand.MfWriteBlock, block], - ) - options["keep_rf_field"] = 0 - resp = self.cmd.hf14a_raw( - options=options, - resp_timeout_ms=1000, - data=bytes.fromhex(block_data), - ) - if resp and len(resp) > 0 and resp[0] == 0x00: - print(f"Write block {block}: {CG}Success{C0}") - else: - print(f"Write block {block}: {CR}Failed{C0}") - - elif gen == 2: # Gen2 - for block, block_data in dump_data.items(): - resp = self.cmd.mf1_write_block( - uid, - block, - key, - bytes.fromhex(block_data), - ) - if resp: - print(f"Write block {block}: {CG}Success{C0}") - else: - print(f"Write block {block}: {CR}Failed{C0}") - - elif gen == 3: # Gen3 - if not self.cmd.isGen3(): - print(f"{CR}Tag is not Gen3{C0}") - return - print("Found Gen3 Tag") - # Set UI - resp1 = self.cmd.setGen3Uid(uid) - print(f"Set UID to {uid.hex().upper()}: {CG}Success{C0}" if resp1 else f"Set UID to {uid.hex().upper()}: {CR}Failed{C0}") - # Set block0 - resp2 = self.cmd.setGen3Block0(bytes.fromhex(dump_data[0])) - print(f"Set block0: {CG}Success{C0}" if resp2 else f"Set block0: {CR}Failed{C0}") - # Write other blocks - for block, block_data in dump_data.items(): - if block == 0: - continue - resp = self.cmd.mf1_write_block( - uid, - block, - key, - bytes.fromhex(block_data), - ) - if resp: - print(f"Write block {block}: {CG}Success{C0}") - else: - print(f"Write block {block}: {CR}Failed{C0}") - - elif gen == 4: # Gen4 - if not self.cmd.isGen4(args.p): - print(f"{CR}Tag is not Gen4 or wrong password{C0}") - return - print("Found Gen4:", f"{uid.hex().upper()}") - for block, block_data in dump_data.items(): - options = { - "activate_rf_field": 0, - "wait_response": 1, - "append_crc": 1, - "auto_select": 0, - "keep_rf_field": 1, - "check_response_crc": 0, - } - resp = self.cmd.hf14a_raw( - options=options, - resp_timeout_ms=1000, - data=bytes.fromhex(f"CF{args.p}CD{block:02x}{block_data}"), - ) - if resp and len(resp) > 0 and resp[0] == 0x00: - print(f"Write block {block}: {CG}Success{C0}") - else: - print(f"Write block {block}: {CR}Failed{C0}") - - else: # Normal card - for block, block_data in dump_data.items(): - resp = self.cmd.mf1_write_block( - uid, - block, - key, - bytes.fromhex(block_data), - ) - if resp: - print(f"Write block {block}: {CG}Success{C0}") - else: - print(f"Write block {block}: {CR}Failed{C0}") \ No newline at end of file + print("Failed to read original Block0") \ No newline at end of file diff --git a/script/pn532_cmd.py b/script/pn532_cmd.py index be69f0b..2484baa 100644 --- a/script/pn532_cmd.py +++ b/script/pn532_cmd.py @@ -48,13 +48,15 @@ def hf_14a_scan(self): :return: """ - resp = self.device.send_cmd_sync(Command.InListPassiveTarget, b"\x01\x00") - # print("response status = ", resp.status) + # 根据连接类型调整一次性命令等待时间(不重发,只是给 UDP/TCP 更宽松的等待) + timeout = 1 + if hasattr(self.device, 'connection_type') and self.device.connection_type in ('udp', 'tcp'): + timeout = 2 # 给网络模式多一点时间 + resp = self.device.send_cmd_sync(Command.InListPassiveTarget, b"\x01\x00", timeout=timeout) if resp.status == Status.SUCCESS: if len(resp.data) < 2: resp.parsed = None return resp - # tagType[1]tagNum[1]atqa[2]sak[1]uidlen[1]uid[uidlen] offset = 0 data = [] while offset < len(resp.data): @@ -105,14 +107,16 @@ def hfmf_cview(self): self.device.set_normal_mode() tag_info = {} - resp = self.hf_14a_scan() - if resp == None: + scan_result = self.hf_14a_scan() # 装饰器返回 parsed 数据,不是 Response 对象 + + if scan_result == None or len(scan_result) == 0: print("No tag found") - return resp - # print("Tag found", resp) - tag_info["uid"] = resp[0]["uid"].hex() - tag_info["atqa"] = resp[0]["atqa"].hex() - tag_info["sak"] = resp[0]["sak"].hex() + return Response(Command.InListPassiveTarget, Status.HF_TAG_NO) + + # scan_result 直接是标签信息列表,不需要 .parsed + tag_info["uid"] = scan_result[0]["uid"].hex() + tag_info["atqa"] = scan_result[0]["atqa"].hex() + tag_info["sak"] = scan_result[0]["sak"].hex() tag_info["data"] = [] try: if not self.isGen1a(): @@ -132,24 +136,24 @@ def hfmf_cview(self): while block < 64: if block == 63: options["keep_rf_field"] = 0 - resp = self.hf14a_raw( + resp_data = self.hf14a_raw( options=options, resp_timeout_ms=1000, data=[MifareCommand.MfReadBlock, block], - ) - if len(resp) > 16: - resp = resp[:16] - block_data[f"{block}"] = resp.hex() + ) # 装饰器直接返回 parsed 数据(字节) + if len(resp_data) > 16: + resp_data = resp_data[:16] + block_data[f"{block}"] = resp_data.hex() if block == 0: print( - f"{block:02d}: {CY}{resp.hex()[0:8].upper()}{CR}{resp.hex()[8:10].upper()}{CG}{resp.hex()[10:12].upper()}{CY}{resp.hex()[12:16].upper()}{C0}{resp.hex()[16:].upper()}{C0}" + f"{block:02d}: {CY}{resp_data.hex()[0:8].upper()}{CR}{resp_data.hex()[8:10].upper()}{CG}{resp_data.hex()[10:12].upper()}{CY}{resp_data.hex()[12:16].upper()}{C0}{resp_data.hex()[16:].upper()}{C0}" ) elif block % 4 == 3: print( - f"{block:02d}: {CG}{resp.hex()[0:12].upper()}{CR}{resp.hex()[12:20].upper()}{CG}{resp.hex()[20:].upper()}{C0}" + f"{block:02d}: {CG}{resp_data.hex()[0:12].upper()}{CR}{resp_data.hex()[12:20].upper()}{CG}{resp_data.hex()[20:].upper()}{C0}" ) else: - print(f"{block:02d}: {resp.hex().upper()}") + print(f"{block:02d}: {resp_data.hex().upper()}") block += 1 tag_info["blocks"] = block_data except Exception as e: @@ -238,34 +242,35 @@ def isGen1a(self): "check_response_crc": 0, } # Unlock 1 - resp = self.hf14a_raw( + resp_data = self.hf14a_raw( options=options, resp_timeout_ms=1000, data=[0x40], bitlen=7 - ) + ) # 装饰器返回字节数据 if DEBUG: - print("unlock 1:", resp.hex()) - if resp[-1] == 0x0A: + print("unlock 1:", resp_data.hex()) + if resp_data[-1] == 0x0A: options["activate_rf_field"] = 0 # Unlock 2 - resp = self.hf14a_raw(options=options, resp_timeout_ms=1000, data=[0x43]) + resp_data = self.hf14a_raw(options=options, resp_timeout_ms=1000, data=[0x43]) if DEBUG: - print("unlock 2:", resp.hex()) - if resp[-1] == 0x0A: + print("unlock 2:", resp_data.hex()) + if resp_data[-1] == 0x0A: return True return False def selectTag(self): tag_info = {} - resp = self.hf_14a_scan() + scan_result = self.hf_14a_scan() # 装饰器返回 parsed 数据,不是 Response 对象 self.device.halt() - if resp == None: + if scan_result == None or len(scan_result) == 0: print("No tag found") - return resp - tag_info["uid"] = resp[0]["uid"].hex() - uid_length = len(resp[0]["uid"]) + return Response(Command.InListPassiveTarget, Status.HF_TAG_NO) + + tag_info["uid"] = scan_result[0]["uid"].hex() + uid_length = len(scan_result[0]["uid"]) if DEBUG: print("Found UID:", tag_info["uid"]) - tag_info["atqa"] = resp[0]["atqa"].hex() - tag_info["sak"] = resp[0]["sak"].hex() + tag_info["atqa"] = scan_result[0]["atqa"].hex() + tag_info["sak"] = scan_result[0]["sak"].hex() tag_info["data"] = [] options = { "activate_rf_field": 0, @@ -279,7 +284,7 @@ def selectTag(self): options=options, resp_timeout_ms=1000, data=[0x52], bitlen=7 ) if DEBUG: - print("WUPA:", wupa_result.hex()) + print("WUPA:", wupa_result.parsed.hex()) anti_coll_result = self.hf14a_raw( options=options, resp_timeout_ms=1000, data=[0x93, 0x20] ) @@ -358,7 +363,7 @@ def setGen3Uid(self, uid: bytes): resp = self.hf14a_raw( options=options, resp_timeout_ms=2000, data=bytes.fromhex(command) ) - if resp[0] == 0x00: + if resp.parsed[0] == 0x00: return True return False @@ -375,7 +380,7 @@ def setGen3Block0(self, block0: bytes): resp = self.hf14a_raw( options=options, resp_timeout_ms=1000, data=bytes.fromhex(command) ) - if resp[0] == 0x00: + if resp.parsed[0] == 0x00: return True return False @@ -392,7 +397,7 @@ def lockGen3Uid(self): resp = self.hf14a_raw( options=options, resp_timeout_ms=1000, data=bytes.fromhex(command) ) - if resp[0] == 0x00: + if resp.parsed[0] == 0x00: return True return False @@ -410,8 +415,8 @@ def isGen4(self, pwd="00000000"): options=options, resp_timeout_ms=1000, data=bytes.fromhex(command) ) if DEBUG: - print("isGen4:", resp.hex()) - if len(resp) >= 30: + print("isGen4:", resp.parsed.hex()) + if len(resp.parsed) >= 30: return True return False @@ -433,7 +438,7 @@ def mf1_read_block(self, block, key): if resp == None: print("No tag found") return resp - uidID1 = bytes(resp[0]["uid"]) + uidID1 = bytes(resp.parsed[0]["uid"]) auth_result = self.mf1_auth_one_key_block(block, MfcKeyType.A, key, uidID1) if not auth_result: self.mf1_authenticated_useKeyA = False @@ -460,7 +465,7 @@ def mf1_read_one_block(self, block, type_value: MfcKeyType, key): if resp == None: print("No tag found") return resp - uidID1 = bytes(resp[0]["uid"]) + uidID1 = bytes(resp.parsed[0]["uid"]) auth_result = self.mf1_auth_one_key_block( block, type_value, key, uidID1 ) @@ -847,14 +852,14 @@ def hf_mf_eread(self, slot): resp = self.download_data_block(type = 1, slot = slot, index = block) if block == 0: print( - f"block {block:02d}: {CY}{resp.hex()[0:8].upper()}{CR}{resp.hex()[8:10].upper()}{CG}{resp.hex()[10:14].upper()}{C0}{resp.hex()[14:].upper()}{C0}" + f"block {block:02d}: {CY}{resp.parsed.hex()[0:8].upper()}{CR}{resp.parsed.hex()[8:10].upper()}{CG}{resp.parsed.hex()[10:14].upper()}{C0}{resp.parsed.hex()[14:].upper()}{C0}" ) elif block % 4 == 3: print( - f"block {block:02d}: {CG}{resp.hex()[0:12].upper()}{CR}{resp.hex()[12:20].upper()}{CG}{resp.hex()[20:].upper()}{C0}" + f"block {block:02d}: {CG}{resp.parsed.hex()[0:12].upper()}{CR}{resp.parsed.hex()[12:20].upper()}{CG}{resp.parsed.hex()[20:].upper()}{C0}" ) else: - print(f"block {block:02d}: {resp.hex().upper()}") + print(f"block {block:02d}: {resp.parsed.hex().upper()}") mifare_dump[block] = resp return mifare_dump @@ -874,21 +879,21 @@ def hf_mfu_eread(self, slot): for page in range(4): resp = self.download_data_block(type = 2, slot = slot, index = page) if page == 0: - print(f"page {page:02d}: {CY}{resp.hex()[0:6].upper()}{CR}{resp.hex()[6:8].upper()}{C0}") + print(f"page {page:02d}: {CY}{resp.parsed.hex()[0:6].upper()}{CR}{resp.parsed.hex()[6:8].upper()}{C0}") elif page == 1: - print(f"page {page:02d}: {CY}{resp.hex().upper()}{C0}") + print(f"page {page:02d}: {CY}{resp.parsed.hex().upper()}{C0}") elif page == 2: - print(f"page {page:02d}: {CR}{resp.hex()[0:2].upper()}{CG}{resp.hex()[2:].upper()}{C0}") + print(f"page {page:02d}: {CR}{resp.parsed.hex()[0:2].upper()}{CG}{resp.parsed.hex()[2:].upper()}{C0}") elif page == 3: - print(f"page {page:02d}: {CG}{resp.hex().upper()}{C0}") + print(f"page {page:02d}: {CG}{resp.parsed.hex().upper()}{C0}") # page 3 , index 2 is the max page indicator - if len(resp) >= 3: - max_page = resp[2] * 2 + 12 + if len(resp.parsed) >= 3: + max_page = resp.parsed[2] * 2 + 12 mfu_dump[page] = resp # Read remaining pages for page in range(4, max_page): resp = self.download_data_block(type = 2, slot = slot, index = page) - print(f"page {page:02d}: {resp.hex().upper()}") + print(f"page {page:02d}: {resp.parsed.hex().upper()}") mfu_dump[page] = resp return mfu_dump @@ -1263,7 +1268,7 @@ def test_fn(): if "PN532Killer" in port.description: dev.open(port.device) break - print(f"Connected to {dev.serial_instance.port}") + print(f"Connected to {dev.get_connection_info()}") print(f"Device: {dev.device_name}") cml = Pn532CMD(dev) @@ -1283,7 +1288,7 @@ def test_fn(): resp_timeout_ms=1000, data= bytes.fromhex("cf00000000ce00"), ) - print("hf14a_raw:", resp.hex().upper()) + print("hf14a_raw:", resp.parsed.hex().upper()) except Exception as e: print("Error:", e) dev.close() diff --git a/script/pn532_com.py b/script/pn532_com.py index 28296c6..29adb0a 100644 --- a/script/pn532_com.py +++ b/script/pn532_com.py @@ -44,6 +44,8 @@ def __init__(self): self.send_data_queue = queue.Queue() self.wait_response_map = {} self.event_closing = threading.Event() + self.port_string = None # store original open string + self.ignore_late_cmd_until = {} # 记录某命令在这个时间点前的迟到帧直接丢弃 device_name = "Unknown" data_max_length = 0xFF @@ -56,41 +58,42 @@ def get_device_name(self) -> str: def isOpen(self) -> bool: return self.communication is not None and self.communication.is_open() + + def get_connection_info(self) -> str: + """Get connection information""" + if self.communication is not None: + return self.communication.get_connection_info() + return "Not connected" def open(self, port) -> "Pn532Com": if not self.isOpen(): error = None try: - # 创建通信接口 self.communication = CommunicationFactory.create_communication(port) protocol_type, actual_address = CommunicationFactory.parse_address(port) - - # 打开连接 + self.connection_type = protocol_type # 记录连接类型 if not self.communication.open(actual_address): raise Exception(f"Failed to open {protocol_type} connection to {actual_address}") - print(f"Opened {protocol_type} connection to {actual_address}") + self.port_string = port except Exception as e: error = e finally: if error is not None: raise OpenFailException(error) - assert self.communication is not None - self.communication.set_timeout(THREAD_BLOCKING_TIMEOUT) - + self.communication.set_timeout(0.3 if hasattr(self, 'connection_type') and self.connection_type in ('udp','tcp') else THREAD_BLOCKING_TIMEOUT) # clear variable self.send_data_queue.queue.clear() self.wait_response_map.clear() # Start a sub thread to process data self.event_closing.clear() - threading.Thread(target=self.thread_data_receive).start() - threading.Thread(target=self.thread_data_transfer).start() - threading.Thread(target=self.thread_check_timeout).start() + threading.Thread(target=self.thread_data_receive, daemon=True).start() + threading.Thread(target=self.thread_data_transfer, daemon=True).start() + threading.Thread(target=self.thread_check_timeout, daemon=True).start() - time.sleep(0.5) self.set_normal_mode() - time.sleep(0.5) + time.sleep(0.01) is_pn532killer = self.is_pn532killer() if is_pn532killer: self.device_name = "PN532Killer" @@ -119,7 +122,6 @@ def close(self): def set_normal_mode(self) -> response: self.communication.write(bytes.fromhex("5500000000000000000000000000")) - time.sleep(0.1) response = self.send_cmd_sync(Command.SAMConfiguration, bytes.fromhex("01")) return response @@ -139,6 +141,32 @@ def send_raw(self, data: bytes) -> response: response = self.send_cmd_sync(cmd, data[1:]) return response.data + def send_raw_frame(self, frame: bytes, wait_response: bool = True): + # Send a full PN532 frame (already contains preamble/startcode/len/LCS/DCS/postamble or any wakeup sequence) + if not self.isOpen(): + raise NotOpenException("Device not open") + assert self.communication is not None + if DEBUG: + print(f'=> {CY}{frame.hex().upper()}{C0}') + self.communication.write(frame) + + if not wait_response: + return None + + # For non-command frames (like wakeup), don't wait for structured response + if not frame.startswith(b'\x00\x00\xFF'): + return None + + # Wait for response with simple timeout + import time + start_time = time.time() + while time.time() - start_time < 2.0: # 2 second timeout + time.sleep(0.01) + # Check if any response came in the normal flow + # This is a simple implementation - for debugging mostly + + return None + def reset_register(self) -> response: response = self.send_cmd_sync( Command.WriteRegister, [0x63, 0x02, 0x00, 0x63, 0x03, 0x00] @@ -189,211 +217,150 @@ def crc16A(self, data: bytes) -> bytes: return crc.to_bytes(2, byteorder="little") def thread_data_receive(self): + """Receiver thread: robust frame extraction with resync & ACK filtering""" data_buffer = bytearray() - data_position = 0 - data_cmd = 0x0000 - data_status = 0x0000 - data_length = 0x0000 - skip_pattern = bytearray.fromhex("0000ff00ff00") - skip_pattern_length = len(skip_pattern) - - def reset_frame_parsing(): - nonlocal data_position - data_position = 0 - - def clear_buffer(): - nonlocal data_buffer, data_position, data_length - data_buffer.clear() - data_position = 0 - data_length = 0x0000 - - def check_for_ack_frame(): - nonlocal data_buffer - if len(data_buffer) >= skip_pattern_length: - if data_buffer[:skip_pattern_length] == skip_pattern: - data_buffer = data_buffer[skip_pattern_length:] - return True - if data_buffer[-skip_pattern_length:] == skip_pattern: - data_buffer = data_buffer[:-skip_pattern_length] - return True - return False - + ACK = b"\x00\x00\xFF\x00\xFF\x00" while self.isOpen(): + # 连接状态检测 + if self.communication is not None and not self.communication.is_open(): + if not self.event_closing.is_set(): + print("Connection lost, closing device...") + self.close() + break try: assert self.communication is not None - data_bytes = self.communication.read(32) + chunk = self.communication.read(64) + # UDP 的多包读取已在 communication 层处理,这里去掉额外逻辑 except Exception as e: if not self.event_closing.is_set(): print(f"Communication Error {e}, thread for receiver exit.") self.close() break - - if len(data_bytes) > 0: - data_buffer.extend(data_bytes) - - while len(data_buffer) > 0: - if check_for_ack_frame(): - reset_frame_parsing() - continue - - if len(data_buffer) > 300: - clear_buffer() - break - - if data_position == 0 and len(data_buffer) < 1: - break - elif data_position == 1 and len(data_buffer) < 2: - break - elif data_position == 2 and len(data_buffer) < 3: - break - elif data_position == 3 and len(data_buffer) < 4: - break - elif data_position == 4 and len(data_buffer) < 5: - break - elif data_position >= 5 and len(data_buffer) < 6 + data_length: - break - - if data_position == 0: - if data_buffer[0] != self.data_preamble[0]: - preamble_pos = -1 - for i in range(len(data_buffer)): - if data_buffer[i] == self.data_preamble[0]: - preamble_pos = i - break - if preamble_pos > 0: - data_buffer = data_buffer[preamble_pos:] - data_position = 0 - continue - else: - clear_buffer() - break - elif data_position == 1: - if data_buffer[1] != self.data_start_code[0]: - clear_buffer() - break - elif data_position == 2: - if data_buffer[2] != self.data_start_code[1]: - if DEBUG: - print(f"Data frame start code error at position 2: {data_buffer[2]:02x}") - clear_buffer() - break - elif data_position == 3: - data_length = data_buffer[3] # Get the data length byte - elif data_position == 4: - # Check length checksum (LCS) - length_checksum = data_buffer[4] - if (data_length + length_checksum) & 0xFF != 0: - if DEBUG: - print(f"Data frame LCS error: len={data_length:02x} lcs={length_checksum:02x}") - clear_buffer() - break - # print("length checksum ok") - elif data_position == 5 + data_length: - # Check DCS (Data Checksum) - if data_buffer[5 + data_length] != self.dcs( - data_buffer[5 : 5 + data_length] - ): - if DEBUG: - print("Data frame DCS error.") - clear_buffer() - break - # print("data checksum ok") - elif data_position == 6 + data_length: - # Check POSTAMBLE - if data_buffer[6 + data_length] != 0x00: - if DEBUG: - print("Data frame POSTAMBLE error.") - clear_buffer() - break - # Process complete frame - data_response = bytes(data_buffer[5 : 5 + data_length]) - if len(data_response) == 0: - if DEBUG: - print("Data frame is empty.") - clear_buffer() - break - if data_response[0] != self.data_tfi_receive: - if DEBUG: - print("Data frame TFI error.") - clear_buffer() - break - - if len(data_response) < 2: - if DEBUG: - print("Data frame length error.") - clear_buffer() - break - # get cmd - data_cmd = data_response[1] - 1 - if DEBUG: - print(f"Parsed command: {data_cmd}, waiting for: {list(self.wait_response_map.keys())}") - if data_cmd in self.wait_response_map: - if DEBUG: - print(f"<= {CY}{data_buffer[:7+data_length].hex().upper()}{C0}") - # update wait_response_map - response = Response(data_cmd, Status.SUCCESS, data_response[2:]) - if ( - data_cmd == Command.InCommunicateThru or data_cmd == Command.InDataExchange - and len(data_response) > 2 - ): - response = Response( - data_cmd, data_response[2], data_response[2:] - ) - if data_response[2] == 0 and len(data_response) > 16: - response = Response( - data_cmd, - data_response[2], - data_response[3: 3 + data_length - 3], # 修复数据长度计算 - ) - self.wait_response_map[data_cmd]["response"] = response - fn_call = self.wait_response_map[data_cmd].get("callback") - if callable(fn_call): - print("run callback") - del self.wait_response_map[data_cmd] - fn_call(data_cmd, data_status, data_response) - else: - if DEBUG: - print(f"No task waiting for process: {data_cmd}") - pass - clear_buffer() - break - - data_position += 1 - + if not chunk: + continue + if DEBUG: + print(f"READ {chunk.hex().upper()}") + data_buffer.extend(chunk) + # 过滤所有 ACK + changed = True + while changed: + changed = False + pos = data_buffer.find(ACK) + if pos != -1: + if DEBUG: + print(f"SKIP ACK at {pos}") + del data_buffer[pos:pos+len(ACK)] + changed = True + # 尝试解析帧 + i = 0 + while i <= len(data_buffer) - 7: # 最小帧长度 + # 寻找前导 00 00 FF + if not (data_buffer[i] == 0x00 and data_buffer[i+1] == 0x00 and data_buffer[i+2] == 0xFF): + i += 1 + continue + if i + 5 > len(data_buffer): + break # 不足以读取 LEN/LCS + length = data_buffer[i+3] + lcs = data_buffer[i+4] + if ((length + lcs) & 0xFF) != 0: + if DEBUG: + print(f"LEN/LCS mismatch at {i}: LEN={length:02X} LCS={lcs:02X}") + i += 1 + continue + frame_end = i + 5 + length + 2 # +DCS +POSTAMBLE + if frame_end > len(data_buffer): + break # 等待更多数据 + if data_buffer[frame_end-1] != 0x00: + if DEBUG: + print(f"POSTAMBLE error at {i}") + i += 1 + continue + data = bytes(data_buffer[i+5:i+5+length]) + dcs = data_buffer[i+5+length] + if self.dcs(bytearray(data)) != dcs: + if DEBUG: + print(f"DCS error at {i}: expect {self.dcs(bytearray(data)):02X} got {dcs:02X}") + i += 1 + continue + if not data: + i = frame_end + continue + tfi = data[0] + if tfi != self.data_tfi_receive: + if DEBUG: + print(f"Unexpected TFI {tfi:02X} (expect {self.data_tfi_receive:02X}), resync") + i += 1 + continue + if len(data) < 2: + i = frame_end + continue + cmd_resp = data[1] - 1 # 原始命令 + if cmd_resp in getattr(self, 'ignore_late_cmd_until', {}) and time.time() < self.ignore_late_cmd_until[cmd_resp]: + if DEBUG: + print(f"Discard late frame for CMD=0x{cmd_resp:02X}") + i = frame_end + continue + else: + if cmd_resp in getattr(self, 'ignore_late_cmd_until', {}) and time.time() >= self.ignore_late_cmd_until[cmd_resp]: + del self.ignore_late_cmd_until[cmd_resp] + if DEBUG: + print(f"<= {CY}{data_buffer[i:frame_end-1].hex().upper()}{C0}") + if cmd_resp in self.wait_response_map: + payload = data[2:] + response = Response(cmd_resp, Status.SUCCESS, payload) + if (cmd_resp == Command.InCommunicateThru or cmd_resp == Command.InDataExchange) and len(data) > 2: + status_byte = data[2] + response = Response(cmd_resp, status_byte, data[2:]) + if status_byte == 0 and len(data) > 3: + response = Response(cmd_resp, status_byte, data[3:]) + self.wait_response_map[cmd_resp]["response"] = response + fn_call = self.wait_response_map[cmd_resp].get("callback") + if callable(fn_call): + del self.wait_response_map[cmd_resp] + try: + fn_call(cmd_resp, 0, data) + except Exception as e: + print(f"Callback error: {e}") + else: + if DEBUG: + print(f"No waiter for CMD=0x{cmd_resp:02X}, pending keys={[hex(k) for k in self.wait_response_map.keys()]}") + i = frame_end + if i > 0: + del data_buffer[:i] def thread_data_transfer(self): while self.isOpen(): - # get a task from queue(if exists) try: - task = self.send_data_queue.get( - block=True, timeout=THREAD_BLOCKING_TIMEOUT - ) + task = self.send_data_queue.get(block=True, timeout=THREAD_BLOCKING_TIMEOUT) except queue.Empty: continue task_cmd = task["cmd"] task_timeout = task["timeout"] task_close = task["close"] - # print("thread_data_transfer", task) - if "callback" in task and callable(task["callback"]): - self.wait_response_map[task_cmd] = { - "callback": task["callback"] - } # The callback for this task - else: + # 如果是预注册,占位里补齐时间字段;否则(极少出现)创建新项 + if task_cmd not in self.wait_response_map: + # 不应发生(预注册保证存在),但兜底 self.wait_response_map[task_cmd] = {"response": None} - # set start time + if '_pre_registered' in self.wait_response_map[task_cmd]: + del self.wait_response_map[task_cmd]['_pre_registered'] + if 'callback' in task and callable(task['callback']): + self.wait_response_map[task_cmd]['callback'] = task['callback'] start_time = time.time() self.wait_response_map[task_cmd]["start_time"] = start_time self.wait_response_map[task_cmd]["end_time"] = start_time + task_timeout self.wait_response_map[task_cmd]["is_timeout"] = False try: assert self.communication is not None + if not self.communication.is_open(): + print("Connection lost during data transfer, closing device...") + self.close() + break if DEBUG: - print(f'=> {CY}{task["frame"].hex().upper()}{C0}') + print(f"=> {CY}{task['frame'].hex().upper()}{C0}") self.communication.write(task["frame"]) except Exception as e: print(f"Communication Error {e}, thread for transfer exit.") self.close() break - # update queue status self.send_data_queue.task_done() # disconnect if DFU command has been sent @@ -402,16 +369,13 @@ def thread_data_transfer(self): def thread_check_timeout(self): while self.isOpen(): - for task_cmd in self.wait_response_map.keys(): - if time.time() > self.wait_response_map[task_cmd]["end_time"]: - if "callback" in self.wait_response_map[task_cmd]: - # not sync, call function to notify timeout. - self.wait_response_map[task_cmd]["callback"]( - task_cmd, None, None - ) + for task_cmd in list(self.wait_response_map.keys()): # 使用 list 避免迭代期间修改 + task_data = self.wait_response_map.get(task_cmd, {}) + if "end_time" in task_data and time.time() > task_data["end_time"]: + if "callback" in task_data: + task_data["callback"](task_cmd, None, None) else: - # sync mode, set timeout flag - self.wait_response_map[task_cmd]["is_timeout"] = True + task_data["is_timeout"] = True time.sleep(THREAD_BLOCKING_TIMEOUT) def make_data_frame_bytes( @@ -457,14 +421,29 @@ def send_cmd_auto( :return: """ self.check_open() - # delete old task if cmd in self.wait_response_map: + if DEBUG: + print(f"Replace pending task CMD=0x{cmd:02X}") del self.wait_response_map[cmd] - # make data frame + if hasattr(self, 'connection_type') and self.connection_type in ('tcp', 'udp'): + if timeout < 2: + timeout = 2 data_frame = self.make_data_frame_bytes(cmd, data, status) + # 预注册占位,防止响应极快到达时无 waiter + self.wait_response_map[cmd] = self.wait_response_map.get(cmd, {}) + if callable(callback): + self.wait_response_map[cmd]['callback'] = callback + if 'response' not in self.wait_response_map[cmd]: + self.wait_response_map[cmd]['response'] = None + self.wait_response_map[cmd]['_pre_registered'] = True + self.wait_response_map[cmd]['_timeout_value'] = timeout + if DEBUG: + print(f"PRE-REG CMD=0x{cmd:02X} TIMEOUT={timeout}s DATA={(data.hex().upper() if data else '')}") task = {"cmd": cmd, "frame": data_frame, "timeout": timeout, "close": close} if callable(callback): task["callback"] = callback + if DEBUG: + print(f"QUEUE CMD=0x{cmd:02X} TIMEOUT={timeout}s DATA={(data.hex().upper() if data else '')}") self.send_data_queue.put(task) def send_cmd_sync( @@ -473,39 +452,73 @@ def send_cmd_sync( data: Union[bytes, None] = None, status: int = 0, timeout: int = 2, + retries: int = 0, # 保留参数以兼容,已不再自动重发 ) -> response: - if len(self.commands): - # check if PN532 can understand this command - if cmd not in self.commands: - raise CMDInvalidException( - f"This device doesn't declare that it can support this command: {cmd}.\n" - f"Make sure firmware is up to date and matches client" - ) - # first to send cmd, no callback mode(sync) - self.send_cmd_auto(cmd, data, status, None, timeout) - # wait cmd start process + """ + 发送命令并同步等待响应(不再自动重发)。 + :param cmd: 命令码 + :param data: 数据 + :param status: 状态 + :param timeout: 超时时间(秒) + :param retries: 已废弃,保留做兼容(不再使用) + """ + # 校验支持 + if len(self.commands) and cmd not in self.commands: + raise CMDInvalidException( + f"This device doesn't declare that it can support this command: {cmd}.\n" + f"Make sure firmware is up to date and matches client" + ) + network_mode = hasattr(self, 'connection_type') and self.connection_type in ('udp','tcp') + # 对扫描类命令直接提升任务自身 timeout (避免 timeout 线程过早置 is_timeout) + effective_timeout = timeout + if network_mode and cmd == Command.InListPassiveTarget and effective_timeout < 3: + effective_timeout = 3 # 给扫描至少 3s + # 其它命令给少量额外宽限(等待循环内部使用,不影响 timeout 线程判定) + wait_margin = 0.5 if not network_mode else 0.7 + self.send_cmd_auto(cmd, data, status, None, effective_timeout) + start_wait = time.time() + # 等待任务注册 while cmd not in self.wait_response_map: + if time.time() - start_wait > effective_timeout + wait_margin: + self.ignore_late_cmd_until[cmd] = time.time() + 0.3 + return Response(cmd, Status.TimeoutError) time.sleep(0.01) - # wait response data set + # 等待响应 while self.wait_response_map[cmd]["response"] is None: - if ( - "is_timeout" in self.wait_response_map[cmd] - and self.wait_response_map[cmd]["is_timeout"] - ): - # raise TimeoutError(f"CMD {cmd} exec timeout") - # print(f"CMD {cmd} exec timeout") - self.wait_response_map[cmd]["is_timeout"] = True - self.wait_response_map[cmd]["response"] = Response( - cmd, Status.TimeoutError - ) + if ("is_timeout" in self.wait_response_map[cmd] and self.wait_response_map[cmd]["is_timeout"]): + self.ignore_late_cmd_until[cmd] = time.time() + 0.3 + self.wait_response_map[cmd]["response"] = Response(cmd, Status.TimeoutError) + break + if time.time() - start_wait > effective_timeout + wait_margin: + self.ignore_late_cmd_until[cmd] = time.time() + 0.3 + self.wait_response_map[cmd]["response"] = Response(cmd, Status.TimeoutError) break time.sleep(0.01) - # ok, data received. - data_response = self.wait_response_map[cmd]["response"] + # 对 0x4A (InListPassiveTarget) 若收到空数据,继续等待直到超时或出现非空数据(不重发) + if cmd == Command.InListPassiveTarget: + while True: + resp_tmp = self.wait_response_map[cmd]["response"] + if resp_tmp is None: + # 理论上不会出现,但兜底 + if time.time() - start_wait > effective_timeout + wait_margin: + break + time.sleep(0.01) + continue + # 如果已经超时 / 非成功就直接退出 + if resp_tmp.status != Status.SUCCESS: + break + # 数据长度>=2 认为有效 + if len(resp_tmp.data) >= 2: + break + # 还未到时间,等待可能的覆盖(接收线程会覆盖 response) + if time.time() - start_wait > effective_timeout + wait_margin: + break + time.sleep(0.02) + resp = self.wait_response_map[cmd]["response"] del self.wait_response_map[cmd] - if data_response.status == Status.INVALID_CMD: + if resp.status == Status.INVALID_CMD: raise CMDInvalidException(f"Device unsupported cmd: {cmd}") - return data_response + return resp class Response: diff --git a/script/pn532_communication.py b/script/pn532_communication.py index 9f8c3d2..4e73bbb 100644 --- a/script/pn532_communication.py +++ b/script/pn532_communication.py @@ -43,6 +43,11 @@ def read(self, size: int = 1) -> bytes: def set_timeout(self, timeout: float) -> None: """Set timeout""" pass + + @abstractmethod + def get_connection_info(self) -> str: + """Get connection information string""" + pass class SerialCommunication(CommunicationInterface): """Serial communication implementation""" @@ -81,6 +86,12 @@ def read(self, size: int = 1) -> bytes: def set_timeout(self, timeout: float) -> None: if self.serial_instance: self.serial_instance.timeout = timeout + + def get_connection_info(self) -> str: + """Get serial connection information""" + if self.serial_instance and self.serial_instance.is_open: + return f"Serial: {self.serial_instance.port}" + return "Serial: Not connected" class TCPCommunication(CommunicationInterface): """TCP communication implementation""" @@ -90,7 +101,19 @@ def __init__(self): self.timeout = 5.0 # Increase timeout to 5 seconds def is_open(self) -> bool: - return self.socket_instance is not None + """Check if TCP connection is still active""" + if self.socket_instance is None: + return False + + try: + # Try to get peer name to check if connection is still active + self.socket_instance.getpeername() + return True + except (OSError, socket.error): + # Connection is broken, cleanup + self.socket_instance.close() + self.socket_instance = None + return False def open(self, address: str) -> bool: try: @@ -116,6 +139,11 @@ def write(self, data: bytes) -> int: try: self.socket_instance.send(data) return len(data) + except (ConnectionResetError, BrokenPipeError, OSError) as e: + print(f"TCP connection lost during write: {e}") + self.socket_instance.close() + self.socket_instance = None + return 0 except Exception as e: print(f"TCP write failed: {e}") return 0 @@ -124,9 +152,21 @@ def write(self, data: bytes) -> int: def read(self, size: int = 1) -> bytes: if self.socket_instance: try: - return self.socket_instance.recv(size) + data = self.socket_instance.recv(size) + if len(data) == 0: + # Remote side closed the connection + print("TCP connection closed by remote") + self.socket_instance.close() + self.socket_instance = None + return b'' + return data except socket.timeout: return b'' + except (ConnectionResetError, OSError) as e: + print(f"TCP connection lost during read: {e}") + self.socket_instance.close() + self.socket_instance = None + return b'' except Exception as e: print(f"TCP read failed: {e}") return b'' @@ -136,6 +176,17 @@ def set_timeout(self, timeout: float) -> None: self.timeout = timeout if self.socket_instance: self.socket_instance.settimeout(timeout) + + def get_connection_info(self) -> str: + """Get TCP connection information""" + if self.socket_instance: + try: + local = self.socket_instance.getsockname() + remote = self.socket_instance.getpeername() + return f"TCP: {local[0]}:{local[1]} -> {remote[0]}:{remote[1]}" + except: + return "TCP: Connected but unable to get info" + return "TCP: Not connected" class UDPCommunication(CommunicationInterface): """UDP communication implementation""" @@ -144,9 +195,21 @@ def __init__(self): self.socket_instance: Optional[socket.socket] = None self.server_address: Optional[tuple] = None self.timeout = 1.0 + self.connection_failed_count = 0 # Track failed operations + self.max_failed_count = 3 # Max failures before considering disconnected def is_open(self) -> bool: - return self.socket_instance is not None + """Check if UDP connection is still usable""" + if self.socket_instance is None: + return False + + # For UDP, we consider it "open" if socket exists and failed count is below threshold + if self.connection_failed_count >= self.max_failed_count: + print("UDP connection considered failed after multiple errors") + self.close() + return False + + return True def open(self, address: str) -> bool: try: @@ -172,26 +235,58 @@ def close(self) -> None: self.socket_instance.close() self.socket_instance = None self.server_address = None + self.connection_failed_count = 0 # Reset failed count def write(self, data: bytes) -> int: if self.socket_instance and self.server_address: try: self.socket_instance.sendto(data, self.server_address) + self.connection_failed_count = 0 # Reset on successful operation return len(data) + except (OSError, socket.error) as e: + print(f"UDP write failed: {e}") + self.connection_failed_count += 1 + return 0 except Exception as e: print(f"UDP write failed: {e}") + self.connection_failed_count += 1 return 0 return 0 def read(self, size: int = 1) -> bytes: + # 对于 UDP, 使用较大缓冲防止帧被截断,并尝试读取多个datagram if self.socket_instance: try: - data, addr = self.socket_instance.recvfrom(size) + # 先读取第一个UDP包 + data, addr = self.socket_instance.recvfrom(512) + self.connection_failed_count = 0 # Reset on successful operation + + # 立即尝试读取更多UDP包(非阻塞),但要更谨慎 + original_timeout = self.socket_instance.gettimeout() + try: + self.socket_instance.settimeout(0.001) # 很短的超时,1ms + for _ in range(3): # 减少到最多3个包 + try: + more_data, _ = self.socket_instance.recvfrom(512) + if more_data: # 只有非空数据才添加 + data += more_data + except (socket.timeout, BlockingIOError, OSError): + break # 没有更多包了 + except Exception: + pass # 忽略所有异常 + finally: + self.socket_instance.settimeout(original_timeout) # 恢复原timeout + return data except socket.timeout: return b'' + except (OSError, socket.error) as e: + print(f"UDP read failed: {e}") + self.connection_failed_count += 1 + return b'' except Exception as e: print(f"UDP read failed: {e}") + self.connection_failed_count += 1 return b'' return b'' @@ -199,6 +294,13 @@ def set_timeout(self, timeout: float) -> None: self.timeout = timeout if self.socket_instance: self.socket_instance.settimeout(timeout) + + def get_connection_info(self) -> str: + """Get UDP connection information""" + if self.socket_instance and self.server_address: + local = self.socket_instance.getsockname() + return f"UDP: {local[0]}:{local[1]} -> {self.server_address[0]}:{self.server_address[1]}" + return "UDP: Not connected" class CommunicationFactory: """Communication interface factory class""" diff --git a/script/pn532_ntag_reader.py b/script/pn532_ntag_reader.py index 61fab13..32cff12 100644 --- a/script/pn532_ntag_reader.py +++ b/script/pn532_ntag_reader.py @@ -46,10 +46,10 @@ def test_fn(): dev.open(port.device) break - if dev.serial_instance is None: + if not dev.isOpen(): print("No PN532/PN532Killer found") return - print(f"Connected to {dev.serial_instance.port}, {dev.device_name}") + print(f"Connected to {dev.get_connection_info()}, {dev.device_name}") cml = Pn532CMD(dev) try: diff --git a/script/pn532_tag_scanner.py b/script/pn532_tag_scanner.py index a462841..6d5d2a3 100644 --- a/script/pn532_tag_scanner.py +++ b/script/pn532_tag_scanner.py @@ -51,10 +51,10 @@ def test_fn(): dev.open(port.device) break - if dev.serial_instance is None: + if not dev.isOpen(): print("No PN532/PN532Killer found") return - print(f"Connected to {dev.serial_instance.port}, {dev.device_name}") + print(f"Connected to {dev.get_connection_info()}, {dev.device_name}") cml = Pn532CMD(dev) try: diff --git a/script/pn532_tcp_testing.py b/script/pn532_tcp_testing.py deleted file mode 100644 index 82debc2..0000000 --- a/script/pn532_tcp_testing.py +++ /dev/null @@ -1,43 +0,0 @@ -import pn532_com -from pn532_cmd import Pn532CMD - -from platform import uname - -def test_fn(): - dev = pn532_com.Pn532Com() - # Connect via TCP to local test server - try: - dev.open("tcp:192.168.20.32:18889") - if not dev.isOpen(): - print("Failed to connect to tcp:192.168.20.32:18889") - return - print(f"Connected to tcp:192.168.20.32:18889") - cml = Pn532CMD(dev) - except Exception as e: - print(f"Connection failed: {e}") - return - try: - print("Getting firmware version...") - fw_version = cml.get_firmware_version() - print("FW Version:", fw_version) - hf_14a_scan_result = cml.hf_14a_scan() - if hf_14a_scan_result is not None: - for data_tag in hf_14a_scan_result: - print(f"- UID: {data_tag['uid'].hex().upper()}") - print( - f"- ATQA: {data_tag['atqa'].hex().upper()} " - f"(0x{int.from_bytes(data_tag['atqa'], byteorder='little'):04x})" - ) - print(f"- SAK: {data_tag['sak'].hex().upper()}") - if "ats" in data_tag and len(data_tag["ats"]) > 0: - print(f"- ATS: {data_tag['ats'].hex().upper()}") - else: - print("ISO14443-A Tag not found") - except Exception as e: - print("Error:", e) - import traceback - traceback.print_exc() - dev.close() - -if __name__ == "__main__": - test_fn() diff --git a/script/pn532killer_mf1_emulator.py b/script/pn532killer_mf1_emulator.py index 9ad600a..b942269 100644 --- a/script/pn532killer_mf1_emulator.py +++ b/script/pn532killer_mf1_emulator.py @@ -70,10 +70,10 @@ def test_fn(): dev.open(port.device) break - if dev.serial_instance is None: + if not dev.isOpen(): print("No PN532/PN532Killer found") return - print(f"Connected to {dev.serial_instance.port}, {dev.device_name}") + print(f"Connected to {dev.get_connection_info()}, {dev.device_name}") cml = Pn532CMD(dev) try: diff --git a/script/test_tcp_udp.py b/script/test_tcp_udp.py new file mode 100644 index 0000000..f7782f8 --- /dev/null +++ b/script/test_tcp_udp.py @@ -0,0 +1,87 @@ +import pn532_com +from pn532_cmd import Pn532CMD + +from platform import uname + +def test_fn(): + dev = pn532_com.Pn532Com() + port_name = "tcp:192.168.20.58:18889" + try: + dev.open(port_name) + if not dev.isOpen(): + print(f"Failed to connect to {port_name}") + return + print(f"Connected to {port_name}") + cml = Pn532CMD(dev) + except Exception as e: + print(f"Connection failed: {e}") + return + try: + print("Getting firmware version...") + fw_version = cml.get_firmware_version() + print("FW Version:", fw_version) + hf_14a_scan_result = cml.hf_14a_scan() + if hf_14a_scan_result is not None: + for data_tag in hf_14a_scan_result: + print(f"- UID: {data_tag['uid'].hex().upper()}") + print( + f"- ATQA: {data_tag['atqa'].hex().upper()} " + f"(0x{int.from_bytes(data_tag['atqa'], byteorder='little'):04x})" + ) + print(f"- SAK: {data_tag['sak'].hex().upper()}") + if "ats" in data_tag and len(data_tag["ats"]) > 0: + print(f"- ATS: {data_tag['ats'].hex().upper()}") + + + else: + print("ISO14443-A Tag not found") + + # 测试 hf mf cview 命令 + print("\nTesting HF MF cview...") + try: + cview_result = cml.hfmf_cview() # 装饰器直接返回 parsed 数据 + if cview_result is not None: + print("✅ HF MF cview succeeded") + if isinstance(cview_result, dict): + print(f" - Available keys: {list(cview_result.keys())}") + # 显示标签信息 + if 'uid' in cview_result: + print(f" - UID: {cview_result['uid']}") + if 'atqa' in cview_result: + print(f" - ATQA: {cview_result['atqa']}") + if 'sak' in cview_result: + print(f" - SAK: {cview_result['sak']}") + if 'blocks' in cview_result: + print(f" - Total blocks read: {len(cview_result['blocks'])}") + print(" - Block data preview (first 5 blocks):") + for i in range(min(5, len(cview_result['blocks']))): + block_key = str(i) + if block_key in cview_result['blocks']: + print(f" Block {i:02d}: {cview_result['blocks'][block_key]}") + elif 'data' in cview_result: + print(f" - Data field found: {type(cview_result['data'])}") + if isinstance(cview_result['data'], list): + print(f" - Data list length: {len(cview_result['data'])}") + print(" - Data preview (first 5 items):") + for i, item in enumerate(cview_result['data'][:5]): + print(f" Item {i}: {item}") + else: + print(f" - Data content: {cview_result['data']}") + else: + print(" - No 'blocks' or 'data' found") + else: + print(f" - Result type: {type(cview_result)}") + else: + print("❌ HF MF cview returned None") + except Exception as e: + print(f"❌ ERROR during HF MF cview: {e}") + import traceback + traceback.print_exc() + except Exception as e: + print("Error:", e) + import traceback + traceback.print_exc() + dev.close() + +if __name__ == "__main__": + test_fn() diff --git a/script/pn532_usb_testing.py b/script/test_usb.py similarity index 77% rename from script/pn532_usb_testing.py rename to script/test_usb.py index 2f7cca4..b734fc5 100644 --- a/script/pn532_usb_testing.py +++ b/script/test_usb.py @@ -7,11 +7,12 @@ def test_fn(): dev = pn532_com.Pn532Com() try: - dev.open("/dev/tty.wchusbserial210") + port_name = "/dev/tty.wchusbserial210" + dev.open(port_name) if not dev.isOpen(): - print("Failed to connect to /dev/tty.wchusbserial210") + print(f"Failed to connect to {port_name}") return - print(f"Connected to /dev/tty.wchusbserial210") + print(f"Connected to {port_name}") cml = Pn532CMD(dev) except Exception as e: print(f"Connection failed: {e}") From cdbddb84140bdbd0d156a27003a99d79f6520744 Mon Sep 17 00:00:00 2001 From: whywilson Date: Thu, 14 Aug 2025 19:25:56 +0800 Subject: [PATCH 5/9] Add data processing helper functions to support multiple input types. --- script/pn532_com.py | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/script/pn532_com.py b/script/pn532_com.py index 29adb0a..b6bf68e 100644 --- a/script/pn532_com.py +++ b/script/pn532_com.py @@ -128,6 +128,24 @@ def set_normal_mode(self) -> response: def in_release(self) -> response: response = self.send_cmd_sync(Command.InRelease, bytes.fromhex("00")) return response + + # --- helpers --- + def _normalize_payload(self, data: Union[bytes, bytearray, list, tuple, int, None]) -> bytes: + """Normalize payload to bytes. Accepts bytes/bytearray/list/tuple of ints(IntEnum)/single int/None.""" + if data is None: + return b"" + if isinstance(data, (bytes, bytearray)): + return bytes(data) + if isinstance(data, (list, tuple)): + return bytes([(int(x) & 0xFF) for x in data]) + if isinstance(data, int): + return bytes([data & 0xFF]) + # Fallback: try to bytes(), may raise which is fine to surface + return bytes(data) + + def _hex_str(self, data: Union[bytes, bytearray, list, tuple, int, None]) -> str: + b = self._normalize_payload(data) + return b.hex().upper() if b else "" # PN532Killer def set_work_mode(self, mode: PN532KillerMode = PN532KillerMode.READER, type=PN532KillerTagType.MFC, index=0) -> response: @@ -379,10 +397,13 @@ def thread_check_timeout(self): time.sleep(THREAD_BLOCKING_TIMEOUT) def make_data_frame_bytes( - self, cmd: int, data: Union[bytes, None] = None, status: int = 0 + self, + cmd: int, + data: Union[bytes, bytearray, list, tuple, int, None] = None, + status: int = 0, ) -> bytes: - if data is None: - data = b"" + # 支持 list/tuple/bytearray/bytes/int/None,多态输入统一转换 + data = self._normalize_payload(data) commands = self.data_tfi_send.to_bytes(1, byteorder="big") commands += cmd.to_bytes(1, byteorder="big") commands = bytearray(commands) @@ -438,12 +459,12 @@ def send_cmd_auto( self.wait_response_map[cmd]['_pre_registered'] = True self.wait_response_map[cmd]['_timeout_value'] = timeout if DEBUG: - print(f"PRE-REG CMD=0x{cmd:02X} TIMEOUT={timeout}s DATA={(data.hex().upper() if data else '')}") + print(f"PRE-REG CMD=0x{cmd:02X} TIMEOUT={timeout}s DATA={self._hex_str(data)}") task = {"cmd": cmd, "frame": data_frame, "timeout": timeout, "close": close} if callable(callback): task["callback"] = callback if DEBUG: - print(f"QUEUE CMD=0x{cmd:02X} TIMEOUT={timeout}s DATA={(data.hex().upper() if data else '')}") + print(f"QUEUE CMD=0x{cmd:02X} TIMEOUT={timeout}s DATA={self._hex_str(data)}") self.send_data_queue.put(task) def send_cmd_sync( From 2892f52ea53dadfe0f601bde139d905b11dea8f9 Mon Sep 17 00:00:00 2001 From: whywilson Date: Fri, 15 Aug 2025 09:40:19 +0800 Subject: [PATCH 6/9] Fix gen3 tag check and params. --- script/pn532_cli_main.py | 1 - script/pn532_cli_unit.py | 40 +++++++++++++++++++++++++++++++++++----- script/pn532_cmd.py | 36 +++++++++++++++++------------------- script/pn532_com.py | 18 ++++++++---------- 4 files changed, 60 insertions(+), 35 deletions(-) diff --git a/script/pn532_cli_main.py b/script/pn532_cli_main.py index 7c9426a..db26027 100644 --- a/script/pn532_cli_main.py +++ b/script/pn532_cli_main.py @@ -12,7 +12,6 @@ from prompt_toolkit.history import FileHistory from pn532_utils import CR, CG, CY, C0, CM import pn532_com -from pn532_com import DEBUG BANNER_PN532Killer = """ ██████╗ ███╗ ██╗███████╗██████╗ ██████╗ ██╗ ██╗██╗██╗ ██╗ ███████╗██████╗ diff --git a/script/pn532_cli_unit.py b/script/pn532_cli_unit.py index 9d75854..fd7ff7d 100644 --- a/script/pn532_cli_unit.py +++ b/script/pn532_cli_unit.py @@ -278,6 +278,35 @@ def on_exec(self, args: argparse.Namespace): os.system("clear" if os.name == "posix" else "cls") +@root.command("debug") +class RootDebug(BaseCLIUnit): + def args_parser(self) -> ArgumentParserNoExit: + parser = ArgumentParserNoExit() + parser.description = "Toggle debug logging (debug on|off). No args shows current state" + # Accept on/off (case-insensitive) + def to_lower(v: str) -> str: + return v.lower() + parser.add_argument( + "state", + nargs="?", + type=to_lower, + choices=["on", "off"], + help="Enable (on) or disable (off) debug output", + ) + return parser + + def on_exec(self, args: argparse.Namespace): + if getattr(args, "state", None) is None: + print(f"Debug is currently: {CG if pn532_com.DEBUG else CR}{'ON' if pn532_com.DEBUG else 'OFF'}{C0}") + return + if args.state == "on": + pn532_com.DEBUG = True + print(f"Debug switched {CG}ON{C0}") + elif args.state == "off": + pn532_com.DEBUG = False + print(f"Debug switched {CR}OFF{C0}") + + @hw_mode.command("r") class HWModeReader(DeviceRequiredUnit): def args_parser(self) -> ArgumentParserNoExit: @@ -1431,7 +1460,7 @@ def get_block0(self, uid, args): uid = str_to_bytes(block0[0:8]) bcc = uid[0] ^ uid[1] ^ uid[2] ^ uid[3] # check if bcc is valid on the block0 - if block0[8:10] != format(bcc, "02x"): + if int(block0[8:10], 16) != bcc: print(f"{CR}Invalid BCC{C0}") return return str_to_bytes(block0) @@ -1956,16 +1985,17 @@ def on_exec(self, args: argparse.Namespace): resp = self.cmd.hf14a_raw( options=options, resp_timeout_ms=1000, - data=[MifareCommand.MfWriteBlock, 0], + data=[MifareCommand.MfWriteBlock, block], ) - print(f"Writing block 0: {block0.hex().upper()}") + blk_bytes = bytes.fromhex(block_data) + print(f"Writing block {block}: {blk_bytes.hex().upper()}") options["keep_rf_field"] = 0 resp = self.cmd.hf14a_raw( options=options, resp_timeout_ms=1000, - data=block0, + data=blk_bytes, ) - if resp.length > 0 and resp[0] == 0x00: + if resp and len(resp) > 0 and resp[0] == 0x00: print(f"Write {block_data} to block {block}: {CG}Success{C0}") else: print(f"Write failed on block {block}") diff --git a/script/pn532_cmd.py b/script/pn532_cmd.py index 2484baa..80ba71b 100644 --- a/script/pn532_cmd.py +++ b/script/pn532_cmd.py @@ -48,15 +48,14 @@ def hf_14a_scan(self): :return: """ - # 根据连接类型调整一次性命令等待时间(不重发,只是给 UDP/TCP 更宽松的等待) timeout = 1 if hasattr(self.device, 'connection_type') and self.device.connection_type in ('udp', 'tcp'): timeout = 2 # 给网络模式多一点时间 resp = self.device.send_cmd_sync(Command.InListPassiveTarget, b"\x01\x00", timeout=timeout) if resp.status == Status.SUCCESS: - if len(resp.data) < 2: - resp.parsed = None - return resp + # if len(resp.data) < 2: + # resp.parsed = None + # return resp offset = 0 data = [] while offset < len(resp.data): @@ -107,13 +106,12 @@ def hfmf_cview(self): self.device.set_normal_mode() tag_info = {} - scan_result = self.hf_14a_scan() # 装饰器返回 parsed 数据,不是 Response 对象 + scan_result = self.hf_14a_scan() if scan_result == None or len(scan_result) == 0: print("No tag found") return Response(Command.InListPassiveTarget, Status.HF_TAG_NO) - # scan_result 直接是标签信息列表,不需要 .parsed tag_info["uid"] = scan_result[0]["uid"].hex() tag_info["atqa"] = scan_result[0]["atqa"].hex() tag_info["sak"] = scan_result[0]["sak"].hex() @@ -259,7 +257,7 @@ def isGen1a(self): def selectTag(self): tag_info = {} - scan_result = self.hf_14a_scan() # 装饰器返回 parsed 数据,不是 Response 对象 + scan_result = self.hf_14a_scan() self.device.halt() if scan_result == None or len(scan_result) == 0: print("No tag found") @@ -284,18 +282,18 @@ def selectTag(self): options=options, resp_timeout_ms=1000, data=[0x52], bitlen=7 ) if DEBUG: - print("WUPA:", wupa_result.parsed.hex()) + print("WUPA:", wupa_result) anti_coll_result = self.hf14a_raw( options=options, resp_timeout_ms=1000, data=[0x93, 0x20] ) if DEBUG: print("Anticollision CL1:", anti_coll_result.hex()) - if anti_coll_result[0] != 0x00: + if len(anti_coll_result) < 4: if DEBUG: print("Anticollision failed") return False - anti_coll_data = anti_coll_result[1:] + anti_coll_data = anti_coll_result[0:] options["append_crc"] = 1 select_result = self.hf14a_raw( options=options, resp_timeout_ms=1000, data=[0x93, 0x70] + list(anti_coll_data) @@ -308,13 +306,11 @@ def selectTag(self): elif uid_length == 7: options["append_crc"] = 0 anti_coll2_result = self.hf14a_raw( options=options, resp_timeout_ms=1000, data=[0x95, 0x20]) - if DEBUG: - print("Anticollision CL2:", anti_coll2_result.hex()) - if anti_coll2_result[0] != 0x00: + if len(anti_coll2_result) < 4: if DEBUG: print("Anticollision CL2 failed") return False - anti_coll2_data = anti_coll2_result[1:] + anti_coll2_data = anti_coll2_result[0:] options["append_crc"] = 1 select2_result = self.hf14a_raw( options=options, resp_timeout_ms=1000, data=[0x95, 0x70] + list(anti_coll2_data) @@ -363,7 +359,9 @@ def setGen3Uid(self, uid: bytes): resp = self.hf14a_raw( options=options, resp_timeout_ms=2000, data=bytes.fromhex(command) ) - if resp.parsed[0] == 0x00: + if DEBUG: + print("Set Gen3 UID:", resp.hex()) + if len(resp) >= 2 and resp[0] == 0x90 and resp[1] == 0x00: return True return False @@ -380,7 +378,7 @@ def setGen3Block0(self, block0: bytes): resp = self.hf14a_raw( options=options, resp_timeout_ms=1000, data=bytes.fromhex(command) ) - if resp.parsed[0] == 0x00: + if len(resp) >= 2 and resp[0] == 0x90 and resp[1] == 0x00: return True return False @@ -397,7 +395,7 @@ def lockGen3Uid(self): resp = self.hf14a_raw( options=options, resp_timeout_ms=1000, data=bytes.fromhex(command) ) - if resp.parsed[0] == 0x00: + if len(resp) >= 2 and resp[0] == 0x90 and resp[1] == 0x00: return True return False @@ -438,7 +436,7 @@ def mf1_read_block(self, block, key): if resp == None: print("No tag found") return resp - uidID1 = bytes(resp.parsed[0]["uid"]) + uidID1 = bytes(resp[0]["uid"]) auth_result = self.mf1_auth_one_key_block(block, MfcKeyType.A, key, uidID1) if not auth_result: self.mf1_authenticated_useKeyA = False @@ -465,7 +463,7 @@ def mf1_read_one_block(self, block, type_value: MfcKeyType, key): if resp == None: print("No tag found") return resp - uidID1 = bytes(resp.parsed[0]["uid"]) + uidID1 = bytes(resp[0]["uid"]) auth_result = self.mf1_auth_one_key_block( block, type_value, key, uidID1 ) diff --git a/script/pn532_com.py b/script/pn532_com.py index b6bf68e..1423663 100644 --- a/script/pn532_com.py +++ b/script/pn532_com.py @@ -256,8 +256,8 @@ def thread_data_receive(self): break if not chunk: continue - if DEBUG: - print(f"READ {chunk.hex().upper()}") + # if DEBUG: + # print(f"READ {chunk.hex().upper()}") data_buffer.extend(chunk) # 过滤所有 ACK changed = True @@ -265,8 +265,6 @@ def thread_data_receive(self): changed = False pos = data_buffer.find(ACK) if pos != -1: - if DEBUG: - print(f"SKIP ACK at {pos}") del data_buffer[pos:pos+len(ACK)] changed = True # 尝试解析帧 @@ -305,8 +303,8 @@ def thread_data_receive(self): continue tfi = data[0] if tfi != self.data_tfi_receive: - if DEBUG: - print(f"Unexpected TFI {tfi:02X} (expect {self.data_tfi_receive:02X}), resync") + # if DEBUG: + # print(f"Unexpected TFI {tfi:02X} (expect {self.data_tfi_receive:02X}), resync") i += 1 continue if len(data) < 2: @@ -458,13 +456,13 @@ def send_cmd_auto( self.wait_response_map[cmd]['response'] = None self.wait_response_map[cmd]['_pre_registered'] = True self.wait_response_map[cmd]['_timeout_value'] = timeout - if DEBUG: - print(f"PRE-REG CMD=0x{cmd:02X} TIMEOUT={timeout}s DATA={self._hex_str(data)}") + # if DEBUG: + # print(f"PRE-REG CMD=0x{cmd:02X} TIMEOUT={timeout}s DATA={self._hex_str(data)}") task = {"cmd": cmd, "frame": data_frame, "timeout": timeout, "close": close} if callable(callback): task["callback"] = callback - if DEBUG: - print(f"QUEUE CMD=0x{cmd:02X} TIMEOUT={timeout}s DATA={self._hex_str(data)}") + # if DEBUG: + # print(f"QUEUE CMD=0x{cmd:02X} TIMEOUT={timeout}s DATA={self._hex_str(data)}") self.send_data_queue.put(task) def send_cmd_sync( From 785821fd2d2fe827a3a63fd717ef4056adf0cf61 Mon Sep 17 00:00:00 2001 From: whywilson Date: Fri, 15 Aug 2025 10:00:53 +0800 Subject: [PATCH 7/9] Add debug command and fix HfMfuWrbl block warning --- script/pn532_cli_unit.py | 21 ++++++++++++--------- script/pn532_cmd.py | 2 +- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/script/pn532_cli_unit.py b/script/pn532_cli_unit.py index fd7ff7d..c8d8ed7 100644 --- a/script/pn532_cli_unit.py +++ b/script/pn532_cli_unit.py @@ -282,7 +282,7 @@ def on_exec(self, args: argparse.Namespace): class RootDebug(BaseCLIUnit): def args_parser(self) -> ArgumentParserNoExit: parser = ArgumentParserNoExit() - parser.description = "Toggle debug logging (debug on|off). No args shows current state" + parser.description = "Toggle debug logging (debug on|off)." # Accept on/off (case-insensitive) def to_lower(v: str) -> str: return v.lower() @@ -2168,11 +2168,13 @@ def args_parser(self) -> ArgumentParserNoExit: parser = ArgumentParserNoExit() parser.description = "Read Mifare Ultralight block" parser.add_argument( - "-blk", + "-b", "--blk", + dest="blk", type=int, metavar="", - required=True, - help="Block to read", + required=False, + default=0, + help="Block to read (default 0)", ) return parser @@ -2203,9 +2205,10 @@ def on_exec(self, args: argparse.Namespace): class HfMfuWrbl(DeviceRequiredUnit): def args_parser(self) -> ArgumentParserNoExit: parser = ArgumentParserNoExit() - parser.description = "Write Mifare Ultralight block" + parser.description = "Write Mifare Ultralight block (blocks 0-2 are blocked to avoid soft-brick)" parser.add_argument( - "-blk", + "-b", "--blk", + dest="blk", type=int, metavar="", required=True, @@ -2226,9 +2229,9 @@ def on_exec(self, args: argparse.Namespace): if not re.match(r"^[a-fA-F0-9]{8}$", data): print("Data must be 4 bytes hex") return - # if block is less than 4, show warning - if block < 3: - print(f"{CR}Warning: Single writing to block {block} may brick the tag{C0}") + if block in (0, 1, 2): + print(f"{CR}Blocked single write to reserved page {block} (0/1/2): this would corrupt the BCC and can soft-brick the tag; recovery may require specialized tools.{C0}") + print(f"{CY}Recommendation: only update the first 3 pages as a whole using the proper procedure/device, and only if you fully understand the process.{C0}") return resp = self.cmd.hf_14a_scan() if resp == None: diff --git a/script/pn532_cmd.py b/script/pn532_cmd.py index 80ba71b..3f56bdc 100644 --- a/script/pn532_cmd.py +++ b/script/pn532_cmd.py @@ -242,7 +242,7 @@ def isGen1a(self): # Unlock 1 resp_data = self.hf14a_raw( options=options, resp_timeout_ms=1000, data=[0x40], bitlen=7 - ) # 装饰器返回字节数据 + ) if DEBUG: print("unlock 1:", resp_data.hex()) if resp_data[-1] == 0x0A: From 6913d326c528c4f5eb6acf82ad6bda2752dbba82 Mon Sep 17 00:00:00 2001 From: whywilson Date: Fri, 15 Aug 2025 10:22:56 +0800 Subject: [PATCH 8/9] Add build for dev branch. --- .github/workflows/pyinstaller.yml | 32 ++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/.github/workflows/pyinstaller.yml b/.github/workflows/pyinstaller.yml index 217856f..01251eb 100644 --- a/.github/workflows/pyinstaller.yml +++ b/.github/workflows/pyinstaller.yml @@ -4,12 +4,16 @@ on: push: branches: - main + - dev pull_request: branches: - main release: types: [created] +permissions: + contents: write + jobs: build: runs-on: ${{ matrix.os }} @@ -47,4 +51,30 @@ jobs: uses: actions/upload-artifact@v4 with: name: ${{ matrix.artifact_name }} - path: dist/ \ No newline at end of file + path: dist/ + + prerelease: + name: Create prerelease and attach builds + needs: build + if: github.ref == 'refs/heads/dev' + runs-on: ubuntu-latest + steps: + - name: Download all build artifacts + uses: actions/download-artifact@v4 + with: + pattern: '*' + merge-multiple: true + path: release-assets + + - name: Create GitHub prerelease and upload assets + uses: softprops/action-gh-release@v2 + with: + tag_name: beta-${{ github.run_number }} + name: Beta ${{ github.run_number }} + prerelease: true + draft: false + body: | + Auto-generated beta prerelease for commit ${{ github.sha }}. + This release contains PyInstaller builds for Linux, Windows, and macOS. + files: | + release-assets/** \ No newline at end of file From 5c80d7b4da94859a60a6e81724fd29cc26d58253 Mon Sep 17 00:00:00 2001 From: whywilson Date: Fri, 15 Aug 2025 10:34:45 +0800 Subject: [PATCH 9/9] Reflact the build for dev. --- .github/workflows/pyinstaller.yml | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/.github/workflows/pyinstaller.yml b/.github/workflows/pyinstaller.yml index 01251eb..916f3b3 100644 --- a/.github/workflows/pyinstaller.yml +++ b/.github/workflows/pyinstaller.yml @@ -47,11 +47,27 @@ jobs: run: | pyinstaller script/pyinstaller.spec - - name: Upload artifacts + - name: Package artifact (Linux) + if: runner.os == 'Linux' + run: | + zip -r "${{ matrix.artifact_name }}.zip" dist/* + + - name: Package artifact (macOS) + if: runner.os == 'macOS' + run: | + zip -r "${{ matrix.artifact_name }}.zip" dist/* + + - name: Package artifact (Windows) + if: runner.os == 'Windows' + shell: pwsh + run: | + Compress-Archive -Path "dist/*" -DestinationPath "${{ matrix.artifact_name }}.zip" + + - name: Upload artifacts (zip only) uses: actions/upload-artifact@v4 with: name: ${{ matrix.artifact_name }} - path: dist/ + path: ${{ matrix.artifact_name }}.zip prerelease: name: Create prerelease and attach builds @@ -59,6 +75,10 @@ jobs: if: github.ref == 'refs/heads/dev' runs-on: ubuntu-latest steps: + - name: Compute short SHA + id: vars + run: echo "short_sha=${GITHUB_SHA::8}" >> $GITHUB_OUTPUT + - name: Download all build artifacts uses: actions/download-artifact@v4 with: @@ -69,12 +89,12 @@ jobs: - name: Create GitHub prerelease and upload assets uses: softprops/action-gh-release@v2 with: - tag_name: beta-${{ github.run_number }} - name: Beta ${{ github.run_number }} + tag_name: beta.${{ steps.vars.outputs.short_sha }} + name: PN532 CLI - beta.${{ steps.vars.outputs.short_sha }} prerelease: true draft: false body: | Auto-generated beta prerelease for commit ${{ github.sha }}. This release contains PyInstaller builds for Linux, Windows, and macOS. files: | - release-assets/** \ No newline at end of file + release-assets/*.zip \ No newline at end of file