diff --git a/class_generator/README.md b/class_generator/README.md index be86fe944e..c4e4f4bf7c 100644 --- a/class_generator/README.md +++ b/class_generator/README.md @@ -155,18 +155,55 @@ class-generator --kind Pod --add-tests ## Update schema files -- Dependencies - - Kubernetes/Openshift cluster - - [oc](https://mirror.openshift.com/pub/openshift-v4/x86_64/clients/ocp/stable/) or [kubectl](https://kubernetes.io/docs/tasks/tools/) (latest version) - - [uv](https://github.com/astral-sh/uv) +Schema files contain resource definitions used by the class generator. You can update these from a connected Kubernetes/OpenShift cluster. -- Clone this repository +### Dependencies + +- Kubernetes/OpenShift cluster +- [oc](https://mirror.openshift.com/pub/openshift-v4/x86_64/clients/ocp/stable/) or [kubectl](https://kubernetes.io/docs/tasks/tools/) (latest version) +- [uv](https://github.com/astral-sh/uv) + +### Setup + +Clone this repository: ```bash git clone https://github.com/RedHatQE/openshift-python-wrapper.git cd openshift-python-wrapper ``` -- Login to the cluster use admin user and password. +Login to the cluster using admin user and password. + +### Full schema update + +Update the entire schema from the connected cluster: + +```bash +class-generator --update-schema +``` + +This fetches all resource schemas from the cluster and updates the local cache. + +**Note:** If connected to an older cluster, existing schemas are preserved and only missing resources are added. + +### Single resource schema update + +Update the schema for a single resource without affecting others: + +```bash +class-generator --update-schema-for LlamaStackDistribution +``` + +This is useful when: + +- Connected to an older cluster but need to update a specific CRD +- A new operator was installed and you need its resource schema +- You want to refresh just one resource without a full update + +After updating the schema, regenerate the class: +```bash +class-generator --kind LlamaStackDistribution --overwrite ``` + +**Note:** `--update-schema` and `--update-schema-for` are mutually exclusive. Use one or the other, not both. diff --git a/class_generator/cli.py b/class_generator/cli.py index b837ed3a6b..05d643b771 100644 --- a/class_generator/cli.py +++ b/class_generator/cli.py @@ -10,14 +10,19 @@ from typing import Any import cloup -from cloup.constraints import If, IsSet, accept_none, require_one +from cloup.constraints import If, IsSet, accept_none, mutually_exclusive, require_one from simple_logger.logger import get_logger from class_generator.constants import TESTS_MANIFESTS_DIR from class_generator.core.coverage import analyze_coverage, generate_report from class_generator.core.discovery import discover_generated_resources from class_generator.core.generator import class_generator -from class_generator.core.schema import ClusterVersionError, update_kind_schema +from class_generator.core.schema import ( + ClusterVersionError, + update_kind_schema, + update_single_resource_schema, +) +from class_generator.exceptions import ResourceNotFoundError from class_generator.tests.test_generation import generate_class_generator_tests from class_generator.utils import execute_parallel_tasks from ocp_resources.utils.utils import convert_camel_case_to_snake_case @@ -28,16 +33,25 @@ def validate_actions( kind: str | None, update_schema: bool, + update_schema_for: str | None, discover_missing: bool, coverage_report: bool, generate_missing: bool, regenerate_all: bool, ) -> None: """Validate that at least one action is specified.""" - actions = [kind, update_schema, discover_missing, coverage_report, generate_missing, regenerate_all] + actions = [ + kind, + update_schema, + update_schema_for, + discover_missing, + coverage_report, + generate_missing, + regenerate_all, + ] if not any(actions): LOGGER.error( - "At least one action must be specified (--kind, --update-schema, --discover-missing, --coverage-report, --generate-missing, or --regenerate-all)" + "At least one action must be specified (--kind, --update-schema, --update-schema-for, --discover-missing, --coverage-report, --generate-missing, or --regenerate-all)" ) sys.exit(1) @@ -70,6 +84,33 @@ def handle_schema_update(update_schema: bool, generate_missing: bool) -> bool: return True +def handle_single_schema_update(update_schema_for: str | None) -> bool: + """ + Handle single resource schema update operations. + + Args: + update_schema_for: The kind name to update schema for, or None + + Returns: + True if processing should continue, False if it should exit + """ + if update_schema_for: + LOGGER.info(f"Updating schema for single resource: {update_schema_for}") + try: + update_single_resource_schema(kind=update_schema_for) + except ResourceNotFoundError as e: + LOGGER.error(f"Resource not found: {e}") + sys.exit(1) + except (OSError, RuntimeError) as e: + LOGGER.exception(f"Failed to update schema for {update_schema_for}: {e}") + sys.exit(1) + + LOGGER.info(f"Schema updated for {update_schema_for}.") + return False + + return True + + def handle_coverage_analysis_and_reporting( coverage_report: bool, discover_missing: bool, @@ -491,6 +532,17 @@ def handle_test_generation(add_tests: bool) -> None: is_flag=True, show_default=True, ) +@cloup.option( + "--update-schema-for", + type=cloup.STRING, + help=""" + \b + Update schema for a single resource kind only, without affecting other resources. + Useful when connected to an older cluster but needing to update a specific CRD. + Example: --update-schema-for LlamaStackDistribution + Cannot be used together with --update-schema. +""", +) @cloup.option( "--discover-missing", help="Discover resources in the cluster that don't have wrapper classes", @@ -555,6 +607,20 @@ def handle_test_generation(add_tests: bool) -> None: "regenerate_all", ], ) +@cloup.constraint( + mutually_exclusive, + ["update_schema", "update_schema_for"], +) +@cloup.constraint( + If(IsSet("update_schema_for"), then=accept_none), + [ + "kind", + "discover_missing", + "coverage_report", + "generate_missing", + "regenerate_all", + ], +) @cloup.constraint( If(IsSet("backup"), then=require_one), ["regenerate_all", "overwrite"], @@ -577,6 +643,7 @@ def main( filter: str | None, json_output: bool, update_schema: bool, + update_schema_for: str | None, verbose: bool, ) -> None: """Generate Python module for K8S resource.""" @@ -610,13 +677,18 @@ def main( validate_actions( kind=kind, update_schema=update_schema, + update_schema_for=update_schema_for, discover_missing=discover_missing, coverage_report=coverage_report, generate_missing=generate_missing, regenerate_all=regenerate_all, ) - # Handle schema update + # Handle single resource schema update (if specified) + if not handle_single_schema_update(update_schema_for=update_schema_for): + return + + # Handle full schema update if not handle_schema_update(update_schema=update_schema, generate_missing=generate_missing): return diff --git a/class_generator/core/schema.py b/class_generator/core/schema.py index 38bf371078..61cf78ac9d 100644 --- a/class_generator/core/schema.py +++ b/class_generator/core/schema.py @@ -12,6 +12,7 @@ from simple_logger.logger import get_logger from class_generator.constants import DEFINITIONS_FILE, RESOURCES_MAPPING_FILE, SCHEMA_DIR +from class_generator.exceptions import ResourceNotFoundError from class_generator.utils import execute_parallel_tasks, execute_parallel_with_mapping from ocp_resources.utils.archive_utils import save_json_archive from ocp_resources.utils.schema_validator import SchemaValidator @@ -503,6 +504,64 @@ def create_fetch_task(api_path: str) -> tuple[str, dict[str, Any] | None]: return schemas +def _build_schema_data( + def_data: dict[str, Any], + gvk: dict[str, str], + is_namespaced: bool, +) -> dict[str, Any]: + """Build schema data dictionary from definition data and GVK info. + + Args: + def_data: The definition data from the API schema + gvk: The group-version-kind dictionary + is_namespaced: Whether the resource is namespaced + + Returns: + Schema data dictionary in the format expected by SchemaValidator + """ + return { + "description": def_data.get("description", ""), + "properties": def_data.get("properties", {}), + "required": def_data.get("required", []), + "type": def_data.get("type", "object"), + "x-kubernetes-group-version-kind": [gvk], + "namespaced": is_namespaced, + } + + +def _merge_schema_into_mapping( + resources_mapping: dict[str, Any], + kind_lower: str, + group: str, + version: str, + schema_data: dict[str, Any], +) -> tuple[bool, str]: + """Merge a schema into the resources mapping. + + Args: + resources_mapping: The resources mapping dictionary to update + kind_lower: The lowercase kind name + group: The API group + version: The API version + schema_data: The schema data to merge + + Returns: + Tuple of (was_updated, action_description) where action is 'updated', 'added_version', or 'added_new' + """ + if kind_lower in resources_mapping: + existing_schemas = resources_mapping[kind_lower] + for i, existing_schema in enumerate(existing_schemas): + existing_gvk = existing_schema.get("x-kubernetes-group-version-kind", [{}])[0] + if existing_gvk.get("group") == group and existing_gvk.get("version") == version: + existing_schemas[i] = schema_data + return True, "updated" + existing_schemas.append(schema_data) + return True, "added_version" + else: + resources_mapping[kind_lower] = [schema_data] + return True, "added_new" + + def process_schema_definitions( schemas: dict[str, dict[str, Any]], namespacing_dict: dict[str, bool], @@ -579,51 +638,27 @@ def process_schema_definitions( else: schema_name = f"{version}/{kind}" - # Create schema data in the format expected by SchemaValidator - schema_data = { - "description": def_data.get("description", ""), - "properties": def_data.get("properties", {}), - "required": def_data.get("required", []), - "type": def_data.get("type", "object"), - "x-kubernetes-group-version-kind": [group_kind_version], - "namespaced": is_namespaced, - } + # Create schema data using helper function + schema_data = _build_schema_data(def_data, group_kind_version, is_namespaced) # Store in resources_mapping as an array (multiple schemas per kind) kind_lower = kind.lower() if kind_lower not in resources_mapping: - # NEW resource - always add - resources_mapping[kind_lower] = [schema_data] + # NEW resource - always add using merge helper + _merge_schema_into_mapping(resources_mapping, kind_lower, group, version, schema_data) new_resources += 1 LOGGER.debug(f"Added new resource: {kind_lower}") elif allow_updates: - # UPDATE existing resource - replace schema with same group/version or add new one - existing_schemas = resources_mapping[kind_lower] - updated = False - - for i, existing_schema in enumerate(existing_schemas): - existing_gvk = existing_schema.get("x-kubernetes-group-version-kind", [{}])[0] - new_gvk = group_kind_version - - # Check if this is the same group/version combination - if existing_gvk.get("group") == new_gvk.get("group") and existing_gvk.get("version") == new_gvk.get( - "version" - ): - # UPDATE: Replace existing schema with newer version - existing_schemas[i] = schema_data - updated = True - updated_resources += 1 - LOGGER.debug( - f"Updated existing resource schema: {kind_lower} ({new_gvk.get('group', 'core')}/{new_gvk.get('version')})" - ) - break - - if not updated: - # ADD: New group/version for existing resource kind - existing_schemas.append(schema_data) - updated_resources += 1 + # UPDATE existing resource using merge helper + _was_updated, action = _merge_schema_into_mapping( + resources_mapping, kind_lower, group, version, schema_data + ) + updated_resources += 1 + if action == "updated": + LOGGER.debug(f"Updated existing resource schema: {kind_lower} ({group or 'core'}/{version})") + else: LOGGER.debug( - f"Added new schema version for existing resource: {kind_lower} ({group_kind_version.get('group', 'core')}/{group_kind_version.get('version')})" + f"Added new schema version for existing resource: {kind_lower} ({group or 'core'}/{version})" ) else: # Don't update existing resources when cluster version is older @@ -1458,22 +1493,18 @@ def _determine_update_strategy(client: str, resources_mapping: dict[Any, Any]) - return UpdateStrategy(should_update=should_update, missing_resources=missing_resources, need_v3_index=need_v3_index) -def _fetch_api_index_if_needed(client: str, need_v3_index: bool) -> dict[str, Any]: - """Fetch OpenAPI v3 index if needed. +def _fetch_openapi_v3_index(client: str) -> dict[str, Any]: + """Fetch OpenAPI v3 index from the cluster. Args: client: The client binary path - need_v3_index: Whether to fetch the index Returns: - Dictionary of API paths or empty dict if not needed + Dictionary of API paths Raises: - RuntimeError: If fetching the index fails when needed + RuntimeError: If fetching the index fails """ - if not need_v3_index: - return {} - LOGGER.info("Fetching OpenAPI v3 index...") success, v3_data, _ = run_command(command=shlex.split(f"{client} get --raw /openapi/v3"), check=False) if not success: @@ -1487,6 +1518,25 @@ def _fetch_api_index_if_needed(client: str, need_v3_index: bool) -> dict[str, An return paths +def _fetch_api_index_if_needed(client: str, need_v3_index: bool) -> dict[str, Any]: + """Fetch OpenAPI v3 index if needed. + + Args: + client: The client binary path + need_v3_index: Whether to fetch the index + + Returns: + Dictionary of API paths or empty dict if not needed + + Raises: + RuntimeError: If fetching the index fails when needed + """ + if not need_v3_index: + return {} + + return _fetch_openapi_v3_index(client) + + def _fetch_schemas_based_on_strategy(client: str, strategy: UpdateStrategy, paths: dict[str, Any]) -> dict[str, Any]: """Fetch schemas based on the determined update strategy. @@ -1612,3 +1662,141 @@ def update_kind_schema(client: str | None = None) -> None: LOGGER.info( "No schema files updated - no schemas were fetched (either due to older cluster version with no missing resources, or transient fetch failures)" ) + + +def update_single_resource_schema(kind: str, client: str | None = None) -> None: + """Update schema for a single resource kind without affecting other resources. + + This function is useful when connected to an older cluster but needing to update + a specific CRD (e.g., a newly installed operator's resources). + + Args: + kind: The resource Kind to update (e.g., 'LlamaStackDistribution') + client: Path to kubectl/oc client binary. If None, will auto-detect. + + Raises: + ResourceNotFoundError: If the kind is not found on the cluster + RuntimeError: If fetching the schema fails + """ + if client is None: + client = get_client_binary() + + LOGGER.info(f"Updating schema for single resource: {kind}") + + # Get dynamic resource-to-API mapping to find the API path for this kind + resource_to_api_mapping = build_dynamic_resource_to_api_mapping(client=client) + + if kind not in resource_to_api_mapping: + raise ResourceNotFoundError( + kind=kind, + message=( + f"Resource kind '{kind}' not found on the cluster. " + f"Ensure the CRD is installed and the kind name is correct (case-sensitive)." + ), + ) + + api_paths_for_kind = resource_to_api_mapping[kind] + LOGGER.info(f"Found API paths for {kind}: {api_paths_for_kind}") + + # Fetch OpenAPI v3 index using shared helper function + paths = _fetch_openapi_v3_index(client) + + # Filter to only the API paths for this specific kind + filter_paths = set() + for api_path in api_paths_for_kind: + if api_path in paths: + filter_paths.add(api_path) + else: + LOGGER.warning(f"API path {api_path} not found in cluster paths") + + if not filter_paths: + raise RuntimeError( + f"None of the API paths for {kind} were found in the cluster. Expected paths: {api_paths_for_kind}" + ) + + LOGGER.info(f"Fetching schemas from {len(filter_paths)} API path(s): {filter_paths}") + + # Fetch only the relevant API schemas + schemas = fetch_all_api_schemas(client=client, paths=paths, filter_paths=filter_paths) + + if not schemas: + raise RuntimeError( + f"Failed to fetch schema for {kind} from API paths {filter_paths}. " + f"The API server returned no schema data. Check cluster connectivity and permissions." + ) + + # Build namespacing dictionary by reusing the existing function and filtering to just this resource + full_namespacing_dict = build_namespacing_dict(client=client) + if kind in full_namespacing_dict: + namespacing_dict = {kind: full_namespacing_dict[kind]} + else: + LOGGER.warning(f"Could not determine if {kind} is namespaced, defaulting to True") + namespacing_dict = {kind: True} + + LOGGER.info(f"Resource {kind} is namespaced: {namespacing_dict[kind]}") + + # Load existing resources mapping + resources_mapping = read_resources_mapping_file(skip_cache=True) + existing_count = len(resources_mapping) + LOGGER.info(f"Loaded {existing_count} existing resources from mapping") + + # Process schema definitions for this specific kind only + kind_lower = kind.lower() + updated = False + new_schemas_for_kind = [] + + for _, schema in schemas.items(): + for _def_name, def_data in schema.get("components", {}).get("schemas", {}).items(): + gvk_list = def_data.get("x-kubernetes-group-version-kind", []) + if not gvk_list: + continue + + # Check if this definition is for our target kind + for gvk in gvk_list: + if gvk.get("kind") == kind: + group = gvk.get("group", "") + version = gvk.get("version", "") + + # Create schema data using helper function + is_namespaced = namespacing_dict.get(kind, True) + schema_data = _build_schema_data(def_data, gvk, is_namespaced) + + new_schemas_for_kind.append((group, version, schema_data)) + LOGGER.debug(f"Found schema for {kind} (group={group}, version={version})") + break + + if not new_schemas_for_kind: + raise RuntimeError( + f"No schema definition found for {kind} in the fetched API schemas. " + f"The resource exists on the cluster but its schema could not be extracted. " + f"This may indicate an issue with the CRD definition or API server configuration." + ) + + # Update or add the resource in the mapping using the merge helper + updated = False + for group, version, new_schema_data in new_schemas_for_kind: + _was_updated, action = _merge_schema_into_mapping( + resources_mapping, kind_lower, group, version, new_schema_data + ) + updated = True + if action == "updated": + LOGGER.info(f"Updated existing schema for {kind} ({group}/{version})") + elif action == "added_version": + LOGGER.info(f"Added new schema version for {kind} ({group}/{version})") + else: # added_new + LOGGER.info(f"Added new resource {kind} ({group}/{version})") + + if updated: + # Save the updated resources mapping + try: + save_json_archive(resources_mapping, RESOURCES_MAPPING_FILE) + LOGGER.info(f"Successfully updated schema for {kind}") + LOGGER.info(f"Total resources in mapping: {len(resources_mapping)}") + except (OSError, TypeError) as e: + raise RuntimeError(f"Failed to save resources mapping: {e}") from e + + # Clear cached mapping data in SchemaValidator to force reload + SchemaValidator.clear_cache() + SchemaValidator.load_mappings_data() + else: + LOGGER.info(f"No schema updates needed for {kind} - schema is already up to date") diff --git a/class_generator/exceptions.py b/class_generator/exceptions.py new file mode 100644 index 0000000000..5737f20438 --- /dev/null +++ b/class_generator/exceptions.py @@ -0,0 +1,11 @@ +"""Custom exceptions for class generator.""" + + +class ResourceNotFoundError(Exception): + """Raised when a resource kind is not found.""" + + def __init__(self, kind: str, message: str | None = None) -> None: + self.kind = kind + if message is None: + message = f"Resource kind '{kind}' not found" + super().__init__(message) diff --git a/class_generator/parsers/explain_parser.py b/class_generator/parsers/explain_parser.py index d580352834..bc153fef6c 100644 --- a/class_generator/parsers/explain_parser.py +++ b/class_generator/parsers/explain_parser.py @@ -6,6 +6,7 @@ from class_generator.constants import MISSING_DESCRIPTION_STR from class_generator.core.schema import extract_group_kind_version, read_resources_mapping_file +from class_generator.exceptions import ResourceNotFoundError from class_generator.parsers.type_parser import get_property_schema, prepare_property_dict from class_generator.utils import get_latest_version from ocp_resources.resource import Resource @@ -13,14 +14,6 @@ LOGGER = get_logger(name=__name__) -class ResourceNotFoundError(Exception): - """Raised when a resource kind is not found in the schema definition.""" - - def __init__(self, kind: str) -> None: - self.kind = kind - super().__init__(f"Resource kind '{kind}' not found in schema definition") - - def parse_explain(kind: str) -> list[dict[str, Any]]: """ Parse OpenAPI explain data for a given resource kind. diff --git a/class_generator/tests/test_cli.py b/class_generator/tests/test_cli.py index ba35a5dbcd..bdf66051d2 100644 --- a/class_generator/tests/test_cli.py +++ b/class_generator/tests/test_cli.py @@ -6,6 +6,7 @@ from click.testing import CliRunner from class_generator.cli import main +from class_generator.core.schema import ResourceNotFoundError class TestCLIFunctionality: @@ -253,3 +254,46 @@ def test_required_options(self): result = runner.invoke(cli=main, args=[], catch_exceptions=False) assert result.exit_code != 0 # Error is printed to stderr, not stdout + + def test_update_schema_for_single_resource(self): + """Test --update-schema-for functionality.""" + runner = CliRunner() + + with patch("class_generator.cli.update_single_resource_schema") as mock_update: + mock_update.return_value = None + + result = runner.invoke(cli=main, args=["--update-schema-for", "LlamaStackDistribution"]) + + assert result.exit_code == 0 + mock_update.assert_called_once_with(kind="LlamaStackDistribution") + + def test_update_schema_for_mutual_exclusivity_with_update_schema(self): + """Test --update-schema-for and --update-schema are mutually exclusive.""" + runner = CliRunner() + + result = runner.invoke(cli=main, args=["--update-schema", "--update-schema-for", "Pod"]) + + # Should fail due to mutual exclusivity + assert result.exit_code != 0 + assert "mutually exclusive" in result.output + + def test_update_schema_for_cannot_combine_with_kind(self): + """Test that --update-schema-for cannot be combined with -k/--kind.""" + runner = CliRunner() + + result = runner.invoke(cli=main, args=["--update-schema-for", "Pod", "-k", "Pod"]) + + # Should fail due to constraint + assert result.exit_code != 0 + + def test_update_schema_for_resource_not_found(self): + """Test --update-schema-for with a non-existent resource.""" + runner = CliRunner() + + with patch("class_generator.cli.update_single_resource_schema") as mock_update: + mock_update.side_effect = ResourceNotFoundError(kind="FakeResource") + + result = runner.invoke(cli=main, args=["--update-schema-for", "FakeResource"], catch_exceptions=False) + + # Should exit with error + assert result.exit_code != 0 diff --git a/class_generator/tests/test_schema_new_functions.py b/class_generator/tests/test_schema_new_functions.py index 8dc6d1e8e5..def784c142 100644 --- a/class_generator/tests/test_schema_new_functions.py +++ b/class_generator/tests/test_schema_new_functions.py @@ -9,6 +9,7 @@ from class_generator.constants import RESOURCES_MAPPING_FILE from class_generator.core.schema import ( ClusterVersionError, + ResourceNotFoundError, _convert_type_to_schema, _detect_missing_refs_from_schemas, _infer_oc_explain_path, @@ -23,6 +24,7 @@ identify_missing_resources, process_schema_definitions, read_resources_mapping_file, + update_single_resource_schema, write_schema_files, ) @@ -56,19 +58,19 @@ class TestReadResourcesMappingFile: """Test read_resources_mapping_file function.""" @patch("class_generator.core.schema.SchemaValidator") - def test_successful_mapping_load(self, mock_schema_validator): + def test_successful_mapping_load(self, _mock_schema_validator): """Test successful loading of resources mapping.""" expected_mapping = {"pod": [{"kind": "Pod"}], "service": [{"kind": "Service"}]} - mock_schema_validator.load_mappings_data.return_value = True - mock_schema_validator.get_mappings_data.return_value = expected_mapping + _mock_schema_validator.load_mappings_data.return_value = True + _mock_schema_validator.get_mappings_data.return_value = expected_mapping result = read_resources_mapping_file() assert result == expected_mapping @patch("class_generator.core.schema.SchemaValidator") - def test_mapping_load_failure_returns_empty_dict(self, mock_schema_validator): + def test_mapping_load_failure_returns_empty_dict(self, _mock_schema_validator): """Test that empty dict is returned when mapping load fails.""" - mock_schema_validator.load_mappings_data.return_value = False + _mock_schema_validator.load_mappings_data.return_value = False result = read_resources_mapping_file() assert result == {} @@ -1391,3 +1393,190 @@ def test_convert_type_to_schema(self, input_type, expected_schema): """Test conversion of various types to schema format.""" result = _convert_type_to_schema(input_type) assert result == expected_schema + + +class TestUpdateSingleResourceSchema: + """Test update_single_resource_schema function.""" + + @patch("class_generator.core.schema.SchemaValidator") + @patch("class_generator.core.schema.save_json_archive") + @patch("class_generator.core.schema.read_resources_mapping_file") + @patch("class_generator.core.schema.fetch_all_api_schemas") + @patch("class_generator.core.schema.build_namespacing_dict") + @patch("class_generator.core.schema.run_command") + @patch("class_generator.core.schema.build_dynamic_resource_to_api_mapping") + @patch("class_generator.core.schema.get_client_binary") + def test_update_single_resource_schema_new_resource( + self, + mock_get_client, + mock_build_mapping, + mock_run_command, + mock_build_namespacing, + mock_fetch_schemas, + mock_read_mapping, + mock_save_archive, + _mock_schema_validator, + ): + """Test adding a new resource that doesn't exist in mapping.""" + mock_get_client.return_value = "oc" + mock_build_mapping.return_value = {"CustomResource": ["apis/custom.io/v1"]} + mock_run_command.return_value = ( + True, + '{"paths": {"apis/custom.io/v1": {"serverRelativeURL": "/openapi/v3/apis/custom.io/v1"}}}', + "", + ) + mock_build_namespacing.return_value = {"CustomResource": True} + mock_fetch_schemas.return_value = { + "apis/custom.io/v1": { + "components": { + "schemas": { + "io.custom.v1.CustomResource": { + "type": "object", + "description": "Custom resource description", + "properties": {"spec": {"type": "object"}}, + "x-kubernetes-group-version-kind": [ + {"group": "custom.io", "version": "v1", "kind": "CustomResource"} + ], + } + } + } + } + } + mock_read_mapping.return_value = {"pod": [{"x-kubernetes-group-version-kind": [{"kind": "Pod"}]}]} + + update_single_resource_schema(kind="CustomResource") + + # Verify save was called with updated mapping containing the new resource + mock_save_archive.assert_called_once() + saved_mapping = mock_save_archive.call_args[0][0] + assert "customresource" in saved_mapping + assert len(saved_mapping["customresource"]) == 1 + assert saved_mapping["customresource"][0]["namespaced"] is True + + @patch("class_generator.core.schema.get_client_binary") + @patch("class_generator.core.schema.build_dynamic_resource_to_api_mapping") + def test_update_single_resource_schema_kind_not_found(self, mock_build_mapping, mock_get_client): + """Test that ResourceNotFoundError is raised when kind is not on cluster.""" + mock_get_client.return_value = "oc" + mock_build_mapping.return_value = {"Pod": ["api/v1"], "Deployment": ["apis/apps/v1"]} + + with pytest.raises(ResourceNotFoundError) as exc_info: + update_single_resource_schema(kind="NonExistentResource") + + assert "NonExistentResource" in str(exc_info.value) + assert "not found on the cluster" in str(exc_info.value) + + @patch("class_generator.core.schema.SchemaValidator") + @patch("class_generator.core.schema.save_json_archive") + @patch("class_generator.core.schema.read_resources_mapping_file") + @patch("class_generator.core.schema.fetch_all_api_schemas") + @patch("class_generator.core.schema.build_namespacing_dict") + @patch("class_generator.core.schema.run_command") + @patch("class_generator.core.schema.build_dynamic_resource_to_api_mapping") + @patch("class_generator.core.schema.get_client_binary") + def test_update_single_resource_schema_updates_existing( + self, + mock_get_client, + mock_build_mapping, + mock_run_command, + mock_build_namespacing, + mock_fetch_schemas, + mock_read_mapping, + mock_save_archive, + _mock_schema_validator, + ): + """Test updating an existing resource schema.""" + mock_get_client.return_value = "oc" + mock_build_mapping.return_value = {"Pod": ["api/v1"]} + mock_run_command.return_value = ( + True, + '{"paths": {"api/v1": {"serverRelativeURL": "/openapi/v3/api/v1"}}}', + "", + ) + mock_build_namespacing.return_value = {"Pod": True} + mock_fetch_schemas.return_value = { + "api/v1": { + "components": { + "schemas": { + "io.k8s.api.core.v1.Pod": { + "type": "object", + "description": "Updated Pod description", + "properties": {"spec": {"type": "object"}, "newField": {"type": "string"}}, + "x-kubernetes-group-version-kind": [{"group": "", "version": "v1", "kind": "Pod"}], + } + } + } + } + } + # Existing mapping with old Pod schema + mock_read_mapping.return_value = { + "pod": [ + { + "description": "Old Pod description", + "properties": {"spec": {"type": "object"}}, + "x-kubernetes-group-version-kind": [{"group": "", "version": "v1", "kind": "Pod"}], + } + ] + } + + update_single_resource_schema(kind="Pod") + + # Verify save was called + mock_save_archive.assert_called_once() + saved_mapping = mock_save_archive.call_args[0][0] + assert "pod" in saved_mapping + # Verify the schema was updated + assert saved_mapping["pod"][0]["description"] == "Updated Pod description" + assert "newField" in saved_mapping["pod"][0]["properties"] + + @patch("class_generator.core.schema.run_command") + @patch("class_generator.core.schema.build_dynamic_resource_to_api_mapping") + @patch("class_generator.core.schema.get_client_binary") + def test_update_single_resource_schema_v3_index_fetch_fails( + self, mock_get_client, mock_build_mapping, mock_run_command + ): + """Test that RuntimeError is raised when OpenAPI v3 index fetch fails.""" + mock_get_client.return_value = "oc" + mock_build_mapping.return_value = {"Pod": ["api/v1"]} + mock_run_command.return_value = (False, "", "Connection refused") + + with pytest.raises(RuntimeError) as exc_info: + update_single_resource_schema(kind="Pod") + + assert "Failed to fetch OpenAPI v3 index" in str(exc_info.value) + + @patch("class_generator.core.schema.fetch_all_api_schemas") + @patch("class_generator.core.schema.build_namespacing_dict") + @patch("class_generator.core.schema.run_command") + @patch("class_generator.core.schema.build_dynamic_resource_to_api_mapping") + @patch("class_generator.core.schema.get_client_binary") + def test_update_single_resource_schema_no_schema_definition( + self, mock_get_client, mock_build_mapping, mock_run_command, mock_build_namespacing, mock_fetch_schemas + ): + """Test that RuntimeError is raised when no schema definition found for kind.""" + mock_get_client.return_value = "oc" + mock_build_mapping.return_value = {"Pod": ["api/v1"]} + mock_run_command.return_value = ( + True, + '{"paths": {"api/v1": {"serverRelativeURL": "/openapi/v3/api/v1"}}}', + "", + ) + mock_build_namespacing.return_value = {"Pod": True} + # Return schema without Pod definition + mock_fetch_schemas.return_value = { + "api/v1": { + "components": { + "schemas": { + "io.k8s.api.core.v1.ConfigMap": { + "type": "object", + "x-kubernetes-group-version-kind": [{"group": "", "version": "v1", "kind": "ConfigMap"}], + } + } + } + } + } + + with pytest.raises(RuntimeError) as exc_info: + update_single_resource_schema(kind="Pod") + + assert "No schema definition found for Pod" in str(exc_info.value)