-
Notifications
You must be signed in to change notification settings - Fork 91
ENG-3375: Add TTL to RedisRepository keys #7884
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
59bad32
f2c64de
d0ef770
acd990e
e4b95ab
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| type: Fixed | ||
| description: Apply TTL to all RedisRepository keys to prevent unbounded Redis memory growth across PBAC entities. | ||
| pr: 7884 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,6 +10,7 @@ | |
|
|
||
| from fides.api.common_exceptions import RedisConnectionError | ||
| from fides.api.util.cache import FidesopsRedis | ||
| from fides.config import CONFIG | ||
|
|
||
| T = TypeVar("T") | ||
|
|
||
|
|
@@ -29,8 +30,9 @@ class RedisRepository(ABC, Generic[T]): | |
|
|
||
| PREFIX: str # e.g. "data_purpose" | ||
|
|
||
| def __init__(self, cache: FidesopsRedis) -> None: | ||
| def __init__(self, cache: FidesopsRedis, ttl_seconds: Optional[int] = None) -> None: | ||
| self._cache = cache | ||
| self._ttl_seconds = ttl_seconds if ttl_seconds is not None else CONFIG.redis.default_ttl_seconds | ||
|
|
||
| # ── Key construction (private) ──────────────────────────────────── | ||
|
|
||
|
|
@@ -77,11 +79,13 @@ def save(self, entity: T) -> T: | |
| for field, value in self._get_index_entries(old_entity): | ||
| pipe.srem(self._index_key(field, value), pk) | ||
|
|
||
| # Write entity + add new indexes | ||
| pipe.set(key, self._serialize(entity)) | ||
| # Write entity + add new indexes (all keys get a TTL) | ||
| pipe.set(key, self._serialize(entity), ex=self._ttl_seconds) | ||
| pipe.sadd(self._all_key(), pk) | ||
| pipe.expire(self._all_key(), self._ttl_seconds) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The This may be intentional (sliding TTL for active collections is a reasonable design), but it's meaningfully different from the absolute TTL that the entity key gets. Worth a comment to make the intent explicit: # Shared set keys get a sliding TTL — any write to the collection
# extends expiry, so active collections stay alive.
pipe.expire(self._all_key(), self._ttl_seconds) |
||
| for field, value in self._get_index_entries(entity): | ||
| pipe.sadd(self._index_key(field, value), pk) | ||
| pipe.expire(self._index_key(field, value), self._ttl_seconds) | ||
|
Comment on lines
+82
to
+88
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm a bit confused , don't we have this same code in fidesplus? it seems to me |
||
| pipe.execute() | ||
| except RedisConnectionError: | ||
| logger.error("Redis connection error during save of {}:{}", self.PREFIX, pk) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,149 @@ | ||
| """Unit tests for the RedisRepository base class.""" | ||
|
|
||
| import json | ||
| from dataclasses import dataclass | ||
| from datetime import datetime | ||
| from unittest.mock import MagicMock, patch | ||
|
|
||
| import pytest | ||
| from fastapi_pagination import Params | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
|
||
| from fides.service.pbac.redis_repository import RedisRepository | ||
|
|
||
| TTL = 604800 # 7 days | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This constant duplicates the config default value. If the default ever changes, this test silently breaks. Consider asserting against # Instead of:
assert repo._ttl_seconds == 86400
# Prefer:
assert repo._ttl_seconds == CONFIG.redis.default_ttl_seconds |
||
|
|
||
|
|
||
| @dataclass | ||
| class SampleEntity: | ||
| id: str | ||
| name: str | ||
| category: str | ||
| created_at: datetime | ||
|
|
||
|
|
||
| class SampleRepository(RedisRepository[SampleEntity]): | ||
| PREFIX = "sample" | ||
|
|
||
| def _serialize(self, entity: SampleEntity) -> str: | ||
| return json.dumps( | ||
| { | ||
| "id": entity.id, | ||
| "name": entity.name, | ||
| "category": entity.category, | ||
| "created_at": entity.created_at.isoformat(), | ||
| } | ||
| ) | ||
|
|
||
| def _deserialize(self, data: str) -> SampleEntity: | ||
| d = json.loads(data) | ||
| d["created_at"] = datetime.fromisoformat(d["created_at"]) | ||
| return SampleEntity(**d) | ||
|
|
||
| def _get_pk(self, entity: SampleEntity) -> str: | ||
| return entity.id | ||
|
|
||
| def _get_index_entries(self, entity: SampleEntity) -> list[tuple[str, str]]: | ||
| return [("category", entity.category)] | ||
|
|
||
|
|
||
| @pytest.fixture() | ||
| def mock_cache(): | ||
| cache = MagicMock() | ||
| cache.pipeline.return_value = MagicMock() | ||
| cache.pipeline.return_value.execute.return_value = None | ||
| return cache | ||
|
|
||
|
|
||
| @pytest.fixture() | ||
| def repo(mock_cache): | ||
| return SampleRepository(mock_cache, ttl_seconds=TTL) | ||
|
|
||
|
|
||
| @pytest.fixture() | ||
| def sample_entity(): | ||
| return SampleEntity( | ||
| id="abc123", | ||
| name="Test Entity", | ||
| category="widgets", | ||
| created_at=datetime(2024, 1, 1, 12, 0, 0), | ||
| ) | ||
|
|
||
|
|
||
| class TestSaveTTL: | ||
| """Verify that save() applies TTL to all Redis keys.""" | ||
|
|
||
| def test_entity_key_has_ttl(self, repo, mock_cache, sample_entity): | ||
| mock_cache.get.return_value = None | ||
| repo.save(sample_entity) | ||
|
|
||
| pipe = mock_cache.pipeline.return_value | ||
| pipe.set.assert_called_once_with( | ||
| "sample:abc123", repo._serialize(sample_entity), ex=TTL | ||
| ) | ||
|
|
||
| def test_all_key_has_ttl(self, repo, mock_cache, sample_entity): | ||
| mock_cache.get.return_value = None | ||
| repo.save(sample_entity) | ||
|
|
||
| pipe = mock_cache.pipeline.return_value | ||
| pipe.sadd.assert_any_call("sample:_all", "abc123") | ||
| pipe.expire.assert_any_call("sample:_all", TTL) | ||
|
|
||
| def test_index_keys_have_ttl(self, repo, mock_cache, sample_entity): | ||
| mock_cache.get.return_value = None | ||
| repo.save(sample_entity) | ||
|
|
||
| pipe = mock_cache.pipeline.return_value | ||
| pipe.sadd.assert_any_call("sample:idx:category:widgets", "abc123") | ||
| pipe.expire.assert_any_call("sample:idx:category:widgets", TTL) | ||
|
|
||
| def test_pipeline_is_executed(self, repo, mock_cache, sample_entity): | ||
| mock_cache.get.return_value = None | ||
| repo.save(sample_entity) | ||
|
|
||
| pipe = mock_cache.pipeline.return_value | ||
| pipe.execute.assert_called_once() | ||
|
|
||
| def test_update_cleans_stale_indexes_with_ttl(self, repo, mock_cache, sample_entity): | ||
| old_entity = SampleEntity( | ||
| id="abc123", | ||
| name="Old", | ||
| category="old_category", | ||
| created_at=datetime(2024, 1, 1), | ||
| ) | ||
| mock_cache.get.return_value = repo._serialize(old_entity) | ||
| repo.save(sample_entity) | ||
|
|
||
| pipe = mock_cache.pipeline.return_value | ||
| # Old index entry removed | ||
| pipe.srem.assert_any_call("sample:idx:category:old_category", "abc123") | ||
| # New entity key still gets TTL | ||
| pipe.set.assert_called_once_with( | ||
| "sample:abc123", repo._serialize(sample_entity), ex=TTL | ||
| ) | ||
|
|
||
| @patch("fides.service.pbac.redis_repository.CONFIG") | ||
| def test_defaults_to_config_ttl(self, mock_config, mock_cache): | ||
| mock_config.redis.default_ttl_seconds = 86400 | ||
| repo = SampleRepository(mock_cache) | ||
| assert repo._ttl_seconds == 86400 | ||
|
|
||
|
|
||
| class TestDelete: | ||
| def test_delete_existing(self, repo, mock_cache, sample_entity): | ||
| mock_cache.get.return_value = repo._serialize(sample_entity) | ||
| result = repo.delete("abc123") | ||
|
|
||
| assert result is True | ||
| pipe = mock_cache.pipeline.return_value | ||
| pipe.delete.assert_called_once_with("sample:abc123") | ||
| pipe.srem.assert_any_call("sample:_all", "abc123") | ||
| pipe.srem.assert_any_call("sample:idx:category:widgets", "abc123") | ||
| pipe.execute.assert_called() | ||
|
|
||
| def test_delete_missing(self, repo, mock_cache): | ||
| mock_cache.get.return_value = None | ||
| result = repo.delete("nonexistent") | ||
| assert result is False | ||
|
|
||
|
|
||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
src/fides/service/pbac/redis_repository.py:34_ttl_secondsis evaluated once at construction time fromCONFIG. Ifttl_seconds=0or a negative value is passed in, Redis will interpretEXPIRE key 0as an immediate delete, silently discarding every entity immediately after writing. Consider adding a guard: