4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
@@ -62,13 +62,15 @@ jobs:
test_service_port: 9000
token: ${{ secrets.GITHUB_TOKEN }}
stop_service: 'false'
enable_persistence_tests: 'true'

- name: Run contract tests v3
uses: launchdarkly/gh-actions/actions/contract-tests@contract-tests-v1
with:
test_service_port: 9000
token: ${{ secrets.GITHUB_TOKEN }}
version: v3.0.0-alpha.1
version: v3.0.0-alpha.2
enable_persistence_tests: 'true'

windows:
runs-on: windows-latest
94 changes: 93 additions & 1 deletion contract-tests/client_entity.py
@@ -1,5 +1,7 @@
import json
import logging
import sys
from urllib.parse import urlparse

import requests
from big_segment_store_fixture import BigSegmentStoreFixture
@@ -21,7 +23,10 @@
polling_ds_builder,
streaming_ds_builder
)
from ldclient.feature_store import CacheConfig
from ldclient.impl.datasourcev2.polling import PollingDataSourceBuilder
from ldclient.integrations import Consul, DynamoDB, Redis
from ldclient.interfaces import DataStoreMode


class ClientEntity:
@@ -102,6 +107,19 @@ def __init__(self, tag, config):
if datasystem_config.get("payloadFilter") is not None:
opts["payload_filter_key"] = datasystem_config["payloadFilter"]

# Handle persistent data store configuration for dataSystem
store_config = datasystem_config.get("store")
if store_config is not None:
persistent_store_config = store_config.get("persistentDataStore")
if persistent_store_config is not None:
store = _create_persistent_store(persistent_store_config)

# Parse store mode (0 = READ_ONLY, 1 = READ_WRITE)
store_mode_value = datasystem_config.get("storeMode", 0)
store_mode = DataStoreMode.READ_WRITE if store_mode_value == 1 else DataStoreMode.READ_ONLY

datasystem.data_store(store, store_mode)

opts["datasystem_config"] = datasystem.build()

elif config.get("streaming") is not None:
@@ -111,14 +129,16 @@
if streaming.get("filter") is not None:
opts["payload_filter_key"] = streaming["filter"]
_set_optional_time_prop(streaming, "initialRetryDelayMs", opts, "initial_reconnect_delay")
else:
elif config.get("polling") is not None:
opts['stream'] = False
polling = config["polling"]
if polling.get("baseUri") is not None:
opts["base_uri"] = polling["baseUri"]
if polling.get("filter") is not None:
opts["payload_filter_key"] = polling["filter"]
_set_optional_time_prop(polling, "pollIntervalMs", opts, "poll_interval")
else:
opts['use_ldd'] = True

if config.get("events") is not None:
events = config["events"]
@@ -148,6 +168,9 @@ def __init__(self, tag, config):
_set_optional_time_prop(big_params, "staleAfterMs", big_config, "stale_after")
opts["big_segments"] = BigSegmentsConfig(**big_config)

if config.get("persistentDataStore") is not None:
opts["feature_store"] = _create_persistent_store(config["persistentDataStore"])

start_wait = config.get("startWaitTimeMs") or 5000
config = Config(**opts)

@@ -285,3 +308,72 @@ def _set_optional_time_prop(params_in: dict, name_in: str, params_out: dict, nam
if params_in.get(name_in) is not None:
params_out[name_out] = params_in[name_in] / 1000.0
return None


def _create_persistent_store(persistent_store_config: dict):
"""
Creates a persistent store instance based on the configuration.
Used for both v2 and v3 (dataSystem) configurations.
"""
store_params = persistent_store_config["store"]
store_type = store_params["type"]
dsn = store_params["dsn"]
prefix = store_params.get("prefix")

# Parse cache configuration
cache_config = persistent_store_config.get("cache", {})
cache_mode = cache_config.get("mode", "ttl")

if cache_mode == "off":
caching = CacheConfig.disabled()
elif cache_mode == "infinite":
caching = CacheConfig(expiration=sys.maxsize)
elif cache_mode == "ttl":
ttl_seconds = cache_config.get("ttl", 15)
caching = CacheConfig(expiration=ttl_seconds)
else:
caching = CacheConfig.default()

# Create the appropriate store based on type
if store_type == "redis":
return Redis.new_feature_store(
url=dsn,
prefix=prefix or Redis.DEFAULT_PREFIX,
caching=caching
)
elif store_type == "dynamodb":
# Parse endpoint from DSN (handle URLs without scheme)
parsed = urlparse(dsn) if '://' in dsn else urlparse(f'http://{dsn}')
endpoint_url = f"{parsed.scheme}://{parsed.netloc}"

# Import boto3 for DynamoDB configuration
import boto3

# Create DynamoDB client with test credentials
dynamodb_opts = {
'endpoint_url': endpoint_url,
'region_name': 'us-east-1',
'aws_access_key_id': 'dummy',
'aws_secret_access_key': 'dummy'
}

return DynamoDB.new_feature_store(
table_name="sdk-contract-tests",
prefix=prefix,
dynamodb_opts=dynamodb_opts,
caching=caching
)
elif store_type == "consul":
# Parse host and port from DSN
parsed = urlparse(dsn) if '://' in dsn else urlparse(f'http://{dsn}')
host = parsed.hostname or 'localhost'
port = parsed.port or 8500

return Consul.new_feature_store(
host=host,
port=port,
prefix=prefix,
caching=caching
)
else:
raise ValueError(f"Unsupported data store type: {store_type}")
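
For reference, a minimal sketch of the configuration shape this helper consumes, inferred from the key lookups above (`store.type`, `store.dsn`, `store.prefix`, `cache.mode`, `cache.ttl`); the DSN, prefix, and TTL values below are hypothetical. In the dataSystem path the same block arrives nested under `store.persistentDataStore`, with the separate `storeMode` field (0 = READ_ONLY, 1 = READ_WRITE) handled in `__init__` above.

```python
# Hypothetical contract-test configuration; key names mirror the lookups in
# _create_persistent_store, while the concrete values are placeholders.
persistent_store_config = {
    "store": {
        "type": "redis",                  # "redis", "dynamodb", or "consul"
        "dsn": "redis://localhost:6379",  # hypothetical DSN
        "prefix": "contract-tests"        # optional
    },
    "cache": {
        "mode": "ttl",                    # "off", "infinite", or "ttl"
        "ttl": 30                         # seconds; only read when mode == "ttl"
    }
}

feature_store = _create_persistent_store(persistent_store_config)
```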
3 changes: 3 additions & 0 deletions contract-tests/service.py
@@ -79,6 +79,9 @@ def status():
'evaluation-hooks',
'omit-anonymous-contexts',
'client-prereq-events',
'persistent-data-store-redis',
'persistent-data-store-dynamodb',
'persistent-data-store-consul',
]
}
return json.dumps(body), 200, {'Content-type': 'application/json'}
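
A hedged check of the newly advertised capabilities, for illustration only: the route that `status()` is bound to and the field wrapping this list are not visible in this hunk, so the URL and the `capabilities` key below are assumptions; port 9000 matches `test_service_port` in ci.yml.

```python
# Assumed route and response shape; only the capability strings themselves
# come from the diff above.
import requests

body = requests.get("http://localhost:9000/").json()
for capability in ("persistent-data-store-redis",
                   "persistent-data-store-dynamodb",
                   "persistent-data-store-consul"):
    assert capability in body["capabilities"]
```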
8 changes: 4 additions & 4 deletions ldclient/client.py
@@ -561,8 +561,8 @@ def _evaluate_internal(self, key: str, context: Context, default: Any, event_fac
if self._config.offline:
return EvaluationDetail(default, None, error_reason('CLIENT_NOT_READY')), None

if not self.is_initialized():
if self._data_system.store.initialized:
if self._data_system.data_availability != DataAvailability.REFRESHED:
if self._data_system.data_availability == DataAvailability.CACHED:
log.warning("Feature Flag evaluation attempted before client has initialized - using last known values from feature store for feature key: " + key)
else:
log.warning("Feature Flag evaluation attempted before client has initialized! Feature store unavailable - returning default: " + str(default) + " for feature key: " + key)
@@ -632,8 +632,8 @@ def all_flags_state(self, context: Context, **kwargs) -> FeatureFlagsState:
log.warning("all_flags_state() called, but client is in offline mode. Returning empty state")
return FeatureFlagsState(False)

if not self.is_initialized():
if self._data_system.store.initialized:
if self._data_system.data_availability != DataAvailability.REFRESHED:
if self._data_system.data_availability == DataAvailability.CACHED:
log.warning("all_flags_state() called before client has finished initializing! Using last known values from feature store")
else:
log.warning("all_flags_state() called before client has finished initializing! Feature store unavailable - returning empty state")
6 changes: 5 additions & 1 deletion ldclient/impl/integrations/redis/redis_feature_store.py
@@ -2,6 +2,7 @@
from typing import Any, Dict

from ldclient import log
from ldclient.feature_store_helpers import CachingStoreWrapper
from ldclient.impl.util import redact_password
from ldclient.interfaces import DiagnosticDescription, FeatureStoreCore
from ldclient.versioned_data_kind import FEATURES
@@ -20,6 +21,7 @@ def __init__(self, url, prefix, redis_opts: Dict[str, Any]):
if not have_redis:
raise NotImplementedError("Cannot use Redis feature store because redis package is not installed")
self._prefix = prefix or 'launchdarkly'
self._init_key = "{0}:{1}".format(self._prefix, CachingStoreWrapper.__INITED_CACHE_KEY__)
self._pool = redis.ConnectionPool.from_url(url=url, **redis_opts)
self.test_update_hook = None # exposed for testing
log.info("Started RedisFeatureStore connected to URL: " + redact_password(url) + " using prefix: " + self._prefix)
@@ -46,6 +48,8 @@ def init_internal(self, all_data):
item_json = json.dumps(item)
pipe.hset(base_key, key, item_json)
all_count = all_count + len(items)

pipe.set(self._init_key, self._init_key)
pipe.execute()
log.info("Initialized RedisFeatureStore with %d items", all_count)

@@ -109,7 +113,7 @@ def upsert_internal(self, kind, item):

def initialized_internal(self):
r = redis.Redis(connection_pool=self._pool)
return r.exists(self._items_key(FEATURES))
return r.exists(self._init_key)

def describe_configuration(self, config):
return 'Redis'
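
The switch from `r.exists(self._items_key(FEATURES))` to a dedicated marker key changes what "initialized" means for the Redis store: the old check depended on the features hash existing, which never gets created when an init payload contains zero flags (no HSET runs), whereas the marker key is written unconditionally in `init_internal`. A minimal sketch, assuming a local Redis and the default prefix:

```python
# Inspect the marker key the store now relies on. The Redis connection details
# are assumptions; the key is built exactly as in the constructor above.
from ldclient.feature_store_helpers import CachingStoreWrapper
import redis

r = redis.Redis(host="localhost", port=6379)
prefix = "launchdarkly"  # default prefix from the constructor
init_key = "{0}:{1}".format(prefix, CachingStoreWrapper.__INITED_CACHE_KEY__)

print(bool(r.exists(init_key)))  # True once init_internal has run at least once
```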