diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index 8f92038de6925..fb49e8221c2e8 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -1366,6 +1366,22 @@ bitmap: `bitmap1 BITMAP, bitmap2 BITMAP` Returns a `BITMAP`. `NULL` if any of the arguments are `NULL`. + - sql: BITMAP_AND_AGG(bitmap) + table: bitmap.bitmapAndAgg() + description: | + Aggregates the AND (intersection) of multiple bitmaps. + + `bitmap BITMAP` + + Returns a `BITMAP`. + - sql: BITMAP_AND_CARDINALITY_AGG(bitmap) + table: bitmap.bitmapAndCardinalityAgg() + description: | + Aggregates the AND (intersection) of multiple bitmaps and returns its 64-bit cardinality. + + `bitmap BITMAP` + + Returns a `BIGINT`. - sql: BITMAP_BUILD(array) table: array.bitmapBuild() description: | @@ -1374,6 +1390,22 @@ bitmap: `array ARRAY` Returns a `BITMAP`. `NULL` if the argument is `NULL`. + - sql: BITMAP_BUILD_AGG(value) + table: value.bitmapBuildAgg() + description: | + Aggregates 32-bit integers into a bitmap. + + `value INT` + + Returns a `BITMAP`. + - sql: BITMAP_BUILD_CARDINALITY_AGG(value) + table: value.bitmapBuildCardinalityAgg() + description: | + Aggregates 32-bit integers into a bitmap and returns its 64-bit cardinality. + + `value INT` + + Returns a `BIGINT`. - sql: BITMAP_CARDINALITY(bitmap) table: bitmap.bitmapCardinality() description: | @@ -1400,6 +1432,22 @@ bitmap: `bitmap1 BITMAP, bitmap2 BITMAP` Returns a `BITMAP`. `NULL` if any of the arguments are `NULL`. + - sql: BITMAP_OR_AGG(bitmap) + table: bitmap.bitmapOrAgg() + description: | + Aggregates the OR (union) of multiple bitmaps. + + `bitmap BITMAP` + + Returns a `BITMAP`. + - sql: BITMAP_OR_CARDINALITY_AGG(bitmap) + table: bitmap.bitmapOrCardinalityAgg() + description: | + Aggregates the OR (union) of multiple bitmaps and returns its 64-bit cardinality. + + `bitmap BITMAP` + + Returns a `BIGINT`. 
- sql: BITMAP_TO_ARRAY(bitmap) table: bitmap.bitmapToArray() description: | @@ -1440,6 +1488,22 @@ bitmap: `bitmap1 BITMAP, bitmap2 BITMAP` Returns a `BITMAP`. `NULL` if any of the arguments are `NULL`. + - sql: BITMAP_XOR_AGG(bitmap) + table: bitmap.bitmapXorAgg() + description: | + Aggregates the XOR (symmetric difference) of multiple bitmaps. + + `bitmap BITMAP` + + Returns a `BITMAP`. + - sql: BITMAP_XOR_CARDINALITY_AGG(bitmap) + table: bitmap.bitmapXorCardinalityAgg() + description: | + Aggregates the XOR (symmetric difference) of multiple bitmaps and returns its 64-bit cardinality. + + `bitmap BITMAP` + + Returns a `BIGINT`. auxilary: - table: callSql(STRING) diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml index c28996064d7f7..aaf08dc5b0efe 100644 --- a/docs/data/sql_functions_zh.yml +++ b/docs/data/sql_functions_zh.yml @@ -1439,7 +1439,7 @@ bitmap: - sql: BITMAP_AND(bitmap1, bitmap2) table: bitmap1.bitmapAnd(bitmap2) description: | - 计算两个位图的交集 (AND)。 + 计算两个位图的交集(AND)。 `bitmap1 BITMAP, bitmap2 BITMAP` @@ -1447,11 +1447,27 @@ bitmap: - sql: BITMAP_ANDNOT(bitmap1, bitmap2) table: bitmap1.bitmapAndnot(bitmap2) description: | - 计算两个位图的差集 (AND NOT)。 + 计算两个位图的差集(AND NOT)。 `bitmap1 BITMAP, bitmap2 BITMAP` 返回一个 `BITMAP`。如果任一参数为 `NULL`,则返回 `NULL`。 + - sql: BITMAP_AND_AGG(bitmap) + table: bitmap.bitmapAndAgg() + description: | + 聚合多个位图的交集(AND)。 + + `bitmap BITMAP` + + 返回一个 `BITMAP`。 + - sql: BITMAP_AND_CARDINALITY_AGG(bitmap) + table: bitmap.bitmapAndCardinalityAgg() + description: | + 聚合多个位图的交集(AND)并返回其 64 位基数。 + + `bitmap BITMAP` + + 返回一个 `BIGINT`。 - sql: BITMAP_BUILD(array) table: array.bitmapBuild() description: | @@ -1460,6 +1476,22 @@ bitmap: `array ARRAY` 返回一个 `BITMAP`。如果参数为 `NULL`,则返回 `NULL`。 + - sql: BITMAP_BUILD_AGG(value) + table: value.bitmapBuildAgg() + description: | + 将 32 位整数聚合成位图。 + + `value INT` + + 返回一个 `BITMAP`。 + - sql: BITMAP_BUILD_CARDINALITY_AGG(value) + table: value.bitmapBuildCardinalityAgg() + description: | + 将 
32 位整数聚合成位图并返回其 64 位基数。 + + `value INT` + + 返回一个 `BIGINT`。 - sql: BITMAP_CARDINALITY(bitmap) table: bitmap.bitmapCardinality() description: | @@ -1481,11 +1513,27 @@ bitmap: - sql: BITMAP_OR(bitmap1, bitmap2) table: bitmap1.bitmapOr(bitmap2) description: | - 计算两个位图的并集 (OR)。 + 计算两个位图的并集(OR)。 `bitmap1 BITMAP, bitmap2 BITMAP` 返回一个 `BITMAP`。如果任一参数为 `NULL`,则返回 `NULL`。 + - sql: BITMAP_OR_AGG(bitmap) + table: bitmap.bitmapOrAgg() + description: | + 聚合多个位图的并集(OR)。 + + `bitmap BITMAP` + + 返回一个 `BITMAP`。 + - sql: BITMAP_OR_CARDINALITY_AGG(bitmap) + table: bitmap.bitmapOrCardinalityAgg() + description: | + 聚合多个位图的并集(OR)并返回其 64 位基数。 + + `bitmap BITMAP` + + 返回一个 `BIGINT`。 - sql: BITMAP_TO_ARRAY(bitmap) table: bitmap.bitmapToArray() description: | @@ -1521,11 +1569,27 @@ bitmap: - sql: BITMAP_XOR(bitmap1, bitmap2) table: bitmap1.bitmapXor(bitmap2) description: | - 计算两个位图的异或 (XOR)。 + 计算两个位图的异或(XOR)。 `bitmap1 BITMAP, bitmap2 BITMAP` 返回一个 `BITMAP`。如果任一参数为 `NULL`,则返回 `NULL`。 + - sql: BITMAP_XOR_AGG(bitmap) + table: bitmap.bitmapXorAgg() + description: | + 聚合多个位图的异或(XOR)。 + + `bitmap BITMAP` + + 返回一个 `BITMAP`。 + - sql: BITMAP_XOR_CARDINALITY_AGG(bitmap) + table: bitmap.bitmapXorCardinalityAgg() + description: | + 聚合多个位图的异或(XOR)并返回其 64 位基数。 + + `bitmap BITMAP` + + 返回一个 `BIGINT`。 auxilary: - table: callSql(STRING) diff --git a/flink-core/src/main/java/org/apache/flink/types/bitmap/RoaringBitmapData.java b/flink-core/src/main/java/org/apache/flink/types/bitmap/RoaringBitmapData.java index d9ee1988cc671..e8f8624093de7 100644 --- a/flink-core/src/main/java/org/apache/flink/types/bitmap/RoaringBitmapData.java +++ b/flink-core/src/main/java/org/apache/flink/types/bitmap/RoaringBitmapData.java @@ -20,7 +20,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.types.DeserializationException; -import org.apache.flink.util.Preconditions; import org.roaringbitmap.IntConsumer; import org.roaringbitmap.RoaringBitmap; @@ -28,6 +27,7 @@ import javax.annotation.Nonnull; import 
javax.annotation.Nullable; +import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Objects; @@ -36,7 +36,9 @@ * modifications. */ @Internal -public final class RoaringBitmapData implements Bitmap { +public final class RoaringBitmapData implements Bitmap, Serializable { + + private static final long serialVersionUID = 1L; private final RoaringBitmap roaringBitmap; @@ -48,10 +50,6 @@ private RoaringBitmapData(RoaringBitmapData other) { this.roaringBitmap = other.roaringBitmap.clone(); } - private RoaringBitmapData(RoaringBitmap roaringBitmap) { - this.roaringBitmap = roaringBitmap; - } - // ~ Static Methods ---------------------------------------------------------------- public static RoaringBitmapData empty() { @@ -79,15 +77,6 @@ public static RoaringBitmapData fromArray(@Nonnull int[] values) { return rb32; } - /** - * Wraps the given {@link RoaringBitmap} without copying. The returned {@link RoaringBitmapData} - * shares the same internal object as the input. - */ - public static RoaringBitmapData wrap(@Nonnull RoaringBitmap roaringBitmap) { - Preconditions.checkNotNull(roaringBitmap); - return new RoaringBitmapData(roaringBitmap); - } - private static RoaringBitmapData toRoaringBitmapData(Bitmap bm) throws IllegalArgumentException { if (!(bm instanceof RoaringBitmapData)) { diff --git a/flink-core/src/test/java/org/apache/flink/types/RoaringBitmapDataTest.java b/flink-core/src/test/java/org/apache/flink/types/RoaringBitmapDataTest.java index cce27ddb26dcc..837f889c2311f 100644 --- a/flink-core/src/test/java/org/apache/flink/types/RoaringBitmapDataTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/RoaringBitmapDataTest.java @@ -66,12 +66,6 @@ void testStaticConstructors() { // fromArray assertThat(RoaringBitmapData.fromArray(new int[0]).toArray()).containsExactly(); assertThat(RoaringBitmapData.fromArray(new int[] {1, 2}).toArray()).containsExactly(1, 2); - - // wrap - assertThatThrownBy(() -> RoaringBitmapData.wrap(null)) - 
.isInstanceOf(NullPointerException.class); - assertThat(RoaringBitmapData.wrap(RoaringBitmap.bitmapOf(1, 2)).toArray()) - .containsExactly(1, 2); } @Test diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst b/flink-python/docs/reference/pyflink.table/expressions.rst index 9ad848c167fd7..e2aeb34f41107 100644 --- a/flink-python/docs/reference/pyflink.table/expressions.rst +++ b/flink-python/docs/reference/pyflink.table/expressions.rst @@ -347,11 +347,19 @@ Bitmap functions Expression.bitmap_and Expression.bitmap_andnot + Expression.bitmap_and_agg + Expression.bitmap_and_cardinality_agg Expression.bitmap_build + Expression.bitmap_build_agg + Expression.bitmap_build_cardinality_agg Expression.bitmap_cardinality Expression.bitmap_from_bytes Expression.bitmap_or + Expression.bitmap_or_agg + Expression.bitmap_or_cardinality_agg Expression.bitmap_to_array Expression.bitmap_to_bytes Expression.bitmap_to_string Expression.bitmap_xor + Expression.bitmap_xor_agg + Expression.bitmap_xor_cardinality_agg diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py index a251a2dfcda2c..7bf0084b47095 100644 --- a/flink-python/pyflink/table/expression.py +++ b/flink-python/pyflink/table/expression.py @@ -2299,6 +2299,22 @@ def bitmap_andnot(self, bitmap2) -> 'Expression': """ return _binary_op("bitmapAndnot")(self, bitmap2) + def bitmap_and_agg(self): + """ + Aggregates the AND (intersection) of multiple bitmaps. + + :return: a BITMAP expression + """ + return _unary_op("bitmapAndAgg")(self) + + def bitmap_and_cardinality_agg(self): + """ + Aggregates the AND (intersection) of multiple bitmaps and returns its 64-bit cardinality. + + :return: a BIGINT expression + """ + return _unary_op("bitmapAndCardinalityAgg")(self) + def bitmap_build(self) -> 'Expression': """ Creates a bitmap from an array of 32-bit integers. 
@@ -2309,6 +2325,22 @@ def bitmap_build(self) -> 'Expression': """ return _unary_op("bitmapBuild")(self) + def bitmap_build_agg(self): + """ + Aggregates 32-bit integers into a bitmap. + + :return: a BITMAP expression + """ + return _unary_op("bitmapBuildAgg")(self) + + def bitmap_build_cardinality_agg(self): + """ + Aggregates 32-bit integers into a bitmap and returns its 64-bit cardinality. + + :return: a BIGINT expression + """ + return _unary_op("bitmapBuildCardinalityAgg")(self) + def bitmap_cardinality(self) -> 'Expression': """ Returns the cardinality of a bitmap. @@ -2343,6 +2375,22 @@ def bitmap_or(self, bitmap2) -> 'Expression': """ return _binary_op("bitmapOr")(self, bitmap2) + def bitmap_or_agg(self): + """ + Aggregates the OR (union) of multiple bitmaps. + + :return: a BITMAP expression + """ + return _unary_op("bitmapOrAgg")(self) + + def bitmap_or_cardinality_agg(self): + """ + Aggregates the OR (union) of multiple bitmaps and returns its 64-bit cardinality. + + :return: a BIGINT expression + """ + return _unary_op("bitmapOrCardinalityAgg")(self) + def bitmap_to_array(self) -> 'Expression': """ Converts a bitmap to an array of 32-bit integers, the values are sorted by \ @@ -2395,6 +2443,23 @@ def bitmap_xor(self, bitmap2) -> 'Expression': """ return _binary_op("bitmapXor")(self, bitmap2) + def bitmap_xor_agg(self): + """ + Aggregates the XOR (symmetric difference) of multiple bitmaps. + + :return: a BITMAP expression + """ + return _unary_op("bitmapXorAgg")(self) + + def bitmap_xor_cardinality_agg(self): + """ + Aggregates the XOR (symmetric difference) of multiple bitmaps and returns its 64-bit + cardinality. 
+ + :return: a BIGINT expression + """ + return _unary_op("bitmapXorCardinalityAgg")(self) + # add the docs _make_math_log_doc() diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py index edc0dab1770ff..e7a2078d1dbe4 100644 --- a/flink-python/pyflink/table/tests/test_expression.py +++ b/flink-python/pyflink/table/tests/test_expression.py @@ -268,14 +268,23 @@ def test_expression(self): # bitmap functions self.assertEqual("BITMAP_AND(a, b)", str(expr1.bitmap_and(expr2))) self.assertEqual("BITMAP_ANDNOT(a, b)", str(expr1.bitmap_andnot(expr2))) + self.assertEqual("BITMAP_AND_AGG(a)", str(expr1.bitmap_and_agg())) + self.assertEqual("BITMAP_AND_CARDINALITY_AGG(a)", str(expr1.bitmap_and_cardinality_agg())) self.assertEqual("BITMAP_BUILD(a)", str(expr1.bitmap_build())) + self.assertEqual("BITMAP_BUILD_AGG(a)", str(expr1.bitmap_build_agg())) + self.assertEqual("BITMAP_BUILD_CARDINALITY_AGG(a)", + str(expr1.bitmap_build_cardinality_agg())) self.assertEqual("BITMAP_CARDINALITY(a)", str(expr1.bitmap_cardinality())) self.assertEqual("BITMAP_FROM_BYTES(a)", str(expr1.bitmap_from_bytes())) self.assertEqual("BITMAP_OR(a, b)", str(expr1.bitmap_or(expr2))) + self.assertEqual("BITMAP_OR_AGG(a)", str(expr1.bitmap_or_agg())) + self.assertEqual("BITMAP_OR_CARDINALITY_AGG(a)", str(expr1.bitmap_or_cardinality_agg())) self.assertEqual("BITMAP_TO_ARRAY(a)", str(expr1.bitmap_to_array())) self.assertEqual("BITMAP_TO_BYTES(a)", str(expr1.bitmap_to_bytes())) self.assertEqual("BITMAP_TO_STRING(a)", str(expr1.bitmap_to_string())) self.assertEqual("BITMAP_XOR(a, b)", str(expr1.bitmap_xor(expr2))) + self.assertEqual("BITMAP_XOR_AGG(a)", str(expr1.bitmap_xor_agg())) + self.assertEqual("BITMAP_XOR_CARDINALITY_AGG(a)", str(expr1.bitmap_xor_cardinality_agg())) def test_expressions(self): expr1 = col('a') diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java index 3704f80ea8b70..362a7179bbc9d 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java @@ -78,14 +78,22 @@ import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BIN; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_AND; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_ANDNOT; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_AND_AGG; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_AND_CARDINALITY_AGG; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_BUILD; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_BUILD_AGG; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_BUILD_CARDINALITY_AGG; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_CARDINALITY; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_FROM_BYTES; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_OR; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_OR_AGG; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_OR_CARDINALITY_AGG; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_TO_ARRAY; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_TO_BYTES; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_TO_STRING; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_XOR; +import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_XOR_AGG; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_XOR_CARDINALITY_AGG; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BTRIM; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.CARDINALITY; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.CAST; @@ -2640,6 +2648,24 @@ public OutType bitmapAndnot(InType bitmap2) { unresolvedCall(BITMAP_ANDNOT, toExpr(), objectToExpression(bitmap2))); } + /** + * Aggregates the AND (intersection) of multiple bitmaps. + * + * @return a BITMAP expression + */ + public OutType bitmapAndAgg() { + return toApiSpecificExpression(unresolvedCall(BITMAP_AND_AGG, toExpr())); + } + + /** + * Aggregates the AND (intersection) of multiple bitmaps and returns its 64-bit cardinality. + * + * @return a BIGINT expression + */ + public OutType bitmapAndCardinalityAgg() { + return toApiSpecificExpression(unresolvedCall(BITMAP_AND_CARDINALITY_AGG, toExpr())); + } + /** * Creates a bitmap from an array of 32-bit integers. * @@ -2651,6 +2677,24 @@ public OutType bitmapBuild() { return toApiSpecificExpression(unresolvedCall(BITMAP_BUILD, toExpr())); } + /** + * Aggregates 32-bit integers into a bitmap. + * + * @return a BITMAP expression + */ + public OutType bitmapBuildAgg() { + return toApiSpecificExpression(unresolvedCall(BITMAP_BUILD_AGG, toExpr())); + } + + /** + * Aggregates 32-bit integers into a bitmap and returns its 64-bit cardinality. + * + * @return a BIGINT expression + */ + public OutType bitmapBuildCardinalityAgg() { + return toApiSpecificExpression(unresolvedCall(BITMAP_BUILD_CARDINALITY_AGG, toExpr())); + } + /** * Returns the cardinality of a bitmap. * @@ -2690,6 +2734,24 @@ public OutType bitmapOr(InType bitmap2) { unresolvedCall(BITMAP_OR, toExpr(), objectToExpression(bitmap2))); } + /** + * Aggregates the OR (union) of multiple bitmaps. 
+ * + * @return a BITMAP expression + */ + public OutType bitmapOrAgg() { + return toApiSpecificExpression(unresolvedCall(BITMAP_OR_AGG, toExpr())); + } + + /** + * Aggregates the OR (union) of multiple bitmaps and returns its 64-bit cardinality. + * + * @return a BIGINT expression + */ + public OutType bitmapOrCardinalityAgg() { + return toApiSpecificExpression(unresolvedCall(BITMAP_OR_CARDINALITY_AGG, toExpr())); + } + /** * Converts a bitmap to an array of 32-bit integers, the values are sorted by {@link * Integer#compareUnsigned}. @@ -2749,4 +2811,23 @@ public OutType bitmapXor(InType bitmap2) { return toApiSpecificExpression( unresolvedCall(BITMAP_XOR, toExpr(), objectToExpression(bitmap2))); } + + /** + * Aggregates the XOR (symmetric difference) of multiple bitmaps. + * + * @return a BITMAP expression + */ + public OutType bitmapXorAgg() { + return toApiSpecificExpression(unresolvedCall(BITMAP_XOR_AGG, toExpr())); + } + + /** + * Aggregates the XOR (symmetric difference) of multiple bitmaps and returns its 64-bit + * cardinality. 
+ * + * @return a BIGINT expression + */ + public OutType bitmapXorCardinalityAgg() { + return toApiSpecificExpression(unresolvedCall(BITMAP_XOR_CARDINALITY_AGG, toExpr())); + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index 7dac1d91cc8b3..e8783ba51c859 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -3015,6 +3015,30 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) "org.apache.flink.table.runtime.functions.scalar.BitmapAndnotFunction") .build(); + public static final BuiltInFunctionDefinition BITMAP_AND_AGG = + BuiltInFunctionDefinition.newBuilder() + .name("BITMAP_AND_AGG") + .kind(AGGREGATE) + .inputTypeStrategy( + sequence( + Collections.singletonList("bitmap"), + Collections.singletonList(logical(LogicalTypeRoot.BITMAP)))) + .outputTypeStrategy(explicit(DataTypes.BITMAP())) + .runtimeProvided() + .build(); + + public static final BuiltInFunctionDefinition BITMAP_AND_CARDINALITY_AGG = + BuiltInFunctionDefinition.newBuilder() + .name("BITMAP_AND_CARDINALITY_AGG") + .kind(AGGREGATE) + .inputTypeStrategy( + sequence( + Collections.singletonList("bitmap"), + Collections.singletonList(logical(LogicalTypeRoot.BITMAP)))) + .outputTypeStrategy(explicit(DataTypes.BIGINT())) + .runtimeProvided() + .build(); + public static final BuiltInFunctionDefinition BITMAP_BUILD = BuiltInFunctionDefinition.newBuilder() .name("BITMAP_BUILD") @@ -3037,6 +3061,30 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) "org.apache.flink.table.runtime.functions.scalar.BitmapBuildFunction") .build(); + public static final BuiltInFunctionDefinition BITMAP_BUILD_AGG = + 
BuiltInFunctionDefinition.newBuilder() + .name("BITMAP_BUILD_AGG") + .kind(AGGREGATE) + .inputTypeStrategy( + sequence( + Collections.singletonList("value"), + Collections.singletonList(logical(LogicalTypeRoot.INTEGER)))) + .outputTypeStrategy(explicit(DataTypes.BITMAP())) + .runtimeProvided() + .build(); + + public static final BuiltInFunctionDefinition BITMAP_BUILD_CARDINALITY_AGG = + BuiltInFunctionDefinition.newBuilder() + .name("BITMAP_BUILD_CARDINALITY_AGG") + .kind(AGGREGATE) + .inputTypeStrategy( + sequence( + Collections.singletonList("value"), + Collections.singletonList(logical(LogicalTypeRoot.INTEGER)))) + .outputTypeStrategy(explicit(DataTypes.BIGINT())) + .runtimeProvided() + .build(); + public static final BuiltInFunctionDefinition BITMAP_CARDINALITY = BuiltInFunctionDefinition.newBuilder() .name("BITMAP_CARDINALITY") @@ -3079,6 +3127,30 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) "org.apache.flink.table.runtime.functions.scalar.BitmapOrFunction") .build(); + public static final BuiltInFunctionDefinition BITMAP_OR_AGG = + BuiltInFunctionDefinition.newBuilder() + .name("BITMAP_OR_AGG") + .kind(AGGREGATE) + .inputTypeStrategy( + sequence( + Collections.singletonList("bitmap"), + Collections.singletonList(logical(LogicalTypeRoot.BITMAP)))) + .outputTypeStrategy(explicit(DataTypes.BITMAP())) + .runtimeProvided() + .build(); + + public static final BuiltInFunctionDefinition BITMAP_OR_CARDINALITY_AGG = + BuiltInFunctionDefinition.newBuilder() + .name("BITMAP_OR_CARDINALITY_AGG") + .kind(AGGREGATE) + .inputTypeStrategy( + sequence( + Collections.singletonList("bitmap"), + Collections.singletonList(logical(LogicalTypeRoot.BITMAP)))) + .outputTypeStrategy(explicit(DataTypes.BIGINT())) + .runtimeProvided() + .build(); + public static final BuiltInFunctionDefinition BITMAP_TO_ARRAY = BuiltInFunctionDefinition.newBuilder() .name("BITMAP_TO_ARRAY") @@ -3133,6 +3205,30 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) 
"org.apache.flink.table.runtime.functions.scalar.BitmapXorFunction") .build(); + public static final BuiltInFunctionDefinition BITMAP_XOR_AGG = + BuiltInFunctionDefinition.newBuilder() + .name("BITMAP_XOR_AGG") + .kind(AGGREGATE) + .inputTypeStrategy( + sequence( + Collections.singletonList("bitmap"), + Collections.singletonList(logical(LogicalTypeRoot.BITMAP)))) + .outputTypeStrategy(explicit(DataTypes.BITMAP())) + .runtimeProvided() + .build(); + + public static final BuiltInFunctionDefinition BITMAP_XOR_CARDINALITY_AGG = + BuiltInFunctionDefinition.newBuilder() + .name("BITMAP_XOR_CARDINALITY_AGG") + .kind(AGGREGATE) + .inputTypeStrategy( + sequence( + Collections.singletonList("bitmap"), + Collections.singletonList(logical(LogicalTypeRoot.BITMAP)))) + .outputTypeStrategy(explicit(DataTypes.BIGINT())) + .runtimeProvided() + .build(); + // -------------------------------------------------------------------------------------------- // Other functions // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala index ce6bbece4240f..37ba914bacd99 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala @@ -174,6 +174,22 @@ class AggFunctionFactory( // built-in imperativeFunction case BuiltInFunctionDefinitions.PERCENTILE => createPercentileAggFunction(argTypes) + case BuiltInFunctionDefinitions.BITMAP_BUILD_AGG => + createBitmapBuildAggFunction(argTypes, index) + case BuiltInFunctionDefinitions.BITMAP_BUILD_CARDINALITY_AGG => + createBitmapBuildCardinalityAggFunction(argTypes, index) + case 
BuiltInFunctionDefinitions.BITMAP_AND_AGG => + createBitmapAndAggFunction(argTypes, index) + case BuiltInFunctionDefinitions.BITMAP_AND_CARDINALITY_AGG => + createBitmapAndCardinalityAggFunction(argTypes, index) + case BuiltInFunctionDefinitions.BITMAP_OR_AGG => + createBitmapOrAggFunction(argTypes, index) + case BuiltInFunctionDefinitions.BITMAP_OR_CARDINALITY_AGG => + createBitmapOrCardinalityAggFunction(argTypes, index) + case BuiltInFunctionDefinitions.BITMAP_XOR_AGG => + createBitmapXorAggFunction(argTypes, index) + case BuiltInFunctionDefinitions.BITMAP_XOR_CARDINALITY_AGG => + createBitmapXorCardinalityAggFunction(argTypes, index) // DeclarativeAggregateFunction & UDF case _ => bridge.getDefinition.asInstanceOf[UserDefinedFunction] @@ -649,4 +665,88 @@ class AggFunctionFactory( new SinglePercentileAggFunction(firstArg, secondArg) } } + + private def createBitmapBuildAggFunction( + argTypes: Array[LogicalType], + index: Int): UserDefinedFunction = { + if (aggCallNeedRetractions(index)) { + new AbstractBitmapBuildWithRetractAggFunction.BitmapBuildWithRetractAggFunction(argTypes(0)) + } else { + new AbstractBitmapBuildAggFunction.BitmapBuildAggFunction(argTypes(0)) + } + } + + private def createBitmapBuildCardinalityAggFunction( + argTypes: Array[LogicalType], + index: Int): UserDefinedFunction = { + if (aggCallNeedRetractions(index)) { + new AbstractBitmapBuildWithRetractAggFunction.BitmapBuildCardinalityWithRetractAggFunction( + argTypes(0)) + } else { + new AbstractBitmapBuildAggFunction.BitmapBuildCardinalityAggFunction(argTypes(0)) + } + } + + private def createBitmapAndAggFunction( + argTypes: Array[LogicalType], + index: Int): UserDefinedFunction = { + if (aggCallNeedRetractions(index)) { + new AbstractBitmapAndWithRetractAggFunction.BitmapAndWithRetractAggFunction(argTypes(0)) + } else { + new AbstractBitmapAndAggFunction.BitmapAndAggFunction(argTypes(0)) + } + } + + private def createBitmapAndCardinalityAggFunction( + argTypes: Array[LogicalType], + 
index: Int): UserDefinedFunction = { + if (aggCallNeedRetractions(index)) { + new AbstractBitmapAndWithRetractAggFunction.BitmapAndCardinalityWithRetractAggFunction( + argTypes(0)) + } else { + new AbstractBitmapAndAggFunction.BitmapAndCardinalityAggFunction(argTypes(0)) + } + } + + private def createBitmapOrAggFunction( + argTypes: Array[LogicalType], + index: Int): UserDefinedFunction = { + if (aggCallNeedRetractions(index)) { + new AbstractBitmapOrWithRetractAggFunction.BitmapOrWithRetractAggFunction(argTypes(0)) + } else { + new AbstractBitmapOrAggFunction.BitmapOrAggFunction(argTypes(0)) + } + } + + private def createBitmapOrCardinalityAggFunction( + argTypes: Array[LogicalType], + index: Int): UserDefinedFunction = { + if (aggCallNeedRetractions(index)) { + new AbstractBitmapOrWithRetractAggFunction.BitmapOrCardinalityWithRetractAggFunction( + argTypes(0)) + } else { + new AbstractBitmapOrAggFunction.BitmapOrCardinalityAggFunction(argTypes(0)) + } + } + + private def createBitmapXorAggFunction( + argTypes: Array[LogicalType], + index: Int): UserDefinedFunction = { + if (aggCallNeedRetractions(index)) { + new AbstractBitmapXorWithRetractAggFunction.BitmapXorWithRetractAggFunction(argTypes(0)) + } else { + new AbstractBitmapXorAggFunction.BitmapXorAggFunction(argTypes(0)) + } + } + + private def createBitmapXorCardinalityAggFunction( + argTypes: Array[LogicalType], + index: Int): UserDefinedFunction = { + if (aggCallNeedRetractions(index)) { + new AbstractBitmapXorWithRetractAggFunction.BitmapXorCardinalityWithRetractAggFunction( + argTypes(0)) + } else { + new AbstractBitmapXorAggFunction.BitmapXorCardinalityAggFunction(argTypes(0)) + } + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapAggFunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapAggFunctionITCase.java new file mode 100644 index 0000000000000..2d67062b155cb --- /dev/null +++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapAggFunctionITCase.java @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions; + +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.types.Row; +import org.apache.flink.types.bitmap.Bitmap; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Stream; + +import static org.apache.flink.table.api.DataTypes.ARRAY; +import static org.apache.flink.table.api.DataTypes.BIGINT; +import static org.apache.flink.table.api.DataTypes.BITMAP; +import static org.apache.flink.table.api.DataTypes.INT; +import static org.apache.flink.table.api.DataTypes.ROW; +import static org.apache.flink.table.api.DataTypes.STRING; +import static org.apache.flink.table.api.Expressions.$; +import static org.apache.flink.table.api.Expressions.array; +import static org.apache.flink.types.RowKind.DELETE; +import static org.apache.flink.types.RowKind.INSERT; +import static org.apache.flink.types.RowKind.UPDATE_AFTER; +import static 
org.apache.flink.types.RowKind.UPDATE_BEFORE; + +/** Tests for built-in bitmap aggregation functions. */ +class BitmapAggFunctionITCase extends BuiltInAggregateFunctionTestBase { + + private static final Bitmap OVERSIZE_BITMAP; + + static { + // size 0x80000000L + OVERSIZE_BITMAP = Bitmap.empty(); + OVERSIZE_BITMAP.add(0L, Integer.MAX_VALUE); + OVERSIZE_BITMAP.add(Integer.MAX_VALUE); + } + + @Override + Stream getTestCaseSpecs() { + final List specs = new ArrayList<>(); + specs.addAll(bitmapAndAggTestCases()); + specs.addAll(bitmapBuildAggTestCases()); + specs.addAll(bitmapOrAggTestCases()); + specs.addAll(bitmapXorAggTestCases()); + return specs.stream(); + } + + private List bitmapAndAggTestCases() { + return Arrays.asList( + TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_AND_AGG) + .withDescription("without retraction") + .withSource( + ROW(BITMAP(), STRING()), + Arrays.asList( + Row.ofKind(INSERT, fromArray(1, 2, 3), "A"), + Row.ofKind(INSERT, fromArray(2, 3, 4), "A"), + Row.ofKind(INSERT, fromArray(1, 3, 5), "A"), + Row.ofKind(INSERT, null, "A"), + Row.ofKind(INSERT, fromArray(2, 4, 6), "B"), + Row.ofKind(INSERT, fromArray(1, 2, 4, 6), "B"), + Row.ofKind(INSERT, fromArray(4, 6, 8, 12, 16), "B"), + Row.ofKind(INSERT, fromArray(-1, 0, 1), "C"), + Row.ofKind(INSERT, fromArray(-1, -2), "C"), + Row.ofKind(INSERT, null, "C"), + Row.ofKind(INSERT, null, "D"), + Row.ofKind(INSERT, OVERSIZE_BITMAP, "E"))) + .testResult( + source -> + "SELECT f1, BITMAP_AND_AGG(f0), BITMAP_AND_CARDINALITY_AGG(f0) FROM " + + source + + " GROUP BY f1", + TableApiAggSpec.groupBySelect( + Collections.singletonList($("f1")), + $("f1"), + $("f0").bitmapAndAgg(), + $("f0").bitmapAndCardinalityAgg()), + ROW(STRING(), BITMAP(), BIGINT()), + ROW(STRING(), BITMAP(), BIGINT()), + Arrays.asList( + Row.of("A", fromArray(3), 1L), + Row.of("B", fromArray(4, 6), 2L), + Row.of("C", fromArray(-1), 1L), + Row.of("D", null, null), + Row.of("E", OVERSIZE_BITMAP, 0x80000000L))), + 
TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_AND_AGG) + .withDescription("with retraction") + .withSource( + ROW(BITMAP(), STRING()), + Arrays.asList( + Row.ofKind(INSERT, fromArray(2, 4, 6), "A"), + Row.ofKind(INSERT, null, "A"), + Row.ofKind(DELETE, fromArray(2, 4, 6), "A"), + Row.ofKind(INSERT, fromArray(1, 3, 5), "B"), + Row.ofKind(DELETE, fromArray(1, 3, 5), "B"), + Row.ofKind(INSERT, null, "B"), + Row.ofKind(INSERT, fromArray(-1, 0, 2, 3, 4), "B"), + Row.ofKind(DELETE, fromArray(2, 4, 6), "B"), // count < 0 + Row.ofKind(INSERT, fromArray(2, 3, 4, 5, 6), "B"), + Row.ofKind(INSERT, fromArray(2, 4, 6), "B"), + Row.ofKind(INSERT, fromArray(2, 4, 6), "B"), + Row.ofKind(INSERT, fromArray(1, 4, 7), "B"), + Row.ofKind(DELETE, fromArray(1, 4, 7), "B"), + Row.ofKind(UPDATE_BEFORE, fromArray(2, 4, 6), "B"), + Row.ofKind(UPDATE_AFTER, fromArray(3, 4, 5), "B"), + Row.ofKind(INSERT, fromArray(2, 3, 11), "C"), + Row.ofKind(INSERT, fromArray(1, 5, 13), "C"), + Row.ofKind(INSERT, fromArray(-1, -3, 0), "C"), + Row.ofKind(INSERT, null, "C"), + Row.ofKind(DELETE, fromArray(-1, -3, 0), "C"), + Row.ofKind(DELETE, fromArray(1, 5, 13), "C"), + Row.ofKind(DELETE, null, "C"), + Row.ofKind(UPDATE_BEFORE, fromArray(2, 3, 11), "C"), + Row.ofKind(UPDATE_AFTER, fromArray(1, 2), "C"))) + .testResult( + source -> + "SELECT f1, BITMAP_AND_AGG(f0), BITMAP_AND_CARDINALITY_AGG(f0) FROM " + + source + + " GROUP BY f1", + TableApiAggSpec.groupBySelect( + Collections.singletonList($("f1")), + $("f1"), + $("f0").bitmapAndAgg(), + $("f0").bitmapAndCardinalityAgg()), + ROW(STRING(), BITMAP(), BIGINT()), + ROW(STRING(), BITMAP(), BIGINT()), + Arrays.asList( + Row.of("A", null, null), + Row.of("B", fromArray(3, 4), 2L), + Row.of("C", fromArray(1, 2), 2L))), + TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_AND_AGG) + .withDescription("Validation Error") + .withSource( + ROW(INT(), ARRAY(INT()), STRING()), + Collections.singletonList(Row.ofKind(INSERT, 1, array(1, 2), "A"))) + 
.testValidationError( + source -> + "SELECT f2, BITMAP_AND_AGG(f0) FROM " + + source + + " GROUP BY f2", + TableApiAggSpec.groupBySelect( + Collections.singletonList($("f2")), + $("f2"), + $("f1").bitmapAndAgg()), + "Invalid input arguments. Expected signatures are:\n" + + "BITMAP_AND_AGG(bitmap )")); + } + + private List bitmapBuildAggTestCases() { + return Arrays.asList( + TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_BUILD_AGG) + .withDescription("without retraction") + .withSource( + ROW(INT(), STRING()), + Arrays.asList( + Row.ofKind(INSERT, 1, "A"), + Row.ofKind(INSERT, 2, "A"), + Row.ofKind(INSERT, null, "A"), + Row.ofKind(INSERT, 4, "A"), + Row.ofKind(INSERT, 3, "A"), + Row.ofKind(INSERT, 2, "B"), + Row.ofKind(INSERT, -1, "A"), + Row.ofKind(INSERT, 1, "B"), + Row.ofKind(INSERT, -1, "B"), + Row.ofKind(INSERT, null, "B"), + Row.ofKind(INSERT, null, "C"))) + .testResult( + source -> + "SELECT f1, BITMAP_BUILD_AGG(f0), BITMAP_BUILD_CARDINALITY_AGG(f0) FROM " + + source + + " GROUP BY f1", + TableApiAggSpec.groupBySelect( + Collections.singletonList($("f1")), + $("f1"), + $("f0").bitmapBuildAgg(), + $("f0").bitmapBuildCardinalityAgg()), + ROW(STRING(), BITMAP(), BIGINT()), + ROW(STRING(), BITMAP(), BIGINT()), + Arrays.asList( + Row.of("A", fromArray(-1, 1, 2, 3, 4), 5L), + Row.of("B", fromArray(-1, 1, 2), 3L), + Row.of("C", null, null))), + TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_BUILD_AGG) + .withDescription("with retraction") + .withSource( + ROW(INT(), STRING()), + Arrays.asList( + Row.ofKind(INSERT, 1, "A"), + Row.ofKind(INSERT, null, "A"), + Row.ofKind(DELETE, 1, "A"), + Row.ofKind(INSERT, 1, "B"), + Row.ofKind(DELETE, 1, "B"), + Row.ofKind(INSERT, null, "B"), + Row.ofKind(INSERT, 3, "B"), + Row.ofKind(DELETE, 2, "B"), // count < 0 + Row.ofKind(INSERT, -1, "B"), + Row.ofKind(INSERT, 2, "B"), + Row.ofKind(INSERT, 2, "B"), + Row.ofKind(INSERT, 1, "B"), + Row.ofKind(DELETE, 1, "B"), + Row.ofKind(UPDATE_BEFORE, 3, "B"), + 
Row.ofKind(UPDATE_AFTER, 1, "B"), + Row.ofKind(INSERT, 2, "C"), + Row.ofKind(INSERT, 1, "C"), + Row.ofKind(INSERT, -1, "C"), + Row.ofKind(INSERT, null, "C"), + Row.ofKind(DELETE, 1, "C"), + Row.ofKind(DELETE, -1, "C"), + Row.ofKind(DELETE, null, "C"), + Row.ofKind(UPDATE_BEFORE, 2, "C"), + Row.ofKind(UPDATE_AFTER, 1, "C"))) + .testResult( + source -> + "SELECT f1, BITMAP_BUILD_AGG(f0), BITMAP_BUILD_CARDINALITY_AGG(f0) FROM " + + source + + " GROUP BY f1", + TableApiAggSpec.groupBySelect( + Collections.singletonList($("f1")), + $("f1"), + $("f0").bitmapBuildAgg(), + $("f0").bitmapBuildCardinalityAgg()), + ROW(STRING(), BITMAP(), BIGINT()), + ROW(STRING(), BITMAP(), BIGINT()), + Arrays.asList( + Row.of("A", null, null), + Row.of("B", fromArray(-1, 1, 2), 3L), + Row.of("C", fromArray(1), 1L))), + TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_BUILD_AGG) + .withDescription("Validation Error") + .withSource( + ROW(BIGINT(), STRING()), + Collections.singletonList(Row.ofKind(INSERT, 1L, "A"))) + .testValidationError( + source -> + "SELECT f1, BITMAP_BUILD_AGG(f0) FROM " + + source + + " GROUP BY f1", + TableApiAggSpec.groupBySelect( + Collections.singletonList($("f1")), + $("f1"), + $("f0").bitmapBuildAgg()), + "Invalid input arguments. 
Expected signatures are:\n" + + "BITMAP_BUILD_AGG(value )")); + } + + private List bitmapOrAggTestCases() { + return Arrays.asList( + TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_OR_AGG) + .withDescription("without retraction") + .withSource( + ROW(BITMAP(), STRING()), + Arrays.asList( + Row.ofKind(INSERT, fromArray(1, 2, 3), "A"), + Row.ofKind(INSERT, fromArray(2, 3, 4), "A"), + Row.ofKind(INSERT, fromArray(1, 3, 5), "A"), + Row.ofKind(INSERT, null, "A"), + Row.ofKind(INSERT, fromArray(2, 4, 6), "B"), + Row.ofKind(INSERT, fromArray(1, 2, 4, 6), "B"), + Row.ofKind(INSERT, fromArray(4, 6, 8, 12, 16), "B"), + Row.ofKind(INSERT, fromArray(-1, 0, 1), "C"), + Row.ofKind(INSERT, fromArray(-1, -2), "C"), + Row.ofKind(INSERT, null, "C"), + Row.ofKind(INSERT, null, "D"), + Row.ofKind(INSERT, OVERSIZE_BITMAP, "E"))) + .testResult( + source -> + "SELECT f1, BITMAP_OR_AGG(f0), BITMAP_OR_CARDINALITY_AGG(f0) FROM " + + source + + " GROUP BY f1", + TableApiAggSpec.groupBySelect( + Collections.singletonList($("f1")), + $("f1"), + $("f0").bitmapOrAgg(), + $("f0").bitmapOrCardinalityAgg()), + ROW(STRING(), BITMAP(), BIGINT()), + ROW(STRING(), BITMAP(), BIGINT()), + Arrays.asList( + Row.of("A", fromArray(1, 2, 3, 4, 5), 5L), + Row.of( + "B", + Bitmap.fromArray(new int[] {1, 2, 4, 6, 8, 12, 16}), + 7L), + Row.of("C", fromArray(0, 1, -2, -1), 4L), + Row.of("D", null, null), + Row.of("E", OVERSIZE_BITMAP, 0x80000000L))), + TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_OR_AGG) + .withDescription("with retraction") + .withSource( + ROW(BITMAP(), STRING()), + Arrays.asList( + Row.ofKind(INSERT, fromArray(1, 2, 3), "A"), + Row.ofKind(INSERT, null, "A"), + Row.ofKind(DELETE, fromArray(1, 2, 3), "A"), + Row.ofKind(INSERT, fromArray(1, 3, 5), "B"), + Row.ofKind(DELETE, fromArray(1, 3, 5), "B"), + Row.ofKind(INSERT, null, "B"), + Row.ofKind(INSERT, fromArray(-1, 0, 2, 3, 4), "B"), + Row.ofKind(DELETE, fromArray(2, 4, 6), "B"), // count < 0 + Row.ofKind(INSERT, fromArray(2, 3, 
4, 5, 6), "B"), + Row.ofKind(INSERT, fromArray(2, 4, 6), "B"), + Row.ofKind(INSERT, fromArray(2, 4, 6), "B"), + Row.ofKind(INSERT, fromArray(1, 4, 7), "B"), + Row.ofKind(DELETE, fromArray(1, 4, 7), "B"), + Row.ofKind(UPDATE_BEFORE, fromArray(2, 4, 6), "B"), + Row.ofKind(UPDATE_AFTER, fromArray(3, 4, 5), "B"), + Row.ofKind(INSERT, fromArray(2, 3, 11), "C"), + Row.ofKind(INSERT, fromArray(1, 5, 13), "C"), + Row.ofKind(INSERT, fromArray(-1, -3, 0), "C"), + Row.ofKind(INSERT, null, "C"), + Row.ofKind(DELETE, fromArray(-1, -3, 0), "C"), + Row.ofKind(DELETE, fromArray(1, 5, 13), "C"), + Row.ofKind(DELETE, null, "C"), + Row.ofKind(UPDATE_BEFORE, fromArray(2, 3, 11), "C"), + Row.ofKind(UPDATE_AFTER, fromArray(1, 2), "C"))) + .testResult( + source -> + "SELECT f1, BITMAP_OR_AGG(f0), BITMAP_OR_CARDINALITY_AGG(f0) FROM " + + source + + " GROUP BY f1", + TableApiAggSpec.groupBySelect( + Collections.singletonList($("f1")), + $("f1"), + $("f0").bitmapOrAgg(), + $("f0").bitmapOrCardinalityAgg()), + ROW(STRING(), BITMAP(), BIGINT()), + ROW(STRING(), BITMAP(), BIGINT()), + Arrays.asList( + Row.of("A", null, null), + Row.of("B", fromArray(0, 2, 3, 4, 5, 6, -1), 7L), + Row.of("C", fromArray(1, 2), 2L))), + TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_OR_AGG) + .withDescription("Validation Error") + .withSource( + ROW(INT(), ARRAY(INT()), STRING()), + Collections.singletonList(Row.ofKind(INSERT, 1, array(1, 2), "A"))) + .testValidationError( + source -> + "SELECT f2, BITMAP_OR_AGG(f0) FROM " + + source + + " GROUP BY f2", + TableApiAggSpec.groupBySelect( + Collections.singletonList($("f2")), + $("f2"), + $("f1").bitmapOrAgg()), + "Invalid input arguments. 
Expected signatures are:\n" + + "BITMAP_OR_AGG(bitmap )")); + } + + private List bitmapXorAggTestCases() { + return Arrays.asList( + TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_XOR_AGG) + .withDescription("without retraction") + .withSource( + ROW(BITMAP(), STRING()), + Arrays.asList( + Row.ofKind(INSERT, fromArray(1, 2, 3), "A"), + Row.ofKind(INSERT, fromArray(2, 3, 4), "A"), + Row.ofKind(INSERT, fromArray(1, 3, 5), "A"), + Row.ofKind(INSERT, null, "A"), + Row.ofKind(INSERT, fromArray(2, 4, 6), "B"), + Row.ofKind(INSERT, fromArray(1, 2, 4, 6), "B"), + Row.ofKind(INSERT, fromArray(4, 6, 8, 12, 16), "B"), + Row.ofKind(INSERT, fromArray(-1, 0, 1), "C"), + Row.ofKind(INSERT, fromArray(-1, -2), "C"), + Row.ofKind(INSERT, null, "C"), + Row.ofKind(INSERT, null, "D"), + Row.ofKind(INSERT, OVERSIZE_BITMAP, "E"))) + .testResult( + source -> + "SELECT f1, BITMAP_XOR_AGG(f0), BITMAP_XOR_CARDINALITY_AGG(f0) FROM " + + source + + " GROUP BY f1", + TableApiAggSpec.groupBySelect( + Collections.singletonList($("f1")), + $("f1"), + $("f0").bitmapXorAgg(), + $("f0").bitmapXorCardinalityAgg()), + ROW(STRING(), BITMAP(), BIGINT()), + ROW(STRING(), BITMAP(), BIGINT()), + Arrays.asList( + Row.of("A", fromArray(3, 4, 5), 3L), + Row.of("B", fromArray(1, 4, 6, 8, 12, 16), 6L), + Row.of("C", fromArray(0, 1, -2), 3L), + Row.of("D", null, null), + Row.of("E", OVERSIZE_BITMAP, 0x80000000L))), + TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_XOR_AGG) + .withDescription("with retraction") + .withSource( + ROW(BITMAP(), STRING()), + Arrays.asList( + Row.ofKind(INSERT, fromArray(1, 2, 3), "A"), + Row.ofKind(INSERT, null, "A"), + Row.ofKind(DELETE, fromArray(1, 2, 3), "A"), + Row.ofKind(INSERT, fromArray(1, 3, 5), "B"), + Row.ofKind(DELETE, fromArray(1, 3, 5), "B"), + Row.ofKind(INSERT, null, "B"), + Row.ofKind(INSERT, fromArray(-1, 0, 2, 3, 4), "B"), + Row.ofKind(DELETE, fromArray(2, 4, 6), "B"), // count < 0 + Row.ofKind(INSERT, fromArray(2, 3, 4, 5, 6), "B"), + 
Row.ofKind(INSERT, fromArray(2, 4, 6), "B"), + Row.ofKind(INSERT, fromArray(2, 4, 6), "B"), + Row.ofKind(INSERT, fromArray(1, 4, 7), "B"), + Row.ofKind(DELETE, fromArray(1, 4, 7), "B"), + Row.ofKind(UPDATE_BEFORE, fromArray(2, 4, 6), "B"), + Row.ofKind(UPDATE_AFTER, fromArray(3, 4, 5), "B"), + Row.ofKind(INSERT, fromArray(2, 3, 11), "C"), + Row.ofKind(INSERT, fromArray(1, 5, 13), "C"), + Row.ofKind(INSERT, fromArray(-1, -3, 0), "C"), + Row.ofKind(INSERT, null, "C"), + Row.ofKind(DELETE, fromArray(-1, -3, 0), "C"), + Row.ofKind(DELETE, fromArray(1, 5, 13), "C"), + Row.ofKind(DELETE, null, "C"), + Row.ofKind(UPDATE_BEFORE, fromArray(2, 3, 11), "C"), + Row.ofKind(UPDATE_AFTER, fromArray(1, 2), "C"))) + .testResult( + source -> + "SELECT f1, BITMAP_XOR_AGG(f0), BITMAP_XOR_CARDINALITY_AGG(f0) FROM " + + source + + " GROUP BY f1", + TableApiAggSpec.groupBySelect( + Collections.singletonList($("f1")), + $("f1"), + $("f0").bitmapXorAgg(), + $("f0").bitmapXorCardinalityAgg()), + ROW(STRING(), BITMAP(), BIGINT()), + ROW(STRING(), BITMAP(), BIGINT()), + Arrays.asList( + Row.of("A", null, null), + Row.of("B", fromArray(0, 3, 4, 6, -1), 5L), + Row.of("C", fromArray(1, 2), 2L))), + TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_XOR_AGG) + .withDescription("Validation Error") + .withSource( + ROW(INT(), ARRAY(INT()), STRING()), + Collections.singletonList(Row.ofKind(INSERT, 1, array(1, 2), "A"))) + .testValidationError( + source -> + "SELECT f2, BITMAP_XOR_AGG(f0) FROM " + + source + + " GROUP BY f2", + TableApiAggSpec.groupBySelect( + Collections.singletonList($("f2")), + $("f2"), + $("f1").bitmapXorAgg()), + "Invalid input arguments. Expected signatures are:\n" + + "BITMAP_XOR_AGG(bitmap )")); + } + + // ~ Utils -------------------------------------------------------------------- + + private Bitmap fromArray(int... 
values) { + return Bitmap.fromArray(values); + } +} diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala index b08a944b57149..69040c961d8f5 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala @@ -2974,6 +2974,66 @@ class OverAggregateITCase extends BatchTestBase { ) ) } + + @Test + def testBitmapBuildAgg(): Unit = { + checkResult( + "SELECT " + + "d, e, " + + "BITMAP_BUILD_AGG(d) OVER (ORDER BY e ROWS BETWEEN 4 PRECEDING AND CURRENT ROW), " + + "BITMAP_BUILD_AGG(CAST(e AS INT)) OVER (ORDER BY e ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) " + + "FROM NullTable5", + Seq( + row(1, 1L, "{1}", "{1}"), + row(2, 2L, "{1,2}", "{1,2}"), + row(2, 3L, "{1,2}", "{1,2,3}"), + row(3, 4L, "{1,2,3}", "{1,2,3,4}"), + row(3, 5L, "{1,2,3}", "{1,2,3,4,5}"), + row(3, 6L, "{2,3}", "{2,3,4,5,6}"), + row(4, 7L, "{2,3,4}", "{3,4,5,6,7}"), + row(4, 8L, "{3,4}", "{4,5,6,7,8}"), + row(4, 9L, "{3,4}", "{5,6,7,8,9}"), + row(4, 10L, "{3,4}", "{6,7,8,9,10}"), + row(5, 11L, "{4,5}", "{7,8,9,10,11}"), + row(5, 12L, "{4,5}", "{8,9,10,11,12}"), + row(5, 13L, "{4,5}", "{9,10,11,12,13}"), + row(5, 14L, "{4,5}", "{10,11,12,13,14}"), + row(5, 15L, "{5}", "{11,12,13,14,15}"), + row(null, 999L, "{5}", "{12,13,14,15,999}"), + row(null, 999L, "{5}", "{13,14,15,999}") + ) + ) + } + + @Test + def testBitmapLogicalOpsAgg(): Unit = { + checkResult( + "SELECT " + + "d, f, " + + "BITMAP_AND_AGG(BITMAP_BUILD(ARRAY[d,f])) OVER (ORDER BY e ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), " + + "BITMAP_OR_AGG(BITMAP_BUILD(ARRAY[d,f])) OVER (ORDER BY e ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), " + + 
"BITMAP_XOR_AGG(BITMAP_BUILD(ARRAY[d,f])) OVER (ORDER BY e ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) " + + "FROM Table5", + Seq( + row(1, 0, "{0,1}", "{0,1}", "{0,1}"), + row(2, 1, "{1}", "{0,1,2}", "{0,2}"), + row(2, 2, "{}", "{0,1,2}", "{0}"), + row(3, 3, "{}", "{1,2,3}", "{1,3}"), + row(3, 4, "{}", "{2,3,4}", "{2,4}"), + row(3, 5, "{3}", "{3,4,5}", "{3,4,5}"), + row(4, 6, "{}", "{3,4,5,6}", "{5,6}"), + row(4, 7, "{}", "{3,4,5,6,7}", "{3,5,6,7}"), + row(4, 8, "{4}", "{4,6,7,8}", "{4,6,7,8}"), + row(4, 9, "{4}", "{4,7,8,9}", "{4,7,8,9}"), + row(5, 10, "{}", "{4,5,8,9,10}", "{5,8,9,10}"), + row(5, 11, "{}", "{4,5,9,10,11}", "{4,9,10,11}"), + row(5, 12, "{5}", "{5,10,11,12}", "{5,10,11,12}"), + row(5, 13, "{5}", "{5,11,12,13}", "{5,11,12,13}"), + row(5, 14, "{5}", "{5,12,13,14}", "{5,12,13,14}") + ) + ) + } + } /** The initial accumulator for count aggregate function */ diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala index 3d0e189294e61..fbee1a8585fd2 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala @@ -2119,6 +2119,159 @@ class AggregateITCase( tEnv.dropTemporarySystemFunction("PERCENTILE") } + + @TestTemplate + def testBitmapBuildAgg(): Unit = { + val data = new mutable.MutableList[(Int, Int, String)] + for (i <- 0 until 5) { + data.+=((i, -i, "a")) + data.+=((i * 2, -i * 2, "b")) + } + + val sql = + """ + |SELECT + | c, + | CAST(BITMAP_BUILD_AGG(a) AS STRING), + | CAST(BITMAP_BUILD_AGG(b) AS STRING) + |FROM MyTable + |GROUP BY c + """.stripMargin + + val t = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c) + tEnv.createTemporaryView("MyTable", t) + + val 
sink = new TestingRetractSink + tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + + val expected = List( + s"a,{0,1,2,3,4},{0,${Integer.toUnsignedLong(-4)},${Integer.toUnsignedLong(-3)},${Integer + .toUnsignedLong(-2)},${Integer.toUnsignedLong(-1)}}", + s"b,{0,2,4,6,8},{0,${Integer.toUnsignedLong(-8)},${Integer.toUnsignedLong(-6)},${Integer + .toUnsignedLong(-4)},${Integer.toUnsignedLong(-2)}}" + ) + assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted) + } + + @TestTemplate + def testBitmapBuildAggWithRetract(): Unit = { + val data = new mutable.MutableList[(Int, Int, String)] + for (i <- 0 until 5) { + data.+=((i, i, "a")) + data.+=((i + 1, i * 2, "b")) + data.+=((i + 2, i * 3, "c")) + } + + val inner = + """ + |SELECT + | MAX(a) AS ma, + | LAST_VALUE(b) AS la, + | c + |FROM MyTable + |GROUP BY c + """.stripMargin + val sql = + s""" + |SELECT + | CAST(BITMAP_BUILD_AGG(ma) AS STRING), + | CAST(BITMAP_BUILD_AGG(la) AS STRING) + |FROM ($inner) + """.stripMargin + + val t = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c) + tEnv.createTemporaryView("MyTable", t) + + val sink = new TestingRetractSink + tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + + val expected = List(s"{4,5,6},{4,8,12}") + assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted) + } + + @TestTemplate + def testBitmapLogicalOpsAgg(): Unit = { + val data = new mutable.MutableList[(Int, Int, Int, String)] + data.+=((-3, 5, 0, "a")) + data.+=((7, 2, 5, "b")) + data.+=((-3, 8, -8, "c")) + data.+=((2, 1, 7, "b")) + data.+=((2, 9, 0, "a")) + data.+=((8, 3, -3, "c")) + data.+=((7, 6, 2, "b")) + data.+=((0, 4, 5, "a")) + data.+=((-3, 7, 8, "c")) + data.+=((5, 0, 2, "b")) + data.+=((0, 10, 5, "a")) + data.+=((2, 5, 0, "a")) + + val sql = + """ + |SELECT + | d, + | CAST(BITMAP_AND_AGG(BITMAP_BUILD(ARRAY[a, b, c])) AS STRING), + | CAST(BITMAP_OR_AGG(BITMAP_BUILD(ARRAY[a, b, c])) AS STRING), + | 
CAST(BITMAP_XOR_AGG(BITMAP_BUILD(ARRAY[a, b, c])) AS STRING) + |FROM MyTable + |GROUP BY d + """.stripMargin + + val t = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c, 'd) + tEnv.createTemporaryView("MyTable", t) + + val sink = new TestingRetractSink + tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + + val expected = + List( + s"a,{0},{0,2,4,5,9,10,${Integer.toUnsignedLong(-3)}},{0,4,9,10,${Integer.toUnsignedLong(-3)}}", + "b,{2},{0,1,2,5,6,7},{0,1,6,7}", + s"c,{8,${Integer.toUnsignedLong(-3)}},{3,7,8,${Integer.toUnsignedLong(-8)},${Integer + .toUnsignedLong(-3)}},{3,7,8,${Integer.toUnsignedLong(-8)},${Integer + .toUnsignedLong(-3)}}" + ) + assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted) + } + + @TestTemplate + def testBitmapLogicalOpsAggWithRetract(): Unit = { + val data = new mutable.MutableList[(Int, Int, Int, String)] + for (i <- 3 until 0 by -1) { + data.+=((i, i + 1, i + 2, "a")) + data.+=((i * 2, i * 2 + 1, i * 2 + 2, "b")) + data.+=((i * 3, i * 3 + 1, i * 3 + 2, "c")) + } + + val inner = + """ + |SELECT + | LAST_VALUE(BITMAP_BUILD(ARRAY[a, b, c])) AS last, + | d + |FROM MyTable + |GROUP BY d + """.stripMargin + val sql = + s""" + |SELECT + | CAST(BITMAP_AND_AGG(last) AS STRING), + | CAST(BITMAP_OR_AGG(last) AS STRING), + | CAST(BITMAP_XOR_AGG(last) AS STRING) + |FROM ($inner) + """.stripMargin + + val t = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c, 'd) + tEnv.createTemporaryView("MyTable", t) + + val sink = new TestingRetractSink + tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + + val expected = List(s"{3},{1,2,3,4,5},{1,3,5}") + assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted) + } } object AggregateITCase { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala index 82c58da9a8795..a1f60de7746df 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala @@ -1499,6 +1499,85 @@ class OverAggregateITCase(mode: StateBackendMode, unboundedOverVersion: Int) } } } + + @TestTemplate + def testBitmapBuildAgg(): Unit = { + val sql = + """ + |SELECT + | a, b, + | BITMAP_BUILD_AGG(a) OVER (ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW), + | BITMAP_BUILD_AGG(CAST(b AS INT)) OVER (ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) + |FROM MyTable + """.stripMargin + + val t = + failingDataSource(TestData.tupleData5).toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) + tEnv.createTemporaryView("MyTable", t) + + val sink = new TestingAppendSink() + tEnv.sqlQuery(sql).toDataStream.addSink(sink).setParallelism(1) + env.execute() + + val expected = List( + "1,1,{1},{1}", + "2,2,{1,2},{1,2}", + "2,3,{1,2},{1,2,3}", + "3,4,{1,2,3},{1,2,3,4}", + "3,5,{1,2,3},{1,2,3,4,5}", + "3,6,{2,3},{2,3,4,5,6}", + "4,7,{2,3,4},{3,4,5,6,7}", + "4,8,{3,4},{4,5,6,7,8}", + "4,9,{3,4},{5,6,7,8,9}", + "4,10,{3,4},{6,7,8,9,10}", + "5,11,{4,5},{7,8,9,10,11}", + "5,12,{4,5},{8,9,10,11,12}", + "5,13,{4,5},{9,10,11,12,13}", + "5,14,{4,5},{10,11,12,13,14}", + "5,15,{5},{11,12,13,14,15}" + ) + assertThat(sink.getAppendResults).isEqualTo(expected) + } + + @TestTemplate + def testBitmapLogicalOpsAgg(): Unit = { + val sql = + """ + |SELECT + | a, c, + | BITMAP_AND_AGG(BITMAP_BUILD(ARRAY[a,c])) OVER (ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), + | BITMAP_OR_AGG(BITMAP_BUILD(ARRAY[a,c])) OVER (ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), + | BITMAP_XOR_AGG(BITMAP_BUILD(ARRAY[a,c])) OVER (ORDER 
BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) + |FROM MyTable + """.stripMargin + + val t = + failingDataSource(TestData.tupleData5).toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) + tEnv.createTemporaryView("MyTable", t) + + val sink = new TestingAppendSink() + tEnv.sqlQuery(sql).toDataStream.addSink(sink).setParallelism(1) + env.execute() + + val expected = List( + "1,0,{0,1},{0,1},{0,1}", + "2,1,{1},{0,1,2},{0,2}", + "2,2,{},{0,1,2},{0}", + "3,3,{},{1,2,3},{1,3}", + "3,4,{},{2,3,4},{2,4}", + "3,5,{3},{3,4,5},{3,4,5}", + "4,6,{},{3,4,5,6},{5,6}", + "4,7,{},{3,4,5,6,7},{3,5,6,7}", + "4,8,{4},{4,6,7,8},{4,6,7,8}", + "4,9,{4},{4,7,8,9},{4,7,8,9}", + "5,10,{},{4,5,8,9,10},{5,8,9,10}", + "5,11,{},{4,5,9,10,11},{4,9,10,11}", + "5,12,{5},{5,10,11,12},{5,10,11,12}", + "5,13,{5},{5,11,12,13},{5,11,12,13}", + "5,14,{5},{5,12,13,14},{5,12,13,14}" + ) + assertThat(sink.getAppendResults).isEqualTo(expected) + } } object OverAggregateITCase { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala index 12ac1f7368074..52c8ff9d13185 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala @@ -988,6 +988,32 @@ class WindowAggregateITCase( } } + @TestTemplate + def testBitmapBuildAggOnEventTimeTumbleWindow(): Unit = { + val sql = + """ + |SELECT + | window_start, + | window_end, + | BITMAP_BUILD_AGG(`int`) as `bitmap` + |FROM TABLE( + | TUMBLE(TABLE T1_CDC, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + |GROUP BY window_start, window_end + """.stripMargin + + val sink = new TestingAppendSink + tEnv.sqlQuery(sql).toDataStream.addSink(sink) + env.execute() 
+ + val expected = List( + "2020-10-10T00:00,2020-10-10T00:00:05,{2,5,22}", + "2020-10-10T00:00:05,2020-10-10T00:00:10,{3,6}", + "2020-10-10T00:00:15,2020-10-10T00:00:20,{4}" + ) + val result = sink.getAppendResults.sorted + assertThat(result.mkString("\n")).isEqualTo(expected.mkString("\n")) + } + private def verifyWindowAgg( tvfFromClause: String, allExpectedData: Seq[String], @@ -1090,6 +1116,34 @@ class WindowAggregateITCase( }) .map(_.mkString(",")) } + + @TestTemplate + def testBitmapLogicalOpsAggOnEventTimeTumbleWindow(): Unit = { + val sql = + """ + |SELECT + | window_start, + | window_end, + | BITMAP_AND_AGG(BITMAP_BUILD(ARRAY[`int`, 2*`int`, 4*`int`])), + | BITMAP_OR_AGG(BITMAP_BUILD(ARRAY[`int`, 2*`int`, 4*`int`])), + | BITMAP_XOR_AGG(BITMAP_BUILD(ARRAY[`int`, 2*`int`, 4*`int`])) + |FROM TABLE( + | TUMBLE(TABLE T1_CDC, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + |GROUP BY window_start, window_end + """.stripMargin + + val sink = new TestingAppendSink + tEnv.sqlQuery(sql).toDataStream.addSink(sink) + env.execute() + + val expected = List( + "2020-10-10T00:00,2020-10-10T00:00:05,{},{2,4,5,8,10,20,22,44,88},{2,4,5,8,10,20,22,44,88}", + "2020-10-10T00:00:05,2020-10-10T00:00:10,{6,12},{3,6,12,24},{6,12,24}", + "2020-10-10T00:00:15,2020-10-10T00:00:20,{4,8,16},{4,8,16},{4,8,16}" + ) + val result = sink.getAppendResults.sorted + assertThat(result.mkString("\n")).isEqualTo(expected.mkString("\n")) + } } object WindowAggregateITCase { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala index 067bfd609052f..08e27c452cc26 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala +++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala @@ -606,4 +606,74 @@ class AggregateITCase(mode: StateBackendMode) extends StreamingWithStateTestBase } } } + + @TestTemplate + def testBitmapBuildAgg(): Unit = { + val data = new mutable.MutableList[(Int, Int, String)] + for (i <- 0 until 5) { + data.+=((i, -i, "a")) + data.+=((i * 2, -i * 2, "b")) + } + + val t = failingDataSource(data) + .toTable(tEnv, 'a, 'b, 'c) + .groupBy('c) + .select( + 'c, + 'a.bitmapBuildAgg().cast(DataTypes.STRING()), + 'b.bitmapBuildAgg().cast(DataTypes.STRING())) + + val sink = new TestingRetractSink + t.toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + + val expected = List( + s"a,{0,1,2,3,4},{0,${Integer.toUnsignedLong(-4)},${Integer.toUnsignedLong(-3)},${Integer + .toUnsignedLong(-2)},${Integer.toUnsignedLong(-1)}}", + s"b,{0,2,4,6,8},{0,${Integer.toUnsignedLong(-8)},${Integer.toUnsignedLong(-6)},${Integer + .toUnsignedLong(-4)},${Integer.toUnsignedLong(-2)}}" + ) + assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted) + } + + @TestTemplate + def testBitmapLogicalOpsAgg(): Unit = { + val data = new mutable.MutableList[(Int, Int, Int, String)] + data.+=((-3, 5, 0, "a")) + data.+=((7, 2, 5, "b")) + data.+=((-3, 8, -8, "c")) + data.+=((2, 1, 7, "b")) + data.+=((2, 9, 0, "a")) + data.+=((8, 3, -3, "c")) + data.+=((7, 6, 2, "b")) + data.+=((0, 4, 5, "a")) + data.+=((-3, 7, 8, "c")) + data.+=((5, 0, 2, "b")) + data.+=((0, 10, 5, "a")) + data.+=((2, 5, 0, "a")) + + val t = failingDataSource(data) + .toTable(tEnv, 'a, 'b, 'c, 'd) + .groupBy('d) + .select( + 'd, + array('a, 'b, 'c).bitmapBuild().bitmapAndAgg().cast(DataTypes.STRING()), + array('a, 'b, 'c).bitmapBuild().bitmapOrAgg().cast(DataTypes.STRING()), + array('a, 'b, 'c).bitmapBuild().bitmapXorAgg().cast(DataTypes.STRING()) + ) + + val sink = new TestingRetractSink + t.toRetractStream[Row].addSink(sink).setParallelism(1) + 
env.execute() + + val expected = + List( + s"a,{0},{0,2,4,5,9,10,${Integer.toUnsignedLong(-3)}},{0,4,9,10,${Integer.toUnsignedLong(-3)}}", + "b,{2},{0,1,2,5,6,7},{0,1,6,7}", + s"c,{8,${Integer.toUnsignedLong(-3)}},{3,7,8,${Integer.toUnsignedLong(-8)},${Integer + .toUnsignedLong(-3)}},{3,7,8,${Integer.toUnsignedLong(-8)},${Integer + .toUnsignedLong(-3)}}" + ) + assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted) + } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapAndAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapAndAggFunction.java new file mode 100644 index 0000000000000..66bfb70919e0a --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapAndAggFunction.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.table.runtime.functions.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.bitmap.Bitmap;
+import org.apache.flink.types.bitmap.RoaringBitmapData;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
+
+/** Abstract base class for BITMAP_AND_AGG and BITMAP_AND_CARDINALITY_AGG. */
+@Internal
+public abstract class AbstractBitmapAndAggFunction<T>
+        extends BuiltInAggregateFunction<T, AbstractBitmapAndAggFunction.BitmapAndAccumulator> {
+
+    private final transient DataType valueDataType;
+
+    public AbstractBitmapAndAggFunction(LogicalType valueType) {
+        this.valueDataType = toInternalDataType(valueType);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Planning
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public List<DataType> getArgumentDataTypes() {
+        return Collections.singletonList(valueDataType);
+    }
+
+    @Override
+    public DataType getAccumulatorDataType() {
+        return DataTypes.STRUCTURED(
+                BitmapAndAccumulator.class, DataTypes.FIELD("bitmap", DataTypes.BITMAP()));
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Accumulator
+    // --------------------------------------------------------------------------------------------
+
+    /** Accumulator for BITMAP_AND_AGG and BITMAP_AND_CARDINALITY_AGG. */
+    public static class BitmapAndAccumulator {
+
+        public @Nullable RoaringBitmapData bitmap;
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            BitmapAndAccumulator that = (BitmapAndAccumulator) obj;
+            return Objects.equals(bitmap, that.bitmap);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(bitmap);
+        }
+    }
+
+    @Override
+    public BitmapAndAccumulator createAccumulator() {
+        return new BitmapAndAccumulator();
+    }
+
+    public void resetAccumulator(BitmapAndAccumulator acc) {
+        acc.bitmap = null;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Runtime
+    // --------------------------------------------------------------------------------------------
+
+    public void accumulate(BitmapAndAccumulator acc, @Nullable Bitmap bitmap) {
+        if (bitmap == null) {
+            return;
+        }
+
+        if (acc.bitmap != null) {
+            acc.bitmap.and(bitmap);
+        } else {
+            acc.bitmap = RoaringBitmapData.from(bitmap);
+        }
+    }
+
+    public void merge(BitmapAndAccumulator acc, Iterable<BitmapAndAccumulator> its) {
+        for (BitmapAndAccumulator other : its) {
+            if (other.bitmap != null) {
+                if (acc.bitmap != null) {
+                    acc.bitmap.and(other.bitmap);
+                } else {
+                    acc.bitmap = RoaringBitmapData.from(other.bitmap);
+                }
+            }
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Sub-classes
+    // --------------------------------------------------------------------------------------------
+
+    /** Built-in BITMAP_AND_AGG aggregate function that returns bitmap. */
+    public static final class BitmapAndAggFunction extends AbstractBitmapAndAggFunction<Bitmap> {
+
+        public BitmapAndAggFunction(LogicalType valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BITMAP();
+        }
+
+        @Override
+        public Bitmap getValue(BitmapAndAccumulator acc) {
+            return Bitmap.from(acc.bitmap);
+        }
+    }
+
+    /** Built-in BITMAP_AND_CARDINALITY_AGG aggregate function that returns cardinality. */
+    public static final class BitmapAndCardinalityAggFunction
+            extends AbstractBitmapAndAggFunction<Long> {
+
+        public BitmapAndCardinalityAggFunction(LogicalType valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BIGINT();
+        }
+
+        @Override
+        public Long getValue(BitmapAndAccumulator acc) {
+            return acc.bitmap == null ? null : acc.bitmap.getLongCardinality();
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapAndWithRetractAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapAndWithRetractAggFunction.java
new file mode 100644
index 0000000000000..b2b2202b98bdc
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapAndWithRetractAggFunction.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.dataview.MapView;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.bitmap.Bitmap;
+import org.apache.flink.types.bitmap.RoaringBitmapData;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
+
+/** Abstract base class for BITMAP_AND_AGG and BITMAP_AND_CARDINALITY_AGG with retraction. */
+@Internal
+public abstract class AbstractBitmapAndWithRetractAggFunction<T>
+        extends BuiltInAggregateFunction<
+                T, AbstractBitmapAndWithRetractAggFunction.BitmapAndWithRetractAccumulator> {
+
+    private final transient DataType valueDataType;
+
+    public AbstractBitmapAndWithRetractAggFunction(LogicalType valueType) {
+        this.valueDataType = toInternalDataType(valueType);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Planning
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public List<DataType> getArgumentDataTypes() {
+        return Collections.singletonList(valueDataType);
+    }
+
+    @Override
+    public DataType getAccumulatorDataType() {
+        return DataTypes.STRUCTURED(
+                BitmapAndWithRetractAccumulator.class,
+                DataTypes.FIELD("bitmapCount", DataTypes.INT().notNull()),
+                DataTypes.FIELD(
+                        "valueCount",
+                        MapView.newMapViewDataType(
+                                DataTypes.INT().notNull(), DataTypes.INT().notNull())));
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Accumulator
+    // --------------------------------------------------------------------------------------------
+
+    /** Accumulator for BITMAP_AND_AGG and BITMAP_AND_CARDINALITY_AGG with retraction. */
+    public static class BitmapAndWithRetractAccumulator {
+
+        public int bitmapCount = 0;
+        public MapView<Integer, Integer> valueCount = new MapView<>();
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            BitmapAndWithRetractAccumulator that = (BitmapAndWithRetractAccumulator) obj;
+            return bitmapCount == that.bitmapCount && Objects.equals(valueCount, that.valueCount);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(bitmapCount, valueCount);
+        }
+    }
+
+    @Override
+    public BitmapAndWithRetractAccumulator createAccumulator() {
+        return new BitmapAndWithRetractAccumulator();
+    }
+
+    public void resetAccumulator(BitmapAndWithRetractAccumulator acc) {
+        acc.bitmapCount = 0;
+        acc.valueCount.clear();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Runtime
+    // --------------------------------------------------------------------------------------------
+
+    public void accumulate(BitmapAndWithRetractAccumulator acc, @Nullable Bitmap bitmap)
+            throws Exception {
+        if (bitmap == null) {
+            return;
+        }
+
+        acc.bitmapCount++;
+
+        RoaringBitmapData rbm32 = (RoaringBitmapData) bitmap;
+        rbm32.forEach(
+                value -> {
+                    try {
+                        Integer count = acc.valueCount.get(value);
+                        count = count == null ? 1 : count + 1;
+
+                        if (count == 0) {
+                            acc.valueCount.remove(value);
+                        } else {
+                            acc.valueCount.put(value, count);
+                        }
+                    } catch (Exception e) {
+                        throw new FlinkRuntimeException(e);
+                    }
+                });
+    }
+
+    public void retract(BitmapAndWithRetractAccumulator acc, @Nullable Bitmap bitmap)
+            throws Exception {
+        if (bitmap == null) {
+            return;
+        }
+
+        acc.bitmapCount--;
+
+        RoaringBitmapData rbm32 = (RoaringBitmapData) bitmap;
+        rbm32.forEach(
+                value -> {
+                    try {
+                        Integer count = acc.valueCount.get(value);
+                        count = count == null ? -1 : count - 1;
+
+                        if (count == 0) {
+                            acc.valueCount.remove(value);
+                        } else {
+                            acc.valueCount.put(value, count);
+                        }
+                    } catch (Exception e) {
+                        throw new FlinkRuntimeException(e);
+                    }
+                });
+    }
+
+    public void merge(
+            BitmapAndWithRetractAccumulator acc, Iterable<BitmapAndWithRetractAccumulator> its)
+            throws Exception {
+        for (BitmapAndWithRetractAccumulator other : its) {
+            acc.bitmapCount += other.bitmapCount;
+
+            for (Map.Entry<Integer, Integer> entry : other.valueCount.entries()) {
+                Integer value = entry.getKey();
+                Integer count = entry.getValue();
+
+                Integer curCount = acc.valueCount.get(value);
+                curCount = curCount == null ? count : curCount + count;
+
+                if (curCount == 0) {
+                    acc.valueCount.remove(value);
+                } else {
+                    acc.valueCount.put(value, curCount);
+                }
+            }
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Sub-classes
+    // --------------------------------------------------------------------------------------------
+
+    /** Built-in BITMAP_AND_AGG with retraction aggregate function that returns bitmap. */
+    public static final class BitmapAndWithRetractAggFunction
+            extends AbstractBitmapAndWithRetractAggFunction<Bitmap> {
+
+        public BitmapAndWithRetractAggFunction(LogicalType valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BITMAP();
+        }
+
+        @Override
+        public Bitmap getValue(BitmapAndWithRetractAccumulator acc) {
+            if (acc.bitmapCount <= 0) {
+                return null;
+            }
+
+            RoaringBitmapData bitmap = RoaringBitmapData.empty();
+            try {
+                for (Map.Entry<Integer, Integer> entry : acc.valueCount.entries()) {
+                    if (entry.getValue() == acc.bitmapCount) {
+                        bitmap.add(entry.getKey());
+                    }
+                }
+            } catch (Exception e) {
+                throw new FlinkRuntimeException(e);
+            }
+
+            return bitmap;
+        }
+    }
+
+    /**
+     * Built-in BITMAP_AND_CARDINALITY_AGG with retraction aggregate function that returns
+     * cardinality.
+     */
+    public static final class BitmapAndCardinalityWithRetractAggFunction
+            extends AbstractBitmapAndWithRetractAggFunction<Long> {
+
+        public BitmapAndCardinalityWithRetractAggFunction(LogicalType valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BIGINT();
+        }
+
+        @Override
+        public Long getValue(BitmapAndWithRetractAccumulator acc) {
+            if (acc.bitmapCount <= 0) {
+                return null;
+            }
+
+            long cardinality = 0L;
+            try {
+                for (Map.Entry<Integer, Integer> entry : acc.valueCount.entries()) {
+                    if (entry.getValue() == acc.bitmapCount) {
+                        cardinality++;
+                    }
+                }
+            } catch (Exception e) {
+                throw new FlinkRuntimeException(e);
+            }
+
+            return cardinality;
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapBuildAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapBuildAggFunction.java
new file mode 100644
index 0000000000000..3d966102cf5f7
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapBuildAggFunction.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.bitmap.Bitmap;
+import org.apache.flink.types.bitmap.RoaringBitmapData;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
+
+/** Abstract base class for BITMAP_BUILD_AGG and BITMAP_BUILD_CARDINALITY_AGG. */
+@Internal
+public abstract class AbstractBitmapBuildAggFunction<T>
+        extends BuiltInAggregateFunction<T, Bitmap> {
+
+    private final transient DataType valueDataType;
+
+    public AbstractBitmapBuildAggFunction(LogicalType valueType) {
+        this.valueDataType = toInternalDataType(valueType);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Planning
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public List<DataType> getArgumentDataTypes() {
+        return Collections.singletonList(valueDataType);
+    }
+
+    @Override
+    public DataType getAccumulatorDataType() {
+        return DataTypes.BITMAP().notNull();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Accumulator
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public Bitmap createAccumulator() {
+        return Bitmap.empty();
+    }
+
+    public void resetAccumulator(Bitmap acc) {
+        acc.clear();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Runtime
+    // --------------------------------------------------------------------------------------------
+
+    public void accumulate(Bitmap acc, @Nullable Integer value) {
+        if (value != null) {
+            acc.add(value);
+        }
+    }
+
+    public void merge(Bitmap acc, Iterable<Bitmap> its) {
+        for (Bitmap other : its) {
+            acc.or(other);
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Sub-classes
+    // --------------------------------------------------------------------------------------------
+
+    /** Built-in BITMAP_BUILD_AGG aggregate function that returns bitmap. */
+    public static final class BitmapBuildAggFunction
+            extends AbstractBitmapBuildAggFunction<Bitmap> {
+
+        public BitmapBuildAggFunction(LogicalType valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BITMAP();
+        }
+
+        @Override
+        public Bitmap getValue(Bitmap acc) {
+            return acc.isEmpty() ? null : RoaringBitmapData.from(acc);
+        }
+    }
+
+    /** Built-in BITMAP_BUILD_CARDINALITY_AGG aggregate function that returns cardinality. */
+    public static final class BitmapBuildCardinalityAggFunction
+            extends AbstractBitmapBuildAggFunction<Long> {
+
+        public BitmapBuildCardinalityAggFunction(LogicalType valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BIGINT();
+        }
+
+        @Override
+        public Long getValue(Bitmap acc) {
+            return acc.isEmpty() ? null : acc.getLongCardinality();
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapBuildWithRetractAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapBuildWithRetractAggFunction.java
new file mode 100644
index 0000000000000..fb9c040e26406
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapBuildWithRetractAggFunction.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.dataview.MapView;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.bitmap.Bitmap;
+import org.apache.flink.types.bitmap.RoaringBitmapData;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
+
+/** Abstract base class for BITMAP_BUILD_AGG and BITMAP_BUILD_CARDINALITY_AGG with retraction. */
+@Internal
+public abstract class AbstractBitmapBuildWithRetractAggFunction<T>
+        extends BuiltInAggregateFunction<
+                T, AbstractBitmapBuildWithRetractAggFunction.BitmapBuildWithRetractAccumulator> {
+
+    private final transient DataType valueDataType;
+
+    public AbstractBitmapBuildWithRetractAggFunction(LogicalType valueType) {
+        this.valueDataType = toInternalDataType(valueType);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Planning
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public List<DataType> getArgumentDataTypes() {
+        return Collections.singletonList(valueDataType);
+    }
+
+    @Override
+    public DataType getAccumulatorDataType() {
+        return DataTypes.STRUCTURED(
+                BitmapBuildWithRetractAccumulator.class,
+                DataTypes.FIELD("bitmap", DataTypes.BITMAP().notNull()),
+                DataTypes.FIELD(
+                        "valueCount",
+                        MapView.newMapViewDataType(
+                                DataTypes.INT().notNull(), DataTypes.INT().notNull())));
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Accumulator
+    // --------------------------------------------------------------------------------------------
+
+    /** Accumulator for BITMAP_BUILD_AGG and BITMAP_BUILD_CARDINALITY_AGG with retraction. */
+    public static class BitmapBuildWithRetractAccumulator {
+
+        // bitmap should reflect the actual data based on valueCount
+        public RoaringBitmapData bitmap = RoaringBitmapData.empty();
+        public MapView<Integer, Integer> valueCount = new MapView<>();
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            BitmapBuildWithRetractAccumulator that = (BitmapBuildWithRetractAccumulator) obj;
+            return Objects.equals(bitmap, that.bitmap)
+                    && Objects.equals(valueCount, that.valueCount);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(bitmap, valueCount);
+        }
+    }
+
+    @Override
+    public BitmapBuildWithRetractAccumulator createAccumulator() {
+        return new BitmapBuildWithRetractAccumulator();
+    }
+
+    public void resetAccumulator(BitmapBuildWithRetractAccumulator acc) {
+        acc.bitmap.clear();
+        acc.valueCount.clear();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Runtime
+    // --------------------------------------------------------------------------------------------
+
+    public void accumulate(BitmapBuildWithRetractAccumulator acc, @Nullable Integer value)
+            throws Exception {
+        if (value != null) {
+            Integer count = acc.valueCount.get(value);
+            count = count == null ? 1 : count + 1;
+
+            if (count == 0) {
+                acc.valueCount.remove(value);
+            } else {
+                acc.valueCount.put(value, count);
+                // add value to bitmap if count changes from 0 to 1 or expires
+                if (count == 1) {
+                    acc.bitmap.add(value);
+                }
+            }
+        }
+    }
+
+    public void retract(BitmapBuildWithRetractAccumulator acc, @Nullable Integer value)
+            throws Exception {
+        if (value != null) {
+            Integer count = acc.valueCount.get(value);
+            count = count == null ? -1 : count - 1;
+
+            if (count == 0) {
+                acc.valueCount.remove(value);
+                // remove value from bitmap if count changes from 1 to 0
+                acc.bitmap.remove(value);
+            } else {
+                acc.valueCount.put(value, count);
+                if (count == -1) {
+                    // remove value from bitmap if count expires
+                    acc.bitmap.remove(value);
+                }
+            }
+        }
+    }
+
+    public void merge(
+            BitmapBuildWithRetractAccumulator acc, Iterable<BitmapBuildWithRetractAccumulator> its)
+            throws Exception {
+        for (BitmapBuildWithRetractAccumulator other : its) {
+            for (Map.Entry<Integer, Integer> entry : other.valueCount.entries()) {
+                Integer value = entry.getKey();
+                // count != 0
+                Integer count = entry.getValue();
+
+                Integer curCount = acc.valueCount.get(value);
+                curCount = curCount == null ? count : curCount + count;
+
+                if (curCount == 0) {
+                    acc.valueCount.remove(value);
+
+                    if (curCount > count) {
+                        // preCount > 0, value is in bitmap
+                        acc.bitmap.remove(value);
+                    }
+                } else {
+                    acc.valueCount.put(value, curCount);
+
+                    if (0 < curCount && curCount <= count) {
+                        // preCount < 0, value is not in bitmap
+                        // preCount = 0, unknown (expiration)
+                        acc.bitmap.add(value);
+                    }
+
+                    if (0 > curCount && curCount >= count) {
+                        // preCount > 0, value is in bitmap
+                        // preCount = 0, unknown (expiration)
+                        acc.bitmap.remove(value);
+                    }
+                }
+            }
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Sub-classes
+    // --------------------------------------------------------------------------------------------
+
+    /** Built-in BITMAP_BUILD_AGG with retraction aggregate function that returns bitmap. */
+    public static final class BitmapBuildWithRetractAggFunction
+            extends AbstractBitmapBuildWithRetractAggFunction<Bitmap> {
+
+        public BitmapBuildWithRetractAggFunction(LogicalType valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BITMAP();
+        }
+
+        @Override
+        public Bitmap getValue(BitmapBuildWithRetractAccumulator acc) {
+            return acc.bitmap.isEmpty() ? null : RoaringBitmapData.from(acc.bitmap);
+        }
+    }
+
+    /**
+     * Built-in BITMAP_BUILD_CARDINALITY_AGG with retraction aggregate function that returns
+     * cardinality.
+     */
+    public static final class BitmapBuildCardinalityWithRetractAggFunction
+            extends AbstractBitmapBuildWithRetractAggFunction<Long> {
+
+        public BitmapBuildCardinalityWithRetractAggFunction(LogicalType valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BIGINT();
+        }
+
+        @Override
+        public Long getValue(BitmapBuildWithRetractAccumulator acc) {
+            return acc.bitmap.isEmpty() ? null : acc.bitmap.getLongCardinality();
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapOrAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapOrAggFunction.java
new file mode 100644
index 0000000000000..65c26029ea75d
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapOrAggFunction.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.bitmap.Bitmap;
+import org.apache.flink.types.bitmap.RoaringBitmapData;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
+
+/** Abstract base class for BITMAP_OR_AGG and BITMAP_OR_CARDINALITY_AGG. */
+@Internal
+public abstract class AbstractBitmapOrAggFunction<T>
+        extends BuiltInAggregateFunction<T, AbstractBitmapOrAggFunction.BitmapOrAccumulator> {
+
+    private final transient DataType valueDataType;
+
+    public AbstractBitmapOrAggFunction(LogicalType valueType) {
+        this.valueDataType = toInternalDataType(valueType);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Planning
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public List<DataType> getArgumentDataTypes() {
+        return Collections.singletonList(valueDataType);
+    }
+
+    @Override
+    public DataType getAccumulatorDataType() {
+        return DataTypes.STRUCTURED(
+                BitmapOrAccumulator.class, DataTypes.FIELD("bitmap", DataTypes.BITMAP()));
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Accumulator
+    // --------------------------------------------------------------------------------------------
+
+    /** Accumulator for BITMAP_OR_AGG and BITMAP_OR_CARDINALITY_AGG. */
+    public static class BitmapOrAccumulator {
+
+        public @Nullable RoaringBitmapData bitmap;
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            BitmapOrAccumulator that = (BitmapOrAccumulator) obj;
+            return Objects.equals(bitmap, that.bitmap);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(bitmap);
+        }
+    }
+
+    @Override
+    public BitmapOrAccumulator createAccumulator() {
+        return new BitmapOrAccumulator();
+    }
+
+    public void resetAccumulator(BitmapOrAccumulator acc) {
+        acc.bitmap = null;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Runtime
+    // --------------------------------------------------------------------------------------------
+
+    public void accumulate(BitmapOrAccumulator acc, @Nullable Bitmap bitmap) {
+        if (bitmap == null) {
+            return;
+        }
+
+        if (acc.bitmap != null) {
+            acc.bitmap.or(bitmap);
+        } else {
+            acc.bitmap = RoaringBitmapData.from(bitmap);
+        }
+    }
+
+    public void merge(BitmapOrAccumulator acc, Iterable<BitmapOrAccumulator> its) {
+        for (BitmapOrAccumulator other : its) {
+            if (other.bitmap != null) {
+                if (acc.bitmap != null) {
+                    acc.bitmap.or(other.bitmap);
+                } else {
+                    acc.bitmap = RoaringBitmapData.from(other.bitmap);
+                }
+            }
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Sub-classes
+    // --------------------------------------------------------------------------------------------
+
+    /** Built-in BITMAP_OR_AGG aggregate function that returns bitmap. */
+    public static final class BitmapOrAggFunction extends AbstractBitmapOrAggFunction<Bitmap> {
+
+        public BitmapOrAggFunction(LogicalType valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BITMAP();
+        }
+
+        @Override
+        public Bitmap getValue(BitmapOrAccumulator acc) {
+            return Bitmap.from(acc.bitmap);
+        }
+    }
+
+    /** Built-in BITMAP_OR_CARDINALITY_AGG aggregate function that returns cardinality. */
+    public static final class BitmapOrCardinalityAggFunction
+            extends AbstractBitmapOrAggFunction<Long> {
+
+        public BitmapOrCardinalityAggFunction(LogicalType valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BIGINT();
+        }
+
+        @Override
+        public Long getValue(BitmapOrAccumulator acc) {
+            return acc.bitmap == null ? null : acc.bitmap.getLongCardinality();
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapOrWithRetractAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapOrWithRetractAggFunction.java
new file mode 100644
index 0000000000000..cc205da44da92
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapOrWithRetractAggFunction.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.dataview.MapView;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.bitmap.Bitmap;
+import org.apache.flink.types.bitmap.RoaringBitmapData;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
+
+/** Abstract base class for BITMAP_OR_AGG and BITMAP_OR_CARDINALITY_AGG with retraction. */
+@Internal
+public abstract class AbstractBitmapOrWithRetractAggFunction<T>
+        extends BuiltInAggregateFunction<
+                T, AbstractBitmapOrWithRetractAggFunction.BitmapOrWithRetractAccumulator> {
+
+    private final transient DataType valueDataType;
+
+    public AbstractBitmapOrWithRetractAggFunction(LogicalType valueType) {
+        this.valueDataType = toInternalDataType(valueType);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Planning
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public List<DataType> getArgumentDataTypes() {
+        return Collections.singletonList(valueDataType);
+    }
+
+    @Override
+    public DataType getAccumulatorDataType() {
+        return DataTypes.STRUCTURED(
+                BitmapOrWithRetractAccumulator.class,
+                DataTypes.FIELD("bitmapCount", DataTypes.INT().notNull()),
+                DataTypes.FIELD("bitmap", DataTypes.BITMAP().notNull()),
+                DataTypes.FIELD(
+                        "valueCount",
+                        MapView.newMapViewDataType(
+                                DataTypes.INT().notNull(), DataTypes.INT().notNull())));
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Accumulator
+    // --------------------------------------------------------------------------------------------
+
+    /** Accumulator for BITMAP_OR_AGG and BITMAP_OR_CARDINALITY_AGG with retraction. */
+    public static class BitmapOrWithRetractAccumulator {
+
+        public int bitmapCount = 0;
+        // bitmap should reflect the actual data based on valueCount
+        public RoaringBitmapData bitmap = RoaringBitmapData.empty();
+        public MapView<Integer, Integer> valueCount = new MapView<>();
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            BitmapOrWithRetractAccumulator that = (BitmapOrWithRetractAccumulator) obj;
+            return bitmapCount == that.bitmapCount
+                    && Objects.equals(bitmap, that.bitmap)
+                    && Objects.equals(valueCount, that.valueCount);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(bitmapCount, bitmap, valueCount);
+        }
+    }
+
+    @Override
+    public BitmapOrWithRetractAccumulator createAccumulator() {
+        return new BitmapOrWithRetractAccumulator();
+    }
+
+    public void resetAccumulator(BitmapOrWithRetractAccumulator acc) {
+        acc.bitmapCount = 0;
+        acc.bitmap.clear();
+        acc.valueCount.clear();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Runtime
+    // --------------------------------------------------------------------------------------------
+
+    public void accumulate(BitmapOrWithRetractAccumulator acc, @Nullable Bitmap bitmap)
+            throws Exception {
+        if (bitmap == null) {
+            return;
+        }
+
+        acc.bitmapCount++;
+
+        RoaringBitmapData rbm32 = (RoaringBitmapData) bitmap;
+        rbm32.forEach(
+                value -> {
+                    try {
+                        Integer count = acc.valueCount.get(value);
+                        count = count == null ? 1 : count + 1;
+
+                        if (count == 0) {
+                            acc.valueCount.remove(value);
+                        } else {
+                            acc.valueCount.put(value, count);
+                            // add value to bitmap if count changes from 0 to 1 or expires
+                            if (count == 1) {
+                                acc.bitmap.add(value);
+                            }
+                        }
+                    } catch (Exception e) {
+                        throw new FlinkRuntimeException(e);
+                    }
+                });
+    }
+
+    public void retract(BitmapOrWithRetractAccumulator acc, @Nullable Bitmap bitmap)
+            throws Exception {
+        if (bitmap == null) {
+            return;
+        }
+
+        acc.bitmapCount--;
+
+        RoaringBitmapData rbm32 = (RoaringBitmapData) bitmap;
+        rbm32.forEach(
+                value -> {
+                    try {
+                        Integer count = acc.valueCount.get(value);
+                        count = count == null ? -1 : count - 1;
+
+                        if (count == 0) {
+                            acc.valueCount.remove(value);
+                            // remove value from bitmap if count changes from 1 to 0
+                            acc.bitmap.remove(value);
+                        } else {
+                            acc.valueCount.put(value, count);
+                            if (count == -1) {
+                                // remove value from bitmap if count expires
+                                acc.bitmap.remove(value);
+                            }
+                        }
+                    } catch (Exception e) {
+                        throw new FlinkRuntimeException(e);
+                    }
+                });
+    }
+
+    public void merge(
+            BitmapOrWithRetractAccumulator acc, Iterable<BitmapOrWithRetractAccumulator> its)
+            throws Exception {
+        for (BitmapOrWithRetractAccumulator other : its) {
+            acc.bitmapCount += other.bitmapCount;
+            for (Map.Entry<Integer, Integer> entry : other.valueCount.entries()) {
+                Integer value = entry.getKey();
+                // count != 0
+                Integer count = entry.getValue();
+
+                Integer curCount = acc.valueCount.get(value);
+                curCount = curCount == null ? count : curCount + count;
+
+                if (curCount == 0) {
+                    acc.valueCount.remove(value);
+
+                    if (curCount > count) {
+                        // preCount > 0, value is in bitmap
+                        acc.bitmap.remove(value);
+                    }
+                } else {
+                    acc.valueCount.put(value, curCount);
+
+                    if (0 < curCount && curCount <= count) {
+                        // preCount < 0, value is not in bitmap
+                        // preCount = 0, unknown (expiration)
+                        acc.bitmap.add(value);
+                    }
+
+                    if (0 > curCount && curCount >= count) {
+                        // preCount > 0, value is in bitmap
+                        // preCount = 0, unknown (expiration)
+                        acc.bitmap.remove(value);
+                    }
+                }
+            }
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Sub-classes
+    // --------------------------------------------------------------------------------------------
+
+    /** Built-in BITMAP_OR_AGG with retraction aggregate function that returns bitmap. */
+    public static final class BitmapOrWithRetractAggFunction
+            extends AbstractBitmapOrWithRetractAggFunction<Bitmap> {
+
+        public BitmapOrWithRetractAggFunction(LogicalType valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BITMAP();
+        }
+
+        @Override
+        public Bitmap getValue(BitmapOrWithRetractAccumulator acc) {
+            return acc.bitmapCount <= 0 ? null : RoaringBitmapData.from(acc.bitmap);
+        }
+    }
+
+    /**
+     * Built-in BITMAP_OR_CARDINALITY_AGG with retraction aggregate function that returns
+     * cardinality.
+     */
+    public static final class BitmapOrCardinalityWithRetractAggFunction
+            extends AbstractBitmapOrWithRetractAggFunction<Long> {
+
+        public BitmapOrCardinalityWithRetractAggFunction(LogicalType valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BIGINT();
+        }
+
+        @Override
+        public Long getValue(BitmapOrWithRetractAccumulator acc) {
+            return acc.bitmapCount <= 0 ? null : acc.bitmap.getLongCardinality();
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapXorAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapXorAggFunction.java
new file mode 100644
index 0000000000000..3890d2571921c
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapXorAggFunction.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.bitmap.Bitmap;
+import org.apache.flink.types.bitmap.RoaringBitmapData;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
+
+/**
+ * Abstract base class for BITMAP_XOR_AGG and BITMAP_XOR_CARDINALITY_AGG.
+ *
+ * <p>The accumulator holds the running XOR (symmetric difference) of all non-null inputs. This
+ * variant defines no {@code retract} method.
+ *
+ * @param <T> result type returned by {@code getValue} ({@link Bitmap} or {@link Long} in the
+ *     sub-classes below)
+ */
+@Internal
+public abstract class AbstractBitmapXorAggFunction<T>
+        extends BuiltInAggregateFunction<T, AbstractBitmapXorAggFunction.BitmapXorAccumulator> {
+
+    /** Internal data type of the single argument; read only by the planning methods. */
+    private final transient DataType valueDataType;
+
+    /**
+     * @param valueType logical type of the aggregated argument, converted with {@code
+     *     toInternalDataType}
+     */
+    public AbstractBitmapXorAggFunction(LogicalType valueType) {
+        this.valueDataType = toInternalDataType(valueType);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Planning
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public List<DataType> getArgumentDataTypes() {
+        return Collections.singletonList(valueDataType);
+    }
+
+    @Override
+    public DataType getAccumulatorDataType() {
+        return DataTypes.STRUCTURED(
+                BitmapXorAccumulator.class, DataTypes.FIELD("bitmap", DataTypes.BITMAP()));
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Accumulator
+    // --------------------------------------------------------------------------------------------
+
+    /** Accumulator for BITMAP_XOR_AGG and BITMAP_XOR_CARDINALITY_AGG. */
+    public static class BitmapXorAccumulator {
+
+        /** Running XOR of the inputs; {@code null} until the first non-null input arrives. */
+        public @Nullable RoaringBitmapData bitmap;
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            BitmapXorAccumulator that = (BitmapXorAccumulator) obj;
+            return Objects.equals(bitmap, that.bitmap);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(bitmap);
+        }
+    }
+
+    /** Creates an accumulator whose bitmap is {@code null} (no input seen yet). */
+    @Override
+    public BitmapXorAccumulator createAccumulator() {
+        return new BitmapXorAccumulator();
+    }
+
+    /** Resets the accumulator to the "no input seen" state by dropping its bitmap. */
+    public void resetAccumulator(BitmapXorAccumulator acc) {
+        acc.bitmap = null;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Runtime
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Folds one input bitmap into the accumulator. {@code null} inputs are ignored.
+     *
+     * <p>The first non-null input initializes the accumulator via {@code
+     * RoaringBitmapData.from}; later inputs are XOR-ed into it in place.
+     *
+     * @param acc accumulator to update
+     * @param bitmap input bitmap; may be {@code null}
+     */
+    public void accumulate(BitmapXorAccumulator acc, @Nullable Bitmap bitmap) {
+        if (bitmap == null) {
+            return;
+        }
+
+        if (acc.bitmap != null) {
+            acc.bitmap.xor(bitmap);
+        } else {
+            acc.bitmap = RoaringBitmapData.from(bitmap);
+        }
+    }
+
+    /**
+     * Merges other accumulators into {@code acc} by XOR-ing their bitmaps. Accumulators that
+     * have seen no input (null bitmap) are skipped.
+     *
+     * @param acc accumulator that receives the merged state
+     * @param its accumulators to merge into {@code acc}
+     */
+    public void merge(BitmapXorAccumulator acc, Iterable<BitmapXorAccumulator> its) {
+        for (BitmapXorAccumulator other : its) {
+            if (other.bitmap != null) {
+                if (acc.bitmap != null) {
+                    acc.bitmap.xor(other.bitmap);
+                } else {
+                    acc.bitmap = RoaringBitmapData.from(other.bitmap);
+                }
+            }
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Sub-classes
+    // --------------------------------------------------------------------------------------------
+
+    /** Built-in BITMAP_XOR_AGG aggregate function that returns bitmap. */
+    public static final class BitmapXorAggFunction extends AbstractBitmapXorAggFunction<Bitmap> {
+
+        public BitmapXorAggFunction(LogicalType valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BITMAP();
+        }
+
+        /** Returns the accumulated XOR as a {@link Bitmap}. */
+        @Override
+        public Bitmap getValue(BitmapXorAccumulator acc) {
+            // NOTE(review): acc.bitmap is null when no non-null input was seen; presumably
+            // Bitmap.from maps null to null (the cardinality variant null-checks) - confirm.
+            return Bitmap.from(acc.bitmap);
+        }
+    }
+
+    /** Built-in BITMAP_XOR_CARDINALITY_AGG aggregate function that returns cardinality. */
+    public static final class BitmapXorCardinalityAggFunction
+            extends AbstractBitmapXorAggFunction<Long> {
+
+        public BitmapXorCardinalityAggFunction(LogicalType valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BIGINT();
+        }
+
+        /**
+         * Returns the cardinality of the accumulated XOR, or {@code null} when no non-null
+         * input was seen.
+         */
+        @Override
+        public Long getValue(BitmapXorAccumulator acc) {
+            return acc.bitmap == null ? null : acc.bitmap.getLongCardinality();
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapXorWithRetractAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapXorWithRetractAggFunction.java
new file mode 100644
index 0000000000000..fed8174dc34c9
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/AbstractBitmapXorWithRetractAggFunction.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.aggregate; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.types.bitmap.Bitmap; +import org.apache.flink.types.bitmap.RoaringBitmapData; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType; + +/** Abstract base class for BITMAP_XOR_AGG and BITMAP_XOR_CARDINALITY_AGG with retraction. 
*/
+@Internal
+public abstract class AbstractBitmapXorWithRetractAggFunction<T>
+        extends BuiltInAggregateFunction<
+                T, AbstractBitmapXorWithRetractAggFunction.BitmapXorWithRetractAccumulator> {
+
+    /** Internal data type of the single argument; read only by the planning methods. */
+    private final transient DataType valueDataType;
+
+    /**
+     * @param valueType logical type of the aggregated argument, converted with {@code
+     *     toInternalDataType}
+     */
+    public AbstractBitmapXorWithRetractAggFunction(LogicalType valueType) {
+        this.valueDataType = toInternalDataType(valueType);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Planning
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public List<DataType> getArgumentDataTypes() {
+        return Collections.singletonList(valueDataType);
+    }
+
+    @Override
+    public DataType getAccumulatorDataType() {
+        return DataTypes.STRUCTURED(
+                BitmapXorWithRetractAccumulator.class,
+                DataTypes.FIELD("bitmapCount", DataTypes.INT().notNull()),
+                DataTypes.FIELD("bitmap", DataTypes.BITMAP().notNull()));
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Accumulator
+    // --------------------------------------------------------------------------------------------
+
+    /** Accumulator for BITMAP_XOR_AGG and BITMAP_XOR_CARDINALITY_AGG with retraction. */
+    public static class BitmapXorWithRetractAccumulator {
+
+        /** Non-null bitmaps accumulated minus non-null bitmaps retracted. */
+        public int bitmapCount = 0;
+
+        /** Running XOR of every accumulated and retracted bitmap; never {@code null}. */
+        public RoaringBitmapData bitmap = RoaringBitmapData.empty();
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            BitmapXorWithRetractAccumulator that = (BitmapXorWithRetractAccumulator) obj;
+            return bitmapCount == that.bitmapCount && Objects.equals(bitmap, that.bitmap);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(bitmapCount, bitmap);
+        }
+    }
+
+    /** Creates an accumulator with a zero count and an empty bitmap. */
+    @Override
+    public BitmapXorWithRetractAccumulator createAccumulator() {
+        return new BitmapXorWithRetractAccumulator();
+    }
+
+    /** Resets the given accumulator in place to the state of a freshly created one. */
+    public void resetAccumulator(BitmapXorWithRetractAccumulator acc) {
+        acc.bitmapCount = 0;
+        acc.bitmap.clear();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Runtime
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * XORs one input bitmap into the accumulator and increments the bitmap count. {@code null}
+     * inputs are ignored.
+     *
+     * @param acc accumulator to update in place
+     * @param bitmap input bitmap; may be {@code null}
+     */
+    public void accumulate(BitmapXorWithRetractAccumulator acc, @Nullable Bitmap bitmap) {
+        if (bitmap == null) {
+            return;
+        }
+
+        acc.bitmapCount++;
+        acc.bitmap.xor(bitmap);
+    }
+
+    /**
+     * Retracts one previously accumulated bitmap and decrements the bitmap count. {@code null}
+     * inputs are ignored.
+     *
+     * @param acc accumulator to update in place
+     * @param bitmap bitmap to retract; may be {@code null}
+     */
+    public void retract(BitmapXorWithRetractAccumulator acc, @Nullable Bitmap bitmap) {
+        if (bitmap == null) {
+            return;
+        }
+
+        acc.bitmapCount--;
+        // XOR is its own inverse: applying the same bitmap again cancels its contribution
+        acc.bitmap.xor(bitmap);
+    }
+
+    /**
+     * Merges other accumulators into {@code acc}: bitmap counts are summed and bitmaps are
+     * XOR-ed.
+     *
+     * @param acc accumulator that receives the merged state
+     * @param its accumulators to merge into {@code acc}
+     */
+    public void merge(
+            BitmapXorWithRetractAccumulator acc, Iterable<BitmapXorWithRetractAccumulator> its) {
+        for (BitmapXorWithRetractAccumulator other : its) {
+            acc.bitmapCount += other.bitmapCount;
+            acc.bitmap.xor(other.bitmap);
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Sub-classes
+    // --------------------------------------------------------------------------------------------
+
+    /** Built-in BITMAP_XOR_AGG with retraction aggregate function that returns bitmap. */
+    public static final class BitmapXorWithRetractAggFunction
+            extends AbstractBitmapXorWithRetractAggFunction<Bitmap> {
+
+        public BitmapXorWithRetractAggFunction(LogicalType valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BITMAP();
+        }
+
+        /**
+         * Returns a bitmap built from the accumulated XOR via {@code RoaringBitmapData.from},
+         * or {@code null} when the net number of accumulated bitmaps is not positive.
+         */
+        @Override
+        public Bitmap getValue(BitmapXorWithRetractAccumulator acc) {
+            return acc.bitmapCount <= 0 ? null : RoaringBitmapData.from(acc.bitmap);
+        }
+    }
+
+    /**
+     * Built-in BITMAP_XOR_CARDINALITY_AGG with retraction aggregate function that returns
+     * cardinality.
+     */
+    public static final class BitmapXorCardinalityWithRetractAggFunction
+            extends AbstractBitmapXorWithRetractAggFunction<Long> {
+
+        public BitmapXorCardinalityWithRetractAggFunction(LogicalType valueType) {
+            super(valueType);
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return DataTypes.BIGINT();
+        }
+
+        /**
+         * Returns the cardinality of the accumulated XOR, or {@code null} when the net number
+         * of accumulated bitmaps is not positive.
+         */
+        @Override
+        public Long getValue(BitmapXorWithRetractAccumulator acc) {
+            return acc.bitmapCount <= 0 ? null : acc.bitmap.getLongCardinality();
+        }
+    }
+}