diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index e0b1059b..2c3ba795 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -62,13 +62,15 @@ jobs:
           test_service_port: 9000
           token: ${{ secrets.GITHUB_TOKEN }}
           stop_service: 'false'
+          enable_persistence_tests: 'true'
 
       - name: Run contract tests v3
         uses: launchdarkly/gh-actions/actions/contract-tests@contract-tests-v1
         with:
           test_service_port: 9000
           token: ${{ secrets.GITHUB_TOKEN }}
-          version: v3.0.0-alpha.1
+          version: v3.0.0-alpha.2
+          enable_persistence_tests: 'true'
 
   windows:
     runs-on: windows-latest
diff --git a/contract-tests/client_entity.py b/contract-tests/client_entity.py
index f68f7488..55415d48 100644
--- a/contract-tests/client_entity.py
+++ b/contract-tests/client_entity.py
@@ -1,5 +1,7 @@
 import json
 import logging
+import sys
+from urllib.parse import urlparse
 
 import requests
 from big_segment_store_fixture import BigSegmentStoreFixture
@@ -21,7 +23,10 @@
     polling_ds_builder,
     streaming_ds_builder
 )
+from ldclient.feature_store import CacheConfig
 from ldclient.impl.datasourcev2.polling import PollingDataSourceBuilder
+from ldclient.integrations import Consul, DynamoDB, Redis
+from ldclient.interfaces import DataStoreMode
 
 
 class ClientEntity:
@@ -102,6 +107,19 @@ def __init__(self, tag, config):
             if datasystem_config.get("payloadFilter") is not None:
                 opts["payload_filter_key"] = datasystem_config["payloadFilter"]
 
+            # Handle persistent data store configuration for dataSystem
+            store_config = datasystem_config.get("store")
+            if store_config is not None:
+                persistent_store_config = store_config.get("persistentDataStore")
+                if persistent_store_config is not None:
+                    store = _create_persistent_store(persistent_store_config)
+
+                    # Parse store mode (0 = READ_ONLY, 1 = READ_WRITE)
+                    store_mode_value = datasystem_config.get("storeMode", 0)
+                    store_mode = DataStoreMode.READ_WRITE if store_mode_value == 1 else DataStoreMode.READ_ONLY
+
+                    datasystem.data_store(store, store_mode)
+
             opts["datasystem_config"] = datasystem.build()
 
         elif config.get("streaming") is not None:
@@ -111,7 +129,7 @@ def __init__(self, tag, config):
             if streaming.get("filter") is not None:
                 opts["payload_filter_key"] = streaming["filter"]
             _set_optional_time_prop(streaming, "initialRetryDelayMs", opts, "initial_reconnect_delay")
-        else:
+        elif config.get("polling") is not None:
             opts['stream'] = False
             polling = config["polling"]
             if polling.get("baseUri") is not None:
@@ -119,6 +137,8 @@ def __init__(self, tag, config):
             if polling.get("filter") is not None:
                 opts["payload_filter_key"] = polling["filter"]
             _set_optional_time_prop(polling, "pollIntervalMs", opts, "poll_interval")
+        else:
+            opts['use_ldd'] = True
 
         if config.get("events") is not None:
             events = config["events"]
@@ -148,6 +168,9 @@ def __init__(self, tag, config):
             _set_optional_time_prop(big_params, "staleAfterMs", big_config, "stale_after")
             opts["big_segments"] = BigSegmentsConfig(**big_config)
 
+        if config.get("persistentDataStore") is not None:
+            opts["feature_store"] = _create_persistent_store(config["persistentDataStore"])
+
         start_wait = config.get("startWaitTimeMs") or 5000
         config = Config(**opts)
 
@@ -285,3 +308,72 @@ def _set_optional_time_prop(params_in: dict, name_in: str, params_out: dict, nam
     if params_in.get(name_in) is not None:
         params_out[name_out] = params_in[name_in] / 1000.0
     return None
+
+
+def _create_persistent_store(persistent_store_config: dict):
+    """
+    Creates a persistent store instance based on the configuration.
+    Used for both v2 and v3 (dataSystem) configurations.
+    """
+    store_params = persistent_store_config["store"]
+    store_type = store_params["type"]
+    dsn = store_params["dsn"]
+    prefix = store_params.get("prefix")
+
+    # Parse cache configuration
+    cache_config = persistent_store_config.get("cache", {})
+    cache_mode = cache_config.get("mode", "ttl")
+
+    if cache_mode == "off":
+        caching = CacheConfig.disabled()
+    elif cache_mode == "infinite":
+        caching = CacheConfig(expiration=sys.maxsize)
+    elif cache_mode == "ttl":
+        ttl_seconds = cache_config.get("ttl", 15)
+        caching = CacheConfig(expiration=ttl_seconds)
+    else:
+        caching = CacheConfig.default()
+
+    # Create the appropriate store based on type
+    if store_type == "redis":
+        return Redis.new_feature_store(
+            url=dsn,
+            prefix=prefix or Redis.DEFAULT_PREFIX,
+            caching=caching
+        )
+    elif store_type == "dynamodb":
+        # Parse endpoint from DSN (handle URLs without scheme)
+        parsed = urlparse(dsn) if '://' in dsn else urlparse(f'http://{dsn}')
+        endpoint_url = f"{parsed.scheme}://{parsed.netloc}"
+
+        # Import boto3 for DynamoDB configuration
+        import boto3
+
+        # Create DynamoDB client with test credentials
+        dynamodb_opts = {
+            'endpoint_url': endpoint_url,
+            'region_name': 'us-east-1',
+            'aws_access_key_id': 'dummy',
+            'aws_secret_access_key': 'dummy'
+        }
+
+        return DynamoDB.new_feature_store(
+            table_name="sdk-contract-tests",
+            prefix=prefix,
+            dynamodb_opts=dynamodb_opts,
+            caching=caching
+        )
+    elif store_type == "consul":
+        # Parse host and port from DSN
+        parsed = urlparse(dsn) if '://' in dsn else urlparse(f'http://{dsn}')
+        host = parsed.hostname or 'localhost'
+        port = parsed.port or 8500
+
+        return Consul.new_feature_store(
+            host=host,
+            port=port,
+            prefix=prefix,
+            caching=caching
+        )
+    else:
+        raise ValueError(f"Unsupported data store type: {store_type}")
diff --git a/contract-tests/service.py b/contract-tests/service.py
index 8e9f1c26..699dec07 100644
--- a/contract-tests/service.py
+++ b/contract-tests/service.py
@@ -79,6 +79,9 @@ def status():
             'evaluation-hooks',
             'omit-anonymous-contexts',
             'client-prereq-events',
+            'persistent-data-store-redis',
+            'persistent-data-store-dynamodb',
+            'persistent-data-store-consul',
         ]
     }
     return json.dumps(body), 200, {'Content-type': 'application/json'}
diff --git a/ldclient/client.py b/ldclient/client.py
index 01007610..829575d4 100644
--- a/ldclient/client.py
+++ b/ldclient/client.py
@@ -561,8 +561,8 @@ def _evaluate_internal(self, key: str, context: Context, default: Any, event_fac
         if self._config.offline:
             return EvaluationDetail(default, None, error_reason('CLIENT_NOT_READY')), None
 
-        if not self.is_initialized():
-            if self._data_system.store.initialized:
+        if self._data_system.data_availability != DataAvailability.REFRESHED:
+            if self._data_system.data_availability == DataAvailability.CACHED:
                 log.warning("Feature Flag evaluation attempted before client has initialized - using last known values from feature store for feature key: " + key)
             else:
                 log.warning("Feature Flag evaluation attempted before client has initialized! Feature store unavailable - returning default: " + str(default) + " for feature key: " + key)
@@ -632,8 +632,8 @@ def all_flags_state(self, context: Context, **kwargs) -> FeatureFlagsState:
             log.warning("all_flags_state() called, but client is in offline mode. Returning empty state")
             return FeatureFlagsState(False)
 
-        if not self.is_initialized():
-            if self._data_system.store.initialized:
+        if self._data_system.data_availability != DataAvailability.REFRESHED:
+            if self._data_system.data_availability == DataAvailability.CACHED:
                 log.warning("all_flags_state() called before client has finished initializing! Using last known values from feature store")
             else:
                 log.warning("all_flags_state() called before client has finished initializing! Feature store unavailable - returning empty state")
diff --git a/ldclient/impl/integrations/redis/redis_feature_store.py b/ldclient/impl/integrations/redis/redis_feature_store.py
index 544628b5..95a95d14 100644
--- a/ldclient/impl/integrations/redis/redis_feature_store.py
+++ b/ldclient/impl/integrations/redis/redis_feature_store.py
@@ -2,6 +2,7 @@
 from typing import Any, Dict
 
 from ldclient import log
+from ldclient.feature_store_helpers import CachingStoreWrapper
 from ldclient.impl.util import redact_password
 from ldclient.interfaces import DiagnosticDescription, FeatureStoreCore
 from ldclient.versioned_data_kind import FEATURES
@@ -20,6 +21,7 @@ def __init__(self, url, prefix, redis_opts: Dict[str, Any]):
         if not have_redis:
             raise NotImplementedError("Cannot use Redis feature store because redis package is not installed")
         self._prefix = prefix or 'launchdarkly'
+        self._init_key = "{0}:{1}".format(self._prefix, CachingStoreWrapper.__INITED_CACHE_KEY__)
         self._pool = redis.ConnectionPool.from_url(url=url, **redis_opts)
         self.test_update_hook = None  # exposed for testing
         log.info("Started RedisFeatureStore connected to URL: " + redact_password(url) + " using prefix: " + self._prefix)
@@ -46,6 +48,8 @@ def init_internal(self, all_data):
                 item_json = json.dumps(item)
                 pipe.hset(base_key, key, item_json)
             all_count = all_count + len(items)
+
+        pipe.set(self._init_key, self._init_key)
         pipe.execute()
         log.info("Initialized RedisFeatureStore with %d items", all_count)
 
@@ -109,7 +113,7 @@ def upsert_internal(self, kind, item):
 
     def initialized_internal(self):
         r = redis.Redis(connection_pool=self._pool)
-        return r.exists(self._items_key(FEATURES))
+        return r.exists(self._init_key)
 
     def describe_configuration(self, config):
         return 'Redis'