From da7064186d940602b1f28f4fbb4b6116cd3fa1e4 Mon Sep 17 00:00:00 2001 From: cce <51567+cce@users.noreply.github.com> Date: Thu, 2 Jan 2025 22:10:42 -0500 Subject: [PATCH 01/11] add params and check EnableOnlineAccountCatchpoints in catchpointFileWriter --- ledger/catchpointfilewriter.go | 26 ++++++++----- ledger/catchpointfilewriter_test.go | 57 +++++++++++++++++++---------- ledger/catchpointtracker.go | 8 ++-- ledger/catchpointtracker_test.go | 7 ++-- 4 files changed, 63 insertions(+), 35 deletions(-) diff --git a/ledger/catchpointfilewriter.go b/ledger/catchpointfilewriter.go index 606da98aff..2e7d8f4401 100644 --- a/ledger/catchpointfilewriter.go +++ b/ledger/catchpointfilewriter.go @@ -24,6 +24,7 @@ import ( "os" "path/filepath" + "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/ledger/encoded" "github.com/algorand/go-algorand/ledger/ledgercore" "github.com/algorand/go-algorand/ledger/store/trackerdb" @@ -53,6 +54,7 @@ const ( type catchpointFileWriter struct { ctx context.Context tx trackerdb.SnapshotScope + params config.ConsensusParams filePath string totalAccounts uint64 totalKVs uint64 @@ -104,7 +106,7 @@ func (data catchpointStateProofVerificationContext) ToBeHashed() (protocol.HashI return protocol.StateProofVerCtx, protocol.Encode(&data) } -func makeCatchpointFileWriter(ctx context.Context, filePath string, tx trackerdb.SnapshotScope, maxResourcesPerChunk int) (*catchpointFileWriter, error) { +func makeCatchpointFileWriter(ctx context.Context, params config.ConsensusParams, filePath string, tx trackerdb.SnapshotScope, maxResourcesPerChunk int) (*catchpointFileWriter, error) { aw, err := tx.MakeAccountsReader() if err != nil { return nil, err @@ -120,14 +122,17 @@ func makeCatchpointFileWriter(ctx context.Context, filePath string, tx trackerdb return nil, err } - totalOnlineAccounts, err := aw.TotalOnlineAccountRows(ctx) - if err != nil { - return nil, err - } + var totalOnlineAccounts, totalOnlineRoundParams uint64 + if params.EnableCatchpointsWithOnlineAccounts { + totalOnlineAccounts, err = aw.TotalOnlineAccountRows(ctx) + if err != nil { + return nil, err + } - totalOnlineRoundParams, err := aw.TotalOnlineRoundParams(ctx) - if err != nil { - return nil, err + totalOnlineRoundParams, err = aw.TotalOnlineRoundParams(ctx) + if err != nil { + return nil, err + } } err = os.MkdirAll(filepath.Dir(filePath), 0700) @@ -147,6 +152,7 @@ func makeCatchpointFileWriter(ctx context.Context, filePath string, tx trackerdb res := &catchpointFileWriter{ ctx: ctx, tx: tx, + params: params, filePath: filePath, totalAccounts: totalAccounts, totalKVs: totalKVs, @@ -370,7 +376,7 @@ func (cw *catchpointFileWriter) readDatabaseStep(ctx context.Context) error { cw.kvDone = true } - if !cw.onlineAccountsDone { + if cw.params.EnableOnlineAccountCatchpoints && !cw.onlineAccountsDone { // Create the OnlineAccounts iterator JIT if cw.onlineAccountRows == nil { rows, err := cw.tx.MakeOnlineAccountsIter(ctx, false) @@ -399,7 +405,7 @@ func (cw *catchpointFileWriter) readDatabaseStep(ctx context.Context) error { cw.onlineAccountsDone = true } - if !cw.onlineRoundParamsDone { + if cw.params.EnableOnlineAccountCatchpoints && !cw.onlineRoundParamsDone { // Create the OnlineRoundParams iterator JIT if cw.onlineRoundParamsRows == nil { rows, err := cw.tx.MakeOnlineRoundParamsIter(ctx, false) diff --git a/ledger/catchpointfilewriter_test.go b/ledger/catchpointfilewriter_test.go index d091cd5331..d3480e6d8d 100644 --- a/ledger/catchpointfilewriter_test.go +++ b/ledger/catchpointfilewriter_test.go @@ -149,7 +149,7 @@ func verifyStateProofVerificationContextWrite(t *testing.T, data []ledgercore.St require.NoError(t, err) err = ml.trackerDB().Transaction(func(ctx context.Context, tx trackerdb.TransactionScope) (err error) { - writer, err := makeCatchpointFileWriter(context.Background(), fileName, tx, ResourcesPerCatchpointFileChunk) + writer, err := makeCatchpointFileWriter(context.Background(), protoParams, fileName, tx, ResourcesPerCatchpointFileChunk) if err != nil { return err } @@ -264,7 +264,7 @@ func TestBasicCatchpointWriter(t *testing.T) { fileName := filepath.Join(temporaryDirectory, "15.data") err = ml.trackerDB().Transaction(func(ctx context.Context, tx trackerdb.TransactionScope) (err error) { - writer, err := makeCatchpointFileWriter(context.Background(), fileName, tx, ResourcesPerCatchpointFileChunk) + writer, err := makeCatchpointFileWriter(context.Background(), protoParams, fileName, tx, ResourcesPerCatchpointFileChunk) if err != nil { return err } @@ -297,7 +297,7 @@ func TestBasicCatchpointWriter(t *testing.T) { require.Equal(t, uint64(len(accts)), uint64(len(chunk.Balances))) } -func testWriteCatchpoint(t *testing.T, rdb trackerdb.Store, datapath string, filepath string, maxResourcesPerChunk int) CatchpointFileHeader { +func testWriteCatchpoint(t *testing.T, params config.ConsensusParams, rdb trackerdb.Store, datapath string, filepath string, maxResourcesPerChunk int) CatchpointFileHeader { var totalAccounts, totalKVs, totalOnlineAccounts, totalOnlineRoundParams, totalChunks uint64 var biggestChunkLen uint64 var accountsRnd basics.Round @@ -307,7 +307,7 @@ func testWriteCatchpoint(t *testing.T, rdb trackerdb.Store, datapath string, fil } err := rdb.Transaction(func(ctx context.Context, tx trackerdb.TransactionScope) (err error) { - writer, err := makeCatchpointFileWriter(context.Background(), datapath, tx, maxResourcesPerChunk) + writer, err := makeCatchpointFileWriter(context.Background(), params, datapath, tx, maxResourcesPerChunk) if err != nil { return err } @@ -440,7 +440,7 @@ func TestCatchpointReadDatabaseOverflowSingleAccount(t *testing.T) { totalAccountsWritten := uint64(0) totalResources := 0 totalChunks := 0 - cw, err := makeCatchpointFileWriter(context.Background(), catchpointDataFilePath, tx, maxResourcesPerChunk) + cw, err := makeCatchpointFileWriter(context.Background(), protoParams, catchpointDataFilePath, tx, maxResourcesPerChunk) require.NoError(t, err) ar, err := tx.MakeAccountsReader() @@ -546,7 +546,7 @@ func TestCatchpointReadDatabaseOverflowAccounts(t *testing.T) { totalAccountsWritten := uint64(0) totalResources := 0 - cw, err := makeCatchpointFileWriter(context.Background(), catchpointDataFilePath, tx, maxResourcesPerChunk) + cw, err := makeCatchpointFileWriter(context.Background(), protoParams, catchpointDataFilePath, tx, maxResourcesPerChunk) require.NoError(t, err) // repeat this until read all accts @@ -604,7 +604,7 @@ func TestFullCatchpointWriterOverflowAccounts(t *testing.T) { catchpointDataFilePath := filepath.Join(temporaryDirectory, "15.data") catchpointFilePath := filepath.Join(temporaryDirectory, "15.catchpoint") const maxResourcesPerChunk = 5 - testWriteCatchpoint(t, ml.trackerDB(), catchpointDataFilePath, catchpointFilePath, maxResourcesPerChunk) + testWriteCatchpoint(t, protoParams, ml.trackerDB(), catchpointDataFilePath, catchpointFilePath, maxResourcesPerChunk) l := testNewLedgerFromCatchpoint(t, ml.trackerDB(), catchpointFilePath) defer l.Close() @@ -802,7 +802,7 @@ func TestFullCatchpointWriter(t *testing.T) { catchpointDataFilePath := filepath.Join(temporaryDirectory, "15.data") catchpointFilePath := filepath.Join(temporaryDirectory, "15.catchpoint") - testWriteCatchpoint(t, ml.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0) + testWriteCatchpoint(t, protoParams, ml.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0) l := testNewLedgerFromCatchpoint(t, ml.trackerDB(), catchpointFilePath) defer l.Close() @@ -833,9 +833,25 @@ func TestExactAccountChunk(t *testing.T) { partitiontest.PartitionTest(t) t.Parallel() + t.Run("v33", func(t *testing.T) { + proto := protocol.ConsensusV33 + testExactAccountChunk(t, proto, 1) + }) + t.Run("v34", func(t *testing.T) { + proto := protocol.ConsensusV34 + testExactAccountChunk(t, proto, 2) + }) + t.Run("future", func(t *testing.T) { + proto := protocol.ConsensusFuture + testExactAccountChunk(t, proto, 2) + }) +} + +func testExactAccountChunk(t *testing.T, proto protocol.ConsensusVersion, totalChunks int) { genBalances, addrs, _ := ledgertesting.NewTestGenesis() cfg := config.GetDefaultLocal() - dl := NewDoubleLedger(t, genBalances, protocol.ConsensusFuture, cfg) + + dl := NewDoubleLedger(t, genBalances, proto, cfg) defer dl.Close() pay := txntest.Txn{ @@ -879,8 +895,8 @@ func TestExactAccountChunk(t *testing.T) { catchpointDataFilePath := filepath.Join(tempDir, t.Name()+".data") catchpointFilePath := filepath.Join(tempDir, t.Name()+".catchpoint.tar.gz") - cph := testWriteCatchpoint(t, dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0) - require.EqualValues(t, cph.TotalChunks, 2) + cph := testWriteCatchpoint(t, config.Consensus[proto], dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0) + require.EqualValues(t, cph.TotalChunks, totalChunks) l := testNewLedgerFromCatchpoint(t, dl.generator.trackerDB(), catchpointFilePath) defer l.Close() @@ -894,7 +910,8 @@ func TestCatchpointAfterTxns(t *testing.T) { genBalances, addrs, _ := ledgertesting.NewTestGenesis() cfg := config.GetDefaultLocal() - dl := NewDoubleLedger(t, genBalances, protocol.ConsensusFuture, cfg) + proto := protocol.ConsensusFuture + dl := NewDoubleLedger(t, genBalances, proto, cfg) defer dl.Close() boxApp := dl.fundedApp(addrs[1], 1_000_000, boxAppSource) @@ -931,7 +948,7 @@ func TestCatchpointAfterTxns(t *testing.T) { catchpointDataFilePath := filepath.Join(tempDir, t.Name()+".data") catchpointFilePath := filepath.Join(tempDir, t.Name()+".catchpoint.tar.gz") - cph := testWriteCatchpoint(t, dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0) + cph := testWriteCatchpoint(t, config.Consensus[proto], dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0) require.EqualValues(t, 3, cph.TotalChunks) l := testNewLedgerFromCatchpoint(t, dl.validator.trackerDB(), catchpointFilePath) @@ -947,7 +964,7 @@ func TestCatchpointAfterTxns(t *testing.T) { dl.fullBlock(&newacctpay) // Write and read back in, and ensure even the last effect exists. - cph = testWriteCatchpoint(t, dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0) + cph = testWriteCatchpoint(t, config.Consensus[proto], dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0) require.EqualValues(t, cph.TotalChunks, 3) // Still only 3 chunks, as last was in a recent block // Drive home the point that `last` is _not_ included in the catchpoint by inspecting balance read from catchpoint. @@ -963,7 +980,7 @@ func TestCatchpointAfterTxns(t *testing.T) { dl.fullBlock(pay.Noted(strconv.Itoa(i))) } - cph = testWriteCatchpoint(t, dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0) + cph = testWriteCatchpoint(t, config.Consensus[proto], dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0) require.EqualValues(t, cph.TotalChunks, 4) l = testNewLedgerFromCatchpoint(t, dl.validator.trackerDB(), catchpointFilePath) @@ -994,7 +1011,8 @@ func TestCatchpointAfterStakeLookupTxns(t *testing.T) { ledgertesting.TurnOffRewards(cfg) }) cfg := config.GetDefaultLocal() - dl := NewDoubleLedger(t, genBalances, protocol.ConsensusFuture, cfg, simpleLedgerOnDisk()) + proto := protocol.ConsensusFuture + dl := NewDoubleLedger(t, genBalances, proto, cfg, simpleLedgerOnDisk()) defer dl.Close() initialStake := uint64(833333333333333) @@ -1117,7 +1135,7 @@ assert catchpointDataFilePath := filepath.Join(tempDir, t.Name()+".data") catchpointFilePath := filepath.Join(tempDir, t.Name()+".catchpoint.tar.gz") - cph := testWriteCatchpoint(t, dl.generator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0) + cph := testWriteCatchpoint(t, config.Consensus[proto], dl.generator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0) require.EqualValues(t, 7, cph.TotalChunks) l := testNewLedgerFromCatchpoint(t, dl.generator.trackerDB(), catchpointFilePath) @@ -1161,7 +1179,8 @@ func TestCatchpointAfterBoxTxns(t *testing.T) { genBalances, addrs, _ := ledgertesting.NewTestGenesis() cfg := config.GetDefaultLocal() - dl := NewDoubleLedger(t, genBalances, protocol.ConsensusFuture, cfg) + proto := protocol.ConsensusFuture + dl := NewDoubleLedger(t, genBalances, proto, cfg) defer dl.Close() boxApp := dl.fundedApp(addrs[1], 1_000_000, boxAppSource) @@ -1216,7 +1235,7 @@ func TestCatchpointAfterBoxTxns(t *testing.T) { catchpointDataFilePath := filepath.Join(tempDir, t.Name()+".data") catchpointFilePath := filepath.Join(tempDir, t.Name()+".catchpoint.tar.gz") - cph := testWriteCatchpoint(t, dl.generator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0) + cph := testWriteCatchpoint(t, config.Consensus[proto], dl.generator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0) require.EqualValues(t, 3, cph.TotalChunks) l := testNewLedgerFromCatchpoint(t, dl.generator.trackerDB(), catchpointFilePath) diff --git a/ledger/catchpointtracker.go b/ledger/catchpointtracker.go index 603672f8e6..45bfbdae0b 100644 --- a/ledger/catchpointtracker.go +++ b/ledger/catchpointtracker.go @@ -264,7 +264,7 @@ func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basic catchpointGenerationStats.BalancesWriteTime = uint64(updatingBalancesDuration.Nanoseconds()) totalAccounts, totalKVs, totalOnlineAccounts, totalOnlineRoundParams, totalChunks, biggestChunkLen, err = ct.generateCatchpointData( - ctx, dbRound, &catchpointGenerationStats, spVerificationEncodedData) + ctx, params, dbRound, &catchpointGenerationStats, spVerificationEncodedData) ct.catchpointDataWriting.Store(0) if err != nil { return err @@ -1209,7 +1209,7 @@ func (ct *catchpointTracker) isWritingCatchpointDataFile() bool { // - Balance and KV chunk (named balances.x.msgpack). // ... // - Balance and KV chunk (named balances.x.msgpack). -func (ct *catchpointTracker) generateCatchpointData(ctx context.Context, accountsRound basics.Round, catchpointGenerationStats *telemetryspec.CatchpointGenerationEventDetails, encodedSPData []byte) (totalAccounts, totalKVs, totalOnlineAccounts, totalOnlineRoundParams, totalChunks, biggestChunkLen uint64, err error) { +func (ct *catchpointTracker) generateCatchpointData(ctx context.Context, params config.ConsensusParams, accountsRound basics.Round, catchpointGenerationStats *telemetryspec.CatchpointGenerationEventDetails, encodedSPData []byte) (totalAccounts, totalKVs, totalOnlineAccounts, totalOnlineRoundParams, totalChunks, biggestChunkLen uint64, err error) { ct.log.Debugf("catchpointTracker.generateCatchpointData() writing catchpoint accounts for round %d", accountsRound) startTime := time.Now() @@ -1233,7 +1233,7 @@ func (ct *catchpointTracker) generateCatchpointData(ctx context.Context, account start := time.Now() ledgerGeneratecatchpointCount.Inc(nil) err = ct.dbs.SnapshotContext(ctx, func(dbCtx context.Context, tx trackerdb.SnapshotScope) (err error) { - catchpointWriter, err = makeCatchpointFileWriter(dbCtx, catchpointDataFilePath, tx, ResourcesPerCatchpointFileChunk) + catchpointWriter, err = makeCatchpointFileWriter(dbCtx, params, catchpointDataFilePath, tx, ResourcesPerCatchpointFileChunk) if err != nil { return } @@ -1382,6 +1382,8 @@ func (ct *catchpointTracker) recordFirstStageInfo(ctx context.Context, tx tracke With("balancesWriteTime", catchpointGenerationStats.BalancesWriteTime). With("accountsCount", catchpointGenerationStats.AccountsCount). With("kvsCount", catchpointGenerationStats.KVsCount). + With("onlineAccountsCount", catchpointGenerationStats.OnlineAccountsCount). + With("onlineRoundParamsCount", catchpointGenerationStats.OnlineRoundParamsCount). With("fileSize", catchpointGenerationStats.FileSize). With("MerkleTrieRootHash", catchpointGenerationStats.MerkleTrieRootHash). With("SPVerificationCtxsHash", catchpointGenerationStats.SPVerificationCtxsHash). diff --git a/ledger/catchpointtracker_test.go b/ledger/catchpointtracker_test.go index ff84cabcb3..3c2b766081 100644 --- a/ledger/catchpointtracker_test.go +++ b/ledger/catchpointtracker_test.go @@ -362,9 +362,10 @@ func createCatchpoint(t *testing.T, ct *catchpointTracker, accountsRound basics. spVerificationEncodedData, stateProofVerificationHash, err := ct.getSPVerificationData() require.NoError(t, err) + proto := protocol.ConsensusCurrentVersion var catchpointGenerationStats telemetryspec.CatchpointGenerationEventDetails _, _, _, _, _, biggestChunkLen, err := ct.generateCatchpointData( - context.Background(), accountsRound, &catchpointGenerationStats, spVerificationEncodedData) + context.Background(), config.Consensus[proto], accountsRound, &catchpointGenerationStats, spVerificationEncodedData) require.NoError(t, err) require.Equal(t, calculateStateProofVerificationHash(t, ml), stateProofVerificationHash) @@ -372,7 +373,7 @@ func createCatchpoint(t *testing.T, ct *catchpointTracker, accountsRound basics. err = ct.createCatchpoint( context.Background(), accountsRound, round, trackerdb.CatchpointFirstStageInfo{BiggestChunkLen: biggestChunkLen}, - crypto.Digest{}, protocol.ConsensusCurrentVersion) + crypto.Digest{}, proto) require.NoError(t, err) } @@ -605,7 +606,7 @@ func BenchmarkLargeCatchpointDataWriting(b *testing.B) { encodedSPData, _, err := ct.getSPVerificationData() require.NoError(b, err) b.ResetTimer() - ct.generateCatchpointData(context.Background(), basics.Round(0), &catchpointGenerationStats, encodedSPData) + ct.generateCatchpointData(context.Background(), proto, basics.Round(0), &catchpointGenerationStats, encodedSPData) b.StopTimer() b.ReportMetric(float64(accountsNumber), "accounts") } From ba2dea20ee250d1de1e72b4c33ae8f85373d22be Mon Sep 17 00:00:00 2001 From: cce <51567+cce@users.noreply.github.com> Date: Thu, 2 Jan 2025 22:11:21 -0500 Subject: [PATCH 02/11] catchpointdump: add onlineaccounts and onlineroundparams --- cmd/catchpointdump/database.go | 8 +++ cmd/catchpointdump/file.go | 89 +++++++++++++++++++++++++++++++++- cmd/catchpointdump/net.go | 9 +++- 3 files changed, 104 insertions(+), 2 deletions(-) diff --git a/cmd/catchpointdump/database.go b/cmd/catchpointdump/database.go index 31f18cc867..57b05e84c8 100644 --- a/cmd/catchpointdump/database.go +++ b/cmd/catchpointdump/database.go @@ -88,6 +88,14 @@ var databaseCmd = &cobra.Command{ if err != nil { reportErrorf("Unable to print state proof verification database : %v", err) } + err = printOnlineAccounts(ledgerTrackerFilename, ledgerTrackerStaging, outFile) + if err != nil { + reportErrorf("Unable to print online accounts : %v", err) + } + err = printOnlineRoundParams(ledgerTrackerFilename, ledgerTrackerStaging, outFile) + if err != nil { + reportErrorf("Unable to print online round params : %v", err) + } }, } diff --git a/cmd/catchpointdump/file.go b/cmd/catchpointdump/file.go index eeda7f25ec..869cbdd1e3 100644 --- a/cmd/catchpointdump/file.go +++ b/cmd/catchpointdump/file.go @@ -142,6 +142,14 @@ var fileCmd = &cobra.Command{ if err != nil { reportErrorf("Unable to print state proof verification database : %v", err) } + err = printOnlineAccounts("./ledger.tracker.sqlite", true, outFile) + if err != nil { + reportErrorf("Unable to print online accounts : %v", err) + } + err = printOnlineRoundParams("./ledger.tracker.sqlite", true, outFile) + if err != nil { + reportErrorf("Unable to print online round params : %v", err) + } } }, } @@ -219,8 +227,17 @@ func loadCatchpointIntoDatabase(ctx context.Context, catchupAccessor ledger.Catc if err != nil { return fileHeader, err } - fmt.Printf("accounts digest=%s, spver digest=%s, onlineaccounts digest=%s onlineroundparams digest=%s\n\n", + fmt.Printf("accounts digest=%s, spver digest=%s, onlineaccounts digest=%s onlineroundparams digest=%s\n", balanceHash, spverHash, onlineAccountsHash, onlineRoundParamsHash) + + fmt.Printf("Catchpoint label: %s\n", fileHeader.Catchpoint) + // make v7 label + v7Label := ledgercore.MakeCatchpointLabelMakerV7(fileHeader.BlocksRound, &fileHeader.BlockHeaderDigest, &balanceHash, fileHeader.Totals, &spverHash) + fmt.Printf("catchpoint v7 label: %s\n", ledgercore.MakeLabel(v7Label)) + + // make v8 label (current) + v8Label := ledgercore.MakeCatchpointLabelMakerCurrent(fileHeader.BlocksRound, &fileHeader.BlockHeaderDigest, &balanceHash, fileHeader.Totals, &spverHash, &onlineAccountsHash, &onlineRoundParamsHash) + fmt.Printf("catchpoint v8 label: %s\n\n", ledgercore.MakeLabel(v8Label)) } return fileHeader, nil } @@ -296,6 +313,8 @@ func printAccountsDatabase(databaseName string, stagingTables bool, fileHeader l "Catchpoint: %s", "Total Accounts: %d", "Total KVs: %d", + "Total Online Accounts: %d", + "Total Online Round Params: %d", "Total Chunks: %d", } var headerValues = []interface{}{ @@ -306,6 +325,8 @@ func printAccountsDatabase(databaseName string, stagingTables bool, fileHeader l fileHeader.Catchpoint, fileHeader.TotalAccounts, fileHeader.TotalKVs, + fileHeader.TotalOnlineAccounts, + fileHeader.TotalOnlineRoundParams, fileHeader.TotalChunks, } // safety check @@ -560,3 +581,69 @@ func printKeyValueStore(databaseName string, stagingTables bool, outFile *os.Fil return nil }) } + +func printOnlineAccounts(databaseName string, stagingTables bool, outFile *os.File) error { + fmt.Printf("\n") + + fileWriter := bufio.NewWriterSize(outFile, 1024*1024) + defer fileWriter.Flush() + + dbAccessor, err := db.MakeAccessor(databaseName, true, false) + if err != nil || dbAccessor.Handle == nil { + return err + } + + return dbAccessor.Atomic(func(ctx context.Context, tx *sql.Tx) error { + rows, err := sqlitedriver.MakeOnlineAccountsIter(ctx, tx, stagingTables) + if err != nil { + return err + } + defer rows.Close() + for rows.Next() { + row, err := rows.GetItem() + if err != nil { + return err + } + jsonData, err := json.Marshal(row) + if err != nil { + return err + } + + fmt.Fprintf(fileWriter, "onlineaccount: %s\n", string(jsonData)) + } + return nil + }) +} + +func printOnlineRoundParams(databaseName string, stagingTables bool, outFile *os.File) error { + fmt.Printf("\n") + + fileWriter := bufio.NewWriterSize(outFile, 1024*1024) + defer fileWriter.Flush() + + dbAccessor, err := db.MakeAccessor(databaseName, true, false) + if err != nil || dbAccessor.Handle == nil { + return err + } + + return dbAccessor.Atomic(func(ctx context.Context, tx *sql.Tx) error { + rows, err := sqlitedriver.MakeOnlineRoundParamsIter(ctx, tx, stagingTables) + if err != nil { + return err + } + defer rows.Close() + for rows.Next() { + row, err := rows.GetItem() + if err != nil { + return err + } + jsonData, err := json.Marshal(row) + if err != nil { + return err + } + + fmt.Fprintf(fileWriter, "onlineroundparams: %s\n", string(jsonData)) + } + return nil + }) +} diff --git a/cmd/catchpointdump/net.go b/cmd/catchpointdump/net.go index 41e1fd1dd0..1ca11502bf 100644 --- a/cmd/catchpointdump/net.go +++ b/cmd/catchpointdump/net.go @@ -365,7 +365,14 @@ func loadAndDump(addr string, tarFile string, genesisInitState ledgercore.InitSt if err != nil { return err } - + err = printOnlineAccounts("./ledger.tracker.sqlite", true, outFile) + if err != nil { + return err + } + err = printOnlineRoundParams("./ledger.tracker.sqlite", true, outFile) + if err != nil { + return err + } } return nil } From a12b3597ec28afca35b1fe8eda97400ca8e0a27d Mon Sep 17 00:00:00 2001 From: cce <51567+cce@users.noreply.github.com> Date: Fri, 3 Jan 2025 01:07:11 -0500 Subject: [PATCH 03/11] handle votersTracker setting lowestRound to lower than dbRound+1-320 --- ledger/catchpointfilewriter.go | 9 ++- ledger/catchpointtracker.go | 40 +++++++++--- ledger/catchupaccessor.go | 9 +-- ledger/store/trackerdb/generickv/reader.go | 5 +- .../trackerdb/sqlitedriver/accountsV2.go | 6 +- .../store/trackerdb/sqlitedriver/kvsIter.go | 65 +++++++++++++++++-- .../trackerdb/sqlitedriver/sqlitedriver.go | 8 +-- ledger/store/trackerdb/store.go | 5 +- 8 files changed, 115 insertions(+), 32 deletions(-) diff --git a/ledger/catchpointfilewriter.go b/ledger/catchpointfilewriter.go index 2e7d8f4401..e2a1c40ce1 100644 --- a/ledger/catchpointfilewriter.go +++ b/ledger/catchpointfilewriter.go @@ -25,6 +25,7 @@ import ( "path/filepath" "github.com/algorand/go-algorand/config" + "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/ledger/encoded" "github.com/algorand/go-algorand/ledger/ledgercore" "github.com/algorand/go-algorand/ledger/store/trackerdb" @@ -69,6 +70,7 @@ type catchpointFileWriter struct { biggestChunkLen uint64 accountsIterator trackerdb.EncodedAccountsBatchIter maxResourcesPerChunk int + onlineExcludeBefore basics.Round accountsDone bool kvRows trackerdb.KVsIter kvDone bool @@ -106,7 +108,7 @@ func (data catchpointStateProofVerificationContext) ToBeHashed() (protocol.HashI return protocol.StateProofVerCtx, protocol.Encode(&data) } -func makeCatchpointFileWriter(ctx context.Context, params config.ConsensusParams, filePath string, tx trackerdb.SnapshotScope, maxResourcesPerChunk int) (*catchpointFileWriter, error) { +func makeCatchpointFileWriter(ctx context.Context, params config.ConsensusParams, filePath string, tx trackerdb.SnapshotScope, maxResourcesPerChunk int, onlineExcludeBefore basics.Round) (*catchpointFileWriter, error) { aw, err := tx.MakeAccountsReader() if err != nil { return nil, err @@ -163,6 +165,7 @@ func makeCatchpointFileWriter(ctx context.Context, params config.ConsensusParams tar: tar, accountsIterator: tx.MakeEncodedAccountsBatchIter(), maxResourcesPerChunk: maxResourcesPerChunk, + onlineExcludeBefore: onlineExcludeBefore, } return res, nil } @@ -379,7 +382,7 @@ func (cw *catchpointFileWriter) readDatabaseStep(ctx context.Context) error { if cw.params.EnableOnlineAccountCatchpoints && !cw.onlineAccountsDone { // Create the OnlineAccounts iterator JIT if cw.onlineAccountRows == nil { - rows, err := cw.tx.MakeOnlineAccountsIter(ctx, false) + rows, err := cw.tx.MakeOnlineAccountsIter(ctx, false, cw.onlineExcludeBefore) if err != nil { return err } @@ -408,7 +411,7 @@ func (cw *catchpointFileWriter) readDatabaseStep(ctx context.Context) error { if cw.params.EnableOnlineAccountCatchpoints && !cw.onlineRoundParamsDone { // Create the OnlineRoundParams iterator JIT if cw.onlineRoundParamsRows == nil { - rows, err := cw.tx.MakeOnlineRoundParamsIter(ctx, false) + rows, err := cw.tx.MakeOnlineRoundParamsIter(ctx, false, cw.onlineExcludeBefore) if err != nil { return err } diff --git a/ledger/catchpointtracker.go b/ledger/catchpointtracker.go index 45bfbdae0b..07754410ce 100644 --- a/ledger/catchpointtracker.go +++ b/ledger/catchpointtracker.go @@ -213,7 +213,7 @@ func (ct *catchpointTracker) getSPVerificationData() (encodedData []byte, spVeri return encodedData, spVerificationHash, nil } -func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basics.Round, blockProto protocol.ConsensusVersion, updatingBalancesDuration time.Duration) error { +func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basics.Round, onlineAccountsForgetBefore basics.Round, blockProto protocol.ConsensusVersion, updatingBalancesDuration time.Duration) error { ct.log.Infof("finishing catchpoint's first stage dbRound: %d", dbRound) var totalAccounts, totalKVs, totalOnlineAccounts, totalOnlineRoundParams uint64 @@ -223,8 +223,26 @@ func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basic var spVerificationEncodedData []byte var catchpointGenerationStats telemetryspec.CatchpointGenerationEventDetails var onlineAccountsHash, onlineRoundParamsHash crypto.Digest - params := config.Consensus[blockProto] + + // Usually onlineAccountsForgetBefore is dbRound - params.MaxBalLookback (320 roudns of history), + // but if votersTracker needs more state, it can set lowestRound to be earlier than that. + // We want to only write MaxBalLookback rounds of history to the catchpoint file. + var onlineExcludeBefore basics.Round + if (dbRound + 1).SubSaturate(basics.Round(params.MaxBalLookback)) == onlineAccountsForgetBefore { + // this is the common case, so we pass 0 so the DB dumps the full table, as is + onlineExcludeBefore = 0 + } else if (dbRound + 1).SubSaturate(basics.Round(params.MaxBalLookback)) > onlineAccountsForgetBefore { + // the previous flush left more online-related rows than we want in the DB. we need to tell + // the catchpoint writer to exclude the rows that are older than the ones we want to keep. + onlineExcludeBefore = (dbRound + 1).SubSaturate(basics.Round(params.MaxBalLookback)) + } else { + // The previous flush left less online-related rows than we want in the DB. This should not happen; return error + ct.log.Errorf("catchpointTracker.finishFirstStage: dbRound %d and onlineAccountsForgetBefore %d has less history than MaxBalLookback %d", + dbRound, onlineAccountsForgetBefore, params.MaxBalLookback) + return errors.New("catchpointTracker.finishFirstStage: onlineAccountsForgetBefore doesn't provide enough history") + } + if params.EnableCatchpointsWithSPContexts { // Generate the SP Verification hash and encoded data. The hash is used in the label when tracking catchpoints, // and the encoded data for that hash will be added to the catchpoint file if catchpoint generation is enabled. @@ -238,13 +256,13 @@ func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basic // Generate hashes of the onlineaccounts and onlineroundparams tables. err := ct.dbs.Snapshot(func(ctx context.Context, tx trackerdb.SnapshotScope) error { var dbErr error - onlineAccountsHash, _, dbErr = calculateVerificationHash(ctx, tx.MakeOnlineAccountsIter, false) + onlineAccountsHash, _, dbErr = calculateVerificationHash(ctx, tx.MakeOnlineAccountsIter, onlineExcludeBefore, false) if dbErr != nil { return dbErr } - onlineRoundParamsHash, _, dbErr = calculateVerificationHash(ctx, tx.MakeOnlineRoundParamsIter, false) + onlineRoundParamsHash, _, dbErr = calculateVerificationHash(ctx, tx.MakeOnlineRoundParamsIter, onlineExcludeBefore, false) if dbErr != nil { return dbErr } @@ -264,7 +282,7 @@ func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basic catchpointGenerationStats.BalancesWriteTime = uint64(updatingBalancesDuration.Nanoseconds()) totalAccounts, totalKVs, totalOnlineAccounts, totalOnlineRoundParams, totalChunks, biggestChunkLen, err = ct.generateCatchpointData( - ctx, params, dbRound, &catchpointGenerationStats, spVerificationEncodedData) + ctx, params, dbRound, onlineExcludeBefore, &catchpointGenerationStats, spVerificationEncodedData) ct.catchpointDataWriting.Store(0) if err != nil { return err @@ -308,7 +326,11 @@ func (ct *catchpointTracker) finishFirstStageAfterCrash(dbRound basics.Round, bl return err } - return ct.finishFirstStage(context.Background(), dbRound, blockProto, 0) + // pass dbRound+1-maxBalLookback as the onlineExcludeBefore parameter: since we can't be sure whether + // there are more than 320 rounds of history in the online accounts tables, this ensures the catchpoint + // will only contain the most recent 320 rounds. + onlineExcludeBefore := (dbRound + 1).SubSaturate(basics.Round(config.Consensus[blockProto].MaxBalLookback)) + return ct.finishFirstStage(context.Background(), dbRound, onlineExcludeBefore, blockProto, 0) } func (ct *catchpointTracker) finishCatchpointsAfterCrash(blockProto protocol.ConsensusVersion, catchpointLookback uint64) error { @@ -977,7 +999,7 @@ func (ct *catchpointTracker) postCommitUnlocked(ctx context.Context, dcc *deferr if dcc.catchpointFirstStage { round := dcc.newBase() blockProto := dcc.committedProtocolVersion[round-dcc.oldBase-1] - err := ct.finishFirstStage(ctx, round, blockProto, dcc.updatingBalancesDuration) + err := ct.finishFirstStage(ctx, round, dcc.onlineAccountsForgetBefore, blockProto, dcc.updatingBalancesDuration) if err != nil { ct.log.Warnf( "error finishing catchpoint's first stage dcc.newBase: %d err: %v", @@ -1209,7 +1231,7 @@ func (ct *catchpointTracker) isWritingCatchpointDataFile() bool { // - Balance and KV chunk (named balances.x.msgpack). // ... // - Balance and KV chunk (named balances.x.msgpack). -func (ct *catchpointTracker) generateCatchpointData(ctx context.Context, params config.ConsensusParams, accountsRound basics.Round, catchpointGenerationStats *telemetryspec.CatchpointGenerationEventDetails, encodedSPData []byte) (totalAccounts, totalKVs, totalOnlineAccounts, totalOnlineRoundParams, totalChunks, biggestChunkLen uint64, err error) { +func (ct *catchpointTracker) generateCatchpointData(ctx context.Context, params config.ConsensusParams, accountsRound basics.Round, onlineExcludeBefore basics.Round, catchpointGenerationStats *telemetryspec.CatchpointGenerationEventDetails, encodedSPData []byte) (totalAccounts, totalKVs, totalOnlineAccounts, totalOnlineRoundParams, totalChunks, biggestChunkLen uint64, err error) { ct.log.Debugf("catchpointTracker.generateCatchpointData() writing catchpoint accounts for round %d", accountsRound) startTime := time.Now() @@ -1233,7 +1255,7 @@ func (ct *catchpointTracker) generateCatchpointData(ctx context.Context, params start := time.Now() ledgerGeneratecatchpointCount.Inc(nil) err = ct.dbs.SnapshotContext(ctx, func(dbCtx context.Context, tx trackerdb.SnapshotScope) (err error) { - catchpointWriter, err = makeCatchpointFileWriter(dbCtx, params, catchpointDataFilePath, tx, ResourcesPerCatchpointFileChunk) + catchpointWriter, err = makeCatchpointFileWriter(dbCtx, params, catchpointDataFilePath, tx, ResourcesPerCatchpointFileChunk, onlineExcludeBefore) if err != nil { return } diff --git a/ledger/catchupaccessor.go b/ledger/catchupaccessor.go index 7418ebc98c..68e6c12461 100644 --- a/ledger/catchupaccessor.go +++ b/ledger/catchupaccessor.go @@ -1031,12 +1031,12 @@ func (c *catchpointCatchupAccessorImpl) GetVerifyData(ctx context.Context) (bala return fmt.Errorf("unable to get state proof verification data: %v", err) } - onlineAccountsHash, _, err = calculateVerificationHash(ctx, tx.MakeOnlineAccountsIter, true) + onlineAccountsHash, _, err = calculateVerificationHash(ctx, tx.MakeOnlineAccountsIter, 0, true) if err != nil { return fmt.Errorf("unable to get online accounts verification data: %v", err) } - onlineRoundParamsHash, _, err = calculateVerificationHash(ctx, tx.MakeOnlineRoundParamsIter, true) + onlineRoundParamsHash, _, err = calculateVerificationHash(ctx, tx.MakeOnlineRoundParamsIter, 0, true) if err != nil { return fmt.Errorf("unable to get online round params verification data: %v", err) } @@ -1058,11 +1058,12 @@ func (c *catchpointCatchupAccessorImpl) GetVerifyData(ctx context.Context) (bala // both at restore time (in catchpointCatchupAccessorImpl) and snapshot time (in catchpointTracker). func calculateVerificationHash[T crypto.Hashable]( ctx context.Context, - iterFactory func(context.Context, bool) (trackerdb.TableIterator[T], error), + iterFactory func(context.Context, bool, basics.Round) (trackerdb.TableIterator[T], error), + excludeBefore basics.Round, useStaging bool, ) (crypto.Digest, uint64, error) { - rows, err := iterFactory(ctx, useStaging) + rows, err := iterFactory(ctx, useStaging, excludeBefore) if err != nil { return crypto.Digest{}, 0, err } diff --git a/ledger/store/trackerdb/generickv/reader.go b/ledger/store/trackerdb/generickv/reader.go index bbbfadc9c9..ddbc894761 100644 --- a/ledger/store/trackerdb/generickv/reader.go +++ b/ledger/store/trackerdb/generickv/reader.go @@ -20,6 +20,7 @@ import ( "context" "github.com/algorand/go-algorand/config" + "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/ledger/encoded" "github.com/algorand/go-algorand/ledger/store/trackerdb" ) @@ -79,13 +80,13 @@ func (r *reader) MakeKVsIter(ctx context.Context) (trackerdb.KVsIter, error) { } // MakeOnlineAccountsIter implements trackerdb.Reader -func (r *reader) MakeOnlineAccountsIter(context.Context, bool) (trackerdb.TableIterator[*encoded.OnlineAccountRecordV6], error) { +func (r *reader) MakeOnlineAccountsIter(context.Context, bool, basics.Round) (trackerdb.TableIterator[*encoded.OnlineAccountRecordV6], error) { // TODO: catchpoint panic("unimplemented") } // MakeOnlineRoundParamsIter implements trackerdb.Reader -func (r *reader) MakeOnlineRoundParamsIter(context.Context, bool) (trackerdb.TableIterator[*encoded.OnlineRoundParamsRecordV6], error) { +func (r *reader) MakeOnlineRoundParamsIter(context.Context, bool, basics.Round) (trackerdb.TableIterator[*encoded.OnlineRoundParamsRecordV6], error) { // TODO: catchpoint panic("unimplemented") } diff --git a/ledger/store/trackerdb/sqlitedriver/accountsV2.go b/ledger/store/trackerdb/sqlitedriver/accountsV2.go index b5443f0cda..7f2eba4520 100644 --- a/ledger/store/trackerdb/sqlitedriver/accountsV2.go +++ b/ledger/store/trackerdb/sqlitedriver/accountsV2.go @@ -723,7 +723,11 @@ func (w *accountsV2Writer) TxtailNewRound(ctx context.Context, baseRound basics. // After this cleanup runs, accounts in this table will have either one entry (if all entries besides the latest are expired), // or will have more than one entry (if multiple entries are not yet expired). func (w *accountsV2Writer) OnlineAccountsDelete(forgetBefore basics.Round) (err error) { - rows, err := w.e.Query("SELECT rowid, address, updRound, data FROM onlineaccounts WHERE updRound < ? ORDER BY address, updRound DESC", forgetBefore) + return w.onlineAccountsDelete(forgetBefore, "onlineaccounts") +} + +func (w *accountsV2Writer) onlineAccountsDelete(forgetBefore basics.Round, table string) (err error) { + rows, err := w.e.Query("SELECT rowid, address, updRound, data FROM %s WHERE updRound < ? ORDER BY address, updRound DESC", table, forgetBefore) if err != nil { return err } diff --git a/ledger/store/trackerdb/sqlitedriver/kvsIter.go b/ledger/store/trackerdb/sqlitedriver/kvsIter.go index 05fc769d6b..04f40d5254 100644 --- a/ledger/store/trackerdb/sqlitedriver/kvsIter.go +++ b/ledger/store/trackerdb/sqlitedriver/kvsIter.go @@ -25,6 +25,7 @@ import ( "github.com/algorand/go-algorand/ledger/encoded" "github.com/algorand/go-algorand/ledger/ledgercore" "github.com/algorand/go-algorand/ledger/store/trackerdb" + "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/util/db" ) @@ -62,29 +63,73 @@ func (iter *kvsIter) Close() { // tableIterator is used to dump onlineaccounts and onlineroundparams tables for catchpoints. type tableIterator[T any] struct { - rows *sql.Rows - scan func(*sql.Rows) (T, error) + rows *sql.Rows + scan func(*sql.Rows) (T, error) + onClose func() } func (iter *tableIterator[T]) Next() bool { return iter.rows.Next() } -func (iter *tableIterator[T]) Close() { iter.rows.Close() } +func (iter *tableIterator[T]) Close() { + iter.rows.Close() + if iter.onClose != nil { + iter.onClose() + } +} func (iter *tableIterator[T]) GetItem() (T, error) { return iter.scan(iter.rows) } // MakeOnlineAccountsIter creates an onlineAccounts iterator. -func MakeOnlineAccountsIter(ctx context.Context, q db.Queryable, useStaging bool) (trackerdb.TableIterator[*encoded.OnlineAccountRecordV6], error) { +func MakeOnlineAccountsIter(ctx context.Context, q db.Queryable, useStaging bool, excludeBefore basics.Round) (trackerdb.TableIterator[*encoded.OnlineAccountRecordV6], error) { table := "onlineaccounts" if useStaging { table = "catchpointonlineaccounts" } + var onClose func() + if excludeBefore != 0 { + // cheat: use Rdb to make a temporary table that we will delete later + e, ok := q.(*sql.Tx) + if !ok { + return nil, fmt.Errorf("MakeOnlineAccountsIter: cannot convert Queryable to sql.Tx") + } + // create a new table by selecting from the original table + destTable := table + "_iterator" + _, err := e.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", destTable)) + if err != nil { + return nil, err + } + _, err = e.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s AS SELECT * FROM %s", destTable, table)) + if err != nil { + return nil, err + } + // call prune on the new copied table, using the same logic as OnlineAccountsDelete + aw := accountsV2Writer{e: e} + err = aw.onlineAccountsDelete(excludeBefore, destTable) + if err != nil { + return nil, err + } + // remember to drop the table when the iterator is closed + onClose = func() { + _, err = e.ExecContext(ctx, fmt.Sprintf("DROP TABLE %s", destTable)) + if err != nil { + logging.Base().Errorf("Failed to drop table %s: %v", destTable, err) + } + } + // use the new table to create the iterator + table = destTable + } + rows, err := q.QueryContext(ctx, fmt.Sprintf("SELECT address, updround, normalizedonlinebalance, votelastvalid, data FROM %s ORDER BY address, updround", table)) if err != nil { return nil, err } - return &tableIterator[*encoded.OnlineAccountRecordV6]{rows: rows, scan: scanOnlineAccount}, nil + return &tableIterator[*encoded.OnlineAccountRecordV6]{ + rows: rows, + scan: scanOnlineAccount, + onClose: onClose, + }, nil } func scanOnlineAccount(rows *sql.Rows) (*encoded.OnlineAccountRecordV6, error) { @@ -136,12 +181,18 @@ func scanOnlineAccount(rows *sql.Rows) (*encoded.OnlineAccountRecordV6, error) { } // MakeOnlineRoundParamsIter creates an onlineRoundParams iterator. -func MakeOnlineRoundParamsIter(ctx context.Context, q db.Queryable, useStaging bool) (trackerdb.TableIterator[*encoded.OnlineRoundParamsRecordV6], error) { +func MakeOnlineRoundParamsIter(ctx context.Context, q db.Queryable, useStaging bool, excludeBefore basics.Round) (trackerdb.TableIterator[*encoded.OnlineRoundParamsRecordV6], error) { table := "onlineroundparamstail" if useStaging { table = "catchpointonlineroundparamstail" } - rows, err := q.QueryContext(ctx, fmt.Sprintf("SELECT rnd, data FROM %s ORDER BY rnd", table)) + + where := "" + if excludeBefore != 0 { + where = fmt.Sprintf("WHERE rnd >= %d", excludeBefore) + } + + rows, err := q.QueryContext(ctx, fmt.Sprintf("SELECT rnd, data FROM %s %s ORDER BY rnd", table, where)) if err != nil { return nil, err } diff --git a/ledger/store/trackerdb/sqlitedriver/sqlitedriver.go b/ledger/store/trackerdb/sqlitedriver/sqlitedriver.go index 247744e01f..f39b9c993d 100644 --- a/ledger/store/trackerdb/sqlitedriver/sqlitedriver.go +++ b/ledger/store/trackerdb/sqlitedriver/sqlitedriver.go @@ -212,13 +212,13 @@ func (r *sqlReader) MakeKVsIter(ctx context.Context) (trackerdb.KVsIter, error) } // MakeOnlineAccountsIter implements trackerdb.Reader -func (r *sqlReader) MakeOnlineAccountsIter(ctx context.Context, useStaging bool) (trackerdb.TableIterator[*encoded.OnlineAccountRecordV6], error) { - return MakeOnlineAccountsIter(ctx, r.q, useStaging) +func (r *sqlReader) MakeOnlineAccountsIter(ctx context.Context, useStaging bool, excludeBefore basics.Round) (trackerdb.TableIterator[*encoded.OnlineAccountRecordV6], error) { + return MakeOnlineAccountsIter(ctx, r.q, useStaging, excludeBefore) } // MakeOnlineRoundParamsIter implements trackerdb.Reader -func (r *sqlReader) MakeOnlineRoundParamsIter(ctx context.Context, useStaging bool) (trackerdb.TableIterator[*encoded.OnlineRoundParamsRecordV6], error) { - return MakeOnlineRoundParamsIter(ctx, r.q, useStaging) +func (r *sqlReader) MakeOnlineRoundParamsIter(ctx context.Context, useStaging bool, excludeBefore basics.Round) (trackerdb.TableIterator[*encoded.OnlineRoundParamsRecordV6], error) { + return MakeOnlineRoundParamsIter(ctx, r.q, useStaging, excludeBefore) } type sqlWriter struct { diff --git a/ledger/store/trackerdb/store.go b/ledger/store/trackerdb/store.go index 66f7fd0f19..f08bf46987 100644 --- a/ledger/store/trackerdb/store.go +++ b/ledger/store/trackerdb/store.go @@ -20,6 +20,7 @@ import ( "context" "time" + "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/ledger/encoded" "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/util/db" @@ -66,8 +67,8 @@ type Reader interface { MakeCatchpointReader() (CatchpointReader, error) MakeEncodedAccountsBatchIter() EncodedAccountsBatchIter MakeKVsIter(ctx context.Context) (KVsIter, error) - MakeOnlineAccountsIter(ctx context.Context, useStaging bool) (TableIterator[*encoded.OnlineAccountRecordV6], error) - MakeOnlineRoundParamsIter(ctx context.Context, useStaging bool) (TableIterator[*encoded.OnlineRoundParamsRecordV6], error) + MakeOnlineAccountsIter(ctx context.Context, useStaging bool, excludeBefore basics.Round) (TableIterator[*encoded.OnlineAccountRecordV6], error) + MakeOnlineRoundParamsIter(ctx context.Context, useStaging bool, excludeBefore basics.Round) (TableIterator[*encoded.OnlineRoundParamsRecordV6], error) } // Writer is the interface for the trackerdb write operations. From ef0ae38f1e49b92dc3bed952f06e2b9eefd8fdbe Mon Sep 17 00:00:00 2001 From: cce <51567+cce@users.noreply.github.com> Date: Fri, 3 Jan 2025 01:30:22 -0500 Subject: [PATCH 04/11] fix remaining compilation errors from interface change --- cmd/catchpointdump/file.go | 4 ++-- ledger/catchpointfilewriter_test.go | 22 +++++++++---------- ledger/catchpointtracker_test.go | 4 ++-- .../store/trackerdb/dualdriver/dualdriver.go | 5 +++-- 4 files changed, 18 insertions(+), 17 deletions(-) diff --git a/cmd/catchpointdump/file.go b/cmd/catchpointdump/file.go index 869cbdd1e3..5c28bcde1e 100644 --- a/cmd/catchpointdump/file.go +++ b/cmd/catchpointdump/file.go @@ -594,7 +594,7 @@ func printOnlineAccounts(databaseName string, stagingTables bool, outFile *os.Fi } return dbAccessor.Atomic(func(ctx context.Context, tx *sql.Tx) error { - rows, err := sqlitedriver.MakeOnlineAccountsIter(ctx, tx, stagingTables) + rows, err := sqlitedriver.MakeOnlineAccountsIter(ctx, tx, stagingTables, 0) if err != nil { return err } @@ -627,7 +627,7 @@ func printOnlineRoundParams(databaseName string, stagingTables bool, outFile *os } return dbAccessor.Atomic(func(ctx context.Context, tx *sql.Tx) error { - rows, err := sqlitedriver.MakeOnlineRoundParamsIter(ctx, tx, stagingTables) + rows, err := sqlitedriver.MakeOnlineRoundParamsIter(ctx, tx, stagingTables, 0) if err != nil { return err } diff --git a/ledger/catchpointfilewriter_test.go b/ledger/catchpointfilewriter_test.go index d3480e6d8d..9b855027cc 100644 --- a/ledger/catchpointfilewriter_test.go +++ b/ledger/catchpointfilewriter_test.go @@ -149,7 +149,7 @@ func verifyStateProofVerificationContextWrite(t *testing.T, data []ledgercore.St require.NoError(t, err) err = ml.trackerDB().Transaction(func(ctx context.Context, tx trackerdb.TransactionScope) (err error) { - writer, err := makeCatchpointFileWriter(context.Background(), protoParams, fileName, tx, ResourcesPerCatchpointFileChunk) + writer, err := makeCatchpointFileWriter(context.Background(), protoParams, fileName, tx, ResourcesPerCatchpointFileChunk, 0) if err != nil { return err } @@ -264,7 +264,7 @@ func TestBasicCatchpointWriter(t *testing.T) { fileName := filepath.Join(temporaryDirectory, "15.data") err = ml.trackerDB().Transaction(func(ctx context.Context, tx trackerdb.TransactionScope) (err error) { - writer, err := makeCatchpointFileWriter(context.Background(), protoParams, fileName, tx, ResourcesPerCatchpointFileChunk) + writer, err := makeCatchpointFileWriter(context.Background(), protoParams, fileName, tx, ResourcesPerCatchpointFileChunk, 0) if err != nil { return err } @@ -307,7 +307,7 @@ func testWriteCatchpoint(t *testing.T, params config.ConsensusParams, rdb tracke } err := rdb.Transaction(func(ctx context.Context, tx trackerdb.TransactionScope) (err error) { - writer, err := makeCatchpointFileWriter(context.Background(), params, datapath, tx, maxResourcesPerChunk) + writer, err := makeCatchpointFileWriter(context.Background(), params, datapath, tx, maxResourcesPerChunk, 0) if err != nil { return err } @@ -440,7 +440,7 @@ func TestCatchpointReadDatabaseOverflowSingleAccount(t *testing.T) { totalAccountsWritten := uint64(0) totalResources := 0 totalChunks := 0 - cw, err := makeCatchpointFileWriter(context.Background(), protoParams, catchpointDataFilePath, tx, maxResourcesPerChunk) + cw, err := makeCatchpointFileWriter(context.Background(), protoParams, catchpointDataFilePath, tx, maxResourcesPerChunk, 0) require.NoError(t, err) ar, err := tx.MakeAccountsReader() @@ -546,7 +546,7 @@ func TestCatchpointReadDatabaseOverflowAccounts(t *testing.T) { totalAccountsWritten := uint64(0) totalResources := 0 - cw, err := makeCatchpointFileWriter(context.Background(), protoParams, catchpointDataFilePath, tx, maxResourcesPerChunk) + cw, err := makeCatchpointFileWriter(context.Background(), protoParams, catchpointDataFilePath, tx, maxResourcesPerChunk, 0) require.NoError(t, err) // repeat this until read all accts @@ -1113,18 +1113,18 @@ assert t.Log("DB round generator", genDBRound, "validator", valDBRound) t.Log("Latest round generator", genLatestRound, "validator", valLatestRound) - genOAHash, genOARows, err := calculateVerificationHash(context.Background(), dl.generator.trackerDB().MakeOnlineAccountsIter, false) + genOAHash, genOARows, err := calculateVerificationHash(context.Background(), dl.generator.trackerDB().MakeOnlineAccountsIter, 0, false) require.NoError(t, err) - valOAHash, valOARows, err := calculateVerificationHash(context.Background(), dl.validator.trackerDB().MakeOnlineAccountsIter, false) + valOAHash, valOARows, err := calculateVerificationHash(context.Background(), dl.validator.trackerDB().MakeOnlineAccountsIter, 0, false) require.NoError(t, err) require.Equal(t, genOAHash, valOAHash) require.NotZero(t, genOAHash) require.Equal(t, genOARows, valOARows) require.NotZero(t, genOARows) - genORPHash, genORPRows, err := calculateVerificationHash(context.Background(), dl.generator.trackerDB().MakeOnlineRoundParamsIter, false) + genORPHash, genORPRows, err := calculateVerificationHash(context.Background(), dl.generator.trackerDB().MakeOnlineRoundParamsIter, 0, false) require.NoError(t, err) - valORPHash, valORPRows, err := calculateVerificationHash(context.Background(), dl.validator.trackerDB().MakeOnlineRoundParamsIter, false) + valORPHash, valORPRows, err := calculateVerificationHash(context.Background(), dl.validator.trackerDB().MakeOnlineRoundParamsIter, 0, false) require.NoError(t, err) require.Equal(t, genORPHash, valORPHash) require.NotZero(t, genORPHash) @@ -1141,13 +1141,13 @@ assert l := testNewLedgerFromCatchpoint(t, dl.generator.trackerDB(), catchpointFilePath) defer l.Close() - catchpointOAHash, catchpointOARows, err := calculateVerificationHash(context.Background(), l.trackerDBs.MakeOnlineAccountsIter, false) + catchpointOAHash, catchpointOARows, err := calculateVerificationHash(context.Background(), l.trackerDBs.MakeOnlineAccountsIter, 0, false) require.NoError(t, err) require.Equal(t, genOAHash, catchpointOAHash) t.Log("catchpoint onlineaccounts hash", catchpointOAHash, "matches") require.Equal(t, genOARows, catchpointOARows) - catchpointORPHash, catchpointORPRows, err := calculateVerificationHash(context.Background(), l.trackerDBs.MakeOnlineRoundParamsIter, false) + catchpointORPHash, catchpointORPRows, err := calculateVerificationHash(context.Background(), l.trackerDBs.MakeOnlineRoundParamsIter, 0, false) require.NoError(t, err) require.Equal(t, genORPHash, catchpointORPHash) t.Log("catchpoint onlineroundparams hash", catchpointORPHash, "matches") diff --git a/ledger/catchpointtracker_test.go b/ledger/catchpointtracker_test.go index 3c2b766081..0acf74f3b1 100644 --- a/ledger/catchpointtracker_test.go +++ b/ledger/catchpointtracker_test.go @@ -365,7 +365,7 @@ func createCatchpoint(t *testing.T, ct *catchpointTracker, accountsRound basics. proto := protocol.ConsensusCurrentVersion var catchpointGenerationStats telemetryspec.CatchpointGenerationEventDetails _, _, _, _, _, biggestChunkLen, err := ct.generateCatchpointData( - context.Background(), config.Consensus[proto], accountsRound, &catchpointGenerationStats, spVerificationEncodedData) + context.Background(), config.Consensus[proto], accountsRound, 0, &catchpointGenerationStats, spVerificationEncodedData) require.NoError(t, err) require.Equal(t, calculateStateProofVerificationHash(t, ml), stateProofVerificationHash) @@ -606,7 +606,7 @@ func BenchmarkLargeCatchpointDataWriting(b *testing.B) { encodedSPData, _, err := ct.getSPVerificationData() require.NoError(b, err) b.ResetTimer() - ct.generateCatchpointData(context.Background(), proto, basics.Round(0), &catchpointGenerationStats, encodedSPData) + ct.generateCatchpointData(context.Background(), proto, 0, 0, &catchpointGenerationStats, encodedSPData) b.StopTimer() b.ReportMetric(float64(accountsNumber), "accounts") } diff --git a/ledger/store/trackerdb/dualdriver/dualdriver.go b/ledger/store/trackerdb/dualdriver/dualdriver.go index 382db683a7..b9b0196d3a 100644 --- a/ledger/store/trackerdb/dualdriver/dualdriver.go +++ b/ledger/store/trackerdb/dualdriver/dualdriver.go @@ -24,6 +24,7 @@ import ( "sync" "time" + "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/ledger/encoded" "github.com/algorand/go-algorand/ledger/store/trackerdb" "github.com/algorand/go-algorand/logging" @@ -278,13 +279,13 @@ func (*reader) MakeKVsIter(ctx context.Context) (trackerdb.KVsIter, error) { } // MakeOnlineAccountsIter implements trackerdb.Reader -func (*reader) MakeOnlineAccountsIter(context.Context, bool) (trackerdb.TableIterator[*encoded.OnlineAccountRecordV6], error) { +func (*reader) MakeOnlineAccountsIter(context.Context, bool, basics.Round) (trackerdb.TableIterator[*encoded.OnlineAccountRecordV6], error) { // TODO: catchpoint return nil, nil } // MakeOnlineRoundParamsIter implements trackerdb.Reader -func (*reader) MakeOnlineRoundParamsIter(context.Context, bool) (trackerdb.TableIterator[*encoded.OnlineRoundParamsRecordV6], error) { +func (*reader) MakeOnlineRoundParamsIter(context.Context, bool, basics.Round) (trackerdb.TableIterator[*encoded.OnlineRoundParamsRecordV6], error) { // TODO: catchpoint return nil, nil } From 9cade980dc193e8df84293c60c83cd179cb5c45a Mon Sep 17 00:00:00 2001 From: cce <51567+cce@users.noreply.github.com> Date: Fri, 3 Jan 2025 01:38:40 -0500 Subject: [PATCH 05/11] fix onlineAccountsDelete table selection --- ledger/store/trackerdb/sqlitedriver/accountsV2.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ledger/store/trackerdb/sqlitedriver/accountsV2.go b/ledger/store/trackerdb/sqlitedriver/accountsV2.go index 7f2eba4520..708b0af673 100644 --- a/ledger/store/trackerdb/sqlitedriver/accountsV2.go +++ b/ledger/store/trackerdb/sqlitedriver/accountsV2.go @@ -727,7 +727,7 @@ func (w *accountsV2Writer) OnlineAccountsDelete(forgetBefore basics.Round) (err } func (w *accountsV2Writer) onlineAccountsDelete(forgetBefore basics.Round, table string) (err error) { - rows, err := w.e.Query("SELECT rowid, address, updRound, data FROM %s WHERE updRound < ? ORDER BY address, updRound DESC", table, forgetBefore) + rows, err := w.e.Query(fmt.Sprintf("SELECT rowid, address, updRound, data FROM %s WHERE updRound < ? ORDER BY address, updRound DESC", table), forgetBefore) if err != nil { return err } From 50994b0fca16e96fcc2d4d0059e3f052d976381f Mon Sep 17 00:00:00 2001 From: cce <51567+cce@users.noreply.github.com> Date: Tue, 7 Jan 2025 14:32:34 -0500 Subject: [PATCH 06/11] update TestAccuontOnlineRoundParams, TestOnlineAccountsDeletion, and TestAcctOnlineVotersLongerHistory --- ledger/acctdeltas_test.go | 94 ++++++++++++++++++- ledger/acctonline_test.go | 49 ++++++++++ .../trackerdb/sqlitedriver/accountsV2.go | 6 +- .../store/trackerdb/sqlitedriver/kvsIter.go | 2 +- 4 files changed, 145 insertions(+), 6 deletions(-) diff --git a/ledger/acctdeltas_test.go b/ledger/acctdeltas_test.go index b16657dc64..4dad63d475 100644 --- a/ledger/acctdeltas_test.go +++ b/ledger/acctdeltas_test.go @@ -2750,6 +2750,27 @@ func TestAccountOnlineRoundParams(t *testing.T) { require.Equal(t, onlineRoundParams, dbOnlineRoundParams[1:]) require.Equal(t, maxRounds, int(endRound)) + // Use MakeOnlineRoundParamsIter to dump all data, starting from 10 + iter, err := sqlitedriver.MakeOnlineRoundParamsIter(context.Background(), tx, false, 10) + require.NoError(t, err) + defer iter.Close() + var roundParamsIterData []ledgercore.OnlineRoundParamsData + var roundParamsIterLastRound basics.Round + for iter.Next() { + item, err := iter.GetItem() + require.NoError(t, err) + + var orpData ledgercore.OnlineRoundParamsData + err = protocol.Decode(item.Data, &orpData) + require.NoError(t, err) + roundParamsIterLastRound = item.Round + + roundParamsIterData = append(roundParamsIterData, orpData) + } + require.Equal(t, onlineRoundParams[9:], roundParamsIterData) + require.Equal(t, maxRounds, int(roundParamsIterLastRound)) + + // Prune online round params to rnd 10 err = arw.AccountsPruneOnlineRoundParams(10) require.NoError(t, err) @@ -2772,6 +2793,15 @@ func TestAccountOnlineRoundParams(t *testing.T) { func TestOnlineAccountsDeletion(t *testing.T) { partitiontest.PartitionTest(t) + t.Run("delete", func(t *testing.T) { + runTestOnlineAccountsDeletion(t, testOnlineAccountsDeletion) + }) + t.Run("excludeBefore", func(t *testing.T) { + runTestOnlineAccountsDeletion(t, testOnlineAccountsExcludeBefore) + }) +} + +func runTestOnlineAccountsDeletion(t *testing.T, assertFunc func(*testing.T, basics.Address, basics.Address, *sql.Tx)) { dbs, _ := storetesting.DbOpenTest(t, true) storetesting.SetDbLogging(t, dbs) defer dbs.Close() @@ -2783,8 +2813,6 @@ func TestOnlineAccountsDeletion(t *testing.T) { var accts map[basics.Address]basics.AccountData sqlitedriver.AccountsInitTest(t, tx, accts, protocol.ConsensusCurrentVersion) - arw := sqlitedriver.NewAccountsSQLReaderWriter(tx) - updates := compactOnlineAccountDeltas{} addrA := ledgertesting.RandomAddress() addrB := ledgertesting.RandomAddress() @@ -2837,6 +2865,12 @@ func TestOnlineAccountsDeletion(t *testing.T) { require.NoError(t, err) require.Len(t, updated, 5) + assertFunc(t, addrA, addrB, tx) +} + +func testOnlineAccountsDeletion(t *testing.T, addrA, addrB basics.Address, tx *sql.Tx) { + arw := sqlitedriver.NewAccountsSQLReaderWriter(tx) + queries, err := sqlitedriver.OnlineAccountsInitDbQueries(tx) require.NoError(t, err) @@ -2898,6 +2932,62 @@ func TestOnlineAccountsDeletion(t *testing.T) { } } +// same assertions as testOnlineAccountsDeletion but with excludeBefore +func testOnlineAccountsExcludeBefore(t *testing.T, addrA, addrB basics.Address, tx *sql.Tx) { + // Use MakeOnlineAccountsIter to dump all data, starting from rnd + getAcctDataForRound := func(rnd basics.Round, expectedCount int64) map[basics.Address][]*encoded.OnlineAccountRecordV6 { + it, err := sqlitedriver.MakeOnlineAccountsIter(context.Background(), tx, false, rnd) + require.NoError(t, err) + + var count int64 + ret := make(map[basics.Address][]*encoded.OnlineAccountRecordV6) + for it.Next() { + acct, err := it.GetItem() + require.NoError(t, err) + ret[acct.Address] = append(ret[acct.Address], acct) + count++ + } + require.Equal(t, expectedCount, count) + return ret + } + + for _, rnd := range []basics.Round{1, 2, 3} { + vals := getAcctDataForRound(rnd, 5) + + history, ok := vals[addrA] + require.True(t, ok) + require.Len(t, history, 3) + + history, ok = vals[addrB] + require.True(t, ok) + require.Len(t, history, 2) + } + + for _, rnd := range []basics.Round{4, 5, 6, 7} { + vals := getAcctDataForRound(rnd, 3) + + history, ok := vals[addrA] + require.True(t, ok) + require.Len(t, history, 1) + + history, ok = vals[addrB] + require.True(t, ok) + require.Len(t, history, 2) + } + + for _, rnd := range []basics.Round{8, 9} { + vals := getAcctDataForRound(rnd, 2) + + history, ok := vals[addrA] + require.True(t, ok) + require.Len(t, history, 1) + + history, ok = vals[addrB] + require.True(t, ok) + require.Len(t, history, 1) + } +} + type mockOnlineAccountsErrorWriter struct { } diff --git a/ledger/acctonline_test.go b/ledger/acctonline_test.go index 296a5a2481..2871d6a4c8 100644 --- a/ledger/acctonline_test.go +++ b/ledger/acctonline_test.go @@ -1314,6 +1314,7 @@ func TestAcctOnlineVotersLongerHistory(t *testing.T) { require.NoError(t, err) require.Equal(t, oa.latest()-basics.Round(conf.MaxAcctLookback), endRound) require.Equal(t, maxBlocks-int(lowest)-int(conf.MaxAcctLookback)+1, len(dbOnlineRoundParams)) + require.Equal(t, endRound, oa.cachedDBRoundOnline) _, err = oa.onlineTotalsEx(lowest) require.NoError(t, err) @@ -1324,6 +1325,54 @@ func TestAcctOnlineVotersLongerHistory(t *testing.T) { // ensure the cache size for addrA does not have more entries than maxBalLookback + 1 // +1 comes from the deletion before X without checking account state at X require.Equal(t, maxBalLookback+1, oa.onlineAccountsCache.accounts[addrA].Len()) + + // Test if "excludeBefore" argument works for MakeOnlineAccountsIter & MakeOnlineRoundParamsIter + // when longer history is being used. Exclude rows older than round=lowest+2 + excludeRound := lowest + 2 + + // Test MakeOnlineAccountsIter + var foundCount int + err = oa.dbs.Snapshot(func(ctx context.Context, tx trackerdb.SnapshotScope) error { + // read staging = false, excludeBefore = excludeRound + it, err2 := tx.MakeOnlineAccountsIter(ctx, false, excludeRound) + require.NoError(t, err2) + defer it.Close() + + firstSeen := make(map[basics.Address]basics.Round) + for it.Next() { + acct, acctErr := it.GetItem() + require.NoError(t, acctErr) + // We expect all rows to either: + // - have updRound >= excludeRound + // - or have updRound < excludeRound, and only appear once in the iteration (no updates since excludeRound) + if acct.UpdateRound < excludeRound { + require.NotContains(t, firstSeen, acct.Address, "MakeOnlineAccountsIter produced two rows acct %s for dbRound %d updRound %d < excludeRound %d (first seen %d)", acct.Address, endRound, acct.UpdateRound, excludeRound, firstSeen[acct.Address]) + } + firstSeen[acct.Address] = acct.UpdateRound + foundCount++ + } + return nil + }) + require.NoError(t, err) + require.True(t, foundCount > 0, "Should see some accounts that satisfy updRound >= excludeRound") + + // Test MakeOnlineRoundParamsIter + foundCount = 0 + err = oa.dbs.Snapshot(func(ctx context.Context, tx trackerdb.SnapshotScope) error { + it, err2 := tx.MakeOnlineRoundParamsIter(ctx, false, excludeRound) + require.NoError(t, err2) + defer it.Close() + + for it.Next() { + roundParams, roundParamsErr := it.GetItem() + require.NoError(t, roundParamsErr) + require.True(t, roundParams.Round >= excludeRound, "MakeOnlineRoundParamsIter produced row for round %d < excludeRound %d", roundParams.Round, excludeRound) + foundCount++ + } + return nil + }) + require.NoError(t, err) + require.EqualValues(t, endRound-excludeRound+1, foundCount, "Should see all round params for rounds >= excludeRound") } // compareTopAccounts makes sure that accounts returned from OnlineTop function are sorted and contains the online accounts on the test diff --git a/ledger/store/trackerdb/sqlitedriver/accountsV2.go b/ledger/store/trackerdb/sqlitedriver/accountsV2.go index 708b0af673..8e87c8cc72 100644 --- a/ledger/store/trackerdb/sqlitedriver/accountsV2.go +++ b/ledger/store/trackerdb/sqlitedriver/accountsV2.go @@ -782,10 +782,10 @@ func (w *accountsV2Writer) onlineAccountsDelete(forgetBefore basics.Round, table rowids = append(rowids, rowid.Int64) } - return onlineAccountsDeleteByRowIDs(w.e, rowids) + return onlineAccountsDeleteByRowIDs(w.e, rowids, table) } -func onlineAccountsDeleteByRowIDs(e db.Executable, rowids []int64) (err error) { +func onlineAccountsDeleteByRowIDs(e db.Executable, rowids []int64, table string) (err error) { if len(rowids) == 0 { return } @@ -795,7 +795,7 @@ func onlineAccountsDeleteByRowIDs(e db.Executable, rowids []int64) (err error) { // rowids might be larger => split to chunks are remove chunks := rowidsToChunkedArgs(rowids) for _, chunk := range chunks { - _, err = e.Exec("DELETE FROM onlineaccounts WHERE rowid IN (?"+strings.Repeat(",?", len(chunk)-1)+")", chunk...) + _, err = e.Exec("DELETE FROM "+table+" WHERE rowid IN (?"+strings.Repeat(",?", len(chunk)-1)+")", chunk...) if err != nil { return } diff --git a/ledger/store/trackerdb/sqlitedriver/kvsIter.go b/ledger/store/trackerdb/sqlitedriver/kvsIter.go index 04f40d5254..92795ea3ed 100644 --- a/ledger/store/trackerdb/sqlitedriver/kvsIter.go +++ b/ledger/store/trackerdb/sqlitedriver/kvsIter.go @@ -91,7 +91,7 @@ func MakeOnlineAccountsIter(ctx context.Context, q db.Queryable, useStaging bool // cheat: use Rdb to make a temporary table that we will delete later e, ok := q.(*sql.Tx) if !ok { - return nil, fmt.Errorf("MakeOnlineAccountsIter: cannot convert Queryable to sql.Tx") + return nil, fmt.Errorf("MakeOnlineAccountsIter: cannot convert Queryable to sql.Tx, q is %T", q) } // create a new table by selecting from the original table destTable := table + "_iterator" From 7f8cdd1162ccb953a89db2e35a9f224876f3f0e8 Mon Sep 17 00:00:00 2001 From: cce <51567+cce@users.noreply.github.com> Date: Tue, 7 Jan 2025 14:38:33 -0500 Subject: [PATCH 07/11] remove unnecessary newline in catchpointdump --- cmd/catchpointdump/file.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/cmd/catchpointdump/file.go b/cmd/catchpointdump/file.go index 5c28bcde1e..1aa4b1f4e9 100644 --- a/cmd/catchpointdump/file.go +++ b/cmd/catchpointdump/file.go @@ -532,7 +532,6 @@ func printKeyValue(writer *bufio.Writer, key, value []byte) { } func printKeyValueStore(databaseName string, stagingTables bool, outFile *os.File) error { - fmt.Printf("\n") printDumpingCatchpointProgressLine(0, 50, 0) lastProgressUpdate := time.Now() progress := uint64(0) @@ -583,8 +582,6 @@ func printKeyValueStore(databaseName string, stagingTables bool, outFile *os.Fil } func printOnlineAccounts(databaseName string, stagingTables bool, outFile *os.File) error { - fmt.Printf("\n") - fileWriter := bufio.NewWriterSize(outFile, 1024*1024) defer fileWriter.Flush() @@ -616,8 +613,6 @@ func printOnlineAccounts(databaseName string, stagingTables bool, outFile *os.Fi } func printOnlineRoundParams(databaseName string, stagingTables bool, outFile *os.File) error { - fmt.Printf("\n") - fileWriter := bufio.NewWriterSize(outFile, 1024*1024) defer fileWriter.Flush() From 4fdd46a6da4d109865215f68c856ef242f15a5e4 Mon Sep 17 00:00:00 2001 From: cce <51567+cce@users.noreply.github.com> Date: Tue, 7 Jan 2025 14:52:46 -0500 Subject: [PATCH 08/11] update comments for CR --- ledger/catchpointtracker.go | 6 +++--- ledger/store/trackerdb/sqlitedriver/kvsIter.go | 18 ++++++++++++++++-- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/ledger/catchpointtracker.go b/ledger/catchpointtracker.go index 07754410ce..a07687b25a 100644 --- a/ledger/catchpointtracker.go +++ b/ledger/catchpointtracker.go @@ -326,11 +326,11 @@ func (ct *catchpointTracker) finishFirstStageAfterCrash(dbRound basics.Round, bl return err } - // pass dbRound+1-maxBalLookback as the onlineExcludeBefore parameter: since we can't be sure whether + // pass dbRound+1-maxBalLookback as the onlineAccountsForgetBefore parameter: since we can't be sure whether // there are more than 320 rounds of history in the online accounts tables, this ensures the catchpoint // will only contain the most recent 320 rounds. - onlineExcludeBefore := (dbRound + 1).SubSaturate(basics.Round(config.Consensus[blockProto].MaxBalLookback)) - return ct.finishFirstStage(context.Background(), dbRound, onlineExcludeBefore, blockProto, 0) + onlineAccountsForgetBefore := (dbRound + 1).SubSaturate(basics.Round(config.Consensus[blockProto].MaxBalLookback)) + return ct.finishFirstStage(context.Background(), dbRound, onlineAccountsForgetBefore, blockProto, 0) } func (ct *catchpointTracker) finishCatchpointsAfterCrash(blockProto protocol.ConsensusVersion, catchpointLookback uint64) error { diff --git a/ledger/store/trackerdb/sqlitedriver/kvsIter.go b/ledger/store/trackerdb/sqlitedriver/kvsIter.go index 92795ea3ed..9d9014af3b 100644 --- a/ledger/store/trackerdb/sqlitedriver/kvsIter.go +++ b/ledger/store/trackerdb/sqlitedriver/kvsIter.go @@ -79,7 +79,11 @@ func (iter *tableIterator[T]) GetItem() (T, error) { return iter.scan(iter.rows) } -// MakeOnlineAccountsIter creates an onlineAccounts iterator. +// MakeOnlineAccountsIter creates an onlineAccounts iterator, used by the catchpoint system to dump the +// onlineaccounts table to a catchpoint snapshot file. +// +// If excludeBefore is non-zero, the iterator will exclude all data that would have been deleted if +// OnlineAccountsDelete(excludeBefore) were called on this DB before calling MakeOnlineAccountsIter. func MakeOnlineAccountsIter(ctx context.Context, q db.Queryable, useStaging bool, excludeBefore basics.Round) (trackerdb.TableIterator[*encoded.OnlineAccountRecordV6], error) { table := "onlineaccounts" if useStaging { @@ -88,7 +92,17 @@ func MakeOnlineAccountsIter(ctx context.Context, q db.Queryable, useStaging bool var onClose func() if excludeBefore != 0 { - // cheat: use Rdb to make a temporary table that we will delete later + // This is a special case to resolve the issue found in #6214. When the state proof votersTracker has not + // yet validated the recent state proof, the onlineaccounts table will hold more than 320 rows, + // to support state proof recovery (votersTracker.lowestRound() sets deferredCommitRange.lowestRound). + // + // While rare, this may happen e.g. during catchup, where blocks may be flying by so quickly that the + // catchpoint snapshot is started before the latest state proof was validated. In this case, excludeBefore + // will be set to R-320 (MaxBalLookback) where R is the DB snapshot round (specified by CatchpointLookback). + // + // Unfortunately catchpoint snapshots occur within a SnapshotScope, and so a db.Queryable cannot + // execute DDL statements. To work around this, we create a temporary table that we will delete + // when the iterator is closed. e, ok := q.(*sql.Tx) if !ok { return nil, fmt.Errorf("MakeOnlineAccountsIter: cannot convert Queryable to sql.Tx, q is %T", q) From 0097856e9fda0ab875b92c303b0938d6c8dd1912 Mon Sep 17 00:00:00 2001 From: cce <51567+cce@users.noreply.github.com> Date: Tue, 7 Jan 2025 15:23:07 -0500 Subject: [PATCH 09/11] fix name confusion with EnableOnlineAccountCatchpoints --- ledger/catchpointfilewriter.go | 4 ++-- ledger/catchpointfilewriter_test.go | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/ledger/catchpointfilewriter.go b/ledger/catchpointfilewriter.go index e2a1c40ce1..0e09b7b44e 100644 --- a/ledger/catchpointfilewriter.go +++ b/ledger/catchpointfilewriter.go @@ -379,7 +379,7 @@ func (cw *catchpointFileWriter) readDatabaseStep(ctx context.Context) error { cw.kvDone = true } - if cw.params.EnableOnlineAccountCatchpoints && !cw.onlineAccountsDone { + if cw.params.EnableCatchpointsWithOnlineAccounts && !cw.onlineAccountsDone { // Create the OnlineAccounts iterator JIT if cw.onlineAccountRows == nil { rows, err := cw.tx.MakeOnlineAccountsIter(ctx, false, cw.onlineExcludeBefore) @@ -408,7 +408,7 @@ func (cw *catchpointFileWriter) readDatabaseStep(ctx context.Context) error { cw.onlineAccountsDone = true } - if cw.params.EnableOnlineAccountCatchpoints && !cw.onlineRoundParamsDone { + if cw.params.EnableCatchpointsWithOnlineAccounts && !cw.onlineRoundParamsDone { // Create the OnlineRoundParams iterator JIT if cw.onlineRoundParamsRows == nil { rows, err := cw.tx.MakeOnlineRoundParamsIter(ctx, false, cw.onlineExcludeBefore) diff --git a/ledger/catchpointfilewriter_test.go b/ledger/catchpointfilewriter_test.go index 9b855027cc..1bb8191569 100644 --- a/ledger/catchpointfilewriter_test.go +++ b/ledger/catchpointfilewriter_test.go @@ -831,14 +831,14 @@ func testCatchpointFlushRound(l *Ledger) { func TestExactAccountChunk(t *testing.T) { partitiontest.PartitionTest(t) - t.Parallel() + // t.Parallel() // probably not good to parallelize catchpoint file save/load - t.Run("v33", func(t *testing.T) { - proto := protocol.ConsensusV33 + t.Run("v39", func(t *testing.T) { + proto := protocol.ConsensusV39 testExactAccountChunk(t, proto, 1) }) - t.Run("v34", func(t *testing.T) { - proto := protocol.ConsensusV34 + t.Run("v40", func(t *testing.T) { + proto := protocol.ConsensusV40 testExactAccountChunk(t, proto, 2) }) t.Run("future", func(t *testing.T) { From 11a5f46b4f096a4bcc70f0eda9a8ce53ef14a0ad Mon Sep 17 00:00:00 2001 From: cce <51567+cce@users.noreply.github.com> Date: Wed, 8 Jan 2025 12:22:47 -0500 Subject: [PATCH 10/11] assert on catchpoint content in TestExactAccountChunk --- ledger/catchpointfilewriter_test.go | 66 ++++++++++++++++++++--------- 1 file changed, 46 insertions(+), 20 deletions(-) diff --git a/ledger/catchpointfilewriter_test.go b/ledger/catchpointfilewriter_test.go index 1bb8191569..1888aae774 100644 --- a/ledger/catchpointfilewriter_test.go +++ b/ledger/catchpointfilewriter_test.go @@ -818,7 +818,7 @@ func TestFullCatchpointWriter(t *testing.T) { // ensure both committed all pending changes before taking a catchpoint // another approach is to modify the test and craft round numbers, // and make the ledger to generate catchpoint itself when it is time -func testCatchpointFlushRound(l *Ledger) { +func testCatchpointFlushRound(l *Ledger) basics.Round { // Clear the timer to ensure a flush l.trackers.mu.Lock() l.trackers.lastFlushTime = time.Time{} @@ -827,36 +827,31 @@ func testCatchpointFlushRound(l *Ledger) { r, _ := l.LatestCommitted() l.trackers.committedUpTo(r) l.trackers.waitAccountsWriting() + return r } func TestExactAccountChunk(t *testing.T) { partitiontest.PartitionTest(t) // t.Parallel() // probably not good to parallelize catchpoint file save/load - t.Run("v39", func(t *testing.T) { - proto := protocol.ConsensusV39 - testExactAccountChunk(t, proto, 1) - }) - t.Run("v40", func(t *testing.T) { - proto := protocol.ConsensusV40 - testExactAccountChunk(t, proto, 2) - }) - t.Run("future", func(t *testing.T) { - proto := protocol.ConsensusFuture - testExactAccountChunk(t, proto, 2) - }) + t.Run("v39", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusV39) }) + t.Run("v40", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusV40) }) + t.Run("future", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusFuture) }) } -func testExactAccountChunk(t *testing.T, proto protocol.ConsensusVersion, totalChunks int) { - genBalances, addrs, _ := ledgertesting.NewTestGenesis() +func testExactAccountChunk(t *testing.T, proto protocol.ConsensusVersion) { + genBalances, addrs, _ := ledgertesting.NewTestGenesis(func(c *ledgertesting.GenesisCfg) { + c.OnlineCount = 1 // addrs[0] is online + }, ledgertesting.TurnOffRewards) cfg := config.GetDefaultLocal() dl := NewDoubleLedger(t, genBalances, proto, cfg) defer dl.Close() + payFrom := addrs[1] // offline account sends pays pay := txntest.Txn{ Type: "pay", - Sender: addrs[0], + Sender: payFrom, Amount: 1_000_000, } // There are 12 accounts in the NewTestGenesis, so we create more so that we @@ -871,13 +866,15 @@ func testExactAccountChunk(t *testing.T, proto protocol.ConsensusVersion, totalC // At least 32 more blocks so that we catchpoint after the accounts exist for i := 0; i < 40; i++ { selfpay := pay - selfpay.Receiver = addrs[0] + selfpay.Receiver = payFrom selfpay.Note = ledgertesting.RandomNote() dl.fullBlock(&selfpay) } - testCatchpointFlushRound(dl.generator) - testCatchpointFlushRound(dl.validator) + genR := testCatchpointFlushRound(dl.generator) + valR := testCatchpointFlushRound(dl.validator) + require.Equal(t, genR, valR) + require.EqualValues(t, BalancesPerCatchpointFileChunk-12+40, genR) // 540 (512-12+40) require.Eventually(t, func() bool { dl.generator.accts.accountsMu.RLock() @@ -896,7 +893,36 @@ func testExactAccountChunk(t *testing.T, proto protocol.ConsensusVersion, totalC catchpointFilePath := filepath.Join(tempDir, t.Name()+".catchpoint.tar.gz") cph := testWriteCatchpoint(t, config.Consensus[proto], dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0) - require.EqualValues(t, cph.TotalChunks, totalChunks) + + decodedData := readCatchpointFile(t, catchpointFilePath) + + // decode and verify some stats about balances chunk contents + var chunks []catchpointFileChunkV6 + for i, d := range decodedData { + t.Logf("section %d: %s", i, d.headerName) + if strings.HasPrefix(d.headerName, "balances.") { + var chunk catchpointFileChunkV6 + err := protocol.Decode(d.data, &chunk) + require.NoError(t, err) + t.Logf("chunk %d balances: %d, kvs: %d, onlineaccounts: %d, onlineroundparams: %d", i, len(chunk.Balances), len(chunk.KVs), len(chunk.OnlineAccounts), len(chunk.OnlineRoundParams)) + chunks = append(chunks, chunk) + } + } + if config.Consensus[proto].EnableCatchpointsWithOnlineAccounts { + require.Len(t, chunks, 3) + } else { + require.Len(t, chunks, 1) + } + require.Len(t, chunks, int(cph.TotalChunks)) + + // first chunk is maxed out (512 accounts) + require.Len(t, chunks[0].Balances, BalancesPerCatchpointFileChunk) + + if config.Consensus[proto].EnableCatchpointsWithOnlineAccounts { + // second and third chunks are onlinaccounts and onlineroundparams + require.Len(t, chunks[1].OnlineAccounts, 1) + require.Len(t, chunks[2].OnlineRoundParams, 320) + } l := testNewLedgerFromCatchpoint(t, dl.generator.trackerDB(), catchpointFilePath) defer l.Close() From 69be46bf705b63ae5474375845be03d239aad812 Mon Sep 17 00:00:00 2001 From: cce <51567+cce@users.noreply.github.com> Date: Wed, 8 Jan 2025 12:50:38 -0500 Subject: [PATCH 11/11] update TestExactAccountChunk to cause SP votersTracker to extend history before making catchpoint --- ledger/catchpointfilewriter_test.go | 58 +++++++++++++++++++---------- ledger/catchpointtracker.go | 8 ++-- 2 files changed, 43 insertions(+), 23 deletions(-) diff --git a/ledger/catchpointfilewriter_test.go b/ledger/catchpointfilewriter_test.go index 1888aae774..a361c18f36 100644 --- a/ledger/catchpointfilewriter_test.go +++ b/ledger/catchpointfilewriter_test.go @@ -297,7 +297,7 @@ func TestBasicCatchpointWriter(t *testing.T) { require.Equal(t, uint64(len(accts)), uint64(len(chunk.Balances))) } -func testWriteCatchpoint(t *testing.T, params config.ConsensusParams, rdb trackerdb.Store, datapath string, filepath string, maxResourcesPerChunk int) CatchpointFileHeader { +func testWriteCatchpoint(t *testing.T, params config.ConsensusParams, rdb trackerdb.Store, datapath string, filepath string, maxResourcesPerChunk int, onlineExcludeBefore basics.Round) CatchpointFileHeader { var totalAccounts, totalKVs, totalOnlineAccounts, totalOnlineRoundParams, totalChunks uint64 var biggestChunkLen uint64 var accountsRnd basics.Round @@ -307,7 +307,7 @@ func testWriteCatchpoint(t *testing.T, params config.ConsensusParams, rdb tracke } err := rdb.Transaction(func(ctx context.Context, tx trackerdb.TransactionScope) (err error) { - writer, err := makeCatchpointFileWriter(context.Background(), params, datapath, tx, maxResourcesPerChunk, 0) + writer, err := makeCatchpointFileWriter(context.Background(), params, datapath, tx, maxResourcesPerChunk, onlineExcludeBefore) if err != nil { return err } @@ -604,7 +604,7 @@ func TestFullCatchpointWriterOverflowAccounts(t *testing.T) { catchpointDataFilePath := filepath.Join(temporaryDirectory, "15.data") catchpointFilePath := filepath.Join(temporaryDirectory, "15.catchpoint") const maxResourcesPerChunk = 5 - testWriteCatchpoint(t, protoParams, ml.trackerDB(), catchpointDataFilePath, catchpointFilePath, maxResourcesPerChunk) + testWriteCatchpoint(t, protoParams, ml.trackerDB(), catchpointDataFilePath, catchpointFilePath, maxResourcesPerChunk, 0) l := testNewLedgerFromCatchpoint(t, ml.trackerDB(), catchpointFilePath) defer l.Close() @@ -802,7 +802,7 @@ func TestFullCatchpointWriter(t *testing.T) { catchpointDataFilePath := filepath.Join(temporaryDirectory, "15.data") catchpointFilePath := filepath.Join(temporaryDirectory, "15.catchpoint") - testWriteCatchpoint(t, protoParams, ml.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0) + testWriteCatchpoint(t, protoParams, ml.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0, 0) l := testNewLedgerFromCatchpoint(t, ml.trackerDB(), catchpointFilePath) defer l.Close() @@ -834,16 +834,19 @@ func TestExactAccountChunk(t *testing.T) { partitiontest.PartitionTest(t) // t.Parallel() // probably not good to parallelize catchpoint file save/load - t.Run("v39", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusV39) }) - t.Run("v40", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusV40) }) - t.Run("future", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusFuture) }) + t.Run("v39", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusV39, 40) }) + t.Run("v40", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusV40, 40) }) + t.Run("v40_SPstall", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusV40, 100) }) + t.Run("future", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusFuture, 40) }) + t.Run("future_SPstall", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusFuture, 100) }) } -func testExactAccountChunk(t *testing.T, proto protocol.ConsensusVersion) { +func testExactAccountChunk(t *testing.T, proto protocol.ConsensusVersion, extraBlocks int) { genBalances, addrs, _ := ledgertesting.NewTestGenesis(func(c *ledgertesting.GenesisCfg) { c.OnlineCount = 1 // addrs[0] is online }, ledgertesting.TurnOffRewards) cfg := config.GetDefaultLocal() + params := config.Consensus[proto] dl := NewDoubleLedger(t, genBalances, proto, cfg) defer dl.Close() @@ -863,8 +866,8 @@ func testExactAccountChunk(t *testing.T, proto protocol.ConsensusVersion) { dl.fullBlock(&newacctpay) } - // At least 32 more blocks so that we catchpoint after the accounts exist - for i := 0; i < 40; i++ { + // Add more blocks so that we catchpoint after the accounts exist + for i := 0; i < extraBlocks; i++ { selfpay := pay selfpay.Receiver = payFrom selfpay.Note = ledgertesting.RandomNote() @@ -874,7 +877,7 @@ func testExactAccountChunk(t *testing.T, proto protocol.ConsensusVersion) { genR := testCatchpointFlushRound(dl.generator) valR := testCatchpointFlushRound(dl.validator) require.Equal(t, genR, valR) - require.EqualValues(t, BalancesPerCatchpointFileChunk-12+40, genR) // 540 (512-12+40) + require.EqualValues(t, BalancesPerCatchpointFileChunk-12+extraBlocks, genR) require.Eventually(t, func() bool { dl.generator.accts.accountsMu.RLock() @@ -892,7 +895,24 @@ func testExactAccountChunk(t *testing.T, proto protocol.ConsensusVersion) { catchpointDataFilePath := filepath.Join(tempDir, t.Name()+".data") catchpointFilePath := filepath.Join(tempDir, t.Name()+".catchpoint.tar.gz") - cph := testWriteCatchpoint(t, config.Consensus[proto], dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0) + genDBRound := dl.generator.trackers.acctsOnline.cachedDBRoundOnline + valDBRound := dl.validator.trackers.acctsOnline.cachedDBRoundOnline + genLowestRound := dl.generator.trackers.acctsOnline.voters.lowestRound(genDBRound) + valLowestRound := dl.validator.trackers.acctsOnline.voters.lowestRound(valDBRound) + require.Equal(t, genLowestRound, valLowestRound) + require.Equal(t, genDBRound, valDBRound) + + var onlineExcludeBefore basics.Round + // we added so many blocks that lowestRound is stuck at first state proof, round 240? + if normalHorizon := (genDBRound + 1).SubSaturate(basics.Round(params.MaxBalLookback)); normalHorizon == genLowestRound { + t.Logf("subtest is exercising case where 320 rounds of history are already in DB") + require.EqualValues(t, genLowestRound, params.StateProofInterval-params.StateProofVotersLookback) + } else if normalHorizon > genLowestRound { + t.Logf("subtest is exercising case where votersTracker causes onlineaccounts & onlineroundparams to extend history to round %d (DBRound %d)", genLowestRound, genDBRound) + onlineExcludeBefore = normalHorizon // fails without this adjustment + } + + cph := testWriteCatchpoint(t, config.Consensus[proto], dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0, onlineExcludeBefore) decodedData := readCatchpointFile(t, catchpointFilePath) @@ -920,8 +940,8 @@ func testExactAccountChunk(t *testing.T, proto protocol.ConsensusVersion) { if config.Consensus[proto].EnableCatchpointsWithOnlineAccounts { // second and third chunks are onlinaccounts and onlineroundparams - require.Len(t, chunks[1].OnlineAccounts, 1) - require.Len(t, chunks[2].OnlineRoundParams, 320) + require.Len(t, chunks[1].OnlineAccounts, 1) // only 1 online account + require.Len(t, chunks[2].OnlineRoundParams, int(params.MaxBalLookback)) // 320 } l := testNewLedgerFromCatchpoint(t, dl.generator.trackerDB(), catchpointFilePath) @@ -974,7 +994,7 @@ func TestCatchpointAfterTxns(t *testing.T) { catchpointDataFilePath := filepath.Join(tempDir, t.Name()+".data") catchpointFilePath := filepath.Join(tempDir, t.Name()+".catchpoint.tar.gz") - cph := testWriteCatchpoint(t, config.Consensus[proto], dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0) + cph := testWriteCatchpoint(t, config.Consensus[proto], dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0, 0) require.EqualValues(t, 3, cph.TotalChunks) l := testNewLedgerFromCatchpoint(t, dl.validator.trackerDB(), catchpointFilePath) @@ -990,7 +1010,7 @@ func TestCatchpointAfterTxns(t *testing.T) { dl.fullBlock(&newacctpay) // Write and read back in, and ensure even the last effect exists. - cph = testWriteCatchpoint(t, config.Consensus[proto], dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0) + cph = testWriteCatchpoint(t, config.Consensus[proto], dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0, 0) require.EqualValues(t, cph.TotalChunks, 3) // Still only 3 chunks, as last was in a recent block // Drive home the point that `last` is _not_ included in the catchpoint by inspecting balance read from catchpoint. @@ -1006,7 +1026,7 @@ func TestCatchpointAfterTxns(t *testing.T) { dl.fullBlock(pay.Noted(strconv.Itoa(i))) } - cph = testWriteCatchpoint(t, config.Consensus[proto], dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0) + cph = testWriteCatchpoint(t, config.Consensus[proto], dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0, 0) require.EqualValues(t, cph.TotalChunks, 4) l = testNewLedgerFromCatchpoint(t, dl.validator.trackerDB(), catchpointFilePath) @@ -1161,7 +1181,7 @@ assert catchpointDataFilePath := filepath.Join(tempDir, t.Name()+".data") catchpointFilePath := filepath.Join(tempDir, t.Name()+".catchpoint.tar.gz") - cph := testWriteCatchpoint(t, config.Consensus[proto], dl.generator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0) + cph := testWriteCatchpoint(t, config.Consensus[proto], dl.generator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0, 0) require.EqualValues(t, 7, cph.TotalChunks) l := testNewLedgerFromCatchpoint(t, dl.generator.trackerDB(), catchpointFilePath) @@ -1261,7 +1281,7 @@ func TestCatchpointAfterBoxTxns(t *testing.T) { catchpointDataFilePath := filepath.Join(tempDir, t.Name()+".data") catchpointFilePath := filepath.Join(tempDir, t.Name()+".catchpoint.tar.gz") - cph := testWriteCatchpoint(t, config.Consensus[proto], dl.generator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0) + cph := testWriteCatchpoint(t, config.Consensus[proto], dl.generator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0, 0) require.EqualValues(t, 3, cph.TotalChunks) l := testNewLedgerFromCatchpoint(t, dl.generator.trackerDB(), catchpointFilePath) diff --git a/ledger/catchpointtracker.go b/ledger/catchpointtracker.go index a07687b25a..9394eae8ed 100644 --- a/ledger/catchpointtracker.go +++ b/ledger/catchpointtracker.go @@ -225,17 +225,17 @@ func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basic var onlineAccountsHash, onlineRoundParamsHash crypto.Digest params := config.Consensus[blockProto] - // Usually onlineAccountsForgetBefore is dbRound - params.MaxBalLookback (320 roudns of history), + // Usually onlineAccountsForgetBefore is dbRound - params.MaxBalLookback (320 rounds of history), // but if votersTracker needs more state, it can set lowestRound to be earlier than that. // We want to only write MaxBalLookback rounds of history to the catchpoint file. var onlineExcludeBefore basics.Round - if (dbRound + 1).SubSaturate(basics.Round(params.MaxBalLookback)) == onlineAccountsForgetBefore { + if normalOnlineHorizon := (dbRound + 1).SubSaturate(basics.Round(params.MaxBalLookback)); normalOnlineHorizon == onlineAccountsForgetBefore { // this is the common case, so we pass 0 so the DB dumps the full table, as is onlineExcludeBefore = 0 - } else if (dbRound + 1).SubSaturate(basics.Round(params.MaxBalLookback)) > onlineAccountsForgetBefore { + } else if normalOnlineHorizon > onlineAccountsForgetBefore { // the previous flush left more online-related rows than we want in the DB. we need to tell // the catchpoint writer to exclude the rows that are older than the ones we want to keep. - onlineExcludeBefore = (dbRound + 1).SubSaturate(basics.Round(params.MaxBalLookback)) + onlineExcludeBefore = normalOnlineHorizon } else { // The previous flush left less online-related rows than we want in the DB. This should not happen; return error ct.log.Errorf("catchpointTracker.finishFirstStage: dbRound %d and onlineAccountsForgetBefore %d has less history than MaxBalLookback %d",