codezelaca/de-advanced-airflow

Crypto History ETL Training Project

Scenario

This project is a training lab to build a real Airflow ETL pipeline.

The business case:

  1. Extract daily historical crypto market snapshots from CoinGecko.
  2. Transform nested API payloads into a clean tabular shape.
  3. Load data into Postgres with idempotent upsert logic so DAG reruns do not create duplicates.

Target coins in scope:

  • bitcoin
  • ethereum
  • solana

What Interns Will Learn

  1. How to run Airflow locally using Astro CLI.
  2. How to run an additional database service alongside Airflow with Docker Compose override.
  3. How to configure Airflow connections for external systems.
  4. How to design an ETL DAG with extraction, transformation, and loading tasks.
  5. How to write rerunnable pipelines using ON CONFLICT upsert.
  6. How to troubleshoot common local development issues.

Project Files You Should Know

  • dags/crypto_history_etl.py: the ETL pipeline DAG
  • docker-compose.override.yml: adds the target-db Postgres service and host port mappings
  • .env: defines the target_db_conn Airflow connection
  • requirements.txt: Python dependencies for the tasks

Architecture and Ports

Two Postgres databases run locally:

  1. Airflow metadata Postgres
  2. Target analytics Postgres used by this ETL pipeline

Port mapping in docker-compose.override.yml:

  • Airflow metadata Postgres: host 5434 -> container 5432
  • Target DB (target-db): host 5433 -> container 5432

Why this matters:

  • Host port 5432 is commonly already in use.
  • Splitting ports avoids conflicts and keeps both databases available.
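
Assuming a standard Compose layout, the target-db service in docker-compose.override.yml could look roughly like this sketch (the image tag is illustrative; credentials and ports match the values above and in Step 3):

```yaml
services:
  target-db:
    image: postgres:15            # illustrative image tag
    environment:
      POSTGRES_USER: admin
      POSTGRES_PASSWORD: password
      POSTGRES_DB: crypto_db
    ports:
      - "5433:5432"               # host 5433 -> container 5432
```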

Step-by-Step Guide for Interns

Step 1: Prerequisites

Install and verify:

  1. Docker Desktop
  2. Astro CLI

Then check Astro CLI:

astro version

Step 2: Start the Project

From project root:

astro dev start

Airflow UI will be available at http://localhost:8080 (the Astro CLI default).

If Airflow is already running and you changed config or dependencies:

astro dev restart

Step 3: Verify Target Database Service

This project already contains docker-compose.override.yml with the target-db service.

Expected target DB credentials:

  • user: admin
  • password: password
  • database: crypto_db
  • host port from your machine: 5433

Step 4: Ensure Airflow Connection Exists

This project sets the connection via .env:

AIRFLOW_CONN_TARGET_DB_CONN=postgresql://admin:password@host.docker.internal:5433/crypto_db

Why host.docker.internal:

  • Tasks run inside Airflow containers.
  • localhost inside container is not your host machine.
  • host.docker.internal allows container-to-host access on Docker Desktop.
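
As a quick sanity check, the connection URI from .env can be picked apart with the standard library to confirm it targets the host-mapped port (a sketch for illustration; Airflow does its own parsing internally):

```python
from urllib.parse import urlsplit

# The same URI that .env assigns to AIRFLOW_CONN_TARGET_DB_CONN.
uri = "postgresql://admin:password@host.docker.internal:5433/crypto_db"

parts = urlsplit(uri)
print(parts.username)           # admin
print(parts.hostname)           # host.docker.internal
print(parts.port)               # 5433
print(parts.path.lstrip("/"))   # crypto_db
```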

Optional validation command:

astro dev run connections get target_db_conn

Step 5: Understand the DAG

The pipeline DAG is dags/crypto_history_etl.py.

DAG id:

  • intern_crypto_history_pipeline

Flow:

  1. start_pipeline
  2. create_table
  3. process_crypto task group
  4. end_pipeline

Inside process_crypto, each coin has:

  1. extract_coin
  2. transform_coin
  3. load_coin

Key ETL behaviors:

  • Extract: pulls historical daily data from CoinGecko.
  • Transform: isolates date, coin, price_usd, market_cap_usd, volume_usd.
  • Load: uses PostgresHook and upsert to prevent duplicates.
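
The transform step can be pictured with a small, self-contained sketch. The transform_coin helper below is hypothetical (the DAG's actual code may differ), but the output columns match the crypto_historical table described in Step 7, and the input mimics the parallel [timestamp_ms, value] lists CoinGecko returns:

```python
from datetime import datetime, timezone

def transform_coin(coin, payload):
    """Flatten a CoinGecko market-chart payload into rows with the
    columns date, coin, price_usd, market_cap_usd, volume_usd."""
    rows = []
    for (ts_ms, price), (_, cap), (_, vol) in zip(
        payload["prices"], payload["market_caps"], payload["total_volumes"]
    ):
        day = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).date()
        rows.append({
            "date": day.isoformat(),
            "coin": coin,
            "price_usd": price,
            "market_cap_usd": cap,
            "volume_usd": vol,
        })
    return rows

# A single-day sample payload in the shape CoinGecko returns:
sample = {
    "prices": [[1700000000000, 35000.0]],
    "market_caps": [[1700000000000, 6.8e11]],
    "total_volumes": [[1700000000000, 1.5e10]],
}
print(transform_coin("bitcoin", sample))
```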

Step 6: Trigger and Observe

In Airflow UI:

  1. Open DAGs.
  2. Find intern_crypto_history_pipeline.
  3. Unpause DAG.
  4. Trigger DAG.
  5. Open Graph view and task logs.

CLI checks:

astro dev run dags list
astro dev run dags list-import-errors

Step 7: Validate Loaded Data

After a successful run, the following table exists in the target DB:

  • crypto_historical

Columns:

  • date
  • coin
  • price_usd
  • market_cap_usd
  • volume_usd

Primary key:

  • (date, coin)

This key powers idempotent upsert behavior.
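
The rerun safety can be demonstrated in miniature. The snippet below uses an in-memory SQLite database, whose ON CONFLICT clause mirrors the Postgres syntax, so it runs without a server; the real load task issues the equivalent statement through PostgresHook:

```python
import sqlite3

# In-memory stand-in for the target Postgres DB.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE crypto_historical (
        date            TEXT,
        coin            TEXT,
        price_usd       REAL,
        market_cap_usd  REAL,
        volume_usd      REAL,
        PRIMARY KEY (date, coin)
    )
""")

UPSERT = """
    INSERT INTO crypto_historical (date, coin, price_usd, market_cap_usd, volume_usd)
    VALUES (?, ?, ?, ?, ?)
    ON CONFLICT (date, coin) DO UPDATE SET
        price_usd      = excluded.price_usd,
        market_cap_usd = excluded.market_cap_usd,
        volume_usd     = excluded.volume_usd
"""

row = ("2024-01-01", "bitcoin", 42000.0, 8.2e11, 1.8e10)
conn.execute(UPSERT, row)
conn.execute(UPSERT, row)  # simulate a DAG rerun: updates in place

count = conn.execute("SELECT COUNT(*) FROM crypto_historical").fetchone()[0]
print(count)  # 1 (the rerun did not create a second row)
```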

Dependencies

Current Python dependencies in requirements.txt:

  • requests
  • pandas
  • apache-airflow-providers-postgres

Notes:

  • The DAG currently relies on requests and the Postgres provider.
  • pandas is available for future transforms.

Common Issues and Fixes

1. Ports are not available on 5432

Symptom:

  • Docker bind error on localhost:5432.

Fix:

  • This project already remaps host ports in docker-compose.override.yml (5434 for the metadata DB, 5433 for target-db), so nothing should bind host port 5432.
  • If the error persists, stop the local Postgres (or other service) holding the port, or choose different host ports in the override file.

2. Unresolved tag !override in editor

Symptom:

  • YAML warning for !override.

Fix:

  • !override is a custom YAML tag that Docker Compose understands when merging multiple compose files; the editor warning is cosmetic and safe to ignore.
  • Optionally, register the tag in your editor's YAML settings (for example, yaml.customTags in VS Code's YAML extension) to silence the warning.

3. ModuleNotFoundError for PostgresOperator path

Symptom:

  • airflow.providers.postgres.operators.postgres import fails.

Fix:

  • Confirm apache-airflow-providers-postgres is listed in requirements.txt, then run astro dev restart so the provider is installed into the image.
  • On newer Airflow and provider versions, PostgresOperator has been removed; use SQLExecuteQueryOperator from airflow.providers.common.sql.operators.sql (or PostgresHook, as the load task does) instead.

4. DAG.__init__() got an unexpected keyword argument 'schedule_interval'

Symptom:

  • DAG fails to parse on a newer Airflow runtime.

Fix:

  • Replace schedule_interval= with schedule= in the DAG definition; the parameter was renamed in Airflow 2.4, and schedule_interval was removed in Airflow 3.

5. could not translate host name target-db

Symptom:

  • Connection fails resolving target-db from Airflow task.

Fix:

  • Use target_db_conn mapped to host.docker.internal:5433 via .env.

Quick Command Reference

astro dev start
astro dev restart
astro dev run dags list
astro dev run dags list-import-errors
astro dev run connections get target_db_conn

Expected Outcome

By the end of this lab, interns should be able to:

  1. Build and run a production-style ETL DAG locally.
  2. Store daily crypto history in Postgres reliably.
  3. Re-run pipelines safely without duplicate rows.
  4. Diagnose and resolve common Airflow local dev issues.
