Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 41 additions & 53 deletions internal/claimer/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,24 +77,24 @@ type claimerBlockchain struct {
defaultBlock config.DefaultBlock
}

func (self *claimerBlockchain) submitClaimToBlockchain(
func (cb *claimerBlockchain) submitClaimToBlockchain(
ic *iconsensus.IConsensus,
application *model.Application,
epoch *model.Epoch,
) (common.Hash, error) {
txHash := common.Hash{}
lastBlockNumber := new(big.Int).SetUint64(epoch.LastBlock)
tx, err := ic.SubmitClaim(self.txOpts, application.IApplicationAddress,
tx, err := ic.SubmitClaim(cb.txOpts, application.IApplicationAddress,
lastBlockNumber, *epoch.OutputsMerkleRoot)
if err != nil {
self.logger.Error("submitClaimToBlockchain:failed",
cb.logger.Error("submitClaimToBlockchain:failed",
"appContractAddress", application.IApplicationAddress,
"claimHash", *epoch.OutputsMerkleRoot,
"last_block", epoch.LastBlock,
"error", err)
} else {
txHash = tx.Hash()
self.logger.Debug("submitClaimToBlockchain:success",
cb.logger.Debug("submitClaimToBlockchain:success",
"appContractAddress", application.IApplicationAddress,
"claimHash", *epoch.OutputsMerkleRoot,
"last_block", epoch.LastBlock,
Expand All @@ -103,25 +103,21 @@ func (self *claimerBlockchain) submitClaimToBlockchain(
return txHash, err
}

func unwrapClaimSubmitted(
ic *iconsensus.IConsensus,
func takeEventAndParse[T any](
pull func() (log *types.Log, err error, ok bool),
) (
*iconsensus.IConsensusClaimSubmitted,
bool,
error,
) {
parse func(log types.Log) (*T, error),
) (*T, bool, error) {
log, err, ok := pull()
if !ok || err != nil {
return nil, false, err
}
ev, err := ic.ParseClaimSubmitted(*log)
return ev, true, err
ev, err := parse(*log)
return ev, err == nil, err
}

// scan the event stream for a claimSubmitted event that matches claim.
// return this event and its successor
func (self *claimerBlockchain) findClaimSubmittedEventAndSucc(
func (cb *claimerBlockchain) findClaimSubmittedEventAndSucc(
ctx context.Context,
application *model.Application,
epoch *model.Epoch,
Expand All @@ -132,7 +128,7 @@ func (self *claimerBlockchain) findClaimSubmittedEventAndSucc(
*iconsensus.IConsensusClaimSubmitted,
error,
) {
ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, self.client)
ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, cb.client)
if err != nil {
return nil, nil, nil, err
}
Expand All @@ -142,6 +138,10 @@ func (self *claimerBlockchain) findClaimSubmittedEventAndSucc(
// - submitter == nil (any)
// - appContract == claim.IApplicationAddress
c, err := iconsensus.IConsensusMetaData.GetAbi()
if err != nil {
return nil, nil, nil, err
}

topics, err := abi.MakeTopics(
[]any{c.Events[model.MonitoredEvent_ClaimSubmitted.String()].ID},
nil,
Expand All @@ -151,7 +151,7 @@ func (self *claimerBlockchain) findClaimSubmittedEventAndSucc(
return nil, nil, nil, err
}

it, err := self.filter.ChunkedFilterLogs(ctx, self.client, ethereum.FilterQuery{
it, err := cb.filter.ChunkedFilterLogs(ctx, cb.client, ethereum.FilterQuery{
FromBlock: new(big.Int).SetUint64(epoch.LastBlock),
ToBlock: endBlock,
Addresses: []common.Address{application.IConsensusAddress},
Expand All @@ -165,15 +165,15 @@ func (self *claimerBlockchain) findClaimSubmittedEventAndSucc(
next, stop := iter.Pull2(it)
defer stop()
for {
event, ok, err := unwrapClaimSubmitted(ic, next)
event, ok, err := takeEventAndParse(next, ic.ParseClaimSubmitted)
if !ok || err != nil {
return ic, event, nil, err
}
lastBlock := event.LastProcessedBlockNumber.Uint64()

if claimSubmittedEventMatches(application, epoch, event) {
// found the event, does it has a successor? try to fetch it
succ, ok, err := unwrapClaimSubmitted(ic, next)
succ, ok, err := takeEventAndParse(next, ic.ParseClaimSubmitted)
if !ok || err != nil {
return ic, event, nil, err
}
Expand All @@ -185,25 +185,9 @@ func (self *claimerBlockchain) findClaimSubmittedEventAndSucc(
}
}

func unwrapClaimAccepted(
ic *iconsensus.IConsensus,
pull func() (log *types.Log, err error, ok bool),
) (
*iconsensus.IConsensusClaimAccepted,
bool,
error,
) {
log, err, ok := pull()
if !ok || err != nil {
return nil, false, err
}
ev, err := ic.ParseClaimAccepted(*log)
return ev, true, err
}

// scan the event stream for a claimAccepted event that matches claim.
// return this event and its successor
func (self *claimerBlockchain) findClaimAcceptedEventAndSucc(
func (cb *claimerBlockchain) findClaimAcceptedEventAndSucc(
ctx context.Context,
application *model.Application,
epoch *model.Epoch,
Expand All @@ -214,7 +198,7 @@ func (self *claimerBlockchain) findClaimAcceptedEventAndSucc(
*iconsensus.IConsensusClaimAccepted,
error,
) {
ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, self.client)
ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, cb.client)
if err != nil {
return nil, nil, nil, err
}
Expand All @@ -231,7 +215,7 @@ func (self *claimerBlockchain) findClaimAcceptedEventAndSucc(
return nil, nil, nil, err
}

it, err := self.filter.ChunkedFilterLogs(ctx, self.client, ethereum.FilterQuery{
it, err := cb.filter.ChunkedFilterLogs(ctx, cb.client, ethereum.FilterQuery{
FromBlock: new(big.Int).SetUint64(epoch.LastBlock),
ToBlock: endBlock,
Addresses: []common.Address{application.IConsensusAddress},
Expand All @@ -245,15 +229,15 @@ func (self *claimerBlockchain) findClaimAcceptedEventAndSucc(
next, stop := iter.Pull2(it)
defer stop()
for {
event, ok, err := unwrapClaimAccepted(ic, next)
event, ok, err := takeEventAndParse(next, ic.ParseClaimAccepted)
if !ok || err != nil {
return ic, event, nil, err
}
lastBlock := event.LastProcessedBlockNumber.Uint64()

if claimAcceptedEventMatches(application, epoch, event) {
// found the event, does it has a successor? try to fetch it
succ, ok, err := unwrapClaimAccepted(ic, next)
succ, ok, err := takeEventAndParse(next, ic.ParseClaimAccepted)
if !ok || err != nil {
return ic, event, nil, err
}
Expand All @@ -265,40 +249,44 @@ func (self *claimerBlockchain) findClaimAcceptedEventAndSucc(
}
}

func (self *claimerBlockchain) getConsensusAddress(
func (cb *claimerBlockchain) getConsensusAddress(
ctx context.Context,
app *model.Application,
) (common.Address, error) {
return ethutil.GetConsensus(ctx, self.client, app.IApplicationAddress)
return ethutil.GetConsensus(ctx, cb.client, app.IApplicationAddress)
}

/* poll a transaction hash for its submission status and receipt */
func (self *claimerBlockchain) pollTransaction(
// poll a transaction hash for its submission status and receipt. Wait until
// the receipt is older than commitment to avoid speculative execution, we don't
// want to be subject to reorgs.
// receipt returned value is only valid when (ready == true && error == nil)
func (cb *claimerBlockchain) pollTransaction(
ctx context.Context,
txHash common.Hash,
endBlock *big.Int,
commitmentBlockNumber *big.Int,
) (bool, *types.Receipt, error) {
_, isPending, err := self.client.TransactionByHash(ctx, txHash)
_, isPending, err := cb.client.TransactionByHash(ctx, txHash)
if err != nil || isPending {
return false, nil, err
}

receipt, err := self.client.TransactionReceipt(ctx, txHash)
receipt, err := cb.client.TransactionReceipt(ctx, txHash)
if err != nil {
return false, nil, err
}

if receipt.BlockNumber.Cmp(endBlock) >= 0 {
return false, receipt, err
// wait until receipt is older than commitment.
if receipt.BlockNumber.Cmp(commitmentBlockNumber) >= 0 {
return false, nil, err
}

return receipt.Status == 1, receipt, err
}

/* Retrieve the block number of "DefaultBlock" */
func (self *claimerBlockchain) getBlockNumber(ctx context.Context) (*big.Int, error) {
// Retrieve the block number of a commitment level
func (cb *claimerBlockchain) getBlockNumber(ctx context.Context) (*big.Int, error) {
var nr int64
switch self.defaultBlock {
switch cb.defaultBlock {
case model.DefaultBlock_Pending:
nr = rpc.PendingBlockNumber.Int64()
case model.DefaultBlock_Latest:
Expand All @@ -308,10 +296,10 @@ func (self *claimerBlockchain) getBlockNumber(ctx context.Context) (*big.Int, er
case model.DefaultBlock_Safe:
nr = rpc.SafeBlockNumber.Int64()
default:
return nil, fmt.Errorf("default block '%v' not supported", self.defaultBlock)
return nil, fmt.Errorf("default block '%v' not supported", cb.defaultBlock)
}

hdr, err := self.client.HeaderByNumber(ctx, big.NewInt(nr))
hdr, err := cb.client.HeaderByNumber(ctx, big.NewInt(nr))
if err != nil {
return nil, err
}
Expand Down
Loading
Loading