peer, main, netsync, blockchain: parallel block downloads #2226

Open · wants to merge 15 commits into base: master
Changes from 1 commit
netsync: refactor handleBlockMsg
handleBlockMsg used to check that the block header is valid and then
process the blocks as they came in.  It's now refactored so that it
also handles blocks that arrive out of order.  Previously, for out of
order block downloads, handleBlockMsg would mark the block as an
orphan; it's now refactored to handle those cases directly.

Whenever a block that's not the next one from the chain tip is
received, it's now temporarily stored in memory until the block that
extends the chain tip arrives.  All of the queued blocks that are in
sequence are then processed, as the sketch below illustrates.
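Below is a minimal, self-contained Go sketch of the queuing approach this commit introduces. The types here are simplified stand-ins: the real code keys `queuedBlocks` and `queuedBlocksPrevHash` by `chainhash.Hash` and stores `*blockMsg` values, as the diff below shows.

```go
package main

import "fmt"

// Hash and Block are simplified stand-ins for chainhash.Hash and
// *blockMsg; they carry just enough to demonstrate the ordering logic.
type Hash string

type Block struct {
	Hash     Hash
	PrevHash Hash
}

// blockQueue mirrors the two maps the commit adds to SyncManager:
// block hash -> queued block, and parent hash -> child block hash.
type blockQueue struct {
	byHash   map[Hash]*Block // queuedBlocks
	byParent map[Hash]Hash   // queuedBlocksPrevHash
}

func newBlockQueue() *blockQueue {
	return &blockQueue{
		byHash:   make(map[Hash]*Block),
		byParent: make(map[Hash]Hash),
	}
}

// add stores an out-of-order block until its parent connects.
func (q *blockQueue) add(b *Block) {
	q.byHash[b.Hash] = b
	q.byParent[b.PrevHash] = b.Hash
}

// drain returns, in order, every queued block that now forms an
// unbroken sequence on top of tip, mirroring the loop in
// handleBlockMsgInHeadersFirst.
func (q *blockQueue) drain(tip Hash) []*Block {
	var ordered []*Block
	for len(q.byHash) > 0 {
		next, found := q.byParent[tip]
		if !found {
			// The block extending the tip hasn't arrived yet.
			break
		}
		b, found := q.byHash[next]
		if !found {
			break
		}
		delete(q.byHash, next)
		delete(q.byParent, tip)
		ordered = append(ordered, b)
		tip = next
	}
	return ordered
}

func main() {
	q := newBlockQueue()

	// Blocks b2 and b3 arrive before b1: nothing can be processed yet.
	q.add(&Block{Hash: "b3", PrevHash: "b2"})
	q.add(&Block{Hash: "b2", PrevHash: "b1"})
	fmt.Println(len(q.drain("b0"))) // 0 -- b1 is still missing

	// Once b1 arrives, the whole run b1..b3 drains in order.
	q.add(&Block{Hash: "b1", PrevHash: "b0"})
	for _, b := range q.drain("b0") {
		fmt.Println(b.Hash) // b1, b2, b3
	}
}
```

Keeping a parent-to-child index alongside the block map makes each drain step an O(1) lookup from the current tip rather than a scan over all queued blocks.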
kcalvinalvin committed Dec 9, 2024
commit afe888d705123d97bb54a99517c9c25f279b8ee8
204 changes: 150 additions & 54 deletions netsync/manager.go
@@ -288,10 +288,12 @@ type SyncManager struct {
lastProgressTime time.Time

// The following fields are used for headers-first mode.
headersFirstMode bool
headerList *list.List
startHeader *list.Element
nextCheckpoint *chaincfg.Checkpoint
headersFirstMode bool
headerList *list.List
startHeader *list.Element
nextCheckpoint *chaincfg.Checkpoint
queuedBlocks map[chainhash.Hash]*blockMsg
queuedBlocksPrevHash map[chainhash.Hash]chainhash.Hash

// An optional fee estimator.
feeEstimator *mempool.FeeEstimator
@@ -774,30 +776,10 @@ func (sm *SyncManager) current() bool {
return true
}

// handleBlockMsg handles block messages from all peers.
func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
// processBlock checks if the block connects to the best chain.
func (sm *SyncManager) processBlock(bmsg *blockMsg) (bool, error) {
peer := bmsg.peer
state, exists := sm.peerStates[peer]
if !exists {
log.Warnf("Received block message from unknown peer %s", peer)
return
}

// If we didn't ask for this block then the peer is misbehaving.
blockHash := bmsg.block.Hash()
if _, exists = state.requestedBlocks[*blockHash]; !exists {
// The regression test intentionally sends some blocks twice
// to test duplicate block insertion fails. Don't disconnect
// the peer or ignore the block when we're in regression test
// mode in this case so the chain code is actually fed the
// duplicate blocks.
if sm.chainParams != &chaincfg.RegressionNetParams {
log.Warnf("Got unrequested block %v from %s -- "+
"disconnecting", blockHash, peer.Addr())
peer.Disconnect()
return
}
}

// When in headers-first mode, if the block matches the hash of the
// first header in the list of headers that are being fetched, it's
@@ -823,12 +805,6 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
}
}

// Remove block from request maps. Either chain will know about it and
// so we shouldn't have any more instances of trying to fetch it, or we
// will fail the insert and thus we'll retry next time we get an inv.
delete(state.requestedBlocks, *blockHash)
delete(sm.requestedBlocks, *blockHash)

// Process the block to include validation, best chain selection, orphan
// handling, etc.
_, isOrphan, err := sm.chain.ProcessBlock(bmsg.block, behaviorFlags)
@@ -853,7 +829,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
// send it.
code, reason := mempool.ErrToRejectErr(err)
peer.PushRejectMsg(wire.CmdBlock, code, reason, blockHash, false)
return
return false, err
}

// Meta-data about the new block this peer is reporting. We use this
@@ -929,22 +905,136 @@
}
}

return isCheckpointBlock, nil
}

// handleBlockMsg handles block messages from all peers.
func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
peer := bmsg.peer
state, exists := sm.peerStates[peer]
if !exists {
log.Warnf("Received block message from unknown peer %s", peer)
return
}

// If we didn't ask for this block then the peer is misbehaving.
blockHash := bmsg.block.Hash()
if _, exists := state.requestedBlocks[*blockHash]; !exists {
// The regression test intentionally sends some blocks twice
// to test duplicate block insertion fails. Don't disconnect
// the peer or ignore the block when we're in regression test
// mode in this case so the chain code is actually fed the
// duplicate blocks.
if sm.chainParams != &chaincfg.RegressionNetParams {
log.Warnf("Got unrequested block %v from %s."+
"this peer may be a stalling peer -- disconnecting",
blockHash, peer.Addr())
peer.Disconnect()
return
}
}

// Remove block from request maps. Either chain will know about it and
// so we shouldn't have any more instances of trying to fetch it, or we
// will fail the insert and thus we'll retry next time we get an inv.
delete(sm.requestedBlocks, *blockHash)
delete(state.requestedBlocks, *blockHash)

_, err := sm.processBlock(bmsg)
if err != nil {
return
}

// If we are not in headers first mode, it's a good time to periodically
// flush the blockchain cache because we don't expect new blocks immediately.
// After that, there is nothing more to do.
if !sm.headersFirstMode {
if err := sm.chain.FlushUtxoCache(blockchain.FlushPeriodic); err != nil {
log.Errorf("Error while flushing the blockchain cache: %v", err)
}
}

// handleBlockMsgInHeadersFirst handles block messages from all peers when the
// sync manager is in headers first mode. For blocks received out of order, it
// first keeps them in memory and sends them to be processed when the next block
// from the tip is available.
func (sm *SyncManager) handleBlockMsgInHeadersFirst(bmsg *blockMsg) {
blockHash := bmsg.block.Hash()
peer := bmsg.peer
state, exists := sm.peerStates[peer]
if !exists {
log.Warnf("Received block message from unknown peer %s", peer)
return
}

// If we didn't ask for this block then the peer is misbehaving.
if _, exists := state.requestedBlocks[*blockHash]; !exists {
// The regression test intentionally sends some blocks twice
// to test duplicate block insertion fails. Don't disconnect
// the peer or ignore the block when we're in regression test
// mode in this case so the chain code is actually fed the
// duplicate blocks.
if sm.chainParams != &chaincfg.RegressionNetParams {
log.Warnf("Got unrequested block %v from %s."+
"this peer may be a stalling peer -- disconnecting",
blockHash, peer.Addr())
peer.Disconnect()
return
}
}

// Add the block to the queue.
sm.queuedBlocks[*blockHash] = bmsg
sm.queuedBlocksPrevHash[bmsg.block.MsgBlock().Header.PrevBlock] = *blockHash

// Remove the block from the request map. Either the chain will know
// about it and so we shouldn't have any more instances of trying to
// fetch it, it's kept in the queued blocks map until it can be
// processed, or we will fail the insert and thus we'll retry next
// time we get an inv.
delete(sm.requestedBlocks, *blockHash)

// Since we may receive blocks out of order, attempt to find the next block
// and any other descendant blocks that connect to it.
processBlocks := make([]*blockMsg, 0, 1024)

bestHash := sm.chain.BestSnapshot().Hash
for len(sm.queuedBlocks) > 0 {
hash, found := sm.queuedBlocksPrevHash[bestHash]
if !found {
break
}

b, found := sm.queuedBlocks[hash]
if !found {
// Break when we're missing the next block in
// sequence.
break
}

// Append the block to be processed and delete from the
// queue.
delete(sm.queuedBlocks, hash)
delete(sm.queuedBlocksPrevHash, bestHash)
processBlocks = append(processBlocks, b)
bestHash = hash
}

var isCheckpointBlock bool
if len(processBlocks) > 0 {
for _, blockMsg := range processBlocks {
var err error
isCheckpointBlock, err = sm.processBlock(blockMsg)
if err != nil {
return
}
}
}

// This is headers-first mode, so if the block is not a checkpoint
// request more blocks using the header list when the request queue is
// getting short.
if !isCheckpointBlock {
if sm.startHeader != nil &&
len(state.requestedBlocks) < minInFlightBlocks {
len(sm.requestedBlocks) < minInFlightBlocks {
sm.fetchHeaderBlocks()
}
return
@@ -959,7 +1049,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
sm.nextCheckpoint = sm.findNextHeaderCheckpoint(prevHeight)
if sm.nextCheckpoint != nil {
locator := blockchain.BlockLocator([]*chainhash.Hash{prevHash})
err := peer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash)
err := sm.syncPeer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash)
if err != nil {
log.Warnf("Failed to send getheaders message to "+
"peer %s: %v", peer.Addr(), err)
@@ -978,7 +1068,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
sm.headerList.Init()
log.Infof("Reached the final checkpoint -- switching to normal mode")
locator := blockchain.BlockLocator([]*chainhash.Hash{blockHash})
err = peer.PushGetBlocksMsg(locator, &zeroHash)
err := peer.PushGetBlocksMsg(locator, &zeroHash)
if err != nil {
log.Warnf("Failed to send getblocks message to peer %s: %v",
peer.Addr(), err)
@@ -1451,7 +1541,11 @@ out:
msg.reply <- struct{}{}

case *blockMsg:
sm.handleBlockMsg(msg)
if sm.headersFirstMode {
sm.handleBlockMsgInHeadersFirst(msg)
} else {
sm.handleBlockMsg(msg)
}
msg.reply <- struct{}{}

case *invMsg:
@@ -1789,19 +1883,21 @@ func (sm *SyncManager) Pause() chan<- struct{} {
// block, tx, and inv updates.
func New(config *Config) (*SyncManager, error) {
sm := SyncManager{
peerNotifier: config.PeerNotifier,
chain: config.Chain,
txMemPool: config.TxMemPool,
chainParams: config.ChainParams,
rejectedTxns: make(map[chainhash.Hash]struct{}),
requestedTxns: make(map[chainhash.Hash]struct{}),
requestedBlocks: make(map[chainhash.Hash]struct{}),
peerStates: make(map[*peerpkg.Peer]*peerSyncState),
progressLogger: newBlockProgressLogger("Processed", log),
msgChan: make(chan interface{}, config.MaxPeers*3),
headerList: list.New(),
quit: make(chan struct{}),
feeEstimator: config.FeeEstimator,
peerNotifier: config.PeerNotifier,
chain: config.Chain,
txMemPool: config.TxMemPool,
chainParams: config.ChainParams,
rejectedTxns: make(map[chainhash.Hash]struct{}),
requestedTxns: make(map[chainhash.Hash]struct{}),
requestedBlocks: make(map[chainhash.Hash]struct{}),
peerStates: make(map[*peerpkg.Peer]*peerSyncState),
progressLogger: newBlockProgressLogger("Processed", log),
msgChan: make(chan interface{}, config.MaxPeers*3),
headerList: list.New(),
quit: make(chan struct{}),
queuedBlocks: make(map[chainhash.Hash]*blockMsg),
queuedBlocksPrevHash: make(map[chainhash.Hash]chainhash.Hash),
feeEstimator: config.FeeEstimator,
}

best := sm.chain.BestSnapshot()