GH-38558: [C++] Add support for null sort option per sort key #46926
Open
Taepper wants to merge 85 commits into apache:main from Taepper:GH-38558
+1,826 −697
Conversation
1. Restructure the SortKey struct and add NullPlacement to it. 2. Remove NullPlacement from SortOptions. 3. Fix select_k returning only the non-null results in the null AtEnd scenario: when the limit k is greater than the amount of data actually in the table and the table contains null/NaN values, the null entries could not be obtained and only the non-null results were returned. We now return the null entries as well and support setting the null placement for each SortKey (a sketch of this scenario follows the commit list below). 4. Add relevant unit tests and update the interfaces that have multiple implementations.
…8558 # Conflicts: # c_glib/arrow-glib/compute.cpp # c_glib/arrow-glib/compute.h # cpp/src/arrow/compute/kernels/vector_rank.cc # cpp/src/arrow/compute/kernels/vector_select_k.cc # cpp/src/arrow/compute/kernels/vector_sort.cc # cpp/src/arrow/compute/kernels/vector_sort_internal.h # python/pyarrow/_acero.pyx # python/pyarrow/_compute.pyx # python/pyarrow/array.pxi # python/pyarrow/tests/test_compute.py # python/pyarrow/tests/test_table.py
# Conflicts: # cpp/src/arrow/compute/api_vector.cc # cpp/src/arrow/compute/api_vector.h # cpp/src/arrow/compute/kernels/vector_rank.cc # cpp/src/arrow/compute/kernels/vector_select_k.cc # cpp/src/arrow/compute/kernels/vector_sort.cc # cpp/src/arrow/compute/kernels/vector_sort_internal.h # cpp/src/arrow/compute/kernels/vector_sort_test.cc # cpp/src/arrow/compute/ordering.cc # cpp/src/arrow/compute/ordering.h
…most likely human-error while merging)
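As a rough illustration of the select_k scenario described in item 3 of the commit message above, consider an array where more rows are requested than there are non-null values. This sketch uses the existing SelectKUnstable API; the expected result in the comments reflects the behaviour this PR aims for, and the AtEnd placement mentioned there is an assumption about the defaults rather than a statement of the final API.

```cpp
#include <iostream>

#include "arrow/api.h"
#include "arrow/compute/api_vector.h"

namespace cp = arrow::compute;

arrow::Status SelectKWithNulls() {
  // Build an array with 3 non-null values and 2 nulls.
  arrow::DoubleBuilder builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({5.0, 1.0, 3.0}));
  ARROW_RETURN_NOT_OK(builder.AppendNull());
  ARROW_RETURN_NOT_OK(builder.AppendNull());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> arr, builder.Finish());

  // Ask for more rows (k = 4) than there are non-null values (3).
  cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/4);
  ARROW_ASSIGN_OR_RAISE(auto top_indices, cp::SelectKUnstable(arr, options));

  // Previously only the 3 non-null indices came back even though k = 4;
  // with this change the remaining slot refers to a null entry, placed
  // according to the sort key's null placement (assumed AtEnd here).
  std::cout << top_indices->ToString() << std::endl;
  return arrow::Status::OK();
}
```

Compile and link against the Arrow C++ library to run it.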
diff --git c/cpp/src/arrow/compute/kernels/vector_select_k.cc i/cpp/src/arrow/compute/kernels/vector_select_k.cc
index 8c14abd..ed1a89b 100644
--- c/cpp/src/arrow/compute/kernels/vector_select_k.cc
+++ i/cpp/src/arrow/compute/kernels/vector_select_k.cc
@@ -16,6 +16,7 @@
 // under the License.

 #include <algorithm>
+#include <queue>
 #include <span>

 #include "arrow/compute/function.h"
@@ -82,8 +83,9 @@ struct OutputRangesByNullLikeness {
 };

 OutputRangesByNullLikeness calculateNumberNonNullAndNullLikesToTake(
-    int64_t non_null_like_count, int64_t nan_count, int64_t null_count, int64_t k,
-    NullPlacement null_placement, uint64_t* output_begin) {
+    int64_t non_null_like_count, int64_t nan_count, int64_t null_count,
+    NullPlacement null_placement, std::span<uint64_t> output_indices) {
+  int64_t k = output_indices.size();
   int64_t non_null_like_to_take = 0;
   int64_t nan_to_take = 0;
   int64_t null_to_take = 0;
@@ -91,38 +93,31 @@ OutputRangesByNullLikeness calculateNumberNonNullAndNullLikesToTake(
     non_null_like_to_take = std::min(k, non_null_like_count);
     nan_to_take = std::min(k - non_null_like_to_take, nan_count);
     null_to_take = std::min(k - non_null_like_to_take - nan_to_take, null_count);
-    // TODO.TAE make this prettier
     return OutputRangesByNullLikeness{
-        .non_null_like_output = {output_begin, output_begin + non_null_like_to_take},
-        .nan_output = {output_begin + non_null_like_to_take,
-                       output_begin + non_null_like_to_take + nan_to_take},
-        .null_output = {
-            output_begin + non_null_like_to_take + nan_to_take,
-            output_begin + non_null_like_to_take + nan_to_take + null_to_take}};
+        .non_null_like_output = output_indices.subspan(0, non_null_like_to_take),
+        .nan_output = output_indices.subspan(non_null_like_to_take, nan_to_take),
+        .null_output =
+            output_indices.subspan(non_null_like_to_take + nan_to_take, null_to_take)};
   } else {
     null_to_take = std::min(k, null_count);
     nan_to_take = std::min(k - null_to_take, nan_count);
     non_null_like_to_take = std::min(k - null_to_take - nan_to_take, non_null_like_count);
-    // TODO.TAE make this prettier
     return OutputRangesByNullLikeness{
-        .non_null_like_output = {output_begin + null_to_take + nan_to_take,
-                                 output_begin + null_to_take + nan_to_take +
-                                     non_null_like_to_take},
-        .nan_output = {output_begin + null_to_take,
-                       output_begin + null_to_take + nan_to_take},
-        .null_output = {output_begin, output_begin + null_to_take}};
+        .non_null_like_output =
+            output_indices.subspan(null_to_take + nan_to_take, non_null_like_to_take),
+        .nan_output = output_indices.subspan(null_to_take, nan_to_take),
+        .null_output = output_indices.subspan(0, null_to_take)};
   }
 }

 template <typename Comparator>
 void HeapSortNonNullsToOutput(std::span<uint64_t> non_null_input_range, Comparator cmp,
                               std::span<uint64_t> output_range) {
-  std::span<uint64_t> heap{non_null_input_range.begin(),
-                           non_null_input_range.begin() + output_range.size()};
+  std::span<uint64_t> heap = non_null_input_range.subspan(0, output_range.size());
   std::make_heap(heap.begin(), heap.end(), cmp);
-  for (auto iter = non_null_input_range.begin() + output_range.size();
-       iter != non_null_input_range.end(); ++iter) {
-    uint64_t x_index = *iter;
+
+  std::span<uint64_t> remaining_input = non_null_input_range.subspan(output_range.size());
+  for (uint64_t x_index : remaining_input) {
     if (cmp(x_index, heap.front())) {
       std::pop_heap(heap.begin(), heap.end(), cmp);
       heap.back() = x_index;
@@ -132,14 +127,12 @@ void HeapSortNonNullsToOutput(std::span<uint64_t> non_null_input_range, Comparat
   // fill output in reverse when destructing,
   // as the "worst" (next-to-would-have-been-replaced) element is at heap-top
-  // TODO.TAE remove these &*
-  uint64_t* heap_begin = &*heap.begin();
-  uint64_t* heap_end = &*heap.begin() + output_range.size();
   for (auto reverse_out_iter = output_range.rbegin();
        reverse_out_iter != output_range.rend(); reverse_out_iter++) {
-    *reverse_out_iter = *heap_begin;  // heap-top has the next element
-    std::pop_heap(heap_begin, heap_end, cmp);
-    --heap_end;
+    *reverse_out_iter = heap.front();  // heap-top has the next element
+    std::ranges::pop_heap(heap, cmp);
+    // Decrease heap-size by one
+    heap = heap.first(heap.size() - 1);
   }
 }
@@ -158,7 +151,6 @@ void HeapSortNonNullsToOutput(std::span<uint64_t> non_null_input_range,
 }

 template <typename InType>
-// TODO.TAE Could merge l and output into one span now
 void HeapSortNonNullsToOutput(std::span<uint64_t> non_null_input_range,
                               const typename TypeTraits<InType>::ArrayType& arr,
                               SortOrder order, std::span<uint64_t> output_range) {
@@ -171,6 +163,28 @@ void HeapSortNonNullsToOutput(std::span<uint64_t> non_null_input_range,
   }
 }

+struct NullNanPartitionResult {
+  std::span<uint64_t> non_null_like_range;
+  std::span<uint64_t> null_range;
+  std::span<uint64_t> nan_range;
+};
+
+template <typename ArrayType, typename Partitioner>
+NullNanPartitionResult PartitionNullsAndNans(uint64_t* indices_begin,
+                                             uint64_t* indices_end,
+                                             const ArrayType& values, int64_t offset,
+                                             NullPlacement null_placement) {
+  // Partition nulls at start (resp. end), and null-like values just before (resp. after)
+  NullPartitionResult p = PartitionNullsOnly<Partitioner>(indices_begin, indices_end,
+                                                          values, offset, null_placement);
+  NullPartitionResult q = PartitionNullLikes<ArrayType, Partitioner>(
+      p.non_nulls_begin, p.non_nulls_end, values, offset, null_placement);
+  return NullNanPartitionResult{
+      .non_null_like_range = {q.non_nulls_begin, q.non_nulls_end},
+      .null_range = {p.nulls_begin, p.nulls_end},
+      .nan_range = {q.nulls_begin, q.nulls_end}};
+}
+
 class ArraySelector : public TypeVisitor {
  public:
   ArraySelector(ExecContext* ctx, const Array& array, const SelectKOptions& options,
@@ -220,25 +234,22 @@ class ArraySelector : public TypeVisitor {
     auto* output = take_indices->template GetMutableValues<uint64_t>(1);

     // From k, calculate
-    // l = non-null elements to take from PartitionResult
-    // m = null-like elements to take from PartitionResult
-    // k = l + m if enough elements in input
+    // l = non_null_like elements to take from PartitionResult
+    // m = nan elements to take from PartitionResult
+    // n = null elements to take from PartitionResult
+    // k = l + m + n because k was clipped to arr.length()
+    // And directly compute the ranges in {output, output+k} where we will need to place
+    // the selected elements from each group -> no longer need to track null_placement
     auto output_ranges = calculateNumberNonNullAndNullLikesToTake(
         static_cast<int64_t>(p.non_nulls_end - p.non_nulls_begin),
-        0,  // TODO.TAE it would be okay to consider these equal, but better not?
-        static_cast<int64_t>(p.nulls_end - p.nulls_begin), k_, null_placement_, output);
+        0,  // TODO.TAE it would be okay to consider null/nan equal, but better not?
+        static_cast<int64_t>(p.nulls_end - p.nulls_begin), null_placement_,
+        {output, output + k_});

-    if (null_placement_ == NullPlacement::AtEnd) {
-      HeapSortNonNullsToOutput<InType, sort_order>(
-          {p.non_nulls_begin, p.non_nulls_end}, arr, output_ranges.non_null_like_output);
-      std::copy(p.nulls_begin, p.nulls_begin + output_ranges.null_output.size(),
-                output_ranges.null_output.begin());
-    } else {
-      std::copy(p.nulls_begin, p.nulls_begin + output_ranges.null_output.size(),
-                output_ranges.null_output.begin());
-      HeapSortNonNullsToOutput<InType, sort_order>(
-          {p.non_nulls_begin, p.non_nulls_end}, arr, output_ranges.non_null_like_output);
-    }
+    HeapSortNonNullsToOutput<InType, sort_order>({p.non_nulls_begin, p.non_nulls_end},
+                                                 arr, output_ranges.non_null_like_output);
+    std::copy(p.nulls_begin, p.nulls_begin + output_ranges.null_output.size(),
+              output_ranges.null_output.begin());

     *output_ = Datum(take_indices);
     return Status::OK();
@@ -290,25 +301,24 @@ class ChunkedArraySelector : public TypeVisitor {
   VISIT_SORTABLE_PHYSICAL_TYPES(VISIT)
 #undef VISIT

-  // template<typename InType>
-  // int64_t ComputeNanCount(){
-  //   using GetView = GetViewType<InType>;
-  //   using ArrayType = typename TypeTraits<InType>::ArrayType;
-  //   if constexpr (has_null_like_values<typename ArrayType::TypeClass>()) {
-  //     int64_t nan_count = 0;
-  //     for (const auto& chunk : physical_chunks_) {
-  //       auto values = std::make_shared<ArrayType>(chunk->data());
-  //       int64_t length = values->length();
-  //       for(int64_t index = 0; index < length; ++index){
-  //         if(std::isnan(values->GetView(index))){
-  //           nan_count++;
-  //         }
-  //       }
-  //     }
-  //     return nan_count;
-  //   }
-  //   return 0;
-  // }
+  template <typename InType>
+  int64_t ComputeNanCount() {
+    using ArrayType = typename TypeTraits<InType>::ArrayType;
+    if constexpr (has_null_like_values<typename ArrayType::TypeClass>()) {
+      int64_t nan_count = 0;
+      for (const auto& chunk : physical_chunks_) {
+        auto values = std::make_shared<ArrayType>(chunk->data());
+        int64_t length = values->length();
+        for (int64_t index = 0; index < length; ++index) {
+          if (std::isnan(values->GetView(index))) {
+            nan_count++;
+          }
+        }
+      }
+      return nan_count;
+    }
+    return 0;
+  }

   template <typename InType, SortOrder sort_order>
   Status SelectKthInternal() {
@@ -323,14 +333,28 @@ class ChunkedArraySelector : public TypeVisitor {
     if (k_ > chunked_array_.length()) {
       k_ = chunked_array_.length();
     }
-    // int64_t null_count = chunked_array_.null_count();
-    // int64_t nan_count = ComputeNanCount<InType>();
-    // TODO.TAE int64_t non_null_like_count = chunked_array_.length() - null_count -
-    // nan_count;
+
+    ARROW_ASSIGN_OR_RAISE(auto take_indices,
+                          MakeMutableUInt64Array(k_, ctx_->memory_pool()));
+    auto* output_begin = take_indices->GetMutableValues<uint64_t>(1);
+
+    int64_t null_count = chunked_array_.null_count();
+    int64_t nan_count = ComputeNanCount<InType>();
+    int64_t non_null_like_count = chunked_array_.length() - null_count - nan_count;
+
+    auto output = calculateNumberNonNullAndNullLikesToTake(
+        non_null_like_count, nan_count, null_count, null_placement_,
+        {output_begin, output_begin + k_});
+
+    // Now we can independently fill the output with non_null, nan and null items.
+    // For non_null, we do a heap_sort, the others can just be copied until
+    // nan_taken = output.nan_range.size() and
+    // null_taken = output.null_range.size() respectively
+    size_t nan_taken = 0;
+    size_t null_taken = 0;

     std::function<bool(const HeapItem&, const HeapItem&)> cmp;
     SelectKComparator<sort_order> comparator;
-
     cmp = [&comparator](const HeapItem& left, const HeapItem& right) -> bool {
       const auto lval = GetView::LogicalValue(left.array->GetView(left.index));
       const auto rval = GetView::LogicalValue(right.array->GetView(right.index));
@@ -338,9 +362,9 @@ class ChunkedArraySelector : public TypeVisitor {
     };
     using HeapContainer =
         std::priority_queue<HeapItem, std::vector<HeapItem>, decltype(cmp)>;
-
     HeapContainer heap(cmp);
     std::vector<std::shared_ptr<ArrayType>> chunks_holder;
+
     uint64_t offset = 0;
     for (const auto& chunk : physical_chunks_) {
       if (chunk->length() == 0) continue;
@@ -352,12 +376,29 @@ class ChunkedArraySelector : public TypeVisitor {
       uint64_t* indices_end = indices_begin + indices.size();
       std::iota(indices_begin, indices_end, 0);

-      auto kth_begin = std::min(indices_begin + k_, indices_end);
-      uint64_t* iter = indices_begin;
-      for (; iter != kth_begin && heap.size() < static_cast<size_t>(k_); ++iter) {
+      const auto p = PartitionNullsAndNans<ArrayType, NonStablePartitioner>(
+          indices_begin, indices_end, arr, 0, null_placement_);
+
+      // First do nulls and nans
+      auto iter = p.null_range.begin();
+      for (; iter != p.null_range.end() && null_taken < output.null_output.size();
+           ++iter) {
+        output.null_output[null_taken] = offset + *iter;
+        null_taken++;
+      }
+      iter = p.nan_range.begin();
+      for (; iter != p.nan_range.end() && nan_taken < output.nan_output.size(); ++iter) {
+        output.nan_output[nan_taken] = offset + *iter;
+        nan_taken++;
+      }
+
+      iter = p.non_null_like_range.begin();
+      for (; iter != p.non_null_like_range.end() &&
+             heap.size() < output.non_null_like_output.size();
+           ++iter) {
         heap.push(HeapItem{*iter, offset, &arr});
       }
-      for (; iter != indices_end && !heap.empty(); ++iter) {
+      for (; iter != p.non_null_like_range.end() && !heap.empty(); ++iter) {
         uint64_t x_index = *iter;
         const auto& xval = GetView::LogicalValue(arr.GetView(x_index));
         auto top_item = heap.top();
@@ -371,16 +412,17 @@ class ChunkedArraySelector : public TypeVisitor {
       offset += chunk->length();
     }

-    auto out_size = static_cast<int64_t>(heap.size());
-    ARROW_ASSIGN_OR_RAISE(auto take_indices,
-                          MakeMutableUInt64Array(out_size, ctx_->memory_pool()));
-    auto* out_cbegin = take_indices->GetMutableValues<uint64_t>(1) + out_size - 1;
-    while (heap.size() > 0) {
-      auto top_item = heap.top();
-      *out_cbegin = top_item.index + top_item.offset;
+    // We sized output.non_null_like_output to hold exactly sufficient indices,
+    // so the heap must have been completely filled
+    assert(heap.size() == output.non_null_like_output.size());
+
+    for (auto reverse_out_iter = output.non_null_like_output.rbegin();
+         reverse_out_iter != output.non_null_like_output.rend(); reverse_out_iter++) {
+      *reverse_out_iter =
+          heap.top().index + heap.top().offset;  // heap-top has the next element
       heap.pop();
-      --out_cbegin;
     }
+
     *output_ = Datum(take_indices);
     return Status::OK();
   }
@@ -395,28 +437,6 @@ class ChunkedArraySelector : public TypeVisitor {
   Datum* output_;
 };

-struct NullNanPartitionResult {
-  std::span<uint64_t> non_null_like_range;
-  std::span<uint64_t> null_range;
-  std::span<uint64_t> nan_range;
-};
-
-template <typename ArrayType, typename Partitioner>
-NullNanPartitionResult PartitionNullsAndNans(uint64_t* indices_begin,
-                                             uint64_t* indices_end,
-                                             const ArrayType& values, int64_t offset,
-                                             NullPlacement null_placement) {
-  // Partition nulls at start (resp. end), and null-like values just before (resp. after)
-  NullPartitionResult p = PartitionNullsOnly<Partitioner>(indices_begin, indices_end,
-                                                          values, offset, null_placement);
-  NullPartitionResult q = PartitionNullLikes<ArrayType, Partitioner>(
-      p.non_nulls_begin, p.non_nulls_end, values, offset, null_placement);
-  return NullNanPartitionResult{
-      .non_null_like_range = {q.non_nulls_begin, q.non_nulls_end},
-      .null_range = {p.nulls_begin, p.nulls_end},
-      .nan_range = {q.nulls_begin, q.nulls_end}};
-}
-
 class RecordBatchSelector {
  private:
  using ResolvedSortKey = ResolvedRecordBatchSortKey;
@@ -455,13 +475,11 @@ class RecordBatchSelector {
   class SelectKForKey : public TypeVisitor {
    public:
    SelectKForKey(RecordBatchSelector* selector, size_t start_sort_key_index,
-                 std::span<uint64_t> input_indices, int64_t k_remaining,
-                 uint64_t* output_indices)
+                 std::span<uint64_t> input_indices, std::span<uint64_t> output_indices)
        : TypeVisitor(),
          selector_(selector),
          start_sort_key_index_(start_sort_key_index),
          input_indices_(input_indices),
-         k_remaining_(k_remaining),
          output_indices_(output_indices) {}

   private:
@@ -471,23 +489,24 @@ class RecordBatchSelector {
      const auto& first_remaining_sort_key =
          selector_->sort_keys_[start_sort_key_index_];
      const auto& arr = checked_cast<const ArrayType&>(first_remaining_sort_key.array);
-     // TODO.TAE uhh this could be prettier
-     uint64_t* input_indices_begin = &*input_indices_.begin();
-     uint64_t* input_indices_end = input_indices_begin + input_indices_.size();
+     uint64_t* input_indices_begin = input_indices_.data();
+     uint64_t* input_indices_end = input_indices_.data() + input_indices_.size();
      const auto p = PartitionNullsAndNans<ArrayType, NonStablePartitioner>(
          input_indices_begin, input_indices_end, arr, 0,
          first_remaining_sort_key.null_placement);

-     // From k, calculate
+     // From k = output_range.size(), calculate
      // l = non_null elements to take from PartitionResult
-     // m = null elements to take from PartitionResult
-     // k = l + m because k was clipped to num_rows()
-
+     // m = nan elements to take from PartitionResult
+     // n = null elements to take from PartitionResult
+     // k = l + m + n because k was clipped to num_rows()
+     // And directly compute the ranges in output_indices_ where we will need to place
+     // the selected elements from each group -> no longer need to track null_placement
      auto output_ranges = calculateNumberNonNullAndNullLikesToTake(
          static_cast<int64_t>(p.non_null_like_range.size()),
          static_cast<int64_t>(p.nan_range.size()),
-         static_cast<int64_t>(p.null_range.size()), k_remaining_,
+         static_cast<int64_t>(p.null_range.size()),
          first_remaining_sort_key.null_placement, output_indices_);

      bool last_sort_key = start_sort_key_index_ + 1 == selector_->sort_keys_.size();
@@ -496,7 +515,6 @@ class RecordBatchSelector {
       if (!output_ranges.non_null_like_output.empty()) {
         HeapSortNonNullsToOutput<InType>(p.non_null_like_range, arr,
                                          first_remaining_sort_key.order,
-                                         // TODO.TAE remove this &*
                                          output_ranges.non_null_like_output);
       }
       if (output_ranges.nan_output.size() > 0) {
@@ -521,15 +539,11 @@ class RecordBatchSelector {
       }
       if (output_ranges.nan_output.size() > 0) {
         ARROW_RETURN_NOT_OK(selector_->DoSelectKForKey(
-            start_sort_key_index_ + 1, p.nan_range, output_ranges.nan_output.size(),
-            // TODO.TAE remove this &*
-            &*output_ranges.nan_output.begin()));
+            start_sort_key_index_ + 1, p.nan_range, output_ranges.nan_output));
       }
       if (output_ranges.null_output.size() > 0) {
         ARROW_RETURN_NOT_OK(selector_->DoSelectKForKey(
-            start_sort_key_index_ + 1, p.null_range, output_ranges.null_output.size(),
-            // TODO.TAE remove this &*
-            &*output_ranges.null_output.begin()));
+            start_sort_key_index_ + 1, p.null_range, output_ranges.null_output));
       }
     }
@@ -545,14 +559,12 @@ class RecordBatchSelector {
     RecordBatchSelector* selector_;
     size_t start_sort_key_index_;
     std::span<uint64_t> input_indices_;
-    int64_t k_remaining_;
-    uint64_t* output_indices_;
+    std::span<uint64_t> output_indices_;
   };

   Status DoSelectKForKey(size_t start_sort_key_index, std::span<uint64_t> input_indices,
-                         int64_t k_remaining, uint64_t* output_indices) {
-    SelectKForKey tmp(this, start_sort_key_index, input_indices, k_remaining,
-                      output_indices);
+                         std::span<uint64_t> output_indices) {
+    SelectKForKey tmp(this, start_sort_key_index, input_indices, output_indices);
     return sort_keys_.at(start_sort_key_index).type->Accept(&tmp);
   }
@@ -571,7 +583,8 @@ class RecordBatchSelector {
     auto* output_indices = take_indices->template GetMutableValues<uint64_t>(1);
     std::span<uint64_t> input_indices_span(input_indices);

-    ARROW_RETURN_NOT_OK(DoSelectKForKey(0, input_indices_span, k_, output_indices));
+    ARROW_RETURN_NOT_OK(
+        DoSelectKForKey(0, input_indices_span, {output_indices, output_indices + k_}));
     *output_ = Datum(take_indices);
     return arrow::Status::OK();
   }
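The core technique in the diff above is two-step: first split the k output slots into contiguous subranges for non-null, NaN, and null entries according to the key's null placement (calculateNumberNonNullAndNullLikesToTake), then heap-select the best non-null values into their subrange while NaN and null indices are simply copied over (HeapSortNonNullsToOutput and the per-chunk loops). The following standalone sketch demonstrates the same idea with plain C++20 spans; all names and the toy data are invented for illustration and are not Arrow internals.

```cpp
#include <algorithm>
#include <cmath>
#include <cstddef>
#include <iostream>
#include <span>
#include <vector>

// Split the k output slots into [non-null | NaN | null] subspans (nulls last)
// or [null | NaN | non-null] subspans (nulls first), mirroring
// calculateNumberNonNullAndNullLikesToTake in the diff above.
struct OutputRanges {
  std::span<size_t> non_null;
  std::span<size_t> nan;
  std::span<size_t> null;
};

OutputRanges SplitOutput(std::span<size_t> out, size_t non_null_count, size_t nan_count,
                         size_t null_count, bool nulls_at_end) {
  const size_t k = out.size();
  if (nulls_at_end) {
    size_t take_values = std::min(k, non_null_count);
    size_t take_nan = std::min(k - take_values, nan_count);
    size_t take_null = std::min(k - take_values - take_nan, null_count);
    return {out.subspan(0, take_values), out.subspan(take_values, take_nan),
            out.subspan(take_values + take_nan, take_null)};
  }
  size_t take_null = std::min(k, null_count);
  size_t take_nan = std::min(k - take_null, nan_count);
  size_t take_values = std::min(k - take_null - take_nan, non_null_count);
  return {out.subspan(take_null + take_nan, take_values),
          out.subspan(take_null, take_nan), out.subspan(0, take_null)};
}

// Heap-select the indices of the out.size() smallest values into out (ascending),
// mirroring HeapSortNonNullsToOutput: build a max-heap over the first out.size()
// candidates, replace the heap top whenever a better value appears, then pop into
// the output in reverse. Assumes out.size() <= candidates.size().
void HeapSelect(std::span<const size_t> candidates, std::span<const double> values,
                std::span<size_t> out) {
  std::vector<size_t> heap(candidates.begin(), candidates.begin() + out.size());
  auto cmp = [&](size_t l, size_t r) { return values[l] < values[r]; };  // max-heap
  std::make_heap(heap.begin(), heap.end(), cmp);
  for (size_t idx : candidates.subspan(out.size())) {
    if (cmp(idx, heap.front())) {
      std::pop_heap(heap.begin(), heap.end(), cmp);
      heap.back() = idx;
      std::push_heap(heap.begin(), heap.end(), cmp);
    }
  }
  for (auto it = out.rbegin(); it != out.rend(); ++it) {
    *it = heap.front();  // current worst of the selected set
    std::pop_heap(heap.begin(), heap.end(), cmp);
    heap.pop_back();
  }
}

int main() {
  // Toy column: values[4] stands in for a null row (validity is tracked
  // separately in Arrow, so the sentinel value is never read here).
  std::vector<double> values = {5.0, NAN, 1.0, 3.0, 0.0};
  std::vector<size_t> non_null_non_nan = {0, 2, 3};  // indices of ordinary values
  const size_t k = 5;
  std::vector<size_t> out(k);

  OutputRanges ranges = SplitOutput(out, /*non_null_count=*/3, /*nan_count=*/1,
                                    /*null_count=*/1, /*nulls_at_end=*/true);
  HeapSelect(non_null_non_nan, values, ranges.non_null);
  ranges.nan[0] = 1;   // index of the NaN row
  ranges.null[0] = 4;  // index of the null row
  for (size_t i : out) std::cout << i << ' ';  // prints: 2 3 0 1 4
  std::cout << '\n';
}
```

Precomputing the output subranges up front is what lets the kernel drop the null_placement branching at fill time, as the comments in the diff point out.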
Labels
awaiting change review
Component: C++
Component: GLib
Component: Python
Component: Ruby
See #38584 for the original PR; it will be quoted in this PR description.
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?
This PR includes breaking changes to public APIs.
I amended the original PR to be less breaking in public APIs. Still, Ordering, SortOptions, RankOptions, and RankQuantileOptions now accept a std::optional<NullPlacement> instead of NullPlacement, which did lead to some changes in downstream APIs and bindings. I also need some help with fixing the c_glib bindings.
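For illustration, usage under the amended API might look roughly like the sketch below; the per-key null_placement field and the optional option-level override are assumptions based on this description and the diff above, not the final signatures.

```cpp
#include "arrow/compute/api_vector.h"

namespace cp = arrow::compute;

// Hypothetical sketch only: the exact members/constructors may differ in the
// final version of this PR.
cp::SortOptions MakeOptions() {
  // Per-key null placement (field assumed to be added to SortKey by this PR).
  cp::SortKey by_score("score", cp::SortOrder::Descending);
  by_score.null_placement = cp::NullPlacement::AtEnd;

  cp::SortKey by_name("name", cp::SortOrder::Ascending);
  by_name.null_placement = cp::NullPlacement::AtStart;

  cp::SortOptions options({by_score, by_name});
  // With the amended API, the option-level null placement is a
  // std::optional<NullPlacement>; leaving it unset would mean each key's own
  // placement is used instead of a global override.
  return options;
}
```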