feat: add WebSocket auto-reconnect with head-based polling #21
📝 Walkthrough

Replaces log-channel-driven streaming with a head-based subscription in `custody/listener.go`, reconciling logs per block range on each new head.
Sequence Diagrams

```mermaid
sequenceDiagram
    participant Listener
    participant Blockchain as Blockchain/RPC
    participant Handler as Log Handler
    rect rgba(200,200,255,0.5)
        Listener->>Blockchain: SubscribeNewHead()
        activate Blockchain
        Listener->>Listener: Fetch latest header (initial sync)
        Listener->>Listener: Compute targetBlock
        Listener->>Listener: reconcileBlockRange(targetBlock, handler)
        Listener->>Handler: Deliver logs via handler
        Handler-->>Listener: Acknowledge
    end
    loop On new header event
        Blockchain-->>Listener: New header notification
        Listener->>Listener: reconcileBlockRange(newHead, handler)
        Listener->>Handler: Deliver logs via handler
        Handler-->>Listener: Acknowledge
    end
    deactivate Blockchain
```
```mermaid
sequenceDiagram
    participant Service
    participant RPC as Ethereum RPC
    participant Contract as Withdraw Contract
    participant Listener as Event Listener
    rect rgba(200,255,200,0.5)
        Service->>Service: firstRun = true
        loop Watcher loop
            alt First run
                Service->>Service: GetCursor("withdraw_started") or StartBlock
            else Reconnect iteration
                Service->>RPC: Re-dial RPC endpoint
                activate RPC
                RPC-->>Service: Connection established
                deactivate RPC
                Service->>Service: Swap ethClient
                Service->>Contract: Re-bind contract
                Service->>Listener: Recreate listener
            end
            Service->>Listener: Start watcher from cursor
            Listener-->>Service: Events / processing
        end
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@custody/listener.go`:
- Around line 238-252: reconcileBlockRange currently doesn't report where it
finished, so callers reset the cursor to (targetBlock, 0) which causes replays
of already-processed logs in a head block; change reconcileBlockRange to return
the final cursor (e.g., (finishedBlock, finishedIndex) or a sentinel indicating
the block was fully consumed) and update all call sites that pass targetBlock
(notably the calls around the blocks handling head sync and catch-up — the sites
shown near the current diff and the other call sites you flagged) to set
lastBlock and lastIndex from that return value instead of unconditionally
setting lastBlock=targetBlock and lastIndex=0; ensure the return contract
distinguishes “block fully consumed” versus “stopped at specific index” so
restarts and mid-block restarts don't reprocess logs.
In `@service/service.go`:
- Around line 188-190: The reconnect logic calls ethclient.Dial which can block
indefinitely; replace that call with ethclient.DialContext using the existing
ctx from the errgroup so reconnects respect cancellation and shutdown. Update
the code where svc.Logger.Info("Reconnecting to Ethereum RPC...") and newClient,
err := ethclient.Dial(svc.Config.Blockchain.RPCURL) are used to instead call
ethclient.DialContext(ctx, svc.Config.Blockchain.RPCURL), propagate the err
handling unchanged, and ensure any variables (newClient, err) remain the same to
minimize other changes.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 2905399c-a223-4039-b4aa-95cf99f55fe1
📒 Files selected for processing (2)
custody/listener.go, service/service.go
Rework the event listener to subscribe to new block headers (SubscribeNewHead) instead of SubscribeFilterLogs, then reconcile logs per block range. This makes the listener resilient to WebSocket disconnections (e.g. Infura error 1006).

The service goroutine now loops: on first run it uses the existing client; on subsequent iterations it re-dials via RPCURL, re-binds the contract, and resumes from the persisted cursor. When RPCURL is empty (simulated backend in tests) it exits cleanly instead of attempting to dial.

Also fixes two bugs in waitForBackOffTimeout:

- return false (not true) when max retries exceeded
- use 1<<n (bit-shift) instead of 2^n (XOR) for exponential backoff
Force-pushed 745a014 to b487887
reconcileBlockRange previously returned nothing, so callers unconditionally set lastBlock=targetBlock and lastIndex=0 after each call. This caused already-processed logs to be re-delivered when the next head arrived and re-queried the same block.

The function now returns (finishedBlock, finishedIndex):

- When logs are dispatched: the block/index of the last processed log.
- When a batch has no matching logs: (endBlock, math.MaxUint32) to signal the block was fully consumed and all its logs should be skipped on the next pass.
- On early exit (context cancelled, backoff exhausted): the cursor from entry, so no false progress is claimed.

Also exits the listener immediately on SubscribeNewHead failure instead of retrying on a dead client, allowing the Service-level reconnection loop to dial a fresh connection.
ethclient.Dial can block indefinitely on DNS or TCP handshake. Replace with ethclient.DialContext(ctx, ...) so the reconnect attempt is interrupted when the service context is cancelled.
Actionable comments posted: 2
♻️ Duplicate comments (1)
custody/listener.go (1)
241-255: ⚠️ Potential issue | 🟠 Major

Track where reconciliation actually finished before moving the cursor.

Still seeing the same cursor issue here: `reconcileBlockRange` doesn't report a final cursor or completion flag, so both call sites assume `(targetBlock, 0)` was fully processed. That replays later logs from `targetBlock` on the next head, and if reconciliation returns early it can also skip an unreconciled suffix permanently. It also means a restart at the current head still can't drain the remainder of that block until a newer head arrives.

Also applies to: 268-282
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@custody/listener.go` around lines 241 - 255, reconcileBlockRange currently doesn't return progress, so callers (the two sites invoking reconcileBlockRange around updating lastBlock/lastIndex) assume the entire targetBlock was processed; change reconcileBlockRange to return a final cursor (e.g., (finalBlock int64, finalIndex int) and/or a completion boolean) indicating where reconciliation actually finished, update both call sites to set lastBlock = finalBlock and lastIndex = finalIndex (or advance only if completed) instead of unconditionally using (targetBlock, 0), and ensure reconcileBlockRange updates/returns the cursor whenever it exits early so restarts and subsequent heads resume correctly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@service/service.go`:
- Around line 235-243: The reconnection loop currently immediately restarts when
the withdrawals channel (created and passed into
svc.listener.WatchWithdrawStarted) closes, which can cause tight reconnect
storms on subscription failures; modify the loop that reads from withdrawals
(the for event := range withdrawals { svc.processWithdrawal(ctx, event) }) to
perform a short backoff before retrying (e.g., time.Sleep or an exponential
backoff) unless ctx is done; implement the delay after the range ends and before
re-invoking svc.listener.WatchWithdrawStarted, checking ctx.Err() (or select on
ctx.Done()) to avoid sleeping when cancellation was requested.
- Around line 225-231: The code currently treats a failed
svc.store.GetCursor("withdraw_started") as if it returned valid zero values,
allowing fromBlock to fall back to StartBlock and replay the stream; change the
logic in the block that calls GetCursor so that if err != nil you do not proceed
with zero defaults — instead surface the error as a retryable startup failure
for this iteration (e.g., return the error from the enclosing function or
skip/continue the loop) after logging it via svc.Logger.Warn or
svc.Logger.Error; ensure you only apply the fromBlock =
svc.Config.Blockchain.StartBlock fallback when GetCursor succeeded (no error) so
the variables returned by GetCursor (fromBlock, fromLogIdx) are only used on
success.
---
Duplicate comments:
In `@custody/listener.go`:
- Around line 241-255: reconcileBlockRange currently doesn't return progress, so
callers (the two sites invoking reconcileBlockRange around updating
lastBlock/lastIndex) assume the entire targetBlock was processed; change
reconcileBlockRange to return a final cursor (e.g., (finalBlock int64,
finalIndex int) and/or a completion boolean) indicating where reconciliation
actually finished, update both call sites to set lastBlock = finalBlock and
lastIndex = finalIndex (or advance only if completed) instead of unconditionally
using (targetBlock, 0), and ensure reconcileBlockRange updates/returns the
cursor whenever it exits early so restarts and subsequent heads resume
correctly.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: caa4acad-1146-4e51-afc8-efd64a442077
📒 Files selected for processing (2)
custody/listener.go, service/service.go
♻️ Duplicate comments (2)
service/service.go (2)
224-231: ⚠️ Potential issue | 🟠 Major

Don't start from zero when the cursor lookup fails.

If `GetCursor("withdraw_started")` errors, the code falls through with zero values, potentially replaying the entire stream from `StartBlock`. This was flagged in a previous review but remains unaddressed. Treat cursor lookup failure as a retryable error instead of silently resuming from zero.

🔧 Suggested fix

```diff
 fromBlock, fromLogIdx, err := svc.store.GetCursor("withdraw_started")
 if err != nil {
-	svc.Logger.Warn("Failed to read withdraw_started cursor", "error", err)
+	svc.Logger.Error("Failed to read withdraw_started cursor, retrying", "error", err)
+	select {
+	case <-time.After(5 * time.Second):
+		continue
+	case <-ctx.Done():
+		return nil
+	}
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@service/service.go` around lines 224 - 231, The code currently treats a failed svc.store.GetCursor("withdraw_started") as if it returned a zero cursor and may fall back to svc.Config.Blockchain.StartBlock; instead, handle cursor lookup failures as retryable errors: if svc.store.GetCursor("withdraw_started") returns a non-nil err, propagate or return that error (do not continue with fromBlock/fromLogIdx zero values) so the caller can retry; update the block in service.go to check err != nil and return or wrap the error (referencing svc.store.GetCursor, fromBlock, fromLogIdx) rather than silently falling through to the StartBlock fallback.
235-243: ⚠️ Potential issue | 🟠 Major

Add backoff before reconnecting after watcher closes.

When `WatchWithdrawStarted` returns (e.g., subscription setup failure in `custody/listener.go` lines 215-216), this loop immediately reconnects with no pause. A bad endpoint can spin hot and flood reconnect attempts. This was flagged in a previous review but remains unaddressed.

🔧 Suggested fix

```diff
 for event := range withdrawals {
 	svc.processWithdrawal(ctx, event)
 }
-// Watcher returned — loop back to reconnect (or exit).
+// Watcher returned — back off before reconnecting.
+select {
+case <-time.After(5 * time.Second):
+case <-ctx.Done():
+	return nil
+}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@service/service.go` around lines 235 - 243, The loop that calls svc.listener.WatchWithdrawStarted and then ranges over the withdrawals channel should throttle reconnect attempts when the watcher returns; modify the logic around WatchWithdrawStarted/withdrawals in the loop to wait with a backoff (e.g., exponential backoff with jitter) before retrying, honoring ctx cancellation so it exits promptly; specifically add a backoff timer between the end of the for-range over withdrawals and the next invocation of svc.listener.WatchWithdrawStarted (or when the channel is closed due to subscription/setup failure), so reconnects are delayed and progressively increased on repeated failures while still calling svc.processWithdrawal for incoming events.
🧹 Nitpick comments (1)
service/service.go (1)
200-204: Unsynchronized access to `svc.ethClient` during shutdown.

The reconnection loop (lines 200-204) swaps `svc.ethClient` without synchronization, while the shutdown goroutine (lines 254-259) unconditionally closes `svc.ethClient` on context cancellation. If a reconnection completes just before shutdown, the shutdown goroutine may close the newly-dialed client while other operations are still in-flight, or conversely, the old client may already be closed at line 202.

In practice, this is likely benign since the context is cancelled and all operations are unwinding. However, consider either:

- Removing the shutdown goroutine's `Close()` call (since line 202 already closes old clients), or
- Using a mutex around client swaps and the final close.

Also applies to: 254-259
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@service/service.go` around lines 200 - 204, The swap and close of svc.ethClient in the reconnection loop (the block that currently does "if svc.ethClient != nil { svc.ethClient.Close() } svc.ethClient = newClient") races with the shutdown goroutine that unconditionally calls svc.ethClient.Close() on context cancellation; protect all accesses and the final close with a mutex to avoid closing a client concurrently or swapping under an in-flight close. Add a mutex (e.g., ethClientMu sync.Mutex or sync.RWMutex) as a field on svc, acquire the lock before reading/closing/swapping svc.ethClient in the reconnection code path and also acquire it in the shutdown goroutine before calling svc.ethClient.Close(), and release after the operation so the swap and final close are serialized.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@service/service.go`:
- Around line 224-231: The code currently treats a failed
svc.store.GetCursor("withdraw_started") as if it returned a zero cursor and may
fall back to svc.Config.Blockchain.StartBlock; instead, handle cursor lookup
failures as retryable errors: if svc.store.GetCursor("withdraw_started") returns
a non-nil err, propagate or return that error (do not continue with
fromBlock/fromLogIdx zero values) so the caller can retry; update the block in
service.go to check err != nil and return or wrap the error (referencing
svc.store.GetCursor, fromBlock, fromLogIdx) rather than silently falling through
to the StartBlock fallback.
- Around line 235-243: The loop that calls svc.listener.WatchWithdrawStarted and
then ranges over the withdrawals channel should throttle reconnect attempts when
the watcher returns; modify the logic around WatchWithdrawStarted/withdrawals in
the loop to wait with a backoff (e.g., exponential backoff with jitter) before
retrying, honoring ctx cancellation so it exits promptly; specifically add a
backoff timer between the end of the for-range over withdrawals and the next
invocation of svc.listener.WatchWithdrawStarted (or when the channel is closed
due to subscription/setup failure), so reconnects are delayed and progressively
increased on repeated failures while still calling svc.processWithdrawal for
incoming events.
---
Nitpick comments:
In `@service/service.go`:
- Around line 200-204: The swap and close of svc.ethClient in the reconnection
loop (the block that currently does "if svc.ethClient != nil {
svc.ethClient.Close() } svc.ethClient = newClient") races with the shutdown
goroutine that unconditionally calls svc.ethClient.Close() on context
cancellation; protect all accesses and the final close with a mutex to avoid
closing a client concurrently or swapping under an in-flight close. Add a mutex
(e.g., ethClientMu sync.Mutex or sync.RWMutex) as a field on svc, acquire the
lock before reading/closing/swapping svc.ethClient in the reconnection code path
and also acquire it in the shutdown goroutine before calling
svc.ethClient.Close(), and release after the operation so the swap and final
close are serialized.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 9234975f-d17e-45a0-ad07-b09ba6f8dbc5
📒 Files selected for processing (2)
custody/listener.go, service/service.go
- GetCursor failure now retries after 5s instead of proceeding with zeroed cursor values that could cause a full replay from StartBlock.
- After the event watcher channel closes, wait 5s before attempting reconnection to prevent tight reconnect storms when the listener exits immediately (e.g. instant subscription failure).

Both delays respect context cancellation for clean shutdown.