Skip to content

Commit

Permalink
ledger: shorter deltas lookback implementation (320 rounds project) (#…
Browse files Browse the repository at this point in the history
…4003)

Reduce deltas size from 320 to 8 by introducing a new online accounts tracker
that preserves history of state (online/offline, stake, protos) for at least
MaxBalLookback = 2 x SeedRefreshInterval x SeedLookback rounds back from Latest.
New data is stored in new tables and is excluded from catchpoints.

TxTail also stores its data into a table, in order to prevent loading full blocks on startup,
and preserves up to MaxTxnLife + DeeperBlockHeaderHistory blocks and caches their headers.

Catchpoint generation is made in two stages: data file at X-320 and catchpoint itself at round X.

Regular nodes see a ~3x decrease in memory consumption under a high 6,000 TPS load.

Co-authored-by: Tsachi Herman <tsachi.herman@algorand.com>
Co-authored-by: nicholasguoalgorand <67928479+nicholasguoalgorand@users.noreply.github.com>
Co-authored-by: chris erway <chris.erway@algorand.com>
  • Loading branch information
4 people authored Jul 6, 2022
1 parent 8a2ae16 commit 90b1c05
Show file tree
Hide file tree
Showing 64 changed files with 11,527 additions and 1,768 deletions.
10 changes: 6 additions & 4 deletions catchup/catchpointService.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,10 +471,12 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
return cs.abort(fmt.Errorf("processStageBlocksDownload failed, unable to ensure first block : %v", err))
}

// pick the lookback with the greater of either MaxTxnLife or MaxBalLookback
lookback := config.Consensus[topBlock.CurrentProtocol].MaxTxnLife
if lookback < config.Consensus[topBlock.CurrentProtocol].MaxBalLookback {
lookback = config.Consensus[topBlock.CurrentProtocol].MaxBalLookback
// pick the lookback with the greater of either (MaxTxnLife+DeeperBlockHeaderHistory)
// or MaxBalLookback
proto := config.Consensus[topBlock.CurrentProtocol]
lookback := proto.MaxTxnLife + proto.DeeperBlockHeaderHistory
if lookback < proto.MaxBalLookback {
lookback = proto.MaxBalLookback
}
// in case the effective lookback is going before our rounds count, trim it there.
// (a catchpoint is generated starting at round MaxBalLookback, and this is possible in any round in the range of MaxBalLookback..MaxTxnLife)
Expand Down
17 changes: 3 additions & 14 deletions catchup/ledgerFetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,20 +166,9 @@ func (lf *ledgerFetcher) getPeerLedger(ctx context.Context, peer network.HTTPPee
return fmt.Errorf("getPeerLedger received a tar header with data size of %d", header.Size)
}
balancesBlockBytes := make([]byte, header.Size)
readComplete := int64(0)

for readComplete < header.Size {
bytesRead, err := tarReader.Read(balancesBlockBytes[readComplete:])
readComplete += int64(bytesRead)
if err != nil {
if err == io.EOF {
if readComplete == header.Size {
break
}
err = fmt.Errorf("getPeerLedger received io.EOF while reading from tar file stream prior of reaching chunk size %d / %d", readComplete, header.Size)
}
return err
}
_, err = io.ReadFull(tarReader, balancesBlockBytes)
if err != nil {
return err
}
start := time.Now()
err = lf.processBalancesBlock(ctx, header.Name, balancesBlockBytes, &downloadProgress)
Expand Down
6 changes: 3 additions & 3 deletions catchup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type Ledger interface {
EnsureBlock(block *bookkeeping.Block, c agreement.Certificate)
LastRound() basics.Round
Block(basics.Round) (bookkeeping.Block, error)
IsWritingCatchpointFile() bool
IsWritingCatchpointDataFile() bool
Validate(ctx context.Context, blk bookkeeping.Block, executionPool execpool.BacklogPool) (*ledgercore.ValidatedBlock, error)
AddValidatedBlock(vb ledgercore.ValidatedBlock, cert agreement.Certificate) error
}
Expand Down Expand Up @@ -493,7 +493,7 @@ func (s *Service) pipelinedFetch(seedLookback uint64) {
}
// if we're writing a catchpoint file, stop catching up to reduce the memory pressure. Once we finish writing the file we
// could resume with the catchup.
if s.ledger.IsWritingCatchpointFile() {
if s.ledger.IsWritingCatchpointDataFile() {
s.log.Info("Catchup is stopping due to catchpoint file being written")
s.suspendForCatchpointWriting = true
return
Expand Down Expand Up @@ -554,7 +554,7 @@ func (s *Service) periodicSync() {
continue
}
// check to see if we're currently writing a catchpoint file. If so, wait longer before attempting again.
if s.ledger.IsWritingCatchpointFile() {
if s.ledger.IsWritingCatchpointDataFile() {
// keep the existing sleep duration and try again later.
continue
}
Expand Down
2 changes: 1 addition & 1 deletion catchup/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ func (m *mockedLedger) LookupAgreement(basics.Round, basics.Address) (basics.Onl
return basics.OnlineAccountData{}, errors.New("not needed for mockedLedger")
}

func (m *mockedLedger) IsWritingCatchpointFile() bool {
// IsWritingCatchpointDataFile implements the catchup Ledger interface for the
// test mock; it always reports that no catchpoint data file is being written,
// so catchup is never suspended during tests.
func (m *mockedLedger) IsWritingCatchpointDataFile() bool {
return false
}

Expand Down
23 changes: 21 additions & 2 deletions config/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,18 @@ type ConsensusParams struct {
// This new header is in addition to the existing SHA512_256 merkle root.
// It is useful for verifying transaction on different blockchains, as some may not support SHA512_256 OPCODE natively but SHA256 is common.
EnableSHA256TxnCommitmentHeader bool

// CatchpointLookback specifies a round lookback to take catchpoints at.
// Accounts snapshot for round X will be taken at X-CatchpointLookback
CatchpointLookback uint64

// DeeperBlockHeaderHistory defines number of rounds in addition to MaxTxnLife
// available for lookup for smart contracts and smart signatures.
// Setting it to 1 for example allows querying data up to MaxTxnLife + 1 rounds back from the Latest.
DeeperBlockHeaderHistory uint64

// EnableOnlineAccountCatchpoints specifies when to re-enable catchpoints after the online account table migration has occurred.
EnableOnlineAccountCatchpoints bool
}

// PaysetCommitType enumerates possible ways for the block header to commit to
Expand All @@ -452,7 +464,7 @@ const (
// PaysetCommitFlat hashes the entire payset array.
PaysetCommitFlat

// PaysetCommitMerkle uses merklearray to commit to the payset.
// PaysetCommitMerkle uses merkle array to commit to the payset.
PaysetCommitMerkle
)

Expand Down Expand Up @@ -593,7 +605,7 @@ func (cp ConsensusProtocols) DeepCopy() ConsensusProtocols {
return staticConsensus
}

// Merge merges a configurable consensus ontop of the existing consensus protocol and return
// Merge merges a configurable consensus on top of the existing consensus protocol and return
// a new consensus protocol without modify any of the incoming structures.
func (cp ConsensusProtocols) Merge(configurableConsensus ConsensusProtocols) ConsensusProtocols {
staticConsensus := cp.DeepCopy()
Expand Down Expand Up @@ -1135,6 +1147,12 @@ func initConsensusProtocols() {
// FilterTimeout for period 0 should take a new optimized, configured value, need to revisit this later
vFuture.AgreementFilterTimeoutPeriod0 = 4 * time.Second

// Make the accounts snapshot for round X at X-CatchpointLookback
vFuture.CatchpointLookback = 320

// Require MaxTxnLife + X blocks and headers preserved by a node
vFuture.DeeperBlockHeaderHistory = 1

// Enable compact certificates.
vFuture.CompactCertRounds = 256
vFuture.CompactCertTopVoters = 1024 * 1024
Expand All @@ -1148,6 +1166,7 @@ func initConsensusProtocols() {
vFuture.UnifyInnerTxIDs = true

vFuture.EnableSHA256TxnCommitmentHeader = true
vFuture.EnableOnlineAccountCatchpoints = true

Consensus[protocol.ConsensusFuture] = vFuture
}
Expand Down
11 changes: 8 additions & 3 deletions config/localTemplate.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ type Local struct {
// Version tracks the current version of the defaults so we can migrate old -> new
// This is specifically important whenever we decide to change the default value
// for an existing parameter. This field tag must be updated any time we add a new version.
Version uint32 `version[0]:"0" version[1]:"1" version[2]:"2" version[3]:"3" version[4]:"4" version[5]:"5" version[6]:"6" version[7]:"7" version[8]:"8" version[9]:"9" version[10]:"10" version[11]:"11" version[12]:"12" version[13]:"13" version[14]:"14" version[15]:"15" version[16]:"16" version[17]:"17" version[18]:"18" version[19]:"19" version[20]:"20" version[21]:"21" version[22]:"22"`
Version uint32 `version[0]:"0" version[1]:"1" version[2]:"2" version[3]:"3" version[4]:"4" version[5]:"5" version[6]:"6" version[7]:"7" version[8]:"8" version[9]:"9" version[10]:"10" version[11]:"11" version[12]:"12" version[13]:"13" version[14]:"14" version[15]:"15" version[16]:"16" version[17]:"17" version[18]:"18" version[19]:"19" version[20]:"20" version[21]:"21" version[22]:"22" version[23]:"23"`

// environmental (may be overridden)
// When enabled, stores blocks indefinitally, otherwise, only the most recents blocks
// When enabled, stores blocks indefinitely, otherwise, only the most recent blocks
// are being kept around. ( the precise number of recent blocks depends on the consensus parameters )
Archival bool `version[0]:"false"`

Expand Down Expand Up @@ -318,7 +318,8 @@ type Local struct {

// CatchpointTracking determines if catchpoints are going to be tracked. The value is interpreted as follows:
// A value of -1 means "don't track catchpoints".
// A value of 1 means "track catchpoints as long as CatchpointInterval is also set to a positive non-zero value". If CatchpointInterval <= 0, no catchpoint tracking would be performed.
// A value of 1 means "track catchpoints as long as CatchpointInterval > 0".
// A value of 2 means "track catchpoints and always generate catchpoint files as long as CatchpointInterval > 0".
// A value of 0 means automatic, which is the default value. In this mode, a non archival node would not track the catchpoints, and an archival node would track the catchpoints as long as CatchpointInterval > 0.
// Other values of CatchpointTracking would give a warning in the log file, and would behave as if the default value was provided.
CatchpointTracking int64 `version[11]:"0"`
Expand Down Expand Up @@ -447,6 +448,10 @@ type Local struct {

// AgreementIncomingBundlesQueueLength sets the size of the buffer holding incoming bundles.
AgreementIncomingBundlesQueueLength uint64 `version[21]:"7"`

// MaxAcctLookback sets the maximum lookback range for account states,
// i.e. the ledger can answer account states questions for the range Latest-MaxAcctLookback...Latest
MaxAcctLookback uint64 `version[23]:"8"`
}

// DNSBootstrapArray returns an array of one or more DNS Bootstrap identifiers
Expand Down
3 changes: 2 additions & 1 deletion config/local_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package config

var defaultLocal = Local{
Version: 22,
Version: 23,
AccountUpdatesStatsInterval: 5000000000,
AccountsRebuildSynchronousMode: 1,
AgreementIncomingBundlesQueueLength: 7,
Expand Down Expand Up @@ -85,6 +85,7 @@ var defaultLocal = Local{
LogArchiveName: "node.archive.log",
LogSizeLimit: 1073741824,
MaxAPIResourcesPerAccount: 100000,
MaxAcctLookback: 8,
MaxCatchpointDownloadDuration: 7200000000000,
MaxConnectionsPerIP: 30,
MinCatchpointFileDownloadBytesPerSecond: 20480,
Expand Down
41 changes: 24 additions & 17 deletions data/basics/userBalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,19 +97,24 @@ func UnmarshalStatus(value string) (s Status, err error) {
return
}

// OnlineAccountData contains the voting information for a single account.
//msgp:ignore OnlineAccountData
type OnlineAccountData struct {
MicroAlgosWithRewards MicroAlgos

VoteID crypto.OneTimeSignatureVerifier
SelectionID crypto.VRFVerifier
// VotingData holds voting-related data: the account's participation keys and
// the validity window of those keys. It is embedded by OnlineAccountData.
type VotingData struct {
// VoteID is the one-time signature verifier (participation voting key).
VoteID crypto.OneTimeSignatureVerifier
// SelectionID is the VRF public key used for selection.
SelectionID crypto.VRFVerifier
// StateProofID is the merkle signature verifier used for state proofs.
StateProofID merklesignature.Verifier

// VoteFirstValid / VoteLastValid bound the rounds for which the keys above are registered.
VoteFirstValid Round
VoteLastValid Round
// VoteKeyDilution controls ephemeral key batching; exact semantics are
// defined by the participation key machinery — see crypto package. (NOTE(review):
// not visible from this fragment.)
VoteKeyDilution uint64
}

// OnlineAccountData contains the voting information for a single account.
// The msgp:ignore directive below excludes this in-memory type from msgpack
// code generation; it is never serialized directly.
//msgp:ignore OnlineAccountData
type OnlineAccountData struct {
// MicroAlgosWithRewards is the account balance; per the field name, pending
// rewards are included.
MicroAlgosWithRewards MicroAlgos
// VotingData is embedded so its voting fields are promoted onto OnlineAccountData.
VotingData
}

// AccountData contains the data associated with a given address.
//
// This includes the account balance, cryptographic public keys,
Expand Down Expand Up @@ -522,12 +527,14 @@ func (u AccountData) OnlineAccountData() OnlineAccountData {

return OnlineAccountData{
MicroAlgosWithRewards: u.MicroAlgos,

VoteID: u.VoteID,
SelectionID: u.SelectionID,
VoteFirstValid: u.VoteFirstValid,
VoteLastValid: u.VoteLastValid,
VoteKeyDilution: u.VoteKeyDilution,
VotingData: VotingData{
VoteID: u.VoteID,
SelectionID: u.SelectionID,
StateProofID: u.StateProofID,
VoteFirstValid: u.VoteFirstValid,
VoteLastValid: u.VoteLastValid,
VoteKeyDilution: u.VoteKeyDilution,
},
}
}

Expand Down Expand Up @@ -598,26 +605,26 @@ func (u AccountData) NormalizedOnlineBalance(proto config.ConsensusParams) uint6
// on how recently the account has been touched (our rewards do not implement
// compounding). However, online accounts have to periodically renew
// participation keys, so the scale of the inconsistency is small.
func NormalizedOnlineAccountBalance(status Status, rewardsBase uint64, microAlgos MicroAlgos, proto config.ConsensusParams) uint64 {
func NormalizedOnlineAccountBalance(status Status, rewardsBase uint64, microAlgos MicroAlgos, genesisProto config.ConsensusParams) uint64 {
if status != Online {
return 0
}

// If this account had one RewardUnit of microAlgos in round 0, it would
// have perRewardUnit microAlgos at the account's current rewards level.
perRewardUnit := rewardsBase + proto.RewardUnit
perRewardUnit := rewardsBase + genesisProto.RewardUnit

// To normalize, we compute, mathematically,
// u.MicroAlgos / perRewardUnit * proto.RewardUnit, as
// (u.MicroAlgos * proto.RewardUnit) / perRewardUnit.
norm, overflowed := Muldiv(microAlgos.ToUint64(), proto.RewardUnit, perRewardUnit)
norm, overflowed := Muldiv(microAlgos.ToUint64(), genesisProto.RewardUnit, perRewardUnit)

// Mathematically should be impossible to overflow
// because perRewardUnit >= proto.RewardUnit, as long
// as u.RewardBase isn't huge enough to cause overflow..
if overflowed {
logging.Base().Panicf("overflow computing normalized balance %d * %d / (%d + %d)",
microAlgos.ToUint64(), proto.RewardUnit, rewardsBase, proto.RewardUnit)
microAlgos.ToUint64(), genesisProto.RewardUnit, rewardsBase, genesisProto.RewardUnit)
}

return norm
Expand Down
8 changes: 4 additions & 4 deletions data/pools/transactionPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func MakeTransactionPool(ledger *ledger.Ledger, cfg config.Local, log logging.Lo
}
pool.cond.L = &pool.mu
pool.assemblyCond.L = &pool.assemblyMu
pool.recomputeBlockEvaluator(make(map[transactions.Txid]basics.Round), 0)
pool.recomputeBlockEvaluator(nil, 0)
return &pool
}

Expand Down Expand Up @@ -181,7 +181,7 @@ func (pool *TransactionPool) Reset() {
pool.numPendingWholeBlocks = 0
pool.pendingBlockEvaluator = nil
pool.statusCache.reset()
pool.recomputeBlockEvaluator(make(map[transactions.Txid]basics.Round), 0)
pool.recomputeBlockEvaluator(nil, 0)
}

// NumExpired returns the number of transactions that expired at the
Expand Down Expand Up @@ -640,7 +640,7 @@ func (pool *TransactionPool) addToPendingBlockEvaluator(txgroup []transactions.S
// recomputeBlockEvaluator constructs a new BlockEvaluator and feeds all
// in-pool transactions to it (removing any transactions that are rejected
// by the BlockEvaluator). Expects that the pool.mu mutex would be already taken.
func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transactions.Txid]basics.Round, knownCommitted uint) (stats telemetryspec.ProcessBlockMetrics) {
func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transactions.Txid]ledgercore.IncludedTransactions, knownCommitted uint) (stats telemetryspec.ProcessBlockMetrics) {
pool.pendingBlockEvaluator = nil

latest := pool.ledger.Latest()
Expand Down Expand Up @@ -944,7 +944,7 @@ func (pool *TransactionPool) AssembleDevModeBlock() (assembled *ledgercore.Valid
defer pool.mu.Unlock()

// drop the current block evaluator and start with a new one.
pool.recomputeBlockEvaluator(make(map[transactions.Txid]basics.Round), 0)
pool.recomputeBlockEvaluator(nil, 0)

// The above was already pregenerating the entire block,
// so there won't be any waiting on this call.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
github.com/gen2brain/beeep v0.0.0-20180718162406-4e430518395f
github.com/getkin/kin-openapi v0.22.0
github.com/gofrs/flock v0.7.0
github.com/golang/snappy v0.0.4
github.com/google/go-querystring v1.0.0
github.com/gorilla/mux v1.6.2
github.com/jmoiron/sqlx v1.2.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ github.com/godbus/dbus v0.0.0-20181101234600-2ff6f7ffd60f/go.mod h1:/YcGZj5zSblf
github.com/gofrs/flock v0.7.0 h1:pGFUjl501gafK9HBt1VGL1KCOd/YhIooID+xgyJCf3g=
github.com/gofrs/flock v0.7.0/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y=
github.com/google/go-querystring v1.0.0 h1:Xkwi/a1rcvNg1PPYe5vI8GbeBY/jrVuDX5ASuANWTrk=
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
Expand Down
3 changes: 2 additions & 1 deletion installer/config.json.example
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"Version": 22,
"Version": 23,
"AccountUpdatesStatsInterval": 5000000000,
"AccountsRebuildSynchronousMode": 1,
"AgreementIncomingBundlesQueueLength": 7,
Expand Down Expand Up @@ -64,6 +64,7 @@
"LogArchiveName": "node.archive.log",
"LogSizeLimit": 1073741824,
"MaxAPIResourcesPerAccount": 100000,
"MaxAcctLookback": 8,
"MaxCatchpointDownloadDuration": 7200000000000,
"MaxConnectionsPerIP": 30,
"MinCatchpointFileDownloadBytesPerSecond": 20480,
Expand Down
Loading

0 comments on commit 90b1c05

Please sign in to comment.