Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -234,4 +234,5 @@ $RECYCLE.BIN/

# End of https://www.toptal.com/developers/gitignore/api/macos,python,windows

.streamlit/secrets.toml
.streamlit/secrets.toml
*.json
24 changes: 24 additions & 0 deletions bigquery_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import streamlit as st
from google.oauth2 import service_account
from google.cloud import bigquery


@st.cache_data(ttl=3600)
def load_henry_hub_from_bigquery():
    """Load the full Henry Hub price table from BigQuery.

    Builds a BigQuery client from the service-account info stored in
    Streamlit secrets and returns the `henry_hub_prices` table as a
    DataFrame with columns ``date``, ``series_description`` and ``price``,
    ordered by date.

    The result is cached for one hour (``ttl=3600``), matching the caching
    policy of the other loaders in this app, so newly uploaded rows become
    visible without a full app restart.

    Returns:
        pandas.DataFrame: one row per (date, series) observation.
    """
    credentials = service_account.Credentials.from_service_account_info(
        st.secrets["gcp_service_account"]
    )

    client = bigquery.Client(
        credentials=credentials,
        project=credentials.project_id,
    )

    # Pull the whole table; downstream callers filter and clean as needed.
    query = """
        SELECT date, series_description, price
        FROM `sipa-adv-c-dancing-cactus.dataset.henry_hub_prices`
        ORDER BY date
    """

    df = client.query(query).to_dataframe()
    return df
102 changes: 102 additions & 0 deletions data_load/load_henry_hub_to_bq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import streamlit as st
import requests
import pandas as pd
from google.cloud import bigquery
from pandas_gbq import to_gbq

# BigQuery destination for the Henry Hub price series.
PROJECT_ID = "sipa-adv-c-dancing-cactus"
DATASET_ID = "dataset"
TABLE_ID = "henry_hub_prices"
# Fully-qualified table id: "<project>.<dataset>.<table>".
TABLE_FULL_ID = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"


def fetch_henry_hub_data():
    """Fetch daily Henry Hub futures prices from the EIA v2 API.

    Reads the API key from Streamlit secrets, downloads the daily series
    starting 1993-12-24, and returns a cleaned DataFrame with columns
    ``date`` (datetime.date), ``series_description`` (str) and ``price``
    (float), de-duplicated and with unparseable rows dropped.
    """
    api_key = st.secrets["EIA_API_KEY"]

    url = "https://api.eia.gov/v2/natural-gas/pri/fut/data/"
    # Pass query parameters via `params=` so requests URL-encodes every
    # value (including the API key) instead of interpolating them raw
    # into the URL string.
    params = {
        "frequency": "daily",
        "data[0]": "value",
        "start": "1993-12-24",
        "sort[0][column]": "period",
        "sort[0][direction]": "desc",
        "api_key": api_key,
    }

    response = requests.get(url, params=params, timeout=30)
    response.raise_for_status()
    raw = response.json()

    records = raw["response"]["data"]
    df = pd.DataFrame(records)

    print("Columns from API:", df.columns.tolist())

    # The API may omit columns; keep only those actually present.
    keep_cols = [
        c for c in ["period", "series-description", "value"] if c in df.columns
    ]
    df = df[keep_cols].copy()

    df = df.rename(
        columns={
            "period": "date",
            "series-description": "series_description",
            "value": "price",
        }
    )

    # Coerce types; bad values become NaT/NaN and are dropped below.
    df["date"] = pd.to_datetime(df["date"], errors="coerce").dt.date
    df["price"] = pd.to_numeric(df["price"], errors="coerce")

    # Fallback label when the API response lacks a series description.
    if "series_description" not in df.columns:
        df["series_description"] = "Henry Hub Natural Gas Price"

    df = df.dropna(subset=["date", "price"]).drop_duplicates()

    return df


def ensure_table_exists():
    """Create the target BigQuery table if it does not already exist.

    Assumes the dataset itself already exists; ``get_dataset`` raises
    otherwise, which fails the run loudly instead of silently creating
    resources in the wrong place.
    """
    client = bigquery.Client(project=PROJECT_ID)

    # Fail fast if the dataset is missing or unreachable.
    dataset_ref = bigquery.Dataset(f"{PROJECT_ID}.{DATASET_ID}")
    client.get_dataset(dataset_ref)

    schema = [
        bigquery.SchemaField("date", "DATE"),
        bigquery.SchemaField("series_description", "STRING"),
        bigquery.SchemaField("price", "FLOAT"),
    ]

    # Only a genuinely missing table should trigger creation. A broad
    # `except Exception` here would also swallow auth/network/quota
    # errors and then attempt a doomed create_table call.
    from google.api_core.exceptions import NotFound

    try:
        client.get_table(TABLE_FULL_ID)
        print(f"Table already exists: {TABLE_FULL_ID}")
    except NotFound:
        table = bigquery.Table(TABLE_FULL_ID, schema=schema)
        client.create_table(table)
        print(f"Created table: {TABLE_FULL_ID}")


def upload_to_bigquery(df):
    """Replace the contents of the target BigQuery table with *df* via pandas-gbq."""
    destination = f"{DATASET_ID}.{TABLE_ID}"
    to_gbq(
        dataframe=df,
        destination_table=destination,
        project_id=PROJECT_ID,
        if_exists="replace",
    )
    print(f"Uploaded {len(df)} rows to {TABLE_FULL_ID}")


def main():
    """Fetch the EIA series, sanity-print it, then load it into BigQuery."""
    prices = fetch_henry_hub_data()

    # Quick sanity checks before touching BigQuery.
    print(prices.head())
    print(prices.dtypes)
    print(f"Row count: {len(prices)}")

    ensure_table_exists()
    upload_to_bigquery(prices)


if __name__ == "__main__":
    main()
135 changes: 22 additions & 113 deletions market_analysis.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
from __future__ import annotations

import os
import datetime

import pandas as pd
import requests
import streamlit as st

import pandas_gbq
from google.oauth2 import service_account

from bigquery_utils import load_henry_hub_from_bigquery

# ------ Page config ------
st.set_page_config(page_title="Energy Market Analysis", layout="wide")

Expand All @@ -19,52 +19,12 @@
"https://mis.nyiso.com/public/csv/realtime/{month}01realtime_zone_csv.zip"
)

EIA_HENRY_HUB_BASE_URL = "https://api.eia.gov/v2/natural-gas/pri/fut/data/"

# ------ Credentials ------
creds = st.secrets["gcp_service_account"]
credentials = service_account.Credentials.from_service_account_info(creds)


# ------ Utility helpers ------
def normalize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* whose column labels have surrounding whitespace stripped."""
    return df.rename(columns=lambda label: str(label).strip())


def find_column(df: pd.DataFrame, candidates: list[str]) -> str | None:
lower_map = {col.lower(): col for col in df.columns}
for candidate in candidates:
if candidate.lower() in lower_map:
return lower_map[candidate.lower()]
return None


def get_eia_api_key() -> str:
    """
    Resolve the EIA API key: Streamlit secrets first, then the environment.

    Raises:
        ValueError: when neither source provides a non-empty key.
    """
    # Streamlit secrets may be entirely absent (e.g. a plain CLI run),
    # so any failure while reading them is treated as "not configured".
    try:
        if "EIA_API_KEY" in st.secrets:
            candidate = str(st.secrets["EIA_API_KEY"]).strip()
            if candidate:
                return candidate
    except Exception:
        pass

    candidate = os.getenv("EIA_API_KEY", "").strip()
    if candidate:
        return candidate

    raise ValueError(
        "Missing EIA_API_KEY. Please set it in .streamlit/secrets.toml "
        "or export it in the same terminal session before running Streamlit."
    )


# ------ API loaders -------
@st.cache_data(ttl=3600)
def load_nyiso_realtime(selected_month) -> any:
Expand All @@ -87,71 +47,20 @@ def load_nyiso_realtime(selected_month) -> any:


@st.cache_data(ttl=3600)
def load_henry_hub_data(start_date: str = "1993-12-24") -> pd.DataFrame:
"""
Load Henry Hub natural gas prices from EIA API.
"""
api_key = get_eia_api_key()
today_str = datetime.date.today().isoformat()

params = {
"api_key": api_key,
"frequency": "daily",
"data[0]": "value",
"start": start_date,
"end": today_str,
"sort[0][column]": "period",
"sort[0][direction]": "desc",
"offset": 0,
"length": 5000,
}

response = requests.get(EIA_HENRY_HUB_BASE_URL, params=params, timeout=60)
response.raise_for_status()

payload = response.json()

if "response" not in payload or "data" not in payload["response"]:
raise ValueError(f"Unexpected EIA API response format: {payload}")

records = payload["response"]["data"]
if not records:
raise ValueError("EIA API returned no Henry Hub records.")

df = pd.DataFrame(records)
df = normalize_columns(df)

period_col = find_column(df, ["period"])
value_col = find_column(df, ["value"])
series_col = find_column(
df, ["series-description", "seriesDescription", "series description"]
)

if period_col is None or value_col is None:
raise ValueError(
f"Could not identify 'period' and 'value' columns in EIA response. "
f"Columns found: {list(df.columns)}"
)

keep_cols = [period_col, value_col]
if series_col:
keep_cols.append(series_col)

df = df[keep_cols].copy()

rename_map = {
period_col: "date",
value_col: "price",
}
if series_col:
rename_map[series_col] = "series_description"

df = df.rename(columns=rename_map)

def load_henry_hub_data() -> pd.DataFrame:
df = load_henry_hub_from_bigquery().copy()
df["date"] = pd.to_datetime(df["date"], errors="coerce")
df["price"] = pd.to_numeric(df["price"], errors="coerce")
df = df.dropna(subset=["date", "price"]).copy()
df = df.sort_values("date")
df = df.dropna(subset=["date", "price"]).sort_values("date")

if "series_description" in df.columns:
df = df[
df["series_description"].str.contains(
"Henry Hub Natural Gas Spot Price",
case=False,
na=False,
)
].copy()

return df

Expand Down Expand Up @@ -227,12 +136,12 @@ def render_sidebar() -> None:
"""
**This page combines**
- NYISO real-time electricity prices
- EIA Henry Hub natural gas prices
- Henry Hub natural gas prices
- exploratory market interpretation

**Data source logic**
- electricity: online NYISO public data
- gas: online EIA API
- gas: Henry Hub data stored in BigQuery
"""
)

Expand All @@ -242,7 +151,7 @@ def render_intro() -> None:
st.write(
"""
This page combines two related parts of our project: NYISO real-time electricity prices and
Henry Hub natural gas prices from the EIA API. The goal is to present a more coherent energy-market
Henry Hub natural gas prices stored in BigQuery. The goal is to present a more coherent energy-market
view by showing both local electricity price outcomes and broader benchmark fuel-market conditions.
"""
)
Expand Down Expand Up @@ -297,7 +206,7 @@ def render_gas_section(gas_df: pd.DataFrame) -> None:

st.write(
"""
This section uses Henry Hub daily prices from the EIA API. Henry Hub is treated here as a benchmark
This section uses Henry Hub daily prices loaded from BigQuery. Henry Hub is treated here as a benchmark
U.S. natural gas series, which provides fuel-market context for interpreting electricity price movements.
"""
)
Expand Down Expand Up @@ -337,7 +246,7 @@ def render_gas_section(gas_df: pd.DataFrame) -> None:
st.line_chart(chart_df, use_container_width=True)

st.caption(
"Henry Hub price is a benchmark U.S. natural gas series from the EIA API."
"Henry Hub price is a benchmark U.S. natural gas series loaded from BigQuery."
)

st.write("**Interpretation**")
Expand All @@ -352,7 +261,7 @@ def render_gas_section(gas_df: pd.DataFrame) -> None:
def render_gas_unavailable(exc: Exception) -> None:
st.header("Natural Gas Benchmark Context")
st.warning(
"Henry Hub gas data could not be loaded in the current runtime environment. "
"Henry Hub gas data could not be loaded from BigQuery in the current runtime environment. "
"Electricity content is still available below the intro section."
)
st.code(str(exc))
Expand All @@ -376,8 +285,8 @@ def render_comparison_section(gas_available: bool) -> None:
st.write(
"""
NYISO real-time prices capture local power-market outcomes. Henry Hub is intended to provide broader
benchmark fuel-market context, but it is unavailable in the current runtime because the EIA API key
was not detected or the API request failed.
benchmark fuel-market context, but it is unavailable in the current runtime because the BigQuery query
failed or credentials were not configured correctly.
"""
)

Expand Down
Loading