From 8e0bed80be3b7385c7cc85477862f541cbcd82c5 Mon Sep 17 00:00:00 2001
From: wudi
Date: Fri, 20 Mar 2026 16:52:41 +0800
Subject: [PATCH] [fix](streamingjob) fix postgres DML silently dropped on task restart (#61481)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

### What problem does this PR solve?

#### Problem

When a streaming job restarts a task, the first DML of the new transaction is occasionally silently dropped (10-20% failure rate). The affected record never appears in the Doris target table, with no error logged, only "identified as already processed" in cdc-client.log.

#### Root Cause

debezium 1.9.x hardcodes `proto_version=1` (non-streaming pgoutput) for all PG versions. In non-streaming mode, the walsender batches all changes of a transaction and sends them after COMMIT, and all messages (BEGIN + DML) share the same `XLogData.data_start` = the transaction's `begin_lsn`.

When this `begin_lsn` equals the previous transaction's `commit_lsn` (i.e. the two transactions are adjacent in WAL with no other writes between them), `WalPositionLocator` behaves incorrectly:

1. **Find phase**: `COMMIT(T1)` at `lsn=Y` sets `storeLsnAfterLastEventStoredLsn=true`. `BEGIN(T2)` and `INSERT(T2)` both have `lsn=Y`, so they keep returning `Optional.empty()`. Only `COMMIT(T2)` at `lsn=Z` sets `startStreamingLsn=Z`, with `lsnSeen={Y, Z}`.
2. **Actual streaming**: `INSERT(T2)` arrives with `lastReceiveLsn=Y`. `skipMessage(Y)`: `Y ∈ lsnSeen` and `Y ≠ startStreamingLsn(Z)` → filtered.

The bug is intermittent because it only triggers when no other WAL activity (autovacuum, other connections) occurs between the two transactions.

#### Fix

Override `extractBinlogStateOffset()` in `PostgresSourceReader` to strip `lsn_proc` and `lsn_commit` from the offset before it is passed to debezium.
This constructs `WalPositionLocator(lastCommitStoredLsn=null, lsn=Y)`, which causes the find phase to exit immediately at the first received message (`startStreamingLsn=Y`). In actual streaming, `COMMIT(T1)` triggers switch-off (`lastReceiveLsn=Y = startStreamingLsn`), and all subsequent messages including `INSERT(T2)` pass through.

See https://issues.apache.org/jira/browse/FLINK-39265.

#### Test

Run `test_streaming_postgres_job` multiple times. Before this fix the 'Apache' assertion fails ~10-20% of the time; after this fix it passes consistently.
---
 .../streaming/StreamingMultiTblTask.java      |  2 +-
 .../reader/postgres/PostgresSourceReader.java | 20 +++++++++++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 45d2cf2ffbb604..32526f9c513d6c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -357,7 +357,7 @@ public String getTimeoutReason() {
                 log.warn("Failed to get task timeout reason, response: {}", response);
             }
         } catch (ExecutionException | InterruptedException ex) {
-            log.error("Send get task fail reason request failed: ", ex);
+            log.warn("Send get task fail reason request failed: ", ex);
         }
         return "";
     }
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 737e36045d9692..6a5670ad6dee62 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -64,6 +64,7 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
+import io.debezium.connector.postgresql.PostgresOffsetContext;
 import io.debezium.connector.postgresql.SourceInfo;
 import io.debezium.connector.postgresql.connection.PostgresConnection;
 import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
@@ -430,6 +431,25 @@ && getCurrentFetchTask() instanceof PostgresStreamFetchTask) {
         }
     }
 
+    /**
+     * Strip lsn_proc and lsn_commit from the binlog state offset before it is passed to debezium's
+     * WalPositionLocator. In pgoutput non-streaming mode (proto_version=1, used by debezium 1.9.x
+     * even on PG14), BEGIN and DML messages within a transaction share the same XLogData.data_start
+     * as the transaction's begin_lsn. When begin_lsn equals the previous transaction's commit_lsn
+     * (i.e. no other WAL write exists between them), WalPositionLocator adds that lsn to lsnSeen
+     * during the find phase and then incorrectly filters the DML as already-processed during actual
+     * streaming. Removing these keys sets lastCommitStoredLsn=null, so the find phase exits
+     * immediately at the first received message and switch-off happens before any DML is filtered.
+     * See https://issues.apache.org/jira/browse/FLINK-39265.
+     */
+    @Override
+    public Map extractBinlogStateOffset(Object splitState) {
+        Map offset = super.extractBinlogStateOffset(splitState);
+        offset.remove(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY);
+        offset.remove(PostgresOffsetContext.LAST_COMMIT_LSN_KEY);
+        return offset;
+    }
+
     @Override
     public void close(JobBaseConfig jobConfig) {
         super.close(jobConfig);
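
#### Illustration (not part of the patch)

The find-phase and streaming-phase behavior described under Root Cause can be replayed with a toy model. The sketch below is an assumption-laden simplification written for this note, not debezium's `WalPositionLocator`: the class `WalLocatorSketch`, its `Msg` type, the methods `find`, `skip`, and `delivered`, and the LSN values 100 (for `Y`) and 200 (for `Z`) are all hypothetical. It only encodes the rules stated in the commit message.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of the two phases described in the commit message; not debezium code. */
final class WalLocatorSketch {
    /** One replication message: a label plus the LSN reported with it. */
    static final class Msg {
        final String label;
        final long lsn;
        Msg(String label, long lsn) { this.label = label; this.lsn = lsn; }
    }

    private final Long lastCommitStoredLsn; // null models the offset with lsn_commit stripped
    private final Set<Long> lsnSeen = new HashSet<>();
    private Long startStreamingLsn;
    private boolean storeLsnAfterLastEventStoredLsn;
    private boolean passMessages;

    WalLocatorSketch(Long lastCommitStoredLsn) {
        this.lastCommitStoredLsn = lastCommitStoredLsn;
    }

    /** Find phase: returns true once the streaming start position has been chosen. */
    boolean find(Msg m) {
        lsnSeen.add(m.lsn);
        if (lastCommitStoredLsn == null
                || (storeLsnAfterLastEventStoredLsn && m.lsn != lastCommitStoredLsn)) {
            startStreamingLsn = m.lsn;
            return true;
        }
        if (m.lsn == lastCommitStoredLsn) {
            storeLsnAfterLastEventStoredLsn = true;
        }
        return false;
    }

    /** Streaming phase: true means the message is filtered as already processed. */
    boolean skip(Msg m) {
        if (passMessages) {
            return false;
        }
        if (startStreamingLsn == null || m.lsn == startStreamingLsn) {
            passMessages = true; // switch-off: everything from here on passes
            return false;
        }
        return lsnSeen.contains(m.lsn);
    }

    /** Runs the find phase, then replays the same messages and collects what gets through. */
    static List<String> delivered(Long storedCommitLsn, List<Msg> wal) {
        WalLocatorSketch locator = new WalLocatorSketch(storedCommitLsn);
        for (Msg m : wal) {
            if (locator.find(m)) {
                break;
            }
        }
        List<String> out = new ArrayList<>();
        for (Msg m : wal) {
            if (!locator.skip(m)) {
                out.add(m.label);
            }
        }
        return out;
    }

    /** T2's begin_lsn equals T1's commit_lsn (Y = 100); T2 commits at Z = 200. */
    static List<Msg> adjacentTransactions() {
        long y = 100L;
        long z = 200L;
        return List.of(new Msg("COMMIT(T1)", y), new Msg("BEGIN(T2)", y),
                new Msg("INSERT(T2)", y), new Msg("COMMIT(T2)", z));
    }

    public static void main(String[] args) {
        System.out.println(delivered(100L, adjacentTransactions())); // stored commit LSN kept
        System.out.println(delivered(null, adjacentTransactions())); // stored commit LSN stripped
    }
}
```

In this model the first replay delivers only `COMMIT(T2)`, so `INSERT(T2)` is lost, while the second replay delivers all four messages. The model also shows why the bug depends on the two transactions being adjacent: if `BEGIN(T2)` carried an LSN other than `Y`, the find phase would stop there instead of at `COMMIT(T2)`.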