-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathload_data_to_bq.py
More file actions
305 lines (259 loc) · 10.2 KB
/
load_data_to_bq.py
File metadata and controls
305 lines (259 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
"""Load project datasets from NYC Open Data into BigQuery."""
import argparse
from dataclasses import dataclass
import json
import os
import sys
import pandas as pd
import pandas_gbq
import requests
from google.cloud import bigquery
from google.oauth2 import service_account
# Google Cloud project that owns the BigQuery dataset and runs the queries.
PROJECT_ID = "sipa-adv-c-bouncing-penguin"
# BigQuery dataset that every destination table is created in.
DATASET_ID = "mta_data"
# Sent as "$limit" on each API request; a pull that reaches this many rows is
# rejected by validation because it may have been truncated.
API_ROW_LIMIT = 50_000
# OAuth scopes requested by both credential flows.
SCOPES = ["https://www.googleapis.com/auth/bigquery"]
@dataclass(frozen=True)
class DataSource:
    """Immutable description of one Open Data feed and its BigQuery table."""

    # Human-readable label used in log and error messages.
    name: str
    # Endpoint URL the rows are requested from.
    api_url: str
    # "<dataset>.<table>" target; the project ID is prepended where needed.
    destination_table: str
    # Column passed as the API's "$order" parameter.
    order_column: str
    # Columns parsed with pd.to_datetime; the first entry drives validation
    # and the post-upload verification query.
    date_columns: tuple[str, ...]
    # Columns coerced with pd.to_numeric; a missing one only logs a warning.
    numeric_columns: tuple[str, ...]
    # Columns whose absence aborts the load.
    required_columns: tuple[str, ...]
    # Smallest row count accepted from a pull.
    minimum_rows: int
    # Date string (parsed with pd.Timestamp) the data must reach back to.
    minimum_date: str
# Registry of loadable sources. Keys double as the values accepted by the
# --dataset command-line flag (besides "all", which selects every entry).
DATA_SOURCES = {
    # Daily ridership / traffic figures per MTA mode.
    "mta": DataSource(
        name="MTA ridership",
        api_url="https://data.ny.gov/resource/vxuj-8kew.json",
        destination_table=f"{DATASET_ID}.daily_ridership",
        order_column="date",
        date_columns=("date",),
        # Canonical column names, i.e. the names in effect after
        # MTA_RENAME_MAP has been applied to the API response.
        numeric_columns=(
            "subways_total_estimated_ridership",
            "subways_pct_of_comparable_pre_pandemic_day",
            "buses_total_estimated_ridership",
            "buses_pct_of_comparable_pre_pandemic_day",
            "lirr_total_estimated_ridership",
            "lirr_pct_of_comparable_pre_pandemic_day",
            "metro_north_total_estimated_ridership",
            "metro_north_pct_of_comparable_pre_pandemic_day",
            "access_a_ride_total_scheduled_trips",
            "access_a_ride_pct_of_comparable_pre_pandemic_day",
            "bridges_and_tunnels_total_traffic",
            "bridges_and_tunnels_pct_of_comparable_pre_pandemic_day",
            "staten_island_railway_total_estimated_ridership",
            "staten_island_railway_pct_of_comparable_pre_pandemic_day",
        ),
        # Only the date column is mandatory; missing numeric columns are
        # tolerated with a warning during validation.
        required_columns=("date",),
        minimum_rows=1000,
        minimum_date="2020-03-01",
    ),
    # Daily NYC COVID case, hospitalization and death counts.
    "covid": DataSource(
        name="NYC COVID cases",
        api_url="https://data.cityofnewyork.us/resource/rc75-m7u3.json",
        destination_table=f"{DATASET_ID}.nyc_covid_cases",
        order_column="date_of_interest",
        date_columns=("date_of_interest",),
        numeric_columns=(
            "case_count",
            "probable_case_count",
            "hospitalized_count",
            "death_count",
            "probable_death_count",
            "bx_case_count",
            "bk_case_count",
            "mn_case_count",
            "qn_case_count",
            "si_case_count",
        ),
        required_columns=("date_of_interest",),
        minimum_rows=1000,
        minimum_date="2020-03-01",
    ),
}
# Column renames applied to the MTA ridership response before type coercion.
MTA_RENAME_MAP = {
    # Percentage columns spelled without the "pct_" infix are mapped onto the
    # canonical "<mode>_pct_of_comparable_pre_pandemic_day" name.
    **{
        f"{mode}_of_comparable_pre_pandemic_day": f"{mode}_pct_of_comparable_pre_pandemic_day"
        for mode in (
            "subways",
            "buses",
            "lirr",
            "metro_north",
            "bridges_and_tunnels",
            "access_a_ride",
            "staten_island_railway",
        )
    },
    # Upstream typo workaround: the API returns "ridersip" (missing the trailing
    # "h") for bus totals. Map it to the canonical name so the column survives.
    "buses_total_estimated_ridersip": "buses_total_estimated_ridership",
}
def parse_args(argv: "list[str] | None" = None) -> argparse.Namespace:
    """Parse the loader's command-line options.

    Args:
        argv: Argument strings to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is what the script entry point relies
            on; an explicit list lets the parser be driven programmatically.

    Returns:
        A namespace with a single ``dataset`` attribute: ``"all"``, ``"mta"``
        or ``"covid"`` (``"all"`` when the flag is omitted).
    """
    parser = argparse.ArgumentParser(
        description="Load project datasets from NYC Open Data into BigQuery."
    )
    parser.add_argument(
        "--dataset",
        choices=("all", "mta", "covid"),
        default="all",
        help="Which dataset to load. Defaults to all.",
    )
    return parser.parse_args(argv)
def get_credentials():
    """Return Google credentials for BigQuery access.

    A service-account key supplied as JSON text in the ``GCP_SA_KEY``
    environment variable takes precedence. When it is unset or empty, the
    interactive ``pydata_google_auth`` user flow is used instead.
    """
    service_account_json = os.environ.get("GCP_SA_KEY")

    if not service_account_json:
        # Imported here so the package is only needed on the interactive path.
        import pydata_google_auth

        print("Authenticating with Google... A browser window should open.")
        print("If it doesn't, copy the URL shown below and open it manually.")
        user_credentials = pydata_google_auth.get_user_credentials(
            SCOPES,
            auth_local_webserver=False,
        )
        print("Authentication successful!")
        return user_credentials

    print("Using service account credentials from environment.")
    key_info = json.loads(service_account_json)
    return service_account.Credentials.from_service_account_info(
        key_info, scopes=SCOPES
    )
def ensure_dataset_exists(credentials) -> None:
    """Make sure the target BigQuery dataset is present.

    The dataset is requested in the "US" location; ``exists_ok=True`` turns
    the call into a no-op when it is already there.
    """
    target = bigquery.Dataset(f"{PROJECT_ID}.{DATASET_ID}")
    target.location = "US"

    bq_client = bigquery.Client(project=PROJECT_ID, credentials=credentials)
    bq_client.create_dataset(target, exists_ok=True)
def validate_source_frame(df: pd.DataFrame, source: DataSource) -> None:
    """Sanity-check a source pull before replacing the BigQuery table.

    Hard failures (raise) are reserved for things that would corrupt the table:
    missing date column, far too few rows, hitting the API page cap, or data
    that does not extend back to the expected start date. Missing optional
    numeric columns only emit a warning so a single dropped upstream column
    cannot freeze the entire table.

    Args:
        df: Frame to check. Its primary date column is compared against
            ``pd.Timestamp`` values, so it is expected to be datetime-typed
            already.
        source: Configuration supplying the thresholds and column lists.

    Raises:
        RuntimeError: On any hard failure listed above, including a primary
            date column that is not configured or not present in ``df``.
    """
    missing_required = [
        column for column in source.required_columns if column not in df.columns
    ]
    if missing_required:
        raise RuntimeError(
            f"{source.name} is missing required columns: {missing_required}"
        )

    if len(df) < source.minimum_rows:
        raise RuntimeError(
            f"{source.name} returned only {len(df)} rows; "
            f"expected at least {source.minimum_rows}."
        )

    # Reaching the request limit means rows may have been cut off.
    if len(df) >= API_ROW_LIMIT:
        raise RuntimeError(
            f"{source.name} hit the {API_ROW_LIMIT:,}-row API limit. "
            "Add pagination before replacing the BigQuery table."
        )

    # The date checks index the frame by the primary date column. Do not rely
    # on required_columns happening to list it: fail with the same exception
    # type as every other hard failure instead of an IndexError/KeyError.
    if not source.date_columns:
        raise RuntimeError(f"{source.name} has no date columns configured.")
    date_column = source.date_columns[0]
    if date_column not in df.columns:
        raise RuntimeError(
            f"{source.name} is missing its date column: {date_column!r}"
        )

    if df[date_column].isna().any():
        raise RuntimeError(f"{source.name} has null values in {date_column}.")

    expected_min = pd.Timestamp(source.minimum_date)
    actual_min = df[date_column].min()
    if actual_min > expected_min:
        raise RuntimeError(
            f"{source.name} starts at {actual_min.date()}, "
            f"but should include data from {expected_min.date()}."
        )

    # A single distinct date (max == min) is not a usable range either.
    actual_max = df[date_column].max()
    if actual_max <= actual_min:
        raise RuntimeError(f"{source.name} does not cover a usable date range.")

    # Soft checks: report, but never block the upload.
    for column in source.numeric_columns:
        if column not in df.columns:
            print(
                f"WARNING: column '{column}' missing from {source.name} API response, "
                "will be skipped."
            )
            continue
        if df[column].isna().all():
            print(
                f"WARNING: column '{column}' is all-null in {source.name} API response, "
                "will be uploaded as nulls."
            )
def fetch_source(source: DataSource) -> pd.DataFrame:
    """Download one dataset and return it as a typed, validated frame.

    A single request is made (no pagination) with ``$limit`` set to
    ``API_ROW_LIMIT`` and rows ordered by ``source.order_column``. MTA
    ridership columns are renamed via ``MTA_RENAME_MAP``, date columns are
    parsed, numeric columns are coerced (unparseable values become NaN), and
    the result is passed through ``validate_source_frame``.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
        RuntimeError: If no rows come back or validation fails.
    """
    print(f"Fetching {source.name} from {source.api_url} ...")
    sys.stdout.flush()

    query_params = {"$limit": API_ROW_LIMIT, "$order": source.order_column}
    response = requests.get(source.api_url, params=query_params, timeout=60)
    response.raise_for_status()

    frame = pd.DataFrame(response.json())
    if frame.empty:
        raise RuntimeError(f"{source.name} returned no rows.")

    # Only the MTA ridership feed needs its column names normalised.
    is_mta_ridership = source.destination_table.endswith("daily_ridership")
    if is_mta_ridership:
        frame = frame.rename(columns=MTA_RENAME_MAP)

    for name in (c for c in source.date_columns if c in frame.columns):
        frame[name] = pd.to_datetime(frame[name])
    for name in (c for c in source.numeric_columns if c in frame.columns):
        frame[name] = pd.to_numeric(frame[name], errors="coerce")

    validate_source_frame(frame, source)

    dates = frame[source.date_columns[0]]
    first_day = dates.min().date()
    last_day = dates.max().date()
    print(f"Fetched {len(frame)} rows ({first_day} to {last_day})")
    return frame
def upload_source(df: pd.DataFrame, source: DataSource, credentials) -> None:
    """Replace the source's BigQuery table with the contents of ``df``.

    Every column of ``df`` is uploaded, whether or not it is listed in the
    source configuration; the log output reports how many uploaded columns
    match the configured date/numeric columns and which configured columns
    are absent.

    Args:
        df: Frame to upload.
        source: Configuration naming the destination table and expected columns.
        credentials: Google credentials passed through to ``pandas_gbq``.
    """
    expected = set(source.numeric_columns) | set(source.date_columns)
    matched = [column for column in df.columns if column in expected]
    skipped = sorted(expected - set(df.columns))

    print(
        f"Uploading {len(df.columns)} columns to BigQuery "
        f"{PROJECT_ID}.{source.destination_table}: {sorted(df.columns)}"
    )
    # Keep this line directly under the "Uploading" line: "those" refers to
    # the uploaded columns, not to the skipped ones reported next.
    print(f"  Of those, {len(matched)} match the expected schema.")
    if skipped:
        print(f"  Skipped (missing upstream): {skipped}")
    sys.stdout.flush()

    # if_exists="replace" overwrites any existing table of the same name.
    pandas_gbq.to_gbq(
        df,
        destination_table=source.destination_table,
        project_id=PROJECT_ID,
        if_exists="replace",
        credentials=credentials,
    )
    print("Upload complete.")
def verify_source(source: DataSource, credentials) -> None:
    """Query the destination table and print its row count and date range.

    The range is taken over the first entry of ``source.date_columns``.
    """
    date_column = source.date_columns[0]
    table_path = f"{PROJECT_ID}.{source.destination_table}"
    query = f"""
        SELECT
            COUNT(*) AS row_count,
            MIN(`{date_column}`) AS min_date,
            MAX(`{date_column}`) AS max_date
        FROM `{table_path}`
    """

    summary = pandas_gbq.read_gbq(
        query,
        project_id=PROJECT_ID,
        credentials=credentials,
    ).iloc[0]

    first_day = pd.Timestamp(summary["min_date"]).date()
    last_day = pd.Timestamp(summary["max_date"]).date()
    print(f"Verification: {summary['row_count']} rows ({first_day} to {last_day})")
def main() -> None:
    """Fetch, upload and verify each selected dataset in turn."""
    args = parse_args()
    if args.dataset == "all":
        selected_keys = list(DATA_SOURCES)
    else:
        selected_keys = [args.dataset]

    # Authenticate once and reuse the credentials for every BigQuery call.
    credentials = get_credentials()
    ensure_dataset_exists(credentials)

    for source in (DATA_SOURCES[key] for key in selected_keys):
        frame = fetch_source(source)
        upload_source(frame, source, credentials)
        verify_source(source, credentials)
        print("")

    print("Done! BigQuery tables are ready for the Streamlit app.")


if __name__ == "__main__":
    main()