diff --git a/backend/dashboard_metrics/management/commands/backfill_metrics.py b/backend/dashboard_metrics/management/commands/backfill_metrics.py index 4c71fcd329..daf0b3130e 100644 --- a/backend/dashboard_metrics/management/commands/backfill_metrics.py +++ b/backend/dashboard_metrics/management/commands/backfill_metrics.py @@ -12,6 +12,7 @@ import logging from datetime import datetime, timedelta +from typing import Any from account_v2.models import Organization from django.core.management.base import BaseCommand @@ -41,25 +42,30 @@ class Command(BaseCommand): help = "Backfill metrics from source tables into aggregated tables" # Metric configurations: (name, query_method, is_histogram) + # LLM metrics are handled separately via get_llm_metrics_split(). METRIC_CONFIGS = [ ("documents_processed", MetricsQueryService.get_documents_processed, False), ("pages_processed", MetricsQueryService.get_pages_processed, True), - ("llm_calls", MetricsQueryService.get_llm_calls, False), - ("challenges", MetricsQueryService.get_challenges, False), - ("summarization_calls", MetricsQueryService.get_summarization_calls, False), ("deployed_api_requests", MetricsQueryService.get_deployed_api_requests, False), ( "etl_pipeline_executions", MetricsQueryService.get_etl_pipeline_executions, False, ), - ("llm_usage", MetricsQueryService.get_llm_usage_cost, True), ("prompt_executions", MetricsQueryService.get_prompt_executions, False), ("failed_pages", MetricsQueryService.get_failed_pages, True), ("hitl_reviews", MetricsQueryService.get_hitl_reviews, False), ("hitl_completions", MetricsQueryService.get_hitl_completions, False), ] + # LLM metric names and whether they are histograms + LLM_METRIC_TYPES: dict[str, bool] = { + "llm_calls": False, + "challenges": False, + "summarization_calls": False, + "llm_usage": True, + } + def add_arguments(self, parser): parser.add_argument( "--days", @@ -139,15 +145,32 @@ def handle(self, *args, **options): "errors": 0, } + # Pre-resolve org identifiers for PageUsage queries (avoids + # redundant Organization lookups inside the metric query loop). + org_identifiers = dict( + Organization.objects.filter( + id__in=[int(oid) if oid.isdigit() else oid for oid in org_ids] + ).values_list("id", "organization_id") + ) + for i, current_org_id in enumerate(org_ids): self.stdout.write( f"\n[{i + 1}/{len(org_ids)}] Processing org: {current_org_id}" ) try: + # Resolve org string identifier for PageUsage queries + org_id_key = ( + int(current_org_id) if current_org_id.isdigit() else current_org_id + ) + org_identifier = org_identifiers.get(org_id_key) + # Collect all metric data for this org hourly_data, daily_data, monthly_data = self._collect_metrics( - current_org_id, start_date, end_date + current_org_id, + start_date, + end_date, + org_identifier=org_identifier, ) self.stdout.write( @@ -245,73 +268,121 @@ def _resolve_org_ids( return sorted(str(oid) for oid in all_org_ids) def _collect_metrics( - self, org_id: str, start_date: datetime, end_date: datetime + self, + org_id: str, + start_date: datetime, + end_date: datetime, + org_identifier: str | None = None, ) -> tuple[dict, dict, dict]: """Collect metrics from source tables for all granularities.""" hourly_agg = {} daily_agg = {} monthly_agg = {} + def _ingest_results( + results: list[dict[str, Any]], + metric_name: str, + metric_type: str, + ) -> None: + """Ingest query results into hourly/daily/monthly aggregation dicts.""" + for row in results: + period = row["period"] + value = row["value"] or 0 + hour_ts = self._truncate_to_hour(period) + key = (org_id, hour_ts.isoformat(), metric_name, "default", "") + + if key not in hourly_agg: + hourly_agg[key] = { + "metric_type": metric_type, + "value": 0, + "count": 0, + } + hourly_agg[key]["value"] += value + hourly_agg[key]["count"] += 1 + + def _ingest_daily_results( + results: list[dict[str, Any]], + metric_name: str, + metric_type: str, + ) -> None: + """Ingest daily query results into daily and monthly aggregation dicts.""" + for row in results: + period = row["period"] + value = row["value"] or 0 + day_date = period.date() if hasattr(period, "date") else period + key = (org_id, day_date.isoformat(), metric_name, "default", "") + + if key not in daily_agg: + daily_agg[key] = { + "metric_type": metric_type, + "value": 0, + "count": 0, + } + daily_agg[key]["value"] += value + daily_agg[key]["count"] += 1 + + # Monthly aggregation from daily results + if hasattr(period, "date"): + month_date = period.replace(day=1).date() + else: + month_date = period.replace(day=1) + mkey = (org_id, month_date.isoformat(), metric_name, "default", "") + + if mkey not in monthly_agg: + monthly_agg[mkey] = { + "metric_type": metric_type, + "value": 0, + "count": 0, + } + monthly_agg[mkey]["value"] += value + monthly_agg[mkey]["count"] += 1 + + # Fetch all 4 LLM metrics in one query per granularity + try: + for granularity in (Granularity.HOUR, Granularity.DAY): + llm_split = MetricsQueryService.get_llm_metrics_split( + org_id, start_date, end_date, granularity + ) + for metric_name, data in llm_split.items(): + is_histogram = self.LLM_METRIC_TYPES[metric_name] + metric_type = ( + MetricType.HISTOGRAM if is_histogram else MetricType.COUNTER + ) + if granularity == Granularity.HOUR: + _ingest_results(data, metric_name, metric_type) + else: + _ingest_daily_results(data, metric_name, metric_type) + except Exception as e: + logger.warning("Error querying LLM metrics for org %s: %s", org_id, e) + + # Fetch remaining (non-LLM) metrics individually for metric_name, query_method, is_histogram in self.METRIC_CONFIGS: metric_type = MetricType.HISTOGRAM if is_histogram else MetricType.COUNTER + # Pass org_identifier to PageUsage-based metrics to + # avoid redundant Organization lookups per call. + extra_kwargs: dict[str, Any] = {} + if metric_name == "pages_processed": + extra_kwargs["org_identifier"] = org_identifier + try: - # Query hourly data hourly_results = query_method( - org_id, start_date, end_date, granularity=Granularity.HOUR + org_id, + start_date, + end_date, + granularity=Granularity.HOUR, + **extra_kwargs, ) - for row in hourly_results: - period = row["period"] - value = row["value"] or 0 - hour_ts = self._truncate_to_hour(period) - key = (org_id, hour_ts.isoformat(), metric_name, "default", "") - - if key not in hourly_agg: - hourly_agg[key] = { - "metric_type": metric_type, - "value": 0, - "count": 0, - } - hourly_agg[key]["value"] += value - hourly_agg[key]["count"] += 1 - - # Query daily data + _ingest_results(hourly_results, metric_name, metric_type) + daily_results = query_method( - org_id, start_date, end_date, granularity=Granularity.DAY + org_id, + start_date, + end_date, + granularity=Granularity.DAY, + **extra_kwargs, ) - for row in daily_results: - period = row["period"] - value = row["value"] or 0 - day_date = period.date() if hasattr(period, "date") else period - key = (org_id, day_date.isoformat(), metric_name, "default", "") - - if key not in daily_agg: - daily_agg[key] = { - "metric_type": metric_type, - "value": 0, - "count": 0, - } - daily_agg[key]["value"] += value - daily_agg[key]["count"] += 1 - - # Query for monthly (aggregate daily results by month) - for row in daily_results: - period = row["period"] - value = row["value"] or 0 - if hasattr(period, "date"): - month_date = period.replace(day=1).date() - else: - month_date = period.replace(day=1) - key = (org_id, month_date.isoformat(), metric_name, "default", "") - - if key not in monthly_agg: - monthly_agg[key] = { - "metric_type": metric_type, - "value": 0, - "count": 0, - } - monthly_agg[key]["value"] += value - monthly_agg[key]["count"] += 1 + _ingest_daily_results(daily_results, metric_name, metric_type) except Exception as e: logger.warning("Error querying %s for org %s: %s", metric_name, org_id, e) diff --git a/backend/dashboard_metrics/services.py b/backend/dashboard_metrics/services.py index e382f62617..f8591c4f07 100644 --- a/backend/dashboard_metrics/services.py +++ b/backend/dashboard_metrics/services.py @@ -14,7 +14,7 @@ from account_usage.models import PageUsage from account_v2.models import Organization from api_v2.models import APIDeployment -from django.db.models import CharField, Count, OuterRef, Subquery, Sum +from django.db.models import CharField, Count, OuterRef, Q, Subquery, Sum from django.db.models.functions import Cast, Coalesce, TruncDay, TruncHour, TruncWeek from pipeline_v2.models import Pipeline from usage_v2.models import Usage @@ -115,95 +115,90 @@ def get_documents_processed( ) @staticmethod - def get_pages_processed( - organization_id: str, - start_date: datetime, - end_date: datetime, - granularity: str = Granularity.DAY, - ) -> list[dict[str, Any]]: - """Query pages processed from page_usage. - - Sums pages_processed field grouped by time period. + def _resolve_org_identifier( + organization_id: str, org_identifier: str | None = None + ) -> str | None: + """Resolve PageUsage's string org identifier from UUID PK. - Note: PageUsage.organization_id stores the Organization's string - identifier (organization.organization_id), NOT the UUID PK. - We look up the string identifier from the UUID passed by callers. + PageUsage.organization_id stores Organization.organization_id (a string + like "org_abc123"), not the UUID PK used everywhere else. This helper + resolves the string identifier, accepting a pre-resolved value to avoid + redundant DB lookups when called in a loop. Args: - organization_id: Organization UUID string - start_date: Start of date range - end_date: End of date range - granularity: Time granularity (hour, day, week) + organization_id: Organization UUID string (PK) + org_identifier: Pre-resolved string identifier (skips DB lookup) Returns: - List of dicts with 'period' and 'value' keys + Organization string identifier, or None if org not found """ + if org_identifier: + return org_identifier try: org = Organization.objects.get(id=organization_id) - org_identifier = org.organization_id + return org.organization_id except Organization.DoesNotExist: - return [] - - trunc_func = MetricsQueryService._get_trunc_func(granularity) - - return list( - PageUsage.objects.filter( - organization_id=org_identifier, - created_at__gte=start_date, - created_at__lte=end_date, - ) - .annotate(period=trunc_func("created_at")) - .values("period") - .annotate(value=Sum("pages_processed")) - .order_by("period") - ) + return None @staticmethod - def get_llm_calls( + def get_pages_processed( organization_id: str, start_date: datetime, end_date: datetime, granularity: str = Granularity.DAY, + org_identifier: str | None = None, ) -> list[dict[str, Any]]: - """Query LLM calls from usage table. + """Query pages processed from page_usage. + + Sums pages_processed field grouped by time period. - Counts usage records where usage_type='llm' grouped by time period. + Note: PageUsage.organization_id stores the Organization's string + identifier (organization.organization_id), NOT the UUID PK. + Pass org_identifier to avoid a DB lookup per call. Args: organization_id: Organization UUID string start_date: Start of date range end_date: End of date range granularity: Time granularity (hour, day, week) + org_identifier: Pre-resolved Organization.organization_id string. + If None, will be looked up from organization_id. Returns: List of dicts with 'period' and 'value' keys """ + page_usage_org_id = MetricsQueryService._resolve_org_identifier( + organization_id, org_identifier + ) + if not page_usage_org_id: + return [] + trunc_func = MetricsQueryService._get_trunc_func(granularity) return list( - _get_usage_queryset() - .filter( - organization_id=organization_id, - usage_type="llm", + PageUsage.objects.filter( + organization_id=page_usage_org_id, created_at__gte=start_date, created_at__lte=end_date, ) .annotate(period=trunc_func("created_at")) .values("period") - .annotate(value=Count("id")) + .annotate(value=Sum("pages_processed")) .order_by("period") ) @staticmethod - def get_challenges( + def get_llm_metrics_combined( organization_id: str, start_date: datetime, end_date: datetime, granularity: str = Granularity.DAY, ) -> list[dict[str, Any]]: - """Query challenge calls from usage table. + """Query all LLM metrics from usage table in a single query. - Counts usage records where llm_usage_reason='challenge' grouped by time period. + Uses conditional aggregation to compute llm_calls, challenges, + summarization_calls, and llm_usage (cost) in one DB round-trip + instead of four separate queries. Args: organization_id: Organization UUID string @@ -212,7 +207,8 @@ def get_challenges( granularity: Time granularity (hour, day, week) Returns: - List of dicts with 'period' and 'value' keys + List of dicts with 'period', 'llm_calls', 'challenges', + 'summarization_calls', and 'llm_usage' keys """ trunc_func = MetricsQueryService._get_trunc_func(granularity) @@ -221,52 +217,50 @@ def get_challenges( .filter( organization_id=organization_id, usage_type="llm", - llm_usage_reason="challenge", created_at__gte=start_date, created_at__lte=end_date, ) .annotate(period=trunc_func("created_at")) .values("period") - .annotate(value=Count("id")) + .annotate( + llm_calls=Count("id"), + challenges=Count("id", filter=Q(llm_usage_reason="challenge")), + summarization_calls=Count("id", filter=Q(llm_usage_reason="summarize")), + llm_usage=Sum("cost_in_dollars"), + ) .order_by("period") ) - @staticmethod - def get_summarization_calls( + # Mapping from metric name to the key in get_llm_metrics_combined() results. + LLM_METRIC_KEYS: dict[str, str] = { + "llm_calls": "llm_calls", + "challenges": "challenges", + "summarization_calls": "summarization_calls", + "llm_usage": "llm_usage", + } + + @classmethod + def get_llm_metrics_split( + cls, organization_id: str, start_date: datetime, end_date: datetime, granularity: str = Granularity.DAY, - ) -> list[dict[str, Any]]: - """Query summarization calls from usage table. - - Counts usage records where llm_usage_reason='summarize' grouped by time period. - - Args: - organization_id: Organization UUID string - start_date: Start of date range - end_date: End of date range - granularity: Time granularity (hour, day, week) + ) -> dict[str, list[dict[str, Any]]]: + """Fetch combined LLM metrics once and split into per-metric series. Returns: - List of dicts with 'period' and 'value' keys + Dict mapping metric name to list of {period, value} dicts. """ - trunc_func = MetricsQueryService._get_trunc_func(granularity) - - return list( - _get_usage_queryset() - .filter( - organization_id=organization_id, - usage_type="llm", - llm_usage_reason="summarize", - created_at__gte=start_date, - created_at__lte=end_date, - ) - .annotate(period=trunc_func("created_at")) - .values("period") - .annotate(value=Count("id")) - .order_by("period") + combined = cls.get_llm_metrics_combined( + organization_id, start_date, end_date, granularity ) + return { + metric_name: [ + {"period": r["period"], "value": r[combined_key]} for r in combined + ] + for metric_name, combined_key in cls.LLM_METRIC_KEYS.items() + } @staticmethod def get_deployed_api_requests( @@ -355,42 +349,6 @@ def get_etl_pipeline_executions( .order_by("period") ) - @staticmethod - def get_llm_usage_cost( - organization_id: str, - start_date: datetime, - end_date: datetime, - granularity: str = Granularity.DAY, - ) -> list[dict[str, Any]]: - """Query LLM usage cost from usage table. - - Sums cost_in_dollars for LLM usage records grouped by time period. - - Args: - organization_id: Organization UUID string - start_date: Start of date range - end_date: End of date range - granularity: Time granularity (hour, day, week) - - Returns: - List of dicts with 'period' and 'value' keys - """ - trunc_func = MetricsQueryService._get_trunc_func(granularity) - - return list( - _get_usage_queryset() - .filter( - organization_id=organization_id, - usage_type="llm", - created_at__gte=start_date, - created_at__lte=end_date, - ) - .annotate(period=trunc_func("created_at")) - .values("period") - .annotate(value=Sum("cost_in_dollars")) - .order_by("period") - ) - @staticmethod def get_prompt_executions( organization_id: str, @@ -437,6 +395,10 @@ def get_failed_pages( Sums pages_processed for file executions with status='ERROR', grouped by time period based on when the failure occurred. + Note: Unlike get_pages_processed, this does NOT need org_identifier + because it joins through WorkflowFileExecution -> WorkflowExecution -> + Workflow -> Organization (UUID FK), not through PageUsage.organization_id. + Args: organization_id: Organization UUID string start_date: Start of date range @@ -576,6 +538,16 @@ def get_all_metrics_summary( Returns: Dict mapping metric name to total value """ + # Resolve org identifier once for PageUsage queries + org_identifier = cls._resolve_org_identifier(organization_id) + + # Combined LLM metrics (1 query instead of 4) + llm_combined = cls.get_llm_metrics_combined(organization_id, start_date, end_date) + llm_calls_total = sum(r["llm_calls"] for r in llm_combined) + challenges_total = sum(r["challenges"] for r in llm_combined) + summarization_total = sum(r["summarization_calls"] for r in llm_combined) + llm_usage_total = sum(r["llm_usage"] or 0 for r in llm_combined) + return { "documents_processed": sum( r["value"] @@ -585,22 +557,16 @@ def get_all_metrics_summary( ), "pages_processed": sum( r["value"] or 0 - for r in cls.get_pages_processed(organization_id, start_date, end_date) - ), - "llm_calls": sum( - r["value"] - for r in cls.get_llm_calls(organization_id, start_date, end_date) - ), - "challenges": sum( - r["value"] - for r in cls.get_challenges(organization_id, start_date, end_date) - ), - "summarization_calls": sum( - r["value"] - for r in cls.get_summarization_calls( - organization_id, start_date, end_date + for r in cls.get_pages_processed( + organization_id, + start_date, + end_date, + org_identifier=org_identifier, ) ), + "llm_calls": llm_calls_total, + "challenges": challenges_total, + "summarization_calls": summarization_total, "deployed_api_requests": sum( r["value"] for r in cls.get_deployed_api_requests( @@ -613,10 +579,7 @@ def get_all_metrics_summary( organization_id, start_date, end_date ) ), - "llm_usage": sum( - r["value"] or 0 - for r in cls.get_llm_usage_cost(organization_id, start_date, end_date) - ), + "llm_usage": llm_usage_total, "prompt_executions": sum( r["value"] for r in cls.get_prompt_executions(organization_id, start_date, end_date) diff --git a/backend/dashboard_metrics/tasks.py b/backend/dashboard_metrics/tasks.py index 879bc8d1fc..7208b40716 100644 --- a/backend/dashboard_metrics/tasks.py +++ b/backend/dashboard_metrics/tasks.py @@ -7,6 +7,7 @@ """ import logging +import time from datetime import datetime, timedelta from typing import Any @@ -33,6 +34,14 @@ DASHBOARD_DAILY_METRICS_RETENTION_DAYS = 365 +def _upsert_agg(agg: dict, key: tuple, metric_type: str, value: float) -> None: + """Add a value to an aggregation dict, creating the entry if needed.""" + if key not in agg: + agg[key] = {"metric_type": metric_type, "value": 0, "count": 0} + agg[key]["value"] += value + agg[key]["count"] += 1 + + def _truncate_to_hour(ts: float | datetime) -> datetime: """Truncate a timestamp to the hour. @@ -199,6 +208,49 @@ def _bulk_upsert_monthly(aggregations: dict) -> int: AGGREGATION_LOCK_TIMEOUT = 900 # 15 minutes (matches task schedule) +def _acquire_aggregation_lock() -> bool: + """Acquire the distributed aggregation lock with self-healing. + + Stores a Unix timestamp as the lock value. If a previous run crashed + (OOM kill, SIGKILL) without releasing the lock, the next run detects + that the lock is older than AGGREGATION_LOCK_TIMEOUT and reclaims it. + + Returns: + True if lock was acquired, False if another run is legitimately active. + """ + now = time.time() + + # Fast path: lock is free + if cache.add(AGGREGATION_LOCK_KEY, str(now), AGGREGATION_LOCK_TIMEOUT): + return True + + # Lock exists — check if it's stale (previous run died without releasing) + lock_value = cache.get(AGGREGATION_LOCK_KEY) + if lock_value is None: + # Expired between our check and get — race is fine, next run will pick up + return False + + try: + lock_time = float(lock_value) + except (TypeError, ValueError): + # Corrupted value (e.g. old "running" string) — reclaim it + logger.warning("Reclaiming aggregation lock with invalid value: %s", lock_value) + cache.delete(AGGREGATION_LOCK_KEY) + return cache.add(AGGREGATION_LOCK_KEY, str(now), AGGREGATION_LOCK_TIMEOUT) + + age = now - lock_time + if age > AGGREGATION_LOCK_TIMEOUT: + logger.warning( + "Reclaiming stale aggregation lock (age=%.0fs, timeout=%ds)", + age, + AGGREGATION_LOCK_TIMEOUT, + ) + cache.delete(AGGREGATION_LOCK_KEY) + return cache.add(AGGREGATION_LOCK_KEY, str(now), AGGREGATION_LOCK_TIMEOUT) + + return False + + @shared_task( name="dashboard_metrics.aggregate_from_sources", soft_time_limit=600, @@ -216,19 +268,19 @@ def aggregate_metrics_from_sources() -> dict[str, Any]: them into EventMetricsHourly, EventMetricsDaily, and EventMetricsMonthly tables for fast dashboard queries at different granularities. - Uses a Redis distributed lock to prevent overlapping runs when a task - takes longer than the 15-minute schedule interval. + Uses a Redis distributed lock with self-healing to prevent overlapping + runs. If a previous run was killed without releasing the lock, the next + run detects the stale lock and reclaims it automatically. Aggregation windows: - Hourly: Last 24 hours (rolling window) - - Daily: Last 2 days (to catch day boundaries) - - Monthly: Current month (updates running total) + - Daily: Last 7 days (ensures we capture late-arriving data) + - Monthly: Last 2 months (current + previous month) Returns: Dict with aggregation summary for all three tiers """ - # Acquire distributed lock to prevent overlapping runs - if not cache.add(AGGREGATION_LOCK_KEY, "running", AGGREGATION_LOCK_TIMEOUT): + if not _acquire_aggregation_lock(): logger.info("Skipping aggregation — another run is in progress") return {"success": True, "skipped": True, "reason": "lock_held"} @@ -238,6 +290,111 @@ def aggregate_metrics_from_sources() -> dict[str, Any]: cache.delete(AGGREGATION_LOCK_KEY) +def _aggregate_single_metric( + query_method, + metric_name: str, + metric_type: str, + org_id: str, + hourly_start: datetime, + daily_start: datetime, + monthly_start: datetime, + end_date: datetime, + hourly_agg: dict, + daily_agg: dict, + monthly_agg: dict, + extra_kwargs: dict | None = None, +) -> None: + """Run a single metric query at all 3 granularities and populate agg dicts. + + Uses 2 queries instead of 3: the daily query is widened to monthly_start + and its results are split into both daily_agg and monthly_agg in Python. + This is the same pattern proven in the backfill management command. + """ + extra_kwargs = extra_kwargs or {} + + # === HOURLY (last 24h) === + for row in query_method( + org_id, + hourly_start, + end_date, + granularity=Granularity.HOUR, + **extra_kwargs, + ): + hour_ts = _truncate_to_hour(row["period"]) + key = (org_id, hour_ts.isoformat(), metric_name, "default", "") + _upsert_agg(hourly_agg, key, metric_type, row["value"] or 0) + + # === DAILY + MONTHLY (single query from monthly_start) === + for row in query_method( + org_id, + monthly_start, + end_date, + granularity=Granularity.DAY, + **extra_kwargs, + ): + value = row["value"] or 0 + day_ts = _truncate_to_day(row["period"]) + + if day_ts >= daily_start: + key = (org_id, day_ts.date().isoformat(), metric_name, "default", "") + _upsert_agg(daily_agg, key, metric_type, value) + + month_key = _truncate_to_month(row["period"]).date().isoformat() + key = (org_id, month_key, metric_name, "default", "") + _upsert_agg(monthly_agg, key, metric_type, value) + + +def _aggregate_llm_combined( + org_id: str, + hourly_start: datetime, + daily_start: datetime, + monthly_start: datetime, + end_date: datetime, + hourly_agg: dict, + daily_agg: dict, + monthly_agg: dict, + llm_combined_fields: dict, +) -> None: + """Run the combined LLM metrics query at all granularities. + + Issues 2 queries total (hourly + daily/monthly) instead of 3. + The DAY-granularity query is widened to monthly_start and results are + split into daily_agg (recent rows) and monthly_agg (all rows bucketed + by month) in Python. Same pattern as _aggregate_single_metric. + """ + # === HOURLY (last 24h) === + for row in MetricsQueryService.get_llm_metrics_combined( + org_id, + hourly_start, + end_date, + granularity=Granularity.HOUR, + ): + ts_str = _truncate_to_hour(row["period"]).isoformat() + for field, (metric_name, metric_type) in llm_combined_fields.items(): + key = (org_id, ts_str, metric_name, "default", "") + _upsert_agg(hourly_agg, key, metric_type, row[field] or 0) + + # === DAILY + MONTHLY (single query from monthly_start) === + for row in MetricsQueryService.get_llm_metrics_combined( + org_id, + monthly_start, + end_date, + granularity=Granularity.DAY, + ): + day_ts = _truncate_to_day(row["period"]) + month_key = _truncate_to_month(row["period"]).date().isoformat() + + for field, (metric_name, metric_type) in llm_combined_fields.items(): + value = row[field] or 0 + + if day_ts >= daily_start: + key = (org_id, day_ts.date().isoformat(), metric_name, "default", "") + _upsert_agg(daily_agg, key, metric_type, value) + + key = (org_id, month_key, metric_name, "default", "") + _upsert_agg(monthly_agg, key, metric_type, value) + + def _run_aggregation() -> dict[str, Any]: """Execute the actual aggregation logic. @@ -250,7 +407,7 @@ def _run_aggregation() -> dict[str, Any]: # - Daily: Last 7 days (ensures we capture late-arriving data) # - Monthly: Last 2 months (current + previous, ensures month transitions are captured) hourly_start = end_date - timedelta(hours=24) - daily_start = end_date - timedelta(days=7) + daily_start = _truncate_to_day(end_date - timedelta(days=7)) # Include previous month to handle month boundaries if end_date.month == 1: monthly_start = end_date.replace( @@ -268,25 +425,32 @@ def _run_aggregation() -> dict[str, Any]: ) # Metric definitions: (name, query_method, is_histogram) + # Note: llm_calls, challenges, summarization_calls, and llm_usage are + # handled separately via get_llm_metrics_combined (1 query instead of 4). metric_configs = [ ("documents_processed", MetricsQueryService.get_documents_processed, False), ("pages_processed", MetricsQueryService.get_pages_processed, True), - ("llm_calls", MetricsQueryService.get_llm_calls, False), - ("challenges", MetricsQueryService.get_challenges, False), - ("summarization_calls", MetricsQueryService.get_summarization_calls, False), ("deployed_api_requests", MetricsQueryService.get_deployed_api_requests, False), ( "etl_pipeline_executions", MetricsQueryService.get_etl_pipeline_executions, False, ), - ("llm_usage", MetricsQueryService.get_llm_usage_cost, True), ("prompt_executions", MetricsQueryService.get_prompt_executions, False), ("failed_pages", MetricsQueryService.get_failed_pages, True), ("hitl_reviews", MetricsQueryService.get_hitl_reviews, False), ("hitl_completions", MetricsQueryService.get_hitl_completions, False), ] + # LLM metrics combined via conditional aggregation (4 metrics in 1 query). + # Maps combined query field -> (metric_name, metric_type) + llm_combined_fields = { + "llm_calls": ("llm_calls", MetricType.COUNTER), + "challenges": ("challenges", MetricType.COUNTER), + "summarization_calls": ("summarization_calls", MetricType.COUNTER), + "llm_usage": ("llm_usage", MetricType.HISTOGRAM), + } + stats = { "hourly": {"upserted": 0}, "daily": {"upserted": 0}, @@ -296,10 +460,14 @@ def _run_aggregation() -> dict[str, Any]: } # Pre-filter to orgs with recent activity to reduce DB load. - # One lightweight query avoids running 36 queries per dormant org. + # Uses daily_start (7 days) instead of monthly_start (2 months) because: + # - Hourly/daily queries only need recent data (24h / 7d windows) + # - Monthly totals for dormant orgs were already written by previous + # runs when the org was active — re-running just overwrites same values + # - This avoids 28 queries per dormant org that had activity 2-8 weeks ago active_org_ids = set( WorkflowExecution.objects.filter( - created_at__gte=monthly_start, + created_at__gte=daily_start, ) .values_list("workflow__organization_id", flat=True) .distinct() @@ -322,10 +490,13 @@ def _run_aggregation() -> dict[str, Any]: "skipped_reason": "no_active_orgs", } - organizations = Organization.objects.filter(id__in=active_org_ids).only("id") + organizations = Organization.objects.filter(id__in=active_org_ids).only( + "id", "organization_id" + ) for org in organizations: org_id = str(org.id) + org_identifier = org.organization_id # Pre-resolved for PageUsage queries hourly_agg: dict[tuple, dict] = {} daily_agg: dict[tuple, dict] = {} monthly_agg: dict[tuple, dict] = {} @@ -334,87 +505,48 @@ def _run_aggregation() -> dict[str, Any]: for metric_name, query_method, is_histogram in metric_configs: metric_type = MetricType.HISTOGRAM if is_histogram else MetricType.COUNTER + # Pass org_identifier to PageUsage-based metrics to + # avoid redundant Organization lookups per call. + extra_kwargs = {} + if metric_name == "pages_processed": + extra_kwargs["org_identifier"] = org_identifier + try: - # === HOURLY AGGREGATION (last 24 hours) === - hourly_results = query_method( - org_id, hourly_start, end_date, granularity=Granularity.HOUR - ) - for row in hourly_results: - period = row["period"] - value = row["value"] or 0 - hour_ts = _truncate_to_hour(period) - key = (org_id, hour_ts.isoformat(), metric_name, "default", "") - - if key not in hourly_agg: - hourly_agg[key] = { - "metric_type": metric_type, - "value": 0, - "count": 0, - } - hourly_agg[key]["value"] += value - hourly_agg[key]["count"] += 1 - - # === DAILY AGGREGATION (last 7 days) === - daily_results = query_method( - org_id, daily_start, end_date, granularity=Granularity.DAY + _aggregate_single_metric( + query_method, + metric_name, + metric_type, + org_id, + hourly_start, + daily_start, + monthly_start, + end_date, + hourly_agg, + daily_agg, + monthly_agg, + extra_kwargs, ) - for row in daily_results: - period = row["period"] - value = row["value"] or 0 - day_ts = _truncate_to_day(period) - key = ( - org_id, - day_ts.date().isoformat(), - metric_name, - "default", - "", - ) - - if key not in daily_agg: - daily_agg[key] = { - "metric_type": metric_type, - "value": 0, - "count": 0, - } - daily_agg[key]["value"] += value - daily_agg[key]["count"] += 1 - - # === MONTHLY AGGREGATION (last 2 months) === - monthly_results = query_method( - org_id, monthly_start, end_date, granularity=Granularity.DAY - ) - # Group results by month and create separate records - monthly_buckets: dict[str, dict] = {} - for row in monthly_results: - period = row["period"] - value = row["value"] or 0 - month_ts = _truncate_to_month(period) - month_key_str = month_ts.date().isoformat() - - if month_key_str not in monthly_buckets: - monthly_buckets[month_key_str] = {"value": 0, "count": 0} - monthly_buckets[month_key_str]["value"] += value - monthly_buckets[month_key_str]["count"] += 1 - - # Create aggregation entries for each month - for month_key_str, bucket in monthly_buckets.items(): - month_key = ( - org_id, - month_key_str, - metric_name, - "default", - "", - ) - monthly_agg[month_key] = { - "metric_type": metric_type, - "value": bucket["value"], - "count": bucket["count"] or 1, - } - except Exception: logger.exception("Error querying %s for org %s", metric_name, org_id) stats["errors"] += 1 + # Combined LLM metrics: 1 query per granularity instead of 4 + try: + _aggregate_llm_combined( + org_id, + hourly_start, + daily_start, + monthly_start, + end_date, + hourly_agg, + daily_agg, + monthly_agg, + llm_combined_fields, + ) + except Exception: + logger.exception("Error querying combined LLM metrics for org %s", org_id) + stats["errors"] += 1 + # Bulk upsert all three tiers (single INSERT...ON CONFLICT each) if hourly_agg: stats["hourly"]["upserted"] += _bulk_upsert_hourly(hourly_agg) diff --git a/backend/dashboard_metrics/views.py b/backend/dashboard_metrics/views.py index adb086dbfb..c01c56908a 100644 --- a/backend/dashboard_metrics/views.py +++ b/backend/dashboard_metrics/views.py @@ -45,6 +45,94 @@ # Bucket caching for better cache reuse across overlapping queries BUCKET_CACHE_ENABLED = settings.DASHBOARD_BUCKET_CACHE_ENABLED +# Metrics that use histogram type; everything else is a counter. +_HISTOGRAM_METRICS = frozenset({"llm_usage", "pages_processed", "failed_pages"}) + + +def _metric_type(name: str) -> str: + """Return the MetricType for a given metric name.""" + return MetricType.HISTOGRAM if name in _HISTOGRAM_METRICS else MetricType.COUNTER + + +def _build_series_entry( + metric_name: str, + data: list[dict], +) -> dict: + """Build a single series entry dict from metric query results.""" + return { + "metric_name": metric_name, + "metric_type": _metric_type(metric_name), + "data": [ + {"timestamp": r["period"].isoformat(), "value": r["value"] or 0} for r in data + ], + "total_value": sum(r["value"] or 0 for r in data), + } + + +def _build_error_entry(metric_name: str) -> dict: + """Build an error series entry for a failed metric.""" + return { + "metric_name": metric_name, + "error": "unavailable", + "data": [], + "total_value": 0, + } + + +def _fetch_live_series( + org_id: str, + start_date: datetime, + end_date: datetime, + granularity: str, + metric_queries: dict, + requested_metric: str | None, +) -> tuple[list[dict], list[str]]: + """Fetch all metric series (LLM combined + individual). + + Returns: + Tuple of (series list, error names list). + """ + series: list[dict] = [] + errors: list[str] = [] + llm_metric_keys = MetricsQueryService.LLM_METRIC_KEYS + + # Fetch all 4 LLM metrics in a single query + if not requested_metric or requested_metric in llm_metric_keys: + try: + llm_split = MetricsQueryService.get_llm_metrics_split( + org_id, + start_date, + end_date, + granularity, + ) + for name, data in llm_split.items(): + if not requested_metric or name == requested_metric: + series.append(_build_series_entry(name, data)) + except Exception: + logger.exception("Failed to fetch LLM metrics") + for name in llm_metric_keys: + if not requested_metric or name == requested_metric: + errors.append(name) + series.append(_build_error_entry(name)) + + # Filter non-LLM metrics if a specific metric was requested + if requested_metric: + metric_queries = { + k: v for k, v in metric_queries.items() if k == requested_metric + } + + for name, query_fn in metric_queries.items(): + try: + data = query_fn(org_id, start_date, end_date, granularity) + series.append(_build_series_entry(name, data)) + except Exception: + logger.exception("Failed to fetch metric %s", name) + errors.append(name) + series.append(_build_error_entry(name)) + + return series, errors + + # Thresholds for automatic source selection (in days) HOURLY_MAX_DAYS = 7 # Use hourly for ≤7 days DAILY_MAX_DAYS = 90 # Use daily for ≤90 days, monthly for >90 days @@ -715,9 +803,7 @@ def live_summary(self, request: Request) -> Response: summary_list = [ { "metric_name": name, - "metric_type": MetricType.HISTOGRAM - if name == "llm_usage" - else MetricType.COUNTER, + "metric_type": _metric_type(name), "total_value": value, "total_count": 1, # Not available from live queries "average_value": value, # Same as total for live data @@ -762,64 +848,25 @@ def live_series(self, request: Request) -> Response: org_id = str(organization.id) granularity = params.get("granularity", Granularity.DAY) - # Define metric query mapping + # Define metric query mapping (excludes LLM metrics handled below) metric_queries = { "documents_processed": MetricsQueryService.get_documents_processed, "pages_processed": MetricsQueryService.get_pages_processed, - "llm_calls": MetricsQueryService.get_llm_calls, - "challenges": MetricsQueryService.get_challenges, - "summarization_calls": MetricsQueryService.get_summarization_calls, "deployed_api_requests": MetricsQueryService.get_deployed_api_requests, "etl_pipeline_executions": MetricsQueryService.get_etl_pipeline_executions, - "llm_usage": MetricsQueryService.get_llm_usage_cost, "prompt_executions": MetricsQueryService.get_prompt_executions, "hitl_reviews": MetricsQueryService.get_hitl_reviews, "hitl_completions": MetricsQueryService.get_hitl_completions, } - # Filter by specific metric if requested - if params.get("metric_name"): - metric_queries = { - k: v for k, v in metric_queries.items() if k == params["metric_name"] - } - - series = [] - errors = [] - for metric_name, query_fn in metric_queries.items(): - try: - data = query_fn( - org_id, - params["start_date"], - params["end_date"], - granularity, - ) - series.append( - { - "metric_name": metric_name, - "metric_type": MetricType.HISTOGRAM - if metric_name == "llm_usage" - else MetricType.COUNTER, - "data": [ - { - "timestamp": r["period"].isoformat(), - "value": r["value"] or 0, - } - for r in data - ], - "total_value": sum(r["value"] or 0 for r in data), - } - ) - except Exception: - logger.exception("Failed to fetch metric %s", metric_name) - errors.append(metric_name) - series.append( - { - "metric_name": metric_name, - "error": "unavailable", - "data": [], - "total_value": 0, - } - ) + series, errors = _fetch_live_series( + org_id, + params["start_date"], + params["end_date"], + granularity, + metric_queries, + params.get("metric_name"), + ) response_data = { "start_date": params["start_date"].isoformat(),