diff --git a/custody/listener.go b/custody/listener.go index 280aa4a..7db2ee6 100644 --- a/custody/listener.go +++ b/custody/listener.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "math" "math/big" "regexp" "strconv" @@ -70,6 +71,7 @@ func (l *Listener) WatchWithdrawStarted(ctx context.Context, sink chan<- *Withdr listenEvents(ctx, l.client, "withdraw-started", l.contractAddr, 0, fromBlock, fromLogIndex, [][]common.Hash{{topic}}, func(log types.Log) { + ev, err := l.withdrawFilterer.ParseWithdrawStarted(log) if err != nil { return @@ -102,6 +104,7 @@ func (l *Listener) WatchWithdrawFinalized(ctx context.Context, sink chan<- *With listenEvents(ctx, l.client, "withdraw-finalized", l.contractAddr, 0, fromBlock, fromLogIndex, [][]common.Hash{{topic}}, func(log types.Log) { + ev, err := l.withdrawFilterer.ParseWithdrawFinalized(log) if err != nil { return @@ -135,6 +138,7 @@ func (l *Listener) WatchDeposited(ctx context.Context, sink chan<- *DepositedEve listenEvents(ctx, l.client, "deposited", l.contractAddr, 0, fromBlock, fromLogIndex, [][]common.Hash{{topic}}, func(log types.Log) { + ev, err := l.depositFilterer.ParseDeposited(log) if err != nil { return @@ -167,8 +171,13 @@ func listenEvents( handler logHandler, ) { var backOffCount atomic.Uint64 - var historicalCh, currentCh chan types.Log var eventSubscription event.Subscription + var headCh chan *types.Header + + // Interface for SubscribeNewHead, as it's not in bind.ContractBackend + type headerSubscriber interface { + SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) + } listenerLogger.Debugw("starting listening events", "subID", subID, "contractAddress", contractAddress.String()) for { @@ -185,59 +194,65 @@ func listenEvents( return } - historicalCh = make(chan types.Log, 1) - currentCh = make(chan types.Log, 100) + // Channel for new block headers + headCh = make(chan *types.Header, 100) - if lastBlock == 0 { - listenerLogger.Infow("skipping historical logs 
fetching", "subID", subID, "contractAddress", contractAddress.String()) - } else { - var header *types.Header - var err error - headerCtx, cancel := context.WithTimeout(ctx, 1*time.Minute) - err = debounce.Debounce(headerCtx, listenerLogger, func(ctx context.Context) error { - header, err = client.HeaderByNumber(ctx, nil) - return err - }) - cancel() - if err != nil { - if ctx.Err() != nil { - return - } - listenerLogger.Errorw("failed to get latest block", "error", err, "subID", subID, "contractAddress", contractAddress.String()) - backOffCount.Add(1) - continue + subscriber, ok := client.(headerSubscriber) + if !ok { + listenerLogger.Errorw("client does not support SubscribeNewHead", "subID", subID) + return + } + + sub, subscribeErr := subscriber.SubscribeNewHead(ctx, headCh) + if subscribeErr != nil { + if ctx.Err() != nil { + return } + // Return instead of retrying: the underlying connection is + // likely dead, so resubscribing on the same client will keep + // failing. Returning causes the Service-level loop to dial a + // fresh ethclient and restart the listener. 
+ listenerLogger.Errorw("failed to subscribe to new heads, exiting listener", "error", subscribeErr, "subID", subID) + return + } + eventSubscription = sub + listenerLogger.Infow("subscribed to new heads", "subID", subID) + backOffCount.Store(0) + + // Initial sync up to current head + var header *types.Header + var err error + headerCtx, cancel := context.WithTimeout(ctx, 1*time.Minute) + err = debounce.Debounce(headerCtx, listenerLogger, func(ctx context.Context) error { + header, err = client.HeaderByNumber(ctx, nil) + return err + }) + cancel() + if err != nil { + listenerLogger.Errorw("failed to get latest block header", "error", err, "subID", subID) + eventSubscription.Unsubscribe() + eventSubscription = nil + backOffCount.Add(1) + continue + } + + currentHead := header.Number.Uint64() + targetBlock := currentHead - go reconcileBlockRange( + if targetBlock > lastBlock { + lastBlock, lastIndex = reconcileBlockRange( ctx, client, subID, contractAddress, networkID, - header.Number.Uint64(), + targetBlock, lastBlock, lastIndex, topics, - historicalCh, + handler, ) } - - watchFQ := ethereum.FilterQuery{ - Addresses: []common.Address{contractAddress}, - } - eventSub, err := client.SubscribeFilterLogs(ctx, watchFQ, currentCh) - if err != nil { - if ctx.Err() != nil { - return - } - listenerLogger.Errorw("failed to subscribe on events", "error", err, "subID", subID, "contractAddress", contractAddress.String()) - backOffCount.Add(1) - continue - } - - eventSubscription = eventSub - listenerLogger.Infow("watching events", "subID", subID, "contractAddress", contractAddress.String()) - backOffCount.Store(0) } select { @@ -245,13 +260,24 @@ func listenEvents( listenerLogger.Infow("context cancelled, stopping listener", "subID", subID) eventSubscription.Unsubscribe() return - case eventLog := <-historicalCh: - listenerLogger.Debugw("received historical event", "subID", subID, "blockNumber", eventLog.BlockNumber, "logIndex", eventLog.Index) - handler(eventLog) - case 
eventLog := <-currentCh: - lastBlock = eventLog.BlockNumber - listenerLogger.Debugw("received new event", "subID", subID, "blockNumber", lastBlock, "logIndex", eventLog.Index) - handler(eventLog) + case header := <-headCh: + currentHead := header.Number.Uint64() + targetBlock := currentHead + + if targetBlock > lastBlock { + lastBlock, lastIndex = reconcileBlockRange( + ctx, + client, + subID, + contractAddress, + networkID, + targetBlock, + lastBlock, + lastIndex, + topics, + handler, + ) + } case err := <-eventSubscription.Err(): if err != nil { listenerLogger.Errorw("event subscription error", "error", err, "subID", subID, "contractAddress", contractAddress.String()) @@ -265,6 +291,18 @@ func listenEvents( } } +// reconcileBlockRange fetches historical logs from lastBlock up to +// currentBlock and feeds them to handler. It returns the cursor +// (block, logIndex) representing the most recent log that was fully +// processed. When an entire batch is consumed without any matching +// logs, the cursor advances to (endBlock, math.MaxUint32) so that the +// caller never re-queries logs from blocks that were already scanned. +// +// The returned logIndex is math.MaxUint32 when the indicated block was +// fully consumed (i.e. either it contained no matching events or all +// its events were dispatched). Callers should propagate these values +// into lastBlock / lastIndex so the skip-guard at the top of the +// processing loop works correctly on the next invocation. 
func reconcileBlockRange( ctx context.Context, client bind.ContractBackend, @@ -275,8 +313,11 @@ lastBlock uint64, lastIndex uint32, topics [][]common.Hash, - historicalCh chan types.Log, -) { + handler logHandler, +) (finishedBlock uint64, finishedIndex uint32) { + finishedBlock = lastBlock + finishedIndex = lastIndex + var backOffCount atomic.Uint64 const blockStep = 10000 startBlock := lastBlock @@ -335,12 +376,23 @@ continue } - historicalCh <- ethLog + handler(ethLog) + finishedBlock = ethLog.BlockNumber + finishedIndex = uint32(ethLog.Index) + } + + // Even if no matching logs were found in this batch, we have + // fully scanned up to endBlock — advance the cursor so the + // next call does not re-query the same range. + if finishedBlock < endBlock { + finishedBlock = endBlock + finishedIndex = math.MaxUint32 } startBlock = endBlock + 1 endBlock += blockStep } + return } func extractAdvisedBlockRange(msg string) (startBlock, endBlock uint64, err error) { @@ -372,13 +424,13 @@ func waitForBackOffTimeout(ctx context.Context, backOffCount int, originator string) bool { if backOffCount > maxBackOffCount { listenerLogger.Errorw("back off limit reached, exiting", "originator", originator, "backOffCount", backOffCount) - return true + return false } if backOffCount > 0 { listenerLogger.Infow("backing off", "originator", originator, "backOffCount", backOffCount) select { - case <-time.After(time.Duration(2^backOffCount-1) * time.Second): + case <-time.After(time.Duration(1<<backOffCount-1) * time.Second): case <-ctx.Done(): } } return true } diff --git a/service/service.go b/service/service.go --- a/service/service.go +++ b/service/service.go @@ func (svc *Service) Start @@ - if fromBlock == 0 && svc.Config.Blockchain.StartBlock > 0 { - fromBlock = svc.Config.Blockchain.StartBlock - } + defer func() { + svc.Logger.Info("WithdrawStarted event watcher stopped") + }() + + firstRun := true + + for { + if ctx.Err() != nil { + return nil + } + + if !firstRun { + // Reconnection only works when we have a dialable RPC URL. + // When the service was created via NewWithBackend (e.g.
integration + // tests with a simulated backend), RPCURL is empty and there is + // nothing to re-dial — just exit. + if svc.Config.Blockchain.RPCURL == "" { + svc.Logger.Info("No RPC URL configured, cannot reconnect") + return nil + } + + svc.Logger.Info("Reconnecting to Ethereum RPC...") + newClient, err := ethclient.DialContext(ctx, svc.Config.Blockchain.RPCURL) + if err != nil { + svc.Logger.Error("Failed to reconnect to Ethereum RPC", "error", err) + select { + case <-time.After(5 * time.Second): + continue + case <-ctx.Done(): + return nil + } + } + + // Close old client and swap in the new one. + if svc.ethClient != nil { + svc.ethClient.Close() + } + svc.ethClient = newClient + + // Re-bind contract with the new client. + addr := common.HexToAddress(svc.Config.Blockchain.ContractAddr) + withdrawContract, err := custody.NewIWithdraw(addr, newClient) + if err != nil { + svc.Logger.Error("Failed to re-bind contract", "error", err) + newClient.Close() + select { + case <-time.After(5 * time.Second): + continue + case <-ctx.Done(): + return nil + } + } + svc.contract = withdrawContract + svc.listener = custody.NewListener(newClient, addr, withdrawContract, nil) + } + firstRun = false + + // Determine where to resume from. 
+ fromBlock, fromLogIdx, err := svc.store.GetCursor("withdraw_started") + if err != nil { + svc.Logger.Error("Failed to read withdraw_started cursor, retrying", "error", err) + select { + case <-time.After(5 * time.Second): + continue + case <-ctx.Done(): + return nil + } + } + if fromBlock == 0 && svc.Config.Blockchain.StartBlock > 0 { + fromBlock = svc.Config.Blockchain.StartBlock + } + + svc.Logger.Info("Starting WithdrawStarted event watcher", "from_block", fromBlock, "from_log_index", fromLogIdx) + + withdrawals := make(chan *custody.WithdrawStartedEvent) + go svc.listener.WatchWithdrawStarted(ctx, withdrawals, fromBlock, fromLogIdx) - svc.Logger.Info("Starting WithdrawStarted event watcher", "from_block", fromBlock, "from_log_index", fromLogIdx) - withdrawals := make(chan *custody.WithdrawStartedEvent) - go svc.listener.WatchWithdrawStarted(ctx, withdrawals, fromBlock, fromLogIdx) - for event := range withdrawals { - svc.processWithdrawal(ctx, event) + for event := range withdrawals { + svc.processWithdrawal(ctx, event) + } + + // Watcher returned — back off before reconnecting to avoid + // tight reconnect storms if the listener exits immediately. + svc.Logger.Warn("Event watcher stopped, will retry after backoff") + select { + case <-time.After(5 * time.Second): + case <-ctx.Done(): + return nil + } } - return nil }) g.Go(func() error {