Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions src/exporter/daily_summary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import json
import logging

from exporter.parquet import parquet_min_max_value

logger = logging.getLogger(__name__)


def make_daily_hash_summary(daily_files, out_json_file):
    """Summarise per-CSN timestamp ranges for one day's parquet files as JSON.

    For each daily file, read the min/max of the "timestamp" column straight
    from the parquet metadata (via parquet_min_max_value) and merge the limits
    per CSN. The resulting list of entries (csn, hashed_csn, min_timestamp,
    max_timestamp) is written to ``out_json_file``.

    :param daily_files: iterable of objects exposing ``csn``, ``hashed_csn``
        and ``get_original_parquet_path()``
    :param out_json_file: path of the JSON summary file to write
    """
    min_timestamp_key = "min_timestamp"
    max_timestamp_key = "max_timestamp"
    hash_summary_by_csn = {}
    for daily_file in daily_files:
        original_parquet = daily_file.get_original_parquet_path()
        min_timestamp, max_timestamp = parquet_min_max_value(
            original_parquet, "timestamp"
        )
        if min_timestamp is None or max_timestamp is None:
            # Skip only this (empty) file so it does not contribute to stats.
            # BUG FIX: this was `break`, which silently dropped every
            # remaining daily file from the summary; `continue` is the
            # behaviour the comment and warning describe.
            logger.warning(
                f"Parquet does not have a min/max value, assumed to be empty: {original_parquet}"
            )
            continue
        entry = {
            "csn": daily_file.csn,
            "hashed_csn": daily_file.hashed_csn,
            min_timestamp_key: min_timestamp,
            max_timestamp_key: max_timestamp,
        }
        existing_entry = hash_summary_by_csn.get(daily_file.csn)
        if existing_entry is None:
            hash_summary_by_csn[daily_file.csn] = entry
        else:
            # update the limits (there can be multiple files for the same CSN because each
            # variable/channel is in its own file)
            existing_entry[min_timestamp_key] = min(
                min_timestamp, existing_entry[min_timestamp_key]
            )
            existing_entry[max_timestamp_key] = max(
                max_timestamp, existing_entry[max_timestamp_key]
            )

    hash_summary = list(hash_summary_by_csn.values())

    with open(out_json_file, "w") as fh:
        json.dump(hash_summary, fh, indent=0)
    logger.info(f"Wrote {len(hash_summary)} entries to {out_json_file}")
37 changes: 37 additions & 0 deletions src/exporter/parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from pathlib import Path

from pyarrow import parquet as pq


def parquet_min_max_value(parquet_path: Path, column_name: str):
    """Return the (min, max) values of ``column_name`` across the whole file.

    By the magic of parquet files we can get the min/max from the row-group
    statistics without loading it all into memory or even reading every row.

    :param parquet_path: path to the parquet file to inspect
    :param column_name: name of the column whose range is wanted
    :returns: ``(lowest_min, highest_max)``, or ``(None, None)`` if the file
        contains no rows at all
    :raises ValueError: if the column is not in the schema, or a non-empty
        file has a row group without min/max statistics
    """
    parquet_file = pq.ParquetFile(parquet_path)
    column_index = parquet_file.schema_arrow.get_field_index(column_name)
    if column_index == -1:
        raise ValueError(f"Column '{column_name}' not found in {parquet_path}")

    lowest_min = None
    highest_max = None

    metadata = parquet_file.metadata
    if metadata.num_rows == 0:
        return None, None

    # each row group will have its own min/max, so take the min of mins and the max of maxes
    for row_group_index in range(metadata.num_row_groups):
        column_meta = metadata.row_group(row_group_index).column(column_index)
        column_stats = column_meta.statistics
        # We created the parquets so we know they have up-to-date statistics.
        # We have already checked the file is not empty (which causes empty stats), so treat
        # missing statistics as an invalid file.
        if column_stats is None or not column_stats.has_min_max:
            # fixed typo in the error message ("columns stats" -> "column stats")
            raise ValueError(
                f"column stats missing or min_max missing: {column_stats}"
            )
        if lowest_min is None or column_stats.min < lowest_min:
            lowest_min = column_stats.min
        if highest_max is None or column_stats.max > highest_max:
            highest_max = column_stats.max

    return lowest_min, highest_max
3 changes: 2 additions & 1 deletion src/locations.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
PSEUDONYMISED_PARQUET_PATTERN = WAVEFORM_PSEUDONYMISED_PARQUET / (
FILE_STEM_PATTERN_HASHED + ".parquet"
)
HASH_LOOKUP_JSON = WAVEFORM_HASH_LOOKUPS / "{date}/{date}.hashes.json"
HASH_LOOKUP_JSON_REL = Path("{date}/{date}.hashes.json")
HASH_LOOKUP_JSON = WAVEFORM_HASH_LOOKUPS / HASH_LOOKUP_JSON_REL


def make_file_name(template: str, subs: dict[str, str]):
Expand Down
76 changes: 9 additions & 67 deletions src/pipeline/Snakefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@ import time
from datetime import datetime, timedelta, timezone
from pathlib import Path

import pyarrow.parquet as pq
from snakemake.io import glob_wildcards

from exporter.daily_summary import make_daily_hash_summary
from exporter.ftps import do_upload
from locations import (
WAVEFORM_ORIGINAL_CSV,
WAVEFORM_SNAKEMAKE_LOGS,
WAVEFORM_PSEUDONYMISED_PARQUET,
WAVEFORM_FTPS_LOGS,
HASH_LOOKUP_JSON,
HASH_LOOKUP_JSON_REL,
FILE_STEM_PATTERN,
FILE_STEM_PATTERN_HASHED,
make_file_name,
Expand Down Expand Up @@ -62,39 +61,6 @@ def configure_file_logging(log_file):
return logger


def parquet_min_max_value(parquet_path: Path, column_name):
"""By the magic of parquet files we can get the min/max timestamps without loading it all into
memory or even reading every row."""
parquet_file = pq.ParquetFile(parquet_path)
column_index = parquet_file.schema_arrow.get_field_index(column_name)
if column_index == -1:
raise ValueError(f"Column '{column_name}' not found in {parquet_path}")

lowest_min = None
highest_max = None

metadata = parquet_file.metadata
if metadata.num_rows == 0:
return None, None


# each row group will have its own min/max, so take the min of mins and the max of maxes
for row_group_index in range(metadata.num_row_groups):
column_meta = metadata.row_group(row_group_index).column(column_index)
column_stats = column_meta.statistics
# We created the parquets so we know they have up-to-date statistics.
# We have already checked the file is not empty (which causes empty stats), so treat missing
# statistics as an invalid file.
if column_stats is None or not column_stats.has_min_max:
raise ValueError(f"columns stats missing or min_max missing: {column_stats}")
if lowest_min is None or column_stats.min < lowest_min:
lowest_min = column_stats.min
if highest_max is None or column_stats.max > highest_max:
highest_max = column_stats.max

return lowest_min, highest_max


rule all:
input:
ftps_uploaded = ALL_FTPS_UPLOADED,
Expand Down Expand Up @@ -131,7 +97,7 @@ rule csv_to_parquet:
WAVEFORM_PSEUDONYMISED_PARQUET / (FILE_STEM_PATTERN_HASHED + ".parquet")
log:
# these do not get uploaded as they may contain identifiers
WAVEFORM_SNAKEMAKE_LOGS / (FILE_STEM_PATTERN_HASHED + ".log")
WAVEFORM_SNAKEMAKE_LOGS / "csv_to_parquet" / (FILE_STEM_PATTERN_HASHED + ".log")
run:
logger = configure_file_logging(log[0])
original_csn = hash_to_csn[wildcards.hashed_csn]
Expand All @@ -149,7 +115,7 @@ def pseudonymised_parquet_files_for_date(wc):
return [ao.get_pseudonymised_parquet_path() for ao in all_outputs if ao.date == wc.date]


rule make_daily_hash_lookup:
rule daily_hash_lookup:
input:
# Because we don't declare the original parquets in the output of csv_to_parquet,
# they are not present in the Snakemake dependency DAG. Therefore, we can't reference them
Expand All @@ -159,40 +125,16 @@ rule make_daily_hash_lookup:
pseudonymised_parquets = pseudonymised_parquet_files_for_date
output:
hash_lookup_json = HASH_LOOKUP_JSON
log:
WAVEFORM_SNAKEMAKE_LOGS / "daily_hash_lookup" / HASH_LOOKUP_JSON_REL.with_suffix(".log")
run:
logger = configure_file_logging(log[0])
daily_files = [ao for ao in all_outputs if ao.date == wildcards.date]
print(
logger.info(
f"Making daily hash lookup {output.hash_lookup_json} from {len(daily_files)} files: "
f"{input.pseudonymised_parquets}"
)
min_timestamp_key = 'min_timestamp'
max_timestamp_key = 'max_timestamp'
hash_summary_by_csn = {}
for daily_file in daily_files:
entry = {}
original_parquet = daily_file.get_original_parquet_path()
entry["csn"] = daily_file.csn
entry["hashed_csn"] = daily_file.hashed_csn
min_timestamp, max_timestamp = parquet_min_max_value(original_parquet, "timestamp")
if min_timestamp is None or max_timestamp is None:
# do not contribute to stats
print(f"Parquet does not have a min/max value, assumed to be empty: {original_parquet}")
break
entry[min_timestamp_key] = min_timestamp
entry[max_timestamp_key] = max_timestamp
existing_entry = hash_summary_by_csn.get(daily_file.csn)
if existing_entry is None:
hash_summary_by_csn[daily_file.csn] = entry
else:
# update the limits (there can be multiple files for the same CSN because each variable/channel
# is in its own file)
existing_entry[min_timestamp_key] = min(min_timestamp, existing_entry[min_timestamp_key])
existing_entry[max_timestamp_key] = max(max_timestamp, existing_entry[max_timestamp_key])

hash_summary = list(hash_summary_by_csn.values())

with open(output.hash_lookup_json, "w") as fh:
json.dump(hash_summary, fh, indent=0)
make_daily_hash_summary(daily_files, output.hash_lookup_json)


rule send_ftps:
Expand Down