Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/AggregateFunctions/AggregateFunctionSum.h
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,8 @@ class AggregateFunctionSum final : public IAggregateFunctionDataHelper<Data, Agg
size_t to = std::lower_bound(offsets.begin(), offsets.end(), row_end) - offsets.begin();

for (size_t i = from; i < to; ++i)
add(places[offsets[i]] + place_offset, &values, i + 1, arena);
if (places[offsets[i]])
add(places[offsets[i]] + place_offset, &values, i + 1, arena);
}

void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
Expand Down
5 changes: 3 additions & 2 deletions src/AggregateFunctions/IAggregateFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -496,8 +496,9 @@ class IAggregateFunctionHelper : public IAggregateFunction
auto offset_it = column_sparse.getIterator(row_begin);

for (size_t i = row_begin; i < row_end; ++i, ++offset_it)
static_cast<const Derived *>(this)->add(places[offset_it.getCurrentRow()] + place_offset,
&values, offset_it.getValueIndex(), arena);
if (places[offset_it.getCurrentRow()])
static_cast<const Derived *>(this)->add(places[offset_it.getCurrentRow()] + place_offset,
&values, offset_it.getValueIndex(), arena);
}

void mergeBatch(
Expand Down
19 changes: 0 additions & 19 deletions src/AggregateFunctions/TimeSeries/AggregateFunctionLast2Samples.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,25 +272,6 @@ class AggregateFunctionLast2Samples final :
{
}

/// Adds values from a sparse column into the per-row aggregation states for rows
/// in [row_begin, row_end). Only explicitly stored (non-default) values of the
/// sparse column contribute; rows holding the default value are skipped entirely.
void addBatchSparse(
    size_t row_begin,
    size_t row_end,
    AggregateDataPtr * places,
    size_t place_offset,
    const IColumn ** columns,
    Arena * arena) const override
{
    const auto & column_sparse = typeid_cast<const ColumnSparse &>(*columns[0]);
    const auto * values = &column_sparse.getValuesColumn();
    const auto & offsets = column_sparse.getOffsetsData();

    /// offsets is sorted, so binary search gives the range of stored values
    /// whose row numbers fall inside [row_begin, row_end).
    size_t from = std::lower_bound(offsets.begin(), offsets.end(), row_begin) - offsets.begin();
    size_t to = std::lower_bound(offsets.begin(), offsets.end(), row_end) - offsets.begin();

    for (size_t i = from; i < to; ++i)
    {
        /// places[row] can be nullptr when group_by_overflow_mode = 'any' stops
        /// creating new aggregation states after max_rows_to_group_by is reached;
        /// calling add() on such a row would dereference garbage.
        if (places[offsets[i]])
            add(places[offsets[i]] + place_offset, &values, i + 1, arena);  /// i + 1: presumably skips the default value stored at position 0 of the values column — matches the other addBatchSparse implementations.
    }
}

void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
{
data(place).merge(data(rhs));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,25 +358,6 @@ class AggregateFunctionTimeSeriesGroupArray final :
{
}

/// Adds values from a sparse column into the per-row aggregation states for rows
/// in [row_begin, row_end). Only explicitly stored (non-default) values of the
/// sparse column contribute; rows holding the default value are skipped entirely.
void addBatchSparse(
    size_t row_begin,
    size_t row_end,
    AggregateDataPtr * places,
    size_t place_offset,
    const IColumn ** columns,
    Arena * arena) const override
{
    const auto & column_sparse = typeid_cast<const ColumnSparse &>(*columns[0]);
    const auto * values = &column_sparse.getValuesColumn();
    const auto & offsets = column_sparse.getOffsetsData();

    /// offsets is sorted, so binary search gives the range of stored values
    /// whose row numbers fall inside [row_begin, row_end).
    size_t from = std::lower_bound(offsets.begin(), offsets.end(), row_begin) - offsets.begin();
    size_t to = std::lower_bound(offsets.begin(), offsets.end(), row_end) - offsets.begin();

    for (size_t i = from; i < to; ++i)
    {
        /// places[row] can be nullptr when group_by_overflow_mode = 'any' stops
        /// creating new aggregation states after max_rows_to_group_by is reached;
        /// calling add() on such a row would dereference garbage.
        if (places[offsets[i]])
            add(places[offsets[i]] + place_offset, &values, i + 1, arena);  /// i + 1: presumably skips the default value stored at position 0 of the values column — matches the other addBatchSparse implementations.
    }
}

void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
data(place).merge(data(rhs), arena);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,25 +336,6 @@ class AggregateFunctionTimeseriesBase :
{
}

/// Adds values from a sparse column into the per-row aggregation states for rows
/// in [row_begin, row_end). Only explicitly stored (non-default) values of the
/// sparse column contribute; rows holding the default value are skipped entirely.
void addBatchSparse(
    size_t row_begin,
    size_t row_end,
    AggregateDataPtr * places,
    size_t place_offset,
    const IColumn ** columns,
    Arena * arena) const override
{
    const auto & column_sparse = typeid_cast<const ColumnSparse &>(*columns[0]);
    const auto * values = &column_sparse.getValuesColumn();
    const auto & offsets = column_sparse.getOffsetsData();

    /// offsets is sorted, so binary search gives the range of stored values
    /// whose row numbers fall inside [row_begin, row_end).
    size_t from = std::lower_bound(offsets.begin(), offsets.end(), row_begin) - offsets.begin();
    size_t to = std::lower_bound(offsets.begin(), offsets.end(), row_end) - offsets.begin();

    for (size_t i = from; i < to; ++i)
    {
        /// places[row] can be nullptr when group_by_overflow_mode = 'any' stops
        /// creating new aggregation states after max_rows_to_group_by is reached;
        /// calling add() on such a row would dereference garbage.
        if (places[offsets[i]])
            add(places[offsets[i]] + place_offset, &values, i + 1, arena);  /// i + 1: presumably skips the default value stored at position 0 of the values column — matches the other addBatchSparse implementations.
    }
}

void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
{
auto & buckets = data(place)->buckets;
Expand Down
10 changes: 10 additions & 0 deletions tests/queries/0_stateless/03657_gby_overflow_any_sparse.reference
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
0 0
1 0
2 0
3 0
4 0
5 0
6 0
7 0
8 0
9 0
14 changes: 14 additions & 0 deletions tests/queries/0_stateless/03657_gby_overflow_any_sparse.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- Regression test: any() aggregation over a table where group_by_overflow_mode = 'any'
-- stops creating new aggregation states once max_rows_to_group_by is exceeded.
DROP TABLE IF EXISTS 03657_gby_overflow;

CREATE TABLE 03657_gby_overflow(key UInt64, val UInt16) ENGINE = MergeTree ORDER BY tuple()
AS SELECT number, 0 FROM numbers(100000);

-- Single-threaded, single-level aggregation with small blocks, so overflow is hit mid-stream.
SELECT key, any(val) FROM 03657_gby_overflow GROUP BY key ORDER BY key LIMIT 10
SETTINGS group_by_overflow_mode = 'any',
         max_rows_to_group_by = 100,
         max_threads = 1,
         max_block_size = 100,
         group_by_two_level_threshold = 1000000000,
         group_by_two_level_threshold_bytes = 1000000000;

DROP TABLE 03657_gby_overflow;
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- Regression test: sum() over a sparse column with group_by_overflow_mode = 'any'.
-- Once max_rows_to_group_by is exceeded, some per-row aggregation states are nullptr;
-- the sparse add-batch path must skip them instead of crashing.
DROP TABLE IF EXISTS 03811_sparse_column_aggregation_with_sum;

CREATE TABLE 03811_sparse_column_aggregation_with_sum(key UInt128, val UInt16) ENGINE = MergeTree ORDER BY tuple();

-- val is mostly 0 so the column is serialized sparsely; small blocks force many batches.
INSERT INTO 03811_sparse_column_aggregation_with_sum
SELECT number, number % 10000 = 0 FROM numbers(100000)
SETTINGS min_insert_block_size_rows = 1000,
         max_block_size = 1000,
         max_threads = 2;

SELECT key, sum(val) AS c
FROM 03811_sparse_column_aggregation_with_sum
GROUP BY key
ORDER BY c DESC
LIMIT 100
FORMAT Null
SETTINGS group_by_overflow_mode = 'any',
         max_rows_to_group_by = 100,
         group_by_two_level_threshold_bytes = 1,
         group_by_two_level_threshold = 1,
         max_threads = 2;

DROP TABLE 03811_sparse_column_aggregation_with_sum;
Loading