Skip to content

branch-4.1: [fix](streamingjob) fix postgres DML silently dropped on task restart #61481#61564

Open
github-actions[bot] wants to merge 1 commit intobranch-4.1from
auto-pick-61481-branch-4.1
Open

branch-4.1: [fix](streamingjob) fix postgres DML silently dropped on task restart #61481#61564
github-actions[bot] wants to merge 1 commit intobranch-4.1from
auto-pick-61481-branch-4.1

Conversation

@github-actions
Copy link
Contributor

Cherry-picked from #61481

…#61481)

### What problem does this PR solve?

#### Problem
When a streaming job restarts a task, the first DML of the new
transaction
is occasionally silently dropped (10-20% failure rate). The affected
record
never appears in the Doris target table, with no error logged — only
"identified as already processed" in cdc-client.log.
#### Root Cause
debezium 1.9.x hardcodes `proto_version=1` (non-streaming pgoutput) for
all
PG versions. In non-streaming mode, the walsender batches all changes of
a
transaction and sends them after COMMIT, and all messages (BEGIN + DML)
share
the same `XLogData.data_start` = the transaction's `begin_lsn`.
When this `begin_lsn` equals the previous transaction's `commit_lsn`
(i.e.
the two transactions are adjacent in WAL with no other writes between
them),
`WalPositionLocator` behaves incorrectly:
1. **Find phase**: `COMMIT(T1)` at `lsn=Y` sets
`storeLsnAfterLastEventStoredLsn=true`.
`BEGIN(T2)` and `INSERT(T2)` both have `lsn=Y`, so they keep returning
`Optional.empty()`. Only `COMMIT(T2)` at `lsn=Z` sets
`startStreamingLsn=Z`, with `lsnSeen={Y, Z}`.
2. **Actual streaming**: `INSERT(T2)` arrives with `lastReceiveLsn=Y`.
`skipMessage(Y)`: `Y ∈ lsnSeen` and `Y ≠ startStreamingLsn(Z)` →
filtered.
The bug is intermittent because it only triggers when no other WAL
activity
(autovacuum, other connections) occurs between the two transactions.
#### Fix
Override `extractBinlogStateOffset()` in `PostgresSourceReader` to strip
`lsn_proc` and `lsn_commit` from the offset before it is passed to
debezium.
This constructs `WalPositionLocator(lastCommitStoredLsn=null, lsn=Y)`,
which
causes the find phase to exit immediately at the first received message
(`startStreamingLsn=Y`). In actual streaming, `COMMIT(T1)` triggers
switch-off (`lastReceiveLsn=Y = startStreamingLsn`), and all subsequent
messages including `INSERT(T2)` pass through.
See https://issues.apache.org/jira/browse/FLINK-39265.
#### Test
Run `test_streaming_postgres_job` multiple times. Before this fix the
'Apache' assertion fails ~10-20% of the time; after this fix it passes
  consistently.
@github-actions github-actions bot requested a review from yiguolei as a code owner March 20, 2026 08:54
@hello-stephen
Copy link
Contributor

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

@dataroaring dataroaring reopened this Mar 20, 2026
@hello-stephen
Copy link
Contributor

run buildall

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants