From bf945b3b745029d9c6cc0d793612622fbac3ba88 Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Fri, 27 Mar 2026 14:02:33 -0700 Subject: [PATCH 01/12] refactor: add Apply method to MemoryStore for transactional change sets --- .../memory_store/memory_store.cpp | 49 ++ .../memory_store/memory_store.hpp | 4 + .../tests/memory_store_apply_test.cpp | 551 ++++++++++++++++++ 3 files changed, 604 insertions(+) create mode 100644 libs/server-sdk/tests/memory_store_apply_test.cpp diff --git a/libs/server-sdk/src/data_components/memory_store/memory_store.cpp b/libs/server-sdk/src/data_components/memory_store/memory_store.cpp index 95cac8748..6e0365d7f 100644 --- a/libs/server-sdk/src/data_components/memory_store/memory_store.cpp +++ b/libs/server-sdk/src/data_components/memory_store/memory_store.cpp @@ -1,5 +1,9 @@ #include "memory_store.hpp" +#include + +#include + namespace launchdarkly::server_side::data_components { std::shared_ptr MemoryStore::GetFlag( @@ -82,4 +86,49 @@ bool MemoryStore::RemoveSegment(std::string const& key) { return segments_.erase(key) == 1; } +void MemoryStore::Apply(data_model::FDv2ChangeSet const& changeSet) { + std::lock_guard lock{data_mutex_}; + + if (changeSet.type == data_model::FDv2ChangeSet::Type::kNone) { + return; + } + + if (changeSet.type == data_model::FDv2ChangeSet::Type::kFull) { + initialized_ = true; + flags_.clear(); + segments_.clear(); + } + + for (auto change : changeSet.changes) { + if (std::holds_alternative(change.object)) { + auto& flag_descriptor = + std::get(change.object); + + auto existing_flag = flags_.find(change.key); + if (existing_flag != flags_.end() && + existing_flag->second->version >= flag_descriptor.version) { + continue; + } + + flags_[change.key] = std::make_shared( + std::move(flag_descriptor)); + } else if (std::holds_alternative( + change.object)) { + auto& segment_descriptor = + std::get(change.object); + + auto existing_segment = segments_.find(change.key); + if (existing_segment != segments_.end() && + existing_segment->second->version >= + segment_descriptor.version) { + continue; + } + + segments_[change.key] = + std::make_shared( + std::move(segment_descriptor)); + } + } +} + } // namespace launchdarkly::server_side::data_components diff --git a/libs/server-sdk/src/data_components/memory_store/memory_store.hpp b/libs/server-sdk/src/data_components/memory_store/memory_store.hpp index 93dfca485..81846dfab 100644 --- a/libs/server-sdk/src/data_components/memory_store/memory_store.hpp +++ b/libs/server-sdk/src/data_components/memory_store/memory_store.hpp @@ -3,6 +3,8 @@ #include "../../data_interfaces/destination/idestination.hpp" #include "../../data_interfaces/store/istore.hpp" +#include + #include #include #include @@ -44,6 +46,8 @@ class MemoryStore final : public data_interfaces::IStore, bool RemoveSegment(std::string const& key); + void Apply(data_model::FDv2ChangeSet const& changeSet); + MemoryStore() = default; ~MemoryStore() override = default; diff --git a/libs/server-sdk/tests/memory_store_apply_test.cpp b/libs/server-sdk/tests/memory_store_apply_test.cpp new file mode 100644 index 000000000..219e36a15 --- /dev/null +++ b/libs/server-sdk/tests/memory_store_apply_test.cpp @@ -0,0 +1,551 @@ +#include + +#include +#include + +using namespace launchdarkly::data_model; +using namespace launchdarkly::server_side::data_components; + +// --------------------------------------------------------------------------- +// kNone tests +// --------------------------------------------------------------------------- + +TEST(MemoryStoreApplyTest, ApplyNone_IsNoOp) { + MemoryStore store; + Flag flag_a; + flag_a.version = 1; + flag_a.key = "flagA"; + + Segment seg_a; + seg_a.version = 1; + seg_a.key = "segA"; + + store.Init(SDKDataSet{ + std::unordered_map{ + {"flagA", FlagDescriptor(flag_a)}}, + std::unordered_map{ + {"segA", SegmentDescriptor(seg_a)}}, + }); + + store.Apply(FDv2ChangeSet{FDv2ChangeSet::Type::kNone, {}, Selector{}}); + + auto fetched_flag = store.GetFlag("flagA"); + ASSERT_TRUE(fetched_flag); + EXPECT_EQ(1u, fetched_flag->version); + auto fetched_seg = store.GetSegment("segA"); + ASSERT_TRUE(fetched_seg); + EXPECT_EQ(1u, fetched_seg->version); +} + +TEST(MemoryStoreApplyTest, ApplyNone_DoesNotInitialize) { + MemoryStore store; + store.Apply(FDv2ChangeSet{FDv2ChangeSet::Type::kNone, {}, Selector{}}); + EXPECT_FALSE(store.Initialized()); +} + +// --------------------------------------------------------------------------- +// kFull tests +// --------------------------------------------------------------------------- + +TEST(MemoryStoreApplyTest, ApplyFull_SetsInitialized) { + MemoryStore store; + ASSERT_FALSE(store.Initialized()); + store.Apply(FDv2ChangeSet{FDv2ChangeSet::Type::kFull, {}, Selector{}}); + EXPECT_TRUE(store.Initialized()); +} + +TEST(MemoryStoreApplyTest, ApplyFull_WithFlag) { + MemoryStore store; + Flag flag_a; + flag_a.version = 1; + flag_a.key = "flagA"; + + store.Apply(FDv2ChangeSet{ + FDv2ChangeSet::Type::kFull, + std::vector{{"flagA", FlagDescriptor(flag_a)}}, + Selector{}, + }); + + auto fetched = store.GetFlag("flagA"); + ASSERT_TRUE(fetched); + EXPECT_TRUE(fetched->item.has_value()); + EXPECT_EQ("flagA", fetched->item->key); + EXPECT_EQ(1u, fetched->version); +} + +TEST(MemoryStoreApplyTest, ApplyFull_WithSegment) { + MemoryStore store; + Segment seg_a; + seg_a.version = 1; + seg_a.key = "segA"; + + store.Apply(FDv2ChangeSet{ + FDv2ChangeSet::Type::kFull, + std::vector{{"segA", SegmentDescriptor(seg_a)}}, + Selector{}, + }); + + auto fetched = store.GetSegment("segA"); + ASSERT_TRUE(fetched); + EXPECT_TRUE(fetched->item.has_value()); + EXPECT_EQ("segA", fetched->item->key); + EXPECT_EQ(1u, fetched->version); +} + +TEST(MemoryStoreApplyTest, ApplyFull_ClearsExistingFlags) { + MemoryStore store; + Flag flag_a; + flag_a.version = 1; + flag_a.key = "flagA"; + + Flag flag_b; + flag_b.version = 1; + flag_b.key = "flagB"; + + store.Init(SDKDataSet{ + std::unordered_map{ + {"flagA", FlagDescriptor(flag_a)}, + {"flagB", FlagDescriptor(flag_b)}}, + std::unordered_map(), + }); + + Flag flag_c; + flag_c.version = 1; + flag_c.key = "flagC"; + + store.Apply(FDv2ChangeSet{ + FDv2ChangeSet::Type::kFull, + std::vector{{"flagC", FlagDescriptor(flag_c)}}, + Selector{}, + }); + + EXPECT_FALSE(store.GetFlag("flagA")); + EXPECT_FALSE(store.GetFlag("flagB")); + ASSERT_TRUE(store.GetFlag("flagC")); + EXPECT_EQ("flagC", store.GetFlag("flagC")->item->key); +} + +TEST(MemoryStoreApplyTest, ApplyFull_ClearsExistingSegments) { + MemoryStore store; + Segment seg_a; + seg_a.version = 1; + seg_a.key = "segA"; + + store.Init(SDKDataSet{ + std::unordered_map(), + std::unordered_map{ + {"segA", SegmentDescriptor(seg_a)}}, + }); + + Segment seg_b; + seg_b.version = 1; + seg_b.key = "segB"; + + store.Apply(FDv2ChangeSet{ + FDv2ChangeSet::Type::kFull, + std::vector{{"segB", SegmentDescriptor(seg_b)}}, + Selector{}, + }); + + EXPECT_FALSE(store.GetSegment("segA")); + ASSERT_TRUE(store.GetSegment("segB")); +} + +TEST(MemoryStoreApplyTest, ApplyFull_EmptyChangeSetClearsStore) { + MemoryStore store; + Flag flag_a; + flag_a.version = 1; + flag_a.key = "flagA"; + + Segment seg_a; + seg_a.version = 1; + seg_a.key = "segA"; + + store.Init(SDKDataSet{ + std::unordered_map{ + {"flagA", FlagDescriptor(flag_a)}}, + std::unordered_map{ + {"segA", SegmentDescriptor(seg_a)}}, + }); + + store.Apply(FDv2ChangeSet{FDv2ChangeSet::Type::kFull, {}, Selector{}}); + + EXPECT_EQ(0u, store.AllFlags().size()); + EXPECT_EQ(0u, store.AllSegments().size()); +} + +TEST(MemoryStoreApplyTest, ApplyFull_WithFlagTombstone) { + MemoryStore store; + + store.Apply(FDv2ChangeSet{ + FDv2ChangeSet::Type::kFull, + std::vector{{"flagA", FlagDescriptor(Tombstone(5))}}, + Selector{}, + }); + + auto fetched = store.GetFlag("flagA"); + ASSERT_TRUE(fetched); + EXPECT_EQ(5u, fetched->version); + EXPECT_FALSE(fetched->item.has_value()); +} + +TEST(MemoryStoreApplyTest, ApplyFull_WithSegmentTombstone) { + MemoryStore store; + + store.Apply(FDv2ChangeSet{ + FDv2ChangeSet::Type::kFull, + std::vector{{"segA", SegmentDescriptor(Tombstone(3))}}, + Selector{}, + }); + + auto fetched = store.GetSegment("segA"); + ASSERT_TRUE(fetched); + EXPECT_EQ(3u, fetched->version); + EXPECT_FALSE(fetched->item.has_value()); +} + +// --------------------------------------------------------------------------- +// kPartial tests +// --------------------------------------------------------------------------- + +TEST(MemoryStoreApplyTest, ApplyPartial_UpsertsNewFlag) { + MemoryStore store; + store.Init(SDKDataSet{ + std::unordered_map(), + std::unordered_map(), + }); + + Flag flag_a; + flag_a.version = 1; + flag_a.key = "flagA"; + + store.Apply(FDv2ChangeSet{ + FDv2ChangeSet::Type::kPartial, + std::vector{{"flagA", FlagDescriptor(flag_a)}}, + Selector{}, + }); + + auto fetched = store.GetFlag("flagA"); + ASSERT_TRUE(fetched); + EXPECT_TRUE(fetched->item.has_value()); + EXPECT_EQ("flagA", fetched->item->key); + EXPECT_EQ(1u, fetched->version); +} + +TEST(MemoryStoreApplyTest, ApplyPartial_UpsertsNewSegment) { + MemoryStore store; + store.Init(SDKDataSet{ + std::unordered_map(), + std::unordered_map(), + }); + + Segment seg_a; + seg_a.version = 1; + seg_a.key = "segA"; + + store.Apply(FDv2ChangeSet{ + FDv2ChangeSet::Type::kPartial, + std::vector{{"segA", SegmentDescriptor(seg_a)}}, + Selector{}, + }); + + auto fetched = store.GetSegment("segA"); + ASSERT_TRUE(fetched); + EXPECT_TRUE(fetched->item.has_value()); + EXPECT_EQ("segA", fetched->item->key); + EXPECT_EQ(1u, fetched->version); +} + +TEST(MemoryStoreApplyTest, ApplyPartial_SkipsFlagWithLowerVersion) { + MemoryStore store; + Flag flag_a; + flag_a.version = 5; + flag_a.key = "flagA"; + + store.Init(SDKDataSet{ + std::unordered_map{ + {"flagA", FlagDescriptor(flag_a)}}, + std::unordered_map(), + }); + + Flag flag_a_stale; + flag_a_stale.version = 3; + flag_a_stale.key = "flagA"; + + store.Apply(FDv2ChangeSet{ + FDv2ChangeSet::Type::kPartial, + std::vector{{"flagA", FlagDescriptor(flag_a_stale)}}, + Selector{}, + }); + + auto fetched = store.GetFlag("flagA"); + ASSERT_TRUE(fetched); + EXPECT_EQ(5u, fetched->version); +} + +TEST(MemoryStoreApplyTest, ApplyPartial_SkipsFlagWithEqualVersion) { + MemoryStore store; + Flag flag_a; + flag_a.version = 5; + flag_a.key = "flagA"; + + store.Init(SDKDataSet{ + std::unordered_map{ + {"flagA", FlagDescriptor(flag_a)}}, + std::unordered_map(), + }); + + Flag flag_a_same; + flag_a_same.version = 5; + flag_a_same.key = "flagA"; + + store.Apply(FDv2ChangeSet{ + FDv2ChangeSet::Type::kPartial, + std::vector{{"flagA", FlagDescriptor(flag_a_same)}}, + Selector{}, + }); + + auto fetched = store.GetFlag("flagA"); + ASSERT_TRUE(fetched); + EXPECT_EQ(5u, fetched->version); +} + +TEST(MemoryStoreApplyTest, ApplyPartial_AppliesFlagWithHigherVersion) { + MemoryStore store; + Flag flag_a; + flag_a.version = 5; + flag_a.key = "flagA"; + + store.Init(SDKDataSet{ + std::unordered_map{ + {"flagA", FlagDescriptor(flag_a)}}, + std::unordered_map(), + }); + + Flag flag_a_new; + flag_a_new.version = 6; + flag_a_new.key = "flagA"; + + store.Apply(FDv2ChangeSet{ + FDv2ChangeSet::Type::kPartial, + std::vector{{"flagA", FlagDescriptor(flag_a_new)}}, + Selector{}, + }); + + auto fetched = store.GetFlag("flagA"); + ASSERT_TRUE(fetched); + EXPECT_EQ(6u, fetched->version); +} + +TEST(MemoryStoreApplyTest, ApplyPartial_SkipsSegmentWithLowerVersion) { + MemoryStore store; + Segment seg_a; + seg_a.version = 5; + seg_a.key = "segA"; + + store.Init(SDKDataSet{ + std::unordered_map(), + std::unordered_map{ + {"segA", SegmentDescriptor(seg_a)}}, + }); + + Segment seg_a_stale; + seg_a_stale.version = 3; + seg_a_stale.key = "segA"; + + store.Apply(FDv2ChangeSet{ + FDv2ChangeSet::Type::kPartial, + std::vector{{"segA", SegmentDescriptor(seg_a_stale)}}, + Selector{}, + }); + + auto fetched = store.GetSegment("segA"); + ASSERT_TRUE(fetched); + EXPECT_EQ(5u, fetched->version); +} + +TEST(MemoryStoreApplyTest, ApplyPartial_AppliesSegmentWithHigherVersion) { + MemoryStore store; + Segment seg_a; + seg_a.version = 5; + seg_a.key = "segA"; + + store.Init(SDKDataSet{ + std::unordered_map(), + std::unordered_map{ + {"segA", SegmentDescriptor(seg_a)}}, + }); + + Segment seg_a_new; + seg_a_new.version = 6; + seg_a_new.key = "segA"; + + store.Apply(FDv2ChangeSet{ + FDv2ChangeSet::Type::kPartial, + std::vector{{"segA", SegmentDescriptor(seg_a_new)}}, + Selector{}, + }); + + auto fetched = store.GetSegment("segA"); + ASSERT_TRUE(fetched); + EXPECT_EQ(6u, fetched->version); +} + +TEST(MemoryStoreApplyTest, ApplyPartial_PreservesUnchangedFlags) { + MemoryStore store; + Flag flag_a; + flag_a.version = 1; + flag_a.key = "flagA"; + + Flag flag_b; + flag_b.version = 1; + flag_b.key = "flagB"; + + store.Init(SDKDataSet{ + std::unordered_map{ + {"flagA", FlagDescriptor(flag_a)}, + {"flagB", FlagDescriptor(flag_b)}}, + std::unordered_map(), + }); + + Flag flag_b_new; + flag_b_new.version = 2; + flag_b_new.key = "flagB"; + + store.Apply(FDv2ChangeSet{ + FDv2ChangeSet::Type::kPartial, + std::vector{{"flagB", FlagDescriptor(flag_b_new)}}, + Selector{}, + }); + + auto fetched_a = store.GetFlag("flagA"); + ASSERT_TRUE(fetched_a); + EXPECT_EQ(1u, fetched_a->version); + + auto fetched_b = store.GetFlag("flagB"); + ASSERT_TRUE(fetched_b); + EXPECT_EQ(2u, fetched_b->version); +} + +TEST(MemoryStoreApplyTest, ApplyPartial_PreservesUnchangedSegments) { + MemoryStore store; + Segment seg_a; + seg_a.version = 1; + seg_a.key = "segA"; + + Segment seg_b; + seg_b.version = 1; + seg_b.key = "segB"; + + store.Init(SDKDataSet{ + std::unordered_map(), + std::unordered_map{ + {"segA", SegmentDescriptor(seg_a)}, + {"segB", SegmentDescriptor(seg_b)}}, + }); + + Segment seg_b_new; + seg_b_new.version = 2; + seg_b_new.key = "segB"; + + store.Apply(FDv2ChangeSet{ + FDv2ChangeSet::Type::kPartial, + std::vector{{"segB", SegmentDescriptor(seg_b_new)}}, + Selector{}, + }); + + auto fetched_a = store.GetSegment("segA"); + ASSERT_TRUE(fetched_a); + EXPECT_EQ(1u, fetched_a->version); + + auto fetched_b = store.GetSegment("segB"); + ASSERT_TRUE(fetched_b); + EXPECT_EQ(2u, fetched_b->version); +} + +TEST(MemoryStoreApplyTest, ApplyPartial_WithFlagTombstone) { + MemoryStore store; + Flag flag_a; + flag_a.version = 1; + flag_a.key = "flagA"; + + store.Init(SDKDataSet{ + std::unordered_map{ + {"flagA", FlagDescriptor(flag_a)}}, + std::unordered_map(), + }); + + store.Apply(FDv2ChangeSet{ + FDv2ChangeSet::Type::kPartial, + std::vector{{"flagA", FlagDescriptor(Tombstone(2))}}, + Selector{}, + }); + + auto fetched = store.GetFlag("flagA"); + ASSERT_TRUE(fetched); + EXPECT_EQ(2u, fetched->version); + EXPECT_FALSE(fetched->item.has_value()); +} + +TEST(MemoryStoreApplyTest, ApplyPartial_TombstoneSkippedIfVersionNotNewer) { + MemoryStore store; + Flag flag_a; + flag_a.version = 5; + flag_a.key = "flagA"; + + store.Init(SDKDataSet{ + std::unordered_map{ + {"flagA", FlagDescriptor(flag_a)}}, + std::unordered_map(), + }); + + // Tombstone at version 3 < stored version 5: should be ignored. + store.Apply(FDv2ChangeSet{ + FDv2ChangeSet::Type::kPartial, + std::vector{{"flagA", FlagDescriptor(Tombstone(3))}}, + Selector{}, + }); + + auto fetched = store.GetFlag("flagA"); + ASSERT_TRUE(fetched); + EXPECT_EQ(5u, fetched->version); + EXPECT_TRUE(fetched->item.has_value()); +} + +TEST(MemoryStoreApplyTest, ApplyPartial_MixedStaleAndFreshItems) { + MemoryStore store; + Flag flag_a; + flag_a.version = 10; + flag_a.key = "flagA"; + + Flag flag_b; + flag_b.version = 1; + flag_b.key = "flagB"; + + store.Init(SDKDataSet{ + std::unordered_map{ + {"flagA", FlagDescriptor(flag_a)}, + {"flagB", FlagDescriptor(flag_b)}}, + std::unordered_map(), + }); + + Flag flag_a_stale; + flag_a_stale.version = 5; + flag_a_stale.key = "flagA"; + + Flag flag_b_new; + flag_b_new.version = 2; + flag_b_new.key = "flagB"; + + store.Apply(FDv2ChangeSet{ + FDv2ChangeSet::Type::kPartial, + std::vector{{"flagA", FlagDescriptor(flag_a_stale)}, + {"flagB", FlagDescriptor(flag_b_new)}}, + Selector{}, + }); + + // flagA version 5 < 10: skip. + EXPECT_EQ(10u, store.GetFlag("flagA")->version); + // flagB version 2 > 1: apply. + EXPECT_EQ(2u, store.GetFlag("flagB")->version); +} From cc59635bccfd2990cc53ae65f5256f83f1fb5027 Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Fri, 27 Mar 2026 14:19:23 -0700 Subject: [PATCH 02/12] fix an unneeded copy, and warn on missing enum case --- .../memory_store/memory_store.cpp | 28 ++++++++++--------- .../memory_store/memory_store.hpp | 2 +- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/libs/server-sdk/src/data_components/memory_store/memory_store.cpp b/libs/server-sdk/src/data_components/memory_store/memory_store.cpp index 6e0365d7f..7dc5105e3 100644 --- a/libs/server-sdk/src/data_components/memory_store/memory_store.cpp +++ b/libs/server-sdk/src/data_components/memory_store/memory_store.cpp @@ -1,8 +1,6 @@ #include "memory_store.hpp" -#include - -#include +#include namespace launchdarkly::server_side::data_components { @@ -86,20 +84,24 @@ bool MemoryStore::RemoveSegment(std::string const& key) { return segments_.erase(key) == 1; } -void MemoryStore::Apply(data_model::FDv2ChangeSet const& changeSet) { +void MemoryStore::Apply(data_model::FDv2ChangeSet changeSet) { std::lock_guard lock{data_mutex_}; - if (changeSet.type == data_model::FDv2ChangeSet::Type::kNone) { - return; - } - - if (changeSet.type == data_model::FDv2ChangeSet::Type::kFull) { - initialized_ = true; - flags_.clear(); - segments_.clear(); + switch (changeSet.type) { + case data_model::FDv2ChangeSet::Type::kNone: + return; + case data_model::FDv2ChangeSet::Type::kPartial: + break; + case data_model::FDv2ChangeSet::Type::kFull: + initialized_ = true; + flags_.clear(); + segments_.clear(); + break; + default: + detail::unreachable(); } - for (auto change : changeSet.changes) { + for (auto& change : changeSet.changes) { if (std::holds_alternative(change.object)) { auto& flag_descriptor = std::get(change.object); diff --git a/libs/server-sdk/src/data_components/memory_store/memory_store.hpp b/libs/server-sdk/src/data_components/memory_store/memory_store.hpp index 81846dfab..e9a067881 100644 --- a/libs/server-sdk/src/data_components/memory_store/memory_store.hpp +++ b/libs/server-sdk/src/data_components/memory_store/memory_store.hpp @@ -46,7 +46,7 @@ class MemoryStore final : public data_interfaces::IStore, bool RemoveSegment(std::string const& key); - void Apply(data_model::FDv2ChangeSet const& changeSet); + void Apply(data_model::FDv2ChangeSet changeSet); MemoryStore() = default; ~MemoryStore() override = default; From 82d09d26bd805d72012bf329d90933db9678aaef Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Fri, 27 Mar 2026 16:02:23 -0700 Subject: [PATCH 03/12] add an ApplyResult with the changed keys --- .../memory_store/memory_store.cpp | 17 ++- .../memory_store/memory_store.hpp | 8 +- .../tests/memory_store_apply_test.cpp | 123 +++++++++++++++--- 3 files changed, 125 insertions(+), 23 deletions(-) diff --git a/libs/server-sdk/src/data_components/memory_store/memory_store.cpp b/libs/server-sdk/src/data_components/memory_store/memory_store.cpp index 7dc5105e3..7680853a7 100644 --- a/libs/server-sdk/src/data_components/memory_store/memory_store.cpp +++ b/libs/server-sdk/src/data_components/memory_store/memory_store.cpp @@ -84,15 +84,24 @@ bool MemoryStore::RemoveSegment(std::string const& key) { return segments_.erase(key) == 1; } -void MemoryStore::Apply(data_model::FDv2ChangeSet changeSet) { +ApplyResult MemoryStore::Apply(data_model::FDv2ChangeSet changeSet) { + ApplyResult result; std::lock_guard lock{data_mutex_}; switch (changeSet.type) { case data_model::FDv2ChangeSet::Type::kNone: - return; + return result; case data_model::FDv2ChangeSet::Type::kPartial: break; case data_model::FDv2ChangeSet::Type::kFull: + // When there's a full change, any current keys are considered + // changed, regardless of whether they are in the new set. + for (auto const& [key, _] : flags_) { + result.flags.insert(key); + } + for (auto const& [key, _] : segments_) { + result.segments.insert(key); + } initialized_ = true; flags_.clear(); segments_.clear(); @@ -114,6 +123,7 @@ void MemoryStore::Apply(data_model::FDv2ChangeSet changeSet) { flags_[change.key] = std::make_shared( std::move(flag_descriptor)); + result.flags.insert(change.key); } else if (std::holds_alternative( change.object)) { auto& segment_descriptor = @@ -129,8 +139,11 @@ void MemoryStore::Apply(data_model::FDv2ChangeSet changeSet) { segments_[change.key] = std::make_shared( std::move(segment_descriptor)); + result.segments.insert(change.key); } } + + return result; } } // namespace launchdarkly::server_side::data_components diff --git a/libs/server-sdk/src/data_components/memory_store/memory_store.hpp b/libs/server-sdk/src/data_components/memory_store/memory_store.hpp index e9a067881..7712eb67e 100644 --- a/libs/server-sdk/src/data_components/memory_store/memory_store.hpp +++ b/libs/server-sdk/src/data_components/memory_store/memory_store.hpp @@ -9,9 +9,15 @@ #include #include #include +#include namespace launchdarkly::server_side::data_components { +struct ApplyResult { + std::unordered_set flags; + std::unordered_set segments; +}; + class MemoryStore final : public data_interfaces::IStore, public data_interfaces::IDestination { public: @@ -46,7 +52,7 @@ class MemoryStore final : public data_interfaces::IStore, bool RemoveSegment(std::string const& key); - void Apply(data_model::FDv2ChangeSet changeSet); + ApplyResult Apply(data_model::FDv2ChangeSet changeSet); MemoryStore() = default; ~MemoryStore() override = default; diff --git a/libs/server-sdk/tests/memory_store_apply_test.cpp b/libs/server-sdk/tests/memory_store_apply_test.cpp index 219e36a15..d0ab96868 100644 --- a/libs/server-sdk/tests/memory_store_apply_test.cpp +++ b/libs/server-sdk/tests/memory_store_apply_test.cpp @@ -27,7 +27,8 @@ TEST(MemoryStoreApplyTest, ApplyNone_IsNoOp) { {"segA", SegmentDescriptor(seg_a)}}, }); - store.Apply(FDv2ChangeSet{FDv2ChangeSet::Type::kNone, {}, Selector{}}); + auto result = + store.Apply(FDv2ChangeSet{FDv2ChangeSet::Type::kNone, {}, Selector{}}); auto fetched_flag = store.GetFlag("flagA"); ASSERT_TRUE(fetched_flag); @@ -35,6 +36,9 @@ TEST(MemoryStoreApplyTest, ApplyNone_IsNoOp) { auto fetched_seg = store.GetSegment("segA"); ASSERT_TRUE(fetched_seg); EXPECT_EQ(1u, fetched_seg->version); + + EXPECT_TRUE(result.flags.empty()); + EXPECT_TRUE(result.segments.empty()); } TEST(MemoryStoreApplyTest, ApplyNone_DoesNotInitialize) { @@ -60,7 +64,7 @@ TEST(MemoryStoreApplyTest, ApplyFull_WithFlag) { flag_a.version = 1; flag_a.key = "flagA"; - store.Apply(FDv2ChangeSet{ + auto result = store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kFull, std::vector{{"flagA", FlagDescriptor(flag_a)}}, Selector{}, @@ -71,6 +75,10 @@ TEST(MemoryStoreApplyTest, ApplyFull_WithFlag) { EXPECT_TRUE(fetched->item.has_value()); EXPECT_EQ("flagA", fetched->item->key); EXPECT_EQ(1u, fetched->version); + + ASSERT_EQ(1u, result.flags.size()); + EXPECT_EQ(1u, result.flags.count("flagA")); + EXPECT_TRUE(result.segments.empty()); } TEST(MemoryStoreApplyTest, ApplyFull_WithSegment) { @@ -79,7 +87,7 @@ TEST(MemoryStoreApplyTest, ApplyFull_WithSegment) { seg_a.version = 1; seg_a.key = "segA"; - store.Apply(FDv2ChangeSet{ + auto result = store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kFull, std::vector{{"segA", SegmentDescriptor(seg_a)}}, Selector{}, @@ -90,6 +98,10 @@ TEST(MemoryStoreApplyTest, ApplyFull_WithSegment) { EXPECT_TRUE(fetched->item.has_value()); EXPECT_EQ("segA", fetched->item->key); EXPECT_EQ(1u, fetched->version); + + EXPECT_TRUE(result.flags.empty()); + ASSERT_EQ(1u, result.segments.size()); + EXPECT_EQ(1u, result.segments.count("segA")); } TEST(MemoryStoreApplyTest, ApplyFull_ClearsExistingFlags) { @@ -113,7 +125,7 @@ TEST(MemoryStoreApplyTest, ApplyFull_ClearsExistingFlags) { flag_c.version = 1; flag_c.key = "flagC"; - store.Apply(FDv2ChangeSet{ + auto result = store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kFull, std::vector{{"flagC", FlagDescriptor(flag_c)}}, Selector{}, @@ -123,6 +135,13 @@ TEST(MemoryStoreApplyTest, ApplyFull_ClearsExistingFlags) { EXPECT_FALSE(store.GetFlag("flagB")); ASSERT_TRUE(store.GetFlag("flagC")); EXPECT_EQ("flagC", store.GetFlag("flagC")->item->key); + + // Cleared keys (flagA, flagB) and new key (flagC) all reported as changed. + ASSERT_EQ(3u, result.flags.size()); + EXPECT_EQ(1u, result.flags.count("flagA")); + EXPECT_EQ(1u, result.flags.count("flagB")); + EXPECT_EQ(1u, result.flags.count("flagC")); + EXPECT_TRUE(result.segments.empty()); } TEST(MemoryStoreApplyTest, ApplyFull_ClearsExistingSegments) { @@ -141,7 +160,7 @@ TEST(MemoryStoreApplyTest, ApplyFull_ClearsExistingSegments) { seg_b.version = 1; seg_b.key = "segB"; - store.Apply(FDv2ChangeSet{ + auto result = store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kFull, std::vector{{"segB", SegmentDescriptor(seg_b)}}, Selector{}, @@ -149,6 +168,12 @@ TEST(MemoryStoreApplyTest, ApplyFull_ClearsExistingSegments) { EXPECT_FALSE(store.GetSegment("segA")); ASSERT_TRUE(store.GetSegment("segB")); + + // Cleared key (segA) and new key (segB) both reported as changed. + EXPECT_TRUE(result.flags.empty()); + ASSERT_EQ(2u, result.segments.size()); + EXPECT_EQ(1u, result.segments.count("segA")); + EXPECT_EQ(1u, result.segments.count("segB")); } TEST(MemoryStoreApplyTest, ApplyFull_EmptyChangeSetClearsStore) { @@ -168,16 +193,22 @@ TEST(MemoryStoreApplyTest, ApplyFull_EmptyChangeSetClearsStore) { {"segA", SegmentDescriptor(seg_a)}}, }); - store.Apply(FDv2ChangeSet{FDv2ChangeSet::Type::kFull, {}, Selector{}}); + auto result = + store.Apply(FDv2ChangeSet{FDv2ChangeSet::Type::kFull, {}, Selector{}}); EXPECT_EQ(0u, store.AllFlags().size()); EXPECT_EQ(0u, store.AllSegments().size()); + + ASSERT_EQ(1u, result.flags.size()); + EXPECT_EQ(1u, result.flags.count("flagA")); + ASSERT_EQ(1u, result.segments.size()); + EXPECT_EQ(1u, result.segments.count("segA")); } TEST(MemoryStoreApplyTest, ApplyFull_WithFlagTombstone) { MemoryStore store; - store.Apply(FDv2ChangeSet{ + auto result = store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kFull, std::vector{{"flagA", FlagDescriptor(Tombstone(5))}}, Selector{}, @@ -187,12 +218,16 @@ TEST(MemoryStoreApplyTest, ApplyFull_WithFlagTombstone) { ASSERT_TRUE(fetched); EXPECT_EQ(5u, fetched->version); EXPECT_FALSE(fetched->item.has_value()); + + ASSERT_EQ(1u, result.flags.size()); + EXPECT_EQ(1u, result.flags.count("flagA")); + EXPECT_TRUE(result.segments.empty()); } TEST(MemoryStoreApplyTest, ApplyFull_WithSegmentTombstone) { MemoryStore store; - store.Apply(FDv2ChangeSet{ + auto result = store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kFull, std::vector{{"segA", SegmentDescriptor(Tombstone(3))}}, Selector{}, @@ -202,6 +237,10 @@ TEST(MemoryStoreApplyTest, ApplyFull_WithSegmentTombstone) { ASSERT_TRUE(fetched); EXPECT_EQ(3u, fetched->version); EXPECT_FALSE(fetched->item.has_value()); + + EXPECT_TRUE(result.flags.empty()); + ASSERT_EQ(1u, result.segments.size()); + EXPECT_EQ(1u, result.segments.count("segA")); } // --------------------------------------------------------------------------- @@ -219,7 +258,7 @@ TEST(MemoryStoreApplyTest, ApplyPartial_UpsertsNewFlag) { flag_a.version = 1; flag_a.key = "flagA"; - store.Apply(FDv2ChangeSet{ + auto result = store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kPartial, std::vector{{"flagA", FlagDescriptor(flag_a)}}, Selector{}, @@ -230,6 +269,10 @@ TEST(MemoryStoreApplyTest, ApplyPartial_UpsertsNewFlag) { EXPECT_TRUE(fetched->item.has_value()); EXPECT_EQ("flagA", fetched->item->key); EXPECT_EQ(1u, fetched->version); + + ASSERT_EQ(1u, result.flags.size()); + EXPECT_EQ(1u, result.flags.count("flagA")); + EXPECT_TRUE(result.segments.empty()); } TEST(MemoryStoreApplyTest, ApplyPartial_UpsertsNewSegment) { @@ -243,7 +286,7 @@ TEST(MemoryStoreApplyTest, ApplyPartial_UpsertsNewSegment) { seg_a.version = 1; seg_a.key = "segA"; - store.Apply(FDv2ChangeSet{ + auto result = store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kPartial, std::vector{{"segA", SegmentDescriptor(seg_a)}}, Selector{}, @@ -254,6 +297,10 @@ TEST(MemoryStoreApplyTest, ApplyPartial_UpsertsNewSegment) { EXPECT_TRUE(fetched->item.has_value()); EXPECT_EQ("segA", fetched->item->key); EXPECT_EQ(1u, fetched->version); + + EXPECT_TRUE(result.flags.empty()); + ASSERT_EQ(1u, result.segments.size()); + EXPECT_EQ(1u, result.segments.count("segA")); } TEST(MemoryStoreApplyTest, ApplyPartial_SkipsFlagWithLowerVersion) { @@ -272,7 +319,7 @@ TEST(MemoryStoreApplyTest, ApplyPartial_SkipsFlagWithLowerVersion) { flag_a_stale.version = 3; flag_a_stale.key = "flagA"; - store.Apply(FDv2ChangeSet{ + auto result = store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kPartial, std::vector{{"flagA", FlagDescriptor(flag_a_stale)}}, Selector{}, @@ -281,6 +328,9 @@ TEST(MemoryStoreApplyTest, ApplyPartial_SkipsFlagWithLowerVersion) { auto fetched = store.GetFlag("flagA"); ASSERT_TRUE(fetched); EXPECT_EQ(5u, fetched->version); + + EXPECT_TRUE(result.flags.empty()); + EXPECT_TRUE(result.segments.empty()); } TEST(MemoryStoreApplyTest, ApplyPartial_SkipsFlagWithEqualVersion) { @@ -299,7 +349,7 @@ TEST(MemoryStoreApplyTest, ApplyPartial_SkipsFlagWithEqualVersion) { flag_a_same.version = 5; flag_a_same.key = "flagA"; - store.Apply(FDv2ChangeSet{ + auto result = store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kPartial, std::vector{{"flagA", FlagDescriptor(flag_a_same)}}, Selector{}, @@ -308,6 +358,9 @@ TEST(MemoryStoreApplyTest, ApplyPartial_SkipsFlagWithEqualVersion) { auto fetched = store.GetFlag("flagA"); ASSERT_TRUE(fetched); EXPECT_EQ(5u, fetched->version); + + EXPECT_TRUE(result.flags.empty()); + EXPECT_TRUE(result.segments.empty()); } TEST(MemoryStoreApplyTest, ApplyPartial_AppliesFlagWithHigherVersion) { @@ -326,7 +379,7 @@ TEST(MemoryStoreApplyTest, ApplyPartial_AppliesFlagWithHigherVersion) { flag_a_new.version = 6; flag_a_new.key = "flagA"; - store.Apply(FDv2ChangeSet{ + auto result = store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kPartial, std::vector{{"flagA", FlagDescriptor(flag_a_new)}}, Selector{}, @@ -335,6 +388,10 @@ TEST(MemoryStoreApplyTest, ApplyPartial_AppliesFlagWithHigherVersion) { auto fetched = store.GetFlag("flagA"); ASSERT_TRUE(fetched); EXPECT_EQ(6u, fetched->version); + + ASSERT_EQ(1u, result.flags.size()); + EXPECT_EQ(1u, result.flags.count("flagA")); + EXPECT_TRUE(result.segments.empty()); } TEST(MemoryStoreApplyTest, ApplyPartial_SkipsSegmentWithLowerVersion) { @@ -353,7 +410,7 @@ TEST(MemoryStoreApplyTest, ApplyPartial_SkipsSegmentWithLowerVersion) { seg_a_stale.version = 3; seg_a_stale.key = "segA"; - store.Apply(FDv2ChangeSet{ + auto result = store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kPartial, std::vector{{"segA", SegmentDescriptor(seg_a_stale)}}, Selector{}, @@ -362,6 +419,9 @@ TEST(MemoryStoreApplyTest, ApplyPartial_SkipsSegmentWithLowerVersion) { auto fetched = store.GetSegment("segA"); ASSERT_TRUE(fetched); EXPECT_EQ(5u, fetched->version); + + EXPECT_TRUE(result.flags.empty()); + EXPECT_TRUE(result.segments.empty()); } TEST(MemoryStoreApplyTest, ApplyPartial_AppliesSegmentWithHigherVersion) { @@ -380,7 +440,7 @@ TEST(MemoryStoreApplyTest, ApplyPartial_AppliesSegmentWithHigherVersion) { seg_a_new.version = 6; seg_a_new.key = "segA"; - store.Apply(FDv2ChangeSet{ + auto result = store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kPartial, std::vector{{"segA", SegmentDescriptor(seg_a_new)}}, Selector{}, @@ -389,6 +449,10 @@ TEST(MemoryStoreApplyTest, ApplyPartial_AppliesSegmentWithHigherVersion) { auto fetched = store.GetSegment("segA"); ASSERT_TRUE(fetched); EXPECT_EQ(6u, fetched->version); + + EXPECT_TRUE(result.flags.empty()); + ASSERT_EQ(1u, result.segments.size()); + EXPECT_EQ(1u, result.segments.count("segA")); } TEST(MemoryStoreApplyTest, ApplyPartial_PreservesUnchangedFlags) { @@ -412,7 +476,7 @@ TEST(MemoryStoreApplyTest, ApplyPartial_PreservesUnchangedFlags) { flag_b_new.version = 2; flag_b_new.key = "flagB"; - store.Apply(FDv2ChangeSet{ + auto result = store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kPartial, std::vector{{"flagB", FlagDescriptor(flag_b_new)}}, Selector{}, @@ -425,6 +489,10 @@ TEST(MemoryStoreApplyTest, ApplyPartial_PreservesUnchangedFlags) { auto fetched_b = store.GetFlag("flagB"); ASSERT_TRUE(fetched_b); EXPECT_EQ(2u, fetched_b->version); + + ASSERT_EQ(1u, result.flags.size()); + EXPECT_EQ(1u, result.flags.count("flagB")); + EXPECT_TRUE(result.segments.empty()); } TEST(MemoryStoreApplyTest, ApplyPartial_PreservesUnchangedSegments) { @@ -448,7 +516,7 @@ TEST(MemoryStoreApplyTest, ApplyPartial_PreservesUnchangedSegments) { seg_b_new.version = 2; seg_b_new.key = "segB"; - store.Apply(FDv2ChangeSet{ + auto result = store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kPartial, std::vector{{"segB", SegmentDescriptor(seg_b_new)}}, Selector{}, @@ -461,6 +529,10 @@ TEST(MemoryStoreApplyTest, ApplyPartial_PreservesUnchangedSegments) { auto fetched_b = store.GetSegment("segB"); ASSERT_TRUE(fetched_b); EXPECT_EQ(2u, fetched_b->version); + + EXPECT_TRUE(result.flags.empty()); + ASSERT_EQ(1u, result.segments.size()); + EXPECT_EQ(1u, result.segments.count("segB")); } TEST(MemoryStoreApplyTest, ApplyPartial_WithFlagTombstone) { @@ -475,7 +547,7 @@ TEST(MemoryStoreApplyTest, ApplyPartial_WithFlagTombstone) { std::unordered_map(), }); - store.Apply(FDv2ChangeSet{ + auto result = store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kPartial, std::vector{{"flagA", FlagDescriptor(Tombstone(2))}}, Selector{}, @@ -485,6 +557,10 @@ TEST(MemoryStoreApplyTest, ApplyPartial_WithFlagTombstone) { ASSERT_TRUE(fetched); EXPECT_EQ(2u, fetched->version); EXPECT_FALSE(fetched->item.has_value()); + + ASSERT_EQ(1u, result.flags.size()); + EXPECT_EQ(1u, result.flags.count("flagA")); + EXPECT_TRUE(result.segments.empty()); } TEST(MemoryStoreApplyTest, ApplyPartial_TombstoneSkippedIfVersionNotNewer) { @@ -500,7 +576,7 @@ TEST(MemoryStoreApplyTest, ApplyPartial_TombstoneSkippedIfVersionNotNewer) { }); // Tombstone at version 3 < stored version 5: should be ignored. - store.Apply(FDv2ChangeSet{ + auto result = store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kPartial, std::vector{{"flagA", FlagDescriptor(Tombstone(3))}}, Selector{}, @@ -510,6 +586,9 @@ TEST(MemoryStoreApplyTest, ApplyPartial_TombstoneSkippedIfVersionNotNewer) { ASSERT_TRUE(fetched); EXPECT_EQ(5u, fetched->version); EXPECT_TRUE(fetched->item.has_value()); + + EXPECT_TRUE(result.flags.empty()); + EXPECT_TRUE(result.segments.empty()); } TEST(MemoryStoreApplyTest, ApplyPartial_MixedStaleAndFreshItems) { @@ -537,7 +616,7 @@ TEST(MemoryStoreApplyTest, ApplyPartial_MixedStaleAndFreshItems) { flag_b_new.version = 2; flag_b_new.key = "flagB"; - store.Apply(FDv2ChangeSet{ + auto result = store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kPartial, std::vector{{"flagA", FlagDescriptor(flag_a_stale)}, {"flagB", FlagDescriptor(flag_b_new)}}, @@ -548,4 +627,8 @@ TEST(MemoryStoreApplyTest, ApplyPartial_MixedStaleAndFreshItems) { EXPECT_EQ(10u, store.GetFlag("flagA")->version); // flagB version 2 > 1: apply. EXPECT_EQ(2u, store.GetFlag("flagB")->version); + + ASSERT_EQ(1u, result.flags.size()); + EXPECT_EQ(1u, result.flags.count("flagB")); + EXPECT_TRUE(result.segments.empty()); } From 259d8e901e9b4d2f76a85e63f423213e6cbf6340 Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Fri, 27 Mar 2026 16:08:41 -0700 Subject: [PATCH 04/12] mark the apply result as nodiscard for now --- .../src/data_components/memory_store/memory_store.cpp | 2 +- .../src/data_components/memory_store/memory_store.hpp | 2 +- libs/server-sdk/tests/memory_store_apply_test.cpp | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/libs/server-sdk/src/data_components/memory_store/memory_store.cpp b/libs/server-sdk/src/data_components/memory_store/memory_store.cpp index 7680853a7..d8e9474af 100644 --- a/libs/server-sdk/src/data_components/memory_store/memory_store.cpp +++ b/libs/server-sdk/src/data_components/memory_store/memory_store.cpp @@ -85,8 +85,8 @@ bool MemoryStore::RemoveSegment(std::string const& key) { } ApplyResult MemoryStore::Apply(data_model::FDv2ChangeSet changeSet) { - ApplyResult result; std::lock_guard lock{data_mutex_}; + ApplyResult result; switch (changeSet.type) { case data_model::FDv2ChangeSet::Type::kNone: diff --git a/libs/server-sdk/src/data_components/memory_store/memory_store.hpp b/libs/server-sdk/src/data_components/memory_store/memory_store.hpp index 7712eb67e..5013adcce 100644 --- a/libs/server-sdk/src/data_components/memory_store/memory_store.hpp +++ b/libs/server-sdk/src/data_components/memory_store/memory_store.hpp @@ -52,7 +52,7 @@ class MemoryStore final : public data_interfaces::IStore, bool RemoveSegment(std::string const& key); - ApplyResult Apply(data_model::FDv2ChangeSet changeSet); + [[nodiscard]] ApplyResult Apply(data_model::FDv2ChangeSet changeSet); MemoryStore() = default; ~MemoryStore() override = default; diff --git a/libs/server-sdk/tests/memory_store_apply_test.cpp b/libs/server-sdk/tests/memory_store_apply_test.cpp index d0ab96868..844076559 100644 --- a/libs/server-sdk/tests/memory_store_apply_test.cpp +++ b/libs/server-sdk/tests/memory_store_apply_test.cpp @@ -43,7 +43,7 @@ TEST(MemoryStoreApplyTest, ApplyNone_IsNoOp) { TEST(MemoryStoreApplyTest, ApplyNone_DoesNotInitialize) { MemoryStore store; - store.Apply(FDv2ChangeSet{FDv2ChangeSet::Type::kNone, {}, Selector{}}); + std::ignore = store.Apply(FDv2ChangeSet{FDv2ChangeSet::Type::kNone, {}, Selector{}}); EXPECT_FALSE(store.Initialized()); } @@ -54,7 +54,7 @@ TEST(MemoryStoreApplyTest, ApplyNone_DoesNotInitialize) { TEST(MemoryStoreApplyTest, ApplyFull_SetsInitialized) { MemoryStore store; ASSERT_FALSE(store.Initialized()); - store.Apply(FDv2ChangeSet{FDv2ChangeSet::Type::kFull, {}, Selector{}}); + std::ignore = store.Apply(FDv2ChangeSet{FDv2ChangeSet::Type::kFull, {}, Selector{}}); EXPECT_TRUE(store.Initialized()); } From f5b82bf6240b0cec36943d2c8de7cd5162414df6 Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Fri, 27 Mar 2026 16:19:50 -0700 Subject: [PATCH 05/12] simplify tests --- .../tests/memory_store_apply_test.cpp | 381 ++++++------------ 1 file changed, 120 insertions(+), 261 deletions(-) diff --git a/libs/server-sdk/tests/memory_store_apply_test.cpp b/libs/server-sdk/tests/memory_store_apply_test.cpp index 844076559..619fe380b 100644 --- a/libs/server-sdk/tests/memory_store_apply_test.cpp +++ b/libs/server-sdk/tests/memory_store_apply_test.cpp @@ -43,7 +43,8 @@ TEST(MemoryStoreApplyTest, ApplyNone_IsNoOp) { TEST(MemoryStoreApplyTest, ApplyNone_DoesNotInitialize) { MemoryStore store; - std::ignore = store.Apply(FDv2ChangeSet{FDv2ChangeSet::Type::kNone, {}, Selector{}}); + std::ignore = store.Apply( + FDv2ChangeSet{FDv2ChangeSet::Type::kNone, {}, Selector{}}); EXPECT_FALSE(store.Initialized()); } @@ -54,57 +55,47 @@ TEST(MemoryStoreApplyTest, ApplyNone_DoesNotInitialize) { TEST(MemoryStoreApplyTest, ApplyFull_SetsInitialized) { MemoryStore store; ASSERT_FALSE(store.Initialized()); - std::ignore = store.Apply(FDv2ChangeSet{FDv2ChangeSet::Type::kFull, {}, Selector{}}); + std::ignore = store.Apply( + FDv2ChangeSet{FDv2ChangeSet::Type::kFull, {}, Selector{}}); EXPECT_TRUE(store.Initialized()); } -TEST(MemoryStoreApplyTest, ApplyFull_WithFlag) { +TEST(MemoryStoreApplyTest, ApplyFull_StoresItems) { MemoryStore store; Flag flag_a; flag_a.version = 1; flag_a.key = "flagA"; - auto result = store.Apply(FDv2ChangeSet{ - FDv2ChangeSet::Type::kFull, - std::vector{{"flagA", FlagDescriptor(flag_a)}}, - Selector{}, - }); - - auto fetched = store.GetFlag("flagA"); - ASSERT_TRUE(fetched); - EXPECT_TRUE(fetched->item.has_value()); - EXPECT_EQ("flagA", fetched->item->key); - EXPECT_EQ(1u, fetched->version); - - ASSERT_EQ(1u, result.flags.size()); - EXPECT_EQ(1u, result.flags.count("flagA")); - EXPECT_TRUE(result.segments.empty()); -} - -TEST(MemoryStoreApplyTest, ApplyFull_WithSegment) { - MemoryStore store; Segment seg_a; seg_a.version = 1; seg_a.key = "segA"; auto result = store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kFull, - std::vector{{"segA", SegmentDescriptor(seg_a)}}, + std::vector{{"flagA", FlagDescriptor(flag_a)}, + {"segA", SegmentDescriptor(seg_a)}}, Selector{}, }); - auto fetched = store.GetSegment("segA"); - ASSERT_TRUE(fetched); - EXPECT_TRUE(fetched->item.has_value()); - EXPECT_EQ("segA", fetched->item->key); - EXPECT_EQ(1u, fetched->version); + auto fetched_flag = store.GetFlag("flagA"); + ASSERT_TRUE(fetched_flag); + EXPECT_TRUE(fetched_flag->item.has_value()); + EXPECT_EQ("flagA", fetched_flag->item->key); + EXPECT_EQ(1u, fetched_flag->version); - EXPECT_TRUE(result.flags.empty()); + auto fetched_seg = store.GetSegment("segA"); + ASSERT_TRUE(fetched_seg); + EXPECT_TRUE(fetched_seg->item.has_value()); + EXPECT_EQ("segA", fetched_seg->item->key); + EXPECT_EQ(1u, fetched_seg->version); + + ASSERT_EQ(1u, result.flags.size()); + EXPECT_EQ(1u, result.flags.count("flagA")); ASSERT_EQ(1u, result.segments.size()); EXPECT_EQ(1u, result.segments.count("segA")); } -TEST(MemoryStoreApplyTest, ApplyFull_ClearsExistingFlags) { +TEST(MemoryStoreApplyTest, ApplyFull_ClearsExistingItems) { MemoryStore store; Flag flag_a; flag_a.version = 1; @@ -114,63 +105,44 @@ TEST(MemoryStoreApplyTest, ApplyFull_ClearsExistingFlags) { flag_b.version = 1; flag_b.key = "flagB"; + Segment seg_a; + seg_a.version = 1; + seg_a.key = "segA"; + store.Init(SDKDataSet{ std::unordered_map{ {"flagA", FlagDescriptor(flag_a)}, {"flagB", FlagDescriptor(flag_b)}}, - std::unordered_map(), + std::unordered_map{ + {"segA", SegmentDescriptor(seg_a)}}, }); Flag flag_c; flag_c.version = 1; flag_c.key = "flagC"; + Segment seg_b; + seg_b.version = 1; + seg_b.key = "segB"; + auto result = store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kFull, - std::vector{{"flagC", FlagDescriptor(flag_c)}}, + std::vector{{"flagC", FlagDescriptor(flag_c)}, + {"segB", SegmentDescriptor(seg_b)}}, Selector{}, }); EXPECT_FALSE(store.GetFlag("flagA")); EXPECT_FALSE(store.GetFlag("flagB")); ASSERT_TRUE(store.GetFlag("flagC")); - EXPECT_EQ("flagC", store.GetFlag("flagC")->item->key); + EXPECT_FALSE(store.GetSegment("segA")); + ASSERT_TRUE(store.GetSegment("segB")); - // Cleared keys (flagA, flagB) and new key (flagC) all reported as changed. + // Cleared keys and new keys all reported as changed. ASSERT_EQ(3u, result.flags.size()); EXPECT_EQ(1u, result.flags.count("flagA")); EXPECT_EQ(1u, result.flags.count("flagB")); EXPECT_EQ(1u, result.flags.count("flagC")); - EXPECT_TRUE(result.segments.empty()); -} - -TEST(MemoryStoreApplyTest, ApplyFull_ClearsExistingSegments) { - MemoryStore store; - Segment seg_a; - seg_a.version = 1; - seg_a.key = "segA"; - - store.Init(SDKDataSet{ - std::unordered_map(), - std::unordered_map{ - {"segA", SegmentDescriptor(seg_a)}}, - }); - - Segment seg_b; - seg_b.version = 1; - seg_b.key = "segB"; - - auto result = store.Apply(FDv2ChangeSet{ - FDv2ChangeSet::Type::kFull, - std::vector{{"segB", SegmentDescriptor(seg_b)}}, - Selector{}, - }); - - EXPECT_FALSE(store.GetSegment("segA")); - ASSERT_TRUE(store.GetSegment("segB")); - - // Cleared key (segA) and new key (segB) both reported as changed. - EXPECT_TRUE(result.flags.empty()); ASSERT_EQ(2u, result.segments.size()); EXPECT_EQ(1u, result.segments.count("segA")); EXPECT_EQ(1u, result.segments.count("segB")); @@ -224,30 +196,11 @@ TEST(MemoryStoreApplyTest, ApplyFull_WithFlagTombstone) { EXPECT_TRUE(result.segments.empty()); } -TEST(MemoryStoreApplyTest, ApplyFull_WithSegmentTombstone) { - MemoryStore store; - - auto result = store.Apply(FDv2ChangeSet{ - FDv2ChangeSet::Type::kFull, - std::vector{{"segA", SegmentDescriptor(Tombstone(3))}}, - Selector{}, - }); - - auto fetched = store.GetSegment("segA"); - ASSERT_TRUE(fetched); - EXPECT_EQ(3u, fetched->version); - EXPECT_FALSE(fetched->item.has_value()); - - EXPECT_TRUE(result.flags.empty()); - ASSERT_EQ(1u, result.segments.size()); - EXPECT_EQ(1u, result.segments.count("segA")); -} - // --------------------------------------------------------------------------- // kPartial tests // --------------------------------------------------------------------------- -TEST(MemoryStoreApplyTest, ApplyPartial_UpsertsNewFlag) { +TEST(MemoryStoreApplyTest, ApplyPartial_UpsertsNewItems) { MemoryStore store; store.Init(SDKDataSet{ std::unordered_map(), @@ -258,183 +211,137 @@ TEST(MemoryStoreApplyTest, ApplyPartial_UpsertsNewFlag) { flag_a.version = 1; flag_a.key = "flagA"; - auto result = store.Apply(FDv2ChangeSet{ - FDv2ChangeSet::Type::kPartial, - std::vector{{"flagA", FlagDescriptor(flag_a)}}, - Selector{}, - }); - - auto fetched = store.GetFlag("flagA"); - ASSERT_TRUE(fetched); - EXPECT_TRUE(fetched->item.has_value()); - EXPECT_EQ("flagA", fetched->item->key); - EXPECT_EQ(1u, fetched->version); - - ASSERT_EQ(1u, result.flags.size()); - EXPECT_EQ(1u, result.flags.count("flagA")); - EXPECT_TRUE(result.segments.empty()); -} - -TEST(MemoryStoreApplyTest, ApplyPartial_UpsertsNewSegment) { - MemoryStore store; - store.Init(SDKDataSet{ - std::unordered_map(), - std::unordered_map(), - }); - Segment seg_a; seg_a.version = 1; seg_a.key = "segA"; auto result = store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kPartial, - std::vector{{"segA", SegmentDescriptor(seg_a)}}, + std::vector{{"flagA", FlagDescriptor(flag_a)}, + {"segA", SegmentDescriptor(seg_a)}}, Selector{}, }); - auto fetched = store.GetSegment("segA"); - ASSERT_TRUE(fetched); - EXPECT_TRUE(fetched->item.has_value()); - EXPECT_EQ("segA", fetched->item->key); - EXPECT_EQ(1u, fetched->version); + auto fetched_flag = store.GetFlag("flagA"); + ASSERT_TRUE(fetched_flag); + EXPECT_TRUE(fetched_flag->item.has_value()); + EXPECT_EQ("flagA", fetched_flag->item->key); + EXPECT_EQ(1u, fetched_flag->version); - EXPECT_TRUE(result.flags.empty()); + auto fetched_seg = store.GetSegment("segA"); + ASSERT_TRUE(fetched_seg); + EXPECT_TRUE(fetched_seg->item.has_value()); + EXPECT_EQ("segA", fetched_seg->item->key); + EXPECT_EQ(1u, fetched_seg->version); + + ASSERT_EQ(1u, result.flags.size()); + EXPECT_EQ(1u, result.flags.count("flagA")); ASSERT_EQ(1u, result.segments.size()); EXPECT_EQ(1u, result.segments.count("segA")); } -TEST(MemoryStoreApplyTest, ApplyPartial_SkipsFlagWithLowerVersion) { +TEST(MemoryStoreApplyTest, ApplyPartial_SkipsStaleItems) { MemoryStore store; Flag flag_a; flag_a.version = 5; flag_a.key = "flagA"; + Segment seg_a; + seg_a.version = 5; + seg_a.key = "segA"; + store.Init(SDKDataSet{ std::unordered_map{ {"flagA", FlagDescriptor(flag_a)}}, - std::unordered_map(), + std::unordered_map{ + {"segA", SegmentDescriptor(seg_a)}}, }); Flag flag_a_stale; flag_a_stale.version = 3; flag_a_stale.key = "flagA"; + Segment seg_a_stale; + seg_a_stale.version = 3; + seg_a_stale.key = "segA"; + auto result = store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kPartial, - std::vector{{"flagA", FlagDescriptor(flag_a_stale)}}, + std::vector{{"flagA", FlagDescriptor(flag_a_stale)}, + {"segA", SegmentDescriptor(seg_a_stale)}}, Selector{}, }); - auto fetched = store.GetFlag("flagA"); - ASSERT_TRUE(fetched); - EXPECT_EQ(5u, fetched->version); + ASSERT_TRUE(store.GetFlag("flagA")); + EXPECT_EQ(5u, store.GetFlag("flagA")->version); + ASSERT_TRUE(store.GetSegment("segA")); + EXPECT_EQ(5u, store.GetSegment("segA")->version); EXPECT_TRUE(result.flags.empty()); EXPECT_TRUE(result.segments.empty()); } -TEST(MemoryStoreApplyTest, ApplyPartial_SkipsFlagWithEqualVersion) { +TEST(MemoryStoreApplyTest, ApplyPartial_SkipsItemsWithEqualVersion) { MemoryStore store; Flag flag_a; flag_a.version = 5; flag_a.key = "flagA"; + Segment seg_a; + seg_a.version = 5; + seg_a.key = "segA"; + store.Init(SDKDataSet{ std::unordered_map{ {"flagA", FlagDescriptor(flag_a)}}, - std::unordered_map(), + std::unordered_map{ + {"segA", SegmentDescriptor(seg_a)}}, }); Flag flag_a_same; flag_a_same.version = 5; flag_a_same.key = "flagA"; + Segment seg_a_same; + seg_a_same.version = 5; + seg_a_same.key = "segA"; + auto result = store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kPartial, - std::vector{{"flagA", FlagDescriptor(flag_a_same)}}, + std::vector{{"flagA", FlagDescriptor(flag_a_same)}, + {"segA", SegmentDescriptor(seg_a_same)}}, Selector{}, }); - auto fetched = store.GetFlag("flagA"); - ASSERT_TRUE(fetched); - EXPECT_EQ(5u, fetched->version); + ASSERT_TRUE(store.GetFlag("flagA")); + EXPECT_EQ(5u, store.GetFlag("flagA")->version); + ASSERT_TRUE(store.GetSegment("segA")); + EXPECT_EQ(5u, store.GetSegment("segA")->version); EXPECT_TRUE(result.flags.empty()); EXPECT_TRUE(result.segments.empty()); } -TEST(MemoryStoreApplyTest, ApplyPartial_AppliesFlagWithHigherVersion) { +TEST(MemoryStoreApplyTest, ApplyPartial_AppliesFreshItems) { MemoryStore store; Flag flag_a; flag_a.version = 5; flag_a.key = "flagA"; - store.Init(SDKDataSet{ - std::unordered_map{ - {"flagA", FlagDescriptor(flag_a)}}, - std::unordered_map(), - }); - - Flag flag_a_new; - flag_a_new.version = 6; - flag_a_new.key = "flagA"; - - auto result = store.Apply(FDv2ChangeSet{ - FDv2ChangeSet::Type::kPartial, - std::vector{{"flagA", FlagDescriptor(flag_a_new)}}, - Selector{}, - }); - - auto fetched = store.GetFlag("flagA"); - ASSERT_TRUE(fetched); - EXPECT_EQ(6u, fetched->version); - - ASSERT_EQ(1u, result.flags.size()); - EXPECT_EQ(1u, result.flags.count("flagA")); - EXPECT_TRUE(result.segments.empty()); -} - -TEST(MemoryStoreApplyTest, ApplyPartial_SkipsSegmentWithLowerVersion) { - MemoryStore store; Segment seg_a; seg_a.version = 5; seg_a.key = "segA"; store.Init(SDKDataSet{ - std::unordered_map(), + std::unordered_map{ + {"flagA", FlagDescriptor(flag_a)}}, std::unordered_map{ {"segA", SegmentDescriptor(seg_a)}}, }); - Segment seg_a_stale; - seg_a_stale.version = 3; - seg_a_stale.key = "segA"; - - auto result = store.Apply(FDv2ChangeSet{ - FDv2ChangeSet::Type::kPartial, - std::vector{{"segA", SegmentDescriptor(seg_a_stale)}}, - Selector{}, - }); - - auto fetched = store.GetSegment("segA"); - ASSERT_TRUE(fetched); - EXPECT_EQ(5u, fetched->version); - - EXPECT_TRUE(result.flags.empty()); - EXPECT_TRUE(result.segments.empty()); -} - -TEST(MemoryStoreApplyTest, ApplyPartial_AppliesSegmentWithHigherVersion) { - MemoryStore store; - Segment seg_a; - seg_a.version = 5; - seg_a.key = "segA"; - - store.Init(SDKDataSet{ - std::unordered_map(), - std::unordered_map{ - {"segA", SegmentDescriptor(seg_a)}}, - }); + Flag flag_a_new; + flag_a_new.version = 6; + flag_a_new.key = "flagA"; Segment seg_a_new; seg_a_new.version = 6; @@ -442,20 +349,23 @@ TEST(MemoryStoreApplyTest, ApplyPartial_AppliesSegmentWithHigherVersion) { auto result = store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kPartial, - std::vector{{"segA", SegmentDescriptor(seg_a_new)}}, + std::vector{{"flagA", FlagDescriptor(flag_a_new)}, + {"segA", SegmentDescriptor(seg_a_new)}}, Selector{}, }); - auto fetched = store.GetSegment("segA"); - ASSERT_TRUE(fetched); - EXPECT_EQ(6u, fetched->version); + ASSERT_TRUE(store.GetFlag("flagA")); + EXPECT_EQ(6u, store.GetFlag("flagA")->version); + ASSERT_TRUE(store.GetSegment("segA")); + EXPECT_EQ(6u, store.GetSegment("segA")->version); - EXPECT_TRUE(result.flags.empty()); + ASSERT_EQ(1u, result.flags.size()); + EXPECT_EQ(1u, result.flags.count("flagA")); ASSERT_EQ(1u, result.segments.size()); EXPECT_EQ(1u, result.segments.count("segA")); } -TEST(MemoryStoreApplyTest, ApplyPartial_PreservesUnchangedFlags) { +TEST(MemoryStoreApplyTest, ApplyPartial_PreservesUnchangedItems) { MemoryStore store; Flag flag_a; flag_a.version = 1; @@ -465,38 +375,6 @@ TEST(MemoryStoreApplyTest, ApplyPartial_PreservesUnchangedFlags) { flag_b.version = 1; flag_b.key = "flagB"; - store.Init(SDKDataSet{ - std::unordered_map{ - {"flagA", FlagDescriptor(flag_a)}, - {"flagB", FlagDescriptor(flag_b)}}, - std::unordered_map(), - }); - - Flag flag_b_new; - flag_b_new.version = 2; - flag_b_new.key = "flagB"; - - auto result = store.Apply(FDv2ChangeSet{ - FDv2ChangeSet::Type::kPartial, - std::vector{{"flagB", FlagDescriptor(flag_b_new)}}, - Selector{}, - }); - - auto fetched_a = store.GetFlag("flagA"); - ASSERT_TRUE(fetched_a); - EXPECT_EQ(1u, fetched_a->version); - - auto fetched_b = store.GetFlag("flagB"); - ASSERT_TRUE(fetched_b); - EXPECT_EQ(2u, fetched_b->version); - - ASSERT_EQ(1u, result.flags.size()); - EXPECT_EQ(1u, result.flags.count("flagB")); - EXPECT_TRUE(result.segments.empty()); -} - -TEST(MemoryStoreApplyTest, ApplyPartial_PreservesUnchangedSegments) { - MemoryStore store; Segment seg_a; seg_a.version = 1; seg_a.key = "segA"; @@ -506,31 +384,40 @@ TEST(MemoryStoreApplyTest, ApplyPartial_PreservesUnchangedSegments) { seg_b.key = "segB"; store.Init(SDKDataSet{ - std::unordered_map(), + std::unordered_map{ + {"flagA", FlagDescriptor(flag_a)}, + {"flagB", FlagDescriptor(flag_b)}}, std::unordered_map{ {"segA", SegmentDescriptor(seg_a)}, {"segB", SegmentDescriptor(seg_b)}}, }); + Flag flag_b_new; + flag_b_new.version = 2; + flag_b_new.key = "flagB"; + Segment seg_b_new; seg_b_new.version = 2; seg_b_new.key = "segB"; auto result = store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kPartial, - std::vector{{"segB", SegmentDescriptor(seg_b_new)}}, + std::vector{{"flagB", FlagDescriptor(flag_b_new)}, + {"segB", SegmentDescriptor(seg_b_new)}}, Selector{}, }); - auto fetched_a = store.GetSegment("segA"); - ASSERT_TRUE(fetched_a); - EXPECT_EQ(1u, fetched_a->version); - - auto fetched_b = store.GetSegment("segB"); - ASSERT_TRUE(fetched_b); - EXPECT_EQ(2u, fetched_b->version); + ASSERT_TRUE(store.GetFlag("flagA")); + EXPECT_EQ(1u, store.GetFlag("flagA")->version); + ASSERT_TRUE(store.GetFlag("flagB")); + EXPECT_EQ(2u, store.GetFlag("flagB")->version); + ASSERT_TRUE(store.GetSegment("segA")); + EXPECT_EQ(1u, store.GetSegment("segA")->version); + ASSERT_TRUE(store.GetSegment("segB")); + EXPECT_EQ(2u, store.GetSegment("segB")->version); - EXPECT_TRUE(result.flags.empty()); + ASSERT_EQ(1u, result.flags.size()); + EXPECT_EQ(1u, result.flags.count("flagB")); ASSERT_EQ(1u, result.segments.size()); EXPECT_EQ(1u, result.segments.count("segB")); } @@ -563,34 +450,6 @@ TEST(MemoryStoreApplyTest, ApplyPartial_WithFlagTombstone) { EXPECT_TRUE(result.segments.empty()); } -TEST(MemoryStoreApplyTest, ApplyPartial_TombstoneSkippedIfVersionNotNewer) { - MemoryStore store; - Flag flag_a; - flag_a.version = 5; - flag_a.key = "flagA"; - - store.Init(SDKDataSet{ - std::unordered_map{ - {"flagA", FlagDescriptor(flag_a)}}, - std::unordered_map(), - }); - - // Tombstone at version 3 < stored version 5: should be ignored. - auto result = store.Apply(FDv2ChangeSet{ - FDv2ChangeSet::Type::kPartial, - std::vector{{"flagA", FlagDescriptor(Tombstone(3))}}, - Selector{}, - }); - - auto fetched = store.GetFlag("flagA"); - ASSERT_TRUE(fetched); - EXPECT_EQ(5u, fetched->version); - EXPECT_TRUE(fetched->item.has_value()); - - EXPECT_TRUE(result.flags.empty()); - EXPECT_TRUE(result.segments.empty()); -} - TEST(MemoryStoreApplyTest, ApplyPartial_MixedStaleAndFreshItems) { MemoryStore store; Flag flag_a; From 7826357582c1e65c305951b6c6a7a31a13c17144 Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Mon, 30 Mar 2026 10:14:54 -0700 Subject: [PATCH 06/12] simplify, since FDv2 doesnt require version checking in memory store --- .../memory_store/memory_store.cpp | 42 +-- .../memory_store/memory_store.hpp | 8 +- .../tests/memory_store_apply_test.cpp | 286 +----------------- 3 files changed, 14 insertions(+), 322 deletions(-) diff --git a/libs/server-sdk/src/data_components/memory_store/memory_store.cpp b/libs/server-sdk/src/data_components/memory_store/memory_store.cpp index d8e9474af..c71acf08a 100644 --- a/libs/server-sdk/src/data_components/memory_store/memory_store.cpp +++ b/libs/server-sdk/src/data_components/memory_store/memory_store.cpp @@ -84,24 +84,15 @@ bool MemoryStore::RemoveSegment(std::string const& key) { return segments_.erase(key) == 1; } -ApplyResult MemoryStore::Apply(data_model::FDv2ChangeSet changeSet) { +void MemoryStore::Apply(data_model::FDv2ChangeSet changeSet) { std::lock_guard lock{data_mutex_}; - ApplyResult result; switch (changeSet.type) { case data_model::FDv2ChangeSet::Type::kNone: - return result; + return; case data_model::FDv2ChangeSet::Type::kPartial: break; case data_model::FDv2ChangeSet::Type::kFull: - // When there's a full change, any current keys are considered - // changed, regardless of whether they are in the new set. - for (auto const& [key, _] : flags_) { - result.flags.insert(key); - } - for (auto const& [key, _] : segments_) { - result.segments.insert(key); - } initialized_ = true; flags_.clear(); segments_.clear(); @@ -112,38 +103,15 @@ ApplyResult MemoryStore::Apply(data_model::FDv2ChangeSet changeSet) { for (auto& change : changeSet.changes) { if (std::holds_alternative(change.object)) { - auto& flag_descriptor = - std::get(change.object); - - auto existing_flag = flags_.find(change.key); - if (existing_flag != flags_.end() && - existing_flag->second->version >= flag_descriptor.version) { - continue; - } - flags_[change.key] = std::make_shared( - std::move(flag_descriptor)); - result.flags.insert(change.key); + std::move(std::get(change.object))); } else if (std::holds_alternative( change.object)) { - auto& segment_descriptor = - std::get(change.object); - - auto existing_segment = segments_.find(change.key); - if (existing_segment != segments_.end() && - existing_segment->second->version >= - segment_descriptor.version) { - continue; - } - segments_[change.key] = - std::make_shared( - std::move(segment_descriptor)); - result.segments.insert(change.key); + std::make_shared(std::move( + std::get(change.object))); } } - - return result; } } // namespace launchdarkly::server_side::data_components diff --git a/libs/server-sdk/src/data_components/memory_store/memory_store.hpp b/libs/server-sdk/src/data_components/memory_store/memory_store.hpp index 5013adcce..e9a067881 100644 --- a/libs/server-sdk/src/data_components/memory_store/memory_store.hpp +++ b/libs/server-sdk/src/data_components/memory_store/memory_store.hpp @@ -9,15 +9,9 @@ #include #include #include -#include namespace launchdarkly::server_side::data_components { -struct ApplyResult { - std::unordered_set flags; - std::unordered_set segments; -}; - class MemoryStore final : public data_interfaces::IStore, public data_interfaces::IDestination { public: @@ -52,7 +46,7 @@ class MemoryStore final : public data_interfaces::IStore, bool RemoveSegment(std::string const& key); - [[nodiscard]] ApplyResult Apply(data_model::FDv2ChangeSet changeSet); + void Apply(data_model::FDv2ChangeSet changeSet); MemoryStore() = default; ~MemoryStore() override = default; diff --git a/libs/server-sdk/tests/memory_store_apply_test.cpp b/libs/server-sdk/tests/memory_store_apply_test.cpp index 619fe380b..003285c53 100644 --- a/libs/server-sdk/tests/memory_store_apply_test.cpp +++ b/libs/server-sdk/tests/memory_store_apply_test.cpp @@ -27,8 +27,7 @@ TEST(MemoryStoreApplyTest, ApplyNone_IsNoOp) { {"segA", SegmentDescriptor(seg_a)}}, }); - auto result = - store.Apply(FDv2ChangeSet{FDv2ChangeSet::Type::kNone, {}, Selector{}}); + store.Apply(FDv2ChangeSet{FDv2ChangeSet::Type::kNone, {}, Selector{}}); auto fetched_flag = store.GetFlag("flagA"); ASSERT_TRUE(fetched_flag); @@ -36,15 +35,11 @@ TEST(MemoryStoreApplyTest, ApplyNone_IsNoOp) { auto fetched_seg = store.GetSegment("segA"); ASSERT_TRUE(fetched_seg); EXPECT_EQ(1u, fetched_seg->version); - - EXPECT_TRUE(result.flags.empty()); - EXPECT_TRUE(result.segments.empty()); } TEST(MemoryStoreApplyTest, ApplyNone_DoesNotInitialize) { MemoryStore store; - std::ignore = store.Apply( - FDv2ChangeSet{FDv2ChangeSet::Type::kNone, {}, Selector{}}); + store.Apply(FDv2ChangeSet{FDv2ChangeSet::Type::kNone, {}, Selector{}}); EXPECT_FALSE(store.Initialized()); } @@ -55,8 +50,7 @@ TEST(MemoryStoreApplyTest, ApplyNone_DoesNotInitialize) { TEST(MemoryStoreApplyTest, ApplyFull_SetsInitialized) { MemoryStore store; ASSERT_FALSE(store.Initialized()); - std::ignore = store.Apply( - FDv2ChangeSet{FDv2ChangeSet::Type::kFull, {}, Selector{}}); + store.Apply(FDv2ChangeSet{FDv2ChangeSet::Type::kFull, {}, Selector{}}); EXPECT_TRUE(store.Initialized()); } @@ -70,7 +64,7 @@ TEST(MemoryStoreApplyTest, ApplyFull_StoresItems) { seg_a.version = 1; seg_a.key = "segA"; - auto result = store.Apply(FDv2ChangeSet{ + store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kFull, std::vector{{"flagA", FlagDescriptor(flag_a)}, {"segA", SegmentDescriptor(seg_a)}}, @@ -88,11 +82,6 @@ TEST(MemoryStoreApplyTest, ApplyFull_StoresItems) { EXPECT_TRUE(fetched_seg->item.has_value()); EXPECT_EQ("segA", fetched_seg->item->key); EXPECT_EQ(1u, fetched_seg->version); - - ASSERT_EQ(1u, result.flags.size()); - EXPECT_EQ(1u, result.flags.count("flagA")); - ASSERT_EQ(1u, result.segments.size()); - EXPECT_EQ(1u, result.segments.count("segA")); } TEST(MemoryStoreApplyTest, ApplyFull_ClearsExistingItems) { @@ -125,7 +114,7 @@ TEST(MemoryStoreApplyTest, ApplyFull_ClearsExistingItems) { seg_b.version = 1; seg_b.key = "segB"; - auto result = store.Apply(FDv2ChangeSet{ + store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kFull, std::vector{{"flagC", FlagDescriptor(flag_c)}, {"segB", SegmentDescriptor(seg_b)}}, @@ -137,192 +126,13 @@ TEST(MemoryStoreApplyTest, ApplyFull_ClearsExistingItems) { ASSERT_TRUE(store.GetFlag("flagC")); EXPECT_FALSE(store.GetSegment("segA")); ASSERT_TRUE(store.GetSegment("segB")); - - // Cleared keys and new keys all reported as changed. - ASSERT_EQ(3u, result.flags.size()); - EXPECT_EQ(1u, result.flags.count("flagA")); - EXPECT_EQ(1u, result.flags.count("flagB")); - EXPECT_EQ(1u, result.flags.count("flagC")); - ASSERT_EQ(2u, result.segments.size()); - EXPECT_EQ(1u, result.segments.count("segA")); - EXPECT_EQ(1u, result.segments.count("segB")); -} - -TEST(MemoryStoreApplyTest, ApplyFull_EmptyChangeSetClearsStore) { - MemoryStore store; - Flag flag_a; - flag_a.version = 1; - flag_a.key = "flagA"; - - Segment seg_a; - seg_a.version = 1; - seg_a.key = "segA"; - - store.Init(SDKDataSet{ - std::unordered_map{ - {"flagA", FlagDescriptor(flag_a)}}, - std::unordered_map{ - {"segA", SegmentDescriptor(seg_a)}}, - }); - - auto result = - store.Apply(FDv2ChangeSet{FDv2ChangeSet::Type::kFull, {}, Selector{}}); - - EXPECT_EQ(0u, store.AllFlags().size()); - EXPECT_EQ(0u, store.AllSegments().size()); - - ASSERT_EQ(1u, result.flags.size()); - EXPECT_EQ(1u, result.flags.count("flagA")); - ASSERT_EQ(1u, result.segments.size()); - EXPECT_EQ(1u, result.segments.count("segA")); -} - -TEST(MemoryStoreApplyTest, ApplyFull_WithFlagTombstone) { - MemoryStore store; - - auto result = store.Apply(FDv2ChangeSet{ - FDv2ChangeSet::Type::kFull, - std::vector{{"flagA", FlagDescriptor(Tombstone(5))}}, - Selector{}, - }); - - auto fetched = store.GetFlag("flagA"); - ASSERT_TRUE(fetched); - EXPECT_EQ(5u, fetched->version); - EXPECT_FALSE(fetched->item.has_value()); - - ASSERT_EQ(1u, result.flags.size()); - EXPECT_EQ(1u, result.flags.count("flagA")); - EXPECT_TRUE(result.segments.empty()); } // --------------------------------------------------------------------------- // kPartial tests // --------------------------------------------------------------------------- -TEST(MemoryStoreApplyTest, ApplyPartial_UpsertsNewItems) { - MemoryStore store; - store.Init(SDKDataSet{ - std::unordered_map(), - std::unordered_map(), - }); - - Flag flag_a; - flag_a.version = 1; - flag_a.key = "flagA"; - - Segment seg_a; - seg_a.version = 1; - seg_a.key = "segA"; - - auto result = store.Apply(FDv2ChangeSet{ - FDv2ChangeSet::Type::kPartial, - std::vector{{"flagA", FlagDescriptor(flag_a)}, - {"segA", SegmentDescriptor(seg_a)}}, - Selector{}, - }); - - auto fetched_flag = store.GetFlag("flagA"); - ASSERT_TRUE(fetched_flag); - EXPECT_TRUE(fetched_flag->item.has_value()); - EXPECT_EQ("flagA", fetched_flag->item->key); - EXPECT_EQ(1u, fetched_flag->version); - - auto fetched_seg = store.GetSegment("segA"); - ASSERT_TRUE(fetched_seg); - EXPECT_TRUE(fetched_seg->item.has_value()); - EXPECT_EQ("segA", fetched_seg->item->key); - EXPECT_EQ(1u, fetched_seg->version); - - ASSERT_EQ(1u, result.flags.size()); - EXPECT_EQ(1u, result.flags.count("flagA")); - ASSERT_EQ(1u, result.segments.size()); - EXPECT_EQ(1u, result.segments.count("segA")); -} - -TEST(MemoryStoreApplyTest, ApplyPartial_SkipsStaleItems) { - MemoryStore store; - Flag flag_a; - flag_a.version = 5; - flag_a.key = "flagA"; - - Segment seg_a; - seg_a.version = 5; - seg_a.key = "segA"; - - store.Init(SDKDataSet{ - std::unordered_map{ - {"flagA", FlagDescriptor(flag_a)}}, - std::unordered_map{ - {"segA", SegmentDescriptor(seg_a)}}, - }); - - Flag flag_a_stale; - flag_a_stale.version = 3; - flag_a_stale.key = "flagA"; - - Segment seg_a_stale; - seg_a_stale.version = 3; - seg_a_stale.key = "segA"; - - auto result = store.Apply(FDv2ChangeSet{ - FDv2ChangeSet::Type::kPartial, - std::vector{{"flagA", FlagDescriptor(flag_a_stale)}, - {"segA", SegmentDescriptor(seg_a_stale)}}, - Selector{}, - }); - - ASSERT_TRUE(store.GetFlag("flagA")); - EXPECT_EQ(5u, store.GetFlag("flagA")->version); - ASSERT_TRUE(store.GetSegment("segA")); - EXPECT_EQ(5u, store.GetSegment("segA")->version); - - EXPECT_TRUE(result.flags.empty()); - EXPECT_TRUE(result.segments.empty()); -} - -TEST(MemoryStoreApplyTest, ApplyPartial_SkipsItemsWithEqualVersion) { - MemoryStore store; - Flag flag_a; - flag_a.version = 5; - flag_a.key = "flagA"; - - Segment seg_a; - seg_a.version = 5; - seg_a.key = "segA"; - - store.Init(SDKDataSet{ - std::unordered_map{ - {"flagA", FlagDescriptor(flag_a)}}, - std::unordered_map{ - {"segA", SegmentDescriptor(seg_a)}}, - }); - - Flag flag_a_same; - flag_a_same.version = 5; - flag_a_same.key = "flagA"; - - Segment seg_a_same; - seg_a_same.version = 5; - seg_a_same.key = "segA"; - - auto result = store.Apply(FDv2ChangeSet{ - FDv2ChangeSet::Type::kPartial, - std::vector{{"flagA", FlagDescriptor(flag_a_same)}, - {"segA", SegmentDescriptor(seg_a_same)}}, - Selector{}, - }); - - ASSERT_TRUE(store.GetFlag("flagA")); - EXPECT_EQ(5u, store.GetFlag("flagA")->version); - ASSERT_TRUE(store.GetSegment("segA")); - EXPECT_EQ(5u, store.GetSegment("segA")->version); - - EXPECT_TRUE(result.flags.empty()); - EXPECT_TRUE(result.segments.empty()); -} - -TEST(MemoryStoreApplyTest, ApplyPartial_AppliesFreshItems) { +TEST(MemoryStoreApplyTest, ApplyPartial_AppliesItems) { MemoryStore store; Flag flag_a; flag_a.version = 5; @@ -347,7 +157,7 @@ TEST(MemoryStoreApplyTest, ApplyPartial_AppliesFreshItems) { seg_a_new.version = 6; seg_a_new.key = "segA"; - auto result = store.Apply(FDv2ChangeSet{ + store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kPartial, std::vector{{"flagA", FlagDescriptor(flag_a_new)}, {"segA", SegmentDescriptor(seg_a_new)}}, @@ -358,11 +168,6 @@ TEST(MemoryStoreApplyTest, ApplyPartial_AppliesFreshItems) { EXPECT_EQ(6u, store.GetFlag("flagA")->version); ASSERT_TRUE(store.GetSegment("segA")); EXPECT_EQ(6u, store.GetSegment("segA")->version); - - ASSERT_EQ(1u, result.flags.size()); - EXPECT_EQ(1u, result.flags.count("flagA")); - ASSERT_EQ(1u, result.segments.size()); - EXPECT_EQ(1u, result.segments.count("segA")); } TEST(MemoryStoreApplyTest, ApplyPartial_PreservesUnchangedItems) { @@ -400,7 +205,7 @@ TEST(MemoryStoreApplyTest, ApplyPartial_PreservesUnchangedItems) { seg_b_new.version = 2; seg_b_new.key = "segB"; - auto result = store.Apply(FDv2ChangeSet{ + store.Apply(FDv2ChangeSet{ FDv2ChangeSet::Type::kPartial, std::vector{{"flagB", FlagDescriptor(flag_b_new)}, {"segB", SegmentDescriptor(seg_b_new)}}, @@ -415,79 +220,4 @@ TEST(MemoryStoreApplyTest, ApplyPartial_PreservesUnchangedItems) { EXPECT_EQ(1u, store.GetSegment("segA")->version); ASSERT_TRUE(store.GetSegment("segB")); EXPECT_EQ(2u, store.GetSegment("segB")->version); - - ASSERT_EQ(1u, result.flags.size()); - EXPECT_EQ(1u, result.flags.count("flagB")); - ASSERT_EQ(1u, result.segments.size()); - EXPECT_EQ(1u, result.segments.count("segB")); -} - -TEST(MemoryStoreApplyTest, ApplyPartial_WithFlagTombstone) { - MemoryStore store; - Flag flag_a; - flag_a.version = 1; - flag_a.key = "flagA"; - - store.Init(SDKDataSet{ - std::unordered_map{ - {"flagA", FlagDescriptor(flag_a)}}, - std::unordered_map(), - }); - - auto result = store.Apply(FDv2ChangeSet{ - FDv2ChangeSet::Type::kPartial, - std::vector{{"flagA", FlagDescriptor(Tombstone(2))}}, - Selector{}, - }); - - auto fetched = store.GetFlag("flagA"); - ASSERT_TRUE(fetched); - EXPECT_EQ(2u, fetched->version); - EXPECT_FALSE(fetched->item.has_value()); - - ASSERT_EQ(1u, result.flags.size()); - EXPECT_EQ(1u, result.flags.count("flagA")); - EXPECT_TRUE(result.segments.empty()); -} - -TEST(MemoryStoreApplyTest, ApplyPartial_MixedStaleAndFreshItems) { - MemoryStore store; - Flag flag_a; - flag_a.version = 10; - flag_a.key = "flagA"; - - Flag flag_b; - flag_b.version = 1; - flag_b.key = "flagB"; - - store.Init(SDKDataSet{ - std::unordered_map{ - {"flagA", FlagDescriptor(flag_a)}, - {"flagB", FlagDescriptor(flag_b)}}, - std::unordered_map(), - }); - - Flag flag_a_stale; - flag_a_stale.version = 5; - flag_a_stale.key = "flagA"; - - Flag flag_b_new; - flag_b_new.version = 2; - flag_b_new.key = "flagB"; - - auto result = store.Apply(FDv2ChangeSet{ - FDv2ChangeSet::Type::kPartial, - std::vector{{"flagA", FlagDescriptor(flag_a_stale)}, - {"flagB", FlagDescriptor(flag_b_new)}}, - Selector{}, - }); - - // flagA version 5 < 10: skip. - EXPECT_EQ(10u, store.GetFlag("flagA")->version); - // flagB version 2 > 1: apply. - EXPECT_EQ(2u, store.GetFlag("flagB")->version); - - ASSERT_EQ(1u, result.flags.size()); - EXPECT_EQ(1u, result.flags.count("flagB")); - EXPECT_TRUE(result.segments.empty()); } From 651a780e8dddf04e13065f741706c51b13feb17d Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Mon, 30 Mar 2026 17:27:55 -0700 Subject: [PATCH 07/12] refactor: define new Synchronizer and Initializer interfaces for FDv2 --- .../source/fdv2_source_result.hpp | 65 +++++++++++++++++++ .../source/ifdv2_initializer.hpp | 46 +++++++++++++ .../source/ifdv2_synchronizer.hpp | 57 ++++++++++++++++ 3 files changed, 168 insertions(+) create mode 100644 libs/server-sdk/src/data_interfaces/source/fdv2_source_result.hpp create mode 100644 libs/server-sdk/src/data_interfaces/source/ifdv2_initializer.hpp create mode 100644 libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer.hpp diff --git a/libs/server-sdk/src/data_interfaces/source/fdv2_source_result.hpp b/libs/server-sdk/src/data_interfaces/source/fdv2_source_result.hpp new file mode 100644 index 000000000..2d7dd435b --- /dev/null +++ b/libs/server-sdk/src/data_interfaces/source/fdv2_source_result.hpp @@ -0,0 +1,65 @@ +#pragma once + +#include +#include + +#include +#include +#include + +namespace launchdarkly::server_side::data_interfaces { + +/** + * Result returned by IFDv2Initializer::Run and IFDv2Synchronizer::Next. + * + * Mirrors Java's FDv2SourceResult. + */ +struct FDv2SourceResult { + using ErrorInfo = common::data_sources::DataSourceStatusErrorInfo; + + /** + * A changeset was successfully received and is ready to apply. + */ + struct ChangeSet { + data_model::FDv2ChangeSet change_set; + /** If true, the server signaled that the client should fall back to + * FDv1. */ + bool fdv1_fallback; + }; + + /** + * A transient error occurred; the source may recover. + */ + struct Interrupted { + ErrorInfo error; + bool fdv1_fallback; + }; + + /** + * A non-recoverable error occurred; the source should not be retried. + */ + struct TerminalError { + ErrorInfo error; + bool fdv1_fallback; + }; + + /** + * The source was closed cleanly (via Close()). + */ + struct Shutdown {}; + + /** + * The server sent a goodbye; the orchestrator should rotate sources. + */ + struct Goodbye { + std::optional reason; + bool fdv1_fallback; + }; + + using Value = + std::variant; + + Value value; +}; + +} // namespace launchdarkly::server_side::data_interfaces diff --git a/libs/server-sdk/src/data_interfaces/source/ifdv2_initializer.hpp b/libs/server-sdk/src/data_interfaces/source/ifdv2_initializer.hpp new file mode 100644 index 000000000..5c2608cd1 --- /dev/null +++ b/libs/server-sdk/src/data_interfaces/source/ifdv2_initializer.hpp @@ -0,0 +1,46 @@ +#pragma once + +#include "fdv2_source_result.hpp" + +#include + +namespace launchdarkly::server_side::data_interfaces { + +/** + * Defines a one-shot data source that runs to completion and returns a single + * result. Used during the initialization phase of FDv2, before handing off to + * an IFDv2Synchronizer. + */ +class IFDv2Initializer { + public: + /** + * Run the initializer to completion. Blocks until a result is available. + * Called at most once per instance. + * + * Close() may be called from another thread to unblock Run(), in which + * case Run() returns FDv2SourceResult::Shutdown. + */ + virtual FDv2SourceResult Run() = 0; + + /** + * Unblocks any in-progress Run() call, causing it to return + * FDv2SourceResult::Shutdown. + */ + virtual void Close() = 0; + + /** + * @return A display-suitable name of the initializer. + */ + [[nodiscard]] virtual std::string const& Identity() const = 0; + + virtual ~IFDv2Initializer() = default; + IFDv2Initializer(IFDv2Initializer const&) = delete; + IFDv2Initializer(IFDv2Initializer&&) = delete; + IFDv2Initializer& operator=(IFDv2Initializer const&) = delete; + IFDv2Initializer& operator=(IFDv2Initializer&&) = delete; + + protected: + IFDv2Initializer() = default; +}; + +} // namespace launchdarkly::server_side::data_interfaces diff --git a/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer.hpp b/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer.hpp new file mode 100644 index 000000000..4ba11872d --- /dev/null +++ b/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer.hpp @@ -0,0 +1,57 @@ +#pragma once + +#include "fdv2_source_result.hpp" + +#include +#include + +namespace launchdarkly::server_side::data_interfaces { + +/** + * Defines a continuous data source that produces a stream of results. Used + * during the synchronization phase of FDv2, after initialization is complete. + * + * The stream is started lazily on the first call to Next(). The synchronizer + * runs until Close() is called. + */ +class IFDv2Synchronizer { + public: + /** + * Block until the next result is available or the timeout expires. + * + * On the first call, the synchronizer starts its underlying connection. + * Subsequent calls continue reading from the same connection. + * + * If the timeout expires before a result arrives, returns + * FDv2SourceResult::Interrupted. The orchestrator uses this to evaluate + * fallback conditions. + * + * Close() may be called from another thread to unblock Next(), in which + * case Next() returns FDv2SourceResult::Shutdown. + * + * @param timeout Maximum time to wait for the next result. + */ + virtual FDv2SourceResult Next(std::chrono::milliseconds timeout) = 0; + + /** + * Unblocks any in-progress Next() call, causing it to return + * FDv2SourceResult::Shutdown, and releases underlying resources. + */ + virtual void Close() = 0; + + /** + * @return A display-suitable name of the synchronizer. + */ + [[nodiscard]] virtual std::string const& Identity() const = 0; + + virtual ~IFDv2Synchronizer() = default; + IFDv2Synchronizer(IFDv2Synchronizer const&) = delete; + IFDv2Synchronizer(IFDv2Synchronizer&&) = delete; + IFDv2Synchronizer& operator=(IFDv2Synchronizer const&) = delete; + IFDv2Synchronizer& operator=(IFDv2Synchronizer&&) = delete; + + protected: + IFDv2Synchronizer() = default; +}; + +} // namespace launchdarkly::server_side::data_interfaces From fe37f5571e03fe214be497a8be5e07bd1bcfd0d7 Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Thu, 2 Apr 2026 10:42:20 -0700 Subject: [PATCH 08/12] move selector source into Next --- .gitignore | 1 + .../src/data_interfaces/source/ifdv2_synchronizer.hpp | 9 +++++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 76143cd10..0dac51100 100644 --- a/.gitignore +++ b/.gitignore @@ -42,6 +42,7 @@ build-dynamic build-static-debug build-dynamic-debug cmake-build-* +.cache # For Macs.. .DS_Store diff --git a/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer.hpp b/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer.hpp index 4ba11872d..9a555d24e 100644 --- a/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer.hpp +++ b/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer.hpp @@ -2,6 +2,8 @@ #include "fdv2_source_result.hpp" +#include + #include #include @@ -29,9 +31,12 @@ class IFDv2Synchronizer { * Close() may be called from another thread to unblock Next(), in which * case Next() returns FDv2SourceResult::Shutdown. * - * @param timeout Maximum time to wait for the next result. + * @param timeout Maximum time to wait for the next result. + * @param selector The selector to send with the request, reflecting any + * changesets applied since the previous call. */ - virtual FDv2SourceResult Next(std::chrono::milliseconds timeout) = 0; + virtual FDv2SourceResult Next(std::chrono::milliseconds timeout, + data_model::Selector selector) = 0; /** * Unblocks any in-progress Next() call, causing it to return From ed228574d21d1f3e8b970448c7b412f164e4bc8b Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Thu, 2 Apr 2026 11:40:30 -0700 Subject: [PATCH 09/12] distinguish between timeouts and errors --- .../src/data_interfaces/source/fdv2_source_result.hpp | 9 +++++++-- .../src/data_interfaces/source/ifdv2_synchronizer.hpp | 2 +- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/libs/server-sdk/src/data_interfaces/source/fdv2_source_result.hpp b/libs/server-sdk/src/data_interfaces/source/fdv2_source_result.hpp index 2d7dd435b..069fe5a38 100644 --- a/libs/server-sdk/src/data_interfaces/source/fdv2_source_result.hpp +++ b/libs/server-sdk/src/data_interfaces/source/fdv2_source_result.hpp @@ -56,8 +56,13 @@ struct FDv2SourceResult { bool fdv1_fallback; }; - using Value = - std::variant; + /** + * Next() returned because the timeout expired before a result arrived. + */ + struct Timeout {}; + + using Value = std::variant; Value value; }; diff --git a/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer.hpp b/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer.hpp index 9a555d24e..a56bacee3 100644 --- a/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer.hpp +++ b/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer.hpp @@ -25,7 +25,7 @@ class IFDv2Synchronizer { * Subsequent calls continue reading from the same connection. * * If the timeout expires before a result arrives, returns - * FDv2SourceResult::Interrupted. The orchestrator uses this to evaluate + * FDv2SourceResult::Timeout. The orchestrator uses this to evaluate * fallback conditions. * * Close() may be called from another thread to unblock Next(), in which From f0013dac3ee12070944582fd7d1f0b75416a961d Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Thu, 2 Apr 2026 10:32:30 -0700 Subject: [PATCH 10/12] refactor: implement fdv2 polling initializer / synchronizer --- .../launchdarkly/fdv2_protocol_handler.hpp | 62 ++++ libs/internal/src/CMakeLists.txt | 1 + libs/internal/src/fdv2_protocol_handler.cpp | 220 +++++++++++++ .../tests/fdv2_protocol_handler_test.cpp | 277 ++++++++++++++++ libs/server-sdk/src/CMakeLists.txt | 4 + .../data_systems/fdv2/polling_initializer.cpp | 254 +++++++++++++++ .../data_systems/fdv2/polling_initializer.hpp | 56 ++++ .../fdv2/polling_synchronizer.cpp | 295 ++++++++++++++++++ .../fdv2/polling_synchronizer.hpp | 71 +++++ 9 files changed, 1240 insertions(+) create mode 100644 libs/internal/include/launchdarkly/fdv2_protocol_handler.hpp create mode 100644 libs/internal/src/fdv2_protocol_handler.cpp create mode 100644 libs/internal/tests/fdv2_protocol_handler_test.cpp create mode 100644 libs/server-sdk/src/data_systems/fdv2/polling_initializer.cpp create mode 100644 libs/server-sdk/src/data_systems/fdv2/polling_initializer.hpp create mode 100644 libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.cpp create mode 100644 libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.hpp diff --git a/libs/internal/include/launchdarkly/fdv2_protocol_handler.hpp b/libs/internal/include/launchdarkly/fdv2_protocol_handler.hpp new file mode 100644 index 000000000..140d38dfc --- /dev/null +++ b/libs/internal/include/launchdarkly/fdv2_protocol_handler.hpp @@ -0,0 +1,62 @@ +#pragma once + +#include +#include + +#include + +#include +#include +#include + +namespace launchdarkly { + +/** + * Protocol state machine for the FDv2 wire format. + * + * Accumulates put-object and delete-object events between a server-intent + * and payload-transferred event, then emits a complete FDv2ChangeSet. + * + * Shared between the polling and streaming synchronizers. + */ +class FDv2ProtocolHandler { + public: + /** + * Result of handling a single FDv2 event: + * - monostate: no output yet (accumulating, heartbeat, or unknown event) + * - FDv2ChangeSet: complete changeset ready to apply + * - FDv2Error: server reported an error; discard partial data + * - Goodbye: server is closing; caller should rotate sources + */ + using Result = std::variant; + + /** + * Process one FDv2 event. + * + * @param event_type The event type string (e.g. "server-intent", + * "put-object", "payload-transferred"). + * @param data The parsed JSON value for the event's data field. + * @return A Result indicating what (if anything) the caller + * should act on. + */ + Result HandleEvent(std::string_view event_type, + boost::json::value const& data); + + /** + * Reset accumulated state. Call on reconnect before processing new events. + */ + void Reset(); + + FDv2ProtocolHandler() = default; + + private: + enum class State { kInactive, kFull, kPartial }; + + State state_ = State::kInactive; + std::vector changes_; +}; + +} // namespace launchdarkly diff --git a/libs/internal/src/CMakeLists.txt b/libs/internal/src/CMakeLists.txt index 44600638b..8ff998499 100644 --- a/libs/internal/src/CMakeLists.txt +++ b/libs/internal/src/CMakeLists.txt @@ -35,6 +35,7 @@ set(INTERNAL_SOURCES serialization/value_mapping.cpp serialization/json_evaluation_result.cpp serialization/json_fdv2_events.cpp + fdv2_protocol_handler.cpp serialization/json_sdk_data_set.cpp serialization/json_segment.cpp serialization/json_primitives.cpp diff --git a/libs/internal/src/fdv2_protocol_handler.cpp b/libs/internal/src/fdv2_protocol_handler.cpp new file mode 100644 index 000000000..0e6b7380c --- /dev/null +++ b/libs/internal/src/fdv2_protocol_handler.cpp @@ -0,0 +1,220 @@ +#include + +#include +#include +#include +#include +#include + +#include +#include + +namespace launchdarkly { + +static char const* const kServerIntent = "server-intent"; +static char const* const kPutObject = "put-object"; +static char const* const kDeleteObject = "delete-object"; +static char const* const kPayloadTransferred = "payload-transferred"; +static char const* const kError = "error"; +static char const* const kGoodbye = "goodbye"; + +// Returns the parsed FDv2Change on success, nullopt for unknown kinds (which +// should be silently skipped for forward-compatibility), or an error string if +// a known kind fails to deserialize. +static tl::expected, std::string> +ParsePut(PutObject const& put) { + if (put.kind == "flag") { + auto result = boost::json::value_to< + tl::expected, JsonError>>( + put.object); + // One bad flag aborts the entire transfer so the store is never + // left in a partially-updated state. + if (!result) { + return tl::make_unexpected("could not deserialize flag '" + + put.key + "'"); + } + if (!result->has_value()) { + return tl::make_unexpected("flag '" + put.key + "' object was null"); + } + return data_model::FDv2Change{ + put.key, + data_model::ItemDescriptor{std::move(**result)}}; + } + if (put.kind == "segment") { + auto result = boost::json::value_to< + tl::expected, JsonError>>( + put.object); + // One bad segment aborts the entire transfer so the store is never + // left in a partially-updated state. + if (!result) { + return tl::make_unexpected("could not deserialize segment '" + + put.key + "'"); + } + if (!result->has_value()) { + return tl::make_unexpected("segment '" + put.key + + "' object was null"); + } + return data_model::FDv2Change{ + put.key, + data_model::ItemDescriptor{ + std::move(**result)}}; + } + // Silently skip unknown kinds for forward-compatibility. + return std::nullopt; +} + +static data_model::FDv2Change MakeDeleteChange(DeleteObject const& del) { + if (del.kind == "flag") { + return data_model::FDv2Change{ + del.key, + data_model::ItemDescriptor{ + data_model::Tombstone{static_cast(del.version)}}}; + } + return data_model::FDv2Change{ + del.key, + data_model::ItemDescriptor{ + data_model::Tombstone{static_cast(del.version)}}}; +} + +FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandleEvent( + std::string_view event_type, + boost::json::value const& data) { + if (event_type == kServerIntent) { + auto result = boost::json::value_to< + tl::expected, JsonError>>(data); + if (!result) { + Reset(); + return FDv2Error{std::nullopt, "could not deserialize server-intent"}; + } + if (!result->has_value()) { + Reset(); + return FDv2Error{std::nullopt, "server-intent data was null"}; + } + auto const& intent = **result; + if (intent.payloads.empty()) { + return std::monostate{}; + } + auto const& code = intent.payloads[0].intent_code; + changes_.clear(); + if (code == IntentCode::kTransferFull) { + state_ = State::kFull; + } else if (code == IntentCode::kTransferChanges) { + state_ = State::kPartial; + } else { + // kNone or kUnknown: emit an empty changeset immediately. + state_ = State::kInactive; + return data_model::FDv2ChangeSet{data_model::FDv2ChangeSet::Type::kNone, + {}, + data_model::Selector{}}; + } + return std::monostate{}; + } + + if (event_type == kPutObject) { + if (state_ == State::kInactive) { + return std::monostate{}; + } + auto result = boost::json::value_to< + tl::expected, JsonError>>(data); + if (!result) { + Reset(); + return FDv2Error{std::nullopt, "could not deserialize put-object"}; + } + if (!result->has_value()) { + Reset(); + return FDv2Error{std::nullopt, "put-object data was null"}; + } + auto change = ParsePut(**result); + if (!change) { + Reset(); + return FDv2Error{std::nullopt, std::move(change.error())}; + } + if (*change) { + changes_.push_back(std::move(**change)); + } + return std::monostate{}; + } + + if (event_type == kDeleteObject) { + if (state_ == State::kInactive) { + return std::monostate{}; + } + auto result = boost::json::value_to< + tl::expected, JsonError>>(data); + if (!result) { + Reset(); + return FDv2Error{std::nullopt, "could not deserialize delete-object"}; + } + if (!result->has_value()) { + Reset(); + return FDv2Error{std::nullopt, "delete-object data was null"}; + } + auto const& del = **result; + // Silently skip unknown kinds for forward-compatibility. + if (del.kind != "flag" && del.kind != "segment") { + return std::monostate{}; + } + changes_.push_back(MakeDeleteChange(del)); + return std::monostate{}; + } + + if (event_type == kPayloadTransferred) { + auto result = boost::json::value_to< + tl::expected, JsonError>>(data); + if (!result) { + Reset(); + return FDv2Error{std::nullopt, + "could not deserialize payload-transferred"}; + } + if (!result->has_value()) { + Reset(); + return FDv2Error{std::nullopt, "payload-transferred data was null"}; + } + auto const& transferred = **result; + auto type = (state_ == State::kPartial) + ? data_model::FDv2ChangeSet::Type::kPartial + : data_model::FDv2ChangeSet::Type::kFull; + data_model::FDv2ChangeSet changeset{ + type, + std::move(changes_), + data_model::Selector{data_model::Selector::State{ + transferred.version, transferred.state}}}; + Reset(); + return changeset; + } + + if (event_type == kError) { + auto result = boost::json::value_to< + tl::expected, JsonError>>(data); + Reset(); + if (!result) { + return FDv2Error{std::nullopt, "could not deserialize error event"}; + } + if (!result->has_value()) { + return FDv2Error{std::nullopt, "error event data was null"}; + } + return **result; + } + + if (event_type == kGoodbye) { + auto result = boost::json::value_to< + tl::expected, JsonError>>(data); + if (!result) { + return Goodbye{std::nullopt}; + } + if (!result->has_value()) { + return Goodbye{std::nullopt}; + } + return **result; + } + + // heartbeat and unrecognized events: no-op. + return std::monostate{}; +} + +void FDv2ProtocolHandler::Reset() { + state_ = State::kInactive; + changes_.clear(); +} + +} // namespace launchdarkly diff --git a/libs/internal/tests/fdv2_protocol_handler_test.cpp b/libs/internal/tests/fdv2_protocol_handler_test.cpp new file mode 100644 index 000000000..b6b12087e --- /dev/null +++ b/libs/internal/tests/fdv2_protocol_handler_test.cpp @@ -0,0 +1,277 @@ +#include + +#include + +#include + +using namespace launchdarkly; + +// Minimal valid flag JSON accepted by the existing Flag deserializer. +static char const* const kFlagJson = + R"({"key":"my-flag","on":true,"fallthrough":{"variation":0},)" + R"("variations":[true,false],"version":1})"; + +// Minimal valid segment JSON accepted by the existing Segment deserializer. +static char const* const kSegmentJson = + R"({"key":"my-seg","version":2,"rules":[],"included":[],"excluded":[]})"; + +// Build a server-intent event data value. +static boost::json::value MakeServerIntent(std::string const& intent_code) { + return boost::json::parse( + R"({"payloads":[{"id":"p1","target":1,"intentCode":")" + intent_code + + R"("}]})"); +} + +static boost::json::value MakePutObject(std::string const& kind, + std::string const& key, + std::string const& object_json) { + return boost::json::parse(R"({"version":1,"kind":")" + kind + + R"(","key":")" + key + + R"(","object":)" + object_json + "}"); +} + +static boost::json::value MakeDeleteObject(std::string const& kind, + std::string const& key, + int version) { + return boost::json::parse(R"({"version":)" + std::to_string(version) + + R"(,"kind":")" + kind + R"(","key":")" + key + + R"("})"); +} + +static boost::json::value MakePayloadTransferred(std::string const& state, + int version) { + return boost::json::parse(R"({"state":")" + state + R"(","version":)" + + std::to_string(version) + "}"); +} + +// ============================================================================ +// kNone intent +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, NoneIntentEmitsEmptyChangeSetImmediately) { + FDv2ProtocolHandler handler; + + auto result = handler.HandleEvent("server-intent", MakeServerIntent("none")); + + auto* cs = std::get_if(&result); + ASSERT_NE(cs, nullptr); + EXPECT_EQ(cs->type, data_model::FDv2ChangeSet::Type::kNone); + EXPECT_TRUE(cs->changes.empty()); + EXPECT_FALSE(cs->selector.value.has_value()); +} + +// ============================================================================ +// kTransferFull intent +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, FullIntentEmitsChangeSetOnPayloadTransferred) { + FDv2ProtocolHandler handler; + + auto r1 = handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + EXPECT_TRUE(std::holds_alternative(r1)); + + auto r2 = handler.HandleEvent( + "put-object", MakePutObject("flag", "my-flag", kFlagJson)); + EXPECT_TRUE(std::holds_alternative(r2)); + + auto r3 = handler.HandleEvent( + "payload-transferred", MakePayloadTransferred("state-abc", 7)); + + auto* cs = std::get_if(&r3); + ASSERT_NE(cs, nullptr); + EXPECT_EQ(cs->type, data_model::FDv2ChangeSet::Type::kFull); + EXPECT_EQ(cs->changes.size(), 1u); + EXPECT_EQ(cs->changes[0].key, "my-flag"); + ASSERT_TRUE(cs->selector.value.has_value()); + EXPECT_EQ(cs->selector.value->state, "state-abc"); + EXPECT_EQ(cs->selector.value->version, 7); +} + +TEST(FDv2ProtocolHandlerTest, FullIntentAccumulatesMultipleObjects) { + FDv2ProtocolHandler handler; + + handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + handler.HandleEvent("put-object", + MakePutObject("flag", "flag-1", kFlagJson)); + handler.HandleEvent("put-object", + MakePutObject("flag", "flag-2", kFlagJson)); + handler.HandleEvent("delete-object", MakeDeleteObject("segment", "seg-1", 5)); + + auto result = handler.HandleEvent( + "payload-transferred", MakePayloadTransferred("s", 1)); + + auto* cs = std::get_if(&result); + ASSERT_NE(cs, nullptr); + EXPECT_EQ(cs->type, data_model::FDv2ChangeSet::Type::kFull); + EXPECT_EQ(cs->changes.size(), 3u); +} + +// ============================================================================ +// kTransferChanges intent +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, PartialIntentEmitsPartialChangeSet) { + FDv2ProtocolHandler handler; + + handler.HandleEvent("server-intent", MakeServerIntent("xfer-changes")); + handler.HandleEvent("put-object", + MakePutObject("segment", "my-seg", kSegmentJson)); + + auto result = handler.HandleEvent( + "payload-transferred", MakePayloadTransferred("state-xyz", 3)); + + auto* cs = std::get_if(&result); + ASSERT_NE(cs, nullptr); + EXPECT_EQ(cs->type, data_model::FDv2ChangeSet::Type::kPartial); + EXPECT_EQ(cs->changes.size(), 1u); + EXPECT_EQ(cs->changes[0].key, "my-seg"); + ASSERT_TRUE(cs->selector.value.has_value()); + EXPECT_EQ(cs->selector.value->state, "state-xyz"); +} + +// ============================================================================ +// Unknown kind in put-object → silently skipped +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, UnknownKindInPutObjectIsSilentlySkipped) { + FDv2ProtocolHandler handler; + + handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + handler.HandleEvent("put-object", + MakePutObject("experiment", "exp-1", R"({"key":"exp-1","version":1})")); + handler.HandleEvent("put-object", + MakePutObject("flag", "my-flag", kFlagJson)); + + auto result = handler.HandleEvent( + "payload-transferred", MakePayloadTransferred("s", 1)); + + auto* cs = std::get_if(&result); + ASSERT_NE(cs, nullptr); + // Only the known kind (flag) should appear. + EXPECT_EQ(cs->changes.size(), 1u); + EXPECT_EQ(cs->changes[0].key, "my-flag"); +} + +// ============================================================================ +// error event → discard accumulated data, return FDv2Error +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, ErrorEventDiscardsAccumulatedDataAndReturnsError) { + FDv2ProtocolHandler handler; + + handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + handler.HandleEvent("put-object", + MakePutObject("flag", "my-flag", kFlagJson)); + + auto result = handler.HandleEvent( + "error", + boost::json::parse(R"({"reason":"something went wrong"})")); + + auto* err = std::get_if(&result); + ASSERT_NE(err, nullptr); + EXPECT_EQ(err->reason, "something went wrong"); + + // After the error the handler is reset. A subsequent full transfer should + // produce an empty changeset (no leftover data from before the error). + handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + auto result2 = handler.HandleEvent( + "payload-transferred", MakePayloadTransferred("s", 1)); + + auto* cs = std::get_if(&result2); + ASSERT_NE(cs, nullptr); + EXPECT_TRUE(cs->changes.empty()); +} + +// ============================================================================ +// goodbye event → return Goodbye +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, GoodbyeEventReturnsGoodbye) { + FDv2ProtocolHandler handler; + + auto result = handler.HandleEvent( + "goodbye", + boost::json::parse(R"({"reason":"shutting down"})")); + + auto* gb = std::get_if(&result); + ASSERT_NE(gb, nullptr); + ASSERT_TRUE(gb->reason.has_value()); + EXPECT_EQ(*gb->reason, "shutting down"); +} + +TEST(FDv2ProtocolHandlerTest, GoodbyeWithoutReasonReturnsGoodbye) { + FDv2ProtocolHandler handler; + + auto result = handler.HandleEvent("goodbye", boost::json::parse(R"({})")); + + auto* gb = std::get_if(&result); + ASSERT_NE(gb, nullptr); + EXPECT_FALSE(gb->reason.has_value()); +} + +// ============================================================================ +// heartbeat → no-op +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, HeartbeatReturnsMonostate) { + FDv2ProtocolHandler handler; + + auto result = + handler.HandleEvent("heartbeat", boost::json::parse(R"({})")); + EXPECT_TRUE(std::holds_alternative(result)); +} + +// ============================================================================ +// Unrecognized event type → no-op +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, UnknownEventTypeReturnsMonostate) { + FDv2ProtocolHandler handler; + + auto result = + handler.HandleEvent("future-event-type", boost::json::parse(R"({})")); + EXPECT_TRUE(std::holds_alternative(result)); +} + +// ============================================================================ +// put-object and delete-object before server-intent are ignored +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, PutBeforeServerIntentIsIgnored) { + FDv2ProtocolHandler handler; + + auto r1 = handler.HandleEvent("put-object", + MakePutObject("flag", "my-flag", kFlagJson)); + EXPECT_TRUE(std::holds_alternative(r1)); + + handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + auto result = handler.HandleEvent( + "payload-transferred", MakePayloadTransferred("s", 1)); + + auto* cs = std::get_if(&result); + ASSERT_NE(cs, nullptr); + EXPECT_TRUE(cs->changes.empty()); +} + +// ============================================================================ +// Reset clears accumulated state +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, ResetClearsState) { + FDv2ProtocolHandler handler; + + handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + handler.HandleEvent("put-object", + MakePutObject("flag", "my-flag", kFlagJson)); + handler.Reset(); + + // After reset, payload-transferred with no prior server-intent produces + // a full changeset with no changes. + handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + auto result = handler.HandleEvent( + "payload-transferred", MakePayloadTransferred("s", 1)); + + auto* cs = std::get_if(&result); + ASSERT_NE(cs, nullptr); + EXPECT_TRUE(cs->changes.empty()); +} diff --git a/libs/server-sdk/src/CMakeLists.txt b/libs/server-sdk/src/CMakeLists.txt index 62a017d41..2d855d575 100644 --- a/libs/server-sdk/src/CMakeLists.txt +++ b/libs/server-sdk/src/CMakeLists.txt @@ -49,6 +49,10 @@ target_sources(${LIBNAME} data_systems/background_sync/detail/payload_filter_validation/payload_filter_validation.cpp data_systems/background_sync/sources/polling/polling_data_source.hpp data_systems/background_sync/sources/polling/polling_data_source.cpp + data_systems/fdv2/polling_initializer.hpp + data_systems/fdv2/polling_initializer.cpp + data_systems/fdv2/polling_synchronizer.hpp + data_systems/fdv2/polling_synchronizer.cpp data_systems/background_sync/sources/streaming/streaming_data_source.hpp data_systems/background_sync/sources/streaming/streaming_data_source.cpp data_systems/background_sync/sources/streaming/event_handler.hpp diff --git a/libs/server-sdk/src/data_systems/fdv2/polling_initializer.cpp b/libs/server-sdk/src/data_systems/fdv2/polling_initializer.cpp new file mode 100644 index 000000000..07593a53f --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/polling_initializer.cpp @@ -0,0 +1,254 @@ +#include "polling_initializer.hpp" + +#include +#include +#include + +#include + +#include + +namespace launchdarkly::server_side::data_systems { + +static char const* const kIdentity = "FDv2 polling initializer"; +static char const* const kFDv2PollPath = "/sdk/poll"; + +static char const* const kErrorParsingBody = + "Could not parse FDv2 polling response"; +static char const* const kErrorMissingEvents = + "FDv2 polling response missing 'events' array"; +static char const* const kErrorIncompletePayload = + "FDv2 polling response did not contain a complete payload"; + +using ErrorInfo = data_interfaces::FDv2SourceResult::ErrorInfo; +using ErrorKind = ErrorInfo::ErrorKind; + +static ErrorInfo MakeError(ErrorKind kind, + ErrorInfo::StatusCodeType status, + std::string message) { + return ErrorInfo{kind, status, std::move(message), + std::chrono::system_clock::now()}; +} + +static network::HttpRequest MakeRequest( + Logger const& logger, + config::built::ServiceEndpoints const& endpoints, + config::built::HttpProperties const& http_properties, + data_model::Selector const& selector, + std::optional const& filter_key) { + auto url = std::make_optional(endpoints.PollingBaseUrl()); + url = network::AppendUrl(url, kFDv2PollPath); + + bool has_query = false; + if (selector.value) { + url->append("?basis=" + selector.value->state); + has_query = true; + } + + if (filter_key) { + url->append(has_query ? "&filter=" : "?filter="); + url->append(*filter_key); + } + + config::builders::HttpPropertiesBuilder const builder(http_properties); + return {url.value_or(""), network::HttpMethod::kGet, builder.Build(), + network::HttpRequest::BodyType{}}; +} + +FDv2PollingInitializer::FDv2PollingInitializer( + boost::asio::any_io_executor const& executor, + Logger const& logger, + config::built::ServiceEndpoints const& endpoints, + config::built::HttpProperties const& http_properties, + data_model::Selector selector, + std::optional filter_key) + : logger_(logger), + requester_(executor, http_properties.Tls()), + request_(MakeRequest(logger, endpoints, http_properties, selector, + filter_key)) {} + +data_interfaces::FDv2SourceResult FDv2PollingInitializer::Run() { + if (!request_.Valid()) { + LD_LOG(logger_, LogLevel::kError) + << kIdentity << ": invalid polling endpoint URL"; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::TerminalError{ + MakeError(ErrorKind::kNetworkError, 0, + "invalid polling endpoint URL"), + false}}; + } + + auto shared_result = + std::make_shared>(); + + requester_.Request(request_, [this, shared_result](network::HttpResult res) { + std::lock_guard guard(mutex_); + *shared_result = std::move(res); + cv_.notify_one(); + }); + + std::unique_lock lock(mutex_); + cv_.wait(lock, [&] { return shared_result->has_value() || closed_; }); + + if (closed_) { + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Shutdown{}}; + } + + auto http_result = std::move(**shared_result); + lock.unlock(); + + return HandlePollResult(http_result); +} + +void FDv2PollingInitializer::Close() { + std::lock_guard lock(mutex_); + closed_ = true; + cv_.notify_one(); +} + +std::string const& FDv2PollingInitializer::Identity() const { + static std::string const identity = kIdentity; + return identity; +} + +data_interfaces::FDv2SourceResult FDv2PollingInitializer::HandlePollResult( + network::HttpResult const& res) { + if (res.IsError()) { + auto const& msg = res.ErrorMessage(); + std::string error_msg = msg.has_value() ? *msg : "unknown error"; + LD_LOG(logger_, LogLevel::kWarn) + << kIdentity << ": " << error_msg; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kNetworkError, 0, std::move(error_msg)), + false}}; + } + + if (res.Status() == 304) { + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::ChangeSet{ + data_model::FDv2ChangeSet{ + data_model::FDv2ChangeSet::Type::kNone, {}, {}}, + false}}; + } + + if (res.Status() == 200) { + auto const& body = res.Body(); + if (!body) { + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, + "polling response contained no body"), + false}}; + } + + boost::system::error_code ec; + auto parsed = boost::json::parse(*body, ec); + if (ec) { + LD_LOG(logger_, LogLevel::kError) << kErrorParsingBody; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, kErrorParsingBody), + false}}; + } + + auto const* obj = parsed.if_object(); + if (!obj) { + LD_LOG(logger_, LogLevel::kError) << kErrorParsingBody; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, kErrorParsingBody), + false}}; + } + + auto const* events_val = obj->if_contains("events"); + if (!events_val) { + LD_LOG(logger_, LogLevel::kError) << kErrorMissingEvents; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, kErrorMissingEvents), + false}}; + } + + auto const* events_arr = events_val->if_array(); + if (!events_arr) { + LD_LOG(logger_, LogLevel::kError) << kErrorMissingEvents; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, kErrorMissingEvents), + false}}; + } + + for (auto const& event_val : *events_arr) { + auto const* event_obj = event_val.if_object(); + if (!event_obj) { + continue; + } + + auto const* event_type_val = event_obj->if_contains("event"); + auto const* event_data_val = event_obj->if_contains("data"); + if (!event_type_val || !event_data_val) { + continue; + } + + auto const* event_type_str = event_type_val->if_string(); + if (!event_type_str) { + continue; + } + + auto result = protocol_handler_.HandleEvent( + std::string_view{event_type_str->data(), + event_type_str->size()}, + *event_data_val); + + if (auto* changeset = + std::get_if(&result)) { + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::ChangeSet{ + std::move(*changeset), false}}; + } + if (auto* goodbye = std::get_if(&result)) { + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Goodbye{goodbye->reason, + false}}; + } + if (auto* error = std::get_if(&result)) { + std::string msg = "Server error: " + error->reason; + LD_LOG(logger_, LogLevel::kInfo) + << kIdentity << ": " << msg; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kUnknown, 0, std::move(msg)), + false}}; + } + } + + LD_LOG(logger_, LogLevel::kError) << kErrorIncompletePayload; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, kErrorIncompletePayload), + false}}; + } + + if (network::IsRecoverableStatus(res.Status())) { + std::string msg = network::ErrorForStatusCode( + res.Status(), "FDv2 polling request", "will retry"); + LD_LOG(logger_, LogLevel::kWarn) << kIdentity << ": " << msg; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kErrorResponse, res.Status(), + std::move(msg)), + false}}; + } + + std::string msg = network::ErrorForStatusCode( + res.Status(), "FDv2 polling request", std::nullopt); + LD_LOG(logger_, LogLevel::kError) << kIdentity << ": " << msg; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::TerminalError{ + MakeError(ErrorKind::kErrorResponse, res.Status(), std::move(msg)), + false}}; +} + +} // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/src/data_systems/fdv2/polling_initializer.hpp b/libs/server-sdk/src/data_systems/fdv2/polling_initializer.hpp new file mode 100644 index 000000000..affe348e7 --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/polling_initializer.hpp @@ -0,0 +1,56 @@ +#pragma once + +#include "../../data_interfaces/source/ifdv2_initializer.hpp" + +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include + +namespace launchdarkly::server_side::data_systems { + +/** + * FDv2 polling initializer. Makes a single HTTP GET to the FDv2 polling + * endpoint, parses the response via the FDv2 protocol state machine, and + * returns the result. Implements IFDv2Initializer (blocking, one-shot). + */ +class FDv2PollingInitializer final + : public data_interfaces::IFDv2Initializer { + public: + FDv2PollingInitializer( + boost::asio::any_io_executor const& executor, + Logger const& logger, + config::built::ServiceEndpoints const& endpoints, + config::built::HttpProperties const& http_properties, + data_model::Selector selector, + std::optional filter_key); + + data_interfaces::FDv2SourceResult Run() override; + + void Close() override; + + [[nodiscard]] std::string const& Identity() const override; + + private: + data_interfaces::FDv2SourceResult HandlePollResult( + network::HttpResult const& res); + + Logger const& logger_; + network::AsioRequester requester_; + network::HttpRequest request_; + FDv2ProtocolHandler protocol_handler_; + + std::mutex mutex_; + std::condition_variable cv_; + bool closed_ = false; // guarded by mutex_ +}; + +} // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.cpp b/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.cpp new file mode 100644 index 000000000..39f0d40b3 --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.cpp @@ -0,0 +1,295 @@ +#include "polling_synchronizer.hpp" + +#include +#include +#include + +#include +#include + +#include + +namespace launchdarkly::server_side::data_systems { + +static char const* const kIdentity = "FDv2 polling synchronizer"; +static char const* const kFDv2PollPath = "/sdk/poll"; + +static char const* const kErrorParsingBody = + "Could not parse FDv2 polling response"; +static char const* const kErrorMissingEvents = + "FDv2 polling response missing 'events' array"; +static char const* const kErrorIncompletePayload = + "FDv2 polling response did not contain a complete payload"; + +// Minimum polling interval to prevent accidentally hammering the service. +static constexpr std::chrono::seconds kMinPollInterval{30}; + +using ErrorInfo = data_interfaces::FDv2SourceResult::ErrorInfo; +using ErrorKind = ErrorInfo::ErrorKind; + +static ErrorInfo MakeError(ErrorKind kind, + ErrorInfo::StatusCodeType status, + std::string message) { + return ErrorInfo{kind, status, std::move(message), + std::chrono::system_clock::now()}; +} + +FDv2PollingSynchronizer::FDv2PollingSynchronizer( + boost::asio::any_io_executor const& executor, + Logger const& logger, + config::built::ServiceEndpoints const& endpoints, + config::built::HttpProperties const& http_properties, + std::optional filter_key, + std::chrono::seconds poll_interval) + : logger_(logger), + requester_(executor, http_properties.Tls()), + endpoints_(endpoints), + http_properties_(http_properties), + filter_key_(std::move(filter_key)), + poll_interval_(std::max(poll_interval, kMinPollInterval)), + timer_(executor) { + if (poll_interval < kMinPollInterval) { + LD_LOG(logger_, LogLevel::kWarn) + << kIdentity << ": polling interval too frequent, defaulting to " + << kMinPollInterval.count() << " seconds"; + } +} + +network::HttpRequest FDv2PollingSynchronizer::MakeRequest( + data_model::Selector const& selector) const { + auto url = std::make_optional(endpoints_.PollingBaseUrl()); + url = network::AppendUrl(url, kFDv2PollPath); + + bool has_query = false; + if (selector.value) { + url->append("?basis=" + selector.value->state); + has_query = true; + } + + if (filter_key_) { + url->append(has_query ? "&filter=" : "?filter="); + url->append(*filter_key_); + } + + config::builders::HttpPropertiesBuilder const builder(http_properties_); + return {url.value_or(""), network::HttpMethod::kGet, builder.Build(), + network::HttpRequest::BodyType{}}; +} + +void FDv2PollingSynchronizer::DoPoll(data_model::Selector selector) { + last_poll_start_ = std::chrono::steady_clock::now(); + protocol_handler_.Reset(); + + auto request = MakeRequest(selector); + requester_.Request(request, [this](network::HttpResult res) { + std::lock_guard guard(mutex_); + result_ = std::move(res); + cv_.notify_one(); + }); +} + +data_interfaces::FDv2SourceResult FDv2PollingSynchronizer::Next( + std::chrono::milliseconds timeout, + data_model::Selector selector) { + std::unique_lock lock(mutex_); + + if (closed_) { + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Shutdown{}}; + } + + result_.reset(); + + if (!started_) { + started_ = true; + // First call: poll immediately (post to avoid holding the lock). + boost::asio::post(timer_.get_executor(), + [this, selector] { DoPoll(selector); }); + } else { + // Subsequent calls: schedule next poll after the remaining interval. + auto elapsed = std::chrono::duration_cast( + std::chrono::steady_clock::now() - last_poll_start_); + auto delay = std::chrono::seconds( + std::max(poll_interval_ - elapsed, std::chrono::seconds(0))); + + timer_.cancel(); + timer_.expires_after(delay); + timer_.async_wait( + [this, selector](boost::system::error_code const& ec) { + if (ec == boost::asio::error::operation_aborted) { + return; + } + DoPoll(selector); + }); + } + + auto deadline = std::chrono::steady_clock::now() + timeout; + bool timed_out = !cv_.wait_until( + lock, deadline, [this] { return result_.has_value() || closed_; }); + + if (closed_) { + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Shutdown{}}; + } + + if (timed_out) { + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kUnknown, 0, "polling timeout"), false}}; + } + + return HandlePollResult(*result_); +} + +void FDv2PollingSynchronizer::Close() { + timer_.cancel(); + std::lock_guard lock(mutex_); + closed_ = true; + cv_.notify_one(); +} + +std::string const& FDv2PollingSynchronizer::Identity() const { + static std::string const identity = kIdentity; + return identity; +} + +data_interfaces::FDv2SourceResult FDv2PollingSynchronizer::HandlePollResult( + network::HttpResult const& res) { + if (res.IsError()) { + auto const& msg = res.ErrorMessage(); + std::string error_msg = msg.has_value() ? *msg : "unknown error"; + LD_LOG(logger_, LogLevel::kWarn) + << kIdentity << ": " << error_msg; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kNetworkError, 0, std::move(error_msg)), + false}}; + } + + if (res.Status() == 304) { + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::ChangeSet{ + data_model::FDv2ChangeSet{ + data_model::FDv2ChangeSet::Type::kNone, {}, {}}, + false}}; + } + + if (res.Status() == 200) { + auto const& body = res.Body(); + if (!body) { + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, + "polling response contained no body"), + false}}; + } + + boost::system::error_code ec; + auto parsed = boost::json::parse(*body, ec); + if (ec) { + LD_LOG(logger_, LogLevel::kError) << kErrorParsingBody; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, kErrorParsingBody), + false}}; + } + + auto const* obj = parsed.if_object(); + if (!obj) { + LD_LOG(logger_, LogLevel::kError) << kErrorParsingBody; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, kErrorParsingBody), + false}}; + } + + auto const* events_val = obj->if_contains("events"); + if (!events_val) { + LD_LOG(logger_, LogLevel::kError) << kErrorMissingEvents; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, kErrorMissingEvents), + false}}; + } + + auto const* events_arr = events_val->if_array(); + if (!events_arr) { + LD_LOG(logger_, LogLevel::kError) << kErrorMissingEvents; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, kErrorMissingEvents), + false}}; + } + + for (auto const& event_val : *events_arr) { + auto const* event_obj = event_val.if_object(); + if (!event_obj) { + continue; + } + + auto const* event_type_val = event_obj->if_contains("event"); + auto const* event_data_val = event_obj->if_contains("data"); + if (!event_type_val || !event_data_val) { + continue; + } + + auto const* event_type_str = event_type_val->if_string(); + if (!event_type_str) { + continue; + } + + auto result = protocol_handler_.HandleEvent( + std::string_view{event_type_str->data(), + event_type_str->size()}, + *event_data_val); + + if (auto* changeset = + std::get_if(&result)) { + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::ChangeSet{ + std::move(*changeset), false}}; + } + if (auto* goodbye = std::get_if(&result)) { + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Goodbye{goodbye->reason, + false}}; + } + if (auto* error = std::get_if(&result)) { + std::string msg = "Server error: " + error->reason; + LD_LOG(logger_, LogLevel::kInfo) + << kIdentity << ": " << msg; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kUnknown, 0, std::move(msg)), + false}}; + } + } + + LD_LOG(logger_, LogLevel::kError) << kErrorIncompletePayload; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, kErrorIncompletePayload), + false}}; + } + + if (network::IsRecoverableStatus(res.Status())) { + std::string msg = network::ErrorForStatusCode( + res.Status(), "FDv2 polling request", "will retry"); + LD_LOG(logger_, LogLevel::kWarn) << kIdentity << ": " << msg; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kErrorResponse, res.Status(), + std::move(msg)), + false}}; + } + + std::string msg = network::ErrorForStatusCode( + res.Status(), "FDv2 polling request", std::nullopt); + LD_LOG(logger_, LogLevel::kError) << kIdentity << ": " << msg; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::TerminalError{ + MakeError(ErrorKind::kErrorResponse, res.Status(), std::move(msg)), + false}}; +} + +} // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.hpp b/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.hpp new file mode 100644 index 000000000..38cf9a61f --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.hpp @@ -0,0 +1,71 @@ +#pragma once + +#include "../../data_interfaces/source/ifdv2_synchronizer.hpp" + +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include + +namespace launchdarkly::server_side::data_systems { + +/** + * FDv2 polling synchronizer. Repeatedly polls the FDv2 polling endpoint at + * a configurable interval. Implements IFDv2Synchronizer (blocking). + * + * The caller passes the current selector into each Next() call, allowing the + * orchestrator to reflect applied changesets without any shared state. + */ +class FDv2PollingSynchronizer final + : public data_interfaces::IFDv2Synchronizer { + public: + FDv2PollingSynchronizer( + boost::asio::any_io_executor const& executor, + Logger const& logger, + config::built::ServiceEndpoints const& endpoints, + config::built::HttpProperties const& http_properties, + std::optional filter_key, + std::chrono::seconds poll_interval); + + data_interfaces::FDv2SourceResult Next( + std::chrono::milliseconds timeout, + data_model::Selector selector) override; + + void Close() override; + + [[nodiscard]] std::string const& Identity() const override; + + private: + void DoPoll(data_model::Selector selector); + network::HttpRequest MakeRequest(data_model::Selector const& selector) const; + data_interfaces::FDv2SourceResult HandlePollResult( + network::HttpResult const& res); + + Logger const& logger_; + network::AsioRequester requester_; + config::built::ServiceEndpoints const& endpoints_; + config::built::HttpProperties const& http_properties_; + std::optional filter_key_; + std::chrono::seconds poll_interval_; + boost::asio::steady_timer timer_; + FDv2ProtocolHandler protocol_handler_; + + bool started_ = false; + std::chrono::time_point last_poll_start_; + + std::mutex mutex_; + std::condition_variable cv_; + std::optional result_; // guarded by mutex_ + bool closed_ = false; // guarded by mutex_ +}; + +} // namespace launchdarkly::server_side::data_systems From 29a94c8cc23d0ffc8b259873575190dba5f3f616 Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Thu, 2 Apr 2026 11:45:21 -0700 Subject: [PATCH 11/12] update for upstream change --- libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.cpp b/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.cpp index 39f0d40b3..381b85efc 100644 --- a/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.cpp @@ -134,8 +134,7 @@ data_interfaces::FDv2SourceResult FDv2PollingSynchronizer::Next( if (timed_out) { return data_interfaces::FDv2SourceResult{ - data_interfaces::FDv2SourceResult::Interrupted{ - MakeError(ErrorKind::kUnknown, 0, "polling timeout"), false}}; + data_interfaces::FDv2SourceResult::Timeout{}}; } return HandlePollResult(*result_); From 0996ccabb8d261a6bf396b5773ecdcb60faa2cbb Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Fri, 3 Apr 2026 15:52:59 -0700 Subject: [PATCH 12/12] refactor: fix an error type that could be better --- libs/server-sdk/src/data_systems/fdv2/polling_initializer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/server-sdk/src/data_systems/fdv2/polling_initializer.cpp b/libs/server-sdk/src/data_systems/fdv2/polling_initializer.cpp index 07593a53f..566dec99e 100644 --- a/libs/server-sdk/src/data_systems/fdv2/polling_initializer.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/polling_initializer.cpp @@ -73,7 +73,7 @@ data_interfaces::FDv2SourceResult FDv2PollingInitializer::Run() { << kIdentity << ": invalid polling endpoint URL"; return data_interfaces::FDv2SourceResult{ data_interfaces::FDv2SourceResult::TerminalError{ - MakeError(ErrorKind::kNetworkError, 0, + MakeError(ErrorKind::kUnknown, 0, "invalid polling endpoint URL"), false}}; }