From 3bae24cab97a3b0c040b8ad6f3e6a4d37ce457c4 Mon Sep 17 00:00:00 2001 From: Athul Date: Sun, 8 Mar 2026 22:51:17 +0530 Subject: [PATCH 1/8] UN-1798 [PERF] Optimize dashboard metrics aggregation task Reduce DB queries per org from ~42 to ~19 and add self-healing lock: - Combine 4 LLM metric queries into 1 using conditional aggregation - Cache org identifier lookup for PageUsage-based metrics - Merge daily+monthly queries into single source query (backfill pattern) - Tighten active org filter from 2 months to 7 days - Add self-healing Redis lock to prevent stuck-lock incidents - Remove unused org_identifier param from get_failed_pages Co-Authored-By: Claude Opus 4.6 --- .../management/commands/backfill_metrics.py | 33 +- backend/dashboard_metrics/services.py | 248 ++++++------- backend/dashboard_metrics/tasks.py | 325 +++++++++++++----- 3 files changed, 397 insertions(+), 209 deletions(-) diff --git a/backend/dashboard_metrics/management/commands/backfill_metrics.py b/backend/dashboard_metrics/management/commands/backfill_metrics.py index 4c71fcd329..de649c02ad 100644 --- a/backend/dashboard_metrics/management/commands/backfill_metrics.py +++ b/backend/dashboard_metrics/management/commands/backfill_metrics.py @@ -139,15 +139,28 @@ def handle(self, *args, **options): "errors": 0, } + # Pre-resolve org identifiers for PageUsage queries (avoids + # redundant Organization lookups inside the metric query loop). + org_identifiers = dict( + Organization.objects.filter( + id__in=[int(oid) if oid.isdigit() else oid for oid in org_ids] + ).values_list("id", "organization_id") + ) + for i, current_org_id in enumerate(org_ids): self.stdout.write( f"\n[{i + 1}/{len(org_ids)}] Processing org: {current_org_id}" ) try: + # Resolve org string identifier for PageUsage queries + org_id_key = int(current_org_id) if current_org_id.isdigit() else current_org_id + org_identifier = org_identifiers.get(org_id_key) + # Collect all metric data for this org hourly_data, daily_data, monthly_data = self._collect_metrics( - current_org_id, start_date, end_date + current_org_id, start_date, end_date, + org_identifier=org_identifier, ) self.stdout.write( @@ -245,7 +258,11 @@ def _resolve_org_ids( return sorted(str(oid) for oid in all_org_ids) def _collect_metrics( - self, org_id: str, start_date: datetime, end_date: datetime + self, + org_id: str, + start_date: datetime, + end_date: datetime, + org_identifier: str | None = None, ) -> tuple[dict, dict, dict]: """Collect metrics from source tables for all granularities.""" hourly_agg = {} @@ -255,10 +272,17 @@ def _collect_metrics( for metric_name, query_method, is_histogram in self.METRIC_CONFIGS: metric_type = MetricType.HISTOGRAM if is_histogram else MetricType.COUNTER + # Pass org_identifier to PageUsage-based metrics to + # avoid redundant Organization lookups per call. + extra_kwargs = {} + if metric_name == "pages_processed": + extra_kwargs["org_identifier"] = org_identifier + try: # Query hourly data hourly_results = query_method( - org_id, start_date, end_date, granularity=Granularity.HOUR + org_id, start_date, end_date, + granularity=Granularity.HOUR, **extra_kwargs, ) for row in hourly_results: period = row["period"] @@ -277,7 +301,8 @@ def _collect_metrics( # Query daily data daily_results = query_method( - org_id, start_date, end_date, granularity=Granularity.DAY + org_id, start_date, end_date, + granularity=Granularity.DAY, **extra_kwargs, ) for row in daily_results: period = row["period"] diff --git a/backend/dashboard_metrics/services.py b/backend/dashboard_metrics/services.py index e382f62617..be0385ca9c 100644 --- a/backend/dashboard_metrics/services.py +++ b/backend/dashboard_metrics/services.py @@ -14,7 +14,7 @@ from account_usage.models import PageUsage from account_v2.models import Organization from api_v2.models import APIDeployment -from django.db.models import CharField, Count, OuterRef, Subquery, Sum +from django.db.models import CharField, Count, OuterRef, Q, Subquery, Sum from django.db.models.functions import Cast, Coalesce, TruncDay, TruncHour, TruncWeek from pipeline_v2.models import Pipeline from usage_v2.models import Usage @@ -114,12 +114,37 @@ def get_documents_processed( .order_by("period") ) + @staticmethod + def _resolve_org_identifier(organization_id: str, org_identifier: str | None = None) -> str | None: + """Resolve PageUsage's string org identifier from UUID PK. + + PageUsage.organization_id stores Organization.organization_id (a string + like "org_abc123"), not the UUID PK used everywhere else. This helper + resolves the string identifier, accepting a pre-resolved value to avoid + redundant DB lookups when called in a loop. + + Args: + organization_id: Organization UUID string (PK) + org_identifier: Pre-resolved string identifier (skips DB lookup) + + Returns: + Organization string identifier, or None if org not found + """ + if org_identifier: + return org_identifier + try: + org = Organization.objects.get(id=organization_id) + return org.organization_id + except Organization.DoesNotExist: + return None + @staticmethod def get_pages_processed( organization_id: str, start_date: datetime, end_date: datetime, granularity: str = Granularity.DAY, + org_identifier: str | None = None, ) -> list[dict[str, Any]]: """Query pages processed from page_usage. @@ -127,28 +152,30 @@ def get_pages_processed( Note: PageUsage.organization_id stores the Organization's string identifier (organization.organization_id), NOT the UUID PK. - We look up the string identifier from the UUID passed by callers. + Pass org_identifier to avoid a DB lookup per call. Args: organization_id: Organization UUID string start_date: Start of date range end_date: End of date range granularity: Time granularity (hour, day, week) + org_identifier: Pre-resolved Organization.organization_id string. + If None, will be looked up from organization_id. Returns: List of dicts with 'period' and 'value' keys """ - try: - org = Organization.objects.get(id=organization_id) - org_identifier = org.organization_id - except Organization.DoesNotExist: + resolved = MetricsQueryService._resolve_org_identifier( + organization_id, org_identifier + ) + if not resolved: return [] trunc_func = MetricsQueryService._get_trunc_func(granularity) return list( PageUsage.objects.filter( - organization_id=org_identifier, + organization_id=resolved, created_at__gte=start_date, created_at__lte=end_date, ) @@ -159,15 +186,17 @@ def get_pages_processed( ) @staticmethod - def get_llm_calls( + def get_llm_metrics_combined( organization_id: str, start_date: datetime, end_date: datetime, granularity: str = Granularity.DAY, ) -> list[dict[str, Any]]: - """Query LLM calls from usage table. + """Query all LLM metrics from usage table in a single query. - Counts usage records where usage_type='llm' grouped by time period. + Uses conditional aggregation to compute llm_calls, challenges, + summarization_calls, and llm_usage (cost) in one DB round-trip + instead of four separate queries. Args: organization_id: Organization UUID string @@ -176,7 +205,8 @@ def get_llm_calls( granularity: Time granularity (hour, day, week) Returns: - List of dicts with 'period' and 'value' keys + List of dicts with 'period', 'llm_calls', 'challenges', + 'summarization_calls', and 'llm_usage' keys """ trunc_func = MetricsQueryService._get_trunc_func(granularity) @@ -190,46 +220,54 @@ def get_llm_calls( ) .annotate(period=trunc_func("created_at")) .values("period") - .annotate(value=Count("id")) + .annotate( + llm_calls=Count("id"), + challenges=Count("id", filter=Q(llm_usage_reason="challenge")), + summarization_calls=Count( + "id", filter=Q(llm_usage_reason="summarize") + ), + llm_usage=Sum("cost_in_dollars"), + ) .order_by("period") ) @staticmethod - def get_challenges( + def get_llm_calls( organization_id: str, start_date: datetime, end_date: datetime, granularity: str = Granularity.DAY, ) -> list[dict[str, Any]]: - """Query challenge calls from usage table. + """Query LLM calls from usage table. - Counts usage records where llm_usage_reason='challenge' grouped by time period. + Thin wrapper for views/backfill that need a single metric. + For batch aggregation, use get_llm_metrics_combined() instead. + """ + return [ + {"period": r["period"], "value": r["llm_calls"]} + for r in MetricsQueryService.get_llm_metrics_combined( + organization_id, start_date, end_date, granularity + ) + ] - Args: - organization_id: Organization UUID string - start_date: Start of date range - end_date: End of date range - granularity: Time granularity (hour, day, week) + @staticmethod + def get_challenges( + organization_id: str, + start_date: datetime, + end_date: datetime, + granularity: str = Granularity.DAY, + ) -> list[dict[str, Any]]: + """Query challenge calls from usage table. - Returns: - List of dicts with 'period' and 'value' keys + Thin wrapper for views/backfill that need a single metric. + For batch aggregation, use get_llm_metrics_combined() instead. """ - trunc_func = MetricsQueryService._get_trunc_func(granularity) - - return list( - _get_usage_queryset() - .filter( - organization_id=organization_id, - usage_type="llm", - llm_usage_reason="challenge", - created_at__gte=start_date, - created_at__lte=end_date, + return [ + {"period": r["period"], "value": r["challenges"]} + for r in MetricsQueryService.get_llm_metrics_combined( + organization_id, start_date, end_date, granularity ) - .annotate(period=trunc_func("created_at")) - .values("period") - .annotate(value=Count("id")) - .order_by("period") - ) + ] @staticmethod def get_summarization_calls( @@ -240,33 +278,34 @@ def get_summarization_calls( ) -> list[dict[str, Any]]: """Query summarization calls from usage table. - Counts usage records where llm_usage_reason='summarize' grouped by time period. + Thin wrapper for views/backfill that need a single metric. + For batch aggregation, use get_llm_metrics_combined() instead. + """ + return [ + {"period": r["period"], "value": r["summarization_calls"]} + for r in MetricsQueryService.get_llm_metrics_combined( + organization_id, start_date, end_date, granularity + ) + ] - Args: - organization_id: Organization UUID string - start_date: Start of date range - end_date: End of date range - granularity: Time granularity (hour, day, week) + @staticmethod + def get_llm_usage_cost( + organization_id: str, + start_date: datetime, + end_date: datetime, + granularity: str = Granularity.DAY, + ) -> list[dict[str, Any]]: + """Query LLM usage cost from usage table. - Returns: - List of dicts with 'period' and 'value' keys + Thin wrapper for views/backfill that need a single metric. + For batch aggregation, use get_llm_metrics_combined() instead. """ - trunc_func = MetricsQueryService._get_trunc_func(granularity) - - return list( - _get_usage_queryset() - .filter( - organization_id=organization_id, - usage_type="llm", - llm_usage_reason="summarize", - created_at__gte=start_date, - created_at__lte=end_date, + return [ + {"period": r["period"], "value": r["llm_usage"]} + for r in MetricsQueryService.get_llm_metrics_combined( + organization_id, start_date, end_date, granularity ) - .annotate(period=trunc_func("created_at")) - .values("period") - .annotate(value=Count("id")) - .order_by("period") - ) + ] @staticmethod def get_deployed_api_requests( @@ -355,42 +394,6 @@ def get_etl_pipeline_executions( .order_by("period") ) - @staticmethod - def get_llm_usage_cost( - organization_id: str, - start_date: datetime, - end_date: datetime, - granularity: str = Granularity.DAY, - ) -> list[dict[str, Any]]: - """Query LLM usage cost from usage table. - - Sums cost_in_dollars for LLM usage records grouped by time period. - - Args: - organization_id: Organization UUID string - start_date: Start of date range - end_date: End of date range - granularity: Time granularity (hour, day, week) - - Returns: - List of dicts with 'period' and 'value' keys - """ - trunc_func = MetricsQueryService._get_trunc_func(granularity) - - return list( - _get_usage_queryset() - .filter( - organization_id=organization_id, - usage_type="llm", - created_at__gte=start_date, - created_at__lte=end_date, - ) - .annotate(period=trunc_func("created_at")) - .values("period") - .annotate(value=Sum("cost_in_dollars")) - .order_by("period") - ) - @staticmethod def get_prompt_executions( organization_id: str, @@ -437,6 +440,10 @@ def get_failed_pages( Sums pages_processed for file executions with status='ERROR', grouped by time period based on when the failure occurred. + Note: Unlike get_pages_processed, this does NOT need org_identifier + because it joins through WorkflowFileExecution -> WorkflowExecution -> + Workflow -> Organization (UUID FK), not through PageUsage.organization_id. + Args: organization_id: Organization UUID string start_date: Start of date range @@ -576,6 +583,18 @@ def get_all_metrics_summary( Returns: Dict mapping metric name to total value """ + # Resolve org identifier once for PageUsage queries + org_identifier = cls._resolve_org_identifier(organization_id) + + # Combined LLM metrics (1 query instead of 4) + llm_combined = cls.get_llm_metrics_combined( + organization_id, start_date, end_date + ) + llm_calls_total = sum(r["llm_calls"] for r in llm_combined) + challenges_total = sum(r["challenges"] for r in llm_combined) + summarization_total = sum(r["summarization_calls"] for r in llm_combined) + llm_usage_total = sum(r["llm_usage"] or 0 for r in llm_combined) + return { "documents_processed": sum( r["value"] @@ -585,22 +604,14 @@ def get_all_metrics_summary( ), "pages_processed": sum( r["value"] or 0 - for r in cls.get_pages_processed(organization_id, start_date, end_date) - ), - "llm_calls": sum( - r["value"] - for r in cls.get_llm_calls(organization_id, start_date, end_date) - ), - "challenges": sum( - r["value"] - for r in cls.get_challenges(organization_id, start_date, end_date) - ), - "summarization_calls": sum( - r["value"] - for r in cls.get_summarization_calls( - organization_id, start_date, end_date + for r in cls.get_pages_processed( + organization_id, start_date, end_date, + org_identifier=org_identifier, ) ), + "llm_calls": llm_calls_total, + "challenges": challenges_total, + "summarization_calls": summarization_total, "deployed_api_requests": sum( r["value"] for r in cls.get_deployed_api_requests( @@ -613,17 +624,18 @@ def get_all_metrics_summary( organization_id, start_date, end_date ) ), - "llm_usage": sum( - r["value"] or 0 - for r in cls.get_llm_usage_cost(organization_id, start_date, end_date) - ), + "llm_usage": llm_usage_total, "prompt_executions": sum( r["value"] - for r in cls.get_prompt_executions(organization_id, start_date, end_date) + for r in cls.get_prompt_executions( + organization_id, start_date, end_date + ) ), "failed_pages": sum( r["value"] or 0 - for r in cls.get_failed_pages(organization_id, start_date, end_date) + for r in cls.get_failed_pages( + organization_id, start_date, end_date + ) ), "hitl_reviews": sum( r["value"] @@ -631,7 +643,9 @@ def get_all_metrics_summary( ), "hitl_completions": sum( r["value"] - for r in cls.get_hitl_completions(organization_id, start_date, end_date) + for r in cls.get_hitl_completions( + organization_id, start_date, end_date + ) ), } diff --git a/backend/dashboard_metrics/tasks.py b/backend/dashboard_metrics/tasks.py index 879bc8d1fc..f2b73dbf9a 100644 --- a/backend/dashboard_metrics/tasks.py +++ b/backend/dashboard_metrics/tasks.py @@ -7,6 +7,7 @@ """ import logging +import time from datetime import datetime, timedelta from typing import Any @@ -199,6 +200,49 @@ def _bulk_upsert_monthly(aggregations: dict) -> int: AGGREGATION_LOCK_TIMEOUT = 900 # 15 minutes (matches task schedule) +def _acquire_aggregation_lock() -> bool: + """Acquire the distributed aggregation lock with self-healing. + + Stores a Unix timestamp as the lock value. If a previous run crashed + (OOM kill, SIGKILL) without releasing the lock, the next run detects + that the lock is older than AGGREGATION_LOCK_TIMEOUT and reclaims it. + + Returns: + True if lock was acquired, False if another run is legitimately active. + """ + now = time.time() + + # Fast path: lock is free + if cache.add(AGGREGATION_LOCK_KEY, str(now), AGGREGATION_LOCK_TIMEOUT): + return True + + # Lock exists — check if it's stale (previous run died without releasing) + lock_value = cache.get(AGGREGATION_LOCK_KEY) + if lock_value is None: + # Expired between our check and get — race is fine, next run will pick up + return False + + try: + lock_time = float(lock_value) + except (TypeError, ValueError): + # Corrupted value (e.g. old "running" string) — reclaim it + logger.warning("Reclaiming aggregation lock with invalid value: %s", lock_value) + cache.delete(AGGREGATION_LOCK_KEY) + return cache.add(AGGREGATION_LOCK_KEY, str(now), AGGREGATION_LOCK_TIMEOUT) + + age = now - lock_time + if age > AGGREGATION_LOCK_TIMEOUT: + logger.warning( + "Reclaiming stale aggregation lock (age=%.0fs, timeout=%ds)", + age, + AGGREGATION_LOCK_TIMEOUT, + ) + cache.delete(AGGREGATION_LOCK_KEY) + return cache.add(AGGREGATION_LOCK_KEY, str(now), AGGREGATION_LOCK_TIMEOUT) + + return False + + @shared_task( name="dashboard_metrics.aggregate_from_sources", soft_time_limit=600, @@ -216,19 +260,19 @@ def aggregate_metrics_from_sources() -> dict[str, Any]: them into EventMetricsHourly, EventMetricsDaily, and EventMetricsMonthly tables for fast dashboard queries at different granularities. - Uses a Redis distributed lock to prevent overlapping runs when a task - takes longer than the 15-minute schedule interval. + Uses a Redis distributed lock with self-healing to prevent overlapping + runs. If a previous run was killed without releasing the lock, the next + run detects the stale lock and reclaims it automatically. Aggregation windows: - Hourly: Last 24 hours (rolling window) - - Daily: Last 2 days (to catch day boundaries) - - Monthly: Current month (updates running total) + - Daily: Last 7 days (ensures we capture late-arriving data) + - Monthly: Last 2 months (current + previous month) Returns: Dict with aggregation summary for all three tiers """ - # Acquire distributed lock to prevent overlapping runs - if not cache.add(AGGREGATION_LOCK_KEY, "running", AGGREGATION_LOCK_TIMEOUT): + if not _acquire_aggregation_lock(): logger.info("Skipping aggregation — another run is in progress") return {"success": True, "skipped": True, "reason": "lock_held"} @@ -238,6 +282,150 @@ def aggregate_metrics_from_sources() -> dict[str, Any]: cache.delete(AGGREGATION_LOCK_KEY) +def _aggregate_single_metric( + query_method, + metric_name: str, + metric_type: str, + org_id: str, + hourly_start: datetime, + daily_start: datetime, + monthly_start: datetime, + end_date: datetime, + hourly_agg: dict, + daily_agg: dict, + monthly_agg: dict, + extra_kwargs: dict | None = None, +) -> None: + """Run a single metric query at all 3 granularities and populate agg dicts. + + Uses 2 queries instead of 3: the daily query is widened to monthly_start + and its results are split into both daily_agg and monthly_agg in Python. + This is the same pattern proven in the backfill management command. + """ + extra_kwargs = extra_kwargs or {} + + # === HOURLY (last 24h) === + for row in query_method( + org_id, hourly_start, end_date, + granularity=Granularity.HOUR, **extra_kwargs, + ): + value = row["value"] or 0 + hour_ts = _truncate_to_hour(row["period"]) + key = (org_id, hour_ts.isoformat(), metric_name, "default", "") + if key not in hourly_agg: + hourly_agg[key] = {"metric_type": metric_type, "value": 0, "count": 0} + hourly_agg[key]["value"] += value + hourly_agg[key]["count"] += 1 + + # === DAILY + MONTHLY (single query from monthly_start) === + # Query with DAY granularity from monthly_start (2 months back). + # Each row is bucketed into daily_agg if within daily window, + # and always bucketed into monthly_agg for the month rollup. + monthly_buckets: dict[str, dict] = {} + for row in query_method( + org_id, monthly_start, end_date, + granularity=Granularity.DAY, **extra_kwargs, + ): + value = row["value"] or 0 + day_ts = _truncate_to_day(row["period"]) + + # Daily: only include rows within the daily window + if day_ts >= daily_start: + key = (org_id, day_ts.date().isoformat(), metric_name, "default", "") + if key not in daily_agg: + daily_agg[key] = {"metric_type": metric_type, "value": 0, "count": 0} + daily_agg[key]["value"] += value + daily_agg[key]["count"] += 1 + + # Monthly: bucket all rows by month + month_key_str = _truncate_to_month(row["period"]).date().isoformat() + if month_key_str not in monthly_buckets: + monthly_buckets[month_key_str] = {"value": 0, "count": 0} + monthly_buckets[month_key_str]["value"] += value + monthly_buckets[month_key_str]["count"] += 1 + + for month_key_str, bucket in monthly_buckets.items(): + key = (org_id, month_key_str, metric_name, "default", "") + monthly_agg[key] = { + "metric_type": metric_type, + "value": bucket["value"], + "count": bucket["count"] or 1, + } + + +def _aggregate_llm_combined( + org_id: str, + hourly_start: datetime, + daily_start: datetime, + monthly_start: datetime, + end_date: datetime, + hourly_agg: dict, + daily_agg: dict, + monthly_agg: dict, + llm_combined_fields: dict, +) -> None: + """Run the combined LLM metrics query at all granularities. + + Issues 2 queries total (hourly + daily/monthly) instead of 3. + The DAY-granularity query is widened to monthly_start and results are + split into daily_agg (recent rows) and monthly_agg (all rows bucketed + by month) in Python. Same pattern as _aggregate_single_metric. + """ + # === HOURLY (last 24h) === + hourly_results = MetricsQueryService.get_llm_metrics_combined( + org_id, hourly_start, end_date, granularity=Granularity.HOUR, + ) + for row in hourly_results: + ts = _truncate_to_hour(row["period"]) + ts_str = ts.isoformat() + for field, (metric_name, metric_type) in llm_combined_fields.items(): + value = row[field] or 0 + key = (org_id, ts_str, metric_name, "default", "") + if key not in hourly_agg: + hourly_agg[key] = {"metric_type": metric_type, "value": 0, "count": 0} + hourly_agg[key]["value"] += value + hourly_agg[key]["count"] += 1 + + # === DAILY + MONTHLY (single query from monthly_start) === + daily_monthly_results = MetricsQueryService.get_llm_metrics_combined( + org_id, monthly_start, end_date, granularity=Granularity.DAY, + ) + monthly_buckets: dict[tuple[str, str], dict] = {} + for row in daily_monthly_results: + day_ts = _truncate_to_day(row["period"]) + + # Daily: only include rows within the daily window + if day_ts >= daily_start: + ts_str = day_ts.date().isoformat() + for field, (metric_name, metric_type) in llm_combined_fields.items(): + value = row[field] or 0 + key = (org_id, ts_str, metric_name, "default", "") + if key not in daily_agg: + daily_agg[key] = {"metric_type": metric_type, "value": 0, "count": 0} + daily_agg[key]["value"] += value + daily_agg[key]["count"] += 1 + + # Monthly: bucket all rows by month + month_key_str = _truncate_to_month(row["period"]).date().isoformat() + for field, (metric_name, metric_type) in llm_combined_fields.items(): + value = row[field] or 0 + bkey = (month_key_str, metric_name) + if bkey not in monthly_buckets: + monthly_buckets[bkey] = { + "metric_type": metric_type, "value": 0, "count": 0, + } + monthly_buckets[bkey]["value"] += value + monthly_buckets[bkey]["count"] += 1 + + for (month_key_str, metric_name), bucket in monthly_buckets.items(): + key = (org_id, month_key_str, metric_name, "default", "") + monthly_agg[key] = { + "metric_type": bucket["metric_type"], + "value": bucket["value"], + "count": bucket["count"] or 1, + } + + def _run_aggregation() -> dict[str, Any]: """Execute the actual aggregation logic. @@ -268,25 +456,32 @@ def _run_aggregation() -> dict[str, Any]: ) # Metric definitions: (name, query_method, is_histogram) + # Note: llm_calls, challenges, summarization_calls, and llm_usage are + # handled separately via get_llm_metrics_combined (1 query instead of 4). metric_configs = [ ("documents_processed", MetricsQueryService.get_documents_processed, False), ("pages_processed", MetricsQueryService.get_pages_processed, True), - ("llm_calls", MetricsQueryService.get_llm_calls, False), - ("challenges", MetricsQueryService.get_challenges, False), - ("summarization_calls", MetricsQueryService.get_summarization_calls, False), ("deployed_api_requests", MetricsQueryService.get_deployed_api_requests, False), ( "etl_pipeline_executions", MetricsQueryService.get_etl_pipeline_executions, False, ), - ("llm_usage", MetricsQueryService.get_llm_usage_cost, True), ("prompt_executions", MetricsQueryService.get_prompt_executions, False), ("failed_pages", MetricsQueryService.get_failed_pages, True), ("hitl_reviews", MetricsQueryService.get_hitl_reviews, False), ("hitl_completions", MetricsQueryService.get_hitl_completions, False), ] + # LLM metrics combined via conditional aggregation (4 metrics in 1 query). + # Maps combined query field -> (metric_name, metric_type) + llm_combined_fields = { + "llm_calls": ("llm_calls", MetricType.COUNTER), + "challenges": ("challenges", MetricType.COUNTER), + "summarization_calls": ("summarization_calls", MetricType.COUNTER), + "llm_usage": ("llm_usage", MetricType.HISTOGRAM), + } + stats = { "hourly": {"upserted": 0}, "daily": {"upserted": 0}, @@ -296,10 +491,14 @@ def _run_aggregation() -> dict[str, Any]: } # Pre-filter to orgs with recent activity to reduce DB load. - # One lightweight query avoids running 36 queries per dormant org. + # Uses daily_start (7 days) instead of monthly_start (2 months) because: + # - Hourly/daily queries only need recent data (24h / 7d windows) + # - Monthly totals for dormant orgs were already written by previous + # runs when the org was active — re-running just overwrites same values + # - This avoids 28 queries per dormant org that had activity 2-8 weeks ago active_org_ids = set( WorkflowExecution.objects.filter( - created_at__gte=monthly_start, + created_at__gte=daily_start, ) .values_list("workflow__organization_id", flat=True) .distinct() @@ -322,10 +521,13 @@ def _run_aggregation() -> dict[str, Any]: "skipped_reason": "no_active_orgs", } - organizations = Organization.objects.filter(id__in=active_org_ids).only("id") + organizations = Organization.objects.filter(id__in=active_org_ids).only( + "id", "organization_id" + ) for org in organizations: org_id = str(org.id) + org_identifier = org.organization_id # Pre-resolved for PageUsage queries hourly_agg: dict[tuple, dict] = {} daily_agg: dict[tuple, dict] = {} monthly_agg: dict[tuple, dict] = {} @@ -334,87 +536,34 @@ def _run_aggregation() -> dict[str, Any]: for metric_name, query_method, is_histogram in metric_configs: metric_type = MetricType.HISTOGRAM if is_histogram else MetricType.COUNTER + # Pass org_identifier to PageUsage-based metrics to + # avoid redundant Organization lookups per call. + extra_kwargs = {} + if metric_name == "pages_processed": + extra_kwargs["org_identifier"] = org_identifier + try: - # === HOURLY AGGREGATION (last 24 hours) === - hourly_results = query_method( - org_id, hourly_start, end_date, granularity=Granularity.HOUR + _aggregate_single_metric( + query_method, metric_name, metric_type, + org_id, hourly_start, daily_start, monthly_start, end_date, + hourly_agg, daily_agg, monthly_agg, + extra_kwargs, ) - for row in hourly_results: - period = row["period"] - value = row["value"] or 0 - hour_ts = _truncate_to_hour(period) - key = (org_id, hour_ts.isoformat(), metric_name, "default", "") - - if key not in hourly_agg: - hourly_agg[key] = { - "metric_type": metric_type, - "value": 0, - "count": 0, - } - hourly_agg[key]["value"] += value - hourly_agg[key]["count"] += 1 - - # === DAILY AGGREGATION (last 7 days) === - daily_results = query_method( - org_id, daily_start, end_date, granularity=Granularity.DAY - ) - for row in daily_results: - period = row["period"] - value = row["value"] or 0 - day_ts = _truncate_to_day(period) - key = ( - org_id, - day_ts.date().isoformat(), - metric_name, - "default", - "", - ) - - if key not in daily_agg: - daily_agg[key] = { - "metric_type": metric_type, - "value": 0, - "count": 0, - } - daily_agg[key]["value"] += value - daily_agg[key]["count"] += 1 - - # === MONTHLY AGGREGATION (last 2 months) === - monthly_results = query_method( - org_id, monthly_start, end_date, granularity=Granularity.DAY - ) - # Group results by month and create separate records - monthly_buckets: dict[str, dict] = {} - for row in monthly_results: - period = row["period"] - value = row["value"] or 0 - month_ts = _truncate_to_month(period) - month_key_str = month_ts.date().isoformat() - - if month_key_str not in monthly_buckets: - monthly_buckets[month_key_str] = {"value": 0, "count": 0} - monthly_buckets[month_key_str]["value"] += value - monthly_buckets[month_key_str]["count"] += 1 - - # Create aggregation entries for each month - for month_key_str, bucket in monthly_buckets.items(): - month_key = ( - org_id, - month_key_str, - metric_name, - "default", - "", - ) - monthly_agg[month_key] = { - "metric_type": metric_type, - "value": bucket["value"], - "count": bucket["count"] or 1, - } - except Exception: logger.exception("Error querying %s for org %s", metric_name, org_id) stats["errors"] += 1 + # Combined LLM metrics: 1 query per granularity instead of 4 + try: + _aggregate_llm_combined( + org_id, hourly_start, daily_start, monthly_start, end_date, + hourly_agg, daily_agg, monthly_agg, + llm_combined_fields, + ) + except Exception: + logger.exception("Error querying combined LLM metrics for org %s", org_id) + stats["errors"] += 1 + # Bulk upsert all three tiers (single INSERT...ON CONFLICT each) if hourly_agg: stats["hourly"]["upserted"] += _bulk_upsert_hourly(hourly_agg) From 27b1575c856d50030a9db31a6f1ce99fe75665dd Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 8 Mar 2026 18:32:30 +0000 Subject: [PATCH 2/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../management/commands/backfill_metrics.py | 22 +++++--- backend/dashboard_metrics/services.py | 28 ++++------ backend/dashboard_metrics/tasks.py | 52 ++++++++++++++----- 3 files changed, 67 insertions(+), 35 deletions(-) diff --git a/backend/dashboard_metrics/management/commands/backfill_metrics.py b/backend/dashboard_metrics/management/commands/backfill_metrics.py index de649c02ad..b1a1c3c072 100644 --- a/backend/dashboard_metrics/management/commands/backfill_metrics.py +++ b/backend/dashboard_metrics/management/commands/backfill_metrics.py @@ -154,12 +154,16 @@ def handle(self, *args, **options): try: # Resolve org string identifier for PageUsage queries - org_id_key = int(current_org_id) if current_org_id.isdigit() else current_org_id + org_id_key = ( + int(current_org_id) if current_org_id.isdigit() else current_org_id + ) org_identifier = org_identifiers.get(org_id_key) # Collect all metric data for this org hourly_data, daily_data, monthly_data = self._collect_metrics( - current_org_id, start_date, end_date, + current_org_id, + start_date, + end_date, org_identifier=org_identifier, ) @@ -281,8 +285,11 @@ def _collect_metrics( try: # Query hourly data hourly_results = query_method( - org_id, start_date, end_date, - granularity=Granularity.HOUR, **extra_kwargs, + org_id, + start_date, + end_date, + granularity=Granularity.HOUR, + **extra_kwargs, ) for row in hourly_results: period = row["period"] @@ -301,8 +308,11 @@ def _collect_metrics( # Query daily data daily_results = query_method( - org_id, start_date, end_date, - granularity=Granularity.DAY, **extra_kwargs, + org_id, + start_date, + end_date, + granularity=Granularity.DAY, + **extra_kwargs, ) for row in daily_results: period = row["period"] diff --git a/backend/dashboard_metrics/services.py b/backend/dashboard_metrics/services.py index be0385ca9c..e7d604e41e 100644 --- a/backend/dashboard_metrics/services.py +++ b/backend/dashboard_metrics/services.py @@ -115,7 +115,9 @@ def get_documents_processed( ) @staticmethod - def _resolve_org_identifier(organization_id: str, org_identifier: str | None = None) -> str | None: + def _resolve_org_identifier( + organization_id: str, org_identifier: str | None = None + ) -> str | None: """Resolve PageUsage's string org identifier from UUID PK. PageUsage.organization_id stores Organization.organization_id (a string @@ -223,9 +225,7 @@ def get_llm_metrics_combined( .annotate( llm_calls=Count("id"), challenges=Count("id", filter=Q(llm_usage_reason="challenge")), - summarization_calls=Count( - "id", filter=Q(llm_usage_reason="summarize") - ), + summarization_calls=Count("id", filter=Q(llm_usage_reason="summarize")), llm_usage=Sum("cost_in_dollars"), ) .order_by("period") @@ -587,9 +587,7 @@ def get_all_metrics_summary( org_identifier = cls._resolve_org_identifier(organization_id) # Combined LLM metrics (1 query instead of 4) - llm_combined = cls.get_llm_metrics_combined( - organization_id, start_date, end_date - ) + llm_combined = cls.get_llm_metrics_combined(organization_id, start_date, end_date) llm_calls_total = sum(r["llm_calls"] for r in llm_combined) challenges_total = sum(r["challenges"] for r in llm_combined) summarization_total = sum(r["summarization_calls"] for r in llm_combined) @@ -605,7 +603,9 @@ def get_all_metrics_summary( "pages_processed": sum( r["value"] or 0 for r in cls.get_pages_processed( - organization_id, start_date, end_date, + organization_id, + start_date, + end_date, org_identifier=org_identifier, ) ), @@ -627,15 +627,11 @@ def get_all_metrics_summary( "llm_usage": llm_usage_total, "prompt_executions": sum( r["value"] - for r in cls.get_prompt_executions( - organization_id, start_date, end_date - ) + for r in cls.get_prompt_executions(organization_id, start_date, end_date) ), "failed_pages": sum( r["value"] or 0 - for r in cls.get_failed_pages( - organization_id, start_date, end_date - ) + for r in cls.get_failed_pages(organization_id, start_date, end_date) ), "hitl_reviews": sum( r["value"] @@ -643,9 +639,7 @@ def get_all_metrics_summary( ), "hitl_completions": sum( r["value"] - for r in cls.get_hitl_completions( - organization_id, start_date, end_date - ) + for r in cls.get_hitl_completions(organization_id, start_date, end_date) ), } diff --git a/backend/dashboard_metrics/tasks.py b/backend/dashboard_metrics/tasks.py index f2b73dbf9a..9d78385c6f 100644 --- a/backend/dashboard_metrics/tasks.py +++ b/backend/dashboard_metrics/tasks.py @@ -306,8 +306,11 @@ def _aggregate_single_metric( # === HOURLY (last 24h) === for row in query_method( - org_id, hourly_start, end_date, - granularity=Granularity.HOUR, **extra_kwargs, + org_id, + hourly_start, + end_date, + granularity=Granularity.HOUR, + **extra_kwargs, ): value = row["value"] or 0 hour_ts = _truncate_to_hour(row["period"]) @@ -323,8 +326,11 @@ def _aggregate_single_metric( # and always bucketed into monthly_agg for the month rollup. monthly_buckets: dict[str, dict] = {} for row in query_method( - org_id, monthly_start, end_date, - granularity=Granularity.DAY, **extra_kwargs, + org_id, + monthly_start, + end_date, + granularity=Granularity.DAY, + **extra_kwargs, ): value = row["value"] or 0 day_ts = _truncate_to_day(row["period"]) @@ -373,7 +379,10 @@ def _aggregate_llm_combined( """ # === HOURLY (last 24h) === hourly_results = MetricsQueryService.get_llm_metrics_combined( - org_id, hourly_start, end_date, granularity=Granularity.HOUR, + org_id, + hourly_start, + end_date, + granularity=Granularity.HOUR, ) for row in hourly_results: ts = _truncate_to_hour(row["period"]) @@ -388,7 +397,10 @@ def _aggregate_llm_combined( # === DAILY + MONTHLY (single query from monthly_start) === daily_monthly_results = MetricsQueryService.get_llm_metrics_combined( - org_id, monthly_start, end_date, granularity=Granularity.DAY, + org_id, + monthly_start, + end_date, + granularity=Granularity.DAY, ) monthly_buckets: dict[tuple[str, str], dict] = {} for row in daily_monthly_results: @@ -412,7 +424,9 @@ def _aggregate_llm_combined( bkey = (month_key_str, metric_name) if bkey not in monthly_buckets: monthly_buckets[bkey] = { - "metric_type": metric_type, "value": 0, "count": 0, + "metric_type": metric_type, + "value": 0, + "count": 0, } monthly_buckets[bkey]["value"] += value monthly_buckets[bkey]["count"] += 1 @@ -544,9 +558,17 @@ def _run_aggregation() -> dict[str, Any]: try: _aggregate_single_metric( - query_method, metric_name, metric_type, - org_id, hourly_start, daily_start, monthly_start, end_date, - hourly_agg, daily_agg, monthly_agg, + query_method, + metric_name, + metric_type, + org_id, + hourly_start, + daily_start, + monthly_start, + end_date, + hourly_agg, + daily_agg, + monthly_agg, extra_kwargs, ) except Exception: @@ -556,8 +578,14 @@ def _run_aggregation() -> dict[str, Any]: # Combined LLM metrics: 1 query per granularity instead of 4 try: _aggregate_llm_combined( - org_id, hourly_start, daily_start, monthly_start, end_date, - hourly_agg, daily_agg, monthly_agg, + org_id, + hourly_start, + daily_start, + monthly_start, + end_date, + hourly_agg, + daily_agg, + monthly_agg, llm_combined_fields, ) except Exception: From db14aa2f8dd829e65571f053280dd8e76ac02290 Mon Sep 17 00:00:00 2001 From: Athul Date: Tue, 10 Mar 2026 22:33:12 +0530 Subject: [PATCH 3/8] Fix redundant LLM queries and daily_start boundary bug MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace 4 thin LLM wrapper methods with get_llm_metrics_split() that fetches combined data once and splits into per-metric series - Update live_series view to use single combined query (4→1 DB queries) - Update backfill_metrics command to use combined query (8→2 DB queries) - Fix daily_start boundary day exclusion by truncating to midnight - Rename resolved → page_usage_org_id for clarity Co-Authored-By: Claude Opus 4.6 --- .../management/commands/backfill_metrics.py | 148 +++++++++++------- backend/dashboard_metrics/services.py | 98 ++++-------- backend/dashboard_metrics/tasks.py | 2 +- backend/dashboard_metrics/views.py | 62 ++++++-- 4 files changed, 173 insertions(+), 137 deletions(-) diff --git a/backend/dashboard_metrics/management/commands/backfill_metrics.py b/backend/dashboard_metrics/management/commands/backfill_metrics.py index b1a1c3c072..7354c1ca66 100644 --- a/backend/dashboard_metrics/management/commands/backfill_metrics.py +++ b/backend/dashboard_metrics/management/commands/backfill_metrics.py @@ -12,6 +12,7 @@ import logging from datetime import datetime, timedelta +from typing import Any from account_v2.models import Organization from django.core.management.base import BaseCommand @@ -41,25 +42,30 @@ class Command(BaseCommand): help = "Backfill metrics from source tables into aggregated tables" # Metric configurations: (name, query_method, is_histogram) + # LLM metrics are handled separately via get_llm_metrics_split(). METRIC_CONFIGS = [ ("documents_processed", MetricsQueryService.get_documents_processed, False), ("pages_processed", MetricsQueryService.get_pages_processed, True), - ("llm_calls", MetricsQueryService.get_llm_calls, False), - ("challenges", MetricsQueryService.get_challenges, False), - ("summarization_calls", MetricsQueryService.get_summarization_calls, False), ("deployed_api_requests", MetricsQueryService.get_deployed_api_requests, False), ( "etl_pipeline_executions", MetricsQueryService.get_etl_pipeline_executions, False, ), - ("llm_usage", MetricsQueryService.get_llm_usage_cost, True), ("prompt_executions", MetricsQueryService.get_prompt_executions, False), ("failed_pages", MetricsQueryService.get_failed_pages, True), ("hitl_reviews", MetricsQueryService.get_hitl_reviews, False), ("hitl_completions", MetricsQueryService.get_hitl_completions, False), ] + # LLM metric names and whether they are histograms + LLM_METRIC_TYPES: dict[str, bool] = { + "llm_calls": False, + "challenges": False, + "summarization_calls": False, + "llm_usage": True, + } + def add_arguments(self, parser): parser.add_argument( "--days", @@ -273,17 +279,95 @@ def _collect_metrics( daily_agg = {} monthly_agg = {} + def _ingest_results( + results: list[dict[str, Any]], + metric_name: str, + metric_type: str, + ) -> None: + """Ingest query results into hourly/daily/monthly aggregation dicts.""" + for row in results: + period = row["period"] + value = row["value"] or 0 + hour_ts = self._truncate_to_hour(period) + key = (org_id, hour_ts.isoformat(), metric_name, "default", "") + + if key not in hourly_agg: + hourly_agg[key] = { + "metric_type": metric_type, + "value": 0, + "count": 0, + } + hourly_agg[key]["value"] += value + hourly_agg[key]["count"] += 1 + + def _ingest_daily_results( + results: list[dict[str, Any]], + metric_name: str, + metric_type: str, + ) -> None: + """Ingest daily query results into daily and monthly aggregation dicts.""" + for row in results: + period = row["period"] + value = row["value"] or 0 + day_date = period.date() if hasattr(period, "date") else period + key = (org_id, day_date.isoformat(), metric_name, "default", "") + + if key not in daily_agg: + daily_agg[key] = { + "metric_type": metric_type, + "value": 0, + "count": 0, + } + daily_agg[key]["value"] += value + daily_agg[key]["count"] += 1 + + # Monthly aggregation from daily results + if hasattr(period, "date"): + month_date = period.replace(day=1).date() + else: + month_date = period.replace(day=1) + mkey = (org_id, month_date.isoformat(), metric_name, "default", "") + + if mkey not in monthly_agg: + monthly_agg[mkey] = { + "metric_type": metric_type, + "value": 0, + "count": 0, + } + monthly_agg[mkey]["value"] += value + monthly_agg[mkey]["count"] += 1 + + # Fetch all 4 LLM metrics in one query per granularity + try: + for granularity in (Granularity.HOUR, Granularity.DAY): + llm_split = MetricsQueryService.get_llm_metrics_split( + org_id, start_date, end_date, granularity + ) + for metric_name, data in llm_split.items(): + is_histogram = self.LLM_METRIC_TYPES[metric_name] + metric_type = ( + MetricType.HISTOGRAM if is_histogram else MetricType.COUNTER + ) + if granularity == Granularity.HOUR: + _ingest_results(data, metric_name, metric_type) + else: + _ingest_daily_results(data, metric_name, metric_type) + except Exception as e: + logger.warning( + "Error querying LLM metrics for org %s: %s", org_id, e + ) + + # Fetch remaining (non-LLM) metrics individually for metric_name, query_method, is_histogram in self.METRIC_CONFIGS: metric_type = MetricType.HISTOGRAM if is_histogram else MetricType.COUNTER # Pass org_identifier to PageUsage-based metrics to # avoid redundant Organization lookups per call. - extra_kwargs = {} + extra_kwargs: dict[str, Any] = {} if metric_name == "pages_processed": extra_kwargs["org_identifier"] = org_identifier try: - # Query hourly data hourly_results = query_method( org_id, start_date, @@ -291,22 +375,8 @@ def _collect_metrics( granularity=Granularity.HOUR, **extra_kwargs, ) - for row in hourly_results: - period = row["period"] - value = row["value"] or 0 - hour_ts = self._truncate_to_hour(period) - key = (org_id, hour_ts.isoformat(), metric_name, "default", "") - - if key not in hourly_agg: - hourly_agg[key] = { - "metric_type": metric_type, - "value": 0, - "count": 0, - } - hourly_agg[key]["value"] += value - hourly_agg[key]["count"] += 1 - - # Query daily data + _ingest_results(hourly_results, metric_name, metric_type) + daily_results = query_method( org_id, start_date, @@ -314,39 +384,7 @@ def _collect_metrics( granularity=Granularity.DAY, **extra_kwargs, ) - for row in daily_results: - period = row["period"] - value = row["value"] or 0 - day_date = period.date() if hasattr(period, "date") else period - key = (org_id, day_date.isoformat(), metric_name, "default", "") - - if key not in daily_agg: - daily_agg[key] = { - "metric_type": metric_type, - "value": 0, - "count": 0, - } - daily_agg[key]["value"] += value - daily_agg[key]["count"] += 1 - - # Query for monthly (aggregate daily results by month) - for row in daily_results: - period = row["period"] - value = row["value"] or 0 - if hasattr(period, "date"): - month_date = period.replace(day=1).date() - else: - month_date = period.replace(day=1) - key = (org_id, month_date.isoformat(), metric_name, "default", "") - - if key not in monthly_agg: - monthly_agg[key] = { - "metric_type": metric_type, - "value": 0, - "count": 0, - } - monthly_agg[key]["value"] += value - monthly_agg[key]["count"] += 1 + _ingest_daily_results(daily_results, metric_name, metric_type) except Exception as e: logger.warning("Error querying %s for org %s: %s", metric_name, org_id, e) diff --git a/backend/dashboard_metrics/services.py b/backend/dashboard_metrics/services.py index e7d604e41e..5e6c654aa7 100644 --- a/backend/dashboard_metrics/services.py +++ b/backend/dashboard_metrics/services.py @@ -167,17 +167,17 @@ def get_pages_processed( Returns: List of dicts with 'period' and 'value' keys """ - resolved = MetricsQueryService._resolve_org_identifier( + page_usage_org_id = MetricsQueryService._resolve_org_identifier( organization_id, org_identifier ) - if not resolved: + if not page_usage_org_id: return [] trunc_func = MetricsQueryService._get_trunc_func(granularity) return list( PageUsage.objects.filter( - organization_id=resolved, + organization_id=page_usage_org_id, created_at__gte=start_date, created_at__lte=end_date, ) @@ -231,81 +231,37 @@ def get_llm_metrics_combined( .order_by("period") ) - @staticmethod - def get_llm_calls( - organization_id: str, - start_date: datetime, - end_date: datetime, - granularity: str = Granularity.DAY, - ) -> list[dict[str, Any]]: - """Query LLM calls from usage table. - - Thin wrapper for views/backfill that need a single metric. - For batch aggregation, use get_llm_metrics_combined() instead. - """ - return [ - {"period": r["period"], "value": r["llm_calls"]} - for r in MetricsQueryService.get_llm_metrics_combined( - organization_id, start_date, end_date, granularity - ) - ] - - @staticmethod - def get_challenges( - organization_id: str, - start_date: datetime, - end_date: datetime, - granularity: str = Granularity.DAY, - ) -> list[dict[str, Any]]: - """Query challenge calls from usage table. - - Thin wrapper for views/backfill that need a single metric. - For batch aggregation, use get_llm_metrics_combined() instead. - """ - return [ - {"period": r["period"], "value": r["challenges"]} - for r in MetricsQueryService.get_llm_metrics_combined( - organization_id, start_date, end_date, granularity - ) - ] + # Mapping from metric name to the key in get_llm_metrics_combined() results. + LLM_METRIC_KEYS: dict[str, str] = { + "llm_calls": "llm_calls", + "challenges": "challenges", + "summarization_calls": "summarization_calls", + "llm_usage": "llm_usage", + } - @staticmethod - def get_summarization_calls( - organization_id: str, - start_date: datetime, - end_date: datetime, - granularity: str = Granularity.DAY, - ) -> list[dict[str, Any]]: - """Query summarization calls from usage table. - - Thin wrapper for views/backfill that need a single metric. - For batch aggregation, use get_llm_metrics_combined() instead. - """ - return [ - {"period": r["period"], "value": r["summarization_calls"]} - for r in MetricsQueryService.get_llm_metrics_combined( - organization_id, start_date, end_date, granularity - ) - ] - - @staticmethod - def get_llm_usage_cost( + @classmethod + def get_llm_metrics_split( + cls, organization_id: str, start_date: datetime, end_date: datetime, granularity: str = Granularity.DAY, - ) -> list[dict[str, Any]]: - """Query LLM usage cost from usage table. + ) -> dict[str, list[dict[str, Any]]]: + """Fetch combined LLM metrics once and split into per-metric series. - Thin wrapper for views/backfill that need a single metric. - For batch aggregation, use get_llm_metrics_combined() instead. + Returns: + Dict mapping metric name to list of {period, value} dicts. """ - return [ - {"period": r["period"], "value": r["llm_usage"]} - for r in MetricsQueryService.get_llm_metrics_combined( - organization_id, start_date, end_date, granularity - ) - ] + combined = cls.get_llm_metrics_combined( + organization_id, start_date, end_date, granularity + ) + return { + metric_name: [ + {"period": r["period"], "value": r[combined_key]} + for r in combined + ] + for metric_name, combined_key in cls.LLM_METRIC_KEYS.items() + } @staticmethod def get_deployed_api_requests( diff --git a/backend/dashboard_metrics/tasks.py b/backend/dashboard_metrics/tasks.py index 9d78385c6f..d6c2a16820 100644 --- a/backend/dashboard_metrics/tasks.py +++ b/backend/dashboard_metrics/tasks.py @@ -452,7 +452,7 @@ def _run_aggregation() -> dict[str, Any]: # - Daily: Last 7 days (ensures we capture late-arriving data) # - Monthly: Last 2 months (current + previous, ensures month transitions are captured) hourly_start = end_date - timedelta(hours=24) - daily_start = end_date - timedelta(days=7) + daily_start = _truncate_to_day(end_date - timedelta(days=7)) # Include previous month to handle month boundaries if end_date.month == 1: monthly_start = end_date.replace( diff --git a/backend/dashboard_metrics/views.py b/backend/dashboard_metrics/views.py index adb086dbfb..6a0145f1c7 100644 --- a/backend/dashboard_metrics/views.py +++ b/backend/dashboard_metrics/views.py @@ -762,29 +762,71 @@ def live_series(self, request: Request) -> Response: org_id = str(organization.id) granularity = params.get("granularity", Granularity.DAY) - # Define metric query mapping + # Define metric query mapping (excludes LLM metrics handled below) metric_queries = { "documents_processed": MetricsQueryService.get_documents_processed, "pages_processed": MetricsQueryService.get_pages_processed, - "llm_calls": MetricsQueryService.get_llm_calls, - "challenges": MetricsQueryService.get_challenges, - "summarization_calls": MetricsQueryService.get_summarization_calls, "deployed_api_requests": MetricsQueryService.get_deployed_api_requests, "etl_pipeline_executions": MetricsQueryService.get_etl_pipeline_executions, - "llm_usage": MetricsQueryService.get_llm_usage_cost, "prompt_executions": MetricsQueryService.get_prompt_executions, "hitl_reviews": MetricsQueryService.get_hitl_reviews, "hitl_completions": MetricsQueryService.get_hitl_completions, } - # Filter by specific metric if requested - if params.get("metric_name"): - metric_queries = { - k: v for k, v in metric_queries.items() if k == params["metric_name"] - } + requested_metric = params.get("metric_name") + llm_metric_keys = MetricsQueryService.LLM_METRIC_KEYS series = [] errors = [] + + # Fetch all 4 LLM metrics in a single query + if not requested_metric or requested_metric in llm_metric_keys: + try: + llm_split = MetricsQueryService.get_llm_metrics_split( + org_id, + params["start_date"], + params["end_date"], + granularity, + ) + for metric_name, data in llm_split.items(): + if requested_metric and metric_name != requested_metric: + continue + series.append( + { + "metric_name": metric_name, + "metric_type": MetricType.HISTOGRAM + if metric_name == "llm_usage" + else MetricType.COUNTER, + "data": [ + { + "timestamp": r["period"].isoformat(), + "value": r["value"] or 0, + } + for r in data + ], + "total_value": sum(r["value"] or 0 for r in data), + } + ) + except Exception: + logger.exception("Failed to fetch LLM metrics") + for name in llm_metric_keys: + if not requested_metric or name == requested_metric: + errors.append(name) + series.append( + { + "metric_name": name, + "error": "unavailable", + "data": [], + "total_value": 0, + } + ) + + # Filter non-LLM metrics if a specific metric was requested + if requested_metric: + metric_queries = { + k: v for k, v in metric_queries.items() if k == requested_metric + } + for metric_name, query_fn in metric_queries.items(): try: data = query_fn( From 10b6b0dab59576e3c903e74eb5b4690afeee8d11 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 10 Mar 2026 17:09:04 +0000 Subject: [PATCH 4/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../dashboard_metrics/management/commands/backfill_metrics.py | 4 +--- backend/dashboard_metrics/services.py | 3 +-- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/backend/dashboard_metrics/management/commands/backfill_metrics.py b/backend/dashboard_metrics/management/commands/backfill_metrics.py index 7354c1ca66..daf0b3130e 100644 --- a/backend/dashboard_metrics/management/commands/backfill_metrics.py +++ b/backend/dashboard_metrics/management/commands/backfill_metrics.py @@ -353,9 +353,7 @@ def _ingest_daily_results( else: _ingest_daily_results(data, metric_name, metric_type) except Exception as e: - logger.warning( - "Error querying LLM metrics for org %s: %s", org_id, e - ) + logger.warning("Error querying LLM metrics for org %s: %s", org_id, e) # Fetch remaining (non-LLM) metrics individually for metric_name, query_method, is_histogram in self.METRIC_CONFIGS: diff --git a/backend/dashboard_metrics/services.py b/backend/dashboard_metrics/services.py index 5e6c654aa7..f8591c4f07 100644 --- a/backend/dashboard_metrics/services.py +++ b/backend/dashboard_metrics/services.py @@ -257,8 +257,7 @@ def get_llm_metrics_split( ) return { metric_name: [ - {"period": r["period"], "value": r[combined_key]} - for r in combined + {"period": r["period"], "value": r[combined_key]} for r in combined ] for metric_name, combined_key in cls.LLM_METRIC_KEYS.items() } From 6341fcad79fd4717edb0e75957231eea69ee368c Mon Sep 17 00:00:00 2001 From: Athul Date: Tue, 10 Mar 2026 23:28:15 +0530 Subject: [PATCH 5/8] Reduce cognitive complexity in tasks and views - Extract _upsert_agg helper in tasks.py to eliminate repeated dict-init-and-increment pattern in both aggregation functions - Extract _build_series_entry and _build_error_entry helpers in views.py to deduplicate series construction in live_series endpoint Co-Authored-By: Claude Opus 4.6 --- backend/dashboard_metrics/tasks.py | 125 ++++++++--------------------- backend/dashboard_metrics/views.py | 92 ++++++++------------- 2 files changed, 68 insertions(+), 149 deletions(-) diff --git a/backend/dashboard_metrics/tasks.py b/backend/dashboard_metrics/tasks.py index d6c2a16820..a7c72c6c2a 100644 --- a/backend/dashboard_metrics/tasks.py +++ b/backend/dashboard_metrics/tasks.py @@ -34,6 +34,14 @@ DASHBOARD_DAILY_METRICS_RETENTION_DAYS = 365 +def _upsert_agg(agg: dict, key: tuple, metric_type: str, value: float) -> None: + """Add a value to an aggregation dict, creating the entry if needed.""" + if key not in agg: + agg[key] = {"metric_type": metric_type, "value": 0, "count": 0} + agg[key]["value"] += value + agg[key]["count"] += 1 + + def _truncate_to_hour(ts: float | datetime) -> datetime: """Truncate a timestamp to the hour. @@ -306,57 +314,28 @@ def _aggregate_single_metric( # === HOURLY (last 24h) === for row in query_method( - org_id, - hourly_start, - end_date, - granularity=Granularity.HOUR, - **extra_kwargs, + org_id, hourly_start, end_date, + granularity=Granularity.HOUR, **extra_kwargs, ): - value = row["value"] or 0 hour_ts = _truncate_to_hour(row["period"]) key = (org_id, hour_ts.isoformat(), metric_name, "default", "") - if key not in hourly_agg: - hourly_agg[key] = {"metric_type": metric_type, "value": 0, "count": 0} - hourly_agg[key]["value"] += value - hourly_agg[key]["count"] += 1 + _upsert_agg(hourly_agg, key, metric_type, row["value"] or 0) # === DAILY + MONTHLY (single query from monthly_start) === - # Query with DAY granularity from monthly_start (2 months back). - # Each row is bucketed into daily_agg if within daily window, - # and always bucketed into monthly_agg for the month rollup. - monthly_buckets: dict[str, dict] = {} for row in query_method( - org_id, - monthly_start, - end_date, - granularity=Granularity.DAY, - **extra_kwargs, + org_id, monthly_start, end_date, + granularity=Granularity.DAY, **extra_kwargs, ): value = row["value"] or 0 day_ts = _truncate_to_day(row["period"]) - # Daily: only include rows within the daily window if day_ts >= daily_start: key = (org_id, day_ts.date().isoformat(), metric_name, "default", "") - if key not in daily_agg: - daily_agg[key] = {"metric_type": metric_type, "value": 0, "count": 0} - daily_agg[key]["value"] += value - daily_agg[key]["count"] += 1 - - # Monthly: bucket all rows by month - month_key_str = _truncate_to_month(row["period"]).date().isoformat() - if month_key_str not in monthly_buckets: - monthly_buckets[month_key_str] = {"value": 0, "count": 0} - monthly_buckets[month_key_str]["value"] += value - monthly_buckets[month_key_str]["count"] += 1 - - for month_key_str, bucket in monthly_buckets.items(): - key = (org_id, month_key_str, metric_name, "default", "") - monthly_agg[key] = { - "metric_type": metric_type, - "value": bucket["value"], - "count": bucket["count"] or 1, - } + _upsert_agg(daily_agg, key, metric_type, value) + + month_key = _truncate_to_month(row["period"]).date().isoformat() + key = (org_id, month_key, metric_name, "default", "") + _upsert_agg(monthly_agg, key, metric_type, value) def _aggregate_llm_combined( @@ -378,66 +357,30 @@ def _aggregate_llm_combined( by month) in Python. Same pattern as _aggregate_single_metric. """ # === HOURLY (last 24h) === - hourly_results = MetricsQueryService.get_llm_metrics_combined( - org_id, - hourly_start, - end_date, - granularity=Granularity.HOUR, - ) - for row in hourly_results: - ts = _truncate_to_hour(row["period"]) - ts_str = ts.isoformat() + for row in MetricsQueryService.get_llm_metrics_combined( + org_id, hourly_start, end_date, granularity=Granularity.HOUR, + ): + ts_str = _truncate_to_hour(row["period"]).isoformat() for field, (metric_name, metric_type) in llm_combined_fields.items(): - value = row[field] or 0 key = (org_id, ts_str, metric_name, "default", "") - if key not in hourly_agg: - hourly_agg[key] = {"metric_type": metric_type, "value": 0, "count": 0} - hourly_agg[key]["value"] += value - hourly_agg[key]["count"] += 1 + _upsert_agg(hourly_agg, key, metric_type, row[field] or 0) # === DAILY + MONTHLY (single query from monthly_start) === - daily_monthly_results = MetricsQueryService.get_llm_metrics_combined( - org_id, - monthly_start, - end_date, - granularity=Granularity.DAY, - ) - monthly_buckets: dict[tuple[str, str], dict] = {} - for row in daily_monthly_results: + for row in MetricsQueryService.get_llm_metrics_combined( + org_id, monthly_start, end_date, granularity=Granularity.DAY, + ): day_ts = _truncate_to_day(row["period"]) + month_key = _truncate_to_month(row["period"]).date().isoformat() - # Daily: only include rows within the daily window - if day_ts >= daily_start: - ts_str = day_ts.date().isoformat() - for field, (metric_name, metric_type) in llm_combined_fields.items(): - value = row[field] or 0 - key = (org_id, ts_str, metric_name, "default", "") - if key not in daily_agg: - daily_agg[key] = {"metric_type": metric_type, "value": 0, "count": 0} - daily_agg[key]["value"] += value - daily_agg[key]["count"] += 1 - - # Monthly: bucket all rows by month - month_key_str = _truncate_to_month(row["period"]).date().isoformat() for field, (metric_name, metric_type) in llm_combined_fields.items(): value = row[field] or 0 - bkey = (month_key_str, metric_name) - if bkey not in monthly_buckets: - monthly_buckets[bkey] = { - "metric_type": metric_type, - "value": 0, - "count": 0, - } - monthly_buckets[bkey]["value"] += value - monthly_buckets[bkey]["count"] += 1 - - for (month_key_str, metric_name), bucket in monthly_buckets.items(): - key = (org_id, month_key_str, metric_name, "default", "") - monthly_agg[key] = { - "metric_type": bucket["metric_type"], - "value": bucket["value"], - "count": bucket["count"] or 1, - } + + if day_ts >= daily_start: + key = (org_id, day_ts.date().isoformat(), metric_name, "default", "") + _upsert_agg(daily_agg, key, metric_type, value) + + key = (org_id, month_key, metric_name, "default", "") + _upsert_agg(monthly_agg, key, metric_type, value) def _run_aggregation() -> dict[str, Any]: diff --git a/backend/dashboard_metrics/views.py b/backend/dashboard_metrics/views.py index 6a0145f1c7..7f294d78db 100644 --- a/backend/dashboard_metrics/views.py +++ b/backend/dashboard_metrics/views.py @@ -45,6 +45,33 @@ # Bucket caching for better cache reuse across overlapping queries BUCKET_CACHE_ENABLED = settings.DASHBOARD_BUCKET_CACHE_ENABLED + +def _build_series_entry( + metric_name: str, data: list[dict], +) -> dict: + """Build a single series entry dict from metric query results.""" + return { + "metric_name": metric_name, + "metric_type": MetricType.HISTOGRAM + if metric_name == "llm_usage" + else MetricType.COUNTER, + "data": [ + {"timestamp": r["period"].isoformat(), "value": r["value"] or 0} + for r in data + ], + "total_value": sum(r["value"] or 0 for r in data), + } + + +def _build_error_entry(metric_name: str) -> dict: + """Build an error series entry for a failed metric.""" + return { + "metric_name": metric_name, + "error": "unavailable", + "data": [], + "total_value": 0, + } + # Thresholds for automatic source selection (in days) HOURLY_MAX_DAYS = 7 # Use hourly for ≤7 days DAILY_MAX_DAYS = 90 # Use daily for ≤90 days, monthly for >90 days @@ -783,43 +810,17 @@ def live_series(self, request: Request) -> Response: if not requested_metric or requested_metric in llm_metric_keys: try: llm_split = MetricsQueryService.get_llm_metrics_split( - org_id, - params["start_date"], - params["end_date"], - granularity, + org_id, params["start_date"], params["end_date"], granularity, ) for metric_name, data in llm_split.items(): - if requested_metric and metric_name != requested_metric: - continue - series.append( - { - "metric_name": metric_name, - "metric_type": MetricType.HISTOGRAM - if metric_name == "llm_usage" - else MetricType.COUNTER, - "data": [ - { - "timestamp": r["period"].isoformat(), - "value": r["value"] or 0, - } - for r in data - ], - "total_value": sum(r["value"] or 0 for r in data), - } - ) + if not requested_metric or metric_name == requested_metric: + series.append(_build_series_entry(metric_name, data)) except Exception: logger.exception("Failed to fetch LLM metrics") for name in llm_metric_keys: if not requested_metric or name == requested_metric: errors.append(name) - series.append( - { - "metric_name": name, - "error": "unavailable", - "data": [], - "total_value": 0, - } - ) + series.append(_build_error_entry(name)) # Filter non-LLM metrics if a specific metric was requested if requested_metric: @@ -830,38 +831,13 @@ def live_series(self, request: Request) -> Response: for metric_name, query_fn in metric_queries.items(): try: data = query_fn( - org_id, - params["start_date"], - params["end_date"], - granularity, - ) - series.append( - { - "metric_name": metric_name, - "metric_type": MetricType.HISTOGRAM - if metric_name == "llm_usage" - else MetricType.COUNTER, - "data": [ - { - "timestamp": r["period"].isoformat(), - "value": r["value"] or 0, - } - for r in data - ], - "total_value": sum(r["value"] or 0 for r in data), - } + org_id, params["start_date"], params["end_date"], granularity, ) + series.append(_build_series_entry(metric_name, data)) except Exception: logger.exception("Failed to fetch metric %s", metric_name) errors.append(metric_name) - series.append( - { - "metric_name": metric_name, - "error": "unavailable", - "data": [], - "total_value": 0, - } - ) + series.append(_build_error_entry(metric_name)) response_data = { "start_date": params["start_date"].isoformat(), From d20a95a46e28c496a7ff5ea57993cc993f473fde Mon Sep 17 00:00:00 2001 From: Athul Date: Tue, 10 Mar 2026 23:33:07 +0530 Subject: [PATCH 6/8] Extract _fetch_live_series to reduce live_series complexity Move LLM and non-LLM metric fetching logic out of the view method into a standalone function, bringing the view's cognitive complexity well under the Sonar threshold of 15. Co-Authored-By: Claude Opus 4.6 --- backend/dashboard_metrics/views.py | 97 ++++++++++++++++++------------ 1 file changed, 59 insertions(+), 38 deletions(-) diff --git a/backend/dashboard_metrics/views.py b/backend/dashboard_metrics/views.py index 7f294d78db..d884a4bd01 100644 --- a/backend/dashboard_metrics/views.py +++ b/backend/dashboard_metrics/views.py @@ -72,6 +72,57 @@ def _build_error_entry(metric_name: str) -> dict: "total_value": 0, } + +def _fetch_live_series( + org_id: str, + start_date: datetime, + end_date: datetime, + granularity: str, + metric_queries: dict, + requested_metric: str | None, +) -> tuple[list[dict], list[str]]: + """Fetch all metric series (LLM combined + individual). + + Returns: + Tuple of (series list, error names list). + """ + series: list[dict] = [] + errors: list[str] = [] + llm_metric_keys = MetricsQueryService.LLM_METRIC_KEYS + + # Fetch all 4 LLM metrics in a single query + if not requested_metric or requested_metric in llm_metric_keys: + try: + llm_split = MetricsQueryService.get_llm_metrics_split( + org_id, start_date, end_date, granularity, + ) + for name, data in llm_split.items(): + if not requested_metric or name == requested_metric: + series.append(_build_series_entry(name, data)) + except Exception: + logger.exception("Failed to fetch LLM metrics") + for name in llm_metric_keys: + if not requested_metric or name == requested_metric: + errors.append(name) + series.append(_build_error_entry(name)) + + # Filter non-LLM metrics if a specific metric was requested + if requested_metric: + metric_queries = { + k: v for k, v in metric_queries.items() if k == requested_metric + } + + for name, query_fn in metric_queries.items(): + try: + data = query_fn(org_id, start_date, end_date, granularity) + series.append(_build_series_entry(name, data)) + except Exception: + logger.exception("Failed to fetch metric %s", name) + errors.append(name) + series.append(_build_error_entry(name)) + + return series, errors + # Thresholds for automatic source selection (in days) HOURLY_MAX_DAYS = 7 # Use hourly for ≤7 days DAILY_MAX_DAYS = 90 # Use daily for ≤90 days, monthly for >90 days @@ -800,44 +851,14 @@ def live_series(self, request: Request) -> Response: "hitl_completions": MetricsQueryService.get_hitl_completions, } - requested_metric = params.get("metric_name") - llm_metric_keys = MetricsQueryService.LLM_METRIC_KEYS - - series = [] - errors = [] - - # Fetch all 4 LLM metrics in a single query - if not requested_metric or requested_metric in llm_metric_keys: - try: - llm_split = MetricsQueryService.get_llm_metrics_split( - org_id, params["start_date"], params["end_date"], granularity, - ) - for metric_name, data in llm_split.items(): - if not requested_metric or metric_name == requested_metric: - series.append(_build_series_entry(metric_name, data)) - except Exception: - logger.exception("Failed to fetch LLM metrics") - for name in llm_metric_keys: - if not requested_metric or name == requested_metric: - errors.append(name) - series.append(_build_error_entry(name)) - - # Filter non-LLM metrics if a specific metric was requested - if requested_metric: - metric_queries = { - k: v for k, v in metric_queries.items() if k == requested_metric - } - - for metric_name, query_fn in metric_queries.items(): - try: - data = query_fn( - org_id, params["start_date"], params["end_date"], granularity, - ) - series.append(_build_series_entry(metric_name, data)) - except Exception: - logger.exception("Failed to fetch metric %s", metric_name) - errors.append(metric_name) - series.append(_build_error_entry(metric_name)) + series, errors = _fetch_live_series( + org_id, + params["start_date"], + params["end_date"], + granularity, + metric_queries, + params.get("metric_name"), + ) response_data = { "start_date": params["start_date"].isoformat(), From 0cece48c67b4f8db5f5af016bdd31d7c2257cef8 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 10 Mar 2026 18:03:40 +0000 Subject: [PATCH 7/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- backend/dashboard_metrics/tasks.py | 24 ++++++++++++++++++------ backend/dashboard_metrics/views.py | 12 ++++++++---- 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/backend/dashboard_metrics/tasks.py b/backend/dashboard_metrics/tasks.py index a7c72c6c2a..7208b40716 100644 --- a/backend/dashboard_metrics/tasks.py +++ b/backend/dashboard_metrics/tasks.py @@ -314,8 +314,11 @@ def _aggregate_single_metric( # === HOURLY (last 24h) === for row in query_method( - org_id, hourly_start, end_date, - granularity=Granularity.HOUR, **extra_kwargs, + org_id, + hourly_start, + end_date, + granularity=Granularity.HOUR, + **extra_kwargs, ): hour_ts = _truncate_to_hour(row["period"]) key = (org_id, hour_ts.isoformat(), metric_name, "default", "") @@ -323,8 +326,11 @@ def _aggregate_single_metric( # === DAILY + MONTHLY (single query from monthly_start) === for row in query_method( - org_id, monthly_start, end_date, - granularity=Granularity.DAY, **extra_kwargs, + org_id, + monthly_start, + end_date, + granularity=Granularity.DAY, + **extra_kwargs, ): value = row["value"] or 0 day_ts = _truncate_to_day(row["period"]) @@ -358,7 +364,10 @@ def _aggregate_llm_combined( """ # === HOURLY (last 24h) === for row in MetricsQueryService.get_llm_metrics_combined( - org_id, hourly_start, end_date, granularity=Granularity.HOUR, + org_id, + hourly_start, + end_date, + granularity=Granularity.HOUR, ): ts_str = _truncate_to_hour(row["period"]).isoformat() for field, (metric_name, metric_type) in llm_combined_fields.items(): @@ -367,7 +376,10 @@ def _aggregate_llm_combined( # === DAILY + MONTHLY (single query from monthly_start) === for row in MetricsQueryService.get_llm_metrics_combined( - org_id, monthly_start, end_date, granularity=Granularity.DAY, + org_id, + monthly_start, + end_date, + granularity=Granularity.DAY, ): day_ts = _truncate_to_day(row["period"]) month_key = _truncate_to_month(row["period"]).date().isoformat() diff --git a/backend/dashboard_metrics/views.py b/backend/dashboard_metrics/views.py index d884a4bd01..424da701b4 100644 --- a/backend/dashboard_metrics/views.py +++ b/backend/dashboard_metrics/views.py @@ -47,7 +47,8 @@ def _build_series_entry( - metric_name: str, data: list[dict], + metric_name: str, + data: list[dict], ) -> dict: """Build a single series entry dict from metric query results.""" return { @@ -56,8 +57,7 @@ def _build_series_entry( if metric_name == "llm_usage" else MetricType.COUNTER, "data": [ - {"timestamp": r["period"].isoformat(), "value": r["value"] or 0} - for r in data + {"timestamp": r["period"].isoformat(), "value": r["value"] or 0} for r in data ], "total_value": sum(r["value"] or 0 for r in data), } @@ -94,7 +94,10 @@ def _fetch_live_series( if not requested_metric or requested_metric in llm_metric_keys: try: llm_split = MetricsQueryService.get_llm_metrics_split( - org_id, start_date, end_date, granularity, + org_id, + start_date, + end_date, + granularity, ) for name, data in llm_split.items(): if not requested_metric or name == requested_metric: @@ -123,6 +126,7 @@ def _fetch_live_series( return series, errors + # Thresholds for automatic source selection (in days) HOURLY_MAX_DAYS = 7 # Use hourly for ≤7 days DAILY_MAX_DAYS = 90 # Use daily for ≤90 days, monthly for >90 days From 3609c9ed8fe40b8db31b153c3d99213cc7e4ce56 Mon Sep 17 00:00:00 2001 From: Athul Date: Tue, 10 Mar 2026 23:44:29 +0530 Subject: [PATCH 8/8] Fix metric_type for histogram metrics in views Add _HISTOGRAM_METRICS set and _metric_type helper to correctly classify pages_processed and failed_pages as histograms, consistent with tasks.py and backfill_metrics.py. Previously only llm_usage was marked as histogram in the views layer. Co-Authored-By: Claude Opus 4.6 --- backend/dashboard_metrics/views.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/backend/dashboard_metrics/views.py b/backend/dashboard_metrics/views.py index 424da701b4..c01c56908a 100644 --- a/backend/dashboard_metrics/views.py +++ b/backend/dashboard_metrics/views.py @@ -45,6 +45,14 @@ # Bucket caching for better cache reuse across overlapping queries BUCKET_CACHE_ENABLED = settings.DASHBOARD_BUCKET_CACHE_ENABLED +# Metrics that use histogram type; everything else is a counter. +_HISTOGRAM_METRICS = frozenset({"llm_usage", "pages_processed", "failed_pages"}) + + +def _metric_type(name: str) -> str: + """Return the MetricType for a given metric name.""" + return MetricType.HISTOGRAM if name in _HISTOGRAM_METRICS else MetricType.COUNTER + def _build_series_entry( metric_name: str, @@ -53,9 +61,7 @@ def _build_series_entry( """Build a single series entry dict from metric query results.""" return { "metric_name": metric_name, - "metric_type": MetricType.HISTOGRAM - if metric_name == "llm_usage" - else MetricType.COUNTER, + "metric_type": _metric_type(metric_name), "data": [ {"timestamp": r["period"].isoformat(), "value": r["value"] or 0} for r in data ], @@ -797,9 +803,7 @@ def live_summary(self, request: Request) -> Response: summary_list = [ { "metric_name": name, - "metric_type": MetricType.HISTOGRAM - if name == "llm_usage" - else MetricType.COUNTER, + "metric_type": _metric_type(name), "total_value": value, "total_count": 1, # Not available from live queries "average_value": value, # Same as total for live data