-
Notifications
You must be signed in to change notification settings - Fork 538
[phase-31 2/4] Compaction metadata types #6258
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: gtt/phase-31-sort-schema
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,7 @@ | |
| //! Metrics split metadata definitions. | ||
|
|
||
| use std::collections::{HashMap, HashSet}; | ||
| use std::ops::Range; | ||
| use std::time::SystemTime; | ||
|
|
||
| use serde::{Deserialize, Serialize}; | ||
|
|
@@ -120,7 +121,12 @@ impl std::fmt::Display for MetricsSplitState { | |
| } | ||
|
|
||
| /// Metadata for a metrics split. | ||
| /// | ||
| /// The `window` field stores the time window as `[start, start + duration)`. | ||
| /// For JSON serialization, it is decomposed into `window_start` and | ||
| /// `window_duration_secs` for backward compatibility with pre-Phase-31 code. | ||
| #[derive(Debug, Clone, Serialize, Deserialize)] | ||
| #[serde(from = "MetricsSplitMetadataSerde", into = "MetricsSplitMetadataSerde")] | ||
| pub struct MetricsSplitMetadata { | ||
| /// Unique split identifier. | ||
| pub split_id: SplitId, | ||
|
|
@@ -153,6 +159,117 @@ pub struct MetricsSplitMetadata { | |
|
|
||
| /// When this split was created. | ||
| pub created_at: SystemTime, | ||
|
|
||
| /// Parquet file path relative to storage root. | ||
| pub parquet_file: String, | ||
|
|
||
| /// Time window as `[start, start + duration)` in epoch seconds. | ||
| /// None for pre-Phase-31 splits (backward compat). | ||
| pub window: Option<Range<i64>>, | ||
|
|
||
| /// Sort schema as Husky-style string (e.g., "metric_name|host|timestamp/V2"). | ||
| /// Empty string for pre-Phase-31 splits. | ||
| pub sort_fields: String, | ||
|
|
||
| /// Number of merge operations this split has been through. | ||
| /// 0 for newly ingested splits. | ||
| pub num_merge_ops: u32, | ||
|
|
||
| /// RowKeys (sort-key min/max boundaries) as proto bytes. | ||
| /// None for pre-Phase-31 splits or splits without sort schema. | ||
| pub row_keys_proto: Option<Vec<u8>>, | ||
|
|
||
| /// Per-column zonemap regex strings, keyed by column name. | ||
| /// Empty for pre-Phase-31 splits. | ||
| pub zonemap_regexes: HashMap<String, String>, | ||
| } | ||
|
|
||
| /// Serde helper struct that uses `window_start` / `window_duration_secs` field | ||
| /// names for JSON backward compatibility while the in-memory representation uses | ||
| /// `Option<Range<i64>>`. | ||
| #[derive(Serialize, Deserialize)] | ||
| struct MetricsSplitMetadataSerde { | ||
| split_id: SplitId, | ||
| index_uid: String, | ||
| time_range: TimeRange, | ||
| num_rows: u64, | ||
| size_bytes: u64, | ||
| metric_names: HashSet<String>, | ||
| low_cardinality_tags: HashMap<String, HashSet<String>>, | ||
| high_cardinality_tag_keys: HashSet<String>, | ||
| created_at: SystemTime, | ||
| parquet_file: String, | ||
|
|
||
| #[serde(default, skip_serializing_if = "Option::is_none")] | ||
| window_start: Option<i64>, | ||
|
|
||
| #[serde(default)] | ||
| window_duration_secs: u32, | ||
|
|
||
| #[serde(default)] | ||
| sort_fields: String, | ||
|
|
||
| #[serde(default)] | ||
| num_merge_ops: u32, | ||
|
|
||
| #[serde(default, skip_serializing_if = "Option::is_none")] | ||
| row_keys_proto: Option<Vec<u8>>, | ||
|
|
||
| #[serde(default, skip_serializing_if = "HashMap::is_empty")] | ||
| zonemap_regexes: HashMap<String, String>, | ||
| } | ||
|
|
||
| impl From<MetricsSplitMetadataSerde> for MetricsSplitMetadata { | ||
| fn from(s: MetricsSplitMetadataSerde) -> Self { | ||
| let window = match (s.window_start, s.window_duration_secs) { | ||
| (Some(start), dur) if dur > 0 => Some(start..start + dur as i64), | ||
| _ => None, | ||
| }; | ||
| Self { | ||
| split_id: s.split_id, | ||
| index_uid: s.index_uid, | ||
| time_range: s.time_range, | ||
| num_rows: s.num_rows, | ||
| size_bytes: s.size_bytes, | ||
| metric_names: s.metric_names, | ||
| low_cardinality_tags: s.low_cardinality_tags, | ||
| high_cardinality_tag_keys: s.high_cardinality_tag_keys, | ||
| created_at: s.created_at, | ||
| parquet_file: s.parquet_file, | ||
| window, | ||
| sort_fields: s.sort_fields, | ||
| num_merge_ops: s.num_merge_ops, | ||
| row_keys_proto: s.row_keys_proto, | ||
| zonemap_regexes: s.zonemap_regexes, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl From<MetricsSplitMetadata> for MetricsSplitMetadataSerde { | ||
| fn from(m: MetricsSplitMetadata) -> Self { | ||
| let (window_start, window_duration_secs) = match &m.window { | ||
| Some(w) => (Some(w.start), (w.end - w.start) as u32), | ||
| None => (None, 0), | ||
| }; | ||
| Self { | ||
| split_id: m.split_id, | ||
| index_uid: m.index_uid, | ||
| time_range: m.time_range, | ||
| num_rows: m.num_rows, | ||
| size_bytes: m.size_bytes, | ||
| metric_names: m.metric_names, | ||
| low_cardinality_tags: m.low_cardinality_tags, | ||
| high_cardinality_tag_keys: m.high_cardinality_tag_keys, | ||
| created_at: m.created_at, | ||
| parquet_file: m.parquet_file, | ||
| window_start, | ||
| window_duration_secs, | ||
| sort_fields: m.sort_fields, | ||
| num_merge_ops: m.num_merge_ops, | ||
| row_keys_proto: m.row_keys_proto, | ||
| zonemap_regexes: m.zonemap_regexes, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl MetricsSplitMetadata { | ||
|
|
@@ -167,6 +284,19 @@ impl MetricsSplitMetadata { | |
| /// Tags with >= CARDINALITY_THRESHOLD unique values use Parquet bloom filters. | ||
| pub const CARDINALITY_THRESHOLD: usize = 1000; | ||
|
|
||
| /// Returns the window start in epoch seconds, or `None` for pre-Phase-31 splits. | ||
| pub fn window_start(&self) -> Option<i64> { | ||
| self.window.as_ref().map(|w| w.start) | ||
| } | ||
|
|
||
| /// Returns the window duration in seconds, or 0 for pre-Phase-31 splits. | ||
| pub fn window_duration_secs(&self) -> u32 { | ||
| match &self.window { | ||
| Some(w) => (w.end - w.start) as u32, | ||
| None => 0, | ||
| } | ||
| } | ||
|
|
||
| /// Create a new MetricsSplitMetadata builder. | ||
| pub fn builder() -> MetricsSplitMetadataBuilder { | ||
| MetricsSplitMetadataBuilder::default() | ||
|
|
@@ -221,8 +351,19 @@ pub struct MetricsSplitMetadataBuilder { | |
| metric_names: HashSet<String>, | ||
| low_cardinality_tags: HashMap<String, HashSet<String>>, | ||
| high_cardinality_tag_keys: HashSet<String>, | ||
| parquet_file: String, | ||
| window_start: Option<i64>, | ||
| window_duration_secs: u32, | ||
| sort_fields: String, | ||
| num_merge_ops: u32, | ||
| row_keys_proto: Option<Vec<u8>>, | ||
| zonemap_regexes: HashMap<String, String>, | ||
| } | ||
|
|
||
| // The builder still accepts window_start and window_duration_secs separately | ||
| // to remain compatible with callers that compute them independently (e.g., | ||
| // split_writer). The `build()` method fuses them into `Option<Range<i64>>`. | ||
|
|
||
| impl MetricsSplitMetadataBuilder { | ||
| pub fn split_id(mut self, id: SplitId) -> Self { | ||
| self.split_id = Some(id); | ||
|
|
@@ -284,7 +425,71 @@ impl MetricsSplitMetadataBuilder { | |
| self | ||
| } | ||
|
|
||
| pub fn parquet_file(mut self, path: impl Into<String>) -> Self { | ||
| self.parquet_file = path.into(); | ||
| self | ||
| } | ||
|
|
||
| pub fn window_start_secs(mut self, epoch_secs: i64) -> Self { | ||
| self.window_start = Some(epoch_secs); | ||
| self | ||
| } | ||
|
|
||
| pub fn window_duration_secs(mut self, dur: u32) -> Self { | ||
| self.window_duration_secs = dur; | ||
| self | ||
| } | ||
|
|
||
| pub fn sort_fields(mut self, schema: impl Into<String>) -> Self { | ||
| self.sort_fields = schema.into(); | ||
| self | ||
| } | ||
|
|
||
| pub fn num_merge_ops(mut self, ops: u32) -> Self { | ||
| self.num_merge_ops = ops; | ||
| self | ||
| } | ||
|
|
||
| pub fn row_keys_proto(mut self, bytes: Vec<u8>) -> Self { | ||
| self.row_keys_proto = Some(bytes); | ||
| self | ||
| } | ||
|
|
||
| pub fn add_zonemap_regex( | ||
| mut self, | ||
| column: impl Into<String>, | ||
| regex: impl Into<String>, | ||
| ) -> Self { | ||
| self.zonemap_regexes.insert(column.into(), regex.into()); | ||
| self | ||
| } | ||
|
|
||
| pub fn build(self) -> MetricsSplitMetadata { | ||
| // TW-2 (ADR-003): window_duration must evenly divide 3600. | ||
| // Enforced at build time so no invalid metadata propagates to storage. | ||
| debug_assert!( | ||
| self.window_duration_secs == 0 || 3600 % self.window_duration_secs == 0, | ||
| "TW-2 violated: window_duration_secs={} does not divide 3600", | ||
| self.window_duration_secs | ||
| ); | ||
|
|
||
| // TW-1 (ADR-003, partial): window_start and window_duration_secs are paired. | ||
| // If one is set, the other must be too. Pre-Phase-31 splits have both at defaults. | ||
| debug_assert!( | ||
| (self.window_start.is_none() && self.window_duration_secs == 0) | ||
| || (self.window_start.is_some() && self.window_duration_secs > 0), | ||
| "TW-1 violated: window_start and window_duration_secs must be set together \ | ||
| (window_start={:?}, window_duration_secs={})", | ||
| self.window_start, | ||
| self.window_duration_secs | ||
| ); | ||
|
|
||
| // Fuse the two builder fields into a single Range. | ||
| let window = match (self.window_start, self.window_duration_secs) { | ||
| (Some(start), dur) if dur > 0 => Some(start..start + dur as i64), | ||
| _ => None, | ||
| }; | ||
|
|
||
| MetricsSplitMetadata { | ||
| split_id: self.split_id.unwrap_or_else(SplitId::generate), | ||
| index_uid: self.index_uid.expect("index_uid is required"), | ||
|
|
@@ -295,6 +500,12 @@ impl MetricsSplitMetadataBuilder { | |
| low_cardinality_tags: self.low_cardinality_tags, | ||
| high_cardinality_tag_keys: self.high_cardinality_tag_keys, | ||
| created_at: SystemTime::now(), | ||
| parquet_file: self.parquet_file, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The builder stores Useful? React with 👍 / 👎. |
||
| window, | ||
| sort_fields: self.sort_fields, | ||
| num_merge_ops: self.num_merge_ops, | ||
| row_keys_proto: self.row_keys_proto, | ||
| zonemap_regexes: self.zonemap_regexes, | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -401,4 +612,98 @@ mod tests { | |
| ); | ||
| assert_eq!(format!("{}", MetricsSplitState::Published), "Published"); | ||
| } | ||
|
|
||
#[test]
fn test_backward_compat_deserialize_pre_phase31_json() {
    // Simulate a JSON string from pre-Phase-31 code (no compaction fields).
    // This is the wire shape older deployments wrote to the metastore, so it
    // must keep deserializing after the Phase-31 fields were added; the
    // `#[serde(default)]` attributes on MetricsSplitMetadataSerde are what
    // make this pass.
    let pre_phase31_json = r#"{
        "split_id": "metrics_abc123",
        "index_uid": "test-index:00000000000000000000000000",
        "time_range": {"start_secs": 1000, "end_secs": 2000},
        "num_rows": 500,
        "size_bytes": 1024,
        "metric_names": ["cpu.usage"],
        "low_cardinality_tags": {},
        "high_cardinality_tag_keys": [],
        "created_at": {"secs_since_epoch": 1700000000, "nanos_since_epoch": 0},
        "parquet_file": "split1.parquet"
    }"#;

    let metadata: MetricsSplitMetadata =
        serde_json::from_str(pre_phase31_json).expect("should deserialize pre-Phase-31 JSON");

    // New fields should be at their defaults.
    assert!(metadata.window.is_none());
    assert!(metadata.window_start().is_none());
    assert_eq!(metadata.window_duration_secs(), 0);
    assert_eq!(metadata.sort_fields, "");
    assert_eq!(metadata.num_merge_ops, 0);
    assert!(metadata.row_keys_proto.is_none());
    assert!(metadata.zonemap_regexes.is_empty());

    // Existing fields should be intact.
    assert_eq!(metadata.split_id.as_str(), "metrics_abc123");
    assert_eq!(metadata.index_uid, "test-index:00000000000000000000000000");
    assert_eq!(metadata.num_rows, 500);
}
|
|
||
#[test]
fn test_round_trip_with_compaction_fields() {
    // Exercise serialize → deserialize with every Phase-31 compaction field
    // populated, verifying the Serde helper's window decomposition/fusion is
    // lossless (Range <-> window_start + window_duration_secs).
    let metadata = MetricsSplitMetadata::builder()
        .split_id(SplitId::new("roundtrip-compaction"))
        .index_uid("test-index:00000000000000000000000000")
        .time_range(TimeRange::new(1000, 2000))
        .num_rows(100)
        .size_bytes(500)
        .window_start_secs(1700000000)
        .window_duration_secs(3600)
        .sort_fields("metric_name|host|timestamp/V2")
        .num_merge_ops(3)
        .row_keys_proto(vec![0x08, 0x01, 0x10, 0x02])
        .add_zonemap_regex("metric_name", "cpu\\..*")
        .add_zonemap_regex("host", "host-\\d+")
        .build();

    let json = serde_json::to_string(&metadata).expect("should serialize");
    let recovered: MetricsSplitMetadata =
        serde_json::from_str(&json).expect("should deserialize");

    // The fused window must survive the trip, and both accessor views of it
    // must agree with the original start/duration pair.
    assert_eq!(recovered.window, Some(1700000000..1700003600));
    assert_eq!(recovered.window_start(), Some(1700000000));
    assert_eq!(recovered.window_duration_secs(), 3600);
    assert_eq!(recovered.sort_fields, "metric_name|host|timestamp/V2");
    assert_eq!(recovered.num_merge_ops, 3);
    assert_eq!(recovered.row_keys_proto, Some(vec![0x08, 0x01, 0x10, 0x02]));
    assert_eq!(recovered.zonemap_regexes.len(), 2);
    assert_eq!(
        recovered.zonemap_regexes.get("metric_name").unwrap(),
        "cpu\\..*"
    );
    assert_eq!(recovered.zonemap_regexes.get("host").unwrap(), "host-\\d+");
}
|
|
||
#[test]
fn test_skip_serializing_empty_compaction_fields() {
    // Build a minimal split with none of the Phase-31 compaction fields set,
    // so the JSON output matches the pre-Phase-31 wire shape as closely as
    // possible (keeps old readers happy and the JSON compact).
    let metadata = MetricsSplitMetadata::builder()
        .split_id(SplitId::new("skip-test"))
        .index_uid("test-index:00000000000000000000000000")
        .time_range(TimeRange::new(1000, 2000))
        .build();

    let json = serde_json::to_string(&metadata).expect("should serialize");

    // Optional fields with skip_serializing_if should be absent.
    // NOTE: substring checks are sufficient here because the quoted key
    // names cannot appear inside any serialized value of this metadata.
    assert!(
        !json.contains("\"window_start\""),
        "window_start should not appear when None"
    );
    assert!(
        !json.contains("\"row_keys_proto\""),
        "row_keys_proto should not appear when None"
    );
    assert!(
        !json.contains("\"zonemap_regexes\""),
        "zonemap_regexes should not appear when empty"
    );
}
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`MetricsSplitMetadataSerde` now requires `parquet_file`, but metadata JSON produced before this commit did not contain that field, so deserialization will fail for existing rows during upgrades. `PgMetricsSplit::to_metadata()` parses `split_metadata_json`, and `metastore.rs` currently drops rows on parse failure via `to_metadata().ok()?`, which makes legacy splits disappear from list/query results instead of remaining readable. Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i mean, true if we were in production already, but i don't think this matters.