-
Notifications
You must be signed in to change notification settings - Fork 91
Fix 3459 rate limit #7938
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Fix 3459 rate limit #7938
Changes from all commits
1ad8a0d
5ec49b2
ff0af8f
6002925
9a1a3f6
389bb1c
b2f5f7f
4c961ae
0c2c7a0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| type: Fixed | ||
| description: Fixed problems with larger timeout checks on rate_limit for integrations | ||
| pr: 7938 | ||
| labels: [] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,6 @@ | ||
| import time | ||
| from enum import Enum | ||
| from typing import List | ||
| from typing import List, Optional | ||
|
|
||
| from loguru import logger | ||
|
|
||
|
|
@@ -53,6 +53,8 @@ class RateLimiter: | |
| """ | ||
|
|
||
| EXPIRE_AFTER_PERIOD_SECONDS: int = 500 | ||
| MAX_DEFAULT_TIMEOUT_SECONDS: int = 120 | ||
| MIN_DEFAULT_TIMEOUT_SECONDS: int = 30 | ||
|
|
||
| def build_redis_key(self, current_seconds: int, request: RateLimiterRequest) -> str: | ||
| """ | ||
|
|
@@ -110,20 +112,45 @@ def decrement_usage( | |
| pipe.decrby(redis_key, 1) | ||
| pipe.execute() | ||
|
|
||
| def seconds_until_next_bucket( | ||
| self, current_seconds: int, request: RateLimiterRequest | ||
| ) -> int: | ||
| """ | ||
| Returns the number of seconds until the next time bucket starts for the given request. | ||
| """ | ||
| fixed_time_filter = ( | ||
| int(current_seconds / request.period.factor) * request.period.factor | ||
| ) | ||
| return (fixed_time_filter + request.period.factor) - current_seconds | ||
|
|
||
| def limit( | ||
| self, requests: List[RateLimiterRequest], timeout_seconds: int = 30 | ||
| self, requests: List[RateLimiterRequest], timeout_seconds: Optional[int] = None | ||
| ) -> None: | ||
| """ | ||
| Increments call count for the current time bucket and verifies that it is within the | ||
| rate limit provided. If limit is breached it will decrement the count and try again | ||
| until it can successfully reserve a call, or timeout. Because we rely on optimistic | ||
| locking for many keys at a time, it is possible that concurrent rate limiters could | ||
| make the wrong decision in between increment to decrement operations. | ||
| rate limit provided. If limit is breached it will decrement the count and sleep until | ||
| the current bucket rolls over before retrying. Because we rely on optimistic locking | ||
| for many keys at a time, it is possible that concurrent rate limiters could make the | ||
| wrong decision in between increment to decrement operations. | ||
|
|
||
| If connection to the redis cluster fails then rate limiter will be skipped. | ||
|
|
||
| Expiration is set on any keys which are stored in the cluster | ||
| Expiration is set on any keys which are stored in the cluster. | ||
|
|
||
| timeout_seconds defaults to the longest period factor + 5s (capped at 120s), | ||
| giving the limiter at least one full bucket rollover window before giving up. | ||
| The cap prevents HOUR/DAY limits from blocking a worker for unreasonable | ||
| durations; connectors like SurveyMonkey configure both minute and day limits, | ||
| and a breached day limit should fail fast rather than sleep for 24 hours. | ||
| """ | ||
| if timeout_seconds is None: | ||
| timeout_seconds = min( | ||
| max(r.period.factor for r in requests) + 5 | ||
| if requests | ||
| else self.MIN_DEFAULT_TIMEOUT_SECONDS, | ||
| self.MAX_DEFAULT_TIMEOUT_SECONDS, | ||
| ) | ||
|
|
||
| try: | ||
| redis: FidesopsRedis = get_cache() | ||
| except RedisConnectionError as exc: | ||
|
|
@@ -135,6 +162,7 @@ def limit( | |
| return | ||
|
|
||
| start_time = time.time() | ||
| breached_requests: List[RateLimiterRequest] = [] | ||
| while time.time() - start_time < timeout_seconds: | ||
| current_seconds = int(time.time()) | ||
|
|
||
|
|
@@ -156,7 +184,17 @@ def limit( | |
| self.decrement_usage( | ||
| redis=redis, current_seconds=current_seconds, requests=requests | ||
| ) | ||
| time.sleep(0.1) | ||
| # Sleep until the next bucket boundary for the longest-period breached | ||
| # request. This avoids hammering Redis every 100ms for minute/hour/day | ||
| # limits where the bucket won't roll for a long time. | ||
| sleep_seconds = max( | ||
| self.seconds_until_next_bucket(current_seconds, r) | ||
| for r in breached_requests | ||
| ) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
For practical purposes this is harmless (the 0.05 s buffer absorbs the drift), but `current_seconds` can be slightly stale by the time the sleep is computed; re-reading the clock immediately before the calculation would be more precise: now = int(time.time())
sleep_seconds = max(
self.seconds_until_next_bucket(now, r)
for r in breached_requests
) |
||
| # Add a small buffer to avoid landing exactly on the boundary, but | ||
| # never sleep longer than the remaining timeout. | ||
| remaining = timeout_seconds - (time.time() - start_time) | ||
| time.sleep(min(sleep_seconds + 0.05, max(remaining, 0))) | ||
| else: | ||
| # success | ||
| return | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,11 +3,14 @@ | |
| import unittest.mock as mock | ||
| from collections import Counter | ||
| from concurrent.futures import ThreadPoolExecutor, as_completed | ||
| from datetime import datetime, timedelta | ||
| from typing import Any, Callable, Dict, Generator, List | ||
|
|
||
| import pytest | ||
| from freezegun import freeze_time | ||
| from requests import Session | ||
|
|
||
| from fides.api.common_exceptions import RedisConnectionError | ||
| from fides.api.db import session | ||
| from fides.api.graph.graph import DatasetGraph | ||
| from fides.api.models.connectionconfig import ( | ||
|
|
@@ -218,6 +221,88 @@ def test_limiter_times_out_when_bucket_full() -> None: | |
| time.sleep(0.002) | ||
|
|
||
|
|
||
| @pytest.mark.integration | ||
| def test_minute_period_breach_waits_for_rollover() -> None: | ||
| """A MINUTE-period breach must sleep until the bucket rolls over, not time out. | ||
|
|
||
| Regression: the old default timeout_seconds=30 was shorter than the MINUTE | ||
| bucket period (60s). When a breach occurred more than 30s before the next | ||
| boundary the limiter would raise RateLimiterTimeoutException instead of | ||
| waiting. This affected the Okta client (period=MINUTE) and any SaaS | ||
| connector with a per-minute rate limit (e.g. Zenoti, SurveyMonkey). | ||
|
|
||
| Uses real Redis for bucket state; only mocks time to avoid 60s wall-clock | ||
| waits. | ||
| """ | ||
| # "2024-01-01 00:00:05" is 5s into a minute, so the next bucket is 55s | ||
| # away. The old 30s default would time out before reaching it. | ||
| with freeze_time("2024-01-01 00:00:05") as frozen: | ||
| limiter = RateLimiter() | ||
| key = f"test_minute_rollover_{random.randint(0, 10**12)}" | ||
| request = RateLimiterRequest( | ||
| key=key, rate_limit=1, period=RateLimiterPeriod.MINUTE | ||
| ) | ||
|
|
||
| def advancing_sleep(seconds: float) -> None: | ||
| frozen.tick(timedelta(seconds=seconds)) | ||
|
|
||
| with mock.patch( | ||
| "fides.api.service.connectors.limiter.rate_limiter.time.sleep", | ||
| side_effect=advancing_sleep, | ||
| ): | ||
| limiter.limit(requests=[request]) # fills the single slot | ||
| limiter.limit(requests=[request]) # breach -> sleep to boundary -> succeed | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The test passes by not raising, which is correct, but there is no positive assertion to confirm the limiter actually waited for the bucket rollover. If, say, the limiter returned immediately without sleeping at all, the test would still pass. A small guard like checking that the frozen clock advanced by roughly the expected sleep duration would make the regression protection more robust: limiter.limit(requests=[request])  # fills the single slot
before = time.time()
limiter.limit(requests=[request]) # breach -> sleep to boundary -> succeed
assert time.time() - before >= 50 # slept at least 50 s into the new bucket |
||
|
|
||
| # Confirm the limiter actually slept past the bucket boundary (00:01:00), | ||
| # not just that it didn't raise. | ||
| assert frozen().timestamp() >= datetime(2024, 1, 1, 0, 1, 0).timestamp() | ||
|
|
||
|
|
||
| @pytest.mark.integration | ||
| def test_dynamic_timeout_capped_for_day_limits() -> None: | ||
| """Mixed MINUTE + DAY limits must not block a worker for hours. | ||
|
|
||
| SurveyMonkey configures ``rate: 120/minute`` and ``rate: 500/day``. | ||
| Without a cap the dynamic timeout would be ``86400 + 5 = 86405s`` (~24h), | ||
| leaving a Celery worker sleeping until the next day bucket rolls over. | ||
| The 120s cap ensures the limiter fails fast and surfaces an error instead. | ||
|
|
||
| Uses real Redis for bucket state; only mocks time to avoid real sleeping. | ||
| """ | ||
| with freeze_time("2024-01-01 00:00:05") as frozen: | ||
| limiter = RateLimiter() | ||
| key = f"test_day_cap_{random.randint(0, 10**12)}" | ||
|
|
||
| minute_request = RateLimiterRequest( | ||
| key=f"{key}:min", rate_limit=1, period=RateLimiterPeriod.MINUTE | ||
| ) | ||
| day_request = RateLimiterRequest( | ||
| key=f"{key}:day", rate_limit=1, period=RateLimiterPeriod.DAY | ||
| ) | ||
| both = [minute_request, day_request] | ||
|
|
||
| sleep_total = [0.0] | ||
|
|
||
| def advancing_sleep(seconds: float) -> None: | ||
| sleep_total[0] += seconds | ||
| frozen.tick(timedelta(seconds=seconds)) | ||
|
|
||
| with mock.patch( | ||
| "fides.api.service.connectors.limiter.rate_limiter.time.sleep", | ||
| side_effect=advancing_sleep, | ||
| ): | ||
| # Fill both buckets. | ||
| limiter.limit(requests=both) | ||
|
|
||
| # Next call breaches both. Should timeout, not sleep for 24h. | ||
| with pytest.raises(RateLimiterTimeoutException): | ||
| limiter.limit(requests=both) | ||
|
|
||
| # Total mocked sleep must reflect the 120s cap, not the 86405s | ||
| # uncapped value. | ||
| assert 110 <= sleep_total[0] < 130 # should be ~120 s, not 86400 s | ||
|
|
||
|
|
||
| @pytest.mark.integration_saas | ||
| @pytest.mark.asyncio | ||
| async def test_rate_limiter_full_integration( | ||
|
|
@@ -273,3 +358,98 @@ def wrapper(self, *args, **kwargs): | |
|
|
||
| wrapper.call_log = call_log | ||
| return wrapper | ||
|
|
||
|
|
||
| class TestRateLimiterRedisFailure: | ||
| """Unit tests for RateLimiter.limit() when Redis is unavailable.""" | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks! |
||
|
|
||
| def test_redis_connection_error_is_silently_skipped(self) -> None: | ||
| with mock.patch( | ||
| "fides.api.service.connectors.limiter.rate_limiter.get_cache", | ||
| side_effect=RedisConnectionError("Redis unavailable"), | ||
| ): | ||
| # Should not raise — limiter is a no-op when Redis is down. | ||
| RateLimiter().limit( | ||
| requests=[ | ||
| RateLimiterRequest( | ||
| key="k", rate_limit=1, period=RateLimiterPeriod.SECOND | ||
| ) | ||
| ] | ||
| ) | ||
|
|
||
|
|
||
| class TestSecondsUntilNextBucket: | ||
| """Unit tests for RateLimiter.seconds_until_next_bucket.""" | ||
|
|
||
| def _req(self, period: RateLimiterPeriod) -> RateLimiterRequest: | ||
| return RateLimiterRequest(key="k", rate_limit=10, period=period) | ||
|
|
||
| def test_second_at_boundary(self) -> None: | ||
| # At the start of a second (e.g. t=1000), 1s remains. | ||
| assert ( | ||
| RateLimiter().seconds_until_next_bucket( | ||
| 1000, self._req(RateLimiterPeriod.SECOND) | ||
| ) | ||
| == 1 | ||
| ) | ||
|
|
||
| def test_day_at_start(self) -> None: | ||
| # At the exact start of a day (t=86400), 86400s remain. | ||
| assert ( | ||
| RateLimiter().seconds_until_next_bucket( | ||
| 86400, self._req(RateLimiterPeriod.DAY) | ||
| ) | ||
| == 86400 | ||
| ) | ||
|
|
||
| def test_day_mid(self) -> None: | ||
| # 12 hours into a day → 12 hours remain. | ||
| assert ( | ||
| RateLimiter().seconds_until_next_bucket( | ||
| 86400 + 43200, self._req(RateLimiterPeriod.DAY) | ||
| ) | ||
| == 43200 | ||
| ) | ||
|
|
||
| def test_minute_at_start(self) -> None: | ||
| # At the exact start of a minute (e.g. t=600), 60s remain. | ||
| assert ( | ||
| RateLimiter().seconds_until_next_bucket( | ||
| 600, self._req(RateLimiterPeriod.MINUTE) | ||
| ) | ||
| == 60 | ||
| ) | ||
|
|
||
| def test_minute_30s_in(self) -> None: | ||
| # 30 seconds into a minute → 30s remain. | ||
| assert ( | ||
| RateLimiter().seconds_until_next_bucket( | ||
| 630, self._req(RateLimiterPeriod.MINUTE) | ||
| ) | ||
| == 30 | ||
| ) | ||
|
|
||
| def test_minute_59s_in(self) -> None: | ||
| # 59 seconds into a minute → 1s remains. | ||
| assert ( | ||
| RateLimiter().seconds_until_next_bucket( | ||
| 659, self._req(RateLimiterPeriod.MINUTE) | ||
| ) | ||
| == 1 | ||
| ) | ||
|
|
||
| def test_hour_at_start(self) -> None: | ||
| assert ( | ||
| RateLimiter().seconds_until_next_bucket( | ||
| 3600, self._req(RateLimiterPeriod.HOUR) | ||
| ) | ||
| == 3600 | ||
| ) | ||
|
|
||
| def test_hour_mid(self) -> None: | ||
| assert ( | ||
| RateLimiter().seconds_until_next_bucket( | ||
| 5400, self._req(RateLimiterPeriod.HOUR) | ||
| ) | ||
| == 1800 | ||
| ) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
src/fides/api/service/connectors/limiter/rate_limiter.py:146-152 — The dynamic timeout logic is sound for MINUTE+ periods, but for
SECOND-period requests it silently reduces the timeout from the old hardcoded 30 s to `min(1 + 5, 120) = 6 s`. Any SaaS connector that configures `period: second` under sustained load will now hit `RateLimiterTimeoutException` five times faster than before. The Okta and SaaS
`authenticated_client` callers pass no explicit `timeout_seconds`, so they'll pick up this new default. Worth validating that no active SaaS connector YAML relies on the old 30 s behaviour for second-period limits (or documenting the intentional change).