Commit f56b059

feat: automatic steps discovery and steps registry with filtering
1 parent ded6126 commit f56b059

7 files changed

Lines changed: 595 additions & 133 deletions

announcement.md

Lines changed: 0 additions & 128 deletions
This file was deleted.

docs/artifact-centric.md

Lines changed: 67 additions & 0 deletions
@@ -0,0 +1,67 @@
To understand the shift from Task-Centric (traditional) to Artifact-Centric (FlowyML) pipelines, we have to look at how the execution engine views the relationship between code and data.

Technically, this isn't just a naming convention; it's a change in how the Directed Acyclic Graph (DAG) is constructed and how state is persisted.

1. Declarative Signatures vs. Imperative Sequences

In a task-centric system (like Airflow), you define the order of operations. You essentially write a script that says, "Run preprocess, then run train." The movement of data between them is usually an afterthought: you manually pass S3 paths or local file locations between functions.

In FlowyML (Artifact-Centric), the system builds the DAG by looking at the Input/Output signatures of your steps.

Technical Implementation: When you define a step, you declare: "I produce an artifact named 'train_data' of type Dataset." The Orchestrator looks for another step that says, "I require an input named 'train_data' of type Dataset."

Result: The "edge" in the graph is formed automatically because of a data dependency, not because you wrote `step_a >> step_b`. If you change an output name, the graph breaks at build time (checked by the `TypeValidator`).
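As a minimal sketch of that idea: the `@step(outputs=[...])` and `Pipeline.from_steps(...)` calls mirror the docstring examples added in this commit, while the way a step's inputs are matched to upstream outputs (here, by parameter name) and the toy return values are assumptions.

```python
from flowyml import Pipeline, step

@step(outputs=["train_data"])
def preprocess():
    # Declares: "I produce an artifact named 'train_data'."
    return [[0.1, 0.2], [0.3, 0.4]]

@step(outputs=["model"])
def train(train_data):
    # Declaring a 'train_data' input (matching by name is an assumption) is
    # what forms the preprocess -> train edge; no step_a >> step_b wiring.
    return {"weights": [sum(row) for row in train_data]}

pipeline = Pipeline.from_steps(preprocess, train, name="training")
result = pipeline.run()
```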
2. The Global Artifact Catalog vs. Manual "Handoffs"

The biggest technical hurdle in task-centric pipelines is the "handoff." You often see code like `pd.read_csv(f"s3://my-bucket/{run_id}/data.csv")`. This hardcodes the infrastructure and pathing logic inside your business logic.

In an artifact-centric system, FlowyML uses the Catalog (Registry Pattern):

Unique Identity: Every artifact is registered in the `Catalog` (via `register()`) with a `content_hash`, `source_step`, and `source_run_id`.

Discovery: A downstream step doesn't need to know where the model is stored (S3 vs. Azure vs. local). It asks the Catalog for the artifact by name/version. The `CatalogBackend` resolves the storage URI and handles the high-level fetching.

Immutability: Each artifact is a record of truth. If the input data hash hasn't changed, the system knows it can skip the task entirely (caching/memoization).
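The Catalog API itself is not part of this commit, so the following is a purely illustrative, self-contained toy version of the registry pattern described above. It only reuses the field names this doc mentions (`content_hash`, `source_step`, `source_run_id`, `parent_ids`); the class and method shapes are assumptions, not FlowyML's actual interface.

```python
from dataclasses import dataclass, field

@dataclass
class CatalogEntry:
    """Toy record mirroring the fields named in this doc."""
    name: str
    version: int
    content_hash: str
    source_step: str
    source_run_id: str
    parent_ids: list[str] = field(default_factory=list)
    storage_uri: str = ""

class ToyCatalog:
    """Illustrative registry: producers register artifacts, consumers look them up by name/version."""

    def __init__(self) -> None:
        self._entries: dict[tuple[str, int], CatalogEntry] = {}

    def register(self, entry: CatalogEntry) -> None:
        self._entries[(entry.name, entry.version)] = entry

    def get(self, name: str, version: int) -> CatalogEntry:
        # Downstream code asks by name/version and receives the resolved
        # storage_uri; it never hardcodes an S3 or local path itself.
        return self._entries[(name, version)]
```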
3. Automatic Lineage (The "Parents" Concept)

In task-centric systems, if you find a bad model in production, tracing it back to the exact version of the SQL query and the raw CSV that created it is a manual forensic exercise.

In Artifact-Centric FlowyML:

Lineage Tracking: As seen in the `CatalogEntry` structure, every artifact stores `parent_ids`.

Technical Flow: When Step B consumes Artifact A, FlowyML automatically records that Artifact A is the parent of whatever Step B produces.

Observability: You can call `get_lineage(artifact_id)` to get a full recursive tree of every transformation that touched that specific piece of data, from raw ingestion to the final insight.
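`get_lineage(artifact_id)` is named above, but this commit does not show where it lives or what it returns. As a sketch of the idea only, here is a recursive walk over `parent_ids` using plain dictionaries; all names and records below are hypothetical.

```python
# Illustrative only: a recursive lineage walk over parent_ids.
lineage_index = {
    "raw_csv":    {"name": "raw_csv",    "source_step": "ingest",     "parent_ids": []},
    "train_data": {"name": "train_data", "source_step": "preprocess", "parent_ids": ["raw_csv"]},
    "model":      {"name": "model",      "source_step": "train",      "parent_ids": ["train_data"]},
}

def get_lineage(artifact_id: str, depth: int = 0) -> None:
    entry = lineage_index[artifact_id]
    print("  " * depth + f"{entry['name']} (produced by {entry['source_step']})")
    for parent_id in entry["parent_ids"]:
        # Each parent is itself an artifact with its own parents,
        # so the walk reconstructs the full transformation tree.
        get_lineage(parent_id, depth + 1)

get_lineage("model")
# model (produced by train)
#   train_data (produced by preprocess)
#     raw_csv (produced by ingest)
```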
4. Infrastructure as Configuration (flowyml.yaml)

In task-centric code, you often specify `cpu=4, memory='16Gi'` inside your Python `@task` decorator. This locks your code to specific hardware.

In Artifact-Centric design, we decouple "what happens" from "where it happens":

The Code: Pure Python logic defined by inputs and outputs.

The YAML: Defines the Stack. It specifies that the "Model" artifact produced by `train_step` should be stored in an `S3ArtifactStore` and that the step should run on a `KubernetesOrchestrator`.

Benefit: You can run the exact same artifact logic on your local machine (using the `LocalCatalogBackend`) or in full-scale production without changing a single line of Python.
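From the Python side, the decoupling looks roughly like the sketch below. The `step` and `Pipeline.from_steps` calls come from this commit; the comments simply restate this doc's claim about stack selection (the flowyml.yaml schema itself isn't shown here, so none of its keys are assumed).

```python
from flowyml import Pipeline, step

@step(outputs=["model"])
def train_step():
    # Pure logic: no S3 paths, no cpu/memory settings, no backend imports.
    # Per this doc, where the "model" artifact lands (LocalCatalogBackend vs.
    # S3ArtifactStore) and where the step runs (local vs. KubernetesOrchestrator)
    # is decided by the active stack in flowyml.yaml, not by this function.
    return {"weights": [0.1, 0.2, 0.3]}

pipeline = Pipeline.from_steps(train_step, name="training")
result = pipeline.run()
```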
Summary Comparison

| Metric | Task-Centric | Artifact-Centric (FlowyML) |
| --- | --- | --- |
| Logic Focus | "What do I run?" (Verbs) | "What do I produce?" (Nouns) |
| Data Flow | Manual path passing | Automatic resolution via Catalog |
| Validation | Errors happen at runtime (file not found) | Errors happen at build time (type mismatch) |
| Debugging | Check the logs of Task X | Inspect the state of Artifact Y |
| Portability | Hardcoded file paths/infra | Stack-based storage abstraction |

By focusing on the Artifact, FlowyML treats data as a first-class citizen of the deployment, enabling reproducible machine learning where every result is mathematically linked to its origin.

flowyml/__init__.py

Lines changed: 4 additions & 1 deletion
@@ -10,7 +10,7 @@

 # Core imports
 from flowyml.core.context import Context, context
-from flowyml.core.step import step, Step
+from flowyml.core.step import step, Step, StepRegistry, get_registered_steps, clear_step_registry
 from flowyml.core.pipeline import Pipeline
 from flowyml.core.executor import Executor, LocalExecutor
 from flowyml.core.cache import CacheStrategy
@@ -176,6 +176,9 @@
     "context",
     "step",
     "Step",
+    "StepRegistry",
+    "get_registered_steps",
+    "clear_step_registry",
     "Pipeline",
     "Executor",
     "LocalExecutor",

flowyml/core/__init__.py

Lines changed: 4 additions & 1 deletion
@@ -1,7 +1,7 @@
 """Core pipeline execution components."""

 from flowyml.core.context import Context, context
-from flowyml.core.step import step, Step
+from flowyml.core.step import step, Step, StepRegistry, get_registered_steps, clear_step_registry
 from flowyml.core.pipeline import Pipeline
 from flowyml.core.executor import Executor, LocalExecutor
 from flowyml.core.cache import CacheStrategy
@@ -44,6 +44,9 @@
     # Steps & Pipelines
     "step",
     "Step",
+    "StepRegistry",
+    "get_registered_steps",
+    "clear_step_registry",
     "Pipeline",
     # Execution
     "Executor",

flowyml/core/pipeline.py

Lines changed: 77 additions & 1 deletion
@@ -135,6 +135,19 @@ class Pipeline:
         >>> @step(outputs=["model/trained"])
         ... def train(learning_rate: float, epochs: int):
         ...     return train_model(learning_rate, epochs)
+
+        # Option 1: Auto-discover all @step-decorated functions
+        >>> pipeline = Pipeline("my_pipeline", context=ctx, auto_discover=True)
+        >>> result = pipeline.run()
+
+        # Option 2: Concise explicit selection
+        >>> pipeline = Pipeline.from_steps(train, name="my_pipeline", context=ctx)
+
+        # Option 3: Batch add
+        >>> pipeline = Pipeline("my_pipeline", context=ctx)
+        >>> pipeline.add_steps([train])
+
+        # Option 4: Manual add_step (existing, still works)
         >>> pipeline = Pipeline("my_pipeline", context=ctx)
         >>> pipeline.add_step(train)
         >>> result = pipeline.run()
@@ -184,6 +197,7 @@ def __init__(
         project: str | None = None,  # Project name to attach to (deprecated, use project_name)
         project_name: str | None = None,  # Project name to attach to (creates if doesn't exist)
         version: str | None = None,  # If provided, VersionedPipeline is created via __new__
+        auto_discover: bool = False,  # Auto-discover @step-decorated functions
         **kwargs,
     ):
         """Initialize pipeline.
@@ -202,8 +216,10 @@ def __init__(
                 If the project doesn't exist, it will be created automatically.
             version: Optional version string. If provided, a VersionedPipeline
                 instance will be created instead of a regular Pipeline.
+            auto_discover: If True, automatically discover all ``@step``-decorated
+                functions from the global registry at build time. Steps with a
+                matching ``pipeline`` tag are preferred. Defaults to False.
             **kwargs: Additional keyword arguments passed to the pipeline.
-                instance is automatically created instead of a regular Pipeline.
         """
         from flowyml.utils.config import get_config

@@ -290,6 +306,7 @@ def __init__(

         # State
         self._built = False
+        self._auto_discover = auto_discover
         self.step_groups: list[Any] = []  # Will hold StepGroup objects
         self.control_flows: list[Any] = []  # Store conditional control flows (If, Switch, etc.)

@@ -318,6 +335,56 @@ def add_step(self, step: Step) -> "Pipeline":
         self._built = False
         return self

+    def add_steps(self, steps: list[Step]) -> "Pipeline":
+        """Add multiple steps to the pipeline at once.
+
+        Args:
+            steps: List of Step instances to add
+
+        Returns:
+            Self for chaining
+
+        Example:
+            >>> pipeline.add_steps([load_data, train_model, evaluate])
+        """
+        for s in steps:
+            self.steps.append(s)
+        self._built = False
+        return self
+
+    @classmethod
+    def from_steps(
+        cls,
+        *steps: Step,
+        name: str,
+        **kwargs,
+    ) -> "Pipeline":
+        """Create a pipeline from an explicit list of steps.
+
+        Convenience constructor that avoids repetitive ``add_step()`` calls
+        while still giving you full control over which steps are included.
+
+        Args:
+            *steps: Step instances to include
+            name: Pipeline name (keyword-only)
+            **kwargs: Additional arguments passed to Pipeline()
+
+        Returns:
+            Configured Pipeline instance
+
+        Example:
+            >>> pipeline = Pipeline.from_steps(
+            ...     load_data,
+            ...     train_model,
+            ...     evaluate,
+            ...     name="training",
+            ...     enable_cache=False,
+            ... )
+        """
+        pipeline = cls(name=name, **kwargs)
+        pipeline.add_steps(list(steps))
+        return pipeline
+
     def add_control_flow(self, control_flow: Any) -> "Pipeline":
         """Add conditional control flow to the pipeline.

@@ -397,6 +464,15 @@ def build(self) -> None:
         if self._built:
             return

+        # Auto-discover steps from global registry if enabled
+        if self._auto_discover and not self.steps:
+            from flowyml.core.step import get_registered_steps
+
+            discovered = get_registered_steps(pipeline=self.name)
+            if not discovered:
+                discovered = get_registered_steps()
+            self.steps = list(discovered)
+
         # Clear previous DAG
         self.dag = DAG()

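Taken together, a hedged end-to-end sketch of the new registry features: `step`, `Pipeline`, `auto_discover`, `get_registered_steps`, and `clear_step_registry` all come from this diff. The `pipeline`-tag filtering mentioned in the docstring is not exercised here (how `@step` attaches that tag isn't shown), input matching by parameter name is an assumption, and whether `Pipeline()` requires a `context` is left open, so the docstring's `context=ctx` argument is omitted.

```python
from flowyml import Pipeline, step, get_registered_steps, clear_step_registry

clear_step_registry()  # start from a clean global registry (useful in tests)

@step(outputs=["train_data"])
def load_data():
    return [1, 2, 3]

@step(outputs=["model"])
def train_model(train_data):
    return {"mean": sum(train_data) / len(train_data)}

# Decorated functions are assumed to land in the global registry.
print(list(get_registered_steps()))

# With auto_discover=True and no explicitly added steps, build() pulls steps
# from the registry: pipeline-tagged steps first, then the full registry.
pipeline = Pipeline("training", auto_discover=True)
result = pipeline.run()
```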