diff --git a/providers/flagd/src/BUILD b/providers/flagd/src/BUILD index 2ccd666..2050369 100644 --- a/providers/flagd/src/BUILD +++ b/providers/flagd/src/BUILD @@ -26,6 +26,7 @@ cc_library( deps = [ "@abseil-cpp//absl/strings", "@com_github_grpc_grpc//:grpc++", + "@nlohmann_json//:json", "@openfeature_cpp_sdk//openfeature", ], ) diff --git a/providers/flagd/src/configuration.cpp b/providers/flagd/src/configuration.cpp index 8192dee..f00b21f 100644 --- a/providers/flagd/src/configuration.cpp +++ b/providers/flagd/src/configuration.cpp @@ -1,20 +1,27 @@ #include "configuration.h" +#include #include #include #include #include +#include #include #include #include +#include +#include "absl/log/log.h" #include "absl/status/statusor.h" #include "absl/strings/str_cat.h" +#include "nlohmann/json.hpp" namespace flagd { namespace { +constexpr double kMsInSecond = 1000.0; + struct EnvVars { static constexpr std::string_view kHost = "FLAGD_HOST"; static constexpr std::string_view kPort = "FLAGD_PORT"; @@ -23,6 +30,17 @@ struct EnvVars { static constexpr std::string_view kSocketPath = "FLAGD_SOCKET_PATH"; static constexpr std::string_view kServerCertPath = "FLAGD_SERVER_CERT_PATH"; static constexpr std::string_view kDeadlineMs = "FLAGD_DEADLINE_MS"; + static constexpr std::string_view kStreamDeadlineMs = + "FLAGD_STREAM_DEADLINE_MS"; + static constexpr std::string_view kRetryBackoffMs = "FLAGD_RETRY_BACKOFF_MS"; + static constexpr std::string_view kRetryBackoffMaxMs = + "FLAGD_RETRY_BACKOFF_MAX_MS"; + static constexpr std::string_view kRetryGracePeriod = + "FLAGD_RETRY_GRACE_PERIOD"; + static constexpr std::string_view kKeepAliveTimeMs = + "FLAGD_KEEP_ALIVE_TIME_MS"; + static constexpr std::string_view kFatalStatusCodes = + "FLAGD_FATAL_STATUS_CODES"; static constexpr std::string_view kSourceSelector = "FLAGD_SOURCE_SELECTOR"; static constexpr std::string_view kProviderId = "FLAGD_PROVIDER_ID"; static constexpr std::string_view kOfflineFlagSourcePath = @@ -30,12 +48,31 @@ struct EnvVars { static constexpr std::string_view kOfflinePollMs = "FLAGD_OFFLINE_POLL_MS"; }; -struct Defaults { - static constexpr std::string_view kHost = "localhost"; - static constexpr int kPortInProcess = 8015; - static constexpr bool kTls = false; - static constexpr int kDeadlineMs = 500; - static constexpr int kOfflinePollMs = 5000; +struct Validation { + static constexpr int kMinPort = 1; + static constexpr int kMaxPort = 65535; + static constexpr int kMinStatusCode = 0; + static constexpr int kMaxStatusCode = 16; +}; + +const std::map kStatusCodeMap = { + {"OK", 0}, + {"CANCELLED", 1}, + {"UNKNOWN", 2}, + {"INVALID_ARGUMENT", 3}, + {"DEADLINE_EXCEEDED", 4}, + {"NOT_FOUND", 5}, + {"ALREADY_EXISTS", 6}, + {"PERMISSION_DENIED", 7}, + {"UNAUTHENTICATED", 16}, + {"RESOURCE_EXHAUSTED", 8}, + {"FAILED_PRECONDITION", 9}, + {"ABORTED", 10}, + {"OUT_OF_RANGE", 11}, + {"UNIMPLEMENTED", 12}, + {"INTERNAL", 13}, + {"UNAVAILABLE", 14}, + {"DATA_LOSS", 15}, }; } // namespace @@ -69,31 +106,88 @@ static bool GetEnvBool(const std::string_view name, bool default_value) { return str == "true" || str == "1"; } -FlagdProviderConfig::FlagdProviderConfig() - : host_(GetEnvStr(EnvVars::kHost, Defaults::kHost)), - port_(GetEnvInt(EnvVars::kPort, Defaults::kPortInProcess)), - tls_(GetEnvBool(EnvVars::kTls, Defaults::kTls)), - deadline_ms_(GetEnvInt(EnvVars::kDeadlineMs, Defaults::kDeadlineMs)), - offline_poll_interval_ms_( - GetEnvInt(EnvVars::kOfflinePollMs, Defaults::kOfflinePollMs)) { +static bool IsValidPort(int port) { + return port >= Validation::kMinPort && port <= Validation::kMaxPort; +} + +static bool IsValidStatusCode(int code) { + return code >= Validation::kMinStatusCode && + code <= Validation::kMaxStatusCode; +} + +static std::vector ParseFatalStatusCodes(const std::string& str) { + std::vector result; + std::stringstream sstream(str); + std::string item; + while (std::getline(sstream, item, ',')) { + item.erase(0, item.find_first_not_of(" \t\n\r")); + item.erase(item.find_last_not_of(" \t\n\r") + 1); + + if (item.empty()) continue; + + try { + int code = std::stoi(item); + if (IsValidStatusCode(code)) { + result.push_back(code); + } else { + LOG(WARNING) << "Invalid gRPC status code: " << code; + } + continue; + } catch (const std::invalid_argument&) { + // Not an integer, try parsing as a status code string. + } catch (const std::out_of_range&) { + // Not a valid integer, try parsing as a status code string. + } + + auto iter = kStatusCodeMap.find(item); + if (iter != kStatusCodeMap.end()) { + result.push_back(iter->second); + } else { + LOG(WARNING) << "Unknown gRPC status code: " << item; + } + } + return result; +} + +FlagdProviderConfig::FlagdProviderConfig() { + SetHost(GetEnvStr(EnvVars::kHost, Defaults::kHost)); + SetPort(GetEnvInt(EnvVars::kPort, Defaults::kPortInProcess)); + SetTls(GetEnvBool(EnvVars::kTls, Defaults::kTls)); + SetDeadlineMs(GetEnvInt(EnvVars::kDeadlineMs, Defaults::kDeadlineMs)); + SetStreamDeadlineMs( + GetEnvInt(EnvVars::kStreamDeadlineMs, Defaults::kStreamDeadlineMs)); + SetRetryBackoffMs( + GetEnvInt(EnvVars::kRetryBackoffMs, Defaults::kRetryBackoffMs)); + SetRetryBackoffMaxMs( + GetEnvInt(EnvVars::kRetryBackoffMaxMs, Defaults::kRetryBackoffMaxMs)); + SetRetryGracePeriod( + GetEnvInt(EnvVars::kRetryGracePeriod, Defaults::kRetryGracePeriod)); + SetKeepAliveTimeMs( + GetEnvInt(EnvVars::kKeepAliveTimeMs, Defaults::kKeepAliveTimeMs)); + SetOfflinePollIntervalMs( + GetEnvInt(EnvVars::kOfflinePollMs, Defaults::kOfflinePollMs)); + if (std::string val = GetEnvStr(EnvVars::kTargetUri); !val.empty()) { - target_uri_ = val; + SetTargetUri(val); } if (std::string val = GetEnvStr(EnvVars::kSocketPath); !val.empty()) { - socket_path_ = val; + SetSocketPath(val); } if (std::string val = GetEnvStr(EnvVars::kServerCertPath); !val.empty()) { - cert_path_ = val; + SetCertPath(val); } if (std::string val = GetEnvStr(EnvVars::kSourceSelector); !val.empty()) { - selector_ = val; + SetSelector(val); } if (std::string val = GetEnvStr(EnvVars::kProviderId); !val.empty()) { - provider_id_ = val; + SetProviderId(val); } if (std::string val = GetEnvStr(EnvVars::kOfflineFlagSourcePath); !val.empty()) { - offline_flag_source_path_ = val; + SetOfflineFlagSourcePath(val); + } + if (std::string val = GetEnvStr(EnvVars::kFatalStatusCodes); !val.empty()) { + SetFatalStatusCodes(val); } } @@ -150,6 +244,50 @@ std::optional FlagdProviderConfig::GetCertPath() const { return cert_path_; } int FlagdProviderConfig::GetDeadlineMs() const { return deadline_ms_; } +int FlagdProviderConfig::GetStreamDeadlineMs() const { + return stream_deadline_ms_; +} +int FlagdProviderConfig::GetRetryBackoffMs() const { return retry_backoff_ms_; } +int FlagdProviderConfig::GetRetryBackoffMaxMs() const { + return retry_backoff_max_ms_; +} +int FlagdProviderConfig::GetRetryGracePeriod() const { + return retry_grace_period_; +} +int FlagdProviderConfig::GetKeepAliveTimeMs() const { + return keep_alive_time_ms_; +} +const std::vector& FlagdProviderConfig::GetFatalStatusCodes() const { + return fatal_status_codes_; +} + +std::string FlagdProviderConfig::GetServiceConfigJson() const { + const auto names = nlohmann::json::array({ + nlohmann::json::object({{"service", "flagd.evaluation.v1.Service"}}), + nlohmann::json::object({{"service", "flagd.sync.v1.FlagSyncService"}}), + }); + + const auto retry_policy = nlohmann::json::object({ + {"maxAttempts", 4}, + {"initialBackoff", absl::StrCat(retry_backoff_ms_ / kMsInSecond, "s")}, + {"maxBackoff", absl::StrCat(retry_backoff_max_ms_ / kMsInSecond, "s")}, + {"backoffMultiplier", 2}, + { + "retryableStatusCodes", + nlohmann::json::array({"UNAVAILABLE", "UNKNOWN"}), + }, + }); + + const auto method_config = nlohmann::json::object({ + {"name", names}, + {"retryPolicy", retry_policy}, + }); + + return nlohmann::json::object( + {{"methodConfig", nlohmann::json::array({method_config})}}) + .dump(); +} + std::optional FlagdProviderConfig::GetSelector() const { return selector_; } @@ -170,6 +308,10 @@ FlagdProviderConfig& FlagdProviderConfig::SetHost(std::string_view host) { return *this; } FlagdProviderConfig& FlagdProviderConfig::SetPort(int port) { + if (!IsValidPort(port)) { + LOG(WARNING) << "Invalid port: " << port << ". Ignoring."; + return *this; + } port_ = port; return *this; } @@ -195,9 +337,81 @@ FlagdProviderConfig& FlagdProviderConfig::SetCertPath(std::string_view path) { return *this; } FlagdProviderConfig& FlagdProviderConfig::SetDeadlineMs(int deadline_ms) { + if (deadline_ms <= 0) { + LOG(WARNING) << "Invalid deadline_ms: " << deadline_ms << ". Ignoring."; + return *this; + } deadline_ms_ = deadline_ms; return *this; } +FlagdProviderConfig& FlagdProviderConfig::SetStreamDeadlineMs( + int stream_deadline_ms) { + if (stream_deadline_ms <= 0) { + LOG(WARNING) << "Invalid stream_deadline_ms: " << stream_deadline_ms + << ". Ignoring."; + return *this; + } + stream_deadline_ms_ = stream_deadline_ms; + return *this; +} +FlagdProviderConfig& FlagdProviderConfig::SetRetryBackoffMs( + int retry_backoff_ms) { + if (retry_backoff_ms <= 0) { + LOG(WARNING) << "Invalid retry_backoff_ms: " << retry_backoff_ms + << ". Ignoring."; + return *this; + } + retry_backoff_ms_ = retry_backoff_ms; + return *this; +} +FlagdProviderConfig& FlagdProviderConfig::SetRetryBackoffMaxMs( + int retry_backoff_max_ms) { + if (retry_backoff_max_ms <= 0) { + LOG(WARNING) << "Invalid retry_backoff_max_ms: " << retry_backoff_max_ms + << ". Ignoring."; + return *this; + } + retry_backoff_max_ms_ = retry_backoff_max_ms; + return *this; +} +FlagdProviderConfig& FlagdProviderConfig::SetRetryGracePeriod( + int retry_grace_period) { + if (retry_grace_period < 0) { + LOG(WARNING) << "Invalid retry_grace_period: " << retry_grace_period + << ". Ignoring."; + return *this; + } + retry_grace_period_ = retry_grace_period; + return *this; +} +FlagdProviderConfig& FlagdProviderConfig::SetKeepAliveTimeMs( + int keep_alive_time_ms) { + if (keep_alive_time_ms < 0) { + LOG(WARNING) << "Invalid keep_alive_time_ms: " << keep_alive_time_ms + << ". Ignoring."; + return *this; + } + keep_alive_time_ms_ = keep_alive_time_ms; + return *this; +} +FlagdProviderConfig& FlagdProviderConfig::SetFatalStatusCodes( + const std::vector& fatal_status_codes) { + std::vector valid_codes; + for (int code : fatal_status_codes) { + if (IsValidStatusCode(code)) { + valid_codes.push_back(code); + } else { + LOG(WARNING) << "Invalid gRPC status code: " << code << ". Ignoring."; + } + } + fatal_status_codes_ = std::move(valid_codes); + return *this; +} +FlagdProviderConfig& FlagdProviderConfig::SetFatalStatusCodes( + const std::string& fatal_status_codes_str) { + fatal_status_codes_ = ParseFatalStatusCodes(fatal_status_codes_str); + return *this; +} FlagdProviderConfig& FlagdProviderConfig::SetSelector( std::string_view selector) { selector_ = std::string(selector); @@ -215,6 +429,11 @@ FlagdProviderConfig& FlagdProviderConfig::SetOfflineFlagSourcePath( } FlagdProviderConfig& FlagdProviderConfig::SetOfflinePollIntervalMs( int interval_ms) { + if (interval_ms <= 0) { + LOG(WARNING) << "Invalid offline_poll_interval_ms: " << interval_ms + << ". Ignoring."; + return *this; + } offline_poll_interval_ms_ = interval_ms; return *this; } diff --git a/providers/flagd/src/configuration.h b/providers/flagd/src/configuration.h index d15435b..ef35549 100644 --- a/providers/flagd/src/configuration.h +++ b/providers/flagd/src/configuration.h @@ -5,12 +5,26 @@ #include #include #include +#include #include "absl/status/statusor.h" #include "grpcpp/security/credentials.h" namespace flagd { +struct Defaults { + static constexpr std::string_view kHost = "localhost"; + static constexpr int kPortInProcess = 8015; + static constexpr bool kTls = false; + static constexpr int kDeadlineMs = 500; + static constexpr int kStreamDeadlineMs = 600000; + static constexpr int kRetryBackoffMs = 1000; + static constexpr int kRetryBackoffMaxMs = 12000; + static constexpr int kRetryGracePeriod = 5; + static constexpr int kKeepAliveTimeMs = 0; + static constexpr int kOfflinePollMs = 5000; +}; + class FlagdProviderConfig { public: // --- Constructor --- @@ -27,6 +41,14 @@ class FlagdProviderConfig { std::optional GetCertPath() const; int GetDeadlineMs() const; + int GetStreamDeadlineMs() const; + int GetRetryBackoffMs() const; + int GetRetryBackoffMaxMs() const; + int GetRetryGracePeriod() const; + int GetKeepAliveTimeMs() const; + const std::vector& GetFatalStatusCodes() const; + + std::string GetServiceConfigJson() const; std::optional GetSelector() const; std::optional GetProviderId() const; @@ -54,27 +76,43 @@ class FlagdProviderConfig { FlagdProviderConfig& SetSocketPath(std::string_view path); FlagdProviderConfig& SetCertPath(std::string_view path); FlagdProviderConfig& SetDeadlineMs(int deadline_ms); + FlagdProviderConfig& SetStreamDeadlineMs(int stream_deadline_ms); + FlagdProviderConfig& SetRetryBackoffMs(int retry_backoff_ms); + FlagdProviderConfig& SetRetryBackoffMaxMs(int retry_backoff_max_ms); + FlagdProviderConfig& SetRetryGracePeriod(int retry_grace_period); + FlagdProviderConfig& SetKeepAliveTimeMs(int keep_alive_time_ms); + FlagdProviderConfig& SetFatalStatusCodes( + const std::vector& fatal_status_codes); + FlagdProviderConfig& SetFatalStatusCodes( + const std::string& fatal_status_codes_str); + FlagdProviderConfig& SetSelector(std::string_view selector); FlagdProviderConfig& SetProviderId(std::string_view provider_id); FlagdProviderConfig& SetOfflineFlagSourcePath(std::string_view path); FlagdProviderConfig& SetOfflinePollIntervalMs(int interval_ms); private: - std::string host_; - int port_; + std::string host_ = std::string(Defaults::kHost); + int port_ = Defaults::kPortInProcess; std::optional target_uri_; - bool tls_; + bool tls_ = Defaults::kTls; std::shared_ptr channel_credentials_; std::optional socket_path_; std::optional cert_path_; - int deadline_ms_; + int deadline_ms_ = Defaults::kDeadlineMs; + int stream_deadline_ms_ = Defaults::kStreamDeadlineMs; + int retry_backoff_ms_ = Defaults::kRetryBackoffMs; + int retry_backoff_max_ms_ = Defaults::kRetryBackoffMaxMs; + int retry_grace_period_ = Defaults::kRetryGracePeriod; + int keep_alive_time_ms_ = Defaults::kKeepAliveTimeMs; + std::vector fatal_status_codes_; std::optional selector_; std::optional provider_id_; std::optional offline_flag_source_path_; - int offline_poll_interval_ms_; + int offline_poll_interval_ms_ = Defaults::kOfflinePollMs; }; } // namespace flagd diff --git a/providers/flagd/src/sync/grpc/grpc_sync.cpp b/providers/flagd/src/sync/grpc/grpc_sync.cpp index ebe2352..ca65df9 100644 --- a/providers/flagd/src/sync/grpc/grpc_sync.cpp +++ b/providers/flagd/src/sync/grpc/grpc_sync.cpp @@ -2,6 +2,8 @@ #include +#include +#include #include #include #include @@ -36,18 +38,33 @@ absl::Status GrpcSync::Init(const openfeature::EvaluationContext& ctx) { case State::kReady: return absl::OkStatus(); case State::kInitializing: - lifecycle_cv_.wait(lock, - [this] { return state_ != State::kInitializing; }); + if (!lifecycle_cv_.wait_for( + lock, std::chrono::milliseconds(config_.GetDeadlineMs()), + [this] { return state_ != State::kInitializing; })) { + init_timed_out_ = true; + init_result_ = absl::DeadlineExceededError("Initialization timed out"); + lock.unlock(); + static_cast(ShutdownInternal(State::kUninitialized)); + return init_result_; + } return (state_ == State::kReady) ? absl::OkStatus() : init_result_; + case State::kReconnecting: + if (init_timed_out_) { + return absl::DeadlineExceededError("Initialization timed out"); + } + return init_result_; case State::kShuttingDown: lifecycle_cv_.wait(lock, [this] { return state_ == State::kUninitialized; }); break; + case State::kFatal: + return init_result_; case State::kUninitialized: break; } state_ = State::kInitializing; + LOG(INFO) << "GrpcSync state changed to kInitializing"; init_result_ = absl::UnknownError("Initialization incomplete"); std::string target = config_.GetEffectiveTargetUri(); @@ -55,30 +72,42 @@ absl::Status GrpcSync::Init(const openfeature::EvaluationContext& ctx) { config_.GetEffectiveCredentials(); if (!creds.ok()) { state_ = State::kUninitialized; + LOG(INFO) << "GrpcSync state changed to kUninitialized"; return creds.status(); } - std::shared_ptr channel = grpc::CreateChannel(target, *creds); + std::shared_ptr channel; + grpc::ChannelArguments args; + args.SetServiceConfigJSON(config_.GetServiceConfigJson()); + if (config_.GetKeepAliveTimeMs() > 0) { + args.SetInt("grpc.keepalive_time_ms", config_.GetKeepAliveTimeMs()); + } + channel = grpc::CreateCustomChannel(target, *creds, args); stub_ = FlagSyncService::NewStub(channel); - context_ = std::make_shared(); - try { background_thread_ = std::thread(&GrpcSync::WaitForUpdates, this); } catch (const std::exception& e) { state_ = State::kUninitialized; + LOG(INFO) << "GrpcSync state changed to kUninitialized"; return absl::InternalError( absl::StrCat("Failed to spawn thread: ", e.what())); } - lifecycle_cv_.wait(lock, [this] { return state_ != State::kInitializing; }); + if (!lifecycle_cv_.wait_for( + lock, std::chrono::milliseconds(config_.GetDeadlineMs()), + [this] { return state_ != State::kInitializing; })) { + init_timed_out_ = true; + init_result_ = absl::DeadlineExceededError("Initialization timed out"); + lock.unlock(); + static_cast(ShutdownInternal(State::kUninitialized)); + return init_result_; + } if (state_ != State::kReady) { - if (background_thread_.joinable()) { - lock.unlock(); - background_thread_.join(); - lock.lock(); - } + State final_state = state_; + lock.unlock(); + static_cast(ShutdownInternal(final_state)); return init_result_; } @@ -86,6 +115,10 @@ absl::Status GrpcSync::Init(const openfeature::EvaluationContext& ctx) { } absl::Status GrpcSync::Shutdown() { + return ShutdownInternal(State::kUninitialized); +} + +absl::Status GrpcSync::ShutdownInternal(State target_state) { std::unique_lock lock(lifecycle_mutex_); if (state_ == State::kUninitialized) { @@ -99,10 +132,13 @@ absl::Status GrpcSync::Shutdown() { } State previous_state = state_; - state_ = State::kShuttingDown; + if (state_ != State::kFatal) { + state_ = State::kShuttingDown; + LOG(INFO) << "GrpcSync state changed to kShuttingDown"; - if (context_) { - context_->TryCancel(); + if (context_) { + context_->TryCancel(); + } } lock.unlock(); @@ -113,9 +149,11 @@ absl::Status GrpcSync::Shutdown() { context_.reset(); stub_.reset(); - state_ = State::kUninitialized; + state_ = target_state; + LOG(INFO) << "GrpcSync state changed (ShutdownInternal)"; - if (previous_state == State::kInitializing) { + if (previous_state == State::kInitializing && + target_state == State::kUninitialized) { init_result_ = absl::CancelledError("Shutdown called during initialization"); } @@ -126,67 +164,143 @@ absl::Status GrpcSync::Shutdown() { } void GrpcSync::WaitForUpdates() { - // TODO(#12) Add automatic reconection with exponential backoff - SyncFlagsRequest request; + auto last_healthy_time = std::chrono::steady_clock::now(); + int retry_count = 0; - if (config_.GetProviderId().has_value() && - !config_.GetProviderId()->empty()) { - request.set_provider_id(*config_.GetProviderId()); - } - - std::shared_ptr local_ctx; - { - std::scoped_lock lock(lifecycle_mutex_); - local_ctx = context_; - } - - if (!local_ctx) return; - - auto reader = stub_->SyncFlags(local_ctx.get(), request); - SyncFlagsResponse response; + while (true) { + { + std::scoped_lock lock(lifecycle_mutex_); + if (state_ == State::kShuttingDown || state_ == State::kFatal) break; + } - bool first_read = true; - while (reader->Read(&response)) { - try { - Json raw = Json::parse(response.flag_configuration()); + SyncFlagsRequest request; + if (config_.GetProviderId().has_value() && + !config_.GetProviderId()->empty()) { + request.set_provider_id(*config_.GetProviderId()); + } - UpdateFlags(raw); + std::shared_ptr local_ctx = + std::make_shared(); + { + std::scoped_lock lock(lifecycle_mutex_); + if (state_ == State::kShuttingDown) break; + context_ = local_ctx; + } + if (config_.GetStreamDeadlineMs() > 0) { + std::chrono::time_point deadline = + std::chrono::system_clock::now() + + std::chrono::milliseconds(config_.GetStreamDeadlineMs()); + local_ctx->set_deadline(deadline); + } - if (first_read) { - std::scoped_lock lock(lifecycle_mutex_); - if (state_ == State::kInitializing) { - state_ = State::kReady; - init_result_ = absl::OkStatus(); - lifecycle_cv_.notify_all(); - } - first_read = false; + std::unique_ptr> reader = + stub_->SyncFlags(local_ctx.get(), request); + + if (!reader) { + LOG(ERROR) << "Failed to create sync stream"; + std::unique_lock lock(lifecycle_mutex_); + if (state_ == State::kInitializing) { + state_ = State::kReconnecting; + LOG(INFO) << "GrpcSync state changed to kReconnecting"; + init_result_ = absl::InternalError("Failed to create stream"); + lifecycle_cv_.notify_all(); } - } catch (const std::exception& e) { - LOG(ERROR) << "Failed to parse flag configuration: " << e.what(); - if (first_read) { - std::scoped_lock lock(lifecycle_mutex_); - if (state_ == State::kInitializing) { - state_ = State::kUninitialized; - init_result_ = - absl::InternalError(absl::StrCat("Parse error: ", e.what())); - lifecycle_cv_.notify_all(); + lifecycle_cv_.wait_for( + lock, std::chrono::milliseconds(config_.GetRetryBackoffMs()), [this] { + return state_ == State::kShuttingDown || state_ == State::kFatal; + }); + continue; + } + + SyncFlagsResponse response; + bool connected = false; + + while (reader->Read(&response)) { + try { + Json raw = Json::parse(response.flag_configuration()); + UpdateFlags(raw); + + if (!connected) { + connected = true; + retry_count = 0; + std::scoped_lock lock(lifecycle_mutex_); + if (state_ == State::kInitializing || + state_ == State::kReconnecting) { + state_ = State::kReady; + LOG(INFO) << "GrpcSync state changed to kReady"; + init_result_ = absl::OkStatus(); + lifecycle_cv_.notify_all(); + // TODO(#89): emit PROVIDER_READY + // TODO(#89): emit PROVIDER_CONFIGURATION_CHANGED + } } - // If we fail first read, we abort the stream - return; + // TODO(#89): emit PROVIDER_CONFIGURATION_CHANGED + } catch (const std::exception& e) { + LOG(ERROR) << "Failed to parse flag configuration: " << e.what(); } } - } - grpc::Status status = reader->Finish(); + grpc::Status status = reader->Finish(); + LOG(WARNING) << "Sync stream closed: " << status.error_message() + << " (code: " << status.error_code() << ")"; + + if (connected) { + last_healthy_time = std::chrono::steady_clock::now(); + } - { - std::scoped_lock lock(lifecycle_mutex_); - if (state_ == State::kInitializing) { - state_ = State::kUninitialized; - init_result_ = status.ok() - ? absl::InternalError("Stream closed immediately") - : absl::InternalError(status.error_message()); + // Check fatal status codes + const std::vector& fatal_codes = config_.GetFatalStatusCodes(); + bool is_fatal = std::find(fatal_codes.cbegin(), fatal_codes.cend(), + status.error_code()) != fatal_codes.cend(); + + if (is_fatal) { + ClearFlags(); + std::scoped_lock lock(lifecycle_mutex_); + state_ = State::kFatal; + LOG(INFO) << "GrpcSync state changed to kFatal"; + init_result_ = absl::InternalError( + absl::StrCat("Fatal gRPC error: ", status.error_message())); lifecycle_cv_.notify_all(); + // TODO(#89): emit PROVIDER_FATAL + break; + } + + { + std::scoped_lock lock(lifecycle_mutex_); + if (state_ == State::kInitializing) { + state_ = State::kReconnecting; + LOG(INFO) << "GrpcSync state changed to kReconnecting"; + init_result_ = absl::InternalError( + absl::StrCat("Stream failed: ", status.error_message())); + lifecycle_cv_.notify_all(); + } else if (state_ == State::kReady) { + state_ = State::kReconnecting; + LOG(INFO) << "GrpcSync state changed to kReconnecting"; + // TODO(#89): emit PROVIDER_STALE + } + } + + int64_t backoff = config_.GetRetryBackoffMs() * (1LL << retry_count); + if (backoff >= config_.GetRetryBackoffMaxMs()) { + backoff = config_.GetRetryBackoffMaxMs(); + } else { + retry_count++; + } + + std::chrono::time_point now = std::chrono::steady_clock::now(); + std::chrono::duration disconnected_duration = + std::chrono::duration_cast(now - + last_healthy_time); + if (disconnected_duration.count() > config_.GetRetryGracePeriod()) { + ClearFlags(); + // TODO(#89): emit PROVIDER_ERROR + } + + { + std::unique_lock lock(lifecycle_mutex_); + lifecycle_cv_.wait_for(lock, std::chrono::milliseconds(backoff), [this] { + return state_ == State::kShuttingDown || state_ == State::kFatal; + }); } } } diff --git a/providers/flagd/src/sync/grpc/grpc_sync.h b/providers/flagd/src/sync/grpc/grpc_sync.h index dc45abe..ccf88d5 100644 --- a/providers/flagd/src/sync/grpc/grpc_sync.h +++ b/providers/flagd/src/sync/grpc/grpc_sync.h @@ -25,23 +25,28 @@ class GrpcSync final : public FlagSync { absl::Status Shutdown() override; private: - void WaitForUpdates(); - - std::unique_ptr stub_; - std::shared_ptr context_; enum class State : uint8_t { kUninitialized, kInitializing, // Thread started, waiting for first connection kReady, // First sync complete, running normally + kReconnecting, // Connection lost or failed, trying to reconnect kShuttingDown, // Shutdown requested, cleaning up + kFatal, // Fatal error, gives up }; + void WaitForUpdates(); + absl::Status ShutdownInternal(State target_state); + + std::unique_ptr stub_; + std::shared_ptr context_; + std::thread background_thread_; std::mutex lifecycle_mutex_; std::condition_variable lifecycle_cv_; State state_ = State::kUninitialized; absl::Status init_result_; + bool init_timed_out_ = false; FlagdProviderConfig config_; }; diff --git a/providers/flagd/src/sync/sync.cpp b/providers/flagd/src/sync/sync.cpp index 74a11cc..8297e67 100644 --- a/providers/flagd/src/sync/sync.cpp +++ b/providers/flagd/src/sync/sync.cpp @@ -85,6 +85,15 @@ void FlagSync::UpdateFlags(const nlohmann::json& new_json) { } } +void FlagSync::ClearFlags() { + { + std::scoped_lock lock(flags_mutex_); + current_flags_ = std::make_shared(nlohmann::json::object()); + global_metadata_ = + std::make_shared(nlohmann::json::object()); + } +} + std::shared_ptr FlagSync::GetFlags() const { std::shared_lock lock(flags_mutex_); return current_flags_; diff --git a/providers/flagd/src/sync/sync.h b/providers/flagd/src/sync/sync.h index 6e3f613..f0cfd4b 100644 --- a/providers/flagd/src/sync/sync.h +++ b/providers/flagd/src/sync/sync.h @@ -23,6 +23,7 @@ class FlagSync { protected: void UpdateFlags(const nlohmann::json& new_json); + void ClearFlags(); private: mutable std::shared_mutex flags_mutex_; diff --git a/providers/flagd/tests/configuration_test.cpp b/providers/flagd/tests/configuration_test.cpp index 63e20df..02b741b 100644 --- a/providers/flagd/tests/configuration_test.cpp +++ b/providers/flagd/tests/configuration_test.cpp @@ -71,6 +71,76 @@ TEST_F(ConfigurationTest, EffectiveTargetUriPrecedence) { EXPECT_EQ(config.GetEffectiveTargetUri(), target_uri); } +TEST_F(ConfigurationTest, InvalidValues) { + FlagdProviderConfig config; + + // Invalid Port + int original_port = config.GetPort(); + config.SetPort(-1); + EXPECT_EQ(config.GetPort(), original_port); + config.SetPort(65536); + EXPECT_EQ(config.GetPort(), original_port); + + // Invalid Timings + int original_deadline = config.GetDeadlineMs(); + config.SetDeadlineMs(-1); + EXPECT_EQ(config.GetDeadlineMs(), original_deadline); + + int original_stream_deadline = config.GetStreamDeadlineMs(); + config.SetStreamDeadlineMs(-1); + EXPECT_EQ(config.GetStreamDeadlineMs(), original_stream_deadline); + + int original_retry_backoff = config.GetRetryBackoffMs(); + config.SetRetryBackoffMs(-1); + EXPECT_EQ(config.GetRetryBackoffMs(), original_retry_backoff); + + int original_retry_backoff_max = config.GetRetryBackoffMaxMs(); + config.SetRetryBackoffMaxMs(-1); + EXPECT_EQ(config.GetRetryBackoffMaxMs(), original_retry_backoff_max); + + int original_retry_grace = config.GetRetryGracePeriod(); + config.SetRetryGracePeriod(-1); + EXPECT_EQ(config.GetRetryGracePeriod(), original_retry_grace); + + int original_keep_alive = config.GetKeepAliveTimeMs(); + config.SetKeepAliveTimeMs(-1); + EXPECT_EQ(config.GetKeepAliveTimeMs(), original_keep_alive); + + int original_offline_poll = config.GetOfflinePollIntervalMs(); + config.SetOfflinePollIntervalMs(-1); + EXPECT_EQ(config.GetOfflinePollIntervalMs(), original_offline_poll); +} + +TEST_F(ConfigurationTest, FatalStatusCodes) { + FlagdProviderConfig config; + + // Invalid Fatal Status Codes (Integers) + config.SetFatalStatusCodes(std::vector{1, 100, 5}); // 100 is invalid + EXPECT_EQ(config.GetFatalStatusCodes().size(), 2); + EXPECT_EQ(config.GetFatalStatusCodes()[0], 1); + EXPECT_EQ(config.GetFatalStatusCodes()[1], 5); + + // Invalid Fatal Status Codes (Strings) + config.SetFatalStatusCodes("2,INVALID,6"); // INVALID is invalid + EXPECT_EQ(config.GetFatalStatusCodes().size(), 2); + EXPECT_EQ(config.GetFatalStatusCodes()[0], 2); + EXPECT_EQ(config.GetFatalStatusCodes()[1], 6); + + // String names for fatal status codes + config.SetFatalStatusCodes("DEADLINE_EXCEEDED,NOT_FOUND,INVALID_ARGUMENT"); + EXPECT_EQ(config.GetFatalStatusCodes().size(), 3); + EXPECT_EQ(config.GetFatalStatusCodes()[0], 4); + EXPECT_EQ(config.GetFatalStatusCodes()[1], 5); + EXPECT_EQ(config.GetFatalStatusCodes()[2], 3); + + // Mixed string names and integers + config.SetFatalStatusCodes("1,INTERNAL,3"); + EXPECT_EQ(config.GetFatalStatusCodes().size(), 3); + EXPECT_EQ(config.GetFatalStatusCodes()[0], 1); + EXPECT_EQ(config.GetFatalStatusCodes()[1], 13); + EXPECT_EQ(config.GetFatalStatusCodes()[2], 3); +} + TEST_F(ConfigurationTest, GetEffectiveCredentialsInsecure) { FlagdProviderConfig config; config.SetTls(false);