diff --git a/libs/internal/include/launchdarkly/fdv2_protocol_handler.hpp b/libs/internal/include/launchdarkly/fdv2_protocol_handler.hpp new file mode 100644 index 000000000..140d38dfc --- /dev/null +++ b/libs/internal/include/launchdarkly/fdv2_protocol_handler.hpp @@ -0,0 +1,62 @@ +#pragma once + +#include +#include + +#include + +#include +#include +#include + +namespace launchdarkly { + +/** + * Protocol state machine for the FDv2 wire format. + * + * Accumulates put-object and delete-object events between a server-intent + * and payload-transferred event, then emits a complete FDv2ChangeSet. + * + * Shared between the polling and streaming synchronizers. + */ +class FDv2ProtocolHandler { + public: + /** + * Result of handling a single FDv2 event: + * - monostate: no output yet (accumulating, heartbeat, or unknown event) + * - FDv2ChangeSet: complete changeset ready to apply + * - FDv2Error: server reported an error; discard partial data + * - Goodbye: server is closing; caller should rotate sources + */ + using Result = std::variant; + + /** + * Process one FDv2 event. + * + * @param event_type The event type string (e.g. "server-intent", + * "put-object", "payload-transferred"). + * @param data The parsed JSON value for the event's data field. + * @return A Result indicating what (if anything) the caller + * should act on. + */ + Result HandleEvent(std::string_view event_type, + boost::json::value const& data); + + /** + * Reset accumulated state. Call on reconnect before processing new events. + */ + void Reset(); + + FDv2ProtocolHandler() = default; + + private: + enum class State { kInactive, kFull, kPartial }; + + State state_ = State::kInactive; + std::vector changes_; +}; + +} // namespace launchdarkly diff --git a/libs/internal/src/CMakeLists.txt b/libs/internal/src/CMakeLists.txt index 44600638b..8ff998499 100644 --- a/libs/internal/src/CMakeLists.txt +++ b/libs/internal/src/CMakeLists.txt @@ -35,6 +35,7 @@ set(INTERNAL_SOURCES serialization/value_mapping.cpp serialization/json_evaluation_result.cpp serialization/json_fdv2_events.cpp + fdv2_protocol_handler.cpp serialization/json_sdk_data_set.cpp serialization/json_segment.cpp serialization/json_primitives.cpp diff --git a/libs/internal/src/fdv2_protocol_handler.cpp b/libs/internal/src/fdv2_protocol_handler.cpp new file mode 100644 index 000000000..0e6b7380c --- /dev/null +++ b/libs/internal/src/fdv2_protocol_handler.cpp @@ -0,0 +1,220 @@ +#include + +#include +#include +#include +#include +#include + +#include +#include + +namespace launchdarkly { + +static char const* const kServerIntent = "server-intent"; +static char const* const kPutObject = "put-object"; +static char const* const kDeleteObject = "delete-object"; +static char const* const kPayloadTransferred = "payload-transferred"; +static char const* const kError = "error"; +static char const* const kGoodbye = "goodbye"; + +// Returns the parsed FDv2Change on success, nullopt for unknown kinds (which +// should be silently skipped for forward-compatibility), or an error string if +// a known kind fails to deserialize. +static tl::expected, std::string> +ParsePut(PutObject const& put) { + if (put.kind == "flag") { + auto result = boost::json::value_to< + tl::expected, JsonError>>( + put.object); + // One bad flag aborts the entire transfer so the store is never + // left in a partially-updated state. + if (!result) { + return tl::make_unexpected("could not deserialize flag '" + + put.key + "'"); + } + if (!result->has_value()) { + return tl::make_unexpected("flag '" + put.key + "' object was null"); + } + return data_model::FDv2Change{ + put.key, + data_model::ItemDescriptor{std::move(**result)}}; + } + if (put.kind == "segment") { + auto result = boost::json::value_to< + tl::expected, JsonError>>( + put.object); + // One bad segment aborts the entire transfer so the store is never + // left in a partially-updated state. + if (!result) { + return tl::make_unexpected("could not deserialize segment '" + + put.key + "'"); + } + if (!result->has_value()) { + return tl::make_unexpected("segment '" + put.key + + "' object was null"); + } + return data_model::FDv2Change{ + put.key, + data_model::ItemDescriptor{ + std::move(**result)}}; + } + // Silently skip unknown kinds for forward-compatibility. + return std::nullopt; +} + +static data_model::FDv2Change MakeDeleteChange(DeleteObject const& del) { + if (del.kind == "flag") { + return data_model::FDv2Change{ + del.key, + data_model::ItemDescriptor{ + data_model::Tombstone{static_cast(del.version)}}}; + } + return data_model::FDv2Change{ + del.key, + data_model::ItemDescriptor{ + data_model::Tombstone{static_cast(del.version)}}}; +} + +FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandleEvent( + std::string_view event_type, + boost::json::value const& data) { + if (event_type == kServerIntent) { + auto result = boost::json::value_to< + tl::expected, JsonError>>(data); + if (!result) { + Reset(); + return FDv2Error{std::nullopt, "could not deserialize server-intent"}; + } + if (!result->has_value()) { + Reset(); + return FDv2Error{std::nullopt, "server-intent data was null"}; + } + auto const& intent = **result; + if (intent.payloads.empty()) { + return std::monostate{}; + } + auto const& code = intent.payloads[0].intent_code; + changes_.clear(); + if (code == IntentCode::kTransferFull) { + state_ = State::kFull; + } else if (code == IntentCode::kTransferChanges) { + state_ = State::kPartial; + } else { + // kNone or kUnknown: emit an empty changeset immediately. + state_ = State::kInactive; + return data_model::FDv2ChangeSet{data_model::FDv2ChangeSet::Type::kNone, + {}, + data_model::Selector{}}; + } + return std::monostate{}; + } + + if (event_type == kPutObject) { + if (state_ == State::kInactive) { + return std::monostate{}; + } + auto result = boost::json::value_to< + tl::expected, JsonError>>(data); + if (!result) { + Reset(); + return FDv2Error{std::nullopt, "could not deserialize put-object"}; + } + if (!result->has_value()) { + Reset(); + return FDv2Error{std::nullopt, "put-object data was null"}; + } + auto change = ParsePut(**result); + if (!change) { + Reset(); + return FDv2Error{std::nullopt, std::move(change.error())}; + } + if (*change) { + changes_.push_back(std::move(**change)); + } + return std::monostate{}; + } + + if (event_type == kDeleteObject) { + if (state_ == State::kInactive) { + return std::monostate{}; + } + auto result = boost::json::value_to< + tl::expected, JsonError>>(data); + if (!result) { + Reset(); + return FDv2Error{std::nullopt, "could not deserialize delete-object"}; + } + if (!result->has_value()) { + Reset(); + return FDv2Error{std::nullopt, "delete-object data was null"}; + } + auto const& del = **result; + // Silently skip unknown kinds for forward-compatibility. + if (del.kind != "flag" && del.kind != "segment") { + return std::monostate{}; + } + changes_.push_back(MakeDeleteChange(del)); + return std::monostate{}; + } + + if (event_type == kPayloadTransferred) { + auto result = boost::json::value_to< + tl::expected, JsonError>>(data); + if (!result) { + Reset(); + return FDv2Error{std::nullopt, + "could not deserialize payload-transferred"}; + } + if (!result->has_value()) { + Reset(); + return FDv2Error{std::nullopt, "payload-transferred data was null"}; + } + auto const& transferred = **result; + auto type = (state_ == State::kPartial) + ? data_model::FDv2ChangeSet::Type::kPartial + : data_model::FDv2ChangeSet::Type::kFull; + data_model::FDv2ChangeSet changeset{ + type, + std::move(changes_), + data_model::Selector{data_model::Selector::State{ + transferred.version, transferred.state}}}; + Reset(); + return changeset; + } + + if (event_type == kError) { + auto result = boost::json::value_to< + tl::expected, JsonError>>(data); + Reset(); + if (!result) { + return FDv2Error{std::nullopt, "could not deserialize error event"}; + } + if (!result->has_value()) { + return FDv2Error{std::nullopt, "error event data was null"}; + } + return **result; + } + + if (event_type == kGoodbye) { + auto result = boost::json::value_to< + tl::expected, JsonError>>(data); + if (!result) { + return Goodbye{std::nullopt}; + } + if (!result->has_value()) { + return Goodbye{std::nullopt}; + } + return **result; + } + + // heartbeat and unrecognized events: no-op. + return std::monostate{}; +} + +void FDv2ProtocolHandler::Reset() { + state_ = State::kInactive; + changes_.clear(); +} + +} // namespace launchdarkly diff --git a/libs/internal/tests/fdv2_protocol_handler_test.cpp b/libs/internal/tests/fdv2_protocol_handler_test.cpp new file mode 100644 index 000000000..b6b12087e --- /dev/null +++ b/libs/internal/tests/fdv2_protocol_handler_test.cpp @@ -0,0 +1,277 @@ +#include + +#include + +#include + +using namespace launchdarkly; + +// Minimal valid flag JSON accepted by the existing Flag deserializer. +static char const* const kFlagJson = + R"({"key":"my-flag","on":true,"fallthrough":{"variation":0},)" + R"("variations":[true,false],"version":1})"; + +// Minimal valid segment JSON accepted by the existing Segment deserializer. +static char const* const kSegmentJson = + R"({"key":"my-seg","version":2,"rules":[],"included":[],"excluded":[]})"; + +// Build a server-intent event data value. +static boost::json::value MakeServerIntent(std::string const& intent_code) { + return boost::json::parse( + R"({"payloads":[{"id":"p1","target":1,"intentCode":")" + intent_code + + R"("}]})"); +} + +static boost::json::value MakePutObject(std::string const& kind, + std::string const& key, + std::string const& object_json) { + return boost::json::parse(R"({"version":1,"kind":")" + kind + + R"(","key":")" + key + + R"(","object":)" + object_json + "}"); +} + +static boost::json::value MakeDeleteObject(std::string const& kind, + std::string const& key, + int version) { + return boost::json::parse(R"({"version":)" + std::to_string(version) + + R"(,"kind":")" + kind + R"(","key":")" + key + + R"("})"); +} + +static boost::json::value MakePayloadTransferred(std::string const& state, + int version) { + return boost::json::parse(R"({"state":")" + state + R"(","version":)" + + std::to_string(version) + "}"); +} + +// ============================================================================ +// kNone intent +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, NoneIntentEmitsEmptyChangeSetImmediately) { + FDv2ProtocolHandler handler; + + auto result = handler.HandleEvent("server-intent", MakeServerIntent("none")); + + auto* cs = std::get_if(&result); + ASSERT_NE(cs, nullptr); + EXPECT_EQ(cs->type, data_model::FDv2ChangeSet::Type::kNone); + EXPECT_TRUE(cs->changes.empty()); + EXPECT_FALSE(cs->selector.value.has_value()); +} + +// ============================================================================ +// kTransferFull intent +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, FullIntentEmitsChangeSetOnPayloadTransferred) { + FDv2ProtocolHandler handler; + + auto r1 = handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + EXPECT_TRUE(std::holds_alternative(r1)); + + auto r2 = handler.HandleEvent( + "put-object", MakePutObject("flag", "my-flag", kFlagJson)); + EXPECT_TRUE(std::holds_alternative(r2)); + + auto r3 = handler.HandleEvent( + "payload-transferred", MakePayloadTransferred("state-abc", 7)); + + auto* cs = std::get_if(&r3); + ASSERT_NE(cs, nullptr); + EXPECT_EQ(cs->type, data_model::FDv2ChangeSet::Type::kFull); + EXPECT_EQ(cs->changes.size(), 1u); + EXPECT_EQ(cs->changes[0].key, "my-flag"); + ASSERT_TRUE(cs->selector.value.has_value()); + EXPECT_EQ(cs->selector.value->state, "state-abc"); + EXPECT_EQ(cs->selector.value->version, 7); +} + +TEST(FDv2ProtocolHandlerTest, FullIntentAccumulatesMultipleObjects) { + FDv2ProtocolHandler handler; + + handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + handler.HandleEvent("put-object", + MakePutObject("flag", "flag-1", kFlagJson)); + handler.HandleEvent("put-object", + MakePutObject("flag", "flag-2", kFlagJson)); + handler.HandleEvent("delete-object", MakeDeleteObject("segment", "seg-1", 5)); + + auto result = handler.HandleEvent( + "payload-transferred", MakePayloadTransferred("s", 1)); + + auto* cs = std::get_if(&result); + ASSERT_NE(cs, nullptr); + EXPECT_EQ(cs->type, data_model::FDv2ChangeSet::Type::kFull); + EXPECT_EQ(cs->changes.size(), 3u); +} + +// ============================================================================ +// kTransferChanges intent +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, PartialIntentEmitsPartialChangeSet) { + FDv2ProtocolHandler handler; + + handler.HandleEvent("server-intent", MakeServerIntent("xfer-changes")); + handler.HandleEvent("put-object", + MakePutObject("segment", "my-seg", kSegmentJson)); + + auto result = handler.HandleEvent( + "payload-transferred", MakePayloadTransferred("state-xyz", 3)); + + auto* cs = std::get_if(&result); + ASSERT_NE(cs, nullptr); + EXPECT_EQ(cs->type, data_model::FDv2ChangeSet::Type::kPartial); + EXPECT_EQ(cs->changes.size(), 1u); + EXPECT_EQ(cs->changes[0].key, "my-seg"); + ASSERT_TRUE(cs->selector.value.has_value()); + EXPECT_EQ(cs->selector.value->state, "state-xyz"); +} + +// ============================================================================ +// Unknown kind in put-object → silently skipped +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, UnknownKindInPutObjectIsSilentlySkipped) { + FDv2ProtocolHandler handler; + + handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + handler.HandleEvent("put-object", + MakePutObject("experiment", "exp-1", R"({"key":"exp-1","version":1})")); + handler.HandleEvent("put-object", + MakePutObject("flag", "my-flag", kFlagJson)); + + auto result = handler.HandleEvent( + "payload-transferred", MakePayloadTransferred("s", 1)); + + auto* cs = std::get_if(&result); + ASSERT_NE(cs, nullptr); + // Only the known kind (flag) should appear. + EXPECT_EQ(cs->changes.size(), 1u); + EXPECT_EQ(cs->changes[0].key, "my-flag"); +} + +// ============================================================================ +// error event → discard accumulated data, return FDv2Error +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, ErrorEventDiscardsAccumulatedDataAndReturnsError) { + FDv2ProtocolHandler handler; + + handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + handler.HandleEvent("put-object", + MakePutObject("flag", "my-flag", kFlagJson)); + + auto result = handler.HandleEvent( + "error", + boost::json::parse(R"({"reason":"something went wrong"})")); + + auto* err = std::get_if(&result); + ASSERT_NE(err, nullptr); + EXPECT_EQ(err->reason, "something went wrong"); + + // After the error the handler is reset. A subsequent full transfer should + // produce an empty changeset (no leftover data from before the error). + handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + auto result2 = handler.HandleEvent( + "payload-transferred", MakePayloadTransferred("s", 1)); + + auto* cs = std::get_if(&result2); + ASSERT_NE(cs, nullptr); + EXPECT_TRUE(cs->changes.empty()); +} + +// ============================================================================ +// goodbye event → return Goodbye +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, GoodbyeEventReturnsGoodbye) { + FDv2ProtocolHandler handler; + + auto result = handler.HandleEvent( + "goodbye", + boost::json::parse(R"({"reason":"shutting down"})")); + + auto* gb = std::get_if(&result); + ASSERT_NE(gb, nullptr); + ASSERT_TRUE(gb->reason.has_value()); + EXPECT_EQ(*gb->reason, "shutting down"); +} + +TEST(FDv2ProtocolHandlerTest, GoodbyeWithoutReasonReturnsGoodbye) { + FDv2ProtocolHandler handler; + + auto result = handler.HandleEvent("goodbye", boost::json::parse(R"({})")); + + auto* gb = std::get_if(&result); + ASSERT_NE(gb, nullptr); + EXPECT_FALSE(gb->reason.has_value()); +} + +// ============================================================================ +// heartbeat → no-op +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, HeartbeatReturnsMonostate) { + FDv2ProtocolHandler handler; + + auto result = + handler.HandleEvent("heartbeat", boost::json::parse(R"({})")); + EXPECT_TRUE(std::holds_alternative(result)); +} + +// ============================================================================ +// Unrecognized event type → no-op +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, UnknownEventTypeReturnsMonostate) { + FDv2ProtocolHandler handler; + + auto result = + handler.HandleEvent("future-event-type", boost::json::parse(R"({})")); + EXPECT_TRUE(std::holds_alternative(result)); +} + +// ============================================================================ +// put-object and delete-object before server-intent are ignored +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, PutBeforeServerIntentIsIgnored) { + FDv2ProtocolHandler handler; + + auto r1 = handler.HandleEvent("put-object", + MakePutObject("flag", "my-flag", kFlagJson)); + EXPECT_TRUE(std::holds_alternative(r1)); + + handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + auto result = handler.HandleEvent( + "payload-transferred", MakePayloadTransferred("s", 1)); + + auto* cs = std::get_if(&result); + ASSERT_NE(cs, nullptr); + EXPECT_TRUE(cs->changes.empty()); +} + +// ============================================================================ +// Reset clears accumulated state +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, ResetClearsState) { + FDv2ProtocolHandler handler; + + handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + handler.HandleEvent("put-object", + MakePutObject("flag", "my-flag", kFlagJson)); + handler.Reset(); + + // After reset, payload-transferred with no prior server-intent produces + // a full changeset with no changes. + handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + auto result = handler.HandleEvent( + "payload-transferred", MakePayloadTransferred("s", 1)); + + auto* cs = std::get_if(&result); + ASSERT_NE(cs, nullptr); + EXPECT_TRUE(cs->changes.empty()); +} diff --git a/libs/server-sdk/src/CMakeLists.txt b/libs/server-sdk/src/CMakeLists.txt index 62a017d41..2d855d575 100644 --- a/libs/server-sdk/src/CMakeLists.txt +++ b/libs/server-sdk/src/CMakeLists.txt @@ -49,6 +49,10 @@ target_sources(${LIBNAME} data_systems/background_sync/detail/payload_filter_validation/payload_filter_validation.cpp data_systems/background_sync/sources/polling/polling_data_source.hpp data_systems/background_sync/sources/polling/polling_data_source.cpp + data_systems/fdv2/polling_initializer.hpp + data_systems/fdv2/polling_initializer.cpp + data_systems/fdv2/polling_synchronizer.hpp + data_systems/fdv2/polling_synchronizer.cpp data_systems/background_sync/sources/streaming/streaming_data_source.hpp data_systems/background_sync/sources/streaming/streaming_data_source.cpp data_systems/background_sync/sources/streaming/event_handler.hpp diff --git a/libs/server-sdk/src/data_systems/fdv2/polling_initializer.cpp b/libs/server-sdk/src/data_systems/fdv2/polling_initializer.cpp new file mode 100644 index 000000000..566dec99e --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/polling_initializer.cpp @@ -0,0 +1,254 @@ +#include "polling_initializer.hpp" + +#include +#include +#include + +#include + +#include + +namespace launchdarkly::server_side::data_systems { + +static char const* const kIdentity = "FDv2 polling initializer"; +static char const* const kFDv2PollPath = "/sdk/poll"; + +static char const* const kErrorParsingBody = + "Could not parse FDv2 polling response"; +static char const* const kErrorMissingEvents = + "FDv2 polling response missing 'events' array"; +static char const* const kErrorIncompletePayload = + "FDv2 polling response did not contain a complete payload"; + +using ErrorInfo = data_interfaces::FDv2SourceResult::ErrorInfo; +using ErrorKind = ErrorInfo::ErrorKind; + +static ErrorInfo MakeError(ErrorKind kind, + ErrorInfo::StatusCodeType status, + std::string message) { + return ErrorInfo{kind, status, std::move(message), + std::chrono::system_clock::now()}; +} + +static network::HttpRequest MakeRequest( + Logger const& logger, + config::built::ServiceEndpoints const& endpoints, + config::built::HttpProperties const& http_properties, + data_model::Selector const& selector, + std::optional const& filter_key) { + auto url = std::make_optional(endpoints.PollingBaseUrl()); + url = network::AppendUrl(url, kFDv2PollPath); + + bool has_query = false; + if (selector.value) { + url->append("?basis=" + selector.value->state); + has_query = true; + } + + if (filter_key) { + url->append(has_query ? "&filter=" : "?filter="); + url->append(*filter_key); + } + + config::builders::HttpPropertiesBuilder const builder(http_properties); + return {url.value_or(""), network::HttpMethod::kGet, builder.Build(), + network::HttpRequest::BodyType{}}; +} + +FDv2PollingInitializer::FDv2PollingInitializer( + boost::asio::any_io_executor const& executor, + Logger const& logger, + config::built::ServiceEndpoints const& endpoints, + config::built::HttpProperties const& http_properties, + data_model::Selector selector, + std::optional filter_key) + : logger_(logger), + requester_(executor, http_properties.Tls()), + request_(MakeRequest(logger, endpoints, http_properties, selector, + filter_key)) {} + +data_interfaces::FDv2SourceResult FDv2PollingInitializer::Run() { + if (!request_.Valid()) { + LD_LOG(logger_, LogLevel::kError) + << kIdentity << ": invalid polling endpoint URL"; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::TerminalError{ + MakeError(ErrorKind::kUnknown, 0, + "invalid polling endpoint URL"), + false}}; + } + + auto shared_result = + std::make_shared>(); + + requester_.Request(request_, [this, shared_result](network::HttpResult res) { + std::lock_guard guard(mutex_); + *shared_result = std::move(res); + cv_.notify_one(); + }); + + std::unique_lock lock(mutex_); + cv_.wait(lock, [&] { return shared_result->has_value() || closed_; }); + + if (closed_) { + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Shutdown{}}; + } + + auto http_result = std::move(**shared_result); + lock.unlock(); + + return HandlePollResult(http_result); +} + +void FDv2PollingInitializer::Close() { + std::lock_guard lock(mutex_); + closed_ = true; + cv_.notify_one(); +} + +std::string const& FDv2PollingInitializer::Identity() const { + static std::string const identity = kIdentity; + return identity; +} + +data_interfaces::FDv2SourceResult FDv2PollingInitializer::HandlePollResult( + network::HttpResult const& res) { + if (res.IsError()) { + auto const& msg = res.ErrorMessage(); + std::string error_msg = msg.has_value() ? *msg : "unknown error"; + LD_LOG(logger_, LogLevel::kWarn) + << kIdentity << ": " << error_msg; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kNetworkError, 0, std::move(error_msg)), + false}}; + } + + if (res.Status() == 304) { + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::ChangeSet{ + data_model::FDv2ChangeSet{ + data_model::FDv2ChangeSet::Type::kNone, {}, {}}, + false}}; + } + + if (res.Status() == 200) { + auto const& body = res.Body(); + if (!body) { + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, + "polling response contained no body"), + false}}; + } + + boost::system::error_code ec; + auto parsed = boost::json::parse(*body, ec); + if (ec) { + LD_LOG(logger_, LogLevel::kError) << kErrorParsingBody; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, kErrorParsingBody), + false}}; + } + + auto const* obj = parsed.if_object(); + if (!obj) { + LD_LOG(logger_, LogLevel::kError) << kErrorParsingBody; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, kErrorParsingBody), + false}}; + } + + auto const* events_val = obj->if_contains("events"); + if (!events_val) { + LD_LOG(logger_, LogLevel::kError) << kErrorMissingEvents; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, kErrorMissingEvents), + false}}; + } + + auto const* events_arr = events_val->if_array(); + if (!events_arr) { + LD_LOG(logger_, LogLevel::kError) << kErrorMissingEvents; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, kErrorMissingEvents), + false}}; + } + + for (auto const& event_val : *events_arr) { + auto const* event_obj = event_val.if_object(); + if (!event_obj) { + continue; + } + + auto const* event_type_val = event_obj->if_contains("event"); + auto const* event_data_val = event_obj->if_contains("data"); + if (!event_type_val || !event_data_val) { + continue; + } + + auto const* event_type_str = event_type_val->if_string(); + if (!event_type_str) { + continue; + } + + auto result = protocol_handler_.HandleEvent( + std::string_view{event_type_str->data(), + event_type_str->size()}, + *event_data_val); + + if (auto* changeset = + std::get_if(&result)) { + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::ChangeSet{ + std::move(*changeset), false}}; + } + if (auto* goodbye = std::get_if(&result)) { + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Goodbye{goodbye->reason, + false}}; + } + if (auto* error = std::get_if(&result)) { + std::string msg = "Server error: " + error->reason; + LD_LOG(logger_, LogLevel::kInfo) + << kIdentity << ": " << msg; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kUnknown, 0, std::move(msg)), + false}}; + } + } + + LD_LOG(logger_, LogLevel::kError) << kErrorIncompletePayload; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, kErrorIncompletePayload), + false}}; + } + + if (network::IsRecoverableStatus(res.Status())) { + std::string msg = network::ErrorForStatusCode( + res.Status(), "FDv2 polling request", "will retry"); + LD_LOG(logger_, LogLevel::kWarn) << kIdentity << ": " << msg; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kErrorResponse, res.Status(), + std::move(msg)), + false}}; + } + + std::string msg = network::ErrorForStatusCode( + res.Status(), "FDv2 polling request", std::nullopt); + LD_LOG(logger_, LogLevel::kError) << kIdentity << ": " << msg; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::TerminalError{ + MakeError(ErrorKind::kErrorResponse, res.Status(), std::move(msg)), + false}}; +} + +} // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/src/data_systems/fdv2/polling_initializer.hpp b/libs/server-sdk/src/data_systems/fdv2/polling_initializer.hpp new file mode 100644 index 000000000..affe348e7 --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/polling_initializer.hpp @@ -0,0 +1,56 @@ +#pragma once + +#include "../../data_interfaces/source/ifdv2_initializer.hpp" + +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include + +namespace launchdarkly::server_side::data_systems { + +/** + * FDv2 polling initializer. Makes a single HTTP GET to the FDv2 polling + * endpoint, parses the response via the FDv2 protocol state machine, and + * returns the result. Implements IFDv2Initializer (blocking, one-shot). + */ +class FDv2PollingInitializer final + : public data_interfaces::IFDv2Initializer { + public: + FDv2PollingInitializer( + boost::asio::any_io_executor const& executor, + Logger const& logger, + config::built::ServiceEndpoints const& endpoints, + config::built::HttpProperties const& http_properties, + data_model::Selector selector, + std::optional filter_key); + + data_interfaces::FDv2SourceResult Run() override; + + void Close() override; + + [[nodiscard]] std::string const& Identity() const override; + + private: + data_interfaces::FDv2SourceResult HandlePollResult( + network::HttpResult const& res); + + Logger const& logger_; + network::AsioRequester requester_; + network::HttpRequest request_; + FDv2ProtocolHandler protocol_handler_; + + std::mutex mutex_; + std::condition_variable cv_; + bool closed_ = false; // guarded by mutex_ +}; + +} // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.cpp b/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.cpp new file mode 100644 index 000000000..381b85efc --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.cpp @@ -0,0 +1,294 @@ +#include "polling_synchronizer.hpp" + +#include +#include +#include + +#include +#include + +#include + +namespace launchdarkly::server_side::data_systems { + +static char const* const kIdentity = "FDv2 polling synchronizer"; +static char const* const kFDv2PollPath = "/sdk/poll"; + +static char const* const kErrorParsingBody = + "Could not parse FDv2 polling response"; +static char const* const kErrorMissingEvents = + "FDv2 polling response missing 'events' array"; +static char const* const kErrorIncompletePayload = + "FDv2 polling response did not contain a complete payload"; + +// Minimum polling interval to prevent accidentally hammering the service. +static constexpr std::chrono::seconds kMinPollInterval{30}; + +using ErrorInfo = data_interfaces::FDv2SourceResult::ErrorInfo; +using ErrorKind = ErrorInfo::ErrorKind; + +static ErrorInfo MakeError(ErrorKind kind, + ErrorInfo::StatusCodeType status, + std::string message) { + return ErrorInfo{kind, status, std::move(message), + std::chrono::system_clock::now()}; +} + +FDv2PollingSynchronizer::FDv2PollingSynchronizer( + boost::asio::any_io_executor const& executor, + Logger const& logger, + config::built::ServiceEndpoints const& endpoints, + config::built::HttpProperties const& http_properties, + std::optional filter_key, + std::chrono::seconds poll_interval) + : logger_(logger), + requester_(executor, http_properties.Tls()), + endpoints_(endpoints), + http_properties_(http_properties), + filter_key_(std::move(filter_key)), + poll_interval_(std::max(poll_interval, kMinPollInterval)), + timer_(executor) { + if (poll_interval < kMinPollInterval) { + LD_LOG(logger_, LogLevel::kWarn) + << kIdentity << ": polling interval too frequent, defaulting to " + << kMinPollInterval.count() << " seconds"; + } +} + +network::HttpRequest FDv2PollingSynchronizer::MakeRequest( + data_model::Selector const& selector) const { + auto url = std::make_optional(endpoints_.PollingBaseUrl()); + url = network::AppendUrl(url, kFDv2PollPath); + + bool has_query = false; + if (selector.value) { + url->append("?basis=" + selector.value->state); + has_query = true; + } + + if (filter_key_) { + url->append(has_query ? "&filter=" : "?filter="); + url->append(*filter_key_); + } + + config::builders::HttpPropertiesBuilder const builder(http_properties_); + return {url.value_or(""), network::HttpMethod::kGet, builder.Build(), + network::HttpRequest::BodyType{}}; +} + +void FDv2PollingSynchronizer::DoPoll(data_model::Selector selector) { + last_poll_start_ = std::chrono::steady_clock::now(); + protocol_handler_.Reset(); + + auto request = MakeRequest(selector); + requester_.Request(request, [this](network::HttpResult res) { + std::lock_guard guard(mutex_); + result_ = std::move(res); + cv_.notify_one(); + }); +} + +data_interfaces::FDv2SourceResult FDv2PollingSynchronizer::Next( + std::chrono::milliseconds timeout, + data_model::Selector selector) { + std::unique_lock lock(mutex_); + + if (closed_) { + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Shutdown{}}; + } + + result_.reset(); + + if (!started_) { + started_ = true; + // First call: poll immediately (post to avoid holding the lock). + boost::asio::post(timer_.get_executor(), + [this, selector] { DoPoll(selector); }); + } else { + // Subsequent calls: schedule next poll after the remaining interval. + auto elapsed = std::chrono::duration_cast( + std::chrono::steady_clock::now() - last_poll_start_); + auto delay = std::chrono::seconds( + std::max(poll_interval_ - elapsed, std::chrono::seconds(0))); + + timer_.cancel(); + timer_.expires_after(delay); + timer_.async_wait( + [this, selector](boost::system::error_code const& ec) { + if (ec == boost::asio::error::operation_aborted) { + return; + } + DoPoll(selector); + }); + } + + auto deadline = std::chrono::steady_clock::now() + timeout; + bool timed_out = !cv_.wait_until( + lock, deadline, [this] { return result_.has_value() || closed_; }); + + if (closed_) { + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Shutdown{}}; + } + + if (timed_out) { + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Timeout{}}; + } + + return HandlePollResult(*result_); +} + +void FDv2PollingSynchronizer::Close() { + timer_.cancel(); + std::lock_guard lock(mutex_); + closed_ = true; + cv_.notify_one(); +} + +std::string const& FDv2PollingSynchronizer::Identity() const { + static std::string const identity = kIdentity; + return identity; +} + +data_interfaces::FDv2SourceResult FDv2PollingSynchronizer::HandlePollResult( + network::HttpResult const& res) { + if (res.IsError()) { + auto const& msg = res.ErrorMessage(); + std::string error_msg = msg.has_value() ? *msg : "unknown error"; + LD_LOG(logger_, LogLevel::kWarn) + << kIdentity << ": " << error_msg; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kNetworkError, 0, std::move(error_msg)), + false}}; + } + + if (res.Status() == 304) { + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::ChangeSet{ + data_model::FDv2ChangeSet{ + data_model::FDv2ChangeSet::Type::kNone, {}, {}}, + false}}; + } + + if (res.Status() == 200) { + auto const& body = res.Body(); + if (!body) { + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, + "polling response contained no body"), + false}}; + } + + boost::system::error_code ec; + auto parsed = boost::json::parse(*body, ec); + if (ec) { + LD_LOG(logger_, LogLevel::kError) << kErrorParsingBody; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, kErrorParsingBody), + false}}; + } + + auto const* obj = parsed.if_object(); + if (!obj) { + LD_LOG(logger_, LogLevel::kError) << kErrorParsingBody; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, kErrorParsingBody), + false}}; + } + + auto const* events_val = obj->if_contains("events"); + if (!events_val) { + LD_LOG(logger_, LogLevel::kError) << kErrorMissingEvents; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, kErrorMissingEvents), + false}}; + } + + auto const* events_arr = events_val->if_array(); + if (!events_arr) { + LD_LOG(logger_, LogLevel::kError) << kErrorMissingEvents; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, kErrorMissingEvents), + false}}; + } + + for (auto const& event_val : *events_arr) { + auto const* event_obj = event_val.if_object(); + if (!event_obj) { + continue; + } + + auto const* event_type_val = event_obj->if_contains("event"); + auto const* event_data_val = event_obj->if_contains("data"); + if (!event_type_val || !event_data_val) { + continue; + } + + auto const* event_type_str = event_type_val->if_string(); + if (!event_type_str) { + continue; + } + + auto result = protocol_handler_.HandleEvent( + std::string_view{event_type_str->data(), + event_type_str->size()}, + *event_data_val); + + if (auto* changeset = + std::get_if(&result)) { + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::ChangeSet{ + std::move(*changeset), false}}; + } + if (auto* goodbye = std::get_if(&result)) { + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Goodbye{goodbye->reason, + false}}; + } + if (auto* error = std::get_if(&result)) { + std::string msg = "Server error: " + error->reason; + LD_LOG(logger_, LogLevel::kInfo) + << kIdentity << ": " << msg; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kUnknown, 0, std::move(msg)), + false}}; + } + } + + LD_LOG(logger_, LogLevel::kError) << kErrorIncompletePayload; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, kErrorIncompletePayload), + false}}; + } + + if (network::IsRecoverableStatus(res.Status())) { + std::string msg = network::ErrorForStatusCode( + res.Status(), "FDv2 polling request", "will retry"); + LD_LOG(logger_, LogLevel::kWarn) << kIdentity << ": " << msg; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kErrorResponse, res.Status(), + std::move(msg)), + false}}; + } + + std::string msg = network::ErrorForStatusCode( + res.Status(), "FDv2 polling request", std::nullopt); + LD_LOG(logger_, LogLevel::kError) << kIdentity << ": " << msg; + return data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::TerminalError{ + MakeError(ErrorKind::kErrorResponse, res.Status(), std::move(msg)), + false}}; +} + +} // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.hpp b/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.hpp new file mode 100644 index 000000000..38cf9a61f --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.hpp @@ -0,0 +1,71 @@ +#pragma once + +#include "../../data_interfaces/source/ifdv2_synchronizer.hpp" + +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include + +namespace launchdarkly::server_side::data_systems { + +/** + * FDv2 polling synchronizer. Repeatedly polls the FDv2 polling endpoint at + * a configurable interval. Implements IFDv2Synchronizer (blocking). + * + * The caller passes the current selector into each Next() call, allowing the + * orchestrator to reflect applied changesets without any shared state. + */ +class FDv2PollingSynchronizer final + : public data_interfaces::IFDv2Synchronizer { + public: + FDv2PollingSynchronizer( + boost::asio::any_io_executor const& executor, + Logger const& logger, + config::built::ServiceEndpoints const& endpoints, + config::built::HttpProperties const& http_properties, + std::optional filter_key, + std::chrono::seconds poll_interval); + + data_interfaces::FDv2SourceResult Next( + std::chrono::milliseconds timeout, + data_model::Selector selector) override; + + void Close() override; + + [[nodiscard]] std::string const& Identity() const override; + + private: + void DoPoll(data_model::Selector selector); + network::HttpRequest MakeRequest(data_model::Selector const& selector) const; + data_interfaces::FDv2SourceResult HandlePollResult( + network::HttpResult const& res); + + Logger const& logger_; + network::AsioRequester requester_; + config::built::ServiceEndpoints const& endpoints_; + config::built::HttpProperties const& http_properties_; + std::optional filter_key_; + std::chrono::seconds poll_interval_; + boost::asio::steady_timer timer_; + FDv2ProtocolHandler protocol_handler_; + + bool started_ = false; + std::chrono::time_point last_poll_start_; + + std::mutex mutex_; + std::condition_variable cv_; + std::optional result_; // guarded by mutex_ + bool closed_ = false; // guarded by mutex_ +}; + +} // namespace launchdarkly::server_side::data_systems