Skip to content

Commit 75a584b

Browse files
committed
review: parquet_file singular, proto doc link, fix metastore accessor
1 parent 6a6a1b3 commit 75a584b

5 files changed

Lines changed: 75 additions & 40 deletions

File tree

quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -981,7 +981,7 @@ fn metrics_split_matches_query(split: &StoredMetricsSplit, query: &ListMetricsSp
981981

982982
// Filter by compaction scope
983983
if let Some(ws) = query.window_start
984-
&& split.metadata.window_start != Some(ws)
984+
&& split.metadata.window_start() != Some(ws)
985985
{
986986
return false;
987987
}

quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs

Lines changed: 61 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -2440,45 +2440,80 @@ impl MetastoreService for PostgresqlMetastore {
24402440
);
24412441

24422442
// Only delete splits that are marked for deletion
2443+
// Match the non-metrics delete_splits pattern: distinguish
2444+
// "not found" (warn + succeed) from "not deletable" (FailedPrecondition).
24432445
const DELETE_SPLITS_QUERY: &str = r#"
2444-
DELETE FROM metrics_splits
2445-
WHERE
2446-
index_uid = $1
2447-
AND split_id = ANY($2)
2448-
AND split_state = 'MarkedForDeletion'
2449-
RETURNING split_id
2446+
WITH input_splits AS (
2447+
SELECT input_splits.split_id, metrics_splits.split_state
2448+
FROM UNNEST($2::text[]) AS input_splits(split_id)
2449+
LEFT JOIN metrics_splits
2450+
ON metrics_splits.index_uid = $1
2451+
AND metrics_splits.split_id = input_splits.split_id
2452+
),
2453+
deleted AS (
2454+
DELETE FROM metrics_splits
2455+
USING input_splits
2456+
WHERE
2457+
metrics_splits.index_uid = $1
2458+
AND metrics_splits.split_id = input_splits.split_id
2459+
AND NOT EXISTS (
2460+
SELECT 1 FROM input_splits
2461+
WHERE split_state IN ('Staged', 'Published')
2462+
)
2463+
RETURNING metrics_splits.split_id
2464+
)
2465+
SELECT
2466+
(SELECT COUNT(*) FROM input_splits WHERE split_state IS NOT NULL) as num_found,
2467+
(SELECT COUNT(*) FROM deleted) as num_deleted,
2468+
COALESCE(
2469+
(SELECT ARRAY_AGG(split_id) FROM input_splits
2470+
WHERE split_state IN ('Staged', 'Published')),
2471+
ARRAY[]::text[]
2472+
) as not_deletable,
2473+
COALESCE(
2474+
(SELECT ARRAY_AGG(split_id) FROM input_splits
2475+
WHERE split_state IS NULL),
2476+
ARRAY[]::text[]
2477+
) as not_found
24502478
"#;
24512479

2452-
let deleted_split_ids: Vec<String> = sqlx::query_scalar(DELETE_SPLITS_QUERY)
2480+
let (num_found, num_deleted, not_deletable_ids, not_found_ids): (
2481+
i64,
2482+
i64,
2483+
Vec<String>,
2484+
Vec<String>,
2485+
) = sqlx::query_as(DELETE_SPLITS_QUERY)
24532486
.bind(request.index_uid())
24542487
.bind(&request.split_ids)
2455-
.fetch_all(&self.connection_pool)
2488+
.fetch_one(&self.connection_pool)
24562489
.await
24572490
.map_err(|sqlx_error| convert_sqlx_err(&request.index_uid().index_id, sqlx_error))?;
24582491

2459-
// Log if some splits were not deleted (either non-existent or not
2460-
// in MarkedForDeletion state). Delete is idempotent — we don't error
2461-
// for missing splits.
2462-
if deleted_split_ids.len() != request.split_ids.len() {
2463-
let not_deleted: Vec<String> = request
2464-
.split_ids
2465-
.iter()
2466-
.filter(|id| !deleted_split_ids.contains(id))
2467-
.cloned()
2468-
.collect();
2492+
if !not_deletable_ids.is_empty() {
2493+
let message = format!(
2494+
"splits `{}` are not deletable",
2495+
not_deletable_ids.join(", ")
2496+
);
2497+
let entity = EntityKind::Splits {
2498+
split_ids: not_deletable_ids,
2499+
};
2500+
return Err(MetastoreError::FailedPrecondition { entity, message });
2501+
}
24692502

2470-
if !not_deleted.is_empty() {
2471-
warn!(
2472-
index_uid = %request.index_uid(),
2473-
not_deleted = ?not_deleted,
2474-
"some metrics splits were not deleted (non-existent or not marked for deletion)"
2475-
);
2476-
}
2503+
if !not_found_ids.is_empty() {
2504+
warn!(
2505+
index_uid = %request.index_uid(),
2506+
not_found = ?not_found_ids,
2507+
"{} metrics splits were not found and could not be deleted",
2508+
not_found_ids.len()
2509+
);
24772510
}
24782511

2512+
let _ = (num_found, num_deleted); // used by the CTE logic
2513+
24792514
info!(
24802515
index_uid = %request.index_uid(),
2481-
deleted_count = deleted_split_ids.len(),
2516+
num_deleted,
24822517
"deleted metrics splits successfully"
24832518
);
24842519
Ok(EmptyResponse {})

quickwit/quickwit-metastore/src/tests/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,6 @@ use quickwit_proto::metastore::{
2222
DeleteIndexRequest, DeleteSplitsRequest, ListSplitsRequest, MarkSplitsForDeletionRequest,
2323
MetastoreServiceClient, MetastoreServiceGrpcClientAdapter,
2424
};
25-
2625
use quickwit_proto::tonic::transport::Channel;
2726
use quickwit_proto::types::IndexUid;
2827

quickwit/quickwit-parquet-engine/src/split/metadata.rs

Lines changed: 12 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -160,8 +160,8 @@ pub struct MetricsSplitMetadata {
160160
/// When this split was created.
161161
pub created_at: SystemTime,
162162

163-
/// Parquet file path(s) relative to storage root.
164-
pub parquet_files: Vec<String>,
163+
/// Parquet file path relative to storage root.
164+
pub parquet_file: String,
165165

166166
/// Time window as `[start, start + duration)` in epoch seconds.
167167
/// None for pre-Phase-31 splits (backward compat).
@@ -175,7 +175,8 @@ pub struct MetricsSplitMetadata {
175175
/// 0 for newly ingested splits.
176176
pub num_merge_ops: u32,
177177

178-
/// RowKeys (sort-key min/max boundaries) as proto bytes.
178+
/// RowKeys (sort-key min/max boundaries) as serialized proto bytes
179+
/// ([`sortschema::RowKeys`](../../quickwit-proto/protos/event_store_sortschema/event_store_sortschema.proto)).
179180
/// None for pre-Phase-31 splits or splits without sort schema.
180181
pub row_keys_proto: Option<Vec<u8>>,
181182

@@ -198,7 +199,7 @@ struct MetricsSplitMetadataSerde {
198199
low_cardinality_tags: HashMap<String, HashSet<String>>,
199200
high_cardinality_tag_keys: HashSet<String>,
200201
created_at: SystemTime,
201-
parquet_files: Vec<String>,
202+
parquet_file: String,
202203

203204
#[serde(default, skip_serializing_if = "Option::is_none")]
204205
window_start: Option<i64>,
@@ -235,7 +236,7 @@ impl From<MetricsSplitMetadataSerde> for MetricsSplitMetadata {
235236
low_cardinality_tags: s.low_cardinality_tags,
236237
high_cardinality_tag_keys: s.high_cardinality_tag_keys,
237238
created_at: s.created_at,
238-
parquet_files: s.parquet_files,
239+
parquet_file: s.parquet_file,
239240
window,
240241
sort_fields: s.sort_fields,
241242
num_merge_ops: s.num_merge_ops,
@@ -261,7 +262,7 @@ impl From<MetricsSplitMetadata> for MetricsSplitMetadataSerde {
261262
low_cardinality_tags: m.low_cardinality_tags,
262263
high_cardinality_tag_keys: m.high_cardinality_tag_keys,
263264
created_at: m.created_at,
264-
parquet_files: m.parquet_files,
265+
parquet_file: m.parquet_file,
265266
window_start,
266267
window_duration_secs,
267268
sort_fields: m.sort_fields,
@@ -351,7 +352,7 @@ pub struct MetricsSplitMetadataBuilder {
351352
metric_names: HashSet<String>,
352353
low_cardinality_tags: HashMap<String, HashSet<String>>,
353354
high_cardinality_tag_keys: HashSet<String>,
354-
parquet_files: Vec<String>,
355+
parquet_file: String,
355356
window_start: Option<i64>,
356357
window_duration_secs: u32,
357358
sort_fields: String,
@@ -425,8 +426,8 @@ impl MetricsSplitMetadataBuilder {
425426
self
426427
}
427428

428-
pub fn add_parquet_file(mut self, path: impl Into<String>) -> Self {
429-
self.parquet_files.push(path.into());
429+
pub fn parquet_file(mut self, path: impl Into<String>) -> Self {
430+
self.parquet_file = path.into();
430431
self
431432
}
432433

@@ -500,7 +501,7 @@ impl MetricsSplitMetadataBuilder {
500501
low_cardinality_tags: self.low_cardinality_tags,
501502
high_cardinality_tag_keys: self.high_cardinality_tag_keys,
502503
created_at: SystemTime::now(),
503-
parquet_files: self.parquet_files,
504+
parquet_file: self.parquet_file,
504505
window,
505506
sort_fields: self.sort_fields,
506507
num_merge_ops: self.num_merge_ops,
@@ -626,7 +627,7 @@ mod tests {
626627
"low_cardinality_tags": {},
627628
"high_cardinality_tag_keys": [],
628629
"created_at": {"secs_since_epoch": 1700000000, "nanos_since_epoch": 0},
629-
"parquet_files": ["split1.parquet"]
630+
"parquet_file": "split1.parquet"
630631
}"#;
631632

632633
let metadata: MetricsSplitMetadata =

quickwit/quickwit-parquet-engine/src/storage/split_writer.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -122,7 +122,7 @@ impl ParquetSplitWriter {
122122
.size_bytes(0)
123123
.sort_fields(self.writer.sort_fields_string())
124124
.window_duration_secs(window_duration)
125-
.add_parquet_file(filename);
125+
.parquet_file(filename);
126126

127127
if let Some(ws) = window_start_secs {
128128
builder = builder.window_start_secs(ws);

0 commit comments

Comments
 (0)