Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) {
// by a return statement before running deferred methods. Take care with
// removing or subscoping err as it will break this clause.
if err != nil {
pool.reserver.Release(from)
pool.releaseReservation(from)
}
}()
}
Expand Down Expand Up @@ -1120,6 +1120,19 @@ func (pool *LegacyPool) get(hash common.Hash) *types.Transaction {
return pool.all.Get(hash)
}

type ownerReserver interface {
Owns(common.Address) bool
}

func (pool *LegacyPool) releaseReservation(addr common.Address) {
if ownerAware, ok := pool.reserver.(ownerReserver); ok {
if !ownerAware.Owns(addr) {
return
}
}
_ = pool.reserver.Release(addr)
}

// Has returns an indicator whether txpool has a transaction cached with the
// given hash.
func (pool *LegacyPool) Has(hash common.Hash) bool {
Expand Down Expand Up @@ -1153,7 +1166,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
_, hasQueued = pool.queue.get(addr)
)
if !hasPending && !hasQueued {
pool.reserver.Release(addr)
pool.releaseReservation(addr)
}
}()
}
Expand Down Expand Up @@ -1511,7 +1524,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
for _, addr := range removedAddresses {
_, hasPending := pool.pending[addr]
if !hasPending {
pool.reserver.Release(addr)
pool.releaseReservation(addr)
}
}
return promoted
Expand Down Expand Up @@ -1611,7 +1624,7 @@ func (pool *LegacyPool) truncateQueue() {
for _, addr := range removedAddresses {
_, hasPending := pool.pending[addr]
if !hasPending {
pool.reserver.Release(addr)
pool.releaseReservation(addr)
}
}
}
Expand Down Expand Up @@ -1676,7 +1689,7 @@ func (pool *LegacyPool) demoteUnexecutables() {
delete(pool.pending, addr)
pendingAddrsGauge.Dec(1)
if _, ok := pool.queue.get(addr); !ok {
pool.reserver.Release(addr)
pool.releaseReservation(addr)
}
}
}
Expand Down Expand Up @@ -1932,11 +1945,11 @@ func (pool *LegacyPool) Clear() {
// acquire the subpool lock until the transaction addition is completed.
for addr := range pool.pending {
if _, ok := pool.queue.get(addr); !ok {
pool.reserver.Release(addr)
pool.releaseReservation(addr)
}
}
for _, addr := range pool.queue.addresses() {
pool.reserver.Release(addr)
pool.releaseReservation(addr)
}
pool.all.Clear()
pool.priced.Reheap()
Expand Down
48 changes: 48 additions & 0 deletions core/txpool/legacypool/legacypool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,13 @@ func (r *reserver) Has(address common.Address) bool {
return false // reserver only supports a single pool
}

func (r *reserver) Owns(address common.Address) bool {
r.lock.RLock()
defer r.lock.RUnlock()
_, exists := r.accounts[address]
return exists
}

func setupPoolWithConfig(config *params.ChainConfig) (*LegacyPool, *ecdsa.PrivateKey) {
diskdb := rawdb.NewMemoryDatabase()
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(diskdb))
Expand Down Expand Up @@ -492,6 +499,47 @@ func TestPromoteSpecialTxOverflowReturnsErrorWithoutMutation(t *testing.T) {
}
}

func TestPromoteExecutablesQueueEmptyWithoutReservation(t *testing.T) {
t.Parallel()

diskdb := rawdb.NewMemoryDatabase()
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(diskdb))
chain := newTestBlockChain(params.TestChainConfig, 10000000, statedb, new(event.Feed))

key, err := crypto.GenerateKey()
if err != nil {
t.Fatalf("failed to generate key: %v", err)
}
addr := crypto.PubkeyToAddress(key.PublicKey)
statedb.AddBalance(addr, big.NewInt(1_000_000_000_000_000), tracing.BalanceChangeUnspecified)

r := &reserver{accounts: make(map[common.Address]struct{})}
pool := New(testTxPoolConfig, chain)
if err := pool.Init(testTxPoolConfig.PriceLimit, chain.CurrentBlock(), r); err != nil {
t.Fatalf("failed to init pool: %v", err)
}
defer pool.Close()
<-pool.initDoneCh

queuedTx := pricedTransaction(5, 100000, new(big.Int).Add(new(big.Int).Set(common.MinGasPrice), big.NewInt(1)), key)
if err := pool.addRemoteSync(queuedTx); err != nil {
t.Fatalf("failed to add queued tx: %v", err)
}

r.lock.Lock()
delete(r.accounts, addr)
r.lock.Unlock()

pool.mu.Lock()
pool.currentState.SetNonce(addr, 10)
pool.promoteExecutables([]common.Address{addr})
pool.mu.Unlock()

if _, ok := pool.queue.get(addr); ok {
t.Fatal("queue should be empty after stale tx is dropped")
}
}

type testChain struct {
*testBlockChain
address common.Address
Expand Down
9 changes: 9 additions & 0 deletions core/txpool/reserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,12 @@ func (h *ReservationHandle) Has(address common.Address) bool {
id, exists := h.tracker.accounts[address]
return exists && id != h.id
}

// Owns reports whether this handle currently owns the reservation for address.
func (h *ReservationHandle) Owns(address common.Address) bool {
h.tracker.lock.RLock()
defer h.tracker.lock.RUnlock()

id, exists := h.tracker.accounts[address]
return exists && id == h.id
}
Loading