Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions quickwit/quickwit-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,19 @@ pub fn is_metrics_index(index_id: &str) -> bool {
index_id.starts_with("otel-metrics") || index_id.starts_with("metrics-")
}

/// Tells whether `index_id` names a sketches index.
///
/// An index is treated as a sketches index when its ID begins with the
/// `sketches-` prefix. Such indexes are served by the Parquet/DataFusion
/// pipeline, using sketch-specific processors and writers.
pub fn is_sketches_index(index_id: &str) -> bool {
    const SKETCHES_INDEX_PREFIX: &str = "sketches-";
    index_id.strip_prefix(SKETCHES_INDEX_PREFIX).is_some()
}

/// Tells whether `index_id` is handled by the Parquet/DataFusion pipeline.
///
/// This is the case for metrics indexes (see `is_metrics_index`) and for
/// sketches indexes (see `is_sketches_index`).
pub fn is_parquet_pipeline_index(index_id: &str) -> bool {
    // Metrics is checked first; the sketches check only runs when it fails.
    if is_metrics_index(index_id) {
        return true;
    }
    is_sketches_index(index_id)
}

#[macro_export]
macro_rules! ignore_error_kind {
($kind:path, $expr:expr) => {
Expand Down Expand Up @@ -444,6 +457,21 @@ mod tests {
assert!(!is_metrics_index("my-metrics-index")); // Not prefixed
}

#[test]
fn test_is_sketches_index() {
    // IDs carrying the `sketches-` prefix are sketches indexes.
    assert!(is_sketches_index("sketches-default"));
    assert!(is_sketches_index("sketches-"));
    // Other index families are not.
    assert!(!is_sketches_index("otel-metrics"));
    assert!(!is_sketches_index("my-index"));
    // The prefix must be at the start and must include the trailing dash.
    assert!(!is_sketches_index("my-sketches-index")); // Not prefixed
    assert!(!is_sketches_index("sketches"));
    assert!(!is_sketches_index(""));
}

#[test]
fn test_is_parquet_pipeline_index() {
    // Both metrics prefixes and the sketches prefix route to the Parquet pipeline.
    assert!(is_parquet_pipeline_index("otel-metrics"));
    assert!(is_parquet_pipeline_index("metrics-default"));
    assert!(is_parquet_pipeline_index("sketches-default"));
    // Everything else stays on the regular pipeline.
    assert!(!is_parquet_pipeline_index("otel-logs-v0_7"));
    assert!(!is_parquet_pipeline_index("my-index"));
    // A prefix appearing in the middle of the ID does not count.
    assert!(!is_parquet_pipeline_index("my-metrics-index"));
    assert!(!is_parquet_pipeline_index("my-sketches-index"));
}

#[test]
fn test_parse_bool_lenient() {
assert_eq!(parse_bool_lenient("true"), Some(true));
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::time::{Duration, Instant};
use fnv::{FnvHashMap, FnvHashSet};
use itertools::Itertools;
use once_cell::sync::OnceCell;
use quickwit_common::is_metrics_index;
use quickwit_common::is_parquet_pipeline_index;
use quickwit_common::pretty::PrettySample;
use quickwit_config::{FileSourceParams, SourceParams, indexing_pipeline_params_fingerprint};
use quickwit_proto::indexing::{
Expand Down Expand Up @@ -218,7 +218,7 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
SourceParams::IngestApi => {
// Metrics and sketches indexes should use IngestV2 only, not IngestV1.
// The ParquetSourceLoader doesn't support IngestV1.
if is_metrics_index(&source_uid.index_uid.index_id) {
if is_parquet_pipeline_index(&source_uid.index_uid.index_id) {
continue;
}
// TODO ingest v1 is scheduled differently
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-indexing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ sqlx = { workspace = true, features = ["runtime-tokio", "postgres"] }
tempfile = { workspace = true }

quickwit-actors = { workspace = true, features = ["testsuite"] }
quickwit-parquet-engine = { workspace = true, features = ["testsuite"] }
quickwit-cluster = { workspace = true, features = ["testsuite"] }
quickwit-common = { workspace = true, features = ["testsuite"] }
quickwit-config = { workspace = true, features = ["testsuite"] }
Expand Down
48 changes: 38 additions & 10 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use quickwit_actors::{
use quickwit_common::metrics::OwnedGaugeGuard;
use quickwit_common::pubsub::EventBroker;
use quickwit_common::temp_dir::TempDirectory;
use quickwit_common::{KillSwitch, is_metrics_index};
use quickwit_common::{KillSwitch, is_parquet_pipeline_index, is_sketches_index};
use quickwit_config::{IndexingSettings, RetentionPolicy, SourceConfig};
use quickwit_doc_mapper::DocMapper;
use quickwit_ingest::IngesterPool;
Expand Down Expand Up @@ -409,9 +409,12 @@ impl IndexingPipeline {

let index_id = &self.params.pipeline_id.index_uid.index_id;

// Route metrics indexes to the Parquet/DataFusion pipeline
if is_metrics_index(index_id) {
return self.spawn_parquet_pipeline(ctx).await;
// Route metrics and sketches indexes to the Parquet/DataFusion pipeline
if is_parquet_pipeline_index(index_id) {
let use_sketch_processors = is_sketches_index(index_id);
return self
.spawn_parquet_pipeline(ctx, use_sketch_processors)
.await;
}

let source_id = &self.params.pipeline_id.source_id;
Expand Down Expand Up @@ -585,16 +588,21 @@ impl IndexingPipeline {
index=%self.params.pipeline_id.index_uid.index_id,
r#gen=self.generation()
))]
async fn spawn_parquet_pipeline(&mut self, ctx: &ActorContext<Self>) -> anyhow::Result<()> {
async fn spawn_parquet_pipeline(
&mut self,
ctx: &ActorContext<Self>,
use_sketch_processors: bool,
) -> anyhow::Result<()> {
let index_id = &self.params.pipeline_id.index_uid.index_id;
let source_id = &self.params.pipeline_id.source_id;

info!(
index_id,
source_id,
use_sketch_processors,
pipeline_uid=%self.params.pipeline_id.pipeline_uid,
root_dir=%self.params.indexing_directory.path().display(),
"spawning parquet indexing pipeline for metrics",
"spawning parquet indexing pipeline",
);

let (source_mailbox, source_inbox) = ctx
Expand Down Expand Up @@ -637,14 +645,24 @@ impl IndexingPipeline {
.spawn(parquet_uploader);

// ParquetPackager
let parquet_schema = quickwit_parquet_engine::schema::ParquetSchema::new();
let writer_config = quickwit_parquet_engine::storage::ParquetWriterConfig::default();
let split_writer = quickwit_parquet_engine::storage::ParquetSplitWriter::new(
parquet_schema,
let split_kind = if use_sketch_processors {
quickwit_parquet_engine::split::ParquetSplitKind::Sketches
} else {
quickwit_parquet_engine::split::ParquetSplitKind::Metrics
};
let sort_order = if use_sketch_processors {
quickwit_parquet_engine::schema::SKETCH_SORT_ORDER
} else {
quickwit_parquet_engine::schema::SORT_ORDER
};
let split_writer_kind = quickwit_parquet_engine::storage::ParquetSplitWriter::new(
split_kind,
writer_config,
sort_order,
self.params.indexing_directory.path(),
);
let parquet_packager = ParquetPackager::new(split_writer, parquet_uploader_mailbox);
let parquet_packager = ParquetPackager::new(split_writer_kind, parquet_uploader_mailbox);
let (parquet_packager_mailbox, parquet_packager_handle) = ctx
.spawn_actor()
.set_kill_switch(self.kill_switch.clone())
Expand All @@ -666,7 +684,17 @@ impl IndexingPipeline {
.spawn(parquet_indexer);

// ParquetDocProcessor
let processor = if use_sketch_processors {
crate::actors::parquet_doc_processor::IngestProcessor::Sketches(
quickwit_parquet_engine::ingest::SketchParquetIngestProcessor::new(),
)
} else {
crate::actors::parquet_doc_processor::IngestProcessor::Metrics(
quickwit_parquet_engine::ingest::ParquetIngestProcessor,
)
};
let parquet_doc_processor = ParquetDocProcessor::new(
processor,
index_id.to_string(),
source_id.to_string(),
parquet_indexer_mailbox,
Expand Down
Loading
Loading