Skip to content

Commit

Permalink
catchpoint: fix peer ranking (algorand#4535)
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy authored Sep 14, 2022
1 parent c03e3d3 commit cbdc68a
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 55 deletions.
87 changes: 46 additions & 41 deletions catchup/catchpointService.go

Large diffs are not rendered by default.

91 changes: 91 additions & 0 deletions catchup/catchpointService_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright (C) 2019-2022 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package catchup

import (
"context"
"testing"

"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand/components/mocks"
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/ledger"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
)

// catchpointCatchupLedger is a stateless stub ledger for the catchpoint
// service tests; all of its behavior lives in the methods defined on it.
type catchpointCatchupLedger struct{}

// Block ignores rnd and builds a fresh block whose only populated header
// fields are the current consensus protocol version and the transaction
// commitments computed from its (empty) payset. If computing the commitments
// fails, the partially built block is returned together with the error.
func (l *catchpointCatchupLedger) Block(rnd basics.Round) (blk bookkeeping.Block, err error) {
	// Start from the zero-valued named result and fill in only the protocol version.
	blk.BlockHeader.UpgradeState.CurrentProtocol = protocol.ConsensusCurrentVersion

	payset, err := blk.PaysetCommit()
	if err == nil {
		// Only store the commitments when they were computed successfully.
		blk.TxnCommitments = payset
	}
	return blk, err
}

// GenesisHash returns the zero digest; the stub ledger has no genesis data.
func (l *catchpointCatchupLedger) GenesisHash() (d crypto.Digest) {
	var zero crypto.Digest
	return zero
}

// BlockHdr ignores rnd and returns a zero-valued block header with a nil error.
func (l *catchpointCatchupLedger) BlockHdr(rnd basics.Round) (blk bookkeeping.BlockHeader, err error) {
	return bookkeeping.BlockHeader{}, nil
}

// Latest always reports the zero round.
func (l *catchpointCatchupLedger) Latest() (rnd basics.Round) {
	var zero basics.Round
	return zero
}

// catchpointCatchupAccessorMock extends the shared MockCatchpointCatchupAccessor
// with a stub ledger so that tests can hand the service an accessor whose
// Ledger method returns a usable value.
type catchpointCatchupAccessorMock struct {
// embedded base mock; supplies every accessor method not overridden on this type
mocks.MockCatchpointCatchupAccessor
// l is the stub ledger handed out by the Ledger method
l *catchpointCatchupLedger
}

// GetCatchupBlockRound ignores ctx and always reports round 1 with a nil error.
func (m *catchpointCatchupAccessorMock) GetCatchupBlockRound(ctx context.Context) (round basics.Round, err error) {
	round = basics.Round(1)
	return round, nil
}

// Ledger exposes the mock's stub ledger through the
// ledger.CatchupAccessorClientLedger interface.
func (m *catchpointCatchupAccessorMock) Ledger() (l ledger.CatchupAccessorClientLedger) {
	l = m.l
	return l
}

// TestCatchpointServicePeerRank ensures that CatchpointService does not crash
// while ranking a peer when the block was fetched from the local ledger rather
// than from the network.
func TestCatchpointServicePeerRank(t *testing.T) {
	partitiontest.PartitionTest(t)

	// The same stub ledger backs both the service and its accessor, so the
	// latest-block stage finds the block locally.
	stubLedger := &catchpointCatchupLedger{}
	svc := CatchpointCatchupService{
		ledgerAccessor: &catchpointCatchupAccessorMock{l: stubLedger},
		ledger:         stubLedger,
	}
	svc.initDownloadPeerSelector()

	require.NoError(t, svc.processStageLatestBlockDownload())
}
6 changes: 3 additions & 3 deletions catchup/ledgerFetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ import (
"github.com/algorand/go-algorand/util"
)

var errNoLedgerForRound = errors.New("No ledger available for given round")
var errNoLedgerForRound = errors.New("no ledger available for given round")

const (
// maxCatchpointFileChunkSize is a rough estimate for the worst-case scenario we're going to have of all the accounts data per a single catchpoint file chunk.
maxCatchpointFileChunkSize = ledger.BalancesPerCatchpointFileChunk * basics.MaxEncodedAccountDataSize
// defaultMinCatchpointFileDownloadBytesPerSecond defines the worst-case scenario download speed we expect to get while downloading a catchpoint file
defaultMinCatchpointFileDownloadBytesPerSecond = 20 * 1024
// catchpointFileStreamReadSize defines the number of bytes we would attempt to read at each itration from the incoming http data stream
// catchpointFileStreamReadSize defines the number of bytes we would attempt to read at each iteration from the incoming http data stream
catchpointFileStreamReadSize = 4096
)

Expand Down Expand Up @@ -114,7 +114,7 @@ func (lf *ledgerFetcher) getPeerLedger(ctx context.Context, peer network.HTTPPee
return fmt.Errorf("getPeerLedger error response status code %d", response.StatusCode)
}

// at this point, we've already receieved the response headers. ensure that the
// at this point, we've already received the response headers. ensure that the
// response content type is what we'd like it to be.
contentTypes := response.Header["Content-Type"]
if len(contentTypes) != 1 {
Expand Down
4 changes: 1 addition & 3 deletions catchup/ledgerFetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/ledger"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/network"
"github.com/algorand/go-algorand/test/partitiontest"
)

Expand All @@ -44,8 +43,7 @@ func TestNoPeersAvailable(t *testing.T) {
partitiontest.PartitionTest(t)

lf := makeLedgerFetcher(&mocks.MockNetwork{}, &mocks.MockCatchpointCatchupAccessor{}, logging.TestingLog(t), &dummyLedgerFetcherReporter{}, config.GetDefaultLocal())
var peer network.Peer
peer = &lf // The peer is an opaque interface.. we can add anything as a Peer.
peer := &lf // The peer is an opaque interface.. we can add anything as a Peer.
err := lf.downloadLedger(context.Background(), peer, basics.Round(0))
require.Equal(t, errNonHTTPPeer, err)
}
Expand Down
5 changes: 5 additions & 0 deletions components/mocks/mockCatchpointCatchupAccessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,8 @@ func (m *MockCatchpointCatchupAccessor) EnsureFirstBlock(ctx context.Context) (b
func (m *MockCatchpointCatchupAccessor) CompleteCatchup(ctx context.Context) (err error) {
// the mock performs no work and always reports success
return nil
}

// Ledger implements the accessor's Ledger method for the mock. The base mock
// holds no ledger, so it always returns a nil CatchupAccessorClientLedger;
// tests that need a working ledger must wrap this mock and override Ledger.
func (m *MockCatchpointCatchupAccessor) Ledger() (l ledger.CatchupAccessorClientLedger) {
return nil
}
20 changes: 18 additions & 2 deletions ledger/catchupaccessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ type CatchpointCatchupAccessor interface {
// CompleteCatchup completes the catchpoint catchup process by switching the databases tables around
// and reloading the ledger.
CompleteCatchup(ctx context.Context) (err error)

// Ledger returns a narrow subset of Ledger methods needed by CatchpointCatchupAccessor clients
Ledger() (l CatchupAccessorClientLedger)
}

// CatchpointCatchupAccessorImpl is the concrete implementation of the CatchpointCatchupAccessor interface
Expand All @@ -111,8 +114,8 @@ const (
CatchpointCatchupStateInactive = iota
// CatchpointCatchupStateLedgerDownload indicates that we're downloading the ledger
CatchpointCatchupStateLedgerDownload
// CatchpointCatchupStateLastestBlockDownload indicates that we're download the latest block
CatchpointCatchupStateLastestBlockDownload
// CatchpointCatchupStateLatestBlockDownload indicates that we're downloading the latest block
CatchpointCatchupStateLatestBlockDownload
// CatchpointCatchupStateBlocksDownload indicates that we're downloading the blocks prior to the latest one ( total of CatchpointLookback blocks )
CatchpointCatchupStateBlocksDownload
// CatchpointCatchupStateSwitch indicates that we're switching to use the downloaded ledger/blocks content
Expand All @@ -122,6 +125,14 @@ const (
catchpointCatchupStateLast = CatchpointCatchupStateSwitch
)

// CatchupAccessorClientLedger is the narrow ledger interface required by
// clients of the catchpoint catchup accessor. It is the type returned by
// CatchpointCatchupAccessor.Ledger.
type CatchupAccessorClientLedger interface {
// Block returns the block for round rnd, or an error.
// NOTE(review): presumably errors when the round is not stored locally - confirm against Ledger.Block.
Block(rnd basics.Round) (blk bookkeeping.Block, err error)
// GenesisHash returns the ledger's genesis hash as a crypto.Digest.
GenesisHash() crypto.Digest
// BlockHdr returns the block header for round rnd, or an error.
BlockHdr(rnd basics.Round) (blk bookkeeping.BlockHeader, err error)
// Latest returns a round number; presumably the latest round known to the ledger (TODO confirm).
Latest() (rnd basics.Round)
}

// MakeCatchpointCatchupAccessor creates a CatchpointCatchupAccessor given a ledger
func MakeCatchpointCatchupAccessor(ledger *Ledger, log logging.Logger) CatchpointCatchupAccessor {
return &CatchpointCatchupAccessorImpl{
Expand Down Expand Up @@ -974,6 +985,11 @@ func (c *CatchpointCatchupAccessorImpl) finishBalances(ctx context.Context) (err
return err
}

// Ledger returns the accessor's underlying ledger (c.ledger), exposed through
// the narrow CatchupAccessorClientLedger interface rather than as its concrete type.
func (c *CatchpointCatchupAccessorImpl) Ledger() (l CatchupAccessorClientLedger) {
return c.ledger
}

var ledgerResetstagingbalancesCount = metrics.NewCounter("ledger_catchup_resetstagingbalances_count", "calls")
var ledgerResetstagingbalancesMicros = metrics.NewCounter("ledger_catchup_resetstagingbalances_micros", "µs spent")
var ledgerProcessstagingcontentCount = metrics.NewCounter("ledger_catchup_processstagingcontent_count", "calls")
Expand Down
2 changes: 1 addition & 1 deletion ledger/catchupaccessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func TestCatchupAccessorFoo(t *testing.T) {
require.NoError(t, err, "catchpointAccessor.SetState")
err = catchpointAccessor.SetState(context.Background(), CatchpointCatchupStateLedgerDownload)
require.NoError(t, err, "catchpointAccessor.SetState")
err = catchpointAccessor.SetState(context.Background(), CatchpointCatchupStateLastestBlockDownload)
err = catchpointAccessor.SetState(context.Background(), CatchpointCatchupStateLatestBlockDownload)
require.NoError(t, err, "catchpointAccessor.SetState")
err = catchpointAccessor.SetState(context.Background(), CatchpointCatchupStateBlocksDownload)
require.NoError(t, err, "catchpointAccessor.SetState")
Expand Down
13 changes: 8 additions & 5 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,11 +297,13 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd
return nil, err
}
if catchpointCatchupState != ledger.CatchpointCatchupStateInactive {
node.catchpointCatchupService, err = catchup.MakeResumedCatchpointCatchupService(context.Background(), node, node.log, node.net, node.ledger.Ledger, node.config)
accessor := ledger.MakeCatchpointCatchupAccessor(node.ledger.Ledger, node.log)
node.catchpointCatchupService, err = catchup.MakeResumedCatchpointCatchupService(context.Background(), node, node.log, node.net, accessor, node.config)
if err != nil {
log.Errorf("unable to create catchpoint catchup service: %v", err)
return nil, err
}
node.log.Infof("resuming catchpoint catchup from state %d", catchpointCatchupState)
}

node.tracer = messagetracer.NewTracer(log).Init(cfg)
Expand Down Expand Up @@ -1117,7 +1119,8 @@ func (node *AlgorandFullNode) StartCatchup(catchpoint string) error {
return MakeCatchpointUnableToStartError(stats.CatchpointLabel, catchpoint)
}
var err error
node.catchpointCatchupService, err = catchup.MakeNewCatchpointCatchupService(catchpoint, node, node.log, node.net, node.ledger.Ledger, node.config)
accessor := ledger.MakeCatchpointCatchupAccessor(node.ledger.Ledger, node.log)
node.catchpointCatchupService, err = catchup.MakeNewCatchpointCatchupService(catchpoint, node, node.log, node.net, accessor, node.config)
if err != nil {
node.log.Warnf("unable to create catchpoint catchup service : %v", err)
return err
Expand All @@ -1144,12 +1147,12 @@ func (node *AlgorandFullNode) AbortCatchup(catchpoint string) error {
}

// SetCatchpointCatchupMode changes the node's operational mode from catchpoint catchup mode and back, it returns a
// channel which contains the updated node context. This function need to work asyncronisly so that the caller could
// detect and handle the usecase where the node is being shut down while we're switching to/from catchup mode without
// channel which contains the updated node context. This function needs to work asynchronously so that the caller could
// detect and handle the use case where the node is being shut down while we're switching to/from catchup mode without
// deadlocking on the shared node mutex.
func (node *AlgorandFullNode) SetCatchpointCatchupMode(catchpointCatchupMode bool) (outCtxCh <-chan context.Context) {
// create a non-buffered channel to return the newly created context. The fact that it's non-buffered here
// is imporant, as it allows us to syncronize the "receiving" of the new context before canceling of the previous
// is important, as it allows us to synchronize the "receiving" of the new context before canceling of the previous
// one.
ctxCh := make(chan context.Context)
outCtxCh = ctxCh
Expand Down

0 comments on commit cbdc68a

Please sign in to comment.