From 139827c7a21a4042b584bb4db383d3e7cc9d89be Mon Sep 17 00:00:00 2001
From: gregsacar
Date: Tue, 17 Mar 2026 17:52:25 -0700
Subject: [PATCH] [SPARK] Fix streaming writer overwriting batch data by
 scoping OVERWRITE_BY_FILTER to batch only

OVERWRITE_BY_FILTER was previously declared unconditionally, alongside
the read capabilities, so a streaming write could be planned as a
filter-based overwrite and silently drop rows produced by earlier batch
jobs. Move the capability into the two batch-write branches (v2
BATCH_WRITE and v1 V1_BATCH_WRITE) so it is only advertised together
with batch writes, and add a regression test that runs a batch INSERT
followed by a streaming write and verifies the batch rows survive.

---
 .../paimon/spark/PaimonSparkTableBase.scala   |  3 +-
 .../apache/paimon/spark/PaimonSinkTest.scala  | 50 +++++++++++++++++++
 2 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
index 1732d6778de4..0fc4bd9eb5f7 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
@@ -92,17 +92,18 @@ abstract class PaimonSparkTableBase(val table: Table)
   override def capabilities: JSet[TableCapability] = {
     val capabilities = JEnumSet.of(
       TableCapability.BATCH_READ,
-      TableCapability.OVERWRITE_BY_FILTER,
       TableCapability.MICRO_BATCH_READ
     )
 
     if (useV2Write) {
       capabilities.add(TableCapability.ACCEPT_ANY_SCHEMA)
       capabilities.add(TableCapability.BATCH_WRITE)
+      capabilities.add(TableCapability.OVERWRITE_BY_FILTER)
       capabilities.add(TableCapability.OVERWRITE_DYNAMIC)
     } else {
       capabilities.add(TableCapability.ACCEPT_ANY_SCHEMA)
       capabilities.add(TableCapability.V1_BATCH_WRITE)
+      capabilities.add(TableCapability.OVERWRITE_BY_FILTER)
     }
 
     capabilities
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
index c43170d7ba1b..4d5ea2d9a434 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
@@ -362,4 +362,54 @@ class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {
       }
     }
   }
+
+  test("Paimon Sink: batch then stream should not overwrite batch data") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // Create a primary-key table with a fixed bucket count
+          spark.sql(s"""
+                       |CREATE TABLE T (a INT, b STRING)
+                       |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().toString
+
+          // Phase 1: batch insert
+          spark.sql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+          checkAnswer(
+            spark.sql("SELECT * FROM T ORDER BY a"),
+            Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil)
+
+          // Phase 2: a streaming write should append, not overwrite
+          val inputData = MemoryStream[(Int, String)]
+          val stream = inputData
+            .toDS()
+            .toDF("a", "b")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .foreachBatch {
+              (batch: Dataset[Row], id: Long) =>
+                batch.write.format("paimon").mode("append").save(location)
+            }
+            .start()
+
+          try {
+            inputData.addData((4, "d"))
+            stream.processAllAvailable()
+            // Batch rows must still be present alongside the streamed row
+            checkAnswer(
+              spark.sql("SELECT * FROM T ORDER BY a"),
+              Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Row(4, "d") :: Nil)
+
+            inputData.addData((5, "e"))
+            stream.processAllAvailable()
+            checkAnswer(
+              spark.sql("SELECT * FROM T ORDER BY a"),
+              Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Row(4, "d") :: Row(5, "e") :: Nil)
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
 }