Skip to content

Commit

Permalink
catchpoint: store certs with blocks during catchpoint restore (algora…
Browse files Browse the repository at this point in the history
  • Loading branch information
ohill authored Oct 24, 2023
1 parent 19c1bf5 commit 4b824ee
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 29 deletions.
31 changes: 17 additions & 14 deletions catchup/catchpointService.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/algorand/go-deadlock"

"github.com/algorand/go-algorand/agreement"
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
Expand Down Expand Up @@ -370,6 +371,7 @@ func (cs *CatchpointCatchupService) processStageLatestBlockDownload() (err error

attemptsCount := 0
var blk *bookkeeping.Block
var cert *agreement.Certificate
// check to see if the current ledger might have this block. If so, we should try this first instead of downloading anything.
if ledgerBlock, err := cs.ledger.Block(blockRound); err == nil {
blk = &ledgerBlock
Expand All @@ -384,7 +386,7 @@ func (cs *CatchpointCatchupService) processStageLatestBlockDownload() (err error
blockDownloadDuration := time.Duration(0)
if blk == nil {
var stop bool
blk, blockDownloadDuration, psp, stop, err = cs.fetchBlock(blockRound, uint64(attemptsCount))
blk, cert, blockDownloadDuration, psp, stop, err = cs.fetchBlock(blockRound, uint64(attemptsCount))
if stop {
return err
} else if blk == nil {
Expand Down Expand Up @@ -462,7 +464,7 @@ func (cs *CatchpointCatchupService) processStageLatestBlockDownload() (err error
return cs.abort(fmt.Errorf("processStageLatestBlockDownload failed when calling StoreBalancesRound : %v", err))
}

err = cs.ledgerAccessor.StoreFirstBlock(cs.ctx, blk)
err = cs.ledgerAccessor.StoreFirstBlock(cs.ctx, blk, cert)
if err != nil {
if attemptsCount <= cs.config.CatchupBlockDownloadRetryAttempts {
// try again.
Expand Down Expand Up @@ -542,6 +544,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
prevBlock := &topBlock
blocksFetched := uint64(1) // we already got the first block in the previous step.
var blk *bookkeeping.Block
var cert *agreement.Certificate
for retryCount := uint64(1); blocksFetched <= lookback; {
if err := cs.ctx.Err(); err != nil {
return cs.stopOrAbort()
Expand All @@ -564,7 +567,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
blockDownloadDuration := time.Duration(0)
if blk == nil {
var stop bool
blk, blockDownloadDuration, psp, stop, err = cs.fetchBlock(topBlock.Round()-basics.Round(blocksFetched), retryCount)
blk, cert, blockDownloadDuration, psp, stop, err = cs.fetchBlock(topBlock.Round()-basics.Round(blocksFetched), retryCount)
if stop {
return err
} else if blk == nil {
Expand Down Expand Up @@ -624,7 +627,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
}

// all good, persist and move on.
err = cs.ledgerAccessor.StoreBlock(cs.ctx, blk)
err = cs.ledgerAccessor.StoreBlock(cs.ctx, blk, cert)
if err != nil {
cs.log.Warnf("processStageBlocksDownload failed to store downloaded staging block for round %d", blk.Round())
cs.updateBlockRetrievalStatistics(-1, -1)
Expand All @@ -649,17 +652,17 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
// fetchBlock uses the internal peer selector blocksDownloadPeerSelector to pick a peer and then attempt to fetch the block requested from that peer.
// The method return stop=true if the caller should exit the current operation
// If the method return a nil block, the caller is expected to retry the operation, increasing the retry counter as needed.
func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount uint64) (blk *bookkeeping.Block, downloadDuration time.Duration, psp *peerSelectorPeer, stop bool, err error) {
func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount uint64) (blk *bookkeeping.Block, cert *agreement.Certificate, downloadDuration time.Duration, psp *peerSelectorPeer, stop bool, err error) {
psp, err = cs.blocksDownloadPeerSelector.getNextPeer()
if err != nil {
if err == errPeerSelectorNoPeerPoolsAvailable {
cs.log.Infof("fetchBlock: unable to obtain a list of peers to retrieve the latest block from; will retry shortly.")
// this is a possible on startup, since the network package might have yet to retrieve the list of peers.
time.Sleep(noPeersAvailableSleepInterval)
return nil, time.Duration(0), psp, false, nil
return nil, nil, time.Duration(0), psp, false, nil
}
err = fmt.Errorf("fetchBlock: unable to obtain a list of peers to retrieve the latest block from : %w", err)
return nil, time.Duration(0), psp, true, cs.abort(err)
return nil, nil, time.Duration(0), psp, true, cs.abort(err)
}
peer := psp.Peer

Expand All @@ -669,26 +672,26 @@ func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount ui
cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload)
if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) {
// try again.
return nil, time.Duration(0), psp, false, nil
return nil, nil, time.Duration(0), psp, false, nil
}
return nil, time.Duration(0), psp, true, cs.abort(fmt.Errorf("fetchBlock: recurring non-HTTP peer was provided by the peer selector"))
return nil, nil, time.Duration(0), psp, true, cs.abort(fmt.Errorf("fetchBlock: recurring non-HTTP peer was provided by the peer selector"))
}
fetcher := makeUniversalBlockFetcher(cs.log, cs.net, cs.config)
blk, _, downloadDuration, err = fetcher.fetchBlock(cs.ctx, round, httpPeer)
blk, cert, downloadDuration, err = fetcher.fetchBlock(cs.ctx, round, httpPeer)
if err != nil {
if cs.ctx.Err() != nil {
return nil, time.Duration(0), psp, true, cs.stopOrAbort()
return nil, nil, time.Duration(0), psp, true, cs.stopOrAbort()
}
if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) {
// try again.
cs.log.Infof("Failed to download block %d on attempt %d out of %d. %v", round, retryCount, cs.config.CatchupBlockDownloadRetryAttempts, err)
cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankDownloadFailed)
return nil, time.Duration(0), psp, false, nil
return nil, nil, time.Duration(0), psp, false, nil
}
return nil, time.Duration(0), psp, true, cs.abort(fmt.Errorf("fetchBlock failed after multiple blocks download attempts"))
return nil, nil, time.Duration(0), psp, true, cs.abort(fmt.Errorf("fetchBlock failed after multiple blocks download attempts"))
}
// success
return blk, downloadDuration, psp, false, nil
return blk, cert, downloadDuration, psp, false, nil
}

// processStageLedgerDownload is the fifth catchpoint catchup stage. It completes the catchup process, swap the new tables and restart the node functionality.
Expand Down
5 changes: 3 additions & 2 deletions components/mocks/mockCatchpointCatchupAccessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package mocks
import (
"context"

"github.com/algorand/go-algorand/agreement"
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
Expand Down Expand Up @@ -86,12 +87,12 @@ func (m *MockCatchpointCatchupAccessor) StoreBalancesRound(ctx context.Context,
}

// StoreFirstBlock stores a single block to the blocks database.
func (m *MockCatchpointCatchupAccessor) StoreFirstBlock(ctx context.Context, blk *bookkeeping.Block) (err error) {
func (m *MockCatchpointCatchupAccessor) StoreFirstBlock(ctx context.Context, blk *bookkeeping.Block, cert *agreement.Certificate) (err error) {
return nil
}

// StoreBlock stores a single block to the blocks database.
func (m *MockCatchpointCatchupAccessor) StoreBlock(ctx context.Context, blk *bookkeeping.Block) (err error) {
func (m *MockCatchpointCatchupAccessor) StoreBlock(ctx context.Context, blk *bookkeeping.Block, cert *agreement.Certificate) (err error) {
return nil
}

Expand Down
13 changes: 7 additions & 6 deletions ledger/catchupaccessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"sync"
"time"

"github.com/algorand/go-algorand/agreement"
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/crypto/merkletrie"
Expand Down Expand Up @@ -78,10 +79,10 @@ type CatchpointCatchupAccessor interface {
StoreBalancesRound(ctx context.Context, blk *bookkeeping.Block) (err error)

// StoreFirstBlock stores a single block to the blocks database.
StoreFirstBlock(ctx context.Context, blk *bookkeeping.Block) (err error)
StoreFirstBlock(ctx context.Context, blk *bookkeeping.Block, cert *agreement.Certificate) (err error)

// StoreBlock stores a single block to the blocks database.
StoreBlock(ctx context.Context, blk *bookkeeping.Block) (err error)
StoreBlock(ctx context.Context, blk *bookkeeping.Block, cert *agreement.Certificate) (err error)

// FinishBlocks concludes the catchup of the blocks database.
FinishBlocks(ctx context.Context, applyChanges bool) (err error)
Expand Down Expand Up @@ -1055,12 +1056,12 @@ func (c *catchpointCatchupAccessorImpl) StoreBalancesRound(ctx context.Context,
}

// StoreFirstBlock stores a single block to the blocks database.
func (c *catchpointCatchupAccessorImpl) StoreFirstBlock(ctx context.Context, blk *bookkeeping.Block) (err error) {
func (c *catchpointCatchupAccessorImpl) StoreFirstBlock(ctx context.Context, blk *bookkeeping.Block, cert *agreement.Certificate) (err error) {
blockDbs := c.ledger.blockDB()
start := time.Now()
ledgerStorefirstblockCount.Inc(nil)
err = blockDbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
return blockdb.BlockStartCatchupStaging(tx, *blk)
return blockdb.BlockStartCatchupStaging(tx, *blk, *cert)
})
ledgerStorefirstblockMicros.AddMicrosecondsSince(start, nil)
if err != nil {
Expand All @@ -1070,12 +1071,12 @@ func (c *catchpointCatchupAccessorImpl) StoreFirstBlock(ctx context.Context, blk
}

// StoreBlock stores a single block to the blocks database.
func (c *catchpointCatchupAccessorImpl) StoreBlock(ctx context.Context, blk *bookkeeping.Block) (err error) {
func (c *catchpointCatchupAccessorImpl) StoreBlock(ctx context.Context, blk *bookkeeping.Block, cert *agreement.Certificate) (err error) {
blockDbs := c.ledger.blockDB()
start := time.Now()
ledgerCatchpointStoreblockCount.Inc(nil)
err = blockDbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
return blockdb.BlockPutStaging(tx, *blk)
return blockdb.BlockPutStaging(tx, *blk, *cert)
})
ledgerCatchpointStoreblockMicros.AddMicrosecondsSince(start, nil)
if err != nil {
Expand Down
8 changes: 5 additions & 3 deletions ledger/catchupaccessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand/agreement"
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/basics"
Expand Down Expand Up @@ -162,7 +163,7 @@ func initializeTestCatchupAccessor(t *testing.T, l *Ledger, accountsCount uint64
require.NoError(t, err)

// We do this to initialize the catchpointblocks table. Needed to be able to use CompleteCatchup.
err = catchpointAccessor.StoreFirstBlock(ctx, &bookkeeping.Block{})
err = catchpointAccessor.StoreFirstBlock(ctx, &bookkeeping.Block{}, &agreement.Certificate{})
require.NoError(t, err)

// We do this to initialize the accounttotals table. Needed to be able to use CompleteCatchup.
Expand Down Expand Up @@ -441,6 +442,7 @@ func TestVerifyCatchpoint(t *testing.T) {

// actual testing...
var blk bookkeeping.Block
var cert agreement.Certificate
err = catchpointAccessor.VerifyCatchpoint(ctx, &blk)
require.Error(t, err)

Expand All @@ -455,14 +457,14 @@ func TestVerifyCatchpoint(t *testing.T) {
err = catchpointAccessor.StoreBalancesRound(ctx, &blk)
require.NoError(t, err)
// StoreFirstBlock is a dumb wrapper on some db logic
err = catchpointAccessor.StoreFirstBlock(ctx, &blk)
err = catchpointAccessor.StoreFirstBlock(ctx, &blk, &cert)
require.NoError(t, err)

_, err = catchpointAccessor.EnsureFirstBlock(ctx)
require.NoError(t, err)

blk.BlockHeader.Round++
err = catchpointAccessor.StoreBlock(ctx, &blk)
err = catchpointAccessor.StoreBlock(ctx, &blk, &cert)
require.NoError(t, err)

// TODO: write a case with working no-err
Expand Down
10 changes: 6 additions & 4 deletions ledger/store/blockdb/blockdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func BlockForgetBefore(tx *sql.Tx, rnd basics.Round) error {
}

// BlockStartCatchupStaging initializes catchup for catchpoint
func BlockStartCatchupStaging(tx *sql.Tx, blk bookkeeping.Block) error {
func BlockStartCatchupStaging(tx *sql.Tx, blk bookkeeping.Block, cert agreement.Certificate) error {
// delete the old catchpointblocks table, if there is such.
for _, stmt := range blockResetExprs {
stmt = strings.Replace(stmt, "blocks", "catchpointblocks", 1)
Expand All @@ -262,11 +262,12 @@ func BlockStartCatchupStaging(tx *sql.Tx, blk bookkeeping.Block) error {
}

// insert the top entry to the blocks table.
_, err := tx.Exec("INSERT INTO catchpointblocks (rnd, proto, hdrdata, blkdata) VALUES (?, ?, ?, ?)",
_, err := tx.Exec("INSERT INTO catchpointblocks (rnd, proto, hdrdata, blkdata, certdata) VALUES (?, ?, ?, ?, ?)",
blk.Round(),
blk.CurrentProtocol,
protocol.Encode(&blk.BlockHeader),
protocol.Encode(&blk),
protocol.Encode(&cert),
)
if err != nil {
return err
Expand Down Expand Up @@ -305,13 +306,14 @@ func BlockAbortCatchup(tx *sql.Tx) error {
}

// BlockPutStaging store a block into catchpoint staging table
func BlockPutStaging(tx *sql.Tx, blk bookkeeping.Block) (err error) {
func BlockPutStaging(tx *sql.Tx, blk bookkeeping.Block, cert agreement.Certificate) (err error) {
// insert the new entry
_, err = tx.Exec("INSERT INTO catchpointblocks (rnd, proto, hdrdata, blkdata) VALUES (?, ?, ?, ?)",
_, err = tx.Exec("INSERT INTO catchpointblocks (rnd, proto, hdrdata, blkdata, certdata) VALUES (?, ?, ?, ?, ?)",
blk.Round(),
blk.CurrentProtocol,
protocol.Encode(&blk.BlockHeader),
protocol.Encode(&blk),
protocol.Encode(&cert),
)
if err != nil {
return err
Expand Down
4 changes: 4 additions & 0 deletions test/e2e-go/features/catchup/catchpointCatchup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,10 @@ func TestBasicCatchpointCatchup(t *testing.T) {

err = fixture.ClientWaitForRoundWithTimeout(usingNodeRestClient, uint64(targetCatchpointRound+1))
a.NoError(err)

// ensure the raw block can be downloaded (including cert)
_, err = usingNodeRestClient.RawBlock(uint64(targetCatchpointRound))
a.NoError(err)
}

func TestCatchpointLabelGeneration(t *testing.T) {
Expand Down

0 comments on commit 4b824ee

Please sign in to comment.