diff --git a/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp b/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp index c7dc34b6bed4ec..c2f4c6295a18c0 100644 --- a/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp +++ b/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp @@ -56,6 +56,7 @@ void register_aggregate_function_percentile(AggregateFunctionSimpleFactory& fact void register_aggregate_function_percentile_old(AggregateFunctionSimpleFactory& factory); void register_aggregate_function_window_funnel(AggregateFunctionSimpleFactory& factory); void register_aggregate_function_window_funnel_old(AggregateFunctionSimpleFactory& factory); +void register_aggregate_function_window_funnel_v2(AggregateFunctionSimpleFactory& factory); void register_aggregate_function_regr_union(AggregateFunctionSimpleFactory& factory); void register_aggregate_function_retention(AggregateFunctionSimpleFactory& factory); void register_aggregate_function_percentile_approx(AggregateFunctionSimpleFactory& factory); @@ -109,6 +110,7 @@ AggregateFunctionSimpleFactory& AggregateFunctionSimpleFactory::instance() { register_aggregate_function_percentile_approx(instance); register_aggregate_function_window_funnel(instance); register_aggregate_function_window_funnel_old(instance); + register_aggregate_function_window_funnel_v2(instance); register_aggregate_function_regr_union(instance); register_aggregate_function_retention(instance); register_aggregate_function_orthogonal_bitmap(instance); diff --git a/be/src/exprs/aggregate/aggregate_function_window_funnel.cpp b/be/src/exprs/aggregate/aggregate_function_window_funnel.cpp index 3ef25d7de1101d..fbd9fde900e00e 100644 --- a/be/src/exprs/aggregate/aggregate_function_window_funnel.cpp +++ b/be/src/exprs/aggregate/aggregate_function_window_funnel.cpp @@ -47,7 +47,8 @@ AggregateFunctionPtr create_aggregate_function_window_funnel(const std::string& } void 
register_aggregate_function_window_funnel(AggregateFunctionSimpleFactory& factory) { - factory.register_function_both("window_funnel", create_aggregate_function_window_funnel); + factory.register_function_both("window_funnel_v1", create_aggregate_function_window_funnel); + factory.register_alias("window_funnel_v1", "window_funnel"); } void register_aggregate_function_window_funnel_old(AggregateFunctionSimpleFactory& factory) { BeExecVersionManager::registe_restrict_function_compatibility("window_funnel"); diff --git a/be/src/exprs/aggregate/aggregate_function_window_funnel.h b/be/src/exprs/aggregate/aggregate_function_window_funnel.h index 9908c8c8808d78..4e739474ad1993 100644 --- a/be/src/exprs/aggregate/aggregate_function_window_funnel.h +++ b/be/src/exprs/aggregate/aggregate_function_window_funnel.h @@ -56,7 +56,7 @@ namespace doris { enum class WindowFunnelMode : Int64 { INVALID, DEFAULT, DEDUPLICATION, FIXED, INCREASE }; -WindowFunnelMode string_to_window_funnel_mode(const String& string) { +inline WindowFunnelMode string_to_window_funnel_mode(const String& string) { if (string == "default") { return WindowFunnelMode::DEFAULT; } else if (string == "deduplication") { diff --git a/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.cpp b/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.cpp new file mode 100644 index 00000000000000..de4017664753f9 --- /dev/null +++ b/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.cpp @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/aggregate/aggregate_function_window_funnel_v2.h"
+
+#include
+
+#include "common/logging.h"
+#include "core/data_type/data_type.h"
+#include "core/types.h"
+#include "exprs/aggregate/aggregate_function_simple_factory.h"
+#include "exprs/aggregate/helpers.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+
+/// Factory callback for the `window_funnel_v2` aggregate.
+///
+/// Returns nullptr, after logging a warning, when fewer than three argument types
+/// are supplied or when the primitive type of the third argument is not
+/// TYPE_DATETIMEV2. Otherwise `argument_types`, `result_is_nullable` and `attr`
+/// are forwarded to `creator_without_type`.
+/// `name` and `result_type` are accepted but not read in this function.
+AggregateFunctionPtr create_aggregate_function_window_funnel_v2(const std::string& name,
+                                                                const DataTypes& argument_types,
+                                                                const DataTypePtr& result_type,
+                                                                const bool result_is_nullable,
+                                                                const AggregateFunctionAttr& attr) {
+    // Three leading arguments are required before any event condition can follow.
+    if (argument_types.size() < 3) {
+        LOG(WARNING) << "window_funnel_v2's argument less than 3.";
+        return nullptr;
+    }
+    // Only the third argument (index 2) is type-checked here.
+    if (argument_types[2]->get_primitive_type() == TYPE_DATETIMEV2) {
+        return creator_without_type::create(
+                argument_types, result_is_nullable, attr);
+    } else {
+        LOG(WARNING) << "Only support DateTime type as window argument!";
+        return nullptr;
+    }
+}
+
+/// Registers the creator above with `factory` under the name "window_funnel_v2",
+/// using `register_function_both`.
+void register_aggregate_function_window_funnel_v2(AggregateFunctionSimpleFactory& factory) {
+    factory.register_function_both("window_funnel_v2", create_aggregate_function_window_funnel_v2);
+}
+
+// NOTE(review): compile_check_begin.h is included above, but no matching
+// compile_check_end.h include is visible in this file - confirm whether the pair is required.
+} // namespace doris
diff --git a/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h b/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h
new file mode 100644
index 00000000000000..37c34e1f0eb53c
--- /dev/null
+++ b/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h
@@ -0,0 +1,441 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include + +#include "common/cast_set.h" +#include "common/exception.h" +#include "common/status.h" +#include "core/assert_cast.h" +#include "core/column/column_string.h" +#include "core/data_type/data_type_number.h" +#include "core/types.h" +#include "core/value/vdatetime_value.h" +#include "exprs/aggregate/aggregate_function.h" +#include "exprs/aggregate/aggregate_function_window_funnel.h" // for WindowFunnelMode, string_to_window_funnel_mode +#include "util/var_int.h" + +namespace doris { +#include "common/compile_check_begin.h" +class Arena; +class BufferReadable; +class BufferWritable; +class IColumn; +} // namespace doris + +namespace doris { + +/// Merge two event lists, utilizing sorted flags to optimize. +/// After merge, all events are in `events_list` and it is sorted. 
+template
+void merge_events_list(T& events_list, size_t prefix_size, bool prefix_sorted, bool suffix_sorted) {
+    // `events_list` holds two runs: [0, prefix_size) and [prefix_size, end).
+    // `prefix_sorted` / `suffix_sorted` say whether each run is already ordered.
+    //
+    // Order by timestamp (`first`) only. The default std::pair comparison would
+    // also order by event index (`second`) and destroy the insertion order of
+    // events that share a timestamp, which the funnel matching relies on.
+    const auto timestamp_less = [](const auto& lhs, const auto& rhs) {
+        return lhs.first < rhs.first;
+    };
+
+    if (!prefix_sorted && !suffix_sorted) {
+        // Neither run is ordered: one stable sort over everything is enough.
+        std::stable_sort(std::begin(events_list), std::end(events_list), timestamp_less);
+    } else {
+        const auto begin = std::begin(events_list);
+        const auto middle = std::next(begin, prefix_size);
+        const auto end = std::end(events_list);
+
+        // Sort only the run that needs it, then merge the two ordered runs.
+        if (!prefix_sorted) {
+            std::stable_sort(begin, middle, timestamp_less);
+        }
+        if (!suffix_sorted) {
+            std::stable_sort(middle, end, timestamp_less);
+        }
+        std::inplace_merge(begin, middle, end, timestamp_less);
+    }
+}
+
+/// V2 state: stores only matched events as (timestamp, event_index) pairs.
+/// Compared to V1 which stores all rows with N boolean columns, V2 only stores
+/// events that actually match at least one condition, dramatically reducing memory.
+///
+/// Ordering invariant: `events_list` is ordered by timestamp only. Events that
+/// share a timestamp keep their insertion order, which for a single row is
+/// descending event_index (see add()).
+struct WindowFunnelStateV2 {
+    /// (timestamp_int_val, 1-based event_index)
+    /// event_index 0 is unused in normal modes.
+    using TimestampEvent = std::pair;
+
+    int event_count = 0;
+    int64_t window = 0;
+    WindowFunnelMode window_funnel_mode = WindowFunnelMode::INVALID;
+    /// True while `events_list` is known to be in non-decreasing timestamp order.
+    bool sorted = true;
+    std::vector events_list;
+
+    WindowFunnelStateV2() = default;
+    WindowFunnelStateV2(int arg_event_count) : event_count(arg_event_count) {}
+
+    /// Drops all collected events; event_count, window and mode are left untouched.
+    void reset() {
+        events_list.clear();
+        sorted = true;
+    }
+
+    /// Records the events matched by row `row_num`.
+    /// `win` and `mode` overwrite the stored window and mode on every call.
+    /// The timestamp is read from column 2 and condition `i` from column `3 + i`.
+    void add(const IColumn** arg_columns, ssize_t row_num, int64_t win, WindowFunnelMode mode) {
+        window = win;
+        window_funnel_mode = mode;
+
+        // get_data() returns DateV2Value; convert to packed UInt64
+        auto timestamp = assert_cast&>(*arg_columns[2])
+                                 .get_data()[row_num]
+                                 .to_date_int_val();
+
+        // Iterate from last event to first (reverse order).
+        // Sorting and merging compare timestamps only and are stable, so events
+        // of one row stay in descending event_index order. That keeps a row that
+        // satisfies several conditions from advancing the funnel more than one
+        // level by itself.
+        for (int i = event_count - 1; i >= 0; --i) {
+            auto event_val =
+                    assert_cast(*arg_columns[3 + i]).get_data()[row_num];
+            if (event_val) {
+                TimestampEvent new_event {timestamp, cast_set(i + 1)};
+                if (sorted && !events_list.empty()) {
+                    // Timestamp-only check: equal timestamps do not break the order,
+                    // whatever their event indexes are.
+                    sorted = events_list.back().first <= new_event.first;
+                }
+                events_list.emplace_back(new_event);
+            }
+        }
+    }
+
+    /// Brings `events_list` into timestamp order if it is not already.
+    void sort() {
+        if (!sorted) {
+            // Same timestamp-only, stable ordering as merge_events_list().
+            std::stable_sort(std::begin(events_list), std::end(events_list),
+                             [](const TimestampEvent& lhs, const TimestampEvent& rhs) {
+                                 return lhs.first < rhs.first;
+                             });
+            sorted = true;
+        }
+    }
+
+    /// Folds the events of `other` into this state.
+    /// event_count, window and mode are taken from `other` only when this state
+    /// has not set them yet (event_count <= 0, window <= 0, mode == INVALID).
+    void merge(const WindowFunnelStateV2& other) {
+        if (other.events_list.empty()) {
+            return;
+        }
+
+        if (events_list.empty()) {
+            // Nothing to merge with: adopt the other list and its sorted flag as is.
+            events_list = other.events_list;
+            sorted = other.sorted;
+        } else {
+            const auto prefix_size = events_list.size();
+            events_list.insert(std::end(events_list), std::begin(other.events_list),
+                               std::end(other.events_list));
+            merge_events_list(events_list, prefix_size, sorted, other.sorted);
+            sorted = true;
+        }
+
+        event_count = event_count > 0 ? event_count : other.event_count;
+        window = window > 0 ? window : other.window;
+        window_funnel_mode = window_funnel_mode == WindowFunnelMode::INVALID
+                                     ? other.window_funnel_mode
+                                     : window_funnel_mode;
+    }
+
+    /// Serializes the state: event_count, window, mode, sorted flag and event
+    /// count as var-ints, followed by the raw bytes of every (timestamp, index) pair.
+    void write(BufferWritable& out) const {
+        write_var_int(event_count, out);
+        write_var_int(window, out);
+        write_var_int(static_cast>(window_funnel_mode),
+                      out);
+        write_var_int(sorted ? 1 : 0, out);
+        write_var_int(cast_set(events_list.size()), out);
+        for (const auto& [ts, idx] : events_list) {
+            // Use fixed-size binary write for timestamp (8 bytes) and event_idx (1 byte).
+            // This is more efficient than VarInt for these fixed-width values.
+            out.write(reinterpret_cast(&ts), sizeof(ts));
+            out.write(reinterpret_cast(&idx), sizeof(idx));
+        }
+    }
+
+    /// Restores a state written by write(), reading the fields in the same order.
+    void read(BufferReadable& in) {
+        Int64 tmp = 0;
+        read_var_int(tmp, in);
+        event_count = cast_set(tmp);
+
+        read_var_int(window, in);
+
+        read_var_int(tmp, in);
+        window_funnel_mode = static_cast(tmp);
+
+        read_var_int(tmp, in);
+        sorted = (tmp != 0);
+
+        Int64 size = 0;
+        read_var_int(size, in);
+        events_list.clear();
+        // NOTE(review): `size` comes straight from the buffer and is not range-checked
+        // before resize() - confirm the buffer is always produced by write().
+        events_list.resize(size);
+        for (Int64 i = 0; i < size; ++i) {
+            in.read(reinterpret_cast(&events_list[i].first), sizeof(events_list[i].first));
+            in.read(reinterpret_cast(&events_list[i].second), sizeof(events_list[i].second));
+        }
+    }
+
+    using DateValueType = DateV2Value;
+
+    /// Reconstruct DateV2Value from packed UInt64.
+    static DateValueType _ts_from_int(UInt64 packed) { return DateValueType(packed); }
+
+    /// Check if `current_ts` is within `window` seconds of `base_ts`.
+    /// Both are packed UInt64 from DateV2Value::to_date_int_val().
+    bool _within_window(UInt64 base_ts, UInt64 current_ts) const {
+        DateValueType end_ts = _ts_from_int(base_ts);
+        TimeInterval interval(SECOND, window, false);
+        // NOTE(review): whatever date_add_interval reports back is not inspected here.
+        end_ts.template date_add_interval(interval);
+        return current_ts <= end_ts.to_date_int_val();
+    }
+
+    /// Track (first_timestamp, last_timestamp) for each event level in a chain.
+    /// Uses packed UInt64 values; 0 means unset.
+    struct TimestampPair {
+        UInt64 first_ts = 0;
+        UInt64 last_ts = 0;
+        // A level counts as matched once first_ts is non-zero; 0 is the "unset" marker.
+        bool has_value() const { return first_ts != 0; }
+        void reset() {
+            first_ts = 0;
+            last_ts = 0;
+        }
+    };
+
+    /// Computes the funnel result for the collected events.
+    ///
+    /// Returns 0 when there are no conditions or no stored events. Otherwise the
+    /// work is delegated to the routine for the stored mode.
+    /// Throws INVALID_ARGUMENT when `window` is negative and INTERNAL_ERROR when
+    /// the mode is none of DEFAULT, INCREASE, DEDUPLICATION or FIXED.
+    /// Events are walked in the order they are stored in `events_list`.
+    /// NOTE(review): this function does not sort; presumably the caller has called
+    /// sort() first - confirm.
+    int get() const {
+        if (event_count == 0 || events_list.empty()) {
+            return 0;
+        }
+        // NOTE(review): the message says "positive", but only negative values are
+        // rejected; a window of 0 passes this check.
+        if (window < 0) {
+            throw Exception(ErrorCode::INVALID_ARGUMENT,
+                            "the sliding time window must be a positive integer, but got: {}",
+                            window);
+        }
+
+        switch (window_funnel_mode) {
+        case WindowFunnelMode::DEFAULT:
+            return _get_default(false);
+        case WindowFunnelMode::INCREASE:
+            return _get_default(true);
+        case WindowFunnelMode::DEDUPLICATION:
+            return _get_deduplication();
+        case WindowFunnelMode::FIXED:
+            return _get_fixed();
+        default:
+            throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid window_funnel mode");
+        }
+    }
+
+private:
+    /// DEFAULT and INCREASE mode: O(N) single-pass algorithm.
+    /// Uses events_timestamp array to track the (first, last) timestamps for each level.
+    /// For each event in sorted order:
+    ///   - If it's event 0, start a new potential chain
+    ///   - If its predecessor level has been matched and within time window, extend the chain
+    /// With `increase_mode` set, the new event must also be strictly later than the
+    /// last timestamp recorded for the predecessor level.
+    /// Returns the highest matched level (1-based), or 0 when level 0 never matched.
+    int _get_default(bool increase_mode) const {
+        std::vector events_timestamp(event_count);
+
+        for (const auto& [ts, raw_idx] : events_list) {
+            // Stored indexes are 1-based; convert to a 0-based level.
+            int event_idx = raw_idx - 1;
+
+            if (event_idx == 0) {
+                // Every occurrence of the first condition overwrites level 0.
+                events_timestamp[0] = {ts, ts};
+            } else if (events_timestamp[event_idx - 1].has_value()) {
+                bool matched = _within_window(events_timestamp[event_idx - 1].first_ts, ts);
+                if (increase_mode) {
+                    matched = matched && events_timestamp[event_idx - 1].last_ts < ts;
+                }
+                if (matched) {
+                    // Carry the chain's start timestamp forward to this level.
+                    events_timestamp[event_idx] = {events_timestamp[event_idx - 1].first_ts, ts};
+                    if (event_idx + 1 == event_count) {
+                        // Last level reached: nothing can improve on this.
+                        return event_count;
+                    }
+                }
+            }
+        }
+
+        // No full match: report the deepest level that was reached.
+        for (int event = event_count; event > 0; --event) {
+            if (events_timestamp[event - 1].has_value()) {
+                return event;
+            }
+        }
+        return 0;
+    }
+
+    /// DEDUPLICATION mode: if a previously matched event level appears again,
+    /// the current chain is terminated and max_level is updated.
+    /// This preserves V1 semantics where duplicate events break the chain.
+    /// `curr_level` is the deepest level of the chain in progress and `max_level`
+    /// the deepest level of any terminated chain; both are 0-based, -1 meaning none.
+    int _get_deduplication() const {
+        std::vector events_timestamp(event_count);
+        int max_level = -1;
+        int curr_level = -1;
+
+        for (const auto& [ts, raw_idx] : events_list) {
+            // Stored indexes are 1-based; convert to a 0-based level.
+            int event_idx = raw_idx - 1;
+
+            if (event_idx == 0) {
+                // Duplicate of event 0: terminate current chain first
+                if (events_timestamp[0].has_value()) {
+                    if (curr_level > max_level) {
+                        max_level = curr_level;
+                    }
+                    _eliminate_chain(curr_level, events_timestamp);
+                }
+                // Start a new chain
+                events_timestamp[0] = {ts, ts};
+                curr_level = 0;
+            } else if (events_timestamp[event_idx].has_value()) {
+                // Duplicate event detected: this level was already matched
+                if (curr_level > max_level) {
+                    max_level = curr_level;
+                }
+                // Eliminate current chain
+                _eliminate_chain(curr_level, events_timestamp);
+            } else if (events_timestamp[event_idx - 1].has_value()) {
+                // Predecessor matched and this level is new: try to extend the chain.
+                if (_promote_level(events_timestamp, ts, event_idx, curr_level, false)) {
+                    return event_count;
+                }
+            }
+        }
+
+        // Levels are 0-based, so the reported result is the deeper level plus one.
+        if (curr_level > max_level) {
+            return curr_level + 1;
+        }
+        return max_level + 1;
+    }
+
+    /// FIXED mode (StarRocks-style semantics): if a matched event appears whose
+    /// predecessor level has NOT been matched, the chain is broken (event level jumped).
+    /// Note: V2 semantics differ from V1. V1 checks physical row adjacency;
+    /// V2 checks event level continuity (unmatched rows don't break the chain).
+    // `curr_level` is the deepest level of the chain in progress and `max_level`
+    // the deepest level of any chain that was broken off; both are 0-based, -1 meaning none.
+    // Returns the deeper of the two plus one, or event_count as soon as the last level matches.
+    int _get_fixed() const {
+        std::vector events_timestamp(event_count);
+        int max_level = -1;
+        int curr_level = -1;
+        // Becomes true at the first occurrence of event 0 and is never cleared.
+        // NOTE(review): while it is false no chain exists, so curr_level is -1 and the
+        // `curr_level >= 0` guard below would already make that branch a no-op; the
+        // flag looks redundant - confirm before removing.
+        bool first_event = false;
+
+        for (const auto& [ts, raw_idx] : events_list) {
+            // Stored indexes are 1-based; convert to a 0-based level.
+            int event_idx = raw_idx - 1;
+
+            if (event_idx == 0) {
+                // Save current chain before starting a new one
+                if (events_timestamp[0].has_value()) {
+                    if (curr_level > max_level) {
+                        max_level = curr_level;
+                    }
+                    _eliminate_chain(curr_level, events_timestamp);
+                }
+                events_timestamp[0] = {ts, ts};
+                curr_level = 0;
+                first_event = true;
+            } else if (first_event && !events_timestamp[event_idx - 1].has_value()) {
+                // Event level jumped: predecessor was not matched
+                if (curr_level >= 0) {
+                    if (curr_level > max_level) {
+                        max_level = curr_level;
+                    }
+                    _eliminate_chain(curr_level, events_timestamp);
+                }
+            } else if (events_timestamp[event_idx - 1].has_value()) {
+                // Predecessor matched: try to extend the chain to this level.
+                if (_promote_level(events_timestamp, ts, event_idx, curr_level, false)) {
+                    return event_count;
+                }
+            }
+        }
+
+        if (curr_level > max_level) {
+            return curr_level + 1;
+        }
+        return max_level + 1;
+    }
+
+    /// Clear the current event chain back to the beginning.
+    /// Resets every level from `curr_level` down to 0 and leaves `curr_level` at -1.
+    static void _eliminate_chain(int& curr_level, std::vector& events_timestamp) {
+        for (; curr_level >= 0; --curr_level) {
+            events_timestamp[curr_level].reset();
+        }
+    }
+
+    /// Try to promote the chain to the next level.
+    /// Returns true if we've matched all events (early termination).
+    // The event at `ts` matches level `event_idx` when it lies within the window
+    // measured from the chain's first timestamp. With `increase_mode` set it must
+    // also be strictly later than the predecessor's last timestamp.
+    // On a match the level is recorded and `curr_level` is raised if needed.
+    // The callers visible in this file all pass `increase_mode` = false.
+    bool _promote_level(std::vector& events_timestamp, UInt64 ts, int event_idx,
+                        int& curr_level, bool increase_mode) const {
+        bool matched = _within_window(events_timestamp[event_idx - 1].first_ts, ts);
+        if (increase_mode) {
+            matched = matched && events_timestamp[event_idx - 1].last_ts < ts;
+        }
+        if (matched) {
+            // Carry the chain's start timestamp forward to this level.
+            events_timestamp[event_idx] = {events_timestamp[event_idx - 1].first_ts, ts};
+            if (event_idx > curr_level) {
+                curr_level = event_idx;
+            }
+            if (event_idx + 1 == event_count) {
+                return true;
+            }
+        }
+        return false;
+    }
+};
+
+/// Aggregate function `window_funnel_v2`, backed by WindowFunnelStateV2.
+///
+/// Column layout as read by add(): column 0 is the window, column 1 the mode
+/// string; the state's own add() reads the timestamp from column 2 and the
+/// event conditions from column 3 onwards.
+class AggregateFunctionWindowFunnelV2
+        : public IAggregateFunctionDataHelper,
+          MultiExpression,
+          NullableAggregateFunction {
+public:
+    AggregateFunctionWindowFunnelV2(const DataTypes& argument_types_)
+            : IAggregateFunctionDataHelper(
+                      argument_types_) {}
+
+    /// Constructs the state in `place`; the number of event conditions is the
+    /// argument count minus the three leading arguments.
+    void create(AggregateDataPtr __restrict place) const override {
+        new (place) WindowFunnelStateV2(
+                cast_set(IAggregateFunction::get_argument_types().size() - 3));
+    }
+
+    String get_name() const override { return "window_funnel_v2"; }
+
+    DataTypePtr get_return_type() const override { return std::make_shared(); }
+
+    void reset(AggregateDataPtr __restrict place) const override { this->data(place).reset(); }
+
+    /// Feeds row `row_num` into the state.
+    /// The mode string is copied with to_string() and parsed again for every row.
+    void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num,
+             Arena&) const override {
+        const auto& win = assert_cast(*columns[0]).get_data()[row_num];
+        StringRef mode = columns[1]->get_data_at(row_num);
+        this->data(place).add(columns, row_num, win,
+                              string_to_window_funnel_mode(mode.to_string()));
+    }
+
+    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
+               Arena&) const override {
+        this->data(place).merge(this->data(rhs));
+    }
+
+    void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override {
+        this->data(place).write(buf);
+    }
+
+    void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
+                     Arena&) const override {
+        this->data(place).read(buf);
+    }
+
+    /// Sorts the state's events, evaluates the funnel and appends the result to `to`.
+    /// NOTE(review): the const qualifier on `place` is cast away so that sort() can
+    /// reorder the stored events; the state is therefore modified through a
+    /// pointer-to-const - confirm this is acceptable for all callers.
+    void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override {
+        this->data(const_cast(place)).sort();
+        assert_cast(to).get_data().push_back(
+                IAggregateFunctionDataHelper::data(place)
+                        .get());
+    }
+
+protected:
+    using IAggregateFunction::version;
+};
+
+} // namespace doris
+
+#include "common/compile_check_end.h"
diff --git a/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp b/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp
new file mode 100644
index 00000000000000..fc3592bb322ba5
--- /dev/null
+++ b/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp
@@ -0,0 +1,741 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +#include +#include +#include +#include + +#include +#include + +#include "core/column/column_string.h" +#include "core/column/column_vector.h" +#include "core/data_type/data_type_date_or_datetime_v2.h" +#include "core/data_type/data_type_number.h" +#include "core/data_type/data_type_string.h" +#include "core/string_buffer.hpp" +#include "core/value/vdatetime_value.h" +#include "exprs/aggregate/aggregate_function.h" +#include "exprs/aggregate/aggregate_function_simple_factory.h" +#include "gtest/gtest_pred_impl.h" + +namespace doris { +class IColumn; +} // namespace doris + +namespace doris { + +void register_aggregate_function_window_funnel_v2(AggregateFunctionSimpleFactory& factory); + +class VWindowFunnelV2Test : public testing::Test { +public: + AggregateFunctionPtr agg_function; + + VWindowFunnelV2Test() {} + + void SetUp() { + AggregateFunctionSimpleFactory factory = AggregateFunctionSimpleFactory::instance(); + DataTypes data_types = { + std::make_shared(), std::make_shared(), + std::make_shared(), std::make_shared(), + std::make_shared(), std::make_shared(), + std::make_shared()}; + agg_function = factory.get("window_funnel_v2", data_types, nullptr, false, + BeExecVersionManager::get_newest_version()); + EXPECT_NE(agg_function, nullptr); + } + + void TearDown() {} + + Arena arena; +}; + +TEST_F(VWindowFunnelV2Test, testEmpty) { + std::unique_ptr memory(new char[agg_function->size_of_data()]); + AggregateDataPtr place = memory.get(); + agg_function->create(place); + + ColumnString buf; + VectorBufferWriter buf_writer(buf); + agg_function->serialize(place, buf_writer); + buf_writer.commit(); + LOG(INFO) << "buf size : " << buf.size(); + VectorBufferReader buf_reader(buf.get_data_at(0)); + agg_function->deserialize(place, buf_reader, arena); + + std::unique_ptr memory2(new char[agg_function->size_of_data()]); + AggregateDataPtr place2 = memory2.get(); + agg_function->create(place2); + + agg_function->merge(place, place2, arena); + ColumnInt32 column_result; 
+ agg_function->insert_result_into(place, column_result); + EXPECT_EQ(column_result.get_data()[0], 0); + + ColumnInt32 column_result2; + agg_function->insert_result_into(place2, column_result2); + EXPECT_EQ(column_result2.get_data()[0], 0); + + agg_function->destroy(place); + agg_function->destroy(place2); +} + +TEST_F(VWindowFunnelV2Test, testSerialize) { + const int NUM_CONDS = 4; + auto column_mode = ColumnString::create(); + for (int i = 0; i < NUM_CONDS; i++) { + column_mode->insert(Field::create_field("default")); + } + + auto column_timestamp = ColumnDateTimeV2::create(); + for (int i = 0; i < NUM_CONDS; i++) { + VecDateTimeValue time_value; + time_value.unchecked_set_time(2022, 2, 28, 0, 0, i); + auto dtv2 = time_value.to_datetime_v2(); + column_timestamp->insert_data((char*)&dtv2, 0); + } + auto column_event1 = ColumnUInt8::create(); + column_event1->insert(Field::create_field(1)); + column_event1->insert(Field::create_field(0)); + column_event1->insert(Field::create_field(0)); + column_event1->insert(Field::create_field(0)); + + auto column_event2 = ColumnUInt8::create(); + column_event2->insert(Field::create_field(0)); + column_event2->insert(Field::create_field(1)); + column_event2->insert(Field::create_field(0)); + column_event2->insert(Field::create_field(0)); + + auto column_event3 = ColumnUInt8::create(); + column_event3->insert(Field::create_field(0)); + column_event3->insert(Field::create_field(0)); + column_event3->insert(Field::create_field(1)); + column_event3->insert(Field::create_field(0)); + + auto column_event4 = ColumnUInt8::create(); + column_event4->insert(Field::create_field(0)); + column_event4->insert(Field::create_field(0)); + column_event4->insert(Field::create_field(0)); + column_event4->insert(Field::create_field(1)); + + auto column_window = ColumnInt64::create(); + for (int i = 0; i < NUM_CONDS; i++) { + column_window->insert(Field::create_field(2)); + } + + std::unique_ptr memory(new char[agg_function->size_of_data()]); + 
AggregateDataPtr place = memory.get(); + agg_function->create(place); + const IColumn* column[7] = {column_window.get(), column_mode.get(), column_timestamp.get(), + column_event1.get(), column_event2.get(), column_event3.get(), + column_event4.get()}; + for (int i = 0; i < NUM_CONDS; i++) { + agg_function->add(place, column, i, arena); + } + + ColumnInt32 column_result; + agg_function->insert_result_into(place, column_result); + EXPECT_EQ(column_result.get_data()[0], 3); + + ColumnString buf; + VectorBufferWriter buf_writer(buf); + agg_function->serialize(place, buf_writer); + buf_writer.commit(); + agg_function->destroy(place); + + std::unique_ptr memory2(new char[agg_function->size_of_data()]); + AggregateDataPtr place2 = memory2.get(); + agg_function->create(place2); + + VectorBufferReader buf_reader(buf.get_data_at(0)); + agg_function->deserialize(place2, buf_reader, arena); + + ColumnInt32 column_result2; + agg_function->insert_result_into(place2, column_result2); + EXPECT_EQ(column_result2.get_data()[0], 3); + agg_function->destroy(place2); +} + +TEST_F(VWindowFunnelV2Test, testDefaultSortedNoMerge) { + const int NUM_CONDS = 4; + auto column_mode = ColumnString::create(); + for (int i = 0; i < NUM_CONDS; i++) { + column_mode->insert(Field::create_field("default")); + } + auto column_timestamp = ColumnDateTimeV2::create(); + for (int i = 0; i < NUM_CONDS; i++) { + VecDateTimeValue time_value; + time_value.unchecked_set_time(2022, 2, 28, 0, 0, i); + auto dtv2 = time_value.to_datetime_v2(); + column_timestamp->insert_data((char*)&dtv2, 0); + } + auto column_event1 = ColumnUInt8::create(); + column_event1->insert(Field::create_field(1)); + column_event1->insert(Field::create_field(0)); + column_event1->insert(Field::create_field(0)); + column_event1->insert(Field::create_field(0)); + + auto column_event2 = ColumnUInt8::create(); + column_event2->insert(Field::create_field(0)); + column_event2->insert(Field::create_field(1)); + 
column_event2->insert(Field::create_field(0)); + column_event2->insert(Field::create_field(0)); + + auto column_event3 = ColumnUInt8::create(); + column_event3->insert(Field::create_field(0)); + column_event3->insert(Field::create_field(0)); + column_event3->insert(Field::create_field(1)); + column_event3->insert(Field::create_field(0)); + + auto column_event4 = ColumnUInt8::create(); + column_event4->insert(Field::create_field(0)); + column_event4->insert(Field::create_field(0)); + column_event4->insert(Field::create_field(0)); + column_event4->insert(Field::create_field(1)); + + for (int win = 0; win < NUM_CONDS + 1; win++) { + auto column_window = ColumnInt64::create(); + for (int i = 0; i < NUM_CONDS; i++) { + column_window->insert(Field::create_field(win)); + } + + std::unique_ptr memory(new char[agg_function->size_of_data()]); + AggregateDataPtr place = memory.get(); + agg_function->create(place); + const IColumn* column[7] = {column_window.get(), column_mode.get(), + column_timestamp.get(), column_event1.get(), + column_event2.get(), column_event3.get(), + column_event4.get()}; + for (int i = 0; i < NUM_CONDS; i++) { + agg_function->add(place, column, i, arena); + } + + ColumnInt32 column_result; + agg_function->insert_result_into(place, column_result); + EXPECT_EQ(column_result.get_data()[0], + win < 0 ? 1 : (win < NUM_CONDS ? 
win + 1 : NUM_CONDS)); + agg_function->destroy(place); + } +} + +TEST_F(VWindowFunnelV2Test, testDefaultSortedMerge) { + const int NUM_CONDS = 4; + auto column_mode = ColumnString::create(); + for (int i = 0; i < NUM_CONDS; i++) { + column_mode->insert(Field::create_field("default")); + } + auto column_timestamp = ColumnDateTimeV2::create(); + for (int i = 0; i < NUM_CONDS; i++) { + VecDateTimeValue time_value; + time_value.unchecked_set_time(2022, 2, 28, 0, 0, i); + auto dtv2 = time_value.to_datetime_v2(); + column_timestamp->insert_data((char*)&dtv2, 0); + } + auto column_event1 = ColumnUInt8::create(); + column_event1->insert(Field::create_field(1)); + column_event1->insert(Field::create_field(0)); + column_event1->insert(Field::create_field(0)); + column_event1->insert(Field::create_field(0)); + + auto column_event2 = ColumnUInt8::create(); + column_event2->insert(Field::create_field(0)); + column_event2->insert(Field::create_field(1)); + column_event2->insert(Field::create_field(0)); + column_event2->insert(Field::create_field(0)); + + auto column_event3 = ColumnUInt8::create(); + column_event3->insert(Field::create_field(0)); + column_event3->insert(Field::create_field(0)); + column_event3->insert(Field::create_field(1)); + column_event3->insert(Field::create_field(0)); + + auto column_event4 = ColumnUInt8::create(); + column_event4->insert(Field::create_field(0)); + column_event4->insert(Field::create_field(0)); + column_event4->insert(Field::create_field(0)); + column_event4->insert(Field::create_field(1)); + + for (int win = 0; win < NUM_CONDS + 1; win++) { + auto column_window = ColumnInt64::create(); + for (int i = 0; i < NUM_CONDS; i++) { + column_window->insert(Field::create_field(win)); + } + + std::unique_ptr memory(new char[agg_function->size_of_data()]); + AggregateDataPtr place = memory.get(); + agg_function->create(place); + const IColumn* column[7] = {column_window.get(), column_mode.get(), + column_timestamp.get(), column_event1.get(), + 
column_event2.get(), column_event3.get(), + column_event4.get()}; + for (int i = 0; i < NUM_CONDS; i++) { + agg_function->add(place, column, i, arena); + } + + std::unique_ptr memory2(new char[agg_function->size_of_data()]); + AggregateDataPtr place2 = memory2.get(); + agg_function->create(place2); + + agg_function->merge(place2, place, arena); + ColumnInt32 column_result; + agg_function->insert_result_into(place2, column_result); + EXPECT_EQ(column_result.get_data()[0], + win < 0 ? 1 : (win < NUM_CONDS ? win + 1 : NUM_CONDS)); + agg_function->destroy(place); + agg_function->destroy(place2); + } +} + +TEST_F(VWindowFunnelV2Test, testDefaultReverseSortedNoMerge) { + const int NUM_CONDS = 4; + auto column_mode = ColumnString::create(); + for (int i = 0; i < NUM_CONDS; i++) { + column_mode->insert(Field::create_field("default")); + } + auto column_timestamp = ColumnDateTimeV2::create(); + for (int i = 0; i < NUM_CONDS; i++) { + VecDateTimeValue time_value; + time_value.unchecked_set_time(2022, 2, 28, 0, 0, NUM_CONDS - i); + auto dtv2 = time_value.to_datetime_v2(); + column_timestamp->insert_data((char*)&dtv2, 0); + } + auto column_event1 = ColumnUInt8::create(); + column_event1->insert(Field::create_field(0)); + column_event1->insert(Field::create_field(0)); + column_event1->insert(Field::create_field(0)); + column_event1->insert(Field::create_field(1)); + + auto column_event2 = ColumnUInt8::create(); + column_event2->insert(Field::create_field(0)); + column_event2->insert(Field::create_field(0)); + column_event2->insert(Field::create_field(1)); + column_event2->insert(Field::create_field(0)); + + auto column_event3 = ColumnUInt8::create(); + column_event3->insert(Field::create_field(0)); + column_event3->insert(Field::create_field(1)); + column_event3->insert(Field::create_field(0)); + column_event3->insert(Field::create_field(0)); + + auto column_event4 = ColumnUInt8::create(); + column_event4->insert(Field::create_field(1)); + 
column_event4->insert(Field::create_field(0)); + column_event4->insert(Field::create_field(0)); + column_event4->insert(Field::create_field(0)); + + for (int win = 0; win < NUM_CONDS + 1; win++) { + auto column_window = ColumnInt64::create(); + for (int i = 0; i < NUM_CONDS; i++) { + column_window->insert(Field::create_field(win)); + } + + std::unique_ptr memory(new char[agg_function->size_of_data()]); + AggregateDataPtr place = memory.get(); + agg_function->create(place); + const IColumn* column[7] = {column_window.get(), column_mode.get(), + column_timestamp.get(), column_event1.get(), + column_event2.get(), column_event3.get(), + column_event4.get()}; + for (int i = 0; i < NUM_CONDS; i++) { + agg_function->add(place, column, i, arena); + } + + LOG(INFO) << "win " << win; + ColumnInt32 column_result; + agg_function->insert_result_into(place, column_result); + EXPECT_EQ(column_result.get_data()[0], + win < 0 ? 1 : (win < NUM_CONDS ? win + 1 : NUM_CONDS)); + agg_function->destroy(place); + } +} + +TEST_F(VWindowFunnelV2Test, testDefaultReverseSortedMerge) { + const int NUM_CONDS = 4; + auto column_mode = ColumnString::create(); + for (int i = 0; i < NUM_CONDS; i++) { + column_mode->insert(Field::create_field("default")); + } + auto column_timestamp = ColumnDateTimeV2::create(); + for (int i = 0; i < NUM_CONDS; i++) { + VecDateTimeValue time_value; + time_value.unchecked_set_time(2022, 2, 28, 0, 0, NUM_CONDS - i); + auto dtv2 = time_value.to_datetime_v2(); + column_timestamp->insert_data((char*)&dtv2, 0); + } + auto column_event1 = ColumnUInt8::create(); + column_event1->insert(Field::create_field(0)); + column_event1->insert(Field::create_field(0)); + column_event1->insert(Field::create_field(0)); + column_event1->insert(Field::create_field(1)); + + auto column_event2 = ColumnUInt8::create(); + column_event2->insert(Field::create_field(0)); + column_event2->insert(Field::create_field(0)); + column_event2->insert(Field::create_field(1)); + 
column_event2->insert(Field::create_field(0)); + + auto column_event3 = ColumnUInt8::create(); + column_event3->insert(Field::create_field(0)); + column_event3->insert(Field::create_field(1)); + column_event3->insert(Field::create_field(0)); + column_event3->insert(Field::create_field(0)); + + auto column_event4 = ColumnUInt8::create(); + column_event4->insert(Field::create_field(1)); + column_event4->insert(Field::create_field(0)); + column_event4->insert(Field::create_field(0)); + column_event4->insert(Field::create_field(0)); + + for (int win = 0; win < NUM_CONDS + 1; win++) { + auto column_window = ColumnInt64::create(); + for (int i = 0; i < NUM_CONDS; i++) { + column_window->insert(Field::create_field(win)); + } + + std::unique_ptr memory(new char[agg_function->size_of_data()]); + AggregateDataPtr place = memory.get(); + agg_function->create(place); + const IColumn* column[7] = {column_window.get(), column_mode.get(), + column_timestamp.get(), column_event1.get(), + column_event2.get(), column_event3.get(), + column_event4.get()}; + for (int i = 0; i < NUM_CONDS; i++) { + agg_function->add(place, column, i, arena); + } + + std::unique_ptr memory2(new char[agg_function->size_of_data()]); + AggregateDataPtr place2 = memory2.get(); + agg_function->create(place2); + + agg_function->merge(place2, place, arena); + ColumnInt32 column_result; + agg_function->insert_result_into(place2, column_result); + EXPECT_EQ(column_result.get_data()[0], + win < 0 ? 1 : (win < NUM_CONDS ? win + 1 : NUM_CONDS)); + agg_function->destroy(place); + agg_function->destroy(place2); + } +} + +// Rows matching no event are expected to be skipped by V2 (not stored) without changing the result. +// NOTE(review): only the returned funnel level is asserted here; stored state is not inspected.
+TEST_F(VWindowFunnelV2Test, testOnlyMatchedEventsStored) { + const int NUM_ROWS = 6; + auto column_mode = ColumnString::create(); + for (int i = 0; i < NUM_ROWS; i++) { + column_mode->insert(Field::create_field("default")); + } + auto column_timestamp = ColumnDateTimeV2::create(); + for (int i = 0; i < NUM_ROWS; i++) { + VecDateTimeValue time_value; + time_value.unchecked_set_time(2022, 2, 28, 0, 0, i); + auto dtv2 = time_value.to_datetime_v2(); + column_timestamp->insert_data((char*)&dtv2, 0); + } + // 4 events, but rows 2 and 4 (0-indexed) match nothing + // Row 0: event1=true + // Row 1: event2=true + // Row 2: all false (no match) + // Row 3: event3=true + // Row 4: all false (no match) + // Row 5: event4=true + auto column_event1 = ColumnUInt8::create(); + column_event1->insert(Field::create_field(1)); + column_event1->insert(Field::create_field(0)); + column_event1->insert(Field::create_field(0)); + column_event1->insert(Field::create_field(0)); + column_event1->insert(Field::create_field(0)); + column_event1->insert(Field::create_field(0)); + + auto column_event2 = ColumnUInt8::create(); + column_event2->insert(Field::create_field(0)); + column_event2->insert(Field::create_field(1)); + column_event2->insert(Field::create_field(0)); + column_event2->insert(Field::create_field(0)); + column_event2->insert(Field::create_field(0)); + column_event2->insert(Field::create_field(0)); + + auto column_event3 = ColumnUInt8::create(); + column_event3->insert(Field::create_field(0)); + column_event3->insert(Field::create_field(0)); + column_event3->insert(Field::create_field(0)); + column_event3->insert(Field::create_field(1)); + column_event3->insert(Field::create_field(0)); + column_event3->insert(Field::create_field(0)); + + auto column_event4 = ColumnUInt8::create(); + column_event4->insert(Field::create_field(0)); + column_event4->insert(Field::create_field(0)); + column_event4->insert(Field::create_field(0)); + column_event4->insert(Field::create_field(0)); + 
column_event4->insert(Field::create_field(0)); + column_event4->insert(Field::create_field(1)); + + auto column_window = ColumnInt64::create(); + for (int i = 0; i < NUM_ROWS; i++) { + column_window->insert(Field::create_field(10)); + } + + std::unique_ptr memory(new char[agg_function->size_of_data()]); + AggregateDataPtr place = memory.get(); + agg_function->create(place); + const IColumn* column[7] = {column_window.get(), column_mode.get(), column_timestamp.get(), + column_event1.get(), column_event2.get(), column_event3.get(), + column_event4.get()}; + for (int i = 0; i < NUM_ROWS; i++) { + agg_function->add(place, column, i, arena); + } + + ColumnInt32 column_result; + agg_function->insert_result_into(place, column_result); + // All 4 events matched in order within the window + EXPECT_EQ(column_result.get_data()[0], 4); + agg_function->destroy(place); +} + +// Test INCREASE mode: timestamps must be strictly increasing +TEST_F(VWindowFunnelV2Test, testIncreaseMode) { + const int NUM_ROWS = 4; + auto column_mode = ColumnString::create(); + for (int i = 0; i < NUM_ROWS; i++) { + column_mode->insert(Field::create_field("increase")); + } + auto column_timestamp = ColumnDateTimeV2::create(); + // Events 2 and 3 have the same timestamp + VecDateTimeValue tv0, tv1, tv2, tv3; + tv0.unchecked_set_time(2022, 2, 28, 0, 0, 0); + tv1.unchecked_set_time(2022, 2, 28, 0, 0, 1); + tv2.unchecked_set_time(2022, 2, 28, 0, 0, 1); // same as tv1 + tv3.unchecked_set_time(2022, 2, 28, 0, 0, 3); + auto dtv2_0 = tv0.to_datetime_v2(); + auto dtv2_1 = tv1.to_datetime_v2(); + auto dtv2_2 = tv2.to_datetime_v2(); + auto dtv2_3 = tv3.to_datetime_v2(); + column_timestamp->insert_data((char*)&dtv2_0, 0); + column_timestamp->insert_data((char*)&dtv2_1, 0); + column_timestamp->insert_data((char*)&dtv2_2, 0); + column_timestamp->insert_data((char*)&dtv2_3, 0); + + auto column_event1 = ColumnUInt8::create(); + column_event1->insert(Field::create_field(1)); + 
column_event1->insert(Field::create_field(0)); + column_event1->insert(Field::create_field(0)); + column_event1->insert(Field::create_field(0)); + + auto column_event2 = ColumnUInt8::create(); + column_event2->insert(Field::create_field(0)); + column_event2->insert(Field::create_field(1)); + column_event2->insert(Field::create_field(0)); + column_event2->insert(Field::create_field(0)); + + auto column_event3 = ColumnUInt8::create(); + column_event3->insert(Field::create_field(0)); + column_event3->insert(Field::create_field(0)); + column_event3->insert(Field::create_field(1)); + column_event3->insert(Field::create_field(0)); + + auto column_event4 = ColumnUInt8::create(); + column_event4->insert(Field::create_field(0)); + column_event4->insert(Field::create_field(0)); + column_event4->insert(Field::create_field(0)); + column_event4->insert(Field::create_field(1)); + + auto column_window = ColumnInt64::create(); + for (int i = 0; i < NUM_ROWS; i++) { + column_window->insert(Field::create_field(10)); + } + + std::unique_ptr memory(new char[agg_function->size_of_data()]); + AggregateDataPtr place = memory.get(); + agg_function->create(place); + const IColumn* column[7] = {column_window.get(), column_mode.get(), column_timestamp.get(), + column_event1.get(), column_event2.get(), column_event3.get(), + column_event4.get()}; + for (int i = 0; i < NUM_ROWS; i++) { + agg_function->add(place, column, i, arena); + } + + ColumnInt32 column_result; + agg_function->insert_result_into(place, column_result); + // Event 2 and 3 have same timestamp, so increase mode breaks at event 3 + // Chain: event1(t=0) -> event2(t=1), event3 has same ts as event2, so fails + // Result: 2 + EXPECT_EQ(column_result.get_data()[0], 2); + agg_function->destroy(place); +} + +// Test DEDUPLICATION mode: duplicate events break the chain +TEST_F(VWindowFunnelV2Test, testDeduplicationMode) { + const int NUM_ROWS = 5; + auto column_mode = ColumnString::create(); + for (int i = 0; i < NUM_ROWS; i++) { + 
column_mode->insert(Field::create_field("deduplication")); + } + auto column_timestamp = ColumnDateTimeV2::create(); + VecDateTimeValue tv0, tv1, tv2, tv3, tv4; + tv0.unchecked_set_time(2022, 2, 28, 0, 0, 0); + tv1.unchecked_set_time(2022, 2, 28, 0, 0, 1); + tv2.unchecked_set_time(2022, 2, 28, 0, 0, 2); + tv3.unchecked_set_time(2022, 2, 28, 0, 0, 3); + tv4.unchecked_set_time(2022, 2, 28, 0, 0, 4); + auto dtv2_0 = tv0.to_datetime_v2(); + auto dtv2_1 = tv1.to_datetime_v2(); + auto dtv2_2 = tv2.to_datetime_v2(); + auto dtv2_3 = tv3.to_datetime_v2(); + auto dtv2_4 = tv4.to_datetime_v2(); + column_timestamp->insert_data((char*)&dtv2_0, 0); + column_timestamp->insert_data((char*)&dtv2_1, 0); + column_timestamp->insert_data((char*)&dtv2_2, 0); + column_timestamp->insert_data((char*)&dtv2_3, 0); + column_timestamp->insert_data((char*)&dtv2_4, 0); + + // Events: event1, event2, event1(dup!), event3, event4 + auto column_event1 = ColumnUInt8::create(); + column_event1->insert(Field::create_field(1)); + column_event1->insert(Field::create_field(0)); + column_event1->insert(Field::create_field(1)); // duplicate + column_event1->insert(Field::create_field(0)); + column_event1->insert(Field::create_field(0)); + + auto column_event2 = ColumnUInt8::create(); + column_event2->insert(Field::create_field(0)); + column_event2->insert(Field::create_field(1)); + column_event2->insert(Field::create_field(0)); + column_event2->insert(Field::create_field(0)); + column_event2->insert(Field::create_field(0)); + + auto column_event3 = ColumnUInt8::create(); + column_event3->insert(Field::create_field(0)); + column_event3->insert(Field::create_field(0)); + column_event3->insert(Field::create_field(0)); + column_event3->insert(Field::create_field(1)); + column_event3->insert(Field::create_field(0)); + + auto column_event4 = ColumnUInt8::create(); + column_event4->insert(Field::create_field(0)); + column_event4->insert(Field::create_field(0)); + column_event4->insert(Field::create_field(0)); + 
column_event4->insert(Field::create_field(0)); + column_event4->insert(Field::create_field(1)); + + auto column_window = ColumnInt64::create(); + for (int i = 0; i < NUM_ROWS; i++) { + column_window->insert(Field::create_field(10)); + } + + std::unique_ptr memory(new char[agg_function->size_of_data()]); + AggregateDataPtr place = memory.get(); + agg_function->create(place); + const IColumn* column[7] = {column_window.get(), column_mode.get(), column_timestamp.get(), + column_event1.get(), column_event2.get(), column_event3.get(), + column_event4.get()}; + for (int i = 0; i < NUM_ROWS; i++) { + agg_function->add(place, column, i, arena); + } + + ColumnInt32 column_result; + agg_function->insert_result_into(place, column_result); + // Chain: event1(t=0) -> event2(t=1), then event1 dup at t=2 breaks chain (max_level=2) + // The duplicate event1(t=2) starts a new chain; event3(t=3) and event4(t=4) cannot extend + // it because event2 is never matched again, so the new chain stays at level 1. + // Expected result: max(2, 1) = 2 + EXPECT_EQ(column_result.get_data()[0], 2); + agg_function->destroy(place); +} + +// Test FIXED mode (StarRocks-style): event level must not jump +TEST_F(VWindowFunnelV2Test, testFixedMode) { + const int NUM_ROWS = 5; + auto column_mode = ColumnString::create(); + for (int i = 0; i < NUM_ROWS; i++) { + column_mode->insert(Field::create_field("fixed")); + } + auto column_timestamp = ColumnDateTimeV2::create(); + VecDateTimeValue tv0, tv1, tv2, tv3, tv4; + tv0.unchecked_set_time(2022, 2, 28, 0, 0, 0); + tv1.unchecked_set_time(2022, 2, 28, 0, 0, 1); + tv2.unchecked_set_time(2022, 2, 28, 0, 0, 2); + tv3.unchecked_set_time(2022, 2, 28, 0, 0, 3); + tv4.unchecked_set_time(2022, 2, 28, 0, 0, 4); + auto dtv2_0 = tv0.to_datetime_v2(); + auto dtv2_1 = tv1.to_datetime_v2(); + auto dtv2_2 = tv2.to_datetime_v2(); + auto dtv2_3 = tv3.to_datetime_v2(); + auto dtv2_4 = tv4.to_datetime_v2(); + column_timestamp->insert_data((char*)&dtv2_0, 0); +
column_timestamp->insert_data((char*)&dtv2_1, 0); + column_timestamp->insert_data((char*)&dtv2_2, 0); + column_timestamp->insert_data((char*)&dtv2_3, 0); + column_timestamp->insert_data((char*)&dtv2_4, 0); + + // Events: event1, event2, event4(jump! skips event3), event3, event4 + // In V2 fixed mode (StarRocks-style), event4 at t=2 has no predecessor (event3 not matched), + // so the chain breaks. + auto column_event1 = ColumnUInt8::create(); + column_event1->insert(Field::create_field(1)); + column_event1->insert(Field::create_field(0)); + column_event1->insert(Field::create_field(0)); + column_event1->insert(Field::create_field(0)); + column_event1->insert(Field::create_field(0)); + + auto column_event2 = ColumnUInt8::create(); + column_event2->insert(Field::create_field(0)); + column_event2->insert(Field::create_field(1)); + column_event2->insert(Field::create_field(0)); + column_event2->insert(Field::create_field(0)); + column_event2->insert(Field::create_field(0)); + + auto column_event3 = ColumnUInt8::create(); + column_event3->insert(Field::create_field(0)); + column_event3->insert(Field::create_field(0)); + column_event3->insert(Field::create_field(0)); + column_event3->insert(Field::create_field(1)); + column_event3->insert(Field::create_field(0)); + + auto column_event4 = ColumnUInt8::create(); + column_event4->insert(Field::create_field(0)); + column_event4->insert(Field::create_field(0)); + column_event4->insert(Field::create_field(1)); // jump + column_event4->insert(Field::create_field(0)); + column_event4->insert(Field::create_field(1)); + + auto column_window = ColumnInt64::create(); + for (int i = 0; i < NUM_ROWS; i++) { + column_window->insert(Field::create_field(10)); + } + + std::unique_ptr memory(new char[agg_function->size_of_data()]); + AggregateDataPtr place = memory.get(); + agg_function->create(place); + const IColumn* column[7] = {column_window.get(), column_mode.get(), column_timestamp.get(), + column_event1.get(), column_event2.get(), 
column_event3.get(), + column_event4.get()}; + for (int i = 0; i < NUM_ROWS; i++) { + agg_function->add(place, column, i, arena); + } + + ColumnInt32 column_result; + agg_function->insert_result_into(place, column_result); + // Chain: event1(t=0) -> event2(t=1), then event4(t=2) jumps (no event3 predecessor), + // chain breaks, max_level=2. + // The later rows (event3 at t=3, event4 at t=4) contain no event1, so no new chain can start. + EXPECT_EQ(column_result.get_data()[0], 2); + agg_function->destroy(place); +} + +} // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java index 9a7ba40a709eaa..31cd3860270677 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java @@ -95,6 +95,7 @@ import org.apache.doris.nereids.trees.expressions.functions.agg.Variance; import org.apache.doris.nereids.trees.expressions.functions.agg.VarianceSamp; import org.apache.doris.nereids.trees.expressions.functions.agg.WindowFunnel; +import org.apache.doris.nereids.trees.expressions.functions.agg.WindowFunnelV2; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -193,7 +194,8 @@ private BuiltinAggregateFunctions() { agg(TopNWeighted.class, "topn_weighted"), agg(Variance.class, "var_pop", "variance_pop", "variance"), agg(VarianceSamp.class, "var_samp", "variance_samp"), - agg(WindowFunnel.class, "window_funnel") + agg(WindowFunnel.class, "window_funnel_v1"), + agg(WindowFunnelV2.class, "window_funnel_v2", "window_funnel") ); ImmutableMap.Builder aggFuncNameNullableMapBuilder diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java index
fe68663ff5ff43..848d3cfd6d57db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java @@ -65,7 +65,7 @@ public WindowFunnel(boolean distinct, Expression arg0, Expression arg1, Expressi public WindowFunnel(boolean distinct, boolean alwaysNullable, Expression arg0, Expression arg1, Expression arg2, Expression arg3, Expression... varArgs) { - super("window_funnel", distinct, alwaysNullable, + super("window_funnel_v1", distinct, alwaysNullable, ExpressionUtils.mergeArguments(arg0, arg1, arg2, arg3, varArgs)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnelV2.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnelV2.java new file mode 100644 index 00000000000000..41f61c8111d75c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnelV2.java @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.expressions.functions.agg; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.types.BigIntType; +import org.apache.doris.nereids.types.BooleanType; +import org.apache.doris.nereids.types.DateTimeV2Type; +import org.apache.doris.nereids.types.DateV2Type; +import org.apache.doris.nereids.types.IntegerType; +import org.apache.doris.nereids.types.StringType; +import org.apache.doris.nereids.util.ExpressionUtils; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; + +/** + * AggregateFunction 'window_funnel_v2'. V2 implementation that only stores matched events + * as (timestamp, event_index) pairs, dramatically reducing memory usage compared to V1. + */ +public class WindowFunnelV2 extends NullableAggregateFunction + implements ExplicitlyCastableSignature { + + public static final List SIGNATURES = ImmutableList.of( + FunctionSignature.ret(IntegerType.INSTANCE) + .varArgs(BigIntType.INSTANCE, StringType.INSTANCE, DateTimeV2Type.WILDCARD, + BooleanType.INSTANCE) + + ); + + /** + * constructor with 4 or more arguments. + */ + public WindowFunnelV2(Expression arg0, Expression arg1, Expression arg2, Expression arg3, + Expression... varArgs) { + this(false, arg0, arg1, arg2, arg3, varArgs); + } + + /** + * constructor with 4 or more arguments. + */ + public WindowFunnelV2(boolean distinct, Expression arg0, Expression arg1, Expression arg2, + Expression arg3, Expression... 
varArgs) { + this(distinct, false, arg0, arg1, arg2, arg3, varArgs); + } + + public WindowFunnelV2(boolean distinct, boolean alwaysNullable, Expression arg0, Expression arg1, Expression arg2, + Expression arg3, Expression... varArgs) { + super("window_funnel_v2", distinct, alwaysNullable, + ExpressionUtils.mergeArguments(arg0, arg1, arg2, arg3, varArgs)); + } + + /** constructor for withChildren and reuse signature */ + private WindowFunnelV2(NullableAggregateFunctionParams functionParams) { + super(functionParams); + } + + @Override + public void checkLegalityBeforeTypeCoercion() { + String functionName = getName(); + if (!getArgumentType(0).isIntegerLikeType()) { + throw new AnalysisException("The window params of " + functionName + " function must be integer"); + } + if (!getArgumentType(1).isStringLikeType()) { + throw new AnalysisException("The mode params of " + functionName + " function must be string"); + } + if (!getArgumentType(2).isDateLikeType()) { + throw new AnalysisException("The 3rd param of " + functionName + " function must be DATE or DATETIME"); + } + for (int i = 3; i < arity(); i++) { + if (!getArgumentType(i).isBooleanType()) { + throw new AnalysisException("The 4th and subsequent params of " + + functionName + " function must be boolean"); + } + } + } + + @Override + public FunctionSignature computeSignature(FunctionSignature signature) { + FunctionSignature functionSignature = super.computeSignature(signature); + if (functionSignature.getArgType(2) instanceof DateV2Type) { + return functionSignature.withArgumentTypes(getArguments(), (index, originType, arg) -> + (index == 2) ? DateTimeV2Type.SYSTEM_DEFAULT : originType + ); + } + return functionSignature; + } + + /** + * withDistinctAndChildren. 
+ */ + @Override + public WindowFunnelV2 withDistinctAndChildren(boolean distinct, List children) { + Preconditions.checkArgument(children.size() >= 4); + return new WindowFunnelV2(getFunctionParams(distinct, children)); + } + + @Override + public WindowFunnelV2 withAlwaysNullable(boolean alwaysNullable) { + return new WindowFunnelV2(getAlwaysNullableFunctionParams(alwaysNullable)); + } + + @Override + public R accept(ExpressionVisitor visitor, C context) { + return visitor.visitWindowFunnelV2(this, context); + } + + @Override + public List getSignatures() { + return SIGNATURES; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java index bb4be4ffffaff6..9bd117d4c091f5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java @@ -93,6 +93,7 @@ import org.apache.doris.nereids.trees.expressions.functions.agg.Variance; import org.apache.doris.nereids.trees.expressions.functions.agg.VarianceSamp; import org.apache.doris.nereids.trees.expressions.functions.agg.WindowFunnel; +import org.apache.doris.nereids.trees.expressions.functions.agg.WindowFunnelV2; import org.apache.doris.nereids.trees.expressions.functions.combinator.ForEachCombinator; import org.apache.doris.nereids.trees.expressions.functions.combinator.MergeCombinator; import org.apache.doris.nereids.trees.expressions.functions.combinator.UnionCombinator; @@ -404,6 +405,10 @@ default R visitWindowFunnel(WindowFunnel windowFunnel, C context) { return visitNullableAggregateFunction(windowFunnel, context); } + default R visitWindowFunnelV2(WindowFunnelV2 windowFunnelV2, C context) { + return visitNullableAggregateFunction(windowFunnelV2, context); + } + default R 
visitMergeCombinator(MergeCombinator combinator, C context) { return visitAggregateFunction(combinator, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/types/AggStateType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/types/AggStateType.java index 3dd9c6ea890b30..a1e52175a62d2f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/types/AggStateType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/types/AggStateType.java @@ -55,6 +55,7 @@ public class AggStateType extends DataType { .put("var_samp", "variance_samp") .put("hist", "histogram") .put("map_agg", "map_agg_v2") + .put("window_funnel", "window_funnel_v2") .build(); private final String functionName; diff --git a/regression-test/data/nereids_p0/aggregate/window_funnel.out b/regression-test/data/nereids_p0/aggregate/window_funnel.out index f9c24637999e77..2d11eb1e957c8f 100644 --- a/regression-test/data/nereids_p0/aggregate/window_funnel.out +++ b/regression-test/data/nereids_p0/aggregate/window_funnel.out @@ -30,7 +30,7 @@ 2 -- !window_funnel_neq -- -2 +3 -- !window_funnel_default0 -- 100123 4 @@ -51,22 +51,22 @@ 100127 1 -- !window_funnel_default3 -- -100123 1 -100125 1 -100126 1 -100127 1 +100123 5 +100125 4 +100126 3 +100127 3 -- !window_funnel_default4 -- -100123 2 -100125 2 -100126 2 -100127 2 +100123 5 +100125 4 +100126 3 +100127 3 -- !window_funnel_default5 -- -100123 1 -100125 1 -100126 1 -100127 1 +100123 7 +100125 7 +100126 7 +100127 7 -- !window_funnel_default6 -- 100123 4 @@ -75,10 +75,10 @@ 100127 2 -- !window_funnel_default7 -- -100123 2 -100125 2 -100126 1 -100127 1 +100123 4 +100125 3 +100126 2 +100127 2 -- !window_funnel_default8 -- 100123 4 @@ -93,13 +93,13 @@ 100127 2 -- !window_funnel_deduplication0 -- -100123 3 +100123 2 100125 3 100126 2 100127 2 -- !window_funnel_deduplication1 -- -100123 3 +100123 2 100125 3 100126 2 100127 2 @@ -117,7 +117,7 @@ 100127 2 -- !window_funnel_fixed1 -- -100123 2 +100123 4 100125 3 100126 2 100127 
2 diff --git a/regression-test/data/nereids_p0/aggregate/window_funnel_v2.out b/regression-test/data/nereids_p0/aggregate/window_funnel_v2.out new file mode 100644 index 00000000000000..7b8346018726b7 --- /dev/null +++ b/regression-test/data/nereids_p0/aggregate/window_funnel_v2.out @@ -0,0 +1,79 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !v2_default_small_window -- +1 + +-- !v2_default_large_window -- +2 + +-- !v2_datetimev2_small -- +1 + +-- !v2_datetimev2_large -- +2 + +-- !v2_multi_user_default0 -- +100123 4 +100125 3 +100126 2 +100127 2 + +-- !v2_multi_user_default1 -- +100123 3 +100125 3 +100126 2 +100127 2 + +-- !v2_multi_user_default2 -- +100123 1 +100125 1 +100126 1 +100127 1 + +-- !v2_default_neq -- +100123 4 +100125 3 +100126 2 +100127 2 + +-- !v2_default_complex -- +100123 4 +100125 2 +100126 0 +100127 1 + +-- !v2_deduplication0 -- +100123 2 +100125 3 +100126 2 +100127 2 + +-- !v2_deduplication1 -- +100123 2 +100125 3 +100126 2 +100127 2 + +-- !v2_fixed0 -- +100123 4 +100125 3 +100126 2 +100127 2 + +-- !v2_fixed_reorder -- +2 + +-- !v2_increase0 -- +100123 3 +100125 3 +100126 2 +100127 2 + +-- !v2_increase_same_ts -- +2 + +-- !v2_fixed_vs_v1 -- +100123 4 + +-- !v2_explicit_name -- +100123 4 + diff --git a/regression-test/data/nereids_p0/sql_functions/aggregate_functions/test_aggregate_window_functions.out b/regression-test/data/nereids_p0/sql_functions/aggregate_functions/test_aggregate_window_functions.out index eb048bacbb63f4..3c3f674ce5f8df 100644 --- a/regression-test/data/nereids_p0/sql_functions/aggregate_functions/test_aggregate_window_functions.out +++ b/regression-test/data/nereids_p0/sql_functions/aggregate_functions/test_aggregate_window_functions.out @@ -807,11 +807,11 @@ shanxi windows 1 2 5 -- !agg_window_window_funnel -- -100123 2 -100123 2 -100123 2 -100123 2 -100123 2 +100123 4 +100123 4 +100123 4 +100123 4 +100123 4 100125 3 100125 3 100125 3 diff --git 
a/regression-test/suites/nereids_p0/aggregate/window_funnel_v2.groovy b/regression-test/suites/nereids_p0/aggregate/window_funnel_v2.groovy new file mode 100644 index 00000000000000..516327a72e2da9 --- /dev/null +++ b/regression-test/suites/nereids_p0/aggregate/window_funnel_v2.groovy @@ -0,0 +1,420 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
suite("window_funnel_v2") {
    // Session setup: enable the Nereids planner and disable fallback to the original planner.
    sql "SET enable_nereids_planner=true"
    sql "SET enable_fallback_to_original_planner=false"

    // ---------------------------------------------------------------------
    // Table helpers. Every scenario below starts from a freshly created
    // windowfunnel_v2_test table, so the DROP + CREATE pair lives here.
    // ---------------------------------------------------------------------

    // Recreates the three-column (xwho, xwhen, xwhat) table.
    // xwhenType is spliced into the DDL as the SQL type of the xwhen column.
    def recreateSingleUserTable = { String xwhenType ->
        sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
        sql """
            CREATE TABLE IF NOT EXISTS windowfunnel_v2_test (
                xwho varchar(50) NULL COMMENT 'xwho',
                xwhen ${xwhenType} COMMENT 'xwhen',
                xwhat int NULL COMMENT 'xwhat'
                )
            DUPLICATE KEY(xwho)
            DISTRIBUTED BY HASH(xwho) BUCKETS 3
            PROPERTIES (
            "replication_num" = "1"
            );
        """
    }

    // Recreates the five-column per-user event table
    // (user_id, event_name, event_timestamp, phone_brand, tab_num).
    def recreateEventTable = {
        sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
        sql """
            CREATE TABLE windowfunnel_v2_test(
                user_id BIGINT,
                event_name VARCHAR(64),
                event_timestamp datetime,
                phone_brand varchar(64),
                tab_num int
            ) distributed by hash(user_id) buckets 3 properties("replication_num"="1");
        """
    }

    // ===================== DEFAULT mode, single user =====================
    recreateSingleUserTable("datetime")
    sql "INSERT into windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 10:41:00', 1)"
    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 13:28:02', 2)"
    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 16:15:01', 3)"
    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 19:05:04', 4)"

    // Events 1 and 2 are 10022 seconds apart. With a 1-second window only
    // the first event is expected to match.
    order_qt_v2_default_small_window """
        select
            window_funnel(
                1,
                'default',
                t.xwhen,
                t.xwhat = 1,
                t.xwhat = 2
            ) AS level
        from windowfunnel_v2_test t;
    """
    // A 20000-second window is wide enough for both events to match.
    order_qt_v2_default_large_window """
        select
            window_funnel(
                20000,
                'default',
                t.xwhen,
                t.xwhat = 1,
                t.xwhat = 2
            ) AS level
        from windowfunnel_v2_test t;
    """

    // ===================== DateTimeV2 precision =====================
    // Same scenario as above, but xwhen is datetimev2(3) and the inserted
    // literals carry six fractional digits.
    recreateSingleUserTable("datetimev2(3)")
    sql "INSERT into windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 10:41:00.111111', 1)"
    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 13:28:02.111111', 2)"
    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 16:15:01.111111', 3)"
    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 19:05:04.111111', 4)"

    order_qt_v2_datetimev2_small """
        select
            window_funnel(
                1,
                'default',
                t.xwhen,
                t.xwhat = 1,
                t.xwhat = 2
            ) AS level
        from windowfunnel_v2_test t;
    """
    order_qt_v2_datetimev2_large """
        select
            window_funnel(
                20000,
                'default',
                t.xwhen,
                t.xwhat = 1,
                t.xwhat = 2
            ) AS level
        from windowfunnel_v2_test t;
    """

    // ===================== DEFAULT mode, several users =====================
    recreateEventTable()
    sql """
        INSERT INTO windowfunnel_v2_test VALUES
            (100123, '登录', '2022-05-14 10:01:00', 'HONOR', 1),
            (100123, '访问', '2022-05-14 10:02:00', 'HONOR', 2),
            (100123, '下单', '2022-05-14 10:04:00', "HONOR", 3),
            (100123, '付款', '2022-05-14 10:10:00', 'HONOR', 4),
            (100125, '登录', '2022-05-15 11:00:00', 'XIAOMI', 1),
            (100125, '访问', '2022-05-15 11:01:00', 'XIAOMI', 2),
            (100125, '下单', '2022-05-15 11:02:00', 'XIAOMI', 6),
            (100126, '登录', '2022-05-15 12:00:00', 'IPHONE', 1),
            (100126, '访问', '2022-05-15 12:01:00', 'HONOR', 2),
            (100127, '登录', '2022-05-15 11:30:00', 'VIVO', 1),
            (100127, '访问', '2022-05-15 11:31:00', 'VIVO', 5);
    """
    // Window of three hours.
    order_qt_v2_multi_user_default0 """
        SELECT
            user_id,
            window_funnel(3600 * 3, "default", event_timestamp, event_name = '登录', event_name = '访问', event_name = '下单', event_name = '付款') AS level
        FROM windowfunnel_v2_test
        GROUP BY user_id
        order BY user_id
    """
    // Window of five minutes.
    order_qt_v2_multi_user_default1 """
        SELECT
            user_id,
            window_funnel(300, "default", event_timestamp, event_name = '登录', event_name = '访问', event_name = '下单', event_name = '付款') AS level
        FROM windowfunnel_v2_test
        GROUP BY user_id
        order BY user_id
    """
    // Window of thirty seconds.
    order_qt_v2_multi_user_default2 """
        SELECT
            user_id,
            window_funnel(30, "default", event_timestamp, event_name = '登录', event_name = '访问', event_name = '下单', event_name = '付款') AS level
        FROM windowfunnel_v2_test
        GROUP BY user_id
        order BY user_id
    """
    // The second step is an inequality predicate. Note that '登陆' is not one
    // of the inserted event names, so this condition holds for every row.
    order_qt_v2_default_neq """
        SELECT
            user_id,
            window_funnel(3600000000, "default", event_timestamp, event_name = '登录', event_name != '登陆', event_name = '下单', event_name = '付款') AS level
        FROM windowfunnel_v2_test
        GROUP BY user_id
        order BY user_id;
    """
    // Each step combines the event name with IN / NOT IN filters on other columns.
    order_qt_v2_default_complex """
        SELECT
            user_id,
            window_funnel(3600000000, "default", event_timestamp,
                event_name = '登录' AND phone_brand in ('HONOR', 'XIAOMI', 'VIVO') AND tab_num not in (4, 5),
                event_name = '访问' AND tab_num not in (4, 5),
                event_name = '下单' AND tab_num not in (6, 7),
                event_name = '付款') AS level
        FROM windowfunnel_v2_test
        GROUP BY user_id
        order BY user_id;
    """

    // ===================== DEDUPLICATION mode =====================
    // User 100123 gets a second '登录' row plus several non-matching
    // '登录N' rows, all at 10:04:00.
    recreateEventTable()
    sql """
        INSERT INTO windowfunnel_v2_test VALUES
            (100123, '登录', '2022-05-14 10:01:00', 'HONOR', 1),
            (100123, '访问', '2022-05-14 10:02:00', 'HONOR', 2),
            (100123, '下单', '2022-05-14 10:04:00', "HONOR", 4),
            (100123, '登录', '2022-05-14 10:04:00', 'HONOR', 3),
            (100123, '登录1', '2022-05-14 10:04:00', 'HONOR', 3),
            (100123, '登录2', '2022-05-14 10:04:00', 'HONOR', 3),
            (100123, '登录3', '2022-05-14 10:04:00', 'HONOR', 3),
            (100123, '登录4', '2022-05-14 10:04:00', 'HONOR', 3),
            (100123, '登录5', '2022-05-14 10:04:00', 'HONOR', 3),
            (100123, '付款', '2022-05-14 10:10:00', 'HONOR', 4),
            (100125, '登录', '2022-05-15 11:00:00', 'XIAOMI', 1),
            (100125, '访问', '2022-05-15 11:01:00', 'XIAOMI', 2),
            (100125, '下单', '2022-05-15 11:02:00', 'XIAOMI', 6),
            (100126, '登录', '2022-05-15 12:00:00', 'IPHONE', 1),
            (100126, '访问', '2022-05-15 12:01:00', 'HONOR', 2),
            (100127, '登录', '2022-05-15 11:30:00', 'VIVO', 1),
            (100127, '访问', '2022-05-15 11:31:00', 'VIVO', 5);
    """
    order_qt_v2_deduplication0 """
        SELECT
            user_id,
            window_funnel(3600, "deduplication", event_timestamp, event_name = '登录', event_name = '访问', event_name = '下单', event_name = '付款') AS level
        FROM windowfunnel_v2_test
        GROUP BY user_id
        order BY user_id
    """

    // Same mode, but this time the repeated event is the second step ('访问').
    sql """ truncate table windowfunnel_v2_test; """
    sql """
        INSERT INTO windowfunnel_v2_test VALUES
            (100123, '登录', '2022-05-14 10:01:00', 'HONOR', 1),
            (100123, '访问', '2022-05-14 10:02:00', 'HONOR', 2),
            (100123, '下单', '2022-05-14 10:04:00', "HONOR", 4),
            (100123, '登录1', '2022-05-14 10:04:00', 'HONOR', 3),
            (100123, '访问', '2022-05-14 10:04:00', 'HONOR', 3),
            (100123, '付款', '2022-05-14 10:10:00', 'HONOR', 4),
            (100125, '登录', '2022-05-15 11:00:00', 'XIAOMI', 1),
            (100125, '访问', '2022-05-15 11:01:00', 'XIAOMI', 2),
            (100125, '下单', '2022-05-15 11:02:00', 'XIAOMI', 6),
            (100126, '登录', '2022-05-15 12:00:00', 'IPHONE', 1),
            (100126, '访问', '2022-05-15 12:01:00', 'HONOR', 2),
            (100127, '登录', '2022-05-15 11:30:00', 'VIVO', 1),
            (100127, '访问', '2022-05-15 11:31:00', 'VIVO', 5);
    """
    order_qt_v2_deduplication1 """
        SELECT
            user_id,
            window_funnel(3600, "deduplication", event_timestamp, event_name = '登录', event_name = '访问', event_name = '下单', event_name = '付款') AS level
        FROM windowfunnel_v2_test
        GROUP BY user_id
        order BY user_id
    """

    // ===================== FIXED mode (StarRocks-style semantics) =====================
    recreateEventTable()
    sql """
        INSERT INTO windowfunnel_v2_test VALUES
            (100123, '登录', '2022-05-14 10:01:00', 'HONOR', 1),
            (100123, '访问', '2022-05-14 10:02:00', 'HONOR', 2),
            (100123, '下单', '2022-05-14 10:04:00', "HONOR", 4),
            (100123, '付款', '2022-05-14 10:10:00', 'HONOR', 4),
            (100125, '登录', '2022-05-15 11:00:00', 'XIAOMI', 1),
            (100125, '访问', '2022-05-15 11:01:00', 'XIAOMI', 2),
            (100125, '下单', '2022-05-15 11:02:00', 'XIAOMI', 6),
            (100126, '登录', '2022-05-15 12:00:00', 'IPHONE', 1),
            (100126, '访问', '2022-05-15 12:01:00', 'HONOR', 2),
            (100127, '登录', '2022-05-15 11:30:00', 'VIVO', 1),
            (100127, '访问', '2022-05-15 11:31:00', 'VIVO', 5);
    """
    // In V2 fixed mode a row that matches no condition does not interrupt the
    // chain. The chain is only cut when a matched event shows up before the
    // level preceding it has been reached.
    order_qt_v2_fixed0 """
        SELECT
            user_id,
            window_funnel(3600, "fixed", event_timestamp, event_name = '登录', event_name = '访问', event_name = '下单', event_name = '付款') AS level
        FROM windowfunnel_v2_test
        GROUP BY user_id
        order BY user_id
    """

    // Fixed mode where the condition list ('付款' before '下单') is ordered
    // differently from the data ('下单' happens before '付款').
    sql """ truncate table windowfunnel_v2_test; """
    sql """
        INSERT INTO windowfunnel_v2_test VALUES
            (100123, '登录', '2022-05-14 10:01:00', 'HONOR', 1),
            (100123, '访问', '2022-05-14 10:02:00', 'HONOR', 2),
            (100123, '下单', '2022-05-14 10:04:00', "HONOR", 4),
            (100123, '付款', '2022-05-14 10:10:00', 'HONOR', 4);
    """
    order_qt_v2_fixed_reorder """
        select
            window_funnel(
                20000,
                'fixed',
                t.event_timestamp,
                t.event_name = '登录',
                t.event_name = '访问',
                t.event_name = '付款',
                t.event_name = '下单'
            ) AS level
        from windowfunnel_v2_test t;
    """

    // ===================== INCREASE mode =====================
    // For user 100123 the '下单' and '付款' rows share the timestamp 10:04:00.
    recreateEventTable()
    sql """
        INSERT INTO windowfunnel_v2_test VALUES
            (100123, '登录', '2022-05-14 10:01:00', 'HONOR', 1),
            (100123, '访问', '2022-05-14 10:02:00', 'HONOR', 2),
            (100123, '下单', '2022-05-14 10:04:00', "HONOR", 4),
            (100123, '付款', '2022-05-14 10:04:00', 'HONOR', 4),
            (100125, '登录', '2022-05-15 11:00:00', 'XIAOMI', 1),
            (100125, '访问', '2022-05-15 11:01:00', 'XIAOMI', 2),
            (100125, '下单', '2022-05-15 11:02:00', 'XIAOMI', 6),
            (100126, '登录', '2022-05-15 12:00:00', 'IPHONE', 1),
            (100126, '访问', '2022-05-15 12:01:00', 'HONOR', 2),
            (100127, '登录', '2022-05-15 11:30:00', 'VIVO', 1),
            (100127, '访问', '2022-05-15 11:31:00', 'VIVO', 5);
    """
    order_qt_v2_increase0 """
        SELECT
            user_id,
            window_funnel(3600, "increase", event_timestamp, event_name = '登录', event_name = '访问', event_name = '下单', event_name = '付款') AS level
        FROM windowfunnel_v2_test
        GROUP BY user_id
        order BY user_id
    """

    // Increase mode on the single-user table: events 2 and 3 carry the
    // identical timestamp 13:28:02.111111.
    recreateSingleUserTable("datetimev2(3)")
    sql "INSERT into windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 10:41:00.111111', 1)"
    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 13:28:02.111111', 2)"
    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 13:28:02.111111', 3)"
    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 15:05:04.111111', 4)"
    order_qt_v2_increase_same_ts """
        select
            window_funnel(
                20000,
                'increase',
                t.xwhen,
                t.xwhat = 1,
                t.xwhat = 2,
                t.xwhat = 3,
                t.xwhat = 4
            ) AS level
        from windowfunnel_v2_test t;
    """

    // ===================== FIXED mode: where V2 departs from V1 =====================
    // V1: a row matching none of the event conditions breaks the chain in FIXED mode.
    // V2: such rows are never stored, so only a matched event that jumps a
    //     level can break the chain.
    // The data below inserts a non-matching '登录2' row between '访问' and '下单'.
    recreateEventTable()
    sql """
        INSERT INTO windowfunnel_v2_test VALUES
            (100123, '登录', '2022-05-14 10:01:00', 'HONOR', 1),
            (100123, '访问', '2022-05-14 10:02:00', 'HONOR', 2),
            (100123, '登录2', '2022-05-14 10:03:00', 'HONOR', 3),
            (100123, '下单', '2022-05-14 10:04:00', "HONOR", 4),
            (100123, '付款', '2022-05-14 10:10:00', 'HONOR', 4);
    """
    // '登录2' satisfies no condition, so V2 does not store it and the chain
    // 登录 -> 访问 -> 下单 -> 付款 has no level jump. V1 would report 2 here
    // (the '登录2' row physically breaks adjacency); V2 reports 4.
    order_qt_v2_fixed_vs_v1 """
        SELECT
            user_id,
            window_funnel(3600, "fixed", event_timestamp, event_name = '登录', event_name = '访问', event_name = '下单', event_name = '付款') AS level
        FROM windowfunnel_v2_test
        GROUP BY user_id
        order BY user_id
    """

    // ===================== Explicit window_funnel_v2 name =====================
    // Same query and data as the previous case, called through the
    // window_funnel_v2 function name.
    order_qt_v2_explicit_name """
        SELECT
            user_id,
            window_funnel_v2(3600, "fixed", event_timestamp, event_name = '登录', event_name = '访问', event_name = '下单', event_name = '付款') AS level
        FROM windowfunnel_v2_test
        GROUP BY user_id
        order BY user_id
    """

    // Final cleanup of the shared test table.
    sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
}