
⚡️ArcFlow

A stateful PySpark ELT framework for Microsoft Fabric

ArcFlow, named after an electrical arc (a continuous discharge of electricity across two conductors), is a stateful PySpark data engineering framework designed for blazing-fast, scalable, and testable lakehouse architectures. It provides a flexible, zone-agnostic approach to building streaming or batch ELT pipelines through a unified API backed by Spark Structured Streaming semantics.

ArcFlow provides a means of rapidly moving and transforming data between zones of a lakehouse while implementing Fabric Spark best practices out of the box.

Features

  • Zone-Agnostic Architecture: No hardcoded bronze/silver/gold - use any zone names you want
  • Single-Source Processing: Process tables through zones with FlowConfig
  • Stream & Batch Toggle: Easy development-to-production switch
  • Factory Patterns: Extensible readers and writers for any format
  • Custom Transformations: Table-specific, zone-specific transformations via registry
  • Dimensional Modeling: Coming soon — multi-source fact, dimension, and bridge table support

Architecture

Zone-Based Processing Flow

Landing (raw files)
    ↓
Bronze (raw ingestion)
    ↓
Silver (curation, deduplication, semi-structured data parsing)
    ↓
Gold (aggregations, business logic)

Core Components

  • Models: FlowConfig, StageConfig - Type-safe table definitions
  • Readers: Factory pattern for Parquet, JSON, CSV, etc.
  • Writers: Delta Lake append and upsert operations
  • Transformations: Universal + custom per-table-per-zone
  • Pipelines: ZonePipeline (single-source processing)
  • Controller: Controller - Coordinates all pipelines
  • Job Lock: JobLock - Singleton lock to prevent duplicate concurrent runs
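The reader and writer factories can be pictured with a short sketch. This is an illustrative re-creation of the factory pattern, not ArcFlow's actual code; the class names and the register() decorator are assumptions.

```python
# Hypothetical sketch of the factory pattern behind Readers/Writers.
# Class names and the register() decorator are illustrative, not ArcFlow's API.
class ReaderFactory:
    _registry = {}

    @classmethod
    def register(cls, fmt):
        def decorator(reader_cls):
            cls._registry[fmt.lower()] = reader_cls
            return reader_cls
        return decorator

    @classmethod
    def create(cls, fmt):
        try:
            return cls._registry[fmt.lower()]()
        except KeyError:
            raise ValueError(f"Unsupported format: {fmt}")

@ReaderFactory.register("parquet")
class ParquetReader:
    def read(self, spark, path):
        # A real reader would return a (streaming) DataFrame here.
        return spark.readStream.format("parquet").load(path)

print(type(ReaderFactory.create("parquet")).__name__)  # ParquetReader
```

Registering each format through a decorator keeps the factory open for extension: a new format needs only a new class, with no edits to the factory itself.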

Installation

# Clone repository
git clone <your-repo>
cd ArcFlow

# Install dependencies with UV
uv sync

# Or with pip
pip install -e .

Quick Start

1. Define Your Tables

Tables can be defined in Python (dataclasses) or YAML — both produce the same FlowConfig objects.

Option A: Python Dataclasses

from pyspark.sql.types import StructType
from arcflow import FlowConfig, StageConfig

tables = {}

tables['sensor_data'] = FlowConfig(
    name='sensor_data',
    format='parquet',
    landing_path='Files/landing/opc_ua/sensor_data/',
    schema=StructType([...]),
    zones={
        'bronze': StageConfig(mode='append'),
        'silver': StageConfig(
            mode='upsert',
            merge_keys=['sensor_id', 'timestamp'],
            custom_transform='clean_sensor_data',
        ),
        'gold': StageConfig(
            mode='append',
            custom_transform='aggregate_hourly',
        )
    }
)

Option B: YAML Configuration

# pipeline.yml
config:
  streaming_enabled: true
  checkpoint_uri: "Files/checkpoints/"
  landing_uri: "Files/landing/"

tables:
  sensor_data:
    format: parquet
    source_uri: "Files/landing/opc_ua/sensor_data/"
    schema: "sensor_id STRING, timestamp TIMESTAMP, value DOUBLE"
    zones:
      bronze:
        mode: append
      silver:
        mode: upsert
        merge_keys: [sensor_id, timestamp]
        custom_transform: clean_sensor_data
      gold:
        mode: append
        custom_transform: aggregate_sensor_data

Load and use it:

from arcflow import load_yaml_config, Controller

tables, config = load_yaml_config("pipeline.yml")
controller = Controller(spark, config, tables)

Schemas in YAML use PySpark DDL syntax — including nested types:

schema: >-
  _meta STRUCT<producer: STRING, recordType: STRING>,
  data ARRAY<STRUCT<ItemId: STRING, Cost: DOUBLE>>

See examples/pipeline_config.yml for a full example.

2. Define Custom DataFrame Transformers (Optional)

Register table-specific, zone-specific transformations:

from arcflow.transformations.zone_transforms import register_zone_transformer
from pyspark.sql import DataFrame

@register_zone_transformer()
def clean_sensor_data(df: DataFrame) -> DataFrame:
    """Custom cleaning logic for sensor data in silver zone"""
    return df.filter(df.sensor_value.isNotNull()) \
             .filter(df.sensor_value >= 0)
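Under the hood, such a registry is typically just a dict keyed by function name. A minimal sketch of the idea (assumed internals, not ArcFlow's actual implementation):

```python
# Minimal sketch of a name-keyed transformer registry (assumed internals,
# not ArcFlow's actual code): the decorator stores the function under its
# own name so a StageConfig's custom_transform string can look it up later.
_TRANSFORMERS = {}

def register_zone_transformer(name=None):
    def decorator(fn):
        _TRANSFORMERS[name or fn.__name__] = fn
        return fn
    return decorator

def get_transformer(name):
    if name not in _TRANSFORMERS:
        raise KeyError(f"No transformer registered as '{name}'")
    return _TRANSFORMERS[name]

@register_zone_transformer()
def clean_sensor_data(df):
    return df  # real logic would filter nulls/negatives as above

print(get_transformer("clean_sensor_data") is clean_sensor_data)  # True
```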

3. Run the Pipeline

from arcflow import Controller
from arcflow.core.spark_session import create_spark_session

# Create Spark session
spark = create_spark_session(app_name="MyELT")

# Configure pipeline
config = {
    'streaming_enabled': True,
    'checkpoint_uri': 'Files/checkpoints/',
    'landing_uri': 'Files/landing/',
}

# Initialize controller
controller = Controller(
    spark=spark,
    config=config,
    table_registry=tables
)

# Run full pipeline
controller.run_full_pipeline(zones=['bronze', 'silver', 'gold'])

Multi-Source Dimensional Modeling

Coming soon — Multi-source dimensional modeling (fact tables, dimensions, bridge tables) is planned for a future release.

Singleton Job Lock

In production, overlapping scheduled runs or manual re-triggers can cause data corruption, duplicate writes, or checkpoint conflicts. ArcFlow includes a file-based singleton lock to prevent duplicate concurrent runs of the same pipeline job.

The lock is opt-in — enable it by setting job_lock_enabled and providing a job_id in your config.

Automatic (Controller-integrated)

When enabled, the lock is automatically acquired at the start of run_full_pipeline() or run_zone_pipeline() and released on completion (or error). Nested calls (e.g., run_full_pipeline calling run_zone_pipeline internally) do not re-acquire.

from arcflow import Controller, get_config

config = get_config({
    'job_id': 'shipment-etl-prod',       # unique identifier for this job (scopes the lock)
    'job_lock_enabled': True,             # enable singleton lock
    'job_lock_path': 'Files/locks/',      # directory for lock files
    'job_lock_timeout_seconds': 1800,     # wait up to 30 min for existing lock
    # poll_interval and heartbeat_interval are auto-derived from timeout_seconds
    # (see Derived Intervals below) — override only if needed
})

controller = Controller(spark, config, tables)
controller.run_full_pipeline()   # lock acquired here, released on completion
controller.stop_all()            # also releases lock if still held

Manual (Context Manager)

Use JobLock directly for fine-grained control:

from arcflow import JobLock

with JobLock(job_id='my-etl-job', lock_path='Files/locks/', timeout_seconds=600):
    # Only one instance of this block can run at a time per job_id
    controller.run_full_pipeline()
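A file-based singleton lock generally relies on atomic file creation. The following is a generic sketch of that idea, not ArcFlow's implementation: O_CREAT | O_EXCL fails if the lock file already exists, and the finally block guarantees release.

```python
import os
from contextlib import contextmanager

@contextmanager
def file_lock(path):
    # Generic sketch, not ArcFlow's code: O_CREAT | O_EXCL makes creation
    # atomic, so only one process can create the lock file at a time.
    fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    try:
        os.write(fd, str(os.getpid()).encode())
        yield
    finally:
        os.close(fd)
        os.remove(path)  # always released, even if the body raises
```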

Behavior

| Scenario | Behavior |
| --- | --- |
| Lock not held | Acquire immediately, write lock file |
| Lock held by another run | Wait and retry every poll_interval seconds |
| Wait exceeds timeout_seconds | Raise JobLockError |
| Same process (notebook re-run) | Re-acquire immediately (auto-detected via instance ID) |
| Lock older than holder's timeout | Auto-recover as stale (log warning) |
| Corrupt lock file | Treated as stale, auto-recovered |
| Pipeline error | Lock released in finally block |
| Long-running job | Heartbeat thread refreshes lock file to prevent false stale recovery |
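The wait/retry/timeout rows above amount to a polling loop. An illustrative sketch (ArcFlow's internals may differ; TimeoutError stands in for JobLockError here):

```python
import time

def acquire_with_retry(try_acquire, timeout_seconds, poll_interval):
    # Illustrative polling loop (not ArcFlow's code): retry every
    # poll_interval seconds, fail once timeout_seconds has elapsed.
    deadline = time.monotonic() + timeout_seconds
    while True:
        if try_acquire():
            return True
        if time.monotonic() >= deadline:
            raise TimeoutError("timed out waiting for job lock")
        time.sleep(poll_interval)

# A lock that frees up on the third attempt:
attempts = []
print(acquire_with_retry(lambda: attempts.append(1) or len(attempts) == 3,
                         timeout_seconds=1, poll_interval=0.01))  # True
```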

Session Re-entry

The lock file includes an auto-generated instance_id (a UUID created once per Python process at import time). If a new Controller is created in the same session — e.g. re-running a notebook cell — it sees its own instance_id in the lock file and re-acquires silently. A different Spark job (separate process) gets a different UUID and will block as expected.

# First run — acquires lock
controller = Controller(spark, config, tables)
controller.run_full_pipeline()

# Re-run in same notebook session — re-acquires, no conflict
controller = Controller(spark, config, tables)
controller.run_full_pipeline()
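The mechanism can be re-created in a few lines. This is an illustrative sketch; the names INSTANCE_ID and can_reacquire are assumptions, not ArcFlow's API.

```python
import uuid

# Illustrative sketch of the instance_id mechanism: one id per Python
# process, generated once at import time (names here are assumptions).
INSTANCE_ID = uuid.uuid4().hex[:12]

def can_reacquire(lock_file_instance_id: str) -> bool:
    # Same process sees its own id in the lock file and re-acquires
    # silently; a different process has a different id and must wait.
    return lock_file_instance_id == INSTANCE_ID

print(can_reacquire(INSTANCE_ID))        # True
print(can_reacquire("another-process"))  # False
```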

Heartbeat

While the lock is held, a background daemon thread rewrites the lock file to keep acquired_at fresh. This prevents a legitimate long-running job from being mistaken for stale by another instance.
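A heartbeat like this is typically an event-driven daemon thread. A generic sketch, not ArcFlow's actual code:

```python
import threading

def start_heartbeat(refresh_lock_file, interval_seconds, stop_event):
    # Generic sketch of the heartbeat described above (not ArcFlow's code):
    # Event.wait() returns False on timeout (time to refresh) and True once
    # the lock holder sets the event, which ends the loop.
    def loop():
        while not stop_event.wait(interval_seconds):
            refresh_lock_file()
    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread
```

Marking the thread daemon=True ensures it never keeps the process alive after the main job exits.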

Derived Intervals

Both poll_interval and heartbeat_interval are automatically derived from timeout_seconds when not explicitly set, so all timing scales consistently from a single value:

| Interval | Formula | Floor | Example (timeout=3600) | Example (timeout=30) |
| --- | --- | --- | --- | --- |
| poll_interval | timeout_seconds // 10 | 5s | 360s | 5s |
| heartbeat_interval | timeout_seconds // 3 | 10s | 1200s | 10s |

Override either interval explicitly if the defaults don't suit your workload.
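The two formulas reduce to max() expressions; a quick sketch reproduces the example columns above:

```python
def derived_intervals(timeout_seconds: int):
    # Reproduces the documented formulas: each interval is derived from the
    # timeout, with a floor so very short timeouts still behave sanely.
    poll_interval = max(timeout_seconds // 10, 5)       # floor: 5 s
    heartbeat_interval = max(timeout_seconds // 3, 10)  # floor: 10 s
    return poll_interval, heartbeat_interval

print(derived_intervals(3600))  # (360, 1200)
print(derived_intervals(30))    # (5, 10)
```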

Lock File Format

The lock file (<lock_path>/<job_id>.lock) is human-readable JSON for debugging:

{
  "job_id": "shipment-etl-prod",
  "acquired_at": "2026-03-13T22:00:00+00:00",
  "timeout_seconds": 1800,
  "instance_id": "a3f8b2c1d4e5",
  "hostname": "spark-worker-01",
  "pid": 12345
}
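Given that format, the documented staleness rules (older than its own timeout_seconds, or unparseable) can be sketched as follows. This is an illustration, not ArcFlow's actual code.

```python
import json
from datetime import datetime, timedelta, timezone

def is_lock_stale(lock_file_text: str, now: datetime) -> bool:
    # Sketch of the documented staleness rules (not ArcFlow's actual code):
    # stale if older than its own timeout_seconds, or if the file is corrupt.
    try:
        lock = json.loads(lock_file_text)
        acquired = datetime.fromisoformat(lock["acquired_at"])
        return now - acquired > timedelta(seconds=lock["timeout_seconds"])
    except (ValueError, KeyError, TypeError):
        return True  # corrupt lock file -> treated as stale

lock_text = '{"acquired_at": "2026-03-13T22:00:00+00:00", "timeout_seconds": 1800}'
print(is_lock_stale(lock_text, datetime(2026, 3, 13, 22, 10, tzinfo=timezone.utc)))  # False
print(is_lock_stale(lock_text, datetime(2026, 3, 13, 23, 0, tzinfo=timezone.utc)))   # True
```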

Job Lock Configuration Reference

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| job_id | str | None | Unique job identifier (required to enable lock) |
| job_lock_enabled | bool | False | Enable singleton lock |
| job_lock_path | str | "Files/locks/" | Directory for lock files |
| job_lock_timeout_seconds | int | 3600 | Max wait time before failing (seconds) |
| job_lock_poll_interval | int | None | Retry interval while waiting (default: timeout_seconds // 10, min 5s) |
| job_lock_heartbeat_interval | int | None | Lock file refresh interval (default: timeout_seconds // 3, min 10s) |

Development vs Production

Development Mode (Batch)

Fast iteration with batch processing:

config = {
    'streaming_enabled': False  # Batch mode
}

Production Mode (Streaming)

Continuous processing:

config = {
    'streaming_enabled': True,  # Streaming mode
    'checkpoint_uri': 'Files/checkpoints/',
}

Deploying to Microsoft Fabric

1. Build Package

uv build

This creates a wheel file in dist/:

dist/arcflow-0.1.0-py3-none-any.whl

2. Upload to Fabric Workspace

  1. Open your Fabric workspace
  2. Go to Settings → Workspace settings → Data Engineering/Science
  3. Upload the wheel file

3. Create Spark Job Definition

  1. Create new Spark Job Definition
  2. Set Main file: src/arcflow/main.py
  3. Add attached libraries: arcflow-0.1.0-py3-none-any.whl
  4. Configure Spark properties if needed
  5. Run the job

4. Customize Configuration

Edit src/arcflow/main.py to:

  • Define your table registry
  • Configure zones
  • Register custom transformations
  • Set pipeline behavior

Project Structure

src/arcflow/
├── __init__.py                      # Package exports
├── models.py                        # FlowConfig, StageConfig
├── yaml_loader.py                   # YAML configuration loader
├── controller.py                    # Controller — orchestrates all pipelines
├── lock.py                          # Singleton job lock (duplicate run prevention)
├── main.py                          # Entry point for Spark Job Definition
│
├── core/
│   ├── spark_session.py            # Spark session factory
│   └── stream_manager.py           # Streaming query lifecycle
│
├── readers/
│   ├── base_reader.py              # BaseReader abstract class
│   ├── parquet_reader.py           # Parquet implementation
│   ├── json_reader.py              # JSON implementation
│   └── reader_factory.py           # ReaderFactory
│
├── writers/
│   ├── base_writer.py              # BaseWriter abstract class
│   ├── delta_writer.py             # Delta Lake implementation
│   └── writer_factory.py           # WriterFactory
│
├── transformations/
│   ├── common.py                   # Universal transformations
│   └── zone_transforms.py          # Custom zone transformations registry
│
└── pipelines/
    └── zone_pipeline.py            # Single-source zone processing

Testing

Run tests with pytest:

# All tests
uv run pytest

# With coverage
uv run pytest --cov=src/arcflow --cov-report=html

# Specific test
uv run pytest tests/test_transformations.py

Configuration Reference

FlowConfig

FlowConfig(
    name: str,              # Table name
    format: str,            # 'parquet', 'json', 'csv', etc.
    landing_uri: str,       # Path to raw files
    zones: dict,            # Dict of zone_name -> StageConfig
    json_explode_arrays: bool = False,
    json_archive_after_read: bool = False,
    csv_header: bool = True,
    csv_delimiter: str = ',',
    csv_infer_schema: bool = True,
)

StageConfig

StageConfig(
    enabled: bool,                      # Enable this zone
    mode: str,                          # 'append' or 'upsert'
    merge_keys: List[str] = None,       # Required for upsert
    custom_transform: str = None,       # Name of registered transformer
    description: str = '',
)
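Since upsert requires merge keys, a config layer typically validates that pairing at construction time. Below is an illustrative dataclass mirror of the reference above; the field names come from the docs, but the validation logic is an assumption, not ArcFlow's actual code.

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class StageConfig:
    # Field names mirror the reference above; the __post_init__ validation
    # is an assumption about how such a config might guard itself.
    enabled: bool = True
    mode: str = "append"
    merge_keys: Optional[List[str]] = None
    custom_transform: Optional[str] = None
    description: str = ""

    def __post_init__(self):
        if self.mode not in ("append", "upsert"):
            raise ValueError(f"Unknown mode: {self.mode}")
        if self.mode == "upsert" and not self.merge_keys:
            raise ValueError("merge_keys is required when mode='upsert'")

print(StageConfig(mode="upsert", merge_keys=["sensor_id", "timestamp"]).mode)  # upsert
```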

Best Practices

1. Zone Strategy

  • Bronze: Raw ingestion, append-only, minimal transformations
  • Silver: Curated, deduplicated, upsert mode with merge keys
  • Gold: Business aggregations, materialized views

2. Custom Transformations

  • Keep transformations pure functions
  • Make them testable with unit tests
  • Use descriptive names: clean_sensor_data, enrich_customer_data
  • Register with decorators for discoverability

3. Merge Keys

  • Always specify merge keys for upsert mode
  • Include timestamp or version columns for change tracking
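What upsert-with-merge-keys means can be shown on plain Python dicts; Delta Lake's MERGE does the equivalent at table scale. The data below is illustrative.

```python
def upsert(existing, incoming, merge_keys):
    # Plain-Python illustration of upsert semantics: rows that match on the
    # merge keys are replaced, everything else is inserted.
    index = {tuple(row[k] for k in merge_keys): row for row in existing}
    for row in incoming:
        index[tuple(row[k] for k in merge_keys)] = row
    return list(index.values())

target = [{"sensor_id": 1, "timestamp": "t1", "value": 10.0}]
updates = [{"sensor_id": 1, "timestamp": "t1", "value": 12.5},
           {"sensor_id": 2, "timestamp": "t1", "value": 7.0}]
print(upsert(target, updates, ["sensor_id", "timestamp"]))
```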

4. Streaming vs Batch

  • Use batch mode (streaming_enabled=False) for development
  • Use streaming mode in production for continuous processing
  • Test with batch before deploying streaming

5. Error Handling

  • Monitor streaming query status with controller.get_status()
  • Implement graceful shutdown with controller.stop_all()
  • Use checkpointing for exactly-once processing

Examples

See src/arcflow/main.py for complete examples including:

  • Sensor data processing
  • Equipment maintenance logs
  • Production schedule ingestion

Contributing

Contributions welcome! Please:

  1. Fork the repository
  2. Create a feature branch
  3. Add tests for new functionality
  4. Submit a pull request

License

MIT License - See LICENSE file for details

Support

For issues, questions, or feature requests, please open a GitHub issue.


Built with ❤️ for data engineers working with Microsoft Fabric
