-
Notifications
You must be signed in to change notification settings - Fork 538
[phase-31 4/4] PostgreSQL metastore — migration + compaction columns #6245
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: gtt/phase-31-writer-wiring
Are you sure you want to change the base?
Changes from all commits
ff605b9
723168f
9ca263d
73a20ef
75c15a0
ef21859
b4dac46
db51a96
605708e
f21fff5
f790519
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| -- Reverse Phase 31: Remove compaction metadata columns and triggers. | ||
| -- Objects are dropped in reverse order of creation: the trigger before its | ||
| -- function, then the index, then the columns added by the up migration. | ||
| -- IF EXISTS on every statement keeps this migration idempotent if re-run. | ||
| DROP TRIGGER IF EXISTS set_publish_timestamp_on_metrics_split_publish ON metrics_splits CASCADE; | ||
| DROP FUNCTION IF EXISTS set_publish_timestamp_for_metrics_split(); | ||
| DROP INDEX IF EXISTS idx_metrics_splits_compaction_scope; | ||
| ALTER TABLE metrics_splits DROP COLUMN IF EXISTS node_id; | ||
| ALTER TABLE metrics_splits DROP COLUMN IF EXISTS delete_opstamp; | ||
| ALTER TABLE metrics_splits DROP COLUMN IF EXISTS maturity_timestamp; | ||
| ALTER TABLE metrics_splits DROP COLUMN IF EXISTS zonemap_regexes; | ||
| ALTER TABLE metrics_splits DROP COLUMN IF EXISTS row_keys; | ||
| ALTER TABLE metrics_splits DROP COLUMN IF EXISTS num_merge_ops; | ||
| ALTER TABLE metrics_splits DROP COLUMN IF EXISTS sort_fields; | ||
| ALTER TABLE metrics_splits DROP COLUMN IF EXISTS window_duration_secs; | ||
| -- window_start is dropped last, mirroring it being added first in the up migration. | ||
| ALTER TABLE metrics_splits DROP COLUMN IF EXISTS window_start; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| -- Phase 31: Add compaction metadata columns to metrics_splits. | ||
| -- These columns support time-windowed compaction planning and execution. | ||
| -- All ADD COLUMN statements use IF NOT EXISTS so the migration is idempotent. | ||
| ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS window_start BIGINT; | ||
| -- window_start (epoch seconds) and window_duration_secs together identify the | ||
| -- half-open compaction window [window_start, window_start + window_duration_secs); | ||
| -- both are nullable: NULL presumably means the split predates windowed | ||
| -- compaction — TODO confirm against the writer wiring. | ||
| ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS window_duration_secs INTEGER; | ||
| ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS sort_fields TEXT NOT NULL DEFAULT ''; | ||
| ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS num_merge_ops INTEGER NOT NULL DEFAULT 0; | ||
| ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS row_keys BYTEA; | ||
| ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS zonemap_regexes JSONB NOT NULL DEFAULT '{}'; | ||
|
|
||
| -- Columns present on the `splits` table that were missing from `metrics_splits`. | ||
| -- maturity_timestamp: compaction planner needs this to restrict candidates to | ||
| -- Published-and-immature splits, matching the logic the log-side merge planner uses. | ||
| -- Defaults to the epoch, i.e. existing rows are considered mature immediately. | ||
| ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS maturity_timestamp TIMESTAMP DEFAULT TO_TIMESTAMP(0); | ||
| -- delete_opstamp: tracks which delete tasks have been applied to a split. | ||
| -- The CHECK constraint enforces it is non-negative (it maps to u64 on the Rust side). | ||
| ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS delete_opstamp BIGINT CHECK (delete_opstamp >= 0) DEFAULT 0; | ||
| -- node_id: identifies which node produced the split. | ||
| -- VARCHAR(253) matches the maximum length of a DNS hostname. -- TODO confirm | ||
| ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS node_id VARCHAR(253); | ||
|
|
||
| -- Auto-set publish_timestamp when a split transitions Staged → Published, | ||
| -- matching the trigger on the `splits` table (migration 3). | ||
| CREATE OR REPLACE FUNCTION set_publish_timestamp_for_metrics_split() RETURNS trigger AS $$ | ||
| BEGIN | ||
| IF (TG_OP = 'UPDATE') AND (NEW.split_state = 'Published') AND (OLD.split_state = 'Staged') THEN | ||
| NEW.publish_timestamp := (CURRENT_TIMESTAMP AT TIME ZONE 'UTC'); | ||
| END IF; | ||
| RETURN NEW; | ||
| END; | ||
| $$ LANGUAGE plpgsql; | ||
|
|
||
| -- DROP before CREATE keeps the trigger installation idempotent on re-run | ||
| -- (CREATE TRIGGER has no OR REPLACE form compatible with older PostgreSQL). | ||
| DROP TRIGGER IF EXISTS set_publish_timestamp_on_metrics_split_publish ON metrics_splits CASCADE; | ||
| CREATE TRIGGER set_publish_timestamp_on_metrics_split_publish | ||
| BEFORE UPDATE ON metrics_splits | ||
| FOR EACH ROW | ||
| EXECUTE PROCEDURE set_publish_timestamp_for_metrics_split(); | ||
|
|
||
| -- Compaction scope index: supports the compaction planner's primary query pattern | ||
| -- "give me all Published splits for a given (index_uid, sort_fields, window_start) triple." | ||
| -- Partial (WHERE split_state = 'Published') to keep the index small and targeted. | ||
| CREATE INDEX IF NOT EXISTS idx_metrics_splits_compaction_scope | ||
| ON metrics_splits (index_uid, sort_fields, window_start) | ||
| WHERE split_state = 'Published'; | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -56,6 +56,19 @@ pub(crate) struct StoredMetricsSplit { | |
| pub state: MetricsSplitState, | ||
| /// Update timestamp (Unix epoch seconds). | ||
| pub update_timestamp: i64, | ||
| /// Create timestamp (Unix epoch seconds). | ||
| pub create_timestamp: i64, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Useful? React with 👍 / 👎. |
||
| /// Node that produced this split. | ||
| #[serde(default)] | ||
| pub node_id: String, | ||
| /// Delete opstamp. | ||
| #[serde(default)] | ||
| pub delete_opstamp: u64, | ||
| /// Maturity timestamp (Unix epoch seconds). Splits with | ||
| /// maturity_timestamp <= now are considered mature. | ||
| /// Defaults to 0 (epoch), meaning mature immediately. | ||
| #[serde(default)] | ||
| pub maturity_timestamp: i64, | ||
| } | ||
|
|
||
| /// A `FileBackedIndex` object carries an index metadata and its split metadata. | ||
|
|
@@ -759,6 +772,10 @@ impl FileBackedIndex { | |
| metadata, | ||
| state: MetricsSplitState::Staged, | ||
| update_timestamp: now, | ||
| create_timestamp: now, | ||
| node_id: String::new(), | ||
| delete_opstamp: 0, | ||
| maturity_timestamp: 0, | ||
| }; | ||
| self.metrics_splits.insert(split_id, stored); | ||
| } | ||
|
|
@@ -907,21 +924,37 @@ fn metrics_split_matches_query(split: &StoredMetricsSplit, query: &ListMetricsSp | |
| // Filter by state | ||
| if !query.split_states.is_empty() { | ||
| let state_str = split.state.as_str(); | ||
| if !query.split_states.iter().any(|s| s == state_str) { | ||
| if !query.split_states.iter().any(|s| s.as_str() == state_str) { | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
| // Filter by time range | ||
| if let Some(start) = query.time_range_start | ||
| && (split.metadata.time_range.end_secs as i64) < start | ||
| { | ||
| return false; | ||
| } | ||
| if let Some(end) = query.time_range_end | ||
| && (split.metadata.time_range.start_secs as i64) > end | ||
| { | ||
| return false; | ||
| // Filter by time range. | ||
| // When sort_fields is set this is a compaction query and time_range | ||
| // refers to the compaction window; otherwise it refers to the data | ||
| // time range. Both use intersection semantics via FilterRange. | ||
| if !query.time_range.is_unbounded() { | ||
| if query.sort_fields.is_some() { | ||
| // Compaction path: intersect against the split's window. | ||
| let split_start = split.metadata.window_start(); | ||
| let split_duration = split.metadata.window_duration_secs() as i64; | ||
| match split_start { | ||
| Some(split_start) if split_duration > 0 => { | ||
| let split_end = split_start + split_duration - 1; | ||
| if !query.time_range.overlaps_with(split_start..=split_end) { | ||
| return false; | ||
| } | ||
| } | ||
| _ => return false, | ||
| } | ||
| } else { | ||
| // Read path: intersect against the split's data time range. | ||
| let data_range = split.metadata.time_range.start_secs as i64 | ||
| ..=split.metadata.time_range.end_secs as i64; | ||
| if !query.time_range.overlaps_with(data_range) { | ||
| return false; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Filter by metric names | ||
|
|
@@ -979,6 +1012,44 @@ fn metrics_split_matches_query(split: &StoredMetricsSplit, query: &ListMetricsSp | |
| } | ||
| } | ||
|
|
||
| if let Some(ref sort_fields) = query.sort_fields | ||
| && split.metadata.sort_fields != *sort_fields | ||
| { | ||
| return false; | ||
| } | ||
|
|
||
| if let Some(node_id) = &query.node_id | ||
| && split.node_id != *node_id | ||
| { | ||
| return false; | ||
| } | ||
|
|
||
| if !query.delete_opstamp.contains(&split.delete_opstamp) { | ||
| return false; | ||
| } | ||
|
|
||
| if !query.update_timestamp.contains(&split.update_timestamp) { | ||
| return false; | ||
| } | ||
|
|
||
| if !query.create_timestamp.contains(&split.create_timestamp) { | ||
| return false; | ||
| } | ||
|
|
||
| match &query.mature { | ||
| Bound::Included(evaluation_datetime) => { | ||
| if split.maturity_timestamp > evaluation_datetime.unix_timestamp() { | ||
| return false; | ||
| } | ||
| } | ||
| Bound::Excluded(evaluation_datetime) => { | ||
| if split.maturity_timestamp <= evaluation_datetime.unix_timestamp() { | ||
| return false; | ||
| } | ||
| } | ||
| Bound::Unbounded => {} | ||
| } | ||
|
|
||
| true | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -51,17 +51,15 @@ use crate::checkpoint::IndexCheckpointDelta; | |
| use crate::{Split, SplitMetadata, SplitState}; | ||
|
|
||
| /// Query parameters for listing metrics splits. | ||
| #[derive(Debug, Clone, Default, Serialize, Deserialize)] | ||
| #[derive(Debug, Clone, Serialize, Deserialize)] | ||
| pub struct ListMetricsSplitsQuery { | ||
| /// Index UID to filter by (required). | ||
| pub index_uid: IndexUid, | ||
| /// Split states to include. | ||
| #[serde(default)] | ||
| pub split_states: Vec<String>, | ||
| /// Time range start (inclusive). | ||
| pub time_range_start: Option<i64>, | ||
| /// Time range end (inclusive). | ||
| pub time_range_end: Option<i64>, | ||
| pub split_states: Vec<SplitState>, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thanks for updating these, my bad for not making sure these were aligned with logs earlier |
||
| /// The time range to filter by. | ||
| pub time_range: FilterRange<i64>, | ||
| /// Metric names to filter by (any match). | ||
| #[serde(default)] | ||
| pub metric_names: Vec<String>, | ||
|
|
@@ -75,30 +73,70 @@ pub struct ListMetricsSplitsQuery { | |
| pub tag_region: Option<String>, | ||
| /// Host tag filter. | ||
| pub tag_host: Option<String>, | ||
| /// Sort fields filter for compaction scope queries. | ||
| pub sort_fields: Option<String>, | ||
| /// A specific node ID to filter by. | ||
| pub node_id: Option<NodeId>, | ||
| /// The delete opstamp range to filter by. | ||
| pub delete_opstamp: FilterRange<u64>, | ||
| /// The update timestamp range to filter by. | ||
| pub update_timestamp: FilterRange<i64>, | ||
| /// The create timestamp range to filter by. | ||
| pub create_timestamp: FilterRange<i64>, | ||
| /// The datetime at which you include or exclude mature splits. | ||
| pub mature: Bound<OffsetDateTime>, | ||
| /// Limit number of results. | ||
| pub limit: Option<usize>, | ||
| } | ||
|
|
||
| impl Default for ListMetricsSplitsQuery { | ||
| fn default() -> Self { | ||
| Self { | ||
| index_uid: IndexUid::default(), | ||
| split_states: Vec::new(), | ||
| time_range: Default::default(), | ||
| metric_names: Vec::new(), | ||
| tag_service: None, | ||
| tag_env: None, | ||
| tag_datacenter: None, | ||
| tag_region: None, | ||
| tag_host: None, | ||
| sort_fields: None, | ||
| node_id: None, | ||
| delete_opstamp: Default::default(), | ||
| update_timestamp: Default::default(), | ||
| create_timestamp: Default::default(), | ||
| mature: Bound::Unbounded, | ||
| limit: None, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl ListMetricsSplitsQuery { | ||
| /// Creates a query for all splits in an index. | ||
| pub fn for_index(index_uid: impl Into<IndexUid>) -> Self { | ||
| Self { | ||
| index_uid: index_uid.into(), | ||
| split_states: vec!["Published".to_string()], | ||
| split_states: vec![SplitState::Published], | ||
| ..Default::default() | ||
| } | ||
| } | ||
|
|
||
| /// Filter by split states. | ||
| pub fn with_split_states(mut self, states: Vec<String>) -> Self { | ||
| self.split_states = states; | ||
| pub fn with_split_states(mut self, states: impl AsRef<[SplitState]>) -> Self { | ||
| self.split_states = states.as_ref().to_vec(); | ||
| self | ||
| } | ||
|
|
||
| /// Filter by time range. | ||
| pub fn with_time_range(mut self, start: i64, end: i64) -> Self { | ||
| self.time_range_start = Some(start); | ||
| self.time_range_end = Some(end); | ||
| /// Set an inclusive lower bound (`>= v`) on the time range filter. | ||
| pub fn with_time_range_start_gte(mut self, v: i64) -> Self { | ||
| self.time_range.start = Bound::Included(v); | ||
| self | ||
| } | ||
|
|
||
| /// Set an inclusive upper bound (`<= v`) on the time range filter. | ||
| pub fn with_time_range_end_lte(mut self, v: i64) -> Self { | ||
| self.time_range.end = Bound::Included(v); | ||
| self | ||
| } | ||
|
|
||
|
|
@@ -107,6 +145,21 @@ impl ListMetricsSplitsQuery { | |
| self.metric_names = names; | ||
| self | ||
| } | ||
|
|
||
| /// Filter by compaction scope: splits whose window intersects | ||
| /// `[window_start, window_start + window_duration_secs)` and whose | ||
| /// sort_fields match exactly. | ||
| pub fn with_compaction_scope( | ||
| mut self, | ||
| window_start: i64, | ||
| window_duration_secs: u32, | ||
| sort_fields: impl Into<String>, | ||
| ) -> Self { | ||
| self.time_range.start = Bound::Included(window_start); | ||
| self.time_range.end = Bound::Excluded(window_start + window_duration_secs as i64); | ||
| self.sort_fields = Some(sort_fields.into()); | ||
| self | ||
| } | ||
| } | ||
|
|
||
| /// Splits batch size returned by the stream splits API | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.