Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perf test: Transaction group handle/verify #4652

Merged
merged 12 commits into from
Oct 24, 2022
320 changes: 320 additions & 0 deletions data/txHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
package data

import (
"encoding/binary"
"fmt"
"io"
"math/rand"
"strings"
"sync"
"testing"
"time"

Expand All @@ -38,6 +41,7 @@ import (
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
"github.com/algorand/go-algorand/util/execpool"
"github.com/algorand/go-algorand/util/metrics"
)

func BenchmarkTxHandlerProcessing(b *testing.B) {
Expand Down Expand Up @@ -248,3 +252,319 @@ func BenchmarkTxHandlerDecoderMsgp(b *testing.B) {
require.Equal(b, benchTxnNum, idx)
}
}

func TestIncomingTxHandle(t *testing.T) {
incomingTxHandlerProcessing(1, t)
}

func TestIncomingTxGroupHandle(t *testing.T) {
incomingTxHandlerProcessing(proto.MaxTxGroupSize, t)
}

// incomingTxHandlerProcessing is a comprehensive transaction handling test
// It handles the singed transactions by passing them to the backlog for verification
func incomingTxHandlerProcessing(maxGroupSize int, t *testing.T) {
const numUsers = 100
numberOfTransactionGroups := 1000
log := logging.TestingLog(t)
log.SetLevel(logging.Warn)
addresses := make([]basics.Address, numUsers)
secrets := make([]*crypto.SignatureSecrets, numUsers)

// prepare the accounts
genesis := make(map[basics.Address]basics.AccountData)
for i := 0; i < numUsers; i++ {
secret := keypair()
addr := basics.Address(secret.SignatureVerifier)
secrets[i] = secret
addresses[i] = addr
genesis[addr] = basics.AccountData{
Status: basics.Online,
MicroAlgos: basics.MicroAlgos{Raw: 10000000000000},
}
}
genesis[poolAddr] = basics.AccountData{
Status: basics.NotParticipating,
MicroAlgos: basics.MicroAlgos{Raw: config.Consensus[protocol.ConsensusCurrentVersion].MinBalance},
}

require.Equal(t, len(genesis), numUsers+1)
genBal := bookkeeping.MakeGenesisBalances(genesis, sinkAddr, poolAddr)
ledgerName := fmt.Sprintf("%s-mem-%d", t.Name(), numberOfTransactionGroups)
const inMem = true
cfg := config.GetDefaultLocal()
cfg.Archival = true
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
require.NoError(t, err)

l := ledger
tp := pools.MakeTransactionPool(l.Ledger, cfg, logging.Base())
backlogPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, nil)
handler := MakeTxHandler(tp, l, &mocks.MockNetwork{}, "", crypto.Digest{}, backlogPool)
defer handler.ctxCancel()

outChan := make(chan *txBacklogMsg, 10)
wg := sync.WaitGroup{}
wg.Add(1)
// Make a test backlog worker, which is simiar to backlogWorker, but sends the results
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a comment in txHandler.go at backlogWorker() noting that updates should be reflected in this test

// through the outChan instead of passing it to postprocessCheckedTxn
go func() {
defer wg.Done()
defer close(outChan)
for {
// prioritize the postVerificationQueue
select {
case wi, ok := <-handler.postVerificationQueue:
if !ok {
return
}
outChan <- wi
// restart the loop so that we could empty out the post verification queue.
continue
default:
}

// we have no more post verification items. wait for either backlog queue item or post verification item.
select {
case wi, ok := <-handler.backlogQueue:
if !ok {
// shut down to end the test
handler.txVerificationPool.Shutdown()
close(handler.postVerificationQueue)
// wait until all the pending responses are obtained.
// this is not in backlogWorker, maybe should be
for wi := range handler.postVerificationQueue {
outChan <- wi
}
return
}
if handler.checkAlreadyCommitted(wi) {
// this is not expected during the test
continue
}

// enqueue the task to the verification pool.
handler.txVerificationPool.EnqueueBacklog(handler.ctx, handler.asyncVerifySignature, wi, nil)

case wi, ok := <-handler.postVerificationQueue:
if !ok {
return
}
outChan <- wi

case <-handler.ctx.Done():
return
}
}
}()

// Prepare the transactions
signedTransactionGroups, badTxnGroups :=
makeSignedTxnGroups(numberOfTransactionGroups, numUsers, maxGroupSize, 0.5, addresses, secrets)
encodedSignedTransactionGroups := make([]network.IncomingMessage, 0, numberOfTransactionGroups)
for _, stxngrp := range signedTransactionGroups {
data := make([]byte, 0)
for _, stxn := range stxngrp {
data = append(data, protocol.Encode(&stxn)...)
}
encodedSignedTransactionGroups =
append(encodedSignedTransactionGroups, network.IncomingMessage{Data: data})
}

// Process the results and make sure they are correct
wg.Add(1)
go func() {
defer wg.Done()
groupCounter := 0
txnCounter := 0
invalidCounter := 0
defer func() {
t.Logf("processed %d txn groups (%d txns)\n", groupCounter, txnCounter)
}()
for wi := range outChan {
txnCounter = txnCounter + len(wi.unverifiedTxGroup)
groupCounter++
u, _ := binary.Uvarint(wi.unverifiedTxGroup[0].Txn.Note)
_, inBad := badTxnGroups[u]
if wi.verificationErr == nil {
require.False(t, inBad, "No error for invalid signature")
} else {
invalidCounter++
require.True(t, inBad, "Error for good signature")
}
}
t.Logf("Txn groups with invalid sigs: %d\n", invalidCounter)
}()

// Send the transactions to the verifier
for _, tg := range encodedSignedTransactionGroups {
handler.processIncomingTxn(tg)
randduration := time.Duration(uint64(((1 + rand.Float32()) * 3)))
time.Sleep(randduration * time.Microsecond)
}
close(handler.backlogQueue)
wg.Wait()

// Report the number of transactions dropped because the backlog was busy
var buf strings.Builder
metrics.DefaultRegistry().WriteMetrics(&buf, "")
str := buf.String()
x := strings.Index(str, "\nalgod_transaction_messages_dropped_backlog")
str = str[x+44 : x+44+strings.Index(str[x+44:], "\n")]
str = strings.TrimSpace(strings.ReplaceAll(str, "}", " "))
t.Logf("dropped %s txn gropus\n", str)
}

// makeSignedTxnGroups prepares N transaction groups of random (maxGroupSize) sizes with random
// invalid signatures of a given probability (invalidProb)
func makeSignedTxnGroups(N, numUsers, maxGroupSize int, invalidProb float32, addresses []basics.Address,
secrets []*crypto.SignatureSecrets) (ret [][]transactions.SignedTxn,
badTxnGroups map[uint64]interface{}) {
badTxnGroups = make(map[uint64]interface{})

protoMaxGrpSize := proto.MaxTxGroupSize
ret = make([][]transactions.SignedTxn, 0, N)
for u := 0; u < N; u++ {
grpSize := rand.Intn(protoMaxGrpSize-1) + 1
if grpSize > maxGroupSize {
grpSize = maxGroupSize
}
var txGroup transactions.TxGroup
txns := make([]transactions.Transaction, 0, grpSize)
for g := 0; g < grpSize; g++ {
// generate transactions
noteField := make([]byte, binary.MaxVarintLen64)
binary.PutUvarint(noteField, uint64(u))
tx := transactions.Transaction{
Type: protocol.PaymentTx,
Header: transactions.Header{
Sender: addresses[(u+g)%numUsers],
Fee: basics.MicroAlgos{Raw: proto.MinTxnFee * 2},
FirstValid: 0,
LastValid: basics.Round(proto.MaxTxnLife),
GenesisHash: genesisHash,
Note: noteField,
},
PaymentTxnFields: transactions.PaymentTxnFields{
Receiver: addresses[(u+g+1)%numUsers],
Amount: basics.MicroAlgos{Raw: mockBalancesMinBalance + (rand.Uint64() % 10000)},
},
}
txGroup.TxGroupHashes = append(txGroup.TxGroupHashes, crypto.Digest(tx.ID()))
txns = append(txns, tx)
}
groupHash := crypto.HashObj(txGroup)
signedTxGroup := make([]transactions.SignedTxn, 0, grpSize)
for g, txn := range txns {
txn.Group = groupHash
signedTx := txn.Sign(secrets[(u+g)%numUsers])
signedTx.Txn = txn
signedTxGroup = append(signedTxGroup, signedTx)
}
// randomly make bad signatures
if rand.Float32() < invalidProb {
tinGrp := rand.Intn(grpSize)
signedTxGroup[tinGrp].Sig[0] = signedTxGroup[tinGrp].Sig[0] + 1
badTxnGroups[uint64(u)] = struct{}{}
}
ret = append(ret, signedTxGroup)
}
return
}

// BenchmarkHandler sends singed transactions the the verifier
func BenchmarkHandleTxns(b *testing.B) {
b.N = b.N * proto.MaxTxGroupSize / 2
runHandlerBenchmark(1, b)
}

// BenchmarkHandler sends singed transaction groups to the verifier
func BenchmarkHandleTxnGroups(b *testing.B) {
runHandlerBenchmark(proto.MaxTxGroupSize, b)
}

// runHandlerBenchmark has a similar workflow to incomingTxHandlerProcessing,
// but bypasses the backlog, and sends the transactions directly to the verifier
func runHandlerBenchmark(maxGroupSize int, b *testing.B) {
const numUsers = 100
log := logging.TestingLog(b)
log.SetLevel(logging.Warn)
addresses := make([]basics.Address, numUsers)
secrets := make([]*crypto.SignatureSecrets, numUsers)

// prepare the accounts
genesis := make(map[basics.Address]basics.AccountData)
for i := 0; i < numUsers; i++ {
secret := keypair()
addr := basics.Address(secret.SignatureVerifier)
secrets[i] = secret
addresses[i] = addr
genesis[addr] = basics.AccountData{
Status: basics.Online,
MicroAlgos: basics.MicroAlgos{Raw: 10000000000000},
}
}
genesis[poolAddr] = basics.AccountData{
Status: basics.NotParticipating,
MicroAlgos: basics.MicroAlgos{Raw: config.Consensus[protocol.ConsensusCurrentVersion].MinBalance},
}

require.Equal(b, len(genesis), numUsers+1)
genBal := bookkeeping.MakeGenesisBalances(genesis, sinkAddr, poolAddr)
ledgerName := fmt.Sprintf("%s-mem-%d", b.Name(), b.N)
const inMem = true
cfg := config.GetDefaultLocal()
cfg.Archival = true
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
require.NoError(b, err)

l := ledger
tp := pools.MakeTransactionPool(l.Ledger, cfg, logging.Base())
backlogPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, nil)
handler := MakeTxHandler(tp, l, &mocks.MockNetwork{}, "", crypto.Digest{}, backlogPool)
defer handler.ctxCancel()

// Prepare the transactions
signedTransactionGroups, badTxnGroups := makeSignedTxnGroups(b.N, numUsers, maxGroupSize, 0.001, addresses, secrets)
outChan := handler.postVerificationQueue
wg := sync.WaitGroup{}

var tt time.Time
// Process the results and make sure they are correct
wg.Add(1)
go func() {
defer wg.Done()
groupCounter := 0
var txnCounter uint64
invalidCounter := 0
for wi := range outChan {
txnCounter = txnCounter + uint64(len(wi.unverifiedTxGroup))
groupCounter++
u, _ := binary.Uvarint(wi.unverifiedTxGroup[0].Txn.Note)
_, inBad := badTxnGroups[u]
if wi.verificationErr == nil {
require.False(b, inBad, "No error for invalid signature")
} else {
invalidCounter++
require.True(b, inBad, "Error for good signature")
}
}
if txnCounter > 0 {
b.Logf("TPS: %d\n", uint64(txnCounter)*1000000000/uint64(time.Since(tt)))
b.Logf("Time/txn: %d(microsec)\n", uint64((time.Since(tt)/time.Microsecond))/txnCounter)
b.Logf("processed total: [%d groups (%d invalid)] [%d txns]\n", groupCounter, invalidCounter, txnCounter)
}
}()

b.ResetTimer()
tt = time.Now()
for _, stxngrp := range signedTransactionGroups {
blm := txBacklogMsg{rawmsg: nil, unverifiedTxGroup: stxngrp}
handler.txVerificationPool.EnqueueBacklog(handler.ctx, handler.asyncVerifySignature, &blm, nil)
}
// shut down to end the test
handler.txVerificationPool.Shutdown()
close(handler.postVerificationQueue)
close(handler.backlogQueue)
wg.Wait()
}