Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion tools/agentic_import/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ This guide describes the complete process for importing CSV data into Data Commo
## Table of Contents

- [Agentic Import Tool for Data Commons](#agentic-import-tool-for-data-commons)
- [Table of Contents](#table-of-contents)
- [SDMX Quick Links](#sdmx-quick-links)
- [Prerequisites](#prerequisites)
- [Required Tools](#required-tools)
- [Setup](#setup)
Expand All @@ -26,6 +26,12 @@ This guide describes the complete process for importing CSV data into Data Commo
- [Gemini CLI Debugging](#gemini-cli-debugging)
- [Log Structure](#log-structure)

## SDMX Quick Links

- [SDMX import pipeline (end-to-end)](sdmx_import_pipeline.md)
- [SDMX Downloads (section)](#sdmx-downloads)
- [SDMX CLI documentation](../sdmx_import/README.md)

## Prerequisites

Before starting the import process, ensure you have the following installed and configured:
Expand Down Expand Up @@ -112,6 +118,7 @@ working_directory/
#### SDMX Downloads

Refer to the [SDMX CLI documentation](../sdmx_import/README.md) for details on downloading SDMX data and metadata files.
See the [SDMX import pipeline](sdmx_import_pipeline.md) for the end-to-end SDMX flow.

Extract a simplified, token-efficient JSON metadata copy from `metadata.xml`, retaining the original XML for later PV map generation.

Expand Down
35 changes: 32 additions & 3 deletions tools/agentic_import/generate_custom_dc_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@
'Path to output config.json file (required)')
flags.mark_flag_as_required('output_config')

flags.DEFINE_string('provenance_name', '<YOUR_PROVENANCE_NAME>',
'Name of the provenance')
flags.DEFINE_string('source_name', '<YOUR_SOURCE_NAME>', 'Name of the source')
flags.DEFINE_string('data_source_url', '<URL_OF_YOUR_DATA_SOURCE>',
'URL of the data source')
flags.DEFINE_string('dataset_url', '<URL_OF_THE_SPECIFIC_DATASET>',
'URL of the specific dataset')


class ConfigGenerator:
"""Generates Data Commons custom import config from CSV output."""
Expand Down Expand Up @@ -68,9 +76,19 @@ class ConfigGenerator:
'observationPeriod': 'observationPeriod'
}

def __init__(self, input_csv_path: str, output_config_path: str):
def __init__(self,
input_csv_path: str,
output_config_path: str,
provenance_name: str = '<YOUR_PROVENANCE_NAME>',
source_name: str = '<YOUR_SOURCE_NAME>',
data_source_url: str = '<URL_OF_YOUR_DATA_SOURCE>',
dataset_url: str = '<URL_OF_THE_SPECIFIC_DATASET>'):
self._input_csv_path = os.path.abspath(input_csv_path)
self._output_config_path = os.path.abspath(output_config_path)
self._provenance_name = provenance_name
self._source_name = source_name
self._data_source_url = data_source_url
self._dataset_url = dataset_url

def validate_input_file(self) -> None:
"""Validates that input CSV file exists and is readable."""
Expand Down Expand Up @@ -179,7 +197,11 @@ def generate_config(self, column_mappings: Dict[str, str]) -> str:

# Render template with context
config_content = template.render(output_filename=output_filename,
column_mappings=column_mappings)
column_mappings=column_mappings,
provenance_name=self._provenance_name,
source_name=self._source_name,
data_source_url=self._data_source_url,
dataset_url=self._dataset_url)

return config_content

Expand Down Expand Up @@ -228,7 +250,14 @@ def main(argv):
if len(argv) > 1:
raise app.UsageError('Too many command-line arguments.')

generator = ConfigGenerator(FLAGS.input_csv, FLAGS.output_config)
generator = ConfigGenerator(
input_csv_path=FLAGS.input_csv,
output_config_path=FLAGS.output_config,
provenance_name=FLAGS.provenance_name,
source_name=FLAGS.source_name,
data_source_url=FLAGS.data_source_url,
dataset_url=FLAGS.dataset_url,
)
generator.run()


Expand Down
23 changes: 17 additions & 6 deletions tools/agentic_import/sdmx_import_pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The SDMX Agentic Import Pipeline is a Python-based system designed to automate t

The pipeline orchestrates several tools to handle the end-to-end import process:
1. **Download**: Retrieves data and metadata from SDMX endpoints.
2. **Metadata Extraction**: Converts SDMX metadata into JSON for downstream steps.
3. **Sample**: Creates a manageable sample of the data for analysis.
4. **Schema Mapping**: Generates Property-Value (PV) mappings using LLM-based tools.
5. **Full Data Processing**: Converts the full dataset into Data Commons MCF and CSV formats.
Expand Down Expand Up @@ -38,6 +39,9 @@ python $DC_DATA_REPO_PATH/tools/agentic_import/sdmx_import_pipeline.py \
--sdmx.endpoint="https://sdmx.example.org/data" \
--sdmx.agency="AGENCY_ID" \
--sdmx.dataflow.id="DATAFLOW_ID" \
--sdmx.dataflow.key="FREQ:Q" \
--sdmx.dataflow.key="REF_AREA:USA" \
--sdmx.dataflow.param="startPeriod:2022" \
--dataset_prefix="my_dataset"
```

Expand All @@ -46,11 +50,14 @@ python $DC_DATA_REPO_PATH/tools/agentic_import/sdmx_import_pipeline.py \
- `--sdmx.endpoint`: The SDMX API endpoint URL.
- `--sdmx.agency`: The SDMX agency ID.
- `--sdmx.dataflow.id`: The SDMX dataflow ID.
- `--sdmx.dataflow.key`: (Optional) Filter key for data download.
- `--sdmx.dataflow.param`: (Optional) Additional parameters for data download.
- `--sdmx.dataflow.key`: (Optional) Repeatable filter for data download, for example `--sdmx.dataflow.key=FREQ:Q --sdmx.dataflow.key=REF_AREA:USA`.
- `--sdmx.dataflow.param`: (Optional) Repeatable query parameter, for example `--sdmx.dataflow.param=startPeriod:2022 --sdmx.dataflow.param=endPeriod:2023`.
- `--dataset_prefix`: (Optional) Prefix for generated artifacts. Useful for disambiguating multiple datasets in the same working directory. If not provided, it is derived from the dataflow ID.
- `--sample.rows`: Number of rows for the sample dataset (default: 1000).
- `--force`: Force re-execution of all steps, ignoring saved state.
- `--run_only`: Execute only a single pipeline step by name.
- `--run_from`: Execute pipeline steps starting at the named step (inclusive).
- `--run_until`: Execute pipeline steps through the named step (inclusive).
- `--skip_confirmation`: Skip interactive confirmation prompts during schema mapping.
- `--verbose`: Enable verbose logging.

Expand All @@ -60,10 +67,11 @@ The pipeline consists of the following steps, executed in order:

1. **DownloadDataStep**: Downloads SDMX data to `<dataset_prefix>_data.csv`.
2. **DownloadMetadataStep**: Downloads SDMX metadata to `<dataset_prefix>_metadata.xml`.
3. **CreateSampleStep**: Creates `<dataset_prefix>_sample.csv` from the downloaded data.
4. **CreateSchemaMapStep**: Generates PV map and config in `sample_output/` using `pvmap_generator.py`.
5. **ProcessFullDataStep**: Processes the full data using `stat_var_processor.py` to generate artifacts in `output/`.
6. **CreateDcConfigStep**: Generates `output/<dataset_prefix>_config.json` for custom DC imports.
3. **ExtractMetadataStep**: Extracts SDMX metadata to `<dataset_prefix>_metadata.json`.
4. **CreateSampleStep**: Creates `<dataset_prefix>_sample.csv` from the downloaded data.
5. **CreateSchemaMapStep**: Generates PV map and config in `sample_output/` using `pvmap_generator.py`.
6. **ProcessFullDataStep**: Processes the full data using `stat_var_processor.py` to generate artifacts in `output/`.
7. **CreateDcConfigStep**: Generates `output/<dataset_prefix>_config.json` for custom DC imports.

## Directory Structure

Expand All @@ -73,6 +81,7 @@ The pipeline organizes outputs within the specified working directory:
working_dir/
├── <dataset_prefix>_data.csv # Raw downloaded data
├── <dataset_prefix>_metadata.xml # Raw downloaded metadata
├── <dataset_prefix>_metadata.json # Extracted metadata for downstream steps
├── <dataset_prefix>_sample.csv # Sampled data
├── .datacommons/
│ └── <dataset_prefix>.state.json # Pipeline state for resuming runs
Expand All @@ -92,6 +101,8 @@ The pipeline automatically saves its state to a `<dataset_prefix>.state.json` fi
- **Resuming**: If a run is interrupted, running the same command again will resume from the last successful step.
- **Skipping**: Steps that have already completed successfully will be skipped unless `--force` is used.
- **Input Hashing**: The pipeline tracks input configuration. If critical configuration changes, it may trigger re-execution of steps.
- **Run Only**: Use `--run_only=<step_name>` to execute just one step (for example, `download-metadata` or `create-schema-mapping`).
- **Run Range**: Use `--run_from=<step_name>` and/or `--run_until=<step_name>` to limit execution to a contiguous range of steps (inclusive). The range respects incremental state by default; use `--force` to rerun all steps in the range.

## Troubleshooting

Expand Down
18 changes: 12 additions & 6 deletions tools/agentic_import/sdmx_import_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ def _define_flags() -> None:
"Target SDMX dataflow identifier.")
flags.mark_flag_as_required(FLAG_SDMX_DATAFLOW_ID)

flags.DEFINE_string(FLAG_SDMX_DATAFLOW_KEY, None,
"Optional SDMX key or filter.")
flags.DEFINE_multi_string(FLAG_SDMX_DATAFLOW_KEY, [],
"Optional SDMX key or filter.")

flags.DEFINE_string(
FLAG_SDMX_DATAFLOW_PARAM, None,
flags.DEFINE_multi_string(
FLAG_SDMX_DATAFLOW_PARAM, [],
"Optional SDMX parameter appended to the dataflow query.")

flags.DEFINE_integer(_FLAG_SAMPLE_ROWS, 1000,
Expand All @@ -86,6 +86,10 @@ def _define_flags() -> None:

flags.DEFINE_string("run_only", None,
"Execute only a specific pipeline step by name.")
flags.DEFINE_string("run_from", None,
"Execute pipeline steps starting at the named step.")
flags.DEFINE_string("run_until", None,
"Execute pipeline steps through the named step.")

flags.DEFINE_boolean("force", False, "Force all steps to run.")

Expand Down Expand Up @@ -303,8 +307,8 @@ def prepare_config() -> PipelineConfig:
agency=FLAGS[FLAG_SDMX_AGENCY].value,
dataflow=SdmxDataflowConfig(
id=FLAGS[FLAG_SDMX_DATAFLOW_ID].value,
key=FLAGS[FLAG_SDMX_DATAFLOW_KEY].value,
param=FLAGS[FLAG_SDMX_DATAFLOW_PARAM].value,
key=tuple(FLAGS[FLAG_SDMX_DATAFLOW_KEY].value),
param=tuple(FLAGS[FLAG_SDMX_DATAFLOW_PARAM].value),
),
),
sample=SampleConfig(rows=FLAGS[_FLAG_SAMPLE_ROWS].value,),
Expand All @@ -313,6 +317,8 @@ def prepare_config() -> PipelineConfig:
dataset_prefix=FLAGS.dataset_prefix,
working_dir=FLAGS.working_dir,
run_only=FLAGS.run_only,
run_from=FLAGS.run_from,
run_until=FLAGS.run_until,
force=FLAGS.force,
verbose=FLAGS.verbose,
skip_confirmation=FLAGS.skip_confirmation,
Expand Down
Loading
Loading