Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 129 additions & 58 deletions backend/dashboard_metrics/management/commands/backfill_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import logging
from datetime import datetime, timedelta
from typing import Any

from account_v2.models import Organization
from django.core.management.base import BaseCommand
Expand Down Expand Up @@ -41,25 +42,30 @@ class Command(BaseCommand):
help = "Backfill metrics from source tables into aggregated tables"

# Metric configurations: (name, query_method, is_histogram)
# LLM metrics are handled separately via get_llm_metrics_split().
METRIC_CONFIGS = [
("documents_processed", MetricsQueryService.get_documents_processed, False),
("pages_processed", MetricsQueryService.get_pages_processed, True),
("llm_calls", MetricsQueryService.get_llm_calls, False),
("challenges", MetricsQueryService.get_challenges, False),
("summarization_calls", MetricsQueryService.get_summarization_calls, False),
("deployed_api_requests", MetricsQueryService.get_deployed_api_requests, False),
(
"etl_pipeline_executions",
MetricsQueryService.get_etl_pipeline_executions,
False,
),
("llm_usage", MetricsQueryService.get_llm_usage_cost, True),
("prompt_executions", MetricsQueryService.get_prompt_executions, False),
("failed_pages", MetricsQueryService.get_failed_pages, True),
("hitl_reviews", MetricsQueryService.get_hitl_reviews, False),
("hitl_completions", MetricsQueryService.get_hitl_completions, False),
]

# LLM metric names and whether they are histograms
LLM_METRIC_TYPES: dict[str, bool] = {
"llm_calls": False,
"challenges": False,
"summarization_calls": False,
"llm_usage": True,
}

def add_arguments(self, parser):
parser.add_argument(
"--days",
Expand Down Expand Up @@ -139,15 +145,32 @@ def handle(self, *args, **options):
"errors": 0,
}

# Pre-resolve org identifiers for PageUsage queries (avoids
# redundant Organization lookups inside the metric query loop).
org_identifiers = dict(
Organization.objects.filter(
id__in=[int(oid) if oid.isdigit() else oid for oid in org_ids]
).values_list("id", "organization_id")
)

for i, current_org_id in enumerate(org_ids):
self.stdout.write(
f"\n[{i + 1}/{len(org_ids)}] Processing org: {current_org_id}"
)

try:
# Resolve org string identifier for PageUsage queries
org_id_key = (
int(current_org_id) if current_org_id.isdigit() else current_org_id
)
org_identifier = org_identifiers.get(org_id_key)

# Collect all metric data for this org
hourly_data, daily_data, monthly_data = self._collect_metrics(
current_org_id, start_date, end_date
current_org_id,
start_date,
end_date,
org_identifier=org_identifier,
)

self.stdout.write(
Expand Down Expand Up @@ -245,73 +268,121 @@ def _resolve_org_ids(
return sorted(str(oid) for oid in all_org_ids)

def _collect_metrics(
self, org_id: str, start_date: datetime, end_date: datetime
self,
org_id: str,
start_date: datetime,
end_date: datetime,
org_identifier: str | None = None,
) -> tuple[dict, dict, dict]:
"""Collect metrics from source tables for all granularities."""
hourly_agg = {}
daily_agg = {}
monthly_agg = {}

def _ingest_results(
    results: list[dict[str, Any]],
    metric_name: str,
    metric_type: str,
) -> None:
    """Ingest hourly query results into the hourly aggregation dict.

    Each row is expected to carry a ``period`` (timestamp) and a
    ``value``; rows are bucketed by the hour they fall in and summed
    per (org, hour, metric) key in the enclosing ``hourly_agg``.

    NOTE(review): despite the original comment, this helper only
    touches ``hourly_agg`` — daily/monthly ingestion is done by
    ``_ingest_daily_results``.
    """
    for row in results:
        period = row["period"]
        # Treat NULL/None aggregates from the DB as zero.
        value = row["value"] or 0
        # Normalize the period to the start of its hour so rows from
        # finer-grained periods collapse into one hourly bucket.
        hour_ts = self._truncate_to_hour(period)
        # Key shape: (org, iso-hour, metric, label, sub-label) — the
        # "default"/"" slots match the aggregated-table schema.
        key = (org_id, hour_ts.isoformat(), metric_name, "default", "")

        if key not in hourly_agg:
            hourly_agg[key] = {
                "metric_type": metric_type,
                "value": 0,
                "count": 0,
            }
        # Accumulate: total value plus how many rows contributed
        # (count is used downstream, e.g. for histogram averages).
        hourly_agg[key]["value"] += value
        hourly_agg[key]["count"] += 1

def _ingest_daily_results(
    results: list[dict[str, Any]],
    metric_name: str,
    metric_type: str,
) -> None:
    """Ingest daily query results into the daily AND monthly dicts.

    Each row contributes twice: once to ``daily_agg`` under its day,
    and once to ``monthly_agg`` under the first day of its month, so
    monthly totals are derived from the same daily query results.
    """
    for row in results:
        period = row["period"]
        # Treat NULL/None aggregates from the DB as zero.
        value = row["value"] or 0
        # ``period`` may be a datetime or already a date depending on
        # the query backend; normalize to a date for the daily key.
        day_date = period.date() if hasattr(period, "date") else period
        key = (org_id, day_date.isoformat(), metric_name, "default", "")

        if key not in daily_agg:
            daily_agg[key] = {
                "metric_type": metric_type,
                "value": 0,
                "count": 0,
            }
        daily_agg[key]["value"] += value
        daily_agg[key]["count"] += 1

        # Monthly aggregation from daily results: bucket the same row
        # under the first of its month (datetime vs date handled as above).
        if hasattr(period, "date"):
            month_date = period.replace(day=1).date()
        else:
            month_date = period.replace(day=1)
        mkey = (org_id, month_date.isoformat(), metric_name, "default", "")

        if mkey not in monthly_agg:
            monthly_agg[mkey] = {
                "metric_type": metric_type,
                "value": 0,
                "count": 0,
            }
        monthly_agg[mkey]["value"] += value
        monthly_agg[mkey]["count"] += 1

# Fetch all 4 LLM metrics in one query per granularity
try:
for granularity in (Granularity.HOUR, Granularity.DAY):
llm_split = MetricsQueryService.get_llm_metrics_split(
org_id, start_date, end_date, granularity
)
for metric_name, data in llm_split.items():
is_histogram = self.LLM_METRIC_TYPES[metric_name]
metric_type = (
MetricType.HISTOGRAM if is_histogram else MetricType.COUNTER
)
if granularity == Granularity.HOUR:
_ingest_results(data, metric_name, metric_type)
else:
_ingest_daily_results(data, metric_name, metric_type)
except Exception as e:
logger.warning("Error querying LLM metrics for org %s: %s", org_id, e)

# Fetch remaining (non-LLM) metrics individually
for metric_name, query_method, is_histogram in self.METRIC_CONFIGS:
metric_type = MetricType.HISTOGRAM if is_histogram else MetricType.COUNTER

# Pass org_identifier to PageUsage-based metrics to
# avoid redundant Organization lookups per call.
extra_kwargs: dict[str, Any] = {}
if metric_name == "pages_processed":
extra_kwargs["org_identifier"] = org_identifier

try:
# Query hourly data
hourly_results = query_method(
org_id, start_date, end_date, granularity=Granularity.HOUR
org_id,
start_date,
end_date,
granularity=Granularity.HOUR,
**extra_kwargs,
)
for row in hourly_results:
period = row["period"]
value = row["value"] or 0
hour_ts = self._truncate_to_hour(period)
key = (org_id, hour_ts.isoformat(), metric_name, "default", "")

if key not in hourly_agg:
hourly_agg[key] = {
"metric_type": metric_type,
"value": 0,
"count": 0,
}
hourly_agg[key]["value"] += value
hourly_agg[key]["count"] += 1

# Query daily data
_ingest_results(hourly_results, metric_name, metric_type)

daily_results = query_method(
org_id, start_date, end_date, granularity=Granularity.DAY
org_id,
start_date,
end_date,
granularity=Granularity.DAY,
**extra_kwargs,
)
for row in daily_results:
period = row["period"]
value = row["value"] or 0
day_date = period.date() if hasattr(period, "date") else period
key = (org_id, day_date.isoformat(), metric_name, "default", "")

if key not in daily_agg:
daily_agg[key] = {
"metric_type": metric_type,
"value": 0,
"count": 0,
}
daily_agg[key]["value"] += value
daily_agg[key]["count"] += 1

# Query for monthly (aggregate daily results by month)
for row in daily_results:
period = row["period"]
value = row["value"] or 0
if hasattr(period, "date"):
month_date = period.replace(day=1).date()
else:
month_date = period.replace(day=1)
key = (org_id, month_date.isoformat(), metric_name, "default", "")

if key not in monthly_agg:
monthly_agg[key] = {
"metric_type": metric_type,
"value": 0,
"count": 0,
}
monthly_agg[key]["value"] += value
monthly_agg[key]["count"] += 1
_ingest_daily_results(daily_results, metric_name, metric_type)

except Exception as e:
logger.warning("Error querying %s for org %s: %s", metric_name, org_id, e)
Expand Down
Loading
Loading