Skip to content

Commit d05fccf

Browse files
chore: Support flag change listeners in contract tests (#410)
1 parent 60272b2 commit d05fccf

File tree

5 files changed

+122
-11
lines changed

5 files changed

+122
-11
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ jobs:
6969
with:
7070
test_service_port: 9000
7171
token: ${{ secrets.GITHUB_TOKEN }}
72-
version: v3.0.0-alpha.3
72+
version: v3.0.0-alpha.4
7373
enable_persistence_tests: "true"
7474

7575
windows:

contract-tests/client_entity.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import requests
88
from big_segment_store_fixture import BigSegmentStoreFixture
9+
from flag_change_listener import ListenerRegistry
910
from hook import PostingHook
1011

1112
from ldclient import *
@@ -158,6 +159,7 @@ def __init__(self, tag, config):
158159
config = Config(**opts)
159160

160161
self.client = client.LDClient(config, start_wait / 1000.0)
162+
self.listeners = ListenerRegistry(self.client.flag_tracker)
161163

162164
def is_initializing(self) -> bool:
163165
return self.client.is_initialized()
@@ -282,7 +284,25 @@ def fn(payload) -> Result:
282284
result = migrator.write(params["key"], Context.from_dict(params["context"]), Stage.from_str(params["defaultStage"]), params["payload"])
283285
return {"result": result.authoritative.value if result.authoritative.is_success() else result.authoritative.error}
284286

287+
def register_flag_change_listener(self, params: dict):
288+
self.listeners.register_flag_change_listener(
289+
listener_id=params['listenerId'],
290+
callback_uri=params['callbackUri'],
291+
)
292+
293+
def register_flag_value_change_listener(self, params: dict):
294+
self.listeners.register_flag_value_change_listener(
295+
listener_id=params["listenerId"],
296+
flag_key=params["flagKey"],
297+
context=Context.from_dict(params["context"]),
298+
callback_uri=params["callbackUri"],
299+
)
300+
301+
def unregister_listener(self, params: dict) -> bool:
302+
return self.listeners.unregister(params['listenerId'])
303+
285304
def close(self):
305+
self.listeners.close_all()
286306
self.client.close()
287307
self.log.info('Test ended')
288308

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import logging
2+
import threading
3+
from typing import Callable, Dict
4+
5+
import requests
6+
7+
from ldclient.context import Context
8+
from ldclient.interfaces import FlagChange, FlagTracker, FlagValueChange
9+
10+
log = logging.getLogger('testservice')
11+
12+
13+
class ListenerRegistry:
14+
"""Manages all active flag change listener registrations for a single SDK client entity."""
15+
16+
def __init__(self, tracker: FlagTracker):
17+
self._tracker = tracker
18+
self._lock = threading.Lock()
19+
# Maps listener_id -> (sdk_listener callable, cleanup function)
20+
self._listeners: Dict[str, Callable] = {}
21+
22+
def register_flag_change_listener(self, listener_id: str, callback_uri: str):
23+
"""Register a general flag change listener that fires on any flag configuration change."""
24+
def on_flag_change(flag_change: FlagChange):
25+
payload = {
26+
'listenerId': listener_id,
27+
'flagKey': flag_change.key,
28+
}
29+
try:
30+
requests.post(callback_uri, json=payload)
31+
except Exception as e:
32+
log.warning('Failed to post flag change notification: %s', e)
33+
34+
with self._lock:
35+
# If a listener with this ID already exists, unregister the old one first
36+
if listener_id in self._listeners:
37+
self._tracker.remove_listener(self._listeners[listener_id])
38+
39+
self._tracker.add_listener(on_flag_change)
40+
self._listeners[listener_id] = on_flag_change
41+
42+
def register_flag_value_change_listener(
43+
self,
44+
listener_id: str,
45+
flag_key: str,
46+
context: Context,
47+
callback_uri: str,
48+
):
49+
"""Register a flag value change listener that fires when the evaluated value changes."""
50+
def on_value_change(change: FlagValueChange):
51+
payload = {
52+
'listenerId': listener_id,
53+
'flagKey': change.key,
54+
'oldValue': change.old_value,
55+
'newValue': change.new_value,
56+
}
57+
try:
58+
requests.post(callback_uri, json=payload)
59+
except Exception as e:
60+
log.warning('Failed to post flag value change notification: %s', e)
61+
62+
# add_flag_value_change_listener returns the underlying listener
63+
# that must be passed to remove_listener to unsubscribe
64+
with self._lock:
65+
if listener_id in self._listeners:
66+
self._tracker.remove_listener(self._listeners[listener_id])
67+
68+
underlying_listener = self._tracker.add_flag_value_change_listener(flag_key, context, on_value_change)
69+
self._listeners[listener_id] = underlying_listener
70+
71+
def unregister(self, listener_id: str) -> bool:
72+
"""Unregister a previously registered listener. Returns False if not found."""
73+
with self._lock:
74+
listener = self._listeners.pop(listener_id, None)
75+
if listener is None:
76+
return False
77+
78+
self._tracker.remove_listener(listener)
79+
return True
80+
81+
def close_all(self):
82+
"""Unregister all listeners. Called when the SDK client entity shuts down."""
83+
with self._lock:
84+
for listener in self._listeners.values():
85+
self._tracker.remove_listener(listener)
86+
self._listeners.clear()

contract-tests/service.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ def status():
8282
'persistent-data-store-redis',
8383
'persistent-data-store-dynamodb',
8484
'persistent-data-store-consul',
85+
'flag-change-listeners',
86+
'flag-value-change-listeners',
8587
]
8688
}
8789
return json.dumps(body), 200, {'Content-type': 'application/json'}
@@ -150,6 +152,13 @@ def post_client_command(id):
150152
response = client.migration_variation(sub_params)
151153
elif command == "migrationOperation":
152154
response = client.migration_operation(sub_params)
155+
elif command == "registerFlagChangeListener":
156+
client.register_flag_change_listener(sub_params)
157+
elif command == "registerFlagValueChangeListener":
158+
client.register_flag_value_change_listener(sub_params)
159+
elif command == "unregisterListener":
160+
if not client.unregister_listener(sub_params):
161+
return 'no listener with id "%s"' % sub_params['listenerId'], 400
153162
else:
154163
return '', 400
155164

ldclient/testing/impl/datasystem/test_fdv2_datasystem.py

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -266,13 +266,10 @@ def test_fdv2_falls_back_to_fdv1_on_polling_success_with_header():
266266

267267
changed = Event()
268268
changes: List[FlagChange] = []
269-
count = 0
270269

271270
def listener(flag_change: FlagChange):
272-
nonlocal count
273-
count += 1
274271
changes.append(flag_change)
275-
if count >= 2:
272+
if flag_change.key == "fdv1-update-flag":
276273
changed.set()
277274

278275
set_on_ready = Event()
@@ -282,13 +279,12 @@ def listener(flag_change: FlagChange):
282279

283280
assert set_on_ready.wait(1), "Data system did not become ready in time"
284281

285-
# Trigger a flag update in FDv1
286-
td_fdv1.update(td_fdv1.flag("fdv1-fallback-flag").on(False))
287-
assert changed.wait(1), "Flag change listener was not called in time"
282+
# Update a different flag than the one in initial data to verify FDv1 is
283+
# actively processing updates (not just init)
284+
td_fdv1.update(td_fdv1.flag("fdv1-update-flag").on(True))
285+
assert changed.wait(2), "Flag change listener was not called in time"
288286

289-
# Verify FDv1 is active
290-
assert len(changes) > 0
291-
assert any(c.key == "fdv1-fallback-flag" for c in changes)
287+
assert any(c.key == "fdv1-update-flag" for c in changes)
292288

293289

294290
def test_fdv2_falls_back_to_fdv1_with_initializer():

0 commit comments

Comments
 (0)