Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions velox-docker-image/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
FROM ghcr.io/little-big-h/dps-docker-image:nightly
FROM ubuntu:22.04
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update -y
RUN apt-get install curl wget git build-essential sudo cmake -y
RUN apt-get install lldb clang -y
RUN cd /usr/local && git clone --recursive https://github.com/facebookincubator/velox.git \
&& cd velox && git checkout 3020196b001130a9027b09d6b3b95385b90bc2cb && git submodule sync --recursive && git submodule update --init --recursive
RUN cd /usr/local/velox && ./scripts/setup-ubuntu.sh
RUN cd /usr/local/velox && make VELOX_BUILD_TESTING=OFF
RUN apt-get install openssh-server -y
RUN apt install nano emacs-nox -y
ADD ./velox.patch /usr/local/velox/
WORKDIR /usr/local/velox
RUN git apply velox.patch
RUN apt update && DEBIAN_FRONTEND=noninteractive apt install -yy uuid-dev libfmt-dev libspdlog-dev libopenblas-dev libfftw3-dev liblapacke-dev
RUN apt update && DEBIAN_FRONTEND=noninteractive apt install -yy uuid-dev libopenblas-dev libfftw3-dev liblapacke-dev
308 changes: 294 additions & 14 deletions velox-docker-image/velox.patch
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
diff --git a/scripts/setup-helper-functions.sh b/scripts/setup-helper-functions.sh
index 4f0a11e15..52c9180e9 100644
--- a/scripts/setup-helper-functions.sh
+++ b/scripts/setup-helper-functions.sh
@@ -163,6 +163,8 @@ function cmake_install {
cmake -Wno-dev -B"${BINARY_DIR}" \
-GNinja \
-DCMAKE_POSITION_INDEPENDENT_CODE=ON \
+ -DCMAKE_CXX_COMPILER=clang++ \
+ -DCMAKE_C_COMPILER=clang \
-DCMAKE_CXX_STANDARD=17 \
"${INSTALL_PREFIX+-DCMAKE_PREFIX_PATH=}${INSTALL_PREFIX-}" \
"${INSTALL_PREFIX+-DCMAKE_INSTALL_PREFIX=}${INSTALL_PREFIX-}" \
diff --git a/velox/dwio/dwrf/proto/CMakeLists.txt b/velox/dwio/dwrf/proto/CMakeLists.txt
index 02ff2c8d..0ae65562 100644
index 02ff2c8d7..0ae655626 100644
--- a/velox/dwio/dwrf/proto/CMakeLists.txt
+++ b/velox/dwio/dwrf/proto/CMakeLists.txt
@@ -24,8 +24,6 @@ foreach(PROTO ${PROTO_FILES})
Expand All @@ -22,11 +35,215 @@ index 02ff2c8d..0ae65562 100644
DEPENDS ${Protobuf_PROTOC_EXECUTABLE}
COMMENT "Running PROTO compiler"
VERBATIM)
diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp
index bfeb1cd6f..360a54b08 100644
--- a/velox/exec/HashBuild.cpp
+++ b/velox/exec/HashBuild.cpp
@@ -158,7 +158,8 @@ void HashBuild::setupTable() {
operatorCtx_->driverCtx()
->queryConfig()
.minTableRowsForParallelJoinBuild(),
- pool());
+ pool(),
+ operatorCtx_->driverCtx()->queryConfig().hashAdaptivityEnabled());
} else {
// (Left) semi and anti join with no extra filter only needs to know whether
// there is a match. Hence, no need to store entries with duplicate keys.
@@ -178,7 +179,8 @@ void HashBuild::setupTable() {
operatorCtx_->driverCtx()
->queryConfig()
.minTableRowsForParallelJoinBuild(),
- pool());
+ pool(),
+ operatorCtx_->driverCtx()->queryConfig().hashAdaptivityEnabled());
} else {
// Ignore null keys
table_ = HashTable<true>::createForJoin(
@@ -189,7 +191,8 @@ void HashBuild::setupTable() {
operatorCtx_->driverCtx()
->queryConfig()
.minTableRowsForParallelJoinBuild(),
- pool());
+ pool(),
+ operatorCtx_->driverCtx()->queryConfig().hashAdaptivityEnabled());
}
}
analyzeKeys_ = table_->hashMode() != BaseHashTable::HashMode::kHash;
diff --git a/velox/exec/HashProbe.cpp b/velox/exec/HashProbe.cpp
index 228358c35..37c7c2510 100644
--- a/velox/exec/HashProbe.cpp
+++ b/velox/exec/HashProbe.cpp
@@ -23,9 +23,6 @@ namespace facebook::velox::exec {

namespace {

-// Batch size used when iterating the row container.
-constexpr int kBatchSize = 1024;
-
// Returns the type for the hash table row. Build side keys first,
// then dependent build side columns.
RowTypePtr makeTableType(
@@ -996,10 +993,10 @@ void HashProbe::fillFilterInput(vector_size_t size) {
void HashProbe::prepareFilterRowsForNullAwareJoin(
vector_size_t numRows,
bool filterPropagateNulls) {
- VELOX_CHECK_LE(numRows, kBatchSize);
+ VELOX_CHECK_LE(numRows, outputBatchSize_);
if (filterTableInput_ == nullptr) {
- filterTableInput_ =
- BaseVector::create<RowVector>(filterInputType_, kBatchSize, pool());
+ filterTableInput_ = BaseVector::create<RowVector>(
+ filterInputType_, outputBatchSize_, pool());
}

if (filterPropagateNulls) {
@@ -1067,8 +1064,8 @@ void HashProbe::applyFilterOnTableRowsForNullAwareJoin(
}
auto* tableRows = table_->rows();
VELOX_CHECK(tableRows, "Should not move rows in hash joins");
- char* data[kBatchSize];
- while (auto numRows = iterator(data, kBatchSize)) {
+ char** data = new char*[outputBatchSize_];
+ while (auto numRows = iterator(data, outputBatchSize_)) {
filterTableInput_->resize(numRows);
filterTableInputRows_.resizeFill(numRows, true);
for (auto& projection : filterTableProjections_) {
@@ -1112,6 +1109,7 @@ void HashProbe::applyFilterOnTableRowsForNullAwareJoin(
}
});
}
+ delete[] data;
}

SelectivityVector HashProbe::evalFilterForNullAwareJoin(
diff --git a/velox/exec/HashTable.cpp b/velox/exec/HashTable.cpp
index 44cd1e82e..e45a95a90 100644
--- a/velox/exec/HashTable.cpp
+++ b/velox/exec/HashTable.cpp
@@ -53,16 +53,23 @@ HashTable<ignoreNullKeys>::HashTable(
bool hasProbedFlag,
uint32_t minTableSizeForParallelJoinBuild,
memory::MemoryPool* pool,
- const std::shared_ptr<velox::HashStringAllocator>& stringArena)
+ const std::shared_ptr<velox::HashStringAllocator>& stringArena, bool hashAdaptivityEnabled)
: BaseHashTable(std::move(hashers)),
minTableSizeForParallelJoinBuild_(minTableSizeForParallelJoinBuild),
isJoinBuild_(isJoinBuild) {
std::vector<TypePtr> keys;
- for (auto& hasher : hashers_) {
- keys.push_back(hasher->type());
- if (!VectorHasher::typeKindSupportsValueIds(hasher->typeKind())) {
- hashMode_ = HashMode::kHash;
+ if (hashAdaptivityEnabled) {
+ for (auto& hasher : hashers_) {
+ keys.push_back(hasher->type());
+ if (!VectorHasher::typeKindSupportsValueIds(hasher->typeKind())) {
+ hashMode_ = HashMode::kHash;
+ }
}
+ } else {
+ for (auto& hasher : hashers_) {
+ keys.push_back(hasher->type());
+ }
+ hashMode_ = HashMode::kHash;
}

rows_ = std::make_unique<RowContainer>(
diff --git a/velox/exec/HashTable.h b/velox/exec/HashTable.h
index eec394caf..f5fb64b9c 100644
--- a/velox/exec/HashTable.h
+++ b/velox/exec/HashTable.h
@@ -427,7 +427,8 @@ class HashTable : public BaseHashTable {
bool hasProbedFlag,
uint32_t minTableSizeForParallelJoinBuild,
memory::MemoryPool* pool,
- const std::shared_ptr<velox::HashStringAllocator>& stringArena = nullptr);
+ const std::shared_ptr<velox::HashStringAllocator>& stringArena = nullptr,
+ bool hashAdaptivityEnabled = true);

static std::unique_ptr<HashTable> createForAggregation(
std::vector<std::unique_ptr<VectorHasher>>&& hashers,
@@ -453,7 +454,8 @@ class HashTable : public BaseHashTable {
bool allowDuplicates,
bool hasProbedFlag,
uint32_t minTableSizeForParallelJoinBuild,
- memory::MemoryPool* pool) {
+ memory::MemoryPool* pool,
+ bool hashAdaptivityEnabled = true) {
return std::make_unique<HashTable>(
std::move(hashers),
std::vector<Accumulator>{},
@@ -462,7 +464,9 @@ class HashTable : public BaseHashTable {
true, // isJoinBuild
hasProbedFlag,
minTableSizeForParallelJoinBuild,
- pool);
+ pool,
+ nullptr,
+ hashAdaptivityEnabled);
}

void groupProbe(HashLookup& lookup) override;
diff --git a/velox/exec/Task.h b/velox/exec/Task.h
index cb4a8507f..f4012b6be 100644
--- a/velox/exec/Task.h
+++ b/velox/exec/Task.h
@@ -618,6 +618,10 @@ class Task : public std::enable_shared_from_this<Task> {
terminate(TaskState::kFinished).wait();
}

+ auto const& childPools() const {
+ return childPools_;
+ }
+
private:
Task(
const std::string& taskId,
diff --git a/velox/expression/Expr.cpp b/velox/expression/Expr.cpp
index bd35ea322..8b90f1784 100644
--- a/velox/expression/Expr.cpp
+++ b/velox/expression/Expr.cpp
@@ -1109,7 +1109,7 @@ bool Expr::removeSureNulls(
continue;
}

- if (values->mayHaveNulls()) {
+ /*if (values->mayHaveNulls()) {
LocalDecodedVector decoded(context, *values, rows);
if (auto* rawNulls = decoded->nulls()) {
if (!result) {
@@ -1118,7 +1118,7 @@ bool Expr::removeSureNulls(
auto bits = result->asMutableRange().bits();
bits::andBits(bits, rawNulls, rows.begin(), rows.end());
}
- }
+ }*/
}
if (result) {
result->updateBounds();
@@ -1144,7 +1144,7 @@ void Expr::evalWithNulls(
return;
}

- if (propagatesNulls_ && !skipFieldDependentOptimizations()) {
+ if (false/*propagatesNulls_ && !skipFieldDependentOptimizations()*/) {
bool mayHaveNulls = false;
for (auto* field : distinctFields_) {
const auto& vector = context.getField(field->index(context));
@@ -1199,7 +1199,7 @@ void Expr::evalWithMemo(
}
++baseOfDictionaryRepeats_;

- if (baseOfDictionaryRepeats_ == 1) {
+ if (true/*baseOfDictionaryRepeats_ == 1*/) {
evalWithNulls(rows, context, result);
baseOfDictionary_ = base;
dictionaryCache_ = result;
diff --git a/velox/functions/prestosql/Arithmetic.h b/velox/functions/prestosql/Arithmetic.h
index 36d449ce..1948374d 100644
index 05d427bf1..4539d65b6 100644
--- a/velox/functions/prestosql/Arithmetic.h
+++ b/velox/functions/prestosql/Arithmetic.h
@@ -67,6 +67,17 @@ struct MultiplyFunction {
@@ -107,6 +107,17 @@ struct IntervalMultiplyFunction {
}
};

Expand All @@ -44,8 +261,42 @@ index 36d449ce..1948374d 100644
template <typename T>
struct DivideFunction {
template <typename TInput>
@@ -123,6 +134,33 @@ struct DivideFunction {
}
};

+template <typename T>
+struct MixedDivideFunction {
+ FOLLY_ALWAYS_INLINE void
+ call(double& result, const double& a, const int64_t& b)
+// depend on compiler have correct behaviour for divide by zero
+#if defined(__has_feature)
+#if __has_feature(__address_sanitizer__)
+ __attribute__((__no_sanitize__("float-divide-by-zero")))
+#endif
+#endif
+ {
+ result = a / b;
+ }
+
+ FOLLY_ALWAYS_INLINE void
+ call(double& result, const int64_t& a, const double& b)
+// depend on compiler have correct behaviour for divide by zero
+#if defined(__has_feature)
+#if __has_feature(__address_sanitizer__)
+ __attribute__((__no_sanitize__("float-divide-by-zero")))
+#endif
+#endif
+ {
+ result = a / b;
+ }
+};
+
template <typename T>
struct IntervalDivideFunction {
FOLLY_ALWAYS_INLINE void call(int64_t& result, int64_t a, double b)
diff --git a/velox/functions/prestosql/ArithmeticImpl.h b/velox/functions/prestosql/ArithmeticImpl.h
index 9b4d1ae1..f87df490 100644
index 11fe000d7..44bbe9fb3 100644
--- a/velox/functions/prestosql/ArithmeticImpl.h
+++ b/velox/functions/prestosql/ArithmeticImpl.h
@@ -86,6 +86,17 @@ T multiply(const T a, const T b)
Expand All @@ -66,18 +317,47 @@ index 9b4d1ae1..f87df490 100644
// This is used by Velox for floating points divide.
template <typename T>
T divide(const T& a, const T& b)
@@ -99,6 +110,18 @@ T divide(const T& a, const T& b)
return result;
}

+template <typename T, typename U>
+T divide(const T& a, const U& b)
+#if defined(__has_feature)
+#if __has_feature(__address_sanitizer__)
+ __attribute__((__no_sanitize__("float-divide-by-zero")))
+#endif
+#endif
+{
+ T result = a / b;
+ return result;
+}
+
// This is used by Velox for floating points modulus.
template <typename T>
T modulus(const T a, const T b) {
diff --git a/velox/functions/prestosql/registration/ArithmeticFunctionsRegistration.cpp b/velox/functions/prestosql/registration/ArithmeticFunctionsRegistration.cpp
index 652367e5..6af1897a 100644
index b04695d11..d629967f5 100644
--- a/velox/functions/prestosql/registration/ArithmeticFunctionsRegistration.cpp
+++ b/velox/functions/prestosql/registration/ArithmeticFunctionsRegistration.cpp
@@ -26,6 +26,10 @@ void registerSimpleFunctions() {
registerBinaryFloatingPoint<PlusFunction>({"plus"});
registerBinaryFloatingPoint<MinusFunction>({"minus"});
registerBinaryFloatingPoint<MultiplyFunction>({"multiply"});
@@ -51,12 +51,20 @@ void registerSimpleFunctions(const std::string& prefix) {
IntervalDayTime,
double,
IntervalDayTime>({prefix + "multiply"});
+ registerFunction<MixedMultiplyFunction, double, double, int64_t>(
+ {"multiply"});
+ {prefix + "multiply"});
+ registerFunction<MixedMultiplyFunction, double, int64_t, double>(
+ {"multiply"});
registerBinaryFloatingPoint<DivideFunction>({"divide"});
registerBinaryFloatingPoint<ModulusFunction>({"mod"});
registerUnaryNumeric<CeilFunction>({"ceil", "ceiling"});
+ {prefix + "multiply"});
registerBinaryFloatingPoint<DivideFunction>({prefix + "divide"});
registerFunction<
IntervalDivideFunction,
IntervalDayTime,
IntervalDayTime,
double>({prefix + "divide"});
+ registerFunction<MixedDivideFunction, double, double, int64_t>(
+ {prefix + "divide"});
+ registerFunction<MixedDivideFunction, double, int64_t, double>(
+ {prefix + "divide"});
registerBinaryFloatingPoint<ModulusFunction>({prefix + "mod"});
registerUnaryNumeric<CeilFunction>({prefix + "ceil", prefix + "ceiling"});
registerUnaryNumeric<FloorFunction>({prefix + "floor"});