
feat: add WebSocket auto-reconnect with head-based polling #21

Closed
calj wants to merge 4 commits into master from feat/websocket-reconnect

Conversation

@calj
Contributor

@calj calj commented Mar 6, 2026

Rework the event listener to subscribe to new block headers (SubscribeNewHead) instead of SubscribeFilterLogs, then reconcile logs per block range. This makes the listener resilient to WebSocket disconnections (e.g. Infura error 1006).

The service goroutine now loops: on first run it uses the existing client; on subsequent iterations it re-dials via RPCURL, re-binds the contract, and resumes from the persisted cursor. When RPCURL is empty (simulated backend in tests) it exits cleanly instead of attempting to dial.

Also fixes two bugs in waitForBackOffTimeout:

  • return false (not true) when max retries exceeded
  • use 1<<n (bit-shift) instead of 2^n (XOR) for exponential backoff

Summary by CodeRabbit

  • New Features

    • Switch to head-driven updates with a one-time initial sync to the current head for faster catch-up and consistent state.
  • Bug Fixes

    • Watchers now persistently reconnect and resume from the last cursor after reconnects.
    • Subscription failures cause clean restarts and clearer error handling.
    • Reconciliation advances progress even when no matching logs are found.
    • Backoff logic refined to stop after repeated failures and use more predictable retry timing.

@coderabbitai

coderabbitai bot commented Mar 6, 2026

Warning

Rate limit exceeded

@calj has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 5 minutes and 31 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: ce714180-329d-494a-bcc2-8c4f9cb0cb6e

📥 Commits

Reviewing files that changed from the base of the PR and between 48c1011 and b534854.

📒 Files selected for processing (1)
  • service/service.go
📝 Walkthrough

Walkthrough

Replaces log-channel-driven streaming with a head-based subscription in custody/listener.go, introducing header-driven reconciliation and a handler-based reconcile API; refactors service/service.go into a persistent, reconnect-capable WithdrawStarted watcher that resumes from the stored cursor.

Changes

Cohort / File(s) | Summary
  • Head-based subscription & reconciliation (custody/listener.go): Adds headerSubscriber and headCh, subscribes to new heads, performs an initial sync to the latest header, and triggers reconcileBlockRange on head events; removes the prior historical/current log streaming path.
  • reconcileBlockRange API & behavior (custody/listener.go): Changes the signature to accept a logHandler and return (finishedBlock, finishedIndex); advances the cursor even with no matches and uses math.MaxUint32 semantics for the end-of-range index.
  • Subscription/backoff logic (custody/listener.go): Updates waitForBackOffTimeout to use 1<<backOffCount and return false when backOffCount > maxBackOffCount; resets backoff on successful subscription and handles clients without SubscribeNewHead.
  • Reconnect-capable WithdrawStarted watcher (service/service.go): Converts the single-run watcher into a loop that supports RPC re-dial, swaps ethClient, re-binds the contract and listener, resumes from GetCursor("withdraw_started") (or StartBlock), and logs lifecycle events.
  • Comments & scaffolding (custody/listener.go): Adds interface scaffolding, comments, and safeguards around head subscription capability and subscription failure handling.

Sequence Diagrams

sequenceDiagram
    participant Listener
    participant Blockchain as Blockchain/RPC
    participant Handler as Log Handler
    rect rgba(200,200,255,0.5)
    Listener->>Blockchain: SubscribeNewHead()
    activate Blockchain
    Listener->>Listener: Fetch latest header (initial sync)
    Listener->>Listener: Compute targetBlock
    Listener->>Listener: reconcileBlockRange(targetBlock, handler)
    Listener->>Handler: Deliver logs via handler
    Handler-->>Listener: Acknowledge
    end
    loop On new header event
        Blockchain-->>Listener: New header notification
        Listener->>Listener: reconcileBlockRange(newHead, handler)
        Listener->>Handler: Deliver logs via handler
        Handler-->>Listener: Acknowledge
    end
    deactivate Blockchain
sequenceDiagram
    participant Service
    participant RPC as Ethereum RPC
    participant Contract as Withdraw Contract
    participant Listener as Event Listener
    rect rgba(200,255,200,0.5)
    Service->>Service: firstRun = true
    loop Watcher loop
        alt First run
            Service->>Service: GetCursor("withdraw_started") or StartBlock
        else Reconnect iteration
            Service->>RPC: Re-dial RPC endpoint
            activate RPC
            RPC-->>Service: Connection established
            deactivate RPC
            Service->>Service: Swap ethClient
            Service->>Contract: Re-bind contract
            Service->>Listener: Recreate listener
        end
        Service->>Listener: Start watcher from cursor
        Listener-->>Service: Events / processing
    end
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐰 I hopped from logs to headers bright,
Subscribed to heads at morning light,
Reconciled ranges, then leapt anew,
The watcher woke and stitched the queue,
🥕 A rabbit's cheer for resilient glue!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name | Status | Explanation | Resolution
Docstring Coverage | ⚠️ Warning | Docstring coverage is 25.00%, below the required 80.00% threshold. | Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name | Status | Explanation
Description Check | ✅ Passed | Check skipped - CodeRabbit's high-level summary is enabled.
Title check | ✅ Passed | The title accurately describes the main changes: adding WebSocket auto-reconnect capability and implementing head-based polling for event listening. It aligns with the primary objectives of making the listener resilient to disconnections and improving event reconciliation.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.



@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@custody/listener.go`:
- Around line 238-252: reconcileBlockRange currently doesn't report where it
finished, so callers reset the cursor to (targetBlock, 0) which causes replays
of already-processed logs in a head block; change reconcileBlockRange to return
the final cursor (e.g., (finishedBlock, finishedIndex) or a sentinel indicating
the block was fully consumed) and update all call sites that pass targetBlock
(notably the calls around the blocks handling head sync and catch-up — the sites
shown near the current diff and the other call sites you flagged) to set
lastBlock and lastIndex from that return value instead of unconditionally
setting lastBlock=targetBlock and lastIndex=0; ensure the return contract
distinguishes “block fully consumed” versus “stopped at specific index” so
restarts and mid-block restarts don't reprocess logs.

In `@service/service.go`:
- Around line 188-190: The reconnect logic calls ethclient.Dial which can block
indefinitely; replace that call with ethclient.DialContext using the existing
ctx from the errgroup so reconnects respect cancellation and shutdown. Update
the code where svc.Logger.Info("Reconnecting to Ethereum RPC...") and newClient,
err := ethclient.Dial(svc.Config.Blockchain.RPCURL) are used to instead call
ethclient.DialContext(ctx, svc.Config.Blockchain.RPCURL), propagate the err
handling unchanged, and ensure any variables (newClient, err) remain the same to
minimize other changes.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 2905399c-a223-4039-b4aa-95cf99f55fe1

📥 Commits

Reviewing files that changed from the base of the PR and between 2c96d24 and 745a014.

📒 Files selected for processing (2)
  • custody/listener.go
  • service/service.go

@calj calj force-pushed the feat/websocket-reconnect branch from 745a014 to b487887 on March 6, 2026 at 15:37
calj added 2 commits March 6, 2026 16:43
reconcileBlockRange previously returned nothing, so callers
unconditionally set lastBlock=targetBlock and lastIndex=0 after each
call. This caused already-processed logs to be re-delivered when the
next head arrived and re-queried the same block.

The function now returns (finishedBlock, finishedIndex):
- When logs are dispatched: the block/index of the last processed log.
- When a batch has no matching logs: (endBlock, math.MaxUint32) to
  signal the block was fully consumed and all its logs should be
  skipped on the next pass.
- On early exit (context cancelled, backoff exhausted): the cursor
  from entry, so no false progress is claimed.

Also exits the listener immediately on SubscribeNewHead failure
instead of retrying on a dead client, allowing the Service-level
reconnection loop to dial a fresh connection.
ethclient.Dial can block indefinitely on DNS or TCP handshake.
Replace with ethclient.DialContext(ctx, ...) so the reconnect
attempt is interrupted when the service context is cancelled.
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (1)
custody/listener.go (1)

241-255: ⚠️ Potential issue | 🟠 Major

Track where reconciliation actually finished before moving the cursor.

Still seeing the same cursor issue here: reconcileBlockRange doesn't report a final cursor or completion flag, so both call sites assume (targetBlock, 0) was fully processed. That replays later logs from targetBlock on the next head, and if reconciliation returns early it can also skip an unreconciled suffix permanently. It also means a restart at the current head still can't drain the remainder of that block until a newer head arrives.

Also applies to: 268-282

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@custody/listener.go` around lines 241 - 255, reconcileBlockRange currently
doesn't return progress, so callers (the two sites invoking reconcileBlockRange
around updating lastBlock/lastIndex) assume the entire targetBlock was
processed; change reconcileBlockRange to return a final cursor (e.g.,
(finalBlock int64, finalIndex int) and/or a completion boolean) indicating where
reconciliation actually finished, update both call sites to set lastBlock =
finalBlock and lastIndex = finalIndex (or advance only if completed) instead of
unconditionally using (targetBlock, 0), and ensure reconcileBlockRange
updates/returns the cursor whenever it exits early so restarts and subsequent
heads resume correctly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@service/service.go`:
- Around line 235-243: The reconnection loop currently immediately restarts when
the withdrawals channel (created and passed into
svc.listener.WatchWithdrawStarted) closes, which can cause tight reconnect
storms on subscription failures; modify the loop that reads from withdrawals
(the for event := range withdrawals { svc.processWithdrawal(ctx, event) }) to
perform a short backoff before retrying (e.g., time.Sleep or an exponential
backoff) unless ctx is done; implement the delay after the range ends and before
re-invoking svc.listener.WatchWithdrawStarted, checking ctx.Err() (or select on
ctx.Done()) to avoid sleeping when cancellation was requested.
- Around line 225-231: The code currently treats a failed
svc.store.GetCursor("withdraw_started") as if it returned valid zero values,
allowing fromBlock to fall back to StartBlock and replay the stream; change the
logic in the block that calls GetCursor so that if err != nil you do not proceed
with zero defaults — instead surface the error as a retryable startup failure
for this iteration (e.g., return the error from the enclosing function or
skip/continue the loop) after logging it via svc.Logger.Warn or
svc.Logger.Error; ensure you only apply the fromBlock =
svc.Config.Blockchain.StartBlock fallback when GetCursor succeeded (no error) so
the variables returned by GetCursor (fromBlock, fromLogIdx) are only used on
success.

---

Duplicate comments:
In `@custody/listener.go`:
- Around line 241-255: reconcileBlockRange currently doesn't return progress, so
callers (the two sites invoking reconcileBlockRange around updating
lastBlock/lastIndex) assume the entire targetBlock was processed; change
reconcileBlockRange to return a final cursor (e.g., (finalBlock int64,
finalIndex int) and/or a completion boolean) indicating where reconciliation
actually finished, update both call sites to set lastBlock = finalBlock and
lastIndex = finalIndex (or advance only if completed) instead of unconditionally
using (targetBlock, 0), and ensure reconcileBlockRange updates/returns the
cursor whenever it exits early so restarts and subsequent heads resume
correctly.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: caa4acad-1146-4e51-afc8-efd64a442077

📥 Commits

Reviewing files that changed from the base of the PR and between 745a014 and b487887.

📒 Files selected for processing (2)
  • custody/listener.go
  • service/service.go

@coderabbitai coderabbitai bot left a comment

♻️ Duplicate comments (2)
service/service.go (2)

224-231: ⚠️ Potential issue | 🟠 Major

Don't start from zero when the cursor lookup fails.

If GetCursor("withdraw_started") errors, the code falls through with zero values, potentially replaying the entire stream from StartBlock. This was flagged in a previous review but remains unaddressed. Treat cursor lookup failure as a retryable error instead of silently resuming from zero.

🔧 Suggested fix
 			fromBlock, fromLogIdx, err := svc.store.GetCursor("withdraw_started")
 			if err != nil {
-				svc.Logger.Warn("Failed to read withdraw_started cursor", "error", err)
+				svc.Logger.Error("Failed to read withdraw_started cursor, retrying", "error", err)
+				select {
+				case <-time.After(5 * time.Second):
+					continue
+				case <-ctx.Done():
+					return nil
+				}
 			}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@service/service.go` around lines 224 - 231, The code currently treats a
failed svc.store.GetCursor("withdraw_started") as if it returned a zero cursor
and may fall back to svc.Config.Blockchain.StartBlock; instead, handle cursor
lookup failures as retryable errors: if svc.store.GetCursor("withdraw_started")
returns a non-nil err, propagate or return that error (do not continue with
fromBlock/fromLogIdx zero values) so the caller can retry; update the block in
service.go to check err != nil and return or wrap the error (referencing
svc.store.GetCursor, fromBlock, fromLogIdx) rather than silently falling through
to the StartBlock fallback.

235-243: ⚠️ Potential issue | 🟠 Major

Add backoff before reconnecting after watcher closes.

When WatchWithdrawStarted returns (e.g., subscription setup failure in custody/listener.go lines 215-216), this loop immediately reconnects with no pause. A bad endpoint can spin hot and flood reconnect attempts. This was flagged in a previous review but remains unaddressed.

🔧 Suggested fix
 			for event := range withdrawals {
 				svc.processWithdrawal(ctx, event)
 			}
 
-			// Watcher returned — loop back to reconnect (or exit).
+			// Watcher returned — back off before reconnecting.
+			select {
+			case <-time.After(5 * time.Second):
+			case <-ctx.Done():
+				return nil
+			}
 		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@service/service.go` around lines 235 - 243, The loop that calls
svc.listener.WatchWithdrawStarted and then ranges over the withdrawals channel
should throttle reconnect attempts when the watcher returns; modify the logic
around WatchWithdrawStarted/withdrawals in the loop to wait with a backoff
(e.g., exponential backoff with jitter) before retrying, honoring ctx
cancellation so it exits promptly; specifically add a backoff timer between the
end of the for-range over withdrawals and the next invocation of
svc.listener.WatchWithdrawStarted (or when the channel is closed due to
subscription/setup failure), so reconnects are delayed and progressively
increased on repeated failures while still calling svc.processWithdrawal for
incoming events.
🧹 Nitpick comments (1)
service/service.go (1)

200-204: Unsynchronized access to svc.ethClient during shutdown.

The reconnection loop (lines 200-204) swaps svc.ethClient without synchronization, while the shutdown goroutine (lines 254-259) unconditionally closes svc.ethClient on context cancellation. If a reconnection completes just before shutdown, the shutdown goroutine may close the newly-dialed client while other operations are still in-flight, or conversely, the old client may already be closed at line 202.

In practice, this is likely benign since the context is cancelled and all operations are unwinding. However, consider either:

  1. Removing the shutdown goroutine's Close() call (since line 202 already closes old clients), or
  2. Using a mutex around client swaps and the final close.

Also applies to: 254-259

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@service/service.go` around lines 200 - 204, The swap and close of
svc.ethClient in the reconnection loop (the block that currently does "if
svc.ethClient != nil { svc.ethClient.Close() } svc.ethClient = newClient") races
with the shutdown goroutine that unconditionally calls svc.ethClient.Close() on
context cancellation; protect all accesses and the final close with a mutex to
avoid closing a client concurrently or swapping under an in-flight close. Add a
mutex (e.g., ethClientMu sync.Mutex or sync.RWMutex) as a field on svc, acquire
the lock before reading/closing/swapping svc.ethClient in the reconnection code
path and also acquire it in the shutdown goroutine before calling
svc.ethClient.Close(), and release after the operation so the swap and final
close are serialized.
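Option 2 from the nitpick above can be sketched like this, with a fake client type standing in for *ethclient.Client (only Close matters for the race):

```go
package main

import "sync"

// fakeClient stands in for *ethclient.Client in this sketch.
type fakeClient struct{ closed bool }

func (f *fakeClient) Close() { f.closed = true }

type service struct {
	mu        sync.Mutex
	ethClient *fakeClient
}

// swapClient closes the previous client and installs the new one; the
// mutex serializes this against the shutdown path below, so a reconnect
// cannot race an in-flight close.
func (s *service) swapClient(c *fakeClient) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.ethClient != nil {
		s.ethClient.Close()
	}
	s.ethClient = c
}

// shutdown closes the current client exactly once under the same lock
// and nils it out, making repeated shutdown calls safe.
func (s *service) shutdown() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.ethClient != nil {
		s.ethClient.Close()
		s.ethClient = nil
	}
}

func main() {}
```

A sync.RWMutex would also work if readers of the client are frequent; with a swap-and-close this rare, a plain Mutex keeps the invariant simplest.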
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@service/service.go`:
- Around line 224-231: The code currently treats a failed
svc.store.GetCursor("withdraw_started") as if it returned a zero cursor and may
fall back to svc.Config.Blockchain.StartBlock; instead, handle cursor lookup
failures as retryable errors: if svc.store.GetCursor("withdraw_started") returns
a non-nil err, propagate or return that error (do not continue with
fromBlock/fromLogIdx zero values) so the caller can retry; update the block in
service.go to check err != nil and return or wrap the error (referencing
svc.store.GetCursor, fromBlock, fromLogIdx) rather than silently falling through
to the StartBlock fallback.
- Around line 235-243: The loop that calls svc.listener.WatchWithdrawStarted and
then ranges over the withdrawals channel should throttle reconnect attempts when
the watcher returns; modify the logic around WatchWithdrawStarted/withdrawals in
the loop to wait with a backoff (e.g., exponential backoff with jitter) before
retrying, honoring ctx cancellation so it exits promptly; specifically add a
backoff timer between the end of the for-range over withdrawals and the next
invocation of svc.listener.WatchWithdrawStarted (or when the channel is closed
due to subscription/setup failure), so reconnects are delayed and progressively
increased on repeated failures while still calling svc.processWithdrawal for
incoming events.

---

Nitpick comments:
In `@service/service.go`:
- Around line 200-204: The swap and close of svc.ethClient in the reconnection
loop (the block that currently does "if svc.ethClient != nil {
svc.ethClient.Close() } svc.ethClient = newClient") races with the shutdown
goroutine that unconditionally calls svc.ethClient.Close() on context
cancellation; protect all accesses and the final close with a mutex to avoid
closing a client concurrently or swapping under an in-flight close. Add a mutex
(e.g., ethClientMu sync.Mutex or sync.RWMutex) as a field on svc, acquire the
lock before reading/closing/swapping svc.ethClient in the reconnection code path
and also acquire it in the shutdown goroutine before calling
svc.ethClient.Close(), and release after the operation so the swap and final
close are serialized.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 9234975f-d17e-45a0-ad07-b09ba6f8dbc5

📥 Commits

Reviewing files that changed from the base of the PR and between b487887 and 48c1011.

📒 Files selected for processing (2)
  • custody/listener.go
  • service/service.go

- GetCursor failure now retries after 5s instead of proceeding with
  zeroed cursor values that could cause a full replay from StartBlock.
- After the event watcher channel closes, wait 5s before attempting
  reconnection to prevent tight reconnect storms when the listener
  exits immediately (e.g. instant subscription failure).

Both delays respect context cancellation for clean shutdown.
@calj calj closed this Mar 8, 2026