Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions providers/flagd/src/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ cc_library(
deps = [
"@abseil-cpp//absl/strings",
"@com_github_grpc_grpc//:grpc++",
"@nlohmann_json//:json",
"@openfeature_cpp_sdk//openfeature",
],
)
257 changes: 238 additions & 19 deletions providers/flagd/src/configuration.cpp
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
#include "configuration.h"

#include <grpcpp/grpcpp.h>
#include <grpcpp/security/credentials.h>

#include <algorithm>
#include <cstdlib>
#include <fstream>
#include <map>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

#include "absl/log/log.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "nlohmann/json.hpp"

namespace flagd {

namespace {
constexpr double kMsInSecond = 1000.0;

struct EnvVars {
static constexpr std::string_view kHost = "FLAGD_HOST";
static constexpr std::string_view kPort = "FLAGD_PORT";
Expand All @@ -23,19 +30,49 @@ struct EnvVars {
static constexpr std::string_view kSocketPath = "FLAGD_SOCKET_PATH";
static constexpr std::string_view kServerCertPath = "FLAGD_SERVER_CERT_PATH";
static constexpr std::string_view kDeadlineMs = "FLAGD_DEADLINE_MS";
static constexpr std::string_view kStreamDeadlineMs =
"FLAGD_STREAM_DEADLINE_MS";
static constexpr std::string_view kRetryBackoffMs = "FLAGD_RETRY_BACKOFF_MS";
static constexpr std::string_view kRetryBackoffMaxMs =
"FLAGD_RETRY_BACKOFF_MAX_MS";
static constexpr std::string_view kRetryGracePeriod =
"FLAGD_RETRY_GRACE_PERIOD";
static constexpr std::string_view kKeepAliveTimeMs =
"FLAGD_KEEP_ALIVE_TIME_MS";
static constexpr std::string_view kFatalStatusCodes =
"FLAGD_FATAL_STATUS_CODES";
static constexpr std::string_view kSourceSelector = "FLAGD_SOURCE_SELECTOR";
static constexpr std::string_view kProviderId = "FLAGD_PROVIDER_ID";
static constexpr std::string_view kOfflineFlagSourcePath =
"FLAGD_OFFLINE_FLAG_SOURCE_PATH";
static constexpr std::string_view kOfflinePollMs = "FLAGD_OFFLINE_POLL_MS";
};

struct Defaults {
static constexpr std::string_view kHost = "localhost";
static constexpr int kPortInProcess = 8015;
static constexpr bool kTls = false;
static constexpr int kDeadlineMs = 500;
static constexpr int kOfflinePollMs = 5000;
struct Validation {
static constexpr int kMinPort = 1;
static constexpr int kMaxPort = 65535;
static constexpr int kMinStatusCode = 0;
static constexpr int kMaxStatusCode = 16;
};

const std::map<std::string, int> kStatusCodeMap = {
{"OK", 0},
{"CANCELLED", 1},
{"UNKNOWN", 2},
{"INVALID_ARGUMENT", 3},
{"DEADLINE_EXCEEDED", 4},
{"NOT_FOUND", 5},
{"ALREADY_EXISTS", 6},
{"PERMISSION_DENIED", 7},
{"UNAUTHENTICATED", 16},
{"RESOURCE_EXHAUSTED", 8},
{"FAILED_PRECONDITION", 9},
{"ABORTED", 10},
{"OUT_OF_RANGE", 11},
{"UNIMPLEMENTED", 12},
{"INTERNAL", 13},
{"UNAVAILABLE", 14},
{"DATA_LOSS", 15},
};
} // namespace

Expand Down Expand Up @@ -69,31 +106,88 @@ static bool GetEnvBool(const std::string_view name, bool default_value) {
return str == "true" || str == "1";
}

FlagdProviderConfig::FlagdProviderConfig()
: host_(GetEnvStr(EnvVars::kHost, Defaults::kHost)),
port_(GetEnvInt(EnvVars::kPort, Defaults::kPortInProcess)),
tls_(GetEnvBool(EnvVars::kTls, Defaults::kTls)),
deadline_ms_(GetEnvInt(EnvVars::kDeadlineMs, Defaults::kDeadlineMs)),
offline_poll_interval_ms_(
GetEnvInt(EnvVars::kOfflinePollMs, Defaults::kOfflinePollMs)) {
static bool IsValidPort(int port) {
return port >= Validation::kMinPort && port <= Validation::kMaxPort;
}

static bool IsValidStatusCode(int code) {
return code >= Validation::kMinStatusCode &&
code <= Validation::kMaxStatusCode;
}

static std::vector<int> ParseFatalStatusCodes(const std::string& str) {
std::vector<int> result;
std::stringstream sstream(str);
std::string item;
while (std::getline(sstream, item, ',')) {
item.erase(0, item.find_first_not_of(" \t\n\r"));
item.erase(item.find_last_not_of(" \t\n\r") + 1);

if (item.empty()) continue;

try {
int code = std::stoi(item);
if (IsValidStatusCode(code)) {
result.push_back(code);
} else {
LOG(WARNING) << "Invalid gRPC status code: " << code;
}
continue;
} catch (const std::invalid_argument&) {
// Not an integer, try parsing as a status code string.
} catch (const std::out_of_range&) {
// Not a valid integer, try parsing as a status code string.
}

auto iter = kStatusCodeMap.find(item);
if (iter != kStatusCodeMap.end()) {
result.push_back(iter->second);
} else {
LOG(WARNING) << "Unknown gRPC status code: " << item;
}
}
return result;
}

FlagdProviderConfig::FlagdProviderConfig() {
SetHost(GetEnvStr(EnvVars::kHost, Defaults::kHost));
SetPort(GetEnvInt(EnvVars::kPort, Defaults::kPortInProcess));
SetTls(GetEnvBool(EnvVars::kTls, Defaults::kTls));
SetDeadlineMs(GetEnvInt(EnvVars::kDeadlineMs, Defaults::kDeadlineMs));
SetStreamDeadlineMs(
GetEnvInt(EnvVars::kStreamDeadlineMs, Defaults::kStreamDeadlineMs));
SetRetryBackoffMs(
GetEnvInt(EnvVars::kRetryBackoffMs, Defaults::kRetryBackoffMs));
SetRetryBackoffMaxMs(
GetEnvInt(EnvVars::kRetryBackoffMaxMs, Defaults::kRetryBackoffMaxMs));
SetRetryGracePeriod(
GetEnvInt(EnvVars::kRetryGracePeriod, Defaults::kRetryGracePeriod));
SetKeepAliveTimeMs(
GetEnvInt(EnvVars::kKeepAliveTimeMs, Defaults::kKeepAliveTimeMs));
SetOfflinePollIntervalMs(
GetEnvInt(EnvVars::kOfflinePollMs, Defaults::kOfflinePollMs));

if (std::string val = GetEnvStr(EnvVars::kTargetUri); !val.empty()) {
target_uri_ = val;
SetTargetUri(val);
}
if (std::string val = GetEnvStr(EnvVars::kSocketPath); !val.empty()) {
socket_path_ = val;
SetSocketPath(val);
}
if (std::string val = GetEnvStr(EnvVars::kServerCertPath); !val.empty()) {
cert_path_ = val;
SetCertPath(val);
}
if (std::string val = GetEnvStr(EnvVars::kSourceSelector); !val.empty()) {
selector_ = val;
SetSelector(val);
}
if (std::string val = GetEnvStr(EnvVars::kProviderId); !val.empty()) {
provider_id_ = val;
SetProviderId(val);
}
if (std::string val = GetEnvStr(EnvVars::kOfflineFlagSourcePath);
!val.empty()) {
offline_flag_source_path_ = val;
SetOfflineFlagSourcePath(val);
}
if (std::string val = GetEnvStr(EnvVars::kFatalStatusCodes); !val.empty()) {
SetFatalStatusCodes(val);
}
}

Expand Down Expand Up @@ -150,6 +244,50 @@ std::optional<std::string> FlagdProviderConfig::GetCertPath() const {
return cert_path_;
}
int FlagdProviderConfig::GetDeadlineMs() const { return deadline_ms_; }
int FlagdProviderConfig::GetStreamDeadlineMs() const {
return stream_deadline_ms_;
}
int FlagdProviderConfig::GetRetryBackoffMs() const { return retry_backoff_ms_; }
int FlagdProviderConfig::GetRetryBackoffMaxMs() const {
return retry_backoff_max_ms_;
}
int FlagdProviderConfig::GetRetryGracePeriod() const {
return retry_grace_period_;
}
int FlagdProviderConfig::GetKeepAliveTimeMs() const {
return keep_alive_time_ms_;
}
const std::vector<int>& FlagdProviderConfig::GetFatalStatusCodes() const {
return fatal_status_codes_;
}

std::string FlagdProviderConfig::GetServiceConfigJson() const {
const auto names = nlohmann::json::array({
nlohmann::json::object({{"service", "flagd.evaluation.v1.Service"}}),
nlohmann::json::object({{"service", "flagd.sync.v1.FlagSyncService"}}),
});

const auto retry_policy = nlohmann::json::object({
{"maxAttempts", 4},
{"initialBackoff", absl::StrCat(retry_backoff_ms_ / kMsInSecond, "s")},
{"maxBackoff", absl::StrCat(retry_backoff_max_ms_ / kMsInSecond, "s")},
{"backoffMultiplier", 2},
{
"retryableStatusCodes",
nlohmann::json::array({"UNAVAILABLE", "UNKNOWN"}),
},
});

const auto method_config = nlohmann::json::object({
{"name", names},
{"retryPolicy", retry_policy},
});

return nlohmann::json::object(
{{"methodConfig", nlohmann::json::array({method_config})}})
.dump();
}

std::optional<std::string> FlagdProviderConfig::GetSelector() const {
return selector_;
}
Expand All @@ -170,6 +308,10 @@ FlagdProviderConfig& FlagdProviderConfig::SetHost(std::string_view host) {
return *this;
}
FlagdProviderConfig& FlagdProviderConfig::SetPort(int port) {
if (!IsValidPort(port)) {
LOG(WARNING) << "Invalid port: " << port << ". Ignoring.";
return *this;
}
port_ = port;
return *this;
}
Expand All @@ -195,9 +337,81 @@ FlagdProviderConfig& FlagdProviderConfig::SetCertPath(std::string_view path) {
return *this;
}
FlagdProviderConfig& FlagdProviderConfig::SetDeadlineMs(int deadline_ms) {
if (deadline_ms <= 0) {
LOG(WARNING) << "Invalid deadline_ms: " << deadline_ms << ". Ignoring.";
return *this;
}
deadline_ms_ = deadline_ms;
return *this;
}
FlagdProviderConfig& FlagdProviderConfig::SetStreamDeadlineMs(
int stream_deadline_ms) {
if (stream_deadline_ms <= 0) {
LOG(WARNING) << "Invalid stream_deadline_ms: " << stream_deadline_ms
<< ". Ignoring.";
return *this;
}
stream_deadline_ms_ = stream_deadline_ms;
return *this;
}
FlagdProviderConfig& FlagdProviderConfig::SetRetryBackoffMs(
int retry_backoff_ms) {
if (retry_backoff_ms <= 0) {
LOG(WARNING) << "Invalid retry_backoff_ms: " << retry_backoff_ms
<< ". Ignoring.";
return *this;
}
retry_backoff_ms_ = retry_backoff_ms;
return *this;
}
FlagdProviderConfig& FlagdProviderConfig::SetRetryBackoffMaxMs(
int retry_backoff_max_ms) {
if (retry_backoff_max_ms <= 0) {
LOG(WARNING) << "Invalid retry_backoff_max_ms: " << retry_backoff_max_ms
<< ". Ignoring.";
return *this;
}
retry_backoff_max_ms_ = retry_backoff_max_ms;
return *this;
}
FlagdProviderConfig& FlagdProviderConfig::SetRetryGracePeriod(
int retry_grace_period) {
if (retry_grace_period < 0) {
LOG(WARNING) << "Invalid retry_grace_period: " << retry_grace_period
<< ". Ignoring.";
return *this;
}
retry_grace_period_ = retry_grace_period;
return *this;
}
FlagdProviderConfig& FlagdProviderConfig::SetKeepAliveTimeMs(
int keep_alive_time_ms) {
if (keep_alive_time_ms < 0) {
LOG(WARNING) << "Invalid keep_alive_time_ms: " << keep_alive_time_ms
<< ". Ignoring.";
return *this;
}
keep_alive_time_ms_ = keep_alive_time_ms;
return *this;
}
FlagdProviderConfig& FlagdProviderConfig::SetFatalStatusCodes(
const std::vector<int>& fatal_status_codes) {
std::vector<int> valid_codes;
for (int code : fatal_status_codes) {
if (IsValidStatusCode(code)) {
valid_codes.push_back(code);
} else {
LOG(WARNING) << "Invalid gRPC status code: " << code << ". Ignoring.";
}
}
fatal_status_codes_ = std::move(valid_codes);
return *this;
}
FlagdProviderConfig& FlagdProviderConfig::SetFatalStatusCodes(
const std::string& fatal_status_codes_str) {
fatal_status_codes_ = ParseFatalStatusCodes(fatal_status_codes_str);
return *this;
}
FlagdProviderConfig& FlagdProviderConfig::SetSelector(
std::string_view selector) {
selector_ = std::string(selector);
Expand All @@ -215,6 +429,11 @@ FlagdProviderConfig& FlagdProviderConfig::SetOfflineFlagSourcePath(
}
FlagdProviderConfig& FlagdProviderConfig::SetOfflinePollIntervalMs(
int interval_ms) {
if (interval_ms <= 0) {
LOG(WARNING) << "Invalid offline_poll_interval_ms: " << interval_ms
<< ". Ignoring.";
return *this;
}
offline_poll_interval_ms_ = interval_ms;
return *this;
}
Expand Down
Loading
Loading