-
Notifications
You must be signed in to change notification settings - Fork 75
Feature/remove chunked filter logs #734
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: next/2.0
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -6,7 +6,6 @@ package claimer | |||||
| import ( | ||||||
| "context" | ||||||
| "fmt" | ||||||
| "iter" | ||||||
| "log/slog" | ||||||
| "math/big" | ||||||
|
|
||||||
|
|
@@ -15,8 +14,6 @@ import ( | |||||
| "github.com/cartesi/rollups-node/pkg/contracts/iconsensus" | ||||||
| "github.com/cartesi/rollups-node/pkg/ethutil" | ||||||
|
|
||||||
| "github.com/ethereum/go-ethereum" | ||||||
| "github.com/ethereum/go-ethereum/accounts/abi" | ||||||
| "github.com/ethereum/go-ethereum/accounts/abi/bind" | ||||||
| "github.com/ethereum/go-ethereum/common" | ||||||
| "github.com/ethereum/go-ethereum/core/types" | ||||||
|
|
@@ -61,7 +58,7 @@ type iclaimerBlockchain interface { | |||||
| error, | ||||||
| ) | ||||||
|
|
||||||
| getBlockNumber(ctx context.Context) (*big.Int, error) | ||||||
| getCommitmentBlockNumber(ctx context.Context) (*big.Int, error) | ||||||
|
|
||||||
| getConsensusAddress( | ||||||
| ctx context.Context, | ||||||
|
|
@@ -77,51 +74,87 @@ type claimerBlockchain struct { | |||||
| defaultBlock config.DefaultBlock | ||||||
| } | ||||||
|
|
||||||
| func (self *claimerBlockchain) submitClaimToBlockchain( | ||||||
| func (cb *claimerBlockchain) submitClaimToBlockchain( | ||||||
| ic *iconsensus.IConsensus, | ||||||
| application *model.Application, | ||||||
| epoch *model.Epoch, | ||||||
| ) (common.Hash, error) { | ||||||
| txHash := common.Hash{} | ||||||
| maybeTxHash := common.Hash{} | ||||||
| if cb.txOpts == nil { | ||||||
| return maybeTxHash, fmt.Errorf("txOpts is required for claim submission") | ||||||
| } | ||||||
| lastBlockNumber := new(big.Int).SetUint64(epoch.LastBlock) | ||||||
| tx, err := ic.SubmitClaim(self.txOpts, application.IApplicationAddress, | ||||||
| tx, err := ic.SubmitClaim(cb.txOpts, application.IApplicationAddress, | ||||||
| lastBlockNumber, *epoch.OutputsMerkleRoot) | ||||||
| if err != nil { | ||||||
| self.logger.Error("submitClaimToBlockchain:failed", | ||||||
| cb.logger.Error("submitClaimToBlockchain:failed", | ||||||
| "appContractAddress", application.IApplicationAddress, | ||||||
| "claimHash", *epoch.OutputsMerkleRoot, | ||||||
| "last_block", epoch.LastBlock, | ||||||
| "error", err) | ||||||
| } else { | ||||||
| txHash = tx.Hash() | ||||||
| self.logger.Debug("submitClaimToBlockchain:success", | ||||||
| maybeTxHash = tx.Hash() | ||||||
| cb.logger.Debug("submitClaimToBlockchain:success", | ||||||
| "appContractAddress", application.IApplicationAddress, | ||||||
| "claimHash", *epoch.OutputsMerkleRoot, | ||||||
| "last_block", epoch.LastBlock, | ||||||
| "TxHash", txHash) | ||||||
| "TxHash", maybeTxHash) | ||||||
| } | ||||||
| return txHash, err | ||||||
| return maybeTxHash, err | ||||||
| } | ||||||
|
|
||||||
| func unwrapClaimSubmitted( | ||||||
| ic *iconsensus.IConsensus, | ||||||
| pull func() (log *types.Log, err error, ok bool), | ||||||
| type EventIterator interface { | ||||||
| Next() bool | ||||||
| Close() error | ||||||
| } | ||||||
|
|
||||||
| func newOracle( | ||||||
| nr func(*bind.CallOpts) (*big.Int, error), | ||||||
| ) ( | ||||||
| *iconsensus.IConsensusClaimSubmitted, | ||||||
| bool, | ||||||
| error, | ||||||
| func(ctx context.Context, block uint64) (*big.Int, error), | ||||||
| ) { | ||||||
| log, err, ok := pull() | ||||||
| if !ok || err != nil { | ||||||
| return nil, false, err | ||||||
| return func(ctx context.Context, block uint64) (*big.Int, error) { | ||||||
| callOpts := &bind.CallOpts{ | ||||||
| Context: ctx, | ||||||
| BlockNumber: new(big.Int).SetUint64(block), | ||||||
| } | ||||||
| numEvents, err := nr(callOpts) | ||||||
|
|
||||||
| if err != nil { | ||||||
| return nil, fmt.Errorf("failed to get event count in block %d: %w", block, err) | ||||||
| } | ||||||
| return numEvents, nil | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| func newOnHit[IT EventIterator]( | ||||||
| ctx context.Context, | ||||||
| address common.Address, | ||||||
| filter func (*bind.FilterOpts, []common.Address, []common.Address) (IT, error), | ||||||
| onEvent func(IT), | ||||||
| ) ( | ||||||
| func(block uint64) error, | ||||||
| ) { | ||||||
| return func(block uint64) error { | ||||||
| filterOpts := &bind.FilterOpts{ | ||||||
| Context: ctx, | ||||||
| Start: block, | ||||||
| End: &block, | ||||||
| } | ||||||
| it, err := filter(filterOpts, nil, []common.Address{address}) | ||||||
| if err != nil { | ||||||
| return fmt.Errorf("failed to retrieve events at block %d: %w", block, err) | ||||||
| } | ||||||
| for it.Next() { | ||||||
| onEvent(it) | ||||||
| } | ||||||
| return it.Close() | ||||||
| } | ||||||
|
Comment on lines
+106
to
152
|
||||||
| ev, err := ic.ParseClaimSubmitted(*log) | ||||||
| return ev, true, err | ||||||
| } | ||||||
|
|
||||||
| // scan the event stream for a claimSubmitted event that matches claim. | ||||||
| // return this event and its successor | ||||||
| func (self *claimerBlockchain) findClaimSubmittedEventAndSucc( | ||||||
| func (cb *claimerBlockchain) findClaimSubmittedEventAndSucc( | ||||||
| ctx context.Context, | ||||||
| application *model.Application, | ||||||
| epoch *model.Epoch, | ||||||
|
|
@@ -132,78 +165,44 @@ func (self *claimerBlockchain) findClaimSubmittedEventAndSucc( | |||||
| *iconsensus.IConsensusClaimSubmitted, | ||||||
| error, | ||||||
| ) { | ||||||
| ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, self.client) | ||||||
| ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, cb.client) | ||||||
| if err != nil { | ||||||
| return nil, nil, nil, err | ||||||
| } | ||||||
|
|
||||||
| // filter must match: | ||||||
| // - `ClaimSubmitted` events | ||||||
| // - submitter == nil (any) | ||||||
| // - appContract == claim.IApplicationAddress | ||||||
| c, err := iconsensus.IConsensusMetaData.GetAbi() | ||||||
| topics, err := abi.MakeTopics( | ||||||
| []any{c.Events[model.MonitoredEvent_ClaimSubmitted.String()].ID}, | ||||||
| nil, | ||||||
| []any{application.IApplicationAddress}, | ||||||
| oracle := newOracle(ic.GetNumberOfSubmittedClaims) | ||||||
| events := []*iconsensus.IConsensusClaimSubmitted{} | ||||||
| onHit := newOnHit(ctx, application.IApplicationAddress, ic.FilterClaimSubmitted, | ||||||
| func(it *iconsensus.IConsensusClaimSubmittedIterator) { | ||||||
| event := it.Event | ||||||
| if (len(events) == 0) && claimSubmittedEventMatches(application, epoch, event) { | ||||||
| events = append(events, event) | ||||||
| } else if len(events) != 0 { | ||||||
| events = append(events, event) | ||||||
| } | ||||||
| }, | ||||||
| ) | ||||||
| if err != nil { | ||||||
| return nil, nil, nil, err | ||||||
| } | ||||||
|
|
||||||
| it, err := self.filter.ChunkedFilterLogs(ctx, self.client, ethereum.FilterQuery{ | ||||||
| FromBlock: new(big.Int).SetUint64(epoch.LastBlock), | ||||||
| ToBlock: endBlock, | ||||||
| Addresses: []common.Address{application.IConsensusAddress}, | ||||||
| Topics: topics, | ||||||
| }) | ||||||
| numSubmittedClaims, err := oracle(ctx, epoch.LastBlock) | ||||||
| if err != nil { | ||||||
| return nil, nil, nil, err | ||||||
| } | ||||||
|
|
||||||
| // pull events instead of iterating | ||||||
| next, stop := iter.Pull2(it) | ||||||
| defer stop() | ||||||
| for { | ||||||
| event, ok, err := unwrapClaimSubmitted(ic, next) | ||||||
| if !ok || err != nil { | ||||||
| return ic, event, nil, err | ||||||
| } | ||||||
| lastBlock := event.LastProcessedBlockNumber.Uint64() | ||||||
|
|
||||||
| if claimSubmittedEventMatches(application, epoch, event) { | ||||||
| // found the event, does it has a successor? try to fetch it | ||||||
| succ, ok, err := unwrapClaimSubmitted(ic, next) | ||||||
| if !ok || err != nil { | ||||||
| return ic, event, nil, err | ||||||
| } | ||||||
| return ic, event, succ, err | ||||||
| } else if lastBlock > epoch.LastBlock { | ||||||
| err = fmt.Errorf("No matching claim, searched up to %v", event) | ||||||
| return nil, nil, nil, err | ||||||
| } | ||||||
| _, err = ethutil.FindTransitions(ctx, epoch.LastBlock, endBlock.Uint64(), numSubmittedClaims, oracle, onHit) | ||||||
| if err != nil { | ||||||
| return nil, nil, nil, fmt.Errorf("failed to walk ClaimSubmitted transitions: %w", err) | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| func unwrapClaimAccepted( | ||||||
| ic *iconsensus.IConsensus, | ||||||
| pull func() (log *types.Log, err error, ok bool), | ||||||
| ) ( | ||||||
| *iconsensus.IConsensusClaimAccepted, | ||||||
| bool, | ||||||
| error, | ||||||
| ) { | ||||||
| log, err, ok := pull() | ||||||
| if !ok || err != nil { | ||||||
| return nil, false, err | ||||||
| if len(events) == 0 { | ||||||
| return ic, nil, nil, nil | ||||||
| } else if len(events) == 1 { | ||||||
| return ic, events[0], nil, nil | ||||||
| } else { | ||||||
| return ic, events[0], events[1], nil | ||||||
| } | ||||||
mpolitzer marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
| ev, err := ic.ParseClaimAccepted(*log) | ||||||
| return ev, true, err | ||||||
| } | ||||||
|
|
||||||
| // scan the event stream for a claimAccepted event that matches claim. | ||||||
| // return this event and its successor | ||||||
| func (self *claimerBlockchain) findClaimAcceptedEventAndSucc( | ||||||
| func (cb *claimerBlockchain) findClaimAcceptedEventAndSucc( | ||||||
| ctx context.Context, | ||||||
| application *model.Application, | ||||||
| epoch *model.Epoch, | ||||||
|
|
@@ -214,91 +213,79 @@ func (self *claimerBlockchain) findClaimAcceptedEventAndSucc( | |||||
| *iconsensus.IConsensusClaimAccepted, | ||||||
| error, | ||||||
| ) { | ||||||
| ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, self.client) | ||||||
| ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, cb.client) | ||||||
| if err != nil { | ||||||
| return nil, nil, nil, err | ||||||
| } | ||||||
|
|
||||||
| // filter must match: | ||||||
| // - `ClaimAccepted` events | ||||||
| // - appContract == claim.IApplicationAddress | ||||||
| c, err := iconsensus.IConsensusMetaData.GetAbi() | ||||||
| topics, err := abi.MakeTopics( | ||||||
| []any{c.Events[model.MonitoredEvent_ClaimAccepted.String()].ID}, | ||||||
| []any{application.IApplicationAddress}, | ||||||
| oracle := newOracle(ic.GetNumberOfAcceptedClaims) | ||||||
| events := []*iconsensus.IConsensusClaimAccepted{} | ||||||
| filter := func( | ||||||
| opts *bind.FilterOpts, | ||||||
| _ []common.Address, | ||||||
| appContract []common.Address, | ||||||
| ) (*iconsensus.IConsensusClaimAcceptedIterator, error) { | ||||||
| return ic.FilterClaimAccepted(opts, appContract) | ||||||
| } | ||||||
| onHit := newOnHit(ctx, application.IApplicationAddress, filter, | ||||||
| func(it *iconsensus.IConsensusClaimAcceptedIterator) { | ||||||
| event := it.Event | ||||||
| if (len(events) == 0) && claimAcceptedEventMatches(application, epoch, event) { | ||||||
| events = append(events, event) | ||||||
| } else if len(events) != 0 { | ||||||
| events = append(events, event) | ||||||
| } | ||||||
| }, | ||||||
| ) | ||||||
|
|
||||||
| numAcceptedClaims, err := oracle(ctx, epoch.LastBlock) | ||||||
| if err != nil { | ||||||
| return nil, nil, nil, err | ||||||
| } | ||||||
|
|
||||||
| it, err := self.filter.ChunkedFilterLogs(ctx, self.client, ethereum.FilterQuery{ | ||||||
| FromBlock: new(big.Int).SetUint64(epoch.LastBlock), | ||||||
| ToBlock: endBlock, | ||||||
| Addresses: []common.Address{application.IConsensusAddress}, | ||||||
| Topics: topics, | ||||||
| }) | ||||||
| _, err = ethutil.FindTransitions(ctx, epoch.LastBlock, endBlock.Uint64(), numAcceptedClaims, oracle, onHit) | ||||||
| if err != nil { | ||||||
| return nil, nil, nil, err | ||||||
| return nil, nil, nil, fmt.Errorf("failed to walk ClaimAccepted transitions: %w", err) | ||||||
| } | ||||||
|
|
||||||
| // pull events instead of iterating | ||||||
| next, stop := iter.Pull2(it) | ||||||
| defer stop() | ||||||
| for { | ||||||
| event, ok, err := unwrapClaimAccepted(ic, next) | ||||||
| if !ok || err != nil { | ||||||
| return ic, event, nil, err | ||||||
| } | ||||||
| lastBlock := event.LastProcessedBlockNumber.Uint64() | ||||||
|
|
||||||
| if claimAcceptedEventMatches(application, epoch, event) { | ||||||
| // found the event, does it has a successor? try to fetch it | ||||||
| succ, ok, err := unwrapClaimAccepted(ic, next) | ||||||
| if !ok || err != nil { | ||||||
| return ic, event, nil, err | ||||||
| } | ||||||
| return ic, event, succ, err | ||||||
| } else if lastBlock > epoch.LastBlock { | ||||||
| err = fmt.Errorf("No matching claim, searched up to %v", event) | ||||||
| return nil, nil, nil, err | ||||||
| } | ||||||
| if len(events) == 0 { | ||||||
| return ic, nil, nil, nil | ||||||
| } else if len(events) == 1 { | ||||||
| return ic, events[0], nil, nil | ||||||
| } else { | ||||||
| return ic, events[0], events[1], nil | ||||||
| } | ||||||
mpolitzer marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
| } | ||||||
|
|
||||||
| func (self *claimerBlockchain) getConsensusAddress( | ||||||
| func (cb *claimerBlockchain) getConsensusAddress( | ||||||
| ctx context.Context, | ||||||
| app *model.Application, | ||||||
| ) (common.Address, error) { | ||||||
| return ethutil.GetConsensus(ctx, self.client, app.IApplicationAddress) | ||||||
| return ethutil.GetConsensus(ctx, cb.client, app.IApplicationAddress) | ||||||
| } | ||||||
|
|
||||||
| /* poll a transaction hash for its submission status and receipt */ | ||||||
| func (self *claimerBlockchain) pollTransaction( | ||||||
| // poll a transaction for its receipt | ||||||
| func (cb *claimerBlockchain) pollTransaction( | ||||||
| ctx context.Context, | ||||||
| txHash common.Hash, | ||||||
| endBlock *big.Int, | ||||||
| commitmentBlockNumber *big.Int, | ||||||
| ) (bool, *types.Receipt, error) { | ||||||
| _, isPending, err := self.client.TransactionByHash(ctx, txHash) | ||||||
| if err != nil || isPending { | ||||||
| return false, nil, err | ||||||
| } | ||||||
|
|
||||||
| receipt, err := self.client.TransactionReceipt(ctx, txHash) | ||||||
| receipt, err := cb.client.TransactionReceipt(ctx, txHash) | ||||||
| if err != nil { | ||||||
| return false, nil, err | ||||||
| } | ||||||
|
Comment on lines
+272
to
275
|
||||||
|
|
||||||
| if receipt.BlockNumber.Cmp(endBlock) >= 0 { | ||||||
| return false, receipt, err | ||||||
| // receipt must be committed before use. Return false until it is. | ||||||
| if receipt.BlockNumber.Cmp(commitmentBlockNumber) >= 0 { | ||||||
|
||||||
| if receipt.BlockNumber.Cmp(commitmentBlockNumber) >= 0 { | |
| if receipt.BlockNumber.Cmp(commitmentBlockNumber) > 0 { |
Uh oh!
There was an error while loading. Please reload this page.