Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@
/output
/input

/src/schema2validataclass/_version.py
/tests/test_schema/output
24 changes: 0 additions & 24 deletions src/schema2validataclass/_version.py

This file was deleted.

120 changes: 97 additions & 23 deletions src/schema2validataclass/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import logging
import subprocess # noqa: S404
from pathlib import Path
from typing import Callable
from urllib.request import urlopen

from schema2validataclass.common.helper import to_snake_case
Expand All @@ -23,7 +24,7 @@
from schema2validataclass.output.dataclass_outputs import DATACLASS_OUTPUT_CLASSES, DataclassObjectOutput
from schema2validataclass.output.pydantic_outputs import PYDANTIC_OUTPUT_CLASSES, PydanticObjectOutput
from schema2validataclass.output.validataclass_outputs import VALIDATACLASS_OUTPUT_CLASSES, ValidataclassObjectOutput
from schema2validataclass.schema.models import BaseField, Object, Schema
from schema2validataclass.schema.models import Array, BaseField, Object, Reference, Schema, get_reference_uris

logger = logging.getLogger(__name__)

Expand All @@ -44,29 +45,43 @@ def generate(self, schema_uri: URI, output_path: Path):
}
object_output_class, output_classes = output_format_map[self.config.output_format]

main_schema_dict = self.read_schema(schema_uri)

main_schema = Schema(main_schema_dict, uri=schema_uri)
schema_objects: dict[URI, Schema] = {schema_uri: main_schema}
schemas_to_load: list[URI] = main_schema.get_reference_base_uris()
while len(schemas_to_load):
child_schema = schemas_to_load.pop()
logger.info(f'parsing {child_schema} ...')
child_schema_dict = self.read_schema(child_schema)
child_schema_object = Schema(child_schema_dict, uri=child_schema)
schema_objects[child_schema] = child_schema_object
for reference_uri in child_schema_object.get_reference_base_uris():
if reference_uri not in schema_objects:
schemas_to_load.append(reference_uri)

# Check Reference Uniqueness and generate referencable fields
# Schema file cache: each file is read and parsed only once
schema_cache: dict[URI, Schema] = {}

main_schema = self.get_or_load_schema(schema_cache, schema_uri)

# Build referencable_fields by walking the reference tree from the main schema
referencable_fields: dict[URI, BaseField] = {}
for schema_object in list(schema_objects.values()):
for field in schema_object.properties + schema_object.definitions:
if field.uri in referencable_fields:
logger.warning(f'Duplicate field: {field.uri}')
continue
referencable_fields[field.uri] = field

# Main schema's contained_object properties are always included
for field in main_schema.properties:
referencable_fields[field.uri] = field

# Tree traversal: follow references starting from the main schema's properties
refs_to_process: list[URI] = get_reference_uris(main_schema.properties)
processed_refs: set[URI] = set()

while refs_to_process:
ref_uri = refs_to_process.pop()
if ref_uri in processed_refs:
continue
processed_refs.add(ref_uri)

base_uri = URI.from_uri_without_json_path(ref_uri)
schema = self.get_or_load_schema(schema_cache, base_uri)

field = schema.get_field_by_uri(ref_uri)
if field is None:
logger.warning(f'Referenced field not found: {ref_uri}')
continue

if ref_uri in referencable_fields:
logger.warning(f'Duplicate field: {ref_uri}')
continue
referencable_fields[ref_uri] = field

# Discover further references from this field
refs_to_process.extend(get_reference_uris([field]))

main_object_output = object_output_class(
main_schema.contained_object,
Expand Down Expand Up @@ -109,6 +124,65 @@ def generate(self, schema_uri: URI, output_path: Path):

self._run_post_processing(output_path)

def _is_ignored_reference(self, ref_uri: URI) -> bool:
    """Tell whether a reference target is excluded by configuration.

    The reference counts as ignored when its string form ends with any of the
    suffixes listed in ``self.config.ignore_references``.

    :param ref_uri: URI the reference points to.
    :return: ``True`` if at least one configured suffix matches, otherwise ``False``.
    """
    target = str(ref_uri)
    return any(target.endswith(suffix) for suffix in self.config.ignore_references)

def get_or_load_schema(self, schema_cache: dict[URI, Schema], base_uri: URI) -> Schema:
    """Return the parsed schema for ``base_uri``, loading it on first use.

    On a cache miss the schema is read through ``self.read_schema``, wrapped
    in a ``Schema``, filtered by the ignore-path and ignore-reference rules
    and then stored in ``schema_cache``.

    :param schema_cache: Mapping of already loaded schemas; updated in place.
    :param base_uri: URI of the schema file to fetch.
    :return: The cached or freshly loaded schema.
    """
    if base_uri in schema_cache:
        return schema_cache[base_uri]

    logger.info(f'parsing {base_uri} ...')
    loaded = Schema(self.read_schema(base_uri), uri=base_uri)
    # Filtering happens before caching, so later lookups get the already-filtered schema.
    self._apply_ignore_paths(loaded)
    self._apply_ignore_references(loaded)
    schema_cache[base_uri] = loaded
    return loaded

def _apply_ignore_paths(self, schema: Schema) -> None:
    """Drop properties matching ``self.config.ignore_paths`` from ``schema``.

    Does nothing when no ignore paths are configured. Otherwise the schema's
    contained object (if any) and every ``Object`` definition are filtered
    with ``self._is_ignored_path``.

    :param schema: Schema whose objects are filtered in place.
    """
    if self.config.ignore_paths:
        targets = [schema.contained_object] if schema.contained_object else []
        targets += [definition for definition in schema.definitions if isinstance(definition, Object)]
        for target in targets:
            self._filter_object_properties(target, self._is_ignored_path)

def _apply_ignore_references(self, schema: Schema) -> None:
    """Drop properties whose reference matches ``self.config.ignore_references``.

    Does nothing when no ignored references are configured. Otherwise the
    schema's contained object (if any) and every ``Object`` definition are
    filtered with ``self._is_ignored_reference_property``.

    :param schema: Schema whose objects are filtered in place.
    """
    if self.config.ignore_references:
        targets = [schema.contained_object] if schema.contained_object else []
        targets += [definition for definition in schema.definitions if isinstance(definition, Object)]
        for target in targets:
            self._filter_object_properties(target, self._is_ignored_reference_property)

def _filter_object_properties(self, obj: Object, predicate: Callable) -> None:
    """Recursively remove the properties of ``obj`` that ``predicate`` accepts.

    After filtering ``obj`` itself, the same filter is applied to every
    remaining property that is an ``Object`` and to the ``items`` of every
    remaining ``Array`` whose items are an ``Object``.

    :param obj: Object whose ``properties`` list is replaced in place.
    :param predicate: Called with a property; a truthy result removes it.
    """
    kept = []
    for prop in obj.properties:
        if predicate(prop):
            continue
        kept.append(prop)
    obj.properties = kept

    for child in kept:
        if isinstance(child, Object):
            self._filter_object_properties(child, predicate)
        if isinstance(child, Array):
            element = child.items
            if isinstance(element, Object):
                self._filter_object_properties(element, predicate)

def _is_ignored_path(self, field: BaseField) -> bool:
    """Tell whether ``field`` lies on a path excluded by configuration.

    The field is ignored when the string form of its URI ends with any
    suffix in ``self.config.ignore_paths``; a match is logged at info level.

    :param field: Field whose ``uri`` is checked.
    :return: ``True`` if a configured suffix matches, otherwise ``False``.
    """
    location = str(field.uri)
    ignored = any(location.endswith(suffix) for suffix in self.config.ignore_paths)
    if ignored:
        logger.info(f'skipping ignored path {field.uri}')
    return ignored

def _is_ignored_reference_property(self, field: BaseField) -> bool:
    """Tell whether ``field`` is a reference (or array of references) to an ignored target.

    For an ``Array`` the check is made on its ``items``. Anything that is
    not a ``Reference`` is never ignored by this predicate.

    :param field: Property to inspect.
    :return: Result of ``self._is_ignored_reference`` for the reference target, or ``False``.
    """
    candidate = field.items if isinstance(field, Array) else field
    return isinstance(candidate, Reference) and self._is_ignored_reference(candidate.to)

@staticmethod
def _get_referenced_object_name(output: BaseOutput) -> str | None:
if isinstance(output, NestedObjectBaseOutput):
Expand Down
9 changes: 6 additions & 3 deletions src/schema2validataclass/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,17 @@ class PostProcessing(Enum):
class Config:
unset_value_output: UnsetValueOutput = UnsetValueOutput.UNSET_VALUE
object_postfix: str = 'Input'
ignored_uris: list[str] = field(
default_factory=list,
)
output_format: OutputFormat = OutputFormat.VALIDATACLASS
set_validataclass_mixin: bool = True
renamed_properties: list[str] = field(
default_factory=lambda: keyword.kwlist,
)
ignore_references: list[str] = field(
default_factory=list,
)
ignore_paths: list[str] = field(
default_factory=list,
)
detect_looping_references: bool = True
post_processing: list[PostProcessing] = field(
default_factory=lambda: [PostProcessing.RUFF_FORMAT, PostProcessing.RUFF_CHECK],
Expand Down
3 changes: 0 additions & 3 deletions src/schema2validataclass/output/base_outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,6 @@ def __init__(self, field: Object, config: Config, referencable_fields: dict[URI,
references.append(field)
field = follow_reference(field, referencable_fields=referencable_fields)

if str(field.uri.json_path) in self.config.ignored_uris:
continue

if field is None:
continue

Expand Down
35 changes: 21 additions & 14 deletions src/schema2validataclass/schema/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ def get_objects(self) -> list['Object']:
result.extend(field.items.get_objects())
return result

def get_reference_base_uris(self) -> list[URI]:
return get_reference_base_uris(self.properties)
def get_reference_uris(self) -> list[URI]:
return get_reference_uris(self.properties)


@dataclass(kw_only=True, init=False)
Expand All @@ -212,12 +212,22 @@ def __init__(self, schema: dict, uri: URI):
def properties(self) -> list[BaseField]:
return self.contained_object.properties if self.contained_object else []

def get_reference_base_uris(self) -> list[URI]:
reference_uris = get_reference_base_uris(self.definitions)
def get_reference_uris(self) -> list[URI]:
reference_uris = get_reference_uris(self.definitions)
if self.contained_object:
reference_uris.extend(self.contained_object.get_reference_base_uris())
reference_uris.extend(self.contained_object.get_reference_uris())
return reference_uris

def get_field_by_uri(self, uri: URI) -> BaseField | None:
    """Look up a field of this schema by its URI.

    Definitions are searched first, then the properties of the contained
    object (when there is one).

    :param uri: URI to match against each field's ``uri``.
    :return: The first matching field, or ``None`` if nothing matches.
    """
    found = next((definition for definition in self.definitions if definition.uri == uri), None)
    if found is not None:
        return found
    if not self.contained_object:
        return None
    return next((prop for prop in self.contained_object.properties if prop.uri == uri), None)


def parse_schema(schema: dict, **kwargs) -> BaseField:
# Special cases without type
Expand All @@ -243,17 +253,14 @@ def parse_schema(schema: dict, **kwargs) -> BaseField:
raise ValueError(f'Unsupported type: {schema.get("type")}')


def get_reference_base_uris(fields: list[BaseField]) -> list[URI]:
def get_reference_uris(fields: list[BaseField]) -> list[URI]:
result: list[URI] = []
for field in fields:
# Incrementally look in children
if isinstance(field, Object):
result.extend(field.get_reference_base_uris())

# Get References
if isinstance(field, Reference) and field.uri is not None:
result.append(URI.from_uri_without_json_path(field.to))
if isinstance(field, Array) and isinstance(field.items, Reference) and field.items.uri is not None:
result.append(URI.from_uri_without_json_path(field.items.to))
result.extend(field.get_reference_uris())
if isinstance(field, Reference):
result.append(field.to)
if isinstance(field, Array) and isinstance(field.items, Reference):
result.append(field.items.to)

return list(set(result))
94 changes: 94 additions & 0 deletions tests/integration/dataclass/chained_schemas_ignore_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
"""
Copyright 2026 binary butterfly GmbH
Use of this source code is governed by an MIT-style license that can be found in the LICENSE.txt.
"""

from pathlib import Path

from schema2validataclass import App
from schema2validataclass.common.uri import URI
from schema2validataclass.config import Config, OutputFormat
from tests.integration.dataclass.helpers import INPUT_DIR, generated_files

SCHEMA_PATH = INPUT_DIR / 'chained_schemas_ignore' / 'main_schema.json'


def run_generate(schema_path: Path, output_path: Path, **config_kwargs):
    """Generate dataclass output for ``schema_path`` into ``output_path``.

    Any extra keyword arguments are forwarded to ``Config``.
    """
    generator = App(config=Config(output_format=OutputFormat.DATACLASS, **config_kwargs))
    source_uri = URI(file_path=schema_path)
    generator.generate(source_uri, output_path)


def test_without_ignoring_generates_all_files(tmp_path: Path):
    """With default config, a file is generated for every object, including the ignorable one."""
    expected_files = {
        '__init__.py',
        'simple_schema_input.py',
        'second_object_input.py',
        'third_object_input.py',
        'ignored_object_input.py',
    }

    run_generate(SCHEMA_PATH, tmp_path)

    assert generated_files(tmp_path) == expected_files


def test_ignored_reference_not_loaded(tmp_path: Path):
    """Ignoring the reference to IgnoredObject means no file is generated for it."""
    expected_files = {
        '__init__.py',
        'simple_schema_input.py',
        'second_object_input.py',
        'third_object_input.py',
    }

    run_generate(SCHEMA_PATH, tmp_path, ignore_references=['third_schema.json#/definitions/IgnoredObject'])

    assert generated_files(tmp_path) == expected_files


def test_ignored_reference_property_removed_from_parent(tmp_path: Path):
    """The generated parent object mentions ThirdObject but no longer IgnoredObject."""
    run_generate(SCHEMA_PATH, tmp_path, ignore_references=['third_schema.json#/definitions/IgnoredObject'])
    generated_source = tmp_path.joinpath('second_object_input.py').read_text()

    assert 'IgnoredObject' not in generated_source
    assert 'ThirdObject' in generated_source


def test_third_object_still_works(tmp_path: Path):
    """Ignoring one reference leaves the generated ThirdObject class intact."""
    run_generate(SCHEMA_PATH, tmp_path, ignore_references=['third_schema.json#/definitions/IgnoredObject'])
    generated_source = tmp_path.joinpath('third_object_input.py').read_text()

    assert 'class ThirdObjectInput:' in generated_source
    assert 'third_string' in generated_source


def test_ignored_path_not_loaded(tmp_path: Path):
    """Ignoring the property path to IgnoredObject means no file is generated for it."""
    ignored = ['second_schema.json#/definitions/SecondObject/properties/IgnoredObject']
    expected_files = {
        '__init__.py',
        'simple_schema_input.py',
        'second_object_input.py',
        'third_object_input.py',
    }

    run_generate(SCHEMA_PATH, tmp_path, ignore_paths=ignored)

    assert generated_files(tmp_path) == expected_files


def test_ignored_path_property_removed_from_parent(tmp_path: Path):
    """With the path ignored, the parent object mentions ThirdObject but not IgnoredObject."""
    ignored = ['second_schema.json#/definitions/SecondObject/properties/IgnoredObject']
    run_generate(SCHEMA_PATH, tmp_path, ignore_paths=ignored)
    generated_source = tmp_path.joinpath('second_object_input.py').read_text()

    assert 'IgnoredObject' not in generated_source
    assert 'ThirdObject' in generated_source


def test_ignored_path_third_object_still_works(tmp_path: Path):
    """Ignoring one property path leaves the generated ThirdObject class intact."""
    ignored = ['second_schema.json#/definitions/SecondObject/properties/IgnoredObject']
    run_generate(SCHEMA_PATH, tmp_path, ignore_paths=ignored)
    generated_source = tmp_path.joinpath('third_object_input.py').read_text()

    assert 'class ThirdObjectInput:' in generated_source
    assert 'third_string' in generated_source
assert 'third_string' in content
Loading
Loading