diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java index 45d2cf2ffbb604..32526f9c513d6c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java @@ -357,7 +357,7 @@ public String getTimeoutReason() { log.warn("Failed to get task timeout reason, response: {}", response); } } catch (ExecutionException | InterruptedException ex) { - log.error("Send get task fail reason request failed: ", ex); + log.warn("Send get task fail reason request failed: ", ex); } return ""; } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java index 737e36045d9692..6a5670ad6dee62 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java @@ -64,6 +64,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; +import io.debezium.connector.postgresql.PostgresOffsetContext; import io.debezium.connector.postgresql.SourceInfo; import io.debezium.connector.postgresql.connection.PostgresConnection; import io.debezium.connector.postgresql.connection.PostgresReplicationConnection; @@ -430,6 +431,25 @@ && getCurrentFetchTask() instanceof PostgresStreamFetchTask) { } } + /** + * Strip lsn_proc and lsn_commit from the binlog state offset before it is passed to debezium's + * WalPositionLocator. In pgoutput non-streaming mode (proto_version=1, used by debezium 1.9.x + * even on PG14), BEGIN and DML messages within a transaction share the same XLogData.data_start + * as the transaction's begin_lsn. When begin_lsn equals the previous transaction's commit_lsn + * (i.e. no other WAL write exists between them), WalPositionLocator adds that lsn to lsnSeen + * during the find phase and then incorrectly filters the DML as already-processed during actual + * streaming. Removing these keys sets lastCommitStoredLsn=null, so the find phase exits + * immediately at the first received message and switch-off happens before any DML is filtered. + * See https://issues.apache.org/jira/browse/FLINK-39265. + */ + @Override + public Map extractBinlogStateOffset(Object splitState) { + Map offset = super.extractBinlogStateOffset(splitState); + offset.remove(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY); + offset.remove(PostgresOffsetContext.LAST_COMMIT_LSN_KEY); + return offset; + } + @Override public void close(JobBaseConfig jobConfig) { super.close(jobConfig);