Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub struct IndexingSchedulerState {
///
/// Scheduling executes the following steps:
/// 1. Builds a [`PhysicalIndexingPlan`] from the list of logical indexing tasks. See
/// [`build_physical_indexing_plan`] for the implementation details.
/// `build_physical_indexing_plan` for the implementation details.
/// 2. Apply the [`PhysicalIndexingPlan`]: for each indexer, the scheduler sends the indexing tasks
/// by gRPC. An indexer immediately returns an Ok and applies the received plan asynchronously. Any
/// errors (network) happening in this step are ignored. The scheduler runs a control loop that
Expand Down Expand Up @@ -98,7 +98,7 @@ pub struct IndexingSchedulerState {
/// Concretely, it will send to the faulty nodes the plan they are supposed to follow.
///
/// Finally, in order to give the time for each indexer to run their indexing tasks, the control
/// plane will wait at least [`MIN_DURATION_BETWEEN_SCHEDULING`] before comparing the desired
/// plane will wait at least `MIN_DURATION_BETWEEN_SCHEDULING` before comparing the desired
/// plan with the running plan.
pub struct IndexingScheduler {
cluster_id: String,
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-indexing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ sqlx = { workspace = true, features = ["runtime-tokio", "postgres"] }
tempfile = { workspace = true }

quickwit-actors = { workspace = true, features = ["testsuite"] }
quickwit-parquet-engine = { workspace = true, features = ["testsuite"] }
quickwit-cluster = { workspace = true, features = ["testsuite"] }
quickwit-common = { workspace = true, features = ["testsuite"] }
quickwit-config = { workspace = true, features = ["testsuite"] }
Expand Down
2 changes: 0 additions & 2 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -637,10 +637,8 @@ impl IndexingPipeline {
.spawn(parquet_uploader);

// ParquetPackager
let parquet_schema = quickwit_parquet_engine::schema::ParquetSchema::new();
let writer_config = quickwit_parquet_engine::storage::ParquetWriterConfig::default();
let split_writer = quickwit_parquet_engine::storage::ParquetSplitWriter::new(
parquet_schema,
writer_config,
self.params.indexing_directory.path(),
);
Expand Down
221 changes: 7 additions & 214 deletions quickwit/quickwit-indexing/src/actors/parquet_doc_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use quickwit_common::rate_limited_tracing::rate_limited_warn;
use quickwit_common::runtimes::RuntimeType;
use quickwit_metastore::checkpoint::SourceCheckpointDelta;
use quickwit_parquet_engine::ingest::{IngestError, ParquetIngestProcessor};
use quickwit_parquet_engine::schema::ParquetSchema;
use quickwit_proto::types::{IndexId, SourceId};
use serde::Serialize;
use tokio::runtime::Handle;
Expand Down Expand Up @@ -143,8 +142,7 @@ impl ParquetDocProcessor {
source_id: SourceId,
indexer_mailbox: Mailbox<ParquetIndexer>,
) -> Self {
let schema = ParquetSchema::new();
let processor = ParquetIngestProcessor::new(schema);
let processor = ParquetIngestProcessor;
let counters = ParquetDocProcessorCounters::new(index_id.clone(), source_id.clone());

info!(
Expand Down Expand Up @@ -306,7 +304,7 @@ impl Handler<RawDocBatch> for ParquetDocProcessor {
// forever.
if !checkpoint_forwarded && !checkpoint_delta.is_empty() {
let empty_batch =
RecordBatch::new_empty(self.processor.schema().arrow_schema().clone());
RecordBatch::new_empty(std::sync::Arc::new(arrow::datatypes::Schema::empty()));
let processed_batch =
ProcessedParquetBatch::new(empty_batch, checkpoint_delta, force_commit);
ctx.send_message(&self.indexer_mailbox, processed_batch)
Expand Down Expand Up @@ -399,14 +397,8 @@ mod tests {

#[tokio::test]
async fn test_metrics_doc_processor_valid_arrow_ipc() {
use std::sync::Arc as StdArc;
use quickwit_parquet_engine::test_helpers::create_test_batch_with_tags;

use arrow::array::{
ArrayRef, BinaryViewArray, DictionaryArray, Float64Array, Int32Array, StringArray,
StructArray, UInt8Array, UInt64Array,
};
use arrow::datatypes::{DataType, Field, Int32Type};
use arrow::record_batch::RecordBatch;
let universe = Universe::with_accelerated_time();

let (indexer_mailbox, _indexer_inbox) = universe.create_test_mailbox::<ParquetIndexer>();
Expand All @@ -419,103 +411,7 @@ mod tests {
let (metrics_doc_processor_mailbox, metrics_doc_processor_handle) =
universe.spawn_builder().spawn(metrics_doc_processor);

// Create a test batch matching the metrics schema
let schema = ParquetSchema::new();
let num_rows = 3;

// Helper to create dictionary arrays
fn create_dict_array(values: &[&str]) -> ArrayRef {
let keys: Vec<i32> = (0..values.len()).map(|i| i as i32).collect();
let string_array = StringArray::from(values.to_vec());
StdArc::new(
DictionaryArray::<Int32Type>::try_new(
Int32Array::from(keys),
StdArc::new(string_array),
)
.unwrap(),
)
}

fn create_nullable_dict_array(values: &[Option<&str>]) -> ArrayRef {
let keys: Vec<Option<i32>> = values
.iter()
.enumerate()
.map(|(i, v)| v.map(|_| i as i32))
.collect();
let string_values: Vec<&str> = values.iter().filter_map(|v| *v).collect();
let string_array = StringArray::from(string_values);
StdArc::new(
DictionaryArray::<Int32Type>::try_new(
Int32Array::from(keys),
StdArc::new(string_array),
)
.unwrap(),
)
}

let metric_name: ArrayRef = create_dict_array(&vec!["cpu.usage"; num_rows]);
let metric_type: ArrayRef = StdArc::new(UInt8Array::from(vec![0u8; num_rows]));
let metric_unit: ArrayRef = StdArc::new(StringArray::from(vec![Some("bytes"); num_rows]));
let timestamp_secs: ArrayRef = StdArc::new(UInt64Array::from(vec![100u64, 101u64, 102u64]));
let start_timestamp_secs: ArrayRef =
StdArc::new(UInt64Array::from(vec![None::<u64>; num_rows]));
let value: ArrayRef = StdArc::new(Float64Array::from(vec![42.0, 43.0, 44.0]));
let tag_service: ArrayRef = create_nullable_dict_array(&vec![Some("web"); num_rows]);
let tag_env: ArrayRef = create_nullable_dict_array(&vec![Some("prod"); num_rows]);
let tag_datacenter: ArrayRef =
create_nullable_dict_array(&vec![Some("us-east-1"); num_rows]);
let tag_region: ArrayRef = create_nullable_dict_array(&vec![None; num_rows]);
let tag_host: ArrayRef = create_nullable_dict_array(&vec![Some("host-001"); num_rows]);

// Create empty Variant (Struct with metadata and value BinaryView fields)
let metadata_array = StdArc::new(BinaryViewArray::from(vec![b"" as &[u8]; num_rows]));
let value_array = StdArc::new(BinaryViewArray::from(vec![b"" as &[u8]; num_rows]));
let attributes: ArrayRef = StdArc::new(StructArray::from(vec![
(
StdArc::new(Field::new("metadata", DataType::BinaryView, false)),
metadata_array.clone() as ArrayRef,
),
(
StdArc::new(Field::new("value", DataType::BinaryView, false)),
value_array.clone() as ArrayRef,
),
]));

let service_name: ArrayRef = create_dict_array(&vec!["my-service"; num_rows]);

let resource_attributes: ArrayRef = StdArc::new(StructArray::from(vec![
(
StdArc::new(Field::new("metadata", DataType::BinaryView, false)),
metadata_array as ArrayRef,
),
(
StdArc::new(Field::new("value", DataType::BinaryView, false)),
value_array as ArrayRef,
),
]));

let batch = RecordBatch::try_new(
schema.arrow_schema().clone(),
vec![
metric_name,
metric_type,
metric_unit,
timestamp_secs,
start_timestamp_secs,
value,
tag_service,
tag_env,
tag_datacenter,
tag_region,
tag_host,
attributes,
service_name,
resource_attributes,
],
)
.unwrap();

// Serialize to Arrow IPC
let batch = create_test_batch_with_tags(3, &["service"]);
let ipc_bytes = record_batch_to_ipc(&batch).unwrap();

// Create RawDocBatch with the IPC bytes
Expand Down Expand Up @@ -624,13 +520,8 @@ mod tests {
async fn test_metrics_doc_processor_with_indexer() {
use std::sync::Arc as StdArc;

use arrow::array::{
ArrayRef, BinaryViewArray, DictionaryArray, Float64Array, Int32Array, StringArray,
StructArray, UInt8Array, UInt64Array,
};
use arrow::datatypes::{DataType, Field, Int32Type};
use arrow::record_batch::RecordBatch;
use quickwit_parquet_engine::storage::{ParquetSplitWriter, ParquetWriterConfig};
use quickwit_parquet_engine::test_helpers::create_test_batch_with_tags;
use quickwit_proto::metastore::MockMetastoreService;
use quickwit_storage::RamStorage;

Expand All @@ -657,9 +548,8 @@ mod tests {
let (uploader_mailbox, _uploader_handle) = universe.spawn_builder().spawn(uploader);

// Create ParquetPackager
let parquet_schema = ParquetSchema::new();
let writer_config = ParquetWriterConfig::default();
let split_writer = ParquetSplitWriter::new(parquet_schema, writer_config, temp_dir.path());
let split_writer = ParquetSplitWriter::new(writer_config, temp_dir.path());
let packager = ParquetPackager::new(split_writer, uploader_mailbox);
let (packager_mailbox, packager_handle) = universe.spawn_builder().spawn(packager);

Expand All @@ -681,104 +571,7 @@ mod tests {
let (metrics_doc_processor_mailbox, metrics_doc_processor_handle) =
universe.spawn_builder().spawn(metrics_doc_processor);

// Create a test batch
let schema = ParquetSchema::new();
let num_rows = 5;

fn create_dict_array(values: &[&str]) -> ArrayRef {
let keys: Vec<i32> = (0..values.len()).map(|i| i as i32).collect();
let string_array = StringArray::from(values.to_vec());
StdArc::new(
DictionaryArray::<Int32Type>::try_new(
Int32Array::from(keys),
StdArc::new(string_array),
)
.unwrap(),
)
}

fn create_nullable_dict_array(values: &[Option<&str>]) -> ArrayRef {
let keys: Vec<Option<i32>> = values
.iter()
.enumerate()
.map(|(i, v)| v.map(|_| i as i32))
.collect();
let string_values: Vec<&str> = values.iter().filter_map(|v| *v).collect();
let string_array = StringArray::from(string_values);
StdArc::new(
DictionaryArray::<Int32Type>::try_new(
Int32Array::from(keys),
StdArc::new(string_array),
)
.unwrap(),
)
}

let metric_name: ArrayRef = create_dict_array(&vec!["cpu.usage"; num_rows]);
let metric_type: ArrayRef = StdArc::new(UInt8Array::from(vec![0u8; num_rows]));
let metric_unit: ArrayRef = StdArc::new(StringArray::from(vec![Some("bytes"); num_rows]));
let timestamps: Vec<u64> = (0..num_rows).map(|i| 100 + i as u64).collect();
let timestamp_secs: ArrayRef = StdArc::new(UInt64Array::from(timestamps));
let start_timestamp_secs: ArrayRef =
StdArc::new(UInt64Array::from(vec![None::<u64>; num_rows]));
let values: Vec<f64> = (0..num_rows).map(|i| 42.0 + i as f64).collect();
let value: ArrayRef = StdArc::new(Float64Array::from(values));
let tag_service: ArrayRef = create_nullable_dict_array(&vec![Some("web"); num_rows]);
let tag_env: ArrayRef = create_nullable_dict_array(&vec![Some("prod"); num_rows]);
let tag_datacenter: ArrayRef =
create_nullable_dict_array(&vec![Some("us-east-1"); num_rows]);
let tag_region: ArrayRef = create_nullable_dict_array(&vec![None; num_rows]);
let tag_host: ArrayRef = create_nullable_dict_array(&vec![Some("host-001"); num_rows]);

// Create empty Variant (Struct with metadata and value BinaryView fields)
let metadata_array = StdArc::new(BinaryViewArray::from(vec![b"" as &[u8]; num_rows]));
let value_array = StdArc::new(BinaryViewArray::from(vec![b"" as &[u8]; num_rows]));
let attributes: ArrayRef = StdArc::new(StructArray::from(vec![
(
StdArc::new(Field::new("metadata", DataType::BinaryView, false)),
metadata_array.clone() as ArrayRef,
),
(
StdArc::new(Field::new("value", DataType::BinaryView, false)),
value_array.clone() as ArrayRef,
),
]));

let service_name: ArrayRef = create_dict_array(&vec!["my-service"; num_rows]);

let resource_attributes: ArrayRef = StdArc::new(StructArray::from(vec![
(
StdArc::new(Field::new("metadata", DataType::BinaryView, false)),
metadata_array as ArrayRef,
),
(
StdArc::new(Field::new("value", DataType::BinaryView, false)),
value_array as ArrayRef,
),
]));

let batch = RecordBatch::try_new(
schema.arrow_schema().clone(),
vec![
metric_name,
metric_type,
metric_unit,
timestamp_secs,
start_timestamp_secs,
value,
tag_service,
tag_env,
tag_datacenter,
tag_region,
tag_host,
attributes,
service_name,
resource_attributes,
],
)
.unwrap();

// Serialize to Arrow IPC
let batch = create_test_batch_with_tags(5, &["service"]);
let ipc_bytes = record_batch_to_ipc(&batch).unwrap();

// Create RawDocBatch with force_commit to trigger split production
Expand Down
Loading
Loading