Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
```python
from transformplan import TransformPlan, Col

# Build readable pipelines with 88 chainable operations
# Build readable pipelines with 89 chainable operations
plan = (
TransformPlan()
# Standardize column names
Expand Down
2 changes: 1 addition & 1 deletion docs/api/backends.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Backends

TransformPlan uses a pluggable backend system. Each backend implements the `Backend` ABC, providing all 88 operations plus meta methods for hashing, schema inspection, and type classification.
TransformPlan uses a pluggable backend system. Each backend implements the `Backend` ABC, providing all 89 operations plus meta methods for hashing, schema inspection, and type classification.

## Overview

Expand Down
2 changes: 2 additions & 0 deletions docs/api/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ All TransformPlan operations at a glance. Click method names for detailed docume
| [`math_log`](ops/math.md) | Logarithmic transform |
| [`math_sqrt`](ops/math.md) | Square root transform |
| [`math_power`](ops/math.md) | Power transform |
| [`math_diff_from_agg`](ops/math.md) | Difference from a group aggregate (min, mean, etc.) |
| [`math_diff_lag`](ops/math.md) | Row-to-row difference using lag (numeric or datetime) |
| [`math_winsorize`](ops/math.md) | Clip values to percentiles or bounds |

### Row Operations
Expand Down
24 changes: 24 additions & 0 deletions docs/api/ops/math.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ plan = (
- math_cumsum
- math_rank
- math_diff_from_agg
- math_diff_lag
- math_standardize
- math_minmax
- math_robust_scale
Expand Down Expand Up @@ -162,6 +163,29 @@ plan = TransformPlan().math_diff_from_agg(
agg="max",
new_column="diff_from_max",
)

# Row-to-row difference (lag)
plan = TransformPlan().math_diff_lag(
column="timestamp",
order_by="timestamp",
new_column="time_between",
group_by="patient_id",
)

# Numeric change ordered by date
plan = TransformPlan().math_diff_lag(
column="price",
order_by="date",
new_column="daily_change",
)

# Lag of 2 rows
plan = TransformPlan().math_diff_lag(
column="value",
order_by="seq",
new_column="diff_2",
lag=2,
)
```

### Scaling Operations
Expand Down
2 changes: 1 addition & 1 deletion docs/getting-started/quickstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ print(df_result)

## Using the DuckDB Backend

TransformPlan supports DuckDB as an alternative backend. All 88 operations, validation, and dry-run work identically — the same plan works with both Polars DataFrames and DuckDB relations. Simply pass the backend at execution time:
TransformPlan supports DuckDB as an alternative backend. All 89 operations, validation, and dry-run work identically — the same plan works with both Polars DataFrames and DuckDB relations. Simply pass the backend at execution time:

```python
import duckdb
Expand Down
2 changes: 1 addition & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ TransformPlan tracks transformation history, validates operations against DataFr
```python
from transformplan import TransformPlan, Col

# Build readable pipelines with 88 chainable operations
# Build readable pipelines with 89 chainable operations
plan = (
TransformPlan()
# Standardize column names
Expand Down
125 changes: 125 additions & 0 deletions tests/test_duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -1617,6 +1617,131 @@ def test_invalid_agg_raises(
)


class TestMathDiffLag:
    """Tests for math_diff_lag with DuckDB backend."""

    def test_numeric_basic(
        self, backend: DuckDBBackend, con: duckdb.DuckDBPyConnection
    ) -> None:
        """Lag=1 on integers ordered by id; first row is null."""
        rel = con.sql(
            "SELECT * FROM (VALUES (1, 10), (2, 30), (3, 35), (4, 50)) AS t(id, val)"
        )
        result, _ = (
            TransformPlan()
            .math_diff_lag("val", order_by="id", new_column="diff")
            .process(rel, backend=backend)
        )
        vals = _col_values(result, "diff")
        assert vals[0] is None
        assert vals[1:] == [20, 5, 15]

    def test_numeric_lag2(
        self, backend: DuckDBBackend, con: duckdb.DuckDBPyConnection
    ) -> None:
        """Lag=2; the first two rows have no predecessor and are null."""
        rel = con.sql(
            "SELECT * FROM (VALUES (1, 10), (2, 30), (3, 35), (4, 50)) AS t(id, val)"
        )
        result, _ = (
            TransformPlan()
            .math_diff_lag("val", order_by="id", new_column="diff", lag=2)
            .process(rel, backend=backend)
        )
        vals = _col_values(result, "diff")
        assert vals[0] is None
        assert vals[1] is None
        assert vals[2:] == [25, 20]

    def test_grouped_numeric(
        self, backend: DuckDBBackend, con: duckdb.DuckDBPyConnection
    ) -> None:
        """Partition by group; the null boundary restarts per group."""
        rel = con.sql(
            "SELECT * FROM (VALUES "
            "('A', 1, 10), ('A', 2, 30), ('A', 3, 35), "
            "('B', 1, 100), ('B', 2, 150), ('B', 3, 160)"
            ") AS t(grp, seq, val)"
        )
        # rows_sort makes the output order deterministic before asserting.
        result, _ = (
            TransformPlan()
            .math_diff_lag("val", order_by="seq", new_column="diff", group_by="grp")
            .rows_sort(["grp", "seq"])
            .process(rel, backend=backend)
        )
        vals = _col_values(result, "diff")
        assert vals == [None, 20, 5, None, 50, 10]

    def test_datetime_column(
        self, backend: DuckDBBackend, con: duckdb.DuckDBPyConnection
    ) -> None:
        """Timestamp input produces interval/timedelta output."""
        rel = con.sql(
            "SELECT * FROM (VALUES "
            "(1, TIMESTAMP '2024-01-01 00:00:00'), "
            "(2, TIMESTAMP '2024-01-01 01:00:00'), "
            "(3, TIMESTAMP '2024-01-01 03:00:00')"
            ") AS t(id, ts)"
        )
        result, _ = (
            TransformPlan()
            .math_diff_lag("ts", order_by="id", new_column="gap")
            .process(rel, backend=backend)
        )
        vals = _col_values(result, "gap")
        assert vals[0] is None
        assert vals[1].total_seconds() == 3600
        assert vals[2].total_seconds() == 7200

    def test_datetime_grouped(
        self, backend: DuckDBBackend, con: duckdb.DuckDBPyConnection
    ) -> None:
        """Primary use case: time between events per patient."""
        rel = con.sql(
            "SELECT * FROM (VALUES "
            "('A', TIMESTAMP '2024-01-01 00:00:00'), "
            "('A', TIMESTAMP '2024-01-01 02:00:00'), "
            "('B', TIMESTAMP '2024-01-01 10:00:00'), "
            "('B', TIMESTAMP '2024-01-01 13:00:00')"
            ") AS t(patient, ts)"
        )
        result, _ = (
            TransformPlan()
            .math_diff_lag("ts", order_by="ts", new_column="gap", group_by="patient")
            .rows_sort(["patient", "ts"])
            .process(rel, backend=backend)
        )
        vals = _col_values(result, "gap")
        assert vals[0] is None
        assert vals[1].total_seconds() / 3600 == 2.0
        assert vals[2] is None
        assert vals[3].total_seconds() / 3600 == 3.0

    def test_order_by_list(
        self, backend: DuckDBBackend, con: duckdb.DuckDBPyConnection
    ) -> None:
        """Multi-column order_by orders by the columns in sequence."""
        rel = con.sql(
            "SELECT * FROM (VALUES "
            "(1, 1, 10), (1, 2, 20), (2, 1, 30), (2, 2, 40)"
            ") AS t(a, b, val)"
        )
        result, _ = (
            TransformPlan()
            .math_diff_lag("val", order_by=["a", "b"], new_column="diff")
            .process(rel, backend=backend)
        )
        vals = _col_values(result, "diff")
        assert vals == [None, 10, 10, 10]

    def test_global_no_group(
        self, backend: DuckDBBackend, con: duckdb.DuckDBPyConnection
    ) -> None:
        """No group_by: the lag runs over the globally ordered rows."""
        rel = con.sql("SELECT * FROM (VALUES (3, 30), (1, 10), (2, 20)) AS t(seq, val)")
        result, _ = (
            TransformPlan()
            .math_diff_lag("val", order_by="seq", new_column="diff")
            .process(rel, backend=backend)
        )
        vals = _col_values(result, "diff")
        assert vals[0] is None
        assert vals[1:] == [10, 10]


class TestColExpr:
"""Tests for col_expr on DuckDB backend."""

Expand Down
164 changes: 164 additions & 0 deletions tests/test_math.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,3 +465,167 @@ def test_serialization_roundtrip(self, numeric_df: pl.DataFrame) -> None:
result1, _ = plan.process(numeric_df)
result2, _ = restored.process(numeric_df)
assert result1["diff"].to_list() == result2["diff"].to_list()


class TestMathDiffLag:
    """Unit tests covering the math_diff_lag operation on Polars frames."""

    def test_numeric_basic(self) -> None:
        """A lag of one over integers ordered by id yields a leading null."""
        frame = pl.DataFrame({"id": [1, 2, 3, 4], "val": [10, 30, 35, 50]})
        pipeline = TransformPlan().math_diff_lag(
            "val", order_by="id", new_column="diff"
        )
        out, _ = pipeline.process(frame)
        assert out["diff"].to_list() == [None, 20.0, 5.0, 15.0]

    def test_numeric_lag2(self) -> None:
        """With lag=2 the first two rows have no predecessor and are null."""
        frame = pl.DataFrame({"id": [1, 2, 3, 4], "val": [10, 30, 35, 50]})
        pipeline = TransformPlan().math_diff_lag(
            "val", order_by="id", new_column="diff", lag=2
        )
        out, _ = pipeline.process(frame)
        assert out["diff"].to_list() == [None, None, 25.0, 20.0]

    def test_grouped_numeric(self) -> None:
        """Partitioning by group restarts the null boundary in each group."""
        frame = pl.DataFrame(
            {
                "grp": ["A", "A", "A", "B", "B", "B"],
                "seq": [1, 2, 3, 1, 2, 3],
                "val": [10, 30, 35, 100, 150, 160],
            }
        )
        pipeline = TransformPlan().math_diff_lag(
            "val", order_by="seq", new_column="diff", group_by="grp"
        )
        out, _ = pipeline.process(frame)
        assert out["diff"].to_list() == [None, 20.0, 5.0, None, 50.0, 10.0]

    def test_datetime_column(self) -> None:
        """Differencing a datetime column yields a Duration column."""
        stamps = [
            datetime(2024, 1, 1, 0, 0),
            datetime(2024, 1, 1, 1, 0),
            datetime(2024, 1, 1, 3, 0),
        ]
        frame = pl.DataFrame({"id": [1, 2, 3], "ts": stamps})
        pipeline = TransformPlan().math_diff_lag("ts", order_by="id", new_column="gap")
        out, _ = pipeline.process(frame)
        assert out["gap"].dtype == pl.Duration
        gaps = out["gap"].to_list()
        assert gaps[0] is None
        assert gaps[1].total_seconds() == 3600
        assert gaps[2].total_seconds() == 7200

    def test_datetime_grouped(self) -> None:
        """Primary use case: elapsed time between events for each patient."""
        frame = pl.DataFrame(
            {
                "patient": ["A", "A", "B", "B"],
                "ts": [
                    datetime(2024, 1, 1, 0, 0),
                    datetime(2024, 1, 1, 2, 0),
                    datetime(2024, 1, 1, 10, 0),
                    datetime(2024, 1, 1, 13, 0),
                ],
            }
        )
        pipeline = TransformPlan().math_diff_lag(
            "ts", order_by="ts", new_column="gap", group_by="patient"
        )
        out, _ = pipeline.process(frame)
        assert out["gap"].dtype == pl.Duration
        gaps = out["gap"].to_list()
        assert gaps[0] is None
        assert gaps[1].total_seconds() / 3600 == 2.0
        assert gaps[2] is None
        assert gaps[3].total_seconds() / 3600 == 3.0

    def test_order_by_different_column(self) -> None:
        """The diffed column and the ordering column may differ."""
        frame = pl.DataFrame(
            {
                "ts": [
                    datetime(2024, 1, 1),
                    datetime(2024, 1, 2),
                    datetime(2024, 1, 3),
                ],
                "val": [100, 130, 125],
            }
        )
        pipeline = TransformPlan().math_diff_lag(
            "val", order_by="ts", new_column="change"
        )
        out, _ = pipeline.process(frame)
        assert out["change"].to_list() == [None, 30.0, -5.0]

    def test_order_by_list(self) -> None:
        """order_by accepts a list of columns for compound ordering."""
        frame = pl.DataFrame(
            {
                "a": [1, 1, 2, 2],
                "b": [1, 2, 1, 2],
                "val": [10, 20, 30, 40],
            }
        )
        pipeline = TransformPlan().math_diff_lag(
            "val", order_by=["a", "b"], new_column="diff"
        )
        out, _ = pipeline.process(frame)
        assert out["diff"].to_list() == [None, 10.0, 10.0, 10.0]

    def test_global_no_group(self) -> None:
        """Without group_by the lag runs over the globally ordered rows."""
        frame = pl.DataFrame({"seq": [3, 1, 2], "val": [30, 10, 20]})
        pipeline = TransformPlan().math_diff_lag(
            "val", order_by="seq", new_column="diff"
        )
        out, _ = pipeline.process(frame)
        # Ordered by seq the values are [10, 20, 30], so diffs are [None, 10, 10].
        assert out["diff"].to_list() == [None, 10.0, 10.0]

    def test_validation_nonexistent_column(self, numeric_df: pl.DataFrame) -> None:
        """A missing target column is reported by validate()."""
        pipeline = TransformPlan().math_diff_lag(
            "nonexistent", order_by="a", new_column="diff"
        )
        report = pipeline.validate(numeric_df)
        assert not report.is_valid
        assert "does not exist" in str(report.errors[0])

    def test_validation_wrong_type(self, basic_df: pl.DataFrame) -> None:
        """A string target column fails the numeric/datetime type check."""
        pipeline = TransformPlan().math_diff_lag(
            "name", order_by="id", new_column="diff"
        )
        report = pipeline.validate(basic_df)
        assert not report.is_valid
        assert "numeric or datetime" in str(report.errors[0])

    def test_validation_missing_order_by(self, numeric_df: pl.DataFrame) -> None:
        """A missing order_by column is reported by validate()."""
        pipeline = TransformPlan().math_diff_lag(
            "a", order_by="nonexistent", new_column="diff"
        )
        report = pipeline.validate(numeric_df)
        assert not report.is_valid
        assert "Order-by" in str(report.errors[0])

    def test_validation_missing_group_by(self, numeric_df: pl.DataFrame) -> None:
        """A missing group_by column is reported by validate()."""
        pipeline = TransformPlan().math_diff_lag(
            "a", order_by="a", new_column="diff", group_by="nonexistent"
        )
        report = pipeline.validate(numeric_df)
        assert not report.is_valid
        assert "Group-by" in str(report.errors[0])

    def test_serialization_roundtrip(self, numeric_df: pl.DataFrame) -> None:
        """A plan survives a to_json/from_json round-trip unchanged."""
        pipeline = TransformPlan().math_diff_lag(
            "a", order_by="a", new_column="diff", group_by="b", lag=2
        )
        restored = TransformPlan.from_json(pipeline.to_json())
        first, _ = pipeline.process(numeric_df)
        second, _ = restored.process(numeric_df)
        assert first["diff"].to_list() == second["diff"].to_list()
11 changes: 11 additions & 0 deletions transformplan/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,17 @@ def math_diff_from_agg(
group_by: list[str] | None,
) -> Any: ...

@abstractmethod
# Row-to-row difference of `column` against its value `lag` rows earlier,
# ordered by `order_by` and optionally partitioned by `group_by`; the result
# is written to `new_column`. Rows with no predecessor within their partition
# receive null (demonstrated by the backend test suites in this change).
# `data` is the backend-native frame/relation — presumably a Polars DataFrame
# or DuckDB relation; confirm against the concrete backends.
def math_diff_lag(
    self,
    data: Any,
    column: str,
    order_by: list[str],
    new_column: str,
    group_by: list[str] | None,
    lag: int,
) -> Any: ...

@abstractmethod
def math_standardize(
self,
Expand Down
Loading
Loading