Draft
18 commits
13 changes: 13 additions & 0 deletions .flow/epics/fn-17.json
@@ -0,0 +1,13 @@
{
"branch_name": "fn-17",
"created_at": "2026-01-19T01:17:41.162846Z",
"depends_on_epics": [],
"id": "fn-17",
"next_task": 1,
"plan_review_status": "unknown",
"plan_reviewed_at": null,
"spec_path": ".flow/specs/fn-17.md",
"status": "open",
"title": "V7 Golden Master: Rust State Machine with PyO3 Bindings",
"updated_at": "2026-01-19T01:17:47.652945Z"
}
354 changes: 354 additions & 0 deletions .flow/specs/fn-17.md
@@ -0,0 +1,354 @@
# fn-17 V7 Golden Master: Rust State Machine with PyO3 Bindings

## Overview

Implement a Rust-based investigation state machine with Python bindings that replaces the **Temporal workflow local variables** (`_current_step`, `_state`, etc. in `InvestigationWorkflow`). The existing `core/state.py` event log is retained for audit/persistence; Rust owns the **in-flight workflow state**.

**Why Rust:** Total, deterministic core; illegal transitions become explicit errors; state is serializable; side effects stay outside.

**Target replacement:** Temporal workflow state variables at `python-packages/dataing/src/dataing/temporal/workflows/investigation.py:67-90`

**What's NOT replaced:** `core/state.py` (event log for persistence), `core/investigation/` (branch/snapshot domain model)

**Architecture:**
- `core/` - Self-contained Rust workspace
- `crates/dataing_investigator/` - Pure Rust library (domain, protocol, state, machine)
- `bindings/python/` - PyO3 adapter exposing `dataing_investigator` module
- `python-packages/investigator/` - Python runtime (envelope, runtime, security)
- Integration with existing `dataing` Temporal workflows

## Scope

### In Scope
- Rust workspace setup with Maturin/PyO3 and wheel distribution
- Event-sourced state machine with strict phase transitions
- **Rust runs in Temporal ACTIVITIES** (not workflow code) for side-effect isolation
- All IDs externally generated (Rust never generates, only accepts)
- Versioned JSON wire protocol (v1) with strict schema
- Python runtime with envelope tracing and defense-in-depth validation
- Temporal workflow integration with signals/queries for HITL
- Build system integration (Justfile, uv workspace, CI wheel builds)

### Out of Scope
- Migration of existing investigations (separate epic)
- Replacing `core/state.py` event log (retained for audit)
- Performance optimization (benchmark after MVP)
- Multi-tenancy in Rust (handled at Python layer)
- OpenTelemetry integration (future enhancement)

## Execution Model

### Where Rust Runs

**CRITICAL DECISION:** The Rust state machine runs **inside Temporal activities**, NOT inside workflow code.

| Layer | Determinism | Contains |
|-------|-------------|----------|
| Workflow code | Must be deterministic | Orchestration, signals, queries, `workflow.uuid4()` |
| Activities | No determinism required | Rust state transitions, LLM calls, DB queries |

**Rationale:**
- Activities can use non-deterministic code safely
- The step that drives the state machine has side effects (it persists snapshots and records metadata); the Rust core itself only computes the next state and intent
- Workflow passes IDs and events to activity, receives intent back
- State snapshots persisted to DB via activity (not Temporal history)

### Durability Mechanism

**State is NOT stored in Temporal history.** Instead:

1. Workflow calls `run_brain_step` activity with `(state_json, event_json, workflow_id)`
2. Activity runs Rust state machine, gets new state + intent
3. Activity persists state snapshot to **application DB** with idempotency key `(workflow_id, step)`
4. Activity returns `{intent_json, step}` to workflow
5. Workflow uses `continue_as_new` every N steps to bound history

**History growth mitigation:**
- Only intent payloads flow through history (small)
- State snapshots in DB, keyed by `(workflow_id, step)`
- `continue_as_new` every 100 steps with compacted checkpoint
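
The durability steps above can be sketched in plain Python. This is a minimal illustration, not the planned implementation: it uses an in-memory SQLite table in place of the application DB, and `persist_snapshot`, `make_db`, the `snapshots` table, and the `transition` callable (a stand-in for the Rust state machine) are all hypothetical names. It assumes the event JSON carries a `step` field.

```python
import json
import sqlite3


def make_db():
    """Create an in-memory stand-in for the application DB (hypothetical schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE snapshots ("
        "workflow_id TEXT NOT NULL, step INTEGER NOT NULL, state_json TEXT NOT NULL, "
        "PRIMARY KEY (workflow_id, step))"
    )
    return conn


def persist_snapshot(conn, workflow_id, step, state_json):
    """Write a snapshot at most once per (workflow_id, step); True if newly written."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO snapshots (workflow_id, step, state_json) "
        "VALUES (?, ?, ?)",
        (workflow_id, step, state_json),
    )
    conn.commit()
    return cur.rowcount == 1


def run_brain_step(conn, transition, state_json, event_json, workflow_id):
    """Apply one event, persist the new state, and return only the small intent."""
    event = json.loads(event_json)
    # `transition` stands in for the Rust state machine call.
    new_state, intent = transition(json.loads(state_json), event)
    persist_snapshot(
        conn, workflow_id, event["step"], json.dumps(new_state, sort_keys=True)
    )
    return {"intent_json": json.dumps(intent, sort_keys=True), "step": event["step"]}
```

Because the primary key is the idempotency key, a retried activity that reaches the insert a second time leaves the stored snapshot unchanged, and only the intent travels back through workflow history.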

## Key Design Decisions

### Phase Enum (Mapped to Workflow Steps)

```rust
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", content = "data")]
pub enum Phase {
    // Maps to: workflow start
    Init,
    // Maps to: gather_context activity
    GatheringContext { call_id: Option<String> },
    // Maps to: check_patterns activity (optional)
    CheckingPatterns { call_id: Option<String> },
    // Maps to: generate_hypotheses activity
    GeneratingHypotheses { call_id: Option<String> },
    // Maps to: generate_query + execute_query activities (parallel per hypothesis)
    EvaluatingHypotheses { pending_call_ids: Vec<String>, completed: Vec<String> },
    // Maps to: interpret_evidence activity
    InterpretingEvidence { call_id: Option<String> },
    // Maps to: _await_user_input (signal-based)
    AwaitingUser { question_id: String, prompt: String, timeout_seconds: u64 },
    // Maps to: synthesize activity
    Synthesizing { call_id: Option<String> },
    // Maps to: counter_analyze activity (optional)
    CounterAnalyzing { call_id: Option<String> },
    // Terminal: success
    Finished { insight: String },
    // Terminal: failure (includes cancellation)
    Failed { error: String, retryable: bool },
}
```
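
For readers on the Python side, the sketch below shows how a serialized `Phase` might be inspected. It assumes the adjacently tagged layout that `#[serde(tag = "type", content = "data")]` produces with `serde_json`: the variant name under `type`, its fields under `data`, and no `data` key for the unit variant `Init`. The helper names `parse_phase` and `is_terminal` are illustrative only.

```python
import json

# Terminal variants taken from the Phase enum above.
TERMINAL_PHASES = {"Finished", "Failed"}


def parse_phase(phase_json):
    """Split a serialized Phase into (variant_name, fields)."""
    obj = json.loads(phase_json)
    # Unit variants such as Init are assumed to carry no "data" key.
    return obj["type"], obj.get("data", {})


def is_terminal(phase_json):
    """True when the phase is one of the two terminal variants."""
    name, _ = parse_phase(phase_json)
    return name in TERMINAL_PHASES
```

For example, `parse_phase('{"type": "GatheringContext", "data": {"call_id": null}}')` yields the variant name and a dict whose `call_id` is `None`.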

### HITL Specification

**Trigger conditions for `AwaitingUser`:**
1. LLM requests clarification (intent type `RequestUser`)
2. Confidence below threshold after hypothesis evaluation
3. Ambiguous evidence requiring human judgment

**Signal schema:**
```python
@workflow.signal
def submit_user_response(self, response: UserResponsePayload) -> None:
    """
    UserResponsePayload:
    - question_id: str (must match awaiting question)
    - content: str (user's answer)
    - timestamp: str (ISO8601)
    """
```

**Timeout semantics:**
- Default: 60 minutes
- On timeout: transition to `Failed { error: "User response timeout", retryable: true }`
- On cancel signal: transition to `Failed { error: "Cancelled", retryable: false }`
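
The timeout and cancel rules can be expressed as a small pure function. The sketch below is an assumption-laden illustration: `resolve_awaiting_user` and its `outcome` strings are hypothetical, and the phase that follows a valid response is deliberately left to the caller because the rules above do not name it.

```python
def resolve_awaiting_user(question_id, outcome, response=None):
    """Map the end of an AwaitingUser wait to a terminal phase, or None to resume.

    `outcome` is one of "response", "timeout", or "cancel" (hypothetical labels).
    """
    if outcome == "timeout":
        return {
            "type": "Failed",
            "data": {"error": "User response timeout", "retryable": True},
        }
    if outcome == "cancel":
        return {"type": "Failed", "data": {"error": "Cancelled", "retryable": False}}
    if outcome == "response":
        # The signal payload must answer the question currently being awaited.
        if response is None or response.get("question_id") != question_id:
            raise ValueError("response does not match the awaiting question")
        return None  # Not terminal: the caller continues the investigation.
    raise ValueError(f"unknown outcome: {outcome!r}")
```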

**Query surface:**
```python
@workflow.query
def get_awaiting_user_state(self) -> AwaitingUserState | None:
    """Returns question_id, prompt, timeout_remaining if in AwaitingUser phase."""
```

### ID Generation

**RULE: Rust NEVER generates IDs.** All IDs come from external sources:

| ID Type | Source | When |
|---------|--------|------|
| `event_id` | `workflow.uuid4()` | Workflow creates before calling activity |
| `call_id` | `workflow.uuid4()` | Workflow creates before scheduling tool call |
| `step` | Workflow counter | Monotonic, passed to activity |

**Rust accepts IDs via event payload:**
```rust
pub struct Event {
    pub id: String,            // External: workflow.uuid4()
    pub step: u64,             // External: workflow counter
    pub payload: EventPayload,
}
```

**Idempotency:**
- Activity uses `(workflow_id, event_id)` as dedup key for DB writes
- Rust state machine maintains `seen_event_ids: HashSet<String>` to reject duplicates
- On replay, duplicate events are no-ops
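
A toy Python model of the duplicate check may help when writing the idempotency tests. It is not the Rust implementation: `EventDeduper` is a hypothetical name, and the step-ordering check is included on the assumption that steps must strictly increase.

```python
class EventDeduper:
    """Toy stand-in for the Rust-side duplicate and ordering checks."""

    def __init__(self):
        self.seen_event_ids = set()
        self.last_step = None
        self.applied = []

    def ingest(self, event):
        """Return True if the event was applied, False if it was a duplicate no-op."""
        if event["id"] in self.seen_event_ids:
            return False  # Replayed event: state is left untouched.
        if self.last_step is not None and event["step"] <= self.last_step:
            raise ValueError(
                f"step {event['step']} is not greater than {self.last_step}"
            )
        self.seen_event_ids.add(event["id"])
        self.last_step = event["step"]
        self.applied.append(event["payload"])
        return True
```

Checking the ID before the step means a replayed event is silently ignored rather than reported as an ordering violation.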

### Versioned Wire Protocol (v1)

```json
{
  "protocol_version": 1,
  "event_id": "uuid",
  "step": 42,
  "kind": "CallResult",
  "payload": { ... }
}
```

**Schema rules:**
- `protocol_version`: Required, reject if unknown
- Unknown fields: Ignored (forward compat)
- Missing required fields: Error
- Canonical JSON: Keys sorted, no trailing commas, UTF-8
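
As a rough illustration of these rules on the Python side, the sketch below validates an envelope and produces a canonical encoding. It treats all five fields from the example envelope as required, which is an assumption; `parse_envelope` and `canonical_json` are hypothetical helper names.

```python
import json

SUPPORTED_VERSIONS = {1}
# Assumption: every field shown in the example envelope is required.
REQUIRED_FIELDS = ("protocol_version", "event_id", "step", "kind", "payload")


def parse_envelope(raw):
    """Validate a v1 envelope and return only the known fields."""
    obj = json.loads(raw)
    if not isinstance(obj, dict):
        raise ValueError("envelope must be a JSON object")
    missing = [name for name in REQUIRED_FIELDS if name not in obj]
    if missing:
        raise ValueError(f"missing required fields: {missing}")
    if obj["protocol_version"] not in SUPPORTED_VERSIONS:
        raise ValueError(f"unknown protocol_version: {obj['protocol_version']!r}")
    # Unknown fields are dropped here, i.e. ignored for forward compatibility.
    return {name: obj[name] for name in REQUIRED_FIELDS}


def canonical_json(obj):
    """Encode with sorted keys and compact separators, keeping non-ASCII as-is."""
    return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
```

Encode the returned string as UTF-8 when writing it to the wire or hashing it for golden fixtures.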

**Backwards compatibility tests:**
- Golden fixtures for each event/intent type
- Round-trip tests across Rust/Python boundary

### Error Classification for Temporal

| Error Type | Temporal Behavior | Python Exception (raised from Rust) |
|------------|-------------------|----------------|
| `InvalidTransition` | Non-retryable, fail workflow | `InvalidTransitionError` |
| `SerializationError` | Non-retryable, fail workflow | `SerializationError` |
| `InvariantViolation` | Non-retryable, bug → fail workflow | `InvariantError` |
| `ExternalCallFailed` | Retryable (Temporal retry policy) | `RetryableError` |
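
The table can be mirrored as a lookup that an activity wrapper consults before deciding whether to let Temporal retry. This is only a sketch: `is_retryable` is a hypothetical helper, and raising on unknown error types is a design assumption rather than something the table specifies.

```python
# Classification copied from the table above.
NON_RETRYABLE = {"InvalidTransition", "SerializationError", "InvariantViolation"}
RETRYABLE = {"ExternalCallFailed"}


def is_retryable(error_type):
    """Return True if Temporal's retry policy should apply to this error type."""
    if error_type in RETRYABLE:
        return True
    if error_type in NON_RETRYABLE:
        return False
    # Assumption: an unclassified error is a bug in the mapping, so fail loudly.
    raise KeyError(f"unclassified error type: {error_type!r}")
```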

### Panic-Free Policy

**Crate-level enforcement:**
```rust
// lib.rs
#![deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
```

**All FFI entrypoints wrapped:**
```rust
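use pyo3::exceptions::PyRuntimeError;
use pyo3::prelude::*;

// Convert any panic inside `ingest_inner` into a Python RuntimeError.
fn safe_ingest(event_json: String) -> PyResult<String> {
    std::panic::catch_unwind(|| ingest_inner(&event_json))
        .map_err(|_| PyRuntimeError::new_err("Internal panic - please report"))?
}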
```

**Panic strategy:** `panic = "unwind"` in Cargo.toml to enable `catch_unwind`

### Security Validation Boundaries

| Boundary | Validations |
|----------|-------------|
| API → Workflow signal | Schema validation, size limits (< 1MB), user auth |
| Workflow → Rust event | Protocol version, required fields, event_id uniqueness |
| Rust state invariants | Phase transition rules, call_id matching, step monotonicity |
| Activity → SQL | Existing `safety/validator.py`, forbidden statements, PII redaction |

**Size limits:**
- Event payload: < 100KB
- State snapshot: < 10MB
- Signal payload: < 1MB
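
A size check along these lines could run at each boundary before any parsing. The sketch assumes 1 KB = 1024 bytes and measures the UTF-8 encoded length; the source does not say whether the limits are binary or decimal, and `check_size` and the `LIMITS` keys are hypothetical names.

```python
# Upper bounds in bytes (exclusive); assumes 1 KB = 1024 bytes.
LIMITS = {
    "event": 100 * 1024,
    "state": 10 * 1024 * 1024,
    "signal": 1024 * 1024,
}


def check_size(kind, payload):
    """Return the UTF-8 size of `payload`, or raise if it reaches the limit."""
    size = len(payload.encode("utf-8"))
    if size >= LIMITS[kind]:
        raise ValueError(f"{kind} payload is {size} bytes, limit is {LIMITS[kind]}")
    return size
```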

## Naming Convention

**Unified naming:** `dataing_investigator`

| Component | Name |
|-----------|------|
| Rust crate | `dataing_investigator` |
| Python wheel | `dataing-investigator` |
| Python import | `from dataing_investigator import Investigator` |
| Module location | `python-packages/dataing-investigator/` (bindings) |
| Runtime package | `python-packages/investigator/` (Python runtime) |

## Build & Distribution

### Prerequisites
```bash
# Required tooling
rustup toolchain install stable
cargo install maturin
```

### Development
```bash
# Build Rust library
just rust-build

# Install bindings to venv (dev mode)
just rust-dev

# Run Rust tests
just rust-test
```

### CI/Release
```bash
# Build wheels for distribution (manylinux, macos, windows)
maturin build --release --strip

# Wheels output to target/wheels/
```

### Justfile additions
```just
# Prerequisites check
rust-check:
    @command -v cargo >/dev/null || (echo "Install Rust: rustup.rs" && exit 1)
    @command -v maturin >/dev/null || (echo "Install maturin: pip install maturin" && exit 1)

# Build Rust library
rust-build: rust-check
    cd core && cargo build --release

# Dev install bindings
rust-dev: rust-check
    cd core/bindings/python && maturin develop --uv

# Run Rust tests
rust-test: rust-check
    cd core && cargo test

# Note: `just test` does NOT include rust-test until CI is ready
```

## Quick commands

```bash
# Prerequisites
rustup toolchain install stable
cargo install maturin

# Build Rust crate
just rust-build

# Install bindings (dev mode)
just rust-dev

# Run Rust tests
just rust-test

# Run Python tests (excluding Rust for now)
just test
```

## Acceptance

**Testable gates:**

1. **Rust unit tests pass:** `cargo test` in `core/` with 0 failures
2. **Transition table coverage:** Tests for every Phase → Phase transition
3. **Python binding smoke test:** `python -c "from dataing_investigator import Investigator; inv = Investigator(); print(inv.snapshot())"`
4. **Golden fixture tests:** Rust/Python round-trip for all event/intent types
5. **Idempotency test:** Duplicate event_id is rejected gracefully
6. **Panic-free test:** Malformed JSON returns PyResult error, not crash
7. **Deterministic replay test:** Same events → same state (with Temporal test env)
8. **Unexpected call_id handling:** Unexpected call_id produces deterministic `Error`/`Failed` (not silent ignore)
9. **Signal dedup documented:** Temporal signal dedup strategy documented (esp. `continue_as_new` boundary)
10. **Build tooling pinned:** Maturin version pinned in pyproject.toml and verified with uv integration

**NOT required for MVP (future epic):**
- E2E test with live Temporal server
- Coverage percentage metrics (add llvm-cov later)

## Risks & Mitigations

| Risk | Mitigation |
|------|------------|
| Maturin/uv integration complexity | Task fn-17.9 focused on build validation; CI builds wheels |
| Temporal replay non-determinism | Rust runs in activities (not workflow), all IDs external |
| History growth from state snapshots | State in DB, not history; `continue_as_new` every 100 steps |
| Native extension distribution | CI builds manylinux/macos wheels; pin Rust toolchain |
| Protocol drift between Rust/Python | Versioned protocol v1; golden fixtures; backwards-compat tests |
| Rollout risk | Feature flag to use Python-only path; gradual rollout |
| Panic propagation | `#![deny(clippy::unwrap_used)]`, `catch_unwind` at boundary |

## References

### Existing Code
- `python-packages/dataing/src/dataing/temporal/workflows/investigation.py:67-90` - **Target: workflow state to replace**
- `python-packages/dataing/src/dataing/core/state.py:26-203` - Event log (retained)
- `python-packages/dataing/src/dataing/safety/validator.py:24-128` - SQL safety (reused)
- `pyproject.toml:161-162` - uv workspace configuration

### Documentation
- [PyO3 Error Handling](https://pyo3.rs/v0.23.5/function/error-handling)
- [Maturin + uv Integration](https://quanttype.net/posts/2025-09-12-uv-and-maturin.html)
- [Temporal Python SDK - Message Passing](https://docs.temporal.io/develop/python/message-passing)
- [Serde Attributes](https://serde.rs/attributes.html)

## Open Questions (Deferred to Implementation)

1. **Exact `continue_as_new` threshold:** Start with 100 steps, tune based on history size
2. **State snapshot DB schema:** Defer to fn-17.14 (Temporal integration task)
3. **Feature flag mechanism:** Use existing entitlements system or env var
23 changes: 23 additions & 0 deletions .flow/tasks/fn-17.1.json
@@ -0,0 +1,23 @@
{
"assignee": "bordumbb@gmail.com",
"claim_note": "",
"claimed_at": "2026-01-19T01:46:01.490848Z",
"created_at": "2026-01-19T01:18:50.390127Z",
"depends_on": [],
"epic": "fn-17",
"evidence": {
"commits": [
"23f1fd4792dc94bd66d6949506e96240ed304ebf"
],
"prs": [],
"tests": [
"cargo test"
]
},
"id": "fn-17.1",
"priority": null,
"spec_path": ".flow/tasks/fn-17.1.md",
"status": "done",
"title": "Scaffold Rust workspace structure",
"updated_at": "2026-01-19T01:50:20.724622Z"
}