Skip to content

Conversation

@smoreinis
Copy link
Collaborator

@smoreinis smoreinis commented Jan 13, 2026

Summary

Implements a phased migration strategy for task states and task messages from MongoDB to PostgreSQL with zero-downtime rollout capability. Both use the same dual-write/dual-read pattern controlled by independent feature flags.

Key changes:

  • Add TASK_STATE_STORAGE_PHASE and TASK_MESSAGE_STORAGE_PHASE feature flags
  • Create task_states and task_messages PostgreSQL tables with JSONB columns
  • Implement dual repository pattern for safe migration rollout of both entities
  • Add Datadog StatsD metrics for monitoring data consistency
  • Add benchmark suite and storage_backend query parameter for testing

Migration Phases

Each entity migrates independently via its own env var:

Phase Behavior
mongodb Legacy behavior - MongoDB only (default)
dual_write Write to both, read from MongoDB
dual_read Write to both, read from both + verify consistency
postgres PostgreSQL only (target state)

Files Changed

Task State Migration

  • src/adapters/orm.pyTaskStateORM model (JSONB state column)
  • src/domain/repositories/task_state_postgres_repository.py — PostgreSQL repository
  • src/domain/repositories/task_state_dual_repository.py — Dual-write wrapper
  • src/domain/use_cases/states_use_case.py — Updated to use dual repository
  • database/migrations/.../postgres_task_state_07fc12196914.py — Alembic migration
  • scripts/backfill_task_states.py — Backfill MongoDB → PostgreSQL
  • scripts/verify_task_states.py — Verify data consistency
  • tests/unit/repositories/test_task_state_dual_repository.py — 35 tests
  • tests/unit/repositories/test_task_state_postgres_repository.py — 2 tests

Task Message Migration

  • src/adapters/orm.pyTaskMessageORM model (JSONB content column, 3 indexes)
  • src/domain/repositories/task_message_postgres_repository.py — PostgreSQL repository with JSONB filter translation and cursor pagination
  • src/domain/repositories/task_message_dual_repository.py — Dual-write wrapper with MongoDB filter conversion
  • src/domain/services/task_message_service.py — Updated to use dual repository
  • src/domain/use_cases/messages_use_case.py — Pass raw filters to storage layer
  • database/migrations/.../postgres_messages_b4d5f54e4ba2.py — Alembic migration
  • scripts/backfill_task_messages.py — Backfill MongoDB → PostgreSQL
  • scripts/verify_task_messages.py — Verify data consistency
  • tests/unit/repositories/test_task_message_dual_repository.py — 47 tests
  • tests/unit/repositories/test_task_message_postgres_repository.py — 16 tests

Shared / Config

  • src/config/environment_variables.py — Both storage phase env vars
  • src/adapters/orm.py — Both ORM models

Metrics (dual_read phase)

Task States

Metric Description
task_state.dual_read.match Data matches between MongoDB and PostgreSQL
task_state.dual_read.mismatch.missing_postgres Missing in PostgreSQL
task_state.dual_read.mismatch.missing_mongodb Missing in MongoDB
task_state.dual_read.mismatch.state_content State content differs
task_state.dual_read.list_count_mismatch List counts differ

Task Messages

Metric Description
task_message.dual_read.match Data matches between MongoDB and PostgreSQL
task_message.dual_read.mismatch.missing_postgres Missing in PostgreSQL
task_message.dual_read.mismatch.missing_mongodb Missing in MongoDB
task_message.dual_read.mismatch.content Message content differs
task_message.dual_read.list_count_mismatch List counts differ

Rollout Plan

For each entity (task states, then task messages):

  1. Deploy with *_STORAGE_PHASE=mongodb (no behavior change)
  2. Run backfill: python scripts/backfill_task_{states,messages}.py
  3. Enable dual-write: Set *_STORAGE_PHASE=dual_write
  4. Enable dual-read: Set *_STORAGE_PHASE=dual_read, monitor metrics
  5. Switch to PostgreSQL: Set *_STORAGE_PHASE=postgres

Rollback

Set either *_STORAGE_PHASE env var back to the previous phase at any time.

Test plan

  • Unit tests pass (288 total: 35 + 2 task state, 47 + 16 task message, plus existing)
  • Integration tests with real databases
  • Manual verification of each phase transition
  • Monitor metrics during dual_read phase before final switch

Implement phased migration strategy for task states:
- Phase 0: Add feature flag TASK_STATE_STORAGE_PHASE
- Add TaskStateORM model and Alembic migration
- Create TaskStatePostgresRepository for PostgreSQL storage
- Create TaskStateDualRepository for phased rollout

Migration phases supported:
- mongodb: Legacy behavior (MongoDB only)
- dual_write: Write to both, read from MongoDB
- dual_read: Write to both, read from both with verification
- postgres: PostgreSQL only (target state)

Includes:
- Datadog StatsD metrics for dual_read verification
- Backfill script for existing MongoDB data
- Verification script for data consistency checks
- Unit tests for all repository operations and metrics
@smoreinis smoreinis force-pushed the stas/task-state-postgres branch from 7685cef to a9f298c Compare January 13, 2026 00:25
- Add benchmark scripts for comparing MongoDB vs PostgreSQL performance:
  - benchmark_task_state.py: Repository-level benchmarks
  - benchmark_api.py: API-level benchmarks with connection pooling
  - compare_results.py: Generate markdown comparison reports
  - locustfile.py: Cluster load tests with Locust

- Add storage_backend query parameter to dynamically switch backends:
  - Enables benchmarking without server restarts
  - Valid values: mongodb, dual_write, dual_read, postgres

- Fix FastAPI dependency injection in TaskStateDualRepository:
  - Remove 'from __future__ import annotations' which broke DI resolution
  - Use List from typing to avoid 'list' method name shadowing
  - Update authorization_shortcuts to use DTaskStateDualRepository

Benchmark results show MongoDB ~20-30% faster than PostgreSQL at API level.
Port task messages to PostgreSQL using the same phased dual-write/dual-read
strategy used for task states. Controlled by TASK_MESSAGE_STORAGE_PHASE env var
with phases: mongodb → dual_write → dual_read → postgres.

New files:
- TaskMessageORM model with JSONB content column and indexes
- Alembic migration for task_messages table
- TaskMessagePostgresRepository with JSONB filter translation and cursor pagination
- TaskMessageDualRepository with 4-phase switching and Datadog metrics
- Backfill and verification scripts for data migration
- Unit tests (63 tests: 47 dual repo + 16 postgres repo)

Modified:
- Wire TaskMessageService to use DualRepository
- Pass raw TaskMessageEntityFilter objects through to storage layer
- Add TASK_MESSAGE_STORAGE_PHASE to environment variables
JSONB supports indexing and faster querying compared to JSON,
consistent with all other JSON columns in the schema.
@smoreinis smoreinis changed the title feat: Add task state migration from MongoDB to PostgreSQL feat: Migrate task states and task messages from MongoDB to PostgreSQL Feb 10, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant