Version: 2.0
Focus: Intelligent Data Processing Pipelines with Automatic Context Management
Author: Piotr Laczkowski
Date: October 9, 2025
GraphFlow is a production-ready Python framework designed for intelligent data processing pipelines that automatically manage context and dependencies while scaling from prototype to production without code changes.
- Zero-friction pipeline development: Write pandas/polars code, get distributed execution
- Automatic context resolution: Smart parameter detection and dependency injection based on graph analysis
- Production-grade reliability: Built-in caching, retries, monitoring, and data quality validation
- Flexible execution: Seamlessly choose between local, distributed, or cloud execution
- Intelligence First: Automatic context detection and dependency resolution
- Developer Experience: Minimal boilerplate, maximum productivity
- Scale Transparently: Same code runs on laptop or 1000-node cluster
- Production Ready: Monitoring, caching, and reliability built-in
- Broad Applicability: Optimized for any data processing workflow - ETL, ML features, analytics, etc.
Smart Context Resolution: Parameters and dependencies are automatically detected through graph analysis - no manual specification needed.
from graphflow import Pipeline, node, context, dataset
import pandas as pd
# Define global context - parameters automatically flow where needed
ctx = context(
# Data processing context
env="prod",
date_range=("2024-01-01", "2024-12-31"),
# Processing parameters - automatically detected based on usage
lookback_days=30,
min_samples=100,
categorical_threshold=10,
# ML context (when doing feature engineering)
target_col="churn",
# Infrastructure context
cache_ttl="7d",
validation_strict=True
)
# Create pipeline with intelligent context
pipeline = Pipeline(
name="data_processing_pipeline",
base_uri="s3://data-lake/",
context=ctx # Automatically analyzed and distributed
)
Automatic Parameter Detection: Context parameters are automatically injected based on function signatures and graph dependencies.
# Raw datasets
customers = dataset("bronze/customers", partition_by=["date"])
transactions = dataset("bronze/transactions", partition_by=["date", "customer_id"])
events = dataset("bronze/events", partition_by=["date"])
@node(inputs=[customers], outputs=[customers.derived("_enriched")])
def enrich_customers(df: pd.DataFrame,
categorical_threshold: int, # Auto-detected from context
date_range: tuple) -> pd.DataFrame:
"""Enrich customer data - context params automatically injected."""
df = df.copy()
# Parameters automatically available - no manual wiring needed
start_date, end_date = date_range
df = df[(df['signup_date'] >= start_date) & (df['signup_date'] <= end_date)]
# Automatic context-aware processing
for col in df.select_dtypes(include=['object']).columns:
if df[col].nunique() > categorical_threshold:
df[f"{col}_encoded"] = df[col].astype('category').cat.codes
return df
@node(
inputs=[transactions],
outputs=[dataset("silver/transaction_aggregates", partition_by=["customer_id"])]
# No context_deps needed - automatically detected!
)
def compute_transaction_features(df: pd.DataFrame,
lookback_days: int, # Auto-injected
min_samples: int) -> pd.DataFrame: # Auto-injected
"""Compute transaction features - context flows automatically."""
features = []
for customer_id in df['customer_id'].unique():
cust_data = df[df['customer_id'] == customer_id].sort_values('transaction_date')
# Context parameters automatically available
if len(cust_data) < min_samples:
continue
        # Use lookback_days (provided from context) as a time window, not a row count
        cutoff = cust_data['transaction_date'].max() - pd.Timedelta(days=lookback_days)
        recent_data = cust_data[cust_data['transaction_date'] >= cutoff]
cust_features = {
'customer_id': customer_id,
f'transaction_count_{lookback_days}d': len(recent_data),
f'avg_amount_{lookback_days}d': recent_data['amount'].mean(),
}
features.append(cust_features)
return pd.DataFrame(features)
# Chain nodes with automatic context flow - no manual dependency management!
@node(
inputs=[customers.derived("_enriched"), dataset("silver/transaction_aggregates")],
outputs=[dataset("gold/customer_data_mart")]
)
def create_data_mart(customer_df: pd.DataFrame,
transaction_features: pd.DataFrame,
target_col: str) -> pd.DataFrame: # Auto-injected from context
"""Merge all data - target_col automatically provided."""
# Context automatically provides target_col and other needed parameters
data_mart = customer_df.merge(
transaction_features,
on='customer_id',
how='left'
)
# Target column handling from context
if target_col in data_mart.columns:
cols = [c for c in data_mart.columns if c != target_col] + [target_col]
data_mart = data_mart[cols]
    return data_mart
# Pipeline automatically detects dependencies and context flow
pipeline.add_nodes([
enrich_customers,
compute_transaction_features,
create_data_mart
])
# Flexible execution - choose your backend
result = pipeline.run(
targets=["gold/customer_data_mart"],
where={"date": "2024-10-01"}, # Partition filter
# Choose execution backend dynamically
executor="auto", # Auto-select best executor
# executor="local", # Force local execution
# executor="ray", # Force Ray distributed
# executor="cloud", # Force cloud execution
max_workers=16,
validate_data=True, # Built-in validation
profile_data=True # Automatic profiling
)
Intelligent Graph Interface: Export and visualize your pipeline in multiple formats with rich interactivity.
# Dynamic graph exports - choose your format
pipeline.export_graph(
format="html", # Interactive HTML with PyVis
output="pipeline.html",
include_context=True, # Show context flow
include_data_flow=True, # Show data dependencies
show_cache_status=True # Highlight cached vs dirty nodes
)
# Export to different formats
pipeline.export_graph(format="graphviz", output="pipeline.dot") # GraphViz DOT
pipeline.export_graph(format="mermaid", output="pipeline.md") # Mermaid diagram
pipeline.export_graph(format="json", output="pipeline.json") # JSON graph structure
pipeline.export_graph(format="yaml", output="pipeline.yaml") # YAML config
# Real-time pipeline inspection
inspector = pipeline.inspector()
# Get live graph state
graph_state = inspector.get_graph_state()
print(f"Nodes: {graph_state.node_count}")
print(f"Edges: {graph_state.edge_count}")
print(f"Cached nodes: {graph_state.cached_nodes}")
print(f"Dirty nodes: {graph_state.dirty_nodes}")
# Interactive graph exploration
for node_name, node_info in inspector.nodes.items():
print(f"Node: {node_name}")
print(f" Status: {node_info.status}")
print(f" Context deps: {node_info.context_dependencies}")
print(f" Data deps: {node_info.data_dependencies}")
print(f" Cache key: {node_info.cache_key}")
print(f" Last run: {node_info.last_execution}")
# Dynamic graph filtering and querying
subgraph = inspector.filter_nodes(
status="dirty", # Only dirty nodes
tags=["feature_engineering"], # Nodes with specific tags
context_deps=["lookback_days"] # Nodes using specific context
)
# Export filtered subgraph
subgraph.export("dirty_nodes.html", format="html")Smart Executor Selection: Choose the right execution backend for your workload automatically or manually.
# Automatic executor selection based on workload characteristics
result = pipeline.run(
executor="auto", # GraphFlow chooses best executor
execution_hints={
"data_size_gb": 50, # Help auto-selection
"cpu_intensive": True, # CPU vs I/O bound
"memory_intensive": False, # Memory requirements
"requires_gpu": False, # GPU acceleration needed
"max_runtime_hours": 2 # Time constraints
}
)
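# A sketch of what "auto" selection could do internally (the heuristics,
# thresholds, and executor names here are assumptions, not GraphFlow's
# actual selection logic):
def select_executor(hints: dict) -> str:
    if hints.get("requires_gpu"):
        return "cloud"            # accelerator-backed cloud jobs
    if hints.get("data_size_gb", 0) > 100:
        return "ray"              # large data favors distributed execution
    if hints.get("cpu_intensive") and not hints.get("memory_intensive"):
        return "local_processes"  # CPU-bound work benefits from process parallelism
    return "local_threads"        # cheap default for I/O-bound workloads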
# Manual executor selection with detailed configuration
executors = {
# Local execution options
"local_threads": {
"type": "local",
"backend": "threads",
"max_workers": 8,
"memory_limit": "16GB"
},
"local_processes": {
"type": "local",
"backend": "processes",
"max_workers": 4,
"memory_per_worker": "4GB"
},
# Distributed execution
"ray_cluster": {
"type": "distributed",
"backend": "ray",
"ray_address": "ray://head-node:10001",
"max_workers": 100,
"resources_per_worker": {"CPU": 2, "memory": 8000000000}
},
"dask_cluster": {
"type": "distributed",
"backend": "dask",
"scheduler_address": "tcp://scheduler:8786",
"max_workers": 50
},
# Cloud execution
"vertex_ai": {
"type": "cloud",
"backend": "vertex",
"project": "my-project",
"region": "us-central1",
"machine_type": "n1-standard-8",
"accelerator": {"type": "NVIDIA_TESLA_T4", "count": 1}
},
"aws_batch": {
"type": "cloud",
"backend": "batch",
"job_queue": "my-compute-queue",
"job_definition": "graphflow-job-def",
"vcpus": 8,
"memory": 32768
}
}
# Use specific executor
result = pipeline.run(
executor="ray_cluster",
executor_config=executors["ray_cluster"]
)
# Hybrid execution - different nodes on different executors
pipeline.set_node_executor("heavy_computation", "vertex_ai")
pipeline.set_node_executor("light_processing", "local_threads")
pipeline.set_node_executor("gpu_inference", "ray_cluster")
result = pipeline.run()  # Uses mixed execution
Versatile Processing Patterns: GraphFlow supports any data processing workflow, not just feature engineering.
from graphflow.patterns import (
time_series_processing, aggregation_processing, transformation_processing,
validation_processing, ml_feature_processing
)
# ETL Pipeline Example
@node(inputs=[raw_sales], outputs=[clean_sales])
@validation_processing(
null_checks=["customer_id", "amount", "date"],
range_checks={"amount": (0, 1000000)},
format_checks={"date": "YYYY-MM-DD"}
)
def clean_sales_data(df: pd.DataFrame) -> pd.DataFrame:
"""Data cleaning with automatic validation."""
pass # Validation pattern handles the implementation
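# A minimal sketch of how such a pattern decorator might be built (assumed
# mechanics -- the shipped validation_processing may differ):
from functools import wraps

def validation_processing_sketch(null_checks=None, range_checks=None, format_checks=None):
    def decorator(func):
        @wraps(func)
        def wrapper(df, *args, **kwargs):
            for col in (null_checks or []):
                if df[col].isna().any():
                    raise ValueError(f"Nulls found in required column {col!r}")
            for col, (lo, hi) in (range_checks or {}).items():
                if not df[col].dropna().between(lo, hi).all():
                    raise ValueError(f"{col!r} has values outside [{lo}, {hi}]")
            # format_checks omitted in this sketch
            result = func(df, *args, **kwargs)
            return df if result is None else result  # stub bodies return None
        return wrapper
    return decorator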
# Analytics Pipeline Example
@node(inputs=[clean_sales], outputs=[sales_metrics])
@aggregation_processing(
group_by=["region", "product_category"],
agg_specs={
"amount": ["sum", "mean", "count"],
"quantity": ["sum", "mean"]
},
time_windows=["daily", "weekly", "monthly"]
)
def compute_sales_metrics(df: pd.DataFrame) -> pd.DataFrame:
"""Business analytics aggregation."""
pass # Aggregation pattern handles the implementation
# Time Series Analysis Pipeline
@node(inputs=[sensor_data], outputs=[processed_timeseries])
@time_series_processing(
timestamp_col="timestamp",
value_cols=["temperature", "pressure", "humidity"],
operations=["smoothing", "anomaly_detection", "forecasting"],
window_size="1h",
forecast_horizon="24h"
)
def process_sensor_data(df: pd.DataFrame) -> pd.DataFrame:
"""Time series processing and forecasting."""
pass # Time series pattern handles the implementation
# ML Feature Engineering (when needed)
@node(inputs=[customer_data], outputs=[customer_features])
@ml_feature_processing(
categorical_encoding="auto",
numerical_scaling="robust",
feature_selection=True,
interaction_features=True
)
def create_ml_features(df: pd.DataFrame, target_col: str) -> pd.DataFrame:
"""ML-specific feature engineering."""
pass # ML pattern handles the implementation
# Data Transformation Pipeline
@node(inputs=[raw_logs], outputs=[structured_events])
@transformation_processing(
parsers={"log_line": "regex", "timestamp": "datetime"},
extractors=["ip_address", "user_agent", "status_code"],
enrichers=["geo_location", "device_type"]
)
def parse_log_data(df: pd.DataFrame) -> pd.DataFrame:
"""Log parsing and enrichment."""
    pass  # Transformation pattern handles the implementation
Context for Any Workflow: The context system works for any data processing scenario.
# ETL Context
etl_ctx = context(
# Data quality rules
null_tolerance=0.05,
outlier_threshold=3.0,
# Processing windows
batch_size=10000,
processing_date="2024-10-01",
# Infrastructure
memory_limit="8GB",
timeout_minutes=30
)
# Analytics Context
analytics_ctx = context(
# Business rules
fiscal_year_start="2024-01-01",
currency="USD",
# Aggregation rules
min_sample_size=100,
confidence_level=0.95,
# Reporting
report_format="daily",
include_trends=True
)
# ML Context
ml_ctx = context(
# Feature engineering
lookback_days=30,
min_samples=1000,
feature_selection_threshold=0.01,
# Model parameters
target_col="conversion",
test_size=0.2,
random_state=42,
# Validation
cv_folds=5,
scoring_metric="auc"
)
# Time Series Context
ts_ctx = context(
# Time series parameters
frequency="1H",
seasonality="daily",
trend_detection=True,
# Forecasting
forecast_horizon=24,
confidence_intervals=[0.8, 0.95],
# Anomaly detection
anomaly_threshold=2.5,
min_anomaly_duration="30min"
)
Intelligent Context Resolution Engine: Automatically analyzes graph dependencies and function signatures to inject the right parameters.
# graphflow/core/context.py
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Callable, get_type_hints, Set
from inspect import signature
import networkx as nx
import logging
@dataclass(frozen=True)
class Context:
"""Immutable context with automatic parameter resolution."""
# Core context categories
data: Dict[str, Any] = field(default_factory=dict)
processing: Dict[str, Any] = field(default_factory=dict)
ml: Dict[str, Any] = field(default_factory=dict)
infrastructure: Dict[str, Any] = field(default_factory=dict)
runtime: Dict[str, Any] = field(default_factory=dict)
def evolve(self, **updates) -> Context:
"""Create new context with updates."""
new_data = {**self.data, **updates.get('data', {})}
new_processing = {**self.processing, **updates.get('processing', {})}
new_ml = {**self.ml, **updates.get('ml', {})}
new_infra = {**self.infrastructure, **updates.get('infrastructure', {})}
new_runtime = {**self.runtime, **updates.get('runtime', {})}
# Auto-categorize based on parameter name patterns
for key, value in updates.items():
if key not in ['data', 'processing', 'ml', 'infrastructure', 'runtime']:
category = self._auto_categorize_param(key)
if category == 'processing':
new_processing[key] = value
elif category == 'ml':
new_ml[key] = value
elif category == 'infrastructure':
new_infra[key] = value
elif category == 'runtime':
new_runtime[key] = value
else:
new_data[key] = value
return Context(
data=new_data,
processing=new_processing,
ml=new_ml,
infrastructure=new_infra,
runtime=new_runtime
)
def _auto_categorize_param(self, param_name: str) -> str:
"""Automatically categorize parameter based on name patterns."""
processing_patterns = ['_days', '_window', '_threshold', '_samples', '_size', 'lookback', 'batch']
ml_patterns = ['_col', 'target', 'model', 'feature', 'score', 'cv_', 'test_']
infra_patterns = ['_uri', '_config', 'cache', 'memory', 'timeout', 'workers']
runtime_patterns = ['env', 'debug', 'verbose', 'log_']
param_lower = param_name.lower()
if any(pattern in param_lower for pattern in processing_patterns):
return 'processing'
elif any(pattern in param_lower for pattern in ml_patterns):
return 'ml'
elif any(pattern in param_lower for pattern in infra_patterns):
return 'infrastructure'
elif any(pattern in param_lower for pattern in runtime_patterns):
return 'runtime'
else:
return 'data'
def resolve_for_function(self, func: Callable) -> Dict[str, Any]:
"""Resolve context parameters for a specific function signature."""
sig = signature(func)
type_hints = get_type_hints(func)
resolved = {}
# Flatten all context parameters
all_params = {
**self.data,
**self.processing,
**self.ml,
**self.infrastructure,
**self.runtime
}
for param_name, param in sig.parameters.items():
# Skip data parameters (DataFrames, datasets, etc.)
if self._is_data_parameter(param_name, type_hints.get(param_name)):
continue
if param_name in all_params:
resolved[param_name] = all_params[param_name]
elif param.default is not param.empty:
# Use default value if no context value provided
resolved[param_name] = param.default
else:
# Parameter needed but not found in context
logging.warning(f"Parameter {param_name} not found in context for {func.__name__}")
return resolved
def _is_data_parameter(self, param_name: str, param_type: Any) -> bool:
"""Check if parameter is a data parameter (DataFrame, etc.)."""
data_param_names = ['df', 'data', 'dataframe', 'dataset']
if param_name.lower() in data_param_names:
return True
# Check type hints for DataFrame types
if param_type:
type_str = str(param_type)
if any(df_type in type_str for df_type in ['DataFrame', 'LazyFrame', 'Dataset']):
return True
return False
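# Example of the resolution behavior (hypothetical usage; the context()
# factory is defined at the bottom of this module):
#
#   ctx = context(lookback_days=30, target_col="churn")  # auto-categorized
#
#   def node_fn(df, lookback_days: int, target_col: str = "churn"):
#       ...
#
#   ctx.resolve_for_function(node_fn)
#   # -> {'lookback_days': 30, 'target_col': 'churn'}   ('df' skipped as data)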
class ContextAnalyzer:
"""Analyzes pipeline graph to automatically detect context dependencies."""
def __init__(self, graph: nx.DiGraph, nodes: Dict[str, Any]):
self.graph = graph
self.nodes = nodes
def analyze_context_dependencies(self) -> Dict[str, Set[str]]:
"""Analyze which context parameters each node actually needs."""
dependencies = {}
for node_name, node_spec in self.nodes.items():
func = node_spec.func
sig = signature(func)
# Extract parameter names (excluding data parameters)
context_params = set()
for param_name, param in sig.parameters.items():
if not self._is_data_parameter(param_name):
context_params.add(param_name)
dependencies[node_name] = context_params
return dependencies
def _is_data_parameter(self, param_name: str) -> bool:
"""Check if parameter name indicates a data parameter."""
data_indicators = ['df', 'data', 'dataframe', 'dataset', '_df', '_data']
return any(indicator in param_name.lower() for indicator in data_indicators)
def get_context_flow_graph(self) -> nx.DiGraph:
"""Create a graph showing context parameter flow between nodes."""
context_graph = nx.DiGraph()
dependencies = self.analyze_context_dependencies()
# Add nodes with their context dependencies
for node_name, context_params in dependencies.items():
context_graph.add_node(node_name, context_deps=context_params)
# Add edges for data dependencies (from original graph)
for edge in self.graph.edges():
context_graph.add_edge(edge[0], edge[1])
return context_graph
# Convenience factory with auto-categorization
def context(**kwargs) -> Context:
"""Create context with automatic parameter categorization."""
    return Context().evolve(**kwargs)
Automatic Context Detection: Node decorators automatically analyze function signatures - no manual dependency specification needed.
# graphflow/core/decorators.py
from functools import wraps
from typing import List, Optional, Callable, Any, Dict
import inspect
import logging

from .context import Context
from .dataset import Dataset
class NodeSpec:
"""Specification for a processing node with automatic context detection."""
def __init__(self, func: Callable, inputs: List[Dataset], outputs: List[Dataset],
cache_ttl: Optional[str] = None,
validation: Optional[Dict[str, Any]] = None):
self.func = func
self.inputs = inputs
self.outputs = outputs
self.cache_ttl = cache_ttl
self.validation = validation or {}
self.name = func.__name__
# Automatically detect context dependencies
self.context_deps = self._detect_context_dependencies()
def _detect_context_dependencies(self) -> List[str]:
"""Automatically detect which context parameters this function needs."""
sig = inspect.signature(self.func)
context_params = []
for param_name, param in sig.parameters.items():
# Skip data parameters
if self._is_data_parameter(param_name):
continue
context_params.append(param_name)
return context_params
def _is_data_parameter(self, param_name: str) -> bool:
"""Check if parameter is a data parameter."""
data_indicators = ['df', 'data', 'dataframe', 'dataset', '_df', '_data']
return any(indicator in param_name.lower() for indicator in data_indicators)
def node(
inputs: List[Dataset] = None,
outputs: List[Dataset] = None,
cache_ttl: str = "24h",
validate_schema: bool = True,
validate_data_quality: bool = True,
executor: str = "auto",
retries: int = 2,
timeout: int = 3600,
tags: List[str] = None
):
"""
Decorator for processing nodes with automatic context detection.
No need to specify context_deps - they're automatically detected!
"""
def decorator(func: Callable) -> NodeSpec:
@wraps(func)
def wrapper(context: Context, *data_args, **data_kwargs):
# Automatically resolve context parameters for this function
context_params = context.resolve_for_function(func)
# Merge with any explicit kwargs
final_kwargs = {**context_params, **data_kwargs}
# Call original function with resolved parameters
result = func(*data_args, **final_kwargs)
# Apply post-processing (validation, profiling, etc.)
if validate_schema:
result = _validate_schema(result, func.__name__)
if validate_data_quality:
result = _validate_data_quality(result, func.__name__)
return result
# Create node specification with automatic context detection
spec = NodeSpec(
func=wrapper,
inputs=inputs or [],
outputs=outputs or [],
cache_ttl=cache_ttl
)
# Attach metadata
wrapper._graphflow_spec = spec
wrapper._original_func = func
wrapper._tags = tags or []
wrapper._executor = executor
wrapper._retries = retries
wrapper._timeout = timeout
return spec
return decorator
def _validate_schema(df, func_name: str):
    """Validate DataFrame schema consistency (placeholder)."""
    # A full implementation would compare the result against the node's
    # declared output schema; here we only guard against broken results.
    if df is None or len(df.columns) == 0:
        raise ValueError(f"{func_name} produced an empty or schema-less result")
    return df

def _validate_data_quality(df, func_name: str):
    """Validate data quality metrics (placeholder)."""
    # A full implementation would run null/range/uniqueness checks; here we
    # only flag fully-null columns as a data-quality warning.
    for col in df.columns[df.isna().all()]:
        logging.warning(f"{func_name}: column {col!r} is entirely null")
    return df
Flexible Dataset Handling: Supports multiple formats and automatic format detection.
# graphflow/core/dataset.py
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional, Tuple, List, Union
from pathlib import Path
@dataclass(frozen=True)
class Dataset:
"""Logical dataset with intelligent format handling."""
path: str
format: str = "auto" # auto-detect from path/content
partition_by: Tuple[str, ...] = ()
schema: Optional[dict] = None
metadata: dict = None
def __post_init__(self):
if self.metadata is None:
object.__setattr__(self, 'metadata', {})
def derived(self, suffix: str) -> Dataset:
"""Create derived dataset with suffix."""
return Dataset(
path=f"{self.path}{suffix}",
format=self.format,
partition_by=self.partition_by,
schema=self.schema,
metadata={**self.metadata, "derived_from": self.path}
)
def with_partitions(self, *partition_cols: str) -> Dataset:
"""Create dataset with different partitioning."""
return Dataset(
path=self.path,
format=self.format,
partition_by=tuple(partition_cols),
schema=self.schema,
metadata=self.metadata
)
def with_format(self, format: str) -> Dataset:
"""Create dataset with different format."""
return Dataset(
path=self.path,
format=format,
partition_by=self.partition_by,
schema=self.schema,
metadata=self.metadata
)
@property
def uri(self) -> str:
"""Full URI when combined with base_uri."""
return self.path
@property
def name(self) -> str:
"""Human-readable name."""
return Path(self.path).name
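    # Sketch of how format="auto" might be resolved (assumed heuristic: trust
    # the file extension, fall back to Parquet for extensionless lake paths):
    def resolved_format(self) -> str:
        if self.format != "auto":
            return self.format
        suffix = Path(self.path).suffix.lower().lstrip('.')
        known = {"parquet", "csv", "json", "feather", "arrow", "orc", "avro"}
        return suffix if suffix in known else "parquet"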
# Convenience factory
def dataset(path: str, *, format: str = "auto", partition_by: List[str] = None,
schema: dict = None, **metadata) -> Dataset:
"""Create dataset with optional metadata."""
return Dataset(
path=path,
format=format,
partition_by=tuple(partition_by or []),
schema=schema,
metadata=metadata
    )
Intelligent Pipeline Orchestration: Automatically detects dependencies and provides rich graph export capabilities.
# graphflow/core/pipeline.py
from __future__ import annotations
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Union
import networkx as nx
from .context import Context, ContextAnalyzer
from .decorators import NodeSpec
from .dataset import Dataset
from .scheduler import Scheduler
from .graph_exporter import GraphExporter
@dataclass
class Pipeline:
"""Intelligent data processing pipeline with automatic dependency detection."""
name: str
base_uri: str
context: Context
_graph: nx.DiGraph = field(default_factory=nx.DiGraph, init=False)
_nodes: Dict[str, NodeSpec] = field(default_factory=dict, init=False)
_scheduler: Optional[Scheduler] = field(default=None, init=False)
_context_analyzer: Optional[ContextAnalyzer] = field(default=None, init=False)
_graph_exporter: Optional[GraphExporter] = field(default=None, init=False)
def __post_init__(self):
self._scheduler = Scheduler(self)
self._context_analyzer = ContextAnalyzer(self._graph, self._nodes)
self._graph_exporter = GraphExporter(self)
def add_node(self, node_spec: NodeSpec) -> None:
"""Add a single node to the pipeline."""
if node_spec.name in self._nodes:
raise ValueError(f"Node {node_spec.name} already exists")
self._nodes[node_spec.name] = node_spec
self._graph.add_node(node_spec.name, spec=node_spec)
# Auto-detect dependencies from inputs/outputs
self._auto_detect_dependencies(node_spec)
# Validate DAG property
if not nx.is_directed_acyclic_graph(self._graph):
raise ValueError("Pipeline must be a DAG")
# Update context analyzer
self._context_analyzer = ContextAnalyzer(self._graph, self._nodes)
def add_nodes(self, node_specs: List[NodeSpec]) -> None:
"""Add multiple nodes to the pipeline."""
for spec in node_specs:
self.add_node(spec)
def _auto_detect_dependencies(self, node_spec: NodeSpec) -> None:
"""Automatically detect dependencies based on input/output datasets."""
for input_dataset in node_spec.inputs:
# Find nodes that output to this input
for other_name, other_spec in self._nodes.items():
if other_name == node_spec.name:
continue
for output_dataset in other_spec.outputs:
if self._datasets_match(input_dataset, output_dataset):
self._graph.add_edge(other_name, node_spec.name)
def _datasets_match(self, ds1: Dataset, ds2: Dataset) -> bool:
"""Check if two datasets represent the same logical data."""
return ds1.path == ds2.path or ds1.path in ds2.metadata.get('derived_from', '')
def export_graph(self,
format: str = "html",
output: str = None,
include_context: bool = True,
include_data_flow: bool = True,
show_cache_status: bool = True,
**kwargs) -> str:
"""
Export pipeline graph in various formats.
Formats: html, graphviz, mermaid, json, yaml
"""
return self._graph_exporter.export(
format=format,
output=output,
include_context=include_context,
include_data_flow=include_data_flow,
show_cache_status=show_cache_status,
**kwargs
)
def inspector(self) -> 'PipelineInspector':
"""Get pipeline inspector for real-time graph analysis."""
return PipelineInspector(self)
def set_node_executor(self, node_name: str, executor: str) -> None:
"""Set specific executor for a node (for hybrid execution)."""
if node_name not in self._nodes:
raise ValueError(f"Node {node_name} not found")
self._nodes[node_name]._executor = executor
def run(self,
targets: List[Union[str, NodeSpec]] = None,
where: Dict[str, Any] = None,
executor: str = "auto",
executor_config: Dict[str, Any] = None,
execution_hints: Dict[str, Any] = None,
max_workers: int = None,
validate_data: bool = True,
profile_data: bool = False,
cache_policy: str = "read_write",
dry_run: bool = False) -> PipelineResult:
"""Execute the pipeline with intelligent scheduling."""
target_names = []
if targets:
for target in targets:
if isinstance(target, str):
target_names.append(target)
elif isinstance(target, NodeSpec):
target_names.append(target.name)
else:
target_names.append(str(target))
else:
# Run all nodes if no targets specified
target_names = list(self._nodes.keys())
return self._scheduler.execute(
targets=target_names,
where=where or {},
executor=executor,
executor_config=executor_config or {},
execution_hints=execution_hints or {},
max_workers=max_workers,
validate_data=validate_data,
profile_data=profile_data,
cache_policy=cache_policy,
dry_run=dry_run
)
def describe(self) -> Dict[str, Any]:
"""Get pipeline description and statistics."""
context_deps = self._context_analyzer.analyze_context_dependencies()
return {
"name": self.name,
"nodes": len(self._nodes),
"dependencies": self._graph.number_of_edges(),
"complexity": nx.density(self._graph),
"longest_path": len(nx.dag_longest_path(self._graph)) if self._nodes else 0,
"context": {
"data_params": len(self.context.data),
"processing_params": len(self.context.processing),
"ml_params": len(self.context.ml),
},
"context_dependencies": {
node: list(deps) for node, deps in context_deps.items()
}
}
class PipelineInspector:
"""Real-time pipeline inspection and analysis."""
def __init__(self, pipeline: Pipeline):
self.pipeline = pipeline
def get_graph_state(self) -> 'GraphState':
"""Get current graph state."""
return GraphState(
node_count=len(self.pipeline._nodes),
edge_count=self.pipeline._graph.number_of_edges(),
cached_nodes=self._get_cached_nodes(),
dirty_nodes=self._get_dirty_nodes()
)
def _get_cached_nodes(self) -> List[str]:
"""Get list of cached nodes."""
# Implementation would check cache status
return []
def _get_dirty_nodes(self) -> List[str]:
"""Get list of dirty nodes that need recomputation."""
# Implementation would check dirty status
return []
@property
def nodes(self) -> Dict[str, 'NodeInfo']:
"""Get detailed information about each node."""
node_info = {}
context_deps = self.pipeline._context_analyzer.analyze_context_dependencies()
for node_name, node_spec in self.pipeline._nodes.items():
node_info[node_name] = NodeInfo(
name=node_name,
status="ready", # Would be computed from cache/execution status
context_dependencies=list(context_deps.get(node_name, set())),
data_dependencies=list(self.pipeline._graph.predecessors(node_name)),
cache_key="cache_key_placeholder",
last_execution=None
)
return node_info
def filter_nodes(self, **filters) -> 'SubGraph':
"""Filter nodes based on criteria."""
filtered_nodes = {}
for node_name, node_info in self.nodes.items():
include = True
if 'status' in filters and node_info.status != filters['status']:
include = False
if 'context_deps' in filters:
required_deps = filters['context_deps']
if not any(dep in node_info.context_dependencies for dep in required_deps):
include = False
if include:
filtered_nodes[node_name] = node_info
return SubGraph(filtered_nodes, self.pipeline)
@dataclass
class GraphState:
node_count: int
edge_count: int
cached_nodes: List[str]
dirty_nodes: List[str]
@dataclass
class NodeInfo:
name: str
status: str
context_dependencies: List[str]
data_dependencies: List[str]
cache_key: str
last_execution: Optional[str]
class SubGraph:
def __init__(self, nodes: Dict[str, NodeInfo], pipeline: Pipeline):
self.nodes = nodes
self.pipeline = pipeline
def export(self, output: str, format: str = "html") -> None:
"""Export filtered subgraph."""
# Create subgraph and export
pass
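# The Scheduler and GraphExporter imported at the top of this module are not
# specified elsewhere in this PRD. Minimal sketches of both follow (illustrative
# only; real implementations would add input loading, caching, retries, and the
# richer export formats shown in the usage examples). In the package they would
# live in scheduler.py and graph_exporter.py.
import time

class Scheduler:
    """Runs the requested nodes (and their ancestors) in topological order."""

    def __init__(self, pipeline: 'Pipeline'):
        self.pipeline = pipeline

    def execute(self, targets: List[str], **options) -> 'PipelineResult':
        graph = self.pipeline._graph
        # Keep only target nodes and everything upstream of them
        needed = {t for t in targets if t in graph}
        for t in list(needed):
            needed |= nx.ancestors(graph, t)
        start = time.perf_counter()
        executed = []
        for name in nx.topological_sort(graph):
            if name not in needed:
                continue
            spec = self.pipeline._nodes[name]
            # Full implementation: load inputs, call spec.func(context, ...),
            # persist outputs, and consult the cache before recomputing.
            executed.append(name)
        return PipelineResult(
            pipeline_name=self.pipeline.name,
            executed_nodes=executed,
            execution_time=time.perf_counter() - start,
            cache_hits=0,
            cache_misses=len(executed),
            artifacts_created=[],
            validation_results={},
        )

class GraphExporter:
    """Exports the pipeline graph; only Mermaid is shown in this sketch."""

    def __init__(self, pipeline: 'Pipeline'):
        self.pipeline = pipeline

    def export(self, format: str = "mermaid", output: str = None, **options) -> str:
        if format != "mermaid":
            raise NotImplementedError(f"{format!r} is not covered by this sketch")
        lines = ["graph TD"]
        lines += [f"    {src} --> {dst}" for src, dst in self.pipeline._graph.edges()]
        content = "\n".join(lines)
        if output:
            with open(output, "w") as fh:
                fh.write(content)
        return content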
@dataclass
class PipelineResult:
"""Result of pipeline execution with metadata."""
pipeline_name: str
executed_nodes: List[str]
execution_time: float
cache_hits: int
cache_misses: int
artifacts_created: List[str]
validation_results: Dict[str, Any]
profiling_results: Optional[Dict[str, Any]] = None
def summary(self) -> str:
"""Human-readable execution summary."""
cache_rate = self.cache_hits / (self.cache_hits + self.cache_misses) * 100 if (self.cache_hits + self.cache_misses) > 0 else 0
return f"""
Pipeline: {self.pipeline_name}
Nodes executed: {len(self.executed_nodes)}
Execution time: {self.execution_time:.2f}s
Cache hit rate: {cache_rate:.1f}%
Artifacts created: {len(self.artifacts_created)}
""".strip()Multi-Backend Execution: Supports local, distributed, and cloud execution.
# graphflow/core/executors.py
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Callable
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import logging
class Executor(ABC):
    """Base executor interface."""

    @abstractmethod
    def execute_node(self, node_spec: 'NodeSpec', context: 'Context',
                     inputs: Dict[str, Any]) -> Any:
        """Execute a single node."""
        pass

    @abstractmethod
    def execute_batch(self, node_batch: List['NodeSpec'],
                      context: 'Context') -> Dict[str, Any]:
        """Execute a batch of nodes."""
        pass

    @abstractmethod
    def get_resource_usage(self) -> Dict[str, float]:
        """Get current resource usage."""
        pass
class LocalExecutor(Executor):
    """Local single-threaded executor."""

    def execute_node(self, node_spec, context, inputs):
        return node_spec.func(context, **inputs)

    def execute_batch(self, node_batch, context):
        results = {}
        for spec in node_batch:
            # Load inputs for this node
            inputs = self._load_inputs(spec, context)
            results[spec.name] = self.execute_node(spec, context, inputs)
        return results

    def get_resource_usage(self):
        return {"cpu": 0.0, "memory": 0.0}

    def _load_inputs(self, spec, context):
        # Placeholder: a real implementation would resolve each input Dataset
        # against the pipeline base_uri and read it (e.g., via pd.read_parquet)
        return {}
class ThreadedExecutor(Executor):
    """Thread-based parallel executor.

    Named ThreadedExecutor so it does not shadow the imported
    concurrent.futures.ThreadPoolExecutor it delegates to.
    """

    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers
        self._pool = ThreadPoolExecutor(max_workers=max_workers)

    def execute_node(self, node_spec, context, inputs):
        return node_spec.func(context, **inputs)

    def execute_batch(self, node_batch, context):
        futures = {}
        for spec in node_batch:
            inputs = self._load_inputs(spec, context)
            futures[spec.name] = self._pool.submit(
                self.execute_node, spec, context, inputs)
        return {name: future.result() for name, future in futures.items()}

    def get_resource_usage(self):
        return {"cpu": 0.0, "memory": 0.0}

    def _load_inputs(self, spec, context):
        return {}  # Same input-loading hook as LocalExecutor
class RayExecutor(Executor):
    """Ray-based distributed executor."""

    def __init__(self, ray_address: Optional[str] = None):
        try:
            import ray
            if not ray.is_initialized():
                ray.init(address=ray_address)
            self.ray = ray
        except ImportError:
            raise ImportError("Ray is required for RayExecutor")

    def execute_node(self, node_spec, context, inputs):
        @self.ray.remote
        def remote_node_exec(spec, ctx, inp):
            return spec.func(ctx, **inp)
        future = remote_node_exec.remote(node_spec, context, inputs)
        return self.ray.get(future)

    def execute_batch(self, node_batch, context):
        # Load inputs on the driver, then fan nodes out as independent Ray tasks
        @self.ray.remote
        def remote_node_exec(spec, ctx, inp):
            return spec.func(ctx, **inp)
        futures = {
            spec.name: remote_node_exec.remote(spec, context, self._load_inputs(spec, context))
            for spec in node_batch
        }
        return {name: self.ray.get(future) for name, future in futures.items()}

    def get_resource_usage(self):
        return {"cpu": 0.0, "memory": 0.0}

    def _load_inputs(self, spec, context):
        return {}  # Same input-loading hook as LocalExecutor
class CloudExecutor(Executor):
    """Base class for cloud executors (Vertex AI, AWS Batch, etc.)."""

    def __init__(self, cloud_config: Dict[str, Any]):
        self.cloud_config = cloud_config

    @abstractmethod
    def submit_remote_job(self, node_spec, context, inputs) -> str:
        """Submit job to cloud service and return job ID."""
        pass

    @abstractmethod
    def wait_for_completion(self, job_id: str) -> Any:
        """Wait for job completion and return results."""
        pass
# Executor registry
EXECUTORS = {
    "local": LocalExecutor,
    "threads": ThreadedExecutor,
    "ray": RayExecutor,
}

def get_executor(name: str, **kwargs) -> Executor:
    """Get executor by name."""
    if name not in EXECUTORS:
        raise ValueError(f"Unknown executor: {name}")
    return EXECUTORS[name](**kwargs)
Scenario: Build comprehensive customer features for churn prediction using transaction, behavioral, and demographic data.
from graphflow import Pipeline, node, context, dataset
import pandas as pd
import numpy as np
# Define feature engineering context
ctx = context(
# Time windows for feature computation
short_window=7,
medium_window=30,
long_window=90,
# Business rules
min_transaction_amount=5.0,
high_value_threshold=500.0,
churn_definition_days=60,
# ML parameters
target_col="will_churn",
categorical_encoding_threshold=50,
# Infrastructure
cache_ttl="24h",
validation_strict=True
)
# Create pipeline
pipeline = Pipeline(
name="customer_churn_features",
base_uri="s3://ml-features/churn/",
context=ctx
)
# Raw datasets
customers = dataset("bronze/customers", partition_by=["signup_month"])
transactions = dataset("bronze/transactions", partition_by=["date", "customer_id"])
website_events = dataset("bronze/website_events", partition_by=["date"])
support_tickets = dataset("bronze/support_tickets", partition_by=["created_date"])
# Feature 1: Customer Demographics & Account Features
@node(
inputs=[customers],
outputs=[dataset("silver/customer_demographics")]
)
def engineer_customer_demographics(df: pd.DataFrame,
categorical_encoding_threshold: int) -> pd.DataFrame:
"""Engineer demographic and account-based features."""
# Age-based features
df['account_age_days'] = (pd.Timestamp.now() - pd.to_datetime(df['signup_date'])).dt.days
df['account_age_months'] = df['account_age_days'] / 30.44
df['is_new_customer'] = df['account_age_days'] < 30
# Geographic features
df['is_major_city'] = df['city'].isin(['New York', 'Los Angeles', 'Chicago', 'Houston'])
# Categorical encoding based on cardinality
for col in ['state', 'city', 'acquisition_channel']:
if col in df.columns:
cardinality = df[col].nunique()
if cardinality > categorical_encoding_threshold:
# High cardinality: use frequency encoding
freq_map = df[col].value_counts(normalize=True).to_dict()
df[f'{col}_frequency'] = df[col].map(freq_map)
else:
# Low cardinality: use one-hot encoding
dummies = pd.get_dummies(df[col], prefix=col)
df = pd.concat([df, dummies], axis=1)
return df
# Feature 2: Transaction-based Features
@node(
    inputs=[transactions],
    outputs=[dataset("silver/transaction_features", partition_by=["customer_id"])]
    # No context_deps needed - window and threshold params are auto-detected
)
def engineer_transaction_features(df: pd.DataFrame,
short_window: int,
medium_window: int,
long_window: int,
min_transaction_amount: float) -> pd.DataFrame:
"""Comprehensive transaction-based feature engineering."""
# Filter valid transactions
df = df[df['amount'] >= min_transaction_amount].copy()
df['transaction_date'] = pd.to_datetime(df['transaction_date'])
features = []
for customer_id in df['customer_id'].unique():
cust_data = df[df['customer_id'] == customer_id].sort_values('transaction_date')
if len(cust_data) < 3: # Skip customers with too few transactions
continue
cust_features = {'customer_id': customer_id}
            # Window features for each time period (time-based, not row-based)
            for window_days, window_name in [(short_window, 'short'),
                                             (medium_window, 'medium'),
                                             (long_window, 'long')]:
                cutoff = cust_data['transaction_date'].max() - pd.Timedelta(days=window_days)
                window_data = cust_data[cust_data['transaction_date'] >= cutoff]
                # Transaction frequency features
                cust_features[f'transaction_count_{window_name}'] = len(window_data)
                cust_features[f'avg_days_between_transactions_{window_name}'] = (
                    window_data['transaction_date'].diff().dt.days.mean()
                )
                # Amount-based features
                window_amounts = window_data['amount']
                cust_features[f'total_amount_{window_name}'] = window_amounts.sum()
                cust_features[f'avg_amount_{window_name}'] = window_amounts.mean()
                cust_features[f'std_amount_{window_name}'] = window_amounts.std()
                cust_features[f'max_amount_{window_name}'] = window_amounts.max()
                # Behavioral patterns
                cust_features[f'unique_categories_{window_name}'] = (
                    window_data['category'].nunique()
                )
                cust_features[f'weekend_transaction_ratio_{window_name}'] = (
                    window_data['transaction_date'].dt.weekday.isin([5, 6]).mean()
                )
# Trend features (comparing windows)
cust_features['amount_trend_short_vs_medium'] = (
cust_features['avg_amount_short'] / (cust_features['avg_amount_medium'] + 1e-6)
)
cust_features['frequency_trend_short_vs_long'] = (
cust_features['transaction_count_short'] / (cust_features['transaction_count_long'] + 1e-6)
)
features.append(cust_features)
return pd.DataFrame(features)
# Feature 3: Behavioral/Engagement Features
@node(
    inputs=[website_events],
    outputs=[dataset("silver/behavioral_features", partition_by=["customer_id"])]
    # medium_window and long_window are auto-detected from the signature
)
def engineer_behavioral_features(df: pd.DataFrame,
medium_window: int,
long_window: int) -> pd.DataFrame:
"""Web engagement and behavioral features."""
df['event_date'] = pd.to_datetime(df['event_date'])
features = []
for customer_id in df['customer_id'].unique():
cust_events = df[df['customer_id'] == customer_id].sort_values('event_date')
cust_features = {'customer_id': customer_id}
        for window_days, window_name in [(medium_window, 'medium'), (long_window, 'long')]:
            cutoff = cust_events['event_date'].max() - pd.Timedelta(days=window_days)
            recent_events = cust_events[cust_events['event_date'] >= cutoff]
# Session-based features
cust_features[f'total_sessions_{window_name}'] = recent_events['session_id'].nunique()
cust_features[f'avg_session_duration_{window_name}'] = (
recent_events.groupby('session_id')['session_duration'].first().mean()
)
# Page engagement
cust_features[f'total_page_views_{window_name}'] = len(recent_events)
cust_features[f'unique_pages_viewed_{window_name}'] = recent_events['page_url'].nunique()
# Product interaction
product_events = recent_events[recent_events['event_type'] == 'product_view']
cust_features[f'products_viewed_{window_name}'] = product_events['product_id'].nunique()
# Cart and checkout behavior
cart_events = recent_events[recent_events['event_type'] == 'add_to_cart']
cust_features[f'cart_additions_{window_name}'] = len(cart_events)
checkout_events = recent_events[recent_events['event_type'] == 'checkout_start']
cust_features[f'checkout_starts_{window_name}'] = len(checkout_events)
# Engagement ratios
if cust_features[f'total_sessions_{window_name}'] > 0:
cust_features[f'pages_per_session_{window_name}'] = (
cust_features[f'total_page_views_{window_name}'] /
cust_features[f'total_sessions_{window_name}']
)
features.append(cust_features)
return pd.DataFrame(features)
# Feature 4: Customer Support Features
@node(
    inputs=[support_tickets],
    outputs=[dataset("silver/support_features", partition_by=["customer_id"])]
    # long_window is auto-detected from the signature
)
def engineer_support_features(df: pd.DataFrame, long_window: int) -> pd.DataFrame:
"""Customer support interaction features."""
df['created_date'] = pd.to_datetime(df['created_date'])
features = []
for customer_id in df['customer_id'].unique():
tickets = df[df['customer_id'] == customer_id].sort_values('created_date')
        cutoff = tickets['created_date'].max() - pd.Timedelta(days=long_window)
        recent_tickets = tickets[tickets['created_date'] >= cutoff]
cust_features = {
'customer_id': customer_id,
'total_tickets': len(recent_tickets),
'avg_resolution_time': recent_tickets['resolution_time_hours'].mean(),
'unresolved_tickets': len(recent_tickets[recent_tickets['status'] == 'open']),
            'escalated_tickets': int(recent_tickets['escalated'].sum()),
'satisfaction_score': recent_tickets['satisfaction_rating'].mean(),
}
# Categorize ticket types
for ticket_type in recent_tickets['category'].unique():
cust_features[f'tickets_{ticket_type.lower()}'] = len(
recent_tickets[recent_tickets['category'] == ticket_type]
)
features.append(cust_features)
return pd.DataFrame(features)
# Feature 5: Final Feature Matrix Assembly
@node(
    inputs=[
        dataset("silver/customer_demographics"),
        dataset("silver/transaction_features"),
        dataset("silver/behavioral_features"),
        dataset("silver/support_features")
    ],
    outputs=[dataset("gold/customer_feature_matrix")]
    # target_col and churn_definition_days are auto-detected from the signature
)
def create_final_feature_matrix(demo_df: pd.DataFrame,
trans_df: pd.DataFrame,
behav_df: pd.DataFrame,
support_df: pd.DataFrame,
target_col: str,
churn_definition_days: int) -> pd.DataFrame:
"""Assemble final feature matrix with target variable."""
# Start with demographics as base
feature_matrix = demo_df.copy()
# Merge all feature sets
for df_to_merge in [trans_df, behav_df, support_df]:
feature_matrix = feature_matrix.merge(
df_to_merge,
on='customer_id',
how='left'
)
# Fill missing values with appropriate defaults
numeric_cols = feature_matrix.select_dtypes(include=[np.number]).columns
feature_matrix[numeric_cols] = feature_matrix[numeric_cols].fillna(0)
# Create target variable (churn prediction)
# In real scenario, this would be computed based on future behavior
feature_matrix[target_col] = np.random.choice([0, 1], size=len(feature_matrix), p=[0.8, 0.2])
# Feature engineering metadata
feature_matrix['feature_generation_date'] = pd.Timestamp.now()
feature_matrix['total_features'] = len(feature_matrix.columns) - 3 # Exclude ID, target, date
return feature_matrix
# Assemble and run the pipeline
pipeline.add_nodes([
engineer_customer_demographics,
engineer_transaction_features,
engineer_behavioral_features,
engineer_support_features,
create_final_feature_matrix
])
# Execute with intelligent scheduling
result = pipeline.run(
targets=["gold/customer_feature_matrix"],
executor="ray",
max_workers=16,
    validate_data=True,
profile_data=True
)
print(result.summary())
Scenario: Real-time credit risk features for loan applications.
from graphflow import Pipeline, node, context, dataset
from graphflow.patterns import time_series_processing, aggregation_processing
import pandas as pd
import pandas as pd
# Risk assessment context
risk_ctx = context(
# Risk windows
immediate_window=7,
short_risk_window=30,
medium_risk_window=90,
long_risk_window=365,
# Risk thresholds
high_risk_amount=10000,
default_rate_threshold=0.05,
credit_score_threshold=650,
# Regulatory requirements
pii_anonymization=True,
audit_trail=True,
# Model parameters
target_col="default_risk_score",
feature_importance_threshold=0.01
)
pipeline = Pipeline(
name="credit_risk_features",
base_uri="s3://risk-models/features/",
context=risk_ctx
)
# Data sources
applications = dataset("bronze/loan_applications", partition_by=["application_date"])
credit_history = dataset("bronze/credit_bureau", partition_by=["customer_id"])
bank_transactions = dataset("bronze/bank_statements", partition_by=["date", "account_id"])
employment_data = dataset("bronze/employment_verification")
@node(
inputs=[applications, credit_history],
outputs=[dataset("silver/credit_profile_features")]
)
@aggregation_processing(
group_by="customer_id",
agg_specs={
"credit_history": {
"payment_amount": ["sum", "mean", "std"],
"days_late": ["max", "mean", "count"],
"credit_utilization": ["mean", "max"]
}
},
time_windows=["30d", "90d", "365d"]
)
def compute_credit_profile_features(apps_df: pd.DataFrame,
credit_df: pd.DataFrame) -> pd.DataFrame:
"""Credit bureau and application-based risk features."""
# Implementation handled by aggregation_features decorator
pass
@node(
inputs=[bank_transactions],
outputs=[dataset("silver/cash_flow_features", partition_by=["customer_id"])]
)
@time_series_processing(
timestamp_col="transaction_date",
group_by="customer_id",
windows=["7d", "30d", "90d"],
aggregations=["sum", "mean", "std", "count"]
)
def compute_cash_flow_features(df: pd.DataFrame,
high_risk_amount: float) -> pd.DataFrame:
"""Bank transaction cash flow analysis."""
# Additional custom logic beyond the decorator
df['is_high_risk_transaction'] = df['amount'] > high_risk_amount
    df['transaction_velocity'] = (
        df.groupby('customer_id')['amount']
        .transform(lambda s: s.rolling(7, min_periods=1).count())
    )
return df
# Run risk pipeline
result = pipeline.run(
targets=["silver/credit_profile_features", "silver/cash_flow_features"],
executor="threads",
max_workers=8,
    validate_data=True
)
Deliverables:
- Context system with intelligent parameter resolution
- Node decorator with basic validation
- Dataset abstraction with format detection
- Local executor implementation
- Basic pipeline orchestration
Package Structure:
graphflow/
├── __init__.py # Public API exports
├── core/
│ ├── __init__.py
│ ├── context.py # Automatic context system
│ ├── decorators.py # @node decorator with auto-detection
│ ├── dataset.py # Dataset abstraction
│ ├── pipeline.py # Pipeline orchestration with graph exports
│ ├── executors.py # Execution backends
│ ├── scheduler.py # Task scheduling
│ └── graph_exporter.py # Graph export utilities
├── patterns/ # Processing patterns (not just ML)
│ ├── __init__.py
│ ├── time_series.py # Time series processing
│ ├── aggregation.py # Data aggregation
│ ├── transformation.py # Data transformation
│ ├── validation.py # Data validation
│ └── ml_features.py # ML-specific feature engineering
├── io/ # Data I/O adapters
│ ├── __init__.py
│ ├── parquet.py # Parquet adapter
│ ├── arrow.py # Arrow adapter
│ └── formats.py # Format detection
├── cache/ # Caching system
│ ├── __init__.py
│ ├── manager.py # Cache manager
│ ├── stores.py # Cache stores (FS, Redis, S3)
│ └── hashing.py # Content addressing
├── validation/ # Data validation
│ ├── __init__.py
│ ├── schema.py # Schema validation
│ ├── quality.py # Data quality checks
│ └── profiling.py # Data profiling
├── observability/ # Monitoring & reporting
│ ├── __init__.py
│ ├── logging.py # Structured logging
│ ├── metrics.py # Prometheus metrics
│ ├── tracing.py # OpenTelemetry tracing
│ └── reporting.py # HTML reports
├── integrations/ # External integrations
│ ├── __init__.py
│ ├── feast.py # Feast feature store
│ ├── sagemaker.py # AWS SageMaker
│ └── vertex.py # Google Vertex AI
├── cli/ # Command line interface
│ ├── __init__.py
│ ├── main.py # CLI entry point
│ ├── run.py # Pipeline execution
│ └── visualize.py # Pipeline visualization
└── tests/ # Test suite
├── unit/ # Unit tests
├── integration/ # Integration tests
└── examples/ # Example pipelines
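One plausible content-addressing scheme for cache/hashing.py, built on the blake3 and cloudpickle dependencies listed below (a sketch; the exact key composition is an assumption):
# graphflow/cache/hashing.py (sketch)
import blake3
import cloudpickle

def node_cache_key(node_name: str, func, context_params: dict, input_fingerprints: dict) -> str:
    """Deterministic key: changes when code, injected context, or upstream data change."""
    payload = cloudpickle.dumps((
        node_name,
        func.__code__.co_code,               # invalidate on function body changes
        sorted(context_params.items()),      # invalidate on injected context changes
        sorted(input_fingerprints.items()),  # invalidate on upstream data changes
    ))
    return blake3.blake3(payload).hexdigest()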
Deliverables:
- Ray/Dask distributed execution
- Advanced caching with TTL and eviction
- Processing pattern decorators
- Data validation and profiling
- HTML reporting and visualization
Deliverables:
- Cloud executors (Vertex AI, AWS Batch)
- Feature store integrations
- CLI interface
- Comprehensive monitoring
- Performance optimizations
Deliverables:
- Advanced security and governance
- Multi-tenant support
- API server for pipeline management
- Advanced scheduling and orchestration
- Enterprise integrations
Core Dependencies:
# setup.py or pyproject.toml
dependencies = [
"pandas>=2.0.0",
"polars>=0.19.0",
"pyarrow>=12.0.0",
"networkx>=3.0",
"fsspec>=2023.6.0",
"pydantic>=2.0.0",
"loguru>=0.7.0",
"cloudpickle>=2.2.0",
"blake3>=0.3.0", # Fast hashing
"typer>=0.9.0", # CLI framework
]
# Optional dependencies
extras_require = {
"distributed": ["ray>=2.6.0", "dask>=2023.8.0"],
"cloud": ["google-cloud-aiplatform", "boto3", "azure-ml"],
"visualization": ["pyvis>=0.3.0", "graphviz>=0.20.0", "jinja2>=3.1.0"],
"feature-store": ["feast>=0.32.0"],
"monitoring": ["prometheus-client>=0.17.0", "opentelemetry-api>=1.19.0"],
"validation": ["great-expectations>=0.17.0", "pandera>=0.16.0"],
"all": ["ray", "dask", "feast", "great-expectations", "pyvis", "prometheus-client"]
}
| Metric | Target | Notes |
|---|---|---|
| Context Resolution | < 1ms per function | Parameter injection overhead |
| Feature Execution | 10K+ rows/sec/core | Single-threaded performance |
| Cache Lookup | < 10ms | Including hash computation |
| Pipeline Startup | < 5 seconds | DAG analysis and planning |
| Memory Overhead | < 100MB base | Framework footprint |
| Distributed Scaling | Linear to 100+ nodes | Ray/Dask scaling |
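The context-resolution target can be sanity-checked with a micro-benchmark along these lines (a sketch against resolve_for_function from the context module; names assumed):
import time

def ms_per_resolution(ctx, func, iterations=10_000) -> float:
    """Average context-resolution cost in milliseconds per call."""
    start = time.perf_counter()
    for _ in range(iterations):
        ctx.resolve_for_function(func)
    return (time.perf_counter() - start) / iterations * 1000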
Code Quality:
- 90%+ test coverage
- Type hints for all public APIs
- Black code formatting
- Pylint score > 8.5
- No security vulnerabilities (Bandit)
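A local quality gate covering these checks might look like the following (hypothetical invocation; thresholds taken from the list above):
black graphflow/
pylint graphflow/ --fail-under=8.5
bandit -r graphflow/
pytest --cov=graphflow --cov-fail-under=90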
Performance:
- Benchmark suite for core operations
- Memory profiling for large datasets
- Distributed execution tests
- Cache efficiency measurements
Documentation:
- API documentation (Sphinx)
- User guide with examples
- Architecture decision records
- Performance tuning guide
- Time to first feature: < 15 minutes from install to running pipeline
- Learning curve: Data scientist productive in < 2 hours
- API satisfaction: 90%+ positive feedback on ease of use
- Execution efficiency: 80%+ cache hit rate in typical workflows
- Scaling: Linear performance scaling to 100+ cores
- Resource utilization: < 20% overhead vs direct pandas/polars
- Reliability: 99.9% successful pipeline executions
- Observability: Complete visibility into pipeline health
- Maintenance: < 1 hour/week operational overhead
- Community: 1000+ GitHub stars in first 6 months
- Enterprise: 10+ production deployments
- Ecosystem: Integration with 5+ major ML platforms
# Basic installation
pip install graphflow
# With all optional dependencies
pip install "graphflow[all]"
# For distributed execution
pip install "graphflow[distributed]"
# For cloud execution
pip install "graphflow[cloud]"from graphflow import Pipeline, node, context, dataset
import pandas as pd
# Create context - parameters automatically flow where needed
ctx = context(
lookback_days=30,
min_samples=100,
target_col="churn"
)
# Create pipeline
pipeline = Pipeline(
name="my_data_pipeline",
base_uri="s3://my-bucket/data/",
context=ctx
)
# Define processing node - context params auto-detected and injected!
@node(
inputs=[dataset("raw/customers")],
outputs=[dataset("processed/customer_features")]
)
def process_customers(df: pd.DataFrame,
lookback_days: int, # Auto-injected from context
min_samples: int) -> pd.DataFrame: # Auto-injected from context
# Context automatically provides lookback_days and min_samples
return df.groupby('customer_id').tail(lookback_days)
# Add to pipeline and run
pipeline.add_node(process_customers)
# Choose execution backend dynamically
result = pipeline.run(
executor="auto" # or "local", "ray", "cloud"
)
print(result.summary())
# Export pipeline graph
pipeline.export_graph(format="html", output="my_pipeline.html")This comprehensive PRD provides a complete blueprint for building GraphFlow as a production-ready data processing framework with automatic context management, intelligent graph exports, and flexible execution - perfect for any data workflow from ETL to ML feature engineering!