Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2312,6 +2312,12 @@ impl MetastoreService for PostgresqlMetastore {
size_bytes: row.13,
split_metadata_json: row.14,
update_timestamp: row.15,
window_start: None,
window_duration_secs: 0,
sort_fields: String::new(),
num_merge_ops: 0,
row_keys: None,
zonemap_regexes: String::new(),
};

let state = pg_split.split_state().unwrap_or(MetricsSplitState::Staged);
Expand Down
305 changes: 305 additions & 0 deletions quickwit/quickwit-parquet-engine/src/split/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//! Metrics split metadata definitions.

use std::collections::{HashMap, HashSet};
use std::ops::Range;
use std::time::SystemTime;

use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -120,7 +121,12 @@ impl std::fmt::Display for MetricsSplitState {
}

/// Metadata for a metrics split.
///
/// The `window` field stores the time window as `[start, start + duration)`.
/// For JSON serialization, it is decomposed into `window_start` and
/// `window_duration_secs` for backward compatibility with pre-Phase-31 code.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(from = "MetricsSplitMetadataSerde", into = "MetricsSplitMetadataSerde")]
pub struct MetricsSplitMetadata {
/// Unique split identifier.
pub split_id: SplitId,
Expand Down Expand Up @@ -153,6 +159,117 @@ pub struct MetricsSplitMetadata {

/// When this split was created.
pub created_at: SystemTime,

/// Parquet file path relative to storage root.
pub parquet_file: String,

/// Time window as `[start, start + duration)` in epoch seconds.
/// None for pre-Phase-31 splits (backward compat).
pub window: Option<Range<i64>>,

/// Sort schema as Husky-style string (e.g., "metric_name|host|timestamp/V2").
/// Empty string for pre-Phase-31 splits.
pub sort_fields: String,

/// Number of merge operations this split has been through.
/// 0 for newly ingested splits.
pub num_merge_ops: u32,

/// RowKeys (sort-key min/max boundaries) as proto bytes.
/// None for pre-Phase-31 splits or splits without sort schema.
pub row_keys_proto: Option<Vec<u8>>,

/// Per-column zonemap regex strings, keyed by column name.
/// Empty for pre-Phase-31 splits.
pub zonemap_regexes: HashMap<String, String>,
}

/// Serde helper struct that uses `window_start` / `window_duration_secs` field
/// names for JSON backward compatibility while the in-memory representation uses
/// `Option<Range<i64>>`.
#[derive(Serialize, Deserialize)]
struct MetricsSplitMetadataSerde {
split_id: SplitId,
index_uid: String,
time_range: TimeRange,
num_rows: u64,
size_bytes: u64,
metric_names: HashSet<String>,
low_cardinality_tags: HashMap<String, HashSet<String>>,
high_cardinality_tag_keys: HashSet<String>,
created_at: SystemTime,
parquet_file: String,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Add serde default for parquet_file in legacy metadata

MetricsSplitMetadataSerde now requires parquet_file, but metadata JSON produced before this commit did not contain that field, so deserialization will fail for existing rows during upgrades. PgMetricsSplit::to_metadata() parses split_metadata_json, and metastore.rs currently drops rows on parse failure via to_metadata().ok()?, which makes legacy splits disappear from list/query results instead of remaining readable.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i mean, true if we were in production already, but i don't think this matters.


#[serde(default, skip_serializing_if = "Option::is_none")]
window_start: Option<i64>,

#[serde(default)]
window_duration_secs: u32,

#[serde(default)]
sort_fields: String,

#[serde(default)]
num_merge_ops: u32,

#[serde(default, skip_serializing_if = "Option::is_none")]
row_keys_proto: Option<Vec<u8>>,

#[serde(default, skip_serializing_if = "HashMap::is_empty")]
zonemap_regexes: HashMap<String, String>,
}

impl From<MetricsSplitMetadataSerde> for MetricsSplitMetadata {
    /// Rebuilds the in-memory form, fusing the serialized
    /// `window_start` / `window_duration_secs` pair back into a single range.
    fn from(s: MetricsSplitMetadataSerde) -> Self {
        // A window is only meaningful when both halves were serialized: a
        // start plus a strictly positive duration. Anything else is treated
        // as a pre-Phase-31 split with no window.
        let window = s
            .window_start
            .filter(|_| s.window_duration_secs > 0)
            .map(|start| start..start + i64::from(s.window_duration_secs));
        Self {
            split_id: s.split_id,
            index_uid: s.index_uid,
            time_range: s.time_range,
            num_rows: s.num_rows,
            size_bytes: s.size_bytes,
            metric_names: s.metric_names,
            low_cardinality_tags: s.low_cardinality_tags,
            high_cardinality_tag_keys: s.high_cardinality_tag_keys,
            created_at: s.created_at,
            parquet_file: s.parquet_file,
            window,
            sort_fields: s.sort_fields,
            num_merge_ops: s.num_merge_ops,
            row_keys_proto: s.row_keys_proto,
            zonemap_regexes: s.zonemap_regexes,
        }
    }
}

impl From<MetricsSplitMetadata> for MetricsSplitMetadataSerde {
    /// Decomposes the in-memory `window` range into the flat
    /// `window_start` / `window_duration_secs` pair used on the wire.
    fn from(m: MetricsSplitMetadata) -> Self {
        // `None` window serializes as absent start + zero duration, matching
        // what pre-Phase-31 readers expect.
        let window_start = m.window.as_ref().map(|w| w.start);
        let window_duration_secs = m.window.as_ref().map_or(0, |w| (w.end - w.start) as u32);
        Self {
            split_id: m.split_id,
            index_uid: m.index_uid,
            time_range: m.time_range,
            num_rows: m.num_rows,
            size_bytes: m.size_bytes,
            metric_names: m.metric_names,
            low_cardinality_tags: m.low_cardinality_tags,
            high_cardinality_tag_keys: m.high_cardinality_tag_keys,
            created_at: m.created_at,
            parquet_file: m.parquet_file,
            window_start,
            window_duration_secs,
            sort_fields: m.sort_fields,
            num_merge_ops: m.num_merge_ops,
            row_keys_proto: m.row_keys_proto,
            zonemap_regexes: m.zonemap_regexes,
        }
    }
}

impl MetricsSplitMetadata {
Expand All @@ -167,6 +284,19 @@ impl MetricsSplitMetadata {
/// Tags with >= CARDINALITY_THRESHOLD unique values use Parquet bloom filters.
pub const CARDINALITY_THRESHOLD: usize = 1000;

/// Returns the window start in epoch seconds, or `None` for pre-Phase-31 splits.
pub fn window_start(&self) -> Option<i64> {
    // `?` propagates the "no window" case directly.
    Some(self.window.as_ref()?.start)
}

/// Returns the window duration in seconds, or 0 for pre-Phase-31 splits.
pub fn window_duration_secs(&self) -> u32 {
    // Duration is derived, not stored: it is the width of the half-open range.
    self.window.as_ref().map_or(0, |w| (w.end - w.start) as u32)
}

/// Create a new MetricsSplitMetadata builder.
pub fn builder() -> MetricsSplitMetadataBuilder {
MetricsSplitMetadataBuilder::default()
Expand Down Expand Up @@ -221,8 +351,19 @@ pub struct MetricsSplitMetadataBuilder {
metric_names: HashSet<String>,
low_cardinality_tags: HashMap<String, HashSet<String>>,
high_cardinality_tag_keys: HashSet<String>,
parquet_file: String,
window_start: Option<i64>,
window_duration_secs: u32,
sort_fields: String,
num_merge_ops: u32,
row_keys_proto: Option<Vec<u8>>,
zonemap_regexes: HashMap<String, String>,
}

// The builder still accepts window_start and window_duration_secs separately
// to remain compatible with callers that compute them independently (e.g.,
// split_writer). The `build()` method fuses them into `Option<Range<i64>>`.

impl MetricsSplitMetadataBuilder {
pub fn split_id(mut self, id: SplitId) -> Self {
self.split_id = Some(id);
Expand Down Expand Up @@ -284,7 +425,71 @@ impl MetricsSplitMetadataBuilder {
self
}

/// Sets the parquet file path, relative to the storage root.
pub fn parquet_file(mut self, path: impl Into<String>) -> Self {
    let path = path.into();
    self.parquet_file = path;
    self
}

/// Sets the time-window start in epoch seconds.
///
/// Must be paired with a positive [`window_duration_secs`](Self::window_duration_secs);
/// `build()` fuses the two into a single range.
pub fn window_start_secs(mut self, epoch_secs: i64) -> Self {
    self.window_start = Some(epoch_secs);
    self
}

/// Sets the time-window duration in seconds.
///
/// Per TW-2 the duration is expected to evenly divide 3600; `build()` asserts this.
pub fn window_duration_secs(mut self, dur: u32) -> Self {
    self.window_duration_secs = dur;
    self
}

/// Sets the sort schema as a Husky-style string
/// (e.g. `"metric_name|host|timestamp/V2"`).
pub fn sort_fields(mut self, schema: impl Into<String>) -> Self {
    let schema = schema.into();
    self.sort_fields = schema;
    self
}

/// Records how many merge operations produced this split (0 = freshly ingested).
pub fn num_merge_ops(mut self, ops: u32) -> Self {
    self.num_merge_ops = ops;
    self
}

/// Attaches the RowKeys (sort-key min/max boundaries) as proto-encoded bytes.
pub fn row_keys_proto(mut self, bytes: Vec<u8>) -> Self {
    self.row_keys_proto = Some(bytes);
    self
}

/// Registers a zonemap regex for `column`, replacing any previous entry
/// for the same column.
pub fn add_zonemap_regex(
    mut self,
    column: impl Into<String>,
    regex: impl Into<String>,
) -> Self {
    let (column, regex) = (column.into(), regex.into());
    self.zonemap_regexes.insert(column, regex);
    self
}

pub fn build(self) -> MetricsSplitMetadata {
// TW-2 (ADR-003): window_duration must evenly divide 3600.
// Enforced at build time so no invalid metadata propagates to storage.
debug_assert!(
self.window_duration_secs == 0 || 3600 % self.window_duration_secs == 0,
"TW-2 violated: window_duration_secs={} does not divide 3600",
self.window_duration_secs
);

// TW-1 (ADR-003, partial): window_start and window_duration_secs are paired.
// If one is set, the other must be too. Pre-Phase-31 splits have both at defaults.
debug_assert!(
(self.window_start.is_none() && self.window_duration_secs == 0)
|| (self.window_start.is_some() && self.window_duration_secs > 0),
"TW-1 violated: window_start and window_duration_secs must be set together \
(window_start={:?}, window_duration_secs={})",
self.window_start,
self.window_duration_secs
);

// Fuse the two builder fields into a single Range.
let window = match (self.window_start, self.window_duration_secs) {
(Some(start), dur) if dur > 0 => Some(start..start + dur as i64),
_ => None,
};

MetricsSplitMetadata {
split_id: self.split_id.unwrap_or_else(SplitId::generate),
index_uid: self.index_uid.expect("index_uid is required"),
Expand All @@ -295,6 +500,12 @@ impl MetricsSplitMetadataBuilder {
low_cardinality_tags: self.low_cardinality_tags,
high_cardinality_tag_keys: self.high_cardinality_tag_keys,
created_at: SystemTime::now(),
parquet_file: self.parquet_file,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Derive parquet_file when builder callers omit it

The builder stores parquet_file as a default-initialized String and build() writes it through unchanged, so callers that do not explicitly call .parquet_file(...) now emit metadata with an empty parquet path. This silently records invalid split metadata for current construction paths and loses the file location that was previously derivable from split_id.

Useful? React with 👍 / 👎.

window,
sort_fields: self.sort_fields,
num_merge_ops: self.num_merge_ops,
row_keys_proto: self.row_keys_proto,
zonemap_regexes: self.zonemap_regexes,
}
}
}
Expand Down Expand Up @@ -401,4 +612,98 @@ mod tests {
);
assert_eq!(format!("{}", MetricsSplitState::Published), "Published");
}

#[test]
fn test_backward_compat_deserialize_pre_phase31_json() {
    // A metadata document exactly as pre-Phase-31 code would have written it:
    // none of the compaction-era fields are present.
    let legacy_json = r#"{
        "split_id": "metrics_abc123",
        "index_uid": "test-index:00000000000000000000000000",
        "time_range": {"start_secs": 1000, "end_secs": 2000},
        "num_rows": 500,
        "size_bytes": 1024,
        "metric_names": ["cpu.usage"],
        "low_cardinality_tags": {},
        "high_cardinality_tag_keys": [],
        "created_at": {"secs_since_epoch": 1700000000, "nanos_since_epoch": 0},
        "parquet_file": "split1.parquet"
    }"#;

    let metadata: MetricsSplitMetadata =
        serde_json::from_str(legacy_json).expect("should deserialize pre-Phase-31 JSON");

    // Pre-existing fields survive the trip through the serde shim untouched.
    assert_eq!(metadata.split_id.as_str(), "metrics_abc123");
    assert_eq!(metadata.index_uid, "test-index:00000000000000000000000000");
    assert_eq!(metadata.num_rows, 500);

    // Every compaction-era field falls back to its documented default.
    assert_eq!(metadata.window, None);
    assert_eq!(metadata.window_start(), None);
    assert_eq!(metadata.window_duration_secs(), 0);
    assert!(metadata.sort_fields.is_empty());
    assert_eq!(metadata.num_merge_ops, 0);
    assert_eq!(metadata.row_keys_proto, None);
    assert!(metadata.zonemap_regexes.is_empty());
}

#[test]
fn test_round_trip_with_compaction_fields() {
    // Populate every compaction-era field, then serialize and deserialize.
    let original = MetricsSplitMetadata::builder()
        .split_id(SplitId::new("roundtrip-compaction"))
        .index_uid("test-index:00000000000000000000000000")
        .time_range(TimeRange::new(1000, 2000))
        .num_rows(100)
        .size_bytes(500)
        .window_start_secs(1700000000)
        .window_duration_secs(3600)
        .sort_fields("metric_name|host|timestamp/V2")
        .num_merge_ops(3)
        .row_keys_proto(vec![0x08, 0x01, 0x10, 0x02])
        .add_zonemap_regex("metric_name", "cpu\\..*")
        .add_zonemap_regex("host", "host-\\d+")
        .build();

    let json = serde_json::to_string(&original).expect("should serialize");
    let recovered: MetricsSplitMetadata =
        serde_json::from_str(&json).expect("should deserialize");

    // Window round-trips through the window_start/window_duration_secs pair.
    assert_eq!(recovered.window, Some(1700000000..1700003600));
    assert_eq!(recovered.window_start(), Some(1700000000));
    assert_eq!(recovered.window_duration_secs(), 3600);

    // The remaining compaction fields come back verbatim.
    assert_eq!(recovered.sort_fields, "metric_name|host|timestamp/V2");
    assert_eq!(recovered.num_merge_ops, 3);
    assert_eq!(recovered.row_keys_proto, Some(vec![0x08, 0x01, 0x10, 0x02]));

    let regexes = &recovered.zonemap_regexes;
    assert_eq!(regexes.len(), 2);
    assert_eq!(regexes["metric_name"], "cpu\\..*");
    assert_eq!(regexes["host"], "host-\\d+");
}

#[test]
fn test_skip_serializing_empty_compaction_fields() {
    // Build with only the mandatory fields; every compaction field stays at
    // its default.
    let metadata = MetricsSplitMetadata::builder()
        .split_id(SplitId::new("skip-test"))
        .index_uid("test-index:00000000000000000000000000")
        .time_range(TimeRange::new(1000, 2000))
        .build();

    let json = serde_json::to_string(&metadata).expect("should serialize");

    // Fields guarded by skip_serializing_if must be absent from the document.
    for (key, msg) in [
        ("\"window_start\"", "window_start should not appear when None"),
        ("\"row_keys_proto\"", "row_keys_proto should not appear when None"),
        ("\"zonemap_regexes\"", "zonemap_regexes should not appear when empty"),
    ] {
        assert!(!json.contains(key), "{}", msg);
    }
}
}
Loading
Loading