Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metrics: add counters for checkAlreadyCommitted, split "InLedger" counters by reason #4881

Merged
merged 4 commits into from
Dec 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 57 additions & 3 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,13 @@ var transactionGroupTxSyncAlreadyCommitted = metrics.MakeCounter(metrics.Transac
var transactionMessageTxPoolRememberCounter = metrics.NewTagCounter(
"algod_transaction_messages_txpool_remember_{TAG}", "Number of transaction messages not remembered by txpool b/c if {TAG}",
txPoolRememberTagCap, txPoolRememberPendingEval, txPoolRememberTagNoSpace, txPoolRememberTagFee, txPoolRememberTagTxnDead, txPoolRememberTagTooLarge, txPoolRememberTagGroupID,
txPoolRememberTagTxID, txPoolRememberTagLease, txPoolRememberTagEvalGeneric,
txPoolRememberTagTxID, txPoolRememberTagLease, txPoolRememberTagTxIDEval, txPoolRememberTagLeaseEval, txPoolRememberTagEvalGeneric,
)

var transactionMessageTxPoolCheckCounter = metrics.NewTagCounter(
"algod_transaction_messages_txpool_check_{TAG}", "Number of transaction messages that didn't pass check by txpool b/c if {TAG}",
txPoolRememberTagTxnNotWellFormed, txPoolRememberTagTxnDead, txPoolRememberTagTooLarge, txPoolRememberTagGroupID,
txPoolRememberTagTxID, txPoolRememberTagLease, txPoolRememberTagTxIDEval, txPoolRememberTagLeaseEval, txPoolRememberTagEvalGeneric,
)

const (
Expand All @@ -83,7 +89,11 @@ const (
txPoolRememberTagGroupID = "groupid"
txPoolRememberTagTxID = "txid"
txPoolRememberTagLease = "lease"
txPoolRememberTagTxIDEval = "txid_eval"
txPoolRememberTagLeaseEval = "lease_eval"
txPoolRememberTagEvalGeneric = "eval"

txPoolRememberTagTxnNotWellFormed = "not_well"
)

// The txBacklogMsg structure used to track a single incoming transaction from the gossip network,
Expand Down Expand Up @@ -274,6 +284,41 @@ func (handler *TxHandler) postProcessReportErrors(err error) {
}
}

func (handler *TxHandler) checkReportErrors(err error) {
switch err := err.(type) {
case *ledgercore.TxnNotWellFormedError:
transactionMessageTxPoolCheckCounter.Add(txPoolRememberTagTxnNotWellFormed, 1)
return
case *transactions.TxnDeadError:
transactionMessageTxPoolCheckCounter.Add(txPoolRememberTagTxnDead, 1)
return
case *ledgercore.TransactionInLedgerError:
if err.InBlockEvaluator {
transactionMessageTxPoolCheckCounter.Add(txPoolRememberTagTxIDEval, 1)
} else {
transactionMessageTxPoolCheckCounter.Add(txPoolRememberTagTxID, 1)
}
return
case *ledgercore.LeaseInLedgerError:
if err.InBlockEvaluator {
transactionMessageTxPoolCheckCounter.Add(txPoolRememberTagLeaseEval, 1)
} else {
transactionMessageTxPoolCheckCounter.Add(txPoolRememberTagLease, 1)
}
return
case *ledgercore.TxGroupMalformedError:
switch err.Reason {
case ledgercore.TxGroupMalformedErrorReasonExceedMaxSize:
transactionMessageTxPoolCheckCounter.Add(txPoolRememberTagTooLarge, 1)
default:
transactionMessageTxPoolCheckCounter.Add(txPoolRememberTagGroupID, 1)
}
return
}

transactionMessageTxPoolCheckCounter.Add(txPoolRememberTagEvalGeneric, 1)
}

func (handler *TxHandler) rememberReportErrors(err error) {
if errors.Is(err, pools.ErrPendingQueueReachedMaxCap) {
transactionMessageTxPoolRememberCounter.Add(txPoolRememberTagCap, 1)
Expand Down Expand Up @@ -306,10 +351,18 @@ func (handler *TxHandler) rememberReportErrors(err error) {
transactionMessageTxPoolRememberCounter.Add(txPoolRememberTagTxnDead, 1)
return
case *ledgercore.TransactionInLedgerError:
transactionMessageTxPoolRememberCounter.Add(txPoolRememberTagTxID, 1)
if err.InBlockEvaluator {
transactionMessageTxPoolRememberCounter.Add(txPoolRememberTagTxIDEval, 1)
} else {
transactionMessageTxPoolRememberCounter.Add(txPoolRememberTagTxID, 1)
}
return
case *ledgercore.LeaseInLedgerError:
transactionMessageTxPoolRememberCounter.Add(txPoolRememberTagLease, 1)
if err.InBlockEvaluator {
transactionMessageTxPoolRememberCounter.Add(txPoolRememberTagLeaseEval, 1)
} else {
transactionMessageTxPoolRememberCounter.Add(txPoolRememberTagLease, 1)
}
return
case *ledgercore.TxGroupMalformedError:
switch err.Reason {
Expand Down Expand Up @@ -552,6 +605,7 @@ func (handler *TxHandler) checkAlreadyCommitted(tx *txBacklogMsg) (processingDon
// do a quick test to check that this transaction could potentially be committed, to reject dup pending transactions
err := handler.txPool.Test(tx.unverifiedTxGroup)
if err != nil {
handler.checkReportErrors(err)
logging.Base().Debugf("txPool rejected transaction: %v", err)
return true
}
Expand Down
31 changes: 28 additions & 3 deletions data/txHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1635,13 +1635,21 @@ func TestTxHandlerRememberReportErrorsWithTxPool(t *testing.T) {
t.Parallel()

result := map[string]float64{}
checkResult := map[string]float64{}
getMetricName := func(tag string) string {
return strings.ReplaceAll(transactionMessageTxPoolRememberCounter.Name, "{TAG}", tag)
}
getCheckMetricName := func(tag string) string {
return strings.ReplaceAll(transactionMessageTxPoolCheckCounter.Name, "{TAG}", tag)
}
getMetricCounter := func(tag string) int {
transactionMessageTxPoolRememberCounter.AddMetric(result)
return int(result[getMetricName(tag)])
}
getCheckMetricCounter := func(tag string) int {
transactionMessageTxPoolCheckCounter.AddMetric(checkResult)
return int(checkResult[getCheckMetricName(tag)])
}

log := logging.TestingLog(t)
log.SetLevel(logging.Warn)
Expand Down Expand Up @@ -1718,10 +1726,19 @@ func TestTxHandlerRememberReportErrorsWithTxPool(t *testing.T) {
handler.postProcessCheckedTxn(&wi)
require.Equal(t, 1, getMetricCounter(txPoolRememberTagCap))

// trigger group id error
// trigger not well-formed error
txn2 := txn1
txn2.Sender = basics.Address{}
wi.unverifiedTxGroup = []transactions.SignedTxn{txn2.Sign(secrets[0])}
handler.checkAlreadyCommitted(&wi)
require.Equal(t, 1, getCheckMetricCounter(txPoolRememberTagTxnNotWellFormed))

// trigger group id error
txn2 = txn1
crypto.RandBytes(txn2.Group[:])
wi.unverifiedTxGroup = []transactions.SignedTxn{txn1.Sign(secrets[0]), txn2.Sign(secrets[0])}
handler.checkAlreadyCommitted(&wi)
require.Equal(t, 1, getCheckMetricCounter(txPoolRememberTagGroupID))
handler.postProcessCheckedTxn(&wi)
require.Equal(t, 1, getMetricCounter(txPoolRememberTagGroupID))

Expand All @@ -1734,6 +1751,8 @@ func TestTxHandlerRememberReportErrorsWithTxPool(t *testing.T) {
}
handler.postProcessCheckedTxn(&wi)
require.Equal(t, 1, getMetricCounter(txPoolRememberTagTooLarge))
handler.checkAlreadyCommitted(&wi)
require.Equal(t, 1, getCheckMetricCounter(txPoolRememberTagTooLarge))

// trigger eval error
secret := keypair()
Expand All @@ -1751,13 +1770,17 @@ func TestTxHandlerRememberReportErrorsWithTxPool(t *testing.T) {
wi.unverifiedTxGroup = []transactions.SignedTxn{txn2.Sign(secrets[0])}
handler.postProcessCheckedTxn(&wi)
require.Equal(t, prevTxnDead+1, getMetricCounter(txPoolRememberTagTxnDead))
handler.checkAlreadyCommitted(&wi)
require.Equal(t, 1, getCheckMetricCounter(txPoolRememberTagTxnDead))

// trigger TransactionInLedgerError (txid) error
wi.unverifiedTxGroup = []transactions.SignedTxn{txn1.Sign(secrets[0])}
wi.rawmsg = &network.IncomingMessage{}
handler.postProcessCheckedTxn(&wi)
handler.postProcessCheckedTxn(&wi)
require.Equal(t, 1, getMetricCounter(txPoolRememberTagTxID))
require.Equal(t, 1, getMetricCounter(txPoolRememberTagTxIDEval))
handler.checkAlreadyCommitted(&wi)
require.Equal(t, 1, getCheckMetricCounter(txPoolRememberTagTxIDEval))

// trigger LeaseInLedgerError (lease) error
txn2 = txn1
Expand All @@ -1768,7 +1791,9 @@ func TestTxHandlerRememberReportErrorsWithTxPool(t *testing.T) {
handler.postProcessCheckedTxn(&wi)
wi.unverifiedTxGroup = []transactions.SignedTxn{txn3.Sign(secrets[0])}
handler.postProcessCheckedTxn(&wi)
require.Equal(t, 1, getMetricCounter(txPoolRememberTagLease))
require.Equal(t, 1, getMetricCounter(txPoolRememberTagLeaseEval))
handler.checkAlreadyCommitted(&wi)
require.Equal(t, 1, getCheckMetricCounter(txPoolRememberTagLeaseEval))

// TODO: not sure how to trigger fee error - need to return ErrNoSpace from ledger
// trigger pool fee error
Expand Down
4 changes: 2 additions & 2 deletions ledger/internal/cow.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,13 +217,13 @@ func (cb *roundCowState) lookupAssetHolding(addr basics.Address, aidx basics.Ass
func (cb *roundCowState) checkDup(firstValid, lastValid basics.Round, txid transactions.Txid, txl ledgercore.Txlease) error {
_, present := cb.mods.Txids[txid]
if present {
return &ledgercore.TransactionInLedgerError{Txid: txid}
return &ledgercore.TransactionInLedgerError{Txid: txid, InBlockEvaluator: true}
}

if cb.proto.SupportTransactionLeases && (txl.Lease != [32]byte{}) {
expires, ok := cb.mods.Txleases[txl]
if ok && cb.mods.Hdr.Round <= expires {
return ledgercore.MakeLeaseInLedgerError(txid, txl)
return ledgercore.MakeLeaseInLedgerError(txid, txl, true)
}
}

Expand Down
27 changes: 20 additions & 7 deletions ledger/internal/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,10 @@ func (eval *BlockEvaluator) TestTransactionGroup(txgroup []transactions.SignedTx
}

if len(txgroup) > eval.proto.MaxTxGroupSize {
return fmt.Errorf("group size %d exceeds maximum %d", len(txgroup), eval.proto.MaxTxGroupSize)
return &ledgercore.TxGroupMalformedError{
Msg: fmt.Sprintf("group size %d exceeds maximum %d", len(txgroup), eval.proto.MaxTxGroupSize),
Reason: ledgercore.TxGroupMalformedErrorReasonExceedMaxSize,
}
}

var group transactions.TxGroup
Expand All @@ -847,8 +850,11 @@ func (eval *BlockEvaluator) TestTransactionGroup(txgroup []transactions.SignedTx

// Make sure all transactions in group have the same group value
if txn.Txn.Group != txgroup[0].Txn.Group {
return fmt.Errorf("transactionGroup: inconsistent group values: %v != %v",
txn.Txn.Group, txgroup[0].Txn.Group)
return &ledgercore.TxGroupMalformedError{
Msg: fmt.Sprintf("transactionGroup: inconsistent group values: %v != %v",
txn.Txn.Group, txgroup[0].Txn.Group),
Reason: ledgercore.TxGroupMalformedErrorReasonInconsistentGroupID,
}
}

if !txn.Txn.Group.IsZero() {
Expand All @@ -857,15 +863,21 @@ func (eval *BlockEvaluator) TestTransactionGroup(txgroup []transactions.SignedTx

group.TxGroupHashes = append(group.TxGroupHashes, crypto.Digest(txWithoutGroup.ID()))
} else if len(txgroup) > 1 {
return fmt.Errorf("transactionGroup: [%d] had zero Group but was submitted in a group of %d", gi, len(txgroup))
return &ledgercore.TxGroupMalformedError{
Msg: fmt.Sprintf("transactionGroup: [%d] had zero Group but was submitted in a group of %d", gi, len(txgroup)),
Reason: ledgercore.TxGroupMalformedErrorReasonEmptyGroupID,
}
}
}

// If we had a non-zero Group value, check that all group members are present.
if group.TxGroupHashes != nil {
if txgroup[0].Txn.Group != crypto.HashObj(group) {
return fmt.Errorf("transactionGroup: incomplete group: %v != %v (%v)",
txgroup[0].Txn.Group, crypto.HashObj(group), group)
return &ledgercore.TxGroupMalformedError{
Msg: fmt.Sprintf("transactionGroup: incomplete group: %v != %v (%v)",
txgroup[0].Txn.Group, crypto.HashObj(group), group),
Reason: ledgercore.TxGroupMalformedErrorReasonIncompleteGroup,
}
}
}

Expand All @@ -884,7 +896,8 @@ func (eval *BlockEvaluator) TestTransaction(txn transactions.SignedTxn) error {

err = txn.Txn.WellFormed(eval.specials, eval.proto)
if err != nil {
return fmt.Errorf("transaction %v: malformed: %v", txn.ID(), err)
txnErr := ledgercore.TxnNotWellFormedError(fmt.Sprintf("transaction %v: malformed: %v", txn.ID(), err))
return &txnErr
}

// Transaction already in the ledger?
Expand Down
2 changes: 1 addition & 1 deletion ledger/internal/eval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ func (ledger *evalTestLedger) CheckDup(currentProto config.ConsensusParams, curr
}
currentTxid := txn.Txn.ID()
if bytes.Equal(txid[:], currentTxid[:]) {
return &ledgercore.TransactionInLedgerError{Txid: txid}
return &ledgercore.TransactionInLedgerError{Txid: txid, InBlockEvaluator: false}
}
}
}
Expand Down
26 changes: 19 additions & 7 deletions ledger/ledgercore/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,19 @@ import (
// ErrNoSpace indicates insufficient space for transaction in block
var ErrNoSpace = errors.New("block does not have space for transaction")

// TransactionInLedgerError is returned when a transaction cannot be added because it has already been done
// TxnNotWellFormedError indicates a transaction was not well-formed when evaluated by the BlockEvaluator
//msgp:ignore TxnNotWellFormedError
type TxnNotWellFormedError string

func (err *TxnNotWellFormedError) Error() string {
return string(*err)
}

// TransactionInLedgerError is returned when a transaction cannot be added because it has already been committed, either
// to the blockchain's ledger or to the history of changes tracked by a BlockEvaluator.
type TransactionInLedgerError struct {
Txid transactions.Txid
Txid transactions.Txid
InBlockEvaluator bool
}

// Error satisfies builtin interface `error`
Expand All @@ -39,15 +49,17 @@ func (tile TransactionInLedgerError) Error() string {

// LeaseInLedgerError is returned when a transaction cannot be added because it has a lease that already being used in the relevant rounds
type LeaseInLedgerError struct {
txid transactions.Txid
lease Txlease
txid transactions.Txid
lease Txlease
InBlockEvaluator bool
}

// MakeLeaseInLedgerError builds a LeaseInLedgerError object
func MakeLeaseInLedgerError(txid transactions.Txid, lease Txlease) *LeaseInLedgerError {
func MakeLeaseInLedgerError(txid transactions.Txid, lease Txlease, inBlockEvaluator bool) *LeaseInLedgerError {
return &LeaseInLedgerError{
txid: txid,
lease: lease,
txid: txid,
lease: lease,
InBlockEvaluator: inBlockEvaluator,
}
}

Expand Down
4 changes: 2 additions & 2 deletions ledger/txtail.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,13 +343,13 @@ func (t *txTail) checkDup(proto config.ConsensusParams, current basics.Round, fi
for rnd := firstChecked; rnd <= lastChecked; rnd++ {
expires, ok := t.recent[rnd].txleases[txl]
if ok && current <= expires {
return ledgercore.MakeLeaseInLedgerError(txid, txl)
return ledgercore.MakeLeaseInLedgerError(txid, txl, false)
}
}
}

if _, confirmed := t.lastValid[lastValid][txid]; confirmed {
return &ledgercore.TransactionInLedgerError{Txid: txid}
return &ledgercore.TransactionInLedgerError{Txid: txid, InBlockEvaluator: false}
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion ledger/txtail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func TestTxTailLoadFromDisk(t *testing.T) {
txn.Txn.FirstValid, txn.Txn.LastValid, txn.Txn.ID(),
txl)
if r >= ledger.Latest()-testTxTailValidityRange {
require.Equal(t, ledgercore.MakeLeaseInLedgerError(txn.Txn.ID(), txl), dupResult)
require.Equal(t, ledgercore.MakeLeaseInLedgerError(txn.Txn.ID(), txl, false), dupResult)
} else {
require.Equal(t, &errTxTailMissingRound{round: txn.Txn.LastValid}, dupResult)
}
Expand Down