From f374cf1c41161a8b1c128faae1d8ce27275f3859 Mon Sep 17 00:00:00 2001 From: Hanghai Li Date: Fri, 3 Apr 2026 03:09:19 -0400 Subject: [PATCH] Implement Lab 10 BigQuery-backed dashboard --- LAB_10_WRITEUP.md | 79 ++++ README.md | 21 +- load_data_to_bq.py | 193 +++++++-- pages/1_MTA_Ridership.py | 251 +++++------ pages/2_Second_Dataset.py | 126 ++++-- requirements.txt | 4 +- streamlit_app.py | 887 ++++++++++++++++++-------------------- utils.py | 240 ++++++++++- 8 files changed, 1122 insertions(+), 679 deletions(-) create mode 100644 LAB_10_WRITEUP.md diff --git a/LAB_10_WRITEUP.md b/LAB_10_WRITEUP.md new file mode 100644 index 0000000..3168142 --- /dev/null +++ b/LAB_10_WRITEUP.md @@ -0,0 +1,79 @@ +# Lab 10 Writeup + +## BigQuery Data Loading + +This project now uses BigQuery for every dataset shown in the Streamlit app. + +### Dataset 1: MTA Daily Ridership + +- Source: `https://data.ny.gov/resource/vxuj-8kew` +- BigQuery table: `sipa-adv-c-bouncing-penguin.mta_data.daily_ridership` +- Loading type: batch full refresh +- Why: the dataset is small, updated on a daily cadence, and easy to keep consistent by reloading the full table instead of managing row-by-row updates. + +### Dataset 2: NYC COVID-19 Daily Cases + +- Source: `https://data.cityofnewyork.us/resource/rc75-m7u3` +- BigQuery table: `sipa-adv-c-bouncing-penguin.mta_data.nyc_covid_cases` +- Loading type: batch full refresh +- Why: this table is also small enough for a daily refresh, and full replacement keeps the historical series in sync without extra incremental-loading logic. + +### Loader Script + +The repository includes `load_data_to_bq.py`, which: + +1. Authenticates with Google BigQuery +2. Creates the `mta_data` dataset if it does not already exist +3. Pulls source data from both Open Data APIs +4. Cleans date and numeric fields before upload +5. Replaces the target BigQuery tables +6. Verifies each upload with row counts and date ranges + +Run it with: + +```bash +python load_data_to_bq.py --dataset all +``` + +You can also load a single table: + +```bash +python load_data_to_bq.py --dataset mta +python load_data_to_bq.py --dataset covid +``` + +## App Changes for BigQuery + +The Streamlit app no longer reads API responses directly inside page files. + +- `utils.py` now provides shared BigQuery helpers for both datasets +- `streamlit_app.py` reads MTA data from BigQuery +- `pages/1_MTA_Ridership.py` reads MTA data from BigQuery +- `pages/2_Second_Dataset.py` reads COVID data from BigQuery + +This keeps all pages aligned with the lab requirement that every dataset come from BigQuery. + +## Performance Work + +To improve load time and make performance visible: + +- Each page uses a custom `display_load_time()` context manager and shows total load time in the UI +- BigQuery results are cached with Streamlit caching +- Queries select only the columns used by the app instead of `SELECT *` +- Repeated client setup is cached with a shared BigQuery client helper +- Basic data cleaning is centralized in `utils.py` so pages do less work on every rerun +- The homepage dashboard is split into lighter sections so each view renders only the charts needed for that section +- Default chart selections were reduced to fewer transit modes so the initial render sends fewer Plotly traces + +These changes improve both initial and subsequent page loads, while keeping the code easier to maintain. + +## Local Verification Steps + +1. Run `python load_data_to_bq.py --dataset all` +2. Run `streamlit run streamlit_app.py` +3. Open each page and confirm the caption shows the page load time +4. Record the screen while loading the main page and both sub-pages + +## Assumption + +I interpreted "repeat the middle steps from Part 5" as: load the datasets into BigQuery, point the app at BigQuery tables, and document the table-level setup in the repository. diff --git a/README.md b/README.md index ee64ffe..33bf2b9 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,11 @@ This project analyzes MTA daily ridership trends in New York City to understand - **NYC COVID-19 Daily Cases** https://data.cityofnewyork.us/Health/Coronavirus-Data/rc75-m7u3 +Both datasets are loaded into BigQuery for the Streamlit app: + +- `sipa-adv-c-bouncing-penguin.mta_data.daily_ridership` +- `sipa-adv-c-bouncing-penguin.mta_data.nyc_covid_cases` + ## Repository Structure - `streamlit_app.py` - homepage and project introduction @@ -33,7 +38,8 @@ This project analyzes MTA daily ridership trends in New York City to understand - `utils.py` - helper functions for cleaning and plotting - `validation.py` - Pandera schema validation - `tests/` - unit tests for utility and validation code -- `load_data_to_bq.py` - script for loading data into BigQuery +- `load_data_to_bq.py` - script for loading both datasets into BigQuery +- `LAB_10_WRITEUP.md` - Lab 10 notes on data loading and performance ## Setup @@ -43,7 +49,18 @@ This project analyzes MTA daily ridership trends in New York City to understand - Mac/Linux: `source .venv/bin/activate` - Windows: `.venv\Scripts\activate` 4. Install dependencies: `pip install -r requirements.txt` +5. Load the BigQuery tables: `python load_data_to_bq.py --dataset all` ## Usage -Open `mta_ridership_project.ipynb` in Jupyter Notebook or VS Code to run the analysis. +Run the Streamlit app locally: + +```bash +streamlit run streamlit_app.py +``` + +You can still open `mta_ridership_project.ipynb` in Jupyter Notebook or VS Code for notebook-based exploration. + +## Lab 10 + +Lab 10 documentation lives in [LAB_10_WRITEUP.md](./LAB_10_WRITEUP.md). diff --git a/load_data_to_bq.py b/load_data_to_bq.py index d9284fe..1d6a840 100644 --- a/load_data_to_bq.py +++ b/load_data_to_bq.py @@ -1,20 +1,102 @@ -"""Load MTA ridership data from NYC Open Data API into BigQuery.""" +"""Load project datasets from NYC Open Data into BigQuery.""" +import argparse +from dataclasses import dataclass import sys import pandas as pd -import pydata_google_auth - import pandas_gbq +import pydata_google_auth +import requests +from google.cloud import bigquery PROJECT_ID = "sipa-adv-c-bouncing-penguin" -DATASET_TABLE = "mta_data.daily_ridership" +DATASET_ID = "mta_data" SCOPES = [ "https://www.googleapis.com/auth/bigquery", ] +@dataclass(frozen=True) +class DataSource: + name: str + api_url: str + destination_table: str + order_column: str + date_columns: tuple[str, ...] + numeric_columns: tuple[str, ...] + + +DATA_SOURCES = { + "mta": DataSource( + name="MTA ridership", + api_url="https://data.ny.gov/resource/vxuj-8kew.json", + destination_table=f"{DATASET_ID}.daily_ridership", + order_column="date", + date_columns=("date",), + numeric_columns=( + "subways_total_estimated_ridership", + "subways_pct_of_comparable_pre_pandemic_day", + "buses_total_estimated_ridership", + "buses_pct_of_comparable_pre_pandemic_day", + "lirr_total_estimated_ridership", + "lirr_pct_of_comparable_pre_pandemic_day", + "metro_north_total_estimated_ridership", + "metro_north_pct_of_comparable_pre_pandemic_day", + "access_a_ride_total_scheduled_trips", + "access_a_ride_pct_of_comparable_pre_pandemic_day", + "bridges_and_tunnels_total_traffic", + "bridges_and_tunnels_pct_of_comparable_pre_pandemic_day", + "staten_island_railway_total_estimated_ridership", + "staten_island_railway_pct_of_comparable_pre_pandemic_day", + ), + ), + "covid": DataSource( + name="NYC COVID cases", + api_url="https://data.cityofnewyork.us/resource/rc75-m7u3.json", + destination_table=f"{DATASET_ID}.nyc_covid_cases", + order_column="date_of_interest", + date_columns=("date_of_interest",), + numeric_columns=( + "case_count", + "probable_case_count", + "hospitalized_count", + "death_count", + "probable_death_count", + "bx_case_count", + "bk_case_count", + "mn_case_count", + "qn_case_count", + "si_case_count", + ), + ), +} + +MTA_RENAME_MAP = { + "subways_of_comparable_pre_pandemic_day": "subways_pct_of_comparable_pre_pandemic_day", + "buses_of_comparable_pre_pandemic_day": "buses_pct_of_comparable_pre_pandemic_day", + "lirr_of_comparable_pre_pandemic_day": "lirr_pct_of_comparable_pre_pandemic_day", + "metro_north_of_comparable_pre_pandemic_day": "metro_north_pct_of_comparable_pre_pandemic_day", + "bridges_and_tunnels_of_comparable_pre_pandemic_day": "bridges_and_tunnels_pct_of_comparable_pre_pandemic_day", + "access_a_ride_of_comparable_pre_pandemic_day": "access_a_ride_pct_of_comparable_pre_pandemic_day", + "staten_island_railway_of_comparable_pre_pandemic_day": "staten_island_railway_pct_of_comparable_pre_pandemic_day", +} + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Load project datasets from NYC Open Data into BigQuery." + ) + parser.add_argument( + "--dataset", + choices=("all", "mta", "covid"), + default="all", + help="Which dataset to load. Defaults to all.", + ) + return parser.parse_args() + + def get_credentials(): """Get Google credentials with browser-based auth flow.""" print("Authenticating with Google... A browser window should open.") @@ -27,40 +109,101 @@ def get_credentials(): return credentials -def fetch_mta_data() -> pd.DataFrame: - """Pull MTA ridership data from NYC Open Data API.""" - print("Fetching MTA data from NYC Open Data API...") +def ensure_dataset_exists(credentials) -> None: + """Create the BigQuery dataset if it does not already exist.""" + client = bigquery.Client(project=PROJECT_ID, credentials=credentials) + dataset = bigquery.Dataset(f"{PROJECT_ID}.{DATASET_ID}") + dataset.location = "US" + client.create_dataset(dataset, exists_ok=True) + + +def fetch_source(source: DataSource) -> pd.DataFrame: + """Pull a dataset from an NYC Open Data endpoint.""" + print(f"Fetching {source.name} from {source.api_url} ...") sys.stdout.flush() - url = "https://data.ny.gov/resource/vxuj-8kew.csv?$limit=50000" - df = pd.read_csv(url) - df["date"] = pd.to_datetime(df["date"]) - print(f"Fetched {len(df)} rows (from {df['date'].min().date()} to {df['date'].max().date()})") - return df + response = requests.get( + source.api_url, + params={"$limit": 50000, "$order": source.order_column}, + timeout=60, + ) + response.raise_for_status() + df = pd.DataFrame(response.json()) + if df.empty: + raise RuntimeError(f"{source.name} returned no rows.") -def main(): - # Step 1: Authenticate - credentials = get_credentials() + if source.destination_table.endswith("daily_ridership"): + df = df.rename(columns=MTA_RENAME_MAP) + + for column in source.date_columns: + if column in df.columns: + df[column] = pd.to_datetime(df[column]) - # Step 2: Fetch data - df = fetch_mta_data() + for column in source.numeric_columns: + if column in df.columns: + df[column] = pd.to_numeric(df[column], errors="coerce") - # Step 3: Upload to BigQuery - print(f"Uploading to BigQuery: {PROJECT_ID}.{DATASET_TABLE} ...") + date_column = source.date_columns[0] + print( + "Fetched " + f"{len(df)} rows " + f"({df[date_column].min().date()} to {df[date_column].max().date()})" + ) + return df + + +def upload_source(df: pd.DataFrame, source: DataSource, credentials) -> None: + """Upload a dataframe into its destination BigQuery table.""" + print(f"Uploading to BigQuery: {PROJECT_ID}.{source.destination_table} ...") sys.stdout.flush() pandas_gbq.to_gbq( df, - destination_table=DATASET_TABLE, + destination_table=source.destination_table, project_id=PROJECT_ID, if_exists="replace", credentials=credentials, ) - print("Done! Data loaded to BigQuery successfully.") + print("Upload complete.") + + +def verify_source(source: DataSource, credentials) -> None: + """Print a quick verification summary for the target table.""" + date_column = source.date_columns[0] + query = f""" + SELECT + COUNT(*) AS row_count, + MIN(`{date_column}`) AS min_date, + MAX(`{date_column}`) AS max_date + FROM `{PROJECT_ID}.{source.destination_table}` + """ + result = pandas_gbq.read_gbq( + query, + project_id=PROJECT_ID, + credentials=credentials, + ) + row = result.iloc[0] + print( + "Verification: " + f"{row['row_count']} rows " + f"({pd.Timestamp(row['min_date']).date()} to {pd.Timestamp(row['max_date']).date()})" + ) + + +def main() -> None: + args = parse_args() + selected_keys = list(DATA_SOURCES) if args.dataset == "all" else [args.dataset] + + credentials = get_credentials() + ensure_dataset_exists(credentials) + + for key in selected_keys: + source = DATA_SOURCES[key] + df = fetch_source(source) + upload_source(df, source, credentials) + verify_source(source, credentials) + print("") - # Step 4: Verify - query = f"SELECT COUNT(*) as row_count FROM `{PROJECT_ID}.{DATASET_TABLE}`" - result = pandas_gbq.read_gbq(query, project_id=PROJECT_ID, credentials=credentials) - print(f"Verification: {result['row_count'].iloc[0]} rows in BigQuery table.") + print("Done! BigQuery tables are ready for the Streamlit app.") if __name__ == "__main__": diff --git a/pages/1_MTA_Ridership.py b/pages/1_MTA_Ridership.py index 6efa8f3..294e755 100644 --- a/pages/1_MTA_Ridership.py +++ b/pages/1_MTA_Ridership.py @@ -1,129 +1,132 @@ -import pandas as pd -import plotly.express as px -import requests +from datetime import date, timedelta + import streamlit as st +from utils import MTA_MIN_DATE, TRANSIT_MODES, display_load_time, load_mta_data + st.set_page_config(page_title="MTA Ridership", layout="wide") -st.title("📊 MTA Daily Ridership Analysis") - - -@st.cache_data(ttl=3600) -def load_mta_data(): - """Load MTA Daily Ridership data from NYC Open Data API.""" - url = "https://data.ny.gov/resource/vxuj-8kew.json" - all_data = [] - offset = 0 - limit = 50000 - while True: - params = {"$limit": limit, "$offset": offset, "$order": "date"} - response = requests.get(url, params=params) - data = response.json() - if not data: - break - all_data.extend(data) - offset += limit - df = pd.DataFrame(all_data) - df["date"] = pd.to_datetime(df["date"]) - numeric_cols = [ - "subways_total_estimated_ridership", - "subways_pct_of_comparable_pre_pandemic_day", - "buses_total_estimated_ridership", - "buses_pct_of_comparable_pre_pandemic_day", - "lirr_total_estimated_ridership", - "lirr_pct_of_comparable_pre_pandemic_day", - "metro_north_total_estimated_ridership", - "metro_north_pct_of_comparable_pre_pandemic_day", + + +def get_mta_page_columns(selected_services: list[str]) -> tuple[str, ...]: + columns = {"date", TRANSIT_MODES["Subway"]["ridership"], TRANSIT_MODES["Subway"]["recovery"]} + for service in selected_services: + mode_columns = TRANSIT_MODES.get(service, {}) + columns.update(mode_columns.values()) + return tuple(columns) + + +def main() -> None: + st.title("MTA Daily Ridership Analysis") + + selected_services = st.multiselect( + "Select services", + ["Subway", "Bus", "LIRR", "Metro-North"], + default=["Subway"], + key="mta_page_services_v3", + ) + if not selected_services: + st.info("Choose at least one service to display the ridership charts.") + return + + time_window = st.radio( + "Time window", + options=["Recent 180 days", "Recent 365 days", "Full history", "Custom range"], + index=0, + key="mta_page_time_window_v1", + ) + + try: + if time_window == "Recent 180 days": + df = load_mta_data(columns=get_mta_page_columns(selected_services), lookback_days=180) + elif time_window == "Recent 365 days": + df = load_mta_data(columns=get_mta_page_columns(selected_services), lookback_days=365) + elif time_window == "Full history": + df = load_mta_data(columns=get_mta_page_columns(selected_services)) + else: + today = date.today() + default_start = today - timedelta(days=180) + selected_dates = st.date_input( + "Date range", + value=(default_start, today), + min_value=MTA_MIN_DATE, + max_value=today, + key="mta_page_date_range_v3", + ) + start_date = default_start + end_date = today + if len(selected_dates) == 2: + start_date, end_date = selected_dates + df = load_mta_data( + columns=get_mta_page_columns(selected_services), + start_date=str(start_date), + end_date=str(end_date), + ) + except Exception as exc: + st.error(f"Failed to load MTA data from BigQuery: {exc}") + return + + st.caption( + "Source: BigQuery table `mta_data.daily_ridership` refreshed with `load_data_to_bq.py`." + ) + st.write( + f"Loaded {len(df)} rows from {df['date'].min().date()} to {df['date'].max().date()}." + ) + + st.caption("Fast default: recent 180 days. Expand only when you need the full history.") + rolling_window = st.slider( + "Rolling average (days)", + min_value=1, + max_value=60, + value=7, + key="mta_page_rolling_v2", + ) + + services = { + "Subway": TRANSIT_MODES["Subway"]["ridership"], + "Bus": TRANSIT_MODES["Bus"]["ridership"], + "LIRR": TRANSIT_MODES["LIRR"]["ridership"], + "Metro-North": TRANSIT_MODES["Metro-North"]["ridership"], + } + selected_services = [ + service for service in selected_services if services[service] in df.columns ] - for col in numeric_cols: - if col in df.columns: - df[col] = pd.to_numeric(df[col], errors="coerce") - return df - - -df = load_mta_data() - -st.write( - f"Data loaded: {len(df)} rows, from {df['date'].min().date()} to {df['date'].max().date()}" -) - -# --- Visualization 1: Ridership over time --- -st.subheader("Daily Ridership Over Time") - -services = { - "Subways": "subways_total_estimated_ridership", - "Buses": "buses_total_estimated_ridership", - "LIRR": "lirr_total_estimated_ridership", - "Metro-North": "metro_north_total_estimated_ridership", -} - -selected = st.multiselect( - "Select services:", list(services.keys()), default=["Subways"] -) - -if selected: - fig_data = df[["date"]].copy() - for s in selected: - fig_data[s] = df[services[s]] - - melted = fig_data.melt(id_vars="date", var_name="Service", value_name="Ridership") - fig = px.line( - melted, - x="date", - y="Ridership", - color="Service", - title="MTA Daily Ridership by Service", + if not selected_services: + st.error("The selected ridership columns are not available in the current BigQuery table.") + return + + st.subheader("Daily Ridership Over Time") + ridership_frame = df[["date"]].copy() + for service in selected_services: + ridership_frame[service] = df[services[service]].rolling(rolling_window).mean() + st.line_chart(ridership_frame.set_index("date"), height=320) + + st.subheader("Recovery Rate (% of Pre-Pandemic Levels)") + recovery_frame = df[["date"]].copy() + recovery_lookup = { + "Subway": TRANSIT_MODES["Subway"]["recovery"], + "Bus": TRANSIT_MODES["Bus"]["recovery"], + "LIRR": TRANSIT_MODES["LIRR"]["recovery"], + "Metro-North": TRANSIT_MODES["Metro-North"]["recovery"], + } + for service, column in recovery_lookup.items(): + if column not in df.columns: + continue + recovery_frame[service] = df[column].rolling(rolling_window).mean() + st.line_chart(recovery_frame.set_index("date"), height=320) + st.caption("The pre-pandemic baseline is 100% recovery.") + + st.subheader("Weekday vs Weekend Subway Ridership") + day_type_frame = df.copy() + day_type_frame["Day Type"] = day_type_frame["is_weekend"].map( + {True: "Weekend", False: "Weekday"} + ) + weekend_average = ( + day_type_frame.groupby("Day Type")[TRANSIT_MODES["Subway"]["ridership"]] + .mean() + .reset_index() ) - fig.update_layout(xaxis_title="Date", yaxis_title="Estimated Ridership") - st.plotly_chart(fig, use_container_width=True) - -# --- Visualization 2: Recovery percentage --- -st.subheader("Recovery Rate (% of Pre-Pandemic Levels)") - -recovery_cols = { - "Subways": "subways_pct_of_comparable_pre_pandemic_day", - "Buses": "buses_pct_of_comparable_pre_pandemic_day", - "LIRR": "lirr_pct_of_comparable_pre_pandemic_day", - "Metro-North": "metro_north_pct_of_comparable_pre_pandemic_day", -} - -recovery_data = df[["date"]].copy() -for name, col in recovery_cols.items(): - if col in df.columns: - recovery_data[name] = df[col] * 100 - -melted_recovery = recovery_data.melt( - id_vars="date", var_name="Service", value_name="% of Pre-Pandemic" -) -fig2 = px.line( - melted_recovery, - x="date", - y="% of Pre-Pandemic", - color="Service", - title="Ridership Recovery: % of Comparable Pre-Pandemic Day", -) -fig2.add_hline( - y=100, - line_dash="dash", - line_color="gray", - annotation_text="Pre-Pandemic Level", -) -fig2.update_layout(xaxis_title="Date", yaxis_title="% of Pre-Pandemic") -st.plotly_chart(fig2, use_container_width=True) - -# --- Weekday vs Weekend --- -st.subheader("Weekday vs. Weekend Ridership") -df["day_of_week"] = df["date"].dt.day_name() -df["is_weekend"] = df["date"].dt.dayofweek >= 5 - -weekend_avg = ( - df.groupby("is_weekend")["subways_total_estimated_ridership"].mean().reset_index() -) -weekend_avg["Type"] = weekend_avg["is_weekend"].map({True: "Weekend", False: "Weekday"}) -fig3 = px.bar( - weekend_avg, - x="Type", - y="subways_total_estimated_ridership", - title="Average Subway Ridership: Weekday vs Weekend", -) -fig3.update_layout(yaxis_title="Average Estimated Ridership") -st.plotly_chart(fig3, use_container_width=True) + st.bar_chart(weekend_average.set_index("Day Type")) + + +with display_load_time(): + main() diff --git a/pages/2_Second_Dataset.py b/pages/2_Second_Dataset.py index 1df5c1c..7cdc281 100644 --- a/pages/2_Second_Dataset.py +++ b/pages/2_Second_Dataset.py @@ -1,46 +1,86 @@ -import pandas as pd -import plotly.express as px -import requests +from datetime import date, timedelta + import streamlit as st +from utils import COVID_MIN_DATE, display_load_time, load_covid_data + st.set_page_config(page_title="NYC COVID Data", layout="wide") -st.title("🦠 NYC COVID-19 Cases (Second Dataset)") - -st.markdown( - """ -This page brings in NYC COVID-19 case data to contextualize MTA ridership -recovery patterns. -""" -) - - -@st.cache_data(ttl=3600) -def load_covid_data(): - url = "https://data.cityofnewyork.us/resource/rc75-m7u3.json" - params = {"$limit": 50000, "$order": "date_of_interest"} - response = requests.get(url, params=params) - df = pd.DataFrame(response.json()) - df["date_of_interest"] = pd.to_datetime(df["date_of_interest"]) - df["case_count"] = pd.to_numeric(df["case_count"], errors="coerce") - return df - - -df_covid = load_covid_data() - -st.write(f"Data: {len(df_covid)} rows") - -fig = px.line( - df_covid, - x="date_of_interest", - y="case_count", - title="NYC Daily COVID-19 Cases", -) -fig.update_layout(xaxis_title="Date", yaxis_title="Case Count") -st.plotly_chart(fig, use_container_width=True) - -st.markdown( - """ -**Connection to MTA Ridership:** Comparing COVID case surges with ridership -dips helps us understand how public health events drive transit behavior. -""" -) + + +def main() -> None: + st.title("NYC COVID-19 Cases") + st.markdown( + "This page uses BigQuery-hosted COVID case data to contextualize changes in MTA ridership." + ) + + time_window = st.radio( + "Time window", + options=["Recent 180 days", "Recent 365 days", "Full history", "Custom range"], + index=0, + key="covid_page_time_window_v1", + ) + + try: + if time_window == "Recent 180 days": + df = load_covid_data(lookback_days=180) + elif time_window == "Recent 365 days": + df = load_covid_data(lookback_days=365) + elif time_window == "Full history": + df = load_covid_data() + else: + today = date.today() + default_start = today - timedelta(days=180) + selected_dates = st.date_input( + "Date range", + value=(default_start, today), + min_value=COVID_MIN_DATE, + max_value=today, + key="covid_page_date_range_v3", + ) + start_date = default_start + end_date = today + if len(selected_dates) == 2: + start_date, end_date = selected_dates + df = load_covid_data(start_date=str(start_date), end_date=str(end_date)) + except Exception as exc: + st.error(f"Failed to load COVID data from BigQuery: {exc}") + return + + st.caption("Source: BigQuery table `mta_data.nyc_covid_cases`.") + st.write( + "Loaded " + f"{len(df)} rows from {df['date_of_interest'].min().date()} " + f"to {df['date_of_interest'].max().date()}." + ) + + st.caption("Fast default: recent 180 days. Expand only when you need the full history.") + rolling_window = st.slider( + "Rolling average (days)", + min_value=1, + max_value=30, + value=7, + key="covid_page_rolling_v2", + ) + + plot_df = df[["date_of_interest", "case_count"]].copy() + plot_df["7-day avg"] = plot_df["case_count"].rolling(rolling_window).mean() + + st.line_chart(plot_df.set_index("date_of_interest"), height=320) + + monthly = ( + df.groupby("year_month", as_index=False)["case_count"] + .mean() + .rename(columns={"case_count": "avg_case_count"}) + ) + st.bar_chart(monthly.set_index("year_month"), height=280) + + st.markdown( + """ + **Connection to MTA Ridership:** comparing COVID case surges with ridership dips helps + explain why commuter rail and subway recovery lagged during major waves. + """ + ) + + +with display_load_time(): + main() diff --git a/requirements.txt b/requirements.txt index b106761..ae4b34f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,4 +8,6 @@ matplotlib pandera pandas-gbq google-cloud-bigquery -db-dtypes \ No newline at end of file +google-cloud-bigquery-storage +db-dtypes +pydata-google-auth diff --git a/streamlit_app.py b/streamlit_app.py index 58e52bd..a496f2f 100644 --- a/streamlit_app.py +++ b/streamlit_app.py @@ -1,10 +1,13 @@ +from datetime import date, timedelta + import pandas as pd import plotly.express as px -import plotly.graph_objects as go import streamlit as st from utils import ( + MTA_MIN_DATE, TRANSIT_MODES, + display_load_time, get_holiday_df, get_latest_recovery, get_weekday_weekend_comparison, @@ -13,548 +16,490 @@ st.set_page_config(page_title="MTA Ridership Dashboard", layout="wide") -st.title("🚇 MTA Ridership Recovery Dashboard") -st.markdown( - "Exploring MTA ridership trends and COVID-19 recovery patterns " - "across transit services in New York City." -) - -# --------------------------------------------------------------------------- -# Tabs: Dashboard vs Proposal -# --------------------------------------------------------------------------- -tab_dashboard, tab_proposal = st.tabs(["📊 Dashboard", "📝 Proposal"]) - -# =========================== -# DATA LOADING (cached) -# =========================== - - -@st.cache_data(ttl=3600) -def fetch_data(): - return load_mta_data() - -try: - df = fetch_data() - data_loaded = True -except Exception as e: - data_loaded = False - st.error(f"Failed to load data: {e}") - -# =========================== -# DASHBOARD TAB -# =========================== -with tab_dashboard: - if not data_loaded: - st.warning("Could not load MTA data. Please try again later.") - st.stop() - - # -- Sidebar filters -- - st.sidebar.header("Filters") - - min_date = df["date"].min().date() - max_date = df["date"].max().date() - date_range = st.sidebar.date_input( - "Date range", - value=(min_date, max_date), - min_value=min_date, - max_value=max_date, - ) - if len(date_range) == 2: - start_date, end_date = date_range - mask = (df["date"].dt.date >= start_date) & (df["date"].dt.date <= end_date) - filtered = df[mask].copy() +def get_dashboard_columns(view: str, selected_modes: list[str]) -> tuple[str, ...]: + columns = {"date"} + + if view == "Overview": + columns.add(TRANSIT_MODES["Subway"]["ridership"]) + for mode in selected_modes: + mode_columns = TRANSIT_MODES.get(mode, {}) + columns.update(mode_columns.values()) + for mode_columns in TRANSIT_MODES.values(): + columns.add(mode_columns["recovery"]) + elif view == "Comparison": + for mode_columns in TRANSIT_MODES.values(): + columns.add(mode_columns["recovery"]) + elif view == "Calendar": + for mode_columns in TRANSIT_MODES.values(): + columns.add(mode_columns["recovery"]) else: - filtered = df.copy() + columns.add(TRANSIT_MODES["Subway"]["recovery"]) - selected_modes = st.sidebar.multiselect( - "Transit modes", - options=list(TRANSIT_MODES.keys()), - default=["Subway", "Bus", "LIRR", "Metro-North"], - ) + return tuple(columns) - rolling_window = st.sidebar.slider( - "Rolling average (days)", min_value=1, max_value=60, value=7 - ) - # ------------------------------------------------------- - # Section 1: KPI Cards - # ------------------------------------------------------- +def render_kpis(filtered: pd.DataFrame) -> None: st.subheader("Current Recovery Snapshot") - st.caption("Average recovery rate over the most recent 30 days in the dataset") + st.caption("Average recovery rate over the most recent 30 days in the filtered view") recovery = get_latest_recovery(filtered, days=30) - kpi_cols = st.columns(len(recovery)) - for i, (mode, rate) in enumerate(recovery.items()): - with kpi_cols[i]: - st.metric( - label=mode, - value=f"{rate:.0%}", - delta=None, - ) + if not recovery: + st.info("No recovery metrics are available for the current filter selection.") + return + + kpi_columns = st.columns(len(recovery)) + for index, (mode, rate) in enumerate(recovery.items()): + with kpi_columns[index]: + st.metric(mode, f"{rate:.0%}") - st.markdown("---") - # ------------------------------------------------------- - # Section 2: Recovery Trend (interactive plotly) - # ------------------------------------------------------- +def render_recovery_chart( + filtered: pd.DataFrame, + selected_modes: list[str], + rolling_window: int, +) -> None: st.subheader("Recovery Trend Over Time") - fig_recovery = go.Figure() + chart_df = filtered[["date"]].copy() for mode in selected_modes: - col = TRANSIT_MODES[mode]["recovery"] - if col not in filtered.columns: + column = TRANSIT_MODES[mode]["recovery"] + if column not in filtered.columns: continue - series = filtered.set_index("date")[col].rolling(rolling_window).mean() - fig_recovery.add_trace( - go.Scatter( - x=series.index, - y=series.values, - mode="lines", - name=mode, - ) - ) + chart_df[mode] = filtered[column].rolling(rolling_window).mean() - # Baseline - fig_recovery.add_hline( - y=1.0, - line_dash="dash", - line_color="gray", - annotation_text="Pre-pandemic baseline (100%)", - ) + chart_df = chart_df.set_index("date").dropna(how="all") + if chart_df.empty: + st.info("No recovery series are available for the selected transit modes.") + return + + st.line_chart(chart_df, height=320) + st.caption("The pre-pandemic baseline is 100% recovery.") - fig_recovery.update_layout( - yaxis_title="% of Pre-Pandemic Ridership", - xaxis_title="Date", - hovermode="x unified", - legend=dict(orientation="h", y=-0.15), - height=500, - yaxis_tickformat=".0%", - ) - st.plotly_chart(fig_recovery, use_container_width=True) - # ------------------------------------------------------- - # Section 3: Total Ridership Trend - # ------------------------------------------------------- +def render_total_chart( + filtered: pd.DataFrame, + selected_modes: list[str], + rolling_window: int, +) -> None: st.subheader("Total Daily Ridership") - fig_total = go.Figure() + chart_df = filtered[["date"]].copy() for mode in selected_modes: - col = TRANSIT_MODES[mode]["ridership"] - if col not in filtered.columns: + column = TRANSIT_MODES[mode]["ridership"] + if column not in filtered.columns: continue - series = filtered.set_index("date")[col].rolling(rolling_window).mean() - fig_total.add_trace( - go.Scatter( - x=series.index, - y=series.values, - mode="lines", - name=mode, - ) - ) + chart_df[mode] = filtered[column].rolling(rolling_window).mean() + + chart_df = chart_df.set_index("date").dropna(how="all") + if chart_df.empty: + st.info("No ridership series are available for the selected transit modes.") + return + + st.line_chart(chart_df, height=320) + + +def render_subway_day_type_summary(filtered: pd.DataFrame) -> None: + st.subheader("Weekday vs Weekend Subway Ridership") + + subway_column = TRANSIT_MODES["Subway"]["ridership"] + if subway_column not in filtered.columns: + st.info("Subway ridership data is not available in the current dataset.") + return - fig_total.update_layout( - yaxis_title="Daily Ridership", - xaxis_title="Date", - hovermode="x unified", - legend=dict(orientation="h", y=-0.15), - height=500, + summary = filtered.copy() + summary["Day Type"] = summary["is_weekend"].map({True: "Weekend", False: "Weekday"}) + averages = ( + summary.groupby("Day Type")[subway_column] + .mean() + .reset_index() + .set_index("Day Type") ) - st.plotly_chart(fig_total, use_container_width=True) + st.bar_chart(averages, height=240) + + +def render_mode_recovery_summary(filtered: pd.DataFrame) -> None: + st.subheader("Average Recovery by Mode") + + rows = [] + for mode, columns in TRANSIT_MODES.items(): + recovery_column = columns["recovery"] + if recovery_column not in filtered.columns: + continue + rows.append({"Mode": mode, "Recovery": filtered[recovery_column].mean()}) + + if not rows: + st.info("No recovery summary is available for the current dataset.") + return + + summary_df = pd.DataFrame(rows).set_index("Mode") + st.bar_chart(summary_df, height=240) - # ------------------------------------------------------- - # Section 4: Weekday vs Weekend - # ------------------------------------------------------- + +def render_weekday_weekend(filtered: pd.DataFrame) -> None: st.subheader("Weekday vs Weekend Recovery") - available_years = sorted(filtered["year"].unique()) + available_years = [str(year) for year in sorted(filtered["year"].unique())] selected_year = st.selectbox( "Select year for comparison", - options=["All Years"] + available_years, + options=["All Years", *available_years], index=0, ) - year_val = None if selected_year == "All Years" else int(selected_year) - comparison = get_weekday_weekend_comparison(filtered, year=year_val) - - if not comparison.empty: - # Grouped bar chart - comp_melted = comparison.melt( - id_vars="Transit Mode", - value_vars=["Weekday Avg Recovery", "Weekend Avg Recovery"], - var_name="Day Type", - value_name="Recovery Rate", - ) - comp_melted["Day Type"] = comp_melted["Day Type"].str.replace( - " Avg Recovery", "" - ) + year_value = None if selected_year == "All Years" else int(selected_year) + comparison = get_weekday_weekend_comparison(filtered, year=year_value) + if comparison.empty: + st.info("No weekday/weekend comparison is available for the current filter.") + return + + comparison_long = comparison.melt( + id_vars="Transit Mode", + value_vars=["Weekday Avg Recovery", "Weekend Avg Recovery"], + var_name="Day Type", + value_name="Recovery Rate", + ) + comparison_long["Day Type"] = comparison_long["Day Type"].str.replace( + " Avg Recovery", + "", + ) + comparison_long["Recovery Percent"] = comparison_long["Recovery Rate"] * 100 + comparison_fig = px.bar( + comparison_long, + x="Transit Mode", + y="Recovery Percent", + color="Day Type", + barmode="group", + category_orders={"Day Type": ["Weekday", "Weekend"]}, + color_discrete_sequence=["#7dd3fc", "#2563eb"], + ) + comparison_fig.update_layout( + height=320, + margin=dict(l=0, r=0, t=10, b=0), + yaxis_title="Recovery Rate", + legend_title_text="", + ) + comparison_fig.update_yaxes(ticksuffix="%", rangemode="tozero") + st.plotly_chart(comparison_fig, use_container_width=True, config={"displayModeBar": False}) + + st.markdown("**Monthly Weekend Minus Weekday Gap (Subway)**") + subway_column = TRANSIT_MODES["Subway"]["recovery"] + monthly = ( + filtered.groupby(["year_month", "is_weekend"])[subway_column] + .mean() + .unstack() + .rename(columns={False: "Weekday", True: "Weekend"}) + ) + if monthly.empty: + st.info("Not enough data to compute the monthly weekday/weekend gap.") + return + + monthly["Gap"] = monthly["Weekend"] - monthly["Weekday"] + monthly = monthly.reset_index() + monthly["Gap Percent"] = monthly["Gap"] * 100 + gap_fig = px.bar( + monthly, + x="year_month", + y="Gap Percent", + color="Gap Percent", + color_continuous_scale="RdBu", + color_continuous_midpoint=0, + ) + gap_fig.update_layout( + height=300, + margin=dict(l=0, r=0, t=10, b=0), + xaxis_title="Month", + yaxis_title="Weekend - Weekday", + coloraxis_showscale=False, + ) + gap_fig.update_yaxes(ticksuffix="%", zeroline=True, zerolinewidth=1) + st.plotly_chart(gap_fig, use_container_width=True, config={"displayModeBar": False}) - fig_wkday = px.bar( - comp_melted, - x="Transit Mode", - y="Recovery Rate", - color="Day Type", - barmode="group", - color_discrete_map={"Weekday": "#636EFA", "Weekend": "#EF553B"}, - ) - fig_wkday.update_layout( - yaxis_tickformat=".0%", - yaxis_title="Avg Recovery Rate (% of Pre-Pandemic)", - height=450, - ) - st.plotly_chart(fig_wkday, use_container_width=True) - - # Weekday vs Weekend gap over time (monthly) - st.markdown("**Monthly Weekday-Weekend Gap (Subway)**") - subway_col = TRANSIT_MODES["Subway"]["recovery"] - if subway_col in filtered.columns: - monthly = ( - filtered.groupby(["year_month", "is_weekend"])[subway_col] - .mean() - .unstack() - .rename(columns={False: "Weekday", True: "Weekend"}) - ) - monthly["Gap"] = monthly["Weekend"] - monthly["Weekday"] - monthly = monthly.reset_index() - - fig_gap = px.bar( - monthly, - x="year_month", - y="Gap", - color="Gap", - color_continuous_scale=["#EF553B", "#CCCCCC", "#636EFA"], - color_continuous_midpoint=0, - ) - fig_gap.update_layout( - xaxis_title="Month", - yaxis_title="Weekend - Weekday Recovery Gap", - yaxis_tickformat=".0%", - height=350, - showlegend=False, - xaxis=dict(tickangle=-45, dtick=3), - ) - st.plotly_chart(fig_gap, use_container_width=True) - st.caption( - "Positive values mean weekend recovery is higher than weekday. " - "This is consistent with reduced weekday commuting due to remote work." - ) - # ------------------------------------------------------- - # Section 5: Holiday Impact - # ------------------------------------------------------- - st.subheader("Holiday & Event Impact on Ridership") +def render_holiday_impact(filtered: pd.DataFrame) -> None: + st.subheader("Holiday & Event Impact on Subway Recovery") holidays_df = get_holiday_df() holiday_names = sorted(holidays_df["holiday"].unique()) selected_holidays = st.multiselect( - "Select holidays/events to highlight", + "Select holidays or events to highlight", options=holiday_names, default=["Thanksgiving", "Christmas", "Congestion Pricing Launch"], ) + if not selected_holidays: + st.info("Choose at least one holiday or event to draw comparison lines.") + return + + subway_column = TRANSIT_MODES["Subway"]["recovery"] + if subway_column not in filtered.columns: + st.info("Subway recovery data is not available in the current dataset.") + return + + series = filtered.set_index("date")[subway_column].rolling(7).mean().rename("Subway") + st.line_chart(series, height=320) + + selected_rows = holidays_df[holidays_df["holiday"].isin(selected_holidays)] + visible_events = selected_rows[ + (selected_rows["date"] >= filtered["date"].min()) + & (selected_rows["date"] <= filtered["date"].max()) + ][["holiday", "date"]].copy() + if not visible_events.empty: + visible_events["date"] = visible_events["date"].dt.strftime("%Y-%m-%d") + st.dataframe(visible_events, use_container_width=True, hide_index=True) + + impact_rows = [] + for _, row in selected_rows.iterrows(): + holiday_date = pd.Timestamp(row["date"]) + holiday_window = filtered[ + (filtered["date"] >= holiday_date - pd.Timedelta(days=1)) + & (filtered["date"] <= holiday_date + pd.Timedelta(days=1)) + ] + baseline_window = filtered[ + (filtered["date"] >= holiday_date - pd.Timedelta(days=8)) + & (filtered["date"] < holiday_date - pd.Timedelta(days=1)) + ] + if holiday_window.empty or baseline_window.empty: + continue + holiday_average = holiday_window[subway_column].mean() + baseline_average = baseline_window[subway_column].mean() + impact_rows.append( + { + "Holiday": row["holiday"], + "Date": holiday_date.strftime("%Y-%m-%d"), + "Holiday Recovery": f"{holiday_average:.0%}", + "Prior Week Recovery": f"{baseline_average:.0%}", + "Change": f"{holiday_average - baseline_average:+.0%}", + } + ) - if selected_holidays: - fig_holiday = go.Figure() - - # Plot subway recovery as the background line - subway_col = TRANSIT_MODES["Subway"]["recovery"] - if subway_col in filtered.columns: - series = filtered.set_index("date")[subway_col].rolling(7).mean() - fig_holiday.add_trace( - go.Scatter( - x=series.index, - y=series.values, - mode="lines", - name="Subway (7-day avg)", - line=dict(color="#636EFA"), - ) - ) + if impact_rows: + st.dataframe(pd.DataFrame(impact_rows), use_container_width=True, hide_index=True) - # Add vertical lines for selected holidays - colors = px.colors.qualitative.Set2 - sel_holidays = holidays_df[holidays_df["holiday"].isin(selected_holidays)] - for i, holiday in enumerate(selected_holidays): - dates = sel_holidays[sel_holidays["holiday"] == holiday]["date"] - color = colors[i % len(colors)] - for j, d in enumerate(dates): - d = pd.Timestamp(d) - if filtered["date"].min() <= d <= filtered["date"].max(): - d_str = d.strftime("%Y-%m-%d") - fig_holiday.add_shape( - type="line", - x0=d_str, x1=d_str, - y0=0, y1=1, - yref="paper", - line=dict(dash="dot", color=color), - ) - if j == 0: - fig_holiday.add_annotation( - x=d_str, y=1, yref="paper", - text=holiday, - showarrow=False, - xanchor="left", - ) - - fig_holiday.update_layout( - yaxis_title="Subway Recovery (% of Pre-Pandemic)", - xaxis_title="Date", - yaxis_tickformat=".0%", - hovermode="x unified", - height=500, - ) - st.plotly_chart(fig_holiday, use_container_width=True) - - # Holiday impact table - st.markdown("**Average Subway Ridership Around Holidays**") - impact_rows = [] - for _, row in sel_holidays.iterrows(): - h_date = pd.Timestamp(row["date"]) - # 3-day window around the holiday - window = filtered[ - (filtered["date"] >= h_date - pd.Timedelta(days=1)) - & (filtered["date"] <= h_date + pd.Timedelta(days=1)) - ] - # Surrounding week for comparison - baseline = filtered[ - (filtered["date"] >= h_date - pd.Timedelta(days=8)) - & (filtered["date"] < h_date - pd.Timedelta(days=1)) - ] - if not window.empty and not baseline.empty: - h_avg = window[subway_col].mean() - b_avg = baseline[subway_col].mean() - impact_rows.append( - { - "Holiday": row["holiday"], - "Date": h_date.strftime("%Y-%m-%d"), - "Holiday Recovery": f"{h_avg:.0%}", - "Prior Week Recovery": f"{b_avg:.0%}", - "Change": f"{h_avg - b_avg:+.0%}", - } - ) - if impact_rows: - st.dataframe( - pd.DataFrame(impact_rows), - use_container_width=True, - hide_index=True, - ) - # ------------------------------------------------------- - # Section 6: Year-over-Year Recovery - # ------------------------------------------------------- +def render_yearly_recovery(filtered: pd.DataFrame) -> None: st.subheader("Year-over-Year Recovery by Transit Mode") - yearly_rows = [] + rows = [] for year in sorted(filtered["year"].unique()): year_data = filtered[filtered["year"] == year] - for mode, cols in TRANSIT_MODES.items(): - col = cols["recovery"] - if col in year_data.columns: - avg = year_data[col].mean() - yearly_rows.append( - { - "Year": str(year), - "Transit Mode": mode, - "Avg Recovery": avg, - } - ) + for mode, columns in TRANSIT_MODES.items(): + recovery_column = columns["recovery"] + if recovery_column not in year_data.columns: + continue + rows.append( + { + "Year": str(year), + "Transit Mode": mode, + "Avg Recovery": year_data[recovery_column].mean(), + } + ) + + if not rows: + st.info("No yearly recovery view is available for the current filter.") + return + + yearly_df = pd.DataFrame(rows) + yearly_df["Avg Recovery Percent"] = yearly_df["Avg Recovery"] * 100 + yearly_fig = px.bar( + yearly_df, + x="Year", + y="Avg Recovery Percent", + color="Transit Mode", + barmode="group", + color_discrete_sequence=["#60a5fa", "#f97316", "#ef4444", "#34d399", "#a78bfa"], + ) + yearly_fig.update_layout( + height=340, + margin=dict(l=0, r=0, t=10, b=0), + yaxis_title="Average Recovery", + legend_title_text="", + ) + yearly_fig.update_yaxes(ticksuffix="%", rangemode="tozero") + st.plotly_chart(yearly_fig, use_container_width=True, config={"displayModeBar": False}) - if yearly_rows: - yearly_df = pd.DataFrame(yearly_rows) - fig_yearly = px.bar( - yearly_df, - x="Year", - y="Avg Recovery", - color="Transit Mode", - barmode="group", - ) - fig_yearly.update_layout( - yaxis_title="Avg Recovery Rate (% of Pre-Pandemic)", - yaxis_tickformat=".0%", - height=450, - ) - st.plotly_chart(fig_yearly, use_container_width=True) - # ------------------------------------------------------- - # Section 7: Day-of-Week Heatmap - # ------------------------------------------------------- +def render_heatmap(filtered: pd.DataFrame) -> None: st.subheader("Ridership by Day of Week") - heatmap_mode = st.selectbox( + selected_mode = st.selectbox( "Select transit mode for heatmap", options=list(TRANSIT_MODES.keys()), index=0, ) - heatmap_col = TRANSIT_MODES[heatmap_mode]["recovery"] - if heatmap_col in filtered.columns: - pivot = filtered.groupby(["year", "day_name"])[heatmap_col].mean().reset_index() - day_order = [ - "Monday", - "Tuesday", - "Wednesday", - "Thursday", - "Friday", - "Saturday", - "Sunday", - ] - pivot["day_name"] = pd.Categorical( - pivot["day_name"], categories=day_order, ordered=True - ) - pivot = pivot.sort_values(["year", "day_name"]) - pivot_wide = pivot.pivot(index="day_name", columns="year", values=heatmap_col) - - fig_heat = px.imshow( - pivot_wide, - color_continuous_scale="RdYlGn", - aspect="auto", - labels=dict(x="Year", y="Day of Week", color="Recovery %"), - zmin=0, - zmax=1.2, - ) - fig_heat.update_layout(height=350) - st.plotly_chart(fig_heat, use_container_width=True) + recovery_column = TRANSIT_MODES[selected_mode]["recovery"] + if recovery_column not in filtered.columns: + st.info("The selected transit mode is unavailable in the current dataset.") + return + + pivot = filtered.groupby(["year", "day_name"])[recovery_column].mean().reset_index() + day_order = [ + "Monday", + "Tuesday", + "Wednesday", + "Thursday", + "Friday", + "Saturday", + "Sunday", + ] + pivot["day_name"] = pd.Categorical(pivot["day_name"], categories=day_order, ordered=True) + pivot = pivot.sort_values(["year", "day_name"]) + pivot_wide = pivot.pivot(index="day_name", columns="year", values=recovery_column) + + st.dataframe( + pivot_wide.style.format("{:.0%}").background_gradient(cmap="RdYlGn"), + use_container_width=True, + ) + + +def render_dashboard( + df: pd.DataFrame, + view: str, + selected_modes: list[str], + rolling_window: int, +) -> None: + st.sidebar.header("Filters") + st.sidebar.caption("Fast default: recent 180 days. Expand the range only when needed.") + + if df.empty: + st.warning("No data is available for the current filters.") + return + + st.caption("Sections are split to keep each page load fast while preserving the full analysis.") + + if view == "Overview": + render_kpis(df) + st.markdown("---") + render_recovery_chart(df, selected_modes, rolling_window) + render_total_chart(df, selected_modes, rolling_window) + render_subway_day_type_summary(df) + render_mode_recovery_summary(df) + elif view == "Comparison": + render_weekday_weekend(df) + render_yearly_recovery(df) + elif view == "Calendar": + render_heatmap(df) + else: + render_holiday_impact(df) - # ------------------------------------------------------- - # Footer - # ------------------------------------------------------- st.markdown("---") st.caption( - "Data source: [MTA Daily Ridership Data](https://data.ny.gov/Transportation/" - "MTA-Daily-Ridership-Data-Beginning-2020/vxuj-8kew) | " - "Team bouncing-penguin: Haixin Liu & Hanghai Li" + "Data source: BigQuery tables " + "`mta_data.daily_ridership` and supporting holiday metadata in the app." ) -# =========================== -# PROPOSAL TAB -# =========================== -with tab_proposal: - st.header("Project Proposal") - st.subheader("Background and Motivation") - st.markdown(""" - The COVID-19 pandemic caused an unprecedented drop in public transit ridership across - New York City. At its lowest point in April 2020, subway ridership fell to roughly - 10% of normal levels, and other MTA services experienced similar declines. Since then, - ridership has been gradually recovering, but the pace and pattern of that recovery - has varied significantly depending on the transit mode, time of week, and even - specific events or holidays. - - As of late 2025, subway ridership has climbed back to about 85% of pre-pandemic levels, - while the Long Island Rail Road (LIRR) has reached 92% and Metro-North sits at around 88%. - Bridges and tunnels traffic has actually exceeded pre-pandemic levels at roughly 105%, - suggesting a shift in how New Yorkers choose to commute. Paratransit ridership has surged - to 161% of pre-pandemic levels, pointing to growing demand for accessible transit options. - These differences raise interesting questions about what drives recovery in different parts - of the transit system and whether these patterns will continue. - - Understanding these recovery dynamics matters not just for transit planning but also - for broader urban policy. Transit ridership affects fare revenue, congestion, air quality, - and economic activity across the region. - """) +def render_proposal() -> None: + st.header("Project Proposal") st.subheader("Research Questions") - st.markdown(""" - We started this project with three main research questions. After working through the - data over the past few weeks, we've refined them based on what we've actually been - able to observe: - - **1. How do weekday vs. weekend ridership patterns differ across MTA services, and - has that gap changed over time?** - - Our original question was simply about weekday vs. weekend differences, but we've found - that the more interesting story is how that gap has evolved. Early in the pandemic, - weekend ridership actually recovered faster than weekday ridership for subways and - commuter rail, likely due to remote work reducing weekday commuting. We want to explore - whether that trend has continued or whether weekday ridership is catching up as - return-to-office policies have become more common. - - **2. How do holidays and major events affect ridership across different transit modes?** - - We initially framed this broadly, but we're now focusing on specific events: major holidays - (Thanksgiving, Christmas, July 4th, New Year's), large-scale events (marathon, Times Square - NYE), and policy changes like the launch of congestion pricing in early 2025. The congestion - pricing angle is particularly interesting because it directly connects transit policy to - ridership behavior. - - **3. Which transit modes have recovered fastest, and what factors explain the differences?** - - This remains our core question, but we've added more nuance. Rather than just looking at - which mode recovered fastest, we're now also examining the rate of recovery over time. - For example, LIRR's recovery accelerated after Grand Central Madison opened, and bus - ridership got a boost from the Queens Bus Network Redesign. We want to see whether these - service improvements show up clearly in the data. - """) - - st.subheader("Dataset") - st.markdown(""" - We're using the **MTA Daily Ridership Data** from the New York State Open Data portal - (data.ny.gov), which is updated daily and covers all major MTA services starting from - March 2020. - - The dataset includes daily total ridership estimates and the percentage of comparable - pre-pandemic day ridership for each transit mode: Subways, Buses, LIRR, Metro-North, - Access-A-Ride, and Bridges & Tunnels. This gives us both absolute numbers and a built-in - recovery metric (the pre-pandemic percentage), which makes cross-mode comparison - straightforward. - - One limitation we've noticed is that the "comparable pre-pandemic day" metric can be noisy - around holidays, since the comparison day may not perfectly match the current day's - conditions. We handle this by using rolling averages for trend analysis instead of relying - on individual daily values. - - We pull the data directly from the NYC Open Data API so the dashboard always reflects - the most recent available data without needing manual updates. - """) - - st.subheader("Methodology") - st.markdown(""" - Our analysis approach includes the following: - - - **Recovery trend analysis:** We track the pre-pandemic percentage for each transit mode - over time, using 7-day and 30-day rolling averages to smooth out daily fluctuations. - This helps us identify the overall trajectory and any inflection points. + st.markdown( + """ + 1. How have subway, bus, LIRR, and Metro-North recovered relative to comparable pre-pandemic days? + 2. How different are weekday and weekend recovery patterns, and how has that gap changed over time? + 3. Do holidays, major events, and policy changes line up with visible ridership shifts? + """ + ) - - **Weekday vs. weekend comparison:** We categorize each day as weekday or weekend, - then compare average ridership and recovery rates for each transit mode across these - two groups. We also look at how this gap has changed year over year. + st.subheader("Datasets") + st.markdown( + """ + The app uses two BigQuery-backed datasets: - - **Holiday and event impact:** We flag known holidays and major events in the data - and examine ridership patterns in the days surrounding them. We compare holiday - ridership to the surrounding week's average to quantify the impact. + - `mta_data.daily_ridership`: the statewide MTA daily ridership dataset from `data.ny.gov` + - `mta_data.nyc_covid_cases`: NYC daily COVID case counts from `data.cityofnewyork.us` - - **Cross-mode comparison:** We rank transit modes by their recovery rate and visualize - them side by side. We also look at whether modes that serve different geographic areas - or rider demographics have recovered differently. + We use batch loading for both sources because the tables are relatively small, update on a daily cadence, + and are easier to keep consistent with a full refresh than with event-by-event ingestion. + """ + ) - All visualizations use Plotly for interactivity, allowing users to zoom in on specific - time periods, toggle transit modes on and off, and hover over data points for details. - """) + st.subheader("Methodology") + st.markdown( + """ + The dashboard focuses on rolling averages, weekday-versus-weekend comparisons, and event windows around + holidays. Those views let us compare absolute ridership and recovery percentages without moving large + raw files into Streamlit on every rerun. + """ + ) - st.subheader("Preliminary Findings") - st.markdown(""" - Based on our analysis so far, here are the key patterns we've identified: + st.subheader("Performance Notes") + st.markdown( + """ + For Lab 10, the app now reads both datasets from BigQuery, caches query results in Streamlit, + selects only the columns each page actually needs, and shows total page load time with a custom + context manager. + """ + ) - - **Commuter rail is recovering faster than subway and bus.** LIRR leads at 92% recovery, - followed by Metro-North at 88%, while subway sits at 85%. This may reflect the - return-to-office trend among suburban commuters and service improvements like Grand - Central Madison. - - **Bridges and tunnels traffic has fully recovered and then some**, currently at 105% of - pre-pandemic levels. This suggests some riders may have permanently shifted from transit - to driving, or that overall regional travel volume has increased. +def main() -> None: + st.title("MTA Ridership Recovery Dashboard") + st.markdown( + "Explore how New York City's transit system has recovered since 2020 using BigQuery-backed data." + ) - - **Weekend ridership recovery has been proportionally stronger than weekday ridership** - for subway and bus, consistent with the shift toward remote and hybrid work reducing - traditional weekday commuting. + page = st.radio("View", ["Dashboard", "Proposal"], horizontal=True) + if page == "Dashboard": + view = st.radio( + "Dashboard section", + options=["Overview", "Comparison", "Calendar", "Events"], + horizontal=True, + key="dashboard_section_v2", + ) + selected_modes = st.sidebar.multiselect( + "Transit modes", + options=list(TRANSIT_MODES.keys()), + default=["Subway"], + key="dashboard_modes_v2", + ) + rolling_window = st.sidebar.slider( + "Rolling average (days)", + 1, + 60, + 7, + key="dashboard_rolling_v2", + ) + time_window = st.sidebar.radio( + "Time window", + options=["Recent 180 days", "Recent 365 days", "Full history", "Custom range"], + index=0, + key="dashboard_time_window_v1", + ) - - **Paratransit demand has surged well beyond pre-pandemic levels** (161%), indicating - growing need for accessible transit services that goes beyond simple pandemic recovery. + requested_columns = get_dashboard_columns(view, selected_modes) + try: + if time_window == "Recent 180 days": + df = load_mta_data(columns=requested_columns, lookback_days=180) + elif time_window == "Recent 365 days": + df = load_mta_data(columns=requested_columns, lookback_days=365) + elif time_window == "Full history": + df = load_mta_data(columns=requested_columns) + else: + today = date.today() + default_start = today - timedelta(days=180) + selected_dates = st.sidebar.date_input( + "Date range", + value=(default_start, today), + min_value=MTA_MIN_DATE, + max_value=today, + key="dashboard_date_range_v3", + ) + start_date = default_start + end_date = today + if len(selected_dates) == 2: + start_date, end_date = selected_dates + df = load_mta_data( + columns=requested_columns, + start_date=str(start_date), + end_date=str(end_date), + ) + except Exception as exc: + st.error(f"Failed to load data from BigQuery: {exc}") + return - - **Recovery is not linear.** There are clear seasonal dips (winter holidays, summer), - and specific events like congestion pricing launch appear to have boosted transit - ridership in early 2025. + render_dashboard(df, view, selected_modes, rolling_window) + else: + render_proposal() - These findings are preliminary and will be refined as we build out the full dashboard - with interactive visualizations. - """) - st.markdown("---") - st.markdown("**Team bouncing-penguin:** Haixin Liu & Hanghai Li") +with display_load_time(): + main() diff --git a/utils.py b/utils.py index e7e6026..4db9f6a 100644 --- a/utils.py +++ b/utils.py @@ -1,29 +1,241 @@ -import matplotlib.pyplot as plt +from contextlib import contextmanager +from datetime import date +import time +from typing import Iterable + import pandas as pd +import streamlit as st from google.cloud import bigquery from google.oauth2 import service_account PROJECT_ID = "sipa-adv-c-bouncing-penguin" -DATASET_TABLE = "mta_data.daily_ridership" +MTA_TABLE = "mta_data.daily_ridership" +COVID_TABLE = "mta_data.nyc_covid_cases" +MTA_MIN_DATE = date(2020, 3, 1) +COVID_MIN_DATE = date(2020, 3, 1) +MTA_COLUMNS = ( + "date", + "subways_total_estimated_ridership", + "subways_pct_of_comparable_pre_pandemic_day", + "buses_total_estimated_ridership", + "buses_pct_of_comparable_pre_pandemic_day", + "lirr_total_estimated_ridership", + "lirr_pct_of_comparable_pre_pandemic_day", + "metro_north_total_estimated_ridership", + "metro_north_pct_of_comparable_pre_pandemic_day", + "bridges_and_tunnels_total_traffic", + "bridges_and_tunnels_pct_of_comparable_pre_pandemic_day", +) + +COVID_COLUMNS = ( + "date_of_interest", + "case_count", +) + +MTA_LEGACY_COLUMN_MAP = { + "subways_pct_of_comparable_pre_pandemic_day": "subways_of_comparable_pre_pandemic_day", + "buses_pct_of_comparable_pre_pandemic_day": "buses_of_comparable_pre_pandemic_day", + "lirr_pct_of_comparable_pre_pandemic_day": "lirr_of_comparable_pre_pandemic_day", + "metro_north_pct_of_comparable_pre_pandemic_day": "metro_north_of_comparable_pre_pandemic_day", + "bridges_and_tunnels_pct_of_comparable_pre_pandemic_day": "bridges_and_tunnels_of_comparable_pre_pandemic_day", + "access_a_ride_pct_of_comparable_pre_pandemic_day": "access_a_ride_of_comparable_pre_pandemic_day", + "staten_island_railway_pct_of_comparable_pre_pandemic_day": "staten_island_railway_of_comparable_pre_pandemic_day", +} -def load_mta_data() -> pd.DataFrame: - """Load MTA ridership data from BigQuery.""" - try: - import streamlit as st +@st.cache_resource(show_spinner=False) +def get_bigquery_client() -> bigquery.Client: + """Create and cache the BigQuery client used by the app.""" + try: credentials = service_account.Credentials.from_service_account_info( st.secrets["gcp_service_account"] ) - client = bigquery.Client(credentials=credentials, project=PROJECT_ID) + return bigquery.Client(credentials=credentials, project=PROJECT_ID) except Exception: # Fallback: use default credentials (e.g. local gcloud auth) - client = bigquery.Client(project=PROJECT_ID) + return bigquery.Client(project=PROJECT_ID) + + +def _render_select_clause(columns: Iterable[str]) -> str: + return ", ".join(f"`{column}`" for column in columns) + + +def _get_table_columns(table_name: str) -> set[str]: + client = get_bigquery_client() + table = client.get_table(f"{PROJECT_ID}.{table_name}") + return {field.name for field in table.schema} + + +def _resolve_mta_columns(columns: Iterable[str]) -> list[str]: + available_columns = _get_table_columns(MTA_TABLE) + resolved_columns = [] + + for column in columns: + if column in available_columns: + resolved_columns.append(f"`{column}`") + continue + + legacy_column = MTA_LEGACY_COLUMN_MAP.get(column) + if legacy_column and legacy_column in available_columns: + resolved_columns.append(f"`{legacy_column}` AS `{column}`") + continue + + return resolved_columns + + +def _get_source_column( + table_name: str, + output_column: str, + available_columns: set[str], +) -> str | None: + if output_column in available_columns: + return output_column + + if table_name == MTA_TABLE: + legacy_column = MTA_LEGACY_COLUMN_MAP.get(output_column) + if legacy_column in available_columns: + return legacy_column + + return None + + +def _build_select_expressions( + table_name: str, + requested_columns: Iterable[str], +) -> tuple[list[str], str]: + available_columns = _get_table_columns(table_name) + date_column = "date" if table_name == MTA_TABLE else "date_of_interest" + expressions = [] + + for output_column in requested_columns: + source_column = _get_source_column(table_name, output_column, available_columns) + if not source_column: + continue + + if output_column == date_column: + expressions.append( + f"SAFE_CAST(`{source_column}` AS DATE) AS `{output_column}`" + ) + else: + expressions.append( + f"SAFE_CAST(`{source_column}` AS FLOAT64) AS `{output_column}`" + ) - query = f"SELECT * FROM `{PROJECT_ID}.{DATASET_TABLE}`" - df = client.query(query).to_dataframe() - df = clean_mta_df(df) - return df + return expressions, date_column + + +def _load_table( + table_name: str, + columns: Iterable[str], + order_by: str, + start_date: str | None = None, + end_date: str | None = None, + lookback_days: int | None = None, +) -> pd.DataFrame: + select_expressions, date_column = _build_select_expressions(table_name, columns) + if not select_expressions: + raise KeyError(f"No requested columns were found in BigQuery table {table_name}.") + + where_clause = "" + if lookback_days is not None: + where_clause = f""" + WHERE `{date_column}` BETWEEN ( + SELECT DATE_SUB(MAX(`{date_column}`), INTERVAL {lookback_days} DAY) + FROM normalized + ) AND ( + SELECT MAX(`{date_column}`) + FROM normalized + ) + """ + elif start_date and end_date: + where_clause = ( + f"\n WHERE `{date_column}` BETWEEN '{start_date}' AND '{end_date}'" + ) + + query = f""" + WITH normalized AS ( + SELECT + {", ".join(select_expressions)} + FROM `{PROJECT_ID}.{table_name}` + ) + SELECT * + FROM normalized + {where_clause} + ORDER BY `{order_by}` + """ + client = get_bigquery_client() + query_job = client.query(query) + return query_job.to_dataframe(create_bqstorage_client=False) + + +@st.cache_data(show_spinner=False, persist="disk") +def load_mta_data( + columns: tuple[str, ...] = MTA_COLUMNS, + start_date: str | None = None, + end_date: str | None = None, + lookback_days: int | None = None, +) -> pd.DataFrame: + """Load MTA ridership data from BigQuery.""" + df = _load_table( + MTA_TABLE, + columns, + order_by="date", + start_date=start_date, + end_date=end_date, + lookback_days=lookback_days, + ) + return clean_mta_df(df) + + +def clean_covid_df(df: pd.DataFrame) -> pd.DataFrame: + out = df.copy() + + if "date_of_interest" not in out.columns: + raise KeyError("Missing 'date_of_interest' column") + + out["date_of_interest"] = pd.to_datetime(out["date_of_interest"]) + out = out.sort_values("date_of_interest").reset_index(drop=True) + + if "case_count" in out.columns: + out["case_count"] = pd.to_numeric(out["case_count"], errors="coerce") + + out["year"] = out["date_of_interest"].dt.year + out["month"] = out["date_of_interest"].dt.month + out["year_month"] = out["date_of_interest"].dt.to_period("M").astype(str) + + return out + + +@st.cache_data(show_spinner=False, persist="disk") +def load_covid_data( + columns: tuple[str, ...] = COVID_COLUMNS, + start_date: str | None = None, + end_date: str | None = None, + lookback_days: int | None = None, +) -> pd.DataFrame: + """Load NYC COVID case data from BigQuery.""" + df = _load_table( + COVID_TABLE, + columns, + order_by="date_of_interest", + start_date=start_date, + end_date=end_date, + lookback_days=lookback_days, + ) + return clean_covid_df(df) + + +@contextmanager +def display_load_time(): + """Display total Streamlit page load time in the footer.""" + start_time = time.perf_counter() + + try: + yield + finally: + elapsed = time.perf_counter() - start_time + st.caption(f"Page loaded in {elapsed:.2f} seconds") def clean_mta_df(df: pd.DataFrame) -> pd.DataFrame: @@ -192,8 +404,10 @@ def get_weekday_weekend_comparison(df: pd.DataFrame, year: int = None) -> pd.Dat return pd.DataFrame(rows) -def plot_ridership_recovery(df: pd.DataFrame) -> plt.Figure: +def plot_ridership_recovery(df: pd.DataFrame): """Plot MTA ridership recovery by transit mode as % of pre-pandemic levels.""" + import matplotlib.pyplot as plt + required_cols = [ "date", "subways_pct_of_comparable_pre_pandemic_day",