-
-
Notifications
You must be signed in to change notification settings - Fork 4.7k
Revert "feat(ACI): Make rule stats and group history endpoints backwards compatible (#110282)" #111038
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Revert "feat(ACI): Make rule stats and group history endpoints backwards compatible (#110282)" #111038
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,7 +14,7 @@ | |
| from sentry.models.rulefirehistory import RuleFireHistory | ||
| from sentry.rules.history.base import RuleGroupHistory, RuleHistoryBackend, TimeSeriesValue | ||
| from sentry.utils.cursors import Cursor, CursorResult | ||
| from sentry.workflow_engine.models.workflow import Workflow | ||
| from sentry.workflow_engine.models import AlertRuleWorkflow | ||
|
|
||
| if TYPE_CHECKING: | ||
| from sentry.models.rule import Rule | ||
|
|
@@ -60,63 +60,72 @@ def record( | |
| notification_uuid=notification_uuid, | ||
| ) | ||
|
|
||
| def fetch_workflow_groups( | ||
| def fetch_rule_groups_paginated( | ||
| self, | ||
| workflow: Workflow, | ||
| rule: Rule, | ||
| start: datetime, | ||
| end: datetime, | ||
| cursor: Cursor | None = None, | ||
| per_page: int = 25, | ||
| ) -> CursorResult[RuleGroupHistory]: | ||
| # Performs the raw SQL query with pagination | ||
| def data_fn(offset: int, limit: int) -> list[_Result]: | ||
| query = """ | ||
| WITH workflow_data AS ( | ||
| SELECT group_id, date_added, event_id | ||
| FROM workflow_engine_workflowfirehistory | ||
| WHERE workflow_id = %s | ||
| AND date_added >= %s AND date_added < %s | ||
| ) | ||
| SELECT | ||
| group_id as group, | ||
| COUNT(*) as count, | ||
| MAX(date_added) as last_triggered, | ||
| (ARRAY_AGG(event_id ORDER BY date_added DESC))[1] as event_id | ||
| FROM workflow_data | ||
| GROUP BY group_id | ||
| ORDER BY count DESC, last_triggered DESC | ||
| LIMIT %s OFFSET %s | ||
| """ | ||
|
|
||
| with connection.cursor() as cursor: | ||
| cursor.execute(query, [workflow.id, start, end, limit, offset]) | ||
| return [ | ||
| _Result( | ||
| group=row[0], | ||
| count=row[1], | ||
| last_triggered=row[2], | ||
| event_id=row[3], | ||
| try: | ||
| alert_rule_workflow = AlertRuleWorkflow.objects.get(rule_id=rule.id) | ||
| workflow = alert_rule_workflow.workflow | ||
|
|
||
| # Performs the raw SQL query with pagination | ||
| def data_fn(offset: int, limit: int) -> list[_Result]: | ||
| query = """ | ||
| WITH combined_data AS ( | ||
| SELECT group_id, date_added, event_id | ||
| FROM sentry_rulefirehistory | ||
| WHERE rule_id = %s AND date_added >= %s AND date_added < %s | ||
| UNION ALL | ||
| SELECT group_id, date_added, event_id | ||
| FROM workflow_engine_workflowfirehistory | ||
| WHERE workflow_id = %s | ||
| AND date_added >= %s AND date_added < %s | ||
| ) | ||
| for row in cursor.fetchall() | ||
| ] | ||
| SELECT | ||
| group_id as group, | ||
| COUNT(*) as count, | ||
| MAX(date_added) as last_triggered, | ||
| (ARRAY_AGG(event_id ORDER BY date_added DESC))[1] as event_id | ||
| FROM combined_data | ||
| GROUP BY group_id | ||
| ORDER BY count DESC, last_triggered DESC | ||
| LIMIT %s OFFSET %s | ||
| """ | ||
|
|
||
| result = GenericOffsetPaginator(data_fn=data_fn).get_result(per_page, cursor) | ||
| result.results = convert_results(result.results) | ||
| return result | ||
| with connection.cursor() as cursor: | ||
| cursor.execute( | ||
| query, [rule.id, start, end, workflow.id, start, end, limit, offset] | ||
| ) | ||
| return [ | ||
| _Result( | ||
| group=row[0], | ||
| count=row[1], | ||
| last_triggered=row[2], | ||
| event_id=row[3], | ||
| ) | ||
| for row in cursor.fetchall() | ||
| ] | ||
|
|
||
| result = GenericOffsetPaginator(data_fn=data_fn).get_result(per_page, cursor) | ||
| result.results = convert_results(result.results) | ||
|
|
||
| return result | ||
|
|
||
| except AlertRuleWorkflow.DoesNotExist: | ||
| # If no workflow is associated with this rule, just use the original behavior | ||
| logger.exception("No workflow associated with rule", extra={"rule_id": rule.id}) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
| pass | ||
|
|
||
| def fetch_rule_groups( | ||
| self, | ||
| target: Rule, | ||
| start: datetime, | ||
| end: datetime, | ||
| cursor: Cursor | None = None, | ||
| per_page: int = 25, | ||
| ) -> CursorResult[RuleGroupHistory]: | ||
| rule_filtered_history = RuleFireHistory.objects.filter( | ||
| rule=target, | ||
| rule=rule, | ||
| date_added__gte=start, | ||
| date_added__lt=end, | ||
| ) | ||
|
|
||
| # subquery that retrieves row with the largest date in a group for RuleFireHistory | ||
| rule_group_max_dates = rule_filtered_history.filter(group=OuterRef("group")).order_by( | ||
| "-date_added" | ||
|
|
@@ -133,77 +142,71 @@ def fetch_rule_groups( | |
| qs, order_by=("-count", "-last_triggered"), on_results=convert_results | ||
| ).get_result(per_page, cursor) | ||
|
|
||
| def fetch_rule_groups_paginated( | ||
| self, | ||
| target: Rule | Workflow, | ||
| start: datetime, | ||
| end: datetime, | ||
| cursor: Cursor | None = None, | ||
| per_page: int = 25, | ||
| ) -> CursorResult[RuleGroupHistory]: | ||
| if isinstance(target, Workflow): | ||
| return self.fetch_workflow_groups(target, start, end, cursor, per_page) | ||
|
|
||
| return self.fetch_rule_groups(target, start, end, cursor, per_page) | ||
|
|
||
| def fetch_workflow_stats( | ||
| self, target: Workflow, start: datetime, end: datetime | ||
| ) -> dict[datetime, TimeSeriesValue]: | ||
| # Use raw SQL to combine data from both tables | ||
| with connection.cursor() as db_cursor: | ||
| db_cursor.execute( | ||
| """ | ||
| SELECT | ||
| DATE_TRUNC('hour', date_added) as bucket, | ||
| COUNT(*) as count | ||
| FROM ( | ||
| SELECT date_added | ||
| FROM workflow_engine_workflowfirehistory | ||
| WHERE workflow_id = %s | ||
| AND date_added >= %s | ||
| AND date_added < %s | ||
| ) combined_data | ||
| GROUP BY DATE_TRUNC('hour', date_added) | ||
| ORDER BY bucket | ||
| """, | ||
| [target.id, start, end], | ||
| ) | ||
|
|
||
| results = db_cursor.fetchall() | ||
|
|
||
| # Convert raw SQL results to the expected format | ||
| existing_data = {row[0]: TimeSeriesValue(row[0], row[1]) for row in results} | ||
| return existing_data | ||
|
|
||
| def fetch_rule_stats( | ||
| self, target: Rule, start: datetime, end: datetime | ||
| ) -> dict[datetime, TimeSeriesValue]: | ||
| qs = ( | ||
| RuleFireHistory.objects.filter( | ||
| rule=target, | ||
| date_added__gte=start, | ||
| date_added__lt=end, | ||
| ) | ||
| .annotate(bucket=TruncHour("date_added")) | ||
| .order_by("bucket") | ||
| .values("bucket") | ||
| .annotate(count=Count("id")) | ||
| ) | ||
| existing_data = {row["bucket"]: TimeSeriesValue(row["bucket"], row["count"]) for row in qs} | ||
| return existing_data | ||
|
|
||
| def fetch_rule_hourly_stats( | ||
| self, target: Rule | Workflow, start: datetime, end: datetime | ||
| self, rule: Rule, start: datetime, end: datetime | ||
| ) -> Sequence[TimeSeriesValue]: | ||
| start = start.replace(tzinfo=timezone.utc) | ||
| end = end.replace(tzinfo=timezone.utc) | ||
|
|
||
| existing_data: dict[datetime, TimeSeriesValue] = {} | ||
|
|
||
| if isinstance(target, Workflow): | ||
| existing_data = self.fetch_workflow_stats(target, start, end) | ||
| else: | ||
| existing_data = self.fetch_rule_stats(target, start, end) | ||
| try: | ||
| alert_rule_workflow = AlertRuleWorkflow.objects.get(rule_id=rule.id) | ||
| workflow = alert_rule_workflow.workflow | ||
|
|
||
| # Use raw SQL to combine data from both tables | ||
| with connection.cursor() as db_cursor: | ||
| db_cursor.execute( | ||
| """ | ||
| SELECT | ||
| DATE_TRUNC('hour', date_added) as bucket, | ||
| COUNT(*) as count | ||
| FROM ( | ||
| SELECT date_added | ||
| FROM sentry_rulefirehistory | ||
| WHERE rule_id = %s | ||
| AND date_added >= %s | ||
| AND date_added < %s | ||
|
|
||
| UNION ALL | ||
|
|
||
| SELECT date_added | ||
| FROM workflow_engine_workflowfirehistory | ||
| WHERE workflow_id = %s | ||
| AND date_added >= %s | ||
| AND date_added < %s | ||
| ) combined_data | ||
| GROUP BY DATE_TRUNC('hour', date_added) | ||
| ORDER BY bucket | ||
| """, | ||
| [rule.id, start, end, workflow.id, start, end], | ||
| ) | ||
|
|
||
| results = db_cursor.fetchall() | ||
|
|
||
| # Convert raw SQL results to the expected format | ||
| existing_data = {row[0]: TimeSeriesValue(row[0], row[1]) for row in results} | ||
|
|
||
| except AlertRuleWorkflow.DoesNotExist: | ||
| # If no workflow is associated with this rule, just use the original behavior | ||
| logger.exception("No workflow associated with rule", extra={"rule_id": rule.id}) | ||
| pass | ||
|
|
||
| if not existing_data: | ||
| qs = ( | ||
| RuleFireHistory.objects.filter( | ||
| rule=rule, | ||
| date_added__gte=start, | ||
| date_added__lt=end, | ||
| ) | ||
| .annotate(bucket=TruncHour("date_added")) | ||
| .order_by("bucket") | ||
| .values("bucket") | ||
| .annotate(count=Count("id")) | ||
| ) | ||
| existing_data = { | ||
| row["bucket"]: TimeSeriesValue(row["bucket"], row["count"]) for row in qs | ||
| } | ||
|
|
||
| # Fill in gaps with zero values for missing hours | ||
| results = [] | ||
|
|
||


Uh oh!
There was an error while loading. Please reload this page.