Real-time crypto market data ingestion into Snowflake via Openflow (Snowpipe Streaming v2), with dbt transformations downstream.
```
Source API ──► Openflow (NiFi on SPCS) ──► Snowflake
                    │                          │
                    InvokeHTTP                 CRYPTO.RAW_KUCOIN.MARKET_ALL_TICKERS
                    JoltTransformJSON          (VARIANT column)
                    PutSnowpipeStreaming2      │
                                               dbt transforms
                                               (staging → base → mart)
                                               │
                                               semantic view + listing
```
Each data source is defined as a single JSON file in `openflow/flows/`. The generic deployer reads the JSON and creates the full NiFi flow automatically. A programmatic builder (Python class + nipyapi) is also available for full-stack deploys where DCM pre-creates all Snowflake objects.
scripts/
├── deploy_all.sh # Full stack: Terraform → DCM → Openflow → dbt
├── snowflake_tf_vars.py # TF_VAR_* derivation + shared connection loader
├── run_terraform_apply.py
├── run_terraform_destroy.py
└── openflow_delete_kucoin.py # Remove KuCoin PGs from NiFi (teardown helper)
openflow/
├── deploy.py # Generic JSON deployer (NiFi REST via requests)
├── deploy.sh # Shell wrapper for the programmatic builder
├── flows/
│ ├── kucoin_alltickers.json # Declarative flow definition (standalone deploys)
│ └── kucoin_alltickers.py # Programmatic flow builder (full-stack deploys)
└── shared/
└── flow_builder.py # Base class for programmatic builders (nipyapi SDK)
sources/
├── definitions/
│ └── 01_crypto_analytics.sql # DCM definitions (schemas, table, grants, warehouse, dbt project)
└── dbt_kucoin/ # dbt project (EXECUTE DBT PROJECT on Snowflake)
├── models/
│ ├── staging/ # Flatten VARIANT → typed columns (view)
│ ├── base/ # Incremental dedup + derived columns
│ ├── mart/ # Daily aggregates, volatility, health, latest snapshot
│ └── semantic/ # Semantic view for natural-language queries
├── macros/
│ └── generate_schema_name.sql
└── packages.yml
terraform/ # Transient CRYPTO database + dbt external access integration
manifest.yml # DCM project manifest (targets, Jinja templating vars)
pyproject.toml # Python deps (nipyapi)
- Openflow runtime running on Snowflake SPCS
- nipyapi profile in `~/.nipyapi/profiles.yml`:

  ```yaml
  my_profile:
    nifi_url: "https://of--<account>.snowflakecomputing.app/<runtime>/nifi-api"
    nifi_bearer_token: "<token>"
  ```

- Snow CLI with a working connection in `~/.snowflake/connections.toml`
- OpenTofu or Terraform (or set `SKIP_TERRAFORM=1`)
- Snowflake grants — see `sources/definitions/01_crypto_analytics.sql` for the full set; the key requirement is PUBLIC role grants, which Snowpipe Streaming v2 needs
Runs Terraform → DCM → Openflow → `EXECUTE DBT PROJECT` in one shot:

```bash
uv sync
./scripts/deploy_all.sh
```

Teardown (reverse order, drops the CRYPTO database):

```bash
./scripts/deploy_all.sh --teardown
```

Environment variables (see the header of `scripts/deploy_all.sh`):
| Variable | Default | Purpose |
|---|---|---|
| `SNOWFLAKE_CONNECTION` | `ahuck` | Connection name in `~/.snowflake/connections.toml` |
| `OPENFLOW_SNOWFLAKE_ACCOUNT` | (from connection) | Snowflake account for Openflow runtime |
| `CRYPTO_DATABASE` | `CRYPTO` | Database name (must match Terraform) |
| `SKIP_TERRAFORM` | — | Set `1` to skip Terraform phase |
| `SKIP_OPENFLOW` | — | Set `1` to skip Openflow phase |
| `SKIP_DBT_EXECUTE` | — | Set `1` to skip dbt execution |
- Terraform — creates an empty transient `CRYPTO` database and the dbt external access integration (HTTPS egress to GitHub, dbt Hub, PyPI). `scripts/run_terraform_apply.py` derives `TF_VAR_*` from your Snow CLI connection profile.
- DCM — deploys `sources/definitions/01_crypto_analytics.sql`: schemas, transient raw table, grants, warehouse, and the `DEFINE DBT PROJECT` block.
- Openflow — the programmatic builder (`openflow/flows/kucoin_alltickers.py`) creates the NiFi flow targeting `CRYPTO.RAW_KUCOIN.MARKET_ALL_TICKERS`.
- dbt — `EXECUTE DBT PROJECT` runs the dbt models on Snowflake, then `on-run-end` hooks create a share and organization listing.
For deploying additional sources without the full stack — includes self-healing DDL:
```bash
uv run openflow/deploy.py --profile <your_profile> --flow openflow/flows/kucoin_alltickers.json
```

Add `--no-start` to deploy without starting.
```bash
uv run python scripts/run_terraform_apply.py
# or:
uv run python scripts/run_terraform_destroy.py
```

Manual `tofu apply` is also supported — copy `terraform/terraform.tfvars.example` to `terraform.tfvars` (gitignored).
- Copy an existing flow JSON:

  ```bash
  cp openflow/flows/kucoin_alltickers.json openflow/flows/<new_source>.json
  ```

- Edit the new file — update `name`, `api.url`, `destination.*`, and `snowflake.*`
- Grant PUBLIC role access to the target database/schema (see Prerequisites)
- Deploy:

  ```bash
  uv run openflow/deploy.py --profile <your_profile> --flow openflow/flows/<new_source>.json
  ```
Every source JSON has these fields:

```json
{
  "name": "Human-readable name",
  "description": "What this flow does",
  "api": {
    "url": "https://api.example.com/endpoint",
    "method": "GET",
    "schedule": "30 sec"
  },
  "destination": {
    "database": "SOURCE_NAME",
    "schema": "RAW",
    "table": "TABLE_NAME",
    "table_ddl": "CREATE TABLE IF NOT EXISTS #{Database}.#{Schema}.#{Table} (RAW VARIANT)"
  },
  "jolt": {
    "transform": "jolt-transform-shift",
    "spec": "{\"*\": \"RAW.&\"}"
  },
  "snowflake": {
    "account": "<YOUR_ACCOUNT>",
    "user": "<YOUR_USER>",
    "role": "<YOUR_ROLE>"
  }
}
```

The dbt project runs natively on Snowflake via `EXECUTE DBT PROJECT`. Model layers:
| Layer | Schema | Materialization | Description |
|---|---|---|---|
| `staging/` | `STG_KUCOIN` | view | Flatten VARIANT JSON → typed columns |
| `base/` | `BASE_KUCOIN` | incremental | Dedup + spread, mid-price, order-book imbalance |
| `mart/` | `MART_KUCOIN` | incremental / table | Daily OHLCV, volatility, health scores, latest snapshot |
| `semantic/` | `MART_KUCOIN` | semantic_view | Pre-defined facts/dimensions/metrics for `semantic_view()` |
After `EXECUTE DBT PROJECT`, `on-run-end` hooks create a Snowflake share (`CRYPTO_ANALYTICS_SHARE`) and an organization listing with featured tables and sample queries.
- PUBLIC role grants: Snowpipe Streaming v2 requires them — even ACCOUNTADMIN fails without them
- Streaming pipe: Snowflake auto-creates `TABLE-STREAMING`. You cannot manually create pipes with that suffix
- Offset tokens: must be numeric (`${now():toNumber()}`), not UUIDs
- Jolt spec format: `jolt-transform-shift` uses a plain object `{"*": "RAW.&"}`, not an array
- SNOWFLAKE_MANAGED auth: on SPCS, the Role parameter must match the runtime's default role
- Nested PG parameter context: child process groups don't inherit the parent context — assign it via the API
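The Jolt gotcha is easier to see concretely: the shift spec `{"*": "RAW.&"}` wraps every top-level key of the API payload under a `RAW` envelope before it lands in the single VARIANT column. Mimicked here in plain Python (a sketch of the effect, not the Jolt engine; the sample payload shape is illustrative):

```python
def jolt_shift_wrap(payload: dict) -> dict:
    """Mimic the shift spec {"*": "RAW.&"}: nest every top-level key under RAW."""
    return {"RAW": dict(payload)}

# Illustrative all-tickers response shape (values abbreviated).
ticker = {"time": 1700000000000, "data": {"ticker": []}}
wrapped = jolt_shift_wrap(ticker)
# wrapped == {"RAW": {"time": 1700000000000, "data": {"ticker": []}}}
```

Writing the same spec as an array (Jolt's chained-operations form) is what the gotcha warns against for this processor.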