Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 105 additions & 53 deletions custody/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math"
"math/big"
"regexp"
"strconv"
Expand Down Expand Up @@ -70,6 +71,7 @@ func (l *Listener) WatchWithdrawStarted(ctx context.Context, sink chan<- *Withdr
listenEvents(ctx, l.client, "withdraw-started", l.contractAddr, 0, fromBlock, fromLogIndex,
[][]common.Hash{{topic}},
func(log types.Log) {

ev, err := l.withdrawFilterer.ParseWithdrawStarted(log)
if err != nil {
return
Expand Down Expand Up @@ -102,6 +104,7 @@ func (l *Listener) WatchWithdrawFinalized(ctx context.Context, sink chan<- *With
listenEvents(ctx, l.client, "withdraw-finalized", l.contractAddr, 0, fromBlock, fromLogIndex,
[][]common.Hash{{topic}},
func(log types.Log) {

ev, err := l.withdrawFilterer.ParseWithdrawFinalized(log)
if err != nil {
return
Expand Down Expand Up @@ -135,6 +138,7 @@ func (l *Listener) WatchDeposited(ctx context.Context, sink chan<- *DepositedEve
listenEvents(ctx, l.client, "deposited", l.contractAddr, 0, fromBlock, fromLogIndex,
[][]common.Hash{{topic}},
func(log types.Log) {

ev, err := l.depositFilterer.ParseDeposited(log)
if err != nil {
return
Expand Down Expand Up @@ -167,8 +171,13 @@ func listenEvents(
handler logHandler,
) {
var backOffCount atomic.Uint64
var historicalCh, currentCh chan types.Log
var eventSubscription event.Subscription
var headCh chan *types.Header

// headerSubscriber describes the one extra capability listenEvents needs
// from its client: subscribing to new block headers. bind.ContractBackend
// does not declare SubscribeNewHead, so the client is type-asserted to this
// interface before subscribing; if the assertion fails the listener logs an
// error and returns.
type headerSubscriber interface {
SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error)
}

listenerLogger.Debugw("starting listening events", "subID", subID, "contractAddress", contractAddress.String())
for {
Expand All @@ -185,73 +194,90 @@ func listenEvents(
return
}

historicalCh = make(chan types.Log, 1)
currentCh = make(chan types.Log, 100)
// Channel for new block headers
headCh = make(chan *types.Header, 100)

if lastBlock == 0 {
listenerLogger.Infow("skipping historical logs fetching", "subID", subID, "contractAddress", contractAddress.String())
} else {
var header *types.Header
var err error
headerCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
err = debounce.Debounce(headerCtx, listenerLogger, func(ctx context.Context) error {
header, err = client.HeaderByNumber(ctx, nil)
return err
})
cancel()
if err != nil {
if ctx.Err() != nil {
return
}
listenerLogger.Errorw("failed to get latest block", "error", err, "subID", subID, "contractAddress", contractAddress.String())
backOffCount.Add(1)
continue
subscriber, ok := client.(headerSubscriber)
if !ok {
listenerLogger.Errorw("client does not support SubscribeNewHead", "subID", subID)
return
}

sub, subscribeErr := subscriber.SubscribeNewHead(ctx, headCh)
if subscribeErr != nil {
if ctx.Err() != nil {
return
}
// Return instead of retrying: the underlying connection is
// likely dead, so resubscribing on the same client will keep
// failing. Returning causes the Service-level loop to dial a
// fresh ethclient and restart the listener.
listenerLogger.Errorw("failed to subscribe to new heads, exiting listener", "error", subscribeErr, "subID", subID)
return
}
eventSubscription = sub
listenerLogger.Infow("subscribed to new heads", "subID", subID)
backOffCount.Store(0)

// Initial sync up to current head
var header *types.Header
var err error
headerCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
err = debounce.Debounce(headerCtx, listenerLogger, func(ctx context.Context) error {
header, err = client.HeaderByNumber(ctx, nil)
return err
})
cancel()
if err != nil {
listenerLogger.Errorw("failed to get latest block header", "error", err, "subID", subID)
eventSubscription.Unsubscribe()
eventSubscription = nil
backOffCount.Add(1)
continue
}

currentHead := header.Number.Uint64()
targetBlock := currentHead

go reconcileBlockRange(
if targetBlock > lastBlock {
lastBlock, lastIndex = reconcileBlockRange(
ctx,
client,
subID,
contractAddress,
networkID,
header.Number.Uint64(),
targetBlock,
lastBlock,
lastIndex,
topics,
historicalCh,
handler,
)
}

watchFQ := ethereum.FilterQuery{
Addresses: []common.Address{contractAddress},
}
eventSub, err := client.SubscribeFilterLogs(ctx, watchFQ, currentCh)
if err != nil {
if ctx.Err() != nil {
return
}
listenerLogger.Errorw("failed to subscribe on events", "error", err, "subID", subID, "contractAddress", contractAddress.String())
backOffCount.Add(1)
continue
}

eventSubscription = eventSub
listenerLogger.Infow("watching events", "subID", subID, "contractAddress", contractAddress.String())
backOffCount.Store(0)
}

select {
case <-ctx.Done():
listenerLogger.Infow("context cancelled, stopping listener", "subID", subID)
eventSubscription.Unsubscribe()
return
case eventLog := <-historicalCh:
listenerLogger.Debugw("received historical event", "subID", subID, "blockNumber", eventLog.BlockNumber, "logIndex", eventLog.Index)
handler(eventLog)
case eventLog := <-currentCh:
lastBlock = eventLog.BlockNumber
listenerLogger.Debugw("received new event", "subID", subID, "blockNumber", lastBlock, "logIndex", eventLog.Index)
handler(eventLog)
case header := <-headCh:
currentHead := header.Number.Uint64()
targetBlock := currentHead

if targetBlock > lastBlock {
lastBlock, lastIndex = reconcileBlockRange(
ctx,
client,
subID,
contractAddress,
networkID,
targetBlock,
lastBlock,
lastIndex,
topics,
handler,
)
}
case err := <-eventSubscription.Err():
if err != nil {
listenerLogger.Errorw("event subscription error", "error", err, "subID", subID, "contractAddress", contractAddress.String())
Expand All @@ -265,6 +291,18 @@ func listenEvents(
}
}

// reconcileBlockRange fetches historical logs from lastBlock up to
// currentBlock and feeds them to handler. It returns the cursor
// (block, logIndex) representing the most recent log that was fully
// processed. When an entire batch is consumed without any matching
// logs, the cursor advances to (endBlock, math.MaxUint32) so that the
// caller never re-queries logs from blocks that were already scanned.
//
// The returned logIndex is math.MaxUint32 when the indicated block was
// fully consumed (i.e. either it contained no matching events or all
// its events were dispatched). Callers should propagate these values
// into lastBlock / lastIndex so the skip-guard at the top of the
// processing loop works correctly on the next invocation.
func reconcileBlockRange(
ctx context.Context,
client bind.ContractBackend,
Expand All @@ -275,8 +313,11 @@ func reconcileBlockRange(
lastBlock uint64,
lastIndex uint32,
topics [][]common.Hash,
historicalCh chan types.Log,
) {
handler logHandler,
) (finishedBlock uint64, finishedIndex uint32) {
finishedBlock = lastBlock
finishedIndex = lastIndex

var backOffCount atomic.Uint64
const blockStep = 10000
startBlock := lastBlock
Expand Down Expand Up @@ -335,12 +376,23 @@ func reconcileBlockRange(
continue
}

historicalCh <- ethLog
handler(ethLog)
finishedBlock = ethLog.BlockNumber
finishedIndex = uint32(ethLog.Index)
}

// Even if no matching logs were found in this batch, we have
// fully scanned up to endBlock — advance the cursor so the
// next call does not re-query the same range.
if finishedBlock < endBlock {
finishedBlock = endBlock
finishedIndex = math.MaxUint32
}

startBlock = endBlock + 1
endBlock += blockStep
}
return
}

func extractAdvisedBlockRange(msg string) (startBlock, endBlock uint64, err error) {
Expand Down Expand Up @@ -372,13 +424,13 @@ func extractAdvisedBlockRange(msg string) (startBlock, endBlock uint64, err erro
func waitForBackOffTimeout(ctx context.Context, backOffCount int, originator string) bool {
if backOffCount > maxBackOffCount {
listenerLogger.Errorw("back off limit reached, exiting", "originator", originator, "backOffCount", backOffCount)
return true
return false
}

if backOffCount > 0 {
listenerLogger.Infow("backing off", "originator", originator, "backOffCount", backOffCount)
select {
case <-time.After(time.Duration(2^backOffCount-1) * time.Second):
case <-time.After(time.Duration(1<<backOffCount) * time.Second):
case <-ctx.Done():
return false
}
Expand Down
102 changes: 88 additions & 14 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,22 +164,96 @@ func (svc *Service) RunWorkerWithContext(ctx context.Context) error {
})

g.Go(func() error {
fromBlock, fromLogIdx, err := svc.store.GetCursor("withdraw_started")
if err != nil {
svc.Logger.Warn("Failed to read withdraw_started cursor, starting from head", "error", err)
// If cursor is missing, we default to 0. But if StartBlock is configured, we should use that.
}
if fromBlock == 0 && svc.Config.Blockchain.StartBlock > 0 {
fromBlock = svc.Config.Blockchain.StartBlock
}
defer func() {
svc.Logger.Info("WithdrawStarted event watcher stopped")
}()

firstRun := true

for {
if ctx.Err() != nil {
return nil
}

if !firstRun {
// Reconnection only works when we have a dialable RPC URL.
// When the service was created via NewWithBackend (e.g. integration
// tests with a simulated backend), RPCURL is empty and there is
// nothing to re-dial — just exit.
if svc.Config.Blockchain.RPCURL == "" {
svc.Logger.Info("No RPC URL configured, cannot reconnect")
return nil
}

svc.Logger.Info("Reconnecting to Ethereum RPC...")
newClient, err := ethclient.DialContext(ctx, svc.Config.Blockchain.RPCURL)
if err != nil {
svc.Logger.Error("Failed to reconnect to Ethereum RPC", "error", err)
select {
case <-time.After(5 * time.Second):
continue
case <-ctx.Done():
return nil
}
}

// Close old client and swap in the new one.
if svc.ethClient != nil {
svc.ethClient.Close()
}
svc.ethClient = newClient

// Re-bind contract with the new client.
addr := common.HexToAddress(svc.Config.Blockchain.ContractAddr)
withdrawContract, err := custody.NewIWithdraw(addr, newClient)
if err != nil {
svc.Logger.Error("Failed to re-bind contract", "error", err)
newClient.Close()
select {
case <-time.After(5 * time.Second):
continue
case <-ctx.Done():
return nil
}
}
svc.contract = withdrawContract
svc.listener = custody.NewListener(newClient, addr, withdrawContract, nil)
}
firstRun = false

// Determine where to resume from.
fromBlock, fromLogIdx, err := svc.store.GetCursor("withdraw_started")
if err != nil {
svc.Logger.Error("Failed to read withdraw_started cursor, retrying", "error", err)
select {
case <-time.After(5 * time.Second):
continue
case <-ctx.Done():
return nil
}
}
if fromBlock == 0 && svc.Config.Blockchain.StartBlock > 0 {
fromBlock = svc.Config.Blockchain.StartBlock
}

svc.Logger.Info("Starting WithdrawStarted event watcher", "from_block", fromBlock, "from_log_index", fromLogIdx)

withdrawals := make(chan *custody.WithdrawStartedEvent)
go svc.listener.WatchWithdrawStarted(ctx, withdrawals, fromBlock, fromLogIdx)

svc.Logger.Info("Starting WithdrawStarted event watcher", "from_block", fromBlock, "from_log_index", fromLogIdx)
withdrawals := make(chan *custody.WithdrawStartedEvent)
go svc.listener.WatchWithdrawStarted(ctx, withdrawals, fromBlock, fromLogIdx)
for event := range withdrawals {
svc.processWithdrawal(ctx, event)
for event := range withdrawals {
svc.processWithdrawal(ctx, event)
}

// Watcher returned — back off before reconnecting to avoid
// tight reconnect storms if the listener exits immediately.
svc.Logger.Warn("Event watcher stopped, will retry after backoff")
select {
case <-time.After(5 * time.Second):
case <-ctx.Done():
return nil
}
}
return nil
})

g.Go(func() error {
Expand Down