From b487887075ec1e0a72e09869e4207bf137324c33 Mon Sep 17 00:00:00 2001 From: Camille Meulien Date: Fri, 6 Mar 2026 10:42:57 +0100 Subject: [PATCH 1/4] feat: add WebSocket auto-reconnect with head-based polling Rework the event listener to subscribe to new block headers (SubscribeNewHead) instead of SubscribeFilterLogs, then reconcile logs per block range. This makes the listener resilient to WebSocket disconnections (e.g. Infura error 1006). The service goroutine now loops: on first run it uses the existing client; on subsequent iterations it re-dials via RPCURL, re-binds the contract, and resumes from the persisted cursor. When RPCURL is empty (simulated backend in tests) it exits cleanly instead of attempting to dial. Also fixes two bugs in waitForBackOffTimeout: - return false (not true) when max retries exceeded - use a 1<< bit shift instead of 2^ (bitwise XOR in Go) when computing the back-off delay [...] if targetBlock > lastBlock { + reconcileBlockRange( ctx, client, subID, contractAddress, networkID, - header.Number.Uint64(), + targetBlock, lastBlock, lastIndex, topics, - historicalCh, + handler, ) + lastBlock = targetBlock + lastIndex = 0 } - - watchFQ := ethereum.FilterQuery{ - Addresses: []common.Address{contractAddress}, - } - eventSub, err := client.SubscribeFilterLogs(ctx, watchFQ, currentCh) - if err != nil { - if ctx.Err() != nil { - return - } - listenerLogger.Errorw("failed to subscribe on events", "error", err, "subID", subID, "contractAddress", contractAddress.String()) - backOffCount.Add(1) - continue - } - - eventSubscription = eventSub - listenerLogger.Infow("watching events", "subID", subID, "contractAddress", contractAddress.String()) - backOffCount.Store(0) } select { @@ -245,13 +261,26 @@ func listenEvents( listenerLogger.Infow("context cancelled, stopping listener", "subID", subID) eventSubscription.Unsubscribe() return - case eventLog := <-historicalCh: - listenerLogger.Debugw("received historical event", "subID", subID, "blockNumber", eventLog.BlockNumber, "logIndex", eventLog.Index) - handler(eventLog) - case eventLog := <-currentCh: - 
lastBlock = eventLog.BlockNumber - listenerLogger.Debugw("received new event", "subID", subID, "blockNumber", lastBlock, "logIndex", eventLog.Index) - handler(eventLog) + case header := <-headCh: + currentHead := header.Number.Uint64() + targetBlock := currentHead + + if targetBlock > lastBlock { + reconcileBlockRange( + ctx, + client, + subID, + contractAddress, + networkID, + targetBlock, + lastBlock, + lastIndex, + topics, + handler, + ) + lastBlock = targetBlock + lastIndex = 0 + } case err := <-eventSubscription.Err(): if err != nil { listenerLogger.Errorw("event subscription error", "error", err, "subID", subID, "contractAddress", contractAddress.String()) @@ -275,7 +304,7 @@ func reconcileBlockRange( lastBlock uint64, lastIndex uint32, topics [][]common.Hash, - historicalCh chan types.Log, + handler logHandler, ) { var backOffCount atomic.Uint64 const blockStep = 10000 @@ -335,7 +364,7 @@ func reconcileBlockRange( continue } - historicalCh <- ethLog + handler(ethLog) } startBlock = endBlock + 1 @@ -372,13 +401,13 @@ func extractAdvisedBlockRange(msg string) (startBlock, endBlock uint64, err erro func waitForBackOffTimeout(ctx context.Context, backOffCount int, originator string) bool { if backOffCount > maxBackOffCount { listenerLogger.Errorw("back off limit reached, exiting", "originator", originator, "backOffCount", backOffCount) - return true + return false } if backOffCount > 0 { listenerLogger.Infow("backing off", "originator", originator, "backOffCount", backOffCount) select { - case <-time.After(time.Duration(2^backOffCount-1) * time.Second): + case <-time.After(time.Duration(1< 0 { - fromBlock = svc.Config.Blockchain.StartBlock - } + defer func() { + svc.Logger.Info("WithdrawStarted event watcher stopped") + }() + + firstRun := true + + for { + if ctx.Err() != nil { + return nil + } + + if !firstRun { + // Reconnection only works when we have a dialable RPC URL. + // When the service was created via NewWithBackend (e.g. 
integration + // tests with a simulated backend), RPCURL is empty and there is + // nothing to re-dial — just exit. + if svc.Config.Blockchain.RPCURL == "" { + svc.Logger.Info("No RPC URL configured, cannot reconnect") + return nil + } + + svc.Logger.Info("Reconnecting to Ethereum RPC...") + newClient, err := ethclient.Dial(svc.Config.Blockchain.RPCURL) + if err != nil { + svc.Logger.Error("Failed to reconnect to Ethereum RPC", "error", err) + select { + case <-time.After(5 * time.Second): + continue + case <-ctx.Done(): + return nil + } + } + + // Close old client and swap in the new one. + if svc.ethClient != nil { + svc.ethClient.Close() + } + svc.ethClient = newClient + + // Re-bind contract with the new client. + addr := common.HexToAddress(svc.Config.Blockchain.ContractAddr) + withdrawContract, err := custody.NewIWithdraw(addr, newClient) + if err != nil { + svc.Logger.Error("Failed to re-bind contract", "error", err) + newClient.Close() + select { + case <-time.After(5 * time.Second): + continue + case <-ctx.Done(): + return nil + } + } + svc.contract = withdrawContract + svc.listener = custody.NewListener(newClient, addr, withdrawContract, nil) + } + firstRun = false - svc.Logger.Info("Starting WithdrawStarted event watcher", "from_block", fromBlock, "from_log_index", fromLogIdx) - withdrawals := make(chan *custody.WithdrawStartedEvent) - go svc.listener.WatchWithdrawStarted(ctx, withdrawals, fromBlock, fromLogIdx) - for event := range withdrawals { - svc.processWithdrawal(ctx, event) + // Determine where to resume from. 
+ fromBlock, fromLogIdx, err := svc.store.GetCursor("withdraw_started") + if err != nil { + svc.Logger.Warn("Failed to read withdraw_started cursor", "error", err) + } + if fromBlock == 0 && svc.Config.Blockchain.StartBlock > 0 { + fromBlock = svc.Config.Blockchain.StartBlock + } + + svc.Logger.Info("Starting WithdrawStarted event watcher", "from_block", fromBlock, "from_log_index", fromLogIdx) + + withdrawals := make(chan *custody.WithdrawStartedEvent) + go svc.listener.WatchWithdrawStarted(ctx, withdrawals, fromBlock, fromLogIdx) + + for event := range withdrawals { + svc.processWithdrawal(ctx, event) + } + + // Watcher returned — loop back to reconnect (or exit). } - return nil }) g.Go(func() error { From 4ef068ab11dc5749c8b022f3ef766b19376cd4ea Mon Sep 17 00:00:00 2001 From: Camille Meulien Date: Fri, 6 Mar 2026 16:43:35 +0100 Subject: [PATCH 2/4] fix: return cursor from reconcileBlockRange to prevent log replays reconcileBlockRange previously returned nothing, so callers unconditionally set lastBlock=targetBlock and lastIndex=0 after each call. This caused already-processed logs to be re-delivered when the next head arrived and re-queried the same block. The function now returns (finishedBlock, finishedIndex): - When logs are dispatched: the block/index of the last processed log. - When a batch has no matching logs: (endBlock, math.MaxUint32) to signal the block was fully consumed and all its logs should be skipped on the next pass. - On early exit (context cancelled, backoff exhausted): the cursor from entry, so no false progress is claimed. Also exits the listener immediately on SubscribeNewHead failure instead of retrying on a dead client, allowing the Service-level reconnection loop to dial a fresh connection. 
--- custody/listener.go | 37 ++++++++++++++++++++++++++++++------- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/custody/listener.go b/custody/listener.go index b039e45..7db2ee6 100644 --- a/custody/listener.go +++ b/custody/listener.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "math" "math/big" "regexp" "strconv" @@ -239,7 +240,7 @@ func listenEvents( targetBlock := currentHead if targetBlock > lastBlock { - reconcileBlockRange( + lastBlock, lastIndex = reconcileBlockRange( ctx, client, subID, @@ -251,8 +252,6 @@ func listenEvents( topics, handler, ) - lastBlock = targetBlock - lastIndex = 0 } } @@ -266,7 +265,7 @@ func listenEvents( targetBlock := currentHead if targetBlock > lastBlock { - reconcileBlockRange( + lastBlock, lastIndex = reconcileBlockRange( ctx, client, subID, @@ -278,8 +277,6 @@ func listenEvents( topics, handler, ) - lastBlock = targetBlock - lastIndex = 0 } case err := <-eventSubscription.Err(): if err != nil { @@ -294,6 +291,18 @@ func listenEvents( } } +// reconcileBlockRange fetches historical logs from lastBlock up to +// currentBlock and feeds them to handler. It returns the cursor +// (block, logIndex) representing the most recent log that was fully +// processed. When an entire batch is consumed without any matching +// logs, the cursor advances to (endBlock, math.MaxUint32) so that the +// caller never re-queries logs from blocks that were already scanned. +// +// The returned logIndex is math.MaxUint32 when the indicated block was +// fully consumed (i.e. either it contained no matching events or all +// its events were dispatched). Callers should propagate these values +// into lastBlock / lastIndex so the skip-guard at the top of the +// processing loop works correctly on the next invocation. 
func reconcileBlockRange( ctx context.Context, client bind.ContractBackend, @@ -305,7 +314,10 @@ func reconcileBlockRange( lastIndex uint32, topics [][]common.Hash, handler logHandler, -) { +) (finishedBlock uint64, finishedIndex uint32) { + finishedBlock = lastBlock + finishedIndex = lastIndex + var backOffCount atomic.Uint64 const blockStep = 10000 startBlock := lastBlock @@ -365,11 +377,22 @@ func reconcileBlockRange( } handler(ethLog) + finishedBlock = ethLog.BlockNumber + finishedIndex = uint32(ethLog.Index) + } + + // Even if no matching logs were found in this batch, we have + // fully scanned up to endBlock — advance the cursor so the + // next call does not re-query the same range. + if finishedBlock < endBlock { + finishedBlock = endBlock + finishedIndex = math.MaxUint32 } startBlock = endBlock + 1 endBlock += blockStep } + return } func extractAdvisedBlockRange(msg string) (startBlock, endBlock uint64, err error) { From 48c10111a2329bd9d6b5a8b36de15db84dce22cf Mon Sep 17 00:00:00 2001 From: Camille Meulien Date: Fri, 6 Mar 2026 16:45:06 +0100 Subject: [PATCH 3/4] fix: use DialContext for reconnection so shutdown cancels pending dials ethclient.Dial can block indefinitely on DNS or TCP handshake. Replace with ethclient.DialContext(ctx, ...) so the reconnect attempt is interrupted when the service context is cancelled. 
--- service/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/service.go b/service/service.go index 80d1896..f81604b 100644 --- a/service/service.go +++ b/service/service.go @@ -186,7 +186,7 @@ func (svc *Service) RunWorkerWithContext(ctx context.Context) error { } svc.Logger.Info("Reconnecting to Ethereum RPC...") - newClient, err := ethclient.Dial(svc.Config.Blockchain.RPCURL) + newClient, err := ethclient.DialContext(ctx, svc.Config.Blockchain.RPCURL) if err != nil { svc.Logger.Error("Failed to reconnect to Ethereum RPC", "error", err) select { From b534854c7435379a642d62a03306837fe3cab62c Mon Sep 17 00:00:00 2001 From: Camille Meulien Date: Fri, 6 Mar 2026 17:01:14 +0100 Subject: [PATCH 4/4] fix: harden reconnection loop with cursor retry and watcher backoff - GetCursor failure now retries after 5s instead of proceeding with zeroed cursor values that could cause a full replay from StartBlock. - After the event watcher channel closes, wait 5s before attempting reconnection to prevent tight reconnect storms when the listener exits immediately (e.g. instant subscription failure). Both delays respect context cancellation for clean shutdown. --- service/service.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/service/service.go b/service/service.go index f81604b..2a88e13 100644 --- a/service/service.go +++ b/service/service.go @@ -224,7 +224,13 @@ func (svc *Service) RunWorkerWithContext(ctx context.Context) error { // Determine where to resume from. 
fromBlock, fromLogIdx, err := svc.store.GetCursor("withdraw_started") if err != nil { - svc.Logger.Warn("Failed to read withdraw_started cursor", "error", err) + svc.Logger.Error("Failed to read withdraw_started cursor, retrying", "error", err) + select { + case <-time.After(5 * time.Second): + continue + case <-ctx.Done(): + return nil + } } if fromBlock == 0 && svc.Config.Blockchain.StartBlock > 0 { fromBlock = svc.Config.Blockchain.StartBlock @@ -239,7 +245,14 @@ func (svc *Service) RunWorkerWithContext(ctx context.Context) error { svc.processWithdrawal(ctx, event) } - // Watcher returned — loop back to reconnect (or exit). + // Watcher returned — back off before reconnecting to avoid + // tight reconnect storms if the listener exits immediately. + svc.Logger.Warn("Event watcher stopped, will retry after backoff") + select { + case <-time.After(5 * time.Second): + case <-ctx.Done(): + return nil + } } })