branch-4.1: [fix](streamingjob) fix postgres DML silently dropped on task restart #61481#61564
Open
github-actions[bot] wants to merge 1 commit intobranch-4.1from
Open
branch-4.1: [fix](streamingjob) fix postgres DML silently dropped on task restart #61481#61564github-actions[bot] wants to merge 1 commit intobranch-4.1from
github-actions[bot] wants to merge 1 commit intobranch-4.1from
Conversation
…#61481) ### What problem does this PR solve? #### Problem When a streaming job restarts a task, the first DML of the new transaction is occasionally silently dropped (10-20% failure rate). The affected record never appears in the Doris target table, with no error logged — only "identified as already processed" in cdc-client.log. #### Root Cause debezium 1.9.x hardcodes `proto_version=1` (non-streaming pgoutput) for all PG versions. In non-streaming mode, the walsender batches all changes of a transaction and sends them after COMMIT, and all messages (BEGIN + DML) share the same `XLogData.data_start` = the transaction's `begin_lsn`. When this `begin_lsn` equals the previous transaction's `commit_lsn` (i.e. the two transactions are adjacent in WAL with no other writes between them), `WalPositionLocator` behaves incorrectly: 1. **Find phase**: `COMMIT(T1)` at `lsn=Y` sets `storeLsnAfterLastEventStoredLsn=true`. `BEGIN(T2)` and `INSERT(T2)` both have `lsn=Y`, so they keep returning `Optional.empty()`. Only `COMMIT(T2)` at `lsn=Z` sets `startStreamingLsn=Z`, with `lsnSeen={Y, Z}`. 2. **Actual streaming**: `INSERT(T2)` arrives with `lastReceiveLsn=Y`. `skipMessage(Y)`: `Y ∈ lsnSeen` and `Y ≠ startStreamingLsn(Z)` → filtered. The bug is intermittent because it only triggers when no other WAL activity (autovacuum, other connections) occurs between the two transactions. #### Fix Override `extractBinlogStateOffset()` in `PostgresSourceReader` to strip `lsn_proc` and `lsn_commit` from the offset before it is passed to debezium. This constructs `WalPositionLocator(lastCommitStoredLsn=null, lsn=Y)`, which causes the find phase to exit immediately at the first received message (`startStreamingLsn=Y`). In actual streaming, `COMMIT(T1)` triggers switch-off (`lastReceiveLsn=Y = startStreamingLsn`), and all subsequent messages including `INSERT(T2)` pass through. See https://issues.apache.org/jira/browse/FLINK-39265. #### Test Run `test_streaming_postgres_job` multiple times. Before this fix the 'Apache' assertion fails ~10-20% of the time; after this fix it passes consistently.
Contributor
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
Contributor
|
run buildall |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Cherry-picked from #61481