Skip to content

Respect parallel_decoding config and document fallback behavior#4198

Merged
makr-code merged 10 commits intodevelopfrom
copilot/implement-logical-replication
Mar 14, 2026
Merged

Respect parallel_decoding config and document fallback behavior#4198
makr-code merged 10 commits intodevelopfrom
copilot/implement-logical-replication

Conversation

Copy link
Contributor

Copilot AI commented Mar 13, 2026

The logical replication review highlighted remaining open points: make parallel slot dispatch bounded, fallback correctly when hardware_concurrency is unavailable, and clarify documentation.

Description

  • Logical replication: parallel slot dispatch now:
    • Falls back to sequential when disabled, when ≤1 slot, or when hardware_concurrency reports 0/1.
    • Uses a bounded worker pool (hw threads minus one) with per-worker stats aggregation.
    • Captures/propagates worker exceptions after join to avoid silent skips.
  • Simplified slot processing: per-slot lambda captures immutable state by value; returns filter/enqueue counters directly.
  • Docs: README notes default parallel_decoding=true, activation threshold (≥2 slots), and automatic sequential fallback when hardware_concurrency is unavailable.

Type of Change

  • Bug fix
  • New feature
  • Refactoring
  • Documentation
  • Other:

Testing

  • Unit tests added/updated
  • Integration tests added/updated
  • Manual testing performed

📚 Research & Knowledge (wenn applicable)

  • Diese PR basiert auf wissenschaftlichen Paper(s) oder Best Practices?
    • Falls JA: Research-Dateien in /docs/research/ angelegt?
    • Falls JA: Im Modul-README unter "Wissenschaftliche Grundlagen" verlinkt?
    • Falls JA: In /docs/research/implementation_influence/ eingetragen?

Relevante Quellen:

  • Paper:
  • Best Practice:
  • Architecture Decision:

Checklist

  • Code follows project style guidelines
  • Self-review completed
  • Documentation updated (if needed)
  • No new warnings introduced
Original prompt

This section details on the original issue you should resolve

<issue_title>Logical Replication</issue_title>
<issue_description>### Context

This issue implements the roadmap item 'Logical Replication' for the replication domain. It is sourced from the consolidated roadmap under 🟠 High Priority — Near-term (v1.5.0 – v1.8.0) and targets milestone v1.7.0.

Primary detail section: Logical Replication

Goal

Deliver the scoped changes for Logical Replication in src/replication/ and complete the linked detail section in a release-ready state for v1.7.0.

Detailed Scope

Logical Replication

Priority: High
Target Version: v1.7.0

Replace physical WAL-based replication with logical replication that replicates operations at a higher level, enabling cross-version replication and selective replication.

Features:

  • Schema-aware replication (replicate DDL changes)
  • Selective table/collection replication with filters
  • Cross-version replication (v1.5 → v1.6)
  • Data transformation during replication
  • Conflict-free initial sync for new replicas

Architecture:

class LogicalReplicationManager {
public:
    struct ReplicationFilter {
        std::vector<std::string> include_collections;
        std::vector<std::string> exclude_collections;
        std::string row_filter_expression;  // AQL expression
        bool replicate_ddl = true;
        bool replicate_dml = true;
    };
    
    struct LogicalReplicationSlot {
        std::string slot_name;
        uint64_t restart_lsn;
        uint64_t confirmed_flush_lsn;
        std::string plugin_name;
        ReplicationFilter filter;
    };
    
    // Create replication slot
    LogicalReplicationSlot createSlot(
        const std::string& slot_name,
        const std::string& output_plugin,
        const ReplicationFilter& filter = {}
    );
    
    // Stream changes from slot
    std::vector<LogicalChange> readChanges(
        const std::string& slot_name,
        uint32_t max_changes = 1000
    );
    
    // Advance slot position (ack)
    void advanceSlot(const std::string& slot_name, uint64_t lsn);
};

struct LogicalChange {
    enum Type { INSERT, UPDATE, DELETE, TRUNCATE, DDL };
    Type type;
    std::string collection;
    std::string schema_version;
    nlohmann::json old_data;  // For UPDATE/DELETE
    nlohmann::json new_data;  // For INSERT/UPDATE
    std::string ddl_statement;  // For DDL
    uint64_t lsn;
    std::chrono::system_clock::time_point timestamp;
};

// Example: Selective replication
ReplicationFilter filter;
filter.include_collections = {"orders", "customers"};
filter.row_filter_expression = "tenant_id == 'acme-corp'";

auto slot = logical_repl.createSlot("acme_replica", "json_output", filter);

// Consumer reads changes
while (true) {
    auto changes = logical_repl.readChanges("acme_replica", 1000);
    for (const auto& change : changes) {
        remote_storage.apply(change);
    }
    logical_repl.advanceSlot("acme_replica", changes.back().lsn);
}

Benefits:

  • Replicate only relevant data (reduce bandwidth and storage)
  • Enable multi-tenant replication (separate replica per tenant)
  • Easier upgrades (replicate from old version to new version)
  • Integrate with external systems (Kafka, Elasticsearch, Snowflake)

Implementation Notes:

  • Use output plugins for different formats (JSON, Protobuf, Avro)
  • Maintain replication slots persistently
  • Support parallel decoding for high throughput

Acceptance Criteria

  • Schema-aware replication (replicate DDL changes)
  • Selective table/collection replication with filters
  • Cross-version replication (v1.5 → v1.6)
  • Data transformation during replication
  • Conflict-free initial sync for new replicas
  • Replicate only relevant data (reduce bandwidth and storage)
  • Enable multi-tenant replication (separate replica per tenant)
  • Easier upgrades (replicate from old version to new version)
  • Integrate with external systems (Kafka, Elasticsearch, Snowflake)
  • Use output plugins for different formats (JSON, Protobuf, Avro)
  • Maintain replication slots persistently
  • Support parallel decoding for high throughput

Relationships

References

  • src/ROADMAP.md
  • src/replication/FUTURE_ENHANCEMENTS.md#logical-replication
  • Source key: roadmap:95:replication:v1.7.0:logical-replication

Generated from the consolidated source roadmap. Keep the roadmap and issue in sync when scope changes.


💬 Send tasks to Copilot coding agent from Slack and Teams to turn conversations into code. Copilot posts an update in your thread when it's finished.

Co-authored-by: makr-code <150588092+makr-code@users.noreply.github.com>
Copilot AI changed the title [WIP] Implement logical replication for the replication domain Implement logical replication manager with filters, DDL streaming, and initial sync dedupe Mar 13, 2026
Copilot AI requested a review from makr-code March 13, 2026 19:19
@makr-code makr-code marked this pull request as ready for review March 14, 2026 06:15
@makr-code makr-code requested a review from Copilot March 14, 2026 06:15
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds schema-aware logical replication as a new replication-mode capability, exposing per-subscriber logical slots with filtering, DDL streaming, cross-version metadata, transform hooks, and initial-sync dedupe, plus focused unit tests and accompanying documentation/roadmap updates.

Changes:

  • Introduces LogicalReplicationManager (slot lifecycle, filtering/predicates, DDL enqueue, transform hook, persisted slot state, initial sync snapshot + dedupe).
  • Adds LogicalReplicationTests target and new unit tests covering filters, DDL propagation, transforms, and initial sync dedupe.
  • Updates replication documentation and roadmap/changelog to reflect delivery and acceptance criteria.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 15 comments.

Show a summary per file
File Description
include/replication/logical_replication.h Defines public logical replication API (slots, filters, change model, config/stats).
src/replication/logical_replication.cpp Implements slot persistence, filtering, WAL-to-logical conversion, transform hook, snapshot dedupe, and stats.
tests/test_logical_replication.cpp Adds unit tests for filtering, DDL, transforms, and initial sync dedupe behavior.
tests/CMakeLists.txt Registers new test_logical_replication executable and CTest entry LogicalReplicationTests.
cmake/ModularBuild.cmake Adds logical_replication.cpp to modular build source lists.
src/replication/README.md Documents LogicalReplicationManager usage and integration into ReplicationManager listeners.
src/replication/ROADMAP.md Marks LogicalReplicationManager roadmap item as delivered.
src/replication/FUTURE_ENHANCEMENTS.md Rewrites logical replication section into the standardized Scope/Constraints/Interfaces/Test/Perf/Security format and marks acceptance items.
src/replication/CHANGELOG.md Adds 1.7.0 changelog entry describing logical replication feature set and tests.

You can also share your feedback on Copilot code review. Take the survey.

Comment on lines +411 to +430
void LogicalReplicationManager::persistSlot(const SlotRuntime& slot) const {
fs::create_directories(slotStatePath(""));
nlohmann::json j;
j["slot_name"] = slot.meta.slot_name;
j["plugin_name"] = slot.meta.plugin_name;
j["restart_lsn"] = slot.meta.restart_lsn;
j["confirmed_flush_lsn"] = slot.meta.confirmed_flush_lsn;
j["initial_sync_pending"] = slot.meta.initial_sync_pending;
j["filter"] = {
{"include_collections", slot.meta.filter.include_collections},
{"exclude_collections", slot.meta.filter.exclude_collections},
{"row_filter_expression", slot.meta.filter.row_filter_expression},
{"replicate_ddl", slot.meta.filter.replicate_ddl},
{"replicate_dml", slot.meta.filter.replicate_dml},
};

std::ofstream out(slotStatePath(slot.meta.slot_name));
if (out.is_open()) {
out << j.dump(2);
}
Comment on lines +186 to +200
auto wal = std::make_shared<WALManager>(config);
LogicalReplicationManager::Config lcfg;
lcfg.target_version = "v1.6";
lcfg.transform = [](LogicalChange& change) {
if (change.new_data.is_object()) {
change.new_data["tenant"] = "acme";
}
};
auto logical = std::make_shared<LogicalReplicationManager>(wal, lcfg);
replication_manager.addListener(logical);

LogicalReplicationManager::ReplicationFilter filter;
filter.include_collections = {"orders", "customers"};
filter.row_filter_expression = "tenant == 'acme'";
auto slot = logical->createSlot("acme_replica", "json_output", filter);
TEST(LogicalReplicationManagerTest, AppliesIncludeExcludeAndRowFilter) {
TempDir td("/tmp/themis_logical_repl_filters");
auto wal = std::make_shared<WALManager>(makeConfig(td.path));
LogicalReplicationManager mgr(wal, {});

#### Performance Targets
- In-memory slot buffer push/pop amortized O(1); shared_mutex protects slot map, per-slot mutex guards buffers.
- Parallel decoding flag ensures consumer threads can drain without blocking producer enqueue on other slots.
Comment on lines +96 to +103
{
std::lock_guard<std::mutex> lock(runtime->mutex);
runtime->meta.confirmed_flush_lsn = lsn;
if (runtime->initial_sync_pending && lsn >= runtime->meta.restart_lsn) {
runtime->initial_sync_pending = false;
runtime->meta.initial_sync_pending = false;
runtime->snapshot_keys.clear();
}
Comment on lines +41 to +45
LogicalReplicationManager::LogicalReplicationManager(std::shared_ptr<WALManager> wal, Config config)
: wal_(std::move(wal))
, config_(std::move(config)) {
loadPersistedSlots();
}
}

{
std::unique_lock<std::shared_mutex> lock(slots_mutex_);

#### Design Constraints
- Row-filter evaluation must remain O(1) per change using lightweight predicate parsing.
- Slot persistence must survive restart without blocking WAL writers; JSON state is fsync-safe under WAL directory.
Comment on lines +260 to +277
bool LogicalReplicationManager::matchesFilter(const LogicalChange& change,
const ReplicationFilter& filter) const {
if (!filter.include_collections.empty()) {
const bool included = std::find(filter.include_collections.begin(),
filter.include_collections.end(),
change.collection) != filter.include_collections.end();
if (!included) return false;
}

if (std::find(filter.exclude_collections.begin(),
filter.exclude_collections.end(),
change.collection) != filter.exclude_collections.end()) {
return false;
}

if (change.type == LogicalChange::Type::DDL) {
return filter.replicate_ddl;
}
Comment on lines +367 to +399
for (auto& entry : fs::directory_iterator(dir)) {
if (!entry.is_regular_file()) continue;
std::ifstream in(entry.path());
if (!in.is_open()) continue;
nlohmann::json j;
try {
in >> j;
} catch (...) {
continue;
}

auto runtime = std::make_shared<SlotRuntime>();
runtime->meta.slot_name = j.value("slot_name", entry.path().stem().string());
runtime->meta.plugin_name = j.value("plugin_name", "json");
runtime->meta.restart_lsn = j.value("restart_lsn", 0ULL);
runtime->meta.confirmed_flush_lsn = j.value("confirmed_flush_lsn", runtime->meta.restart_lsn);
runtime->meta.initial_sync_pending = j.value("initial_sync_pending", false);
runtime->initial_sync_pending = runtime->meta.initial_sync_pending;

if (j.contains("filter")) {
const auto& jf = j["filter"];
runtime->meta.filter.replicate_ddl = jf.value("replicate_ddl", true);
runtime->meta.filter.replicate_dml = jf.value("replicate_dml", true);
runtime->meta.filter.row_filter_expression = jf.value("row_filter_expression", "");
if (jf.contains("include_collections")) {
runtime->meta.filter.include_collections =
jf["include_collections"].get<std::vector<std::string>>();
}
if (jf.contains("exclude_collections")) {
runtime->meta.filter.exclude_collections =
jf["exclude_collections"].get<std::vector<std::string>>();
}
}
@makr-code
Copy link
Owner

@copilot apply changes based on the comments in this thread

Copilot AI and others added 5 commits March 14, 2026 16:57
Co-authored-by: makr-code <150588092+makr-code@users.noreply.github.com>
Co-authored-by: makr-code <150588092+makr-code@users.noreply.github.com>
Co-authored-by: makr-code <150588092+makr-code@users.noreply.github.com>
Co-authored-by: makr-code <150588092+makr-code@users.noreply.github.com>
Co-authored-by: makr-code <150588092+makr-code@users.noreply.github.com>
@makr-code
Copy link
Owner

@copilot Review und code audit. Wurden alle Aufgaben des Issue erfüllt?

Copilot AI and others added 2 commits March 14, 2026 17:07
Co-authored-by: makr-code <150588092+makr-code@users.noreply.github.com>
Co-authored-by: makr-code <150588092+makr-code@users.noreply.github.com>
Copy link
Contributor Author

Copilot AI commented Mar 14, 2026

Just as a heads up, I was blocked by some firewall rules while working on your feedback. Expand below for details.

@github-actions github-actions bot added the type:test Test additions, improvements, or fixes label Mar 14, 2026
Co-authored-by: makr-code <150588092+makr-code@users.noreply.github.com>
Copilot AI changed the title Implement logical replication manager with filters, DDL streaming, and initial sync dedupe Respect parallel_decoding config and document fallback behavior Mar 14, 2026
@makr-code makr-code merged commit 08210c0 into develop Mar 14, 2026
10 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

type:test Test additions, improvements, or fixes

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Logical Replication

3 participants