Skip to content

Commit 95c8be2

Browse files
committed
xdd: Implement option to load object dictionary from XDD file.
Signed-off-by: Taras Zaporozhets <zaporozhets.taras@gmail.com>
1 parent 9685880 commit 95c8be2

5 files changed

Lines changed: 1913 additions & 2 deletions

File tree

canopen/objectdictionary/__init__.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ def import_od(
7676
source: Union[str, TextIO, None],
7777
node_id: Optional[int] = None,
7878
) -> ObjectDictionary:
79-
"""Parse an EDS, DCF, or EPF file.
79+
"""Parse an EDS, DCF, EPF or XDD file.
8080
8181
:param source:
8282
The path to object dictionary file, a file like object, or an EPF XML tree.
@@ -106,9 +106,12 @@ def import_od(
106106
elif suffix == ".epf":
107107
from canopen.objectdictionary import epf
108108
return epf.import_epf(source)
109+
elif suffix == ".xdd":
110+
from canopen.objectdictionary import xdd
111+
return xdd.import_xdd(source, node_id)
109112
else:
110113
doc_type = suffix[1:]
111-
allowed = ", ".join(["eds", "dcf", "epf"])
114+
allowed = ", ".join(["eds", "dcf", "epf", "xdd"])
112115
raise ValueError(
113116
f"Cannot import from the {doc_type!r} format; "
114117
f"supported formats: {allowed}"

canopen/objectdictionary/xdd.py

Lines changed: 356 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,356 @@
1+
import functools
2+
import logging
3+
import os
4+
import re
5+
import xml.etree.ElementTree as etree
6+
from typing import Union, Optional
7+
from canopen.objectdictionary import (
8+
ODArray,
9+
ODRecord,
10+
ODVariable,
11+
ObjectDictionary,
12+
datatypes,
13+
objectcodes,
14+
)
15+
16+
logger = logging.getLogger(__name__)
17+
autoint = functools.partial(int, base=0)
18+
hex = functools.partial(int, base=16)
19+
20+
def import_xdd(
21+
xdd: Union[etree.Element, str, bytes, os.PathLike],
22+
node_id: Optional[int],
23+
) -> ObjectDictionary:
24+
od = ObjectDictionary()
25+
if etree.iselement(xdd):
26+
root = xdd
27+
else:
28+
root = etree.parse(xdd).getroot()
29+
30+
if node_id is None:
31+
device_commissioning = root.find('.//{*}DeviceCommissioning')
32+
if device_commissioning is not None:
33+
od.node_id = int(device_commissioning['nodeID'], 0)
34+
else:
35+
od.node_id = None
36+
else:
37+
od.node_id = node_id
38+
39+
_add_device_information(od, root)
40+
_add_object_list(od, root)
41+
_add_dummy_objects(od, root)
42+
return od
43+
44+
45+
def _add_device_information(
46+
od: ObjectDictionary,
47+
root: etree.Element
48+
):
49+
device_identity = root.find('.//{*}DeviceIdentity')
50+
if device_identity is not None:
51+
for src_prop, dst_prop, f in [
52+
("vendorName", "vendor_name", str),
53+
("vendorID", "vendor_number", hex),
54+
("productName", "product_name", str),
55+
("productID", "product_number", hex),
56+
]:
57+
val = device_identity.find(f'{{*}}{src_prop}')
58+
if val is not None and val.text:
59+
setattr(od.device_information, dst_prop, f(val.text))
60+
61+
general_features = root.find('.//{*}CANopenGeneralFeatures')
62+
if general_features is not None:
63+
for src_prop, dst_prop, f, default in [
64+
# properties without default value (default=None) are required
65+
("granularity", "granularity", autoint, None),
66+
("nrOfRxPDO", "nr_of_RXPDO", autoint, "0"),
67+
("nrOfTxPDO", "nr_of_TXPDO", autoint, "0"),
68+
("bootUpSlave", "simple_boot_up_slave", bool, 0),
69+
]:
70+
val = general_features.get(src_prop, default)
71+
if val is None:
72+
raise ValueError(f"Missing required '{src_prop}' property in XDD file")
73+
setattr(od.device_information, dst_prop, f(val))
74+
75+
76+
baud_rate = root.find('.//{*}PhysicalLayer/{*}baudRate')
77+
for baud in baud_rate:
78+
try:
79+
rate = int(baud.get("value").replace(' Kbps', ''), 10) * 1000
80+
od.device_information.allowed_baudrates.add(rate)
81+
except (ValueError, TypeError):
82+
pass
83+
84+
if default_baud := baud_rate.get('defaultValue', None):
85+
try:
86+
od.bitrate = int(default_baud.replace(' Kbps', ''), 10) * 1000
87+
except (ValueError, TypeError):
88+
pass
89+
90+
91+
def _add_object_list(
92+
od: ObjectDictionary,
93+
root: etree.Element
94+
):
95+
# Process all CANopen objects in the file
96+
for obj in root.findall('.//{*}CANopenObjectList/{*}CANopenObject'):
97+
name = obj.get('name', '')
98+
index = int(obj.get('index', '0'), 16)
99+
object_type = int(obj.get('objectType', '0'))
100+
sub_number = obj.get('subNumber')
101+
102+
# Simple variable
103+
if object_type == objectcodes.VAR:
104+
unique_id_ref = obj.get('uniqueIDRef', None)
105+
parameters = root.find(f'.//{{*}}parameter[@uniqueID="{unique_id_ref}"]')
106+
107+
var = _build_variable(parameters, od.node_id, name, index)
108+
_set_parameters_from_xdd_canopen_object(od.node_id, var, obj)
109+
od.add_object(var)
110+
111+
# Array
112+
elif object_type == objectcodes.ARRAY and sub_number:
113+
array = ODArray(name, index)
114+
for sub_obj in obj:
115+
sub_name = sub_obj.get('name', '')
116+
sub_index = int(sub_obj.get('subIndex'), 16)
117+
sub_unique_id = sub_obj.get('uniqueIDRef', None)
118+
sub_parameters = root.find(f'.//{{*}}parameter[@uniqueID="{sub_unique_id}"]')
119+
120+
sub_var = _build_variable(sub_parameters, od.node_id, sub_name, index, sub_index)
121+
_set_parameters_from_xdd_canopen_object(od.node_id, sub_var, sub_obj)
122+
array.add_member(sub_var)
123+
od.add_object(array)
124+
125+
# Record/Struct
126+
elif object_type == objectcodes.RECORD and sub_number:
127+
record = ODRecord(name, index)
128+
for sub_obj in obj:
129+
sub_name = sub_obj.get('name', '')
130+
sub_index = int(sub_obj.get('subIndex'), 16)
131+
sub_unique_id = sub_obj.get('uniqueIDRef', None)
132+
sub_parameters = root.find(f'.//{{*}}parameter[@uniqueID="{sub_unique_id}"]')
133+
sub_var = _build_variable(sub_parameters, od.node_id, sub_name, index, sub_index)
134+
_set_parameters_from_xdd_canopen_object(od.node_id, sub_var, sub_obj)
135+
record.add_member(sub_var)
136+
od.add_object(record)
137+
138+
139+
def _add_dummy_objects(
140+
od: ObjectDictionary,
141+
root: etree.Element
142+
):
143+
dummy_section = root.find('.//{*}ApplicationLayers/{*}dummyUsage')
144+
for dummy in dummy_section:
145+
p = dummy.get('entry').split('=')
146+
key = p[0]
147+
value = int(p[1], 10)
148+
index = int(key.replace('Dummy', ''), 10)
149+
if value == 1:
150+
var = ODVariable(key, index, 0)
151+
var.data_type = index
152+
var.access_type = "const"
153+
od.add_object(var)
154+
155+
156+
def _set_parameters_from_xdd_canopen_object(
157+
node_id: Optional[int],
158+
dst: ODVariable,
159+
src: etree.Element
160+
):
161+
# PDO mapping of the object, optional, string
162+
# Valid values:
163+
# * no - not mappable
164+
# * default - mapped by default
165+
# * optional - optionally mapped
166+
# * TPDO - may be mapped into TPDO only
167+
# * RPDO - may be mapped into RPDO only
168+
pdo_mapping = src.get('PDOmapping', 'no')
169+
dst.pdo_mappable = pdo_mapping != 'no'
170+
171+
# Name of the object, optional, string
172+
if var_name := src.get('name', None):
173+
dst.name = var_name
174+
175+
# CANopen data type (two hex digits), optional
176+
# data_type matches canopen library, no conversion needed
177+
if var_data_type := src.get('dataType', None):
178+
try:
179+
dst.data_type = int(var_data_type, 16)
180+
except (ValueError, TypeError):
181+
pass
182+
183+
# Access type of the object; valid values, optional, string
184+
# * const - read access only; the value is not changing
185+
# * ro - read access only
186+
# * wo - write access only
187+
# * rw - both read and write access
188+
# strings match with access_type in canopen library, no conversion needed
189+
if access_type := src.get('accessType', None):
190+
dst.access_type = access_type
191+
192+
# Low limit of the parameter value, optional, string
193+
if min_value := src.get('lowLimit', None):
194+
try:
195+
dst.min = _convert_variable(node_id, dst.data_type, min_value)
196+
except (ValueError, TypeError):
197+
pass
198+
199+
# High limit of the parameter value, optional, string
200+
if max_value := src.get('highLimit', None):
201+
try:
202+
dst.max = _convert_variable(node_id, dst.data_type, max_value)
203+
except (ValueError, TypeError):
204+
pass
205+
206+
# Default value of the object, optional, string
207+
if default_value := src.get('defaultValue', None):
208+
try:
209+
dst.default_raw = default_value
210+
if '$NODEID' in dst.default_raw:
211+
dst.relative = True
212+
dst.default = _convert_variable(node_id, dst.data_type, dst.default_raw)
213+
except (ValueError, TypeError):
214+
pass
215+
216+
217+
def _build_variable(
218+
par_tree: Optional[etree.Element],
219+
node_id: Optional[int],
220+
name: str,
221+
index: int,
222+
subindex: int = 0
223+
) -> ODVariable:
224+
var = ODVariable(name, index, subindex)
225+
# Set default parameters
226+
var.default_raw = None
227+
var.access_type = 'ro'
228+
if par_tree is None:
229+
return var
230+
231+
var.description = par_tree.get('description', '')
232+
233+
# Extract data type
234+
data_types = {
235+
'BOOL': datatypes.BOOLEAN,
236+
'SINT': datatypes.INTEGER8,
237+
'INT': datatypes.INTEGER16,
238+
'DINT': datatypes.INTEGER32,
239+
'LINT': datatypes.INTEGER64,
240+
'USINT': datatypes.UNSIGNED8,
241+
'UINT': datatypes.UNSIGNED16,
242+
'UDINT': datatypes.UNSIGNED32,
243+
'ULINT': datatypes.UNSIGNED32,
244+
'REAL': datatypes.REAL32,
245+
'LREAL': datatypes.REAL64,
246+
'STRING': datatypes.VISIBLE_STRING,
247+
'BITSTRING': datatypes.DOMAIN,
248+
'WSTRING': datatypes.UNICODE_STRING
249+
}
250+
251+
for k, v in data_types.items():
252+
if par_tree.find(f'{{*}}{k}') is not None:
253+
var.data_type = v
254+
255+
# Extract access type
256+
if access_type_str := par_tree.get('access', None):
257+
# Defines which operations are valid for the parameter:
258+
# * const - read access only; the value is not changing
259+
# * read - read access only (default value)
260+
# * write - write access only
261+
# * readWrite - both read and write access
262+
# * readWriteInput - both read and write access, but represents process input data
263+
# * readWriteOutput - both read and write access, but represents process output data
264+
# * noAccess - access denied
265+
access_types = {
266+
'const': 'const',
267+
'read': 'ro',
268+
'write': 'wo',
269+
'readWrite': 'rw',
270+
'readWriteInput': 'rw',
271+
'readWriteOutput': 'rw',
272+
'noAccess': 'const',
273+
}
274+
var.access_type = access_types.get(access_type_str)
275+
276+
# Extract default value
277+
default_value = par_tree.find('{*}defaultValue')
278+
if default_value is not None:
279+
try:
280+
var.default_raw = default_value.get('value')
281+
if '$NODEID' in var.default_raw:
282+
var.relative = True
283+
var.default = _convert_variable(node_id, var.data_type, var.default_raw)
284+
except (ValueError, TypeError):
285+
pass
286+
287+
# Extract allowed values range
288+
min_value = par_tree.find('{*}allowedValues/{*}range/{*}minValue')
289+
if min_value is not None:
290+
try:
291+
var.min = _convert_variable(node_id, var.data_type, min_value.get('value'))
292+
except (ValueError, TypeError):
293+
pass
294+
295+
max_value = par_tree.find('{*}allowedValues/{*}range/{*}maxValue')
296+
if max_value is not None:
297+
try:
298+
var.max = _convert_variable(node_id, var.data_type, max_value.get('value'))
299+
except (ValueError, TypeError):
300+
pass
301+
return var
302+
303+
304+
def _calc_bit_length(
305+
data_type: int
306+
) -> int:
307+
if data_type == datatypes.INTEGER8:
308+
return 8
309+
elif data_type == datatypes.INTEGER16:
310+
return 16
311+
elif data_type == datatypes.INTEGER32:
312+
return 32
313+
elif data_type == datatypes.INTEGER64:
314+
return 64
315+
else:
316+
raise ValueError(f"Invalid data_type '{data_type}', expecting a signed integer data_type.")
317+
318+
319+
def _signed_int_from_hex(
320+
hex_str: str,
321+
bit_length: int
322+
) -> int:
323+
number = int(hex_str, 0)
324+
max_value = (1 << (bit_length - 1)) - 1
325+
326+
if number > max_value:
327+
return number - (1 << bit_length)
328+
else:
329+
return number
330+
331+
332+
def _convert_variable(
333+
node_id: Optional[int],
334+
var_type: int,
335+
value: str
336+
) -> Optional[Union[bytes, str, float, int]]:
337+
if var_type in (datatypes.OCTET_STRING, datatypes.DOMAIN):
338+
return bytes.fromhex(value)
339+
elif var_type in (datatypes.VISIBLE_STRING, datatypes.UNICODE_STRING):
340+
return str(value)
341+
elif var_type in datatypes.FLOAT_TYPES:
342+
return float(value)
343+
else:
344+
# COB-ID can contain '$NODEID+' so replace this with node_id before converting
345+
value = value.replace(" ", "").upper()
346+
if '$NODEID' in value:
347+
if node_id is None:
348+
logger.warn("Cannot convert value with $NODEID, skipping conversion")
349+
return None
350+
else:
351+
return int(re.sub(r'\+?\$NODEID\+?', '', value), 0) + node_id
352+
else:
353+
if var_type in datatypes.SIGNED_TYPES:
354+
return _signed_int_from_hex(value, _calc_bit_length(var_type))
355+
else:
356+
return int(value, 0)

0 commit comments

Comments
 (0)