Skip to content

Commit

Permalink
Merge pull request #3 from blocknative/mempool_feed_stage
Browse files Browse the repository at this point in the history
Mempool Feed Initial Release
  • Loading branch information
dmarzzz authored Nov 17, 2021
2 parents f94c7dd + 1ae8391 commit 7bb5abc
Show file tree
Hide file tree
Showing 9 changed files with 334 additions and 1 deletion.
10 changes: 10 additions & 0 deletions accounts/abi/bind/backends/dropped_tx_subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package backends

import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/event"
)

func (fb *filterBackend) SubscribeDropTxsEvent(ch chan<- core.DropTxsEvent) event.Subscription {
return nullSubscription()
}
20 changes: 20 additions & 0 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ import (
// NewTxsEvent is posted when a batch of transactions enter the transaction pool.
type NewTxsEvent struct{ Txs []*types.Transaction }

// DropTxsEvent is posted when a batch of transactions are removed from the transaction pool
type DropTxsEvent struct {
Txs []*types.Transaction
Reason string
Replacement *types.Transaction
}

// NewMinedBlockEvent is posted when a block has been imported.
type NewMinedBlockEvent struct{ Block *types.Block }

Expand All @@ -41,3 +48,16 @@ type ChainSideEvent struct {
}

type ChainHeadEvent struct{ Block *types.Block }

const (
dropUnderpriced = "underpriced-txs"
dropLowNonce = "low-nonce-txs"
dropUnpayable = "unpayable-txs"

dropAccountCap = "account-cap-txs" // Accounts exceeding txpool.accountslots transactions
dropReplaced = "replaced-txs"
dropUnexecutable = "unexecutable-txs"
dropTruncating = "truncating-txs"
dropOld = "old-txs"
dropGasPriceUpdated = "updated-gas-price"
)
75 changes: 74 additions & 1 deletion core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ type TxPool struct {
chain blockChain
gasPrice *big.Int
txFeed event.Feed
dropTxFeed event.Feed
scope event.SubscriptionScope
signer types.Signer
mu sync.RWMutex
Expand Down Expand Up @@ -387,6 +388,10 @@ func (pool *TxPool) loop() {
for _, tx := range list {
pool.removeTx(tx.Hash(), true)
}
pool.dropTxFeed.Send(DropTxsEvent{
Txs: list,
Reason: dropOld,
})
queuedEvictionMeter.Mark(int64(len(list)))
}
}
Expand Down Expand Up @@ -426,6 +431,12 @@ func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- NewTxsEvent) event.Subscripti
return pool.scope.Track(pool.txFeed.Subscribe(ch))
}

// SubscribeDropTxsEvent registers a subscription of DropTxsEvent and
// starts sending event to the given channel.
func (pool *TxPool) SubscribeDropTxsEvent(ch chan<- DropTxsEvent) event.Subscription {
return pool.scope.Track(pool.dropTxFeed.Subscribe(ch))
}

// GasPrice returns the current gas price enforced by the transaction pool.
func (pool *TxPool) GasPrice() *big.Int {
pool.mu.RLock()
Expand All @@ -450,6 +461,10 @@ func (pool *TxPool) SetGasPrice(price *big.Int) {
pool.removeTx(tx.Hash(), false)
}
pool.priced.Removed(len(drop))
pool.dropTxFeed.Send(DropTxsEvent{
Txs: drop,
Reason: dropGasPriceUpdated,
})
}

log.Info("Transaction pool price threshold updated", "price", price)
Expand Down Expand Up @@ -701,6 +716,10 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
underpricedTxMeter.Mark(1)
pool.removeTx(tx.Hash(), false)
}
pool.dropTxFeed.Send(DropTxsEvent{
Txs: drop,
Reason: dropUnderpriced,
})
}
// Try to replace an existing transaction in the pending pool
from, _ := types.Sender(pool.signer, tx) // already validated
Expand All @@ -716,6 +735,11 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
pool.all.Remove(old.Hash())
pool.priced.Removed(1)
pendingReplaceMeter.Mark(1)
pool.dropTxFeed.Send(DropTxsEvent{
Txs: []*types.Transaction{old},
Reason: dropReplaced,
Replacement: tx,
})
}
pool.all.Add(tx, isLocal)
pool.priced.Put(tx, isLocal)
Expand Down Expand Up @@ -767,6 +791,10 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction, local boo
pool.all.Remove(old.Hash())
pool.priced.Removed(1)
queuedReplaceMeter.Mark(1)
pool.dropTxFeed.Send(DropTxsEvent{
Txs: []*types.Transaction{old},
Reason: dropReplaced,
})
} else {
// Nothing was replaced, bump the queued counter
queuedGauge.Inc(1)
Expand Down Expand Up @@ -823,6 +851,10 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
pool.all.Remove(old.Hash())
pool.priced.Removed(1)
pendingReplaceMeter.Mark(1)
pool.dropTxFeed.Send(DropTxsEvent{
Txs: []*types.Transaction{old},
Reason: dropReplaced,
})
} else {
// Nothing was replaced, bump the pending counter
pendingGauge.Inc(1)
Expand Down Expand Up @@ -1015,6 +1047,10 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
pool.pendingNonces.setIfLower(addr, tx.Nonce())
// Reduce the pending counter
pendingGauge.Dec(int64(1 + len(invalids)))
pool.dropTxFeed.Send(DropTxsEvent{
Txs: invalids,
Reason: dropUnexecutable,
})
return
}
}
Expand Down Expand Up @@ -1325,6 +1361,10 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
pool.all.Remove(hash)
}
log.Trace("Removed old queued transactions", "count", len(forwards))
pool.dropTxFeed.Send(DropTxsEvent{
Txs: forwards,
Reason: dropLowNonce,
})
// Drop all transactions that are too costly (low balance or out of gas)
drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
for _, tx := range drops {
Expand All @@ -1333,6 +1373,10 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
}
log.Trace("Removed unpayable queued transactions", "count", len(drops))
queuedNofundsMeter.Mark(int64(len(drops)))
pool.dropTxFeed.Send(DropTxsEvent{
Txs: drops,
Reason: dropUnpayable,
})

// Gather all executable transactions and promote them
readies := list.Ready(pool.pendingNonces.get(addr))
Expand All @@ -1355,6 +1399,10 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
}
queuedRateLimitMeter.Mark(int64(len(caps)))
pool.dropTxFeed.Send(DropTxsEvent{
Txs: caps,
Reason: dropAccountCap,
})
}
// Mark all the items dropped as removed
pool.priced.Removed(len(forwards) + len(drops) + len(caps))
Expand Down Expand Up @@ -1419,6 +1467,10 @@ func (pool *TxPool) truncatePending() {
pool.pendingNonces.setIfLower(offenders[i], tx.Nonce())
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
}
pool.dropTxFeed.Send(DropTxsEvent{
Txs: caps,
Reason: dropAccountCap,
})
pool.priced.Removed(len(caps))
pendingGauge.Dec(int64(len(caps)))
if pool.locals.contains(offenders[i]) {
Expand Down Expand Up @@ -1446,6 +1498,10 @@ func (pool *TxPool) truncatePending() {
pool.pendingNonces.setIfLower(addr, tx.Nonce())
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
}
pool.dropTxFeed.Send(DropTxsEvent{
Txs: caps,
Reason: dropAccountCap,
})
pool.priced.Removed(len(caps))
pendingGauge.Dec(int64(len(caps)))
if pool.locals.contains(addr) {
Expand Down Expand Up @@ -1486,9 +1542,14 @@ func (pool *TxPool) truncateQueue() {

// Drop all transactions if they are less than the overflow
if size := uint64(list.Len()); size <= drop {
for _, tx := range list.Flatten() {
txs := list.Flatten()
for _, tx := range txs {
pool.removeTx(tx.Hash(), true)
}
pool.dropTxFeed.Send(DropTxsEvent{
Txs: txs,
Reason: dropTruncating,
})
drop -= size
queuedRateLimitMeter.Mark(int64(size))
continue
Expand All @@ -1499,6 +1560,10 @@ func (pool *TxPool) truncateQueue() {
pool.removeTx(txs[i].Hash(), true)
drop--
queuedRateLimitMeter.Mark(1)
pool.dropTxFeed.Send(DropTxsEvent{
Txs: []*types.Transaction{txs[i]},
Reason: dropTruncating,
})
}
}
}
Expand All @@ -1522,13 +1587,21 @@ func (pool *TxPool) demoteUnexecutables() {
pool.all.Remove(hash)
log.Trace("Removed old pending transaction", "hash", hash)
}
pool.dropTxFeed.Send(DropTxsEvent{
Txs: olds,
Reason: dropLowNonce,
})
// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
for _, tx := range drops {
hash := tx.Hash()
log.Trace("Removed unpayable pending transaction", "hash", hash)
pool.all.Remove(hash)
}
pool.dropTxFeed.Send(DropTxsEvent{
Txs: drops,
Reason: dropUnpayable,
})
pendingNofundsMeter.Mark(int64(len(drops)))

for _, tx := range invalids {
Expand Down
Loading

0 comments on commit 7bb5abc

Please sign in to comment.