Skip to content

Latest commit

 

History

History
1875 lines (1532 loc) · 63.8 KB

File metadata and controls

1875 lines (1532 loc) · 63.8 KB

GraphFlow — Intelligent Graph-Based Data Processing Framework

Version: 2.0
Focus: Intelligent Data Processing Pipelines with Automatic Context Management
Author: Piotr Laczkowski
Date: October 9, 2025


🎯 Vision & Mission

GraphFlow is a production-ready Python framework designed for intelligent data processing pipelines that automatically manage context and dependencies while scaling from prototype to production without code changes.

Core Promise

  • Zero-friction pipeline development: Write pandas/polars code, get distributed execution
  • Automatic context resolution: Smart parameter detection and dependency injection based on graph analysis
  • Production-grade reliability: Built-in caching, retries, monitoring, and data quality validation
  • Flexible execution: Seamlessly choose between local, distributed, or cloud execution

Design Philosophy

  1. Intelligence First: Automatic context detection and dependency resolution
  2. Developer Experience: Minimal boilerplate, maximum productivity
  3. Scale Transparently: Same code runs on laptop or 1000-node cluster
  4. Production Ready: Monitoring, caching, and reliability built-in
  5. Broad Applicability: Optimized for any data processing workflow - ETL, ML features, analytics, etc.

🚀 Intelligent Data Processing API

1. Automatic Context Detection System

Smart Context Resolution: Parameters and dependencies are automatically detected through graph analysis - no manual specification needed.

from graphflow import Pipeline, node, context, dataset
import pandas as pd

# Define global context - parameters automatically flow where needed
ctx = context(
    # Data processing context
    env="prod",
    date_range=("2024-01-01", "2024-12-31"),
    
    # Processing parameters - automatically detected based on usage
    lookback_days=30,
    min_samples=100,
    categorical_threshold=10,
    
    # ML context (when doing feature engineering)
    target_col="churn",
    
    # Infrastructure context
    cache_ttl="7d",
    validation_strict=True
)

# Create pipeline with intelligent context
pipeline = Pipeline(
    name="data_processing_pipeline",
    base_uri="s3://data-lake/",
    context=ctx  # Automatically analyzed and distributed
)

2. Zero-Configuration Context Flow

Automatic Parameter Detection: Context parameters are automatically injected based on function signatures and graph dependencies.

# Raw datasets
customers = dataset("bronze/customers", partition_by=["date"])
transactions = dataset("bronze/transactions", partition_by=["date", "customer_id"])
events = dataset("bronze/events", partition_by=["date"])

@node(inputs=[customers], outputs=[customers.derived("_enriched")])
def enrich_customers(df: pd.DataFrame, 
                    categorical_threshold: int,  # Auto-detected from context
                    date_range: tuple) -> pd.DataFrame:
    """Enrich customer data; context parameters are injected automatically."""
    frame = df.copy()

    # Keep only signups inside the configured window (context-injected).
    start_date, end_date = date_range
    in_window = (frame['signup_date'] >= start_date) & (frame['signup_date'] <= end_date)
    frame = frame[in_window]

    # Integer-encode string columns whose cardinality exceeds the threshold.
    object_columns = frame.select_dtypes(include=['object']).columns
    for col in object_columns:
        if frame[col].nunique() > categorical_threshold:
            frame[f"{col}_encoded"] = frame[col].astype('category').cat.codes

    return frame

@node(
    inputs=[transactions], 
    outputs=[dataset("silver/transaction_aggregates", partition_by=["customer_id"])]
    # No context_deps needed - automatically detected!
)
def compute_transaction_features(df: pd.DataFrame, 
                               lookback_days: int,      # Auto-injected
                               min_samples: int) -> pd.DataFrame:   # Auto-injected
    """Compute per-customer transaction features over a day-based window.

    FIX: the previous version used ``cust_data.tail(lookback_days)``, which
    selects the last N *rows*, not the last N *days* — contradicting the
    ``*_{lookback_days}d`` feature names. The window is now a real date
    cutoff relative to each customer's latest transaction.
    """
    df = df.copy()
    # Normalize to datetime so the lookback window is measured in days.
    # (Assumes 'transaction_date' is parseable — TODO confirm upstream schema.)
    df['transaction_date'] = pd.to_datetime(df['transaction_date'])

    features = []
    for customer_id in df['customer_id'].unique():
        cust_data = df[df['customer_id'] == customer_id].sort_values('transaction_date')

        # Skip customers with too little history (context-injected threshold).
        if len(cust_data) < min_samples:
            continue

        # Keep only transactions inside the lookback window.
        cutoff = cust_data['transaction_date'].max() - pd.Timedelta(days=lookback_days)
        recent_data = cust_data[cust_data['transaction_date'] > cutoff]

        features.append({
            'customer_id': customer_id,
            f'transaction_count_{lookback_days}d': len(recent_data),
            f'avg_amount_{lookback_days}d': recent_data['amount'].mean(),
        })

    return pd.DataFrame(features)

# Chain nodes with automatic context flow - no manual dependency management!
@node(
    inputs=[customers.derived("_enriched"), dataset("silver/transaction_aggregates")],
    outputs=[dataset("gold/customer_data_mart")]
)
def create_data_mart(customer_df: pd.DataFrame, 
                    transaction_features: pd.DataFrame,
                    target_col: str) -> pd.DataFrame:    # Auto-injected from context
    """Join enriched customers with transaction features; target_col comes from context."""

    # Left-join so customers without any transaction features are kept.
    data_mart = customer_df.merge(
        transaction_features, 
        on='customer_id', 
        how='left'
    )

    # Move the target column (when present) to the end, ML-dataset style.
    if target_col in data_mart.columns:
        reordered = [c for c in data_mart.columns if c != target_col]
        reordered.append(target_col)
        data_mart = data_mart[reordered]

    return data_mart

3. Automatic Pipeline Assembly with Smart Execution Choice

# Pipeline automatically detects dependencies and context flow
pipeline.add_nodes([
    enrich_customers,
    compute_transaction_features, 
    create_data_mart
])

# Flexible execution - choose your backend
result = pipeline.run(
    targets=["gold/customer_data_mart"],
    where={"date": "2024-10-01"},  # Partition filter
    
    # Choose execution backend dynamically
    executor="auto",        # Auto-select best executor
    # executor="local",     # Force local execution
    # executor="ray",       # Force Ray distributed
    # executor="cloud",     # Force cloud execution
    
    max_workers=16,
    validate_data=True,     # Built-in validation
    profile_data=True       # Automatic profiling
)

4. Dynamic Graph Exports and Visualization

Intelligent Graph Interface: Export and visualize your pipeline in multiple formats with rich interactivity.

# Dynamic graph exports - choose your format
pipeline.export_graph(
    format="html",           # Interactive HTML with PyVis
    output="pipeline.html",
    include_context=True,    # Show context flow
    include_data_flow=True,  # Show data dependencies
    show_cache_status=True   # Highlight cached vs dirty nodes
)

# Export to different formats
pipeline.export_graph(format="graphviz", output="pipeline.dot")  # GraphViz DOT
pipeline.export_graph(format="mermaid", output="pipeline.md")    # Mermaid diagram
pipeline.export_graph(format="json", output="pipeline.json")     # JSON graph structure
pipeline.export_graph(format="yaml", output="pipeline.yaml")     # YAML config

# Real-time pipeline inspection
inspector = pipeline.inspector()

# Get live graph state
graph_state = inspector.get_graph_state()
print(f"Nodes: {graph_state.node_count}")
print(f"Edges: {graph_state.edge_count}")
print(f"Cached nodes: {graph_state.cached_nodes}")
print(f"Dirty nodes: {graph_state.dirty_nodes}")

# Interactive graph exploration
for node_name, node_info in inspector.nodes.items():
    print(f"Node: {node_name}")
    print(f"  Status: {node_info.status}")
    print(f"  Context deps: {node_info.context_dependencies}")
    print(f"  Data deps: {node_info.data_dependencies}")
    print(f"  Cache key: {node_info.cache_key}")
    print(f"  Last run: {node_info.last_execution}")

# Dynamic graph filtering and querying
subgraph = inspector.filter_nodes(
    status="dirty",                    # Only dirty nodes
    tags=["feature_engineering"],      # Nodes with specific tags
    context_deps=["lookback_days"]     # Nodes using specific context
)

# Export filtered subgraph
subgraph.export("dirty_nodes.html", format="html")

5. Flexible Execution Backends

Smart Executor Selection: Choose the right execution backend for your workload automatically or manually.

# Automatic executor selection based on workload characteristics
result = pipeline.run(
    executor="auto",  # GraphFlow chooses best executor
    execution_hints={
        "data_size_gb": 50,        # Help auto-selection
        "cpu_intensive": True,     # CPU vs I/O bound
        "memory_intensive": False, # Memory requirements
        "requires_gpu": False,     # GPU acceleration needed
        "max_runtime_hours": 2     # Time constraints
    }
)

# Manual executor selection with detailed configuration
executors = {
    # Local execution options
    "local_threads": {
        "type": "local",
        "backend": "threads",
        "max_workers": 8,
        "memory_limit": "16GB"
    },
    
    "local_processes": {
        "type": "local", 
        "backend": "processes",
        "max_workers": 4,
        "memory_per_worker": "4GB"
    },
    
    # Distributed execution
    "ray_cluster": {
        "type": "distributed",
        "backend": "ray",
        "ray_address": "ray://head-node:10001",
        "max_workers": 100,
        "resources_per_worker": {"CPU": 2, "memory": 8000000000}
    },
    
    "dask_cluster": {
        "type": "distributed",
        "backend": "dask", 
        "scheduler_address": "tcp://scheduler:8786",
        "max_workers": 50
    },
    
    # Cloud execution
    "vertex_ai": {
        "type": "cloud",
        "backend": "vertex",
        "project": "my-project",
        "region": "us-central1",
        "machine_type": "n1-standard-8",
        "accelerator": {"type": "NVIDIA_TESLA_T4", "count": 1}
    },
    
    "aws_batch": {
        "type": "cloud",
        "backend": "batch",
        "job_queue": "my-compute-queue",
        "job_definition": "graphflow-job-def",
        "vcpus": 8,
        "memory": 32768
    }
}

# Use specific executor
result = pipeline.run(
    executor="ray_cluster",
    executor_config=executors["ray_cluster"]
)

# Hybrid execution - different nodes on different executors
pipeline.set_node_executor("heavy_computation", "vertex_ai")
pipeline.set_node_executor("light_processing", "local_threads")
pipeline.set_node_executor("gpu_inference", "ray_cluster")

result = pipeline.run()  # Uses mixed execution

6. Broad Application Patterns

Versatile Processing Patterns: GraphFlow supports any data processing workflow, not just feature engineering.

from graphflow.patterns import (
    time_series_processing, aggregation_processing, transformation_processing,
    validation_processing, ml_feature_processing
)

# ETL Pipeline Example
@node(inputs=[raw_sales], outputs=[clean_sales])
@validation_processing(
    null_checks=["customer_id", "amount", "date"],
    range_checks={"amount": (0, 1000000)},
    format_checks={"date": "YYYY-MM-DD"}
)
def clean_sales_data(df: pd.DataFrame) -> pd.DataFrame:
    """Data cleaning with automatic validation.

    Intentionally empty: the @validation_processing decorator generates the
    cleaning logic from the declarative checks listed above.
    """
    pass  # Validation pattern handles the implementation

# Analytics Pipeline Example  
@node(inputs=[clean_sales], outputs=[sales_metrics])
@aggregation_processing(
    group_by=["region", "product_category"],
    agg_specs={
        "amount": ["sum", "mean", "count"],
        "quantity": ["sum", "mean"]
    },
    time_windows=["daily", "weekly", "monthly"]
)
def compute_sales_metrics(df: pd.DataFrame) -> pd.DataFrame:
    """Business analytics aggregation.

    Intentionally empty: @aggregation_processing derives the group-bys and
    windowed aggregations from the declarative spec above.
    """
    pass  # Aggregation pattern handles the implementation

# Time Series Analysis Pipeline
@node(inputs=[sensor_data], outputs=[processed_timeseries])
@time_series_processing(
    timestamp_col="timestamp",
    value_cols=["temperature", "pressure", "humidity"],
    operations=["smoothing", "anomaly_detection", "forecasting"],
    window_size="1h",
    forecast_horizon="24h"
)
def process_sensor_data(df: pd.DataFrame) -> pd.DataFrame:
    """Time series processing and forecasting.

    Intentionally empty: @time_series_processing builds the smoothing,
    anomaly-detection, and forecasting steps from the spec above.
    """
    pass  # Time series pattern handles the implementation

# ML Feature Engineering (when needed)
@node(inputs=[customer_data], outputs=[customer_features])
@ml_feature_processing(
    categorical_encoding="auto",
    numerical_scaling="robust",
    feature_selection=True,
    interaction_features=True
)
def create_ml_features(df: pd.DataFrame, target_col: str) -> pd.DataFrame:
    """ML-specific feature engineering.

    Intentionally empty: @ml_feature_processing supplies the encoding,
    scaling, and selection logic; target_col is context-injected.
    """
    pass  # ML pattern handles the implementation

# Data Transformation Pipeline
@node(inputs=[raw_logs], outputs=[structured_events])
@transformation_processing(
    parsers={"log_line": "regex", "timestamp": "datetime"},
    extractors=["ip_address", "user_agent", "status_code"],
    enrichers=["geo_location", "device_type"]
)
def parse_log_data(df: pd.DataFrame) -> pd.DataFrame:
    """Log parsing and enrichment.

    Intentionally empty: @transformation_processing assembles the parse,
    extract, and enrich steps from the declarative spec above.
    """
    pass  # Transformation pattern handles the implementation

7. Universal Context Management

Context for Any Workflow: Context system works for any data processing scenario.

# ETL Context
etl_ctx = context(
    # Data quality rules
    null_tolerance=0.05,
    outlier_threshold=3.0,
    
    # Processing windows
    batch_size=10000,
    processing_date="2024-10-01",
    
    # Infrastructure
    memory_limit="8GB",
    timeout_minutes=30
)

# Analytics Context
analytics_ctx = context(
    # Business rules
    fiscal_year_start="2024-01-01",
    currency="USD",
    
    # Aggregation rules
    min_sample_size=100,
    confidence_level=0.95,
    
    # Reporting
    report_format="daily",
    include_trends=True
)

# ML Context
ml_ctx = context(
    # Feature engineering
    lookback_days=30,
    min_samples=1000,
    feature_selection_threshold=0.01,
    
    # Model parameters
    target_col="conversion",
    test_size=0.2,
    random_state=42,
    
    # Validation
    cv_folds=5,
    scoring_metric="auc"
)

# Time Series Context
ts_ctx = context(
    # Time series parameters
    frequency="1H",
    seasonality="daily",
    trend_detection=True,
    
    # Forecasting
    forecast_horizon=24,
    confidence_intervals=[0.8, 0.95],
    
    # Anomaly detection
    anomaly_threshold=2.5,
    min_anomaly_duration="30min"
)

🏗️ Core Architecture & Implementation

Automatic Context Detection System

Intelligent Context Resolution Engine: Automatically analyzes graph dependencies and function signatures to inject the right parameters.

# graphflow/core/context.py
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Callable, get_type_hints, Set
from inspect import signature
import networkx as nx
import logging

@dataclass(frozen=True)
class Context:
    """Immutable context with automatic parameter resolution.

    Parameters live in five category dicts; ``evolve`` auto-files loose
    keyword arguments into a category by name pattern, and
    ``resolve_for_function`` injects matching values into a node function.
    """

    # Core context categories
    data: Dict[str, Any] = field(default_factory=dict)
    processing: Dict[str, Any] = field(default_factory=dict)
    ml: Dict[str, Any] = field(default_factory=dict)
    infrastructure: Dict[str, Any] = field(default_factory=dict)
    runtime: Dict[str, Any] = field(default_factory=dict)

    def evolve(self, **updates) -> Context:
        """Return a new Context with *updates* merged in.

        A keyword matching a category name ('data', 'processing', 'ml',
        'infrastructure', 'runtime') is treated as a dict of updates for
        that category; any other keyword is auto-categorized by name.
        """
        categories = {
            'data': {**self.data, **updates.get('data', {})},
            'processing': {**self.processing, **updates.get('processing', {})},
            'ml': {**self.ml, **updates.get('ml', {})},
            'infrastructure': {**self.infrastructure, **updates.get('infrastructure', {})},
            'runtime': {**self.runtime, **updates.get('runtime', {})},
        }

        for key, value in updates.items():
            if key in categories:
                continue  # already merged as an explicit category dict
            categories[self._auto_categorize_param(key)][key] = value

        return Context(**categories)

    def _auto_categorize_param(self, param_name: str) -> str:
        """Pick a category for *param_name* from name patterns ('data' is the fallback)."""
        processing_patterns = ['_days', '_window', '_threshold', '_samples', '_size', 'lookback', 'batch']
        ml_patterns = ['_col', 'target', 'model', 'feature', 'score', 'cv_', 'test_']
        infra_patterns = ['_uri', '_config', 'cache', 'memory', 'timeout', 'workers']
        runtime_patterns = ['env', 'debug', 'verbose', 'log_']

        param_lower = param_name.lower()

        # Order matters: the first matching category wins.
        if any(pattern in param_lower for pattern in processing_patterns):
            return 'processing'
        elif any(pattern in param_lower for pattern in ml_patterns):
            return 'ml'
        elif any(pattern in param_lower for pattern in infra_patterns):
            return 'infrastructure'
        elif any(pattern in param_lower for pattern in runtime_patterns):
            return 'runtime'
        else:
            return 'data'

    def resolve_for_function(self, func: Callable) -> Dict[str, Any]:
        """Resolve context parameters for *func*'s signature.

        Returns a dict of {param_name: value} taken from the flattened
        context (falling back to the parameter's default); data parameters
        are skipped, and a warning is logged for parameters that have no
        context value and no default.
        """
        sig = signature(func)
        try:
            type_hints = get_type_hints(func)
        except Exception:
            # Unresolvable forward references must not break resolution.
            type_hints = {}
        resolved = {}

        # Flatten all context parameters (later categories win on key clash).
        all_params = {
            **self.data,
            **self.processing,
            **self.ml,
            **self.infrastructure,
            **self.runtime
        }

        for param_name, param in sig.parameters.items():
            # FIX: *args/**kwargs and self/cls can never be context-injected;
            # previously they produced spurious "not found in context" warnings.
            if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
                continue
            if param_name in ('self', 'cls'):
                continue

            # Skip data parameters (DataFrames, datasets, etc.)
            if self._is_data_parameter(param_name, type_hints.get(param_name)):
                continue

            if param_name in all_params:
                resolved[param_name] = all_params[param_name]
            elif param.default is not param.empty:
                # Use default value if no context value provided
                resolved[param_name] = param.default
            else:
                # Parameter needed but not found in context
                logging.warning(f"Parameter {param_name} not found in context for {func.__name__}")

        return resolved

    def _is_data_parameter(self, param_name: str, param_type: Any) -> bool:
        """True when the parameter is a data input (by name or DataFrame-like type hint)."""
        data_param_names = ['df', 'data', 'dataframe', 'dataset']
        if param_name.lower() in data_param_names:
            return True

        # Check type hints for DataFrame-like types.
        if param_type:
            type_str = str(param_type)
            if any(df_type in type_str for df_type in ['DataFrame', 'LazyFrame', 'Dataset']):
                return True

        return False

class ContextAnalyzer:
    """Derives per-node context requirements from the pipeline graph."""

    def __init__(self, graph: nx.DiGraph, nodes: Dict[str, Any]):
        self.graph = graph
        self.nodes = nodes

    def analyze_context_dependencies(self) -> Dict[str, Set[str]]:
        """Map each node name to the set of context parameters its function takes."""
        return {
            name: {
                param
                for param in signature(spec.func).parameters
                if not self._is_data_parameter(param)
            }
            for name, spec in self.nodes.items()
        }

    def _is_data_parameter(self, param_name: str) -> bool:
        """True when the name looks like a data input rather than a context parameter."""
        lowered = param_name.lower()
        return any(tag in lowered for tag in ('df', 'data', 'dataframe', 'dataset', '_df', '_data'))

    def get_context_flow_graph(self) -> nx.DiGraph:
        """Build a graph annotated with each node's context dependencies.

        Nodes carry a ``context_deps`` attribute; edges mirror the data
        dependencies of the original pipeline graph.
        """
        flow = nx.DiGraph()

        for name, deps in self.analyze_context_dependencies().items():
            flow.add_node(name, context_deps=deps)

        for src, dst in self.graph.edges():
            flow.add_edge(src, dst)

        return flow

# Convenience factory with auto-categorization
def context(**kwargs) -> Context:
    """Build a Context, auto-categorizing every keyword argument by name."""
    empty = Context()
    return empty.evolve(**kwargs)

Smart Node Decorators

Automatic Context Detection: Node decorators automatically analyze function signatures - no manual dependency specification needed.

# graphflow/core/decorators.py
from functools import wraps
from typing import List, Optional, Callable, Any, Dict
from .context import Context, ContextAnalyzer
from .dataset import Dataset
import inspect

class NodeSpec:
    """Specification for a processing node.

    Context dependencies are not declared by the user; they are inferred
    from the wrapped function's signature (every non-data parameter).
    """

    def __init__(self, func: Callable, inputs: List[Dataset], outputs: List[Dataset],
                 cache_ttl: Optional[str] = None,
                 validation: Optional[Dict[str, Any]] = None):
        self.func = func
        self.inputs = inputs
        self.outputs = outputs
        self.cache_ttl = cache_ttl
        self.validation = validation or {}
        self.name = func.__name__
        # Inferred automatically from the signature — never user-declared.
        self.context_deps = self._detect_context_dependencies()

    def _detect_context_dependencies(self) -> List[str]:
        """Return the function's parameter names that should come from context."""
        params = inspect.signature(self.func).parameters
        return [name for name in params if not self._is_data_parameter(name)]

    def _is_data_parameter(self, param_name: str) -> bool:
        """True when the name looks like a data input rather than a context parameter."""
        lowered = param_name.lower()
        return any(tag in lowered for tag in ('df', 'data', 'dataframe', 'dataset', '_df', '_data'))

def node(
    inputs: List[Dataset] = None,
    outputs: List[Dataset] = None,
    cache_ttl: str = "24h",
    validate_schema: bool = True,
    validate_data_quality: bool = True,
    executor: str = "auto",
    retries: int = 2,
    timeout: int = 3600,
    tags: List[str] = None
):
    """
    Decorator for processing nodes with automatic context detection.

    No need to specify context_deps - they're automatically detected!

    Returns a NodeSpec (not the function): pipelines consume specs directly.
    """

    def decorator(func: Callable) -> NodeSpec:

        @wraps(func)
        def wrapper(context: Context, *data_args, **data_kwargs):
            # Resolve context parameters against the *original* signature
            # (functools.wraps exposes it via __wrapped__).
            context_params = context.resolve_for_function(func)

            # Explicit kwargs win over context-resolved values.
            final_kwargs = {**context_params, **data_kwargs}

            result = func(*data_args, **final_kwargs)

            # Apply post-processing (validation, profiling, etc.)
            if validate_schema:
                result = _validate_schema(result, func.__name__)
            if validate_data_quality:
                result = _validate_data_quality(result, func.__name__)

            return result

        # Create node specification with automatic context detection
        spec = NodeSpec(
            func=wrapper,
            inputs=inputs or [],
            outputs=outputs or [],
            cache_ttl=cache_ttl
        )

        # FIX: execution metadata used to live only on the wrapper function,
        # but the decorator returns the NodeSpec, so the scheduler (and
        # Pipeline.set_node_executor, which reads spec._executor) never saw
        # it. Record it on the spec; wrapper attributes are kept for
        # backward compatibility.
        spec._executor = executor
        spec._retries = retries
        spec._timeout = timeout
        spec._tags = tags or []

        wrapper._graphflow_spec = spec
        wrapper._original_func = func
        wrapper._tags = tags or []
        wrapper._executor = executor
        wrapper._retries = retries
        wrapper._timeout = timeout

        return spec

    return decorator

def _validate_schema(df, func_name: str):
    """Validate DataFrame schema consistency."""
    # Implementation for schema validation
    return df

def _validate_data_quality(df, func_name: str):
    """Validate data quality metrics.""" 
    # Implementation for DQ validation
    return df

Dataset Abstraction

Flexible Dataset Handling: Supports multiple formats and automatic format detection.

# graphflow/core/dataset.py
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional, Tuple, List, Union
from pathlib import Path

@dataclass(frozen=True)
class Dataset:
    """Logical dataset with intelligent format handling."""

    path: str
    format: str = "auto"  # "auto" = detect from path/content
    partition_by: Tuple[str, ...] = ()
    schema: Optional[dict] = None
    metadata: dict = None  # normalized to {} in __post_init__

    def __post_init__(self):
        # Frozen dataclass: object.__setattr__ is needed to normalize None -> {}.
        if self.metadata is None:
            object.__setattr__(self, 'metadata', {})

    def _clone(self, **overrides) -> Dataset:
        """Copy constructor used by the derived/with_* builders."""
        kwargs = {
            "path": self.path,
            "format": self.format,
            "partition_by": self.partition_by,
            "schema": self.schema,
            "metadata": self.metadata,
        }
        kwargs.update(overrides)
        return Dataset(**kwargs)

    def derived(self, suffix: str) -> Dataset:
        """New dataset at ``path + suffix``; provenance recorded in metadata."""
        return self._clone(
            path=self.path + suffix,
            metadata={**self.metadata, "derived_from": self.path},
        )

    def with_partitions(self, *partition_cols: str) -> Dataset:
        """Same dataset, partitioned by *partition_cols* instead."""
        return self._clone(partition_by=tuple(partition_cols))

    def with_format(self, format: str) -> Dataset:
        """Same dataset with an explicit storage format."""
        return self._clone(format=format)

    @property
    def uri(self) -> str:
        """Path component of the URI (the pipeline prepends base_uri)."""
        return self.path

    @property
    def name(self) -> str:
        """Last path segment, used as a display name."""
        return Path(self.path).name

# Convenience factory
def dataset(path: str, *, format: str = "auto", partition_by: List[str] = None, 
           schema: dict = None, **metadata) -> Dataset:
    """Create a Dataset; any extra keyword arguments become its metadata."""
    partitions = tuple(partition_by) if partition_by else ()
    return Dataset(
        path=path,
        format=format,
        partition_by=partitions,
        schema=schema,
        metadata=metadata,
    )

Pipeline Engine with Graph Intelligence

Intelligent Pipeline Orchestration: Automatically detects dependencies and provides rich graph export capabilities.

# graphflow/core/pipeline.py
from __future__ import annotations
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Union
import networkx as nx
from .context import Context, ContextAnalyzer
from .decorators import NodeSpec
from .dataset import Dataset
from .scheduler import Scheduler
from .graph_exporter import GraphExporter

@dataclass
class Pipeline:
    """Intelligent data processing pipeline with automatic dependency detection.

    Nodes are NodeSpec objects; data dependencies between them are inferred
    by matching each node's input datasets against other nodes' outputs, so
    callers register nodes and never wire edges by hand.
    """

    name: str
    base_uri: str
    context: Context
    # Internal state; excluded from the generated __init__.
    _graph: nx.DiGraph = field(default_factory=nx.DiGraph, init=False)
    _nodes: Dict[str, NodeSpec] = field(default_factory=dict, init=False)
    _scheduler: Optional[Scheduler] = field(default=None, init=False)
    _context_analyzer: Optional[ContextAnalyzer] = field(default=None, init=False)
    _graph_exporter: Optional[GraphExporter] = field(default=None, init=False)

    def __post_init__(self):
        self._scheduler = Scheduler(self)
        self._context_analyzer = ContextAnalyzer(self._graph, self._nodes)
        self._graph_exporter = GraphExporter(self)

    def add_node(self, node_spec: NodeSpec) -> None:
        """Add a single node and auto-wire its data dependencies.

        Raises:
            ValueError: if a node with the same name already exists, or if
                adding this node would create a cycle. FIX: on a cycle the
                node and its edges are now removed again before raising —
                previously the failed addition stayed in the graph and
                corrupted the pipeline.
        """
        if node_spec.name in self._nodes:
            raise ValueError(f"Node {node_spec.name} already exists")

        self._nodes[node_spec.name] = node_spec
        self._graph.add_node(node_spec.name, spec=node_spec)

        # Auto-detect dependencies from inputs/outputs
        self._auto_detect_dependencies(node_spec)

        # Validate the DAG property, rolling back on failure.
        if not nx.is_directed_acyclic_graph(self._graph):
            self._graph.remove_node(node_spec.name)  # also drops incident edges
            del self._nodes[node_spec.name]
            raise ValueError("Pipeline must be a DAG")

        # Context analysis depends on the node set, so rebuild the analyzer.
        self._context_analyzer = ContextAnalyzer(self._graph, self._nodes)

    def add_nodes(self, node_specs: List[NodeSpec]) -> None:
        """Add multiple nodes in order; each is validated individually."""
        for spec in node_specs:
            self.add_node(spec)

    def _auto_detect_dependencies(self, node_spec: NodeSpec) -> None:
        """Add an edge producer -> node_spec for every input satisfied by an existing output."""
        for input_dataset in node_spec.inputs:
            # Find nodes that output to this input
            for other_name, other_spec in self._nodes.items():
                if other_name == node_spec.name:
                    continue

                for output_dataset in other_spec.outputs:
                    if self._datasets_match(input_dataset, output_dataset):
                        self._graph.add_edge(other_name, node_spec.name)

    def _datasets_match(self, ds1: Dataset, ds2: Dataset) -> bool:
        """Whether two datasets refer to the same logical data.

        True on identical paths, or when ds2 records ds1's path as its
        provenance. FIX: provenance is compared with equality instead of the
        previous substring test, which matched unrelated datasets whose
        paths merely shared a prefix.
        """
        return ds1.path == ds2.path or ds2.metadata.get('derived_from') == ds1.path

    def export_graph(self, 
                    format: str = "html",
                    output: str = None,
                    include_context: bool = True,
                    include_data_flow: bool = True,
                    show_cache_status: bool = True,
                    **kwargs) -> str:
        """
        Export pipeline graph in various formats.

        Formats: html, graphviz, mermaid, json, yaml

        Args:
            format: Target format (see above).
            output: Optional file path to write the rendering to.
            include_context: Annotate nodes with their context parameters.
            include_data_flow: Include dataset-level dependencies.
            show_cache_status: Highlight cached vs dirty nodes.
        """
        return self._graph_exporter.export(
            format=format,
            output=output,
            include_context=include_context,
            include_data_flow=include_data_flow,
            show_cache_status=show_cache_status,
            **kwargs
        )

    def inspector(self) -> 'PipelineInspector':
        """Get pipeline inspector for real-time graph analysis."""
        return PipelineInspector(self)

    def set_node_executor(self, node_name: str, executor: str) -> None:
        """Pin a node to a specific executor (enables hybrid execution).

        Raises:
            ValueError: if node_name is not a registered node.
        """
        if node_name not in self._nodes:
            raise ValueError(f"Node {node_name} not found")
        self._nodes[node_name]._executor = executor

    def run(self, 
            targets: List[Union[str, NodeSpec]] = None,
            where: Dict[str, Any] = None,
            executor: str = "auto",
            executor_config: Dict[str, Any] = None,
            execution_hints: Dict[str, Any] = None,
            max_workers: int = None,
            validate_data: bool = True,
            profile_data: bool = False,
            cache_policy: str = "read_write",
            dry_run: bool = False) -> PipelineResult:
        """Execute the pipeline with intelligent scheduling.

        Args:
            targets: Node names and/or NodeSpec objects to build; every
                node is built when omitted.
            where: Partition filters, e.g. {"date": "2024-10-01"}.
            executor: Backend name, or "auto" to let the scheduler choose.
            executor_config: Backend-specific configuration.
            execution_hints: Workload hints used by auto-selection.
            max_workers: Parallelism cap passed to the scheduler.
            validate_data: Run built-in data validation.
            profile_data: Collect data profiles during the run.
            cache_policy: Cache behavior (e.g. "read_write").
            dry_run: Plan without executing.
        """
        target_names = self._normalize_targets(targets)

        return self._scheduler.execute(
            targets=target_names,
            where=where or {},
            executor=executor,
            executor_config=executor_config or {},
            execution_hints=execution_hints or {},
            max_workers=max_workers,
            validate_data=validate_data,
            profile_data=profile_data,
            cache_policy=cache_policy,
            dry_run=dry_run
        )

    def _normalize_targets(self, targets) -> List[str]:
        """Coerce mixed str/NodeSpec targets into node names; default to every node."""
        if not targets:
            return list(self._nodes.keys())

        names = []
        for target in targets:
            if isinstance(target, str):
                names.append(target)
            elif isinstance(target, NodeSpec):
                names.append(target.name)
            else:
                names.append(str(target))
        return names

    def describe(self) -> Dict[str, Any]:
        """Summarize pipeline shape, context sizes, and per-node context deps."""
        context_deps = self._context_analyzer.analyze_context_dependencies()

        return {
            "name": self.name,
            "nodes": len(self._nodes),
            "dependencies": self._graph.number_of_edges(),
            "complexity": nx.density(self._graph),
            # Guarded: dag_longest_path is undefined on an empty graph.
            "longest_path": len(nx.dag_longest_path(self._graph)) if self._nodes else 0,
            "context": {
                "data_params": len(self.context.data),
                "processing_params": len(self.context.processing),
                "ml_params": len(self.context.ml),
            },
            "context_dependencies": {
                node: list(deps) for node, deps in context_deps.items()
            }
        }

class PipelineInspector:
    """Real-time pipeline inspection and analysis."""

    def __init__(self, pipeline: Pipeline):
        self.pipeline = pipeline

    def get_graph_state(self) -> 'GraphState':
        """Snapshot the current graph state (counts plus cache status)."""
        pipe = self.pipeline
        return GraphState(
            node_count=len(pipe._nodes),
            edge_count=pipe._graph.number_of_edges(),
            cached_nodes=self._get_cached_nodes(),
            dirty_nodes=self._get_dirty_nodes(),
        )

    def _get_cached_nodes(self) -> List[str]:
        """Names of nodes whose outputs are already cached."""
        # Placeholder: a real implementation would consult the cache manager.
        return []

    def _get_dirty_nodes(self) -> List[str]:
        """Names of nodes that must be recomputed."""
        # Placeholder: a real implementation would track invalidation state.
        return []

    @property
    def nodes(self) -> Dict[str, 'NodeInfo']:
        """Per-node metadata, keyed by node name."""
        deps_by_node = self.pipeline._context_analyzer.analyze_context_dependencies()
        graph = self.pipeline._graph

        info: Dict[str, NodeInfo] = {}
        for name in self.pipeline._nodes:
            info[name] = NodeInfo(
                name=name,
                status="ready",  # Placeholder until execution status is wired in.
                context_dependencies=list(deps_by_node.get(name, set())),
                data_dependencies=list(graph.predecessors(name)),
                cache_key="cache_key_placeholder",
                last_execution=None,
            )
        return info

    def filter_nodes(self, **filters) -> 'SubGraph':
        """Select nodes matching the given criteria.

        Supported filters: ``status`` (exact match) and ``context_deps``
        (keep nodes using at least one of the listed parameters).
        """
        selected = {}
        for name, info in self.nodes.items():
            if 'status' in filters and info.status != filters['status']:
                continue
            if 'context_deps' in filters:
                wanted = filters['context_deps']
                if not any(dep in info.context_dependencies for dep in wanted):
                    continue
            selected[name] = info
        return SubGraph(selected, self.pipeline)

@dataclass
class GraphState:
    """Point-in-time snapshot of a pipeline's execution graph."""
    node_count: int  # total nodes in the graph
    edge_count: int  # total dependency edges
    cached_nodes: List[str]  # nodes whose outputs are cached and reusable
    dirty_nodes: List[str]  # nodes that need recomputation

@dataclass  
class NodeInfo:
    """Descriptive metadata for a single pipeline node."""
    name: str  # node identifier within the pipeline
    status: str  # execution status (currently always "ready" — see PipelineInspector.nodes)
    context_dependencies: List[str]  # context parameters this node consumes
    data_dependencies: List[str]  # upstream node names feeding this node
    cache_key: str  # content-addressed cache key (placeholder value today)
    last_execution: Optional[str]  # last run marker, or None if never executed

class SubGraph:
    """A filtered view over a subset of a pipeline's nodes."""

    def __init__(self, nodes: 'Dict[str, NodeInfo]', pipeline: 'Pipeline'):
        self.nodes = nodes
        self.pipeline = pipeline

    def export(self, output: str, format: str = "html") -> None:
        """Export the filtered subgraph to *output* in the given format."""
        # Not yet implemented: would build the subgraph and delegate to the exporter.
        pass

@dataclass
class PipelineResult:
    """Result of pipeline execution with metadata."""

    pipeline_name: str
    executed_nodes: List[str]
    execution_time: float
    cache_hits: int
    cache_misses: int
    artifacts_created: List[str]
    validation_results: Dict[str, Any]
    profiling_results: Optional[Dict[str, Any]] = None

    def summary(self) -> str:
        """Human-readable execution summary."""
        # Guard against division by zero when nothing touched the cache.
        total_lookups = self.cache_hits + self.cache_misses
        hit_pct = (self.cache_hits / total_lookups * 100) if total_lookups > 0 else 0
        return f"""
Pipeline: {self.pipeline_name}
Nodes executed: {len(self.executed_nodes)}
Execution time: {self.execution_time:.2f}s
Cache hit rate: {hit_pct:.1f}%
Artifacts created: {len(self.artifacts_created)}
        """.strip()

Execution System

Multi-Backend Execution: Supports local, distributed, and cloud execution.

# graphflow/core/executors.py
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Callable
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import logging

class Executor(ABC):
    """Base executor interface.

    Concrete backends (local, thread pool, Ray, cloud) implement these
    three methods so the scheduler can dispatch work uniformly.
    """
    
    @abstractmethod
    def execute_feature(self, feature_spec: 'FeatureSpec', context: 'Context', 
                       inputs: Dict[str, Any]) -> Any:
        """Execute a single feature.

        Args:
            feature_spec: Specification of the feature to run.
            context: Pipeline context supplying injected parameters.
            inputs: Input data keyed by name.

        Returns:
            The feature's computed result.
        """
        pass
    
    @abstractmethod
    def execute_batch(self, feature_batch: List['FeatureSpec'], 
                     context: 'Context') -> Dict[str, Any]:
        """Execute a batch of features.

        Returns:
            Mapping of feature name to its result.
        """
        pass
    
    @abstractmethod
    def get_resource_usage(self) -> Dict[str, float]:
        """Get current resource usage (e.g. cpu/memory metrics)."""
        pass

class LocalExecutor(Executor):
    """Single-threaded executor that runs everything in-process."""

    def execute_feature(self, feature_spec, context, inputs):
        """Run one feature synchronously in the current thread."""
        return feature_spec.func(context, **inputs)

    def execute_batch(self, feature_batch, context):
        """Run each feature in the batch sequentially, in order."""
        outcome = {}
        for feature in feature_batch:
            # Materialize this feature's inputs before invoking it.
            kwargs = self._load_inputs(feature, context)
            outcome[feature.name] = self.execute_feature(feature, context, kwargs)
        return outcome

    def get_resource_usage(self):
        """Resource tracking is not implemented for the local backend."""
        return {"cpu": 0.0, "memory": 0.0}

    def _load_inputs(self, spec, context):
        # Placeholder: a real implementation would load the input datasets.
        return {}

class ThreadPoolExecutor(Executor):
    """Thread-based parallel executor.

    NOTE: this class intentionally shadows the name imported from
    ``concurrent.futures`` at module top, so the underlying pool must be
    created via the module object — referring to ``ThreadPoolExecutor``
    inside this class would resolve to the class itself and recurse.
    """

    def __init__(self, max_workers: int = 4):
        # Import the module (not the name) to sidestep the shadowed import.
        from concurrent import futures
        self.max_workers = max_workers
        self._executor = futures.ThreadPoolExecutor(max_workers=max_workers)

    def execute_feature(self, feature_spec, context, inputs):
        """Run one feature in the calling thread."""
        return feature_spec.func(context, **inputs)

    def execute_batch(self, feature_batch, context):
        """Submit every feature to the pool, then gather results."""
        pending = {}
        for spec in feature_batch:
            inputs = self._load_inputs(spec, context)
            pending[spec.name] = self._executor.submit(
                self.execute_feature, spec, context, inputs
            )
        # .result() re-raises any exception raised in the worker thread.
        return {name: fut.result() for name, fut in pending.items()}

    def get_resource_usage(self):
        """Resource tracking is not implemented for the thread backend."""
        return {"cpu": 0.0, "memory": 0.0}

    def _load_inputs(self, spec, context):
        # Placeholder: a real implementation would load the input datasets.
        return {}

class RayExecutor(Executor):
    """Ray-based distributed executor."""

    def __init__(self, ray_address: Optional[str] = None):
        """Connect to (or start) a Ray cluster.

        Raises:
            ImportError: if the optional ``ray`` dependency is missing.
        """
        # Guard only the import: initialization errors should surface as-is.
        try:
            import ray
        except ImportError:
            raise ImportError("Ray is required for RayExecutor")
        if not ray.is_initialized():
            ray.init(address=ray_address)
        self.ray = ray

    def execute_feature(self, feature_spec, context, inputs):
        """Run a single feature as a Ray remote task."""
        @self.ray.remote
        def remote_feature_exec(spec, ctx, inp):
            return spec.func(ctx, **inp)

        future = remote_feature_exec.remote(feature_spec, context, inputs)
        return self.ray.get(future)

    def execute_batch(self, feature_batch, context):
        """Run a whole batch inside one Ray task to amortize scheduling cost."""
        # Load inputs on the driver: the original closed over ``self`` inside
        # the remote function, which would serialize the executor into the task
        # (and _load_inputs did not even exist on this class).
        loaded = {spec.name: self._load_inputs(spec, context) for spec in feature_batch}

        @self.ray.remote
        def remote_batch_exec(specs, ctx, inputs_by_name):
            return {spec.name: spec.func(ctx, **inputs_by_name[spec.name]) for spec in specs}

        future = remote_batch_exec.remote(feature_batch, context, loaded)
        return self.ray.get(future)

    def get_resource_usage(self):
        """Cluster-wide resource tracking is not implemented yet."""
        return {"cpu": 0.0, "memory": 0.0}

    def _load_inputs(self, spec, context):
        # Placeholder: a real implementation would load the input datasets.
        return {}

class CloudExecutor(Executor):
    """Base class for cloud executors (Vertex AI, AWS Batch, etc.).

    Subclasses submit jobs to a managed service and poll for completion.
    NOTE(review): the abstract methods inherited from ``Executor`` are not
    implemented here, so concrete subclasses must provide those as well.
    """
    
    def __init__(self, cloud_config: Dict[str, Any]):
        # Provider-specific settings (project, region, credentials, ...).
        self.cloud_config = cloud_config
    
    @abstractmethod
    def submit_remote_job(self, feature_spec, context, inputs) -> str:
        """Submit job to cloud service and return job ID."""
        pass
    
    @abstractmethod
    def wait_for_completion(self, job_id: str) -> Any:
        """Wait for job completion and return results."""
        pass

# Executor registry: maps the public backend name to its implementation class.
EXECUTORS = {
    "local": LocalExecutor,
    "threads": ThreadPoolExecutor,
    "ray": RayExecutor,
}

def get_executor(name: str, **kwargs) -> Executor:
    """Instantiate a registered executor by name.

    Args:
        name: Registry key ("local", "threads", "ray").
        **kwargs: Forwarded to the executor's constructor.

    Raises:
        ValueError: if *name* is not registered; the message lists the
            available backends to make typos easy to spot.
    """
    try:
        executor_cls = EXECUTORS[name]
    except KeyError:
        available = ", ".join(sorted(EXECUTORS))
        raise ValueError(f"Unknown executor: {name}. Available: {available}") from None
    return executor_cls(**kwargs)

🎯 Complete Feature Engineering Examples

Example 1: E-commerce Customer Churn Prediction

Scenario: Build comprehensive customer features for churn prediction using transaction, behavioral, and demographic data.

from featureflow import Pipeline, feature, context, dataset
import pandas as pd
import numpy as np

# Define feature engineering context
ctx = context(
    # Time windows for feature computation
    short_window=7,
    medium_window=30, 
    long_window=90,
    
    # Business rules
    min_transaction_amount=5.0,
    high_value_threshold=500.0,
    churn_definition_days=60,
    
    # ML parameters
    target_col="will_churn",
    categorical_encoding_threshold=50,
    
    # Infrastructure
    cache_ttl="24h",
    validation_strict=True
)

# Create pipeline
pipeline = Pipeline(
    name="customer_churn_features",
    base_uri="s3://ml-features/churn/",
    context=ctx
)

# Raw datasets
customers = dataset("bronze/customers", partition_by=["signup_month"])
transactions = dataset("bronze/transactions", partition_by=["date", "customer_id"])
website_events = dataset("bronze/website_events", partition_by=["date"])
support_tickets = dataset("bronze/support_tickets", partition_by=["created_date"])

# Feature 1: Customer Demographics & Account Features
@feature(
    inputs=[customers],
    outputs=[dataset("silver/customer_demographics")]
)
def engineer_customer_demographics(df: pd.DataFrame, 
                                 categorical_encoding_threshold: int) -> pd.DataFrame:
    """Engineer demographic and account-based features.

    Adds account-tenure and geography columns, then encodes categoricals:
    high-cardinality columns get frequency encoding, low-cardinality ones
    get one-hot dummies (split decided by the injected threshold).
    """
    # Account tenure
    signup = pd.to_datetime(df['signup_date'])
    df['account_age_days'] = (pd.Timestamp.now() - signup).dt.days
    df['account_age_months'] = df['account_age_days'] / 30.44
    df['is_new_customer'] = df['account_age_days'] < 30

    # Geography
    major_cities = ['New York', 'Los Angeles', 'Chicago', 'Houston']
    df['is_major_city'] = df['city'].isin(major_cities)

    # Cardinality-aware categorical encoding
    for column in ('state', 'city', 'acquisition_channel'):
        if column not in df.columns:
            continue
        if df[column].nunique() > categorical_encoding_threshold:
            # High cardinality: frequency encoding
            frequencies = df[column].value_counts(normalize=True).to_dict()
            df[f'{column}_frequency'] = df[column].map(frequencies)
        else:
            # Low cardinality: one-hot encoding
            df = pd.concat([df, pd.get_dummies(df[column], prefix=column)], axis=1)

    return df

# Feature 2: Transaction-based Features
@feature(
    inputs=[transactions],
    outputs=[dataset("silver/transaction_features", partition_by=["customer_id"])],
    context_deps=["short_window", "medium_window", "long_window", "min_transaction_amount"]
)
def engineer_transaction_features(df: pd.DataFrame,
                                short_window: int,
                                medium_window: int, 
                                long_window: int,
                                min_transaction_amount: float) -> pd.DataFrame:
    """Comprehensive transaction-based feature engineering.

    For each customer, computes frequency, amount, and behavioral features
    over three trailing time windows (measured in days), plus cross-window
    trend ratios.

    Args:
        df: Raw transactions with customer_id, transaction_date, amount, category.
        short_window / medium_window / long_window: Window lengths in days.
        min_transaction_amount: Transactions below this amount are discarded.

    Returns:
        One row per customer with the engineered feature columns.
    """
    
    # Filter valid transactions
    df = df[df['amount'] >= min_transaction_amount].copy()
    df['transaction_date'] = pd.to_datetime(df['transaction_date'])
    
    features = []
    
    for customer_id in df['customer_id'].unique():
        cust_data = df[df['customer_id'] == customer_id].sort_values('transaction_date')
        
        if len(cust_data) < 3:  # Skip customers with too few transactions
            continue
            
        cust_features = {'customer_id': customer_id}
        # Anchor windows at the customer's most recent transaction.
        anchor = cust_data['transaction_date'].max()
        
        for window_days, window_name in [(short_window, 'short'), 
                                       (medium_window, 'medium'), 
                                       (long_window, 'long')]:
            
            # BUG FIX: the original used .tail(window_days), which selects the
            # last N *rows*, not the last N *days*. Select by date instead.
            cutoff = anchor - pd.Timedelta(days=window_days)
            window_df = cust_data[cust_data['transaction_date'] >= cutoff]
            
            # Transaction frequency features
            cust_features[f'transaction_count_{window_name}'] = len(window_df)
            cust_features[f'avg_days_between_transactions_{window_name}'] = (
                window_df['transaction_date'].diff().dt.days.mean()
            )
            
            # Amount-based features
            amounts = window_df['amount']
            cust_features[f'total_amount_{window_name}'] = amounts.sum()
            cust_features[f'avg_amount_{window_name}'] = amounts.mean()
            cust_features[f'std_amount_{window_name}'] = amounts.std()
            cust_features[f'max_amount_{window_name}'] = amounts.max()
            
            # Behavioral patterns
            cust_features[f'unique_categories_{window_name}'] = window_df['category'].nunique()
            cust_features[f'weekend_transaction_ratio_{window_name}'] = (
                window_df['transaction_date'].dt.weekday.isin([5, 6]).mean()
            )
        
        # Trend features (comparing windows); epsilon avoids division by zero.
        cust_features['amount_trend_short_vs_medium'] = (
            cust_features['avg_amount_short'] / (cust_features['avg_amount_medium'] + 1e-6)
        )
        cust_features['frequency_trend_short_vs_long'] = (
            cust_features['transaction_count_short'] / (cust_features['transaction_count_long'] + 1e-6)
        )
        
        features.append(cust_features)
    
    return pd.DataFrame(features)

# Feature 3: Behavioral/Engagement Features  
@feature(
    inputs=[website_events],
    outputs=[dataset("silver/behavioral_features", partition_by=["customer_id"])],
    context_deps=["medium_window", "long_window"]
)
def engineer_behavioral_features(df: pd.DataFrame,
                               medium_window: int,
                               long_window: int) -> pd.DataFrame:
    """Web engagement and behavioral features.

    Computes per-customer session, page-view, product, and cart/checkout
    features over two trailing windows.

    NOTE(review): ``tail(window_days)`` selects the last N *rows* (events),
    not the last N *days* — confirm whether row-based windows are intended.
    """
    
    df['event_date'] = pd.to_datetime(df['event_date'])
    features = []
    
    for customer_id in df['customer_id'].unique():
        # All events for this customer, oldest first.
        cust_events = df[df['customer_id'] == customer_id].sort_values('event_date')
        
        cust_features = {'customer_id': customer_id}
        
        for window_days, window_name in [(medium_window, 'medium'), (long_window, 'long')]:
            recent_events = cust_events.tail(window_days)
            
            # Session-based features
            cust_features[f'total_sessions_{window_name}'] = recent_events['session_id'].nunique()
            cust_features[f'avg_session_duration_{window_name}'] = (
                recent_events.groupby('session_id')['session_duration'].first().mean()
            )
            
            # Page engagement
            cust_features[f'total_page_views_{window_name}'] = len(recent_events)
            cust_features[f'unique_pages_viewed_{window_name}'] = recent_events['page_url'].nunique()
            
            # Product interaction
            product_events = recent_events[recent_events['event_type'] == 'product_view']
            cust_features[f'products_viewed_{window_name}'] = product_events['product_id'].nunique()
            
            # Cart and checkout behavior
            cart_events = recent_events[recent_events['event_type'] == 'add_to_cart']
            cust_features[f'cart_additions_{window_name}'] = len(cart_events)
            
            checkout_events = recent_events[recent_events['event_type'] == 'checkout_start']
            cust_features[f'checkout_starts_{window_name}'] = len(checkout_events)
            
            # Engagement ratios (only defined when there was at least one session)
            if cust_features[f'total_sessions_{window_name}'] > 0:
                cust_features[f'pages_per_session_{window_name}'] = (
                    cust_features[f'total_page_views_{window_name}'] / 
                    cust_features[f'total_sessions_{window_name}']
                )
        
        features.append(cust_features)
    
    return pd.DataFrame(features)

# Feature 4: Customer Support Features
@feature(
    inputs=[support_tickets],
    outputs=[dataset("silver/support_features", partition_by=["customer_id"])],
    context_deps=["long_window"]
)
def engineer_support_features(df: pd.DataFrame, long_window: int) -> pd.DataFrame:
    """Customer support interaction features.

    Aggregates ticket volume, resolution, escalation, and satisfaction per
    customer, plus a per-category ticket count.

    NOTE(review): ``tickets.tail(long_window)`` selects the last N *rows*
    (tickets), not the last N *days* — confirm which is intended.
    """
    
    df['created_date'] = pd.to_datetime(df['created_date'])
    features = []
    
    for customer_id in df['customer_id'].unique():
        tickets = df[df['customer_id'] == customer_id].sort_values('created_date')
        recent_tickets = tickets.tail(long_window)
        
        cust_features = {
            'customer_id': customer_id,
            'total_tickets': len(recent_tickets),
            'avg_resolution_time': recent_tickets['resolution_time_hours'].mean(),
            'unresolved_tickets': len(recent_tickets[recent_tickets['status'] == 'open']),
            'escalated_tickets': len(recent_tickets[recent_tickets['escalated'] == True]),
            'satisfaction_score': recent_tickets['satisfaction_rating'].mean(),
        }
        
        # Categorize ticket types.
        # NOTE(review): category names containing spaces would produce awkward
        # column names here — confirm the category vocabulary upstream.
        for ticket_type in recent_tickets['category'].unique():
            cust_features[f'tickets_{ticket_type.lower()}'] = len(
                recent_tickets[recent_tickets['category'] == ticket_type]
            )
        
        features.append(cust_features)
    
    return pd.DataFrame(features)

# Feature 5: Final Feature Matrix Assembly
@feature(
    inputs=[
        dataset("silver/customer_demographics"),
        dataset("silver/transaction_features"),
        dataset("silver/behavioral_features"), 
        dataset("silver/support_features")
    ],
    outputs=[dataset("gold/customer_feature_matrix")],
    context_deps=["target_col", "churn_definition_days"]
)
def create_final_feature_matrix(demo_df: pd.DataFrame,
                              trans_df: pd.DataFrame,
                              behav_df: pd.DataFrame,
                              support_df: pd.DataFrame,
                              target_col: str,
                              churn_definition_days: int) -> pd.DataFrame:
    """Assemble final feature matrix with target variable.

    Left-joins all silver feature sets onto demographics by customer_id,
    zero-fills missing numerics, and attaches a placeholder target column.

    NOTE(review): ``churn_definition_days`` is injected but unused in this
    snippet — it would drive the real target computation (see below).
    """
    
    # Start with demographics as base
    feature_matrix = demo_df.copy()
    
    # Merge all feature sets (left joins keep every demographic row).
    for df_to_merge in [trans_df, behav_df, support_df]:
        feature_matrix = feature_matrix.merge(
            df_to_merge, 
            on='customer_id', 
            how='left'
        )
    
    # Fill missing values with appropriate defaults
    numeric_cols = feature_matrix.select_dtypes(include=[np.number]).columns
    feature_matrix[numeric_cols] = feature_matrix[numeric_cols].fillna(0)
    
    # Create target variable (churn prediction)
    # In real scenario, this would be computed based on future behavior
    # (placeholder: random labels, NOT deterministic — for illustration only).
    feature_matrix[target_col] = np.random.choice([0, 1], size=len(feature_matrix), p=[0.8, 0.2])
    
    # Feature engineering metadata
    feature_matrix['feature_generation_date'] = pd.Timestamp.now()
    feature_matrix['total_features'] = len(feature_matrix.columns) - 3  # Exclude ID, target, date
    
    return feature_matrix

# Assemble and run the pipeline
pipeline.add_features([
    engineer_customer_demographics,
    engineer_transaction_features,
    engineer_behavioral_features,
    engineer_support_features,  
    create_final_feature_matrix
])

# Execute with intelligent scheduling
result = pipeline.run(
    targets=["gold/customer_feature_matrix"],
    executor="ray",
    max_workers=16,
    validate_data=True,  # fixed: Pipeline.run() takes validate_data, not validate_features
    profile_data=True
)

print(result.summary())

Example 2: Financial Risk Assessment Pipeline

Scenario: Real-time credit risk features for loan applications.

from featureflow import Pipeline, feature, context, dataset
from featureflow.patterns import time_series_features, aggregation_features
import pandas as pd

# Risk assessment context
risk_ctx = context(
    # Risk windows
    immediate_window=7,
    short_risk_window=30,
    medium_risk_window=90,
    long_risk_window=365,
    
    # Risk thresholds
    high_risk_amount=10000,
    default_rate_threshold=0.05,
    credit_score_threshold=650,
    
    # Regulatory requirements
    pii_anonymization=True,
    audit_trail=True,
    
    # Model parameters
    target_col="default_risk_score",
    feature_importance_threshold=0.01
)

pipeline = Pipeline(
    name="credit_risk_features",
    base_uri="s3://risk-models/features/",
    context=risk_ctx
)

# Data sources
applications = dataset("bronze/loan_applications", partition_by=["application_date"])
credit_history = dataset("bronze/credit_bureau", partition_by=["customer_id"])
bank_transactions = dataset("bronze/bank_statements", partition_by=["date", "account_id"])
employment_data = dataset("bronze/employment_verification")

@feature(
    inputs=[applications, credit_history],
    outputs=[dataset("silver/credit_profile_features")]
)
@aggregation_features(
    group_by="customer_id",
    agg_specs={
        "credit_history": {
            "payment_amount": ["sum", "mean", "std"],
            "days_late": ["max", "mean", "count"],
            "credit_utilization": ["mean", "max"]
        }
    },
    time_windows=["30d", "90d", "365d"]
)
def compute_credit_profile_features(apps_df: pd.DataFrame, 
                                  credit_df: pd.DataFrame) -> pd.DataFrame:
    """Credit bureau and application-based risk features.

    Intentionally empty: the @aggregation_features decorator generates the
    grouped/windowed aggregations declared above; this function only
    declares the inputs, outputs, and aggregation spec.
    """
    # Implementation handled by aggregation_features decorator
    pass

@feature(
    inputs=[bank_transactions],
    outputs=[dataset("silver/cash_flow_features", partition_by=["customer_id"])]
)
@time_series_features(
    timestamp_col="transaction_date",
    group_by="customer_id", 
    windows=["7d", "30d", "90d"],
    aggregations=["sum", "mean", "std", "count"]
)
def compute_cash_flow_features(df: pd.DataFrame,
                             high_risk_amount: float) -> pd.DataFrame:
    """Bank transaction cash flow analysis.

    Adds a high-risk flag and a per-customer rolling transaction count on
    top of the windowed aggregations produced by @time_series_features.
    """
    df['is_high_risk_transaction'] = df['amount'] > high_risk_amount
    # BUG FIX: groupby(...)['amount'].rolling(7).count() returns a series with
    # a (customer_id, index) MultiIndex that cannot be assigned back as a
    # column; transform() keeps the result aligned to the original index.
    df['transaction_velocity'] = (
        df.groupby('customer_id')['amount']
          .transform(lambda s: s.rolling(7).count())
    )
    return df

# Run risk pipeline
result = pipeline.run(
    targets=["silver/credit_profile_features", "silver/cash_flow_features"],
    executor="threads",
    max_workers=8,
    validate_data=True  # fixed: Pipeline.run() takes validate_data, not validate_features
)

🛠️ Implementation Roadmap

Phase 1: Core Framework (Weeks 1-4)

Deliverables:

  • Context system with intelligent parameter resolution
  • Feature decorator with basic validation
  • Dataset abstraction with format detection
  • Local executor implementation
  • Basic pipeline orchestration

Package Structure:

graphflow/
├── __init__.py                 # Public API exports
├── core/
│   ├── __init__.py
│   ├── context.py             # Automatic context system
│   ├── decorators.py          # @node decorator with auto-detection
│   ├── dataset.py             # Dataset abstraction
│   ├── pipeline.py            # Pipeline orchestration with graph exports
│   ├── executors.py           # Execution backends
│   ├── scheduler.py           # Task scheduling
│   └── graph_exporter.py      # Graph export utilities
├── patterns/                  # Processing patterns (not just ML)
│   ├── __init__.py
│   ├── time_series.py         # Time series processing
│   ├── aggregation.py         # Data aggregation
│   ├── transformation.py      # Data transformation
│   ├── validation.py          # Data validation
│   └── ml_features.py         # ML-specific feature engineering
├── io/                        # Data I/O adapters
│   ├── __init__.py
│   ├── parquet.py            # Parquet adapter
│   ├── arrow.py              # Arrow adapter
│   └── formats.py            # Format detection
├── cache/                     # Caching system
│   ├── __init__.py
│   ├── manager.py            # Cache manager
│   ├── stores.py             # Cache stores (FS, Redis, S3)
│   └── hashing.py            # Content addressing
├── validation/                # Data validation
│   ├── __init__.py
│   ├── schema.py             # Schema validation
│   ├── quality.py            # Data quality checks
│   └── profiling.py          # Data profiling
├── observability/             # Monitoring & reporting
│   ├── __init__.py
│   ├── logging.py            # Structured logging
│   ├── metrics.py            # Prometheus metrics
│   ├── tracing.py            # OpenTelemetry tracing
│   └── reporting.py          # HTML reports
├── integrations/              # External integrations
│   ├── __init__.py
│   ├── feast.py              # Feast feature store
│   ├── sagemaker.py          # AWS SageMaker
│   └── vertex.py             # Google Vertex AI
├── cli/                       # Command line interface
│   ├── __init__.py
│   ├── main.py               # CLI entry point
│   ├── run.py                # Pipeline execution
│   └── visualize.py          # Pipeline visualization
└── tests/                     # Test suite
    ├── unit/                  # Unit tests
    ├── integration/           # Integration tests
    └── examples/              # Example pipelines

Phase 2: Advanced Features (Weeks 5-8)

Deliverables:

  • Ray/Dask distributed execution
  • Advanced caching with TTL and eviction
  • Feature pattern decorators
  • Data validation and profiling
  • HTML reporting and visualization

Phase 3: Production Features (Weeks 9-12)

Deliverables:

  • Cloud executors (Vertex AI, AWS Batch)
  • Feature store integrations
  • CLI interface
  • Comprehensive monitoring
  • Performance optimizations

Phase 4: Enterprise Features (Weeks 13-16)

Deliverables:

  • Advanced security and governance
  • Multi-tenant support
  • API server for pipeline management
  • Advanced scheduling and orchestration
  • Enterprise integrations

📋 Technical Specifications

Dependencies

Core Dependencies:

# setup.py or pyproject.toml
dependencies = [
    "pandas>=2.0.0",
    "polars>=0.19.0", 
    "pyarrow>=12.0.0",
    "networkx>=3.0",
    "fsspec>=2023.6.0",
    "pydantic>=2.0.0",
    "loguru>=0.7.0",
    "cloudpickle>=2.2.0",
    "blake3>=0.3.0",  # Fast hashing
    "typer>=0.9.0",   # CLI framework
]

# Optional dependencies
extras_require = {
    "distributed": ["ray>=2.6.0", "dask>=2023.8.0"],
    "cloud": ["google-cloud-aiplatform", "boto3", "azure-ml"],
    "visualization": ["pyvis>=0.3.0", "graphviz>=0.20.0", "jinja2>=3.1.0"],
    "feature-store": ["feast>=0.32.0"],
    "monitoring": ["prometheus-client>=0.17.0", "opentelemetry-api>=1.19.0"],
    "validation": ["great-expectations>=0.17.0", "pandera>=0.16.0"],
    "all": ["ray", "dask", "feast", "great-expectations", "pyvis", "prometheus-client"]
}

Performance Requirements

| Metric | Target | Notes |
| --- | --- | --- |
| Context Resolution | < 1ms per function | Parameter injection overhead |
| Feature Execution | 10K+ rows/sec/core | Single-threaded performance |
| Cache Lookup | < 10ms | Including hash computation |
| Pipeline Startup | < 5 seconds | DAG analysis and planning |
| Memory Overhead | < 100MB base | Framework footprint |
| Distributed Scaling | Linear to 100+ nodes | Ray/Dask scaling |

Quality Gates

Code Quality:

  • 90%+ test coverage
  • Type hints for all public APIs
  • Black code formatting
  • Pylint score > 8.5
  • No security vulnerabilities (Bandit)

Performance:

  • Benchmark suite for core operations
  • Memory profiling for large datasets
  • Distributed execution tests
  • Cache efficiency measurements

Documentation:

  • API documentation (Sphinx)
  • User guide with examples
  • Architecture decision records
  • Performance tuning guide

🎯 Success Metrics

Developer Experience

  • Time to first feature: < 15 minutes from install to running pipeline
  • Learning curve: Data scientist productive in < 2 hours
  • API satisfaction: 90%+ positive feedback on ease of use

Performance

  • Execution efficiency: 80%+ cache hit rate in typical workflows
  • Scaling: Linear performance scaling to 100+ cores
  • Resource utilization: < 20% overhead vs direct pandas/polars

Production Readiness

  • Reliability: 99.9% successful pipeline executions
  • Observability: Complete visibility into pipeline health
  • Maintenance: < 1 hour/week operational overhead

Adoption

  • Community: 1000+ GitHub stars in first 6 months
  • Enterprise: 10+ production deployments
  • Ecosystem: Integration with 5+ major ML platforms

🚀 Getting Started

Installation

# Basic installation
pip install graphflow

# With all optional dependencies
pip install "graphflow[all]"

# For distributed execution
pip install "graphflow[distributed]"

# For cloud execution
pip install "graphflow[cloud]"

Quick Start

from graphflow import Pipeline, node, context, dataset
import pandas as pd

# Create context - parameters automatically flow where needed
ctx = context(
    lookback_days=30,
    min_samples=100,
    target_col="churn"
)

# Create pipeline
pipeline = Pipeline(
    name="my_data_pipeline",
    base_uri="s3://my-bucket/data/",
    context=ctx
)

# Define processing node - context params auto-detected and injected!
@node(
    inputs=[dataset("raw/customers")],
    outputs=[dataset("processed/customer_features")]
)
def process_customers(df: pd.DataFrame, 
                     lookback_days: int,    # Auto-injected from context
                     min_samples: int) -> pd.DataFrame:  # Auto-injected from context
    """Keep each customer's most recent *lookback_days* rows.

    NOTE(review): min_samples is injected but unused in this quickstart snippet.
    """
    # Context automatically provides lookback_days and min_samples
    return df.groupby('customer_id').tail(lookback_days)

# Add to pipeline and run
pipeline.add_node(process_customers)

# Choose execution backend dynamically
result = pipeline.run(
    executor="auto"  # or "local", "ray", "cloud"
)

print(result.summary())

# Export pipeline graph
pipeline.export_graph(format="html", output="my_pipeline.html")

This comprehensive PRD provides a complete blueprint for building GraphFlow as a production-ready data processing framework with automatic context management, intelligent graph exports, and flexible execution - perfect for any data workflow from ETL to ML feature engineering!