Crypto Analytics

Real-time crypto market data ingestion into Snowflake via Openflow (Snowpipe Streaming v2), with dbt transformations downstream.

Architecture

Source API ──► Openflow (NiFi on SPCS) ──► Snowflake
                  │                            │
          InvokeHTTP               CRYPTO.RAW_KUCOIN.MARKET_ALL_TICKERS
          JoltTransformJSON                (VARIANT column)
          PutSnowpipeStreaming2                 │
                                          dbt transforms
                                     (staging → base → mart)
                                               │
                                     semantic view + listing

Each data source is defined as a single JSON file in openflow/flows/. The generic deployer reads the JSON and creates the full NiFi flow automatically. A programmatic builder (Python class + nipyapi) is also available for full-stack deploys where DCM pre-creates all Snowflake objects.
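The declarative path can be sketched roughly as follows. This is a minimal, hypothetical illustration of what openflow/deploy.py does: the processor names come from the architecture diagram above, while the property names and payload shapes here are assumptions (the real deployer builds equivalent payloads and POSTs them to the NiFi REST API via requests):

```python
import json

# A trimmed flow definition in the shape of openflow/flows/kucoin_alltickers.json
# (values here are illustrative, not the real KuCoin config).
SAMPLE_FLOW = json.loads("""
{
  "name": "Example Tickers",
  "api": {"url": "https://api.example.com/endpoint", "method": "GET", "schedule": "30 sec"},
  "destination": {"database": "SOURCE_NAME", "schema": "RAW", "table": "TABLE_NAME"},
  "jolt": {"transform": "jolt-transform-shift", "spec": "{\\"*\\": \\"RAW.&\\"}"}
}
""")

def build_processor_plan(flow):
    """Derive the three processors the deployer creates for each source.
    Hypothetical helper: openflow/deploy.py builds payloads like these and
    then POSTs them to the NiFi REST API; exact property keys may differ."""
    api, dest, jolt = flow["api"], flow["destination"], flow["jolt"]
    return [
        {"type": "InvokeHTTP",
         "properties": {"HTTP Method": api["method"], "Remote URL": api["url"]},
         "schedule": api["schedule"]},
        {"type": "JoltTransformJSON",
         "properties": {"Jolt Transform": jolt["transform"],
                        "Jolt Specification": jolt["spec"]}},
        {"type": "PutSnowpipeStreaming2",
         "properties": {"Database": dest["database"], "Schema": dest["schema"],
                        "Table": dest["table"]}},
    ]
```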

Project Structure

scripts/
├── deploy_all.sh               # Full stack: Terraform → DCM → Openflow → dbt
├── snowflake_tf_vars.py        # TF_VAR_* derivation + shared connection loader
├── run_terraform_apply.py
├── run_terraform_destroy.py
└── openflow_delete_kucoin.py   # Remove KuCoin PGs from NiFi (teardown helper)

openflow/
├── deploy.py                   # Generic JSON deployer (NiFi REST via requests)
├── deploy.sh                   # Shell wrapper for the programmatic builder
├── flows/
│   ├── kucoin_alltickers.json  # Declarative flow definition (standalone deploys)
│   └── kucoin_alltickers.py    # Programmatic flow builder (full-stack deploys)
└── shared/
    └── flow_builder.py         # Base class for programmatic builders (nipyapi SDK)

sources/
├── definitions/
│   └── 01_crypto_analytics.sql # DCM definitions (schemas, table, grants, warehouse, dbt project)
└── dbt_kucoin/                 # dbt project (EXECUTE DBT PROJECT on Snowflake)
    ├── models/
    │   ├── staging/            # Flatten VARIANT → typed columns (view)
    │   ├── base/               # Incremental dedup + derived columns
    │   ├── mart/               # Daily aggregates, volatility, health, latest snapshot
    │   └── semantic/           # Semantic view for natural-language queries
    ├── macros/
    │   └── generate_schema_name.sql
    └── packages.yml

terraform/                      # Transient CRYPTO database + dbt external access integration
manifest.yml                    # DCM project manifest (targets, Jinja templating vars)
pyproject.toml                  # Python deps (nipyapi)

Prerequisites

  1. Openflow runtime running on Snowflake SPCS
  2. nipyapi profile in ~/.nipyapi/profiles.yml:
    my_profile:
      nifi_url: "https://of--<account>.snowflakecomputing.app/<runtime>/nifi-api"
      nifi_bearer_token: "<token>"
  3. Snow CLI with a working connection in ~/.snowflake/connections.toml
  4. OpenTofu or Terraform (or set SKIP_TERRAFORM=1)
  5. Snowflake grants — see sources/definitions/01_crypto_analytics.sql for the full set; the key point is that Snowpipe Streaming v2 needs PUBLIC role grants

Deploy

Full stack (recommended)

Runs Terraform → DCM → Openflow → EXECUTE DBT PROJECT in one shot:

uv sync
./scripts/deploy_all.sh

Teardown (reverse order, drops the CRYPTO database):

./scripts/deploy_all.sh --teardown

Environment variables (see header of scripts/deploy_all.sh):

| Variable | Default | Purpose |
| --- | --- | --- |
| SNOWFLAKE_CONNECTION | ahuck | Connection name in ~/.snowflake/connections.toml |
| OPENFLOW_SNOWFLAKE_ACCOUNT | (from connection) | Snowflake account for the Openflow runtime |
| CRYPTO_DATABASE | CRYPTO | Database name (must match Terraform) |
| SKIP_TERRAFORM | (unset) | Set to 1 to skip the Terraform phase |
| SKIP_OPENFLOW | (unset) | Set to 1 to skip the Openflow phase |
| SKIP_DBT_EXECUTE | (unset) | Set to 1 to skip dbt execution |

Full stack phases

  1. Terraform — creates an empty transient CRYPTO database and the dbt external access integration (HTTPS egress to GitHub, dbt Hub, PyPI). scripts/run_terraform_apply.py derives TF_VAR_* from your Snow CLI connection profile.
  2. DCM — deploys sources/definitions/01_crypto_analytics.sql: schemas, transient raw table, grants, warehouse, and the DEFINE DBT PROJECT block.
  3. Openflow — the programmatic builder (openflow/flows/kucoin_alltickers.py) creates the NiFi flow targeting CRYPTO.RAW_KUCOIN.MARKET_ALL_TICKERS.
  4. dbt — EXECUTE DBT PROJECT runs the dbt models on Snowflake, then on-run-end hooks create a share and organization listing.
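The TF_VAR_* derivation in phase 1 can be pictured as a small mapping function. This is a sketch with assumed variable names; the real loader, scripts/snowflake_tf_vars.py, reads your Snow CLI connection from ~/.snowflake/connections.toml and its exact output may differ:

```python
def derive_tf_vars(connection):
    """Map Snow CLI connection fields to TF_VAR_* environment variables.
    Illustrative only: the variable names are assumptions, and secrets
    (e.g. password) are deliberately not forwarded."""
    wanted = ("account", "user", "role", "warehouse")
    return {f"TF_VAR_snowflake_{key}": connection[key]
            for key in wanted if key in connection}
```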

Standalone flow deploy (JSON deployer)

For deploying additional sources without the full stack — includes self-healing DDL:

uv run openflow/deploy.py --profile <your_profile> --flow openflow/flows/kucoin_alltickers.json

Add --no-start to deploy without starting.

Terraform only

uv run python scripts/run_terraform_apply.py
# or:
uv run python scripts/run_terraform_destroy.py

Manual tofu apply is also supported — copy terraform/terraform.tfvars.example to terraform.tfvars (gitignored).

Adding a New Source

  1. Copy an existing flow JSON:
    cp openflow/flows/kucoin_alltickers.json openflow/flows/<new_source>.json
  2. Edit the new file — update name, api.url, destination.*, and snowflake.*
  3. Grant PUBLIC role access to the target database/schema (see Prerequisites)
  4. Deploy:
    uv run openflow/deploy.py --profile <your_profile> --flow openflow/flows/<new_source>.json
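Steps 1 and 2 can also be scripted. A sketch, assuming a hypothetical clone_flow helper (not part of the repo) that copies a base definition and overrides the per-source fields:

```python
import copy

def clone_flow(base, *, name, api_url, database, schema, table):
    """Clone an existing flow definition and override the per-source fields.
    Convenience sketch for steps 1 and 2 above; write the result out as
    openflow/flows/<new_source>.json afterwards."""
    flow = copy.deepcopy(base)  # deep copy so the base definition stays untouched
    flow["name"] = name
    flow["api"]["url"] = api_url
    flow["destination"].update({"database": database, "schema": schema, "table": table})
    return flow

base = {
    "name": "KuCoin All Tickers",
    "api": {"url": "https://api.kucoin.com/api/v1/market/allTickers",
            "method": "GET", "schedule": "30 sec"},
    "destination": {"database": "CRYPTO", "schema": "RAW_KUCOIN",
                    "table": "MARKET_ALL_TICKERS"},
}
new_flow = clone_flow(base, name="New Source",
                      api_url="https://api.example.com/v1/data",
                      database="CRYPTO", schema="RAW_NEWSOURCE", table="DATA")
```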

Flow JSON Schema

Every source JSON has these fields:

{
  "name": "Human-readable name",
  "description": "What this flow does",
  "api": {
    "url": "https://api.example.com/endpoint",
    "method": "GET",
    "schedule": "30 sec"
  },
  "destination": {
    "database": "SOURCE_NAME",
    "schema": "RAW",
    "table": "TABLE_NAME",
    "table_ddl": "CREATE TABLE IF NOT EXISTS #{Database}.#{Schema}.#{Table} (RAW VARIANT)"
  },
  "jolt": {
    "transform": "jolt-transform-shift",
    "spec": "{\"*\": \"RAW.&\"}"
  },
  "snowflake": {
    "account": "<YOUR_ACCOUNT>",
    "user": "<YOUR_USER>",
    "role": "<YOUR_ROLE>"
  }
}
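A quick pre-deploy sanity check can confirm a new source file carries every field in this schema. A sketch only, not something openflow/deploy.py actually ships:

```python
# Required fields, taken directly from the flow JSON schema above.
TOP_LEVEL = ("name", "description", "api", "destination", "jolt", "snowflake")
NESTED = {
    "api": ("url", "method", "schedule"),
    "destination": ("database", "schema", "table", "table_ddl"),
    "jolt": ("transform", "spec"),
    "snowflake": ("account", "user", "role"),
}

def validate_flow(flow):
    """Return a list of problems with a flow definition; an empty list means
    every required field is present (hypothetical pre-deploy check)."""
    errors = [f"missing top-level field: {key}" for key in TOP_LEVEL if key not in flow]
    for section, keys in NESTED.items():
        for key in keys:
            if key not in flow.get(section, {}):
                errors.append(f"missing {section}.{key}")
    return errors
```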

dbt Models

The dbt project runs natively on Snowflake via EXECUTE DBT PROJECT. Model layers:

| Layer | Schema | Materialization | Description |
| --- | --- | --- | --- |
| staging/ | STG_KUCOIN | view | Flatten VARIANT JSON → typed columns |
| base/ | BASE_KUCOIN | incremental | Dedup + spread, mid-price, order-book imbalance |
| mart/ | MART_KUCOIN | incremental / table | Daily OHLCV, volatility, health scores, latest snapshot |
| semantic/ | MART_KUCOIN | semantic_view | Pre-defined facts/dimensions/metrics for semantic_view() |

After EXECUTE DBT PROJECT, on-run-end hooks create a Snowflake share (CRYPTO_ANALYTICS_SHARE) and an organization listing with featured tables and sample queries.

Key Gotchas

  • PUBLIC role grants: Snowpipe Streaming v2 requires them — even ACCOUNTADMIN fails without them
  • Streaming pipe: Snowflake auto-creates TABLE-STREAMING. You cannot manually create pipes with that suffix
  • Offset tokens: Must be numeric (${now():toNumber()}), not UUIDs
  • Jolt spec format: jolt-transform-shift uses a plain object {"*": "RAW.&"}, not an array
  • SNOWFLAKE_MANAGED auth: On SPCS, the Role parameter must match the runtime's default role
  • Nested PG parameter context: Child process groups don't inherit parent context — assign via API
