Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 112 additions & 109 deletions src/sentry/rules/history/backends/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from sentry.models.rulefirehistory import RuleFireHistory
from sentry.rules.history.base import RuleGroupHistory, RuleHistoryBackend, TimeSeriesValue
from sentry.utils.cursors import Cursor, CursorResult
from sentry.workflow_engine.models.workflow import Workflow
from sentry.workflow_engine.models import AlertRuleWorkflow

if TYPE_CHECKING:
from sentry.models.rule import Rule
Expand Down Expand Up @@ -60,63 +60,72 @@ def record(
notification_uuid=notification_uuid,
)

def fetch_workflow_groups(
def fetch_rule_groups_paginated(
self,
workflow: Workflow,
rule: Rule,
start: datetime,
end: datetime,
cursor: Cursor | None = None,
per_page: int = 25,
) -> CursorResult[RuleGroupHistory]:
# Performs the raw SQL query with pagination
def data_fn(offset: int, limit: int) -> list[_Result]:
query = """
WITH workflow_data AS (
SELECT group_id, date_added, event_id
FROM workflow_engine_workflowfirehistory
WHERE workflow_id = %s
AND date_added >= %s AND date_added < %s
)
SELECT
group_id as group,
COUNT(*) as count,
MAX(date_added) as last_triggered,
(ARRAY_AGG(event_id ORDER BY date_added DESC))[1] as event_id
FROM workflow_data
GROUP BY group_id
ORDER BY count DESC, last_triggered DESC
LIMIT %s OFFSET %s
"""

with connection.cursor() as cursor:
cursor.execute(query, [workflow.id, start, end, limit, offset])
return [
_Result(
group=row[0],
count=row[1],
last_triggered=row[2],
event_id=row[3],
try:
alert_rule_workflow = AlertRuleWorkflow.objects.get(rule_id=rule.id)
workflow = alert_rule_workflow.workflow

# Performs the raw SQL query with pagination
def data_fn(offset: int, limit: int) -> list[_Result]:
query = """
WITH combined_data AS (
SELECT group_id, date_added, event_id
FROM sentry_rulefirehistory
WHERE rule_id = %s AND date_added >= %s AND date_added < %s
UNION ALL
SELECT group_id, date_added, event_id
FROM workflow_engine_workflowfirehistory
WHERE workflow_id = %s
AND date_added >= %s AND date_added < %s
)
for row in cursor.fetchall()
]
SELECT
group_id as group,
COUNT(*) as count,
MAX(date_added) as last_triggered,
(ARRAY_AGG(event_id ORDER BY date_added DESC))[1] as event_id
FROM combined_data
GROUP BY group_id
ORDER BY count DESC, last_triggered DESC
LIMIT %s OFFSET %s
"""

result = GenericOffsetPaginator(data_fn=data_fn).get_result(per_page, cursor)
result.results = convert_results(result.results)
return result
with connection.cursor() as cursor:
cursor.execute(
query, [rule.id, start, end, workflow.id, start, end, limit, offset]
)
return [
_Result(
group=row[0],
count=row[1],
last_triggered=row[2],
event_id=row[3],
)
for row in cursor.fetchall()
]

result = GenericOffsetPaginator(data_fn=data_fn).get_result(per_page, cursor)
result.results = convert_results(result.results)

return result

except AlertRuleWorkflow.DoesNotExist:
# If no workflow is associated with this rule, just use the original behavior
logger.exception("No workflow associated with rule", extra={"rule_id": rule.id})
Comment thread
sentry[bot] marked this conversation as resolved.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logger.exception used for expected non-error code path

Medium Severity

logger.exception is used for an AlertRuleWorkflow.DoesNotExist case that the comment explicitly describes as expected behavior ("just use the original behavior"). logger.exception logs at ERROR level with a full stack trace, which in the Sentry codebase would generate error events in Sentry's own monitoring. For any rule without an associated workflow — likely very common during migration — this creates significant noise in production error tracking. logger.info or logger.warning would be more appropriate here.

Additional Locations (1)
Fix in Cursor Fix in Web

pass

def fetch_rule_groups(
self,
target: Rule,
start: datetime,
end: datetime,
cursor: Cursor | None = None,
per_page: int = 25,
) -> CursorResult[RuleGroupHistory]:
rule_filtered_history = RuleFireHistory.objects.filter(
rule=target,
rule=rule,
date_added__gte=start,
date_added__lt=end,
)

# subquery that retrieves row with the largest date in a group for RuleFireHistory
rule_group_max_dates = rule_filtered_history.filter(group=OuterRef("group")).order_by(
"-date_added"
Expand All @@ -133,77 +142,71 @@ def fetch_rule_groups(
qs, order_by=("-count", "-last_triggered"), on_results=convert_results
).get_result(per_page, cursor)

def fetch_rule_groups_paginated(
self,
target: Rule | Workflow,
start: datetime,
end: datetime,
cursor: Cursor | None = None,
per_page: int = 25,
) -> CursorResult[RuleGroupHistory]:
if isinstance(target, Workflow):
return self.fetch_workflow_groups(target, start, end, cursor, per_page)

return self.fetch_rule_groups(target, start, end, cursor, per_page)

def fetch_workflow_stats(
    self, target: Workflow, start: datetime, end: datetime
) -> dict[datetime, TimeSeriesValue]:
    """Return hourly fire counts for ``target`` workflow in ``[start, end)``.

    Counts rows in ``workflow_engine_workflowfirehistory`` for the workflow,
    bucketed by the hour of ``date_added`` (``DATE_TRUNC('hour', ...)``).
    Hours with no fires are simply absent from the returned dict — the
    caller is responsible for filling gaps with zero values.

    :param target: the Workflow whose fire history is counted
    :param start: inclusive lower bound on ``date_added``
    :param end: exclusive upper bound on ``date_added``
    :return: mapping of hour bucket -> TimeSeriesValue(bucket, count)
    """
    # Raw SQL rather than the ORM; the inner subselect restricts rows to
    # this workflow and the [start, end) window before bucketing.
    with connection.cursor() as db_cursor:
        db_cursor.execute(
            """
            SELECT
                DATE_TRUNC('hour', date_added) as bucket,
                COUNT(*) as count
            FROM (
                SELECT date_added
                FROM workflow_engine_workflowfirehistory
                WHERE workflow_id = %s
                AND date_added >= %s
                AND date_added < %s
            ) combined_data
            GROUP BY DATE_TRUNC('hour', date_added)
            ORDER BY bucket
            """,
            [target.id, start, end],
        )

        results = db_cursor.fetchall()

    # Convert raw SQL rows (bucket, count) to the expected format.
    existing_data = {row[0]: TimeSeriesValue(row[0], row[1]) for row in results}
    return existing_data

def fetch_rule_stats(
    self, target: Rule, start: datetime, end: datetime
) -> dict[datetime, TimeSeriesValue]:
    """Return hourly fire counts for ``target`` rule in ``[start, end)``.

    Buckets ``RuleFireHistory`` rows by the hour of ``date_added`` via the
    ORM. Hours with no fires are absent from the returned dict; the caller
    fills gaps with zero values.

    :param target: the Rule whose fire history is counted
    :param start: inclusive lower bound on ``date_added``
    :param end: exclusive upper bound on ``date_added``
    :return: mapping of hour bucket -> TimeSeriesValue(bucket, count)
    """
    history = RuleFireHistory.objects.filter(
        rule=target,
        date_added__gte=start,
        date_added__lt=end,
    )
    hourly_counts = (
        history.annotate(bucket=TruncHour("date_added"))
        .order_by("bucket")
        .values("bucket")
        .annotate(count=Count("id"))
    )
    buckets: dict[datetime, TimeSeriesValue] = {}
    for row in hourly_counts:
        buckets[row["bucket"]] = TimeSeriesValue(row["bucket"], row["count"])
    return buckets

def fetch_rule_hourly_stats(
self, target: Rule | Workflow, start: datetime, end: datetime
self, rule: Rule, start: datetime, end: datetime
) -> Sequence[TimeSeriesValue]:
start = start.replace(tzinfo=timezone.utc)
end = end.replace(tzinfo=timezone.utc)

existing_data: dict[datetime, TimeSeriesValue] = {}

if isinstance(target, Workflow):
existing_data = self.fetch_workflow_stats(target, start, end)
else:
existing_data = self.fetch_rule_stats(target, start, end)
try:
alert_rule_workflow = AlertRuleWorkflow.objects.get(rule_id=rule.id)
workflow = alert_rule_workflow.workflow

# Use raw SQL to combine data from both tables
with connection.cursor() as db_cursor:
db_cursor.execute(
"""
SELECT
DATE_TRUNC('hour', date_added) as bucket,
COUNT(*) as count
FROM (
SELECT date_added
FROM sentry_rulefirehistory
WHERE rule_id = %s
AND date_added >= %s
AND date_added < %s

UNION ALL

SELECT date_added
FROM workflow_engine_workflowfirehistory
WHERE workflow_id = %s
AND date_added >= %s
AND date_added < %s
) combined_data
GROUP BY DATE_TRUNC('hour', date_added)
ORDER BY bucket
""",
[rule.id, start, end, workflow.id, start, end],
)

results = db_cursor.fetchall()

# Convert raw SQL results to the expected format
existing_data = {row[0]: TimeSeriesValue(row[0], row[1]) for row in results}

except AlertRuleWorkflow.DoesNotExist:
# If no workflow is associated with this rule, just use the original behavior
logger.exception("No workflow associated with rule", extra={"rule_id": rule.id})
pass

if not existing_data:
qs = (
RuleFireHistory.objects.filter(
rule=rule,
date_added__gte=start,
date_added__lt=end,
)
.annotate(bucket=TruncHour("date_added"))
.order_by("bucket")
.values("bucket")
.annotate(count=Count("id"))
)
existing_data = {
row["bucket"]: TimeSeriesValue(row["bucket"], row["count"]) for row in qs
}

# Fill in gaps with zero values for missing hours
results = []
Expand Down
5 changes: 2 additions & 3 deletions src/sentry/rules/history/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from typing import TYPE_CHECKING

from sentry.utils.services import Service
from sentry.workflow_engine.models.workflow import Workflow

if TYPE_CHECKING:
from typing import Any
Expand Down Expand Up @@ -50,7 +49,7 @@ def record(
raise NotImplementedError

def fetch_rule_groups_paginated(
self, rule: Rule | Workflow, start: datetime, end: datetime, cursor: Cursor, per_page: int
self, rule: Rule, start: datetime, end: datetime, cursor: Cursor, per_page: int
) -> CursorResult[RuleGroupHistory]:
"""
Fetches groups that triggered a rule within a given timeframe, ordered by number of
Expand All @@ -59,7 +58,7 @@ def fetch_rule_groups_paginated(
raise NotImplementedError

def fetch_rule_hourly_stats(
self, rule: Rule | Workflow, start: datetime, end: datetime
self, rule: Rule, start: datetime, end: datetime
) -> Sequence[TimeSeriesValue]:
"""
Fetches counts of how often a rule has fired within a given datetime range, bucketed by
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from sentry.api.api_publish_status import ApiPublishStatus
from sentry.api.base import cell_silo_endpoint
from sentry.api.bases.rule import WorkflowEngineRuleEndpoint
from sentry.api.bases.rule import RuleEndpoint
from sentry.api.serializers import Serializer, serialize
from sentry.api.serializers.models.group import BaseGroupSerializerResponse
from sentry.api.utils import get_date_range_from_params
Expand All @@ -22,7 +22,6 @@
from sentry.models.rule import Rule
from sentry.rules.history import fetch_rule_groups_paginated
from sentry.rules.history.base import RuleGroupHistory
from sentry.workflow_engine.models.workflow import Workflow


class RuleGroupHistoryResponse(TypedDict):
Expand Down Expand Up @@ -56,7 +55,7 @@ def serialize(

@extend_schema(tags=["issue_alerts"])
@cell_silo_endpoint
class ProjectRuleGroupHistoryIndexEndpoint(WorkflowEngineRuleEndpoint):
class ProjectRuleGroupHistoryIndexEndpoint(RuleEndpoint):
publish_status = {
"GET": ApiPublishStatus.EXPERIMENTAL,
}
Expand All @@ -75,7 +74,7 @@ class ProjectRuleGroupHistoryIndexEndpoint(WorkflowEngineRuleEndpoint):
404: RESPONSE_NOT_FOUND,
},
)
def get(self, request: Request, project: Project, rule: Rule | Workflow) -> Response:
def get(self, request: Request, project: Project, rule: Rule) -> Response:
per_page = self.get_per_page(request)
cursor = self.get_cursor_from_request(request)
try:
Expand Down
7 changes: 3 additions & 4 deletions src/sentry/rules/history/endpoints/project_rule_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from sentry.api.api_publish_status import ApiPublishStatus
from sentry.api.base import cell_silo_endpoint
from sentry.api.bases.rule import WorkflowEngineRuleEndpoint
from sentry.api.bases.rule import RuleEndpoint
from sentry.api.serializers import Serializer, serialize
from sentry.api.utils import get_date_range_from_params
from sentry.apidocs.constants import RESPONSE_FORBIDDEN, RESPONSE_NOT_FOUND, RESPONSE_UNAUTHORIZED
Expand All @@ -19,7 +19,6 @@
from sentry.models.rule import Rule
from sentry.rules.history import fetch_rule_hourly_stats
from sentry.rules.history.base import TimeSeriesValue
from sentry.workflow_engine.models.workflow import Workflow


class TimeSeriesValueResponse(TypedDict):
Expand All @@ -39,7 +38,7 @@ def serialize(

@extend_schema(tags=["issue_alerts"])
@cell_silo_endpoint
class ProjectRuleStatsIndexEndpoint(WorkflowEngineRuleEndpoint):
class ProjectRuleStatsIndexEndpoint(RuleEndpoint):
publish_status = {
"GET": ApiPublishStatus.EXPERIMENTAL,
}
Expand All @@ -58,7 +57,7 @@ class ProjectRuleStatsIndexEndpoint(WorkflowEngineRuleEndpoint):
404: RESPONSE_NOT_FOUND,
},
)
def get(self, request: Request, project: Project, rule: Rule | Workflow) -> Response:
def get(self, request: Request, project: Project, rule: Rule) -> Response:
"""
Note that results are returned in hourly buckets.
"""
Expand Down
Loading
Loading