Skip to content

Commit

Permalink
stateproof: ignore TxPoolSize for SP transactions (#4325)
Browse files Browse the repository at this point in the history
Allow a single SP transaction to the pool when the pool is full. Allow again after a round.
  • Loading branch information
algonautshant authored Aug 6, 2022
1 parent 5106ea8 commit eaeeade
Show file tree
Hide file tree
Showing 3 changed files with 237 additions and 5 deletions.
23 changes: 19 additions & 4 deletions data/pools/transactionPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ type TransactionPool struct {

// proposalAssemblyTime is the ProposalAssemblyTime configured for this node.
proposalAssemblyTime time.Duration

// stateproofOverflowed indicates that a stateproof transaction was allowed to
// exceed the txPoolMaxSize. This flag is reset to false OnNewBlock
stateproofOverflowed bool
}

// BlockEvaluator defines the block evaluator interface exposed by the ledger package.
Expand Down Expand Up @@ -238,6 +242,7 @@ func (pool *TransactionPool) rememberCommit(flush bool) {

if flush {
pool.pendingTxGroups = pool.rememberedTxGroups
pool.stateproofOverflowed = false
pool.pendingTxids = pool.rememberedTxids
pool.ledger.VerifiedTransactionCache().UpdatePinned(pool.pendingTxids)
} else {
Expand Down Expand Up @@ -270,12 +275,22 @@ func (pool *TransactionPool) pendingCountNoLock() int {
}

// checkPendingQueueSize tests to see if we can grow the pending group transaction list
// by adding txCount more transactions. The limits comes from the total number of transactions
// by adding len(txnGroup) more transactions. The limits comes from the total number of transactions
// and not from the total number of transaction groups.
// As long as we haven't surpassed the size limit, we should be good to go.
func (pool *TransactionPool) checkPendingQueueSize(txCount int) error {
func (pool *TransactionPool) checkPendingQueueSize(txnGroup []transactions.SignedTxn) error {
pendingSize := pool.pendingTxIDsCount()
txCount := len(txnGroup)
if pendingSize+txCount > pool.txPoolMaxSize {
// Allow the state proof transaction to go over the txPoolMaxSize if it already didn't
if len(txnGroup) == 1 && txnGroup[0].Txn.Type == protocol.StateProofTx {
pool.pendingMu.Lock()
defer pool.pendingMu.Unlock()
if !pool.stateproofOverflowed {
pool.stateproofOverflowed = true
return nil
}
}
return fmt.Errorf("TransactionPool.checkPendingQueueSize: transaction pool have reached capacity")
}
return nil
Expand Down Expand Up @@ -356,7 +371,7 @@ func (pool *TransactionPool) checkSufficientFee(txgroup []transactions.SignedTxn
// Test performs basic duplicate detection and well-formedness checks
// on a transaction group without storing the group.
func (pool *TransactionPool) Test(txgroup []transactions.SignedTxn) error {
if err := pool.checkPendingQueueSize(len(txgroup)); err != nil {
if err := pool.checkPendingQueueSize(txgroup); err != nil {
return err
}

Expand Down Expand Up @@ -443,7 +458,7 @@ func (pool *TransactionPool) RememberOne(t transactions.SignedTxn) error {
// Remember stores the provided transaction group.
// Precondition: Only Remember() properly-signed and well-formed transactions (i.e., ensure t.WellFormed())
func (pool *TransactionPool) Remember(txgroup []transactions.SignedTxn) error {
if err := pool.checkPendingQueueSize(len(txgroup)); err != nil {
if err := pool.checkPendingQueueSize(txgroup); err != nil {
return err
}

Expand Down
1 change: 0 additions & 1 deletion data/transactions/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,6 @@ func (tx Transaction) WellFormed(spec SpecialAddresses, proto config.ConsensusPa
if !suppliesNullKeys {
return errKeyregTxnGoingOnlineWithNonParticipating
}

}

if err := tx.stateProofPKWellFormed(proto); err != nil {
Expand Down
218 changes: 218 additions & 0 deletions test/e2e-go/features/stateproofs/stateproofs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"io/ioutil"
"os"
"path/filepath"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -821,3 +823,219 @@ func TestTotalWeightChanges(t *testing.T) {

a.Equalf(int(consensusParams.StateProofInterval*expectedNumberOfStateProofs), int(lastStateProofBlock.Round()), "the expected last state proof block wasn't the one that was observed")
}

func TestSPWithTXPoolFull(t *testing.T) {
partitiontest.PartitionTest(t)
defer fixtures.ShutdownSynchronizedTest(t)

a := require.New(fixtures.SynchronizedTest(t))

var fixture fixtures.RestClientFixture
configurableConsensus := make(config.ConsensusProtocols)
consensusParams := getDefaultStateProofConsensusParams()
consensusParams.StateProofInterval = 4
configurableConsensus[protocol.ConsensusFuture] = consensusParams

fixture.SetConsensus(configurableConsensus)
fixture.SetupNoStart(t, filepath.Join("nettemplates", "TwoNodes50EachFuture.json"))

dir, err := fixture.GetNodeDir("Primary")
a.NoError(err)

cfg, err := config.LoadConfigFromDisk(dir)
a.NoError(err)
cfg.TxPoolSize = 0
cfg.SaveToDisk(dir)

dir, err = fixture.GetNodeDir("Node")
a.NoError(err)
cfg.SaveToDisk(dir)

fixture.Start()
defer fixture.Shutdown()

relay := fixture.GetLibGoalClientForNamedNode("Primary")

params, err := relay.SuggestedParams()
require.NoError(t, err)

var genesisHash crypto.Digest
copy(genesisHash[:], params.GenesisHash)

round := uint64(0)
for round < uint64(20) {
params, err = relay.SuggestedParams()
require.NoError(t, err)

round = params.LastRound
err = fixture.WaitForRound(round+1, 6*time.Second)
require.NoError(t, err)

b, err := relay.Block(round + 1)
require.NoError(t, err)
if len(b.Transactions.Transactions) == 0 {
continue
}
require.Equal(t, string(protocol.StateProofTx), b.Transactions.Transactions[0].Type)
require.Equal(t, uint64(8), b.Transactions.Transactions[0].StateProof.StateProofIntervalLatestRound)
break
}
require.Less(t, round, uint64(20))
}

type specialAddr string

func (a specialAddr) ToBeHashed() (protocol.HashID, []byte) {
return protocol.SpecialAddr, []byte(a)
}

// TestSPWithCounterReset tests if the state proof transaction is getting into the pool and the blcok
// when the transaction pool is full (TxPoolSize=0) and when there is bad sp and payment transaction traffic.
func TestSPWithCounterReset(t *testing.T) {
partitiontest.PartitionTest(t)
defer fixtures.ShutdownSynchronizedTest(t)

a := require.New(fixtures.SynchronizedTest(t))

var fixture fixtures.RestClientFixture
configurableConsensus := make(config.ConsensusProtocols)
consensusParams := getDefaultStateProofConsensusParams()
consensusParams.StateProofInterval = 4
configurableConsensus[protocol.ConsensusFuture] = consensusParams

fixture.SetConsensus(configurableConsensus)
fixture.SetupNoStart(t, filepath.Join("nettemplates", "OneNodeFuture.json"))

dir, err := fixture.GetNodeDir("Primary")
a.NoError(err)

cfg, err := config.LoadConfigFromDisk(dir)
a.NoError(err)
cfg.TxPoolSize = 0
cfg.SaveToDisk(dir)

fixture.Start()
defer fixture.Shutdown()

relay := fixture.GetLibGoalClientForNamedNode("Primary")

params, err := relay.SuggestedParams()
require.NoError(t, err)

var genesisHash crypto.Digest
copy(genesisHash[:], params.GenesisHash)

wg := sync.WaitGroup{}
var done uint32

defer func() {
atomic.StoreUint32(&done, uint32(1))
wg.Wait()
}()

params, err = relay.SuggestedParams()
require.NoError(t, err)
stxn := getWellformedSPTransaction(params.LastRound+1, genesisHash, consensusParams, t)

// Send well formed but bad stateproof transactions from two goroutines
for spSpam := 0; spSpam < 2; spSpam++ {
wg.Add(1)
go func() {
defer wg.Done()
for atomic.LoadUint32(&done) != 1 {
_, err := relay.BroadcastTransaction(stxn)
// The pool is full, and only one SP transaction will be admitted in per round. Otherwise, pool is full error will be returned
// However, if this is the lucky SP transaction to get into the pool, it will eventually be rejected by ValidateStateProof and a different
// error will be returned
require.Error(t, err)
time.Sleep(25 * time.Millisecond)
}
}()
}

// Send payment transactions from two goroutines
for txnSpam := 0; txnSpam < 2; txnSpam++ {
wg.Add(1)
go func(amt uint64) {
defer wg.Done()
cntr := uint64(1)
params, err := relay.SuggestedParams()
require.NoError(t, err)

ps := paymentSender{
from: accountFetcher{nodeName: "Primary", accountNumber: 0},
amount: amt,
}
account0 := ps.from.getAccount(a, &fixture)

for atomic.LoadUint32(&done) != 1 {
ps.amount = cntr
cntr = cntr + 1
// ignore the returned error (most of the time will be error)
_, err := relay.SendPaymentFromUnencryptedWallet(account0, account0, params.Fee, ps.amount, []byte{byte(params.LastRound)})
require.Error(t, err)
require.Equal(t, "HTTP 400 Bad Request: TransactionPool.checkPendingQueueSize: transaction pool have reached capacity", err.Error())
time.Sleep(25 * time.Millisecond)
}
}(uint64(txnSpam + 1))
}

// Check that the first 2 stateproofs are added to the blockchain
round := uint64(0)
expectedSPRound := consensusParams.StateProofInterval * 2
for round < consensusParams.StateProofInterval*5 {
round = params.LastRound

err := fixture.WaitForRound(round+1, 6*time.Second)
require.NoError(t, err)

b, err := relay.Block(round + 1)
require.NoError(t, err)

params, err = relay.SuggestedParams()
require.NoError(t, err)
if len(b.Transactions.Transactions) == 0 {
continue
}
tid := 0
// Find a SP transaction in the block. The SP should be for StateProofIntervalLatestRound expectedSPRound
// Since the pool is full, only one additional SP transaction is allowed in. So only one SP can be added to be block
// break after finding it, and look for the next one in a subsequent block
// In case two SP transactions get into the same block, the following loop will not find the second one, and fail the test
for ; tid < len(b.Transactions.Transactions); tid++ {
if b.Transactions.Transactions[tid].Type == string(protocol.StateProofTx) {
require.Equal(t, string(protocol.StateProofTx), b.Transactions.Transactions[tid].Type)
require.Equal(t, int(expectedSPRound), int(b.Transactions.Transactions[tid].StateProof.StateProofIntervalLatestRound))
expectedSPRound = expectedSPRound + consensusParams.StateProofInterval
break
}
}
if expectedSPRound == consensusParams.StateProofInterval*4 {
break
}
}
// If waited till round 20 and did not yet get the stateproof with last round 12, fail the test
require.Less(t, round, consensusParams.StateProofInterval*5)
}

func getWellformedSPTransaction(round uint64, genesisHash crypto.Digest, consensusParams config.ConsensusParams, t *testing.T) (stxn transactions.SignedTxn) {

msg := stateproofmsg.Message{}
proof := &sp.StateProof{}
proto := consensusParams

stxn.Txn.Type = protocol.StateProofTx
stxn.Txn.Sender = transactions.StateProofSender
stxn.Txn.FirstValid = basics.Round(round)
stxn.Txn.LastValid = basics.Round(round + 1000)
stxn.Txn.GenesisHash = genesisHash
stxn.Txn.StateProofIntervalLastRound = basics.Round(round - 1)
stxn.Txn.StateProofType = protocol.StateProofBasic
stxn.Txn.StateProof = *proof
stxn.Txn.Message = msg

err := stxn.Txn.WellFormed(transactions.SpecialAddresses{}, proto)
require.NoError(t, err)

return stxn
}

0 comments on commit eaeeade

Please sign in to comment.