Skip to content

UnicoLab/FlowyML

Folders and files

NameName
Last commit message
Last commit date

Latest commit

Β 

History

170 Commits
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 

Repository files navigation

🌊 flowyml

flowyml Logo
The Enterprise-Grade ML Pipeline Framework for Humans

CI Status PyPI Version Python 3.10+ License UnicoLab


FlowyML is a lightweight yet powerful ML pipeline orchestration framework. It bridges the gap between rapid experimentation and enterprise production by making assets first-class citizens. Write pipelines in pure Python, and scale them to production without changing a single line of code.

πŸš€ Why FlowyML?

Feature FlowyML Traditional Orchestrators
Developer Experience 🐍 Native Python - No DSLs, no YAML hell. πŸ“œ Complex YAML or rigid DSLs.
Type-Based Routing 🧠 Auto-Routing - Define WHAT, we handle WHERE. πŸ”Œ Manual wiring to cloud buckets.
Smart Caching ⚑ Multi-Level - Smart content-hashing skips re-runs. 🐒 Basic file-timestamp checking.
Asset Management πŸ“¦ First-Class Assets - Models & Datasets with lineage. πŸ“ Generic file paths only.
Multi-Stack 🌍 Abstract Infra - Switch local/prod with one env var. πŸ”’ Vendor lock-in or complex setup.
GenAI Ready πŸ€– LLM Tracing - Built-in token & cost tracking. 🧩 Requires external tools.
Build-Time Validation βœ… Type Safety - Catches mismatches at build time. πŸ’₯ Runtime errors only.
Map Tasks πŸ—ΊοΈ Parallel Maps - @map_task with retries & concurrency. πŸ” Manual parallelism boilerplate.
Dynamic Workflows πŸ”€ Runtime DAGs - Generate pipelines based on data. πŸ“ Static definitions only.
GenAI Assets 🎯 Prompt & Checkpoint - First-class prompt versioning and training resumability. πŸ“ Unmanaged text files.
Stack Hydration πŸ—οΈ YAML β†’ Live Stack - StackConfig.to_stack() wires infra automatically. βš™οΈ Manual component assembly.

⚑️ Quick Start

This is a complete, multi-step ML pipeline with auto-injected context:

from flowyml import Pipeline, step, context

@step(outputs=["dataset"])
def load_data(batch_size: int = 32):
    return [i for i in range(batch_size)]

@step(inputs=["dataset"], outputs=["model"])
def train_model(dataset, learning_rate: float = 0.01):
    print(f"Training on {len(dataset)} items with lr={learning_rate}")
    return "model_v1"

# Configure and Run
ctx = context(learning_rate=0.05, batch_size=64)
pipeline = Pipeline("quickstart", context=ctx)
pipeline.add_step(load_data).add_step(train_model)

pipeline.run()

🌟 Key Features

1. 🧠 Type-Based Artifact Routing (New in 1.8.0)

Define artifact types in code, and FlowyML automatically routes them to your cloud infrastructure.

@step
def train(...) -> Model:
    # Auto-saved to GCS/S3 and registered to Vertex AI / SageMaker
    return Model(obj, name="classifier")

2. 🌍 Multi-Stack Configuration

Manage local, staging, and production environments in a single flowyml.yaml.

export FLOWYML_STACK=production
python pipeline.py  # Now runs on Vertex AI with GCS storage

3. πŸ›‘οΈ Intelligent Step Grouping

Group consecutive steps to run in the same container. Perfect for reducing overhead while maintaining clear step boundaries.

4. πŸ“Š Built-in Observability

Beautiful dark-mode dashboard to monitor pipelines, visualize DAGs, and inspect artifacts in real-time.

5. 🎯 Evaluations Framework

Production-grade evaluation system with 29+ scorers β€” classification, regression, GenAI (LLM-as-a-judge), and adapters for DeepEval, RAGAS, and Phoenix:

from flowyml.evals import evaluate, EvalDataset, get_scorer

data = EvalDataset.create_genai("my_test", examples=[...])
result = evaluate(data=data, scorers=[get_scorer("relevance"), get_scorer("ragas.faithfulness")])
result.notify_if_regression(threshold=0.05)

6. πŸ—ΊοΈ Map Tasks & Dynamic Workflows

Distribute work over collections with @map_task and generate pipelines at runtime with @dynamic:

from flowyml import map_task, dynamic

@map_task(concurrency=8, retries=2, min_success_ratio=0.95)
def process_document(doc: dict) -> dict:
    return transform(doc)

@dynamic(outputs=["best_model"])
def hyperparameter_search(config: dict):
    sub = Pipeline("hp_search")
    for lr in config["learning_rates"]:
        sub.add_step(train_with_lr(lr))
    return sub

7. πŸ“¦ Artifact Catalog with Lineage

Centralized artifact discovery, tagging, and lineage tracking β€” works local and remote:

from flowyml import ArtifactCatalog

catalog = ArtifactCatalog()  # Auto-selects local SQLite or remote API
catalog.register(name="classifier", artifact_type="Model", parent_ids=[dataset_id])
lineage = catalog.get_lineage(model_id)  # Full parent→child graph

πŸ“¦ Installation

# Install core
pip install flowyml

# Install with everything (recommended)
pip install "flowyml[all]"

πŸ“š Documentation

Visit FlowyML Docs for:


Built with ❀️ by UnicoLab