Skip to content
71 changes: 64 additions & 7 deletions src/hydroserverpy/api/models/etl/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from datetime import datetime, timedelta, timezone
from pydantic import Field, AliasPath, AliasChoices, TypeAdapter
from hydroserverpy.etl.factories import extractor_factory, transformer_factory, loader_factory
from hydroserverpy.etl.loaders.hydroserver_loader import LoadSummary
from hydroserverpy.etl.etl_configuration import ExtractorConfig, TransformerConfig, LoaderConfig, SourceTargetMapping, MappingPath
from ..base import HydroServerBaseModel
from .orchestration_system import OrchestrationSystem
Expand Down Expand Up @@ -183,6 +184,7 @@ def run_local(self):

task_run = self.create_task_run(status="RUNNING", started_at=datetime.now(timezone.utc))

runtime_source_uri: Optional[str] = None
try:
logging.info("Starting extract")

Expand All @@ -199,25 +201,39 @@ def run_local(self):
]

data = extractor_cls.extract(self, loader_cls)
runtime_source_uri = getattr(extractor_cls, "runtime_source_uri", None)
if self.is_empty(data):
self._update_status(
loader_cls, True, "No data returned from the extractor"
task_run,
True,
"No data returned from the extractor",
runtime_source_uri=runtime_source_uri,
)
return

logging.info("Starting transform")
data = transformer_cls.transform(data, task_mappings)
if self.is_empty(data):
self._update_status(
loader_cls, True, "No data returned from the transformer"
task_run,
True,
"No data returned from the transformer",
runtime_source_uri=runtime_source_uri,
)
return

logging.info("Starting load")
loader_cls.load(data, self)
self._update_status(task_run, True, "OK")
load_summary = loader_cls.load(data, self)
self._update_status(
task_run,
True,
self._success_message(load_summary),
runtime_source_uri=runtime_source_uri,
)
except Exception as e:
self._update_status(task_run, False, str(e))
self._update_status(
task_run, False, str(e), runtime_source_uri=runtime_source_uri
)

@staticmethod
def is_empty(data):
Expand All @@ -229,15 +245,56 @@ def is_empty(data):

return False

def _update_status(self, task_run: TaskRun, success: bool, msg: str):
def _update_status(
    self,
    task_run: TaskRun,
    success: bool,
    msg: str,
    runtime_source_uri: Optional[str] = None,
):
    """Persist the outcome of a task run and reschedule this task.

    Updates the given run with a SUCCESS/FAILURE status and a result
    payload containing ``msg`` (plus the resolved source URI, when one
    is known), then recomputes ``next_run_at`` and saves the task.
    """
    payload = {"message": msg}
    if runtime_source_uri:
        # NOTE(review): the same value is published under four key
        # spellings - presumably for consumer compatibility; confirm
        # which one is canonical.
        for key in (
            "runtimeSourceUri",
            "runtime_source_uri",
            "runtimeUrl",
            "runtime_url",
        ):
            payload[key] = runtime_source_uri

    self.update_task_run(
        task_run.id,
        status="SUCCESS" if success else "FAILURE",
        result=payload,
    )
    self.next_run_at = self._next_run()
    self.save()

@staticmethod
def _success_message(load: Optional[LoadSummary]) -> str:
    """Render a load summary as the end-user message for a successful run.

    Falls back to "OK" when no summary is available.
    """
    if not load:
        return "OK"

    n_loaded = load.observations_loaded

    if n_loaded == 0:
        # Distinguish "everything was filtered out by the cutoff" from a
        # source that simply had nothing new to offer.
        if load.timestamps_total and load.timestamps_after_cutoff == 0:
            suffix = (
                f"(all timestamps were at or before {load.cutoff})."
                if load.cutoff
                else "(all timestamps were at or before the cutoff)."
            )
            return f"Already up to date - no new observations loaded {suffix}"
        if load.observations_available == 0:
            return "Already up to date - no new observations loaded."
        return "No new observations were loaded."

    if load.datastreams_loaded:
        return (
            f"Load completed successfully "
            f"({n_loaded} rows across {load.datastreams_loaded} datastreams)."
        )
    return f"Load completed successfully ({n_loaded} rows loaded)."

def _next_run(self) -> Optional[str]:
now = datetime.now(timezone.utc)
if cron := self.crontab:
Expand Down
29 changes: 16 additions & 13 deletions src/hydroserverpy/api/services/etl/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from typing import Literal, Union, Optional, List, Dict, Any, TYPE_CHECKING
from uuid import UUID
from datetime import datetime
from pydantic import Field
from hydroserverpy.api.models import DataConnection, Task, TaskRun, TaskMapping, OrchestrationSystem
from hydroserverpy.api.utils import normalize_uuid
from ..base import HydroServerBaseService
Expand Down Expand Up @@ -79,17 +78,17 @@ def create(
workspace: Union["Workspace", UUID, str],
data_connection: Union["DataConnection", UUID, str],
orchestration_system: Union["OrchestrationSystem", UUID, str],
extractor_variables: dict = Field(default_factory=dict),
transformer_variables: dict = Field(default_factory=dict),
loader_variables: dict = Field(default_factory=dict),
extractor_variables: Optional[dict] = None,
transformer_variables: Optional[dict] = None,
loader_variables: Optional[dict] = None,
paused: bool = False,
start_time: Optional[datetime] = None,
next_run_at: Optional[datetime] = None,
crontab: Optional[str] = None,
interval: Optional[int] = None,
interval_period: Optional[str] = None,
mappings: List[dict] = Field(default_factory=list),
uid: Optional[UUID] = None
mappings: Optional[List[dict]] = None,
uid: Optional[UUID] = None,
) -> "Task":
"""Create a new ETL task."""

Expand All @@ -99,19 +98,23 @@ def create(
"workspaceId": normalize_uuid(workspace),
"dataConnectionId": normalize_uuid(data_connection),
"orchestrationSystemId": normalize_uuid(orchestration_system),
"extractorVariables": extractor_variables,
"transformerVariables": transformer_variables,
"loaderVariables": loader_variables,
"schedule": {
"extractorVariables": extractor_variables or {},
"transformerVariables": transformer_variables or {},
"loaderVariables": loader_variables or {},
"mappings": mappings or [],
}

# Only include schedule if the caller provided scheduling information.
# Using Ellipsis here breaks JSON serialization.
if interval or crontab:
body["schedule"] = {
"paused": paused,
"startTime": start_time,
"nextRunAt": next_run_at,
"crontab": crontab,
"interval": interval,
"intervalPeriod": interval_period,
} if interval or crontab else ...,
"mappings": mappings if mappings else []
}
}

return super().create(**body)

Expand Down
2 changes: 1 addition & 1 deletion src/hydroserverpy/api/services/iam/workspace.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import json
from typing import TYPE_CHECKING, Union, List, Tuple, Optional
from typing import TYPE_CHECKING, Optional, Union, List, Tuple
from pydantic import EmailStr
from uuid import UUID
from datetime import datetime
Expand Down
65 changes: 65 additions & 0 deletions src/hydroserverpy/etl/STATES_README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
## Possible Needs Attention states:

These are the most important end-user messages the ETL system can return for a task run
that needs user action.

### Configuration / Setup

- Invalid extractor configuration. Tell the user exactly which field is missing or invalid.
- Invalid transformer configuration. Tell the user exactly which field is missing or invalid.
- A required configuration value is missing.
- A required configuration value is null where a value is expected.
- Missing required per-task extractor variable "<name>".
- Extractor source URI contains a placeholder "<name>", but it was not provided.
- Task configuration is missing required daylight savings offset (when using daylightSavings mode).

### Data Source (Connectivity / Authentication)

- Could not connect to the source system.
- The source system did not respond before the timeout.
- Authentication with the source system failed; credentials may be invalid or expired.
- The requested payload was not found on the source system.
- The source system returned no data.

### Source Data Did Not Match The Task

- The source returned a format different from what this job expects.
- The payload's expected fields were not found.
- One or more timestamps could not be read with the current settings.
- This job references a resource that no longer exists.
- The file structure does not match the configuration.

For CSV:
- The header row contained unexpected values and could not be processed.
- One or more data rows contained unexpected values and could not be processed.
- Timestamp column "<key>" was not found in the extracted data.
- A mapping source index is out of range for the extracted data.
- A mapping source column was not found in the extracted data.

For JSON:
- The timestamp or value key could not be found with the specified query.
- Transformer did not receive any extracted data to parse.

### Targets / HydroServer

- HydroServer rejected some or all of the data.
- The target data series (datastream) could not be found.
  - This may happen if the datastream was deleted or if the mapping points to the wrong target.

### Unexpected System Error

- An internal system error occurred while processing the job.
- The job stopped before completion.

## Possible OK states:

These are the most important end-user messages the ETL system can return for a successful run.

- Load completed successfully.
- Load completed successfully (<n> rows loaded).
- Load completed successfully (<n> rows across <m> datastreams).
- Already up to date - no new observations loaded.
- No new observations were loaded.
- Already up to date - no new observations loaded (all timestamps were at or before <cutoff>).
- No data returned from the extractor. Nothing to load.
- Transform produced no rows. Nothing to load.
31 changes: 24 additions & 7 deletions src/hydroserverpy/etl/etl_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Annotated, Dict, List, Literal, Optional, Union
from pydantic import BaseModel, Field, field_validator
from enum import Enum
from zoneinfo import ZoneInfo

WorkflowType = Literal["ETL", "Aggregation", "Virtual", "SDL"]
CSVDelimiterType = Literal[",", "|", "\t", ";", " "]
Expand Down Expand Up @@ -76,12 +77,28 @@ class Timestamp(BaseModel):

class Config:
populate_by_name = True
validate_default = True

@field_validator("timezone")
@field_validator("timezone", mode="after")
def check_timezone(cls, timezone_value, info):
    """Enforce mode-specific requirements on the `timezone` field.

    - fixedOffset: a value is mandatory (e.g. '-0700').
    - daylightSavings: a non-blank IANA zone name is mandatory and must
      resolve via ZoneInfo.
    """
    mode = info.data.get("timezone_mode")

    if mode == TimezoneMode.fixedOffset and timezone_value is None:
        raise ValueError(
            "`timezone` must be set when timezoneMode is fixedOffset (e.g. '-0700')"
        )

    if mode == TimezoneMode.daylightSavings:
        if timezone_value is None or not str(timezone_value).strip():
            raise ValueError(
                "Task configuration is missing required daylight savings offset (when using daylightSavings mode)."
            )
        # Resolve the zone now so a bad name surfaces here with a clear
        # message rather than as a cryptic ZoneInfo error downstream.
        try:
            ZoneInfo(str(timezone_value))
        except Exception:
            raise ValueError(
                f"Invalid timezone {timezone_value!r}. Use an IANA timezone like 'America/Denver'."
            )

    return timezone_value


Expand Down Expand Up @@ -177,15 +194,15 @@ class Config:
populate_by_name = True


class LookupTableDataTransformation(BaseModel):
type: Literal["lookup"]
lookup_table_id: str = Field(..., alias="lookupTableId")
class RatingCurveDataTransformation(BaseModel):
    """Configuration for a 'rating_curve' data transformation."""

    # Discriminator literal selecting this transformation variant.
    type: Literal["rating_curve"]
    # Location of the rating-curve resource; populated from the
    # `ratingCurveUrl` alias. NOTE(review): the format of the referenced
    # resource is not visible here - confirm against the transformer.
    rating_curve_url: str = Field(..., alias="ratingCurveUrl")

    class Config:
        populate_by_name = True


DataTransformation = Union[ExpressionDataTransformation, LookupTableDataTransformation]
# Union of the supported data-transformation configs; each variant is
# tagged by its `type` literal field.
DataTransformation = Union[ExpressionDataTransformation, RatingCurveDataTransformation]


class MappingPath(BaseModel):
Expand Down
Loading
Loading