diff --git a/internal/claimer/blockchain.go b/internal/claimer/blockchain.go index e148460dd..8a6586ca4 100644 --- a/internal/claimer/blockchain.go +++ b/internal/claimer/blockchain.go @@ -77,24 +77,24 @@ type claimerBlockchain struct { defaultBlock config.DefaultBlock } -func (self *claimerBlockchain) submitClaimToBlockchain( +func (cb *claimerBlockchain) submitClaimToBlockchain( ic *iconsensus.IConsensus, application *model.Application, epoch *model.Epoch, ) (common.Hash, error) { txHash := common.Hash{} lastBlockNumber := new(big.Int).SetUint64(epoch.LastBlock) - tx, err := ic.SubmitClaim(self.txOpts, application.IApplicationAddress, + tx, err := ic.SubmitClaim(cb.txOpts, application.IApplicationAddress, lastBlockNumber, *epoch.OutputsMerkleRoot) if err != nil { - self.logger.Error("submitClaimToBlockchain:failed", + cb.logger.Error("submitClaimToBlockchain:failed", "appContractAddress", application.IApplicationAddress, "claimHash", *epoch.OutputsMerkleRoot, "last_block", epoch.LastBlock, "error", err) } else { txHash = tx.Hash() - self.logger.Debug("submitClaimToBlockchain:success", + cb.logger.Debug("submitClaimToBlockchain:success", "appContractAddress", application.IApplicationAddress, "claimHash", *epoch.OutputsMerkleRoot, "last_block", epoch.LastBlock, @@ -103,25 +103,21 @@ func (self *claimerBlockchain) submitClaimToBlockchain( return txHash, err } -func unwrapClaimSubmitted( - ic *iconsensus.IConsensus, +func takeEventAndParse[T any]( pull func() (log *types.Log, err error, ok bool), -) ( - *iconsensus.IConsensusClaimSubmitted, - bool, - error, -) { + parse func(log types.Log) (*T, error), +) (*T, bool, error) { log, err, ok := pull() if !ok || err != nil { return nil, false, err } - ev, err := ic.ParseClaimSubmitted(*log) - return ev, true, err + ev, err := parse(*log) + return ev, err == nil, err } // scan the event stream for a claimSubmitted event that matches claim. // return this event and its successor -func (self *claimerBlockchain) findClaimSubmittedEventAndSucc( +func (cb *claimerBlockchain) findClaimSubmittedEventAndSucc( ctx context.Context, application *model.Application, epoch *model.Epoch, @@ -132,7 +128,7 @@ func (self *claimerBlockchain) findClaimSubmittedEventAndSucc( *iconsensus.IConsensusClaimSubmitted, error, ) { - ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, self.client) + ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, cb.client) if err != nil { return nil, nil, nil, err } @@ -142,6 +138,10 @@ func (self *claimerBlockchain) findClaimSubmittedEventAndSucc( // - submitter == nil (any) // - appContract == claim.IApplicationAddress c, err := iconsensus.IConsensusMetaData.GetAbi() + if err != nil { + return nil, nil, nil, err + } + topics, err := abi.MakeTopics( []any{c.Events[model.MonitoredEvent_ClaimSubmitted.String()].ID}, nil, @@ -151,7 +151,7 @@ func (self *claimerBlockchain) findClaimSubmittedEventAndSucc( return nil, nil, nil, err } - it, err := self.filter.ChunkedFilterLogs(ctx, self.client, ethereum.FilterQuery{ + it, err := cb.filter.ChunkedFilterLogs(ctx, cb.client, ethereum.FilterQuery{ FromBlock: new(big.Int).SetUint64(epoch.LastBlock), ToBlock: endBlock, Addresses: []common.Address{application.IConsensusAddress}, @@ -165,7 +165,7 @@ func (self *claimerBlockchain) findClaimSubmittedEventAndSucc( next, stop := iter.Pull2(it) defer stop() for { - event, ok, err := unwrapClaimSubmitted(ic, next) + event, ok, err := takeEventAndParse(next, ic.ParseClaimSubmitted) if !ok || err != nil { return ic, event, nil, err } @@ -173,7 +173,7 @@ func (self *claimerBlockchain) findClaimSubmittedEventAndSucc( if claimSubmittedEventMatches(application, epoch, event) { // found the event, does it has a successor? try to fetch it - succ, ok, err := unwrapClaimSubmitted(ic, next) + succ, ok, err := takeEventAndParse(next, ic.ParseClaimSubmitted) if !ok || err != nil { return ic, event, nil, err } @@ -185,25 +185,9 @@ func (self *claimerBlockchain) findClaimSubmittedEventAndSucc( } } -func unwrapClaimAccepted( - ic *iconsensus.IConsensus, - pull func() (log *types.Log, err error, ok bool), -) ( - *iconsensus.IConsensusClaimAccepted, - bool, - error, -) { - log, err, ok := pull() - if !ok || err != nil { - return nil, false, err - } - ev, err := ic.ParseClaimAccepted(*log) - return ev, true, err -} - // scan the event stream for a claimAccepted event that matches claim. // return this event and its successor -func (self *claimerBlockchain) findClaimAcceptedEventAndSucc( +func (cb *claimerBlockchain) findClaimAcceptedEventAndSucc( ctx context.Context, application *model.Application, epoch *model.Epoch, @@ -214,7 +198,7 @@ func (self *claimerBlockchain) findClaimAcceptedEventAndSucc( *iconsensus.IConsensusClaimAccepted, error, ) { - ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, self.client) + ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, cb.client) if err != nil { return nil, nil, nil, err } @@ -231,7 +215,7 @@ func (self *claimerBlockchain) findClaimAcceptedEventAndSucc( return nil, nil, nil, err } - it, err := self.filter.ChunkedFilterLogs(ctx, self.client, ethereum.FilterQuery{ + it, err := cb.filter.ChunkedFilterLogs(ctx, cb.client, ethereum.FilterQuery{ FromBlock: new(big.Int).SetUint64(epoch.LastBlock), ToBlock: endBlock, Addresses: []common.Address{application.IConsensusAddress}, @@ -245,7 +229,7 @@ func (self *claimerBlockchain) findClaimAcceptedEventAndSucc( next, stop := iter.Pull2(it) defer stop() for { - event, ok, err := unwrapClaimAccepted(ic, next) + event, ok, err := takeEventAndParse(next, ic.ParseClaimAccepted) if !ok || err != nil { return ic, event, nil, err } @@ -253,7 +237,7 @@ func (self *claimerBlockchain) findClaimAcceptedEventAndSucc( if claimAcceptedEventMatches(application, epoch, event) { // found the event, does it has a successor? try to fetch it - succ, ok, err := unwrapClaimAccepted(ic, next) + succ, ok, err := takeEventAndParse(next, ic.ParseClaimAccepted) if !ok || err != nil { return ic, event, nil, err } @@ -265,40 +249,44 @@ func (self *claimerBlockchain) findClaimAcceptedEventAndSucc( } } -func (self *claimerBlockchain) getConsensusAddress( +func (cb *claimerBlockchain) getConsensusAddress( ctx context.Context, app *model.Application, ) (common.Address, error) { - return ethutil.GetConsensus(ctx, self.client, app.IApplicationAddress) + return ethutil.GetConsensus(ctx, cb.client, app.IApplicationAddress) } -/* poll a transaction hash for its submission status and receipt */ -func (self *claimerBlockchain) pollTransaction( +// poll a transaction hash for its submission status and receipt. Wait until +// the receipt is older than commitment to avoid speculative execution, we don't +// want to be subject to reorgs. +// receipt returned value is only valid when (ready == true && error == nil) +func (cb *claimerBlockchain) pollTransaction( ctx context.Context, txHash common.Hash, - endBlock *big.Int, + commitmentBlockNumber *big.Int, ) (bool, *types.Receipt, error) { - _, isPending, err := self.client.TransactionByHash(ctx, txHash) + _, isPending, err := cb.client.TransactionByHash(ctx, txHash) if err != nil || isPending { return false, nil, err } - receipt, err := self.client.TransactionReceipt(ctx, txHash) + receipt, err := cb.client.TransactionReceipt(ctx, txHash) if err != nil { return false, nil, err } - if receipt.BlockNumber.Cmp(endBlock) >= 0 { - return false, receipt, err + // wait until receipt is older than commitment. + if receipt.BlockNumber.Cmp(commitmentBlockNumber) >= 0 { + return false, nil, err } return receipt.Status == 1, receipt, err } -/* Retrieve the block number of "DefaultBlock" */ -func (self *claimerBlockchain) getBlockNumber(ctx context.Context) (*big.Int, error) { +// Retrieve the block number of a commitment level +func (cb *claimerBlockchain) getBlockNumber(ctx context.Context) (*big.Int, error) { var nr int64 - switch self.defaultBlock { + switch cb.defaultBlock { case model.DefaultBlock_Pending: nr = rpc.PendingBlockNumber.Int64() case model.DefaultBlock_Latest: @@ -308,10 +296,10 @@ func (self *claimerBlockchain) getBlockNumber(ctx context.Context) (*big.Int, er case model.DefaultBlock_Safe: nr = rpc.SafeBlockNumber.Int64() default: - return nil, fmt.Errorf("default block '%v' not supported", self.defaultBlock) + return nil, fmt.Errorf("default block '%v' not supported", cb.defaultBlock) } - hdr, err := self.client.HeaderByNumber(ctx, big.NewInt(nr)) + hdr, err := cb.client.HeaderByNumber(ctx, big.NewInt(nr)) if err != nil { return nil, err } diff --git a/internal/claimer/claimer.go b/internal/claimer/claimer.go index f852d448f..c784b3a87 100644 --- a/internal/claimer/claimer.go +++ b/internal/claimer/claimer.go @@ -104,7 +104,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( apps map[int64]*model.Application, endBlock *big.Int, ) []error { - errs := []error{} + var errs []error var err error // check claims in flight @@ -130,7 +130,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( ) if err != nil { errs = append(errs, err) - return errs + continue } s.Logger.Info("Claim submitted", "app", apps[key].IApplicationAddress, @@ -147,13 +147,14 @@ func (s *Service) submitClaimsAndUpdateDatabase( } // check computed epochs +nextApp: for key, currEpoch := range computedEpochs { var ic *iconsensus.IConsensus var prevClaimSubmissionEvent *iconsensus.IConsensusClaimSubmitted var currClaimSubmissionEvent *iconsensus.IConsensusClaimSubmitted if _, isClaimInFlight := s.claimsInFlight[key]; isClaimInFlight { - continue + continue nextApp } app := apps[key] // guaranteed to exist because of the query and database constraints @@ -163,8 +164,10 @@ func (s *Service) submitClaimsAndUpdateDatabase( if err := s.checkConsensusForAddressChange(app); err != nil { delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } + + // are we dealing with cases {3, 4} or {1, 2}. Do the checks if previousEpochExists { err := checkEpochSequenceConstraint(prevEpoch, currEpoch) if err != nil { @@ -181,17 +184,17 @@ func (s *Service) submitClaimsAndUpdateDatabase( ) delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } - // the previous epoch must have a matching claim submission event. - // current epoch may or may not be present + // previous epoch must have a matching claim submission event. + // current epoch may or may not have one at this point. ic, prevClaimSubmissionEvent, currClaimSubmissionEvent, err = s.blockchain.findClaimSubmittedEventAndSucc(s.Context, app, prevEpoch, endBlock) if err != nil { delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } if prevClaimSubmissionEvent == nil { err = s.setApplicationInoperable( @@ -205,7 +208,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( ) delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } if !claimSubmittedEventMatches(app, prevEpoch, prevClaimSubmissionEvent) { s.Logger.Error("event mismatch", @@ -225,7 +228,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( ) delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } } else { // first claim @@ -234,10 +237,12 @@ func (s *Service) submitClaimsAndUpdateDatabase( if err != nil { delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } } + // claim submission must match the current epoch if we found it. + // otherwise we submit it if the submission flag is enabled. if currClaimSubmissionEvent != nil { s.Logger.Debug("Found ClaimSubmitted Event", "app", currClaimSubmissionEvent.AppContract, @@ -254,7 +259,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( ) delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } s.Logger.Debug("Updating claim status to submitted", "app", app.IApplicationAddress, @@ -271,7 +276,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( if err != nil { delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } delete(s.claimsInFlight, key) s.Logger.Info("Claim previously submitted", @@ -287,7 +292,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( "claim_hash", fmt.Sprintf("%x", prevEpoch.OutputsMerkleRoot), "last_block", prevEpoch.LastBlock, ) - goto nextApp + continue nextApp } s.Logger.Debug("Submitting claim to blockchain", "app", app.IApplicationAddress, @@ -298,7 +303,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( if err != nil { delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } s.claimsInFlight[key] = txHash } else { @@ -309,7 +314,6 @@ func (s *Service) submitClaimsAndUpdateDatabase( ) } - nextApp: } return errs } @@ -325,6 +329,7 @@ func (s *Service) acceptClaimsAndUpdateDatabase( var err error // check submitted claims +nextApp: for key, submittedEpoch := range submittedEpochs { var prevEvent *iconsensus.IConsensusClaimAccepted var currEvent *iconsensus.IConsensusClaimAccepted @@ -335,7 +340,7 @@ func (s *Service) acceptClaimsAndUpdateDatabase( if err := s.checkConsensusForAddressChange(app); err != nil { delete(submittedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } if prevExists { err := checkEpochSequenceConstraint(acceptedEpoch, submittedEpoch) @@ -348,7 +353,7 @@ func (s *Service) acceptClaimsAndUpdateDatabase( ) delete(submittedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } // if prevClaimRow exists, there must be a matching event @@ -357,7 +362,7 @@ func (s *Service) acceptClaimsAndUpdateDatabase( if err != nil { delete(submittedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } if prevEvent == nil { s.Logger.Error("Missing event", @@ -367,7 +372,7 @@ func (s *Service) acceptClaimsAndUpdateDatabase( ) delete(submittedEpochs, key) errs = append(errs, ErrMissingEvent) - goto nextApp + continue nextApp } if !claimAcceptedEventMatches(app, acceptedEpoch, prevEvent) { s.Logger.Error("Event mismatch", @@ -378,7 +383,7 @@ func (s *Service) acceptClaimsAndUpdateDatabase( ) delete(submittedEpochs, key) errs = append(errs, ErrEventMismatch) - goto nextApp + continue nextApp } } else { // first claim @@ -387,7 +392,7 @@ func (s *Service) acceptClaimsAndUpdateDatabase( if err != nil { delete(submittedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } } @@ -405,7 +410,7 @@ func (s *Service) acceptClaimsAndUpdateDatabase( ) delete(submittedEpochs, key) errs = append(errs, ErrEventMismatch) - goto nextApp + continue nextApp } s.Logger.Debug("Updating claim status to accepted", "app", app.IApplicationAddress, @@ -417,7 +422,7 @@ func (s *Service) acceptClaimsAndUpdateDatabase( if err != nil { delete(submittedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } s.Logger.Info("Claim accepted", "app", currEvent.AppContract, @@ -427,7 +432,6 @@ func (s *Service) acceptClaimsAndUpdateDatabase( "tx", txHash, ) } - nextApp: } return errs } @@ -481,7 +485,7 @@ func checkEpochConstraint(c *model.Epoch) error { if c.FirstBlock > c.LastBlock { return fmt.Errorf("unexpected epoch state. first_block: %v > last_block: %v", c.FirstBlock, c.LastBlock) } - if c.Status == model.EpochStatus_ClaimSubmitted { + if c.Status == model.EpochStatus_ClaimSubmitted || c.Status == model.EpochStatus_ClaimComputed || c.Status == model.EpochStatus_ClaimAccepted { if c.OutputsMerkleRoot == nil { return fmt.Errorf("unexpected epoch state. missing claim_hash.") } @@ -520,12 +524,14 @@ func checkEpochSequenceConstraint(prevEpoch *model.Epoch, currEpoch *model.Epoch func claimSubmittedEventMatches(application *model.Application, epoch *model.Epoch, event *iconsensus.IConsensusClaimSubmitted) bool { return application.IApplicationAddress == event.AppContract && + epoch.OutputsMerkleRoot != nil && *epoch.OutputsMerkleRoot == event.OutputsMerkleRoot && epoch.LastBlock == event.LastProcessedBlockNumber.Uint64() } func claimAcceptedEventMatches(application *model.Application, epoch *model.Epoch, event *iconsensus.IConsensusClaimAccepted) bool { return application.IApplicationAddress == event.AppContract && + epoch.OutputsMerkleRoot != nil && *epoch.OutputsMerkleRoot == event.OutputsMerkleRoot && epoch.LastBlock == event.LastProcessedBlockNumber.Uint64() } diff --git a/internal/claimer/service.go b/internal/claimer/service.go index 0efa3c6f4..411ac2129 100644 --- a/internal/claimer/service.go +++ b/internal/claimer/service.go @@ -34,9 +34,14 @@ type CreateInfo struct { type Service struct { service.Service - repository iclaimerRepository - blockchain iclaimerBlockchain - claimsInFlight map[int64]common.Hash // application.ID -> txHash + repository iclaimerRepository + blockchain iclaimerBlockchain + + // submitted claims waiting for confirmation from the blockchain. + // only accessed from tick, so no need for a lock + // contains: application ID -> transaction hash, with a maximum of one + // key per application due to the epoch advancement logic. + claimsInFlight map[int64]common.Hash submissionEnabled bool } @@ -197,6 +202,6 @@ func setupPersistentConfig( return &config.Value, nil } - logger.Error("Could not retrieve persistent config from Database. %w", "error", err) + logger.Error("Could not retrieve persistent config from Database", "error", err) return nil, err } diff --git a/internal/repository/repotest/builders.go b/internal/repository/repotest/builders.go index e4fac2969..ad31ba0d6 100644 --- a/internal/repository/repotest/builders.go +++ b/internal/repository/repotest/builders.go @@ -75,6 +75,11 @@ func (b *ApplicationBuilder) WithConsensus(c Consensus) *ApplicationBuilder { return b } +func (b *ApplicationBuilder) WithConsensusAddress(addr common.Address) *ApplicationBuilder { + b.app.IConsensusAddress = addr + return b +} + func (b *ApplicationBuilder) WithState(s ApplicationState) *ApplicationBuilder { b.app.State = s return b