Skip to content
25 changes: 19 additions & 6 deletions EosLib/packet/data_header.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from EosLib.device import Device

class DataHeader:

data_header_struct_format_string = "!" \
"B" \
"B" \
Expand Down Expand Up @@ -50,17 +49,18 @@ def validate_data_header(self):

:return: True if valid
"""
if not isinstance(self.sender, int) or not 0 <= self.sender <= 255 or self.sender == \
Device.NO_DEVICE:

if not isinstance(self.sender, int) or self.sender == definitions.Device.NO_DEVICE or \
not self.sender in definitions.Device:
raise DataHeaderFormatError("Invalid Sender")

if not isinstance(self.data_type, int) or not 0 <= self.data_type <= 255:
if not isinstance(self.data_type, int) or self.data_type not in definitions.Type:
raise DataHeaderFormatError("Invalid Type")

if not isinstance(self.priority, int) or not 0 <= self.priority <= 255:
if not isinstance(self.priority, int) or self.priority not in definitions.Priority:
raise DataHeaderFormatError("Invalid Priority")

if not isinstance(self.destination, int) or not 0 <= self.destination <= 255:
if not isinstance(self.destination, int) or self.destination not in definitions.Device:
raise DataHeaderFormatError("Invalid Destination")

if not isinstance(self.generate_time, datetime):
Expand Down Expand Up @@ -111,3 +111,16 @@ def decode(header_bytes: bytes):
decoded_header = DataHeader(unpacked[1], unpacked[2], unpacked[3], unpacked[4],
datetime.fromtimestamp(unpacked[5]))
return decoded_header


def check_data_header(data_header: DataHeader) -> bool:
""" Takes a packet data header and checks to see if it is valid

:return: boolean True if valid
"""
if data_header is None:
raise PacketFormatError("All packets must have a data header")
else:
data_header.validate_data_header()

return True
56 changes: 53 additions & 3 deletions EosLib/packet/definitions.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
from enum import IntEnum, unique
from enum import IntEnum, unique, EnumMeta


class EnumMetaClass(EnumMeta):
def __contains__(cls, item):
try:
cls(item)
except ValueError:
return False
else:
return True


@unique
class Type(IntEnum):
class Type(IntEnum, metaclass=EnumMetaClass):
NO_TYPE = 0
TELEMETRY = 1
WARNING = 2
Expand All @@ -15,7 +25,7 @@ class Type(IntEnum):


@unique
class Priority(IntEnum):
class Priority(IntEnum, metaclass=EnumMetaClass):
NO_TRANSMIT = 0
URGENT = 11
TELEMETRY = 9
Expand All @@ -25,6 +35,46 @@ class Priority(IntEnum):
ERROR = 255


@unique
class Device(IntEnum, metaclass=EnumMetaClass):
NO_DEVICE = 0
PRESSURE = 1
PARTICULATES = 2
IR_VISIBLE_LIGHT = 3
VISIBLE_UVA_LIGHT = 4
UVA_UVB_LIGHT = 5
CO2 = 6
O3 = 7
MISC_SENSOR_1 = 8
MISC_SENSOR_2 = 9
MISC_SENSOR_3 = 10
MISC_SENSOR_4 = 11
REEFING_MOTOR = 12
MISC_ENGINEERING_1 = 13
MISC_ENGINEERING_2 = 14
CAMERA_1 = 15
CAMERA_2 = 16
MISC_CAMERA_1 = 17
MISC_CAMERA_2 = 18
RADIO = 19
MISC_RADIO_1 = 20
MISC_RADIO_2 = 21
GPS = 22
IMU = 23
MISC_1 = 24
MISC_2 = 25
MISC_3 = 26
MISC_4 = 27
TEMPERATURE_HUMIDITY = 28
MISC_TEST_1 = 29
MISC_TEST_2 = 30
MISC_TEST_3 = 31
GROUND_STATION_1 = 32
GROUND_STATION_2 = 33
GROUND_STATION_3 = 34
ORCHEOSTRATOR = 35


@unique
class HeaderPreamble(IntEnum):
V010DATA = 2
Expand Down
50 changes: 50 additions & 0 deletions EosLib/packet/packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def __init__(self, body: bytes, data_header: DataHeader, transmit_header: Transm
self.body = body # type: bytes
self.data_header = data_header # type: DataHeader
self.transmit_header = transmit_header # type: TransmitHeader
self.validate_packet() # checks if packet is valid

def __eq__(self, other):
""" Compares two packets for value equality
Expand Down Expand Up @@ -129,6 +130,55 @@ def encode_to_string(self):
data_header=self.data_header.encode_to_string(),
body=self.body.decode())

def set_data_header(self, new_data_header: DataHeader) -> bool:
""" Setter that sets new data header

:return: boolean True set is successful
:raises: PacketFormatError if data header is none
"""
if new_data_header is not None:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is very incorrect

if new_data_header.validate_data_header():
self.data_header = new_data_header
return True

raise PacketFormatError("data header can not be none")

def set_transmit_header(self, new_transmit_header: TransmitHeader) -> bool:
""" Setter that sets new transmit header

:return: boolean True set is successful
"""
if new_transmit_header is None:
self.transmit_header = new_transmit_header
return True

if new_transmit_header.validate_transmit_header():
self.transmit_header = new_transmit_header

return True

def set_body(self, new_body: bytes) -> bool:
""" Setter that sets new body

:return: boolean True set is successful
"""
if new_body is not None and len(new_body) == 0:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should use the body validator

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want me to make a function or is there already a function for this? If so I can't find it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth making one, there's good logic in validate_packet.

Also this function's logic is wrong (only sets if length 0) and it doesn't return something even though the docstring specifies that it should

self.body = new_body
return True

return False

@staticmethod
def check_body(body: bytes) -> bool:
""" Takes a packet body and checks to see if it is valid

:return: boolean True if valid
"""
if body is None or len(body) == 0 or not isinstance(body, bytes):
raise PacketFormatError("All packets must have a body")

return True

@staticmethod
def decode(packet_bytes: bytes):
"""Takes a bytes object and decodes it into a Packet object.
Expand Down
11 changes: 11 additions & 0 deletions EosLib/packet/transmit_header.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,14 @@ def decode(header_bytes: bytes):

decoded_header = TransmitHeader(unpacked[1], datetime.fromtimestamp(unpacked[3]), unpacked[2])
return decoded_header


def check_transmit_header(transmit_header: TransmitHeader) -> bool:
""" Takes a packet transmit header and checks to see if it is valid

:return: boolean True if valid
"""
if transmit_header is not None:
transmit_header.validate_transmit_header()

return True
42 changes: 41 additions & 1 deletion tests/packet/test_packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from datetime import datetime
from EosLib.packet.packet import TransmitHeader, DataHeader, Packet, PacketFormatError
from EosLib.packet.exceptions import DataHeaderFormatError, TransmitHeaderFormatError
from EosLib.packet.data_header import check_data_header
from EosLib.packet.transmit_header import check_transmit_header
from EosLib.device import Device


Expand Down Expand Up @@ -294,4 +296,42 @@ def test_packet_print_no_body(packet):
"\tGenerate Time: 2001-01-07 01:23:45\n" \
"No body"

assert expected_string == packet.__str__()

def test_set_data_header():
test_packet = get_valid_packet()
data_header = DataHeader(definitions.Device.GPS,
definitions.Type.TELEMETRY,
definitions.Priority.TELEMETRY,
definitions.Device.GPS,
datetime.now())

test_packet.set_data_header(data_header)


def test_set_transmit_header():
test_packet = get_valid_packet()
transmit_header = TransmitHeader(0, datetime.now())

test_packet.set_transmit_header(transmit_header)


def test_set_body():
test_packet = get_valid_packet()
body = bytes("temp", 'utf-8')

test_packet.set_body(body)


def test_check_data_header():
test_packet = get_valid_packet()
check_data_header(test_packet.data_header)


def test_check_transmit_header():
test_packet = get_valid_packet()
check_transmit_header(test_packet.transmit_header)


def test_check_body():
test_packet = get_valid_packet()
Packet.check_body(test_packet.body)