diff --git a/.gitignore b/.gitignore index a5104f3..1580959 100644 --- a/.gitignore +++ b/.gitignore @@ -234,4 +234,5 @@ $RECYCLE.BIN/ # End of https://www.toptal.com/developers/gitignore/api/macos,python,windows -.streamlit/secrets.toml \ No newline at end of file +.streamlit/secrets.toml +*.json \ No newline at end of file diff --git a/bigquery_utils.py b/bigquery_utils.py new file mode 100644 index 0000000..04c6ddc --- /dev/null +++ b/bigquery_utils.py @@ -0,0 +1,24 @@ +import streamlit as st +from google.oauth2 import service_account +from google.cloud import bigquery + + +@st.cache_data +def load_henry_hub_from_bigquery(): + credentials = service_account.Credentials.from_service_account_info( + st.secrets["gcp_service_account"] + ) + + client = bigquery.Client( + credentials=credentials, + project=credentials.project_id, + ) + + query = """ + SELECT date, series_description, price + FROM `sipa-adv-c-dancing-cactus.dataset.henry_hub_prices` + ORDER BY date + """ + + df = client.query(query).to_dataframe() + return df diff --git a/data_load/load_henry_hub_to_bq.py b/data_load/load_henry_hub_to_bq.py new file mode 100644 index 0000000..d012936 --- /dev/null +++ b/data_load/load_henry_hub_to_bq.py @@ -0,0 +1,102 @@ +import streamlit as st +import requests +import pandas as pd +from google.cloud import bigquery +from pandas_gbq import to_gbq + +PROJECT_ID = "sipa-adv-c-dancing-cactus" +DATASET_ID = "dataset" +TABLE_ID = "henry_hub_prices" +TABLE_FULL_ID = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}" + + +def fetch_henry_hub_data(): + api_key = st.secrets["EIA_API_KEY"] + + url = ( + "https://api.eia.gov/v2/natural-gas/pri/fut/data/" + "?frequency=daily" + "&data[0]=value" + "&start=1993-12-24" + "&sort[0][column]=period" + "&sort[0][direction]=desc" + f"&api_key={api_key}" + ) + + response = requests.get(url, timeout=30) + response.raise_for_status() + raw = response.json() + + records = raw["response"]["data"] + df = pd.DataFrame(records) + + print("Columns from API:", df.columns.tolist()) + + keep_cols = [ + c for c in ["period", "series-description", "value"] if c in df.columns + ] + df = df[keep_cols].copy() + + df = df.rename( + columns={ + "period": "date", + "series-description": "series_description", + "value": "price", + } + ) + + df["date"] = pd.to_datetime(df["date"], errors="coerce").dt.date + df["price"] = pd.to_numeric(df["price"], errors="coerce") + + if "series_description" not in df.columns: + df["series_description"] = "Henry Hub Natural Gas Price" + + df = df.dropna(subset=["date", "price"]).drop_duplicates() + + return df + + +def ensure_table_exists(): + client = bigquery.Client(project=PROJECT_ID) + + dataset_ref = bigquery.Dataset(f"{PROJECT_ID}.{DATASET_ID}") + client.get_dataset(dataset_ref) + + schema = [ + bigquery.SchemaField("date", "DATE"), + bigquery.SchemaField("series_description", "STRING"), + bigquery.SchemaField("price", "FLOAT"), + ] + + try: + client.get_table(TABLE_FULL_ID) + print(f"Table already exists: {TABLE_FULL_ID}") + except Exception: + table = bigquery.Table(TABLE_FULL_ID, schema=schema) + client.create_table(table) + print(f"Created table: {TABLE_FULL_ID}") + + +def upload_to_bigquery(df): + to_gbq( + dataframe=df, + destination_table=f"{DATASET_ID}.{TABLE_ID}", + project_id=PROJECT_ID, + if_exists="replace", + ) + print(f"Uploaded {len(df)} rows to {TABLE_FULL_ID}") + + +def main(): + df = fetch_henry_hub_data() + + print(df.head()) + print(df.dtypes) + print(f"Row count: {len(df)}") + + ensure_table_exists() + upload_to_bigquery(df) + + +if __name__ == "__main__": + main() diff --git a/market_analysis.py b/market_analysis.py index 0f1ecab..f108ddb 100644 --- a/market_analysis.py +++ b/market_analysis.py @@ -1,15 +1,15 @@ from __future__ import annotations -import os import datetime import pandas as pd -import requests import streamlit as st import pandas_gbq from google.oauth2 import service_account +from bigquery_utils import load_henry_hub_from_bigquery + # ------ Page config ------ st.set_page_config(page_title="Energy Market Analysis", layout="wide") @@ -19,52 +19,12 @@ "https://mis.nyiso.com/public/csv/realtime/{month}01realtime_zone_csv.zip" ) -EIA_HENRY_HUB_BASE_URL = "https://api.eia.gov/v2/natural-gas/pri/fut/data/" # ------ Credentials ------ creds = st.secrets["gcp_service_account"] credentials = service_account.Credentials.from_service_account_info(creds) -# ------ Utility helpers ------ -def normalize_columns(df: pd.DataFrame) -> pd.DataFrame: - df = df.copy() - df.columns = [str(col).strip() for col in df.columns] - return df - - -def find_column(df: pd.DataFrame, candidates: list[str]) -> str | None: - lower_map = {col.lower(): col for col in df.columns} - for candidate in candidates: - if candidate.lower() in lower_map: - return lower_map[candidate.lower()] - return None - - -def get_eia_api_key() -> str: - """ - Try Streamlit secrets first, then environment variable. - """ - # 1) Streamlit secrets - try: - if "EIA_API_KEY" in st.secrets: - key = str(st.secrets["EIA_API_KEY"]).strip() - if key: - return key - except Exception: - pass - - # 2) Environment variable - key = os.getenv("EIA_API_KEY", "").strip() - if key: - return key - - raise ValueError( - "Missing EIA_API_KEY. Please set it in .streamlit/secrets.toml " - "or export it in the same terminal session before running Streamlit." - ) - - # ------ API loaders ------- @st.cache_data(ttl=3600) def load_nyiso_realtime(selected_month) -> any: @@ -87,71 +47,20 @@ def load_nyiso_realtime(selected_month) -> any: @st.cache_data(ttl=3600) -def load_henry_hub_data(start_date: str = "1993-12-24") -> pd.DataFrame: - """ - Load Henry Hub natural gas prices from EIA API. - """ - api_key = get_eia_api_key() - today_str = datetime.date.today().isoformat() - - params = { - "api_key": api_key, - "frequency": "daily", - "data[0]": "value", - "start": start_date, - "end": today_str, - "sort[0][column]": "period", - "sort[0][direction]": "desc", - "offset": 0, - "length": 5000, - } - - response = requests.get(EIA_HENRY_HUB_BASE_URL, params=params, timeout=60) - response.raise_for_status() - - payload = response.json() - - if "response" not in payload or "data" not in payload["response"]: - raise ValueError(f"Unexpected EIA API response format: {payload}") - - records = payload["response"]["data"] - if not records: - raise ValueError("EIA API returned no Henry Hub records.") - - df = pd.DataFrame(records) - df = normalize_columns(df) - - period_col = find_column(df, ["period"]) - value_col = find_column(df, ["value"]) - series_col = find_column( - df, ["series-description", "seriesDescription", "series description"] - ) - - if period_col is None or value_col is None: - raise ValueError( - f"Could not identify 'period' and 'value' columns in EIA response. " - f"Columns found: {list(df.columns)}" - ) - - keep_cols = [period_col, value_col] - if series_col: - keep_cols.append(series_col) - - df = df[keep_cols].copy() - - rename_map = { - period_col: "date", - value_col: "price", - } - if series_col: - rename_map[series_col] = "series_description" - - df = df.rename(columns=rename_map) - +def load_henry_hub_data() -> pd.DataFrame: + df = load_henry_hub_from_bigquery().copy() df["date"] = pd.to_datetime(df["date"], errors="coerce") df["price"] = pd.to_numeric(df["price"], errors="coerce") - df = df.dropna(subset=["date", "price"]).copy() - df = df.sort_values("date") + df = df.dropna(subset=["date", "price"]).sort_values("date") + + if "series_description" in df.columns: + df = df[ + df["series_description"].str.contains( + "Henry Hub Natural Gas Spot Price", + case=False, + na=False, + ) + ].copy() return df @@ -227,12 +136,12 @@ def render_sidebar() -> None: """ **This page combines** - NYISO real-time electricity prices - - EIA Henry Hub natural gas prices + - Henry Hub natural gas prices - exploratory market interpretation **Data source logic** - electricity: online NYISO public data - - gas: online EIA API + - gas: Henry Hub data stored in BigQuery """ ) @@ -242,7 +151,7 @@ def render_intro() -> None: st.write( """ This page combines two related parts of our project: NYISO real-time electricity prices and - Henry Hub natural gas prices from the EIA API. The goal is to present a more coherent energy-market + Henry Hub natural gas prices stored in BigQuery. The goal is to present a more coherent energy-market view by showing both local electricity price outcomes and broader benchmark fuel-market conditions. """ ) @@ -297,7 +206,7 @@ def render_gas_section(gas_df: pd.DataFrame) -> None: st.write( """ - This section uses Henry Hub daily prices from the EIA API. Henry Hub is treated here as a benchmark + This section uses Henry Hub daily prices loaded from BigQuery. Henry Hub is treated here as a benchmark U.S. natural gas series, which provides fuel-market context for interpreting electricity price movements. """ ) @@ -337,7 +246,7 @@ def render_gas_section(gas_df: pd.DataFrame) -> None: st.line_chart(chart_df, use_container_width=True) st.caption( - "Henry Hub price is a benchmark U.S. natural gas series from the EIA API." + "Henry Hub price is a benchmark U.S. natural gas series loaded from BigQuery." ) st.write("**Interpretation**") @@ -352,7 +261,7 @@ def render_gas_section(gas_df: pd.DataFrame) -> None: def render_gas_unavailable(exc: Exception) -> None: st.header("Natural Gas Benchmark Context") st.warning( - "Henry Hub gas data could not be loaded in the current runtime environment. " + "Henry Hub gas data could not be loaded from BigQuery in the current runtime environment. " "Electricity content is still available below the intro section." ) st.code(str(exc)) @@ -376,8 +285,8 @@ def render_comparison_section(gas_available: bool) -> None: st.write( """ NYISO real-time prices capture local power-market outcomes. Henry Hub is intended to provide broader - benchmark fuel-market context, but it is unavailable in the current runtime because the EIA API key - was not detected or the API request failed. + benchmark fuel-market context, but it is unavailable in the current runtime because the BigQuery query + failed or credentials were not configured correctly. """ )