Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/lance-index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub mod types;
pub mod vector;

pub use crate::traits::*;
pub use crate::types::IndexSegment;
pub use crate::types::{IndexSegment, StagingIndexShard, VectorIndexSegmentPlan};

pub const INDEX_FILE_NAME: &str = "index.idx";
/// The name of the auxiliary index file.
Expand Down
23 changes: 23 additions & 0 deletions rust/lance-index/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ pub trait IndexDescription: Send + Sync {
#[async_trait]
pub trait DatasetIndexExt {
type IndexBuilder<'a>
where
Self: 'a;
type IndexSegmentBuilder<'a>
where
Self: 'a;

Expand All @@ -145,6 +148,21 @@ pub trait DatasetIndexExt {
params: &'a dyn IndexParams,
) -> Self::IndexBuilder<'a>;

/// Create a builder for materializing final index segments from staging shards.
///
/// The staging UUID identifies a directory containing previously-built shard
/// outputs. The caller must provide the shard contract with
/// [`crate::StagingIndexShard`] so the planner knows which fragments each
/// shard covers.
///
/// This is the canonical entry point for distributed vector index finalize.
/// After materializing the final physical segments, publish them as a
/// logical index with [`Self::commit_existing_index_segments`].
fn create_index_segment_builder<'a>(
&'a self,
staging_index_uuid: String,
) -> Self::IndexSegmentBuilder<'a>;

/// Create indices on columns.
///
/// Upon finish, a new dataset version is generated.
Expand Down Expand Up @@ -275,6 +293,11 @@ pub trait DatasetIndexExt {
async fn index_statistics(&self, index_name: &str) -> Result<String>;

/// Commit one or more existing physical index segments as a logical index.
///
/// This publishes already-materialized physical segments. It does not build
/// or merge index data; callers should first materialize final segments with
/// [`Self::create_index_segment_builder`] or another index-specific build
/// path and then pass the resulting segments here.
async fn commit_existing_index_segments(
&mut self,
index_name: &str,
Expand Down
98 changes: 98 additions & 0 deletions rust/lance-index/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use std::sync::Arc;

use crate::IndexType;
use roaring::RoaringBitmap;
use uuid::Uuid;

Expand Down Expand Up @@ -73,3 +74,100 @@ impl IndexSegment {
)
}
}

/// Coordinator-provided metadata for one staging shard.
///
/// During distributed index build, each worker writes one shard under a shared
/// staging root. The coordinator is responsible for tracking which fragments
/// were assigned to each shard and the approximate shard size used for segment
/// grouping.
#[derive(Debug, Clone, PartialEq)]
pub struct StagingIndexShard {
    /// Unique identifier of this staging shard.
    uuid: Uuid,
    /// The fragment ids covered by this shard's output.
    fragment_bitmap: RoaringBitmap,
    /// Approximate number of bytes represented by this shard; used for
    /// segment grouping.
    estimated_bytes: u64,
}

impl StagingIndexShard {
    /// Build the metadata record for a single staging shard.
    ///
    /// `fragment_bitmap` may be any iterable of fragment ids; it is collected
    /// into a [`RoaringBitmap`] internally.
    pub fn new<I>(uuid: Uuid, fragment_bitmap: I, estimated_bytes: u64) -> Self
    where
        I: IntoIterator<Item = u32>,
    {
        let fragment_bitmap: RoaringBitmap = fragment_bitmap.into_iter().collect();
        Self {
            estimated_bytes,
            fragment_bitmap,
            uuid,
        }
    }

    /// The approximate number of bytes this shard represents.
    pub fn estimated_bytes(&self) -> u64 {
        self.estimated_bytes
    }

    /// The set of fragment ids this shard covers.
    pub fn fragment_bitmap(&self) -> &RoaringBitmap {
        &self.fragment_bitmap
    }

    /// The UUID identifying this shard.
    pub fn uuid(&self) -> Uuid {
        self.uuid
    }
}

/// A plan for materializing one final physical segment from one or more
/// vector index staging shard outputs.
#[derive(Debug, Clone, PartialEq)]
pub struct VectorIndexSegmentPlan {
    /// UUID of the staging index that owns the partial shard outputs.
    staging_index_uuid: Uuid,
    /// Metadata of the final segment that should be committed.
    final_segment: IndexSegment,
    /// Shard metadata that should be combined into the final segment.
    partial_shards: Vec<StagingIndexShard>,
    /// Estimated number of bytes covered by this plan.
    estimated_bytes: u64,
    /// Logical index type requested of the planner, if one was supplied.
    requested_index_type: Option<IndexType>,
}

impl VectorIndexSegmentPlan {
    /// Assemble the plan describing how one final segment is materialized
    /// from the given staging shard outputs.
    pub fn new(
        staging_index_uuid: Uuid,
        final_segment: IndexSegment,
        partial_shards: Vec<StagingIndexShard>,
        estimated_bytes: u64,
        requested_index_type: Option<IndexType>,
    ) -> Self {
        Self {
            requested_index_type,
            estimated_bytes,
            partial_shards,
            final_segment,
            staging_index_uuid,
        }
    }

    /// Estimated number of bytes this plan covers.
    pub fn estimated_bytes(&self) -> u64 {
        self.estimated_bytes
    }

    /// Metadata of the final segment to be committed.
    pub fn final_segment(&self) -> &IndexSegment {
        &self.final_segment
    }

    /// The shards whose outputs are combined into the final segment.
    pub fn partial_shards(&self) -> &[StagingIndexShard] {
        &self.partial_shards
    }

    /// The logical index type requested of the planner, when one was supplied.
    pub fn requested_index_type(&self) -> Option<IndexType> {
        self.requested_index_type
    }

    /// UUID of the staging index that owns the partial shard outputs.
    pub fn staging_index_uuid(&self) -> Uuid {
        self.staging_index_uuid
    }
}
96 changes: 46 additions & 50 deletions rust/lance-index/src/vector/distributed/index_merger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -608,47 +608,21 @@ async fn read_shard_window_partitions(
/// Supports IVF_FLAT, IVF_PQ, IVF_SQ, IVF_HNSW_FLAT, IVF_HNSW_PQ, IVF_HNSW_SQ storage types.
/// For PQ and SQ, this assumes all partial indices share the same quantizer/codebook
/// and distance type; it will reuse the first encountered metadata.
/// Merge the selected partial shard auxiliary files into `target_dir`.
///
/// Callers provide the exact shard auxiliary files that should participate in
/// the merge, which lets the segmented finalize path read directly from the
/// original staging root without first restaging `partial_*` directories under
/// a new target.
pub async fn merge_partial_vector_auxiliary_files(
object_store: &lance_io::object_store::ObjectStore,
index_dir: &object_store::path::Path,
aux_paths: &[object_store::path::Path],
target_dir: &object_store::path::Path,
) -> Result<()> {
let mut aux_paths: Vec<object_store::path::Path> = Vec::new();
let mut stream = object_store.list(Some(index_dir.clone()));
while let Some(item) = stream.next().await {
if let Ok(meta) = item
&& let Some(fname) = meta.location.filename()
&& fname == INDEX_AUXILIARY_FILE_NAME
{
// Check parent dir name starts with partial_
let parts: Vec<_> = meta.location.parts().collect();
if parts.len() >= 2 {
let pname = parts[parts.len() - 2].as_ref();
if pname.starts_with("partial_") {
aux_paths.push(meta.location.clone());
}
}
}
}

if aux_paths.is_empty() {
// If a unified auxiliary file already exists at the root, no merge is required.
let aux_out = index_dir.child(INDEX_AUXILIARY_FILE_NAME);
if object_store.exists(&aux_out).await.unwrap_or(false) {
log::warn!(
"No partial_* auxiliary files found under index dir: {}, but unified auxiliary file already exists; skipping merge",
index_dir
);
return Ok(());
}
// For certain index types (e.g., FLAT/HNSW-only) the merge may be a no-op in distributed setups
// where shards were committed directly. In such cases, proceed without error to avoid blocking
// index manifest merge. PQ/SQ variants still require merging artifacts and will be handled by
// downstream open logic if missing.
log::warn!(
"No partial_* auxiliary files found under index dir: {}; proceeding without merge for index types that do not require auxiliary shards",
index_dir
);
return Ok(());
return Err(Error::index(
"No partial auxiliary files were selected for merge".to_string(),
));
}

// Prepare IVF model and storage metadata aggregation
Expand All @@ -661,7 +635,7 @@ pub async fn merge_partial_vector_auxiliary_files(
let mut format_version: Option<LanceFileVersion> = None;

// Prepare output path; we'll create writer once when we know schema
let aux_out = index_dir.child(INDEX_AUXILIARY_FILE_NAME);
let aux_out = target_dir.child(INDEX_AUXILIARY_FILE_NAME);

// We'll delay creating the V2 writer until we know the vector schema (dim and quantizer type)
let mut v2w_opt: Option<V2Writer> = None;
Expand All @@ -682,7 +656,7 @@ pub async fn merge_partial_vector_auxiliary_files(
let mut shard_infos: Vec<ShardInfo> = Vec::new();

// Iterate over each shard auxiliary file and merge its metadata and collect lengths
for aux in &aux_paths {
for aux in aux_paths {
let fh = sched.open_file(aux, &CachedFileSize::unknown()).await?;
let reader = V2Reader::try_open(
fh,
Expand Down Expand Up @@ -1417,9 +1391,13 @@ mod tests {
.await
.unwrap();

merge_partial_vector_auxiliary_files(&object_store, &index_dir)
.await
.unwrap();
merge_partial_vector_auxiliary_files(
&object_store,
&[aux0.clone(), aux1.clone()],
&index_dir,
)
.await
.unwrap();

let aux_out = index_dir.child(INDEX_AUXILIARY_FILE_NAME);
assert!(object_store.exists(&aux_out).await.unwrap());
Expand Down Expand Up @@ -1515,7 +1493,12 @@ mod tests {
.await
.unwrap();

let res = merge_partial_vector_auxiliary_files(&object_store, &index_dir).await;
let res = merge_partial_vector_auxiliary_files(
&object_store,
&[aux0.clone(), aux1.clone()],
&index_dir,
)
.await;
match res {
Err(Error::Index { message, .. }) => {
assert!(
Expand Down Expand Up @@ -1690,9 +1673,13 @@ mod tests {
.unwrap();

// Merge PQ auxiliary files.
merge_partial_vector_auxiliary_files(&object_store, &index_dir)
.await
.unwrap();
merge_partial_vector_auxiliary_files(
&object_store,
&[aux0.clone(), aux1.clone()],
&index_dir,
)
.await
.unwrap();

// 3) Unified auxiliary file exists.
let aux_out = index_dir.child(INDEX_AUXILIARY_FILE_NAME);
Expand Down Expand Up @@ -1818,7 +1805,12 @@ mod tests {
.await
.unwrap();

let res = merge_partial_vector_auxiliary_files(&object_store, &index_dir).await;
let res = merge_partial_vector_auxiliary_files(
&object_store,
&[aux0.clone(), aux1.clone()],
&index_dir,
)
.await;
match res {
Err(Error::Index { message, .. }) => {
assert!(
Expand Down Expand Up @@ -1893,9 +1885,13 @@ mod tests {
.unwrap();

// Merge must succeed and produce a unified auxiliary file.
merge_partial_vector_auxiliary_files(&object_store, &index_dir)
.await
.unwrap();
merge_partial_vector_auxiliary_files(
&object_store,
&[aux_a.clone(), aux_b.clone()],
&index_dir,
)
.await
.unwrap();

let aux_out = index_dir.child(INDEX_AUXILIARY_FILE_NAME);
assert!(object_store.exists(&aux_out).await.unwrap());
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-index/src/vector/sq/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::{

pub const SQ_METADATA_KEY: &str = "lance:sq";

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ScalarQuantizationMetadata {
pub dim: usize,
pub num_bits: u16,
Expand Down
Loading
Loading