Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions cpp/src/arrow/util/hashing.h
Original file line number Diff line number Diff line change
Expand Up @@ -534,13 +534,16 @@ class ScalarMemoTable : public MemoTable {
// Merge entries from `other_table` into `this->hash_table_`.
Status MergeTable(const ScalarMemoTable& other_table) {
const HashTableType& other_hashtable = other_table.hash_table_;
Status status = Status::OK();

other_hashtable.VisitEntries([this](const HashTableEntry* other_entry) {
other_hashtable.VisitEntries([this, &status](const HashTableEntry* other_entry) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to have `VisitEntries(...)` propagate callback failures, so this can just be `RETURN_NOT_OK(other_hashtable.VisitEntries(...))`? That would avoid the manual `Status status` plumbing here. If changing `VisitEntries` is too broad, a status-aware variant or helper could work too.

if (ARROW_PREDICT_FALSE(!status.ok())) {
return;
}
int32_t unused;
ARROW_DCHECK_OK(this->GetOrInsert(other_entry->payload.value, &unused));
status = this->GetOrInsert(other_entry->payload.value, &unused);
});
// TODO: ARROW-17074 - implement proper error handling
return Status::OK();
return status;
}
};

Expand Down Expand Up @@ -899,11 +902,15 @@ class BinaryMemoTable : public MemoTable {

public:
Status MergeTable(const BinaryMemoTable& other_table) {
other_table.VisitValues(0, [this](std::string_view other_value) {
Status status = Status::OK();
other_table.VisitValues(0, [this, &status](std::string_view other_value) {
if (ARROW_PREDICT_FALSE(!status.ok())) {
return;
}
int32_t unused;
ARROW_DCHECK_OK(this->GetOrInsert(other_value, &unused));
status = this->GetOrInsert(other_value, &unused);
});
return Status::OK();
return status;
}
};

Expand Down
27 changes: 27 additions & 0 deletions cpp/src/arrow/util/hashing_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include "arrow/array/builder_primitive.h"
#include "arrow/array/concatenate.h"
#include "arrow/memory_pool.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/hashing.h"
Expand Down Expand Up @@ -376,6 +377,32 @@ TEST(ScalarMemoTable, StressInt64) {
ASSERT_EQ(table.size(), map.size());
}

TEST(ScalarMemoTable, MergeTablePropagatesInsertError) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this PR changes both `ScalarMemoTable::MergeTable()` and `BinaryMemoTable::MergeTable()`, could we add a matching regression test for the binary path too? Right now the new coverage only protects the scalar merge case.

int64_t bytes_allocated_limit = 0;
{
ProxyMemoryPool probe(default_memory_pool());
ScalarMemoTable<int64_t> target(&probe, 0);
for (int64_t value = 0; value < 15; ++value) {
AssertGetOrInsert(target, value, static_cast<int32_t>(value));
}
bytes_allocated_limit = probe.bytes_allocated();
}
ASSERT_GT(bytes_allocated_limit, 0);

ScalarMemoTable<int64_t> source(default_memory_pool(), 0);
AssertGetOrInsert(source, 15, 0);

ProxyMemoryPool proxy(default_memory_pool());
CappedMemoryPool pool(&proxy, bytes_allocated_limit);
ScalarMemoTable<int64_t> target(&pool, 0);
for (int64_t value = 0; value < 15; ++value) {
AssertGetOrInsert(target, value, static_cast<int32_t>(value));
}
ASSERT_EQ(proxy.bytes_allocated(), bytes_allocated_limit);

ASSERT_RAISES(OutOfMemory, target.MergeTable(source));
}

TEST(BinaryMemoTable, Basics) {
std::string A = "", B = "a", C = "foo", D = "bar", E, F;
E += '\0';
Expand Down
Loading