Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog/7938-fix-rate-limits.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
type: Fixed
description: Fixed problems with larger timeout checks on rate_limit for integrations
pr: 7938
labels: []
54 changes: 46 additions & 8 deletions src/fides/api/service/connectors/limiter/rate_limiter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import time
from enum import Enum
from typing import List
from typing import List, Optional

from loguru import logger

Expand Down Expand Up @@ -53,6 +53,8 @@ class RateLimiter:
"""

EXPIRE_AFTER_PERIOD_SECONDS: int = 500
MAX_DEFAULT_TIMEOUT_SECONDS: int = 120
MIN_DEFAULT_TIMEOUT_SECONDS: int = 30

def build_redis_key(self, current_seconds: int, request: RateLimiterRequest) -> str:
"""
Expand Down Expand Up @@ -110,20 +112,45 @@ def decrement_usage(
pipe.decrby(redis_key, 1)
pipe.execute()

def seconds_until_next_bucket(
    self, current_seconds: int, request: RateLimiterRequest
) -> int:
    """
    Return the number of seconds until the next time bucket starts for
    the given request.

    Buckets are fixed windows aligned to multiples of the period factor
    (e.g. 60 for MINUTE, 86400 for DAY). At an exact bucket boundary a
    full period is returned, matching the fixed-window alignment used by
    ``build_redis_key``.

    :param current_seconds: current epoch time in whole seconds
    :param request: rate limit request whose ``period.factor`` defines
        the bucket width in seconds
    """
    # Pure integer modular arithmetic: equivalent to flooring to the
    # bucket start and adding one period, but exact for arbitrarily
    # large timestamps. The original int(x / y) float division loses
    # precision past 2**53.
    period_seconds = request.period.factor
    return period_seconds - (current_seconds % period_seconds)

def limit(
self, requests: List[RateLimiterRequest], timeout_seconds: int = 30
self, requests: List[RateLimiterRequest], timeout_seconds: Optional[int] = None
) -> None:
"""
Increments call count for the current time bucket and verifies that it is within the
rate limit provided. If limit is breached it will decrement the count and try again
until it can successfully reserve a call, or timeout. Because we rely on optimistic
locking for many keys at a time, it is possible that concurrent rate limiters could
make the wrong decision in between increment to decrement operations.
rate limit provided. If limit is breached it will decrement the count and sleep until
the current bucket rolls over before retrying. Because we rely on optimistic locking
for many keys at a time, it is possible that concurrent rate limiters could make the
wrong decision in between increment to decrement operations.

If connection to the redis cluster fails then rate limiter will be skipped.

Expiration is set on any keys which are stored in the cluster
Expiration is set on any keys which are stored in the cluster.

timeout_seconds defaults to the longest period factor + 5s (capped at 120s),
giving the limiter at least one full bucket rollover window before giving up.
The cap prevents HOUR/DAY limits from blocking a worker for unreasonable
durations; connectors like SurveyMonkey configure both minute and day limits,
and a breached day limit should fail fast rather than sleep for 24 hours.
"""
if timeout_seconds is None:
timeout_seconds = min(
max(r.period.factor for r in requests) + 5
if requests
else self.MIN_DEFAULT_TIMEOUT_SECONDS,
self.MAX_DEFAULT_TIMEOUT_SECONDS,
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

src/fides/api/service/connectors/limiter/rate_limiter.py:146-152

The dynamic timeout logic is sound for MINUTE+ periods, but for SECOND-period requests it silently reduces the timeout from the old hardcoded 30 s to min(1 + 5, 120) = 6 s. Any SaaS connector that configures period: second under sustained load will now hit RateLimiterTimeoutException five times faster than before.

The Okta and SaaS authenticated_client callers pass no explicit timeout_seconds, so they'll pick up this new default. Worth validating that no active SaaS connector YAML relies on the old 30 s behaviour for second-period limits (or documenting the intentional change).


try:
redis: FidesopsRedis = get_cache()
except RedisConnectionError as exc:
Expand All @@ -135,6 +162,7 @@ def limit(
return

start_time = time.time()
breached_requests: List[RateLimiterRequest] = []
while time.time() - start_time < timeout_seconds:
current_seconds = int(time.time())

Expand All @@ -156,7 +184,17 @@ def limit(
self.decrement_usage(
redis=redis, current_seconds=current_seconds, requests=requests
)
time.sleep(0.1)
# Sleep until the next bucket boundary for the longest-period breached
# request. This avoids hammering Redis every 100ms for minute/hour/day
# limits where the bucket won't roll for a long time.
sleep_seconds = max(
self.seconds_until_next_bucket(current_seconds, r)
for r in breached_requests
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

src/fides/api/service/connectors/limiter/rate_limiter.py:190-193

current_seconds was captured at the top of the loop (int(time.time())) before the Redis pipeline round-trips for both increment_usage and decrement_usage. By the time we call seconds_until_next_bucket(current_seconds, r) here, real clock time has advanced (typically a few milliseconds, but up to hundreds on a loaded Redis). The computed sleep_seconds is therefore slightly over-estimated — we'll wake up a bit past the true bucket boundary and then pay an extra increment_usage / branch iteration.

For practical purposes this is harmless (the 0.05 s buffer and the remaining cap absorb it), but snapshotting the time again here or passing the actual elapsed time to seconds_until_next_bucket would make the intent clearer:

now = int(time.time())
sleep_seconds = max(
    self.seconds_until_next_bucket(now, r)
    for r in breached_requests
)

# Add a small buffer to avoid landing exactly on the boundary, but
# never sleep longer than the remaining timeout.
remaining = timeout_seconds - (time.time() - start_time)
time.sleep(min(sleep_seconds + 0.05, max(remaining, 0)))
else:
# success
return
Expand Down
180 changes: 180 additions & 0 deletions tests/ops/integration_tests/limiter/test_rate_limiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
import unittest.mock as mock
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, Generator, List

import pytest
from freezegun import freeze_time
from requests import Session

from fides.api.common_exceptions import RedisConnectionError
from fides.api.db import session
from fides.api.graph.graph import DatasetGraph
from fides.api.models.connectionconfig import (
Expand Down Expand Up @@ -218,6 +221,88 @@ def test_limiter_times_out_when_bucket_full() -> None:
time.sleep(0.002)


@pytest.mark.integration
def test_minute_period_breach_waits_for_rollover() -> None:
    """A MINUTE-period breach must sleep until the bucket rolls over, not time out.

    Regression test: the old hardcoded ``timeout_seconds=30`` was shorter
    than the MINUTE bucket period (60s), so a breach occurring more than
    30s before the next boundary raised RateLimiterTimeoutException
    instead of waiting. This hit the Okta client (period=MINUTE) and any
    SaaS connector with a per-minute rate limit (e.g. Zenoti,
    SurveyMonkey).

    Real Redis backs the bucket state; only time is mocked so the test
    never waits 60s of wall-clock time.
    """
    # Freeze the clock 5s into a minute: the next bucket boundary is 55s
    # away, which the old 30s default timeout could never reach.
    with freeze_time("2024-01-01 00:00:05") as clock:
        bucket_key = f"test_minute_rollover_{random.randint(0, 10**12)}"
        request = RateLimiterRequest(
            key=bucket_key, rate_limit=1, period=RateLimiterPeriod.MINUTE
        )
        limiter = RateLimiter()

        def fake_sleep(seconds: float) -> None:
            # Advance the frozen clock instead of actually sleeping.
            clock.tick(timedelta(seconds=seconds))

        with mock.patch(
            "fides.api.service.connectors.limiter.rate_limiter.time.sleep",
            side_effect=fake_sleep,
        ):
            limiter.limit(requests=[request])  # occupies the only slot
            limiter.limit(requests=[request])  # breach -> sleep to boundary -> succeed

        # Positive assertion: the frozen clock advanced past the bucket
        # boundary (00:01:00), proving the limiter really waited for the
        # rollover rather than returning immediately.
        boundary = datetime(2024, 1, 1, 0, 1, 0).timestamp()
        assert clock().timestamp() >= boundary


@pytest.mark.integration
def test_dynamic_timeout_capped_for_day_limits() -> None:
    """Mixed MINUTE + DAY limits must not block a worker for hours.

    SurveyMonkey configures ``rate: 120/minute`` and ``rate: 500/day``.
    Without a cap the dynamic default timeout would be
    ``86400 + 5 = 86405s`` (~24h), parking a Celery worker until the next
    day bucket rolls over. The 120s cap makes the limiter raise
    RateLimiterTimeoutException quickly and surface an error instead.

    Real Redis backs the bucket state; only time is mocked so no real
    sleeping happens.
    """
    with freeze_time("2024-01-01 00:00:05") as clock:
        base_key = f"test_day_cap_{random.randint(0, 10**12)}"
        per_minute = RateLimiterRequest(
            key=f"{base_key}:min", rate_limit=1, period=RateLimiterPeriod.MINUTE
        )
        per_day = RateLimiterRequest(
            key=f"{base_key}:day", rate_limit=1, period=RateLimiterPeriod.DAY
        )
        combined = [per_minute, per_day]
        limiter = RateLimiter()

        # Accumulate mocked sleep so we can verify the cap was honored.
        slept = [0.0]

        def fake_sleep(seconds: float) -> None:
            slept[0] += seconds
            clock.tick(timedelta(seconds=seconds))

        with mock.patch(
            "fides.api.service.connectors.limiter.rate_limiter.time.sleep",
            side_effect=fake_sleep,
        ):
            # First call occupies both buckets.
            limiter.limit(requests=combined)

            # Second call breaches both limits. It must give up at the
            # capped timeout, not sleep for ~24h.
            with pytest.raises(RateLimiterTimeoutException):
                limiter.limit(requests=combined)

        # Total mocked sleep reflects the 120s cap, not the uncapped
        # 86405s value.
        assert 110 <= slept[0] < 130  # should be ~120 s, not 86400 s


@pytest.mark.integration_saas
@pytest.mark.asyncio
async def test_rate_limiter_full_integration(
Expand Down Expand Up @@ -273,3 +358,98 @@ def wrapper(self, *args, **kwargs):

wrapper.call_log = call_log
return wrapper


class TestRateLimiterRedisFailure:
    """Unit tests for RateLimiter.limit() when Redis is unavailable.

    NOTE: intentionally colocated with the integration tests (no
    ``integration`` marker) so these pure-mock checks run in every test
    mode.
    """

    def test_redis_connection_error_is_silently_skipped(self) -> None:
        request = RateLimiterRequest(
            key="k", rate_limit=1, period=RateLimiterPeriod.SECOND
        )
        with mock.patch(
            "fides.api.service.connectors.limiter.rate_limiter.get_cache",
            side_effect=RedisConnectionError("Redis unavailable"),
        ):
            # No exception expected: the limiter degrades to a no-op
            # when the cache cannot be reached.
            RateLimiter().limit(requests=[request])


class TestSecondsUntilNextBucket:
    """Unit tests for RateLimiter.seconds_until_next_bucket."""

    def _req(self, period: RateLimiterPeriod) -> RateLimiterRequest:
        # Rate limit value is irrelevant to bucket math; any number works.
        return RateLimiterRequest(key="k", rate_limit=10, period=period)

    def test_second_at_boundary(self) -> None:
        # At the start of a second (t=1000) a full 1s bucket remains.
        remaining = RateLimiter().seconds_until_next_bucket(
            1000, self._req(RateLimiterPeriod.SECOND)
        )
        assert remaining == 1

    def test_day_at_start(self) -> None:
        # At the exact start of a day (t=86400) the whole day remains.
        remaining = RateLimiter().seconds_until_next_bucket(
            86400, self._req(RateLimiterPeriod.DAY)
        )
        assert remaining == 86400

    def test_day_mid(self) -> None:
        # Halfway through a day, half a day remains.
        remaining = RateLimiter().seconds_until_next_bucket(
            86400 + 43200, self._req(RateLimiterPeriod.DAY)
        )
        assert remaining == 43200

    def test_minute_at_start(self) -> None:
        # At the exact start of a minute (t=600) a full 60s remains.
        remaining = RateLimiter().seconds_until_next_bucket(
            600, self._req(RateLimiterPeriod.MINUTE)
        )
        assert remaining == 60

    def test_minute_30s_in(self) -> None:
        # Halfway through a minute, 30s remain.
        remaining = RateLimiter().seconds_until_next_bucket(
            630, self._req(RateLimiterPeriod.MINUTE)
        )
        assert remaining == 30

    def test_minute_59s_in(self) -> None:
        # One second before the boundary, 1s remains.
        remaining = RateLimiter().seconds_until_next_bucket(
            659, self._req(RateLimiterPeriod.MINUTE)
        )
        assert remaining == 1

    def test_hour_at_start(self) -> None:
        remaining = RateLimiter().seconds_until_next_bucket(
            3600, self._req(RateLimiterPeriod.HOUR)
        )
        assert remaining == 3600

    def test_hour_mid(self) -> None:
        remaining = RateLimiter().seconds_until_next_bucket(
            5400, self._req(RateLimiterPeriod.HOUR)
        )
        assert remaining == 1800
Loading