Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 38 additions & 17 deletions src/sentry/tasks/context_engine_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,15 @@
from sentry.tasks.base import instrumented_task
from sentry.taskworker.namespaces import seer_tasks
from sentry.taskworker.retry import Retry
from sentry.utils.cache import cache
from sentry.utils.hashlib import md5_text
from sentry.utils.query import RangeQuerySetWrapper
from sentry.utils.snuba_rpc import SnubaRPCRateLimitExceeded

logger = logging.getLogger(__name__)
INDEXING_DAY = 6 # Sunday
CONTEXT_ENGINE_ENABLED_ORG_IDS = "context_engine_indexing:enabled_org_ids"
CONTEXT_ENGINE_CACHE_TTL = 8 * 24 * 60 * 60 # 8 days


@instrumented_task(
Expand Down Expand Up @@ -208,32 +212,45 @@ def build_service_map(organization_id: int, *args, **kwargs) -> None:
raise


def get_allowed_org_ids_context_engine_indexing() -> list[int]:
def get_allowed_org_ids_context_engine_indexing() -> tuple[list[int], list[int]]:
"""
Get the list of allowed organizations for context engine indexing.

Only includes orgs that have the seer-explorer-context-engine feature flag
enabled. Spreads orgs evenly across every hour
of the day, every day of the week (168 slots total). Each org is
deterministically assigned a slot via md5 hash so it is indexed exactly
once per week.
enabled. On the weekly indexing day (Sunday), spreads orgs evenly across
24 hourly slots via md5 hash. On other days, only newly-enabled orgs
(detected via cache diff) are returned as eligible.
"""
now = datetime.now(UTC)
current_slot = now.weekday() * 24 + now.hour
TOTAL_HOURLY_SLOTS = 24 * 7 # 168 slots across every hour of the week
TOTAL_HOURLY_SLOTS = 24
Comment thread
Mihir-Mavalankar marked this conversation as resolved.

eligible_org_ids: list[int] = []
all_enabled_org_ids: list[int] = []

for org in RangeQuerySetWrapper(
Organization.objects.filter(status=ObjectStatus.ACTIVE),
result_value_getter=lambda o: o.id,
):
if features.has("organizations:seer-explorer", org) and features.has(
"organizations:seer-explorer-context-engine", org
):
if int(md5_text(str(org.id)).hexdigest(), 16) % TOTAL_HOURLY_SLOTS == current_slot:
eligible_org_ids.append(org.id)
if features.has("organizations:seer-explorer-context-engine", org):
Comment thread
Mihir-Mavalankar marked this conversation as resolved.
all_enabled_org_ids.append(org.id)

if now.weekday() == INDEXING_DAY:
slot = now.hour
for org_id in all_enabled_org_ids:
if int(md5_text(str(org_id)).hexdigest(), 16) % TOTAL_HOURLY_SLOTS == slot:
eligible_org_ids.append(org_id)

previous_enabled_org_ids = cache.get(CONTEXT_ENGINE_ENABLED_ORG_IDS)
if previous_enabled_org_ids is not None:
Comment thread
Mihir-Mavalankar marked this conversation as resolved.
newly_added_org_ids_set = set(all_enabled_org_ids) - set(previous_enabled_org_ids)
if newly_added_org_ids_set:
logger.info(
"Adding context engine index for recently enabled orgs",
extra={"org_ids": list(newly_added_org_ids_set)},
)
eligible_org_ids = list(set(eligible_org_ids).union(newly_added_org_ids_set))

return eligible_org_ids
return all_enabled_org_ids, eligible_org_ids


@instrumented_task(
Expand All @@ -252,10 +269,7 @@ def schedule_context_engine_indexing_tasks() -> None:
logger.info("explorer.context_engine_indexing.enable flag is disabled")
return

allowed_org_ids = get_allowed_org_ids_context_engine_indexing()
if not allowed_org_ids:
logger.info("No allowed organizations for context engine indexing")
return
feature_enabled_org_ids, allowed_org_ids = get_allowed_org_ids_context_engine_indexing()

dispatched = 0
for org_id in allowed_org_ids:
Expand All @@ -269,6 +283,13 @@ def schedule_context_engine_indexing_tasks() -> None:
extra={"org_id": org_id},
)

# Store full currently-enabled orgs so next run can compute a stable diff.
cache.set(CONTEXT_ENGINE_ENABLED_ORG_IDS, feature_enabled_org_ids, CONTEXT_ENGINE_CACHE_TTL)
logger.info(
"Stored context engine enabled org ids cache size",
extra={"size": len(feature_enabled_org_ids)},
)

logger.info(
"Scheduled context engine indexing tasks",
extra={"total_org_count": len(allowed_org_ids), "dispatched": dispatched},
Expand Down
35 changes: 15 additions & 20 deletions tests/sentry/tasks/test_context_engine_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,9 @@ def test_returns_only_orgs_assigned_to_current_slot(self):
orgs = [self.create_organization() for _ in range(50)]
org_ids = [org.id for org in orgs]

TOTAL_SLOTS = 168
TOTAL_SLOTS = 24
target_slot = int(md5_text(str(org_ids[0])).hexdigest(), 16) % TOTAL_SLOTS
day = target_slot // 24
hour = target_slot % 24
frozen_time = f"2024-01-{15 + day} {hour:02d}:00:00"
frozen_time = f"2024-01-14 {target_slot:02d}:00:00"

def feature_enabled_for_test_orgs(_flag_name: str, org, *args, **kwargs) -> bool:
return org.id in org_ids
Expand All @@ -137,12 +135,12 @@ def feature_enabled_for_test_orgs(_flag_name: str, org, *args, **kwargs) -> bool
"sentry.tasks.context_engine_index.features.has",
side_effect=feature_enabled_for_test_orgs,
):
result = get_allowed_org_ids_context_engine_indexing()
_feature_enabled, eligible = get_allowed_org_ids_context_engine_indexing()

assert len(result) > 0
assert org_ids[0] in result
assert all(org_id in org_ids for org_id in result)
for org_id in result:
assert len(eligible) > 0
assert org_ids[0] in eligible
assert all(org_id in org_ids for org_id in eligible)
for org_id in eligible:
assert int(md5_text(str(org_id)).hexdigest(), 16) % TOTAL_SLOTS == target_slot

def test_excludes_orgs_without_feature_flag(self):
Expand All @@ -151,13 +149,9 @@ def test_excludes_orgs_without_feature_flag(self):
org_with_flag = self.create_organization()
org_without_flag = self.create_organization()

# Freeze to org_without_flag's own slot so we know it would appear
# if it had the flag — proving exclusion is due to the flag check.
TOTAL_SLOTS = 168
TOTAL_SLOTS = 24
target_slot = int(md5_text(str(org_without_flag.id)).hexdigest(), 16) % TOTAL_SLOTS
day = target_slot // 24
hour = target_slot % 24
frozen_time = f"2024-01-{15 + day} {hour:02d}:00:00"
frozen_time = f"2024-01-14 {target_slot:02d}:00:00"

with freeze_time(frozen_time):
with self.feature(
Expand All @@ -166,9 +160,9 @@ def test_excludes_orgs_without_feature_flag(self):
"organizations:seer-explorer-context-engine": [org_with_flag.slug],
}
):
result = get_allowed_org_ids_context_engine_indexing()
_feature_enabled, eligible = get_allowed_org_ids_context_engine_indexing()

assert org_without_flag.id not in result
assert org_without_flag.id not in eligible

def test_returns_empty_when_no_orgs_have_feature_flag(self):
with self.feature(
Expand All @@ -177,9 +171,10 @@ def test_returns_empty_when_no_orgs_have_feature_flag(self):
"organizations:seer-explorer-context-engine": False,
}
):
result = get_allowed_org_ids_context_engine_indexing()
feature_enabled, eligible = get_allowed_org_ids_context_engine_indexing()

assert result == []
assert feature_enabled == []
assert eligible == []


@django_db_all
Expand All @@ -190,7 +185,7 @@ class TestScheduleContextEngineIndexingTasks(TestCase):
def test_dispatches_for_allowed_orgs(self, mock_get_orgs, mock_index, mock_build):
org1 = self.create_organization()
org2 = self.create_organization()
mock_get_orgs.return_value = [org1.id, org2.id]
mock_get_orgs.return_value = ([org1.id, org2.id], [org1.id, org2.id])

with override_options(
{
Expand Down
Loading