From 07ef6f0ba608f7a113b42b401ed023325fdce604 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Thu, 4 Dec 2025 16:03:28 +0530 Subject: [PATCH 01/14] Migrate to v2 api --- import-automation/executor/requirements.txt | 1 + scripts/fires/wfigs_data.py | 2 +- util/dc_api_wrapper.py | 261 +++++++++++++++++--- util/dc_api_wrapper_test.py | 27 +- 4 files changed, 250 insertions(+), 41 deletions(-) diff --git a/import-automation/executor/requirements.txt b/import-automation/executor/requirements.txt index d208a7c239..e2e7bfac1d 100644 --- a/import-automation/executor/requirements.txt +++ b/import-automation/executor/requirements.txt @@ -8,6 +8,7 @@ chromedriver_py croniter dataclasses datacommons +datacommons_client db-dtypes duckdb frozendict diff --git a/scripts/fires/wfigs_data.py b/scripts/fires/wfigs_data.py index f6d92e591c..c9aa870ef2 100644 --- a/scripts/fires/wfigs_data.py +++ b/scripts/fires/wfigs_data.py @@ -130,7 +130,7 @@ def process_df(df): # convert epoch time in milliseconds to datetime def epoch_to_datetime(date_val): if not pd.isna(date_val): - return datetime.datetime.fromtimestamp(date_val / 1000) + return datetime.datetime.fromtimestamp(date_val / 1000, datetime.timezone.utc) else: return None diff --git a/util/dc_api_wrapper.py b/util/dc_api_wrapper.py index c8c487ba3d..b6f5f865a8 100644 --- a/util/dc_api_wrapper.py +++ b/util/dc_api_wrapper.py @@ -11,15 +11,28 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""Wrapper utilities for data commons API.""" +"""Wrapper utilities for data commons API. + +It uses the DataCommonsClient library module for V2 DC APIs by default +and adds support for batched requests, retries and HTTP caching. +DC V2 API requires an environment variable set for DC_API_KEY. +Please refer to https://docs.datacommons.org/api/python/v2 +for more details. + +To use the legacy datacommons library module, set the config: + 'dc_api_version': 'V1' +""" from collections import OrderedDict import os import sys import time import urllib +import requests from absl import logging +from datacommons_client.client import DataCommonsClient +from datacommons_client.utils.error_handling import DCConnectionError, DCStatusError, APIError import datacommons as dc import requests_cache @@ -37,6 +50,8 @@ # Resolve latlng coordinate # https://api.datacommons.org/v2/resolve _DC_API_PATH_RESOLVE_COORD = '/v2/resolve' +# Default API key for limited tests +_DEFAULT_DC_API_KEY = 'AIzaSyCTI4Xz-UW_G2Q2RfknhcfdAnTHq5X5XuI' def dc_api_wrapper( @@ -47,7 +62,7 @@ def dc_api_wrapper( use_cache: bool = False, api_root: str = None, ): - """Wrapper for a DC APi call with retries and caching. + """Wrapper for a DC API call with retries and caching. Returns the result from the DC APi call function. In case of errors, retries the function with a delay a fixed number of times. @@ -80,8 +95,6 @@ def dc_api_wrapper( logging.debug(f'Using requests_cache for DC API {function}') else: cache_context = requests_cache.disabled() - logging.debug(f'Using requests_cache for DC API {function}') - with cache_context: for attempt in range(retries): try: logging.debug( @@ -95,14 +108,16 @@ def dc_api_wrapper( # Exception in case of missing dcid. Don't retry. logging.error(f'Got exception for api: {function}, {e}') return None - except (urllib.error.URLError, urllib.error.HTTPError, - ValueError) as e: + except (urllib.error.URLError, urllib.error.HTTPError, ValueError, + DCConnectionError, DCStatusError, APIError, + requests.exceptions.Timeout, + requests.exceptions.ChunkedEncodingError) as e: # Exception when server is overloaded, retry after a delay if attempt >= retries: logging.error( f'Got exception for api: {function}, {e}, no more retries' ) - raise urllib.error.URLError + raise e else: logging.debug( f'Got exception {e}, retrying API {function} after' @@ -115,7 +130,7 @@ def dc_api_batched_wrapper( function, dcids: list, args: dict, - dcid_arg_kw: str = 'dcid', + dcid_arg_kw: str = 'dcids', headers: dict = {}, config: dict = None, ) -> dict: @@ -157,7 +172,7 @@ def dc_api_batched_wrapper( _strip_namespace(x) for x in dcids[index:index + api_batch_size] ] index += api_batch_size - args['dcids'] = dcids_batch + args[dcid_arg_kw] = dcids_batch batch_result = dc_api_wrapper( function, args, @@ -167,16 +182,70 @@ def dc_api_batched_wrapper( config.get('dc_api_root', None), ) if batch_result: - api_result.update(batch_result) + _dc_api_merge_results(api_result, batch_result) logging.debug(f'Got DC API result for {function}: {batch_result}') logging.debug( f'Returning response {api_result} for {function}, {dcids}, {args}') return api_result -def dc_api_is_defined_dcid(dcids: list, wrapper_config: dict = None) -> dict: - """Returns a dictionary with dcids mapped to True/False based on whether +def _dc_api_merge_results(results: dict, new_result: dict) -> dict: + """Returns the merged dictionary from the results and new_result.""" + if results is None: + results = {} + if not new_result: + return results + if not isinstance(new_result, dict): + # This is a V2 response. Extract the dict. + new_result = new_result.to_dict() + if 'data' in new_result: + new_result = new_result['data'] + # Update new_result into results if keys are different. + if len(results) != len(new_result) or results.keys() != new_result.keys(): + results.update(new_result) + return results + else: + # Both have the same nested dicts. Merged the nested dicts. + for key in new_result.keys(): + results[key] = _dc_api_merge_results(results[key], new_result[key]) + return results + +def get_datacommons_client(config: dict = None) -> DataCommonsClient: + """Returns a DataCommonsClient object initialized using config.""" + if config is None: + config = {} + api_key = config.get('dc_api_key', os.environ.get('DC_API_KEY')) + if not api_key: + logging.log_first_n( + logging.WARNING, f'Using default DC API key with limited quota. ' + 'Please set an API key in the environment variable: DC_API_KEY.' + 'Refer https://docs.datacommons.org/api/python/v2/#authentication ' + 'for more details.', + n=1) + api_key = _DEFAULT_DC_API_KEY + dc_instance = config.get('dc_api_root') + url = None + # Check if API root is a host or url endpoint. + if dc_instance: + if dc_instance.startswith('http'): + parsed_url = urllib.parse.urlparse(dc_instance) + if parsed_url and parsed_url.path and parsed_url.path != '/': + # API endpoint is a URL. + url = dc_instance + dc_instance = None + else: + # DataCommonsClient uses custom DC path /core/api/v2 + # with dc_instance. + # Set the URL to v2 prod endpoint. + url = urllib.parse.urljoin(dc_instance, 'v2') + dc_instance = None + logging.info(f'DEL: using {dc_instance}, {url}') + return DataCommonsClient(api_key=api_key, dc_instance=dc_instance, url=url) + + +def dc_api_is_defined_dcid(dcids: list, wrapper_config: dict = {}) -> dict: + """Returns a dictionary with dcids mapped to True/False based on whether the dcid is defined in the API and has a 'typeOf' property. Uses the property_value() DC API to lookup 'typeOf' for each dcid. dcids not defined in KG get a value of False. @@ -188,13 +257,25 @@ def dc_api_is_defined_dcid(dcids: list, wrapper_config: dict = None) -> dict: Returns: dictionary with each input dcid mapped to a True/False value. """ - api_function = dc.get_property_values - args = { - 'prop': 'typeOf', - 'out': True, - } - api_result = dc_api_batched_wrapper(api_function, dcids, args, - wrapper_config) + # Set parameters for V2 node API. + client = get_datacommons_client(wrapper_config) + api_function = client.node.fetch_property_values + args = {'properties': 'typeOf'} + dcid_arg_kw = 'node_dcids' + if wrapper_config.get('dc_api_version', 'V2') != 'V2': + # Set parameters for V1 API. + api_function = dc.get_property_values + args = { + 'prop': 'typeOf', + 'out': True, + } + dcid_arg_kw = 'dcids' + + api_result = dc_api_batched_wrapper(function=api_function, + dcids=dcids, + args=args, + dcid_arg_kw=dcid_arg_kw, + config=wrapper_config) response = {} for dcid in dcids: dcid_stripped = _strip_namespace(dcid) @@ -207,7 +288,7 @@ def dc_api_is_defined_dcid(dcids: list, wrapper_config: dict = None) -> dict: def dc_api_get_node_property(dcids: list, prop: str, - wrapper_config: dict = None) -> dict: + wrapper_config: dict = {}) -> dict: """Returns a dictionary keyed by dcid with { prop:value } for each dcid. Uses the get_property_values() DC API to lookup the property for each dcid. @@ -220,24 +301,54 @@ def dc_api_get_node_property(dcids: list, Returns: dictionary with each input dcid mapped to a True/False value. """ - api_function = dc.get_property_values - args = { - 'prop': prop, - 'out': True, - } - api_result = dc_api_batched_wrapper(api_function, dcids, args, - wrapper_config) + is_v2 = wrapper_config.get('dc_api_version', 'V2') == 'V2' + # Set parameters for V2 node API. + client = get_datacommons_client(wrapper_config) + api_function = client.node.fetch_property_values + args = {'properties': prop} + dcid_arg_kw = 'node_dcids' + if not is_v2: + # Set parameters for V1 API. + api_function = dc.get_property_values + args = { + 'prop': prop, + 'out': True, + } + dcid_arg_kw = 'dcids' + api_result = dc_api_batched_wrapper(function=api_function, + dcids=dcids, + args=args, + dcid_arg_kw=dcid_arg_kw, + config=wrapper_config) response = {} for dcid in dcids: dcid_stripped = _strip_namespace(dcid) - value = api_result.get(dcid_stripped) - if value: - response[dcid] = {prop: value} + node_data = api_result.get(dcid_stripped) + if not node_data: + continue + + if is_v2: + values = [] + arcs = node_data.get('arcs', {}) + prop_nodes = arcs.get(prop, {}).get('nodes', []) + for node in prop_nodes: + val_dcid = node.get('dcid') + if val_dcid: + values.append(val_dcid) + value = node.get('value') + if value: + value = '"' + value + '"' + values.append(value) + if values: + response[dcid] = {prop: ','.join(values)} + else: # V1 + if node_data: + response[dcid] = {prop: node_data} return response def dc_api_get_node_property_values(dcids: list, - wrapper_config: dict = None) -> dict: + wrapper_config: dict = {}) -> dict: """Returns all the property values for a set of dcids from the DC API. Args: @@ -245,14 +356,63 @@ def dc_api_get_node_property_values(dcids: list, wrapper_config: configuration parameters for the wrapper. See dc_api_batched_wrapper() and dc_api_wrapper() for details. + Returns: + dictionary with each dcid with the namspace 'dcid:' as the key + mapped to a dictionary of property:value. + """ + if wrapper_config.get('dc_api_version', 'V2') != 'V2': + return dc_api_v1_get_node_property_values(dcids, wrapper_config) + # Lookup node properties using V2 node API + client = get_datacommons_client(wrapper_config) + api_function = client.node.fetch + args = {'expression': '->*'} + dcid_arg_kw = 'node_dcids' + api_result = dc_api_batched_wrapper(function=api_function, + dcids=dcids, + args=args, + dcid_arg_kw=dcid_arg_kw, + config=wrapper_config) + response = {} + for dcid, arcs in api_result.items(): + pvs = {} + for prop, node_values in arcs.get('arcs', {}).items(): + for node in node_values.get('nodes', []): + # Get property value as reference to another node + value = node.get('dcid') + if not value: + # Property value is a string. + value = node.get('value') + if value: + value = '"' + value + '"' + if value: + if prop in pvs: + value = pvs[prop] + ',' + value + pvs[prop] = value + if len(pvs) > 0: + if 'Node' not in pvs: + pvs['Node'] = _add_namespace(dcid) + response[_add_namespace(dcid)] = pvs + return response + + +def dc_api_v1_get_node_property_values(dcids: list, + wrapper_config: dict = {}) -> dict: + """Returns all the property values for a set of dcids from the DC V1 API. + + Args: + dcids: list of dcids to lookup + wrapper_config: configuration parameters for the wrapper. See + dc_api_batched_wrapper() and dc_api_wrapper() for details. + Returns: dictionary with each dcid with the namspace 'dcid:' as the key mapped to a dictionary of property:value. """ predefined_nodes = OrderedDict() api_function = dc.get_triples - api_triples = dc_api_batched_wrapper(api_function, dcids, {}, - wrapper_config) + api_triples = dc_api_batched_wrapper(api_function, + dcids, {}, + config=wrapper_config) if api_triples: for dcid, triples in api_triples.items(): if (_strip_namespace(dcid) not in dcids and @@ -274,15 +434,42 @@ def dc_api_get_node_property_values(dcids: list, return predefined_nodes -def dc_api_resolve_placeid(dcids: list, in_prop: str = 'placeId') -> dict: +def dc_api_resolve_placeid(dcids: list, + in_prop: str = 'placeId', + wrapper_config: dict = {}) -> dict: """Returns the resolved dcid for each of the placeid. Args: dcids: list of placeids to be resolved. + in_prop: The property of the input IDs. + wrapper_config: dictionary of DC API configuration settings. Returns: dictionary keyed by input placeid with reoslved dcid as value. """ + if wrapper_config.get('dc_api_version', 'V2') == 'V2': + client = get_datacommons_client(wrapper_config) + api_function = client.resolve.fetch + args = {'expression': f'<-{in_prop}->dcid'} + dcid_arg_kw = 'node_ids' + api_result = dc_api_batched_wrapper(function=api_function, + dcids=dcids, + args=args, + dcid_arg_kw=dcid_arg_kw, + config=wrapper_config) + results = {} + if api_result: + for node in api_result.get('entities', []): + place_id = node.get('node') + if place_id: + candidates = node.get('candidates', []) + if candidates: + dcid = candidates[0].get('dcid') + if dcid: + results[place_id] = dcid + return results + + # V1 implementation data = {'in_prop': in_prop, 'out_prop': 'dcid'} data['ids'] = dcids num_ids = len(dcids) @@ -298,9 +485,9 @@ def dc_api_resolve_placeid(dcids: list, in_prop: str = 'placeId') -> dict: if recon_resp: for entity in recon_resp.get('entities', []): place_id = entity.get('inId', '') - dcids = entity.get('outIds', None) - if place_id and dcids: - results[place_id] = dcids[0] + out_dcids = entity.get('outIds', None) + if place_id and out_dcids: + results[place_id] = out_dcids[0] return results diff --git a/util/dc_api_wrapper_test.py b/util/dc_api_wrapper_test.py index a0c4ec10fe..2a555796d2 100644 --- a/util/dc_api_wrapper_test.py +++ b/util/dc_api_wrapper_test.py @@ -50,8 +50,8 @@ def test_dc_api_batched_wrapper(self): 'dcid:NewStatVar_NotInDC', # new statvar missing in DC ] args = {'prop': 'typeOf'} - response = dc_api.dc_api_batched_wrapper(api_function, dcids, args, - {'dc_api_batch_size': 2}) + response = dc_api.dc_api_batched_wrapper( + api_function, dcids, args, config={'dc_api_batch_size': 2}) self.assertTrue(response is not None) self.assertEqual(response['Count_Person'], ['StatisticalVariable']) self.assertEqual(response['Count_Person_Male'], ['StatisticalVariable']) @@ -69,7 +69,7 @@ def test_dc_api_is_defined_dcid(self): dcids, { 'dc_api_batch_size': 2, - 'dc_api_root': 'http://autopush.api.datacommons.org', + 'dc_api_root': 'http://autopush.api.datacommons.org/', }, ) self.assertTrue(response is not None) @@ -91,6 +91,27 @@ def test_dc_get_node_property_values(self): self.assertTrue('measuredProperty' in statvar_pvs) self.assertEqual('StatisticalVariable', statvar_pvs['typeOf']) + def test_dc_api_get_node_property(self): + """Test API wrapper to get a single property for a node.""" + dcids = ['Count_Person'] + prop = 'name' + response_v2 = dc_api.dc_api_get_node_property(dcids, prop, + {'dc_api_version': 'V2'}) + self.assertTrue(response_v2) + self.assertIn('Count_Person', response_v2) + # Note: The name of Count_Person is "Total population" + self.assertEqual(response_v2['Count_Person'], + {'name': '"Total population"'}) + + def test_dc_api_resolve_placeid(self): + """Test API wrapper to resolve entity using a placeid.""" + placeids = ['ChIJT3IGqvxznW4Rqgw7pv9zYz8'] + response = dc_api.dc_api_resolve_placeid(placeids) + self.assertTrue(response) + self.assertIn('ChIJT3IGqvxznW4Rqgw7pv9zYz8', response) + self.assertEqual(response['ChIJT3IGqvxznW4Rqgw7pv9zYz8'], + 'wikidataId/Q9727') + def test_dc_api_resolve_latlng(self): """Test API wrapper for latlng resolution.""" latlngs = [{'latitude': 37.42, 'longitude': -122.08}] From 5c6eaef2fb8b85c1450afe8fe45e39f9f1def63a Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Thu, 4 Dec 2025 16:09:38 +0530 Subject: [PATCH 02/14] lint fix --- scripts/fires/wfigs_data.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scripts/fires/wfigs_data.py b/scripts/fires/wfigs_data.py index c9aa870ef2..60d21b03d9 100644 --- a/scripts/fires/wfigs_data.py +++ b/scripts/fires/wfigs_data.py @@ -130,7 +130,8 @@ def process_df(df): # convert epoch time in milliseconds to datetime def epoch_to_datetime(date_val): if not pd.isna(date_val): - return datetime.datetime.fromtimestamp(date_val / 1000, datetime.timezone.utc) + return datetime.datetime.fromtimestamp(date_val / 1000, + datetime.timezone.utc) else: return None From 551184af39e65629db018674d136e1a2398460ba Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Thu, 4 Dec 2025 22:51:00 +0530 Subject: [PATCH 03/14] rename config --- util/dc_api_wrapper.py | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/util/dc_api_wrapper.py b/util/dc_api_wrapper.py index aa31e56e40..857e5a0bfd 100644 --- a/util/dc_api_wrapper.py +++ b/util/dc_api_wrapper.py @@ -260,25 +260,25 @@ def get_datacommons_client(config: dict = None) -> DataCommonsClient: return DataCommonsClient(api_key=api_key, dc_instance=dc_instance, url=url) -def dc_api_is_defined_dcid(dcids: list, wrapper_config: dict = {}) -> dict: +def dc_api_is_defined_dcid(dcids: list, config: dict = {}) -> dict: """Returns a dictionary with dcids mapped to True/False based on whether the dcid is defined in the API and has a 'typeOf' property. Uses the property_value() DC API to lookup 'typeOf' for each dcid. dcids not defined in KG get a value of False. Args: dcids: List of dcids. The namespace is stripped from the dcid. - wrapper_config: dictionary of configurationparameters for the wrapper. See + config: dictionary of configurationparameters for the wrapper. See dc_api_batched_wrapper and dc_api_wrapper for details. Returns: dictionary with each input dcid mapped to a True/False value. """ # Set parameters for V2 node API. - client = get_datacommons_client(wrapper_config) + client = get_datacommons_client(config) api_function = client.node.fetch_property_values args = {'properties': 'typeOf'} dcid_arg_kw = 'node_dcids' - if wrapper_config.get('dc_api_version', 'V2') != 'V2': + if config.get('dc_api_version', 'V2') != 'V2': # Set parameters for V1 API. api_function = dc.get_property_values args = { @@ -291,7 +291,7 @@ def dc_api_is_defined_dcid(dcids: list, wrapper_config: dict = {}) -> dict: dcids=dcids, args=args, dcid_arg_kw=dcid_arg_kw, - config=wrapper_config) + config=config) response = {} for dcid in dcids: dcid_stripped = _strip_namespace(dcid) @@ -304,22 +304,22 @@ def dc_api_is_defined_dcid(dcids: list, wrapper_config: dict = {}) -> dict: def dc_api_get_node_property(dcids: list, prop: str, - wrapper_config: dict = {}) -> dict: + config: dict = {}) -> dict: """Returns a dictionary keyed by dcid with { prop:value } for each dcid. Uses the get_property_values() DC API to lookup the property for each dcid. Args: dcids: List of dcids. The namespace is stripped from the dcid. - wrapper_config: dictionary of configurationparameters for the wrapper. See + config: dictionary of configurationparameters for the wrapper. See dc_api_batched_wrapper and dc_api_wrapper for details. Returns: dictionary with each input dcid mapped to a True/False value. """ - is_v2 = wrapper_config.get('dc_api_version', 'V2') == 'V2' + is_v2 = config.get('dc_api_version', 'V2') == 'V2' # Set parameters for V2 node API. - client = get_datacommons_client(wrapper_config) + client = get_datacommons_client(config) api_function = client.node.fetch_property_values args = {'properties': prop} dcid_arg_kw = 'node_dcids' @@ -335,7 +335,7 @@ def dc_api_get_node_property(dcids: list, dcids=dcids, args=args, dcid_arg_kw=dcid_arg_kw, - config=wrapper_config) + config=config) response = {} for dcid in dcids: dcid_stripped = _strip_namespace(dcid) @@ -364,22 +364,22 @@ def dc_api_get_node_property(dcids: list, def dc_api_get_node_property_values(dcids: list, - wrapper_config: dict = {}) -> dict: + config: dict = {}) -> dict: """Returns all the property values for a set of dcids from the DC API. Args: dcids: list of dcids to lookup - wrapper_config: configuration parameters for the wrapper. See + config: configuration parameters for the wrapper. See dc_api_batched_wrapper() and dc_api_wrapper() for details. Returns: dictionary with each dcid with the namspace 'dcid:' as the key mapped to a dictionary of property:value. """ - if wrapper_config.get('dc_api_version', 'V2') != 'V2': - return dc_api_v1_get_node_property_values(dcids, wrapper_config) + if config.get('dc_api_version', 'V2') != 'V2': + return dc_api_v1_get_node_property_values(dcids, config) # Lookup node properties using V2 node API - client = get_datacommons_client(wrapper_config) + client = get_datacommons_client(config) api_function = client.node.fetch args = {'expression': '->*'} dcid_arg_kw = 'node_dcids' @@ -387,7 +387,7 @@ def dc_api_get_node_property_values(dcids: list, dcids=dcids, args=args, dcid_arg_kw=dcid_arg_kw, - config=wrapper_config) + config=config) response = {} for dcid, arcs in api_result.items(): pvs = {} @@ -412,12 +412,12 @@ def dc_api_get_node_property_values(dcids: list, def dc_api_v1_get_node_property_values(dcids: list, - wrapper_config: dict = {}) -> dict: + config: dict = {}) -> dict: """Returns all the property values for a set of dcids from the DC V1 API. Args: dcids: list of dcids to lookup - wrapper_config: configuration parameters for the wrapper. See + config: configuration parameters for the wrapper. See dc_api_batched_wrapper() and dc_api_wrapper() for details. Returns: @@ -428,7 +428,7 @@ def dc_api_v1_get_node_property_values(dcids: list, api_function = dc.get_triples api_triples = dc_api_batched_wrapper(api_function, dcids, {}, - config=wrapper_config) + config=config) if api_triples: for dcid, triples in api_triples.items(): if (_strip_namespace(dcid) not in dcids and From 72d08943363bfc278086a6c30d0c6ba3d1cfa6c9 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Mon, 8 Dec 2025 17:22:55 +0530 Subject: [PATCH 04/14] review comments --- util/dc_api_wrapper.py | 121 +++++++++++++++++++++++++++-------------- 1 file changed, 81 insertions(+), 40 deletions(-) diff --git a/util/dc_api_wrapper.py b/util/dc_api_wrapper.py index 857e5a0bfd..63d6d5defb 100644 --- a/util/dc_api_wrapper.py +++ b/util/dc_api_wrapper.py @@ -96,6 +96,8 @@ def dc_api_wrapper( logging.debug(f'Using requests_cache for DC API {function}') else: cache_context = requests_cache.disabled() + logging.debug(f'Using requests_cache for DC API {function}') + with cache_context: for attempt in range(retries): try: logging.debug( @@ -103,19 +105,22 @@ def dc_api_wrapper( f' retries={retries}') response = None - # All calls serialize here to prevent races while updating the - # global Data Commons API root. - with _API_ROOT_LOCK: - original_api_root = dc.utils._API_ROOT - if api_root: - dc.utils._API_ROOT = api_root - logging.debug( - f'Setting DC API root to {api_root} for {function}') - - try: - response = function(**args) - finally: - dc.utils._API_ROOT = original_api_root + if api_root: + # All calls serialize here to prevent races while updating the + # global Data Commons API root. + with _API_ROOT_LOCK: + original_api_root = dc.utils._API_ROOT + if api_root: + dc.utils._API_ROOT = api_root + logging.debug( + f'Setting DC API root to {api_root} for {function}' + ) + try: + response = function(**args) + finally: + dc.utils._API_ROOT = original_api_root + else: + response = function(**args) logging.debug( f'Got API response {response} for {function}, {args}') @@ -124,24 +129,52 @@ def dc_api_wrapper( # Exception in case of missing dcid. Don't retry. logging.error(f'Got exception for api: {function}, {e}') return None - except (urllib.error.URLError, urllib.error.HTTPError, ValueError, - DCConnectionError, DCStatusError, APIError, - requests.exceptions.Timeout, + except (DCConnectionError, requests.exceptions.Timeout, requests.exceptions.ChunkedEncodingError) as e: - # Exception when server is overloaded, retry after a delay - if attempt >= retries: + # Retry network errors + if should_retry_status_code(None, attempt, retries): + logging.debug( + f'Got exception {e}, retrying API {function} after' + f' {retry_secs}...') + time.sleep(retry_secs) + else: logging.error( f'Got exception for api: {function}, {e}, no more retries' ) raise e - else: + except (urllib.error.HTTPError, DCStatusError, APIError) as e: + # Retry 5xx and 429, but not other 4xx + status_code = getattr(e, 'code', None) or getattr( + e, 'status_code', None) + if should_retry_status_code(status_code, attempt, retries): logging.debug( f'Got exception {e}, retrying API {function} after' f' {retry_secs}...') time.sleep(retry_secs) + else: + # Don't retry other errors (e.g. 400, 404, 401) + logging.error(f'Got exception for api: {function}, {e}') + raise e return None +def should_retry_status_code(status_code: int, attempt: int, + max_retries: int) -> bool: + """Returns True if the request should be retried. + Request can be retried for HTTP status codes like 429 or 5xx + if the number of attempts is less than max_retries.""" + if status_code: + if (status_code != 429 and status_code < 500): + # Do no retry for error codes like 401 + logging.error(f'Got status: {status_code}, not retrying.') + return False + if attempt >= max_retries: + logging.error( + f'Got status: {status_code} after {attempt} retries, not retrying.') + return False + return True + + def dc_api_batched_wrapper( function, dcids: list, @@ -178,6 +211,10 @@ def dc_api_batched_wrapper( api_result = {} index = 0 num_dcids = len(dcids) + dc_api_root = config.get('dc_api_root', None) + if config.get('dc_api_version', 'V2') == 'V2': + # V2 API assumes api root is set in the function's client + dc_api_root = None api_batch_size = config.get('dc_api_batch_size', dc.utils._MAX_LIMIT) logging.debug( f'Calling DC API {function} on {len(dcids)} dcids in batches of' @@ -195,18 +232,18 @@ def dc_api_batched_wrapper( config.get('dc_api_retries', 3), config.get('dc_api_retry_secs', 5), config.get('dc_api_use_cache', False), - config.get('dc_api_root', None), + dc_api_root, ) if batch_result: - _dc_api_merge_results(api_result, batch_result) + dc_api_merge_results(api_result, batch_result) logging.debug(f'Got DC API result for {function}: {batch_result}') logging.debug( f'Returning response {api_result} for {function}, {dcids}, {args}') return api_result -def _dc_api_merge_results(results: dict, new_result: dict) -> dict: - """Returns the merged dictionary from the results and new_result.""" +def dc_api_merge_results(results: dict, new_result: dict) -> dict: + """Returns the merged dictionary with new_result added into results.""" if results is None: results = {} if not new_result: @@ -217,13 +254,23 @@ def _dc_api_merge_results(results: dict, new_result: dict) -> dict: if 'data' in new_result: new_result = new_result['data'] # Update new_result into results if keys are different. - if len(results) != len(new_result) or results.keys() != new_result.keys(): - results.update(new_result) - return results - else: - # Both have the same nested dicts. Merged the nested dicts. - for key in new_result.keys(): - results[key] = _dc_api_merge_results(results[key], new_result[key]) + for key, value in new_result.items(): + old_value = results.get(key) + if not old_value: + # New key, add it. + results[key] = value + else: + if isinstance(old_value, dict) and isinstance(value, dict): + # Merge the nested dicts. + dc_api_merge_results(old_value, value) + elif isinstance(old_value, list) and isinstance(value, list): + # Append new list + old_value.extend(value) + else: + # Replace with new value + old_value = value + results[key] = old_value + return results @@ -302,9 +349,7 @@ def dc_api_is_defined_dcid(dcids: list, config: dict = {}) -> dict: return response -def dc_api_get_node_property(dcids: list, - prop: str, - config: dict = {}) -> dict: +def dc_api_get_node_property(dcids: list, prop: str, config: dict = {}) -> dict: """Returns a dictionary keyed by dcid with { prop:value } for each dcid. Uses the get_property_values() DC API to lookup the property for each dcid. @@ -363,8 +408,7 @@ def dc_api_get_node_property(dcids: list, return response -def dc_api_get_node_property_values(dcids: list, - config: dict = {}) -> dict: +def dc_api_get_node_property_values(dcids: list, config: dict = {}) -> dict: """Returns all the property values for a set of dcids from the DC API. Args: @@ -411,8 +455,7 @@ def dc_api_get_node_property_values(dcids: list, return response -def dc_api_v1_get_node_property_values(dcids: list, - config: dict = {}) -> dict: +def dc_api_v1_get_node_property_values(dcids: list, config: dict = {}) -> dict: """Returns all the property values for a set of dcids from the DC V1 API. Args: @@ -426,9 +469,7 @@ def dc_api_v1_get_node_property_values(dcids: list, """ predefined_nodes = OrderedDict() api_function = dc.get_triples - api_triples = dc_api_batched_wrapper(api_function, - dcids, {}, - config=config) + api_triples = dc_api_batched_wrapper(api_function, dcids, {}, config=config) if api_triples: for dcid, triples in api_triples.items(): if (_strip_namespace(dcid) not in dcids and From d69478a43b6f2f9c43f094cc8c9cf58f18fb663d Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Mon, 8 Dec 2025 17:25:23 +0530 Subject: [PATCH 05/14] lint --- util/dc_api_wrapper.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/util/dc_api_wrapper.py b/util/dc_api_wrapper.py index 63d6d5defb..118d91d11b 100644 --- a/util/dc_api_wrapper.py +++ b/util/dc_api_wrapper.py @@ -132,7 +132,7 @@ def dc_api_wrapper( except (DCConnectionError, requests.exceptions.Timeout, requests.exceptions.ChunkedEncodingError) as e: # Retry network errors - if should_retry_status_code(None, attempt, retries): + if _should_retry_status_code(None, attempt, retries): logging.debug( f'Got exception {e}, retrying API {function} after' f' {retry_secs}...') @@ -146,7 +146,7 @@ def dc_api_wrapper( # Retry 5xx and 429, but not other 4xx status_code = getattr(e, 'code', None) or getattr( e, 'status_code', None) - if should_retry_status_code(status_code, attempt, retries): + if _should_retry_status_code(status_code, attempt, retries): logging.debug( f'Got exception {e}, retrying API {function} after' f' {retry_secs}...') @@ -158,8 +158,8 @@ def dc_api_wrapper( return None -def should_retry_status_code(status_code: int, attempt: int, - max_retries: int) -> bool: +def _should_retry_status_code(status_code: int, attempt: int, + max_retries: int) -> bool: """Returns True if the request should be retried. Request can be retried for HTTP status codes like 429 or 5xx if the number of attempts is less than max_retries.""" From 4d9a5755fc0d34a8fd605d8c05ba9742e3de7f52 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Mon, 8 Dec 2025 17:27:35 +0530 Subject: [PATCH 06/14] lint --- util/dc_api_wrapper.py | 1 - 1 file changed, 1 deletion(-) diff --git a/util/dc_api_wrapper.py b/util/dc_api_wrapper.py index 118d91d11b..2e084eb766 100644 --- a/util/dc_api_wrapper.py +++ b/util/dc_api_wrapper.py @@ -303,7 +303,6 @@ def get_datacommons_client(config: dict = None) -> DataCommonsClient: # Set the URL to v2 prod endpoint. url = urllib.parse.urljoin(dc_instance, 'v2') dc_instance = None - logging.info(f'DEL: using {dc_instance}, {url}') return DataCommonsClient(api_key=api_key, dc_instance=dc_instance, url=url) From 53a6c534996311c5ec0ea88a0482ab7e8a95512f Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Tue, 9 Dec 2025 14:12:03 +0530 Subject: [PATCH 07/14] lint --- tools/statvar_importer/place/place_resolver.py | 9 ++++++--- tools/statvar_importer/place/wiki_place_resolver.py | 3 ++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/tools/statvar_importer/place/place_resolver.py b/tools/statvar_importer/place/place_resolver.py index 4781e48323..24f8be0cfb 100644 --- a/tools/statvar_importer/place/place_resolver.py +++ b/tools/statvar_importer/place/place_resolver.py @@ -65,7 +65,8 @@ ) flags.DEFINE_string('resolve_output_csv', '', 'Output csv with place dcids.') flags.DEFINE_list('resolve_place_names', [], 'List of place names to resolve.') -flags.DEFINE_string('maps_key', os.environ.get('MAPS_API_KEY', ''), 'Google Maps API key') +flags.DEFINE_string('maps_key', os.environ.get('MAPS_API_KEY', ''), + 'Google Maps API key') flags.DEFINE_string( 'resolve_config', '', @@ -110,7 +111,8 @@ 'https://autopush.api.datacommons.org/v2/resolve', 'DC API URL for resolve.', ) -flags.DEFINE_string('resolve_api_key', os.environ.get('DC_API_KEY', ''), 'DC API key for resolve.') +flags.DEFINE_string('resolve_api_key', os.environ.get('DC_API_KEY', ''), + 'DC API key for resolve.') flags.DEFINE_integer( 'dc_api_batch_size', 3, @@ -173,7 +175,8 @@ def __init__( self._counters = Counters(counters_dict) self._log_every_n = self._config.get('log_every_n', 10) if not self._maps_api_key: - self._maps_api_key = self._config.get('maps_api_key', os.environ.get('MAPS_API_KEY')) + self._maps_api_key = self._config.get( + 'maps_api_key', os.environ.get('MAPS_API_KEY')) self._place_name_matcher = PlaceNameMatcher( config=self._config.get_configs()) self._load_cache() diff --git a/tools/statvar_importer/place/wiki_place_resolver.py b/tools/statvar_importer/place/wiki_place_resolver.py index 008f8c21c0..a33093cd1f 100644 --- a/tools/statvar_importer/place/wiki_place_resolver.py +++ b/tools/statvar_importer/place/wiki_place_resolver.py @@ -74,7 +74,8 @@ 'List of wiki place property codes, such as P31', ) flags.DEFINE_string( - 'custom_search_key', os.environ.get('SEARCH_API_KEY', ''), 'API key for Google custom search API.' + 'custom_search_key', os.environ.get('SEARCH_API_KEY', ''), + 'API key for Google custom search API.' 'Get an API key at https://developers.google.com/custom-search/v1/introduction.' ) flags.DEFINE_integer('wiki_place_pprof_port', 0, From d64a5695374d1e586cebfebd070605f6606328f7 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Tue, 9 Dec 2025 14:19:44 +0530 Subject: [PATCH 08/14] migrate another call --- tools/statvar_importer/place/place_resolver.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/tools/statvar_importer/place/place_resolver.py b/tools/statvar_importer/place/place_resolver.py index 24f8be0cfb..330677cb0f 100644 --- a/tools/statvar_importer/place/place_resolver.py +++ b/tools/statvar_importer/place/place_resolver.py @@ -1078,11 +1078,9 @@ def _cache_contained_in_places(self, dcids: list, level: int = 0): parent_dcids.update(_get_value_set(parents)) if lookup_dcids: - dc_api_args = {'prop': 'containedInPlace', 'out': True} - dc_api_resp = dc_api_batched_wrapper( - function=dc.get_property_values, + dc_api_resp = dc_api_get_node_property( dcids=lookup_dcids, - args=dc_api_args, + prop='containedInPlace', config=self._config.get_configs(), ) self._counters.add_counter( @@ -1090,8 +1088,9 @@ def _cache_contained_in_places(self, dcids: list, level: int = 0): len(lookup_dcids)) # Cache the property:value for the response. for dcid, prop_values in dc_api_resp.items(): - if prop_values: - values_set = _get_value_set(prop_values) + val = prop_values.get('containedInPlace') + if val: + values_set = _get_value_set(val) self._set_cache_value( '', { From 436ba55a9f6c39c830616bdbc1ec8592a81c0ded Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Tue, 16 Dec 2025 16:39:09 +0530 Subject: [PATCH 09/14] Update failing test that uses api key --- util/download_util_test.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/util/download_util_test.py b/util/download_util_test.py index 265b8aa1f7..7b266df1dc 100644 --- a/util/download_util_test.py +++ b/util/download_util_test.py @@ -38,20 +38,19 @@ def setUp(self): def test_request_url(self): # Download URL with GET parameters - countries = download_util.request_url( - url='https://api.datacommons.org/v2/node', + response = download_util.request_url( + url='https://api.datacommons.org/v2/resolve', params={ - 'nodes': ['Earth'], - 'property': '<-containedInPlace+{typeOf:Country}' + 'nodes': ['india'], + 'property': '<-description->dcid' }, output='JSON') - self.assertTrue(isinstance(countries, dict)) + self.assertTrue(isinstance(response, dict)) try: - nodes = countries['data']['Earth']['arcs']['containedInPlace+'][ - 'nodes'] + nodes = response['entities'][0]['candidates'] except (KeyError, TypeError) as e: self.fail( - f'Failed to parse nodes from API response. Check response structure. Error: {e}. Response: {countries}' + f'Failed to parse nodes from API response. Check response structure. Error: {e}. Response: {response}' ) country_dcids = [node.get('dcid', '') for node in nodes] self.assertIn('country/IND', country_dcids) From b721d760c3265aaf25968f50a11d77b89e42ffb1 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Tue, 16 Dec 2025 17:12:01 +0530 Subject: [PATCH 10/14] lint --- util/download_util_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/download_util_test.py b/util/download_util_test.py index 7b266df1dc..e629ebf002 100644 --- a/util/download_util_test.py +++ b/util/download_util_test.py @@ -47,7 +47,7 @@ def test_request_url(self): output='JSON') self.assertTrue(isinstance(response, dict)) try: - nodes = response['entities'][0]['candidates'] + nodes = response['entities'][0]['candidates'] except (KeyError, TypeError) as e: self.fail( f'Failed to parse nodes from API response. Check response structure. Error: {e}. Response: {response}' From e701d9ff0caed63416bde2ec1b525f07ae974b1f Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Mon, 22 Dec 2025 17:19:11 +0530 Subject: [PATCH 11/14] fix unittest --- util/dc_api_wrapper_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/util/dc_api_wrapper_test.py b/util/dc_api_wrapper_test.py index 2a555796d2..184127922a 100644 --- a/util/dc_api_wrapper_test.py +++ b/util/dc_api_wrapper_test.py @@ -69,7 +69,6 @@ def test_dc_api_is_defined_dcid(self): dcids, { 'dc_api_batch_size': 2, - 'dc_api_root': 'http://autopush.api.datacommons.org/', }, ) self.assertTrue(response is not None) From f59c90005be39e28345ddee2a8a719dcb9871240 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Tue, 23 Dec 2025 16:27:54 +0530 Subject: [PATCH 12/14] update mcf_dict to use dc_api_wrapper --- util/mcf_dict_util.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/util/mcf_dict_util.py b/util/mcf_dict_util.py index 844500edf0..03226c6ec3 100644 --- a/util/mcf_dict_util.py +++ b/util/mcf_dict_util.py @@ -65,7 +65,7 @@ _MODULE_DIR = os.path.dirname(os.path.realpath(__file__)) path.insert(1, os.path.join(_MODULE_DIR, '../')) -from scripts.us_census.acs5yr.subject_tables.common.datacommons_api_wrappers.datacommons_wrappers import dc_check_existence +from dc_api_wrapper import dc_api_is_defined_dcid PREFIX_LIST = ['dcs', 'dcid', 'l', 'schema'] @@ -298,7 +298,7 @@ def node_list_check_existence_dc(node_list: list) -> dict: Dict object with dcids as key values and boolean values signifying existence as values. """ dcid_list = get_dcids_node_list(node_list) - return dc_check_existence(dcid_list) + return dc_api_is_defined_dcid(dcid_list) def node_list_check_existence_node_list(node_list: list, From a094db752ab685a6d3348a43f069f4e5d1ac76f2 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Tue, 23 Dec 2025 19:30:46 +0530 Subject: [PATCH 13/14] default statvar processor api root to prod --- tools/statvar_importer/config_flags.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/statvar_importer/config_flags.py b/tools/statvar_importer/config_flags.py index 19673bc34a..d5214a3510 100644 --- a/tools/statvar_importer/config_flags.py +++ b/tools/statvar_importer/config_flags.py @@ -370,7 +370,7 @@ def get_default_config() -> dict: True, # Settings for DC API. 'dc_api_root': - 'http://autopush.api.datacommons.org', + 'http://api.datacommons.org', 'dc_api_use_cache': False, 'dc_api_batch_size': From 74e504e4fe7154214fd8d84dd8f33d612ac117f5 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Mon, 9 Feb 2026 17:00:41 +0530 Subject: [PATCH 14/14] Migrate v1 calls to v2 --- import-automation/executor/requirements.txt | 1 - scripts/earthengine/process_events.py | 15 +- scripts/earthengine/utils.py | 20 +- .../common/dcid_existence.py | 21 +- scripts/fires/wfigs_data.py | 1 - scripts/glims/rgi/process.py | 9 +- .../create_place_to_grid_area_mapping.py | 28 ++- scripts/rff/preprocess_raster.py | 6 +- scripts/un/boundaries/country_boundaries.py | 23 +- .../us_census/enhanced_tmcf/process_etmcf.py | 11 +- .../us_census/geojsons_low_res/download.py | 25 ++- scripts/us_epa/util/facilities_helper.py | 14 +- scripts/us_nces/common/dcid_mcf_existance.py | 22 +- .../statvar_importer/place/place_resolver.py | 1 + util/dc_api_wrapper.py | 208 ++++-------------- util/latlng_recon_geojson.py | 3 +- util/string_utils.py | 76 +++++++ 17 files changed, 243 insertions(+), 241 deletions(-) create mode 100644 util/string_utils.py diff --git a/import-automation/executor/requirements.txt b/import-automation/executor/requirements.txt index 9ae77bed1a..43a51fe0c5 100644 --- a/import-automation/executor/requirements.txt +++ b/import-automation/executor/requirements.txt @@ -7,7 +7,6 @@ chardet chromedriver_py croniter dataclasses -datacommons datacommons_client db-dtypes duckdb diff --git a/scripts/earthengine/process_events.py b/scripts/earthengine/process_events.py index dbd2671a31..2d6e66d8c4 100644 --- a/scripts/earthengine/process_events.py +++ b/scripts/earthengine/process_events.py @@ -21,7 +21,6 @@ import pickle import sys import time -import datacommons as dc from absl import app from absl import flags @@ -74,7 +73,7 @@ from counters import Counters from latlng_recon_geojson import LatLng2Places from config_map import ConfigMap -from dc_api_wrapper import dc_api_batched_wrapper +from dc_api_wrapper import dc_api_get_property # List of place types in increasing order of preference for name. # This is used to pick the name of the place from the list of affectedPlaces @@ -699,13 +698,11 @@ def prefetch_placeid_property(self, prop: str, place_ids: list = None): if lookup_places: start_time = time.perf_counter() - cache_dict.update( - dc_api_batched_wrapper(function=dc.get_property_values, - dcids=lookup_places, - args={ - 'prop': prop, - }, - config=self._config)) + place_props = dc_api_get_property(lookup_places, prop) + for placeid, prop_value in place_props.items(): + value = prop_value.get(prop) + if value: + cache_dict[placeid] = value end_time = time.perf_counter() self._counters.add_counter(f'dc_api_lookup_{prop}_count', len(lookup_places)) diff --git a/scripts/earthengine/utils.py b/scripts/earthengine/utils.py index 2f014c7dd2..21b3def784 100644 --- a/scripts/earthengine/utils.py +++ b/scripts/earthengine/utils.py @@ -26,7 +26,6 @@ from typing import Union from absl import logging -import datacommons as dc from dateutil.relativedelta import relativedelta from geopy import distance import s2sphere @@ -41,7 +40,8 @@ os.path.join(os.path.dirname(os.path.dirname(_SCRIPTS_DIR)), 'util')) from config_map import ConfigMap, read_py_dict_from_file, write_py_dict_to_file -from dc_api_wrapper import dc_api_wrapper +from dc_api_wrapper import dc_api_get_node_property +from string_utils import str_to_list # Constants _MAX_LATITUDE = 90.0 @@ -368,23 +368,13 @@ def place_id_to_lat_lng(placeid: str, # Get the lat/lng from the DC API latlng = [] for prop in ['latitude', 'longitude']: - # dc.utils._API_ROOT = 'http://autopush.api.datacommons.org' - # resp = dc.get_property_values([placeid], prop) - resp = dc_api_wrapper( - function=dc.get_property_values, - args={ - 'dcids': [placeid], - 'prop': prop, - }, - use_cache=True, - api_root=_DC_API_ROOT, - ) + resp = dc_api_get_node_property([placeid], prop) if not resp or placeid not in resp: return (0, 0) - values = resp[placeid] + values = str_to_list(resp[placeid].get(prop)) if not len(values): return (0, 0) - latlng.append(float(values[0])) + latlng.append(float(values[0].strip('"'))) lat = latlng[0] lng = latlng[1] return (lat, lng) diff --git a/scripts/eurostat/health_determinants/common/dcid_existence.py b/scripts/eurostat/health_determinants/common/dcid_existence.py index f6fe6e2f17..7bd3a5deb1 100644 --- a/scripts/eurostat/health_determinants/common/dcid_existence.py +++ b/scripts/eurostat/health_determinants/common/dcid_existence.py @@ -14,6 +14,13 @@ """_summary_ Script to check the property/dcid/nodes existance in datacommons.org. """ +import os +import sys + +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(os.path.join(_SCRIPT_DIR, '../../../../util')) + +from dc_api_wrapper import dc_api_is_defined_dcid import datacommons @@ -30,17 +37,7 @@ def check_dcid_existence(nodes: list) -> dict: dict: Status dictionary. """ # pylint: disable=protected-access - nodes_response = datacommons.get_property_values( - nodes, - "typeOf", - out=True, - value_type=None, - limit=datacommons.utils._MAX_LIMIT) + # pylint: disable=protected-access + node_status = dc_api_is_defined_dcid(nodes) # pylint: enable=protected-access - node_status = {} - for node, value in nodes_response.items(): - if value == []: - node_status[node] = False - else: - node_status[node] = True return node_status diff --git a/scripts/fires/wfigs_data.py b/scripts/fires/wfigs_data.py index 60d21b03d9..1ce395bdfb 100644 --- a/scripts/fires/wfigs_data.py +++ b/scripts/fires/wfigs_data.py @@ -27,7 +27,6 @@ import pickle import re import requests -import datacommons as dc import sys _SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__)) diff --git a/scripts/glims/rgi/process.py b/scripts/glims/rgi/process.py index 450b4abd14..557eea8c5f 100644 --- a/scripts/glims/rgi/process.py +++ b/scripts/glims/rgi/process.py @@ -18,6 +18,11 @@ import json import glob import os +import sys + +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(os.path.join(_SCRIPT_DIR, '../../../util')) +from dc_api_wrapper import dc_api_get_values from shapely import geometry from absl import app from absl import flags @@ -30,14 +35,14 @@ def _load_geojsons(): countries = dc.get_places_in(['Earth'], 'Country')['Earth'] - resp = dc.get_property_values(countries, 'geoJsonCoordinatesDP2') + resp = dc_api_get_values(countries, 'geoJsonCoordinatesDP2') geojsons = {} for p, gj in resp.items(): if not gj: continue geojsons[p] = geometry.shape(json.loads(gj[0])) print('Got', len(geojsons), 'geojsons!') - cip = dc.get_property_values(countries, 'containedInPlace') + cip = dc_api_get_values(countries, 'containedInPlace') return geojsons, cip diff --git a/scripts/noaa/gpcc_spi/create_place_to_grid_area_mapping.py b/scripts/noaa/gpcc_spi/create_place_to_grid_area_mapping.py index 19d0b2560b..bdd87c1ede 100644 --- a/scripts/noaa/gpcc_spi/create_place_to_grid_area_mapping.py +++ b/scripts/noaa/gpcc_spi/create_place_to_grid_area_mapping.py @@ -23,15 +23,22 @@ """ from shapely import geometry -import datacommons as dc import concurrent.futures from typing import List, Optional import json import csv +import sys +import os from absl import flags from absl import app +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(_SCRIPT_DIR) +sys.path.append(os.path.join(_SCRIPT_DIR.split('/data/', 1)[0], 'data', 'util')) + +import dc_api_wrapper as dc_api + FLAGS = flags.FLAGS flags.DEFINE_string('gpcc_spi_places_with_csv', @@ -59,20 +66,27 @@ def construct_one_degree_grid_polygons() -> List[geometry.box]: def get_place_by_type(parent_places, places_types: List[str]) -> List[str]: """Return the place ids of all places contained in a set of parent places.""" + dc_api_client = dc_api.get_datacommons_client() all_types_of_places = [] for place_type in places_types: - parent_place_to_places = dc.get_places_in(parent_places, place_type) - for places in parent_place_to_places.values(): - for place in places: - all_types_of_places.extend(place) + parent_place_to_places = dc_api.dc_api_batched_wrapper( + dc_api_client.node.fetch_place_descendants, + parent_places, {'descendants_type': place_type}, + dcid_arg_kw='place_dcids') + for child_places in parent_place_to_places.values(): + for place in child_places: + place_dcid = place.get('dcid') + if place_dcid: + all_types_of_places.append(place_dcid) return all_types_of_places def places_to_geo_jsons(places: List[str]): """Get geojsons for a list of places.""" - resp = dc.get_property_values(places, 'geoJsonCoordinates') + resp = dc_api.dc_api_get_node_property(places, 'geoJsonCoordinates') geojsons = {} - for p, gj in resp.items(): + for p, gj_value in resp.items(): + gj = gj_value.get('geoJsonCoordinates') if not gj: continue geojsons[p] = geometry.shape(json.loads(gj[0])) diff --git a/scripts/rff/preprocess_raster.py b/scripts/rff/preprocess_raster.py index 2aff529ae7..fbdf7fa836 100644 --- a/scripts/rff/preprocess_raster.py +++ b/scripts/rff/preprocess_raster.py @@ -10,6 +10,8 @@ RFF_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.append(RFF_DIR) +sys.path.append(os.path.join(RFF_DIR, '../../util')) +from dc_api_wrapper import dc_api_get_values from rff import util bandname_to_gdcStatVars = { @@ -37,11 +39,11 @@ def get_dcid(sp_scale, lat, lon): def get_county_geoid(lat, lon): counties = dc.get_places_in(['country/USA'], 'County')['country/USA'] - counties_simp = dc.get_property_values(counties, 'geoJsonCoordinatesDP1') + counties_simp = dc_api_get_values(counties, 'geoJsonCoordinatesDP1') point = geometry.Point(lon, lat) for p, gj in counties_simp.items(): if len(gj) == 0: - gj = dc.get_property_values([p], 'geoJsonCoordinates')[p] + gj = dc_api_get_values([p], 'geoJsonCoordinates')[p] if len(gj) == 0: # property not defined for one county in alaska continue if geometry.shape(json.loads(gj[0])).contains(point): diff --git a/scripts/un/boundaries/country_boundaries.py b/scripts/un/boundaries/country_boundaries.py index 9c519e7518..00ebc75d9c 100644 --- a/scripts/un/boundaries/country_boundaries.py +++ b/scripts/un/boundaries/country_boundaries.py @@ -23,16 +23,23 @@ from typing import Dict -import datacommons as dc import geopandas as gpd from geojson_rewind import rewind import json import os +import sys import requests from absl import app from absl import flags +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(_SCRIPT_DIR) +sys.path.append(os.path.join(_SCRIPT_DIR.split('/data/', 1)[0], 'data', 'util')) + +import dc_api_wrapper as dc_api +from string_utils import str_to_list + FLAGS = flags.FLAGS flags.DEFINE_string('input_file', 'data/UNGIS_BNDA.geojson', 'Input geojson file') @@ -194,10 +201,10 @@ def existing_codes(self, all_countries): Only countries with DCID of the form county/{code} are included. """ # Call DC API to get list of countries - dc_all_countries = dc.get_property_values(['Country'], - 'typeOf', - out=False, - limit=500)['Country'] + dc_all_countries = str_to_list( + dc_api.dc_api_get_property(['Country'], 'typeOf', + out=False).get('Country', + {}).get('typeOf', '')) dc_all_countries = set(dc_all_countries) def is_dc_country(iso): @@ -257,8 +264,10 @@ def build_cache(self, existing_codes): all_children.update(children) child2name = {} - for child, values in dc.get_property_values(list(all_children), - 'name').items(): + children_names = dc_api.dc_api_get_node_property( + list(all_children), 'name') + for child, prop_values in children_names.items(): + values = str_to_list(prop_values.get('name', '')) if values: child2name[child] = values[0] diff --git a/scripts/us_census/enhanced_tmcf/process_etmcf.py b/scripts/us_census/enhanced_tmcf/process_etmcf.py index fd51dbbf2d..d08567ddc2 100644 --- a/scripts/us_census/enhanced_tmcf/process_etmcf.py +++ b/scripts/us_census/enhanced_tmcf/process_etmcf.py @@ -1,6 +1,13 @@ import csv import datacommons as dc import os +import sys + +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(os.path.join(_SCRIPT_DIR, '../../../util')) + +from dc_api_wrapper import dc_api_get_node_property +import datacommons as dc from absl import app from absl import flags @@ -70,9 +77,9 @@ def _get_places_not_found(census_geoids: List[str]) -> List[str]: for i in range(0, len(geo_ids), NUM_DCIDS_TO_QUERY): selected_geo_ids = geo_ids[i:i + NUM_DCIDS_TO_QUERY] selected_dcids = [geoId_to_dcids[g] for g in selected_geo_ids] - res = dc.get_property_values(selected_dcids, 'name') + res = dc_api_get_node_property(selected_dcids, 'name') for index in range(len(selected_dcids)): - if not res[selected_dcids[index]]: + if selected_dcids[index] not in res: geoIds_not_found.append(selected_geo_ids[index]) return geoIds_not_found diff --git a/scripts/us_census/geojsons_low_res/download.py b/scripts/us_census/geojsons_low_res/download.py index 0be1fdbd34..cbdb7359e4 100644 --- a/scripts/us_census/geojsons_low_res/download.py +++ b/scripts/us_census/geojsons_low_res/download.py @@ -20,6 +20,11 @@ import datacommons as dc import geojson import os +import sys + +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(os.path.join(_SCRIPT_DIR, '../../../util')) +from dc_api_wrapper import dc_api_node_get_property class GeojsonDownloader: @@ -88,10 +93,11 @@ def download_data(self, place='country/USA', level=1): Raises: ValueError: If a Data Commons API call fails. """ - geolevel = dc.get_property_values([place], "typeOf") + place_types = dc_api.dc_api_node_get_property([place], "typeOf") + geo_level = str_to_list(place_types.get(place, {}).get("typeOf", "")) # There is an extra level of nesting in geojson files, so we have # to get the 0th element explicitly. - assert len(geolevel[place]) == 1 + assert len(geo_level) >= 1 geolevel = geolevel[place][0] for i in range(level): @@ -99,9 +105,18 @@ def download_data(self, place='country/USA', level=1): raise ValueError("Desired level does not exist.") geolevel = self.LEVEL_MAP[geolevel] - geos_contained_in_place = dc.get_places_in([place], geolevel)[place] - self.geojsons = dc.get_property_values(geos_contained_in_place, - "geoJsonCoordinates") + dc_api_client = dc_api.get_datacommons_client() + descendant_places = dc_api.dc_api_batched_wrapper( + dc_api_client.node.fetch_place_descendants, [place], { + 'descendants_type': geolevel + }, + dcid_arg_kw='place_dcids').get(place, {}) + geos_contained_in_place = [ + place_name.get('dcid') for place_name in descendant_places + ] + + self.geojsons = dc_api_get_values(geos_contained_in_place, + "geoJsonCoordinates") for area, coords in self.iter_subareas(): self.geojsons[area][0] = geojson.loads(coords) diff --git a/scripts/us_epa/util/facilities_helper.py b/scripts/us_epa/util/facilities_helper.py index 7662c36937..75ed450046 100644 --- a/scripts/us_epa/util/facilities_helper.py +++ b/scripts/us_epa/util/facilities_helper.py @@ -15,6 +15,7 @@ import os import ssl +import sys import datacommons import json @@ -25,6 +26,13 @@ from requests.structures import CaseInsensitiveDict from requests.exceptions import HTTPError +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(_SCRIPT_DIR) +sys.path.append(os.path.join(_SCRIPT_DIR.split('/data/', 1)[0], 'data', 'util')) + +import dc_api_wrapper as dc_api +from string_utils import str_to_list + _COUNTY_CANDIDATES_CACHE = {} @@ -116,11 +124,11 @@ def get_county_candidates(zcta): return _COUNTY_CANDIDATES_CACHE[zcta] candidate_lists = [] for prop in ['containedInPlace', 'geoOverlaps']: - resp = datacommons.get_property_values([zcta], + resp = dc_api.dc_api_get_node_property([zcta], prop, out=True, - value_type='County') - candidate_lists.append(sorted(resp[zcta])) + constraints={'typeOf': 'County'}) + candidate_lists.append(sorted(str_to_list(resp.get(zcta).get(prop, '')))) _COUNTY_CANDIDATES_CACHE[zcta] = candidate_lists return candidate_lists diff --git a/scripts/us_nces/common/dcid_mcf_existance.py b/scripts/us_nces/common/dcid_mcf_existance.py index 9c3d807112..d806fdc0c8 100644 --- a/scripts/us_nces/common/dcid_mcf_existance.py +++ b/scripts/us_nces/common/dcid_mcf_existance.py @@ -14,7 +14,13 @@ """ Script to check if properties, DCIDs, or nodes exist in datacommons.org. """ -import datacommons +import os +import sys + +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(os.path.join(_SCRIPT_DIR, '../../../../util')) + +from dc_api_wrapper import dc_api_is_defined_dcid def check_dcid_existence(nodes: list) -> dict: @@ -28,17 +34,7 @@ def check_dcid_existence(nodes: list) -> dict: dict: Status dictionary. """ # pylint: disable=protected-access - nodes_response = datacommons.get_property_values( - nodes, - "typeOf", - out=True, - value_type=None, - limit=datacommons.utils._MAX_LIMIT) + # pylint: disable=protected-access + node_status = dc_api_is_defined_dcid(nodes) # pylint: enable=protected-access - node_status = {} - for node, value in nodes_response.items(): - if value == []: - node_status[node] = False - else: - node_status[node] = True return node_status diff --git a/tools/statvar_importer/place/place_resolver.py b/tools/statvar_importer/place/place_resolver.py index 330677cb0f..79f78d18b1 100644 --- a/tools/statvar_importer/place/place_resolver.py +++ b/tools/statvar_importer/place/place_resolver.py @@ -861,6 +861,7 @@ def filter_by_pvs( lookup_dcids_by_prop = {p: [] for p in lookup_props} for dcid in dcids: + place_props = places.get(dcid, {}) for prop in lookup_props: value = place_props.get(prop, '') if not value: diff --git a/util/dc_api_wrapper.py b/util/dc_api_wrapper.py index 2e084eb766..e08e4c5f62 100644 --- a/util/dc_api_wrapper.py +++ b/util/dc_api_wrapper.py @@ -18,9 +18,6 @@ DC V2 API requires an environment variable set for DC_API_KEY. Please refer to https://docs.datacommons.org/api/python/v2 for more details. - -To use the legacy datacommons library module, set the config: - 'dc_api_version': 'V1' """ from collections import OrderedDict @@ -34,7 +31,6 @@ from absl import logging from datacommons_client.client import DataCommonsClient from datacommons_client.utils.error_handling import DCConnectionError, DCStatusError, APIError -import datacommons as dc import requests_cache _SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) @@ -42,30 +38,20 @@ from download_util import request_url -# Path for reconciliation API in the dc.utils._API_ROOT -# For more details, please refer to: -# https://github.com/datacommonsorg/reconciliation#usage -# Resolve Id -# https://api.datacommons.org/v1/recon/resolve/id -_DC_API_PATH_RESOLVE_ID = '/v1/recon/resolve/id' # Resolve latlng coordinate # https://api.datacommons.org/v2/resolve _DC_API_PATH_RESOLVE_COORD = '/v2/resolve' # Default API key for limited tests _DEFAULT_DC_API_KEY = 'AIzaSyCTI4Xz-UW_G2Q2RfknhcfdAnTHq5X5XuI' -_API_ROOT_LOCK = threading.Lock() _DEFAULT_API_ROOT = 'https://api.datacommons.org' -def dc_api_wrapper( - function, - args: dict, - retries: int = 3, - retry_secs: int = 1, - use_cache: bool = False, - api_root: str = None, -): +def dc_api_wrapper(function, + args: dict, + retries: int = 3, + retry_secs: int = 1, + use_cache: bool = False): """Wrapper for a DC API call with retries and caching. Returns the result from the DC APi call function. In case of errors, retries @@ -78,9 +64,6 @@ def dc_api_wrapper( retries: Number of retries in case of HTTP errors. retry_sec: Interval in seconds between retries for which caller is blocked. use_cache: If True, uses request cache for faster response. - api_root: The API server to use. Default is 'http://api.datacommons.org'. To - use autopush with more recent data, set it to - 'http://autopush.api.datacommons.org' Returns: The response from the DataCommons API call. @@ -104,23 +87,7 @@ def dc_api_wrapper( f'Invoking DC API {function}, #{attempt} with {args},' f' retries={retries}') - response = None - if api_root: - # All calls serialize here to prevent races while updating the - # global Data Commons API root. - with _API_ROOT_LOCK: - original_api_root = dc.utils._API_ROOT - if api_root: - dc.utils._API_ROOT = api_root - logging.debug( - f'Setting DC API root to {api_root} for {function}' - ) - try: - response = function(**args) - finally: - dc.utils._API_ROOT = original_api_root - else: - response = function(**args) + response = function(**args) logging.debug( f'Got API response {response} for {function}, {args}') @@ -211,11 +178,7 @@ def dc_api_batched_wrapper( api_result = {} index = 0 num_dcids = len(dcids) - dc_api_root = config.get('dc_api_root', None) - if config.get('dc_api_version', 'V2') == 'V2': - # V2 API assumes api root is set in the function's client - dc_api_root = None - api_batch_size = config.get('dc_api_batch_size', dc.utils._MAX_LIMIT) + api_batch_size = config.get('dc_api_batch_size', 100) logging.debug( f'Calling DC API {function} on {len(dcids)} dcids in batches of' f' {api_batch_size} with args: {args}...') @@ -232,7 +195,6 @@ def dc_api_batched_wrapper( config.get('dc_api_retries', 3), config.get('dc_api_retry_secs', 5), config.get('dc_api_use_cache', False), - dc_api_root, ) if batch_result: dc_api_merge_results(api_result, batch_result) @@ -324,15 +286,6 @@ def dc_api_is_defined_dcid(dcids: list, config: dict = {}) -> dict: api_function = client.node.fetch_property_values args = {'properties': 'typeOf'} dcid_arg_kw = 'node_dcids' - if config.get('dc_api_version', 'V2') != 'V2': - # Set parameters for V1 API. - api_function = dc.get_property_values - args = { - 'prop': 'typeOf', - 'out': True, - } - dcid_arg_kw = 'dcids' - api_result = dc_api_batched_wrapper(function=api_function, dcids=dcids, args=args, @@ -348,33 +301,31 @@ def dc_api_is_defined_dcid(dcids: list, config: dict = {}) -> dict: return response -def dc_api_get_node_property(dcids: list, prop: str, config: dict = {}) -> dict: +def dc_api_get_node_property(dcids: list, + prop: str, + out: bool = True, + constraints: dict = {}, + config: dict = {}) -> dict: """Returns a dictionary keyed by dcid with { prop:value } for each dcid. Uses the get_property_values() DC API to lookup the property for each dcid. Args: dcids: List of dcids. The namespace is stripped from the dcid. + prop: proroty to be looked up. + out: If true, lookup values of the property for the given dcids. + If False, returns dcids for which the property has the value in dcids. config: dictionary of configurationparameters for the wrapper. See dc_api_batched_wrapper and dc_api_wrapper for details. Returns: dictionary with each input dcid mapped to a True/False value. """ - is_v2 = config.get('dc_api_version', 'V2') == 'V2' # Set parameters for V2 node API. client = get_datacommons_client(config) api_function = client.node.fetch_property_values - args = {'properties': prop} + args = {'properties': prop, 'out': out, 'constraints': constraints} dcid_arg_kw = 'node_dcids' - if not is_v2: - # Set parameters for V1 API. - api_function = dc.get_property_values - args = { - 'prop': prop, - 'out': True, - } - dcid_arg_kw = 'dcids' api_result = dc_api_batched_wrapper(function=api_function, dcids=dcids, args=args, @@ -387,23 +338,19 @@ def dc_api_get_node_property(dcids: list, prop: str, config: dict = {}) -> dict: if not node_data: continue - if is_v2: - values = [] - arcs = node_data.get('arcs', {}) - prop_nodes = arcs.get(prop, {}).get('nodes', []) - for node in prop_nodes: - val_dcid = node.get('dcid') - if val_dcid: - values.append(val_dcid) - value = node.get('value') - if value: - value = '"' + value + '"' - values.append(value) - if values: - response[dcid] = {prop: ','.join(values)} - else: # V1 - if node_data: - response[dcid] = {prop: node_data} + values = [] + arcs = node_data.get('arcs', {}) + prop_nodes = arcs.get(prop, {}).get('nodes', []) + for node in prop_nodes: + val_dcid = node.get('dcid') + if val_dcid: + values.append(val_dcid) + value = node.get('value') + if value: + value = '"' + value + '"' + values.append(value) + if values: + response[dcid] = {prop: ','.join(values)} return response @@ -419,8 +366,6 @@ def dc_api_get_node_property_values(dcids: list, config: dict = {}) -> dict: dictionary with each dcid with the namspace 'dcid:' as the key mapped to a dictionary of property:value. """ - if config.get('dc_api_version', 'V2') != 'V2': - return dc_api_v1_get_node_property_values(dcids, config) # Lookup node properties using V2 node API client = get_datacommons_client(config) api_function = client.node.fetch @@ -454,42 +399,6 @@ def dc_api_get_node_property_values(dcids: list, config: dict = {}) -> dict: return response -def dc_api_v1_get_node_property_values(dcids: list, config: dict = {}) -> dict: - """Returns all the property values for a set of dcids from the DC V1 API. - - Args: - dcids: list of dcids to lookup - config: configuration parameters for the wrapper. See - dc_api_batched_wrapper() and dc_api_wrapper() for details. - - Returns: - dictionary with each dcid with the namspace 'dcid:' as the key - mapped to a dictionary of property:value. - """ - predefined_nodes = OrderedDict() - api_function = dc.get_triples - api_triples = dc_api_batched_wrapper(api_function, dcids, {}, config=config) - if api_triples: - for dcid, triples in api_triples.items(): - if (_strip_namespace(dcid) not in dcids and - _add_namespace(dcid) not in dcids): - continue - pvs = {} - for d, prop, val in triples: - if d == dcid and val: - # quote string values with spaces if needed - if ' ' in val and val[0] != '"': - val = '"' + val + '"' - if prop in pvs: - val = pvs[prop] + ',' + val - pvs[prop] = val - if len(pvs) > 0: - if 'Node' not in pvs: - pvs['Node'] = _add_namespace(dcid) - predefined_nodes[_add_namespace(dcid)] = pvs - return predefined_nodes - - def dc_api_resolve_placeid(dcids: list, in_prop: str = 'placeId', *, @@ -507,48 +416,25 @@ def dc_api_resolve_placeid(dcids: list, """ if not config: config = {} - if config.get('dc_api_version', 'V2') == 'V2': - client = get_datacommons_client(config) - api_function = client.resolve.fetch - args = {'expression': f'<-{in_prop}->dcid'} - dcid_arg_kw = 'node_ids' - api_result = dc_api_batched_wrapper(function=api_function, - dcids=dcids, - args=args, - dcid_arg_kw=dcid_arg_kw, - config=config) - results = {} - if api_result: - for node in api_result.get('entities', []): - place_id = node.get('node') - if place_id: - candidates = node.get('candidates', []) - if candidates: - dcid = candidates[0].get('dcid') - if dcid: - results[place_id] = dcid - return results - - # V1 implementation - api_root = config.get('dc_api_root', _DEFAULT_API_ROOT) - data = {'in_prop': in_prop, 'out_prop': 'dcid'} - data['ids'] = dcids - num_ids = len(dcids) - api_url = api_root + _DC_API_PATH_RESOLVE_ID - logging.debug( - f'Looking up {api_url} dcids for {num_ids} placeids: {data["ids"]}') - recon_resp = request_url(url=api_url, - params=data, - method='POST', - output='json') - # Extract the dcid for each place from the response + client = get_datacommons_client(config) + api_function = client.resolve.fetch + args = {'expression': f'<-{in_prop}->dcid'} + dcid_arg_kw = 'node_ids' + api_result = dc_api_batched_wrapper(function=api_function, + dcids=dcids, + args=args, + dcid_arg_kw=dcid_arg_kw, + config=config) results = {} - if recon_resp: - for entity in recon_resp.get('entities', []): - place_id = entity.get('inId', '') - out_dcids = entity.get('outIds', None) - if place_id and out_dcids: - results[place_id] = out_dcids[0] + if api_result: + for node in api_result.get('entities', []): + place_id = node.get('node') + if place_id: + candidates = node.get('candidates', []) + if candidates: + dcid = candidates[0].get('dcid') + if dcid: + results[place_id] = dcid return results diff --git a/util/latlng_recon_geojson.py b/util/latlng_recon_geojson.py index 9a301e543e..b0a89e01fa 100644 --- a/util/latlng_recon_geojson.py +++ b/util/latlng_recon_geojson.py @@ -22,6 +22,7 @@ import logging import time import urllib +from dc_api_wrapper import dc_api_get_values _WORLD = 'Earth' _USA = 'country/USA' @@ -39,7 +40,7 @@ def _get_geojsons(place_type, parent_place, retry=0): try: places = dc.get_places_in([parent_place], place_type)[parent_place] - resp = dc.get_property_values(places, _GJ_PROP[place_type]) + resp = dc_api_get_values(places, _GJ_PROP[place_type]) geojsons = {} for p, gj in resp.items(): if not gj: diff --git a/util/string_utils.py b/util/string_utils.py new file mode 100644 index 0000000000..c4ef88b9d8 --- /dev/null +++ b/util/string_utils.py @@ -0,0 +1,76 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""String utilities.""" + +import csv + + +def str_to_list(value: str) -> list: + """Returns the value as a list + + Args: + value: string with a single value or comma seperated list of values + + Returns: + value as a list. + """ + if isinstance(value, list): + return value + value_list = [] + # Read the string as a comma separated line. + is_quoted = '"' in value + try: + if is_quoted and "," in value: + # Read the string as a quoted comma separated line. + row = list( + csv.reader([value], + delimiter=',', + quotechar='"', + skipinitialspace=True))[0] + else: + # Without " quotes, the line can be split on commas. + # Avoiding csv reader calls for performance. + row = value.split(',') + for v in row: + val_normalized = to_quoted_string(v, is_quoted=is_quoted) + value_list.append(val_normalized) + except csv.Error: + logging.error( + f'Too large value {len(value)}, failed to convert to list') + value_list = [value] + return value_list + +def to_quoted_string(value: str, is_quoted: bool = None) -> str: + """Returns a quoted string if there are spaces and special characters. + + Args: + value: string value to be quoted if necessary. + is_quoted: if True, returns values as quotes strings. + + Returns: + value with optional double quotes. + """ + if not value or not isinstance(value, str): + return value + + value = value.strip('"') + value = value.strip() + if value.startswith('[') and value.endswith(']'): + return normalize_range(value) + if value and (' ' in value or ',' in value or is_quoted): + if value and value[0] != '"': + return '"' + value + '"' + return value + +