From cbdc68ad329d46441bc4947f27d3f6622ffa6248 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy <65323360+algorandskiy@users.noreply.github.com> Date: Wed, 14 Sep 2022 10:52:59 -0400 Subject: [PATCH] catchpoint: fix peer ranking (#4535) --- catchup/catchpointService.go | 87 +++++++++--------- catchup/catchpointService_test.go | 91 +++++++++++++++++++ catchup/ledgerFetcher.go | 6 +- catchup/ledgerFetcher_test.go | 4 +- .../mocks/mockCatchpointCatchupAccessor.go | 5 + ledger/catchupaccessor.go | 20 +++- ledger/catchupaccessor_test.go | 2 +- node/node.go | 13 ++- 8 files changed, 173 insertions(+), 55 deletions(-) create mode 100644 catchup/catchpointService_test.go diff --git a/catchup/catchpointService.go b/catchup/catchpointService.go index 5e86404e2e..c55b3ea8d1 100644 --- a/catchup/catchpointService.go +++ b/catchup/catchpointService.go @@ -41,7 +41,7 @@ const ( noPeersAvailableSleepInterval = 50 * time.Millisecond ) -// CatchpointCatchupNodeServices defines the extenal node support needed +// CatchpointCatchupNodeServices defines the external node support needed // for the catchpoint service to switch the node between "regular" operational mode and catchup mode. type CatchpointCatchupNodeServices interface { SetCatchpointCatchupMode(bool) (newContextCh <-chan context.Context) @@ -65,10 +65,10 @@ type CatchpointCatchupStats struct { type CatchpointCatchupService struct { // stats is the statistics object, updated async while downloading the ledger stats CatchpointCatchupStats - // statsMu syncronizes access to stats, as we could attempt to update it while querying for it's current state + // statsMu synchronizes access to stats, as we could attempt to update it while querying for it's current state statsMu deadlock.Mutex node CatchpointCatchupNodeServices - // ctx is the node cancelation context, used when the node is being stopped. + // ctx is the node cancellation context, used when the node is being stopped. ctx context.Context cancelCtxFunc context.CancelFunc // running is a waitgroup counting the running goroutine(1), and allow us to exit cleanly. @@ -79,17 +79,17 @@ type CatchpointCatchupService struct { stage ledger.CatchpointCatchupState // log is the logger object log logging.Logger - // newService indicates whether this service was created after the node was running ( i.e. true ) or the node just started to find that it was previously perfoming catchup + // newService indicates whether this service was created after the node was running ( i.e. true ) or the node just started to find that it was previously performing catchup newService bool - // net is the underlaying network module + // net is the underlying network module net network.GossipNode // ledger points to the ledger object - ledger *ledger.Ledger + ledger ledger.CatchupAccessorClientLedger // lastBlockHeader is the latest block we have before going into catchpoint catchup mode. We use it to serve the node status requests instead of going to the ledger. lastBlockHeader bookkeeping.BlockHeader // config is a copy of the node configuration config config.Local - // abortCtx used as a syncronized flag to let us know when the user asked us to abort the catchpoint catchup process. note that it's not being used when we decided to abort + // abortCtx used as a synchronized flag to let us know when the user asked us to abort the catchpoint catchup process. note that it's not being used when we decided to abort // the catchup due to an internal issue ( such as exceeding number of retries ) abortCtx context.Context abortCtxFunc context.CancelFunc @@ -98,19 +98,20 @@ type CatchpointCatchupService struct { } // MakeResumedCatchpointCatchupService creates a catchpoint catchup service for a node that is already in catchpoint catchup mode -func MakeResumedCatchpointCatchupService(ctx context.Context, node CatchpointCatchupNodeServices, log logging.Logger, net network.GossipNode, l *ledger.Ledger, cfg config.Local) (service *CatchpointCatchupService, err error) { +func MakeResumedCatchpointCatchupService(ctx context.Context, node CatchpointCatchupNodeServices, log logging.Logger, net network.GossipNode, accessor ledger.CatchpointCatchupAccessor, cfg config.Local) (service *CatchpointCatchupService, err error) { service = &CatchpointCatchupService{ stats: CatchpointCatchupStats{ StartTime: time.Now(), }, node: node, - ledgerAccessor: ledger.MakeCatchpointCatchupAccessor(l, log), + ledgerAccessor: accessor, log: log, newService: false, net: net, - ledger: l, + ledger: accessor.Ledger(), config: cfg, } + l := accessor.Ledger() service.lastBlockHeader, err = l.BlockHdr(l.Latest()) if err != nil { return nil, err @@ -124,7 +125,7 @@ func MakeResumedCatchpointCatchupService(ctx context.Context, node CatchpointCat } // MakeNewCatchpointCatchupService creates a new catchpoint catchup service for a node that is not in catchpoint catchup mode -func MakeNewCatchpointCatchupService(catchpoint string, node CatchpointCatchupNodeServices, log logging.Logger, net network.GossipNode, l *ledger.Ledger, cfg config.Local) (service *CatchpointCatchupService, err error) { +func MakeNewCatchpointCatchupService(catchpoint string, node CatchpointCatchupNodeServices, log logging.Logger, net network.GossipNode, accessor ledger.CatchpointCatchupAccessor, cfg config.Local) (service *CatchpointCatchupService, err error) { if catchpoint == "" { return nil, fmt.Errorf("MakeNewCatchpointCatchupService: catchpoint is invalid") } @@ -134,14 +135,15 @@ func MakeNewCatchpointCatchupService(catchpoint string, node CatchpointCatchupNo StartTime: time.Now(), }, node: node, - ledgerAccessor: ledger.MakeCatchpointCatchupAccessor(l, log), + ledgerAccessor: accessor, stage: ledger.CatchpointCatchupStateInactive, log: log, newService: true, net: net, - ledger: l, + ledger: accessor.Ledger(), config: cfg, } + l := accessor.Ledger() service.lastBlockHeader, err = l.BlockHdr(l.Latest()) if err != nil { return nil, err @@ -162,7 +164,7 @@ func (cs *CatchpointCatchupService) Start(ctx context.Context) { func (cs *CatchpointCatchupService) Abort() { // In order to abort the catchpoint catchup process, we need to first set the flag of abortCtxFunc, and follow that by canceling the main context. // The order of these calls is crucial : The various stages are blocked on the main context. When that one expires, it uses the abort context to determine - // if the cancelation meaning that we want to shut down the process, or aborting the catchpoint catchup completly. + // if the cancellation meaning that we want to shut down the process, or aborting the catchpoint catchup completely. cs.abortCtxFunc() cs.cancelCtxFunc() } @@ -200,8 +202,8 @@ func (cs *CatchpointCatchupService) run() { err = cs.processStageInactive() case ledger.CatchpointCatchupStateLedgerDownload: err = cs.processStageLedgerDownload() - case ledger.CatchpointCatchupStateLastestBlockDownload: - err = cs.processStageLastestBlockDownload() + case ledger.CatchpointCatchupStateLatestBlockDownload: + err = cs.processStageLatestBlockDownload() case ledger.CatchpointCatchupStateBlocksDownload: err = cs.processStageBlocksDownload() case ledger.CatchpointCatchupStateSwitch: @@ -258,7 +260,7 @@ func (cs *CatchpointCatchupService) processStageInactive() (err error) { return cs.abort(fmt.Errorf("processStageInactive failed to update stage : %v", err)) } if cs.newService { - // we need to let the node know that it should shut down all the unneed services to avoid clashes. + // we need to let the node know that it should shut down all the unneeded services to avoid clashes. cs.updateNodeCatchupMode(true) } return nil @@ -272,7 +274,7 @@ func (cs *CatchpointCatchupService) processStageLedgerDownload() (err error) { round, _, err0 := ledgercore.ParseCatchpointLabel(label) if err0 != nil { - return cs.abort(fmt.Errorf("processStageLedgerDownload failed to patse label : %v", err0)) + return cs.abort(fmt.Errorf("processStageLedgerDownload failed to parse label : %v", err0)) } // download balances file. @@ -326,9 +328,9 @@ func (cs *CatchpointCatchupService) processStageLedgerDownload() (err error) { cs.log.Warnf("unable to download ledger : %v", err) } - err = cs.updateStage(ledger.CatchpointCatchupStateLastestBlockDownload) + err = cs.updateStage(ledger.CatchpointCatchupStateLatestBlockDownload) if err != nil { - return cs.abort(fmt.Errorf("processStageLedgerDownload failed to update stage to CatchpointCatchupStateLastestBlockDownload : %v", err)) + return cs.abort(fmt.Errorf("processStageLedgerDownload failed to update stage to CatchpointCatchupStateLatestBlockDownload : %v", err)) } return nil } @@ -342,11 +344,11 @@ func (cs *CatchpointCatchupService) updateVerifiedAccounts(addedTrieHashes uint6 } } -// processStageLastestBlockDownload is the third catchpoint catchup stage. It downloads the latest block and verify that against the previously downloaded ledger. -func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err error) { +// processStageLatestBlockDownload is the third catchpoint catchup stage. It downloads the latest block and verify that against the previously downloaded ledger. +func (cs *CatchpointCatchupService) processStageLatestBlockDownload() (err error) { blockRound, err := cs.ledgerAccessor.GetCatchupBlockRound(cs.ctx) if err != nil { - return cs.abort(fmt.Errorf("processStageLastestBlockDownload failed to retrieve catchup block round : %v", err)) + return cs.abort(fmt.Errorf("processStageLatestBlockDownload failed to retrieve catchup block round : %v", err)) } attemptsCount := 0 @@ -375,7 +377,7 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err erro // check block protocol version support. if protoParams, ok = config.Consensus[blk.BlockHeader.CurrentProtocol]; !ok { - cs.log.Warnf("processStageLastestBlockDownload: unsupported protocol version detected: '%v'", blk.BlockHeader.CurrentProtocol) + cs.log.Warnf("processStageLatestBlockDownload: unsupported protocol version detected: '%v'", blk.BlockHeader.CurrentProtocol) if attemptsCount <= cs.config.CatchupBlockDownloadRetryAttempts { // try again. @@ -383,24 +385,24 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err erro cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload) continue } - return cs.abort(fmt.Errorf("processStageLastestBlockDownload: unsupported protocol version detected: '%v'", blk.BlockHeader.CurrentProtocol)) + return cs.abort(fmt.Errorf("processStageLatestBlockDownload: unsupported protocol version detected: '%v'", blk.BlockHeader.CurrentProtocol)) } - // We need to compare explicitly the genesis hash since we're not doing any block validation. This would ensure the genesis.json file matches the block that we've receieved. + // We need to compare explicitly the genesis hash since we're not doing any block validation. This would ensure the genesis.json file matches the block that we've received. if protoParams.SupportGenesisHash && blk.GenesisHash() != cs.ledger.GenesisHash() { - cs.log.Warnf("processStageLastestBlockDownload: genesis hash mismatches : genesis hash on genesis.json file is %v while genesis hash of downloaded block is %v", cs.ledger.GenesisHash(), blk.GenesisHash()) + cs.log.Warnf("processStageLatestBlockDownload: genesis hash mismatches : genesis hash on genesis.json file is %v while genesis hash of downloaded block is %v", cs.ledger.GenesisHash(), blk.GenesisHash()) if attemptsCount <= cs.config.CatchupBlockDownloadRetryAttempts { // try again. blk = nil cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload) continue } - return cs.abort(fmt.Errorf("processStageLastestBlockDownload: genesis hash mismatches : genesis hash on genesis.json file is %v while genesis hash of downloaded block is %v", cs.ledger.GenesisHash(), blk.GenesisHash())) + return cs.abort(fmt.Errorf("processStageLatestBlockDownload: genesis hash mismatches : genesis hash on genesis.json file is %v while genesis hash of downloaded block is %v", cs.ledger.GenesisHash(), blk.GenesisHash())) } // check to see that the block header and the block payset aligns if !blk.ContentsMatchHeader() { - cs.log.Warnf("processStageLastestBlockDownload: downloaded block content does not match downloaded block header") + cs.log.Warnf("processStageLatestBlockDownload: downloaded block content does not match downloaded block header") if attemptsCount <= cs.config.CatchupBlockDownloadRetryAttempts { // try again. @@ -408,7 +410,7 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err erro cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload) continue } - return cs.abort(fmt.Errorf("processStageLastestBlockDownload: downloaded block content does not match downloaded block header")) + return cs.abort(fmt.Errorf("processStageLatestBlockDownload: downloaded block content does not match downloaded block header")) } // verify that the catchpoint is valid. @@ -420,15 +422,18 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err erro if attemptsCount <= cs.config.CatchupBlockDownloadRetryAttempts { // try again. blk = nil - cs.log.Infof("processStageLastestBlockDownload: block %d verification against catchpoint failed, another attempt will be made; err = %v", blockRound, err) + cs.log.Infof("processStageLatestBlockDownload: block %d verification against catchpoint failed, another attempt will be made; err = %v", blockRound, err) cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload) continue } - return cs.abort(fmt.Errorf("processStageLastestBlockDownload failed when calling VerifyCatchpoint : %v", err)) + return cs.abort(fmt.Errorf("processStageLatestBlockDownload failed when calling VerifyCatchpoint : %v", err)) + } + if psp != nil { + // give a rank to the download, as the download was successful. + // if the block might have been retrieved from the local ledger, nothing to rank + peerRank := cs.blocksDownloadPeerSelector.peerDownloadDurationToRank(psp, blockDownloadDuration) + cs.blocksDownloadPeerSelector.rankPeer(psp, peerRank) } - // give a rank to the download, as the download was successful. - peerRank := cs.blocksDownloadPeerSelector.peerDownloadDurationToRank(psp, blockDownloadDuration) - cs.blocksDownloadPeerSelector.rankPeer(psp, peerRank) err = cs.ledgerAccessor.StoreBalancesRound(cs.ctx, blk) if err != nil { @@ -437,7 +442,7 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err erro blk = nil continue } - return cs.abort(fmt.Errorf("processStageLastestBlockDownload failed when calling StoreBalancesRound : %v", err)) + return cs.abort(fmt.Errorf("processStageLatestBlockDownload failed when calling StoreBalancesRound : %v", err)) } err = cs.ledgerAccessor.StoreFirstBlock(cs.ctx, blk) @@ -447,7 +452,7 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err erro blk = nil continue } - return cs.abort(fmt.Errorf("processStageLastestBlockDownload failed when calling StoreFirstBlock : %v", err)) + return cs.abort(fmt.Errorf("processStageLatestBlockDownload failed when calling StoreFirstBlock : %v", err)) } err = cs.updateStage(ledger.CatchpointCatchupStateBlocksDownload) @@ -457,7 +462,7 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err erro blk = nil continue } - return cs.abort(fmt.Errorf("processStageLastestBlockDownload failed to update stage : %v", err)) + return cs.abort(fmt.Errorf("processStageLatestBlockDownload failed to update stage : %v", err)) } // great ! everything is ready for next stage. @@ -466,7 +471,7 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err erro return nil } -// lookbackForStateproofsSupport calculates the lookback (from topblock round) needed to be downloaded +// lookbackForStateproofsSupport calculates the lookback (from topBlock round) needed to be downloaded // in order to support state proofs verification. func lookbackForStateproofsSupport(topBlock *bookkeeping.Block) uint64 { proto := config.Consensus[topBlock.CurrentProtocol] @@ -764,10 +769,10 @@ func (cs *CatchpointCatchupService) GetStatistics() (out CatchpointCatchupStats) } // updateBlockRetrievalStatistics updates the blocks retrieval statistics by applying the provided deltas -func (cs *CatchpointCatchupService) updateBlockRetrievalStatistics(aquiredBlocksDelta, verifiedBlocksDelta int64) { +func (cs *CatchpointCatchupService) updateBlockRetrievalStatistics(acquiredBlocksDelta, verifiedBlocksDelta int64) { cs.statsMu.Lock() defer cs.statsMu.Unlock() - cs.stats.AcquiredBlocks = uint64(int64(cs.stats.AcquiredBlocks) + aquiredBlocksDelta) + cs.stats.AcquiredBlocks = uint64(int64(cs.stats.AcquiredBlocks) + acquiredBlocksDelta) cs.stats.VerifiedBlocks = uint64(int64(cs.stats.VerifiedBlocks) + verifiedBlocksDelta) } diff --git a/catchup/catchpointService_test.go b/catchup/catchpointService_test.go new file mode 100644 index 0000000000..02f4a9b7a3 --- /dev/null +++ b/catchup/catchpointService_test.go @@ -0,0 +1,91 @@ +// Copyright (C) 2019-2022 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +package catchup + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/algorand/go-algorand/components/mocks" + "github.com/algorand/go-algorand/crypto" + "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/data/bookkeeping" + "github.com/algorand/go-algorand/ledger" + "github.com/algorand/go-algorand/protocol" + "github.com/algorand/go-algorand/test/partitiontest" +) + +type catchpointCatchupLedger struct { +} + +func (l *catchpointCatchupLedger) Block(rnd basics.Round) (blk bookkeeping.Block, err error) { + blk = bookkeeping.Block{ + BlockHeader: bookkeeping.BlockHeader{ + UpgradeState: bookkeeping.UpgradeState{ + CurrentProtocol: protocol.ConsensusCurrentVersion, + }, + }, + } + commitments, err := blk.PaysetCommit() + if err != nil { + return blk, err + } + blk.TxnCommitments = commitments + + return blk, nil +} + +func (l *catchpointCatchupLedger) GenesisHash() (d crypto.Digest) { + return +} + +func (l *catchpointCatchupLedger) BlockHdr(rnd basics.Round) (blk bookkeeping.BlockHeader, err error) { + return +} + +func (l *catchpointCatchupLedger) Latest() (rnd basics.Round) { + return +} + +type catchpointCatchupAccessorMock struct { + mocks.MockCatchpointCatchupAccessor + l *catchpointCatchupLedger +} + +func (m *catchpointCatchupAccessorMock) GetCatchupBlockRound(ctx context.Context) (round basics.Round, err error) { + return 1, nil +} + +func (m *catchpointCatchupAccessorMock) Ledger() (l ledger.CatchupAccessorClientLedger) { + return m.l +} + +// TestCatchpointServicePeerRank ensures CatchpointService does not crash when a block fetched +// from the local ledger and not from network when ranking a peer +func TestCatchpointServicePeerRank(t *testing.T) { + partitiontest.PartitionTest(t) + + l := catchpointCatchupLedger{} + a := catchpointCatchupAccessorMock{l: &l} + cs := CatchpointCatchupService{ledgerAccessor: &a, ledger: &l} + cs.initDownloadPeerSelector() + + err := cs.processStageLatestBlockDownload() + require.NoError(t, err) +} diff --git a/catchup/ledgerFetcher.go b/catchup/ledgerFetcher.go index afc39414dc..fa965a1543 100644 --- a/catchup/ledgerFetcher.go +++ b/catchup/ledgerFetcher.go @@ -36,14 +36,14 @@ import ( "github.com/algorand/go-algorand/util" ) -var errNoLedgerForRound = errors.New("No ledger available for given round") +var errNoLedgerForRound = errors.New("no ledger available for given round") const ( // maxCatchpointFileChunkSize is a rough estimate for the worst-case scenario we're going to have of all the accounts data per a single catchpoint file chunk. maxCatchpointFileChunkSize = ledger.BalancesPerCatchpointFileChunk * basics.MaxEncodedAccountDataSize // defaultMinCatchpointFileDownloadBytesPerSecond defines the worst-case scenario download speed we expect to get while downloading a catchpoint file defaultMinCatchpointFileDownloadBytesPerSecond = 20 * 1024 - // catchpointFileStreamReadSize defines the number of bytes we would attempt to read at each itration from the incoming http data stream + // catchpointFileStreamReadSize defines the number of bytes we would attempt to read at each iteration from the incoming http data stream catchpointFileStreamReadSize = 4096 ) @@ -114,7 +114,7 @@ func (lf *ledgerFetcher) getPeerLedger(ctx context.Context, peer network.HTTPPee return fmt.Errorf("getPeerLedger error response status code %d", response.StatusCode) } - // at this point, we've already receieved the response headers. ensure that the + // at this point, we've already received the response headers. ensure that the // response content type is what we'd like it to be. contentTypes := response.Header["Content-Type"] if len(contentTypes) != 1 { diff --git a/catchup/ledgerFetcher_test.go b/catchup/ledgerFetcher_test.go index 637064c97d..4cb57d7fd3 100644 --- a/catchup/ledgerFetcher_test.go +++ b/catchup/ledgerFetcher_test.go @@ -30,7 +30,6 @@ import ( "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/ledger" "github.com/algorand/go-algorand/logging" - "github.com/algorand/go-algorand/network" "github.com/algorand/go-algorand/test/partitiontest" ) @@ -44,8 +43,7 @@ func TestNoPeersAvailable(t *testing.T) { partitiontest.PartitionTest(t) lf := makeLedgerFetcher(&mocks.MockNetwork{}, &mocks.MockCatchpointCatchupAccessor{}, logging.TestingLog(t), &dummyLedgerFetcherReporter{}, config.GetDefaultLocal()) - var peer network.Peer - peer = &lf // The peer is an opaque interface.. we can add anything as a Peer. + peer := &lf // The peer is an opaque interface.. we can add anything as a Peer. err := lf.downloadLedger(context.Background(), peer, basics.Round(0)) require.Equal(t, errNonHTTPPeer, err) } diff --git a/components/mocks/mockCatchpointCatchupAccessor.go b/components/mocks/mockCatchpointCatchupAccessor.go index bd55d29e9b..c92113d70d 100644 --- a/components/mocks/mockCatchpointCatchupAccessor.go +++ b/components/mocks/mockCatchpointCatchupAccessor.go @@ -103,3 +103,8 @@ func (m *MockCatchpointCatchupAccessor) EnsureFirstBlock(ctx context.Context) (b func (m *MockCatchpointCatchupAccessor) CompleteCatchup(ctx context.Context) (err error) { return nil } + +// Ledger returns ledger instance as CatchupAccessorClientLedger interface +func (m *MockCatchpointCatchupAccessor) Ledger() (l ledger.CatchupAccessorClientLedger) { + return nil +} diff --git a/ledger/catchupaccessor.go b/ledger/catchupaccessor.go index bff4453f4a..3c2f6aceee 100644 --- a/ledger/catchupaccessor.go +++ b/ledger/catchupaccessor.go @@ -86,6 +86,9 @@ type CatchpointCatchupAccessor interface { // CompleteCatchup completes the catchpoint catchup process by switching the databases tables around // and reloading the ledger. CompleteCatchup(ctx context.Context) (err error) + + // Ledger returns a narrow subset of Ledger methods needed by CatchpointCatchupAccessor clients + Ledger() (l CatchupAccessorClientLedger) } // CatchpointCatchupAccessorImpl is the concrete implementation of the CatchpointCatchupAccessor interface @@ -111,8 +114,8 @@ const ( CatchpointCatchupStateInactive = iota // CatchpointCatchupStateLedgerDownload indicates that we're downloading the ledger CatchpointCatchupStateLedgerDownload - // CatchpointCatchupStateLastestBlockDownload indicates that we're download the latest block - CatchpointCatchupStateLastestBlockDownload + // CatchpointCatchupStateLatestBlockDownload indicates that we're download the latest block + CatchpointCatchupStateLatestBlockDownload // CatchpointCatchupStateBlocksDownload indicates that we're downloading the blocks prior to the latest one ( total of CatchpointLookback blocks ) CatchpointCatchupStateBlocksDownload // CatchpointCatchupStateSwitch indicates that we're switching to use the downloaded ledger/blocks content @@ -122,6 +125,14 @@ const ( catchpointCatchupStateLast = CatchpointCatchupStateSwitch ) +// CatchupAccessorClientLedger represents ledger interface needed for catchpoint accessor clients +type CatchupAccessorClientLedger interface { + Block(rnd basics.Round) (blk bookkeeping.Block, err error) + GenesisHash() crypto.Digest + BlockHdr(rnd basics.Round) (blk bookkeeping.BlockHeader, err error) + Latest() (rnd basics.Round) +} + // MakeCatchpointCatchupAccessor creates a CatchpointCatchupAccessor given a ledger func MakeCatchpointCatchupAccessor(ledger *Ledger, log logging.Logger) CatchpointCatchupAccessor { return &CatchpointCatchupAccessorImpl{ @@ -974,6 +985,11 @@ func (c *CatchpointCatchupAccessorImpl) finishBalances(ctx context.Context) (err return err } +// Ledger returns ledger instance as CatchupAccessorClientLedger interface +func (c *CatchpointCatchupAccessorImpl) Ledger() (l CatchupAccessorClientLedger) { + return c.ledger +} + var ledgerResetstagingbalancesCount = metrics.NewCounter("ledger_catchup_resetstagingbalances_count", "calls") var ledgerResetstagingbalancesMicros = metrics.NewCounter("ledger_catchup_resetstagingbalances_micros", "µs spent") var ledgerProcessstagingcontentCount = metrics.NewCounter("ledger_catchup_processstagingcontent_count", "calls") diff --git a/ledger/catchupaccessor_test.go b/ledger/catchupaccessor_test.go index f76d586d9b..50a8d9b571 100644 --- a/ledger/catchupaccessor_test.go +++ b/ledger/catchupaccessor_test.go @@ -166,7 +166,7 @@ func TestCatchupAccessorFoo(t *testing.T) { require.NoError(t, err, "catchpointAccessor.SetState") err = catchpointAccessor.SetState(context.Background(), CatchpointCatchupStateLedgerDownload) require.NoError(t, err, "catchpointAccessor.SetState") - err = catchpointAccessor.SetState(context.Background(), CatchpointCatchupStateLastestBlockDownload) + err = catchpointAccessor.SetState(context.Background(), CatchpointCatchupStateLatestBlockDownload) require.NoError(t, err, "catchpointAccessor.SetState") err = catchpointAccessor.SetState(context.Background(), CatchpointCatchupStateBlocksDownload) require.NoError(t, err, "catchpointAccessor.SetState") diff --git a/node/node.go b/node/node.go index 97f5c3f0d8..5e0ec29fbe 100644 --- a/node/node.go +++ b/node/node.go @@ -297,11 +297,13 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd return nil, err } if catchpointCatchupState != ledger.CatchpointCatchupStateInactive { - node.catchpointCatchupService, err = catchup.MakeResumedCatchpointCatchupService(context.Background(), node, node.log, node.net, node.ledger.Ledger, node.config) + accessor := ledger.MakeCatchpointCatchupAccessor(node.ledger.Ledger, node.log) + node.catchpointCatchupService, err = catchup.MakeResumedCatchpointCatchupService(context.Background(), node, node.log, node.net, accessor, node.config) if err != nil { log.Errorf("unable to create catchpoint catchup service: %v", err) return nil, err } + node.log.Infof("resuming catchpoint catchup from state %d", catchpointCatchupState) } node.tracer = messagetracer.NewTracer(log).Init(cfg) @@ -1117,7 +1119,8 @@ func (node *AlgorandFullNode) StartCatchup(catchpoint string) error { return MakeCatchpointUnableToStartError(stats.CatchpointLabel, catchpoint) } var err error - node.catchpointCatchupService, err = catchup.MakeNewCatchpointCatchupService(catchpoint, node, node.log, node.net, node.ledger.Ledger, node.config) + accessor := ledger.MakeCatchpointCatchupAccessor(node.ledger.Ledger, node.log) + node.catchpointCatchupService, err = catchup.MakeNewCatchpointCatchupService(catchpoint, node, node.log, node.net, accessor, node.config) if err != nil { node.log.Warnf("unable to create catchpoint catchup service : %v", err) return err @@ -1144,12 +1147,12 @@ func (node *AlgorandFullNode) AbortCatchup(catchpoint string) error { } // SetCatchpointCatchupMode change the node's operational mode from catchpoint catchup mode and back, it returns a -// channel which contains the updated node context. This function need to work asyncronisly so that the caller could -// detect and handle the usecase where the node is being shut down while we're switching to/from catchup mode without +// channel which contains the updated node context. This function need to work asynchronously so that the caller could +// detect and handle the use case where the node is being shut down while we're switching to/from catchup mode without // deadlocking on the shared node mutex. func (node *AlgorandFullNode) SetCatchpointCatchupMode(catchpointCatchupMode bool) (outCtxCh <-chan context.Context) { // create a non-buffered channel to return the newly created context. The fact that it's non-buffered here - // is imporant, as it allows us to syncronize the "receiving" of the new context before canceling of the previous + // is important, as it allows us to synchronize the "receiving" of the new context before canceling of the previous // one. ctxCh := make(chan context.Context) outCtxCh = ctxCh