Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

catchpoints: more support for EnableOnlineAccountCatchpoints #6214

Merged
Prev Previous commit
Next Next commit
update comments for CR
  • Loading branch information
cce committed Jan 7, 2025
commit 4fdd46a6da4d109865215f68c856ef242f15a5e4
6 changes: 3 additions & 3 deletions ledger/catchpointtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,9 @@
onlineExcludeBefore = (dbRound + 1).SubSaturate(basics.Round(params.MaxBalLookback))
} else {
gmalouf marked this conversation as resolved.
Show resolved Hide resolved
// The previous flush left less online-related rows than we want in the DB. This should not happen; return error
ct.log.Errorf("catchpointTracker.finishFirstStage: dbRound %d and onlineAccountsForgetBefore %d has less history than MaxBalLookback %d",
dbRound, onlineAccountsForgetBefore, params.MaxBalLookback)
return errors.New("catchpointTracker.finishFirstStage: onlineAccountsForgetBefore doesn't provide enough history")

Check warning on line 243 in ledger/catchpointtracker.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointtracker.go#L241-L243

Added lines #L241 - L243 were not covered by tests
}

if params.EnableCatchpointsWithSPContexts {
Expand Down Expand Up @@ -326,11 +326,11 @@
return err
}

// pass dbRound+1-maxBalLookback as the onlineExcludeBefore parameter: since we can't be sure whether
// pass dbRound+1-maxBalLookback as the onlineAccountsForgetBefore parameter: since we can't be sure whether
// there are more than 320 rounds of history in the online accounts tables, this ensures the catchpoint
// will only contain the most recent 320 rounds.
onlineExcludeBefore := (dbRound + 1).SubSaturate(basics.Round(config.Consensus[blockProto].MaxBalLookback))
return ct.finishFirstStage(context.Background(), dbRound, onlineExcludeBefore, blockProto, 0)
onlineAccountsForgetBefore := (dbRound + 1).SubSaturate(basics.Round(config.Consensus[blockProto].MaxBalLookback))
return ct.finishFirstStage(context.Background(), dbRound, onlineAccountsForgetBefore, blockProto, 0)
}

func (ct *catchpointTracker) finishCatchpointsAfterCrash(blockProto protocol.ConsensusVersion, catchpointLookback uint64) error {
Expand Down
18 changes: 16 additions & 2 deletions ledger/store/trackerdb/sqlitedriver/kvsIter.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,55 +69,69 @@
}

func (iter *tableIterator[T]) Next() bool { return iter.rows.Next() }
func (iter *tableIterator[T]) Close() {
iter.rows.Close()
if iter.onClose != nil {
iter.onClose()

Check warning on line 75 in ledger/store/trackerdb/sqlitedriver/kvsIter.go

View check run for this annotation

Codecov / codecov/patch

ledger/store/trackerdb/sqlitedriver/kvsIter.go#L72-L75

Added lines #L72 - L75 were not covered by tests
}
}
func (iter *tableIterator[T]) GetItem() (T, error) {
return iter.scan(iter.rows)
}

// MakeOnlineAccountsIter creates an onlineAccounts iterator.
// MakeOnlineAccountsIter creates an onlineAccounts iterator, used by the catchpoint system to dump the
// onlineaccounts table to a catchpoint snapshot file.
//
// If excludeBefore is non-zero, the iterator will exclude all data that would have been deleted if
// OnlineAccountsDelete(excludeBefore) were called on this DB before calling MakeOnlineAccountsIter.
func MakeOnlineAccountsIter(ctx context.Context, q db.Queryable, useStaging bool, excludeBefore basics.Round) (trackerdb.TableIterator[*encoded.OnlineAccountRecordV6], error) {

Check warning on line 87 in ledger/store/trackerdb/sqlitedriver/kvsIter.go

View check run for this annotation

Codecov / codecov/patch

ledger/store/trackerdb/sqlitedriver/kvsIter.go#L87

Added line #L87 was not covered by tests
table := "onlineaccounts"
if useStaging {
table = "catchpointonlineaccounts"
}

var onClose func()
if excludeBefore != 0 {

Check warning on line 94 in ledger/store/trackerdb/sqlitedriver/kvsIter.go

View check run for this annotation

Codecov / codecov/patch

ledger/store/trackerdb/sqlitedriver/kvsIter.go#L93-L94

Added lines #L93 - L94 were not covered by tests
// cheat: use Rdb to make a temporary table that we will delete later
// This is a special case to resolve the issue found in #6214. When the state proof votersTracker has not
// yet validated the recent state proof, the onlineaccounts table will hold more than 320 rows,
// to support state proof recovery (votersTracker.lowestRound() sets deferredCommitRange.lowestRound).
//
// While rare, this may happen e.g. during catchup, where blocks may be flying by so quickly that the
// catchpoint snapshot is started before the latest state proof was validated. In this case, excludeBefore
// will be set to R-320 (MaxBalLookback) where R is the DB snapshot round (specified by CatchpointLookback).
//
// Unfortunately catchpoint snapshots occur within a SnapshotScope, and so a db.Queryable cannot
// execute DDL statements. To work around this, we create a temporary table that we will delete
// when the iterator is closed.
e, ok := q.(*sql.Tx)
if !ok {
return nil, fmt.Errorf("MakeOnlineAccountsIter: cannot convert Queryable to sql.Tx, q is %T", q)

Check warning on line 108 in ledger/store/trackerdb/sqlitedriver/kvsIter.go

View check run for this annotation

Codecov / codecov/patch

ledger/store/trackerdb/sqlitedriver/kvsIter.go#L106-L108

Added lines #L106 - L108 were not covered by tests
}
// create a new table by selecting from the original table
destTable := table + "_iterator"
_, err := e.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", destTable))
if err != nil {
return nil, err

Check warning on line 114 in ledger/store/trackerdb/sqlitedriver/kvsIter.go

View check run for this annotation

Codecov / codecov/patch

ledger/store/trackerdb/sqlitedriver/kvsIter.go#L111-L114

Added lines #L111 - L114 were not covered by tests
}
_, err = e.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s AS SELECT * FROM %s", destTable, table))
if err != nil {
return nil, err

Check warning on line 118 in ledger/store/trackerdb/sqlitedriver/kvsIter.go

View check run for this annotation

Codecov / codecov/patch

ledger/store/trackerdb/sqlitedriver/kvsIter.go#L116-L118

Added lines #L116 - L118 were not covered by tests
}
// call prune on the new copied table, using the same logic as OnlineAccountsDelete
aw := accountsV2Writer{e: e}
err = aw.onlineAccountsDelete(excludeBefore, destTable)
if err != nil {
return nil, err

Check warning on line 124 in ledger/store/trackerdb/sqlitedriver/kvsIter.go

View check run for this annotation

Codecov / codecov/patch

ledger/store/trackerdb/sqlitedriver/kvsIter.go#L121-L124

Added lines #L121 - L124 were not covered by tests
}
// remember to drop the table when the iterator is closed
onClose = func() {
_, err = e.ExecContext(ctx, fmt.Sprintf("DROP TABLE %s", destTable))
if err != nil {
logging.Base().Errorf("Failed to drop table %s: %v", destTable, err)

Check warning on line 130 in ledger/store/trackerdb/sqlitedriver/kvsIter.go

View check run for this annotation

Codecov / codecov/patch

ledger/store/trackerdb/sqlitedriver/kvsIter.go#L127-L130

Added lines #L127 - L130 were not covered by tests
}
}
// use the new table to create the iterator
table = destTable

Check warning on line 134 in ledger/store/trackerdb/sqlitedriver/kvsIter.go

View check run for this annotation

Codecov / codecov/patch

ledger/store/trackerdb/sqlitedriver/kvsIter.go#L134

Added line #L134 was not covered by tests
}

rows, err := q.QueryContext(ctx, fmt.Sprintf("SELECT address, updround, normalizedonlinebalance, votelastvalid, data FROM %s ORDER BY address, updround", table))
Expand All @@ -125,11 +139,11 @@
return nil, err
}

return &tableIterator[*encoded.OnlineAccountRecordV6]{
rows: rows,
scan: scanOnlineAccount,
onClose: onClose,
}, nil

Check warning on line 146 in ledger/store/trackerdb/sqlitedriver/kvsIter.go

View check run for this annotation

Codecov / codecov/patch

ledger/store/trackerdb/sqlitedriver/kvsIter.go#L142-L146

Added lines #L142 - L146 were not covered by tests
}

func scanOnlineAccount(rows *sql.Rows) (*encoded.OnlineAccountRecordV6, error) {
Expand Down Expand Up @@ -181,18 +195,18 @@
}

// MakeOnlineRoundParamsIter creates an onlineRoundParams iterator.
func MakeOnlineRoundParamsIter(ctx context.Context, q db.Queryable, useStaging bool, excludeBefore basics.Round) (trackerdb.TableIterator[*encoded.OnlineRoundParamsRecordV6], error) {

Check warning on line 198 in ledger/store/trackerdb/sqlitedriver/kvsIter.go

View check run for this annotation

Codecov / codecov/patch

ledger/store/trackerdb/sqlitedriver/kvsIter.go#L198

Added line #L198 was not covered by tests
table := "onlineroundparamstail"
if useStaging {
table = "catchpointonlineroundparamstail"
}

where := ""
if excludeBefore != 0 {
where = fmt.Sprintf("WHERE rnd >= %d", excludeBefore)

Check warning on line 206 in ledger/store/trackerdb/sqlitedriver/kvsIter.go

View check run for this annotation

Codecov / codecov/patch

ledger/store/trackerdb/sqlitedriver/kvsIter.go#L204-L206

Added lines #L204 - L206 were not covered by tests
}

rows, err := q.QueryContext(ctx, fmt.Sprintf("SELECT rnd, data FROM %s %s ORDER BY rnd", table, where))

Check warning on line 209 in ledger/store/trackerdb/sqlitedriver/kvsIter.go

View check run for this annotation

Codecov / codecov/patch

ledger/store/trackerdb/sqlitedriver/kvsIter.go#L209

Added line #L209 was not covered by tests
if err != nil {
return nil, err
}
Expand Down
Loading