From c0b0fa77f2c78af847c7d1aeab26c6d25d676365 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Tue, 3 Mar 2026 21:22:54 +0530 Subject: [PATCH 1/5] Tool to reconcile property:value within nodes that are supercedeBy new nodes --- tools/statvar_importer/mcf_file_util.py | 2 + .../schema/schema_reconciler.py | 330 ++++++++++++++++++ .../schema/schema_reconciler_test.py | 131 +++++++ .../schema/schema_resolver.py | 13 +- tools/statvar_importer/stat_var_processor.py | 10 + util/dc_api_wrapper.py | 2 +- 6 files changed, 475 insertions(+), 13 deletions(-) create mode 100644 tools/statvar_importer/schema/schema_reconciler.py create mode 100644 tools/statvar_importer/schema/schema_reconciler_test.py diff --git a/tools/statvar_importer/mcf_file_util.py b/tools/statvar_importer/mcf_file_util.py index 2a78fde9db..93b5a8e6ce 100644 --- a/tools/statvar_importer/mcf_file_util.py +++ b/tools/statvar_importer/mcf_file_util.py @@ -742,6 +742,8 @@ def get_value_list(value: str) -> list: return value value_list = [] # Read the string as a comma separated line. + if not isinstance(value, str): + value = str(value) is_quoted = '"' in value try: if is_quoted and "," in value: diff --git a/tools/statvar_importer/schema/schema_reconciler.py b/tools/statvar_importer/schema/schema_reconciler.py new file mode 100644 index 0000000000..99166e0a32 --- /dev/null +++ b/tools/statvar_importer/schema/schema_reconciler.py @@ -0,0 +1,330 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +"""Class to reconcile schema nodes.""" + +import os +import sys +from typing import Union + +from absl import app +from absl import flags +from absl import logging + +# uncomment to run pprof +# os.environ['PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION'] = 'python' +# from pypprof.net_http import start_pprof_server + +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +_DATA_DIR = os.path.join(_SCRIPT_DIR.split('/data/')[0], 'data') +sys.path.append(_SCRIPT_DIR) +sys.path.append(os.path.dirname(_SCRIPT_DIR)) +sys.path.append(_DATA_DIR) +sys.path.append(os.path.join(_DATA_DIR, 'util')) + +import process_http_server + +from mcf_file_util import load_mcf_nodes, write_mcf_nodes +from mcf_file_util import add_namespace, strip_namespace +from mcf_file_util import add_mcf_node, get_value_list + +# imports from data/util +import file_util +import dc_api_wrapper as dc_api +from config_map import ConfigMap +from counters import Counters + + +_FLAGS = flags.FLAGS + +flags.DEFINE_list('recon_schema_mcf', [], + 'List of schema MCF files to load for reconciliation.') +flags.DEFINE_list('recon_input', [], + 'List of MCF files to load for reconciliation.') +flags.DEFINE_string( + 'recon_output', + '', + 'Output MCF for reconciled nodes.', +) +flags.DEFINE_list('recon_property', [], + 'List of properties to be looked up for reconciliation.') +flags.DEFINE_bool('recon_keep_legacy_svobs', True, + 'Keep the legacy value when reconciling nodes.') + + +def get_default_recon_config() -> dict: + """Returns dictionary of default config for reconciliation.""" + if not _FLAGS.is_parsed(): + _FLAGS.mark_as_parsed() + + return { + 'recon_property': _FLAGS.recon_property, + 'recon_keep_legacy_svobs': _FLAGS.recon_keep_legacy_svobs, + } + + +class SchemaReconciler: + """Class to reconcile nodes with schema. 
+ Supports the following reconciliation: + - If a property or value node has a supercededBy property, + update the value to use the superceded node. + + Usage: + # Create a SchemaReconciler loaded with MCF schema nodes. + resolver = SchemaReconciler(mcf_files) + + # Reconcile a list of nodes with property:values. + input_nodes = { : { : }, ...} + output_nodes = recon.reconcile_nodes(input_nodes) + # if value1 is remapped to valueNew with supercededBy in schema, + # output nodes = { : { : }, ...} + """ + + def __init__(self, + schema_mcf_files: list = '', + config: dict = {}, + counters: Counters = None): + self._counters = counters + if self._counters is None: + self._counters = Counters() + self._config = ConfigMap(config_dict=get_default_recon_config()) + if config: + self._config.add_configs(config) + + self._schema_nodes = {} + self.load_schema_mcf(schema_mcf_files) + + def load_schema_mcf(self, schema_mcf_files: list): + """Load nodes from schema MCF files and add to the index.""" + mcf_nodes = load_mcf_nodes(schema_mcf_files, {}) + self.load_schema_nodes(mcf_nodes) + logging.info( + f'Loaded {len(mcf_nodes)} schema MCF nodes: {schema_mcf_files}') + + def load_schema_nodes(self, schema_nodes: dict): + """Load nodes into schema used for reconciliation.""" + if not schema_nodes: + return + for node in schema_nodes.values(): + add_mcf_node(node, self._schema_nodes) + self._counters.add_counter('recon_schema_nodes', 1) + + def reconcile_nodes(self, + nodes: dict, + keep_legacy_obs: bool = None, + remapped_dcids: dict = None) -> int: + """Return the reconciled nodes. + Any values in the input nodes that are supercededBy new nodes are updated + to the new node. If config{'recon_keep_legacy_svobs'} is set the old value is + also retained. + + In case the node is a StatVarObservation with single value per property, + the node is replicated with the new property:value. 
+ + If config('recon_lookup_api') is set, the DC API is used to fetch schema + for nodes referenced in the input but are not in the schema preloaded. + + Args: + nodes: dictionary of nodes as dict of property:values. + keep_legacy_obs: if True, preserves the existing StatVarObservation nodes + and add a new node with modified property:values. + remapped_dcids: dictionary of dcids to be remapped with updated dcids. + if not set, looks up schema for supercededBy property for each unique dcid. + + Returns: + The number of nodes remapped. + """ + num_remapped = 0 + if keep_legacy_obs is None: + keep_legacy_obs = self._config.get('recon_keep_legacy_svobs', True) + logging.info(f'Looking up {len(nodes)} nodes for reconciliation.') + if not remapped_dcids: + remapped_dcids = self.lookup_remapped_schema(nodes) + if not remapped_dcids: + # No dcids to be remapped. Return the original nodes. + logging.info(f'No remapped dcids in {len(nodes)} nodes') + self._counters.add_counter('recon_unmodified_nodes', len(nodes)) + return num_remapped + + logging.info( + f'Got {len(remapped_dcids)} remapped dcids for {len(nodes)} nodes') + + keys = list(nodes.keys()) + for key in keys: + node = nodes.get(key) + if not node: + continue + # Get all remapped property:values for the node from remapped_dcids, + new_node = {} + new_pvs = {} + for prop, value in node.items(): + remapped_prop = remapped_dcids.get(add_namespace(prop), prop) + values = get_value_list(value) + remapped_values = [] + for val in values: + remapped_values.append( + remapped_dcids.get(add_namespace(val), val)) + if remapped_prop != prop or remapped_values != values: + # Property:value is modified. Add the new property:value. + new_pvs[strip_namespace(remapped_prop)] = ",".join( + remapped_values) + else: + # No modifiction to property:value. Copy it over. 
+ new_node[prop] = value + if not new_pvs: + self._counters.add_counter('recon_unmodified_nodes', 1) + continue + + # Get the new node with all existing unmodified prop:value + # and new modified prop:value. + new_node.update(new_pvs) + + # Node has remapped property:values. + # Update existing node for non-StatVar observations. + # Duplicate node for StatVar observations of keep is set. + typeof = strip_namespace(node.get('typeOf', '')) + if typeof == 'StatVarObservation' and keep_legacy_obs: + # Create a new duplicate node with a new dcid. + new_key = f'{key}-1' + new_node['Node'] = new_key + if 'dcid' in new_node: + new_node['dcid'] = new_key + nodes[new_key] = new_node + + self._counters.add_counter('recon_new_nodes', 1) + else: + # Update the existing node in place. + node.clear() + node.update(new_node) + self._counters.add_counter('recon_updated_nodes', 1) + num_remapped += 1 + + logging.info(f'Remapped {num_remapped} nodes out of {len(nodes)}') + return num_remapped + + def lookup_remapped_schema(self, + nodes: dict, + schema_nodes: dict = None) -> dict: + """Lookup new property or value in the nodes and add to the cached schema. + + Args: + nodes: dictionary of nodes as dict of property:values. + + Returns: + dictionary of dcids that have to be remapped with updated dcid. + """ + if schema_nodes is None: + schema_nodes = self._schema_nodes + + # dictionary of dcid mapped to the remapped new dcid. + remapped_dcids = {} + recon_props = self._config.get('recon_property', []) + + # Get a list of new dcids to be looked up + lookup_dcids = set() + for dcid, pvs in nodes.items(): + for prop, value in pvs.items(): + if recon_props and prop not in recon_props: + continue + if not prop or prop.startswith('#') or not prop[0].islower(): + # Ignore invalid properties. 
+ continue + if prop not in self._schema_nodes and add_namespace( + prop) not in self._schema_nodes: + lookup_dcids.add(prop) + else: + remapped_prop = self.get_remapped_dcid(prop) + if remapped_prop and remapped_prop != prop: + remapped_dcids[add_namespace(prop)] = remapped_prop + values = get_value_list(value) + for val in values: + if val.startswith('#') or val.startswith('"') or ' ' in val: + continue + if val not in schema_nodes and add_namespace( + val) not in schema_nodes: + lookup_dcids.add(val) + else: + remapped_val = self.get_remapped_dcid(val) + if remapped_val and remapped_val != val: + remapped_dcids[add_namespace(val)] = remapped_val + + if lookup_dcids and not self._config.get('recon_lookup_api', True): + logging.warning( + f'SchemaRecon ignoring {len(lookup_dcids)} new dcids not in schema.' + ) + return remapped_dcids + + new_schema_nodes = dc_api.dc_api_get_node_property( + list(lookup_dcids), 'supercededBy') + self._counters.add_counter('recon_lookup_schema', len(lookup_dcids)) + for dcid, node in new_schema_nodes.items(): + add_mcf_node(node, schema_nodes) + self._counters.add_counter('recon_lookup_schema_response', 1) + self._counters.add_counter('recon_schema_nodes', 1) + + remapped_dcid = self.get_remapped_dcid(dcid) + if remapped_dcid: + remapped_dcids[add_namespace(dcid)] = remapped_dcid + + return remapped_dcids + + def get_remapped_dcid(self, dcid: str) -> str: + """Returns the remapped dcid from the schema for the given node. + + Args: + dcid: dcid to be lookedup. + + Returns: + new dcid that superceded the input dcid. 
+ """ + node = self._schema_nodes.get(add_namespace(dcid)) + if not node: + return None + return node.get('supercededBy', '') + + +def schema_reconcile_nodes(nodes: dict, + config: dict = None, + schema_nodes: dict = None, + schema_files: list = None, + counters: Counters = None) -> bool: + """Reconcile a set of nodes.""" + recon = SchemaReconciler(schema_files, config, counters) + recon.load_schema_nodes(schema_nodes) + return recon.reconcile_nodes(nodes) + + +def main(_): + # Launch a web server if --http_port is set. + if process_http_server.run_http_server(script=__file__, module=__name__): + return + + # if _FLAGS.debug: + # logging.set_verbosity(2) + + counters = Counters() + nodes = load_mcf_nodes(_FLAGS.recon_input) + is_remapped = schema_reconcile_nodes( + nodes, + get_default_recon_config(), + {}, + _FLAGS.recon_schema_mcf, + counters + ) + if is_remapped and _FLAGS.recon_output: + write_mcf_nodes(nodes, _FLAGS.recon_output) + counters.print_counters() + + +if __name__ == '__main__': + app.run(main) diff --git a/tools/statvar_importer/schema/schema_reconciler_test.py b/tools/statvar_importer/schema/schema_reconciler_test.py new file mode 100644 index 0000000000..1a68a25238 --- /dev/null +++ b/tools/statvar_importer/schema/schema_reconciler_test.py @@ -0,0 +1,131 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Unit tests for schema_reconciler.py.""" + +import sys +import os +import unittest +from unittest import mock + +# Add the directory containing schema_reconciler.py to sys.path +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(_SCRIPT_DIR) +sys.path.append(os.path.join(_SCRIPT_DIR, '../../util')) # For dc_api_wrapper if needed by schema_reconciler + +# Mock dc_api_wrapper before importing schema_reconciler to avoid dependency issues +sys.modules['dc_api_wrapper'] = mock.Mock() + +import schema_reconciler + +class SchemaReconcilerTest(unittest.TestCase): + + def setUp(self): + self.schema_nodes = { + 'dcid:OldVal': { + 'Node': 'dcid:OldVal', + 'typeOf': 'dcs:Class', + 'supercededBy': 'dcid:NewVal' + }, + 'dcid:oldProp': { + 'Node': 'dcid:oldProp', + 'typeOf': 'dcs:Property', + 'supercededBy': 'dcid:newProp' + }, + 'dcid:NewVal': { + 'Node': 'dcid:NewVal', + 'typeOf': 'dcs:Class' + }, + 'dcid:newProp': { + 'Node': 'dcid:newProp', + 'typeOf': 'dcs:Property' + }, + 'prop1': {'Node': 'prop1'}, + 'value1': {'Node': 'value1'}, + 'typeOf': {'Node': 'typeOf'}, + 'dcs:StatVarObservation': {'Node': 'dcs:StatVarObservation'}, + } + self.reconciler = schema_reconciler.SchemaReconciler(config={'recon_lookup_api': False}) + self.reconciler.load_schema_nodes(self.schema_nodes) + + # Configure mock to return empty dict if called + schema_reconciler.dc_api.dc_api_get_node_property.return_value = {} + + def test_reconcile_simple(self): + # Test value reconciliation + nodes = { + 'node1': { + 'prop1': 'dcid:OldVal' + } + } + num_remapped = self.reconciler.reconcile_nodes(nodes, keep_legacy_obs=False) + self.assertEqual(num_remapped, 1) + self.assertEqual(nodes['node1']['prop1'], 'dcid:NewVal') + + def test_reconcile_property(self): + # Test property reconciliation + nodes = { + 'node1': { + 'oldProp': 'value1' + } + } + num_remapped = self.reconciler.reconcile_nodes(nodes, keep_legacy_obs=False) + self.assertEqual(num_remapped, 1) + self.assertIn('newProp', 
nodes['node1']) + self.assertEqual(nodes['node1']['newProp'], 'value1') + self.assertNotIn('oldProp', nodes['node1']) + + def test_keep_legacy_svobs(self): + nodes = { + 'obs1': { + 'typeOf': 'StatVarObservation', + 'prop1': 'dcid:OldVal' + } + } + num_remapped = self.reconciler.reconcile_nodes(nodes, keep_legacy_obs=True) + + # Should have 2 nodes now: original and new + self.assertEqual(len(nodes), 2) + self.assertEqual(num_remapped, 1) + + # Original should be unchanged + self.assertEqual(nodes['obs1']['prop1'], 'dcid:OldVal') + + # New node should have new value + self.assertIn('1', nodes) + self.assertEqual(nodes['1']['prop1'], 'dcid:NewVal') + + def test_no_change(self): + nodes = { + 'node1': { + 'prop1': 'dcid:NewVal' + } + } + num_remapped = self.reconciler.reconcile_nodes(nodes) + self.assertEqual(num_remapped, 0) + self.assertEqual(nodes['node1']['prop1'], 'dcid:NewVal') + + def test_list_values(self): + nodes = { + 'node1': { + 'prop1': 'dcid:OldVal, dcid:NewVal' + } + } + num_remapped = self.reconciler.reconcile_nodes(nodes, keep_legacy_obs=False) + self.assertEqual(num_remapped, 1) + # Should be "dcid:NewVal, dcid:NewVal" -> duplicate values might be kept or joined + # Code: ",".join(remapped_values) + self.assertEqual(nodes['node1']['prop1'], 'dcid:NewVal,dcid:NewVal') + +if __name__ == '__main__': + unittest.main() diff --git a/tools/statvar_importer/schema/schema_resolver.py b/tools/statvar_importer/schema/schema_resolver.py index d363b82c2d..a0e0dc87c2 100644 --- a/tools/statvar_importer/schema/schema_resolver.py +++ b/tools/statvar_importer/schema/schema_resolver.py @@ -36,7 +36,7 @@ import process_http_server -from mcf_file_util import load_mcf_nodes, write_mcf_nodes +from mcf_file_util import load_mcf_nodes, write_mcf_nodes, get_node_dcid from mcf_file_util import add_namespace, strip_namespace, normalize_mcf_node from mcf_diff import fingerprint_node @@ -242,17 +242,6 @@ def _get_keys(self, node: dict, normalize_node: bool = True) -> list: 
return keys -def get_node_dcid(node: dict) -> str: - """Returns the dcid without the namespace prefix for the node.""" - if not node: - return '' - dcid = node.get('dcid', node.get('Node', '')).strip() - if dcid and dcid[0] == '"': - # Strip quotes around dcid - dcid = dcid.strip('"') - return strip_namespace(dcid) - - def resolve_nodes( schema_mcf: list, input_mcf: list, diff --git a/tools/statvar_importer/stat_var_processor.py b/tools/statvar_importer/stat_var_processor.py index 31a0a6d139..fd12544abe 100644 --- a/tools/statvar_importer/stat_var_processor.py +++ b/tools/statvar_importer/stat_var_processor.py @@ -88,6 +88,7 @@ from json_to_csv import file_json_to_csv from schema_generator import generate_schema_nodes, generate_statvar_name from schema_checker import sanity_check_nodes +from schema_reconciler import SchemaReconciler # imports from ../../util from config_map import ConfigMap, read_py_dict_from_file @@ -1229,6 +1230,13 @@ def filter_svobs(self): """Filter SVObs to remove outliers.""" filter_data_svobs(self._statvar_obs_map, self._config, self._counters) + def reconcile_nodes(self): + """Reconcile statvar and obs to new property:values or statvars.""" + recon = SchemaReconciler(config=self._config.get_configs(), + counters=self._counters) + recon.reconcile_nodes(self._statvars_map) + recon.reconcile_nodes(self._statvar_obs_map) + def write_statvar_obs_csv( self, output_csv: str, @@ -2789,6 +2797,8 @@ def write_outputs(self, output_path: str): logging.info(f'Generating output: {output_path}') self._counters.set_prefix('2:prepare_output_') self._statvars_map.drop_invalid_statvars() + if self._config.get('reconcile_nodes', True): + self._statvars_map.reconcile_nodes() if self._config.get('generate_statvar_mcf', True): self._counters.set_prefix('3:write_statvar_mcf_') statvar_mcf_file = self._config.get('output_statvar_mcf', diff --git a/util/dc_api_wrapper.py b/util/dc_api_wrapper.py index f1a1fee247..cf2f25cb9b 100644 --- a/util/dc_api_wrapper.py +++ 
b/util/dc_api_wrapper.py @@ -329,7 +329,7 @@ def dc_api_get_node_property(dcids: list, dc_api_batched_wrapper and dc_api_wrapper for details. Returns: - dictionary with each input dcid mapped to a True/False value. + dictionary with each input dcid mapped to a property:value """ config = _validate_v2_config(config) if isinstance(prop, list): From 6e7329f915fc565464925d9241cce60833e3afe7 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Sat, 21 Mar 2026 02:17:49 +0530 Subject: [PATCH 2/5] fix comments --- .../schema/schema_reconciler.py | 172 +++++++++++++----- .../schema/schema_reconciler_test.py | 67 +++---- 2 files changed, 159 insertions(+), 80 deletions(-) diff --git a/tools/statvar_importer/schema/schema_reconciler.py b/tools/statvar_importer/schema/schema_reconciler.py index 99166e0a32..efcf0faf62 100644 --- a/tools/statvar_importer/schema/schema_reconciler.py +++ b/tools/statvar_importer/schema/schema_reconciler.py @@ -11,7 +11,37 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""Class to reconcile schema nodes.""" +"""Class to reconcile schema nodes. + +Given a dictionary of MCF nodes, such as StatVarObservations or schema, +any property or value that refers to a node that has a 'supercededBy' property +is replaced by the supercededBy node. + +When reconciling schema, a set of equivalent or duplicate StatVars can be +mapped to a canonical node through the supercededBy property. +The import pipelines will then start generating the nodes with the new dcid. + +When reconciling nodes into a canonical node, the old nodes can be optionally +preserved so that any users who have hardcoded the old dcid can continue to +retrieve the node. The reconciliation invokes the DC API to retrieve the +supercededBy property for all properties and values. +To reconcile faster, provide a schema MCF file with the supercededBy values.
+ +The reconciliation is automatically invoked on the output as part of the +StatVarProcessor. + +To reconcile any csv or mcf file through command line, run the following: + python schema_reconciler.py --recon_input= \ + --recon_output= \ + +To reconcile a specific set of nodes configured with supercededBy, add the +schema MCF with the following commandline option: + --recon_schema_mcf= + +To reconcile values of a specific set of properties, such as +variableMeasured and measurementMethod, set the following command line option: + --recon_property=variableMeasured,measurementMethod +""" import os import sys @@ -32,6 +62,7 @@ sys.path.append(_DATA_DIR) sys.path.append(os.path.join(_DATA_DIR, 'util')) +# imports from data/tools/statvar_importer import process_http_server from mcf_file_util import load_mcf_nodes, write_mcf_nodes @@ -44,7 +75,6 @@ from config_map import ConfigMap from counters import Counters - _FLAGS = flags.FLAGS flags.DEFINE_list('recon_schema_mcf', [], @@ -76,8 +106,20 @@ def get_default_recon_config() -> dict: class SchemaReconciler: """Class to reconcile nodes with schema. Supports the following reconciliation: - - If a property or value node has a supercededBy property, - update the value to use the superceded node. + - If a schema definition for a property or value has a supercededBy + property, update the value to use the superceded node. + - To restrict schema lookup through DC APIs for reconciliation to specific + properties, set the recon_property config. + + It supports the following config options: + - recon_keep_legacy_svobs: retain old StatVarObservation nodes + if any value in the svobs is reconciled to a new dcid. + - recon_lookup_api: Enable or disable API lookup for schema definition + for any property or value. + If disabled, use the schema_mcf option to specify a list of MCF files + containing nodes with the supercededBy property. + - recon_property: Restrict reconciliation to values of these listed + properties. 
Usage: # Create a SchemaReconciler loaded with MCF schema nodes. @@ -94,6 +136,13 @@ def __init__(self, schema_mcf_files: list = '', config: dict = {}, counters: Counters = None): + """Initializes the SchemaReconciler with schema files and configuration. + + Args: + schema_mcf_files: List of MCF files containing schema definitions. + config: Configuration dictionary for reconciliation. + counters: Counters object for tracking metrics. + """ self._counters = counters if self._counters is None: self._counters = Counters() @@ -107,17 +156,17 @@ def __init__(self, def load_schema_mcf(self, schema_mcf_files: list): """Load nodes from schema MCF files and add to the index.""" mcf_nodes = load_mcf_nodes(schema_mcf_files, {}) - self.load_schema_nodes(mcf_nodes) + self.add_schema_nodes(mcf_nodes) logging.info( f'Loaded {len(mcf_nodes)} schema MCF nodes: {schema_mcf_files}') - def load_schema_nodes(self, schema_nodes: dict): + def add_schema_nodes(self, schema_nodes: dict): """Load nodes into schema used for reconciliation.""" if not schema_nodes: return for node in schema_nodes.values(): add_mcf_node(node, self._schema_nodes) - self._counters.add_counter('recon_schema_nodes', 1) + self._counters.add_counter('recon-schema-nodes', 1) def reconcile_nodes(self, nodes: dict, @@ -125,12 +174,14 @@ def reconcile_nodes(self, remapped_dcids: dict = None) -> int: """Return the reconciled nodes. Any values in the input nodes that are supercededBy new nodes are updated - to the new node. If config{'recon_keep_legacy_svobs'} is set the old value is - also retained. + to the new node. In case the node is a StatVarObservation with single value per property, the node is replicated with the new property:value. + If config{'recon_keep_legacy_svobs'} is set the old svobs node is + also retained. + If config('recon_lookup_api') is set, the DC API is used to fetch schema for nodes referenced in the input but are not in the schema preloaded. 
@@ -140,6 +191,8 @@ def reconcile_nodes(self, and add a new node with modified property:values. remapped_dcids: dictionary of dcids to be remapped with updated dcids. if not set, looks up schema for supercededBy property for each unique dcid. + remapped_dcids are assumed to have 'dcid:' prefix in the key. + { 'dcid:OldId': 'dcid:NewId' ... } Returns: The number of nodes remapped. @@ -148,23 +201,28 @@ def reconcile_nodes(self, if keep_legacy_obs is None: keep_legacy_obs = self._config.get('recon_keep_legacy_svobs', True) logging.info(f'Looking up {len(nodes)} nodes for reconciliation.') + + # Identify all DCIDs in input nodes that need to be remapped based on schema. if not remapped_dcids: remapped_dcids = self.lookup_remapped_schema(nodes) if not remapped_dcids: # No dcids to be remapped. Return the original nodes. logging.info(f'No remapped dcids in {len(nodes)} nodes') - self._counters.add_counter('recon_unmodified_nodes', len(nodes)) + self._counters.add_counter('recon-unmodified-nodes', len(nodes)) return num_remapped logging.info( f'Got {len(remapped_dcids)} remapped dcids for {len(nodes)} nodes') + # Iterate through each node and update properties or values that have been remapped. keys = list(nodes.keys()) for key in keys: node = nodes.get(key) if not node: continue - # Get all remapped property:values for the node from remapped_dcids, + + # Process all property-value pairs for the current node. + # Collect any modifications in new_pvs and keep original values in new_node. new_node = {} new_pvs = {} for prop, value in node.items(): @@ -174,39 +232,39 @@ def reconcile_nodes(self, for val in values: remapped_values.append( remapped_dcids.get(add_namespace(val), val)) + if remapped_prop != prop or remapped_values != values: # Property:value is modified. Add the new property:value. new_pvs[strip_namespace(remapped_prop)] = ",".join( remapped_values) else: - # No modifiction to property:value. Copy it over. + # No modification to property:value. Copy it over. 
new_node[prop] = value + if not new_pvs: - self._counters.add_counter('recon_unmodified_nodes', 1) + self._counters.add_counter('recon-unmodified-nodes', 1) continue - # Get the new node with all existing unmodified prop:value - # and new modified prop:value. + # Merge modified properties and values into the new node representation. new_node.update(new_pvs) - # Node has remapped property:values. - # Update existing node for non-StatVar observations. - # Duplicate node for StatVar observations of keep is set. + # Update the original nodes dictionary. + # For StatVarObservations, we can optionally keep the legacy observation. typeof = strip_namespace(node.get('typeOf', '')) if typeof == 'StatVarObservation' and keep_legacy_obs: - # Create a new duplicate node with a new dcid. + # Create a new duplicate node with a new dcid to preserve history. new_key = f'{key}-1' new_node['Node'] = new_key if 'dcid' in new_node: - new_node['dcid'] = new_key + new_node['dcid'] = new_key nodes[new_key] = new_node - self._counters.add_counter('recon_new_nodes', 1) + self._counters.add_counter('recon-new-nodes', 1) else: - # Update the existing node in place. + # Update the existing node in place if no legacy preservation is required. node.clear() node.update(new_node) - self._counters.add_counter('recon_updated_nodes', 1) + self._counters.add_counter('recon-updated-nodes', 1) num_remapped += 1 logging.info(f'Remapped {num_remapped} nodes out of {len(nodes)}') @@ -226,51 +284,62 @@ def lookup_remapped_schema(self, if schema_nodes is None: schema_nodes = self._schema_nodes - # dictionary of dcid mapped to the remapped new dcid. + # dictionary of dcid mapped to a new dcid. remapped_dcids = {} recon_props = self._config.get('recon_property', []) - # Get a list of new dcids to be looked up + # Get a list of new dcids to be looked up from both properties and values. 
lookup_dcids = set() for dcid, pvs in nodes.items(): for prop, value in pvs.items(): + # Check if reconciliation is restricted to specific properties. if recon_props and prop not in recon_props: continue if not prop or prop.startswith('#') or not prop[0].islower(): # Ignore invalid properties. continue - if prop not in self._schema_nodes and add_namespace( - prop) not in self._schema_nodes: + + # Check if the property itself has a replacement in the cached schema. + schema_node = self.get_schema_node(prop) + if not schema_node: lookup_dcids.add(prop) else: - remapped_prop = self.get_remapped_dcid(prop) + remapped_prop = self.get_remapped_dcid(prop, schema_node) if remapped_prop and remapped_prop != prop: remapped_dcids[add_namespace(prop)] = remapped_prop + + # Check if any of the values for this property have replacements in the cached schema. values = get_value_list(value) for val in values: if val.startswith('#') or val.startswith('"') or ' ' in val: + # ignore value that is a quoted string and not a + # reference to another node continue - if val not in schema_nodes and add_namespace( - val) not in schema_nodes: + schema_node = self.get_schema_node(val) + if not schema_node: lookup_dcids.add(val) else: - remapped_val = self.get_remapped_dcid(val) + remapped_val = self.get_remapped_dcid(val, schema_node) if remapped_val and remapped_val != val: remapped_dcids[add_namespace(val)] = remapped_val + # If some DCIDs are not found in the local schema cache, fetch them using the DC API. if lookup_dcids and not self._config.get('recon_lookup_api', True): + # DC API lookup is disabled. + # Use any remapped dcids collected from existing schema. logging.warning( f'SchemaRecon ignoring {len(lookup_dcids)} new dcids not in schema.' ) return remapped_dcids + # Batch lookup DC API for supercededBy for selected unique dcids. 
new_schema_nodes = dc_api.dc_api_get_node_property( list(lookup_dcids), 'supercededBy') - self._counters.add_counter('recon_lookup_schema', len(lookup_dcids)) + self._counters.add_counter('recon-lookup-schema', len(lookup_dcids)) for dcid, node in new_schema_nodes.items(): add_mcf_node(node, schema_nodes) - self._counters.add_counter('recon_lookup_schema_response', 1) - self._counters.add_counter('recon_schema_nodes', 1) + self._counters.add_counter('recon-lookup-schema-response', 1) + self._counters.add_counter('recon-schema-nodes', 1) remapped_dcid = self.get_remapped_dcid(dcid) if remapped_dcid: @@ -278,19 +347,32 @@ def lookup_remapped_schema(self, return remapped_dcids - def get_remapped_dcid(self, dcid: str) -> str: + def get_schema_node(self, dcid: str) -> dict: + """Returns a schema node with the given dcid.""" + node = self._schema_nodes.get(add_namespace(dcid)) + if not node: + # Try without namespace + node = self._schema_nodes.get(strip_namespace(dcid)) + return node + + def get_remapped_dcid(self, dcid: str, schema_node: dict = None) -> str: """Returns the remapped dcid from the schema for the given node. Args: - dcid: dcid to be lookedup. + dcid: dcid to be looked up for a remapped dcid. + schema_node: (optional) schema definition for the dcid with the + supercededBy property, if any. Returns: new dcid that superceded the input dcid. 
+ None if the dcid is not present in the schema nodes + '' string if the schema node is present, but has no supercededBy """ - node = self._schema_nodes.get(add_namespace(dcid)) - if not node: + if not schema_node: + schema_node = self.get_schema_node(dcid) + if not schema_node: return None - return node.get('supercededBy', '') + return schema_node.get('supercededBy', '') def schema_reconcile_nodes(nodes: dict, @@ -300,11 +382,12 @@ def schema_reconcile_nodes(nodes: dict, counters: Counters = None) -> bool: """Reconcile a set of nodes.""" recon = SchemaReconciler(schema_files, config, counters) - recon.load_schema_nodes(schema_nodes) + recon.add_schema_nodes(schema_nodes) return recon.reconcile_nodes(nodes) def main(_): + """Main entry point for reconciling MCF files from the command line.""" # Launch a web server if --http_port is set. if process_http_server.run_http_server(script=__file__, module=__name__): return @@ -314,13 +397,8 @@ def main(_): counters = Counters() nodes = load_mcf_nodes(_FLAGS.recon_input) - is_remapped = schema_reconcile_nodes( - nodes, - get_default_recon_config(), - {}, - _FLAGS.recon_schema_mcf, - counters - ) + is_remapped = schema_reconcile_nodes(nodes, get_default_recon_config(), {}, + _FLAGS.recon_schema_mcf, counters) if is_remapped and _FLAGS.recon_output: write_mcf_nodes(nodes, _FLAGS.recon_output) counters.print_counters() diff --git a/tools/statvar_importer/schema/schema_reconciler_test.py b/tools/statvar_importer/schema/schema_reconciler_test.py index 1a68a25238..a230c451e9 100644 --- a/tools/statvar_importer/schema/schema_reconciler_test.py +++ b/tools/statvar_importer/schema/schema_reconciler_test.py @@ -21,13 +21,16 @@ # Add the directory containing schema_reconciler.py to sys.path _SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.append(_SCRIPT_DIR) -sys.path.append(os.path.join(_SCRIPT_DIR, '../../util')) # For dc_api_wrapper if needed by schema_reconciler +sys.path.append(os.path.join( + _SCRIPT_DIR, + 
'../../util')) # For dc_api_wrapper if needed by schema_reconciler # Mock dc_api_wrapper before importing schema_reconciler to avoid dependency issues sys.modules['dc_api_wrapper'] = mock.Mock() import schema_reconciler + class SchemaReconcilerTest(unittest.TestCase): def setUp(self): @@ -50,36 +53,39 @@ def setUp(self): 'Node': 'dcid:newProp', 'typeOf': 'dcs:Property' }, - 'prop1': {'Node': 'prop1'}, - 'value1': {'Node': 'value1'}, - 'typeOf': {'Node': 'typeOf'}, - 'dcs:StatVarObservation': {'Node': 'dcs:StatVarObservation'}, + 'prop1': { + 'Node': 'prop1' + }, + 'value1': { + 'Node': 'value1' + }, + 'typeOf': { + 'Node': 'typeOf' + }, + 'dcs:StatVarObservation': { + 'Node': 'dcs:StatVarObservation' + }, } - self.reconciler = schema_reconciler.SchemaReconciler(config={'recon_lookup_api': False}) - self.reconciler.load_schema_nodes(self.schema_nodes) + self.reconciler = schema_reconciler.SchemaReconciler( + config={'recon_lookup_api': False}) + self.reconciler.add_schema_nodes(self.schema_nodes) # Configure mock to return empty dict if called schema_reconciler.dc_api.dc_api_get_node_property.return_value = {} def test_reconcile_simple(self): # Test value reconciliation - nodes = { - 'node1': { - 'prop1': 'dcid:OldVal' - } - } - num_remapped = self.reconciler.reconcile_nodes(nodes, keep_legacy_obs=False) + nodes = {'node1': {'prop1': 'dcid:OldVal'}} + num_remapped = self.reconciler.reconcile_nodes(nodes, + keep_legacy_obs=False) self.assertEqual(num_remapped, 1) self.assertEqual(nodes['node1']['prop1'], 'dcid:NewVal') def test_reconcile_property(self): # Test property reconciliation - nodes = { - 'node1': { - 'oldProp': 'value1' - } - } - num_remapped = self.reconciler.reconcile_nodes(nodes, keep_legacy_obs=False) + nodes = {'node1': {'oldProp': 'value1'}} + num_remapped = self.reconciler.reconcile_nodes(nodes, + keep_legacy_obs=False) self.assertEqual(num_remapped, 1) self.assertIn('newProp', nodes['node1']) self.assertEqual(nodes['node1']['newProp'], 'value1') @@ 
-92,7 +98,8 @@ def test_keep_legacy_svobs(self): 'prop1': 'dcid:OldVal' } } - num_remapped = self.reconciler.reconcile_nodes(nodes, keep_legacy_obs=True) + num_remapped = self.reconciler.reconcile_nodes(nodes, + keep_legacy_obs=True) # Should have 2 nodes now: original and new self.assertEqual(len(nodes), 2) @@ -102,30 +109,24 @@ def test_keep_legacy_svobs(self): self.assertEqual(nodes['obs1']['prop1'], 'dcid:OldVal') # New node should have new value - self.assertIn('1', nodes) - self.assertEqual(nodes['1']['prop1'], 'dcid:NewVal') + self.assertIn('obs1-1', nodes) + self.assertEqual(nodes['obs1-1']['prop1'], 'dcid:NewVal') def test_no_change(self): - nodes = { - 'node1': { - 'prop1': 'dcid:NewVal' - } - } + nodes = {'node1': {'prop1': 'dcid:NewVal'}} num_remapped = self.reconciler.reconcile_nodes(nodes) self.assertEqual(num_remapped, 0) self.assertEqual(nodes['node1']['prop1'], 'dcid:NewVal') def test_list_values(self): - nodes = { - 'node1': { - 'prop1': 'dcid:OldVal, dcid:NewVal' - } - } - num_remapped = self.reconciler.reconcile_nodes(nodes, keep_legacy_obs=False) + nodes = {'node1': {'prop1': 'dcid:OldVal, dcid:NewVal'}} + num_remapped = self.reconciler.reconcile_nodes(nodes, + keep_legacy_obs=False) self.assertEqual(num_remapped, 1) # Should be "dcid:NewVal, dcid:NewVal" -> duplicate values might be kept or joined # Code: ",".join(remapped_values) self.assertEqual(nodes['node1']['prop1'], 'dcid:NewVal,dcid:NewVal') + if __name__ == '__main__': unittest.main() From 1ceb1342821611b39a2cbb6dc188beb98a50843e Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Sat, 21 Mar 2026 02:31:56 +0530 Subject: [PATCH 3/5] fix comments --- .../schema/schema_reconciler.py | 27 ++++++++++++++++--- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/tools/statvar_importer/schema/schema_reconciler.py b/tools/statvar_importer/schema/schema_reconciler.py index efcf0faf62..be91e62fd0 100644 --- a/tools/statvar_importer/schema/schema_reconciler.py +++ 
b/tools/statvar_importer/schema/schema_reconciler.py @@ -133,8 +133,8 @@ class SchemaReconciler: """ def __init__(self, - schema_mcf_files: list = '', - config: dict = {}, + schema_mcf_files: list = None, + config: dict = None, counters: Counters = None): """Initializes the SchemaReconciler with schema files and configuration. @@ -155,6 +155,8 @@ def __init__(self, def load_schema_mcf(self, schema_mcf_files: list): """Load nodes from schema MCF files and add to the index.""" + if not schema_mcf_files: + return mcf_nodes = load_mcf_nodes(schema_mcf_files, {}) self.add_schema_nodes(mcf_nodes) logging.info( @@ -379,8 +381,25 @@ def schema_reconcile_nodes(nodes: dict, config: dict = None, schema_nodes: dict = None, schema_files: list = None, - counters: Counters = None) -> bool: - """Reconcile a set of nodes.""" + counters: Counters = None) -> int: + """Reconcile a set of nodes. + + Args: + nodes: dictionary of nodes as dict of property:values + config: dictionary of config parameter values + schema_nodes: dictionary of existing schema nodes + with key with a namespace: dcid: + schema_files: list of schema files to be loaded + Any node not in schema_nodes or schema_files is looked up in the DC API. + counters: (optional) counters to be updated + + Returns: + the number of nodes that were reconciled. + + Also updates the input nodes with the reconciled property + if config recon_keep_legacy_svobs is set, keeps existing nodes + and creates new nodes that have been reconciled.
+ """ recon = SchemaReconciler(schema_files, config, counters) recon.add_schema_nodes(schema_nodes) return recon.reconcile_nodes(nodes) From a07fd9aa523db8d6ced3dc7a3752c5fc2445e8a6 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Sat, 21 Mar 2026 02:39:52 +0530 Subject: [PATCH 4/5] typo --- util/dc_api_wrapper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/dc_api_wrapper.py b/util/dc_api_wrapper.py index cf2f25cb9b..25e537506f 100644 --- a/util/dc_api_wrapper.py +++ b/util/dc_api_wrapper.py @@ -329,7 +329,7 @@ def dc_api_get_node_property(dcids: list, dc_api_batched_wrapper and dc_api_wrapper for details. Returns: - dictionary with each input dcid mapped to a propeorty:value + dictionary with each input dcid mapped to a property:value """ config = _validate_v2_config(config) if isinstance(prop, list): From c1905641a1576f53c7996ef7ced4b8ab850c7486 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Sat, 21 Mar 2026 15:39:46 +0530 Subject: [PATCH 5/5] lint fix --- tools/statvar_importer/mcf_file_util.py | 2 +- tools/statvar_importer/stat_var_processor.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tools/statvar_importer/mcf_file_util.py b/tools/statvar_importer/mcf_file_util.py index 93b5a8e6ce..6ed57ed7fc 100644 --- a/tools/statvar_importer/mcf_file_util.py +++ b/tools/statvar_importer/mcf_file_util.py @@ -743,7 +743,7 @@ def get_value_list(value: str) -> list: value_list = [] # Read the string as a comma separated line. 
if not isinstance(value, str): - value = str(value) + value = str(value) is_quoted = '"' in value try: if is_quoted and "," in value: diff --git a/tools/statvar_importer/stat_var_processor.py b/tools/statvar_importer/stat_var_processor.py index fd12544abe..487cce556f 100644 --- a/tools/statvar_importer/stat_var_processor.py +++ b/tools/statvar_importer/stat_var_processor.py @@ -1233,7 +1233,7 @@ def filter_svobs(self): def reconcile_nodes(self): """Reconcile statvar and obs to new property:values or statvars.""" recon = SchemaReconciler(config=self._config.get_configs(), - counters=self._counters) + counters=self._counters) recon.reconcile_nodes(self._statvars_map) recon.reconcile_nodes(self._statvar_obs_map) @@ -2798,7 +2798,7 @@ def write_outputs(self, output_path: str): self._counters.set_prefix('2:prepare_output_') self._statvars_map.drop_invalid_statvars() if self._config.get('reconcile_nodes', True): - self._statvars_map.reconcile_nodes() + self._statvars_map.reconcile_nodes() if self._config.get('generate_statvar_mcf', True): self._counters.set_prefix('3:write_statvar_mcf_') statvar_mcf_file = self._config.get('output_statvar_mcf',