ThemisDB's Distributed Transaction Coordinator implements a Two-Phase Commit (2PC) protocol enhanced with TrueTime timestamps, providing ACID guarantees for transactions that span multiple database shards.
- DistributedTransactionCoordinator: Orchestrates distributed transactions
- TrueTime: Provides globally consistent timestamps
- ShardRPCClient: Handles communication with participant shards
- TransactionParticipant: Represents a shard participating in a transaction
- ✅ Two-Phase Commit Protocol: Guarantees atomicity across shards
- ✅ TrueTime Integration: External consistency for distributed transactions
- ✅ Snapshot Isolation: Read-only transactions without locks
- ✅ Parallel Execution: Concurrent prepare/commit operations
- ✅ Fault Tolerance: Handles participant failures gracefully
- ✅ Configurable Timeouts: Customizable RPC and phase timeouts
- Coordinator sends PREPARE request to all participants
- Each participant:
  - Validates that its operations can be performed
  - Locks required resources
  - Writes undo/redo logs
  - Votes COMMIT (prepared) or ABORT (cannot prepare)
- Coordinator waits for all votes (with timeout)
- If all vote COMMIT → proceed to Phase 2
- If any vote ABORT or timeout → abort transaction
- Coordinator assigns commit timestamp using TrueTime
- Wait until commit timestamp is in the past (TrueTime guarantee)
- Send COMMIT request to all participants with timestamp
- Participants apply changes and release locks
- Transaction completes successfully
- Send ABORT request to all participants
- Participants roll back changes and release locks
- Transaction fails
TrueTime provides external consistency guarantees:
```cpp
// Get commit timestamp (latest bound ensures visibility)
txn.commit_time = truetime_->now().latest;

// Wait until commit timestamp is definitely in the past
// This ensures all subsequent reads see this transaction
truetime_->waitUntil(txn.commit_time);
```

- External Consistency: Transactions are globally ordered
- Snapshot Isolation: Read-only transactions use snapshot timestamps
- Wait-Free Reads: No locks needed for read-only transactions
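The commit-wait rule above can be illustrated with a stand-in TrueTime that models the clock-uncertainty interval. `FakeTrueTime` and its fixed `epsilon` are assumptions for illustration only, not the real TrueTime class; real deployments derive the bound from clock synchronization (GPS/atomic clocks in Spanner's case).

```cpp
#include <chrono>
#include <thread>

// now() returns an uncertainty interval [earliest, latest] that is
// guaranteed to contain the true physical time.
struct TTInterval {
    std::chrono::steady_clock::time_point earliest;
    std::chrono::steady_clock::time_point latest;
};

struct FakeTrueTime {
    std::chrono::milliseconds epsilon{5};  // assumed uncertainty bound

    TTInterval now() const {
        auto t = std::chrono::steady_clock::now();
        return {t - epsilon, t + epsilon};
    }

    // Commit wait: block until `ts` is definitely in the past, i.e. until
    // even the earliest possible current time exceeds ts. After this
    // returns, any later transaction is assigned a strictly larger
    // timestamp, which is what makes the global ordering externally visible.
    void waitUntil(std::chrono::steady_clock::time_point ts) const {
        while (now().earliest <= ts) {
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }
};
```

Choosing `now().latest` as the commit timestamp and then waiting until `now().earliest` passes it costs roughly two epsilons of latency per commit, which is the price of wait-free consistent reads.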
```cpp
#include "sharding/distributed_transaction.h"
#include "sharding/truetime.h"

// Initialize
auto truetime = std::make_shared<TrueTime>();
DistributedTransactionCoordinator::Config config;
config.prepare_timeout_ms = 10000;
config.commit_timeout_ms = 10000;
auto coordinator = std::make_shared<DistributedTransactionCoordinator>(
    truetime, config
);

// Begin distributed transaction
std::vector<std::string> shard_ids = {"shard1", "shard2", "shard3"};
std::string txn_id = coordinator->beginTransaction(shard_ids);

// Add operations to transaction
nlohmann::json op1 = {{"type", "insert"}, {"key", "user:123"}, {"value", {...}}};
coordinator->addOperation(txn_id, "shard1", op1);

nlohmann::json op2 = {{"type", "update"}, {"key", "account:456"}, {"value", {...}}};
coordinator->addOperation(txn_id, "shard2", op2);

// Commit with 2PC
bool success = coordinator->commit(txn_id);
if (success) {
    // Transaction committed atomically across all shards
} else {
    // Transaction aborted - no changes applied
}
```

Read-only transactions are optimized with TrueTime:
```cpp
// No 2PC needed for reads!
std::vector<std::string> shard_ids = {"shard1", "shard2"};
nlohmann::json operations = {
    {"queries", {
        {{"shard", "shard1"}, {"key", "user:123"}},
        {{"shard", "shard2"}, {"key", "account:456"}}
    }}
};

// Execute snapshot read at consistent timestamp
auto results = coordinator->executeReadOnly(shard_ids, operations);
```

```cpp
// Explicitly abort a transaction
bool aborted = coordinator->abort(txn_id);
```

```cpp
auto state = coordinator->getTransactionState(txn_id);
if (state) {
    switch (*state) {
        case TransactionState::ACTIVE:
            // Transaction is active
            break;
        case TransactionState::PREPARING:
            // In prepare phase
            break;
        case TransactionState::COMMITTED:
            // Successfully committed
            break;
        case TransactionState::ABORTED:
            // Aborted or failed
            break;
        // ... other states
    }
}
```

```cpp
auto stats = coordinator->getStatistics();
// Returns:
// {
//   "total_transactions": 1000,
//   "committed_transactions": 950,
//   "aborted_transactions": 50,
//   "readonly_transactions": 5000,
//   "active_transactions": 10
// }
```

```cpp
DistributedTransactionCoordinator::Config config;

// Timeout for prepare phase (default: 10 seconds)
config.prepare_timeout_ms = 10000;
// Timeout for commit phase (default: 10 seconds)
config.commit_timeout_ms = 10000;
// Maximum concurrent transactions (default: 1000)
config.max_concurrent_txns = 1000;
// Enable read-only optimization (default: true)
config.enable_read_only_opt = true;
// RPC timeout per shard call (default: 5 seconds)
config.rpc_timeout_ms = 5000;
// Maximum RPC retry attempts (default: 3)
config.max_retries = 3;
// Maximum commit phase retries (default: 5, higher for durability)
config.max_commit_retries = 5;
// Base delay for exponential backoff (default: 100ms)
config.retry_backoff_base_ms = 100;
// Maximum backoff delay (default: 5 seconds)
config.max_backoff_ms = 5000;
// Enable transaction recovery logging (default: true)
config.enable_recovery_log = true;
```

| State | Description |
|---|---|
| ACTIVE | Transaction is active, accepting operations |
| PREPARING | Phase 1: Sending prepare requests |
| PREPARED | All participants prepared successfully |
| COMMITTING | Phase 2: Sending commit requests |
| COMMITTED | Transaction committed successfully |
| ABORTING | Sending abort requests |
| ABORTED | Transaction aborted |
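The legal transitions implied by this state table could hypothetically be checked as below. The enum mirrors the states above; the helper itself is illustrative and not part of the coordinator API, and the COMMITTING → ABORTED edge is an assumption based on the retry-exhaustion behavior described later.

```cpp
enum class TransactionState {
    ACTIVE, PREPARING, PREPARED, COMMITTING, COMMITTED, ABORTING, ABORTED
};

// Forward progression through the 2PC phases; ABORTING is reachable from
// any state before the commit point. COMMITTED and ABORTED are terminal.
bool isValidTransition(TransactionState from, TransactionState to) {
    using TS = TransactionState;
    switch (from) {
        case TS::ACTIVE:     return to == TS::PREPARING || to == TS::ABORTING;
        case TS::PREPARING:  return to == TS::PREPARED  || to == TS::ABORTING;
        case TS::PREPARED:   return to == TS::COMMITTING || to == TS::ABORTING;
        // ABORTED here only after commit-retry exhaustion (assumption).
        case TS::COMMITTING: return to == TS::COMMITTED || to == TS::ABORTED;
        case TS::ABORTING:   return to == TS::ABORTED;
        default:             return false;  // terminal states
    }
}
```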
If any participant votes ABORT or times out:
- Coordinator sends ABORT to all participants
- Transaction state → ABORTED
- No changes are applied
- Detailed error context collected from all failed participants
Enhanced commit phase with automatic retry logic:
- Automatic Retry: If commit fails on any participant:
  - Coordinator automatically retries with exponential backoff
  - Default: up to 5 retry attempts
  - Backoff: 100ms → 200ms → 400ms → 800ms → 1600ms (capped at 5s)
- Error Context: Detailed error information collected:
  - Which participants failed
  - Error messages from each participant
  - Retry attempt counts
- Recovery Logging: For committed transactions:
  - Transaction state persisted to recovery log
  - Enables recovery on coordinator restart
- Persistent Failures: On retry exhaustion:
  - Transaction marked as ABORTED
  - Detailed error logged for manual investigation
  - Recovery mechanism can replay commit based on coordinator log
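The backoff schedule above (base delay doubling per attempt, capped) reduces to one line of arithmetic. This helper is a sketch using the documented defaults, not an actual coordinator function:

```cpp
#include <algorithm>
#include <cstdint>

// Delay before retry `attempt` (0-based): base * 2^attempt, capped.
// With the defaults this yields 100, 200, 400, 800, 1600, then 5000ms.
std::uint64_t backoffDelayMs(std::uint32_t attempt,
                             std::uint64_t base_ms = 100,
                             std::uint64_t max_ms = 5000) {
    // Clamp the exponent so the shift cannot overflow for large attempts.
    if (attempt >= 32) return max_ms;
    return std::min(base_ms << attempt, max_ms);
}
```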
Example Configuration:
```cpp
DistributedTransactionCoordinator::Config config;
config.max_commit_retries = 5;        // Retry commit up to 5 times
config.retry_backoff_base_ms = 100;   // Start with 100ms delay
config.max_backoff_ms = 5000;         // Cap delay at 5 seconds
config.enable_recovery_log = true;    // Enable recovery logging
```

- Participants in the prepared state wait for the coordinator's decision
- A coordinator timeout triggers an abort if participants are unreachable
- Recovery protocol resolves in-doubt transactions on reconnection
- Recovery log enables coordinator to resume interrupted transactions
- Latency: ~2x single-shard transaction (2 network round trips)
- Throughput: Limited by coordinator capacity and network bandwidth
- Scalability: Horizontal scaling of coordinator instances possible
- Latency: ~1x single-shard read (parallel reads)
- Throughput: Very high (no locking, no 2PC overhead)
- Scalability: Linear with number of shards
```cpp
// ❌ BAD: Large transaction across many shards
auto txn_id = coordinator->beginTransaction({"shard1", "shard2", ..., "shard50"});

// ✅ GOOD: Focused transaction with minimal shards
auto txn_id = coordinator->beginTransaction({"shard1", "shard2"});
```

```cpp
// ❌ BAD: Using full 2PC for reads
auto txn_id = coordinator->beginTransaction(shard_ids);
// ... read operations ...
coordinator->commit(txn_id);

// ✅ GOOD: Use read-only optimization
auto results = coordinator->executeReadOnly(shard_ids, operations);
```

```cpp
try {
    auto txn_id = coordinator->beginTransaction(shard_ids);
    // ... operations ...
    if (!coordinator->commit(txn_id)) {
        // Handle commit failure
        LOG_ERROR("Transaction commit failed");
        // Possibly retry or compensate
    }
} catch (const std::exception& e) {
    LOG_ERROR("Transaction error: {}", e.what());
    // Handle exception
}
```

```cpp
// For operations with predictable latency
config.prepare_timeout_ms = 5000;   // 5 seconds
config.commit_timeout_ms = 5000;

// For operations with variable latency
config.prepare_timeout_ms = 30000;  // 30 seconds
config.commit_timeout_ms = 30000;
```

Located in tests/test_distributed_transactions.cpp:
```bash
# Run distributed transaction tests
cd build
ctest -R test_distributed_transactions -V
```

```bash
# Run full integration test suite
cd build
ctest -R distributed -V
```

Located in benchmarks/bench_distributed_coordinator.cpp:

```bash
# Run benchmarks
cd build
./benchmarks/bench_distributed_coordinator
```

The coordinator exposes metrics via getStatistics():
- total_transactions: Total transactions started
- committed_transactions: Successfully committed
- aborted_transactions: Aborted or failed
- readonly_transactions: Read-only transactions executed
- active_transactions: Currently active transactions
Transaction lifecycle events are logged:
```
[DEBUG] PREPARE shard1: vote=COMMIT
[DEBUG] PREPARE shard2: vote=COMMIT
[DEBUG] COMMIT shard1: success=true
[DEBUG] COMMIT shard2: success=true
```
- Coordinator Single Point of Failure: No coordinator replication (planned for future)
- Blocking Protocol: 2PC is blocking if coordinator fails (3PC planned)
✅ Automatic Commit Phase Retry: Exponential backoff with configurable retry limits
✅ Enhanced Error Context: Detailed error messages from all participants
✅ Recovery Logging: Transaction states (PREPARED and COMMITTED) persisted to WAL
✅ Recovery Audit Trail: Coordinator can verify successfully committed transactions on restart
✅ Improved Observability: Better logging and error tracking throughout 2PC phases
✅ In-Doubt Transaction Recovery: Coordinator scans PREPARE_TX WAL entries on restart and
safely aborts any transactions that reached PREPARED state but were never resolved
✅ Dedicated WALEntryType::PREPARE_TX: Semantically correct WAL entry type for 2PC PREPARE phase
✅ TwoPhaseCommitParticipant: Shard-side participant handler with idempotent message handling,
WAL-backed durability, prepare-timeout auto-abort, and crash recovery (recoverFromWAL)
✅ HTTP API: REST endpoints for distributed transactions (/dtxn/*, see below)
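As a rough sketch of the idempotent message handling described for TwoPhaseCommitParticipant: the class and method names below are hypothetical, and WAL writes, locking, and timeout handling are elided to comments.

```cpp
#include <optional>
#include <string>

class ParticipantSketch {
public:
    enum class Vote { COMMIT, ABORT };

    // PREPARE must be idempotent: a retried PREPARE for the same transaction
    // returns the previously recorded vote instead of re-running validation.
    Vote onPrepare(const std::string& txn_id) {
        if (vote_ && txn_id == txn_id_) return *vote_;  // duplicate message
        txn_id_ = txn_id;
        // Real code: validate operations, take locks, and durably log a
        // PREPARE_TX WAL entry *before* voting COMMIT.
        vote_ = Vote::COMMIT;
        return *vote_;
    }

    // COMMIT/ABORT are likewise idempotent: the decision is applied once,
    // and duplicates are no-ops.
    void onDecision(const std::string& txn_id, bool commit) {
        if (!vote_ || txn_id != txn_id_ || resolved_) return;
        (void)commit;  // real code: apply or roll back, release locks, log outcome
        resolved_ = true;
    }

    bool resolved() const { return resolved_; }

private:
    std::string txn_id_;
    std::optional<Vote> vote_;
    bool resolved_ = false;
};
```

Idempotency is what makes the coordinator's retry-with-backoff safe: redelivered PREPARE or COMMIT messages cannot double-apply effects.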
| Method | Endpoint | Description |
|---|---|---|
| POST | /dtxn/begin | Begin a distributed transaction across shards |
| POST | /dtxn/operation | Append an operation to an active transaction |
| POST | /dtxn/commit | Commit the transaction (runs 2PC) |
| POST | /dtxn/abort | Abort the transaction |
| POST | /dtxn/readonly | Execute a read-only (snapshot) query |
| GET | /dtxn/status/{id} | Query transaction state |
| GET | /dtxn/stats | Coordinator statistics |
Example — multi-shard transfer:
```bash
# 1. Begin
curl -X POST http://localhost:8080/dtxn/begin \
  -H 'Content-Type: application/json' \
  -d '{"shards": ["shard1", "shard2"]}'
# → {"transaction_id": "txn-abc123", "status": "active", "shards": ["shard1","shard2"]}

# 2. Add operations
curl -X POST http://localhost:8080/dtxn/operation \
  -H 'Content-Type: application/json' \
  -d '{"transaction_id":"txn-abc123","shard_id":"shard1","operation":{"type":"update","key":"balance","delta":-100}}'

curl -X POST http://localhost:8080/dtxn/operation \
  -H 'Content-Type: application/json' \
  -d '{"transaction_id":"txn-abc123","shard_id":"shard2","operation":{"type":"update","key":"balance","delta":100}}'

# 3. Commit (runs 2PC internally)
curl -X POST http://localhost:8080/dtxn/commit \
  -H 'Content-Type: application/json' \
  -d '{"transaction_id":"txn-abc123"}'
# → {"transaction_id":"txn-abc123","status":"committed"}
```

- Three-Phase Commit (3PC) for non-blocking guarantee
- Coordinator replication and failover
- Participant health monitoring with heartbeat mechanism
- Optimistic concurrency control
- Distributed deadlock detection
- Saga pattern support for long-running transactions
- Circuit breaker pattern for participant health monitoring
- Gray, J., & Reuter, A. (1992). Transaction Processing: Concepts and Techniques
- Corbett, J. C., et al. (2013). Spanner: Google's Globally Distributed Database
- Bernstein, P. A., & Newcomer, E. (2009). Principles of Transaction Processing
- TransactionManager - Local transaction support
- TrueTime - Distributed timestamp service
- ShardRPCClient - RPC communication
- Sharding Documentation