diff --git a/.gitignore b/.gitignore index e56523b9..9ebb1f55 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ .env /frontend/.env +.venv/ +.venv*/ .pytest_cache __pycache__/ db.sqlite3 @@ -18,6 +20,8 @@ staticfiles/ coverage.txt photos/ csrf_test.py +tmp/ +.env # Elastic Beanstalk Files .elasticbeanstalk/* diff --git a/domains/etl/admin.py b/domains/etl/admin.py index 9a4ab67c..770d56a9 100644 --- a/domains/etl/admin.py +++ b/domains/etl/admin.py @@ -1,6 +1,13 @@ from django.urls import path from django.contrib import admin -from domains.etl.models import OrchestrationSystem, DataConnection, Task, TaskMapping, TaskMappingPath, TaskRun +from domains.etl.models import ( + OrchestrationSystem, + DataConnection, + Task, + TaskMapping, + TaskMappingPath, + TaskRun, +) from hydroserver.admin import VocabularyAdmin diff --git a/domains/etl/etl_errors.py b/domains/etl/etl_errors.py new file mode 100644 index 00000000..cfe78394 --- /dev/null +++ b/domains/etl/etl_errors.py @@ -0,0 +1,551 @@ +from __future__ import annotations + +import ast +import json +import re +from typing import Any, Iterable, Optional + +from pydantic import ValidationError + + +class EtlUserFacingError(Exception): + """ + Exception intended to be shown to end users (TaskDetails "run message"). + + Keep this as a single readable string. Avoid structured payloads. 
+ """ + + +_EXTRACTOR_ALIAS_MAP: dict[str, str] = { + "source_uri": "sourceUri", + "placeholder_variables": "placeholderVariables", + "run_time_value": "runTimeValue", +} + +_TRANSFORMER_ALIAS_MAP: dict[str, str] = { + "header_row": "headerRow", + "data_start_row": "dataStartRow", + "identifier_type": "identifierType", + "custom_format": "customFormat", + "timezone_mode": "timezoneMode", + "run_time_value": "runTimeValue", + "jmespath": "JMESPath", + "target_identifier": "targetIdentifier", + "source_identifier": "sourceIdentifier", + "data_transformations": "dataTransformations", + "rating_curve_url": "ratingCurveUrl", +} + + +def _alias(component: str, field: str) -> str: + if component == "extractor": + return _EXTRACTOR_ALIAS_MAP.get(field, field) + if component == "transformer": + return _TRANSFORMER_ALIAS_MAP.get(field, field) + return field + + +def _format_loc(component: str, loc: Iterable[Any]) -> str: + loc_list = list(loc) + # Strip pydantic union branch model names from the front. 
+ if ( + component == "extractor" + and loc_list + and loc_list[0] + in ( + "HTTPExtractor", + "LocalFileExtractor", + ) + ): + loc_list = loc_list[1:] + if ( + component == "transformer" + and loc_list + and loc_list[0] + in ( + "JSONTransformer", + "CSVTransformer", + ) + ): + loc_list = loc_list[1:] + + parts: list[str] = [] + for item in loc_list: + if isinstance(item, int): + if not parts: + parts.append(f"[{item}]") + else: + parts[-1] = f"{parts[-1]}[{item}]" + continue + if isinstance(item, str): + parts.append(_alias(component, item)) + continue + parts.append(str(item)) + + if not parts: + return component + return ".".join([component] + parts) + + +def _jsonish(value: Any) -> str: + if value is None: + return "null" + if isinstance(value, str): + if value == "": + return '""' + return repr(value) + return repr(value) + + +def user_facing_error_from_validation_error( + component: str, + exc: ValidationError, + *, + raw: Optional[dict[str, Any]] = None, +) -> EtlUserFacingError: + """ + Convert pydantic's ValidationError into one readable, actionable sentence. + """ + errs = exc.errors(include_url=False) + + # Unions emit errors for every branch. Filter to the selected type when possible. 
+ if raw and component in ("extractor", "transformer"): + raw_type = raw.get("type") + type_to_model = { + "extractor": {"HTTP": "HTTPExtractor", "local": "LocalFileExtractor"}, + "transformer": {"JSON": "JSONTransformer", "CSV": "CSVTransformer"}, + } + selected_model = type_to_model.get(component, {}).get(raw_type) + if selected_model: + errs = [ + e for e in errs if not e.get("loc") or e["loc"][0] == selected_model + ] or errs + + if not errs: + return EtlUserFacingError(f"Invalid {component} configuration.") + + first = errs[0] + loc = first.get("loc") or () + msg = first.get("msg") or "Invalid value" + inp = first.get("input", None) + + path = _format_loc(component, loc) + if component == "transformer" and isinstance(raw, dict): + ts = raw.get("timestamp") + if isinstance(ts, dict): + tz_mode = ts.get("timezoneMode") or ts.get("timezone_mode") + tz_val = ts.get("timezone") + if ( + path.endswith("transformer.timestamp.timezone") + and str(tz_mode) == "daylightSavings" + ): + if tz_val is None or str(tz_val).strip() == "": + return EtlUserFacingError( + "Timezone information is required when daylight savings mode is enabled. " + "Select a valid timezone such as America/Denver and try again." + ) + if "Invalid timezone" in str(msg): + return EtlUserFacingError( + "The configured timezone is not recognized. " + "Use a valid IANA timezone such as America/Denver and run the job again." + ) + + message = ( + f"Invalid {component} configuration at {path}: {msg} (got {_jsonish(inp)}). " + f"Update the Data Connection {component} settings." + ) + return EtlUserFacingError(message) + + +_MISSING_PER_TASK_VAR_RE = re.compile(r"Missing per-task variable '([^']+)'") +_MISSING_PLACEHOLDER_VAR_RE = re.compile(r"Missing placeholder variable: (.+)$") +_TIMESTAMP_COL_NOT_FOUND_RE = re.compile( + r"Timestamp column '([^']*)' not found in data\." 
+) + +_MISSING_REQUIRED_TASK_VAR_RE = re.compile( + r"Missing required per-task extractor variable '([^']+)'" +) +_MISSING_URI_PLACEHOLDER_RE = re.compile( + r"Extractor source URI contains a placeholder '([^']+)', but it was not provided" +) +_SOURCE_INDEX_OOR_RE = re.compile( + r"Source index (\d+) is out of range for extracted data\." +) +_SOURCE_COL_NOT_FOUND_RE = re.compile( + r"Source column '([^']+)' not found in extracted data\." +) +_USECOLS_NOT_FOUND_RE = re.compile( + r"columns expected but not found:\s*(\[[^\]]*\])", + re.IGNORECASE, +) + + +def _iter_exception_chain(exc: Exception) -> Iterable[Exception]: + seen: set[int] = set() + current: Optional[Exception] = exc + while current is not None and id(current) not in seen: + seen.add(id(current)) + yield current + next_exc = current.__cause__ or current.__context__ + current = next_exc if isinstance(next_exc, Exception) else None + + +def _extract_missing_usecols(exc: Exception) -> list[str]: + for err in _iter_exception_chain(exc): + msg = str(err) + match = _USECOLS_NOT_FOUND_RE.search(msg) + if not match: + continue + + raw_list = match.group(1) + try: + parsed = ast.literal_eval(raw_list) + if isinstance(parsed, (list, tuple, set)): + cols = [str(c).strip() for c in parsed if str(c).strip()] + if cols: + return cols + except Exception: + pass + + inner = raw_list.strip()[1:-1] + if inner: + cols = [part.strip().strip("'\"") for part in inner.split(",")] + cols = [c for c in cols if c] + if cols: + return cols + return [] + + +def _format_cols(cols: list[str], max_cols: int = 4) -> str: + shown = [f"'{c}'" for c in cols[:max_cols]] + if len(cols) > max_cols: + shown.append(f"+{len(cols) - max_cols} more") + return ", ".join(shown) + + +def user_facing_error_from_exception( + exc: Exception, + *, + transformer_raw: Optional[dict[str, Any]] = None, +) -> Optional[EtlUserFacingError]: + """ + Map common ETL/hydroserverpy exceptions to a single readable message. 
+ """ + if isinstance(exc, EtlUserFacingError): + return exc + + if isinstance(exc, ValidationError): + return None + + if isinstance(exc, KeyError): + msg = exc.args[0] if exc.args and isinstance(exc.args[0], str) else str(exc) + m = _MISSING_PER_TASK_VAR_RE.search(msg) + if m: + name = m.group(1) + return EtlUserFacingError( + f"A required task variable named '{name}' was not provided. " + "Add a value for it in the task configuration and run the job again." + ) + m = _MISSING_PLACEHOLDER_VAR_RE.search(msg) + if m: + name = m.group(1).strip() + return EtlUserFacingError( + f"The extractor URL includes a placeholder '{name}', but no value was supplied. " + "Provide the missing value in the task variables." + ) + + msg_str = str(exc) + + if isinstance(exc, TypeError) and "JSONTransformer received None" in msg_str: + return EtlUserFacingError( + "The transformer did not receive any extracted data to parse. " + "Confirm the extractor is returning a valid JSON payload." + ) + + if isinstance(exc, TypeError) and "CSVTransformer received None" in msg_str: + return EtlUserFacingError( + "The transformer did not receive any extracted data to parse. " + "Confirm the extractor is returning a valid CSV payload." + ) + + if ( + ("NoneType" in msg_str or "nonetype" in msg_str.lower()) + and "string" in msg_str.lower() + and "assign" in msg_str.lower() + ): + return EtlUserFacingError( + "A required configuration value is null where a string is expected. " + "Provide the missing value in your ETL configuration JSON." + ) + + # django-ninja HttpError (avoid importing ninja here to keep module import-safe) + status = getattr(exc, "status_code", None) + if status is not None and exc.__class__.__name__ == "HttpError": + message = getattr(exc, "message", None) or msg_str + if "Datastream does not exist" in message: + return EtlUserFacingError( + "One or more destination datastream identifiers could not be found in HydroServer. 
" + "Update the task mappings to use valid datastream IDs." + ) + if status in (401, 403): + return EtlUserFacingError( + "HydroServer rejected the load due to authorization. " + "Confirm the target datastream(s) belong to this workspace and the job has permission to write." + ) + if status >= 400: + return EtlUserFacingError( + "HydroServer rejected some or all of the data. " + "Verify the transformed timestamps/values are valid and the target datastream mappings are correct." + ) + + if isinstance(exc, ValueError): + # Extractor placeholder/variable resolution + m = _MISSING_REQUIRED_TASK_VAR_RE.search(msg_str) + if m: + name = m.group(1) + return EtlUserFacingError( + f"A required task variable named '{name}' was not provided. " + "Add a value for it in the task configuration and run the job again." + ) + m = _MISSING_URI_PLACEHOLDER_RE.search(msg_str) + if m: + name = m.group(1) + return EtlUserFacingError( + f"The extractor URL includes a placeholder '{name}', but no value was supplied. " + "Provide the missing value in the task variables." + ) + + if "identifierType='index' requires timestamp.key" in msg_str: + return EtlUserFacingError( + "The timestamp column is set incorrectly. Index mode expects a 1-based column number (1 for the first column). " + "Update the timestamp setting to a valid column index." + ) + + if msg_str.startswith( + "One or more timestamps could not be read with the current settings" + ): + return EtlUserFacingError( + "One or more timestamps could not be read using the current format and timezone settings. " + "Confirm how dates appear in the source file and update the transformer configuration to match." + ) + + if ( + msg_str + == "One or more configured CSV columns were not found in the header row." + ): + missing_cols = _extract_missing_usecols(exc) + if len(missing_cols) > 1: + return EtlUserFacingError( + f"Configured CSV columns were not found in the file header ({_format_cols(missing_cols)}). 
" + "This often means the delimiter or headerRow setting is incorrect. " + "Verify the delimiter and headerRow settings, then run the job again." + ) + if len(missing_cols) == 1 and isinstance(transformer_raw, dict): + ts_cfg = transformer_raw.get("timestamp") + ts_key = ts_cfg.get("key") if isinstance(ts_cfg, dict) else None + if ts_key is not None and str(missing_cols[0]) == str(ts_key): + col = missing_cols[0] + return EtlUserFacingError( + f"The configured timestamp column '{col}' was not found in the file header. " + "Confirm the timestamp mapping and verify the delimiter/headerRow settings match the source file." + ) + return EtlUserFacingError( + "A required column was not found in the file header. " + "The source file may have changed or the header row may be set incorrectly. " + "Confirm the file layout and update the column mappings if needed." + ) + if ( + msg_str + == "The header row contained unexpected values and could not be processed." + ): + return EtlUserFacingError( + "A required column was not found in the file header. " + "The source file may have changed or the header row may be set incorrectly. " + "Confirm the file layout and update the column mappings if needed." + ) + if ( + msg_str + == "One or more data rows contained unexpected values and could not be processed." + ): + return EtlUserFacingError( + "A required column was not found in the file header. " + "The source file may have changed or the header row may be set incorrectly. " + "Confirm the file layout and update the column mappings if needed." + ) + + if msg_str.startswith("Rating curve transformation is missing ratingCurveUrl"): + return EtlUserFacingError( + "A rating curve transformation is missing the rating curve URL. " + "Set a ratingCurveUrl for the mapping and run the job again." + ) + + if msg_str.startswith("Rating curve file not found at"): + return EtlUserFacingError( + "The configured rating curve file could not be found. 
" + "Verify the ratingCurveUrl is correct and points to an existing file." + ) + + if msg_str.startswith("Timed out while retrieving rating curve from"): + return EtlUserFacingError( + "The task timed out while trying to load the rating curve file. " + "Verify the file URL is reachable and run the job again." + ) + + if ( + msg_str.startswith("Could not connect to rating curve URL") + or msg_str.startswith("Failed to retrieve rating curve from") + or msg_str.startswith("Rating curve request to") + ): + return EtlUserFacingError( + "The task could not download the configured rating curve file. " + "Verify the file exists and can be reached, then run the job again." + ) + + if msg_str.startswith("Authentication failed while retrieving rating curve from"): + return EtlUserFacingError( + "The task is not authorized to access the configured rating curve file. " + "Use a rating curve file stored in HydroServer and run the job again." + ) + + if msg_str.startswith("Unable to read rating curve file from"): + return EtlUserFacingError( + "The configured rating curve file could not be read. " + "Replace it with a valid CSV file and run the job again." + ) + + if msg_str.startswith("Rating curve at") and "is empty" in msg_str: + return EtlUserFacingError( + "The configured rating curve file is empty. " + "Upload a valid rating curve file and run the job again." + ) + + if msg_str.startswith("Rating curve at") and "not a valid CSV file" in msg_str: + return EtlUserFacingError( + "The configured rating curve file is not a valid CSV file. " + "Upload a valid rating curve CSV using the standardized two-column format." + ) + + if msg_str.startswith("Rating curve at") and "at least two columns" in msg_str: + return EtlUserFacingError( + "The rating curve CSV must include two columns (input on the left, output on the right). " + "Update the file and run the job again." 
+ ) + + if msg_str.startswith("Rating curve at") and "at least two numeric rows" in msg_str: + return EtlUserFacingError( + "The rating curve CSV must include at least two numeric rows. " + "Update the file and run the job again." + ) + + # JSON transformer common configuration errors + if msg_str == "The payload's expected fields were not found.": + return EtlUserFacingError( + "Failed to find the timestamp or value using the current JSON query. " + "Confirm the JMESPath expression matches the structure returned by the source." + ) + if ( + msg_str + == "The timestamp or value key could not be found with the specified query." + ): + return EtlUserFacingError( + "Failed to find the timestamp or value using the current JSON query. " + "Confirm the JMESPath expression matches the structure returned by the source." + ) + + m = _TIMESTAMP_COL_NOT_FOUND_RE.search(msg_str) + if m: + col = m.group(1) + return EtlUserFacingError( + f"The configured timestamp column '{col}' was not found in the file header. " + "Confirm the timestamp mapping and verify the delimiter/headerRow settings match the source file." + ) + + m = _SOURCE_INDEX_OOR_RE.search(msg_str) + if m: + idx = m.group(1) + return EtlUserFacingError( + f"A mapping source index ({idx}) is out of range for the extracted data. " + "Update task.mappings sourceIdentifier values (or switch identifierType) to match the extracted columns." + ) + + m = _SOURCE_COL_NOT_FOUND_RE.search(msg_str) + if m: + col = m.group(1) + return EtlUserFacingError( + f"A mapped field named '{col}' was not found in the extracted data. " + "Update the task mapping so the source identifier matches the JSON." + ) + + # JSON decode failures (usually extractor returned HTML/text instead of JSON) + if isinstance(exc, json.JSONDecodeError): + return EtlUserFacingError( + "The source did not return valid JSON. " + "Verify the URL points to a JSON endpoint." 
+ ) + + if msg_str == "Could not connect to the source system.": + return EtlUserFacingError( + "Failed to connect to the source system. This may be temporary; try again shortly. " + "If it persists, the source system may be offline." + ) + + if msg_str == "The requested data could not be found on the source system.": + return EtlUserFacingError( + "The requested data could not be found on the source system. " + "Verify the URL is correct and that the file or endpoint still exists." + ) + + if msg_str.startswith("Authentication with the source system failed."): + return EtlUserFacingError( + "Authentication with the source system failed. The username, password, or token may be incorrect or expired. " + "Update the credentials and try again." + ) + + if msg_str in ( + "The connection to the source worked but no observations were returned.", + ): + return EtlUserFacingError( + "No observations were returned from the source system. " + "Confirm the configured source system has observations available for the requested time range." + ) + + # Backward-compatible mappings for older hydroserverpy strings. + if msg_str == "The requested payload was not found on the source system.": + return EtlUserFacingError( + "The requested data could not be found on the source system. " + "Verify the URL is correct and that the file or endpoint still exists." + ) + + if msg_str == "The source system returned no data.": + return EtlUserFacingError( + "No observations were returned from the source system. " + "Confirm the configured source system has observations available for the requested time range." + ) + + if ( + msg_str + == "Authentication with the source system failed; credentials may be invalid or expired." + ): + return EtlUserFacingError( + "Authentication with the source system failed. The username, password, or token may be incorrect or expired. " + "Update the credentials and try again." 
+ ) + + if "jmespath.exceptions" in msg_str or "Parse error at column" in msg_str: + return EtlUserFacingError( + "The JSON query used to extract timestamps or values is invalid or returned unexpected data. " + "Review and correct the JMESPath expression." + ) + + if msg_str in ( + "The target datastream could not be found.", + "The target data series (datastream) could not be found.", + "The target datastream was not found.", + ): + return EtlUserFacingError( + "One or more destination datastream identifiers could not be found in HydroServer. " + "Update the task mappings to use valid datastream IDs." + ) + + return None diff --git a/domains/etl/loader.py b/domains/etl/loader.py index 78289608..88ad1361 100644 --- a/domains/etl/loader.py +++ b/domains/etl/loader.py @@ -1,10 +1,12 @@ from __future__ import annotations from uuid import UUID +from dataclasses import dataclass +from typing import Any from hydroserverpy.etl.loaders.base import Loader import logging import pandas as pd -from datetime import datetime +from datetime import datetime, timezone as dt_timezone from django.db.models import Min, Value from django.db.models.functions import Coalesce from domains.etl.models import Task @@ -15,6 +17,16 @@ observation_service = ObservationService() +@dataclass(frozen=True) +class LoadSummary: + cutoff: str + timestamps_total: int + timestamps_after_cutoff: int + observations_available: int + observations_loaded: int + datastreams_loaded: int + + class HydroServerInternalLoader(Loader): """ A class that extends the HydroServer client with ETL-specific functionalities. @@ -24,42 +36,66 @@ def __init__(self, task): self._begin_cache: dict[str, pd.Timestamp] = {} self.task = task - def load(self, data: pd.DataFrame, task: Task) -> None: + def load(self, data: pd.DataFrame, task: Task) -> LoadSummary: """ Load observations from a DataFrame to the HydroServer. 
""" - begin_date = self.earliest_begin_date(task) new_data = data[data["timestamp"] > begin_date] + + cutoff_value = ( + begin_date.isoformat() + if hasattr(begin_date, "isoformat") + else str(begin_date) + ) + timestamps_total = len(data) + timestamps_after_cutoff = len(new_data) + observations_available = 0 + observations_loaded = 0 + datastreams_loaded = 0 + for col in new_data.columns.difference(["timestamp"]): df = ( new_data[["timestamp", col]] .rename(columns={col: "value"}) .dropna(subset=["value"]) ) - if df.empty: - logging.warning(f"No new data for {col}, skipping.") + available = len(df) + observations_available += available + if available == 0: + logging.warning( + "No new observations for %s after filtering; skipping.", col + ) continue df = df.rename(columns={"timestamp": "phenomenonTime", "value": "result"}) + loaded = 0 # Chunked upload CHUNK_SIZE = 5000 total = len(df) + chunks = (total + CHUNK_SIZE - 1) // CHUNK_SIZE + logging.info( + "Uploading %s observation(s) to datastream %s (%s chunk(s), chunk_size=%s)", + total, + col, + chunks, + CHUNK_SIZE, + ) for start in range(0, total, CHUNK_SIZE): end = min(start + CHUNK_SIZE, total) chunk = df.iloc[start:end] - logging.info( - "Uploading %s rows (%s-%s) to datastream %s", - len(chunk), + logging.debug( + "Uploading chunk to datastream %s: rows %s-%s (%s rows)", + col, start, end - 1, - col, + len(chunk), ) chunk_data = ObservationBulkPostBody( fields=["phenomenonTime", "result"], - data=chunk.values.tolist() + data=chunk.values.tolist(), ) try: @@ -69,6 +105,7 @@ def load(self, data: pd.DataFrame, task: Task) -> None: datastream_id=UUID(col), mode="append", ) + loaded += len(chunk) except Exception as e: status = getattr(e, "status_code", None) or getattr( getattr(e, "response", None), "status_code", None @@ -83,17 +120,38 @@ def load(self, data: pd.DataFrame, task: Task) -> None: break raise + if loaded > 0: + datastreams_loaded += 1 + observations_loaded += loaded + + return LoadSummary( + 
cutoff=cutoff_value, + timestamps_total=timestamps_total, + timestamps_after_cutoff=timestamps_after_cutoff, + observations_available=observations_available, + observations_loaded=observations_loaded, + datastreams_loaded=datastreams_loaded, + ) + @staticmethod def _fetch_earliest_begin(task: Task) -> pd.Timestamp: - logging.info("Querying HydroServer for earliest begin date for payload...") - - return Datastream.objects.filter(id__in={ - path.target_identifier - for mapping in task.mappings.all() - for path in mapping.paths.all() - }).aggregate( - earliest_end=Coalesce(Min("phenomenon_end_time"), Value(datetime(1970, 1, 1))) - )["earliest_end"] + logging.info( + "Checking HydroServer for the most recent data already stored (so we only extract new observations)..." + ) + + return Datastream.objects.filter( + id__in={ + path.target_identifier + for mapping in task.mappings.all() + for path in mapping.paths.all() + } + ).aggregate( + earliest_end=Coalesce( + Min("phenomenon_end_time"), Value(datetime(1880, 1, 1, tzinfo=dt_timezone.utc)) + ) + )[ + "earliest_end" + ] def earliest_begin_date(self, task: Task) -> pd.Timestamp: """ diff --git a/domains/etl/models/run.py b/domains/etl/models/run.py index 8928fad2..3136d10d 100644 --- a/domains/etl/models/run.py +++ b/domains/etl/models/run.py @@ -1,6 +1,7 @@ import uuid6 from django.db import models from .task import Task +from domains.etl.run_result_normalizer import normalize_task_run_result, task_transformer_raw class TaskRun(models.Model): @@ -10,3 +11,12 @@ class TaskRun(models.Model): started_at = models.DateTimeField(auto_now_add=True) finished_at = models.DateTimeField(null=True, blank=True) result = models.JSONField(blank=True, null=True) + + def save(self, *args, **kwargs): + transformer_raw = task_transformer_raw(self.task) if self.task_id else None + self.result = normalize_task_run_result( + status=self.status, + result=self.result, + transformer_raw=transformer_raw, + ) + super().save(*args, **kwargs) diff 
--git a/domains/etl/run_result_normalizer.py b/domains/etl/run_result_normalizer.py new file mode 100644 index 00000000..f9f525af --- /dev/null +++ b/domains/etl/run_result_normalizer.py @@ -0,0 +1,125 @@ +from __future__ import annotations + +import re +from typing import Any, Optional + +from .etl_errors import user_facing_error_from_exception + + +_SUCCESS_LOAD_RE = re.compile( + r"^Load complete\.\s*(\d+)\s+rows were added to\s+(\d+)\s+datastreams?\.$" +) +_SUCCESS_LOADED_RE = re.compile( + r"^Loaded\s+(\d+)\s+total observations\s+(?:into|across)\s+(\d+)\s+datastream(?:s|\(s\))\.$" +) +_FAILURE_STAGE_PREFIX_RE = re.compile( + r"^(?:Setup failed|Failed during [^:]+):\s*", + re.IGNORECASE, +) + + +def task_transformer_raw(task: Any) -> Optional[dict[str, Any]]: + if task is None: + return None + + data_connection = getattr(task, "data_connection", None) + if data_connection is None: + return None + + raw_settings = getattr(data_connection, "transformer_settings", None) or {} + if not isinstance(raw_settings, dict): + return None + + raw: dict[str, Any] = dict(raw_settings) + transformer_type = getattr(data_connection, "transformer_type", None) + if transformer_type and "type" not in raw: + raw["type"] = transformer_type + return raw + + +def _format_loaded_success_message(loaded: int, ds_count: int) -> str: + preposition = "into" if ds_count == 1 else "across" + ds_word = "datastream" if ds_count == 1 else "datastreams" + return f"Loaded {loaded} total observations {preposition} {ds_count} {ds_word}." 
+ + +def _extract_message(result: dict[str, Any]) -> Optional[str]: + for key in ("message", "summary", "error", "detail"): + val = result.get(key) + if isinstance(val, str) and val.strip(): + return val.strip() + return None + + +def _normalize_success_message(message: str) -> str: + m = _SUCCESS_LOAD_RE.match(message) + if m: + return _format_loaded_success_message(int(m.group(1)), int(m.group(2))) + + m = _SUCCESS_LOADED_RE.match(message) + if m: + return _format_loaded_success_message(int(m.group(1)), int(m.group(2))) + + if ( + message + == "Already up to date. No new observations were loaded because all timestamps in the source are older than what is already stored." + ): + return "Already up to date. No new observations were loaded." + + return message + + +def _normalize_failure_message( + message: str, + *, + transformer_raw: Optional[dict[str, Any]] = None, +) -> str: + candidate = _FAILURE_STAGE_PREFIX_RE.sub("", message).strip() + + if candidate.startswith("Error reading CSV data:"): + candidate = candidate.split("Error reading CSV data:", 1)[1].strip() + + mapped = user_facing_error_from_exception( + ValueError(candidate), + transformer_raw=transformer_raw, + ) + if mapped: + return str(mapped) + return candidate or message + + +def normalize_task_run_result( + *, + status: str, + result: Any, + transformer_raw: Optional[dict[str, Any]] = None, +) -> Any: + if result is None: + return None + + normalized: dict[str, Any] + if isinstance(result, dict): + normalized = dict(result) + else: + normalized = {"message": str(result)} + + message = _extract_message(normalized) + if not message: + return normalized + + if status == "SUCCESS": + normalized_message = _normalize_success_message(message) + elif status == "FAILURE": + normalized_message = _normalize_failure_message( + message, + transformer_raw=transformer_raw, + ) + else: + normalized_message = message + + normalized["message"] = normalized_message + summary = normalized.get("summary") + if not 
isinstance(summary, str) or not summary.strip() or summary.strip() == message: + normalized["summary"] = normalized_message + + return normalized diff --git a/domains/etl/services/run.py b/domains/etl/services/run.py index 012ffa11..757864a2 100644 --- a/domains/etl/services/run.py +++ b/domains/etl/services/run.py @@ -6,6 +6,10 @@ from django.contrib.auth import get_user_model from domains.iam.models import APIKey from domains.etl.models import TaskRun +from domains.etl.run_result_normalizer import ( + normalize_task_run_result, + task_transformer_raw, +) from interfaces.api.schemas import TaskRunFields, TaskRunPostBody, TaskRunPatchBody, TaskRunOrderByFields from interfaces.api.service import ServiceUtils from .task import TaskService @@ -82,11 +86,18 @@ def create( principal=principal, uid=task_id, action="edit", expand_related=True ) + task_run_data = data.dict(include=set(TaskRunFields.model_fields.keys())) + task_run_data["result"] = normalize_task_run_result( + status=task_run_data["status"], + result=task_run_data.get("result"), + transformer_raw=task_transformer_raw(task), + ) + try: task_run = TaskRun.objects.create( pk=data.id, task=task, - **data.dict(include=set(TaskRunFields.model_fields.keys())), + **task_run_data, ) except IntegrityError: raise HttpError(409, "The operation could not be completed due to a resource conflict.") @@ -121,6 +132,11 @@ def update( for field, value in task_run_data.items(): setattr(task_run, field, value) + task_run.result = normalize_task_run_result( + status=task_run.status, + result=task_run.result, + transformer_raw=task_transformer_raw(task), + ) task_run.save() return self.get( diff --git a/domains/etl/services/task.py b/domains/etl/services/task.py index c1959f79..784aed36 100644 --- a/domains/etl/services/task.py +++ b/domains/etl/services/task.py @@ -1,4 +1,5 @@ import uuid +from urllib.parse import parse_qs, urlparse from typing import List, Literal, Optional, get_args from datetime import datetime, timezone from 
croniter import croniter @@ -11,8 +12,19 @@ from django.conf import settings from django_celery_beat.models import PeriodicTask, CrontabSchedule, IntervalSchedule from domains.iam.models import APIKey -from domains.etl.models import Task, TaskMapping, TaskMappingPath, TaskRun -from interfaces.api.schemas import TaskFields, TaskPostBody, TaskPatchBody, TaskOrderByFields +from domains.etl.models import ( + Task, + TaskMapping, + TaskMappingPath, + TaskRun, +) +from domains.sta.models import ThingFileAttachment +from interfaces.api.schemas import ( + TaskFields, + TaskPostBody, + TaskPatchBody, + TaskOrderByFields, +) from domains.etl.tasks import run_etl_task from interfaces.api.service import ServiceUtils from .data_connection import DataConnectionService @@ -297,7 +309,10 @@ def create( raise HttpError(409, "The operation could not be completed due to a resource conflict.") task = self.update_scheduling(task, data.schedule.dict()) - task = self.update_mapping(task, [mapping.dict() for mapping in data.mappings] if data.mappings else None) + task = self.update_mapping( + task, + [mapping.dict() for mapping in data.mappings] if data.mappings else None, + ) task.save() return self.get( @@ -523,11 +538,122 @@ def update_scheduling(task: Task, schedule_data: dict | None = None): return task + @staticmethod + def _extract_rating_curve_url(transformation: dict) -> str: + url = transformation.get("ratingCurveUrl") + if isinstance(url, str) and url.strip(): + return url.strip() + + raise HttpError(400, "Rating curve transformations must define ratingCurveUrl") + + @staticmethod + def _normalize_transformation(transformation: dict) -> dict: + normalized = dict(transformation or {}) + transform_type = normalized.get("type") + + if transform_type == "rating_curve": + rating_curve_url = TaskService._extract_rating_curve_url(normalized) + normalized["ratingCurveUrl"] = rating_curve_url + + return normalized + + @staticmethod + def _thing_attachment_rating_curve_references( + 
workspace_id: uuid.UUID, + ) -> set[tuple[uuid.UUID, int, uuid.UUID]]: + return set( + ThingFileAttachment.objects.filter( + thing__workspace_id=workspace_id, + file_attachment_type="rating_curve", + ).values_list("thing_id", "id", "download_token") + ) + + @staticmethod + def _parse_thing_attachment_reference( + rating_curve_url: str, + ) -> tuple[uuid.UUID, int, uuid.UUID] | None: + parsed = urlparse(rating_curve_url) + path_segments = [segment for segment in parsed.path.split("/") if segment] + if len(path_segments) < 5: + return None + + if path_segments[-1] != "download": + return None + if path_segments[-3] != "file-attachments": + return None + if path_segments[-5] != "things": + return None + + try: + thing_id = uuid.UUID(path_segments[-4]) + except ValueError: + return None + + try: + attachment_id = int(path_segments[-2]) + except ValueError: + return None + + query = parse_qs(parsed.query) + token = (query.get("token") or [None])[0] + if not token: + return None + + try: + download_token = uuid.UUID(token) + except ValueError: + return None + + return thing_id, attachment_id, download_token + + @staticmethod + def _validate_rating_curve_transformation_references( + workspace_id: uuid.UUID, mapping_data: List[dict] + ): + valid_references = TaskService._thing_attachment_rating_curve_references( + workspace_id + ) + + for mapping in mapping_data: + for path in mapping.get("paths", []): + transformations = path.get("data_transformations", []) or [] + if not isinstance(transformations, list): + raise HttpError( + 400, + "Path data_transformations must be an array of transformation objects", + ) + + normalized_transformations = [] + for transformation in transformations: + if not isinstance(transformation, dict): + raise HttpError(400, "Invalid data transformation payload") + + normalized = TaskService._normalize_transformation(transformation) + + if normalized.get("type") == "rating_curve": + rating_curve_url = 
TaskService._extract_rating_curve_url(normalized) + reference = TaskService._parse_thing_attachment_reference( + rating_curve_url + ) + if not reference or reference not in valid_references: + raise HttpError( + 400, + "ratingCurveUrl must reference an existing thing rating curve attachment in the task workspace", + ) + + normalized_transformations.append(normalized) + + path["data_transformations"] = normalized_transformations + @staticmethod def update_mapping(task: Task, mapping_data: List[dict] | None = None): if mapping_data is None: return task + TaskService._validate_rating_curve_transformation_references( + workspace_id=task.workspace_id, mapping_data=mapping_data + ) + task.mappings.all().delete() for mapping in mapping_data: @@ -538,7 +664,7 @@ def update_mapping(task: Task, mapping_data: List[dict] | None = None): TaskMappingPath.objects.create( task_mapping=task_mapping, target_identifier=path["target_identifier"], - data_transformations=path.get("data_transformations", {}), + data_transformations=path.get("data_transformations", []), ) return task diff --git a/domains/etl/tasks.py b/domains/etl/tasks.py index 2de705ac..cddbc466 100644 --- a/domains/etl/tasks.py +++ b/domains/etl/tasks.py @@ -1,67 +1,464 @@ import logging -import pandas as pd +from contextlib import contextmanager +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone as dt_timezone +import os +from typing import Any, Optional from uuid import UUID -from datetime import timedelta +import pandas as pd from celery import shared_task -from pydantic import TypeAdapter +from pydantic import TypeAdapter, ValidationError from celery.signals import task_prerun, task_success, task_failure, task_postrun from django.utils import timezone from django.db.utils import IntegrityError from django.core.management import call_command from domains.etl.models import Task, TaskRun -from .loader import HydroServerInternalLoader +from .loader import 
HydroServerInternalLoader, LoadSummary +from .etl_errors import ( + EtlUserFacingError, + user_facing_error_from_exception, + user_facing_error_from_validation_error, +) +from .run_result_normalizer import normalize_task_run_result, task_transformer_raw from hydroserverpy.etl.factories import extractor_factory, transformer_factory -from hydroserverpy.etl.etl_configuration import ExtractorConfig, TransformerConfig, SourceTargetMapping, MappingPath +from hydroserverpy.etl.etl_configuration import ( + ExtractorConfig, + TransformerConfig, + SourceTargetMapping, + MappingPath, +) + + +@dataclass +class TaskRunContext: + stage: str = "setup" + runtime_source_uri: Optional[str] = None + log_handler: Optional["TaskLogHandler"] = None + task_meta: dict[str, Any] = field(default_factory=dict) + + +class TaskLogFilter(logging.Filter): + def filter(self, record: logging.LogRecord) -> bool: + path = (record.pathname or "").replace("\\", "/") + return "/hydroserverpy/" in path or "/domains/etl/" in path + + +class TaskLogHandler(logging.Handler): + def __init__(self, context: TaskRunContext): + super().__init__(level=logging.INFO) + self.context = context + self.lines: list[str] = [] + self.entries: list[dict[str, Any]] = [] + self._formatter = logging.Formatter() + + def emit(self, record: logging.LogRecord) -> None: + if not self.filter(record): + return + + message = record.getMessage() + + timestamp = datetime.fromtimestamp( + record.created, tz=dt_timezone.utc + ).isoformat() + line = f"{timestamp} {record.levelname:<8} {message}" + if record.exc_info: + line = f"{line}\n{self._formatter.formatException(record.exc_info)}" + self.lines.append(line) + + entry: dict[str, Any] = { + "timestamp": timestamp, + "level": record.levelname, + "logger": record.name, + "message": message, + "pathname": record.pathname, + "lineno": record.lineno, + } + if record.exc_info: + entry["exception"] = self._formatter.formatException(record.exc_info) + self.entries.append(entry) + + 
self._capture_runtime_uri(message) + + def _capture_runtime_uri(self, message: str) -> None: + if self.context.runtime_source_uri: + return + if "Resolved runtime source URI:" in message: + self.context.runtime_source_uri = message.split( + "Resolved runtime source URI:", 1 + )[1].strip() + return + if "Requesting data from" in message: + if "→" in message: + self.context.runtime_source_uri = message.split("→", 1)[1].strip() + return + if "from" in message: + self.context.runtime_source_uri = message.split("from", 1)[1].strip() + + def as_text(self) -> str: + return "\n".join(self.lines).strip() + + +TASK_RUN_CONTEXT: dict[str, TaskRunContext] = {} + + +@contextmanager +def capture_task_logs(context: TaskRunContext): + logger = logging.getLogger() + handler = TaskLogHandler(context) + handler.addFilter(TaskLogFilter()) + context.log_handler = handler + + previous_level = logger.level + if previous_level > logging.INFO: + logger.setLevel(logging.INFO) + + logger.addHandler(handler) + try: + yield handler + finally: + logger.removeHandler(handler) + if previous_level > logging.INFO: + logger.setLevel(previous_level) + + +def _is_empty(data: Any) -> bool: + if data is None: + return True + if isinstance(data, pd.DataFrame) and data.empty: + return True + return False + + +def _describe_payload(data: Any) -> dict[str, Any]: + if isinstance(data, pd.DataFrame): + return { + "type": "DataFrame", + "rows": len(data), + "columns": len(data.columns), + } + info: dict[str, Any] = {"type": type(data).__name__} + if isinstance(data, (bytes, bytearray)): + info["bytes"] = len(data) + return info + # BytesIO and similar + try: + buf = getattr(data, "getbuffer", None) + if callable(buf): + info["bytes"] = len(data.getbuffer()) + return info + except Exception: + pass + # Real file handles + try: + fileno = getattr(data, "fileno", None) + if callable(fileno): + info["bytes"] = os.fstat(data.fileno()).st_size + return info + except Exception: + pass + return info + + +def 
_describe_transformed_data(data: Any) -> dict[str, Any]: + if not isinstance(data, pd.DataFrame): + return {"type": type(data).__name__} + datastreams = [col for col in data.columns if col != "timestamp"] + return { + "type": "DataFrame", + "rows": len(data), + "columns": len(data.columns), + "datastreams": len(datastreams), + } + +def _success_message(load: Optional[LoadSummary]) -> str: + if not load: + return "Load complete." + + loaded = load.observations_loaded + if loaded == 0: + if load.timestamps_total and load.timestamps_after_cutoff == 0: + return "Already up to date. No new observations were loaded." + # Otherwise, we don't have strong evidence for why nothing loaded beyond "no new observations". + return "No new observations were loaded." + + ds_count = load.datastreams_loaded or 0 + preposition = "into" if ds_count == 1 else "across" + ds_word = "datastream" if ds_count == 1 else "datastreams" + return f"Loaded {loaded} total observations {preposition} {ds_count} {ds_word}." 
+ + +def _apply_runtime_uri_aliases(result: dict[str, Any], runtime_source_uri: str) -> None: + result.setdefault("runtime_source_uri", runtime_source_uri) + result.setdefault("runtimeSourceUri", runtime_source_uri) + result.setdefault("runtime_url", runtime_source_uri) + result.setdefault("runtimeUrl", runtime_source_uri) + + +def _apply_log_aliases(result: dict[str, Any]) -> None: + if "log_entries" in result and "logEntries" not in result: + result["logEntries"] = result["log_entries"] + + +def _merge_result_with_context( + result: dict[str, Any], context: Optional[TaskRunContext] +) -> dict[str, Any]: + if "summary" not in result and "message" in result: + result["summary"] = result["message"] + + if context: + if context.runtime_source_uri and not ( + result.get("runtime_source_uri") + or result.get("runtimeSourceUri") + or result.get("runtime_url") + or result.get("runtimeUrl") + ): + _apply_runtime_uri_aliases(result, context.runtime_source_uri) + + if context.log_handler: + if "logs" not in result: + logs_text = context.log_handler.as_text() + if logs_text: + result["logs"] = logs_text + if "log_entries" not in result and context.log_handler.entries: + result["log_entries"] = context.log_handler.entries + + _apply_log_aliases(result) + return result + + +def _build_task_result( + message: str, + context: Optional[TaskRunContext] = None, + *, + stage: Optional[str] = None, + traceback: Optional[str] = None, +) -> dict[str, Any]: + result: dict[str, Any] = {"message": message, "summary": message} + if stage: + result["stage"] = stage + if traceback: + result["traceback"] = traceback + + if context and context.runtime_source_uri: + _apply_runtime_uri_aliases(result, context.runtime_source_uri) + + if context and context.task_meta and "task" not in result: + result["task"] = context.task_meta + + if context and context.log_handler: + logs_text = context.log_handler.as_text() + if logs_text: + result["logs"] = logs_text + if context.log_handler.entries: + 
result["log_entries"] = context.log_handler.entries + + _apply_log_aliases(result) + return result + + +def _last_logged_error(context: Optional[TaskRunContext]) -> Optional[str]: + if not context or not context.log_handler or not context.log_handler.entries: + return None + for entry in reversed(context.log_handler.entries): + if entry.get("level") == "ERROR": + msg = entry.get("message") + if msg: + return msg + return None + + +def _mapped_csv_error_from_log(last_err: str) -> Optional[str]: + prefix = "Error reading CSV data:" + if not last_err.startswith(prefix): + return None + + detail = last_err[len(prefix) :].strip() + if detail == "One or more configured CSV columns were not found in the header row.": + return ( + "Configured CSV columns were not found in the file header. " + "This often means the delimiter or headerRow setting is incorrect. " + "Verify the delimiter and headerRow settings, then run the job again." + ) + if ( + detail + == "The header row contained unexpected values and could not be processed." + ): + return ( + "A required column was not found in the file header. " + "The source file may have changed or the header row may be set incorrectly. " + "Confirm the file layout and update the column mappings if needed." + ) + if ( + detail + == "One or more data rows contained unexpected values and could not be processed." + ): + return ( + "A required column was not found in the file header. " + "The source file may have changed or the header row may be set incorrectly. " + "Confirm the file layout and update the column mappings if needed." 
+ ) + return None + + +def _validate_component_config( + component: str, adapter: TypeAdapter, raw: dict[str, Any] +): + try: + return adapter.validate_python(raw) + except ValidationError as ve: + raise user_facing_error_from_validation_error(component, ve, raw=raw) from ve -@shared_task(bind=True, expires=10) + +@shared_task(bind=True, expires=10, name="etl.tasks.run_etl_task") def run_etl_task(self, task_id: str): """ Runs a HydroServer ETL task based on the task configuration provided. """ - task = Task.objects.select_related( - "data_connection" - ).prefetch_related( - "mappings", "mappings__paths" - ).get(pk=UUID(task_id)) - - extractor_cls = extractor_factory(TypeAdapter(ExtractorConfig).validate_python({ - "type": task.data_connection.extractor_type, - **task.data_connection.extractor_settings - })) - transformer_cls = transformer_factory(TypeAdapter(TransformerConfig).validate_python({ - "type": task.data_connection.transformer_type, - **task.data_connection.transformer_settings - })) - loader_cls = HydroServerInternalLoader(task) - - task_mappings = [ - SourceTargetMapping( - source_identifier=task_mapping.source_identifier, - paths=[ - MappingPath( - target_identifier=task_mapping_path.target_identifier, - data_transformations=task_mapping_path.data_transformations, - ) for task_mapping_path in task_mapping.paths.all() + task_run_id = self.request.id + context = TaskRunContext() + TASK_RUN_CONTEXT[task_run_id] = context + + with capture_task_logs(context): + try: + task = ( + Task.objects.select_related("data_connection") + .prefetch_related("mappings", "mappings__paths") + .get(pk=UUID(task_id)) + ) + + context.task_meta = { + "id": str(task.id), + "name": task.name, + "data_connection_id": str(task.data_connection_id), + "data_connection_name": task.data_connection.name, + } + + context.stage = "setup" + extractor_raw = { + "type": task.data_connection.extractor_type, + **(task.data_connection.extractor_settings or {}), + } + transformer_raw = { + 
"type": task.data_connection.transformer_type, + **(task.data_connection.transformer_settings or {}), + } + + timestamp_cfg = transformer_raw.get("timestamp") or {} + if isinstance(timestamp_cfg, dict): + tz_mode = timestamp_cfg.get("timezoneMode") + tz_value = timestamp_cfg.get("timezone") + if tz_mode == "daylightSavings" and not tz_value: + raise EtlUserFacingError( + "Timezone information is required when daylight savings mode is enabled. " + "Select a valid timezone such as America/Denver and try again." + ) + + extractor_cfg = _validate_component_config( + "extractor", TypeAdapter(ExtractorConfig), extractor_raw + ) + transformer_cfg = _validate_component_config( + "transformer", TypeAdapter(TransformerConfig), transformer_raw + ) + + extractor_cls = extractor_factory(extractor_cfg) + transformer_cls = transformer_factory(transformer_cfg) + loader_cls = HydroServerInternalLoader(task) + + task_mappings = [ + SourceTargetMapping( + source_identifier=task_mapping.source_identifier, + paths=[ + MappingPath( + target_identifier=task_mapping_path.target_identifier, + data_transformations=task_mapping_path.data_transformations, + ) + for task_mapping_path in task_mapping.paths.all() + ], + ) + for task_mapping in task.mappings.all() ] - ) for task_mapping in task.mappings.all() - ] - - logging.info("Starting extract") - data = extractor_cls.extract(task, loader_cls) - if data is None or (isinstance(data, pd.DataFrame) and data.empty): - return {"message": f"No data returned from the extractor for task: {str(task.id)}"} - - logging.info("Starting transform") - data = transformer_cls.transform(data, task_mappings) - if data is None or (isinstance(data, pd.DataFrame) and data.empty): - return {"message": f"No data returned from the transformer for task: {str(task.id)}"} - logging.info("Starting load") - loader_cls.load(data, task) - - return {"message": f"Finished processing task: {str(task.id)}"} + context.stage = "extract" + logging.info("Starting extract") + data = 
extractor_cls.extract(task, loader_cls) + context.runtime_source_uri = ( + getattr(extractor_cls, "runtime_source_uri", None) + or context.runtime_source_uri + ) + extract_summary = _describe_payload(data) + logging.info("Extractor returned payload: %s", extract_summary) + if _is_empty(data): + if task.data_connection.extractor_type == "HTTP": + return _build_task_result( + "No observations were returned from the source system. " + "Confirm the configured source system has observations available for the requested time range.", + context, + stage=context.stage, + ) + return _build_task_result( + "The extractor returned no observations. Nothing to load.", + context, + stage=context.stage, + ) + + context.stage = "transform" + logging.info("Starting transform") + data = transformer_cls.transform(data, task_mappings) + transform_summary = _describe_transformed_data(data) + logging.info("Transform result: %s", transform_summary) + if isinstance(data, pd.DataFrame) and "timestamp" in data.columns: + bad = data["timestamp"].isna().sum() + if bad: + raise EtlUserFacingError( + "One or more timestamps could not be read using the current format and timezone settings. " + "Confirm how dates appear in the source file and update the transformer configuration to match." + ) + if _is_empty(data): + # hydroserverpy's CSVTransformer returns None on read errors (but logs ERROR). + # Treat that as a failure to avoid misleading "produced no rows" messaging. + last_err = _last_logged_error(context) + if last_err and last_err.startswith("Error reading CSV data:"): + mapped_csv_error = _mapped_csv_error_from_log(last_err) + if mapped_csv_error: + raise EtlUserFacingError(mapped_csv_error) + raise EtlUserFacingError( + f"{last_err}. Check delimiter/headerRow/dataStartRow/identifierType settings " + "and confirm the upstream CSV columns match your task mappings." + ) + return _build_task_result( + "Transform produced no rows. 
Nothing to load.", + context, + stage=context.stage, + ) + + context.stage = "load" + logging.info("Starting load") + load_summary = loader_cls.load(data, task) + logging.info( + "Load result: loaded=%s available=%s cutoff=%s", + getattr(load_summary, "observations_loaded", None), + getattr(load_summary, "observations_available", None), + getattr(load_summary, "cutoff", None), + ) + + return _build_task_result( + _success_message(load_summary), + context, + stage=context.stage, + ) + except Exception as e: + mapped = user_facing_error_from_exception( + e, transformer_raw=locals().get("transformer_raw") + ) + if mapped: + logging.error("%s", str(mapped)) + if mapped is e: + raise + raise mapped from e + logging.exception("ETL task failed during %s", context.stage) + raise @task_prerun.connect @@ -90,9 +487,7 @@ def update_next_run(sender, task_id, kwargs, **extra): return try: - task = Task.objects.select_related("periodic_task").get( - pk=kwargs["task_id"] - ) + task = Task.objects.select_related("periodic_task").get(pk=kwargs["task_id"]) except Task.DoesNotExist: return @@ -119,11 +514,27 @@ def mark_etl_task_success(sender, result, **extra): if sender != run_etl_task: return + context = TASK_RUN_CONTEXT.pop(sender.request.id, None) + try: task_run = TaskRun.objects.get(id=sender.request.id) except TaskRun.DoesNotExist: return + if not isinstance(result, dict): + result = {"message": str(result)} + + result = _merge_result_with_context(result, context) + if context and context.stage and "stage" not in result: + result["stage"] = context.stage + + transformer_raw = task_transformer_raw(task_run.task) + result = normalize_task_run_result( + status="SUCCESS", + result=result, + transformer_raw=transformer_raw, + ) + task_run.status = "SUCCESS" task_run.finished_at = timezone.now() task_run.result = result @@ -140,22 +551,42 @@ def mark_etl_task_failure(sender, task_id, einfo, exception, **extra): if sender != run_etl_task: return + context = 
TASK_RUN_CONTEXT.pop(task_id, None) + try: task_run = TaskRun.objects.get(id=task_id) except TaskRun.DoesNotExist: return + stage = context.stage if context else None + mapped = user_facing_error_from_exception(exception) + if mapped: + # User-facing errors are already stage-aware and readable; don't prepend robotic prefixes. + message = str(mapped) + else: + if stage and stage.lower() == "setup": + message = f"Setup failed: {exception}" + else: + message = f"Failed during {stage}: {exception}" if stage else f"{exception}" + task_run.status = "FAILURE" task_run.finished_at = timezone.now() - task_run.result = { - "error": str(exception), - "traceback": einfo.traceback, - } + transformer_raw = task_transformer_raw(task_run.task) + task_run.result = normalize_task_run_result( + status="FAILURE", + result=_build_task_result( + message, + context, + stage=stage, + traceback=einfo.traceback, + ), + transformer_raw=transformer_raw, + ) task_run.save(update_fields=["status", "finished_at", "result"]) -@shared_task(bind=True, expires=10) +@shared_task(bind=True, expires=10, name="etl.tasks.cleanup_etl_task_runs") def cleanup_etl_task_runs(self, days=14): """ Celery task to run the cleanup_etl_task_runs management command. 
diff --git a/domains/sta/migrations/0006_thingfileattachment_rating_curve_fields.py b/domains/sta/migrations/0006_thingfileattachment_rating_curve_fields.py new file mode 100644 index 00000000..83fd8543 --- /dev/null +++ b/domains/sta/migrations/0006_thingfileattachment_rating_curve_fields.py @@ -0,0 +1,24 @@ +# Generated by Django 5.2.5 on 2026-02-13 + +import uuid +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("sta", "0005_remove_datastream_data_archives_and_more"), + ] + + operations = [ + migrations.AddField( + model_name="thingfileattachment", + name="description", + field=models.TextField(blank=True, default=""), + ), + migrations.AddField( + model_name="thingfileattachment", + name="download_token", + field=models.UUIDField(db_index=True, default=uuid.uuid4, editable=False), + ), + ] diff --git a/domains/sta/models/thing.py b/domains/sta/models/thing.py index 423742d9..0bc3bc38 100644 --- a/domains/sta/models/thing.py +++ b/domains/sta/models/thing.py @@ -1,3 +1,4 @@ +import uuid import uuid6 import typing from typing import Literal, Optional, Union @@ -195,25 +196,35 @@ class ThingFileAttachment(models.Model, PermissionChecker): Thing, related_name="thing_file_attachments", on_delete=models.DO_NOTHING ) name = models.CharField(max_length=255) + description = models.TextField(default="", blank=True) file_attachment = models.FileField(upload_to=thing_file_attachment_storage_path) file_attachment_type = models.CharField(max_length=200) + download_token = models.UUIDField(default=uuid.uuid4, editable=False, db_index=True) def __str__(self): return f"{self.name} - {self.id}" @property def link(self): - storage = self.file_attachment.storage + if self.file_attachment_type != "rating_curve": + storage = self.file_attachment.storage - try: - file_attachment_link = storage.url(self.file_attachment.name, expire=3600) - except TypeError: - file_attachment_link = storage.url(self.file_attachment.name) + 
try: + file_attachment_link = storage.url(self.file_attachment.name, expire=3600) + except TypeError: + file_attachment_link = storage.url(self.file_attachment.name) - if settings.DEPLOYMENT_BACKEND == "local": - file_attachment_link = settings.PROXY_BASE_URL + file_attachment_link + if settings.DEPLOYMENT_BACKEND == "local": + file_attachment_link = settings.PROXY_BASE_URL + file_attachment_link - return file_attachment_link + return file_attachment_link + + base = getattr(settings, "PROXY_BASE_URL", "").rstrip("/") + path = ( + f"/api/data/things/{self.thing_id}/file-attachments/{self.id}/download" + ) + token_qs = f"?token={self.download_token}" + return f"{base}{path}{token_qs}" if base else f"{path}{token_qs}" class Meta: unique_together = ("thing", "name") diff --git a/domains/sta/services/thing.py b/domains/sta/services/thing.py index cddb68d2..4e22d8c0 100644 --- a/domains/sta/services/thing.py +++ b/domains/sta/services/thing.py @@ -1,5 +1,5 @@ import uuid -from typing import Optional, Literal, get_args +from typing import Optional, Literal, List, get_args from ninja.errors import HttpError from django.http import HttpResponse from django.contrib.auth import get_user_model @@ -24,6 +24,7 @@ TagPostBody, TagDeleteBody, FileAttachmentDeleteBody, + FileAttachmentPatchBody, ) from interfaces.api.schemas.thing import ThingFields, LocationFields, ThingOrderByFields from interfaces.api.service import ServiceUtils @@ -353,25 +354,127 @@ def remove_tag(self, principal: User | APIKey, uid: uuid.UUID, data: TagDeleteBo return f"{deleted_count} tag(s) deleted" - def get_file_attachments(self, principal: Optional[User | APIKey], uid: uuid.UUID): + @staticmethod + def _normalize_attachment_type(attachment_type: str) -> str: + normalized = (attachment_type or "").strip() + if not normalized: + raise HttpError(400, "File attachment type is required") + return normalized + + @staticmethod + def _normalize_name(name: str) -> str: + normalized = (name or "").strip() + if not 
normalized: + raise HttpError(400, "File attachment name is required") + return normalized + + def get_file_attachments( + self, + principal: Optional[User | APIKey], + uid: uuid.UUID, + attachment_types: Optional[List[str]] = None, + ): thing = self.get_thing_for_action(principal=principal, uid=uid, action="view") + queryset = thing.thing_file_attachments.all() + + if attachment_types: + normalized_types = [ + self._normalize_attachment_type(item) for item in attachment_types if item + ] + if normalized_types: + queryset = queryset.filter(file_attachment_type__in=normalized_types) + + return queryset.order_by("file_attachment_type", "name") + + def get_file_attachment_for_action( + self, + principal: Optional[User | APIKey], + uid: uuid.UUID, + attachment_id: int, + action: Literal["view", "edit", "delete"], + ) -> ThingFileAttachment: + thing = self.get_thing_for_action(principal=principal, uid=uid, action=action) - return thing.thing_file_attachments.all() + try: + return ThingFileAttachment.objects.get(pk=attachment_id, thing=thing) + except ThingFileAttachment.DoesNotExist: + raise HttpError(404, "File attachment does not exist") def add_file_attachment( - self, principal: User | APIKey, uid: uuid.UUID, file, file_attachment_type: str + self, + principal: User | APIKey, + uid: uuid.UUID, + file, + file_attachment_type: str, + name: Optional[str] = None, + description: Optional[str] = None, ): thing = self.get_thing_for_action(principal=principal, uid=uid, action="edit") + normalized_name = self._normalize_name(name or file.name) + normalized_type = self._normalize_attachment_type(file_attachment_type) + normalized_description = (description or "").strip() - if ThingFileAttachment.objects.filter(thing=thing, name=file.name).exists(): + if ThingFileAttachment.objects.filter(thing=thing, name=normalized_name).exists(): raise HttpError(400, "File attachment already exists") return ThingFileAttachment.objects.create( thing=thing, - name=file.name, + 
name=normalized_name, + description=normalized_description, file_attachment=file, - file_attachment_type=file_attachment_type, + file_attachment_type=normalized_type, + ) + + def update_file_attachment( + self, + principal: User | APIKey, + uid: uuid.UUID, + attachment_id: int, + data: FileAttachmentPatchBody, + ) -> ThingFileAttachment: + file_attachment = self.get_file_attachment_for_action( + principal=principal, uid=uid, attachment_id=attachment_id, action="edit" ) + body = data.dict(exclude_unset=True) + + if "name" in body: + new_name = self._normalize_name(body["name"]) + if ( + new_name != file_attachment.name + and ThingFileAttachment.objects.filter( + thing_id=file_attachment.thing_id, + name=new_name, + ).exists() + ): + raise HttpError(400, "File attachment already exists") + file_attachment.name = new_name + + if "description" in body: + file_attachment.description = (body["description"] or "").strip() + + file_attachment.save() + return file_attachment + + def replace_file_attachment( + self, + principal: User | APIKey, + uid: uuid.UUID, + attachment_id: int, + file, + ) -> ThingFileAttachment: + file_attachment = self.get_file_attachment_for_action( + principal=principal, uid=uid, attachment_id=attachment_id, action="edit" + ) + + old_name = file_attachment.file_attachment.name + file_attachment.file_attachment = file + file_attachment.save() + + new_name = file_attachment.file_attachment.name + if old_name and old_name != new_name: + file_attachment.file_attachment.storage.delete(old_name) + + return file_attachment def remove_file_attachment( self, principal: User | APIKey, uid: uuid.UUID, data: FileAttachmentDeleteBody @@ -379,9 +482,7 @@ def remove_file_attachment( thing = self.get_thing_for_action(principal=principal, uid=uid, action="edit") try: - file_attachment = ThingFileAttachment.objects.get( - thing=thing, name=data.name - ) + file_attachment = ThingFileAttachment.objects.get(thing=thing, name=data.name) except 
ThingFileAttachment.DoesNotExist: raise HttpError(404, "File attachment does not exist") @@ -390,6 +491,37 @@ def remove_file_attachment( return "File attachment deleted" + def delete_file_attachment( + self, principal: User | APIKey, uid: uuid.UUID, attachment_id: int + ): + file_attachment = self.get_file_attachment_for_action( + principal=principal, uid=uid, attachment_id=attachment_id, action="edit" + ) + file_attachment.file_attachment.delete(save=False) + file_attachment.delete() + + return "File attachment deleted" + + def get_file_attachment_for_download( + self, + principal: Optional[User | APIKey], + uid: uuid.UUID, + attachment_id: int, + token: Optional[str] = None, + ) -> ThingFileAttachment: + try: + file_attachment = ThingFileAttachment.objects.get( + pk=attachment_id, thing_id=uid + ) + except ThingFileAttachment.DoesNotExist: + raise HttpError(404, "File attachment does not exist") + + if token and token == str(file_attachment.download_token): + return file_attachment + + self.get_thing_for_action(principal=principal, uid=uid, action="view") + return file_attachment + def list_site_types( self, response: HttpResponse, diff --git a/hydroserver/settings.py b/hydroserver/settings.py index feaada43..1e8e3e00 100644 --- a/hydroserver/settings.py +++ b/hydroserver/settings.py @@ -349,11 +349,11 @@ STORAGES = { "default": { "BACKEND": "django.core.files.storage.FileSystemStorage", - "OPTIONS": {"location": "media"}, + "OPTIONS": {"location": str(BASE_DIR / "media")}, }, "staticfiles": { "BACKEND": "django.contrib.staticfiles.storage.StaticFilesStorage", - "OPTIONS": {"location": "static"}, + "OPTIONS": {"location": str(BASE_DIR / "static")}, }, } diff --git a/hydroserver/urls.py b/hydroserver/urls.py index 0f3fa616..5b129b3a 100644 --- a/hydroserver/urls.py +++ b/hydroserver/urls.py @@ -2,6 +2,7 @@ from django.conf.urls.static import static from django.contrib import admin from django.urls import path, re_path, include +from django.views.static import 
serve from interfaces.web.views import index @@ -24,3 +25,19 @@ settings.MEDIA_URL, document_root=settings.STORAGES["default"]["OPTIONS"]["location"], ) + +# In local/dev environments we want file attachments to remain accessible from the +# Django process even when DEBUG is false. +if settings.DEPLOYMENT_BACKEND in {"dev", "local"} and not settings.DEBUG: + urlpatterns += [ + re_path( + r"^media/(?P<path>.*)$", + serve, + {"document_root": settings.STORAGES["default"]["OPTIONS"]["location"]}, + ), + re_path( + r"^static/(?P<path>.*)$", + serve, + {"document_root": settings.STORAGES["staticfiles"]["OPTIONS"]["location"]}, + ), + ] diff --git a/interfaces/api/schemas/__init__.py b/interfaces/api/schemas/__init__.py index e1fcd988..e7ae1e8a 100644 --- a/interfaces/api/schemas/__init__.py +++ b/interfaces/api/schemas/__init__.py @@ -91,10 +91,12 @@ ObservationBulkDeleteBody, ) from .attachment import ( + FileAttachmentQueryParameters, TagGetResponse, TagPostBody, TagDeleteBody, FileAttachmentDeleteBody, + FileAttachmentPatchBody, FileAttachmentGetResponse, ) diff --git a/interfaces/api/schemas/attachment.py b/interfaces/api/schemas/attachment.py index 43625323..9a9cc929 100644 --- a/interfaces/api/schemas/attachment.py +++ b/interfaces/api/schemas/attachment.py @@ -1,5 +1,15 @@ from typing import Optional -from interfaces.api.schemas import BaseGetResponse, BasePostBody +from ninja import Query +from interfaces.api.schemas import ( + BaseGetResponse, + BasePostBody, + BasePatchBody, + CollectionQueryParameters, +) + + +class FileAttachmentQueryParameters(CollectionQueryParameters): + type: list[str] = Query([], description="Filter by file attachment type.") class TagGetResponse(BaseGetResponse): @@ -18,10 +28,17 @@ class TagDeleteBody(BasePostBody): class FileAttachmentGetResponse(BaseGetResponse): + id: int name: str + description: str link: str file_attachment_type: str class FileAttachmentDeleteBody(BasePostBody): name: str + + +class FileAttachmentPatchBody(BasePatchBody): + 
name: Optional[str] = None + description: Optional[str] = None diff --git a/interfaces/api/schemas/task.py b/interfaces/api/schemas/task.py index 810c8945..5c61e067 100644 --- a/interfaces/api/schemas/task.py +++ b/interfaces/api/schemas/task.py @@ -126,7 +126,7 @@ class TaskSchedulePatchBody(BasePatchBody, TaskScheduleFields): class TaskMappingPathFields(Schema): target_identifier: str - data_transformations: dict[str, Any] | list[Any] = Field(default_factory=dict) + data_transformations: list[Any] = Field(default_factory=list) class TaskMappingPathResponse(BaseGetResponse, TaskMappingPathFields): diff --git a/interfaces/api/views/thing.py b/interfaces/api/views/thing.py index 0e75ed7e..148f2c18 100644 --- a/interfaces/api/views/thing.py +++ b/interfaces/api/views/thing.py @@ -3,7 +3,7 @@ from ninja import Router, Path, Query, File, Form from ninja.files import UploadedFile from django.db import transaction -from django.http import HttpResponse +from django.http import HttpResponse, FileResponse from interfaces.http.auth import bearer_auth, session_auth, apikey_auth, anonymous_auth from interfaces.http.request import HydroServerHttpRequest from interfaces.api.schemas import VocabularyQueryParameters @@ -16,7 +16,9 @@ TagGetResponse, TagPostBody, TagDeleteBody, + FileAttachmentQueryParameters, FileAttachmentGetResponse, + FileAttachmentPatchBody, FileAttachmentDeleteBody, ) from domains.sta.services import ThingService @@ -343,7 +345,9 @@ def remove_thing_tag( by_alias=True, ) def get_thing_file_attachments( - request: HydroServerHttpRequest, thing_id: Path[uuid.UUID] + request: HydroServerHttpRequest, + thing_id: Path[uuid.UUID], + query: Query[FileAttachmentQueryParameters], ): """ Get all file attachments associated with a Thing. 
@@ -352,6 +356,7 @@ def get_thing_file_attachments( return 200, thing_service.get_file_attachments( principal=request.principal, uid=thing_id, + attachment_types=query.type, ) @@ -372,6 +377,8 @@ def add_thing_file_attachment( request: HydroServerHttpRequest, thing_id: Path[uuid.UUID], file_attachment_type: str = Form(...), + name: Optional[str] = Form(None), + description: Optional[str] = Form(None), file: UploadedFile = File(...), ): """ @@ -383,6 +390,141 @@ def add_thing_file_attachment( uid=thing_id, file=file, file_attachment_type=file_attachment_type, + name=name, + description=description, + ) + + +@thing_router.patch( + "/{thing_id}/file-attachments/{attachment_id}", + auth=[session_auth, bearer_auth, apikey_auth], + response={ + 200: FileAttachmentGetResponse, + 400: str, + 401: str, + 403: str, + 404: str, + 422: str, + }, + by_alias=True, +) +def update_thing_file_attachment( + request: HydroServerHttpRequest, + thing_id: Path[uuid.UUID], + attachment_id: Path[int], + data: FileAttachmentPatchBody, +): + """ + Update a thing file attachment's metadata. + """ + + return 200, thing_service.update_file_attachment( + principal=request.principal, + uid=thing_id, + attachment_id=attachment_id, + data=data, + ) + + +def _replace_thing_file_attachment( + request: HydroServerHttpRequest, + thing_id: Path[uuid.UUID], + attachment_id: Path[int], + file: UploadedFile = File(...), +): + """ + Replace the file content for an existing thing file attachment. 
+ """ + + return 200, thing_service.replace_file_attachment( + principal=request.principal, + uid=thing_id, + attachment_id=attachment_id, + file=file, + ) + + +@thing_router.put( + "/{thing_id}/file-attachments/{attachment_id}/file", + auth=[session_auth, bearer_auth, apikey_auth], + response={ + 200: FileAttachmentGetResponse, + 400: str, + 401: str, + 403: str, + 404: str, + 413: str, + 422: str, + }, + by_alias=True, +) +def replace_thing_file_attachment_put( + request: HydroServerHttpRequest, + thing_id: Path[uuid.UUID], + attachment_id: Path[int], + file: UploadedFile = File(...), +): + return _replace_thing_file_attachment( + request=request, + thing_id=thing_id, + attachment_id=attachment_id, + file=file, + ) + + +@thing_router.post( + "/{thing_id}/file-attachments/{attachment_id}/file", + auth=[session_auth, bearer_auth, apikey_auth], + response={ + 200: FileAttachmentGetResponse, + 400: str, + 401: str, + 403: str, + 404: str, + 413: str, + 422: str, + }, + by_alias=True, +) +def replace_thing_file_attachment_post( + request: HydroServerHttpRequest, + thing_id: Path[uuid.UUID], + attachment_id: Path[int], + file: UploadedFile = File(...), +): + return _replace_thing_file_attachment( + request=request, + thing_id=thing_id, + attachment_id=attachment_id, + file=file, + ) + + +@thing_router.delete( + "/{thing_id}/file-attachments/{attachment_id}", + auth=[session_auth, bearer_auth, apikey_auth], + response={ + 204: None, + 400: str, + 401: str, + 403: str, + 404: str, + }, + by_alias=True, +) +def delete_thing_file_attachment( + request: HydroServerHttpRequest, + thing_id: Path[uuid.UUID], + attachment_id: Path[int], +): + """ + Remove a file attachment from a thing by id. 
+ """ + + return 204, thing_service.delete_file_attachment( + principal=request.principal, + uid=thing_id, + attachment_id=attachment_id, ) @@ -412,3 +554,38 @@ def remove_thing_file_attachment( uid=thing_id, data=data, ) + + +@thing_router.get( + "/{thing_id}/file-attachments/{attachment_id}/download", + auth=[session_auth, bearer_auth, apikey_auth, anonymous_auth], + response={200: None, 401: str, 403: str, 404: str}, + by_alias=True, +) +def download_thing_file_attachment( + request: HydroServerHttpRequest, + thing_id: Path[uuid.UUID], + attachment_id: Path[int], + token: Optional[str] = None, +): + """ + Download a thing file attachment. + """ + + file_attachment = thing_service.get_file_attachment_for_download( + principal=request.principal, + uid=thing_id, + attachment_id=attachment_id, + token=token, + ) + + if not file_attachment.file_attachment: + return 404, "File attachment does not exist" + + file_obj = file_attachment.file_attachment.open("rb") + return FileResponse( + file_obj, + as_attachment=True, + filename=file_attachment.name, + content_type="application/octet-stream", + ) diff --git a/interfaces/sensorthings/schemas/sensor.py b/interfaces/sensorthings/schemas/sensor.py index e2fcd365..659937dc 100644 --- a/interfaces/sensorthings/schemas/sensor.py +++ b/interfaces/sensorthings/schemas/sensor.py @@ -24,6 +24,7 @@ class SensorModel(Schema): "application/pdf", "http://www.opengis.net/doc/IS/SensorML/2.0", "text/html", + "text/plain", "application/json", "text/plain", ] diff --git a/tests/etl/services/test_task_rating_curve_mapping.py b/tests/etl/services/test_task_rating_curve_mapping.py new file mode 100644 index 00000000..402781d0 --- /dev/null +++ b/tests/etl/services/test_task_rating_curve_mapping.py @@ -0,0 +1,132 @@ +import uuid + +import pytest +from django.core.files.uploadedfile import SimpleUploadedFile +from ninja.errors import HttpError + +from domains.sta.models import Thing, ThingFileAttachment +from domains.etl.services import 
TaskService +from interfaces.api.schemas import TaskPatchBody + + +task_service = TaskService() +TASK_ID = uuid.UUID("019adbc3-35e8-7f25-bc68-171fb66d446e") +WORKSPACE_ID = uuid.UUID("b27c51a0-7374-462d-8a53-d97d47176c10") + + +def create_workspace_thing() -> Thing: + return Thing.objects.create( + workspace_id=WORKSPACE_ID, + name="Test Thing", + description="", + sampling_feature_type="Site", + sampling_feature_code="TEST_THING", + site_type="Stream", + is_private=False, + ) + + +def create_thing_rating_curve( + thing: Thing, name: str = "Validation curve" +) -> ThingFileAttachment: + return ThingFileAttachment.objects.create( + thing=thing, + name=name, + description="", + file_attachment_type="rating_curve", + file_attachment=SimpleUploadedFile( + "validation_curve.csv", + b"input_value,output_value\n1.0,2.0\n2.0,4.0\n", + content_type="text/csv", + ), + ) + + +def test_update_task_accepts_thing_rating_curve_url(get_principal): + thing = create_workspace_thing() + rating_curve = create_thing_rating_curve(thing=thing) + rating_curve_url = rating_curve.link + + updated = task_service.update( + principal=get_principal("owner"), + uid=TASK_ID, + data=TaskPatchBody( + mappings=[ + { + "sourceIdentifier": "test_value", + "paths": [ + { + "targetIdentifier": "27c70b41-e845-40ea-8cc7-d1b40f89816b", + "dataTransformations": [ + { + "type": "rating_curve", + "ratingCurveUrl": rating_curve_url, + } + ], + } + ], + } + ] + ), + ) + + transform = updated["mappings"][0]["paths"][0]["data_transformations"][0] + assert transform["type"] == "rating_curve" + assert transform["ratingCurveUrl"] == rating_curve_url + + +def test_update_task_rejects_rating_curve_url_not_in_workspace_thing_attachments( + get_principal, +): + with pytest.raises(HttpError) as exc_info: + task_service.update( + principal=get_principal("owner"), + uid=TASK_ID, + data=TaskPatchBody( + mappings=[ + { + "sourceIdentifier": "test_value", + "paths": [ + { + "targetIdentifier": 
"27c70b41-e845-40ea-8cc7-d1b40f89816b", + "dataTransformations": [ + { + "type": "rating_curve", + "ratingCurveUrl": "https://example.com/not-allowed.csv", + } + ], + } + ], + } + ] + ), + ) + + assert exc_info.value.status_code == 400 + assert "thing rating curve attachment" in exc_info.value.message + + +def test_update_task_rejects_rating_curve_without_rating_curve_url(get_principal): + create_thing_rating_curve(thing=create_workspace_thing()) + + with pytest.raises(HttpError) as exc_info: + task_service.update( + principal=get_principal("owner"), + uid=TASK_ID, + data=TaskPatchBody( + mappings=[ + { + "sourceIdentifier": "test_value", + "paths": [ + { + "targetIdentifier": "27c70b41-e845-40ea-8cc7-d1b40f89816b", + "dataTransformations": [{"type": "rating_curve"}], + } + ], + } + ] + ), + ) + + assert exc_info.value.status_code == 400 + assert "ratingCurveUrl" in exc_info.value.message