# ArcFlow

A stateful PySpark ELT framework for Microsoft Fabric.

ArcFlow, named after an electrical arc (a continuous discharge of electricity between two conductors), is a stateful PySpark data engineering framework designed for fast, scalable, and testable lakehouse architectures. It provides a flexible, zone-agnostic approach to building streaming or batch ELT pipelines through a unified API backed by Spark Structured Streaming semantics.

ArcFlow rapidly moves and transforms data between zones of a lakehouse while applying Fabric Spark best practices out of the box.
## Features

- Zone-Agnostic Architecture: No hardcoded bronze/silver/gold; use any zone names you want
- Single-Source Processing: Process tables through zones with `FlowConfig`
- Stream & Batch Toggle: Easy development-to-production switch
- Factory Patterns: Extensible readers and writers for any format
- Custom Transformations: Table-specific, zone-specific transformations via a registry
- Dimensional Modeling: Coming soon; multi-source fact, dimension, and bridge table support
## Architecture

A typical zone flow:

```
Landing (raw files)
        ↓
Bronze (raw ingestion)
        ↓
Silver (curation, deduplication, semi-structured data parsing)
        ↓
Gold (aggregations, business logic)
```
## Components

- Models: `FlowConfig` and `StageConfig`, type-safe table definitions
- Readers: Factory pattern for Parquet, JSON, CSV, etc.
- Writers: Delta Lake append and upsert operations
- Transformations: Universal plus custom per-table-per-zone
- Pipelines: `ZonePipeline`, single-source processing
- Controller: `Controller`, coordinates all pipelines
- Job Lock: `JobLock`, a singleton lock that prevents duplicate concurrent runs
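The reader factory pattern listed above could be sketched roughly as follows. This is a hypothetical illustration: the class names `BaseReader` and `ReaderFactory` mirror the project layout, but the real signatures and Spark calls may differ.

```python
from abc import ABC, abstractmethod

# Hypothetical sketch of the reader factory pattern; ArcFlow's actual
# BaseReader/ReaderFactory interfaces may differ.
class BaseReader(ABC):
    @abstractmethod
    def read(self, spark, path: str):
        """Return a DataFrame for the files at `path`."""

class ParquetReader(BaseReader):
    def read(self, spark, path: str):
        return spark.read.parquet(path)

class JsonReader(BaseReader):
    def read(self, spark, path: str):
        return spark.read.json(path)

class ReaderFactory:
    _readers = {'parquet': ParquetReader, 'json': JsonReader}

    @classmethod
    def register(cls, fmt: str, reader_cls) -> None:
        # Extension point: plug in new formats without touching the factory
        cls._readers[fmt] = reader_cls

    @classmethod
    def create(cls, fmt: str) -> BaseReader:
        try:
            return cls._readers[fmt]()
        except KeyError:
            raise ValueError(f"Unsupported format: {fmt}")
```

Registering a new format is then a one-liner: `ReaderFactory.register('csv', CsvReader)`.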
## Installation

```bash
# Clone the repository
git clone <your-repo>
cd ArcFlow

# Install dependencies with UV
uv sync

# Or with pip
pip install -e .
```

## Defining Tables

Tables can be defined in Python (dataclasses) or YAML; both produce the same `FlowConfig` objects.
In Python:

```python
from pyspark.sql.types import StructType
from arcflow import FlowConfig, StageConfig

tables = {}
tables['sensor_data'] = FlowConfig(
    name='sensor_data',
    format='parquet',
    landing_uri='Files/landing/opc_ua/sensor_data/',
    schema=StructType([...]),
    zones={
        'bronze': StageConfig(mode='append'),
        'silver': StageConfig(
            mode='upsert',
            merge_keys=['sensor_id', 'timestamp'],
            custom_transform='clean_sensor_data',
        ),
        'gold': StageConfig(
            mode='append',
            custom_transform='aggregate_hourly',
        ),
    },
)
```

Or in YAML:

```yaml
# pipeline.yml
config:
  streaming_enabled: true
  checkpoint_uri: "Files/checkpoints/"
  landing_uri: "Files/landing/"

tables:
  sensor_data:
    format: parquet
    source_uri: "Files/landing/opc_ua/sensor_data/"
    schema: "sensor_id STRING, timestamp TIMESTAMP, value DOUBLE"
    zones:
      bronze:
        mode: append
      silver:
        mode: upsert
        merge_keys: [sensor_id, timestamp]
        custom_transform: clean_sensor_data
      gold:
        mode: append
        custom_transform: aggregate_sensor_data
```

Load and use it:
```python
from arcflow import load_yaml_config, Controller

tables, config = load_yaml_config("pipeline.yml")
controller = Controller(spark, config, tables)
```

Schemas in YAML use PySpark DDL syntax, including nested types:

```yaml
schema: >-
  _meta STRUCT<producer: STRING, recordType: STRING>,
  data ARRAY<STRUCT<ItemId: STRING, Cost: DOUBLE>>
```

See `examples/pipeline_config.yml` for a full example.
## Custom Transformations

Register table-specific, zone-specific transformations:

```python
from pyspark.sql import DataFrame
from arcflow.transformations.zone_transforms import register_zone_transformer

@register_zone_transformer()
def clean_sensor_data(df: DataFrame) -> DataFrame:
    """Custom cleaning logic for sensor data in the silver zone."""
    return (
        df.filter(df.sensor_value.isNotNull())
          .filter(df.sensor_value >= 0)
    )
```

## Running the Pipeline

```python
from arcflow import Controller
from arcflow.core.spark_session import create_spark_session

# Create Spark session
spark = create_spark_session(app_name="MyELT")

# Configure pipeline
config = {
    'streaming_enabled': True,
    'checkpoint_uri': 'Files/checkpoints/',
    'landing_uri': 'Files/landing/',
}

# Initialize controller
controller = Controller(
    spark=spark,
    config=config,
    table_registry=tables,
)

# Run full pipeline
controller.run_full_pipeline(zones=['bronze', 'silver', 'gold'])
```

Coming soon: multi-source dimensional modeling (fact tables, dimensions, bridge tables) is planned for a future release.
## Job Lock

In production, overlapping scheduled runs or manual re-triggers can cause data corruption, duplicate writes, or checkpoint conflicts. ArcFlow includes a file-based singleton lock to prevent duplicate concurrent runs of the same pipeline job.

The lock is opt-in: enable it by setting `job_lock_enabled` and providing a `job_id` in your config.

When enabled, the lock is automatically acquired at the start of `run_full_pipeline()` or `run_zone_pipeline()` and released on completion (or error). Nested calls (e.g., `run_full_pipeline` calling `run_zone_pipeline` internally) do not re-acquire.
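The no-re-acquire behavior for nested calls can be modeled as a simple depth counter. This is a sketch of the idea only, not ArcFlow's actual implementation:

```python
class ReentrantJobLock:
    """Sketch: nested acquires increment a depth counter instead of
    re-acquiring; only the outermost release actually frees the lock.
    (Hypothetical class; ArcFlow's JobLock internals may differ.)"""

    def __init__(self):
        self._depth = 0
        self.held = False

    def acquire(self):
        if self._depth == 0:
            self.held = True   # real code would write the lock file here
        self._depth += 1

    def release(self):
        self._depth -= 1
        if self._depth == 0:
            self.held = False  # real code would delete the lock file here

lock = ReentrantJobLock()
lock.acquire()       # run_full_pipeline
lock.acquire()       # nested run_zone_pipeline: no re-acquire
lock.release()
assert lock.held     # still held by the outer call
lock.release()
assert not lock.held
```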
Enable the lock through the pipeline config:

```python
from arcflow import Controller, get_config

config = get_config({
    'job_id': 'shipment-etl-prod',      # unique identifier for this job (scopes the lock)
    'job_lock_enabled': True,           # enable the singleton lock
    'job_lock_path': 'Files/locks/',    # directory for lock files
    'job_lock_timeout_seconds': 1800,   # wait up to 30 min for an existing lock
    # poll_interval and heartbeat_interval are auto-derived from timeout_seconds
    # (see Derived Intervals below); override only if needed
})

controller = Controller(spark, config, tables)
controller.run_full_pipeline()  # lock acquired here, released on completion
controller.stop_all()           # also releases the lock if still held
```

Use `JobLock` directly for fine-grained control:

```python
from arcflow import JobLock

with JobLock(job_id='my-etl-job', lock_path='Files/locks/', timeout_seconds=600):
    # Only one instance of this block can run at a time per job_id
    controller.run_full_pipeline()
```

### Lock Behavior

| Scenario | Behavior |
|---|---|
| Lock not held | Acquire immediately, write lock file |
| Lock held by another run | Wait and retry every `poll_interval` seconds |
| Wait exceeds `timeout_seconds` | Raise `JobLockError` |
| Same process (notebook re-run) | Re-acquire immediately (auto-detected via instance ID) |
| Lock older than holder's timeout | Auto-recover as stale (log warning) |
| Corrupt lock file | Treated as stale, auto-recovered |
| Pipeline error | Lock released in `finally` block |
| Long-running job | Heartbeat thread refreshes lock file to prevent false stale recovery |
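The stale-recovery rows above could be implemented along these lines. This is a hypothetical helper: ArcFlow's real logic may differ, and for simplicity the sketch stores `acquired_at` as epoch seconds rather than the ISO timestamp the actual lock file uses.

```python
import json

def is_stale(lock_file_contents: str, now: float) -> bool:
    """A lock is stale if its file is corrupt or if it is older than the
    timeout its holder declared when acquiring it. (Hypothetical sketch;
    acquired_at is epoch seconds here, not an ISO timestamp.)"""
    try:
        data = json.loads(lock_file_contents)
        acquired_at = float(data['acquired_at'])
        timeout = float(data['timeout_seconds'])
    except (ValueError, KeyError, TypeError):
        return True  # corrupt or unreadable lock file: treat as stale
    return now - acquired_at > timeout

fresh = json.dumps({'acquired_at': 1000.0, 'timeout_seconds': 600})
assert not is_stale(fresh, now=1300.0)  # 300s old, within the 600s timeout
assert is_stale(fresh, now=1700.0)      # 700s old: stale
assert is_stale('not json', now=0.0)    # corrupt: stale
```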
### Instance Identity

The lock file includes an auto-generated `instance_id` (a UUID created once per Python process at import time). If a new `Controller` is created in the same session, e.g. re-running a notebook cell, it sees its own `instance_id` in the lock file and re-acquires silently. A different Spark job (a separate process) gets a different UUID and blocks as expected.

```python
# First run: acquires lock
controller = Controller(spark, config, tables)
controller.run_full_pipeline()

# Re-run in the same notebook session: re-acquires, no conflict
controller = Controller(spark, config, tables)
controller.run_full_pipeline()
```

### Heartbeat

While the lock is held, a background daemon thread rewrites the lock file to keep `acquired_at` fresh. This prevents a legitimate long-running job from being mistaken for stale by another instance.
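A heartbeat like the one described could be sketched with a daemon thread that periodically rewrites the lock file. Illustrative only; the function name and file layout here are assumptions, not ArcFlow's internals:

```python
import json
import threading
import time

def start_heartbeat(lock_file: str, interval: float) -> threading.Event:
    """Refresh acquired_at in the lock file every `interval` seconds until
    the returned event is set. (Hypothetical sketch of the heartbeat idea.)"""
    stop = threading.Event()

    def beat():
        # Event.wait(interval) sleeps up to `interval` seconds and returns
        # True once stop is set, ending the loop.
        while not stop.wait(interval):
            with open(lock_file) as f:
                data = json.load(f)
            data['acquired_at'] = time.time()  # refresh the timestamp
            with open(lock_file, 'w') as f:
                json.dump(data, f)

    threading.Thread(target=beat, daemon=True).start()
    return stop
```

The caller would invoke `stop.set()` when releasing the lock; the daemon flag means a crashed process cannot leave the thread running.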
### Derived Intervals

Both `poll_interval` and `heartbeat_interval` are automatically derived from `timeout_seconds` when not explicitly set, so all timing scales consistently from a single value:

| Interval | Formula | Floor | Example (timeout=3600) | Example (timeout=30) |
|---|---|---|---|---|
| `poll_interval` | `timeout_seconds // 10` | 5s | 360s | 5s |
| `heartbeat_interval` | `timeout_seconds // 3` | 10s | 1200s | 10s |

Override either interval explicitly if the defaults don't suit your workload.
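The derivation rules in the table reduce to two lines of arithmetic. A sketch with a hypothetical function name:

```python
def derive_intervals(timeout_seconds: int) -> tuple[int, int]:
    """Derive (poll_interval, heartbeat_interval) from timeout_seconds,
    applying the 5s and 10s floors from the table above.
    (Illustrative helper, not ArcFlow's actual function.)"""
    poll = max(timeout_seconds // 10, 5)
    heartbeat = max(timeout_seconds // 3, 10)
    return poll, heartbeat

assert derive_intervals(3600) == (360, 1200)
assert derive_intervals(30) == (5, 10)  # both floors kick in
```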
### Lock File Format

The lock file (`<lock_path>/<job_id>.lock`) is human-readable JSON for debugging:

```json
{
  "job_id": "shipment-etl-prod",
  "acquired_at": "2026-03-13T22:00:00+00:00",
  "timeout_seconds": 1800,
  "instance_id": "a3f8b2c1d4e5",
  "hostname": "spark-worker-01",
  "pid": 12345
}
```

### Configuration Keys

| Key | Type | Default | Description |
|---|---|---|---|
| `job_id` | `str` | `None` | Unique job identifier (required to enable the lock) |
| `job_lock_enabled` | `bool` | `False` | Enable the singleton lock |
| `job_lock_path` | `str` | `"Files/locks/"` | Directory for lock files |
| `job_lock_timeout_seconds` | `int` | `3600` | Max wait time before failing (seconds) |
| `job_lock_poll_interval` | `int` | `None` | Retry interval while waiting (default: `timeout_seconds // 10`, min 5s) |
| `job_lock_heartbeat_interval` | `int` | `None` | Lock file refresh interval (default: `timeout_seconds // 3`, min 10s) |
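A config helper consistent with the defaults above might fill them in like this. Hypothetical sketch; ArcFlow's `get_config` may behave differently:

```python
# Defaults copied from the table above; the poll and heartbeat intervals
# stay None so they can be auto-derived from the timeout later.
JOB_LOCK_DEFAULTS = {
    'job_id': None,
    'job_lock_enabled': False,
    'job_lock_path': 'Files/locks/',
    'job_lock_timeout_seconds': 3600,
    'job_lock_poll_interval': None,
    'job_lock_heartbeat_interval': None,
}

def with_job_lock_defaults(user_config: dict) -> dict:
    """Overlay user-supplied keys on the documented defaults.
    (Illustrative helper, not ArcFlow's actual get_config.)"""
    return {**JOB_LOCK_DEFAULTS, **user_config}

cfg = with_job_lock_defaults({'job_id': 'etl', 'job_lock_enabled': True})
assert cfg['job_lock_timeout_seconds'] == 3600  # default kept
assert cfg['job_lock_enabled'] is True          # user override applied
```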
## Development vs Production

For fast iteration during development, use batch processing:

```python
config = {
    'streaming_enabled': False,  # batch mode
}
```

For continuous processing in production:

```python
config = {
    'streaming_enabled': True,   # streaming mode
    'checkpoint_uri': 'Files/checkpoints/',
}
```

## Building and Deploying

Build the package:

```bash
uv build
```

This creates a wheel file in `dist/`:

```
dist/arcflow-0.1.0-py3-none-any.whl
```
- Open your Fabric workspace
- Go to Settings → Workspace settings → Data Engineering/Science
- Upload the wheel file
- Create a new Spark Job Definition
- Set the main file: `src/arcflow/main.py`
- Add attached libraries: `arcflow-0.1.0-py3-none-any.whl`
- Configure Spark properties if needed
- Run the job
To customize the pipeline, edit `src/arcflow/main.py` to:

- Define your table registry
- Configure zones
- Register custom transformations
- Set pipeline behavior
## Project Structure

```
src/arcflow/
├── __init__.py              # Package exports
├── models.py                # FlowConfig, StageConfig
├── yaml_loader.py           # YAML configuration loader
├── controller.py            # Controller: orchestrates all pipelines
├── lock.py                  # Singleton job lock (duplicate run prevention)
├── main.py                  # Entry point for Spark Job Definition
│
├── core/
│   ├── spark_session.py     # Spark session factory
│   └── stream_manager.py    # Streaming query lifecycle
│
├── readers/
│   ├── base_reader.py       # BaseReader abstract class
│   ├── parquet_reader.py    # Parquet implementation
│   ├── json_reader.py       # JSON implementation
│   └── reader_factory.py    # ReaderFactory
│
├── writers/
│   ├── base_writer.py       # BaseWriter abstract class
│   ├── delta_writer.py      # Delta Lake implementation
│   └── writer_factory.py    # WriterFactory
│
├── transformations/
│   ├── common.py            # Universal transformations
│   └── zone_transforms.py   # Custom zone transformations registry
│
└── pipelines/
    └── zone_pipeline.py     # Single-source zone processing
```
## Testing

Run tests with pytest:

```bash
# All tests
uv run pytest

# With coverage
uv run pytest --cov=src/arcflow --cov-report=html

# Specific test
uv run pytest tests/test_transformations.py
```

## API Reference

```python
FlowConfig(
    name: str,                        # Table name
    format: str,                      # 'parquet', 'json', 'csv', etc.
    landing_uri: str,                 # Path to raw files
    zones: dict,                      # Dict of zone_name -> StageConfig
    json_explode_arrays: bool = False,
    json_archive_after_read: bool = False,
    csv_header: bool = True,
    csv_delimiter: str = ',',
    csv_infer_schema: bool = True,
)
```

```python
StageConfig(
    enabled: bool,                    # Enable this zone
    mode: str,                        # 'append' or 'upsert'
    merge_keys: List[str] = None,     # Required for upsert
    custom_transform: str = None,     # Name of registered transformer
    description: str = '',
)
```

## Best Practices

### Zone Design

- Bronze: Raw ingestion, append-only, minimal transformations
- Silver: Curated, deduplicated, upsert mode with merge keys
- Gold: Business aggregations, materialized views
### Transformations

- Keep transformations pure functions
- Make them testable with unit tests
- Use descriptive names: `clean_sensor_data`, `enrich_customer_data`
- Register with decorators for discoverability

### Upserts

- Always specify merge keys for upsert mode
- Include timestamp or version columns for change tracking

### Streaming

- Use batch mode (`streaming_enabled=False`) for development
- Use streaming mode in production for continuous processing
- Test with batch before deploying streaming
- Monitor streaming query status with `controller.get_status()`
- Implement graceful shutdown with `controller.stop_all()`
- Use checkpointing for exactly-once processing
## Examples

See `src/arcflow/main.py` for complete examples, including:

- Sensor data processing
- Equipment maintenance logs
- Production schedule ingestion
## Contributing

Contributions are welcome! Please:

- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Submit a pull request

## License

MIT License. See the LICENSE file for details.

For issues, questions, or feature requests, please open a GitHub issue.
Built with ❤️ for data engineers working with Microsoft Fabric