feat: Migrate task states and task messages from MongoDB to PostgreSQL #128
Summary
Implements a phased migration strategy for task states and task messages from MongoDB to PostgreSQL with zero-downtime rollout capability. Both use the same dual-write/dual-read pattern controlled by independent feature flags.
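The dual-write/dual-read pattern described above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: `StoragePhase`, `phase_from_env`, `InMemoryRepo`, and `DualRepository` are hypothetical names, the default of `mongodb` for an unset flag is an assumption, and so is the choice to keep returning the MongoDB result until the `postgres` phase.

```python
import os
from enum import Enum


class StoragePhase(str, Enum):
    """The four phase values named in this PR."""
    MONGODB = "mongodb"
    DUAL_WRITE = "dual_write"
    DUAL_READ = "dual_read"
    POSTGRES = "postgres"


def phase_from_env(var_name, environ=None):
    """Read a phase flag; falling back to mongodb is an assumption."""
    environ = os.environ if environ is None else environ
    # Raises ValueError for an unknown phase value.
    return StoragePhase(environ.get(var_name, StoragePhase.MONGODB.value))


class InMemoryRepo:
    """Stand-in for a real MongoDB or PostgreSQL repository (hypothetical)."""

    def __init__(self):
        self.rows = {}

    def save(self, key, value):
        self.rows[key] = value

    def get(self, key):
        return self.rows.get(key)


class DualRepository:
    """Routes reads and writes according to the current phase."""

    def __init__(self, mongo, postgres, phase):
        self.mongo = mongo
        self.postgres = postgres
        self.phase = phase

    def save(self, key, value):
        # MongoDB is written in every phase except postgres.
        if self.phase is not StoragePhase.POSTGRES:
            self.mongo.save(key, value)
        # PostgreSQL is written in every phase except mongodb.
        if self.phase is not StoragePhase.MONGODB:
            self.postgres.save(key, value)

    def get(self, key):
        if self.phase is StoragePhase.POSTGRES:
            return self.postgres.get(key)
        if self.phase is StoragePhase.DUAL_READ:
            # Both stores are read so the results can be compared;
            # returning the MongoDB value here is an assumption.
            self.postgres.get(key)
        return self.mongo.get(key)
```

Because each entity would get its own `DualRepository` built from its own flag, task states and task messages can sit in different phases at the same time.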
Key changes:
- `TASK_STATE_STORAGE_PHASE` and `TASK_MESSAGE_STORAGE_PHASE` feature flags
- `task_states` and `task_messages` PostgreSQL tables with JSONB columns
- `storage_backend` query parameter for testing

Migration Phases
Each entity migrates independently via its own env var:
- `mongodb`
- `dual_write`
- `dual_read`
- `postgres`

Files Changed
Task State Migration
- `src/adapters/orm.py`: `TaskStateORM` model (JSONB state column)
- `src/domain/repositories/task_state_postgres_repository.py`: PostgreSQL repository
- `src/domain/repositories/task_state_dual_repository.py`: Dual-write wrapper
- `src/domain/use_cases/states_use_case.py`: Updated to use dual repository
- `database/migrations/.../postgres_task_state_07fc12196914.py`: Alembic migration
- `scripts/backfill_task_states.py`: Backfill MongoDB → PostgreSQL
- `scripts/verify_task_states.py`: Verify data consistency
- `tests/unit/repositories/test_task_state_dual_repository.py`: 35 tests
- `tests/unit/repositories/test_task_state_postgres_repository.py`: 2 tests

Task Message Migration
- `src/adapters/orm.py`: `TaskMessageORM` model (JSONB content column, 3 indexes)
- `src/domain/repositories/task_message_postgres_repository.py`: PostgreSQL repository with JSONB filter translation and cursor pagination
- `src/domain/repositories/task_message_dual_repository.py`: Dual-write wrapper with MongoDB filter conversion
- `src/domain/services/task_message_service.py`: Updated to use dual repository
- `src/domain/use_cases/messages_use_case.py`: Pass raw filters to storage layer
- `database/migrations/.../postgres_messages_b4d5f54e4ba2.py`: Alembic migration
- `scripts/backfill_task_messages.py`: Backfill MongoDB → PostgreSQL
- `scripts/verify_task_messages.py`: Verify data consistency
- `tests/unit/repositories/test_task_message_dual_repository.py`: 47 tests
- `tests/unit/repositories/test_task_message_postgres_repository.py`: 16 tests

Shared / Config
- `src/config/environment_variables.py`: Both storage phase env vars
- `src/adapters/orm.py`: Both ORM models

Metrics (dual_read phase)
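The metric names listed in this section suggest a per-record comparison between the two stores plus a count check on list results. A minimal sketch of how such a comparison might pick a metric name, assuming that `missing_postgres` means the record exists only in MongoDB (and vice versa); `compare_dual_read` and `compare_list_counts` are hypothetical helpers, and a `Counter` stands in for whatever metrics client the service actually uses:

```python
from collections import Counter


def compare_dual_read(prefix, mongo_doc, postgres_doc, content_field, metrics):
    """Increment one dual_read metric for a single record and return its name.

    prefix is "task_state" or "task_message"; content_field is the
    mismatch suffix for differing payloads ("state_content" or "content").
    """
    if mongo_doc is None and postgres_doc is None:
        return None  # record absent from both stores, nothing to compare
    if postgres_doc is None:
        name = f"{prefix}.dual_read.mismatch.missing_postgres"
    elif mongo_doc is None:
        name = f"{prefix}.dual_read.mismatch.missing_mongodb"
    elif mongo_doc != postgres_doc:
        name = f"{prefix}.dual_read.mismatch.{content_field}"
    else:
        name = f"{prefix}.dual_read.match"
    metrics[name] += 1
    return name


def compare_list_counts(prefix, mongo_items, postgres_items, metrics):
    """Increment the list-count metric when the two result sets differ in size."""
    if len(mongo_items) != len(postgres_items):
        name = f"{prefix}.dual_read.list_count_mismatch"
        metrics[name] += 1
        return name
    return None


# Usage: one matching read and one read missing from PostgreSQL.
metrics = Counter()
compare_dual_read("task_state", {"s": 1}, {"s": 1}, "state_content", metrics)
compare_dual_read("task_state", {"s": 1}, None, "state_content", metrics)
```

Keeping the comparison free of side effects other than the counter means a mismatch never changes what the caller receives during `dual_read`.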
Task States
- `task_state.dual_read.match`
- `task_state.dual_read.mismatch.missing_postgres`
- `task_state.dual_read.mismatch.missing_mongodb`
- `task_state.dual_read.mismatch.state_content`
- `task_state.dual_read.list_count_mismatch`

Task Messages
- `task_message.dual_read.match`
- `task_message.dual_read.mismatch.missing_postgres`
- `task_message.dual_read.mismatch.missing_mongodb`
- `task_message.dual_read.mismatch.content`
- `task_message.dual_read.list_count_mismatch`

Rollout Plan
For each entity (task states, then task messages):
1. `*_STORAGE_PHASE=mongodb` (no behavior change)
2. `python scripts/backfill_task_{states,messages}.py`
3. `*_STORAGE_PHASE=dual_write`
4. `*_STORAGE_PHASE=dual_read`, monitor metrics
5. `*_STORAGE_PHASE=postgres`

Rollback
Set either `*_STORAGE_PHASE` env var back to the previous phase at any time.

Test plan