Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions sdk/basyx/aas/util/traversal.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2025 the Eclipse BaSyx Authors
# Copyright (c) 2026 the Eclipse BaSyx Authors
#
# This program and the accompanying materials are made available under the terms of the MIT License, available in
# the LICENSE file of this project.
Expand All @@ -8,27 +8,38 @@
A module with helper functions for traversing AAS object structures.
"""

from typing import Union, Iterator
from typing import Iterable, Iterator, Union

from .. import model


def _walk_submodel_helper(elements: Iterable[model.SubmodelElement]) -> Iterator[model.SubmodelElement]:
for element in elements:
if isinstance(element, (model.SubmodelElementCollection, model.SubmodelElementList)):
yield from _walk_submodel_helper(element.value)
elif isinstance(element, model.Operation):
for var_list in (element.input_variable, element.output_variable, element.in_output_variable):
yield from _walk_submodel_helper(var_list)
elif isinstance(element, model.Entity):
yield from _walk_submodel_helper(element.statement)
yield element


def walk_submodel(collection: Union[model.Submodel, model.SubmodelElementCollection, model.SubmodelElementList]) \
-> Iterator[model.SubmodelElement]:
"""
Traverse the :class:`SubmodelElements <basyx.aas.model.submodel.SubmodelElement>` in a
:class:`~basyx.aas.model.submodel.Submodel`, :class:`~basyx.aas.model.submodel.SubmodelElementCollection` or a
:class:`~basyx.aas.model.submodel.SubmodelElementList` recursively in post-order tree-traversal.
:class:`~basyx.aas.model.submodel.Submodel`, :class:`~basyx.aas.model.submodel.SubmodelElementCollection`,
:class:`~basyx.aas.model.submodel.SubmodelElementList`, :class:`~basyx.aas.model.submodel.Entity`
(via ``statement``) or :class:`~basyx.aas.model.submodel.Operation` (via ``input_variable``, ``output_variable``,
``in_output_variable``) recursively in post-order tree-traversal.

This is a generator function, yielding all the :class:`SubmodelElements <basyx.aas.model.submodel.SubmodelElement>`.
No :class:`SubmodelElements <basyx.aas.model.submodel.SubmodelElement>` should be added, removed or
moved while iterating, as this could result in undefined behaviour.
"""
elements = collection.submodel_element if isinstance(collection, model.Submodel) else collection.value
for element in elements:
if isinstance(element, (model.SubmodelElementCollection, model.SubmodelElementList)):
yield from walk_submodel(element)
yield element
yield from _walk_submodel_helper(elements)


def walk_semantic_ids_recursive(root: model.Referable) -> Iterator[model.Reference]:
Expand Down
120 changes: 120 additions & 0 deletions sdk/test/util/test_traversal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# Copyright (c) 2026 the Eclipse BaSyx Authors
#
# This program and the accompanying materials are made available under the terms of the MIT License, available in
# the LICENSE file of this project.
#
# SPDX-License-Identifier: MIT

import unittest
from typing import List

from basyx.aas import model
from basyx.aas.util.traversal import walk_submodel


class TestWalkSubmodel(unittest.TestCase):
def _submodel(self, *elements: model.SubmodelElement) -> model.Submodel:
return model.Submodel("test-submodel", submodel_element=list(elements))

def test_flat_submodel(self):
prop = model.Property("prop", model.datatypes.String)
sm = self._submodel(prop)
result = list(walk_submodel(sm))
self.assertEqual([prop], result)

def test_collection_post_order(self):
child1 = model.Property("child1", model.datatypes.String)
child2 = model.Property("child2", model.datatypes.String)
coll = model.SubmodelElementCollection("coll", value=[child1, child2])
sm = self._submodel(coll)
result = list(walk_submodel(sm))
# post-order: children before parent
self.assertEqual([child1, child2, coll], result)

def test_list_post_order(self):
child = model.Property(None, model.datatypes.String)
sml = model.SubmodelElementList("sml", type_value_list_element=model.Property,
value_type_list_element=model.datatypes.String, value=[child])
sm = self._submodel(sml)
result = list(walk_submodel(sm))
self.assertEqual([child, sml], result)

def test_entity_statement_post_order(self):
stmt = model.Property("stmt", model.datatypes.String)
entity = model.Entity("entity", model.EntityType.CO_MANAGED_ENTITY, statement=[stmt])
sm = self._submodel(entity)
result = list(walk_submodel(sm))
# post-order: statement element before Entity
self.assertEqual([stmt, entity], result)

def test_entity_empty_statement(self):
entity = model.Entity("entity", model.EntityType.CO_MANAGED_ENTITY)
sm = self._submodel(entity)
result = list(walk_submodel(sm))
self.assertEqual([entity], result)

def test_operation_variables_post_order(self):
in_var = model.Property("in_var", model.datatypes.String)
out_var = model.Property("out_var", model.datatypes.String)
inout_var = model.Property("inout_var", model.datatypes.String)
op = model.Operation("op", input_variable=[in_var], output_variable=[out_var],
in_output_variable=[inout_var])
sm = self._submodel(op)
result = list(walk_submodel(sm))
# post-order: all variable elements before Operation
self.assertEqual([in_var, out_var, inout_var, op], result)

def test_operation_empty_variables(self):
op = model.Operation("op")
sm = self._submodel(op)
result = list(walk_submodel(sm))
self.assertEqual([op], result)

def test_collection_inside_entity_statement(self):
inner = model.Property("inner", model.datatypes.String)
coll = model.SubmodelElementCollection("coll", value=[inner])
entity = model.Entity("entity", model.EntityType.CO_MANAGED_ENTITY, statement=[coll])
sm = self._submodel(entity)
result = list(walk_submodel(sm))
# post-order: inner → coll → entity
self.assertEqual([inner, coll, entity], result)

def test_entity_inside_operation_input_variable(self):
stmt = model.Property("stmt", model.datatypes.String)
entity = model.Entity("entity", model.EntityType.CO_MANAGED_ENTITY, statement=[stmt])
op = model.Operation("op", input_variable=[entity])
sm = self._submodel(op)
result = list(walk_submodel(sm))
# post-order: stmt → entity → op
self.assertEqual([stmt, entity, op], result)

def test_walk_from_collection(self):
prop = model.Property("prop", model.datatypes.String)
entity = model.Entity("entity", model.EntityType.CO_MANAGED_ENTITY, statement=[prop])
coll = model.SubmodelElementCollection("coll", value=[entity])
# walk_submodel(coll) yields the contents of coll, not coll itself
result = list(walk_submodel(coll))
self.assertEqual([prop, entity], result)

def test_walk_from_list(self):
op = model.Operation(None)
sml = model.SubmodelElementList("sml", type_value_list_element=model.Operation, value=[op])
# walk_submodel(sml) yields the contents of sml, not sml itself
result = list(walk_submodel(sml))
self.assertEqual([op], result)

def test_file_inside_entity_is_found(self):
"""Regression test for issue #423: File inside Entity.statement must be yielded."""
f = model.File("file", content_type="application/pdf", value="/some/file.pdf")
entity = model.Entity("entity", model.EntityType.CO_MANAGED_ENTITY, statement=[f])
sm = self._submodel(entity)
files: List[model.File] = [e for e in walk_submodel(sm) if isinstance(e, model.File)]
self.assertEqual([f], files)

def test_file_inside_operation_variable_is_found(self):
"""Regression test for issue #423: File inside Operation variable must be yielded."""
f = model.File("file", content_type="application/pdf", value="/some/file.pdf")
op = model.Operation("op", input_variable=[f])
sm = self._submodel(op)
files: List[model.File] = [e for e in walk_submodel(sm) if isinstance(e, model.File)]
self.assertEqual([f], files)