From a5510240865233cb66e41ff28540a89d49848a09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Wed, 18 Mar 2026 21:21:16 +0100 Subject: [PATCH] Optimize primitive group-by intern with batch hashing, chunked processing, and local dedup - Use `with_hashes` for batch hash computation via thread-local buffer, separating hashing from hash table ops for better vectorization - Process 4 rows at a time via `chunks_exact(4)` with local dedup to reduce redundant hash table operations - Split lookup into find + insert_unique phases (lighter than `entry`) - Extract find_group, insert_new_group, get_or_create_null_group helpers to consolidate duplicated unsafe hash table logic with SAFETY comments - Separate null/no-null fast paths to eliminate validity checks when no nulls are present Co-Authored-By: Claude Opus 4.6 (1M context) --- .../group_values/single_group_by/primitive.rs | 242 +++++++++++++++--- 1 file changed, 211 insertions(+), 31 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs index 4686648fb1e3..0da7b061e3cf 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs @@ -18,9 +18,10 @@ use crate::aggregates::group_values::GroupValues; use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano}; use arrow::array::{ - ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, NullBufferBuilder, PrimitiveArray, - cast::AsArray, + Array, ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, NullBufferBuilder, + PrimitiveArray, cast::AsArray, }; +use arrow::buffer::NullBuffer; use arrow::datatypes::{DataType, i256}; use datafusion_common::Result; use datafusion_common::hash_utils::RandomState; @@ -28,6 +29,7 @@ use datafusion_execution::memory_pool::proxy::VecAllocExt; use datafusion_expr::EmitTo; 
use half::f16; use hashbrown::hash_table::HashTable; +#[cfg(not(feature = "force_hash_collisions"))] use std::hash::BuildHasher; use std::mem::size_of; use std::sync::Arc; @@ -109,46 +111,224 @@ impl<T: ArrowPrimitiveType> GroupValuesPrimitive<T> { } } +impl<T: ArrowPrimitiveType> GroupValuesPrimitive<T> +where + T::Native: HashValue, +{ + /// Find the group index for `key` if it already exists in the map. + #[inline(always)] + fn find_group( + map: &HashTable<(usize, u64)>, + values: &[T::Native], + key: T::Native, + hash: u64, + ) -> Option<usize> { + // SAFETY: `g` is always a valid index into `values` because it was set + // to `values.len()` at insertion time and values are never removed + // (only via emit, which also clears or adjusts the map). + map.find(hash, |&(g, h)| unsafe { + hash == h && values.get_unchecked(g).is_eq(key) + }) + .map(|&(g, _)| g) + } + + /// Insert a new group for `key` that is known not to exist yet. + #[inline(always)] + fn insert_new_group( + map: &mut HashTable<(usize, u64)>, + values: &mut Vec<T::Native>, + key: T::Native, + hash: u64, + ) -> usize { + let g = values.len(); + values.push(key); + map.insert_unique(hash, (g, hash), |&(_, h)| h); + g + } + + /// Find an existing group or insert a new one. + #[inline(always)] + fn lookup_or_insert( + map: &mut HashTable<(usize, u64)>, + values: &mut Vec<T::Native>, + key: T::Native, + hash: u64, + ) -> usize { + if let Some(g) = Self::find_group(map, values, key, hash) { + g + } else { + Self::insert_new_group(map, values, key, hash) + } + } + + /// Get or create the null group index.
+ #[inline(always)] + fn get_or_create_null_group( + null_group: &mut Option<usize>, + values: &mut Vec<T::Native>, + ) -> usize { + *null_group.get_or_insert_with(|| { + let g = values.len(); + values.push(Default::default()); + g + }) + } +} + impl<T: ArrowPrimitiveType> GroupValues for GroupValuesPrimitive<T> where T::Native: HashValue, { fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> { assert_eq!(cols.len(), 1); + let array = cols[0].as_primitive::<T>(); + let len = array.len(); groups.clear(); + groups.reserve(len); + + if len == 0 { + return Ok(()); + } + + let values = array.values().as_ref(); + let nulls: Option<&NullBuffer> = array.nulls(); + + // Step 1: Batch compute all hashes using thread-local buffer + // (avoids per-call allocation and separates hashing from hash table ops + // for better vectorization / instruction pipelining) + datafusion_common::hash_utils::with_hashes(cols, &self.random_state, |hashes| { + // Step 2: Process in chunks of 4 for better ILP and local dedup + let value_chunks = values.chunks_exact(4); + let hash_chunks = hashes.chunks_exact(4); + let rem_len = value_chunks.remainder().len(); + + for (chunk_idx, (vs, hs)) in value_chunks.zip(hash_chunks).enumerate() { + let base = chunk_idx * 4; + let mut gids = [0usize; 4]; + + // Local dedup within the chunk: dedup[i] = index of first + // equivalent entry in this chunk (i itself if unique). + // Compare only values (not hashes) since for 4 elements + // the value comparison is cheap enough.
+ let mut dedup = [0u8, 1, 2, 3]; + + if let Some(nulls) = nulls { + let valid = [ + nulls.is_valid(base), + nulls.is_valid(base + 1), + nulls.is_valid(base + 2), + nulls.is_valid(base + 3), + ]; + + for i in 1..4 { + for j in 0..i { + if (!valid[i] && !valid[j]) + || (valid[i] && valid[j] && vs[i].is_eq(vs[j])) + { + dedup[i] = dedup[j]; + break; + } + } + } + + // Phase 1: Batch find - lookup all unique entries + let mut found = [None; 4]; + for i in 0..4 { + if dedup[i] as usize != i { + continue; + } + if !valid[i] { + found[i] = self.null_group; + } else { + found[i] = + Self::find_group(&self.map, &self.values, vs[i], hs[i]); + } + } + + // Phase 2: Insert entries not found + for i in 0..4 { + if dedup[i] as usize != i { + gids[i] = gids[dedup[i] as usize]; + continue; + } + if let Some(g) = found[i] { + gids[i] = g; + } else if !valid[i] { + gids[i] = Self::get_or_create_null_group( + &mut self.null_group, + &mut self.values, + ); + } else { + gids[i] = Self::insert_new_group( + &mut self.map, + &mut self.values, + vs[i], + hs[i], + ); + } + } + } else { + // Fast path: no nulls + for i in 1..4 { + for j in 0..i { + if vs[i].is_eq(vs[j]) { + dedup[i] = dedup[j]; + break; + } + } + } - for v in cols[0].as_primitive::<T>() { - let group_id = match v { - None => *self.null_group.get_or_insert_with(|| { - let group_id = self.values.len(); - self.values.push(Default::default()); - group_id - }), - Some(key) => { - let state = &self.random_state; - let hash = key.hash(state); - let insert = self.map.entry( - hash, - |&(g, h)| unsafe { - hash == h && self.values.get_unchecked(g).is_eq(key) - }, - |&(_, h)| h, - ); - - match insert { - hashbrown::hash_table::Entry::Occupied(o) => o.get().0, - hashbrown::hash_table::Entry::Vacant(v) => { - let g = self.values.len(); - v.insert((g, hash)); - self.values.push(key); - g + // Phase 1: Batch find - lookup all unique entries + let mut found = [None; 4]; + for i in 0..4 { + if dedup[i] as usize != i { + continue; + } + 
found[i] = + Self::find_group(&self.map, &self.values, vs[i], hs[i]); + } + + // Phase 2: Insert entries not found + for i in 0..4 { + if dedup[i] as usize != i { + gids[i] = gids[dedup[i] as usize]; + } else if let Some(g) = found[i] { + gids[i] = g; + } else { + gids[i] = Self::insert_new_group( + &mut self.map, + &mut self.values, + vs[i], + hs[i], + ); } } } - }; - groups.push(group_id) - } - Ok(()) + + groups.extend_from_slice(&gids); + } + + // Handle remainder (0-3 elements) + let rem_start = len - rem_len; + for i in 0..rem_len { + let idx = rem_start + i; + let is_valid = nulls.is_none_or(|n: &NullBuffer| n.is_valid(idx)); + + let group_id = if !is_valid { + Self::get_or_create_null_group(&mut self.null_group, &mut self.values) + } else { + Self::lookup_or_insert( + &mut self.map, + &mut self.values, + values[idx], + hashes[idx], + ) + }; + groups.push(group_id); + } + + Ok(()) + }) } fn size(&self) -> usize {