DM-54070: Add support for APDB record updates (#33)
Pull request overview
Adds end-to-end support for exporting, uploading, deduplicating, and merging APDB update records into PPDB BigQuery tables, along with supporting SQL resources and test coverage updates.
Changes:
- Introduces BigQuery “updates” subsystem (expand → load → deduplicate → merge) with SQL MERGE resources.
- Extends chunk export/upload metadata to track update-record presence and GCS location (`gcs_uri`), and adds promotable-chunk SQL/query utilities.
- Refactors tests/config to use a resource-based test schema path and adds multiple BigQuery/GCS integration tests.
Reviewed changes
Copilot reviewed 36 out of 41 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| tests/test_updates_table.py | BigQuery UpdatesTable integration tests |
| tests/test_updates_merger.py | BigQuery MERGE integration tests |
| tests/test_updates_manager.py | End-to-end updates manager test |
| tests/test_update_records.py | UpdateRecords JSON + GCS uploader tests |
| tests/test_update_record_expander.py | Unit tests for update expansion |
| tests/test_ppdb_sql.py | Switch tests to schema resource URI |
| tests/test_ppdb_bigquery.py | Adds BigQuery test case scaffolding |
| requirements.txt | Removes ppdbx-gcp from base reqs |
| python/lsst/dax/ppdb/tests/config/__init__.py | Test config package init |
| python/lsst/dax/ppdb/tests/_updates.py | Shared synthetic update-record fixtures |
| python/lsst/dax/ppdb/tests/_ppdb.py | Adds test schema URI + mixin refactor |
| python/lsst/dax/ppdb/tests/_bigquery.py | BigQuery test mixins + uploader stub helpers |
| python/lsst/dax/ppdb/sql/_ppdb_sql.py | Engine creation API change for DB init |
| python/lsst/dax/ppdb/sql/_ppdb_sql_base.py | Refactors engine/connect-args helpers |
| python/lsst/dax/ppdb/ppdb.py | Imports config from new module |
| python/lsst/dax/ppdb/ppdb_config.py | New pydantic-based config loader |
| python/lsst/dax/ppdb/config/sql/select_promotable_chunks.sql | New SQL for promotable chunk selection |
| python/lsst/dax/ppdb/config/sql/merge_diasource_updates.sql | New MERGE SQL for DiaSource updates |
| python/lsst/dax/ppdb/config/sql/merge_diaobject_updates.sql | New MERGE SQL for DiaObject updates |
| python/lsst/dax/ppdb/config/sql/merge_diaforcedsource_updates.sql | New MERGE SQL for DiaForcedSource updates |
| python/lsst/dax/ppdb/config/schemas/test_apdb_schema.yaml | Adds test schema as package data |
| python/lsst/dax/ppdb/bigquery/updates/updates_table.py | Creates/loads/dedups updates table |
| python/lsst/dax/ppdb/bigquery/updates/updates_merger.py | Merger classes for applying updates |
| python/lsst/dax/ppdb/bigquery/updates/updates_manager.py | Orchestrates download/expand/load/merge |
| python/lsst/dax/ppdb/bigquery/updates/update_records.py | Pydantic model for update records JSON |
| python/lsst/dax/ppdb/bigquery/updates/update_record_expander.py | Expands logical updates into field updates |
| python/lsst/dax/ppdb/bigquery/updates/expanded_update_record.py | Model for a single expanded update row |
| python/lsst/dax/ppdb/bigquery/updates/__init__.py | Exports updates public API |
| python/lsst/dax/ppdb/bigquery/sql_resource.py | Loads SQL from package resources |
| python/lsst/dax/ppdb/bigquery/replica_chunk_promoter.py | Promotion workflow for staged chunks |
| python/lsst/dax/ppdb/bigquery/query_runner.py | Utility for running/logging BQ jobs |
| python/lsst/dax/ppdb/bigquery/ppdb_replica_chunk_extended.py | Adds gcs_uri to chunk metadata |
| python/lsst/dax/ppdb/bigquery/ppdb_bigquery.py | Writes update_records.json; adds promotable-chunks query |
| python/lsst/dax/ppdb/bigquery/manifest.py | Tracks whether updates are included |
| python/lsst/dax/ppdb/bigquery/chunk_uploader.py | Uploads update_records.json; stores gcs_uri |
| python/lsst/dax/ppdb/bigquery/__init__.py | Exports ChunkUploader |
| python/lsst/dax/ppdb/_factory.py | Updates config import location |
| python/lsst/dax/ppdb/__init__.py | Re-exports new config module |
| pyproject.toml | Adds package data + gcp extra dep |
| docker/Dockerfile.replication | Adds build deps for Python packages |
| .gitignore | Ignores .scratch directory |
Comments suppressed due to low confidence (1)
python/lsst/dax/ppdb/tests/_bigquery.py:33
- This module imports `google.cloud.storage` unconditionally. Because `_bigquery.py` is used by multiple test cases/mixins, this can break test collection in environments where optional GCP dependencies aren't installed. Wrap these imports in `try`/`except` (similar to other tests) and skip/disable GCP-dependent helpers when unavailable.
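The guarded-import pattern suggested here might look like the following sketch. The flag `HAVE_GCS` and helper `require_gcs` are illustrative names, not identifiers from the codebase:

```python
try:
    from google.cloud import storage  # optional GCP dependency
    HAVE_GCS = True
except ImportError:
    storage = None
    HAVE_GCS = False

def require_gcs() -> None:
    """Fail with a clear message when GCS-dependent test helpers are used
    in an environment without the optional dependency installed."""
    if not HAVE_GCS:
        raise RuntimeError("google-cloud-storage is not installed; GCS helpers are disabled")
```

Test mixins can then call `require_gcs()` (or check `HAVE_GCS` in a skip condition) instead of failing at import time during test collection.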
andy-slac left a comment:
I checked what I could, but it is too much to read in one go. Anyway, I left a bunch of comments and suggestions. My main issues:
- The code is structured in a way that makes it hard to extend with additional update types. SQL code looks particularly fragile.
- Error handling needs more attention; I think there are many places where low-level exceptions can leak through.
- Dumping update records to JSON will make it harder for other people to reuse those files, I think Parquet will be easier to read.
- You want to extend the dax_apdb API instead of doing `getattr` on hardcoded field names of update records.
The `test_ppdbBigQuery` module was also renamed to `test_ppdb_bigquery`, following snake_case conventions.
This was previously referred to as "deduplication," which was misleading, because the records do not represent duplicates. They are updates on the same combination of `(table, field, record_id)` where only the latest one should be kept.
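A minimal sketch of this "keep only the latest" rule, using illustrative field names (`table`, `field`, `record_id`, `timestamp`) rather than the actual schema from this PR:

```python
def latest_updates(records: list[dict]) -> list[dict]:
    """Keep only the most recent update for each (table, field, record_id)."""
    latest: dict[tuple, dict] = {}
    for rec in records:
        key = (rec["table"], rec["field"], rec["record_id"])
        # A later timestamp wins; earlier updates to the same key are discarded.
        if key not in latest or rec["timestamp"] > latest[key]["timestamp"]:
            latest[key] = rec
    return list(latest.values())
```

In the actual implementation this selection happens in SQL over the staged BigQuery table, but the rule is the same: the records are not duplicates, only superseded versions of the same logical update.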
This also cleans up the test GCS bucket in the teardown of the test case.
Instead of a boolean flag, an update count is included in the manifest, which aligns with the record counts for the table files.
This is used for easily determining which replica chunks have associated updates during processing.
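A rough sketch of a manifest carrying an update count alongside the per-table row counts; the class and field names here are assumptions, not the PR's actual manifest model:

```python
from dataclasses import dataclass, field

@dataclass
class ChunkManifest:
    """Illustrative manifest for one replica chunk."""
    chunk_id: int
    table_row_counts: dict[str, int] = field(default_factory=dict)
    update_count: int = 0  # replaces the earlier boolean "has updates" flag

    @property
    def has_updates(self) -> bool:
        # The boolean view is still cheap to derive when needed.
        return self.update_count > 0
```

Storing a count rather than a flag keeps the manifest consistent with the table-file record counts and costs nothing when only a yes/no answer is needed.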
This adds startup and shutdown messages for replication and adjusts some message levels so that the log is not so cluttered. We generally want to see messages when replication activates and processes replica chunks. Information about wait intervals, or messages when no chunks are available to replicate, is generally not very interesting for normal operations and tends to bury other messages. These types of messages have either been removed or set to DEBUG level.
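The policy described above can be sketched with illustrative names: lifecycle messages stay at INFO, while per-iteration chatter is demoted to DEBUG:

```python
import logging

# Logger name and function are illustrative, not the actual replication code.
logger = logging.getLogger("ppdb.replication")

def run_once(chunks: list) -> int:
    """Process one polling iteration of the replication loop."""
    if not chunks:
        # Demoted from INFO: "no chunks available" only buries useful messages.
        logger.debug("No replica chunks available; waiting for next interval")
        return 0
    logger.info("Processing %d replica chunk(s)", len(chunks))
    return len(chunks)
```

Running at the default INFO level, operators then see only the messages that indicate real activity.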
Hi, @andy-slac. This should be ready for a re-review now. I rebased a significant amount of work onto previous commits, but once that became too awkward, I added new commits. All commits after 7ccf56f represent new work primarily based on your first review. I also went ahead and implemented the change mentioned in #33 (comment) on this branch after your review.
This was discussed on the DM-54070 Jira ticket, and DM-54636 has been created to track future work. I am not planning to rewrite these classes on this ticket branch, as the amount of testing and validation required for a new implementation is significant.
The error handling should be in better shape now. I added a few module-specific exception types and more granular error-handling to the
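Module-specific exception types of the kind mentioned might look like the following sketch; the class names are assumptions, not the actual ones added in this PR:

```python
class PpdbUpdateError(Exception):
    """Base class for errors raised while applying APDB update records."""

class UpdateLoadError(PpdbUpdateError):
    """Raised when expanded update records cannot be loaded into BigQuery."""

class UpdateMergeError(PpdbUpdateError):
    """Raised when merging staged updates into the target PPDB tables fails."""
```

A small hierarchy like this lets callers catch the base class for any update failure while still distinguishing the stage that failed, instead of letting low-level client exceptions propagate.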
I thought this was a great idea, and I've changed the implementation to use Parquet instead of JSON files for the update records. I stripped out the original JSON functionality, as it was unused after making this change.
This was implemented on a dax_apdb branch, which I will merge before this one.
This also makes a few minor updates to logging in several of the classes within the `updates` package.
andy-slac left a comment:
Looks OK, but I think it still misses validation of the resulting updates.
The `record_payload` method now only returns fields that were actually updated, so this should be unnecessary.
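As a rough illustration of that behavior (a stand-in, not the actual dax_apdb implementation), `record_payload` can be thought of as a diff that keeps only the changed fields:

```python
def record_payload(before: dict, after: dict) -> dict:
    """Return only the fields whose values actually changed."""
    return {k: v for k, v in after.items() if before.get(k) != v}
```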
Thank you for the re-review, @andy-slac.
I have created DM-54650 to track this.
Overview
This is a major update to the repository, adding support for propagating APDB update records to the PPDB in BigQuery. The commit history was completely rebuilt and consolidated, and using it as a point of reference may be helpful. In particular, 9e056c4 and a78ee70 contain the majority of the changes for implementing the new functionality.
As discussed previously, applying record updates using a typical RDBMS pattern of SQL `UPDATE` statements would be infeasible given BigQuery's limitations in this area. In particular, the documentation on quotas indicates that there are limits on the number of update statements which may be executed per day. Given that single replica chunks may contain millions of individual update records, executing this many queries would far exceed the quota. So a different approach was needed, where the updates are batched together and applied using a single `MERGE` statement.

The process for applying the updates implemented on this branch is as follows:
1. Export the update records for a replica chunk and upload them to GCS alongside the chunk data.
2. Expand each logical update into a common, per-field form.
3. Load the expanded records into an initial BigQuery updates table.
4. Keep only the latest update for each `(table, field, record_id)` combination.
5. Merge the surviving records into the target PPDB tables using `MERGE` statements.
Implementing this process required some major changes and additions to the existing infrastructure, outlined below.
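The batched approach can be sketched by staging updates and applying them with one generated `MERGE` statement. The table, column, and function names below are assumptions for illustration and do not reproduce the actual SQL resources in this PR:

```python
def build_merge_sql(target: str, staging: str, key: str, fields: list[str]) -> str:
    """Build a single MERGE that applies all staged field updates to a target table."""
    assignments = ", ".join(f"T.{f} = S.{f}" for f in fields)
    return (
        f"MERGE `{target}` T\n"
        f"USING `{staging}` S\n"
        f"ON T.{key} = S.{key}\n"
        f"WHEN MATCHED THEN UPDATE SET {assignments}"
    )
```

One such statement per target table replaces what would otherwise be millions of individual `UPDATE` statements, keeping the job count well within BigQuery's DML quotas.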
Major Changes
- An `updates` package was added under `bigquery`, containing the following modules:
  - `update_records` - Pydantic model for packaging a set of update records for a single replica chunk
  - `expanded_update_record` - expanded update record representing the changes in a common form within the initial BigQuery table
  - `updates_table` - encapsulation of the initial target BQ table which contains the update information (also contains functionality for "deduplicating" the update records, described above)
  - `updates_merger` - tool for merging the records in the updates table into the target APDB tables (DiaObject, etc.)
  - `updates_manager` - manages the overall process of applying the updates during promotion
- Several modules in the `bigquery` package were modified to support the new functionality:
  - `ppdb_bigquery` - update records are now written in the `store` method by serializing them to a local JSON file in the chunk directory; functionality was also moved from the `db` module in `dax_ppdb_gcp` to this class so that it is more easily accessible
- Some code was ported from `lsst-dm/dax_ppdbx_gcp` into this repository:
  - the `db` module was merged into `ppdb_bigquery`
  - the `replica_chunk_promoter` module for promoting replica chunk data from staging to production was copied into the `bigquery` package, modified, and renamed as `chunk_promoter`

Minor Changes
- The `google-cloud-bigquery` package was added as a primary dependency. The repository now depends heavily on this library after the updates in this ticket, so making it optional does not make sense for dependency management.
- The `lsst-dax-ppdbx-gcp` library was also made a required dependency. It is similarly a core dependency now and does not make sense to have as optional; it is used by the `ppdb_bigquery` module and all test modules.
- The `config` module was renamed to `ppdb_config`, because the primary class it defines is `PpdbConfig` (this follows DM standards for module naming).
- A `sql_resource` module was added for loading SQL from a package resource.
- A `resources` directory was added in the Python source tree, and the test schema was moved from the `tests` directory into the `resources` directory, where it is more easily accessible.
- The `tests` package was updated to make certain methods available, e.g., for generating test data.

Known Issues
These are known issues that will be resolved on separate tickets.
- There is duplicated functionality in the `ppdb_bigquery` module. Some of this was preexisting, and other overlapping methods were introduced in this PR. This can be cleaned up and consolidated (DM-54522).
- Determining which records are updated in the cloud pipeline is currently done by reading the chunk manifests from cloud storage during replication. It would be better if instead there was a field in the `PpdbReplicaChunk` table which tracked whether the chunk has updates, perhaps an `update_count` field, so that this was unnecessary. (I may include this in DM-54522, or it could be a separate ticket.)
- Further cleanup is needed in the `updates` package, as well as in a few core, existing classes like `chunk_uploader` and `ppdb_bigquery` (DM-54536).
- The `tests` package in the Python source tree has some new modules, though they are a bit disorganized and miscellaneous. These should be cleaned up and consolidated. It is also possible that some of the classes should just be included in the test modules, even if there is minor duplication as a result.

Additional Notes
This ticket includes some refactoring, porting of classes from other repositories, and other changes that I now realize were excessive to include and should have been done on separate tickets. I will keep this in mind for the future and try to make changes on ticket branches more targeted, in particular for ease of review and testing. The commit history of this PR should be useful for disambiguating some of this work, though beb2eab includes refactoring alongside updates supporting the new functionality. (These would have been difficult to separate out when I rebuilt the commit history.)
Disclosure on LLM Usage
I used Copilot extensively during the development of this ticket, in particular, for the following:
I reviewed all of the AI-generated code multiple times and made many changes to it. The test cases, in particular, could use further attention and consolidation (see above).