79 changes: 79 additions & 0 deletions LAB_10_WRITEUP.md
@@ -0,0 +1,79 @@
# Lab 10 Writeup

## BigQuery Data Loading

This project now uses BigQuery for every dataset shown in the Streamlit app.

### Dataset 1: MTA Daily Ridership

- Source: `https://data.ny.gov/resource/vxuj-8kew`
- BigQuery table: `sipa-adv-c-bouncing-penguin.mta_data.daily_ridership`
- Loading type: batch full refresh
- Why: the dataset is small, updated on a daily cadence, and easy to keep consistent by reloading the full table instead of managing row-by-row updates.

### Dataset 2: NYC COVID-19 Daily Cases

- Source: `https://data.cityofnewyork.us/resource/rc75-m7u3`
- BigQuery table: `sipa-adv-c-bouncing-penguin.mta_data.nyc_covid_cases`
- Loading type: batch full refresh
- Why: this table is also small enough for a daily refresh, and full replacement keeps the historical series in sync without extra incremental-loading logic.

### Loader Script

The repository includes `load_data_to_bq.py`, which:

1. Authenticates with Google BigQuery
2. Creates the `mta_data` dataset if it does not already exist
3. Pulls source data from both Open Data APIs
4. Cleans date and numeric fields before upload
5. Replaces the target BigQuery tables
6. Verifies each upload with row counts and date ranges
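Step 4 of the list above can be sketched in pandas. The column names match the COVID table, but the rows here are invented for illustration:

```python
import pandas as pd

# Toy rows shaped like the Socrata JSON payload (values are made up).
raw = pd.DataFrame(
    {
        "date_of_interest": ["2023-01-01T00:00:00.000", "2023-01-02T00:00:00.000"],
        "case_count": ["1250", "not reported"],
    }
)

# Dates become real timestamps; unparseable numbers become NaN instead of
# raising, so one bad row cannot fail the whole load.
raw["date_of_interest"] = pd.to_datetime(raw["date_of_interest"])
raw["case_count"] = pd.to_numeric(raw["case_count"], errors="coerce")
```

`errors="coerce"` mirrors the loader's tolerance for missing or malformed values in the source feeds.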

Run it with:

```bash
python load_data_to_bq.py --dataset all
```

You can also load a single table:

```bash
python load_data_to_bq.py --dataset mta
python load_data_to_bq.py --dataset covid
```

## App Changes for BigQuery

The Streamlit app no longer reads API responses directly inside page files.

- `utils.py` now provides shared BigQuery helpers for both datasets
- `streamlit_app.py` reads MTA data from BigQuery
- `pages/1_MTA_Ridership.py` reads MTA data from BigQuery
- `pages/2_Second_Dataset.py` reads COVID data from BigQuery

This keeps all pages aligned with the lab requirement that every dataset come from BigQuery.
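A minimal sketch of what such a shared helper can look like — the names are hypothetical rather than the actual `utils.py` API, and `lru_cache` stands in for Streamlit's `st.cache_data` so the sketch runs anywhere:

```python
from functools import lru_cache

QUERY_RUNS = {"count": 0}


@lru_cache(maxsize=None)  # the app uses st.cache_data in this role
def load_table(table: str) -> tuple[str, ...]:
    """Pretend BigQuery read; the real helper calls pandas_gbq.read_gbq."""
    QUERY_RUNS["count"] += 1
    return ("row-1", "row-2")  # placeholder rows instead of a DataFrame


# Every page calls the same helper; only the first call "hits BigQuery".
load_table("mta_data.daily_ridership")
load_table("mta_data.daily_ridership")
```

Because `st.cache_data` keys on arguments the same way, each table is queried once per cache lifetime no matter how many pages read it.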

## Performance Work

To improve load time and make performance visible:

- Each page uses a custom `display_load_time()` context manager and shows total load time in the UI
- BigQuery results are cached with Streamlit caching
- Queries select only the columns used by the app instead of `SELECT *`
- Repeated client setup is cached with a shared BigQuery client helper
- Basic data cleaning is centralized in `utils.py` so pages do less work on every rerun
- The homepage dashboard is split into lighter sections so each view renders only the charts needed for that section
- Default chart selections were reduced to fewer transit modes so the initial render sends fewer Plotly traces

These changes improve both initial and subsequent page loads, while keeping the code easier to maintain.
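The timing helper from the first bullet can be sketched as a small context manager. This is a standalone sketch rather than the exact `utils.py` code; `render` stands in for the `st.caption` call the app uses:

```python
import time
from contextlib import contextmanager


@contextmanager
def display_load_time(render=print):
    """Time the wrapped block and hand a message to a renderer."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        render(f"Page loaded in {elapsed:.2f} s")


# Wrap the expensive part of a page render.
with display_load_time():
    time.sleep(0.05)  # placeholder for BigQuery reads and chart building
```

Putting the `render` call in `finally` means the load time is reported even when the wrapped block raises.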

## Local Verification Steps

1. Run `python load_data_to_bq.py --dataset all`
2. Run `streamlit run streamlit_app.py`
3. Open each page and confirm the caption shows the page load time
4. Record the screen while loading the main page and both sub-pages

## Assumption

I interpreted "repeat the middle steps from Part 5" as: load the datasets into BigQuery, point the app at BigQuery tables, and document the table-level setup in the repository.
21 changes: 19 additions & 2 deletions README.md
@@ -25,6 +25,11 @@ This project analyzes MTA daily ridership trends in New York City to understand
- **NYC COVID-19 Daily Cases**
https://data.cityofnewyork.us/Health/Coronavirus-Data/rc75-m7u3

Both datasets are loaded into BigQuery for the Streamlit app:

- `sipa-adv-c-bouncing-penguin.mta_data.daily_ridership`
- `sipa-adv-c-bouncing-penguin.mta_data.nyc_covid_cases`

## Repository Structure

- `streamlit_app.py` - homepage and project introduction
@@ -33,7 +38,8 @@ This project analyzes MTA daily ridership trends in New York City to understand
- `utils.py` - helper functions for cleaning and plotting
- `validation.py` - Pandera schema validation
- `tests/` - unit tests for utility and validation code
- `load_data_to_bq.py` - script for loading both datasets into BigQuery
- `LAB_10_WRITEUP.md` - Lab 10 notes on data loading and performance

## Setup

@@ -43,7 +49,18 @@ This project analyzes MTA daily ridership trends in New York City to understand
- Mac/Linux: `source .venv/bin/activate`
- Windows: `.venv\Scripts\activate`
4. Install dependencies: `pip install -r requirements.txt`
5. Load the BigQuery tables: `python load_data_to_bq.py --dataset all`

## Usage

Run the Streamlit app locally:

```bash
streamlit run streamlit_app.py
```

You can still open `mta_ridership_project.ipynb` in Jupyter Notebook or VS Code for notebook-based exploration.

## Lab 10

Lab 10 documentation lives in [LAB_10_WRITEUP.md](./LAB_10_WRITEUP.md).
193 changes: 168 additions & 25 deletions load_data_to_bq.py
@@ -1,20 +1,102 @@
"""Load MTA ridership data from NYC Open Data API into BigQuery."""
"""Load project datasets from NYC Open Data into BigQuery."""

import argparse
from dataclasses import dataclass
import sys

import pandas as pd
import pydata_google_auth

import pandas_gbq
import pydata_google_auth
import requests
from google.cloud import bigquery

PROJECT_ID = "sipa-adv-c-bouncing-penguin"
DATASET_TABLE = "mta_data.daily_ridership"
DATASET_ID = "mta_data"

SCOPES = [
"https://www.googleapis.com/auth/bigquery",
]


@dataclass(frozen=True)
class DataSource:
    name: str
    api_url: str
    destination_table: str
    order_column: str
    date_columns: tuple[str, ...]
    numeric_columns: tuple[str, ...]


DATA_SOURCES = {
    "mta": DataSource(
        name="MTA ridership",
        api_url="https://data.ny.gov/resource/vxuj-8kew.json",
        destination_table=f"{DATASET_ID}.daily_ridership",
        order_column="date",
        date_columns=("date",),
        numeric_columns=(
            "subways_total_estimated_ridership",
            "subways_pct_of_comparable_pre_pandemic_day",
            "buses_total_estimated_ridership",
            "buses_pct_of_comparable_pre_pandemic_day",
            "lirr_total_estimated_ridership",
            "lirr_pct_of_comparable_pre_pandemic_day",
            "metro_north_total_estimated_ridership",
            "metro_north_pct_of_comparable_pre_pandemic_day",
            "access_a_ride_total_scheduled_trips",
            "access_a_ride_pct_of_comparable_pre_pandemic_day",
            "bridges_and_tunnels_total_traffic",
            "bridges_and_tunnels_pct_of_comparable_pre_pandemic_day",
            "staten_island_railway_total_estimated_ridership",
            "staten_island_railway_pct_of_comparable_pre_pandemic_day",
        ),
    ),
    "covid": DataSource(
        name="NYC COVID cases",
        api_url="https://data.cityofnewyork.us/resource/rc75-m7u3.json",
        destination_table=f"{DATASET_ID}.nyc_covid_cases",
        order_column="date_of_interest",
        date_columns=("date_of_interest",),
        numeric_columns=(
            "case_count",
            "probable_case_count",
            "hospitalized_count",
            "death_count",
            "probable_death_count",
            "bx_case_count",
            "bk_case_count",
            "mn_case_count",
            "qn_case_count",
            "si_case_count",
        ),
    ),
}

MTA_RENAME_MAP = {
    "subways_of_comparable_pre_pandemic_day": "subways_pct_of_comparable_pre_pandemic_day",
    "buses_of_comparable_pre_pandemic_day": "buses_pct_of_comparable_pre_pandemic_day",
    "lirr_of_comparable_pre_pandemic_day": "lirr_pct_of_comparable_pre_pandemic_day",
    "metro_north_of_comparable_pre_pandemic_day": "metro_north_pct_of_comparable_pre_pandemic_day",
    "bridges_and_tunnels_of_comparable_pre_pandemic_day": "bridges_and_tunnels_pct_of_comparable_pre_pandemic_day",
    "access_a_ride_of_comparable_pre_pandemic_day": "access_a_ride_pct_of_comparable_pre_pandemic_day",
    "staten_island_railway_of_comparable_pre_pandemic_day": "staten_island_railway_pct_of_comparable_pre_pandemic_day",
}


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Load project datasets from NYC Open Data into BigQuery."
    )
    parser.add_argument(
        "--dataset",
        choices=("all", "mta", "covid"),
        default="all",
        help="Which dataset to load. Defaults to all.",
    )
    return parser.parse_args()


def get_credentials():
    """Get Google credentials with browser-based auth flow."""
    print("Authenticating with Google... A browser window should open.")
@@ -27,40 +109,101 @@ def get_credentials():
    return credentials


def ensure_dataset_exists(credentials) -> None:
    """Create the BigQuery dataset if it does not already exist."""
    client = bigquery.Client(project=PROJECT_ID, credentials=credentials)
    dataset = bigquery.Dataset(f"{PROJECT_ID}.{DATASET_ID}")
    dataset.location = "US"
    client.create_dataset(dataset, exists_ok=True)


def fetch_source(source: DataSource) -> pd.DataFrame:
    """Pull a dataset from an NYC Open Data endpoint."""
    print(f"Fetching {source.name} from {source.api_url} ...")
    sys.stdout.flush()
    response = requests.get(
        source.api_url,
        params={"$limit": 50000, "$order": source.order_column},
        timeout=60,
    )
    response.raise_for_status()

    df = pd.DataFrame(response.json())
    if df.empty:
        raise RuntimeError(f"{source.name} returned no rows.")

    if source.destination_table.endswith("daily_ridership"):
        df = df.rename(columns=MTA_RENAME_MAP)

    for column in source.date_columns:
        if column in df.columns:
            df[column] = pd.to_datetime(df[column])

    for column in source.numeric_columns:
        if column in df.columns:
            df[column] = pd.to_numeric(df[column], errors="coerce")

    date_column = source.date_columns[0]
    print(
        "Fetched "
        f"{len(df)} rows "
        f"({df[date_column].min().date()} to {df[date_column].max().date()})"
    )
    return df


def upload_source(df: pd.DataFrame, source: DataSource, credentials) -> None:
    """Upload a dataframe into its destination BigQuery table."""
    print(f"Uploading to BigQuery: {PROJECT_ID}.{source.destination_table} ...")
    sys.stdout.flush()
    pandas_gbq.to_gbq(
        df,
        destination_table=source.destination_table,
        project_id=PROJECT_ID,
        if_exists="replace",
        credentials=credentials,
    )
    print("Upload complete.")


def verify_source(source: DataSource, credentials) -> None:
    """Print a quick verification summary for the target table."""
    date_column = source.date_columns[0]
    query = f"""
        SELECT
            COUNT(*) AS row_count,
            MIN(`{date_column}`) AS min_date,
            MAX(`{date_column}`) AS max_date
        FROM `{PROJECT_ID}.{source.destination_table}`
    """
    result = pandas_gbq.read_gbq(
        query,
        project_id=PROJECT_ID,
        credentials=credentials,
    )
    row = result.iloc[0]
    print(
        "Verification: "
        f"{row['row_count']} rows "
        f"({pd.Timestamp(row['min_date']).date()} to {pd.Timestamp(row['max_date']).date()})"
    )


def main() -> None:
    args = parse_args()
    selected_keys = list(DATA_SOURCES) if args.dataset == "all" else [args.dataset]

    credentials = get_credentials()
    ensure_dataset_exists(credentials)

    for key in selected_keys:
        source = DATA_SOURCES[key]
        df = fetch_source(source)
        upload_source(df, source, credentials)
        verify_source(source, credentials)
        print("")

    print("Done! BigQuery tables are ready for the Streamlit app.")


if __name__ == "__main__":
    main()