From 6a5961f60bc9418864761203a6e2452299262602 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Tue, 17 Feb 2026 12:11:56 +0000 Subject: [PATCH 1/3] Keep rule logs out of the outer snakemake log file. Organise log files a bit. --- src/locations.py | 3 ++- src/pipeline/Snakefile | 12 ++++++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/locations.py b/src/locations.py index cfc9ae2..bb15847 100644 --- a/src/locations.py +++ b/src/locations.py @@ -19,7 +19,8 @@ PSEUDONYMISED_PARQUET_PATTERN = WAVEFORM_PSEUDONYMISED_PARQUET / ( FILE_STEM_PATTERN_HASHED + ".parquet" ) -HASH_LOOKUP_JSON = WAVEFORM_HASH_LOOKUPS / "{date}/{date}.hashes.json" +HASH_LOOKUP_JSON_REL = Path("{date}/{date}.hashes.json") +HASH_LOOKUP_JSON = WAVEFORM_HASH_LOOKUPS / HASH_LOOKUP_JSON_REL def make_file_name(template: str, subs: dict[str, str]): diff --git a/src/pipeline/Snakefile b/src/pipeline/Snakefile index ae0bf6b..de6914e 100644 --- a/src/pipeline/Snakefile +++ b/src/pipeline/Snakefile @@ -13,6 +13,7 @@ from locations import ( WAVEFORM_PSEUDONYMISED_PARQUET, WAVEFORM_FTPS_LOGS, HASH_LOOKUP_JSON, + HASH_LOOKUP_JSON_REL, FILE_STEM_PATTERN, FILE_STEM_PATTERN_HASHED, make_file_name, @@ -131,7 +132,7 @@ rule csv_to_parquet: WAVEFORM_PSEUDONYMISED_PARQUET / (FILE_STEM_PATTERN_HASHED + ".parquet") log: # these do not get uploaded as they may contain identifiers - WAVEFORM_SNAKEMAKE_LOGS / (FILE_STEM_PATTERN_HASHED + ".log") + WAVEFORM_SNAKEMAKE_LOGS / "csv_to_parquet" / (FILE_STEM_PATTERN_HASHED + ".log") run: logger = configure_file_logging(log[0]) original_csn = hash_to_csn[wildcards.hashed_csn] @@ -149,7 +150,7 @@ def pseudonymised_parquet_files_for_date(wc): return [ao.get_pseudonymised_parquet_path() for ao in all_outputs if ao.date == wc.date] -rule make_daily_hash_lookup: +rule daily_hash_lookup: input: # Because we don't declare the original parquets in the output of csv_to_parquet, # they are not present in the Snakemake dependency DAG. 
Therefore, we can't reference them @@ -159,9 +160,12 @@ rule make_daily_hash_lookup: pseudonymised_parquets = pseudonymised_parquet_files_for_date output: hash_lookup_json = HASH_LOOKUP_JSON + log: + WAVEFORM_SNAKEMAKE_LOGS / "daily_hash_lookup" / HASH_LOOKUP_JSON_REL.with_suffix(".log") run: + logger = configure_file_logging(log[0]) daily_files = [ao for ao in all_outputs if ao.date == wildcards.date] - print( + logger.info( f"Making daily hash lookup {output.hash_lookup_json} from {len(daily_files)} files: " f"{input.pseudonymised_parquets}" ) @@ -176,7 +180,7 @@ rule make_daily_hash_lookup: min_timestamp, max_timestamp = parquet_min_max_value(original_parquet, "timestamp") if min_timestamp is None or max_timestamp is None: # do not contribute to stats - print(f"Parquet does not have a min/max value, assumed to be empty: {original_parquet}") + logger.info(f"Parquet does not have a min/max value, assumed to be empty: {original_parquet}") break entry[min_timestamp_key] = min_timestamp entry[max_timestamp_key] = max_timestamp From 65f7353eac894175b6ce619e42ad081a5c8aaf4d Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Tue, 17 Feb 2026 12:19:24 +0000 Subject: [PATCH 2/3] Refactor Snakefile to remove code to separate modules to make it more concise and enable unit testing. 
def make_daily_hash_summary(daily_files, out_json_file):
    """Write a JSON summary of one day's pseudonymised files.

    Each entry records the original CSN, the hashed CSN, and the min/max
    ``timestamp`` values taken from the original parquet file. There can be
    multiple files for the same CSN (each variable/channel is in its own
    file), in which case the timestamp limits are merged across files.
    Files whose parquet has no min/max (assumed empty) are skipped and do
    not contribute to the stats.

    Args:
        daily_files: iterable of per-file objects exposing ``csn``,
            ``hashed_csn`` and ``get_original_parquet_path()``.
        out_json_file: path of the JSON file to write the summary to.
    """
    min_timestamp_key = "min_timestamp"
    max_timestamp_key = "max_timestamp"
    hash_summary_by_csn = {}
    for daily_file in daily_files:
        original_parquet = daily_file.get_original_parquet_path()
        min_timestamp, max_timestamp = parquet_min_max_value(
            original_parquet, "timestamp"
        )
        if min_timestamp is None or max_timestamp is None:
            # Skip just this (empty) file. A `break` here would silently
            # drop every remaining file for the day from the summary.
            logger.warning(
                f"Parquet does not have a min/max value, assumed to be empty: {original_parquet}"
            )
            continue
        entry = {
            "csn": daily_file.csn,
            "hashed_csn": daily_file.hashed_csn,
            min_timestamp_key: min_timestamp,
            max_timestamp_key: max_timestamp,
        }
        existing_entry = hash_summary_by_csn.get(daily_file.csn)
        if existing_entry is None:
            hash_summary_by_csn[daily_file.csn] = entry
        else:
            # update the limits (there can be multiple files for the same CSN
            # because each variable/channel is in its own file)
            existing_entry[min_timestamp_key] = min(
                min_timestamp, existing_entry[min_timestamp_key]
            )
            existing_entry[max_timestamp_key] = max(
                max_timestamp, existing_entry[max_timestamp_key]
            )

    hash_summary = list(hash_summary_by_csn.values())

    with open(out_json_file, "w") as fh:
        json.dump(hash_summary, fh, indent=0)
    logger.info(f"Wrote {len(hash_summary)} entries to {out_json_file}")
def parquet_min_max_value(parquet_path: Path, column_name):
    """Return ``(min, max)`` of *column_name* across *parquet_path*.

    By the magic of parquet files we can get the min/max values from the
    row-group statistics in the file metadata, without loading it all into
    memory or even reading every row.

    Args:
        parquet_path: path of the parquet file to inspect.
        column_name: name of the column whose limits are wanted.

    Returns:
        ``(None, None)`` when the file has no rows; otherwise the smallest
        row-group minimum and the largest row-group maximum.

    Raises:
        ValueError: if the column is not present, or a non-empty file lacks
            min/max statistics (we created these parquets so they should
            always have up-to-date statistics; their absence indicates an
            invalid file).
    """
    parquet_file = pq.ParquetFile(parquet_path)
    column_index = parquet_file.schema_arrow.get_field_index(column_name)
    if column_index == -1:
        raise ValueError(f"Column '{column_name}' not found in {parquet_path}")

    metadata = parquet_file.metadata
    # An empty file legitimately has no statistics.
    if metadata.num_rows == 0:
        return None, None

    lowest_min = None
    highest_max = None
    # each row group will have its own min/max, so take the min of mins and the max of maxes
    for row_group_index in range(metadata.num_row_groups):
        column_meta = metadata.row_group(row_group_index).column(column_index)
        column_stats = column_meta.statistics
        # We have already checked the file is not empty (which causes empty stats),
        # so treat missing statistics as an invalid file.
        if column_stats is None or not column_stats.has_min_max:
            raise ValueError(
                f"column stats missing or min/max missing: {column_stats}"
            )
        if lowest_min is None or column_stats.min < lowest_min:
            lowest_min = column_stats.min
        if highest_max is None or column_stats.max > highest_max:
            highest_max = column_stats.max

    return lowest_min, highest_max
- if column_stats is None or not column_stats.has_min_max: - raise ValueError(f"columns stats missing or min_max missing: {column_stats}") - if lowest_min is None or column_stats.min < lowest_min: - lowest_min = column_stats.min - if highest_max is None or column_stats.max > highest_max: - highest_max = column_stats.max - - return lowest_min, highest_max - - rule all: input: ftps_uploaded = ALL_FTPS_UPLOADED, @@ -169,34 +134,7 @@ rule daily_hash_lookup: f"Making daily hash lookup {output.hash_lookup_json} from {len(daily_files)} files: " f"{input.pseudonymised_parquets}" ) - min_timestamp_key = 'min_timestamp' - max_timestamp_key = 'max_timestamp' - hash_summary_by_csn = {} - for daily_file in daily_files: - entry = {} - original_parquet = daily_file.get_original_parquet_path() - entry["csn"] = daily_file.csn - entry["hashed_csn"] = daily_file.hashed_csn - min_timestamp, max_timestamp = parquet_min_max_value(original_parquet, "timestamp") - if min_timestamp is None or max_timestamp is None: - # do not contribute to stats - logger.info(f"Parquet does not have a min/max value, assumed to be empty: {original_parquet}") - break - entry[min_timestamp_key] = min_timestamp - entry[max_timestamp_key] = max_timestamp - existing_entry = hash_summary_by_csn.get(daily_file.csn) - if existing_entry is None: - hash_summary_by_csn[daily_file.csn] = entry - else: - # update the limits (there can be multiple files for the same CSN because each variable/channel - # is in its own file) - existing_entry[min_timestamp_key] = min(min_timestamp, existing_entry[min_timestamp_key]) - existing_entry[max_timestamp_key] = max(max_timestamp, existing_entry[max_timestamp_key]) - - hash_summary = list(hash_summary_by_csn.values()) - - with open(output.hash_lookup_json, "w") as fh: - json.dump(hash_summary, fh, indent=0) + make_daily_hash_summary(daily_files, output.hash_lookup_json) rule send_ftps: From bf628e43370a92c1ee023442c3614a52af3418b2 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: 
Fri, 20 Feb 2026 11:15:59 +0000 Subject: [PATCH 3/3] Linting fixes --- src/exporter/daily_summary.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/exporter/daily_summary.py b/src/exporter/daily_summary.py index aa306de..a067d78 100644 --- a/src/exporter/daily_summary.py +++ b/src/exporter/daily_summary.py @@ -5,19 +5,24 @@ logger = logging.getLogger(__name__) + def make_daily_hash_summary(daily_files, out_json_file): - min_timestamp_key = 'min_timestamp' - max_timestamp_key = 'max_timestamp' + min_timestamp_key = "min_timestamp" + max_timestamp_key = "max_timestamp" hash_summary_by_csn = {} for daily_file in daily_files: entry = {} original_parquet = daily_file.get_original_parquet_path() entry["csn"] = daily_file.csn entry["hashed_csn"] = daily_file.hashed_csn - min_timestamp, max_timestamp = parquet_min_max_value(original_parquet, "timestamp") + min_timestamp, max_timestamp = parquet_min_max_value( + original_parquet, "timestamp" + ) if min_timestamp is None or max_timestamp is None: # do not contribute to stats - logger.warning(f"Parquet does not have a min/max value, assumed to be empty: {original_parquet}") + logger.warning( + f"Parquet does not have a min/max value, assumed to be empty: {original_parquet}" + ) break entry[min_timestamp_key] = min_timestamp entry[max_timestamp_key] = max_timestamp @@ -27,8 +32,12 @@ def make_daily_hash_summary(daily_files, out_json_file): else: # update the limits (there can be multiple files for the same CSN because each variable/channel # is in its own file) - existing_entry[min_timestamp_key] = min(min_timestamp, existing_entry[min_timestamp_key]) - existing_entry[max_timestamp_key] = max(max_timestamp, existing_entry[max_timestamp_key]) + existing_entry[min_timestamp_key] = min( + min_timestamp, existing_entry[min_timestamp_key] + ) + existing_entry[max_timestamp_key] = max( + max_timestamp, existing_entry[max_timestamp_key] + ) hash_summary = list(hash_summary_by_csn.values())