Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions quickwit/quickwit-indexing/src/actors/parquet_e2e_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,8 @@ async fn test_file_backed_metastore_metrics_operations() {
use quickwit_config::IndexConfig;
use quickwit_metastore::{
CreateIndexRequestExt, FileBackedMetastore, ListMetricsSplitsQuery,
ListMetricsSplitsRequestExt, ListMetricsSplitsResponseExt, StageMetricsSplitsRequestExt,
ListMetricsSplitsRequestExt, ListMetricsSplitsResponseExt, SplitState,
StageMetricsSplitsRequestExt,
};
use quickwit_parquet_engine::split::{MetricsSplitMetadata, MetricsSplitRecord, TimeRange};
use quickwit_proto::metastore::{
Expand Down Expand Up @@ -306,7 +307,7 @@ async fn test_file_backed_metastore_metrics_operations() {

// Verify staged
let query = ListMetricsSplitsQuery::for_index(index_uid.clone())
.with_split_states(vec!["Staged".to_string()]);
.with_split_states([SplitState::Staged]);
let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap();
let list_response = metastore.list_metrics_splits(list_request).await.unwrap();
let staged: Vec<MetricsSplitRecord> = list_response.deserialize_splits().unwrap();
Expand All @@ -327,7 +328,7 @@ async fn test_file_backed_metastore_metrics_operations() {

// Verify published
let query = ListMetricsSplitsQuery::for_index(index_uid.clone())
.with_split_states(vec!["Published".to_string()]);
.with_split_states([SplitState::Published]);
let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap();
let list_response = metastore.list_metrics_splits(list_request).await.unwrap();
let published: Vec<MetricsSplitRecord> = list_response.deserialize_splits().unwrap();
Expand All @@ -336,24 +337,26 @@ async fn test_file_backed_metastore_metrics_operations() {

// Time range filtering
let query = ListMetricsSplitsQuery::for_index(index_uid.clone())
.with_split_states(vec!["Published".to_string()])
.with_time_range(1000, 1100);
.with_split_states([SplitState::Published])
.with_time_range_start_gte(1000)
.with_time_range_end_lte(1100);
let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap();
let list_response = metastore.list_metrics_splits(list_request).await.unwrap();
let in_range: Vec<MetricsSplitRecord> = list_response.deserialize_splits().unwrap();
assert_eq!(in_range.len(), 1);

let query = ListMetricsSplitsQuery::for_index(index_uid.clone())
.with_split_states(vec!["Published".to_string()])
.with_time_range(5000, 5100);
.with_split_states([SplitState::Published])
.with_time_range_start_gte(5000)
.with_time_range_end_lte(5100);
let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap();
let list_response = metastore.list_metrics_splits(list_request).await.unwrap();
let out_of_range: Vec<MetricsSplitRecord> = list_response.deserialize_splits().unwrap();
assert_eq!(out_of_range.len(), 0);

// Metric name filtering
let query = ListMetricsSplitsQuery::for_index(index_uid.clone())
.with_split_states(vec!["Published".to_string()])
.with_split_states([SplitState::Published])
.with_metric_names(vec!["cpu.usage".to_string()]);
let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap();
let list_response = metastore.list_metrics_splits(list_request).await.unwrap();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- Reverse Phase 31: Remove compaction metadata columns and triggers.
-- NOTE: drop order matters — the trigger must be dropped before the function
-- it executes. Every statement uses IF EXISTS, so the script is idempotent
-- and safe to re-run against a partially migrated database.
DROP TRIGGER IF EXISTS set_publish_timestamp_on_metrics_split_publish ON metrics_splits CASCADE;
DROP FUNCTION IF EXISTS set_publish_timestamp_for_metrics_split();
-- Partial index on (index_uid, sort_fields, window_start) added by the up migration.
DROP INDEX IF EXISTS idx_metrics_splits_compaction_scope;
-- Drop the columns in reverse order of their addition in the up migration.
ALTER TABLE metrics_splits DROP COLUMN IF EXISTS node_id;
ALTER TABLE metrics_splits DROP COLUMN IF EXISTS delete_opstamp;
ALTER TABLE metrics_splits DROP COLUMN IF EXISTS maturity_timestamp;
ALTER TABLE metrics_splits DROP COLUMN IF EXISTS zonemap_regexes;
ALTER TABLE metrics_splits DROP COLUMN IF EXISTS row_keys;
ALTER TABLE metrics_splits DROP COLUMN IF EXISTS num_merge_ops;
ALTER TABLE metrics_splits DROP COLUMN IF EXISTS sort_fields;
ALTER TABLE metrics_splits DROP COLUMN IF EXISTS window_duration_secs;
ALTER TABLE metrics_splits DROP COLUMN IF EXISTS window_start;
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
-- Phase 31: Add compaction metadata columns to metrics_splits.
-- These columns support time-windowed compaction planning and execution.
-- All ALTERs use IF NOT EXISTS so the migration is idempotent.
-- window_start / window_duration_secs: the compaction window this split belongs to.
ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS window_start BIGINT;
ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS window_duration_secs INTEGER;
-- sort_fields: exact-match scope key for compaction queries (see index below).
ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS sort_fields TEXT NOT NULL DEFAULT '';
-- num_merge_ops: how many merge operations produced this split.
ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS num_merge_ops INTEGER NOT NULL DEFAULT 0;
ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS row_keys BYTEA;
ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS zonemap_regexes JSONB NOT NULL DEFAULT '{}';

-- Columns present on the `splits` table that were missing from `metrics_splits`.
-- maturity_timestamp: compaction planner needs this to restrict candidates to
-- Published-and-immature splits, matching the logic the log-side merge planner uses.
-- NOTE(review): TO_TIMESTAMP(0) yields a timestamptz that is implicitly cast to
-- TIMESTAMP; at the default UTC TimeZone this stores the Unix epoch — confirm
-- this matches the pattern used by the `splits` table migration.
ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS maturity_timestamp TIMESTAMP DEFAULT TO_TIMESTAMP(0);
-- delete_opstamp: tracks which delete tasks have been applied to a split.
ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS delete_opstamp BIGINT CHECK (delete_opstamp >= 0) DEFAULT 0;
-- node_id: identifies which node produced the split.
ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS node_id VARCHAR(253);

-- Auto-set publish_timestamp when a split transitions Staged → Published,
-- matching the trigger on the `splits` table (migration 3).
-- Fires on every UPDATE but only writes when the state actually flips.
CREATE OR REPLACE FUNCTION set_publish_timestamp_for_metrics_split() RETURNS trigger AS $$
BEGIN
IF (TG_OP = 'UPDATE') AND (NEW.split_state = 'Published') AND (OLD.split_state = 'Staged') THEN
NEW.publish_timestamp := (CURRENT_TIMESTAMP AT TIME ZONE 'UTC');
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- DROP first so re-running the migration does not fail on a duplicate trigger.
DROP TRIGGER IF EXISTS set_publish_timestamp_on_metrics_split_publish ON metrics_splits CASCADE;
CREATE TRIGGER set_publish_timestamp_on_metrics_split_publish
BEFORE UPDATE ON metrics_splits
FOR EACH ROW
EXECUTE PROCEDURE set_publish_timestamp_for_metrics_split();

-- Compaction scope index: supports the compaction planner's primary query pattern
-- "give me all Published splits for a given (index_uid, sort_fields, window_start) triple."
-- Partial index: only Published rows are candidates, keeping the index small.
CREATE INDEX IF NOT EXISTS idx_metrics_splits_compaction_scope
ON metrics_splits (index_uid, sort_fields, window_start)
WHERE split_state = 'Published';
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,19 @@ pub(crate) struct StoredMetricsSplit {
pub state: MetricsSplitState,
/// Update timestamp (Unix epoch seconds).
pub update_timestamp: i64,
/// Create timestamp (Unix epoch seconds).
pub create_timestamp: i64,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Add default for new create_timestamp field

StoredMetricsSplit now requires create_timestamp during deserialization, but previously persisted file-backed index JSON does not contain this key. When a node upgrades and reloads an index with existing metrics_splits, serde will fail to decode those entries, preventing metastore state from loading. Marking this field with a serde default (like the other newly added fields) is needed for backward-compatible reads.

Useful? React with 👍 / 👎.

/// Node that produced this split.
#[serde(default)]
pub node_id: String,
/// Delete opstamp.
#[serde(default)]
pub delete_opstamp: u64,
/// Maturity timestamp (Unix epoch seconds). Splits with
/// maturity_timestamp <= now are considered mature.
/// Defaults to 0 (epoch), meaning mature immediately.
#[serde(default)]
pub maturity_timestamp: i64,
}

/// A `FileBackedIndex` object carries an index metadata and its split metadata.
Expand Down Expand Up @@ -759,6 +772,10 @@ impl FileBackedIndex {
metadata,
state: MetricsSplitState::Staged,
update_timestamp: now,
create_timestamp: now,
node_id: String::new(),
delete_opstamp: 0,
maturity_timestamp: 0,
};
self.metrics_splits.insert(split_id, stored);
}
Expand Down Expand Up @@ -907,21 +924,37 @@ fn metrics_split_matches_query(split: &StoredMetricsSplit, query: &ListMetricsSp
// Filter by state
if !query.split_states.is_empty() {
let state_str = split.state.as_str();
if !query.split_states.iter().any(|s| s == state_str) {
if !query.split_states.iter().any(|s| s.as_str() == state_str) {
return false;
}
}

// Filter by time range
if let Some(start) = query.time_range_start
&& (split.metadata.time_range.end_secs as i64) < start
{
return false;
}
if let Some(end) = query.time_range_end
&& (split.metadata.time_range.start_secs as i64) > end
{
return false;
// Filter by time range.
// When sort_fields is set this is a compaction query and time_range
// refers to the compaction window; otherwise it refers to the data
// time range. Both use intersection semantics via FilterRange.
if !query.time_range.is_unbounded() {
if query.sort_fields.is_some() {
// Compaction path: intersect against the split's window.
let split_start = split.metadata.window_start();
let split_duration = split.metadata.window_duration_secs() as i64;
match split_start {
Some(split_start) if split_duration > 0 => {
let split_end = split_start + split_duration - 1;
if !query.time_range.overlaps_with(split_start..=split_end) {
return false;
}
}
_ => return false,
}
} else {
// Read path: intersect against the split's data time range.
let data_range = split.metadata.time_range.start_secs as i64
..=split.metadata.time_range.end_secs as i64;
if !query.time_range.overlaps_with(data_range) {
return false;
}
}
}

// Filter by metric names
Expand Down Expand Up @@ -979,6 +1012,44 @@ fn metrics_split_matches_query(split: &StoredMetricsSplit, query: &ListMetricsSp
}
}

if let Some(ref sort_fields) = query.sort_fields
&& split.metadata.sort_fields != *sort_fields
{
return false;
}

if let Some(node_id) = &query.node_id
&& split.node_id != *node_id
{
return false;
}

if !query.delete_opstamp.contains(&split.delete_opstamp) {
return false;
}

if !query.update_timestamp.contains(&split.update_timestamp) {
return false;
}

if !query.create_timestamp.contains(&split.create_timestamp) {
return false;
}

match &query.mature {
Bound::Included(evaluation_datetime) => {
if split.maturity_timestamp > evaluation_datetime.unix_timestamp() {
return false;
}
}
Bound::Excluded(evaluation_datetime) => {
if split.maturity_timestamp <= evaluation_datetime.unix_timestamp() {
return false;
}
}
Bound::Unbounded => {}
}

true
}

Expand Down
79 changes: 66 additions & 13 deletions quickwit/quickwit-metastore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,15 @@ use crate::checkpoint::IndexCheckpointDelta;
use crate::{Split, SplitMetadata, SplitState};

/// Query parameters for listing metrics splits.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListMetricsSplitsQuery {
/// Index UID to filter by (required).
pub index_uid: IndexUid,
/// Split states to include.
#[serde(default)]
pub split_states: Vec<String>,
/// Time range start (inclusive).
pub time_range_start: Option<i64>,
/// Time range end (inclusive).
pub time_range_end: Option<i64>,
pub split_states: Vec<SplitState>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for updating these, my bad for not making sure these were aligned with logs earlier

/// The time range to filter by.
pub time_range: FilterRange<i64>,
/// Metric names to filter by (any match).
#[serde(default)]
pub metric_names: Vec<String>,
Expand All @@ -75,30 +73,70 @@ pub struct ListMetricsSplitsQuery {
pub tag_region: Option<String>,
/// Host tag filter.
pub tag_host: Option<String>,
/// Sort fields filter for compaction scope queries.
pub sort_fields: Option<String>,
/// A specific node ID to filter by.
pub node_id: Option<NodeId>,
/// The delete opstamp range to filter by.
pub delete_opstamp: FilterRange<u64>,
/// The update timestamp range to filter by.
pub update_timestamp: FilterRange<i64>,
/// The create timestamp range to filter by.
pub create_timestamp: FilterRange<i64>,
/// The datetime at which you include or exclude mature splits.
pub mature: Bound<OffsetDateTime>,
/// Limit number of results.
pub limit: Option<usize>,
}

impl Default for ListMetricsSplitsQuery {
fn default() -> Self {
Self {
index_uid: IndexUid::default(),
split_states: Vec::new(),
time_range: Default::default(),
metric_names: Vec::new(),
tag_service: None,
tag_env: None,
tag_datacenter: None,
tag_region: None,
tag_host: None,
sort_fields: None,
node_id: None,
delete_opstamp: Default::default(),
update_timestamp: Default::default(),
create_timestamp: Default::default(),
mature: Bound::Unbounded,
limit: None,
}
}
}

impl ListMetricsSplitsQuery {
/// Creates a query for all splits in an index.
pub fn for_index(index_uid: impl Into<IndexUid>) -> Self {
Self {
index_uid: index_uid.into(),
split_states: vec!["Published".to_string()],
split_states: vec![SplitState::Published],
..Default::default()
}
}

/// Filter by split states.
pub fn with_split_states(mut self, states: Vec<String>) -> Self {
self.split_states = states;
pub fn with_split_states(mut self, states: impl AsRef<[SplitState]>) -> Self {
self.split_states = states.as_ref().to_vec();
self
}

/// Filter by time range.
pub fn with_time_range(mut self, start: i64, end: i64) -> Self {
self.time_range_start = Some(start);
self.time_range_end = Some(end);
/// Filter by time range (inclusive on both ends).
pub fn with_time_range_start_gte(mut self, v: i64) -> Self {
self.time_range.start = Bound::Included(v);
self
}

/// Filter by time range (inclusive on both ends).
pub fn with_time_range_end_lte(mut self, v: i64) -> Self {
self.time_range.end = Bound::Included(v);
self
}

Expand All @@ -107,6 +145,21 @@ impl ListMetricsSplitsQuery {
self.metric_names = names;
self
}

/// Filter by compaction scope: splits whose window intersects
/// `[window_start, window_start + window_duration_secs)` and whose
/// sort_fields match exactly.
pub fn with_compaction_scope(
    mut self,
    window_start: i64,
    window_duration_secs: u32,
    sort_fields: impl Into<String>,
) -> Self {
    // Half-open window: start is inclusive, end is exclusive.
    let window_end = window_start + i64::from(window_duration_secs);
    self.time_range.start = Bound::Included(window_start);
    self.time_range.end = Bound::Excluded(window_end);
    // Compaction scope requires an exact sort_fields match.
    self.sort_fields = Some(sort_fields.into());
    self
}
}

/// Splits batch size returned by the stream splits API
Expand Down
Loading
Loading