Respect parallel_decoding config and document fallback behavior#4198
Merged
Respect parallel_decoding config and document fallback behavior#4198
Conversation
Co-authored-by: makr-code <150588092+makr-code@users.noreply.github.com>
Copilot
AI
changed the title
[WIP] Implement logical replication for the replication domain
Implement logical replication manager with filters, DDL streaming, and initial sync dedupe
Mar 13, 2026
Copilot stopped work on behalf of
makr-code due to an error
March 13, 2026 19:47
Copilot stopped work on behalf of
makr-code due to an error
March 13, 2026 19:57
There was a problem hiding this comment.
Pull request overview
Adds schema-aware logical replication as a new replication-mode capability, exposing per-subscriber logical slots with filtering, DDL streaming, cross-version metadata, transform hooks, and initial-sync dedupe, plus focused unit tests and accompanying documentation/roadmap updates.
Changes:
- Introduces
LogicalReplicationManager(slot lifecycle, filtering/predicates, DDL enqueue, transform hook, persisted slot state, initial sync snapshot + dedupe). - Adds
LogicalReplicationTeststarget and new unit tests covering filters, DDL propagation, transforms, and initial sync dedupe. - Updates replication documentation and roadmap/changelog to reflect delivery and acceptance criteria.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 15 comments.
Show a summary per file
| File | Description |
|---|---|
include/replication/logical_replication.h |
Defines public logical replication API (slots, filters, change model, config/stats). |
src/replication/logical_replication.cpp |
Implements slot persistence, filtering, WAL-to-logical conversion, transform hook, snapshot dedupe, and stats. |
tests/test_logical_replication.cpp |
Adds unit tests for filtering, DDL, transforms, and initial sync dedupe behavior. |
tests/CMakeLists.txt |
Registers new test_logical_replication executable and CTest entry LogicalReplicationTests. |
cmake/ModularBuild.cmake |
Adds logical_replication.cpp to modular build source lists. |
src/replication/README.md |
Documents LogicalReplicationManager usage and integration into ReplicationManager listeners. |
src/replication/ROADMAP.md |
Marks LogicalReplicationManager roadmap item as delivered. |
src/replication/FUTURE_ENHANCEMENTS.md |
Rewrites logical replication section into the standardized Scope/Constraints/Interfaces/Test/Perf/Security format and marks acceptance items. |
src/replication/CHANGELOG.md |
Adds 1.7.0 changelog entry describing logical replication feature set and tests. |
You can also share your feedback on Copilot code review. Take the survey.
Comment on lines
+411
to
+430
| void LogicalReplicationManager::persistSlot(const SlotRuntime& slot) const { | ||
| fs::create_directories(slotStatePath("")); | ||
| nlohmann::json j; | ||
| j["slot_name"] = slot.meta.slot_name; | ||
| j["plugin_name"] = slot.meta.plugin_name; | ||
| j["restart_lsn"] = slot.meta.restart_lsn; | ||
| j["confirmed_flush_lsn"] = slot.meta.confirmed_flush_lsn; | ||
| j["initial_sync_pending"] = slot.meta.initial_sync_pending; | ||
| j["filter"] = { | ||
| {"include_collections", slot.meta.filter.include_collections}, | ||
| {"exclude_collections", slot.meta.filter.exclude_collections}, | ||
| {"row_filter_expression", slot.meta.filter.row_filter_expression}, | ||
| {"replicate_ddl", slot.meta.filter.replicate_ddl}, | ||
| {"replicate_dml", slot.meta.filter.replicate_dml}, | ||
| }; | ||
|
|
||
| std::ofstream out(slotStatePath(slot.meta.slot_name)); | ||
| if (out.is_open()) { | ||
| out << j.dump(2); | ||
| } |
Comment on lines
+186
to
+200
| auto wal = std::make_shared<WALManager>(config); | ||
| LogicalReplicationManager::Config lcfg; | ||
| lcfg.target_version = "v1.6"; | ||
| lcfg.transform = [](LogicalChange& change) { | ||
| if (change.new_data.is_object()) { | ||
| change.new_data["tenant"] = "acme"; | ||
| } | ||
| }; | ||
| auto logical = std::make_shared<LogicalReplicationManager>(wal, lcfg); | ||
| replication_manager.addListener(logical); | ||
|
|
||
| LogicalReplicationManager::ReplicationFilter filter; | ||
| filter.include_collections = {"orders", "customers"}; | ||
| filter.row_filter_expression = "tenant == 'acme'"; | ||
| auto slot = logical->createSlot("acme_replica", "json_output", filter); |
tests/test_logical_replication.cpp
Outdated
| TEST(LogicalReplicationManagerTest, AppliesIncludeExcludeAndRowFilter) { | ||
| TempDir td("/tmp/themis_logical_repl_filters"); | ||
| auto wal = std::make_shared<WALManager>(makeConfig(td.path)); | ||
| LogicalReplicationManager mgr(wal, {}); |
|
|
||
| #### Performance Targets | ||
| - In-memory slot buffer push/pop amortized O(1); shared_mutex protects slot map, per-slot mutex guards buffers. | ||
| - Parallel decoding flag ensures consumer threads can drain without blocking producer enqueue on other slots. |
Comment on lines
+96
to
+103
| { | ||
| std::lock_guard<std::mutex> lock(runtime->mutex); | ||
| runtime->meta.confirmed_flush_lsn = lsn; | ||
| if (runtime->initial_sync_pending && lsn >= runtime->meta.restart_lsn) { | ||
| runtime->initial_sync_pending = false; | ||
| runtime->meta.initial_sync_pending = false; | ||
| runtime->snapshot_keys.clear(); | ||
| } |
Comment on lines
+41
to
+45
| LogicalReplicationManager::LogicalReplicationManager(std::shared_ptr<WALManager> wal, Config config) | ||
| : wal_(std::move(wal)) | ||
| , config_(std::move(config)) { | ||
| loadPersistedSlots(); | ||
| } |
| } | ||
|
|
||
| { | ||
| std::unique_lock<std::shared_mutex> lock(slots_mutex_); |
|
|
||
| #### Design Constraints | ||
| - Row-filter evaluation must remain O(1) per change using lightweight predicate parsing. | ||
| - Slot persistence must survive restart without blocking WAL writers; JSON state is fsync-safe under WAL directory. |
Comment on lines
+260
to
+277
| bool LogicalReplicationManager::matchesFilter(const LogicalChange& change, | ||
| const ReplicationFilter& filter) const { | ||
| if (!filter.include_collections.empty()) { | ||
| const bool included = std::find(filter.include_collections.begin(), | ||
| filter.include_collections.end(), | ||
| change.collection) != filter.include_collections.end(); | ||
| if (!included) return false; | ||
| } | ||
|
|
||
| if (std::find(filter.exclude_collections.begin(), | ||
| filter.exclude_collections.end(), | ||
| change.collection) != filter.exclude_collections.end()) { | ||
| return false; | ||
| } | ||
|
|
||
| if (change.type == LogicalChange::Type::DDL) { | ||
| return filter.replicate_ddl; | ||
| } |
Comment on lines
+367
to
+399
| for (auto& entry : fs::directory_iterator(dir)) { | ||
| if (!entry.is_regular_file()) continue; | ||
| std::ifstream in(entry.path()); | ||
| if (!in.is_open()) continue; | ||
| nlohmann::json j; | ||
| try { | ||
| in >> j; | ||
| } catch (...) { | ||
| continue; | ||
| } | ||
|
|
||
| auto runtime = std::make_shared<SlotRuntime>(); | ||
| runtime->meta.slot_name = j.value("slot_name", entry.path().stem().string()); | ||
| runtime->meta.plugin_name = j.value("plugin_name", "json"); | ||
| runtime->meta.restart_lsn = j.value("restart_lsn", 0ULL); | ||
| runtime->meta.confirmed_flush_lsn = j.value("confirmed_flush_lsn", runtime->meta.restart_lsn); | ||
| runtime->meta.initial_sync_pending = j.value("initial_sync_pending", false); | ||
| runtime->initial_sync_pending = runtime->meta.initial_sync_pending; | ||
|
|
||
| if (j.contains("filter")) { | ||
| const auto& jf = j["filter"]; | ||
| runtime->meta.filter.replicate_ddl = jf.value("replicate_ddl", true); | ||
| runtime->meta.filter.replicate_dml = jf.value("replicate_dml", true); | ||
| runtime->meta.filter.row_filter_expression = jf.value("row_filter_expression", ""); | ||
| if (jf.contains("include_collections")) { | ||
| runtime->meta.filter.include_collections = | ||
| jf["include_collections"].get<std::vector<std::string>>(); | ||
| } | ||
| if (jf.contains("exclude_collections")) { | ||
| runtime->meta.filter.exclude_collections = | ||
| jf["exclude_collections"].get<std::vector<std::string>>(); | ||
| } | ||
| } |
Owner
|
@copilot apply changes based on the comments in this thread |
Co-authored-by: makr-code <150588092+makr-code@users.noreply.github.com>
Co-authored-by: makr-code <150588092+makr-code@users.noreply.github.com>
Co-authored-by: makr-code <150588092+makr-code@users.noreply.github.com>
Co-authored-by: makr-code <150588092+makr-code@users.noreply.github.com>
Co-authored-by: makr-code <150588092+makr-code@users.noreply.github.com>
Owner
|
@copilot Review und code audit. Wurden alle Aufgaben des Issue erfüllt? |
Co-authored-by: makr-code <150588092+makr-code@users.noreply.github.com>
Co-authored-by: makr-code <150588092+makr-code@users.noreply.github.com>
Contributor
Author
|
Just as a heads up, I was blocked by some firewall rules while working on your feedback. Expand below for details. |
Co-authored-by: makr-code <150588092+makr-code@users.noreply.github.com>
Copilot
AI
changed the title
Implement logical replication manager with filters, DDL streaming, and initial sync dedupe
Respect parallel_decoding config and document fallback behavior
Mar 14, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
The logical replication review highlighted remaining open points: make parallel slot dispatch bounded, fallback correctly when hardware_concurrency is unavailable, and clarify documentation.
Description
Type of Change
Testing
📚 Research & Knowledge (wenn applicable)
/docs/research/angelegt?/docs/research/implementation_influence/eingetragen?Relevante Quellen:
Checklist
Original prompt
This section details on the original issue you should resolve
<issue_title>Logical Replication</issue_title>
<issue_description>### Context
This issue implements the roadmap item 'Logical Replication' for the replication domain. It is sourced from the consolidated roadmap under 🟠 High Priority — Near-term (v1.5.0 – v1.8.0) and targets milestone v1.7.0.
Primary detail section: Logical Replication
Goal
Deliver the scoped changes for Logical Replication in src/replication/ and complete the linked detail section in a release-ready state for v1.7.0.
Detailed Scope
Logical Replication
Priority: High
Target Version: v1.7.0
Replace physical WAL-based replication with logical replication that replicates operations at a higher level, enabling cross-version replication and selective replication.
Features:
Architecture:
Benefits:
Implementation Notes:
Acceptance Criteria
Relationships
References
Generated from the consolidated source roadmap. Keep the roadmap and issue in sync when scope changes.
💬 Send tasks to Copilot coding agent from Slack and Teams to turn conversations into code. Copilot posts an update in your thread when it's finished.